Apache Beam provides an abstraction for Big Data environments. In today's IT world, being able to change platforms without redeveloping most of the business logic (batches or streams) is very valuable. That logic is the costly part: the environment itself mostly reuses some open source products or a purchased solution, whereas your own business code is what costs money and what is risky to modify later, once the knowledge is no longer fresh. In that context Beam is a good solution; however, you may need to extend some of its connectivity because the project still only provides the bare minimum.

In this context, we will see how to quickly implement a WebSocket output that sends all the records arriving at the end of the pipeline (the execution "flow") to a WebSocket server.

This may sound like a showcase example, but WebSocket is actually a very efficient solution in a lot of cases because:

  • It provides a connected solution (i.e. you don't reconnect to the server per message) so it is generally faster than using plain HTTP,
  • It is (now) supported by most servers, so it provides a simple and generic way to connect a Big Data flow to an external system (like a real-time web application).

If you have never implemented or looked at Beam connector code, here are the high-level steps to go through to implement an output:

  • A PTransform is responsible for integrating the connector into the pipeline. In other words, it creates a sub-pipeline and merges it into the current pipeline,
  • The output itself can be either a WriteOperation or a simple DoFn. The first option is being deprecated and requires a lot of boilerplate, so we will just use a DoFn, which is basically an instance with a lifecycle defined by the runner (more exactly, by the execution implementation).

The WebSocket PTransform

Our PTransform will be quite simple: take the pipeline, which represents the whole processing the user wants, and append to it the sending of all records (called elements in Beam terminology) over a WebSocket.

Before coding it, we need to know which configuration we need. To keep it simple, we will limit it to the following entries:

  • A URI representing the server to connect to (it can be a load balancer),
  • A flag saying whether we ignore errors or not (if we don't ignore them, we can make the bundle fail; a bundle is a group of records for the runner, and a failed bundle will generally be reprocessed),
  • The idle timeout of the connection, in milliseconds (when to close the WebSocket connection if nothing happens),
  • An optional ClientEndpointConfig, which is a way to configure WebSocket subprotocols, extensions and the configurator (which can be used to handle WebSocket security or tracing),
  • And finally an optional Sender, an enum defining how the incoming record is sent: as text (TEXT), as binary data (BYTE_ARRAY or BYTE_BUFFER) or as an encoded object (OBJECT).

Here is the overall structure for our transform:

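// imports for the transform and its inner classes (Fn, Cleaner, Sender)
import static java.util.Optional.ofNullable;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.RemoteEndpoint;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;

import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public class WebSocketOutput<T> extends PTransform<PCollection<T>, PDone> {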
    private final URI uri;
    private int errorTolerance;
    private long timeout;
    private ValueProvider<ClientEndpointConfig> clientEndpointConfiguration;
    private Sender sender;

    public WebSocketOutput(final URI uri) {
        this.uri = uri;
    }

    @Override
    public PDone expand(final PCollection<T> input) {
        input.apply(ParDo.of(new Fn<>(uri, errorTolerance, timeout, clientEndpointConfiguration, sender)));
        return PDone.in(input.getPipeline());
    }

    // Fn definition, see next part

    //
    // fluent setters
    //

    public WebSocketOutput<T> setErrorTolerance(final int errorTolerance) {
        this.errorTolerance = errorTolerance;
        return this;
    }

    public WebSocketOutput<T> setTimeout(final long timeout) {
        this.timeout = timeout;
        return this;
    }

    public WebSocketOutput<T> setSender(final Sender sender) {
        this.sender = sender;
        return this;
    }

    public WebSocketOutput<T> setClientEndpointConfiguration(final ValueProvider<ClientEndpointConfig> clientEndpointConfiguration) {
        this.clientEndpointConfiguration = clientEndpointConfiguration;
        return this;
    }
}
  1. Our PTransform reads a PCollection of T, which means a collection of records flows into the steps we append to the pipeline, and its output is a PDone, which marks the end of the current branch of the pipeline.
  2. The expand() method is responsible for appending the output (WebSocket) to the incoming pipeline. Since we use a DoFn, we just wrap it with the ParDo.of() utility. Finally, since the branch is done after this step, we return a PDone built with its in() utility.
  3. The last part of the class is just the fluent setters for the optional parameters. They keep a nice API for programmatic usage while staying compatible with configuration, which is not a bad compromise.

Now that we have a transform we can add to a pipeline, let's implement its actual logic: our DoFn.

The WebSocket DoFn

A DoFn is an element of the pipeline which takes some input (a record in our case) and optionally outputs some values (nothing in our case).

The DoFn defines the following lifecycle:

  • Setup: called once, before the fn instance starts being used in the pipeline,
  • Teardown: the symmetric step of Setup, called once when the fn instance will no longer be used at all (Beam only calls it on a best-effort basis),
  • StartBundle: called when a bundle starts. A bundle is a group of records whose size depends on the environment and the runner,
  • FinishBundle: called when the runner considers the bundle complete. At a very high level it is a kind of commit step, but without the classical semantics attached (retries etc. all depend on the runner),
  • ProcessElement: called for each record of the incoming collection.
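
To make this call order concrete, here is a small, stdlib-only simulation of how a runner might drive a DoFn-like object over two bundles. LifecycleFn and run() are hypothetical stand-ins, not Beam API: a real runner chooses the bundle boundaries itself and may create, reuse or discard fn instances as it sees fit.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {
    // Hypothetical stand-in for a DoFn: it only records which hook the "runner" called.
    static class LifecycleFn {
        final List<String> calls = new ArrayList<>();

        void setup() { calls.add("setup"); }
        void startBundle() { calls.add("startBundle"); }
        void processElement(String element) { calls.add("process:" + element); }
        void finishBundle() { calls.add("finishBundle"); }
        void teardown() { calls.add("teardown"); }
    }

    // Simulated runner: one setup, a start/finish pair around each bundle, one teardown.
    static List<String> run(List<List<String>> bundles) {
        LifecycleFn fn = new LifecycleFn();
        fn.setup();
        for (List<String> bundle : bundles) {
            fn.startBundle();
            for (String element : bundle) {
                fn.processElement(element);
            }
            fn.finishBundle();
        }
        fn.teardown();
        return fn.calls;
    }

    public static void main(String[] args) {
        // Two bundles: [a, b] then [c]
        System.out.println(run(List.of(List.of("a", "b"), List.of("c"))));
    }
}
```

The point to retain for our output is that bundle-scoped state (error counter, pending sends) belongs in StartBundle/FinishBundle, while the connection, which can span several bundles, belongs to the instance.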

For a WebSocket output, we need to:

  • Initialize the connection: we do it lazily, when a record arrives, to support reconnection. Assume you have an idle timeout of 1 minute and receive one record every 5 minutes: you need to reconnect for each record. The simplest implementation is to check the connection whenever a record arrives, so we do it in the process element hook. Complexity-wise, it is just a few null checks and an open-state check once the session is obtained, so it is not an issue.
  • Send the record correctly to the server over the connection. This means choosing between sendText(), sendBinary() and sendObject(), which is the responsibility of the Sender enum we saw in the transform. The user can specify it, but we can also detect it automatically from the first record. This also happens in the process element hook (after the first record it is just a null check, so not a big deal).
  • Count the errors to be able to make the bundle fail if there are any. To implement that, we reset a counter when a bundle starts, check it when the bundle finishes and throw an exception if the condition is not respected.
  • Send the records and be able to wait for them before a bundle ends. For that, we call the sender in the process element step and keep a handle on each pending send, which we check in the finish bundle phase.
  • Ensure the connection is closed when it is no longer needed. We could do it per bundle, but since there is no real reason to disconnect only to reconnect immediately, we just close the connection in the teardown phase if needed.
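
The lazy (re)connection from the first point can be modeled without any WebSocket library. In this sketch, FakeSession and LazyConnection are hypothetical: the fake session just reports itself closed once it has been idle longer than its timeout (a real JSR 356 container closes the session itself), and the holder reconnects only when the cached session is missing or closed, like the session == null || !session.isOpen() check used in the DoFn.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyConnectionDemo {
    // Hypothetical session: considered closed once idle for longer than idleTimeoutMs.
    static class FakeSession {
        private final long idleTimeoutMs;
        private long lastActivityMs;

        FakeSession(long idleTimeoutMs, long nowMs) {
            this.idleTimeoutMs = idleTimeoutMs;
            this.lastActivityMs = nowMs;
        }

        boolean isOpen(long nowMs) {
            return nowMs - lastActivityMs <= idleTimeoutMs;
        }

        void send(String message, long nowMs) {
            lastActivityMs = nowMs; // any activity resets the idle clock
        }
    }

    // Hypothetical holder: connects on first use and reconnects only when needed.
    static class LazyConnection {
        private final long idleTimeoutMs;
        private final AtomicInteger connects = new AtomicInteger();
        private FakeSession session;

        LazyConnection(long idleTimeoutMs) {
            this.idleTimeoutMs = idleTimeoutMs;
        }

        FakeSession get(long nowMs) {
            if (session == null || !session.isOpen(nowMs)) {
                session = new FakeSession(idleTimeoutMs, nowMs);
                connects.incrementAndGet();
            }
            return session;
        }

        int connectCount() {
            return connects.get();
        }
    }

    public static void main(String[] args) {
        long minute = 60_000;
        LazyConnection connection = new LazyConnection(minute); // 1 minute idle timeout
        // One record every 5 minutes: each record finds the previous session closed.
        for (int i = 0; i < 3; i++) {
            long now = i * 5 * minute;
            connection.get(now).send("record-" + i, now);
        }
        System.out.println("connections opened: " + connection.connectCount()); // connections opened: 3
    }
}
```

With records arriving within the timeout, the same session would be reused and only one connection opened.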

Pick a WebSocket client implementation

To limit the dependency stack and run in most environments without any conflict, the best choice is the JSR 356 API, which is just one small jar; the user can then bind the implementation of their choice (Tomcat, Jetty, Undertow, ...).

Count errors and ensure records were sent

JSR 356 provides two ways to handle the sending of records:

  • send and return a Future
  • send with a completion callback

The first option is quite classical but requires buffering all the futures of a bundle. Not a big deal... until a bundle is the whole dataset, which can be huge and lead to an out-of-memory error. Since the bundle size depends on the runner, we can't use that for a generic and portable implementation. Luckily, the callback flavor gives us all we need.
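
The memory argument can be illustrated with plain java.util.concurrent. In this sketch (names are hypothetical, and an executor stands in for the WebSocket container's I/O threads), every send registers a completion callback that decrements a count of in-flight messages. The bookkeeping is therefore one counter instead of one Future per record, and the "finish bundle" step only waits for the counter to drop to zero. Unlike the Cleaner list used below, this variant keeps no per-send object at all; both approaches only retain sends that have not completed yet.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class CallbackTrackingDemo {
    private final Object lock = new Object();
    private int inFlight; // guarded by lock
    private final AtomicInteger errors = new AtomicInteger();

    // Hypothetical async send: the executor plays the role of the container's I/O threads,
    // and an empty message simulates a failed send.
    void sendAsync(ExecutorService io, String message, Consumer<Boolean> onResult) {
        synchronized (lock) {
            inFlight++; // count the send before it can possibly complete
        }
        io.execute(() -> onResult.accept(!message.isEmpty()));
    }

    // Completion callback: record the outcome first, then release the waiter.
    void onResult(boolean ok) {
        if (!ok) {
            errors.incrementAndGet();
        }
        synchronized (lock) {
            inFlight--;
            lock.notifyAll();
        }
    }

    // "Finish bundle": block until every send issued so far was acknowledged.
    void awaitAll() throws InterruptedException {
        synchronized (lock) {
            while (inFlight > 0) {
                lock.wait();
            }
        }
    }

    int errorCount() {
        return errors.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService io = Executors.newFixedThreadPool(4);
        CallbackTrackingDemo tracker = new CallbackTrackingDemo();
        for (String message : new String[] {"Apache", "", "Beam"}) {
            tracker.sendAsync(io, message, tracker::onResult);
        }
        tracker.awaitAll();
        System.out.println("errors: " + tracker.errorCount()); // errors: 1
        io.shutdown();
        io.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```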

The callback is named SendHandler and requires implementing the onResult(SendResult) method. The SendResult parameter tells us whether the record was sent successfully. This method is also where we drop the "wait" logic for that record, much like removing its future from the list of pending acknowledgements.

To handle that, our fn manages a list of Cleaner instances (the name of our handler implementation) and an error counter. A cleaner adds itself to the list when created; when its callback is triggered, it removes itself and increments the error counter if needed. Finally, we also add a CountDownLatch to be able to wait for the callback in our finish bundle phase:

// private because it is implemented as an inner class of the transform, next to the fn
private static class Cleaner implements SendHandler {
    private final Collection<Cleaner> cleaners;
    private final AtomicInteger errors;
    private final CountDownLatch latch = new CountDownLatch(1);

    private Cleaner(final Collection<Cleaner> cleaners, final AtomicInteger errors) {
        this.cleaners = cleaners;
        this.errors = errors;
        synchronized (this.cleaners) {
            cleaners.add(this);
        }
    }

    @Override
    public void onResult(final SendResult result) {
        // count the error before releasing the latch so the finish bundle phase sees it
        if (!result.isOK()) {
            errors.incrementAndGet();
        }
        latch.countDown();
        synchronized (cleaners) {
            cleaners.remove(this);
        }
    }

    private void await() {
        try {
            latch.await();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

The Sender implementation: TEXT, BYTE_BUFFER, BYTE_ARRAY and OBJECT

The sender just provides the casting of the record to the right type to use the right send method of the Async API of the JSR356:

// still an inner class, public so that users can pass it to setSender()
public enum Sender {
    TEXT {
        @Override
        void send(final RemoteEndpoint.Async async, final Object element, final Cleaner cleaner) {
            async.sendText(element.toString(), cleaner);
        }
    }, BYTE_ARRAY {
        @Override
        void send(final RemoteEndpoint.Async async, final Object element, final Cleaner cleaner) {
            BYTE_BUFFER.send(async, ByteBuffer.wrap(byte[].class.cast(element)), cleaner);
        }
    }, BYTE_BUFFER {
        @Override
        void send(final RemoteEndpoint.Async async, final Object element, final Cleaner cleaner) {
            async.sendBinary(ByteBuffer.class.cast(element), cleaner);
        }
    }, OBJECT {
        @Override
        void send(final RemoteEndpoint.Async async, final Object element, final Cleaner cleaner) {
            async.sendObject(element, cleaner);
        }
    };

    abstract void send(RemoteEndpoint.Async async, final Object element, Cleaner cleaner);
}
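
The automatic detection described earlier (choosing the sender from the first record) boils down to an instanceof cascade. This stdlib-only sketch mirrors it with a hypothetical Kind enum in place of Sender, and returns the chosen kind instead of calling the JSR 356 RemoteEndpoint.Async methods, so it runs without a WebSocket implementation on the classpath.

```java
import java.nio.ByteBuffer;

public class SenderDetectionDemo {
    // Hypothetical mirror of the Sender enum; each constant names the JSR 356 method it would call.
    enum Kind {
        TEXT("sendText"), BYTE_ARRAY("sendBinary"), BYTE_BUFFER("sendBinary"), OBJECT("sendObject");

        final String asyncMethod;

        Kind(String asyncMethod) {
            this.asyncMethod = asyncMethod;
        }
    }

    // Same precedence as getSender() in the DoFn: text, then ByteBuffer, then byte[], else object.
    static Kind detect(Object element) {
        if (element instanceof CharSequence) {
            return Kind.TEXT;
        } else if (element instanceof ByteBuffer) {
            return Kind.BYTE_BUFFER;
        } else if (element instanceof byte[]) {
            return Kind.BYTE_ARRAY;
        }
        // For non-primitive types, sendObject() needs an Encoder (configurable via ClientEndpointConfig).
        return Kind.OBJECT;
    }

    public static void main(String[] args) {
        Object[] samples = {"Beam", new StringBuilder("ws"), new byte[] {1, 2}, ByteBuffer.allocate(4), 42};
        for (Object sample : samples) {
            Kind kind = detect(sample);
            System.out.println(sample.getClass().getSimpleName() + " -> " + kind + " (" + kind.asyncMethod + ")");
        }
    }
}
```

Since detection only looks at the first record, a pipeline mixing types should set the Sender explicitly.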

The WebSocket DoFn itself

The DoFn does exactly what we discussed earlier. The only two points to take care of are:

  • Use the Async API to send the records (the sync one wouldn't be fast enough since it would add latency to every single record instead of treating a bundle as a batch). Note that you can also add a configuration to force a flush if the list of cleaners becomes too big.
  • Lazily get the Session (which represents the connection in JSR 356) with an "empty" Endpoint, since we don't need anything more than being able to send records, which the Session already provides.
private static class Fn<T> extends DoFn<T, Void> {
    private final int errorTolerance;
    private final URI uri;
    private final long timeout;
    private final ValueProvider<ClientEndpointConfig> clientEndpointConfiguration;
    private Sender sender;

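    // connection state, created lazily on the worker and never serialized with the fn
    private transient Session session;
    private transient RemoteEndpoint.Async async;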
    private final Collection<Cleaner> cleaners = new ArrayList<>();
    private final AtomicInteger errors = new AtomicInteger();

    private Fn(final URI uri, final int errorTolerance, final long timeout,
               final ValueProvider<ClientEndpointConfig> clientEndpointConfiguration, final Sender sender) {
        this.uri = uri;
        this.errorTolerance = errorTolerance;
        this.timeout = timeout;
        this.clientEndpointConfiguration = clientEndpointConfiguration;
        this.sender = sender;
    }

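    @StartBundle
    public void onStartBundle() {
        errors.set(0);
        synchronized (cleaners) { // callbacks of the previous bundle may still be removing themselves
            cleaners.clear();
        }
    }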

    @ProcessElement
    public void onElement(final ProcessContext context) {
        final T element = context.element();
        getSender(element).send(getAsync(), element, new Cleaner(cleaners, errors));
    }

    @FinishBundle
    public void onFinishBundle() {
        // flush all pending send
        synchronized (cleaners) {
            cleaners.forEach(Cleaner::await);
        }
        if (errorTolerance > 0 && errors.get() > 0) {
            throw new IllegalStateException("Error(s) sending messages: #" + errors.get());
        }
    }

    @Teardown
    public void onTearDown() {
        ofNullable(session).filter(Session::isOpen).ifPresent(s -> {
            try {
                s.close(new CloseReason(CloseReason.CloseCodes.GOING_AWAY, "DoFn teardown"));
            } catch (final IOException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    private Sender getSender(final T element) {
        if (sender == null) { // assume all elements have the same type, which is likely the case
            if (CharSequence.class.isInstance(element)) {
                sender = Sender.TEXT;
            } else if (ByteBuffer.class.isInstance(element)) {
                sender = Sender.BYTE_BUFFER;
            } else if (byte[].class.isInstance(element)) {
                sender = Sender.BYTE_ARRAY;
            } else { // default
                sender = Sender.OBJECT;
            }
        }
        return sender;
    }

    private RemoteEndpoint.Async getAsync() {
        if (session == null || async == null || !session.isOpen()) {
            try {
                final ClientEndpointConfig config = ofNullable(clientEndpointConfiguration)
                        .map(ValueProvider::get)
                        .orElseGet(() -> ClientEndpointConfig.Builder.create().build());
                session = ContainerProvider.getWebSocketContainer()
                        .connectToServer(new Endpoint() {
                            @Override
                            public void onOpen(final Session session, final EndpointConfig config) {
                                // no-op, we just want a session
                            }
                        }, config, uri);
                session.setMaxIdleTimeout(timeout);
                async = session.getAsyncRemote();
            } catch (final DeploymentException | IOException e) {
                throw new IllegalStateException(e);
            }
        }
        return async;
    }
}
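
The forced flush mentioned above can take the form of a cap on pending sends. This sketch assumes a hypothetical maxInFlight setting and uses a java.util.concurrent.Semaphore: each send takes a permit and its completion (simulated here with an executor and a short sleep) gives it back, so the processing thread blocks as soon as too many sends are pending, and flushing at the end of a bundle means acquiring every permit. It is a variant of the idea, not part of the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedInFlightDemo {
    private final int maxInFlight; // hypothetical configuration entry
    private final Semaphore permits;
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    BoundedInFlightDemo(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    // Called from the processing thread; blocks while maxInFlight sends are pending.
    void send(ExecutorService io, String message) throws InterruptedException {
        permits.acquire();
        peak.accumulateAndGet(pending.incrementAndGet(), Math::max);
        io.execute(() -> {
            try {
                Thread.sleep(5); // simulated network latency; a real sender would write message here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                pending.decrementAndGet();
                permits.release(); // what the completion callback would do
            }
        });
    }

    // End of bundle: owning every permit means no send is still pending.
    void flush() throws InterruptedException {
        permits.acquire(maxInFlight);
        permits.release(maxInFlight);
    }

    int pendingCount() { return pending.get(); }

    int peakInFlight() { return peak.get(); }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService io = Executors.newFixedThreadPool(8);
        BoundedInFlightDemo sender = new BoundedInFlightDemo(3);
        for (int i = 0; i < 20; i++) {
            sender.send(io, "record-" + i);
        }
        sender.flush();
        System.out.println("pending after flush: " + sender.pendingCount()); // pending after flush: 0
        System.out.println("peak in flight: " + sender.peakInFlight()); // never above 3
        io.shutdown();
    }
}
```

The trade-off is throughput versus memory: a small cap keeps the pending bookkeeping tiny but stalls processing more often on a slow server.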

Use the Beam WebSocket output

Now that we have our output, we just need to add it to any pipeline:

pipeline
  .apply(....) // read from a database for instance
  .apply(....) // convert records to anything
  .apply(new WebSocketOutput<>(URI.create("ws://remote.server.com/endpoint")));

// finally run it
pipeline.run();

Tip: if you don't need a dedicated composite transform, you can apply ParDo directly with a DoFn, which saves you one class. Here WebSocketOutputFn stands for the Fn above extracted as a public top-level class with a constructor taking only the URI:

pipeline
  .apply(....) // read from a database for instance
  .apply(....) // convert records to anything
  .apply(ParDo.of(new WebSocketOutputFn<>(URI.create("ws://remote.server.com/endpoint"))));

// finally run it
pipeline.run();

Bonus: test your Beam WebSocket output with Apache Meecrowave

To test our output we need a WebSocket endpoint. We can reuse JSR 356 to implement a server endpoint. The server needs to:

  • Store the messages to ensure they are received (that's all we want at the end)
  • Store the close reason to ensure the DoFn closes the connection properly

Here is a simple endpoint:

@ServerEndpoint("/WebSocketOutputTest")
public static class Server {
    static final Collection<Server> INSTANCES = new ArrayList<>();

    final Collection<String> messages = new ArrayList<>();
    CloseReason reason;

    @OnOpen
    public void onOpen() {
        synchronized (INSTANCES) {
            INSTANCES.add(this);
        }
    }

    @OnMessage
    public void onMessage(final String message) {
        synchronized (messages) {
            messages.add(message);
        }
    }

    @OnClose
    public void onClose(final CloseReason reason) {
        this.reason = reason;
    }

    @Override
    public String toString() {
        return "Server{messages=" + messages + '}';
    }
}

Now that we have an endpoint, we need to deploy it. The easiest way is to add the meecrowave-junit artifact and use its JUnit 4 integration (not the JUnit 5 one, since Beam doesn't support JUnit 5 for now):

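public class WebSocketOutputTest implements Serializable {
    @ClassRule
    public static final MeecrowaveRule SERVER = new MeecrowaveRule(new Meecrowave.Builder() {{
        randomHttpPort();
        setScanningExcludes("beam,avro,jsr,para,findbugs,xz");
    }}, "");

    // the pipeline used by the test (transient since the test class is serializable)
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    // the Server endpoint and the test method shown here go in this class
}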

Normally you can use the rule without any parameter, but since Beam brings some dependencies we don't want to scan, we change the scanning rules. The other settings are the defaults, i.e. a random port and the root context as deployment base.

Finally, we write a pipeline creating some strings and sending them to our WebSocket output, then assert they were all received:

@Test(timeout = 60_000)
public void sendText() throws InterruptedException {
    final Set<String> texts = new HashSet<>(asList("Apache", "Beam", "&", "WebSocket", "Output"));
    pipeline.apply(Create.of(texts))
            .apply(new WebSocketOutput<>(URI.create("ws://localhost:" + SERVER.getConfiguration().getHttpPort() + "/WebSocketOutputTest")));
    pipeline.run().waitUntilFinish();

    Thread.sleep(1000); // waitUntilFinish bug, fixed on master
    assertEquals(texts, Server.INSTANCES.stream().flatMap(s -> s.messages.stream()).collect(toSet()));
    assertEquals(singleton("DoFn teardown"), Server.INSTANCES.stream().map(s -> s.reason.getReasonPhrase()).collect(toSet()));
}

Note that the sleep is a workaround until Beam 2.5.0 is out. A better implementation, though more verbose for a blog post ;), would retry the assertions until they pass or a timeout expires.
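
Such a retry can be written as a small helper. The eventually() method below is a hypothetical utility, not a JUnit or Beam API: it re-runs an assertion until it stops throwing AssertionError or a deadline expires, then rethrows the last failure.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryAssert {
    // Hypothetical helper: re-runs the assertion until it passes or the timeout expires.
    static void eventually(Duration timeout, Duration pause, Runnable assertion) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e; // out of time: surface the last failure
                }
                Thread.sleep(pause.toMillis());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();
        // Simulates an asynchronous result that only becomes visible at the third check.
        eventually(Duration.ofSeconds(1), Duration.ofMillis(10), () -> {
            if (attempts.incrementAndGet() < 3) {
                throw new AssertionError("not yet");
            }
        });
        System.out.println("passed after " + attempts.get() + " attempts"); // passed after 3 attempts
    }
}
```

In the test above, the sleep and the two assertEquals calls could then be wrapped as eventually(Duration.ofSeconds(10), Duration.ofMillis(100), () -> { assertEquals(...); assertEquals(...); }), since JUnit's assertEquals signals failure with an AssertionError.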

Conclusion

On one side, writing a Beam output is not much work: there is one central class (the DoFn), and the rest is code to integrate with the backend you use (WebSocket for us).

On the other side, WebSocket is very simple to integrate in Java land, in particular thanks to JSR 356.

Finally, WebSockets are a nice and simple way to make Big Data more interactive in your applications and to notify your users of newly available information without needing another batch to do it. What about adding push notifications after some computation in a Big Data cluster? We are not far from it!

Happy big data!

 

 

 
