Apache NiFi: Reading COVID data from a REST API and producing it to a Kafka topic

Maarten Smeets

Apache NiFi can accelerate big data projects by making it easy to integrate various data sources. NiFi lets you track what happened to your data (data provenance) and provides features like guaranteed ordered delivery and error handling. In this example I configure NiFi to read COVID data from a REST API, split the data into individual records per country, and publish the result to a Kafka topic. I’ve used the environment described here.

Calling a REST service

The InvokeHTTP processor can be used to call REST APIs. I’ve configured this as follows:

InvokeHTTP processor to call REST services

I’ve used a GET request on the following API: https://api.covid19api.com/summary. This returns a JSON document like:

JSON response from Covid19API
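For readers who want to try the same call outside NiFi, here is a minimal Python sketch of the GET request using only the standard library. The helper names `build_request` and `fetch_summary`, the `Accept` header and the 30 second timeout are my own illustrative choices, not settings taken from the InvokeHTTP processor. It also assumes the endpoint is still reachable and returns a JSON object.

```python
import json
import urllib.request

SUMMARY_URL = "https://api.covid19api.com/summary"  # URL used in this article


def build_request(url: str = SUMMARY_URL) -> urllib.request.Request:
    """Build a GET request that asks for JSON (hypothetical helper)."""
    return urllib.request.Request(
        url, method="GET", headers={"Accept": "application/json"}
    )


def fetch_summary(url: str = SUMMARY_URL, timeout: float = 30.0) -> dict:
    """Perform the GET and parse the response body as JSON.

    Needs network access; the timeout value is an assumption.
    """
    with urllib.request.urlopen(build_request(url), timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `fetch_summary()` performs the actual request; `build_request()` on its own does not touch the network.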

Splitting the JSON

The SplitJson processor can split the input JSON into separate FlowFiles per country. This comes in handy when writing individual records to the Kafka topic.

SplitJson processor
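As a rough illustration of what the split does, the following Python sketch turns one JSON document into one JSON string per country. It assumes the response keeps the per-country records in a top-level `Countries` array, in which case the JsonPath expression in SplitJson would presumably point at that array, for example `$.Countries`. The sample payload and the function name `split_countries` are made up for this example.

```python
import json
from typing import List


def split_countries(payload: str, array_field: str = "Countries") -> List[str]:
    """Return one JSON string per element of the given top-level array.

    The field name "Countries" is an assumption about the API response.
    """
    document = json.loads(payload)
    return [json.dumps(record) for record in document.get(array_field, [])]


# Hypothetical, heavily trimmed sample payload (not real data).
sample = json.dumps({
    "Countries": [
        {"Country": "Netherlands", "TotalConfirmed": 100},
        {"Country": "Belgium", "TotalConfirmed": 80},
    ]
})

for fragment in split_countries(sample):
    print(fragment)  # each fragment corresponds to one FlowFile after the split
```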

Publishing records to Kafka

Below is the configuration for the PublishKafkaRecord processor. Not shown in the screenshot is the Message Key Field property, which indicates which field in the message is used as the key. I’ve set it to “Country”. Why the country field? Messages with the same key are published to the same partition within Kafka, and ordering within a partition is guaranteed. When I consume messages, I consume from a specific partition, so I can be sure I’ll consume them in the order they arrived per country.

Configuration for the PublishKafkaRecord processor
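To make the ordering argument concrete, here is a small Python sketch of key-based partitioning: hash the key and take the result modulo the number of partitions. Kafka's default partitioner uses a murmur2 hash; this sketch substitutes `zlib.crc32` purely as a stand-in, so the partition numbers will not match a real cluster. The function name `partition_for` and the partition count of 3 are illustrative assumptions.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition number.

    Stand-in for Kafka's default partitioner: crc32 is used here
    instead of murmur2, so only the principle carries over.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 3  # assumed partition count for the example

for country in ["Netherlands", "Belgium", "Netherlands"]:
    # The same country always maps to the same partition number.
    print(country, "->", partition_for(country, NUM_PARTITIONS))
```

Because the mapping depends only on the key and the partition count, all records for one country stay on one partition as long as the partition count does not change.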

Note that I use the JsonTreeReader to read the entire JSON record. The JsonTreeReader and JsonRecordSetWriter are controller services, used here with their default configuration.

Controller services

Result

Once you have enabled your processors, you can run the processor that calls the REST API once. You can also schedule the InvokeHTTP processor, for example to fetch data daily. The flow reads the data, splits it into individual FlowFiles, and produces these FlowFiles to a Kafka topic, using the country name as the message key.

Call the API once to test the flow

You can browse your Kafka topic with a tool such as Offset Explorer to check whether the messages have arrived successfully.

Offset Explorer. Previously called Kafkatool

You can also use the data provenance facilities: right-click a processor and choose “View data provenance”. This allows you to examine the input and output of every processor.

Data provenance. A powerful feature of Apache NiFi.

Finally

This simple example shows what you can achieve with relatively little effort using Apache NiFi, and it is just the tip of the iceberg of this powerful product. When starting out with Apache NiFi, look for simple solutions and do not go overboard with many components, custom scripts, Jolt transformations, custom processors, etc. when you do not really need them. This allows you and others to better understand what is going on, and often makes the solution more efficient as well.
