You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

485 lines
11 KiB

#!/usr/bin/env python
# -*- coding: utf_8 -*-
import modbus_tk.defines as cst
from plc.tools import bytes_to_int_list, get_bit_bool, int_list_to_bytes
from plc.modbus import ModbusConnection
from conf.setting import AGVPLC
from models.path import Path
import time
class TaskAddr:
    """Modbus register addresses and lengths used to talk to the AGV PLC.

    The class is a plain namespace of integer constants; it is never
    instantiated in this file.
    """

    # Batch area: start
    BATCH_ADDR_BEGIN = 21000
    # Batch area: end
    BATCH_ADDR_END = 21059
    # Batch area: length
    # BATCH_ADDR_LENGTH = 563
    BATCH_ADDR_LENGTH = 100
    # Task area: start
    TASK_ADDR_BEGIN = 21100
    # Task area: end
    TASK_ADDR_END = 21195
    # Task area: length
    TASK_ADDR_LENGTH = 100

    # Start address (task type); same value as BATCH_ADDR_BEGIN
    TASKTYPE_ADDR_BEGIN = 21000
    # Whether to pass through the glass door
    PASS_DOOR_ADDR = 21001
    # Which shelf layer
    LAYER_ADDR = 21002
    # Whether the robot arm should act
    ARM_ADDR = 21003
    # Set of AGV path points
    PATH_ADDR_BEGIN = 21010
    # PATH_ADDR_LENGTH = 50
    # Number of tasks
    TASK_NUM = 21060
    # First task pick/return addresses 15061, 15062 -> 15079, 15080
    T1_ADDR_GET_ADDR = 21061
    # NOTE(review): 21603 lies outside the 21061-21090 range spanned by the
    # other task addresses; possibly a typo - confirm against the PLC map.
    T1_ADDR_RETURN_ADDR = 21603
    # Last task addresses
    T10_ADDR_GET_ADDR = 21088
    T10_ADDR_RETURN_ADDR = 21090

    ######
    # Read Registers
    ######
    # Whether the task is done
    TASK_DONE_ADDR = 22000
    # Task message received
    TASK_RECV_ADDR = 22001
    # Station where the AGV currently is
    AGV_STOP_ADDR = 22002
    # Current AGV battery level
    AGV_POWER_ADDR = 22003
    # AGV charging state
    AGV_CHARGE_ADDR = 22004
    # Device busy
    BUSY_ADDR = 22005
    # Device abnormal
    ABNORM_ADDR = 22006
    # Alarm information
    ALARM_ADDR = 22007
    # Status register count:
    # 8 consecutive units are read, each unit being 16 bits wide
    STATUS_ADDR_LENGTH = 8

    ##############
    ### Camera related
    ##############
    # PC -> PLC
    PIC_DONE_ADDR = 21500
    DOOR_READY_ADDR = 21501
    DDX_ADDR = 21502
    DDY_ADDR = 21504
    # PLC -> PC Status
    CAM_BEGIN_ADDR = 22500
    CAM_STATUS_ADDR_LENGTH = 12
def clear_batch():
    """Zero the batch and task register areas of the AGV PLC.

    Writes 100 zeros starting at ``TaskAddr.BATCH_ADDR_BEGIN`` and another
    100 zeros starting at ``TaskAddr.TASK_ADDR_BEGIN`` over one connection.
    Any exception is printed and followed by a 2 second pause; nothing is
    re-raised. The function always ends with a short 0.1 second pause.
    """
    zeros = [0] * 100
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            for begin in (TaskAddr.BATCH_ADDR_BEGIN, TaskAddr.TASK_ADDR_BEGIN):
                master.execute(
                    1,
                    cst.WRITE_MULTIPLE_REGISTERS,
                    starting_address=begin,
                    output_value=zeros,
                )
    except Exception as e:
        print('cmdList信息指令异常' + str(e))
        time.sleep(2)
    # sleep for a little while after clearing the registers
    time.sleep(0.1)
