This article describes a simple Node.js application that uses Server Sent Events (SSE) technology to push updates to a simple HTML client, served through the Express framework. The updates originate from messages consumed from a Kafka Topic. Although the approach outlined in this article stands on its own, and does not even depend on Apache Kafka, it also forms the next step in a series of articles that describe an Kafka Streams application that processes messages on a Kafka Topic – deriving running Top N ranks – and produces them to another Kafka Topic. The Node.js application in this current article consumes the Top N messages and pushes them to the HTML client.
The simple story told by this article is:
And the complete picture – including the prequel discussed in https://technology.amis.nl/2017/02/12/apache-kafka-streams-running-top-n-grouped-by-dimension-from-and-to-kafka-topic/ – looks like this:
Sources are found on GitHub:https://github.com/lucasjellema/kafka-streams-running-topN/tree/master/kafka-node-express-topN-sse .
Topics discussed in this article
Browser, HTML & JavaScript
- Programmatically add HTML elements
- Add row to an HTML table and cells to a table row
- Set Id attribute on HTML elements
- Loop over all elements in an array using for .. of
- Subscribe to a SSE server
- Process an incoming SSE message (onMessage, process JSON)
- Formatting (large) numeric values in JavaScript strings
- Concatenating Strings in JavaScript
Node & Express (server side JavaScript)
- Consume message from Kafka Topic
- Serve static HTML documents using Express
- Expose API through Express that allows SSE clients to register for server sent events
- Push JSON messages to all SSE clients
- Execute a function periodically, based on an interval using a Node Time (setInterval)
Browser – Client Side – HTML & JavaScript
The client side of the implementation is a simple HTML document (index.html) with embedded JavaScript. In a real application, the JavaScript should ideally be imported from a separate JavaScript library. In the <script> tag in the <head> of the document is the JavaScript statement that registers the browser as a SSE subscriber:
var source = new EventSource(“../topn/updates”);
The SSE server is located at a path /topn/updates relative to the path where the index.html document was loaded (http://host:port/index.html – downloaded from the public sub directory in the Node application where static resources are located and served from). Requests to this URL path are handled through the Express framework in the Node application.
On this EventSource object, a message handler is created – with the function to be invoked whenever an SSE event is received on this source:
source.onmessage = function(event) { … }
The content of the function is fairly straightforward: the JSON payload from the event is parsed. It contains the name of a continent and an array of the current top 3 countries by size in that continent. Based on this information, the function locates the continent row (if it does not yet exist, the row is created) in the table with top3 records. The top3 in the SSE event is written to the innnerHTML property of the second table cell in the continent’s table row.
<!DOCTYPE html> <html> <head> <title>Continent and Country Overview</title> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <script> // assume that API service is published on same server that is the server of this HTML file // send event to SSE stream /* "{\"nrs\":[{\"code\":\"FR\",\"name\":\"France\",\"population\":66836154,\"size\":643801,\"continent\":\"Europe\"},{\"code\":\"DE\",\"name\":\"Germany\",\"population\":80722792,\"size\":357022,\"continent\":\"Europe\"},{\"code\":\"FI\",\"name\":\"Finland\",\"population\":5498211,\"size\":338145,\"continent\":\"Europe\"},null]}" update all Sse Client with message {"nrs":[{"code":"FR","name":"France","population":66836154,"size":643801,"continent":"Europe"},{"code":"DE","name":"Germany","population":80722792,"size":357022,"continent":"Europe"},{"code":"FI","name":"Finland","population":5498211,"size":338145,"continent":"Europe"},null]} */ var source = new EventSource("../topn/updates"); source.onmessage = function(event) { var top3 = JSON.parse(event.data); if (top3.continent) { var nrs = top3.nrs; var trID = "continent_"+top3.continent; // find row in table with id equal to continent var tr = document.getElementById(trID); // if not found, then add a row if (!tr) { // table does not yet have a row for the continent, than add it // find table with continents var tbl = document.getElementById("continentsTbl"); // Create an empty <tr> element and add it to the 1st position of the table: tr = tbl.insertRow(1); tr.setAttribute("id", trID, 0); // Insert new cells (<td> elements) at the 1st and 2nd position of the "new" <tr> element: var cell1 = tr.insertCell(0); cell1.setAttribute("id",trID+"Continent",0); var cell2 = tr.insertCell(1); cell2.setAttribute("id",trID+"Top3",0); // Add some text to the new cells: cell1.innerHTML = top3.continent; }// tr not found var top3Cell = document.getElementById(trID+"Top3"); var list = "<ol>"; for (country of top3.nrs) { if (country) { list= list.concat( "<li>",country.name," (size ",country.size.toLocaleString(),")","</li>"); } }//for list= list+ "</ol>"; top3Cell.innerHTML = list; }// if continent };//onMessage </script> </head> <body> <div id="loading"> <h2>Please wait...</h2> </div> <div id="result"> <table id="continentsTbl"> <tr><td>Continent</td><td>Top 3 Countries by Size</td></tr> </table> </div> </body> </html>
Node Application – Server Side – JavaScript using Express framework
The server side in this article consists of a simple Node application that leverages the Express module as well as the kafka-node module. A simple, generic SSE library is used – in the file sse.js. It exports the Connection object – that represents the SSE channel to a single client – and the Topic object that manages a collection of Connections (for all SSE consumers around a specific subject). When the connection under a Connection ends (on close), the Connection is removed from the Collection.
"use strict"; console.log("loading sse.js"); // ... with this middleware: function sseMiddleware(req, res, next) { console.log(" sseMiddleware is activated with "+ req+" res: "+res); res.sseConnection = new Connection(res); console.log(" res has now connection res: "+res.sseConnection ); next(); } exports.sseMiddleware = sseMiddleware; /** * A Connection is a simple SSE manager for 1 client. */ var Connection = (function () { function Connection(res) { console.log(" sseMiddleware construct connection for response "); this.res = res; } Connection.prototype.setup = function () { console.log("set up SSE stream for response"); this.res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' }); }; Connection.prototype.send = function (data) { console.log("send event to SSE stream "+JSON.stringify(data)); this.res.write("data: " + JSON.stringify(data) + "\n\n"); }; return Connection; }()); exports.Connection = Connection; /** * A Topic handles a bundle of connections with cleanup after lost connection. */ var Topic = (function () { function Topic() { console.log(" constructor for Topic"); this.connections = []; } Topic.prototype.add = function (conn) { var connections = this.connections; connections.push(conn); console.log('New client connected, now: ', connections.length); conn.res.on('close', function () { var i = connections.indexOf(conn); if (i >= 0) { connections.splice(i, 1); } console.log('Client disconnected, now: ', connections.length); }); }; Topic.prototype.forEach = function (cb) { this.connections.forEach(cb); }; return Topic; }()); exports.Topic = Topic;
The main application – in file topNreport.js does a few things:
- it serves static HTML resources in the public subdirectory (which only contains the index.html document)
- it implements the /topn/updates API where clients can register for SSE updates (that are collected in the sseClients Topic)
- it consumes messages from the Kafka Topic Top3CountrySizePerContinent and pushes each received message as SSE event to all SSE clients
- it schedules a function for periodic execution (once every 10 seconds at the moment); whenever the function executes, it sends a heartbeat event to all SSE clients
/* This program serves a static HTML file (through the Express framework on top of Node). The browser that loads this HTML document registers itself as an SSE client with this program. This program consumes Kafka messages from topic Top3CountrySizePerContinent to which the Running Top3 (size of countries by continent) is produced. This program reports to all its SSE clients the latest update (or potentially a periodice top 3 largest countries per continent (with a configurable interval)) */ var express = require('express') , http = require('http') , sseMW = require('./sse'); var kafka = require('kafka-node') var Consumer = kafka.Consumer var client = new kafka.Client("ubuntu:2181/") var countriesTopic = "Top3CountrySizePerContinent"; var app = express(); var server = http.createServer(app); var PORT = process.env.PORT || 3000; var APP_VERSION = '0.9'; server.listen(PORT, function () { console.log('Server running, version '+APP_VERSION+', Express is listening... at '+PORT+" "); }); // Realtime updates var sseClients = new sseMW.Topic(); app.use(express.static(__dirname + '/public')) app.get('/about', function (req, res) { res.writeHead(200, {'Content-Type': 'text/html'}); res.write("Version "+APP_VERSION+". No Data Requested, so none is returned"); res.write("Supported URLs:"); res.write("/public , /public/index.html "); res.write("incoming headers" + JSON.stringify(req.headers)); res.end(); }); //configure sseMW.sseMiddleware as function to get a stab at incoming requests, in this case by adding a Connection property to the request app.use(sseMW.sseMiddleware) // initial registration of SSE Client Connection app.get('/topn/updates', function(req,res){ var sseConnection = res.sseConnection; sseConnection.setup(); sseClients.add(sseConnection); } ); var m; //send message to all registered SSE clients updateSseClients = function(message) { var msg = message; this.m=message; sseClients.forEach( function(sseConnection) { sseConnection.send(this.m); } , this // this second argument to forEach is the thisArg (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/forEach) ); //forEach }// updateSseClients // send a heartbeat signal to all SSE clients, once every interval seconds (or every 3 seconds if no interval is specified) initHeartbeat = function(interval) { setInterval(function() { var msg = {"label":"The latest", "time":new Date()}; updateSseClients( JSON.stringify(msg)); }//interval function , interval?interval*1000:3000 ); // setInterval }//initHeartbeat // initialize heartbeat at 10 second interval initHeartbeat(10); // Configure Kafka Consumer for Kafka Top3 Topic and handle Kafka message (by calling updateSseClients) var consumer = new Consumer( client, [], {fromOffset: true} ); consumer.on('message', function (message) { handleCountryMessage(message); }); consumer.addTopics([ { topic: countriesTopic, partition: 0, offset: 0} ], () => console.log("topic "+countriesTopic+" added to consumer for listening")); function handleCountryMessage(countryMessage) { var top3 = JSON.parse(countryMessage.value); var continent = new Buffer(countryMessage.key).toString('ascii'); top3.continent = continent; updateSseClients( top3); }// handleCountryMessage
Running the application
In order to run the application, the Node application that publishes the basic country records to a Kafka Topic is started:
The Kafka Streaming Java application that derives the Top 3 per continent as produces it to a Kafka Topic is started:
And the Node application that consumes from the Top3 Topic and pushes SSE events to the browser clients is run:
After a little wait, the browser displays:
based on output from the Kafka Streams application:
When all messages have been processed from the countries2.csv input file, this is what the browser shows:
This is the result of all the individual top3 messages pushed as SSE events from the Node application to the browser client. The screenshot shows only one browser client; however, many browsers could have connected to the same Node server and have received the same SSE events simultaneously.
Thanks! That was so intricately explained, so helpful!
Thanks Ash for your kind comment. Kind regards, Lucas