JavaScript Pipelining using Asynchronous Generators to implement Running Aggregates image 26

JavaScript Pipelining using Asynchronous Generators to implement Running Aggregates

image

As of ES 2018 (recent browsers or Node 10), JavaScript support asynchronous generators. Generators are functions that return a set of values, one value at a time. These values can be processed inside the code that invokes the generator immediately, as soon as they become available. There is no need to wait for the entire result set to be composed first. In cases where the result set is huge or even never ending, this is quite convenient. The result from one generator function can be fed into another function which can be a generator function too. And so on. This makes pipelining possible: a series of functions, all working together (and more or less in parallel) on taking each result through a series of processing steps.

With the fairly recent addition of asynchronous generators, the generator function producing the result set may be asynchronous – relying for example on Promises to gather its values.

In this article, I want to show something of the beauty of all of this. I will share a simple ES 2018/Node application that uses Promises to produce values asynchronously – triggered by time outs. Three Promises represent three temperature sensors; in this case the values are simply generated. However, these Promises could just as well read values from an external source or consume incoming events. Each Promise when resolved produces a sensor readout. The promise is wrapped in a promise that writes the `sensor value to temporary store (latestValue) and removes itself from the sensorPool – the set of promises function sensorValues() is waiting on using Promise.race([…sensorPool])

image

In asynchronous generator function sensorValues() we wait in a endless loop for one or sensorPromises to resolve (Promise.race resolves to first of the set of promises to resolve). When that happens, the latestValue – written when the sensor promise resolved – is yielded.

Another asynchronous generator function – runningSensorAverages – is triggered by the yield from sensorValues (in the loop for await (sensorReading of sensorReadings)). The value yielded added to the values collection for the current sensor in the sensors map. The value of ticks is increased; ticks counts the number of values received since the last calculation of the running aggregate. If the value of ticks equals the value of period (the parameter that specifies after how many values a new aggregate should be calculated), then a new aggregate is calculated, using the last windowSize values in the values collection for the current sensor. The value calculated is yielded (and ticks is reset).

The yielded running aggregate is received in function doIt(). This function writes the yielded value to the console – from another for await loop.

The result looks like this:

image

The pipelining nature of this application is best captured by this line:

for await (runningAverage of runningSensorAverages(filterOutliersFromSensorReadings( sensorValues()), 15, 10)) {..}

The streaming result from sensorValues() is piped – one reading at a tine – to the filter function and the output from that function to runningSensorAverages whose output appears as subsequent values in the for await loop.

 

Adding Time Windowed Aggregates

While we are at it, let’s add Time Windowed aggregates: averages produced every X seconds.

The implementation is done using a cache – a temporary store for the sensor readings that is written by runningSensorAverages(). Function timeWindowedAggregates() is triggered by a time out after a period specified by parameter timeWindow. When the function ‘wakes up’ , it reads the current contents from the cache, calculates and yields the averages.

image

Function doIt2() contains a loop over the generator timeWindowedAggregates():  await for (timedWindowAggregate of timeWindowedAggregates(6000)) that prints the averages to the console.

The combined output looks like this:

image

Note that all timed window averages are produced at the same time (over different numbers of readings between the sensors) and the running aggregates are produced at different times (over the same numbers of readings).

The extended code base:

 

Resources

Iterate partial results of Promise.all – https://agentcooper.io/iterate-promise-all/

JavaScript Arrays — Finding The Minimum, Maximum, Sum, & Average Values – https://codeburst.io/javascript-arrays-finding-the-minimum-maximum-sum-average-values-f02f1b0ce332

Moving Average (Wikipedia) – https://en.wikipedia.org/wiki/Moving_average

How to make your JavaScript functions sleep – https://flaviocopes.com/javascript-sleep/

Javascript – Generator-Yield/Next & Async-Await – https://codeburst.io/javascript-generator-yield-next-async-await-e428b0cb52e4

Asynchronous Generators and Pipelines in JavaScript – https://dev.to/nestedsoftware/asynchronous-generators-and-pipelines-in-javascript–1h62

Let’s experiment with functional generators and the pipeline operator in JavaScript – https://medium.freecodecamp.org/lets-experiment-with-functional-generators-and-the-pipeline-operator-in-javascript-520364f97448