How to use JMS in a Microprofile server: Meecrowave and Artemis case!
Microprofile definition is intentionally limited to CDI and JAX-RS but with the Microservice trend using JMS to get a broker is quite common. Let see how smooth it is to make both working together.
Note: this post is not about the rationnality of that architecture.
The use case
To illustrate this post we'll simply create a server hosting the broker and listener for messages on a topic. The client will send a message on a JAX-RS call.
The dependencies
We'll use Meecrowave 0.3.0 (just released :)) and Artemis 1.5.3 to illustrate this post.
Quick note before listing the dependencies, Apache ActiveMQ Artemis is ActiveMQ 5+1. It is not Apollo which was supposed to replace ActiveMQ 5 not the next version of ActiveMQ 5 but a merge with HornetQ. Therefore if you are used to AMQ 5 you can get some surprises but they did a great work to make it smoother than HornetQ was in my opinion.
Server
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-server</artifactId>
<version>${artemis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.meecrowave</groupId>
<artifactId>meecrowave-core</artifactId>
<version>${meecrowave.version}</version>
</dependency>
</dependencies>
Client
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${artemis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.meecrowave</groupId>
<artifactId>meecrowave-core</artifactId>
<version>${meecrowave.version}</version>
</dependency>
</dependencies>
artemis-cdi-client
Artemis-cdi-client is a CDI integration of artemis JMS but it has some pitfall in last release like making it hard to use only as a client or server or server with remote connection. Since code is pretty much trivial to replace it I didn't select it to ensure to fully control the deployment and avoid to fake some broker to not bind any uncontrolled port on the client.
I think it will evolve if needed to match more real life use cases.
The server
To make things simple to test we'll host the broker instance in our server but note that in practise you will simply connect to a remote broker as a client. I'll try to make obvious the related code.
Manage a server
Starting a server is as easy as creating a JMSServerManager. Once you get it you have a start and stop methods allowing you to control its lifecycle. Here is a way to handle it:
@ApplicationScoped
public class ServerConfig {
@Produces
@ApplicationScoped
public JMSServerManager startServer() throws Exception {
final JMSServerManagerImpl jmsServerManager = new JMSServerManagerImpl(
ActiveMQServers.newActiveMQServer(new ConfigurationImpl()
.setSecurityEnabled(false)
.setPersistenceEnabled(false)
.setJMXManagementEnabled(false)
.addAcceptorConfiguration("netty-acceptor", "tcp://localhost:1234")
.addConnectorConfiguration("netty-connector", "tcp://localhost:1234"), false));
jmsServerManager.start();
return jmsServerManager;
}
public void stopServer(@Disposes final JMSServerManager manager) throws Exception {
manager.stop();
}
public void doStart(@Observes @Initialized(ApplicationScoped.class) ServletContext context,
final JMSServerManager serverManager) throws Exception {
System.out.println(">>> Started JMS on " + serverManager.getActiveMQServer()
.getConfiguration().getConnectorConfigurations()
.values().stream()
.map(Object::toString)
.collect(joining(", ")));
}
}
The important points are:
- Ensure to bind remote connectors to allow to connect to that broker (tcp://localhost:1234)
- Don't forget to close the server with the application (stopServer)
- Force the broker to start with the application (doStart). The trick there is to inject the server as a parameter of the initialization method (@Observes @Initialized(ApplicationScoped.class) for me) and "touch" the instance calling any method to enforce the proxy (if scoped) to be initialized.
Listen for messages
Then we need a topic to listen on, this part is easy since we just instantiate the right implementation class:
@Produces
@ApplicationScoped
public Topic itemsTopic() {
return ActiveMQDestination.createTopic("items");
}
Of course you can do a "new" on ActiveMQTopic too but the factory will ensure you create it properly if it evolves.
Then we need a JMS 2.0 JMSContext. This part is built-in in artemis-cdi-client but since we don't use it we need to integrate it. It is as easy as creating a ConnectionFactory on our broker and create an @ApplicationScoped context from it:
@ApplicationScoped
public class ServerConfig {
@Produces
@ApplicationScoped
public JMSContext createJMSContext(final ConnectionFactory connectionFactory) {
return connectionFactory.createContext();
}
@Produces
@ApplicationScoped
public ActiveMQConnectionFactory factory(final JMSServerManager serverManager) throws Exception {
return ActiveMQJMSClient.createConnectionFactory("tcp://localhost:1234", null);
}
}
Nothing crazy right? Note however how we chain the producers relying on CDI: we inject the previously created instances as parameter of "next" producer. Of course the JMSServerManager is not mandatory for the factory but since we host it in the same application we ensure it is created before this way.
Finally we need to create a listener to get some messages. Here to make thing simpler in our "business" code, we'll fire the unmarshalled message from JMS to a CDI event. This way our real processor will be JMS agnostic:
@Produces
public JMSConsumer startListener(final Topic topic, final JMSContext context,
final Event<Msg> msg) {
final JMSConsumer consumer = context.createConsumer(topic);
consumer.setMessageListener(message -> {
try {
msg.fire(new Msg(message.getBody(String.class)));
} catch (final JMSException e) {
throw new IllegalStateException(e);
}
});
return consumer;
}
We still abuse of CDI producer parameter injections and simply create an auto-started consumer from the JMSContext. Our listener fires the message on CDI bus.
Tip: if you use multiple destinations, rely on a qualifier to distinguish them.
Finally processing the message (Msg) simply means observing this type:
@ApplicationScoped
public class Listener {
public void onMessage(@Observes final Msg msg) {
System.out.println(">>> " + msg.getMessage());
}
}
The client
The client side is pretty close, we'll find the same connection factory, topic producer and JMSContext producer but it will be all our JMS configuration on that side:
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.Topic;
@ApplicationScoped
public class ClientConfig {
@Produces
public Topic itemsTopic() {
return new ActiveMQTopic("items");
}
@Produces
@ApplicationScoped
public JMSContext createJMSContext(final ConnectionFactory connectionFactory) {
return connectionFactory.createContext();
}
@Produces
public ActiveMQConnectionFactory factory() throws Exception {
return ActiveMQJMSClient.createConnectionFactory("tcp://localhost:1234", null);
}
}
Then we simply inject these elements in our endpoint and convert our JAX-RS call to a JMS message:
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.Topic;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("jms")
@ApplicationScoped
public class Sender {
@Inject
private ConnectionFactory connectionFactory;
@Inject
private Topic topic;
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Sent send(final Send send) {
try (final JMSContext jmsContext = connectionFactory.createContext()) {
jmsContext.createProducer().send(topic, send.message);
}
return new Sent(true);
}
public static class Send { public String message; }
public static class Sent { public final boolean acknowledge; } // +constructor
}
If you deploy this client on port 10100 for instance you will be able to test it with this curl command:
curl -XPOST http://localhost:10100/jms \
-d '{"message":"test"}' \
-H 'Accept: application/json' -H 'Content-Type: application/json'
And the server will a few milliseconds after log "test" to show it received it.
Conclusion
Artemis is simple to get started with and integrating its standalone flavor in CDI is only 3 lines of (real) code.
Of course Artemis supports far more than what is in this post but it is mainly configuration related (user, password, failover, ...) or overpass the microservice area (JTA integration for instance).
The good aspect of that experimentation is nothing prevent you to use JMS with CDI today, whatever flavor of CDI you use (a full blown EE server with MDB or a standalone program without all that integration). Integrating both technology - and potential containers since we start to get portable protocols - is very doable in a few minutes.
Even if it can bring some drawbacks like:
- managing a new system for your ops team
- it can require some specific infrastructure (good I/O for instance)
- it is generally based on a connected protocol so you need to ensure failover works if needed
It can still be desirable in some complex cases or environments where JMS can be a good fit. The outcome is the same as at any time: select what you need and not what is easy ;).
From the same author:
In the same category: