1.RabbitMQ的源码持久化
2.详细讲解!RabbitMQ如何防止数据丢失,分析看这篇就够了
RabbitMQ的持久持久化
RabbitMQ的持久化特性涉及交换机、队列及消息三个层面,源码确保系统在异常或重启后能自动恢复。分析
当使用php-amqplib实现RabbitMQ时,持久wcheng源码可直接调用相关函数,源码无需额外配置。分析
1、持久交换机持久化:通过exchange_declare方法创建交换机,源码设置durable参数为true可使交换机在RabbitMQ重启后自动重建,分析反之则不会重建。持久
2、源码队列持久化:队列创建时,分析queue_declare方法的持久durable参数决定队列是否在RabbitMQ重启后自动恢复。
3、消息持久化:消息持久化依赖队列持久化,通过在创建消息时设置delivery_mode属性为2,可确保消息在磁盘中持久保存,即使队列消失,消息也不会丢失。精准ip查询源码
消息持久化与交换机、队列持久化不同之处在于,它侧重于消息的存储方式,确保消息在系统重启后仍然可用。
在配置客户端时,合理设置参数有助于提升开发效率,利用客户端对象的管理功能如索引、集群管理等,方便IDE搜索与管理。
总体而言,个人博客主页源码RabbitMQ的持久化机制通过在关键组件上启用持久性,确保了系统的稳定性和数据完整性,是构建可靠消息队列系统的重要保障。
详细讲解!RabbitMQ如何防止数据丢失,看这篇就够了
思维导图
一、分析数据丢失的原因
分析RabbitMQ消息丢失的情况,不妨先看看一条消息从生产者发送到消费者消费的过程:
可以看出,一条消息整个过程要经历两次的网络传输:从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者。网页QB跳转源码在消费者未消费前存储在队列(Queue)中。所以可以知道,有三个场景下是会发生消息丢失的:
针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。
二、消息持久化
RabbitMQ是支持消息持久化的,消息持久化需要设置:Exchange为持久化和Queue持久化,thinkphp掌上小说源码这样当消息发送到RabbitMQ服务器时,消息就会持久化。首先看Exchange交换机的类图:
看这个类图其实是要说明上一篇文章介绍的四种交换机都是AbstractExchange抽象类的子类,所以根据java的特性,创建子类的实例会先调用父类的构造器,父类也就是AbstractExchange的构造器是怎么样的呢?
从上面的注释可以看到durable参数表示是否持久化。默认是持久化(true)。创建持久化的Exchange可以这样写:
@Bean public DirectExchange rabbitmqDemoDirectExchange() { //Direct交换机 return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false); }
接着是Queue队列,我们先看看Queue的构造器是怎么样的:
也是通过durable参数设置是否持久化,默认是true。所以创建时可以不指定:
@Bean public Queue fanoutExchangeQueueA() { //只需要指定名称,默认是持久化的 return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A); }
这就完成了消息持久化的设置,接下来启动项目,发送几条消息,我们可以看到:
怎么证明是已经持久化了呢,实际上可以找到对应的文件:
找到对应磁盘中的目录:
消息持久化可以防止消息在RabbitMQ Server中不会因为宕机重启而丢失。
三、消息确认机制
3.1 confirm机制
在生产者发送到RabbitMQ Server时有可能因为网络问题导致投递失败,从而丢失数据。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解:
从上图中可以看到是通过两个回调函数confirm()、returnedMessage()进行通知。一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数confirm()。第二步从Exchange路由分配到Queue中,对应回调函数则是returnedMessage()。
代码怎么实现呢,请看演示:
首先在application.yml配置文件中加上如下配置:
spring: rabbitmq: publisher-confirms: true # publisher-returns: true template: mandatory: true # publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者 # publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息 # spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。
接着我们需要定义回调方法:
@Component public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class); /** * 监听消息是否到达Exchange * * @param correlationData 包含消息的唯一标识的对象 * @param ack true 标识 ack,false 标识 nack * @param cause nack 投递失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("消息投递成功~消息Id:{ }", correlationData.getId()); } else { logger.error("消息投递失败,Id:{ },错误提示:{ }", correlationData.getId(), cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息没有路由到队列,获得返回的消息"); Map map = byteToObject(message.getBody(), Map.class); logger.info("message body: { }", map == null ? "" : map.toString()); logger.info("replyCode: { }", replyCode); logger.info("replyText: { }", replyText); logger.info("exchange: { }", exchange); logger.info("routingKey: { }", exchange); logger.info("------------> end <------------"); } @SuppressWarnings("unchecked") private T byteToObject(byte[] bytes, Class clazz) { T t; try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bis)) { t = (T) ois.readObject(); } catch (Exception e) { e.printStackTrace(); return null; } return t; } }
...