Getting Started with Kafka Streams – building a streaming analytics Java application against a Kafka Topic


Kafka Streams is a lightweight Java library for creating advanced streaming applications on top of Apache Kafka Topics. Kafka Streams provides easy-to-use constructs that allow Java developers to quickly compose, in an almost declarative style, streaming pipelines that perform running aggregations, real-time filtering, time-windowed computations and joins of streams. Results of the streaming analysis can easily be published to a Kafka Topic or to external destinations. Despite the close integration with Kafka and the many out-of-the-box library elements, an application created with Kafka Streams is just a Java application that can be deployed and run wherever Java applications run (which is of course virtually anywhere).

In this article I will show you my first steps with Kafka Streams. I will create a simple Kafka Streams application that streams messages from a Kafka Topic, groups them by a specific key (the dimension) and keeps a running count per key. The running count is produced to a second Kafka Topic (as well as written to the console). Anyone interested in the outcome of this streaming analysis can consume that topic – without any dependency on the Kafka Streams based logic.
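The core of that analysis – keeping a running count per key – can be illustrated without any Kafka machinery. Here is a plain-Java sketch (class and method names are mine, not from the repo) of what happens for every incoming message:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of the running count the Kafka Streams topology
// will maintain: each incoming country message increments the count for the
// key it is grouped by (the continent).
public class RunningCountSketch {

    // Increment the count for the given key and return the new value,
    // mimicking what the count-by-key step does for each message.
    public static long update(Map<String, Long> counts, String key) {
        return counts.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> countsPerContinent = new HashMap<>();
        update(countsPerContinent, "Europe");
        update(countsPerContinent, "Asia");
        update(countsPerContinent, "Europe"); // Europe now counted twice
        System.out.println(countsPerContinent);
    }
}
```

Kafka Streams does essentially this, but distributed, fault tolerant, and with the state backed by a Kafka changelog topic.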

This next figure shows the application. Country messages – simple JSON messages that describe a country with properties such as name, continent, population and size – are produced to a Kafka Topic. (This is done from a simple Node.js application that reads the data from a CSV file and publishes the records to a Kafka Topic; this application is described in the article: NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file.) The Kafka Streams application consists of a single Java class that creates a stream from the Kafka Topic. Elements in the stream are assigned a key – the continent – and are then counted by key. The result (the running count of countries per continent) is routed to an outbound stream that produces messages to a second Kafka Topic.

[figure: country messages flowing from the Node.js producer through the Kafka Streams application to the output topic]

 

My starting point in this article is:

  • a running Kafka Cluster somewhere on a server (or as is actually the case, in a VM running on my laptop)
  • locally installed Java 8 (1.8.0_72)
  • locally installed Maven (3.2.5)
  • (optional) locally installed Node.js (6.9.4)


You will find all sources discussed in this article in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-getting-started .

The steps discussed below are:

  • Initialize a new Maven project
  • Add dependencies on Kafka Streams to Maven pom-file
  • Implement Java Application:
    • create KStream using StreamBuilder for Kafka Topic countries
    • selectKey for messages in the KStream: continent
    • countByKey the messages in the KStream
    • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
    • print the running count messages to the console
  • Compile the application and build a JAR file – using mvn package (note: we will do a little tweaking on the exact dependencies)
  • Download all required JAR files needed for running the Kafka Streams application – using mvn install dependency:copy-dependencies
  • Optional: run the Node.js application to produce country messages to the Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish them, or use Kafka Connect to bring country messages across to the countries Topic
  • Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven; this will produce messages on the Kafka Topic (which we can inspect, for example using Kafka Tool) and print messages to the console (that are even easier to inspect).

If you want to inspect and perhaps edit the code but not necessarily create the application from scratch, you can clone the GitHub Repo: git clone https://github.com/lucasjellema/kafka-streams-getting-started.

My starting point is the local file system, freshly cloned from a nearly empty GitHub repository. I have included the NodeJS application that will produce the country messages to the Kafka topic:


1. Initialize a new Maven project

 

mvn archetype:generate -DgroupId=nl.amis.streams.countries -DartifactId=Kafka-Streams-Country-Counter -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

 

The result is a fresh Maven project with a pom.xml file and the standard Maven directory structure.

 

2. Add dependency on Kafka Streams to the Maven pom-file

The Maven coordinates for Kafka Streams can be found on Maven Central: https://search.maven.org

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
  </dependency>

 

3. Implement Java Application

  • create KStream using StreamBuilder for Kafka Topic countries
  • selectKey for messages in the KStream: continent
  • countByKey the messages in the KStream
  • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
  • print the running count messages to the console


Or in detail:

 

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

// JsonPOJOSerializer and JsonPOJODeserializer are generic JSON-to-Java helper
// classes, included in the GitHub repo for this article

public class App {
    static public class CountryMessage {
        /* the JSON messages produced to the countries Topic have this structure:
         { "name" : "The Netherlands"
         , "code" : "NL"
         , "continent" : "Europe"
         , "population" : 17281811
         , "size" : 42001
         };
  
        this class needs to have at least the corresponding fields to deserialize the JSON messages into
        */

        public String code;
        public String name;
        public int population;
        public int size;
        public String continent;
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");

        // Create an instance of StreamsConfig from the Properties instance
        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // define countryMessageSerde
        Map<String, Object> serdeProps = new HashMap<>();
        final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde<CountryMessage> countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

        // building Kafka Streams Model
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        // the source of the streaming analysis is the topic with country messages
        KStream<String, CountryMessage> countriesStream = 
                                       kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");

        // THIS IS THE CORE OF THE STREAMING ANALYTICS:
        // running count of countries per continent, published in topic RunningCountryCountPerContinent
        KTable<String,Long> runningCountriesCountPerContinent = countriesStream
                                                                 .selectKey((k, country) -> country.continent)
                                                                 .countByKey("Counts")
                                                                 ;
        runningCountriesCountPerContinent.to(stringSerde, longSerde,  "RunningCountryCountPerContinent");
        runningCountriesCountPerContinent.print(stringSerde, longSerde);



        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();
        // Set a few key parameters
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens 
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
        // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where the ZooKeeper listens 
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");
        // default serdes for serializing and deserializing keys and values from and to streams, in case no specific Serde is specified
        settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
        // to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        // at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
        // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
        settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return settings;
    }
}

Note: a Serde is an object that carries a serializer and a deserializer for a specific data type, used to serialize and deserialize keys and values into and from messages on a Kafka topic. Whenever our Java client consumes or produces elements, the Serde for those elements has to be provided. In this case, I have crafted the countryMessageSerde for the CountryMessage Java class that is instantiated from the JSON message that is the value of each consumed Kafka message. This Serde carries a serializer and deserializer based on the JsonPOJOSerializer and JsonPOJODeserializer, which are generic JSON-to-Java mappers that use the Jackson library.
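For illustration only, here is a dependency-free sketch of the round trip those two classes perform. The real JsonPOJOSerializer/JsonPOJODeserializer use Jackson and implement Kafka's Serializer/Deserializer interfaces (with configure() and close() methods); this toy version (all names are mine) handles just a few fields with a naive regex, purely to show the POJO-to-bytes-and-back idea:

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy round trip: POJO -> JSON bytes -> POJO, to illustrate what a Serde's
// serializer and deserializer do. Not a substitute for the Jackson-based
// classes in the repo.
public class CountrySerdeSketch {

    public static class CountryMessage {
        public String name;
        public String continent;
        public int population;
    }

    // "serialize": POJO -> JSON text -> UTF-8 bytes
    public static byte[] serialize(CountryMessage c) {
        String json = String.format(
            "{\"name\":\"%s\",\"continent\":\"%s\",\"population\":%d}",
            c.name, c.continent, c.population);
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // "deserialize": bytes -> JSON text -> POJO (naive regex, toy only)
    public static CountryMessage deserialize(byte[] bytes) {
        String json = new String(bytes, StandardCharsets.UTF_8);
        CountryMessage c = new CountryMessage();
        c.name = extract(json, "\"name\":\"([^\"]*)\"");
        c.continent = extract(json, "\"continent\":\"([^\"]*)\"");
        c.population = Integer.parseInt(extract(json, "\"population\":(\\d+)"));
        return c;
    }

    private static String extract(String json, String regex) {
        Matcher m = Pattern.compile(regex).matcher(json);
        if (!m.find()) throw new IllegalArgumentException("field not found: " + regex);
        return m.group(1);
    }
}
```

In the real application, Serdes.serdeFrom(serializer, deserializer) packages such a pair into a single Serde object that the streams API can use wherever it reads or writes CountryMessage values.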

 

4. Compile the application and build a JAR file – using mvn package

(note: we will later on do a little tweaking on the exact dependencies and set the correct Java version)

 

Add the following plugin to the Maven pom file, to ensure that compilation is done for Java version 1.8; this is required for the Lambda expressions (Java 8) and the diamond operator (Java 7) used in the code:

...
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

 

 

5. Download all required JAR files needed for running the Kafka Streams application

mvn install dependency:copy-dependencies

All JAR files that follow from the dependencies defined in the pom.xml file are downloaded to the directory Kafka-Streams-Country-Counter\target\dependency.

 

6. Produce Country Messages to Kafka Topic

Optional: run the Node.js application to produce country messages to the Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish them, or use Kafka Connect to bring country messages across to the countries Topic.

7. Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven

java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

(note: on Linux, the semicolon separating the jar files should be a colon)

 

I ran into several exceptions at this point. I will list them and show the resolutions:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

        … 1 more
Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\countries-streaming-analysis-app\0_0\.lock (The system cannot find the path specified)
        at java.io.RandomAccessFile.open0(Native Method)

 

Add the following line in the Java code getProperties() method:

settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\tmp"); // on Windows

or

settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp"); // on Linux

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)

See: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo for details on this exception.

Add the following line in the Java code, method getProperties():

settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

and the associated import:

import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Exception in thread "StreamThread-1" java.lang.ExceptionInInitializerError
        at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
        at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
        at org.rocksdb.Options.<clinit>(Options.java:21)
        at org.apache.kafka.streams.state.internals.RocksDBStore.<init>(RocksDBStore.java:126)

Caused by: java.lang.UnsupportedOperationException
        at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:40)
        at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:19)

This exception can occur on Windows and is caused by the fact that the version of RocksDB that Kafka Streams 0.10.0.0 depends on does not include the required Windows DLL; RocksDB 4.9 does include that DLL.

Add a dependency to the Maven pom.xml:

    <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
    <dependency>
      <groupId>org.rocksdb</groupId>
      <artifactId>rocksdbjni</artifactId>
      <version>4.9.0</version>
    </dependency>
    

     

8. Run the Kafka Streams application

Run (again):

  • mvn package
  • mvn install dependency:copy-dependencies
  • java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

     

And now, finally, the running count is produced, both to the console and to the Kafka Topic (which can be inspected, for example, with Kafka Tool).

The messages have a String type key and a Long type value.

Note: the topic countries-streaming-analysis-app-Counts-changelog is created by the Kafka Streams library as an intermediate change log for the running count. Instead of keeping the intermediate results only in memory, they are produced to a Kafka Topic as well.
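The idea behind such a changelog topic can be sketched in plain Java (hypothetical names, no Kafka involved): the state store is rebuilt by replaying (key, value) updates in order, with the last value per key winning:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of changelog replay: every update to the running count is also
// published as a (key, newValue) record, so after a restart the state can be
// rebuilt by reading the changelog from the beginning.
public class ChangelogReplaySketch {

    // Replay updates in order; a later update for a key overwrites the earlier one.
    public static Map<String, Long> replay(List<SimpleEntry<String, Long>> updates) {
        Map<String, Long> state = new HashMap<>();
        for (SimpleEntry<String, Long> u : updates) {
            state.put(u.getKey(), u.getValue()); // last write per key wins
        }
        return state;
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, Long>> changelog = Arrays.asList(
            new SimpleEntry<>("Europe", 1L),
            new SimpleEntry<>("Asia", 1L),
            new SimpleEntry<>("Europe", 2L)); // Europe was counted twice
        System.out.println(replay(changelog));
    }
}
```

This is also why the changelog topic can be log-compacted: only the latest value per key is needed to restore the state.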

     

Resources

Online Java Beautifier – http://www.tutorialspoint.com/online_java_formatter.htm

About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director and Oracle Developer Champion. Solution architect and developer in diverse areas including SQL, JavaScript, Kubernetes & Docker, Machine Learning, Java, SOA and microservices, events in various shapes and forms and many other things. Author of the Oracle Press book Oracle SOA Suite 12c Handbook. Frequent presenter at user groups, community events and conferences such as JavaOne, Oracle Code, CodeOne, NLJUG JFall and Oracle OpenWorld.

7 Comments

1. Thank you Lucas for the post.
   Out of curiosity I tried your application on Ubuntu 16.04 LTS and used Kafka 0.10.0 as in the pom.xml, but at run time used the Confluent 5.0.0 OSS platform.
   There was an issue of Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance, caused by java.io.FileNotFoundException: /tmp/countries-streaming-analysis-app/0_0/.lock (The system cannot find the path specified), as you mentioned.

   I changed the code from
   settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp");
   to
   settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
   after explicitly creating a directory kafka-streams under /tmp, because the state.dir property defaults to /tmp/kafka-streams, the path to the base directory for state storage.

   It works. Maybe it is useful to someone else!

   Regards
   Gopinathan Munappy

2. hi
   I'm in big trouble with my end-of-studies project about building a platform with microservices using Symfony 4, Apache Kafka and Kong gateway,
   and I don't know how. Can you help me and give me an example?
   thank you

3. Milan Agatonovic

   Nice article, thanks.
   Do you maybe have an idea how to do ntile on Kafka Streams, not only Top N: e.g. best third, second best third, last third per dimension? I am actually trying to do an RFM calculation on the fly and store the calculated scores in a KTable as result.

   • Lucas Jellema

     That is a nice challenge – one that I have not worked on yet. Perhaps there will be time. For now I am afraid you are on your own. Maybe the new KSQL library can help you out though.

     kind regards

     Lucas

4. Hi Lucas,
   This is a very nice article. The example is very good. I like that you use Node and Java at the same time.
   I suggest using localhost instead of ubuntu as the host in your code; then it will work for everyone without modification. I've run it on Windows 7, and the same goes for the Kafka server and ZooKeeper.
   Regards,

5. Matthias J. Sax

   Thanks for this writeup! It's a very nice guideline on how to use Kafka's Streams API.

   Just some comments/questions:
   – why did you use 0.10.0.0 — that is a quite old version? For 0.10.1, you would not have hit the RocksDB issue, as we bumped up the dependency version there already.
   – for the timestamp issue: it seems that your producer does not write record metadata timestamps — I guess it's basically a producer issue (cf. https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-IsawexceptionsrunningmyStreamsappforthefirsttimeas“Extractedtimestampvalueisnegative,whichisnotallowed.”Whatdoesthismean?)

   When you did hit the described errors, was it a problem for you to figure out what went wrong (i.e., are the error messages helpful enough)?

   Btw: Windows is officially not supported… 🙁
