Messaging shows up in more and more of our applications - you have probably seen Kafka or Pulsar popping up. It comes with its own challenges at development and maintenance time, so let's see how we can integrate it in a CDI application.

The common challenges

The most common challenges you can face with messaging are:

  • Testing: mocking Kafka is not always trivial, even if Testcontainers helps (at the cost of test execution time),

  • Migration: starting with one broker does not mean you will keep it (you started with Kafka because it is hype but you need stronger transactional constraints so you move back to JMS, or your company bought Pulsar, etc…​),

  • Multiple brokers: if you deploy to multiple environments (AWS/Azure/…​), you may want to use each provider's managed service, which means a different bus per environment.

The solution(s)

The idea when you face such issues is generally to abstract the backend: when your application reads or writes from/to a broker, it actually does it through an application-level abstraction, and something in the application bridges that abstraction to the broker.

MicroProfile Reactive Messaging

An example of such an abstraction is MicroProfile Reactive Messaging.

With this specification, reading and writing look like this:

@Inject
@Channel("my-outgoing-topic")
private Emitter<String> emitter;

@Incoming("my-ingoing-topic")
public void consume(final String message) {
  emitter.send(process(message)); // complete()/error() would close the channel, so only call them when done
}

The custom abstraction

MicroProfile Reactive Messaging works, but it has some pitfalls you may want to avoid as an application maintainer:

  • you cannot always use it without effort depending on your stack (the javax/jakarta migration can be an issue, using a custom container or environment - cloud - can be another one, …​)

  • you may not want part of its (dependency) stack (reactivestreams, which now has an equivalent in the JRE, the vertx-context or mutiny stacks, …​)

  • due to the past instability of MicroProfile you may not want to depend on it

  • there is no real variety of implementations (you will end up using SmallRye)

  • connectors (the implementations behind the read/write primitives) can lack some configuration or integration you need

If you can’t use it, it does not mean you cannot abstract the messaging primitives. There are tons of ways to do it - including something close to MicroProfile Reactive Messaging - but one solution I like is to use the CDI event bus.

Long story short, you design an In and an Out message and @Observes/fire them over the CDI event bus. Sending a message is firing an event, and listening for incoming messages is observing an event:

@Inject
private Event<Out> sender;

public void onMessage(@Observes final In message) {
    sender.fire(process(message));
}

The In (resp. Out) message contains the right abstraction to send to the underlying backend(s). For example:

public record Out( (1)
        Integer partition, Long timestamp, (2)
        Map<String, byte[]> headers, Object key, Object value, (3)
        CompletableFuture<Void> completion) { (4)
}
1 The message can be a record or a plain class.
2 The Kafka-oriented metadata (partition, timestamp) are optional but generally map onto any backend (even JMS with some semantic extension),
3 The message itself is represented by headers, a key and a value - which can be typed if needed,
4 An optional CompletableFuture enables the observer (the actual backend producer) to notify the caller when the message is acknowledged.

The In message is more or less the same, but completion can be implicit if the message is processed successfully, for example.
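As a minimal sketch - the exact shape is up to you - an In record matching the way the Kafka consumer shown later in this post builds it could be:

public record In(
        int partition, long offset, long timestamp,
        Map<String, byte[]> headers, Object key, Object value) {
}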

What is important here is to have an abstraction which does not depend on your backend (no Kafka import, for example).

If you handle multiple channels/topics, you must choose between reusing the same message shape (type) and making it specific with a @Qualifier (for example @Topic("name")), or typing each message differently. Both options have pros and cons. I prefer the @Topic qualifier when the code stays technical, not too verbose and without extra abstraction, and strong typing only (SendOrder/ReceivedOrder types) when the semantics matter more in the code.
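For reference, a minimal @Topic qualifier could be declared and used like this - a sketch, the "orders" channel name is purely hypothetical:

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD, ElementType.TYPE})
public @interface Topic {
    String value(); // the logical channel/topic name
}

// hypothetical usage with an "orders" channel
@Inject
@Topic("orders")
private Event<Out> orderSender;

public void onOrder(@Observes @Topic("orders") final In message) {
    // process the incoming order message
}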

At that stage we have a way to send and listen to messages, but nothing plugged in behind.

The job, then, is to implement an observer (@Observes) for each sent message type (or type + qualifier if you used the @Topic qualifier) which actually sends it to the backend, and, for incoming data, a backend consumer which fires an event per message.

Here is a light example for Kafka - some robustness should be added but it is out of scope for this post:

@ApplicationScoped
public class MyKafkaProducer {
    private Producer<Object, Object> producer; // key/value serializers are configured in createKafkaProducer()
    private String topic;

    @PostConstruct
    private void init() { (1)
        producer = createKafkaProducer(); // standard creation
        topic = getTopicName(); // from conf generally
    }

    public void onMessage(@Observes final Out message) { (2)
        final var record = new ProducerRecord<>(
            topic, message.partition(), message.timestamp(),
            message.key(), message.value());
        if (message.headers() != null) {
            message.headers().forEach(record.headers()::add);
        }
        if (message.completion() == null) { (3)
            producer.send(record);
        } else {
            producer.send(record, new NotifyOnCompletion(message.completion()));
        }
    }

    private record NotifyOnCompletion(CompletableFuture<Void> promise) implements Callback {
        @Override
        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
            if (exception != null) {
                promise.completeExceptionally(exception);
            } else {
                promise.complete(null); // the Out record uses CompletableFuture<Void> so completing with null is enough
            }
        }
    }
}
1 When the bean is initialized, we create a Kafka producer and read, generally from the application configuration, the topic name to send records to,
2 We observe the messages to send created by the application (note that you can refine it with @Topic or a more precise type - SendOrder - if needed, see the previous note),
3 Depending on whether the acknowledgment must be pushed back to the caller, we forward it or not thanks to a Kafka producer Callback.
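On the application side, sending then stays a plain CDI event fire. For instance - a hypothetical helper, the key/payload just have to match what your producer serializers expect:

@Inject
private Event<Out> sender;

public void notifyOrderShipped(final Object key, final Object payload) { // hypothetical caller
    final var completion = new CompletableFuture<Void>();
    sender.fire(new Out(null, null, Map.of(), key, payload, completion));
    completion.whenComplete((ok, error) -> {
        // invoked once the broker acknowledged the record (or the send failed)
    });
}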

The consuming side of the connector is exactly symmetric:

@Log
@ApplicationScoped
public class MyKafkaConsumer {
    @Inject
    private BeanManager beanManager; // used to check whether any observer listens for incoming messages

    private Consumer<?, ?> consumer;
    private String topic;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private ExecutorService pool;
    private ExecutorService appPool;

    public void forceStart(@Observes @Initialized(ApplicationScoped.class) final Object start) { (1)
        // force the instance to start when the app begins
    }

    @PostConstruct
    private void onConstruct() {
        (2)
        final var observerMethods = beanManager.resolveObserverMethods(new In(0, 0, 0, null, null, null));
        if (observerMethods.isEmpty()) {
            return;
        }

        (3)
        topic = getTopic(); // from conf generally
        consumer = newKafkaConsumer();
        consumer.subscribe(List.of(topic));

        (4)
        pool = Executors.newFixedThreadPool(Math.max(1, getPollerThreadNumber()), new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(final Runnable r) {
                return new Thread(r, "kafka-" + conf.topic() + "-consumer-poll-" + counter.incrementAndGet());
            }
        });
        appPool = Executors.newFixedThreadPool(Math.max(1, getProcessorThreadNumber()), new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public Thread newThread(final Runnable r) {
                return new Thread(r, "kafka-" + conf.topic() + "-consumer-process-" + counter.incrementAndGet());
            }
        });

        final java.util.function.Consumer<In> invoke = i -> { (5)
            for (final var m: observerMethods) {
                m.notify(i);
            }
        };

        (6)
        pool.execute(() -> { // run the poll loop on the poller pool so @PostConstruct does not block
            final var pollTimeout = Duration.ofMillis(100);
            while (running.get()) {
                try {
                    final var records = consumer.poll(pollTimeout);
                    for (final var record : records) {
                        appPool.execute(() -> invoke.accept(new In( (7)
                            record.partition(),
                            record.offset(),
                            record.timestamp(),
                            StreamSupport.stream(record.headers().spliterator(), false)
                                .collect(toMap(Header::key, Header::value, (a, b) -> a)),
                            record.key(),
                            record.value())));
                    }
                } catch (final RuntimeException re) {
                    log.log(SEVERE, re, re::getMessage);
                }
            }
        });
    }

    @PreDestroy
    private void onDestroy() { (8)
        running.set(false);
        if (consumer == null) { // nothing was started because no observer was found
            return;
        }
        pool.shutdown();
        appPool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES) || !appPool.awaitTermination(1, TimeUnit.MINUTES)) {
                log.warning(() -> "Can't stop " + topic + " consumer in 1 minute, giving up");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.close(); // closed after the poll loop stopped since KafkaConsumer is not thread safe
    }
}
1 Ensure the instance starts listening when the application is deployed/starts,
2 Resolve the observers of the related event (@Topic + In or ReceivedOrder as per the previous note),
3 Create a consumer and subscribe to the configured topic,
4 Create thread pools to listen for messages and to process messages,
5 Create a handler for messages (we just forward to listeners),
6 Start the poll loop on the poller pool and the processing of incoming messages,
7 Create an In representation of each record and forward it to the application observers,
8 At the end of the application, destroy the consumer and stop started threads.
If you have multiple incoming/outgoing channels, most of this code can go into a KafkaService and be reused: the only changes are the concurrency (if you start multiple consumers), the producer/consumer properties and the topic names.

So what we did here is a very light abstraction, thanks to the CDI event bus, for all our external events, plus a connector composed of an observer (producer side) and a Kafka consumer which fires a CDI event per record (consumer side).
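To illustrate how light another connector can be, here is a sketch of a purely in-memory one (mostly useful for tests): it simply loops every Out message back as an In event and acknowledges it immediately.

@ApplicationScoped
public class InMemoryConnector {
    @Inject
    private Event<In> incoming;

    public void onMessage(@Observes final Out message) {
        // loop the outgoing message back as an incoming one (partition/offset are synthetic)
        incoming.fire(new In(
            0, 0L, System.currentTimeMillis(),
            message.headers(), message.key(), message.value()));
        if (message.completion() != null) {
            message.completion().complete(null); // ack immediately, there is no real broker
        }
    }
}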

Switching connectors

In the previous part we implemented a Kafka connector, so it should not be very hard for you to now implement a JMS connector or a mock/in-memory one like the sketch above. The question then is: how do you control which one is in use?

There are several options there:

  • Add a toggle in the application configuration which is read in the Kafka code (an if (notKafka()) { return; } kind of pattern) - I’m not a big fan of this one,

  • Add a toggle read by a CDI extension which veto()es the connectors not in use (sketched after this list) - this works well but still requires a toggle,

  • Make the code modular, i.e. extract the Kafka integration into a kafka module and simply don’t deploy it when you don’t need it. This option is likely the simplest for the end application since it does not require any additional configuration - the packaging is enough,

  • Not always the sexiest, but a very efficient option: exclude the packages/classes you don’t want to deploy using the <scan> section of your module’s beans.xml - this can be made conditional on a system property for example.
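As an illustration of the second option, a minimal portable extension could veto the Kafka beans based on a toggle - a sketch, the system property name and package are hypothetical:

// registered in META-INF/services/jakarta.enterprise.inject.spi.Extension (or javax.* depending on your stack)
public class ConnectorSelectorExtension implements Extension {
    void vetoUnusedConnector(@Observes final ProcessAnnotatedType<?> type) {
        final var kafkaEnabled = Boolean.getBoolean("app.messaging.kafka"); // hypothetical toggle
        if (!kafkaEnabled && type.getAnnotatedType().getJavaClass()
                .getPackageName().startsWith("com.company.app.messaging.kafka")) { // hypothetical package
            type.veto();
        }
    }
}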

Advantages of this technique

Indeed, here you implement the connector yourself - but honestly, that is generally not the hardest part since client libraries are often quite high level - see reactive Kafka for example.

However, the advantages are quite huge:

  • You choose your programming model and it will by design integrate well with your stack and libraries,

  • You don’t depend on what the connector library you picked supports in terms of configuration and features,

  • You own the code and its evolution (library upgrades included),

  • You depend on fewer libraries (so fewer potential CVEs and less upgrade work ;)).

Which solution to choose?

As always, there is no silver bullet: both options are quite good and both will work well. The choice is mainly a compromise between your team’s knowledge - if you have qualified people, don’t hesitate and go with option 2 (the custom CDI abstraction), otherwise option 1 (MicroProfile Reactive Messaging) can be saner - and the time you are willing to invest to maintain the solution.
