#!/usr/bin/env python
# -*- coding: utf_8 -*-
"""Modbus register map and read/write helpers for the AGV PLC.

Every helper opens its own ``ModbusConnection`` to ``AGVPLC.HOST`` /
``AGVPLC.PORT`` and talks to slave id 1.
"""
import time

import modbus_tk.defines as cst

from plc.tools import bytes_to_int_list, get_bit_bool, int_list_to_bytes
from plc.modbus import ModbusConnection
from conf.setting import AGVPLC
from models.path import Path

# Modbus slave/unit id used for every request in this module.
_SLAVE_ID = 1
# Pause (seconds) after a failed write before returning to the caller.
_ERROR_BACKOFF_SECONDS = 2


class TaskAddr:
    """Register addresses and lengths used to talk to the AGV PLC."""

    # Batch area start
    BATCH_ADDR_BEGIN = 21000
    # Batch area end
    BATCH_ADDR_END = 21059
    # Batch area length
    # BATCH_ADDR_LENGTH = 563
    BATCH_ADDR_LENGTH = 100

    # Task area start
    TASK_ADDR_BEGIN = 21100
    # Task area end
    TASK_ADDR_END = 21195
    # Task area length
    TASK_ADDR_LENGTH = 100

    # Start address (task type)
    TASKTYPE_ADDR_BEGIN = 21000
    # Whether the AGV passes through the glass door
    PASS_DOOR_ADDR = 21001
    # Which shelf layer
    LAYER_ADDR = 21002
    # Whether the robot arm moves
    ARM_ADDR = 21003
    # AGV path point set
    PATH_ADDR_BEGIN = 21010
    # PATH_ADDR_LENGTH = 50

    # Number of tasks
    TASK_NUM = 21060
    # Task pick/place start addresses 15061, 15062 -> 15079, 15080
    T1_ADDR_GET_ADDR = 21061
    # NOTE(review): 21603 lies outside the 21061-21090 task window and looks
    # like a transposition of 21063 (T10 uses get + 2) - confirm against the
    # PLC register map before changing.
    T1_ADDR_RETURN_ADDR = 21603
    # Task end addresses
    T10_ADDR_GET_ADDR = 21088
    T10_ADDR_RETURN_ADDR = 21090

    ######
    # Read Registers
    ######
    # Task done
    TASK_DONE_ADDR = 22000
    # Task message received
    TASK_RECV_ADDR = 22001
    # Station the AGV is currently at
    AGV_STOP_ADDR = 22002
    # AGV current battery level
    AGV_POWER_ADDR = 22003
    # AGV charging state
    AGV_CHARGE_ADDR = 22004
    # Device busy
    BUSY_ADDR = 22005
    # Device abnormal
    ABNORM_ADDR = 22006
    # Alarm information
    ALARM_ADDR = 22007
    # Status register length:
    # 8 consecutive registers are read, each 16 bits wide
    STATUS_ADDR_LENGTH = 8

    ##############
    ### Camera related
    ##############
    # PC -> PLC
    PIC_DONE_ADDR = 21500
    DOOR_READY_ADDR = 21501
    DDX_ADDR = 21502
    DDY_ADDR = 21504

    # PLC -> PC Status
    CAM_BEGIN_ADDR = 22500
    CAM_STATUS_ADDR_LENGTH = 12


def _execute_write(function_code, address, value, **extra):
    """Open a connection to the AGV PLC, issue one write and return its result.

    Extra keyword arguments (e.g. ``data_format``) are forwarded unchanged to
    ``master.execute``. Exceptions propagate to the caller.
    """
    with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
        return master.execute(
            _SLAVE_ID,
            function_code,
            starting_address=address,
            output_value=value,
            **extra
        )


def _write_best_effort(label, function_code, address, value,
                       log_result=True, **extra):
    """Issue one write; on any error print it, back off and carry on.

    ``label`` prefixes both the success and the failure message so the log
    shows which helper produced it.
    """
    try:
        res = _execute_write(function_code, address, value, **extra)
        if log_result:
            print(label + ': ', res)
    except Exception as e:
        # Deliberately best-effort: callers of these helpers never expected
        # an exception, so report it and pause instead of raising.
        print(label + '信息指令异常:' + str(e))
        time.sleep(_ERROR_BACKOFF_SECONDS)


def clear_batch():
    """Zero the batch area and the task area of the PLC.

    Both writes share one connection; if the first one fails the second is
    not attempted. Errors are printed, not raised.
    """
    batch_zeros = [0] * TaskAddr.BATCH_ADDR_LENGTH
    task_zeros = [0] * TaskAddr.TASK_ADDR_LENGTH
    try:
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            master.execute(
                _SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.BATCH_ADDR_BEGIN,
                output_value=batch_zeros
            )
            master.execute(
                _SLAVE_ID,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.TASK_ADDR_BEGIN,
                output_value=task_zeros
            )
    except Exception as e:
        print('clear_batch信息指令异常:' + str(e))
        time.sleep(_ERROR_BACKOFF_SECONDS)
    # Sleep for a little while after clearing the registers.
    time.sleep(0.1)


def write_batch(cmdList=None):
    """Write ``cmdList`` as registers starting at ``BATCH_ADDR_BEGIN``.

    ``cmdList`` defaults to an empty list. Errors are printed, not raised.
    """
    cmdList = [] if cmdList is None else cmdList
    _write_best_effort(
        'cmdList',
        cst.WRITE_MULTIPLE_REGISTERS,
        TaskAddr.BATCH_ADDR_BEGIN,
        cmdList,
    )


def write_task_batch(cmdList=None):
    """Write ``cmdList`` as registers starting at ``TASK_ADDR_BEGIN``.

    ``cmdList`` defaults to an empty list. Errors are printed, not raised.
    """
    cmdList = [] if cmdList is None else cmdList
    _write_best_effort(
        'write_task_batch',
        cst.WRITE_MULTIPLE_REGISTERS,
        TaskAddr.TASK_ADDR_BEGIN,
        cmdList,
    )


def write_batch_float(cmdList=None):
    """Write ``cmdList`` at ``BATCH_ADDR_BEGIN`` using ``data_format='>f'``.

    ``cmdList`` defaults to an empty list. Errors are printed, not raised.
    """
    cmdList = [] if cmdList is None else cmdList
    _write_best_effort(
        'cmdList Float',
        cst.WRITE_MULTIPLE_REGISTERS,
        TaskAddr.BATCH_ADDR_BEGIN,
        cmdList,
        data_format='>f',
    )


#
def cmd_type(cmdType: int):
    """Write ``cmdType`` to the task-type register. Errors are printed."""
    _write_best_effort(
        'cmd_type',
        cst.WRITE_SINGLE_REGISTER,
        TaskAddr.TASKTYPE_ADDR_BEGIN,
        cmdType,
    )


async def setSubPaths(start: str, stop: str):
    """Look up the valid, active path from ``start`` to ``stop`` and write it.

    The ``paths`` attribute of the first matching ``Path`` row is written as
    a block of registers. When no row matches nothing is written.

    NOTE(review): the failure back-off uses ``time.sleep``, which blocks the
    running event loop for its duration.
    """
    matches = await Path.filter(
        start=start, stop=stop, is_valid=True, is_active=True
    )
    if not matches:
        print('setSubPaths: no valid active path from '
              + str(start) + ' to ' + str(stop))
        return
    # The awaited filter result is a sequence of rows; use the first match.
    subPaths = matches[0].paths
    # NOTE(review): this writes at TASKTYPE_ADDR_BEGIN (21000) although
    # PATH_ADDR_BEGIN (21010) is the constant described as the path point
    # area - confirm the intended target register with the PLC map.
    _write_best_effort(
        'setSubPaths',
        cst.WRITE_MULTIPLE_REGISTERS,
        TaskAddr.TASKTYPE_ADDR_BEGIN,
        subPaths,
    )


#
def pass_door(passDoor: int):
    """Write ``passDoor`` to ``PASS_DOOR_ADDR``. Errors are printed."""
    _write_best_effort(
        'pass_door',
        cst.WRITE_SINGLE_REGISTER,
        TaskAddr.PASS_DOOR_ADDR,
        passDoor,
    )


#
def setLayer(layer: int):
    """Write ``layer`` to ``LAYER_ADDR``. Errors are printed."""
    _write_best_effort(
        'setLayer',
        cst.WRITE_SINGLE_REGISTER,
        TaskAddr.LAYER_ADDR,
        layer,
    )


