You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

218 lines
7.6 KiB

from datetime import timedelta
from fastapi import APIRouter, Request, Depends
from pydantic import BaseModel, constr
from tortoise.expressions import Q
from tortoise.queryset import QuerySet
from conf import setting
from helper.log import record_log, logger_wrapper
from helper import respond_to, login_required
from helper.mask import Mask
from helper.utils import encrypt_md5, timezone_now
from models import User, PasswordSecureMode, PlatformSecurity, Role, Drawer
router = APIRouter(prefix='/users', dependencies=[Depends(login_required)])
class SignUp(BaseModel):
username: constr(strip_whitespace=True)
name: str
role_id: int
class UserEdit(BaseModel):
name: str
password: str | None
role_id: int
class ProfileEdit(BaseModel):
name: str | None
password: str | None
class UserDrawersEdit(BaseModel):
drawers: list[str] | None
cabinet_id: str
@router.get('', summary='用户列表')
async def index(request: Request, page_no: int = 1, page_size: int = 20, keyword: str = None):
offset = (page_no - 1) * page_size
query = QuerySet(User).only('id', 'username', 'name', 'role_id', 'locked', 'login_frozen_count',
'password_expired_at', 'last_login_at', 'face')
if keyword:
query = query.filter(Q(username__istartswith=keyword) | Q(name__startswith=keyword))
# 当前登录用户role_id
current_role_id = request.state.current_user.role_id
current_role = await Role.get(id=current_role_id)
current_grade = current_role.grade
role_ids = await Role.filter(grade__lte=current_grade).values_list("id", flat=True)
query = query.filter(role_id__in=role_ids)
count = await query.count()
users = await query.limit(page_size).offset(offset).order_by('-created_at')
user_list = []
for user in users:
has_face = bool(user.face) # Check if face has a value
user_data = {
'id': user.id,
'username': user.username,
'name': user.name,
'role_id': user.role_id,
'locked': user.locked,
'login_frozen_count': user.login_frozen_count,
'password_expired_at': user.password_expired_at,
'last_login_at': user.last_login_at,
'has_face': has_face, # Include has_face field
'raw': user.raw
}
user_list.append(user_data)
return respond_to(data=dict(count=count, users=user_list))
@router.get('/{id}', summary='获取个人信息')
async def show(id: str):
user = await User.get(id=id)
return respond_to(data=user.as_json())
@router.post('', summary='创建账号')
async def create(request: Request, model: SignUp):
if len(model.username) < 4:
return respond_to(code=400, desc='用户名长度不能小于4位')
user = await User.get_or_none(username=model.username)
if user:
return respond_to(code=409, desc='账号已注册')
password_digest = encrypt_md5(setting.REGISTER_INIT_PASSWORD)
user = await User.create(**model.dict(), password_digest=password_digest)
record = {
"user_id": request.state.current_user.id,
"users": request.state.current_user.name,
"kind": "用户",
"comment": f"{request.state.current_user.name}创建用户{model.username},创建用户成功"
}
await record_log(**record)
return respond_to(data=user.as_json())
@router.patch('/{id}', summary='修改个人信息')
@logger_wrapper
async def update(request: Request, id: str, model: ProfileEdit):
"""修改个人信息"""
users: list[str] = request.state.users
source = await PlatformSecurity.all().first()
if id in users:
user: User = await User.get(id=id)
if model.name:
user.name = model.name
if model.password:
if encrypt_md5(model.password) == user.password_digest:
return respond_to(code=403, desc='新密码不可与原密码相同')
if not await PasswordSecureMode.verify(model.password):
mode = await PasswordSecureMode.get(checked=True)
return respond_to(code=403, desc=f'密码安全等级不足,{mode.comment}')
user.password_digest = encrypt_md5(model.password)
user.login_frozen_count = 0
user.last_attempt_at = None
user.password_expired_at = timezone_now() + timedelta(days=source.reset_password_duration)
await user.save(
update_fields=['name', 'password_digest', 'login_frozen_count', 'last_attempt_at', 'password_expired_at'])
return respond_to()
else:
return respond_to(code=403, desc='权限不足')
@router.put('/{id}', summary='修改用户信息')
@logger_wrapper
async def update(request: Request, id: str, model: UserEdit):
"""修改用户信息"""
current_user: User = request.state.current_user
permit = await current_user.permit('1001', Mask.UPDATE)
if not permit:
return respond_to(code=403, desc='权限不足')
user: User = await User.get(id=id)
user.role_id = model.role_id
user.name = model.name
source = await PlatformSecurity.all().first()
if model.password:
if not await PasswordSecureMode.verify(model.password):
return respond_to(code=403, desc='密码安全等级不足')
user.password_digest = encrypt_md5(model.password)
user.login_frozen_count = 0
user.last_attempt_at = None
user.password_expired_at = timezone_now() + timedelta(days=source.reset_password_duration)
await user.save(update_fields=['role_id', 'name', 'password_digest', 'login_frozen_count', 'last_attempt_at',
'password_expired_at'])
return respond_to(200, desc=f"修改[{user.name}]用户信息成功")
@router.delete('/{id}', summary='锁定账号')
@logger_wrapper
async def destroy(request: Request, id: str):
"""锁定账号"""
current_user: User = request.state.current_user
permit = await current_user.permit('1001', Mask.DELETE)
if not permit:
return respond_to(code=403, desc='权限不足')
user = await User.get(id=id)
user.locked = not user.locked
await user.save(update_fields=['locked'])
return respond_to(200, desc=f"{user.name}账号已锁定")
@router.put('/{id}/drawers', summary="更新柜体权限")
@logger_wrapper
async def update(request: Request, id: str, cabinet_drawers: list[dict]):
"""更新柜体权限"""
user = await User.get(id=id)
for model in cabinet_drawers:
drawers = model.get("drawers")
cabinet_id = model.get("cabinet_id")
current_drawers_query = Drawer.filter(cabinet_id=cabinet_id).values('id')
current_drawers_set = {str(drawer['id']) async for drawer in current_drawers_query}
db_drawers_set = set(user.drawers)
other_drawers_set = db_drawers_set - current_drawers_set
updated_drawers = set(drawers) | other_drawers_set
user.drawers = list(updated_drawers)
await user.save(update_fields=['drawers'])
return respond_to()
@router.get('/{id}/drawers', summary="获取柜体权限")
async def show(request: Request, id: str):
user = await User.get(id=id)
return respond_to(data=user.drawers)
class UserKeys(BaseModel):
keys: list[str]
@router.put('/keys/{id}', summary='修改用户钥匙权限')
@logger_wrapper
async def update(request: Request, id: str, model: UserKeys):
"""修改用户钥匙权限"""
current_user: User = request.state.current_user
permit = await current_user.permit('1001', Mask.UPDATE)
if not permit:
return respond_to(code=403, desc='权限不足')
user: User = await User.get(id=id)
user.keys = model.keys
await user.save(update_fields=['keys'])