当前位置: 首页 > news >正文

然后做服装网站建设银行网站背景图片

然后做服装网站,建设银行网站背景图片,如何做 旅游网站内容,上海市中小企业服务平台RabbitMQ之发布确认高级 一、发布确认 SpringBoot 版本1.1 确认机制方案1.2 代码架构图1.3 配置文件1.4 添加配置类1.5 消息生产者1.6 回调接口1.7 消息消费者1.8 结果分析 二、回退消息2.1 Mandatory 参数2.2 消息生产者代码2.3 回调接口2.4 结果分析 三、备份交换机3.1 代码架… RabbitMQ之发布确认高级 一、发布确认 SpringBoot 版本1.1 确认机制方案1.2 代码架构图1.3 配置文件1.4 添加配置类1.5 消息生产者1.6 回调接口1.7 消息消费者1.8 结果分析 二、回退消息2.1 Mandatory 参数2.2 消息生产者代码2.3 回调接口2.4 结果分析 三、备份交换机3.1 代码架构图3.2 修改配置类3.3 报警消费者3.4 测试注意事项3.5 结果分析 在生产环境中由于一些不明原因导致 rabbitmq 重启在 RabbitMQ 重启期间生产者消息投递失败导致消息丢失需要手动处理和恢复。于是我们开始思考如何才能进行 RabbitMQ 的消息可靠投递呢特别是在这样比较极端的情况RabbitMQ 集群不可用的时候无法投递的消息该如何处理呢 应 用 [xxx] 在 [08-1516:36:04] 发 生 [ 错 误 日 志 异 常 ] alertId[xxx] 。 由 [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620] 触 发 。 应用 xxx 可能原因如下 服 务 名 为 异 常 为 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620, 产 生 原 因 如 下 :1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn’t exist or the broker will not allow us to use it.||Consumer received fatalfalse exception on startup: 一、发布确认 SpringBoot 版本 1.1 确认机制方案 1.2 代码架构图 1.3 配置文件 在配置文件当中需要添加 spring.rabbitmq.publisher-confirm-typecorrelatedNONE 禁止发布确认模式是默认值CORRELATED 发布消息成功到交换机后会触发回调方法SIMPLE 经测试有两种效果其一效果和 CORRELATED 值一样会触发回调方法其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果根据返回结果来判定下一步的逻辑要注意的点是 watiForConfirmsOrDie 方法如果返回 false 则会关闭 channel则接下来无法发送消息到 broker。 spring.rabbitmq.host192.168.10.128 spring.rabbitmq.port5672 spring.rabbitmq.usernameadmin spring.rabbitmq.password123 spring.rabbitmq.publisher-confirm-typecorrelated1.4 添加配置类 Configuration public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;//声明业务 ExchangeBean(confirmExchange)public DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列Bean(confirmQueue)public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue) Queue queue, Qualifier(confirmExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(key1);} }1.5 消息生产者 RestController RequestMapping(/confirm) Slf4j public class Producer {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;Autowiredprivate RabbitTemplate rabbitTemplate;Autowiredprivate MyCallBack myCallBack;// 依赖注入 rabbitTemplate 之后再设置它的回调对象PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(myCallBack);}GetMapping(sendMessage/{message})public void sendMessage(PathVariable String message) {//指定消息 id 为 1CorrelationData correlationData1 new CorrelationData(1);String routingKey key1;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message routingKey, correl ationData1);CorrelationData correlationData2 new CorrelationData(2);routingKey key2;rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message routingKey, correl ationData2);log.info(发送消息内容:{}, message);} }1.6 回调接口 Component Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id correlationData ! null ? correlationData.getId() : ;if(ack) {log.info(交换机已经收到 id 为:{}的消息, id);} else {log.info(交换机还未收到 id 为:{}消息,由于原因:{}, id, cause);}} }1.7 消息消费者 Component Slf4j public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME confirm.queue;RabbitListener(queues CONFIRM_QUEUE_NAME)public void receiveMsg(Message message) {String msg newString(message.getBody());log.info(接受到队列 confirm.queue 消息:{}, msg);} }1.8 结果分析 可以看到发送了两条消息第一条消息的 RoutingKey 为“key1”第二条消息的 RoutingKey 为“key2”两条消息都成功被交换机接受也收到了交换机的确认回调但消费者只收到了一条消息因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致也没有其他队列能接受这个消息所以第二条消息被直接丢弃了。 交换机发出了确认回调但实际上队列没有收到消息。 二、回退消息 2.1 Mandatory 参数 在仅开启了生产者确认机制的情况下交换机接收到消息后会直接给消息生产者发送确认消息如果发现该消息不可路由那么消息会被直接丢弃此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下最起码通知我一声我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。 2.2 消息生产者代码 Slf4j Component public class MessageProducer implements RabbitTemplate.ConfirmCallBack,RabbitTemplate.ReturnCallback {Autowiredprivate RabbitTemplate rabbitTemplate;//rabbitTemplate 注入之后就设置该值PostConstructprivate void init() {rabbitTemplate.setConfirmCallback(this);/*** true* 交换机无法将消息进行路由时会将该消息返回给生产者* false* 如果发现消息无法进行路由则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(this);}GetMapping(sendMessage)public void sendMessage(String message) {//让消息绑定一个 id 值CorrelationData correlationData1 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange, key1, message key1, correlationData1);log.info(发送消息 id 为:{}内容为{}, correlationData1.getId(), message key1);CorrelationData correlationData2 new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(confirm.exchange, key2, message key2, correlationData2);log.info(发送消息 id 为:{}内容为{}, correlationData2.getId(), message key2);}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id correlationData ! null ? correlationData.getId() : ;if(ack) {log.info(交换机收到消息确认成功, id:{}, id);} else {log.error(消息 id:{}未成功投递到交换机,原因是:{}, id, cause);}}Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info(消息:{}被服务器退回退回原因:{}, 交换机是:{}, 路由 key:{}, new String(message.getBody()), replyText, exchange, routingKey);} }2.3 回调接口 Component Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id correlationData ! null ? correlationData.getId() : ;if(ack) {log.info(交换机已经收到 id 为:{}的消息, id);} else {log.info(交换机还未收到 id 为:{}消息,由于原因:{}, id, cause);}}//当消息无法路由的时候的回调方法Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.error( 消 息 {}, 被 交 换 机 {} 退 回 退 回 原 因 :{}, 路 由 key:{}, new String(message.getBody()), exchange, replyText, routingKey);} }2.4 结果分析 三、备份交换机 有了 mandatory 参数和回退消息我们获得了对无法投递消息的感知能力有机会在生产者的消息无法被投递时发现并处理。但有时候我们并不知道该如何处理这些无法路由的消息最多打个日志然后触发报警再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法特别是当生产者所在的服务有多台机器的时候手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者复杂性需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息又不想增加生产者的复杂性该怎么做呢   前面在设置私信队列的文章中我们提到可以为队列设置私信交换机来存储那些处理失败的消息可是这些不可路由的消息根本没机会进入到队列因此无法使用私信队列来保存消息。在 RabbitMQ 中有一种叫做备份交换机的机制存在可以很好的应对这个问题。什么是备份交换机呢备份交换机可以理解为 RabbitMQ 中交换机的“备胎”当我们为某一个交换机声明一个对应的备份交换机时就是为它创建一个备胎当交换机接收到一条不可路由消息时将会把这条消息转发到备份交换机中由备份交换机来进行转发和处理通常备份交换机的类型为 Fanout这样就能把素有消息都投递到与其绑定的队列中然后我们在备份交换机下绑定一个队列这样所有那些原交换机无法被路由的消息就会进入这个队列中了。当然我们还可以建立一个报警队列用独立的消费者进行检测和报警。 3.1 代码架构图 3.2 修改配置类 Configuration public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME confirm.exchange;public static final String CONFIRM_QUEUE_NAME confirm.queue;public static final String BACKUP_EXCHANGE_NAME backup.exchange;public static final String BACKUP_QUEUE_NAME backup.queue;public static final String WARNING_QUEUE_NAME warning.queue;// 声明确认队列Bean(confirmQueue)public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系Beanpublic Binding queueBinding(Qualifier(confirmQueue) Queue queue, Qualifier(confirmExchange) DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(key1);}//声明备份 ExchangeBean(backupExchange)public FanoutExchange backupExchange() {return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机Bean(confirmExchange)public DirectExchange confirmExchange() {ExchangeBuilder exchangeBuilder ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument(alternate-exchange, BACKUP_EXCHANGE_NAME);return(DirectExchange) exchangeBuilder.build();}// 声明警告队列Bean(warningQueue)public Queue warningQueue() {return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系Beanpublic Binding warningBinding(Qualifier(warningQueue) Queue queue, Qualifier(backupExchange) FanoutExchange backupExchange) {return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列Bean(backQueue)public Queue backQueue() {return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系Beanpublic Binding backupBinding(Qualifier(backQueue) Queue queue, Qualifier(backupExchange) FanoutExchange backupExchange) {return BindingBuilder.bind(queue).to(backupExchange);} }3.3 报警消费者 Component Slf4j public class WarningConsumer {public static final String WARNING_QUEUE_NAME warning.queue;RabbitListener(queues WARNING_QUEUE_NAME)public void receiveWarningMsg(Message message) {String msg new String(message.getBody());log.error(报警发现不可路由消息{}, msg);} }3.4 测试注意事项 重新启动项目的时候需要把原来的 cofirm.exchange 删除因为我们修改了其绑定属性不然报错 3.5 结果分析 mandatory 参数与备份交换机可以一起使用的时候如果两者同时开启消息究竟何去何从谁优先级高经过上面结果显示答案是备份交换机优先级高.
http://www.yingshimen.cn/news/29031/

相关文章:

  • 怎么给公司免费做网站有免费建站的网站吗
  • 彩网站开发浙江省网站重点学科建设
  • 建设网站教程2016企业小程序怎么申请注册
  • 如何编写网站手机网银怎么开通
  • php网站如何做多语言深圳企业网站制作制作
  • 常州网站建设维护太原心诺做网站
  • 东莞商城网站建设价格qq企业邮箱格式
  • 温州专业微网站制作网络公司东莞网络优化专业乐云seo
  • 怎么做企业网站仿站如何建立一个好的网站
  • 什么网站可以赚钱啊网站服务器共享的 vps
  • 公司的英文网站云南公司建网站多少钱
  • 廊坊开发区规划建设局网站网站建设公司如何收费
  • 营销型网站设计文章wordpress下载整站源码
  • 网站开发职业技能简历网站建设特色
  • 软件下载网站哪个好企业网站手机端开发
  • 武城网站建设价格学做网站推广要多久时间
  • 返利网站建设网站开发完整的解决方案
  • 购买空间后怎么上传网站建培网
  • 做漫画封面的网站如何自己创造一个网站平台
  • 免费网页设计制作网站上海华东建设发展设计有限公司网站
  • 做内部网站费用建e网室内设计网网址
  • 阿里巴巴网站导航怎么做微信怎么开发小程序
  • 网站的代运营网络图片素材
  • 网站营运遵义网站建设遵义
  • 云虚拟主机怎么做网站购物网站建设开发费用分析
  • 微网站建设教学我想建立个网站数据怎么办
  • 截屏的图片wordpress不能显示网站优化排名
  • 大汉网站开发免费的个人网站html代码
  • 国内个人网站欣赏找人做销售网站
  • 济南建网站价格消费品展泉州企业建站模板