Skip to content

FastAPI 后端开发实战指南

基于 Python 3.9+ 的高性能异步 Web 框架,支持自动文档生成、类型验证、依赖注入等现代化特性。

1. 🚀 环境准备

项目依赖导出

bash
pip freeze > requirements.txt

完整依赖清单

txt
aerich==0.9.2
aiofiles==25.1.0
aiomysql==0.3.2
aiosqlite==0.22.0
annotated-doc==0.0.4
annotated-types==0.7.0
anyio==4.12.0
asyncclick==8.3.0.7
asyncio==4.0.0
bcrypt==5.0.0
cffi==2.0.0
click==8.3.1
cryptography==46.0.3
dictdiffer==0.9.0
dotenv==0.9.9
ecdsa==0.19.1
fastapi==0.124.4
fastapi-limiter==0.1.6
h11==0.16.0
idna==3.11
iso8601==2.1.0
loguru==0.7.3
passlib==1.7.4
pyasn1==0.6.1
pycparser==2.23
pydantic==2.12.5
pydantic-settings==2.12.0
pydantic_core==2.41.5
PyMySQL==1.1.2
pypika-tortoise==0.6.3
python-dotenv==1.2.1
python-jose==3.3.0
python-json-logger==4.0.0
python-multipart==0.0.20
pytz==2025.2
redis==7.1.0
rsa==4.9.1
six==1.17.0
starlette==0.50.0
tomlkit==0.13.3
tortoise-orm==0.25.2
typing-inspection==0.4.2
typing_extensions==4.15.0
uvicorn==0.38.0

2. ✅ 数据验证

Query 参数验证

python
# 验证
Item = Annotated[str,BeforeValidator(validate)]

@app.get("/item1/{item_id}")
def read_item(item_id: Item):
    return {"item_id": item_id}

Pydantic Field 验证

python
# ...代表必填
class Account(BaseModel):
    user_name:str = Field(...,min_length=3,max_length=10)
    password:str = Field(...)


# 自定义验证器
from pydantic import field_validator
class Account(BaseModel):
    user_name:str = Field(...,min_length=3,max_length=10)
    password:str = Field(...)

    @field_validator('password')
    def password_validator(cls, v):
        if '@' not in v:
            raise ValueError('密码必须包含@')
        return v

class Product(BaseModel):
    name:str = Field(...)
    price:float = Field(...,gt=1,lt=1000)
    description:str = Field(...)

    # 验证所有字段,mode有before和after
    @model_validator(mode='before')
    def validate_fields(cls,values):
        if values['name'].startswith('@'):
            raise ValueError('名称不能以@开头')
        if values['description'].startswith('@'):
            raise ValueError('描述不能以@开头')
        return values

class Order(BaseModel):
    order_id:str = Field(default_factory=lambda: str(uuid4()))
    quantity:int = Field(...,gt=0,lt=100)
    price:float = Field(...)
    total_price:float = Field(...)
    
    @model_validator(mode='before')
    def validate_order(cls, values):
        if values['total_price'] != values['quantity'] * values['price']:
            raise ValueError('总价计算错误')
        return values

3. ⚡ 异步处理

同步 vs 异步对比

python
# 异步处理
synTags = "异步操作"
@app.get("/async",tags=[synTags],summary="异步操作",description="异步操作")
async def async_operation():
    start = time.time()
    # 异步并发,多协程度操作
    tasks = [asyncio.sleep(1) for _ in range(5)]
    await asyncio.gather(*tasks)
    end = time.time()
    return {"message": "异步操作成功", "duration": end - start}

@app.get('/sync',tags=[synTags],summary='同步操作',description='同步操作')
def syn_operation():
    start = time.time()
    for _ in range(5):
        time.sleep(1)
    end = time.time()
    return {"message": "同步操作成功", "duration": end - start}

4. 📁 文件处理

单文件上传

python
# 异步读取上传文件
uploadTags = "上传文件"

@app.post('/upload',tags=[uploadTags],summary='文件上传',description='文件上传')
async def upload_file(file: UploadFile = UploadFile(...)):
    async with aiofiles.open(f"./static/{file.filename}", 'wb') as f:
        # file.read是异步操作
        await f.write(await file.read())
    return {"filename": file.filename}

