Microservices and Updating Data Bound Context on Oracle Cloud with Application Container and Event Hub (plus DBaaS and MongoDB)–Part Two image 110

Microservices and Updating Data Bound Context on Oracle Cloud with Application Container and Event Hub (plus DBaaS and MongoDB)–Part Two

This article describes – in two installments – how events are used to communicate a change in a data record owned by the Customer microservice to consumers such as the Order microservice that has some details about the modified customer in its bound context. The first installment described the implementation of the Customer microservice – using MongoDB for its private data store and producing events to Event Hub cloud service to inform other microservices about updates in customer records. In the installment you are reading right now, the Order microservice is introduced – implemented in Node, running on Application Container Cloud, bound to Oracle Database in the cloud and consuming events from Event Hub. These events include the CustomerModified event published by the Customer microservice and used by the Order microservice to synchronize its bound context.

imageThe provisioning and configuration of the Oracle Public Cloud services used in this article is described in detail in this article: Prepare and link bind Oracle Database Cloud, Application Container Cloud, Application Container Cache and Event Hub.

The sources for this article are available on GitHub: https://github.com/lucasjellema/order-data-demo-devoxx .

The setup described in this article was used as a demonstration during my presentation on “50 Shades of Data” during Devoxx Morocco (14-16 November, Casablanca, Morocco); the slidedeck for this session is available here:

https://www.slideshare.net/lucasjellema/50-shades-of-data-how-when-and-why-bigrelationalnosqlelasticeventcqrs-devoxx-maroc-november-2017-including-detailed-demo-screenshots

The Order microservice

The Order microservice is implemented in Node and deployed on Oracle Application Container cloud. It has service bindings to Database Cloud (for its private data store with Orders and associated data bound context) and Event Hub (for consuming events such as the CustomerModified event).image

The Node runtime provided by Application Container Cloud included an Oracle Database Client and the Oracle DB driver for Node. This means that connecting to and interacting with an Oracle Database is done very easily.

The Orders microservice supports the REST call GET /order-api/orders which returns a JSON document with all customers:

image

The implementation of this functionality is straightforward Node, Express and Oracle Database driver for Node:

CODE FOR RETRIEVE ORDERS

var express = require('express')
  , http = require('http');

var bodyParser = require('body-parser') // npm install body-parser
var ordersAPI = require( "./orders-api.js" );

var app = express();
var server = http.createServer(app);

var PORT = process.env.PORT || 3000;
var APP_VERSION = '0.0.4.06';

