Javaで並行処理するのにサクッと使えるCompletableFuture

ScalaならFutureでも十分使いやすいのであまり困ることはないが、JavaFutureScalaFutureとは異なり使いにくい(個人的な感想)。 似たような方法で簡単に並行処理を行える方法はないか?と調べたところ、CompletableFutureなるものがあるのを見つけた。1.8から導入されたっぽい。

CompletableFuture#completedFutureCompletableFuture#failedFutureで実行完了済みのCompletableFutureを返すことができる。 値を返さないものはCompletableFuture#runAsyncRunnableを渡せばいいし、CompletableFuture#supplyAsyncSupplierを渡してthenXXXで処理を繋げることもできる。 CompletableFuture#allOfを使うと、複数のタスクを待ち受ける1つのCompletableFutureに纏め上げることも簡単になっている。

以下は、動作を確認するための最小コード。Java11で実行できる。

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

public class Main {

    public static void main(String[] args) {
        final var NUM_CONCURRENCY = 4;
        final var TASK_SIZE = 10;
        final var MAX_WAIT_SECOND = 5;
        final var MIN_WAIT_SECOND = 1;
        final var es = Executors.newFixedThreadPool(NUM_CONCURRENCY);
        final var random = new Random();

        final var tasks =
            Stream.generate(
                () -> random.nextInt(MAX_WAIT_SECOND - MIN_WAIT_SECOND + 1) + MIN_WAIT_SECOND)
                .map(waitSecond -> CompletableFuture.runAsync(() -> {
                    try {
                        System.out.println(String.format("start task: %d second.", waitSecond));
                        Thread.sleep(waitSecond * 1000L);
                        System.out.println(String.format("end task: %d second.", waitSecond));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }, es))
                .limit(TASK_SIZE)
                .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(tasks).join();

        es.shutdown();
    }

}

実行結果は以下のような感じ。Thread#sleepブロッキング処理なので、スレッドを占有する。NUM_CONCURRENCY = 4で実行すると、タスクは4つしか同時に実行されない。

start task: 1 second.
start task: 5 second.
start task: 4 second.
start task: 2 second.
end task: 1 second.
start task: 2 second.
end task: 2 second.
start task: 1 second.
end task: 2 second.
start task: 3 second.
end task: 1 second.
start task: 1 second.
end task: 4 second.
start task: 3 second.
end task: 1 second.
start task: 5 second.
end task: 5 second.
end task: 3 second.
end task: 3 second.
end task: 5 second.

NUM_CONCURRENCY = 1で実行すると、タスクは逐次処理される事になる。(タスクの順番はインタリーブするかもしれない。)