CQRS through Twitter – more fun than meaning


As part of my 50 Shades of Data presentation, I talk about CQRS – Command Query Responsibility Segregation: the idea that you can have one or more read-only data stores with data replicated from the master store, where all data manipulation (the commands) takes place. In a different presentation, I introduce Apache Kafka and explain its concepts. In this article, these two topics come together, in a light-hearted fashion.

My starting point is the observation that what Kafka can do for machines or systems is very similar to what Twitter does for humans. Twitter allows decoupled communication to potentially large (or very small) audiences that you, as the tweet publisher, may be completely unaware of. These consumers of your tweets of wisdom can be anywhere in the world, use any type of device and read your tweets at their leisure – long after you tweeted them. For now, Twitter retains the full history of tweets. Since most people understand all of this about Twitter, it seems like a good idea to introduce Kafka by starting with Twitter.

image

Then I briefly sum up the key characteristics of Twitter (as well as some features it is lacking with regard to a true system oriented messaging platform):

image

And then I demonstrate how one can implement CQRS using Twitter as middleware.

image

The set up that this article introduces is like this:

  • Data manipulation is performed on the Oracle Database in the lower corner of the diagram, running on the Oracle Cloud
  • Each (relevant) DML operation is intercepted – and published as a Tweet (with a JSON payload that captures the essence of the DML action)
  • A Node.js application is registered as a Twitter consumer – consuming Tweets with the hashtag used in the DML Tweets; this application consumes the Tweet, interprets the JSON payload and replicates the data manipulation on a MongoDB database, running in the mLab cloud.
  • With all this in place, I can create Orders in the Oracle Database and I will see them replicated in the MongoDB database – through the tweets that relay the information. Note: by publishing my own Tweets with the hashtag and a meaningful JSON structure, I can also trigger the code that performs the manipulation of the MongoDB database.
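
The message format that this setup relies on (a hashtag followed by a JSON payload) can be sketched in a few lines. This is only an illustration: the function names, the sample order and the simple length check are assumptions of mine, and Twitter's own character counting is more subtle than a plain string length.

```javascript
// Sketch: package an order event as tweet text (hashtag + JSON payload)
// and unpack it again. All names here are made up for this illustration.
const HASHTAG = '#GroundbreakersTourOrderEvent';
const MAX_TWEET_LENGTH = 280; // limit for a standard tweet; plain length is an approximation

// Build the text of the tweet for one DML action.
function buildOrderTweet(order, hashtag = HASHTAG) {
  const text = hashtag + ' ' + JSON.stringify(order);
  if (text.length > MAX_TWEET_LENGTH) {
    throw new Error('Order event does not fit in a tweet: ' + text.length + ' characters');
  }
  return text;
}

// Recover the order from the tweet text on the consuming side.
// Assumes the tweet starts with the hashtag, as the tweets in this article do.
function parseOrderTweet(text, hashtag = HASHTAG) {
  return JSON.parse(text.substring(hashtag.length));
}

const sample = { orderId: 'XCVT-1', customerName: 'John Doe', status: 'NEW' };
const tweetText = buildOrderTweet(sample);
console.log(tweetText);
console.log(parseOrderTweet(tweetText));
```

The same format is what allows a hand-written tweet with the hashtag and a meaningful JSON structure to trigger the MongoDB manipulation.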

The relevant code for this article is found on GitHub: https://github.com/lucasjellema/groundbreaker-japac-tour-cqrs-via-twitter-and-event-hub . This includes DDL scripts for the Oracle Database, two Node.js applications, artifacts for containerizing the Node applications and for Kubernetalizing the resulting containers. The Node applications – either stand-alone or in a container or on a Kubernetes cluster – can run virtually anywhere, as long as the Oracle Database can make a call out to one of them (for example: if the Oracle Database is running on the cloud, then from that cloud you need to be able to invoke the endpoint for the Node application over HTTP).

I am assuming an Oracle Database is already running somewhere (in my case on the Oracle Cloud as DBaaS – see for example https://www.oracle.com/webfolder/technetwork/tutorials/obe/cloud/dbaas/obe_dbaas_QS/oracle_database_cloud_service_dbaas_quick_start.html on how to get started with Oracle DBaaS). I also assume a MongoDB database is available to me. One quick option for MongoDB is MLab (https://mlab.com/) – a MongoDB hosting service with a free tier up to 500 MB.

The two main steps that I will describe in this article:

  1. Publish Tweet for Database Manipulation – using a database trigger, a PL/SQL function that calls out via UTL_HTTP and a Node application that publishes a Tweet
  2. Consume Tweet and Update MongoDB Database – a Node application that registers as a Twitter client, subscribes to the specified hashtag and creates a document in a MongoDB collection for the JSON document in each Tweet it processes

1. Publish Tweet for Database Manipulation

In order to intercept changes to the orders table (DVX_ORDERS) in the Oracle Database and publish a tweet for each change, I can use various techniques:

  • Database Continuous Query Change Notification
  • Periodic Flashback Versions Query
  • Periodic LogMiner
  • Table trigger

In this article I will use the straightforward approach with a table trigger that fires for INSERT operations (to keep the example simple). I am well aware that the right thing to do in the trigger is to schedule a job with dbms_job or dbms_scheduler: the job is executed only once the change that fired the trigger has actually been committed, and the job then makes the HTTP call that informs the Node application of the database change. To further simplify this article, I will forgo the job and simply make the HTTP call while the trigger is firing. The consequence is that a tweet is published even if the transaction is subsequently rolled back.

In order for [PL/SQL program units owned by] a database user to make HTTP calls, they need to have been granted specific permissions – including permissions to call out to the specific endpoint. The statements to be executed as a DBA can be found in this file: https://github.com/lucasjellema/groundbreaker-japac-tour-cqrs-via-twitter-and-event-hub/blob/master/db-synch-orcl-2-mongodb-over-twitter-or-kafka/database/sys-prepare-ddl.sql , using the DBMS_NETWORK_ACL_ADMIN package.

The DDL statements that create the table, a trigger and a PL/SQL procedure to make the HTTP call are in this file: https://github.com/lucasjellema/groundbreaker-japac-tour-cqrs-via-twitter-and-event-hub/blob/master/db-synch-orcl-2-mongodb-over-twitter-or-kafka/database/oracle-ddl.sql.

The most interesting snippets are included below. Note: I had some difficulties with making a POST call and instead of resolving them I took the easy way out and used an elaborate GET request to make my intent known to the Node application.

CODE database DDL

create table dvx_orders
( id varchar2(200)
, status varchar2(100)
, customer_id varchar2(200)
, customer_name varchar2(200)
, shipping_destination varchar2(200)
, last_updated_timestamp timestamp default systimestamp
);



create or replace
procedure publish_order_event
( p_order_id in varchar2
, p_status in varchar2
, p_customer_id in varchar2
, p_customer_name in varchar2
, p_shipping_destination in varchar2
, p_last_update_timestamp in timestamp default systimestamp
) is
  url varchar2(4000) := 'http://your.host/order';
  urlparams varchar2(4000) := utl_url.escape('orderId='||p_order_id
  ||chr(38)||'customerName='||p_customer_name
  ||chr(38)||'status='||p_status
  ||chr(38)||'customerId='||p_customer_id
  ||chr(38)||'shippingDestination='||p_shipping_destination
  ||chr(38)||'timestamp='||to_char(p_last_update_timestamp,'YYYYMMDDHH24MISS')
  );
  l_result varchar2(4000);
begin
  l_result:= utl_http.request(url||'?'||urlparams);
end;

create or replace 
trigger order_event_reporter
after INSERT OR UPDATE ON dvx_orders
FOR EACH ROW
begin
  publish_order_event(
    p_order_id => :new.id
  , p_status => :new.status
  , p_customer_id => :new.customer_id
  , p_customer_name => :new.customer_name
  , p_shipping_destination => :new.shipping_destination
  , p_last_update_timestamp => systimestamp
);
end;
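
To make the "elaborate GET request" concrete, here is a small sketch of what a URL of the shape assembled by publish_order_event looks like once it is split into parameters on the Node side. The sample values are invented, and the helper queryToOrder is hypothetical; in the actual application Express does this work and exposes the result as req.query.

```javascript
// Sketch: how the query string assembled in PL/SQL can be read as an order object.
// The host name and all parameter values below are made up for this illustration.
const sampleUrl = 'http://your.host/order?orderId=XCVT-1&customerName=John%20Doe'
  + '&status=NEW&customerId=42&shippingDestination=Tokyo&timestamp=20181116093000';

// Turn the query string into a plain object, comparable to req.query in Express.
function queryToOrder(url) {
  const order = {};
  for (const [name, value] of new URL(url).searchParams) {
    order[name] = value; // every value arrives as a string
  }
  return order;
}

const order = queryToOrder(sampleUrl);
console.log(order);
```

Note that all values arrive as strings, including the customer id and the timestamp, so any type conversion is left to the consumer.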

The Node application is again a simplification – and not at all production ready code nor even a very efficient implementation. It will do the job. Not much more.

The Node application uses Express to handle HTTP requests. See file https://github.com/lucasjellema/groundbreaker-japac-tour-cqrs-via-twitter-and-event-hub/blob/master/db-synch-orcl-2-mongodb-over-twitter-or-kafka/index.js for the core module. The application handles GET requests on the /about and /order endpoints. The former returns a simple response which can easily be used as a health check. The /order endpoint takes the URL request parameters from the request and invokes a function either to tweet about the new order or to post a message to a Kafka Topic.

// listen to GET requests about new or changed orders - and turn them into a Tweet message (or a Kafka event)
var express = require('express') //npm install express
    , bodyParser = require('body-parser') // npm install body-parser
    , http = require('http')
    ;
var PORT = process.env.PORT || 3000;


const app = express()
    .use(bodyParser.urlencoded({ extended: true }))
    ;

const server = http.createServer(app);

const REQUIRED_ENVIRONMENT_SETTINGS = [
    {name:"PUBLISH_TO_KAFKA_YN" , message:"with either Y (publish event to Kafka) or N (publish to Twitter instead)"},
    {name:"KAFKA_SERVER" , message:"with the IP address of the Kafka Server to which the application should publish"},
    {name:"KAFKA_TOPIC" , message:"with the name of the Kafka Topic to which the application should publish"},
    {name:"TWITTER_CONSUMER_KEY" , message:"with the consumer key for a set of Twitter client credentials"},
    {name:"TWITTER_CONSUMER_SECRET" , message:"with the consumer secret for a set of Twitter client credentials"},
    {name:"TWITTER_ACCESS_TOKEN_KEY" , message:"with the access token key for a set of Twitter client credentials"},
    {name:"TWITTER_ACCESS_TOKEN_SECRET" , message:"with the access token secret for a set of Twitter client credentials"},
    {name:"TWITTER_HASHTAG" , message:"with the value for the twitter hashtag to use when publishing tweets"},
    ]

for(var env of REQUIRED_ENVIRONMENT_SETTINGS) {
  if (!process.env[env.name]) {
    console.error(`Environment variable ${env.name} should be set: ${env.message}`);  
  } else {
    // convenient for debug; however: this line exposes all environment variable values - including any secret values they may contain
    // console.log(`Environment variable ${env.name} is set to : ${process.env[env.name]}`);  
  }
}

server.listen(PORT, function listening() {
    console.log('Listening on %d', server.address().port);
});

app.get('/about', function (req, res) {
    var msg = req.body;
    console.log(msg)

    res.writeHead(200, { 'Content-Type': 'text/html' });
    res.write("About Tweeting REST API:");
    res.write("incoming headers" + JSON.stringify(req.headers));
    res.end();
});

console.log("Publishing to Kafka or Twitter?")
console.log("Environment Variable PUBLISH_TO_KAFKA_YN ="+process.env.PUBLISH_TO_KAFKA_YN)
var publishOrderSynchEventOverKafka = (process.env.PUBLISH_TO_KAFKA_YN||'N')=='Y';
console.log("publishOrderSynchEventOverKafka ="+publishOrderSynchEventOverKafka)
var twitterHashTag = process.env.TWITTER_HASHTAG || "#japacorderevent"
app.get('/order', function (req, res) {
    var customerName = req.query.customerName;
    var result = { "summary": "tremendous success!" }
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(result));

    if (!(publishOrderSynchEventOverKafka)) {
        // now tweet the order event
        var tweetMsg = twitterHashTag+ " " + JSON.stringify(req.query);
        tweet.postMessage(tweetMsg);
    } else {
        eventBusPublisher.publishEvent("NewJAPACOrderEvent", {
            "eventType": "NewJAPACOrderEvent"
            , "order": req.query
            , "module": "database-synchronizer"
            , "timestamp": Date.now()
        }, topicName);
    }
});

var eventBusPublisher = require("./EventPublisher.js");
// from the Oracle Event Hub - Platform Cluster Connect Descriptor
var topicName = process.env.KAFKA_TOPIC || "ordersTopic";
var tweet = require("./tweet");

Note how the Node application does not contain any hard coded environment references – such as the Twitter Hashtag, the Twitter Client App credentials or any of the Kafka configuration details. All these values are derived from environment variables – that we will be able to set when a container with this application inside it is run or from the Pod configuration when we run the container on Kubernetes.
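
The twitterconfig module that the tweet module depends on is not shown in this article. A minimal sketch of what such an environment-driven module could look like is given below; it assumes the environment variable names listed in REQUIRED_ENVIRONMENT_SETTINGS and the property names used by the tweet module, while the function loadTwitterConfig and the missing list are hypothetical. The actual file in the repository may well differ.

```javascript
// Hypothetical sketch of an environment-driven configuration module in the
// spirit of twitterconfig.js; not a copy of the file in the repository.
function loadTwitterConfig(env = process.env) {
  const config = {
    consumer_key: env.TWITTER_CONSUMER_KEY,
    consumer_secret: env.TWITTER_CONSUMER_SECRET,
    access_token_key: env.TWITTER_ACCESS_TOKEN_KEY,
    access_token_secret: env.TWITTER_ACCESS_TOKEN_SECRET
  };
  // Collect the names of the settings that were not provided.
  const missing = Object.keys(config).filter(name => !config[name]);
  return { twitterconfig: config, missing: missing };
}

// Dry run with an explicit object instead of the real process environment.
const { twitterconfig, missing } = loadTwitterConfig({
  TWITTER_CONSUMER_KEY: 'key',
  TWITTER_CONSUMER_SECRET: 'secret'
});
console.log('settings still missing:', missing);
```

Passing the environment in as a parameter keeps the sketch easy to try out without touching real credentials.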

The logic for publishing the tweet to Twitter is in the tweet module. The sources are found here: https://github.com/lucasjellema/groundbreaker-japac-tour-cqrs-via-twitter-and-event-hub/blob/master/db-synch-orcl-2-mongodb-over-twitter-or-kafka/tweet.js .

var tweet = module.exports;
var Twit = require('twit');

const { twitterconfig } = require('./twitterconfig');
var T = new Twit({
  consumer_key: twitterconfig.consumer_key,
  consumer_secret: twitterconfig.consumer_secret,
  access_token: twitterconfig.access_token_key,
  access_token_secret: twitterconfig.access_token_secret,
  timeout_ms: 60 * 1000,
});

// https://developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/post-statuses-update
tweet.postMessage = function (message) {
  T.post('statuses/update', { status: message }, function (err, data, response) {
    if (err) {
      console.log("Error in Tweeting " + err);
      return;
    }
    // log the tweet as it was accepted by Twitter
    console.log(data);
  });
};

At this point, any insert into the DVX_ORDERS table in the Oracle Database should result in a Tweet being published with a JSON representation of the database change.

2. Consume the Tweet and Turn it into a MongoDB Document Manipulation

Part two of the story is about consuming the Tweet and using it to replicate the database change to the MongoDB collection. We assume the MongoDB instance with a database that contains this collection is already present. What needs to be created is a Node application that registers as a listener on Twitter for tweets with the specific hashtag we have chosen for our CQRS implementation. It should consume the tweets, interpret their JSON contents and create a record in the MongoDB collection. Easy peasy.

The tweet.js module – https://github.com/lucasjellema/groundbreaker-japac-tour-cqrs-via-twitter-and-event-hub/blob/master/db-synch-to-mongodb-over-twitter-or-kafka/tweet.js – contains the code that registers the Twitter listener on a stream based on the designated hashtag (#GroundbreakersTourOrderEvent). When a tweet is received, the JSON content is extracted and parsed into a JavaScript object. Then the ordersAPI is invoked for the object.

var Twit = require('twit');
var ordersAPI = require('./orders-api');

const { twitterconfig } = require('./twitterconfig');
var T = new Twit({
  consumer_key: twitterconfig.consumer_key,
  consumer_secret: twitterconfig.consumer_secret,
  access_token: twitterconfig.access_token_key,
  access_token_secret: twitterconfig.access_token_secret,
  timeout_ms: 60 * 1000,
});

var twitterHashTag = process.env.TWITTER_HASHTAG||"#GroundbreakersTourOrderEvent"; // default: the hashtag designated for the order events

var tracks = { track: [twitterHashTag] };
let tweetStream = T.stream('statuses/filter', tracks)
tweetstream(tracks, tweetStream);

function tweetstream(hashtags, tweetStream) {
  console.log("Started tweet stream for hashtag " + JSON.stringify(hashtags));

  tweetStream.on('connected', function (response) {
    console.log("Stream connected to twitter for " + JSON.stringify(hashtags));
  })
  tweetStream.on('error', function (error) {
    console.log("Error in Stream for #" + JSON.stringify(hashtags) + " " + error);
  })
  tweetStream.on('tweet', function (tweet) {
    processTweetEvent(tweet);
  });
}


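function processTweetEvent(tweet) {
  // long tweets carry their complete text in extended_tweet.full_text; short tweets only have text
  var fullText = tweet.extended_tweet ? tweet.extended_tweet.full_text : tweet.text;
  // the tweet is expected to start with the hashtag, followed by the JSON payload
  var payload = fullText.substring(twitterHashTag.length);
  console.log("tweet text " + payload);
  var order;
  try {
    order = JSON.parse(payload);
  } catch (e) {
    console.log("Ignoring tweet without a valid JSON payload: " + e);
    return;
  }
  console.log("Order from Tweet is " + JSON.stringify(order));
  order.id = order.orderId;
  // orders-api reads the order from the order property of the object it receives
  ordersAPI.insertOrderIntoDatabase({ order: order });
}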

The OrdersAPI module (orders-api.js) receives the order object, connects to the MongoDB instance and database, inspects the current contents of the orders collection (not really necessary, only for demonstration purposes) and inserts the new order document.

var ordersAPI = module.exports;

var MongoClient = require('mongodb').MongoClient;

var mongodbHost =  process.env.MONGODB_HOST; 
var mongodbPort = process.env.MONGODB_PORT;
var authenticate = 'mongouser:mongopass@' // placeholder credentials; replace with your own
var mongodbDatabase = process.env.MONGODB_DATABASE||'world'; // name of your MongoDB database instance

var mongoDBUrl = 'mongodb://' + authenticate + mongodbHost + ':' + mongodbPort + '/' + mongodbDatabase;
var nameOfCollection ="orders"

ordersAPI.insertOrderIntoDatabase = function (orderEvent) {
    // accept either an event object that wraps the order or the order itself
    var order = orderEvent.order || orderEvent
    console.log(`Order received for processing: ${JSON.stringify(order)} into collection ${nameOfCollection}`)
    MongoClient.connect(mongoDBUrl, function (err, client) {
        if (err) {
            console.error("Problem with connecting to MongoDB " + err)
            return
        }
        var collection = client.db(mongodbDatabase).collection(nameOfCollection)
        // list all current orders (only for demonstration purposes)
        collection.find({}).toArray(function (err, result) {
            if (err) {
                console.error("Problem with reading current orders " + err)
            } else {
                console.log("current orders ", result)
            }
            // insert the new order document
            collection.insertMany([order], function (err, r) {
                if (err) {
                    console.error("Problem with inserting order " + err)
                } else {
                    console.log(r.insertedCount + " orders created into collection " + nameOfCollection)
                }
                client.close()
            })//insertMany
        })//find
    })//connect
}
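
The trigger shown earlier fires for UPDATE as well as INSERT, so events for the same order id can arrive more than once, and with insertMany each of them ends up as a separate document. One possible alternative, which is not what the code in this article does, is to write the order as an upsert. The sketch below assumes a collection object that offers updateOne(filter, update, options) the way the MongoDB Node.js driver does; the function upsertOrder and the stand-in collection are hypothetical and only serve as a dry run without a database.

```javascript
// Sketch: replicate an order as an upsert, so that INSERT and UPDATE events
// for the same order id end up in a single document.
function upsertOrder(collection, order) {
  return collection.updateOne(
    { id: order.id },   // match on the order id taken from the source table
    { $set: order },    // overwrite the fields carried by the event
    { upsert: true }    // create the document when it does not exist yet
  );
}

// Stand-in for a real collection (illustration only): records the calls it receives.
const fakeCollection = {
  calls: [],
  updateOne(filter, update, options) {
    this.calls.push({ filter, update, options });
    return Promise.resolve({ acknowledged: true });
  }
};

upsertOrder(fakeCollection, { id: 'XCVT-1', status: 'SHIPPED' });
console.log(JSON.stringify(fakeCollection.calls[0]));
```

With a real connection, the collection argument would be the object returned by the collection(...) call of the driver.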

Note how the configuration details for the Tweet Consumer (from twitterconfig.js and tweet.js) and for the MongoDB manipulation (in orders-api.js) are read from environment variables using process.env.<name of variable>, apart from the placeholder MongoDB credentials. These variables can be set in the start script in the package.json file, in the docker run -e name-of-variable=value-of-variable command used to run a container with the application inside, or in the deployment.yaml file used to deploy the application to Kubernetes.

And – action

image

To see the CQRS chain in action, perform an insert in the Oracle Database:

image

Check in Twitter for new tweets with the specified hashtag #GroundbreakersTourOrderEvent :

image

and inspect the contents of the MongoDB orders collection:

image

Clearly, using Twitter as our messaging infrastructure, we are able to implement inter-cloud CQRS from Oracle Database to MongoDB instance. No mean feat!

Resources

Source code for this article: https://github.com/lucasjellema/groundbreaker-japac-tour-cqrs-via-twitter-and-event-hub .

Step by step explanation on containerizing and Kubernetalizing a Node application: https://technology.amis.nl/2018/11/16/from-locally-running-node-application-to-cloud-based-kubernetes-deployment/ 

MLab (https://mlab.com/) – a MongoDB hosting service with a free tier up to 500 MB.

NPM module twit – for doing Twitter from Node: https://www.npmjs.com/package/twit

Twitter API Reference – https://developer.twitter.com/en/docs/api-reference-index.html

NPM module MongoDB: https://www.npmjs.com/package/mongodb

About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director and Oracle Developer Champion. Solution architect and developer in diverse areas including SQL, JavaScript, Kubernetes & Docker, Machine Learning, Java, SOA and microservices, events in various shapes and forms and many other things. Author of the Oracle Press book Oracle SOA Suite 12c Handbook. Frequent presenter at user groups, community events and conferences such as JavaOne, Oracle Code, CodeOne, NLJUG JFall and Oracle OpenWorld.
