网站建设与管理试卷A,网络公司怎么挣钱的,WordPress实现扫码登录,徐州市徐州市城乡建设局网站前言
在当今快速发展的软件开发领域#xff0c;构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架#xff0c;凭借其强大的自动化配置和便捷的开发特性#xff0c;极大地简化了项目搭建过程。使用 Spring Boot#xff0c;我们无需再…前言
在当今快速发展的软件开发领域构建高效、稳定的应用系统是每个开发者的追求。Spring Boot 作为一款极具影响力的开发框架凭借其强大的自动化配置和便捷的开发特性极大地简化了项目搭建过程。使用 Spring Boot我们无需再为框架之间的兼容性、适用版本等繁杂问题而烦恼。只需简单添加一个配置就能轻松引入所需的各种功能和组件实现快速开发。
一、技术介绍
1.1 消息队列
消息队列中间件是分布式系统中不可或缺的重要组件。在分布式系统中各个服务之间相互独立又紧密协作消息队列就像是一座桥梁连接着不同的服务实现它们之间的高效通信。其主要功能在于解决应用耦合、异步消息处理以及流量削峰等关键问题。
1.2 RocketMQ
RocketMQ 是一款基于队列模型的消息中间件具有高性能、高可靠、高实时和分布式的显著特点。它采用 Java 语言开发由阿里巴巴团队开发并开源。RocketMQ 的高性能体现在其能够支持高并发的消息读写操作满足大规模分布式系统的性能需求。
二、使用步骤
2.1 引入 Maven 依赖
首先需要在项目的pom.xml文件中引入相关的 Maven 依赖。可以直接复制都是通用的
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.4.1/versionrelativePath/
/parent
dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.1.1/version/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.8.0/version/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-common/artifactIdversion4.8.0/version/dependency
/dependencies
引入了rocketmq-spring-boot-starter依赖它提供了 Spring Boot 与 RocketMQ 集成的相关功能。同时还引入了rocketmq-client和rocketmq-common依赖它们分别包含了 RocketMQ 客户端的核心功能和公共工具类。
2.2 封装 RocketMQ 工具类
为了更方便地在项目中使用 RocketMQ 发送消息作者封装了一个 RocketMQ 工具类将常用的消息发送操作封装起来。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;Component
public class RocketMqHelper {private static final Logger LOG LoggerFactory.getLogger(RocketMqHelper.class);/*** rocketmq模板注入*/Autowiredprivate RocketMQTemplate rocketMQTemplate;PostConstructpublic void init() {LOG.info(---RocketMq助手初始化---);}/*** 发送异步消息的基础方法** param topic 消息Topic* param message 消息实体* param sendCallback 回调函数* param timeout 超时时间* param delayLevel 延迟消息的级别*/private void asyncSendBase(String topic, Message? message, SendCallback sendCallback, long timeout, int delayLevel) {if (topic null || message null) {LOG.error(发送异步消息时topic或message不能为空);return;}if (sendCallback null) {sendCallback getDefaultSendCallBack();}rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);}/*** 发送异步消息** param topic 消息Topic* param message 消息实体*/public void asyncSend(Enum topic, Message? message) {asyncSend(topic.name(), message, null, 0, 0);}/*** 发送异步消息** param topic 消息Topic* param message 消息实体* param sendCallback 回调函数*/public void asyncSend(Enum topic, Message? message, SendCallback sendCallback) {asyncSend(topic.name(), message, sendCallback, 0, 0);}/*** 发送异步消息** param topic 消息Topic* param message 消息实体*/public void asyncSend(String topic, Message? message) {asyncSend(topic, message, null, 0, 0);}/*** 发送异步消息** param topic 消息Topic* param message 消息实体* param sendCallback 回调函数*/public void asyncSend(String topic, Message? message, SendCallback sendCallback) {asyncSend(topic, message, sendCallback, 0, 0);}/*** 发送异步消息** param topic 消息Topic* param message 消息实体* param sendCallback 回调函数* param timeout 超时时间*/public void asyncSend(String topic, Message? message, SendCallback sendCallback, long timeout) {asyncSend(topic, message, sendCallback, timeout, 0);}/*** 发送异步消息** param topic 消息Topic* param message 消息实体* param sendCallback 回调函数* param timeout 超时时间* param delayLevel 延迟消息的级别*/public void asyncSend(String topic, Message? message, SendCallback sendCallback, long timeout, int delayLevel) {asyncSendBase(topic, message, sendCallback, timeout, delayLevel);}/*** 发送顺序消息的基础方法** param topic 消息Topic* param message 消息实体* param hashKey 哈希键* param timeout 超时时间*/private void syncSendOrderlyBase(String topic, Message? message, String hashKey, long timeout) {if (topic null || message null || hashKey null) {LOG.error(发送顺序消息时topic、message或hashKey不能为空);return;}LOG.info(发送顺序消息topic:{}, hashKey:{}, timeout:{}, topic, hashKey, timeout);rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);}/*** 发送顺序消息** param topic 消息Topic* param message 消息实体* param hashKey 哈希键*/public void syncSendOrderly(Enum topic, Message? message, String hashKey) {syncSendOrderly(topic.name(), message, hashKey, 0);}/*** 发送顺序消息** param topic 消息Topic* param message 消息实体* param hashKey 哈希键*/public void syncSendOrderly(String topic, Message? message, String hashKey) {syncSendOrderly(topic, message, hashKey, 0);}/*** 发送顺序消息** param topic 消息Topic* param message 消息实体* param hashKey 哈希键* param timeout 超时时间*/public void syncSendOrderly(String topic, Message? message, String hashKey, long timeout) {syncSendOrderlyBase(topic, message, hashKey, timeout);}/*** 默认CallBack函数** return*/private SendCallback getDefaultSendCallBack() {return new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {LOG.info(---发送MQ成功---);}Overridepublic void onException(Throwable throwable) {LOG.error(---发送MQ失败---, throwable);}};}PreDestroypublic void destroy() {LOG.info(---RocketMq助手注销---);}
}
2.3 配置文件
在application.yml配置文件中配置 RocketMQ 的相关参数包括 Name Server 地址、生产者配置等配置信息大部分也是通用的端口号可以自己适配。
server:port: 8088
#rocketmq配置
rocketmq:name-server: 127.0.0.1:9876# 生产者配置 producer:isOnOff: on# 发送同一类消息的设置为同一个group保证唯一group: rocketmq-groupgroupName: rocketmq-group# 服务地址namesrvAddr: 127.0.0.1:9876# 消息最大长度 默认1024*4(4M)maxMessageSize: 4096# 发送消息超时时间,默认3000sendMsgTimeout: 3000# 发送消息失败重试次数默认2retryTimesWhenSendFailed: 2
2.4 单元测试
为了验证 RocketMQ 的集成是否成功编写单元测试代码发送消息并监听消息的接收情况。
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;SpringBootTest
public class RocketMQTest {Autowiredprivate RocketMqHelper rocketMqHelper;Testpublic void testProducter() {Student stu new Student();stu.setName(abc);stu.setScore(19);rocketMqHelper.asyncSend(STUDENT_ADD, MessageBuilder.withPayload(stu).build());}
}
异步发送student对象给RocketMQ的broker
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;Component
RocketMQMessageListener(consumerGroup ${rocketmq.producer.groupName}, topic STUDENT_ADD)
public class PersonMqListener implements RocketMQListenerPerson {Overridepublic void onMessage(Student stu) {System.out.println(接收到消息开始消费..name: stu.getName() ,age: stu.getAge());}
}
消费逻辑是重写RocketMQlistener的父类方法就行