You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

384 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# @Date: 2022/07/18 15:59:52
import json
import datetime
import random
import os
import platform
import hashlib
import uuid
import psutil
import math
from decimal import Decimal
from config.SystemConfig import SystemConfig
# Name of the operating system as reported by platform.system(); compared
# against 'Windows' and 'Linux' elsewhere in this module.
system_name = platform.system()
class Utils(object):
    """Collection of stateless helper functions.

    Every helper is a static method and is meant to be called on the class
    itself, e.g. ``Utils.get_str_datetime()``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @staticmethod
    def get_bar_code_reverse(bar_code):
        """Reverse the order of the two-character groups of a long bar code.

        :param bar_code: bar code string
        :return: tuple ``(bar_code, new_code)``; ``new_code`` is the pairwise
            reversed code when ``bar_code`` is longer than 10 characters,
            otherwise the unchanged ``bar_code``
        """
        if len(bar_code) > 10:
            # A trailing odd character is not part of any pair and is dropped.
            pairs = [bar_code[i * 2:(i + 1) * 2]
                     for i in range(len(bar_code) // 2)]
            new_code = ''.join(reversed(pairs))
        else:
            new_code = bar_code
        return bar_code, new_code

    @staticmethod
    def getDrugUnitCode(num):
        """Build a code made of today's date (YYYYMMDD) and ``num + 1``.

        :param num: zero-based sequence number
        :return: date prefix followed by ``num + 1`` left-padded with zeros
            to at least three characters
        """
        date_part = Utils.get_file_name_datetime()[:8]
        return f"{date_part}{str(num + 1).rjust(3, '0')}"

    @staticmethod
    def MD5(str):
        """Return the hexadecimal MD5 digest of ``str``.

        :param str: text (encoded as UTF-8) or bytes to hash
        :return: 32-character lowercase hex digest
        """
        # NOTE(review): the parameter name shadows the builtin ``str``; it is
        # kept for backward compatibility with keyword callers.
        # NOTE(review): UTF-8 is assumed for text input -- confirm.
        payload = str if isinstance(str, bytes) else str.encode('utf-8')
        hl = hashlib.md5()
        hl.update(payload)
        return hl.hexdigest()

    @staticmethod
    def get_str_datetime():
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def get_file_name_datetime():
        """Return the current local time as a compact digit string.

        NOTE(review): the ``%Y%m%d%H%M%S`` format is reconstructed from the
        callers, which slice ``[:8]`` for the date and ``[:4]`` for the
        year -- confirm.
        """
        return datetime.datetime.now().strftime("%Y%m%d%H%M%S")

    @staticmethod
    def sql_paging_assemble(sql_all, page_param):
        """Append a ``limit offset,rows`` clause to ``sql_all``.

        :param sql_all: SQL statement without a limit clause
        :param page_param: object exposing ``curPage`` (1-based page number)
            and ``pageRows`` (rows per page)
        :return: the SQL statement with the paging clause appended
        """
        # Coerce to int so that only numbers can reach the SQL text.
        rows = int(page_param.pageRows)
        offset = (int(page_param.curPage) - 1) * rows
        return sql_all + ' limit ' + str(offset) + ',' + str(rows)

    @staticmethod
    def classify(num, all_num, fl=2):
        """Return ``num / all_num`` as a whole-number percentage string.

        :param num: numerator
        :param all_num: denominator
        :param fl: decimal places kept on the ratio before scaling by 100
        :return: e.g. ``"25%"``; ``"0%"`` when ``all_num`` is zero
        """
        if not all_num:
            # Avoid ZeroDivisionError for an empty total.
            return "0%"
        ratio = round(float(num) / all_num, fl)
        return str(round(float(ratio) * 100)) + "%"

    @staticmethod
    def model_to_dict(cls):
        """Map every column name in ``cls.__table__.columns`` to its value.

        :param cls: object exposing ``__table__.columns`` whose items have a
            ``name`` attribute
        :return: dict of column name -> attribute value (``None`` if unset)
        """
        return {c.name: getattr(cls, c.name, None)
                for c in cls.__table__.columns}

    @staticmethod
    def UUID():
        """Return a new ``uuid.uuid1()`` value as a string."""
        return str(uuid.uuid1())

    @staticmethod
    def get_server_ip_address():
        """Return the ``serverip`` value from SystemConfig.

        :return: the configured value, or ``''`` when reading it fails
        """
        try:
            ip = SystemConfig.getConfig('serverip')
        except Exception:
            # Best effort: fall back to an empty address on any config error.
            ip = ''
        return ip

    @staticmethod
    def mysqlTable2Model(dataList):
        """Convert result rows into dicts, replacing ``None`` values by ``''``.

        :param dataList: iterable of rows exposing ``keys()`` and ``values()``
        :return: list of dicts, one per row
        """
        return [
            dict(zip(row.keys(),
                     ['' if value is None else value
                      for value in row.values()]))
            for row in dataList
        ]

    @staticmethod
    def mysqlfinds_list(data, finds):
        """Pair each row's values with the given field names, in order.

        :param data: iterable of value sequences
        :param finds: field names, in the same order as the row values
        :return: list of dicts, one per row
        """
        return [dict(zip(finds, row)) for row in data]

    @staticmethod
    def msyql_table_model(data_list):
        """Convert rows exposing ``_mapping`` into plain dicts.

        :param data_list: iterable of rows with a ``_mapping`` attribute
        :return: list of dicts, one per row
        """
        return [dict(row._mapping) for row in data_list]

    @staticmethod
    def to_dict(cls):
        """Map every column name in ``cls.__table__.columns`` to its value.

        :param cls: object exposing ``__table__.columns`` whose items have a
            ``name`` attribute
        :return: dict of column name -> attribute value (``None`` if unset)
        """
        return {c.name: getattr(cls, c.name, None)
                for c in cls.__table__.columns}

    @staticmethod
    def resultData(status, message, data=""):
        """Serialize a status/message/data triple to a JSON string.

        Non-ASCII characters are emitted as-is (``ensure_ascii=False``).
        """
        return json.dumps(
            {"status": status, "message": message, "data": data},
            ensure_ascii=False)

    @staticmethod
    def getFileName():
        """Return a name made of the current timestamp plus a random number.

        :return: ``YYYYMMDDHHMMSS`` followed by a random integer in
            ``[0, 100]``, zero-padded to at least two digits
        """
        now_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
        random_num = random.randint(0, 100)
        return f"{now_time}{random_num:02d}"

    @staticmethod
    def getUDiskPath():
        """Return the path of the first detected USB disk.

        :return: device path on Windows, a directory under ``/media/yanyi/``
            on Linux, or ``""`` when nothing is found
        """
        if system_name == 'Windows':
            removable = [
                disk.device for disk in psutil.disk_partitions()
                if disk.opts == 'rw,removable']
            if removable:
                return removable[0]
        elif system_name == 'Linux':
            # The context manager closes the pipe even if read() fails.
            with os.popen('ls -a /media/yanyi') as pipe:
                entries = pipe.read().splitlines()
            # ``ls -a`` lists ``.`` and ``..`` first; index 2 is the first
            # real entry.
            if len(entries) >= 3:
                return '/media/yanyi/' + entries[2]
        return ""

    @staticmethod
    def mkdir(path):
        """Create directory ``path`` (including parents) if it is missing."""
        if not os.path.exists(path):
            # exist_ok guards against another process creating it meanwhile.
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def resultAlchemyData(data):
        """Serialize ``data`` to a JSON string."""
        return json.dumps(data)

    @staticmethod
    def getStableTemValue(cuerrentTem, setTem):
        """Pull a temperature reading towards the set point.

        The absolute difference between the reading and the set point is
        divided by a coefficient: ``1`` when the difference exceeds 6,
        otherwise ``7 - difference``. The scaled difference is then added to
        or subtracted from the set point, depending on which side the
        reading is on.

        :param cuerrentTem: current temperature reading
        :param setTem: target temperature
        :return: adjusted value rounded to one decimal place
        """
        diff_value = abs(cuerrentTem - setTem)
        coe_value = 1 if diff_value > 6 else 7 - diff_value
        if cuerrentTem > setTem:
            return round(setTem + diff_value / coe_value, 1)
        return round(setTem - diff_value / coe_value, 1)

    @staticmethod
    def true_return(status=0, msg='请求成功!', data=''):
        """Build a success response dict with ``status``/``msg``/``data``."""
        return {
            "status": status,
            "msg": msg,
            "data": data,
        }

    @staticmethod
    def false_return(status=1, msg='请求失败!', data=''):
        """Build a failure response dict with ``status``/``msg``/``data``."""
        return {
            "status": status,
            "msg": msg,
            "data": data,
        }

    @staticmethod
    def except_return(status=2, msg='', data=''):
        """Build an exception response dict with ``status``/``msg``/``data``."""
        return {
            "status": status,
            "msg": msg,
            "data": data,
        }

    @staticmethod
    def reserve_decimal(num, n=2):
        """Quantize ``num`` to ``n`` decimal places.

        :param num: floating-point number
        :param n: number of decimal places
        :return: ``Decimal`` with exactly ``n`` decimal places
        """
        deci = "0." + "0" * n
        return Decimal(num).quantize(Decimal(deci))
# Manual smoke check: take the first four characters of the file-name
# timestamp when the module is run as a script.
if __name__ == '__main__':
    a = Utils.get_file_name_datetime()[:4]
class PageParam(object):
    """Paging and ordering parameters for a list query."""

    def __init__(self, curPage, pageRows, totalRecords=None, orderType=None, orderField=None,):
        """Store the paging parameters as attributes.

        :param curPage: current page number
        :param pageRows: number of rows per page
        :param totalRecords: total record count, if known
        :param orderType: sort direction, ``asc`` or ``desc``
        :param orderField: name of the field to sort by
        """
        self.orderType = orderType  # sort direction: asc = ascending, desc = descending
        self.orderField = orderField  # field to sort by
        self.pageRows = pageRows  # rows per page
        self.curPage = curPage  # current page number
        self.totalRecords = totalRecords
class DrugRecordType(object):
    """Integer codes for the kinds of drug record."""

    PutIn = 1
    Use = 2
    Return = 3
    # Registration operation
    Register = 4
class DrugStatus(object):
    """Integer codes for the status of a drug."""

    Normal = 1
    Out = 2
    Empty = 3
# Drug cabinet status class
class CabinetStatus(object):
    """Status fields of a drug cabinet; every field defaults to ``None``."""

    Tem = None  # temperature
    Hum = None  # humidity
    FanOn = None  # whether the fan is on
    LightOn = None  # whether the lighting is on
    LockIsLock = None  # whether the electric lock is locked
    DoorIsClose = None  # whether the door is closed
    RunLigthStatus = None  # whether the running status is normal
    SignalLightStatus = None  # whether the communication status is normal
    FilterLifeLightStatus = None  # whether the filter life is normal
    AbnormalLightStatus = None  # whether the abnormal light is on
class DooropenUser():
    # Class-level user id; defaults to an empty string.
    user_id = ''