Celery 基础使用指南(更新中)
介绍
Celery 是一个分布式的任务队列。所谓任务队列就是给不同线程或进程分发任务的一种工具。分布式是指可以向不同的机器分发任务,充分利用多台机器的计算资源。Celery 由 Python 编写,但也有其他一些语言的实现。
项目主页:https://github.com/celery/celery
官方文档:https://docs.celeryq.dev
Celery 能做什么
使用 Celery 可以方便地实现异步任务和定时任务,利用其 Canvas 功能可以设计复杂的任务工作流。
比如你需要执行一段耗时计算(任务 A),在计算结束后需要将结果存入数据库(任务 B),再根据结果生成一份报告(任务 C),将报告通过电子邮件发送(任务 D)。你还需要每周对本周的执行结果进行统计(定时任务 E)。
异步任务的顺序如下,任务 A 执行完后 B 和 C 并行执行,C 执行完后执行 D。
1 |
|
基础概念
- Task:任务,定义为一个 Python 函数,用于执行具体的操作
- App:Celery 实例,控制任务的分发,管理 Worker
- Worker:执行任务的进程
- Broker:消息队列,用于存储待执行任务的信息
- Backend:存储任务执行结果,如果不需要结果则不是必需的
Celery 工作的基本流程为:
- 编写任务函数,注册到 Celery 实例中
- 运行 Celery 实例,启动 Worker 进程
- 调用任务函数,将任务发送到消息队列
- Worker 从消息队列中取出任务并执行
- 执行完成后将结果存储到 Backend
基础用法
环境准备
首先需要选择 Celery 所使用的 Broker 和 Backend。
Celery 默认使用 RabbitMQ 作为 Broker,也可使用 Redis、Amazon SQS 等。Backend 可以使用 RabbitMQ、Redis、数据库等。
一个常见的组合是 RabbitMQ 作为 Broker,Redis 作为 Backend。简单起见,这里使用 Redis 作为 Broker 和 Backend。
接下来安装 Celery 和 Redis 库:
1 |
|
配置实例和注册任务
新建文件 tasks.py
,用于配置 Celery 实例和定义任务函数。对于小型项目,可以将配置 Celery 实例和定义任务函数合并到一个文件中,但当项目较大时建议分开以便于管理。
1 |
|
在 tasks.py
中先创建了一个 Celery 实例 app
,并对其进行了配置。Celery
的第一个参数是实例的名称,broker
和 backend
分别指定了使用的消息队列和结果存储的 url。
接着定义了一个任务函数 add
,并使用 app.task
装饰器将其注册到 Celery 实例中。
启动 Worker
运行以下命令启动 Worker:
1 |
|
-A
参数指定了 Celery 实例所在的模块,因为我们的实例定义在 tasks.py
中,所以这里指定为 tasks
。
如果项目结构比较复杂可能需要指定为多级模块。比如 Celery 实例定义在 myproject/celery/app.py
中,那么在 myproject
目录下的启动命令为 celery -A celery.app worker -l INFO
。
worker
指示启动 Worker 进程。
-l
参数指定了日志级别,这里设置为 INFO。
注意:Windows 下运行需要安装 gevent
库,并使用 -P gevent
参数指定使用 gevent
1 |
|
运行成功后会看到类似下面的输出:
1 |
|
[config]
部分显示了 Celery 实例的配置信息,app
是 Celery 实例的名称,transport
是 broker 的 url,results
是 backend 的 url,concurrency
是 Worker 进程的数量,默认为 CPU 核心数。
[queues]
部分显示了所使用的消息队列的信息。
[tasks]
部分显示了注册到 Celery 的任务函数。
调用任务
新建文件 test.py
,用于调用任务函数。
1 |
|
使用 delay()
调用任务函数,参数为任务函数的参数。delay()
会立即返回一个 AsyncResult
对象,可以通过 ready()
方法判断任务是否执行完成,通过 get()
方法获取任务的执行结果。get()
方法可以设置超时时间 timeout
,如果任务在超时时间内未执行完成则会抛出异常。
运行 test.py
,任务执行成功会输出 3
。在 Celery 的日志中也会有类似下面的输出:
1 |
|
常见问题
提交了任务却没有执行
现象:提交了任务,但没有 Worker 执行任务,Celery 日志中也无相关输出
可能是配置了 Backend 但无法连接,比如没有启动 Redis 服务或 url 配置错误
任务执行报 PermissionError
或 OSError
错误
报错信息类似: * Pool process <billiard.pool.Worker object at 0x000001B328636B50> error: PermissionError(13, '拒绝访问。', None, 5, None)
* OSError: [WinError 6] 句柄无效。
* PermissionError: [WinError 5] 拒绝访问。
Windows 不支持 fork
,所以无法使用 Celery 默认的 prefork
,需要使用 gevent
等替代方案。
解决方法:安装 gevent
库,并在启动 Celery 时指定使用对应的库。
1 |
|
任务执行报 ImportError
错误
现象:任务执行时报错,提示找不到模块,但明明有这个模块
一般在项目结构比较复杂时会出现,原因是启动 celery 时的目录会作为任务函数 import 的根目录,如果使用了相对导入就可能找不到模块。最好将 import 都写成绝对导入,并在项目根目录下启动 Celery。