"""User-management routes mounted under ``/users``.

Covers listing users, account creation, self-service and administrative
edits, toggling the account lock, and drawer / key permission updates.
Every route depends on ``login_required``.
"""
from datetime import timedelta

from fastapi import APIRouter, Request, Depends
from pydantic import BaseModel, constr
from tortoise.expressions import Q
from tortoise.queryset import QuerySet

from conf import setting
from helper.log import record_log, logger_wrapper
from helper import respond_to, login_required
from helper.mask import Mask
from helper.utils import encrypt_md5, timezone_now
from models import User, PasswordSecureMode, PlatformSecurity, Role, Drawer

router = APIRouter(prefix='/users', dependencies=[Depends(login_required)])

# NOTE: several handlers below deliberately keep their original, repeated names
# (`update`, `show`). Each route is registered by its decorator at definition
# time, so a later definition only rebinds the module-level name.
# NOTE(review): the Chinese docstrings on the @logger_wrapper handlers are kept
# verbatim because the decorator may read __doc__ at runtime (only decorated
# handlers carry one) -- confirm before editing them.


class SignUp(BaseModel):
    """Request body for creating an account."""

    username: constr(strip_whitespace=True)
    name: str
    role_id: int


class UserEdit(BaseModel):
    """Request body for an administrative edit of a user."""

    name: str
    password: str | None
    role_id: int


class ProfileEdit(BaseModel):
    """Request body for a self-service profile edit."""

    name: str | None
    password: str | None


class UserDrawersEdit(BaseModel):
    """Shape of one cabinet entry in a drawer-permission update."""

    drawers: list[str] | None
    cabinet_id: str


@router.get('', summary='用户列表')
async def index(request: Request, page_no: int = 1, page_size: int = 20, keyword: str | None = None):
    """List users, paginated and optionally filtered by a keyword prefix.

    Only users whose role grade is less than or equal to the grade of the
    current user's role are returned. The response data holds the total
    ``count`` of matching users and the requested page under ``users``.
    """
    offset = (page_no - 1) * page_size
    query = QuerySet(User).only(
        'id', 'username', 'name', 'role_id', 'locked', 'login_frozen_count',
        'password_expired_at', 'last_login_at', 'face',
    )
    if keyword:
        query = query.filter(Q(username__istartswith=keyword) | Q(name__startswith=keyword))

    # Limit the listing to roles at or below the current user's role grade.
    current_role = await Role.get(id=request.state.current_user.role_id)
    role_ids = await Role.filter(grade__lte=current_role.grade).values_list("id", flat=True)
    query = query.filter(role_id__in=role_ids)

    count = await query.count()
    users = await query.limit(page_size).offset(offset).order_by('-created_at')
    user_list = [
        {
            'id': user.id,
            'username': user.username,
            'name': user.name,
            'role_id': user.role_id,
            'locked': user.locked,
            'login_frozen_count': user.login_frozen_count,
            'password_expired_at': user.password_expired_at,
            'last_login_at': user.last_login_at,
            # Expose only whether face data exists, not the data itself.
            'has_face': bool(user.face),
            # NOTE(review): 'raw' is not in the .only() field list above --
            # confirm it is available on partially loaded instances.
            'raw': user.raw,
        }
        for user in users
    ]
    return respond_to(data=dict(count=count, users=user_list))


@router.get('/{id}', summary='获取个人信息')
async def show(id: str):
    """Return the JSON representation of the user with the given id."""
    user = await User.get(id=id)
    return respond_to(data=user.as_json())


@router.post('', summary='创建账号')
async def create(request: Request, model: SignUp):
    """Create an account with the configured initial password.

    Responds with 400 when the username is shorter than 4 characters and
    with 409 when the username is already registered. A log record is
    written after the user is created.
    """
    if len(model.username) < 4:
        return respond_to(code=400, desc='用户名长度不能小于4位')
    if await User.get_or_none(username=model.username):
        return respond_to(code=409, desc='账号已注册')

    password_digest = encrypt_md5(setting.REGISTER_INIT_PASSWORD)
    user = await User.create(**model.dict(), password_digest=password_digest)

    current_user = request.state.current_user
    await record_log(
        user_id=current_user.id,
        users=current_user.name,
        kind="用户",
        comment=f"{current_user.name}创建用户{model.username},创建用户成功",
    )
    return respond_to(data=user.as_json())


