You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

451 lines
18 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio
import io
import os
from os import path, makedirs
import json,time
import httpx
import requests
from fastapi import APIRouter, Request
from pydantic import BaseModel
from starlette.websockets import WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from helper import usb, respond_to
from helper.conversion import InventoryCheck
from helper.log import record_log
from helper.utils import timezone_now
from helper.websocket_manage import manager
from models import Cabinet, User, Drawer, Drug, DrawerBoard, Terminal, Role, DrugStateEnum
from task.drug import run_delayed_task
from transfer.standard import Standard
from conf.setting import setting
from helper.logger import logger
router = APIRouter(prefix='/plane')
ONE_MODEL_LIST = ['9000','eagle90']
# 一层抽屉对应多个板子
MORE_MODEL_LIST = ['3000', '1804', '1800', '1600', '90', '1802','rc800','rc802','rc804','rc806']
# 度目人脸登录
login_user_list = []
clear_list_task = None
class GlobalStaticSetting:
finally_rfid_data_report_time=None #RFID数据盘存时间
finally_rfid_data_mask_duration=2 #RFID数据屏蔽时长
class BeforeOpenModel(BaseModel):
count: int
data: str | None
state: int
# board_addresses: list
class CommentModel(BaseModel):
code: int
data: dict | int | None
class OpenDoor(BaseModel):
drawer_number: int
board_addresses: list[int]
cabinet_id: str
@router.post('/rfids', summary='下位机rfid数据上报')
async def create(request: Request, post: BeforeOpenModel):
"""
下位机上报
:param request: Request
:param post: BeforeOpenModel
:return:
"""
logger.warning("------下位机上报日志----------")
logger.warning(f"上报状态:{post.state}, 上报内容{post.data}")
# 下位机推送的盘点数据
inventory_check = InventoryCheck(post.data)
inventory = await inventory_check.restruct()
state = post.state
print("inventory状态", state, inventory)
# 打开抽屉state=1时就对比盘存单和库存
cabinet_obj = await Cabinet.get(terminal_id=setting.TERMINAL_ID)
login_user = await User.get(id=cabinet_obj.user_id).only('name')
# put_in_d, take_out_d = await inventory_check.inventory_result(cabinet_obj.id, post.board_addresses)
# take_out_err_msg = ""
# if take_out_d:
# for line_no, drug_info in take_out_d.items():
# for rfid in drug_info:
# drug_obj = await Drug.get_or_none(rfid=rfid)
# if login_user and drug_obj.fill_json_content.get('syr') != login_user.name: # 柜体登陆人与药剂使用人不一致
# take_out_err_msg += drug_obj.fill_json_content.get('k1') + ','
# if take_out_err_msg:
# take_out_err_msg = login_user.name + '不是'+take_out_err_msg + '的使用人,请放回柜体!'
# 是否为首次盘存数据
if state != 1:
mac_addr = request.headers.get('mac-addr', '')
# 找到归属柜体
cabinet = await Cabinet.get_or_none(mac_addr=mac_addr)
if not cabinet:
return respond_to(code=400, desc='柜体不存在')
# 9000调用根据定位板数据判断
# if setting.CLIENT_NUMBER in ONE_MODEL_LIST:
# put_in_d, take_out_d = await inventory_check.inventory_result(cabinet.id, post.board_addresses)
# elif setting.CLIENT_NUMBER in MORE_MODEL_LIST:
# # 定位到层
# put_in_d, take_out_d = await inventory_check.inventory_board_result(cabinet.id, post.board_addresses)
# print("上报rfid", put_in_d, take_out_d)
# else:
# put_in_d, take_out_d = await inventory_check.inventory_result(cabinet.id, post.board_addresses)
put_in_d, take_out_d = await inventory_check.inventory_keys_result(cabinet.id)
# 关门状态
if state == 3:
# 抑制下位机多次多次盘点的数据结果上报只取第一次后续的2秒内的上报全部放弃不放弃会导致数据错乱
if GlobalStaticSetting.finally_rfid_data_report_time != None and (int(time.time()) - GlobalStaticSetting.finally_rfid_data_report_time < GlobalStaticSetting.finally_rfid_data_mask_duration):
logger.info("触发RFID数据上报屏蔽")
return respond_to()
GlobalStaticSetting.finally_rfid_data_report_time=int(time.time())
logger.info(f"关门后上报状态:{post.state}, rfid: {inventory}")
logger.info(f"put_in_d{put_in_d}, take_out_d: {take_out_d}")
standard = Standard(cabinet.user_id.split(','))
inventory_result = {}
storage_list = []
return_list = []
take_out_list = []
# 是否有存入药剂
if put_in_d:
for rfid in put_in_d:
drug_obj = await Drug.get_or_none(rfid=rfid)
if drug_obj and drug_obj.state == DrugStateEnum.IN:
continue
position = inventory.get(rfid)
transfer_results = await standard.put_in(rfid, cabinet.id, position)
# if transfer_results:
# transfer_results['rfid'] = rfid
# if transfer_results:
# if transfer_results.get('operate_type') and transfer_results['operate_type'] == 'in':
# storage_list.append(transfer_results)
# else:
# return_list.append(transfer_results)
# 是否有领用药剂
if take_out_d:
for rfid in take_out_d:
position = inventory.get(rfid)
transfer_results = await standard.take_out(rfid, cabinet.id, position)
# if not transfer_results:
# continue
# transfer_results['rfid'] = rfid
# transfer_results['hole'] = line_no
# take_out_list.append(transfer_results)
# take_out_err_record_list = []
# if take_out_list: # 存在拿出的试剂判断是否是非法领用
# for drug_item in take_out_list:
# if login_user and drug_item["fill_json_content"].get('syr') != login_user.name: # 柜体登陆人与药剂使用人不一致
# take_out_err_record_list.append(drug_item)
# print("take_out_err_record_list-->", take_out_err_record_list)
# inventory_result.update(
# {'storage_list': storage_list, 'return_list': return_list, 'take_out_list': take_out_list, 'take_out_err_record_list': take_out_err_record_list})
# if storage_list or return_list or take_out_list:
# await manager.send_message("inventory", inventory_result)
# # 非法领用实时报警
# if take_out_err_record_list:
# print("非法领用-关门上报!!!!!")
# await manager.broadcast_json("take_out_err_realtime", {'take_out_err_realtime_msg': take_out_err_msg + "!"})
# await manager.send_message("warning", "sssss")
# return respond_to()
# 广播盘点结果
# await manager.send_message("opening", inventory)
# # 非法领用实时报警
# await manager.broadcast_json("take_out_err_realtime", {'take_out_err_realtime_msg': take_out_err_msg})
# await manager.send_message("warning", "sssss")
global login_user_list
login_user_list = []
return respond_to()
@router.post('/logs', summary='上报工作日志')
async def create(request: Request, model: CommentModel):
"""
上报工作日志
:param request: Request
:param model: CommentModel
:return:
"""
print("=================",model.data,"============",model.code)
mac_addr = request.headers.get('mac-addr')
cabinet = await Cabinet.get(mac_addr=mac_addr).only('id', 'user_id', 'label')
code = model.code
data = model.data
user_id = cabinet.user_id
if user_id:
user = await User.get(id=user_id.split(',')[0]).only('name')
username = user.name
else:
username = None
if code == 201:
if data['state'] == 1:
state = '打开'
else:
state = '关闭'
await manager.broadcast_json("end", {'state': True})
await record_log(kind='柜体操作', user_id=user_id, users=username, comment=f'{state}{cabinet.label}柜门', cabinet_id=cabinet.id)
if code == 202:
state = '打开' if data['state'] == 1 else '关闭'
drawer = await Drawer.filter(line_no=data['value'], cabinet=cabinet).first()
await record_log(kind='抽屉操作', user_id=user_id, users=username,
comment=f'{state}{cabinet.label}柜门-{drawer.label}抽屉', cabinet_id=cabinet.id)
if code == 210:
await manager.broadcast_json("end", 'end')
await record_log(kind='柜体操作', user_id=user_id, users=username,
comment=f'结束柜体操作', cabinet_id=cabinet.id)
if code == 410:
await manager.broadcast_json("warning", data)
await record_log(kind='柜体异常', user_id=user_id, users=username,
comment=f'未关闭柜门', cabinet_id=cabinet.id)
now = timezone_now()
dir = path.join('static', 'logs', str(cabinet.id))
makedirs(dir, exist_ok=True)
log_path = dir + f'/{timezone_now().strftime("%Y%m%d")}.log'
with open(log_path, 'a') as f:
f.seek(0)
f.write(f'{now.strftime("%Y-%m-%d %H:%M:%S")} - {model}\r\n')
# 登录请求的URL和参数
# login_url = 'http://192.168.2.100:8000/v1/login'
# login_payload = {
# 'username': 'admin',
# 'password': '123456'
# }
# 发送登录请求
# print("===============通道门请求==========")
# login_response = requests.post(login_url, json=login_payload)
# if login_response.status_code == 200:
# # 获取登录成功后返回的 access token 和其他信息
# response_json = login_response.json()
# access_token = response_json.get('value', {}).get('token')
# # 如果成功获取到 access token将其添加到 patch 请求的头部
# if access_token:
# patch_url = 'http://192.168.2.100:8000/v1/real_time_task/choice'
# headers = {
# 'Authorization': f'Bearer {access_token}',
# 'Content-Type': 'application/json'
# }
# patch_payload = {
# 'choice_file_id': 2
# }
# # 发送 patch 请求
# patch_response = requests.patch(patch_url, headers=headers, json=patch_payload)
# if patch_response.status_code == 200:
# return patch_response.json() # 返回请求结果
# else:
# return f'Patch请求失败状态码{patch_response.status_code}'
# else:
# return '未能获取到 Access Token'
# else:
# return f'登录请求失败,状态码:{login_response.status_code}'
@router.get("/cabinet_files/{cabinet_id}", summary='获取下位机日志列表')
async def read_cabinet_files(cabinet_id: str, page_no: int = 1, page_size: int = 10):
"""
获取下位机日志列表
:return:
"""
# 获取static/logs目录下所有子目录的名称
pid_dir = f"static/logs/{cabinet_id}"
if not os.path.exists(pid_dir):
return respond_to(code=400, desc=f'没有找到目录')
result = os.listdir(pid_dir)
sorted_logs = sorted(result, reverse=True)
sorted_logs = sorted_logs[0:30]
return respond_to(data=sorted_logs)
@router.get("/download/{cabinet_id}/{file_name}", summary='下载日志')
async def download_file(cabinet_id: str, file_name: str):
"""
下载日志
:param cabinet_id: 柜体名称
:param file_name: 日志文件名称
:return:
"""
file_path = f"./static/logs/{cabinet_id}/{file_name}"
if not os.path.exists(file_path):
return respond_to(code=400, desc=f'文件不存在')
with open(file_path, mode="rb") as file:
file_content = file.read()
buffer = io.BytesIO(file_content)
try:
usb.put_in(file_name, buffer)
return respond_to()
except usb.DeviceNotFound:
return respond_to(code=404, desc='请先插入U盘')
@router.websocket("/ws")
async def face_login(websocket: WebSocket):
"""
websocket连接
:param websocket: WebSocket
:return:
"""
await manager.connect(websocket)
try:
while True:
message = await websocket.receive_text()
await manager.handle_message(message)
except WebSocketDisconnect:
manager.disconnect(websocket)
# 开门
@router.post("/open_door/{cabinet_id}", summary='开门')
async def open_door(cabinet_id: str):
"""
下位机开门
"""
params = {"drawer_number": 0, "board_addresses": [], "door": True}
cabinet_obj = await Cabinet.get_or_none(id=cabinet_id)
if not cabinet_obj:
return respond_to(404, desc="未获取柜体")
url = f'http://{cabinet_obj.ip}/api/cabinet/v1/open'
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, json=params)
if response.status_code != 200:
return respond_to(code=400, desc="下位机开门失败")
except Exception as e:
return respond_to(code=400, desc=f"下位机通信失败,{str(e)}")
return respond_to(data=response.json())
class UnlockModel(BaseModel):
position: list
layer: list = [0, 0, 0, 0],
duration: int = 0
function: int = 0
@router.post("/unlock/{cabinet_id}", summary='开锁')
async def open_door(cabinet_id: str, params: UnlockModel):
"""
下位机开锁扣
"""
cabinet_obj = await Cabinet.get_or_none(id=cabinet_id)
if not cabinet_obj:
return respond_to(404, desc="未获取柜体")
url = f'http://{cabinet_obj.ip}/api/cabinet/v1/key'
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, json=params)
if response.status_code != 200:
return respond_to(code=400, desc="下位机开锁失败")
except Exception as e:
return respond_to(code=400, desc=f"下位机通信失败,{str(e)}")
return respond_to(data=response.json())
# 开门
@router.post("/openDoor", summary='度目开门')
async def open_door(request: Request):
"""
开下位机抽屉
:param cabinet_id: 柜体id
:param params: {board_address, door, drawer_number}
:return:
"""
dict_data={
"DMCM000BLC21H0003B": ("100.64.0.2", "3d0ff817-45b6-4306-b5e9-3c23cb7fd5f5"),
"DMCM000BLC21I000S6": ("100.64.0.3", "3d0ff817-45b6-4306-b5e9-3c23cb7fd5f6"),
"DMCM000BLC21I000ML": ("100.64.0.4", "3d0ff817-45b6-4306-b5e9-3c23cb7fd5f7"),
}#设备号ip
data =await request.body()
json_data =json.loads(data)
user_info =json_data["user_info"]
timestamp =json_data["timestamp"]
device_id =json_data["device_id"]
# 30s内自动清空列表
async def clear_list():
await asyncio.sleep(30) # 等待30s
login_user_list.clear() # 清空列表
print("login_user_list cleared")
# 3人登录
if len(login_user_list) < 2 and user_info in login_user_list:
return JSONResponse(content={"task_id": json_data["task_id"], "pass_flag": 2,
"promoting_msg": f'请第{len(login_user_list) + 1}人登录'})
else:
print("加入列表", user_info)
global clear_list_task
if clear_list_task and not clear_list_task.done():
clear_list_task.cancel()
clear_list_task = asyncio.create_task(clear_list()) # Start the task
# 查找用户
user_obj = await User.get_or_none(name=user_info.get("name"))
if not user_obj:
return JSONResponse(content={"task_id": json_data["task_id"], "pass_flag": 2, "promoting_msg": '未获取系统用户'})
role_obj = await Role.get_or_none(id=user_obj.role_id)
user_info["user_id"] = user_obj.id
login_user_list.append(user_info)
if len(login_user_list) == 1:
if role_obj.grade == 0: # 普通用户
login_user_list.pop(-1)
return JSONResponse(content={"task_id": json_data["task_id"], "pass_flag": 2, "promoting_msg": '请管理员先登录'})
if len(login_user_list) == 2:
if role_obj.grade == 0: # 普通用户
login_user_list.pop(-1)
return JSONResponse(
content={"task_id": json_data["task_id"], "pass_flag": 2, "promoting_msg": '请管理员登录'})
if user_info.get("name") == login_user_list[0].get("name"):
print("login_user_list", login_user_list)
login_user_list.pop(-1)
return JSONResponse(
content={"task_id": json_data["task_id"], "pass_flag": 2, "promoting_msg": '请勿重复登录'})
if len(login_user_list) == 3:
#第三人一定不是第二个管理员
if user_obj.name == login_user_list[1].get("name"):
login_user_list.pop(-1)
return JSONResponse(
content={"task_id": json_data["task_id"], "pass_flag": 2, "promoting_msg": '第3人不能是第2个管理员'})
if len(login_user_list) < 3:
return JSONResponse(content={"task_id": json_data["task_id"], "pass_flag": 1,
"promoting_msg": '登录成功'}) # pass_flag 1通行2不通行 promoting_msg 提示语
print("login_user_list", login_user_list)
params ={"board_address":['1','2','3','4','5','6','7','8'], "door":True, "drawer_number":0}
url = f'http://{dict_data[device_id][0]}/api/cabinet/v1/open'
try:
response = requests.post(url, json=params)
print('response', response)
except Exception as e:
print("e", e)
return JSONResponse(content={"task_id": json_data["task_id"], "pass_flag": 2, "promoting_msg": '开门失败'})
# 柜体用户id修改
cabinet_id = dict_data[device_id][1]
cabinet_obj = await Cabinet.get_or_none(id=cabinet_id)
if cabinet_obj:
cabinet_obj.user_id = login_user_list[-1].get("user_id")
await cabinet_obj.save()
return JSONResponse(content={"task_id":json_data["task_id"],"pass_flag":1,"promoting_msg":'开门成功'} )#pass_flag 1通行2不通行 promoting_msg 提示语