JavaEE Concurrency Utilities: propagate MDC easily!
Applications are becoming more and more "reactive". Even if reactive programming is orthogonal to asynchronism, both are often mixed together to get an optimized application. What does it mean? That a single request goes through multiple threads more and more often.
A lot of frameworks rely on an MDC (or ThreadLocal values). In such a context, how can the use of multiple threads (note that this does not only mean multithreading here) be made smooth?
Since JavaEE 7 you have the EE Concurrency Utilities API, and it comes with a nice solution for that.
Overall need
Before digging into EE, let's see how we want to solve that.
Concretely, an MDC (often used in logging frameworks) or a ThreadLocal is bound to one thread (beware of InheritableThreadLocal implementations, which are a sure way to guarantee you will leak at some point).
An MDC is basically a value, so what we need is to capture this value and, when switching threads, reinitialize the MDC to the captured value and reset it once our task is done.
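The leak mentioned for InheritableThreadLocal is easy to reproduce with a thread pool. Here is a minimal sketch using only the JDK; InheritableLeakDemo and CONTEXT are names made up for this example and stand in for an MDC backed by an InheritableThreadLocal:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InheritableLeakDemo {
    // Hypothetical stand-in for an MDC backed by an InheritableThreadLocal.
    static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    // Returns what the single pooled worker sees for two successive "requests".
    static String[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            CONTEXT.set("request-1");
            // The worker thread is created by this first submit and copies "request-1".
            String first = pool.submit(() -> CONTEXT.get()).get();

            CONTEXT.set("request-2");
            // The same worker is reused: nothing is inherited again, the old value stays.
            String second = pool.submit(() -> CONTEXT.get()).get();

            return new String[] {first, second};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }
}
```

With the JDK default thread factory, both entries should be "request-1": the value is copied once, when the worker thread is created, so the second request silently runs with the context of the first one.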
Let's assume we have that simplified MDC implementation:
import java.util.HashMap;
import java.util.Map;

public class MDC {
    // an empty Value is created lazily on first access, so init() stays optional
    private static final ThreadLocal<Value> VALUE = ThreadLocal.withInitial(Value::new);

    //
    // lifecycle
    //
    public static void init(final Value init) {
        VALUE.set(init);
    }

    public static void reset() {
        VALUE.remove();
    }

    //
    // accessors
    //
    public static <T> T get(final String key, final Class<T> type) {
        return type.cast(VALUE.get().values.get(key));
    }

    @SuppressWarnings("unchecked") // the caller picks T, the map only knows Object
    public static <T> T set(final String key, final T value) {
        return (T) VALUE.get().values.put(key, value);
    }

    //
    // represents the value, can be anything
    //
    public static class Value {
        private final Map<String, Object> values = new HashMap<>();
        // often some helper methods, skipped for the sample
    }
}
We clearly identify multiple parts in that MDC:
- lifecycle methods (initialize and reset the thread local). These methods are most of the time provided by your framework (logback for instance) and often init() is transparent.
- accessor methods are the way you modify and read the values (so the main goal of such an API)
In terms of propagation, here is what we would like to do:
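public void runPropagating() {
    // placeholder: however your MDC exposes its current state (caller thread)
    final MDC.Value state = MDC.getSomehowCurrentState();
    executor.submit(() -> {
        MDC.init(state); // executor thread: install the captured state
        try {
            // do the actual job
        } finally {
            MDC.reset(); // always clean the executor thread
        }
    });
}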
This simple code propagates the MDC to the task executed in another thread (executor is just an ExecutorService/thread pool).
Most modern MDCs offer such a "state" method. The only thing to take care of is to ensure the captured state is an independent snapshot (ideally an immutable one) to avoid concurrency issues. In our case it could be:
public class MDC {
    // skipping code not changing

    public static Value state() {
        return VALUE.get().copy();
    }

    public static class Value {
        private final Map<String, Object> values = new HashMap<>();

        // snapshot: later changes in the caller thread do not affect the copy
        private Value copy() {
            final Value copy = new Value();
            copy.values.putAll(values);
            return copy;
        }
    }
}
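To see the capture/init/reset pattern run end to end, here is a self-contained sketch with a plain JDK executor. It does not reuse the MDC class of this post: CONTEXT is a hypothetical, simplified stand-in (one map per thread) so the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ManualPropagationDemo {
    // Hypothetical, simplified stand-in for the MDC: one map per thread.
    static final ThreadLocal<Map<String, Object>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    // Returns {value seen without propagation, value seen with propagation}.
    static Object[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.get().put("test", "something important");

            // Without propagation the worker thread starts from an empty context.
            Object lost = executor.submit(() -> CONTEXT.get().get("test")).get();

            // Capture a snapshot in the caller thread...
            Map<String, Object> state = new HashMap<>(CONTEXT.get());
            Object propagated = executor.submit(() -> {
                CONTEXT.set(state); // ...install it in the worker thread...
                try {
                    return CONTEXT.get().get("test");
                } finally {
                    CONTEXT.remove(); // ...and always clean up afterwards.
                }
            }).get();

            return new Object[] {lost, propagated};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            executor.shutdown();
        }
    }
}
```

The first task sees nothing (null) because the value lives in the caller thread only; the second one sees "something important" because the snapshot was installed before the job and removed after it.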
This code works but is not that elegant. Let's see how the EE Concurrency Utilities make it a bit more elegant.
EE Concurrency Utilities to the rescue
If you analyze the previous code, its only drawback is that it requires some code in the submitted task. The EE Concurrency Utilities provide a kind of interceptor for the tasks submitted to a ManagedExecutorService: ManagedTaskListener.
This listener gives you one hook called when the task starts and one called when the task is done.
Here is a proposal for our MDC issue:
import java.util.concurrent.Future;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.concurrent.ManagedTaskListener;

public class MDCManagedTaskListener implements ManagedTaskListener {
    private final MDC.Value value;

    public MDCManagedTaskListener() {
        // executed in the caller thread: capture its MDC
        value = MDC.state();
    }

    @Override
    public void taskStarting(Future<?> future,
                             ManagedExecutorService executor,
                             Object task) {
        MDC.init(value);
    }

    @Override
    public void taskDone(Future<?> future,
                         ManagedExecutorService executor,
                         Object task, Throwable exception) {
        MDC.reset();
    }

    @Override
    public void taskSubmitted(Future<?> future,
                              ManagedExecutorService executor,
                              Object task) {
        // no-op
    }

    @Override
    public void taskAborted(Future<?> future,
                            ManagedExecutorService executor,
                            Object task, Throwable exception) {
        // no-op
    }
}
The starting and done hooks give you exactly the wrapping try/finally block of the previous snippets - even if the execution fails - which is a perfect fit for our MDC problem. The only trick in this code is the constructor, which captures the MDC value to propagate in the caller thread (you can also think of an implementation where you pass the value you want to propagate to the constructor).
To summarize: the constructor is executed first, in the caller thread with the initial MDC context, and the starting/done hooks are executed in the ManagedExecutorService thread around the task.
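If you want to visualize this ordering without a container, here is a rough simulation with a plain JDK executor. Take care: RecordingListener and wrap() are hypothetical stand-ins written for this example, not the javax.enterprise.concurrent API, and wrap() only mimics what the container is assumed to do around a managed task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HookOrderDemo {
    // Hypothetical stand-in for a task listener, NOT the javax.enterprise.concurrent API.
    static class RecordingListener {
        private final Thread caller = Thread.currentThread();
        private final List<String> events = new ArrayList<>();

        RecordingListener() {
            record("constructor");
        }

        // Stores the step name and whether it ran in the thread that built the listener.
        void record(String step) {
            String where = Thread.currentThread() == caller ? "caller" : "worker";
            events.add(step + ":" + where);
        }
    }

    // Simplified version of the role the container plays around the task.
    static <T> Callable<T> wrap(Callable<T> task, RecordingListener listener) {
        return () -> {
            listener.record("starting");
            try {
                return task.call();
            } finally {
                listener.record("done");
            }
        };
    }

    static List<String> run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            RecordingListener listener = new RecordingListener();
            pool.submit(wrap(() -> {
                listener.record("task");
                return "ok";
            }, listener)).get(); // get() makes the worker's writes visible here
            return listener.events;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

run() should return constructor:caller, starting:worker, task:worker, done:worker, in that order: the capture happens where the context lives, the restore and the cleanup happen where the task runs.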
Now that we have our listener, how do we bind it to our task? The specification provides some factory methods, in ManagedExecutors, making it trivial to bind a Runnable or a Callable to a ManagedTaskListener:
Callable<String> task = ManagedExecutors.managedTask(
    () -> { /* whatever task you need */ return "done"; },
    new MDCManagedTaskListener());
Then just submit this task. Here is what the service code can look like:
import javax.annotation.Resource;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.concurrent.ManagedExecutors;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.Future;

@ApplicationScoped
public class PropagatingService {
    @Resource
    private ManagedExecutorService executor;

    public Future<String> sendAndPropagate() {
        MDC.set("test", "something important");
        return executor.submit(ManagedExecutors.managedTask(
            () -> {
                // your business code runs here with the propagated MDC
                return MDC.get("test", String.class);
            },
            new MDCManagedTaskListener()));
    }
}
If you want it to be even more automatic, you can proxy the ManagedExecutorService to apply this wrapping to every task and just use @Inject ExecutorService instead of the @Resource, but that's not always needed.
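One possible shape for such a proxy is a delegating ExecutorService which decorates every task before handing it over. The sketch below only uses the JDK: DecoratingExecutorService, its decorator parameter and CONTEXT are names invented for this example. In an EE application the decorator could presumably be something like task -> ManagedExecutors.managedTask(task, new MDCManagedTaskListener()) and the instance could be exposed through a CDI producer, but that wiring is an assumption and is not shown here.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Hypothetical helper: every task goes through `decorator` before reaching `delegate`.
class DecoratingExecutorService extends AbstractExecutorService {
    private final ExecutorService delegate;
    private final UnaryOperator<Runnable> decorator;

    DecoratingExecutorService(ExecutorService delegate, UnaryOperator<Runnable> decorator) {
        this.delegate = delegate;
        this.decorator = decorator;
    }

    @Override
    public void execute(Runnable command) {
        // Called in the caller thread (submit() ends up here too), so the
        // decorator can capture the caller's context at this point.
        delegate.execute(decorator.apply(command));
    }

    @Override public void shutdown() { delegate.shutdown(); }
    @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
    @Override public boolean isShutdown() { return delegate.isShutdown(); }
    @Override public boolean isTerminated() { return delegate.isTerminated(); }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }
}

class DecoratingExecutorDemo {
    // Hypothetical stand-in for the MDC.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static String run() {
        ExecutorService executor = new DecoratingExecutorService(
            Executors.newSingleThreadExecutor(),
            task -> {
                String captured = CONTEXT.get(); // caller thread
                return () -> {
                    CONTEXT.set(captured); // worker thread
                    try {
                        task.run();
                    } finally {
                        CONTEXT.remove();
                    }
                };
            });
        try {
            CONTEXT.set("something important");
            // The business code no longer contains any propagation logic.
            return executor.submit(() -> CONTEXT.get()).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            executor.shutdown();
        }
    }
}
```

Here run() should return "something important" even though the submitted lambda knows nothing about propagation. Note that a container-managed executor does not let applications call the lifecycle methods (shutdown and friends), so a real proxy would likely need to handle these differently than by plain delegation.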
Conclusion
Even if it is perfectly doable without the EE Concurrency Utilities, relying on a ManagedTaskListener in an EE application can make the propagation of the MDC contexts of your stack very smooth. It lets you keep the custom code to a minimum, which is a pure gain for business applications.