Node.js application writing to MongoDB – Kafka Streams findings read from Kafka Topic written to MongoDB from Node

MongoDB is a popular, lightweight, highly scalable, very fast and easy-to-use NoSQL document database. Written in C++, working with JSON documents (stored in the binary format BSON) and processing JavaScript commands with an embedded JavaScript engine, MongoDB ties easily into many different languages and platforms, one of which is Node.JS. In this article, I first describe how a very simple interaction between Node.JS and MongoDB can be implemented.

Then I do something a little more challenging: the Node.JS application consumes messages from an Apache Kafka topic and writes these messages to a MongoDB database collection, to make the results available for many clients to read and query. Finally, I show a small analytical query against the MongoDB collection that retrieves information we could not have obtained from the plain Kafka topic (although with Kafka Streams it might be possible as well).

You will see the MongoDB driver for Node.JS in action, as well as the kafka-node driver for Apache Kafka. All resources are in the GitHub repo: https://github.com/lucasjellema/kafka-streams-running-topN.

Prerequisites

Node.JS and MongoDB are installed.

Run the MongoDB server. On Windows, the command is mongod, optionally followed by the --dbpath parameter to specify the directory in which the data files are stored:

mongod --dbpath c:\node\nodetest1\data\

For the part where messages are consumed from a Kafka topic, a running Apache Kafka cluster must be available, as described in more detail in several previous articles such as https://technology.amis.nl/2017/02/13/kafka-streams-and-nodejs-consuming-and-periodically-reporting-in-node-js-on-the-results-from-a-kafka-streams-streaming-analytics-application/.

Getting Started

Start a new Node application using npm init.

Into this application, install the npm packages kafka-node and mongodb:

npm install mongodb --save

npm install kafka-node --save

This installs the two Node modules with their dependencies and adds them to package.json.

First Node Program – for Creating and Updating Two Static Documents

This simple Node.JS program uses the mongodb driver for Node and connects to a locally running MongoDB server and a database called test. It then tries to update two documents in the top3 collection in the test database; if a document does not yet exist (based on the key, which is the continent property), it is created. When the application has finished, two documents exist (and have their lastModified property set if they were updated).

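// Written for version 2.x of the mongodb driver: from 3.0 on, the connect callback
// receives a MongoClient instead of a Db, so db.collection() below would fail
var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');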

// connect string for mongodb server running locally, connecting to a database called test
var url = 'mongodb://127.0.0.1:27017/test';

MongoClient.connect(url, function(err, db) {
  assert.equal(null, err);
  console.log("Connected correctly to server.");
   var doc = {
        "continent" : "Europe",
         "nrs" : [ {"name":"Belgium"}, {"name":"Luxemburg"}]
      };
   var doc2 = {
        "continent" : "Asia",
         "nrs" : [ {"name":"China"}, {"name":"India"}]
      };
  insertDocument(db,doc, function() {
    console.log("returned from processing doc "+doc.continent);  
    insertDocument(db,doc2, function() {
      console.log("returned from processing doc "+doc2.continent);          
      db.close();
      console.log("Connection to database is closed. Two documents should exist, either just created or updated. ");
      console.log("From the MongoDB shell: db.top3.find() should list the documents. ");
    });
  });
});

var insertDocument = function(db, doc, callback) {
   // first try to update; if a document could be updated, we're done 
   console.log("Processing doc for "+doc.continent);
   updateTop3ForContinent( db, doc, function (results) {      
       if (!results || results.result.n == 0) {
          // the document was not updated so presumably it does not exist; let's insert it  
          db.collection('top3').insertOne( 
                doc
              , function(err, result) {
                   assert.equal(err, null);
                   callback();
                }
              );   
       }//if
       else {
         callback();
       }
 }); //updateTop3ForContinent
}; //insertDocument

var updateTop3ForContinent = function(db, top3 , callback) {
   db.collection('top3').updateOne(
      { "continent" : top3.continent },
      {
        $set: { "nrs": top3.nrs },
        $currentDate: { "lastModified": true }
      }, function(err, results) {
      //console.log(results);
      callback(results);
   });
};
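The update-then-insert sequence in insertDocument takes two round trips and is not atomic: if the same new continent is processed twice concurrently, both updates can match nothing and both inserts succeed, leaving two documents. The driver's updateOne also accepts an upsert option that inserts a document when the filter matches nothing, so a single call can do both jobs. A minimal sketch, using the hypothetical helper name buildTop3Upsert; it only builds the three arguments for updateOne, so it runs without a database:

```javascript
// Hypothetical helper: builds the filter, update and options for one updateOne call
// that replaces the update-then-insert sequence of insertDocument.
function buildTop3Upsert(top3) {
  const filter = { continent: top3.continent };
  const update = {
    $set: { nrs: top3.nrs },
    // With upsert, $currentDate also sets lastModified when a new document is inserted
    $currentDate: { lastModified: true }
  };
  const options = { upsert: true };
  return { filter, update, options };
}

// Possible usage with the callback-style Db handle of the program above (not run here):
// var args = buildTop3Upsert(doc);
// db.collection('top3').updateOne(args.filter, args.update, args.options, callback);

const args = buildTop3Upsert({ continent: 'Europe', nrs: [{ name: 'Belgium' }, { name: 'Luxemburg' }] });
console.log(JSON.stringify(args));
```

One behavioural difference: with upsert, a newly inserted document also gets lastModified, whereas insertDocument only sets it on update. Concurrent upserts can still create duplicates in rare cases; the MongoDB documentation recommends a unique index on the filter field (here continent) to prevent that.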

The console output from the Node application:

[Screenshot: console output of the Node application]

The output in the MongoDB shell:

[Screenshot: db.top3.find() output in the MongoDB shell]

Note: I ran db.top3.find() three times: before running the Node application, after running it once and after running it a second time. After the second run, the documents have a lastModified property, because this time they were updated instead of inserted.
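Why does lastModified only appear after the second run? On the first run, updateOne matches no document, so insertDocument falls back to insertOne with the plain document, which has no lastModified field; only on a later run does the $currentDate operator take effect. The sketch below mimics that logic on an in-memory array instead of MongoDB. The array stand-in, the simplified helpers updateTop3 and insertDocument, and the explicit timestamps are assumptions made so it runs without a database; they are not driver calls:

```javascript
// In-memory stand-in for the top3 collection (illustration only)
const top3Collection = [];

// Mimics updateTop3ForContinent: returns the number of matched documents
function updateTop3(doc, now) {
  const existing = top3Collection.find(d => d.continent === doc.continent);
  if (!existing) return 0;
  existing.nrs = doc.nrs;        // like $set: { nrs: ... }
  existing.lastModified = now;   // like $currentDate: { lastModified: true }
  return 1;
}

// Mimics insertDocument: update first, insert only when nothing matched
function insertDocument(doc, now) {
  if (updateTop3(doc, now) === 0) {
    top3Collection.push(Object.assign({}, doc)); // inserted as-is, so no lastModified
    return 'inserted';
  }
  return 'updated';
}

const europe = { continent: 'Europe', nrs: [{ name: 'Belgium' }, { name: 'Luxemburg' }] };
const firstRun = insertDocument(europe, new Date(0));
const secondRun = insertDocument(europe, new Date(1000));
console.log(firstRun, secondRun, top3Collection[0].lastModified);
```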

Second Node Program – Consume messages from Kafka Topic and Update MongoDB accordingly

This application registers as a Kafka consumer on the topic Top3CountrySizePerContinent. Each message produced to that topic is consumed by the Node application and handled by the function handleCountryMessage. This function parses the JSON message received from Kafka, adds a continent property derived from the key of the Kafka message, and calls the insertDocument function. insertDocument attempts to update the document in the MongoDB collection top3 whose continent property has the same value as that of the document passed in as a parameter. If a document is updated, the handling of the Kafka message is complete and the MongoDB collection contains the most recent standings produced by the Kafka Streams application. If no document is updated, presumably there is no document yet for the current continent; in that case, a new document is inserted for the continent.
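The transformation that handleCountryMessage applies to each Kafka message can be isolated in a pure function, which makes it testable without a Kafka cluster. A minimal sketch, assuming (as the program does) that the message value is a JSON string and the key holds the continent name; the function name toTop3Document and the sample message are hypothetical:

```javascript
// Hypothetical pure helper: turns a kafka-node style message ({ key, value, ... })
// into the document stored in the top3 collection.
function toTop3Document(message) {
  const top3 = JSON.parse(message.value);
  // The key may arrive as a Buffer or as a string; Buffer.from accepts both
  top3.continent = Buffer.from(message.key).toString('utf8');
  return top3;
}

// Hypothetical sample message; the key carries the continent, the value the top 3
const sample = {
  key: Buffer.from('Europe'),
  value: JSON.stringify({ nrs: [{ name: 'Belgium' }, { name: 'Luxemburg' }] })
};
console.log(toTop3Document(sample));
```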

/*
This program connects to MongoDB (using the mongodb module).
This program consumes Kafka messages from topic Top3CountrySizePerContinent, to which the running Top 3 (size of countries by continent) is produced.

This program records the latest update of the top 3 largest countries for each continent in MongoDB. If a document does not yet exist for a continent (based on the key, which is the continent property), it is inserted.

The program ensures that the top3 collection in the MongoDB test database contains the latest Top 3 for each continent at any point in time.

*/

var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');

var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
// ZooKeeper-based client: connects through ZooKeeper on host ubuntu, port 2181
var client = new kafka.Client("ubuntu:2181/");
var countriesTopic = "Top3CountrySizePerContinent";


// connect string for mongodb server running locally, connecting to a database called test
var url = 'mongodb://127.0.0.1:27017/test';
var mongodb;

MongoClient.connect(url, function(err, db) {
  assert.equal(null, err);
  console.log("Connected correctly to MongoDB server.");
  mongodb = db;
});

var insertDocument = function(db, doc, callback) {
   // first try to update; if a document could be updated, we're done 
   updateTop3ForContinent( db, doc, function (results) {      
       if (!results || results.result.n == 0) {
          // the document was not updated so presumably it does not exist; let's insert it  
          db.collection('top3').insertOne( 
                doc
              , function(err, result) {
                   assert.equal(err, null);
                   console.log("Inserted doc for "+doc.continent);
                   callback();
                }
              );   
       }//if
       else {
         console.log("Updated doc for "+doc.continent);
         callback();
       }
 }); //updateTop3ForContinent
}; //insertDocument

var updateTop3ForContinent = function(db, top3 , callback) {
   db.collection('top3').updateOne(
      { "continent" : top3.continent },
      {
        $set: { "nrs": top3.nrs },
        $currentDate: { "lastModified": true }
      }, function(err, results) {
      //console.log(results);
      callback(results);
   });
};

// Configure a Kafka Consumer for the Top3 topic; each Kafka message is handled by handleCountryMessage
var consumer = new Consumer(
  client,
  [],
  {fromOffset: true} // read from the offset given in addTopics below (0, the start of the topic)
);

consumer.on('message', function (message) {
  handleCountryMessage(message);
});

consumer.addTopics([
  { topic: countriesTopic, partition: 0, offset: 0}
], () => console.log("topic "+countriesTopic+" added to consumer for listening"));

function handleCountryMessage(countryMessage) {
    var top3 = JSON.parse(countryMessage.value);
    // the message key holds the continent name; Buffer.from replaces the deprecated new Buffer()
    var continent = Buffer.from(countryMessage.key).toString('ascii');
    top3.continent = continent;
    // insert or update the top3 in the MongoDB server
    insertDocument(mongodb,top3, function() {
      console.log("Top3 recorded in MongoDB for "+top3.continent);  
    });

}// handleCountryMessage
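In this program the Kafka consumer starts right away, while mongodb is only assigned once MongoClient.connect calls back, so a message that arrives before that moment would reach insertDocument with an undefined database handle. One way to guard against this, sketched here with a hypothetical createReadyGate helper, is to queue incoming messages until the connection is available and then process them in arrival order:

```javascript
// Hypothetical helper: queues items until a resource (such as the MongoDB Db handle)
// has been provided, then passes every queued item, and every later one, to handler.
function createReadyGate(handler) {
  let ready = false;
  let resource;
  const pending = [];
  return {
    // Call for each incoming item, e.g. from consumer.on('message', ...)
    push(item) {
      if (ready) {
        handler(resource, item);
      } else {
        pending.push(item);
      }
    },
    // Call once the resource is available, e.g. in the MongoClient.connect callback
    open(res) {
      resource = res;
      ready = true;
      // Drain the queue in arrival order
      while (pending.length > 0) {
        handler(resource, pending.shift());
      }
    }
  };
}

// Possible wiring into the program above (an assumption, not tested against Kafka):
// var gate = createReadyGate(function (db, message) { handleCountryMessage(message); });
// consumer.on('message', function (message) { gate.push(message); });
// MongoClient.connect(url, function (err, db) { assert.equal(null, err); mongodb = db; gate.open(db); });

const handled = [];
const gate = createReadyGate((db, msg) => handled.push(db + ':' + msg));
gate.push('m1');   // queued: no database handle yet
gate.push('m2');   // queued
gate.open('db');   // drains m1, then m2
gate.push('m3');   // handled immediately
console.log(handled);
```

This only fixes the order in which messages are dispatched; the asynchronous MongoDB calls themselves can still complete out of order.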

Running the application produces the following output.

Producing Countries:

[Screenshot: countries being produced to Kafka]

Producing Streaming Analysis – Running Top 3 per Continent:

[Screenshot: running Top 3 per continent produced by the Kafka Streams application]

Processing Kafka Messages:

[Screenshot: Node application processing the Kafka messages]

Resulting MongoDB collection:

[Screenshot: resulting top3 collection in MongoDB]

After a little while, this is the latest situation for Europe and Asia in the MongoDB collection:

[Screenshot: latest top3 documents for Europe and Asia]

This results from processing the latest Kafka Streams result messages:

[Screenshot: latest Kafka Streams result messages]

Querying the MongoDB Collection

The current set of top3 documents (one for each continent) stored in MongoDB can be queried using MongoDB's find and aggregation facilities.

One query we can perform is to retrieve the 5 largest countries in the world. Here is the query that gives us that insight. First, it creates a single record per country (using $unwind to turn each element of the nrs array in a top3 document into a separate document). These records are then sorted by country size (descending), and the first 5 of the sorted result are retained. Finally, these five are projected into simpler output documents that contain just the continent, country and area fields (plus _id, which $project keeps by default).

db.top3.aggregate([ 
   {$project: {nrs:1}}
  ,{$unwind:'$nrs'}
  , {$sort: {"nrs.size":-1}}
  , {$limit:5}
  , {$project: {continent:'$nrs.continent', country:'$nrs.name', area:'$nrs.size' }}
])
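To make the pipeline stages concrete, the same computation can be sketched on plain JavaScript arrays: flatMap plays the role of $unwind, followed by a descending sort on size, a limit and a projection. This only illustrates what the pipeline computes, not how MongoDB executes it; the function name topCountries and the sample documents (continents X and Y with arbitrary sizes) are hypothetical, shaped like the nrs elements the query refers to:

```javascript
// Mimics: $project {nrs:1} -> $unwind '$nrs' -> $sort {"nrs.size": -1} -> $limit n -> $project
function topCountries(top3Docs, n) {
  return top3Docs
    .flatMap(doc => doc.nrs)                // $unwind: one entry per country
    .sort((a, b) => b.size - a.size)        // $sort: largest area first
    .slice(0, n)                            // $limit
    .map(c => ({ continent: c.continent, country: c.name, area: c.size })); // $project (without _id)
}

// Hypothetical sample: two continents, each with its stored top 3
const docs = [
  { continent: 'X', nrs: [
    { continent: 'X', name: 'X1', size: 90 },
    { continent: 'X', name: 'X2', size: 50 },
    { continent: 'X', name: 'X3', size: 10 } ] },
  { continent: 'Y', nrs: [
    { continent: 'Y', name: 'Y1', size: 80 },
    { continent: 'Y', name: 'Y2', size: 70 },
    { continent: 'Y', name: 'Y3', size: 60 } ] }
];
console.log(topCountries(docs, 5));
```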

[Screenshot: result of the aggregation query]

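(The collection holds only the three largest countries of each continent, so a continent's fourth-largest country could only belong in the world top 5 if that continent's number 3 ranks within the top 4 of this list. No continent's number 3 does, so this top 5 is indeed the actual top 5 of the world.)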
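That argument can be turned into a small check. Because only the top k countries per continent are stored, a continent's (k+1)-th country can only belong in the world top n if all k of its stored countries rank within the first n - 1 rows of the result. A sketch of that test, with the hypothetical name isTopNComplete, result rows shaped like the output of the query above, and invented sample rows (ties in area are ignored):

```javascript
// Hypothetical check: rows are the top n results computed from per-continent top-k lists.
// Returns true when no continent's (k+1)-th country could have been displaced from them.
function isTopNComplete(rows, k) {
  const counts = {};
  // Only the first n - 1 rows matter: a continent whose k stored countries all rank there
  // might have a (k+1)-th country that belongs at position n.
  for (const row of rows.slice(0, rows.length - 1)) {
    counts[row.continent] = (counts[row.continent] || 0) + 1;
    if (counts[row.continent] >= k) return false;
  }
  return true;
}

const uncertain = [
  { continent: 'X', country: 'X1', area: 90 },
  { continent: 'Y', country: 'Y1', area: 80 },
  { continent: 'Y', country: 'Y2', area: 70 },
  { continent: 'Y', country: 'Y3', area: 60 },
  { continent: 'X', country: 'X2', area: 50 }
];
const certain = [
  { continent: 'X', country: 'X1', area: 90 },
  { continent: 'Y', country: 'Y1', area: 80 },
  { continent: 'X', country: 'X2', area: 70 },
  { continent: 'Y', country: 'Y2', area: 60 },
  { continent: 'Y', country: 'Y3', area: 50 }
];
console.log(isTopNComplete(uncertain, 3)); // false: Y's 4th country might outrank X2
console.log(isTopNComplete(certain, 3));   // true
```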

Resources

A very good read, although a little out of date, is this tutorial on the first and second steps with Node and MongoDB: http://cwbuecheler.com/web/tutorials/2013/node-express-mongo/

MongoDB Driver for Node.js in the official MongoDB documentation: https://docs.mongodb.com/getting-started/node/client/ 

Kafka Connect for MongoDB – YouTube intro – https://www.youtube.com/watch?v=AF9WyW4npwY 

Combining MongoDB and Apache Kafka – with a Java application talking and listening to both: https://www.mongodb.com/blog/post/mongodb-and-data-streaming-implementing-a-mongodb-kafka-consumer 

Tutorials Point MongoDB tutorials – https://www.tutorialspoint.com/mongodb/mongodb_sort_record.htm 

Data Aggregation with Node.JS driver for MongoDB – https://docs.mongodb.com/getting-started/node/aggregation/
