CompletionStage/CompletionFuture are great API to orchestrate multiple systems (in the cloud erea it is really the way to go) but Java missed some basic features when they created it.

The first one is exceptionallyCompose or handleCompose. Behind these weird names is hidden the ability to return another completion stage when handling exceptions. 

The second one is the ability to choose to fail fast or not when using CompletionFuture.allOf. If you use concurrent calls and need all result states to be able to clean up resources you don't want the default fail fast behavior which will make you leak some of this state. A quite explicit example is the usage of a cloud SDK (like amazon one), if you create instances and fail fast, you will not be able to clean up some instances and AWS will not forget to bill it ;).

Chain exception handling reactively

To understand that need let's first see what a completion stage chain can look like:

CompletionStage<Result> stage = createStage();
stage
  .thenCompose(result1 -> doRemoteCall(result1)) // 1
  .thenCompose(result2 -> doAnotherRemoteCall(result2))
  1. Here the current stage will be concatenated with the doRemoteCall stage which means, as the caller I see a single stage representing the last result. It is a kind of flatMap for CompletionStages.

It is a common need, right?

Now if you want to handle errors you have two main options, either you use exceptionally:

CompletionStage<Result> stage = createStage();
stage
  .exceptionally(error -> getFallbackResult())

or directly through handle:

CompletionStage<Result> stage = createStage();
stage
  .handle((result, error) -> error != null ? getFallbackResult() : result)

Side note: indeed you can throw another exception to wrap the error but here I just want to show the fallback case.

As you can see both options return the completion stage result, so you can't use that to do a retry for example or - the case I encountered - cleanup resources through some remote calls which return a completion stage and awaiting for them to complete before actually returning.

In pseudo code I would like to do that:

CompletionStage<Result> stage = createStage();
return stage
  .exceptionallyCompose(error -> {
     log.error(error);
     return cleanupResources();
  });

With cleanupResources something like this method signature:

public CompletionStage<ResourceIds> cleanupResources();

Don't fail fast when exception N stages

Assuming you start N tasks and want to synchronize the result of all of them to execute another task, you can be tempted to use CompletionFuture.allOf. But this one will fail fast and return early if one of the future failed.

This means you will miss some state of other promise if you want to do some cleanup. Indeed you can then subscribe to each of the promise but it kind of defeat the usage of allOf.

Add to that previous point that the exception handling can't be reactive and the API becomes quite complex to orchestrate services.

Aggregate results and use a state flag

To solve that my proposal - until the JVM enriches its API - is to use an aggregator (Collector) and combine the results with a reducer.

Instead of starting from an array of futures, you start from a stream (or collection in practise), then you reduce them in a custom aggregator which will have its own promise facade you will use instead of allOff.

Concretely instead of doing:

CompletionFuture<?>[] futures = getFutures();
return CompletionFuture.allOf(futures);

You will do:

List<CompletionFuture<?>> futures = getFutures();
return dependencies.stream()
  .reduce(new Aggregator<Result, List<Result>>(futures.size(), toList()), // 1
          Aggregator::handle, Aggregator::merge)
  .asPromise();
  1. The aggregator takes a number of task to await and a collector to merge the results (here just appending them thanks the Collector.toList() collector).

It looks a bit more complicated but then:

  • The promise has the aggregated result, no need to go through futures to read it,
  • You are sure all promises are done when chain tasks to this stage,
  • You can ensure it returns a boolean "failed/success" and let you chain an error handling returning another stage

Concretely here is what can be the chain once complete:

?List<CompletionFuture<?>> futures = getFutures();
return dependencies.stream()
  .reduce(new Aggregator<Result, List<Result>>(futures.size(), toList()),
          Aggregator::handle, Aggregator::merge)
  .asNotFaillingPromise()
  .thenCompose(result -> {
    if (result.isSuccess()) return handleResult(result.getValue()); // 1
    return cleanupResources(); // 2
  });

?
  1. We can test if all the calls suceeded and then get the aggregated result to continue the processing chain,
  2. If at least one call failed we can cleanup everything returning another stage

In terms of implementation, a simple flavor can be:

