FreezeJ' Blog

Celery使用介绍

2022-09-09

简介

Celery 是一个简单、灵活、可靠的分布式系统,可处理大量消息,同时提供维护此类系统所需的工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。

官方文档:
https://docs.celeryq.dev/en/stable/index.html
参考文档:
https://blog.csdn.net/youzhouliu/article/details/124239709
https://www.bbsmax.com/A/gVdnqYY85W/

基本结构

Celery 主要包含以下几个模块:

  • 任务模块 Task
    包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
  • 消息中间件 Broker
    Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
  • 任务执行单元 Worker
    Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
  • 任务结果存储 Backend
    Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

简单运行

本文使用环境:

  • CentOS Linux release 7.7.1908 (Core)
  • Python 3.7.2
  • celery 5.2.7

安装celery

# 本文使用redis作为broken,使用以下命令安装
pip3 install -U "celery[redis]"

# 查看版本
celery --version
# 5.2.7 (dawn-chorus)

安装redis:

docker run -d -p 127.0.0.1:6379:6379 redis

worker

test.py

import time
from celery import Celery

# 消息中间件用redis
broker = 'redis://127.0.0.1:6379'
# 结果存储使用redis
backend = 'redis://127.0.0.1:6379/0'

# 创建了一个Celery实例app,名称为 my_task
app = Celery('my_task', broker=broker, backend=backend)

@app.task
def add(x, y, debug=False):
    if debug:
        print('DEBUG MODE')
    time.sleep(5)     # 模拟耗时操作
    return x + y      # 返回任务结果

运行代码:

celery -A test worker --loglevel=INFO -c 8
#  -A, --app APPLICATION  这里-A其实是文件名,对应test,而不是my_task
# -c 子进程数,默认等于cpu个数

保持worker前台运行(如上图),接下来通过client脚本发起任务。

client

同步执行与异步执行

from test import add
import time

# 直接在本地运行任务(阻塞)
print('开始任务(阻塞)')
result = add(4,4)
print(f'任务完成,结果为:{result}')

# 发送到任务队列让worker执行(非阻塞)
print('开始任务(非阻塞)')
result = add.delay(4,4)

# 等待任务完成
while not result.ready():
    print('任务进行中,等待完成...')
    time.sleep(1)

# 任务完成,获取结果
print(f'任务完成,结果为:{result.get()}')

工作流程

celery支持几种不同的工作流程:

Signatures 签名

签名(Signatures)预先设定好任务的属性参数,方便后续不同工作流程的编排。(类似偏函数),有几种不同的写法:

signature('test.add', args=(2, 2), kwargs={'debug': True}, countdown=10)
add.signature(args=(2, 2), kwargs={'debug': True}, countdown=10)
add.s(2, 2, debug=True).set(countdown=10)

Signatures只是添加了属性,并没有实际执行,执行方式:

s = add.signature(args=(2, 2), kwargs={'debug': True}, countdown=10)
s()  # 同步执行
s.delay()  # 异步执行

Chains 链式调用

多个任务顺序执行,任务执行成功,上一个任务的结果可以传递给下一个任务。

from celery import chain
from test import add  # 本地文件的add任务

# 任务编排
tasks = chain(add.s(2, 2), add.s(4), add.s(8))

# 异步执行
res = tasks.apply_async()
res.get()  # 获取结果
# 16

print(res.parent.result)  # 获取上一个任务的结果
# 8

print(res.as_list())  # 打印任务链的task_id
# ['ce101446-a103-49a2-b7c6-7f8f29169ca7', '514cdf6b-4f23-4dcf-803f-3c2636701db1', '63a626fc-209a-452e-8f9c-7dc528897b5a']

Groups 任务组

多个任务同时发送到队列里面消费,如果有多个消费进程,多个任务会同时开始

from celery import group
from test import add  # 本地文件的add任务

# 任务编排
tasks = group(add.s(1, 1), add.s(2, 2), add.s(3, 3))

# 异步执行
res = tasks.apply_async()
res.get()  # 获取结果
# [2, 4, 6]

print(res.results)  # 结果实例
# [<AsyncResult: 05c2eb90-a0be-40ed-8799-828a27063ee1>, <AsyncResult: bdfa13c1-58ae-4b99-ab15-c3e1c672b0f9>, <AsyncResult: c16ba70f-4baa-427d-8c4d-4519922a23eb>]

Chord 任务组回调

多个任务同时发送到队列里面消费,如果有多个消费进程,多个任务会同时开始,全部任务成功执行回调。

test.py中加入xsum任务,用于结果回调。(修改代码后需要重新运行)

