Why wire websocket events to an event bus?

There are multiple reasons to do it, but the most common one is to stay transport independent. Concretely, it enables several things:

  1. Switch the transport layer (move from websocket to HTTP, Kafka or any other transport) while keeping the same core code,

  2. Test your "commands" more easily, without requiring a websocket server in every test,

  3. Reuse your commands very easily,

  4. Design and split your code more naturally, without mixing concerns.

How to do it?

There are multiple technical solutions and this post only goes through one of them, the goal being to give you some hints/pointers about how to do it.

1: Define a protocol

The first step is to define a "command" protocol. You can reuse STOMP or, my preference, use a JSON-based protocol like JSON-RPC. The important part is to define a "request" shape with an identifier representing the command to execute, and a "response" shape telling whether the call succeeded or failed and carrying the related data.

Here is an example using JSON-RPC:

Request
{"jsonrpc":"2.0","method":"my-method","id":1,"params":{...}}
Response
{"jsonrpc":"2.0","id":1,"result":{...}}
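To make the two shapes concrete, here is a minimal, dependency-free sketch of the JSON-RPC 2.0 envelopes as Java records. The record names are hypothetical, params and result are modeled with Map and Object rather than JSON-P types so that it runs as is, and the error member (code and message) follows the JSON-RPC 2.0 specification:

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical model of a JSON-RPC 2.0 request: "method" identifies the command to execute.
record RpcRequest(String jsonrpc, String method, Object id, Map<String, Object> params) {
    RpcRequest {
        Objects.requireNonNull(method, "method is required");
        if (!"2.0".equals(jsonrpc)) {
            throw new IllegalArgumentException("unsupported jsonrpc version: " + jsonrpc);
        }
    }
}

// JSON-RPC 2.0 error member: an integer code and a human-readable message.
record RpcError(int code, String message) {}

// A response carries either a result (success) or an error (failure), never both.
// For simplicity this sketch rejects a null result.
record RpcResponse(String jsonrpc, Object id, Object result, RpcError error) {
    RpcResponse {
        if ((result == null) == (error == null)) {
            throw new IllegalArgumentException("exactly one of result or error must be set");
        }
    }

    static RpcResponse success(Object id, Object result) {
        return new RpcResponse("2.0", id, result, null);
    }

    static RpcResponse failure(Object id, int code, String message) {
        return new RpcResponse("2.0", id, null, new RpcError(code, message));
    }

    // The kind of hook a transversal feature (error logging for example) can rely on.
    boolean isError() {
        return error != null;
    }
}
```

With such a structured envelope, detecting a failed call is a single isError() check instead of an inspection of an arbitrary JSON value.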

If you want a really generic protocol, you can simply define both the request and the response as JSON values. This is the option we take in this post, but note that you can use a more structured protocol if needed, which would enable you to add transversal features later (like logging errors, if you know how to inspect the response shape to detect whether it is an error).

With such a request/command model, a Java binding can use the JSON-P JsonStructure directly. To also enable a command-oriented pattern, we enforce the following schema on the incoming value:

  • If it is an object, it must have a method attribute representing the command name (same as for JSON-RPC). Other attributes can be command specific.

  • If it is an array, each item must be an object matching the object schema (previous point).

The array case is ignored from now on since it mainly boils down to a loop in the @OnMessage callback.
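The two schema rules can be sketched as a small validator. This is an illustration only: JSON values are modeled with plain Java types (Map for an object, List for an array) instead of JSON-P so the snippet has no dependency, and CommandSchema/commandNames are hypothetical names:

```java
import java.util.List;
import java.util.Map;

// Sketch of the command schema check. JSON values are modeled with plain Java types
// (Map for a JSON object, List for a JSON array) so it runs without JSON-P.
final class CommandSchema {
    private CommandSchema() {
    }

    // Returns the command names in message order, or throws if the value breaks the schema.
    static List<String> commandNames(Object message) {
        if (message instanceof Map<?, ?> object) {
            return List.of(methodOf(object));
        }
        if (message instanceof List<?> array) {
            return array.stream()
                    .map(item -> {
                        // every array item must itself be a command object
                        if (!(item instanceof Map<?, ?> entry)) {
                            throw new IllegalArgumentException("array items must be objects: " + item);
                        }
                        return methodOf(entry);
                    })
                    .toList();
        }
        throw new IllegalArgumentException("unsupported command: " + message);
    }

    // A command object must carry a non-blank string "method" attribute.
    private static String methodOf(Map<?, ?> object) {
        if (object.get("method") instanceof String method && !method.isBlank()) {
            return method;
        }
        throw new IllegalArgumentException("missing 'method' attribute: " + object);
    }
}
```

With JSON-P, the same checks map to JsonValue#getValueType() (OBJECT or ARRAY) and reading the method attribute of the JsonObject.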

2: Define your websocket endpoint

There are multiple patterns to define the websocket endpoint: you can use a single endpoint "à la RPC" or multiple endpoints passing the command name in the URL (/ws/<command id> for example).

Here we use the javax/jakarta WebSocket API and just define an RPC-like endpoint:

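// jakarta.* imports (use the javax.* equivalents with the javax API)
import static java.util.logging.Level.SEVERE;

import java.util.logging.Logger;

import jakarta.websocket.CloseReason;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;

@ServerEndpoint("/ws/rpc")
public class CommandWebSocket {
    // explicit JUL logger; Lombok @Log would work too but names its field "log" by default
    private static final Logger logger = Logger.getLogger(CommandWebSocket.class.getName());

    @OnOpen
    public void onOpen(final Session session) {
        logger.finest(() -> "Creating session #" + session.getId());
    }

    @OnClose
    public void onClose(final Session session, final CloseReason closeReason) {
        logger.finest(() -> "Closing session #" + session.getId() + " (" + closeReason.getCloseCode() + ", " + closeReason.getReasonPhrase() + ")");
    }

    @OnError
    public void onError(final Session session, final Throwable throwable) {
        logger.log(SEVERE, throwable, () -> "An error occurred on session #" + session.getId() + ": " + throwable.getMessage());
    }

    @OnMessage
    public void onMessage(final String command, final Session session) {
        logger.finest(() -> "Command received on session #" + session.getId() + ": " + command);
    }
}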

This endpoint does nothing except log the events coming from the websocket connection (Session), but at least we can connect to it.

3: Decode the command and encode the response

We decided (once again, for this post) to use JsonStructure as the request/response format, so we enable our @OnMessage callback to read it:

@OnMessage
public void onMessage(final JsonStructure command, final Session session) {
    logger.finest(() -> "Command received on session # " + session.getId() + ": " + command);
}

To do that, we set a decoder on our endpoint. You can implement your own (just make sure to reuse the same JsonReaderFactory for all endpoints if possible) or reuse the Apache Johnzon one by adding the johnzon-websocket dependency (optionally with the jakarta classifier if you don’t use the javax package):

@ApplicationScoped
@ServerEndpoint(value = "/ws/rpc", decoders = JsrStructureDecoder.class)
public class CommandWebSocket {
    // ...
}

Simply specifying JsrStructureDecoder enables us to get JsonStructure injected into our @OnMessage callback.

Similarly, we can set an encoder if the application sends JsonStructure instances through the Session, using jakarta.websocket.RemoteEndpoint.Async#sendObject(java.lang.Object) for example. The encoder converts the JsonStructure to the JSON (and bytes) sent over the wire:

@ApplicationScoped
@ServerEndpoint(
    value = "/ws/rpc",
    decoders = JsrStructureDecoder.class,
    encoders = JsrStructureEncoder.class)
public class CommandWebSocket {
    // ...
}

You can specify multiple decoders/encoders if relevant, and/or use the Apache Johnzon JsonbTextDecoder/JsonbTextEncoder to map messages directly onto POJOs with JSON-B (or even Java records with Johnzon).

4: Integrate with the event bus

Now that we have a websocket endpoint, an input/output model and a (de)serialization mechanism set up, we need to implement our processing (the @OnMessage callback).

The idea is to get a CDI Event instance and use it to fire an event carrying the current command. There are several choices to make here:

  1. What type of event do we send? JsonStructure works but it is quite generic and could conflict with another part of the application, so we will likely want to create a CommandEvent or an equivalent solution,

  2. Does the CommandEvent expose the Session (making this implementation less transport agnostic but more powerful), or does it abstract it away, the response being handled either by the event itself or by the websocket endpoint after the bus propagation,

  3. How is the command handler selected?

4.1: Command event

Since our requests have a method attribute, we can model our CommandEvent like this:

public record CommandEvent(
        String method, (1)
        JsonObject command) (2)
{}
1 The method extracted from the incoming JSON,
2 The raw incoming command, to let the command handler read any additional information (params if using JSON-RPC style commands).
This example uses a record but a plain class would work too.

However, this solution does not bring much compared to the JsonObject itself since it contains exactly the same information. To make it more CDI friendly, we use a qualifier which binds the method name:

@Qualifier
@Target(PARAMETER)
@Retention(RUNTIME)
public @interface Command {
    /**
     * @return the name of the command.
     */
    String value();

    /**
    * Just the literal usable by programmatic extension/code to represent {@code @Command("xxx")}.
    */
    class Literal extends AnnotationLiteral<Command> implements Command {
        private final String name;
        public Literal(final String name) {  this.name = name;  }
        @Override  public String value() {  return name;  }
    }
}

Now we can represent a command by:

  • Its payload (JsonObject),

  • Its qualifier (@Command("<name>")).
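To see what the qualifier buys us, here is a plain-Java model of the dispatch CDI performs for us: observers are indexed by command name, and firing with a given name only reaches the matching observers. CommandBus is a hypothetical class written for illustration, not a CDI API; with CDI, the container does this work based on @Command:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java model (hypothetical, not CDI) of qualifier-based event routing:
// observers are indexed by the command name, the equivalent of the @Command value.
final class CommandBus<T> {
    private final Map<String, List<Consumer<T>>> observers = new ConcurrentHashMap<>();

    // Equivalent of declaring a method with "@Observes @Command(name) T event".
    void observe(String name, Consumer<T> observer) {
        observers.computeIfAbsent(name, ignored -> new CopyOnWriteArrayList<>()).add(observer);
    }

    // Equivalent of event.select(new Command.Literal(name)).fire(payload).
    // Returns the number of notified observers so callers can detect unhandled commands.
    int fire(String name, T payload) {
        List<Consumer<T>> matching = observers.getOrDefault(name, List.of());
        matching.forEach(observer -> observer.accept(payload));
        return matching.size();
    }
}
```

As with CDI, firing a command nobody observes is a silent no-op (fire returns 0 here), so it is worth detecting unhandled commands, for example by checking whether a response was set.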

4.2: Get the event bus

If your endpoint is a CDI bean (or managed by your IoC container), you can inject the event directly into your websocket endpoint:

@Dependent
@ServerEndpoint(value = "/ws/rpc", decoders = JsrStructureDecoder.class, encoders = JsrStructureEncoder.class)
public class CommandWebSocket {
    @Inject (1)
    private Event<JsonObject> commandEvent;

    // onOpen/onClose/onError as before

    @OnMessage
    public void onMessage(final JsonStructure request, final Session session) {
        switch (request.getValueType()) { (2)
            case OBJECT:
                commandEvent
                    .select(new Command.Literal(request.asJsonObject().getString("method"))) (3)
                    .fire(request.asJsonObject()); (4)
                break;
            case ARRAY:
                // each item must be an object (see the schema above), asJsonObject() throws otherwise
                request.asJsonArray().forEach(it -> onMessage(it.asJsonObject(), session));
                break;
            default:
                onError(session, new IllegalArgumentException("unsupported command: " + request));
        }
    }
}
1 In the managed case we can directly inject the event into the endpoint instance,
2 When we get the message we check its type and handle it accordingly,
3 We extract the command method and refine the default event (which is not qualified with @Command("…")) to bind it to the command name,
4 Now that the event type (JsonObject) is associated with its name (@Command), we fire the event.
This snippet uses a synchronous fire, which means observers use @Observes in the rest of this post; using fireAsync instead lets observers use @ObservesAsync and receive the event asynchronously.

For the unmanaged case, i.e. when you can’t inject the Event into the endpoint instance, you can look it up with CDI.current():

@ServerEndpoint(value = "/ws/rpc", decoders = JsrStructureDecoder.class, encoders = JsrStructureEncoder.class)
public class CommandWebSocket {
    private Event<JsonObject> commandEvent;

    @OnOpen
    public void onOpen(final Session session) {
        commandEvent = CDI.current()
            .getBeanManager()
            .getEvent()
            .select(JsonObject.class);
    }

    // rest stays the same
}

4.3: Provide a way for the handler(s) to send messages back to the caller

This works and lets beans receive the requests, but not send back a response. There are multiple options to solve that:

  • Provide the Session in the event payload, moving from JsonObject to a CommandEvent containing both the request and the session ({command, session}),

  • Abstract the Session behind a CommandEvent exposing a sendResponse(JsonValue) method,

  • Create a CommandEvent which can store one or multiple responses (depending on the case you want to handle), for example class CommandEvent {command, setResponse(JsonStructure)}.

The last option seems the most accurate since it matches the request/response pattern we spoke about originally and keeps the transport decoupled.

So we create a CommandEvent:

@Getter
@RequiredArgsConstructor
public class CommandEvent {
    private final JsonObject command; (1)
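    // volatile: the Lombok-generated getter is not synchronized, this keeps the stored response visible to other threads
    private volatile JsonStructure response; (2)

    public synchronized void setResponse(final JsonStructure response) { (3)
        if (this.response != null) { (4)
            throw new IllegalStateException("Response already set");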
        }
        this.response = response;
    }
}
1 We propagate the command to let the handler read any needed data,
2 We enable the event to store the response,
3 We don’t know upfront whether the event will be used concurrently (passed from observers to other processors), so we must ensure the response setter is synchronized or equivalent,
4 Since several handlers can observe the same command (unless you write a CDI extension validating it can’t happen), we reject a second response in the setter.
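The "or equivalent" in the callouts above can, for instance, be a lock-free set-once field. The following sketch assumes an AtomicReference-based variant of CommandEvent (AtomicCommandEvent is a hypothetical name) and uses Map/Object as stand-ins for JsonObject/JsonStructure so it runs without JSON-P:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock-free variant of CommandEvent: compareAndSet replaces synchronized.
// Map/Object stand in for JsonObject/JsonStructure to keep the sketch dependency-free.
final class AtomicCommandEvent {
    private final Map<String, Object> command;
    private final AtomicReference<Object> response = new AtomicReference<>();

    AtomicCommandEvent(Map<String, Object> command) {
        this.command = Objects.requireNonNull(command, "command");
    }

    Map<String, Object> getCommand() {
        return command;
    }

    Object getResponse() {
        return response.get();
    }

    void setResponse(Object value) {
        Objects.requireNonNull(value, "response");
        // Only the first handler wins; a second response is treated as a programming error.
        if (!response.compareAndSet(null, value)) {
            throw new IllegalStateException("Response already set");
        }
    }
}
```

compareAndSet gives the same "only one response" guarantee as the synchronized setter, and since getResponse() reads the AtomicReference, other threads always see the stored value.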

Now that we have a new event type, we just switch our Event<> to it:

@ServerEndpoint(value = "/ws/rpc", decoders = JsrStructureDecoder.class, encoders = JsrStructureEncoder.class)
public class CommandWebSocket {
    @Inject
    private Event<CommandEvent> commandEvent;

    @OnMessage
    public void onMessage(final JsonStructure request, final Session session) {
        final JsonObject command = request.asJsonObject();
        commandEvent
            .select(new Command.Literal(command.getString("method")))
            .fire(new CommandEvent(command));
    }
}

The last missing piece is to send the response back to the caller:

@OnMessage
public void onMessage(final JsonStructure request, final Session session) {
    final var event = new CommandEvent(request.asJsonObject());
    commandEvent
        .select(new Command.Literal(request.asJsonObject().getString("method")))
        .fire(event);
    if (event.getResponse() != null) { (1)
        try {
            session.getBasicRemote() (2)
                .sendObject(event.getResponse()); (3)
        } catch (final IOException | EncodeException e) {
            throw new IllegalStateException(e);
        }
    }
}
1 We only send something back if there is a response,
2 We use the basic (synchronous) remote API, but the async API would work too,
3 We send our response (JsonStructure) directly; it is serialized to the output stream by the encoder we set up earlier.
This is a synchronous implementation, but it can become fully reactive using the CompletionStage returned by Event#fireAsync and the session's getAsyncRemote(). The only trick is to add to the CommandEvent a method registerSynchronization(CompletionStage) which lets the endpoint await the end of the message processing. Here is roughly what it can look like:
@Getter
@RequiredArgsConstructor
public class CommandEvent {
    // same as before

    private final Collection<CompletableFuture<?>> synchronizations = new ArrayList<>();

    public synchronized void registerSynchronization(CompletionStage<?> promise) {
        this.synchronizations.add(promise.toCompletableFuture());
    }
}
// notificationOptions is assumed to be a field, e.g. NotificationOptions.ofExecutor(yourExecutor)
@OnMessage
public void onMessage(final JsonStructure request, final Session session) {
    final JsonObject command = request.asJsonObject();
    commandEvent
        .select(new Command.Literal(command.getString("method")))
        .fireAsync(new CommandEvent(command), notificationOptions)
        .thenCompose(event -> event.getSynchronizations().isEmpty() ?
            CompletableFuture.completedFuture(event) :
            // if there is any synchronization await them all then propagate back the event (with the response)
            CompletableFuture.allOf(event.getSynchronizations().toArray(new CompletableFuture[0]))
                .thenApply(ignored -> event))
        .thenAccept(event -> {
            if (event.getResponse() != null) {
                // the async API reports failures through the SendHandler, not through checked exceptions
                session.getAsyncRemote().sendObject(event.getResponse(), result -> {
                    if (!result.isOK()) {
                        logger.log(SEVERE, result.getException(), () -> "Can't send response on session #" + session.getId());
                    }
                });
            }
        })
        .exceptionally(error -> { // otherwise processing failures would be silently lost
            onError(session, error);
            return null;
        });
}
Make sure to use a custom notificationOptions instance backed by a dedicated thread pool (for example NotificationOptions.ofExecutor(yourExecutor)) rather than the default one, which relies on the ForkJoin common pool and almost never fits a real application.
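The asynchronous flow can be modeled with the JDK alone to check its ordering guarantees. In this sketch, AsyncCommandEvent and AsyncEndpoint are hypothetical names, and supplyAsync on a dedicated ExecutorService stands in for fireAsync with NotificationOptions.ofExecutor: the observer registers a synchronization that sets the response later, and the response is only sent once every synchronization has completed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical JDK-only model of the async CommandEvent (String/Object stand in for JSON types).
final class AsyncCommandEvent {
    private final String command;
    private final List<CompletableFuture<?>> synchronizations = new ArrayList<>();
    private volatile Object response;

    AsyncCommandEvent(String command) {
        this.command = command;
    }

    String getCommand() { return command; }

    Object getResponse() { return response; }

    synchronized void setResponse(Object value) {
        if (response != null) {
            throw new IllegalStateException("Response already set");
        }
        response = value;
    }

    synchronized void registerSynchronization(CompletionStage<?> promise) {
        synchronizations.add(promise.toCompletableFuture());
    }

    synchronized CompletableFuture<?>[] synchronizations() {
        return synchronizations.toArray(new CompletableFuture<?>[0]);
    }
}

final class AsyncEndpoint {
    // Notifies the observer on the dedicated pool, awaits all synchronizations, then sends the response if any.
    static CompletableFuture<Void> onMessage(String command, Consumer<AsyncCommandEvent> observer,
                                             ExecutorService pool, Consumer<Object> send) {
        return CompletableFuture
                .supplyAsync(() -> {
                    AsyncCommandEvent event = new AsyncCommandEvent(command);
                    observer.accept(event); // stands in for fireAsync notifying @ObservesAsync methods
                    return event;
                }, pool)
                .thenCompose(event -> CompletableFuture.allOf(event.synchronizations())
                        .thenApply(ignored -> event))
                .thenAccept(event -> {
                    if (event.getResponse() != null) {
                        send.accept(event.getResponse());
                    }
                });
    }
}
```

CompletableFuture.allOf() with no argument returns an already completed future, so the isEmpty() shortcut in the endpoint snippet is an optimization rather than a requirement.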

5: Implement handlers

Now the most interesting part: how to implement the handler of a command. With the previous setup, it is as easy as implementing an observer of CommandEvent qualified with the name of the command it handles:

@ApplicationScoped (1)
public class MyCommandHandler {
    public void onMyCommand(
            @Observes (2)
            @Command("my-command") (3)
            final CommandEvent event) (4)
    {
        event.setResponse(execute(event.getCommand())); (5)
    }
}
1 The bean must be a valid CDI bean with a scope available in the websocket context,
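2 We used fire (synchronous), so handlers use @Observes,
3 This method only handles my-command requests,
4 The event parameter (its CommandEvent type must match the type used on the firing side) gives access to all the request data,
5 We compute the response from the command payload and store it in the event (execute stands for your own command logic).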

In other words, you can add as many methods/classes as you want to handle each command. These observers are automatically called when the websocket endpoint fires the event, but they can also be called programmatically, which makes them very easy to test:

@Cdi
class MyCommandHandlerTest {
    @Inject
    private MyCommandHandler handler;

    @Test
    void test() {
        final var event = new CommandEvent(
                Json.createObjectBuilder()
                    .add("method", "my-command")
                    /* add all needed attributes by the test */
                    .build());
        handler.onMyCommand(event);
        assertNotNull(event.getResponse());
        assertEquals("xxx", event.getResponse().asJsonObject().getString("value"));
    }
}
It is still good to test that the websocket setup works, but as long as handlers respect the contract, there is no need to do it for every handler, which saves a lot of time and complexity in tests.

Conclusion

This post explored how to wire a websocket endpoint to the CDI bus, and went through multiple options and design points along the way. Once it is set up, it really opens a lot of doors and speeds up your development when working with websockets. It fits a lot of modern applications and APIs, even if it is not widely used yet. One tip: don’t hesitate to enable JSON-B, and not only JSON-P as in this post, since you will then be able to use command-specific objects quite easily.

A last trick: as mentioned, this pattern makes it very easy to use all your commands through any transport, even a command line interface (CLI) application, so don’t hesitate to use it ;).
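As an illustration of that transport independence, here is a hypothetical CLI front-end dispatching its arguments to the same kind of command handlers. Handlers are modeled as Function<String, String> keyed by command name, and the "<command>[:<payload>]" argument format is an assumption of this sketch, not something the post defines:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical CLI transport reusing command handlers: each argument is "<command>[:<payload>]".
final class CommandCli {
    private final Map<String, Function<String, String>> handlers;

    CommandCli(Map<String, Function<String, String>> handlers) {
        this.handlers = Map.copyOf(handlers);
    }

    // Dispatches each argument to the handler of its command and collects the responses in order.
    List<String> run(String... args) {
        List<String> responses = new ArrayList<>();
        for (String arg : args) {
            int separator = arg.indexOf(':');
            String name = separator < 0 ? arg : arg.substring(0, separator);
            String payload = separator < 0 ? "" : arg.substring(separator + 1);
            Function<String, String> handler = handlers.get(name);
            if (handler == null) {
                throw new IllegalArgumentException("unknown command: " + name);
            }
            responses.add(handler.apply(payload));
        }
        return responses;
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> handlers = Map.of("echo", payload -> payload);
        new CommandCli(handlers).run(args).forEach(System.out::println);
    }
}
```

In a CDI application, the CLI would rather boot an SE container (SeContainerInitializer) and fire the same CommandEvent with the @Command qualifier, so the handlers stay untouched.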
