import copy
import datetime
import hashlib
import json
import os
import platform
import random
import struct
import uuid
# NOTE(review): `encoding` from idlelib is not referenced as a module-level name
# anywhere in this file (every `encoding` below is a parameter or a keyword
# argument). It looks like an accidental IDE auto-import; kept so that nothing
# that might import it from this module breaks -- confirm and remove.
from idlelib.iomenu import encoding

import matplotlib.pyplot as plt
import numpy as np
import psutil
from scipy.optimize import curve_fit

from app.conf.Setting import settings
from app.lib.AlchemyJsonEncoder import *
from app.lib.Log import logger

system_name = platform.system()


class Utils(object):
    """General-purpose utility helpers.

    Every helper is a ``@staticmethod`` so it can be called either on the
    class (``Utils.get_time()``) or on an instance.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @staticmethod
    def MD5(str):
        """Return the hex MD5 digest of the UTF-8 encoding of ``str``.

        The parameter name shadows the builtin; it is kept for backward
        compatibility with keyword callers.
        """
        return Utils.get_md5(str)

    @staticmethod
    def get_uuid():
        """Return a new ``uuid.uuid1()`` value as a string."""
        return str(uuid.uuid1())

    @staticmethod
    def str_to_time(str):
        """Parse a ``'%Y-%m-%d %H:%M:%S'`` string into a ``datetime``."""
        return datetime.datetime.strptime(str, '%Y-%m-%d %H:%M:%S')

    @staticmethod
    def get_time():
        """Return the current local time formatted as ``%Y-%m-%d %H:%M:%S``."""
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def true_return(data='', desc='请求成功', code=200):
        """Build the response dict for a successful request."""
        return {
            "code": code,
            "desc": desc,
            "data": data
        }

    @staticmethod
    def get_md5(input_str: str, encoding: str = 'utf-8') -> str:
        """Return the MD5 hash of a string.

        Args:
            input_str: Text to hash.
            encoding: Encoding used to turn the text into bytes
                (UTF-8 by default).

        Returns:
            The hexadecimal MD5 digest.
        """
        md5_hash = hashlib.md5()
        md5_hash.update(input_str.encode(encoding))
        return md5_hash.hexdigest()

    @staticmethod
    def get_db_path():
        """Return the path of the database file.

        The parent of the directory containing this file is treated as the
        project root; the result is ``<project root>/db/Data.db``.
        """
        base_dir = os.path.dirname(os.path.abspath(__file__))
        project_root = os.path.dirname(base_dir)
        return os.path.join(project_root, 'db', 'Data.db')

    @staticmethod
    def true_return_pagination(pagination=None, data=None, desc='请求成功', code=200):
        """Build the response dict for a successful paginated request."""
        return {
            "pagination": pagination,
            "code": code,
            "desc": desc,
            "data": data
        }

    @staticmethod
    def false_return(data='', desc='请求失败', code=-1):
        """Build the response dict for a failed request."""
        return {
            "code": code,
            "desc": desc,
            "data": data
        }

    @staticmethod
    def ResultData(status, message, data=""):
        """Serialize a uniform ``status``/``message``/``data`` payload to JSON.

        ``AlchemyEncoder`` is supplied by the star import from
        ``app.lib.AlchemyJsonEncoder``.
        """
        return json.dumps(
            {"status": status, "message": message, "data": data},
            cls=AlchemyEncoder,
        )

    @staticmethod
    def getFileName():
        """Return a file name built from the current timestamp plus a random suffix.

        The suffix is a random integer in ``[0, 100]`` zero-padded to at least
        two digits. (Previously the value 10 was padded to ``"010"``.)
        """
        now_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        random_num = random.randint(0, 100)
        return f"{now_time}{random_num:02d}"

    @staticmethod
    def getUDiskPath():
        """Return the path of a currently attached removable disk, or ``""``.

        On Windows the first partition whose ``opts`` equals
        ``'rw,removable'`` is returned (its device name). On Linux the mount
        point of the first mounted partition belonging to a removable block
        device is returned. Any other platform yields ``""``.
        """
        if system_name == 'Windows':
            u_path = [
                disk.device
                for disk in psutil.disk_partitions()
                if disk.opts == 'rw,removable'
            ]
            if u_path:
                return u_path[0]
        elif system_name == 'Linux':
            # Imported lazily: pyudev is only needed (and available) on Linux.
            import pyudev
            context = pyudev.Context()
            removable = [
                device
                for device in context.list_devices(subsystem='block', DEVTYPE='disk')
                if device.attributes.asstring('removable') == "1"
            ]
            for device in removable:
                partitions = [
                    part.device_node
                    for part in context.list_devices(
                        subsystem='block', DEVTYPE='partition', parent=device
                    )
                ]
                for p in psutil.disk_partitions():
                    if p.device in partitions:
                        return p.mountpoint
        return ""

    @staticmethod
    def mkdir(path):
        """Create ``path`` (including parents) if nothing exists there yet."""
        if not os.path.exists(path):
            # exist_ok guards against another process creating the directory
            # between the check above and this call.
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def int_list_to_bytes(int_list):
        """Pack integers into a byte string, each as a big-endian unsigned 16-bit value.

        Raises:
            struct.error: If a value does not fit in an unsigned 16-bit field.
        """
        return b"".join(struct.pack('>H', num) for num in int_list)

    @staticmethod
    def bytes_to_int_list(byte_stream):
        """Unpack a byte string into a list of big-endian unsigned 16-bit integers.

        Raises:
            struct.error: If the length of ``byte_stream`` is odd.
        """
        count = len(byte_stream) // 2
        return list(struct.unpack(f'>{count}H', byte_stream))

    @staticmethod
    def get_bit(data: int, bit: int) -> int:
        """Return 1 if bit number ``bit`` of ``data`` is set, otherwise 0."""
        return 1 if data & (1 << bit) else 0

    @staticmethod
    def get_bit_bool(data: int, bit: int) -> bool:
        """Return whether bit number ``bit`` of ``data`` is set."""
        return bool(data & (1 << bit))

    @staticmethod
    def set_bit(number: int, pos: int, value: int):
        """Return ``number`` with bit ``pos`` set (``value == 1``) or cleared (``value == 0``).

        Returns ``None`` for any other ``value``.
        """
        new_number = None
        if value == 1:
            new_number = number | (1 << pos)
        elif value == 0:
            new_number = number & ~(1 << pos)
        return new_number

    @staticmethod
    def str_to_list(str_list):
        """Parse a string such as ``"[[1,2],[3,4]]"`` into a list of lists.

        The last character is dropped and surrounding brackets are stripped
        before the text is split on ``'],['``. An empty or falsy input yields
        ``[]``.
        """
        if str_list:
            str_list = str_list[:-1].strip('[]')
            # SECURITY NOTE(review): eval() executes arbitrary expressions. If
            # this string can come from an untrusted source, replace it with
            # float()/ast.literal_eval after confirming the accepted format.
            return [
                list(map(eval, sublist.split(',')))
                for sublist in str_list.split('],[')
            ]
        return []

    @staticmethod
    def calculate_temp_rate(data, num, tag, t=0):
        """Compute temperature change rates and fit a quadratic curve to them.

        Args:
            data: Sequence of rows. Column 0 is used as the time value,
                column 1 for the rate calculation and column ``tag`` as the
                reported temperature.
            num: Offset, in rows, between the two samples being compared.
            tag: Column index of the reported temperature (original comment:
                1 = tank, 2 = furnace).
            t: ``0`` makes each result row ``[temperature, rate]``; any other
                value makes it ``[time, temperature, rate]``.

        Returns:
            ``(result, p_fit, x, y1, y2)`` where ``p_fit`` is the quadratic fit
            over the first two columns of ``result`` and ``x``/``y1``/``y2``
            are the time, temperature and absolute-rate series.
        """
        result = []
        x = []
        y1 = []
        y2 = []
        # The window start advances by a fixed 30 rows regardless of `num`.
        for i in range(0, len(data) - num, 30):
            later = data[i + num]
            earlier = data[i]
            time_interval = later[0] - earlier[0]
            if time_interval == 0:
                continue
            # NOTE(review): the rate is always derived from column 1 even when
            # `tag` selects another column for the temperature -- confirm.
            # Scaled by 3600, presumably seconds -> hours.
            temp_rate = (later[1] - earlier[1]) / time_interval * 3600
            rate = abs(temp_rate)
            if t == 0:
                result.append([later[tag], rate])
            else:
                result.append([later[0], later[tag], rate])
            x.append(later[0])
            y1.append(later[tag])
            y2.append(rate)
        p_fit = Utils.get_quadratic_curve(result)
        return result, p_fit, x, y1, y2

    @staticmethod
    def quadratic_func(x, a, b, c):
        """Quadratic model ``a*x**2 + b*x + c``."""
        return a * x**2 + b * x + c

    @staticmethod
    def derivative_func(x, a, b):
        """Derivative of the quadratic model: ``2*a*x + b``."""
        return 2 * a * x + b

    @staticmethod
    def specific_heat_func(E2, K, C, M2, H):
        """Specific heat (Cp2) formula.

        NOTE(review): by operator precedence this evaluates as
        ``(3600*(E2+K)/C) * M2 - H/M2``. Confirm whether a division by
        ``(C * M2)`` was intended.
        """
        return 3600*(E2+K)/C*M2-H/M2

    @staticmethod
    def heat_generation_rate_func(M2, Cp2, H, D, K):
        """Heat generation rate formula."""
        return ((M2*Cp2+H)*D/3600-K)/M2

    @staticmethod
    def get_quadratic_curve(data):
        """Fit a degree-2 polynomial to the first two columns of ``data``.

        Returns:
            The coefficients from ``np.polyfit``, highest power first.
        """
        x_data = np.array([row[0] for row in data])
        y_data = np.array([row[1] for row in data])
        degree = 2  # quadratic polynomial fit
        return np.polyfit(x_data, y_data, degree)

    @staticmethod
    def heat_capacity_func(E1, A, B, M1, Cp1):
        """Heat capacity formula."""
        return (3600*E1)/(A+B)-(M1*Cp1)

    @staticmethod
    def heat_loss_func(H, A, M1, Cp1):
        """Heat loss formula."""
        return A*(H+M1*Cp1)/3600

    @staticmethod
    def calculate_heat_capacity(data, data1, E1, M1, Cp1):
        """Compute the Dewar flask heat capacity.

        Both data sets are turned into rate curves using column 2 as the
        temperature. The heat capacity is evaluated at every sampled
        temperature of ``data`` and the mean is returned rounded to 2 decimals.
        """
        capacity = 0
        result, p_fit, _, _, _ = Utils.calculate_temp_rate(data, 30, 2)
        _, p_fit1, _, _, _ = Utils.calculate_temp_rate(data1, 30, 2)
        for row in result:
            A = abs(Utils.quadratic_func(row[0], p_fit[0], p_fit[1], p_fit[2]))
            B = abs(Utils.quadratic_func(row[0], p_fit1[0], p_fit1[1], p_fit1[2]))
            H = abs(Utils.heat_capacity_func(E1, A, B, M1, Cp1))
            capacity += H
        return round(capacity/len(result), 2)

    @staticmethod
    def calculate_heat_capacity_loss(data, data1, E1, M1, Cp1):
        """Compute heat capacity and heat loss curves.

        The rising (``data``) and falling (``data1``) data sets may have
        different lengths.

        Returns:
            ``(result, p_fit, result1, p_fit1, capacity, loss, temp_data,
            temp_data1, capacity_value)``. ``temp_data``/``temp_data1`` hold the
            fitted rates of ``data``/``data1`` at each temperature in
            ``settings.TEM`` and ``capacity_value`` is the mean heat capacity
            over those temperatures, rounded to 2 decimals.
        """
        capacity = []
        capacity_sum = 0
        loss = []
        temp_data = []   # fitted rate of `data` at each configured temperature
        temp_data1 = []  # fitted rate of `data1` at each configured temperature
        result, p_fit, _, _, _ = Utils.calculate_temp_rate(data, 30, 1)
        result1, p_fit1, _, _, _ = Utils.calculate_temp_rate(data1, 30, 1)
        for row in result:
            A = abs(Utils.quadratic_func(row[0], p_fit[0], p_fit[1], p_fit[2]))
            B = abs(Utils.quadratic_func(row[0], p_fit1[0], p_fit1[1], p_fit1[2]))
            H = abs(Utils.heat_capacity_func(E1, A, B, M1, Cp1))
            capacity.append([row[0], H])
            loss.append([row[0], Utils.heat_loss_func(H, A, M1, Cp1)])
        for n in settings.TEM:
            A = abs(Utils.quadratic_func(n, p_fit[0], p_fit[1], p_fit[2]))
            B = abs(Utils.quadratic_func(n, p_fit1[0], p_fit1[1], p_fit1[2]))
            temp_data.append(round(A, 4))
            temp_data1.append(round(B, 4))
            H = abs(Utils.heat_capacity_func(E1, A, B, M1, Cp1))
            capacity_sum += H
        capacity_value = round(capacity_sum/len(settings.TEM), 2)
        return result, p_fit, result1, p_fit1, capacity, loss, temp_data, temp_data1, capacity_value

    @staticmethod
    def get_K_func(loss, capacity):
        """Return the quadratic fits of the heat loss (K) and heat capacity (H) curves."""
        p_fit = Utils.get_quadratic_curve(loss)
        p_fit1 = Utils.get_quadratic_curve(capacity)
        return p_fit, p_fit1

    @staticmethod
    def calculate_heat_generation_rate(data, data1, data2, E2, M2, loss, capacity):
        """Compute the specific heat (Cp2) and heat generation rate series.

        Args:
            data: Unused; kept for interface compatibility.
            data1: Data used for the calibration rise-rate curve.
            data2: Self-heating test data; also the rows the result is
                evaluated at.
            E2, M2: Passed through to the Cp2 / heat generation formulas.
            loss: Heat loss points, fitted with a quadratic.
            capacity: Heat capacity points, fitted with a quadratic.

        Returns:
            ``(Cp2_list, Qt_list)``: ``[row[0], Cp2]`` and ``[row[1], Qt]`` for
            every row of ``data2``.
        """
        Cp2_list = []
        Qt_list = []
        _, p_fit2, _, _, _ = Utils.calculate_temp_rate(data1, 30, 1)  # calibration rise-rate curve
        _, p_fit3, _, _, _ = Utils.calculate_temp_rate(data2, 30, 1)  # self-heating slope curve
        p_fit, p_fit1 = Utils.get_K_func(loss, capacity)  # heat loss / heat capacity curves
        # NOTE(review): the curves are evaluated at column 0 of the raw rows,
        # whereas the rate curves above are fitted against column 1 -- confirm.
        for row in data2:
            C = Utils.quadratic_func(row[0], p_fit2[0], p_fit2[1], p_fit2[2])
            K = Utils.quadratic_func(row[0], p_fit[0], p_fit[1], p_fit[2])
            H = Utils.quadratic_func(row[0], p_fit1[0], p_fit1[1], p_fit1[2])
            D = Utils.quadratic_func(row[0], p_fit3[0], p_fit3[1], p_fit3[2])
            Cp2 = Utils.specific_heat_func(E2, K, C, M2, H)
            Qt = Utils.heat_generation_rate_func(M2, Cp2, H, D, K)
            Cp2_list.append([row[0], Cp2])
            Qt_list.append([row[1], Qt])
        return Cp2_list, Qt_list

    @staticmethod
    def format_status_data(status_data):
        """Return a deep copy of ``status_data``.

        Rounding of ``g_temp``, ``l_temp`` and ``last_g_temp`` to 2 decimals
        used to be applied here and is currently disabled.
        """
        return copy.deepcopy(status_data)

    @staticmethod
    def sample_by_interval(arr, interval=30):
        """Return every ``interval``-th element of ``arr``, starting at index 0.

        ``arr`` itself is returned unchanged when it has at most ``interval``
        elements.
        """
        if len(arr) <= interval:
            return arr
        return [arr[i] for i in range(0, len(arr), interval)]


# The original indentation of this file was lost, so it is ambiguous whether
# these two helpers were members of Utils or module-level functions. They are
# exposed under both names so either calling convention keeps working.
format_status_data = Utils.format_status_data
sample_by_interval = Utils.sample_by_interval


class AlgorithmUtils:
    """Heat capacity, heat loss and heat generation rate calculations."""

    @staticmethod
    def heat_loss_func(H, A, M1, Cp1):
        """Heat loss formula."""
        return A*(H+M1*Cp1)/3600

    @staticmethod
    def heat_capacity_func(E1, A, B, M1, Cp1):
        """Heat capacity formula."""
        return (3600*E1)/(A+B)-(M1*Cp1)

    @staticmethod
    def quadratic_func(x, a, b, c):
        """Quadratic model ``a*x**2 + b*x + c``."""
        return a * x**2 + b * x + c

    @staticmethod
    def specific_heat_func(E2, K, C, M2, H):
        """Specific heat (Cp2) formula.

        NOTE(review): by operator precedence this evaluates as
        ``(3600*(E2+K)/C) * M2 - H/M2``. Confirm whether a division by
        ``(C * M2)`` was intended.
        """
        return 3600*(E2+K)/C*M2-H/M2

    @staticmethod
    def heat_generation_rate_func(M2, Cp2, H, D, K):
        """Heat generation rate formula."""
        return ((M2*Cp2+H)*D/3600-K)/M2

    @staticmethod
    def get_quadratic_curve(data):
        """Fit a degree-2 polynomial to the first two columns of ``data``."""
        x_data = np.array([row[0] for row in data])
        y_data = np.array([row[1] for row in data])
        degree = 2  # quadratic polynomial fit
        return np.polyfit(x_data, y_data, degree)

    def capacity_specify_temperature(self, jz_raw_data, rise_p_fit, drop_p_fit, E1, M1, Cp1,
                                     temperatures=(40, 60, 80, 100)):
        """Evaluate rates and heat capacity at specific temperatures.

        Args:
            jz_raw_data: Raw rows forwarded to ``capacity_loss``.
            rise_p_fit: Quadratic coefficients of the heating-rate curve.
            drop_p_fit: Quadratic coefficients of the cooling-rate curve.
            E1, M1, Cp1: Passed through to the heat capacity formula.
            temperatures: Temperatures to evaluate at; defaults to the values
                that were previously hard-coded.

        Returns:
            ``(capacity, loss, rise_temp_values, drop_temp_values,
            capacity_value)`` where ``capacity_value`` is the mean heat
            capacity over ``temperatures``.
        """
        capacity, loss, _, _ = self.capacity_loss(jz_raw_data, rise_p_fit, drop_p_fit, E1, M1, Cp1)
        rise_temp_values = []
        drop_temp_values = []
        capacity_values = []
        for t in temperatures:
            B = self.quadratic_func(t, rise_p_fit[0], rise_p_fit[1], rise_p_fit[2])  # heating rate
            rise_temp_values.append(round(B, 4))
            A = self.quadratic_func(t, drop_p_fit[0], drop_p_fit[1], drop_p_fit[2])  # cooling rate
            drop_temp_values.append(round(A, 4))
            H = self.heat_capacity_func(E1, A, B, M1, Cp1)
            capacity_values.append(H)
        # Dewar flask heat capacity: mean over the evaluated temperatures.
        capacity_value = sum(capacity_values)/len(capacity_values)
        return capacity, loss, rise_temp_values, drop_temp_values, capacity_value

    def calculate_temp_rate(self, data, num=30):
        """Return ``[temperature, abs(rate)]`` pairs from rows ``num`` apart.

        Column 0 of each row is converted with ``int`` and used as the time,
        column 1 is converted with ``float`` and used as the temperature.
        Windows do not overlap (the start index advances by ``num``) and pairs
        with a zero time difference are skipped.
        """
        result = []
        for i in range(0, len(data) - num, num):
            later = data[i + num]
            earlier = data[i]
            time_interval = int(later[0]) - int(earlier[0])
            if time_interval == 0:
                continue
            # Scaled by 3600, presumably seconds -> hours.
            temp_rate = (float(later[1]) - float(earlier[1])) / time_interval * 3600
            result.append([float(later[1]), abs(temp_rate)])
        return result

    def time_temp_rate(self, raw_data, num=60):
        """Return time/temperature/rate series from rows ``num`` apart.

        Returns:
            ``(r, x, y1, y2)``: ``r`` holds ``[time, temperature, abs(rate)]``
            rows and ``x``/``y1``/``y2`` are the same three columns as
            separate lists.
        """
        r = []
        x = []
        y1 = []
        y2 = []
        for i in range(0, len(raw_data) - num, num):
            later = raw_data[i + num]
            earlier = raw_data[i]
            time_interval = int(later[0]) - int(earlier[0])
            if time_interval == 0:
                continue
            temp_rate = (float(later[1]) - float(earlier[1])) / time_interval * 3600
            time_value = int(later[0])
            temp_value = float(later[1])
            rate = abs(temp_rate)
            r.append([time_value, temp_value, rate])
            x.append(time_value)
            y1.append(temp_value)
            y2.append(rate)
        return r, x, y1, y2

    def capacity_loss(self, jz_raw_data, rise_p_fit, drop_p_fit, E1, M1, Cp1):
        """Compute the heat capacity and heat loss curves.

        Returns:
            ``(capacity, loss, capacity_p_fit, loss_p_fit)``: the
            ``[temperature, value]`` points and their quadratic fits.
        """
        capacity = []
        loss = []
        raw_data = self.calculate_temp_rate(jz_raw_data, num=60)
        for item in raw_data:
            B = self.quadratic_func(item[0], rise_p_fit[0], rise_p_fit[1], rise_p_fit[2])  # heating rate
            A = self.quadratic_func(item[0], drop_p_fit[0], drop_p_fit[1], drop_p_fit[2])  # cooling rate TODO
            H = abs(self.heat_capacity_func(E1, A, B, M1, Cp1))  # Dewar flask heat capacity
            K = self.heat_loss_func(H, A, M1, Cp1)  # heat loss
            capacity.append([item[0], H])
            loss.append([item[0], K])
        capacity_p_fit = self.get_quadratic_curve(capacity)
        loss_p_fit = self.get_quadratic_curve(loss)
        return capacity, loss, capacity_p_fit, loss_p_fit

    def calculate_heat_generation_rate(self, zs_data, capacity_p_fit, loss_p_fit, E2, M2):
        """Compute the specific heat (Cp2) and heat generation rate series.

        Args:
            zs_data: Mapping read with ``.get("self_heatingTemp")`` and
                ``.get("riseTemp")``; both values are passed to
                ``calculate_temp_rate``.
            capacity_p_fit: Quadratic coefficients of the heat capacity curve.
            loss_p_fit: Quadratic coefficients of the heat loss curve.
            E2, M2: Passed through to the Cp2 / heat generation formulas.

        Returns:
            ``(Cp2_list, Qt_list)``: ``[temperature, value]`` pairs for every
            self-heating rate sample.
        """
        # Self-heating phase
        self_heatingTemp_data = zs_data.get("self_heatingTemp")
        self_heatingTemp_rate = self.calculate_temp_rate(self_heatingTemp_data, num=2)
        self_heatingTemp_p_fit = self.get_quadratic_curve(self_heatingTemp_rate)
        # Sample heating phase
        riseTemp_data = zs_data.get("riseTemp")
        riseTemp_rate = self.calculate_temp_rate(riseTemp_data, num=60)
        rise_p_fit = self.get_quadratic_curve(riseTemp_rate)

        Cp2_list = []
        Qt_list = []
        for item in self_heatingTemp_rate:
            temp = item[0]
            C = self.quadratic_func(temp, rise_p_fit[0], rise_p_fit[1], rise_p_fit[2])
            K = self.quadratic_func(temp, loss_p_fit[0], loss_p_fit[1], loss_p_fit[2])
            H = self.quadratic_func(temp, capacity_p_fit[0], capacity_p_fit[1], capacity_p_fit[2])
            D = self.quadratic_func(
                temp,
                self_heatingTemp_p_fit[0],
                self_heatingTemp_p_fit[1],
                self_heatingTemp_p_fit[2],
            )
            Cp2 = self.specific_heat_func(E2, K, C, M2, H)
            Qt = self.heat_generation_rate_func(M2, Cp2, H, D, K)
            Cp2_list.append([temp, Cp2])
            Qt_list.append([temp, Qt])
        return Cp2_list, Qt_list


class ChannelStatusError(Exception):
    """Exception that stores its text in ``message``."""

    def __init__(self, message):
        self.message = message
        super().__init__(self.message)