Celery 基础使用指南(更新中)

介绍

Celery 是一个分布式的任务队列。所谓任务队列就是给不同线程或进程分发任务的一种工具。分布式是指可以向不同的机器分发任务,充分利用多台机器的计算资源。Celery 由 Python 编写,但也有其他一些语言的实现。

celery logo
celery logo

项目主页:https://github.com/celery/celery

官方文档:https://docs.celeryq.dev

Celery 能做什么

使用 Celery 可以方便地实现异步任务和定时任务,利用其 Canvas 功能可以设计复杂的任务工作流。

比如你需要执行一段耗时计算(任务 A),在计算结束后需要将结果存入数据库(任务 B),再根据结果生成一份报告(任务 C),将报告通过电子邮件发送(任务 D)。你还需要每周对本周的执行结果进行统计(定时任务 E)。

异步任务的顺序如下,任务 A 执行完后 B 和 C 并行执行,C 执行完后执行 D。

1
2
3
任务A
├── 任务B
└── 任务C ── 任务D
有了Celery,你只需编写任务函数和定义工作流,启动工作流后Celery会自动帮你完成其中任务的分发和执行。对于定时任务,只需定义任务函数和调度规则,Celery就会定期执行任务。

基础概念

  • Task:任务,定义为一个 Python 函数,用于执行具体的操作
  • App:Celery 实例,控制任务的分发,管理 Worker
  • Worker:执行任务的进程
  • Broker:消息队列,用于存储待执行任务的信息
  • Backend:存储任务执行结果,如果不需要结果则不是必需的

Celery 工作的基本流程为:

  1. 编写任务函数,注册到 Celery 实例中
  2. 运行 Celery 实例,启动 Worker 进程
  3. 调用任务函数,将任务发送到消息队列
  4. Worker 从消息队列中取出任务并执行
  5. 执行完成后将结果存储到 Backend

基础用法

环境准备

首先需要选择 Celery 所使用的 Broker 和 Backend。

Celery 默认使用 RabbitMQ 作为 Broker,也可使用 Redis、Amazon SQS 等。Backend 可以使用 RabbitMQ、Redis、数据库等。

一个常见的组合是 RabbitMQ 作为 Broker,Redis 作为 Backend。简单起见,这里使用 Redis 作为 Broker 和 Backend。

接下来安装 Celery 和 Redis 库:

1
2
pip install celery
pip install redis

配置实例和注册任务

新建文件 tasks.py,用于配置 Celery 实例和定义任务函数。对于小型项目,可以将配置 Celery 实例和定义任务函数合并到一个文件中,但当项目较大时建议分开以便于管理。

1
2
3
4
5
6
7
8
# tasks.py
from celery import Celery

app = Celery('my_celery', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task
def add(x, y):
return x + y

tasks.py 中先创建了一个 Celery 实例 app,并对其进行了配置。Celery 的第一个参数是实例的名称,brokerbackend 分别指定了使用的消息队列和结果存储的 url。

接着定义了一个任务函数 add,并使用 app.task 装饰器将其注册到 Celery 实例中。

启动 Worker

运行以下命令启动 Worker:

1
celery -A tasks worker -l INFO

-A 参数指定了 Celery 实例所在的模块,因为我们的实例定义在 tasks.py 中,所以这里指定为 tasks

如果项目结构比较复杂可能需要指定为多级模块。比如 Celery 实例定义在 myproject/celery/app.py 中,那么在 myproject 目录下的启动命令为 celery -A celery.app worker -l INFO

worker 指示启动 Worker 进程。

-l 参数指定了日志级别,这里设置为 INFO。

注意:Windows 下运行需要安装 gevent 库,并使用 -P gevent 参数指定使用 gevent

1
2
pip install gevent
celery -A tasks worker -l INFO -P gevent

运行成功后会看到类似下面的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 -------------- celery@LAPTOP-ABCDEFGH v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Windows-10-10.0.22631-SP0 2024-12-14 17:54:56
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: my_celery:0x1de24da00d0
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost:6379/1
- *** --- * --- .> concurrency: 16 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. tasks.add

[2024-12-14 17:54:57,010: INFO/MainProcess] Connected to redis://localhost:6379/0
[2024-12-14 17:54:57,013: INFO/MainProcess] mingle: searching for neighbors
[2024-12-14 17:54:58,036: INFO/MainProcess] mingle: all alone
[2024-12-14 17:54:58,045: INFO/MainProcess] celery@LAPTOP-ABCDEFGH ready.

[config] 部分显示了 Celery 实例的配置信息,app 是 Celery 实例的名称,transport 是 broker 的 url,results 是 backend 的 url,concurrency 是 Worker 进程的数量,默认为 CPU 核心数。

[queues] 部分显示了所使用的消息队列的信息。

[tasks] 部分显示了注册到 Celery 的任务函数。

调用任务

新建文件 test.py,用于调用任务函数。

1
2
3
4
5
6
7
8
9
10
# test.py
from tasks import add
import time

result = add.delay(1, 2)

time.sleep(1)
if result.ready():
ans = result.get(timeout=1)
print(ans)

使用 delay() 调用任务函数,参数为任务函数的参数。delay() 会立即返回一个 AsyncResult 对象,可以通过 ready() 方法判断任务是否执行完成,通过 get() 方法获取任务的执行结果。get() 方法可以设置超时时间 timeout,如果任务在超时时间内未执行完成则会抛出异常。

运行 test.py,任务执行成功会输出 3。在 Celery 的日志中也会有类似下面的输出:

1
2
[2024-12-14 17:55:02,616: INFO/MainProcess] Task tasks.add[c9f34d96-6490-4c98-b409-550cedc87372] received
[2024-12-14 17:55:58,624: INFO/MainProcess] Task tasks.add[c9f34d96-6490-4c98-b409-550cedc87372] succeeded in 0.014999999999417923s: 3

常见问题

提交了任务却没有执行

现象:提交了任务,但没有 Worker 执行任务,Celery 日志中也无相关输出

可能是配置了 Backend 但无法连接,比如没有启动 Redis 服务或 url 配置错误

任务执行报 PermissionErrorOSError 错误

报错信息类似: * Pool process <billiard.pool.Worker object at 0x000001B328636B50> error: PermissionError(13, '拒绝访问。', None, 5, None) * OSError: [WinError 6] 句柄无效。 * PermissionError: [WinError 5] 拒绝访问。

Windows 不支持 fork,所以无法使用 Celery 默认的 prefork,需要使用 gevent 等替代方案。

解决方法:安装 gevent 库,并在启动 Celery 时指定使用对应的库。

1
2
pip install gevent
celery -A your_app worker -P gevent

任务执行报 ImportError 错误

现象:任务执行时报错,提示找不到模块,但明明有这个模块

一般在项目结构比较复杂时会出现,原因是启动 celery 时的目录会作为任务函数 import 的根目录,如果使用了相对导入就可能找不到模块。最好将 import 都写成绝对导入,并在项目根目录下启动 Celery。


Celery 基础使用指南(更新中)
http://blog.qzink.me/posts/Celery-基础使用指南/
作者
Qzink
发布于
2024年12月14日
许可协议