"""User lookup, credential checking and user creation helpers."""

import re
from typing import Optional

from fastapi import status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.exceptions import HTTPException
from fastapi_sqlalchemy import db

from models.users import SysUser
from common.schemas import UserCreateUpdate
from common.password import verify_password, get_password_hash

# Compiled once at import time. Both patterns are applied with ``fullmatch`` so
# that the *entire* input has to conform, not just a prefix of it.
_EMAIL_RE = re.compile(r"[a-zA-Z0-9_-]+@[a-zA-Z0-9_-]+(?:\.[a-zA-Z0-9_-]+)+")
_PHONE_RE = re.compile(r"1[356789]\d{9}")


def validate_email(email: str) -> bool:
    """Return True when *email* is, in its entirety, a well-formed address.

    The accepted shape is ``local@label.label[.label...]`` where every part
    consists of ASCII letters, digits, ``_`` or ``-``.
    """
    return _EMAIL_RE.fullmatch(email) is not None


def validate_phone_number(phone_number: str) -> bool:
    """Return True when *phone_number* is exactly 11 digits of the form ``1[356789]`` + 9 digits."""
    return _PHONE_RE.fullmatch(phone_number) is not None


def get_user_by_email(email: str) -> Optional[SysUser]:
    """Return the first SysUser whose Email equals *email*, or None.

    Raises:
        HTTPException: 400 when *email* fails ``validate_email``.
    """
    if not validate_email(email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="无效的邮箱地址"
        )
    return db.session.query(SysUser).filter(SysUser.Email == email).first()


def get_user_by_phone_number(phone_number: str) -> Optional[SysUser]:
    """Return the first SysUser whose PhoneNumber equals *phone_number*, or None.

    Raises:
        HTTPException: 400 when *phone_number* fails ``validate_phone_number``.
    """
    if not validate_phone_number(phone_number):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="无效的手机号码格式"
        )
    return (
        db.session.query(SysUser)
        .filter(SysUser.PhoneNumber == phone_number)
        .first()
    )


def get_user_by_username(username: str) -> Optional[SysUser]:
    """Return the first SysUser whose UserName equals *username*, or None."""
    return db.session.query(SysUser).filter(SysUser.UserName == username).first()


def _get_user_by_login(login: str) -> Optional[SysUser]:
    """Resolve a login identifier as an email, then a phone number, then a username."""
    # Each getter is only reached after its own validator has passed, so the
    # 400 branches inside the getters cannot trigger from here.
    if validate_email(login):
        return get_user_by_email(login)
    if validate_phone_number(login):
        return get_user_by_phone_number(login)
    return get_user_by_username(login)


def authenticate_user(form_data: OAuth2PasswordRequestForm) -> Optional[SysUser]:
    """Authenticate from an OAuth2 password form.

    ``form_data.username`` may hold an email address, a phone number or a
    user name. Returns the matching user when the password verifies,
    otherwise None.
    """
    return auth_user(form_data.username, form_data.password)


def auth_user(username: str, password: str) -> Optional[SysUser]:
    """Authenticate with a login identifier and a plain-text password.

    *username* may be an email address, a phone number or a user name.
    Returns the matching user when ``verify_password`` accepts *password*
    against the stored ``user.Password``; returns None when no user is found
    or the password does not verify.
    """
    user = _get_user_by_login(username)
    if not user:
        return None
    if not verify_password(password, user.Password):
        return None
    return user


def create_user(user_data: UserCreateUpdate) -> SysUser:
    """Create a SysUser from *user_data*, commit it, and return it.

    Note: ``user_data.Password`` is replaced in place with the result of
    ``get_password_hash`` before the model is built, so the caller's object
    no longer carries the original password afterwards.
    """
    user_data.Password = get_password_hash(user_data.Password)
    user = SysUser(**user_data.dict())
    db.session.add(user)
    db.session.commit()
    return user