@Slf4j
public class Aggregator<T, A, R> {
    private final int total;
    private final AtomicInteger remaning;
    private final Collector<T, A, R> collector;
    private final Collection<Throwable> errors = new ArrayList<>();
    private final CompletableFuture<R> promise = new CompletableFuture<>();
    private volatile A aggregator;

    public Aggregator(final int remaning, final Collector<T, A, R> collector) {
        this.total = remaning;
        this.remaning = new AtomicInteger(remaning);
        this.collector = collector;
        this.aggregator = collector.supplier().get();
    }

    public void onSuccess(final T value) {
        synchronized (this) {
            collector.accumulator().accept(aggregator, value);
        }
        onValue();
    }

    public void onFailure(final Throwable value) {
        log.error(value.getMessage(), value);
        synchronized (this) {
            errors.add(value);
        }
        onValue();
    }

    public Aggregator<T, A, R> handle(final CompletionStage<T> promise) {
        promise.handle(this::handle);
        return this;
    }

    public T handle(final T result, final Throwable error) {
        if (error != null) {
            onFailure(error);
        } else {
            onSuccess(result);
        }
        return result;
    }

    public synchronized Aggregator<T, A, R> merge(final Aggregator<T, A, R> other) {
        remaning.addAndGet(total - other.remaning.get());
        other.errors.forEach(this::onFailure);
        aggregator = collector.combiner().apply(aggregator, other.aggregator);
        return this;
    }

    private void onValue() {
        if (remaning.decrementAndGet() != 0) {
            return;
        }
        // no more need to synchronize, all concurrency is done there
        if (!errors.isEmpty()) {
            final IllegalStateException exception = new IllegalStateException("Invalid execution");
            errors.forEach(exception::addSuppressed);
            promise.completeExceptionally(exception);
        }
        promise.complete(collector.finisher().apply(aggregator));
    }

    public CompletionStage<R> asPromise() {
        return promise;
    }

    /**
     * Replaces the standard completion stage by wrapping the actual result in a wrapper object containing the execution status (failed or not).
     * It enables to handle failures and chain it with another CompletionStage while the JVM does not have an exceptionallyCompose like method.
     *
     * @return a promise always succeeding holding the actual state (sucess/failure) of the execution.
     */
    public CompletionStage<Result<R>> asSafePromise() {
        final CompletableFuture<Result<R>> out = new CompletableFuture<>();
        promise.handle((result, error) -> out.complete(new Result<>(result, null)));
        return out;
    }

    @Getter
    @AllArgsConstructor(access = PRIVATE)
    public static class Result<R> {
        private final R value;
        private final Throwable error;

        public boolean isSuccess() {
            return value != null;
        }
    }

    public static class Simple<T, R> extends Aggregator<T, Object, R> {
        public <A> Simple(final int remaning, final Collector<T, A, R> collector) {
            super(remaning, (Collector<T, Object, R>) collector);
        }

        public synchronized Simple<T, R> merge(final Aggregator.Simple<T, R> other) {
            super.merge(other);
            return this;
        }

        @Override
        public Aggregator.Simple<T, R> handle(final CompletionStage<T> promise) {
            super.handle(promise);
            return this;
        }
    }
}

This implementation really just wraps a Collector implementation and makes its handle CompletionStage friendly. Indeed, you can achieve the save implementing a custom collector wrapping another one (like groupingBy does for example).

With this implementation you will use Aggregator.Simple instead of just Aggregator (mainly to keep a fluent API with generics).

Our previous chain, therefore, becomes:

??List<CompletionFuture<?>> futures = getFutures();
return dependencies.stream()
  .reduce(new Aggregator.Simple<Result, List<Result>>(futures.size(), toList()),
          Aggregator.Simple::handle, Aggregator.Simple::merge)
  .asSafePromise()
  .thenCompose(result -> {
    if (result.isSuccess()) return handleResult(result.getValue());
    return cleanupResources();
  });

Here we are, we can ensure we really await all promises are done to execute some code, we can chain an error handling process returning a stage/promise too and we can aggregate the result of each task on the fly without having to do it in a completely custom "per application" fashion linking manually tasks and merging logic.

When the usage of CompletionStage is not "local" in your application, this really clean up the overall code and enables to have a "standard" way to build orchestration chains and the class enabling it is not complex or huge so it really worth doing it IMHO.

 

From the same author:

In the same category: