Microservices and Updating Data Bound Context on Oracle Cloud with Application Container and Event Hub (plus DBaaS and MongoDB)–Part One

This article describes – in two installments – how events are used to communicate a change in a data record owned by the Customer microservice to consumers such as the Order microservice that has some details about the modified customer in its bound context.

The microservices are implemented using Node. The Customer microservice uses a cloud based MongoDB instance as its data store. The Order microservice runs on Oracle Application Container Cloud and has a service binding to an Oracle DBaaS (aka Database Cloud) instance. Oracle Event Hub Cloud provides a Kafka Topic that microservices on the Oracle Cloud, as well as anywhere else, can produce events to and consume events from. The Event Hub is used to communicate events that describe changes in the data owned by each of the microservices, allowing other microservices to update their bound context.
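To make the event flow concrete before any cloud service is involved, here is a minimal in-process sketch. It assumes a Node EventEmitter as a stand-in for the Kafka Topic, and the cache object, function name and customer data are hypothetical; the event shape mirrors the CustomerModified event used later in this article.

```javascript
// An EventEmitter stands in for the Kafka Topic on Event Hub (illustration only).
const { EventEmitter } = require('events');

const topic = new EventEmitter();

// Order side: a local copy of the customer details it needs (hypothetical data).
const orderCustomerCache = { '25': { id: '25', name: 'Jane Doe' } };

topic.on('CustomerModified', function (event) {
  const known = orderCustomerCache[event.customer.id];
  if (known) {
    // only customers already present in the Order context are updated
    Object.assign(known, event.customer);
  }
});

// Customer side: after changing its own record, publish the change.
function publishCustomerModified(customer) {
  topic.emit('CustomerModified', {
    eventType: 'CustomerModified',
    customer: customer,
    module: 'customer.microservice',
    timestamp: Date.now()
  });
}

publishCustomerModified({ id: '25', name: 'Jane Smith' });
console.log(orderCustomerCache['25'].name);
```

With a real topic the handler would run in a different process and with some delay, so the Order side is only eventually consistent with the Customer side.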

The provisioning and configuration of the Oracle Public Cloud services used in this article are described in detail in this article: Prepare and link bind Oracle Database Cloud, Application Container Cloud, Application Container Cache and Event Hub.

The sources for this article are available on GitHub: https://github.com/lucasjellema/order-data-demo-devoxx .

The setup described in this article was used as a demonstration during my presentation on “50 Shades of Data” during Devoxx Morocco (14-16 November, Casablanca, Morocco); the slidedeck for this session is available here:

https://www.slideshare.net/lucasjellema/50-shades-of-data-how-when-and-why-bigrelationalnosqlelasticeventcqrs-devoxx-maroc-november-2017-including-detailed-demo-screenshots

 

The Customer Microservice

The Customer microservice is implemented in Node (JS) and uses a MongoDB instance (a free cloud based instance on MLab) as its private data store. It runs locally and engages with Oracle Event Hub to produce a CustomerModified event when a customer record changes. The Customer microservice exposes a REST API with a single resource (customer) and operations to retrieve, list, create and update customer(s).

The MongoDB database was created on MLab (https://mlab.com/), a MongoDB hosting service with a free tier up to 500 MB. I created a database called world and in it prepared a collection called customers.

The customers can be listed through REST API calls to the customer microservice, and new customers can be created through the API, which of course results in a new record in MongoDB.

Clearly the customer microservice has associated state (in the MongoDB database) but is itself stateless. It can be stopped and restarted and it will still be able to produce customer records. Multiple instances of the microservice could be running and they would each have access to the same data. There could be some concurrency conflicts that we currently do not really cater for.
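One common way to deal with such concurrency conflicts is optimistic concurrency: every record carries a version number and an update only succeeds if the caller has seen the latest version. The sketch below is not part of the demo code; it uses an in-memory Map as a stand-in for the customers collection, and the version field and function name are assumptions.

```javascript
// In-memory stand-in for the customers collection (hypothetical data).
const customerStore = new Map([
  ['42', { id: '42', name: 'Jane Doe', version: 1 }]
]);

// Apply the changes only if the caller saw the latest version of the record.
function updateIfUnchanged(id, expectedVersion, changes) {
  const current = customerStore.get(id);
  if (!current || current.version !== expectedVersion) {
    return { updated: false, current: current };
  }
  const next = Object.assign({}, current, changes, { version: current.version + 1 });
  customerStore.set(id, next);
  return { updated: true, current: next };
}

// Two callers both read version 1; only the first update is accepted.
const first = updateIfUnchanged('42', 1, { name: 'Jane Smith' });
const stale = updateIfUnchanged('42', 1, { name: 'Jane Jones' });
console.log(first.updated, stale.updated, customerStore.get('42').name);
```

Against MongoDB the same idea would typically be expressed by adding the expected version to the filter of the update, so that a stale update matches no document.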

The salient code for implementing the REST operation for retrieving the customers is the following:

var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');

var mongodbHost = 'ds139719.mlab.com';
var mongodbPort = '39791';
var authenticate = 'mongousername:mongopassword@'
var mongodbDatabase = 'world';

var mongoDBUrl = 'mongodb://' + authenticate + mongodbHost + ':' + mongodbPort + '/' + mongodbDatabase;

var http = require('http'),
	express = require('express'),
	bodyParser = require('body-parser'); // npm install body-parser

var moduleName = "customer-ms";
var PORT = process.env.PORT || 5118;
var appVersion = "1.0.2";

var app = express();
var server = http.createServer(app);

server.listen(PORT, function () {
	console.log('Server running, version ' + appVersion + ', Express is listening... at ' + PORT + " for Customer Microservice");
});

app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json({ "type": '*/*', "inflate": "true" }));

app.use(function (request, response, next) {
	response.setHeader('Access-Control-Allow-Origin', '*');
	response.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, PATCH, DELETE');
	response.setHeader('Access-Control-Allow-Headers', 'X-Requested-With,content-type');
	response.setHeader('Access-Control-Allow-Credentials', true);
	next();
});

app.get('/customer', function (req, res) {
	// find customers in database
	MongoClient.connect(mongoDBUrl, function (err, db) {
		if (err) {
			console.log(err);
			return res.status(500).send({ "error": "could not connect to the database" });
		}
		var nameOfCollection = "customers";
		// retrieve all documents in the collection as an array
		db.collection(nameOfCollection).find({}).toArray(function (err, cmrs) {
			db.close();
			if (err) {
				console.log(err);
				return res.status(500).send({ "error": "could not retrieve the customers" });
			}
			res.statusCode = 200;
			res.setHeader('Content-Type', 'application/json');
			res.setHeader('MyReply', 'retrieved all customers');
			res.send({ "customers": cmrs });
		});
	}); //connect
});

Pretty straightforward code for setting up an Express based listener for GET requests at the specified port and url path /customer. When the request comes in, a connection is initialized to the MongoDB instance, all elements from the customers collection are retrieved and returned in a single JSON document.

Here is an example of a call to this operation in the REST API from Postman:

[Screenshot: Postman call to the /customer operation of the REST API]
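The retrieval snippet composes the MongoDB URL from hard coded values, including the credentials. A possible alternative, sketched here under assumptions, is to read the settings from environment variables and URL-encode the credentials. The function name buildMongoUrl and the environment variable names are hypothetical and do not appear in the demo sources.

```javascript
// Builds a MongoDB connection string from settings. Credentials are
// URL-encoded so that characters such as '@' or ':' do not break the URL.
function buildMongoUrl(settings) {
  const credentials = settings.user
    ? encodeURIComponent(settings.user) + ':' +
      encodeURIComponent(settings.password || '') + '@'
    : '';
  return 'mongodb://' + credentials + settings.host + ':' + settings.port +
    '/' + settings.database;
}

// The environment variable names and the fallback values are assumptions.
const mongoDBUrlFromEnv = buildMongoUrl({
  user: process.env.MONGODB_USER,
  password: process.env.MONGODB_PASSWORD,
  host: process.env.MONGODB_HOST || 'localhost',
  port: process.env.MONGODB_PORT || '27017',
  database: process.env.MONGODB_DATABASE || 'world'
});

// Log the URL with the credentials masked.
console.log(mongoDBUrlFromEnv.replace(/\/\/.*@/, '//***@'));
```

This keeps the username and password out of the source code and out of the GitHub repository.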

 

Inserting a new customer requires just a small additional method in the Node application, reacting to PUT requests:

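app.put('/customer', function (req, res) {
	var customer = req.body;
	MongoClient.connect(mongoDBUrl, function (err, db) {
		if (err) {
			console.log(err);
			return res.status(500).send({ "error": "could not connect to the database" });
		}
		var nameOfCollection = "customers";
		db.collection(nameOfCollection).insertMany([customer], function (err, r) {
			db.close();
			if (err) {
				console.log(err);
				return res.status(500).send({ "error": "could not create the customer" });
			}
			res.statusCode = 200;
			res.setHeader('Content-Type', 'application/json');
			res.setHeader('MyReply', 'Created the Customer');
			res.send(customer);
		}); //insertMany
	}); //connect
});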
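The PUT handler stores whatever JSON document it receives. A small check before the insert could reject obviously incomplete payloads. The sketch below is an illustration only: the function name validateCustomer and the choice of id and name as required fields are assumptions, not something the demo code prescribes.

```javascript
// Returns a list of problems; an empty list means the payload is acceptable.
// The required fields (id, name) are assumptions for illustration.
function validateCustomer(customer) {
  if (customer === null || typeof customer !== 'object' || Array.isArray(customer)) {
    return ['body must be a JSON object'];
  }
  const problems = [];
  if (typeof customer.id !== 'string' || customer.id.trim() === '') {
    problems.push('id must be a non-empty string');
  }
  if (typeof customer.name !== 'string' || customer.name.trim() === '') {
    problems.push('name must be a non-empty string');
  }
  return problems;
}

console.log(validateCustomer({ id: '42', name: 'Jane Doe' }));
console.log(validateCustomer({ name: '' }));
```

In the handler, a non-empty result could be answered with HTTP status 400 and the list of problems instead of calling insertMany.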

Updating an existing customer, for example to handle a name change as we will in just a moment, is very similar and is triggered by POST requests:

var eventBusPublisher = require("./EventPublisher.js");
// Assumption: the original snippet uses topicName without defining it;
// the value is the topic named later in this article, adjust it to your Event Hub topic
var topicName = "devoxx-topic";

app.post('/customer/:customerId', function (req, res) {
    var customerId = req.params['customerId'];
    var customer = req.body;
    customer.id = customerId;
    // find customer in database and update
    MongoClient.connect(mongoDBUrl, function (err, db) {
        if (err) {
            console.log(err);
            return res.status(500).send({ "error": "could not connect to the database" });
        }
        var nameOfCollection = "customers";
        db.collection(nameOfCollection).findAndModify(
            { "id": customerId }
            , [['_id', 'asc']]  // sort order
            , { $set: customer }
            , {}
            , function (err, updatedCustomer) {
                db.close();
                if (err) {
                    console.log(err);
                    // without a response the client would wait until it times out
                    res.status(500).send({ "error": "could not update the customer" });
                } else {
                    console.log("Customer updated :" + JSON.stringify(updatedCustomer));
                    // Now publish an event of type CustomerModified on Event Hub Cloud Service
                    eventBusPublisher.publishEvent("CustomerModified", {
                        "eventType": "CustomerModified"
                        , "customer": customer
                        , "module": "customer.microservice"
                        , "timestamp": Date.now()
                    }, topicName);
                    // and compose the HTTP Response
                    res.statusCode = 200;
                    res.setHeader('Content-Type', 'application/json');
                    res.setHeader('MyReply', 'Updated the Customer and published event on Event Hub - with  id -  ' + customerId);
                    res.send(customer);
                }
            });
    }); //connect
});

There is something different and important about this snippet. After the MongoDB findAndModify operation returns, a call is made to a local module EventPublisher. This module handles the communication to the Event Hub [Cloud Service]. The salient code in this module is as follows:

var kafka = require('kafka-node');
// from the Oracle Event Hub - Platform Cluster Connect Descriptor
var kafkaConnectDescriptor = "129.xx.yy.zz";

var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;

var APP_VERSION = "0.8.3";
var APP_NAME = "EventBusPublisher";
console.log("Initialized module " + APP_NAME + " version " + APP_VERSION);
var producer;
var client;

function initializeKafkaProducer(attempt) {
  try {
    console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`);
    client = new kafka.Client(kafkaConnectDescriptor);
    producer = new Producer(client);
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      console.log("failed to create the client or the producer " + JSON.stringify(err));
    })
  }
  catch (e) {
    console.log("Exception in initializeKafkaProducer" + JSON.stringify(e));
    // try again in 5 secs
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}//initializeKafkaProducer
initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event, topic) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  var payloads = [
    { topic: topic, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(err));
      return; // do not report success after a failure
    }
    console.log("Published event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(data));
  });
}; // publishEvent
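The publishEvent function hands the message to the producer right away, even if the producer has not yet reported that it is ready, in which case the send may not succeed. One possible way around this, sketched here with hypothetical names and a plain function standing in for the call to producer.send, is to queue events until the producer is ready and flush them afterwards.

```javascript
// Wraps a send function so that events published before the producer is ready
// are queued and flushed later. `send` is a stand-in for a call such as
// producer.send(...); all names in this sketch are hypothetical.
function createBufferedPublisher(send) {
  let ready = false;
  const pending = [];
  return {
    publish: function (eventKey, event, topic) {
      const message = { key: eventKey, value: JSON.stringify(event), topic: topic };
      if (ready) {
        send(message);
      } else {
        pending.push(message);
      }
    },
    // to be called from the producer's 'ready' handler
    markReady: function () {
      ready = true;
      while (pending.length > 0) {
        send(pending.shift());
      }
    },
    pendingCount: function () { return pending.length; }
  };
}

// Usage with a fake send function that records what was sent.
const sent = [];
const publisher = createBufferedPublisher(function (message) { sent.push(message); });
publisher.publish('CustomerModified', { eventType: 'CustomerModified' }, 'devoxx-topic');
const queuedBeforeReady = publisher.pendingCount();
publisher.markReady();
console.log(queuedBeforeReady, sent.length, publisher.pendingCount());
```

The queue lives in memory only, so events that are still pending when the process stops are lost; whether that is acceptable depends on how critical the events are.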

 

At this point, the following situation has been implemented: we can update the name of a customer through a call to the REST API of the Customer microservice. The customer record is updated in the MongoDB database and a CustomerModified event is published to the Event Hub's devoxx-topic topic.

In the next installment, we will implement the Order microservice that runs on Oracle Application Container Cloud, uses a cloud database (DBaaS instance) and consumes CustomerModified events from Event Hub.