
Getting Started with Kafka Streams – building a streaming analytics Java application against a Kafka Topic

Kafka Streams is a lightweight Java library for creating advanced streaming applications on top of Apache Kafka Topics. Kafka Streams provides easy-to-use constructs that allow Java developers to quickly and almost declaratively compose streaming pipelines that maintain running aggregates, apply real-time filters, work with time windows and join streams. Results of the streaming analysis can easily be published to a Kafka Topic or to external destinations. Despite the close integration with Kafka and the many out-of-the-box library elements, an application created with Kafka Streams is just a Java application that can be deployed and run wherever Java applications can run (which is, of course, virtually anywhere).

In this article I will show you my first steps with Kafka Streams. I will create a simple Kafka Streams application that streams messages from a Kafka Topic, dimensions them (groups them by a specific key) and keeps a running count per key. The running count is produced to a second Kafka Topic (as well as written to the console). Anyone interested in the outcome of this streaming analysis can consume that topic, without any dependency on the Kafka Streams based logic.

This next figure shows the application. Country messages – simple JSON messages that describe a country with properties such as name, continent, population and size – are produced to a Kafka Topic. (This is done from a simple Node.js application that reads the data from a CSV file and publishes the records to a Kafka Topic; this application is described in this article: NodeJS – Publish messages to Apache Kafka Topic with random delays to generate sample events based on records in CSV file.) The Kafka Streams application consists of a single Java class that creates a stream from the Kafka Topic. Elements in the stream are assigned a key – the continent – and are then counted by key. The result (the running count of countries per continent) is routed to an outbound stream that produces messages to a second Kafka Topic.
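Conceptually, the heart of this pipeline (select a key, then keep a running count per key) can be sketched in plain Java, without Kafka. The sketch below is illustrative only; the `Country` class is a simplified stand-in for the `CountryMessage` class shown later in this article, and the `HashMap` plays the role of the state that Kafka Streams keeps for the count-by-key operation:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RunningCountSketch {

    // simplified stand-in for the CountryMessage class used later in this article
    static class Country {
        final String name;
        final String continent;
        Country(String name, String continent) { this.name = name; this.continent = continent; }
    }

    // selectKey + count-by-key in plain Java: key each message by continent and
    // maintain a running count per key - the state a KTable would hold
    static Map<String, Long> runningCounts(List<Country> arrivals) {
        Map<String, Long> countsPerContinent = new LinkedHashMap<>();
        for (Country country : arrivals) {
            long count = countsPerContinent.merge(country.continent, 1L, Long::sum);
            // a real Kafka Streams app would emit this updated count on every incoming message
            System.out.println(country.continent + " : " + count);
        }
        return countsPerContinent;
    }

    public static void main(String[] args) {
        runningCounts(Arrays.asList(
                new Country("The Netherlands", "Europe"),
                new Country("Belgium", "Europe"),
                new Country("Japan", "Asia")));
    }
}
```

The difference with the real application is that Kafka Streams applies this logic continuously to an unbounded stream and makes the state fault tolerant; the logic itself is this simple.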


 

My starting point in this article is:

  • a running Kafka Cluster somewhere on a server (or as is actually the case, in a VM running on my laptop)
  • locally installed Java 8 (1.8.0_72)
  • locally installed Maven (3.2.5)
  • (optional) locally installed Node.js (6.9.4)


You will find all sources discussed in this article in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-getting-started .

The steps discussed below:

  • Initialize a new Maven project
  • Add dependencies on Kafka Streams to Maven pom-file
  • Implement Java Application:
    • create KStream using StreamBuilder for Kafka Topic countries
    • selectKey for messages in the KStream: continent
    • countByKey the messages in the KStream
    • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
    • print the running count messages to the console
  • Compile the application and build a JAR file – using mvn package (note: we will do a little tweaking on the exact dependencies)
  • Download all required JAR files needed for running the Kafka Streams application – using mvn install dependency:copy-dependencies
  • optional: run the Node.js application to produce country messages to the Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages or use Kafka Connect to bring country messages across to the countries Topic
  • Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven; this will produce messages on the Kafka Topic (which we can inspect, for example using Kafka Tool) and print messages to the console (that are even easier to inspect).

If you want to inspect and perhaps edit the code but not necessarily create the application from scratch, you can clone the GitHub Repo: git clone https://github.com/lucasjellema/kafka-streams-getting-started.

My starting point is the local file system, freshly cloned from a nearly empty GitHub repository. I have included the NodeJS application that will produce the country messages to the Kafka topic:


1. Initialize a new Maven project

 

mvn archetype:generate -DgroupId=nl.amis.streams.countries -DartifactId=Kafka-Streams-Country-Counter -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

 


The result: a fresh Maven project containing a pom.xml and an application skeleton (src/main/java/nl/amis/streams/countries/App.java).

 

2. Add dependency on Kafka Streams to the Maven pom-file

The Maven coordinates for Kafka Streams can be found on Maven Central: https://search.maven.org


  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
  </dependency>

 

3. Implement Java Application

  • create KStream using StreamBuilder for Kafka Topic countries
  • selectKey for messages in the KStream: continent
  • countByKey the messages in the KStream
  • route the resulting running count messages to a stream for the destination Kafka Topic RunningCountryCountPerContinent
  • print the running count messages to the console


Or in detail:

 

package nl.amis.streams.countries;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class App {
    static public class CountryMessage {
        /* the JSON messages produced to the countries Topic have this structure:
         { "name" : "The Netherlands"
         , "code" : "NL"
         , "continent" : "Europe"
         , "population" : 17281811
         , "size" : 42001
         };

        this class needs to have at least the corresponding fields to deserialize the JSON messages into
        */

        public String code;
        public String name;
        public int population;
        public int size;
        public String continent;
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");

        // Create an instance of StreamsConfig from the Properties instance
        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // define countryMessageSerde
        Map<String, Object> serdeProps = new HashMap<>();
        final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde<CountryMessage> countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

        // building Kafka Streams Model
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        // the source of the streaming analysis is the topic with country messages
        KStream<String, CountryMessage> countriesStream =
                kStreamBuilder.stream(stringSerde, countryMessageSerde, "countries");

        // THIS IS THE CORE OF THE STREAMING ANALYTICS:
        // running count of countries per continent, published in topic RunningCountryCountPerContinent
        KTable<String, Long> runningCountriesCountPerContinent = countriesStream
                .selectKey((k, country) -> country.continent)
                .countByKey("Counts");
        runningCountriesCountPerContinent.to(stringSerde, longSerde, "RunningCountryCountPerContinent");
        runningCountriesCountPerContinent.print(stringSerde, longSerde);

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();
        // Set a few key parameters
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        // Kafka bootstrap server (broker to talk to); ubuntu is the host name for my VM running Kafka, port 9092 is where the (single) broker listens
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
        // Apache ZooKeeper instance keeping watch over the Kafka cluster; ubuntu is the host name for my VM running Kafka, port 2181 is where ZooKeeper listens
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "ubuntu:2181");
        // default serdes for serializing and deserializing key and value from and to streams in case no specific Serde is specified
        settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
        // to work around exception Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        // at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
        // see: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo
        settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return settings;
    }
}

Note: the Serde is an object that carries a serializer and a deserializer for a specific data type, used to serialize and deserialize keys and values into and from messages on a Kafka topic. Whenever our Java client consumes or produces elements, the Serde for those elements has to be provided. In this case, I have crafted the countryMessageSerde for the CountryMessage Java Class that is instantiated from a JSON message that is the value of consumed Kafka messages. This Serde carries a serializer and deserializer based on the JsonPOJODeserializer and JsonPOJOSerializer that are generic JSON to Java mappers, using the Jackson library for doing so.
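To make the Serde concept concrete, here is a deliberately simplified stand-in, not Kafka's actual `Serde`/`Serializer`/`Deserializer` interfaces: a Serde is essentially a matched pair of functions that turn a typed value into the bytes carried in a Kafka message and back again. The `SimpleSerde` class and `STRING_SERDE` constant below are hypothetical names used only for this illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SerdeSketch {

    // simplified stand-in for Kafka's Serde: a serializer/deserializer pair for one data type
    static class SimpleSerde<T> {
        final Function<T, byte[]> serializer;   // typed value -> message bytes
        final Function<byte[], T> deserializer; // message bytes -> typed value
        SimpleSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // roughly what Serdes.String() provides: UTF-8 bytes to and from String
    static final SimpleSerde<String> STRING_SERDE = new SimpleSerde<>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));

    public static void main(String[] args) {
        byte[] onTheWire = STRING_SERDE.serializer.apply("Europe");   // the value as stored in a Kafka message
        String restored = STRING_SERDE.deserializer.apply(onTheWire); // the value as seen by the stream
        System.out.println(restored);
    }
}
```

The countryMessageSerde in the listing above follows the same pattern, except that the two functions map between a CountryMessage object and its JSON representation (via Jackson) instead of between a String and its UTF-8 bytes.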

 

4. Compile the application and build a JAR file – using mvn package

(note: we will later on do a little tweaking on the exact dependencies and set the correct Java version)

 

Add the following plugin to the Maven pom file, to ensure that compilation is done for Java version 1.8; this is required for the lambda expressions (Java 8) and the diamond operator (Java 7) used in the code:

...
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

 

 

5. Download all required JAR files needed for running the Kafka Streams application


mvn install dependency:copy-dependencies


All JAR files that follow from the dependencies defined in the pom.xml file are downloaded to the directory Kafka-Streams-Country-Counter\target\dependency


 

6. Produce Country Messages to Kafka Topic

optional: run the Node.js application to produce country messages to the Kafka Topic countries. Alternatively: manually publish country messages, create another application to publish country messages or use Kafka Connect to bring country messages across to the countries Topic.


7. Run the Java application using the Maven generated JAR file and all JARs downloaded by Maven

java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

(note: on Linux, the semicolon separating the jar files should be a colon)

 

I ran into several exceptions at this point. I will list them and show the resolutions:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

        … 1 more
Caused by: java.io.FileNotFoundException: C:\tmp\kafka-streams\countries-streaming-analysis-app\0_0\.lock (The system cannot find the path specified)
        at java.io.RandomAccessFile.open0(Native Method)

 

Add the following line in the Java code getProperties() method:

settings.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\tmp"); // on Windows

or

settings.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp"); // on Linux

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
        at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)

See: https://groups.google.com/forum/#!topic/confluent-platform/5oT0GRztPBo for details on this exception.

Add following line in Java code, method getProperties():

settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

and the associated import:

import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Exception in thread "StreamThread-1" java.lang.ExceptionInInitializerError
        at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:47)
        at org.rocksdb.RocksDB.<clinit>(RocksDB.java:23)
        at org.rocksdb.Options.<clinit>(Options.java:21)
        at org.apache.kafka.streams.state.internals.RocksDBStore.<init>(RocksDBStore.java:126)

Caused by: java.lang.UnsupportedOperationException
        at org.rocksdb.util.Environment.getJniLibraryName(Environment.java:40)
        at org.rocksdb.NativeLibraryLoader.<clinit>(NativeLibraryLoader.java:19)

 

This exception can occur on Windows and is caused by the fact that the version of RocksDB that Kafka Streams 0.10.0.0 depends on does not include the required Windows DLL; RocksDB 4.9 does include that DLL.

Add dependency to Maven pom.xml:

  <!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
  <dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>4.9.0</version>
  </dependency>

 

8. Run the Kafka Streams application

Run (again)

  • mvn package
  • mvn install dependency:copy-dependencies
  • java -cp target/Kafka-Streams-Country-Counter-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App

 

And now, finally, the running count is produced, both to the console and to the Kafka Topic (which can be inspected, for example, with Kafka Tool).

The messages have a String type Key and a Long type value.

Note: a topic named countries-streaming-analysis-app-Counts-changelog is created by the Kafka Streams library as an intermediate change log for the running count. Instead of keeping the temporary results only in memory, they are produced to a Kafka Topic as well.

 

Resources

Online Java Beautifier – http://www.tutorialspoint.com/online_java_formatter.htm
