DIY Kafka Topic Watcher tool – Node, Express, Server Sent Events and Apache Kafka

Lucas Jellema

This article can be read in at least two different ways:

  1. as a somewhat lengthy introduction to a handy tool that you can easily run to inspect messages published to the topics in a Kafka Cluster
  2. as a detailed yet basic example of how to combine several techniques to create an end-to-end (push) pipeline from Kafka Topic (on a server or in a cloud) to the client browser, including Node & node-rdkafka, Express and Server Sent Events, DHTML & client-side JavaScript

This article introduces a simple tool, a Node application, that provides useful functionality and can very easily be extended with additional features. The code provided with this article contains a number of useful examples of Node/JavaScript language usage, of creating a Kafka Client using node-rdkafka, and of implementing a Server Sent Events channel on the server and client side using Express middleware.

This image visualizes the Kafka Topic Watcher: in a browser window – with automatic refresh when new messages are produced on a Kafka Topic – a live list is shown of messages from all Topics in an Apache Kafka Cluster. And all historic messages can be flushed from the topic into the browser. Quite convenient during development – and for workshops, demonstrations and fun.

The Node application can be run anywhere (on a laptop, in a cloud, on a server) and connects to the designated Kafka Cluster, which can be located anywhere, for example in a public cloud, provided the network access rules are configured correctly.

The sources for the tool are available on GitHub: https://github.com/AMIS-Services/online-meetups-introduction-of-kafka/tree/master/lab2b-topic-watcher (as part of the repository for a three-part workshop on Apache Kafka)

Overview of the Kafka Topic Watcher Application

The heart of the Kafka Topic Watcher is the Node application – of which app.js is the main module. This module:

  • starts an HTTP Server (port 3010) for serving the web application (a few static files: HTML, JavaScript and images)
  • handles REST requests from the web application
  • pushes SSE (Server Sent Events) to the web application (using the SSE support in sse.js)
  • imports and initializes the consume module that connects to the target Kafka Cluster (as specified in config.js)
  • sets a message handler with the consume module for handling the messages from all topics in the Kafka Cluster: function handleMessage. This function prepares the event to push to the browser and uses function updateSseClients and the sse.js module for sending an SSE event to all connected Kafka Topic Watcher clients.
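
The last step in this list can be sketched as follows. This is a minimal illustration and not the code from app.js: the shape of the event object and the names toBrowserEvent, makeMessageHandler and sendToAll are assumptions made for this example. The message fields (topic, partition, offset, key, value, timestamp) are the ones node-rdkafka delivers to a consumer, with value arriving as a Buffer.

```javascript
// Sketch: turn a consumed Kafka message into a plain object that can be
// pushed to the browser. The shape of the event object is an assumption.
function toBrowserEvent(message) {
  return {
    topic: message.topic,
    partition: message.partition,
    offset: message.offset,
    // key may be absent; value is a Buffer and is decoded as UTF-8 text
    key: message.key == null ? null : message.key.toString(),
    content: message.value == null ? null : message.value.toString(),
    timestamp: message.timestamp
  };
}

// handleMessage receives each consumed message and forwards the prepared
// event; sendToAll stands in for the function that pushes to all SSE clients.
function makeMessageHandler(sendToAll) {
  return function handleMessage(message) {
    sendToAll(toBrowserEvent(message));
  };
}

// Usage with a fake message and a sender that just collects the events
const sent = [];
const handleMessage = makeMessageHandler((event) => sent.push(event));
handleMessage({
  topic: 'test-topic', partition: 0, offset: 42,
  key: Buffer.from('k1'), value: Buffer.from('{"hello":"world"}'),
  timestamp: 1588000000000
});
console.log(sent[0]);
```

Keeping the conversion in a separate function makes it easy to try out without a Kafka Cluster or a browser.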

The web application consists of file index.html and two supporting (client side) JavaScript files: message-handler.js and topics-management.js. The former subscribes to the SSE endpoint and handles the SSE Events that are pushed from the Node application for each message consumed from one of the Kafka Topics. The messages are collected in a messages array as well as written to the top of the topicMessagesTable HTML Table element.

The current list of Topics on the Kafka Cluster is requested from the Node application in an XHR (aka AJAX) request, from topic-management.js. The list of topics returned in the asynchronous response is written to the HTML page.

The GUI contains a button Clear Messages. When pressed, function clearMessages is invoked (in message-handler.js). It clears the messages array and removes the rows from the topicMessagesTable.

The GUI also contains a link Reread all Topic(s) from Beginning. When it is clicked, function config is triggered, which in turn sends a POST request to the /config endpoint. That endpoint reinitializes the consumer with a flag specifying that all messages are to be read from all topics, not just the newly arriving messages: function initializeConsumer is invoked and will disconnect the current Stream Consumer and create a new one.

Running the Kafka Topic Watcher

The Kafka Topic Watcher is almost ready to go. You do not need to make changes to the code in order to get it to run, except to make sure that file config.js has the correct settings for your environment: the Kafka Broker configuration (for your local cluster or, for example, for a CloudKarafka cloud-based cluster) needs to be defined.

Before you can run the application, you need to bring in the dependencies. From the command line in the directory that contains file package.json run:

npm install

to download all required NPM modules into the directory node_modules.

Now you can run the web application:

node app.js

or

npm start

The HTTP server is started and listens on port 3010.

From a browser open the Kafka Topic Watcher web application: http://localhost:3010.

image

You should see a list of all non-internal topics in the designated Kafka Cluster. And you will see all new messages produced to these topics. You can click on the link Reread all Topic(s) from Beginning to see all messages from the topics’ history. In addition to the real messages from Kafka Topics, you will also see heartbeat messages that are sent every 25 seconds by the Node application to all SSE clients.

Implementation Details

Some aspects of the implementation of the tool are perhaps worth a second glance (or to nick some code from):

  • How to create a JavaScript object by merging two other objects
  • How to connect to a Kafka Cluster using node-rdkafka
  • How to retrieve a list of all topics in a Kafka Cluster
  • How to return an asynchronous response from a JavaScript function
  • How to set up a Server Sent Events channel from Node and how to receive the SSE messages in the browser
  • How to send XHR (fka AJAX) requests and handle responses using await fetch

How to create a JavaScript object by merging two other objects

Best look at an example:

image

Compose kafkaConf by merging externalConfig.kafkaConfig with the object created on the spot. All properties of both objects will be in the resulting object. If both objects contain the same property, this property will have the value it has in the second object in this operation.
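
A minimal sketch of this kind of merge, using spread syntax. The property values are made up for the illustration; only the names externalConfig.kafkaConfig and kafkaConf come from the text above.

```javascript
// Settings loaded from an external configuration (hypothetical values)
const externalConfig = {
  kafkaConfig: {
    'metadata.broker.list': 'localhost:9092',
    'group.id': 'from-config-file'
  }
};

// Spread syntax copies the properties of both objects into a new object.
// Properties of the object listed last win when a name occurs in both.
const kafkaConf = {
  ...externalConfig.kafkaConfig,
  ...{ 'group.id': 'kafka-topic-watcher', 'socket.keepalive.enable': true }
};

console.log(kafkaConf);

// Object.assign with an empty target object gives the same result
const sameConf = Object.assign({}, externalConfig.kafkaConfig,
  { 'group.id': 'kafka-topic-watcher', 'socket.keepalive.enable': true });
```

Both forms create a new object and leave externalConfig.kafkaConfig untouched. The copy is shallow: nested objects are shared between source and result, not cloned.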

How to connect to a Kafka Cluster using node-rdkafka

First declare the dependency on node-rdkafka in package.json. This provides the bridge from JavaScript through C/C++ to the Kafka communication protocol.

The connection is created as part of a Producer or Consumer, based on a Kafka configuration object. The essential property in that object is metadata.broker.list.

image

This property is set with the endpoints of the brokers in the Kafka Cluster – host name and port. Depending on the security configuration of the Kafka Cluster, the configuration object may need to have properties to handle security restrictions. An example of a Kafka configuration object I have been using:

image
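
As a rough sketch of what such a configuration and connection can look like: the function names, broker address and credentials parameter below are hypothetical, and the SASL settings are only an example of what a secured cluster might require (check the settings your provider hands out). The property names themselves are standard librdkafka configuration properties.

```javascript
// Build a configuration object for node-rdkafka.
// brokers: comma-separated "host:port" list; credentials are optional.
// Note: a consumer additionally needs a 'group.id' property.
function buildKafkaConfig(brokers, credentials) {
  const base = {
    'metadata.broker.list': brokers,
    'socket.keepalive.enable': true
  };
  if (!credentials) return base; // unsecured cluster, e.g. local development
  // Extra properties for a cluster that requires SASL over TLS (example values)
  return {
    ...base,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': credentials.username,
    'sasl.password': credentials.password
  };
}

// The connection is made through a Producer (or KafkaConsumer) created from
// the configuration. node-rdkafka is required inside the function so that the
// configuration part of this sketch runs without the module installed.
function connectProducer(kafkaConf) {
  const Kafka = require('node-rdkafka');
  const producer = new Kafka.Producer(kafkaConf);
  producer.on('ready', () => console.log('connected to', kafkaConf['metadata.broker.list']));
  producer.on('event.error', (err) => console.error('Kafka error', err));
  producer.connect();
  return producer;
}

const localConf = buildKafkaConfig('localhost:9092');
console.log(localConf);
```

Calling connectProducer(localConf) requires node-rdkafka to be installed and a broker to be reachable at the given address.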

How to retrieve a list of all topics in a Kafka Cluster

The node-rdkafka implementation provides an option on a Consumer as well as a Producer to retrieve metadata for the Kafka Cluster (see the node-rdkafka documentation). In this case, I have used the Producer object, even though the tool is not actually producing anything to the Cluster.

The producer object is created based on the kafkaConfig object. Once connected, the ready event handler is passed a metadata object that contains among other things an array for all topics in the cluster. Topic names starting with “__” indicate internal, administrative topics; these are filtered out in the reduce step.

image

How to return an asynchronous response from a JavaScript function

The function getTopics is defined as an async function. Invokers of this function are likely to use an await in their call:

image

Function getTopics only gets access to the result it will return in the (asynchronous) event handler for the ready event. In order to produce the function result after the function has returned, we make use of a Promise. Function getTopics returns a Promise, which is a ‘deferred result’. Only when the Promise handed to the invoker (getStarted()) resolves is the await fulfilled. Inside the Promise, we use an explicit call to resolve (the function passed into the Promise executor) to make the Promise produce its result. It feels like function getTopics() hands a little box to function getStarted() with the instruction to getStarted() to continue to wait until the result pops out of the box.
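
The mechanism can be tried out without a Kafka Cluster. In the sketch below, FakeClient is a stand-in invented for this example; it mimics what I understand to be the behaviour of a node-rdkafka Producer, which emits ready with an info object and a metadata object after connect(). Rejecting the Promise on event.error is an assumption about how failures would be surfaced, not something taken from the tool.

```javascript
const { EventEmitter } = require('events');

// Returns a Promise that resolves with the topic metadata once the client
// emits 'ready'. The result only becomes available inside the event handler,
// so resolve() is called from there.
function getTopics(client) {
  return new Promise((resolve, reject) => {
    client.once('ready', (info, metadata) => resolve(metadata.topics));
    client.once('event.error', (err) => reject(err));
    client.connect();
  });
}

// Stand-in for a Kafka client so the sketch runs without a cluster
class FakeClient extends EventEmitter {
  connect() {
    const metadata = { topics: [{ name: '__consumer_offsets' }, { name: 'test-topic' }] };
    // emit 'ready' asynchronously, after connect() has returned
    setImmediate(() => this.emit('ready', { name: 'fake' }, metadata));
  }
}

async function getStarted() {
  // execution of getStarted pauses here until the Promise resolves
  const topics = await getTopics(new FakeClient());
  console.log('topics:', topics.map((t) => t.name));
  return topics;
}

const started = getStarted();
```

Because getTopics returns a Promise explicitly, marking it async is optional; the invoker can await it either way.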

For more on combining callback functions, event handlers, promises and async/await, see my earlier article: https://technology.amis.nl/2020/01/11/javascript-mapping-and-wrapping-classic-callback-functions-to-promises-and-async-await/.

How to Filter an Array to Elements satisfying a Condition in an Efficient and Elegant Manner: Reduce

The array of topics returned by metadata.topics contains internal topics (those whose name starts with __). In order to produce an array of topics without these elements, an elegant approach is to apply the reduce operator to the source array. Reduce works with a start value (the empty array in this case: []) and a function that receives two inputs: the intermediate result (which initially is the start value, here the empty array) and the next array element. The function is expected to produce the next intermediate result. In this case, the intermediate result is an array with all elements from the original array that satisfy the filter condition.

image

This snippet can more or less be read as:

const clusterTopics = (select * from metadata.topics as topic where substring(topic.name,1,2) != ‘__’)
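
A runnable sketch of this selection, with made-up topic names. It shows the reduce form described above and, for comparison, the equivalent call to filter, which is the built-in array method for exactly this kind of selection.

```javascript
// Sample metadata in the shape described above (topic names are made up)
const metadata = {
  topics: [
    { name: '__consumer_offsets' },
    { name: 'orders' },
    { name: '__transaction_state' },
    { name: 'payments' }
  ]
};

const isInternal = (topic) => topic.name.startsWith('__');

// reduce: start with [] and append every topic that passes the condition
const clusterTopics = metadata.topics.reduce((topics, topic) => {
  if (!isInternal(topic)) topics.push(topic);
  return topics;
}, []);

// filter expresses the same selection directly
const sameTopics = metadata.topics.filter((topic) => !isInternal(topic));

console.log(clusterTopics.map((t) => t.name)); // [ 'orders', 'payments' ]
```

Reduce becomes the better fit once the result is something other than a subset of the array, for example a count per topic or a lookup object.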

How to set up a Server Sent Events channel from Node and how to receive the SSE messages in the browser

The browser client subscribes to an SSE source using EventSource(url). That is really all it takes to establish a channel that allows the server to send its updates to be handled asynchronously by the client. A message handler is attached to the event source. This function is invoked every time the server pushes an event to the SSE subscribers. The browser just receives a package of JSON and needs to do something useful with it.

image
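
A minimal sketch of the client side, under a few assumptions: the event fields topic and content are made up for this example, and topicMessagesTable is assumed to be the id of the table element. The subscription itself only runs in a browser; the handler function can be tried out anywhere.

```javascript
// Messages received so far, newest first
const messages = [];

// Handles one SSE event: the payload arrives as JSON text in event.data
function handleSseEvent(event) {
  const message = JSON.parse(event.data);
  messages.unshift(message);
  return message;
}

// Only subscribe when running in a browser, where EventSource and document exist
if (typeof EventSource !== 'undefined' && typeof document !== 'undefined') {
  const source = new EventSource('/updates');
  source.onmessage = (event) => {
    const message = handleSseEvent(event);
    // write the new message to the top of the table
    const table = document.getElementById('topicMessagesTable');
    const row = table.insertRow(0);
    row.insertCell(0).textContent = message.topic;
    row.insertCell(1).textContent = message.content;
  };
  source.onerror = () => console.warn('SSE connection lost; the browser retries automatically');
}
```

Using textContent instead of innerHTML ensures that message content is shown as text and never interpreted as markup.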

The server side of SSE when using Node and Express is not too complex either.

Express is configured with multiple “interceptors” that can do work on an HTTP request before it is really handed off to a path & method specific handler, very much like Servlet Filters and Servlets for those of you with a Java (Servlet) background. The sseMiddleware “interceptor” is a simple one that adds an SSE Connection property to each response, whether it will be used or not. Note: only in the /updates handler is this connection truly used.

The request sent from the client, through EventSource(URL), to the /updates path is the request that results in the subscription to the SSE channel. The connection is set up (and the response headers are sent to the browser client) and added to the collection of sseClients, so that messages can be sent to it whenever the server has something to push.

image

The Topic and Connection objects are defined in the next snippet. Connection is the actual SSE channel – the true handshake with the browser. Topic is a simple administrative object to keep track of all connections:

image

Finally, to send a message from the server to all connected SSE listeners, this next function can be called. It takes a message and sends it to all SSE clients.

image
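
The server-side pieces described in this section (the middleware, Connection, Topic and the broadcast function) can be sketched together as follows. This is an illustration of the technique rather than the exact contents of sse.js and app.js: the names updatesHandler and registerSse are introduced for this example, and app is assumed to be an Express application created elsewhere.

```javascript
// One SSE channel to one browser; res is the HTTP response object
class Connection {
  constructor(res) { this.res = res; }
  setup() {
    // these headers tell the browser to keep the response open as an event stream
    this.res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    });
  }
  send(data) {
    // an SSE message is a "data:" line terminated by an empty line
    this.res.write('data: ' + JSON.stringify(data) + '\n\n');
  }
}

// Administrative object that tracks all open connections
class Topic {
  constructor() { this.connections = []; }
  add(connection) {
    this.connections.push(connection);
    // forget the connection when the browser disconnects
    connection.res.on('close', () => {
      this.connections = this.connections.filter((c) => c !== connection);
    });
  }
  forEach(callback) { this.connections.forEach(callback); }
}

// Middleware: attach a Connection to every response, used or not
function sseMiddleware(req, res, next) {
  res.sseConnection = new Connection(res);
  next();
}

const sseClients = new Topic();

// Handler for GET /updates: open the channel and remember the client
function updatesHandler(req, res) {
  res.sseConnection.setup();
  sseClients.add(res.sseConnection);
}

// Push one message to every connected browser
function updateSseClients(message) {
  sseClients.forEach((connection) => connection.send(message));
}

// Wiring for an Express app: register the middleware and the /updates route
function registerSse(app) {
  app.use(sseMiddleware);
  app.get('/updates', updatesHandler);
}
```

The handler for /updates never ends the response; that is what keeps the channel open for later calls to updateSseClients.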

How to send XHR (fka AJAX) requests from the browser and handle responses using await fetch

I sent my first asynchronous browser request in the previous century, before the term AJAX was coined and before the XML HTTP Request object was introduced. I almost believed I had invented the concept. One (invisible) frame in the webpage submitted a request and was reloaded with the response. The top frame could access the response and use it to update itself or other frames. It feels as if frame security was a lot more relaxed in those days. Anyway, interacting with the server from the browser in the background has evolved quite a bit. Asynchronous syntax in recent versions of ECMAScript (ES) has made AJAX-style programming even less messy. Reactive across the great divide between browser and server.

Here function config() leverages postData() to send a POST request with a JSON body to a URL. The async postData function returns a promise that resolves when the response to the POST request has been received. The response is parsed from JSON to a JavaScript object structure that appears as the input to the then function:

image
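
A sketch of this pattern, with some assumptions spelled out: the body property readFromBeginning is a made-up name for the flag, and the fetchImpl parameter (defaulting to the built-in fetch) exists only so the functions can be tried out without a server, here with a stand-in called fakeFetch.

```javascript
// Sends data as a JSON body in a POST request and resolves with the parsed
// JSON response.
async function postData(url, data, fetchImpl = fetch) {
  const response = await fetchImpl(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(data)
  });
  if (!response.ok) throw new Error('POST ' + url + ' failed with status ' + response.status);
  return response.json(); // a Promise for the parsed body
}

// Asks the server to reread all topics from the beginning
function config(fetchImpl) {
  return postData('/config', { readFromBeginning: true }, fetchImpl)
    .then((result) => {
      console.log('server answered', result);
      return result;
    });
}

// Try it out with a stand-in for fetch that echoes the request body back
const fakeFetch = async (url, options) => ({
  ok: true,
  status: 200,
  json: async () => ({ url, received: JSON.parse(options.body) })
});
const configResult = config(fakeFetch);
```

In the browser, config() can be called without an argument, in which case the built-in fetch is used.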

Even less code is required when we code the server request like the next fragment. Note that we need to await twice. The fetch operation is asynchronous: the promise it returns resolves with a Response object as soon as the response headers have arrived, not with the response content. Reading the content is asynchronous as well: response.json() returns a second promise that resolves once the body has been received and parsed from JSON text into JavaScript objects. So there are two layers: first await the response, then await its content. In this case, a list is then written based on the array elements in the data.

image
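
The two awaits can be sketched like this. The /topics path, the element id topics and the assumption that the server answers with a JSON array of topic names are all hypothetical; fetchImpl is again only a parameter to allow a trial run without a server.

```javascript
// Builds the HTML for a list of topic names. Kafka topic names only contain
// letters, digits, '.', '_' and '-', so no escaping is done here.
function renderTopicList(topics) {
  return '<ul>' + topics.map((name) => '<li>' + name + '</li>').join('') + '</ul>';
}

async function loadTopics(fetchImpl = fetch) {
  const response = await fetchImpl('/topics'); // 1st await: the response headers are in
  const topics = await response.json();        // 2nd await: body received and parsed
  return renderTopicList(topics);
}

// In the browser: write the list into the page
if (typeof document !== 'undefined') {
  loadTopics().then((html) => {
    document.getElementById('topics').innerHTML = html;
  });
}
```

Separating the rendering from the request keeps each function small and lets the rendering be checked with a plain array.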

(see for example https://dev.to/shoupn/javascript-fetch-api-and-using-asyncawait-47mp for some background)

TODO/Next Steps

Even though the Kafka Topic Watcher is doing what it is supposed to, it can easily be improved. Some suggestions – that the reader may take as a challenge to accept:

  • allow filtering on specific Topics
  • allow sorting on various properties
  • improve the look & feel of the GUI with styles, fonts and widgets
  • allow publishing messages in addition to consuming them
  • allow the target Kafka Cluster to be set from the GUI (or even to consume from multiple clusters at the same time)
