Get going with KSQL on Kubernetes


imageThis article describes how to quickly get going with KSQL on Kubernetes. KSQL is Confuent’s ‘continuous streaming query language’. It allows us to write SQL-like queries that operate on Kafka Topics. Queries that join, filter and aggregate – for each event that gets produced and over time windows. Results are also published to Kafka Topics. A KSQL query can be created as a Table or Stream: as a background process running in the KSQL Server – a piece of [Java] logic that consumes, processes and produces events, until the query is explicitly stopped – i.e. the Table or Stream is dropped.

At the time of writing, there is not yet an official Docker Container image for KSQL. It is automatically included in the full blown Confluent platform Docker Compiser set – configured to run against the Kafka Cluster that is also in that constellation:

What I am looking for is a way to run KSQL on my Kubernetes cluster, and preferably not the full platform. And more importantly: I want that KSQL instance to interact with my own Kafka Cluster, that is also running on that same Kubernetes instance. In a previous article – 15 Minutes to get a Kafka Cluster running on Kubernetes – and start producing and consuming from a Node application  – I described how to get going with Apache Kafka on Kubernetes. The final situation in that article is my starting point in this piece.

The Docker Composer yaml file for the fullblown Confuent Platform provides the necessary clue: it uses the Docker Container Image confluentinc/ksql-cli:4.1.0 as part of its setup. And I can work with that.

The steps:

  1. Create Kubernetes YAML for KSQL [server]Pod
  2. Apply YAML file with Kubectl – this creates the Pod with the running container
  3. Open bash shell on the pod and run ksql [command line utility]
  4. Do the KSQL thing (create Streams and Tables, execute queries, …) against the local Kafka cluster

The piece the resistance of this article undoubtedly is the Pod YAML file. And the key element in that file is the reference to the local Kafka Cluster that is passed through the environment variable KSQL_OPTS that contains the bootstrap.servers parameter which is set to broker.kafka:9092, which is the endpoint within the Kubernetes Cluster for the Kafka broker. This settings ties the KSQK server to the local Kafka Cluster.

The full Pod YAML file is listed below:

apiVersion: v1
kind: Pod
  name: ksql-server
    app: ksql-server
  nodeName: minikube
  - name: ksql-server
    # get latest version of image
    image: confluentinc/ksql-cli:4.1.0
    imagePullPolicy: IfNotPresent
    command: ["/bin/bash"]
    args: ["-c","echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker.kafka:9092 1 20 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 2 && \
                       /usr/bin/ksql-server-start /etc/ksql/"]
    - name: KSQL_CONFIG_DIR
      value: "/etc/ksql"
    - name: KSQL_OPTS
      value: "-Dbootstrap.servers=broker.kafka:9092"
    - name: KSQL_LOG4J_OPTS
      value: "-Dlog4j.configuration=file:/etc/ksql/"
    # containerPort is the port exposed by the container (where ksql server is listening)
    - containerPort: 8088

With this Pod definition available, the next steps are very straightforward. Create the Pod using kubectl:

kubectl apply -f ksqlServerPod.yaml

this will create a Pod that runs the KSQL server against the Kafka Broker in Kubernetes (on broker.kafka:9092).

To start ksql commandline, either use Kubernetes Dashboard, select ksql-server pod and use EXEC option


or from command line on laptop (minikube host) use:

kubectl exec -it ksql-server /bin/bash

In both cases, you will land on the command line of the KSQL server. Simply type:


The KSQL command line utility will start – and we can start doing KSQL things.

For example:

list topics;

(this will display a list of all Kafka Topics)

To create a stream on topic workflowEvents:

create stream wf_s (workflowConversationIdentifier VARCHAR, payload map<VARCHAR,VARCHAR>, updateTimeStamp VARCHAR, lastUpdater VARCHAR, audit array<map<VARCHAR,VARCHAR>>) WITH (kafka_topic='workflowEvents', value_format='JSON', key='workflowConversationIdentifier');

And to query from that stream:

select workflowConversationIdentifier, audit, lastUpdater, updateTimeStamp, payload, payload['author'], payload['workflowType'], payload['lastUpdater'] from wf_s;

