技术记录栈

记录点滴:java、datebase、linux、spring、javascript、nginx

2018/08/27

RabbitMQ中的AMQP概念全解析

RabbitMQ是一款实现了AMQP协议的消息队列,在分布式应用场景中时常被提及。RabbitMQ除了性能卓越外,对消息队列的操作也很简单。

RabbitMQ是一个消息代理:它接受和转发消息。用发送邮件的场景举例,RabbitMQ同时扮演了邮箱、邮局和邮递员的角色。队列(queue )是RabbitMQ中的邮箱的名称。用于存储消息,它本质上是一个很大的消息缓冲器(message buffer)。

rabbitmp

示例图中,生产者将消息发送到“hello”队列,使用者从该队列接收消息。

RabbitMQ常用的4种模式

工作队列(Work queues)

在多个消费端之间分配任务(竞争消费者模式),这种模式下多个消费者共享一个任务,只有一个消费者能够成功处理消息。工作队列(又称:任务队列)的核心思想是,避免立即执行资源密集型任务,而引发漫长等待。相反,把任务压后执行。通过将任务封装为消息,将其发送到队列中,用后台程序把任务从队列中取出并执行。

rabbitmp

此模式下的发送者:

channel.basic_publish(exchange='',routing_key='hello', body=message)

此模式下的接收者:

ch.basic_ack(delivery_tag = method.delivery_tag)

由于此模式并不能完整的体现RabbitMQ的消息模式,各中概念放在下一个模式中说。

发布/订阅(Publish/Subscribe)

 

一次向多个消费者发送消息,这种模式更像是在广播消息,就像日志系统将一份消息发送到控制台,一份发送到日志文件。这一模式引入了Exchanges的概念,RabbitMQ中消息传递模型的核心思想是,生产者永远不会将消息直接发送到队列,实际上,生产者甚至不知道消息能否会被传递到队列(上一个例子隐藏了这一概念,使用默认exchange=“”)。

 

rabbitmp

生产者只能把消息发送到exchange,exchange需要做的事情也非常简单,一方面,它接收来自生产者的消息,另一方面将它们推送到队列。exchange必须确切知道如何处理收到的消息,如:应该把消息追加到指定队列吗?应该把消息追加到多个队列吗?或者它应该被丢弃,这些规则都由交换类型(exchange type)决定。

Exchanges四种类型

发布/订阅模式的exchange类型为fanout。

此模式下的发送者

1、先声明一个exchange:

exchange='logs',exchange_type='fanout'

2、发送消息到exchange:

channel.basic_publish(exchange='logs',routing_key='',body=message)

此模式下的接收者

先声明模式与exchange:

exchange='logs',exchange_type='fanout'

然后声明绑定:

exchange='logs',queue=queue_name

把exchange绑定到指定的队列上,以便从队列接收。

最后从指定的队列中取出消息:

channel.basic_consume(callback,queue=queue_name,no_ack=True)

这里又多出了一个绑定(Bindings)的概念,鉴于这个概念过于简单就不多说了,至于ack的概念会在后面详细说明。

路由模式(Routing)

 

有选择地接收消息,这种模式下的消息,有点像日志系统中的,按日志级别分类处理。将“错误”消息发送到日志文件(以节省磁盘空间),同时,仍然能够在控制台上看到所有的日志消息。

路由模式下的Exchanges为direct。

rabbitmp

路由模式(Routing)模式下的发送者

声明一个exchange和类型:

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

发送消息到exchange并指定消息路由:

channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)

路由模式(Routing)模式下的接收者

 

声明一个exchange和类型:

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

声明一个绑定并选择路由:

channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

最后从队列获取消息:

channel.basic_consume(callback,queue=queue_name,no_ack=True)

此模式与上一个示例大致相同,唯一的重点在于路由,这种类型的exchange需要一个路由的概念,才能把消息发送到正确的地方。路由是一个随消息一起发出的附加信息,接收者需要声明,希望接到附加了哪种路由的消息。由于这种模式下的路由key值,需要完全匹配才能被正确接收,所以暂且认为发送者与接收者中的routing_key是同一个概念。

主题模式(Topics)

根据模式(主题)接收消息,这一模式接着上个示例中路由的概念继续做文章,基于多个标准进行路由。在日志系统中,我们不仅希望可以根据日志的级别来订阅日志,还要根据日志的来源去订阅日志。比如:我们希望听听来自'cron'系统的错误日志,以及来自'kern'系统的所有日志。

