From 02480e8004f79d01012b0ec309bfcfbe22ee5159 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Tue, 29 Jul 2025 18:51:58 +0000 Subject: [PATCH] fix: addressed catch function pickleability --- laygo/transformers/http.py | 2 +- laygo/transformers/parallel.py | 2 +- laygo/transformers/threaded.py | 2 +- laygo/transformers/transformer.py | 25 ++++++++++++++++++------- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/laygo/transformers/http.py b/laygo/transformers/http.py index 5127eb1..4087628 100644 --- a/laygo/transformers/http.py +++ b/laygo/transformers/http.py @@ -233,7 +233,7 @@ def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T] def catch[U]( self, sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]], - on_error: ChunkErrorHandler[Out, U] | None = None, + on_error: ChunkErrorHandler[Out, None] | None = None, ) -> "HTTPTransformer[In, U]": super().catch(sub_pipeline_builder, on_error) return self # type: ignore diff --git a/laygo/transformers/parallel.py b/laygo/transformers/parallel.py index f30b4e4..5c3ce09 100644 --- a/laygo/transformers/parallel.py +++ b/laygo/transformers/parallel.py @@ -226,7 +226,7 @@ def apply[T]( def catch[U]( self, sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]], - on_error: ChunkErrorHandler[Out, U] | None = None, + on_error: ChunkErrorHandler[Out, None] | None = None, ) -> "ParallelTransformer[In, U]": super().catch(sub_pipeline_builder, on_error) return self # type: ignore diff --git a/laygo/transformers/threaded.py b/laygo/transformers/threaded.py index 11dedca..7f28856 100644 --- a/laygo/transformers/threaded.py +++ b/laygo/transformers/threaded.py @@ -241,7 +241,7 @@ def apply[T]( def catch[U]( self, sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]], - on_error: ChunkErrorHandler[Out, U] | None = None, + on_error: ChunkErrorHandler[Out, None] | None = None, ) -> "ThreadedTransformer[In, U]": super().catch(sub_pipeline_builder, on_error) return self # type: ignore diff --git a/laygo/transformers/transformer.py b/laygo/transformers/transformer.py index b84354a..a01e8f2 100644 --- a/laygo/transformers/transformer.py +++ b/laygo/transformers/transformer.py @@ -442,7 +442,7 @@ def _reduce(data: Iterable[In], context: IContextManager | None = None) -> Itera def catch[U]( self, sub_pipeline_builder: Callable[["Transformer[Out, Out]"], "Transformer[Out, U]"], - on_error: ChunkErrorHandler[Out, U] | None = None, + on_error: ChunkErrorHandler[Out, None] | None = None, ) -> "Transformer[In, U]": """Isolate a sub-pipeline in a chunk-based try-catch block. @@ -450,14 +450,20 @@ def catch[U]( Args: sub_pipeline_builder: A function that builds the sub-pipeline to protect. - on_error: Optional error handler for when the sub-pipeline fails. + on_error: A picklable error handler for when the sub-pipeline fails. + It takes a chunk, exception, and context, and must return a + replacement chunk (`list[U]`). If not provided, an empty + list is returned on error. Returns: A transformer with error handling applied. """ - if on_error: - self.on_error(on_error) # type: ignore + # Use the global error handler if it exists, otherwise create an internal one + catch_error_handler = self.error_handler + + if on_error is not None: + catch_error_handler.on_error(on_error) # type: ignore # Create a blank transformer for the sub-pipeline temp_transformer = createTransformer(_type_hint=..., chunk_size=self.chunk_size) # type: ignore @@ -466,13 +472,18 @@ def catch[U]( sub_pipeline = sub_pipeline_builder(temp_transformer) sub_transformer_func = sub_pipeline.transformer + # This 'operation' function is now picklable. It only closes over + # `sub_transformer_func` and `catch_error_handler`, both of which are + # picklable, and it no longer references `self`. def operation(chunk: list[Out], ctx: IContextManager) -> list[U]: try: - # Attempt to process the whole chunk with the sub-pipeline + # Attempt to process the chunk with the sub-pipeline return sub_transformer_func(chunk, ctx) except Exception as e: - # On failure, delegate to the chunk-based error handler - self.error_handler.handle(chunk, e, ctx) + # Call the error handler (which may include both global and local handlers) + catch_error_handler.handle(chunk, e, ctx) + + # Return an empty list as the default behavior after handling the error return [] return self._pipe(operation) # type: ignore