@app.post('/batch-upload',tags=[uploadTags],summary='批量文件上传',description='批量文件上传')
async def batch_upload_file(files: list[UploadFile]):
    # 判断文件格式,是否是图片
    for file in files:
        if file.filename.endswith(('.jpg', '.jpeg', '.png', '.gif')):
            continue
        else:
            raise ValueError('文件格式错误')

    for file in files:
        async with aiofiles.open(f"./static/{file.filename}", 'wb') as f:
            await f.write(await file.read())
    return {"filenames": [file.filename for file in files]}

批量文件上传


5. 📡 请求处理

客户端信息获取

python
@app.get('/client-info',tags=[reqTags],summary='客户端信息',description='客户端信息')
def client_info(request: Request):
    print(request.query_params.get("id"))
    return {
        "client_ip": request.client.host,
        "client_port": request.client.port,
        "client_host": request.client.host,
        "client_headers": request.headers,
        "client_url": request.url,
        "client_method": request.method,
        "client_scheme": request.url.scheme,
        "client_path": request.url.path,
        "client_query": request.url.query,
        "client_url_params": request.path_params,
        "client_url_params_str": request.query_params,
    }

6. 🎯 统一响应

响应模型设计

python


from typing import Generic, TypeVar
from pydantic import BaseModel,Field
from config.constant import HttpStatusConstant
from datetime import datetime


T = TypeVar('T')

class SuccessResponse(BaseModel,Generic[T]):
    code: int = HttpStatusConstant.SUCCESS
    msg: str = '操作成功'
    success: bool = True
    data: T|None = None
    time: datetime = Field(default_factory=lambda: datetime.now())
    # 为 SuccessResponse 添加了 model_config = {"from_attributes": True} 以支持 ORM 对象转换
    model_config = {"from_attributes": True}

class FailureResponse(BaseModel):
    code: int = HttpStatusConstant.ERROR
    msg: str = '操作失败'
    success: bool = False

class ResponseUtil:
    """
    响应工具类
    """

    @classmethod
    def success(
        cls,
        msg: str = '操作成功',
        data: T = None,
        code: int = HttpStatusConstant.SUCCESS,
    ) -> SuccessResponse[T]:
        """
        成功响应方法

        :param msg: 可选,自定义成功响应信息
        :param data: 可选,成功响应结果中属性为data的值
        :param code: 可选,成功响应结果中属性为code的值
        :return: 成功响应结果
        """
        return SuccessResponse[T](msg=msg,data=data,code=code)

    @classmethod
    def failure(
        cls,
        msg: str = '失败',
        code: int = HttpStatusConstant.ERROR,
    ) -> FailureResponse:
        """
        失败响应方法

        :param msg: 可选,自定义失败响应信息
        :param data: 可选,失败响应结果中属性为data的值
        :param rows: 可选,失败响应结果中属性为rows的值
        :param dict_content: 可选,dict类型,失败响应结果中自定义属性的值
        :param model_content: 可选,BaseModel类型,失败响应结果中自定义属性的值
        :param headers: 可选,响应头信息
        :param media_type: 可选,响应结果媒体类型
        :param background: 可选,响应返回后执行的后台任务
        :return: 失败响应结果
        """
        
        return FailureResponse(msg=msg,code=code)

7. 📋 查询优化

分页查询实现

python
@app.get('/order2',tags=[orderTags],summary='订单查询2',description='使用装饰器自动转换查询参数')
def order_query2(page: int = Query(1, gt=0), limit: int = Query(10, gt=0), order_id: str = Query(None)):
    return {"page": page, "limit": limit, "order_id": order_id}

8. 💾 数据库操作

Tortoise ORM 配置

python
# Tortise-ORM 海龟orm
TORTOISE_ORM = {
    "connections": {
        "default": {
            "engine": "tortoise.backends.mysql",
            "credentials": {
                "host": "43.143.54.227",
                "port": 3306,
                "user": "root",
                "password": "mysql_h3zwAw",
                "database": "fastapi_l",
                "minsize": 20,       # 连接池最小连接数
                "maxsize": 100,      # 连接池最大连接数
                "charset": "utf8mb4",
                "echo": True,       # 生产环境建议关闭SQL日志False
            }
        }
    },
    "apps": {
        "models": {
            "models": ["aerich.models", "app.models"],  # 别忘了aerich的模型
            "default_connection": "default",
        }
    },
    "use_tz": False,
    "timezone": "Asia/Shanghai"
}

# 注册
    register_tortoise(
        app,
        config=TORTOISE_ORM,
        generate_schemas=True,  # 生产环境关闭自动建表False
        add_exception_handlers=True,  # 生产环境关闭异常处理,避免信息泄露False
    )


# Aerich 数据库迁移工具
# 1.初始化
aerich init -t config.settings.TORTOISE_ORM
# 2.生成迁移文件
aerich init-db
# 执行迁移
aerich migrate --name "测试"
# 升级
aerich upgrade
# 回滚迁移
aerich downgrade
# 查看迁移历史
aerich history

# 模型类
from tortoise.models import Model
from tortoise.fields import BooleanField, CharField, IntField,DatetimeField



from tortoise.models import Model
from tortoise.fields import BooleanField, CharField, ForeignKeyField, IntField,DatetimeField, OneToOneField,CASCADE,ManyToManyField


class UserProfile(Model):
    id = IntField(pk=True)
    age = IntField(default=0)
    avatar = CharField(max_length=255)
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)
    is_deleted = BooleanField(default=False)

    class Meta:
        table = "bfb_user_profile"
        ordering = ["-created_at"]
        indexes = [
            "created_at"
        ]

class User(Model):
    id = IntField(pk=True)
    real_name = CharField(max_length=255)
    user_name = CharField(max_length=20,unique=True)
    password = CharField(max_length=255)
    is_active = BooleanField(default=True)    
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)
    is_deleted = BooleanField(default=False)
    # related_name 用于反向查询,注册到UserProfile的user字段(models 对应TORTOISE_ORM.apps.models,可以改名) 1:1关联一次就行
    profile = OneToOneField("models.UserProfile",on_delete=CASCADE,related_name="user",null=True)

    # Meta类 用于配置模型元信息
    class Meta:
        table = "bfb_user" # 自定义表名
        ordering = ["-created_at"] # 按创建时间降序排序
        # 创建索引
        indexes = [
            "created_at"
        ]

    # 对象字符串表达
    def __str__(self):
        return self.user_name


class UserAddress(Model):
    id = IntField(pk=True)
    address = CharField(max_length=255)
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)
    is_deleted = BooleanField(default=False)
    # 外键
    user = ForeignKeyField("models.User", related_name="address")
    
    class Meta:
        table = "bfb_user_address"
        ordering = ["-created_at"]
        indexes = [
            "created_at"
        ]

class UserCourse(Model):
    id = IntField(pk=True)
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)
    is_deleted = BooleanField(default=False)
    # 外键
    user = ForeignKeyField("models.User", related_name="courses")
    course = ForeignKeyField("models.Course", related_name="users")
    
    class Meta:
        table = "bfb_user_course"
        unique_together = ("user_id", "course_id")
        ordering = ["-created_at"]
        indexes = [
            "created_at"
        ]
# 课程
class Course(Model):
    id = IntField(pk=True)
    name = CharField(max_length=255)
    description = CharField(max_length=255)
    created_at = DatetimeField(auto_now_add=True)
    updated_at = DatetimeField(auto_now=True)
    is_deleted = BooleanField(default=False)


    class Meta:
        table = "bfb_course"
        ordering = ["-created_at"]
        indexes = [
            "created_at"
        ]


class UserCreateReq(BaseModel):
    user_name:str = Field(...,min_length=3,max_length=10)
    password:str = Field(...,min_length=6,max_length=16)
    real_name:str = Field(...,min_length=1)
    age:int = Field(...,gt=0,lt=100)
    avatar:str = Field(...,min_length=1)

class UserUpdateReq(BaseModel):
    id:int = Field(...)
    password:str | None = Field(None,min_length=6,max_length=16)
    real_name:str | None = Field(None,min_length=1)

class UserRes(BaseModel):
    id:int
    user_name:str
    real_name:str
    is_active:bool
    created_at:datetime
    updated_at:datetime

    # 可以从模型中读取数据
    model_config = {"from_attributes": True}
    

async def create_user(userReq:UserCreateReq):
    try:
        # 将字符串编码为字节后再计算MD5
        # 使用事务优化性能
        from tortoise.transactions import in_transaction
        async with in_transaction():
            # 先创建profile
            profile = await UserProfile.create(
                age=req.age,
                avatar=req.avatar,
                is_deleted=False
            )
            
            # 再创建user并关联profile
            user = await User.create(
                real_name=req.real_name,
                user_name=req.user_name,
                password=md5(password.encode('utf-8')).hexdigest(),
                profile=profile
            )
        return user
    except Exception as e:
        raise e

async def update_user(userReq:UserUpdateReq):
    try:
        user = await User.get(id=userReq.id)
        if user is None:
            raise HTTPException(status_code=404, detail="用户不存在")
        if userReq.real_name is not None:
            user.real_name = userReq.real_name
        if userReq.password is not None:
            user.password = md5(userReq.password.encode('utf-8')).hexdigest()
        await user.save()
        return user
    except Exception as e:
        raise e