上个示例中我们暂且认为发送者与接收者routing_key是相等的概念,但实际上,它们是花开两朵,各表一枝。这个模式下的routing_key必须是由点(.)分隔的单词列表,单词可以是任何内容,但通常它们会使用与消息相关的名词。如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。routing_key中可以包含任意数量的单词,最多可达255个字节。发送者与接收者routing_key匹配规则有两点需要注意:

* (star) 可以替代一个单词。

# (hash) 可以替换零个或多个单词。

主题模式(Topics)模式下的Exchanges为topic。

rabbitmp

主题模式(Topics)模式下的发送者

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

channel.basic_publish(exchange='topic_logs', routing_key=routing_key,body=message)

主题模式(Topics)模式下的接收者:

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

绑定多个 exchange:

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name,routing_key=binding_key)

channel.basic_consume(callback, queue=queue_name,no_ack=True)

主题模式(Topics)模式下的routing_key的匹配更像是正则匹配模式。如:发送者routing_key=a.b.c,则routing_key为 *.b.*| a.b.*| *.b.c的接收者都能收到消息。

ACK(RabbitMQ消息确认机制)

根据AMQP协议的定义,由于发送协议无法保证到达对方,或被其成功处理,因此发布者和消费者,都需要一种交付和处理确认的机制。

发送者消息确认机制

将消息发送出去,不能假定该消息已到达服务器并且已成功处理。它可能在途中丢失,或其交付可能会显着延迟。基于AMQP的标准,保证消息不丢失的唯一方法是使用事务。在这种情况下,交易吞吐量将锐减250倍。为了解决这个问题,引入了确认机制。

通过调用confirm.select方法,启用确认机制,confirm模式无法与事务模式同时出现。confirm分为三种模式,等待返回的confirm,批量confirm,异步回调confirm。三者常规测试下效率依次递增。三者都是通过delivery-tag来确认消息是否被正确收到。

 

接收者消息确认机制

 

根据所使用的确认模式,RabbitMQ可以认为消息在发出(写入TCP套接字)后即为成功传递,或者在收到客户端明确确认时(“手动”)。手动确认可以是确认”成功“,也可以用于告知”失败“:

basic.ack用于确认成功

basic.nack用于告知失败

basic.reject用于告知失败,但与basic.nack相比有一个限制(不能批量)

这里的确认失败,分为两种行为,一种是直接丢弃消息,一种是让消息重回队列,这取决于发送确认时的参数,nack的批量操作需要手动设置。以下示例为拒绝单个消息,要求broker重新排队:

GetResponse gr = channel.basicGet("some.queue",false);

channel.basicNack(gr.getEnvelope(),getDeliveryTag(),false,true);

此示例为一次拒绝两个消息:

GetResponse gr1 = channel.basicGet("some.queue", false);

GetResponse gr2 = channel.basicGet("some.queue", false);

channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);

当消息被重新排队时,如果可能的话,它将被放置在其队列中的原始位置。如果不可能(由于当多个消费者共享队列时,同时传递和来自其他消费者的确认操作),该消息将被重新排队到,靠近队列头的位置。

channel预取设置(QoS)

RabbitMQ在从队列中取出消息,默认一次会取出多条,由于消息在处理后,需要进行确认操作,RabbitMQ允许在使用时,限制通道(连接)上未确认的消息数(也称为“预取计数”)。

不幸的是,以channel为单位进行限制,并不是理想的范围,因为单个channel上,可以获取多个队列的消息,所以通道和队列需要协调发送的每个消息,以确保它们不超过限制。这在单台机器上会很慢,而在跨群集时会更慢。因此,RabbitMQ 在basic.qos方法中重新定义了global标志的含义:

AMQP 0-9-1 中prefetch_count的含义:false为在channel上的所有消费者之间共享,true为在connection上共享所有消费者。

RabbitMQ 中prefetch_count的含义:false为channel上单个消费者,true为channel上所有的消费者。从以上说明中可以看出RabbitMQ的QoS实现粒度更细更灵活高效,以下为示例:

Channel channel = ...;

Consumer consumer1 = ...;

Consumer consumer2 = ...;

channel.basicQos(10, false); // Per consumer limit

channel.basicQos(15, true);  // Per channel limit

channel.basicConsume("my-queue1", false, consumer1);

channel.basicConsume("my-queue2", false, consumer2);

确认模式和QoS预取值,对消费者吞吐量有显着影响。通常,增加预取将提高向消费者传递消息的速率。自动确认模式可以产生最佳的交付率。

但是,在这两种情况下,已传送但尚未处理的消息的数量也将增加,从而增加了消费者的内存消耗。100到300的范围值通常可提供最佳吞吐量,并且不会出现把消费者压垮的风险。QoS预取设置不会影响到使用basic.get(“pull API”)获取的消息。