In PySpark's Structured Streaming, striving for modular, reusable code while ensuring adaptability for each unique use case is a common challenge. I recently had the task of leveraging the broad capabilities of the writeStream method without losing the granular control that the foreachBatch function provides.
For those unfamiliar, foreachBatch in Spark's streaming API lets you run custom logic against each micro-batch of a streaming DataFrame. This can be crucial when you have tailored procedures to apply or need to integrate with external systems.
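As a quick, minimal sketch (not from my actual code), this is roughly what foreachBatch looks like on its own; the rate source and the process_batch function are placeholders invented for illustration:

```python
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

# A toy streaming source; in practice this might be Kafka, files, etc.
stream_df = spark.readStream.format("rate").load()


def process_batch(df: DataFrame, batch_id: int) -> None:
    # Arbitrary per-batch logic: write to an external system, run validations, etc.
    print(f"Batch {batch_id} contains {df.count()} rows")


query = stream_df.writeStream.foreachBatch(process_batch).start()
# query.awaitTermination()
```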
To tackle this, I designed a class that let users define specific operations to run on each batch. Additionally, there was a default function, used_everywhere, which was meant to act as a universal operation applied to all batches.
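I won't show used_everywhere itself here, but purely as a hypothetical illustration of what such a default step might look like (the parquet sink and path are invented), imagine something along these lines:

```python
from pyspark.sql import DataFrame


def used_everywhere(df: DataFrame, batch_id: int) -> DataFrame:
    # Hypothetical universal step: persist every batch to a common sink.
    df.write.mode("append").parquet("/tmp/example_output")  # illustrative path only
    return df
```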
Here's a snapshot of my preliminary design:
```python
from typing import Callable

from pyspark.sql import DataFrame


class Application:
    def __init__(self, ...):
        # used_everywhere is registered by default, so it runs for every batch.
        self._batch_functions = [used_everywhere]
        ...

    @property
    def batch_functions(self):
        return self._batch_functions

    def register_batch_function(self, func: Callable[[DataFrame, int], DataFrame]):
        # Newly registered functions are prepended; the default stays at the end.
        self._batch_functions = [func] + self.batch_functions

    def write(self):
        ...
        stream = self.process().writeStream
        for func in self.batch_functions:
            stream = stream.foreachBatch(func)
        ...
```
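Usage might then look something like this; the constructor arguments and the send_alerts function are invented for the sake of the example:

```python
def send_alerts(df: DataFrame, batch_id: int) -> DataFrame:
    # Hypothetical use-case-specific step registered on top of the default.
    df.filter("level = 'ERROR'").show()
    return df


app = Application(...)
app.register_batch_function(send_alerts)
app.write()
```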
The idea behind the used_everywhere function was to serve as a universal step, perhaps for a consistent output methodology or other similar purposes. However, during testing, I noticed an oversight: each iteration of the loop overrode the previously registered foreachBatch function, so only the final function in the _batch_functions list was actually executed (odd behaviour, and arguably something that could be improved by allowing chaining, similar to .transform).
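To my understanding, foreachBatch acts as a setter on the DataStreamWriter rather than appending to a pipeline, so a stripped-down reproduction of the issue (reusing the rate-source stream_df from the earlier sketch) would look roughly like this:

```python
def first(df: DataFrame, batch_id: int) -> None:
    print(f"first: batch {batch_id}")


def second(df: DataFrame, batch_id: int) -> None:
    print(f"second: batch {batch_id}")


# The second foreachBatch call replaces the first, so only `second` ever runs.
query = stream_df.writeStream.foreachBatch(first).foreachBatch(second).start()
```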
This realization necessitated a rethink. Instead of applying the functions in isolation, I pondered how to sequence all of the functions in self.batch_functions into a single callable and pass that unified function to foreachBatch.
Here's my adapted solution:
```python
...

def write(self):
    ...

    def chained_foreach_batch(df: DataFrame, batch_id: int) -> DataFrame:
        # Thread the DataFrame through every registered batch function in order.
        for func in self.batch_functions:
            df = func(df, batch_id)
        return df

    ...
    self.process().writeStream.foreachBatch(chained_foreach_batch)
    ...
```
This iteration uses an inner chained_foreach_batch function within the write method. It accepts a DataFrame and a batch_id and applies every function from self.batch_functions in sequence, passing each function's result on to the next.
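One consequence worth noting: because register_batch_function prepends to the list, the most recently registered function runs first and used_everywhere runs last. With hypothetical step_a and step_b functions, the order works out like this:

```python
app = Application(...)
app.register_batch_function(step_a)  # batch_functions: [step_a, used_everywhere]
app.register_batch_function(step_b)  # batch_functions: [step_b, step_a, used_everywhere]

# For each micro-batch, chained_foreach_batch now effectively does:
#   df = step_b(df, batch_id)
#   df = step_a(df, batch_id)
#   df = used_everywhere(df, batch_id)
app.write()
```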
While working on this, I was also made aware of Toolz, a library for functional composition (more info). It might have offered a more streamlined solution, and I could also have crafted one using reduce and partial from functools. However, in this instance I was keen on keeping the implementation simple and as close to plain Python as possible to ensure broader accessibility and maintainability.
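For the curious, a reduce-based version might have looked roughly like the sketch below; this is not what I shipped, just an illustration of the idea (partial could similarly be used to bind batch_id up front):

```python
from functools import reduce

from pyspark.sql import DataFrame


def compose_batch_functions(funcs):
    """Fold a list of (DataFrame, batch_id) -> DataFrame functions into a single one."""

    def chained(df: DataFrame, batch_id: int) -> DataFrame:
        return reduce(lambda acc, func: func(acc, batch_id), funcs, df)

    return chained


# e.g. inside write():
#   stream.foreachBatch(compose_batch_functions(self.batch_functions))
```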
This endeavor underscored the importance of deeply understanding the behavior of the APIs at hand, and of iterative refinement to meet nuanced requirements. Stay curious!