In the IoT world, some devices generate large volumes of events that can be difficult for back-end systems to process in real time. Of course you can use NiFi to throttle messages, however this will not be sufficient if the flow of events is consistently higher than what can be handled by the back-end system. A way to deal with this is to let Apache NiFi merge messages for a device based on a specific attribute within a timeframe and use a transformation to obtain the latest message from the merged message. Be careful though, since not all messages will pass through the entire flow, you might lose information.
In this blog post I’ll illustrate how you can do this. The trick is to merge several messages using the MergeContent processor and then select the latest one using a Jolt transformation.
The NiFi flow
I created the following sample NiFi flow which you can download here
This flow has 3 generate GenerateFlowFile processors. They generate the following 3 messages. Note only the eventTime and eventInstanceId differ;
This processor puts the objectIdentifier from the flowfile in the objectIdentifier attribute.
The MergeContent processor merges the separate JSON messages into one large JSON flowfile for a period of 10 seconds. Mind that the maximum number of messages is in this example set to 1000. If more than 1000 messages arrive with the same objectIdentifier within 10 seconds, the surplus messages will go to the failure relationship. Also mind that the maximum number of bins (5 in the above screenshot) indicates how many different objectIdentifier bins will be held in memory at the same time. You probably want to increase both numbers for a real life scenario.
You can download the transformation here. It first sorts based on eventtime and then it takes the latest one. In this example, the message with the eventTime “2021-04-16T11:41:28Z” (the second message). If there are several messages with the same eventTime, the latest message received, is passed along. N.b. this transformation can most likely be improved upon.
This blog post illustrates how you can do simple stream processing using NiFi. NiFi is not specifically the right tool to do this but it is good to know that if you need it, you can. This seems a roundabout way to do something which seems like it could be a standard feature. If you find a more suitable solution to achieve the same using NiFi, please let me know and I’ll update this blog post.