import time
import serial
import struct
import modbus_tk.defines as cst
from app.lib import StaticData
from app.lib.StaticData import DeviceStatus
from app.lib.Utils import Utils
from modbus_tk import modbus_rtu
from app.conf.Setting import settings
from app.lib.Log import logger

# ---------------------------------------------------------------------------
# Register addresses on the lower-level controller
# ---------------------------------------------------------------------------
EXPERIMENT = 801  # first register of the experiment-control block
STATUS_ADDR_BEGIN = 3701  # first register of the status block
ELECT_SWITCH = 102  # solenoid valve

# ---------------------------------------------------------------------------
# Working-mode definitions
# ---------------------------------------------------------------------------
STATUS_ADDR_LENGTH = 24  # number of status registers read per poll

State_dict = {
    "0": "空闲",
    "1": "升温阶段",
    "2": "结果判定中",
}

# Modbus slave id used for every request issued by this module.
_SLAVE_ID = 1

# Channel display names, in register order (channel 1 first).
_CHANNEL_NAMES = ('通道一', '通道二', '通道三', '通道四', '通道五')
# Pseudo-name that selects every channel at once.
_ALL_CHANNELS = '全部'
# Position of each channel inside the status frame.
_CHANNEL_INDEX = {name: index for index, name in enumerate(_CHANNEL_NAMES)}
# The controller takes the channel selection as a number whose *decimal*
# digits are 0/1 flags (1 -> channel 1, 10 -> channel 2, ..., 11111 -> all).
_CHANNEL_FLAGS = {name: 10 ** index for index, name in enumerate(_CHANNEL_NAMES)}

# Layout of one status frame: 24 big-endian unsigned 16-bit words.
_STATUS_FRAME = struct.Struct('>24H')


def _channel_mask(channel_names):
    """Build the decimal-digit channel mask for the given channel names.

    Each name is counted once, and '全部' selects all five channels, so every
    digit of the result is 0 or 1 even when names repeat or '全部' is combined
    with individual channels.

    Raises:
        ValueError: if a name is not a known channel name.
    """
    selected = set()
    for name in channel_names:
        if name == _ALL_CHANNELS:
            selected.update(_CHANNEL_FLAGS.values())
        elif name in _CHANNEL_FLAGS:
            selected.add(_CHANNEL_FLAGS[name])
        else:
            raise ValueError(f"unknown channel name: {name!r}")
    return sum(selected)


class Status(object):
    """Status record.

    ``keys`` together with ``__getitem__`` lets an instance be converted with
    ``dict(instance)``.
    """

    status: int = None  # state code: 0, 1 or 2
    status_text: str = None  # state description
    temp1: str = None  # copper furnace centre temperature
    temp2: str = None  # heating copper block temperature
    temp3: str = None  # sample temperature
    temp4: str = None  # auto-ignition temperature
    sy_work_time: str = None
    switch_state = None
    result = None

    def keys(self):
        """Return the names of the fields exposed through ``__getitem__``."""
        return [
            "status",
            "status_text",
            "temp1",
            "temp2",
            "temp3",
            "temp4",
            "sy_work_time",
            "switch_state",
            "result",
        ]

    def __getitem__(self, key):
        """Return the attribute called *key*."""
        return getattr(self, key)


