From 55c5a679a014ae5edb47a613108fa8ede44f2b81 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Wed, 30 Jul 2025 20:31:48 +0000 Subject: [PATCH 1/2] feat: implemented execution strategy pattern --- laygo/__init__.py | 16 +- laygo/pipeline.py | 19 +- laygo/transformers/http.py | 3 +- laygo/transformers/parallel.py | 236 ------------------ laygo/transformers/strategies/http.py | 0 laygo/transformers/strategies/process.py | 94 ++++++++ laygo/transformers/strategies/sequential.py | 8 + laygo/transformers/strategies/threaded.py | 114 +++++++++ laygo/transformers/strategies/types.py | 25 ++ laygo/transformers/threaded.py | 251 -------------------- laygo/transformers/transformer.py | 94 +++++--- laygo/transformers/types.py | 31 +++ tests/test_integration.py | 26 +- tests/test_parallel_transformer.py | 50 ++-- tests/test_pipeline.py | 78 +++--- tests/test_threaded_transformer.py | 64 +++-- tests/test_transformer.py | 94 ++++---- 17 files changed, 503 insertions(+), 700 deletions(-) delete mode 100644 laygo/transformers/parallel.py create mode 100644 laygo/transformers/strategies/http.py create mode 100644 laygo/transformers/strategies/process.py create mode 100644 laygo/transformers/strategies/sequential.py create mode 100644 laygo/transformers/strategies/threaded.py create mode 100644 laygo/transformers/strategies/types.py delete mode 100644 laygo/transformers/threaded.py create mode 100644 laygo/transformers/types.py diff --git a/laygo/__init__.py b/laygo/__init__.py index ce85957..e5b4829 100644 --- a/laygo/__init__.py +++ b/laygo/__init__.py @@ -9,23 +9,19 @@ from laygo.pipeline import Pipeline from laygo.transformers.http import HTTPTransformer from laygo.transformers.http import createHTTPTransformer -from laygo.transformers.parallel import ParallelTransformer -from laygo.transformers.parallel import createParallelTransformer -from laygo.transformers.threaded import ThreadedTransformer -from laygo.transformers.threaded import createThreadedTransformer from laygo.transformers.transformer import Transformer from laygo.transformers.transformer import build_chunk_generator -from laygo.transformers.transformer import createTransformer +from laygo.transformers.transformer import create_process_transformer +from laygo.transformers.transformer import create_threaded_transformer +from laygo.transformers.transformer import create_transformer from laygo.transformers.transformer import passthrough_chunks __all__ = [ "Pipeline", "Transformer", - "createTransformer", - "ThreadedTransformer", - "createThreadedTransformer", - "ParallelTransformer", - "createParallelTransformer", + "create_transformer", + "create_threaded_transformer", + "create_process_transformer", "HTTPTransformer", "createHTTPTransformer", "PipelineContext", diff --git a/laygo/pipeline.py b/laygo/pipeline.py index 4f3da88..effd8b9 100644 --- a/laygo/pipeline.py +++ b/laygo/pipeline.py @@ -20,6 +20,7 @@ from laygo.context.types import IContextHandle from laygo.helpers import is_context_aware from laygo.transformers.transformer import Transformer +from laygo.transformers.types import BaseTransformer T = TypeVar("T") U = TypeVar("U") @@ -27,7 +28,9 @@ # This function must be defined at the top level of the module (e.g., after imports) -def _branch_consumer_process[T](transformer: Transformer, queue: "Queue", context_handle: IContextHandle) -> list[Any]: +def _branch_consumer_process[T]( + transformer: BaseTransformer, queue: "Queue", context_handle: IContextHandle +) -> list[Any]: """Entry point for a consumer process in parallel branching. 
Reconstructs the necessary objects and runs a dedicated pipeline instance @@ -457,7 +460,7 @@ def _producer_broadcast( @overload def branch( self, - branches: Mapping[str, Transformer[T, Any]], + branches: Mapping[str, BaseTransformer[T, Any]], *, executor_type: Literal["thread", "process"] = "thread", batch_size: int = 1000, @@ -468,7 +471,7 @@ def branch( @overload def branch( self, - branches: Mapping[str, tuple[Transformer[T, Any], Callable[[T], bool]]], + branches: Mapping[str, tuple[BaseTransformer[T, Any], Callable[[T], bool]]], *, executor_type: Literal["thread", "process"] = "thread", first_match: bool = True, @@ -478,7 +481,7 @@ def branch( def branch( self, - branches: Mapping[str, Transformer[T, Any]] | Mapping[str, tuple[Transformer[T, Any], Callable[[T], bool]]], + branches: Mapping[str, BaseTransformer[T, Any]] | Mapping[str, tuple[BaseTransformer[T, Any], Callable[[T], bool]]], *, executor_type: Literal["thread", "process"] = "thread", first_match: bool = True, @@ -519,7 +522,7 @@ def branch( first_value = next(iter(branches.values())) is_conditional = isinstance(first_value, tuple) - parsed_branches: list[tuple[str, Transformer[T, Any], Callable[[T], bool]]] + parsed_branches: list[tuple[str, BaseTransformer[T, Any], Callable[[T], bool]]] if is_conditional: parsed_branches = [(name, trans, cond) for name, (trans, cond) in branches.items()] # type: ignore else: @@ -555,7 +558,7 @@ def _execute_branching_process( self, *, producer_fn: Callable, - parsed_branches: list[tuple[str, Transformer, Callable]], + parsed_branches: list[tuple[str, BaseTransformer, Callable]], batch_size: int, max_batch_buffer: int, ) -> tuple[dict[str, list[Any]], dict[str, Any]]: @@ -629,7 +632,7 @@ def _execute_branching_thread( self, *, producer_fn: Callable, - parsed_branches: list[tuple[str, Transformer, Callable]], + parsed_branches: list[tuple[str, BaseTransformer, Callable]], batch_size: int, max_batch_buffer: int, ) -> tuple[dict[str, list[Any]], dict[str, Any]]: @@ -654,7 +657,7 @@ def _execute_branching_thread( final_results: dict[str, list[Any]] = {name: [] for name, _, _ in parsed_branches} queues = {name: Queue(maxsize=max_batch_buffer) for name, _, _ in parsed_branches} - def consumer(transformer: Transformer, queue: Queue, context_handle: IContextHandle) -> list[Any]: + def consumer(transformer: BaseTransformer, queue: Queue, context_handle: IContextHandle) -> list[Any]: """Consume batches from a queue and process them with a transformer. Creates a mini-pipeline for the transformer and processes all diff --git a/laygo/transformers/http.py b/laygo/transformers/http.py index 4087628..fb71389 100644 --- a/laygo/transformers/http.py +++ b/laygo/transformers/http.py @@ -22,6 +22,7 @@ from laygo.transformers.transformer import ChunkErrorHandler from laygo.transformers.transformer import PipelineFunction from laygo.transformers.transformer import Transformer +from laygo.transformers.types import BaseTransformer In = TypeVar("In") Out = TypeVar("Out") @@ -56,7 +57,7 @@ def createHTTPTransformer[T]( ) -class HTTPTransformer(Transformer[In, Out]): +class HTTPTransformer[In, Out](BaseTransformer[In, Out]): """A self-sufficient, chainable transformer for distributed execution. 
This transformer manages its own distributed execution by coordinating diff --git a/laygo/transformers/parallel.py b/laygo/transformers/parallel.py deleted file mode 100644 index 5c3ce09..0000000 --- a/laygo/transformers/parallel.py +++ /dev/null @@ -1,236 +0,0 @@ -"""Parallel transformer implementation using multiple processes and loky.""" - -from collections import deque -from collections.abc import Callable -from collections.abc import Iterable -from collections.abc import Iterator -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import wait -import copy -import itertools -from typing import Any -from typing import Union -from typing import overload - -from loky import get_reusable_executor - -from laygo.context import ParallelContextManager -from laygo.context.types import IContextHandle -from laygo.context.types import IContextManager -from laygo.errors import ErrorHandler -from laygo.transformers.transformer import ChunkErrorHandler -from laygo.transformers.transformer import InternalTransformer -from laygo.transformers.transformer import PipelineFunction -from laygo.transformers.transformer import Transformer - - -def _worker_process_chunk[In, Out]( - transformer_logic: InternalTransformer[In, Out], - context_handle: IContextHandle, - chunk: list[In], -) -> list[Out]: - """ - Top-level function executed by each worker process. - It reconstructs the context proxy from the handle and runs the transformation. - """ - context_proxy = context_handle.create_proxy() - try: - return transformer_logic(chunk, context_proxy) - finally: - # The proxy's shutdown is a no-op, but it's good practice to call it. - context_proxy.shutdown() - - -def createParallelTransformer[T]( - _type_hint: type[T], - max_workers: int = 4, - ordered: bool = True, - chunk_size: int | None = None, -) -> "ParallelTransformer[T, T]": - """Create a new identity parallel transformer with an explicit type hint. - - Args: - _type_hint: Type hint for the data being processed. - max_workers: Maximum number of worker processes. - ordered: Whether to preserve order of results. - chunk_size: Size of chunks to process data in. - - Returns: - A new identity parallel transformer. - """ - return ParallelTransformer[T, T]( - max_workers=max_workers, - ordered=ordered, - chunk_size=chunk_size, - transformer=None, - ) - - -class ParallelTransformer[In, Out](Transformer[In, Out]): - """A transformer that executes operations concurrently using multiple processes. - - This transformer uses 'loky' to support dynamically created transformation - logic and provides true parallelism by bypassing Python's Global Interpreter - Lock (GIL). It's ideal for CPU-bound operations. - """ - - def __init__( - self, - max_workers: int = 4, - ordered: bool = True, - chunk_size: int | None = None, - transformer: InternalTransformer[In, Out] | None = None, - ) -> None: - """Initialize the parallel transformer. - - Args: - max_workers: Maximum number of worker processes. - ordered: If True, results are yielded in order. If False, results - are yielded as they complete. - chunk_size: Size of data chunks to process. - transformer: The transformation logic chain. - """ - super().__init__(chunk_size, transformer) - self.max_workers = max_workers - self.ordered = ordered - # Rule 3: Parallel transformers create a parallel context manager by default. 
- self._default_context = ParallelContextManager() - - @classmethod - def from_transformer[T, U]( - cls, - transformer: Transformer[T, U], - chunk_size: int | None = None, - max_workers: int = 4, - ordered: bool = True, - ) -> "ParallelTransformer[T, U]": - """Create a ParallelTransformer from an existing Transformer's logic. - - Args: - transformer: The base transformer to copy the transformation logic from. - chunk_size: Optional chunk size override. - max_workers: Maximum number of worker processes. - ordered: If True, results are yielded in order. - - Returns: - A new ParallelTransformer with the same transformation logic. - """ - return cls( - chunk_size=chunk_size or transformer.chunk_size, - transformer=copy.deepcopy(transformer.transformer), # type: ignore - max_workers=max_workers, - ordered=ordered, - ) - - def __call__(self, data: Iterable[In], context: IContextManager | None = None) -> Iterator[Out]: - """Execute the transformer by distributing chunks to a process pool.""" - run_context = context if context is not None else self._default_context - - # Get the picklable handle from the context manager. - context_handle = run_context.get_handle() - - executor = get_reusable_executor(max_workers=self.max_workers) - chunks_to_process = self._chunk_generator(data) - - gen_func = self._ordered_generator if self.ordered else self._unordered_generator - - try: - processed_chunks_iterator = gen_func(chunks_to_process, executor, context_handle) - for result_chunk in processed_chunks_iterator: - yield from result_chunk - finally: - if run_context is self._default_context: - self._default_context.shutdown() - - def _ordered_generator( - self, - chunks_iter: Iterator[list[In]], - executor, - context_handle: IContextHandle, - ) -> Iterator[list[Out]]: - """Generate results in their original order.""" - futures = deque() - for _ in range(self.max_workers + 1): - try: - chunk = next(chunks_iter) - futures.append(executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk)) - except StopIteration: - break - while futures: - yield futures.popleft().result() - try: - chunk = next(chunks_iter) - futures.append(executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk)) - except StopIteration: - continue - - def _unordered_generator( - self, - chunks_iter: Iterator[list[In]], - executor, - context_handle: IContextHandle, - ) -> Iterator[list[Out]]: - """Generate results as they complete.""" - futures = { - executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk) - for chunk in itertools.islice(chunks_iter, self.max_workers + 1) - } - while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() - try: - chunk = next(chunks_iter) - futures.add(executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk)) - except StopIteration: - continue - - def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ParallelTransformer[In, Out]": - super().on_error(handler) - return self - - def map[U](self, function: PipelineFunction[Out, U]) -> "ParallelTransformer[In, U]": - super().map(function) - return self # type: ignore - - def filter(self, predicate: PipelineFunction[Out, bool]) -> "ParallelTransformer[In, Out]": - super().filter(predicate) - return self - - @overload - def flatten[T](self: "ParallelTransformer[In, list[T]]") -> "ParallelTransformer[In, T]": ... 
- @overload - def flatten[T](self: "ParallelTransformer[In, tuple[T, ...]]") -> "ParallelTransformer[In, T]": ... - @overload - def flatten[T](self: "ParallelTransformer[In, set[T]]") -> "ParallelTransformer[In, T]": ... - def flatten[T]( # type: ignore - self: Union[ - "ParallelTransformer[In, list[T]]", - "ParallelTransformer[In, tuple[T, ...]]", - "ParallelTransformer[In, set[T]]", - ], - ) -> "ParallelTransformer[In, T]": - super().flatten() # type: ignore - return self # type: ignore - - def tap(self, arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]]) -> "ParallelTransformer[In, Out]": - super().tap(arg) - return self - - def apply[T]( - self, t: Callable[["ParallelTransformer[In, Out]"], "Transformer[In, T]"] - ) -> "ParallelTransformer[In, T]": - super().apply(t) # type: ignore - return self # type: ignore - - def catch[U]( - self, - sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]], - on_error: ChunkErrorHandler[Out, None] | None = None, - ) -> "ParallelTransformer[In, U]": - super().catch(sub_pipeline_builder, on_error) - return self # type: ignore - - def short_circuit(self, function: Callable[[IContextManager], bool | None]) -> "ParallelTransformer[In, Out]": - super().short_circuit(function) - return self diff --git a/laygo/transformers/strategies/http.py b/laygo/transformers/strategies/http.py new file mode 100644 index 0000000..e69de29 diff --git a/laygo/transformers/strategies/process.py b/laygo/transformers/strategies/process.py new file mode 100644 index 0000000..10c3e0d --- /dev/null +++ b/laygo/transformers/strategies/process.py @@ -0,0 +1,94 @@ +from collections import deque +from collections.abc import Iterator +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import wait +import itertools + +from loky import get_reusable_executor + +from laygo.context.types import IContextHandle +from laygo.transformers.strategies.types import ExecutionStrategy +from laygo.transformers.types import InternalTransformer + + +def _worker_process_chunk[In, Out]( + transformer_logic: InternalTransformer[In, Out], + context_handle: IContextHandle, + chunk: list[In], +) -> list[Out]: + """ + Top-level function executed by each worker process. + It reconstructs the context proxy from the handle and runs the transformation. + """ + context_proxy = context_handle.create_proxy() + try: + return transformer_logic(chunk, context_proxy) + finally: + # The proxy's shutdown is a no-op, but it's good practice to call it. + context_proxy.shutdown() + + +class ProcessStrategy[In, Out](ExecutionStrategy[In, Out]): + def __init__(self, max_workers: int = 4, ordered: bool = True): + self.max_workers = max_workers + self.ordered = ordered + + def execute(self, transformer_logic, chunk_generator, data, context): + """Execute the transformer by distributing chunks to a process pool.""" + + # Get the picklable handle from the context manager. 
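+    # A live context manager may hold locks or other process-local resources
+    # that cannot be pickled; the handle, by contrast, can cross process
+    # boundaries and is rebuilt into a proxy inside each worker
+    # (see _worker_process_chunk above).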
+ context_handle = context.get_handle() + + executor = get_reusable_executor(max_workers=self.max_workers) + chunks_to_process = chunk_generator(data) + + gen_func = self._ordered_generator if self.ordered else self._unordered_generator + + processed_chunks_iterator = gen_func(chunks_to_process, transformer_logic, executor, context_handle) + for result_chunk in processed_chunks_iterator: + yield from result_chunk + + def _ordered_generator( + self, + chunks_iter: Iterator[list[In]], + transformer: InternalTransformer[In, Out], + executor, + context_handle: IContextHandle, + ) -> Iterator[list[Out]]: + """Generate results in their original order.""" + futures = deque() + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) + except StopIteration: + break + while futures: + yield futures.popleft().result() + try: + chunk = next(chunks_iter) + futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) + except StopIteration: + continue + + def _unordered_generator( + self, + chunks_iter: Iterator[list[In]], + transformer: InternalTransformer[In, Out], + executor, + context_handle: IContextHandle, + ) -> Iterator[list[Out]]: + """Generate results as they complete.""" + futures = { + executor.submit(_worker_process_chunk, transformer, context_handle, chunk) + for chunk in itertools.islice(chunks_iter, self.max_workers + 1) + } + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() + try: + chunk = next(chunks_iter) + futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) + except StopIteration: + continue diff --git a/laygo/transformers/strategies/sequential.py b/laygo/transformers/strategies/sequential.py new file mode 100644 index 0000000..a936d6e --- /dev/null +++ b/laygo/transformers/strategies/sequential.py @@ -0,0 +1,8 @@ +from laygo.transformers.strategies.types import ExecutionStrategy + + +class SequentialStrategy[In, Out](ExecutionStrategy[In, Out]): + def execute(self, transformer_logic, chunk_generator, data, context): + # Logic from the original Transformer.__call__ + for chunk in chunk_generator(data): + yield from transformer_logic(chunk, context) diff --git a/laygo/transformers/strategies/threaded.py b/laygo/transformers/strategies/threaded.py new file mode 100644 index 0000000..a855458 --- /dev/null +++ b/laygo/transformers/strategies/threaded.py @@ -0,0 +1,114 @@ +from collections import deque +from collections.abc import Iterable +from collections.abc import Iterator +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import Future +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait +from functools import partial +import itertools + +from laygo.context.types import IContextManager +from laygo.transformers.strategies.types import ChunkGenerator +from laygo.transformers.strategies.types import ExecutionStrategy +from laygo.transformers.types import InternalTransformer + + +class ThreadedStrategy[In, Out](ExecutionStrategy[In, Out]): + def __init__(self, max_workers: int = 4, ordered: bool = True): + self.max_workers = max_workers + self.ordered = ordered + + def execute(self, transformer_logic, chunk_generator, data, context): + """Execute the transformer on data concurrently. + + It uses the shared context provided by the Pipeline, if available. 
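+
+    Threads share one address space, so the context manager is handed to the
+    workers directly; thread safety relies on the locking inside the manager
+    rather than on the pickled handle/proxy pair used by the process strategy.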
+ + Args: + data: The input data to process. + context: Optional pipeline context for shared state. + + Returns: + An iterator over the transformed data. + """ + + # Since threads share memory, we can pass the context manager directly. + # No handle/proxy mechanism is needed, but the locking inside + # ParallelContextManager is crucial for thread safety. + yield from self._execute_with_context(data, transformer_logic, context, chunk_generator) + + def _execute_with_context( + self, + data: Iterable[In], + transformer: InternalTransformer[In, Out], + shared_context: IContextManager, + chunk_generator: ChunkGenerator[In], + ) -> Iterator[Out]: + """Execute the transformation logic with a given context. + + Args: + data: The input data to process. + shared_context: The shared context for the execution. + + Returns: + An iterator over the transformed data. + """ + + def process_chunk(chunk: list[In], shared_context: IContextManager) -> list[Out]: + """Process a single chunk by passing the chunk and context explicitly. + + Args: + chunk: The data chunk to process. + shared_context: The shared context for processing. + + Returns: + The processed chunk. + """ + return transformer(chunk, shared_context) # type: ignore + + # Create a partial function with the shared_context "baked in". + process_chunk_with_context = partial(process_chunk, shared_context=shared_context) + + def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results in their original order.""" + futures: deque[Future[list[Out]]] = deque() + for _ in range(self.max_workers + 1): + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + break + while futures: + yield futures.popleft().result() + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + continue + + def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: + """Generate results as they complete.""" + futures = { + executor.submit(process_chunk_with_context, chunk) + for chunk in itertools.islice(chunks_iter, self.max_workers + 1) + } + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() + try: + chunk = next(chunks_iter) + futures.add(executor.submit(process_chunk_with_context, chunk)) + except StopIteration: + continue + + def result_iterator_manager() -> Iterator[Out]: + """Manage the thread pool and yield flattened results.""" + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + chunks_to_process = chunk_generator(data) + gen_func = _ordered_generator if self.ordered else _unordered_generator + processed_chunks_iterator = gen_func(chunks_to_process, executor) + for result_chunk in processed_chunks_iterator: + yield from result_chunk + + return result_iterator_manager() diff --git a/laygo/transformers/strategies/types.py b/laygo/transformers/strategies/types.py new file mode 100644 index 0000000..47fbc34 --- /dev/null +++ b/laygo/transformers/strategies/types.py @@ -0,0 +1,25 @@ +from abc import ABC +from abc import abstractmethod +from collections.abc import Callable +from collections.abc import Iterable +from collections.abc import Iterator + +from laygo.context.types import IContextManager +from laygo.transformers.types import InternalTransformer + +type ChunkGenerator[In] = Callable[[Iterable[In]], 
Iterator[list[In]]] + + +class ExecutionStrategy[In, Out](ABC): + """Defines the contract for all execution strategies.""" + + @abstractmethod + def execute( + self, + transformer_logic: InternalTransformer[In, Out], + chunk_generator: Callable[[Iterable[In]], Iterator[list[In]]], + data: Iterable[In], + context: IContextManager, + ) -> Iterator[Out]: + """Runs the transformation logic on the data.""" + raise NotImplementedError diff --git a/laygo/transformers/threaded.py b/laygo/transformers/threaded.py deleted file mode 100644 index 7f28856..0000000 --- a/laygo/transformers/threaded.py +++ /dev/null @@ -1,251 +0,0 @@ -"""Parallel transformer implementation using multiple threads.""" - -from collections import deque -from collections.abc import Callable -from collections.abc import Iterable -from collections.abc import Iterator -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import Future -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import wait -import copy -from functools import partial -import itertools -from typing import Any -from typing import Union -from typing import overload - -from laygo.context import IContextManager -from laygo.context import ParallelContextManager -from laygo.errors import ErrorHandler -from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE -from laygo.transformers.transformer import ChunkErrorHandler -from laygo.transformers.transformer import InternalTransformer -from laygo.transformers.transformer import PipelineFunction -from laygo.transformers.transformer import Transformer - - -def createThreadedTransformer[T]( - _type_hint: type[T], - max_workers: int = 4, - ordered: bool = True, - chunk_size: int = DEFAULT_CHUNK_SIZE, -) -> "ThreadedTransformer[T, T]": - """Create a new identity threaded transformer with an explicit type hint. - - Args: - _type_hint: Type hint for the data being processed. - max_workers: Maximum number of worker threads. - ordered: Whether to preserve order of results. - chunk_size: Size of chunks to process data in. - - Returns: - A new identity threaded transformer. - """ - return ThreadedTransformer[T, T]( - max_workers=max_workers, - ordered=ordered, - chunk_size=chunk_size, - transformer=None, - ) - - -class ThreadedTransformer[In, Out](Transformer[In, Out]): - """A transformer that executes operations concurrently using multiple threads. - - This transformer processes data chunks in parallel using a thread pool, - which is effective for I/O-bound operations but may be limited by the - Global Interpreter Lock (GIL) for CPU-bound tasks. - """ - - def __init__( - self, - max_workers: int = 4, - ordered: bool = True, - chunk_size: int | None = None, - transformer: InternalTransformer[In, Out] | None = None, - ) -> None: - """Initialize the threaded transformer. - - Args: - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. If False, results - are yielded as they complete. - chunk_size: Size of data chunks to process. - transformer: The transformation logic chain. - """ - super().__init__(chunk_size, transformer) - self.max_workers = max_workers - self.ordered = ordered - # Rule 3: Threaded transformers create a parallel context manager by default. - # This is because threads share memory, so a thread-safe (locking) manager is required. 
- self._default_context = ParallelContextManager() - - @classmethod - def from_transformer[T, U]( - cls, - transformer: Transformer[T, U], - chunk_size: int | None = None, - max_workers: int = 4, - ordered: bool = True, - ) -> "ThreadedTransformer[T, U]": - """Create a ThreadedTransformer from an existing Transformer's logic. - - Args: - transformer: The base transformer to copy the transformation logic from. - chunk_size: Optional chunk size override. - max_workers: Maximum number of worker threads. - ordered: If True, results are yielded in order. - - Returns: - A new ThreadedTransformer with the same transformation logic. - """ - return cls( - chunk_size=chunk_size or transformer.chunk_size, - transformer=copy.deepcopy(transformer.transformer), # type: ignore - max_workers=max_workers, - ordered=ordered, - ) - - def __call__(self, data: Iterable[In], context: IContextManager | None = None) -> Iterator[Out]: - """Execute the transformer on data concurrently. - - It uses the shared context provided by the Pipeline, if available. - - Args: - data: The input data to process. - context: Optional pipeline context for shared state. - - Returns: - An iterator over the transformed data. - """ - run_context = context if context is not None else self._default_context - - # Since threads share memory, we can pass the context manager directly. - # No handle/proxy mechanism is needed, but the locking inside - # ParallelContextManager is crucial for thread safety. - try: - yield from self._execute_with_context(data, run_context) - finally: - if run_context is self._default_context: - self._default_context.shutdown() - - def _execute_with_context(self, data: Iterable[In], shared_context: IContextManager) -> Iterator[Out]: - """Execute the transformation logic with a given context. - - Args: - data: The input data to process. - shared_context: The shared context for the execution. - - Returns: - An iterator over the transformed data. - """ - - def process_chunk(chunk: list[In], shared_context: IContextManager) -> list[Out]: - """Process a single chunk by passing the chunk and context explicitly. - - Args: - chunk: The data chunk to process. - shared_context: The shared context for processing. - - Returns: - The processed chunk. - """ - return self.transformer(chunk, shared_context) # type: ignore - - # Create a partial function with the shared_context "baked in". 
- process_chunk_with_context = partial(process_chunk, shared_context=shared_context) - - def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results in their original order.""" - futures: deque[Future[list[Out]]] = deque() - for _ in range(self.max_workers + 1): - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - break - while futures: - yield futures.popleft().result() - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - continue - - def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: - """Generate results as they complete.""" - futures = { - executor.submit(process_chunk_with_context, chunk) - for chunk in itertools.islice(chunks_iter, self.max_workers + 1) - } - while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() - try: - chunk = next(chunks_iter) - futures.add(executor.submit(process_chunk_with_context, chunk)) - except StopIteration: - continue - - def result_iterator_manager() -> Iterator[Out]: - """Manage the thread pool and yield flattened results.""" - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - chunks_to_process = self._chunk_generator(data) - gen_func = _ordered_generator if self.ordered else _unordered_generator - processed_chunks_iterator = gen_func(chunks_to_process, executor) - for result_chunk in processed_chunks_iterator: - yield from result_chunk - - return result_iterator_manager() - - # --- Overridden Chaining Methods to Preserve Type --- - - def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ThreadedTransformer[In, Out]": - super().on_error(handler) - return self - - def map[U](self, function: PipelineFunction[Out, U]) -> "ThreadedTransformer[In, U]": - super().map(function) - return self # type: ignore - - def filter(self, predicate: PipelineFunction[Out, bool]) -> "ThreadedTransformer[In, Out]": - super().filter(predicate) - return self - - @overload - def flatten[T](self: "ThreadedTransformer[In, list[T]]") -> "ThreadedTransformer[In, T]": ... - @overload - def flatten[T](self: "ThreadedTransformer[In, tuple[T, ...]]") -> "ThreadedTransformer[In, T]": ... - @overload - def flatten[T](self: "ThreadedTransformer[In, set[T]]") -> "ThreadedTransformer[In, T]": ... 
- def flatten[T]( # type: ignore - self: Union[ - "ThreadedTransformer[In, list[T]]", "ThreadedTransformer[In, tuple[T, ...]]", "ThreadedTransformer[In, set[T]]" - ], - ) -> "ThreadedTransformer[In, T]": - super().flatten() # type: ignore - return self # type: ignore - - def tap(self, arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]]) -> "ThreadedTransformer[In, Out]": - super().tap(arg) - return self - - def apply[T]( - self, t: Callable[["ThreadedTransformer[In, Out]"], "Transformer[In, T]"] - ) -> "ThreadedTransformer[In, T]": - super().apply(t) # type: ignore - return self # type: ignore - - def catch[U]( - self, - sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]], - on_error: ChunkErrorHandler[Out, None] | None = None, - ) -> "ThreadedTransformer[In, U]": - super().catch(sub_pipeline_builder, on_error) - return self # type: ignore - - def short_circuit(self, function: Callable[[IContextManager], bool | None]) -> "ThreadedTransformer[In, Out]": - super().short_circuit(function) - return self diff --git a/laygo/transformers/transformer.py b/laygo/transformers/transformer.py index a01e8f2..4a3084e 100644 --- a/laygo/transformers/transformer.py +++ b/laygo/transformers/transformer.py @@ -17,19 +17,21 @@ from laygo.errors import ErrorHandler from laygo.helpers import is_context_aware from laygo.helpers import is_context_aware_reduce +from laygo.transformers.strategies.sequential import SequentialStrategy +from laygo.transformers.strategies.threaded import ThreadedStrategy +from laygo.transformers.strategies.types import ExecutionStrategy +from laygo.transformers.types import BaseTransformer +from laygo.transformers.types import InternalTransformer DEFAULT_CHUNK_SIZE = 1000 type PipelineFunction[Out, T] = Callable[[Out], T] | Callable[[Out, IContextManager], T] type PipelineReduceFunction[U, Out] = Callable[[U, Out], U] | Callable[[U, Out, IContextManager], U] - -# The internal transformer function signature is changed to explicitly accept a context. -type InternalTransformer[In, Out] = Callable[[list[In], IContextManager], list[Out]] type ChunkErrorHandler[In, U] = Callable[[list[In], Exception, IContextManager], list[U]] -def createTransformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]": +def create_transformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SIZE) -> "Transformer[T, T]": """Create a new identity pipeline with an explicit type hint. Args: @@ -42,6 +44,58 @@ def createTransformer[T](_type_hint: type[T], chunk_size: int = DEFAULT_CHUNK_SI return Transformer[T, T](chunk_size=chunk_size) # type: ignore +def create_threaded_transformer[T]( + _type_hint: type[T], + max_workers: int = 4, + ordered: bool = True, + chunk_size: int = DEFAULT_CHUNK_SIZE, +) -> "Transformer[T, T]": + """Create a new identity threaded transformer with an explicit type hint. + + Args: + _type_hint: Type hint for the data being processed. + max_workers: Maximum number of worker threads. + ordered: Whether to preserve order of results. + chunk_size: Size of chunks to process data in. + + Returns: + A new identity threaded transformer. 
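+
+  Example (illustrative; results keep their input order because ``ordered``
+  defaults to True):
+      >>> doubled = create_threaded_transformer(int, max_workers=2).map(lambda x: x * 2)
+      >>> list(doubled([1, 2, 3]))
+      [2, 4, 6]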
+ """ + return Transformer[T, T]( + chunk_size=chunk_size, + strategy=ThreadedStrategy( + max_workers=max_workers, + ordered=ordered, + ), + ) + + +def create_process_transformer[T]( + _type_hint: type[T], + max_workers: int = 4, + ordered: bool = True, + chunk_size: int = DEFAULT_CHUNK_SIZE, +) -> "Transformer[T, T]": + """Create a new identity threaded transformer with an explicit type hint. + + Args: + _type_hint: Type hint for the data being processed. + max_workers: Maximum number of worker threads. + ordered: Whether to preserve order of results. + chunk_size: Size of chunks to process data in. + + Returns: + A new identity threaded transformer. + """ + return Transformer[T, T]( + chunk_size=chunk_size, + strategy=ThreadedStrategy( + max_workers=max_workers, + ordered=ordered, + ), + ) + + def build_chunk_generator[T](chunk_size: int) -> Callable[[Iterable[T]], Iterator[list[T]]]: """Return a function that breaks an iterable into chunks of a specified size. @@ -76,7 +130,7 @@ def passthrough_chunks[T](data: Iterable[list[T]]) -> Iterator[list[T]]: yield from iter(data) -class Transformer[In, Out]: +class Transformer[In, Out](BaseTransformer[In, Out]): """Define and compose data transformations by passing context explicitly. A Transformer represents a data processing pipeline that can be chained @@ -86,6 +140,7 @@ class Transformer[In, Out]: def __init__( self, + strategy: ExecutionStrategy[In, Out] | None = None, chunk_size: int | None = DEFAULT_CHUNK_SIZE, transformer: InternalTransformer[In, Out] | None = None, ) -> None: @@ -103,6 +158,7 @@ def __init__( self._chunk_generator = build_chunk_generator(chunk_size) if chunk_size else lambda x: iter([list(x)]) # Rule 2: Transformers create a simple context manager by default for standalone use. self._default_context = SimpleContextManager() + self.strategy = strategy if strategy is not None else SequentialStrategy() @classmethod def from_transformer[T, U]( @@ -121,6 +177,7 @@ def from_transformer[T, U]( """ return cls( chunk_size=chunk_size or transformer.chunk_size, + strategy=transformer.strategy, # type: ignore transformer=copy.deepcopy(transformer.transformer), # type: ignore ) @@ -333,29 +390,10 @@ def operation(chunk: list[Out], ctx: IContextManager) -> list[Out]: return self._pipe(operation) def __call__(self, data: Iterable[In], context: IContextManager | None = None) -> Iterator[Out]: - """Execute the transformer on a data source. - - It uses the provided `context` by reference. If none is provided, it uses - the transformer's internal context. - - Args: - data: The input data to process. - context: Optional context (IContextManager or dict) to use during processing. - - Returns: - An iterator over the transformed data. - """ - - # Use the provided context by reference, or default to a simple context. + """Execute the transformer by delegating to its strategy.""" run_context = context if context is not None else self._default_context - - try: - for chunk in self._chunk_generator(data): - # The context is now passed explicitly through the transformer chain. - yield from self.transformer(chunk, run_context) - finally: - if run_context is self._default_context: - self._default_context.shutdown() + # The new __call__ is just one line! 
+    return self.strategy.execute(self.transformer, self._chunk_generator, data, run_context)
 
   @overload
   def reduce[U](
@@ -466,7 +504,7 @@ def catch[U](
       catch_error_handler.on_error(on_error)  # type: ignore
 
       # Create a blank transformer for the sub-pipeline
-      temp_transformer = createTransformer(_type_hint=..., chunk_size=self.chunk_size)  # type: ignore
+      temp_transformer = create_transformer(_type_hint=..., chunk_size=self.chunk_size)  # type: ignore
 
       # Build the sub-pipeline and get its internal transformer function
       sub_pipeline = sub_pipeline_builder(temp_transformer)
diff --git a/laygo/transformers/types.py b/laygo/transformers/types.py
new file mode 100644
index 0000000..024ab42
--- /dev/null
+++ b/laygo/transformers/types.py
@@ -0,0 +1,26 @@
+from abc import ABC
+from abc import abstractmethod
+from collections.abc import Callable
+from collections.abc import Iterable
+from collections.abc import Iterator
+
+from laygo.context import IContextManager
+
+type InternalTransformer[In, Out] = Callable[[list[In], IContextManager], list[Out]]
+
+
+class BaseTransformer[In, Out](ABC):
+  """Abstract base class for all transformer types.
+
+  Defines the essential contract for a transformer: a callable that processes
+  an iterable of data and yields an iterator of results, optionally using a
+  context manager.
+  """
+
+  @abstractmethod
+  def __call__(self, data: Iterable[In], context: IContextManager | None = None) -> Iterator[Out]:
+    """Execute the transformation on a data source.
+
+    This method must be implemented by all concrete transformer classes.
+    """
+    raise NotImplementedError
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 81900ba..9500d70 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -1,9 +1,9 @@
 """Integration tests for Pipeline and Transformer working together."""
 
-from laygo import ParallelTransformer
 from laygo import Pipeline
 from laygo import Transformer
-from laygo import createTransformer
+from laygo import create_process_transformer
+from laygo import create_transformer
 from laygo.context.types import IContextManager
 
 
@@ -12,7 +12,7 @@ class TestPipelineTransformerBasics:
 
   def test_basic_pipeline_transformer_integration(self):
     """Test basic pipeline and transformer integration."""
-    transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 5)
+    transformer = create_transformer(int).map(lambda x: x * 2).filter(lambda x: x > 5)
 
     result, _ = Pipeline([1, 2, 3, 4, 5]).apply(transformer).to_list()
     assert result == [6, 8, 10]
@@ -118,19 +118,19 @@ def stage2_processor(x: int, ctx: IContextManager) -> int:
 class TestPipelineParallelTransformerIntegration:
   """Test Pipeline integration with ParallelTransformer and context modification."""
 
-  def test_parallel_transformer_basic_integration(self):
+  def test_process_transformer_basic_integration(self):
     """Test pipeline with parallel transformer for basic operations."""
-    parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2)
+    parallel_transformer = create_process_transformer(int, max_workers=2, chunk_size=2)
     parallel_transformer = parallel_transformer.map(lambda x: x * 2).filter(lambda x: x > 5)
 
     result, _ = Pipeline([1, 2, 3, 4, 5]).apply(parallel_transformer).to_list()
     assert sorted(result) == [6, 8, 10]
 
-  def test_parallel_transformer_with_context_modification(self):
+  def test_process_transformer_with_context_modification(self):
     """Test parallel 
transformer safely modifying shared context.""" context = {"processed_count": 0, "sum_total": 0} - parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) + parallel_transformer = create_process_transformer(int, max_workers=2, chunk_size=2) parallel_transformer = parallel_transformer.map(safe_increment_and_transform) data = [1, 2, 3, 4, 5] @@ -146,7 +146,7 @@ def test_pipeline_accesses_modified_context(self): """Test that pipeline can access context data modified by parallel transformer.""" context = {"items_processed": 0, "even_count": 0, "odd_count": 0} - parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) + parallel_transformer = create_process_transformer(int, max_workers=2, chunk_size=3) parallel_transformer = parallel_transformer.map(count_and_transform) data = [1, 2, 3, 4, 5, 6] @@ -159,14 +159,14 @@ def test_pipeline_accesses_modified_context(self): assert pipeline.context_manager["even_count"] == 3 # 2, 4, 6 assert pipeline.context_manager["odd_count"] == 3 # 1, 3, 5 - def test_multiple_parallel_transformers_chaining(self): + def test_multiple_process_transformers_chaining(self): """Test chaining multiple parallel transformers with shared context.""" # Shared context for statistics across transformations context = {"stage1_processed": 0, "stage2_processed": 0, "total_sum": 0} # Create two parallel transformers - stage1 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage1_processor) - stage2 = ParallelTransformer[int, int](max_workers=2, chunk_size=2).map(stage2_processor) + stage1 = create_process_transformer(int, max_workers=2, chunk_size=2).map(stage1_processor) + stage2 = create_process_transformer(int, max_workers=2, chunk_size=2).map(stage2_processor) data = [1, 2, 3, 4, 5] @@ -194,7 +194,7 @@ def test_multiple_parallel_transformers_chaining(self): expected_total = original_sum + stage1_sum # 15 + 30 = 45 assert final_context["total_sum"] == expected_total - def test_pipeline_context_isolation_with_parallel_processing(self): + def test_pipeline_context_isolation_with_process_processing(self): """Test that different pipeline instances have isolated contexts.""" # Create base context structure @@ -207,7 +207,7 @@ def increment_counter(x: int, ctx: IContextManager) -> int: ctx["count"] += 1 return x * 2 - parallel_transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=2) + parallel_transformer = create_process_transformer(int, max_workers=2, chunk_size=2) parallel_transformer = parallel_transformer.map(increment_counter) data = [1, 2, 3] diff --git a/tests/test_parallel_transformer.py b/tests/test_parallel_transformer.py index 813c5c6..fea6ce9 100644 --- a/tests/test_parallel_transformer.py +++ b/tests/test_parallel_transformer.py @@ -4,42 +4,32 @@ import time from laygo import ErrorHandler -from laygo import ParallelTransformer +from laygo import Transformer +from laygo import create_process_transformer +from laygo import create_transformer from laygo.context import IContextManager from laygo.context import ParallelContextManager -from laygo.transformers.parallel import createParallelTransformer -from laygo.transformers.transformer import createTransformer class TestParallelTransformerBasics: """Test core parallel transformer functionality.""" - def test_initialization_custom_parameters(self): - """Test initialization with custom parameters.""" - transformer = ParallelTransformer[int, int](max_workers=8, ordered=False, chunk_size=500) - assert transformer.max_workers == 8 - assert 
transformer.ordered is False - assert transformer.chunk_size == 500 - def test_basic_execution(self): """Test basic parallel transformer execution.""" - transformer = ParallelTransformer[int, int](max_workers=2, chunk_size=3) + transformer = create_process_transformer(int, max_workers=2, chunk_size=3) result = list(transformer([1, 2, 3, 4, 5])) assert result == [1, 2, 3, 4, 5] def test_from_transformer_creation(self): """Test creating ParallelTransformer from existing Transformer.""" - regular = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 5) - parallel = ParallelTransformer.from_transformer(regular, max_workers=2, chunk_size=10) + regular = create_transformer(int).map(lambda x: x * 2).filter(lambda x: x > 5) + parallel = Transformer.from_transformer(regular) data = [1, 2, 3, 4, 5, 6] regular_results = list(regular(data)) parallel_results = list(parallel(data)) assert regular_results == parallel_results - assert parallel.max_workers == 2 - assert parallel.ordered is True - assert parallel.chunk_size == 10 class TestParallelTransformerOperations: @@ -47,20 +37,20 @@ class TestParallelTransformerOperations: def test_map_concurrent_execution(self): """Test map operation with concurrent execution.""" - transformer = createParallelTransformer(int).map(lambda x: x * 2) + transformer = create_process_transformer(int).map(lambda x: x * 2) result = list(transformer([1, 2, 3, 4])) assert result == [2, 4, 6, 8] def test_filter_concurrent_execution(self): """Test filter operation with concurrent execution.""" - transformer = createParallelTransformer(int).filter(lambda x: x % 2 == 0) + transformer = create_process_transformer(int).filter(lambda x: x % 2 == 0) result = list(transformer([1, 2, 3, 4, 5, 6])) assert result == [2, 4, 6] def test_chained_operations(self): """Test chained operations work correctly with concurrency.""" transformer = ( - createParallelTransformer(int, chunk_size=2).map(lambda x: x * 2).filter(lambda x: x > 4).map(lambda x: x + 1) + create_process_transformer(int, chunk_size=2).map(lambda x: x * 2).filter(lambda x: x > 4).map(lambda x: x + 1) ) result = list(transformer([1, 2, 3, 4, 5])) assert result == [7, 9, 11] @@ -68,7 +58,7 @@ def test_chained_operations(self): def test_flatten_operation(self): """Test flatten operation with concurrent execution.""" # This defines a transformer that accepts iterables of lists and flattens them. 
- transformer = createParallelTransformer(list[int]).flatten() + transformer = create_process_transformer(list[int]).flatten() result = list(transformer([[1, 2], [3, 4], [5, 6]])) assert result == [1, 2, 3, 4, 5, 6] @@ -76,7 +66,7 @@ def test_tap_side_effects(self): """Test tap applies side effects correctly in concurrent execution.""" with mp.Manager() as manager: side_effects = manager.list() - transformer = createParallelTransformer(int).tap(lambda x: side_effects.append(x)) + transformer = create_process_transformer(int).tap(lambda x: side_effects.append(x)) result = list(transformer([1, 2, 3, 4])) assert result == [1, 2, 3, 4] @@ -107,7 +97,7 @@ class TestParallelTransformerContextSupport: def test_map_with_context(self): """Test map with context-aware function in concurrent execution.""" context = ParallelContextManager({"multiplier": 3}) - transformer = createParallelTransformer(int).map(lambda x, ctx: x * ctx["multiplier"]) + transformer = create_process_transformer(int).map(lambda x, ctx: x * ctx["multiplier"]) result = list(transformer([1, 2, 3], context)) assert result == [3, 6, 9] @@ -117,7 +107,7 @@ def test_multiple_context_values_modification(self): context = ParallelContextManager({"total_sum": 0, "item_count": 0, "max_value": 0}) - transformer = createParallelTransformer(int, max_workers=3, chunk_size=2).map(update_stats) + transformer = create_process_transformer(int, max_workers=3, chunk_size=2).map(update_stats) data = [1, 5, 3, 8, 2, 7, 4, 6] result = list(transformer(data, context)) @@ -137,16 +127,16 @@ def variable_time_transform(x: int) -> int: time.sleep(0.01 * (5 - x)) # Later elements process faster return x * 2 - transformer = createParallelTransformer(int, max_workers=3, ordered=True).map(variable_time_transform) + transformer = create_process_transformer(int, max_workers=3, ordered=True).map(variable_time_transform) result = list(transformer([1, 2, 3, 4, 5])) assert result == [2, 4, 6, 8, 10] def test_unordered_vs_ordered_same_elements(self): """Test that ordered and unordered produce same elements with different ordering.""" data = list(range(10)) - ordered_transformer = createParallelTransformer(int, max_workers=3, ordered=True).map(lambda x: x * 2) + ordered_transformer = create_process_transformer(int, max_workers=3, ordered=True).map(lambda x: x * 2) ordered_result = list(ordered_transformer(data)) - unordered_transformer = createParallelTransformer(int, max_workers=3, ordered=False).map(lambda x: x * 2) + unordered_transformer = create_process_transformer(int, max_workers=3, ordered=False).map(lambda x: x * 2) unordered_result = list(unordered_transformer(data)) assert sorted(ordered_result) == sorted(unordered_result) @@ -158,7 +148,7 @@ class TestParallelTransformerChunkingAndEdgeCases: def test_empty_data(self): """Test parallel transformer with empty data.""" - transformer = createParallelTransformer(int).map(lambda x: x * 2) + transformer = create_process_transformer(int).map(lambda x: x * 2) result = list(transformer([])) assert result == [] @@ -172,7 +162,7 @@ def failing_function(x: int) -> int: import pytest - transformer = createParallelTransformer(int, chunk_size=1).map(failing_function) + transformer = create_process_transformer(int, chunk_size=1).map(failing_function) with pytest.raises(ValueError, match="Test exception"): list(transformer([1, 2, 3, 4])) @@ -184,7 +174,7 @@ def test_safe_with_error_isolation(self): """Test safe execution isolates errors to specific chunks.""" with mp.Manager() as manager: errored_chunks = manager.list() - 
transformer = createParallelTransformer(int, chunk_size=1).catch( + transformer = create_process_transformer(int, chunk_size=1).catch( lambda t: t.map(lambda x: x / 0), # Division by zero on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore ) @@ -200,7 +190,7 @@ def test_global_error_handler(self): error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk)) transformer = ( - createParallelTransformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) + create_process_transformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) ) list(transformer([1, 2, 3])) assert sorted(map(tuple, errored_chunks)) == sorted(map(tuple, [[1], [2], [3]])) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 429964e..9939967 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -5,7 +5,7 @@ from laygo import Pipeline from laygo.context.types import IContextManager -from laygo.transformers.transformer import createTransformer +from laygo.transformers.transformer import create_transformer class TestPipelineBasics: @@ -42,7 +42,7 @@ class TestPipelineTransformations: def test_apply_with_transformer(self): """Test apply with transformer object.""" - transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 4) + transformer = create_transformer(int).map(lambda x: x * 2).filter(lambda x: x > 4) result, _ = Pipeline([1, 2, 3, 4]).apply(transformer).to_list() assert result == [6, 8] @@ -99,7 +99,7 @@ def test_first_with_insufficient_data(self): def test_consume_processes_without_return(self): """Test consume processes all elements without returning anything.""" side_effects = [] - transformer = createTransformer(int).tap(lambda x: side_effects.append(x)) + transformer = create_transformer(int).tap(lambda x: side_effects.append(x)) result, _ = Pipeline([1, 2, 3]).apply(transformer).consume() assert result is None @@ -178,7 +178,7 @@ def test_large_dataset_processing(self): def test_chunked_processing_consistency(self): """Test that chunked processing produces consistent results.""" # Use small chunk size to test chunking behavior - transformer = createTransformer(int, chunk_size=10).map(lambda x: x + 1) + transformer = create_transformer(int, chunk_size=10).map(lambda x: x + 1) result, _ = Pipeline(list(range(100))).apply(transformer).to_list() expected = list(range(1, 101)) # [1, 2, 3, ..., 100] @@ -234,8 +234,8 @@ def test_branch_basic_functionality(self): pipeline = Pipeline([1, 2, 3, 4, 5]) # Create two different branch transformers - double_branch = createTransformer(int).map(lambda x: x * 2) - square_branch = createTransformer(int).map(lambda x: x**2) + double_branch = create_transformer(int).map(lambda x: x * 2) + square_branch = create_transformer(int).map(lambda x: x**2) # Execute branching result, _ = pipeline.branch({"doubled": double_branch, "squared": square_branch}) @@ -255,8 +255,8 @@ def test_branch_with_empty_input(self): """Test branch with empty input data.""" pipeline = Pipeline([]) - double_branch = createTransformer(int).map(lambda x: x * 2) - square_branch = createTransformer(int).map(lambda x: x**2) + double_branch = create_transformer(int).map(lambda x: x * 2) + square_branch = create_transformer(int).map(lambda x: x**2) result, _ = pipeline.branch({"doubled": double_branch, "squared": square_branch}) @@ -276,7 +276,7 @@ def test_branch_with_single_branch(self): """Test branch with only one branch.""" pipeline = Pipeline([1, 2, 
3, 4]) - triple_branch = createTransformer(int).map(lambda x: x * 3) + triple_branch = create_transformer(int).map(lambda x: x * 3) result, _ = pipeline.branch({"tripled": triple_branch}) @@ -289,8 +289,8 @@ def test_branch_with_custom_queue_size(self): """Test branch with custom queue size parameter.""" pipeline = Pipeline([1, 2, 3, 4, 5]) - double_branch = createTransformer(int).map(lambda x: x * 2) - triple_branch = createTransformer(int).map(lambda x: x * 3) + double_branch = create_transformer(int).map(lambda x: x * 2) + triple_branch = create_transformer(int).map(lambda x: x * 3) # Test with a small queue size result, _ = pipeline.branch( @@ -311,9 +311,9 @@ def test_branch_with_three_branches(self): """Test branch with three branches to verify fan-out distribution.""" pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9]) - add_10 = createTransformer(int).map(lambda x: x + 10) - add_20 = createTransformer(int).map(lambda x: x + 20) - add_30 = createTransformer(int).map(lambda x: x + 30) + add_10 = create_transformer(int).map(lambda x: x + 10) + add_20 = create_transformer(int).map(lambda x: x + 20) + add_30 = create_transformer(int).map(lambda x: x + 30) result, _ = pipeline.branch({"add_10": add_10, "add_20": add_20, "add_30": add_30}) @@ -330,8 +330,8 @@ def test_branch_with_filtering_transformers(self): pipeline = Pipeline([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # Create transformers that filter data - even_branch = createTransformer(int).filter(lambda x: x % 2 == 0) - odd_branch = createTransformer(int).filter(lambda x: x % 2 == 1) + even_branch = create_transformer(int).filter(lambda x: x % 2 == 0) + odd_branch = create_transformer(int).filter(lambda x: x % 2 == 1) result, _ = pipeline.branch({"evens": even_branch, "odds": odd_branch}) @@ -346,10 +346,10 @@ def test_branch_with_multiple_transformations(self): pipeline = Pipeline([1, 2, 3, 4, 5, 6]) # Complex transformer: filter evens, then double, then add 1 - complex_branch = createTransformer(int).filter(lambda x: x % 2 == 0).map(lambda x: x * 2).map(lambda x: x + 1) + complex_branch = create_transformer(int).filter(lambda x: x % 2 == 0).map(lambda x: x * 2).map(lambda x: x + 1) # Simple transformer: just multiply by 10 - simple_branch = createTransformer(int).map(lambda x: x * 10) + simple_branch = create_transformer(int).map(lambda x: x * 10) result, _ = pipeline.branch({"complex": complex_branch, "simple": simple_branch}) @@ -367,8 +367,8 @@ def test_branch_with_chunked_data(self): pipeline = Pipeline(data) # Use small chunk size to ensure multiple chunks - small_chunk_transformer = createTransformer(int, chunk_size=5).map(lambda x: x * 2) - identity_transformer = createTransformer(int, chunk_size=5) + small_chunk_transformer = create_transformer(int, chunk_size=5).map(lambda x: x * 2) + identity_transformer = create_transformer(int, chunk_size=5) result, _ = pipeline.branch( { @@ -389,8 +389,8 @@ def test_branch_with_flatten_operation(self): """Test branch with flatten operations.""" pipeline = Pipeline([[1, 2], [3, 4], [5, 6]]) - flatten_branch = createTransformer(list).flatten() - count_branch = createTransformer(list).map(lambda x: len(x)) + flatten_branch = create_transformer(list).flatten() + count_branch = create_transformer(list).map(lambda x: len(x)) result, _ = pipeline.branch( { @@ -410,7 +410,7 @@ def test_branch_is_terminal_operation(self): pipeline = Pipeline([1, 2, 3, 4, 5]) # Create a simple transformer - double_branch = createTransformer(int).map(lambda x: x * 2) + double_branch = create_transformer(int).map(lambda x: 
x * 2) # Execute branch result, _ = pipeline.branch({"doubled": double_branch}) @@ -429,8 +429,8 @@ def test_branch_with_different_chunk_sizes(self): pipeline = Pipeline(data) # Different chunk sizes for different branches - large_chunk_branch = createTransformer(int, chunk_size=10).map(lambda x: x + 100) - small_chunk_branch = createTransformer(int, chunk_size=3).map(lambda x: x + 200) + large_chunk_branch = create_transformer(int, chunk_size=10).map(lambda x: x + 100) + small_chunk_branch = create_transformer(int, chunk_size=3).map(lambda x: x + 200) result, _ = pipeline.branch({"large_chunk": large_chunk_branch, "small_chunk": small_chunk_branch}) @@ -446,8 +446,8 @@ def test_branch_preserves_data_order_within_chunks(self): pipeline = Pipeline([5, 3, 8, 1, 9, 2]) # Identity transformer should preserve order - identity_branch = createTransformer(int) - reverse_branch = createTransformer(int).map(lambda x: -x) + identity_branch = create_transformer(int) + reverse_branch = create_transformer(int).map(lambda x: -x) result, _ = pipeline.branch({"identity": identity_branch, "negated": reverse_branch}) @@ -462,8 +462,8 @@ def test_branch_with_error_handling(self): pipeline = Pipeline([1, 2, 0, 4, 5]) # Create a transformer that will fail on zero division - division_branch = createTransformer(int).map(lambda x: 10 // x) - safe_branch = createTransformer(int).map(lambda x: x * 2) + division_branch = create_transformer(int).map(lambda x: 10 // x) + safe_branch = create_transformer(int).map(lambda x: x * 2) # The division_branch should fail when processing 0 # The current implementation catches exceptions and returns empty lists for failed branches @@ -489,8 +489,8 @@ def context_modifier_b(chunk: list[int], ctx: IContextManager) -> list[int]: print("branch b", ctx["branch_b_processed"]) return [x * 3 for x in chunk] - branch_a = createTransformer(int)._pipe(context_modifier_a) - branch_b = createTransformer(int)._pipe(context_modifier_b) + branch_a = create_transformer(int)._pipe(context_modifier_a) + branch_b = create_transformer(int)._pipe(context_modifier_b) result, context = pipeline.branch({"branch_a": branch_a, "branch_b": branch_b}) @@ -512,15 +512,15 @@ def test_branch_conditional_router_mode(self): branches = { "integers": ( - createTransformer(int).map(lambda x: x + 1), + create_transformer(int).map(lambda x: x + 1), lambda x: isinstance(x, int), ), "strings": ( - createTransformer(str).map(lambda x: x.upper()), + create_transformer(str).map(lambda x: x.upper()), lambda x: isinstance(x, str), ), "numbers": ( # This condition also matches integers - createTransformer(float).map(lambda x: x * 10), + create_transformer(float).map(lambda x: x * 10), lambda x: isinstance(x, int | float), ), } @@ -545,15 +545,15 @@ def test_branch_conditional_broadcast_mode(self): branches = { "integers": ( - createTransformer(int).map(lambda x: x + 1), + create_transformer(int).map(lambda x: x + 1), lambda x: isinstance(x, int), ), "strings": ( - createTransformer(str).map(lambda x: x.upper()), + create_transformer(str).map(lambda x: x.upper()), lambda x: isinstance(x, str), ), "numbers": ( # This condition also matches integers - createTransformer(float).map(lambda x: x * 10), + create_transformer(float).map(lambda x: x * 10), lambda x: isinstance(x, int | float), ), } @@ -592,11 +592,11 @@ def check_pid(chunk: list[int], ctx: IContextManager) -> list[int]: # Define branches with CPU-bound work and the PID check branches = { "evens": ( - createTransformer(int).filter(lambda x: x % 2 == 
0).map(heavy_computation)._pipe(check_pid),
+        create_transformer(int).filter(lambda x: x % 2 == 0).map(heavy_computation)._pipe(check_pid),
         lambda x: True,  # Condition to route data
       ),
       "odds": (
-        createTransformer(int).filter(lambda x: x % 2 != 0).map(heavy_computation)._pipe(check_pid),
+        create_transformer(int).filter(lambda x: x % 2 != 0).map(heavy_computation)._pipe(check_pid),
         lambda x: True,
       ),
     }
diff --git a/tests/test_threaded_transformer.py b/tests/test_threaded_transformer.py
index 56ff9cc..84e318f 100644
--- a/tests/test_threaded_transformer.py
+++ b/tests/test_threaded_transformer.py
@@ -3,42 +3,32 @@
 import time
 
 from laygo import ErrorHandler
-from laygo import ThreadedTransformer
 from laygo.context.parallel import ParallelContextManager
 from laygo.context.types import IContextManager
-from laygo.transformers.threaded import createThreadedTransformer
-from laygo.transformers.transformer import createTransformer
+from laygo.transformers.transformer import Transformer
+from laygo.transformers.transformer import create_threaded_transformer
+from laygo.transformers.transformer import create_transformer
 
 
 class TestThreadedTransformerBasics:
   """Test core parallel transformer functionality."""
 
-  def test_initialization_custom_parameters(self):
-    """Test initialization with custom parameters."""
-    transformer = ThreadedTransformer[int, int](max_workers=8, ordered=False, chunk_size=500)
-    assert transformer.max_workers == 8
-    assert transformer.ordered is False
-    assert transformer.chunk_size == 500
-
   def test_basic_execution(self):
     """Test basic parallel transformer execution."""
-    transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=3)
+    transformer = create_threaded_transformer(int, max_workers=2, chunk_size=3)
     result = list(transformer([1, 2, 3, 4, 5]))
     assert result == [1, 2, 3, 4, 5]
 
   def test_from_transformer_creation(self):
-    """Test creating ThreadedTransformer from existing Transformer."""
+    """Test that from_transformer copies transformation logic."""
-    regular = createTransformer(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5)
-    parallel = ThreadedTransformer.from_transformer(regular, max_workers=2, ordered=True)
+    regular = create_transformer(int, chunk_size=100).map(lambda x: x * 2).filter(lambda x: x > 5)
+    parallel = Transformer.from_transformer(regular)
 
     data = [1, 2, 3, 4, 5, 6]
     regular_results = list(regular(data))
     parallel_results = list(parallel(data))
 
     assert regular_results == parallel_results
-    assert parallel.max_workers == 2
-    assert parallel.ordered is True
-    assert parallel.chunk_size == 100
 
 
 class TestThreadedTransformerOperations:
@@ -46,20 +36,20 @@ class TestThreadedTransformerOperations:
 
   def test_map_concurrent_execution(self):
     """Test map operation with concurrent execution."""
-    transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2)
+    transformer = create_threaded_transformer(int, max_workers=2, chunk_size=2).map(lambda x: x * 2)
     result = list(transformer([1, 2, 3, 4]))
     assert result == [2, 4, 6, 8]
 
   def test_filter_concurrent_execution(self):
     """Test filter operation with concurrent execution."""
-    transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0)
+    transformer = create_threaded_transformer(int, max_workers=2, chunk_size=2).filter(lambda x: x % 2 == 0)
     result = list(transformer([1, 2, 3, 4, 5, 6]))
     assert result == [2, 4, 6]
 
   def test_chained_operations(self):
     """Test chained operations work correctly with concurrency."""
     transformer = (
-      ThreadedTransformer[int, int](max_workers=2, chunk_size=2)
+ create_threaded_transformer(int, max_workers=2, chunk_size=2) .map(lambda x: x * 2) .filter(lambda x: x > 4) .map(lambda x: x + 1) @@ -69,14 +59,14 @@ def test_chained_operations(self): def test_flatten_operation(self): """Test flatten operation with concurrent execution.""" - transformer = ThreadedTransformer[list[int], list[int]](max_workers=2, chunk_size=2).flatten() + transformer = create_threaded_transformer(list[int], max_workers=2, chunk_size=2).flatten() result = list(transformer([[1, 2], [3, 4], [5, 6]])) assert result == [1, 2, 3, 4, 5, 6] def test_tap_side_effects(self): """Test tap applies side effects correctly in concurrent execution.""" side_effects = [] - transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2) + transformer = create_threaded_transformer(int, max_workers=2, chunk_size=2) transformer = transformer.tap(lambda x: side_effects.append(x)) result = list(transformer([1, 2, 3, 4])) @@ -90,7 +80,7 @@ class TestThreadedTransformerContextSupport: def test_map_with_context(self): """Test map with context-aware function in concurrent execution.""" context = ParallelContextManager({"multiplier": 3}) - transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2) + transformer = create_threaded_transformer(int, max_workers=2, chunk_size=2) transformer = transformer.map(lambda x, ctx: x * ctx["multiplier"]) result = list(transformer([1, 2, 3], context)) assert result == [3, 6, 9] @@ -106,7 +96,7 @@ def safe_increment(x: int, ctx: IContextManager) -> int: ctx["items"] = ctx["items"] + 1 return x * 2 - transformer = ThreadedTransformer[int, int](max_workers=4, chunk_size=1) + transformer = create_threaded_transformer(int, max_workers=4, chunk_size=1) transformer = transformer.map(safe_increment) data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] @@ -126,7 +116,7 @@ def update_stats(x: int, ctx: IContextManager) -> int: ctx["max_value"] = max(ctx["max_value"], x) return x * 3 - transformer = ThreadedTransformer[int, int](max_workers=3, chunk_size=2) + transformer = create_threaded_transformer(int, max_workers=3, chunk_size=2) transformer = transformer.map(update_stats) data = [1, 5, 3, 8, 2, 7, 4, 6] @@ -148,7 +138,7 @@ def variable_time_transform(x: int) -> int: time.sleep(0.01 * (5 - x)) # Later elements process faster return x * 2 - transformer = ThreadedTransformer[int, int](max_workers=3, ordered=True, chunk_size=2) + transformer = create_threaded_transformer(int, max_workers=3, ordered=True, chunk_size=2) transformer = transformer.map(variable_time_transform) result = list(transformer([1, 2, 3, 4, 5])) @@ -158,10 +148,10 @@ def test_unordered_vs_ordered_same_elements(self): """Test that ordered and unordered produce same elements with different ordering.""" data = list(range(10)) - ordered_transformer = ThreadedTransformer[int, int](max_workers=3, ordered=True, chunk_size=3) + ordered_transformer = create_threaded_transformer(int, max_workers=3, ordered=True, chunk_size=3) ordered_result = list(ordered_transformer.map(lambda x: x * 2)(data)) - unordered_transformer = ThreadedTransformer[int, int](max_workers=3, ordered=False, chunk_size=3) + unordered_transformer = create_threaded_transformer(int, max_workers=3, ordered=False, chunk_size=3) unordered_result = list(unordered_transformer.map(lambda x: x * 2)(data)) assert sorted(ordered_result) == sorted(unordered_result) @@ -179,7 +169,7 @@ def track_processing(x: int) -> int: processed_chunks.append(x) return x * 2 - transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=3) + transformer 
= create_threaded_transformer(int, max_workers=2, chunk_size=3) transformer = transformer.map(track_processing) result = list(transformer([1, 2, 3, 4, 5, 6, 7])) @@ -188,7 +178,7 @@ def track_processing(x: int) -> int: def test_large_chunk_size_handling(self): """Test parallel transformer with large chunk size relative to data.""" - transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=1000) + transformer = create_threaded_transformer(int, max_workers=2, chunk_size=1000) transformer = transformer.map(lambda x: x + 1) large_data = list(range(100)) # Much smaller than chunk size result = list(transformer(large_data)) @@ -201,28 +191,28 @@ class TestThreadedTransformerEdgeCases: def test_empty_data(self): """Test parallel transformer with empty data.""" - transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2) + transformer = create_threaded_transformer(int, max_workers=2, chunk_size=2).map(lambda x: x * 2) result = list(transformer([])) assert result == [] def test_single_element(self): """Test parallel transformer with single element.""" transformer = ( - ThreadedTransformer[int, int](max_workers=2, chunk_size=2).map(lambda x: x * 2).filter(lambda x: x > 0) + create_threaded_transformer(int, max_workers=2, chunk_size=2).map(lambda x: x * 2).filter(lambda x: x > 0) ) result = list(transformer([5])) assert result == [10] def test_data_smaller_than_chunk_size(self): """Test when data is smaller than chunk size.""" - transformer = ThreadedTransformer[int, int](max_workers=4, chunk_size=100) + transformer = create_threaded_transformer(int, max_workers=4, chunk_size=100) transformer = transformer.map(lambda x: x * 2) result = list(transformer([1, 2, 3])) assert result == [2, 4, 6] def test_more_workers_than_chunks(self): """Test when workers exceed number of chunks.""" - transformer = ThreadedTransformer[int, int](max_workers=10, chunk_size=2) + transformer = create_threaded_transformer(int, max_workers=10, chunk_size=2) transformer = transformer.map(lambda x: x * 2) result = list(transformer([1, 2, 3])) # Only 2 chunks, but 10 workers assert result == [2, 4, 6] @@ -235,7 +225,7 @@ def failing_function(x: int) -> int: raise ValueError("Test exception") return x * 2 - transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2) + transformer = create_threaded_transformer(int, max_workers=2, chunk_size=2) transformer = transformer.map(failing_function) try: @@ -250,14 +240,14 @@ class TestThreadedTransformerErrorHandling: def test_safe_with_successful_operation(self): """Test safe execution with successful transformation.""" - transformer = createThreadedTransformer(int).catch(lambda t: t.map(lambda x: x * 2)) + transformer = create_threaded_transformer(int).catch(lambda t: t.map(lambda x: x * 2)) result = list(transformer([1, 2, 3])) assert result == [2, 4, 6] def test_safe_with_error_isolation(self): """Test safe execution isolates errors to specific chunks.""" errored_chunks = [] - transformer = createThreadedTransformer(int, chunk_size=1).catch( + transformer = create_threaded_transformer(int, chunk_size=1).catch( lambda t: t.map(lambda x: x / 0), # Division by zero on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore ) @@ -273,7 +263,7 @@ def test_global_error_handler(self): error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk)) transformer = ( - createThreadedTransformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) + 
create_threaded_transformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) ) list(transformer([1, 2, 3])) diff --git a/tests/test_transformer.py b/tests/test_transformer.py index 6fe53e6..154ff64 100644 --- a/tests/test_transformer.py +++ b/tests/test_transformer.py @@ -5,7 +5,7 @@ from laygo import ErrorHandler from laygo import Transformer from laygo.context.simple import SimpleContextManager -from laygo.transformers.transformer import createTransformer +from laygo.transformers.transformer import create_transformer class TestTransformerBasics: @@ -13,13 +13,13 @@ class TestTransformerBasics: def test_identity_transformer(self): """Test that init creates an identity transformer.""" - transformer = createTransformer(int) + transformer = create_transformer(int) result = list(transformer([1, 2, 3])) assert result == [1, 2, 3] def test_custom_chunk_size(self): """Test transformer with custom chunk size.""" - transformer = createTransformer(int, chunk_size=2) + transformer = create_transformer(int, chunk_size=2) assert transformer.chunk_size == 2 # Functionality should work regardless of chunk size result = list(transformer([1, 2, 3, 4])) @@ -31,27 +31,27 @@ class TestTransformerOperations: def test_map_transformation(self): """Test map transforms each element.""" - transformer = createTransformer(int).map(lambda x: x * 2) + transformer = create_transformer(int).map(lambda x: x * 2) result = list(transformer([1, 2, 3])) assert result == [2, 4, 6] def test_filter_operation(self): """Test filter keeps only matching elements.""" - transformer = createTransformer(int).filter(lambda x: x % 2 == 0) + transformer = create_transformer(int).filter(lambda x: x % 2 == 0) result = list(transformer([1, 2, 3, 4, 5, 6])) assert result == [2, 4, 6] def test_flatten_operation(self): """Test flatten with various iterable types.""" # Test with lists - transformer = createTransformer(list).flatten() + transformer = create_transformer(list).flatten() result = list(transformer([[1, 2], [3, 4], [5]])) assert result == [1, 2, 3, 4, 5] def test_tap_side_effects(self): """Test tap applies side effects without modifying data.""" side_effects = [] - transformer = createTransformer(int).tap(lambda x: side_effects.append(x)) + transformer = create_transformer(int).tap(lambda x: side_effects.append(x)) result = list(transformer([1, 2, 3])) assert result == [1, 2, 3] # Data unchanged @@ -60,13 +60,13 @@ def test_tap_side_effects(self): def test_loop_basic_operation(self): """Test loop applies transformer repeatedly until condition is met.""" # Create a loop transformer that adds 1 to each element - increment_transformer = createTransformer(int).map(lambda x: x + 1) + increment_transformer = create_transformer(int).map(lambda x: x + 1) # Continue looping while any element is less than 5 def condition(chunk): return any(x < 5 for x in chunk) - transformer = createTransformer(int).loop(increment_transformer, condition, max_iterations=10) + transformer = create_transformer(int).loop(increment_transformer, condition, max_iterations=10) result = list(transformer([1, 2, 3])) # Should increment until all elements are >= 5: [1,2,3] -> [2,3,4] -> [3,4,5] -> [4,5,6] -> [5,6,7] @@ -75,13 +75,13 @@ def condition(chunk): def test_loop_with_max_iterations(self): """Test loop respects max_iterations limit.""" # Create a loop transformer that adds 1 to each element - increment_transformer = createTransformer(int).map(lambda x: x + 1) + increment_transformer = create_transformer(int).map(lambda x: x + 1) # 
Condition that would normally continue indefinitely def always_true_condition(chunk): return True - transformer = createTransformer(int).loop(increment_transformer, always_true_condition, max_iterations=3) + transformer = create_transformer(int).loop(increment_transformer, always_true_condition, max_iterations=3) result = list(transformer([1, 2, 3])) # Should stop after 3 iterations: [1,2,3] -> [2,3,4] -> [3,4,5] -> [4,5,6] @@ -89,13 +89,13 @@ def always_true_condition(chunk): def test_loop_no_iterations(self): """Test loop when condition is false from the start.""" - increment_transformer = createTransformer(int).map(lambda x: x + 1) + increment_transformer = create_transformer(int).map(lambda x: x + 1) # Condition that's immediately false def exit_immediately(chunk): return False - transformer = createTransformer(int).loop(increment_transformer, exit_immediately) + transformer = create_transformer(int).loop(increment_transformer, exit_immediately) result = list(transformer([1, 2, 3])) # Should not iterate at all @@ -135,14 +135,14 @@ def test_tap_with_transformer(self): # Create a side-effect transformer that logs processed values side_effect_transformer = ( - createTransformer(int) + create_transformer(int) .map(lambda x: x * 10) # Transform for side effect .tap(lambda x: side_effects.append(x)) # Capture the transformed values ) # Main transformer that uses the side-effect transformer via tap main_transformer = ( - createTransformer(int) + create_transformer(int) .map(lambda x: x * 2) # Main transformation .tap(side_effect_transformer) # Apply side-effect transformer .map(lambda x: x + 1) # Continue main transformation @@ -163,14 +163,14 @@ def test_tap_with_transformer_and_context(self): # Create a context-aware side-effect transformer side_effect_transformer = ( - createTransformer(int) + create_transformer(int) .map(lambda x, ctx: x * ctx["multiplier"]) # Use context multiplier .tap(lambda x, ctx: side_effects.append(f"{ctx['log_prefix']}{x}")) # Log with context prefix ) # Main transformer main_transformer = ( - createTransformer(int) + create_transformer(int) .map(lambda x: x + 10) # Main transformation .tap(side_effect_transformer) # Apply side-effect transformer with context ) @@ -190,7 +190,7 @@ def test_loop_with_context(self): # Create a context-aware loop transformer that uses context increment loop_transformer = ( - createTransformer(int) + create_transformer(int) .map(lambda x, ctx: x + ctx["increment"]) # Use context increment .tap(lambda x, ctx: side_effects.append(f"iteration:{x}")) # Log each iteration ) @@ -199,7 +199,7 @@ def test_loop_with_context(self): def condition_with_context(chunk, ctx): return sum(chunk) < ctx["target_sum"] - main_transformer = createTransformer(int).loop(loop_transformer, condition_with_context, max_iterations=10) + main_transformer = create_transformer(int).loop(loop_transformer, condition_with_context, max_iterations=10) result = list(main_transformer([1, 2, 3], context)) @@ -216,13 +216,13 @@ def test_loop_with_context_and_side_effects(self): context = SimpleContextManager({"max_value": 20, "increment": 3}) # Simple loop transformer that uses context increment - loop_transformer = createTransformer(int).map(lambda x, ctx: x + ctx["increment"]) + loop_transformer = create_transformer(int).map(lambda x, ctx: x + ctx["increment"]) # Context-aware condition: continue while max value in chunk is less than context max_value def condition_with_context(chunk, ctx): return max(chunk) < ctx["max_value"] - main_transformer = 
createTransformer(int).loop(loop_transformer, condition_with_context, max_iterations=10) + main_transformer = create_transformer(int).loop(loop_transformer, condition_with_context, max_iterations=10) result = list(main_transformer([5, 8], context)) @@ -235,14 +235,14 @@ class TestTransformerChaining: def test_map_filter_chain(self): """Test map followed by filter.""" - transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 4) + transformer = create_transformer(int).map(lambda x: x * 2).filter(lambda x: x > 4) result = list(transformer([1, 2, 3, 4])) assert result == [6, 8] def test_complex_operation_chain(self): """Test complex chain with multiple operations.""" transformer = ( - createTransformer(int) + create_transformer(int) .map(lambda x: [x, x * 2]) # Create pairs .flatten() # Flatten to single list .filter(lambda x: x > 3) # Keep values > 3 @@ -252,7 +252,7 @@ def test_complex_operation_chain(self): def test_transformer_composition(self): """Test transformer composition with apply.""" - base_transformer = createTransformer(int).map(lambda x: x * 2) + base_transformer = create_transformer(int).map(lambda x: x * 2) composed_transformer = base_transformer.apply(lambda t: t.filter(lambda x: x > 4)) result = list(composed_transformer([1, 2, 3, 4])) assert result == [6, 8] @@ -263,7 +263,7 @@ class TestTransformerReduceOperations: def test_basic_reduce(self): """Test reduce with sum operation.""" - transformer = createTransformer(int) + transformer = create_transformer(int) reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) result = list(reducer([1, 2, 3, 4], None)) assert result == [10] @@ -278,14 +278,14 @@ def test_reduce_with_context(self): def test_reduce_after_transformation(self): """Test reduce after map transformation.""" - transformer = createTransformer(int).map(lambda x: x * 2) + transformer = create_transformer(int).map(lambda x: x * 2) reducer = transformer.reduce(lambda acc, x: acc + x, initial=0) result = list(reducer([1, 2, 3], None)) assert result == [12] # [2, 4, 6] summed = 12 def test_reduce_per_chunk_basic(self): """Test reduce with per_chunk=True for basic operation.""" - transformer = createTransformer(int, chunk_size=2).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) + transformer = create_transformer(int, chunk_size=2).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) result = list(transformer([1, 2, 3, 4, 5])) # With chunk_size=2: [1, 2] -> 3, [3, 4] -> 7, [5] -> 5 assert result == [3, 7, 5] @@ -293,7 +293,7 @@ def test_reduce_per_chunk_basic(self): def test_reduce_per_chunk_with_context(self): """Test reduce with per_chunk=True and context-aware function.""" context = SimpleContextManager({"multiplier": 2}) - transformer = createTransformer(int, chunk_size=2).reduce( + transformer = create_transformer(int, chunk_size=2).reduce( lambda acc, x, ctx: acc + (x * ctx["multiplier"]), initial=0, per_chunk=True ) result = list(transformer([1, 2, 3], context)) @@ -302,13 +302,13 @@ def test_reduce_per_chunk_with_context(self): def test_reduce_per_chunk_empty_chunks(self): """Test reduce with per_chunk=True handles empty chunks correctly.""" - transformer = createTransformer(int, chunk_size=5).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) + transformer = create_transformer(int, chunk_size=5).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) result = list(transformer([])) assert result == [] def test_reduce_per_chunk_single_element_chunks(self): """Test reduce with per_chunk=True with single element 
chunks.""" - transformer = createTransformer(int, chunk_size=1).reduce(lambda acc, x: acc + x, initial=10, per_chunk=True) + transformer = create_transformer(int, chunk_size=1).reduce(lambda acc, x: acc + x, initial=10, per_chunk=True) result = list(transformer([1, 2, 3])) # Each chunk has one element: [1] -> 10+1=11, [2] -> 10+2=12, [3] -> 10+3=13 assert result == [11, 12, 13] @@ -316,7 +316,7 @@ def test_reduce_per_chunk_single_element_chunks(self): def test_reduce_per_chunk_chaining(self): """Test reduce with per_chunk=True can be chained with other operations.""" transformer = ( - createTransformer(int, chunk_size=2) + create_transformer(int, chunk_size=2) .map(lambda x: x * 2) .reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) .map(lambda x: x * 10) @@ -332,12 +332,12 @@ def test_reduce_per_chunk_different_chunk_sizes(self): data = [1, 2, 3, 4, 5, 6] # Test with chunk_size=2 - transformer_2 = createTransformer(int, chunk_size=2).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) + transformer_2 = create_transformer(int, chunk_size=2).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) result_2 = list(transformer_2(data)) assert result_2 == [3, 7, 11] # [1,2]->3, [3,4]->7, [5,6]->11 # Test with chunk_size=3 - transformer_3 = createTransformer(int, chunk_size=3).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) + transformer_3 = create_transformer(int, chunk_size=3).reduce(lambda acc, x: acc + x, initial=0, per_chunk=True) result_3 = list(transformer_3(data)) assert result_3 == [6, 15] # [1,2,3]->6, [4,5,6]->15 @@ -346,13 +346,13 @@ def test_reduce_per_chunk_versus_terminal(self): data = [1, 2, 3, 4] # Terminal reduce (per_chunk=False) - returns a callable - transformer_terminal = createTransformer(int, chunk_size=2) + transformer_terminal = create_transformer(int, chunk_size=2) reducer_terminal = transformer_terminal.reduce(lambda acc, x: acc + x, initial=0, per_chunk=False) result_terminal = list(reducer_terminal(data, None)) assert result_terminal == [10] # Sum of all elements # Per-chunk reduce (per_chunk=True) - returns a transformer - transformer_per_chunk = createTransformer(int, chunk_size=2).reduce( + transformer_per_chunk = create_transformer(int, chunk_size=2).reduce( lambda acc, x: acc + x, initial=0, per_chunk=True ) result_per_chunk = list(transformer_per_chunk(data)) @@ -364,19 +364,19 @@ class TestTransformerEdgeCases: def test_empty_data(self): """Test transformer with empty data.""" - transformer = createTransformer(int).map(lambda x: x * 2) + transformer = create_transformer(int).map(lambda x: x * 2) result = list(transformer([])) assert result == [] def test_single_element(self): """Test transformer with single element.""" - transformer = createTransformer(int).map(lambda x: x * 2).filter(lambda x: x > 0) + transformer = create_transformer(int).map(lambda x: x * 2).filter(lambda x: x > 0) result = list(transformer([5])) assert result == [10] def test_filter_removes_all_elements(self): """Test filter that removes all elements.""" - transformer = createTransformer(int).filter(lambda x: x > 100) + transformer = create_transformer(int).filter(lambda x: x > 100) result = list(transformer([1, 2, 3])) assert result == [] @@ -385,11 +385,11 @@ def test_chunking_behavior(self): data = list(range(100)) # Small chunks - small_chunk_transformer = createTransformer(int, chunk_size=5).map(lambda x: x * 2) + small_chunk_transformer = create_transformer(int, chunk_size=5).map(lambda x: x * 2) small_result = list(small_chunk_transformer(data)) # Large 
chunks - large_chunk_transformer = createTransformer(int, chunk_size=50).map(lambda x: x * 2) + large_chunk_transformer = create_transformer(int, chunk_size=50).map(lambda x: x * 2) large_result = list(large_chunk_transformer(data)) # Results should be identical regardless of chunk size @@ -401,7 +401,7 @@ class TestTransformerFromTransformer: def test_copy_transformer_logic(self): """Test that from_transformer copies transformation logic.""" - source = createTransformer(int, chunk_size=50).map(lambda x: x * 3).filter(lambda x: x > 6) + source = create_transformer(int, chunk_size=50).map(lambda x: x * 3).filter(lambda x: x > 6) target = Transformer.from_transformer(source) data = [1, 2, 3, 4, 5] @@ -413,7 +413,7 @@ def test_copy_transformer_logic(self): def test_copy_with_custom_parameters(self): """Test from_transformer with custom parameters.""" - source = createTransformer(int).map(lambda x: x * 2) + source = create_transformer(int).map(lambda x: x * 2) target = Transformer.from_transformer(source, chunk_size=200) assert target.chunk_size == 200 @@ -427,14 +427,14 @@ class TestTransformerErrorHandling: def test_catch_with_successful_operation(self): """Test catch with successful transformation.""" - transformer = createTransformer(int).catch(lambda t: t.map(lambda x: x * 2)) + transformer = create_transformer(int).catch(lambda t: t.map(lambda x: x * 2)) result = list(transformer([1, 2, 3])) assert result == [2, 4, 6] def test_catch_with_error_isolation(self): """Test catch isolates errors to specific chunks.""" errored_chunks = [] - transformer = createTransformer(int, chunk_size=1).catch( + transformer = create_transformer(int, chunk_size=1).catch( lambda t: t.map(lambda x: x / 0), # Division by zero on_error=lambda chunk, error, context: errored_chunks.append(chunk), # type: ignore ) @@ -449,7 +449,7 @@ def test_global_error_handler(self): error_handler = ErrorHandler() error_handler.on_error(lambda chunk, error, context: errored_chunks.append(chunk)) - transformer = createTransformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) + transformer = create_transformer(int, chunk_size=1).on_error(error_handler).catch(lambda t: t.map(lambda x: x / 0)) list(transformer([1, 2, 3])) assert errored_chunks == [[1], [2], [3]] @@ -461,7 +461,7 @@ def set_error_flag(_chunk, _error, context): context["error_occurred"] = True transformer = ( - createTransformer(int, chunk_size=1) + create_transformer(int, chunk_size=1) .catch( lambda t: t.map(lambda x: x / 0), on_error=set_error_flag, # type: ignore @@ -483,7 +483,7 @@ def raise_on_error(ctx): raise RuntimeError("Short-circuit condition met, stopping execution.") transformer = ( - createTransformer(int, chunk_size=1) + create_transformer(int, chunk_size=1) .catch( lambda t: t.map(lambda x: x / 0), on_error=set_error_flag, # type: ignore From 3933b08f46a3113e8309e681117e7a104be33671 Mon Sep 17 00:00:00 2001 From: ringoldsdev Date: Wed, 30 Jul 2025 21:13:06 +0000 Subject: [PATCH 2/2] feat: http execution strategy --- laygo/__init__.py | 4 +- laygo/transformers/http.py | 441 ++++++++++++++------------ laygo/transformers/strategies/http.py | 65 ++++ tests/test_http_transformer.py | 12 +- 4 files changed, 319 insertions(+), 203 deletions(-) diff --git a/laygo/__init__.py b/laygo/__init__.py index e5b4829..4405f5f 100644 --- a/laygo/__init__.py +++ b/laygo/__init__.py @@ -8,7 +8,7 @@ from laygo.helpers import PipelineContext from laygo.pipeline import Pipeline from laygo.transformers.http import HTTPTransformer -from 
laygo.transformers.http import createHTTPTransformer +from laygo.transformers.http import create_http_transformer from laygo.transformers.transformer import Transformer from laygo.transformers.transformer import build_chunk_generator from laygo.transformers.transformer import create_process_transformer @@ -23,7 +23,7 @@ "create_threaded_transformer", "create_process_transformer", "HTTPTransformer", - "createHTTPTransformer", + "create_http_transformer", "PipelineContext", "ErrorHandler", "passthrough_chunks", diff --git a/laygo/transformers/http.py b/laygo/transformers/http.py index fb71389..70ce99e 100644 --- a/laygo/transformers/http.py +++ b/laygo/transformers/http.py @@ -1,244 +1,295 @@ -"""Distributed transformer implementation with HTTP-based worker coordination.""" +"""HTTP transformer implementation for distributed data processing. + +The HTTP transformer enables distributed processing by sending data chunks +to remote HTTP workers. It serves dual purposes: + +1. **Server-side**: Defines transformation logic that can be exposed as an HTTP endpoint +2. **Client-side**: Sends data chunks to remote workers via HTTP requests + +## Architecture + +The HTTPTransformer acts as both a definition of work to be done and a client +that can distribute that work: + +- When used as a **server**, it provides route configuration through `get_route()` + that can be registered with a web framework (Flask, FastAPI, etc.) +- When used as a **client**, it automatically sends data chunks to the configured + worker URL and collects results + +## Usage Example + +```python +# Create an HTTP transformer +http_transformer, get_route = create_http_transformer( + int, + endpoint="http://worker.example.com" +) + +# Define the transformation logic +http_transformer.map(lambda x: x * 2).filter(lambda x: x > 10) + +# Server-side: Get route configuration +endpoint_path, worker_func = get_route() +# Register with your web framework: app.route(endpoint_path)(worker_func) + +# Client-side: Use in a pipeline +pipeline = Pipeline(data).apply(http_transformer) +results, _ = pipeline.to_list() +``` + +## Endpoint Resolution + +The transformer handles different endpoint formats: + +- **Full URL**: `"http://worker.com/process"` - Used as-is for client requests +- **Domain only**: `"http://worker.com"` - Automatically appends `/process/data` +- **Path only**: `"/custom/path"` - Used for server-side route registration +- **Auto-generated**: If no endpoint provided, generates unique path from logic hash +""" from collections.abc import Callable -from collections.abc import Iterable -from collections.abc import Iterator -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import wait import hashlib -import itertools import pickle -from typing import Any -from typing import TypeVar -from typing import Union -from typing import overload - -import requests - -from laygo.context import IContextManager -from laygo.context import SimpleContextManager -from laygo.errors import ErrorHandler -from laygo.transformers.transformer import ChunkErrorHandler -from laygo.transformers.transformer import PipelineFunction -from laygo.transformers.transformer import Transformer -from laygo.transformers.types import BaseTransformer -In = TypeVar("In") -Out = TypeVar("Out") -T = TypeVar("T") -U = TypeVar("U") +from laygo.context.types import IContextManager +from laygo.transformers.strategies.http import HTTPStrategy +from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE 
+from laygo.transformers.transformer import Transformer -def createHTTPTransformer[T]( +def create_http_transformer[T]( _type_hint: type[T], - base_url: str, - chunk_size: int | None = None, + chunk_size: int = DEFAULT_CHUNK_SIZE, endpoint: str | None = None, - max_workers: int = 4, -) -> "HTTPTransformer[T, T]": - """Create a new identity HTTP transformer with an explicit type hint. +) -> tuple["HTTPTransformer[T, T]", Callable[[], tuple[str, Callable[[list[T], IContextManager], list[T]]]]]: + """Create a new HTTP transformer with type safety and route configuration. + + This factory function creates an HTTP transformer that can be used for + distributed processing. It returns both the transformer instance and + a function to get the server-side route configuration. Args: - _type_hint: Type hint for the data being processed. - base_url: The base URL for the HTTP worker service. - chunk_size: Size of chunks to process data in. - endpoint: Optional specific endpoint path. - max_workers: Maximum number of concurrent HTTP requests. + _type_hint: Type hint for the data being processed. Used for type safety + but not functionally required. + chunk_size: Number of items to process in each chunk. Defaults to 1000. + endpoint: The worker endpoint specification. Can be: + - Full URL: "http://worker.com/process" (used as-is) + - Domain: "http://worker.com" (auto-appends path) + - Path: "/custom/endpoint" (for server registration) + - None: Auto-generates unique path from logic hash Returns: - A new identity HTTP transformer. + A tuple containing: + - The HTTPTransformer instance for client-side use + - A function that returns (endpoint_path, worker_function) for server setup + + Example: + >>> transformer, get_route = create_http_transformer( + ... int, endpoint="http://worker.example.com" + ... ) + >>> transformer.map(lambda x: x * 2) + >>> endpoint, worker_func = get_route() """ - return HTTPTransformer[T, T]( - base_url=base_url, - endpoint=endpoint, - max_workers=max_workers, - chunk_size=chunk_size, - ) + transformer = HTTPTransformer[T, T](chunk_size=chunk_size, endpoint=endpoint) + return (transformer, transformer.get_route) + + +class HTTPTransformer[In, Out](Transformer[In, Out]): + """A transformer that enables distributed processing via HTTP workers. + The HTTPTransformer serves as both a client and server component for + distributed data processing: -class HTTPTransformer[In, Out](BaseTransformer[In, Out]): - """A self-sufficient, chainable transformer for distributed execution. + - **Client mode**: Automatically sends data chunks to remote HTTP workers + - **Server mode**: Provides route configuration for web framework integration - This transformer manages its own distributed execution by coordinating - with HTTP-based worker endpoints. It can automatically generate worker - endpoints based on the transformation logic or use predefined endpoints. + The transformer uses an HTTPStrategy to handle the actual HTTP communication, + including connection pooling, error handling, and concurrent requests. 
+ + Attributes: + _endpoint: The endpoint specification (URL, domain, or path) + _final_endpoint: Cached final endpoint path for server registration + + Example: + >>> # Create and configure transformer + >>> transformer = HTTPTransformer(endpoint="http://worker.com") + >>> transformer.map(lambda x: x * 2).filter(lambda x: x > 5) + >>> + >>> # Use as client + >>> results = list(transformer([1, 2, 3, 4, 5])) + >>> + >>> # Get server configuration + >>> path, worker_func = transformer.get_route() """ def __init__( self, - base_url: str, endpoint: str | None = None, - max_workers: int = 8, chunk_size: int | None = None, - ) -> None: - """Initialize the HTTP transformer. + ): + """Initialize an HTTP transformer. Args: - base_url: The base URL for the worker service. - endpoint: Optional specific endpoint path. If not provided, - one will be auto-generated. - max_workers: Maximum number of concurrent HTTP requests. - chunk_size: Size of data chunks to process. - """ - super().__init__(chunk_size=chunk_size) - self.base_url = base_url.rstrip("/") - self.endpoint = endpoint - self.max_workers = max_workers - self.session = requests.Session() - self._worker_url: str | None = None - # HTTP transformers always use a simple context manager to avoid serialization issues - self._default_context = SimpleContextManager() - - def _finalize_config(self) -> None: - """Determine the final worker URL, generating one if needed. - - If no explicit endpoint was provided, this method generates a unique - endpoint based on a hash of the transformation logic. + endpoint: The worker endpoint specification. Can be: + - Full URL: "http://worker.com/api/process" + - Domain only: "http://worker.com" (auto-appends path) + - Path only: "/api/process" (for server use) + - None: Auto-generates path from transformation hash + chunk_size: Number of items to process per chunk. If None, uses + the default chunk size from the base transformer. + + Note: + The HTTPStrategy is configured to use the worker URL determined + by the _get_worker_url method. """ - if hasattr(self, "_worker_url") and self._worker_url: - return + super().__init__(strategy=HTTPStrategy(self._get_worker_url), chunk_size=chunk_size) + self._endpoint = endpoint + self._final_endpoint: str | None = None - if self.endpoint: - path = self.endpoint - else: - if not self.transformer: - raise ValueError("Cannot determine endpoint for an empty transformer.") - serialized_logic = pickle.dumps(self.transformer) - hash_id = hashlib.sha1(serialized_logic).hexdigest()[:16] - path = f"/autogen/{hash_id}" + def _generate_endpoint_path(self) -> str: + """Generate a unique endpoint path from the transformation logic. - self.endpoint = path.lstrip("/") - self._worker_url = f"{self.base_url}/{self.endpoint}" + Creates a deterministic path based on the hash of the serialized + transformation logic. This ensures that identical transformations + will always generate the same endpoint path. - def __call__(self, data: Iterable[In], context: IContextManager | None = None) -> Iterator[Out]: - """Execute distributed processing on the data (CLIENT-SIDE). + Returns: + A unique endpoint path in the format "/autogen/{hash}" - This method is called by the Pipeline to start distributed processing. - It sends data chunks to worker endpoints via HTTP. + Note: + Uses SHA-1 hash of the pickled transformer function, truncated + to 16 characters for readability. 
+ """ + serialized_logic = pickle.dumps(self.transformer) + hash_id = hashlib.sha1(serialized_logic).hexdigest()[:16] + return f"/autogen/{hash_id}" - Args: - data: The input data to process. - context: Optional pipeline context. HTTP transformers always use their - internal SimpleContextManager regardless of the provided context. + def _get_worker_url(self) -> str: + """Determine the full worker URL for HTTP requests. + + Resolves the worker URL based on the endpoint configuration: + + - If no endpoint is configured, generates an auto-generated path + - If endpoint is just a path, returns it as-is (may need base URL) + - If endpoint is a domain-only URL, appends default "/process/data" path + - If endpoint is a full URL with path, uses it unchanged Returns: - An iterator over the processed data. + The complete worker URL for HTTP requests. + + Examples: + >>> transformer = HTTPTransformer(endpoint="http://worker.com") + >>> transformer._get_worker_url() + "http://worker.com/process/data" + + >>> transformer = HTTPTransformer(endpoint="http://worker.com/api/v1/process") + >>> transformer._get_worker_url() + "http://worker.com/api/v1/process" """ - run_context = self._default_context + if not self._endpoint: + # Auto-generate endpoint path + return self._generate_endpoint_path() + # If endpoint is a full URL, append default path if it's just a domain - self._finalize_config() + if not self._endpoint.startswith(("http://", "https://")): + # If it's just a path, return it (this case might need a base URL) + return self._endpoint - def process_chunk(chunk: list) -> list: - """Send one chunk to the worker and return the result. + # If it already has a path beyond just '/', use as-is + if "/" in self._endpoint.split("://", 1)[1]: + return self._endpoint - Args: - chunk: The data chunk to process. + # Otherwise append a default path + return f"{self._endpoint.rstrip('/')}/process/data" - Returns: - The processed chunk from the worker. - """ - try: - response = self.session.post( - self._worker_url, # type: ignore - json=chunk, - timeout=300, - ) - response.raise_for_status() - return response.json() - except requests.RequestException as e: - print(f"Error calling worker {self._worker_url}: {e}") - return [] - - try: - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - chunk_iterator = self._chunk_generator(data) - futures = { - executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers) - } - while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield from future.result() - try: - new_chunk = next(chunk_iterator) - futures.add(executor.submit(process_chunk, new_chunk)) - except StopIteration: - continue - finally: - # Always clean up our context since we always use the default one - run_context.shutdown() - - def get_route(self): - """Get the route configuration for registering this transformer as a worker. - - This method returns the necessary information to register the worker - in a Flask app or similar web framework. + def finalize_endpoint(self) -> str: + """Get the final endpoint path for server-side route registration. + + Extracts or generates the path component that should be used when + registering this transformer's logic with a web framework. This + method caches the result to ensure consistency. + + Returns: + The endpoint path without leading slash, suitable for route registration. 
+ + Examples: + >>> transformer = HTTPTransformer(endpoint="http://worker.com/api/process") + >>> transformer.finalize_endpoint() + "api/process" + + >>> transformer = HTTPTransformer(endpoint="/custom/endpoint") + >>> transformer.finalize_endpoint() + "custom/endpoint" + + Note: + The result is cached in _final_endpoint to ensure that multiple + calls return the same value even if the transformation logic changes. + """ + if self._final_endpoint: + return self._final_endpoint + + if self._endpoint: + # Extract path from full URL or use path directly + if self._endpoint.startswith(("http://", "https://")): + from urllib.parse import urlparse + + parsed = urlparse(self._get_worker_url()) + path = parsed.path or "/process/data" + else: + path = self._endpoint + else: + path = self._generate_endpoint_path() + + self._final_endpoint = path.lstrip("/") + return self._final_endpoint + + def get_route(self) -> tuple[str, Callable[[list, IContextManager], list]]: + """Get the route configuration for web framework integration. + + Provides the endpoint path and worker function needed to register + this transformer's logic with a web framework like Flask or FastAPI. + The worker function executes the complete transformation pipeline + that has been defined through chaining operations. Returns: - A tuple containing the endpoint path and the worker view function. + A tuple containing: + - Endpoint path (with leading slash) for route registration + - Worker function that processes a chunk and returns results + + Example: + >>> transformer = HTTPTransformer(endpoint="/api/process") + >>> transformer.map(lambda x: x * 2).filter(lambda x: x > 5) + >>> path, worker_func = transformer.get_route() + >>> print(path) # "/api/process" + >>> + >>> # Register with Flask + >>> app.route(path, methods=['POST'])(worker_func) + >>> + >>> # Or with FastAPI + >>> app.post(path)(worker_func) + + Note: + The worker function signature matches the expected format for + HTTP endpoints: it takes a list (the JSON payload) and a context + manager, returning a list of processed results. """ - self._finalize_config() + endpoint = self.finalize_endpoint() - def worker_view_func(chunk: list, context: IContextManager): - """The actual worker logic for this transformer. + def worker_view_func(chunk: list, context: IContextManager) -> list: + """Execute the transformation logic on a data chunk. + + This function represents the actual worker logic that processes + incoming data chunks. It applies all the transformations that + have been chained onto this HTTPTransformer instance. Args: - chunk: The data chunk to process. - context: The pipeline context. + chunk: List of data items to process (from HTTP request JSON). + context: Context manager for sharing state during processing. Returns: - The processed chunk. + List of processed results (will be returned as HTTP response JSON). 
""" + # The `self.transformer` holds the composed function (e.g., map -> filter) return self.transformer(chunk, context) - return (f"/{self.endpoint}", worker_view_func) - - # --- Overridden Chaining Methods to Preserve Type --- - - def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "HTTPTransformer[In, Out]": - super().on_error(handler) - return self - - def map[U](self, function: PipelineFunction[Out, U]) -> "HTTPTransformer[In, U]": - super().map(function) - return self # type: ignore - - def filter(self, predicate: PipelineFunction[Out, bool]) -> "HTTPTransformer[In, Out]": - super().filter(predicate) - return self - - @overload - def flatten[T](self: "HTTPTransformer[In, list[T]]") -> "HTTPTransformer[In, T]": ... - @overload - def flatten[T](self: "HTTPTransformer[In, tuple[T, ...]]") -> "HTTPTransformer[In, T]": ... - @overload - def flatten[T](self: "HTTPTransformer[In, set[T]]") -> "HTTPTransformer[In, T]": ... - # Forgive me for I have sinned, but this is necessary to avoid type errors - # Sinec I'm setting self type in the parent class, overriding it isn't allowed - def flatten[T]( # type: ignore - self: Union["HTTPTransformer[In, list[T]]", "HTTPTransformer[In, tuple[T, ...]]", "HTTPTransformer[In, set[T]]"], - ) -> "HTTPTransformer[In, T]": - super().flatten() # type: ignore - return self # type: ignore - - def tap(self, arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]]) -> "HTTPTransformer[In, Out]": - super().tap(arg) - return self - - def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T]"]) -> "HTTPTransformer[In, T]": - # Note: The type hint for `t` is slightly adjusted to reflect it receives an HTTPTransformer - super().apply(t) # type: ignore - return self # type: ignore - - def catch[U]( - self, - sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]], - on_error: ChunkErrorHandler[Out, None] | None = None, - ) -> "HTTPTransformer[In, U]": - super().catch(sub_pipeline_builder, on_error) - return self # type: ignore - - def short_circuit(self, function: Callable[[IContextManager], bool | None]) -> "HTTPTransformer[In, Out]": - super().short_circuit(function) - return self + return (f"/{endpoint}", worker_view_func) diff --git a/laygo/transformers/strategies/http.py b/laygo/transformers/strategies/http.py index e69de29..5b8a9b8 100644 --- a/laygo/transformers/strategies/http.py +++ b/laygo/transformers/strategies/http.py @@ -0,0 +1,65 @@ +from collections.abc import Callable +from collections.abc import Iterable +from collections.abc import Iterator +from concurrent.futures import FIRST_COMPLETED +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait +import itertools + +import requests + +from laygo.context.types import IContextManager +from laygo.transformers.strategies.types import ExecutionStrategy +from laygo.transformers.types import InternalTransformer + + +class HTTPStrategy[In, Out](ExecutionStrategy[In, Out]): + """ + An execution strategy that sends data chunks to a remote HTTP worker. + This is the CLIENT-SIDE implementation. 
+ """ + + def __init__(self, worker_url: Callable[[], str], max_workers: int = 8, timeout: int = 300): + self.worker_url = worker_url + self.max_workers = max_workers + self.timeout = timeout + self.session = requests.Session() + + def execute( + self, + transformer_logic: InternalTransformer[In, Out], # Note: This is ignored + chunk_generator: Callable[[Iterable[In]], Iterator[list[In]]], + data: Iterable[In], + context: IContextManager, # Note: This is also ignored + ) -> Iterator[Out]: + """Sends data to the remote worker and yields results.""" + + def process_chunk(chunk: list[In]) -> list[Out]: + """Sends one chunk to the worker and returns the result.""" + try: + response = self.session.post( + self.worker_url(), + json=chunk, + timeout=self.timeout, + ) + response.raise_for_status() + return response.json() + except requests.RequestException as e: + print(f"Error calling worker {self.worker_url}: {e}") + # Depending on desired behavior, you might raise an error + # or return an empty list to skip the failed chunk. + return [] + + # Use a ThreadPoolExecutor to make concurrent HTTP requests + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + chunk_iterator = chunk_generator(data) + futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers)} + while futures: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield from future.result() + try: + new_chunk = next(chunk_iterator) + futures.add(executor.submit(process_chunk, new_chunk)) + except StopIteration: + continue diff --git a/tests/test_http_transformer.py b/tests/test_http_transformer.py index 7fa01df..09585d7 100644 --- a/tests/test_http_transformer.py +++ b/tests/test_http_transformer.py @@ -2,8 +2,8 @@ # This includes Pipeline, Transformer, and your HTTPTransformer. import requests_mock -from laygo import HTTPTransformer from laygo import Pipeline +from laygo import create_http_transformer from laygo.context.simple import SimpleContextManager @@ -25,16 +25,16 @@ def test_distributed_transformer_with_mock(self): # 2. Define the transformer and its logic using the chainable API. # This single instance holds both the client and server logic. - http_transformer = ( - HTTPTransformer(base_url=base_url, endpoint=endpoint).map(lambda x: x * 2).filter(lambda x: x > 10) - ) + http_transformer, get_route = create_http_transformer(int, endpoint=base_url) + + http_transformer.map(lambda x: x * 2).filter(lambda x: x > 10) # Set a small chunk_size to ensure the client makes multiple requests http_transformer.chunk_size = 4 # 3. Get the worker's logic from the transformer itself # The `get_route` method provides the exact function the worker would run. - _, worker_view_func = http_transformer.get_route() + _, worker_view_func = get_route() # 4. Configure the mock endpoint to use the real worker logic def mock_response(request, context): @@ -42,7 +42,7 @@ def mock_response(request, context): input_chunk = request.json() # Call the actual view function logic obtained from get_route() # We pass None for the context as it's not used in this simple case. - output_chunk = worker_view_func(chunk=input_chunk, context=SimpleContextManager()) + output_chunk = worker_view_func(input_chunk, SimpleContextManager()) return output_chunk # Use requests_mock context manager