6 changes: 4 additions & 2 deletions laygo/__init__.py
@@ -1,5 +1,7 @@
"""
Laygo - A lightweight Python library for building resilient, in-memory data pipelines
"""Laygo - A lightweight Python library for building resilient, in-memory data pipelines.

This library provides a modern, type-safe approach to data processing with
support for parallel execution, error handling, and context-aware operations.
"""

from laygo.errors import ErrorHandler
49 changes: 37 additions & 12 deletions laygo/errors.py
@@ -6,31 +6,56 @@


def raise_error(chunk: list, error: Exception, context: PipelineContext) -> None:
"""A handler that always re-raises the error, stopping execution."""
"""Handler that always re-raises the error, stopping execution.

This is a default error handler that provides fail-fast behavior by
re-raising any exception that occurs during chunk processing.

Args:
chunk: The data chunk that was being processed when the error occurred.
error: The exception that was raised.
context: The pipeline context at the time of the error.

Raises:
Exception: Always re-raises the provided error.
"""
raise error


class ErrorHandler:
"""
Stores and executes a chain of error handlers.
Handlers are executed in reverse order. The assumption is that the closer the handler
is to the error, the earlier it should be executed.
"""Stores and executes a chain of error handlers.

Error handlers are executed in reverse order of addition. This design
assumes that handlers closer to the error source should be executed first.
"""

def __init__(self):
def __init__(self) -> None:
"""Initialize an empty error handler chain."""
self._handlers: list[ChunkErrorHandler] = []

def on_error(self, handler: ChunkErrorHandler) -> "ErrorHandler":
"""
Adds a new handler to the chain.
This method modifies the ErrorHandler instance in-place.
"""Add a new handler to the beginning of the chain.

Args:
handler: A callable that processes errors. It receives the chunk
being processed, the exception that occurred, and the
pipeline context.

Returns:
The ErrorHandler instance for method chaining.
"""
self._handlers.insert(0, handler)
return self

def handle(self, chunk: list, error: Exception, context: PipelineContext) -> None:
"""
Executes all handlers in the chain using a list comprehension.
Execution only stops if a handler raises an exception.
"""Execute all handlers in the chain.

Handlers are executed in reverse order of addition. Execution stops
if any handler raises an exception.

Args:
chunk: The data chunk that was being processed when the error occurred.
error: The exception that was raised.
context: The pipeline context at the time of the error.
"""
[handler(chunk, error, context) for handler in self._handlers]
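A minimal usage sketch of the handler chain documented above (assuming the `laygo.errors` and `laygo.helpers` import paths implied by this diff; the `log_error` handler is hypothetical):

```python
from laygo.errors import ErrorHandler, raise_error
from laygo.helpers import PipelineContext


def log_error(chunk: list, error: Exception, context: PipelineContext) -> None:
    # Hypothetical handler: record the failure before the fail-fast handler runs.
    print(f"chunk of {len(chunk)} items failed: {error!r}")


handler = ErrorHandler()
handler.on_error(raise_error).on_error(log_error)

# log_error was added last, so it runs first; raise_error then re-raises.
try:
    handler.handle([1, 2, 3], ValueError("bad record"), PipelineContext())
except ValueError:
    print("re-raised by raise_error, as documented")
```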
37 changes: 27 additions & 10 deletions laygo/helpers.py
@@ -4,8 +4,13 @@
from typing import TypeGuard


class PipelineContext(dict):
"""Generic, untyped context available to all pipeline operations."""
class PipelineContext(dict[str, Any]):
"""Generic, untyped context available to all pipeline operations.

A dictionary-based context that can store arbitrary data shared across
pipeline operations. This allows passing state and configuration between
different stages of data processing.
"""

pass

@@ -16,20 +21,32 @@ class PipelineContext(dict):


def is_context_aware(func: Callable[..., Any]) -> TypeGuard[ContextAwareCallable]:
"""
Checks if a function is "context-aware" by inspecting its signature.
"""Check if a function is context-aware by inspecting its signature.

A context-aware function accepts a PipelineContext as its second parameter,
allowing it to access shared state during pipeline execution.

This function uses a TypeGuard, allowing Mypy to narrow the type of
the checked function in conditional blocks.
Args:
func: The function to inspect for context awareness.

Returns:
True if the function accepts more than one parameter (indicating it's
context-aware), False otherwise.
"""
return len(inspect.signature(func).parameters) > 1


def is_context_aware_reduce(func: Callable[..., Any]) -> TypeGuard[ContextAwareReduceCallable]:
"""
Checks if a function is "context-aware" by inspecting its signature.
"""Check if a reduce function is context-aware by inspecting its signature.

A context-aware reduce function accepts an accumulator, current value,
and PipelineContext as its three parameters.

Args:
func: The reduce function to inspect for context awareness.

This function uses a TypeGuard, allowing Mypy to narrow the type of
the checked function in conditional blocks.
Returns:
True if the function accepts more than two parameters (indicating it's
context-aware), False otherwise.
"""
return len(inspect.signature(func).parameters) > 2
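A short sketch of the signature checks above (assuming `laygo.helpers` as the import path, consistent with this file's location):

```python
from laygo.helpers import PipelineContext, is_context_aware, is_context_aware_reduce


def double(x: int) -> int:
    return x * 2  # one parameter: plain function


def scale(x: int, ctx: PipelineContext) -> int:
    return x * ctx.get("factor", 1)  # second parameter: context-aware


def add(acc: int, x: int) -> int:
    return acc + x  # two parameters: plain reduce function


def add_scaled(acc: int, x: int, ctx: PipelineContext) -> int:
    return acc + x * ctx.get("factor", 1)  # three parameters: context-aware reduce


print(is_context_aware(double))             # False
print(is_context_aware(scale))              # True
print(is_context_aware_reduce(add))         # False
print(is_context_aware_reduce(add_scaled))  # True
```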
120 changes: 88 additions & 32 deletions laygo/pipeline.py
@@ -19,12 +19,23 @@


class Pipeline[T]:
"""Manages a data source and applies transformers to it.

A Pipeline provides a high-level interface for data processing by chaining
transformers together. It automatically manages a multiprocessing-safe
shared context that can be accessed by all transformers in the chain.
"""
Manages a data source and applies transformers to it.
Always uses a multiprocessing-safe shared context.
"""

def __init__(self, *data: Iterable[T]):
def __init__(self, *data: Iterable[T]) -> None:
"""Initialize a pipeline with one or more data sources.

Args:
*data: One or more iterable data sources. If multiple sources are
provided, they will be chained together.

Raises:
ValueError: If no data sources are provided.
"""
if len(data) == 0:
raise ValueError("At least one data source must be provided to Pipeline.")
self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0]
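As a small illustration of the constructor above (a sketch; it assumes Pipeline is importable from `laygo.pipeline` and that a pipeline with no transformers simply yields its source items):

```python
from laygo.pipeline import Pipeline

# Multiple sources are chained into a single stream by __init__.
print(Pipeline([1, 2, 3], (4, 5, 6)).to_list())  # expected: [1, 2, 3, 4, 5, 6]

# Calling Pipeline() with no sources fails fast, as documented.
try:
    Pipeline()
except ValueError as exc:
    print(exc)
```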
@@ -39,19 +50,25 @@ def __init__(self, *data: Iterable[T]):
# Store reference to original context for final synchronization
self._original_context_ref: PipelineContext | None = None

def __del__(self):
def __del__(self) -> None:
"""Clean up the multiprocessing manager when the pipeline is destroyed."""
try:
self._sync_context_back()
self._manager.shutdown()
except Exception:
pass # Ignore errors during cleanup
pass

def context(self, ctx: PipelineContext) -> "Pipeline[T]":
"""
Updates the pipeline context and stores a reference to the original context.
"""Update the pipeline context and store a reference to the original context.

When the pipeline finishes processing, the original context will be updated
with the final pipeline context data.

Args:
ctx: The pipeline context to use for this pipeline execution.

Returns:
The pipeline instance for method chaining.
"""
# Store reference to the original context
self._original_context_ref = ctx
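The `context()` method above stores a reference to the caller's context so the shared state can be synced back after processing. A sketch of that round-trip (assuming the import paths below; the diff shows the sync-back in `__del__`, so exactly when `ctx` is updated is an implementation detail):

```python
from laygo.helpers import PipelineContext
from laygo.pipeline import Pipeline

ctx = PipelineContext(run_id="example")


def count_items(data, context):
    # Context-aware generator: records how many items passed through.
    seen = 0
    for item in data:
        seen += 1
        yield item
    context["seen"] = seen


Pipeline(range(5)).context(ctx).apply(count_items).consume()

# Once the pipeline has synced back (see _sync_context_back below), the
# original context carries the shared state.
print(ctx.get("seen"))  # expected: 5
```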
@@ -60,9 +77,10 @@ def context(self, ctx: PipelineContext) -> "Pipeline[T]":
return self

def _sync_context_back(self) -> None:
"""
Synchronize the final pipeline context back to the original context reference.
This is called after processing is complete.
"""Synchronize the final pipeline context back to the original context reference.

This is called after processing is complete to update the original
context with any changes made during pipeline execution.
"""
if self._original_context_ref is not None:
# Copy the final context state back to the original context reference
@@ -72,15 +90,16 @@ def _sync_context_back(self) -> None:
self._original_context_ref.update(final_context_state)

def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
"""
Shorthand method to apply a transformation using a lambda function.
"""Apply a transformation using a lambda function.

Creates a Transformer under the hood and applies it to the pipeline.
This is a shorthand method for simple transformations.

Args:
t: A callable that takes a transformer and returns a transformed transformer
t: A callable that takes a transformer and returns a transformed transformer.

Returns:
A new Pipeline with the transformed data
A new Pipeline with the transformed data.
"""
# Create a new transformer and apply the transformation function
transformer = t(Transformer[T, T]())
@@ -101,53 +120,90 @@ def apply[U](
| Callable[[Iterable[T]], Iterator[U]]
| Callable[[Iterable[T], PipelineContext], Iterator[U]],
) -> "Pipeline[U]":
"""
Applies a transformer to the current data source. The pipeline's
managed context is passed down.
"""Apply a transformer to the current data source.

The pipeline's managed context is passed down to the transformer.

Args:
transformer: Either a Transformer instance or a callable function
that processes the data.

Returns:
A new Pipeline with the transformed data.

Raises:
TypeError: If the transformer is not a supported type.
"""
match transformer:
case Transformer():
# The transformer is called with self.ctx, which is the
# shared mp.Manager.dict proxy when inside a 'with' block.
self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore
case _ if callable(transformer):
if is_context_aware(transformer):
processed_transformer = transformer
self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore
else:
processed_transformer = lambda data, ctx: transformer(data) # type: ignore # noqa: E731
self.processed_data = processed_transformer(self.processed_data, self.ctx) # type: ignore
self.processed_data = transformer(self.processed_data) # type: ignore
case _:
raise TypeError("Transformer must be a Transformer instance or a callable function")

return self # type: ignore

def buffer(self, size: int) -> "Pipeline[T]":
"""Buffer the pipeline using threaded processing.

Args:
size: The number of worker threads to use for buffering.

Returns:
The pipeline instance for method chaining.
"""
self.apply(ThreadedTransformer(max_workers=size))
return self

def __iter__(self) -> Iterator[T]:
"""Allows the pipeline to be iterated over."""
"""Allow the pipeline to be iterated over.

Returns:
An iterator over the processed data.
"""
yield from self.processed_data

def to_list(self) -> list[T]:
"""Executes the pipeline and returns the results as a list."""
"""Execute the pipeline and return the results as a list.

Returns:
A list containing all processed items from the pipeline.
"""
return list(self.processed_data)

def each(self, function: PipelineFunction[T]) -> None:
"""Applies a function to each element (terminal operation)."""
# Context needs to be accessed from the function if it's context-aware,
# but the pipeline itself doesn't own a context. This is a design choice.
# For simplicity, we assume the function is not context-aware here
# or that context is handled within the Transformers.
"""Apply a function to each element (terminal operation).

Args:
function: The function to apply to each element.
"""
for item in self.processed_data:
function(item)

def first(self, n: int = 1) -> list[T]:
"""Gets the first n elements of the pipeline (terminal operation)."""
"""Get the first n elements of the pipeline (terminal operation).

Args:
n: The number of elements to retrieve.

Returns:
A list containing the first n elements.

Raises:
AssertionError: If n is less than 1.
"""
assert n >= 1, "n must be at least 1"
return list(itertools.islice(self.processed_data, n))

def consume(self) -> None:
"""Consumes the pipeline without returning results."""
"""Consume the pipeline without returning results.

This is useful when you want to execute the pipeline for side effects
without collecting the results.
"""
for _ in self.processed_data:
pass
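Tying the pieces together, an end-to-end sketch (assuming the `laygo.pipeline` import path; `keep_even` is a plain generator, so the match statement in `apply()` calls it without a context):

```python
from laygo.pipeline import Pipeline


def keep_even(data):
    # Plain (non-context-aware) generator: apply() passes it the data only.
    for item in data:
        if item % 2 == 0:
            yield item


print(Pipeline(range(10)).apply(keep_even).to_list())  # expected: [0, 2, 4, 6, 8]
print(Pipeline(range(10)).apply(keep_even).first(2))   # expected: [0, 2]

# buffer(4) inserts a ThreadedTransformer with four worker threads; consume()
# then drains the lazy stream purely for side effects.
Pipeline(range(10)).apply(keep_even).buffer(4).consume()
```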