def write_batch(cmdList=None):
    """Write ``cmdList`` to the batch register area of the AGV PLC.

    The values go out in one WRITE_MULTIPLE_REGISTERS request starting at
    ``TaskAddr.BATCH_ADDR_BEGIN``. Any exception is printed and followed by
    a 2 second pause; nothing is re-raised.

    Args:
        cmdList: Register values to write. ``None`` (the default) is treated
            as an empty list.
    """
    # Build the fallback per call instead of sharing one mutable default.
    values = [] if cmdList is None else cmdList
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.BATCH_ADDR_BEGIN,
                output_value=values,
            )
            print('cmdList: ', res)
    except Exception as e:
        print('cmdList信息指令异常' + str(e))
        time.sleep(2)
def write_task_batch(cmdList=None):
    """Write ``cmdList`` to the task register area of the AGV PLC.

    The values go out in one WRITE_MULTIPLE_REGISTERS request starting at
    ``TaskAddr.TASK_ADDR_BEGIN``. Any exception is printed and followed by
    a 2 second pause; nothing is re-raised.

    Args:
        cmdList: Register values to write. ``None`` (the default) is treated
            as an empty list.
    """
    # Build the fallback per call instead of sharing one mutable default.
    values = [] if cmdList is None else cmdList
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.TASK_ADDR_BEGIN,
                output_value=values,
            )
            print('write_task_batch: ', res)
    except Exception as e:
        print('write_task_batch信息指令异常' + str(e))
        time.sleep(2)
def write_batch_float(cmdList=None):
    """Write ``cmdList`` to the batch register area using a float data format.

    Same request as a plain batch write (WRITE_MULTIPLE_REGISTERS starting
    at ``TaskAddr.BATCH_ADDR_BEGIN``) but with ``data_format='>f'`` passed
    through to ``master.execute``. Any exception is printed and followed by
    a 2 second pause; nothing is re-raised.

    Args:
        cmdList: Values to write. ``None`` (the default) is treated as an
            empty list.
    """
    # Build the fallback per call instead of sharing one mutable default.
    values = [] if cmdList is None else cmdList
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.BATCH_ADDR_BEGIN,
                output_value=values,
                data_format='>f',
            )
            print('cmdList Float: ', res)
    except Exception as e:
        print('cmdList Float信息指令异常' + str(e))
        time.sleep(2)
#
def cmd_type(cmdType: int):
    """Write the task type to the register ``TaskAddr.TASKTYPE_ADDR_BEGIN``.

    Any exception is printed and followed by a 2 second pause; nothing is
    re-raised.

    Args:
        cmdType: Value written with WRITE_SINGLE_REGISTER.
    """
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_SINGLE_REGISTER,
                starting_address=TaskAddr.TASKTYPE_ADDR_BEGIN,
                output_value=cmdType,
            )
            print('cmd_type: ', res)
            # NOTE(review): the previous `task_status = False` here only bound
            # a function-local name (no `global`), so it had no effect and was
            # removed - confirm no module-level flag was meant to be updated.
    except Exception as e:
        print('cmd_type信息指令异常' + str(e))
        time.sleep(2)
async def setSubPaths(start: str, stop: str):
    """Look up the stored route from ``start`` to ``stop`` and write it to the PLC.

    Queries ``Path`` for valid, active rows matching the two endpoints and
    writes the ``paths`` value of the first match (or an empty list when
    nothing matches) with WRITE_MULTIPLE_REGISTERS. Any exception raised by
    the write is printed and followed by a 2 second pause; nothing is
    re-raised.

    Args:
        start: Start station used in the ``Path`` lookup.
        stop: Stop station used in the ``Path`` lookup.
    """
    matches = await Path.filter(start=start, stop=stop, is_valid=True, is_active=True)
    # NOTE(review): the query result was already tested with len(), i.e. it is
    # treated as a sequence of rows, so `paths` is read from the first row
    # rather than from the sequence itself - confirm against the Path model.
    subPaths = matches[0].paths if matches else []
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            # NOTE(review): the target is TASKTYPE_ADDR_BEGIN although a
            # PATH_ADDR_BEGIN constant exists - confirm the intended address.
            res = master.execute(
                1,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.TASKTYPE_ADDR_BEGIN,
                output_value=subPaths,
            )
            print('setSubPaths: ', res)
    except Exception as e:
        print('setSubPaths信息指令异常' + str(e))
        # NOTE(review): time.sleep blocks the event loop in this coroutine.
        time.sleep(2)
