import base64 import io import numpy as np from PIL import Image from fastapi import APIRouter, Depends, BackgroundTasks, Request from pydantic import BaseModel from arcsoft.arcface import FaceRecognitionEngine from arcsoft.exception import FaceNotFoundException, FaceMoreThanOneException, ArcSoftCallException, NonLivingException, \ UserNotFoundException from helper import respond_to, login_required from helper.log import record_log from helper.utils import timezone_now, encrypt_md5 from helper.tool import get_ip_address from models import User, Log, PlatformSecurity router = APIRouter(prefix='/sessions') class SignInModel(BaseModel): username: str password: str class FaceModel(BaseModel): raw: str # base64的人脸图片 class FaceRebindModel(BaseModel): source_id: str target_id: str @router.post('', summary='账号密码登录') async def create(model: SignInModel): # 审核人:林长青 2023-05-30 user = await User.get_or_none(username=model.username.strip()) if user is None: return respond_to(code=404, desc='账号尚未注册') if user.locked: return respond_to(code=423, desc='账号已被锁定') source = await PlatformSecurity.all().first() if 0 < source.login_frozen_count <= user.login_frozen_count: return respond_to(code=423, desc='账号已被冻结') if not encrypt_md5(model.password) == user.password_digest: if source.login_frozen_count > 0: user.login_frozen_count = user.login_frozen_count + 1 user.last_attempt_at = timezone_now() await user.save(update_fields=['login_frozen_count', 'last_attempt_at']) return respond_to(code=403, desc=f'密码错误!{source.login_frozen_count - user.login_frozen_count + 1}次后将被系统冻结。') return respond_to(code=403, desc='账号与密码不匹配') record = {"user_id": user.id, "users": user.username, "kind": "用户", "comment": f"用户{user.username},登录成功"} await record_log(**record) user.login_frozen_count = 0 user.last_attempt_at = None user.last_login_at = timezone_now() await user.save(update_fields=['login_frozen_count', 'last_attempt_at', 'last_login_at']) return respond_to(data=await user.profile) @router.post('/face', summary='人脸识别登录') async def create(model: FaceModel): # 审核人:林长青 2023-05-30 bin = base64.b64decode(model.raw) image = Image.open(io.BytesIO(bin)) buffer = np.array(image) users = await User.filter(face__isnull=False).values_list('id', 'face') with FaceRecognitionEngine() as engine: try: user_id = engine.login(buffer, users) except UserNotFoundException: return respond_to(code=400, desc='人脸信息尚未绑定') except ArcSoftCallException: return respond_to(code=400, desc='人脸底层接口无法调用') except FaceNotFoundException: return respond_to(code=400, desc='未检测到人脸信息') except FaceMoreThanOneException: return respond_to(code=400, desc='检测到多张人脸') except NonLivingException: return respond_to(code=400, desc='检测到非活体') user = await User.get_or_none(id=user_id) if user.locked: return respond_to(code=423, desc='账号已被锁定') record = {"user_id": user.id, "users": user.username, "kind": "用户", "comment": f"用户{user.username},登录成功"} await record_log(**record) user.login_frozen_count = 0 user.last_attempt_at = None user.last_login_at = timezone_now() await user.save(update_fields=['login_frozen_count', 'last_attempt_at', 'last_login_at']) return respond_to(data=await user.profile) @router.put('/face/{target_user_id}', summary='人脸信息绑定', dependencies=[Depends(login_required)]) async def update(request: Request, model: FaceModel, target_user_id: str, tasks: BackgroundTasks): # 审核人:林长青 2023-05-30 bin = base64.b64decode(model.raw) image = Image.open(io.BytesIO(bin)) buffer = np.array(image) users = await User.filter(face__isnull=False).values_list('id', 'face') with FaceRecognitionEngine() as engine: try: user_id = engine.login(buffer, users) if str(user_id) != target_user_id: return respond_to(code=409, desc='当前人脸已与其他账号绑定, 是否确认更换?', data={'source_id': user_id}) user = await User.get(id=target_user_id) user.face = engine.feature(buffer) await user.save(update_fields=['face']) tasks.add_task(Log.write, '用户', {'user_id': request.state.current_user.id, 'comment': f'{user.nickname}重新绑定人脸信息'}) record = {"user_id": user.id, "users": user.username, "kind": "用户", "comment": f"用户{user.username},绑定人脸信息"} await record_log(**record) return respond_to() except UserNotFoundException: user = await User.get(id=target_user_id) user.face = engine.feature(buffer) await user.save(update_fields=['face']) tasks.add_task(Log.write, '用户', {'user_id': request.state.current_user.id, 'comment': f'{user.nickname}绑定人脸信息'}) record = {"user_id": user.id, "users": user.username, "kind": "用户", "comment": f"用户{user.username},绑定人脸信息"} await record_log(**record) return respond_to() except ArcSoftCallException: return respond_to(code=400, desc='接口调用失败') except FaceNotFoundException: return respond_to(code=400, desc='未检测到人脸信息') except FaceMoreThanOneException: return respond_to(code=400, desc='检测到多张人脸') except NonLivingException: return respond_to(code=400, desc='检查到非活体') @router.patch('/face', summary='人脸信息更换绑定', dependencies=[Depends(login_required)]) async def update(request: Request, model: FaceRebindModel, tasks: BackgroundTasks): # 审核人:林长青 2023-05-30 source_user = await User.get(id=model.source_id) target_user = await User.get(id=model.target_id) target_user.face = source_user.face source_user.face = None await target_user.save(update_fields=['face']) await source_user.save(update_fields=['face']) tasks.add_task(Log.write, '用户', {'user_id': request.state.current_user.id, 'comment': f'{target_user.nickname}更换人脸信息'}) return respond_to() @router.get('/ip', summary='壁挂终端ip', dependencies=[Depends(login_required)]) async def index(request: Request): ip = get_ip_address() return respond_to(data=ip)