Reactive WebSockets: when javax means RxJava!
JavaEE 7 introduces javax.websocket API which enables to use this technology for modern applications but how to make it really reactive and giving the best of themself? Let's see few options!
Aren't javax.websockets already reactive?
If you check the server side API you have two choices: programmatic or declarative (annotations) API. In both case you have hooks to react to websockets events:
// not a real interface but shares the main hooks
public class MyEndpoint extends Endpoint implements MessageHandler.Whole<String> {
onOpen(session, config);
onMessage(message);
onClose(session, reason);
onError(session, exception);
}
The client side is pretty close excepted it doesn't provide the onClose() hook (sadly).
But wait, this post is about making them reactive but it is really close to the Subscriber definition of reactive stream API. So what's wrong there?
Being reactive is great and, in this example, producing values through events is a good start but if you can't consume the events in an efficient manner it is quite pointless.
What is missing?
The whole point of RxJava - and actually where it differs from this reactive-streams phylosophy - is to make reactive programming smooth and useful. Technically the implementation is not always elegant but the direct consequence is to enable the end user to define a full processing chain from the consuming part to the post flow processing without forgetting the transformation and composition parts. Concretely you can write:
observable
.doOnSubscribe(() -> {/*init*/})
.doOnNext(message -> {/*processing*/})
.doOnCompleted(() -> {/*destroy*/})
.doOnError(exception -> {/*error handling*/});
By itself it doesn't change much what we had in websocket API but if you add all the helpers RxJava provides like composition of Observables, retry, back pressure etc...it just makes the stream built-in in an API and avoids you to reinvent the wheel or reimplement it - with the bug risks or performance responsability - yourself. It also enables you to do it in a single place making it easy to understand your data flow and error handling (which doesn't mean you will define all your logic there but you ill orchestrate it there, a bit like camel DSL if you know it).
Want to see what I'm speaking about? Here a more complex example enabled by RxJava:
Observable<String> source1 = ...;
Observable<Integer> source2 = ...;
source1.retry(3) // accepts 3 errors
.onBackpressureBuffer(2048) // producing is temp. faster than consuming
.doOnNext(item -> {/**/})
.doOnError(oops -> {/*buffering and retries failed*/})
.map(item -> {/*convert to another type*/ return item;})
.filter(item -> {/*remove undesired items*/ return !item.isEmpty();})
// merge both sources
.zipWith(source2, (s, integer) -> s + integer)
.doOnNext(item -> {/*process the merge*/})
This example has two distincts data sources with their own producers (hidden in "..."), first source has some pre processing and supports retries and back pressure cause it is a remote source and is merged with the second source - which can have the same kind of pipeline - to create a third source (zipWith).
If your server is fully reactive and you wrapped everything in Observables then you can compose them like source1 and source2 in previous example, synchronize them, etc...to ensure you process the data when available and that you don't wait for them (at least on the processing thread pool).
It changes the default paradigm but can really improve the scalability of your application and the error handling which is often badly done by default. With RxJava everything is avilable as a DSL and let you activate it easily.
Side note: this also means if you don't understand the small call you do you can completely break your flow like loading in memory a whole stream at some point so ensure to respect the rule "with great power comes great responsabilities" ;).
Wrapping javax.websocket in RxJava
String vs RxMessage
We'll wire the websocket messages to RxJava API. Of course we can do it blindly and pass a String or whatever we use but we can also pass a custom message. That's what we'll do because it allows to pass the message but also the Session which can be useful to send a message back and it also allows you to define shortcuts for commonly needed API like sending a String message which is verbose and requires to handle exceptions by default. Here a sample RxMessage respecting this idea:
import javax.websocket.Session;
public class RxMessage {
private final String message;
private final Session session;
RxMessage(final Session session, final String message) {
this.session = sesson;
this.message = message;
}
public String getMessage() {
return message;
}
public Session getSession() {
return session;
}
//
// some API shortcuts
//
public void send(final String s) {
try {
session.getBasicRemote().sendText(s);
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
}
Server side
There are multiple ways to do it on server side since there are multiple API available. I'll show you two ways to do it depending which kind of API you use but there are really tons of solutions.
Basis
Before digging on how to register our websocket let's see what we need:
- we want to create an Observable so we'll implement rx.Observable.OnSubscribe
- we'll wire all the java.websocket hooks to the RxJava ones
- we'll need a hook to define the flow since the API can be declarative and we'll not be able to handle the "from" part
Concretely it means we'll create a generic Endpoint (to get javax hooks) implementing OnSubscribe:
import rx.Observable;
import rx.Subscriber;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
public abstract class RxWebSocketEndpoint extends Endpoint implements Observable.OnSubscribe<RxMessage> {
private Subscriber<? super RxMessage> subscriber;
protected abstract void defineFlow(Session session, Observable<RxMessage> root);
@Override
public void onOpen(final Session session, final EndpointConfig config) {
defineFlow(session, Observable.create(this));
session.addMessageHandler(new Handler(this, session));
}
@Override
public void onClose(final Session session, final CloseReason closeReason) {
subscriber.onCompleted();
}
@Override
public void onError(final Session session, final Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void call(final Subscriber<? super RxMessage> subscriber) {
this.subscriber = subscriber;
}
private static class Handler implements MessageHandler.Whole<String> {
private final RxWebSocketEndpoint parent;
private final Session session;
private Handler(final RxWebSocketEndpoint subscriber, final Session session) {
this.parent = subscriber;
this.session = session;
}
@Override
public void onMessage(final String message) {
this.parent.subscriber.onNext(new RxMessage(session, message));
}
}
}
This abstract endpoint gets on the subscription a RxJava subscriber then wires all websocket hooks to it. The only trick there is to wire the onMessage hook which can use several types (Whole or Partial, and depending the decoder/encoder different message type/class). Here the implementation simply provides a Whole<String> but being a bit more generic is impossible. That said it should be good enough for web applications.
Declarative server endpoint
Once we have this generic endpoint writing a declarative endpoint is as easy as extending it:
import com.github.rmannibucau.websocket.rx.RxWebSocketEndpoint;
import rx.Observable;
import javax.enterprise.context.Dependent;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
@Dependent
@ServerEndpoint(value = "/test")
public class RxServerEndpoint extends RxWebSocketEndpoint {
@Override
public void defineFlow(final Session session, final Observable<RxMessage> root) {
root.doOnNext(next -> {/*...*/})
.subscribe();
}
}
Yes as simple as that:
- you declare your child with @ServerEndpoint binding at the same time its path (/test there)
- you define your flow in the defineFlow method designed for that purpose
- you don't forget to call subscribe() otherwise the endpoint will not get any subscriber (this part could be enhanced depending your need)
Programmatic registration
The programmatic registration needs the same information:
- websocket path
- flow definition
To gather that there are several options but let's take the easy one: fire a Registration event through CDI. For that we just need an event giving to the user a light API to define an endpoint by a path and flow. The event will store all definition enabling us to define the actual endpoints later. Here a proposal:
public class Registration {
private final String path;
private final Consumer<Observable<RxMessage>> configurer;
Registration(final String path, final Consumer<Observable<RxMessage>> configurer) {
this.path = path;
this.configurer = configurer;
}
// getters
}
public class Registrations {
private final Collection<Registration> registrations = new ArrayList<>();
public Registrations define(final String path, final Consumer<Observable<RxMessage>> consumer) {
registrations.add(new Registration(path, consumer));
return this;
}
}
So we can now collect all our "registrations", but how to ask the container to define this servlet? The easiest is to write a ServletContextListener and access the websocket container through the ServerContainer attribute of the ServletContext. We'll also fire our Registrations event at that moment allowing us to leverage CDI to collect definitions.
import rx.Observable;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import javax.websocket.DeploymentException;
import javax.websocket.Session;
import javax.websocket.server.ServerContainer;
import javax.websocket.server.ServerEndpointConfig;
@WebListener
public class DslRegistration implements ServletContextListener {
private ServerContainer container;
@Inject
private Event<Registrations> collectEvent;
@Override
public void contextInitialized(final ServletContextEvent sce) {
final Registrations inMemoryRegistrations = new Registrations();
collectEvent.fire(registrations);
container = ServerContainer.class.cast(sce.getServletContext()
.getAttribute("javax.websocket.server.ServerContainer"));
inMemoryRegistrations.getRegistrations().forEach(registration -> {
try {
container.addEndpoint(ServerEndpointConfig.Builder
.create(RxWebSocketEndpoint.class, registration.getPath())
.configurator(new ServerEndpointConfig.Configurator() {
@Override
public <T> T getEndpointInstance(final Class<T> clazz)
throws InstantiationException {
return RxWebSocketEndpoint.class == clazz ?
(T) new RxWebSocketEndpoint() {
@Override
protected void defineFlow(final Session session,
final Observable<RxMessage> root) {
registration.getFlowConfigurer().accept(root);
}
} : super.getEndpointInstance(clazz);
}
})
.build());
} catch (final DeploymentException e) {
throw new IllegalStateException(e);
}
});
}
@Override
public void contextDestroyed(final ServletContextEvent sce) {
// no-op
}
}
The trick there is to use a custom Configurator to implement our endpoint on the fly with our custom flow and not let the container do a newInstance() which would fail since RxWebSocketEndpoint is abstract.
From the user perspective you just do:
@Dependent
public class WsRegistrer {
public void register(@Observes final Registrations registrations) {
registrations.define(
"/test",
root -> root.doOnNext(next -> {/*...*/}).subscribe());
}
}
What about client side?
The client side is pretty close but has a drawback compared to server one: there is no close hook. To workaround that we'll poll the status of the session to know if it is still open or not. To ensure we don't create a lot of threads we'll use a client factory which will be responsible of that: triggering close event.
The factory will just use a schedule thread to check registered sessions.
// RxWebSocketClient is our websocket client wrapper, we'll define it later
public class Factory implements AutoCloseable {
private final ConcurrentMap<String, RxWebSocketClient> sessions = new ConcurrentHashMap<>();
private final Subscription eviction;
public Factory() {
eviction = Observable.timer(500, MILLISECONDS)
.doOnNext(l -> sessions.values().stream()
.filter(client -> !client.getSession().isOpen())
.forEach(client ->
ofNullable(sessions.get(client.getSession().getId()))
.ifPresent(RxWebSocketClient::close)))
.subscribe();
}
public RxWebSocketClient connect(final String uri) {
final RxWebSocketClient client = new RxWebSocketClient(uri, sessions::remove);
final Session session = client.getSession();
sessions.put(session.getId(), client);
return client;
}
@Override
public void close() {
eviction.unsubscribe();
}
}
To keep the code simple we reused Observable.timer but using a custom thread is as good as that solution there.
The thread just check all sessions of the created client and for the not opened anymore ones it calls close() on the client which fires the completed event.
For that to work we just need to create client by the factory and keep track of tham and this is the purpose of connect() method.
Then for other events the client websocket API allows you to bind an Endpoint to a Session so it is pretty much the same as server side API (of course since events are the same). It means we can reuse our abstract endpoint:
import rx.Observable;
import rx.Subscription;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.Session;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class RxWebSocketClient extends RxApiAdapter implements AutoCloseable {
private final Session session; // +getter
private final Consumer<String> onClose;
private Observable<RxMessage> observable; // +getter
private RxWebSocketEndpoint endpoint;
private RxWebSocketClient(final String uri, final Consumer<String> onClose) {
this.onClose = onClose;
try {
session = ContainerProvider.getWebSocketContainer()
.connectToServer(new RxWebSocketEndpoint() {
{
endpoint = this;
}
@Override
protected void defineFlow(final Session session,
final Observable<RxMessage> root) {
observable = root;
}
},
ClientEndpointConfig.Builder.create()/*can get config*/.build(),
URI.create(uri));
} catch (final DeploymentException | IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public synchronized void close() {
if (endpoint != null) {
try {
endpoint.onClose(
session,
new CloseReason(CloseReason.CloseCodes.GOING_AWAY, ""));
} finally {
onClose.accept(session.getId());
endpoint = null;
}
}
if (!session.isOpen()) {
return;
}
try {
session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, ""));
} catch (final IOException e) {
throw new IllegalStateException(e);
}
}
}
No big trick there excepted:
- close can be manual (when the client quit the server and not the opposite - depends you protocol) or automatic through the factory
- we use the WebSocketContainer to connect our endpoint implementation and here we don't register the flow directly - we could - but just capture the Observable to expose it and let the user handle it in a more natural manner
In term of end user API it looks like:
try (final RxWebSocketClient.Factory factory = new RxWebSocketClient.Factory()) {
final RxWebSocketClient client = factory.connect("ws://localhost:8080/test");
client.getObservable()
// "normal" pipeline
.doOnSubscribe(() -> { // when connected send a message
try {
client.getSession().getBasicRemote().sendText("client is there");
} catch (final IOException e) {
throw new IllegalStateException(e);
}
})
.doOnNext(next -> {/*..*/})
.subscribe();
// do the work you need with the websocket
// in practise the factory will be created and closed
// in @PostConstruct/@PreDestroy
// of an @ApplicationScoped bean
}
Client side is nicer there cause you can extract the Observable from any registration process and use it "as usual". Composition is also easier.
Conclusion
Before concluding this post a small warning to say that the code of this post is not complete and miss several configuration to make it readable and that a websocket registration in a generic manner has several more options like encoder/decoders, subprotocols, ...
However we made it, with this code you can define a Rx flow from your websockets. The programmatic approach being the most interesting since you will define your flow outside the websocket itself with all the error handling, retry etc...
Also note that even if the client side is not as good as the server one, half of the cases will use browser websockets and not java websockets so you can rely on the excellent RxJs which already wraps browser websockets for you!
From the same author:
In the same category: