Celery 概念:分布式、路由队列与工作流

一. 分布式

在Celery基础教程中,我们了解了如何定义task,启动celery worker和client调用。再次回到Celery定义:

Celery is asynchronous task queue/job based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

可见Celery也是一个分布式的消息系统,那么如何利用分布式的方法执行任务呢?

Celery的分布式实际包含两个层次:

  1. Distribute work on a given machine across all CPUs
  2. Distribute work to many machines

先说第一点,默认情况下,Celery在一台机器上启动worker,worker的进程数量和机器的CPU个数一致。但也可以通过concurrency参数控制启动worker的进程数量:

同时启动5个worker进程
celery -A tasks worker --loglevel=INFO --concurrency=5

比如你的机器只有一个CPU,但仍然可以通过上述方法启动5个worker进程,在某些IO密集型的任务中,可以考虑启动worker的数量多于CPU数量,在CPU密集型的任务中,这样的操作可能没有什么好处。

再说第二点,因为Celery只指定了worker的broker,所以只需要在不同机器上启动worker,它们都会从相同的broker中获取任务并处理。

在考虑不同机器上的操作时,涉及远程控制的概念。

  1. celery inspect

观察所有运行worker的信息,例如观察当前处于活跃状态的worker和task:

        celery -A tasks inspect active          
  1. celery control

控制worker的行为,例如向worker中增加对某队列的消费:

    celery control -d w1.e.com add_consumer queue_name
  1. celery status

观察当前worker状态

celery -A tasks status
# celery@iZ25d0yvrwwZ: OK
#
# 1 node online.

二. 路由(Route)与队列(Queue)

如果不特殊指定,Celery将会创建名为celery的默认队列,用于消息传递。但在某些应用场景下,例如不同任务的耗时和优先级不同,不应让耗时低和优先级高的任务等待耗时较高优先级较低的任务执行,毕竟如上面提到的,worker进程的数量是有限的,过多的任务会造成任务等待,此时需要把不同的消息投递到不同的任务队列处理。

这里涉及的Route,Queue和Exchange概念,我的理解是依赖AMQP本身相关概念的,因此相应定义也是一致的。另外由于我研究较浅,虽然Exchange有多重类型(direct, topic…),但这里我们只使用默认的Exchange,问题的焦点放在Route, Queue和Task的绑定上。

(1) 绑定Route与Queue: 在Celery的配置文件中,设定CELERY_QUEUES变量

CELERY_QUEUES = (
    Queue('add', routing_key='task.#'),
    Queue('default', routing_key='default'),
)

这里多说一句的是,配置文件如何覆盖到Worker上(因为看了网上很多文章,都在配置文件的内容上,但没有写清如何在Worker上生效),可以依赖如下代码:

app = Celery('task_info', broker='redis://localhost', backend='redis://localhost') # for case with backend
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_QUEUES = CELERY_QUEUES

(2) 绑定Queue与task:

@app.task(queue='add')
def add(x, y):
    return 2*x + 2*y

指定task处理来自名为add的队列内容

(3) 启动只处理指定队列的worker

celery -A tasks worker -Q add

启动worker,只处理来自名为add的队列消息

(4) 客户端调用

client调用时可以指定路由:

result = add.apply_async(args=[4,4],     routing_key='task.add')

也可以指定队列:

result = add.apply_async(args=[4,4], queue='default')

三. 工作流(canvas)

子任务:也可以视为一种任务,但如果把任务视为函数的话,它可能是填了部分参数的函数。子任务的主要价值在于它可以用于关联运算中,即几个子任务按某种工作流方式的定义执行更为复杂的任务。

Celery工作流包含以下原语:

  1. group

    group并行的执行一系列任务:

    from celery import group
    from proj.tasks import add
    
    group(add.s(i, i) for i in xrange(10))().get()
    # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    
  2. chain

    chain串行的执行任务:

     from celery import chain
     from proj.tasks import add, mul
    
     chain(add.s(4, 4) | mul.s(8))().get()
     # (4 + 4) * 8
    
  3. chord

    chord是包含回调的group操作

  4. map
  5. starmap
  6. chunks