Oracle offers an Event Hub Cloud Service: an enterprise-grade Apache Kafka instance with large numbers of partitions and topics, (retained) messages and distributed nodes. Setting up this cloud service is simple enough, especially once you know what to do, as I will demonstrate in this article. In order to communicate with this Event Hub from a local client, in this case created with Node, we need to open up some ports for access from the public internet.
The steps I went through:
- create Oracle Event Hub Cloud Service – Platform instance (the Kafka & Zookeeper instance)
- create Oracle Event Hub Cloud Service – Service instance (a Topic)
- create two network rules to allow access from the public internet to ports 2181 (Zookeeper) and 6667 (Kafka Server)
- create two Node applications using the kafka-node package, one to produce and one to consume, based on the work done by Kunal Rupani of Oracle (GitHub: https://github.com/kunalrupani/OracleEventHubConsumer)
Note: the Event Hub Cloud Service is a metered service, billed per hour. The cost for the smallest shape is around $0.70 per hour (or $200/month for non-metered).
Create Oracle Event Hub Cloud Service – Platform instance (the Kafka & Zookeeper instance)
Create Oracle Event Hub Cloud Service – Service instance (a Topic)
Switch to the Big Data Compute CS and select the service category Oracle Event Hub Cloud Service – Topics.
Create two network rules to allow access from the public internet to ports 2181 (Zookeeper) and 6667 (Kafka Server)
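Once the rules are in place, it can be useful to confirm from the local machine that both ports are actually reachable before involving Kafka at all. The following is a minimal sketch of such a check and not part of the original setup: the helper name checkPort and the timeout value are my own illustrative choices, and it uses only Node's built-in net module.

```javascript
var net = require('net');

// Hypothetical helper (not from the original article).
// Resolves to true if a TCP connection to host:port can be opened within
// timeoutMs, and to false on a refused connection, an error or a timeout.
function checkPort(host, port, timeoutMs) {
    return new Promise(function (resolve) {
        var socket = net.connect({ host: host, port: port });
        var done = function (reachable) {
            socket.destroy();
            resolve(reachable);
        };
        socket.setTimeout(timeoutMs);
        socket.once('connect', function () { done(true); });
        socket.once('timeout', function () { done(false); });
        socket.on('error', function () { done(false); });
    });
}

// Example use, with the public IP of the Event Hub instance filled in:
// checkPort(EVENT_HUB_PUBLIC_IP, 2181, 3000).then(function (open) {
//     console.log('Zookeeper port reachable: ' + open);
// });
```

A result of false for either 2181 or 6667 suggests that the corresponding network rule is missing or not yet active.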
Create two Node applications using the kafka-node package
One application produces and one consumes; both are based on the work done by Kunal Rupani of Oracle (GitHub: https://github.com/kunalrupani/OracleEventHubConsumer).
Note: the topic was created as microEventBus. The actual name under which it is accessed is partnercloud17-microEventBus (the identity domain is added as a prefix to the topic name).
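To avoid hard-coding the prefixed name in several places, the naming rule can be captured in a tiny helper. This is only a sketch: the function name qualifiedTopicName is my own, and the hyphen separator is inferred from the one example above rather than from any documented rule.

```javascript
// Hypothetical helper (not from the original article): builds the name under
// which a topic is accessed, assuming the pattern <identity domain>-<topic>.
function qualifiedTopicName(identityDomain, topicName) {
    return identityDomain + '-' + topicName;
}

console.log(qualifiedTopicName('partnercloud17', 'microEventBus'));
// prints: partnercloud17-microEventBus
```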
The package.json contains the dependency on kafka-node:
The straightforward code for producing to the Event Hub Kafka Topic:
    var EVENT_HUB_PUBLIC_IP = '129.xxxxxxxx';
    var TOPIC_NAME = 'partnercloud17-microEventBus';
    var ZOOKEEPER_PORT = 2181;

    var kafka = require('kafka-node');
    var Producer = kafka.Producer;
    // Zookeeper-based client: connects to Zookeeper to discover the Kafka brokers
    var client = new kafka.Client(EVENT_HUB_PUBLIC_IP + ':' + ZOOKEEPER_PORT);
    var producer = new Producer(client);

    // a single message for partition 0; the message text grows by one '*' per send
    let payloads = [
        { topic: TOPIC_NAME, messages: '*', partition: 0 }
    ];
    console.log(payloads[0].messages);

    setInterval(function () {
        console.log('called about every 1 second');
        producer.send(payloads, function (err, data) {
            if (err) {
                console.error(err);
            }
            console.log(data);
        });
        // cycle the message from '*' up to ten asterisks, then start over
        if (payloads[0].messages.length < 10) {
            payloads[0].messages = payloads[0].messages + "*";
        } else {
            payloads[0].messages = "*";
        }
    }, 1000);
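The producer above starts its timer immediately, so the first send can happen before the connection has been established. A slightly more defensive variant waits for the producer's 'ready' event and also listens for 'error'. The sketch below shows that idea under a few assumptions: the function name startProducing and its parameters are my own, and the producer argument is any object with the on and send methods that a kafka-node Producer provides.

```javascript
// Hypothetical helper (not from the original article).
// Sends a growing string of asterisks to partition 0 of `topic` every
// intervalMs milliseconds, but only after the producer has reported 'ready'.
// Returns a function that stops the timer.
function startProducing(producer, topic, intervalMs) {
    var message = '*';
    var timer = null;

    producer.on('ready', function () {
        timer = setInterval(function () {
            var payloads = [{ topic: topic, messages: message, partition: 0 }];
            producer.send(payloads, function (err, data) {
                if (err) {
                    console.error(err);
                    return;
                }
                console.log(data);
            });
            // cycle from '*' up to ten asterisks, then start over
            message = message.length < 10 ? message + '*' : '*';
        }, intervalMs);
    });

    producer.on('error', function (err) {
        console.error('producer error:', err);
    });

    return function stop() {
        if (timer) {
            clearInterval(timer);
        }
    };
}
```

With the client from the code above, this would be called as startProducing(new kafka.Producer(client), TOPIC_NAME, 1000).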
And the similar code for consuming. Note that the producer connects through Zookeeper (port 2181), whereas the consumer connects directly to the Kafka Server (port 6667):
    var EVENT_HUB_PUBLIC_IP = '129xxxxxxx';
    var TOPIC_NAME = 'partnercloud17-microEventBus';
    var KAFKA_SERVER_PORT = 6667;

    var kafka = require('kafka-node'),
        Consumer = kafka.Consumer,
        // KafkaClient connects directly to the Kafka broker, not to Zookeeper
        client = new kafka.KafkaClient({ "kafkaHost": EVENT_HUB_PUBLIC_IP + ':' + KAFKA_SERVER_PORT }),
        consumer = new Consumer(
            client,
            // with fromOffset: true, reading starts at the offset given here
            [ { topic: TOPIC_NAME, offset: 1 } ],
            { autoCommit: false, fromOffset: true }
        );

    // print the value of every message received from the topic
    consumer.on('message', function (message) {
        console.log(message.value);
    });
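The consumer above only handles the 'message' event. As far as I know, a kafka-node Consumer also emits 'error' and 'offsetOutOfRange' events, and listening for them makes connection problems and invalid start offsets visible. The following sketch wires up all three; the function name attachConsumerHandlers and the onValue callback are my own illustrative names.

```javascript
// Hypothetical helper (not from the original article).
// Registers handlers on a consumer: every message value and offset is passed
// to onValue, and errors and out-of-range offsets are logged.
function attachConsumerHandlers(consumer, onValue) {
    consumer.on('message', function (message) {
        onValue(message.value, message.offset);
    });
    consumer.on('error', function (err) {
        console.error('consumer error:', err);
    });
    consumer.on('offsetOutOfRange', function (err) {
        console.error('requested offset is not available:', err);
    });
}
```

With the consumer created above, this would be called as attachConsumerHandlers(consumer, function (value) { console.log(value); }).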
Here is the output for producing:
and here for consuming: