Kafka Streams and NodeJS – Consuming and periodically reporting in Node.JS on the results from a Kafka Streams streaming analytics application


In several previous articles on Apache Kafka, Kafka Streams and Node.JS for interacting with Apache Kafka, I have described how to create a Node.JS application that publishes messages to a Kafka Topic (based on entries in a CSV file), how to create a simple Kafka Streams Java application that processes such messages from that Topic and how to extend that Java application to produce a running Top-N aggregation from that Topic. In this article, I want to discuss a Node application that consumes the Top-N reports from the Kafka Topic produced to by the Kafka Streams application and periodically (once every X seconds) reports on the current standings.


The sources for this article are in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.

The Node application uses the npm module kafka-node (https://www.npmjs.com/package/kafka-node) for the interaction with Kafka.

A new Client is created – based on the ZooKeeper connect string (ubuntu:2181/). Using the Client, a Consumer is constructed. The consumer is configured to consume from Topic Top3CountrySizePerContinent. A message handler is associated with the consumer, to handle messages on the topic.

The messages consumed by the Node consumer have the following structure:

,"value":"{\"nrs\":[{\"code\":\"DZ\",\"name\":\"Algeria\",\"population\":40263711,\"size\":2381741,\"continent\":\"Africa\"},{\"code\":\"CD\",\"name\":\"Democratic Republic of the Congo\",\"population\":81331050,\"size\":2344858,\"continent\":\"Africa\"},{\"code\":\"SD\",\"name\":\"Sudan\",\"population\":36729501,\"size\":1861484,\"continent\":\"Africa\"},null]}"

The key of the message is of type buffer. We happen to know the key is actually a String (the name of the continent). We can extract the key like this:

var continent = new Buffer(countryMessage.key).toString(‘ascii’);

The payload of the message – the top3 for the continent – is in the value property. It can be extracted easily:

var top3 = JSON.parse(countryMessage.value);

   {"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"}
  ,{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"}
  ,{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"}

The object countrySizeStandings contains a property for each continent. The property is set equal to the top3 that was most recently consumed from the Kafka Topic Top3CountrySizePerContinent.


Using the Node built in setInterval() the report() function is scheduled for execution every reportingIntervalInSecs seconds. This function writes the current data in countrySizeStandings to the console.



This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.

This program reports: top 3 largest countries per continent (periodically, with a configurable interval) 

var kafka = require('kafka-node')
var Consumer = kafka.Consumer
var client = new kafka.Client("ubuntu:2181/")

var countriesTopic = "Top3CountrySizePerContinent";
var reportingIntervalInSecs = 4;

var consumer = new Consumer(
  {fromOffset: true}

consumer.on('message', function (message) {

  { topic: countriesTopic, partition: 0, offset: 0}
], () => console.log("topic "+countriesTopic+" added to consumer for listening"));

var countrySizeStandings = {}; // the global container for the most recent country size standings 

function handleCountryMessage(countryMessage) {
    var top3 = JSON.parse(countryMessage.value);
    // extract key value from the Kafka message
    var continent = new Buffer(countryMessage.key).toString('ascii');
    // record the top3 for the continent indicated by the message key as current standing in the countrySizeStandings object
}// handleCountryMessage

// every reportingIntervalInSecs seconds, report on the current standings per continent
function report() {
   var d = new Date();
   console.log("Report at "+ d.getHours()+":"+d.getMinutes()+ ":"+d.getSeconds());
   // loop over the keys (properties) in the countrySizeStandings map (object)
   for (var continent in countrySizeStandings) {
     if (countrySizeStandings.hasOwnProperty(continent)) {
        var line = continent+ ": ";
        var index = 1;
        countrySizeStandings[continent].nrs.forEach(function(c) {
          if (c) {
            line = line + (index++) +'. '+ c.name+ '('+c.size+'), ';

// schedule execution of function report at the indicated interval
setInterval(report, reportingIntervalInSecs*1000);


Running the end to end chain requires a running Kafka Cluster and the running of the Node application to produce the country messages from the CSV file, the Kafka Streams Java application to derive the running Top 3 standings and finally the Node application introduced in this article to consume the Top 3 standings and report them to the console (as instructed in the ReadMe in the GitHub Repo):

  • node KafkaCountryProducer.js
  • java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
  • node KafkaCountryStreamsConsumer.js

The CountryProducer.js Node application writes the messages it produced to Kafka to the console as well:


The Kafka-Streams-Country-TopN Java application also writes its streaming analytic findings to the console:


The outcome of the Kafka Streams analysis – as published to the Kafka Topic – is consumed by the Node application, continuously, and reported to the console, periodically (once every 30 seconds), updated with the latest findings:


About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director and Oracle Developer Champion. Solution architect and developer on diverse areas including SQL, JavaScript, Docker, Machine Learning, Java, SOA and microservices, events in various shapes and forms and many other things. Author of the Oracle Press books: Oracle SOA Suite 11g Handbook and Oracle SOA Suite 12c Handbook. Frequent presenter on community events and conferences such as JavaOne, Oracle Code and Oracle OpenWorld.


Leave a Reply