You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

293 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import serial
import struct
import modbus_tk.defines as cst
from app.lib import StaticData
from app.lib.StaticData import DeviceStatus
from app.lib.Utils import Utils
from modbus_tk import modbus_rtu
from app.conf.Setting import settings
from app.lib.Log import logger
# Register addresses on the lower-level controller.
EXPERIMENT = 801  # experiment control register
ELECT_SWITCH = 102  # solenoid valve register

# Status block: first register and number of registers to read.
STATUS_ADDR_BEGIN = 3701
STATUS_ADDR_LENGTH = 24

# Display text for each state code (codes are stored as strings).
State_dict = {
    "0": "空闲",
    "1": "升温阶段",
    "2": "结果判定中",
}
class Status(object):
    """Status snapshot record.

    Provides ``keys()`` and ``__getitem__`` so an instance can be passed to
    ``dict(...)`` or expanded with ``**``.
    """

    # Names returned by keys(), in declaration order.
    _FIELD_NAMES = (
        "status",
        "status_text",
        "temp1",
        "temp2",
        "temp3",
        "temp4",
        "sy_work_time",
        "switch_state",
        "result",
    )

    status: int = None  # state code: 0, 1 or 2
    status_text: str = None  # state description
    temp1: str = None  # copper furnace centre temperature
    temp2: str = None  # heated copper block temperature
    temp3: str = None  # sample temperature
    temp4: str = None  # auto-ignition temperature
    sy_work_time: str = None
    switch_state = None
    result = None

    def keys(self):
        """Return the exported field names as a new list."""
        return list(self._FIELD_NAMES)

    def __getitem__(self, key):
        """Return the attribute called ``key``."""
        return getattr(self, key)
