#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/07/18 16:44:57
'''
import sys
import sqlalchemy
sys.path.append(".")
import jwt
import base64
import datetime
from flask import current_app
from sqlalchemy import and_, insert, or_, desc
from db_logic.db_base import Repository
from models.user_models import EntityUser
from Common.Utils import Utils


# Business-logic class for user operations.
class BllUser(Repository):
    '''User business logic: login, password change, JWT handling and user search.'''

    def __init__(self, entityType=EntityUser):
        '''Initialise the repository for ``entityType`` (``EntityUser`` by default).'''
        super().__init__(entityType)

    # User login with account and password.
    def login(self, user_name, password):
        '''Authenticate a user and issue a JWT.

        The user is looked up by matching ``user_name`` against either
        ``EntityUser.account`` or ``EntityUser.user_code``.

        Returns:
            ``(token, user_info)`` on success, ``(False, None)`` when the
            user is not found, is not enabled, or the password does not match.
        '''
        user_info = self.findEntity(
            or_(
                EntityUser.account == user_name,
                EntityUser.user_code == user_name
            )
        )
        if not user_info or not user_info.is_enabled:
            return False, None
        # Compare case-insensitively: the stored hash and the computed hash
        # may differ in letter case.
        # NOTE(review): MD5 is a weak password hash; migrating needs a data
        # migration and is out of scope here.
        if str(user_info.password).lower() != Utils.MD5(password).lower():
            return False, None
        # Stamp the visit time on the user entity that is persisted below
        # (it was previously set on the repository object, so it was never
        # part of the updated entity).
        user_info.last_visit_date = Utils.get_str_datetime()
        self.update(user_info)
        return self.get_jwt(user_info), user_info

    # Change a user's password.
    def update_password(self, user_info, old_password, new_password):
        '''Replace the user's password after checking the old one.

        Returns:
            ``(True, token)`` with a freshly issued JWT on success, or
            ``(False, "旧密码错误")`` ("old password is incorrect") when
            ``old_password`` does not match the stored hash.
        '''
        # Same case-insensitive hash comparison as in login().
        if str(user_info.password).lower() != Utils.MD5(old_password).lower():
            return False, "旧密码错误"
        user_info.password = Utils.MD5(new_password)
        self.update(user_info)
        return True, self.get_jwt(user_info)

    def get_jwt(self, obj, expires_in=36000):
        '''Issue a signed JWT for a logged-in user.

        Args:
            obj: user object providing ``user_id``, ``role_name`` and
                ``avatar_url``.
            expires_in: token lifetime in seconds.

        Returns:
            The HS256 token produced by ``jwt.encode`` using the Flask
            application's ``SECRET_KEY``.
        '''
        # PyJWT interprets naive datetimes as UTC, so a naive local time
        # would shift ``exp``/``iat`` by the local UTC offset. Use an aware
        # UTC timestamp instead.
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        payload = {
            'user_id': obj.user_id,
            # NOTE(review): 'user_name' is populated from role_name; confirm
            # this is intended before changing it, token consumers may rely
            # on the current value.
            'user_name': obj.role_name,
            'user_avatar': obj.avatar_url,
            'exp': now + datetime.timedelta(seconds=expires_in),
            'iat': now
        }
        return jwt.encode(
            payload,
            current_app.config['SECRET_KEY'],
            algorithm='HS256')

    def verify_jwt(self, token):
        '''Verify a JWT and return the user it identifies.

        Returns:
            The result of ``findEntity(user_id)`` for a valid token, or
            ``None`` when the token is invalid or carries no ``user_id``.
        '''
        try:
            payload = jwt.decode(
                token,
                current_app.config['SECRET_KEY'],
                algorithms=['HS256'])
        except jwt.exceptions.InvalidTokenError:
            # The token has expired, was tampered with (signature check
            # fails) or is otherwise invalid. InvalidTokenError is the base
            # class of the expired/signature/decode errors.
            return None
        # Legacy lookup: return User.query.get(payload.get('user_id'))
        user_id = payload.get('user_id')
        if user_id is None:
            # A token without a user id cannot identify anyone.
            return None
        return self.findEntity(user_id)

    @staticmethod
    def _escape_sql_literal(value):
        '''Escape ``value`` for use inside a single-quoted SQL string literal.

        Single quotes are doubled and backslashes are doubled, so the value
        cannot terminate the literal whether or not the database treats
        backslash as an escape character.
        '''
        return str(value).replace("\\", "\\\\").replace("'", "''")

    @staticmethod
    def _sql_flag_literal(value):
        '''Return ``value`` as text safe to embed unquoted in SQL.

        Only unsigned decimal integers and the keywords true/false (any
        letter case) are accepted.

        Raises:
            ValueError: if ``value`` is anything else.
        '''
        text = str(value).strip()
        is_number = bool(text) and all(ch in "0123456789" for ch in text)
        if is_number or text.lower() in ("true", "false"):
            return text
        raise ValueError(f"invalid is_enabled value: {value!r}")

    # User management.
    def get_seach_user_list(self, user_code, real_name, role_name, role_id,
                            is_enabled, page_param, user_id, role_name_self):
        '''Search ``rms_user`` with optional filters and return one page of rows.

        Each truthy filter argument adds a condition; conditions are combined
        with ``and``. ``page_param.totalRecords`` is set to the number of
        matching rows. ``user_id`` and ``role_name_self`` are currently
        unused (see the disabled restriction below).

        Returns:
            The rows of the requested page, as returned by ``fetchall()``.

        Raises:
            ValueError: if ``is_enabled`` is truthy but is not a number or
                true/false.
        '''
        # Caller-supplied values are escaped before being embedded in SQL.
        # NOTE(review): prefer bound parameters if Repository.execute
        # supports them; its signature is not visible from this file.
        escape = self._escape_sql_literal
        conditions = []
        if user_code:
            conditions.append(f"user_code like '%{escape(user_code)}%'")
        if real_name:
            conditions.append(f"real_name like '%{escape(real_name)}%'")
        if role_name:
            conditions.append(f"role_name like '%{escape(role_name)}%'")
        if role_id:
            conditions.append(f"role_id='{escape(role_id)}'")
        # NOTE(review): a falsy is_enabled (e.g. integer 0) skips the filter;
        # confirm how callers request "disabled only".
        if is_enabled:
            conditions.append(
                f"is_enabled={self._sql_flag_literal(is_enabled)}")
        # Disabled restriction, kept for reference:
        # if role_name_self != '管理员':
        #     conditions.append(f"user_id = '{user_id}'")
        filter_base = ""
        if conditions:
            filter_base = " where " + " and ".join(conditions)
        sql_all = f"""
        select * from rms_user {filter_base} order by create_date desc
        """
        # NOTE(review): this loads every matching row only to count them;
        # a "select count(*)" query would avoid that.
        try:
            count_number = len(self.execute(sql_all).fetchall())
        except Exception:
            # Best effort: fall back to zero records if counting fails.
            count_number = 0
        page_param.totalRecords = count_number
        sql_page = Utils.sql_paging_assemble(sql_all, page_param)
        return self.execute(sql_page).fetchall()

    # Legacy methods, kept commented out for reference.
    # # Get a user by bar code
    # def getUserByBarCode(self, barCode):
    #     return self.findEntity(EntityUser.BarCode == barCode)

    # # Get the user list
    # def getUserList(self, customerId, pageParam, keyWord=''):
    #     keyWord = '%' + keyWord + '%'
    #     orm_query = self.findList().filter(EntityUser.CustomerId == customerId
    #                                         ).filter(or_(EntityUser.UserCode.like(keyWord), EntityUser.RealName.like(keyWord))).order_by(desc(EntityUser.CreateDate))
    #     return self.queryPage(orm_query, pageParam)

    # # Get user detail information
    # def getUserInfo(self, userId):
    #     return self.findEntity(userId)