15 Minutes to get a Kafka Cluster running on Kubernetes – and start producing and consuming from a Node application

Lucas Jellema

For a workshop I will present on microservices and communication patterns, I need attendees to have their own local Kafka cluster. I have found a way to get them up and running in virtually no time at all, thanks to the combination of:

  • Kubernetes
  • Minikube
  • The Yolean/kubernetes-kafka GitHub repo with Kubernetes yaml files that create all we need (including Kafka Manager)

Prerequisites:

  • Minikube and Kubectl are installed
  • The Minikube cluster is running (minikube start)

In my case the versions are: Minikube v0.22.3, kubectl client 1.9, and (Kubernetes) server 1.7.

The steps I went through:

Git Clone the GitHub Repository: https://github.com/Yolean/kubernetes-kafka
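For example:

git clone https://github.com/Yolean/kubernetes-kafka.git
cd kubernetes-kafka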

From the root directory of the cloned repository, run the following kubectl commands:

(note: I did not know until today that kubectl apply -f can be used with a directory reference and will then apply all yaml files in that directory. That is incredibly useful!)

kubectl apply -f ./00-namespace.yml
kubectl apply -f ./configure/minikube-storageclass-broker.yml
kubectl apply -f ./configure/minikube-storageclass-zookeeper.yml

(note: the first command creates the kafka namespace; as several commenters found, without it the later steps fail with: namespaces "kafka" not found)

(note: I had to comment out the reclaimPolicy attribute in both files – probably because I am running a fairly old version of Kubernetes)
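The attribute in question sits in the StorageClass definition; commented out, the line looks roughly like this (the value shown is illustrative, not necessarily what the repo uses):

# reclaimPolicy: Retain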

kubectl apply -f ./zookeeper

kubectl apply -f ./kafka

(note: I had to change API version in 50pzoo and 51zoo as well as in 50kafka.yaml from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1 – see https://github.com/kubernetes/kubernetes/issues/55894 for details; again, I should upgrade my Kubernetes version)
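Concretely, the edit at the top of each of those yaml files is:

apiVersion: apps/v1beta1  # changed from: apps/v1beta2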

To make Kafka accessible from the Minikube host (outside the K8S cluster itself), run:

kubectl apply -f ./outside-services

This exposes Services as type NodePort instead of ClusterIP, making them available for client applications that can access the Kubernetes host.
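For reference, here is a minimal sketch of such a NodePort service definition; the name, selector label, and port numbers are illustrative, not copied from the repo:

apiVersion: v1
kind: Service
metadata:
  name: outside-0
  namespace: kafka
spec:
  type: NodePort
  selector:
    app: kafka
  ports:
  - port: 9094
    nodePort: 32400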

I also installed (Yahoo) Kafka Manager:

kubectl apply -f ./yahoo-kafka-manager

(I had to change the API version in kafka-manager from apiVersion: apps/v1beta2 to apiVersion: apps/v1beta1)

At this point, the Kafka cluster is running. I can check the pods and services in the Kubernetes Dashboard as well as through kubectl on the command line. I can also get the port at which the Kafka brokers can be accessed.
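For example (a sketch; the exact service names come from the repo's outside-services yaml files):

kubectl get svc --namespace kafka
minikube ip

The PORT(S) column lists the NodePort of each externally exposed service; combined with the Minikube IP this yields the broker address, in my case 192.168.99.100:32400.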


And I can access Kafka Manager at the indicated port.
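The port for Kafka Manager can be retrieved in the same way:

kubectl get svc kafka-manager --namespace kafka

(note: as a commenter points out below, the kafka-manager service may be created as ClusterIP rather than NodePort; if so, edit the service and change its type to NodePort first)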


Initially, no cluster is visible in Kafka Manager. By providing the Zookeeper connect string (zookeeper.kafka:2181), I can make the cluster visible in this user interface tool.

Finally, the proof of the pudding: programmatic production and consumption of messages to and from the cluster. Using the world's simplest Node Kafka clients, it is easy to see that everything works. I am impressed.

I created the Node application and its package.json file, then added the kafka-node dependency (npm install kafka-node --save). Next I created the producer:

// before running, either globally install kafka-node  (npm install kafka-node)
// or add kafka-node to the dependencies of the local application

var kafka = require('kafka-node');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;

// assigned once the Kafka client has been created in initializeKafkaProducer
var producer;

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaProducer";

var topicName = "a516817-kentekens";
var KAFKA_BROKER_IP = '192.168.99.100:32400';

// from the Oracle Event Hub - Platform Cluster Connect Descriptor
var kafkaConnectDescriptor = KAFKA_BROKER_IP;

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);

function initializeKafkaProducer(attempt) {
  try {
    console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`);
    const client = new kafka.KafkaClient({ kafkaHost: kafkaConnectDescriptor });
    console.log("created client");
    producer = new Producer(client);
    console.log("submitted async producer creation request");
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      console.log("failed to create the client or the producer " + JSON.stringify(err));
    })
  }
  catch (e) {
    console.log("Exception in initializeKafkaProducer" + JSON.stringify(e));
    console.log("Try again in 5 seconds");
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}//initializeKafkaProducer
initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  var payloads = [
    { topic: topicName, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(err));
      return;
    }
    console.log("Published event with key " + eventKey + " to topic " + topicName + " :" + JSON.stringify(data));
  });
}

// example call (after waiting for three seconds to give the producer time to initialize)
setTimeout(function () {
  eventPublisher.publishEvent("mykey", { "kenteken": "56-TAG-2", "country": "nl" });
}, 3000);

and ran the producer:
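Assuming the code was saved as producer.js (the file name is my own choice):

node producer.js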


Then I created the consumer:

var kafka = require('kafka-node');
// async is used for the graceful shutdown at the bottom of this module
// (install it with npm install async --save)
var async = require('async');

var APP_VERSION = "0.8.5";
var APP_NAME = "KafkaConsumer";

var eventListenerAPI = module.exports;

// from the Oracle Event Hub - Platform Cluster Connect Descriptor

var topicName = "a516817-kentekens";

console.log("Running Module " + APP_NAME + " version " + APP_VERSION);
console.log("Event Hub Topic " + topicName);

var KAFKA_BROKER_IP = '192.168.99.100:32400';

var consumerOptions = {
    kafkaHost: KAFKA_BROKER_IP,
    groupId: 'local-consume-events-from-event-hub-for-kenteken-applicatie',
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
};

var topics = [topicName];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumerLocal' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);

consumerGroup.on('connect', function () {
    console.log('connected to ' + topicName + " at " + consumerOptions.kafkaHost);
})

function onMessage(message) {
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d'
    , this.client.clientId, message.topic, message.partition, message.offset);
}

function onError(error) {
    console.error(error);
    console.error(error.stack);
}

process.once('SIGINT', function () {
    async.each([consumerGroup], function (consumer, callback) {
        consumer.close(true, callback);
    });
});

and ran the consumer, which duly consumed the event published by the producer. It is wonderful.
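Again, with an illustrative file name:

node consumer.js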


Use

kubectl delete ns kafka

or:

kubectl delete pods,services,pvc,pv,sts,cm --all --namespace="kafka"

to remove the namespace kafka and its contents.
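To verify the cleanup (note that kubectl get all does not include PVCs, so those are checked separately):

kubectl get all --namespace kafka
kubectl get pvc --namespace kafka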

Resources

The main resource is the GitHub repo: https://github.com/Yolean/kubernetes-kafka. Absolutely great stuff.

Also useful: npm package kafka-node – https://www.npmjs.com/package/kafka-node

Documentation on Kubernetes: https://kubernetes.io/docs/user-journeys/users/application-developer/foundational/#section-2 – with references to kubectl and Minikube – and the Katacoda playground: https://www.katacoda.com/courses/kubernetes/playground


20 thoughts on “15 Minutes to get a Kafka Cluster running on Kubernetes – and start producing and consuming from a Node application”

  1. Hi there, thanks for the post, this is really helpful. Only one doubt: the git repository used here -> https://github.com/Yolean/kubernetes-kafka does not seem to have a “configure” folder in the master branch, as used in the very first command in this blog post (“kubectl apply -f ./configure/minikube-storageclass-broker.yml”). Which branch needs to be checked out, apart from master, in order to get this folder?

  2. I followed the tutorial step by step but I get the following error for each pod:
    pod has unbound PersistentVolumeClaims

  3. This is a great post!

    The only change I made in the config files is to change the replicas to 1 instead of 3 for zookeeper and kafka. But when I start things up, kafka does not connect to zookeeper. It comes back with this error in the log:
    [2018-10-08 22:10:08,691] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server 'zookeeper:2181' with timeout of 6000 ms

    Where does kafka get this name of the zookeeper server as “zookeeper”? I expected it would be “zoo-0” or “pzoo-0”. where is this name “zookeeper” configured for the server? And how do I change it?

    Also, what is the difference between zoo and pzoo here? Thanks!

  4. Lucas, can you please share your yaml files, I have not been able to connect to the service using node producer and consumer. I can produce & consume the messages inside the container.

  5. I think the service we created for kafka-manager is a ClusterIP, not a NodePort as suggested.

  6. Thanks for sharing. I think you missed mentioning the step to create the kafka namespace (kubectl apply -f 00-namespace.yml) before running kubectl apply -f ./zookeeper; otherwise it will complain:
    Error from server (NotFound): error when creating "zookeeper/10zookeeper-config.yml": namespaces "kafka" not found

  7. I was just made aware of project http://strimzi.io/ that has as its aim: “Strimzi provides a way to run an Apache Kafka cluster on OpenShift and Kubernetes in various deployment configurations.” Definitely worth a look.

    1. I was able to access the manager by editing the kafka-manager Service and changing the type to NodePort. You can do so by running:
      kubectl edit svc kafka-manager --namespace kafka
