RabbitMQ消息投递模式

RabbitMQ实现并扩展了AMQP协议,是目前最受欢迎的开源消息代理,是异构系统间消息传递的首选。

RabbitMQ概述

RabbitMQ基于Erlang语言,使用高级消息队列协议(AMQP)构建。

AMQP是面向消息的开放标准应用层协议,特点是消息定向、排队、路由(包括点对点和发布 - 订阅),可靠和安全。

RabbitMQ易于部署、轻量级,支持多种消息传递协议,在分布式和高可用场景中受到追捧。

RabbitMQ工作模式

RabbitMQ中生产者创建一个消息,直接投递exchange,Routes根据头headers、bindings和routing key,判断消息应该发往的队列,消费者从队列取出消息。

消息发布者(生产者)在发送消息时,除消息本身还会附带一些元数据,RabbitMQ根据元数据信息了解发布者意图,按期望规则投递消息。

RabbitMQ将消息路由规则一分为二,一部分由发布者决定,一部分取决于bindings信息,bindings可通过RabbitMQ控制台动态创建。

如此,在不修改应用的前提下,提高消息路由灵活性。

消息发布者可决定消息交换类型(Exchange Types),bindings根据交换类型的匹配规则细分调整,不同exchange type决定bindings所能使用的匹配规则。

所以,对RabbitMQ初期往往都集中在Exchange Types和bindings上。

注:exchange概念不是所有MQ都有,如:ActiveMQ。

RabbitMQ交换类型/Exchange Types

Exchange接收一条消息并将消息路由到零或多个队列。

所使用的路由算法取决于交换类型( exchange type)和绑定规则(bindings)。

RabbitMQ提供四种交换类型:Direct、Fanout、Topic、Headers。

Exchange type外其他重要属性:

Durability:持久性(exchanges在代理重启依然存在)。

Auto-delete:当最后一个队列被解除绑定时删除exchange。

Arguments:可选,由插件和特定的代理使用。

默认模式

RabbitMQ默认交换类型是Direct,一个预先声明好的,名称为空的exchange。对简单应用程序非常有用。队列会自动绑定它,这个模式下队列名称和路由key直接匹配。

如,声明一个名为"search-indexing-online"队列,在发布消息时,若使用“search-index-online”作为路由key,并指定使用默认的exchange。

最终,消息将被路由到"search-index -online"队列。

使用默认exchange,让人感觉直接将消息投递给了队列,尽管从技术上讲是使用了一个名称为空的exchange。

channel.basic_publish(exchange='',routing_key='search-index -online', body=message)

Direct模式/Direct Exchange

Direct exchange根据消息路由key将消息传递到队列。

Direct exchange对于消息的单播路由非常理想(尽管也能用于多播路由)。

工作过程如下:

队列使用routing key K绑定到exchange。

消息带着routing key R到达exchange时,如果K = R,交换器将消息路由到该队列。

Direct exchange通常用于以循环方式在多个worke间分配任务,消费者间形成竞争关系。相当于消息在多个消费者间负载均衡。

注:不是在队列间进行负载均衡。

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)


Fanout模式/Fanout Exchange

Fanout模式会忽略routing key,将消息直接路由到绑定的队列。

若将N个队列绑定到同一个fanout exchange,向该exchange发布新消息,这N个队列将收到同样消息。

Fanout exchange是广播消息的理想选择。

exchange='logs',exchange_type='fanout'
channel.basic_publish(exchange='logs',routing_key='',body=message)


Topic模式/Topic Exchange

Topic模式下routing key不能随意命名,须以逗点(.)分隔的单词列表,如:quick.orange.rabbit。

binding用的key也要遵循此规则。

Topic模式与Direct模式很像,通过路由key将消息发送到匹配队列,Topic支持通配有选择的将一条消息发给N个队列。

routing_key匹配规则需要注意两点:* (star) 代表匹配一个单词。# (hash) 代表匹配零或多个单词。

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
channel.basic_publish(exchange='topic_logs', routing_key=routing_key,body=message)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name,routing_key=binding_key)

channel.basic_consume(callback, queue=queue_name,no_ack=True)

注:routing_key中可包含任意数量单词,最多达255个字节。

此模式应用场景:

希望根据日志级别和日志来源订阅日志。

如:希望看到来自'cron'系统的错误日志,以及来自'kern'系统的所有日志。

Headers模式/Headers Exchange

此模式使用消息标头属性进行路由。

Headers Exchange在多个属性上进行路由。

此模式忽略routing key属性。

用于路由的属性取自headers。

若header值等于绑定时指定值,则认为消息是匹配的。

可使用多个标头将队列绑定到headers exchange。

在这种情况下,代理需要一条额外的信息,即匹配任意一个标题,还是匹配所有标题,这就是"x-match"绑定参数的用途。

当"x-match"参数设置为"any"时,只需一个匹配header就够了。

"x-match"设置为"all",强制所有值匹配。

注:以字符串x-开头的header将不用于匹配计算。

RabbitMQ ACK/消息确认机制

根据AMQP协议定义,发送协议无法保证到达,生产者和消费者都需一种交付和确认机制。

生产端确认机制

消息发送出去不能假定该消息已到达服务器并已成功处理。

可能在途中丢失或交付可能会延迟。

基于AMQP标准,保证消息不丢失的唯一方法是事务,事务模式下交易吞吐量将锐减250倍,为解决此问题,引入确认机制。

调用confirm.select方法,启用确认机制,confirm模式无法与事务模式同时出现。

confirm分三种模式:同步阻塞的confirm,批量confirm,异步回调confirm。

三者常规测试结果效率依次递增,都是通过delivery-tag确认消息是否到达。

消费端确认机制

确认模式分为:发出后即为成功投递和收到客户端明确回复。

明确回复可以是”成功“,也可以是”失败“:basic.ack用于确认成功,basic.nack和basic.reject用于告知失败,reject不能用于批量告知失败。

失败处理方案

直接丢弃消息和消息重回队列,取决于回复参数,nack批量操作需明确指定参数,示例为拒绝单个消息,并要求重回队列:

GetResponse gr = channel.basicGet("some.queue",false);
channel.basicNack(gr.getEnvelope(),getDeliveryTag(),false,true);

一次拒绝两个消息:

GetResponse gr1 = channel.basicGet("some.queue", false);
GetResponse gr2 = channel.basicGet("some.queue", false);
channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);

消息重回队列,如果可能,消息会被放置在队列原始位置。

如果不能(当多个消费者共享队列时),消息被重新排队,靠近队列头位置。

预取机制/QoS

RabbitMQ从队列中取出消息,默认一次会取出多条,消息在处理后需要确认操作,RabbitMQ允许在使用时,限制通道(连接)上未确认消息数(“预取计数”)。

以channel为单位进行限制不是理想范围,单个channel上允许获取多个队列消息。

channel和队列需要协调发送的消息以确保不超出限制。

在单台机器上会很慢,在跨群集时会更慢。

RabbitMQ重新定义basic.qos方法参数解决以上问题。

prefetch_count=false,则上述限制针对channel上单个消费者,true针对channel上所有消费者。示例:

Channel channel = ...;

Consumer consumer1 = ...;

Consumer consumer2 = ...;

channel.basicQos(10, false); // Per consumer limit

channel.basicQos(15, true);  // Per channel limit

channel.basicConsume("my-queue1", false, consumer1);

channel.basicConsume("my-queue2", false, consumer2);

RabbitMQ消息确认模式和QoS预取值,对消费者吞吐量影响很大。

增加预取将提高向消费者传递消息的速率,自动确认模式能产生最佳交付率。

同时,已传送尚未处理的消息也将增加,从而,增加消费者的内存消耗。

100到300范围值通常拥有最佳吞吐量,将消费者压垮的机率大大减少。

注:QoS预取设置不会影响到使用basic.get(“pull API”)获取的消息。