In several previous articles on Apache Kafka, Kafka Streams and Node.js, I have described how to create a Node.js application that publishes messages to a Kafka Topic (based on entries in a CSV file), how to create a simple Kafka Streams Java application that processes such messages from that Topic, and how to extend that Java application to produce a running Top-N aggregation from that Topic. In this article, I discuss a Node application that consumes the Top-N reports from the Kafka Topic that the Kafka Streams application produces to, and periodically (once every X seconds) reports on the current standings.
The sources for this article are in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.
The Node application uses the npm module kafka-node (https://www.npmjs.com/package/kafka-node) for the interaction with Kafka.
A new Client is created, based on the ZooKeeper connect string (ubuntu:2181/). Using this Client, a Consumer is constructed and configured to consume from the Topic Top3CountrySizePerContinent. A message handler is attached to the consumer to process the messages on that topic.
The messages consumed by the Node consumer have the following structure:
{"topic":"Top3CountrySizePerContinent" ,"value":"{\"nrs\":[{\"code\":\"DZ\",\"name\":\"Algeria\",\"population\":40263711,\"size\":2381741,\"continent\":\"Africa\"},{\"code\":\"CD\",\"name\":\"Democratic Republic of the Congo\",\"population\":81331050,\"size\":2344858,\"continent\":\"Africa\"},{\"code\":\"SD\",\"name\":\"Sudan\",\"population\":36729501,\"size\":1861484,\"continent\":\"Africa\"},null]}" ,"offset":244 ,"partition":0 ,"key":{"type":"Buffer","data":[65,102,114,105,99,97]} }
The key of the message is of type Buffer. We happen to know that the key is actually a string (the name of the continent), so we can extract it like this:
var continent = Buffer.from(countryMessage.key).toString('ascii');
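The one-liner above assumes the key arrives as a Buffer. When a message is passed through JSON.stringify, as in the logged message shown earlier, the key turns into a plain object of the form {"type":"Buffer","data":[...]}. A minimal defensive decoder might look like the sketch below; decodeKey is a hypothetical helper (not part of kafka-node), and it decodes as UTF-8 rather than ASCII, which yields the same result for ASCII continent names.

```javascript
// Minimal sketch: turn a Kafka message key into a string.
// decodeKey is a hypothetical helper, not part of kafka-node.
function decodeKey(key) {
  if (Buffer.isBuffer(key)) {
    // A real Buffer, as described for the consumed messages in this article
    return key.toString('utf8');
  }
  if (typeof key === 'string') {
    // Already a string; nothing to decode
    return key;
  }
  if (key && key.type === 'Buffer' && Array.isArray(key.data)) {
    // The shape a Buffer takes after JSON.stringify: {"type":"Buffer","data":[...]}
    return Buffer.from(key.data).toString('utf8');
  }
  throw new TypeError('Unsupported message key: ' + JSON.stringify(key));
}

// The key bytes from the sample message spell out the continent name
console.log(decodeKey({ type: 'Buffer', data: [65, 102, 114, 105, 99, 97] })); // prints Africa
console.log(decodeKey(Buffer.from('Asia'))); // prints Asia
```

In the message handler, var continent = decodeKey(countryMessage.key); would take the place of the one-liner.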
The payload of the message, the top 3 for the continent, is in the value property as a JSON string. It can be extracted with JSON.parse:
var top3 = JSON.parse(countryMessage.value);
For North America, for example, the parsed object looks like this:
{"nrs": [ {"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"} ,{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"} ,{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"} ,null ] }
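The nrs array can contain null entries that carry no country, so code consuming the standings has to skip them (the report() function in the program listing does this with if (c)). A small sketch that parses a message value and keeps only the actual countries; parseTop3 is a hypothetical helper name, and the sample value is the North America example above.

```javascript
// Minimal sketch: parse the JSON string in a message value and drop the null entries.
// parseTop3 is a hypothetical helper, not part of the original program.
function parseTop3(value) {
  const top3 = JSON.parse(value);
  // Keep only real country objects; null entries carry no country
  return (top3.nrs || []).filter((country) => country !== null);
}

const sampleValue =
  '{"nrs":[{"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"},' +
  '{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"},' +
  '{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"},null]}';

const countries = parseTop3(sampleValue);
console.log(countries.length); // prints 3
console.log(countries.map((c) => c.name).join(', ')); // prints Bahamas, Antigua and Barbuda, Aruba
```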
The object countrySizeStandings has a property for each continent, holding the top 3 that was most recently consumed for that continent from the Kafka Topic Top3CountrySizePerContinent:
countrySizeStandings[continent]=top3;
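Because every new message for a continent simply overwrites the previous entry, countrySizeStandings always holds the latest running top 3 per continent. The sketch below demonstrates this last-write-wins behavior with two hand-made messages for Africa; the message objects only mimic the key and value properties of a consumed message, and their shortened values are illustrative.

```javascript
// Minimal sketch of the last-write-wins update of countrySizeStandings.
// The messages are hand-made stand-ins with only the key and value properties.
const countrySizeStandings = {};

function handleCountryMessage(countryMessage) {
  const continent = Buffer.from(countryMessage.key).toString('utf8');
  // Overwrite whatever was recorded earlier for this continent
  countrySizeStandings[continent] = JSON.parse(countryMessage.value);
}

handleCountryMessage({
  key: Buffer.from('Africa'),
  value: '{"nrs":[{"name":"Sudan","size":1861484},null,null,null]}'
});
handleCountryMessage({
  key: Buffer.from('Africa'),
  value: '{"nrs":[{"name":"Algeria","size":2381741},{"name":"Sudan","size":1861484},null,null]}'
});

// Only the most recent standing for Africa is kept
console.log(Object.keys(countrySizeStandings)); // prints [ 'Africa' ]
console.log(countrySizeStandings.Africa.nrs[0].name); // prints Algeria
```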
Using Node's built-in setInterval(), the report() function is scheduled to run every reportingIntervalInSecs seconds. It writes the current contents of countrySizeStandings to the console. Here is the complete consumer program:
/*
  This program consumes Kafka messages from topic Top3CountrySizePerContinent,
  to which the Running Top3 (size of countries by continent) is produced.
  This program reports: top 3 largest countries per continent
  (periodically, with a configurable interval)
*/
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
// ZooKeeper-based Client, created from the ZooKeeper connect string
var client = new kafka.Client("ubuntu:2181/");

var countriesTopic = "Top3CountrySizePerContinent";
var reportingIntervalInSecs = 4;

var consumer = new Consumer(
  client,
  [],
  {fromOffset: true}
);

consumer.on('message', function (message) {
  handleCountryMessage(message);
});

// without an 'error' listener, an emitted error would crash the process
consumer.on('error', function (err) {
  console.error("Kafka consumer error: " + err);
});

consumer.addTopics([
  { topic: countriesTopic, partition: 0, offset: 0 }
], () => console.log("topic " + countriesTopic + " added to consumer for listening"));

var countrySizeStandings = {}; // the global container for the most recent country size standings

function handleCountryMessage(countryMessage) {
  var top3 = JSON.parse(countryMessage.value);
  // extract key value from the Kafka message
  var continent = Buffer.from(countryMessage.key).toString('ascii');
  // record the top3 for the continent indicated by the message key as current standing in the countrySizeStandings object
  countrySizeStandings[continent] = top3;
} // handleCountryMessage

// every reportingIntervalInSecs seconds, report on the current standings per continent
function report() {
  var d = new Date();
  console.log("Report at " + d.getHours() + ":" + d.getMinutes() + ":" + d.getSeconds());
  // loop over the keys (properties) in the countrySizeStandings map (object)
  for (var continent in countrySizeStandings) {
    if (countrySizeStandings.hasOwnProperty(continent)) {
      var line = continent + ": ";
      var index = 1;
      countrySizeStandings[continent].nrs.forEach(function (c) {
        if (c) {
          line = line + (index++) + '. ' + c.name + '(' + c.size + '), ';
        }
      });
      console.log(line);
    } // if
  } // for
} // report

// schedule execution of function report at the indicated interval
setInterval(report, reportingIntervalInSecs * 1000);
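The report() function in the listing builds each line by string concatenation, which leaves a trailing ", " and prints minutes and seconds without zero padding (for example 9:5:3). The sketch below separates formatting from output so the formatting can be checked without a running Kafka cluster. The names formatReportLine and standings are hypothetical, the sample data comes from the Africa message shown earlier, and the timer is stopped after two reports only so that the example terminates; the consumer itself keeps reporting indefinitely.

```javascript
// Minimal sketch: separate formatting of a report line from printing it.
// formatReportLine and the sample standings are illustrative, not from the original program.
function formatReportLine(continent, top3) {
  const entries = top3.nrs
    .filter((c) => c) // skip null entries
    .map((c, i) => (i + 1) + '. ' + c.name + ' (' + c.size + ')');
  return continent + ': ' + entries.join(', ');
}

function report(standings) {
  // toTimeString() starts with a zero-padded HH:MM:SS
  const time = new Date().toTimeString().slice(0, 8);
  console.log('Report at ' + time);
  for (const [continent, top3] of Object.entries(standings)) {
    console.log(formatReportLine(continent, top3));
  }
}

const standings = {
  Africa: { nrs: [
    { name: 'Algeria', size: 2381741 },
    { name: 'Democratic Republic of the Congo', size: 2344858 },
    { name: 'Sudan', size: 1861484 },
    null
  ] }
};

const reportingIntervalInSecs = 1;
const timer = setInterval(() => report(standings), reportingIntervalInSecs * 1000);
// Stop after two reports so that this example terminates
setTimeout(() => clearInterval(timer), 2500);
```

In the consumer program, setInterval(() => report(countrySizeStandings), reportingIntervalInSecs * 1000) would replace the original scheduling call.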
Running the end-to-end chain requires a running Kafka cluster and, as described in the ReadMe in the GitHub repo, three applications: the Node application that produces the country messages from the CSV file, the Kafka Streams Java application that derives the running Top 3 standings, and finally the Node application introduced in this article, which consumes the Top 3 standings and reports them to the console:
- node KafkaCountryProducer.js
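- java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App (Windows syntax; on Linux or macOS, separate the classpath entries with a colon and quote the classpath argument)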
- node KafkaCountryStreamsConsumer.js
The KafkaCountryProducer.js Node application also writes each message it produces to Kafka to the console:
The Kafka-Streams-Country-TopN Java application also writes its streaming analytic findings to the console:
The outcome of the Kafka Streams analysis, as published to the Kafka Topic, is consumed continuously by the Node application and reported to the console periodically (every reportingIntervalInSecs seconds), each time with the latest findings:
I am facing an error at the line var client = new kafka.Client("localhost:2181/"), saying: TypeError: kafka.Client is not a constructor
Is there any library for programming with Kafka Streams in JavaScript instead of Java?
Hi Dane,
No, at present there is not.
kind regards
Lucas
This appears to be a work in progress, but looks promising: https://www.npmjs.com/package/kafka-streams#aim-of-this-library