CDI interceptors and asynchronous operations
CDI interceptors are a great way to enrich beans with more or less complicated logic without polluting the business code. The most common examples are security validation, transaction handling, logging/tracking, etc.
Here is how their implementation can be summarized:
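// Java EE (javax.*) packages
import java.io.Serializable;
import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

@MyInterceptorBinding
@Interceptor
@Priority(Interceptor.Priority.APPLICATION)
public class MyInterceptor implements Serializable {
    @AroundInvoke
    public Object around(final InvocationContext context) throws Exception {
        doSomethingBeforeInvocation();
        try {
            // invokes the next interceptor of the chain or the business method
            return context.proceed();
        } finally {
            doSomethingAfterInvocation();
        }
    }
}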
This works great but has one issue: it assumes the intercepted invocation is synchronous, which becomes less and less likely as we start using Java 8 Streams and CompletableFutures in our code.
Let's take the simple case of a logger: with the interceptor binding @Log, we want to log that we are entering a method and that we are exiting the method logic. The important part of that sentence is "method logic". With an asynchronous execution, you want to log that the computation is done, not that you left the method, which happens almost immediately.
Let's assume we have an InterceptorBase class which abstracts the before/after handling:
import java.io.Serializable;
import javax.interceptor.AroundInvoke;
import javax.interceptor.InvocationContext;

public abstract class InterceptorBase<T> implements Serializable {
    // called before the invocation, the returned state is passed to after()
    protected abstract T before(InvocationContext invocationContext);
    // called once the invocation returned (result) or failed (exception)
    protected abstract void after(T state, Object result, Throwable exception);

    @AroundInvoke
    public Object around(final InvocationContext context) throws Exception {
        final T state = before(context);
        Throwable exception = null;
        Object result = null;
        try {
            result = context.proceed();
            return result;
        } catch (final Throwable error) {
            exception = error;
            throw error;
        } finally {
            after(state, result, exception);
        }
    }
}
Then our logging implementation can look like:
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Priority;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

@Log
@Interceptor
@Priority(Interceptor.Priority.APPLICATION)
public class LogInterceptor extends InterceptorBase<Logger> {
    @Override
    protected Logger before(final InvocationContext invocationContext) {
        final Logger logger = Logger.getLogger(invocationContext.getMethod().toGenericString());
        logger.info("Before " + invocationContext.getMethod().toGenericString());
        return logger;
    }

    @Override
    protected void after(final Logger state, final Object result, final Throwable exception) {
        if (exception != null) {
            // result is always null here, so log the failure message instead
            state.log(Level.SEVERE, String.valueOf(exception.getMessage()), exception);
        } else {
            state.info(String.valueOf(result));
        }
    }
}
This works fine for a synchronous implementation, but not for an asynchronous one, nor for one returning a Stream: the method only prepares the stream pipeline and does not execute it.
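To make the problem concrete, here is a minimal sketch in plain Java, without any CDI container. The `SyncHookPitfall` class and its `around()` helper are hypothetical stand-ins for the interceptor above: a `Supplier` plays the role of `context.proceed()` and a list of events replaces the logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Stream;

class SyncHookPitfall {
    // Mimics the synchronous around(): "after" is recorded as soon as the call returns.
    static <T> T around(final List<String> events, final Supplier<T> invocation) {
        events.add("before");
        try {
            return invocation.get();
        } finally {
            events.add("after");
        }
    }

    // Asynchronous case: the returned future is still pending when "after" is recorded.
    static List<String> futureCase() {
        final List<String> events = new ArrayList<>();
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final CompletableFuture<String> returned = around(events,
                () -> pending.thenApply(value -> {
                    events.add("computed");
                    return value;
                }));
        pending.complete("done"); // the actual work only finishes now
        returned.join();
        return events;
    }

    // Stream case: the pipeline is only described inside the method, not executed.
    static List<String> streamCase() {
        final List<String> events = new ArrayList<>();
        final Stream<String> returned = around(events,
                () -> Stream.of("a", "b").peek(item -> events.add("item " + item)));
        returned.forEach(item -> { }); // the terminal operation runs in the caller
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(futureCase()); // [before, after, computed]
        System.out.println(streamCase()); // [before, after, item a, item b]
    }
}
```

In both cases "after" is recorded before the real work happens, which is exactly what we want to avoid for a logger.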
To handle those cases you need to integrate with the returned value and listen for the actual completion. For a Stream you can enrich it with a peek(), which is transparently called when the stream is executed, or even wrap the stream itself in a custom implementation/proxy if you need to guarantee the execution in any case (peek can be bypassed if there is an error).
To illustrate this post I'll use the CompletionStage/CompletableFuture API, which is surely easier to understand and more common in microservices architectures. The idea is to add a step after the completion which handles the success/error with our after() hook. For that we just check if the result is a CompletionStage and, if so, register a callback on it and return the enriched CompletionStage. This way the flow becomes:
- the user's completion stage handling
- the interceptor's after() handling
- the caller's handling
This does exactly what we want: execute code after the logic is done.
Here is one possible implementation:
import java.io.Serializable;
import java.util.concurrent.CompletionStage;
import javax.interceptor.AroundInvoke;
import javax.interceptor.InvocationContext;

public abstract class InterceptorBase<T> implements Serializable {
    protected abstract T before(InvocationContext invocationContext);
    protected abstract void after(T state, Object result, Throwable exception);

    @AroundInvoke
    public Object around(final InvocationContext context) throws Exception {
        final T state = before(context);
        boolean async = false;
        Throwable exception = null;
        Object returnValue = null;
        try {
            returnValue = context.proceed();
            async = CompletionStage.class.isInstance(returnValue);
            if (async) {
                final CompletionStage<?> result = CompletionStage.class.cast(returnValue);
                // whenComplete (rather than handle returning o) keeps the original outcome:
                // with handle, a failed stage would reach the caller as a successful null
                return result.whenComplete((o, throwable) -> after(state, o, throwable));
            }
            return returnValue;
        } catch (final Throwable error) {
            exception = error;
            throw error;
        } finally {
            // asynchronous results are handled by the callback registered above
            if (!async) {
                after(state, returnValue, exception);
            }
        }
    }
}
There are not many tricks here apart from the async flag, which lets the same code handle both synchronous methods and asynchronous ones (identified by their CompletionStage return type). Yet it opens up many features that can be implemented with interceptors, and many integrations that can be done in this standard way.
What is interesting here is that the new API doesn't break the old one, and that we don't need to forget the way we were implementing things before the reactive/microservice era. We just need to make it evolve a bit.
The last trick to handle with this pattern is how to replace ThreadLocal, which was often abused in the past. This is a bit off topic for this post, but assuming you can decorate the thread pool you submit tasks to, it is quite easy: the decorator gives you a hook to capture the state before switching context/thread and to propagate it before running the submitted task. The tip I'll give in this post is that Java EE, through its EE concurrency API, provides listeners enabling you to do it.
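The ordering of the flow can be checked outside a container with a small sketch. `AsyncAwareHookDemo` is a hypothetical, simplified stand-in for the interceptor: a `Supplier` replaces `context.proceed()` and events are recorded in a list. The sketch registers the hook with `whenComplete`, an assumption of mine chosen because it leaves the result or failure of the stage untouched for the caller.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

class AsyncAwareHookDemo {
    final List<String> events = new CopyOnWriteArrayList<>();

    // Stand-in for after(state, result, exception).
    void after(final Object result, final Throwable error) {
        events.add(error == null ? "after " + result : "after failure");
    }

    // Same branching as around(), with a Supplier standing in for context.proceed().
    Object around(final Supplier<?> invocation) {
        events.add("before");
        boolean async = false;
        Object value = null;
        Throwable failure = null;
        try {
            value = invocation.get();
            async = value instanceof CompletionStage;
            if (async) {
                // runs the hook on completion and keeps the stage outcome as is
                return ((CompletionStage<?>) value).whenComplete(this::after);
            }
            return value;
        } catch (final RuntimeException error) {
            failure = error;
            throw error;
        } finally {
            if (!async) {
                after(value, failure);
            }
        }
    }

    // User logic, then the hook, then the caller's own callback.
    static List<String> successCase() {
        final AsyncAwareHookDemo demo = new AsyncAwareHookDemo();
        final CompletableFuture<String> pending = new CompletableFuture<>();
        final CompletionStage<?> returned = (CompletionStage<?>) demo.around(
                () -> pending.thenApply(value -> {
                    demo.events.add("computed");
                    return value;
                }));
        returned.thenRun(() -> demo.events.add("caller"));
        pending.complete("ok");
        return demo.events;
    }

    // A failed stage triggers the hook and still reaches the caller as a failure.
    static boolean failureIsPreserved() {
        final AsyncAwareHookDemo demo = new AsyncAwareHookDemo();
        final CompletableFuture<String> failing = new CompletableFuture<>();
        final CompletionStage<?> returned = (CompletionStage<?>) demo.around(() -> failing);
        failing.completeExceptionally(new IllegalStateException("boom"));
        return returned.toCompletableFuture().isCompletedExceptionally()
                && demo.events.contains("after failure");
    }

    public static void main(final String[] args) {
        System.out.println(successCase()); // [before, computed, after ok, caller]
        System.out.println(failureIsPreserved()); // true
    }
}
```

The hook now runs between the user's logic and the caller's callback, for successes as well as for failures.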
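As a rough illustration of the decoration idea, here is a minimal sketch using only the JDK, not the EE concurrency API. The `ContextPropagationDemo` class, the `REQUEST_ID` thread local and the `propagating()` helper are hypothetical names introduced for this example: the wrapper captures the value on the submitting thread and restores it around the task on the worker thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagationDemo {
    // Hypothetical per-request state that would usually live in a ThreadLocal.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Decorates an executor: capture on submit, restore around the task execution.
    static Executor propagating(final Executor delegate) {
        return task -> {
            final String captured = REQUEST_ID.get(); // still on the submitting thread
            delegate.execute(() -> {
                final String previous = REQUEST_ID.get(); // now on the worker thread
                REQUEST_ID.set(captured);
                try {
                    task.run();
                } finally {
                    // do not leak the state to the next task of this worker
                    if (previous == null) {
                        REQUEST_ID.remove();
                    } else {
                        REQUEST_ID.set(previous);
                    }
                }
            });
        };
    }

    static String run() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            REQUEST_ID.set("request-42");
            // the supplier runs on the pool thread but sees the caller's value
            return CompletableFuture.supplyAsync(REQUEST_ID::get, propagating(pool)).join();
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(run()); // request-42
    }
}
```

Passing the decorated executor to the asynchronous calls is what makes this work. Stages scheduled on another pool would not see the propagated value.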
Happy asynchronism ;).