class ModbusConnection(object):
    """Modbus RTU connection manager for the lower-level controller.

    Opens the serial port described by ``settings`` and wraps it in a
    ``modbus_tk`` RTU master. The read/write methods return ``None`` when the
    Modbus call itself fails; the failure is logged.
    """

    # Slave id used for every request.
    _SLAVE_ID = 1

    # Channel names in wire order; index i corresponds to channel i + 1.
    _CHANNEL_NAMES = ('通道一', '通道二', '通道三', '通道四', '通道五')

    # Name that selects every channel at once.
    _ALL_CHANNELS = '全部'

    # The channel-select register uses one decimal digit per channel
    # (e.g. 111 selects the first three channels).
    _CHANNEL_FLAGS = {
        '通道一': 1,
        '通道二': 10,
        '通道三': 100,
        '通道四': 1000,
        '通道五': 10000,
        '全部': 11111,
    }

    # Status frame: 24 big-endian unsigned 16-bit words. This must stay in
    # step with STATUS_ADDR_LENGTH, which is how many registers get_status reads.
    _STATUS_STRUCT = struct.Struct('>24H')

    def __init__(self):
        """Open the serial port and create the RTU master.

        A failure is logged rather than raised. ``serial`` and ``master`` are
        pre-set to ``None`` so both attributes exist even when opening fails.
        """
        self.serial = None
        self.master = None
        try:
            self.serial = serial.Serial(
                port=settings.PORT,
                baudrate=settings.BAUDRATE,
                bytesize=settings.BYTESIZE,
                stopbits=settings.STOPBITS,
            )
            self.master = modbus_rtu.RtuMaster(self.serial)
            self.master.set_timeout(1.0)
        except Exception as e:
            logger.error(f"连接串口异常: {e}")

    @classmethod
    def _decode_status(cls, data_bytes):
        """Decode a raw status frame into plain Python values.

        Args:
            data_bytes: bytes-like object holding at least 48 bytes; any
                extra trailing bytes are ignored.

        Returns:
            ``(channels, device)``. ``channels`` is a list of five
            ``(status, voltage, electricity, temperature)`` tuples in channel
            order. ``device`` maps the ``StaticData.DeviceStatus`` attribute
            names to their decoded values.

        Raises:
            struct.error: if the frame is shorter than 48 bytes.
        """
        words = cls._STATUS_STRUCT.unpack_from(data_bytes, 0)
        count = len(cls._CHANNEL_NAMES)
        # Word layout: 5 statuses, 5 voltages, 5 currents, 5 temperatures,
        # then 4 device-level words. Analogue values are sent as tenths.
        channels = [
            (
                words[i],
                words[count + i] / 10,
                words[2 * count + i] / 10,
                words[3 * count + i] / 10,
            )
            for i in range(count)
        ]
        device = {
            'zfm_temperature': words[20] / 10,  # evaporating dish temperature
            'xt_temperature': words[21] / 10,  # cabinet temperature
            'kzzt_temperature': words[22],  # temperature control state
            # Fault flags: bit0 relay communication failure,
            # bit2 heating fault, bit3 cooling fault.
            'exception_status': words[23],
        }
        return channels, device

    def parser_status(self, data_bytes):
        """Parse a status frame and publish it to ``StaticData``.

        Device-level values are written onto ``StaticData.DeviceStatus``.
        Each entry of ``StaticData.ChannelDatas`` whose ``name`` is one of the
        five channel names gets its ``status``, ``voltage``, ``electricity``
        and ``temperature`` updated; other entries are left untouched.

        Returns:
            Always ``None``.
        """
        channels, device = self._decode_status(data_bytes)
        for attr, value in device.items():
            setattr(StaticData.DeviceStatus, attr, value)

        readings = dict(zip(self._CHANNEL_NAMES, channels))
        for item in StaticData.ChannelDatas:
            reading = readings.get(item.name)
            if reading is None:
                continue
            (item.status, item.voltage,
             item.electricity, item.temperature) = reading
        return None

    @classmethod
    def _channel_mask(cls, channel_list):
        """Return the channel-select register value for ``channel_list``.

        Duplicate names are counted once, and the "all channels" name
        overrides any other entry, so the result is always a valid mask.
        A single name given as a plain string is accepted too.

        Raises:
            ValueError: if a name is not a known channel.
        """
        if isinstance(channel_list, str):
            channel_list = [channel_list]
        names = set(channel_list)
        if cls._ALL_CHANNELS in names:
            return cls._CHANNEL_FLAGS[cls._ALL_CHANNELS]
        unknown = [name for name in names if name not in cls._CHANNEL_FLAGS]
        if unknown:
            raise ValueError(f'unknown channel name(s): {unknown}')
        return sum(cls._CHANNEL_FLAGS[name] for name in names)

    @classmethod
    def _experiment_payload(cls, **kwargs_):
        """Build the 10-byte "start experiment" register payload.

        Register order: work status, run time, channel mask, target
        temperature, temperature-control switch.
        """
        mask = cls._channel_mask(kwargs_["channel_list"])
        # float() first so a numeric string is scaled, not repeated 60 times.
        work_time = int(float(kwargs_["work_time"]) * 60)
        # Target temperature is sent in tenths, e.g. 511 means 51.1 degrees.
        temperature = int(float(kwargs_["temperature"]) * 10)
        # 0 = temperature control off, 1 = on; a missing/empty value means off.
        temperature_control = int(kwargs_["temperature_control"] or 0)
        # First word is the work status: 1 = start, 0 = stop.
        return struct.pack(
            '>5H', 1, work_time, mask, temperature, temperature_control
        )

    def Experiment(self, **kwargs_):
        """Start an experiment.

        Keyword Args:
            channel_list: iterable of channel names to enable.
            work_time: run time; multiplied by 60 before sending.
            temperature: target temperature in degrees.
            temperature_control: truthy to enable temperature control.

        Returns:
            The result of the Modbus write, or ``None`` if the write failed.

        Raises:
            KeyError: if a required keyword is missing.
            ValueError: if a channel name or numeric value is invalid.
            struct.error: if a value does not fit in an unsigned 16-bit word.
        """
        output = self._experiment_payload(**kwargs_)
        fields = ','.join(
            str(output[i:i + 2]) for i in range(0, len(output), 2)
        )
        print(f'开始实验参数::::::::::{fields}')
        vl_output = Utils.bytes_to_int_list(output)
        try:
            res = self.master.execute(
                self._SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=EXPERIMENT,
                output_value=vl_output,
            )
        except Exception as e:
            logger.error(f'试验操作异常: {e}')
            res = None
        print(f'开始实验返回值:::::::::::::{res}')
        return res

    def StopExperiment(self, **kwargs_):
        """Stop the experiment on the given channels.

        The channel mask is written first, then the work status is set to 0.
        Every channel named in ``channel_list`` is included in the mask.

        Keyword Args:
            channel_list: non-empty iterable of channel names to stop.

        Returns:
            The result of the second Modbus write, or ``None`` if either
            write failed.

        Raises:
            KeyError: if ``channel_list`` is missing.
            IndexError: if ``channel_list`` is empty.
            ValueError: if a channel name is unknown.
        """
        channel_list = kwargs_["channel_list"]
        if not channel_list:
            raise IndexError('channel_list is empty')
        d_work_status = struct.pack('>H', 0)  # 1 = start, 0 = stop
        channel_bytes = struct.pack('>H', self._channel_mask(channel_list))
        print(f'结束实验参数::::::::::{d_work_status},{channel_bytes}')
        try:
            self.master.execute(
                self._SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=(EXPERIMENT + 2),
                output_value=Utils.bytes_to_int_list(channel_bytes),
            )
            # NOTE(review): presumably lets the device latch the channel
            # selection before the stop command arrives — confirm.
            time.sleep(0.2)
            res = self.master.execute(
                self._SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=EXPERIMENT,
                output_value=Utils.bytes_to_int_list(d_work_status),
            )
        except Exception as e:
            logger.error(f'停止试验操作异常: {e}')
            res = None
        print(f'结束实验返回值:::::::::::::{res}')
        return res

    def Send_Mes(self, Addr, status):
        """Write one unsigned 16-bit value to a register.

        Args:
            Addr: register address to write.
            status: value to write (0..65535).

        Returns:
            The result of the Modbus write, or ``None`` if the write failed.
        """
        vl_output = Utils.bytes_to_int_list(struct.pack('>H', status))
        try:
            res = self.master.execute(
                self._SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=Addr,
                output_value=vl_output,
            )
        except Exception as e:
            logger.error(f'写寄存器异常: {e}')
            res = None
        return res

    def get_status(self):
        """Read the status registers and publish them via ``parser_status``.

        Returns:
            Whatever ``parser_status`` returns (currently always ``None``);
            also ``None`` when the Modbus read fails.
        """
        try:
            status = self.master.execute(
                self._SLAVE_ID,
                cst.READ_INPUT_REGISTERS,
                STATUS_ADDR_BEGIN,
                STATUS_ADDR_LENGTH,
            )
        except Exception as e:
            logger.error(f"获取状态异常: {e}")
            return None
        print('status:::::::::::::', status)
        data_bytes = Utils.int_list_to_bytes(status)
        print(':int_list_to_bytes:::::', data_bytes)
        # Parse once only: parser_status writes into shared StaticData.
        parsed = self.parser_status(data_bytes)
        print('parser_status:::::::::::::', parsed)
        return parsed
# Module-level connection instance; it is created when this module is imported.
modbus_obj = ModbusConnection()
# Read the device status once, right after the connection object is created.
modbus_obj.get_status()
# Start an experiment (disabled example).
# NOTE(review): the keys below do not match what Experiment currently reads
# (channel_list, work_time, temperature, temperature_control); update them
# before re-enabling this snippet.
# kwargs_={}
# kwargs_["work_status"] = 1
# kwargs_["elect_switch"] = 0
# kwargs_["natural_temp"] = 40
# kwargs_["max_work_time"] = 30
#
# modbus_obj.Experiment(**kwargs_)