RabbitMQ指南

RabbitMQ消息生产者将消息投递给exchange,Routes根据headers、bindings和routing key,判断消息发往的队列,消费者从队列取出消息。

此文为简单概述,RabbitMQ核心概念参见RabbitMQ消息投递模式

Exchange

Exchange是RabbitMQ一个重要概念,详情查看RabbitMQ消息投递模式

持久化

RabbitMQ消息支持久化,声明持久化的消息,即便生产者在发出消息的那一刻,没有一个消费者在线,消息依然有效。

非持久化类型的消息,没有消费者在线时,消息会被丢弃,后者只关心即时消息。

持久化声明

以编码方式实现。

exchange持久化

channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE , true);

queue持久化

channel.queueDeclare(QUEUE_NAME,true, exclusive,autoDelete,arguments);

message持久化

channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

接收端exchange声明与生产端保持一致

channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE , true);

持久化确认

RabbitMQ消息持久化确认分为两种:

事务确认机制

channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
CommitOk txCommit = channel.txCommit();

confirm.select方式

SelectOk confirmSelect = channel.confirmSelect();//to do
channel.waitForConfirms()

注:事务详细说明参见消息投递

消息没有找到对应queue被丢弃时,开启mandatory(true)标识位,可让生产端得到反馈。

如果对消费端在线状态有要求,使用immediate(true)标识位,可让生产端知道消息对应的queue(s)是否有消费者在线。

消息处理确认

  Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
       //使用Ack确认
        long deliveryTag = envelope.getDeliveryTag();
        channel.basicAck(deliveryTag, true);
      }
    };
   channel.basicConsume("ab", false, consumer);

分发策略

RabbitMQ默认不关心消费端消息堆积数量,只负责把消息平均分发给每个消费端。

一个queue允许多个Connection,一个Connection可创建多个Channel,basicQos针对channel设置消息预取条数。

channel.basicQos(prefetchCount);

排他队列

Exclusive Queue

排他性队列属于临时队列的一种,一般在消费端创建,用于临时获取消息。

独占队列生命周期伴随着一个连接(Connection)的创建而生,连接(Connection)关闭则销毁,且只属于当前连接,其他连接(Connection)无法使用。

声明方式

channel.queueDeclare("exQueueName", true, true, false, null);
channel.queueBind("exQueueName", EXCHANGE_NAME, routingKey);

排他队列强调首次声明,排它只针对连接(Connection),而非同个连接上的多个通道(Channel)。

适用于调试或查看消息。 更多参考RabbitMQ AMQP概念全解析