Composing Functions for PySpark's Structured Streaming

In PySpark's Structured Streaming, writing modular, reusable code while staying adaptable to each unique use case is a common challenge. I recently needed to leverage the broad capabilities of the writeStream method without losing the granular control that the foreachBatch function provides.

For those unfamiliar, foreachBatch, part of Spark's streaming API, lets you run custom operations on each micro-batch of a streaming DataFrame. This is crucial when you need tailored procedures or integration with external systems.
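As a quick, minimal sketch of what that looks like (the rate source here is just a stand-in for a real stream, and write_to_console is a made-up example function):

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

# A toy streaming source; any streaming DataFrame works the same way.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

def write_to_console(df: DataFrame, batch_id: int) -> None:
    # Runs once per micro-batch; df is a regular (static) DataFrame here,
    # so any batch operation or write to an external system is allowed.
    df.show(truncate=False)

query = stream_df.writeStream.foreachBatch(write_to_console).start()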

To tackle this, I designed a class that lets users register specific operations to run on each batch. It also ships with a default function, used_everywhere, meant to act as a universal operation applied to every batch.

Here's a snapshot of my preliminary design:

from typing import Callable
from pyspark.sql import DataFrame

class Application:
    def __init__(self, ...):
        # used_everywhere is the default step applied to every batch
        self._batch_functions = [used_everywhere]
        ...

    @property
    def batch_functions(self):
        return self._batch_functions

    def register_batch_function(self, func: Callable[[DataFrame, int], DataFrame]):
        # user-registered functions run before the default one
        self._batch_functions = [func] + self.batch_functions

    def write(self):
        ...
        stream = self.process().writeStream
        for func in self.batch_functions:
            stream = stream.foreachBatch(func)
        ...

The idea behind the used_everywhere function was to serve as a universal step, perhaps enforcing a consistent output methodology or something similar. During testing, however, I noticed an oversight: each iteration of the loop overrode the previous foreachBatch call, so only the last function in the _batch_functions list was ever executed (a surprising behavior; it would be nicer if foreachBatch supported chaining the way .transform does).

This realization necessitated a rethink. Instead of applying functions in isolation, I looked for a way to sequence all the functions in self.batch_functions into a single callable and pass that to foreachBatch.

Here's my adapted solution:

...
    def write(self):
        ...
        def chained_foreach_batch(df: DataFrame, batch_id: int) -> None:
            # Apply every registered function in order, feeding each one's
            # output into the next; foreachBatch ignores the final result.
            for func in self.batch_functions:
                df = func(df, batch_id)
        ...
        self.process().writeStream.foreachBatch(chained_foreach_batch)
        ...

This iteration uses an internal chained_foreach_batch function within the write method. It accepts a DataFrame and a batch_id and applies all functions from self.batch_functions in sequence.
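To give a feel for how this comes together, here is a hypothetical usage sketch; the constructor arguments stay elided as in the snippets above, and add_ingest_metadata is a made-up example function, not part of the original code:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def add_ingest_metadata(df: DataFrame, batch_id: int) -> DataFrame:
    # A use-case-specific step registered on top of the default used_everywhere
    return df.withColumn("batch_id", lit(batch_id))

app = Application(...)  # construction details elided, as above
app.register_batch_function(add_ingest_metadata)
app.write()  # each micro-batch flows through add_ingest_metadata, then used_everywhere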

While working on this, I was also pointed to Toolz, a library for functional composition, which might have offered a more streamlined solution. I could also have crafted something similar with functools.reduce and functools.partial. In this instance, however, I wanted to keep the implementation simple and as close to pure Python as possible for broader accessibility and maintainability.
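For completeness, a rough sketch of what the functools.reduce alternative could look like; this is an illustration of the idea, not the code I actually shipped:

from functools import reduce
from pyspark.sql import DataFrame

def compose_batch_functions(functions):
    """Fold (df, batch_id) -> df steps into one foreachBatch-compatible callable."""
    def composed(df: DataFrame, batch_id: int) -> None:
        # Thread the DataFrame through each function in order;
        # foreachBatch discards the final result, so nothing is returned.
        reduce(lambda acc, func: func(acc, batch_id), functions, df)
    return composed

# e.g. self.process().writeStream.foreachBatch(compose_batch_functions(self.batch_functions))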

This endeavor underscored the importance of deeply understanding the behavior of the APIs at hand, and of iterative refinement to meet nuanced requirements. Stay curious!
