源本科技 | 码上会

分布式消息队列 RocketMQ 异步消息

2026/05/30
0
0

引言

在分布式系统中,异步消息是高性能、低延迟的核心通信方式。生产者发送消息后无需阻塞等待 Broker 响应,通过线程池异步执行发送任务,消息发送结果通过 SendCallback 回调通知。该模式不阻塞主线程,完美适配对响应时间敏感、高并发的业务场景。

准备工作

消息实体

复用前文定义的实体对象

package com.lusifer.sca.message.producer.domain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/** 测试消息实体 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MyObject {
  private String name;
  private int age;
}

异步消息生产者

创建服务类,注入 RocketMQTemplate

package com.lusifer.sca.message.producer.service;

import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;

/** 异步消息发送服务 */
@Service
@RequiredArgsConstructor
public class AsyncMessageService {

  private final RocketMQTemplate rocketMQTemplate;
}

测试基类

package com.lusifer.sca.message.producer.test;

import com.lusifer.sca.message.producer.service.AsyncMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
@Slf4j
public class AsyncMessageTest {
  @Resource private AsyncMessageService asyncMessageService;
}

简单异步消息

最基础的异步消息发送方式,非阻塞执行,发送完成后通过回调通知结果。

  • 核心方法:asyncSend(String destination, Object payload, SendCallback sendCallback)

/**
 * 发送简单异步消息
 * @param destination 消息目标(Topic)
 * @param payload 消息内容(对象/字符串)
 * @param sendCallback 发送结果回调
 */
public void asyncSendSimple(String destination, Object payload, SendCallback sendCallback) {
    rocketMQTemplate.asyncSend(destination, payload, sendCallback);
}

测试代码

@Test
public void testAsyncSendSimple() throws InterruptedException {
    MyObject myObject = new MyObject("张三", 30);

    // Lambda 简化回调写法
    asyncMessageService.asyncSendSimple("test-topic", myObject, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("异步消息发送成功,结果:{}", sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("异步消息发送失败,原因:{}", throwable.getMessage());
        }
    });

    // 单元测试需等待异步线程执行完毕,否则程序会直接退出
    Thread.sleep(10 * 1000);
}

超时异步消息

为异步消息配置自定义超时时间,适配网络波动、Broker 响应慢的场景。

  • 核心方法:asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)

/**
 * 发送带超时时间的异步消息
 * @param destination 消息目标(Topic)
 * @param payload 消息内容
 * @param sendCallback 发送结果回调
 * @param timeout 超时时间,单位:毫秒
 */
public void asyncSendTimeout(String destination, Object payload, SendCallback sendCallback, long timeout) {
    rocketMQTemplate.asyncSend(destination, payload, sendCallback, timeout);
}

测试代码

@Test
public void testAsyncSendTimeout() throws InterruptedException {
    MyObject myObject = new MyObject("张三", 30);

    asyncMessageService.asyncSendTimeout("test-topic", myObject, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("带超时的异步消息发送成功:{}", sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("带超时的异步消息发送失败:{}", throwable.getMessage());
        }
    }, 5000);

    Thread.sleep(10 * 1000);
}

延迟异步消息

结合延迟级别发送异步消息,消息到达 Broker 后延迟指定时间再投递,适用于订单超时关闭、定时通知等场景。

  • 核心方法:asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel)

/**
 * 发送延迟异步消息
 * @param destination 消息目标(Topic)
 * @param message 消息对象
 * @param sendCallback 发送结果回调
 * @param timeout 超时时间,单位:毫秒
 * @param delayLevel 延迟级别
 */
public void asyncSendDelay(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
    rocketMQTemplate.asyncSend(destination, message, sendCallback, timeout, delayLevel);
}

测试代码

@Test
public void testAsyncSendDelay() throws InterruptedException {
    // 构建消息体
    Message<String> message = MessageBuilder.withPayload("Hello, Delayed Async RocketMQ!").build();

    // 延迟级别 3 = 10 秒后投递
    asyncMessageService.asyncSendDelay("test-topic", message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("延迟异步消息发送成功:{}", sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            log.error("延迟异步消息发送失败:{}", throwable.getMessage());
        }
    }, 5000, 3);

    // 等待时间大于延迟时间,确保消费到消息
    Thread.sleep(15 * 1000);
}

注意事项

  • 异步特性:发送线程非阻塞,消息发送由后台线程池执行,大幅提升接口响应速度。

  • 回调机制onSuccess 为发送成功回调,onException 为发送失败回调,可在回调中实现重试、告警等逻辑。

  • 单元测试:必须添加 Thread.sleep,否则 JVM 会在异步线程执行完成前退出,导致消息发送失败。

  • 延迟级别:沿用 RocketMQ 固定延迟级别,不支持自定义时间,级别 3 对应 10 秒。