Apache Kafka Streams – Running Top-N Aggregation grouped by Dimension – from and to Kafka Topic


This article explains how to implement a streaming analytics application using Kafka Streams that performs a running Top N analysis on a Kafka Topic and produces the results to another Kafka Topic. Visualized, this looks like this:


Two previous articles are relevant as reference:

This GitHub Repo contains the sources for this article: https://github.com/lucasjellema/kafka-streams-running-topN.

Note that almost all aggregations are specializations of this top-N: min, max, sum, avg and count are all simple top-1 aggregations that can be implemented using a simplified version of this code.

To get started, go through the steps described in my previous article. This will result in an App.java class, that we can flesh out.

<br>package nl.amis.streams.countries;</p> <p>import nl.amis.streams.JsonPOJOSerializer;<br>import nl.amis.streams.JsonPOJODeserializer;</p> <p>// generic Java imports<br>import java.util.Properties;<br>import java.util.HashMap;<br>import java.util.Map;<br>import java.util.Arrays;<br>// Kafka imports<br>import org.apache.kafka.common.serialization.Serde;<br>import org.apache.kafka.common.serialization.Serdes;<br>import org.apache.kafka.common.serialization.Serializer;<br>import org.apache.kafka.common.serialization.Deserializer;<br>// Kafka Streams related imports<br>import org.apache.kafka.streams.StreamsConfig;<br>import org.apache.kafka.streams.KafkaStreams;<br>import org.apache.kafka.streams.kstream.KStream;<br>import org.apache.kafka.streams.kstream.KTable;<br>import org.apache.kafka.streams.kstream.KStreamBuilder;<br>import org.apache.kafka.streams.processor.WallclockTimestampExtractor;</p> <p>import org.apache.kafka.streams.kstream.Window;<br>import org.apache.kafka.streams.kstream.Windowed;<br>import org.apache.kafka.streams.kstream.Windows;<br>import org.apache.kafka.streams.kstream.TimeWindows;</p> <p>public class App {<br>static public class CountryMessage {<br>/* the JSON messages produced to the countries Topic have this structure:<br>{ "name" : "The Netherlands"<br>, "code" : "NL<br>, "continent" : "Europe"<br>, "population" : 17281811<br>, "size" : 42001<br>};<br><br>this class needs to have at least the corresponding fields to deserialize the JSON messages into<br>*/</p> <p>public String code;<br>public String name;<br>public int population;<br>public int size;<br>public String continent;<br>}</p> <p>static public class CountryTop3 {</p> <p>public CountryMessage[] nrs = new CountryMessage[4] ;<br>public CountryTop3() {}<br>}</p> <p>private static final String APP_ID = "countries-top3-kafka-streaming-analysis-app";</p> <p>public static void main(String[] args) {<br>System.out.println("Kafka Streams Top 3 Demonstration");</p> <p>// Create an instance of StreamsConfig from the Properties instance<br>StreamsConfig config = new StreamsConfig(getProperties());<br>final Serde &lt; String &gt; stringSerde = Serdes.String();<br>final Serde &lt; Long &gt; longSerde = Serdes.Long();</p> <p>// define countryMessageSerde<br>Map &lt; String, Object &gt; serdeProps = new HashMap &lt; String, Object &gt; ();<br>final Serializer &lt; CountryMessage &gt; countryMessageSerializer = new JsonPOJOSerializer &lt; &gt; ();<br>serdeProps.put("JsonPOJOClass", CountryMessage.class);<br>countryMessageSerializer.configure(serdeProps, false);</p> <p>final Deserializer &lt; CountryMessage &gt; countryMessageDeserializer = new JsonPOJODeserializer &lt; &gt; ();<br>serdeProps.put("JsonPOJOClass", CountryMessage.class);<br>countryMessageDeserializer.configure(serdeProps, false);<br>final Serde &lt; CountryMessage &gt; countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);</p> <p>// define countryTop3Serde<br>serdeProps = new HashMap&lt;String, Object&gt;();<br>final Serializer&lt;CountryTop3&gt; countryTop3Serializer = new JsonPOJOSerializer&lt;&gt;();<br>serdeProps.put("JsonPOJOClass", CountryTop3.class);<br>countryTop3Serializer.configure(serdeProps, false);</p> <p>final Deserializer&lt;CountryTop3&gt; countryTop3Deserializer = new JsonPOJODeserializer&lt;&gt;();<br>serdeProps.put("JsonPOJOClass", CountryTop3.class);<br>countryTop3Deserializer.configure(serdeProps, false);<br>final Serde&lt;CountryTop3&gt; countryTop3Serde = Serdes.serdeFrom(countryTop3Serializer, countryTop3Deserializer );</p> <p>// building Kafka Streams Model<br>KStreamBuilder kStreamBuilder = new KStreamBuilder();<br>// the source of the streaming analysis is the topic with country messages<br>KStream&lt;String, CountryMessage&gt; countriesStream = <br>kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");</p> <p>// A hopping time window with a size of 5 minutes and an advance interval of 1 minute.<br>// The window's name -- the string parameter -- is used to e.g. name the backing state store.<br>long windowSizeMs = 5 * 60 * 1000L;<br>long advanceMs = 1 * 60 * 1000L;<br>TimeWindows.of("hopping-window-example", windowSizeMs).advanceBy(advanceMs);</p> <p>// THIS IS THE CORE OF THE STREAMING ANALYTICS:<br>// top 3 largest countries per continent, published to topic Top3CountrySizePerContinent<br>KTable&lt;String,CountryTop3&gt; top3PerContinent = countriesStream<br>// the dimension for aggregation is continent; assign the continent as the key for each message<br>.selectKey((k, country) -&gt; country.continent)<br>// for each key value (every continent in the stream) perform an aggregation<br>.aggregateByKey( <br>// first initialize a new CountryTop3 object, initially empty<br>CountryTop3::new<br>, // for each country in the continent, invoke the aggregator, passing in the continent, the country element and the CountryTop3 object for the continent <br>(continent, countryMsg, top3) -&gt; {<br>// add the new country as the last element in the nrs array<br>top3.nrs[3]=countryMsg;<br>// sort the array by country size, largest first<br>Arrays.sort(<br>top3.nrs, (a, b) -&gt; {<br>// in the initial cycles, not all nrs element contain a CountryMessage object <br>if (a==null) return 1;<br>if (b==null) return -1;<br>// with two proper CountryMessage objects, do the normal comparison<br>return Integer.compare(b.size, a.size);<br>}<br>);<br>// lose nr 4, only top 3 is relevant<br>top3.nrs[3]=null;<br>return (top3);<br>}<br>, stringSerde, countryTop3Serde<br>, "Top3LargestCountriesPerContinent"<br>);<br>// publish the Top3 messages to Kafka Topic Top3CountrySizePerContinent <br>top3PerContinent.to(stringSerde, countryTop3Serde, "Top3CountrySizePerContinent");</p> <p>// prepare Top3 messages to be printed to the console<br>top3PerContinent.&lt;String&gt;mapValues((top3) -&gt; {<br>String rank = " 1. "+top3.nrs[0].name+" - "+top3.nrs[0].size <br>+ ((top3.nrs[1]!=null)? ", 2. "+top3.nrs[1].name+" - "+top3.nrs[1].size:"")<br>+ ((top3.nrs[2]!=null) ? ", 3. "+top3.nrs[2].name+" - "+top3.nrs[2].size:"")<br>; <br>return "List for "+ top3.nrs[0].continent +rank;<br>} <br>)<br>.print(stringSerde,stringSerde);</p> <p>System.out.println("Starting Kafka Streams Countries Example");<br>KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);<br>kafkaStreams.start();<br>System.out.println("Now started CountriesStreams Example");<br>}</p> <p>private static Properties getProperties() {<br>Properties settings = new Properties();<br>// Set a few key parameters<br>settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);<br>// Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens <br>settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");<br>// Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens <br>settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");<br>// default serdes for serialzing and deserializing key and value from and to streams in case no specific Serde is specified<br>settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());<br>settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());<br>settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");<br>// to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1<br>// at org.apache.kafka.clients.producer.ProducerRecord.&lt;init&gt;(ProducerRecord.java:60)<br>// see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo<br>settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);<br>return settings;<br>}</p> <p>}<br>