var allowCrossDomain = function(req, res, next) {
    res.header('Access-Control-Allow-Origin', '*');
    res.header('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
    res.header('Access-Control-Allow-Headers', 'Content-Type');
    res.header('Access-Control-Allow-Credentials', true); 
    next();
}

server.listen(PORT, function () {
  console.log('Server running, version '+APP_VERSION+', Express is listening... at '+PORT+" for Orders Data API");
});

app.use(bodyParser.json()); // for parsing application/json
app.use(allowCrossDomain);

ordersAPI.registerListeners(app);

And the OrdersAPI:

var oracledb = require('oracledb');

var ordersAPI = module.exports;
var apiURL = "/order-api";

ordersAPI.registerListeners =
  function (app) {
    app.get(apiURL + '/orders', function (req, res) {
      handleGetOrders(req, res);
    });
  }//registerListeners

handleGetOrders = function (req, res) {
  getOrdersFromDBTable(req, res);
}

transformOrders = function (orders) {
  return orders.map(function (o) {
    var order = {};
    order.id = o[0];
    order.customer_id = o[1];
    order.customer_name = o[2];
    order.status = o[3];
    order.shipping_destination = o[4];
    return order;
  })
}


getOrdersFromDBTable = function (req, res) {
  handleDatabaseOperation(req, res, function (request, response, connection) {
    var selectStatement = "select id, customer_id, customer_name, status , shipping_destination from dvx_orders order by last_updated_timestamp";
    connection.execute(selectStatement, {}
      , function (err, result) {
        if (err) {
          return cb(err, conn);
        } else {
          try {
            var orders = result.rows;
            orders = transformOrders(orders);
            response.writeHead(200, { 'Content-Type': 'application/json' });
            response.end(JSON.stringify(orders));
          } catch (e) {
            console.error("Exception in callback from execute " + e)
          }
        }
      });
  })
}//getOrdersFromDBTable


function handleDatabaseOperation(request, response, callback) {
  var connectString = process.env.DBAAS_DEFAULT_CONNECT_DESCRIPTOR;
  oracledb.getConnection(
    {
      user:  process.env.DBAAS_USER_NAME,
      password: process.env.DBAAS_USER_PASSWORD ,
      connectString: connectString
    },
    function (err, connection) {
      if (err) {
        console.log('Error in acquiring connection ...');
        console.log('Error message ' + err.message);
        return;
      }
      // do with the connection whatever was supposed to be done
      console.log('Connection acquired ; go execute - call callback ');
      callback(request, response, connection);
    });
}//handleDatabaseOperation


function doRelease(connection) {
  connection.release(
    function (err) {
      if (err) {
        console.error(err.message);
      }
    });
}

function doClose(connection, resultSet) {
  resultSet.close(
    function (err) {
      if (err) { console.error(err.message); }
      doRelease(connection);
    });
}

Creating new orders is supported through POST operation on the REST API exposed by the Order microservice:

image


The implementation in the Node application is fairly straightforward – see below:

CODE FOR CREATING ORDERS – added to the OrdersAPI module:

var  eventBusPublisher = require("./EventPublisher.js");

ordersAPI.registerListeners =
  function (app) {
    app.get(apiURL + '/orders', function (req, res) {
      handleGetOrders(req, res);
    });
    app.post(apiURL + '/*', function (req, res) {
      handlePost(req, res);
    });
  }//registerListeners

handlePost =
  function (req, res) {
    if (req.url.indexOf('/rest/') > -1) {
      ordersAPI.handleGet(req, res);
    } else {
      var orderId = uuidv4();
      var order = req.body;
      order.id = orderId;
      order.status = "PENDING";
      insertOrderIntoDatabase(order, req, res,
        function (request, response, order, rslt) {

          eventBusPublisher.publishEvent("NewOrderEvent", {
            "eventType": "NewOrder"
            ,"order": order
            , "module": "order.microservice"
            , "timestamp": Date.now()
          }, topicName);

          var result = {
            "description": `Order has been creatd with id=${order.id}`
            , "details": "Published event = not yet created in Database " + JSON.stringify(order)
          }
          response.writeHead(200, { 'Content-Type': 'application/json' });
          response.end(JSON.stringify(result));

        });//insertOrderIntoDatabase
    }
  }//ordersAPI.handlePost



// produce unique identifier
function uuidv4() {
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
    var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
    return v.toString(16);
  });
}	


The function handlePost() makes a call to the module EventBusPublisher, to publish a NewOrder event on the Event Hub. The code for this module is shown below:

var kafka = require('kafka-node');

var kafkaConnectDescriptor = "129.xx.yy.zz";

var Producer = kafka.Producer
KeyedMessage = kafka.KeyedMessage;

var client;

var APP_VERSION = "0.8.3"
var APP_NAME = "EventBusPublisher"

var producer;
var client;

function initializeKafkaProducer(attempt) {
  try {
    client = new kafka.Client(kafkaConnectDescriptor);
    producer = new Producer(client);
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      console.log("failed to create the client or the producer " + JSON.stringify(err));
    })
  }
  catch (e) {
    console.log("Exception in initializeKafkaProducer" + e);
    console.log("Exception in initializeKafkaProducer" + JSON.stringify(e));
    console.log("Try again in 5 seconds");
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}//initializeKafkaProducer
initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event, topic) {
  km = new KeyedMessage(eventKey, JSON.stringify(event));
  payloads = [
    { topic: topic, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(err));
    }
    console.log("Published event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(data));
  });
}

Updating the bound Context

