You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

485 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf_8 -*-
import modbus_tk.defines as cst
from plc.tools import bytes_to_int_list, get_bit_bool, int_list_to_bytes
from plc.modbus import ModbusConnection
from conf.setting import AGVPLC
from models.path import Path
import time
class TaskAddr:
#BATCH START
BATCH_ADDR_BEGIN = 21000
#BATCH END
BATCH_ADDR_END = 21059
#BATCH LENGTH
# BATCH_ADDR_LENGTH = 563
BATCH_ADDR_LENGTH = 100
#Task START
TASK_ADDR_BEGIN = 21100
#Task END
TASK_ADDR_END = 21195
#BATCH LENGTH
TASK_ADDR_LENGTH = 100
# 起始地址
TASKTYPE_ADDR_BEGIN = 21000
#是否经过玻璃门
PASS_DOOR_ADDR = 21001
#货架第几层
LAYER_ADDR = 21002
#机械臂是否动作
ARM_ADDR = 21003
# AGV路径点集合
PATH_ADDR_BEGIN = 21010
# PATH_ADDR_LENGTH = 50
#任务数量
TASK_NUM = 21060
#任务取放开始地址 15061, 15062 -> 15079, 15080
T1_ADDR_GET_ADDR = 21061
T1_ADDR_RETURN_ADDR = 21603
#任务截至地址
T10_ADDR_GET_ADDR = 21088
T10_ADDR_RETURN_ADDR = 21090
#
######
# Read Registers
######
#任务是否完成
TASK_DONE_ADDR = 22000
#任务消息接收
TASK_RECV_ADDR = 22001
#AGV所在站点
AGV_STOP_ADDR = 22002
#AGV当前电量
AGV_POWER_ADDR = 22003
#AGV充电状态
AGV_CHARGE_ADDR = 22004
#设备忙
BUSY_ADDR = 22005
#设备异常
ABNORM_ADDR = 22006
#报警信息
ALARM_ADDR = 22007
#状态寄存器长度
# 连续读取8字节单字节长度为16位
STATUS_ADDR_LENGTH = 8
##############
### Camera related
##############
# PC -> PLC
PIC_DONE_ADDR = 21500
DOOR_READY_ADDR = 21501
DDX_ADDR = 21502
DDY_ADDR = 21504
# PLC -> PC Status
CAM_BEGIN_ADDR = 22500
CAM_STATUS_ADDR_LENGTH = 12
def clear_batch():
cmdList = []
for _ in range(100):
cmdList.append(0)
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res1 = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.BATCH_ADDR_BEGIN,
output_value=cmdList
)
res2 = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.TASK_ADDR_BEGIN,
output_value=cmdList
)
# print('cmdList: ',res1)
except Exception as e:
print('cmdList信息指令异常' + str(e))
time.sleep(2)
#sleep for a little while after clear register
time.sleep(0.1)
def write_batch(cmdList = []):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.BATCH_ADDR_BEGIN,
output_value=cmdList
)
print('cmdList: ',res)
except Exception as e:
print('cmdList信息指令异常' + str(e))
time.sleep(2)
def write_task_batch(cmdList = []):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.TASK_ADDR_BEGIN,
output_value=cmdList
)
print('write_task_batch: ',res)
except Exception as e:
print('write_task_batch信息指令异常' + str(e))
time.sleep(2)
def write_batch_float(cmdList = []):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.BATCH_ADDR_BEGIN,
output_value=cmdList,
data_format='>f'
)
print('cmdList Float: ',res)
except Exception as e:
print('cmdList Float信息指令异常' + str(e))
time.sleep(2)
#
def cmd_type(cmdType: int):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_SINGLE_REGISTER,
starting_address=TaskAddr.TASKTYPE_ADDR_BEGIN,
output_value=cmdType
)
print('cmd_type: ',res)
if res:
task_status = False
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
async def setSubPaths(start: str, stop: str):
path = await Path.filter(start = start, stop = stop, is_valid = True, is_active = True)
subPaths = []
if path and len(path) > 0:
subPaths = path.paths
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.TASKTYPE_ADDR_BEGIN,
output_value=subPaths
)
print('cmd_type: ',res)
if res:
task_status = False
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
#
def pass_door(passDoor: int):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_SINGLE_REGISTER,
starting_address=TaskAddr.PASS_DOOR_ADDR,
output_value=passDoor
)
print('cmd_type: ',res)
if res:
task_status = False
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
#
def setLayer(layer: int):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_SINGLE_REGISTER,
starting_address=TaskAddr.LAYER_ADDR,
output_value=layer
)
print('cmd_type: ',res)
if res:
task_status = False
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
#
def moveArm(move: int):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_SINGLE_REGISTER,
starting_address=TaskAddr.ARM_ADDR,
output_value=move
)
print('cmd_type: ',res)
if res:
task_status = False
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
#
def taskNum(num: int):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_SINGLE_REGISTER,
starting_address=TaskAddr.TASK_NUM,
output_value=num
)
print('cmd_type: ',res)
if res:
task_status = False
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
def setTasks(tasks = []):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.T1_ADDR_GET_ADDR,
output_value=tasks
)
print('cmd_type: ',res)
if res:
task_status = False
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
def readStatus():
with ModbusConnection(
AGVPLC.HOST,
AGVPLC.PORT
) as master:
status = master.execute(
1,
cst.READ_HOLDING_REGISTERS,
TaskAddr.TASK_DONE_ADDR,
TaskAddr.STATUS_ADDR_LENGTH
)
# print('readStatus: ',status)
return status
def readCamStatus():
with ModbusConnection(
AGVPLC.HOST,
AGVPLC.PORT
) as master:
status = master.execute(
1,
cst.READ_HOLDING_REGISTERS,
TaskAddr.CAM_BEGIN_ADDR,
TaskAddr.CAM_STATUS_ADDR_LENGTH,
)
# print('Cam readStatus: ',status)
return status
def setCamIntList(parms = []):
try:
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_MULTIPLE_REGISTERS,
starting_address=TaskAddr.PIC_DONE_ADDR,
output_value=parms
)
# print('setCamIntList: ',res)
except Exception as e:
print('cmd_type信息指令异常' + str(e))
time.sleep(2)
#
def camDone(done: int):
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_SINGLE_REGISTER,
starting_address=TaskAddr.PIC_DONE_ADDR,
output_value=done
)
print('camDone: ',res)
#
def doorReady(done: int):
with ModbusConnection(
# 试剂amr机器人
AGVPLC.HOST,
AGVPLC.PORT
) as master:
res = master.execute(
1,
cst.WRITE_SINGLE_REGISTER,
starting_address=TaskAddr.DOOR_READY_ADDR,
output_value=done
)
print('doorReady: ',res)
# def writeddx(ddx: float):
# with ModbusConnection(
# # 试剂amr机器人
# AGVPLC.HOST,
# AGVPLC.PORT
# ) as master:
# res = master.execute(
# 1,
# cst.WRITE_MULTIPLE_REGISTERS,
# starting_address=TaskAddr.DDX_ADDR,
# output_value=[ddx],
# data_format='>f'
# )
# print('writeddx: ',res)
# def writeddy(ddy: float):
# with ModbusConnection(
# # 试剂amr机器人
# AGVPLC.HOST,
# AGVPLC.PORT
# ) as master:
# res = master.execute(
# 1,
# cst.WRITE_MULTIPLE_REGISTERS,
# starting_address=TaskAddr.DDY_ADDR,
# output_value=[ddy],
# data_format='>f'
# )
# print('writeddy: ',res)