Skip to main content

Celery

Celery 是 Python 生态中广泛使用的分布式任务队列:把耗时或需要异步执行的操作(发邮件、生成报表、调用大模型、批处理)封装成「任务」,交给独立进程(worker)在后台执行,Web 或 API 进程只负责接收请求并投递任务,从而避免阻塞、提高吞吐。支持定时/周期任务(Celery Beat)、任务链与组结果存储与重试,可与 Redis、RabbitMQ 等作为 broker 和 result backend 配合使用。

其设计围绕「broker + worker + app」:broker 是消息中间件(Redis、RabbitMQ 等),用于投递任务消息;backend 可选,用于存储任务结果;app(Celery 应用)定义 task、配置与路由;worker 进程消费 broker 中的任务并执行;beat 进程按计划向 broker 投递定时任务。典型场景包括:Web 请求后异步发邮件、定时拉取数据、多机分布式计算、长耗时 AI 推理等。

Celery 官方文档

安装与 Broker

pip install celery

使用 Redis 作为 broker 时需安装 Redis 并确保本机可连接;使用 RabbitMQ 则需安装并启动 RabbitMQ 服务。

# 最小配置:仅指定 broker(Redis 示例)
from celery import Celery

app = Celery("myapp", broker="redis://localhost:6379/0")
tip

Broker 与消息队列:Broker 是「任务消息」的暂存处。应用调用 task.delay() 时,Celery 将任务序列化后发送到 broker;worker 从 broker 取消息、反序列化并执行。Redis 部署简单、适合中小规模;RabbitMQ 功能更全、适合高可靠与复杂路由。

创建 App 与 Task

@app.task 装饰函数即可定义任务;任务函数会在 worker 进程中执行。

from celery import Celery

app = Celery("myapp", broker="redis://localhost:6379/0")

@app.task
def add(x, y):
return x + y

@app.task(bind=True)
def fail_retry(self, x):
if x < 0:
raise self.retry(countdown=5)
return x

bind=True 时,第一个参数为 self(任务请求对象),可调用 self.retry() 等。任务可放在单独模块,由 app 通过 include 或导入方式加载。

启动 Worker

在项目根或能导入 app 的目录下执行:

celery -A myapp worker --loglevel=info

-A 指定「应用所在模块」(如 myapp 表示当前目录下的 myapp.py 或包)。Worker 会持续从 broker 取任务并执行;多台机器可各自启动 worker,共同消费同一 broker,实现分布式。

tip

分布式与多 Worker:多个 worker 进程/多台机器共享同一 broker 时,任务会被任意一个空闲 worker 取走执行;通过增加 worker 数量可水平扩展处理能力。

调用任务与结果

在 Web 或脚本中调用任务:delay(*args, **kwargs) 为异步投递,返回 AsyncResultapply_async() 可指定队列、延迟、过期时间等。

from myapp import add

# 异步调用,立即返回
r = add.delay(4, 5)
print(r.id) # 任务 ID
print(r.get()) # 阻塞直到拿到结果,默认使用 backend 存储的结果

若未配置 result_backendr.get() 无法取回结果;配置后 Celery 会将返回值写入 backend(如 Redis),get() 从 backend 读取。

app = Celery("myapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task
def add(x, y):
return x + y

# 其他进程或脚本中
from myapp import add
r = add.delay(2, 3)
print(r.get(timeout=10)) # 5

链与组

chain:前一个任务的返回值作为下一个任务的第一个参数,顺序执行。

from celery import chain
from myapp import add

# add(add(add(1, 2), 3), 4) 即 1+2+3+4
c = chain(add.s(1, 2), add.s(3), add.s(4))
result = c.apply_async()
print(result.get()) # 10

group:多个任务并行投递,等待全部完成。

from celery import group
from myapp import add

g = group(add.s(i, i) for i in range(5))
r = g.apply_async()
print(r.get()) # [0, 2, 4, 6, 8]

.s() 为签名(不立即执行),用于组合成 chain/group。

定时任务(Beat)

Celery Beat 按计划向 broker 投递任务,需单独启动一个 beat 进程,并配置 beat_schedule

from celery import Celery
from celery.schedules import crontab

app = Celery("myapp", broker="redis://localhost:6379/0")

@app.task
def daily_report():
print("run daily report")

app.conf.beat_schedule = {
"daily-report": {
"task": "myapp.daily_report",
"schedule": crontab(hour=8, minute=0),
},
}

启动 beat(与 worker 分开运行):

celery -A myapp beat --loglevel=info

同一项目中 worker 负责执行任务,beat 只负责按时投递。

与 FastAPI / Django 集成

在 FastAPI 中通常将 app 放在可导入模块(如 celery_app.py),在路由中调用 task.delay()task.apply_async(),不在响应中直接 get()(会阻塞);若需将结果返回给前端,可先返回任务 ID,由前端轮询或通过 WebSocket 查询结果。

# celery_app.py
from celery import Celery
celery_app = Celery("myapp", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@celery_app.task
def long_task(n):
return sum(range(n))
# main.py (FastAPI)
from fastapi import FastAPI
from celery_app import long_task

app = FastAPI()

@app.post("/run")
def run_task(n: int):
r = long_task.delay(n)
return {"task_id": r.id}

@app.get("/result/{task_id}")
def get_result(task_id: str):
from celery_app import celery_app
res = celery_app.AsyncResult(task_id)
if res.ready():
return {"status": "ready", "result": res.get()}
return {"status": "pending"}

Django 可通过 django-celery-results 等将结果存数据库,用法类似:在视图里 task.delay(),结果由 worker 写入,前端或定时轮询用 AsyncResult(id).get() 获取。