#
def moveArm(move: int):
    """Write ``move`` to ``ARM_ADDR``. Errors are printed."""
    _write_best_effort(
        'moveArm',
        cst.WRITE_SINGLE_REGISTER,
        TaskAddr.ARM_ADDR,
        move,
    )


#
def taskNum(num: int):
    """Write ``num`` to the ``TASK_NUM`` register. Errors are printed."""
    _write_best_effort(
        'taskNum',
        cst.WRITE_SINGLE_REGISTER,
        TaskAddr.TASK_NUM,
        num,
    )


def setTasks(tasks=None):
    """Write ``tasks`` as registers starting at ``T1_ADDR_GET_ADDR``.

    ``tasks`` defaults to an empty list. Errors are printed, not raised.
    """
    tasks = [] if tasks is None else tasks
    _write_best_effort(
        'setTasks',
        cst.WRITE_MULTIPLE_REGISTERS,
        TaskAddr.T1_ADDR_GET_ADDR,
        tasks,
    )


def readStatus():
    """Read ``STATUS_ADDR_LENGTH`` holding registers from ``TASK_DONE_ADDR``.

    Returns whatever ``master.execute`` returns. Exceptions propagate.
    """
    with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
        status = master.execute(
            _SLAVE_ID,
            cst.READ_HOLDING_REGISTERS,
            TaskAddr.TASK_DONE_ADDR,
            TaskAddr.STATUS_ADDR_LENGTH
        )
    # print('readStatus: ',status)
    return status


def readCamStatus():
    """Read ``CAM_STATUS_ADDR_LENGTH`` holding registers from ``CAM_BEGIN_ADDR``.

    Returns whatever ``master.execute`` returns. Exceptions propagate.
    """
    with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
        status = master.execute(
            _SLAVE_ID,
            cst.READ_HOLDING_REGISTERS,
            TaskAddr.CAM_BEGIN_ADDR,
            TaskAddr.CAM_STATUS_ADDR_LENGTH,
        )
    # print('Cam readStatus: ',status)
    return status


def setCamIntList(parms=None):
    """Write ``parms`` as registers starting at ``PIC_DONE_ADDR``.

    ``parms`` defaults to an empty list. The result is not printed; errors
    are printed, not raised.
    """
    parms = [] if parms is None else parms
    _write_best_effort(
        'setCamIntList',
        cst.WRITE_MULTIPLE_REGISTERS,
        TaskAddr.PIC_DONE_ADDR,
        parms,
        log_result=False,
    )


#
def camDone(done: int):
    """Write ``done`` to ``PIC_DONE_ADDR`` and print the result.

    Unlike most writers in this module, exceptions propagate to the caller.
    """
    res = _execute_write(
        cst.WRITE_SINGLE_REGISTER, TaskAddr.PIC_DONE_ADDR, done
    )
    print('camDone: ', res)


#
def doorReady(done: int):
    """Write ``done`` to ``DOOR_READY_ADDR`` and print the result.

    Unlike most writers in this module, exceptions propagate to the caller.
    """
    res = _execute_write(
        cst.WRITE_SINGLE_REGISTER, TaskAddr.DOOR_READY_ADDR, done
    )
    print('doorReady: ', res)


# def writeddx(ddx: float):
#     with ModbusConnection(
#         # reagent AMR robot
#         AGVPLC.HOST,
#         AGVPLC.PORT
#     ) as master:
#         res = master.execute(
#             1,
#             cst.WRITE_MULTIPLE_REGISTERS,
#             starting_address=TaskAddr.DDX_ADDR,
#             output_value=[ddx],
#             data_format='>f'
#         )
#         print('writeddx: ',res)

# def writeddy(ddy: float):
#     with ModbusConnection(
#         # reagent AMR robot
#         AGVPLC.HOST,
#         AGVPLC.PORT
#     ) as master:
#         res = master.execute(
#             1,
#             cst.WRITE_MULTIPLE_REGISTERS,
#             starting_address=TaskAddr.DDY_ADDR,
#             output_value=[ddy],
#             data_format='>f'
#         )
#         print('writeddy: ',res)