Kafka Streams is a lightweight Java library for creating advanced streaming applications on top of Apache Kafka Topics. Kafka Streams provides easy-to-use constructs that allow Java developers to quickly and almost declaratively compose streaming pipelines that perform running aggregates, real-time filtering, time windows and joins of streams. Results from the streaming analysis can easily be published to a Kafka Topic or to external destinations. Despite the close integration with Kafka and the many out-of-the-box library elements, an application created with Kafka Streams is just a Java application that can be deployed and run wherever Java applications can run (which is of course virtually anywhere).
In this article I will show you my first steps with Kafka Streams. I will create a simple Kafka Streams application that streams messages from a Kafka Topic, processes them by assigning a dimension (grouping by a specific key) and then keeps a running count. The running count is produced to a second Kafka Topic (as well as written to the console). Anyone interested in the outcome of this streaming analysis can consume that topic – without any dependency on the Kafka Streams based logic.
This next figure shows the application. Country messages – simple JSON messages that describe a country with properties such as name, continent, population and size – are produced to a Kafka Topic. (This is done from a simple Node.js application that reads the data from a CSV file and publishes the records to the Kafka Topic; this application is described in the article: NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file.) The Kafka Streams application consists of a single Java class that creates a stream from the Kafka Topic. Elements in the stream are assigned a key – the continent – and are then counted by key. The result (the running count of countries per continent) is routed to an outbound stream that produces messages to a second Kafka Topic.
My starting point in this article is:
- a running Kafka Cluster somewhere on a server (or as is actually the case, in a VM running on my laptop)
- locally installed Java 8 (1.8.0_72)
- locally installed Maven (3.2.5)
- (optional) locally installed Node.js (6.9.4)
You will find all sources discussed in this article in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-getting-started .
The steps discussed below:
- Initialize a new Maven project
- Add dependencies on Kafka Streams to Maven pom-file
- Implement Java Application:
- create KStream using KStreamBuilder for Kafka Topic countries
- selectKey for messages in the KStream: continent
- countByKey the messages in the KStream
- route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
- print the running count messages to the console
- Compile the application and build a JAR file – using mvn package (note: we will do a little tweaking on the exact dependencies)
- Download all required JAR files needed for running the Kafka Streams application – using mvn install dependency:copy-dependencies
- optional: Run the NodeJS application to produce country messages to Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages, or use Kafka Connect to bring country messages across to the countries Topic
- Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven; this will produce messages on the Kafka Topic (which we can inspect, for example using Kafka Tool) and print messages to the console (that are even easier to inspect).
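Before building the Kafka Streams version, the essence of the analysis – a running count per continent – can be sketched in plain Java, with no Kafka involved. This is a simplified illustration of what selectKey plus countByKey do; the class and method names are my own, not part of the Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

public class RunningCountSketch {

    // keyed state: continent -> running count
    // (Kafka Streams keeps the equivalent of this map in a state store)
    private final Map<String, Long> counts = new HashMap<>();

    // corresponds to selectKey + countByKey: derive the key from the message,
    // increment its count and return the updated running count for that key
    public long handle(String continent) {
        long newCount = counts.getOrDefault(continent, 0L) + 1;
        counts.put(continent, newCount);
        return newCount;
    }
}
```

Every incoming country message would call handle(country.continent); Kafka Streams additionally persists this state and publishes every update to a downstream topic.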
If you want to inspect and perhaps edit the code but not necessarily create the application from scratch, you can clone the GitHub Repo: git clone https://github.com/lucasjellema/kafka-streams-getting-started.
My starting point is the local file system, freshly cloned from a nearly empty GitHub repository. I have included the NodeJS application that will produce the country messages to the Kafka topic:
1. Initialize a new Maven project
mvn archetype:generate -DgroupId=nl.amis.streams.countries -DartifactId=Kafka-Streams-Country-Counter -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
The result:
2. Add dependency on Kafka Streams to the Maven pom-file
The Maven Repo identifier is found from Maven Central: https://search.maven.org
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
</dependency>
3. Implement Java Application
- create KStream using KStreamBuilder for Kafka Topic countries
- selectKey for messages in the KStream: continent
- countByKey the messages in the KStream
- route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
- print the running count messages to the console
Or in detail:
public class App {

    static public class CountryMessage {
        /* the JSON messages produced to the countries Topic have this structure:
           { "name" : "The Netherlands", "code" : "NL", "continent" : "Europe",
             "population" : 17281811, "size" : 42001 };
           this class needs to have at least the corresponding fields to
           deserialize the JSON messages into */
        public String code;
        public String name;
        public int population;
        public int size;
        public String continent;
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");
        // Create an instance of StreamsConfig from the Properties instance
        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // define countryMessageSerde
        Map<String, Object> serdeProps = new HashMap<>();
        final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde<CountryMessage> countryMessageSerde =
            Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

        // building Kafka Streams Model
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        // the source of the streaming analysis is the topic with country messages
        KStream<String, CountryMessage> countriesStream =
            kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");

        // THIS IS THE CORE OF THE STREAMING ANALYTICS:
        // running count of countries per continent, published in topic RunningCountryCountPerContinent
        KTable<String, Long> runningCountriesCountPerContinent = countriesStream
            .selectKey((k, country) -> country.continent)
            .countByKey("Counts");
        runningCountriesCountPerContinent.to(stringSerde, longSerde, "RunningCountryCountPerContinent");
        runningCountriesCountPerContinent.print(stringSerde, longSerde);

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();
        // Set a few key parameters
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM
        // running Kafka, port 9092 is where the (single) broker listens
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
        // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the
        // host name for my VM running Kafka, port 2181 is where ZooKeeper listens
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");
        // default serdes for serializing and deserializing key and value from and to
        // streams, in case no specific Serde is specified
        settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
        // to work around: Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        //   at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
        // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
        settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return settings;
    }
}
Note: the Serde is an object that carries a serializer and a deserializer for a specific data type, used to serialize and deserialize keys and values into and from messages on a Kafka topic. Whenever our Java client consumes or produces elements, the Serde for those elements has to be provided. In this case, I have crafted the countryMessageSerde for the CountryMessage Java Class that is instantiated from a JSON message that is the value of consumed Kafka messages. This Serde carries a serializer and deserializer based on the JsonPOJODeserializer and JsonPOJOSerializer that are generic JSON to Java mappers, using the Jackson library for doing so.
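The real JsonPOJOSerializer and JsonPOJODeserializer delegate the actual mapping to Jackson. Purely to illustrate the Serializer/Deserializer contract they fulfill, here is a simplified, hand-rolled sketch for the CountryMessage fields; the class and helper names are my own, and a real implementation should of course use a proper JSON library such as Jackson rather than this naive string handling:

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CountrySerdeSketch {

    public static class CountryMessage {
        public String code;
        public String name;
        public int population;
        public int size;
        public String continent;
    }

    // serializer side: POJO -> JSON bytes (the real JsonPOJOSerializer lets Jackson do this generically)
    public static byte[] serialize(CountryMessage c) {
        String json = String.format(
                "{\"name\":\"%s\",\"code\":\"%s\",\"continent\":\"%s\",\"population\":%d,\"size\":%d}",
                c.name, c.code, c.continent, c.population, c.size);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // deserializer side: JSON bytes -> POJO (naive field extraction; Jackson handles this robustly)
    public static CountryMessage deserialize(byte[] data) {
        String json = new String(data, StandardCharsets.UTF_8);
        CountryMessage c = new CountryMessage();
        c.name = extract(json, "name");
        c.code = extract(json, "code");
        c.continent = extract(json, "continent");
        c.population = Integer.parseInt(extract(json, "population"));
        c.size = Integer.parseInt(extract(json, "size"));
        return c;
    }

    private static String extract(String json, String field) {
        Matcher m = Pattern.compile("\"" + field + "\"\\s*:\\s*\"?([^\",}]+)\"?").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("field " + field + " not found in " + json);
        }
        return m.group(1);
    }
}
```

A Serde built with Serdes.serdeFrom would simply carry this serializer/deserializer pair.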
4. Compile the application and build a JAR file – using mvn package
(note: we will later on do a little tweaking on the exact dependencies and set the correct Java version)
Add the following plugin in the Maven pom file, to ensure that compilation is done for Java version 1.8 (Java 8); this is required for the lambda expressions (Java 8) and the diamond operator (Java 7) used in the code:
...
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>
5. Download all required JAR files needed for running the Kafka Streams application
mvn install dependency:copy-dependencies
All JAR files that follow from the dependencies defined in the pom.xml file are downloaded to the directory Kafka-Streams-Country-Counter\target\dependency
6. Produce Country Messages to Kafka Topic
optional: Run the NodeJS application to produce country messages to Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages, or use Kafka Connect to bring country messages across to the countries Topic.
7. Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven
java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
(note: on Linux, the semicolon separating the jar files should be a colon)
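The platform-specific classpath separator can even be queried from Java itself, which is handy when scripting the launch command; a small check:

```java
public class PathSeparatorCheck {
    public static void main(String[] args) {
        // java.io.File.pathSeparator is ";" on Windows and ":" on Linux/macOS
        System.out.println(java.io.File.pathSeparator);
    }
}
```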
I ran into several exceptions at this point. I will list them and show the resolutions:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
… 1 more
Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\countries-streaming-analysis-app\0_0\.lock (The system cannot find the path specified)
at java.io.RandomAccessFile.open0(Native Method)
Add the following line in the Java code getProperties() method:
settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\tmp"); // on Windows
or
settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp"); // on Linux
Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
See: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo for details on this exception.
Add following line in Java code, method getProperties():
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
and the associated import:
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
Exception in thread "StreamThread-1" java.lang.ExceptionInInitializerError
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
at org.rocksdb.Options.<clinit>(Options.java:21)
at org.apache.kafka.streams.state.internals.RocksDBStore.<init>(RocksDBStore.java:126)
…
Caused by: java.lang.UnsupportedOperationException
at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:40)
at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:19)
This exception can occur on Windows and is caused by the fact that the RocksDB version that Kafka Streams 0.10.0.0 depends on does not include the required Windows DLL; RocksDB 4.9 does include that DLL.
Add dependency to Maven pom.xml:
<!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
<dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>4.9.0</version>
</dependency>
8. Run the Kafka Streams application
Run (again)
- mvn package
- mvn install dependency:copy-dependencies
- java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
And now, finally, the running count is produced, both to the console:
and to the Kafka Topic, here seen in Kafka Tool:
The messages have a String type Key and a Long type value.
Note: the topic in the blue rectangle – countries-streaming-analysis-app-Counts-changelog – is created by the Kafka Streams library as an intermediate change log for the running count. Instead of keeping the intermediate results only in memory, they are produced to a Kafka Topic as well.
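The role of such a changelog topic can be illustrated without Kafka: the state store is restored by replaying the (continent, count) records in order, with the latest record per key winning. A minimal sketch, using my own class name and a simple "key=count" record format purely for illustration:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogReplaySketch {

    // replay changelog records, each "key=count", in order; the latest record per key
    // wins, just as when Kafka Streams restores a state store from its changelog topic
    public static Map<String, Long> restore(List<String> records) {
        Map<String, Long> store = new HashMap<>();
        for (String record : records) {
            String[] kv = record.split("=");
            store.put(kv[0], Long.parseLong(kv[1]));
        }
        return store;
    }
}
```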
Resources
On Line Java Beautifier – http://www.tutorialspoint.com/online_java_formatter.htm
Thank you Lucas for the post.
Out of curiosity I tried your application on Ubuntu 16.04 LTS and used Kafka as 0.10.0 as in pom.xml.
But at run time used confluent 5.0.0 OSS platform.
There was an issue: Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance, Caused by: java.io.FileNotFoundException: /tmp/countries-streaming-analysis-app/0_0/.lock (The system cannot find the path specified), as you mentioned.
I changed the code from
settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp");
to
settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
after explicitly creating a directory kafka-streams under /tmp, because the state.dir default is /tmp/kafka-streams (the path to the base directory for state storage).
It works. Maybe it is useful to someone else!
Regards
Gopinathan Munappy
Thanks Gopinathan for your research and providing this feedback.
Kind regards,
Lucas
hi
I'm in a big problem: my end-of-studies project is about building a platform with microservices using Symfony 4, Apache Kafka and the Kong gateway,
and I don't know how. So can you help me and give me an example?
thank you
Nice article, thanks.
Do you maybe have an idea how to do ntile on Kafka Streams, not only Top N: e.g. best third, second best third, last third per dimension? I am actually trying to do an RFM calculation on the fly and store the calculated scores in a KTable as the result.
That is a nice challenge – one that I have not worked on yet. Perhaps there will be time. For now I am afraid you are on your own. Maybe the new KSQL library can help you out though.
kind regards
Lucas
Hi Lucas,
This is a very nice article. The example is very good. I like that you use Node and Java at the same time.
I suggest using localhost instead of ubuntu as the host in your code; then it will work for everyone without modification. I've run it on Windows 7, and the same goes for the Kafka server and ZooKeeper.
Regards,
Thanks for this writeup! It’s a very nice guideline on how to use Kafka’s Streams API.
Just some comments/question:
– why did you use 0.10.0.0 — that is quite an old version? For 0.10.1, you would not have hit the RocksDB issues, as we bumped up the dependency version number there already.
– for the Timestamp issue: it seems that your producer does not write record metadata timestamps — I guess it’s basically a producer issue (cf. https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-IsawexceptionsrunningmyStreamsappforthefirsttimeas“Extractedtimestampvalueisnegative,whichisnotallowed.”Whatdoesthismean?)
When you did hit the described errors, was it a problem for you to figure out what went wrong (ie, are the error messages helpful enough)?
Btw: Windows is officially not supported… 🙁