原文:5 tips for writing production-ready Celery tasks - 2021.09.15
作者:Jerry Pussinen
Wolt.com ,生产环境采用 Celery 多年. 应用场景覆盖了定时任务(fixed 上schedule,cron),到即时任务(fire-and-forget).
这里,作者分享了五个生产环境 Celery 任务开发的建议.
1. 任务短比长更好(Short > long)
根据经验,短任务是比长任务更好的.
任务越长,则其占用的 worker 进程越久,进而可能导致更为重要的任务在队列中等待.
Celery primitives 提供了将任务划分为不同的小任务的工具箱.
例如,这样的应用场景,数据从多个源采集,并在最终生成一些报告.
- 一种直接的方法是,序列地从每个源采集数据,并最终再创建报告. 每个都是单个任务的形式.
- 一种更好的方法是,采用 Celery chord,其可以并行的运行数据采集,并在所有数据采集任务完成后,生成报告. work 的所有分片都可以作为单独的任务实现. 即,由于数据采集的并行性,更短的任务和很可能显著缩短整个操作的执行时间.
在需要的时候,可以考虑配置 global task execution time limit,并使用特定的 hard/soft limits.
例如,如果 global time limit 设置为 3 分钟,但某个任务执行从未超过 10s,则更好的实践是,将任务 limit 设置为 10s. 其不但保护了系统避免出现“不可能出现但仍出现”的困境,还可以很好的纪律了其他开发人员对该特定任务的时间要求期望.
如果任务超时,需要进行清理,配置 soft time limit 是由帮助的.
Celery Primitives
[1] - group
- 待并行执行的任务列表
[2] - chain
- 串联一个接一个的调用
[3] - chord
- chord 类似于与 group,但包含了 callback. chord 由一个 header group 和一个 body 组成,其中 body 是一个任务,其当 header 里所有的任务都执行完以后才会执行.
[4] - map
- 类似于 python 的 map
函数,但是,其会创建临时任务,其中,参数列表被用于任务. 例如,task.map([1,2])
,会调用单个任务,并将参数给定任务,其结果为:
res = [task(1), task(2)]
[5] - startmap
- 类似于 map
,但参数是多个 *args
,例如,add.startmap([(2, 2), (4, 4)])
调用单个任务,其结果为:
res = a[add(2, 2), add(4, 4)
[6] - chunks
- 将参数的长列表划分为小段,例如:
items = zip(range(1000), range(1000))
add.chunks(items, 10)
会将列表元素划分为 10 个小段,每个 100 个任务(每次处理序列中的10个元素).
Celery Primitives 示例
[1] - chain
#Simple chain
#第一个任务的结果,传递到chain的下一个任务
# 2 + 2 + 4 + 8
res = chain(add.s(2, 2), add.s(4), add.s(8))()
res.get() #16
#等价形式:
res = (add.s(2, 2) | add.s(4) | add.s(8))().get()
[2] - group
#Simple group
#创建并行执行任务
from celery import group
res = group(add.s(i, i) for i in range(10))()
res.get(timeout=1)
#[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
[3] - chord
chord 可以添加一个 callback 来调用,当 group 中所有的任务执行完后.
from celery import chord
res = chord((add.s(i, i) for i in range(10)), xsum.s())()
#创建 10 个并行任务,且当他们都完成后,返回值被组合进列表,送入 xsum 任务.
res.get() #90
Celery Group
示例如,
from celery import group
from tasks import add
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
add.s(16, 16),
add.s(32, 32),])
result = job.apply_async()
result.ready() # have all subtasks completed?
#True
result.successful() # were all subtasks successful?
#True
result.get()
#[4, 8, 16, 32, 64]
task_soft_time_limit
如,
from celery.exceptions import SoftTimeLimitExceeded
@app.task
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()
2. 任务的幂等性和原子性(Idempotent & atomic)
当执行 Celery 任务时,要保持幂等性(idempotence).简单来说,任务对于相同参数,不管执行多少次,结果都不会受到影响.
幂等性定义:
一次和多次请求某一个资源对于资源本身应该具有同样的结果任意多次执行对资源本身所产生的影响均与一次执行的影响相同
幂等性任务的主要动机是,任务可以被安全配置到 ack late. 如果 worker 进程在任务执行期间崩溃时,结果确认(例如,任务执行结束之后) 能够保证任务被至少完整执行一次.
原子性(Atomicity))是 Celery 任务的另一重要特征. 例如,如果数据库操作是非原子性的,在自己调试某些条件或数据集成时,发现问题是只是时间问题. 使用事务进行数据库操作,和/或在需要的时候通过其他方式保证原子性.
所谓的原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会受到任何因素的干扰而中断,要么所有的操作都不执行,多个操作是一个不可以分割的整体。
3. 期望语义和重试机制(Desired semantics & retry behaviour)
当实现新的 Celery 任务,或者修改已有的 Celery 任务时, 考虑下这种场景:
如果一个任务调用了某些第三方 API, 每一次请求都可能会出现 timeout,或者一整天都在失败. 此时该怎么处理呢,
对于大部分应用场景,“at-least-one” semantic 是首选,特别是如果任务是幂等的. 然而,也有一些应用场景中,期望的语义是 "at-most-once" ,甚至是 “now or never” 的.
比较熟悉的方案是,Celery 集成的 built-in retry capabilities. 例如,考虑到第三方API调用失败,可以采用一种有效的重试策略,类似于:
@celery.task(
autoretry_for=(MyThirdPartyApiCallException,),
retry_backoff=5,
max_retries=7,
retry_jitter=False,
)
def my_task_which_calls_some_third_party_api():
...
其中,会在 5,10,20,40,80,160,320 秒后重试(最多重试 7 次),异常为 MyThirdPartyApiCallException.
实际场景中,优先采用 retry_jitter=True
,以随机化重试延迟.
除了代码逻辑中可能出现问题之外,还有部署, 了解部署过程中发生的情况非常重要.
理想情况下,old workers 应该接受到 TERM 信号,并有足够的时间完成他们正在进行的事情,然后再停止(如,使用 KILL 信号). 多少时间才够呢?理想情况下,worker 的执行时间至少和最久的任务一样长,最久的任务应该至少执行一次,但不会确认(ACK);否则在部署期间会有丢失任务的风险.
对于 "now or never" 场景,在发送任务时,定义 expiration
会有帮助. 实际例子如,发送一个推送订单状态消息到目标用户的 Celery 任务,太迟发送“订单将在 2 分钟送达” 一个小时是没有意义的.
关于语义和重试机制的最后一个建议是,阅读 Celery 文档 Should I use retry or acks_late? 章节. 如果使用 acks_late=True
,则很可能会结合 reject_on_worker_lost=True
一起使用. 此外,注意 visibility_timeout
(Redis 和 SQS 所支持的)会在 acks_late=True
时产生影响.
4. 注意签名更改(signature changes)
如果项目中使用了带有 ETAs (estimated time of arrival)or countdowns 的 Celery 任务,则很大可能会遇到签名更改相关的问题.
简而言之,实际上会发生在:
- [1] - 有一个变更集(changeset),其可以更改 Celery 任务的签名(如,添加参数、更改参数名、将 Celery 任务从一个模块移动到另一个模块,等等).
- [2] - 虽然旧版代码仍在运行,但有许多使用 ETA/倒计时(countdown)安排的任务.
- [3] - 变更集(changeset)被部署:旧的 workers 被 killed,新的 workers 被启动.
- [4] - 新的 worker 从队列中拉取或者开始执行旧的 ETA/countdown 任务.
- [5] - Boom
Celery worker 日志,类似于:
[ERROR/MainProcess] Received unregistered task of type old_module.my_task
或:
[ERROR/ForkPoolWorker-1] Task my_task[<task_id>] raised unexpected: TypeError("my_task() missing 1 required positional argument: 'some_new_argument'")
取决于签名改变的类型.
需要注意,同样的问题也可能发生在没有 ETA或 countdown 发送的任务上,因为它需要一些认真的努力,来进行部署设置,一遍同时将更改部署到 Celery workers 和 Celery client 进程(或者,以其他方式确保旧与新保持一致). 例如,如果 client 进程刚刚部署,它们可以发送旧 worker 进程所不理解的任务. 类似的现象当然也可以反过来发生.
如果任务是从一个模块移动到另一个模块,可以添加一个 "proxy task" 到旧位置,以确保平滑过渡. 另一种方式是,显式地对任务进行命名(names for tasks ),并通过 send_task
进行触发.
如果是对某个任务引入新的参数,可以提供一个默认值.
4.1. ETA and Countdown
#指定秒
result = add.apply_async((2, 2), countdown=3)
result.get() # this takes at least 3 seconds to return
#指定时间
from datetime import datetime, timedelta
tomorrow = datetime.utcnow() + timedelta(days=1)
add.apply_async((2, 2), eta=tomorrow)
5. 使用多个队列(Use multiple queues)
默认情况下,只有一个叫 celery
的队列. 如果在某些场景需要用到将 work 划分到多个队列的情况.
例如,如果有很多任务需要实时执行,而有些任务需要延迟几分钟(或,小时),则他们需要被划分为不同的队列.
多队列设置的另一个好处是,其能够使得分别对每个队列进行 worker 缩放. 注意,还有可能定义在同一队列中运行的任务的优先级. 但是,Celery 文档建议在实际场景中优先选择多队列来设定任务优先级.
可以显式地指定队列:
# celeryconfig.py
from kombu import Exchange, Queue
_ALLOWED_QUEUES = ('hiprio', 'lowprio')
task_create_missing_queues = False # default is True
task_queues = tuple(Queue(name=q, exchange=Exchange(q), routing_key=q) for q in _ALLOWED_QUEUES)
例如,这种配置,对于任务被发送到拼错的队列时,会报 celery.exceptions.QueueNotFound
.
而,当没有这种配置时,Celery client 侧的逻辑会将任务发送到没有 Celery worker 消费的队列里,这在部署环境中是很难被发现的.
总结
Celery 是功能强大的库,其需要很大努力来理解如何在业务场景中有效的使用.
一切都要从业务场景理解开始:如何划分 work,什么是期望语义等等.
最后一个时间建议,确保有一个合适的本地 Celery 环境,以便于尝试某些特性. 例如,为了理解不同的重试策略或者原子(primitives)实际上是如何运行的,可以将他们在放到生产环境前,在简单环境中进行测试.