简介
Celery 是一个简单、灵活、可靠的分布式系统,可处理大量消息,同时提供维护此类系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
官方文档:
https://docs.celeryq.dev/en/stable/index.html
参考文档:
https://blog.csdn.net/youzhouliu/article/details/124239709
https://www.bbsmax.com/A/gVdnqYY85W/
基本结构
Celery 主要包含以下几个模块:
- 任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。 - 消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。 - 任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。 - 任务结果存储 Backend
Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。
简单运行
本文使用环境:
- CentOS Linux release 7.7.1908 (Core)
- Python 3.7.2
- celery 5.2.7
安装celery
# 本文使用redis作为broken,使用以下命令安装
pip3 install -U "celery[redis]"
# 查看版本
celery --version
# 5.2.7 (dawn-chorus)
安装redis:
docker run -d -p 127.0.0.1:6379:6379 redis
worker
test.py
import time
from celery import Celery
# 消息中间件用redis
broker = 'redis://127.0.0.1:6379'
# 结果存储使用redis
backend = 'redis://127.0.0.1:6379/0'
# 创建了一个Celery实例app,名称为 my_task
app = Celery('my_task', broker=broker, backend=backend)
@app.task
def add(x, y, debug=False):
if debug:
print('DEBUG MODE')
time.sleep(5) # 模拟耗时操作
return x + y # 返回任务结果
运行代码:
celery -A test worker --loglevel=INFO -c 8
# -A, --app APPLICATION 这里-A其实是文件名,对应test,而不是my_task
# -c 子进程数,默认等于cpu个数
保持worker前台运行(如上图),接下来通过client脚本发起任务。
client
同步执行与异步执行
from test import add
import time
# 直接在本地运行任务(阻塞)
print('开始任务(阻塞)')
result = add(4,4)
print(f'任务完成,结果为:{result}')
# 发送到任务队列让worker执行(非阻塞)
print('开始任务(非阻塞)')
result = add.delay(4,4)
# 等待任务完成
while not result.ready():
print('任务进行中,等待完成...')
time.sleep(1)
# 任务完成,获取结果
print(f'任务完成,结果为:{result.get()}')
工作流程
celery支持几种不同的工作流程:
Signatures 签名
签名(Signatures)预先设定好任务的属性参数,方便后续不同工作流程的编排。(类似偏函数),有几种不同的写法:
signature('test.add', args=(2, 2), kwargs={'debug': True}, countdown=10)
add.signature(args=(2, 2), kwargs={'debug': True}, countdown=10)
add.s(2, 2, debug=True).set(countdown=10)
Signatures只是添加了属性,并没有实际执行,执行方式:
s = add.signature(args=(2, 2), kwargs={'debug': True}, countdown=10)
s() # 同步执行
s.delay() # 异步执行
Chains 链式调用
多个任务顺序执行,任务执行成功,上一个任务的结果可以传递给下一个任务。
from celery import chain
from test import add # 本地文件的add任务
# 任务编排
tasks = chain(add.s(2, 2), add.s(4), add.s(8))
# 异步执行
res = tasks.apply_async()
res.get() # 获取结果
# 16
print(res.parent.result) # 获取上一个任务的结果
# 8
print(res.as_list()) # 打印任务链的task_id
# ['ce101446-a103-49a2-b7c6-7f8f29169ca7', '514cdf6b-4f23-4dcf-803f-3c2636701db1', '63a626fc-209a-452e-8f9c-7dc528897b5a']
Groups 任务组
多个任务同时发送到队列里面消费,如果有多个消费进程,多个任务会同时开始。
from celery import group
from test import add # 本地文件的add任务
# 任务编排
tasks = group(add.s(1, 1), add.s(2, 2), add.s(3, 3))
# 异步执行
res = tasks.apply_async()
res.get() # 获取结果
# [2, 4, 6]
print(res.results) # 结果实例
# [<AsyncResult: 05c2eb90-a0be-40ed-8799-828a27063ee1>, <AsyncResult: bdfa13c1-58ae-4b99-ab15-c3e1c672b0f9>, <AsyncResult: c16ba70f-4baa-427d-8c4d-4519922a23eb>]
Chord 任务组回调
多个任务同时发送到队列里面消费,如果有多个消费进程,多个任务会同时开始,全部任务成功执行回调。
往test.py
中加入xsum
任务,用于结果回调。(修改代码后需要重新运行)
@app.task
def xsum(numbers):
print(numbers)
return sum(numbers)
任务组回调:
from celery import chord
from test import add, xsum # 本地文件的add任务
# 任务编排
tasks = chord((add.s(1, 1), add.s(2, 2), add.s(3, 3)), xsum.s())
# 异步执行
res = tasks.apply_async()
res.get() # 获取结果
# 12
Map & Starmap
相当于把group中的多个任务(任务相同,参数不同),组合成一个任务,发送到队列中执行。
Map:
from test import xsum # 本地文件的xsum任务
# 任务编排
task = xsum.map([list(range(10)), list(range(100))])
# 异步执行
res = task.apply_async()
res.get() # 获取结果
# [45, 4950]
该任务相当于:
@app.task
def temp():
return [xsum(range(10)), xsum(range(100))]
Starmap:
from test import add # 本地文件的add任务
task = add.starmap([(1, 1), (2, 2), (3, 3)])
res = task.apply_async()
res.get() # 获取结果
该任务相当于:
@app.task
def temp():
return [add(1, 1), add(2, 2), add(3, 3)]
Chunks
把Group组合成块执行,有n个块就会发送n个任务。
from test import add # 本地文件的add任务
# 任务编排,每块最多2个任务
tasks = add.chunks([(0, 0), (1, 1), (2, 2), (3, 3)], 2)
# 异步执行
res = tasks.apply_async()
res.get() # 获取结果
# [[0, 2], [4, 6]]
celery work的子进程数量是有限的,过多的任务会造成占用(消费不过来)或者过多的上下文切换,可以在考虑好性能与效率的情况下,尽量把任务合并,控制好同时发起的任务个数。
任务路由
默认的queue/exchange/binding的键是celery,exchange的类型是direct
import time
from celery import Celery
from kombu import Exchange, Queue
# 消息中间件用redis
broker = 'redis://127.0.0.1:6379'
# 结果存储使用redis
backend = 'redis://127.0.0.1:6379/0'
# 创建了一个Celery实例app,名称为 my_task
app = Celery('my_task', broker=broker, backend=backend)
# 定义一个Topic类型的Exchange
zone_exchange = Exchange('zone', type='topic')
app.conf.task_queues = (
# 默认任务队列
Queue('default', Exchange('default'), routing_key='default'),
# 地区队列
Queue('china', exchange=zone_exchange, routing_key='china.#'),
# 华南地区
Queue('china_south', exchange=zone_exchange, routing_key='china.south.#'),
# 华东地区
Queue('china_east', exchange=zone_exchange, routing_key='china.east.#'),
# 特殊任务队列,指定special才能使用
Queue('special', Exchange('special', type='direct'), routing_key='special'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'
@app.task
def add(x, y):
print(f'x:{x}, y:{y}')
t = time.time()
print(t)
return t # 返回任务结果
启动worker,并且选择监听的queue:
# 监听华南Topic任务
celery -A test worker --loglevel=INFO -c 4 -n test1 -Q china_south,china_south,china
# 只监听特殊任务
celery -A test worker --loglevel=INFO -c 4 -n test2 -Q special
# 监听默认队列
celery -A test worker --loglevel=INFO -c 4 -n test3 -Q default
from test import add # 本地add任务
# 发送任务到Topic处理
add.apply_async((2, 2), queue='china') # 执行一次
add.apply_async((2, 2), queue='china_south') # 会被2个worker执行
# 发送任务到特殊队列处理
add.apply_async((2, 2), queue='special')
# 发送任务到默认队列处理
add.apply_async((2, 2))
如果不是必要设计,尽量不要使用Topic Exchange
这种形式,感觉比较难控制worker
去消费。明确定义好queue
,然后生产者指定对应的queue
(worker)来处理任务,比较简单合理。
延展阅读
发送到topic的任务,如果被多个worker消费,只会保存第一个返回结果。如果需要获取所有结果,可以在任务中添加额外的代码(比如写日志或插入数据库中)。
相关问题:https://stackoverflow.com/questions/15765618/have-celery-broadcast-return-results-from-all-workers
各种消息队列支持的Exchange Type:
https://docs.celeryq.dev/projects/kombu/en/master/introduction.html#transport-comparison
Queue和Exchange的关系,很容易让人困惑。stackOverflow上有个问答:https://stackoverflow.com/questions/29786019/why-do-celery-routes-have-both-a-queue-and-a-routing-key
翻了几遍官方文档的任务路由部分,感觉也不是很清晰,暂时先记录到这里,后续有新理解再补充。