在分布式系统中,异步消息是高性能、低延迟的核心通信方式。生产者发送消息后无需阻塞等待 Broker 响应,通过线程池异步执行发送任务,消息发送结果通过 SendCallback 回调通知。该模式不阻塞主线程,完美适配对响应时间敏感、高并发的业务场景。
复用前文定义的实体对象
package com.lusifer.sca.message.producer.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** 测试消息实体 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MyObject {
private String name;
private int age;
}创建服务类,注入 RocketMQTemplate
package com.lusifer.sca.message.producer.service;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
/** 异步消息发送服务 */
@Service
@RequiredArgsConstructor
public class AsyncMessageService {
private final RocketMQTemplate rocketMQTemplate;
}package com.lusifer.sca.message.producer.test;
import com.lusifer.sca.message.producer.service.AsyncMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
@Slf4j
public class AsyncMessageTest {
@Resource private AsyncMessageService asyncMessageService;
}最基础的异步消息发送方式,非阻塞执行,发送完成后通过回调通知结果。
核心方法:asyncSend(String destination, Object payload, SendCallback sendCallback)
/**
* 发送简单异步消息
* @param destination 消息目标(Topic)
* @param payload 消息内容(对象/字符串)
* @param sendCallback 发送结果回调
*/
public void asyncSendSimple(String destination, Object payload, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(destination, payload, sendCallback);
}测试代码
@Test
public void testAsyncSendSimple() throws InterruptedException {
MyObject myObject = new MyObject("张三", 30);
// Lambda 简化回调写法
asyncMessageService.asyncSendSimple("test-topic", myObject, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功,结果:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("异步消息发送失败,原因:{}", throwable.getMessage());
}
});
// 单元测试需等待异步线程执行完毕,否则程序会直接退出
Thread.sleep(10 * 1000);
}为异步消息配置自定义超时时间,适配网络波动、Broker 响应慢的场景。
核心方法:asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
/**
* 发送带超时时间的异步消息
* @param destination 消息目标(Topic)
* @param payload 消息内容
* @param sendCallback 发送结果回调
* @param timeout 超时时间,单位:毫秒
*/
public void asyncSendTimeout(String destination, Object payload, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(destination, payload, sendCallback, timeout);
}测试代码
@Test
public void testAsyncSendTimeout() throws InterruptedException {
MyObject myObject = new MyObject("张三", 30);
asyncMessageService.asyncSendTimeout("test-topic", myObject, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("带超时的异步消息发送成功:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("带超时的异步消息发送失败:{}", throwable.getMessage());
}
}, 5000);
Thread.sleep(10 * 1000);
}结合延迟级别发送异步消息,消息到达 Broker 后延迟指定时间再投递,适用于订单超时关闭、定时通知等场景。
核心方法:asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel)
/**
* 发送延迟异步消息
* @param destination 消息目标(Topic)
* @param message 消息对象
* @param sendCallback 发送结果回调
* @param timeout 超时时间,单位:毫秒
* @param delayLevel 延迟级别
*/
public void asyncSendDelay(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(destination, message, sendCallback, timeout, delayLevel);
}测试代码
@Test
public void testAsyncSendDelay() throws InterruptedException {
// 构建消息体
Message<String> message = MessageBuilder.withPayload("Hello, Delayed Async RocketMQ!").build();
// 延迟级别 3 = 10 秒后投递
asyncMessageService.asyncSendDelay("test-topic", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("延迟异步消息发送成功:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("延迟异步消息发送失败:{}", throwable.getMessage());
}
}, 5000, 3);
// 等待时间大于延迟时间,确保消费到消息
Thread.sleep(15 * 1000);
}异步特性:发送线程非阻塞,消息发送由后台线程池执行,大幅提升接口响应速度。
回调机制:onSuccess 为发送成功回调,onException 为发送失败回调,可在回调中实现重试、告警等逻辑。
单元测试:必须添加 Thread.sleep,否则 JVM 会在异步线程执行完成前退出,导致消息发送失败。
延迟级别:沿用 RocketMQ 固定延迟级别,不支持自定义时间,级别 3 对应 10 秒。