Suppose the details for Customer with identifier 25 are going to be updated, through the Customer microservice. That would mean that the customer record in the MongoDB database will be updated. However, that is not the only place where information about the Customer is recorded. Because we want our microservice to be independent and we can only properly work with the Order microservice if we have some information about elements associated with the order – such as the customer name and some details about each of the products – we have defined the data bound context of the Order microservice to include the customer name. As you can see in the next screenshot, we have an order record in the data store of the Order microservice for customer 25 and it contains the name of the customer.

image

That means that when the Customer microservice records a change in the name of the customer, we should somehow update the bound context of the Order microservice. And that is what we will do using the CustomerModified event, produced by the Customer microservice and consumed by the Order microservice.

image

The REST call to update the name of customer 25 – from Joachim to William:

image

The customer record in MongoDB is updated

image

and subsequently a CustomerModified event is produced to the devoxx-topic on Event Hub:

image

This event is consumed by the Order microservice and subsequently it triggers an update of the DVX_ORDERS table in the Oracle Database cloud instance. The code responsible for consuming the event and updating the database is shown below:

CODE FOR CONSUME EVENT – first the EventBusListener module

var kafka = require('kafka-node');

var client;

var APP_VERSION = "0.1.2"
var APP_NAME = "EventBusListener"

var eventListenerAPI = module.exports;

var kafka = require('kafka-node')
var Consumer = kafka.Consumer

var subscribers = [];

eventListenerAPI.subscribeToEvents = function (callback) {
  subscribers.push(callback);
}

var topicName = "a516817-devoxx-topic";
var KAFKA_ZK_SERVER_PORT = 2181;
var EVENT_HUB_PUBLIC_IP = '129.xx.yy.zz';

var consumerOptions = {
    host: EVENT_HUB_PUBLIC_IP + ':' + KAFKA_ZK_SERVER_PORT,
    groupId: 'consume-order-events-for-devoxx-app',
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
};

var topics = [topicName];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumer1' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);



function onMessage(message) {
    subscribers.forEach((subscriber) => {
        subscriber(message.value);
    })
}

function onError(error) {
    console.error(error);
    console.error(error.stack);
}

process.once('SIGINT', function () {
    async.each([consumerGroup], function (consumer, callback) {
        consumer.close(true, callback);
    });
});

And the code in module OrdersAPI that imports the module, registers the event listener and handles the event:

var eventBusListener = require("./EventListener.js");

eventBusListener.subscribeToEvents(
  (message) => {
    console.log("Received event from event hub");
    try {
    var event = JSON.parse(message);
    if (event.eventType=="CustomerModified") {
      console.log(`Details for a customer have been modified and the bounded context for order should be updated accordingly ${event.customer.id}`);
      updateCustomerDetailsInOrders( event.customer.id, event.customer)
    }
    } catch (err) {
      console.log("Parsing event failed "+err);
    }
  }
);

function updateCustomerDetailsInOrders( customerId, customer) {
  console.log(`All orders for cusyomer ${customerId} will  be  updated to new customer name ${customer.name} `);
  console.log('updateCustomerDetailsInOrders');
  handleDatabaseOperation("req", "res", function (request, response, connection) {
    var bindvars = [customer.name, customerId];
    var updateStatement = `update dvx_orders set customer_name = :customerName where customer_id = :customerId` ;
    connection.execute(updateStatement, bindvars, function (err, result) {
      if (err) {
        console.error('error in updateCustomerDetailsInOrders ' + err.message);
        doRelease(connection);
        callback(request, response, order, { "summary": "Update failed", "error": err.message, "details": err });
      }
      else {
        connection.commit(function (error) {
          if (error) console.log(`After commit - error = ${error}`);
          doRelease(connection);
          // there is no callback:  callback(request, response, order, { "summary": "Update Status succeeded", "details": result });
        });
      }//else
    }); //callback for handleDatabaseOperation
  });//handleDatabaseOperation 
}// updateCustomerDetailsInOrders}


When we check the current set of Orders, we will find that the customer name associated with the order(s) for customer 25 have now William as the customer_name, instead of Joachim or Jochem.

imageWe can check directly in the Oracle Database Table DVX_ORDERS to find the customer name updated for both orders for customer 25:

image