Apache NiFi: Let through the latest event in a timeframe

In the IoT world, some devices generate large volumes of events that can be difficult for back-end systems to process in real time. You can use NiFi to throttle messages, but that is not sufficient if the flow of events is consistently higher than what the back-end system can handle. One way to deal with this is to let Apache NiFi merge the messages for each device, based on a specific attribute, within a timeframe, and then use a transformation to extract the latest message from the merged result. Be careful, though: since not all messages pass through the entire flow, you may lose information.

In this blog post I’ll illustrate how you can do this. The trick is to merge several messages using the MergeContent processor and then select the latest one using a Jolt transformation.

Figure: Only let through the latest message in a timeframe

The NiFi flow

I created the following sample NiFi flow, which you can download here.

Figure: NiFi flow for filtering messages

GenerateFlowFile

This flow has three GenerateFlowFile processors, which generate the following three messages. Note that only eventTime and eventInstanceId differ:
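To make the shape of these messages concrete, here is a small Python sketch that builds three messages of the same form. Only the field names objectIdentifier, eventTime and eventInstanceId come from this post, as does the eventTime 2021-04-16T11:41:28Z of the second message; the device name, the instance IDs and the other eventTime values are made-up placeholders, and the real messages may contain more fields.

```python
import json


def make_message(object_identifier: str, event_time: str, event_instance_id: str) -> str:
    """Return one JSON event, similar to what a GenerateFlowFile processor emits.

    Hypothetical helper: the real sample messages may carry additional fields.
    """
    return json.dumps({
        "objectIdentifier": object_identifier,
        "eventTime": event_time,
        "eventInstanceId": event_instance_id,
    })


# Three messages for the same (made-up) device; only eventTime and
# eventInstanceId differ. The second message has the latest eventTime.
SAMPLE_MESSAGES = [
    make_message("device-1", "2021-04-16T11:41:20Z", "instance-1"),
    make_message("device-1", "2021-04-16T11:41:28Z", "instance-2"),
    make_message("device-1", "2021-04-16T11:41:25Z", "instance-3"),
]
```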

Figure: Sample messages (only eventTime and eventInstanceId differ)

EvaluateJsonPath

Figure: Put the objectIdentifier from the flowfile in an attribute

This processor extracts the objectIdentifier field from the JSON content of the flowfile and stores it in the objectIdentifier attribute.
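As a rough illustration of this step, the Python sketch below mimics EvaluateJsonPath with its destination set to flowfile-attribute. It assumes objectIdentifier is a top-level field of the message (the JsonPath expression $.objectIdentifier). The FlowFile class and the evaluate_json_path function are hypothetical stand-ins, not NiFi APIs; the returned strings mirror the processor's matched, unmatched and failure relationships.

```python
import json
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Hypothetical stand-in for a NiFi flowfile: content plus attributes."""
    content: str
    attributes: dict = field(default_factory=dict)


def evaluate_json_path(flowfile: FlowFile, name: str = "objectIdentifier") -> str:
    """Copy a top-level JSON field into an attribute of the same name.

    Returns the relationship the flowfile would be routed to:
    "matched", "unmatched" (field missing) or "failure" (invalid JSON).
    """
    try:
        document = json.loads(flowfile.content)
    except json.JSONDecodeError:
        return "failure"
    if not isinstance(document, dict) or name not in document:
        return "unmatched"
    # Attributes are strings in NiFi, so store the value as text.
    flowfile.attributes[name] = str(document[name])
    return "matched"
```

MergeContent can then use the resulting attribute as its correlation attribute, so that messages from different devices end up in different bins.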

MergeContent

Figure: Merge messages within a timeframe into a single JSON based on objectIdentifier

The MergeContent processor merges the separate JSON messages into one larger JSON flowfile per objectIdentifier over a period of 10 seconds. Note that the maximum number of messages per bin is set to 1000 in this example. If more than 1000 messages with the same objectIdentifier arrive within 10 seconds, the full bin is merged early and the remaining messages start a new bin, so more than one message for that device can be let through in that timeframe. Also note that the maximum number of bins (5 in the above screenshot) determines how many objectIdentifier bins are held in memory at the same time; when a message with a new objectIdentifier arrives while all bins are in use, the oldest bin is merged early. You will probably want to increase both numbers in a real-life scenario.
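The binning behaviour described above can be modelled with a short Python sketch. This is a simplified, hypothetical model and not NiFi's implementation: merge_by_attribute is an illustrative name, time is passed in explicitly as seconds, and only three settings are mimicked (max_entries for the maximum number of entries, max_bins for the maximum number of bins and max_age for the maximum bin age).

```python
from collections import OrderedDict


def merge_by_attribute(events, key="objectIdentifier", max_entries=1000,
                       max_bins=5, max_age=10.0):
    """Simplified model of MergeContent bin-packing (not NiFi's implementation).

    events: (arrival_time_in_seconds, message_dict) pairs, sorted by arrival time.
    Returns the merged bundles (lists of messages) in the order they are emitted.
    """
    bins = OrderedDict()  # correlation value -> (created_at, messages), oldest first
    merged = []

    for now, message in events:
        # Merge every bin that has reached the maximum bin age.
        for expired in [v for v, (created, _) in bins.items() if now - created >= max_age]:
            merged.append(bins.pop(expired)[1])

        value = message[key]
        if value not in bins:
            if len(bins) >= max_bins:
                # All bins are in use: the oldest bin is merged early.
                _, (_, oldest) = bins.popitem(last=False)
                merged.append(oldest)
            bins[value] = (now, [])
        bins[value][1].append(message)

        if len(bins[value][1]) >= max_entries:
            # A full bin is merged immediately; later messages start a new bin.
            merged.append(bins.pop(value)[1])

    # End of input: treat the remaining bins as expired.
    merged.extend(messages for _, messages in bins.values())
    return merged
```

With the defaults, two messages for the same device arriving 2 seconds apart end up in one bundle, while a third arriving 12 seconds after the first lands in a new bundle, so each bundle yields one "latest" message.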

JoltTransformJSON

You can download the transformation here. It first sorts the messages by eventTime and then takes the latest one; in this example, that is the message with eventTime “2021-04-16T11:41:28Z” (the second message). If several messages share the same eventTime, the last one received is passed along. Note that this transformation can most likely be improved upon.
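The Jolt specification itself is only provided as a download, so what follows is not that spec but a Python sketch of the same selection logic. It assumes that the merged flowfile is a JSON array of messages in arrival order (for example, produced by configuring MergeContent with a `[` header, a `]` footer and a `,` demarcator) and that every eventTime uses the format of the sample messages, such as 2021-04-16T11:41:28Z. The function name select_latest is hypothetical.

```python
import json
from datetime import datetime

# Assumed timestamp format, taken from the sample eventTime "2021-04-16T11:41:28Z".
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def select_latest(merged_content: str) -> str:
    """Return the message with the latest eventTime as a JSON string.

    On a tie, the message received last (highest array index) wins,
    because the index is the second element of the sort key.
    """
    messages = json.loads(merged_content)
    _, latest = max(
        enumerate(messages),
        key=lambda item: (datetime.strptime(item[1]["eventTime"], TIME_FORMAT), item[0]),
    )
    return json.dumps(latest)
```

Parsing the timestamps instead of comparing strings keeps the comparison correct even if the format is later extended, as long as TIME_FORMAT is updated accordingly.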

Figure: Ordering messages

Finally

This blog post illustrates how you can do simple stream processing using NiFi. NiFi is not really designed for this, but it is good to know that you can do it if you need to. It does feel like a roundabout way to achieve something that could well be a standard feature. If you find a more suitable way to achieve the same result with NiFi, please let me know and I’ll update this blog post.
