Java 8 and its Stream API bring a lot of fluency to the Java world. Stream evaluation is lazy (nothing runs until a leaf, i.e. a terminal operation such as a collector, is called), which is very powerful and makes composing application logic smooth and efficient.

However, the Stream API is not perfect and misses a few features, in particular the ability to decorate streams. What could that be used for? To add a behavior around whichever leaf is called later. Since a stream is lazily evaluated, all the elements of the chain are invoked at that moment. We can see it with the following example:

return Stream.of("a", "b").map(this::mapString).collect(toList());

The string mapping (mapString) is executed in the context of the collection (toList()). This means that if you inject a "context" around the collection, you inject it for the full stream.
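To make the laziness visible, here is a minimal sketch that records the order of events. The class and method names (LazyEvaluationDemo, trace) are made up for the illustration and are not part of the decorator presented in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: shows that the mapper only runs inside the terminal operation.
class LazyEvaluationDemo {
    static List<String> trace() {
        final List<String> events = new ArrayList<>();
        // Building the pipeline does not invoke the mapper yet.
        final Stream<String> stream = Stream.of("a", "b")
                .map(s -> {
                    events.add("map " + s);
                    return s.toUpperCase();
                });
        events.add("stream built");
        // The mapper is only called now, while collect() is running.
        final List<String> result = stream.collect(Collectors.toList());
        events.add("collected " + result);
        return events;
    }

    public static void main(final String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The recorded order is "stream built", "map a", "map b", "collected [A, B]": the mapping happens during collect(), which is why anything set up around the leaf also applies to the intermediate steps.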

Consequently, if you return a Stream or compose Streams to create another one, you may want to inject a final context as well. This is very useful in an EE context, where you may want to force a ClassLoader, a security context, or any caller context at the time the intermediate steps run.

Looking at the API, the onClose(Runnable) method looks promising, but it is only called when close() is invoked, which almost never happens in practice.
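A small sketch of that limitation, using a hypothetical OnCloseDemo class: the first stream is consumed without being closed, the second one is closed through try-with-resources.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: onClose handlers run on close(), not on the terminal operation.
class OnCloseDemo {
    static List<String> trace() {
        final List<String> events = new ArrayList<>();

        // The terminal operation alone never triggers the close handler.
        final Stream<String> first = Stream.of("a", "b").onClose(() -> events.add("onClose 1"));
        events.add("collected " + first.collect(Collectors.toList()));

        // The handler only runs when close() is called, here via try-with-resources.
        try (Stream<String> second = Stream.of("c").onClose(() -> events.add("onClose 2"))) {
            events.add("collected " + second.collect(Collectors.toList()));
        }
        return events;
    }

    public static void main(final String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The trace contains "collected [a, b]", "collected [c]" and "onClose 2", but never "onClose 1". Even when it runs, the handler fires after the leaf, so it cannot set up a context before the evaluation.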

To solve that, you can write a Stream facade which takes a Runnable or, better, a Consumer<Runnable> able to set up the context and reset it after the collection. It can be implemented in multiple ways, but the most portable one, in particular to handle both Java 8 and Java 9 (which added methods such as takeWhile and dropWhile to Stream), is to use a Java proxy. All the logic is therefore defined in an InvocationHandler.
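As an illustration of what such a Consumer<Runnable> hook could look like, here is a sketch that forces a thread context ClassLoader around the task it receives and restores the previous one afterwards. The ClassLoaderContext class and its withClassLoader helper are assumptions made for this example, not an existing API.

```java
import java.util.function.Consumer;

// Hypothetical helper: builds a hook that runs a task with a given context ClassLoader.
class ClassLoaderContext {
    static Consumer<Runnable> withClassLoader(final ClassLoader loader) {
        return task -> {
            final Thread thread = Thread.currentThread();
            final ClassLoader previous = thread.getContextClassLoader();
            thread.setContextClassLoader(loader); // set up the context
            try {
                task.run(); // the leaf invocation happens here
            } finally {
                thread.setContextClassLoader(previous); // always reset it
            }
        };
    }

    public static void main(final String[] args) {
        // An empty anonymous loader is enough to observe the switch.
        final ClassLoader custom = new ClassLoader(ClassLoaderContext.class.getClassLoader()) {};
        withClassLoader(custom).accept(() ->
                System.out.println(Thread.currentThread().getContextClassLoader() == custom));
    }
}
```

The try/finally matters: the previous loader is restored even if the leaf throws, so the context does not leak into the calling thread.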

The handler logic can be summarized as follows:

  • If the invoked method returns a stream (BaseStream, to include primitive streams too), just invoke it and propagate the hook to the returned instance, i.e. re-decorate the returned instance.
  • Otherwise it is a leaf invocation, so wrap the invocation with the Consumer.

In practice it can be implemented like this:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class StreamDecorator implements InvocationHandler {
    private final BaseStream delegate;
    private final Consumer<Runnable> leafDecorator;

    private StreamDecorator(final BaseStream delegate, final Consumer<Runnable> leafDecorator) {
        this.delegate = delegate;
        this.leafDecorator = leafDecorator;
    }

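    // iterator() and spliterator() do not return a stream, so they are handled as leaves: only their creation
    // is wrapped, not the later traversal of the elements. Likely not exactly the hoped behavior but ok for us.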
    @Override
    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
        try {
            final boolean stream = BaseStream.class.isAssignableFrom(method.getReturnType());
            final Object result = stream ? method.invoke(delegate, args) : wrap(() -> {
                try {
                    return method.invoke(delegate, args);
                } catch (final IllegalAccessException e) {
                    throw new IllegalArgumentException(e);
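                } catch (final InvocationTargetException e) {
                    // rethrow unchecked failures of the leaf as-is so callers see the original exception
                    final Throwable cause = e.getTargetException();
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }
                    if (cause instanceof Error) {
                        throw (Error) cause;
                    }
                    throw new IllegalArgumentException(cause);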
                }
            });
            if (stream) {
                if (Stream.class.isInstance(result)) {
                    return decorate(Stream.class.cast(result), Stream.class, leafDecorator);
                }
                if (IntStream.class.isInstance(result)) {
                    return decorate(IntStream.class.cast(result), IntStream.class, leafDecorator);
                }
                if (LongStream.class.isInstance(result)) {
                    return decorate(LongStream.class.cast(result), LongStream.class, leafDecorator);
                }
                if (DoubleStream.class.isInstance(result)) {
                    return decorate(DoubleStream.class.cast(result), DoubleStream.class, leafDecorator);
                }
            }
            return result;
        } catch (final InvocationTargetException ite) {
            throw ite.getTargetException();
        }
    }

    private <V> V wrap(final Supplier<V> supplier) {
        final AtomicReference<V> ref = new AtomicReference<>();
        leafDecorator.accept(() -> ref.set(supplier.get()));
        return ref.get();
    }

    public static <T> Stream<T> decorate(final Stream<T> delegate, final Consumer<Runnable> wrapper) {
        return decorate(delegate, Stream.class, wrapper);
    }

    private static <T, S extends BaseStream<T, ?>> S decorate(final S delegate, final Class<S> type,
                                                              final Consumer<Runnable> wrapper) {
        return (S) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { type },
                new StreamDecorator(delegate, wrapper));
    }
}

Then you can use that decorator factory with any stream:

Stream<String> stream = Stream.of("a", "b"); // <1>
Consumer<Runnable> decorator = collector -> { // <2>
    System.out.println("Before");
    collector.run();
    System.out.println("After");
};
// <3>
List<String> collect = StreamDecorator.decorate(stream, decorator)
        .collect(toList());

  1. We create a stream; it can be anything.
  2. We create a consumer of Runnable, the runnable being the leaf invocation (collect, ...).
  3. We wrap our stream, associating it with the decorator, and finally call a leaf (collect(toList())). Any other Stream method could be called after the decoration.

 

This decorator implementation is quite simple, but it really allows you to go further with the laziness and propagation of Streams in your API. You can now return a wrapped stream which enforces a context during evaluation without having to do it in every intermediate step (filters, mappers, ...). This makes Stream-based development easier and less intrusive in your code, enabling more composability of stream subparts.

 
