Celery: Task Queue vs Message Queue
1. What is Celery
Celery is a Python module; its definition from the official website:
Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
The concepts emphasized here are: asynchronous task queue / distributed message passing / real-time and scheduled tasks. Each of these is explained below.
2. Task Queue vs Message Queue
Message queues are a form of asynchronous inter-process communication and are widely used to decouple modules. Common message queues include RabbitMQ, ZeroMQ, and Redis. Celery's definition explicitly calls itself a task queue rather than a message queue, and in its implementation it uses RabbitMQ (the default) or Redis as the broker (it appears to support other MQs and databases as well; see the broker list). This also shows that Celery itself is not a message queue. So where exactly does a task queue differ from a message queue?
The answer here mainly draws on the Quora question: What is the difference between a message queue and a task queue?
A key excerpt:
Let's forget Celery for a moment. Let's talk about RabbitMQ. What would we usually do? Our Django/Flask app would send a message to a queue. We will have some workers running which will be waiting for new messages in certain queues. When a new message arrives, it starts working and processes the tasks.
In other words, a task queue manages your work at a higher level than a message queue: you no longer deal with message delivery itself, but instead focus on defining the tasks to be performed.
This is still a fairly abstract idea, so let's make it concrete: we define two task functions representing work a worker can perform, and then invoke them from a client. For comparison, we implement similar functionality using RabbitMQ directly as a message queue. (Installation and configuration of Celery/RabbitMQ/Redis are omitted here; we assume the environment is already set up.)
Celery version
1. Server-side definition (tasks.py)
from celery import Celery
import time

app = Celery('task', broker='redis://localhost', backend='redis://localhost')
app.conf.CELERY_TASK_SERIALIZER = 'json'

@app.task
def add(x, y):
    # simulate a slow task
    time.sleep(5)
    return x + y

@app.task
def multiply(x, y):
    return x * y
First we define a task queue instance, app, with the following parameters:
- 'task': the application name; in practice it may differ from the server file name (tasks.py here), but keeping them consistent is simpler
- broker: the address of the message queue Celery relies on, here a Redis instance on my local machine
- backend: where Celery stores task state and results; this may be left unset
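As a side note, the broker and backend URLs accept the standard form redis://host:port/db, so the broker and backend can even point at different Redis databases. Several settings can also be applied in one call; a minimal sketch using the same old-style (Celery 3.x) uppercase setting names as above:
# a sketch: applying several old-style (Celery 3.x) settings in one call;
# CELERY_RESULT_SERIALIZER is the counterpart of CELERY_TASK_SERIALIZER for results
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
)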
A quick comparison of running a task with and without a backend configured (using Redis as the backend):
- With a backend configured, the server-side log after running a task:
[2016-03-15 16:33:00,736: INFO/MainProcess] Received task: tasks.add[7e5c2132-d2a1-4ef6-a369-bb415b6499c4]
Looking up the task_id in Redis:
get celery-task-meta-7e5c2132-d2a1-4ef6-a369-bb415b6499c4
"\x80\x02}q\x01(U\x06statusq\x02U\aSUCCESSq\x03U\ttracebackq\x04NU\x06resultq\x05K\bU\bchildrenq\x06]u."
- Without a backend configured, the server-side log after running a task:
[2016-03-15 16:40:10,620: INFO/MainProcess] Received task: tasks.add[68a795c7-ab1e-4480-a735-3db2e8bd2918]
In this case the task_id cannot be found in Redis.
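Instead of reading the raw pickled value out of Redis by hand, the stored state can also be inspected through Celery's result API. A minimal sketch, reusing the app from tasks.py and the task id from the log above:
from celery.result import AsyncResult
from tasks import app

# look up the stored state and result of a finished task by its id
res = AsyncResult('7e5c2132-d2a1-4ef6-a369-bb415b6499c4', app=app)
print(res.status)   # 'SUCCESS'
print(res.result)   # 8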
Start the worker server:
celery -A tasks worker --loglevel=info
A startup log similar to the following appears:
[tasks]
  . tasks.add
  . tasks.multiply

[2016-03-15 16:40:03,028: INFO/MainProcess] Connected to redis://localhost:6379//
[2016-03-15 16:40:03,122: INFO/MainProcess] mingle: searching for neighbors
[2016-03-15 16:40:04,146: INFO/MainProcess] mingle: all alone
[2016-03-15 16:40:04,188: WARNING/MainProcess] celery@iZ25d0yvrwwZ ready.
We have now defined two tasks, add and multiply, with Redis serving as the broker.
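The worker command accepts further options; for instance, the number of worker processes can be set explicitly with --concurrency (by default it matches the number of CPU cores):
celery -A tasks worker --loglevel=info --concurrency=4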
2. Client-side definition (client.py)
Asynchronous call:
from tasks import add
import time

result = add.delay(4, 4)
while 1:
    if result.ready():
        print 'result ', result.get()
        break
    else:
        print 'not ready, wait 1 second'
        time.sleep(1)
Because we added a sleep to add on the server side, result.ready() initially returns False; once the (simulated) processing time has elapsed, the result is returned:
not ready, wait 1 second
not ready, wait 1 second
not ready, wait 1 second
not ready, wait 1 second
not ready, wait 1 second
result 8
Synchronous call:
# synchronous way
try:
    # timeout shorter than the 5-second task: raises a TimeoutError
    result.get(timeout=1)
    # timeout longer than the task duration: would return the result
    result.get(timeout=10)
except Exception as e:
    print e
Here the client blocks in get until the task result is available or the timeout expires.
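The definition quoted at the beginning also mentions scheduling. delay is in fact a shortcut for apply_async, which accepts scheduling options such as countdown; a minimal sketch:
# delay(4, 4) is shorthand for apply_async((4, 4));
# countdown postpones execution by the given number of seconds
result = add.apply_async((4, 4), countdown=10)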
RabbitMQ version
Since this post focuses on Celery, only the code is listed here; the basic idea is to implement the message passing as an RPC call.
Server side (add_consumer.py)
import pika
import simplejson as json

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def on_request(ch, method, props, body):
    # decode the arguments, compute, and publish the result back
    # to the client's reply queue, tagged with its correlation_id
    [x, y] = json.loads(body)
    print(" [.] %d + %d" % (x, y))
    response = x + y
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')  # pika pre-1.0 API
print(" [x] Awaiting RPC requests")
channel.start_consuming()
Client side (add_producer.py)
import pika
import uuid
import simplejson as json

class TaskRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        self.channel = self.connection.channel()
        # an exclusive, auto-named queue for receiving replies
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)  # pika pre-1.0 API

    def on_response(self, ch, method, props, body):
        # only accept the reply matching our request's correlation_id
        if self.corr_id == props.correlation_id:
            self.response = body

    def add(self, x, y):
        body = json.dumps([x, y])
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=body)
        # block until the matching reply arrives
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

task_rpc = TaskRpcClient()
x = 4
y = 4
print(" [x] Requesting %d + %d" % (x, y))
response = task_rpc.add(x, y)
print(" [.] Got %r" % response)
This implementation follows the official RabbitMQ RPC tutorial.
Comparing the two implementations, Celery is clearly the more concise way to achieve the same functionality. It hides the complexity of the underlying MQ operations from the user, who can focus on the actual tasks in the form of plain functions rather than on MQ plumbing.
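To make the contrast concrete, the entire TaskRpcClient class above collapses into a single call on the Celery side (reusing tasks.py from earlier):
from tasks import add

# publish the task, block for the result, and print it in one line
print(add.delay(4, 4).get(timeout=10))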