MongoDB is a popular, lightweight, highly scalable, very fast and easy-to-use NoSQL document database. Written in C++, working with JSON documents (stored in the binary format BSON) and processing JavaScript commands with an embedded JavaScript engine (V8 in older releases, SpiderMonkey since MongoDB 3.2), MongoDB ties easily into many different languages and platforms, one of which is Node.JS. In this article, I first describe how a very simple interaction between Node.JS and MongoDB can be implemented.
Then I do something a little more challenging: the Node.JS application consumes messages from an Apache Kafka topic and writes these messages to a MongoDB database collection, to make the results available for many clients to read and query. Finally I will show a little analytical query against the MongoDB collection, to retrieve some information we would not have been able to get from the plain Kafka Topic (although with Kafka Streams it just may be possible as well).
You will see the MongoDB driver for Node.JS in action, as well as the kafka-node driver for Apache Kafka from Node.JS. All resources are in the GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN.
Prerequisites
Node.JS is installed, as is MongoDB.
Run the MongoDB server. On Windows, the command is mongod, optionally followed by the dbpath parameter to specify the directory in which the data files are to be stored:
mongod --dbpath c:\node\nodetest1\data\
For the part where messages are consumed from a Kafka Topic, a running Apache Kafka Cluster must be available, as described in more detail in several previous articles such as https://technology.amis.nl/2017/02/13/kafka-streams-and-nodejs-consuming-and-periodically-reporting-in-node-js-on-the-results-from-a-kafka-streams-streaming-analytics-application/.
Getting Started
Start a new Node application, using npm init.
Into this application, install the npm packages kafka-node and mongodb:
npm install mongodb --save
npm install kafka-node --save
This installs the two Node modules with their dependencies and adds them to package.json.
First Node Program – for Creating and Updating Two Static Documents
This simple Node.JS program uses the mongodb driver for Node and connects to a MongoDB server running locally and to a database called test. It then tries to update two documents in the top3 collection in the test database; if a document does not yet exist (based on the key, which is the continent property), it is created. When the application is done running, two documents exist (and have their lastModified property set if they were updated).
var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');

// connect string for mongodb server running locally, connecting to a database called test
var url = 'mongodb://127.0.0.1:27017/test';

MongoClient.connect(url, function(err, db) {
  assert.equal(null, err);
  console.log("Connected correctly to server.");
  var doc = { "continent" : "Europe", "nrs" : [ {"name":"Belgium"}, {"name":"Luxemburg"}] };
  var doc2 = { "continent" : "Asia", "nrs" : [ {"name":"China"}, {"name":"India"}] };
  insertDocument(db, doc, function() {
    console.log("returned from processing doc "+doc.continent);
    insertDocument(db, doc2, function() {
      console.log("returned from processing doc "+doc2.continent);
      db.close();
      console.log("Connection to database is closed. Two documents should exist, either just created or updated. ");
      console.log("From the MongoDB shell: db.top3.find() should list the documents. ");
    });
  });
});

var insertDocument = function(db, doc, callback) {
  // first try to update; if a document could be updated, we're done
  console.log("Processing doc for "+doc.continent);
  updateTop3ForContinent( db, doc, function (results) {
    if (!results || results.result.n == 0) {
      // the document was not updated so presumably it does not exist; let's insert it
      db.collection('top3').insertOne(
        doc
        , function(err, result) {
            assert.equal(err, null);
            callback();
          }
      );
    }//if
    else {
      callback();
    }
  }); //updateTop3ForContinent
}; //insertDocument

var updateTop3ForContinent = function(db, top3 , callback) {
  db.collection('top3').updateOne(
    { "continent" : top3.continent },
    {
      $set: { "nrs": top3.nrs },
      $currentDate: { "lastModified": true }
    }, function(err, results) {
      //console.log(results);
      callback(results);
    });
};
The console output from the Node application:
The output on the MongoDB Shell:
Note: I have used db.top3.find() three times: before running the Node application, after it has run once and after it has run a second time. Note that after the second run, the lastModified property was added.
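As an aside, the update-then-insert sequence in the program above could also be collapsed into a single call by passing the upsert option to updateOne. The following is only a sketch of that alternative, not the approach used in this article; the helper names buildTop3Upsert and upsertTop3 are made up for illustration, and the collection argument is assumed to be what db.collection('top3') returns.

```javascript
// Sketch of an upsert-based alternative (hypothetical helper names).

// Builds the three arguments for collection.updateOne().
function buildTop3Upsert(doc) {
  return {
    // match on the continent, the key used throughout this article
    filter: { continent: doc.continent },
    update: {
      $set: { nrs: doc.nrs },
      $currentDate: { lastModified: true }
    },
    // upsert: true asks MongoDB to insert a document when nothing matches
    options: { upsert: true }
  };
}

// Runs the upsert against a collection object,
// assumed here to be the result of db.collection('top3').
function upsertTop3(collection, doc, callback) {
  var args = buildTop3Upsert(doc);
  collection.updateOne(args.filter, args.update, args.options, callback);
}
```

One difference to be aware of: with an upsert, the $currentDate operator is also applied when the document is first created, so lastModified would already be present after the first run, whereas the program above only sets it on an update.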
Second Node Program – Consume messages from Kafka Topic and Update MongoDB accordingly
This application registers as Kafka Consumer on the Topic Top3CountrySizePerContinent. Each message that is produced to that topic is consumed by the Node application and handled by function handleCountryMessage. This function parses the JSON message received from Kafka, adds a property continent derived from the key of the Kafka message, and calls the insertDocument function. This function attempts to update a record in the MongoDB collection top3 that has the same continent property value as the document passed in as parameter. If the update succeeds, the handling of the Kafka message is complete and the MongoDB collection contains the most recent standings produced by the Kafka Streams application. If the update fails, presumably that happens because there is no record yet for the current continent. In that case, a new document is inserted for the continent.
/*
This program connects to MongoDB (using the mongodb module).
This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced.
This program records each latest update of the top 3 largest countries for a continent in MongoDB. If a document does not yet exist for a continent (based on the key which is the continent property) it is inserted.
The program ensures that the MongoDB /test/top3 collection contains the latest Top 3 for each continent at any point in time.
*/
var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');
var kafka = require('kafka-node')
var Consumer = kafka.Consumer
var client = new kafka.Client("ubuntu:2181/")
var countriesTopic = "Top3CountrySizePerContinent";

// connect string for mongodb server running locally, connecting to a database called test
var url = 'mongodb://127.0.0.1:27017/test';
var mongodb;

MongoClient.connect(url, function(err, db) {
  assert.equal(null, err);
  console.log("Connected correctly to MongoDB server.");
  mongodb = db;
});

var insertDocument = function(db, doc, callback) {
  // first try to update; if a document could be updated, we're done
  updateTop3ForContinent( db, doc, function (results) {
    if (!results || results.result.n == 0) {
      // the document was not updated so presumably it does not exist; let's insert it
      db.collection('top3').insertOne(
        doc
        , function(err, result) {
            assert.equal(err, null);
            console.log("Inserted doc for "+doc.continent);
            callback();
          }
      );
    }//if
    else {
      console.log("Updated doc for "+doc.continent);
      callback();
    }
  }); //updateTop3ForContinent
}; //insertDocument

var updateTop3ForContinent = function(db, top3 , callback) {
  db.collection('top3').updateOne(
    { "continent" : top3.continent },
    {
      $set: { "nrs": top3.nrs },
      $currentDate: { "lastModified": true }
    }, function(err, results) {
      //console.log(results);
      callback(results);
    });
};

// Configure Kafka Consumer for Kafka Top3 Topic and handle each Kafka message (by calling handleCountryMessage)
var consumer = new Consumer(
  client,
  [],
  {fromOffset: true}
);

consumer.on('message', function (message) {
  handleCountryMessage(message);
});

consumer.addTopics([
  { topic: countriesTopic, partition: 0, offset: 0}
], () => console.log("topic "+countriesTopic+" added to consumer for listening"));

function handleCountryMessage(countryMessage) {
  var top3 = JSON.parse(countryMessage.value);
  var continent = new Buffer(countryMessage.key).toString('ascii');
  top3.continent = continent;
  // insert or update the top3 in the MongoDB server
  insertDocument(mongodb, top3, function() {
    console.log("Top3 recorded in MongoDB for "+top3.continent);
  });
}// handleCountryMessage
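The conversion step inside handleCountryMessage (parse the JSON value, derive the continent from the key) can be isolated in a small helper that is easy to try out without Kafka or MongoDB. The sketch below is one possible shape for it, not part of the original program: the name toTop3Document is made up, and the message is assumed to be an object with key and value properties, as used in the program above. Because JSON.parse throws on invalid input, the helper returns null for messages it cannot convert, so that a malformed message does not have to stop the consumer.

```javascript
// Hypothetical helper: turns a Kafka message into a top3 document,
// or returns null when the message cannot be converted.
function toTop3Document(message) {
  var top3;
  try {
    top3 = JSON.parse(message.value);
  } catch (e) {
    return null; // the value is not valid JSON
  }
  if (top3 === null || typeof top3 !== 'object' || message.key == null) {
    return null; // not a JSON object, or no key to derive the continent from
  }
  // the key may arrive as a string or as a Buffer; normalise it to a string
  top3.continent = Buffer.isBuffer(message.key)
    ? message.key.toString('ascii')
    : String(message.key);
  return top3;
}
```

In handleCountryMessage, such a helper could be called first, and insertDocument invoked only when the result is not null.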
Running the application produces the following output.
Producing Countries:
Producing Streaming Analysis – Running Top 3 per Continent:
Processing Kafka Messages:
Resulting MongoDB collection:
And after a little while, here is the latest situation for Europe and Asia in the MongoDB collection:
Resulting from processing the latest Kafka Stream result messages:
Querying the MongoDB Collection
The current set of top3 documents – one for each continent – stored in MongoDB can be queried, using MongoDB find and aggregation facilities.
One query we can perform is to retrieve the top 5 largest countries in the world. Here is the query that gives us that insight. First it creates a single record per country (using $unwind to flatten the nrs array in each top3 document). The countries are then sorted by size (descending) and the first 5 of the sort result are retained. These five are then projected into a nicer looking output document that contains only the continent, country and area fields.
db.top3.aggregate([ {$project: {nrs:1}} ,{$unwind:'$nrs'} , {$sort: {"nrs.size":-1}} , {$limit:5} , {$project: {continent:'$nrs.continent', country:'$nrs.name', area:'$nrs.size' }} ])
(And because no continent has its number 3 country in the top 4 of this list, we can be sure that this top 5 is the actual top 5 of the world)
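To make the individual pipeline stages more tangible, here is a rough sketch of the same steps on plain JavaScript arrays. It is only an illustration of the logic, not how MongoDB executes the pipeline, and it is simplified (for example, it ignores the _id field that $project would include by default). The function name topCountries is made up, and the continents, country names and sizes in sampleTop3 are invented placeholder values, not data from this article.

```javascript
// Placeholder documents in the shape stored in the top3 collection:
// one document per continent, each with an nrs array (values are invented).
var sampleTop3 = [
  { continent: 'North', nrs: [
    { name: 'N1', size: 900, continent: 'North' },
    { name: 'N2', size: 500, continent: 'North' },
    { name: 'N3', size: 100, continent: 'North' }
  ] },
  { continent: 'South', nrs: [
    { name: 'S1', size: 800, continent: 'South' },
    { name: 'S2', size: 300, continent: 'South' },
    { name: 'S3', size: 200, continent: 'South' }
  ] }
];

// Hypothetical helper mirroring the stages of the aggregation pipeline.
function topCountries(top3Docs, limit) {
  return top3Docs
    // $unwind: one record per element of each nrs array
    .reduce(function (all, doc) { return all.concat(doc.nrs || []); }, [])
    // $sort on size, descending
    .sort(function (a, b) { return b.size - a.size; })
    // $limit
    .slice(0, limit)
    // $project: rename the fields for the output document
    .map(function (c) {
      return { continent: c.continent, country: c.name, area: c.size };
    });
}

console.log(topCountries(sampleTop3, 5));
```

With these placeholder values, the five largest entries come out in the order N1, S1, N2, S2, S3.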
Resources
A very good read – although a little out of date – is this tutorial on 1st and 2nd steps with Node and Mongodb: http://cwbuecheler.com/web/tutorials/2013/node-express-mongo/
MongoDB Driver for Node.js in the official MongoDB documentation: https://docs.mongodb.com/getting-started/node/client/
Kafka Connect for MongoDB – YouTube intro – https://www.youtube.com/watch?v=AF9WyW4npwY
Combining MongoDB and Apache Kafka – with a Java application talking and listening to both: https://www.mongodb.com/blog/post/mongodb-and-data-streaming-implementing-a-mongodb-kafka-consumer
Tutorials Point MongoDB tutorials – https://www.tutorialspoint.com/mongodb/mongodb_sort_record.htm
Data Aggregation with Node.JS driver for MongoDB – https://docs.mongodb.com/getting-started/node/aggregation/
Hi Lucas,
Great post! I noticed the mongoDB client library has updated, so that your example no longer works for version 3.x. The MongoClient.connect() method now returns a client object, that has a separate db() function which is needed to extract the db:
MongoClient.connect('mongodb://localhost:27017', (err, client) => {
// Client returned
var db = client.db('mytestingdb');
});
Also the close() method has been moved to the client instance, so:
MongoClient.connect('mongodb://localhost', function (err, client) {
if (err) throw err;
var db = client.db('mytestingdb');
db.collection('customers').findOne({}, function (findErr, result) {
if (findErr) throw findErr;
console.log(result.name);
client.close();
});
});
Thanks Lauri for the comment and the valuable guidance!