You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

118 lines
4.2 KiB

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/19 13:54:49
'''
import sys
sys.path.append(".")
import random
from sqlalchemy import or_, desc
from Common.Utils import Utils
from db_logic.db_base import Repository
from models.warning_models import EntityWarning
# Business-logic class for warning (alert) records
class BllWarning(Repository):
    """Business-logic layer for warning records stored in ``rms_warning``.

    Relies on the project ``Repository`` base class for ``execute``,
    ``findList`` and ``insert_many``, and on ``Utils`` for paging and
    row-to-dict conversion helpers.
    """

    # Customer id stamped on the rows generated by create_bluk_waring().
    _TEST_CUSTOMER_ID = '1c39cb2a-07f8-11ed-972d-f47b094925e1'

    def __init__(self, entityType=EntityWarning):
        # __init__ must not return a value; simply delegate to the base class.
        super().__init__(entityType)

    @staticmethod
    def _sql_quote(value):
        """Return ``value`` rendered as a single-quoted SQL string literal.

        Backslashes and single quotes are escaped so caller-supplied text
        cannot terminate the literal early. Values that contain neither
        character are rendered exactly as a plain ``'{value}'`` would be.

        NOTE(review): bound parameters would be preferable if
        ``Repository.execute`` supports them -- confirm and migrate.
        """
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    # Get the warning list
    def getWarningList(self, pageParam=None, key_word=None, object_type=None,
                       start_time=None, end_time=None):
        """Query warning rows, newest ``warning_date`` first.

        Args:
            pageParam: paging object; when falsy only the first 5 rows are
                returned. Otherwise its ``totalRecords`` attribute is set to
                the number of matching rows and the query is paged through
                ``Utils.sql_paging_assemble``.
            key_word: optional text matched with ``LIKE '%key_word%'``
                against ``object_name``, ``warning_user_name`` and
                ``warning_content``.
            object_type: optional warning type to match exactly.
            start_time: lower bound (inclusive) for ``warning_date``; only
                applied when ``end_time`` is also given.
            end_time: upper bound (inclusive) for ``warning_date``; only
                applied when ``start_time`` is also given.

        Returns:
            The rows returned by ``fetchall()`` for the assembled query.
        """
        conditions = []
        if key_word:
            pattern = self._sql_quote(f"%{key_word}%")
            conditions.append(
                f"(object_name like {pattern}"
                f" or warning_user_name like {pattern}"
                f" or warning_content like {pattern})"
            )
        if object_type:
            type_literal = str(object_type)
            # Plain ASCII digits are emitted unquoted (same SQL as before);
            # anything else is quoted so it cannot inject SQL.
            is_plain_number = all(ch in "0123456789" for ch in type_literal)
            if not (type_literal and is_plain_number):
                type_literal = self._sql_quote(type_literal)
            conditions.append(f"object_type={type_literal}")
        if start_time and end_time:
            conditions.append(
                f"warning_date >={self._sql_quote(start_time)}"
                f" and warning_date <= {self._sql_quote(end_time)}"
            )

        where_clause = ""
        if conditions:
            where_clause = " where " + " and ".join(conditions)

        sql_all = f"""
            select * from rms_warning {where_clause} ORDER BY warning_date DESC
        """
        if not pageParam:
            # Without paging information only the 5 newest rows are returned.
            sql_all += " limit 5 "
        else:
            # Count in the database instead of fetching every matching row.
            count_sql = f"select count(*) from rms_warning {where_clause}"
            try:
                total_count = self.execute(count_sql).fetchall()[0][0]
            except Exception:
                # Best effort, as before: a failed count reports 0 records.
                total_count = 0
            pageParam.totalRecords = total_count
            sql_all = Utils.sql_paging_assemble(
                sql_all=sql_all, page_param=pageParam)
        return self.execute(sql_all).fetchall()

    # Get the number of warnings per warning type
    def getWarningCount(self):
        """Return ``(total, expired, remaining)`` warning counts.

        ``expired`` counts records whose ``object_type`` is ``'2'`` (expiry
        warnings) and ``remaining`` those whose ``object_type`` is ``'3'``
        (remaining-quantity warnings).

        NOTE(review): the comparison uses string codes while
        create_bluk_waring() passes ints -- confirm the mapped column type.
        """
        records = self.findList().all()
        expired_count = sum(1 for rec in records if rec.object_type == '2')
        remaining_count = sum(1 for rec in records if rec.object_type == '3')
        return (len(records), expired_count, remaining_count)

    def get_waring_type_classify(self):
        """Return per-type warning counts with their share of the total.

        Returns:
            ``(rows, total)`` where ``rows`` is a list of dicts, each holding
            the columns produced by ``Utils.msyql_table_model`` plus a
            ``"ratio"`` entry computed by ``Utils.classify(type_number,
            total)``, and ``total`` is the sum of all per-type counts.
        """
        sql_all = """
            SELECT count(warning_id) type_number, object_type FROM `rms_warning` GROUP BY object_type
        """
        rows = self.execute(sql_all).fetchall()
        # The first selected column is the per-type count.
        all_number = sum(row[0] for row in rows)
        classified = [
            {"ratio": Utils.classify(item["type_number"], all_number), **item}
            for item in Utils.msyql_table_model(rows)
        ]
        return classified, all_number

    def create_bluk_waring(self, count=1000, customer_id=_TEST_CUSTOMER_ID):
        """Bulk-insert randomly generated test warnings.

        Each warning references a randomly chosen row of ``rms_medicament``
        and gets a random ``object_type`` between 1 and 7 (inclusive).

        Args:
            count: number of warnings to generate (default 1000).
            customer_id: customer id stored on every generated warning.

        Returns:
            Whatever ``insert_many`` returns.

        Raises:
            IndexError: if ``rms_medicament`` has no rows and ``count`` is
                positive (``random.choice`` on an empty sequence).
        """
        medicaments = list(self.execute(
            "select medicament_id, name from rms_medicament").fetchall())
        new_warnings = []
        for index in range(count):
            medicament_id, medicament_name = random.choice(medicaments)
            new_warnings.append(
                EntityWarning(
                    customer_id=customer_id,
                    object_type=random.randint(1, 7),
                    object_id=medicament_id,
                    object_name=medicament_name,
                    warning_content=f'测试预警数据{index}',
                    warning_date=Utils.get_str_datetime(),
                )
            )
        return self.insert_many(new_warnings)
if __name__ == '__main__':
    # Manual run: bulk-insert generated test warnings and print whatever
    # the insert call returned.
    insert_result = BllWarning().create_bluk_waring()
    print(insert_result)