4 changes: 4 additions & 0 deletions laygo/__init__.py
@@ -14,7 +14,9 @@
from laygo.transformers.threaded import ThreadedTransformer
from laygo.transformers.threaded import createThreadedTransformer
from laygo.transformers.transformer import Transformer
from laygo.transformers.transformer import build_chunk_generator
from laygo.transformers.transformer import createTransformer
from laygo.transformers.transformer import passthrough_chunks

__all__ = [
"Pipeline",
@@ -28,4 +30,6 @@
"createHTTPTransformer",
"PipelineContext",
"ErrorHandler",
"passthrough_chunks",
"build_chunk_generator",
]
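
For reference, both new chunking helpers are now part of the package's public API, so downstream code can import them from the root (a minimal sketch):

from laygo import build_chunk_generator, passthrough_chunks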
108 changes: 102 additions & 6 deletions laygo/pipeline.py
@@ -1,20 +1,23 @@
# pipeline.py

from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import itertools
import multiprocessing as mp
from queue import Queue
from typing import Any
from typing import TypeVar
from typing import overload

from laygo.helpers import PipelineContext
from laygo.helpers import is_context_aware
from laygo.transformers.threaded import ThreadedTransformer
from laygo.transformers.transformer import Transformer
from laygo.transformers.transformer import passthrough_chunks

T = TypeVar("T")
U = TypeVar("U")
PipelineFunction = Callable[[T], Any]


@@ -147,16 +150,109 @@ def apply[U](

return self # type: ignore

def buffer(self, size: int) -> "Pipeline[T]":
"""Buffer the pipeline using threaded processing.
def branch(
self,
branches: dict[str, Transformer[T, Any]],
batch_size: int = 1000,
max_batch_buffer: int = 1,
use_queue_chunks: bool = True,
) -> dict[str, list[Any]]:
"""Forks the pipeline into multiple branches for concurrent, parallel processing."""
if not branches:
self.consume()
return {}

source_iterator = self.processed_data
branch_items = list(branches.items())
num_branches = len(branch_items)
final_results: dict[str, list[Any]] = {}

queues = [Queue(maxsize=max_batch_buffer) for _ in range(num_branches)]

def producer() -> None:
    """Reads from the source and distributes batches to ALL branch queues."""
    try:
        # Use itertools.batched for clean and efficient batch creation.
        for batch_tuple in itertools.batched(source_iterator, batch_size):
            # The batch is a tuple; convert to a list for consumers.
            batch_list = list(batch_tuple)
            for q in queues:
                q.put(batch_list)
    finally:
        # Always signal end-of-stream, even if the source raises, so
        # consumers are never left blocking on an empty queue.
        for q in queues:
            q.put(None)

def consumer(transformer: Transformer, queue: Queue) -> list[Any]:
    """Consumes batches from a queue and runs them through a transformer."""

    def stream_from_queue() -> Iterator[list[T]]:
        # Yield whole batches until the None sentinel arrives.
        while (batch := queue.get()) is not None:
            yield batch

    if use_queue_chunks:
        # The stream already consists of batches, so bypass the
        # transformer's own chunking and treat each batch as a chunk.
        transformer = transformer.set_chunker(passthrough_chunks)

    result_iterator = transformer(stream_from_queue(), self.ctx)  # type: ignore
    return list(result_iterator)

with ThreadPoolExecutor(max_workers=num_branches + 1) as executor:
executor.submit(producer)

future_to_name = {
executor.submit(consumer, transformer, queues[i]): name for i, (name, transformer) in enumerate(branch_items)
}

for future in as_completed(future_to_name):
name = future_to_name[future]
try:
final_results[name] = future.result()
except Exception as e:
print(f"Branch '{name}' raised an exception: {e}")
final_results[name] = []

return final_results
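
For readers without the full codebase at hand, here is a minimal, self-contained sketch of the fan-out pattern branch() implements: one producer broadcasting batches to per-branch queues, each stream terminated by a None sentinel. The fan_out helper and its handler functions are illustrative names, not laygo API:

import itertools
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue

def fan_out(source, handlers, batch_size=2):
    """Broadcast every batch of `source` to each handler concurrently."""
    queues = {name: Queue(maxsize=1) for name in handlers}

    def produce():
        try:
            for batch in itertools.batched(source, batch_size):
                for q in queues.values():
                    q.put(list(batch))
        finally:
            for q in queues.values():
                q.put(None)  # end-of-stream sentinel

    def consume(name):
        results = []
        while (batch := queues[name].get()) is not None:
            results.extend(handlers[name](item) for item in batch)
        return name, results

    with ThreadPoolExecutor(max_workers=len(handlers) + 1) as pool:
        pool.submit(produce)
        futures = [pool.submit(consume, name) for name in handlers]
        return dict(f.result() for f in as_completed(futures))

# fan_out(range(6), {"double": lambda x: 2 * x, "negate": lambda x: -x})
# -> {"double": [0, 2, 4, 6, 8, 10], "negate": [0, -1, -2, -3, -4, -5]}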

def buffer(self, size: int, batch_size: int = 1000) -> "Pipeline[T]":
"""Inserts a buffer in the pipeline to allow downstream processing to read ahead.

This creates a background thread that reads from the upstream data source
and fills a queue, decoupling the upstream and downstream stages.

Args:
size: The number of worker threads to use for buffering.
size: The number of **batches** to hold in the buffer.
batch_size: The number of items to accumulate per batch.

Returns:
The pipeline instance for method chaining.
"""
self.apply(ThreadedTransformer(max_workers=size))
source_iterator = self.processed_data

def _buffered_stream() -> Iterator[T]:
queue = Queue(maxsize=size)
# We only need one background thread for the producer.
executor = ThreadPoolExecutor(max_workers=1)

def _producer() -> None:
"""The producer reads from the source and fills the queue."""
try:
for batch_tuple in itertools.batched(source_iterator, batch_size):
queue.put(list(batch_tuple))
finally:
# Always put the sentinel value to signal the end of the stream.
queue.put(None)

# Start the producer in the background thread.
executor.submit(_producer)

try:
# The main thread becomes the consumer.
while (batch := queue.get()) is not None:
yield from batch
finally:
# Ensure the background thread is cleaned up.
executor.shutdown(wait=False, cancel_futures=True)

self.processed_data = _buffered_stream()
return self
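
As a standalone illustration of the same read-ahead pattern (the read_ahead name and its arguments are illustrative, not laygo API):

import itertools
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

def read_ahead(source, size=1, batch_size=100):
    """Yield items from `source` while a background thread pre-fetches batches."""
    queue = Queue(maxsize=size)
    pool = ThreadPoolExecutor(max_workers=1)

    def produce():
        try:
            for batch in itertools.batched(source, batch_size):
                queue.put(list(batch))
        finally:
            queue.put(None)  # sentinel: the stream is exhausted

    pool.submit(produce)
    try:
        while (batch := queue.get()) is not None:
            yield from batch
    finally:
        # Don't wait for a producer that may still be blocked on a full queue.
        pool.shutdown(wait=False, cancel_futures=True)

# for item in read_ahead(iter(range(10)), size=2, batch_size=3):
#     print(item)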

def __iter__(self) -> Iterator[T]:
14 changes: 14 additions & 0 deletions laygo/transformers/transformer.py
@@ -60,6 +60,20 @@ def chunk_generator(data: Iterable[T]) -> Iterator[list[T]]:
return chunk_generator


def passthrough_chunks[T](data: Iterable[list[T]]) -> Iterator[list[T]]:
    """A chunk generator that passes already-chunked data through unchanged.

    This is useful for transformers whose input is already an iterable of
    chunks and should not be re-chunked.

    Args:
        data: An iterable of chunks (lists) to forward as-is.

    Returns:
        An iterator yielding each input chunk unchanged.
    """
    yield from data
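
A quick sketch of the behavioral contrast between the two chunkers; build_chunk_generator(chunk_size) returning a size-based chunker is an assumption based on the surrounding code, not confirmed by this hunk:

chunker = build_chunk_generator(2)           # assumed signature
list(chunker([1, 2, 3, 4, 5]))               # -> [[1, 2], [3, 4], [5]]
list(passthrough_chunks([[1, 2], [3, 4]]))   # -> [[1, 2], [3, 4]] (unchanged)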


class Transformer[In, Out]:
"""Define and compose data transformations by passing context explicitly.