@router.patch('/{id}', summary='修改个人信息')
@logger_wrapper
async def update(request: Request, id: str, model: ProfileEdit):
    """修改个人信息"""
    # Self-service profile edit: change the display name and/or the password.
    # The target id must be listed in request.state.users, otherwise 403.
    users: list[str] = request.state.users
    if id not in users:
        return respond_to(code=403, desc='权限不足')

    user: User = await User.get(id=id)
    if model.name:
        user.name = model.name
    if model.password:
        # The new password must differ from the current one ...
        if encrypt_md5(model.password) == user.password_digest:
            return respond_to(code=403, desc='新密码不可与原密码相同')
        # ... and satisfy the active password security mode.
        if not await PasswordSecureMode.verify(model.password):
            mode = await PasswordSecureMode.get(checked=True)
            return respond_to(code=403, desc=f'密码安全等级不足,{mode.comment}')
        # The security settings are only needed when the password changes.
        source = await PlatformSecurity.all().first()
        user.password_digest = encrypt_md5(model.password)
        user.login_frozen_count = 0
        user.last_attempt_at = None
        user.password_expired_at = timezone_now() + timedelta(days=source.reset_password_duration)
    await user.save(
        update_fields=['name', 'password_digest', 'login_frozen_count', 'last_attempt_at', 'password_expired_at'])
    return respond_to()


@router.put('/{id}', summary='修改用户信息')
@logger_wrapper
async def update(request: Request, id: str, model: UserEdit):
    """修改用户信息"""
    # Administrative edit: requires the UPDATE mask on permission '1001'.
    current_user: User = request.state.current_user
    if not await current_user.permit('1001', Mask.UPDATE):
        return respond_to(code=403, desc='权限不足')

    user: User = await User.get(id=id)
    user.role_id = model.role_id
    user.name = model.name
    if model.password:
        if not await PasswordSecureMode.verify(model.password):
            return respond_to(code=403, desc='密码安全等级不足')
        # The security settings are only needed when the password changes.
        source = await PlatformSecurity.all().first()
        user.password_digest = encrypt_md5(model.password)
        user.login_frozen_count = 0
        user.last_attempt_at = None
        user.password_expired_at = timezone_now() + timedelta(days=source.reset_password_duration)
    await user.save(update_fields=['role_id', 'name', 'password_digest', 'login_frozen_count', 'last_attempt_at',
                                   'password_expired_at'])
    return respond_to(200, desc=f"修改[{user.name}]用户信息成功")


@router.delete('/{id}', summary='锁定账号')
@logger_wrapper
async def destroy(request: Request, id: str):
    """锁定账号"""
    # Toggles the account lock; requires the DELETE mask on permission '1001'.
    current_user: User = request.state.current_user
    if not await current_user.permit('1001', Mask.DELETE):
        return respond_to(code=403, desc='权限不足')

    user = await User.get(id=id)
    user.locked = not user.locked
    await user.save(update_fields=['locked'])
    # The flag is toggled, so report the state the account is actually in now
    # instead of always claiming it was locked.
    state = '锁定' if user.locked else '解锁'
    return respond_to(200, desc=f"{user.name}账号已{state}")


@router.put('/{id}/drawers', summary="更新柜体权限")
@logger_wrapper
async def update(request: Request, id: str, cabinet_drawers: list[dict]):
    """更新柜体权限"""
    # For each submitted cabinet, replace the user's drawers belonging to that
    # cabinet with the submitted ones and keep drawers of all other cabinets.
    user = await User.get(id=id)
    for entry in cabinet_drawers:
        # "drawers" may be missing or null (UserDrawersEdit allows None).
        submitted = set(entry.get("drawers") or [])
        cabinet_id = entry.get("cabinet_id")
        cabinet_query = Drawer.filter(cabinet_id=cabinet_id).values('id')
        cabinet_drawer_ids = {str(row['id']) async for row in cabinet_query}
        # user.drawers may be unset; treat that as "no drawers yet".
        kept = set(user.drawers or []) - cabinet_drawer_ids
        user.drawers = list(submitted | kept)
    await user.save(update_fields=['drawers'])
    return respond_to()


@router.get('/{id}/drawers', summary="获取柜体权限")
async def show(request: Request, id: str):
    """Return the drawers stored on the user with the given id."""
    user = await User.get(id=id)
    return respond_to(data=user.drawers)


class UserKeys(BaseModel):
    """Request body for replacing a user's key permissions."""

    keys: list[str]


@router.put('/keys/{id}', summary='修改用户钥匙权限')
@logger_wrapper
async def update(request: Request, id: str, model: UserKeys):
    """修改用户钥匙权限"""
    # Replaces the user's keys; requires the UPDATE mask on permission '1001'.
    current_user: User = request.state.current_user
    if not await current_user.permit('1001', Mask.UPDATE):
        return respond_to(code=403, desc='权限不足')

    user: User = await User.get(id=id)
    user.keys = model.keys
    await user.save(update_fields=['keys'])
    # NOTE(review): no response is returned here, unlike the sibling handlers
    # (which end with respond_to()) -- confirm whether that is intended.