JAX-RS 2.1 will introduce Server Sent Events support but do you know you can already do it with JAX-RS 2.0?

Server Sent Events?

Server sent events is a solution to sent data from the server to the client (often a javascript one since it has built in support) but not the opposite. Now we have websocket what's the point? It uses plain HTTP so no issues with proxies!

The usage is globally to define with the client a protocol. The format is simple and can be advanced with message type etc but to keep this post simple we'll just use a plain list of messages (and assume javascript can detect which kind of message we send based on the format - it is not that rare actually).

Format

We need to send the Content-Type header with the value text/event-stream to start, then each line (message) will look like;

data:<the message>

Not obvious in previous snippet but you need two end of lines after the message (\n\n).

The message format is not part of the SSE format so it is common to use json:

data:{"command":"show-news","value":[{"title":"Java rocks", "content":"SSE are great"}]}

JAX-RS 2.0 implementation

Most of JAX-RS implementations will buffer the response to capture the status code, headers or do some other advanced stuff with the response. This doesn't help our SSE implementation cause we need to flush each message to send it to the client.

To solve this issue we need to use the response instance not wrapped by the JAX-RS implementation. For that multiple solutions:

  • Use a custom filter and set the request as attribute and retrieve it in your JAX-RS endpoint
  • Use some internals of your JAX-RS provider to get the original response.
  • ...

Wait, we'll use the HttpServletResponse so why using JAX-RS at all? Because it still gives us:

  • a nice routing (@Path)
  • serialization support (Providers)
  • parameter parsing (PathParameter, @QueryParam, @HeaderParam, ...)

Here is the skeleton of our implementation:

@GET
@Path("invoke/stream")
@Produces("text/event-stream")
public void invokeScenario(
        @Suspended final AsyncResponse asyncResponse,
        @Context final Providers providers,
        @Context final HttpServletRequest httpServletRequest) {

    // 1.
    final MultivaluedHashMap<String, Object> fakeHttpHeaders = new MultivaluedHashMap<>();
    final Annotation[] annotations = new Annotation[0];

    // 2.
    final MessageBodyWriter<Content> writerResponse = providers.getMessageBodyWriter(
                Content.class, Content.class,
                annotations, APPLICATION_JSON_TYPE);

    // 3.
    final HttpServletResponse httpServletResponse = HttpServletResponse.class.cast(httpServletRequest.getAttribute("original.response"));

    // 4.
    httpServletResponse.setHeader("Content-Type", "text/event-stream");
    try {
        httpServletResponse.flushBuffer();
    } catch (final IOException e) {
        throw new IllegalStateException(e);
    }

    // 5.
    final ServletOutputStream out;
    try {
        out = httpServletResponse.getOutputStream();
    } catch (final IOException e) {
        throw new IllegalStateException(e);
    }

    // 6.
    executorService.submit(() -> {
        try {
           try {
               Content content;
               // 7.
               while ((content = getNextContent()) != null) { // this block is the one to repeat for all messages
                   // 8.
                   out.write("data:".getBytes());
                   writerResponse.writeTo(content, Content.class, Content.class, annotations, APPLICATION_JSON_TYPE, fakeHttpHeaders, out);
                   out.write("\n\n".getBytes());
                   out.flush();
               }
           } catch (final IOException e) {
               throw new IllegalStateException(e);
            } finally {
                try { // 9.
                    asyncResponse.resume("");
                } catch (final RuntimeException re) {
                    // no-op: not that important
                }
            }
        });
    }

Comments on that code are:

  1. some values we could use as constants, just a technical part without any knowledge
  2. we extract the json mapper from jax-rs provider, this can be replaced by a new ObjectMapper() if you use jackson or new MapperBuilder().build() with johnzon - or jsonb if you already upgraded ;)
  3. we get back the not wrapped response instance
  4. we set the Content-Type to let the client know we will use SSE (could be done in thread 6.)
  5. we keep a reference on the output stream to avoid to do it each time (optional)
  6. we run the actual feed (stream) in a dedicated thread to not block http pool (this can use a ManagedExecutorService)
  7. this loop is responsible to retrieve events one by one, you can multithread this but ensure to synchronize on the out instance otherwise you will not write respecting the format on the output stream
  8. this block writes a single chunk/message
  9. we finally request to JAX-RS 2.0 to resume the request (= finish it)

Going further

This code works (assuming you fill the small abstraction like how to retrieve the feed) but can be enhance. What is happening if the client closes? In previous case you will likely get an exception and quit which is ok - even if a 500 is not the cleanest way. However if you multithread a long running task then you can get troubles. To avoid it, and since the client can't say "I'm leaving" with SSE, I recommand you to wrap the writing operation and catch IOExceptions and count them. After some count to define (2 or 3 should work not bad with multithreading) then you consider the client left and stop the processing with a flag you added (AtomicBoolean works very well for that).

The nice parts of that implementation are:

  • you reuse a lot of JAX-RS 2.0 - including the serialization - which avoids you to reinvent the wheel and keep the code small
  • it uses asynchronous request processing (AsyncResponse). This is not mandatory but makes long running task better handled for the HTTP container
  • It will work with JAX-RS 2.1 as well so will not require to rewrite the method when upgrading
  • We can still use JAX-RS!

This last point is important cause you probably notice the sample is not secured. It is common to see sample not on security not using any security but when you will want to secure this endpoint you will realize SSE requires GET and it is hard to pass header parameter portably in the browser (from EventSource). To solve that you can encode the security token and pass it as a query parameter. JAX-RS makes this very easy and its integration with CDI or Spring will makes the validation of the token quite trivial at the beggining of the method. Last tip on this topic: you can write a POST endpoint returning you a ciphered version of the security token the SSE endpoint can decipher before validating the token. This allows to not rely on a almost clear token in the url which would end up in the access logs. Instead of that you will get a ciphered token which is not usable without the cipher secret :).

From the same author:

In the same category: