
Simple CQRS – Tweets to Apache Kafka to Elastic Search Index using a little Node code

Put simply, CQRS (Command Query Responsibility Segregation) is an architecture pattern that recognizes that it may be wise to separate the database that processes data manipulations from the engines that handle queries. When data retrieval has special requirements regarding format, scale, availability, TCO, location, search options and response times, it is worth considering additional databases to handle those specific needs. These databases can provide data in a way that caters to the specific needs of specific consumers – by offering data in a filtered, preprocessed or aggregated format or shape, with higher availability, at a closer physical distance, with support for special search patterns and with better performance and scalability.

A note of caution: you should only introduce CQRS into a system if there is a clear need for it – not because you feel obliged to implement such a shiny, much-talked-about pattern or because you feel everyone should have it. CQRS is not a simple thing – especially in existing systems, packaged applications and legacy databases. Detecting changes and extracting data from the source, transporting and converting the data, and applying the data in a reliable, sufficiently fast way with the required level of consistency is not trivial.

In many of my conference presentations, I show demonstrations with running software – to better clarify what I am talking about, to allow the audience to try things out for themselves, and because doing demos usually is fun. A frequent element in these demos is Twitter, because it is well known and because it allows the audience to participate: I can invite an audience to tweet using an agreed hashtag, and their tweets trigger the demo or at least make an appearance. In this article, I discuss one of these demos – an example of CQRS. The picture shows the outline: tweets are consumed by a Node application. Each tweet is converted to an event on a Kafka topic. This event is consumed by a second Node application (potentially one of multiple instances in a Kafka Consumer Group, to allow for more scalability). This Node application creates a new record in an Elastic Search index – the query destination in this little CQRS spiel. The out-of-the-box dashboard tool Kibana allows us to quickly inspect and analyse the tweet records. Additionally, we can create an advanced query service on top of Elastic Search.

This article shows the code behind this demo. This code was prepared for the JEEConf 2018 conference in Kyiv, Ukraine – and can be found on GitHub: https://github.com/lucasjellema/50-shades-of-data-jeeconf2018-kyiv/tree/master/twitter-kafka-demo .

image

The main elements in the demo:

1. Kafka Topic tweets-topic (in my demo, this topic is created in Oracle Cloud Event Hub Service, a managed Kafka cloud service)

2. Node application that consumes from Twitter – and publishes to the Kafka topic

3. (Postman Collection to create) Elastic Search Index plus custom mapping (primarily to extract a proper creation date-time value from a date string) (in my demo, this Elastic Search Index is created in an Elastic Search instance running in a Docker Container on Oracle Container Cloud)

4. Node application that consumes the events from the Kafka tweets-topic and turns each event into a new record in the index. In this demo, the Node application is also running on Oracle Cloud (Application Container Cloud), but that does not have to be the case

5. Kibana dashboard on top of the Tweets Index. In my demo, Kibana is also running in a Docker container in Oracle Container Cloud

1. Kafka Tweets Topic on Oracle Event Hub Cloud Service

image

After completing the wizard, the topic is created and can be accessed by producers and consumers.
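Once the topic exists, a quick way to verify that it is reachable from Node is to ask for its latest offsets. The following is only a minimal sketch using the kafka-node module that the applications below also use; the host IP and topic name are the values that appear later in this article, and fetchLatestOffsets is used here purely as a connectivity check.

// Sketch: verify that the new topic can be reached by fetching its latest offsets
var kafka = require('kafka-node');

var EVENT_HUB_PUBLIC_IP = process.env.KAFKA_HOST || '129.1.1.116';
var TOPIC_NAME = process.env.KAFKA_TOPIC || 'a516817-tweetstopic';

var client = new kafka.Client(EVENT_HUB_PUBLIC_IP);
var offset = new kafka.Offset(client);

client.on('ready', function () {
  offset.fetchLatestOffsets([TOPIC_NAME], function (err, offsets) {
    if (err) {
      console.error("Topic " + TOPIC_NAME + " could not be reached: " + JSON.stringify(err));
    } else {
      console.log("Latest offsets for " + TOPIC_NAME + ": " + JSON.stringify(offsets[TOPIC_NAME]));
    }
  });
});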

2. Node application that consumes from Twitter – and publishes to the Kafka topic

The Node application consists of an index.js file that handles HTTP requests – for health checking – and that consumes from Twitter and publishes to a Kafka Topic. It uses a file twitterconfig.js (not included) that contains the secret details of a Twitter client. The contents of this file should look like this – and should contain your own Twitter client details:

// CHANGE THIS **************************************************************
// go to https://apps.twitter.com/ to register your app
var twitterconfig = {
  consumer_key: 'mykey',
  consumer_secret: 'mysecret',
  access_token_key: 'at-key',
  access_token_secret: 'at-secret'
};

module.exports = { twitterconfig };

The index.js file requires the npm libraries kafka-node and twit, as well as express, body-parser and http for handling HTTP requests.
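In a package.json, the dependencies could look roughly like this – a minimal sketch; the package name and version ranges are indicative and not taken from the original repository:

{
  "name": "twitter-kafka-producer",
  "version": "0.0.3",
  "main": "index.js",
  "dependencies": {
    "body-parser": "1.x",
    "express": "4.x",
    "kafka-node": "2.x",
    "twit": "2.x"
  }
}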

The code can be divided into three parts:

  • initialization, create HTTP server and handle HTTP requests
  • Consume from Twitter
  • Publish to Kafka

Here are the three code sections:

const express = require('express');
var http = require('http')
const app = express();
var PORT = process.env.PORT || 8144;
const server = http.createServer(app);
var APP_VERSION = "0.0.3"

const startTime = new Date()
const bodyParser = require('body-parser');
app.use(bodyParser.json());
var tweetCount = 0;
app.get('/about', function (req, res) {
  var about = {
    "about": "Twitter Consumer and Producer to " + TOPIC_NAME,
    "PORT": process.env.PORT,
    "APP_VERSION ": APP_VERSION,
    "Running Since": startTime,
    "Total number of tweets processed": tweetCount

  }
  res.json(about);
})
server.listen(PORT, function listening() {
  console.log('Listening on %d', server.address().port);
});
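With the server running, a GET request to /about (on port 8144 by default) returns a small status document along these lines – the values shown here are purely illustrative:

{
  "about": "Twitter Consumer and Producer to a516817-tweetstopic",
  "PORT": "8144",
  "APP_VERSION ": "0.0.3",
  "Running Since": "2018-05-19T08:05:12.345Z",
  "Total number of tweets processed": 42
}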

Code for consuming from Twitter – in this case for the hashtags #jeeconf, #java and #oraclecode:

var Twit = require('twit');
const { twitterconfig } = require('./twitterconfig');

var T = new Twit({
  consumer_key: twitterconfig.consumer_key,
  consumer_secret: twitterconfig.consumer_secret,
  access_token: twitterconfig.access_token_key,
  access_token_secret: twitterconfig.access_token_secret,
  timeout_ms: 60 * 1000,
});


var twiterHashTags = process.env.TWITTER_HASHTAGS || '#oraclecode,#java,#jeeconf';
var tracks = { track: twiterHashTags.split(',') };

let tweetStream = T.stream('statuses/filter', tracks)
tweetstream(tracks, tweetStream);

function tweetstream(hashtags, tweetStream) {
  console.log("Started tweet stream for hashtag #" + JSON.stringify(hashtags));

  tweetStream.on('connected', function (response) {
    console.log("Stream connected to twitter for #" + JSON.stringify(hashtags));
  })
  tweetStream.on('error', function (error) {
    console.log("Error in Stream for #" + JSON.stringify(hashtags) + " " + error);
  })
  tweetStream.on('tweet', function (tweet) {
    produceTweetEvent(tweet);
  });
}
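For reference, the fields that produceTweetEvent (shown below) reads from the incoming tweet object look roughly like this – a trimmed, illustrative example of the payload delivered by the statuses/filter stream:

// Illustrative, trimmed-down tweet object; only the fields used below are shown
var exampleTweet = {
  id: 997812345678901200,
  text: "Having fun with a CQRS demo at #JEEConf",
  lang: "en",
  created_at: "Sat May 19 10:15:00 +0000 2018",
  user: { name: "Some Attendee" },
  entities: { hashtags: [{ text: "JEEConf" }] },
  retweeted_status: undefined // present (with its own id) when the tweet is a retweet
};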

Code for publishing to the Kafka Topic a516817-tweetstopic:

const kafka = require('kafka-node');
const APP_NAME = "TwitterConsumer"

var EVENT_HUB_PUBLIC_IP = process.env.KAFKA_HOST || '129.1.1.116';
var TOPIC_NAME = process.env.KAFKA_TOPIC || 'a516817-tweetstopic';

var Producer = kafka.Producer;
var client = new kafka.Client(EVENT_HUB_PUBLIC_IP);
var producer = new Producer(client);
KeyedMessage = kafka.KeyedMessage;

producer.on('ready', function () {
  console.log("Producer is ready in " + APP_NAME);
});
producer.on('error', function (err) {
  console.log("failed to create the client or the producer " + JSON.stringify(err));
})


let payloads = [
  { topic: TOPIC_NAME, messages: '*', partition: 0 }
];

function produceTweetEvent(tweet) {
  var hashtagFound = false;
  try {
    // find out which of the original hashtags { track: ['oraclecode', 'java', 'jeeconf'] } occurs in the hashtags for this tweet;
    // that is the one to record in the tagFilter property;
    // select one other hashtag from tweet.entities.hashtags to set in the hashtag property
    var tagFilter = "#jeeconf";
    var extraHashTag = "liveForCode";
    for (var i = 0; i < tweet.entities.hashtags.length; i++) {
      var tag = '#' + tweet.entities.hashtags[i].text.toLowerCase();
      console.log("inspect hashtag " + tag);
      var idx = tracks.track.indexOf(tag);
      if (idx > -1) {
        tagFilter = tag;
        hashtagFound = true;
      } else {
        extraHashTag = tag
      }
    }//for

    if (hashtagFound) {
      var tweetEvent = {
        "eventType": "tweetEvent"
        , "text": tweet.text
        , "isARetweet": tweet.retweeted_status ? "y" : "n"
        , "author": tweet.user.name
        , "hashtag": extraHashTag
        , "createdAt": tweet.created_at
        , "language": tweet.lang
        , "tweetId": tweet.id
        , "tagFilter": tagFilter
        , "originalTweetId": tweet.retweeted_status ? tweet.retweeted_status.id : null
      };
      eventPublisher.publishEvent(tweet.id, tweetEvent)
      tweetCount++
    }// if hashtag found
  } catch (e) {
    console.log("Exception in publishing Tweet Event " + JSON.stringify(e))
  }
}

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  payloads = [
    { topic: TOPIC_NAME, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + TOPIC_NAME + " :" + JSON.stringify(err));
    } else {
      console.log("Published event with key " + eventKey + " to topic " + TOPIC_NAME + " :" + JSON.stringify(data));
    }
  });
}//publishEvent

3. (Postman Collection to create) Elastic Search Index plus custom mapping

Preparation of an Elastic Search environment is done through REST API calls. These can be made from code, from the command line (using curl) or from a tool such as Postman. In this case, I have created a Postman collection with a number of calls to prepare the Elastic Search index tweets.

image

The following requests are relevant:

  • Check if the Elastic Search server is healthy: GET {{ELASTIC_HOME}}:9200/_cat/health
  • Create the tweets index: PUT {{ELASTIC_HOME}}:9200/tweets
  • Create the mapping for the tweets index: PUT {{ELASTIC_HOME}}:9200/tweets/_mapping/doc

The body for the last request is relevant:

{
  "properties": {
    "author": {
      "type": "text",
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    },
    "createdAt": {
      "type": "date",
      "format": "EEE MMM dd HH:mm:ss ZZ yyyy"
    },
    "eventType": {
      "type": "text",
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    },
    "hashtag": {
      "type": "text",
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    },
    "isARetweet": {
      "type": "text",
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    },
    "language": {
      "type": "text",
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    },
    "tagFilter": {
      "type": "text",
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    },
    "text": {
      "type": "text",
      "fields": {
        "keyword": { "type": "keyword", "ignore_above": 256 }
      }
    },
    "tweetId": {
      "type": "long"
    }
  }
}

The custom aspect of the mapping is primarily the createdAt property: it extracts a proper creation date-time value from the date string Twitter provides (in the format EEE MMM dd HH:mm:ss ZZ yyyy).
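For those who prefer code over Postman: the same two calls can be made with the elasticsearch npm module (the module also used by the indexing application in the next section). A minimal sketch, with a hypothetical prepareTweetsIndex function, only the createdAt property spelled out, the remaining properties exactly as in the request body above, and the same host that model.js uses later in this article:

var elasticsearch = require('elasticsearch');
var client = new elasticsearch.Client({ host: process.env.ELASTIC_CONNECTOR || 'http://129.150.114.134:9200' });

async function prepareTweetsIndex() {
  // create the tweets index
  await client.indices.create({ index: 'tweets' });
  // apply the custom mapping to the doc type
  await client.indices.putMapping({
    index: 'tweets',
    type: 'doc',
    body: {
      properties: {
        createdAt: { type: 'date', format: 'EEE MMM dd HH:mm:ss ZZ yyyy' }
        // ... the remaining properties as listed in the request body above
      }
    }
  });
}

prepareTweetsIndex();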

4. Node application that consumes the events from the Kafka tweets-topic and turns each event into a new record in the elastic search index

The tweetListener.js file contains the code for two main purposes: handling HTTP requests (primarily for health checks) and consuming events from the Kafka topic for tweets. It requires the npm modules express, http and kafka-node for this. It also imports the local module model from the file model.js. This module writes tweet records to the Elastic Search index; it uses the npm module elasticsearch for this.

The code in tweetListener.js is best read in two sections:

First section for handling HTTP requests:

const express = require('express');
var https = require('https')
  , http = require('http')
const app = express();
var PORT = process.env.PORT || 8145;
const server = http.createServer(app);
var APP_VERSION = "0.0.3"


const bodyParser = require('body-parser');
app.use(bodyParser.json());
var tweetCount = 0;
app.get('/about', function (req, res) {
  var about = {
    "about": "Twitter Consumer from  " +SOURCE_TOPIC_NAME,
    "PORT": process.env.PORT,
    "APP_VERSION ": APP_VERSION,
    "Running Since": startTime,
    "Total number of tweets processed": tweetCount

  }
  res.json(about);
})
server.listen(PORT, function listening() {
  console.log('Listening on %d', server.address().port);
});

Second section for consuming Kafka events from tweets topic – and invoking the model module for each event:

var kafka = require('kafka-node');
var model = require("./model");
var async = require('async'); // used in the SIGINT handler below to close the consumer group

var tweetListener = module.exports;

var subscribers = [];
tweetListener.subscribeToTweets = function (callback) {
  subscribers.push(callback);
}

// var kafkaHost = process.env.KAFKA_HOST || "192.168.188.102";
// var zookeeperPort = process.env.ZOOKEEPER_PORT || 2181;
// var TOPIC_NAME = process.env.KAFKA_TOPIC ||'tweets-topic';

var KAFKA_ZK_SERVER_PORT = 2181;

var SOURCE_KAFKA_HOST = '129.1.1.116';
var SOURCE_TOPIC_NAME = 'a516817-tweetstopic';

var consumerOptions = {
  host: SOURCE_KAFKA_HOST + ':' + KAFKA_ZK_SERVER_PORT,
  groupId: 'consume-tweets-for-elastic-index',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'latest' // equivalent of auto.offset.reset; valid values are 'none', 'latest', 'earliest'
};

var topics = [SOURCE_TOPIC_NAME];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumer1' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);

function onMessage(message) {
  console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
  console.log("Message Value " + message.value)

  subscribers.forEach((subscriber) => {
    subscriber(message.value);

  })
}

function onError(error) {
  console.error(error);
  console.error(error.stack);
}

process.once('SIGINT', function () {
  async.each([consumerGroup], function (consumer, callback) {
    consumer.close(true, callback);
  });
});


tweetListener.subscribeToTweets((message) => {
  var tweetEvent = JSON.parse(message);
  tweetCount++;
  // ready to elastify tweetEvent
  console.log("Ready to put on Elastic " + JSON.stringify(tweetEvent));
  model.saveTweet(tweetEvent).then((result, error) => {
    console.log("Saved to Elastic " + JSON.stringify(result) + 'Error?' + JSON.stringify(error));
  })
})

The file model.js connects to the Elastic Search server and saves tweets to the tweets index when requested. It is very straightforward – without any exception handling, for example for the case where the Elastic Search server does not accept a record or is simply unavailable. Remember: this is just the code for a demo (a sketch of what minimal retry handling could look like follows after the code).

var tweetsModel = module.exports;
var elasticsearch = require('elasticsearch');

var ELASTIC_SEARCH_HOST = process.env.ELASTIC_CONNECTOR || 'http://129.150.114.134:9200';

var client = new elasticsearch.Client({
    host: ELASTIC_SEARCH_HOST,
});

client.ping({
    requestTimeout: 30000,
}, function (error) {
    if (error) {
        console.error('elasticsearch cluster is down!');
    } else {
        console.log('Connection to Elastic Search is established');
    }
});

tweetsModel.saveTweet = async function (tweet) {
    try {
        var response = await client.index({
            index: 'tweets',
            id: tweet.tweetId,
            type: 'doc',
            body: tweet
        }
        );

        console.log("Response: " + JSON.stringify(response));
        return tweet;
    }
    catch (e) {
        console.error("Error in Elastic Search - index document " + tweet.tweetId + ":" + JSON.stringify(e))
    }

}
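As noted above, there is no error handling beyond logging. For anything more than a demo, the index call would at least need retries; here is a minimal, hypothetical sketch of what that could look like (saveTweetWithRetry is not part of the demo code):

// Hypothetical addition: retry the index operation a few times before giving up
tweetsModel.saveTweetWithRetry = async function (tweet, attempts) {
  attempts = attempts || 3;
  for (var i = 1; i <= attempts; i++) {
    try {
      return await client.index({ index: 'tweets', type: 'doc', id: tweet.tweetId, body: tweet });
    } catch (e) {
      console.error("Attempt " + i + " to index tweet " + tweet.tweetId + " failed: " + e.message);
      if (i === attempts) throw e;
      // simple linear backoff before the next attempt
      await new Promise(function (resolve) { setTimeout(resolve, 1000 * i); });
    }
  }
}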

5. Kibana dashboard on top of the Tweets Index

Kibana is an out-of-the-box application, in my case preconfigured for the colocated Elastic Search server. Once I provide the name of the index I am interested in – tweets – Kibana immediately shows an overview of (selected time ranges in) this index. The peaks in the screenshot indicate May 19th and 20th, when JEEConf was taking place in Kyiv and I presented this demo:

image
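The kind of overview Kibana presents can also be requested directly from Elastic Search, for example with a date histogram aggregation over the tweets index. A sketch that reuses the elasticsearch client from model.js; the function name tweetsPerDay is my own:

// Sketch: count tweets per day, similar to the histogram Kibana shows
async function tweetsPerDay() {
  var response = await client.search({
    index: 'tweets',
    body: {
      size: 0,
      aggs: {
        tweets_per_day: {
          date_histogram: { field: 'createdAt', interval: 'day' }
        }
      }
    }
  });
  console.log(JSON.stringify(response.aggregations.tweets_per_day.buckets));
}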

The same results in the Twitter UI:

image