diff --git a/laygo/__init__.py b/laygo/__init__.py
index dc5dc88..1767cc0 100644
--- a/laygo/__init__.py
+++ b/laygo/__init__.py
@@ -1,5 +1,7 @@
-"""
-Laygo - A lightweight Python library for building resilient, in-memory data pipelines
+"""Laygo - A lightweight Python library for building resilient, in-memory data pipelines.
+
+This library provides a modern, type-safe approach to data processing with
+support for parallel execution, error handling, and context-aware operations.
 """

 from laygo.errors import ErrorHandler
diff --git a/laygo/errors.py b/laygo/errors.py
index 8afd5df..5e017ce 100644
--- a/laygo/errors.py
+++ b/laygo/errors.py
@@ -6,31 +6,56 @@
 def raise_error(chunk: list, error: Exception, context: PipelineContext) -> None:
-    """A handler that always re-raises the error, stopping execution."""
+    """Handler that always re-raises the error, stopping execution.
+
+    This is a default error handler that provides fail-fast behavior by
+    re-raising any exception that occurs during chunk processing.
+
+    Args:
+        chunk: The data chunk that was being processed when the error occurred.
+        error: The exception that was raised.
+        context: The pipeline context at the time of the error.
+
+    Raises:
+        Exception: Always re-raises the provided error.
+    """
     raise error


 class ErrorHandler:
-    """
-    Stores and executes a chain of error handlers.
-    Handlers are executed in reverse order. The assumption is that the closer the handler
-    is to the error, the earlier it should be executed.
+    """Stores and executes a chain of error handlers.
+
+    Error handlers are executed in reverse order of addition. This design
+    assumes that handlers closer to the error source should be executed first.
     """

-    def __init__(self):
+    def __init__(self) -> None:
+        """Initialize an empty error handler chain."""
         self._handlers: list[ChunkErrorHandler] = []

     def on_error(self, handler: ChunkErrorHandler) -> "ErrorHandler":
-        """
-        Adds a new handler to the chain.
-        This method modifies the ErrorHandler instance in-place.
+        """Add a new handler to the beginning of the chain.
+
+        Args:
+            handler: A callable that processes errors. It receives the chunk
+                being processed, the exception that occurred, and the
+                pipeline context.
+
+        Returns:
+            The ErrorHandler instance for method chaining.
         """
         self._handlers.insert(0, handler)
         return self

     def handle(self, chunk: list, error: Exception, context: PipelineContext) -> None:
-        """
-        Executes all handlers in the chain using a list comprehension.
-        Execution only stops if a handler raises an exception.
+        """Execute all handlers in the chain.
+
+        Handlers are executed in reverse order of addition. Execution stops
+        if any handler raises an exception.
+
+        Args:
+            chunk: The data chunk that was being processed when the error occurred.
+            error: The exception that was raised.
+            context: The pipeline context at the time of the error.
         """
         [handler(chunk, error, context) for handler in self._handlers]
diff --git a/laygo/helpers.py b/laygo/helpers.py
index 6fdea3d..f3bbbf2 100644
--- a/laygo/helpers.py
+++ b/laygo/helpers.py
@@ -4,8 +4,13 @@
 from typing import TypeGuard


-class PipelineContext(dict):
-    """Generic, untyped context available to all pipeline operations."""
+class PipelineContext(dict[str, Any]):
+    """Generic, untyped context available to all pipeline operations.
+
+    A dictionary-based context that can store arbitrary data shared across
+    pipeline operations. This allows passing state and configuration between
+    different stages of data processing.
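+
+    Example (illustrative sketch only; PipelineContext is a plain dict
+    subclass, so standard mapping operations apply):
+        >>> ctx = PipelineContext()
+        >>> ctx["batch_size"] = 100
+        >>> ctx["batch_size"]
+        100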
+    """

     pass
@@ -16,20 +21,32 @@ class PipelineContext(dict):
 def is_context_aware(func: Callable[..., Any]) -> TypeGuard[ContextAwareCallable]:
-    """
-    Checks if a function is "context-aware" by inspecting its signature.
+    """Check if a function is context-aware by inspecting its signature.
+
+    A context-aware function accepts a PipelineContext as its second parameter,
+    allowing it to access shared state during pipeline execution.

-    This function uses a TypeGuard, allowing Mypy to narrow the type of
-    the checked function in conditional blocks.
+    Args:
+        func: The function to inspect for context awareness.
+
+    Returns:
+        True if the function accepts more than one parameter (indicating it's
+        context-aware), False otherwise.
     """
     return len(inspect.signature(func).parameters) > 1


 def is_context_aware_reduce(func: Callable[..., Any]) -> TypeGuard[ContextAwareReduceCallable]:
-    """
-    Checks if a function is "context-aware" by inspecting its signature.
+    """Check if a reduce function is context-aware by inspecting its signature.
+
+    A context-aware reduce function accepts an accumulator, current value,
+    and PipelineContext as its three parameters.
+
+    Args:
+        func: The reduce function to inspect for context awareness.

-    This function uses a TypeGuard, allowing Mypy to narrow the type of
-    the checked function in conditional blocks.
+    Returns:
+        True if the function accepts more than two parameters (indicating it's
+        context-aware), False otherwise.
     """
     return len(inspect.signature(func).parameters) > 2
diff --git a/laygo/pipeline.py b/laygo/pipeline.py
index df4e1c5..0f82255 100644
--- a/laygo/pipeline.py
+++ b/laygo/pipeline.py
@@ -19,12 +19,23 @@
 class Pipeline[T]:
+    """Manages a data source and applies transformers to it.
+
+    A Pipeline provides a high-level interface for data processing by chaining
+    transformers together. It automatically manages a multiprocessing-safe
+    shared context that can be accessed by all transformers in the chain.
     """
-    Manages a data source and applies transformers to it.
-    Always uses a multiprocessing-safe shared context.
-    """
-    def __init__(self, *data: Iterable[T]):
+
+    def __init__(self, *data: Iterable[T]) -> None:
+        """Initialize a pipeline with one or more data sources.
+
+        Args:
+            *data: One or more iterable data sources. If multiple sources are
+                provided, they will be chained together.
+
+        Raises:
+            ValueError: If no data sources are provided.
+        """
         if len(data) == 0:
             raise ValueError("At least one data source must be provided to Pipeline.")
         self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0]
@@ -39,19 +50,25 @@ def __init__(self, *data: Iterable[T]):
         # Store reference to original context for final synchronization
         self._original_context_ref: PipelineContext | None = None

-    def __del__(self):
+    def __del__(self) -> None:
         """Clean up the multiprocessing manager when the pipeline is destroyed."""
         try:
             self._sync_context_back()
             self._manager.shutdown()
         except Exception:
-            pass  # Ignore errors during cleanup
+            pass

     def context(self, ctx: PipelineContext) -> "Pipeline[T]":
-        """
-        Updates the pipeline context and stores a reference to the original context.
+        """Update the pipeline context and store a reference to the original context.
+
         When the pipeline finishes processing, the original context will be
         updated with the final pipeline context data.
+
+        Args:
+            ctx: The pipeline context to use for this pipeline execution.
+
+        Returns:
+            The pipeline instance for method chaining.
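+
+        Example (illustrative sketch; assumes a small in-memory source and
+        an identity pipeline):
+            >>> ctx = PipelineContext()
+            >>> Pipeline([1, 2, 3]).context(ctx).to_list()
+            [1, 2, 3]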
""" # Store reference to the original context self._original_context_ref = ctx @@ -60,9 +77,10 @@ def context(self, ctx: PipelineContext) -> "Pipeline[T]": return self def _sync_context_back(self) -> None: - """ - Synchronize the final pipeline context back to the original context reference. - This is called after processing is complete. + """Synchronize the final pipeline context back to the original context reference. + + This is called after processing is complete to update the original + context with any changes made during pipeline execution. """ if self._original_context_ref is not None: # Copy the final context state back to the original context reference @@ -72,15 +90,16 @@ def _sync_context_back(self) -> None: self._original_context_ref.update(final_context_state) def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]": - """ - Shorthand method to apply a transformation using a lambda function. + """Apply a transformation using a lambda function. + Creates a Transformer under the hood and applies it to the pipeline. + This is a shorthand method for simple transformations. Args: - t: A callable that takes a transformer and returns a transformed transformer + t: A callable that takes a transformer and returns a transformed transformer. Returns: - A new Pipeline with the transformed data + A new Pipeline with the transformed data. """ # Create a new transformer and apply the transformation function transformer = t(Transformer[T, T]()) @@ -101,53 +120,90 @@ def apply[U]( | Callable[[Iterable[T]], Iterator[U]] | Callable[[Iterable[T], PipelineContext], Iterator[U]], ) -> "Pipeline[U]": - """ - Applies a transformer to the current data source. The pipeline's - managed context is passed down. + """Apply a transformer to the current data source. + + The pipeline's managed context is passed down to the transformer. + + Args: + transformer: Either a Transformer instance or a callable function + that processes the data. + + Returns: + A new Pipeline with the transformed data. + + Raises: + TypeError: If the transformer is not a supported type. """ match transformer: case Transformer(): - # The transformer is called with self.ctx, which is the - # shared mp.Manager.dict proxy when inside a 'with' block. self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore case _ if callable(transformer): if is_context_aware(transformer): - processed_transformer = transformer + self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore else: - processed_transformer = lambda data, ctx: transformer(data) # type: ignore # noqa: E731 - self.processed_data = processed_transformer(self.processed_data, self.ctx) # type: ignore + self.processed_data = transformer(self.processed_data) # type: ignore case _: raise TypeError("Transformer must be a Transformer instance or a callable function") return self # type: ignore def buffer(self, size: int) -> "Pipeline[T]": + """Buffer the pipeline using threaded processing. + + Args: + size: The number of worker threads to use for buffering. + + Returns: + The pipeline instance for method chaining. + """ self.apply(ThreadedTransformer(max_workers=size)) return self def __iter__(self) -> Iterator[T]: - """Allows the pipeline to be iterated over.""" + """Allow the pipeline to be iterated over. + + Returns: + An iterator over the processed data. 
+        """
         yield from self.processed_data

     def to_list(self) -> list[T]:
-        """Executes the pipeline and returns the results as a list."""
+        """Execute the pipeline and return the results as a list.
+
+        Returns:
+            A list containing all processed items from the pipeline.
+        """
         return list(self.processed_data)

     def each(self, function: PipelineFunction[T]) -> None:
-        """Applies a function to each element (terminal operation)."""
-        # Context needs to be accessed from the function if it's context-aware,
-        # but the pipeline itself doesn't own a context. This is a design choice.
-        # For simplicity, we assume the function is not context-aware here
-        # or that context is handled within the Transformers.
+        """Apply a function to each element (terminal operation).
+
+        Args:
+            function: The function to apply to each element.
+        """
         for item in self.processed_data:
             function(item)

     def first(self, n: int = 1) -> list[T]:
-        """Gets the first n elements of the pipeline (terminal operation)."""
+        """Get the first n elements of the pipeline (terminal operation).
+
+        Args:
+            n: The number of elements to retrieve.
+
+        Returns:
+            A list containing the first n elements.
+
+        Raises:
+            AssertionError: If n is less than 1.
+        """
         assert n >= 1, "n must be at least 1"
         return list(itertools.islice(self.processed_data, n))

     def consume(self) -> None:
-        """Consumes the pipeline without returning results."""
+        """Consume the pipeline without returning results.
+
+        This is useful when you want to execute the pipeline for side effects
+        without collecting the results.
+        """
         for _ in self.processed_data:
             pass
diff --git a/laygo/transformers/http.py b/laygo/transformers/http.py
index 5a60934..8385a47 100644
--- a/laygo/transformers/http.py
+++ b/laygo/transformers/http.py
@@ -1,6 +1,4 @@
-"""
-The final, self-sufficient DistributedTransformer with corrected typing.
-"""
+"""Distributed transformer implementation with HTTP-based worker coordination."""

 from collections.abc import Callable
 from collections.abc import Iterable
@@ -37,7 +35,18 @@ def createHTTPTransformer[T](
     endpoint: str | None = None,
     max_workers: int = 4,
 ) -> "HTTPTransformer[T, T]":
-    """Create a new identity parallel transformer with an explicit type hint."""
+    """Create a new identity HTTP transformer with an explicit type hint.
+
+    Args:
+        _type_hint: Type hint for the data being processed.
+        base_url: The base URL for the HTTP worker service.
+        chunk_size: Size of chunks to process data in.
+        endpoint: Optional specific endpoint path.
+        max_workers: Maximum number of concurrent HTTP requests.
+
+    Returns:
+        A new identity HTTP transformer.
+    """
     return HTTPTransformer[T, T](
         base_url=base_url,
         endpoint=endpoint,
@@ -47,12 +56,29 @@
 class HTTPTransformer(Transformer[In, Out]):
-    """
-    A self-sufficient, chainable transformer that manages its own
-    distributed execution and worker endpoint definition.
+    """A self-sufficient, chainable transformer for distributed execution.
+
+    This transformer manages its own distributed execution by coordinating
+    with HTTP-based worker endpoints. It can automatically generate worker
+    endpoints based on the transformation logic or use predefined endpoints.
     """

-    def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8, chunk_size: int | None = None):
+    def __init__(
+        self,
+        base_url: str,
+        endpoint: str | None = None,
+        max_workers: int = 8,
+        chunk_size: int | None = None,
+    ) -> None:
+        """Initialize the HTTP transformer.
+
+        Args:
+            base_url: The base URL for the worker service.
+            endpoint: Optional specific endpoint path. If not provided,
+                one will be auto-generated.
+            max_workers: Maximum number of concurrent HTTP requests.
+            chunk_size: Size of data chunks to process.
+        """
         super().__init__(chunk_size=chunk_size)
         self.base_url = base_url.rstrip("/")
         self.endpoint = endpoint
@@ -60,8 +86,12 @@ def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int
         self.session = requests.Session()
         self._worker_url: str | None = None

-    def _finalize_config(self):
-        """Determines the final worker URL, generating one if needed."""
+    def _finalize_config(self) -> None:
+        """Determine the final worker URL, generating one if needed.
+
+        If no explicit endpoint was provided, this method generates a unique
+        endpoint based on a hash of the transformation logic.
+        """
         if hasattr(self, "_worker_url") and self._worker_url:
             return
@@ -77,14 +107,30 @@ def _finalize_config(self):
         self.endpoint = path.lstrip("/")
         self._worker_url = f"{self.base_url}/{self.endpoint}"

-    # --- Original HTTPTransformer Methods ---
-
     def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
-        """CLIENT-SIDE: Called by the Pipeline to start distributed processing."""
+        """Execute distributed processing on the data (CLIENT-SIDE).
+
+        This method is called by the Pipeline to start distributed processing.
+        It sends data chunks to worker endpoints via HTTP.
+
+        Args:
+            data: The input data to process.
+            context: Optional pipeline context (currently not used in HTTP mode).
+
+        Returns:
+            An iterator over the processed data.
+        """
         self._finalize_config()

         def process_chunk(chunk: list) -> list:
-            """Target for a thread: sends one chunk to the worker."""
+            """Send one chunk to the worker and return the result.
+
+            Args:
+                chunk: The data chunk to process.
+
+            Returns:
+                The processed chunk from the worker.
+            """
             try:
                 response = self.session.post(
                     self._worker_url,  # type: ignore
@@ -111,14 +157,26 @@ def process_chunk(chunk: list) -> list:
                 continue

     def get_route(self):
-        """
-        Function that returns the route for the worker.
-        This is used to register the worker in a Flask app or similar.
+        """Get the route configuration for registering this transformer as a worker.
+
+        This method returns the necessary information to register the worker
+        in a Flask app or similar web framework.
+
+        Returns:
+            A tuple containing the endpoint path and the worker view function.
         """
         self._finalize_config()

         def worker_view_func(chunk: list, context: PipelineContext):
-            """The actual worker logic for this transformer."""
+            """The actual worker logic for this transformer.
+
+            Args:
+                chunk: The data chunk to process.
+                context: The pipeline context.
+
+            Returns:
+                The processed chunk.
+            """
             return self.transformer(chunk, context)

         return (f"/{self.endpoint}", worker_view_func)
@@ -151,8 +209,8 @@ def flatten[T](  # type: ignore
         super().flatten()  # type: ignore
         return self  # type: ignore

-    def tap(self, function: PipelineFunction[Out, Any]) -> "HTTPTransformer[In, Out]":
-        super().tap(function)
+    def tap(self, arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]]) -> "HTTPTransformer[In, Out]":
+        super().tap(arg)
         return self

     def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T]"]) -> "HTTPTransformer[In, T]":
diff --git a/laygo/transformers/parallel.py b/laygo/transformers/parallel.py
index b058b4a..a85c576 100644
--- a/laygo/transformers/parallel.py
+++ b/laygo/transformers/parallel.py
@@ -32,9 +32,18 @@ def _process_chunk_for_multiprocessing[In, Out](
     shared_context: MutableMapping[str, Any],
     chunk: list[In],
 ) -> list[Out]:
-    """
-    Top-level function to process a single chunk.
-    'loky' will use cloudpickle to serialize the 'transformer' object.
+    """Process a single chunk at the top level.
+
+    This function is designed to work with 'loky' which uses cloudpickle
+    to serialize the 'transformer' object.
+
+    Args:
+        transformer: The transformation function to apply.
+        shared_context: The shared context for processing.
+        chunk: The data chunk to process.
+
+    Returns:
+        The processed chunk.
     """
     return transformer(chunk, shared_context)  # type: ignore
@@ -45,7 +54,17 @@ def createParallelTransformer[T](
     ordered: bool = True,
     chunk_size: int | None = None,
 ) -> "ParallelTransformer[T, T]":
-    """Create a new identity parallel transformer with an explicit type hint."""
+    """Create a new identity parallel transformer with an explicit type hint.
+
+    Args:
+        _type_hint: Type hint for the data being processed.
+        max_workers: Maximum number of worker processes.
+        ordered: Whether to preserve order of results.
+        chunk_size: Size of chunks to process data in.
+
+    Returns:
+        A new identity parallel transformer.
+    """
     return ParallelTransformer[T, T](
         max_workers=max_workers,
         ordered=ordered,
@@ -55,9 +74,11 @@
 class ParallelTransformer[In, Out](Transformer[In, Out]):
-    """
-    A transformer that executes operations concurrently using multiple processes.
-    It uses 'loky' to support dynamically created transformation logic.
+    """A transformer that executes operations concurrently using multiple processes.
+
+    This transformer uses 'loky' to support dynamically created transformation
+    logic and provides true parallelism by bypassing Python's Global Interpreter
+    Lock (GIL). It's ideal for CPU-bound operations.
     """

     def __init__(
         self,
         ordered: bool = True,
         chunk_size: int | None = None,
         transformer: InternalTransformer[In, Out] | None = None,
-    ):
+    ) -> None:
+        """Initialize the parallel transformer.
+
+        Args:
+            max_workers: Maximum number of worker processes.
+            ordered: If True, results are yielded in order. If False, results
+                are yielded as they complete.
+            chunk_size: Size of data chunks to process.
+            transformer: The transformation logic chain.
+        """
         super().__init__(chunk_size, transformer)
         self.max_workers = max_workers
         self.ordered = ordered
@@ -79,6 +109,17 @@ def from_transformer[T, U](
         max_workers: int = 4,
         ordered: bool = True,
     ) -> "ParallelTransformer[T, U]":
+        """Create a ParallelTransformer from an existing Transformer's logic.
+
+        Args:
+            transformer: The base transformer to copy the transformation logic from.
+            chunk_size: Optional chunk size override.
+            max_workers: Maximum number of worker processes.
+            ordered: If True, results are yielded in order.
+
+        Returns:
+            A new ParallelTransformer with the same transformation logic.
+        """
         return cls(
             chunk_size=chunk_size or transformer.chunk_size,
             transformer=copy.deepcopy(transformer.transformer),  # type: ignore
@@ -87,9 +128,16 @@
         )

     def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
-        """
-        Executes the transformer on data concurrently. It uses the shared
-        context provided by the Pipeline, if available.
+        """Execute the transformer on data concurrently.
+
+        It uses the shared context provided by the Pipeline, if available.
+
+        Args:
+            data: The input data to process.
+            context: Optional pipeline context for shared state.
+
+        Returns:
+            An iterator over the transformed data.
         """
         run_context = context if context is not None else self.context
@@ -100,8 +148,6 @@
             # Use the existing shared context and lock from the Pipeline.
             shared_context = run_context
             yield from self._execute_with_context(data, shared_context)
-            # The context is live, so no need to update it here.
-            # The Pipeline's __exit__ will handle final state.
         else:
             # Fallback for standalone use: create a temporary manager.
             with mp.Manager() as manager:
@@ -118,7 +164,15 @@
                 run_context.update(final_context_state)

     def _execute_with_context(self, data: Iterable[In], shared_context: MutableMapping[str, Any]) -> Iterator[Out]:
-        """Helper to run the execution logic with a given context."""
+        """Execute the transformation logic with a given context.
+
+        Args:
+            data: The input data to process.
+            shared_context: The shared context for the execution.
+
+        Returns:
+            An iterator over the transformed data.
+        """
         executor = get_reusable_executor(max_workers=self.max_workers)
         chunks_to_process = self._chunk_generator(data)
@@ -128,14 +182,22 @@
         for result_chunk in processed_chunks_iterator:
             yield from result_chunk

-    # ... The rest of the file remains the same ...
     def _ordered_generator(
         self,
         chunks_iter: Iterator[list[In]],
         executor: ProcessPoolExecutor,
         shared_context: MutableMapping[str, Any],
     ) -> Iterator[list[Out]]:
-        """Generate results in their original order."""
+        """Generate results in their original order.
+
+        Args:
+            chunks_iter: Iterator over data chunks.
+            executor: The process pool executor.
+            shared_context: The shared context for processing.
+
+        Returns:
+            An iterator over processed chunks in order.
+        """
         futures: deque[Future[list[Out]]] = deque()
         for _ in range(self.max_workers + 1):
             try:
@@ -171,7 +233,16 @@
     def _unordered_generator(
         self,
         chunks_iter: Iterator[list[In]],
         executor: ProcessPoolExecutor,
         shared_context: MutableMapping[str, Any],
     ) -> Iterator[list[Out]]:
-        """Generate results as they complete."""
+        """Generate results as they complete.
+
+        Args:
+            chunks_iter: Iterator over data chunks.
+            executor: The process pool executor.
+            shared_context: The shared context for processing.
+
+        Returns:
+            An iterator over processed chunks as they complete.
+        """
         futures = {
             executor.submit(
                 _process_chunk_for_multiprocessing,
@@ -226,8 +297,8 @@ def flatten[T](  # type: ignore
         super().flatten()  # type: ignore
         return self  # type: ignore

-    def tap(self, function: PipelineFunction[Out, Any]) -> "ParallelTransformer[In, Out]":
-        super().tap(function)
+    def tap(self, arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]]) -> "ParallelTransformer[In, Out]":
+        super().tap(arg)
         return self

     def apply[T](
diff --git a/laygo/transformers/threaded.py b/laygo/transformers/threaded.py
index 58c661e..f49d643 100644
--- a/laygo/transformers/threaded.py
+++ b/laygo/transformers/threaded.py
@@ -39,7 +39,17 @@ def createThreadedTransformer[T](
     ordered: bool = True,
     chunk_size: int = DEFAULT_CHUNK_SIZE,
 ) -> "ThreadedTransformer[T, T]":
-    """Create a new identity threaded transformer with an explicit type hint."""
+    """Create a new identity threaded transformer with an explicit type hint.
+
+    Args:
+        _type_hint: Type hint for the data being processed.
+        max_workers: Maximum number of worker threads.
+        ordered: Whether to preserve order of results.
+        chunk_size: Size of chunks to process data in.
+
+    Returns:
+        A new identity threaded transformer.
+    """
     return ThreadedTransformer[T, T](
         max_workers=max_workers,
         ordered=ordered,
@@ -49,8 +59,11 @@
 class ThreadedTransformer[In, Out](Transformer[In, Out]):
-    """
-    A transformer that executes operations concurrently using multiple threads.
+    """A transformer that executes operations concurrently using multiple threads.
+
+    This transformer processes data chunks in parallel using a thread pool,
+    which is effective for I/O-bound operations but may be limited by the
+    Global Interpreter Lock (GIL) for CPU-bound tasks.
     """

     def __init__(
         self,
@@ -59,9 +72,8 @@
         ordered: bool = True,
         chunk_size: int | None = None,
         transformer: InternalTransformer[In, Out] | None = None,
-    ):
-        """
-        Initialize the threaded transformer.
+    ) -> None:
+        """Initialize the threaded transformer.

         Args:
             max_workers: Maximum number of worker threads.
@@ -82,8 +94,7 @@ def from_transformer[T, U](
         max_workers: int = 4,
         ordered: bool = True,
     ) -> "ThreadedTransformer[T, U]":
-        """
-        Create a ThreadedTransformer from an existing Transformer's logic.
+        """Create a ThreadedTransformer from an existing Transformer's logic.

         Args:
             transformer: The base transformer to copy the transformation logic from.
@@ -102,9 +113,16 @@
         )

     def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
-        """
-        Executes the transformer on data concurrently. It uses the shared
-        context provided by the Pipeline, if available.
+        """Execute the transformer on data concurrently.
+
+        It uses the shared context provided by the Pipeline, if available.
+
+        Args:
+            data: The input data to process.
+            context: Optional pipeline context for shared state.
+
+        Returns:
+            An iterator over the transformed data.
         """
         run_context = context if context is not None else self.context
@@ -115,8 +133,6 @@ def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -
             # Use the existing shared context and lock from the Pipeline.
             shared_context = run_context
             yield from self._execute_with_context(data, shared_context)
-            # The context is live, so no need to update it here.
-            # The Pipeline's __del__ will handle final state.
         else:
             # Fallback for standalone use: create a thread-safe context.
             # Since threads share memory, we can use the context directly with a lock.
@@ -124,15 +140,27 @@
             run_context["lock"] = threading.Lock()

             yield from self._execute_with_context(data, run_context)
-            # Context is already updated in-place for threads (shared memory)

     def _execute_with_context(self, data: Iterable[In], shared_context: MutableMapping[str, Any]) -> Iterator[Out]:
-        """Helper to run the execution logic with a given context."""
+        """Execute the transformation logic with a given context.
+
+        Args:
+            data: The input data to process.
+            shared_context: The shared context for the execution.
+
+        Returns:
+            An iterator over the transformed data.
+        """

         def process_chunk(chunk: list[In], shared_context: MutableMapping[str, Any]) -> list[Out]:
-            """
-            Process a single chunk by passing the chunk and context explicitly
-            to the transformer chain. This is safer and avoids mutating self.
+            """Process a single chunk by passing the chunk and context explicitly.
+
+            Args:
+                chunk: The data chunk to process.
+                shared_context: The shared context for processing.
+
+            Returns:
+                The processed chunk.
             """
             return self.transformer(chunk, shared_context)  # type: ignore
@@ -211,8 +239,8 @@ def flatten[T](  # type: ignore
         super().flatten()  # type: ignore
         return self  # type: ignore

-    def tap(self, function: PipelineFunction[Out, Any]) -> "ThreadedTransformer[In, Out]":
-        super().tap(function)
+    def tap(self, arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]]) -> "ThreadedTransformer[In, Out]":
+        super().tap(arg)
         return self

     def apply[T](
diff --git a/laygo/transformers/transformer.py b/laygo/transformers/transformer.py
index 361bfce..6f7bdd6 100644
--- a/laygo/transformers/transformer.py
+++ b/laygo/transformers/transformer.py
@@ -1,3 +1,5 @@
+"""Core transformer implementation for data pipeline operations."""
+
 from collections.abc import Callable
 from collections.abc import Iterable
 from collections.abc import Iterator
@@ -26,14 +28,28 @@
 def createTransformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]":
-    """Create a new identity pipeline with an explicit type hint."""
+    """Create a new identity pipeline with an explicit type hint.
+
+    Args:
+        _type_hint: Type hint for the data being processed.
+        chunk_size: Size of chunks to process data in.
+
+    Returns:
+        A new identity transformer that passes data through unchanged.
+    """
     return Transformer[T, T](chunk_size=chunk_size)  # type: ignore


 def build_chunk_generator[T](chunk_size: int) -> Callable[[Iterable[T]], Iterator[list[T]]]:
-    """
-    Returns a function that breaks an iterable into chunks of a specified size.
+    """Return a function that breaks an iterable into chunks of a specified size.
+
     This is useful for creating transformers that process data in manageable chunks.
+
+    Args:
+        chunk_size: The size of each chunk.
+
+    Returns:
+        A function that takes an iterable and returns an iterator of chunks.
     """

     def chunk_generator(data: Iterable[T]) -> Iterator[list[T]]:
@@ -45,15 +61,25 @@ def chunk_generator(data: Iterable[T]) -> Iterator[list[T]]:
 class Transformer[In, Out]:
-    """
-    Defines and composes data transformations by passing context explicitly.
+    """Define and compose data transformations by passing context explicitly.
+
+    A Transformer represents a data processing pipeline that can be chained
+    together with other transformers. It supports context-aware operations,
+    error handling, and chunked processing for memory efficiency.
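+
+    Example (illustrative sketch of a minimal map chain; chaining and
+    chunked execution are as documented above):
+        >>> doubled = Transformer[int, int]().map(lambda x: x * 2)
+        >>> list(doubled([1, 2, 3]))
+        [2, 4, 6]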
""" def __init__( self, chunk_size: int | None = DEFAULT_CHUNK_SIZE, transformer: InternalTransformer[In, Out] | None = None, - ): + ) -> None: + """Initialize a new transformer. + + Args: + chunk_size: Size of chunks to process data in. If None, processes + all data as a single chunk. + transformer: Optional existing transformer logic to use. + """ self.chunk_size = chunk_size self.context: PipelineContext = PipelineContext() # The default transformer now accepts and ignores a context argument. @@ -67,21 +93,42 @@ def from_transformer[T, U]( transformer: "Transformer[T, U]", chunk_size: int | None = None, ) -> "Transformer[T, U]": - """Create a new transformer from an existing one, copying its logic.""" + """Create a new transformer from an existing one, copying its logic. + + Args: + transformer: The source transformer to copy logic from. + chunk_size: Optional chunk size override. + + Returns: + A new transformer with the same logic as the source. + """ return cls( chunk_size=chunk_size or transformer.chunk_size, transformer=copy.deepcopy(transformer.transformer), # type: ignore ) def set_chunker(self, chunker: Callable[[Iterable[In]], Iterator[list[In]]]) -> "Transformer[In, Out]": - """Sets a custom chunking function for the transformer.""" + """Set a custom chunking function for the transformer. + + Args: + chunker: A function that takes an iterable and returns an iterator + of chunks. + + Returns: + The transformer instance for method chaining. + """ self._chunk_generator = chunker return self def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "Transformer[In, Out]": - """Registers an error handler for the transformer.""" - # This method is a placeholder for future error handling logic. - # Currently, it does not modify the transformer behavior. + """Register an error handler for the transformer. + + Args: + handler: Either an ErrorHandler instance or a chunk error handler function. + + Returns: + The transformer instance for method chaining. + """ match handler: case ErrorHandler(): self.error_handler = handler @@ -90,21 +137,43 @@ def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "Trans return self def _pipe[U](self, operation: Callable[[list[Out], PipelineContext], list[U]]) -> "Transformer[In, U]": - """Composes the current transformer with a new context-aware operation.""" + """Compose the current transformer with a new context-aware operation. + + Args: + operation: A function that takes a chunk and context, returning a transformed chunk. + + Returns: + A new transformer with the composed operation. + """ prev_transformer = self.transformer # The new transformer chain ensures the context `ctx` is passed at each step. self.transformer = lambda chunk, ctx: operation(prev_transformer(chunk, ctx), ctx) # type: ignore return self # type: ignore def map[U](self, function: PipelineFunction[Out, U]) -> "Transformer[In, U]": - """Transforms elements, passing context explicitly to the mapping function.""" + """Transform elements, passing context explicitly to the mapping function. + + Args: + function: A function to apply to each element. Can be context-aware. + + Returns: + A new transformer with the mapping operation applied. 
+        """
         if is_context_aware(function):
             return self._pipe(lambda chunk, ctx: [function(x, ctx) for x in chunk])
         return self._pipe(lambda chunk, _ctx: [function(x) for x in chunk])  # type: ignore

     def filter(self, predicate: PipelineFunction[Out, bool]) -> "Transformer[In, Out]":
-        """Filters elements, passing context explicitly to the predicate function."""
+        """Filter elements, passing context explicitly to the predicate function.
+
+        Args:
+            predicate: A function that returns True for elements to keep.
+                Can be context-aware.
+
+        Returns:
+            A transformer with the filtering operation applied.
+        """
         if is_context_aware(predicate):
             return self._pipe(lambda chunk, ctx: [x for x in chunk if predicate(x, ctx)])
@@ -120,7 +189,14 @@ def flatten[T](self: "Transformer[In, set[T]]") -> "Transformer[In, T]": ...
     def flatten[T](
         self: Union["Transformer[In, list[T]]", "Transformer[In, tuple[T, ...]]", "Transformer[In, set[T]]"],
     ) -> "Transformer[In, T]":
-        """Flattens nested lists; the context is passed through the operation."""
+        """Flatten nested collections into individual elements.
+
+        Args:
+            self: A transformer that outputs collections (list, tuple, or set).
+
+        Returns:
+            A transformer that outputs individual elements from the collections.
+        """
         return self._pipe(lambda chunk, ctx: [item for sublist in chunk for item in sublist])  # type: ignore

     @overload
@@ -133,20 +209,22 @@
     def tap(
         self,
         arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]],
     ) -> "Transformer[In, Out]":
-        """
-        Applies a side-effect without modifying the main data stream.
+        """Apply a side-effect without modifying the main data stream.

         This method can be used in two ways:
-        1. With a `Transformer`: Applies a sub-pipeline to each chunk for side-effects
-           (e.g., logging a chunk), discarding the sub-pipeline's output.
-        2. With a `function`: Applies a function to each element individually for
-           side-effects (e.g., printing an item).
+        1. With a `Transformer`: Applies a sub-pipeline to each chunk for side-effects
+           (e.g., logging a chunk), discarding the sub-pipeline's output.
+        2. With a `function`: Applies a function to each element individually for
+           side-effects (e.g., printing an item).

         Args:
             arg: A `Transformer` instance or a function to be applied for side-effects.

         Returns:
             The transformer instance for method chaining.
+
+        Raises:
+            TypeError: If the argument is not a Transformer or callable.
         """
         match arg:
             # Case 1: The argument is another Transformer
@@ -173,7 +251,14 @@ def operation(chunk: list[Out], ctx: PipelineContext) -> list[Out]:
                 raise TypeError(f"tap() argument must be a Transformer or a callable, not {type(arg).__name__}")

     def apply[T](self, t: Callable[[Self], "Transformer[In, T]"]) -> "Transformer[In, T]":
-        """Apply another pipeline to the current one."""
+        """Apply another pipeline to the current one.
+
+        Args:
+            t: A function that takes this transformer and returns a new transformer.
+
+        Returns:
+            The result of applying the function to this transformer.
+        """
         return t(self)

     def loop(
@@ -225,11 +310,17 @@ def operation(chunk: list[Out], ctx: PipelineContext) -> list[Out]:
         return self._pipe(operation)

     def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
-        """
-        Executes the transformer on a data source.
+        """Execute the transformer on a data source.

         It uses the provided `context` by reference. If none is provided,
         it uses the transformer's internal context.
+
+        Args:
+            data: The input data to process.
+            context: Optional pipeline context to use during processing.
+
+        Returns:
+            An iterator over the transformed data.
         """
         # Use the provided context by reference, or default to the instance's context.
         run_context = context or self.context
@@ -239,7 +330,15 @@ def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -
             yield from self.transformer(chunk, run_context)

     def reduce[U](self, function: PipelineReduceFunction[U, Out], initial: U):
-        """Reduces elements to a single value (terminal operation)."""
+        """Reduce elements to a single value (terminal operation).
+
+        Args:
+            function: The reduction function. Can be context-aware.
+            initial: The initial value for the reduction.
+
+        Returns:
+            A function that executes the reduction when called with data.
+        """
         if is_context_aware_reduce(function):
@@ -272,9 +371,16 @@
         sub_pipeline_builder: Callable[["Transformer[Out, Out]"], "Transformer[Out, U]"],
         on_error: ChunkErrorHandler[Out, U] | None = None,
     ) -> "Transformer[In, U]":
-        """
-        Isolates a sub-pipeline in a chunk-based try-catch block.
+        """Isolate a sub-pipeline in a chunk-based try-catch block.
+
         If the sub-pipeline fails for a chunk, the on_error handler is invoked.
+
+        Args:
+            sub_pipeline_builder: A function that builds the sub-pipeline to protect.
+            on_error: Optional error handler for when the sub-pipeline fails.
+
+        Returns:
+            A transformer with error handling applied.
         """
         if on_error:
@@ -299,8 +405,7 @@ def operation(chunk: list[Out], ctx: PipelineContext) -> list[U]:
         return self._pipe(operation)  # type: ignore

     def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "Transformer[In, Out]":
-        """
-        Executes a function on the context before processing the next step for a chunk.
+        """Execute a function on the context before processing the next step for a chunk.

         This can be used for short-circuiting by raising an exception based
         on the context's state, which halts the pipeline. If the function executes
@@ -309,10 +414,15 @@ def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "
         Args:
             function: A callable that accepts the `PipelineContext` as its sole
-                argument. Its return value is ignored.
+                argument. If it returns True, the pipeline is stopped with
+                an exception.

         Returns:
             The transformer instance for method chaining.
+
+        Raises:
+            RuntimeError: If the function returns True, indicating a short-circuit
+                condition has been met.
         """

         def operation(chunk: list[Out], ctx: PipelineContext) -> list[Out]: