RabbitMQ初探

按照官方文档的说法,RabbitMQ是一种基于AMQP(Advanced Message Queue Protocol)协议实现的“工业级”消息队列,在许多实际系统(这里我本想找到RabbitMQ的项目列表,但很可惜在官网上并没有发现)都有着应用。这里谈到AMQP是一项消息队列协议,那么它就包含了多种实现方式,这里附录下AMQP的具体实现族,比如常用的RabbitMQ/ZeroMQ等,关于不同MQ的比较,可以参考经典的兔子与兔子窝以及RabbitMQ vs ActiveMQ vs ZeroMQ

RabbitMQ:实现一种经纪人模式,通过central node集中管理数据再做分发,这样做的好处是易于使用和部署,但相应的代价是系统为支持这点会变得更慢(我理解是central node负载压力过大)

ZeroMQ:实现P2P模式,是一种较为轻量级的消息系统,很多复杂的功能需要使用者自己实现

ActiveMQ:相比而言,它处于ZeroMQ和RabbitMQ之间,即它既可以按broker模式部署也可以按P2P模式部署,另外据说它有丢失消息的可能性(这点基本要被性能要求的服务pass了)

那么下面来具体说说今天的主题,RabbitMQ,关于它语言特性的支持可以参考官方文档: 可依赖(Reliability) 灵活的路由分发(Flexible Routing) 集群(Clustering) …… 对于这些特性我并没有直观的理解,所以下面还是具体来看它的使用实例。(其中最好的阅读文档还是官方文档

基础对象:

Producer:生产者(P)

Consumer:消费者©

Queues:队列,数据的数据存储(queue)

Exchange:Producer并不是直接把数据发送到Queue中,而是将它委托给Exchange,由Exchange将数据交给合理的route

Bindings: Exchange用于判断该发送到哪个route的规则 (To instruct an exchange E to route messages to a queue Q, Q has to be bound to E)

应用模式:

one Producer one Consumer: 这是最简单的调用形式,Producer把消息发送到消息队列,Consumer从消息队列中读取消息

work queues: 可以视为One Producer,Multi-Consumer,Producer把数据放到消息队列,多个Consumer从Queue中读取数据(任务)进行处理

publish/subscribe:发布者/订阅者模式,当发布者发送消息时,所有的订阅者都可以订阅得到消息,并进行自己的处理

Routings/Topics/RPC:远程过程调用,Topics在其它语言中有了解,这里并没有仔细阅读

应用场景

为什么要使用消息队列? 或许最重要的功能在于解耦,软件模块解耦的价值不需要多做解释,那么我们是否有其它解耦的方式呢?在兔子与兔子窝提到一个很经典的例子: 当我们用MySQL来将处理数据交互时,可以由一个服务把数据写在表里,由另一个服务来读取,但是如果有多个服务都想去读取数据呢?(数据表里的数据实际是单线程的读写),如果程序需要根据压力情况进行动态增减呢? 这就是消息队列能解决的问题,可以使得数据在不同服务(模块)间灵活的交互。

应用语言

RabbitMQ作为AMQP的一种实现,本身也有不同编程语言的实现方法(C#/Java/PHP/Python…),我在这里主要使用Python的一种实现库pika, Python的语言包还包括py-amqplib

安装方法 以Ubuntu为例,第一步,安装rabbitmq-server

    apt-get install rabbitmq-server

第二步,安装pika

    pip install pika

应用实例

最简单的rabbit_test_sender.py:

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

for m in range(3):
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World %d' % m)
    print " [x] sent 'Hello World' %d" % m

connection.close()

rabbit_test_receiver.py:

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()

我的应用背景

那么最后说说我为什么要接触RabbitMQ,在我提供的Facebook自动管理服务模块中,原先实际包括了对象创建和对象管理两个部分。之所以两部分写在一起,主要是因为对象的创建需要自动加入管理组中,同时我并不希望每次管理都需要先从数据库中做一次全量的数据加载(对象创建后会在数据库里持久化),而是只对于新创建的对象做增量的内存数据增加。这里可以看到,两个功能上可以解耦的模块被写在同一个服务中,因此带来的问题是: 当某种情况下,对象管理模块出现了问题,那么对象创建模块同样也会受到影响。而对象创建这件事是用户可以感知到的,那么管理的延迟就会导致创建的延迟,进而影响用户体验。。。 通过RabbitMQ,我将对象创建和对象管理模块拆分成两个不同的服务,这样做有两个好处: 1. 当新对象被用户创建时,对象的添加通过RabbitMQ通知对象管理模块,创建模块可以继续做自己的事情,用户不会感知管理模块行为带来的延迟 2. 管理模块可以简单的通过多个服务提高数据处理的效率

因为RabbitMQ是由实际问题引起的,对它谈不上多仔细的研究,上述博客中参考了下面的内容,谨此致谢~

兔子窝的翻译,有意思的一篇文章: http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/ 其它就是各个官方文档: https://pika.readthedocs.org/en/0.10.0/examples/blocking_basic_get.html http://www.rabbitmq.com/features.html