Just after the CDI SE API, one of the most requested feature of CDI 2 was the asynchronous event support. However, in that work, the specification also affected a bit the synchronous case. Let's go through some of these changes in this post.

Reminder: sending an event in CDI 1

In CDI 1 you had two options to send events:

  • from an Event<X> injection
public class EventSender {
    @Inject
    private Event<MyEvent> myEvent;

    public void fire(MyEvent event) {
        myEvent.fire(event);
    }
}
  • from the BeanManager
public class EventSender {
    @Inject
    private BeanManager beanManager;

    public void fire(MyEvent event) {
        beanManager.fireEvent(event);
    }
}

BeanManager API change

The last option (from the BeanManager), was working well but since we now have synchronous/asynchronous the API needs to evolve to support this new metadata (is the event synchronous or not).

One clever idea to not add a fireAsync was to make both API converge to a single one. To make it working BeanManager now gets a getEvent() method which returns an Event<Object> which can now (in CDI 2) resolve another instance of Event to select the event type (exactly like Instance<?> can do it).

In practise a synchronous sending using the BeanManager looks like:

public class EventSender {
    @Inject
    private BeanManager beanManager;

    public void fire(MyEvent event) {
        beanManager.getEvent().select(MyEvent.class).fire(event);
    }
}

From that point of the post I'll only speak of Event API since both converged but keep in mind you can access it from a direct injection or the BeanManager if you write an extension.

Sending asynchronous events

Sending an asynchronous event passes by the fireAsync() method of the Event:

CompletableStage<MyEvent> sent = event.fireAsync(new MyEvent());

It also has a flavor where you can pass NotificationOptions which will mainly allow you to pass the Executor to use to execute the asynchronous observers:

CompletableStage<MyEvent> sent = event.fireAsync(
    new MyEvent(),
    NotificationOptions.ofExecutor(executor));

It is important to specify the executor to use if you care about the runtime of this bus because the default is unspecified and often fallback on the standalone common ForkJoin pool which has the pitfalls to not define if the eventing is actually asynchronous (can be synchronous!) and the control the pool (size, eviction etc...) is global to the JVM which is never good if you intend to scale.

On the same spirit, if you use asynchronous events for a reactive implementation you will likely need to configure multiple pools per usages to avoid to lock (it is easy to look a single pool if you submit different kind of tasks linked between them by a dependency relationship and you lock in an end task waiting for all tasks like in a batch or rest service).

Concretely my recommandation is to prevent the usage of fireAsync(Event) by any build tool and ensure to always pass a NotificationOptions.

Side note: the NotificationOptions can also just take properties but this is undefined so you are not sure which Executor you will get so not yet ready to use (maybe later some default properties like the core size etc will be standard).

If you took care CDI broke its loose coupling here since the sender needs to know how the receiver is written. If you don't want to loose this feature don't forget to send twice your event:

public void send(MyEvent myEvent) {
    event.fire(myEvent);
    event.fireAsync(myEvent, NotificationOptions.ofExecutor(executor));
}

Listening for events

On the synchronous side, listening for an event still looks like:

public void onEvent(@Observes final MyEvent e) {
    System.out.println("Event: " + e);
}

But on the asynchronous side you use the new @ObservesAsync annotation:

public void onAsyncEvent(@ObservesAsync final MyEvent e) {
    System.out.println(e);
}

Asynchronous event: no CompletionFuture?

When starting to use asynchronous events you will get confused to get a CompletionStage instead of a CompletionFuture.

Debugging you will always get a CompletionFuture implementation so it will be tempting to cast it but resist to the temptation. toCompletionFuture() is here to do the conversion itself:

MyEvent event = fireAsync()
    .toCompletableFuture()
    .get();

Note however the need of such a conversion is often limited to tests and more rarely to the main runtime where the synchronization will not be done this way.

The CompletionStage being mainly the chain handling of the CompletionFuture without the Future (synchronous) part you still get the power of that API and ability to chain processing.

The important thing to keep in mind here is the processing of the event will be done after all the observers got invoked and observers invocation is not sequential so if you mutate your event you need to be thread safe.

Here is an example using the event to accumulate data to send to a 3rd party system:

  • we assume we have N observers (let say an audit one, a logging one and a processing one) with this pattern:
public void addAudit(@ObservesAsync final Result e) {
    try {
        doProcessing();
        e.addMessage("added audit trace");
    } catch (final RuntimeExceptino re) {
        e.addError("failed to add audit trace: " + re.getMessage());
    }
}
  • we then get back the event which is our JAX-RS payload:
public class Result { // + getters
    private final Collection<String> messages = new ArrayList<>();
    private final Collection<String> errors = new ArrayList<>();

    public synchronized void addMessage(final String message) {
        messages.add(message);
    }

    public synchronized void addError(final String message) {
        errors.add(message);
    }
}
  • then our JAX-RS endpoint can look like:
@ApplicationScoped
public class Endpoint {
    @Inject
    private Event<Result> resultEvent;

    @Inject
    @Named("restExecutor")
    private Executor executor;

    @GET
    public void get(@Suspended final AsyncResponse response) {
        resultEvent.fireAsync(new Result(), NotificationOptions.ofExecutor(executor))
                .thenAccept(response::resume);
    }
}

Event ordering: a feature reserved to synchronous events

If you add @Priority on a synchronous observer you can sort the processing:

public void onEvent(@Observes @Priority(2500) MyEvent e) {
    System.out.println(e);
}

They will be sorted by priority, the default being 2500. If you want to pass the result of a processing in the event and audit it in another observer then you will need a priority higher than 2500 but if you want ro sanitize an event mutating its state you will need to use a lower priority.

This feature will not work with @ObservesAsync because asynchronous observers are not sequentially executed. This can sound like a blocker but if you think about it, if you want to sort observers in an async flow you can still fire another synchronous event and just sort this one.

Conclusion

Concretely it means that the asynchronous feature of CDI gives you a great power but requires some carefulness and thoughts:

  • never forget the pool/executor you want to use. Direct implication is you will also decide how you handle the awaiting of the event dispatching. A default simple strategy is to way a (configured) timeout when shutting down the executor. A more advanced one is to handle graceful shutdown waiting for all receptions of the events but is harder to implement and doesn't always give better results.
  • don't use it for ordering but prefer a standard chain or synchronous event or sub-event for such a need
  • the sender decides which kind of observer is triggered so if you want both ensure to send twice your event

From the same author:

In the same category: