|
|
|
|
#!/usr/bin/env python
|
|
|
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
|
'''
|
|
|
|
|
@Date:2022/07/19 16:34:34
|
|
|
|
|
'''
|
|
|
|
|
from flask import jsonify, request, Blueprint, g
|
|
|
|
|
from sqlalchemy import and_
|
|
|
|
|
from Common.auth import token_auth
|
|
|
|
|
from Common.Utils import PageParam, Utils
|
|
|
|
|
from io import BytesIO
|
|
|
|
|
from db_logic.user import BllUser
|
|
|
|
|
from models.user_models import EntityUser,EntityRole,EntityUserFace
|
|
|
|
|
from models.power_models import EntityUserApply
|
|
|
|
|
from db_logic.user_apply import BllUserApply
|
|
|
|
|
from db_logic.medicament import BllMedicament
|
|
|
|
|
from db_logic.role import BllRole
|
|
|
|
|
from db_logic.user_face import BllUserFace
|
|
|
|
|
from Common.Utils import DooropenUser
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
user_router = Blueprint('user', __name__)
|
|
|
|
|
|
|
|
|
|
# 登录
|
|
|
|
|
@user_router.route("/login", methods=["POST"])
|
|
|
|
|
# @token_auth.login_required
|
|
|
|
|
def user_login():
|
|
|
|
|
user_name = request.values.get("user_name")
|
|
|
|
|
password = request.values.get('password')
|
|
|
|
|
# user_name = body.user_name
|
|
|
|
|
# password = body.password
|
|
|
|
|
print(user_name,password,'**********************************************************')
|
|
|
|
|
user_obj, user_info = BllUser().login(user_name=user_name, password=password)
|
|
|
|
|
if user_obj:
|
|
|
|
|
return jsonify(Utils.true_return(msg="登陆成功", data={"token":user_obj, "user_info": Utils.to_dict(user_info)}))
|
|
|
|
|
else:
|
|
|
|
|
return jsonify(Utils.false_return(status=201, msg="账号或密码输入有误!"))
|
|
|
|
|
|
|
|
|
|
# 修改密码,根据token获取用户信息,接受老密码进行校验,新密码写入
|
|
|
|
|
@user_router.route("/update_password", methods=["POST"])
|
|
|
|
|
@token_auth.login_required
|
|
|
|
|
def user_update_password():
|
|
|
|
|
old_password = request.values.get("old_password")
|
|
|
|
|
new_password = request.values.get("new_password")
|
|
|
|
|
new_password1 = request.values.get("new_password1")
|
|
|
|
|
# user_id = g.current_user.use
|
|
|
|
|
if new_password != new_password1:
|
|
|
|
|
return jsonify(Utils.false_return(msg="两次密码不一致"))
|
|
|
|
|
msg_bool, msg_token = BllUser().update_password(g.current_user, old_password=old_password, new_password=new_password)
|
|
|
|
|
if msg_bool:
|
|
|
|
|
return jsonify(Utils.true_return(data={"token": msg_token}))
|
|
|
|
|
else:
|
|
|
|
|
return jsonify(Utils.false_return(msg=msg_token))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 获取用户列表
|
|
|
|
|
@user_router.route("/get_user_list", methods=["GET", "POST"])
|
|
|
|
|
@token_auth.login_required
|
|
|
|
|
def get_user_info_list():
|
|
|
|
|
user_code = request.values.get("user_code")
|
|
|
|
|
real_name = request.values.get("real_name")
|
|
|
|
|
role_name = request.values.get("role_name")
|
|
|
|
|
role_id = request.values.get("role_id")
|
|
|
|
|
is_enabled = request.values.get("is_enabled")
|
|
|
|
|
if role_id:
|
|
|
|
|
role =BllRole().findEntity(EntityRole.role_id ==role_id)
|
|
|
|
|
role_name=role.role_name
|
|
|
|
|
else:
|
|
|
|
|
role_name=''
|
|
|
|
|
page = int(request.values.get("page", 1))
|
|
|
|
|
page_size = int(request.values.get("page_size", 10))
|
|
|
|
|
print(page_size,'44444444444444444444')
|
|
|
|
|
page_param = PageParam(page, page_size)
|
|
|
|
|
data_list = BllUser().get_seach_user_list(
|
|
|
|
|
user_id = g.current_user.user_id,
|
|
|
|
|
user_code=user_code,
|
|
|
|
|
real_name=real_name,
|
|
|
|
|
role_name=role_name,
|
|
|
|
|
role_id=role_id,
|
|
|
|
|
is_enabled=is_enabled,
|
|
|
|
|
role_name_self=g.current_user.role_name,
|
|
|
|
|
page_param=page_param
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return jsonify(Utils.true_return(data={"total_count":page_param.totalRecords, "data_list": Utils.msyql_table_model(data_list)}))
|
|
|
|
|
|
|
|
|
|
# 新增用户
|
|
|
|
|
@user_router.route("/add", methods=["GET", "POST"])
|
|
|
|
|
@token_auth.login_required
|
|
|
|
|
def add_user_info():
|
|
|
|
|
create_user_info = g.current_user
|
|
|
|
|
new_obj = BllUser().execute(f"select * from rms_user where user_code={request.values.get('user_code')} and user_code is not null").fetchone()
|
|
|
|
|
if new_obj:
|
|
|
|
|
return jsonify(Utils.false_return(msg="工号不能重复"))
|
|
|
|
|
try:
|
|
|
|
|
obj = EntityUser(
|
|
|
|
|
real_name=request.values.get("real_name"),
|
|
|
|
|
mobile=request.values.get("mobile"),
|
|
|
|
|
sex=request.values.get("sex"),
|
|
|
|
|
email=request.values.get("email"),
|
|
|
|
|
user_code=request.values.get("user_code"),
|
|
|
|
|
account =request.values.get("user_code"),
|
|
|
|
|
qq=request.values.get("qq"),
|
|
|
|
|
customer_id ='1002437b-debf-46d6-b186-3e16bcf0cc0f',
|
|
|
|
|
role_id=request.values.get("role_id"),
|
|
|
|
|
role_name=request.values.get("role_name"),
|
|
|
|
|
create_date=Utils.get_str_datetime(),
|
|
|
|
|
create_user_id=create_user_info.user_id,
|
|
|
|
|
create_user_name=create_user_info.real_name,
|
|
|
|
|
is_enabled=request.values.get("is_enabled")
|
|
|
|
|
)
|
|
|
|
|
setattr(obj, "password", Utils.MD5('123456'))
|
|
|
|
|
BllUser().insert(obj)
|
|
|
|
|
return jsonify(Utils.true_return(msg=f"添加成功!"))
|
|
|
|
|
except Exception as error:
|
|
|
|
|
return jsonify(Utils.except_return(msg=error))
|
|
|
|
|
|
|
|
|
|
# 个人资料
|
|
|
|
|
@user_router.route("/getinfo", methods=["GET", "POST"])
|
|
|
|
|
@token_auth.login_required
|
|
|
|
|
def get_user_info():
|
|
|
|
|
data = Utils.to_dict(g.current_user)
|
|
|
|
|
try:
|
|
|
|
|
data.pop("password")
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
return jsonify(Utils.true_return(data=data))
|
|
|
|
|
|
|
|
|
|
# 修改用户
|
|
|
|
|
@user_router.route("/update", methods=["GET", "POST"])
|
|
|
|
|
@token_auth.login_required
|
|
|
|
|
def update_user_info():
|
|
|
|
|
user_id = request.values.get("user_id")
|
|
|
|
|
try:
|
|
|
|
|
obj = BllUser().findEntity(user_id)
|
|
|
|
|
if not obj:
|
|
|
|
|
return jsonify(Utils.false_return(msg="用户id有误"))
|
|
|
|
|
else:
|
|
|
|
|
# new_obj = BllUser().findEntity(
|
|
|
|
|
# and_(
|
|
|
|
|
# EntityUser.user_code == obj.user_code,
|
|
|
|
|
# EntityUser.user_code.isnot(None)
|
|
|
|
|
# )
|
|
|
|
|
# )
|
|
|
|
|
new_obj = BllUser().execute(f"select * from rms_user where user_code='{request.values.get('user_code')}' and user_code is not null").fetchone()
|
|
|
|
|
|
|
|
|
|
if new_obj and obj.user_id != new_obj.user_id:
|
|
|
|
|
return jsonify(Utils.false_return(msg="工号不能重复"))
|
|
|
|
|
finds_list = dir(EntityUser)
|
|
|
|
|
for i in finds_list:
|
|
|
|
|
va = request.values.get(i)
|
|
|
|
|
if va:
|
|
|
|
|
setattr(obj, i, va)
|
|
|
|
|
obj.update_last_visit_date()
|
|
|
|
|
BllUser().update(obj)
|
|
|
|
|
return jsonify(Utils.true_return(msg="修改成功"))
|
|
|
|
|
except Exception as error:
|
|
|
|
|
return jsonify(Utils.except_return(msg=error))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 删除用户
|
|
|
|
|
@user_router.route("/del", methods=["GET", "POST"])
|
|
|
|
|
@token_auth.login_required
|
|
|
|
|
def del_user():
|
|
|
|
|
user_id = request.values.get("user_id")
|
|
|
|
|
user_id_list = user_id.split(',')
|
|
|
|
|
for i in user_id_list:
|
|
|
|
|
obj = BllUserFace().findEntity(EntityUserFace.user_id==i)
|
|
|
|
|
# print(obj.face_value)
|
|
|
|
|
if obj:
|
|
|
|
|
featureData = BytesIO(b'0000')
|
|
|
|
|
obj.face_value = featureData.getvalue()
|
|
|
|
|
# obj.user_id =''
|
|
|
|
|
# setattr(obj, 'face_value', '')
|
|
|
|
|
BllUserFace().update(obj)
|
|
|
|
|
BllUser().delete(EntityUser.user_id==i)
|
|
|
|
|
return jsonify(Utils.true_return(msg="删除成功"))
|
|
|
|
|
# return jsonify(Utils.false_return("未选择用户"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 开门逻辑判断
|
|
|
|
|
@user_router.route("/opendoorcheck", methods=["GET", "POST"])
|
|
|
|
|
def opendoorcheck():
|
|
|
|
|
name1 = request.values.get("name1")
|
|
|
|
|
name2 = request.values.get("name2")
|
|
|
|
|
user1 =BllUser().findEntity(EntityUser.real_name==name1)
|
|
|
|
|
user2 =BllUser().findEntity(EntityUser.real_name==name2)
|
|
|
|
|
print(user1,user2)
|
|
|
|
|
data=False
|
|
|
|
|
DooropenUser.user_id =user1.user_id
|
|
|
|
|
if user1 and user2:
|
|
|
|
|
data=BllUserApply().get_apply_list(user1,user2)
|
|
|
|
|
if data:
|
|
|
|
|
print(DooropenUser.user_id)
|
|
|
|
|
return jsonify(Utils.true_return(msg="请求成功",data=data))
|
|
|
|
|
|
|
|
|
|
data = BllMedicament().get_drug_type_list2(user_id=user1.user_id)
|
|
|
|
|
if data:
|
|
|
|
|
DooropenUser.user_id =user1.user_id
|
|
|
|
|
return jsonify(Utils.true_return(msg="请求成功",data=data))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@user_router.route("/get_active_user_list", methods=["GET", "POST"])
|
|
|
|
|
def get_active_user_list():
|
|
|
|
|
"""
|
|
|
|
|
获取所有申通通过有效期内用户与管理员与危化品管理员用户列表
|
|
|
|
|
:return: [user_id_list]
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
data = BllUserApply().get_apply_solve_user_list()
|
|
|
|
|
apply_user_id_list = [item.user_id for item in data]
|
|
|
|
|
return jsonify(Utils.true_return(msg="请求成功", data=apply_user_id_list))
|