In PySpark's Structured Streaming, striving for modular, reusable code while keeping the flexibility each use case demands is a common challenge. I recently needed to leverage the broad capabilities of the `writeStream` method without losing the granular control that the `foreachBatch` function provides.
For those unfamiliar, `foreachBatch` in Spark's streaming API lets you perform custom operations on each micro-batch of a streaming DataFrame. This can be crucial when you have tailored procedures or need to integrate with external systems.
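To show the shape of such a callback without a running Spark cluster, here is a pure-Python stand-in: a `foreachBatch`-style function is just a callable taking the batch data and a batch id. The `sink` dict and `write_to_sink` name are illustrative, not part of Spark's API.

```python
# A foreachBatch-style callback takes (batch, batch_id). In real code `batch`
# is a pyspark.sql.DataFrame; a plain list stands in here so the sketch runs.
sink = {}

def write_to_sink(batch, batch_id: int) -> None:
    # Pretend this writes the micro-batch to an external system.
    sink[batch_id] = list(batch)

write_to_sink([1, 2, 3], 0)
write_to_sink([4], 1)
# sink == {0: [1, 2, 3], 1: [4]}
```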
To tackle this, I designed a class that lets users register specific operations to run on each batch. It also ships with a default function, `used_everywhere`, meant to act as a universal operation applied to all batches.
Here's a snapshot of my preliminary design:
```python
from typing import Callable

from pyspark.sql import DataFrame


class Application:
    def __init__(self, ...):
        self._batch_functions = [used_everywhere]
        ...

    @property
    def batch_functions(self):
        return self._batch_functions

    def register_batch_function(self, func: Callable[[DataFrame, int], DataFrame]):
        # Prepend so registered functions run before the default used_everywhere.
        self._batch_functions = [func] + self.batch_functions

    def write(self):
        ...
        stream = self.process().writeStream
        for func in self.batch_functions:
            stream = stream.foreachBatch(func)
        ...
```
The idea behind the `used_everywhere` function was to serve as a universal step, perhaps enforcing a consistent output methodology. However, during testing I noticed an oversight: each iteration of the loop overrode the previous `foreachBatch` registration, so only the final function in the `_batch_functions` list was ever executed. (Surprising behavior; arguably the API could support chaining rather than replacing.)
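The override is easy to reproduce with a minimal stand-in that mimics the "last call wins" semantics of `foreachBatch` (the `FakeWriteStream` class is a hypothetical mock, not Spark code):

```python
from typing import Callable

# Minimal mock mimicking foreachBatch semantics: each call REPLACES the
# previously registered callback instead of chaining it.
class FakeWriteStream:
    def __init__(self):
        self._func: Callable = lambda df, batch_id: None

    def foreachBatch(self, func: Callable) -> "FakeWriteStream":
        self._func = func  # overwrite, do not append
        return self

    def run_batch(self, df, batch_id: int) -> None:
        self._func(df, batch_id)

calls = []
stream = FakeWriteStream()
stream.foreachBatch(lambda df, batch_id: calls.append("first"))
stream.foreachBatch(lambda df, batch_id: calls.append("second"))
stream.run_batch([], 0)
# calls == ["second"]  -> only the last registered function ran
```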
This realization necessitated a rethink. Instead of registering the functions in isolation, I decided to sequence all the functions in `self.batch_functions` into one callable and pass that single composed function to `foreachBatch`.
Here's my adapted solution:
```python
    ...

    def write(self):
        ...
        def chained_foreach_batch(df: DataFrame, batch_id: int) -> DataFrame:
            for func in self.batch_functions:
                df = func(df, batch_id)
            return df
        ...
        self.process().writeStream.foreachBatch(chained_foreach_batch)
        ...
```
This iteration defines an internal `chained_foreach_batch` function inside the `write` method. It accepts a DataFrame and a batch id and applies every function in `self.batch_functions` in sequence.
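The same chaining pattern can be exercised in plain Python, independent of Spark. The `make_chained` helper below is illustrative; each function has the signature `(data, batch_id) -> data`, mirroring the batch functions above:

```python
from typing import Callable, List

# Compose a list of (data, batch_id) -> data functions into one callback,
# mirroring the chained_foreach_batch pattern from write().
def make_chained(funcs: List[Callable]) -> Callable:
    def chained(data, batch_id: int):
        for func in funcs:
            data = func(data, batch_id)
        return data
    return chained

double = lambda xs, batch_id: [x * 2 for x in xs]
add_id = lambda xs, batch_id: [x + batch_id for x in xs]

chained = make_chained([double, add_id])
# Batch [1, 2] with batch_id=1: double -> [2, 4], then add_id -> [3, 5].
result = chained([1, 2], 1)
# result == [3, 5]
```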
While working on this, I was also made aware of Toolz, a library that can be used for functional composition; it might have offered a more streamlined solution. I could also have crafted one using `partial`. In this instance, however, I was keen on keeping the implementation simple and as close to pure Python as possible, to ensure broader accessibility and maintainability.
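For reference, here is one way such a composition could look with only the standard library, using `functools.reduce` instead of an explicit loop. This is a sketch of the alternative, not what I shipped; the helper names are my own:

```python
from functools import reduce

# Sketch: fold a list of (data, batch_id) -> data functions into a single
# callback using functools.reduce instead of an explicit for loop.
def compose_batch_functions(funcs):
    def composed(data, batch_id):
        return reduce(lambda acc, func: func(acc, batch_id), funcs, data)
    return composed

strip = lambda rows, batch_id: [r.strip() for r in rows]
upper = lambda rows, batch_id: [r.upper() for r in rows]

composed = compose_batch_functions([strip, upper])
# composed(["  a ", " b"], 0) == ["A", "B"]
```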
This endeavor underscored the importance of deeply understanding the behavior of the APIs at hand, and of iterative refinement to meet nuanced requirements. Stay curious!