class ModbusConnection(object):
    """Modbus RTU connection manager for the test controller."""

    def __init__(self):
        """Open the serial port from ``settings`` and create the RTU master.

        A failure is logged, not raised. In that case ``serial`` and/or
        ``master`` stay ``None``, and every later request fails inside its own
        ``try`` block and reports ``None``.
        """
        self.serial = None
        self.master = None
        try:
            self.serial = serial.Serial(
                port=settings.PORT,
                baudrate=settings.BAUDRATE,
                bytesize=settings.BYTESIZE,
                stopbits=settings.STOPBITS,
            )
            self.master = modbus_rtu.RtuMaster(self.serial)
            self.master.set_timeout(1.0)
        except Exception as e:
            logger.error(f"连接串口异常: {e}")

    def parser_status(self, data_bytes):
        """Decode one status frame and publish it through ``StaticData``.

        Frame layout (24 big-endian unsigned 16-bit words; bytes after the
        first 48 are ignored):

        * words 0-4:   status of channels 1-5
        * words 5-9:   voltage of channels 1-5, in tenths
        * words 10-14: current of channels 1-5, in tenths
        * words 15-19: temperature of channels 1-5, in tenths
        * word 20:     evaporating-dish temperature, in tenths
        * word 21:     cabinet temperature, in tenths
        * word 22:     temperature-control state
        * word 23:     exception flags (bit0 relay communication failure,
          bit2 heating fault, bit3 cooling fault)

        NOTE(review): all words are read as unsigned, so a sub-zero
        temperature would show up as a large positive value - confirm the
        controller never reports negative readings.

        Args:
            data_bytes: bytes-like object holding at least 48 bytes.

        Returns:
            None. Results are written to ``StaticData.DeviceStatus`` and to
            the matching items of ``StaticData.ChannelDatas``.

        Raises:
            struct.error: if fewer than 48 bytes are supplied.
        """
        words = _STATUS_FRAME.unpack_from(data_bytes)

        statuses = words[0:5]
        voltages = [raw / 10 for raw in words[5:10]]
        currents = [raw / 10 for raw in words[10:15]]
        temperatures = [raw / 10 for raw in words[15:20]]

        device = StaticData.DeviceStatus
        device.zfm_temperature = words[20] / 10
        device.xt_temperature = words[21] / 10
        device.kzzt_temperature = words[22]
        device.exception_status = words[23]

        for item in StaticData.ChannelDatas:
            index = _CHANNEL_INDEX.get(item.name)
            if index is None:
                # Not one of the five known channels: leave it untouched.
                continue
            item.status = statuses[index]
            item.voltage = voltages[index]
            item.electricity = currents[index]
            item.temperature = temperatures[index]
        return None

    def Experiment(self, **kwargs_):
        """Start an experiment on the selected channels.

        Writes five consecutive registers starting at ``EXPERIMENT``:
        work status (1 = start), duration, channel mask, target temperature
        and temperature-control switch.

        Keyword Args:
            channel_list: iterable of channel names ('通道一' ... '通道五',
                or '全部' for all channels).
            work_time: duration; multiplied by 60 before sending (presumably
                minutes converted to seconds - confirm against the device
                protocol). Numbers and numeric strings are accepted.
            temperature: target temperature; sent in tenths (51.1 -> 511).
            temperature_control: 1 to enable temperature control, 0 or an
                empty value to disable it.

        Returns:
            The result of the Modbus write, or None if the write failed.

        Raises:
            KeyError: if a required keyword argument is missing.
            ValueError: if a channel name is unknown or a value is not numeric.
            struct.error: if a value does not fit an unsigned 16-bit register.
        """
        channel = _channel_mask(kwargs_["channel_list"])
        # float() first: a string such as "5" must be multiplied, not repeated.
        work_time = int(float(kwargs_["work_time"]) * 60)

        d_work_status = struct.pack('>H', 1)  # 1 = start, 0 = stop
        d_sy_time = struct.pack('>H', work_time)  # experiment duration
        channel_list = struct.pack('>H', channel)  # decimal-digit channel mask
        temperature = struct.pack('>H', int(float(kwargs_["temperature"]) * 10))
        temperature_control = struct.pack('>H', int(kwargs_["temperature_control"] or 0))

        output = d_work_status + d_sy_time + channel_list + temperature + temperature_control
        print(f'开始实验参数::::::::::{d_work_status},{d_sy_time},{channel_list},{temperature},{temperature_control}')
        vl_output = Utils.bytes_to_int_list(output)
        try:
            res = self.master.execute(
                _SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=EXPERIMENT,
                output_value=vl_output,
            )
        except Exception as e:
            print(f'试验操作异常: {e}')
            res = None
        print(f'开始实验返回值:::::::::::::{res}')
        return res

    def StopExperiment(self, **kwargs_):
        """Stop the experiment on one channel.

        Only the first entry of ``channel_list`` is used. Its mask is written
        to ``EXPERIMENT + 2`` (the channel-mask register written by
        ``Experiment``); after a 0.2 s pause, work status 0 (stop) is written
        to ``EXPERIMENT``.

        Keyword Args:
            channel_list: non-empty sequence of channel names; element 0
                selects the channel(s) to stop ('全部' stops all).

        Returns:
            The result of the second Modbus write, or None if either write
            failed.

        Raises:
            KeyError: if ``channel_list`` is missing.
            IndexError: if ``channel_list`` is empty.
            ValueError: if the channel name is unknown.
        """
        channel_names = kwargs_["channel_list"]
        d_work_status = struct.pack('>H', 0)  # 1 = start, 0 = stop
        channel_list = struct.pack('>H', _channel_mask([channel_names[0]]))
        print(f'结束实验参数::::::::::{d_work_status},{channel_list}')
        try:
            res = self.master.execute(
                _SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=(EXPERIMENT + 2),
                output_value=Utils.bytes_to_int_list(channel_list),
            )
            time.sleep(0.2)
            res = self.master.execute(
                _SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=EXPERIMENT,
                output_value=Utils.bytes_to_int_list(d_work_status),
            )
        except Exception as e:
            # Report the failure like the other request methods do.
            print(f'停止试验操作异常: {e}')
            res = None
        print(f'结束实验返回值:::::::::::::{res}')
        return res

    def Send_Mes(self, Addr, status):
        """Write one unsigned 16-bit value to the register at ``Addr``.

        Args:
            Addr: register address.
            status: value to write (0-65535).

        Returns:
            The result of the Modbus write, or None if the write failed.

        Raises:
            struct.error: if ``status`` does not fit an unsigned 16-bit word.
        """
        vl_output = Utils.bytes_to_int_list(struct.pack('>H', status))
        try:
            res = self.master.execute(
                _SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=Addr,
                output_value=vl_output,
            )
        except Exception as e:
            print(e)
            res = None
        return res

    def get_status(self):
        """Poll the status registers and publish the decoded values.

        Reads ``STATUS_ADDR_LENGTH`` input registers starting at
        ``STATUS_ADDR_BEGIN`` and passes them to ``parser_status``.

        Returns:
            None in every case. A failed read is logged; decoded values are
            available through ``StaticData``.
        """
        try:
            status = self.master.execute(
                _SLAVE_ID,
                cst.READ_INPUT_REGISTERS,
                STATUS_ADDR_BEGIN,
                STATUS_ADDR_LENGTH,
            )
        except Exception as e:
            logger.error(f"获取状态异常: {e}")
            return None
        print('status:::::::::::::', status)
        data_bytes = Utils.int_list_to_bytes(status)
        print(':int_list_to_bytes:::::', data_bytes)
        self.parser_status(data_bytes)
        return None


# Shared connection instance, created when the module is imported.
modbus_obj = ModbusConnection()
# Poll the status once at import time.
modbus_obj.get_status()