This commit is contained in:
lumos 2025-07-08 15:54:23 +08:00
parent 4e8ad537b4
commit a0452e58b4
9 changed files with 249 additions and 50 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
.idea
.idea/
.vscode/

10
.idea/usbFilter.iml generated
View File

@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.10 (usbFilter)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

25
.idea/workspace.xml generated
View File

@ -1,25 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="7ef9b496-3d8c-442c-aa37-26696ead54eb" name="Changes" comment="" />
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"last_opened_file_path": "/home/leo/devCode/outer/USBFilter_30/usbFilter"
}
}]]></component>
<component name="TaskManager">
<servers />
</component>
<component name="com.intellij.coverage.CoverageDataManagerImpl">
<SUITE FILE_PATH="coverage/usbFilter$main.coverage" NAME="main Coverage Results" MODIFIED="1751898648966" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" />
</component>
</project>

View File

@ -1 +1,2 @@
from .core import Filter
from .core import Filter
from .utils import enum_usb_devices, enum_pcie_devices

View File

@ -1,23 +1,117 @@
import frida
import logging
import sys
import os
import json
import subprocess
import re
from exFilter.utils import enum_usb_devices, enum_pcie_devices,checkPid
logging.basicConfig(level=logging.INFO)
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(filename)s: %(lineno)d [%(levelname)s] %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S' # 秒级时间格式
)
class Filter:
def __init__(self):
self.logger = logging.getLogger("exFilter")
self.logger.info("core Initialized")
self.logger.info("Initialized")
def start(self):
self.logger.info("core Started")
return
if not self.selfCheck():
self.logger.error("Self check failed, please check the scripts folder.")
return False
else:
self.logger.info("Self check passed")
return True
def stop(self):
self.logger.info("core Stopped")
return
try:
self.script.unload()
self.session.detach()
self.logger.info("core Stopped")
return True
except Exception as e:
self.logger.error(f"Failed to stop core: {e}")
return False
def EnumDriversAndDevices(self):
self.logger.info("Enumerating drivers and devices")
# 列举USB设备
usb_devices_json = enum_usb_devices()
usb_devices = json.loads(usb_devices_json)
# 列举PCIe设备
pcie_devices_json = enum_pcie_devices()
pcie_devices = json.loads(pcie_devices_json)
# 合并设备列表
all_devices = usb_devices + pcie_devices
# 转换为JSON字符串
combined_json = json.dumps(all_devices, indent=2, ensure_ascii=False)
self.logger.debug("Combined devices:")
self.logger.debug(combined_json)
return combined_json
def open(self,device_path):
self.logger.debug(f"check device at {device_path}")
processPid = checkPid(device_path,self.logger)
if processPid <0:
self.logger.error(f"Device {device_path} is not occupied by any process")
return False
else:
self.logger.debug(f"Device {device_path} is occupied by process with PID: {processPid}")
self.session = frida.attach(processPid);
if not self.session:
self.logger.error(f"Failed to attach to process with PID: {processPid}")
return False
else:
self.logger.info(f"Successfully attached to process with PID: {processPid}")
# 根据设备加载不同的过滤脚本
if "usb" in device_path:
try:
self.logger.info(f"Loading USB filter script for device: {device_path}")
with open("scripts/libusbHook.js", "r") as f:
self.script = self.session.create_script(f.read());
self.script.load()
self.logger.info(f"Script loaded successfully")
return True
except Exception as e:
self.logger.error(f"Failed to load USB filter script: {e}")
return False
elif "pci" in device_path:
try:
self.logger.info(f"Loading PCIe filter script for device: {device_path}")
with open("scripts/pcieHook.js", "r") as f:
self.script = self.session.create_script(f.read());
self.script.load()
self.logger.info(f"Script loaded successfully")
return True
except Exception as e:
self.logger.error(f"Failed to load PCIe filter script: {e}")
return False
def selfCheck(self):
self.logger.info("start SelfCheck")
scripts_folder = "scripts"
if not os.path.exists(scripts_folder):
self.logger.info(f"scripts folder does not exist, please check.")
return False
else:
files = os.listdir(scripts_folder)
if len(files) < 0:
self.logger.info("Scripts folder is empty")
return False
return True

131
exFilter/utils.py Normal file
View File

