源本科技 | 码上会

分布式消息队列 RocketMQ 部署

2026/05/26
3
0

引言

本节介绍如何使用 Docker Compose 快速部署单节点单副本 RocketMQ 服务,并完成简单的消息收发测试。

https://rocketmq.apache.org/zh/

Docker Compose 配置

# Single-node RocketMQ stack: one NameServer, one Broker and one Proxy,
# all attached to the same user-defined bridge network.
services:
  # NameServer container.
  namesrv:
    container_name: rmqnamesrv
    image: apache/rocketmq:5.3.2
    command: sh mqnamesrv
    networks: [rocketmq]
    ports:
      - "9876:9876"

  # Broker container; it locates the NameServer through its container name.
  broker:
    container_name: rmqbroker
    image: apache/rocketmq:5.3.2
    command: sh mqbroker
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
    depends_on:
      - namesrv
    networks: [rocketmq]
    ports:
      - "10909:10909"
      - "10911:10911"
      - "10912:10912"

  # Proxy container; port 8081 is the endpoint the Java examples connect to.
  proxy:
    container_name: rmqproxy
    image: apache/rocketmq:5.3.2
    command: sh mqproxy
    restart: on-failure
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
    depends_on:
      - broker
      - namesrv
    networks: [rocketmq]
    ports:
      - "8080:8080"
      - "8081:8081"

networks:
  rocketmq:
    driver: bridge

启动 RocketMQ 集群

docker-compose up -d

集群初始化配置

服务启动后,需要进入容器创建 Topic 和消费者分组

进入 Broker 容器

docker exec -it rmqbroker bash

创建测试 Topic

创建名为 TestTopic 的 Topic,归属默认集群:

sh mqadmin updatetopic -t TestTopic -c DefaultCluster

创建消费者分组

Java 客户端需要指定消费者组,提前创建 YourConsumerGroup

sh mqadmin updateSubGroup -c DefaultCluster -g YourConsumerGroup

退出容器

exit

Java 消息收发

添加 Maven 依赖

  • 注意:需要追加到 sca-dependencies

<!-- RocketMQ Java client dependency. The version is declared here; the sca-message
     module POM references the same artifact without a version. -->
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.2.0</version>
    </dependency>
</dependencies>

创建消息模块

  • 创建名为 sca-message 的模块,pom.xml 如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- Aggregator POM for the messaging module: packaging is "pom" and it lists the
     producer and consumer sub-modules. -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits from the root "sca" project; the version comes from the ${revision} property. -->
    <parent>
        <groupId>com.lusifer</groupId>
        <artifactId>sca</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>sca-message</artifactId>
    <packaging>pom</packaging>

    <name>${project.artifactId}</name>
    <description>消息模块</description>

    <!-- Declared under "dependencies" (not "dependencyManagement"), so both sub-modules
         inherit them. No versions are given here; presumably they are managed by the
         parent / sca-dependencies (confirm). -->
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <!-- Sub-modules built as part of this aggregator. -->
    <modules>
        <module>sca-message-producer</module>
        <module>sca-message-consumer</module>
    </modules>
</project>

发送普通消息

  • 创建名为 sca-message-producer 的子模块,pom.xml 如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- POM for the message producer sub-module (jar packaging, child of sca-message). -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Parent is the sca-message aggregator; the version comes from the ${revision} property. -->
    <parent>
        <groupId>com.lusifer</groupId>
        <artifactId>sca-message</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>sca-message-producer</artifactId>
    <packaging>jar</packaging>

    <name>${project.artifactId}</name>
    <!-- NOTE(review): the description reads "service provider", while this module is the
         message producer; it may have been copied from another module (confirm). -->
    <description>服务提供者</description>

    <!-- Spring Boot starters declared without versions; presumably managed by the
         parent chain (confirm). -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <!-- The built artifact is named after the artifactId, without a version suffix. -->
    <build>
        <finalName>${project.artifactId}</finalName>
    </build>
</project>
  • 测试代码如下

package com.lusifer.sca.message.producer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

/**
 * Minimal example that sends one normal message to {@code TestTopic} through the RocketMQ Proxy
 * endpoint.
 */
