This article describes – in two installments – how events are used to communicate a change in a data record owned by the Customer microservice to consumers such as the Order microservice that has some details about the modified customer in its bound context.
The microservices are implemented using Node. The Customer microservice uses a cloud based MongoDB instance as its data store. The Order microservices runs on Oracle Application Container Cloud and has a service binding to an Oracle DBaaS (aka Database Cloud) instance. The Oracle Event Hub Cloud is used; it has a Kafka Topic that microservices on the Oracle Cloud as well as any where else can use to produce events to and consume events from. The Event Hub is used to communicate events that describe changes in the data owned by each of the microservices – allowing other microservices to update their bound context.
The provisioning and configuration of the Oracle Public Cloud services used in this article is described in detail in this article: Prepare and link bind Oracle Database Cloud, Application Container Cloud, Application Container Cache and Event Hub.
The sources for this article are available on GitHub: https://github.com/lucasjellema/order-data-demo-devoxx .
The setup described in this article was used as a demonstration during my presentation on “50 Shades of Data” during Devoxx Morocco (14-16 November, Casablanca, Morocco); the slidedeck for this session is available here:
The Customer Microservice
Implemented in Node (JS) using a MongoDB instance (a free cloud based instance on MLab) for its private data store, running locally and engaging with Oracle Event Hub to produce a CustomerModified event in case of a change in a customer record. The Customer Microservice exposes a REST API with a single resource (customer) and operations to retrieve, list, create and update customer(s).
The MongoDB database was created on MLab (https://mlab.com/) – a MongDB hosting service with free tier up to 500 MB. I created a database called world and in it prepared a collection called customers.
The customers can be listed through REST API calls to the customer microservice:
and new customers can be created through the API:
Which of course results in a new record in MongoDB:
Clearly the customer microservice has associated state (in the MongoDB database) but is itself stateless. It can be stopped and restarted and it will still be able to produce customer records. Multiple instances of the microservice could be running and they would each have access to the same data. There could be some concurrency conflicts that we currently do not really cater for.
The salient code for implementing the REST operation for retrieving the customers is the following:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | var MongoClient = require( 'mongodb' ).MongoClient; var assert = require( 'assert' ); var mongodbHost = 'ds139719.mlab.com' ; var mongodbPort = '39791' ; var authenticate = 'mongousername:mongopassword@' var mongodbDatabase = 'world' ; var mongoDBUrl = 'mongodb://' + authenticate + mongodbHost + ':' + mongodbPort + '/' + mongodbDatabase; var http = require( 'http' ), express = require( 'express' ), bodyParser = require( 'body-parser' ), // npm install body-parser ; var moduleName = "customer-ms" ; var PORT = process.env.PORT || 5118; var appVersion = "1.0.2" ; var app = express(); var server = http.createServer(app); server.listen(PORT, function () { console.log( 'Server running, version ' + appVersion + ', Express is listening... at ' + PORT + " for Customer Microservice" ); }); app.use(bodyParser.urlencoded({ extended: true })); app.use(bodyParser.json({ "type" : '*/*' , "inflate" : "true" })); app.use( function (request, response, next) { response.setHeader( 'Access-Control-Allow-Origin' , '*' ); response.setHeader( 'Access-Control-Allow-Methods' , 'GET, POST, OPTIONS, PUT, PATCH, DELETE' ); response.setHeader( 'Access-Control-Allow-Headers' , 'X-Requested-With,content-type' ); response.setHeader( 'Access-Control-Allow-Credentials' , true ); next(); }); app.get( '/customer' , function (req, res) { // find customers in database MongoClient.connect(mongoDBUrl, function (err, db) { var nameOfCollection = "customers" db.collection(nameOfCollection).find( function (err, customersCursor) { if (err) { console.log(err); } else { // for each cursor element, add a customer to the result customers = { "customers" : []}; customersCursor.toArray( function (err, cmrs) { customers.customers = cmrs; res.statusCode = 200; res.setHeader( 'Content-Type' , 'application/json' ); res.setHeader( 'MyReply' , 'retrieved all customers' ); res.send( customers); }); } }) }) //connect }) |
Pretty straightforward code for setting up an Express based listener for GET requests at the specified port and url path /customer. When the request comes in, a connection is initialized to the MongoDB instance, all elements from the customers collection are retrieved and returned in a single JSON document.
Here is an example of a call to this operation in the REST API from Postman:
Inserting a new customer requires just a small additional method in the Node application, reacting to PUT requests:
1 2 3 4 5 6 7 8 9 10 11 12 13 | app.put( '/customer' , function (req, res) { var customer = req.body; MongoClient.connect(mongoDBUrl, function (err, db) { var nameOfCollection = "customers" db.collection(nameOfCollection).insertMany([customer], function (err, r) { res.statusCode = 200; res.setHeader( 'Content-Type' , 'application/json' ); res.setHeader( 'MyReply' , 'Create or Updated the Customer ' ); res.send(customer); }) //insertMany } //connect ) }) |
Updating an existing customer – to handle for example a name change as we will in just a moment – is very similar, triggered by POST requests
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | var eventBusPublisher = require( "./EventPublisher.js" ); app.post( '/customer/:customerId' , function (req, res) { var customerId = req.params[ 'customerId' ]; var customer = req.body; customer.id = customerId; // find customer in database and update MongoClient.connect(mongoDBUrl, function (err, db) { var nameOfCollection = "customers" db.collection(nameOfCollection).findAndModify( { "id" : customerId } , [[ '_id' , 'asc' ]] // sort order , { $set: customer } , {} , function (err, updatedCustomer) { if (err) { console.log(err); } else { console.log( "Customer updated :" + JSON.stringify(updatedCustomer)); // Now publish an event of type CustomerModified on Event Hub Cloud Service eventBusPublisher.publishEvent( "CustomerModified" , { "eventType" : "CustomerModified" , "customer" : customer , "module" : "customer.microservice" , "timestamp" : Date.now() }, topicName); // and compose the HTTP Response res.statusCode = 200; res.setHeader( 'Content-Type' , 'application/json' ); res.setHeader( 'MyReply' , 'Updated the Customer and published event on Event Hub - with id - ' + customerId); res.send(customer); } }) }) //connect }) |
There something different and important about this snippet. After the MongoDB findAndModify operation returns, a call is made to a local module EventPublisher. This module handles the communication to the Event Hub [Cloud Service]. The salient code in this module is as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | var kafka = require( 'kafka-node' ); // from the Oracle Event Hub - Platform Cluster Connect Descriptor var kafkaConnectDescriptor = "129.xx.yy.zz" ; var Producer = kafka.Producer KeyedMessage = kafka.KeyedMessage; var client; var APP_VERSION = "0.8.3" var APP_NAME = "EventBusPublisher" console.log( "Initialized module " + APP_NAME + "version " + APP_VERSION); var producer; var client; function initializeKafkaProducer(attempt) { try { console.log(`Try to initialize Kafka Client at ${kafkaConnectDescriptor} and Producer, attempt ${attempt}`); client = new kafka.Client(kafkaConnectDescriptor); producer = new Producer(client); producer.on( 'ready' , function () { console.log( "Producer is ready in " + APP_NAME); }); producer.on( 'error' , function (err) { console.log( "failed to create the client or the producer " + JSON.stringify(err)); }) } catch (e) { console.log( "Exception in initializeKafkaProducer" + JSON.stringify(e)); // try again in 5 secs setTimeout(initializeKafkaProducer, 5000, ++attempt); } } //initializeKafkaProducer initializeKafkaProducer(1); var eventPublisher = module.exports; eventPublisher.publishEvent = function (eventKey, event, topic) { km = new KeyedMessage(eventKey, JSON.stringify(event)); payloads = [ { topic: topic, messages: [km], partition: 0 } ]; producer.send(payloads, function (err, data) { if (err) { console.error( "Failed to publish event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(err)); } console.log( "Published event with key " + eventKey + " to topic " + topic + " :" + JSON.stringify(data)); }); } // publishEvent |
At this point, we have implemented the following situation:
We can update the name of a customer – through a call to the REST API of the Customer microservice. The customer record is updated in the MongoDB database and a CustomerModified event is published to the EventHub’s devoxx-topic topic.
In the next installment, we will implement the Order microservice that runs on the Oracle Application Container cloud, uses a cloud database (DBaaS instance) and consumes CustomerModified events from Event Hub.