Some special attention for:

  • static class CountryTop3 – custom class to hold the actual top3 for a continent; objects based on this class are passed around in the aggregator, and are produced to the output stream & topic
  • countryTop3Serde – defined for serializing CountryTop3 object to Kafka Topic, using the JsonPOJOSerializer that can translate a Java POJO to a JSON representation (that is subsequently serialized as String to the Kafka Topic)
  • aggregator implementation in lambda that performs the actual aggregation – the operatoin aggregateByKey (aggregateByKey) is invoked with an Initializer(Initializer) that returns the initial instance of the CountryTop3 object (per continent) on which the aggregate will be built, an Aggregator (Aggregator) that receives the continent, the CountryTop3 object and the next CountryMessage and upgrades the CountryTop3 object to include the new CountryMessage, the Serdes (serializer/deserializer) for the key and the value and a String that is the name of the resulting KTable.image
  • mapValues in order to create printable strings from the CountryTop3 object – the Lambda expression used in the call to mapValues gets a CountryTop3 object as input – the value in the top3PerContinent KTable – and maps it to a String. As a result, the KTable <String,CountryTop3> is mapped to a KTable<String,String> that is streamed to the print operation, using the stringSerde for serializing the two String values.image



To run the application, go through these four command line steps steps

  • (in an empty directory:)
    git clone https://github.com/lucasjellema/kafka-streams-running-topN
  • (navigate to directory kafka-streams-running-topN\Kafka-Streams-Country-TopN)
    mvn package
  • (in the same directory)
    mvn install dependency:copy-dependencies
  • (in the same directory)
    java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App





