This article describes – in two installments – how events are used to communicate a change in a data record owned by the Customer microservice to consumers such as the Order microservice, which holds some details about the modified customer in its own bounded context. The first installment described the implementation of the Customer microservice – using MongoDB for its private data store and producing events to the Event Hub cloud service to inform other microservices about updates in customer records. In the installment you are reading right now, the Order microservice is introduced – implemented in Node, running on Application Container Cloud, bound to Oracle Database Cloud and consuming events from Event Hub. These events include the CustomerModified event published by the Customer microservice, which the Order microservice uses to synchronize its bounded context.
The provisioning and configuration of the Oracle Public Cloud services used in this article are described in detail in this article: Prepare and link bind Oracle Database Cloud, Application Container Cloud, Application Container Cache and Event Hub.
The sources for this article are available on GitHub: https://github.com/lucasjellema/order-data-demo-devoxx .
The setup described in this article was used as a demonstration during my presentation on “50 Shades of Data” during Devoxx Morocco (14-16 November, Casablanca, Morocco); the slidedeck for this session is available here:
The Order microservice
The Order microservice is implemented in Node and deployed on Oracle Application Container Cloud. It has service bindings to Database Cloud (for its private data store that holds the orders and the associated bounded context data) and to Event Hub (for consuming events such as the CustomerModified event).
The Node runtime provided by Application Container Cloud includes an Oracle Database client and the Oracle Database driver for Node (node-oracledb). This means that connecting to and interacting with an Oracle Database can be done very easily.
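The service binding to Database Cloud makes the connection details available to the application as environment variables (the same DBAAS_* variables are used in the OrdersAPI module later in this article). As a minimal sketch – assuming these variables are populated through the service binding – acquiring a connection and running a query looks like this:

var oracledb = require('oracledb');

// the service binding to Database Cloud populates these environment variables
oracledb.getConnection(
  {
    user: process.env.DBAAS_USER_NAME,
    password: process.env.DBAAS_USER_PASSWORD,
    connectString: process.env.DBAAS_DEFAULT_CONNECT_DESCRIPTOR
  },
  function (err, connection) {
    if (err) { console.error('Failed to acquire connection: ' + err.message); return; }
    connection.execute("select sysdate from dual", {}, function (err, result) {
      if (err) { console.error(err.message); }
      else { console.log('Database time: ' + result.rows[0][0]); }
      connection.release(function (err) { if (err) { console.error(err.message); } });
    });
  });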
The Order microservice supports the REST call GET /order-api/orders, which returns a JSON document with all orders:
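The response of course depends on the contents of the DVX_ORDERS table; an illustrative response, using the fields the service returns (the values below are made up), could look like this:

[
  {
    "id": "2f6a0a77-8b5c-4d6e-9c2a-1e3f5b7d9a01",
    "customer_id": "25",
    "customer_name": "Joachim",
    "status": "PENDING",
    "shipping_destination": "Amsterdam"
  }
]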
The implementation of this functionality is straightforward Node, using Express and the Oracle Database driver for Node:
CODE FOR RETRIEVING ORDERS
var express = require('express')
  , http = require('http');
var bodyParser = require('body-parser'); // npm install body-parser
var ordersAPI = require("./orders-api.js");

var app = express();
var server = http.createServer(app);
var PORT = process.env.PORT || 3000;
var APP_VERSION = '0.0.4.06';

var allowCrossDomain = function (req, res, next) {
  res.header('Access-Control-Allow-Origin', '*');
  res.header('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
  res.header('Access-Control-Allow-Headers', 'Content-Type');
  res.header('Access-Control-Allow-Credentials', true);
  next();
}

server.listen(PORT, function () {
  console.log('Server running, version ' + APP_VERSION + ', Express is listening... at ' + PORT + " for Orders Data API");
});

app.use(bodyParser.json()); // for parsing application/json
app.use(allowCrossDomain);
ordersAPI.registerListeners(app);
And the OrdersAPI:
var oracledb = require('oracledb');
var ordersAPI = module.exports;
var apiURL = "/order-api";

ordersAPI.registerListeners = function (app) {
  app.get(apiURL + '/orders', function (req, res) { handleGetOrders(req, res); });
}//registerListeners

handleGetOrders = function (req, res) {
  getOrdersFromDBTable(req, res);
}

transformOrders = function (orders) {
  return orders.map(function (o) {
    var order = {};
    order.id = o[0];
    order.customer_id = o[1];
    order.customer_name = o[2];
    order.status = o[3];
    order.shipping_destination = o[4];
    return order;
  })
}

getOrdersFromDBTable = function (req, res) {
  handleDatabaseOperation(req, res, function (request, response, connection) {
    var selectStatement = "select id, customer_id, customer_name, status, shipping_destination from dvx_orders order by last_updated_timestamp";
    connection.execute(selectStatement, {}, function (err, result) {
      if (err) {
        // report the error to the caller and release the connection
        console.error('error in getOrdersFromDBTable ' + err.message);
        response.writeHead(500, { 'Content-Type': 'application/json' });
        response.end(JSON.stringify({ "error": err.message }));
        doRelease(connection);
      } else {
        try {
          var orders = result.rows;
          orders = transformOrders(orders);
          response.writeHead(200, { 'Content-Type': 'application/json' });
          response.end(JSON.stringify(orders));
        } catch (e) {
          console.error("Exception in callback from execute " + e);
        }
        doRelease(connection);
      }
    });
  })
}//getOrdersFromDBTable

function handleDatabaseOperation(request, response, callback) {
  var connectString = process.env.DBAAS_DEFAULT_CONNECT_DESCRIPTOR;
  oracledb.getConnection(
    {
      user: process.env.DBAAS_USER_NAME,
      password: process.env.DBAAS_USER_PASSWORD,
      connectString: connectString
    },
    function (err, connection) {
      if (err) {
        console.log('Error in acquiring connection ...');
        console.log('Error message ' + err.message);
        return;
      }
      // do with the connection whatever was supposed to be done
      console.log('Connection acquired; go execute - call callback');
      callback(request, response, connection);
    });
}//handleDatabaseOperation

function doRelease(connection) {
  connection.release(
    function (err) {
      if (err) { console.error(err.message); }
    });
}

function doClose(connection, resultSet) {
  resultSet.close(
    function (err) {
      if (err) { console.error(err.message); }
      doRelease(connection);
    });
}
Creating new orders is supported through a POST operation on the REST API exposed by the Order microservice:
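The exact request payload is not listed in this article; based on the columns of the DVX_ORDERS table and on what handlePost() adds itself, a request body could look roughly like this (the field names and values are assumptions; id and status are assigned by the service):

{
  "customer_id": "25",
  "customer_name": "Joachim",
  "shipping_destination": "Amsterdam"
}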
The implementation in the Node application is fairly straightforward – see below:
CODE FOR CREATING ORDERS – added to the OrdersAPI module:
var eventBusPublisher = require("./EventPublisher.js");
var topicName = "a516817-devoxx-topic"; // Event Hub topic to publish to (value taken from the EventBusListener module later in this article)

ordersAPI.registerListeners = function (app) {
  app.get(apiURL + '/orders', function (req, res) { handleGetOrders(req, res); });
  app.post(apiURL + '/*', function (req, res) { handlePost(req, res); });
}//registerListeners

handlePost = function (req, res) {
  if (req.url.indexOf('/rest/') > -1) {
    ordersAPI.handleGet(req, res);
  } else {
    var orderId = uuidv4();
    var order = req.body;
    order.id = orderId;
    order.status = "PENDING";
    insertOrderIntoDatabase(order, req, res, function (request, response, order, rslt) {
      eventBusPublisher.publishEvent("NewOrderEvent", {
        "eventType": "NewOrder"
        , "order": order
        , "module": "order.microservice"
        , "timestamp": Date.now()
      }, topicName);
      var result = {
        "description": `Order has been created with id=${order.id}`
        , "details": "Published event = not yet created in Database " + JSON.stringify(order)
      }
      response.writeHead(200, { 'Content-Type': 'application/json' });
      response.end(JSON.stringify(result));
    });//insertOrderIntoDatabase
  }
}//ordersAPI.handlePost

// produce unique identifier
function uuidv4() {
  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
    var r = Math.random() * 16 | 0, v = c == 'x' ? r : (r & 0x3 | 0x8);
    return v.toString(16);
  });
}
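The function insertOrderIntoDatabase() that handlePost() invokes is not listed above. A minimal sketch of what such a function could look like – an assumption that reuses the handleDatabaseOperation() and doRelease() helpers and the DVX_ORDERS columns shown earlier, not the actual implementation – is:

// sketch of insertOrderIntoDatabase (assumption), following the same pattern as the
// other database operations in the OrdersAPI module
function insertOrderIntoDatabase(order, req, res, callback) {
  handleDatabaseOperation(req, res, function (request, response, connection) {
    var insertStatement =
      `insert into dvx_orders (id, customer_id, customer_name, status, shipping_destination)
       values (:id, :customerId, :customerName, :status, :shippingDestination)`;
    var bindvars = [order.id, order.customer_id, order.customer_name, order.status, order.shipping_destination];
    connection.execute(insertStatement, bindvars, function (err, result) {
      if (err) {
        console.error('error in insertOrderIntoDatabase ' + err.message);
        doRelease(connection);
      } else {
        connection.commit(function (error) {
          if (error) console.log(`After commit - error = ${error}`);
          doRelease(connection);
          callback(request, response, order, { "summary": "Insert succeeded", "details": result });
        });
      }
    });
  });
}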
The function handlePost() makes a call to the module EventBusPublisher, to publish a NewOrder event on the Event Hub. The code for this module is shown below:
var kafka = require('kafka-node');
var kafkaConnectDescriptor = "129.xx.yy.zz";
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;
var APP_VERSION = "0.8.3";
var APP_NAME = "EventBusPublisher";
var client;
var producer;

function initializeKafkaProducer(attempt) {
  try {
    client = new kafka.Client(kafkaConnectDescriptor);
    producer = new Producer(client);
    producer.on('ready', function () {
      console.log("Producer is ready in " + APP_NAME);
    });
    producer.on('error', function (err) {
      console.log("failed to create the client or the producer " + JSON.stringify(err));
    })
  } catch (e) {
    console.log("Exception in initializeKafkaProducer" + e);
    console.log("Exception in initializeKafkaProducer" + JSON.stringify(e));
    console.log("Try again in 5 seconds");
    setTimeout(initializeKafkaProducer, 5000, ++attempt);
  }
}//initializeKafkaProducer

initializeKafkaProducer(1);

var eventPublisher = module.exports;

eventPublisher.publishEvent = function (eventKey, event, topic) {
  var km = new KeyedMessage(eventKey, JSON.stringify(event));
  var payloads = [
    { topic: topic, messages: [km], partition: 0 }
  ];
  producer.send(payloads, function (err, data) {
    if (err) {
      console.error("Failed to publish event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(err));
    }
    console.log("Published event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(data));
  });
}
Updating the bounded context
Suppose the details for the customer with identifier 25 are updated through the Customer microservice. That means the customer record in the MongoDB database will be updated. However, that is not the only place where information about this customer is recorded. Because we want our microservices to be independent – and we can only properly work with the Order microservice if we have some information about the elements associated with an order, such as the customer name and some details about each of the products – we have defined the bounded context of the Order microservice to include the customer name. As you can see in the next screenshot, the data store of the Order microservice contains an order record for customer 25, and it includes the name of the customer.
That means that when the Customer microservice records a change in the name of the customer, we should somehow update the bounded context of the Order microservice as well. And that is exactly what we do using the CustomerModified event, produced by the Customer microservice and consumed by the Order microservice.
The REST call to update the name of customer 25 – from Joachim to William:
The customer record in MongoDB is updated
and subsequently a CustomerModified event is produced to the devoxx-topic on Event Hub:
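The exact payload of the CustomerModified event is determined by the Customer microservice from the first installment; the Order microservice only relies on the eventType field and on customer.id and customer.name. Purely as an illustration – the structure below is an assumption, mirroring the NewOrderEvent payload shown earlier:

// illustrative shape of the CustomerModified event payload (assumption)
{ "eventType": "CustomerModified"
, "customer": { "id": "25", "name": "William" }  // the modified customer record (assumed shape)
, "module": "customer.microservice"              // assumed, analogous to "order.microservice" above
, "timestamp": 1510743000000
}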
This event is consumed by the Order microservice, where it triggers an update of the DVX_ORDERS table in the Oracle Database Cloud instance. The code responsible for consuming the event and updating the database is shown below:
CODE FOR CONSUMING THE EVENT – first the EventBusListener module:
var kafka = require('kafka-node');
var async = require('async'); // used in the SIGINT handler below
var Consumer = kafka.Consumer;
var client;
var APP_VERSION = "0.1.2";
var APP_NAME = "EventBusListener";

var eventListenerAPI = module.exports;

var subscribers = [];
eventListenerAPI.subscribeToEvents = function (callback) {
  subscribers.push(callback);
}

var topicName = "a516817-devoxx-topic";
var KAFKA_ZK_SERVER_PORT = 2181;
var EVENT_HUB_PUBLIC_IP = '129.xx.yy.zz';

var consumerOptions = {
  host: EVENT_HUB_PUBLIC_IP + ':' + KAFKA_ZK_SERVER_PORT,
  groupId: 'consume-order-events-for-devoxx-app',
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'earliest' // equivalent of auto.offset.reset; valid values are 'none', 'latest', 'earliest'
};

var topics = [topicName];
var consumerGroup = new kafka.ConsumerGroup(Object.assign({ id: 'consumer1' }, consumerOptions), topics);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);

function onMessage(message) {
  subscribers.forEach((subscriber) => {
    subscriber(message.value);
  })
}

function onError(error) {
  console.error(error);
  console.error(error.stack);
}

process.once('SIGINT', function () {
  async.each([consumerGroup], function (consumer, callback) {
    consumer.close(true, callback);
  });
});
And the code in the OrdersAPI module that imports the EventBusListener, registers an event listener and handles the event:
var eventBusListener = require("./EventListener.js");

eventBusListener.subscribeToEvents(
  (message) => {
    console.log("Received event from event hub");
    try {
      var event = JSON.parse(message);
      if (event.eventType == "CustomerModified") {
        console.log(`Details for a customer have been modified and the bounded context for order should be updated accordingly ${event.customer.id}`);
        updateCustomerDetailsInOrders(event.customer.id, event.customer);
      }
    } catch (err) {
      console.log("Parsing event failed " + err);
    }
  }
);

function updateCustomerDetailsInOrders(customerId, customer) {
  console.log(`All orders for customer ${customerId} will be updated to new customer name ${customer.name}`);
  console.log('updateCustomerDetailsInOrders');
  handleDatabaseOperation("req", "res", function (request, response, connection) {
    var bindvars = [customer.name, customerId];
    var updateStatement = `update dvx_orders set customer_name = :customerName where customer_id = :customerId`;
    connection.execute(updateStatement, bindvars, function (err, result) {
      if (err) {
        console.error('error in updateCustomerDetailsInOrders ' + err.message);
        doRelease(connection);
        // there is no callback to report the failure to
      } else {
        connection.commit(function (error) {
          if (error) console.log(`After commit - error = ${error}`);
          doRelease(connection);
          // there is no callback: callback(request, response, order, { "summary": "Update Status succeeded", "details": result });
        });
      }//else
    }); //callback for handleDatabaseOperation
  });//handleDatabaseOperation
}// updateCustomerDetailsInOrders
When we check the current set of orders, we find that the orders for customer 25 now have William as the customer_name, instead of Joachim or Jochem.
We can check directly in the Oracle Database Table DVX_ORDERS to find the customer name updated for both orders for customer 25: