from fastapi import APIRouter
from starlette.requests import Request
from tortoise.queryset import QuerySet, Prefetch
from models.agv import (
    Agv
)
from models.warehouse import (
    Hole,
    Vacancy
)
from helper import respond_to
from tortoise.contrib.pydantic import pydantic_model_creator
import requests
from typing import List
import threading
from plc.tools import (
    bytes_to_int_list,
    float2intList,
    intList2Float
)
import struct
import time
import math
from pydantic import BaseModel
from typing import Optional
from conf import setting
from cam.getParam import getParam
from cam.axis import (
    labelMapBigDisk,
    labelMapLittleDisk,
    labelMapLittleBox,
    labelMapBigBox
)
from plc.cmd import (
    cmd_type,
    setSubPaths,
    pass_door,
    setLayer,
    moveArm,
    taskNum,
    setTasks,
    clear_batch,
    readStatus,
    write_batch,
    write_batch_float,
    # Cam related
    readCamStatus,
    setCamIntList,
    camDone,
    doorReady,
    write_task_batch
)
from algorithm.genpath import (
    genPath,
    getAgvStop
)
import traceback
# NOTE: `clear_batch` is imported from plc.cmd above as well; because this
# import comes later, the name resolves to the `.views` version everywhere
# in this module. Keep the order unless that shadowing is removed on purpose.
from .views import (
    clear_batch,
    clearCmd,
    clearHoles,
    returnBox,
    putAllTask,
    getAllTask
)

# second capture count
CNT = 0
LASTITEM = None
LASTX = None
LASTY = None
LABELMAP = []


# Request body of /singlestep: one whole-box order.
# (Plain comments instead of docstrings so the generated schema is unchanged.)
class OrderM(BaseModel):
    ordertype: int
    layer: int
    stop: str


# Request body of /oneround: a "start" order followed by an "end" order.
class FullOrder(BaseModel):
    ordertype_start: int
    layer_start: int
    stop_start: str
    ordertype_end: int
    layer_end: int
    stop_end: str


agv_test_router = APIRouter(prefix='/agv')


def _send_box_order(order_type, layer, end, log_tag="T1: "):
    """Build one 100-slot command list, write it with write_batch, then clearCmd.

    Slot layout, as labelled by the original inline comments:
      0      task type
      1      whether to pass through the glass door (always written as 2 here)
      2      layer to fetch the box from
      3      whether the arm needs to act (always written as 2 here)
      10...  AGV station numbers (original comment: D21010-D21059)

    The route is computed from the AGV's current stop (readStatus()[2] mapped
    through getAgvStop) to ``end`` with genPath.

    NOTE(review): the meaning of the value 2 for slots 1 and 3 is not visible
    in this file - confirm against the PLC register documentation.

    Args:
        order_type: value written to slot 0.
        layer: value written to slot 2.
        end: destination label handed to genPath.
        log_tag: prefix of the debug print of the finished command list.
    """
    vl_output = [0] * 100
    vl_output[0] = order_type
    vl_output[1] = 2
    vl_output[2] = layer
    vl_output[3] = 2

    status = readStatus()
    start_stop = getAgvStop(status[2])
    paths = genPath(start_stop, end)
    print("Path: ", paths)

    # Path entries occupy consecutive slots starting at index 10. As before,
    # a path that does not fit into the list raises IndexError.
    for slot, station in enumerate(paths, start=10):
        vl_output[slot] = station

    print(log_tag, vl_output)
    write_batch(vl_output)
    clearCmd()


# Whole-order test: clear the batch, then send a single order.
@agv_test_router.post("/singlestep", summary='整单测试')
async def test3(request: Request, orderm: OrderM):
    clear_batch()
    _send_box_order(orderm.ordertype, orderm.layer, orderm.stop)
    return respond_to(data={"status": "Hello World!"})


# One-round test: send the "start" order, then the "end" order.
# Unlike test3, this endpoint does not call clear_batch() first.
@agv_test_router.post("/oneround", summary='一轮测试')
async def test4(request: Request, fullOrder: FullOrder):
    _send_box_order(
        fullOrder.ordertype_start,
        fullOrder.layer_start,
        fullOrder.stop_start,
    )
    # second round
    _send_box_order(
        fullOrder.ordertype_end,
        fullOrder.layer_end,
        fullOrder.stop_end,
    )
    return respond_to(data={"status": "Hello World!"})


# Float round-trip test: decode four values from the camera status words,
# print them, then write two fixed floats back as int lists.
@agv_test_router.get('/float', summary="浮点测试")
async def floatTest(request: Request):
    xStatus = list(readCamStatus())

    # Each value is decoded from two consecutive entries of the status list.
    xValue = intList2Float(xStatus[0:2])
    print("xValue: ", xValue)
    yValue = intList2Float(xStatus[2:4])
    print("yValue: ", yValue)
    zValue = intList2Float(xStatus[4:6])
    print("zValue: ", zValue)
    rValue = intList2Float(xStatus[6:8])
    print("rValue: ", rValue)

    DDXList = float2intList(5.12)
    print("DDXList: ", DDXList)
    DDYList = float2intList(3.14)
    print("DDYList: ", DDYList)

    setCamIntList(DDXList + DDYList)
    return respond_to(data={"status": "Hello World!"})
def _write_agv_header(task_type, layer, end, arm_flag, small_box=None,
                      log_tag="T1: "):
    """Build the 100-slot AGV command list, write it with write_batch, return the path.

    Slot layout, as labelled by the original inline comments:
      0      task type
      1      whether to pass through the glass door (always written as 2 here)
      2      layer to fetch the box from
      3      whether the arm needs to act
      4      "small box" flag (only written when ``small_box`` is given)
      10...  AGV station numbers (original comment: D21010-D21059)

    NOTE(review): the meaning of the individual flag values (1 / 2) is not
    visible in this file - confirm against the PLC register documentation.

    Args:
        task_type: value written to slot 0.
        layer: value written to slot 2.
        end: destination label handed to genPath.
        arm_flag: value written to slot 3.
        small_box: optional value for slot 4; the slot stays 0 when None.
        log_tag: prefix of the debug print of the finished command list.

    Returns:
        The path returned by genPath from the AGV's current stop to ``end``.
    """
    vl_output = [0] * 100
    vl_output[0] = task_type
    vl_output[1] = 2
    vl_output[2] = layer
    vl_output[3] = arm_flag
    if small_box is not None:
        vl_output[4] = small_box

    status = readStatus()
    start_stop = getAgvStop(status[2])
    paths = genPath(start_stop, end)
    print("Path: ", paths)

    # Path entries occupy consecutive slots starting at index 10. As before,
    # a path that does not fit into the list raises IndexError.
    for slot, station in enumerate(paths, start=10):
        vl_output[slot] = station

    print(log_tag, vl_output)
    write_batch(vl_output)
    return paths


def _run_box_leg(order_type, layer, end, log_tag):
    """Send one whole-box leg (arm flag 2) and clear the command afterwards."""
    _write_agv_header(order_type, layer, end, 2, log_tag=log_tag)
    clearCmd()


def _reset_capture_state(tasktype):
    """Reset the module globals CNT / LABELMAP and pick the label map for ``tasktype``.

    tasktype 1 selects the little-box map and 3 the big-box map (both on top
    of the big-disk and little-disk maps); any other value leaves LABELMAP
    empty and prints "Cannot Match".
    """
    global CNT
    global LABELMAP
    CNT = 0
    LABELMAP = []
    if tasktype == 1:
        # little box
        LABELMAP = labelMapBigDisk + labelMapLittleDisk + labelMapLittleBox
    elif tasktype == 3:
        # big box
        LABELMAP = labelMapBigDisk + labelMapLittleDisk + labelMapBigBox
    else:
        print("Cannot Match")
    print("LABELMAP: ", LABELMAP)


def _run_single_item_leg(task_type, layerSrc, end, tasktype, fill_tasks,
                         small_box=None):
    """Run one single-bottle leg: header, arm task table, clear, box return.

    Sequence (unchanged from the original inline code): reset capture state,
    write the AGV header (arm flag 1), fill a fresh 100-slot task list with
    ``fill_tasks`` and write it with write_task_batch, clearCmd, and - for
    tasktype 1 or 3 - returnBox(last path entry, layerSrc) followed by
    another clearCmd.

    NOTE(review): returnBox is called without ``await`` exactly as before;
    if it is a coroutine function in .views it will never actually run.
    """
    _reset_capture_state(tasktype)
    paths = _write_agv_header(task_type, layerSrc, end, 1, small_box=small_box)

    task_slots = [0] * 100
    fill_tasks(task_slots)
    print("T2: ", task_slots)
    write_task_batch(task_slots)
    clearCmd()

    if tasktype in (1, 3):
        # An empty path raises IndexError here, as it did originally.
        returnBox(paths[-1], layerSrc)
        clearCmd()


def _group_tasks_by_box(assignmentCmd, use_coordinates):
    """Group scheduler tasks by the first three '-' separated label parts.

    Each task is a one-entry dict ``{dest_label: {...}}``. The label that is
    split is ``task[dest_label]["coordinates"]`` when ``use_coordinates`` is
    true, otherwise ``dest_label`` itself. Labels with more than three parts
    are grouped under their three-part prefix (e.g. '01-0154-1-1' ->
    '01-0154-1'); shorter labels are treated as whole-box tasks.

    NOTE(review): only the last whole-box task seen is kept, as in the
    original code - earlier ones are overwritten.

    Returns:
        (groups, sorted_keys, whole_box_label, whole_box_task)
    """
    groups = {}
    whole_label = None
    whole_task = None
    for task in assignmentCmd:
        print("Task: ", task)
        dest_label = list(task.keys())[0]
        label = task[dest_label]["coordinates"] if use_coordinates else dest_label
        parts = label.split("-")
        if len(parts) > 3:
            # setdefault replaces the former try / bare-except grouping.
            groups.setdefault("-".join(parts[0:3]), []).append(task)
        else:
            whole_label = "-".join(parts)
            whole_task = task
    return groups, sorted(groups), whole_label, whole_task


# Single-bottle pick-and-place test.
@agv_test_router.get('/singlemove', summary="单只取放")
async def singleMove(request: Request):
    # clear all register related in PLC
    clear_batch()
    # Header: task type 5, source layer 3, small-box flag 1; arm task table:
    # big white bottle, single pick (move8).
    _run_single_item_leg(5, 3, "01-0111-03-02", 3, move8, small_box=1)
    return respond_to(data={"status": "Hello World!"})


# Simulated scheduler command: run all "get" tasks, then all "put" tasks.
@agv_test_router.get('/simshceduler', summary="模拟调度命令")
async def simShceduler():
    await clearHoles()
    assignmentCmd = [
        {
            "01-0113-3-2": {
                "cap": 40,
                "name": "大白瓶",
                "action": 5,
                "bottle": 60,
                "height": 180,
                "orderid": "tr43frr",
                "traynum": "1X01",
                "quantity": 1,
                "typetask": 3,
                "coordinates": "01-0111-3-2"
            }
        },
        {
            "01-0113-3-5": {
                "cap": 70,
                "name": "无水亚硫酸钠",
                "action": 5,
                "bottle": 60,
                "height": 150,
                "orderid": "tr43frr",
                "traynum": "1X02",
                "quantity": 1,
                "typetask": 3,
                "coordinates": "01-0111-3-5"
            }
        }
    ]

    #################
    ### get tasks: grouped by the "coordinates" label
    #################
    taskDic, taskDicKeys, wholeBoxLabel, wholeBoxTask = _group_tasks_by_box(
        assignmentCmd, True)
    # A whole-box get task goes at the end of the task list.
    if wholeBoxLabel and wholeBoxTask:
        taskDic[wholeBoxLabel] = [wholeBoxTask]
        taskDicKeys.append(wholeBoxLabel)
    for key in taskDicKeys:
        await getAllTask(taskDic[key])

    # where to put in AGV plate in getting task !!!
    # where to get from AGV plate in putting task !!!

    #################
    ### put tasks: grouped by the destination label (the dict key)
    #################
    taskDic, taskDicKeys, wholeBoxLabel, wholeBoxTask = _group_tasks_by_box(
        assignmentCmd, False)
    # A whole-box put task goes at the beginning of the task list.
    if wholeBoxLabel and wholeBoxTask:
        taskDic[wholeBoxLabel] = [wholeBoxTask]
        taskDicKeys.insert(0, wholeBoxLabel)
    for key in taskDicKeys:
        await putAllTask(taskDic[key])

    await clearHoles()
    return respond_to(data={"status": "Hello World!"})


# Whole box: docking station -> shelf.
@agv_test_router.get('/t1', summary="接驳台整箱到货架")
async def t1():
    clear_batch()
    _run_box_leg(1, 2, "01-0067-2", "T1: ")
    #######################
    _run_box_leg(2, 3, "01-0113-3", "T2: ")
    return respond_to(data={"status": "Hello World!"})


# Whole box: shelf -> docking station.
@agv_test_router.get('/t2', summary="货架整箱到接驳台")
async def t2():
    clear_batch()
    _run_box_leg(1, 3, "01-0113-3", "T1: ")
    #######################
    _run_box_leg(2, 2, "01-0067-2", "T2: ")
    return respond_to(data={"status": "Hello World!"})


# Single bottle: pick at the docking station, put on the shelf.
@agv_test_router.get('/t3', summary="接驳台取单只到货架")
async def t3():
    # clear all register related in PLC
    # (clear_batch is called twice in a row here; kept so that the sequence
    # of PLC operations is unchanged.)
    clear_batch()
    clear_batch()
    # big white bottle, single pick
    _run_single_item_leg(1, 2, "01-0066-02-02", 3, move8)
    #######################
    # big white bottle, single put
    _run_single_item_leg(1, 3, "01-0111-03-02", 3, move9)
    # Previously this endpoint fell off the end and returned None; respond
    # like every other endpoint on this router.
    return respond_to(data={"status": "Hello World!"})


# Single bottle: pick from the shelf, put at the docking station.
@agv_test_router.get('/t4', summary="货架取单只到接驳台")
async def t4():
    clear_batch()
    #######################
    # big white bottle, single pick
    _run_single_item_leg(1, 3, "01-0111-03-02", 3, move8)
    #######################
    # clear all register related in PLC
    clear_batch()
    # big white bottle, single put
    _run_single_item_leg(1, 2, "01-0066-02-02", 3, move9)
    # Previously this endpoint fell off the end and returned None; respond
    # like every other endpoint on this router.
    return respond_to(data={"status": "Hello World!"})


# (get, put) position pairs shared by several task tables below.
# NOTE(review): the position codes (102, 305, ...) are opaque here; their
# physical meaning is defined outside this file.
_SHELF_ROUND_TRIP = (
    # backward
    (102, 305), (305, 205), (205, 206),
    # forward
    (206, 205), (205, 305), (305, 102),
)
_PLATE_SHUFFLE = ((206, 202), (202, 205))
_RING_TOUR = (
    (102, 401), (401, 403), (403, 406),
    (406, 404), (404, 102), (102, 101),
)


def _uniform_tasks(moves, height, diameter):
    """Attach the same height / diameter to every (get, put) pair."""
    return [(get_pos, put_pos, height, diameter) for get_pos, put_pos in moves]


def _fill_task_slots(vl_output, tasks):
    """Write an arm task table into ``vl_output`` in place and return it.

    Slot 0 receives the number of tasks; task ``i`` (0-based) occupies slots
    ``1 + 4*i`` .. ``4 + 4*i`` in the order get, put, height, diameter - the
    same layout the original hand-written assignments used.
    """
    vl_output[0] = len(tasks)
    for index, (get_pos, put_pos, height, diameter) in enumerate(tasks):
        base = 1 + 4 * index
        vl_output[base] = get_pos
        vl_output[base + 1] = put_pos
        vl_output[base + 2] = height
        vl_output[base + 3] = diameter
    return vl_output


def move1(vl_output):
    """Black bottle, potassium iodate: six-step round trip (height 117, diameter 50)."""
    return _fill_task_slots(vl_output, _uniform_tasks(_SHELF_ROUND_TRIP, 117, 50))


def move2(vl_output):
    """Red cap (original comment: 钙X酸): six-step round trip (height 90, diameter 50)."""
    return _fill_task_slots(vl_output, _uniform_tasks(_SHELF_ROUND_TRIP, 90, 50))


def move3(vl_output):
    """Ethanol: six-step round trip (height 170, diameter 35)."""
    return _fill_task_slots(vl_output, _uniform_tasks(_SHELF_ROUND_TRIP, 170, 35))


def move4(vl_output):
    """Two-step move 206 -> 202 -> 205 (height 117, diameter 50)."""
    return _fill_task_slots(vl_output, _uniform_tasks(_PLATE_SHUFFLE, 117, 50))


def move5(vl_output):
    """Big white bottle: six-step tour (height 180, diameter 40)."""
    moves = (
        # backward
        (101, 102), (102, 401), (401, 403),
        # forward
        (403, 406), (406, 404), (404, 102),
    )
    return _fill_task_slots(vl_output, _uniform_tasks(moves, 180, 40))


def move6(vl_output):
    """Anhydrous sodium sulfite: six-step tour (height 150, diameter 70).

    Now returns ``vl_output`` like move1-move5 (it used to return None).
    """
    return _fill_task_slots(vl_output, _uniform_tasks(_RING_TOUR, 150, 70))


def move7(vl_output):
    """Mixed task: six-step tour (160 / 55) followed by the two-step move (117 / 50).

    Now returns ``vl_output`` like move1-move5 (it used to return None).
    """
    tasks = (_uniform_tasks(_RING_TOUR, 160, 55)
             + _uniform_tasks(_PLATE_SHUFFLE, 117, 50))
    return _fill_task_slots(vl_output, tasks)


def move8(vl_output):
    """Big white bottle, single pick: 402 -> 102 (height 180, diameter 40).

    Now returns ``vl_output`` like move1-move5 (it used to return None).
    """
    return _fill_task_slots(vl_output, [(402, 102, 180, 40)])


def move9(vl_output):
    """Big white bottle, single put: 102 -> 402 (height 180, diameter 40).

    Now returns ``vl_output`` like move1-move5 (it used to return None).
    """
    return _fill_task_slots(vl_output, [(102, 402, 180, 40)])


def move10(vl_output):
    """Phenolphthalein, single put: 307 -> 306 (height 180, diameter 40).

    Now returns ``vl_output`` like move1-move5 (it used to return None).
    """
    return _fill_task_slots(vl_output, [(307, 306, 180, 40)])