Here is a screenshot of the Node.JS application busing producing country messages:


And here is some of the output produced by the Kafka Streams application:


Note how Mayotte enters at position one for the African continent, only to be quickly relegated by first Mozambique and then Namibia, only to disappear from the running top 3 when the message for Niger is consumed in the stream.

You should also know that instead of simply pushing every change to the destination topic, we can using timing control – to calculate aggregates over a time slice and or produce outcomes only once every so often. I will demonstrate this is in a subsequent article.

Kafka Tool shows us the topics involved in this article:


  • countries is the source, produced to by the Node.js application
  • Top3CountrySizePerContinent is the destination topic for the Kafka Streams application, to which the running Top 3 messages are produced
  • countries-topn-streaming-analysis-app-Top3LargestCountriesPerContinent-changelog is a Topic created by Kafka Streams on the fly as store for intermediate results; the name of this Topic is derived from the (intermediate) KTable create in the streaming application.

By routing the KTable to a Topic, all change events on the table are produced to the Topic. What I would have liked to be able to do is have only the latest message for each key – in this case the most recent top 3 for each continent – on the Topic. That is not what Kafka Streams does for me, not even when I am producing a KTable as opposed to a KStream. One thing I can do in this case is enable Log Compaction for the topic – although that is more like a hint to the Kafka engine than a strict instruction for removing older messages on the Topic for a key.

Note: the Kafka Streaming application makes use of RocksDB – a simple local Java client database – to hold intermediate results. RocksDB stores data locally in a directory that can be configured. During development, when you run the same analysis on the same set of test data, over and over again, you may get unexpected results, because RocksDB continues with the data it has retained from previous runs. It may be wise to delete the RocksDB local data repository regularly, by just deleting the directory:



An interesting resource is the Kafka Streams example KafkaMusicExample.java on generating a running Top 5 of all songs being played.

A good read is the article Processing Tweets with Kafka Streams https://www.madewithtea.com/processing-tweets-with-kafka-streams.html

About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director and Oracle Developer Champion. Solution architect and developer on diverse areas including SQL, JavaScript, Docker, Machine Learning, Java, SOA and microservices, events in various shapes and forms and many other things. Author of the Oracle Press books: Oracle SOA Suite 11g Handbook and Oracle SOA Suite 12c Handbook. Frequent presenter on community events and conferences such as JavaOne, Oracle Code and Oracle OpenWorld.


Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.