15 Minutes to get a Kafka Cluster running on Kubernetes – and start producing and consuming from a Node application


For a workshop I will present on microservices and communication patterns, I need attendees to have their own local Kafka cluster. I have found a way to have them up and running in virtually no time at all, thanks to the combination of:

  • Kubernetes
  • Minikube
  • The Yolean/kubernetes-kafka GitHub repo, with Kubernetes yaml files that create all we need (including Kafka Manager)

Prerequisites:

  • Minikube and Kubectl are installed
  • The Minikube cluster is running (minikube start)

In my case the versions are:

Minikube v0.22.3, kubectl client 1.9 and Kubernetes server 1.7.

The steps I went through:

Git Clone the GitHub Repository: https://github.com/Yolean/kubernetes-kafka

From the root directory of the cloned repository, run the following kubectl commands:

(note: I did not know until today that kubectl apply -f can be used with a directory reference and will then apply all yaml files in that directory. That is incredibly useful!)

kubectl apply -f ./configure/minikube-storageclass-broker.yml
kubectl apply -f ./configure/minikube-storageclass-zookeeper.yml

(note: I had to comment out the reclaimPolicy attribute in both files – probably because I am running a fairly old version of Kubernetes)

kubectl apply -f ./zookeeper

kubectl apply -f ./kafka

(note: I had to change API version in 50pzoo and 51zoo as well as in 50kafka.yaml from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1 – see https://github.com/kubernetes/kubernetes/issues/55894 for details; again, I should upgrade my Kubernetes version)

To make Kafka accessible from the Minikube host (outside the K8S cluster itself):

kubectl apply -f ./outside-services

This exposes Services as type NodePort instead of ClusterIP, making them available for client applications that can access the Kubernetes host.

I also installed (Yahoo) Kafka Manager:

kubectl apply -f ./yahoo-kafka-manager

(I had to change the API version in kafka-manager from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1)

At this point, the Kafka cluster is running. I can check the pods and services in the Kubernetes Dashboard as well as through kubectl on the command line, and look up the port at which I can access the Kafka brokers.
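
The address a client outside the cluster needs is the Minikube IP combined with the nodePort of the broker's Service. As a rough sketch of that lookup in Node: the object below is a hand-written, trimmed stand-in for the JSON that kubectl get service -o json prints (the Service name outside-0 and the port numbers are assumptions for illustration, not captured output), and nodePortAddress is a hypothetical helper, not part of any library.

```javascript
// Hand-written stand-in for the JSON of a NodePort Service as printed by
// `kubectl get service <name> --namespace kafka -o json`.
// Name and port values are illustrative assumptions.
const sampleService = {
  metadata: { name: 'outside-0', namespace: 'kafka' },
  spec: {
    type: 'NodePort',
    ports: [{ port: 9094, targetPort: 9094, nodePort: 32400 }]
  }
};

// Hypothetical helper: returns "host:port" for the first nodePort of a Service.
// Throws when the Service is not of type NodePort or exposes no nodePort.
function nodePortAddress(nodeIp, service) {
  if (service.spec.type !== 'NodePort') {
    throw new Error('Service ' + service.metadata.name + ' is not of type NodePort');
  }
  const entry = service.spec.ports.find(function (p) {
    return p.nodePort !== undefined;
  });
  if (!entry) {
    throw new Error('Service ' + service.metadata.name + ' exposes no nodePort');
  }
  return nodeIp + ':' + entry.nodePort;
}

// 192.168.99.100 is the Minikube IP used in the Node clients in this article;
// `minikube ip` prints the value for your own machine.
console.log(nodePortAddress('192.168.99.100', sampleService)); // prints 192.168.99.100:32400
```

The resulting string has the host:port shape that the Node clients further on pass as kafkaHost.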

And I can access Kafka Manager at the port listed for it.

Initially, no cluster is visible in Kafka Manager. By providing the Zookeeper address (zookeeper.kafka:2181), I can make the cluster visible in this user interface tool.

Finally the eating of the pudding: programmatic production and consumption of messages to and from the cluster. Using the world’s simplest Node Kafka clients, it is easy to see the stuff is working. I am impressed.

I created the Node application and its package.json file, then added the kafka-node dependency (npm install kafka-node --save). Next I created the producer:

// before running, add kafka-node to the dependencies of the local application
// (npm install kafka-node --save)

var kafka = require('kafka-node');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;

// assigned in initializeKafkaProducer, used in publishEvent
var producer;

var APP_VERSION = "0.8.5"
var APP_NAME = "KafkaProducer"

var topicName = "a516817-kentekens";
var KAFKA_BROKER_IP = '192.168.99.100:32400';

// host:port at which the Kafka broker is reachable from outside the cluster
var kafkaConnectDescriptor = KAFKA_BROKER_IP;

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);

function initializeKafkaProducer(attempt) {
  try {
    console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`);
    const client = new kafka.KafkaClient({ kafkaHost: kafkaConnectDescriptor });
    console.log("created client");
    producer = new Producer(client);
    console.log("submitted async producer creation request");
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      // log the error object itself; JSON.stringify on a plain Error prints "{}"
      console.log("failed to create the client or the producer", err);
    });
  }
  catch (e) {
    console.log("Exception in initializeKafkaProducer", e);
    console.log("Try again in 5 seconds");
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}//initializeKafkaProducer
initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  var payloads = [
    { topic: topicName, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topicName + " :", err);
      return; // do not report success after a failure
    }
    console.log("Published event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(data));
  });
};

//example calls: (after waiting for three seconds to give the producer time to initialize)
setTimeout(function () {
  eventPublisher.publishEvent("mykey", { "kenteken": "56-TAG-2", "country": "nl" })
}
  , 3000)

and ran the producer.
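
The producer above waits a fixed three seconds before publishing. A possible alternative, sketched here under assumptions, is to wait for the producer's 'ready' event instead. whenReady is a hypothetical helper (not part of kafka-node), and a plain EventEmitter stands in for the kafka-node Producer so the sketch runs without a broker; the real Producer emits the same 'ready' and 'error' events used in the code above.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: resolves when `emitter` fires 'ready',
// rejects when it fires 'error' or when timeoutMs elapses first.
function whenReady(emitter, timeoutMs) {
  return new Promise(function (resolve, reject) {
    const timer = setTimeout(function () {
      cleanup();
      reject(new Error('not ready after ' + timeoutMs + ' ms'));
    }, timeoutMs);

    function onReady() { cleanup(); resolve(emitter); }
    function onError(err) { cleanup(); reject(err); }

    // Remove both listeners and the timer so nothing fires twice
    // and the process is free to exit.
    function cleanup() {
      clearTimeout(timer);
      emitter.removeListener('ready', onReady);
      emitter.removeListener('error', onError);
    }

    emitter.once('ready', onReady);
    emitter.once('error', onError);
  });
}

// Stand-in for `new Producer(client)`; emits 'ready' by hand.
const fakeProducer = new EventEmitter();
whenReady(fakeProducer, 5000).then(function () {
  console.log('producer ready: safe to publish now');
});
fakeProducer.emit('ready');
```

With the real producer, the publishEvent call would go inside the then callback, replacing the setTimeout of 3000 ms.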

Then I created the consumer:

var kafka = require('kafka-node');

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaConsumer";

var eventListenerAPI = module.exports;

var topicName = "a516817-kentekens";

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);
console.log("Kafka Topic " + topicName);

// host:port at which the Kafka broker is reachable from outside the cluster
var KAFKA_BROKER_IP = '192.168.99.100:32400';

var consumerOptions = {
    kafkaHost: KAFKA_BROKER_IP,
    groupId: 'local-consume-events-from-event-hub-for-kenteken-applicatie',
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
};

var topics = [topicName];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumerLocal' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);

consumerGroup.on('connect', function () {
    console.log('connected to ' + topicName + " at " + consumerOptions.kafkaHost);
});

function onMessage(message) {
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d'
    , this.client.clientId, message.topic, message.partition, message.offset);
}

function onError(error) {
    console.error(error);
    console.error(error.stack);
}

// on Ctrl-C: close the consumer group (force=true commits the current offset first), then exit
process.once('SIGINT', function () {
    consumerGroup.close(true, function () {
        process.exit();
    });
});

and ran the consumer – which duly consumed the event published by the producer. It is wonderful.
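
The onMessage handler above logs only topic, partition and offset. To get back the event that was published, the consumer can parse message.value, which holds the JSON string the producer sent. A minimal sketch, assuming the message shape below (a hand-written sample, not captured output); parseEvent is a hypothetical helper, not part of kafka-node.

```javascript
// Hypothetical helper: turn a consumed message back into { key, event }.
// Returns null when the value is not valid JSON, so a single malformed
// message does not crash the consumer.
function parseEvent(message) {
  try {
    return {
      key: message.key == null ? null : String(message.key),
      event: JSON.parse(message.value)
    };
  } catch (e) {
    return null;
  }
}

// Hand-written sample shaped like the object passed to onMessage.
const sampleMessage = {
  topic: 'a516817-kentekens',
  partition: 0,
  offset: 0,
  key: 'mykey',
  value: JSON.stringify({ kenteken: '56-TAG-2', country: 'nl' })
};

const parsed = parseEvent(sampleMessage);
console.log(parsed.key, parsed.event.kenteken); // prints: mykey 56-TAG-2
```

Inside onMessage this would be called as parseEvent(message), with a null result logged and skipped.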

Use

kubectl delete ns kafka

or:

kubectl delete pods,services,pvc,pv,sts,cm --all --namespace="kafka"

to remove the namespace kafka and its contents.

Resources

The main resource is the GitHub Repo: https://github.com/Yolean/kubernetes-kafka . Absolutely great stuff.

Also useful: npm package kafka-node – https://www.npmjs.com/package/kafka-node

Documentation on Kubernetes: https://kubernetes.io/docs/user-journeys/users/application-developer/foundational/#section-2 – with references to Kubectl and Minikube – and the Katacoda playground: https://www.katacoda.com/courses/kubernetes/playground

About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director and Oracle Developer Champion. Solution architect and developer on diverse areas including SQL, JavaScript, Docker, Machine Learning, Java, SOA and microservices, events in various shapes and forms and many other things. Author of the Oracle Press books: Oracle SOA Suite 11g Handbook and Oracle SOA Suite 12c Handbook. Frequent presenter on community events and conferences such as JavaOne, Oracle Code and Oracle OpenWorld.

2 Comments

  1. Lucas Jellema on

    I was just made aware of project http://strimzi.io/ that has as its aim: “Strimzi provides a way to run an Apache Kafka cluster on OpenShift and Kubernetes in various deployment configurations.” Definitely worth a look.
