Go Rusty Python
Composition of Iterators in Python
Apply function composition to make a pipeline of iterators
Functional and Object-oriented programming is often considered as opposite. Composing Python’s iterator objects into a pipeline is a case when they perfectly complement each other. I will use one of NASA’s open APIs to show this pattern in action.
Showcase
API Asteroids — NeoWs is a RESTful web service for near-earth asteroid information. It allows to search asteroids on their closest approach date to Earth.
You can find it on the NASA API portal.
It contains some curious data — maximum object’s diameter, closest approach distance, date of approach, and others. Among these data, each object has a field that shows is object potentially dangerous for the earth.
Let’s consider a case when we want to fetch asteroids data, then write it to files by date, and find a potentially hazardous asteroid.
Often, I start by writing code how I would like to interact with not existing high-level classes or functions.
pipeline = compose(
Load(url=API_URL, api_key="DEMO_KEY", start_date="2021-05-09"),
Write(to_path="/tmp"),
Find(field="is_potentially_hazardous_asteroid"),
)
for each in pipeline():
print(each)
This code is mixing two paradigms Functional and Object-oriented programing.
Functional programming shines for the data processing tasks, for instance, sequentially operate and change on data. Objects allow to couple related behavior into one entity and store a state. Combining two paradigms makes code more declarative.
Configuration
We will start with isolating each step to separate objects. We will create basic classes which will store configuration for each step and later will add the required implementation.
We will need a step which will load data from the URL.
class Load:
"""Load data from a url"""
def __init__(self, *, url: str, api_key: str, start_date: str, , end_date: str = ""):
self.url = url.format(start_date, api_key)
Next we add step which will write loaded data about asteroids to files by date.
class Write:
"""Write data with asteroids to files by date"""
def __init__(self, *, to_path: str):
self.to_path = to_path
And last last step will be responsible to find hazardous asteroid. We will store a field which is responsible for this property.
class Find:
"""Find hazardous aseroid"""
def __init__(self, *, field: str):
self.field = field
Now let’s look on the function composition.
Function composition
Composition of functions is the one of the main patterns in the Functional Programming.
Intuitively, composing functions is a chaining process in which the output of function f
feeds the input of function g
.
In the Python, we can implement it as follows.
def compose2(f, g):
return lambda *args: g(f(*args))
Note that we want to execute functions as we read them, from left to right, and we use g(f(x))
.
For instance, consider two functions.
def square(x):
return x*x
def format(x):
return f"x*x = {x}"
We can apply compose2()
function to make a new one.
>>> square_and_format = compose2(square, format)
>>> print(square_and_format(2))
x*x = 4
Then we can apply reduce.
Reduce
Another highly used functional pattern is the fold or reduce functions.
The reduce()
method accumulates the array to a single value. The reduce()
method executes a provided function for each value of the array (from left-to-right).
By the way, Guido van Rossum didn’t like it.
So now reduce(). This is actually the one I’ve always hated most, because, apart from a few examples involving + or *, almost every time I see a reduce() call with a non-trivial function argument, I need to grab pen and paper to diagram what’s actually being fed into that function before I understand what the reduce() is supposed to do. So in my mind, the applicability of reduce() is pretty much limited to associative operators, and in all other cases it’s better to write out the accumulation loop explicitly.
- Guido van Rossum, The fate of reduce() in Python 3000
Luckily, it’s been hidden in the functools
module. So we can apply our compose2
function to the list of the functions.
from functools import reducedef compose(*functions):
return reduce(compose2, functions)
Now, we are able to apply composition for the lists of functions.
Callable
To be able to apply function composition and reduce to our objects, we need to make our classes callable.
In Python, if we want to make an object callable we need to implement magic method __call__
.
For the Load
class, it will be a simple returning of requested data.
class Load:
...
def __call__(self):
return requests.get(self.url)
For the Write
object, it will be a bit trickier. We will take an iterator as an argument, keep it, and return the object itself. As we will implement the iteration interface a bit later it will allow us to compose such objects.
class Write:
...
def __call__(self, iterator):
self.iterator = iterator
return self
For the Find
, it will look the same as for the Write
.
class Find:
...
def __call__(self, iterator):
self.iterator = iterator
return self
Iterator
Finally, we have to implement an iterator interface for our objects. Technically speaking, a Python object must implement __iter__
method to provide iteration support.
In our case, iteration will also keep the main logic of data processing.
We don’t need to implement iteration for the Load
class. It returns data as a list that can be iterated by default.
For the class Write
, the logic for processing and saving data to files will be implemented in this method.
class Write:
...
def _process(self, date, objects):
filepath = os.path.join(self.to_path, f"{date}.json")
with open(filepath, "w") as file:
file.write(json.dumps(objects))
def __iter__(self):
for date, objects in self.iterator.items():
self._process(date, objects)
yield date, objects
For the Find
class, while iteration we will filter object which meets our condition.
class Find:
...
def __iter__(self):
for _date, objects in self.iterator:
for item in objects:
if item[self.field]:
yield item
Final Result
And this is it. We implemented a few classes that can be initiated with configuration and composed into a pipeline as follows.
pipe = pipeline.compose(
pipeline.Load(
url=API_URL, api_key="DEMO_KEY", start_date="2021-05-09"
),
pipeline.Write(to_path="/tmp"),
pipeline.Find(field="is_potentially_hazardous_asteroid"),
)
for each in pipe():
print(each)
For the full example source code take a look into my GitHub repo iterators-composition.
Looks nice! What do you think?
Share you thoughts with me on LinkedIn and Twitter.
Conclusion
This pattern has several advantages, let’s name a few.
Pros:
- Separation of concerns. Each step has its separate role.
- More declarative notation for the algorithm.
- Each step can be tested separately.
But as with every pattern it shouldn’t be overused. If logic becoming too complicated or the number of steps too big, it is better to reconsider the approach.
More reading
If you like this article you can be interested in the following.
Thank you for reading! Share you thoughts with me on LinkedIn and Twitter.