Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion laygo/transformers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T]
def catch[U](
self,
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
on_error: ChunkErrorHandler[Out, U] | None = None,
on_error: ChunkErrorHandler[Out, None] | None = None,
) -> "HTTPTransformer[In, U]":
super().catch(sub_pipeline_builder, on_error)
return self # type: ignore
Expand Down
2 changes: 1 addition & 1 deletion laygo/transformers/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def apply[T](
def catch[U](
self,
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
on_error: ChunkErrorHandler[Out, U] | None = None,
on_error: ChunkErrorHandler[Out, None] | None = None,
) -> "ParallelTransformer[In, U]":
super().catch(sub_pipeline_builder, on_error)
return self # type: ignore
Expand Down
2 changes: 1 addition & 1 deletion laygo/transformers/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def apply[T](
def catch[U](
self,
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
on_error: ChunkErrorHandler[Out, U] | None = None,
on_error: ChunkErrorHandler[Out, None] | None = None,
) -> "ThreadedTransformer[In, U]":
super().catch(sub_pipeline_builder, on_error)
return self # type: ignore
Expand Down
25 changes: 18 additions & 7 deletions laygo/transformers/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,22 +442,28 @@ def _reduce(data: Iterable[In], context: IContextManager | None = None) -> Itera
def catch[U](
self,
sub_pipeline_builder: Callable[["Transformer[Out, Out]"], "Transformer[Out, U]"],
on_error: ChunkErrorHandler[Out, U] | None = None,
on_error: ChunkErrorHandler[Out, None] | None = None,
) -> "Transformer[In, U]":
"""Isolate a sub-pipeline in a chunk-based try-catch block.

If the sub-pipeline fails for a chunk, the on_error handler is invoked.

Args:
sub_pipeline_builder: A function that builds the sub-pipeline to protect.
on_error: Optional error handler for when the sub-pipeline fails.
on_error: A picklable error handler for when the sub-pipeline fails.
It takes a chunk, exception, and context. Its return value is
ignored (it is typed as returning `None`): the failed chunk is
always replaced by an empty list, whether or not a handler is provided.

Returns:
A transformer with error handling applied.
"""

if on_error:
self.on_error(on_error) # type: ignore
# Reuse the transformer's existing error handler (no separate handler is created here);
# a handler passed to catch() is registered on that same object
catch_error_handler = self.error_handler

if on_error is not None:
catch_error_handler.on_error(on_error) # type: ignore

# Create a blank transformer for the sub-pipeline
temp_transformer = createTransformer(_type_hint=..., chunk_size=self.chunk_size) # type: ignore
Expand All @@ -466,13 +472,18 @@ def catch[U](
sub_pipeline = sub_pipeline_builder(temp_transformer)
sub_transformer_func = sub_pipeline.transformer

# This 'operation' function is now picklable. It only closes over
# `sub_transformer_func` and `catch_error_handler`, both of which are
# picklable, and it no longer references `self`.
def operation(chunk: list[Out], ctx: IContextManager) -> list[U]:
try:
# Attempt to process the whole chunk with the sub-pipeline
# Attempt to process the chunk with the sub-pipeline
return sub_transformer_func(chunk, ctx)
except Exception as e:
# On failure, delegate to the chunk-based error handler
self.error_handler.handle(chunk, e, ctx)
# Call the error handler (which may include both global and local handlers)
catch_error_handler.handle(chunk, e, ctx)

# Return an empty list as the default behavior after handling the error
return []

return self._pipe(operation) # type: ignore
Expand Down
Loading