You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
4.4 KiB

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/18 16:44:57
'''
import sys
import sqlalchemy
sys.path.append(".")
import jwt
import base64
import datetime
from flask import current_app
from sqlalchemy import and_, insert, or_, desc
from db_logic.db_base import Repository
from models.user_models import EntityUser
from Common.Utils import Utils
# Business-logic class for user operations
class BllUser(Repository):
    '''Business-logic layer for user accounts.

    Covers credential login, password change, JWT issue/verification and the
    filtered, paged user listing. Persistence goes through the inherited
    ``Repository`` methods used below (``findEntity``, ``update``, ``execute``).

    NOTE(review): passwords are compared as ``Utils.MD5`` digests. MD5 is a
    weak choice for password storage; changing it requires migrating the
    stored hashes, so it is deliberately left untouched here.
    '''

    def __init__(self, entityType=EntityUser):
        '''Bind this repository to ``entityType`` (``EntityUser`` by default).'''
        # __init__ must not return a value; simply delegate to the base class.
        super().__init__(entityType)

    @staticmethod
    def _password_matches(stored_hash, plain_password):
        '''Return True when the MD5 of ``plain_password`` equals ``stored_hash``.

        The comparison ignores hex-digit case so that login and password
        change treat stored hashes identically.
        '''
        return str(stored_hash).lower() == Utils.MD5(plain_password).lower()

    @staticmethod
    def _escape_sql_text(value):
        '''Escape ``value`` for embedding inside a single-quoted SQL literal.

        Single quotes are doubled (standard SQL). Backslashes are doubled as
        well because some engines (e.g. MySQL in its default mode) treat the
        backslash as an escape character inside string literals.

        NOTE(review): bound parameters are the robust fix; they are not used
        here because the signature of ``Repository.execute`` is not visible
        from this file.
        '''
        return str(value).replace("\\", "\\\\").replace("'", "''")

    @staticmethod
    def _sql_flag(value):
        '''Validate an ``is_enabled`` filter value before it is put into SQL.

        Only plain ASCII digits and the words true/false are accepted; the
        accepted text is returned unchanged (apart from surrounding
        whitespace).

        Raises:
            ValueError: if ``value`` is anything else.
        '''
        token = str(value).strip()
        is_number = bool(token) and all(ch in "0123456789" for ch in token)
        if is_number or token.lower() in ("true", "false"):
            return token
        raise ValueError(f"invalid is_enabled filter value: {value!r}")

    # Log in with account (or user code) and password
    def login(self, user_name, password):
        '''Authenticate a user by account name or user code.

        Args:
            user_name: matched against ``EntityUser.account`` or
                ``EntityUser.user_code``.
            password: plain-text password; its MD5 is compared with the
                stored hash (case-insensitively).

        Returns:
            ``(token, user_info)`` on success, ``(False, None)`` when the
            user is not found, is disabled, or the password does not match.
        '''
        user_info = self.findEntity(
            or_(
                EntityUser.account == user_name,
                EntityUser.user_code == user_name,
            )
        )
        if not user_info or not user_info.is_enabled:
            return False, None
        if not self._password_matches(user_info.password, password):
            return False, None
        # Record the visit on the user entity (not on this repository object)
        # so that the following update() actually persists it.
        user_info.last_visit_date = Utils.get_str_datetime()
        self.update(user_info)
        return self.get_jwt(user_info), user_info

    # Change a user's password
    def update_password(self, user_info, old_password, new_password):
        '''Replace the user's password after checking the old one.

        Args:
            user_info: user entity whose ``password`` attribute holds the
                stored MD5 hash.
            old_password: current plain-text password.
            new_password: new plain-text password.

        Returns:
            ``(True, token)`` with a freshly issued JWT on success, or
            ``(False, message)`` when the old password is wrong.
        '''
        if not self._password_matches(user_info.password, old_password):
            return False, "旧密码错误"
        user_info.password = Utils.MD5(new_password)
        self.update(user_info)
        return True, self.get_jwt(user_info)

    def get_jwt(self, obj, expires_in=36000):
        '''Issue a signed (HS256) JWT for a logged-in user.

        Args:
            obj: user object providing ``user_id``, ``role_name`` and
                ``avatar_url``.
            expires_in: token lifetime in seconds (default 36000).

        Returns:
            The token produced by ``jwt.encode`` using the application's
            ``SECRET_KEY``.
        '''
        # Use an aware UTC timestamp: a naive local datetime would be read as
        # UTC when converted to the numeric exp/iat claims, skewing both by
        # the server's UTC offset.
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        payload = {
            'user_id': obj.user_id,
            # NOTE(review): 'user_name' is filled from role_name; this looks
            # unintended but is kept because token consumers may rely on it.
            'user_name': obj.role_name,
            'user_avatar': obj.avatar_url,
            'exp': now + datetime.timedelta(seconds=expires_in),
            'iat': now,
        }
        return jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm='HS256')

    def verify_jwt(self, token):
        '''Validate a JWT and load the user it refers to.

        Args:
            token: encoded JWT as issued by ``get_jwt``.

        Returns:
            The result of ``findEntity`` for the token's ``user_id`` claim, or
            ``None`` when the token is invalid or carries no ``user_id``.
        '''
        try:
            payload = jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256'])
        except jwt.exceptions.InvalidTokenError:
            # Base class of expired / bad-signature / undecodable / immature
            # token errors: any of them means the token cannot be trusted.
            return None
        user_id = payload.get('user_id')
        if user_id is None:
            return None
        return self.findEntity(user_id)

    # User management
    def get_seach_user_list(self, user_code, real_name, role_name, is_enabled, page_param, user_id):
        '''Return one page of users matching the optional filters.

        Args:
            user_code: substring filter on ``user_code`` (skipped when falsy).
            real_name: substring filter on ``real_name`` (skipped when falsy).
            role_name: substring filter on ``role_name`` (skipped when falsy).
            is_enabled: exact filter on ``is_enabled`` (skipped when falsy);
                must be digits or true/false.
            page_param: paging object; its ``totalRecords`` attribute is set
                to the number of matching rows.
            user_id: id of the user to exclude from the listing.

        Returns:
            The rows fetched for the requested page.

        Raises:
            ValueError: if ``is_enabled`` is not a plain number or true/false.
        '''
        quote = self._escape_sql_text
        conditions = []
        if user_code:
            conditions.append(f"user_code like '%{quote(user_code)}%'")
        if real_name:
            conditions.append(f"real_name like '%{quote(real_name)}%'")
        if role_name:
            conditions.append(f"role_name like '%{quote(role_name)}%'")
        if is_enabled:
            conditions.append(f"is_enabled={self._sql_flag(is_enabled)}")
        # The requesting user is always excluded, so the WHERE clause is
        # never empty.
        conditions.append(f"user_id != '{quote(user_id)}'")
        where_clause = " where " + " and ".join(conditions)
        sql_all = f"""
        select * from rms_user {where_clause} order by create_date desc
        """
        try:
            # Total is obtained by fetching every matching row and counting.
            count_number = len(self.execute(sql_all).fetchall())
        except Exception:
            # Best effort, as before: a failed count is reported as zero.
            count_number = 0
        page_param.totalRecords = count_number
        sql_page = Utils.sql_paging_assemble(sql_all, page_param)
        return self.execute(sql_page).fetchall()
# # Get a user by barcode
# def getUserByBarCode(self, barCode):
# return self.findEntity(EntityUser.BarCode == barCode)
# # Get the user list
# def getUserList(self, customerId, pageParam, keyWord=''):
# keyWord = '%' + keyWord + '%'
# orm_query = self.findList().filter(EntityUser.CustomerId == customerId
# ).filter(or_(EntityUser.UserCode.like(keyWord), EntityUser.RealName.like(keyWord))).order_by(desc(EntityUser.CreateDate))
# return self.queryPage(orm_query, pageParam)
# # Get user detail information
# def getUserInfo(self, userId):
# return self.findEntity(userId)