本节介绍如何使用 Docker Compose 快速部署单节点单副本 RocketMQ 服务,并完成简单的消息收发测试
# Single-node RocketMQ stack: NameServer, Broker and Proxy, all attached to the
# "rocketmq" network and all started from the same apache/rocketmq image.
services:
  # NameServer; the other two services reach it as rmqnamesrv:9876.
  namesrv:
    image: apache/rocketmq:5.3.2
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv
  # Broker; started after namesrv and pointed at it through NAMESRV_ADDR.
  broker:
    image: apache/rocketmq:5.3.2
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker
  # Proxy; port 8081 is the endpoint the Java examples in this section connect to.
  proxy:
    image: apache/rocketmq:5.3.2
    container_name: rmqproxy
    networks:
      - rocketmq
    depends_on:
      - broker
      - namesrv
    ports:
      - 8080:8080
      - 8081:8081
    # Restart the container if it exits with a failure.
    restart: on-failure
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    command: sh mqproxy
networks:
  rocketmq:
    driver: bridge
docker-compose up -d
服务启动后,需要进入容器创建 Topic 和消费者分组
docker exec -it rmqbroker bash
创建名为 TestTopic 的 Topic,归属默认集群:
sh mqadmin updatetopic -t TestTopic -c DefaultCluster
Java 客户端需要指定消费者组,提前创建 YourConsumerGroup:
sh mqadmin updateSubGroup -c DefaultCluster -g YourConsumerGroup
exit
注意:需要追加到 sca-dependencies
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>
创建名为 sca-message 的模块,pom.xml 如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lusifer</groupId>
<artifactId>sca</artifactId>
<version>${revision}</version>
</parent>
<artifactId>sca-message</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>消息模块</description>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<modules>
<module>sca-message-producer</module>
<module>sca-message-consumer</module>
</modules>
</project>
创建名为 sca-message-producer 的子模块,pom.xml 如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lusifer</groupId>
<artifactId>sca-message</artifactId>
<version>${revision}</version>
</parent>
<artifactId>sca-message-producer</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>服务提供者</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
</build>
</project>
测试代码如下
package com.lusifer.sca.message.producer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
@Slf4j
public class ProducerExample {
/**
 * Sends one test message to {@code TestTopic} through the RocketMQ Proxy endpoint.
 *
 * <p>The producer is opened in a try-with-resources statement so that it is closed after the
 * send attempt, whether the send succeeded or failed. A failed send is logged rather than
 * rethrown.
 *
 * @param args command-line arguments; not used
 * @throws ClientException if the producer cannot be built
 */
public static void main(String[] args) throws ClientException {
  // Replace with your server IP + Proxy port 8081
  String endpoint = "192.168.203.200:8081";
  String topic = "TestTopic";
  ClientServiceProvider provider = ClientServiceProvider.loadService();
  ClientConfiguration configuration =
      ClientConfiguration.newBuilder().setEndpoints(endpoint).build();
  // Initialize the producer; try-with-resources closes it when the block exits
  try (Producer producer =
      provider
          .newProducerBuilder()
          .setTopics(topic)
          .setClientConfiguration(configuration)
          .build()) {
    // Build the message; the body is encoded explicitly as UTF-8 because the consumer
    // example decodes it as UTF-8
    Message message =
        provider
            .newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey")
            .setTag("messageTag")
            .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
            .build();
    // Send the message
    try {
      SendReceipt sendReceipt = producer.send(message);
      log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    } catch (ClientException e) {
      log.error("Failed to send message", e);
    }
  } catch (IOException e) {
    // Only producer.close() can raise IOException here
    log.warn("Failed to close producer", e);
  }
}
}
创建名为 sca-message-consumer 的子模块,pom.xml 如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.lusifer</groupId>
<artifactId>sca-message</artifactId>
<version>${revision}</version>
</parent>
<artifactId>sca-message-consumer</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>服务消费者</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
</build>
</project>
测试代码如下
package com.lusifer.sca.message.consumer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
/**
 * Minimal push-consumer example: subscribes {@code YourConsumerGroup} to {@code TestTopic}
 * through the RocketMQ Proxy endpoint and logs every message it receives.
 *
 * <p>The process blocks in {@link #main(String[])} until it is terminated; a JVM shutdown hook
 * closes the consumer at that point.
 */
@Slf4j
public class PushConsumerExample {
  // Keep a reference to the consumer instance so it is not garbage collected
  private static PushConsumer pushConsumer;

  /**
   * Builds the push consumer, registers the shutdown hook and then blocks.
   *
   * @param args command-line arguments; not used
   * @throws ClientException if the consumer cannot be built
   * @throws InterruptedException if the main thread is interrupted while sleeping
   */
  public static void main(String[] args) throws ClientException, InterruptedException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    // Replace with your server IP + Proxy port 8081
    String endpoints = "192.168.203.200:8081";
    ClientConfiguration clientConfiguration =
        ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
    // Subscribe to messages with any tag
    FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
    String consumerGroup = "YourConsumerGroup";
    String topic = "TestTopic";
    // Initialize the consumer
    pushConsumer =
        provider
            .newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(consumerGroup)
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(
                messageView -> {
                  log.info(
                      "Consume message successfully, messageId={}", messageView.getMessageId());
                  // Log the message details
                  log.info("Topic: {}", messageView.getTopic());
                  log.info("Tags: {}", messageView.getTag().orElse(""));
                  log.info("Keys: {}", messageView.getKeys());
                  // Decode the body from a single buffer instead of calling getBody() twice
                  String bodyStr = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
                  log.info("Message Body: {}", bodyStr);
                  return ConsumeResult.SUCCESS;
                })
            .build();
    // Close the consumer when the JVM shuts down (e.g. Ctrl+C / SIGTERM)
    Runtime.getRuntime().addShutdownHook(new Thread(PushConsumerExample::closeConsumer));
    // Keep the program running
    Thread.sleep(Long.MAX_VALUE);
  }

  /** Closes the push consumer, logging instead of propagating any I/O failure. */
  private static void closeConsumer() {
    try {
      pushConsumer.close();
    } catch (java.io.IOException e) {
      log.warn("Failed to close push consumer", e);
    }
  }
}