Note: messages published to topic workflowEvents have a JSON payload with the following structure:

    "workflowType": "oracle-code-tweet-processor",
    "workflowConversationIdentifier": "OracleCodeTweetProcessor1525151206872",
    "creationTimeStamp": 1525148332841,
    "creator": "WorkflowLauncher",
    "audit": [
            "when": 1525148332841,
            "who": "WorkflowLauncher",
            "what": "creation",
            "comment": "initial creation of workflow"
            "when": 1525151212318,
            "who": "TweetBoard",
            "what": "update",
            "comment": "Tweet Board Capture done"
    "payload": {
        "text": "#556 Today is a microservice workshop at Fontys Hogeschool in Eindhoven",
        "author": "lucasjellema",
        "authorImageUrl": "",
        "createdTime": "May 9th, 2018 at 08:39AM",
        "tweetURL": "",
        "firstLinkFromTweet": "",
        "enrichment": "Lots of Money",
        "translations": [
            "# 556 Heute ist ein Microservice-Workshop in der Fontys Hogeschool in Eindhoven",
            "# 556 Vandaag is een microservice-workshop aan de Fontys Hogeschool in Eindhoven"
    "updateTimeStamp": 1525151212318,
    "lastUpdater": "TweetBoard"

Note: I ran into the following error message when I tried to select from a stream:

Input record ConsumerRecord(topic = logTopic, partition = 0, offset = 268, CreateTime = -1, serialized key size = 8, serialized value size = 1217, headers = RecordHeaders(headers = [], isReadOnly = false), key = logEntry, value = [ null | null | null | null ]) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

The workaround for this – until such moment that I can fix  the producer of these events – is to instruct KSQL to use a special TimestampExtractor. This is done with the following command:

SET 'timestamp.extractor'='org.apache.kafka.streams.processor.WallclockTimestampExtractor';

You can list all properties values, using:

Alternatively, instead of using this generic WallclockTimestampExtractor, we can also use the TIMESTAMP property in the WITH clause of a CREATE STREAM or TABLE statement to assign an arbitrary column from the message payload as the timestamp of the row, if we happen to have such a column:

create stream wf_s (workflowConversationIdentifier VARCHAR, payload map<VARCHAR,VARCHAR>, updateTimeStamp LONG, lastUpdater VARCHAR, audit array<map<VARCHAR,VARCHAR>>) WITH (kafka_topic='workflowEvents', value_format='JSON', key='workflowConversationIdentifier', TIMESTAMP='updateTimeStamp');

(Thanks Robin Moffat for this tip).
Another good tip from Robin: use kafakcat for mocking up topics. E.g. to reproduce your blog I just ran this, and you can also use it to pull from flat files as well for multiple messages (or indeed just pipe multiple messages on stdin)

echo '{"workflowType":"oracle-code-tweet-processor","workflowConversationIdentifier":"OracleCodeTweetProcessor1525151206872","creationTimeStamp":1525148332841,"creator":"WorkflowLauncher","audit":[{"when":15251483328
41,"who":"WorkflowLauncher","what":"creation","comment":"initial creation of workflow"},{"when":1525151212318,"who":"TweetBoard","what":"update","comment":"Tweet Board Capture done"}],"payload":{"text":"#556 Today is a microservice wo
rkshop at Fontys Hogeschool in Eindhoven","author":"lucasjellema","authorImageUrl":"","createdTime":"May 9th, 2018 at 08:39AM","tweetURL":"
ibotAirport/status/853935915714138112","firstLinkFromTweet":"","enrichment":"Lots of Money","translations":["# 556 Heute ist ein Microservice-Workshop in der Fontys Hogeschool in Eindhoven","# 556 Vandaag is een
 microservice-workshop aan de Fontys Hogeschool in Eindhoven"]},"updateTimeStamp":1525151212318,"lastUpdater":"TweetBoard"}'|kafkacat -P -b broker.kafka:9092 -t workflowEvents


KSQL Syntax Reference – 

KSQL Examples –

WallclockTimeExtractor – 

Confluent Docker Quick Start – 

By Naveen on Medium: Tools that make my life easier to work with kubernetes

About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director and Oracle Developer Champion. Solution architect and developer on diverse areas including SQL, JavaScript, Docker, Machine Learning, Java, SOA and microservices, events in various shapes and forms and many other things. Author of the Oracle Press books: Oracle SOA Suite 11g Handbook and Oracle SOA Suite 12c Handbook. Frequent presenter on community events and conferences such as JavaOne, Oracle Code and Oracle OpenWorld.


Leave a Reply