Celery 基础使用指南
介绍
Celery 是一个分布式的任务队列。所谓任务队列就是给不同线程或进程分发任务的一种工具。分布式是指可以向不同的机器分发任务,充分利用多台机器的计算资源。Celery 由 Python 编写,但也有其他一些语言的实现。

项目主页:https://github.com/celery/celery
官方文档:https://docs.celeryq.dev
Celery 能做什么
使用 Celery 可以方便地实现异步任务和定时任务,利用其 Canvas 功能可以设计复杂的任务工作流。
比如你需要执行一段耗时计算(任务 A),在计算结束后需要将结果存入数据库(任务 B),再根据结果生成一份报告(任务 C),将报告通过电子邮件发送(任务 D)。你还需要每周对本周的执行结果进行统计(定时任务 E)。
异步任务的顺序如下,任务 A 执行完后 B 和 C 并行执行,C 执行完后执行 D。
1  |  | 
基础概念
- Task:任务,定义为一个 Python 函数,用于执行具体的操作
 - App:Celery 实例,控制任务的分发,管理 Worker
 - Worker:执行任务的进程
 - Broker:消息队列,用于存储待执行任务的信息
 - Backend:存储任务执行结果,如果不需要结果则不是必需的
 
Celery 工作的基本流程为:
- 编写任务函数,注册到 Celery 实例中
 - 运行 Celery 实例,启动 Worker 进程
 - 调用任务函数,将任务发送到消息队列
 - Worker 从消息队列中取出任务并执行
 - 执行完成后将结果存储到 Backend
 
基础用法
环境准备
首先需要选择 Celery 所使用的 Broker 和 Backend。
Celery 默认使用 RabbitMQ 作为 Broker,也可使用 Redis、Amazon SQS 等。Backend 可以使用 RabbitMQ、Redis、数据库等。
一个常见的组合是 RabbitMQ 作为 Broker,Redis 作为 Backend。简单起见,这里使用 Redis 作为 Broker 和 Backend。
接下来安装 Celery 和 Redis 库:
1  |  | 
配置实例和注册任务
新建文件 tasks.py,用于配置 Celery 实例和定义任务函数。对于小型项目,可以将配置 Celery 实例和定义任务函数合并到一个文件中,但当项目较大时建议分开以便于管理。
1  |  | 
在 tasks.py 中先创建了一个 Celery 实例 app,并对其进行了配置。Celery 的第一个参数是实例的名称,broker 和 backend 分别指定了使用的消息队列和结果存储的 url。
接着定义了一个任务函数 add,并使用 app.task 装饰器将其注册到 Celery 实例中。
启动 Worker
运行以下命令启动 Worker:
1  |  | 
-A 参数指定了 Celery 实例所在的模块,因为我们的实例定义在 tasks.py 中,所以这里指定为 tasks。
如果项目结构比较复杂可能需要指定为多级模块。比如 Celery 实例定义在 myproject/celery/app.py 中,那么在 myproject 目录下的启动命令为 celery -A celery.app worker -l INFO。
worker 指示启动 Worker 进程。
-l 参数指定了日志级别,这里设置为 INFO。
注意:Windows 下运行需要安装 gevent 库,并使用 -P gevent 参数指定使用 gevent
1  |  | 
运行成功后会看到类似下面的输出:
1  |  | 
[config] 部分显示了 Celery 实例的配置信息,app 是 Celery 实例的名称,transport 是 broker 的 url,results 是 backend 的 url,concurrency 是 Worker 进程的数量,默认为 CPU 核心数。
[queues] 部分显示了所使用的消息队列的信息。
[tasks] 部分显示了注册到 Celery 的任务函数。
调用任务
新建文件 test.py,用于调用任务函数。
1  |  | 
使用 delay() 调用任务函数,参数为任务函数的参数。delay() 会立即返回一个 AsyncResult 对象,可以通过 ready() 方法判断任务是否执行完成,通过 get() 方法获取任务的执行结果。get() 方法可以设置超时时间 timeout,如果任务在超时时间内未执行完成则会抛出异常。
运行 test.py,任务执行成功会输出 3。在 Celery 的日志中也会有类似下面的输出: 
1  |  | 
常见问题
提交了任务却没有执行
现象:提交了任务,但没有 Worker 执行任务,Celery 日志中也无相关输出
可能是配置了 Backend 但无法连接,比如没有启动 Redis 服务或 url 配置错误
任务执行报 PermissionError 或 OSError 错误
报错信息类似: * Pool process <billiard.pool.Worker object at 0x000001B328636B50> error: PermissionError(13, '拒绝访问。', None, 5, None) * OSError: [WinError 6] 句柄无效。 * PermissionError: [WinError 5] 拒绝访问。
Windows 不支持 fork,所以无法使用 Celery 默认的 prefork,需要使用 gevent 等替代方案。
解决方法:安装 gevent 库,并在启动 Celery 时指定使用对应的库。 
1  |  | 
任务执行报 ImportError 错误
现象:任务执行时报错,提示找不到模块,但明明有这个模块
一般在项目结构比较复杂时会出现,原因是启动 celery 时的目录会作为任务函数 import 的根目录,如果使用了相对导入就可能找不到模块。最好将 import 都写成绝对导入,并在项目根目录下启动 Celery。