You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

339 lines
11 KiB

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/18 15:59:52
'''
import json
import datetime
import random
import os
import platform
import hashlib
import uuid
import psutil
import math
from config.SystemConfig import SystemConfig
# 判断当前系统是linux还是windows
system_name = platform.system()
"""通用工具类"""
class Utils(object):
def __init__(self, *args, **kwargs):
return super().__init__(*args, **kwargs)
#MD5加密78
def MD5(str):
# 创建md5对象
hl = hashlib.md5()
hl.update(str.encode(encoding='utf-8'))
return hl.hexdigest()
def get_str_datetime():
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def sql_paging_assemble(sql_all, page_param):
sql_all += ' limit ' + str((page_param.curPage - 1) * page_param.pageRows) + ',' + str(page_param.pageRows)
return sql_all
def classify(num, all_num, fl=2):
return str(round(float(round(float(num) / all_num, fl)) * 100)) + "%"
# return str(round(float(round(math.ceil(num) / all_num, fl)) * 100)) + "%"
#获取唯一识别码
def UUID():
return str(uuid.uuid1())
#获取当前设置服务器IP地址
def get_server_ip_address():
ip = ''
try:
ip = SystemConfig.getConfig('serverip')
except:
ip = ''
return ip
#将sqlAlchemy Sql执行数据对象列表转换为实体列表
def mysqlTable2Model(dataList):
return [dict(zip(result.keys(), [(x if x is not None else '') for x in result.values()])) for result in dataList]
# 根据自定义字段及数据顺序进行返回
def mysqlfinds_list(data, finds):
return [dict(zip(finds, i)) for i in data]
# sqlalchemy 原生sql转换实体列表
def msyql_table_model(data_list):
return[dict(i._mapping) for i in data_list]
# @classmethod
def to_dict(cls):
return {c.name: getattr(cls, c.name, None) for c in cls.__table__.columns}
# if not tp:
# return {c.name: getattr(cls, c.name, None) for c in cls.__table__.columns}
# else:
# # data_list = []
# # for i in cls:
# # data_list.append(
# # {c.name: getattr(i, c.name, None)
# # for c in i.__table__.columns}
# # )
# return [{c.name: getattr(i, c.name, None) for c in i.__table__.columns} for i in cls]
#生成统一格式接口数据
def resultData(status, message, data=""):
return json.dumps({"status": status, "message": message, "data": data}, ensure_ascii=False)
#按时间生成随机文件名
def getFileName():
nowTime = datetime.datetime.now().strftime("%Y%m%d%H%M%S") # 生成当前的时间
randomNum = random.randint(0, 100) # 生成随机数n,其中0<=n<=100
if randomNum <= 10:
randomNum = str(0) + str(randomNum)
uniqueNum = str(nowTime) + str(randomNum)
return uniqueNum
#获取当前插入U盘路径
@staticmethod
def getUDiskPath():
if system_name == 'Windows':
disk_list = psutil.disk_partitions()
# 获取U盘路径
u_path = [
disk.device for disk in disk_list if disk.opts == 'rw,removable']
if u_path:
return u_path[0]
elif system_name == 'Linux':
r = os.popen('ls -a /media/yanyi')
text = r.read()
r.close()
udisklist = text.splitlines()
if(len(udisklist) >= 3):
return '/media/yanyi/' + udisklist[2]
return ""
#创建文件夹
def mkdir(path):
folder = os.path.exists(path)
if not folder:
os.makedirs(path)
# 生成统一格式接口数据
@staticmethod
def resultAlchemyData(data):
return json.dumps(data)
'''
@currentTem:当前温度值
@setTem设定温度值
'''
def getStableTemValue(cuerrentTem, setTem):
# 差值
diffValue = 0
# 系数
coeValue = 1
diffValue = abs(cuerrentTem - setTem)
if(diffValue > 6):
coeValue = 1
else:
coeValue = 7 - diffValue
if(cuerrentTem > setTem):
return round(setTem + diffValue/coeValue, 1)
else:
return round(setTem - diffValue/coeValue, 1)
# # 获取最近24小时的值
# def get_24_hours(self):
# # 获取当前的小时值
# data_hour_list = []
# curtime_hour = datetime.datetime.now().strftime('%H')
# y = int(curtime_hour)
# for x in range(24):
# data_hour_list.append(str(y) + ':00')
# if y == 0:
# y = 23
# continue
# y -= 1
# return data_hour_list
# def get_all_client_data(self):
# client_obj_list = BllClient().get_all_client_list()
# client_dic_list = []
# for i in client_obj_list:
# client_dic_list.append(
# {
# "client_id": i.client_id,
# "client_name": i.client_name
# }
# )
# return client_dic_list
# # 获取温湿度数据
# def get_temperature_or_humidity(self, params=None):
# client_id = params.get('client_id')
# customer_id = params.get('customer_id')
# tp = params.get('tp')
# # 获取所有客户端名称及id
# client_all_list = self.get_all_client_data()
# # 根据条件获取该客户端的信息
# client_obj = BllClient().get_filter_client(client_id, customer_id)
# temperature_dict = {}
# temperature_count_dict = {}
# now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# start_time = (
# parser.parser(datetime.datetime.now().strftime()) + relativedelta.relativedelta(hourse=-23)
# ).strftime('%Y-%m-%d %H:%M:%S')
# # 获取温湿度的所有对象
# humiture_obj_list = BllHumitureRecord().findList(
# and_(
# EntityHumitureRecord.client_id == client_obj.client_Id,
# EntityHumitureRecord.record_date.between(start_time, now_time)
# )
# ).all()
# for hum_obj in humiture_obj_list:
# # 获取每个对象的record_date的时间值
# hum_hour = hum_obj.record_date.strftime("%H")
# # 判断不在字典中默认赋值为0
# if hum_hour not in temperature_dict:
# temperature_count_dict[hum_hour] = 0
# temperature_dict[hum_hour] = 0
# # 判断获取温度值,获取不到判定为湿度
# if tp:
# temperature_dict[hum_hour] += hum_obj.humidity
# else:
# temperature_dict[hum_hour] += hum_obj.temperature
# # 每次循环+1
# temperature_count_dict[hum_hour] += 1
# # 求平均值
# for ea in temperature_count_dict.keys():
# if ea in temperature_dict:
# temperature_dict[ea] = round(
# temperature_dict[ea] / temperature_count_dict[ea], 2
# )
# hour_list = self.get_24_hours()
# data_temp = []
# # 求当前时间进24小时的温度值和小时值
# for data_hour in hour_list:
# if len(data_hour) == 4:
# data_hour = '0' + data_hour
# if tp == 1:
# data_temp.append(random.randint(55, 68))
# elif tp ==2:
# data_temp.append(0)
# else:
# if str(data_hour[:-3]) in temperature_dict:
# data_temp.append(temperature_dict[data_hour[:-3]])
# else:
# data_temp.append('0.0')
# # 返回字段
# series_list = [{
# "name": client_obj.client_name,
# "type": "line",
# "data": data_temp,
# "markPoint":{
# "data": [
# {"type": "max", "name": "最大值"},
# {"type": "min", "name": "最小值"}
# ]
# },
# "markLine": {
# "data": [
# {"type": "average", "name": "平均值"}
# ]
# }
# }]
# flag_unit_dic = {
# "1": ["最近24小时湿度变化", "%RH"],
# "2": ["最近24小时VOC含量变化", "ppm"],
# }
# flag_unit_dic.get(str(tp), ["最近24小时温度变化", "°C"])
#接口数据返回
def true_return(status=0, msg='请求成功!', data=''):
data = {
"status": status,
"msg": msg,
"data": data
}
return data
def false_return(status=1, msg='请求失败!', data=''):
data = {
"status": status,
"msg": msg,
"data": data
}
return data
def except_return(status=2, msg='', data=''):
data = {
"status": status,
"msg": msg,
"data": data
}
return data
#分页参数模型
class PageParam(object):
def __init__(self, curPage, pageRows, totalRecords=None, orderType=None, orderField=None,):
self.orderType = orderType # 排序类型 asc=正序 desc=倒序
self.orderField = orderField # 排序字段
self.pageRows = pageRows # 单页行数
self.curPage = curPage # 当前页码
self.totalRecords = totalRecords
#获取总页数
#def getTotalPagesCount(self):
# if(self.totalRecords > 0):
# return self.totalRecords % self.pageRows if self.totalRecords % self.pageRows == 0 else self.totalRecords % self.pageRows + 1
# else:
# return 1
#药剂操作类型
class DrugRecordType(object):
#入库操作
PutIn = 1
#领用操作
Use = 2
#归还操作
Return = 3
#药剂状态类型
class DrugStatus(object):
#在库
Normal = 1
#出库
Out = 2
#空瓶
Empty = 3
# 药剂柜状态类
class CabinetStatus(object):
Tem = None # 温度
Hum = None # 湿度
FanOn = None # 风机是否开启
LightOn = None # 照明是否亮
LockIsLock = None # 电锁是否上
DoorIsClose = None # 门是否关上
RunLigthStatus = None # 运行状态是否正常
SignalLightStatus = None # 通信状态是否正常
FilterLifeLightStatus = None # 滤芯寿命是否正常
AbnormalLightStatus = None # 异常灯是否亮