In several previous articles on Apache Kafka, Kafka Streams and Node.JS for interacting with Apache Kafka, I have described how to create a Node.JS application that publishes messages to a Kafka Topic (based on entries in a CSV file), how to create a simple Kafka Streams Java application that processes such messages from that Topic and how to extend that Java application to produce a running Top-N aggregation from that Topic. In this article, I want to discuss a Node application that consumes the Top-N reports from the Kafka Topic to which the Kafka Streams application publishes them, and that periodically (once every X seconds) reports on the current standings.
The sources for this article are in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.
The Node application uses the npm module kafka-node (https://www.npmjs.com/package/kafka-node) for the interaction with Kafka.
A new Client is created – based on the ZooKeeper connect string (ubuntu:2181/). Using the Client, a Consumer is constructed. The consumer is configured to consume from Topic Top3CountrySizePerContinent. A message handler is associated with the consumer, to handle messages on the topic.
The messages consumed by the Node consumer have the following structure:
{"topic":"Top3CountrySizePerContinent"
,"value":"{\"nrs\":[{\"code\":\"DZ\",\"name\":\"Algeria\",\"population\":40263711,\"size\":2381741,\"continent\":\"Africa\"},{\"code\":\"CD\",\"name\":\"Democratic Republic of the Congo\",\"population\":81331050,\"size\":2344858,\"continent\":\"Africa\"},{\"code\":\"SD\",\"name\":\"Sudan\",\"population\":36729501,\"size\":1861484,\"continent\":\"Africa\"},null]}"
,"offset":244
,"partition":0
,"key":{"type":"Buffer","data":[65,102,114,105,99,97]}
}
The key of the message is of type buffer. We happen to know the key is actually a String (the name of the continent). We can extract the key like this:
var continent = new Buffer(countryMessage.key).toString('ascii');
The payload of the message – the top3 for the continent – is in the value property. It can be extracted easily:
var top3 = JSON.parse(countryMessage.value);
{"nrs":
[
{"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"}
,{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"}
,{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"}
,null
]
}
The object countrySizeStandings contains a property for each continent. The property is set equal to the top3 that was most recently consumed from the Kafka Topic Top3CountrySizePerContinent.
countrySizeStandings[continent]=top3;
Using Node's built-in setInterval(), the report() function is scheduled for execution every reportingIntervalInSecs seconds. This function writes the current data in countrySizeStandings to the console.
/*
This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.
This program reports: top 3 largest countries per continent (periodically, with a configurable interval)
*/
var kafka = require('kafka-node')
var Consumer = kafka.Consumer
// Connection and reporting settings.
const client = new kafka.Client("ubuntu:2181/"); // ZooKeeper connect string
const countriesTopic = "Top3CountrySizePerContinent"; // topic to consume the running Top 3 from
const reportingIntervalInSecs = 4; // seconds between two console reports

// The consumer is created without topics; the topic is added further down,
// after the event handlers have been registered.
const consumer = new Consumer(
  client,
  [],
  { fromOffset: true }
);

// Hand every consumed message to the handler that records the standings.
consumer.on('message', function (message) {
  handleCountryMessage(message);
});

// Without an 'error' listener, an 'error' event emitted by the consumer is
// thrown by the EventEmitter and terminates the process. Log it instead so
// the periodic report keeps running.
consumer.on('error', function (err) {
  console.error("error in consumer for topic " + countriesTopic + ":", err);
});

// NOTE(review): kafka-node's addTopics accepts an optional third argument
// (fromOffset); confirm whether the offset given here is honoured without it.
consumer.addTopics([
  { topic: countriesTopic, partition: 0, offset: 0 }
], function (err) {
  // Only announce success when the topic was really added.
  if (err) {
    console.error("failed to add topic " + countriesTopic + " to consumer:", err);
    return;
  }
  console.log("topic "+countriesTopic+" added to consumer for listening");
});
// The global container for the most recent country size standings:
// one property per continent, holding the Top 3 most recently consumed for it.
const countrySizeStandings = {};

// Records the Top 3 carried by a consumed Kafka message as the current
// standing of the continent named by the message key.
//   countryMessage.key   - the continent name as a Buffer (a serialized
//                          {type:'Buffer', data:[...]} object or a plain
//                          string is accepted as well)
//   countryMessage.value - JSON text of the Top 3
// Throws a SyntaxError when the value is not valid JSON.
function handleCountryMessage(countryMessage) {
  const top3 = JSON.parse(countryMessage.value);
  // Buffer.from replaces the deprecated `new Buffer()` constructor.
  // The key is decoded as UTF-8 rather than ASCII so that names containing
  // non-ASCII characters are not corrupted; for pure ASCII keys the result is
  // identical. (Assumes the producer serialized the key as UTF-8 text - confirm.)
  const continent = Buffer.from(countryMessage.key).toString('utf8');
  // The latest Top 3 simply replaces whatever was recorded for this continent.
  countrySizeStandings[continent] = top3;
}// handleCountryMessage
// every reportingIntervalInSecs seconds, report on the current standings per continent
// Writes the current standings to the console: first a "Report at HH:MM:SS"
// line (local time), then one line per continent listing its ranked
// countries as "<rank>. <name>(<size>)", separated by ", ".
// Reads the global countrySizeStandings; takes no arguments, returns nothing.
function report() {
  // Formats a clock component as two digits, so 9:05:03 is not printed as 9:5:3.
  function twoDigits(value) {
    return ('0' + value).slice(-2);
  }

  var now = new Date();
  console.log("Report at " + twoDigits(now.getHours()) + ":" + twoDigits(now.getMinutes()) + ":" + twoDigits(now.getSeconds()));

  // Object.keys yields the object's own enumerable properties only, so no
  // hasOwnProperty call through the data object is needed.
  Object.keys(countrySizeStandings).forEach(function (continent) {
    var standing = countrySizeStandings[continent];
    // A standing without an nrs array (for instance one parsed from a null
    // message value) is reported as an empty line instead of throwing inside
    // the timer callback.
    var ranked = standing && Array.isArray(standing.nrs) ? standing.nrs.filter(Boolean) : [];
    // Empty (null) slots were dropped above, so ranks stay consecutive.
    var entries = ranked.map(function (country, position) {
      return (position + 1) + '. ' + country.name + '(' + country.size + ')';
    });
    // join() puts the separator between entries only - no dangling ", ".
    console.log(continent + ": " + entries.join(', '));
  });
}//report
// Start the periodic reporting. The interval is configured in seconds,
// whereas setInterval expects milliseconds.
var reportingIntervalInMillis = reportingIntervalInSecs * 1000;
setInterval(report, reportingIntervalInMillis);
Running the end to end chain requires a running Kafka Cluster and the running of the Node application to produce the country messages from the CSV file, the Kafka Streams Java application to derive the running Top 3 standings and finally the Node application introduced in this article to consume the Top 3 standings and report them to the console (as instructed in the ReadMe in the GitHub Repo):
- node KafkaCountryProducer.js
- java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
- node KafkaCountryStreamsConsumer.js
The KafkaCountryProducer.js Node application writes the messages it has produced to Kafka to the console as well:
The Kafka-Streams-Country-TopN Java application also writes its streaming analytic findings to the console:
The outcome of the Kafka Streams analysis – as published to the Kafka Topic – is consumed by the Node application, continuously, and reported to the console, periodically (once every 30 seconds in this run; the listing above sets reportingIntervalInSecs to 4), updated with the latest findings:

i am facing error at line var client = new kafka.Client(“localhost:2181/”) saying TypeError: kafka.Client is not a constructor
Is there any library in JavaScript for programming with Kafka Stream, instead of Java ?
Hi Dane,
No, at present there is not.
kind regards
Lucas
This appears to be a work in progress, but looks promising: https://www.npmjs.com/package/kafka-streams#aim-of-this-library