For a workshop I will be presenting on microservices and communication patterns, I need attendees to have their own local Kafka cluster. I have found a way to get them up and running in virtually no time at all, thanks to the combination of:
- Kubernetes
- Minikube
- The Yolean/kubernetes-kafka GitHub repo with Kubernetes yaml files that create all we need (including Kafka Manager)
Prerequisites:
- Minikube and Kubectl are installed
- The Minikube cluster is running (minikube start)
In my case the versions are:
Minikube v0.22.3, kubectl client 1.9 and (Kubernetes) server 1.7.
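A quick way to check which versions you are on (assuming minikube and kubectl are already on your PATH):
minikube version
kubectl version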
The steps I went through:
Git Clone the GitHub Repository: https://github.com/Yolean/kubernetes-kafka
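In terminal commands, that comes down to something like this (the directory name simply follows from the repository name):
git clone https://github.com/Yolean/kubernetes-kafka.git
cd kubernetes-kafka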
From the root directory of the cloned repository, run the following kubectl commands:
(note: I did not know until today that kubectl apply -f can be used with a directory reference and will then apply all yaml files in that directory. That is incredibly useful!)
kubectl apply -f ./configure/minikube-storageclass-broker.yml
kubectl apply -f ./configure/minikube-storageclass-zookeeper.yml
(note: I had to comment out the reclaimPolicy attribute in both files – probably because I am running a fairly old version of Kubernetes)
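Before continuing, it does not hurt to verify that both storage classes were actually created:
kubectl get storageclass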
kubectl apply -f ./zookeeper
kubectl apply -f ./kafka
(note: I had to change API version in 50pzoo and 51zoo as well as in 50kafka.yaml from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1 – see https://github.com/kubernetes/kubernetes/issues/55894 for details; again, I should upgrade my Kubernetes version)
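It can take a little while for the Zookeeper and Kafka pods to reach the Running state. A simple way to keep an eye on them (assuming everything lands in the kafka namespace defined by the repo):
kubectl get pods --namespace kafka --watch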
To make Kafka accessible from the minikube host (outside the K8S cluster itself), run:
kubectl apply -f ./outside-services
This exposes Services as type NodePort instead of ClusterIP, making them available for client applications that can access the Kubernetes host.
I also installed (Yahoo) Kafka Manager:
kubectl apply -f ./yahoo-kafka-manager
(I had to change API version in kafka-manager from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1 )
At this point, the Kafka Cluster is running. I can check the pods and services in the Kubernetes Dashboard as well as through kubectl on the command line. I can get the Port at which I can access the Kafka Brokers:
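As a sketch of how to find that port and the host address used by the Node clients further down (the exact name of the externally exposed broker service depends on the yaml files in ./outside-services, so check the output yourself):
minikube ip
kubectl get services --namespace kafka
The first command returns the IP address of the minikube host, the second lists the NodePort assigned to the outside broker service; together they form the broker address (192.168.99.100:32400 in the code below).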
And I can access the Kafka Manager at the indicated Port.
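The port for Kafka Manager can be looked up in the same way (note: if the kafka-manager service is created as ClusterIP rather than NodePort, kubectl proxy or changing the service type is needed first):
kubectl get service kafka-manager --namespace kafka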
Initially, no cluster is visible in Kafka Manager. By providing the Zookeeper information highlighted in the figure (zookeeper.kafka:2181) I can make the cluster visible in this user interface tool.
Finally, the proof of the pudding: programmatic production and consumption of messages to and from the cluster. Using the world's simplest Node Kafka clients, it is easy to see that the stuff is working. I am impressed.
I created the Node application and its package.json file, then added the kafka-node dependency (npm install kafka-node --save). Next, I created the producer:
// before running, either globally install kafka-node (npm install kafka-node)
// or add kafka-node to the dependencies of the local application
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;
var producer;

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaProducer";

var topicName = "a516817-kentekens";
var KAFKA_BROKER_IP = '192.168.99.100:32400';
// from the Oracle Event Hub - Platform Cluster Connect Descriptor
var kafkaConnectDescriptor = KAFKA_BROKER_IP;

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);

function initializeKafkaProducer(attempt) {
  try {
    console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`);
    const client = new kafka.KafkaClient({ kafkaHost: kafkaConnectDescriptor });
    console.log("created client");
    producer = new Producer(client);
    console.log("submitted async producer creation request");
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      console.log("failed to create the client or the producer " + JSON.stringify(err));
    });
  } catch (e) {
    console.log("Exception in initializeKafkaProducer" + JSON.stringify(e));
    console.log("Try again in 5 seconds");
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}//initializeKafkaProducer

initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  var payloads = [
    { topic: topicName, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(err));
    }
    console.log("Published event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(data));
  });
}

// example call (after waiting for three seconds to give the producer time to initialize)
setTimeout(function () {
  eventPublisher.publishEvent("mykey", { "kenteken": "56-TAG-2", "country": "nl" });
}, 3000);
and ran the producer:
Then I created the consumer:
var kafka = require('kafka-node');
var async = require('async'); // used for an orderly shutdown on SIGINT (npm install async --save)

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaConsumer";

var eventListenerAPI = module.exports;

// from the Oracle Event Hub - Platform Cluster Connect Descriptor
var topicName = "a516817-kentekens";

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);
console.log("Event Hub Topic " + topicName);

var KAFKA_BROKER_IP = '192.168.99.100:32400';

var consumerOptions = {
  kafkaHost: KAFKA_BROKER_IP,
  groupId: 'local-consume-events-from-event-hub-for-kenteken-applicatie',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'earliest' // equivalent of auto.offset.reset; valid values are 'none', 'latest', 'earliest'
};

var topics = [topicName];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumerLocal' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);
consumerGroup.on('connect', function () {
  console.log('connected to ' + topicName + " at " + consumerOptions.kafkaHost);
});

function onMessage(message) {
  console.log('%s read msg Topic="%s" Partition=%s Offset=%d'
    , this.client.clientId, message.topic, message.partition, message.offset);
}

function onError(error) {
  console.error(error);
  console.error(error.stack);
}

process.once('SIGINT', function () {
  async.each([consumerGroup], function (consumer, callback) {
    consumer.close(true, callback);
  });
});
and ran the consumer, which duly consumed the event published by the producer. It is wonderful.
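For completeness, a minimal way to run the two clients, assuming the code above is saved as producer.js and consumer.js (file names of my own choosing) and the package.json is in place:
npm install kafka-node async --save
node consumer.js
node producer.js
Start the consumer in one terminal first, then run the producer in a second terminal; within a few seconds the consumer should report the published kenteken event.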
Use
kubectl delete ns kafka
or:
kubectl delete pods,services,pvc,pv,sts,cm --all --namespace="kafka"
to remove the namespace kafka and its contents.
Resources
The main resource is the GitHub repo: https://github.com/Yolean/kubernetes-kafka . Absolutely great stuff.
Also useful: npm package kafka-node – https://www.npmjs.com/package/kafka-node
Documentation on Kubernetes: https://kubernetes.io/docs/user-journeys/users/application-developer/foundational/#section-2 – with references to Kubectl and Minikube – and the Katakoda playground: https://www.katacoda.com/courses/kubernetes/playground
Hi there, thanks for the post, this is really helpful. Only one doubt: the git repository used here (https://github.com/Yolean/kubernetes-kafka) does not seem to have the "configure" folder in the master branch that is used in the very first command in this blog post, "kubectl apply -f ./configure/minikube-storageclass-broker.yml". So I would like to know which branch needs to be checked out, apart from master, in order to get this folder?
I followed the tutorial step by step but I get the following error for each pod:
pod has unbound PersistentVolumeClaims
The same problem when running locally.
This is a great post!
The only change I made in the config files is to change the replicas to 1 instead of 3 for zookeeper and kafka. But when I start things up, kafka does not connect to zookeeper. It comes back with this error in the log:
[2018-10-08 22:10:08,691] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server 'zookeeper:2181' with timeout of 6000 ms
Where does kafka get this name of the zookeeper server as "zookeeper"? I expected it would be "zoo-0" or "pzoo-0". Where is this name "zookeeper" configured for the server? And how do I change it?
Also, what is the difference between zoo and pzoo here? Thanks!
What is the URL of Kubernetes Dashboard? I could not access it. Thx.
The URL is http://<node-ip>:<node-port>. To make Kafka Manager accessible from outside, specify the type as NodePort in Kafka Manager's yml.
Lucas, can you please share your yaml files, I have not been able to connect to the service using node producer and consumer. I can produce & consume the messages inside the container.
same here, any ideas?
I think the service we created for kafka-manager is a ClusterIP, not a NodePort as suggested.
I can reproduce everything except for the Kafka manager. Same issue. Port 0.
Thanks for sharing. I think you missed mentioning the step to create the kafka namespace,
kubectl apply -f 00-namespace.yml, before running kubectl apply -f ./zookeeper; otherwise it will complain about:
Error from server (NotFound): error when creating "zookeeper/10zookeeper-config.yml": namespaces "kafka" not found
use this command to create the namespace:
kubectl create -f 00-namespace.yml
I was just made aware of project http://strimzi.io/ that has as its aim: “Strimzi provides a way to run an Apache Kafka cluster on OpenShift and Kubernetes in various deployment configurations.” Definitely worth a look.
Hi Lucas, were you able to resolve this issue eventually?
I cannot access the Kafka Manager. I see port 0 in the Kubernetes dashboard.
Hi Desmond, were you able to resolve the issue eventually?
You can access the Kafka Manager using “kubectl proxy” and then open http://localhost:8001/api/v1/namespaces/alvary-kafka/services/kafka-manager:/proxy/ in a web browser.
I was able to access the manager by editing the kafka-manager Service and changing the type to NodePort. You can do so by running:
kubectl edit svc kafka-manager --namespace kafka
You can still access it through port 80