async def delete_user(user_id:int):
    try:
        user = await User.get(id=user_id)
        if user is None:
            raise HTTPException(status_code=404, detail="用户不存在")
        user.is_deleted = True
        await user.save()
    except Exception as e:
        raise e

@app.post("/user",tags=["users"],summary="创建用户2",description="创建用户2",response_model=SuccessResponse[UserRes] | FailureResponse)
async def create_user2(user: UserCreateReq):
    userRes = await UserService.create_user(user.user_name,user.password,user.real_name)
    return ResponseUtil.success(data=UserRes.model_validate(userRes))

@app.put('/user',tags=['users'],summary='更新用户',description='更新用户',response_model=SuccessResponse[UserRes] | FailureResponse)
async def update_user(user: UserUpdateReq):
    userRes = await UserService.update_user(user)
    return ResponseUtil.success(data=UserRes.model_validate(userRes))

@app.delete('/user',tags=['users'],summary='删除用户',description='删除用户',response_model=SuccessResponse | FailureResponse)
async def delete_user(user_id:int):
    await UserService.delete_user(user_id)
    return ResponseUtil.success()

9. 📁 文件操作

文件流式处理

python
# 带有 yield 的函数称为生成器函数,它不会一次性返回所有结果,而是按需生成值。生成器常用于流式响应,特别适合大文件下载


def read_file(file_path: str) -> Iterator[str]:

    with open(file_path, 'r', encoding='utf-8') as f:

        for line in f:

            yield line

@app.get('/download-file',tags=[fileTags],summary='文件流返回',description='自定义文件操作')
async def custom_file():
    # 文本信息返回
    info = b'File Content'
    return Response(content=info,media_type="text/plain",headers={"Content-Disposition": "attachment; filename=file.txt"})

@app.get('/download-pdf',tags=[fileTags],summary='pdf下载',description='自定义文件操作')
async def download_pdf():
    # PDF文件返回
    pdf_path = './static/document.pdf'
    return FileResponse(path=pdf_path, media_type="application/pdf", headers={"Content-Disposition": "attachment; filename=document.pdf"})


def generate_mp4_file(filepath:str,chunk_size:int=1024*1024):
    # 生成MP4文件
    with open(filepath, 'rb') as f:
        while chunk:=f.read(chunk_size):
            yield chunk  # 每次返回一个数据块
    

@app.get('/download-mp4',tags=[fileTags],summary='下载mpd',description='自定义文件操作')
async def download_mp4():
    # MP4文件返回
    mp4_path = './static/video_1.mp4'
    return StreamingResponse(content=generate_mp4_file(mp4_path), media_type="video/mp4", headers={"Content-Disposition": "attachment; filename=video_1.mp4"})

静态文件服务

python

# 挂载静态文件目录

from fastapi.staticfiles import StaticFiles
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
app.mount("/assets", StaticFiles(directory="assets"), name="assets")

# 可以通过不同的name生成对应的URL
url_for("static", path="logo.png")    # → /static/logo.png
url_for("uploads", path="user.jpg")  # → /uploads/user.jpg
url_for("assets", path="app.js")     # → /assets/app.js

静态文件目录挂载


10. 🔗 依赖注入

python
from fastapi import FastAPI,Depends
from tortoise.contrib.fastapi import register_tortoise
from tortoise import Tortoise

app = FastAPI()

# 依赖注入,会把user_id传入get_user函数,然后返回user对象,获取关联属性prefetch_related提前加载
async def get_user(user_id:int):
    try:
        user = await User.get(id=user_id,is_deleted=False).prefetch_related('profile')
        if user is None:
            raise HTTPException(status_code=404, detail="用户不存在")
        return user
    except Exception as e:
        raise e


@app.get("/user/{user_id}",tags=["users"],summary="获取用户",description="获取用户",response_model=SuccessResponse[UserRes] | FailureResponse)
async def get_user(user_id:int,user:User = Depends(get_user)):
    return ResponseUtil.success(data=UserRes.model_validate(user))

# 路径级别注入dependencies
def get_admin_token(token: str = Header(...)):
    if token != "Bear 1234":
        raise HTTPException(status_code=401, detail="Invalid token")
    else:
        return token

@app.get('/admin',tags=['admin'],summary='管理员',description='管理员',dependencies=[Depends(get_admin_token)])
def read_admin():
    return {"message": "Welcome to admin panel"}

