Kafka Streams and Node.js – Consuming and periodically reporting in Node.js on the results from a Kafka Streams streaming analytics application

In several previous articles on Apache Kafka, Kafka Streams and Node.js, I have described how to create a Node.js application that publishes messages to a Kafka Topic (based on entries in a CSV file), how to create a simple Kafka Streams Java application that processes such messages from that Topic, and how to extend that Java application to produce a running Top-N aggregation from that Topic. In this article, I discuss a Node application that consumes the Top-N reports from the Kafka Topic to which the Kafka Streams application produces, and that periodically (once every X seconds) reports on the current standings.

The sources for this article are in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.

The Node application uses the npm module kafka-node (https://www.npmjs.com/package/kafka-node) for the interaction with Kafka.

A new Client is created – based on the ZooKeeper connect string (ubuntu:2181/). Using the Client, a Consumer is constructed. The consumer is configured to consume from Topic Top3CountrySizePerContinent. A message handler is associated with the consumer, to handle messages on the topic.
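
In condensed form, the wiring looks like this (extracted from the full listing further down in this article):

var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
// connect through ZooKeeper running on host ubuntu
var client = new kafka.Client("ubuntu:2181/");
// no initial topic payloads; fromOffset: true makes the consumer honor explicit offsets
var consumer = new Consumer(client, [], {fromOffset: true});
// listen to partition 0 of the topic, from the very first message (offset 0)
consumer.addTopics([{topic: "Top3CountrySizePerContinent", partition: 0, offset: 0}]);
// invoked for every message consumed from the topic
consumer.on('message', function (message) {
  handleCountryMessage(message);
});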

The messages consumed by the Node consumer have the following structure:

{"topic":"Top3CountrySizePerContinent"
,"value":"{\"nrs\":[{\"code\":\"DZ\",\"name\":\"Algeria\",\"population\":40263711,\"size\":2381741,\"continent\":\"Africa\"},{\"code\":\"CD\",\"name\":\"Democratic Republic of the Congo\",\"population\":81331050,\"size\":2344858,\"continent\":\"Africa\"},{\"code\":\"SD\",\"name\":\"Sudan\",\"population\":36729501,\"size\":1861484,\"continent\":\"Africa\"},null]}"
,"offset":244
,"partition":0
,"key":{"type":"Buffer","data":[65,102,114,105,99,97]}
}

The key of the message is of type Buffer. We happen to know the key is actually a String (the name of the continent). We can extract the key like this:

var continent = new Buffer(countryMessage.key).toString('ascii');
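
As an aside: the new Buffer() constructor has since been deprecated in Node.js; on current versions, Buffer.from() is the drop-in replacement:

var continent = Buffer.from(countryMessage.key).toString('ascii');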

The payload of the message – the top3 for the continent – is in the value property. It can be extracted easily:

var top3 = JSON.parse(countryMessage.value);

This yields an object such as:

{"nrs":
  [
   {"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"}
  ,{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"}
  ,{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"}
  ,null
  ]
}

The object countrySizeStandings contains a property for each continent. The property is set equal to the top3 that was most recently consumed from the Kafka Topic Top3CountrySizePerContinent.

countrySizeStandings[continent]=top3;
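
After messages for multiple continents have been consumed, countrySizeStandings has one property per continent, for example (abbreviated, using the sample data shown above):

countrySizeStandings = {
  "Africa":        { "nrs": [ {"code":"DZ","name":"Algeria", ...}, ... ] },
  "North America": { "nrs": [ {"code":"BS","name":"Bahamas", ...}, ... ] }
}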

Using Node's built-in setInterval(), the report() function is scheduled for execution every reportingIntervalInSecs seconds. This function writes the current data in countrySizeStandings to the console.

/*
This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.
This program reports: top 3 largest countries per continent (periodically, with a configurable interval)
*/


var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client("ubuntu:2181/");

var countriesTopic = "Top3CountrySizePerContinent";
var reportingIntervalInSecs = 4;

var consumer = new Consumer(
  client,
  [],
  {fromOffset: true} // honor the explicit offset passed in via addTopics
);

consumer.on('message', function (message) {
  handleCountryMessage(message);
});

consumer.addTopics([
  { topic: countriesTopic, partition: 0, offset: 0}
], () => console.log("topic "+countriesTopic+" added to consumer for listening"));

var countrySizeStandings = {}; // the global container for the most recent country size standings 

function handleCountryMessage(countryMessage) {
    var top3 = JSON.parse(countryMessage.value);
    // extract key value from the Kafka message
    var continent = new Buffer(countryMessage.key).toString('ascii');
    // record the top3 for the continent indicated by the message key as current standing in the countrySizeStandings object
    countrySizeStandings[continent]=top3;
}// handleCountryMessage

// every reportingIntervalInSecs seconds, report on the current standings per continent
function report() {
   var d = new Date();
   console.log("Report at "+ d.getHours()+":"+d.getMinutes()+ ":"+d.getSeconds());
   // loop over the keys (properties) in the countrySizeStandings map (object)
   for (var continent in countrySizeStandings) {
     if (countrySizeStandings.hasOwnProperty(continent)) {
        var line = continent+ ": ";
        var index = 1;
        countrySizeStandings[continent].nrs.forEach(function(c) {
          if (c) {
            line = line + (index++) +'. '+ c.name+ '('+c.size+'), ';
          }
        });
        console.log(line);
    }//if
  }//for
}//report

// schedule execution of function report at the indicated interval
setInterval(report, reportingIntervalInSecs*1000);

 

Running the end-to-end chain requires a running Kafka Cluster and three applications: the Node application that produces the country messages from the CSV file, the Kafka Streams Java application that derives the running Top 3 standings, and the Node application introduced in this article that consumes the Top 3 standings and reports them to the console (as instructed in the ReadMe in the GitHub Repo):

  • node KafkaCountryProducer.js
  • java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
  • node KafkaCountryStreamsConsumer.js

Note that the semicolon as classpath separator in the java command is Windows syntax; on Linux or macOS, use a colon instead.

The KafkaCountryProducer.js Node application writes the messages it produces to Kafka to the console as well:

[screenshot: console output of KafkaCountryProducer.js showing the produced country messages]

The Kafka-Streams-Country-TopN Java application also writes its streaming analytic findings to the console:

[screenshot: console output of the Kafka-Streams-Country-TopN Java application]

The outcome of the Kafka Streams analysis – as published to the Kafka Topic – is consumed continuously by the Node application and reported to the console periodically, at the configured reporting interval, updated with the latest findings:

[screenshot: periodic Top 3 report in the Node console]
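
Based on the sample messages shown earlier, a single report cycle from this consumer looks roughly like this (timestamp purely illustrative):

Report at 15:4:7
Africa: 1. Algeria(2381741), 2. Democratic Republic of the Congo(2344858), 3. Sudan(1861484),
North America: 1. Bahamas(13880), 2. Antigua and Barbuda(443), 3. Aruba(180),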
