15 Minutes to get a Kafka Cluster running on Kubernetes – and start producing and consuming from a Node application


For a workshop I will present on microservices and communication patterns, I need attendees to have their own local Kafka cluster. I have found a way to have them up and running in virtually no time at all, thanks to the combination of:

  • Kubernetes
  • Minikube
  • The Yolean/kubernetes-kafka GitHub repo with Kubernetes yaml files that create everything we need (including Kafka Manager)

Prerequisites:
  • Minikube and Kubectl are installed
  • The Minikube cluster is running (minikube start)

In my case the versions are Minikube v0.22.3, kubectl client 1.9 and Kubernetes server 1.7.

The steps I went through:

Clone the GitHub repository: https://github.com/Yolean/kubernetes-kafka

From the root directory of the cloned repository, run the following kubectl commands:

(note: I did not know until today that kubectl apply -f can be used with a directory reference, in which case it applies all yaml files in that directory. That is incredibly useful!)
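To make the directory behavior concrete, here is a small sketch that models which files such a call picks up and in which order. As far as I can tell, kubectl takes the files with a .json, .yaml or .yml extension and processes them in lexical order of their names, which is presumably why the repository prefixes its file names with numbers such as 10, 50 and 51. The function manifestsInApplyOrder is hypothetical and only approximates kubectl; it ignores subdirectories, which kubectl only descends into with the --recursive flag.

```javascript
// Hypothetical helper: approximates which files `kubectl apply -f <dir>` (without
// --recursive) picks up from a directory listing, and the order it applies them in.
const MANIFEST_EXTENSIONS = ['.json', '.yaml', '.yml'];

function manifestsInApplyOrder(fileNames) {
  return fileNames
    .filter((name) => MANIFEST_EXTENSIONS.some((ext) => name.endsWith(ext)))
    .sort(); // lexical order; for ASCII names this matches byte order
}

// Example listing in the style of the repository's zookeeper folder
// (the exact file names other than 10zookeeper-config.yml are assumed)
const listing = ['51zoo.yml', 'README.md', '10zookeeper-config.yml', '50pzoo.yml'];
console.log(manifestsInApplyOrder(listing));
// logs [ '10zookeeper-config.yml', '50pzoo.yml', '51zoo.yml' ]
```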

kubectl apply -f ./configure/minikube-storageclass-broker.yml
kubectl apply -f ./configure/minikube-storageclass-zookeeper.yml

(note: I had to comment out the reclaimPolicy attribute in both files, probably because my Kubernetes server (1.7) is too old: StorageClass only supports reclaimPolicy from Kubernetes 1.8 onwards)
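If you need to make this edit on several files, it can be scripted. A minimal sketch, assuming the attribute sits on its own line as in a typical StorageClass manifest: the hypothetical helper commentOutReclaimPolicy prefixes every reclaimPolicy: line with #. It works on the raw text rather than parsing YAML, and the sample manifest is illustrative, not copied from the repository.

```javascript
// Hypothetical helper: comment out `reclaimPolicy:` lines in a StorageClass manifest,
// for Kubernetes servers older than 1.8. Works line by line on the raw text; no YAML parsing.
function commentOutReclaimPolicy(yamlText) {
  return yamlText
    .split('\n')
    .map((line) => (/^\s*reclaimPolicy:/.test(line) ? '#' + line : line))
    .join('\n');
}

// Illustrative StorageClass (not the repository's exact file)
const storageClass = [
  'kind: StorageClass',
  'apiVersion: storage.k8s.io/v1',
  'metadata:',
  '  name: kafka-broker',
  'provisioner: k8s.io/minikube-hostpath',
  'reclaimPolicy: Retain',
].join('\n');

console.log(commentOutReclaimPolicy(storageClass));
```

In practice you would read each file with fs.readFileSync, pass the text through the helper and write the result back with fs.writeFileSync. Lines that are already commented out do not match the pattern, so running it twice is harmless.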

kubectl apply -f ./zookeeper

kubectl apply -f ./kafka

(note: I had to change the API version in 50pzoo and 51zoo as well as in 50kafka.yaml from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1, because apps/v1beta2 only exists from Kubernetes 1.8 onwards; see https://github.com/kubernetes/kubernetes/issues/55894 for details. Again, I should upgrade my Kubernetes version.)
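The apiVersion edit can be scripted along the same lines. A minimal sketch, assuming the manifests use nothing that apps/v1beta1 does not support: the hypothetical helper downgradeAppsApiVersion replaces apps/v1beta2 by apps/v1beta1 on apiVersion lines and leaves every other line untouched. The sample header is illustrative, not the repository's exact file.

```javascript
// Hypothetical helper: rewrite `apiVersion: apps/v1beta2` (available from Kubernetes 1.8)
// to `apiVersion: apps/v1beta1` (available on 1.7). Other lines are left untouched.
function downgradeAppsApiVersion(yamlText) {
  // m flag: ^ and $ match at line starts/ends; [ \t] avoids matching across newlines
  return yamlText.replace(/^([ \t]*apiVersion:[ \t]*)apps\/v1beta2[ \t]*$/gm, '$1apps/v1beta1');
}

// Illustrative StatefulSet header (not the repository's exact file)
const statefulSetHeader = 'apiVersion: apps/v1beta2\nkind: StatefulSet\nmetadata:\n  name: zoo\n  namespace: kafka';
console.log(downgradeAppsApiVersion(statefulSetHeader));
```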

To make Kafka accessible from the Minikube host (outside the Kubernetes cluster itself), apply the outside services:

kubectl apply -f ./outside-services

This exposes Services as type NodePort instead of ClusterIP, making them available for client applications that can access the Kubernetes host.

I also installed (Yahoo) Kafka Manager:

kubectl apply -f ./yahoo-kafka-manager

(Here too I had to change the API version in kafka-manager from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1.)

At this point, the Kafka cluster is running. I can check the pods and services in the Kubernetes Dashboard as well as with kubectl on the command line (for example kubectl get pods,services --namespace kafka). From the services I can get the port at which I can access the Kafka brokers.
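The Node clients further on need the broker address as a kafkaHost string: a comma-separated list of host:port pairs. A minimal sketch of deriving it, assuming you combine the Minikube IP (from minikube ip) with the NodePort values in the JSON output of kubectl get services --namespace kafka -o json: the hypothetical helper brokerHostList keeps the NodePort services accepted by a name filter you supply. The service names, IP address and port numbers in the example are invented for illustration.

```javascript
// Hypothetical helper: build a kafka-node style `kafkaHost` string ("ip:port,ip:port")
// from the node IP and the parsed JSON of `kubectl get services --namespace kafka -o json`.
// Assumes broker services are of type NodePort and recognizable by name.
function brokerHostList(nodeIp, serviceList, isBrokerService) {
  return serviceList.items
    .filter((svc) => svc.spec.type === 'NodePort' && isBrokerService(svc.metadata.name))
    .flatMap((svc) => svc.spec.ports.map((port) => `${nodeIp}:${port.nodePort}`))
    .join(',');
}

// Illustrative input: service names, IP address and port numbers are invented.
const services = {
  items: [
    { metadata: { name: 'outside-0' }, spec: { type: 'NodePort', ports: [{ port: 32400, nodePort: 32400 }] } },
    { metadata: { name: 'outside-1' }, spec: { type: 'NodePort', ports: [{ port: 32401, nodePort: 32401 }] } },
    { metadata: { name: 'kafka-manager' }, spec: { type: 'NodePort', ports: [{ port: 80, nodePort: 31000 }] } },
    { metadata: { name: 'zookeeper' }, spec: { type: 'ClusterIP', ports: [{ port: 2181 }] } },
  ],
};

const kafkaHost = brokerHostList('192.168.99.100', services, (name) => name.startsWith('outside-'));
console.log(kafkaHost); // 192.168.99.100:32400,192.168.99.100:32401
```

In a script, serviceList would be JSON.parse applied to the kubectl output, and the resulting string is what the producer and consumer below expect as the broker address (KAFKA_BROKER_IP).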


And I can access Kafka Manager at the indicated port.


Initially, no cluster is visible in Kafka Manager. By providing the Zookeeper information highlighted in the figure (zookeeper.kafka:2181), I can make the cluster visible in this user interface tool.
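The name zookeeper.kafka:2181 follows the Kubernetes DNS naming for services: <service>.<namespace>, or fully qualified <service>.<namespace>.svc.cluster.local with the default cluster domain, followed by the service port. Such names only resolve for clients inside the cluster, such as Kafka Manager; a client in the same namespace can even use the bare service name. A tiny sketch of the naming scheme (serviceAddress is a hypothetical helper, and cluster.local is an assumption that holds for a default Minikube setup):

```javascript
// Hypothetical helper: compose the in-cluster address of a Kubernetes Service.
// Assumes the default cluster domain "cluster.local".
function serviceAddress(service, namespace, port, fullyQualified = false) {
  const host = fullyQualified
    ? `${service}.${namespace}.svc.cluster.local`
    : `${service}.${namespace}`;
  return `${host}:${port}`;
}

console.log(serviceAddress('zookeeper', 'kafka', 2181)); // zookeeper.kafka:2181
console.log(serviceAddress('zookeeper', 'kafka', 2181, true)); // zookeeper.kafka.svc.cluster.local:2181
```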

Finally, the proof of the pudding: programmatic production and consumption of messages to and from the cluster. Using the world's simplest Node Kafka clients, it is easy to see that everything is working. I am impressed.

I created the Node application and its package.json file, then added the kafka-node dependency (npm install kafka-node --save). Next I created the producer:

```javascript
// before running, add kafka-node to the dependencies of the local application
// (npm install kafka-node --save)

var kafka = require('kafka-node');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaProducer";

var topicName = "a516817-kentekens";

// Kafka broker address(es) as host:port, for example the Minikube IP plus the NodePort
// of an outside service; read here from the KAFKA_BROKER_IP environment variable
var kafkaConnectDescriptor = process.env.KAFKA_BROKER_IP;

var producer;

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);

function initializeKafkaProducer(attempt) {
  try {
    console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`);
    const client = new kafka.KafkaClient({ kafkaHost: kafkaConnectDescriptor });
    console.log("created client");
    producer = new Producer(client);
    console.log("submitted async producer creation request");
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      console.log("failed to create the client or the producer " + JSON.stringify(err));
    });
  } catch (e) {
    console.log("Exception in initializeKafkaProducer: " + e);
    console.log("Try again in 5 seconds");
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}

initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  var payloads = [
    { topic: topicName, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(err));
    } else {
      console.log("Published event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(data));
    }
  });
};

// example call (after waiting three seconds to give the producer time to initialize)
setTimeout(function () {
  eventPublisher.publishEvent("mykey", { "kenteken": "56-TAG-2", "country": "nl" });
}, 3000);
```

and ran the producer.
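initializeKafkaProducer retries after a fixed 5 second delay. If you would rather back off progressively, here is a minimal sketch of a generic retry helper with exponential backoff and a cap. It is not part of kafka-node, and fakeConnect is a hypothetical stand-in for code that creates the client and producer and rejects when that fails.

```javascript
// Generic retry helper with exponential backoff (written for this post, not part of kafka-node).
// `operation(attempt)` must return a Promise that rejects when the attempt fails.
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function retryWithBackoff(operation, { maxAttempts = 5, initialDelayMs = 1000, maxDelayMs = 30000 } = {}) {
  let delay = initialDelayMs;
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation(attempt);
    } catch (err) {
      if (attempt >= maxAttempts) throw err; // out of attempts: give up
      console.log(`attempt ${attempt} failed (${err.message}), retrying in ${delay} ms`);
      await sleep(delay);
      delay = Math.min(delay * 2, maxDelayMs); // double the wait, up to the cap
    }
  }
}

// Hypothetical stand-in for creating the Kafka client and producer:
// fails on the first two attempts, then succeeds.
async function fakeConnect(attempt) {
  if (attempt < 3) throw new Error('broker not reachable');
  return 'producer ready';
}

retryWithBackoff(fakeConnect, { initialDelayMs: 100 }).then((result) => console.log(result));
```

To use this with kafka-node, the operation would wrap the client and producer creation in a Promise that resolves on the producer's 'ready' event and rejects on its 'error' event, since connection problems are reported through events rather than thrown exceptions.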


Then I created the consumer:

```javascript
var kafka = require('kafka-node');

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaConsumer";

var topicName = "a516817-kentekens";

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);
console.log("Event Hub Topic " + topicName);

var consumerOptions = {
    // Kafka broker address(es) as host:port, read from the KAFKA_BROKER_IP environment variable
    kafkaHost: process.env.KAFKA_BROKER_IP,
    groupId: 'local-consume-events-from-event-hub-for-kenteken-applicatie',
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'earliest' // equivalent of auto.offset.reset; valid values are 'none', 'latest', 'earliest'
};

var topics = [topicName];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumerLocal' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);

consumerGroup.on('connect', function () {
    console.log('connected to ' + topicName + " at " + consumerOptions.kafkaHost);
});

function onMessage(message) {
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d',
        this.client.clientId, message.topic, message.partition, message.offset);
}

function onError(error) {
    console.error(error);
}

// on Ctrl+C: close the consumer group (committing its offsets) before exiting
process.once('SIGINT', function () {
    consumerGroup.close(true, function () {
        process.exit();
    });
});
```

and ran the consumer, which duly consumed the event published by the producer. Wonderful.
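The onMessage handler above only logs message metadata. To look at the event itself, here is a minimal sketch of a decoder. It assumes, as in the producer, that the value is JSON.stringify(event), and that key and value arrive as strings or Buffers depending on the consumer's encoding settings (the key may also be null). decodeEvent is a hypothetical helper and the sample message is illustrative.

```javascript
// Hypothetical helper: turn a consumed message into { key, event, raw }, assuming the
// producer sent JSON.stringify(event) as the value. Key and value may be strings or
// Buffers (toString() handles both); the key may be null.
function decodeEvent(message) {
  const key = message.key == null ? null : message.key.toString();
  const raw = message.value == null ? '' : message.value.toString();
  let event = null;
  try {
    event = JSON.parse(raw);
  } catch (err) {
    // not JSON: leave event null, the raw text is still returned
  }
  return { key, event, raw };
}

// Illustrative message object, shaped like the ones the consumer group emits on 'message'
const sample = {
  topic: 'a516817-kentekens',
  value: '{"kenteken":"56-TAG-2","country":"nl"}',
  offset: 0,
  partition: 0,
  key: 'mykey',
};
console.log(decodeEvent(sample));
```

Inside onMessage, console.log(decodeEvent(message)) would then print the key together with the kenteken event.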



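To clean up, run

kubectl delete ns kafka

to remove the namespace kafka and everything in it. Alternatively,

kubectl delete pods,services,pvc,pv,sts,cm --all --namespace=kafka

removes those resources but keeps the namespace. Note that PersistentVolumes (pv) are not namespaced, so this second command deletes every PersistentVolume in the cluster, not only the ones used by Kafka.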


The main resource is the GitHub repo: https://github.com/Yolean/kubernetes-kafka. Absolutely great stuff.

Also useful: npm package kafka-node – https://www.npmjs.com/package/kafka-node

Documentation on Kubernetes: https://kubernetes.io/docs/user-journeys/users/application-developer/foundational/#section-2 (with references to kubectl and Minikube) and the Katacoda playground: https://www.katacoda.com/courses/kubernetes/playground

About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director and Oracle Developer Champion. Solution architect and developer on diverse areas including SQL, JavaScript, Kubernetes & Docker, Machine Learning, Java, SOA and microservices, events in various shapes and forms and many other things. Author of the Oracle Press book Oracle SOA Suite 12c Handbook. Frequent presenter on user groups and community events and conferences such as JavaOne, Oracle Code, CodeOne, NLJUG JFall and Oracle OpenWorld.


  1. I followed the tutorial step by step but I get the following error for each pod:
    pod has unbound PersistentVolumeClaims

  2. This is a great post!

    The only change I made in the config files is to change the replicas to 1 instead of 3 for zookeeper and kafka. But when I start things up, kafka does not connect to zookeeper. It comes back with this error in the log:
    [2018-10-08 22:10:08,691] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server ‘zookeeper:2181’ with timeout of 6000 ms

    Where does kafka get this name of the zookeeper server as “zookeeper”? I expected it would be “zoo-0” or “pzoo-0”. where is this name “zookeeper” configured for the server? And how do I change it?

    Also, what is the difference between zoo and pzoo here? Thanks!

  3. Lucas, can you please share your yaml files? I have not been able to connect to the service using the Node producer and consumer, although I can produce and consume messages inside the container.

  4. Thanks for sharing. I think you forgot to mention the step that creates the kafka namespace,
    kubectl apply -f 00-namespace.yml, before running kubectl apply -f ./zookeeper; otherwise it will complain about
    Error from server (NotFound): error when creating “zookeeper/10zookeeper-config.yml”: namespaces “kafka” not found

  5. Lucas Jellema on

    I was just made aware of the project http://strimzi.io/, whose stated aim is: “Strimzi provides a way to run an Apache Kafka cluster on OpenShift and Kubernetes in various deployment configurations.” Definitely worth a look.
