For a workshop I will present on microservices and communication patterns, I need attendees to have their own local Kafka cluster. I have found a way to get them up and running in virtually no time at all, thanks to the combination of:
- Kubernetes
- Minikube
- The Yolean/kubernetes-kafka GitHub repo with Kubernetes yaml files that create all we need (including Kafka Manager)
Prerequisites:
- Minikube and Kubectl are installed
- The Minikube cluster is running (minikube start)
In my case the versions are:
Minikube v0.22.3, kubectl client 1.9 and (Kubernetes) server 1.7.
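For reference, both can be checked on the command line:
minikube version
kubectl version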
The steps I went through:
Git Clone the GitHub Repository: https://github.com/Yolean/kubernetes-kafka
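For example (assuming git is installed on the machine):
git clone https://github.com/Yolean/kubernetes-kafka.git
cd kubernetes-kafka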
From the root directory of the cloned repository, run the following kubectl commands:
(note: I did not know until today that kubectl apply -f can be used with a directory reference and will then apply all yaml files in that directory. That is incredibly useful!)
kubectl apply -f ./configure/minikube-storageclass-broker.yml
kubectl apply -f ./configure/minikube-storageclass-zookeeper.yml
(note: I had to comment out the reclaimPolicy attribute in both files, probably because I am running a fairly old version of Kubernetes)
kubectl apply -f ./zookeeper
kubectl apply -f ./kafka
(note: I had to change the API version in 50pzoo and 51zoo as well as in 50kafka.yaml from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1; see https://github.com/kubernetes/kubernetes/issues/55894 for details; again, I should upgrade my Kubernetes version)
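For illustration, the change is a single line at the top of each of these StatefulSet manifests, roughly like this (the rest of each file stays as it is in the repo):
apiVersion: apps/v1beta1   # was: apps/v1beta2; needed for older Kubernetes servers
kind: StatefulSet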
To make Kafka accessible from the minikube host (outside the K8S cluster itself):
kubectl apply -f ./outside-services
This exposes Services as type NodePort instead of ClusterIP, making them available for client applications that can access the Kubernetes host.
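For illustration, such a service definition looks roughly like this; the names and port numbers below are only indicative, the actual manifests are the ones in the ./outside-services directory:
apiVersion: v1
kind: Service
metadata:
  name: outside-0
  namespace: kafka
spec:
  type: NodePort        # instead of ClusterIP, so a port is opened on the minikube host
  selector:
    app: kafka
  ports:
  - port: 9094          # the broker's 'outside' listener (indicative)
    nodePort: 32400     # the port host-side clients connect to (indicative)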
I also installed (Yahoo) Kafka Manager:
kubectl apply -f ./yahoo-kafka-manager
(I had to change the API version in kafka-manager from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1)
At this point, the Kafka Cluster is running. I can check the pods and services in the Kubernetes Dashboard as well as through kubectl on the command line. I can get the Port at which I can access the Kafka Brokers:
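For reference, the assigned NodePort(s) and the IP address of the minikube host can be looked up on the command line (the exact service names depend on the version of the repo):
kubectl get services --namespace kafka
minikube ip
The broker address used by the Node clients below (192.168.99.100:32400) is simply this minikube IP combined with the NodePort of one of the outside services.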
And I can access the Kafka Manager at the indicated Port.
Initially, no cluster is visible in Kafka Manager. By providing the Zookeeper information highlighted in the figure (zookeeper.kafka:2181, i.e. the zookeeper Service in the kafka namespace) I can make the cluster visible in this user interface tool.
Finally, the proof of the pudding: programmatic production and consumption of messages to and from the cluster. Using the world's simplest Node Kafka clients, it is easy to see that the stuff is working. I am impressed.
I created the Node application and its package.json file, then added the kafka-node dependency (npm install kafka-node --save). Next I created the producer:
// before running, either globally install kafka-node (npm install kafka-node)
// or add kafka-node to the dependencies of the local application
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;
// the producer is created asynchronously in initializeKafkaProducer()
var producer;

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaProducer";
var topicName = "a516817-kentekens";
var KAFKA_BROKER_IP = '192.168.99.100:32400';
// from the Oracle Event Hub - Platform Cluster Connect Descriptor
var kafkaConnectDescriptor = KAFKA_BROKER_IP;

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);

function initializeKafkaProducer(attempt) {
  try {
    console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`);
    const client = new kafka.KafkaClient({ kafkaHost: kafkaConnectDescriptor });
    console.log("created client");
    producer = new Producer(client);
    console.log("submitted async producer creation request");
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      console.log("failed to create the client or the producer " + JSON.stringify(err));
    });
  }
  catch (e) {
    console.log("Exception in initializeKafkaProducer " + JSON.stringify(e));
    console.log("Try again in 5 seconds");
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}//initializeKafkaProducer

initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  var payloads = [
    { topic: topicName, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topicName + " : " + JSON.stringify(err));
    }
    console.log("Published event with key " + eventKey + " to topic " + topicName + " : " + JSON.stringify(data));
  });
}

// example call (after waiting for three seconds to give the producer time to initialize)
setTimeout(function () {
  eventPublisher.publishEvent("mykey", { "kenteken": "56-TAG-2", "country": "nl" });
}, 3000);
and ran the producer:
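(assuming the producer code above is saved in a file called producer.js)
node producer.js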
Then I created the consumer:
var kafka = require('kafka-node');
// the async package is used for a clean shutdown (npm install async --save)
var async = require('async');

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaConsumer";

var eventListenerAPI = module.exports;

// from the Oracle Event Hub - Platform Cluster Connect Descriptor
var topicName = "a516817-kentekens";

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);
console.log("Event Hub Topic " + topicName);

var KAFKA_BROKER_IP = '192.168.99.100:32400';

var consumerOptions = {
  kafkaHost: KAFKA_BROKER_IP,
  groupId: 'local-consume-events-from-event-hub-for-kenteken-applicatie',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'earliest' // equivalent of auto.offset.reset; valid values are 'none', 'latest', 'earliest'
};

var topics = [topicName];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumerLocal' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);
consumerGroup.on('connect', function () {
  console.log('connected to ' + topicName + " at " + consumerOptions.kafkaHost);
});

function onMessage(message) {
  console.log('%s read msg Topic="%s" Partition=%s Offset=%d'
    , this.client.clientId, message.topic, message.partition, message.offset);
}

function onError(error) {
  console.error(error);
  console.error(error.stack);
}

// close the consumer group cleanly on Ctrl-C
process.once('SIGINT', function () {
  async.each([consumerGroup], function (consumer, callback) {
    consumer.close(true, callback);
  });
});
and ran the consumer, which duly consumed the event published by the producer. It is wonderful.
Use
kubectl delete ns kafka
or:
kubectl delete pods,services,pvc,pv,sts,cm --all --namespace="kafka"
to remove the namespace kafka and its contents.
Resources
The main resource is the GitHub repo: https://github.com/Yolean/kubernetes-kafka . Absolutely great stuff.
Also useful: the npm package kafka-node, https://www.npmjs.com/package/kafka-node
Documentation on Kubernetes: https://kubernetes.io/docs/user-journeys/users/application-developer/foundational/#section-2 (with references to kubectl and Minikube) and the Katacoda playground: https://www.katacoda.com/courses/kubernetes/playground

Hi there, thanks for the post, this is really helpful. Only one doubt: the git repository used here (https://github.com/Yolean/kubernetes-kafka) does not seem to have the "configure" folder in the master branch that is used in the very first command of this blog post ("kubectl apply -f ./configure/minikube-storageclass-broker.yml"), so I would like to know which branch needs to be checked out, apart from master, in order to get this folder?
I followed the tutorial step by step but I get the following error for each pod:
pod has unbound PersistentVolumeClaims
The same problem here when running locally.
This is a great post!
The only change I made in the config files is to change the replicas to 1 instead of 3 for zookeeper and kafka. But when I start things up, kafka does not connect to zookeeper. It comes back with this error in the log:
[2018-10-08 22:10:08,691] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server 'zookeeper:2181' with timeout of 6000 ms
Where does kafka get this name of the zookeeper server as "zookeeper"? I expected it would be "zoo-0" or "pzoo-0". Where is this name "zookeeper" configured for the server? And how do I change it?
Also, what is the difference between zoo and pzoo here? Thanks!
What is the URL of Kubernetes Dashboard? I could not access it. Thx.
The URL is http://<node-ip>:<node-port>. To make Kafka Manager accessible from outside, specify the type as NodePort in Kafka Manager's yml.
Lucas, can you please share your yaml files? I have not been able to connect to the service using the node producer and consumer. I can produce & consume the messages inside the container.
same here, any ideas?
I think the service we created for kafka-manager is a ClusterIP, not a NodePort as suggested.
I can reproduce everything except for the Kafka manager. Same issue. Port 0.
Thanks for sharing. I think you missed mentioning the step to create the kafka namespace,
kubectl apply -f 00-namespace.yml
before running kubectl apply -f ./zookeeper; otherwise it will complain about:
Error from server (NotFound): error when creating "zookeeper/10zookeeper-config.yml": namespaces "kafka" not found
Use this command to create the namespace:
kubectl create -f 00-namespace.yml
I was just made aware of the project http://strimzi.io/ that has as its aim: "Strimzi provides a way to run an Apache Kafka cluster on OpenShift and Kubernetes in various deployment configurations." Definitely worth a look.
Hi Lucas, were you able to resolve this issue eventually?
I cannot access the Kafka manager. I see port 0 in the kubernetes dashboard.
Hi Desmond, were you able to resolve the issue eventually?
You can access the Kafka Manager using "kubectl proxy" and then open http://localhost:8001/api/v1/namespaces/alvary-kafka/services/kafka-manager:/proxy/ in a web browser.
I was able to access the manager by editing the kafka-manager Service and changing the type to NodePort. You can do so by running:
kubectl edit svc kafka-manager --namespace kafka
You can still access it through port 80