RabbitMQ四种消息投递/交换模式

RabbitMQ消息队列是AMQP协议的具体实现,包含了三种常见的投递模式,优化了消息确认和预取机制,并加入了rpc远程调用的实现。

RabbitMQ同时扮演了邮箱、邮局和邮递员的角色。队列(queue )是邮箱的名字。队列用于存储消息,队列本质上是一个消息缓冲器(message buffer)。

rabbitmp

图中,生产者将消息发送到“hello”队列,消费者将消息从队列中取出。

RabbitMQ 消息投递/交换模式

RabbitMQ消息投递/交换模式分为四种:发布/订阅(Publish/Subscribe)模式、主题模式(Topics)、路由模式(Routing)、工作队列(Work queues),严格来讲工作队列(Work queues)因与RabbitMQ核心思想不匹配,所以,在RabbitMQ中工作队列(Work queues)是一个不完整的消息模式。

工作队列(Work queues)

这种模式下多个消费者共享一个任务,多个消费者之间存在竞争关系,最终只有一个消费者能够成功拿到消息。

rabbitmp

工作队列又称:任务队列,其核心思想是避免立即执行资源密集型任务,以避免引发漫长等待。

任务队列将任务压后执行,通过将任务封装成消息,并发送到队列中,然后利用台程序把任务从队列取出来。这在消峰填谷场景中很常见。

发送者代码:

channel.basic_publish(exchange='',routing_key='hello', body=message)

接收者代码:

ch.basic_ack(delivery_tag = method.delivery_tag)

由于此模式并不能完整的体现RabbitMQ的消息模式,各中概念放在下一个模式中说。

RabbitMQ 发布/订阅模式(Publish/Subscribe)

这种模式更像是广播消息,把一个消息发给多个消费者。就像日志系统将一条记录分别发到控制台和日志文件。

rabbitmp

RabbitMQ消息传递模型的核心思想:生产者永远不会将消息直接发送到队列。

生产者也不知道消息能否被传递到队列,因此,引出了RabbitMQ中Exchanges的概念,在上个例子(工作队列)中隐藏了这一概念,使用默认的exchange=“”。

RabbitMQ Exchanges概念

RabbitMQ消息传递模型的核心思想:生产者永远不直接与队列打交道。Exchange就是介于生产者与队列间的消息投递中间人。

Exchange做的事情也非常简单,一方面接收来自生产者的消息,另一方面将收到的消息推送到队列。

Exchange需要知道使用哪种方式处理收到的消息,如:应该把消息追加到指定的队列,还是应该把消息追加到多个队列,亦或者应该直接被丢弃。这些规则都由exchange type(交换类型)决定。

RabbitMQ Exchanges type

RabbitMQ Exchanges type分为三种类型:发布/订阅模式的exchange type为fanout,路由模式(Routing)下的exchanges type为direct,主题模式(Topics)模式下的exchanges type为topic。

RabbitMQ Exchanges fanout声明

生产者端

1、先声明一个exchange:

exchange='logs',exchange_type='fanout'

2、发送消息到exchange:

channel.basic_publish(exchange='logs',routing_key='',body=message)

消息者端

把exchange绑定到指定的队列上,以便从队列接收。

1、先声明exchange和交换类型:

exchange='logs',exchange_type='fanout'

2、声明绑定:

exchange='logs',queue=queue_name

最后从指定的队列中取出消息:

channel.basic_consume(callback,queue=queue_name,no_ack=True)

这里又多出了一个绑定(Bindings)的概念,鉴于这个概念过于简单就不多说了,至于ack的概念会在后面详细说明。

RabbitMQ 路由模式(Routing)

这种模式下是有选择地接收消息。有点像日志系统中按级别接收日志记录。如,只将“错误”消息发送到日志文件(以节省磁盘空间),同时,仍然能够在控制台上看到所有的日志记录。

rabbitmp

RabbitMQ Exchanges direct声明

生产者端

1、声明一个exchange和交换类型:

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

2、发送消息到exchange并指定消息路由:

channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)

消费者端

1、声明一个exchange和交换类型:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

2、声明一个绑定并选择路由:

channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

最后从队列获取消息:

channel.basic_consume(callback,queue=queue_name,no_ack=True)

这个模式的重点在于路由,这种类型的exchange需要一个路由的概念,才能把消息发送到正确的地方。

路由是一个随消息一起发出的附加信息,消费者也需要声明路由,以确定希望接到附加了哪种路由的消息。

由于这种模式下的路由key值,需要完全匹配才能被正确接收,所以,暂且认为发送者与接收者中的routing_key是同一个概念。

RabbitMQ 主题模式(Topics)

这种模式根据主题接收消息。

rabbitmp

在日志系统中,不仅希望可以根据日志的级别订阅日志,还要根据日志的来源订阅日志。如:希望看到来自'cron'系统的错误日志,以及来自'kern'系统的所有日志。

