公司做网站自己注册域名,大型门户网站建设推广,wordpress能干嘛,工作总结开头和结束语概述 作为消息中间件来说#xff0c;最重要的任务就是收发消息。因此我们在收发消息的过程中#xff0c;就要考虑消息是否会丢失的问题。结果是必然的#xff0c;假设我们没有采取任何措施#xff0c;那么消息一定会丢失。对于一些不那么重要的业务来说#xff0c;消息丢失…概述 作为消息中间件来说最重要的任务就是收发消息。因此我们在收发消息的过程中就要考虑消息是否会丢失的问题。结果是必然的假设我们没有采取任何措施那么消息一定会丢失。对于一些不那么重要的业务来说消息丢失几条是无所谓的例如使用消息中间件来做一个注册成功的业务那丢失几条是无所谓的但是如果是一些比较重要的业务来说消息一条也不能丢失。所以我们就要考虑消息是在哪个阶段丢失的应如何避免消息丢失。
如上图消息丢失大概分为三种情况
1. 生产者到Broker的过程产生问题由于应用程序故障、网络抖动等原因生产者并没有成功向Broker发送消息。
2. Broker本身的问题生产者成功将消息发送给了Broker但是Broker没有把消息保存好导致消息丢失。
3. Broker到消费者的过程产生问题Broker发送消息到消费者消费者在消费消息时由于没有处理好导致Broker将消费失败的消息从队列中删除了。
RabbitMQ针对这三种消息可能丢失的情形进行考虑做出了不同的应对
1. 针对生产者到Broker的问题RabbitMQ推出了发送方确认机制或者说是发布确认模式。
2. 针对Broker自身的问题RabbitMQ推出了持久化的机制例如针对交换机、队列以及消息的持久化。
3. 针对Broker到消费者的问题RabbitMQ推出了消息确认机制。
发送方确认
当消息从生产者发送出去之后消息有没有成功的到达Broker之中这是第一个消息可能丢失的情况。并且由于Broker内部也分成了Exchange和Queue两部分即使消息成功到达了交换机但是有没有到达队列之中这也是需要考虑的点。
对于该问题RabbitMQ提出了两种解决方案
事务发送方确认机制
在该篇文章中主要来介绍发送方确认机制。原因则是因为使用事务比较消耗性能因此日常开发中使用的并不多。针对刚才提到了交换机和队列之间的问题RabbitMQ也是全部考虑到了所以有两个方式来控制消息的可靠性传递
confirm确认模式return退回模式 confirm确认模式
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testpublisher-confirm-type: correlated # 表示发送方确认模式的confirm机制correlated表示异步确认还有一种同步确认不管同步确认可能造成阻塞
// 可靠性传输的发送方确认
Configuration
public class ConfirmConfig {Bean(confirmQueue)public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}Bean(confirmExchange)public Exchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}Bean(confirmQueueBind)public Binding confirmQueueBind(Qualifier(confirmExchange) Exchange exchange,Qualifier(confirmQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(confirm).noargs();}}
RestController
RequestMapping(/confirm)
public class ConfirmController {Resourceprivate RabbitTemplate rabbitTemplate;RequestMappingpublic void confirmQueue() {// 异步调用方法this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(执行了消息确认机制中的confirm机制);if(b) {System.out.println(交换机接收到了消息消息id为 correlationData.getId());} else {System.out.println(交换机没有接收到消息原因为 s);System.out.println(处理具体业务选择重发或者其他);}}});// 定义一个全局id区分不同消息防止ack时出现错误String uuid UUID.randomUUID().toString().replace(-, );CorrelationData correlationData new CorrelationData(uuid);this.rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,confirm, hello 发送确认模式, correlationData);}}
Configuration
public class ConfirmListener {RabbitListener(queues Constants.CONFIRM_QUEUE)public void confirmListener(String msg) {System.out.println(接收到消息 msg);}} 启动程序之后当生产者发送消息之后就会出现如下内容 但是如果再次发送消息就会直接报错 原因是因为Spring的Bean默认是单例而RabbitTemplate对象同样支持一个回调所以就出现了如上错误。想要解决上述办法的话可以将Bean的作用域设置成多例模式。
Component
public class RabbitTemplateConfig {Bean// 这样做是有问题的但是并不知道问题出在哪里后续进行解决
// Scope(value prototype, proxyMode ScopedProxyMode.TARGET_CLASS)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}} 由于使用上述方法并没有跑通程序因此我又使用了如下方法
Component
public class RabbitTemplateConfig {Bean(rabbitTemplate)// 这样做是有问题的但是并不知道问题出在哪里后续进行解决
// Scope(value prototype, proxyMode ScopedProxyMode.TARGET_CLASS)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}Bean(rabbitTemplateConfirm)public RabbitTemplate rabbitTemplateConfirm(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 执行回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(执行了消息确认机制中的confirm机制);if(b) {System.out.println(交换机接收到了消息消息id为 correlationData.getId());} else {System.out.println(交换机没有接收到消息原因为 s);System.out.println(处理具体业务选择重发或者其他);}}});return rabbitTemplate;}}
RestController
RequestMapping(/confirm)
public class ConfirmController {Resource(name rabbitTemplateConfirm)private RabbitTemplate rabbitTemplate;RequestMappingpublic void confirmQueue() {// 定义一个全局id区分不同消息防止ack时出现错误String uuid UUID.randomUUID().toString().replace(-, );CorrelationData correlationData new CorrelationData(uuid);this.rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,confirm, hello 发送确认模式, correlationData);}}
使用如上代码之后成功跑通程序获取到如下结果 在confirm模式中只要是消息没有到达交换机那就会出现错误结果。但是如果正确到达交换机即使没有正确到达队列回调函数也会成功执行到正确结果那部分。测试如下
1. 当给出交换机的名称错误时不会到达交换机返回错误结果 交换机名称故意写错原本是confirm.exchange 2. 交换机正确但是给出的路由键错误导致到不了相应的交换机 路由键故意写错原本是confirm 在上述两个例子中证明了即使使用了confirm确认模式消息也有可能在从交换机到队列的过程中出错。因此我们也需要在交换机和队列的传输过程中增加一个保障那就是return退回模式。
return退回模式
如上描述消息到达交换机之后会根据路由规则进行匹配把消息放到队列中。交换机到队列的过程如果消息无法被任何队列消费可能是队列不存在也可能是路由键没有匹配的队列可以选择把消息退回给发送方。
当我们写一个回调方法对消息进行处理就是return退回模式。
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testpublisher-confirm-type: correlated # 表示发送方确认模式的confirm机制correlated表示异步确认还有一种同步确认不管同步确认可能造成阻塞publisher-returns: true # 表示发送方确认模式的return模式true表示开启template:mandatory: true # true表示交换机无法进行路由消息时会将消息返回给生产者false表示无法进行路由时直接丢弃
Component
public class RabbitTemplateConfig {Bean(rabbitTemplate)// 这样做是有问题的但是并不知道问题出在哪里后续进行解决
// Scope(value prototype, proxyMode ScopedProxyMode.TARGET_CLASS)public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}Bean(rabbitTemplateConfirm)public RabbitTemplate rabbitTemplateConfirm(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 执行confirm机制回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println(执行了消息确认机制中的confirm机制);if(b) {System.out.println(交换机接收到了消息消息id为 correlationData.getId());} else {System.out.println(交换机没有接收到消息原因为 s);System.out.println(处理具体业务选择重发或者其他);}}});// 执行了return机制回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println(执行了消息确认机制中的return模式);System.out.println(消息进行退回退回的消息是 new String(returnedMessage.getMessage().getBody()));}});return rabbitTemplate;}}
将发送方确认机制的两种模式完成之后再写一个交换机正确路由键不正确的例子就会出现如下结果 综上所述将生产者到Broker的消息丢失问题给解决了。总的来说包含了两个维度一个是从生产者到交换机一个是从交换机到队列。
持久化
持久化是RabbitMQ的可靠性保证机制之一它保证的是RabbitMQ内部的可靠性。
持久化分为三个部分交换机的持久化、队列的持久化、消息的持久化。
交换机的持久化
其实交换机的持久化早就在使用了只不过是没有进行介绍而已。在JavaSDK中通过声明交换机时的一个参数来实现在SpringBoot中也是通过声明交换机时的一个参数来实现durable。交换机设置成持久化之后交换机的属性就会在服务器内部保存当MQ的服务器发生意外宕机之后重启服务器后不需要重新去建立交换机持久化后的交换机会自动建立。
如果交换机不设置持久化那么MQ服务器在重启之后相关的交换机元数据就会消失。因此对于一个长期使用的交换机来说必然是要将其设置成持久化的。 队列的持久化
和交换机相同队列的持久化也早就在使用了。同样也是通过设置参数durable实现的。
不同的是队列的持久化比交换机的持久化还稍微重要些。设想队列不进行持久化当重启队列之后队列元数据就会被删除。既然队列都被删除了那消息肯定也都没了。这对于生产环境的机子来说是比较难搞的事情因此队列要进行持久化。 消息的持久化
队列持久化之后消息不进行持久化那重启之后照样没数据还不如不持久化队列呢。所以继交换机持久化、队列持久化之后消息也要进行持久化。
在SpringBoot中消息的持久化就是设置MessageProperties中的deliveyMode为PERSISTENT即可如下述代码 Configuration
public class DurableConfig {Bean(durableQueue)public Queue durableQueue() {return QueueBuilder.durable(Constants.DURABLE_QUEUE).build(); // 队列持久化}Bean(durableExchange)public Exchange durableExchange() {return ExchangeBuilder.directExchange(Constants.DURABLE_EXCHANGE).durable(true).build(); // 交换机持久化}Bean(durableQueueBind)public Binding durableQueueBind(Qualifier(durableExchange) Exchange exchange,Qualifier(durableQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(durable).noargs();}}
RestController
RequestMapping(/durable)
public class DurableController {Resourcepublic RabbitTemplate rabbitTemplate;RequestMappingpublic void durableQueue() {String body hello 持久化;Message msg new Message(body.getBytes(StandardCharsets.UTF_8), new MessageProperties());msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化this.rabbitTemplate.convertAndSend(Constants.DURABLE_EXCHANGE, durable, msg);System.out.println(持久化发送消息成功);}}
Configuration
public class DurableLister {RabbitListener(queues Constants.DURABLE_QUEUE)public void durableListener(String msg) {System.out.println(持久化消息消费成功 msg);}}
消息确认
消息确认机制是保证可靠性传输的最后一个机制保障的是从Broker到消费者的整个过程。
RabbitMQ向消费者发送消息之后就会把这条消息删除。但是如果消费者处理消息异常就会造成消息丢失。为了保障消息从队列顺利到达消费者RabbitMQ提出了消息确认机制。
消费者在订阅队列是可以指定autoAck参数根据这个参数设置消息确认机制可以分成以下两种
手动确认当autoAck等于false时RabbitMQ会等待消费者显示地调用Basic.Ack命令恢复确认信号后才能从内存或者磁盘中删除消息。这种模式适合于消息可靠性要求较高的场景。自动确认当autoAck等于true时RabbitMQ会自动把发送出去的消息设置为确认然后从内存或者硬盘中删除消息而不管消费者是否真正消费了这条消息。这种模式适合于消息可靠性要求不高的场景。
当消费者指定需要手动确认时队列中的消息就分成了两个部分
等待投递给消费者的消息已经投递给消费者但是还没有收到消费者确认信号的消息
如果RabbitMQ一直没有收到消费者的确认信号并且消费此消息的消费者已经断开连接则RabbitMQ会安排该消息重新进入队列等待投递给下一个消费者当然也有可能还是原来那个消费者。 手动确认
消费者在收到消息之后可以选择确认也可以选择直接拒绝或者跳过。RabbitMQ也提供了不同的确认应答方式消费者可以调用与其对应的channel相关方法共有三种
肯定确认
channel,basiAck(long deliveryTag, boolean multiple)表示RabbitMQ已经知道该消息处理成功可以将其丢弃了。
deliveryTag表示消息的唯一标识是一个单调递增的64位的长整型值。deliveryTag是每个信道独立维护的所以在每个信道上都是唯一的。当消费者确认一条消息时必须使用对应的信道上进行确认。
multiple表示是否批量确认在某些情况下为了减少网络流量可以对一系列的deliveryTag进行批量确认。值为true则会一次性确认所有小于或等于指定deliveryTag的消息。值为false时则只会确认当前指定的deliveryTag的消息。 deliveryTag是RabbitMQ中消息确认机制的一个重要组成部分他确保了消息传递的可靠性和顺序性。 否定确认
channle.basicReject(long deliveryTag, boolean requeue)消费者可以调用该消息告诉RabbbitMQ拒绝该消息。
requeue表示拒绝后这条消息如何处理。值为true时会重新将该消息存入队列以便可以发送给下一个订阅的消费者。值为false时则会把消息从队列中删除而不会把他发送给新的消费者。
否定确认
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)该方法和第二个方法含义相同唯一的区别是该方法可以批量处理。
代码案例
主要展示SpringBoot模式下的代码书写。
Spring-AMQP对于消息确认机制提供了三种策略
AcknowledgeMode.NONE表示消息一旦投递给消费者不管消费者是否处理成功该消息RabbitMQ都会自动确认从队列中移除该消息。如果消费者处理消息失败消息可能会丢失。AcknowledgeMode.AUTO默认表示消息投递给消费者如果处理过程中抛出了异常则不会确认该消息但是如果没有发生异常该消息就会自动确认。AcknowledgeMode.MANUAL表示手动确认模式消费者必须在成功处理消息之后调用basicAck来确认消息。如果消息未被确认RabbitMQ会认为消息未处理成功并且会在消费者可用时重新投递该消息这种模式提高了消息处理的可靠性因为即使消费者处理消息后失败消息也不会丢失而是可以被重新处理。
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testpublisher-confirm-type: correlated # 表示发送方确认模式的confirm机制correlated表示异步确认还有一种同步确认不管同步确认可能造成阻塞publisher-returns: true # 表示发送方确认模式的return模式true表示开启template:mandatory: true # true表示交换机无法进行路由消息时会将消息返回给生产者false表示无法进行路由时直接丢弃listener:simple:acknowledge-mode: auto # 消息确认机制三种策略可选
Configuration
public class AckConfig {Bean(ackQueue)public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}Bean(ackExchange)public Exchange ackExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}Bean(ackQueueBind)public Binding ackQueueBind(Qualifier(ackExchange) Exchange exchange,Qualifier(ackQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ack).noargs();}}
RestController
RequestMapping(/ack)
public class AckController {Resourcepublic RabbitTemplate rabbitTemplate;RequestMappingpublic void ackQueue() {this.rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, ack, hello ack);System.out.println(消息确认机制生产者发送成功);}}
当使用策略为NONE时发现即使出现异常消息也会从消费者中删除
Configuration
RabbitListener(queues Constants.ACK_QUEUE) // 当类中所有的消费者都指向一个队列时就可以放在类上
public class AckListener {RabbitHandler // RabbitListener注解放在类上时就需要使用该注解放在方法上public void ackListener(String msg) {System.out.println(接收到消息 msg);int a 3 / 0;System.out.println(自制异常用来感受auto);}} 当使用策略为AUTO时发现出现异常之后会一直重试并且在开源界面中也会出现未确认消息一条
Configuration
RabbitListener(queues Constants.ACK_QUEUE) // 当类中所有的消费者都指向一个队列时就可以放在类上
public class AckListener {RabbitHandler // RabbitListener注解放在类上时就需要使用该注解放在方法上public void ackListener(String msg) {System.out.println(接收到消息 msg);int a 3 / 0;System.out.println(自制异常用来感受auto);}} 当使用策略为MANUAL就需要在消费者中修改代码
Configuration
RabbitListener(queues Constants.ACK_QUEUE) // 当类中所有的消费者都指向一个队列时就可以放在类上
public class AckListener {RabbitHandler // RabbitListener注解放在类上时就需要使用该注解放在方法上public void ackListener(Message msg, Channel channel) throws IOException {try {System.out.println(接收到消息为 msg);/*** 第一个参数是deliveryTag* 第二个参数是是否批量处理*/channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {/*** 第一个参数是deliveryTag* 第二个参数是requeue*/channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);/*** 第一个参数是deliveryTag* 第二个参数是是否批量处理* 第三个参数是requeue*/// channel.basicNack(msg.getMessageProperties().getDeliveryTag(), true, true);}}}
总结
在该篇文章中主要描述了RabbitMQ的保障可靠性传输的三个策略这三个策略保障了消息从生产者产生到消费者消费整个过程的可靠性使得RabbitMQ的性能更好。但是并不能完全保障消息不丢失或者说没有一个消息队列可以保障消息不丢失例如持久化时是不会直接持久化到硬盘而是持久化到缓存中经过几条消息的沉淀之后再持久化到硬盘所以在这个过程中一点宕机那么消息也是会丢失的。总的来说这三条策略还是尽可能的保障了消息传输的可靠性。