RabbitMQ消息投递模式

RabbitMQ实现并扩展了AMQP协议,是目前最受欢迎的开源消息代理,是异构系统之间消息传递的首选。

RabbitMQ概述

RabbitMQ基于Erlang语言,使用高级消息队列协议(AMQP)构建。AMQP是面向消息的开放标准应用层协议,AMQP的特点是消息定向、排队、路由(包括点对点和发布 - 订阅),可靠和安全。

RabbitMQ的易于部署和轻量级,支持多种消息传递协议的特性,在分布式和高可用场景中受到追捧。

RabbitMQ工作模式

RabbitMQ中,生产者创建一个消息,直接投递exchange,Routes根据头headers、bindings和routing key,判断消息应该被发往的队列,消费者从队列取出消息。

消息的发布者(生产者),在发送消息时,除了消息本身,还会附带一些元数据,RabbitMQ根据元数据信息了解发布者的意图,按期望的规则投递消息。

RabbitMQ将消息的路由规则一分为二,一部分由发布者决定,一部分取决于bindings信息,bindings可通过RabbitMQ控制台动态创建。如此,在不修改应用程序的前提下,提高了消息路由的灵活性。

消息发布者可以决定消息所使用的交换类型(Exchange Types),bindings根据交换类型允许的匹配规则进行细分调整,不同的exchange type决定了bindings所能使用的匹配规则。所以,对RabbitMQ初期的了解往往都集中在Exchange Types和bindings上。

注:其他MQ不一定有exchange的概念,如ActiveMQ。

RabbitMQ交换类型(Exchange Types)

Exchange接收一条消息,并将消息路由到零个或多个队列。所使用的路由算法,取决于交换类型( exchange type)和绑定规则(bindings)。RabbitMQ提供四种交换类型:Direct、Fanout、Topic、Headers。

除了exchange type外,还有一些重要的属性:

Durability:持久性(exchanges在代理重启依然存在)

Auto-delete:当最后一个队列被解除绑定时删除exchange。

Arguments:可选,由插件和特定的代理使用。

默认模式

RabbitMQ默认交换类型是Direct,是一个预先就声明好的,名称为空的exchange。对于简单的应用程序非常有用。每个队列都自动绑定它,这个模式下队列名称和路由key直接匹配。

例如,声明一个名为"search-indexing-online"的队列。在发布消息时,如果使用“search-index-online”作为路由key,并指定使用默认的exchange,最终,消息将被路由到"search-index -online"队列。

使用默认的exchange,让人感觉直接将消息投递给了队列,尽管从技术上讲是使用了一个名称为空的exchange。

channel.basic_publish(exchange='',routing_key='search-index -online', body=message)

Direct模式(Direct Exchange)

Direct exchange根据消息路由key将消息传递到队列。direct exchange对于消息的单播路由非常理想(尽管也能用于多播路由)。工作过程如下:

队列使用routing key K绑定到exchange。

消息带着routing key R到达exchange时,如果K = R,交换器将消息路由到该队列。

Direct exchange通常用于以循环方式,在多个worke之间分配任务,消费者之间成竞争关系。相当于消息是在多个消费者之间负载均衡。

注:不是在队列之间进行负载均衡。

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)


Fanout模式(Fanout Exchange)

Fanout模式会忽略routing key,将消息直接路由到绑定的队列。如果将N个队列绑定到同一个fanout exchange,向该exchange发布新消息,这N个队列将收到同样的消息。fanout exchange是广播消息的理想选择。

exchange='logs',exchange_type='fanout'
channel.basic_publish(exchange='logs',routing_key='',body=message)


Topic模式(Topic Exchange)

Topic模式下的routing key不能随意命名,必须是以逗点(.)分隔的单词列表,如:quick.orange.rabbit。binding所使用的key也要遵循此规则。

Topic模式与Direct模式很像,都是通过路由key将消息发送到匹配的队列,Topic支持通配可以有选择的将一条消息发给N个队列。

routing_key匹配规则有两点需要注意:* (star) 代表匹配一个单词。# (hash) 代表匹配零个或多个单词。

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
channel.basic_publish(exchange='topic_logs', routing_key=routing_key,body=message)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name,routing_key=binding_key)

