This tutorial will teach you how to install a Resource Adapter for Apache Kafka on WildFly so that you can Produce and Consume streams of messages on your favourite application server!
First of all some basics: what is Apache Kafka? Apache Kafka is a Streaming Platform which provides some key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
Apache Kafka is generally used for two types of applications:
- Applications which build real-time streaming data pipelines that reliably get data between systems or applications
- Applications which transform or react to the streams of data
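To make the capabilities listed above more concrete, here is a minimal sketch of the idea behind a topic: an append-only log where every record gets an offset and each reader chooses the offset it starts from. This is only a toy, in-memory model and not Kafka's API; the class name TopicLogSketch and its methods are hypothetical and exist just for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: an append-only log with offsets. This is NOT Kafka's API.
final class TopicLogSketch {
    private final List<String> records = new ArrayList<>();

    // "Publish": append a record and return the offset it was stored at.
    public synchronized long publish(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // "Subscribe": return every record stored at or after the given offset.
    // Out-of-range offsets are clamped, so reading past the end yields an empty list.
    public synchronized List<String> readFrom(long offset) {
        int from = (int) Math.min(Math.max(offset, 0), records.size());
        return new ArrayList<>(records.subList(from, records.size()));
    }

    public static void main(String[] args) {
        TopicLogSketch topic = new TopicLogSketch();
        topic.publish("Hello world");
        topic.publish("Second message");
        // Records stay in the log after being read, so a reader can replay from any offset.
        System.out.println(topic.readFrom(0));
        System.out.println(topic.readFrom(1));
    }
}
```

Unlike a classic queue, reading does not remove records, which is why two independent readers can consume the same stream at their own pace.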
The simplest way to start Kafka is with a Docker Compose YAML file, which starts both the Kafka container and ZooKeeper, which Kafka needs for cluster management. Here is a sample docker-compose.yaml file:
version: '2'
services:
  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
From the same directory where you have saved the docker-compose.yaml file execute:
docker-compose up
Check from the Console that Kafka started successfully:
kafka_1 | [2019-11-04 07:58:50,051] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2019-11-04 07:58:50,051] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2019-11-04 07:58:50,053] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
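If you prefer a programmatic check to reading the logs, the following sketch tries to open a TCP connection to the port published by the Compose file. It uses only the JDK; the class name BrokerPortCheck and the 2 second timeout are assumptions made for this example. Note that an open port only shows that something is listening on localhost:9092, not that the broker has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that a TCP port accepts connections.
final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolvable host.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is the port mapping declared in docker-compose.yaml.
        System.out.println("Kafka port open: " + isReachable("localhost", 9092, 2000));
    }
}
```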
Installing and Configuring Kafka Resource adapter on WildFly
A Kafka Resource adapter is available in this GitHub project: https://github.com/payara/Cloud-Connectors/tree/master/Kafka/KafkaRAR
Download the project and build it:
cd Kafka/KafkaRAR
mvn install
ls target/
kafka-rar-0.6.0-SNAPSHOT  kafka-rar-0.6.0-SNAPSHOT.rar
Now deploy the kafka-rar-0.6.0-SNAPSHOT.rar into WildFly:
cp target/kafka-rar-0.6.0-SNAPSHOT.rar $JBOSS_HOME/standalone/deployments
Next, we will be adding the RAR configuration in standalone-full.xml:
<subsystem xmlns="urn:jboss:domain:resource-adapters:5.0">
    <resource-adapters>
        <resource-adapter id="kafka">
            <archive>
                kafka-rar-0.6.0-SNAPSHOT.rar
            </archive>
            <transaction-support>XATransaction</transaction-support>
            <connection-definitions>
                <connection-definition class-name="fish.payara.cloud.connectors.kafka.outbound.KafkaManagedConnectionFactory" jndi-name="java:/KafkaConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                    <xa-pool>
                        <min-pool-size>1</min-pool-size>
                        <max-pool-size>20</max-pool-size>
                        <prefill>false</prefill>
                        <is-same-rm-override>false</is-same-rm-override>
                    </xa-pool>
                </connection-definition>
            </connection-definitions>
        </resource-adapter>
    </resource-adapters>
</subsystem>
Now start WildFly and check that the Server logs registered a successful Kafka Connection:
15:28:37,765 INFO [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka version: 2.3.1
15:28:37,766 INFO [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka commitId: 18a913733fb71c01
15:28:37,766 INFO [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka startTimeMs: 1572877717755
15:28:37,767 INFO [org.jboss.as.connector.deployment] (MSC service thread 1-1) WFLYJCA0007: Registered connection factory java:/KafkaConnectionFactory
15:28:37,882 INFO [fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter] (MSC service thread 1-1) Kafka Resource Adapter Started..
Great, let’s deploy an example application.
Creating a Kafka Producer
We will now deploy a REST application which produces Kafka messages; later on we will show how to consume them. As with every basic JAX-RS application, we start by adding a JAX-RS activator class which sets the REST context path:
package com.mastertheboss.kafka;
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;
@ApplicationPath("/rest")
public class JaxRsActivator extends Application {
}
Then, let’s add a basic Producer Endpoint that will send messages to the “my-topic” topic, using the KafkaConnectionFactory:
package com.mastertheboss.kafka;

import fish.payara.cloud.connectors.kafka.api.KafkaConnection;
import fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@Path("/kafka")
@ApplicationScoped
public class KafkaQueueResource {

    @Resource(lookup="java:/KafkaConnectionFactory")
    KafkaConnectionFactory factory;

    public KafkaQueueResource() {
    }

    @GET
    public Response hello() {
        try (KafkaConnection conn = factory.createConnection()) {
            conn.send(new ProducerRecord("my-topic","Hello world"));
            return Response
                    .status(Response.Status.OK)
                    .entity("Message sent!")
                    .build();
        } catch (Exception ex) {
            ex.printStackTrace();
            return Response.serverError().entity(ex.getMessage()).build();
        }
    }
}
In order to be able to run this application, you will need to link it with the Resource Adapter dependencies in jboss-deployment-structure.xml as follows:
<jboss-deployment-structure>
    <deployment>
        <dependencies>
            <module name="deployment.kafka-rar-0.6.0-SNAPSHOT.rar" export="TRUE"/>
        </dependencies>
    </deployment>
</jboss-deployment-structure>
Now build the application and deploy it on WildFly:
$ mvn clean install wildfly:deploy
Starting a Kafka Consumer
First, we will start a topic consumer by executing the kafka-console-consumer.sh script, which is available in the bin folder of Kafka. Let’s find the Kafka container so that we can open a bash shell inside it:
$ docker ps
CONTAINER ID   IMAGE                              COMMAND                  PORTS                                             NAMES
bd352522dd35   strimzi/kafka:0.11.3-kafka-2.1.0   "sh -c 'bin/kafka-..."   9091/tcp, 9404/tcp, 0.0.0.0:9092->9092/tcp        kafka-demo_kafka_1
f1352dc7783b   strimzi/kafka:0.11.3-kafka-2.1.0   "sh -c 'bin/zookee..."   9091-9092/tcp, 0.0.0.0:2181->2181/tcp, 9404/tcp   kafka-demo_zookeeper_1
[francesco@fedora kafka-demo]$ docker exec -it bd352522dd35 bash
Now execute the kafka-console-consumer.sh:
[kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic
Back on the host machine, let’s push one message by requesting the following URL:
curl http://localhost:8080/kafka-example/rest/kafka
Message sent!
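The same request can also be sent from Java, which is handy if you want to trigger the producer from a test. The sketch below uses the java.net.http client, so it assumes Java 11 or later; the class name ProducerEndpointClient is hypothetical, and the URL is the one used with curl above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: calls the producer endpoint and returns the response body.
final class ProducerEndpointClient {

    // Sends a GET request and returns the body; fails on any non-200 status.
    static String get(String url) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() != 200) {
                throw new IllegalStateException("Unexpected HTTP status " + response.statusCode());
            }
            return response.body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while calling " + url, e);
        }
    }

    public static void main(String[] args) {
        // Requires WildFly to be running with the example application deployed.
        System.out.println(get("http://localhost:8080/kafka-example/rest/kafka"));
    }
}
```

Each successful call makes the endpoint send one "Hello world" record to my-topic, exactly as the curl command does.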
If you check the kafka-console-consumer logs, you should see the incoming message:

Developing a Kafka Consumer as Message Driven Bean
Next, we will add a Message Driven Bean class which implements the KafkaListener interface to receive callbacks when a message arrives. Methods annotated with @OnRecord are called with the records of the topics listed in the annotation:
import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jboss.ejb3.annotation.ResourceAdapter;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "clientId", propertyValue = "KafkaJCAClient"),
@ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "myGroup"),
@ActivationConfigProperty(propertyName = "topics", propertyValue = "my-topic"),
@ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "localhost:9092"),
@ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),
@ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
@ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
@ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
@ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000"),
@ActivationConfigProperty(propertyName = "commitEachPoll", propertyValue = "true"),
@ActivationConfigProperty(propertyName = "useSynchMode", propertyValue = "true")
})
@ResourceAdapter(value="kafka")
public class KafkaMDB implements KafkaListener {
public KafkaMDB() {
System.out.println("Bean instance created");
}
@OnRecord( topics={"my-topic"})
public void getMessage(ConsumerRecord record) {
System.out.println("> Got record on topic my-topic " + record);
}
}
Let’s also package the MDB in our application. As we send new messages, they will now be consumed by the MDB, as you can see from WildFly’s console:

Congratulations! You have just produced and consumed messages with Kafka and the WildFly Resource Adapter.
You can find the full source code of this example at: https://github.com/fmarchioni/mastertheboss/tree/master/kafka/kafka-wildfly
Spring Boot users: interested in seeing Apache Kafka in action with Spring Boot? Check this tutorial: Apache Kafka and Spring Boot quickstart