RabbitMQ教程

在RabbitMQ中消息的生产者将消息投递给exchange,Routes根据头headers、bindings和routing key,判断消息应被发往的队列,消费者从队列取出消息。

此文为简单概述,RabbitMQ核心概念参见RabbitMQ消息投递模式

Exchange

在RabbitMQ中exchange是一个很重要的概念,它决定了消息的很多方面,详情查看RabbitMQ消息投递模式

持久化

RabbitMQ的消息可以是持久化的,也可选择非持久化。声明了持久化的消息,即便生产者在发出消息的那一刻,没有一个消费者在线,消息依然是有效的。而非持久化类型的消息,在没有消费者在线的情况下,消息会被丢弃,后者只关心即时消息。

持久化声明

以编码方式实现。

exchange持久化

--java-*
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE , true);

queue 持久化

channel.queueDeclare(QUEUE_NAME,true, exclusive,autoDelete,arguments);

message持久化

channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

接收端的exchange声明与生产端保持一致

channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE , true);

消息持久化确认

RabbitMQ消息持久化确认分为两种。

使用事务确认机制

channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
CommitOk txCommit = channel.txCommit();

通过confirm.select方式

SelectOk confirmSelect = channel.confirmSelect();//to do
channel.waitForConfirms()

注:有关事务的详细说明参见消息投递

对于消息没有找到对应queue被丢弃时,开启mandatory(true)标识位,可以让生产端得到反馈。如果对消费端的在线状态有要求,使用immediate(true)标识位,可以让生产端知道消息对应的queue(s)是否有消费者在线。

消息处理确认

  Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
       //使用Ack确认
        long deliveryTag = envelope.getDeliveryTag();
        channel.basicAck(deliveryTag, true);
      }
    };
   channel.basicConsume("ab", false, consumer);

分发策略

RabbitMQ默认不会查看消费端消息堆积的数量,它只负责把消息平均的分发给每个消费端。一个queue允许多个Connection,一个Connection可以创建多个Channel,basicQos针对channel设置一次最多可以拿到多少条消息。

channel.basicQos(prefetchCount);

排他队列

Exclusive Queue

排他性队列属于临时队列的一种,只不过有一个具体的名字,一般在消费端创建,用于临时获取查看消息。独占队列生命周期伴随着一个连接(Connection)的建立而生,连接(Connection)的关闭而死,并且只属于这个连接,其他连接(Connection)无法使用。

声明方式

channel.queueDeclare("exQueueName", true, true, false, null);
channel.queueBind("exQueueName", EXCHANGE_NAME, routingKey);

排他性队列强调首次声明,排它只针对连接(Connection)之间,而非同个连接上的多个通道(Channel)。适用于调试或查看消息的场景。

有关RabbitMQ的更多说明,参考RabbitMQ中的AMQP概念全解析。