RabbitMQ教程集锦

RabbitMQ的生产者依赖于exchange,消费者依赖于queue,bind控制exchange与queue间的关系。

RabbitMQ的exchange、queue、bind可以在控制台创建,也可以通过程序端声明。

创建exchange关注名字、类型(topic、fanout、direct、headers)和是否持久化,创建queue关注名字和是否持久化,bind这一动作既可以在exchange端做,也可以在queue端进行。

对于topic、fanout类型的exchange,如果有多个queue对应,在生产端声明bind是没有意义的。理想用法生产端声明exchange,消费端声明queue,控制台创建路由配置。

RabbitMQ的消息可以是持久化的,也可以是非持久化的。声明了持久化的消息,即便生产者在发出消息的那一刻,没有一个消费者在线,消息依然是有效的。

而非持久化类型的消息,在没有消费者在线的情况下消息会被丢弃。两种不同的方式对应不同的业务场景,后者只关心即时消息。
对于消息没有找到对应queue被丢弃时,开启mandatory(true)标识位,可以让生产端得到反馈。

如果对消费端的在线状态有要求,使用immediate(true)标识位,可以让生产端知道消息对应的queue(s)是否有消费者在线。

消息持久化声明

以编码方式实现。

exchange持久化

--java-*

channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE , true);

queue 持久化

channel.queueDeclare(QUEUE_NAME,true, exclusive,autoDelete,arguments);

message持久化

channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

接收端的exchange声明与生产端保持一致

channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE , true);

生产端消息持久化确认

使用事务确认机制

channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
CommitOk txCommit = channel.txCommit();

通过confirm.select方式

SelectOk confirmSelect = channel.confirmSelect();
//to do
channel.waitForConfirms()

消费端消息确认处理

  Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
          AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
       //使用Ack确认
        long deliveryTag = envelope.getDeliveryTag();
        channel.basicAck(deliveryTag, true);
      }
    };
   channel.basicConsume("ab", false, consumer);

RabbitMQ分发策略

RabbitMQ默认不会查看消费端消息堆积的数量,它只负责把消息平均的分发给每个消费端。一个queue允许多个Connection,一个Connection可以创建多个Channel,basicQos针对channel设置一次最多可以拿到多少条消息。

channel.basicQos(prefetchCount);

独占/排他队列

Exclusive Queue

排他性队列属于临时队列的一种,只不过有一个具体的名字,一般在消费端创建,用于临时获取查看消息。独占队列生命周期伴随着一个连接(Connection)的建立而生,连接(Connection)的关闭而死,并且只属于这个连接,其他连接(Connection)无法使用。

声明方式

channel.queueDeclare("exQueueName", true, true, false, null);
channel.queueBind("exQueueName", EXCHANGE_NAME, routingKey);

排他性队列强调首次声明,排它只针对连接(Connection)之间,而非同个连接上的多个通道(Channel)。适用于调试或查看消息的场景。

有关RabbitMQ的更多说明,参考RabbitMQ中的AMQP概念全解析。