### 路由基本注入,多个注入,路径级别注入依赖性更高
def check_auth(token: str = Header(...)):
    if token != "1234":
        raise HTTPException(status_code=401, detail="Invalid token")
    else:
        return token

router = APIRouter(
    tags=["userTags"],
    responses={404: {"description": "Not found"}},
    dependencies=[Depends(check_auth)]
)

@router.get("/{user_id}")
async def read_user(user_id: str):
    return {"user_id": user_id}

### 全局依赖注入,如日志,认证,限流

async def log_request(request: Request):
    print(f"Request: {request.method} {request.url}")
    return {"logged":True}

app = FastAPI(dependencies=[Depends(log_request)])

11. 🛡️ 中间件

自定义中间件

python
from fastapi import FastAPI, Request, Response

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

# 跨域中间件
app.add_middleware(CORSMiddleware,
                    allow_origins=["*"],    # 允许的域名,*表示所有域名,["http://localhost:8080"]
                    allow_credentials=True,  # 允许携带cookie
                    allow_methods=["*"],       # 允许的请求方法,*表示所有方法,["GET", "POST", "PUT", "DELETE"]
                    allow_headers=["*"],      # 允许的请求头,*表示所有头
                    expose_headers=["*"],     # 暴露的响应头,*表示所有头
                    max_age=3600)             # 预检请求的最大缓存时间

12. ⚠️ 异常处理

全局异常处理器

python
from fastapi import FastAPI, Request, Response
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

app = FastAPI()

# 全局异常处理
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=422,
        content=ResponseUtil.failure(code=422,msg=str(exc)).model_dump()
    )

@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    return JSONResponse(
        status_code=exc.status_code,
        content=ResponseUtil.failure(code=exc.status_code,msg=exc.detail).model_dump()
    )

13. 🔄 应用生命周期

python

# 方法一(不推荐)
## from fastapi import FastAPI

# app = FastAPI()

# @app.on_event("startup")
# async def startup():
#     print("Application is starting")

# @app.on_event("shutdown")
# async def shutdown():
#     print("Application is shutting down")

# 方法二 lifespan
from fastapi import FastAPI


# 定义生命周期管理器 通过@asynccontextmanager装饰器将函数转换为异步上下文管理器,确保在异步环境中正确执行
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 应用启动前执行的逻辑(加载模型)
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    print("模型加载完成,应用即将启动")
    yield
    # 应用关闭后执行的逻辑(释放资源)
    ml_models.clear()
    print("模型资源已释放,应用关闭完成")

app = FastAPI(lifespan=lifespan)

# 数据库连接池初始化与释放
# 加载共享的机器学习模型
# 启动消息队列消费者
# 初始化缓存服务连接

14. 🌐 异步IO

asyncio 核心概念

python
# Python 原生异步库,适用于网络服务/爬虫等I/O密集型场景。
创建任务:asyncio.create_task()
并发执行:asyncio.gather()
异步等待:await asyncio.sleep()

15. ⚙️ 配置管理

python
"""
环境配置管理
支持开发环境、生产环境等不同配置,按python server.py --env=dev
"""
import os
from dotenv import load_dotenv
from pydantic_settings import BaseSettings
from functools import lru_cache
import argparse


class AppSettings(BaseSettings):
    """
    应用配置
    """

    app_env: str = 'dev'
    app_name: str = 'fastapi-l'
    app_host: str = '0.0.0.0'
    app_port: int = 9099
    app_reload: bool = True
    app_debug: bool = True
    app_workers: int = 1
    app_cors_origins: list[str] = ["http://localhost", "http://localhost:4200", "http://localhost:3000", "http://localhost:8080"]
    app_swagger_url:str | None  = "/docs"
    app_redoc_url:str | None  = "/redoc"
    app_openapi_url:str | None  = "/openapi.json"

class JwtSettings(BaseSettings):
    """
    Jwt配置
    """

    jwt_secret_key: str = 'b01c66dc2c58dc6a0aabfe2144256be36226de378bf87f72c0c795dda67f4d55'
    jwt_expire_minutes: int = 1440
    jwt_refresh_token_expire_days: int = 30

class DataBaseSettings(BaseSettings):
    """
    数据库配置
    """
    db_host: str = '127.0.0.1'
    db_port: int = 3306
    db_username: str = 'root'
    db_password: str = 'mysqlroot'
    db_database: str = 'ruoyi-fastapi'
    db_echo: bool = True
    db_max_overflow: int = 10
    db_pool_size: int = 50
    db_pool_recycle: int = 3600
    db_pool_timeout: int = 30

