#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @Date:2022/07/18 15:59:52 ''' import json import datetime import random import os import platform import hashlib import uuid import psutil from dateutil import parser, relativedelta from sqlalchemy import and_ from config.SystemConfig import SystemConfig from db_logic.client import BllClient from db_logic.humiture_record import BllHumitureRecord from models.humiture_models import EntityHumitureRecord # 判断当前系统是linux还是windows system_name = platform.system() """通用工具类""" class Utils(object): def __init__(self, *args, **kwargs): return super().__init__(*args, **kwargs) #MD5加密78 def MD5(str): # 创建md5对象 hl = hashlib.md5() hl.update(str.encode(encoding='utf-8')) return hl.hexdigest() def get_str_datetime(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") #获取唯一识别码 def UUID(): return str(uuid.uuid1()) #获取当前设置服务器IP地址 def get_server_ip_address(): ip = '' try: ip = SystemConfig.getConfig('serverip') except: ip = '' return ip #将sqlAlchemy Sql执行数据对象列表转换为实体列表 def mysqlTable2Model(dataList): # mac = uuid.UUID(int=uuid.getnode()).hex[-12:] #return [dict(zip(result.keys(), result.values())) for result in dataList] return [dict(zip(result.keys(), [(x if x is not None else '') for x in result.values()])) for result in dataList] #生成统一格式接口数据 def resultData(status, message, data=""): return json.dumps({"status": status, "message": message, "data": data}, ensure_ascii=False) #按时间生成随机文件名 def getFileName(): nowTime = datetime.datetime.now().strftime("%Y%m%d%H%M%S") # 生成当前的时间 randomNum = random.randint(0, 100) # 生成随机数n,其中0<=n<=100 if randomNum <= 10: randomNum = str(0) + str(randomNum) uniqueNum = str(nowTime) + str(randomNum) return uniqueNum #获取当前插入U盘路径 @staticmethod def getUDiskPath(): if system_name == 'Windows': disk_list = psutil.disk_partitions() # 获取U盘路径 u_path = [ disk.device for disk in disk_list if disk.opts == 'rw,removable'] if u_path: return u_path[0] elif system_name == 'Linux': r = os.popen('ls -a /media/yanyi') text = r.read() r.close() udisklist = text.splitlines() if(len(udisklist) >= 3): return '/media/yanyi/' + udisklist[2] return "" #创建文件夹 def mkdir(path): folder = os.path.exists(path) if not folder: os.makedirs(path) # 生成统一格式接口数据 @staticmethod def resultAlchemyData(data): return json.dumps(data) ''' @currentTem:当前温度值 @setTem:设定温度值 ''' def getStableTemValue(cuerrentTem, setTem): # 差值 diffValue = 0 # 系数 coeValue = 1 diffValue = abs(cuerrentTem - setTem) if(diffValue > 6): coeValue = 1 else: coeValue = 7 - diffValue if(cuerrentTem > setTem): return round(setTem + diffValue/coeValue, 1) else: return round(setTem - diffValue/coeValue, 1) # # 获取最近24小时的值 # def get_24_hours(self): # # 获取当前的小时值 # data_hour_list = [] # curtime_hour = datetime.datetime.now().strftime('%H') # y = int(curtime_hour) # for x in range(24): # data_hour_list.append(str(y) + ':00') # if y == 0: # y = 23 # continue # y -= 1 # return data_hour_list # def get_all_client_data(self): # client_obj_list = BllClient().get_all_client_list() # client_dic_list = [] # for i in client_obj_list: # client_dic_list.append( # { # "client_id": i.client_id, # "client_name": i.client_name # } # ) # return client_dic_list # # 获取温湿度数据 # def get_temperature_or_humidity(self, params=None): # client_id = params.get('client_id') # customer_id = params.get('customer_id') # tp = params.get('tp') # # 获取所有客户端名称及id # client_all_list = self.get_all_client_data() # # 根据条件获取该客户端的信息 # client_obj = BllClient().get_filter_client(client_id, customer_id) # temperature_dict = {} # temperature_count_dict = {} # now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # start_time = ( # parser.parser(datetime.datetime.now().strftime()) + relativedelta.relativedelta(hourse=-23) # ).strftime('%Y-%m-%d %H:%M:%S') # # 获取温湿度的所有对象 # humiture_obj_list = BllHumitureRecord().findList( # and_( # EntityHumitureRecord.client_id == client_obj.client_Id, # EntityHumitureRecord.record_date.between(start_time, now_time) # ) # ).all() # for hum_obj in humiture_obj_list: # # 获取每个对象的record_date的时间值 # hum_hour = hum_obj.record_date.strftime("%H") # # 判断不在字典中默认赋值为0 # if hum_hour not in temperature_dict: # temperature_count_dict[hum_hour] = 0 # temperature_dict[hum_hour] = 0 # # 判断获取温度值,获取不到判定为湿度 # if tp: # temperature_dict[hum_hour] += hum_obj.humidity # else: # temperature_dict[hum_hour] += hum_obj.temperature # # 每次循环+1 # temperature_count_dict[hum_hour] += 1 # # 求平均值 # for ea in temperature_count_dict.keys(): # if ea in temperature_dict: # temperature_dict[ea] = round( # temperature_dict[ea] / temperature_count_dict[ea], 2 # ) # hour_list = self.get_24_hours() # data_temp = [] # # 求当前时间进24小时的温度值和小时值 # for data_hour in hour_list: # if len(data_hour) == 4: # data_hour = '0' + data_hour # if tp == 1: # data_temp.append(random.randint(55, 68)) # elif tp ==2: # data_temp.append(0) # else: # if str(data_hour[:-3]) in temperature_dict: # data_temp.append(temperature_dict[data_hour[:-3]]) # else: # data_temp.append('0.0') # # 返回字段 # series_list = [{ # "name": client_obj.client_name, # "type": "line", # "data": data_temp, # "markPoint":{ # "data": [ # {"type": "max", "name": "最大值"}, # {"type": "min", "name": "最小值"} # ] # }, # "markLine": { # "data": [ # {"type": "average", "name": "平均值"} # ] # } # }] # flag_unit_dic = { # "1": ["最近24小时湿度变化", "%RH"], # "2": ["最近24小时VOC含量变化", "ppm"], # } # flag_unit_dic.get(str(tp), ["最近24小时温度变化", "°C"]) #接口数据返回 def true_return(status=0, msg='请求成功!', data=''): data = { "status": status, "msg": msg, "data": data } return data def false_return(status=1, msg='请求失败!', data=''): data = { "status": status, "msg": msg, "data": data } return data def except_return(status=2, msg='', data=''): data = { "status": status, "msg": msg, "data": data } return data #分页参数模型 class PageParam(object): def __init__(self, curPage, pageRows, totalRecords=None, orderType=None, orderField=None,): self.orderType = orderType # 排序类型 asc=正序 desc=倒序 self.orderField = orderField # 排序字段 self.pageRows = pageRows # 单页行数 self.curPage = curPage # 当前页码 self.totalRecords = totalRecords #获取总页数 #def getTotalPagesCount(self): # if(self.totalRecords > 0): # return self.totalRecords % self.pageRows if self.totalRecords % self.pageRows == 0 else self.totalRecords % self.pageRows + 1 # else: # return 1 #药剂操作类型 class DrugRecordType(object): #入库操作 PutIn = 1 #领用操作 Use = 2 #归还操作 Return = 3 #药剂状态类型 class DrugStatus(object): #在库 Normal = 1 #出库 Out = 2 #空瓶 Empty = 3 # 药剂柜状态类 class CabinetStatus(object): Tem = None # 温度 Hum = None # 湿度 FanOn = None # 风机是否开启 LightOn = None # 照明是否亮 LockIsLock = None # 电锁是否上 DoorIsClose = None # 门是否关上 RunLigthStatus = None # 运行状态是否正常 SignalLightStatus = None # 通信状态是否正常 FilterLifeLightStatus = None # 滤芯寿命是否正常 AbnormalLightStatus = None # 异常灯是否亮