从最基础的概念讲起,彻底讲透3个工具的本质区别、适用场景、可运行代码样例,最后给出生产级最佳实践&反面踩坑案例。哪怕你是刚接触编程的新手,也能一次性搞懂,再也不会选错工具。
前置答疑:最开始的核心疑问——RocketMQ和Feign是不是功能重复?
一句话结论:完全不重复,二者是微服务里两种完全不同的通信方式,就像「打电话」和「发快递」的本质区别
大白话类比&核心定位
Feign(声明式HTTP客户端)= 打电话
你必须知道对方的电话号码(服务地址/接口),对方必须开机在线(服务可用)
拨号后必须等对方接电话,说完话、拿到回复才能挂电话(同步阻塞,必须等结果)
点对点直接对话,中间无第三方
适用场景:必须立刻拿到结果,比如查用户余额、确认支付是否成功
RocketMQ(分布式消息队列)= 发快递
你把包裹(消息)交给快递驿站(RocketMQ Broker),转身就走,不用管收件人什么时候收、在哪收
驿站会把包裹存到仓库(磁盘持久化),保证不丢,还会反复派送,直到收件人收到(自带重试机制)
你和收件人全程不用见面、甚至不用认识,只需要知道收件地址(Topic)就行
适用场景:不需要立刻拿到结果、甚至不需要结果,比如下单后发短信、预热缓存、加积分
核心总结:Feign是同步要结果,RocketMQ是异步解耦,二者不仅不重复,反而在微服务架构中经常搭配使用
一、先搞懂最核心的基础:什么是同步?什么是异步?
搞懂这个,后面所有内容你都能一眼看懂。
1.1 同步:站在柜台等奶茶做好再走
你去奶茶店点单,付了钱之后,站在柜台前啥也不干,就等着奶茶做好。奶茶没做好,你就不能走、不能干别的事。
对应编程:主线程执行一个操作,必须等这个操作执行完、拿到结果,才能继续往下走。
缺点:如果做奶茶要10分钟,你就堵在柜台10分钟,后面的客人也没法点单,系统会卡住。
1.2 异步:拿号就坐,好了叫你
还是点奶茶,你付了钱之后,店员给你一个取餐号,你拿着号就找位置坐下玩手机,不用站在柜台等。奶茶做好了,店员会喊号叫你取餐。
对应编程:主线程把耗时的、不重要的活,交给别人去后台干,主线程继续往下走处理核心业务,后台活干完了再通知主线程。
优点:主线程不被卡住,能一直接待客人,系统响应更快,用户体验更好。
1.3 为什么我们必须要用异步?
不阻塞核心业务:比如用户查笔记详情,核心业务是把笔记内容返回给用户,预热缓存是次要的,不能让缓存写入耽误用户拿到结果。
提升系统吞吐量:主线程不用等耗时操作,能处理更多的用户请求。
业务解耦:核心业务和次要业务分开,比如下单是核心,发短信、加积分是次要,分开之后互不影响。
二、第一类异步工具:自定义线程池(小卖部的后台临时工)
2.1 本质大白话
你开了一家小卖部,你自己是老板(主线程),核心工作是接待客人、收钱、卖货。
但你还有很多杂活:打包商品、扫地、整理货架。这些活不影响接待客人,但你自己干的话,就会耽误接待客人。
于是你雇了几个临时工(线程池里的线程),专门在后台干这些杂活。你接待客人的时候,把杂活喊给临时工干,你继续接待客人,临时工在后台默默把活干完。
自定义线程池,就是你在Java程序(小卖部)里,自己雇的一批后台临时工,专门在同一个程序里干异步杂活,不耽误主线程干核心业务。
2.2 核心特点(大白话版)
同屋干活:临时工和你在同一个小卖部里(同一个JVM进程里),喊一声就动,干活速度极快,没有额外路程开销。
关门就停工:小卖部关门(程序重启/宕机/崩溃),临时工手里的活直接全扔了,再也不会干,任务直接丢失。
人多会乱:临时工雇太多,小卖部里挤不下会乱套(线程太多会导致内存溢出OOM,系统崩溃)。
只在本店干活:临时工只能帮你这个小卖部干活,不能帮隔壁超市干活(只能在当前程序里用,不能跨服务/跨机器)。
2.3 适用场景(什么时候必须用它)
同时满足以下所有条件,就用自定义线程池:
活只在本店干:不需要跨服务、跨机器,就在当前程序里执行。
丢了也没事:就算活没干完、程序重启了,也不会影响业务,不会出线上事故。
要快,要低延迟:追求极致速度,不想有额外的网络开销。
要么马上拿结果,要么纯轻量杂活:要么是并行干活,最后要汇总结果返回给用户;要么是极轻量的、不影响核心业务的杂活。
具体场景举例
同步接口并行查询提速:比如商品详情页,要同时查商品基础信息、库存、价格、标签,用线程池并行查,比串行查快好几倍,最后汇总结果返回给用户。
本地极轻量杂活:比如本地打印简易日志、内存打点统计、临时文件清理。
纯本地内存计算:比如Excel文件本地解析、批量数据内存转换、图片本地处理。
2.4 极简可运行代码样例
第一步:配置自定义线程池(SpringBoot项目)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 自定义线程池配置 = 给小卖部雇临时工,定好规矩
*/
@Configuration
public class ThreadPoolConfig {
@Bean("myTaskExecutor")
public Executor myTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数 = 长期雇的固定临时工,就算没事干也不辞退
executor.setCorePoolSize(5);
// 最大线程数 = 忙的时候最多能雇多少临时工
executor.setMaxPoolSize(10);
// 队列容量 = 临时工都忙的时候,活放在等候区的最大数量
executor.setQueueCapacity(100);
// 非核心线程空闲时间 = 临时雇的临时工,没事干多久就辞退
executor.setKeepAliveSeconds(60);
// 线程名前缀 = 给临时工起名字,方便找是谁干的活
executor.setThreadNamePrefix("my-task-thread-");
// 拒绝策略 = 临时工全忙、等候区也满了,新活怎么处理(这里是交给主线程干)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}第二步:使用线程池(并行查询提速场景)
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
@Service
public class GoodsService {
// 注入我们配置好的线程池
@Resource(name = "myTaskExecutor")
private Executor taskExecutor;
// 模拟商品详情页查询:并行查多个数据源,提速返回
public GoodsDetailVO getGoodsDetail(Long goodsId) {
// 1. 并行查商品基础信息,交给临时工干
Future<GoodsBaseInfo> baseInfoFuture = taskExecutor.submit(() -> getGoodsBaseInfo(goodsId));
// 2. 并行查库存,交给临时工干
Future<GoodsStock> stockFuture = taskExecutor.submit(() -> getGoodsStock(goodsId));
// 3. 并行查价格,交给临时工干
Future<GoodsPrice> priceFuture = taskExecutor.submit(() -> getGoodsPrice(goodsId));
try {
// 4. 汇总所有临时工的干活结果
GoodsDetailVO vo = new GoodsDetailVO();
vo.setBaseInfo(baseInfoFuture.get()); // 拿基础信息结果
vo.setStock(stockFuture.get()); // 拿库存结果
vo.setPrice(priceFuture.get()); // 拿价格结果
return vo;
} catch (Exception e) {
// 异常处理
throw new RuntimeException("查询商品详情失败", e);
}
}
// 以下是模拟的三个查询方法,实际项目中是查数据库/调用其他接口
private GoodsBaseInfo getGoodsBaseInfo(Long goodsId) throws InterruptedException {
// 模拟查库耗时100ms
Thread.sleep(100);
return new GoodsBaseInfo();
}
private GoodsStock getGoodsStock(Long goodsId) throws InterruptedException {
Thread.sleep(100);
return new GoodsStock();
}
private GoodsPrice getGoodsPrice(Long goodsId) throws InterruptedException {
Thread.sleep(100);
return new GoodsPrice();
}
}2.5 优缺点总结
三、第二类异步工具:RocketMQ(全国通用的快递驿站系统)
3.1 本质大白话
你是淘宝卖家(生产者),要给全国各地的买家(消费者)寄东西。
你不可能自己开车给每个买家送过去,于是你把包裹(消息)交给快递驿站(RocketMQ Broker),驿站会帮你:
把包裹存到仓库里(磁盘持久化),不会丢,哪怕你店关门了,包裹还在。
按收件地址(Topic),把包裹送到对应的买家手里。
买家没收到,会反复派送(重试机制),实在送不到,就放到问题包裹柜(死信队列),不会直接扔了。
你把包裹交给驿站之后,转身就去干别的事了,不用管包裹什么时候送到,买家有没有收到。买家收到包裹,也不用跟你打招呼。
RocketMQ,就是一个分布式的、高可靠的快递驿站系统,专门帮你在不同的程序、不同的服务器之间传递消息,实现跨服务的、可靠的异步通信。
3.2 核心特点(大白话版)
绝对不丢包裹:包裹只要送到驿站,就会存到仓库里,哪怕驿站停电、重启,包裹也不会丢,一定会送到买家手里。
彻底解耦:卖家和买家不用认识、不用见面、甚至不用同时在线,只要有驿站和收件地址,就能传递东西。
能跨店跨城:不管你在哪个城市,不管买家在哪个服务器,都能通过驿站送包裹,支持跨服务、跨机器、跨机房。
能扛高峰期:双十一的时候,几百万个包裹同时送到驿站,驿站能先存下来,慢慢派送,不会把快递员累死(削峰填谷,保护下游系统)。
有额外功能:支持定时派送(延迟消息)、一个包裹多个买家收(广播消息)、保证包裹不会送错(事务消息)。
3.3 适用场景(什么时候必须用它)
满足以下任意一个条件,就用RocketMQ:
活不能丢:这个任务必须执行完成,哪怕程序重启、宕机,也不能丢,不然会出业务问题。
要跨服务/跨机器:这个任务需要其他服务、其他机器来执行。
要削峰填谷:有瞬时高并发流量,需要缓冲,保护下游数据库/服务不被打垮。
业务解耦:核心业务和次要业务分开,互不影响。
需要重试/定时执行:任务失败要自动重试,或者需要延迟一段时间再执行。
具体场景举例
接口异步预热缓存:比如用户查笔记详情,核心业务是返回笔记内容,异步把笔记内容写入Redis缓存,保证缓存不丢,哪怕服务重启,缓存也能写入成功。
下单后的后置业务:创建订单之后,异步扣库存、发短信、加积分、生成物流单,主流程不阻塞,下游业务解耦。
秒杀/大促流量削峰:瞬时几十万的抢购请求,先发到RocketMQ里,消费者慢慢平稳消费,不会把数据库打垮。
延迟任务:订单15分钟未支付自动关闭、到期续费提醒。
分布式事务最终一致性:跨服务的事务,比如订单+支付+库存,保证数据最终一致,不会出现烂账。
3.4 极简可运行代码样例(SpringBoot整合RocketMQ)
第一步:引入pom依赖
<!-- SpringBoot整合RocketMQ的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>第二步:配置application.yml
rocketmq:
# RocketMQ的地址,就是快递驿站的地址
name-server: 127.0.0.1:9876
# 生产者配置
producer:
# 生产者组名,就是卖家的店铺编号
group: note-producer-group第三步:定义消息体(包裹里的东西)
import lombok.Data;
import java.io.Serializable;
/**
* 笔记缓存消息体 = 快递包裹里的东西
*/
@Data
public class NoteCacheMsg implements Serializable {
private static final long serialVersionUID = 1L;
// 笔记ID
private Long noteId;
// 笔记详情数据,要写入Redis的内容
private NoteDetailRspVO noteDetail;
}第四步:生产者(卖家)发消息
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class NoteService {
// 注入RocketMQ模板,就是寄快递的工具
@Resource
private RocketMQTemplate rocketMQTemplate;
// 查询笔记详情的核心接口
public Response<NoteDetailRspVO> getNoteDetail(Long noteId) {
// 1. 核心业务:查数据库,获取笔记详情
NoteDetailRspVO noteDetail = queryNoteDetailFromDB(noteId);
// 2. 构建消息包裹
NoteCacheMsg msg = new NoteCacheMsg();
msg.setNoteId(noteId);
msg.setNoteDetail(noteDetail);
// 3. 把包裹交给快递驿站(发消息到RocketMQ),发完就走,不阻塞
// note-cache-topic 就是收件地址,只有对应地址的消费者能收到
rocketMQTemplate.syncSend("note-cache-topic", msg);
// 4. 直接返回结果给用户,不用等缓存写入完成
return Response.success(noteDetail);
}
// 模拟从数据库查笔记详情
private NoteDetailRspVO queryNoteDetailFromDB(Long noteId) {
// 实际项目中是查数据库
return new NoteDetailRspVO();
}
}第五步:消费者(买家)收消息,执行任务
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import cn.hutool.core.util.RandomUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* 笔记缓存消费者 = 收件人,专门收note-cache-topic地址的包裹
*/
@Component
// topic:收件地址,consumerGroup:收件人编号
@RocketMQMessageListener(
topic = "note-cache-topic",
consumerGroup = "note-cache-consumer-group"
)
public class NoteCacheConsumer implements RocketMQListener<NoteCacheMsg> {
@Resource
private RedisTemplate<String, String> redisTemplate;
@Resource
private ObjectMapper objectMapper;
// 收到消息(包裹)后,执行的逻辑
@Override
public void onMessage(NoteCacheMsg msg) {
try {
// 1. 把笔记详情转成JSON字符串
String noteDetailJson = objectMapper.writeValueAsString(msg.getNoteDetail());
// 2. 生成Redis的key
String redisKey = "note:detail:" + msg.getNoteId();
// 3. 过期时间:保底1天 + 随机秒数,打散过期时间,防止缓存雪崩
long expireSeconds = 60 * 60 * 24 + RandomUtil.randomInt(60 * 60 * 24);
// 4. 写入Redis
redisTemplate.opsForValue().set(redisKey, noteDetailJson, expireSeconds, TimeUnit.SECONDS);
} catch (Exception e) {
// 异常处理:抛出异常,RocketMQ会自动重试,不会丢消息
throw new RuntimeException("缓存写入失败", e);
}
}
}3.5 优缺点总结
四、一张表搞懂:自定义线程池 vs RocketMQ 核心区别全对比
五、再也不会选错:终极选型决策树+口诀
5.1 一步一步选,绝对不会错
第一步:这个任务如果没执行完,会不会出业务事故?
会 → 直接选 RocketMQ
不会 → 进入第二步
第二步:这个任务需要跨服务、跨机器执行吗?
是 → 直接选 RocketMQ
不是 → 进入第三步
第三步:这个任务执行完,需要马上拿到结果,同步返回给用户吗?
是 → 直接选 自定义线程池
不是 → 看情况:轻量杂活用线程池,需要可靠兜底用RocketMQ
5.2 一句口诀背下来,面试+实战通用
要可靠、要跨服、要削峰 → 用RocketMQ
本地跑、要提速、丢了没事 → 用线程池
六、实战对比:同个业务的两种写法(反面坑vs最佳实践)
6.1 业务场景
用户查询笔记详情,核心业务是把笔记内容返回给用户,次要业务是把笔记详情异步写入Redis缓存,做预热。
6.2 反面写法:用自定义线程池(有线上事故风险)
@Service
public class NoteService {
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
private RedisTemplate<String, String> redisTemplate;
public Response<NoteDetailRspVO> getNoteDetail(Long noteId) {
// 1. 查数据库,获取笔记详情
NoteDetailRspVO findNoteDetailRspVO = queryNoteDetailFromDB(noteId);
// 2. 异步线程中将笔记详情存入 Redis(反面写法,有坑)
threadPoolTaskExecutor.submit(() -> {
String noteDetailJson1 = JsonUtils.toJsonString(findNoteDetailRspVO);
long expireSeconds = 60*60*24 + RandomUtil.randomInt(60*60*24);
redisTemplate.opsForValue().set(noteDetailRedisKey, noteDetailJson1, expireSeconds, TimeUnit.SECONDS);
});
// 3. 返回结果
return Response.success(findNoteDetailRspVO);
}
}这个写法的致命坑
任务会丢:如果服务刚把任务提交给线程池,还没执行完,就重启/宕机了,这个缓存写入任务直接丢了,缓存没预热,下次查询会直接查数据库,有缓存击穿风险。
无重试机制:如果Redis临时挂了,缓存写入失败,这个任务就没了,不会自动重试,缓存还是没写入。
不可观测:线程池里的任务有没有执行成功、有没有失败,你不知道,只能去翻日志,出了问题很难排查。
资源竞争:如果这个线程池同时还干别的活,缓存写入任务太多,会占用线程资源,影响其他核心任务。
6.3 最佳实践写法:用RocketMQ(生产级可靠)
就是3.4章节的完整代码,核心优势:
绝对不丢任务:消息只要发到RocketMQ,就会持久化到磁盘,哪怕服务宕机、重启,消息还在,消费者重启后会继续消费,缓存一定会写入。
自带重试机制:如果Redis挂了,缓存写入失败,抛出异常,RocketMQ会自动重试,直到写入成功,不会丢消息。
可观测性强:RocketMQ控制台能实时看到消息有没有消费成功、有没有堆积、重试了多少次,出了问题一眼就能定位。
资源隔离:缓存写入的消费逻辑和核心业务完全隔离,不会占用核心业务的线程资源,互不影响。
七、全场景最佳实践 vs 反面案例对照
八、生产环境必看:避坑指南
8.1 自定义线程池避坑
千万别用Executors创建线程池:
Executors.newCachedThreadPool()会无限创建线程,Executors.newFixedThreadPool()队列无界,都会导致OOM,必须用ThreadPoolTaskExecutor手动配置参数。核心线程数别乱设:CPU密集型任务,核心线程数=CPU核心数+1;IO密集型任务,核心线程数=CPU核心数*2,别设太大。
一定要给线程起名字:设置
ThreadNamePrefix,出问题的时候能快速定位是哪个线程池的问题。一定要设置合理的拒绝策略:别用默认的
AbortPolicy(直接抛异常),建议用CallerRunsPolicy(交给主线程执行),保护系统不崩溃。别用线程池执行不能丢的任务:只要是任务丢了会出业务问题的,一律不用线程池,用RocketMQ。
8.2 RocketMQ避坑
一定要处理重复消费:RocketMQ保证消息至少投递一次,所以消费者必须做幂等处理,比如用笔记ID做唯一键,避免重复写入缓存。
别用同步发送阻塞核心业务:核心接口里用
syncSend,要设置超时时间,别让MQ发送失败阻塞核心业务返回。消息别太大:单条消息体别超过1M,太大的消息会导致性能下降,甚至发送失败。
一定要监控消息堆积:消息堆积是线上最常见的问题,一定要监控消费进度,堆积了及时扩容消费者。
别用RocketMQ做同步等待结果的场景:天生异步,别硬改成同步,不然还不如用Feign。
九、一句话终极总结
Feign是打电话:同步点对点通信,必须立刻拿到结果。
自定义线程池是小卖部临时工:纯本地内存级异步,适合并行提速、轻量杂活,丢了没事。
RocketMQ是快递驿站:分布式可靠异步通信,适合不能丢的任务、跨服务解耦、削峰填谷。
三者完全不重复,各司其职,选对工具,才能写出稳定、高效、易维护的代码。