Kafka integration with MuleSoft
- January 12, 2022
Introduction to Apache Kafka
Apache Kafka is an open-source event-streaming platform developed by the Apache Software Foundation and written in Scala and Java. It implements a software bus using stream processing.
The project aims to provide a scalable, unified, high-throughput, low-latency platform for handling real-time data feeds with no data loss and reduced complexity.
Basic Kafka terminologies
- Topics are a category or feed name to which messages are published. A topic can have zero, one, or many consumers that subscribe to the data written to it.
- Partitions divide a topic into one or more pieces so it can handle large volumes of data. Each partition is an ordered, immutable sequence of records that is continually appended to a structured commit log.
- Partition Offset is the sequential ID number that uniquely identifies each record within the partition.
- Broker is a Kafka server that manages the storage of messages in the topic. Every broker has a unique ID to identify it.
- Kafka Clusters are formed from one or more brokers, each with a unique ID, typically running across many servers.
- Producers publish data to the topics. The producer is responsible for choosing which record to assign to which partition within the topic.
- Consumers read and process data from topics.
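The relationship among topics, partitions, and offsets can be sketched with a tiny Python model (an illustration of the concepts only, not how Kafka is actually implemented):

```python
# Toy model of Kafka storage concepts: a topic holds partitions,
# each partition is an append-only list of records, and a record's
# offset is simply its index within its partition.
class Topic:
    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, record):
        """Append a record to a partition and return its offset."""
        self.partitions[partition].append(record)
        return len(self.partitions[partition]) - 1

topic = Topic("MuleTopic", num_partitions=2)
print(topic.append(0, "first"))   # offset 0 in partition 0
print(topic.append(0, "second"))  # offset 1 in partition 0
print(topic.append(1, "third"))   # offset 0 in partition 1
```

Note how offsets are only unique and ordered *within* a partition: partition 0 and partition 1 each start from offset 0.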
Software versions used in this demo
- Java 1.8
- Windows 10
- Mule Runtime 4.3.0 EE
- Anypoint Studio 7.10.0
- Apache Kafka 2.13-2.8.0
- Apache Kafka Connector-Mule 4 Version-4.3.3
Getting started with Apache Kafka
To use Kafka, you must have a JDK installed on your system. Download the latest Apache Kafka binaries from https://kafka.apache.org/downloads and extract the archive to your C folder.
NOTE: Earlier versions of Kafka required Zookeeper to be installed separately. Recent Kafka binaries already include the Zookeeper dependencies.
Disabling Hyper-V
Kafka will not work if a Docker server is running on your local machine. You’ll have to disable Hyper-V first. Follow these steps:
- Go to your control panel and click Uninstall a Program
- Click Turn Windows Features On or Off
- Find Hyper-V in the feature list
- Ensure Hyper-V is unchecked, then click OK
NOTE: You may be asked to restart your system. If so, do that.
Now, you’re ready to proceed working with Kafka.
Necessary changes in extracted Kafka files
- Go to your Kafka/config folder. The following files need to be changed:
- zookeeper.properties
- server.properties
- Open the zookeeper.properties file using Notepad/Notepad++ and set the dataDir variable to a proper directory path. You can also change the port in the same file if needed.
- Open the server.properties file using an editor of your choice and set the log.dirs variable to a proper directory path.
- Add the following lines in the internal topic settings section:
offsets.topic.num.partitions=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
min.insync.replicas=1
default.replication.factor=1
NOTE: These directories should be changed to avoid storage issues. By default they point to a temp location; change them to a folder of your own so the logs have enough storage and the server and Zookeeper run smoothly.
Now you’re ready to deploy the Kafka server locally.
Zookeeper Server deployment
Kafka needs Zookeeper to manage the cluster. Zookeeper coordinates the brokers in the cluster and acts as a consistent store for configuration information. You must start Zookeeper before the Kafka server.
Open a command prompt, navigate to the Kafka directory, and run the following command to start Zookeeper:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
Your zookeeper should start on port 2181.
Kafka Server deployment
Open a new command prompt, go to your Kafka folder, navigate to the path bin/windows and run the following command to deploy your server:
kafka-server-start.bat ../../config/server.properties
Your Kafka server should start.
Create a topic
Open a new command prompt in kafka/bin/windows and run the following command to create a topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <TopicName>
To verify that the topic was created successfully, use the following command:
kafka-topics.bat --list --zookeeper localhost:2181
You’ll see the list of topics present.
Create a MuleSoft Flow for the publish operation
NOTE: Before running the flows, make sure zookeeper and the server are running on your local machine.
We’ll now integrate this Kafka server with Mule ESB.
- Create a new Project and import the latest version of Apache Kafka Connector from Anypoint Exchange.
You should see the list of Kafka processors available in the module.
- Create a simple flow.
- Set all the configurations as follows:
Listener Configuration:
- Set the method to POST in the advanced tab of the listener properties.
- Leave the rest of the configurations as default.
Logger:
- Log a message stating that the message has been received to publish to Kafka.
Publish:
- Create a global connector configuration element for the Kafka producer.
- Click Test Connection. Your connection should be successful.
- Click OK and then OK again.
Add this configuration in the general tab of the Publish properties and provide the topic name created above. In the message section, take the data from the HTTP request body; in other words, set the message to payload in expression mode.
You also need to provide the key as the current time using #[now()].
Here’s the XML code for the flow:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
        http://www.mulesoft.org/schema/mule/secure-properties http://www.mulesoft.org/schema/mule/secure-properties/current/mule-secure-properties.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
  <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="6bece4e7-abe1-467d-b73a-c56c9e80dc7c">
    <http:listener-connection host="0.0.0.0" port="8089" />
  </http:listener-config>
  <kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="d8e90197-b797-4d14-95de-0d80448da6a9">
    <kafka:producer-plaintext-connection partitioner="ROUND_ROBIN">
      <kafka:bootstrap-servers>
        <kafka:bootstrap-server value="localhost:9092" />
      </kafka:bootstrap-servers>
    </kafka:producer-plaintext-connection>
  </kafka:producer-config>
  <flow name="Publish-flow" doc:id="5b88c75c-8072-46c1-b000-b71f0b7caaaf">
    <http:listener doc:name="Listener" doc:id="fb52500c-604c-4939-9a4d-4725bb1bb56e" config-ref="HTTP_Listener_config" path="/publish" allowedMethods="POST">
      <http:response>
        <http:body><![CDATA[#["Message sent to kafka producer"]]]></http:body>
      </http:response>
    </http:listener>
    <logger level="INFO" doc:name="Logger" doc:id="192c2478-fefa-45cb-87d9-e3d8c87e1f6e" message="Received message to publish" />
    <kafka:publish doc:name="Publish" doc:id="a248980c-efd0-4368-b7cf-49b0cc703d0e" topic="MuleTopic" config-ref="Apache_Kafka_Producer_configuration" key="#[now()]">
      <reconnect />
    </kafka:publish>
  </flow>
</mule>
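The producer configuration above sets partitioner="ROUND_ROBIN". Conceptually, a round-robin partitioner cycles through the topic's partitions in order, ignoring the record key. Here is a minimal Python sketch of that idea (an illustration only, not the connector's actual code):

```python
from itertools import count

class RoundRobinPartitioner:
    """Cycle through partition numbers 0..num_partitions-1."""
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._counter = count()

    def partition_for(self, record):
        # The record (and its key) are ignored; only the call order
        # determines which partition is chosen next.
        return next(self._counter) % self.num_partitions

p = RoundRobinPartitioner(num_partitions=3)
assignments = [p.partition_for(f"msg-{i}") for i in range(6)]
print(assignments)  # [0, 1, 2, 0, 1, 2]
```

This is why the key (here, the timestamp from #[now()]) does not influence partition placement under a round-robin strategy; a key-hashing partitioner would behave differently.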
Create a MuleSoft consume flow
It’s time to create a flow to consume the messages published in the Kafka topic.
- Create a simple flow.
- Set all the configurations.
- Create a global connector configuration element for the Kafka Consumer.
- Click Test Connection. Your connection should be successful.
- Click OK and then OK again.
Leave all other properties set to default.
Logger: Log a message to know that the message has been successfully consumed.
Here’s the XML code for the flow:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
        http://www.mulesoft.org/schema/mule/secure-properties http://www.mulesoft.org/schema/mule/secure-properties/current/mule-secure-properties.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
  <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="6bece4e7-abe1-467d-b73a-c56c9e80dc7c">
    <http:listener-connection host="0.0.0.0" port="8089" />
  </http:listener-config>
  <kafka:consumer-config name="Apache_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="64f6184c-016f-4403-b246-1cd551e13b79">
    <kafka:consumer-plaintext-connection groupId="test-consumer-group">
      <kafka:bootstrap-servers>
        <kafka:bootstrap-server value="localhost:9092" />
      </kafka:bootstrap-servers>
      <kafka:topic-patterns>
        <kafka:topic-pattern value="DemoTopic" />
      </kafka:topic-patterns>
    </kafka:consumer-plaintext-connection>
  </kafka:consumer-config>
  <flow name="ConsumeFlow" doc:id="558e2042-e8d3-400c-ac2e-fc6009ea43bd">
    <kafka:message-listener doc:name="Message listener" doc:id="0f8dbcc6-4675-42c6-ad1c-728e858ca99d" config-ref="Apache_Kafka_Consumer_configuration" ackMode="AUTO" />
    <logger level="INFO" doc:name="Logger" doc:id="0a9cea42-188c-4a87-8264-30417014d178" message="#['\n Logging the consumed payload: ' ++ payload]" />
  </flow>
</mule>
NOTE: The consumer’s topic pattern (DemoTopic above) must match the topic your publish flow writes to; if you published to MuleTopic, use that name here.
Now we are ready to test the application.
To deploy the application:
- Run the application (right-click the project and choose Run As > Mule Application), and wait for it to show as deployed in the console.
- Once the application is deployed, open Postman or Advanced REST Client and send a POST request to http://localhost:8089/publish.
You should receive a message in the response tab.
Check the Anypoint Studio console to confirm the log output from both the publish flow’s logger component and the consume flow’s logger component.
Parallel processing in Kafka
In Kafka, the topic partition is the unit of parallelism: the more partitions, the higher the processing parallelism. For example, if a topic has 30 partitions, then an application can run up to 30 instances of itself (such as 30 Docker containers) to process the topic’s data collaboratively and in parallel. Each of the 30 instances gets exclusive access to one partition, whose messages it processes sequentially. Any instances beyond 30 remain idle. The partition construct also guarantees message ordering: each message is processed in the order in which it was written to its partition.
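The partition-to-instance assignment described above can be sketched in Python. This is a simplified round-robin assignment for illustration; in real Kafka, the consumer group coordinator handles assignment and rebalancing, and the instance names here are hypothetical:

```python
def assign_partitions(num_partitions, consumers):
    """Spread partitions over consumer instances: each partition goes
    to exactly one consumer, and surplus consumers stay idle."""
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        # Round-robin the partitions over the available consumers.
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

# 30 partitions shared by 3 instances: 10 partitions each.
a = assign_partitions(30, ["inst-0", "inst-1", "inst-2"])
print({c: len(ps) for c, ps in a.items()})

# 2 partitions but 3 instances: the third instance gets nothing.
b = assign_partitions(2, ["inst-0", "inst-1", "inst-2"])
print(b["inst-2"])  # []
```

The key invariant is that every partition belongs to exactly one instance, which is what preserves per-partition ordering while still allowing parallelism across partitions.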
To achieve parallel processing using Kafka, first create a topic with multiple partitions. Then, in the general properties of Message Listener, increase the number of parallel consumers.
Try running the application and you’ll see the parallelism working on your local machine.
Commands used for local Kafka set up
Go to the kafka/bin/windows folder and then run the following commands:
- Start Kafka Server: kafka-server-start.bat ../../config/server.properties
- Create a Topic: kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic name>
- Describe a Topic: kafka-topics.bat --describe --zookeeper localhost:2181 --topic <topic name>
- List Kafka Topics: kafka-topics.bat --list --zookeeper localhost:2181
- Delete a Topic: kafka-run-class.bat kafka.admin.TopicCommand --delete --topic <topic name> --zookeeper localhost:2181
- Start Kafka Producer: kafka-console-producer.bat --broker-list localhost:9092 --topic <topic name>
- Start Kafka Consumer: kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic <topic name>
— By Ravikant Kumar