网站设计师发展前景,网站建设包含,网站自定义链接怎么做,网站建设模板之家免费下载什么是顺序消费 例如#xff1a;业务上产生者发送三条消息#xff0c; 分别是对同一条数据的增加、修改、删除操作#xff0c; 如果没有保证顺序消费#xff0c;执行顺序可能变成删除、修改、增加#xff0c;这就乱了。 如何保证顺序性 一般我们讨论如何保证消息的顺序性业务上产生者发送三条消息 分别是对同一条数据的增加、修改、删除操作 如果没有保证顺序消费执行顺序可能变成删除、修改、增加这就乱了。 如何保证顺序性 一般我们讨论如何保证消息的顺序性会从下面三个方面考虑 1发送消息的顺序 2队列中消息的顺序 3消费消息的顺序 发送消息的顺序 消息发送端的顺序大部分业务不做要求谁先发消息无所谓如果遇到业务一定要发送消息也确保顺序那意味着只能全局加锁一个个的操作一个个的发消息不能并发发送消息。
队列中消息的顺序 RabbitMQ 中消息最终会保存在队列中在同一个队列中消息是顺序的先进先出原则这个由 RabbitMQ 保证通常也不需要开发关心。
不同队列 中的消息顺序是没有保证的例如进地铁站的时候排了三个队伍不同队伍之间的不能确保谁先进站。
消费消息的顺序 我们说如何保证消息顺序性通常说的就是消费者消费消息的顺序在多个消费者消费同一个消息队列的场景通常是无法保证消息顺序的
虽然消息队列的消息是顺序的但是多个消费者并发消费消息获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。 例如消息A、B、C按顺序进入队列消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快就跑完了又或者消费者A1挂了都会导致消息顺序不一致。 解决消费顺序的问题 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理 缺点就是并发能力下降了无法并发消费消息这是个取舍问题。
如果业务又要顺序消费又要增加并发通常思路就是开启多个队列业务根据规则将消息分发到不同的队列通过增加队列的数量来提高并发度例如电商订单场景只需要保证同一个用户的订单消息的顺序性就行不同用户之间没有关系所以只要让同一个用户的订单消息进入同一个队列就行其他用户的订单消息可以进入不同的队列。以下为代码设计过程实现 首先我们必须保证只有一个消费者 那么问题就来了我们的项目一般是多副本的如何保证只有一个副本在消费呢 这时就会用到消费者 单活模式 x-single-active-consumer 使用下述配置实现 private Queue creatQueue(String name){// 创建一个 单活模式 队列HashMapString, Object argsnew HashMap();args.put(x-single-active-consumer,true);return new Queue(name,true,false,false,args);}创建之后我们可以在控制台看到 消费者的激活状态
配置类
Configuration
SuppressWarnings(all)
public class DirectExchangeConfiguration {Beanpublic Queue queue15_0() {return creatQueue(Message15.QUEUE_0);}Beanpublic Queue queue15_1() {return creatQueue(Message15.QUEUE_1);}Beanpublic Queue queue15_2() {return creatQueue(Message15.QUEUE_2);}Beanpublic Queue queue15_3() {return creatQueue(Message15.QUEUE_3);}Beanpublic DirectExchange exchange15() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message15.EXCHANGE, true, false);}Beanpublic Binding binding15_0() {return BindingBuilder.bind(queue15_0()).to(exchange15()).with(0);}Beanpublic Binding binding15_1() {return BindingBuilder.bind(queue15_1()).to(exchange15()).with(1);}Beanpublic Binding binding15_2() {return BindingBuilder.bind(queue15_2()).to(exchange15()).with(2);}Beanpublic Binding binding15_3() {return BindingBuilder.bind(queue15_3()).to(exchange15()).with(3);}/*** 创建一个 单活 模式的队列* 注意 * p* 如果一个队列已经创建为非x-single-active-consumer而你想更改其为x-single-active-consumer要把之前创建的队列删除** param name* return queue*/private Queue creatQueue(String name) {// 创建一个 单活模式 队列HashMapString, Object args new HashMap();args.put(x-single-active-consumer, true);return new Queue(name, true, false, false, args);}
》生产者
Component
Slf4j
public class Producer15 {Resourceprivate RabbitTemplate rabbitTemplate;/*** 这里的发送是 拟投递到多个队列中** param id 业务id* param msg 业务信息*/public void syncSend(int id, String msg) {Message15 message new Message15(id, msg);rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);}/*** 根据 id 取余来决定丢到那个队列中去** param id id* return routingKey*/private String getRoutingKey(int id) {return String.valueOf(id % Message15.QUEUE_COUNT);}
}
》消费者
/*** 要想保证消息的顺序每个队列只能有一个消费者** author 深漂码农明哥* date 2024-03-18*/
Component
RabbitListener(queues Message15.QUEUE_0)
RabbitListener(queues Message15.QUEUE_1)
RabbitListener(queues Message15.QUEUE_2)
RabbitListener(queues Message15.QUEUE_3)
Slf4j
public class Consumer15 {RabbitHandlerpublic void onMessage(Message15 message) throws InterruptedException {log.info([{}][Consumer15 onMessage][线程编号:{} 消息内容{}], LocalDateTime.now(), Thread.currentThread().getId(), message);// 这里随机睡一会模拟业务处理时候的耗时long l new Random(1000).nextLong();TimeUnit.MILLISECONDS.sleep(l);}
}
》测试类
Testvoid mock() throws InterruptedException {// 先启动这个测试类模拟多个副本情况下看如何消费new CountDownLatch(1).await();}Testvoid syncSend() throws InterruptedException {// 模拟每个队列中扔 10 个数据看看效果for (int i 0; i 10; i) {for (int j 0; j 4; j) {producer15.syncSend(j, 编号 j 第 i 条消息);}}TimeUnit.SECONDS.sleep(20);}
}
ps测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法模拟多个副本同时消费观察是否可以 以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接点我—let’s go 若需完整代码 可识别二维码后 给您发代码。