
Chained Generators as Data Processing Pipeline
Generators in Python 3
In Python 3, a generator is a simplified way to implement an iterator.
A generator is typically defined as a function containing a yield statement:
def double_generator(values: list):
    for value in values:
        yield value * 2
Or, more concisely, as a generator expression:
double_generator = (i * 2 for i in [0, 1, 2, 3])
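Either form returns a generator object that supports the iterator protocol, so it can be consumed with next() or a for loop. A quick sketch using the generator defined above:
doubled = double_generator([0, 1, 2, 3])
print(next(doubled))   # 0
print(next(doubled))   # 2
for value in doubled:  # the loop consumes the remaining items
    print(value)       # 4, then 6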
Chained Generators
By chaining multiple generators together, i.e., feeding the output of one generator into the next, we can create a memory-efficient data processing pipeline.
For example, suppose we receive an infinite stream of temperature readings in Kelvin and want to convert them to Celsius readings in real time.
Define a generator that consumes a data list, emulating a stream of incoming data:
def data_stream(data: list):
    for datum in data:
        yield datum
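As an aside, since Python 3.3 the same delegation can be written more compactly with yield from, which forwards each item from the underlying iterable:
def data_stream(data: list):
    yield from data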
Define a generator to convert Kelvin to Celsius:
def kelvin_to_celsius(kelvin_data):
    for kelvin_datum in kelvin_data:
        yield kelvin_datum - 273.15
And a generator to round readings to the desired precision:
def round_one_decimal(data):
    for datum in data:
        yield round(datum, 1)
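Note that each stage accepts any iterable, so stages compose directly and can be tested in isolation. For instance, chaining just the two conversion stages over a small list (the sample values here are illustrative):
# 274.15 K - 273.15 K may leave a tiny floating-point residue,
# which round_one_decimal cleans up to exactly 1.0.
print(list(round_one_decimal(kelvin_to_celsius([273.15, 274.15]))))  # [0.0, 1.0]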
And one more generator to format readings as strings with the Celsius symbol:
def celsius_format(celsius_data):
    for celsius_datum in celsius_data:
        yield f"{celsius_datum}°C"
Finally, the pipeline_process function demonstrates how to chain these generators together:
def pipeline_process(data):
    readings = celsius_format(
        round_one_decimal(
            kelvin_to_celsius(
                data_stream(data)
            )
        )
    )
    # Pull each datum through the lazy pipeline and print the result.
    for reading in readings:
        print(reading)
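Calling the pipeline with a few sample Kelvin readings (the values are illustrative) prints each converted reading as it flows through:
pipeline_process([273.15, 295.372, 310.927])
# 0.0°C
# 22.2°C
# 37.8°C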
Analysis
Once a datum is processed by one generator, it is fed straight to the next; no generator needs to keep a local list of intermediate results.
Since only one datum is present in the pipeline at a time, memory usage stays constant during runtime: chained generators won't experience a memory spike even with a large volume of data.
Additionally, since each generator is defined as an isolated function, any stage can be modified or swapped out independently, which boosts maintainability.
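Because every stage is lazy, the same chain also handles an unbounded source. A minimal sketch, substituting itertools.count for data_stream to emulate an endless sensor feed (the start and step values are arbitrary):
import itertools

# itertools.count is already an iterator, so it can replace data_stream directly.
endless_kelvin = itertools.count(start=273.15, step=0.5)
readings = celsius_format(round_one_decimal(kelvin_to_celsius(endless_kelvin)))

# islice takes a finite peek at the infinite stream; memory stays flat
# because only one datum is in flight at a time.
for reading in itertools.islice(readings, 3):
    print(reading)  # 0.0°C, then 0.5°C, then 1.0°C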
Demo
A demo is shared on GitHub: chained-generators-data-pipeline