@Slf4j
public class ProducerExample {
  /**
   * Builds a producer, sends a single message, logs the resulting message id and releases the
   * producer.
   *
   * @param args unused
   * @throws ClientException if the producer cannot be built
   */
  public static void main(String[] args) throws ClientException {
    // Replace with your server IP + Proxy port 8081
    String endpoint = "192.168.203.200:8081";
    String topic = "TestTopic";

    ClientServiceProvider provider = ClientServiceProvider.loadService();
    ClientConfiguration configuration =
        ClientConfiguration.newBuilder().setEndpoints(endpoint).build();

    // The producer is Closeable; try-with-resources releases it once the message has been
    // sent, so the example no longer leaves it open.
    try (Producer producer =
        provider
            .newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build()) {

      // Encode the body explicitly as UTF-8: the consumer example decodes with UTF-8, and
      // getBytes() without a charset depends on the platform default.
      Message message =
          provider
              .newMessageBuilder()
              .setTopic(topic)
              .setKeys("messageKey")
              .setTag("messageTag")
              .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
              .build();

      try {
        SendReceipt sendReceipt = producer.send(message);
        log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
      } catch (ClientException e) {
        log.error("Failed to send message", e);
      }
    } catch (IOException e) {
      // Closing failed after the send attempt; report it instead of failing the example.
      log.warn("Failed to close producer", e);
    }
  }
}

订阅普通消息

  • 创建名为 sca-message-consumer 的子模块,pom.xml 如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- POM for the message consumer sub-module (jar packaging, child of sca-message). -->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Parent is the sca-message aggregator; the version comes from the ${revision} property. -->
    <parent>
        <groupId>com.lusifer</groupId>
        <artifactId>sca-message</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>sca-message-consumer</artifactId>
    <packaging>jar</packaging>

    <name>${project.artifactId}</name>
    <!-- NOTE(review): the description reads "service consumer", while this module is the
         message consumer; it may have been copied from another module (confirm). -->
    <description>服务消费者</description>

    <!-- Spring Boot starters declared without versions; presumably managed by the
         parent chain (confirm). -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

    <!-- The built artifact is named after the artifactId, without a version suffix. -->
    <build>
        <finalName>${project.artifactId}</finalName>
    </build>
</project>
  • 测试代码如下

package com.lusifer.sca.message.consumer;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

/**
 * Minimal push-consumer example: subscribes to {@code TestTopic} with every tag and logs each
 * received message.
 */
@Slf4j
public class PushConsumerExample {

  // Replace with your server IP + Proxy port 8081
  private static final String ENDPOINTS = "192.168.203.200:8081";
  private static final String CONSUMER_GROUP = "YourConsumerGroup";
  private static final String TOPIC = "TestTopic";

  // Held in a static field so the consumer instance stays referenced while main sleeps.
  private static PushConsumer pushConsumer;

  /**
   * Builds the push consumer and then blocks the main thread so messages keep being delivered.
   *
   * @param args unused
   * @throws ClientException if the consumer cannot be built
   * @throws InterruptedException if the sleeping main thread is interrupted
   */
  public static void main(String[] args) throws ClientException, InterruptedException {
    final ClientServiceProvider serviceProvider = ClientServiceProvider.loadService();

    ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints(ENDPOINTS).build();

    // "*" with the TAG expression type subscribes to messages of every tag.
    FilterExpression allTags = new FilterExpression("*", FilterExpressionType.TAG);

    pushConsumer =
        serviceProvider
            .newPushConsumerBuilder()
            .setClientConfiguration(config)
            .setConsumerGroup(CONSUMER_GROUP)
            .setSubscriptionExpressions(Collections.singletonMap(TOPIC, allTags))
            .setMessageListener(
                received -> {
                  log.info("Consume message successfully, messageId={}", received.getMessageId());

                  // Log the message metadata.
                  log.info("Topic: {}", received.getTopic());
                  log.info("Tags: {}", received.getTag().orElse(""));
                  log.info("Keys: {}", received.getKeys());

                  // Copy the remaining bytes of the body buffer and decode them as UTF-8.
                  int length = received.getBody().remaining();
                  byte[] payload = new byte[length];
                  received.getBody().get(payload);
                  log.info("Message Body: {}", new String(payload, StandardCharsets.UTF_8));

                  return ConsumeResult.SUCCESS;
                })
            .build();

    // Keep the process alive.
    Thread.sleep(Long.MAX_VALUE);
  }
}