#
def pass_door(passDoor: int):
    """Write the glass-door flag to the register ``TaskAddr.PASS_DOOR_ADDR``.

    Any exception is printed and followed by a 2 second pause; nothing is
    re-raised.

    Args:
        passDoor: Value written with WRITE_SINGLE_REGISTER.
    """
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_SINGLE_REGISTER,
                starting_address=TaskAddr.PASS_DOOR_ADDR,
                output_value=passDoor,
            )
            # Log under this function's own name, not the copied 'cmd_type'.
            print('pass_door: ', res)
    except Exception as e:
        print('pass_door信息指令异常' + str(e))
        time.sleep(2)
#
def setLayer(layer: int):
    """Write the shelf layer to the register ``TaskAddr.LAYER_ADDR``.

    Any exception is printed and followed by a 2 second pause; nothing is
    re-raised.

    Args:
        layer: Value written with WRITE_SINGLE_REGISTER.
    """
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_SINGLE_REGISTER,
                starting_address=TaskAddr.LAYER_ADDR,
                output_value=layer,
            )
            # Log under this function's own name, not the copied 'cmd_type'.
            print('setLayer: ', res)
    except Exception as e:
        print('setLayer信息指令异常' + str(e))
        time.sleep(2)
#
def moveArm(move: int):
    """Write the robot-arm action flag to the register ``TaskAddr.ARM_ADDR``.

    Any exception is printed and followed by a 2 second pause; nothing is
    re-raised.

    Args:
        move: Value written with WRITE_SINGLE_REGISTER.
    """
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_SINGLE_REGISTER,
                starting_address=TaskAddr.ARM_ADDR,
                output_value=move,
            )
            # Log under this function's own name, not the copied 'cmd_type'.
            print('moveArm: ', res)
    except Exception as e:
        print('moveArm信息指令异常' + str(e))
        time.sleep(2)
#
def taskNum(num: int):
    """Write the number of tasks to the register ``TaskAddr.TASK_NUM``.

    Any exception is printed and followed by a 2 second pause; nothing is
    re-raised.

    Args:
        num: Value written with WRITE_SINGLE_REGISTER.
    """
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_SINGLE_REGISTER,
                starting_address=TaskAddr.TASK_NUM,
                output_value=num,
            )
            # Log under this function's own name, not the copied 'cmd_type'.
            print('taskNum: ', res)
    except Exception as e:
        print('taskNum信息指令异常' + str(e))
        time.sleep(2)
def setTasks(tasks=None):
    """Write the task list to the PLC starting at ``TaskAddr.T1_ADDR_GET_ADDR``.

    The values go out in one WRITE_MULTIPLE_REGISTERS request. Any exception
    is printed and followed by a 2 second pause; nothing is re-raised.

    Args:
        tasks: Register values to write. ``None`` (the default) is treated
            as an empty list.
    """
    # Build the fallback per call instead of sharing one mutable default.
    values = [] if tasks is None else tasks
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            res = master.execute(
                1,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.T1_ADDR_GET_ADDR,
                output_value=values,
            )
            # Log under this function's own name, not the copied 'cmd_type'.
            print('setTasks: ', res)
    except Exception as e:
        print('setTasks信息指令异常' + str(e))
        time.sleep(2)
def readStatus():
    """Read the PLC status registers and return the raw result.

    Issues one READ_HOLDING_REGISTERS request for
    ``TaskAddr.STATUS_ADDR_LENGTH`` registers starting at
    ``TaskAddr.TASK_DONE_ADDR``. Exceptions are not caught here and
    propagate to the caller.

    Returns:
        Whatever ``master.execute`` returns for the read request.
    """
    with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
        status = master.execute(
            1,
            cst.READ_HOLDING_REGISTERS,
            TaskAddr.TASK_DONE_ADDR,
            TaskAddr.STATUS_ADDR_LENGTH,
        )
        # print('readStatus: ',status)
        return status
def readCamStatus():
    """Read the camera status registers and hand back the raw result.

    One READ_HOLDING_REGISTERS request is made for
    ``TaskAddr.CAM_STATUS_ADDR_LENGTH`` registers beginning at
    ``TaskAddr.CAM_BEGIN_ADDR``. Exceptions propagate to the caller.
    """
    first_register = TaskAddr.CAM_BEGIN_ADDR
    register_count = TaskAddr.CAM_STATUS_ADDR_LENGTH
    connection = ModbusConnection(AGVPLC.HOST, AGVPLC.PORT)
    with connection as master:
        # print('Cam readStatus: ',status)
        return master.execute(
            1, cst.READ_HOLDING_REGISTERS, first_register, register_count
        )
def setCamIntList(parms=None):
    """Write a list of camera values starting at ``TaskAddr.PIC_DONE_ADDR``.

    The values go out in one WRITE_MULTIPLE_REGISTERS request. Any exception
    is printed and followed by a 2 second pause; nothing is re-raised.

    Args:
        parms: Register values to write. ``None`` (the default) is treated
            as an empty list.
    """
    # Build the fallback per call instead of sharing one mutable default.
    values = [] if parms is None else parms
    try:
        # reagent AMR robot
        with ModbusConnection(AGVPLC.HOST, AGVPLC.PORT) as master:
            master.execute(
                1,
                cst.WRITE_MULTIPLE_REGISTERS,
                starting_address=TaskAddr.PIC_DONE_ADDR,
                output_value=values,
            )
    except Exception as e:
        # Report under this function's own name, not the copied 'cmd_type'.
        print('setCamIntList信息指令异常' + str(e))
        time.sleep(2)
#
def camDone(done: int):
    """Write ``done`` to the picture-done register ``TaskAddr.PIC_DONE_ADDR``.

    Uses WRITE_SINGLE_REGISTER and prints the result. Exceptions are not
    caught here and propagate to the caller.
    """
    # reagent AMR robot
    connection = ModbusConnection(AGVPLC.HOST, AGVPLC.PORT)
    with connection as master:
        reply = master.execute(
            1,
            cst.WRITE_SINGLE_REGISTER,
            starting_address=TaskAddr.PIC_DONE_ADDR,
            output_value=done,
        )
        print('camDone: ', reply)
#
def doorReady(done: int):
    """Write ``done`` to the door-ready register ``TaskAddr.DOOR_READY_ADDR``.

    Uses WRITE_SINGLE_REGISTER and prints the result. Exceptions are not
    caught here and propagate to the caller.
    """
    # reagent AMR robot
    connection = ModbusConnection(AGVPLC.HOST, AGVPLC.PORT)
    with connection as master:
        reply = master.execute(
            1,
            cst.WRITE_SINGLE_REGISTER,
            starting_address=TaskAddr.DOOR_READY_ADDR,
            output_value=done,
        )
        print('doorReady: ', reply)
# def writeddx(ddx: float):
# with ModbusConnection(
# # 试剂amr机器人
# AGVPLC.HOST,
# AGVPLC.PORT
# ) as master:
# res = master.execute(
# 1,
# cst.WRITE_MULTIPLE_REGISTERS,
# starting_address=TaskAddr.DDX_ADDR,
# output_value=[ddx],
# data_format='>f'
# )
# print('writeddx: ',res)
# def writeddy(ddy: float):
# with ModbusConnection(
# # 试剂amr机器人
# AGVPLC.HOST,
# AGVPLC.PORT
# ) as master:
# res = master.execute(
# 1,
# cst.WRITE_MULTIPLE_REGISTERS,
# starting_address=TaskAddr.DDY_ADDR,
# output_value=[ddy],
# data_format='>f'
# )
# print('writeddy: ',res)