# import os # import sys # import ctypes # import asyncio # import datetime # import psutil # import requests # import pathlib # from apscheduler.schedulers.asyncio import AsyncIOScheduler # from app.lib.VK70xUMCDAQ import VK70xUMCDAQ # # from app.conf.Setting import settings # # from app.lib.RecordData import ExperimentData # from app.models import * # from app.lib.Utils import Utils # from app.lib.Utils import format_status_data # from fastapi import APIRouter, Request, WebSocket, WebSocketDisconnect, Response # from app.lib.ModBus import ModbusConnection, IDLE, ZHIFENG # from sqlalchemy import and_, desc # from app.models.Business import PageParam, BllUser, BllRole, BllExperiment, BllExperimentRecord, BllSpecimen, \ # BllExperimentConfig, BllWaitDisinfSpecimen, BllTerminal, BllUselog # from app.models.DateEntity import EntitySpecimen, EntityExperiment, EntityWaitDisinfSpecimen, EntityExperimentConfig, \ # EntityTerminal # from app.validators.api import * # from app.lib.StaticData import * # from app.lib.websocket_manageer import ConnectionManager # from app.lib.Log import logger # from app.utils.excel_util import ExcelExport # import io # from app.lib import usb # import httpx # # shiyan_time = 0 # END_TIME = None # START_EXPERIMENT_ID = None # # router = APIRouter(prefix='/api') # # # @router.get('/get_terminal_info', summary='获取系统信息') # async def get_terminal_info(request: Request): # terminal_info = BllTerminal().findEntity(EntityTerminal.is_enabled == 0) # return Utils.true_return(terminal_info, desc="获取成功!") # # # @router.post('/add_user', summary='创建用户') # async def add_user(request: Request, body: SignUpdateModel): # """ # 创建用户 # :body SignUpdateModel # :return: # """ # user = BllUser().findEntity(and_(EntityUser.user_name == body.username, EntityUser.is_enabled == 0)) # if user: # return Utils.false_return('', desc="账号已注册!") # entity = EntityUser() # entity.user_id = str(Utils.get_uuid()) # entity.user_pwd = Utils.MD5(body.password) # entity.user_name = body.username # entity.real_name = body.name # entity.role_id = body.role_id # entity.user_sex = body.sex # entity.update_time = Utils.get_time() # BllUser().insert(entity) # return Utils.true_return('', desc="创建成功!") # # # @router.post('/login', summary='用户登录') # async def login(request: Request, body: SignInModel): # """ # 用户登录 # :body SignInModel # :return: # """ # user = BllUser().login(body.username, body.password) # if user: # CurrentUser["username"] = user.user_name # CurrentUser["user_id"] = user.user_id # return Utils.true_return(user, desc="登录成功!") # return Utils.false_return('', desc="登录失败,账号与密码不匹配!") # # # @router.post('/user_list', summary='用户列表') # async def user_list(request: Request, body: UserModel): # """ # 用户列表 # :return: # """ # queryOrm = BllUser().getUserList() # pageparam = PageParam(body.page_no, body.page_size) # user_list = BllUser().queryPage(queryOrm, pageparam) # pagination = { # "total_count": pageparam.totalRecords, # "page_no": body.page_no, # "page_size": body.page_size # } # return Utils.true_return_pagination(pagination, user_list, desc="获取成功!") # # # @router.put('/{user_id}', summary='编辑用户') # async def update_user(user_id: str, body: SignUpdateModel): # """ # 编辑用户 # :param user_id: str # :body SignUpdateModel # :return: # """ # user = BllUser().findEntity(and_(EntityUser.user_id == user_id, EntityUser.is_enabled == 0)) # if user: # user.user_pwd = Utils.MD5(body.password) # user.user_name = body.username # user.real_name = body.name # user.role_id = body.role_id # user.user_sex = body.sex # user.update_time = Utils.get_time() # BllUser().update(user) # return Utils.true_return(user, desc="编辑成功!") # return Utils.false_return('', desc="查不到该账号!") # # # @router.delete('/delete_user/{user_id}', summary='删除用户') # async def del_user(user_id: str): # """ # 删除用户 # :param user_id: str # :return: # """ # user = BllUser().findEntity(and_(EntityUser.user_id == user_id, EntityUser.is_enabled == 0)) # if user: # user.is_enabled = 1 # user.update_time = Utils.get_time() # BllUser().update(user) # return Utils.true_return('', desc="删除成功!") # return Utils.false_return('', desc="查不到该账号!") # # # @router.get('/get_role_list', summary='获取角色列表') # async def get_role_list(request: Request): # """ # 获取角色列表 # :return: # """ # role_list = BllRole().findList(EntityRole.is_enabled == 0).all() # data_list = [{"role_name": i.role_name, "role_id": i.role_id} for i in role_list] # return Utils.true_return(data_list, desc="查询成功!") # # # @router.post('/get_specimen_list', summary='获取证物数据') # async def get_specimen_data(request: Request, body: SpecimenSearchModel): # queryOrm = BllSpecimen().get_specimen_list(body) # pageparam = PageParam(body.page_no, body.page_size) # specimen_list = BllSpecimen().queryPage(queryOrm, pageparam) # for specimen in specimen_list: # specimen.experiment_list = [i.to_dict() for i in specimen.experiments] # # pagination = { # "total_count": pageparam.totalRecords, # "page_no": body.page_no, # "page_size": body.page_size # } # return Utils.true_return_pagination(pagination, specimen_list, desc="获取成功!") # # # @router.get('/test_data', summary='测试接口') # async def get_specimen_data(type: int = None, open: int = 1): # if type: # if type == 1: # print('open door-------------') # await open_door(open) # elif type == 2: # print('config_v1-------------') # await config_v1() # elif type == 3: # print('get_sensor_data-------------') # await get_sensor_data() # elif type == 4: # print('get_device_state-------------') # await get_device_state() # # return Utils.true_return('', desc="获取成功!") # # # @router.get('/test_request', summary='测试接口') # async def test_request(): # global END_TIME # END_TIME = datetime.datetime.now() # # wait_obj = BllWaitDisinfSpecimen().findEntity( # # EntityWaitDisinfSpecimen.specimen_id == "2bd90215-b91e-11ef-ad8c-2c6dc175c6a5") # # wait_obj.done = 1 # # BllWaitDisinfSpecimen().update(wait_obj) # # return Utils.true_return('', desc="获取成功!") # # # async def get_device_state(): # url = "http://100.64.0.1/api/cabinet/v1/state" # # try: # # 发送GET请求 # async with httpx.AsyncClient() as client: # response = await client.get(url, timeout=4) # # # 检查响应状态码 # if response.status_code == 200: # # 解析响应JSON数据 # data = response.json() # # # 获取各个设备的状态 # led = data.get("led") # door = data.get("door") # fans = data.get("fans") # compressor = data.get("compressor") # heating = data.get("heating") # ozonizer = data.get("ozonizer") # disinfectant = data.get("disinfectant") # pump = data.get("pump") # work_time = data.get("work_time") # sensor = data.get("sensor") # # # 打印设备状态 # print(f"原始值: {data}") # print(f"LED 状态: {'亮' if led == 1 else '灭'}") # print(f"门状态: {'开' if door == 1 else '关'}") # print(f"风机状态: {'开' if fans == 1 else '关'}") # print(f"压缩机状态: {'开' if compressor == 1 else '关'}") # print(f"加热管状态: {'开' if heating == 1 else '关'}") # print(f"臭氧发生器状态: {'开' if ozonizer == 1 else '关'}") # print(f"紫外线消毒灯状态: {'开' if disinfectant == 1 else '关'}") # print(f"水泵状态: {'开' if pump == 1 else '关'}") # print(f"工作时间: {work_time}") # # # 检查传感器状态,注意 bit0, bit1, bit2 # sensor_status = f"温湿度传感器: {'异常' if (sensor & 0b001) else '正常'}, " # sensor_status += f"VOC传感器: {'异常' if (sensor & 0b010) else '正常'}, " # sensor_status += f"臭氧检测仪: {'异常' if (sensor & 0b100) else '正常'}" # print(sensor_status) # # else: # print(f"请求失败,状态码:{response.status_code}") # # except httpx.RequestError as e: # print(f"请求失败:{e}") # # # async def get_sensor_data(): # url = "http://100.64.0.1/api/cabinet/v1/sensor" # # try: # # 发送GET请求 # async with httpx.AsyncClient() as client: # response = await client.get(url, timeout=4) # # # 检查响应状态码 # if response.status_code == 200: # # 解析响应JSON数据 # data = response.json() # temperature = data.get("temperature") # humidity = data.get("humidity") # voc = data.get("voc") # ozone = data.get("ozone") # # # 打印返回的传感器数据 # print(f"温度: {temperature} °C") # print(f"湿度: {humidity} %") # print(f"VOC: {voc} ppm") # print(f"臭氧: {ozone} ppb") # else: # print(f"请求失败,状态码:{response.status_code}") # # except httpx.RequestError as e: # print(f"请求失败:{e}") # # # async def config_v1(): # # 定义接口URL # url = "http://100.64.0.1:80/api/cabinet/v1/setting" # 替换为实际服务器地址 # # # 定义请求数据 # payload = { # "temperature": 20, # 温度设置(18~30℃) # "humidity": 50, # 湿度设置(40~70%RH) # "ozone": 1, # 臭氧值(1为开启,0为关闭) # "time": 30, # 消毒时长(单位:分钟) # "mode": 2, # 消毒模式(0:关闭, 1:臭氧, 2:紫外线消毒) # # "light_config": 2 # LED模式(0:常关, 1:常开, 2:自动模式) # } # # # 定义请求头 # headers = { # "Content-Type": "application/json" # 确保以 JSON 格式发送数据 # } # # # 发送POST请求 # try: # async with httpx.AsyncClient() as client: # response = await client.post(url, json=payload) # # # 如果请求成功,打印响应数据 # if response: # print("响应状态码:", response.status_code) # print("响应数据:", response.json()) # 打印返回的 JSON 数据 # # except httpx.RequestError as e: # print(f"下位机通讯失败: {e}") # # # @router.post('/insertSpecimen', summary='新增证物') # async def insertSpecimen(request: Request, body: SpecimenModel): # ''' # # specimen_id = Column(String(50), comment="物品id", primary_key=True) # specimen_name = Column(String(50), comment="物品名称") # specimen_code = Column(String(50), comment="物品编号") # specimen_type = Column(String(50), comment="物品类型") # specimen_origin = Column(String(255), comment="物品来源") # specimen_desc = Column(String(500), comment="物品描述") # storage_location = Column(String(255), comment="存放位置") # remark = Column(String(500), comment="备注") # rfid = Column(String(50), comment="条码") # # ''' # SpecimenInsertDict.update(body.items) # spkwargs_ = SpecimenInsertDict # specimen_id = str(Utils.get_uuid()) # # specimen = BllSpecimen().findEntity(and_(EntitySpecimen.specimen_code == spkwargs_["specimen_code"], # EntitySpecimen.specimen_name == spkwargs_["specimen_name"], # EntitySpecimen.is_enabled == 0)) # if specimen: # return Utils.false_return('', desc="相同名称及编号的物证已存在!") # # if spkwargs_.get("rfid"): # specimen = BllSpecimen().findEntity(and_(EntitySpecimen.rfid == spkwargs_.get("rfid"), # EntitySpecimen.specimen_id != specimen_id, # EntitySpecimen.is_enabled == 0)) # if specimen: # specimen.rfid_state = 2 # specimen.update_time = Utils.get_time() # specimen.rfid = '' # BllSpecimen().update(specimen) # # spkwargs_["specimen_id"] = specimen_id # if spkwargs_.get("rfid"): # spkwargs_['rfid_state'] = 1 # else: # spkwargs_['rfid_state'] = 2 # # spkwargs_["user_id"] = CurrentUser["user_id"] # spkwargs_["user_name"] = CurrentUser["username"] # BllSpecimen().insertSpecimen(**spkwargs_) # data = { # 'specimen_id': specimen_id, # } # # return Utils.true_return(data, desc="新增成功!") # # # @router.post('/check_rfid', summary='RFID绑定') # async def check_rfid(request: Request, rfid: str, specimen_id: str): # if specimen_id: # specimen = BllSpecimen().findEntity(and_(EntitySpecimen.specimen_id == specimen_id, # EntitySpecimen.is_enabled == 0)) # if not specimen: # return Utils.false_return(data='', desc="查不到该物证!") # if specimen.rfid: # if specimen.rfid == rfid: # return Utils.false_return(data='', desc="该物证已绑定该条码,是否需要解绑?") # return Utils.false_return(data='', desc="该物证已绑定其他条码,确定需要更换新的条码吗?") # # specimen = BllSpecimen().findEntity(and_(EntitySpecimen.rfid == rfid, # EntitySpecimen.specimen_id != specimen_id, # EntitySpecimen.is_enabled == 0)) # if specimen: # return Utils.true_return(data='', desc="该条码已经被其他物证绑定,确定要绑定到该物证上吗?") # # else: # specimen = BllSpecimen().findEntity(and_(EntitySpecimen.rfid == rfid, # EntitySpecimen.is_enabled == 0)) # if specimen: # return Utils.false_return(data='', desc="该条码已经被其他物证绑定,确定要绑定到该物证上吗?") # # return Utils.true_return(data='', desc="可正常绑定!") # # # @router.post('/updateSpecimen', summary='编辑证物') # async def updateSpecimen(request: Request, body: SpecimenModel): # SpecimenInsertDict.update(body.items) # spkwargs_ = SpecimenInsertDict # specimen_id = spkwargs_["specimen_id"] # if spkwargs_.get("rfid"): # specimen = BllSpecimen().findEntity(and_(EntitySpecimen.rfid == spkwargs_.get("rfid"), # EntitySpecimen.specimen_id != specimen_id, # EntitySpecimen.is_enabled == 0)) # if specimen: # specimen.rfid_state = 2 # specimen.update_time = Utils.get_time() # specimen.rfid = '' # BllSpecimen().update(specimen) # # specimen = BllSpecimen().findEntity(EntitySpecimen.specimen_id == specimen_id) # if specimen: # specimen.specimen_name = spkwargs_["specimen_name"] # specimen.specimen_type = spkwargs_["specimen_type"] # specimen.specimen_desc = spkwargs_["specimen_desc"] # specimen.specimen_code = spkwargs_["specimen_code"] # specimen.specimen_origin = spkwargs_["specimen_origin"] # specimen.storage_location = spkwargs_["storage_location"] # if spkwargs_.get("rfid"): # specimen.rfid_state = 1 # else: # specimen.rfid_state = 2 # specimen.rfid = spkwargs_["rfid"] # specimen.update_time = Utils.get_time() # BllSpecimen().update(specimen) # return Utils.true_return('', desc="编辑成功!") # return Utils.false_return('', desc="查不到该条数据!") # # # @router.post('/deleteSpecimen', summary='删除证物') # async def deleteSpecimen(request: Request, specimen_id: str): # """ # 删除证物 # :param specimen_id: str # :return: # """ # specimen = BllSpecimen().findEntity(EntitySpecimen.specimen_id == specimen_id) # if specimen: # specimen.is_enabled = 1 # specimen.update_time = Utils.get_time() # BllSpecimen().update(specimen) # return Utils.true_return('', desc="删除成功!") # # # @router.post('/storage', summary='入库') # async def storage(request: Request, body: InStorageModel): # """ # specimen_ids 证物id # storage_type 1 入库,2 入库并消毒 # """ # specimen_ids = body.specimen_ids # storage_type = body.storage_type # specimens = BllSpecimen().findList(EntitySpecimen.specimen_id.in_(specimen_ids)).all() # # if storage_type == 2: # for specimen in specimens: # specimen_id = specimen.specimen_id # wait_disinf_specimen = BllWaitDisinfSpecimen().findEntity( # and_(EntityWaitDisinfSpecimen.specimen_id == specimen_id, # EntityWaitDisinfSpecimen.done == 0, # EntityWaitDisinfSpecimen.is_enabled == 0, # EntityWaitDisinfSpecimen.user_id == CurrentUser["user_id"])) # if not wait_disinf_specimen: # insertdata = { # 'record_id': str(Utils.get_uuid()), # 'specimen_id': specimen_id, # 'specimen_name': specimen.specimen_name, # 'specimen_code': specimen.specimen_code, # 'specimen_type': specimen.specimen_type, # 'specimen_origin': specimen.specimen_origin, # 'specimen_desc': specimen.specimen_desc, # 'storage_location': specimen.storage_location, # 'rfid': specimen.rfid, # 'remark': specimen.remark, # 'user_id': CurrentUser["user_id"], # 'user_name': CurrentUser["username"], # 'done': 0, # 'disinf_type': 1, # } # BllWaitDisinfSpecimen().insert_wait_disinf_specimen(**insertdata) # for specimen in specimens: # kwarge = { # 'specimen_id': specimen.specimen_id, # 'inventory_status': 1 # 在库 # } # BllSpecimen().updateSpecimen(**kwarge) # await create_use_log(specimen.specimen_id, '入库') # return Utils.true_return('', desc="入库成功!") # # # @router.post('/warehouse_out', summary='出库') # async def storage(request: Request, body: InStorageModel): # """ # specimen_ids 证物id # storage_type 1 出库,2 出库并消毒 # """ # specimen_ids = body.specimen_ids # storage_type = body.storage_type # specimens = BllSpecimen().findList(EntitySpecimen.specimen_id.in_(specimen_ids)).all() # if storage_type == 2: # for specimen in specimens: # specimen_id = specimen.specimen_id # wait_disinf_specimen = BllWaitDisinfSpecimen().findEntity( # and_(EntityWaitDisinfSpecimen.specimen_id == specimen_id, # EntityWaitDisinfSpecimen.done == 0, # EntityWaitDisinfSpecimen.is_enabled == 0, # EntityWaitDisinfSpecimen.user_id == CurrentUser["user_id"])) # if not wait_disinf_specimen: # insertdata = { # 'record_id': str(Utils.get_uuid()), # 'specimen_id': specimen_id, # 'specimen_name': specimen.specimen_name, # 'specimen_code': specimen.specimen_code, # 'specimen_type': specimen.specimen_type, # 'specimen_origin': specimen.specimen_origin, # 'specimen_desc': specimen.specimen_desc, # 'storage_location': specimen.storage_location, # 'rfid': specimen.rfid, # 'remark': specimen.remark, # 'user_id': CurrentUser["user_id"], # 'user_name': CurrentUser["username"], # 'done': 0, # 'disinf_type': 2, # } # BllWaitDisinfSpecimen().insert_wait_disinf_specimen(**insertdata) # for specimen in specimens: # specimen_id = specimen.specimen_id # kwarge = { # 'specimen_id': specimen_id, # 'inventory_status': 2 # } # BllSpecimen().updateSpecimen(**kwarge) # await create_use_log(specimen.specimen_id, '出库') # return Utils.true_return('', desc="出库成功!") # # # @router.post('/addIntoWaitDisinf', summary='添加待消毒') # async def addIntoWaitDisinf(request: Request, body: InStorageModel): # ''' # specimen_ids 证物ids # ''' # specimen_ids = body.specimen_ids # specimen_list = BllSpecimen().findEntity(EntitySpecimen.specimen_id.in_(specimen_ids)) # if not specimen_list: # return Utils.false_return('', desc="无此证物!") # for specimen in specimen_list: # specimen_id = specimen.specimen_id # wait_disinf_specimen = BllWaitDisinfSpecimen().findEntity( # and_(EntityWaitDisinfSpecimen.specimen_id == specimen_id, # EntityWaitDisinfSpecimen.done == 0, # EntityWaitDisinfSpecimen.is_enabled == 0, # EntityWaitDisinfSpecimen.user_id == CurrentUser["user_id"])) # if not wait_disinf_specimen: # insertdata = { # 'record_id': str(Utils.get_uuid()), # 'specimen_id': specimen_id, # 'specimen_name': specimen.specimen_name, # 'specimen_code': specimen.specimen_code, # 'specimen_type': specimen.specimen_type, # 'specimen_origin': specimen.specimen_origin, # 'specimen_desc': specimen.specimen_desc, # 'storage_location': specimen.storage_location, # 'rfid': specimen.rfid, # 'remark': specimen.remark, # 'user_id': CurrentUser["user_id"], # 'user_name': CurrentUser["username"], # 'done': 0, # 'disinf_type': 3, # 其他 # } # await BllWaitDisinfSpecimen().insert_wait_disinf_specimen(**insertdata) # await create_use_log(specimen.specimen_id, '加入待消毒清单') # return Utils.true_return('', desc="添加成功!") # # # @router.post('/getWaitDisinfSpecimenList', summary='获取待消毒列表') # async def getWaitDisinfSpecimenList(request: Request, body: WaitSpecimenSearchModel): # queryOrm = BllWaitDisinfSpecimen().get_wait_disinf_specimen_list(body) # pageparam = PageParam(body.page_no, body.page_size) # waitspecimen_list = BllWaitDisinfSpecimen().queryPage(queryOrm, pageparam) # pagination = { # "total_count": pageparam.totalRecords, # "page_no": body.page_no, # "page_size": body.page_size # } # return Utils.true_return_pagination(pagination, waitspecimen_list, desc="获取成功!") # # # @router.post('/insertConfig', summary='新增消毒配置') # async def insertConfig(request: Request, body: SpecimenConfigModel): # ''' # config_id = Column(String(), comment="id",primary_key=True) # ozone_disinf = Column(Integer, comment="臭氧消毒") # uv_disinf = Column(Integer, comment="紫外线消毒") # ozone_conc = Column(String(50), comment="臭氧浓度") # disinf_time = Column(String(50), comment="消毒时间") # temperature = Column(String(50), comment="温度") # humidity = Column(String(50), comment="湿度") # created_time = Column(String(50), default=Utils.get_time(), comment="创建时间") # update_time = Column(String(50), comment="更新时间") # user_id = Column(String(50), comment="创建人") # user_name = Column(String(50), comment="用户昵称") # # config_name = Column(String(50), comment="配置名称") # config_desc = Column(String(500), comment="配置描述") # ''' # ConfigInsertDict.update(body.items) # spkwargs_ = ConfigInsertDict # config_id = str(Utils.get_uuid()) # # spkwargs_["config_id"] = config_id # spkwargs_["user_id"] = CurrentUser["user_id"] # spkwargs_["user_name"] = CurrentUser["username"] # BllExperimentConfig().insertConfig(**spkwargs_) # data = { # 'config_id': config_id, # } # # return Utils.true_return(data, desc="新增成功!") # # # @router.post('/getConfigList', summary='获取消毒配置') # async def getConfigList(request: Request): # body = { # 'user_id': CurrentUser["user_id"] # } # specimen_list = BllExperimentConfig().get_config_list(body).all() # return Utils.true_return(specimen_list, desc="查询成功!") # # # @router.post('/updateConfig', summary='更新消毒配置') # async def updateConfig(request: Request, body: SpecimenConfigModel): # ConfigInsertDict.update(body.items) # spkwargs_ = ConfigInsertDict # config_id = spkwargs_["config_id"] # config = BllExperimentConfig().findEntity(EntityExperimentConfig.config_id == config_id) # if config: # config.ozone_disinf = spkwargs_.get("ozone_disinf") # config.uv_disinf = spkwargs_.get("uv_disinf") # config.ozone_conc = spkwargs_.get("ozone_conc") # config.disinf_time = spkwargs_.get("disinf_time") # config.temperature = spkwargs_.get("temperature") # config.humidity = spkwargs_.get("humidity") # config.config_name = spkwargs_.get("config_name") # config.config_desc = spkwargs_.get("config_desc") # config.update_time = Utils.get_time() # BllExperimentConfig().update(config) # return Utils.true_return('', desc="更新成功!") # return Utils.false_return('', desc="无此配置!") # # # @router.post('/deleteConfig', summary='删除配置') # async def deleteConfig(request: Request, config_id: str): # config = BllExperimentConfig().findEntity(EntityExperimentConfig.config_id == config_id) # if config: # config.is_enabled = 1 # BllExperimentConfig().update(config) # return Utils.true_return('', desc='删除成功') # return Utils.false_return('', desc="无此配置!") # # # @router.post('/insertExperiment', summary='新增消毒流程') # async def insertExperiment(request: Request, body: InsertModel): # ''' # # experiment_id = Column(String(50), comment="消毒流程id", primary_key=True) # # ozone_disinf = Column(Integer, comment="臭氧消毒") # uv_disinf = Column(Integer, comment="紫外线消毒") # ozone_conc = Column(String(50), comment="臭氧浓度") # disinf_time = Column(String(50), comment="消毒时间") # temperature = Column(String(50), comment="浓度") # humidity = Column(String(50), comment="湿度") # # remark = Column(String(500), comment="备注信息") # created_time = Column(String(50), default=Utils.get_time(), comment="创建时间") # update_time = Column(String(50), comment="更新时间") # # user_id = Column(String(50), comment="创建人") # user_name = Column(String(50), comment="用户昵称") # is_enabled = Column(Integer, default=0, comment="是否禁用或删除") # state = Column(Integer, default=0, comment="状态") # # specimens = relationship('EntitySpecimen', secondary=specimen_experiment, back_populates='experiments',comment="证物") # # ''' # InsertDict.update(body.items) # kwargs_ = InsertDict # experiment_id = str(Utils.get_uuid()) # # kwargs_["experiment_id"] = experiment_id # kwargs_["user_id"] = CurrentUser["user_id"] # kwargs_["user_name"] = CurrentUser["username"] # kwargs_["name"] = "证人证物等{}件证物".format(len(kwargs_["specimens"])) # BllExperiment().insertExperiment(**kwargs_) # # data = { # "experiment_id": kwargs_["experiment_id"], # } # # # await startExperiment(request, experiment_id) # # return Utils.true_return(data, desc="新增成功!") # # # @router.get('/startExperiment/{experiment_id}', summary='开始消毒流程') # async def startExperiment(request: Request, experiment_id: str): # experiment_obj = BllExperiment().findEntity( # and_(EntityExperiment.experiment_id == experiment_id, EntityExperiment.is_enabled == 0)) # if not experiment_obj: # return Utils.false_return('', desc="查不到该实验!") # kwargs_ = { # "experiment_id": experiment_id, # } # global START_EXPERIMENT_ID # START_EXPERIMENT_ID = experiment_id # r = ExperimentData(experiment_id) # r.init() # mode = experiment_obj.ozone_disinf + experiment_obj.uv_disinf * 2 # params = { # "temperature": int(experiment_obj.temperature), # "humidity": int(experiment_obj.humidity), # "ozone": int(experiment_obj.ozone_conc), # "time": int(experiment_obj.disinf_time), # "mode": mode # } # # url = f'http://100.64.0.1/api/cabinet/v1/setting' # print('开始消毒设置下位机参数param:::', params) # async with httpx.AsyncClient() as client: # try: # response = await client.post(url, json=params) # except Exception: # return Utils.false_return(code=400, desc='下位机通讯失败') # if not response: # return Utils.false_return('', desc="试验失败,下位机通讯异常!") # print('开始消毒设置下位机响应:::', response.json()) # return Utils.true_return('', desc="成功") # # @router.get('/stop', summary='停止消毒流程') # async def stop(): # url = f'http://100.64.0.1/api/cabinet/v1/stop' # async with httpx.AsyncClient() as client: # try: # response = await client.get(url, timeout=4) # data = response.json() # return Utils.true_return('', desc=data) # except Exception as e: # return Utils.false_return(code=400, desc='下位机通讯失败') # if not response: # return Utils.false_return('', desc="试验失败,下位机通讯异常!") # print('停止消毒设置下位机响应:::', response.json()) # return Utils.true_return('', desc="成功") # # # @router.post('/updateExperiment', summary='更新试验流程,保存试验结果') # async def updateExperiment(request: Request, body: UpdateModel): # """ # Args: # request (Request): _description_ # """ # experiment_obj = BllExperiment().findEntity( # and_(EntityExperiment.experiment_id == body.id, EntityExperiment.is_enabled == 0)) # if not experiment_obj: # return Utils.false_return('', desc="查不到该实验!") # body.items['experiment_id'] = body.id # obj = BllExperiment().updateExperiment(**body.items) # specimen = BllSpecimen().findEntity(EntitySpecimen.specimen_id == experiment_obj.specimen_id) # if obj.test_result == 1: # if not specimen.min_ignition_energy: # ct = { # 'specimen_id': specimen.specimen_id, # 'min_ignition_energy': obj.ignition_energy # } # BllSpecimen().updateSpecimen(**ct) # elif specimen.min_ignition_energy and float(obj.ignition_energy) < specimen.min_ignition_energy: # ct = { # 'specimen_id': specimen.specimen_id, # 'min_ignition_energy': obj.ignition_energy # } # BllSpecimen().updateSpecimen(**ct) # r = ExperimentData(body.id) # r.finish() # return Utils.true_return(obj, desc="成功") # # # @router.post('/stopExperiment', summary='消毒流程结束') # async def stopExperiment(request: Request, body: StopModel): # experiment = BllExperiment().findEntity( # and_(EntityExperiment.experiment_id == body.id, EntityExperiment.is_enabled == 0)) # if not experiment: # return Utils.false_return('', desc="查不到该消毒!") # # # 试验完成 待消毒清单 done = 1 specimens # # 物证 disinfection_state = 1 # # # 结束试验都传0 # params = { # "temperature": 20, # 温度设置(18~30℃) # "humidity": 50, # 湿度设置(40~70%RH) # "ozone": 1, # 臭氧设置(1为开启,0为关闭) # "time": 30, # 消毒时长(单位:分钟) # "mode": 2, # 消毒模式(0:关闭, 1:臭氧, 2:紫外线消毒) # "light_config": 2 # LED模式(0:常关, 1:常开, 2:自动模式) # } # # print('结束消毒设置下位机参数param:::', params) # url = f'http://100.64.0.1/api/cabinet/v1/setting' # async with httpx.AsyncClient() as client: # try: # response = await client.post(url, json=params) # except Exception: # return Utils.false_return(code=400, desc='下位机通讯失败') # if not response: # return Utils.false_return('', desc="试验失败,下位机通讯异常!") # # print('结束消毒设置下位机响应:::', dict(response)) # return Utils.true_return('', desc="消毒结束!") # # # async def create_use_log(specimen_id, operation_type): # specimen = BllSpecimen().findEntity(EntitySpecimen.specimen_id == specimen_id) # kwargs_ = { # "user_id": CurrentUser["user_id"], # "user_name": CurrentUser["username"], # "specimen_id": specimen_id, # "specimen_code": specimen.specimen_code, # "specimen_name": specimen.specimen_name, # "operation_type": operation_type, # } # BllUselog().insert_log(**kwargs_) # # # @router.post('/get_experiment_list', summary='获取所有消毒流程') # async def get_all_experiment(request: Request, body: SearchWordModel): # """ # 获取所有试验 # :body SearchWordModel # :return: # """ # # pageparam = PageParam(body.page_no, body.page_size) # queryOrm = BllExperiment().getExperimentList(body) # experiment_list = BllExperiment().queryPage(queryOrm, pageparam) # for experiment in experiment_list: # experiment.specimen = [i.to_dict() for i in experiment.specimens] # pagination = { # "total_count": pageparam.totalRecords, # "page_no": body.page_no, # "page_size": body.page_size # } # return Utils.true_return_pagination(pagination, experiment_list, desc="获取成功!") # # # @router.get('/experiment_info', summary='获取试验流程详情') # async def experiment_info(request: Request, experiment_id: str): # """ # 获取正式试验详情 # :return: # """ # experiment_obj = BllExperiment().findEntity(EntityExperiment.experiment_id == experiment_id) # base_data = ExperimentData(experiment_id).get_raw_data() # data = { # 'base_data': base_data, # 'experiment_obj': experiment_obj, # } # return Utils.true_return(data=data, desc="获取成功!") # # # @router.get('/get_temperature') # async def get_temperature(): # try: # url = f'http://100.64.0.1/api/cabinet/v1/sensor' # print('下位机获取温度通信:::') # async with httpx.AsyncClient() as client: # response = await client.get(url, timeout=4) # environment_info = response.json() # print('下位机获取温度通信:::', environment_info) # return Utils.true_return(data=environment_info, desc="获取成功!") # except Exception as e: # return Utils.false_return(code=400, desc=f"下位机获取温度通信失败,{str(e)}") # # # @router.get('/get_machine_state') # async def get_machine_state(): # try: # url = f'http://100.64.0.1/api/cabinet/v1/state' # # print('开始获取下位机状态值:::') # async with httpx.AsyncClient() as client: # response = await client.get(url, timeout=4) # status = response.json() # # print('结束获取下位机状态值:::', status) # if status: # url = "http://100.64.0.1/api/cabinet/v1/sensor" # response_sensor = await client.get(url, timeout=4) # status_sensor = response_sensor.json() # temperature = status_sensor.get("temperature") # humidity = status_sensor.get("humidity") # voc = status_sensor.get("voc") # ozone = status_sensor.get("ozone") # # print('结束获取下位机传感器状态值:::', status_sensor) # before_status = StatusDict["status"] # StatusDict["status"] = status.get('state') # StatusDict["led"] = status.get("led") # StatusDict["door"] = status.get("door") # StatusDict["fans"] = status.get("fans") # StatusDict["compressor"] = status.get("compressor") # StatusDict["heating"] = status.get("heating") # StatusDict["ozonizer"] = status.get("ozonizer") # StatusDict["disinfectant"] = status.get("disinfectant") # StatusDict["pump"] = status.get("pump") # StatusDict["work_time"] = status.get("work_time") if status.get("work_time") else 20 # StatusDict["pump"] = status.get("pump") # sensor = status.get("sensor") # StatusDict["temperature_sensor"] = sensor & 0b001 # StatusDict["voc_sensor"] = sensor & 0b010 # StatusDict["ozone_sensor"] = sensor & 0b100 # # StatusDict["temperature"] = temperature # StatusDict["humidity"] = humidity # StatusDict["voc"] = voc # StatusDict["ozone"] = ozone # # print('当前下位机状态值字典::::', dict(StatusDict)) # # # print('当前设置臭氧、温度::::', InsertDict["ozone_conc"], InsertDict["temperature"]) # # # 臭氧超出设置的值5以上返回错误 # if status.get('state') == 2 and InsertDict["ozone_conc"] and voc > float(InsertDict["ozone_conc"]) - 5: # return Utils.false_return(desc="臭氧浓度超出设置上限!") # # # 温度超出设置的值5以上返回错误 # if status.get('state') == 2 and InsertDict["temperature"] and voc > float( # InsertDict["temperature"]) - 5: # return Utils.false_return(desc="温度超出设置上限!") # # 消毒完成时修改待消毒状态 # print('消毒状态::::', status.get('state'), before_status) # if before_status == 2 and status.get('state') == 0: # await open_door(0) # global END_TIME , START_EXPERIMENT_ID # END_TIME = datetime.datetime.now() # 记录试验开始时间 # print('消毒完成:::', START_EXPERIMENT_ID) # wait_obj = BllWaitDisinfSpecimen().findEntity( # EntityWaitDisinfSpecimen.specimen_id == START_EXPERIMENT_ID) # wait_obj.done = 1 # BllWaitDisinfSpecimen().update(wait_obj) # # return Utils.true_return(data=dict(StatusDict), desc="获取成功!") # # # try: # # if ExperimentData.current_obj: # # pre_status = ExperimentData.current_obj.get_last_status()["status"] # 上一次状态 # # if status.status != pre_status: # # pre_status_data = ExperimentData.current_obj.get_last_status() # # current_work_status = await get_current_work_status(ExperimentData.id) # # StatusDict['work_status'] = current_work_status # # if current_work_status != 99: # # status_data = ExperimentData.current_obj.update_status(status) # 试验数据记录 # # now_status = status['status'] # 本次状态 # # curr_result = await update_experiment(ExperimentData.id, pre_status, now_status, pre_status_data) # # # # except Exception as e: # # logger.exception(e) # # except Exception as e: # return Utils.false_return(code=400, desc=f"下位机获取温度通信失败,{str(e)}") # # # @router.get('/open_door', summary='打开门') # async def open_door(open: int = None, confirm: int = 0): # if StatusDict["status"] and StatusDict["status"] == 2: # return Utils.false_return(desc=f"实验中不能开门!") # if open > 0 and confirm == 0 and END_TIME: # # 获取当前时间 # current_time = datetime.datetime.now() # # # 计算时间差 # time_diff = current_time - END_TIME # # 判断时间差是否小于等于 30 分钟 # if time_diff > datetime.timedelta(seconds=0): # remaining_minutes =30 - int(time_diff.total_seconds() // 60) # 剩余的分钟数 # if remaining_minutes <= 30 and remaining_minutes >= 0: # return Utils.false_return(desc=f"结束试验后等待30分钟才能开门,剩余{remaining_minutes}分钟") # # # 定义请求头 # headers = { # "Content-Type": "application/json" # 确保以 JSON 格式发送数据 # } # params = { # "open": open # } # url = 'http://100.64.0.1/api/cabinet/v1/door' # # async with httpx.AsyncClient() as client: # try: # logger.info(f"Request URL: {url}") # logger.info(f"Request Headers: {headers}") # logger.info(f"Request Body: {params}") # # response = await client.post(url, json=params, headers=headers) # # response.raise_for_status() # 如果响应码是 4xx/5xx,会抛出异常 # # # 解析响应 # response_data = response.json() # # # 打印响应数据 # print('响应数据:', response_data) # # if response_data.get("code") == 200: # print("操作成功!门已开/关。") # elif response_data.get("code") == 405: # print("参数错误或操作失败,请检查请求数据。") # else: # print("未知响应:", response_data) # except Exception as e: # logger.error(f"Request failed: {e}") # return {"code": 400, "desc": "下位机通讯失败"} # # if response.status_code != 200: # logger.warning("Communication with the lower machine failed.") # return {"code": 503, "desc": "试验失败,下位机通讯异常!"} # # return {"code": 200, "desc": "门已成功打开"} # # # @router.get('/open_light', summary='打开门') # async def open_light(open: int = None): # # 定义请求头 # headers = { # "Content-Type": "application/json" # 确保以 JSON 格式发送数据 # } # params = { # "light": open # } # url = 'http://100.64.0.1/api/cabinet/v1/led' # # async with httpx.AsyncClient() as client: # try: # logger.info(f"Request URL: {url}") # logger.info(f"Request Headers: {headers}") # logger.info(f"Request Body: {params}") # # response = await client.post(url, json=params, headers=headers) # # response.raise_for_status() # 如果响应码是 4xx/5xx,会抛出异常 # # # 解析响应 # response_data = response.json() # # # 打印响应数据 # print('响应数据:', response_data) # # if response_data.get("code") == 200: # print("操作成功!灯已开/关。") # elif response_data.get("code") == 405: # print("参数错误或操作失败,请检查请求数据。") # else: # print("未知响应:", response_data) # except Exception as e: # logger.error(f"Request failed: {e}") # return {"code": 400, "desc": "下位机通讯失败"} # # if response.status_code != 200: # logger.warning("Communication with the lower machine failed.") # return {"code": 503, "desc": "试验失败,下位机通讯异常!"} # # return {"code": 200, "desc": "灯已成功打开"} # # # @router.get("/shut_down", summary="关闭服务器") # def shut_down(request: Request): # # 提升权限 # # if not ctypes.windll.shell32.IsUserAnAdmin(): # # os.execl(sys.executable, sys.executable, *sys.argv) # response = modbus_server.StopExperiment(**StopDict) # if not response: # return Utils.false_return('', desc="复位失败,下位机通讯异常!") # # 关闭计算机 # os.system("shutdown -s -t 0") # return Utils.true_return("", desc="关机成功") # # # # 获取当前试验状态 # async def get_current_work_status(experiment_id: str): # experiment = BllExperiment().findEntity( # and_(EntityExperiment.experiment_id == experiment_id, EntityExperiment.is_enabled == 0)) # if not experiment: # return Utils.false_return('', desc="实验不存在!") # return experiment.work_status # # # async def update_experiment(experiment_id: str, pre_status: int, now_status: int, pre_status_data): # # if now_status == pre_status: # # return '' # # experiment_obj = BllExperiment().findEntity( # and_(EntityExperiment.experiment_id == experiment_id, EntityExperiment.is_enabled == 0)) # max_pressure = experiment_obj.max_pressure if experiment_obj.max_pressure else 0 # if experiment_obj.max_pressure < float(StatusDict['s_pressure'] or 0): # max_pressure = float(StatusDict['s_pressure'] or 0) # max_temp = experiment_obj.max_temp if experiment_obj.max_temp else 0 # if experiment_obj.max_temp < float(StatusDict['temperature'] or 0): # max_temp = float(StatusDict['temperature'] or 0) # # if now_status == 5 and pre_status != 5: # kk = { # 'experiment_id': experiment_id, # 'max_temp': max_temp, # 'max_pressure': max_pressure, # 'work_status': 99, # } # BllExperiment().updateExperiment(**kk) # else: # kk = { # 'experiment_id': experiment_id, # 'max_temp': max_temp, # 'max_pressure': max_pressure, # 'work_status': now_status, # } # BllExperiment().updateExperiment(**kk) # # specimen = BllSpecimen().findEntity( # and_(EntitySpecimen.specimen_id == experiment_obj.specimen_id, EntitySpecimen.is_enabled == 0)) # max_pressure = specimen.max_pressure if specimen.max_pressure else 0 # if specimen.max_pressure < float(StatusDict['s_pressure'] or 0): # max_pressure = float(StatusDict['s_pressure'] or 0) # max_temp = specimen.max_temp if specimen.max_temp else 0 # if specimen.max_temp < float(StatusDict['temperature'] or 0): # max_temp = float(StatusDict['temperature'] or 0) # kk = { # 'specimen_id': specimen.id, # 'max_temp': max_temp, # 'max_pressure': max_pressure, # } # BllSpecimen().updateSpecimen(**kk) # # # @router.post("/export_table", summary="导出报表按样品") # async def export_table(request: Request, body: ExportTableModel): # """报表导出 # # Args: # request (Request): _description_ # body (ExportTableModel): # specimen_id: 样品id # download_type: 下载类型(usb:导出到u盘),传空为返回到浏览器 # """ # specimen_obj = BllSpecimen().findEntity(EntitySpecimen.specimen_id == body.specimen_id) # # experiments = BllExperiment().findList(EntityExperiment.specimen_id == body.specimen_id).all() # # all_data = [] # file_name = "" # specimen_data = { # '样品名称': specimen_obj.name, # '试验次数': specimen_obj.experiment_times, # '最高温度': specimen_obj.max_temp, # '最高压力': specimen_obj.max_pressure, # '最后试验时间': specimen_obj.update_time, # } # # base_data_title = [ # '实验时间', '实验人', '样品名称', '当前次数', '粉尘重量g', '点火能量', '喷粉压力', '最大温度', '结果' # ] # data = { # "file_name": file_name, # "specimen_data": specimen_data, # "sheet_title": specimen_obj.created_time.replace(":", "-"), # "title_list": ["{}试验报告".format(specimen_obj.name)], # "base_data_title": base_data_title # } # base_data_list = [] # for experiment in experiments: # base_data = { # "experiment_id": experiment.experiment_id, # "样品名称": experiment.name, # "实验人": experiment.user_name, # "实验时间": experiment.created_time, # "当前次数": experiment.experiment_times, # "粉尘重量g": experiment.specimen_quantity, # "点火能量": experiment.ignition_energy, # "喷粉压力": experiment.powdered_coating_pressure, # "最大温度": experiment.max_temp, # "结果": experiment.test_result, # 'other_data': ExperimentData(experiment.experiment_id).get_raw_data() # # } # base_data_list.append(base_data) # data['base_data_list'] = base_data_list # all_data.append(data) # # excel_obj = ExcelExport(file_name, all_data, '') # binary, encoded_filename = excel_obj.experiment_table() # if isinstance(binary, int): # if binary == 404: # return Utils.false_return(desc="U盘未找到") # return Utils.true_return(desc="导出成功") # # # 导出到浏览器 # return Response(content=binary.getvalue(), # media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # headers={'Content-Disposition': f'attachment; filename={encoded_filename}'}) # # # @router.get("/exit", summary="退出服务") # def exit_server(request: Request): # """退出服务""" # for proc in psutil.process_iter(['pid', 'name']): # print(proc.name()) # if proc.name() == "chrome.exe": # try: # proc.kill() # print(f"进程 {proc.pid} 已被终止。") # except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e: # print(f"无法终止进程 {proc.pid}:{e}") # sys.exit(0) # # # @router.get('/userfaceLogin', ) # def userfaceLogin(request: Request, id: str): # try: # user = BllUser().facelogin(id) # end_time = datetime.datetime.strptime('2082-08-21 01:01', '%Y-%m-%d %H:%M') # now_time = datetime.datetime.now() # if now_time > end_time or os.path.exists('gq.txt'): # if not os.path.exists('gq.txt'): # gqFile = open('gq.txt', 'w+', encoding="utf-8") # gqFile.close() # return Utils.false_return('', "程序已过期!", 0) # if user is None: # return Utils.false_return('', "用户名或密码错误!", 0) # else: # return Utils.true_return(user, "登录成功!", 1) # except Exception as e: # print("设备初始化失败,请重启仪器") # return Utils.false_return('', '设备初始化失败,请重启仪器。' + str(e), 0) # # # @router.post('/sendMsg', summary='向下位机发送命令') # async def sendMsg(request: Request, body: SendMesModel): # """ # 向下位机发送命令 # body (SendMesModel): # name: str, ZHIFENG # status: int, # 0001 开始制粉 # 0002 制粉完成 # """ # # 制粉指令 # if body.name == "ZHIFENG": # response = modbus_server.Send_Mes(Addr=ZHIFENG, status=body.status) # else: # return Utils.false_return('', desc="指令错误!") # if response != None: # return Utils.true_return('', desc="发送成功!") # else: # return Utils.false_return('', desc="发送失败!") # # # async def check_last_disinfection_time(): # """ # 放松下位机指令 # """ # specimen_list = BllSpecimen().findList(EntitySpecimen.is_enabled == 0).all() # for specimen in specimen_list: # disinfection_state = 0 # if specimen.last_disinf_time: # today = datetime.datetime.now() # last_disinf_datetime = datetime.datetime.strptime(specimen.last_disinf_time, "%Y-%m-%d %H:%M:%S") # # 计算两个日期之间的差异 # difference = today - last_disinf_datetime # # 获取差异的天数 # days_difference = difference.days # # ''' # 1:三天内已消毒:在3天之内有消毒记录; # 2:一周内已消毒:在7天之内有消毒记录; # 3:一月内已消毒:在30天之内有消毒记录; # 4:三月内已消毒:在90天之内有消毒记录; # 5:半年内已消毒:在180天之内有消毒记录; # 6:一年内已消毒:在365天之内有消毒记录; # 7:超过一年未消毒:最近一次消毒已超过365天 # ''' # if days_difference <= 3: # disinfection_state = 1 # elif days_difference <= 7: # disinfection_state = 2 # elif days_difference <= 30: # disinfection_state = 3 # elif days_difference <= 90: # disinfection_state = 4 # elif days_difference <= 180: # disinfection_state = 5 # elif days_difference <= 365: # disinfection_state = 6 # else: # disinfection_state = 7 # # specimen.disinfection_state = disinfection_state # BllSpecimen().update(specimen) # # return # # # # def start_empty_scheduler(): # # """启动定时任务,每天0点执行一次""" # # scheduler = AsyncIOScheduler(timezone=datetime.timezone('Asia/Shanghai')) # # # scheduler.add_job(empty_drug_job, 'interval', seconds=30) # # scheduler.add_job(check_last_disinfection_time, 'cron', hour=0, minute=0) # # scheduler.start() # # # modbus_server = ModbusConnection() # manager = ConnectionManager() # # vk70_server = VK70xUMCDAQ()