Wednesday, January 6, 2016

CompletableFuture

Basic usage

// Use runAsync instead when the task is a Runnable
final CompletableFuture<String> f1 =
    CompletableFuture.supplyAsync(() -> longRunningTask1(params), executor);
final CompletableFuture<String> f2 = ...
final CompletableFuture<String> f3 = ...

// Wait for all to complete
CompletableFuture.allOf(f1, f2, f3).join();
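
For reference, here is a minimal runnable sketch of the same pattern. The slowTask helper is a hypothetical stand-in for longRunningTask1(params), and the fixed pool of three threads is just an assumption made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BasicUsageSketch {

    // Hypothetical stand-in for longRunningTask1(params): sleeps briefly, then returns a label.
    static String slowTask(String name) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name + " done";
    }

    static String runAll() {
        // Assumed executor for the example; any Executor can be passed to supplyAsync.
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> slowTask("task1"), executor);
            CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> slowTask("task2"), executor);
            CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> slowTask("task3"), executor);

            // allOf returns a CompletableFuture<Void>; join() blocks until all three have completed.
            CompletableFuture.allOf(f1, f2, f3).join();

            // Each future is already complete here, so these join() calls return immediately.
            return f1.join() + ", " + f2.join() + ", " + f3.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

Note that allOf only signals completion; the results still have to be read from the individual futures.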

Java 8 parallel stream gotchas

Parallel streams in Java run on the ForkJoinPool.commonPool() singleton, so a single thread pool is shared across the entire program. By default its parallelism is one less than the number of processors reported by Runtime.getRuntime().availableProcessors(), with a minimum of 1 (it can be overridden with -Djava.util.concurrent.ForkJoinPool.common.parallelism).
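
To check these numbers on a given machine, a small sketch like the following can help. The expectedDefault helper is a hypothetical name introduced here; it only encodes the default rule described above and assumes the system property has not been set.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    // Default rule described above: one fewer than the processor count, but never below 1.
    // Hypothetical helper; only valid when the parallelism system property is not set.
    static int expectedDefault(int processors) {
        return Math.max(1, processors - 1);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("processors:              " + processors);
        System.out.println("expected default:        " + expectedDefault(processors));
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

The thread that starts a parallel stream also takes part in the work, which is why the pool itself gets one thread fewer than the processor count.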

As a consequence:

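  • You cannot configure the pool's size or other properties per stream; the only knob is the JVM-wide system property.
  • Nothing isolates one workload from another, so long-running tasks or blocking I/O calls in one parallel stream can slow down every other user of the common pool.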
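
One way to see the sharing is to record which threads execute a parallel stream. The sketch below is illustrative only (the class and method names are made up for this example); it collects the names of the threads that process the elements.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class SharedPoolThreads {

    // Runs a trivial parallel stream and records the name of every thread that handled an element.
    static Set<String> workerNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
            .parallel()
            .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Typically prints the calling thread plus some ForkJoinPool.commonPool-worker-N threads.
        workerNames().forEach(System.out::println);
    }
}
```

Every parallel stream in the JVM draws from those same commonPool workers, so a stream that blocks them delays all the others.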

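One solution is to run the stream inside a dedicated ForkJoinPool. A parallel stream started from a task that is already running in a ForkJoinPool uses that pool instead of the common one (this is an implementation detail rather than a documented guarantee):

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    IntStream.range(1, 1_000_000)
      .parallel()
      .filter(PrimesPrint::isPrime)
      .boxed()  // IntStream has no collect(Collector); box to Stream<Integer> first
      .collect(toList())
).get();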
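
To confirm that the stream really runs in the dedicated pool, the thread names can be recorded again. This is a sketch under the assumption that the default worker naming (ForkJoinPool-N-worker-M) is in use; the class and method names are hypothetical, and join() is used instead of get() only to avoid checked exceptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CustomPoolThreads {

    // Runs a parallel stream inside a dedicated pool and records which threads did the work.
    static Set<String> workerNames(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Set<String> names = ConcurrentHashMap.newKeySet();
            pool.submit(() ->
                IntStream.range(0, 10_000)
                    .parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName()))
            ).join(); // wait for the submitted task to finish
            return names;
        } finally {
            pool.shutdown(); // dedicated pools are not cleaned up automatically
        }
    }

    public static void main(String[] args) {
        // Expected to list only workers of the dedicated pool, none from commonPool.
        workerNames(2).forEach(System.out::println);
    }
}
```

Unlike the common pool, a pool created this way should be shut down once it is no longer needed.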

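Another solution uses CompletableFuture, passing the dedicated pool as the executor:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
  IntStream.range(1, 1_000_000)
    .parallel()
    .filter(PrimesPrint::isPrime)
    .boxed()  // IntStream has no collect(Collector); box to Stream<Integer> first
    .collect(toList()),
  forkJoinPool
);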
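
A self-contained version of this variant might look like the sketch below. The isPrime method is a hypothetical trial-division stand-in for PrimesPrint::isPrime, which is not shown in this post, and the upper limit is a parameter so that the result is easy to inspect.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PrimesSketch {

    // Hypothetical stand-in for PrimesPrint::isPrime, using simple trial division.
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // Collects the primes in [1, limit) with a parallel stream running in a dedicated pool.
    static List<Integer> primesBelow(int limit) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
                IntStream.range(1, limit)
                    .parallel()
                    .filter(PrimesSketch::isPrime)
                    .boxed()
                    .collect(Collectors.toList()),
                pool
            );
            return primes.join(); // blocks until the list is ready
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(primesBelow(30)); // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    }
}
```

The list comes back in ascending order even though the stream is parallel, because collecting an ordered stream preserves encounter order.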

See the next blog entry on CompletableFuture.