#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @Date:2022/07/18 16:44:57 ''' import sys sys.path.append(".") import jwt import base64 import datetime from flask import current_app from sqlalchemy import and_, insert, or_, desc from db_logic.db_base import Repository from models.user_models import EntityUser from Common.Utils import Utils #用户操作业务逻辑类 class BllUser(Repository): def __init__(self, entityType=EntityUser): return super().__init__(entityType) #用户账号密码登录 def login(self, user_name, password): user_info = self.findEntity(EntityUser.account == user_name) if user_info: if user_info.is_enabled: if user_info.check_password(password): return self.get_jwt(user_info) return False def get_jwt(self, obj, expires_in=36000): '''用户登录后,发放有效的 JWT''' now = datetime.datetime.now() payload = { 'user_id': obj.user_id, 'user_name': obj.role_name, 'user_avatar': obj.avatar_url, 'exp': now + datetime.timedelta(seconds=expires_in), 'iat': now } return jwt.encode( payload, current_app.config['SECRET_KEY'], algorithm='HS256') def verify_jwt(self, token): '''验证 JWT 的有效性''' try: payload = jwt.decode( token, current_app.config['SECRET_KEY'], algorithms=['HS256']) except (jwt.exceptions.ExpiredSignatureError, jwt.exceptions.InvalidSignatureError, jwt.exceptions.DecodeError) as e: # Token过期,或被人修改,那么签名验证也会失败 return None # return User.query.get(payload.get('user_id')) return self.findEntity(payload.get('user_id')) #根据条码获取用户 def getUserByBarCode(self, barCode): return self.findEntity(EntityUser.BarCode == barCode) #获取用户列表 def getUserList(self, customerId, pageParam, keyWord=''): keyWord = '%' + keyWord + '%' orm_query = self.findList().filter(EntityUser.CustomerId == customerId ).filter(or_(EntityUser.UserCode.like(keyWord), EntityUser.RealName.like(keyWord))).order_by(desc(EntityUser.CreateDate)) return self.queryPage(orm_query, pageParam) #获取用户详情信息 def getUserInfo(self, userId): return self.findEntity(userId)