channel.basic_consume(callback, queue=queue_name,no_ack=True)

注:routing_key中可以包含任意数量的单词,最多长达255个字节。

此模式应用场景:希望根据日志的级别和日志的来源订阅日志。如:希望看到来自'cron'系统的错误日志,以及来自'kern'系统的所有日志。

Headers模式(Headers Exchange)

此模式使用消息标头属性进行路由。Headers Exchange用于在多个属性上进行路由。此模式忽略routing key属性。用于路由的属性取自headers。如果header的值等于绑定时指定的值,则认为消息是匹配的。

可以使用多个标头将队列绑定到headers exchange。在这种情况下,代理需要一条额外的信息,即匹配任意一个标题,还是匹配所有标题,这就是"x-match"绑定参数的用途。当"x-match"参数设置为"any"时,只需一个匹配header就够了。"x-match"设置为"all",强制所有值匹配。

注:以字符串x-开头的header将不用于匹配计算。

RabbitMQ ACK(消息确认机制)

根据AMQP协议的定义,发送协议无法保证到达,生产者和消费者都需要一种交付和确认的机制。

生产端确认机制

消息发送出去,不能假定该消息已到达服务器并已成功处理。可能在途中丢失,或交付可能会延迟。基于AMQP标准,保证消息不丢失的唯一方法是事务,事务模式下的交易吞吐量将锐减250倍,为解决此问题,引入确认机制。

调用confirm.select方法,启用确认机制,confirm模式无法与事务模式同时出现。

confirm分三种模式:同步阻塞的confirm,批量confirm,异步回调confirm。三者常规测试结果效率依次递增,都是通过delivery-tag确认消息是否到达。

消费端确认机制

确认模式分为:发出后即为成功投递,和收到客户端明确回复。

明确回复可以是”成功“,也可以是”失败“:basic.ack用于确认成功,basic.nack和basic.reject用于告知失败,reject不能用于批量告知失败。

失败处理方案

直接丢弃消息和消息重回队列,取决于回复的参数,nack批量操作需明确指定参数,示例为拒绝单个消息,并要求重回队列:

GetResponse gr = channel.basicGet("some.queue",false);
channel.basicNack(gr.getEnvelope(),getDeliveryTag(),false,true);

一次拒绝两个消息:

GetResponse gr1 = channel.basicGet("some.queue", false);
GetResponse gr2 = channel.basicGet("some.queue", false);
channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true);

消息重回队列,如果可能会被放置在队列的原始位置。如果不可能(当多个消费者共享队列时),消息被重新排队,靠近队列头的位置。

预取机制(QoS)

RabbitMQ从队列中取出消息,默认一次会取出多条,消息在处理后需要确认操作,RabbitMQ允许在使用时,限制通道(连接)上未确认的消息数(“预取计数”)。

不幸的是,以channel为单位进行限制,不是理想的范围;单个channel上允许获取多个队列的消息。channel和队列需要协调发送的消息,以确保不超出限制。在单台机器上会很慢,在跨群集时会更慢。

RabbitMQ重新定义basic.qos方法中参数解决以上问题。prefetch_count=false,则上述限制针对channel上单个消费者,true则针对channel上所有的消费者。示例:

Channel channel = ...;

Consumer consumer1 = ...;

Consumer consumer2 = ...;

channel.basicQos(10, false); // Per consumer limit

channel.basicQos(15, true);  // Per channel limit

channel.basicConsume("my-queue1", false, consumer1);

channel.basicConsume("my-queue2", false, consumer2);

RabbitMQ消息确认模式和QoS预取值,对消费者吞吐量有很大影响。增加预取将提高向消费者传递消息的速率,自动确认模式可以产生最佳的交付率。同时,已传送尚未处理的消息也将增加,从而,增加消费者的内存消耗。100到300的范围值通常拥有最佳吞吐量,把消费者压垮的机率也大大减少。

注:QoS预取设置不会影响到使用basic.get(“pull API”)获取的消息。