使用 CompletableFuture 解决接口大 payload 处理慢的问题
Intro
最近在项目中遇到一个接口需要减少响应时间. 这个接口的操作步骤有很多步.于是脑子就想起了流处理+异步, 尝试使用了 Java 的 CompletableFuture. 达到了预期的效果
背景是: 一个接口需要接收几万条数据的一个大 json 列表将所有符合条件的数据入库.
至于 CompletableFuture 的用法本篇不再赘述.
实战
于是分一下几步:
- 初始化返回 response 类(因为不管参数校验或者报错都要返回调用方)
- 将所有的数据进行参数校验
- 检查是否已经在库中(db 操作)
- 将不在库中的数据进行插入
- 将插入成功的数据对应的那条 response 类设置为成功
有几个问题需要关注一下:
- 如何在流处理中前面步骤失败的情况下跳过本步骤?
- 介于 CompletableFuture 会返回 Future 类. 采取怎样的方式去收集所有的流处理数据结果?
解决方案:
因为每一步其实都是 response 类在传递. response 类中添加一个 flag 来标记是否该条数据处理完成. 并且在 response 类中保存 request
1
2
3
4
5
6
7
8
9
10
11
12
13public class Response{
volatile boolean completeFlag;
Request r;
public boolean isCompleteFlag(){ return completeFlag; }
public boolean setCompleteFlag(boolean flag){ this.completeFlag = flag; }
public boolean markComplete(){setCompleteFlag(true);}
public void setRequest(Request r){this.r=r;}
}在循环处理每一条数据之前创建一个 list, 通过 list 去存所有的 future. 将 CompletableFuture 流处理结果存入 list 中.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21ArrayList<CompletableFuture<Response>> list = new ArrayList<>(size); // size is batch size
for(Request r: requestList){
CompletableFuture<Response> future = CompletableFuture
.supplyAssync(()->r)
.thenApply(xxx) // 这里是要传入一些Function, 所以需要自定义一些 Function 类
.thenApply(xxx)
.handle(xxx); // 当然必须 handle 一下 error
list.add(future);
}
// 在最后等待获取结果
CompletableFuture<Void> allOf = CompletableFuture.allOf(list.toArray(new CompletableFuture[]));
allOf.join();
// 使用流处理的方式获取所有结果
allOf
.thenApply(v -> list.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()) // 获取每个 list 成员的结果
)
.join().stream() // stream of Response
.collect(Collectors.toList()) // list of Response
结果
以 25k 条 json object 数据为例:
- 修改前处理总时间需要最快 90 秒
- 采用流处理后时间基本稳定在 20 - 30秒
所以, 个人认为, Stream + CompletableFuture + 线程池 这三者对于这种高并发的时刻成效显著.