Chained Generators as a Data Processing Pipeline


Generators in Python 3

In Python 3, a generator is a concise way to implement an iterator.

A generator is typically defined as a function that uses yield:

def double_generator(values: list):
    for value in values:
        yield value * 2

Or, more concisely, as a generator expression, which produces a generator object directly:

double_generator = (i * 2 for i in [0, 1, 2, 3])
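Generators are lazy: calling a generator function runs none of its body until a value is requested. The following minimal sketch steps through the generator function defined above by hand; the variable names are chosen only for illustration.

```python
def double_generator(values):
    # Same generator function as above: yields each value doubled.
    for value in values:
        yield value * 2

gen = double_generator([0, 1, 2, 3])

# Nothing has been computed yet; each next() call advances one step.
first = next(gen)     # 0
second = next(gen)    # 2

# list() (or a for loop) consumes whatever is left.
rest = list(gen)      # [4, 6]

# Once exhausted, a generator stays empty.
leftover = list(gen)  # []
```

To iterate again, create a new generator object by calling the function a second time.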

Chained Generators

By chaining multiple generators, so that the output of one generator feeds the next, we can build a memory-efficient data processing pipeline.

For example, suppose we receive an unbounded stream of temperature readings in Kelvin and want to convert them to Celsius in real time.

Define a generator that iterates over a list of data, emulating a stream of incoming data:

def data_stream(data: list):
    for datum in data:
        yield datum

Define a generator to convert Kelvin to Celsius:

def kelvin_to_celsius(kelvin_data):
    for kelvin_datum in kelvin_data:
        yield kelvin_datum - 273.15

And a generator to round each value to one decimal place:

def round_one_decimal(data):
    for datum in data:
        yield round(datum, 1)

And one more generator to format each value as a string with the Celsius symbol:

def celsius_format(celsius_data):
    for celsius_datum in celsius_data:
        yield f"{celsius_datum}°C"

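The pipeline_process function demonstrates one way to chain these generators together:

def pipeline_process(data):
    # Each stage wraps the previous one; data flows from the innermost call outward.
    readings = celsius_format(
        round_one_decimal(
            kelvin_to_celsius(
                data_stream(data)
            )
        )
    )
    # Generators are lazy: no work is done until the caller iterates over the result.
    return readings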
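Because the chain is lazy, it does no work until something iterates over it. Here is a minimal usage sketch, repeating the stage definitions from above so that it runs on its own; the sample readings are made up for illustration.

```python
def data_stream(data):
    for datum in data:
        yield datum

def kelvin_to_celsius(kelvin_data):
    for kelvin_datum in kelvin_data:
        yield kelvin_datum - 273.15

def round_one_decimal(data):
    for datum in data:
        yield round(datum, 1)

def celsius_format(celsius_data):
    for celsius_datum in celsius_data:
        yield f"{celsius_datum}°C"

# Hypothetical sample readings in Kelvin.
sample = [273.15, 295.37, 373.15]

# Build the chain; nothing is computed at this point.
readings = celsius_format(
    round_one_decimal(kelvin_to_celsius(data_stream(sample)))
)

# Consuming the outermost generator pulls each reading through every stage.
results = list(readings)
for reading in results:
    print(reading)
```

Collecting into a list is only for demonstration; in a streaming setting, iterating over readings directly avoids holding all results in memory.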

Analysis

Once a datum is processed by one generator, it is passed straight to the next; no generator needs to hold an intermediate list of results.
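The item-by-item hand-off can be observed by recording when each stage runs. This is an illustrative sketch only; source, double, and trace are hypothetical names that are not part of the pipeline above.

```python
trace = []

def source(data):
    for datum in data:
        trace.append(f"source {datum}")
        yield datum

def double(values):
    for value in values:
        trace.append(f"double {value}")
        yield value * 2

results = list(double(source([1, 2])))

# The stages interleave: each datum passes through both stages
# before the next datum is read from the source.
print(trace)
```

If every stage instead built a full list before handing it on, the trace would show all source entries first, followed by all double entries.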

Because only one datum moves through the pipeline at a time, the stages themselves use a roughly constant amount of memory regardless of how much data flows through, so chained generators avoid memory spikes even with a large volume of data.
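The same property lets a pipeline run over a source that never ends. The sketch below assumes a hypothetical endless_kelvin generator standing in for a live sensor feed, and uses itertools.islice to take only the first few results.

```python
from itertools import count, islice

def endless_kelvin(start=273.15, step=1.0):
    # Hypothetical unbounded source: start, start + step, start + 2 * step, ...
    for n in count():
        yield start + n * step

def kelvin_to_celsius(kelvin_data):
    for kelvin_datum in kelvin_data:
        yield round(kelvin_datum - 273.15, 1)

celsius = kelvin_to_celsius(endless_kelvin())

# Only three readings are ever generated; the rest of the stream is never touched.
first_three = list(islice(celsius, 3))
print(first_three)
```

Calling list(celsius) without islice would never finish, since the source is infinite.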

Additionally, because each generator is defined as an isolated function, stages are straightforward to modify, add, or remove, which improves maintainability.
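One way to make that flexibility explicit is to pass the stages as a list of functions. The sketch below is not taken from the demo: build_pipeline and drop_negative are hypothetical names, introduced only to show a new stage being slotted in without touching the others.

```python
from functools import reduce

def build_pipeline(source, *stages):
    # Hypothetical helper: feed `source` through each stage in order.
    return reduce(lambda data, stage: stage(data), stages, source)

def kelvin_to_celsius(kelvin_data):
    for kelvin_datum in kelvin_data:
        yield kelvin_datum - 273.15

def drop_negative(data):
    # Hypothetical extra stage: skip readings below zero.
    for datum in data:
        if datum >= 0:
            yield datum

def round_one_decimal(data):
    for datum in data:
        yield round(datum, 1)

readings = build_pipeline(
    [263.15, 295.37],   # made-up sample readings in Kelvin
    kelvin_to_celsius,
    drop_negative,      # added stage; removing this line restores the old behavior
    round_one_decimal,
)
results = list(readings)
print(results)
```

The nested-call style shown earlier works equally well; the helper only flattens the nesting so that stage order reads top to bottom.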

Demo

A demo is shared on GitHub: chained-generators-data-pipeline