# Test router page.
#
# NOTE: route handlers and the pydantic model below are documented with `#`
# comments instead of docstrings on purpose: FastAPI/pydantic publish
# docstrings as descriptions in the generated OpenAPI schema, so adding them
# would change what the application emits.
from typing import Optional

import aioredis
from fastapi import APIRouter, Depends, Query
from fastapi.security import OAuth2PasswordBearer
from fastapi_plugins import depends_redis
from pydantic import BaseModel

from biz.example import get_user_by_id
from dtos import response
from dtos.example import UserListPagesResult, UserListResult

# Every route declared in this module is mounted under the "/test" prefix.
router = APIRouter(prefix="/test")


# GET /test/ -- returns a fixed greeting payload.
@router.get("/")
def index():
    return dict(msg="This is Index Page")


# ================== Login / authentication related ==================


# User record produced by the fake token decoder; only `username` is required.
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


def fake_decode_token(token):
    """Build a placeholder ``User`` from *token*; no real decoding happens.

    The username is *token* with the literal suffix ``"fakedecoded"``; the
    e-mail address and full name are fixed sample values.
    """
    fake_username = token + "fakedecoded"
    return User(
        username=fake_username,
        email="john@example.com",
        full_name="John Doe",
    )


# Security scheme that supplies the token consumed by `get_current_user`.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Dependency: turn the token from ``oauth2_scheme`` into a ``User``.

    Prints a trace line to stdout, then delegates to ``fake_decode_token``.
    """
    print("get_current_user...")
    return fake_decode_token(token)


# GET /test/user/ -- returns whatever `get_user_by_id` yields for the
# hard-coded id 27.
@router.get("/user/")
async def get_user():
    return get_user_by_id(27)


# Login verification: GET /test/users/me echoes back the user resolved by the
# `get_current_user` dependency.
@router.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user


# GET /test/users -- wraps a fixed one-element user list with `response`;
# the route declares `UserListResult` as its response model.
@router.get("/users", response_model=UserListResult)
def get_user_list():
    users = [{"id": 12, "user_name": "lala"}]
    return response(data=users, message="Ok!")


# GET /test/user_list_pages -- wraps a fixed one-item page with `response`;
# the route declares `UserListPagesResult` as its response model.
# `page` is a required query parameter (its description string is emitted at
# runtime and is therefore kept verbatim); `limit` defaults to 20.
@router.get("/user_list_pages", response_model=UserListPagesResult)
async def get_user_list_pages(
    page: int = Query(..., description="当前页码"),
    limit: int = 20,
):
    # The totals are placeholders: "totalRecords" simply mirrors `limit` and
    # "totalPages" is always 0.
    payload = {
        "totalRecords": limit,
        "totalPages": 0,
        "currentPage": page,
        "items": [{"id": 12, "user_name": "lala"}],
    }
    return response(data=payload, message="Ok!")


# Redis cache query: GET /test/ping returns the result of `cache.ping()` on
# the client injected by `depends_redis`.
@router.get("/ping")
async def ping(cache: aioredis.Redis = Depends(depends_redis)):
    pong = await cache.ping()
    return {"ping": pong}