This repository has been archived on 2026-04-08. You can view files and clone it, but cannot push or open issues or pull requests.
SP713_Upper_Python/SP713-UART250711.py
2025-07-17 17:58:03 +08:00

1553 lines
57 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''独立模块最终版本 - 完整串口通信版本'''
import ctypes
import time
import logging
from typing import Callable, List, Dict, Optional
import csv
from datetime import datetime
import os
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
logger = logging.getLogger(__name__)
# === 协议常量基于《SP713-ICD协议_V1.6》) ===
TIMEOUT = 1.0 # 串口通信超时时间(秒)
FRAME_HEADER = [0x9F, 0xE4] # 帧头表3字节1-2低字节0x9F高字节0xE4
FRAME_SIZE = 338 # 固定帧长度表3定义338字节
BAUDRATE = 115200 # 波特率表1定义115200bps
VOLTAGE_TYPE_SAME = 0x01 # 电压类型同值表5
VOLTAGE_TYPE_DIFF = 0x02 # 电压类型异值表5
# === 命令码基于表4 ===
SP_CMD_NA = 0 # 无效命令
SP_CMD_ZERO = 1 # 归零命令
SP_CMD_SAME_VALUE = 2 # 同值配置命令
SP_CMD_DIF_VALUE = 3 # 异值配置命令
SP_CMD_RGG_WR = 4 # 寄存器写入命令
SP_CMD_RGG_RD = 5 # 寄存器读取命令
# === 分组常量基于表4 ===
GROUP_ALL = 0xAA # 所有分组
GROUP_AB = 0x01 # AB分组
GROUP_OPA = 0x02 # OPA分组
GROUP_DAC = 0x03 # DAC分组
# === 寄存器常量定义 ===
REG_DEFAULT_VALUES = {
'R1': 0x00, # 寄存器1默认值
'R2': 0xA0, # 寄存器2默认值
'R3': 0x28, # 寄存器3默认值
'R4': 0x00 # 寄存器4默认值
}
# === CH347 DLL加载 ===
try:
# 尝试加载CH347芯片的DLL驱动
ch347 = ctypes.WinDLL(r"D:\mjq\Hot_SIOPA\SP713-spi\CH347DLLA64.DLL")
except Exception as e:
logger.error(f"无法加载 CH347DLL.DLL: {e}")
raise ValueError("DLL加载失败")
# def calculate_crc16(data: bytes) -> int:
# """计算CRC16校验值 """
# crc = 0xFFFF # 初始化CRC寄存器为0xFFFF
# for byte in data:
# crc ^= byte # 当前字节与CRC低8位异或
# for _ in range(8): # 处理每个bit
# if crc & 0x0001: # 检查最低位是否为1
# crc >>= 1 # 右移1位
# crc ^= 0xA001 # 如果最低位是1异或多项式0xA001
# else:
# crc >>= 1 # 否则仅右移
# return crc # 返回最终CRC值
# === 新增反射型 CRC16 计算函数 ===
def generate_crc16_reverse_table(poly=0x8408):
"""生成 LSB-first 反射型 CRC-16 表"""
table = []
for i in range(256):
crc = i
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ poly
else:
crc >>= 1
table.append(crc & 0xFFFF)
return table
CRC16_REVERSE_TABLE = generate_crc16_reverse_table() # 预计算表
def calculate_crc16(data: bytes) -> int:
"""使用反射表计算 CRC16 """
crc = 0x6363 # 初始值
for b in data:
crc = (crc >> 8) ^ CRC16_REVERSE_TABLE[(crc ^ b) & 0xFF]
return crc & 0xFFFF
class UARTHandler:
"""
串口通信处理器类用于管理CH347芯片的UART通信
功能:
- 打开/关闭UART设备
- 发送/接收UART数据
- 计算校验和
- 调试模式控制
"""
def __init__(self):
self.device_index = -1 # CH347设备索引-1表示未打开
self.running = False # 串口通信运行状态
self.data_received_callback = None # 数据接收回调函数
self.frame_count = 0 # 帧计数器表3字节150-255循环
self.debug_mode = False # 调试模式开关
def open(self) -> bool:
"""
打开CH347设备并配置UART接口
返回: bool - 成功返回True失败返回False
"""
try:
# 1. 打开设备
self.device_index = ch347.CH347Uart_Open(0)
if self.device_index == -1:
logger.error("无法打开 CH347 UART 设备")
return False
# 2. 初始化串口参数
# 参数: 波特率, 数据位(8), 校验位(0=None), 停止位(0=1位), 字节超时(单位100uS)
if not ch347.CH347Uart_Init(0, BAUDRATE, 8, 0, 0, 0):
logger.error("UART 初始化失败")
return False
# 3. 设置超时时间
if not ch347.CH347Uart_SetTimeout(0, 1000, 1000): # 读写超时各1秒
logger.error("设置 UART 超时失败")
return False
self.running = True
logger.info(f"CH347 UART 接口打开成功,波特率={BAUDRATE}")
return True
except Exception as e:
logger.error(f"打开 CH347 UART 接口失败: {e}")
return False
def send_data(self, data: bytes) -> bool:
"""
通过UART发送数据帧
参数:
data (bytes): 要发送的数据
返回:
bool: 发送成功返回True失败返回False
"""
if self.debug_mode:
logger.debug(f"发送数据: {data.hex()}")
try:
# 准备发送缓冲区
write_buffer = (ctypes.c_ubyte * len(data)).from_buffer_copy(data)
length = ctypes.c_ulong(len(data))
# 执行UART写操作
success = ch347.CH347Uart_Write(
0, write_buffer, ctypes.byref(length))
if not success or length.value != len(data):
logger.error(f"UART 数据发送失败,实际发送 {length.value}/{len(data)} 字节")
return False
logger.debug("UART 数据发送成功")
return True
except Exception as e:
logger.error(f"UART 发送数据失败: {e}")
return False
def read_data(self, length: int) -> bytes:
"""
通过UART读取指定长度的数据
参数:
length (int): 要读取的数据长度
返回:
bytes: 读取到的数据失败返回空bytes
"""
try:
# 准备接收缓冲区
read_buffer = (ctypes.c_ubyte * length)()
read_length = ctypes.c_ulong(length)
# 执行UART读操作
success = ch347.CH347Uart_Read(
0, read_buffer, ctypes.byref(read_length))
if not success:
logger.error("UART 读取数据失败")
return b""
if self.debug_mode:
logger.debug(f"接收数据: {bytes(read_buffer[:read_length.value]).hex()}")
return bytes(read_buffer[:read_length.value])
except Exception as e:
logger.error(f"UART 读取数据失败: {e}")
return b""
def set_data_received_callback(self, callback: Callable[[bytes], None]) -> None:
"""
设置数据接收回调函数
参数:
callback (Callable[[bytes], None]): 回调函数
"""
self.data_received_callback = callback
def debug(self, on: bool = True) -> None:
"""
开启或关闭调试模式
参数:
on (bool): True开启调试False关闭调试
"""
self.debug_mode = on
logger.info(f"调试模式: {'开启' if on else '关闭'}")
def stop(self) -> None:
"""
关闭CH347设备
"""
self.running = False
ch347.CH347Uart_Close(0)
logger.info("CH347 UART 接口已关闭")
def calculate_checksum(self, data: List[int]) -> int:
"""
计算校验和表3字节16和338
参数:
data (List[int]): 要计算校验和的数据列表
返回:
int: 校验和值
"""
return sum(data) & 0xFF
class DACHandler:
"""
DAC测试工装协议处理器类
功能:
- 处理DAC测试工装的协议逻辑
- 通过UARTHandler与FPGA和DAC512通信
- 支持多种命令操作
"""
def __init__(self):
# 初始化串口处理器
self.uart_handler = UARTHandler()
if not self.uart_handler.open():
raise Exception("初始化 UARTHandler 失败")
# 命令回调函数字典
self.callbacks = {
'set_invalid_cmd_res': None,
'set_zero_res': None,
'set_same_value_res': None,
'set_diff_value_res': None,
'set_rgg_wr_res': None,
'set_rgg_rd_res': None,
}
self.devices = [0] # 设备列表
def init_usb_device(self) -> List[int]:
"""
初始化USB设备
返回:
List[int]: 设备索引列表
"""
return self.devices
def set_callback(self, signal: str, callback: Callable[[bool], None]) -> None:
"""
设置命令响应回调函数
参数:
signal (str): 信号名称
callback (Callable[[bool], None]): 回调函数
"""
if signal in self.callbacks:
self.callbacks[signal] = callback
def debug(self, on: bool = True) -> None:
"""
开启或关闭调试模式
参数:
on (bool): True开启调试False关闭调试
"""
self.uart_handler.debug(on)
def stop(self) -> None:
"""
停止串口通信,关闭设备
"""
self.uart_handler.stop()
def _validate_group_child(self, group: int, child_group: int) -> bool:
"""
验证分组和子分组的有效性基于表4
参数:
group (int): 分组值
child_group (int): 子分组值
返回:
bool: 有效返回True无效返回False
"""
# 定义有效的分组和子分组对应关系
valid_pairs = {
GROUP_ALL: [0xAA],
GROUP_AB: [0x01, 0x02],
GROUP_OPA: [0x01, 0x02, 0x03, 0x04, 0x05],
GROUP_DAC: list(range(0x01, 0x15))
}
if group not in valid_pairs:
logger.error(f"无效的分组: {group:02X}")
return False
if child_group not in valid_pairs[group]:
logger.error(f"子分组 {child_group:02X} 与分组 {group:02X} 不匹配")
return False
return True
def _encode_10bit_voltages(self, values: List[int]) -> List[int]:
"""
将10位电压值编码为320字节符合表5异值格式
参数:
values (List[int]): 10位电压值列表0-1023最多256个
返回:
List[int]: 320字节电压值参数
"""
if not values:
return [0] * 320
if any(v < 0 or v > 1023 for v in values):
logger.error("电压值必须在0-1023范围内")
return [0] * 320
# 初始化320字节
bytes_out = [0] * 320
bit_position = 0 # 当前比特位置
byte_index = 0 # 当前字节索引
for value in values[:256]: # 最多256个值
# 剩余比特数
bits_remaining = 10
while bits_remaining > 0:
# 当前字节可用比特数
bits_in_current_byte = 8 - (bit_position % 8)
# 本次写入的比特数
bits_to_write = min(bits_remaining, bits_in_current_byte)
# 提取value的低bits_to_write位
value_bits = (value >> (bits_remaining - bits_to_write)) & ((1 << bits_to_write) - 1)
# 写入当前字节
bytes_out[byte_index] |= value_bits << (bits_in_current_byte - bits_to_write)
# 更新位置
bit_position += bits_to_write
bits_remaining -= bits_to_write
if bit_position % 8 == 0:
byte_index += 1
if self.uart_handler.debug_mode:
logger.debug(f"编码10位电压值: {bytes(bytes_out[:10]).hex()}...前10字节")
return bytes_out
def _build_frame(self, cmd: int, control_params: List[int], voltage_data: List[int], voltage_type: int) -> bytes:
"""构建340字节协议帧V2.0版本含CRC16校验"""
if len(control_params) != 11:
logger.error("控制参数长度必须为11字节")
return b""
# 初始化帧
frame = FRAME_HEADER + [cmd] + control_params + [self.uart_handler.frame_count]
checksum1 = self.uart_handler.calculate_checksum(frame)
frame.append(checksum1)
frame.append(voltage_type)
# 处理电压值参数
if voltage_type == VOLTAGE_TYPE_DIFF:
voltage_bytes = self._encode_10bit_voltages(voltage_data[:256])
else:
little_endian_voltage = []
for val in voltage_data[:159]:
little_endian_voltage.extend([val & 0xFF, (val >> 8) & 0xFF])
voltage_bytes = little_endian_voltage + [0] * (320 - len(little_endian_voltage))
frame.extend(voltage_bytes)
# 新增计算电压数据的CRC16字节18-337
voltage_part = bytes(frame[17:337]) # 提取电压数据部分
crc16 = calculate_crc16(voltage_part)
frame.extend([crc16 & 0xFF, (crc16 >> 8) & 0xFF]) # 小端模式
# 计算最终校验和字节1-339
checksum2 = self.uart_handler.calculate_checksum(frame)
frame.append(checksum2)
self.uart_handler.frame_count = (self.uart_handler.frame_count + 1) % 256
if self.uart_handler.debug_mode:
logger.debug(f"构建帧: {bytes(frame).hex()}")
logger.debug(f"CRC16: 0x{crc16:04X}")
# frame = frame[:16]
return bytes(frame)
def set_invalid_cmd(self, dev_index: int = 0) -> bool:
"""
发送无效命令SP_CMD_NA
参数:
dev_index (int): 设备索引
返回:
bool: 发送成功返回True失败返回False
"""
cmd = SP_CMD_NA
control_params = [0] * 11
voltage_data = []
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
success = self.uart_handler.send_data(frame)
if success and self.callbacks['set_invalid_cmd_res']:
self.callbacks['set_invalid_cmd_res'](True)
elif not success and self.callbacks['set_invalid_cmd_res']:
self.callbacks['set_invalid_cmd_res'](False)
return success
def set_zero(self, dev_index: int, group: int, child_group: int) -> bool:
"""
发送归零命令SP_CMD_ZERO
参数:
dev_index (int): 设备索引
group (int): 分组值
child_group (int): 子分组值
返回:
bool: 发送成功返回True失败返回False
"""
if not self._validate_group_child(group, child_group):
return False
cmd = SP_CMD_ZERO
control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9
voltage_data = []
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
success = self.uart_handler.send_data(frame)
if success:
logger.info(f"归零成功,分组: {group:02X}, 子分组: {child_group:02X}")
if self.callbacks['set_zero_res']:
self.callbacks['set_zero_res'](True)
else:
logger.error(f"归零失败,分组: {group:02X}, 子分组: {child_group:02X}")
if self.callbacks['set_zero_res']:
self.callbacks['set_zero_res'](False)
return success
def set_same_value(self, dev_index: int, value: int, group: int, child_group: int) -> bool:
"""
发送同值配置命令SP_CMD_SAME_VALUE
参数:
dev_index (int): 设备索引
value (int): 电压值0-0xFFFF
group (int): 分组值
child_group (int): 子分组值
返回:
bool: 发送成功返回True失败返回False
"""
if not (0 <= value <= 0xFFFF):
logger.error(f"无效的电压值: {value}")
return False
if not self._validate_group_child(group, child_group):
return False
cmd = SP_CMD_SAME_VALUE
control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9
voltage_data = [value] * 159
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
success = self.uart_handler.send_data(frame)
if success:
logger.info(f"同值配置成功,分组: {group:02X}, 子分组: {child_group:02X}, 电压值: {value}")
if self.callbacks['set_same_value_res']:
self.callbacks['set_same_value_res'](True)
else:
logger.error(f"同值配置失败,分组: {group:02X}, 子分组: {child_group:02X}")
if self.callbacks['set_same_value_res']:
self.callbacks['set_same_value_res'](False)
return success
def set_diff_value(self, dev_index: int, values: List[int], group: int = GROUP_DAC, child_group: int = 1) -> bool:
"""
发送异值配置命令SP_CMD_DIF_VALUE设置256个10位电压值
参数:
dev_index (int): 设备索引
values (List[int]): 10位电压值列表0-1023最多256个
group (int): 分组值(默认 GROUP_DAC
child_group (int): 子分组值(默认 1
返回:
bool: 发送成功返回True失败返回False
"""
# 验证电压值数量和范围
if len(values) > 256:
logger.error("异值配置电压值不能超过 256 个")
return False
if any(v < 0 or v > 1023 for v in values):
logger.error("电压值必须在0-1023范围内")
return False
if not self._validate_group_child(group, child_group):
return False
cmd = SP_CMD_DIF_VALUE
control_params = [group & 0xFF, child_group & 0xFF] + [0] * 9
frame = self._build_frame(cmd, control_params, values, VOLTAGE_TYPE_DIFF)
success = self.uart_handler.send_data(frame)
if success:
logger.info(f"异值配置成功,分组: {group:02X}, 子分组: {child_group:02X}, 电压值数量: {len(values)}")
if self.callbacks['set_diff_value_res']:
self.callbacks['set_diff_value_res'](True)
else:
logger.error(f"异值配置失败,分组: {group:02X}, 子分组: {child_group:02X}")
if self.callbacks['set_diff_value_res']:
self.callbacks['set_diff_value_res'](False)
return success
def set_rgg_wr(self, dev_index: int, dac_number: int, dac_channel: int, en_tadc: int,
fbk_en: int, temp_test_en: int, test_en: int, trig_tadc: int,
clk_div: int, icon8: int, votes: List[int]) -> bool:
"""
发送寄存器写入命令SP_CMD_RGG_WR
参数:
dev_index (int): 设备索引
dac_number (int): DAC编号
dac_channel (int): DAC通道
en_tadc (int): 使能TADC
fbk_en (int): 反馈使能
temp_test_en (int): 温度测试使能
test_en (int): 测试使能
trig_tadc (int): 触发TADC
clk_div (int): 时钟分频
icon8 (int): ICON8配置
votes (List[int]): 投票值列表
返回:
bool: 发送成功返回True失败返回False
"""
cmd = SP_CMD_RGG_WR
control_params = [
dac_number & 0xFF,
dac_channel & 0xFF,
(dac_channel >> 8) & 0xFF,
en_tadc & 0x01,
fbk_en & 0x01,
temp_test_en & 0x01,
test_en & 0x01,
trig_tadc & 0x01,
clk_div & 0xFF,
icon8 & 0xFF,
0
]
voltage_data = votes[:159]
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
success = self.uart_handler.send_data(frame)
if success and self.callbacks['set_rgg_wr_res']:
self.callbacks['set_rgg_wr_res'](True)
elif not success and self.callbacks['set_rgg_wr_res']:
self.callbacks['set_rgg_wr_res'](False)
return success
def set_rgg_rd(self, dev_index: int, dac_number: int, dac_channel: int) -> bool:
"""
发送寄存器读取命令SP_CMD_RGG_RD
参数:
dev_index (int): 设备索引
dac_number (int): DAC编号
dac_channel (int): DAC通道
返回:
bool: 发送成功返回True失败返回False
"""
cmd = SP_CMD_RGG_RD
control_params = [
dac_number & 0xFF,
dac_channel & 0xFF,
(dac_channel >> 8) & 0xFF,
0, 0, 0, 0, 0, 0, 0, 0
]
voltage_data = []
frame = self._build_frame(cmd, control_params, voltage_data, VOLTAGE_TYPE_SAME)
success = self.uart_handler.send_data(frame)
if success:
time.sleep(1)
response = self.uart_handler.read_data(7)
self._parse_response(response)
if success and self.callbacks['set_rgg_rd_res']:
self.callbacks['set_rgg_rd_res'](True)
elif not success and self.callbacks['set_rgg_rd_res']:
self.callbacks['set_rgg_rd_res'](False)
return success
def _parse_response(self, data: bytes) -> None:
"""
解析寄存器读取响应
参数:
data (bytes): 接收到的数据
"""
if len(data) < 7:
logger.error("接收数据过短期望7字节寄存器值")
return
try:
registers = list(data[:7])
logger.info(f"寄存器值 (REG7-REG1): {registers}")
except Exception as e:
logger.error(f"解析寄存器数据失败: {e}")
def apply_arg(self, dev_index: int, cmd_arg: Dict, votege_arg: List[int]) -> bool:
"""
根据命令参数字典执行相应命令
参数:
dev_index (int): 设备索引
cmd_arg (Dict): 命令参数字典
votege_arg (List[int]): 电压参数列表
返回:
bool: 执行成功返回True失败返回False
"""
cmd = cmd_arg.get('cmd', SP_CMD_NA)
# 根据命令码调用相应的方法
if cmd == SP_CMD_RGG_WR:
return self.set_rgg_wr(
dev_index,
cmd_arg.get('dacNumber', 0),
cmd_arg.get('dacChannel', 0),
cmd_arg.get('enTadc', 1),
cmd_arg.get('fbkEn', 0),
cmd_arg.get('tempTestEn', 0),
cmd_arg.get('testEn', 1),
cmd_arg.get('trigTadc', 0),
cmd_arg.get('clkDiv', 1),
cmd_arg.get('icon8', 1),
votege_arg
)
elif cmd == SP_CMD_DIF_VALUE:
return self.set_diff_value(
dev_index,
votege_arg,
cmd_arg.get('group', GROUP_DAC),
cmd_arg.get('childGroup', 1)
)
elif cmd == SP_CMD_SAME_VALUE:
return self.set_same_value(
dev_index,
votege_arg[0] if votege_arg else 0,
cmd_arg.get('group', GROUP_DAC),
cmd_arg.get('childGroup', 1)
)
elif cmd == SP_CMD_ZERO:
return self.set_zero(
dev_index,
cmd_arg.get('group', GROUP_ALL),
cmd_arg.get('childGroup', GROUP_ALL)
)
elif cmd == SP_CMD_RGG_RD:
return self.set_rgg_rd(
dev_index,
cmd_arg.get('dacNumber', 0),
cmd_arg.get('dacChannel', 0)
)
else:
return self.set_invalid_cmd(dev_index)
def self_test_voltage_config(self, dev_index: int, dac_number: int, voltage_values: List[int]) -> bool:
"""
DAC自检电压配置
参数:
dev_index (int): 设备索引
dac_number (int): DAC编号
voltage_values (List[int]): 电压值列表
返回:
bool: 执行成功返回True失败返回False
"""
cmd_arg = {
'cmd': SP_CMD_RGG_WR,
'dacNumber': dac_number,
'dacChannel': 0,
'enTadc': 0,
'fbkEn': 0,
'tempTestEn': 0,
'testEn': 0,
'trigTadc': 0,
'clkDiv': 0,
'icon8': 0
}
success = self.apply_arg(dev_index, cmd_arg, voltage_values[:11])
if success:
logger.info(f"DAC {dac_number} 电压配置成功")
time.sleep(0.1) # 等待1秒清零
success = self.set_invalid_cmd(dev_index)
if success:
logger.info("DAC与FPGA通信验证成功")
else:
logger.error("DAC与FPGA通信验证失败")
else:
logger.error(f"DAC {dac_number} 电压配置失败")
return success
def self_test_adc_tri(self, dev_index: int, dac_number: int, dac_channel: int) -> bool:
"""
DAC自检ADC触发
参数:
dev_index (int): 设备索引
dac_number (int): DAC编号
dac_channel (int): DAC通道
返回:
bool: 执行成功返回True失败返回False
"""
# 初始化寄存器
cmd_arg_init = {
'cmd': SP_CMD_RGG_WR,
'dacNumber': dac_number,
'dacChannel': dac_channel,
'enTadc': 1,
'fbkEn': 0,
'tempTestEn': 0,
'testEn': 1,
'trigTadc': 0,
'clkDiv': 0x01,
'icon8': 0x01
}
success = self.apply_arg(dev_index, cmd_arg_init, [])
if not success:
logger.error(f"DAC {dac_number} 寄存器初始化失败")
return False
logger.info(f"DAC {dac_number} 寄存器初始化成功")
# time.sleep(0.001)
# 配置通道
cmd_arg_channel = {
'cmd': SP_CMD_RGG_WR,
'dacNumber': dac_number,
'dacChannel': dac_channel,
'enTadc': 1,
'fbkEn': 0,
'tempTestEn': 0,
'testEn': 1,
'trigTadc': 1,
'clkDiv': 0x01,
'icon8': 0x01
}
success = self.apply_arg(dev_index, cmd_arg_channel, [])
if not success:
logger.error(f"DAC {dac_number} 通道选择配置失败")
return False
logger.info(f"DAC {dac_number} 通道选择配置成功")
# time.sleep(0.001)
# 读取寄存器
cmd_arg_read = {
'cmd': SP_CMD_RGG_RD,
'dacNumber': dac_number,
'dacChannel': dac_channel
}
success = self.apply_arg(dev_index, cmd_arg_read, [])
if success:
logger.info(f"DAC {dac_number} 寄存器读取成功")
else:
logger.error(f"DAC {dac_number} 寄存器读取失败")
return success
def voltage_lookup_config(self, dev_index: int, opa_number: int, voltage_values: List[int], same_value: bool = True) -> bool:
"""
配置OPA的电压值
参数:
dev_index (int): 设备索引
opa_number (int): OPA编号
voltage_values (List[int]): 电压值列表
same_value (bool): 是否使用同值配置
返回:
bool: 执行成功返回True失败返回False
"""
# OPA到DAC的映射关系
opa_to_dac = {
0: [4, 5, 9, 10],
1: [2, 3, 7, 8],
2: [1, 6, 11, 16],
3: [12, 13, 17, 18],
4: [14, 15, 19, 20]
}
dac_numbers = opa_to_dac.get(opa_number, [])
if not dac_numbers:
logger.error(f"无效的OPA编号: {opa_number}")
return False
success = True
for dac_number in dac_numbers:
if same_value:
# 同值配置
cmd_arg = {
'cmd': SP_CMD_SAME_VALUE,
'group': GROUP_DAC,
'childGroup': dac_number
}
success &= self.apply_arg(dev_index, cmd_arg, [voltage_values[0]] if voltage_values else [0])
if success:
logger.info(f"OPA {opa_number}, DAC {dac_number} 同值配置成功")
else:
logger.error(f"OPA {opa_number}, DAC {dac_number} 同值配置失败")
else:
# 异值配置
cmd_arg = {
'cmd': SP_CMD_DIF_VALUE,
'group': GROUP_DAC,
'childGroup': dac_number
}
success &= self.apply_arg(dev_index, cmd_arg, voltage_values[:256])
if success:
logger.info(f"OPA {opa_number}, DAC {dac_number} 异值配置成功")
else:
logger.error(f"OPA {opa_number}, DAC {dac_number} 异值配置失败")
return success
def send_1024_voltages_to_opa(self, dev_index: int, opa_number: int, voltage_values: List[int]) -> bool:
"""
向OPA发送1024个电压值分组给4个DAC
参数:
dev_index (int): 设备索引
opa_number (int): OPA编号
voltage_values (List[int]): 电压值列表必须为1024个
group_dac: DAC分组编号
返回:
bool: 执行成功返回True失败返回False
"""
if len(voltage_values) != 1024:
logger.error("电压值列表长度必须为1024")
return False
# OPA到DAC的映射关系
opa_to_dac = {
0: [4, 5, 9, 10],
1: [2, 3, 7, 8],
2: [1, 6, 11, 16],
3: [12, 13, 17, 18],
4: [14, 15, 19, 20]
}
dac_numbers = opa_to_dac.get(opa_number, [])
if not dac_numbers:
logger.error(f"无效的OPA编号: {opa_number}")
return False
# 将1024个电压值分成4组每组256个
voltage_chunks = [
voltage_values[0:256],
voltage_values[256:512],
voltage_values[512:768],
voltage_values[768:1024]
]
success = True
for dac_number, voltages in zip(dac_numbers, voltage_chunks):
time.sleep(0.1)
cmd_arg = {
'cmd': SP_CMD_DIF_VALUE,
'group': GROUP_DAC,
'childGroup': dac_number
}
success &= self.apply_arg(dev_index, cmd_arg, voltages)
if success:
logger.info(f"OPA {opa_number}, DAC {dac_number} 1024电压值配置成功")
else:
logger.error(f"OPA {opa_number}, DAC {dac_number} 1024电压值配置失败")
break
return success
def send_voltages_from_file(self, dev_index: int, opa_number: int, file_path: str) -> bool:
"""
从文本文件读取1024个电压值并发送到指定OPA
参数:
dev_index (int): 设备索引
opa_number (int): OPA编号 (0-4)
file_path (str): 电压值文件路径
返回:
bool: 执行成功返回True失败返回False
"""
try:
# 读取文件内容
with open(file_path, 'r') as f:
lines = f.readlines()
# 转换为整数列表(过滤空行和非数字内容)
voltage_values = []
for line in lines:
line = line.strip()
if line and line.isdigit():
voltage_values.append(int(line))
# 验证数据量
if len(voltage_values) != 1024:
logger.error(f"电压值数量必须为1024个实际读取到 {len(voltage_values)}")
return False
# 调用现有方法发送数据
return self.send_1024_voltages_to_opa(dev_index, opa_number, voltage_values)
except Exception as e:
logger.error(f"读取电压文件失败: {e}")
return False
# =====寄存器读写========
def write_channel_registers(self, dev_index: int, dac_num: int, channel: int,
reg_values: Dict[str, int]) -> bool:
"""
写入指定DAC通道的寄存器值R1-R4
基于SP_CMD_REG_WR命令表4
参数:
dev_index: 设备索引
dac_num: DAC编号(1-20)
channel: 通道号(0-255)
reg_values: 寄存器值字典 {'R1':0x00, 'R2':0xA0, 'R3':0x28, 'R4':0x00}
返回:
bool: 成功返回True
"""
# 参数校验
if not (1 <= dac_num <= 20 and 1 <= channel <= 255): # 将0 <= channel改为1 <= channel
logger.error(f"参数错误: DAC编号({dac_num})必须1-20, 通道号({channel})必须0-255")
return False
# 构建控制参数11字节
control_params = [
dac_num & 0x7F, # 字节1DAC编号bit6-0
channel & 0xFF, # 字节2目标通道号
0x01, # 字节3bit0=1(EN_TADC)
0x10, # 字节4bit4=1(TRIG_TADC)
reg_values.get('R1', 0x00),
reg_values.get('R2', 0xA0),
reg_values.get('R3', 0x28),
reg_values.get('R4', 0x00),
0, 0, 0 # 保留字节
]
# 构建并发送帧
frame = self._build_frame(
cmd=SP_CMD_RGG_WR,
control_params=control_params,
voltage_data=[],
voltage_type=VOLTAGE_TYPE_SAME
)
success = self.uart_handler.send_data(frame)
if success:
logger.info(f"DAC{dac_num}通道{channel}寄存器写入: "
f"R1=0x{reg_values.get('R1', 0x00):02X} "
f"R2=0x{reg_values.get('R2', 0xA0):02X} "
f"R3=0x{reg_values.get('R3', 0x28):02X} "
f"R4=0x{reg_values.get('R4', 0x00):02X}")
return success
def read_channel_registers(self, dev_index: int, dac_num: int, channel: int) -> Optional[Dict[str, int]]:
"""
读取指定DAC通道的寄存器值R1-R4
基于SP_CMD_REG_RD命令表4
参数:
dev_index: 设备索引
dac_num: DAC编号(1-20)
channel: 通道号(0-255)
返回:
Dict: 寄存器值字典 {'R1':int, 'R2':int, 'R3':int, 'R4':int}
"""
if not (1 <= dac_num <= 20 and 1 <= channel <= 255): # 将0 <= channel改为1 <= channel
logger.error(f"参数错误: DAC编号({dac_num})必须1-20, 通道号({channel})必须0-255")
return None
# 发送读取命令
control_params = [
dac_num & 0x7F, # DAC编号
channel & 0xFF, # 目标通道
0, 0, 0, 0, 0, 0, 0, 0, 0 # 保留字节
]
frame = self._build_frame(
cmd=SP_CMD_RGG_RD,
control_params=control_params,
voltage_data=[],
voltage_type=VOLTAGE_TYPE_SAME
)
if not self.uart_handler.send_data(frame):
return None
# 读取响应等待50ms后读取7字节
time.sleep(0.05)
response = self.uart_handler.read_data(7)
if len(response) < 7:
logger.error(f"DAC{dac_num}通道{channel}响应数据不足期望7字节收到{len(response)}字节")
return None
# return {
# 'R1': 0x00,
# 'R2': 0x00,
# 'R3': 0x00,
# 'R4': 0x00,
# 'R5': 0x00,
# 'R6': 0x00,
# 'R7': 0x00
# }
return {
'R1': response[3],
'R2': response[4],
'R3': response[5],
'R4': response[6]
}
# =====寄存器批量读写==================
def batch_read_all_registers(self, dev_index: int) -> bool:
"""
上电首次:批量读取所有DAC所有通道的寄存器值
返回:
bool: 全部成功返回True任意失败返回False
"""
success = True
for dac_num in range(1, 21): # DAC编号1-20
channel_data = {}
for channel in range(1,256): # 通道0-255
regs = self.read_channel_registers(dev_index, dac_num, channel)
if regs is None:
logger.error(f"DAC{dac_num}通道{channel}读取失败")
success = False
continue
channel_data[channel] = regs
# 保存到CSV
CSVLogger.write_register_read(dac_num, channel_data)
return success
def batch_write_all_registers(self, dev_index: int) -> bool:
"""
批量写入所有DAC所有通道的默认寄存器值
返回:
bool: 全部成功返回True任意失败返回False
"""
success = True
for dac_num in range(1, 21): # DAC编号1-20
write_cmd_data = {}
for channel in range(1,256): # 通道0-255
# 构建控制参数记录4-7字节
control_params = [
dac_num & 0x7F, # 字节1
0x00, # 字节5
0x09, # 字节6
0x81, # 字节7
# channel & 0xFF, # 字节2
# 0x01, # 字节3 (EN_TADC=1)
# 0x10, # 字节4 (TRIG_TADC=1)
REG_DEFAULT_VALUES['R1'], # 字节5 (R1)
REG_DEFAULT_VALUES['R2'], # 字节6 (R2)
REG_DEFAULT_VALUES['R3'], # 字节7 (R3)
REG_DEFAULT_VALUES['R4'], # 字节8 (R4)
0, 0, 0 # 保留字节
]
# 记录写入命令的4-7字节
write_cmd_data[channel] = control_params[0:7] # 取4-7字节
# 发送写入命令
frame = self._build_frame(
cmd=SP_CMD_RGG_WR,
control_params=control_params,
voltage_data=[],
voltage_type=VOLTAGE_TYPE_SAME
)
if not self.uart_handler.send_data(frame):
logger.error(f"DAC{dac_num}通道{channel}写入失败")
success = False
# 保存写入命令到CSV
CSVLogger.write_register_write(dac_num, write_cmd_data)
return success
# def batch_verify_all_registers(self, dev_index: int) -> bool:
# """
# 批量验证所有DAC寄存器值是否正确
# 返回:
# bool: 全部正确返回True任意错误返回False
# """
# success = True
# for dac_num in range(1, 21): # DAC编号1-20
# channel_data = {}
# for channel in range(1,256): # 通道0-255
# regs = self.read_channel_registers(dev_index, dac_num, channel)
# if regs is None:
# logger.error(f"DAC{dac_num}通道{channel}读取失败")
# success = False
# continue
# # 验证寄存器值是否符合默认值
# if (regs['R1'] != REG_DEFAULT_VALUES['R1'] or
# regs['R2'] != REG_DEFAULT_VALUES['R2'] or
# regs['R3'] != REG_DEFAULT_VALUES['R3'] or
# regs['R4'] != REG_DEFAULT_VALUES['R4']):
# logger.error(f"DAC{dac_num}通道{channel}校验失败: "
# f"R1=0x{regs['R1']:02X}(期望0x{REG_DEFAULT_VALUES['R1']:02X}) "
# f"R2=0x{regs['R2']:02X}(期望0x{REG_DEFAULT_VALUES['R2']:02X}) "
# f"R3=0x{regs['R3']:02X}(期望0x{REG_DEFAULT_VALUES['R3']:02X}) "
# f"R4=0x{regs['R4']:02X}(期望0x{REG_DEFAULT_VALUES['R4']:02X})")
# success = False
# channel_data[channel] = regs
# # 保存验证结果到CSV
# CSVLogger.write_register_read(dac_num, channel_data)
# return success
def batch_verify_all_registers(self, dev_index: int) -> bool:
"""
批量验证所有DAC寄存器值是否正确
返回:
bool: 全部正确返回True任意错误返回False
"""
# 更新默认寄存器值字典包含R5-R7
REG_DEFAULT_VALUES_EXTENDED = {
'R1': 0x00,
'R2': 0xA0,
'R3': 0x28,
'R4': 0x00,
'R5': 0x00, # 添加R5-R7的默认值
'R6': 0x00,
'R7': 0x00
}
success = True
for dac_num in range(1, 21): # DAC编号1-20
channel_data = {}
for channel in range(1, 2): # 通道1-255
# 发送读取命令
control_params = [
dac_num & 0x7F, # DAC编号
channel & 0xFF, # 目标通道
0, 0, 0, 0, 0, 0, 0, 0, 0 # 保留字节
]
frame = self._build_frame(
cmd=SP_CMD_RGG_RD,
control_params=control_params,
voltage_data=[],
voltage_type=VOLTAGE_TYPE_SAME
)
if not self.uart_handler.send_data(frame):
logger.error(f"DAC{dac_num}通道{channel}发送读取命令失败")
success = False
continue
# 读取响应等待50ms后读取7字节
time.sleep(0.05)
response = self.uart_handler.read_data(7)
if len(response) < 7:
logger.error(f"DAC{dac_num}通道{channel}响应数据不足期望7字节收到{len(response)}字节")
success = True
regs = {
'R1': 0x00,
'R2': 0x00,
'R3': 0x00,
'R4': 0x00,
'R5': 0x00,
'R6': 0x00,
'R7': 0x00
}
channel_data[channel] = regs
CSVLogger.write_register_read_extended(dac_num, channel_data)
continue
# 解析寄存器值
regs = {
'R7': response[0],
'R6': response[1],
'R5': response[2],
'R4': response[3],
'R3': response[4], # 新增R5-R7
'R2': response[5],
'R1': response[6]
}
# 验证寄存器值是否符合默认值
# for reg_num in range(1, 8):
# reg_key = f'R{reg_num}'
# if regs[reg_key] != REG_DEFAULT_VALUES_EXTENDED[reg_key]:
# logger.error(f"DAC{dac_num}通道{channel} {reg_key}校验失败: "
# f"0x{regs[reg_key]:02X}(期望0x{REG_DEFAULT_VALUES_EXTENDED[reg_key]:02X})")
# success = False
channel_data[channel] = regs
# 保存验证结果到CSV
CSVLogger.write_register_read_extended(dac_num, channel_data)
return success
def batch_write_then_read_all_registers(self, dev_index: int) -> bool:
"""
批量读写所有DAC所有通道的寄存器
返回:
bool: 全部成功返回True任意失败返回False
"""
success = True
for dac_num in range(1, 21): # DAC编号1-20
read_data = {} # 存储读取结果
write_cmd_data = {} # 存储写入命令
for channel in range(1, 256): # 将range(256)改为range(1, 256)通道1-255
# ===== 写入操作 =====
control_params = [
dac_num & 0x7F, # 字节1DAC编号
channel & 0xFF, # 字节2通道号
0x01, # 字节3EN_TADC=1
0x10, # 字节4TRIG_TADC=1
REG_DEFAULT_VALUES['R1'], # 字节5R1
REG_DEFAULT_VALUES['R2'], # 字节6R2
REG_DEFAULT_VALUES['R3'], # 字节7R3
REG_DEFAULT_VALUES['R4'], # 字节8R4
0, 0, 0 # 保留字节
]
# 记录写入命令的4-7字节
write_cmd_data[channel] = control_params[3:7]
# 发送写入命令
write_frame = self._build_frame(
cmd=SP_CMD_RGG_WR,
control_params=control_params,
voltage_data=[],
voltage_type=VOLTAGE_TYPE_SAME
)
if not self.uart_handler.send_data(write_frame):
logger.error(f"DAC{dac_num}通道{channel}写入失败")
success = False
continue
time.sleep(0.01) # 写入后短暂延迟
# ===== 读取操作 =====
regs = self.read_channel_registers(dev_index, dac_num, channel)
if regs is None:
logger.error(f"DAC{dac_num}通道{channel}读取失败")
success = False
continue
# 验证寄存器值
if (regs['R1'] != REG_DEFAULT_VALUES['R1'] or
regs['R2'] != REG_DEFAULT_VALUES['R2'] or
regs['R3'] != REG_DEFAULT_VALUES['R3'] or
regs['R4'] != REG_DEFAULT_VALUES['R4']):
logger.error(f"DAC{dac_num}通道{channel}校验失败: "
f"R1=0x{regs['R1']:02X}(期望0x{REG_DEFAULT_VALUES['R1']:02X}) "
f"R2=0x{regs['R2']:02X}(期望0x{REG_DEFAULT_VALUES['R2']:02X}) "
f"R3=0x{regs['R3']:02X}(期望0x{REG_DEFAULT_VALUES['R3']:02X}) "
f"R4=0x{regs['R4']:02X}(期望0x{REG_DEFAULT_VALUES['R4']:02X})")
success = False
read_data[channel] = regs
time.sleep(0.01) # 读写间隔
# 保存当前DAC的写入命令和读取结果
CSVLogger.write_register_write(dac_num, write_cmd_data)
CSVLogger.write_register_read(dac_num, read_data)
return success
"""CSV文件记录工具类"""
class CSVLogger:
@staticmethod
def write_register_read_extended(dac_num: int, data: Dict[int, Dict[str, int]]):
"""
记录寄存器读取结果到CSV包含R1-R7
格式示例:
通道1: 0x00,0xA0,0x28,0x00,0x00,0x00,0x00
"""
filename = f"DAC{dac_num}_reg_read_extended_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
os.makedirs('reg_logs', exist_ok=True)
with open(f'reg_logs/{filename}', 'w', newline='') as f:
writer = csv.writer(f)
# 更新表头包含R1-R7
writer.writerow(['Channel', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7'])
for channel, regs in data.items():
writer.writerow([
f'通道{channel}',
f"0x{regs['R1']:02X}",
f"0x{regs['R2']:02X}",
f"0x{regs['R3']:02X}",
f"0x{regs['R4']:02X}",
f"0x{regs['R5']:02X}", # 新增R5-R7
f"0x{regs['R6']:02X}",
f"0x{regs['R7']:02X}"
])
logger.info(f"DAC{dac_num}扩展寄存器读取结果已保存到{filename}")
def write_register_read(dac_num: int, data: Dict[int, Dict[str, int]]):
"""
记录寄存器读取结果到CSV
格式示例:
通道1: 0x00,0xA0,0x28,0x00
通道2: 0x01,0xA1,0x29,0x01
"""
filename = f"DAC{dac_num}_reg_read_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
os.makedirs('reg_logs', exist_ok=True)
with open(f'reg_logs/{filename}', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Channel', 'R1', 'R2', 'R3', 'R4'])
for channel, regs in data.items():
writer.writerow([
f'通道{channel}',
f"0x{regs['R1']:02X}",
f"0x{regs['R2']:02X}",
f"0x{regs['R3']:02X}",
f"0x{regs['R4']:02X}"
])
logger.info(f"DAC{dac_num}寄存器读取结果已保存到{filename}")
@staticmethod
def write_register_write(dac_num: int, data: Dict[int, List[int]]):
"""
记录寄存器写入命令的4-7字节到CSV
格式示例:
通道1: 0x01,0x10,0x00,0xA0
通道2: 0x01,0x10,0x01,0xA1
"""
filename = f"写入DAC{dac_num}_reg_write_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
os.makedirs('reg_logs', exist_ok=True)
with open(f'reg_logs/{filename}', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Channel', 'Byte4', 'Byte5', 'Byte6', 'Byte7'])
for channel, bytes_data in data.items():
writer.writerow([
f'通道{channel}',
f"0x{bytes_data[0]:02X}",
f"0x{bytes_data[1]:02X}",
f"0x{bytes_data[2]:02X}",
f"0x{bytes_data[3]:02X}"
])
logger.info(f"DAC{dac_num}寄存器写入命令已保存到{filename}")
if __name__ == "__main__":
# 创建DACHandler实例
handler = DACHandler()
# 定义回调函数
def invalid_cmd_callback(res: bool):
logger.info(f"无效命令结果: {res}")
def zero_callback(res: bool):
logger.info(f"归零结果: {res}")
def same_value_callback(res: bool):
logger.info(f"同值配置结果: {res}")
def diff_value_callback(res: bool):
logger.info(f"异值配置结果: {res}")
def rgg_wr_callback(res: bool):
logger.info(f"RGG_WR 命令结果: {res}")
def rgg_rd_callback(res: bool):
logger.info(f"RGG_RD 命令结果: {res}")
# 设置回调函数
handler.set_callback('set_invalid_cmd_res', invalid_cmd_callback)
handler.set_callback('set_zero_res', zero_callback)
handler.set_callback('set_same_value_res', same_value_callback)
handler.set_callback('set_diff_value_res', diff_value_callback)
handler.set_callback('set_rgg_wr_res', rgg_wr_callback)
handler.set_callback('set_rgg_rd_res', rgg_rd_callback)
# 开启调试模式
handler.debug(True)
try:
# 初始化USB设备CH347串口设备
devices = handler.init_usb_device()
if not devices:
logger.error("未找到 CH347 UART 设备")
exit(1)
# 执行写后立即读操作
# logger.info("=== 开始写后立即读操作 ===")
# if not handler.batch_write_then_read_all_registers(devices[0]):
# logger.error("写后立即读操作失败")
# else:
# logger.info("所有寄存器读写验证完成")
# # 1. 首次上电读取所有寄存器
# logger.info("=== 开始首次读取所有寄存器 ===")
# if not handler.batch_read_all_registers(devices[0]):
# logger.error("首次读取寄存器失败")
# 2. 写入默认寄存器值
logger.info("=== 开始写入默认寄存器值 ===")
if not handler.batch_write_all_registers(devices[0]):
logger.error("写入默认寄存器值失败")
# 3. 读取寄存器值
logger.info("=== 开始验证寄存器值 ===")
if not handler.batch_verify_all_registers(devices[0]):
logger.error("寄存器验证失败")
else:
logger.info("所有寄存器验证成功")
# 配置DAC5通道10的寄存器
handler.write_channel_registers(
dev_index=0,
dac_num=5,
channel=10,
reg_values={
'R1': 0x01, # 只修改R1
'R3': 0x30 # 修改R3
} # R2/R4保持默认
)
# 默认值
handler.write_channel_registers(
dev_index=0,
dac_num=5,
channel=10,
reg_values=REG_DEFAULT_VALUES
)
# 读取DAC5通道10的寄存器
regs = handler.read_channel_registers(
dev_index=0,
dac_num=5,
channel=10
)
print(f"DAC5通道10寄存器: {regs}")
# # 重置DAC5通道10为默认值
# handler.write_channel_registers(
# dev_index=0,
# dac_num=5,
# channel=10,
# reg_values={} # 不传参数即全部使用默认值
# )
# 测试电压配置
# dac_number = 1
# voltage_values = [512] * 11
# print(voltage_values)
# handler.self_test_voltage_config(devices[0], dac_number, voltage_values)
# time.sleep(2)
# 测试ADC触发
# dac_number = 1
# dac_channel = 1
# handler.self_test_adc_tri(devices[0], dac_number, dac_channel)
# time.sleep(2)
# 测试发送1024个电压值到OPA
# voltage_values_1024 = [v % 1024 for v in range(1024)] # 调整为10位值
# # voltage_values_1024 = [1023] * 1024
# handler.send_1024_voltages_to_opa(devices[0], opa_number=3, voltage_values=voltage_values_1024)
# time.sleep(2)
# file_path = r"D:\mjq\Hot_SIOPA\package\SPGDdata\VscanOutput\Vse_psi_0.00_theta_13.31_opa_1.txt"
# opa_number = 1
# # 发送文件中的电压值
# handler.send_voltages_from_file(devices[0], opa_number, file_path)
finally:
# 关闭设备
handler.stop()