laygo/__init__.py (20 changes: 8 additions & 12 deletions)
@@ -8,26 +8,22 @@
 from laygo.helpers import PipelineContext
 from laygo.pipeline import Pipeline
 from laygo.transformers.http import HTTPTransformer
-from laygo.transformers.http import createHTTPTransformer
-from laygo.transformers.parallel import ParallelTransformer
-from laygo.transformers.parallel import createParallelTransformer
-from laygo.transformers.threaded import ThreadedTransformer
-from laygo.transformers.threaded import createThreadedTransformer
+from laygo.transformers.http import create_http_transformer
 from laygo.transformers.transformer import Transformer
 from laygo.transformers.transformer import build_chunk_generator
-from laygo.transformers.transformer import createTransformer
+from laygo.transformers.transformer import create_process_transformer
+from laygo.transformers.transformer import create_threaded_transformer
+from laygo.transformers.transformer import create_transformer
 from laygo.transformers.transformer import passthrough_chunks

 __all__ = [
   "Pipeline",
   "Transformer",
-  "createTransformer",
-  "ThreadedTransformer",
-  "createThreadedTransformer",
-  "ParallelTransformer",
-  "createParallelTransformer",
+  "create_transformer",
+  "create_threaded_transformer",
+  "create_process_transformer",
   "HTTPTransformer",
-  "createHTTPTransformer",
+  "create_http_transformer",
   "PipelineContext",
   "ErrorHandler",
   "passthrough_chunks",
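
For callers of the package, the net effect of this hunk is that the camelCase factory helpers are no longer exported and the snake_case ones replace them. A minimal caller-side migration sketch (only the import names come from the diff above; the old-to-new pairing is the assumption that each camelCase factory maps to its snake_case counterpart):

    # Before this change (no longer exported from the package root):
    # from laygo import createTransformer, createThreadedTransformer, createHTTPTransformer

    # After this change:
    from laygo import create_transformer, create_threaded_transformer
    from laygo import create_process_transformer, create_http_transformer

ThreadedTransformer and ParallelTransformer also drop out of __all__, so code that imported those classes from the package root would need to switch to the factory functions or import them from their defining modules (assuming those modules still provide them).
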
laygo/pipeline.py (19 changes: 11 additions & 8 deletions)
@@ -20,14 +20,17 @@
 from laygo.context.types import IContextHandle
 from laygo.helpers import is_context_aware
 from laygo.transformers.transformer import Transformer
+from laygo.transformers.types import BaseTransformer

 T = TypeVar("T")
 U = TypeVar("U")
 PipelineFunction = Callable[[T], Any]


 # This function must be defined at the top level of the module (e.g., after imports)
-def _branch_consumer_process[T](transformer: Transformer, queue: "Queue", context_handle: IContextHandle) -> list[Any]:
+def _branch_consumer_process[T](
+  transformer: BaseTransformer, queue: "Queue", context_handle: IContextHandle
+) -> list[Any]:
   """Entry point for a consumer process in parallel branching.

   Reconstructs the necessary objects and runs a dedicated pipeline instance
@@ -457,7 +460,7 @@ def _producer_broadcast(
 @overload
 def branch(
   self,
-  branches: Mapping[str, Transformer[T, Any]],
+  branches: Mapping[str, BaseTransformer[T, Any]],
   *,
   executor_type: Literal["thread", "process"] = "thread",
   batch_size: int = 1000,
@@ -468,7 +471,7 @@ def branch(
 @overload
 def branch(
   self,
-  branches: Mapping[str, tuple[Transformer[T, Any], Callable[[T], bool]]],
+  branches: Mapping[str, tuple[BaseTransformer[T, Any], Callable[[T], bool]]],
   *,
   executor_type: Literal["thread", "process"] = "thread",
   first_match: bool = True,
@@ -478,7 +481,7 @@ def branch(

 def branch(
   self,
-  branches: Mapping[str, Transformer[T, Any]] | Mapping[str, tuple[Transformer[T, Any], Callable[[T], bool]]],
+  branches: Mapping[str, BaseTransformer[T, Any]] | Mapping[str, tuple[BaseTransformer[T, Any], Callable[[T], bool]]],
   *,
   executor_type: Literal["thread", "process"] = "thread",
   first_match: bool = True,
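
Taken together, these three signatures keep both calling conventions for branch() while widening the accepted transformer type: a plain mapping of named branches, or a mapping of (transformer, predicate) tuples with first_match controlling routing. A hedged usage sketch follows; the pipeline and transformer values are placeholders, and only the mapping shapes and keyword names come from the signatures above:

    # Plain branches: a mapping of name -> transformer.
    pipeline.branch({"clean": clean_transformer, "enrich": enrich_transformer})

    # Conditional branches: name -> (transformer, predicate), routed per element.
    pipeline.branch(
        {
            "large": (large_transformer, lambda item: item["size"] > 100),
            "small": (small_transformer, lambda item: item["size"] <= 100),
        },
        first_match=True,
    )
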
@@ -519,7 +522,7 @@ def branch(
 first_value = next(iter(branches.values()))
 is_conditional = isinstance(first_value, tuple)

-parsed_branches: list[tuple[str, Transformer[T, Any], Callable[[T], bool]]]
+parsed_branches: list[tuple[str, BaseTransformer[T, Any], Callable[[T], bool]]]
 if is_conditional:
   parsed_branches = [(name, trans, cond) for name, (trans, cond) in branches.items()]  # type: ignore
 else:
@@ -555,7 +558,7 @@ def _execute_branching_process(
   self,
   *,
   producer_fn: Callable,
-  parsed_branches: list[tuple[str, Transformer, Callable]],
+  parsed_branches: list[tuple[str, BaseTransformer, Callable]],
   batch_size: int,
   max_batch_buffer: int,
 ) -> tuple[dict[str, list[Any]], dict[str, Any]]:
@@ -629,7 +632,7 @@ def _execute_branching_thread(
   self,
   *,
   producer_fn: Callable,
-  parsed_branches: list[tuple[str, Transformer, Callable]],
+  parsed_branches: list[tuple[str, BaseTransformer, Callable]],
   batch_size: int,
   max_batch_buffer: int,
 ) -> tuple[dict[str, list[Any]], dict[str, Any]]:
@@ -654,7 +657,7 @@ def _execute_branching_thread(
 final_results: dict[str, list[Any]] = {name: [] for name, _, _ in parsed_branches}
 queues = {name: Queue(maxsize=max_batch_buffer) for name, _, _ in parsed_branches}

-def consumer(transformer: Transformer, queue: Queue, context_handle: IContextHandle) -> list[Any]:
+def consumer(transformer: BaseTransformer, queue: Queue, context_handle: IContextHandle) -> list[Any]:
   """Consume batches from a queue and process them with a transformer.

   Creates a mini-pipeline for the transformer and processes all
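
All of the pipeline.py edits are the same widening: places that were annotated with the concrete Transformer class now accept the BaseTransformer interface imported from laygo.transformers.types. A self-contained sketch of the effect on type checking is below; the class bodies are hypothetical stand-ins (only the names BaseTransformer, Transformer, and ThreadedTransformer appear in this PR), and it assumes the concrete transformers are siblings under BaseTransformer rather than subclasses of Transformer:

    from collections.abc import Mapping
    from typing import Any

    class BaseTransformer:  # stand-in for laygo.transformers.types.BaseTransformer
        def transform(self, chunk: list[Any]) -> list[Any]:
            raise NotImplementedError

    class Transformer(BaseTransformer):  # stand-in for the concrete default transformer
        def transform(self, chunk: list[Any]) -> list[Any]:
            return chunk

    class ThreadedTransformer(BaseTransformer):  # stand-in for a sibling variant
        def transform(self, chunk: list[Any]) -> list[Any]:
            return chunk

    # Before: annotated against the concrete class, so a type checker rejects
    # a ThreadedTransformer branch even though it works at runtime.
    def branch_old(branches: Mapping[str, Transformer]) -> None:
        pass

    # After: annotated against the interface, so any transformer variant checks.
    def branch_new(branches: Mapping[str, BaseTransformer]) -> None:
        pass

    branch_new({"plain": Transformer(), "threaded": ThreadedTransformer()})
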