背景:
系统需要在极短的时间(短时间可以减少实际余额偏差)拉取多个第三方平台的账户余额,并保存到数据库。
每个平台都是HTTP IO 调用 + 数据解析 + DB 写入,典型的IO 密集型任务。
一、业务场景简介(将具体的平台脱敏了,是真实数据)
系统涉及多个余额来源:
- 单账号平台
- PLATFORM_A
- PLATFORM_B
- PLATFORM_C
- PLATFORM_D
- 多账号平台(N 个账号,数量不固定)
每次定时任务需要:
- 调用平台接口获取余额
- 解析返回数据
- 保存余额记录到数据库
二、顺序执行版本(传统写法)
核心特点
- 所有平台按顺序执行
- 每一个平台的网络 IO 都会阻塞当前线程
- 总耗时 =所有平台耗时之和
@Slf4j@ComponentpublicclassBalanceTaskSequential{@AutowiredprivateMultiAccountUtilsmultiAccountUtils;@AutowiredprivatePlatformAUtilsplatformAUtils;@AutowiredprivatePlatformBUtilsplatformBUtils;@AutowiredprivatePlatformCUtilsplatformCUtils;@AutowiredprivatePlatformDUtilsplatformDUtils;@AutowiredprivateMultiAccountMappermultiAccountMapper;@AutowiredprivateAccountBalanceMapperaccountBalanceMapper;@ResourceprivateStringRedisTemplateredisTemplate;privatestaticfinalintPLATFORM_MULTI=0;privatestaticfinalintPLATFORM_A=1;privatestaticfinalintPLATFORM_B=2;privatestaticfinalintPLATFORM_C=3;privatestaticfinalintPLATFORM_D=4;@Scheduled(cron="0/5 * * * * ?")publicvoidsaveAccountBalanceTask(){longstartTime=System.currentTimeMillis();log.info("开始拉取所有平台余额(顺序)");// --- PLATFORM_A ---try{BigDecimalbalance=platformAUtils.getBalance().getBigDecimal("balance");saveBalance("PLATFORM_A账号",balance,PLATFORM_A,null);}catch(Exceptione){log.error("PLATFORM_A获取余额异常",e);}// --- PLATFORM_B ---try{ResponseDtodto=platformBUtils.getAccountBalance();if(dto.getCode()==1){BigDecimalbalance=newBigDecimal(JSONObject.parseObject(dto.getData()).getString("data"));saveBalance("PLATFORM_B账号",balance,PLATFORM_B,null);}}catch(Exceptione){log.error("PLATFORM_B获取余额异常",e);}// --- PLATFORM_C ---try{Map<String,String>response=platformCUtils.getMerchantBalance();if("0".equals(response.get("code"))){BigDecimalbalance=JSONObject.parseObject(response.get("data")).getBigDecimal("Money");saveBalance("PLATFORM_C账号",balance,PLATFORM_C,null);}}catch(Exceptione){log.error("PLATFORM_C获取余额异常",e);}// --- PLATFORM_MULTI ---List<AccountApiInfo>multiAccounts=multiAccountMapper.findAllActiveAccounts();for(AccountApiInfoaccount:multiAccounts){try{ResponseDtodto=multiAccountUtils.getBalance(account.getApiKey());if(dto.getCode()==1){BigDecimalbalance=newBigDecimal(dto.getData());saveBalance(account.getName(),balance,PLATFORM_MULTI,account.getPhone());}}catch(Exceptione){log.error("PLATFORM_MULTI账号[{}]获取余额异常",account.getName(),e);}}// --- PLATFORM_D ---try{Stringtoken=redisTemplate.opsForValue().get("platform_d_token");Map<String,String>headers=Map.of("Authorization",token);Map<String,String>data=platformDUtils.getBalance(headers);if("0".equals(data.get("code"))){BigDecimalbalance=newBigDecimal(data.get("data"));saveBalance("PLATFORM_D账号",balance,PLATFORM_D,null);}}catch(Exceptione){log.error("PLATFORM_D获取余额异常",e);}longendTime=System.currentTimeMillis();log.info("顺序任务结束,总耗时: {} ms",endTime-startTime);}privatevoidsaveBalance(Stringname,BigDecimalbalance,inttype,Stringphone){AccountBalanceaccountBalance=newAccountBalance(null,name,phone,balance,type,1,null,LocalDateTime.now());accountBalanceMapper.insert(accountBalance);log.info("{}余额保存成功: {}",name,balance);}}存在的问题
| 问题 | 说明 |
|---|---|
| ⏳ 总耗时长 | 任意一个平台慢,全局都慢 |
| 🧵 线程阻塞 | HTTP IO 占用线程 |
| 📈 扩展性差 | 多账号平台账号数越多越慢 |
顺序执行时间
三、虚拟线程并发版本(JDK 21+)
设计思路
- 每个平台一个任务
- 每个账号一个虚拟线程
- 使用
Executors.newVirtualThreadPerTaskExecutor() - 使用
CountDownLatch等待所有任务完成
@Slf4j@ComponentpublicclassBalanceTaskConcurrent{@AutowiredprivateMultiAccountUtilsmultiAccountUtils;@AutowiredprivatePlatformAUtilsplatformAUtils;@AutowiredprivatePlatformBUtilsplatformBUtils;@AutowiredprivatePlatformCUtilsplatformCUtils;@AutowiredprivatePlatformDUtilsplatformDUtils;@AutowiredprivateMultiAccountMappermultiAccountMapper;@AutowiredprivateAccountBalanceMapperaccountBalanceMapper;@ResourceprivateStringRedisTemplateredisTemplate;privatestaticfinalintPLATFORM_MULTI=0;privatestaticfinalintPLATFORM_A=1;privatestaticfinalintPLATFORM_B=2;privatestaticfinalintPLATFORM_C=3;privatestaticfinalintPLATFORM_D=4;@Scheduled(cron="0/5 * * * * ?")publicvoidsaveAccountBalanceTask(){longstartTime=System.currentTimeMillis();log.info("开始拉取所有平台余额(并发)");List<AccountApiInfo>multiAccounts=multiAccountMapper.findAllActiveAccounts();inttotalTasks=4+multiAccounts.size();CountDownLatchlatch=newCountDownLatch(totalTasks);try(ExecutorServiceexecutor=Executors.newVirtualThreadPerTaskExecutor()){// --- PLATFORM_A ---executor.submit(()->runBalanceTask(()->{BigDecimalbalance=platformAUtils.getBalance().getBigDecimal("balance");saveBalance("PLATFORM_A账号",balance,PLATFORM_A,null);},"PLATFORM_A",latch));// --- PLATFORM_B ---executor.submit(()->runBalanceTask(()->{ResponseDtodto=platformBUtils.getAccountBalance();if(dto.getCode()==1){BigDecimalbalance=newBigDecimal(JSONObject.parseObject(dto.getData()).getString("data"));saveBalance("PLATFORM_B账号",balance,PLATFORM_B,null);}},"PLATFORM_B",latch));// --- PLATFORM_C ---executor.submit(()->runBalanceTask(()->{Map<String,String>response=platformCUtils.getMerchantBalance();if("0".equals(response.get("code"))){BigDecimalbalance=JSONObject.parseObject(response.get("data")).getBigDecimal("Money");saveBalance("PLATFORM_C账号",balance,PLATFORM_C,null);}},"PLATFORM_C",latch));// --- PLATFORM_MULTI ---for(AccountApiInfoaccount:multiAccounts){executor.submit(()->runBalanceTask(()->{ResponseDtodto=multiAccountUtils.getBalance(account.getApiKey());if(dto.getCode()==1){BigDecimalbalance=newBigDecimal(dto.getData());saveBalance(account.getName(),balance,PLATFORM_MULTI,account.getPhone());}},"PLATFORM_MULTI-"+account.getName(),latch));}// --- PLATFORM_D ---executor.submit(()->runBalanceTask(()->{Stringtoken=redisTemplate.opsForValue().get("platform_d_token");Map<String,String>headers=Map.of("Authorization",token);Map<String,String>data=platformDUtils.getBalance(headers);if("0".equals(data.get("code"))){BigDecimalbalance=newBigDecimal(data.get("data"));saveBalance("PLATFORM_D账号",balance,PLATFORM_D,null);}},"PLATFORM_D",latch));latch.await();}catch(Exceptione){log.error("虚拟线程执行异常",e);}finally{longendTime=System.currentTimeMillis();log.info("定时任务结束,总耗时: {} ms",endTime-startTime);}}privatevoidrunBalanceTask(Runnabletask,Stringname,CountDownLatchlatch){try{task.run();}catch(Exceptione){log.error("{}余额获取异常",name,e);}finally{latch.countDown();}}privatevoidsaveBalance(Stringname,BigDecimalbalance,inttype,Stringphone){AccountBalanceaccountBalance=newAccountBalance(null,name,phone,balance,type,1,null,LocalDateTime.now());accountBalanceMapper.insert(accountBalance);log.info("{}余额保存成功: {}",name,balance);}}并发执行时间
四、虚拟线程到底解决了什么问题?
1️⃣ IO 不再“占用”线程
传统线程模型:
# OS 线程为操作系统真实线程 1 HTTP 请求 = 1 OS 线程阻塞虚拟线程模型:
# OS 线程为操作系统真实线程 1 HTTP 请求 = 虚拟线程挂起(不占 OS 线程)➡️CPU 线程被释放出来给其他任务使用
2️⃣ 并发数量不再是问题
| 模型 | 可承载并发 |
|---|---|
| 线程池 | 几百 ~ 几千 |
| 虚拟线程 | 几十万级别 |
对于多账号余额采集:
- 10 个账号?没区别
- 100 个账号?没区别
- 1000 个账号?依然可控
3️⃣ 代码仍然是“同步写法”
这是虚拟线程最恐怖的优势:
❌ CompletableFuture 地狱
❌ Reactive 风格侵入
❌ 回调嵌套
✅ try / catch
✅ 顺序逻辑
✅ 易读、易调试
4️⃣ 实际耗时对比(真实业务)
假设:
- 单个平台平均耗时:4个
- 多账号平台:4 个账号
| 方案 | 总耗时 |
|---|---|
| 顺序执行 | ≈4430ms |
| 虚拟线程并发 | ≈1254ms |
➡️耗时降低接近 4 倍
五、为什么不直接用线程池?
线程池的根本问题
| 问题 | 描述 |
|---|---|
| 线程昂贵 | 每个线程 1MB+ 栈 |
| 易耗尽 | 高并发下 OOM |
| 配置困难 | core/max/queue 很难调 |
| IO 浪费 | 大量线程在等待 |
虚拟线程的本质
线程 = 任务抽象,而不是系统资源
JVM 帮你做了:
- 调度
- 挂起
- 恢复
- 映射到少量载体线程
六、适合使用虚拟线程的场景
✅ HTTP 调用
✅ RPC / 第三方接口
✅ DB 操作
✅ 定时批量任务
✅ 多账号 / 多租户系统
❌ CPU 密集型计算
❌ 高精度实时任务
七、总结
虚拟线程不是“并发新玩法”,而是 Java 并发模型的质变
对这类余额采集系统来说:
- ✅ 更快
- ✅ 更稳
- ✅ 更简单
- ✅ 更容易扩展
📌一句话结论:
虚拟线程 = 用同步代码写出高并发系统