You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

322 lines
9.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/18 15:59:52
'''
import json
import datetime
import random
import os
import platform
import hashlib
import uuid
import psutil
from dateutil import parser, relativedelta
from sqlalchemy import and_
from config.SystemConfig import SystemConfig
from db_logic.client import BllClient
from db_logic.humiture_record import BllHumitureRecord
from models.humiture_models import EntityHumitureRecord
# Detect whether the current operating system is Linux or Windows.
# platform.system() yields names such as 'Linux' or 'Windows'.
system_name = platform.system()
# NOTE(review): bare string statement (it reads "general utility class"). It
# sits before the class statement, so it is not attached as Utils.__doc__.
"""通用工具类"""
class Utils(object):
    """General-purpose helper functions grouped under one namespace.

    All helpers are static methods, so they can be called either on the
    class (``Utils.MD5("x")``) or on an instance (``Utils().MD5("x")``).
    """

    def __init__(self, *args, **kwargs):
        """Forward any constructor arguments to the base class initializer."""
        super().__init__(*args, **kwargs)

    @staticmethod
    def MD5(str):
        """Return the hexadecimal MD5 digest of a text string.

        Args:
            str: Text to hash; it is encoded as UTF-8 before hashing. The
                name shadows the builtin but is kept so keyword callers
                (``MD5(str=...)``) keep working.

        Returns:
            The 32-character lowercase hex digest.
        """
        return hashlib.md5(str.encode(encoding='utf-8')).hexdigest()

    @staticmethod
    def get_str_datetime():
        """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def UUID():
        """Return a new unique identifier (``uuid.uuid1()``) as a string."""
        return str(uuid.uuid1())

    @staticmethod
    def get_server_ip_address():
        """Return the value configured under the ``serverip`` key.

        The lookup is best-effort: if reading the configuration raises any
        ordinary exception, an empty string is returned instead.
        """
        try:
            return SystemConfig.getConfig('serverip')
        except Exception:
            # Deliberately swallow lookup failures; callers get '' instead.
            return ''

    @staticmethod
    def mysqlTable2Model(dataList):
        """Convert SQL result rows into a list of plain dicts.

        Args:
            dataList: Iterable of row objects exposing ``keys()`` and
                ``values()``.

        Returns:
            One dict per row, with every ``None`` value replaced by ``''``.
        """
        return [
            {
                key: ('' if value is None else value)
                for key, value in zip(row.keys(), row.values())
            }
            for row in dataList
        ]

    @staticmethod
    def mysqlfinds_list(data, finds):
        """Pair each row's values with the given field names, in order.

        Args:
            data: Iterable of rows, each an iterable of values.
            finds: Field names, in the same order as the row values.

        Returns:
            One dict per row mapping field name to value.
        """
        return [dict(zip(finds, row)) for row in data]

    @staticmethod
    def msyql_table_model(data_list):
        """Return the ``_mapping`` attribute of every row in ``data_list``."""
        return [row._mapping for row in data_list]

    @staticmethod
    def resultData(status, message, data=""):
        """Serialize a uniform API response to a JSON string.

        The payload has the keys ``status``, ``message`` and ``data``;
        non-ASCII characters are emitted as-is (``ensure_ascii=False``).
        """
        payload = {"status": status, "message": message, "data": data}
        return json.dumps(payload, ensure_ascii=False)

    @staticmethod
    def getFileName():
        """Return a time-based file name with a random two-digit suffix.

        Returns:
            A 16-digit string: the current time as ``YYYYmmddHHMMSS``
            followed by a zero-padded random number in ``00``-``99``.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        # Keep the suffix at a fixed width of two digits so every generated
        # name has the same length.
        suffix = random.randint(0, 99)
        return "{}{:02d}".format(timestamp, suffix)

    @staticmethod
    def getUDiskPath():
        """Return the path of a currently attached USB disk, or ``""``.

        On Windows this is the device of the first partition whose options
        equal ``'rw,removable'``. On Linux it is the first entry (sorted by
        name) found under ``/media/yanyi``. On any other system, or when
        nothing is found, an empty string is returned.
        """
        if system_name == 'Windows':
            removable = [
                disk.device
                for disk in psutil.disk_partitions()
                if disk.opts == 'rw,removable'
            ]
            if removable:
                return removable[0]
        elif system_name == 'Linux':
            try:
                # os.listdir never reports '.' and '..', so the first sorted
                # entry is the first real mount directory.
                entries = sorted(os.listdir('/media/yanyi'))
            except OSError:
                # Mount root missing or unreadable: treat as "no disk".
                entries = []
            if entries:
                return '/media/yanyi/' + entries[0]
        return ""

    @staticmethod
    def mkdir(path):
        """Create directory ``path`` (with parents) if nothing exists there."""
        if not os.path.exists(path):
            # exist_ok avoids a failure if the directory appears between the
            # existence check and the creation.
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def resultAlchemyData(data):
        """Serialize ``data`` to a JSON string with default settings."""
        return json.dumps(data)

    @staticmethod
    def getStableTemValue(cuerrentTem, setTem):
        """Return a temperature value pulled towards the set point.

        The distance between the current and the set temperature is divided
        by a coefficient and re-applied on the same side of the set point.
        For distances above 6 the coefficient is 1 (the current value is
        returned, rounded); otherwise it is ``7 - distance``, so smaller
        distances are damped more strongly.

        Args:
            cuerrentTem: Current temperature value.
            setTem: Target (set) temperature value.

        Returns:
            The adjusted temperature rounded to one decimal place.
        """
        diffValue = abs(cuerrentTem - setTem)
        # The coefficient is never below 1, so the division is always safe.
        coeValue = 1 if diffValue > 6 else 7 - diffValue
        offset = diffValue / coeValue
        if cuerrentTem > setTem:
            return round(setTem + offset, 1)
        return round(setTem - offset, 1)

    # NOTE: the disabled (fully commented-out) helpers for the 24-hour
    # temperature/humidity chart that used to follow here were dropped as
    # dead code; recover them from version control if they are needed again.

    @staticmethod
    def true_return(status=0, msg='请求成功!', data=''):
        """Build the response dict for a successful request (status 0)."""
        return {
            "status": status,
            "msg": msg,
            "data": data,
        }

    @staticmethod
    def false_return(status=1, msg='请求失败!', data=''):
        """Build the response dict for a failed request (status 1)."""
        return {
            "status": status,
            "msg": msg,
            "data": data,
        }

    @staticmethod
    def except_return(status=2, msg='', data=''):
        """Build the response dict for an exceptional outcome (status 2)."""
        return {
            "status": status,
            "msg": msg,
            "data": data,
        }
class PageParam(object):
    """Paging and sorting parameters for a list query."""

    def __init__(self, curPage, pageRows, totalRecords=None, orderType=None, orderField=None,):
        """Store the paging parameters.

        Args:
            curPage: Current page number.
            pageRows: Number of rows per page.
            totalRecords: Total number of records, if known.
            orderType: Sort direction; ``asc`` = ascending, ``desc`` = descending.
            orderField: Name of the field to sort by.
        """
        self.orderType = orderType  # sort direction: asc / desc
        self.orderField = orderField  # sort field
        self.pageRows = pageRows  # rows per page
        self.curPage = curPage  # current page number
        self.totalRecords = totalRecords

    def getTotalPagesCount(self):
        """Return the total number of pages, never less than 1.

        Returns 1 when the record count or the page size is unset or not
        positive; otherwise the record count divided by the page size,
        rounded up.
        """
        total = self.totalRecords
        rows = self.pageRows
        if not total or total <= 0 or not rows or rows <= 0:
            return 1
        # Ceiling division: a partially filled last page still counts.
        return (total + rows - 1) // rows
class DrugRecordType(object):
    """Operation types recorded for a reagent (drug)."""

    # Put into storage (stock-in)
    PutIn = 1
    # Taken out for use
    Use = 2
    # Returned after use
    Return = 3
class DrugStatus(object):
    """Status values of a reagent (drug)."""

    # In storage
    Normal = 1
    # Checked out of storage
    Out = 2
    # Empty bottle
    Empty = 3
class CabinetStatus(object):
    """Status fields of a reagent cabinet; every field defaults to None."""

    Tem = None  # temperature
    Hum = None  # humidity
    FanOn = None  # whether the fan is on
    LightOn = None  # whether the lighting is on
    LockIsLock = None  # whether the electric lock is locked
    DoorIsClose = None  # whether the door is closed
    RunLigthStatus = None  # whether the running status is normal
    SignalLightStatus = None  # whether the communication status is normal
    FilterLifeLightStatus = None  # whether the filter life is normal
    AbnormalLightStatus = None  # whether the abnormal-state light is on