FastAPI + Celery 实战:异步任务里调用 Redis 和数据库的全解析及生产级组织方案
作者:admin | 分类:顶尖机器人 | 浏览:1 | 日期:2026年04月02日
在现代Web开发中,FastAPI凭借其高性能异步特性成为构建API的热门选择,而Celery则是处理异步任务的利器。但在异步任务中调用Redis和数据库时,稍有不慎就会引发连接泄露、资源耗尽等问题。本文将从踩坑案例出发,解析问题根源,并给出生产级的代码组织方案。
一、异步任务调用Redis和数据库的常见陷阱
很多开发者在使用FastAPI的BackgroundTasks时,会直接将请求生命周期内创建的数据库Session或RedisConnection传递给异步任务,看似简单高效,实则隐藏巨大风险。
曾有一个用户头像处理项目,上传头像后异步生成缩略图,将结果存入MySQL,同时把用户ID和任务ID存入Redis做状态追踪。上线初期运行正常,但几天后开始随机报错:MySQL连接已关闭、Redis连接数超限,甚至出现任务执行到一半数据写入失败的情况。
经过排查发现,问题出在资源生命周期管理上。FastAPI的BackgroundTasks在请求响应返回后执行,但请求结束时,框架会自动关闭请求生命周期内创建的数据库会话和Redis连接。此时异步任务再使用这些已关闭的资源,必然报错。核心教训是:绝不能在异步任务中复用请求生命周期内的资源对象。
二、核心原理:资源生命周期分离
正确的做法是遵循“谁用谁创建,用完自己关”的原则,让异步任务拥有独立的资源管理逻辑。在任务函数内部重新创建数据库Session和Redis连接,任务执行完毕后确保资源被正确关闭或归还。
这意味着要摒弃依赖外部传递资源的方式,转而在任务内部实现资源的初始化与释放。虽然增加了代码量,但能从根本上避免资源生命周期错乱问题,保证异步任务的稳定性。
三、生产级代码组织方案
1. 项目结构设计
合理的项目结构是代码可维护的基础,推荐如下结构:
project/
├── app/
│ ├── api/ # 路由层
│ ├── core/ # 核心配置
│ │ ├── database.py # 数据库配置
│ │ └── redis_client.py # Redis配置
│ ├── models/ # 数据库模型
│ ├── schemas/ # Pydantic模型
│ └── tasks/ # 异步任务模块
│ ├── __init__.py
│ ├── user_tasks.py # 具体任务实现
│ └── worker.py # 任务资源管理工具
└── requirements.txt
2. 核心资源管理实现
在tasks/worker.py中定义资源管理的辅助函数,使用上下文管理器确保资源正确释放:
from sqlalchemy.orm import sessionmaker
from app.core.database import engine
from app.core.redis_client import get_redis_client
from contextlib import contextmanager
# 模块级别创建Session工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@contextmanager
def get_db_session():
"""每个任务独立创建数据库会话,用完即关"""
db = SessionLocal()
try:
yield db
db.commit()
except Exception:
db.rollback()
raise
finally:
db.close()
@contextmanager
def get_redis_conn():
"""每个任务独立创建Redis连接,用完归还"""
redis_conn = get_redis_client()
try:
yield redis_conn
finally:
redis_conn.close()
3. 异步任务实现
在tasks/user_tasks.py中编写具体任务逻辑,使用上述资源管理函数:
from app.tasks.worker import get_db_session, get_redis_conn
from app.models import UserAvatar
from PIL import Image
import os
@celery_app.task(bind=True, max_retries=3)
def process_avatar(self, user_id: int, avatar_path: str):
"""处理用户头像,生成缩略图并更新数据库和Redis"""
try:
# 生成不同尺寸缩略图
with Image.open(avatar_path) as img:
sizes = [(100, 100), (200, 200), (400, 400)]
thumb_paths = []
for size in sizes:
thumb = img.resize(size)
thumb_path = f"avatars/thumb_{size}_{user_id}.jpg"
thumb.save(thumb_path)
thumb_paths.append(thumb_path)
# 更新数据库
with get_db_session() as db:
user_avatar = UserAvatar(user_id=user_id, thumb_paths=thumb_paths)
db.add(user_avatar)
# 更新Redis状态
with get_redis_conn() as redis:
redis.set(f"task_status:{self.request.id}", "completed")
redis.set(f"user_avatar:{user_id}", ",".join(thumb_paths))
return {"status": "success", "user_id": user_id}
except Exception as exc:
# 异常重试
with get_redis_conn() as redis:
redis.set(f"task_status:{self.request.id}", f"failed: {str(exc)}")
raise self.retry(exc=exc, countdown=2**self.request.retries)
4. FastAPI接口集成
在api/endpoints.py中编写接口,触发异步任务:
from fastapi import APIRouter, UploadFile, File
from app.tasks.user_tasks import process_avatar
import shutil
router = APIRouter()
@router.post("/upload-avatar")
async def upload_avatar(user_id: int, file: UploadFile = File(...)):
"""上传用户头像并触发异步处理任务"""
# 保存上传文件到临时目录
temp_path = f"temp/{user_id}_{file.filename}"
with open(temp_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# 触发异步任务
task = process_avatar.delay(user_id, temp_path)
return {"task_id": task.id, "message": "头像处理任务已启动"}
四、总结
通过资源生命周期分离和合理的代码组织,我们可以在FastAPI + Celery的架构中安全、高效地调用Redis和数据库。这种方案不仅解决了连接泄露等问题,还提高了代码的可维护性和可扩展性。在实际项目中,还可以结合Celery的任务状态跟踪、重试机制等特性,进一步提升系统的稳定性和用户体验。