class RedisSettings(BaseSettings):
    """
    Redis配置
    """

    redis_host: str = '127.0.0.1'
    redis_port: int = 6379
    redis_username: str = ''
    redis_password: str = ''
    redis_database: int = 2


class GetConfig:
    """
    获取配置
    """

    def __init__(self):
        self.parse_cli_args()

    @lru_cache()
    def get_app_config(self):
        """
        获取应用配置
        """
        # 实例化应用配置模型
        return AppSettings()

    @lru_cache()
    def get_jwt_config(self):
        """
        获取Jwt配置
        """
        # 实例化Jwt配置模型
        return JwtSettings()

    @lru_cache()
    def get_database_config(self):
        """
        获取数据库配置
        """
        # 实例化数据库配置模型
        return DataBaseSettings()

    @lru_cache()
    def get_redis_config(self):
        """
        获取Redis配置
        """
        # 实例化Redis配置模型
        return RedisSettings()

    
    @staticmethod
    def parse_cli_args():
        """
        解析命令行参数
        """
        # 检查是否在alembic环境中运行,如果是则跳过参数解析
            # 使用argparse定义命令行参数
        parser = argparse.ArgumentParser(description='命令行参数')
        parser.add_argument('--env', type=str, default='', help='运行环境')
            # 解析命令行参数
        args = parser.parse_args()
            # 设置环境变量,如果未设置命令行参数,默认APP_ENV为dev
        os.environ['APP_ENV'] = args.env if args.env else 'dev'
        # 读取运行环境
        run_env = os.environ.get('APP_ENV', '')
        # 运行环境未指定时默认加载.env.dev
        env_file = '.env.dev'
        # 运行环境不为空时按命令行参数加载对应.env文件
        if run_env != '':
            env_file = f'.env.{run_env}'
        # 加载配置
        _ = load_dotenv(env_file)


# 实例化获取配置类
get_config = GetConfig()
# 应用配置
AppConfig = get_config.get_app_config()
# Jwt配置
JwtConfig = get_config.get_jwt_config()
# 数据库配置
DataBaseConfig = get_config.get_database_config()
# Redis配置
RedisConfig = get_config.get_redis_config()

16. 📝 日志管理

Loguru 日志设计

日志配置与使用

  • Loguru设计方案
Client

FastAPI
  ├─ Access Log(请求日志)
  ├─ App Log(业务日志)
  ├─ Error Log(异常)
  └─ Trace ID(请求级追踪)

       Loguru

    文件 / stdout / 日志系统
  • 安装依赖
bash
pip install loguru python-json-logger
  • 调用
python
import sys
from loguru import logger
from pathlib import Path

LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)

def setup_logger():
    logger.remove()

    # 1️⃣ 控制台(开发)
    _= logger.add(
        sys.stdout,
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{extra[trace_id]}</cyan> | {message}",
        enqueue=True,
    )

    # 2️⃣ 业务日志
    _ = logger.add(
        LOG_DIR / "app.log",
        level="INFO",
        rotation="100 MB",
        retention="7 days",
        compression="zip",
        enqueue=True,
        serialize=True,
    )

    # 3️⃣ 错误日志
    _ = logger.add(
        LOG_DIR / "error.log",
        level="ERROR",
        rotation="50 MB",
        retention="14 days",
        compression="zip",
        enqueue=True,
        serialize=True,
        backtrace=True,
        diagnose=True,
    )

## 调用
setup_logger()

# 使用
logger.info("test",trace_id="123456")

17. 🔧 限流fastapi-limiter

安装

  • 安装依赖
bash
pip install fastapi-limiter
pip install redis
  • 初始化
python
import redis.asyncio as redis
from config.env_config import RedisConfig

class RateLimiterManager:
    @staticmethod
    async def init():
        redis_url = f"redis://:{RedisConfig.redis_password}@{RedisConfig.redis_host}:{RedisConfig.redis_port}/{RedisConfig.redis_database}"
        redis_client = redis.from_url(redis_url,   encoding="utf-8",decode_responses=True,)
        await FastAPILimiter.init(redis_client)

# 调用
await RateLimiterManager.init()

# 使用(每秒最多10次)
from fastapi_limiter.depends import RateLimiter
@app.get("/test", dependencies=[RateLimiter(times=10, seconds=1)])
async def test():
    return {"message": "test"}

© 2024 快乐的程序员. All rights reserved.