Celery 分布式任务队列框架,主要包括三个组成成分:
[1] - Celery 客户端(Client)
[2] - 消息中间件(Message Broker)
[3] - Celery Worker
他们之间的关系如图:
这里,采用 FastAPI 作为 Celery Client,RabbitMQ 作为 Message Broker.
其中,
[1] - Celery Client 运行 FastAPI app,并传递消息或后台任务(message/background jobs) 到 RabbitMQ;
[2] - RabbitMQ 作为 Message Broker,将会调度 clients 和 workers 之间的消息;
[3] - RabbitMQ 在接收到 client 发送的消息后,通过将消息发送到一个 Celery Worker 以初始化 client 任务;
[4] - 一个 Celery Worker 被看做为后台任务,其可以从任何网络服务请求,实现异步性;
[5] - 同时可以有很多 workers 进行或完成很多任务(每个任务作为一个独立线程thread);
[6] - Celery 确保每个 worker 在同一时间点只执行一个任务,且每个任务只能被分配到一个 worker.
基于以上,实现如下:
Github - azzan-amin-97/FastAPI_Async_Celery
1. 依赖项安装
pip install fastapi
pip install celery
pip install uvicorn #ASGI server to run FastAPI app.
pip install flower #任务队列监控
设置 Message Broker(基于 Docker):
docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3
2. 创建 Celery Worker Task
celery_worker.py
#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
import time
from celery import Celery
from celery.utils.log import get_task_logger
#实例化 Celery
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')
# 创建 logger,以显示日志信息
celery_log = get_task_logger(__name__)
# 创建任务函数,以订单(Order) 为例,异步进行
@celery.task
def create_order(name, quantity):
# 5 seconds per 1 order
complete_time_per_item = 5
# Keep increasing depending on item quantity being ordered
time.sleep(complete_time_per_item * quantity)
# 显示日志
celery_log.info(f"Order Complete!")
return {"message": f"Hi {name}, Your order has completed!",
"order_quantity": quantity}
3. 创建 Model 和 App
model.py
:
#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
from pydantic import BaseModel
# Pydantic BaseModel
# Order class model for request body
class Order(BaseModel):
customer_name: str
order_quantity: int
main.py
#!/usr/bin/python3
#!--*-- coding: utf-8 --*--
from fastapi import FastAPI
from celery_worker import create_order
from model import Order
# Create FastAPI app
app = FastAPI()
# Create order endpoint
@app.post('/order')
def add_order(order: Order):
# use delay() method to call the celery task
create_order.delay(order.customer_name, order.order_quantity)
return {"message": "Order Received! Thank you for your patience."}
4. 运行 FastAPI 和 Celery Worker server
uvicorn main:app --reload
访问:http://localhost:8000/docs 即可查看 FastAPI Swagger Docs.
启动 Celery Worker:
celery -A celery_worker.celery worker --loglevel=info
启动 flower 服务监控 Celery 消息队列:
celery flower -A celery_worker.celery --broker:amqp://localhost//
访问 http://localhost:5555/
5. 测试和分析 Celery
访问 http://localhost:8000/docs,示例如,
点击 Execute,返回结果如: