This article can be read in at least two different ways:
- as a somewhat lengthy introduction of a handy tool that you can easily run to inspect messages published to the topics in a Kafka Cluster
This image visualizes the Kafka Topic Watcher: in a browser window – with automatic refresh when new messages are produced on a Kafka Topic – a live list is shown of messages from all Topics in an Apache Kafka Cluster. And all historic messages can be flushed from the topic into the browser. Quite convenient during development – and for workshops, demonstrations and fun.
The Node application can be run anywhere – on a laptop, in a cloud, on server – and connects to the designated Kafka Cluster – which (provided the network access rules are configured correctly) can be located anywhere, for example in a public cloud.
The sources for the tool are available in GitHub: https://github.com/AMIS-Services/online-meetups-introduction-of-kafka/tree/master/lab2b-topic-watcher (as part of the Repo for a three part Workshop on Apache Kafka)
Overview of the Kafka Topic Watcher Application
The heart of the Kafka Topic Watcher is the Node application – of which app.js is the main module. This module:
- handles REST requests from the web application
- pushes SSE (Server Sent Events) to the web application (using the SSE support in sse.js)
- imports and initializes the consume module that connects to the target Kafka Cluster (as specified in config.js)
- set a message handler with the consume module for handling the messages from all topics in the Kafka Cluster: function handleMessage. This function prepares the event to push to the browser and uses function updateSseClients and the sse.js module for sending a SSE event to all connected Kafka Topic Watcher clients.
The current list of Topics on the Kafka Cluster are requested from the Node application in an XHR (aka AJAX request), from topic-management.js. The list of topics return in the asynchronous response is written to the HTML page based on the response content for this request.
The GUI contains a button Clear Messages. When pressed, function clearMessages is invoked (in message-handler.js). It clears the messages array and removes the rows from the topicMessagesTable.
The GUI also contains a link Reread all Topic(s) from Beginning. When clicked on, function config is triggered that in turns sends a POST request to the /config endpoint that subsequently reinitializes the consumer with a flag specifiying that all messages are to be read from all topics, not just the newly arriving messages; function initializeConsumer is invoked and will disconnect the current Stream Consumer and create a new one.
Running the Kafka Topic Watcher
The Kafka Topic Watcher is almost ready to go. You do not need to make changes to the code in order to get it to fun – except to make sure that file config.js has the correct settings for your environment: the Kafka Broker configuration (for your local cluster or for example for a CloudKarafka cloud based cluster) needs to be defined.
Before you can run the application, you need to bring in the dependencies. From the command line in the directory that contains file package.json run:
to download all required NPM modules into the directory node-modules.
Now you can run the web application:
The HTTP server is started and listens on port 3010.
From a browser open the Kafka Topic Watcher web application: http://localhost:3010.
You should see a list of all non-internal topics in the designated Kafka Cluster. And you will see all new messages produced to these topics. You can click on the link Reread all Topic(s) from Beginning to see all messages from the topics’ history. In addition to the real messages from Kafka Topics, you will also see heartbeat messages that are sent every 25 seconds by the Node application to all SSE clients.
Some aspects of the implementation of the tool are perhaps worth a second glance (or to nick some code from):
- How to connect to a Kafka Cluster using node-rdkafka
- How to retrieve a list of all topics in a Kafka Cluster
- How to set up a Server Sent Events channel from Node and how to receive the SSE messages in the browser
- How to send XHR (fka AJAX) requests and handle responses using await fetch
Best look at an example:
Compose kafkaConf by merging externalConfig.kafkaConfig with the object created on the spot. All properties in both objects will be in the resulting object. If both object contain the same property, this property will have the value it has in the second object in this operation.
How to connect to a Kafka Cluster using node-rdkafka
The connection is created as part of a Producer or Consumer – based on a Kafka configuration object – and really on the property metadata.broker.list.
This property is set with the endpoints of the brokers in the Kafka Cluster – host name and port. Depending on the security configuration of the Kafka Cluster, the configuration object may need to have properties to handle security restrictions. An example of a Kafka configuration object I have been using:
How to retrieve a list of all topics in a Kafka Cluster
The node-rdkafka implementation provides an option on a Consumer as well as a Producer to retrieve meta-data for the Kafka Cluster, check this link. In this case, I have used the Producer object, even though the tool is not actually producing anything to the Cluster.
The producer object is created based on the kafkaConfig object. Once connected, the ready event handler is passed a metadata object that contains among other things an array for all topics in the cluster. Topic names starting with “__” indicate internal, administrative topics; these are filtered out in the reduce step.
The function getTopics is defined as an async function. Invokers of this function are likely to use an await in their call:
Function getTopics only gets access to the result it will return in the (asynchronous) event handler for the ready event. In order to produce the function result after the function has returned, we make use of a Promise. Function getTopics returns a Promise – which is a ‘deferred result’. Only when the Promise handled to the invoker (getStarted()) resolves is the await fulfilled. Inside the Promise, we use an explicit call to (built in function) resolve to make the Promise produce its result. It feels like function getTopics() hands a little box to function getStarted() with the instruction to getStarted() to continue to wait until the result pops out of the box.
How to Filter an Array to Elements satisfying a Condition in an Efficient and Elegant Manner: Reduce
The array of topic returned by metadata.topics contains internal topics (those whose name start with __). In order to produce an array of topics without these elements, the most elegant approach is to set the reduce operator on the source array. Reduce works with a start value (the empty array in this case: ) and a function that receives two inputs: the intermediate result (which initially is the start value, here the empty array) and the next array element. The function is expected to produce the next intermediate result. In this case, the intermediate result is an array with all elements from the original array that satisfy the filter condition.
This snippet can more or less be read as:
const clusterTopics = (select * from metadata.topics as topic where substring(topic.name,1,2) != ‘__’)
How to set up a Server Sent Events channel from Node and how to receive the SSE messages in the browser
The browser client subscribes to a SSE source – using EventSource(url). That is really all it takes to establish a channel that allows the server to send its updates to be handled asynchronously by the client. A message handler is attached to the event source. This function is invoked every time the server pushes an event to the SSE subscribers. The browse just receives a package of JSON and needs to do something useful with it.
The server side of SSE when using Node and Express is not too complex either.
Express is configured with multiple “interceptors” that can do work on an HTTP request before it is really handed of to a path & method specific handler – very much like Servlet Filters and Servlets for those of you with a Java (Servlet) background. The sseMiddleware “interceptor” is a simple one that adds an SSE Connection property to each response – whether it will be used or not. Note: only in the /updates handler is this connection truly used.
The request sent from the client – through EventSource(URL) – to the /updates path is the request that results in the subscription to the SSE channel. The connection is setup (and sent to the browser client) and added to the collection of sseClients in order to be able to send messages to it when the server side need to express itself arises.
The Topic and Connection objects are defined in the next snippet. Connection is the actual SSE channel – the true handshake with the browser. Topic is a simple administrative object to keep track of all connections:
Finally, to send a message from the server to all connected SSE listeners, this next function can be called. It takes a message and sends it to all SSE clients.
How to send XHR (fka AJAX) requests from the browser and handle responses using await fetch
I sent my first asynchronous browser request in the previous century, before the term AJAX was coined and before the XML HTTP Request object was introduced. I almost believed I had invented the concept. One (invisible) frame in the webpage submitted a request and was reloaded with the response. The top frame could access the response and use it to update itself or other frames. It feels as if frame-security was a lot more relaxed in those days. Anyway, to interact from the browser with the server in the background has evolved quite a bit. Asynchronous syntax in recent version of ES (ECMA Script) has made AJAX style programming even less messy. Reactive across the great divide between browser and server.
Even though the Kafka Topic Watcher is doing what it is supposed to, it can easily be improved. Some suggestions – that the reader may take as a challenge to accept:
- allow filtering on specific Topics
- allow sorting on various properties
- improve the look & feel of the GUI with styles, fonts and widgets
- allow publishing messages in addition to consuming them
- allow the target Kafka Cluster to be set from the GUI (or even to consume from multiple clusters at the same time)