diff --git a/.gitignore b/.gitignore index 485dee6..0a95508 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -.idea +.idea/ +.vscode/ \ No newline at end of file diff --git a/.idea/usbFilter.iml b/.idea/usbFilter.iml deleted file mode 100644 index d2e80eb..0000000 --- a/.idea/usbFilter.iml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - - - - \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml deleted file mode 100644 index 969ae81..0000000 --- a/.idea/workspace.xml +++ /dev/null @@ -1,25 +0,0 @@ - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/exFilter/__init__.py b/exFilter/__init__.py index 051ef35..2ce4c08 100644 --- a/exFilter/__init__.py +++ b/exFilter/__init__.py @@ -1 +1,2 @@ -from .core import Filter \ No newline at end of file +from .core import Filter +from .utils import enum_usb_devices, enum_pcie_devices \ No newline at end of file diff --git a/exFilter/core.py b/exFilter/core.py index e796c5c..37c12e5 100644 --- a/exFilter/core.py +++ b/exFilter/core.py @@ -1,23 +1,117 @@ +import frida import logging import sys +import os +import json +import subprocess +import re +from exFilter.utils import enum_usb_devices, enum_pcie_devices,checkPid - -logging.basicConfig(level=logging.INFO) - +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s %(filename)s: %(lineno)d [%(levelname)s] %(message)s', + datefmt = '%Y-%m-%d %H:%M:%S' # 秒级时间格式 +) class Filter: def __init__(self): self.logger = logging.getLogger("exFilter") - self.logger.info("core Initialized") + self.logger.info("Initialized") def start(self): - self.logger.info("core Started") - - return + if not self.selfCheck(): + self.logger.error("Self check failed, please check the scripts folder.") + return False + else: + self.logger.info("Self check passed") + return True def stop(self): - self.logger.info("core Stopped") - return + try: + self.script.unload() + self.session.detach() + self.logger.info("core Stopped") + return True + except Exception as e: + self.logger.error(f"Failed to stop core: {e}") + return False + + + + def EnumDriversAndDevices(self): + self.logger.info("Enumerating drivers and devices") + + # 列举USB设备 + usb_devices_json = enum_usb_devices() + usb_devices = json.loads(usb_devices_json) + + # 列举PCIe设备 + pcie_devices_json = enum_pcie_devices() + pcie_devices = json.loads(pcie_devices_json) + + # 合并设备列表 + all_devices = usb_devices + pcie_devices + + # 转换为JSON字符串 + combined_json = json.dumps(all_devices, indent=2, ensure_ascii=False) + + self.logger.debug("Combined devices:") + self.logger.debug(combined_json) + + return combined_json + + def open(self,device_path): + self.logger.debug(f"check device at {device_path}") + processPid = checkPid(device_path,self.logger) + if processPid <0: + self.logger.error(f"Device {device_path} is not occupied by any process") + return False + else: + self.logger.debug(f"Device {device_path} is occupied by process with PID: {processPid}") + self.session = frida.attach(processPid); + if not self.session: + self.logger.error(f"Failed to attach to process with PID: {processPid}") + return False + else: + self.logger.info(f"Successfully attached to process with PID: {processPid}") + # 根据设备加载不同的过滤脚本 + if "usb" in device_path: + try: + self.logger.info(f"Loading USB filter script for device: {device_path}") + with open("scripts/libusbHook.js", "r") as f: + self.script = self.session.create_script(f.read()); + self.script.load() + self.logger.info(f"Script loaded successfully") + return True + except Exception as e: + self.logger.error(f"Failed to load USB filter script: {e}") + return False + elif "pci" in device_path: + try: + self.logger.info(f"Loading PCIe filter script for device: {device_path}") + with open("scripts/pcieHook.js", "r") as f: + self.script = self.session.create_script(f.read()); + self.script.load() + self.logger.info(f"Script loaded successfully") + return True + except Exception as e: + self.logger.error(f"Failed to load PCIe filter script: {e}") + return False + + + def selfCheck(self): + self.logger.info("start SelfCheck") + scripts_folder = "scripts" + if not os.path.exists(scripts_folder): + self.logger.info(f"scripts folder does not exist, please check.") + return False + else: + files = os.listdir(scripts_folder) + if len(files) < 0: + self.logger.info("Scripts folder is empty") + return False + return True + diff --git a/exFilter/utils.py b/exFilter/utils.py new file mode 100644 index 0000000..9e508e8 --- /dev/null +++ b/exFilter/utils.py @@ -0,0 +1,131 @@ +import usb.core +import usb.util +import json +import os +import subprocess +import re + +def enum_usb_devices(): + devices = usb.core.find(find_all=True) + result = [] + for dev in devices: + vendor_id = hex(dev.idVendor) + product_id = hex(dev.idProduct) + + # 获取设备名称,部分设备可能没有字符串描述符 + try: + device_name = usb.util.get_string(dev, dev.iProduct) or "Unknown" # 必须以sudo的方式运行脚本 + except Exception: + device_name = "Unknown" + + # 获取设备路径 + try: + bus = dev.bus + address = dev.address + device_path = f"/dev/bus/usb/{bus:03d}/{address:03d}" + except Exception: + device_path = "Unknown" + + result.append({ + "deviceName": device_name, + "vendorID": vendor_id, + "productID": product_id, + "devicePath": device_path, + "subSystem": "USB" + }) + return json.dumps(result, indent=2, ensure_ascii=False) + + +def enum_pcie_devices(): + result = [] + pci_devices_path = "/sys/bus/pci/devices" + + if not os.path.exists(pci_devices_path): + return json.dumps(result, indent=2, ensure_ascii=False) + + for device_dir in os.listdir(pci_devices_path): + device_path = os.path.join(pci_devices_path, device_dir) + + try: + # 读取vendor ID和product ID + with open(os.path.join(device_path, "vendor"), "r") as f: + vendor_id = f.read().strip() + + with open(os.path.join(device_path, "device"), "r") as f: + product_id = f.read().strip() + + # 获取设备名称 + device_name = "Unknown" + try: + # 尝试从modalias获取设备信息 + modalias_path = os.path.join(device_path, "modalias") + if os.path.exists(modalias_path): + with open(modalias_path, "r") as f: + modalias = f.read().strip() + + # 尝试从driver获取驱动名称作为设备名称 + driver_path = os.path.join(device_path, "driver") + if os.path.islink(driver_path): + driver_name = os.path.basename(os.readlink(driver_path)) + device_name = f"PCIe Device ({driver_name})" + + # 尝试从subsystem_device和subsystem_vendor获取更详细信息 + subsystem_device_path = os.path.join(device_path, "subsystem_device") + if os.path.exists(subsystem_device_path): + with open(subsystem_device_path, "r") as f: + subsystem_device = f.read().strip() + if device_name == "Unknown": + device_name = f"PCIe Device ({subsystem_device})" + + except Exception: + device_name = "Unknown" + + # 构建设备路径 (PCI设备路径格式) + pci_device_path = f"/sys/bus/pci/devices/{device_dir}" + + result.append({ + "deviceName": device_name, + "vendorID": vendor_id, + "productID": product_id, + "devicePath": pci_device_path, + "subSystem": "PCIe" + }) + + except Exception as e: + # 如果读取某个设备信息失败,跳过该设备 + continue + + return json.dumps(result, indent=2, ensure_ascii=False) + + + +def checkPid(devPath, logger): + processPid = -1 + # 通过device_path查看当前占用该设备的进程pid号 + try: + # 首先检查设备路径是否存在 + if not os.path.exists(devPath): + return processPid + + # 添加-w参数禁用警告,使用2>/dev/null重定向标准错误 + result = subprocess.run(['lsof', '-w', devPath], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True) + + if result.returncode == 0: + output = result.stdout.strip() + if output: + # 使用正则表达式提取PID + # lsof输出格式: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME + pid_match = re.search(r'^\S+\s+(\d+)\s+', output, re.MULTILINE) + if pid_match: + processPid = int(pid_match.group(1)) + logger.debug(f"Device {devPath} is currently used by process with PID: {processPid}") + + except FileNotFoundError: + logger.error("lsof command not found. Please install lsof: sudo apt-get install lsof") + except Exception as e: + logger.error(f"An error occurred while checking the device: {str(e)}") + + return processPid \ No newline at end of file diff --git a/main.py b/main.py index b60f3ce..1f032b2 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,15 @@ from exFilter import Filter - +import sys +from time import sleep def main(): filter = Filter() - filter.start() + if filter.start(): + # 列举USB和PCIe设备 + devices = filter.EnumDriversAndDevices() + filter.open("/dev/bus/usb/002/004") + sleep(10) + filter.stop() return @@ -11,5 +17,3 @@ if __name__ == "__main__": main() - - diff --git a/requirement.txt b/requirement.txt new file mode 100644 index 0000000..61be80a --- /dev/null +++ b/requirement.txt @@ -0,0 +1,2 @@ +frida +pyusb \ No newline at end of file diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..3983dd1 --- /dev/null +++ b/run.sh @@ -0,0 +1 @@ +echo 1|sudo -S /home/leo/devCode/outer/USBFilter_30/usbFilter/.venv/bin/python /home/leo/devCode/outer/USBFilter_30/usbFilter/main.py \ No newline at end of file