使用 CompletableFuture 解决接口大 payload 处理慢的问题

使用 CompletableFuture 解决接口大 payload 处理慢的问题

Intro

最近在项目中遇到一个接口需要减少响应时间. 这个接口的操作步骤有很多步.于是脑子就想起了流处理+异步, 尝试使用了 Java 的 CompletableFuture. 达到了预期的效果

背景是: 一个接口需要接收几万条数据的一个大 json 列表将所有符合条件的数据入库.

至于 CompletableFuture 的用法本篇不再赘述.

实战

于是分一下几步:

  1. 初始化返回 response 类(因为不管参数校验或者报错都要返回调用方)
  2. 将所有的数据进行参数校验
  3. 检查是否已经在库中(db 操作)
  4. 将不在库中的数据进行插入
  5. 将插入成功的数据对应的那条 response 类设置为成功

有几个问题需要关注一下:

  1. 如何在流处理中前面步骤失败的情况下跳过本步骤?
  2. 介于 CompletableFuture 会返回 Future 类. 采取怎样的方式去收集所有的流处理数据结果?

解决方案:

  1. 因为每一步其实都是 response 类在传递. response 类中添加一个 flag 来标记是否该条数据处理完成. 并且在 response 类中保存 request

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class Response{

    volatile boolean completeFlag;
    Request r;

    public boolean isCompleteFlag(){ return completeFlag; }

    public boolean setCompleteFlag(boolean flag){ this.completeFlag = flag; }

    public boolean markComplete(){setCompleteFlag(true);}

    public void setRequest(Request r){this.r=r;}
    }
  2. 在循环处理每一条数据之前创建一个 list, 通过 list 去存所有的 future. 将 CompletableFuture 流处理结果存入 list 中.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    ArrayList<CompletableFuture<Response>> list = new ArrayList<>(size); // size is batch size
    for(Request r: requestList){
    CompletableFuture<Response> future = CompletableFuture
    .supplyAssync(()->r)
    .thenApply(xxx) // 这里是要传入一些Function, 所以需要自定义一些 Function 类
    .thenApply(xxx)
    .handle(xxx); // 当然必须 handle 一下 error
    list.add(future);
    }
    // 在最后等待获取结果
    CompletableFuture<Void> allOf = CompletableFuture.allOf(list.toArray(new CompletableFuture[]));
    allOf.join();

    // 使用流处理的方式获取所有结果
    allOf
    .thenApply(v -> list.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList()) // 获取每个 list 成员的结果
    )
    .join().stream() // stream of Response
    .collect(Collectors.toList()) // list of Response

结果

以 25k 条 json object 数据为例:

  • 修改前处理总时间需要最快 90 秒
  • 采用流处理后时间基本稳定在 20 - 30秒

所以, 个人认为, Stream + CompletableFuture + 线程池 这三者对于这种高并发的时刻成效显著.