在 Spring Boot 应用中,通过 Apache RocketMQ 官方提供的 spring-boot-starter 组件,可极简集成 RocketMQ 服务,实现消息的异步发送与监听接收
删除之前的 rocketmq-client-java 依赖,替换如下依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.5</version>
</dependency>由于 rocketmq-spring-boot-starter 暂不支持 RocketMQ 的 Proxy 模式,需切换为 传统 Broker 直连模式,重新配置 Docker Compose 与自定义 Broker 配置。
新建目录 ./conf,并创建 broker.conf 配置文件,必须填写宿主机 IP 保证外部应用可正常连接。
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 填写本机宿主机 IP,用于外部 Spring Boot 应用连接
brokerIP1 = 192.168.203.200services:
namesrv:
image: apache/rocketmq:5.3.2
container_name: rmqnamesrv
ports:
- 9876:9876
networks:
- rocketmq
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.3.2
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
# 挂载自定义配置文件
volumes:
- ./conf/broker.conf:/opt/rocketmq-5.3.2/conf/broker.conf
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker -c /opt/rocketmq-5.3.2/conf/broker.conf
dashboard:
image: apacherocketmq/rocketmq-dashboard:2.1.0
container_name: rmqdashboard
ports:
- 9090:8082
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Drocketmq.config.loginRequired=false
depends_on:
- namesrv
networks:
- rocketmq
restart: always
networks:
rocketmq:
driver: bridge执行命令后台启动完整集群
docker compose up -d登录 RocketMQ Dashboard 校验配置,确认集群中 Broker 地址为宿主机 IP

生产者 sca-message-producer,实现消息发送功能
修改之前的 sca-message 模块的 pom.xml 配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lusifer</groupId>
<artifactId>sca</artifactId>
<version>${revision}</version>
</parent>
<artifactId>sca-message</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>消息模块</description>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<modules>
<module>sca-message-producer</module>
<module>sca-message-consumer</module>
</modules>
</project>application.yml 配置 Name Server 地址与生产者核心参数:
rocketmq:
# Name Server 地址,多地址使用英文逗号分隔
name-server: 192.168.203.200:9876
producer:
# 生产者分组名称
group: my-producer-group
# 发送消息超时时间,单位 毫秒
send-message-timeout: 3000package com.lusifer.sca.message.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MQProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MQProducerApplication.class, args);
}
}使用 JUnit 5 编写测试类,通过 RocketMQTemplate 发送普通消息:
package com.lusifer.sca.message.producer.test;
import jakarta.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class MessageProducerTests {
@Resource private RocketMQTemplate rocketMQTemplate;
/** 发送普通文本消息 */
@Test
public void testSendMessage() {
// 参数1:Topic 名称;参数2:消息体内容
rocketMQTemplate.convertAndSend("test-topic", "Hello RocketMQ!");
System.out.println("消息发送成功");
}
}运行测试方法,验证消息发送流程正常。

消费者 sca-message-consumer,实现消息监听与业务消费
rocketmq:
# Name Server 地址,与生产者保持一致
name-server: 192.168.203.200:9876
consumer:
# 消费者分组名称
group: my-consumer-grouppackage com.lusifer.sca.message.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MQConsumerApplication.class, args);
}
}通过 @RocketMQMessageListener 注解绑定 Topic,实现自动消息消费:
package com.lusifer.sca.message.consumer.service;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/** 消息消费者 topic:绑定消息主题 consumerGroup:消费者分组,读取配置文件参数 */
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "${rocketmq.consumer.group}")
public class MessageConsumer implements RocketMQListener<String> {
/**
* 接收并处理消息
*
* @param message 接收到的消息体
*/
@Override
public void onMessage(String message) {
System.out.println("消费者接收消息:" + message);
}
}启动消费者服务,保持程序持续运行
运行生产者测试类,发送测试消息
消费者控制台成功打印接收的消息,代表集成测试完成