RabbitMQ 拓展篇

  1. 1. jackson
  2. 2. 消息高可用
    1. 2.1. 消息持久化
      1. 2.1.1. queue 持久化
      2. 2.1.2. 消息 持久化
      3. 2.1.3. exchange 持久化
    2. 2.2. 消息确认
    3. 2.3. 确认发送(生产者)
    4. 2.4. 确认发送(消费者)
  3. 3. 死信和延时队列
    1. 3.1. 应用场景
    2. 3.2. 死信触发条件
    3. 3.3. 设置方法

jackson

在 rabbitMq 中还有个重要的组件是 MessageConverter,用于消息格式的设置。
默认使用amqp的 SimpleMessageConverter 使用text传输,在传输量较大的数据时比较消耗性能。
另一种就是 Jackson2JsonMessageConverter,使用json传输。

全局配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
public class RabbitMqConfig {

/* 设置消息传输形式 使用jackson 相对默认SimpleMessageConverter 提高性能 */

/**
* 发送消息设置用json的形式序列化
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}

/**
* 接受的时候使用jackson 反序列化
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}

接受者配置

1
2
3
4
5
6
7
8
9
10
@Component
@RabbitListener(queues = "dcloud.fanout.queue",containerFactory = "rabbitListenerContainerFactory")
public class FanoutReceiverA {

@RabbitHandler
public void process(@Payload String message) {
System.out.println("receive" + message);
}

}

贴出simple和jack ,string和Object 数据传输格式

javabean_jackson_messagejavabean_jackson_message

javabean_simple_messagejavabean_simple_message

string_jackson_messagestring_jackson_message

string_simple_messagestring_simple_message

消息高可用

消息不管在生成,传输,队列,消费中都可能存在问题,丢失或者重复消费等,因此需要配置一些参数或功能以达到消息高可用

消息持久化

持久化设置时三个缺一不可。

queue 持久化

设置queue为 durable ,new Queue() 和 RabbitMQ Managemnet 默认持久化

消息 持久化

核心为设置 Message 的 MessageDeliveryMode 为 PERSISTENT。
使用 rabbitTemplate.convertAndSend 方法中 默认为此模式

exchange 持久化

同queue 设置 exchange Type 为 durable …

消息确认

确认发送(生产者)

继承 RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback 两个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 是否正确到达 Exchange
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)

/**
* Exchange 发送到 queue 发送确定
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

并设置 指定监听回调

1
2
3
4
5
6
template.setConfirmCallback(this);
template.setReturnCallback(this);
// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,
// 那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
// 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
template.setMandatory(true);

确认发送(消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RabbitHandler
public void process(@Payload String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("receive" + message);

//确认消息,deliveryTag为相对channel的消息唯一标识,
//multiple 批处理true 可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(tag, false);

//拒绝消息 requeue 是否重新进入队列, true 重新进入队列 false 消息被丢弃
channel.basicReject(tag, true);

//否认消息
channel.basicNack(tag, false, true);

}

死信和延时队列

保证消息高可用的场景还应包含对异常信息的处理,这部分数据在死信交换机中,
延时队列实现异步延迟操作的功能。

应用场景

  • 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单
  • 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用
  • 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试
  • 物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时

死信触发条件

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false
  • 消息/队列 因为设置了TTL而过期
  • 消息进入了一条已经达到最大长度的队列

设置方法

  • 新增死信交换器和队列(和普通的无差)
  • 新建延时队列设置args (x-dead-letter-exchange/x-dead-letter-routing-key)
  • 设置队列延时 args(x-message-ttl)或消息延时(message.getMessageProperties().setExpiration)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 延时队列
*
* @return
*/
@Bean
public Queue deployQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", "dcloud.dlxExchange");
args.put("x-dead-letter-routing-key", "dlx");
// 设置消息的过期时间, 单位是毫秒
args.put("x-message-ttl", 5000);
return new Queue("dcloud.deployQueue", true, false, false, args);
}

发送测试

this.rabbitTemplate.convertAndSend(exchange, routingKey, message, message -> {
// 设置消息延时 单位 毫秒
message.getMessageProperties().setExpiration(10000 + "");
return message;
}, correlationDataExtend);
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者