Celery分布式任务队列 - AIUAI 中关于Celery Worker 启动过程的内部分析,如:

当Celery启动一个Worker时,这个Worker会与Broker建立链接(tcp长链接),然后如果有数据传输,则会创建相应的channel, 这个连接可以有多个channel。然后,Worker就会去borker的队列里面取相应的task来进行消费了,这也是典型的消费者生产者模式。

其中,这个worker主要是有四部分组成的,task_pool, consumer, scheduler, mediator。task_pool 主要是用来存放的是一些worker, 当启动了一个worker, 并且提供并发参数的时候,会将一些worker放在这里面。

celery 默认的并发方式是prefork,也就是多进程的方式,这里只是celery对multiprocessing.Pool 进行了轻量的改造,然后给了一个新的名字叫做prefork,这个pool与多进程的进程池的区别就是这个task_pool只是存放一些运行的 worker.consumer 也就是消费者,主要是从broker那里接受一些message,然后将message转化为celery.worker.request.Request的一个实例。并且在适当的时候,会把这个请求包装进Task中,Task就是用装饰器 app_celery.task() 装饰的函数所生成的类,所以可以在自定义的任务函数中使用这个请求参数,获取一些关键的信息.

1. Celery Workers 类型设置

选择合适的 workers 类型对于执行时间和效率有着重要的影响.

celery/concurrency/

Celery Concurrency 支持:

  • Solo
  • Prefork
  • Eventlet
  • Gevent
ALIASES = {
    'prefork': 'celery.concurrency.prefork:TaskPool',
    'eventlet': 'celery.concurrency.eventlet:TaskPool',
    'gevent': 'celery.concurrency.gevent:TaskPool',
    'solo': 'celery.concurrency.solo:TaskPool',
    'processes': 'celery.concurrency.prefork:TaskPool',  # XXX compat alias
}

1.1. Solo

Solo 池是一个內联池(inline pool),意味着,任务不会同时处理,其只是创建一个线程(thread) 并使用该线程执行任务.

使用方式如:

celery -A tasks worker --pool=solo --loglevel=info

适用场景如:

Solo适用需要逐一执行(one by one)的任务. 不过,实际中不使用并发而仅使用 solo pool 的场景不多.

1.2. Prefork

Prefork 池是 Celery 对 Python 标准库 multiprocess 的改造,其能够同时处理多个任务.

使用方式如:

celery -A tasks worker --pool=prefork --concurrency=4 --loglevel=info

适用场景如:

CPU密集型(CPU-bound),即,任务的大部分时间主要是 CPU 计算;只有 CPU 越快时才会速度更快.

CPU密集型任务如:文件转换、压缩、搜索算法等.

1.3. Eventlet & Gevent

Eventlet & Gevent 池使用协程(coroutlines,也被称为绿色线程,green threads) 来执行任务,不是产生传统线程. 能够同时处理多个任务.

Eventlet Pool 使用方式如:

celery -A tasks worker --pool=eventlet --concurrency=500 --loglevel=info

Gevent Pool 使用方式如:

celery -A tasks worker --pool=gevent --concurrency=500 --loglevel=info

适用场景如:

I/O 密集型任务,即,任务的主要瓶颈是 I/O 操作的等待时间. 与 Prefork 不同的是,其可以设置并发高的数量,而不受限于 CPUs 的数量.

I/O密集型任务如,邮件发送、API请求等.

注:

由于 eventlet 和 gevent 不是 Python 标准库,因此需要单独安装:

pip install celery[eventlet]
pip install celery[gevent]

Last modification:June 15th, 2022 at 04:31 pm