diff --git a/README.md b/README.md index 799482e..976fbe3 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ **Laygo** is the lightweight Python library for data pipelines that I wish existed when I first started. It's designed from the ground up to make data engineering simpler, cleaner, and more intuitive, letting you build resilient, in-memory data workflows with an elegant, fluent API. -It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead. +It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead. **Key Features:** @@ -31,6 +31,8 @@ It's built to grow with you. Scale seamlessly from a single local script to thou - **Effortless Parallelism**: Accelerate CPU-intensive tasks seamlessly. +- **Fan-out Processing**: Split pipelines into multiple concurrent branches for parallel analysis of the same dataset. + - **Distributed by Design**: Your pipeline script is both the manager and the worker. When deployed as a serverless function or a container, this design allows you to scale out massively by simply running more instances of the same code. Your logic scales the same way on a thousand cores as it does on one. - **Powerful Context Management**: Share state and configuration across your entire pipeline for advanced, stateful processing. @@ -197,6 +199,110 @@ results = ( ) ``` +### Pipeline Branching (Fan-out Processing) + +```python +from laygo import Pipeline +from laygo.transformers.transformer import createTransformer + +# Sample data: customer orders +orders = [ + {"id": 1, "customer": "Alice", "amount": 150, "product": "laptop"}, + {"id": 2, "customer": "Bob", "amount": 25, "product": "book"}, + {"id": 3, "customer": "Charlie", "amount": 75, "product": "headphones"}, + {"id": 4, "customer": "Diana", "amount": 200, "product": "monitor"}, + {"id": 5, "customer": "Eve", "amount": 30, "product": "mouse"}, +] + +# Create different analysis branches +high_value_analysis = ( + createTransformer(dict) + .filter(lambda order: order["amount"] > 100) + .map(lambda order: { + "customer": order["customer"], + "amount": order["amount"], + "category": "high_value" + }) +) + +product_summary = ( + createTransformer(dict) + .map(lambda order: {"product": order["product"], "count": 1}) + # Group by product and sum counts (simplified example) +) + +customer_spending = ( + createTransformer(dict) + .map(lambda order: { + "customer": order["customer"], + "total_spent": order["amount"] + }) +) + +# Branch the pipeline into multiple concurrent analyses +results = Pipeline(orders).branch({ + "high_value_orders": high_value_analysis, + "products": product_summary, + "customer_totals": customer_spending +}) + +print("High value orders:", results["high_value_orders"]) +# [{'customer': 'Alice', 'amount': 150, 'category': 'high_value'}, +# {'customer': 'Diana', 'amount': 200, 'category': 'high_value'}] + +print("Product analysis:", len(results["products"])) +# 5 (all products processed) + +print("Customer spending:", len(results["customer_totals"])) +# 5 (all customers processed) +``` + +### Advanced Branching with Error Isolation + +```python +from laygo import Pipeline +from laygo.transformers.transformer import createTransformer + +# Data with potential issues +mixed_data = [1, 2, "invalid", 4, 5, None, 7, 8] + +# Branch 1: Safe numeric processing +safe_numbers = ( + createTransformer(int | str | None) + 
.filter(lambda x: isinstance(x, int) and x is not None) + .map(lambda x: x * 2) +) + +# Branch 2: String processing with error handling +string_processing = ( + createTransformer(int | str | None) + .filter(lambda x: isinstance(x, str)) + .map(lambda x: f"processed_{x}") + .catch(lambda t: t.map(lambda x: "error_handled")) +) + +# Branch 3: Statistical analysis +stats_analysis = ( + createTransformer(int | str | None) + .filter(lambda x: isinstance(x, int) and x is not None) + .map(lambda x: x) # Pass through for stats +) + +# Execute all branches concurrently +results = Pipeline(mixed_data).branch({ + "numbers": safe_numbers, + "strings": string_processing, + "stats": stats_analysis +}, batch_size=100) + +print("Processed numbers:", results["numbers"]) # [2, 4, 8, 10, 14, 16] +print("Processed strings:", results["strings"]) # ['processed_invalid'] +print("Stats data:", results["stats"]) # [1, 2, 4, 5, 7, 8] + +# Each branch processes the complete dataset independently +# Errors in one branch don't affect others +``` + ### Error Handling and Recovery ```python diff --git a/laygo/pipeline.py b/laygo/pipeline.py index a637085..c94dfd1 100644 --- a/laygo/pipeline.py +++ b/laygo/pipeline.py @@ -27,6 +27,21 @@ class Pipeline[T]: A Pipeline provides a high-level interface for data processing by chaining transformers together. It automatically manages a multiprocessing-safe shared context that can be accessed by all transformers in the chain. + + The Pipeline supports both streaming and batch processing patterns, with + built-in support for buffering, branching (fan-out), and parallel processing. + + Example: + >>> data = [1, 2, 3, 4, 5] + >>> result = (Pipeline(data) + ... .transform(lambda t: t.filter(lambda x: x % 2 == 0)) + ... .transform(lambda t: t.map(lambda x: x * 2)) + ... .to_list()) + >>> result # [4, 8] + + Note: + Most pipeline operations consume the internal iterator, making the + pipeline effectively single-use unless the data source is re-initialized. """ def __init__(self, *data: Iterable[T]) -> None: @@ -64,14 +79,22 @@ def __del__(self) -> None: def context(self, ctx: PipelineContext) -> "Pipeline[T]": """Update the pipeline context and store a reference to the original context. - When the pipeline finishes processing, the original context will be updated - with the final pipeline context data. + The provided context will be used during pipeline execution and any + modifications made by transformers will be synchronized back to the + original context when the pipeline finishes processing. Args: - ctx: The pipeline context to use for this pipeline execution. + ctx: The pipeline context dictionary to use for this pipeline execution. + This should be a mutable dictionary-like object that transformers + can use to share state and communicate. Returns: The pipeline instance for method chaining. + + Note: + Changes made to the context during pipeline execution will be + automatically synchronized back to the original context object + when the pipeline is destroyed or processing completes. """ # Store reference to the original context self._original_context_ref = ctx @@ -96,13 +119,21 @@ def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> " """Apply a transformation using a lambda function. Creates a Transformer under the hood and applies it to the pipeline. - This is a shorthand method for simple transformations. + This is a shorthand method for simple transformations that allows + chaining transformer operations in a functional style. 
Args: t: A callable that takes a transformer and returns a transformed transformer. + Typically used with lambda expressions like: + `lambda t: t.map(func).filter(predicate)` Returns: - A new Pipeline with the transformed data. + A new Pipeline with the transformed data type. + + Example: + >>> pipeline = Pipeline([1, 2, 3, 4, 5]) + >>> result = pipeline.transform(lambda t: t.filter(lambda x: x % 2 == 0).map(lambda x: x * 2)) + >>> result.to_list() # [4, 8] """ # Create a new transformer and apply the transformation function transformer = t(Transformer[T, T]()) @@ -125,17 +156,28 @@ def apply[U]( ) -> "Pipeline[U]": """Apply a transformer to the current data source. - The pipeline's managed context is passed down to the transformer. + This method accepts various types of transformers and applies them to + the pipeline data. The pipeline's managed context is automatically + passed to context-aware transformers. Args: - transformer: Either a Transformer instance or a callable function - that processes the data. + transformer: One of the following: + - A Transformer instance (preferred for complex operations) + - A callable function that takes an iterable and returns an iterator + - A context-aware callable that takes an iterable and context Returns: - A new Pipeline with the transformed data. + The same Pipeline instance with transformed data (for method chaining). Raises: TypeError: If the transformer is not a supported type. + + Example: + >>> pipeline = Pipeline([1, 2, 3]) + >>> # Using a Transformer instance + >>> pipeline.apply(createTransformer(int).map(lambda x: x * 2)) + >>> # Using a simple function + >>> pipeline.apply(lambda data: (x * 2 for x in data)) """ match transformer: case Transformer(): @@ -157,7 +199,34 @@ def branch( max_batch_buffer: int = 1, use_queue_chunks: bool = True, ) -> dict[str, list[Any]]: - """Forks the pipeline into multiple branches for concurrent, parallel processing.""" + """Forks the pipeline into multiple branches for concurrent, parallel processing. + + This is a **terminal operation** that implements a fan-out pattern where + the entire dataset is copied to each branch for independent processing. + Each branch processes the complete dataset concurrently using separate + transformers, and results are collected and returned in a dictionary. + + Args: + branches: A dictionary where keys are branch names (str) and values + are `Transformer` instances of any subtype. + batch_size: The number of items to batch together when sending data + to branches. Larger batches can improve throughput but + use more memory. Defaults to 1000. + max_batch_buffer: The maximum number of batches to buffer for each + branch queue. Controls memory usage and creates + backpressure. Defaults to 1. + use_queue_chunks: Whether to use passthrough chunking for the + transformers. When True, batches are processed + as chunks. Defaults to True. + + Returns: + A dictionary where keys are the branch names and values are lists + of all items processed by that branch's transformer. + + Note: + This operation consumes the pipeline's iterator, making subsequent + operations on the same pipeline return empty results. + """ if not branches: self.consume() return {} @@ -258,24 +327,47 @@ def _producer() -> None: def __iter__(self) -> Iterator[T]: """Allow the pipeline to be iterated over. + This makes the Pipeline compatible with Python's iterator protocol, + allowing it to be used in for loops, list comprehensions, and other + contexts that expect an iterable. 
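+
+        Example:
+            >>> pipeline = Pipeline([1, 2, 3]).transform(lambda t: t.map(lambda x: x * 2))
+            >>> list(pipeline)  # [2, 4, 6]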
+ Returns: An iterator over the processed data. + + Note: + This operation consumes the pipeline's iterator, making subsequent + operations on the same pipeline return empty results. """ yield from self.processed_data def to_list(self) -> list[T]: """Execute the pipeline and return the results as a list. + This is a terminal operation that consumes the pipeline's iterator + and materializes all results into memory. + Returns: A list containing all processed items from the pipeline. + + Note: + This operation consumes the pipeline's iterator, making subsequent + operations on the same pipeline return empty results. """ return list(self.processed_data) def each(self, function: PipelineFunction[T]) -> None: """Apply a function to each element (terminal operation). + This is a terminal operation that processes each element for side effects + and consumes the pipeline's iterator without returning results. + Args: - function: The function to apply to each element. + function: The function to apply to each element. Should be used for + side effects like logging, updating external state, etc. + + Note: + This operation consumes the pipeline's iterator, making subsequent + operations on the same pipeline return empty results. """ for item in self.processed_data: function(item) @@ -283,23 +375,36 @@ def each(self, function: PipelineFunction[T]) -> None: def first(self, n: int = 1) -> list[T]: """Get the first n elements of the pipeline (terminal operation). + This is a terminal operation that consumes up to n elements from the + pipeline's iterator and returns them as a list. + Args: - n: The number of elements to retrieve. + n: The number of elements to retrieve. Must be at least 1. Returns: - A list containing the first n elements. + A list containing the first n elements, or fewer if the pipeline + contains fewer than n elements. Raises: AssertionError: If n is less than 1. + + Note: + This operation partially consumes the pipeline's iterator. Subsequent + operations will continue from where this operation left off. """ assert n >= 1, "n must be at least 1" return list(itertools.islice(self.processed_data, n)) def consume(self) -> None: - """Consume the pipeline without returning results. + """Consume the pipeline without returning results (terminal operation). + + This is a terminal operation that processes all elements in the pipeline + for their side effects without materializing any results. Useful when + the pipeline operations have side effects and you don't need the results. - This is useful when you want to execute the pipeline for side effects - without collecting the results. + Note: + This operation consumes the pipeline's iterator, making subsequent + operations on the same pipeline return empty results. """ for _ in self.processed_data: pass diff --git a/wiki/Home.md b/wiki/Home.md index d86b8cb..7738101 100644 --- a/wiki/Home.md +++ b/wiki/Home.md @@ -19,7 +19,7 @@ **Laygo** is the lightweight Python library for data pipelines that I wish existed when I first started. It's designed from the ground up to make data engineering simpler, cleaner, and more intuitive, letting you build resilient, in-memory data workflows with an elegant, fluent API. -It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead. +It's built to grow with you. Scale seamlessly from a single local script to thousands of concurrent serverless functions with minimal operational overhead. 
**Key Features:** @@ -31,6 +31,8 @@ It's built to grow with you. Scale seamlessly from a single local script to thou - **Effortless Parallelism**: Accelerate CPU-intensive tasks seamlessly. +- **Fan-out Processing**: Split pipelines into multiple concurrent branches for parallel analysis of the same dataset. + - **Distributed by Design**: Your pipeline script is both the manager and the worker. When deployed as a serverless function or a container, this design allows you to scale out massively by simply running more instances of the same code. Your logic scales the same way on a thousand cores as it does on one. - **Powerful Context Management**: Share state and configuration across your entire pipeline for advanced, stateful processing. @@ -197,6 +199,110 @@ results = ( ) ``` +### Pipeline Branching (Fan-out Processing) + +```python +from laygo import Pipeline +from laygo.transformers.transformer import createTransformer + +# Sample data: customer orders +orders = [ + {"id": 1, "customer": "Alice", "amount": 150, "product": "laptop"}, + {"id": 2, "customer": "Bob", "amount": 25, "product": "book"}, + {"id": 3, "customer": "Charlie", "amount": 75, "product": "headphones"}, + {"id": 4, "customer": "Diana", "amount": 200, "product": "monitor"}, + {"id": 5, "customer": "Eve", "amount": 30, "product": "mouse"}, +] + +# Create different analysis branches +high_value_analysis = ( + createTransformer(dict) + .filter(lambda order: order["amount"] > 100) + .map(lambda order: { + "customer": order["customer"], + "amount": order["amount"], + "category": "high_value" + }) +) + +product_summary = ( + createTransformer(dict) + .map(lambda order: {"product": order["product"], "count": 1}) + # Group by product and sum counts (simplified example) +) + +customer_spending = ( + createTransformer(dict) + .map(lambda order: { + "customer": order["customer"], + "total_spent": order["amount"] + }) +) + +# Branch the pipeline into multiple concurrent analyses +results = Pipeline(orders).branch({ + "high_value_orders": high_value_analysis, + "products": product_summary, + "customer_totals": customer_spending +}) + +print("High value orders:", results["high_value_orders"]) +# [{'customer': 'Alice', 'amount': 150, 'category': 'high_value'}, +# {'customer': 'Diana', 'amount': 200, 'category': 'high_value'}] + +print("Product analysis:", len(results["products"])) +# 5 (all products processed) + +print("Customer spending:", len(results["customer_totals"])) +# 5 (all customers processed) +``` + +### Advanced Branching with Error Isolation + +```python +from laygo import Pipeline +from laygo.transformers.transformer import createTransformer + +# Data with potential issues +mixed_data = [1, 2, "invalid", 4, 5, None, 7, 8] + +# Branch 1: Safe numeric processing +safe_numbers = ( + createTransformer(int | str | None) + .filter(lambda x: isinstance(x, int) and x is not None) + .map(lambda x: x * 2) +) + +# Branch 2: String processing with error handling +string_processing = ( + createTransformer(int | str | None) + .filter(lambda x: isinstance(x, str)) + .map(lambda x: f"processed_{x}") + .catch(lambda t: t.map(lambda x: "error_handled")) +) + +# Branch 3: Statistical analysis +stats_analysis = ( + createTransformer(int | str | None) + .filter(lambda x: isinstance(x, int) and x is not None) + .map(lambda x: x) # Pass through for stats +) + +# Execute all branches concurrently +results = Pipeline(mixed_data).branch({ + "numbers": safe_numbers, + "strings": string_processing, + "stats": stats_analysis +}, batch_size=100) + 
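+# Note: branch() is a terminal operation. The pipeline is consumed at this point
+# and each branch receives its own copy of the full dataset. batch_size only
+# controls how many items are sent to each branch per batch (a throughput and
+# memory trade-off); it should not change the results.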
+print("Processed numbers:", results["numbers"]) # [2, 4, 8, 10, 14, 16] +print("Processed strings:", results["strings"]) # ['processed_invalid'] +print("Stats data:", results["stats"]) # [1, 2, 4, 5, 7, 8] + +# Each branch processes the complete dataset independently +# Errors in one branch don't affect others +``` + ### Error Handling and Recovery ```python