#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/19 13:54:49
'''
import sys
sys.path.append(".")

import random

from sqlalchemy import or_, desc

from Common.Utils import Utils
from db_logic.db_base import Repository
from models.warning_models import EntityWarning


def _sql_literal(value):
    """Render *value* as a single-quoted SQL string literal.

    Single quotes are doubled and backslashes are doubled so that the text
    cannot terminate the literal early. Without this, a search keyword that
    contains an apostrophe breaks the statement, and crafted input can inject
    SQL.

    NOTE(review): doubling backslashes assumes MySQL's default
    backslash-escape mode (this module uses MySQL-style backtick quoting);
    confirm against the deployed database. Prefer bound parameters if
    ``Repository.execute`` supports them.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def _sql_number_or_literal(value):
    """Render *value* bare when it is all ASCII digits, else as a quoted literal.

    Keeps the SQL identical to plain interpolation for ordinary numeric type
    codes (``2`` or ``"2"``) while preventing any other text from being
    spliced into the statement unquoted.
    """
    text = str(value).strip()
    if text and all(ch in "0123456789" for ch in text):
        return text
    return _sql_literal(value)


# Business-logic class for warning records
class BllWarning(Repository):
    """Query helpers for the ``rms_warning`` table."""

    def __init__(self, entityType=EntityWarning):
        # __init__ has no meaningful return value, so the parent call is
        # simply invoked rather than returned.
        super().__init__(entityType)

    # Get the warning list
    def getWarningList(self, pageParam=None, key_word=None, object_type=None,
                       start_time=None, end_time=None):
        """Return warning rows, newest first, optionally filtered and paged.

        Args:
            pageParam: Optional paging object. When given, its
                ``totalRecords`` attribute is set to the number of matching
                rows and the query is paged through
                ``Utils.sql_paging_assemble``. When omitted, only the first
                5 rows are returned.
            key_word: Optional text matched with ``LIKE '%key_word%'``
                against ``object_name``, ``warning_user_name`` and
                ``warning_content``.
            object_type: Optional value compared for equality with the
                ``object_type`` column.
            start_time: Lower bound (inclusive) for ``warning_date``. Only
                applied when ``end_time`` is also given.
            end_time: Upper bound (inclusive) for ``warning_date``. Only
                applied when ``start_time`` is also given.

        Returns:
            The result of ``fetchall()`` on the executed query.
        """
        conditions = []
        if key_word:
            pattern = _sql_literal(f"%{key_word}%")
            conditions.append(
                f"(object_name like {pattern}"
                f" or warning_user_name like {pattern}"
                f" or warning_content like {pattern})"
            )
        if object_type:
            conditions.append(
                f"object_type={_sql_number_or_literal(object_type)}"
            )
        if start_time and end_time:
            conditions.append(
                f"warning_date >= {_sql_literal(start_time)}"
                f" and warning_date <= {_sql_literal(end_time)}"
            )

        where_clause = ""
        if conditions:
            where_clause = " where " + " and ".join(conditions)

        sql_all = (
            f"select * from rms_warning {where_clause} "
            "ORDER BY warning_date DESC "
        )

        if not pageParam:
            sql_all += " limit 5 "
        else:
            # Let the database count the matches instead of fetching every
            # row only to measure the length of the result.
            count_sql = f"select count(*) from rms_warning {where_clause}"
            try:
                total_count = self.execute(count_sql).fetchall()[0][0]
            except Exception:
                # Best effort, as before: a failed count is reported as 0.
                total_count = 0
            pageParam.totalRecords = total_count
            sql_all = Utils.sql_paging_assemble(
                sql_all=sql_all, page_param=pageParam)
        return self.execute(sql_all).fetchall()

    # Get the number of warnings per warning type
    def getWarningCount(self):
        """Return ``(total, expired, low_stock)`` warning counts.

        ``expired`` counts records whose ``object_type`` equals ``'2'`` and
        ``low_stock`` counts those whose ``object_type`` equals ``'3'``.
        """
        records = self.findList().all()
        # Expiry warnings
        expired_count = sum(1 for rec in records if rec.object_type == '2')
        # Remaining-quantity warnings
        low_stock_count = sum(1 for rec in records if rec.object_type == '3')
        return (len(records), expired_count, low_stock_count)

    def get_waring_type_classify(self):
        """Return per-type warning counts with a ratio, plus the grand total.

        Returns:
            A ``(rows, total)`` tuple. ``total`` is the sum of all per-type
            counts. Each entry of ``rows`` is a dict holding the fields
            produced by ``Utils.msyql_table_model`` for one ``object_type``
            group plus a ``"ratio"`` key computed by ``Utils.classify`` from
            that group's ``type_number`` and ``total``.
        """
        sql_all = """
        SELECT count(warning_id) type_number, object_type
        FROM `rms_warning` GROUP BY object_type
        """
        rows = self.execute(sql_all).fetchall()
        all_number = sum(row[0] for row in rows)
        # presumably converts result rows to dict-like records — confirm
        # against Utils.msyql_table_model
        records = Utils.msyql_table_model(rows)
        new_data_list = []
        for record in records:
            entry = {"ratio": Utils.classify(record["type_number"], all_number)}
            entry.update(**record)
            new_data_list.append(entry)
        return new_data_list, all_number

    def create_bluk_waring(self, count=1000):
        """Insert ``count`` randomly generated warning rows (test data).

        Each row references a randomly chosen ``(medicament_id, name)`` pair
        from ``rms_medicament`` and a random ``object_type`` in ``1..7``.

        Args:
            count: Number of rows to generate. Defaults to 1000.

        Returns:
            Whatever ``self.insert_many`` returns.

        Raises:
            IndexError: If ``rms_medicament`` has no rows, because
                ``random.choice`` cannot pick from an empty sequence.
        """
        medicaments = list(self.execute(
            "select medicament_id, name from rms_medicament").fetchall())
        new_rows = []
        for i in range(count):
            picked = random.choice(medicaments)
            new_rows.append(
                EntityWarning(
                    customer_id='1c39cb2a-07f8-11ed-972d-f47b094925e1',
                    object_type=random.randint(1, 7),
                    object_id=picked[0],
                    object_name=picked[1],
                    warning_content=f'测试预警数据{i}',
                    warning_date=Utils.get_str_datetime()
                )
            )
        return self.insert_many(new_rows)


if __name__ == '__main__':
    a = BllWarning().create_bluk_waring()
    print(a)