源本科技 | 码上会

Spring Boot 集成 RocketMQ

2026/05/30
0
0

引言

在 Spring Boot 应用中,通过 Apache RocketMQ 官方提供的 spring-boot-starter 组件,可极简集成 RocketMQ 服务,实现消息的异步发送与监听接收

依赖配置

  • 删除之前的 rocketmq-client-java 依赖,替换如下依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.5</version>
</dependency>

服务端部署调整

由于 rocketmq-spring-boot-starter 暂不支持 RocketMQ 的 Proxy 模式,需切换为 传统 Broker 直连模式,重新配置 Docker Compose 与自定义 Broker 配置。

创建 Broker 配置文件

新建目录 ./conf,并创建 broker.conf 配置文件,必须填写宿主机 IP 保证外部应用可正常连接。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 填写本机宿主机 IP,用于外部 Spring Boot 应用连接
brokerIP1 = 192.168.203.200

升级 Docker Compose 配置

services:
  namesrv:
    image: apache/rocketmq:5.3.2
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv

  broker:
    image: apache/rocketmq:5.3.2
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    # 挂载自定义配置文件
    volumes:
      - ./conf/broker.conf:/opt/rocketmq-5.3.2/conf/broker.conf
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker -c /opt/rocketmq-5.3.2/conf/broker.conf

  dashboard:
    image: apacherocketmq/rocketmq-dashboard:2.1.0
    container_name: rmqdashboard
    ports:
      - 9090:8082
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Drocketmq.config.loginRequired=false
    depends_on:
      - namesrv
    networks:
      - rocketmq
    restart: always
    
networks:
  rocketmq:
    driver: bridge

启动服务

  • 执行命令后台启动完整集群

docker compose up -d
  • 登录 RocketMQ Dashboard 校验配置,确认集群中 Broker 地址为宿主机 IP

消息生产者

生产者 sca-message-producer,实现消息发送功能

Maven 依赖

  • 修改之前的 sca-message 模块的 pom.xml 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.lusifer</groupId>
        <artifactId>sca</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>sca-message</artifactId>
    <packaging>pom</packaging>

    <name>${project.artifactId}</name>
    <description>消息模块</description>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <modules>
        <module>sca-message-producer</module>
        <module>sca-message-consumer</module>
    </modules>
</project>

配置文件

application.yml 配置 Name Server 地址与生产者核心参数:

rocketmq:
  # Name Server 地址,多地址使用英文逗号分隔
  name-server: 192.168.203.200:9876
  producer:
    # 生产者分组名称
    group: my-producer-group
    # 发送消息超时时间,单位 毫秒
    send-message-timeout: 3000

启动类

package com.lusifer.sca.message.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MQProducerApplication {
  public static void main(String[] args) {
    SpringApplication.run(MQProducerApplication.class, args);
  }
}

消息发送测试

使用 JUnit 5 编写测试类,通过 RocketMQTemplate 发送普通消息:

package com.lusifer.sca.message.producer.test;

import jakarta.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MessageProducerTests {

  @Resource private RocketMQTemplate rocketMQTemplate;

  /** 发送普通文本消息 */
  @Test
  public void testSendMessage() {
    // 参数1:Topic 名称;参数2:消息体内容
    rocketMQTemplate.convertAndSend("test-topic", "Hello RocketMQ!");
    System.out.println("消息发送成功");
  }
}

运行测试方法,验证消息发送流程正常。

消息消费者

消费者 sca-message-consumer,实现消息监听与业务消费

Maven 依赖

配置文件

rocketmq:
  # Name Server 地址,与生产者保持一致
  name-server: 192.168.203.200:9876
  consumer:
    # 消费者分组名称
    group: my-consumer-group

启动类

package com.lusifer.sca.message.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MQConsumerApplication {
  public static void main(String[] args) {
    SpringApplication.run(MQConsumerApplication.class, args);
  }
}

消息监听服务

通过 @RocketMQMessageListener 注解绑定 Topic,实现自动消息消费:

package com.lusifer.sca.message.consumer.service;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/** 消息消费者 topic:绑定消息主题 consumerGroup:消费者分组,读取配置文件参数 */
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "${rocketmq.consumer.group}")
public class MessageConsumer implements RocketMQListener<String> {

  /**
   * 接收并处理消息
   *
   * @param message 接收到的消息体
   */
  @Override
  public void onMessage(String message) {
    System.out.println("消费者接收消息:" + message);
  }
}

功能测试

  • 启动消费者服务,保持程序持续运行

  • 运行生产者测试类,发送测试消息

  • 消费者控制台成功打印接收的消息,代表集成测试完成