@ -0,0 +1,131 @@
import usb.core
import usb.util
import json
import os
import subprocess
import re
def enum_usb_devices():
devices = usb.core.find(find_all=True)
result = []
for dev in devices:
vendor_id = hex(dev.idVendor)
product_id = hex(dev.idProduct)
# 获取设备名称,部分设备可能没有字符串描述符
try:
device_name = usb.util.get_string(dev, dev.iProduct) or "Unknown" # 必须以sudo的方式运行脚本
except Exception:
device_name = "Unknown"
# 获取设备路径
try:
bus = dev.bus
address = dev.address
device_path = f"/dev/bus/usb/{bus:03d}/{address:03d}"
except Exception:
device_path = "Unknown"
result.append({
"deviceName": device_name,
"vendorID": vendor_id,
"productID": product_id,
"devicePath": device_path,
"subSystem": "USB"
})
return json.dumps(result, indent=2, ensure_ascii=False)
def enum_pcie_devices():
result = []
pci_devices_path = "/sys/bus/pci/devices"
if not os.path.exists(pci_devices_path):
return json.dumps(result, indent=2, ensure_ascii=False)
for device_dir in os.listdir(pci_devices_path):
device_path = os.path.join(pci_devices_path, device_dir)
try:
# 读取vendor ID和product ID
with open(os.path.join(device_path, "vendor"), "r") as f:
vendor_id = f.read().strip()
with open(os.path.join(device_path, "device"), "r") as f:
product_id = f.read().strip()
# 获取设备名称
device_name = "Unknown"
try:
# 尝试从modalias获取设备信息
modalias_path = os.path.join(device_path, "modalias")
if os.path.exists(modalias_path):
with open(modalias_path, "r") as f:
modalias = f.read().strip()
# 尝试从driver获取驱动名称作为设备名称
driver_path = os.path.join(device_path, "driver")
if os.path.islink(driver_path):
driver_name = os.path.basename(os.readlink(driver_path))
device_name = f"PCIe Device ({driver_name})"
# 尝试从subsystem_device和subsystem_vendor获取更详细信息
subsystem_device_path = os.path.join(device_path, "subsystem_device")
if os.path.exists(subsystem_device_path):
with open(subsystem_device_path, "r") as f:
subsystem_device = f.read().strip()
if device_name == "Unknown":
device_name = f"PCIe Device ({subsystem_device})"
except Exception:
device_name = "Unknown"
# 构建设备路径 (PCI设备路径格式)
pci_device_path = f"/sys/bus/pci/devices/{device_dir}"
result.append({
"deviceName": device_name,
"vendorID": vendor_id,
"productID": product_id,
"devicePath": pci_device_path,
"subSystem": "PCIe"
})
except Exception as e:
# 如果读取某个设备信息失败,跳过该设备
continue
return json.dumps(result, indent=2, ensure_ascii=False)
def checkPid(devPath, logger):
processPid = -1
# 通过device_path查看当前占用该设备的进程pid号
try:
# 首先检查设备路径是否存在
if not os.path.exists(devPath):
return processPid
# 添加-w参数禁用警告使用2>/dev/null重定向标准错误
result = subprocess.run(['lsof', '-w', devPath],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True)
if result.returncode == 0:
output = result.stdout.strip()
if output:
# 使用正则表达式提取PID
# lsof输出格式: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
pid_match = re.search(r'^\S+\s+(\d+)\s+', output, re.MULTILINE)
if pid_match:
processPid = int(pid_match.group(1))
logger.debug(f"Device {devPath} is currently used by process with PID: {processPid}")
except FileNotFoundError:
logger.error("lsof command not found. Please install lsof: sudo apt-get install lsof")
except Exception as e:
logger.error(f"An error occurred while checking the device: {str(e)}")
return processPid

12
main.py
View File

@ -1,9 +1,15 @@
from exFilter import Filter
import sys
from time import sleep
def main():
filter = Filter()
filter.start()
if filter.start():
# 列举USB和PCIe设备
devices = filter.EnumDriversAndDevices()
filter.open("/dev/bus/usb/002/004")
sleep(10)
filter.stop()
return
@ -11,5 +17,3 @@ if __name__ == "__main__":
main()

2
requirement.txt Normal file
View File

@ -0,0 +1,2 @@
frida
pyusb

1
run.sh Normal file
View File

@ -0,0 +1 @@
echo 1|sudo -S /home/leo/devCode/outer/USBFilter_30/usbFilter/.venv/bin/python /home/leo/devCode/outer/USBFilter_30/usbFilter/main.py