完善主概览接口;

完善报表界面接口
duizhaopin
apan 2 years ago
parent 4a6858028c
commit 2d74ff1d59

@ -38,6 +38,9 @@ class Utils(object):
def get_str_datetime():
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def classify(num, all_num, fl=2):
return str(round(float(round(num / all_num, fl)) * 100)) + "%"
#获取唯一识别码
def UUID():
return str(uuid.uuid1())

@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/27 15:55:01
'''
from flask import g
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
from db_logic.user import BllUser
from models.user_models import EntityUser
from common.errors import error_response
basic_auth = HTTPBasicAuth()
token_auth = HTTPTokenAuth()
@basic_auth.verify_password
def verify_password(username, password):
'''用于检查用户提供的用户名和密码'''
user = EntityUser.query.filter_by(username=username).first()
print(username, password)
if user is None:
return False
g.current_user = user
return user.check_password(password)
@basic_auth.error_handler
def basic_auth_error():
'''用于在认证失败的情况下返回错误响应'''
return error_response(401)
@token_auth.verify_token
def verify_token(token):
'''用于检查用户请求是否有token并且token真实存在还在有效期内'''
# g.current_user = User.verify_jwt(token) if token else None
g.current_user = BllUser().verify_jwt(token)
if g.current_user:
# 每次认证通过后即将访问资源API更新 last_seen 时间
g.current_user.update_last_visit_date()
BllUser().update(g.current_user)
return g.current_user is not None
@token_auth.error_handler
def token_auth_error():
'''用于在 Token Auth 认证失败的情况下返回错误响应'''
return error_response(401)

@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/27 16:11:57
'''
from flask import jsonify
from werkzeug.http import HTTP_STATUS_CODES
from common.utils import Utils
def error_response(status_code, message=None):
payload = Utils.except_return(
msg="请登录", status=status_code)
# payload = {'error': HTTP_STATUS_CODES.get(status_code, 'Unknown error')}
if message:
payload['msg'] = message
response = jsonify(payload)
response.status_code = 200
return response

@ -8,8 +8,6 @@ from db_logic.medicament import BllMedicament
from models.warning_models import EntityWarning
# from apps.home import home_router
from db_logic.medicament_record import BllMedicamentRecord
from db_logic.warning import BllWarning
@ -22,7 +20,6 @@ home_router = Blueprint('home', __name__, url_prefix="/home")
# 获取页面主概览数据 --- 查询条件缺少柜体id客户id
@home_router.route('/home_number', methods=["GET", "POST"])
# def getRecordTypeDrugRecordListJson():
def get_record_type_drug_record_json():
customer_id = request.values.get('customer_id')
client_id = request.values.get('client_id')
@ -49,6 +46,10 @@ def drug_remaining():
return jsonify(Utils.true_return(data={"total_count": page_param.totalRecords, "data": Utils.msyql_table_model(data_list)}))
# 预警信息总览
@home_router.route("/warning_info_classify", methods=["GET", "POST"])
def warning_info_classify():
data, total_num = BllWarning().get_waring_type_classify()
return jsonify(Utils.true_return(data={"total_count": total_num, "data": data}))
# 获取预警信息列表接口
@ -72,6 +73,7 @@ def get_warning_list():
# 修改预警状态根据预警id进行
@home_router.route("/update_warning_tp", methods=["GET", "POST"])
def update_warning_type():
warning_id = request.values.get('warning_id')
obj = BllWarning().findEntity(EntityWarning.warning_id == warning_id)
# obj.object_type = 2
@ -81,4 +83,7 @@ def update_warning_type():
obj.is_solve=1
BllWarning().update(obj)
return jsonify(Utils.true_return())
return jsonify(Utils.true_return())

@ -17,19 +17,27 @@ report_router = Blueprint("report", __name__)
@report_router.route('/report_home', methods=["GET", "POST"])
def report_home_info():
# 试剂使用统计
base_data = BllMedicamentRecord().get_drug_record_count()
data = []
for i in base_data:
data.append(
{
"tp":i[0],
"number":i[1],
"date_time": i[2]
}
)
return jsonify(Utils.true_return(data=data))
data_list = BllMedicamentRecord().get_drug_record_count()
return jsonify(Utils.true_return(data=data_list))
# 消耗统计
@report_router.route("/drug_stock_use_classify", methods=["GET", "POST"])
def get_drug_stock_use_classify():
data_list, total_number = BllMedicament().get_drug_stock_use_classify()
return jsonify(Utils.true_return(data={"total_count":total_number, "data":data_list}))
# 试剂用量消耗
@report_router.route("/drug_use_classify", methods=["GET", "POST"])
def get_drug_use_classify():
data_list = BllMedicamentRecord().report_home_drug_useing_classify()
return jsonify(Utils.true_return(data=data_list))
@report_router.route("/drug_user_use_info", methods=["GET", "POST"])
def get_drug_user_use_number():
data_list = BllMedicamentRecord().report_home_user_use_info()
return jsonify(Utils.true_return(data=data_list))
# 库存信息总览
@report_router.route("/stock_data_info", methods=["GET", "POST"])

@ -1,10 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/19 16:34:17
'''
from flask import Blueprint
user_router = Blueprint('user', __name__)

@ -3,21 +3,42 @@
'''
@Date:2022/07/19 16:34:34
'''
from flask import jsonify, request, session
from flask import jsonify, request, Blueprint, g
from common.auth import token_auth
from common.utils import Utils
from . import user_router
from flask_siwadoc import SiwaDoc
from pydantic import BaseModel
from db_logic.user import BllUser
@user_router.router("/login", methods=["POST"])
user_router = Blueprint('user', __name__)
doc_siwa = SiwaDoc(user_router)
# class LoginBase(BaseModel):
# user_name: str
# password: str
@user_router.route("/login", methods=["POST"])
# @token_auth.login_required
# @doc_siwa.doc(body=LoginBase)
def user_login():
user_name = request.values.get("user_name")
user_pwd = request.values.get('password')
user_obj = BllUser.login(user_name, user_pwd)
password = request.values.get('password')
# user_name = body.user_name
# password = body.password
user_obj = BllUser().login(user_name=user_name, password=password)
if user_obj:
session['user_id'] = user_obj.user_id
session['customer_id'] = user_obj.customer_id
return jsonify(Utils.true_return(msg="登陆成功"))
return jsonify(Utils.true_return(msg="登陆成功", data={"token":user_obj}))
else:
return jsonify(Utils.false_return(status=201, msg="登陆失败"))
@user_router.route("/get_user_power", methods=["GET"])
@token_auth.login_required
def get_user_power():
print(g.current_user.user_id)
return jsonify(Utils.true_return())

@ -11,6 +11,7 @@ import decimal
from flask import Flask as New_flask
from flask.json import JSONEncoder as _JSONEncoder
from flask_cors import CORS
from flask_siwadoc import SiwaDoc
from sqlalchemy.ext.declarative import DeclarativeMeta
@ -65,20 +66,22 @@ def register_blueprints(app: Flask):
"""
"""
# from apps import all_router
from apps.home.views import home_router
# from apps.home.views import home_router
from apps.report.views import report_router
from apps.user.views import user_router
# from apps.report.views import report_router
# all_router = Blueprint('all_router', __name__)
app.register_blueprint(home_router, url_prefix="/api/home")
app.register_blueprint(report_router, url_prefix="/api/report")
# app.register_blueprint(home_router, url_prefix="/api/home")
# app.register_blueprint(report_router, url_prefix="/api/report")
app.register_blueprint(user_router, url_prefix="/api/user")
# app.register_blueprint(all_router, url_prefix="/api")
def create_app():
app = Flask(__name__)
CORS(app)
SiwaDoc(app)
# app.config.from_object('setting')
app.config["SECRET_KEY"] = "wYdilHT~TRw7j{lF+Ee5MR3nFBINONPUcObwjwzge&/(~[C?Yz"
register_blueprints(app)

@ -63,6 +63,22 @@ class BllMedicament(Repository):
page_param.totalRecords = self.execute(count_sql).fetchone()[0]
return data_list
# 获取试剂库存消耗
def get_drug_stock_use_classify(self):
sql_all = """
select count(medicament_id) count_number, `status` from rms_medicament GROUP BY `status`
"""
data = self.execute(sql_all).fetchall()
data_number = sum([i[0] for i in data])
data_li = Utils.msyql_table_model(data)
data_list = []
for i in data_li:
new_dict = {
"ratio": str(round(i["count_number"] / data_number,2) * 100) + "%"
}
new_dict.update(i)
data_list.append(new_dict)
return data_list, data_number
@ -309,7 +325,7 @@ if __name__ == '__main__':
from db_logic.medicament import BllMedicament
from common.utils import PageParam
page_param = PageParam(1, 10)
data = BllMedicament().getAllDrugList(page_param=page_param, search_word='')
data = BllMedicament().get_drug_use_classify()
print(data)
# values= {}
# page = values.get("page", 1)

@ -80,18 +80,74 @@ class BllMedicamentRecord(Repository):
return dict(recordListNew)
# 获取报表统计内入库记录信息
def get_drug_log_time_name(self, record_type):
sql_all = f"""
select a.`name` `name`,a.medicament_id, b.create_date create_date from (
select `name`, medicament_id from rms_medicament
) a RIGHT JOIN(
select medicament_id, create_date from rms_medicament_record WHERE record_type={record_type} ORDER BY create_date desc LIMIT 1
) b on a.medicament_id=b.medicament_id
"""
return self.execute(sql_all).fetchall()[0]
# 获取报表统计主页入库记录信息
def get_drug_record_count(self):
query_li = [
self.entityType.record_type,
func.count('record_id'),
func.max(self.entityType.create_date)
]
data = self.session.query(
*tuple(query_li)
).select_from(self.entityType).group_by(EntityMedicamentRecord.record_type).all()
return data
sql_all = """
select COUNT(record_id) count_number, record_type from rms_medicament_record GROUP BY record_type
"""
data = self.execute(sql_all).fetchall()
data_li = Utils.msyql_table_model(data)
data_list = []
for i in data_li:
name_date = self.get_drug_log_time_name(i["record_type"])
new_dic = {
"name": name_date[0],
"date_time": name_date[2]
}
new_dic.update(**i)
data_list.append(new_dic)
return data_list
# 获取报表统计主页试剂用量消耗
def report_home_drug_useing_classify(self):
sql_all = """
select a.count_id count_number, b.`name` from (
select count(record_id) count_id, medicament_id from rms_medicament_record GROUP BY medicament_id
) a LEFT JOIN(
select `name`, medicament_id from rms_medicament
) b on a.medicament_id=b.medicament_id
"""
data = self.execute(sql_all).fetchall()
data_number = sum([i[0] for i in data])
data_li = Utils.msyql_table_model(data)
data_list = []
for i in data_li:
new_dic = {
"ratio": Utils.classify(i["count_number"], data_number)
}
new_dic.update(**i)
data_list.append(new_dic)
return data_list
# 获取报表统计主页人员用量消耗
def report_home_user_use_info(self):
sql_all = """
select a.count_id count_number, a.create_user_id, a.create_user_name, b.avatar_url from(
select count(record_id) count_id, create_user_id,create_user_name from rms_medicament_record GROUP BY create_user_id
) a LEFT JOIN(
select avatar_url, user_id from rms_user
) b on a.create_user_id= b.user_id
"""
data = self.execute(sql_all).fetchall()
data_list = Utils.msyql_table_model(data)
return data_list
# 公用方法
def default_data_list(self, sql_all, finds=None):
med_data = self.execute(sql_all).fetchall()
@ -271,5 +327,5 @@ if __name__ == '__main__':
# sql = f"update rms_medicament_record set use_quantity={round(random.uniform(1,10), 1)}"
from common.utils import PageParam
page_param = PageParam(1, 10)
typ_dic = BllMedicamentRecord().durg_useing_info(page_param=page_param, seach_word='')
print(typ_dic)
typ_dic = BllMedicamentRecord().report_home_user_use_info()
print(typ_dic)

@ -3,31 +3,65 @@
'''
@Date:2022/07/18 16:44:57
'''
from sqlalchemy import and_, or_, desc
import sys
sys.path.append(".")
import jwt
import base64
import datetime
from flask import current_app
from sqlalchemy import and_, insert, or_, desc
from db_logic.db_base import Repository
from models.user_models import EntityUser
from common.utils import Utils
#用户操作业务逻辑类
class BllUser(Repository):
#_instance_lock = threading.Lock()
##实现单例模式
#def __new__(cls, *args, **kwargs):
# if not hasattr(BllUser, "_instance"):
# with BllUser._instance_lock:
# if not hasattr(BllUser, "_instance"):
# BllUser._instance = object.__new__(cls)
# return BllUser._instance
def __init__(self, entityType=EntityUser):
return super().__init__(entityType)
#用户账号密码登录
def login(self, userAccount, userPwd):
d = Utils.MD5(userPwd)
print(d)
return self.findEntity(and_(EntityUser.Account == userAccount, EntityUser.Password == Utils.MD5(userPwd)))
def login(self, user_name, password):
user_info = self.findEntity(EntityUser.account == user_name)
if user_info.is_enabled:
if user_info.check_password(password):
return self.get_jwt(user_info)
else:
return False
else:
return False
def get_jwt(self, obj, expires_in=36000):
'''用户登录后,发放有效的 JWT'''
now = datetime.datetime.now()
payload = {
'user_id': obj.user_id,
'user_name': obj.role_name,
'user_avatar': obj.avatar_url,
'exp': now + datetime.timedelta(seconds=expires_in),
'iat': now
}
return jwt.encode(
payload,
current_app.config['SECRET_KEY'],
algorithm='HS256')
def verify_jwt(self, token):
'''验证 JWT 的有效性'''
try:
payload = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256'])
except (jwt.exceptions.ExpiredSignatureError,
jwt.exceptions.InvalidSignatureError,
jwt.exceptions.DecodeError) as e:
# Token过期或被人修改那么签名验证也会失败
return None
# return User.query.get(payload.get('user_id'))
return self.findEntity(payload.get('user_id'))
#根据条码获取用户
def getUserByBarCode(self, barCode):

@ -3,7 +3,13 @@
'''
@Date:2022/07/19 13:54:49
'''
import sys
sys.path.append(".")
import random
from sqlalchemy import or_, desc
from common.utils import Utils
from db_logic.db_base import Repository
from models.warning_models import EntityWarning
#预警信息业务逻辑类
@ -30,3 +36,48 @@ class BllWarning(Repository):
# 余量预警列表
ylList = [record for record in recordList if record.object_type == '3']
return (len(recordList),len(gqList),len(ylList))
def get_waring_type_classify(self):
sql_all = """
SELECT count(warning_id) type_number, object_type FROM `rms_warning` GROUP BY object_type
"""
data_list = self.execute(sql_all).fetchall()
all_number = sum([i[0] for i in data_list])
data_list = Utils.msyql_table_model(data_list)
new_data_list = []
for i in data_list:
new_dic = {
"ratio": str(round(i["type_number"] / all_number, 2) * 100) + "%"
}
new_dic.update(**i)
new_data_list.append(new_dic)
return new_data_list, all_number
def create_bluk_waring(self):
inster_list = []
for i in range(1000):
inster_list.append(
EntityWarning(
customer_id='1c39cb2a-07f8-11ed-972d-f47b094925e1',
object_type=random.randint(1, 7),
object_id='1c39cb24-07f8-11ed-abd4-f47b094925e1',
object_name=f'测试对象名称{i}',
warning_content=f'测试预警数据{i}',
warning_date=Utils.get_str_datetime()
)
)
return self.insert_many(inster_list)
# if __name__ == '__main__':
# a = BllWarning().get_waring_type_classify()
# base_ratio = 0
# base_number = 0
# for i in a:
# print(i)
# # base_ratio += i["ratio"]
# base_number += i['type_number']
# print(base_ratio)
# print(base_number)

@ -3,12 +3,12 @@
'''
@Date:2022/07/18 15:29:38
'''
# 已同步本地数据库
from sqlalchemy import Column, String, Text, Integer
from common.utils import Utils
from models.models_base import Base, get_uuid
from werkzeug.security import check_password_hash, generate_password_hash
# 已同步本地数据库
class EntityUser(Base):
__tablename__ = "rms_user"
__table_args__ = (
@ -24,7 +24,7 @@ class EntityUser(Base):
bar_code = Column(String(50), comment='条码') # 条码
user_code = Column(String(50), comment='用户编号') # 用户编号
account = Column(String(50), comment='账户') # 账户
password = Column(String(50), comment='密码') # 密码
password = Column(String(255), comment='密码') # 密码
real_name = Column(String(50), comment='真实名称') # 真实名称
avatar_url = Column(String(50), comment='头像地址') # 头像地址
avatar_base64 = Column(Text, comment='头像base64值') # 头像base64值
@ -42,6 +42,17 @@ class EntityUser(Base):
create_user_name = Column(String(50), comment='创建用户名称') # 创建用户名称
is_add = Column(Integer, comment='是否添加', default=1)
def update_last_visit_date(self):
self.last_visit_date=Utils.get_str_datetime()
def set_password(self, password):
'''设置用户密码,保存为 Hash 值'''
self.password = generate_password_hash(password)
def check_password(self, password):
'''验证密码与保存的 Hash 值是否匹配'''
return check_password_hash(self.password, password)
class EntityRole(Base):
__tablename__ = "rms_role"

Loading…
Cancel
Save