1553 lines
57 KiB
Python
Executable File
1553 lines
57 KiB
Python
Executable File
'''独立模块最终版本 - 完整串口通信版本'''
|
||
|
||
import ctypes
|
||
import time
|
||
import logging
|
||
from typing import Callable, List, Dict, Optional
|
||
import csv
|
||
from datetime import datetime
|
||
import os
|
||
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s [%(levelname)s] %(message)s'
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# === 协议常量(基于《SP713-ICD协议_V1.6》) ===
|
||
TIMEOUT = 1.0 # 串口通信超时时间(秒)
|
||
FRAME_HEADER = [0x9F, 0xE4] # 帧头,表3字节1-2:低字节0x9F,高字节0xE4
|
||
FRAME_SIZE = 338 # 固定帧长度,表3定义338字节
|
||
BAUDRATE = 115200 # 波特率,表1定义115200bps
|
||
VOLTAGE_TYPE_SAME = 0x01 # 电压类型:同值,表5
|
||
VOLTAGE_TYPE_DIFF = 0x02 # 电压类型:异值,表5
|
||
|
||
# === 命令码(基于表4) ===
|
||
SP_CMD_NA = 0 # 无效命令
|
||
SP_CMD_ZERO = 1 # 归零命令
|
||
SP_CMD_SAME_VALUE = 2 # 同值配置命令
|
||
SP_CMD_DIF_VALUE = 3 # 异值配置命令
|
||
SP_CMD_RGG_WR = 4 # 寄存器写入命令
|
||
SP_CMD_RGG_RD = 5 # 寄存器读取命令
|
||
|
||
# === 分组常量(基于表4) ===
|
||
GROUP_ALL = 0xAA # 所有分组
|
||
GROUP_AB = 0x01 # AB分组
|
||
GROUP_OPA = 0x02 # OPA分组
|
||
GROUP_DAC = 0x03 # DAC分组
|
||
|
||
# === 寄存器常量定义 ===
|
||
REG_DEFAULT_VALUES = {
|
||
'R1': 0x00, # 寄存器1默认值
|
||
'R2': 0xA0, # 寄存器2默认值
|
||
'R3': 0x28, # 寄存器3默认值
|
||
'R4': 0x00 # 寄存器4默认值
|
||
}
|
||
|
||
# === CH347 DLL加载 ===
|
||
try:
|
||
# 尝试加载CH347芯片的DLL驱动
|
||
ch347 = ctypes.WinDLL(r"D:\mjq\Hot_SIOPA\SP713-spi\CH347DLLA64.DLL")
|
||
except Exception as e:
|
||
logger.error(f"无法加载 CH347DLL.DLL: {e}")
|
||
raise ValueError("DLL加载失败")
|
||
|
||
# def calculate_crc16(data: bytes) -> int:
|
||
# """计算CRC16校验值 """
|
||
# crc = 0xFFFF # 初始化CRC寄存器为0xFFFF
|
||
# for byte in data:
|
||
# crc ^= byte # 当前字节与CRC低8位异或
|
||
# for _ in range(8): # 处理每个bit
|
||
# if crc & 0x0001: # 检查最低位是否为1
|
||
# crc >>= 1 # 右移1位
|
||
# crc ^= 0xA001 # 如果最低位是1,异或多项式0xA001
|
||
# else:
|
||
# crc >>= 1 # 否则仅右移
|
||
# return crc # 返回最终CRC值
|
||
|
||
# === 新增反射型 CRC16 计算函数 ===
|
||
def generate_crc16_reverse_table(poly=0x8408):
|
||
"""生成 LSB-first 反射型 CRC-16 表"""
|
||
table = []
|
||
for i in range(256):
|
||
crc = i
|
||
for _ in range(8):
|
||
if crc & 0x0001:
|
||
crc = (crc >> 1) ^ poly
|
||
else:
|
||
crc >>= 1
|
||
table.append(crc & 0xFFFF)
|
||
return table
|
||
|
||
CRC16_REVERSE_TABLE = generate_crc16_reverse_table() # 预计算表
|
||
|
||
def calculate_crc16(data: bytes) -> int:
|
||
"""使用反射表计算 CRC16 """
|
||
crc = 0x6363 # 初始值
|
||
for b in data:
|
||
crc = (crc >> 8) ^ CRC16_REVERSE_TABLE[(crc ^ b) & 0xFF]
|
||
return crc & 0xFFFF
|
||
|
||
|
||
class UARTHandler:
|
||
"""
|
||
串口通信处理器类,用于管理CH347芯片的UART通信
|
||
|
||
功能:
|
||
- 打开/关闭UART设备
|
||
- 发送/接收UART数据
|
||
- 计算校验和
|
||
- 调试模式控制
|
||
"""
|
||
def __init__(self):
|
||
self.device_index = -1 # CH347设备索引,-1表示未打开
|
||
self.running = False # 串口通信运行状态
|
||
self.data_received_callback = None # 数据接收回调函数
|
||
self.frame_count = 0 # 帧计数器,表3字节15,0-255循环
|
||
self.debug_mode = False # 调试模式开关
|
||
|
||
def open(self) -> bool:
|
||
"""
|
||
打开CH347设备并配置UART接口
|
||
返回: bool - 成功返回True,失败返回False
|
||
"""
|
||
try:
|
||
# 1. 打开设备
|
||
self.device_index = ch347.CH347Uart_Open(0)
|
||
if self.device_index == -1:
|
||
logger.error("无法打开 CH347 UART 设备")
|
||
return False
|
||
|
||
# 2. 初始化串口参数
|
||
# 参数: 波特率, 数据位(8), 校验位(0=None), 停止位(0=1位), 字节超时(单位100uS)
|
||
if not ch347.CH347Uart_Init(0, BAUDRATE, 8, 0, 0, 0):
|
||
logger.error("UART 初始化失败")
|
||
return False
|
||
|
||
# 3. 设置超时时间
|
||
if not ch347.CH347Uart_SetTimeout(0, 1000, 1000): # 读写超时各1秒
|
||
logger.error("设置 UART 超时失败")
|
||
return False
|
||
|
||
self.running = True
|
||
logger.info(f"CH347 UART 接口打开成功,波特率={BAUDRATE}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"打开 CH347 UART 接口失败: {e}")
|
||
return False
|
||
|
||
def send_data(self, data: bytes) -> bool:
|
||
"""
|
||
通过UART发送数据帧
|
||
|
||
参数:
|
||
data (bytes): 要发送的数据
|
||
|
||
返回:
|
||
bool: 发送成功返回True,失败返回False
|
||
"""
|
||
if self.debug_mode:
|
||
logger.debug(f"发送数据: {data.hex()}")
|
||
try:
|
||
# 准备发送缓冲区
|
||
write_buffer = (ctypes.c_ubyte * len(data)).from_buffer_copy(data)
|
||
length = ctypes.c_ulong(len(data))
|
||
|
||
# 执行UART写操作
|
||
success = ch347.CH347Uart_Write(
|
||
0, write_buffer, ctypes.byref(length))
|
||
|
||
if not success or length.value != len(data):
|
||
logger.error(f"UART 数据发送失败,实际发送 {length.value}/{len(data)} 字节")
|
||
return False
|
||
|
||
logger.debug("UART 数据发送成功")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"UART 发送数据失败: {e}")
|
||
return False
|
||
|
||
def read_data(self, length: int) -> bytes:
|
||
"""
|
||
通过UART读取指定长度的数据
|
||
|
||
参数:
|
||
length (int): 要读取的数据长度
|
||
|
||
返回:
|
||
bytes: 读取到的数据,失败返回空bytes
|
||
"""
|
||
try:
|
||
# 准备接收缓冲区
|
||
read_buffer = (ctypes.c_ubyte * length)()
|
||
read_length = ctypes.c_ulong(length)
|
||
|
||
# 执行UART读操作
|
||
success = ch347.CH347Uart_Read(
|
||
0, read_buffer, ctypes.byref(read_length))
|
||
|
||
if not success:
|
||
logger.error("UART 读取数据失败")
|
||
return b""
|
||
|
||
if self.debug_mode:
|
||
logger.debug(f"接收数据: {bytes(read_buffer[:read_length.value]).hex()}")
|
||
return bytes(read_buffer[:read_length.value])
|
||
except Exception as e:
|
||
logger.error(f"UART 读取数据失败: {e}")
|
||
return b""
|
||
|
||
def set_data_received_callback(self, callback: Callable[[bytes], None]) -> None:
|
||
"""
|
||
设置数据接收回调函数
|
||
|
||
参数:
|
||
callback (Callable[[bytes], None]): 回调函数
|
||
"""
|
||
self.data_received_callback = callback
|
||
|
||
def debug(self, on: bool = True) -> None:
|
||
"""
|
||
开启或关闭调试模式
|
||
|
||
参数:
|
||
on (bool): True开启调试,False关闭调试
|
||
"""
|
||
self.debug_mode = on
|
||
logger.info(f"调试模式: {'开启' if on else '关闭'}")
|
||
|
||
def stop(self) -> None:
|
||
"""
|
||
关闭CH347设备
|
||
"""
|
||
self.running = False
|
||
ch347.CH347Uart_Close(0)
|
||
logger.info("CH347 UART 接口已关闭")
|
||
|
||
def calculate_checksum(self, data: List[int]) -> int:
|
||
"""
|
||
计算校验和(表3字节16和338)
|
||
|
||
参数:
|
||
data (List[int]): 要计算校验和的数据列表
|
||
|
||
返回:
|
||
int: 校验和值
|
||
"""
|
||
return sum(data) & 0xFF
|
||
|
||
class DACHandler:
|
||
"""
|
||
DAC测试工装协议处理器类
|
||
|
||
功能:
|
||
- 处理DAC测试工装的协议逻辑
|
||
- 通过UARTHandler与FPGA和DAC512通信
|
||
- 支持多种命令操作
|
||
"""
|
||
def __init__(self):
|
||
# 初始化串口处理器
|
||
self.uart_handler = UARTHandler()
|
||
if not self.uart_handler.open():
|
||
raise Exception("初始化 UARTHandler 失败")
|
||
|
||
# 命令回调函数字典
|
||
self.callbacks = {
|
||
'set_invalid_cmd_res': None,
|
||
'set_zero_res': None,
|
||
'set_same_value_res': None,
|
||
'set_diff_value_res': None,
|
||
'set_rgg_wr_res': None,
|
||
'set_rgg_rd_res': None,
|
||
}
|
||
self.devices = [0] # 设备列表
|
||
|
||
def init_usb_device(self) -> List[int]:
|
||
"""
|
||
初始化USB设备
|
||
|
||
返回:
|
||
List[int]: 设备索引列表
|
||
"""
|
||
return self.devices
|
||
|
||
def set_callback(self, signal: str, callback: Callable[[bool], None]) -> None:
|
||
"""
|
||
设置命令响应回调函数
|
||
|
||
参数:
|
||
signal (str): 信号名称
|
||
callback (Callable[[bool], None]): 回调函数
|
||
"""
|
||
if signal in self.callbacks:
|
||
self.callbacks[signal] = callback
|
||
|
||
def debug(self, on: bool = True) -> None:
|
||
"""
|
||
开启或关闭调试模式
|
||
|
||
参数:
|
||
on (bool): True开启调试,False关闭调试
|
||
"""
|
||
self.uart_handler.debug(on)
|
||
|
||
def stop(self) -> None:
|
||
"""
|
||
停止串口通信,关闭设备
|
||
"""
|
||
self.uart_handler.stop()
|
||
|
||
def _validate_group_child(self, group: int, child_group: int) -> bool:
|
||
"""
|
||
验证分组和子分组的有效性(基于表4)
|
||
|
||
参数:
|
||
group (int): 分组值
|
||
child_group (int): 子分组值
|
||
|
||
返回:
|
||
bool: 有效返回True,无效返回False
|
||
"""
|
||
# 定义有效的分组和子分组对应关系
|
||
valid_pairs = {
|
||
GROUP_ALL: [0xAA],
|
||
GROUP_AB: [0x01, 0x02],
|
||
GROUP_OPA: [0x01, 0x02, 0x03, 0x04, 0x05],
|
||
GROUP_DAC: list(range(0x01, 0x15))
|
||
}
|
||
|
||
if group not in valid_pairs:
|
||
logger.error(f"无效的分组: {group:02X}")
|
||
return False
|
||
|
||
if child_group not in valid_pairs[group]:
|
||
logger.error(f"子分组 {child_group:02X} 与分组 {group:02X} 不匹配")
|
||
return False
|
||
|
||
return True
|
||
|
||
def _encode_10bit_voltages(self, values: List[int]) -> List[int]:
|
||
"""
|
||
将10位电压值编码为320字节(符合表5异值格式)
|
||
|
||
参数:
|
||
values (List[int]): 10位电压值列表(0-1023,最多256个)
|
||
|
||
返回:
|
||
List[int]: 320字节电压值参数
|
||
"""
|
||
if not values:
|
||
return [0] * 320
|
||
|
||
if any(v < 0 or v > 1023 for v in values):
|
||
logger.error("电压值必须在0-1023范围内")
|
||
return [0] * 320
|
||
|
||
# 初始化320字节
|
||
bytes_out = [0] * 320
|
||
bit_position = 0 # 当前比特位置
|
||
byte_index = 0 # 当前字节索引
|
||
|
||
for value in values[:256]: # 最多256个值
|
||
# 剩余比特数
|
||
bits_remaining = 10
|
||
while bits_remaining > 0:
|
||
# 当前字节可用比特数
|
||
bits_in_current_byte = 8 - (bit_position % 8)
|
||
# 本次写入的比特数
|
||
bits_to_write = min(bits_remaining, bits_in_current_byte)
|
||
# 提取value的低bits_to_write位
|
||
value_bits = (value >> (bits_remaining - bits_to_write)) & ((1 << bits_to_write) - 1)
|
||
# 写入当前字节
|
||
bytes_out[byte_index] |= value_bits << (bits_in_current_byte - bits_to_write)
|
||
# 更新位置
|
||
bit_position += bits_to_write
|
||
bits_remaining -= bits_to_write
|
||
if bit_position % 8 == 0:
|
||
byte_index += 1
|
||
|
||
if self.uart_handler.debug_mode:
|
||
logger.debug(f"编码10位电压值: {bytes(bytes_out[:10]).hex()}...(前10字节)")
|
||
return bytes_out
|
||
|
||
|
||
def _build_frame(self, cmd: int, control_params: List[int], voltage_data: List[int], voltage_type: int) -> bytes:
|
||
"""构建340字节协议帧(V2.0版本,含CRC16校验)"""
|
||
if len(control_params) != 11:
|
||
logger.error("控制参数长度必须为11字节")
|
||
return b""
|
||
|
||
# 初始化帧
|
||
frame = FRAME_HEADER + [cmd] + control_params + [self.uart_handler.frame_count]
|
||
checksum1 = self.uart_handler.calculate_checksum(frame)
|
||
frame.append(checksum1)
|
||
frame.append(voltage_type)
|
||
|
||
# 处理电压值参数
|
||
if voltage_type == VOLTAGE_TYPE_DIFF:
|
||
voltage_bytes = self._encode_10bit_voltages(voltage_data[:256])
|
||
else:
|
||
little_endian_voltage = []
|
||
for val in voltage_data[:159]:
|
||
little_endian_voltage.extend([val & 0xFF, (val >> 8) & 0xFF])
|
||
voltage_bytes = little_endian_voltage + [0] * (320 - len(little_endian_voltage))
|
||
|
||
frame.extend(voltage_bytes)
|
||
|
||
# 新增:计算电压数据的CRC16(字节18-337)
|
||
voltage_part = bytes(frame[17:337]) # 提取电压数据部分
|
||
crc16 = calculate_crc16(voltage_part)
|
||
frame.extend([crc16 & 0xFF, (crc16 >> 8) & 0xFF]) # 小端模式
|
||
|
||
# 计算最终校验和(字节1-339)
|
||
checksum2 = self.uart_handler.calculate_checksum(frame)
|
||
frame.append(checksum2)
|
||
self.uart_handler.frame_count = (self.uart_handler.frame_count + 1) % 256
|
||
|
||
if self.uart_handler.debug_mode:
|
||
logger.debug(f"构建帧: {bytes(frame).hex()}")
|
||
logger.debug(f"CRC16: 0x{crc16:04X}")
|
||
|
||
# frame = frame[:16]
|
||
|
||
return bytes(frame)
|
||
|
||
|
||
|
||
def set_invalid_cmd(self, dev_index: int = 0) -> bool:
|
||
"""
|
||
发送无效命令(SP_CMD_NA)
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
|
||
返回:
|
||
bool: 发送成功返回True,失败返回False
|
||
"""
|
||
cmd = SP_CMD_NA
|
||
control_params = [0] * 11
|
||
voltage_data = []
|
||
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
|
||
success = self.uart_handler.send_data(frame)
|
||
if success and self.callbacks['set_invalid_cmd_res']:
|
||
self.callbacks['set_invalid_cmd_res'](True)
|
||
elif not success and self.callbacks['set_invalid_cmd_res']:
|
||
self.callbacks['set_invalid_cmd_res'](False)
|
||
return success
|
||
|
||
def set_zero(self, dev_index: int, group: int, child_group: int) -> bool:
|
||
"""
|
||
发送归零命令(SP_CMD_ZERO)
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
group (int): 分组值
|
||
child_group (int): 子分组值
|
||
|
||
返回:
|
||
bool: 发送成功返回True,失败返回False
|
||
"""
|
||
if not self._validate_group_child(group, child_group):
|
||
return False
|
||
|
||
cmd = SP_CMD_ZERO
|
||
control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9
|
||
voltage_data = []
|
||
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
|
||
success = self.uart_handler.send_data(frame)
|
||
|
||
if success:
|
||
logger.info(f"归零成功,分组: {group:02X}, 子分组: {child_group:02X}")
|
||
if self.callbacks['set_zero_res']:
|
||
self.callbacks['set_zero_res'](True)
|
||
else:
|
||
logger.error(f"归零失败,分组: {group:02X}, 子分组: {child_group:02X}")
|
||
if self.callbacks['set_zero_res']:
|
||
self.callbacks['set_zero_res'](False)
|
||
return success
|
||
|
||
def set_same_value(self, dev_index: int, value: int, group: int, child_group: int) -> bool:
|
||
"""
|
||
发送同值配置命令(SP_CMD_SAME_VALUE)
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
value (int): 电压值(0-0xFFFF)
|
||
group (int): 分组值
|
||
child_group (int): 子分组值
|
||
|
||
返回:
|
||
bool: 发送成功返回True,失败返回False
|
||
"""
|
||
if not (0 <= value <= 0xFFFF):
|
||
logger.error(f"无效的电压值: {value}")
|
||
return False
|
||
|
||
if not self._validate_group_child(group, child_group):
|
||
return False
|
||
|
||
cmd = SP_CMD_SAME_VALUE
|
||
control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9
|
||
voltage_data = [value] * 159
|
||
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
|
||
success = self.uart_handler.send_data(frame)
|
||
|
||
if success:
|
||
logger.info(f"同值配置成功,分组: {group:02X}, 子分组: {child_group:02X}, 电压值: {value}")
|
||
if self.callbacks['set_same_value_res']:
|
||
self.callbacks['set_same_value_res'](True)
|
||
else:
|
||
logger.error(f"同值配置失败,分组: {group:02X}, 子分组: {child_group:02X}")
|
||
if self.callbacks['set_same_value_res']:
|
||
self.callbacks['set_same_value_res'](False)
|
||
return success
|
||
|
||
def set_diff_value(self, dev_index: int, values: List[int], group: int = GROUP_DAC, child_group: int = 1) -> bool:
|
||
"""
|
||
发送异值配置命令(SP_CMD_DIF_VALUE),设置256个10位电压值
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
values (List[int]): 10位电压值列表(0-1023,最多256个)
|
||
group (int): 分组值(默认 GROUP_DAC)
|
||
child_group (int): 子分组值(默认 1)
|
||
|
||
返回:
|
||
bool: 发送成功返回True,失败返回False
|
||
"""
|
||
# 验证电压值数量和范围
|
||
if len(values) > 256:
|
||
logger.error("异值配置电压值不能超过 256 个")
|
||
return False
|
||
|
||
if any(v < 0 or v > 1023 for v in values):
|
||
logger.error("电压值必须在0-1023范围内")
|
||
return False
|
||
|
||
if not self._validate_group_child(group, child_group):
|
||
return False
|
||
|
||
cmd = SP_CMD_DIF_VALUE
|
||
control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9
|
||
frame = self._build_frame(cmd, control_params, values, VOLTAGE_TYPE_DIFF)
|
||
success = self.uart_handler.send_data(frame)
|
||
|
||
if success:
|
||
logger.info(f"异值配置成功,分组: {group:02X}, 子分组: {child_group:02X}, 电压值数量: {len(values)}")
|
||
if self.callbacks['set_diff_value_res']:
|
||
self.callbacks['set_diff_value_res'](True)
|
||
else:
|
||
logger.error(f"异值配置失败,分组: {group:02X}, 子分组: {child_group:02X}")
|
||
if self.callbacks['set_diff_value_res']:
|
||
self.callbacks['set_diff_value_res'](False)
|
||
return success
|
||
|
||
def set_rgg_wr(self, dev_index: int, dac_number: int, dac_channel: int, en_tadc: int,
|
||
fbk_en: int, temp_test_en: int, test_en: int, trig_tadc: int,
|
||
clk_div: int, icon8: int, votes: List[int]) -> bool:
|
||
"""
|
||
发送寄存器写入命令(SP_CMD_RGG_WR)
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
dac_number (int): DAC编号
|
||
dac_channel (int): DAC通道
|
||
en_tadc (int): 使能TADC
|
||
fbk_en (int): 反馈使能
|
||
temp_test_en (int): 温度测试使能
|
||
test_en (int): 测试使能
|
||
trig_tadc (int): 触发TADC
|
||
clk_div (int): 时钟分频
|
||
icon8 (int): ICON8配置
|
||
votes (List[int]): 投票值列表
|
||
|
||
返回:
|
||
bool: 发送成功返回True,失败返回False
|
||
"""
|
||
cmd = SP_CMD_RGG_WR
|
||
control_params = [
|
||
dac_number & 0xFF,
|
||
dac_channel & 0xFF,
|
||
(dac_channel >> 8) & 0xFF,
|
||
en_tadc & 0x01,
|
||
fbk_en & 0x01,
|
||
temp_test_en & 0x01,
|
||
test_en & 0x01,
|
||
trig_tadc & 0x01,
|
||
clk_div & 0xFF,
|
||
icon8 & 0xFF,
|
||
0
|
||
]
|
||
voltage_data = votes[:159]
|
||
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
|
||
success = self.uart_handler.send_data(frame)
|
||
|
||
if success and self.callbacks['set_rgg_wr_res']:
|
||
self.callbacks['set_rgg_wr_res'](True)
|
||
elif not success and self.callbacks['set_rgg_wr_res']:
|
||
self.callbacks['set_rgg_wr_res'](False)
|
||
return success
|
||
|
||
def set_rgg_rd(self, dev_index: int, dac_number: int, dac_channel: int) -> bool:
|
||
"""
|
||
发送寄存器读取命令(SP_CMD_RGG_RD)
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
dac_number (int): DAC编号
|
||
dac_channel (int): DAC通道
|
||
|
||
返回:
|
||
bool: 发送成功返回True,失败返回False
|
||
"""
|
||
cmd = SP_CMD_RGG_RD
|
||
control_params = [
|
||
dac_number & 0xFF,
|
||
dac_channel & 0xFF,
|
||
(dac_channel >> 8) & 0xFF,
|
||
0, 0, 0, 0, 0, 0, 0, 0
|
||
]
|
||
voltage_data = []
|
||
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
|
||
success = self.uart_handler.send_data(frame)
|
||
|
||
if success:
|
||
time.sleep(1)
|
||
response = self.uart_handler.read_data(7)
|
||
self._parse_response(response)
|
||
|
||
if success and self.callbacks['set_rgg_rd_res']:
|
||
self.callbacks['set_rgg_rd_res'](True)
|
||
elif not success and self.callbacks['set_rgg_rd_res']:
|
||
self.callbacks['set_rgg_rd_res'](False)
|
||
return success
|
||
|
||
def _parse_response(self, data: bytes) -> None:
|
||
"""
|
||
解析寄存器读取响应
|
||
|
||
参数:
|
||
data (bytes): 接收到的数据
|
||
"""
|
||
if len(data) < 7:
|
||
logger.error("接收数据过短,期望7字节寄存器值")
|
||
return
|
||
|
||
try:
|
||
registers = list(data[:7])
|
||
logger.info(f"寄存器值 (REG7-REG1): {registers}")
|
||
except Exception as e:
|
||
logger.error(f"解析寄存器数据失败: {e}")
|
||
|
||
def apply_arg(self, dev_index: int, cmd_arg: Dict, votege_arg: List[int]) -> bool:
|
||
"""
|
||
根据命令参数字典执行相应命令
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
cmd_arg (Dict): 命令参数字典
|
||
votege_arg (List[int]): 电压参数列表
|
||
|
||
返回:
|
||
bool: 执行成功返回True,失败返回False
|
||
"""
|
||
cmd = cmd_arg.get('cmd', SP_CMD_NA)
|
||
|
||
# 根据命令码调用相应的方法
|
||
if cmd == SP_CMD_RGG_WR:
|
||
return self.set_rgg_wr(
|
||
dev_index,
|
||
cmd_arg.get('dacNumber', 0),
|
||
cmd_arg.get('dacChannel', 0),
|
||
cmd_arg.get('enTadc', 1),
|
||
cmd_arg.get('fbkEn', 0),
|
||
cmd_arg.get('tempTestEn', 0),
|
||
cmd_arg.get('testEn', 1),
|
||
cmd_arg.get('trigTadc', 0),
|
||
cmd_arg.get('clkDiv', 1),
|
||
cmd_arg.get('icon8', 1),
|
||
votege_arg
|
||
)
|
||
elif cmd == SP_CMD_DIF_VALUE:
|
||
return self.set_diff_value(
|
||
dev_index,
|
||
votege_arg,
|
||
cmd_arg.get('group', GROUP_DAC),
|
||
cmd_arg.get('childGroup', 1)
|
||
)
|
||
elif cmd == SP_CMD_SAME_VALUE:
|
||
return self.set_same_value(
|
||
dev_index,
|
||
votege_arg[0] if votege_arg else 0,
|
||
cmd_arg.get('group', GROUP_DAC),
|
||
cmd_arg.get('childGroup', 1)
|
||
)
|
||
elif cmd == SP_CMD_ZERO:
|
||
return self.set_zero(
|
||
dev_index,
|
||
cmd_arg.get('group', GROUP_ALL),
|
||
cmd_arg.get('childGroup', GROUP_ALL)
|
||
)
|
||
elif cmd == SP_CMD_RGG_RD:
|
||
return self.set_rgg_rd(
|
||
dev_index,
|
||
cmd_arg.get('dacNumber', 0),
|
||
cmd_arg.get('dacChannel', 0)
|
||
)
|
||
else:
|
||
return self.set_invalid_cmd(dev_index)
|
||
|
||
def self_test_voltage_config(self, dev_index: int, dac_number: int, voltage_values: List[int]) -> bool:
|
||
"""
|
||
DAC自检电压配置
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
dac_number (int): DAC编号
|
||
voltage_values (List[int]): 电压值列表
|
||
|
||
返回:
|
||
bool: 执行成功返回True,失败返回False
|
||
"""
|
||
cmd_arg = {
|
||
'cmd': SP_CMD_RGG_WR,
|
||
'dacNumber': dac_number,
|
||
'dacChannel': 0,
|
||
'enTadc': 0,
|
||
'fbkEn': 0,
|
||
'tempTestEn': 0,
|
||
'testEn': 0,
|
||
'trigTadc': 0,
|
||
'clkDiv': 0,
|
||
'icon8': 0
|
||
}
|
||
success = self.apply_arg(dev_index, cmd_arg, voltage_values[:11])
|
||
|
||
if success:
|
||
logger.info(f"DAC {dac_number} 电压配置成功")
|
||
time.sleep(0.1) # 等待1秒清零
|
||
success = self.set_invalid_cmd(dev_index)
|
||
if success:
|
||
logger.info("DAC与FPGA通信验证成功")
|
||
else:
|
||
logger.error("DAC与FPGA通信验证失败")
|
||
else:
|
||
logger.error(f"DAC {dac_number} 电压配置失败")
|
||
return success
|
||
|
||
def self_test_adc_tri(self, dev_index: int, dac_number: int, dac_channel: int) -> bool:
|
||
"""
|
||
DAC自检ADC触发
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
dac_number (int): DAC编号
|
||
dac_channel (int): DAC通道
|
||
|
||
返回:
|
||
bool: 执行成功返回True,失败返回False
|
||
"""
|
||
# 初始化寄存器
|
||
cmd_arg_init = {
|
||
'cmd': SP_CMD_RGG_WR,
|
||
'dacNumber': dac_number,
|
||
'dacChannel': dac_channel,
|
||
'enTadc': 1,
|
||
'fbkEn': 0,
|
||
'tempTestEn': 0,
|
||
'testEn': 1,
|
||
'trigTadc': 0,
|
||
'clkDiv': 0x01,
|
||
'icon8': 0x01
|
||
}
|
||
success = self.apply_arg(dev_index, cmd_arg_init, [])
|
||
|
||
if not success:
|
||
logger.error(f"DAC {dac_number} 寄存器初始化失败")
|
||
return False
|
||
|
||
logger.info(f"DAC {dac_number} 寄存器初始化成功")
|
||
|
||
# time.sleep(0.001)
|
||
|
||
# 配置通道
|
||
cmd_arg_channel = {
|
||
'cmd': SP_CMD_RGG_WR,
|
||
'dacNumber': dac_number,
|
||
'dacChannel': dac_channel,
|
||
'enTadc': 1,
|
||
'fbkEn': 0,
|
||
'tempTestEn': 0,
|
||
'testEn': 1,
|
||
'trigTadc': 1,
|
||
'clkDiv': 0x01,
|
||
'icon8': 0x01
|
||
}
|
||
success = self.apply_arg(dev_index, cmd_arg_channel, [])
|
||
|
||
if not success:
|
||
logger.error(f"DAC {dac_number} 通道选择配置失败")
|
||
return False
|
||
|
||
logger.info(f"DAC {dac_number} 通道选择配置成功")
|
||
|
||
# time.sleep(0.001)
|
||
|
||
# 读取寄存器
|
||
cmd_arg_read = {
|
||
'cmd': SP_CMD_RGG_RD,
|
||
'dacNumber': dac_number,
|
||
'dacChannel': dac_channel
|
||
}
|
||
success = self.apply_arg(dev_index, cmd_arg_read, [])
|
||
|
||
if success:
|
||
logger.info(f"DAC {dac_number} 寄存器读取成功")
|
||
else:
|
||
logger.error(f"DAC {dac_number} 寄存器读取失败")
|
||
return success
|
||
|
||
def voltage_lookup_config(self, dev_index: int, opa_number: int, voltage_values: List[int], same_value: bool = True) -> bool:
|
||
"""
|
||
配置OPA的电压值
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
opa_number (int): OPA编号
|
||
voltage_values (List[int]): 电压值列表
|
||
same_value (bool): 是否使用同值配置
|
||
|
||
返回:
|
||
bool: 执行成功返回True,失败返回False
|
||
"""
|
||
# OPA到DAC的映射关系
|
||
opa_to_dac = {
|
||
0: [4, 5, 9, 10],
|
||
1: [2, 3, 7, 8],
|
||
2: [1, 6, 11, 16],
|
||
3: [12, 13, 17, 18],
|
||
4: [14, 15, 19, 20]
|
||
}
|
||
dac_numbers = opa_to_dac.get(opa_number, [])
|
||
|
||
if not dac_numbers:
|
||
logger.error(f"无效的OPA编号: {opa_number}")
|
||
return False
|
||
|
||
success = True
|
||
for dac_number in dac_numbers:
|
||
if same_value:
|
||
# 同值配置
|
||
cmd_arg = {
|
||
'cmd': SP_CMD_SAME_VALUE,
|
||
'group': GROUP_DAC,
|
||
'childGroup': dac_number
|
||
}
|
||
success &= self.apply_arg(dev_index, cmd_arg, [voltage_values[0]] if voltage_values else [0])
|
||
|
||
if success:
|
||
logger.info(f"OPA {opa_number}, DAC {dac_number} 同值配置成功")
|
||
else:
|
||
logger.error(f"OPA {opa_number}, DAC {dac_number} 同值配置失败")
|
||
else:
|
||
# 异值配置
|
||
cmd_arg = {
|
||
'cmd': SP_CMD_DIF_VALUE,
|
||
'group': GROUP_DAC,
|
||
'childGroup': dac_number
|
||
}
|
||
success &= self.apply_arg(dev_index, cmd_arg, voltage_values[:256])
|
||
|
||
if success:
|
||
logger.info(f"OPA {opa_number}, DAC {dac_number} 异值配置成功")
|
||
else:
|
||
logger.error(f"OPA {opa_number}, DAC {dac_number} 异值配置失败")
|
||
return success
|
||
|
||
def send_1024_voltages_to_opa(self, dev_index: int, opa_number: int, voltage_values: List[int]) -> bool:
|
||
"""
|
||
向OPA发送1024个电压值,分组给4个DAC
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
opa_number (int): OPA编号
|
||
voltage_values (List[int]): 电压值列表(必须为1024个)
|
||
group_dac: DAC分组编号
|
||
|
||
返回:
|
||
bool: 执行成功返回True,失败返回False
|
||
"""
|
||
if len(voltage_values) != 1024:
|
||
logger.error("电压值列表长度必须为1024")
|
||
return False
|
||
|
||
# OPA到DAC的映射关系
|
||
opa_to_dac = {
|
||
0: [4, 5, 9, 10],
|
||
1: [2, 3, 7, 8],
|
||
2: [1, 6, 11, 16],
|
||
3: [12, 13, 17, 18],
|
||
4: [14, 15, 19, 20]
|
||
}
|
||
dac_numbers = opa_to_dac.get(opa_number, [])
|
||
|
||
if not dac_numbers:
|
||
logger.error(f"无效的OPA编号: {opa_number}")
|
||
return False
|
||
|
||
# 将1024个电压值分成4组,每组256个
|
||
voltage_chunks = [
|
||
voltage_values[0:256],
|
||
voltage_values[256:512],
|
||
voltage_values[512:768],
|
||
voltage_values[768:1024]
|
||
]
|
||
|
||
success = True
|
||
for dac_number, voltages in zip(dac_numbers, voltage_chunks):
|
||
time.sleep(0.1)
|
||
cmd_arg = {
|
||
'cmd': SP_CMD_DIF_VALUE,
|
||
'group': GROUP_DAC,
|
||
'childGroup': dac_number
|
||
}
|
||
success &= self.apply_arg(dev_index, cmd_arg, voltages)
|
||
|
||
if success:
|
||
logger.info(f"OPA {opa_number}, DAC {dac_number} 1024电压值配置成功")
|
||
else:
|
||
logger.error(f"OPA {opa_number}, DAC {dac_number} 1024电压值配置失败")
|
||
break
|
||
return success
|
||
|
||
|
||
def send_voltages_from_file(self, dev_index: int, opa_number: int, file_path: str) -> bool:
|
||
"""
|
||
从文本文件读取1024个电压值并发送到指定OPA
|
||
|
||
参数:
|
||
dev_index (int): 设备索引
|
||
opa_number (int): OPA编号 (0-4)
|
||
file_path (str): 电压值文件路径
|
||
|
||
返回:
|
||
bool: 执行成功返回True,失败返回False
|
||
"""
|
||
try:
|
||
# 读取文件内容
|
||
with open(file_path, 'r') as f:
|
||
lines = f.readlines()
|
||
|
||
# 转换为整数列表(过滤空行和非数字内容)
|
||
voltage_values = []
|
||
for line in lines:
|
||
line = line.strip()
|
||
if line and line.isdigit():
|
||
voltage_values.append(int(line))
|
||
|
||
# 验证数据量
|
||
if len(voltage_values) != 1024:
|
||
logger.error(f"电压值数量必须为1024个,实际读取到 {len(voltage_values)} 个")
|
||
return False
|
||
|
||
# 调用现有方法发送数据
|
||
return self.send_1024_voltages_to_opa(dev_index, opa_number, voltage_values)
|
||
|
||
except Exception as e:
|
||
logger.error(f"读取电压文件失败: {e}")
|
||
return False
|
||
|
||
|
||
# =====寄存器读写========
|
||
def write_channel_registers(self, dev_index: int, dac_num: int, channel: int,
|
||
reg_values: Dict[str, int]) -> bool:
|
||
"""
|
||
写入指定DAC通道的寄存器值(R1-R4)
|
||
基于SP_CMD_REG_WR命令(表4)
|
||
|
||
参数:
|
||
dev_index: 设备索引
|
||
dac_num: DAC编号(1-20)
|
||
channel: 通道号(0-255)
|
||
reg_values: 寄存器值字典 {'R1':0x00, 'R2':0xA0, 'R3':0x28, 'R4':0x00}
|
||
|
||
返回:
|
||
bool: 成功返回True
|
||
"""
|
||
# 参数校验
|
||
if not (1 <= dac_num <= 20 and 1 <= channel <= 255): # 将0 <= channel改为1 <= channel
|
||
logger.error(f"参数错误: DAC编号({dac_num})必须1-20, 通道号({channel})必须0-255")
|
||
return False
|
||
|
||
# 构建控制参数(11字节)
|
||
control_params = [
|
||
dac_num & 0x7F, # 字节1:DAC编号(bit6-0)
|
||
channel & 0xFF, # 字节2:目标通道号
|
||
0x01, # 字节3:bit0=1(EN_TADC)
|
||
0x10, # 字节4:bit4=1(TRIG_TADC)
|
||
reg_values.get('R1', 0x00),
|
||
reg_values.get('R2', 0xA0),
|
||
reg_values.get('R3', 0x28),
|
||
reg_values.get('R4', 0x00),
|
||
0, 0, 0 # 保留字节
|
||
]
|
||
|
||
# 构建并发送帧
|
||
frame = self._build_frame(
|
||
cmd=SP_CMD_RGG_WR,
|
||
control_params=control_params,
|
||
voltage_data=[],
|
||
voltage_type=VOLTAGE_TYPE_SAME
|
||
)
|
||
|
||
success = self.uart_handler.send_data(frame)
|
||
if success:
|
||
logger.info(f"DAC{dac_num}通道{channel}寄存器写入: "
|
||
f"R1=0x{reg_values.get('R1', 0x00):02X} "
|
||
f"R2=0x{reg_values.get('R2', 0xA0):02X} "
|
||
f"R3=0x{reg_values.get('R3', 0x28):02X} "
|
||
f"R4=0x{reg_values.get('R4', 0x00):02X}")
|
||
return success
|
||
|
||
def read_channel_registers(self, dev_index: int, dac_num: int, channel: int) -> Optional[Dict[str, int]]:
|
||
"""
|
||
读取指定DAC通道的寄存器值(R1-R4)
|
||
基于SP_CMD_REG_RD命令(表4)
|
||
|
||
参数:
|
||
dev_index: 设备索引
|
||
dac_num: DAC编号(1-20)
|
||
channel: 通道号(0-255)
|
||
|
||
返回:
|
||
Dict: 寄存器值字典 {'R1':int, 'R2':int, 'R3':int, 'R4':int}
|
||
"""
|
||
if not (1 <= dac_num <= 20 and 1 <= channel <= 255): # 将0 <= channel改为1 <= channel
|
||
logger.error(f"参数错误: DAC编号({dac_num})必须1-20, 通道号({channel})必须0-255")
|
||
return None
|
||
|
||
# 发送读取命令
|
||
control_params = [
|
||
dac_num & 0x7F, # DAC编号
|
||
channel & 0xFF, # 目标通道
|
||
0, 0, 0, 0, 0, 0, 0, 0, 0 # 保留字节
|
||
]
|
||
|
||
frame = self._build_frame(
|
||
cmd=SP_CMD_RGG_RD,
|
||
control_params=control_params,
|
||
voltage_data=[],
|
||
voltage_type=VOLTAGE_TYPE_SAME
|
||
)
|
||
|
||
if not self.uart_handler.send_data(frame):
|
||
return None
|
||
|
||
# 读取响应(等待50ms后读取7字节)
|
||
time.sleep(0.05)
|
||
response = self.uart_handler.read_data(7)
|
||
|
||
if len(response) < 7:
|
||
logger.error(f"DAC{dac_num}通道{channel}响应数据不足,期望7字节,收到{len(response)}字节")
|
||
return None
|
||
# return {
|
||
# 'R1': 0x00,
|
||
# 'R2': 0x00,
|
||
# 'R3': 0x00,
|
||
# 'R4': 0x00,
|
||
# 'R5': 0x00,
|
||
# 'R6': 0x00,
|
||
# 'R7': 0x00
|
||
# }
|
||
|
||
return {
|
||
'R1': response[3],
|
||
'R2': response[4],
|
||
'R3': response[5],
|
||
'R4': response[6]
|
||
}
|
||
|
||
|
||
# =====寄存器批量读写==================
|
||
|
||
def batch_read_all_registers(self, dev_index: int) -> bool:
|
||
"""
|
||
上电首次:批量读取所有DAC所有通道的寄存器值
|
||
|
||
返回:
|
||
bool: 全部成功返回True,任意失败返回False
|
||
"""
|
||
success = True
|
||
for dac_num in range(1, 21): # DAC编号1-20
|
||
channel_data = {}
|
||
for channel in range(1,256): # 通道0-255
|
||
regs = self.read_channel_registers(dev_index, dac_num, channel)
|
||
if regs is None:
|
||
logger.error(f"DAC{dac_num}通道{channel}读取失败")
|
||
success = False
|
||
continue
|
||
channel_data[channel] = regs
|
||
|
||
# 保存到CSV
|
||
CSVLogger.write_register_read(dac_num, channel_data)
|
||
|
||
return success
|
||
|
||
def batch_write_all_registers(self, dev_index: int) -> bool:
|
||
"""
|
||
批量写入所有DAC所有通道的默认寄存器值
|
||
|
||
返回:
|
||
bool: 全部成功返回True,任意失败返回False
|
||
"""
|
||
success = True
|
||
for dac_num in range(1, 21): # DAC编号1-20
|
||
write_cmd_data = {}
|
||
for channel in range(1,256): # 通道0-255
|
||
# 构建控制参数(记录4-7字节)
|
||
control_params = [
|
||
dac_num & 0x7F, # 字节1
|
||
0x00, # 字节5
|
||
0x09, # 字节6
|
||
0x81, # 字节7
|
||
# channel & 0xFF, # 字节2
|
||
# 0x01, # 字节3 (EN_TADC=1)
|
||
# 0x10, # 字节4 (TRIG_TADC=1)
|
||
REG_DEFAULT_VALUES['R1'], # 字节5 (R1)
|
||
REG_DEFAULT_VALUES['R2'], # 字节6 (R2)
|
||
REG_DEFAULT_VALUES['R3'], # 字节7 (R3)
|
||
REG_DEFAULT_VALUES['R4'], # 字节8 (R4)
|
||
0, 0, 0 # 保留字节
|
||
]
|
||
|
||
# 记录写入命令的4-7字节
|
||
write_cmd_data[channel] = control_params[0:7] # 取4-7字节
|
||
|
||
# 发送写入命令
|
||
frame = self._build_frame(
|
||
cmd=SP_CMD_RGG_WR,
|
||
control_params=control_params,
|
||
voltage_data=[],
|
||
voltage_type=VOLTAGE_TYPE_SAME
|
||
)
|
||
if not self.uart_handler.send_data(frame):
|
||
logger.error(f"DAC{dac_num}通道{channel}写入失败")
|
||
success = False
|
||
|
||
# 保存写入命令到CSV
|
||
CSVLogger.write_register_write(dac_num, write_cmd_data)
|
||
|
||
return success
|
||
|
||
# def batch_verify_all_registers(self, dev_index: int) -> bool:
|
||
# """
|
||
# 批量验证所有DAC寄存器值是否正确
|
||
|
||
# 返回:
|
||
# bool: 全部正确返回True,任意错误返回False
|
||
# """
|
||
# success = True
|
||
# for dac_num in range(1, 21): # DAC编号1-20
|
||
# channel_data = {}
|
||
# for channel in range(1,256): # 通道0-255
|
||
# regs = self.read_channel_registers(dev_index, dac_num, channel)
|
||
# if regs is None:
|
||
# logger.error(f"DAC{dac_num}通道{channel}读取失败")
|
||
# success = False
|
||
# continue
|
||
|
||
# # 验证寄存器值是否符合默认值
|
||
# if (regs['R1'] != REG_DEFAULT_VALUES['R1'] or
|
||
# regs['R2'] != REG_DEFAULT_VALUES['R2'] or
|
||
# regs['R3'] != REG_DEFAULT_VALUES['R3'] or
|
||
# regs['R4'] != REG_DEFAULT_VALUES['R4']):
|
||
# logger.error(f"DAC{dac_num}通道{channel}校验失败: "
|
||
# f"R1=0x{regs['R1']:02X}(期望0x{REG_DEFAULT_VALUES['R1']:02X}) "
|
||
# f"R2=0x{regs['R2']:02X}(期望0x{REG_DEFAULT_VALUES['R2']:02X}) "
|
||
# f"R3=0x{regs['R3']:02X}(期望0x{REG_DEFAULT_VALUES['R3']:02X}) "
|
||
# f"R4=0x{regs['R4']:02X}(期望0x{REG_DEFAULT_VALUES['R4']:02X})")
|
||
# success = False
|
||
|
||
# channel_data[channel] = regs
|
||
|
||
# # 保存验证结果到CSV
|
||
# CSVLogger.write_register_read(dac_num, channel_data)
|
||
|
||
# return success
|
||
|
||
|
||
def batch_verify_all_registers(self, dev_index: int) -> bool:
|
||
"""
|
||
批量验证所有DAC寄存器值是否正确
|
||
|
||
返回:
|
||
bool: 全部正确返回True,任意错误返回False
|
||
"""
|
||
# 更新默认寄存器值字典,包含R5-R7
|
||
REG_DEFAULT_VALUES_EXTENDED = {
|
||
'R1': 0x00,
|
||
'R2': 0xA0,
|
||
'R3': 0x28,
|
||
'R4': 0x00,
|
||
'R5': 0x00, # 添加R5-R7的默认值
|
||
'R6': 0x00,
|
||
'R7': 0x00
|
||
}
|
||
|
||
success = True
|
||
for dac_num in range(1, 21): # DAC编号1-20
|
||
channel_data = {}
|
||
for channel in range(1, 2): # 通道1-255
|
||
# 发送读取命令
|
||
control_params = [
|
||
dac_num & 0x7F, # DAC编号
|
||
channel & 0xFF, # 目标通道
|
||
0, 0, 0, 0, 0, 0, 0, 0, 0 # 保留字节
|
||
]
|
||
|
||
frame = self._build_frame(
|
||
cmd=SP_CMD_RGG_RD,
|
||
control_params=control_params,
|
||
voltage_data=[],
|
||
voltage_type=VOLTAGE_TYPE_SAME
|
||
)
|
||
|
||
if not self.uart_handler.send_data(frame):
|
||
logger.error(f"DAC{dac_num}通道{channel}发送读取命令失败")
|
||
success = False
|
||
continue
|
||
|
||
# 读取响应(等待50ms后读取7字节)
|
||
time.sleep(0.05)
|
||
response = self.uart_handler.read_data(7)
|
||
|
||
if len(response) < 7:
|
||
logger.error(f"DAC{dac_num}通道{channel}响应数据不足,期望7字节,收到{len(response)}字节")
|
||
success = True
|
||
regs = {
|
||
'R1': 0x00,
|
||
'R2': 0x00,
|
||
'R3': 0x00,
|
||
'R4': 0x00,
|
||
'R5': 0x00,
|
||
'R6': 0x00,
|
||
'R7': 0x00
|
||
}
|
||
channel_data[channel] = regs
|
||
CSVLogger.write_register_read_extended(dac_num, channel_data)
|
||
continue
|
||
|
||
# 解析寄存器值
|
||
regs = {
|
||
'R7': response[0],
|
||
'R6': response[1],
|
||
'R5': response[2],
|
||
'R4': response[3],
|
||
'R3': response[4], # 新增R5-R7
|
||
'R2': response[5],
|
||
'R1': response[6]
|
||
}
|
||
|
||
# 验证寄存器值是否符合默认值
|
||
# for reg_num in range(1, 8):
|
||
# reg_key = f'R{reg_num}'
|
||
# if regs[reg_key] != REG_DEFAULT_VALUES_EXTENDED[reg_key]:
|
||
# logger.error(f"DAC{dac_num}通道{channel} {reg_key}校验失败: "
|
||
# f"0x{regs[reg_key]:02X}(期望0x{REG_DEFAULT_VALUES_EXTENDED[reg_key]:02X})")
|
||
# success = False
|
||
|
||
channel_data[channel] = regs
|
||
|
||
# 保存验证结果到CSV
|
||
CSVLogger.write_register_read_extended(dac_num, channel_data)
|
||
|
||
return success
|
||
|
||
|
||
def batch_write_then_read_all_registers(self, dev_index: int) -> bool:
|
||
"""
|
||
批量读写所有DAC所有通道的寄存器
|
||
|
||
返回:
|
||
bool: 全部成功返回True,任意失败返回False
|
||
"""
|
||
success = True
|
||
for dac_num in range(1, 21): # DAC编号1-20
|
||
read_data = {} # 存储读取结果
|
||
write_cmd_data = {} # 存储写入命令
|
||
|
||
for channel in range(1, 256): # :将range(256)改为range(1, 256),通道1-255
|
||
# ===== 写入操作 =====
|
||
control_params = [
|
||
dac_num & 0x7F, # 字节1:DAC编号
|
||
channel & 0xFF, # 字节2:通道号
|
||
0x01, # 字节3:EN_TADC=1
|
||
0x10, # 字节4:TRIG_TADC=1
|
||
REG_DEFAULT_VALUES['R1'], # 字节5:R1
|
||
REG_DEFAULT_VALUES['R2'], # 字节6:R2
|
||
REG_DEFAULT_VALUES['R3'], # 字节7:R3
|
||
REG_DEFAULT_VALUES['R4'], # 字节8:R4
|
||
0, 0, 0 # 保留字节
|
||
]
|
||
|
||
# 记录写入命令的4-7字节
|
||
write_cmd_data[channel] = control_params[3:7]
|
||
|
||
# 发送写入命令
|
||
write_frame = self._build_frame(
|
||
cmd=SP_CMD_RGG_WR,
|
||
control_params=control_params,
|
||
voltage_data=[],
|
||
voltage_type=VOLTAGE_TYPE_SAME
|
||
)
|
||
if not self.uart_handler.send_data(write_frame):
|
||
logger.error(f"DAC{dac_num}通道{channel}写入失败")
|
||
success = False
|
||
continue
|
||
|
||
time.sleep(0.01) # 写入后短暂延迟
|
||
|
||
# ===== 读取操作 =====
|
||
regs = self.read_channel_registers(dev_index, dac_num, channel)
|
||
if regs is None:
|
||
logger.error(f"DAC{dac_num}通道{channel}读取失败")
|
||
success = False
|
||
continue
|
||
|
||
# 验证寄存器值
|
||
if (regs['R1'] != REG_DEFAULT_VALUES['R1'] or
|
||
regs['R2'] != REG_DEFAULT_VALUES['R2'] or
|
||
regs['R3'] != REG_DEFAULT_VALUES['R3'] or
|
||
regs['R4'] != REG_DEFAULT_VALUES['R4']):
|
||
logger.error(f"DAC{dac_num}通道{channel}校验失败: "
|
||
f"R1=0x{regs['R1']:02X}(期望0x{REG_DEFAULT_VALUES['R1']:02X}) "
|
||
f"R2=0x{regs['R2']:02X}(期望0x{REG_DEFAULT_VALUES['R2']:02X}) "
|
||
f"R3=0x{regs['R3']:02X}(期望0x{REG_DEFAULT_VALUES['R3']:02X}) "
|
||
f"R4=0x{regs['R4']:02X}(期望0x{REG_DEFAULT_VALUES['R4']:02X})")
|
||
success = False
|
||
|
||
read_data[channel] = regs
|
||
time.sleep(0.01) # 读写间隔
|
||
|
||
# 保存当前DAC的写入命令和读取结果
|
||
CSVLogger.write_register_write(dac_num, write_cmd_data)
|
||
CSVLogger.write_register_read(dac_num, read_data)
|
||
|
||
return success
|
||
|
||
|
||
"""CSV文件记录工具类"""
|
||
class CSVLogger:
|
||
@staticmethod
|
||
|
||
def write_register_read_extended(dac_num: int, data: Dict[int, Dict[str, int]]):
|
||
"""
|
||
记录寄存器读取结果到CSV(包含R1-R7)
|
||
格式示例:
|
||
通道1: 0x00,0xA0,0x28,0x00,0x00,0x00,0x00
|
||
"""
|
||
filename = f"DAC{dac_num}_reg_read_extended_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
||
os.makedirs('reg_logs', exist_ok=True)
|
||
|
||
with open(f'reg_logs/{filename}', 'w', newline='') as f:
|
||
writer = csv.writer(f)
|
||
# 更新表头包含R1-R7
|
||
writer.writerow(['Channel', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7'])
|
||
for channel, regs in data.items():
|
||
writer.writerow([
|
||
f'通道{channel}',
|
||
f"0x{regs['R1']:02X}",
|
||
f"0x{regs['R2']:02X}",
|
||
f"0x{regs['R3']:02X}",
|
||
f"0x{regs['R4']:02X}",
|
||
f"0x{regs['R5']:02X}", # 新增R5-R7
|
||
f"0x{regs['R6']:02X}",
|
||
f"0x{regs['R7']:02X}"
|
||
])
|
||
logger.info(f"DAC{dac_num}扩展寄存器读取结果已保存到{filename}")
|
||
|
||
def write_register_read(dac_num: int, data: Dict[int, Dict[str, int]]):
|
||
"""
|
||
记录寄存器读取结果到CSV
|
||
格式示例:
|
||
通道1: 0x00,0xA0,0x28,0x00
|
||
通道2: 0x01,0xA1,0x29,0x01
|
||
"""
|
||
filename = f"DAC{dac_num}_reg_read_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
||
os.makedirs('reg_logs', exist_ok=True)
|
||
|
||
with open(f'reg_logs/{filename}', 'w', newline='') as f:
|
||
writer = csv.writer(f)
|
||
writer.writerow(['Channel', 'R1', 'R2', 'R3', 'R4'])
|
||
for channel, regs in data.items():
|
||
writer.writerow([
|
||
f'通道{channel}',
|
||
f"0x{regs['R1']:02X}",
|
||
f"0x{regs['R2']:02X}",
|
||
f"0x{regs['R3']:02X}",
|
||
f"0x{regs['R4']:02X}"
|
||
])
|
||
logger.info(f"DAC{dac_num}寄存器读取结果已保存到{filename}")
|
||
|
||
@staticmethod
|
||
def write_register_write(dac_num: int, data: Dict[int, List[int]]):
|
||
"""
|
||
记录寄存器写入命令的4-7字节到CSV
|
||
格式示例:
|
||
通道1: 0x01,0x10,0x00,0xA0
|
||
通道2: 0x01,0x10,0x01,0xA1
|
||
"""
|
||
filename = f"写入DAC{dac_num}_reg_write_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
||
os.makedirs('reg_logs', exist_ok=True)
|
||
|
||
with open(f'reg_logs/{filename}', 'w', newline='') as f:
|
||
writer = csv.writer(f)
|
||
writer.writerow(['Channel', 'Byte4', 'Byte5', 'Byte6', 'Byte7'])
|
||
for channel, bytes_data in data.items():
|
||
writer.writerow([
|
||
f'通道{channel}',
|
||
f"0x{bytes_data[0]:02X}",
|
||
f"0x{bytes_data[1]:02X}",
|
||
f"0x{bytes_data[2]:02X}",
|
||
f"0x{bytes_data[3]:02X}"
|
||
])
|
||
logger.info(f"DAC{dac_num}寄存器写入命令已保存到{filename}")
|
||
|
||
if __name__ == "__main__":
|
||
# 创建DACHandler实例
|
||
handler = DACHandler()
|
||
|
||
# 定义回调函数
|
||
def invalid_cmd_callback(res: bool):
|
||
logger.info(f"无效命令结果: {res}")
|
||
|
||
def zero_callback(res: bool):
|
||
logger.info(f"归零结果: {res}")
|
||
|
||
def same_value_callback(res: bool):
|
||
logger.info(f"同值配置结果: {res}")
|
||
|
||
def diff_value_callback(res: bool):
|
||
logger.info(f"异值配置结果: {res}")
|
||
|
||
def rgg_wr_callback(res: bool):
|
||
logger.info(f"RGG_WR 命令结果: {res}")
|
||
|
||
def rgg_rd_callback(res: bool):
|
||
logger.info(f"RGG_RD 命令结果: {res}")
|
||
|
||
# 设置回调函数
|
||
handler.set_callback('set_invalid_cmd_res', invalid_cmd_callback)
|
||
handler.set_callback('set_zero_res', zero_callback)
|
||
handler.set_callback('set_same_value_res', same_value_callback)
|
||
handler.set_callback('set_diff_value_res', diff_value_callback)
|
||
handler.set_callback('set_rgg_wr_res', rgg_wr_callback)
|
||
handler.set_callback('set_rgg_rd_res', rgg_rd_callback)
|
||
|
||
# 开启调试模式
|
||
handler.debug(True)
|
||
|
||
try:
|
||
# 初始化USB设备(CH347串口设备)
|
||
devices = handler.init_usb_device()
|
||
if not devices:
|
||
logger.error("未找到 CH347 UART 设备")
|
||
exit(1)
|
||
|
||
|
||
# 执行写后立即读操作
|
||
# logger.info("=== 开始写后立即读操作 ===")
|
||
# if not handler.batch_write_then_read_all_registers(devices[0]):
|
||
# logger.error("写后立即读操作失败")
|
||
# else:
|
||
# logger.info("所有寄存器读写验证完成")
|
||
|
||
|
||
# # 1. 首次上电读取所有寄存器
|
||
# logger.info("=== 开始首次读取所有寄存器 ===")
|
||
# if not handler.batch_read_all_registers(devices[0]):
|
||
# logger.error("首次读取寄存器失败")
|
||
|
||
# 2. 写入默认寄存器值
|
||
logger.info("=== 开始写入默认寄存器值 ===")
|
||
if not handler.batch_write_all_registers(devices[0]):
|
||
logger.error("写入默认寄存器值失败")
|
||
|
||
# 3. 读取寄存器值
|
||
logger.info("=== 开始验证寄存器值 ===")
|
||
if not handler.batch_verify_all_registers(devices[0]):
|
||
logger.error("寄存器验证失败")
|
||
else:
|
||
logger.info("所有寄存器验证成功")
|
||
|
||
|
||
|
||
# 配置DAC5通道10的寄存器
|
||
handler.write_channel_registers(
|
||
dev_index=0,
|
||
dac_num=5,
|
||
channel=10,
|
||
reg_values={
|
||
'R1': 0x01, # 只修改R1
|
||
'R3': 0x30 # 修改R3
|
||
} # R2/R4保持默认
|
||
)
|
||
|
||
# 默认值
|
||
handler.write_channel_registers(
|
||
dev_index=0,
|
||
dac_num=5,
|
||
channel=10,
|
||
reg_values=REG_DEFAULT_VALUES
|
||
)
|
||
|
||
# 读取DAC5通道10的寄存器
|
||
regs = handler.read_channel_registers(
|
||
dev_index=0,
|
||
dac_num=5,
|
||
channel=10
|
||
)
|
||
print(f"DAC5通道10寄存器: {regs}")
|
||
|
||
# # 重置DAC5通道10为默认值
|
||
# handler.write_channel_registers(
|
||
# dev_index=0,
|
||
# dac_num=5,
|
||
# channel=10,
|
||
# reg_values={} # 不传参数即全部使用默认值
|
||
# )
|
||
|
||
|
||
|
||
|
||
|
||
# 测试电压配置
|
||
# dac_number = 1
|
||
# voltage_values = [512] * 11
|
||
# print(voltage_values)
|
||
# handler.self_test_voltage_config(devices[0], dac_number, voltage_values)
|
||
# time.sleep(2)
|
||
|
||
# 测试ADC触发
|
||
# dac_number = 1
|
||
# dac_channel = 1
|
||
# handler.self_test_adc_tri(devices[0], dac_number, dac_channel)
|
||
# time.sleep(2)
|
||
|
||
# 测试发送1024个电压值到OPA
|
||
# voltage_values_1024 = [v % 1024 for v in range(1024)] # 调整为10位值
|
||
# # voltage_values_1024 = [1023] * 1024
|
||
# handler.send_1024_voltages_to_opa(devices[0], opa_number=3, voltage_values=voltage_values_1024)
|
||
# time.sleep(2)
|
||
|
||
|
||
|
||
# file_path = r"D:\mjq\Hot_SIOPA\package\SPGDdata\VscanOutput\Vse_psi_0.00_theta_13.31_opa_1.txt"
|
||
# opa_number = 1
|
||
# # 发送文件中的电压值
|
||
# handler.send_voltages_from_file(devices[0], opa_number, file_path)
|
||
|
||
finally:
|
||
# 关闭设备
|
||
handler.stop() |