在上个示例中( 路由模式)我们把发送者与接收者的routing_key当成是对等的概念,但实际上,它们是花开两朵,各表一枝。

这个模式下的routing_key必须是由点(.)分隔的单词列表,单词可以是任何内容,但通常会使用与消息相关的名词。如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。routing_key中可以包含任意数量的单词,最多长达255个字节。

发送者与接收者routing_key匹配规则有两点需要注意:

* (star) 可以替代一个单词。

# (hash) 可以替换零个或多个单词。

RabbitMQ Exchanges topic声明

生产者端

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

channel.basic_publish(exchange='topic_logs', routing_key=routing_key,body=message)

消费者端

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

绑定多个exchange:

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name,routing_key=binding_key)

channel.basic_consume(callback, queue=queue_name,no_ack=True)

主题模式(Topics)下的routing_key匹配模式更像是正则匹配。如:生产者发送消息routing_key为a.b.c,则消费者routing_key为 *.b.*| a.b.*| *.b.c的都能收到消息。

RabbitMQ ACK(RabbitMQ消息确认机制)

根据AMQP协议的定义,由于发送协议无法保证到达对方,或被其成功处理,因此,生产者和消费者都需要一种交付和处理确认的机制。

生产端确认机制

将消息发送出去,不能假定该消息已到达服务器并已成功处理。它可能在途中丢失,或其交付可能会显著延迟。

基于AMQP的标准,保证消息不丢失的唯一方法是使用事务,在这种情况下,交易吞吐量将锐减250倍,为了解决这个问题,引入了确认机制。

通过调用confirm.select方法,启用确认机制,confirm模式无法与事务模式同时出现。

confirm分为三种模式:等待返回的confirm,批量confirm,异步回调confirm。

三者常规测试下效率依次递增,三者都是通过delivery-tag来确认消息是否被正确收到。

消费端确认机制

根据所使用的确认模式,RabbitMQ可以认为消息在发出(写入TCP套接字)后即为成功传递,或者在收到客户端明确确认时(“手动”)。

手动确认可以是确认”成功“,也可以用于告知”失败“:

basic.ack 用于确认成功。

basic.nack 用于告知失败。

basic.reject 用于告知失败,但与basic.nack相比有一个限制(不能批量)。

这里的确认失败,分为两种行为:

一种是直接丢弃消息,另一种是让消息重回队列,这取决于发送确认时的参数,nack的批量操作需要手动设置。以下示例为拒绝单个消息,并要求broker重新排队:

GetResponse gr = channel.basicGet("some.queue",false);

channel.basicNack(gr.getEnvelope(),getDeliveryTag(),false,true);

此示例为一次拒绝两个消息:

GetResponse gr1 = channel.basicGet("some.queue", false);

GetResponse gr2 = channel.basicGet("some.queue", false);

channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);

当消息被重新排队时,如果可能的话,它将被放置在其队列中的原始位置。

如果不可能(当多个消费者共享队列时,同时传递和来自其他消费者的确认操作),该消息将被重新排队到,靠近队列头的位置。

RabbitMQ 预取机制(QoS)

RabbitMQ在从队列中取出消息,默认一次会取出多条,由于消息在处理后,需要进行确认操作,RabbitMQ允许在使用时,限制通道(连接)上未确认的消息数(也称为“预取计数”)。

不幸的是,以channel为单位进行限制,并不是理想的范围,因为单个channel上,可以获取多个队列的消息,所以,通道和队列需要协调发送的每个消息,以确保它们不超过限制。这在单台机器上会很慢,而在跨群集时会更慢。

因此,RabbitMQ 在basic.qos方法中重新定义了global标志的含义:

AMQP 0-9-1 中prefetch_count的含义:

false为在channel上的所有消费者之间共享,true为在connection上共享所有消费者。

RabbitMQ 中prefetch_count的含义:false为channel上单个消费者,true为channel上所有的消费者。

从以上说明中可以看出RabbitMQ的QoS实现粒度更细更灵活高效,以下为示例:

Channel channel = ...;

Consumer consumer1 = ...;

Consumer consumer2 = ...;

channel.basicQos(10, false); // Per consumer limit

channel.basicQos(15, true);  // Per channel limit

channel.basicConsume("my-queue1", false, consumer1);

channel.basicConsume("my-queue2", false, consumer2);

RabbitMQ 确认模式和QoS预取值,对消费者吞吐量有显著影响。通常,增加预取将提高向消费者传递消息的速率,自动确认模式可以产生最佳的交付率。

但是,在这两种情况下,已传送但尚未处理的消息的数量也将增加,从而增加了消费者的内存消耗。

100到300的范围值通常可提供最佳吞吐量,并且不会出现把消费者压垮的风险。

QoS预取设置不会影响到使用basic.get(“pull API”)获取的消息。