You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

137 lines
4.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/18 16:44:57
'''
import sys
import sqlalchemy
sys.path.append(".")
import jwt
import base64
import datetime
from flask import current_app
from sqlalchemy import and_, insert, or_, desc
from db_logic.db_base import Repository
from models.user_models import EntityUser
from Common.Utils import Utils
# 用户操作业务逻辑类
class BllUser(Repository):
    """Business-logic layer for user accounts.

    Provides credential login, password changes, JWT issuing/verification and
    the filtered, paginated user listing used by user management.
    """

    def __init__(self, entityType=EntityUser):
        """Bind the repository to ``entityType`` (``EntityUser`` by default)."""
        super().__init__(entityType)

    def login(self, user_name, password):
        """Log a user in with an account name (or user code) and a password.

        Args:
            user_name: Matched against ``EntityUser.account`` or
                ``EntityUser.user_code``.
            password: Plain-text password; its ``Utils.MD5`` digest is compared
                case-insensitively with the stored value.

        Returns:
            ``(token, user_info)`` on success; ``(False, None)`` when the user
            is not found, is disabled, or the password does not match.
        """
        user_info = self.findEntity(
            or_(
                EntityUser.account == user_name,
                EntityUser.user_code == user_name,
            )
        )
        if not user_info or not user_info.is_enabled:
            return False, None
        # NOTE(review): passwords are stored as MD5 digests; moving to a salted
        # password hash needs a data migration and is out of scope here.
        if str(user_info.password).lower() != Utils.MD5(password).lower():
            return False, None
        # Stamp the visit time on the entity that is about to be persisted,
        # not on this repository object, so update() actually stores it.
        user_info.last_visit_date = Utils.get_str_datetime()
        self.update(user_info)
        return self.get_jwt(user_info), user_info

    def update_password(self, user_info, old_password, new_password):
        """Change a user's password after checking the old one.

        Args:
            user_info: The user entity whose password is being changed.
            old_password: Current plain-text password, used for verification.
            new_password: New plain-text password; stored as ``Utils.MD5``.

        Returns:
            ``(True, token)`` with a freshly issued JWT on success, or
            ``(False, message)`` where the message reports that the old
            password is wrong.
        """
        # Compare digests case-insensitively, exactly as login() does, so a
        # user who can log in can also change the password.
        if str(user_info.password).lower() != Utils.MD5(old_password).lower():
            return False, "旧密码错误"
        user_info.password = Utils.MD5(new_password)
        self.update(user_info)
        return True, self.get_jwt(user_info)

    def get_jwt(self, obj, expires_in=36000):
        """Issue a signed HS256 JWT for a logged-in user.

        Args:
            obj: User object providing ``user_id``, ``role_name`` and
                ``avatar_url``.
            expires_in: Token lifetime in seconds (default 36000).

        Returns:
            The encoded token produced by ``jwt.encode``, signed with the
            Flask application's ``SECRET_KEY``.
        """
        # PyJWT interprets datetime claims as UTC, so the clock must be read
        # in UTC; a naive local time would shift both "iat" and "exp" by the
        # server's UTC offset.
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        payload = {
            'user_id': obj.user_id,
            # NOTE(review): the "user_name" claim is filled from role_name;
            # confirm with the token consumers that this is intended.
            'user_name': obj.role_name,
            'user_avatar': obj.avatar_url,
            'exp': now + datetime.timedelta(seconds=expires_in),
            'iat': now,
        }
        return jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm='HS256',
        )

    def verify_jwt(self, token):
        """Validate a JWT and load the user it refers to.

        Args:
            token: Encoded JWT as issued by ``get_jwt``.

        Returns:
            The result of ``findEntity`` for the token's ``user_id`` claim, or
            ``None`` when the token is rejected by ``jwt.decode``.
        """
        try:
            payload = jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256'],
            )
        except jwt.exceptions.InvalidTokenError:
            # Base class of every PyJWT validation failure: expired, tampered,
            # malformed, not-yet-valid, wrong algorithm, ...
            return None
        return self.findEntity(payload.get('user_id'))

    @staticmethod
    def _escape_sql_text(value):
        """Escape ``value`` for use inside a single-quoted SQL string literal."""
        # Backslashes are doubled as well because some engines (e.g. MySQL in
        # its default mode) treat a backslash as an escape character inside
        # string literals, which would otherwise let "\'" break out.
        return str(value).replace("\\", "\\\\").replace("'", "''")

    def get_seach_user_list(self, user_code, real_name, role_name, role_id, is_enabled, page_param, user_id,
                            role_name_self):
        """Return one page of ``rms_user`` rows matching the given filters.

        Every filter is optional; falsy values are skipped and the remaining
        ones are combined with ``and``.

        Args:
            user_code: Substring to match in ``user_code``.
            real_name: Substring to match in ``real_name``.
            role_name: Substring to match in ``role_name``.
            role_id: Exact ``role_id`` to match.
            is_enabled: Value compared with ``is_enabled``; must be an
                unsigned integer or ``true``/``false`` (any case).
            page_param: Paging object; its ``totalRecords`` attribute is set
                to the number of matching rows, and it is passed to
                ``Utils.sql_paging_assemble`` to build the page query.
            user_id: Currently unused (a per-user restriction is disabled).
            role_name_self: Currently unused (see ``user_id``).

        Returns:
            The rows of the requested page, as returned by ``fetchall()``.

        Raises:
            ValueError: If ``is_enabled`` is not a plain integer/boolean
                literal and therefore cannot be embedded in the query safely.
        """
        # NOTE(review): values are escaped rather than bound because the
        # signature of Repository.execute is not visible from this module;
        # switch to bound parameters if it supports them.
        escape = self._escape_sql_text
        conditions = []
        if user_code:
            conditions.append(f"user_code like '%{escape(user_code)}%'")
        if real_name:
            conditions.append(f"real_name like '%{escape(real_name)}%'")
        if role_name:
            conditions.append(f"role_name like '%{escape(role_name)}%'")
        if role_id:
            conditions.append(f"role_id='{escape(role_id)}'")
        if is_enabled:
            # This value is embedded unquoted, so only accept literals that
            # cannot carry any SQL syntax.
            flag = str(is_enabled).strip()
            is_unsigned_int = bool(flag) and not flag.strip("0123456789")
            if not (is_unsigned_int or flag.lower() in ("true", "false")):
                raise ValueError(f"invalid is_enabled filter: {is_enabled!r}")
            conditions.append(f"is_enabled={flag}")

        filter_base = ""
        if conditions:
            filter_base = " where " + " and ".join(conditions)

        # Keep the surrounding whitespace of this literal: the statement is
        # handed on to Utils.sql_paging_assemble, which receives it as text.
        sql_all = f"""
        select * from rms_user {filter_base} order by create_date desc
        """
        try:
            count_number = len(self.execute(sql_all).fetchall())
        except Exception:
            # Best effort: a failing count is reported as zero records.
            count_number = 0
        page_param.totalRecords = count_number
        sql_page = Utils.sql_paging_assemble(sql_all, page_param)
        return self.execute(sql_page).fetchall()
# #根据条码获取用户
# def getUserByBarCode(self, barCode):
# return self.findEntity(EntityUser.BarCode == barCode)
# #获取用户列表
# def getUserList(self, customerId, pageParam, keyWord=''):
# keyWord = '%' + keyWord + '%'
# orm_query = self.findList().filter(EntityUser.CustomerId == customerId
# ).filter(or_(EntityUser.UserCode.like(keyWord), EntityUser.RealName.like(keyWord))).order_by(desc(EntityUser.CreateDate))
# return self.queryPage(orm_query, pageParam)
# #获取用户详情信息
# def getUserInfo(self, userId):
# return self.findEntity(userId)