import asyncio import io import os import datetime import os import sqlite3 import sys import time import psutil from fastapi import APIRouter, Request, Response, BackgroundTasks from sqlalchemy import and_, desc from app.lib import StaticData, usb from app.lib.Camera import CvCamera from app.lib.ModBus import ModbusConnection from app.lib.StaticData import * from app.lib.Utils import Utils from app.lib.websocket_manageer import ConnectionManager from app.models import * from app.models.Business import PageParam, BllUser, BllRole, BllDeviceConfig, BllExperimentSpecimen, \ BllExperimentInfo, BllChannel, BllExperimentReport from app.models.DateEntity import ExperimentSpecimen, ExperimentInfo, Channel, ExperimentReport from app.utils.excel_util import ExcelExport from app.validators.api import * shiyan_time = 0 END_TIME = None START_EXPERIMENT_ID = None router = APIRouter(prefix='/api/shortcircuit', tags=['短路实验']) #视频查询 class VideoModel(BaseModel): id: str @router.post('/add_user', summary='创建用户') async def add_user(request: Request, body: SignUpdateModel): """ 创建用户 :body SignUpdateModel :return: """ user = BllUser().findEntity(and_(EntityUser.user_name == body.username, EntityUser.is_enabled == 0)) if user: return Utils.false_return('', desc="账号已注册!") entity = EntityUser() entity.user_id = str(Utils.get_uuid()) entity.user_pwd = Utils.MD5(body.password) entity.user_name = body.username entity.real_name = body.name entity.role_id = body.role_id entity.user_sex = body.sex entity.update_time = Utils.get_time() BllUser().insert(entity) return Utils.true_return('', desc="创建成功!") @router.post('/login', summary='用户登录') async def login(request: Request, body: SignInModel): """ 用户登录 :body SignInModel :return: """ user = BllUser().login(body.username, body.password) if user: CurrentUser["username"] = user.user_name CurrentUser["user_id"] = user.user_id return Utils.true_return(user, desc="登录成功!") return Utils.false_return('', desc="登录失败,账号与密码不匹配!") @router.post('/user_list', summary='用户列表') async def user_list(request: Request, body: UserModel): """ 用户列表 :return: """ queryOrm = BllUser().getUserList() pageparam = PageParam(body.page_no, body.page_size) user_list = BllUser().queryPage(queryOrm, pageparam) pagination = { "total_count": pageparam.totalRecords, "page_no": body.page_no, "page_size": body.page_size } return Utils.true_return_pagination(pagination, user_list, desc="获取成功!") @router.put('/{user_id}', summary='编辑用户') async def update_user(user_id: str, body: SignUpdateModel): """ 编辑用户 :param user_id: str :body SignUpdateModel :return: """ user = BllUser().findEntity(and_(EntityUser.user_id == user_id, EntityUser.is_enabled == 0)) if user: user.user_pwd = Utils.MD5(body.password) user.user_name = body.username user.real_name = body.name user.role_id = body.role_id user.user_sex = body.sex user.update_time = Utils.get_time() BllUser().update(user) return Utils.true_return(user, desc="编辑成功!") return Utils.false_return('', desc="查不到该账号!") @router.delete('/delete_user/{user_id}', summary='删除用户') async def del_user(user_id: str): """ 删除用户 :param user_id: str :return: """ user = BllUser().findEntity(and_(EntityUser.user_id == user_id, EntityUser.is_enabled == 0)) if user: user.is_enabled = 1 user.update_time = Utils.get_time() BllUser().update(user) return Utils.true_return('', desc="删除成功!") return Utils.false_return('', desc="查不到该账号!") @router.get('/get_role_list', summary='获取角色列表') async def get_role_list(request: Request): """ 获取角色列表 :return: """ role_list = BllRole().findList(EntityRole.is_enabled == 0).all() data_list = [{"role_name": i.role_name, "role_id": i.role_id} for i in role_list] return Utils.true_return(data_list, desc="查询成功!") @router.get("/shut_down", summary="关闭服务器") def shut_down(request: Request): # 提升权限 # if not ctypes.windll.shell32.IsUserAnAdmin(): # os.execl(sys.executable, sys.executable, *sys.argv) kwargs_ = { "channel_list": ['全部'], } response = modbus_server.StopExperiment(kwargs_) camera.release() if not response: return Utils.false_return('', desc="复位失败,下位机通讯异常!") # 关闭计算机 os.system("shutdown -s -t 0") return Utils.true_return("", desc="关机成功") @router.get("/exit", summary="退出服务") def exit_server(request: Request): """退出服务""" for proc in psutil.process_iter(['pid', 'name']): print(proc.name()) if proc.name() == "chrome.exe": try: proc.kill() print(f"进程 {proc.pid} 已被终止。") except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e: print(f"无法终止进程 {proc.pid}:{e}") sys.exit(0) @router.get('/userfaceLogin', ) def userfaceLogin(request: Request, id: str): try: user = BllUser().facelogin(id) end_time = datetime.datetime.strptime('2082-08-21 01:01', '%Y-%m-%d %H:%M') now_time = datetime.datetime.now() if now_time > end_time or os.path.exists('gq.txt'): if not os.path.exists('gq.txt'): gqFile = open('gq.txt', 'w+', encoding="utf-8") gqFile.close() return Utils.false_return('', "程序已过期!", 0) if user is None: return Utils.false_return('', "用户名或密码错误!", 0) else: return Utils.true_return(user, "登录成功!", 1) except Exception as e: print("设备初始化失败,请重启仪器") return Utils.false_return('', '设备初始化失败,请重启仪器。' + str(e), 0) @router.get('/getConfig' ,summary="获取设备配置") async def getConfig(request: Request ): config_info = dict(StaticData.DeviceConfig) config_info.update(StaticData.DeviceStatus) return config_info @router.get('/getChannelList' ,summary="获取通道列表") async def getChannelList(request: Request ,status :int = 0): return [item for item in StaticData.ChannelDatas if item.status == status] @router.post('/updateConfig' ,summary="更新设备配置") async def updateConfig(request: Request , body :DeviceConfigModel): StaticData.DeviceConfig.update(body) device_config_list = BllDeviceConfig().findAllList() device_config = device_config_list[0] # 假设 device_config 是从数据库查询出来的模型实例 for key, value in body.__dict__.items(): if hasattr(device_config, key): # 确保属性存在 setattr(device_config, key, value) BllDeviceConfig().update(device_config) return StaticData.DeviceConfig @router.get('/getChannelInfo' ,summary="获取通道详情") async def getChannelInfo(request: Request ,channel_id :str): chnanels = [item for item in StaticData.ChannelDatas if item.id == channel_id] experiment_ids = [item.experiment_id for item in StaticData.ChannelDatas if item.id == channel_id and hasattr(item, 'experiment_id')] report_list = [] if len(experiment_ids) > 0: report_list = BllExperimentReport().findList(and_(ExperimentReport.channel_id == channel_id, ExperimentReport.experiment_id == experiment_ids[0])).all() result = { "base_info": chnanels[0], "report_list": report_list } return result @router.get('/getStatus', summary="获取下位机状态") async def getStatus(request: Request, background_tasks: BackgroundTasks): """ 该路由会返回一个立即的响应,但不会阻塞周期性任务的执行。 """ background_tasks.add_task(run_periodic_task) # 启动后台任务 return {"message": "已启动周期性任务,每30秒调用一次下位机状态获取。"} @router.get('/experimentOver' ,summary="停止试验") async def experimentOver(request: Request ,experiment_id :str, cause :str = ''): path = Utils.get_db_path() print(path) # 获取数据库连接(假设你使用 sqlite) conn = sqlite3.connect(path) # 数据库路径 cursor = conn.cursor() try: # 开始事务 conn.execute("BEGIN TRANSACTION") # 更新实验信息 状态 0:未开始 1:进行中 2:已完成 3:已取消 curr_time = Utils.get_time() experimentInfo = BllExperimentInfo().findEntity(ExperimentInfo.id == experiment_id) if experimentInfo.state != 1: return Utils.false_return('', '试验未开始!') channel_list = [item.name for item in StaticData.ChannelDatas if hasattr(item, 'experiment_id') and item.experiment_id == experiment_id] kwargs_ = { "channel_list": channel_list, } modbus_server.StopExperiment(kwargs_) experimentInfo.state = 3 experimentInfo.end_time = curr_time experimentInfo.desc = f'手动停止试验,原因:{cause}' BllExperimentInfo().update(experimentInfo) # 更新通道状态为空闲 channel_ids = experimentInfo.channel_id.split(',') channel_experiment_dict = {experimentInfo.channel_id: None} await update_channel_status(channel_ids, curr_time,0,channel_experiment_dict,None) # 提交事务 conn.commit() except ChannelStatusError as e: # 捕获自定义异常并返回给前端 conn.rollback() # 回滚事务 print(f'发生异常:::::::{e}') return Utils.false_return('', str(e)) except Exception as e: # 捕获非自定义异常(如数据库错误等) conn.rollback() # 回滚事务 print(f'发生异常:::::::{e}') # 返回一个通用错误信息给前端 return Utils.false_return('', f"发生了系统错误: {str(e)}") finally: # 确保关闭数据库连接 conn.close() return Utils.true_return('', '试验已停止!') # @router.post('/startExperiment' ,summary="开始实验") # async def startExperiment(request: Request , body :ExperimentParamModel): # try: # curr_time = Utils.get_time() # # 调用检查通道状态的方法 # check_channel_status(body.channel_ids, StaticData.ChannelDatas) # # md5值 样品名称+电池型号+样品编号+电池规格+电池厂家+备注 # await check_specimen_exist(body) # # 保存实验信息 # await save_experiment_info(body) # # # 更新通道状态为正在实验 # channel_ids = body.channel_ids.split(',') # await update_channel_status(channel_ids, curr_time,1) # # # # except ChannelStatusError as e: # # 捕获异常并返回给前端 # return Utils.false_return('', str(e)) # # # return None @router.post('/startExperiment', summary="开始实验") async def startExperiment(request: Request, body: ExperimentParamModel): path = Utils.get_db_path() print(path) # 获取数据库连接(假设你使用 sqlite) conn = sqlite3.connect(path) # 数据库路径 cursor = conn.cursor() try: # 开始事务 conn.execute("BEGIN TRANSACTION") curr_time = Utils.get_time() timestamp = time.time() # 调用检查通道状态的方法 check_channel_status(body.channel_ids, StaticData.ChannelDatas) # 下位机发送指令 channel_ids = body.channel_ids.split(',') channel_list = [item.name for item in StaticData.ChannelDatas if channel_ids.__contains__(item.id)] kwargs_ = { "channel_list": channel_list, "temperature": body.temperature, "work_time": body.expose_time, } modbus_server.Experiment(kwargs_) # md5值 样品名称+电池型号+样品编号+电池规格+电池厂家+备注 await check_specimen_exist(body) # 保存实验信息 await save_experiment_info(body) # 更新通道状态为正在实验 channel_ids = body.channel_ids.split(',') await update_channel_status(channel_ids, curr_time,1,body.channel_experiment_dict,timestamp) # 提交事务 conn.commit() except ChannelStatusError as e: # 捕获自定义异常并返回给前端 conn.rollback() # 回滚事务 print(f'发生异常:::::::{e}') return Utils.false_return('', str(e)) except Exception as e: # 捕获非自定义异常(如数据库错误等) conn.rollback() # 回滚事务 print(f'发生异常:::::::{e}') # 返回一个通用错误信息给前端 return Utils.false_return('', f"发生了系统错误: {str(e)}") finally: # 确保关闭数据库连接 conn.close() return Utils.true_return('', '实验已开始!') @router.get('/getAllSpecimen' ,summary="获取所有样本") async def getAllSpecimen(request: Request ): experiment_specimen_list = BllExperimentSpecimen().findAllList() return experiment_specimen_list @router.post('/pageExperimentInfo' ,summary="分页获取试验信息") async def getAllSpecimen(request: Request ,body: SearchWordModel): pageparam = PageParam(body.page_no, body.page_size) queryOrm = BllExperimentInfo().getExperimentList(body) experiment_list = BllExperimentInfo().queryPage(queryOrm, pageparam) pagination = { "total_count": pageparam.totalRecords, "page_no": body.page_no, "page_size": body.page_size } return Utils.true_return_pagination(pagination, experiment_list, desc="获取成功!") @router.get('/getExperimentInfo' ,summary="获取实验详情") async def getExperimentInfo(request: Request ,experiment_id :str): experiment_info = BllExperimentInfo().findEntity(ExperimentInfo.id == experiment_id) report_list = BllExperimentReport().findList((ExperimentReport.experiment_id == experiment_id)).all() result = { "base_info": experiment_info, "report_list": report_list } return result @router.get('/mergeGate' ,summary="合闸") async def mergeGate(request: Request ,channel_id :str): # experiment_specimen_list = BllExperimentSpecimen.findAllList() return None # 保存报表数据 async def save_report(): curr_time = time.time() for item in StaticData.ChannelDatas: if not hasattr(item, 'timestamp') or not item.timestamp: continue time_ratio = round(float((curr_time - item.timestamp)/60), 1) if (item.status == 1 or item.status == 2) and item.id not in StaticData.StopIds: # 如果通道状态为正在实验或已实验,且通道id不在停止列表中 if item.status == 2 : StaticData.StopIds.append(item.id) new_id = str(Utils.get_uuid()) experiment_report = ExperimentReport(id= new_id, experiment_id=item.experiment_id, channel_id=item.id, time=time_ratio, experiment_temperature=item.temperature, battery_voltage=item.voltage, battery_electricity=item.electricity ) BllExperimentReport().insert(experiment_report) return None @router.get('/updateDoor' ,summary="修改门状态") async def updateDoor(request: Request ,status :int): if StaticData.DeviceConfig['door_status'] == status: return Utils.false_return('', "门状态未改变", 0) StaticData.DeviceConfig['door_status'] = status device_config_list = BllDeviceConfig().findAllList() device_config = device_config_list[0] device_config.door_status = status BllDeviceConfig().update(device_config) return Utils.true_return('', "修改成功") # 获取指定日期的文件列表 @router.get("/files") async def get_files_by_date(): current_date = datetime.datetime.now() date = current_date.strftime("%Y-%m-%d") pid_dir = f"/data/video" if not os.path.exists(pid_dir): return Utils.false_return(code=400, desc=f'没有找到{pid_dir}的目录') files = os.listdir(pid_dir) result = [] for file in files: if file.startswith(date.replace('-', '')) and file.endswith(".mp4"): result.append(file) result.sort(reverse=True) return Utils.true_return(data=result) #实时相机显示 @router.get('/camera_img/') def camera_img(): return Response(camera.generate(), mimetype = "multipart/x-mixed-replace; boundary=frame") @router.post("/view_video", summary="查看视频") async def view_video(request: Request, body: VideoModel): """查看视频""" record_id = body.id record_obj = BllExperimentInfo().findEntity(and_(BllExperimentInfo.id == body.id)) if not record_obj: return Utils.false_return('', desc="查不到该实验!") video_path = 'http://127.0.0.1:8088/videoFold' + '/' + record_id + '.mp4' return Utils.true_return(video_path, desc="查看成功") @router.post("/export_video", summary="导出视频") async def export_video(request: Request, body: VideoModel): """ 导出视频 id 使用 试验id """ if not body.id: return Utils.false_return("", desc="未选择导出文件") try: # 检查U盘挂载路径 upath = Utils.getUDiskPath() if not upath: return Utils.false_return("", desc="U盘未正确挂载") source_file = 'D:/Project/UI/videoFold' + '/' + body.id + '.mp4' # 检查源文件是否存在 if not os.path.isfile(source_file): return Utils.false_return("", desc="指定的视频文件不存在") # 复制视频文件到U盘 with open(source_file, mode="rb") as file: file_content = file.read() buffer = io.BytesIO(file_content) usb.put_in(body.id + '.mp4', buffer) return Utils.true_return("", desc="导出成功") except Exception as e: return Utils.false_return("", desc="导出失败") @router.post("/export_table", summary="导出报表") async def export_table(request: Request, body: ExportTableModel): """报表导出 Args: request (Request): _description_ body (ExportTableModel): ids: 实验id列表 download_type: 下载类型(usb:导出到u盘),传空为返回到浏览器 """ experiment_ids = body.specimen_id.split(',') experiments = BllExperimentInfo().findList(ExperimentInfo.id.in_(experiment_ids)).all() all_data = [] file_name = "" base_data_title = [ '实验时间', '实验人', '样品名称', '设置温度', '暴露时间', '实验结果' ] data = { "specimen_data": {}, "file_name": file_name, "sheet_title": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S").replace(":", "-"), "title_list": ["BAT05-A电池短路试验报告"], "base_data_title": base_data_title } base_data_list = [] other_data = [] if len(experiment_ids) == 1: experiment_info = BllExperimentInfo().findEntity(ExperimentInfo.id == experiment_ids[0]) other_data = BllExperimentReport().findList(and_(ExperimentReport.experiment_id == experiment_ids[0],ExperimentReport.channel_id == experiment_info.channel_id)).all() for experiment in experiments: base_data = { "experiment_id": experiment.id, "样品名称": experiment.specimen_name, "实验人": experiment.user_name, "实验时间": experiment.created_time, "设置温度": experiment.temperature, "暴露时间": experiment.expose_time, "实验结果": '成功' if experiment.state == 2 else '失败', 'other_data': other_data } base_data_list.append(base_data) data['base_data_list'] = base_data_list all_data.append(data) excel_obj = ExcelExport(file_name, all_data, body.download_type) binary, encoded_filename = excel_obj.experiment_table() if isinstance(binary, int): if binary != 404: return Response(content=binary.getvalue(), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={'Content-Disposition': f'attachment; filename={encoded_filename}'}) return Utils.false_return(desc=encoded_filename) # 导出到浏览器 return Response(content=binary.getvalue(), media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={'Content-Disposition': f'attachment; filename={encoded_filename}'}) async def app_start(): # 项目启动执行 初始化项目配置 device_config_list = BllDeviceConfig().findAllList() StaticData.DeviceConfig.update(device_config_list[0].__dict__) # 初始化通道信息 channel_list = BllChannel().findAllList() StaticData.ChannelDatas = channel_list # 调用下位机 asyncio.create_task(run_periodic_task()) async def run_periodic_task(): while True: print("开始执行任务-------------") modbus_server.get_status() # 调用获取状态的方法 await save_report() await asyncio.sleep(30) # 每隔30秒执行一次 modbus_server = ModbusConnection() manager = ConnectionManager() camera =CvCamera() camera.release() # vk70_server = VK70xUMCDAQ() async def update_channel_status(channel_ids, curr_time,status,channel_experiment_dict,timestamp): channel_list = BllChannel().findList(Channel.id.in_(channel_ids)).all() channel_dict = {item.id : item for item in StaticData.ChannelDatas } for channel in channel_list: # 更新通道状态为正在实验 channel.status = status channel.last_start_time = curr_time BllChannel().update(channel) channel_dict.get(channel.id).status = status channel_dict.get(channel.id).last_start_time = curr_time channel_dict.get(channel.id).experiment_id = channel_experiment_dict.get(channel.id) channel_dict.get(channel.id).timestamp = timestamp if channel.id in StaticData.StopIds: StaticData.StopIds.remove(channel.id) async def save_experiment_info(body): channel_ids = body.channel_ids.split(',') for channel_id in channel_ids: new_id = str(Utils.get_uuid()) experiment_info = ExperimentInfo(id= new_id, specimen_id=body.experiment_specimen_id, channel_id=channel_id, specimen_name=body.specimen_name, battery_type=body.battery_type, specimen_num=body.specimen_num, battery_specification=body.battery_specification, battery_manufacturers=body.battery_manufacturers, temperature=body.temperature, expose_time=body.expose_time, put_time=body.put_time, short_circuit_param=body.short_circuit_param, user_id=CurrentUser['user_id'], user_name=CurrentUser['username'], state=1) # 保存实验信息 BllExperimentInfo().insert(experiment_info) body.channel_experiment_dict.update({channel_id: new_id}) camera.start(record_id=new_id) class ChannelStatusError(Exception): def __init__(self, message): self.message = message super().__init__(self.message) def check_channel_status(channel_ids, channel_datas): # 将channel_ids字符串转换为列表 channel_ids = channel_ids.split(',') # 获取 status == 0 的 id 集合 wait_ids = [item.id for item in channel_datas if item.status == 0] # 判断选择的通道是否为空 if len(channel_ids) <= 0: raise ChannelStatusError("请选择通道") # 判断是否所有通道都在实验中 if len(wait_ids) == 0: raise ChannelStatusError("所有通道都在实验中,请等待") # 判断每个通道是否处于空闲状态 for channel_id in channel_ids: if channel_id not in wait_ids: raise ChannelStatusError(f"通道 {channel_id} 不在空闲状态,请重新选择") # 如果都通过了检查,返回 True 或其他成功状态 return True async def check_specimen_exist(body): # 拼接字符串生成 MD5 source_str = body.specimen_name + body.battery_type + body.specimen_num + body.battery_specification + body.battery_manufacturers md5_upper = Utils.get_md5(source_str).upper() # 查询数据库中的样本 experiment_specimen = BllExperimentSpecimen().findEntity(ExperimentSpecimen.md5 == md5_upper) # 如果样本不存在,则创建样本 if experiment_specimen : # 将新创建的样本 ID 赋值给 body body.experiment_specimen_id = experiment_specimen.id else: new_id = Utils.get_uuid() body.experiment_specimen_id = new_id experiment_specimen = ExperimentSpecimen( id=str(new_id), md5=md5_upper, specimen_name=body.specimen_name, battery_type=body.battery_type, specimen_num=body.specimen_num, battery_specification=body.battery_specification, battery_manufacturers=body.battery_manufacturers, user_id=CurrentUser['user_id'], user_name=CurrentUser['username'] ) # 异步插入新样本 BllExperimentSpecimen().insert(experiment_specimen)