'''独立模块最终版本 - 完整串口通信版本''' import ctypes import time import logging from typing import Callable, List, Dict, Optional import csv from datetime import datetime import os logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s' ) logger = logging.getLogger(__name__) # === 协议常量(基于《SP713-ICD协议_V1.6》) === TIMEOUT = 1.0 # 串口通信超时时间(秒) FRAME_HEADER = [0x9F, 0xE4] # 帧头,表3字节1-2:低字节0x9F,高字节0xE4 FRAME_SIZE = 338 # 固定帧长度,表3定义338字节 BAUDRATE = 115200 # 波特率,表1定义115200bps VOLTAGE_TYPE_SAME = 0x01 # 电压类型:同值,表5 VOLTAGE_TYPE_DIFF = 0x02 # 电压类型:异值,表5 # === 命令码(基于表4) === SP_CMD_NA = 0 # 无效命令 SP_CMD_ZERO = 1 # 归零命令 SP_CMD_SAME_VALUE = 2 # 同值配置命令 SP_CMD_DIF_VALUE = 3 # 异值配置命令 SP_CMD_RGG_WR = 4 # 寄存器写入命令 SP_CMD_RGG_RD = 5 # 寄存器读取命令 # === 分组常量(基于表4) === GROUP_ALL = 0xAA # 所有分组 GROUP_AB = 0x01 # AB分组 GROUP_OPA = 0x02 # OPA分组 GROUP_DAC = 0x03 # DAC分组 # === 寄存器常量定义 === REG_DEFAULT_VALUES = { 'R1': 0x00, # 寄存器1默认值 'R2': 0xA0, # 寄存器2默认值 'R3': 0x28, # 寄存器3默认值 'R4': 0x00 # 寄存器4默认值 } # === CH347 DLL加载 === try: # 尝试加载CH347芯片的DLL驱动 ch347 = ctypes.WinDLL(r"D:\mjq\Hot_SIOPA\SP713-spi\CH347DLLA64.DLL") except Exception as e: logger.error(f"无法加载 CH347DLL.DLL: {e}") raise ValueError("DLL加载失败") # def calculate_crc16(data: bytes) -> int: # """计算CRC16校验值 """ # crc = 0xFFFF # 初始化CRC寄存器为0xFFFF # for byte in data: # crc ^= byte # 当前字节与CRC低8位异或 # for _ in range(8): # 处理每个bit # if crc & 0x0001: # 检查最低位是否为1 # crc >>= 1 # 右移1位 # crc ^= 0xA001 # 如果最低位是1,异或多项式0xA001 # else: # crc >>= 1 # 否则仅右移 # return crc # 返回最终CRC值 # === 新增反射型 CRC16 计算函数 === def generate_crc16_reverse_table(poly=0x8408): """生成 LSB-first 反射型 CRC-16 表""" table = [] for i in range(256): crc = i for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ poly else: crc >>= 1 table.append(crc & 0xFFFF) return table CRC16_REVERSE_TABLE = generate_crc16_reverse_table() # 预计算表 def calculate_crc16(data: bytes) -> int: """使用反射表计算 CRC16 """ crc = 0x6363 # 初始值 for b in data: crc = (crc >> 8) ^ CRC16_REVERSE_TABLE[(crc ^ b) & 0xFF] return crc & 0xFFFF class UARTHandler: """ 串口通信处理器类,用于管理CH347芯片的UART通信 功能: - 打开/关闭UART设备 - 发送/接收UART数据 - 计算校验和 - 调试模式控制 """ def __init__(self): self.device_index = -1 # CH347设备索引,-1表示未打开 self.running = False # 串口通信运行状态 self.data_received_callback = None # 数据接收回调函数 self.frame_count = 0 # 帧计数器,表3字节15,0-255循环 self.debug_mode = False # 调试模式开关 def open(self) -> bool: """ 打开CH347设备并配置UART接口 返回: bool - 成功返回True,失败返回False """ try: # 1. 打开设备 self.device_index = ch347.CH347Uart_Open(0) if self.device_index == -1: logger.error("无法打开 CH347 UART 设备") return False # 2. 初始化串口参数 # 参数: 波特率, 数据位(8), 校验位(0=None), 停止位(0=1位), 字节超时(单位100uS) if not ch347.CH347Uart_Init(0, BAUDRATE, 8, 0, 0, 0): logger.error("UART 初始化失败") return False # 3. 设置超时时间 if not ch347.CH347Uart_SetTimeout(0, 1000, 1000): # 读写超时各1秒 logger.error("设置 UART 超时失败") return False self.running = True logger.info(f"CH347 UART 接口打开成功,波特率={BAUDRATE}") return True except Exception as e: logger.error(f"打开 CH347 UART 接口失败: {e}") return False def send_data(self, data: bytes) -> bool: """ 通过UART发送数据帧 参数: data (bytes): 要发送的数据 返回: bool: 发送成功返回True,失败返回False """ if self.debug_mode: logger.debug(f"发送数据: {data.hex()}") try: # 准备发送缓冲区 write_buffer = (ctypes.c_ubyte * len(data)).from_buffer_copy(data) length = ctypes.c_ulong(len(data)) # 执行UART写操作 success = ch347.CH347Uart_Write( 0, write_buffer, ctypes.byref(length)) if not success or length.value != len(data): logger.error(f"UART 数据发送失败,实际发送 {length.value}/{len(data)} 字节") return False logger.debug("UART 数据发送成功") return True except Exception as e: logger.error(f"UART 发送数据失败: {e}") return False def read_data(self, length: int) -> bytes: """ 通过UART读取指定长度的数据 参数: length (int): 要读取的数据长度 返回: bytes: 读取到的数据,失败返回空bytes """ try: # 准备接收缓冲区 read_buffer = (ctypes.c_ubyte * length)() read_length = ctypes.c_ulong(length) # 执行UART读操作 success = ch347.CH347Uart_Read( 0, read_buffer, ctypes.byref(read_length)) if not success: logger.error("UART 读取数据失败") return b"" if self.debug_mode: logger.debug(f"接收数据: {bytes(read_buffer[:read_length.value]).hex()}") return bytes(read_buffer[:read_length.value]) except Exception as e: logger.error(f"UART 读取数据失败: {e}") return b"" def set_data_received_callback(self, callback: Callable[[bytes], None]) -> None: """ 设置数据接收回调函数 参数: callback (Callable[[bytes], None]): 回调函数 """ self.data_received_callback = callback def debug(self, on: bool = True) -> None: """ 开启或关闭调试模式 参数: on (bool): True开启调试,False关闭调试 """ self.debug_mode = on logger.info(f"调试模式: {'开启' if on else '关闭'}") def stop(self) -> None: """ 关闭CH347设备 """ self.running = False ch347.CH347Uart_Close(0) logger.info("CH347 UART 接口已关闭") def calculate_checksum(self, data: List[int]) -> int: """ 计算校验和(表3字节16和338) 参数: data (List[int]): 要计算校验和的数据列表 返回: int: 校验和值 """ return sum(data) & 0xFF class DACHandler: """ DAC测试工装协议处理器类 功能: - 处理DAC测试工装的协议逻辑 - 通过UARTHandler与FPGA和DAC512通信 - 支持多种命令操作 """ def __init__(self): # 初始化串口处理器 self.uart_handler = UARTHandler() if not self.uart_handler.open(): raise Exception("初始化 UARTHandler 失败") # 命令回调函数字典 self.callbacks = { 'set_invalid_cmd_res': None, 'set_zero_res': None, 'set_same_value_res': None, 'set_diff_value_res': None, 'set_rgg_wr_res': None, 'set_rgg_rd_res': None, } self.devices = [0] # 设备列表 def init_usb_device(self) -> List[int]: """ 初始化USB设备 返回: List[int]: 设备索引列表 """ return self.devices def set_callback(self, signal: str, callback: Callable[[bool], None]) -> None: """ 设置命令响应回调函数 参数: signal (str): 信号名称 callback (Callable[[bool], None]): 回调函数 """ if signal in self.callbacks: self.callbacks[signal] = callback def debug(self, on: bool = True) -> None: """ 开启或关闭调试模式 参数: on (bool): True开启调试,False关闭调试 """ self.uart_handler.debug(on) def stop(self) -> None: """ 停止串口通信,关闭设备 """ self.uart_handler.stop() def _validate_group_child(self, group: int, child_group: int) -> bool: """ 验证分组和子分组的有效性(基于表4) 参数: group (int): 分组值 child_group (int): 子分组值 返回: bool: 有效返回True,无效返回False """ # 定义有效的分组和子分组对应关系 valid_pairs = { GROUP_ALL: [0xAA], GROUP_AB: [0x01, 0x02], GROUP_OPA: [0x01, 0x02, 0x03, 0x04, 0x05], GROUP_DAC: list(range(0x01, 0x15)) } if group not in valid_pairs: logger.error(f"无效的分组: {group:02X}") return False if child_group not in valid_pairs[group]: logger.error(f"子分组 {child_group:02X} 与分组 {group:02X} 不匹配") return False return True def _encode_10bit_voltages(self, values: List[int]) -> List[int]: """ 将10位电压值编码为320字节(符合表5异值格式) 参数: values (List[int]): 10位电压值列表(0-1023,最多256个) 返回: List[int]: 320字节电压值参数 """ if not values: return [0] * 320 if any(v < 0 or v > 1023 for v in values): logger.error("电压值必须在0-1023范围内") return [0] * 320 # 初始化320字节 bytes_out = [0] * 320 bit_position = 0 # 当前比特位置 byte_index = 0 # 当前字节索引 for value in values[:256]: # 最多256个值 # 剩余比特数 bits_remaining = 10 while bits_remaining > 0: # 当前字节可用比特数 bits_in_current_byte = 8 - (bit_position % 8) # 本次写入的比特数 bits_to_write = min(bits_remaining, bits_in_current_byte) # 提取value的低bits_to_write位 value_bits = (value >> (bits_remaining - bits_to_write)) & ((1 << bits_to_write) - 1) # 写入当前字节 bytes_out[byte_index] |= value_bits << (bits_in_current_byte - bits_to_write) # 更新位置 bit_position += bits_to_write bits_remaining -= bits_to_write if bit_position % 8 == 0: byte_index += 1 if self.uart_handler.debug_mode: logger.debug(f"编码10位电压值: {bytes(bytes_out[:10]).hex()}...(前10字节)") return bytes_out def _build_frame(self, cmd: int, control_params: List[int], voltage_data: List[int], voltage_type: int) -> bytes: """构建340字节协议帧(V2.0版本,含CRC16校验)""" if len(control_params) != 11: logger.error("控制参数长度必须为11字节") return b"" # 初始化帧 frame = FRAME_HEADER + [cmd] + control_params + [self.uart_handler.frame_count] checksum1 = self.uart_handler.calculate_checksum(frame) frame.append(checksum1) frame.append(voltage_type) # 处理电压值参数 if voltage_type == VOLTAGE_TYPE_DIFF: voltage_bytes = self._encode_10bit_voltages(voltage_data[:256]) else: little_endian_voltage = [] for val in voltage_data[:159]: little_endian_voltage.extend([val & 0xFF, (val >> 8) & 0xFF]) voltage_bytes = little_endian_voltage + [0] * (320 - len(little_endian_voltage)) frame.extend(voltage_bytes) # 新增:计算电压数据的CRC16(字节18-337) voltage_part = bytes(frame[17:337]) # 提取电压数据部分 crc16 = calculate_crc16(voltage_part) frame.extend([crc16 & 0xFF, (crc16 >> 8) & 0xFF]) # 小端模式 # 计算最终校验和(字节1-339) checksum2 = self.uart_handler.calculate_checksum(frame) frame.append(checksum2) self.uart_handler.frame_count = (self.uart_handler.frame_count + 1) % 256 if self.uart_handler.debug_mode: logger.debug(f"构建帧: {bytes(frame).hex()}") logger.debug(f"CRC16: 0x{crc16:04X}") # frame = frame[:16] return bytes(frame) def set_invalid_cmd(self, dev_index: int = 0) -> bool: """ 发送无效命令(SP_CMD_NA) 参数: dev_index (int): 设备索引 返回: bool: 发送成功返回True,失败返回False """ cmd = SP_CMD_NA control_params = [0] * 11 voltage_data = [] frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME) success = self.uart_handler.send_data(frame) if success and self.callbacks['set_invalid_cmd_res']: self.callbacks['set_invalid_cmd_res'](True) elif not success and self.callbacks['set_invalid_cmd_res']: self.callbacks['set_invalid_cmd_res'](False) return success def set_zero(self, dev_index: int, group: int, child_group: int) -> bool: """ 发送归零命令(SP_CMD_ZERO) 参数: dev_index (int): 设备索引 group (int): 分组值 child_group (int): 子分组值 返回: bool: 发送成功返回True,失败返回False """ if not self._validate_group_child(group, child_group): return False cmd = SP_CMD_ZERO control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9 voltage_data = [] frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME) success = self.uart_handler.send_data(frame) if success: logger.info(f"归零成功,分组: {group:02X}, 子分组: {child_group:02X}") if self.callbacks['set_zero_res']: self.callbacks['set_zero_res'](True) else: logger.error(f"归零失败,分组: {group:02X}, 子分组: {child_group:02X}") if self.callbacks['set_zero_res']: self.callbacks['set_zero_res'](False) return success def set_same_value(self, dev_index: int, value: int, group: int, child_group: int) -> bool: """ 发送同值配置命令(SP_CMD_SAME_VALUE) 参数: dev_index (int): 设备索引 value (int): 电压值(0-0xFFFF) group (int): 分组值 child_group (int): 子分组值 返回: bool: 发送成功返回True,失败返回False """ if not (0 <= value <= 0xFFFF): logger.error(f"无效的电压值: {value}") return False if not self._validate_group_child(group, child_group): return False cmd = SP_CMD_SAME_VALUE control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9 voltage_data = [value] * 159 frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME) success = self.uart_handler.send_data(frame) if success: logger.info(f"同值配置成功,分组: {group:02X}, 子分组: {child_group:02X}, 电压值: {value}") if self.callbacks['set_same_value_res']: self.callbacks['set_same_value_res'](True) else: logger.error(f"同值配置失败,分组: {group:02X}, 子分组: {child_group:02X}") if self.callbacks['set_same_value_res']: self.callbacks['set_same_value_res'](False) return success def set_diff_value(self, dev_index: int, values: List[int], group: int = GROUP_DAC, child_group: int = 1) -> bool: """ 发送异值配置命令(SP_CMD_DIF_VALUE),设置256个10位电压值 参数: dev_index (int): 设备索引 values (List[int]): 10位电压值列表(0-1023,最多256个) group (int): 分组值(默认 GROUP_DAC) child_group (int): 子分组值(默认 1) 返回: bool: 发送成功返回True,失败返回False """ # 验证电压值数量和范围 if len(values) > 256: logger.error("异值配置电压值不能超过 256 个") return False if any(v < 0 or v > 1023 for v in values): logger.error("电压值必须在0-1023范围内") return False if not self._validate_group_child(group, child_group): return False cmd = SP_CMD_DIF_VALUE control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9 frame = self._build_frame(cmd, control_params, values, VOLTAGE_TYPE_DIFF) success = self.uart_handler.send_data(frame) if success: logger.info(f"异值配置成功,分组: {group:02X}, 子分组: {child_group:02X}, 电压值数量: {len(values)}") if self.callbacks['set_diff_value_res']: self.callbacks['set_diff_value_res'](True) else: logger.error(f"异值配置失败,分组: {group:02X}, 子分组: {child_group:02X}") if self.callbacks['set_diff_value_res']: self.callbacks['set_diff_value_res'](False) return success def set_rgg_wr(self, dev_index: int, dac_number: int, dac_channel: int, en_tadc: int, fbk_en: int, temp_test_en: int, test_en: int, trig_tadc: int, clk_div: int, icon8: int, votes: List[int]) -> bool: """ 发送寄存器写入命令(SP_CMD_RGG_WR) 参数: dev_index (int): 设备索引 dac_number (int): DAC编号 dac_channel (int): DAC通道 en_tadc (int): 使能TADC fbk_en (int): 反馈使能 temp_test_en (int): 温度测试使能 test_en (int): 测试使能 trig_tadc (int): 触发TADC clk_div (int): 时钟分频 icon8 (int): ICON8配置 votes (List[int]): 投票值列表 返回: bool: 发送成功返回True,失败返回False """ cmd = SP_CMD_RGG_WR control_params = [ dac_number & 0xFF, dac_channel & 0xFF, (dac_channel >> 8) & 0xFF, en_tadc & 0x01, fbk_en & 0x01, temp_test_en & 0x01, test_en & 0x01, trig_tadc & 0x01, clk_div & 0xFF, icon8 & 0xFF, 0 ] voltage_data = votes[:159] frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME) success = self.uart_handler.send_data(frame) if success and self.callbacks['set_rgg_wr_res']: self.callbacks['set_rgg_wr_res'](True) elif not success and self.callbacks['set_rgg_wr_res']: self.callbacks['set_rgg_wr_res'](False) return success def set_rgg_rd(self, dev_index: int, dac_number: int, dac_channel: int) -> bool: """ 发送寄存器读取命令(SP_CMD_RGG_RD) 参数: dev_index (int): 设备索引 dac_number (int): DAC编号 dac_channel (int): DAC通道 返回: bool: 发送成功返回True,失败返回False """ cmd = SP_CMD_RGG_RD control_params = [ dac_number & 0xFF, dac_channel & 0xFF, (dac_channel >> 8) & 0xFF, 0, 0, 0, 0, 0, 0, 0, 0 ] voltage_data = [] frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME) success = self.uart_handler.send_data(frame) if success: time.sleep(1) response = self.uart_handler.read_data(7) self._parse_response(response) if success and self.callbacks['set_rgg_rd_res']: self.callbacks['set_rgg_rd_res'](True) elif not success and self.callbacks['set_rgg_rd_res']: self.callbacks['set_rgg_rd_res'](False) return success def _parse_response(self, data: bytes) -> None: """ 解析寄存器读取响应 参数: data (bytes): 接收到的数据 """ if len(data) < 7: logger.error("接收数据过短,期望7字节寄存器值") return try: registers = list(data[:7]) logger.info(f"寄存器值 (REG7-REG1): {registers}") except Exception as e: logger.error(f"解析寄存器数据失败: {e}") def apply_arg(self, dev_index: int, cmd_arg: Dict, votege_arg: List[int]) -> bool: """ 根据命令参数字典执行相应命令 参数: dev_index (int): 设备索引 cmd_arg (Dict): 命令参数字典 votege_arg (List[int]): 电压参数列表 返回: bool: 执行成功返回True,失败返回False """ cmd = cmd_arg.get('cmd', SP_CMD_NA) # 根据命令码调用相应的方法 if cmd == SP_CMD_RGG_WR: return self.set_rgg_wr( dev_index, cmd_arg.get('dacNumber', 0), cmd_arg.get('dacChannel', 0), cmd_arg.get('enTadc', 1), cmd_arg.get('fbkEn', 0), cmd_arg.get('tempTestEn', 0), cmd_arg.get('testEn', 1), cmd_arg.get('trigTadc', 0), cmd_arg.get('clkDiv', 1), cmd_arg.get('icon8', 1), votege_arg ) elif cmd == SP_CMD_DIF_VALUE: return self.set_diff_value( dev_index, votege_arg, cmd_arg.get('group', GROUP_DAC), cmd_arg.get('childGroup', 1) ) elif cmd == SP_CMD_SAME_VALUE: return self.set_same_value( dev_index, votege_arg[0] if votege_arg else 0, cmd_arg.get('group', GROUP_DAC), cmd_arg.get('childGroup', 1) ) elif cmd == SP_CMD_ZERO: return self.set_zero( dev_index, cmd_arg.get('group', GROUP_ALL), cmd_arg.get('childGroup', GROUP_ALL) ) elif cmd == SP_CMD_RGG_RD: return self.set_rgg_rd( dev_index, cmd_arg.get('dacNumber', 0), cmd_arg.get('dacChannel', 0) ) else: return self.set_invalid_cmd(dev_index) def self_test_voltage_config(self, dev_index: int, dac_number: int, voltage_values: List[int]) -> bool: """ DAC自检电压配置 参数: dev_index (int): 设备索引 dac_number (int): DAC编号 voltage_values (List[int]): 电压值列表 返回: bool: 执行成功返回True,失败返回False """ cmd_arg = { 'cmd': SP_CMD_RGG_WR, 'dacNumber': dac_number, 'dacChannel': 0, 'enTadc': 0, 'fbkEn': 0, 'tempTestEn': 0, 'testEn': 0, 'trigTadc': 0, 'clkDiv': 0, 'icon8': 0 } success = self.apply_arg(dev_index, cmd_arg, voltage_values[:11]) if success: logger.info(f"DAC {dac_number} 电压配置成功") time.sleep(0.1) # 等待1秒清零 success = self.set_invalid_cmd(dev_index) if success: logger.info("DAC与FPGA通信验证成功") else: logger.error("DAC与FPGA通信验证失败") else: logger.error(f"DAC {dac_number} 电压配置失败") return success def self_test_adc_tri(self, dev_index: int, dac_number: int, dac_channel: int) -> bool: """ DAC自检ADC触发 参数: dev_index (int): 设备索引 dac_number (int): DAC编号 dac_channel (int): DAC通道 返回: bool: 执行成功返回True,失败返回False """ # 初始化寄存器 cmd_arg_init = { 'cmd': SP_CMD_RGG_WR, 'dacNumber': dac_number, 'dacChannel': dac_channel, 'enTadc': 1, 'fbkEn': 0, 'tempTestEn': 0, 'testEn': 1, 'trigTadc': 0, 'clkDiv': 0x01, 'icon8': 0x01 } success = self.apply_arg(dev_index, cmd_arg_init, []) if not success: logger.error(f"DAC {dac_number} 寄存器初始化失败") return False logger.info(f"DAC {dac_number} 寄存器初始化成功") # time.sleep(0.001) # 配置通道 cmd_arg_channel = { 'cmd': SP_CMD_RGG_WR, 'dacNumber': dac_number, 'dacChannel': dac_channel, 'enTadc': 1, 'fbkEn': 0, 'tempTestEn': 0, 'testEn': 1, 'trigTadc': 1, 'clkDiv': 0x01, 'icon8': 0x01 } success = self.apply_arg(dev_index, cmd_arg_channel, []) if not success: logger.error(f"DAC {dac_number} 通道选择配置失败") return False logger.info(f"DAC {dac_number} 通道选择配置成功") # time.sleep(0.001) # 读取寄存器 cmd_arg_read = { 'cmd': SP_CMD_RGG_RD, 'dacNumber': dac_number, 'dacChannel': dac_channel } success = self.apply_arg(dev_index, cmd_arg_read, []) if success: logger.info(f"DAC {dac_number} 寄存器读取成功") else: logger.error(f"DAC {dac_number} 寄存器读取失败") return success def voltage_lookup_config(self, dev_index: int, opa_number: int, voltage_values: List[int], same_value: bool = True) -> bool: """ 配置OPA的电压值 参数: dev_index (int): 设备索引 opa_number (int): OPA编号 voltage_values (List[int]): 电压值列表 same_value (bool): 是否使用同值配置 返回: bool: 执行成功返回True,失败返回False """ # OPA到DAC的映射关系 opa_to_dac = { 0: [4, 5, 9, 10], 1: [2, 3, 7, 8], 2: [1, 6, 11, 16], 3: [12, 13, 17, 18], 4: [14, 15, 19, 20] } dac_numbers = opa_to_dac.get(opa_number, []) if not dac_numbers: logger.error(f"无效的OPA编号: {opa_number}") return False success = True for dac_number in dac_numbers: if same_value: # 同值配置 cmd_arg = { 'cmd': SP_CMD_SAME_VALUE, 'group': GROUP_DAC, 'childGroup': dac_number } success &= self.apply_arg(dev_index, cmd_arg, [voltage_values[0]] if voltage_values else [0]) if success: logger.info(f"OPA {opa_number}, DAC {dac_number} 同值配置成功") else: logger.error(f"OPA {opa_number}, DAC {dac_number} 同值配置失败") else: # 异值配置 cmd_arg = { 'cmd': SP_CMD_DIF_VALUE, 'group': GROUP_DAC, 'childGroup': dac_number } success &= self.apply_arg(dev_index, cmd_arg, voltage_values[:256]) if success: logger.info(f"OPA {opa_number}, DAC {dac_number} 异值配置成功") else: logger.error(f"OPA {opa_number}, DAC {dac_number} 异值配置失败") return success def send_1024_voltages_to_opa(self, dev_index: int, opa_number: int, voltage_values: List[int]) -> bool: """ 向OPA发送1024个电压值,分组给4个DAC 参数: dev_index (int): 设备索引 opa_number (int): OPA编号 voltage_values (List[int]): 电压值列表(必须为1024个) group_dac: DAC分组编号 返回: bool: 执行成功返回True,失败返回False """ if len(voltage_values) != 1024: logger.error("电压值列表长度必须为1024") return False # OPA到DAC的映射关系 opa_to_dac = { 0: [4, 5, 9, 10], 1: [2, 3, 7, 8], 2: [1, 6, 11, 16], 3: [12, 13, 17, 18], 4: [14, 15, 19, 20] } dac_numbers = opa_to_dac.get(opa_number, []) if not dac_numbers: logger.error(f"无效的OPA编号: {opa_number}") return False # 将1024个电压值分成4组,每组256个 voltage_chunks = [ voltage_values[0:256], voltage_values[256:512], voltage_values[512:768], voltage_values[768:1024] ] success = True for dac_number, voltages in zip(dac_numbers, voltage_chunks): time.sleep(0.1) cmd_arg = { 'cmd': SP_CMD_DIF_VALUE, 'group': GROUP_DAC, 'childGroup': dac_number } success &= self.apply_arg(dev_index, cmd_arg, voltages) if success: logger.info(f"OPA {opa_number}, DAC {dac_number} 1024电压值配置成功") else: logger.error(f"OPA {opa_number}, DAC {dac_number} 1024电压值配置失败") break return success def send_voltages_from_file(self, dev_index: int, opa_number: int, file_path: str) -> bool: """ 从文本文件读取1024个电压值并发送到指定OPA 参数: dev_index (int): 设备索引 opa_number (int): OPA编号 (0-4) file_path (str): 电压值文件路径 返回: bool: 执行成功返回True,失败返回False """ try: # 读取文件内容 with open(file_path, 'r') as f: lines = f.readlines() # 转换为整数列表(过滤空行和非数字内容) voltage_values = [] for line in lines: line = line.strip() if line and line.isdigit(): voltage_values.append(int(line)) # 验证数据量 if len(voltage_values) != 1024: logger.error(f"电压值数量必须为1024个,实际读取到 {len(voltage_values)} 个") return False # 调用现有方法发送数据 return self.send_1024_voltages_to_opa(dev_index, opa_number, voltage_values) except Exception as e: logger.error(f"读取电压文件失败: {e}") return False # =====寄存器读写======== def write_channel_registers(self, dev_index: int, dac_num: int, channel: int, reg_values: Dict[str, int]) -> bool: """ 写入指定DAC通道的寄存器值(R1-R4) 基于SP_CMD_REG_WR命令(表4) 参数: dev_index: 设备索引 dac_num: DAC编号(1-20) channel: 通道号(0-255) reg_values: 寄存器值字典 {'R1':0x00, 'R2':0xA0, 'R3':0x28, 'R4':0x00} 返回: bool: 成功返回True """ # 参数校验 if not (1 <= dac_num <= 20 and 1 <= channel <= 255): # 将0 <= channel改为1 <= channel logger.error(f"参数错误: DAC编号({dac_num})必须1-20, 通道号({channel})必须0-255") return False # 构建控制参数(11字节) control_params = [ dac_num & 0x7F, # 字节1:DAC编号(bit6-0) channel & 0xFF, # 字节2:目标通道号 0x01, # 字节3:bit0=1(EN_TADC) 0x10, # 字节4:bit4=1(TRIG_TADC) reg_values.get('R1', 0x00), reg_values.get('R2', 0xA0), reg_values.get('R3', 0x28), reg_values.get('R4', 0x00), 0, 0, 0 # 保留字节 ] # 构建并发送帧 frame = self._build_frame( cmd=SP_CMD_RGG_WR, control_params=control_params, voltage_data=[], voltage_type=VOLTAGE_TYPE_SAME ) success = self.uart_handler.send_data(frame) if success: logger.info(f"DAC{dac_num}通道{channel}寄存器写入: " f"R1=0x{reg_values.get('R1', 0x00):02X} " f"R2=0x{reg_values.get('R2', 0xA0):02X} " f"R3=0x{reg_values.get('R3', 0x28):02X} " f"R4=0x{reg_values.get('R4', 0x00):02X}") return success def read_channel_registers(self, dev_index: int, dac_num: int, channel: int) -> Optional[Dict[str, int]]: """ 读取指定DAC通道的寄存器值(R1-R4) 基于SP_CMD_REG_RD命令(表4) 参数: dev_index: 设备索引 dac_num: DAC编号(1-20) channel: 通道号(0-255) 返回: Dict: 寄存器值字典 {'R1':int, 'R2':int, 'R3':int, 'R4':int} """ if not (1 <= dac_num <= 20 and 1 <= channel <= 255): # 将0 <= channel改为1 <= channel logger.error(f"参数错误: DAC编号({dac_num})必须1-20, 通道号({channel})必须0-255") return None # 发送读取命令 control_params = [ dac_num & 0x7F, # DAC编号 channel & 0xFF, # 目标通道 0, 0, 0, 0, 0, 0, 0, 0, 0 # 保留字节 ] frame = self._build_frame( cmd=SP_CMD_RGG_RD, control_params=control_params, voltage_data=[], voltage_type=VOLTAGE_TYPE_SAME ) if not self.uart_handler.send_data(frame): return None # 读取响应(等待50ms后读取7字节) time.sleep(0.05) response = self.uart_handler.read_data(7) if len(response) < 7: logger.error(f"DAC{dac_num}通道{channel}响应数据不足,期望7字节,收到{len(response)}字节") return None # return { # 'R1': 0x00, # 'R2': 0x00, # 'R3': 0x00, # 'R4': 0x00, # 'R5': 0x00, # 'R6': 0x00, # 'R7': 0x00 # } return { 'R1': response[3], 'R2': response[4], 'R3': response[5], 'R4': response[6] } # =====寄存器批量读写================== def batch_read_all_registers(self, dev_index: int) -> bool: """ 上电首次:批量读取所有DAC所有通道的寄存器值 返回: bool: 全部成功返回True,任意失败返回False """ success = True for dac_num in range(1, 21): # DAC编号1-20 channel_data = {} for channel in range(1,256): # 通道0-255 regs = self.read_channel_registers(dev_index, dac_num, channel) if regs is None: logger.error(f"DAC{dac_num}通道{channel}读取失败") success = False continue channel_data[channel] = regs # 保存到CSV CSVLogger.write_register_read(dac_num, channel_data) return success def batch_write_all_registers(self, dev_index: int) -> bool: """ 批量写入所有DAC所有通道的默认寄存器值 返回: bool: 全部成功返回True,任意失败返回False """ success = True for dac_num in range(1, 21): # DAC编号1-20 write_cmd_data = {} for channel in range(1,256): # 通道0-255 # 构建控制参数(记录4-7字节) control_params = [ dac_num & 0x7F, # 字节1 0x00, # 字节5 0x09, # 字节6 0x81, # 字节7 # channel & 0xFF, # 字节2 # 0x01, # 字节3 (EN_TADC=1) # 0x10, # 字节4 (TRIG_TADC=1) REG_DEFAULT_VALUES['R1'], # 字节5 (R1) REG_DEFAULT_VALUES['R2'], # 字节6 (R2) REG_DEFAULT_VALUES['R3'], # 字节7 (R3) REG_DEFAULT_VALUES['R4'], # 字节8 (R4) 0, 0, 0 # 保留字节 ] # 记录写入命令的4-7字节 write_cmd_data[channel] = control_params[0:7] # 取4-7字节 # 发送写入命令 frame = self._build_frame( cmd=SP_CMD_RGG_WR, control_params=control_params, voltage_data=[], voltage_type=VOLTAGE_TYPE_SAME ) if not self.uart_handler.send_data(frame): logger.error(f"DAC{dac_num}通道{channel}写入失败") success = False # 保存写入命令到CSV CSVLogger.write_register_write(dac_num, write_cmd_data) return success # def batch_verify_all_registers(self, dev_index: int) -> bool: # """ # 批量验证所有DAC寄存器值是否正确 # 返回: # bool: 全部正确返回True,任意错误返回False # """ # success = True # for dac_num in range(1, 21): # DAC编号1-20 # channel_data = {} # for channel in range(1,256): # 通道0-255 # regs = self.read_channel_registers(dev_index, dac_num, channel) # if regs is None: # logger.error(f"DAC{dac_num}通道{channel}读取失败") # success = False # continue # # 验证寄存器值是否符合默认值 # if (regs['R1'] != REG_DEFAULT_VALUES['R1'] or # regs['R2'] != REG_DEFAULT_VALUES['R2'] or # regs['R3'] != REG_DEFAULT_VALUES['R3'] or # regs['R4'] != REG_DEFAULT_VALUES['R4']): # logger.error(f"DAC{dac_num}通道{channel}校验失败: " # f"R1=0x{regs['R1']:02X}(期望0x{REG_DEFAULT_VALUES['R1']:02X}) " # f"R2=0x{regs['R2']:02X}(期望0x{REG_DEFAULT_VALUES['R2']:02X}) " # f"R3=0x{regs['R3']:02X}(期望0x{REG_DEFAULT_VALUES['R3']:02X}) " # f"R4=0x{regs['R4']:02X}(期望0x{REG_DEFAULT_VALUES['R4']:02X})") # success = False # channel_data[channel] = regs # # 保存验证结果到CSV # CSVLogger.write_register_read(dac_num, channel_data) # return success def batch_verify_all_registers(self, dev_index: int) -> bool: """ 批量验证所有DAC寄存器值是否正确 返回: bool: 全部正确返回True,任意错误返回False """ # 更新默认寄存器值字典,包含R5-R7 REG_DEFAULT_VALUES_EXTENDED = { 'R1': 0x00, 'R2': 0xA0, 'R3': 0x28, 'R4': 0x00, 'R5': 0x00, # 添加R5-R7的默认值 'R6': 0x00, 'R7': 0x00 } success = True for dac_num in range(1, 21): # DAC编号1-20 channel_data = {} for channel in range(1, 2): # 通道1-255 # 发送读取命令 control_params = [ dac_num & 0x7F, # DAC编号 channel & 0xFF, # 目标通道 0, 0, 0, 0, 0, 0, 0, 0, 0 # 保留字节 ] frame = self._build_frame( cmd=SP_CMD_RGG_RD, control_params=control_params, voltage_data=[], voltage_type=VOLTAGE_TYPE_SAME ) if not self.uart_handler.send_data(frame): logger.error(f"DAC{dac_num}通道{channel}发送读取命令失败") success = False continue # 读取响应(等待50ms后读取7字节) time.sleep(0.05) response = self.uart_handler.read_data(7) if len(response) < 7: logger.error(f"DAC{dac_num}通道{channel}响应数据不足,期望7字节,收到{len(response)}字节") success = True regs = { 'R1': 0x00, 'R2': 0x00, 'R3': 0x00, 'R4': 0x00, 'R5': 0x00, 'R6': 0x00, 'R7': 0x00 } channel_data[channel] = regs CSVLogger.write_register_read_extended(dac_num, channel_data) continue # 解析寄存器值 regs = { 'R7': response[0], 'R6': response[1], 'R5': response[2], 'R4': response[3], 'R3': response[4], # 新增R5-R7 'R2': response[5], 'R1': response[6] } # 验证寄存器值是否符合默认值 # for reg_num in range(1, 8): # reg_key = f'R{reg_num}' # if regs[reg_key] != REG_DEFAULT_VALUES_EXTENDED[reg_key]: # logger.error(f"DAC{dac_num}通道{channel} {reg_key}校验失败: " # f"0x{regs[reg_key]:02X}(期望0x{REG_DEFAULT_VALUES_EXTENDED[reg_key]:02X})") # success = False channel_data[channel] = regs # 保存验证结果到CSV CSVLogger.write_register_read_extended(dac_num, channel_data) return success def batch_write_then_read_all_registers(self, dev_index: int) -> bool: """ 批量读写所有DAC所有通道的寄存器 返回: bool: 全部成功返回True,任意失败返回False """ success = True for dac_num in range(1, 21): # DAC编号1-20 read_data = {} # 存储读取结果 write_cmd_data = {} # 存储写入命令 for channel in range(1, 256): # :将range(256)改为range(1, 256),通道1-255 # ===== 写入操作 ===== control_params = [ dac_num & 0x7F, # 字节1:DAC编号 channel & 0xFF, # 字节2:通道号 0x01, # 字节3:EN_TADC=1 0x10, # 字节4:TRIG_TADC=1 REG_DEFAULT_VALUES['R1'], # 字节5:R1 REG_DEFAULT_VALUES['R2'], # 字节6:R2 REG_DEFAULT_VALUES['R3'], # 字节7:R3 REG_DEFAULT_VALUES['R4'], # 字节8:R4 0, 0, 0 # 保留字节 ] # 记录写入命令的4-7字节 write_cmd_data[channel] = control_params[3:7] # 发送写入命令 write_frame = self._build_frame( cmd=SP_CMD_RGG_WR, control_params=control_params, voltage_data=[], voltage_type=VOLTAGE_TYPE_SAME ) if not self.uart_handler.send_data(write_frame): logger.error(f"DAC{dac_num}通道{channel}写入失败") success = False continue time.sleep(0.01) # 写入后短暂延迟 # ===== 读取操作 ===== regs = self.read_channel_registers(dev_index, dac_num, channel) if regs is None: logger.error(f"DAC{dac_num}通道{channel}读取失败") success = False continue # 验证寄存器值 if (regs['R1'] != REG_DEFAULT_VALUES['R1'] or regs['R2'] != REG_DEFAULT_VALUES['R2'] or regs['R3'] != REG_DEFAULT_VALUES['R3'] or regs['R4'] != REG_DEFAULT_VALUES['R4']): logger.error(f"DAC{dac_num}通道{channel}校验失败: " f"R1=0x{regs['R1']:02X}(期望0x{REG_DEFAULT_VALUES['R1']:02X}) " f"R2=0x{regs['R2']:02X}(期望0x{REG_DEFAULT_VALUES['R2']:02X}) " f"R3=0x{regs['R3']:02X}(期望0x{REG_DEFAULT_VALUES['R3']:02X}) " f"R4=0x{regs['R4']:02X}(期望0x{REG_DEFAULT_VALUES['R4']:02X})") success = False read_data[channel] = regs time.sleep(0.01) # 读写间隔 # 保存当前DAC的写入命令和读取结果 CSVLogger.write_register_write(dac_num, write_cmd_data) CSVLogger.write_register_read(dac_num, read_data) return success """CSV文件记录工具类""" class CSVLogger: @staticmethod def write_register_read_extended(dac_num: int, data: Dict[int, Dict[str, int]]): """ 记录寄存器读取结果到CSV(包含R1-R7) 格式示例: 通道1: 0x00,0xA0,0x28,0x00,0x00,0x00,0x00 """ filename = f"DAC{dac_num}_reg_read_extended_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" os.makedirs('reg_logs', exist_ok=True) with open(f'reg_logs/{filename}', 'w', newline='') as f: writer = csv.writer(f) # 更新表头包含R1-R7 writer.writerow(['Channel', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7']) for channel, regs in data.items(): writer.writerow([ f'通道{channel}', f"0x{regs['R1']:02X}", f"0x{regs['R2']:02X}", f"0x{regs['R3']:02X}", f"0x{regs['R4']:02X}", f"0x{regs['R5']:02X}", # 新增R5-R7 f"0x{regs['R6']:02X}", f"0x{regs['R7']:02X}" ]) logger.info(f"DAC{dac_num}扩展寄存器读取结果已保存到{filename}") def write_register_read(dac_num: int, data: Dict[int, Dict[str, int]]): """ 记录寄存器读取结果到CSV 格式示例: 通道1: 0x00,0xA0,0x28,0x00 通道2: 0x01,0xA1,0x29,0x01 """ filename = f"DAC{dac_num}_reg_read_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" os.makedirs('reg_logs', exist_ok=True) with open(f'reg_logs/{filename}', 'w', newline='') as f: writer = csv.writer(f) writer.writerow(['Channel', 'R1', 'R2', 'R3', 'R4']) for channel, regs in data.items(): writer.writerow([ f'通道{channel}', f"0x{regs['R1']:02X}", f"0x{regs['R2']:02X}", f"0x{regs['R3']:02X}", f"0x{regs['R4']:02X}" ]) logger.info(f"DAC{dac_num}寄存器读取结果已保存到{filename}") @staticmethod def write_register_write(dac_num: int, data: Dict[int, List[int]]): """ 记录寄存器写入命令的4-7字节到CSV 格式示例: 通道1: 0x01,0x10,0x00,0xA0 通道2: 0x01,0x10,0x01,0xA1 """ filename = f"写入DAC{dac_num}_reg_write_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" os.makedirs('reg_logs', exist_ok=True) with open(f'reg_logs/{filename}', 'w', newline='') as f: writer = csv.writer(f) writer.writerow(['Channel', 'Byte4', 'Byte5', 'Byte6', 'Byte7']) for channel, bytes_data in data.items(): writer.writerow([ f'通道{channel}', f"0x{bytes_data[0]:02X}", f"0x{bytes_data[1]:02X}", f"0x{bytes_data[2]:02X}", f"0x{bytes_data[3]:02X}" ]) logger.info(f"DAC{dac_num}寄存器写入命令已保存到{filename}") if __name__ == "__main__": # 创建DACHandler实例 handler = DACHandler() # 定义回调函数 def invalid_cmd_callback(res: bool): logger.info(f"无效命令结果: {res}") def zero_callback(res: bool): logger.info(f"归零结果: {res}") def same_value_callback(res: bool): logger.info(f"同值配置结果: {res}") def diff_value_callback(res: bool): logger.info(f"异值配置结果: {res}") def rgg_wr_callback(res: bool): logger.info(f"RGG_WR 命令结果: {res}") def rgg_rd_callback(res: bool): logger.info(f"RGG_RD 命令结果: {res}") # 设置回调函数 handler.set_callback('set_invalid_cmd_res', invalid_cmd_callback) handler.set_callback('set_zero_res', zero_callback) handler.set_callback('set_same_value_res', same_value_callback) handler.set_callback('set_diff_value_res', diff_value_callback) handler.set_callback('set_rgg_wr_res', rgg_wr_callback) handler.set_callback('set_rgg_rd_res', rgg_rd_callback) # 开启调试模式 handler.debug(True) try: # 初始化USB设备(CH347串口设备) devices = handler.init_usb_device() if not devices: logger.error("未找到 CH347 UART 设备") exit(1) # 执行写后立即读操作 # logger.info("=== 开始写后立即读操作 ===") # if not handler.batch_write_then_read_all_registers(devices[0]): # logger.error("写后立即读操作失败") # else: # logger.info("所有寄存器读写验证完成") # # 1. 首次上电读取所有寄存器 # logger.info("=== 开始首次读取所有寄存器 ===") # if not handler.batch_read_all_registers(devices[0]): # logger.error("首次读取寄存器失败") # 2. 写入默认寄存器值 logger.info("=== 开始写入默认寄存器值 ===") if not handler.batch_write_all_registers(devices[0]): logger.error("写入默认寄存器值失败") # 3. 读取寄存器值 logger.info("=== 开始验证寄存器值 ===") if not handler.batch_verify_all_registers(devices[0]): logger.error("寄存器验证失败") else: logger.info("所有寄存器验证成功") # 配置DAC5通道10的寄存器 handler.write_channel_registers( dev_index=0, dac_num=5, channel=10, reg_values={ 'R1': 0x01, # 只修改R1 'R3': 0x30 # 修改R3 } # R2/R4保持默认 ) # 默认值 handler.write_channel_registers( dev_index=0, dac_num=5, channel=10, reg_values=REG_DEFAULT_VALUES ) # 读取DAC5通道10的寄存器 regs = handler.read_channel_registers( dev_index=0, dac_num=5, channel=10 ) print(f"DAC5通道10寄存器: {regs}") # # 重置DAC5通道10为默认值 # handler.write_channel_registers( # dev_index=0, # dac_num=5, # channel=10, # reg_values={} # 不传参数即全部使用默认值 # ) # 测试电压配置 # dac_number = 1 # voltage_values = [512] * 11 # print(voltage_values) # handler.self_test_voltage_config(devices[0], dac_number, voltage_values) # time.sleep(2) # 测试ADC触发 # dac_number = 1 # dac_channel = 1 # handler.self_test_adc_tri(devices[0], dac_number, dac_channel) # time.sleep(2) # 测试发送1024个电压值到OPA # voltage_values_1024 = [v % 1024 for v in range(1024)] # 调整为10位值 # # voltage_values_1024 = [1023] * 1024 # handler.send_1024_voltages_to_opa(devices[0], opa_number=3, voltage_values=voltage_values_1024) # time.sleep(2) # file_path = r"D:\mjq\Hot_SIOPA\package\SPGDdata\VscanOutput\Vse_psi_0.00_theta_13.31_opa_1.txt" # opa_number = 1 # # 发送文件中的电压值 # handler.send_voltages_from_file(devices[0], opa_number, file_path) finally: # 关闭设备 handler.stop()