@app.task
def xsum(numbers):
    print(numbers)
    return sum(numbers)

任务组回调:

from celery import chord
from test import add, xsum  # 本地文件的add任务

# 任务编排
tasks = chord((add.s(1, 1), add.s(2, 2), add.s(3, 3)), xsum.s())

# 异步执行
res = tasks.apply_async()
res.get()  # 获取结果
# 12

Map & Starmap

相当于把group中的多个任务(任务相同,参数不同),组合成一个任务,发送到队列中执行。

Map:

from test import xsum  # 本地文件的xsum任务

# 任务编排
task = xsum.map([list(range(10)), list(range(100))])

# 异步执行
res = task.apply_async()
res.get()  # 获取结果
# [45, 4950]

该任务相当于:

@app.task
def temp():
    return [xsum(range(10)), xsum(range(100))]

Starmap:

from test import add  # 本地文件的add任务
task = add.starmap([(1, 1), (2, 2), (3, 3)])
res = task.apply_async()
res.get()  # 获取结果

该任务相当于:

@app.task
def temp():
    return [add(1, 1), add(2, 2), add(3, 3)]

Chunks

把Group组合成块执行,有n个块就会发送n个任务。

from test import add  # 本地文件的add任务

# 任务编排,每块最多2个任务
tasks = add.chunks([(0, 0), (1, 1), (2, 2), (3, 3)], 2)

# 异步执行
res = tasks.apply_async()
res.get()  # 获取结果
# [[0, 2], [4, 6]]

celery work的子进程数量是有限的,过多的任务会造成占用(消费不过来)或者过多的上下文切换,可以在考虑好性能与效率的情况下,尽量把任务合并,控制好同时发起的任务个数。

任务路由

默认的queue/exchange/binding的键是celery,exchange的类型是direct

import time
from celery import Celery
from kombu import Exchange, Queue

# 消息中间件用redis
broker = 'redis://127.0.0.1:6379'
# 结果存储使用redis
backend = 'redis://127.0.0.1:6379/0'

# 创建了一个Celery实例app,名称为 my_task
app = Celery('my_task', broker=broker, backend=backend)

# 定义一个Topic类型的Exchange
zone_exchange = Exchange('zone', type='topic')

app.conf.task_queues = (
    # 默认任务队列
    Queue('default', Exchange('default'), routing_key='default'),

    # 地区队列
    Queue('china',  exchange=zone_exchange, routing_key='china.#'),
    # 华南地区
    Queue('china_south',  exchange=zone_exchange, routing_key='china.south.#'),
    # 华东地区
    Queue('china_east',  exchange=zone_exchange, routing_key='china.east.#'),

    # 特殊任务队列,指定special才能使用
    Queue('special',  Exchange('special', type='direct'),   routing_key='special'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'

@app.task
def add(x, y):
    print(f'x:{x}, y:{y}')
    t = time.time()
    print(t)
    return t      # 返回任务结果

启动worker,并且选择监听的queue:

# 监听华南Topic任务
celery -A test worker --loglevel=INFO -c 4 -n test1 -Q china_south,china_south,china

# 只监听特殊任务
celery -A test worker --loglevel=INFO -c 4 -n test2 -Q special

# 监听默认队列
celery -A test worker --loglevel=INFO -c 4 -n test3 -Q default
from test import add  # 本地add任务
# 发送任务到Topic处理
add.apply_async((2, 2), queue='china')  # 执行一次
add.apply_async((2, 2), queue='china_south')  # 会被2个worker执行

# 发送任务到特殊队列处理
add.apply_async((2, 2), queue='special')

# 发送任务到默认队列处理
add.apply_async((2, 2))

如果不是必要设计,尽量不要使用Topic Exchange这种形式,感觉比较难控制worker去消费。明确定义好queue,然后生产者指定对应的queue(worker)来处理任务,比较简单合理。

延展阅读

发送到topic的任务,如果被多个worker消费,只会保存第一个返回结果。如果需要获取所有结果,可以在任务中添加额外的代码(比如写日志或插入数据库中)。
相关问题:https://stackoverflow.com/questions/15765618/have-celery-broadcast-return-results-from-all-workers

各种消息队列支持的Exchange Type:
https://docs.celeryq.dev/projects/kombu/en/master/introduction.html#transport-comparison

Queue和Exchange的关系,很容易让人困惑。stackOverflow上有个问答:https://stackoverflow.com/questions/29786019/why-do-celery-routes-have-both-a-queue-and-a-routing-key

翻了几遍官方文档的任务路由部分,感觉也不是很清晰,暂时先记录到这里,后续有新理解再补充。