Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions laygo/transformers/strategies/process.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from collections import deque
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import wait
import itertools

from loky import as_completed
from loky import get_reusable_executor

from laygo.context.types import IContextHandle
Expand Down Expand Up @@ -55,21 +55,40 @@ def _ordered_generator(
executor,
context_handle: IContextHandle,
) -> Iterator[list[Out]]:
"""Generate results in their original order."""
"""Generate results in their original order, with robust error handling."""
futures = deque()
chunks_iter = iter(chunks_iter)

# Submit the initial batch of tasks
for _ in range(self.max_workers + 1):
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
break
while futures:
yield futures.popleft().result()
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
continue

try:
while futures:
# Get the result of the oldest task. If it failed or the pool
# is broken, .result() will raise an exception.
result = futures.popleft().result()

# If successful, submit a new task.
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
# No more chunks to process.
pass

yield result
finally:
# This cleanup runs if the loop finishes or if an exception occurs.
# It prevents orphaned processes by cancelling pending tasks.
for future in futures:
future.cancel()
if futures:
wait(list(futures))

def _unordered_generator(
self,
Expand All @@ -78,17 +97,35 @@ def _unordered_generator(
executor,
context_handle: IContextHandle,
) -> Iterator[list[Out]]:
"""Generate results as they complete."""
"""Generate results as they complete, with robust error handling."""
futures = {
executor.submit(_worker_process_chunk, transformer, context_handle, chunk)
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
}
while futures:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield future.result()

try:
# as_completed is ideal for this "process as they finish" pattern
for future in as_completed(futures):
# Get the result. This raises an exception if the task failed,
# which immediately stops the loop and proceeds to finally.
result = future.result()

# Remove the completed future from our tracking set
futures.remove(future)

# Try to submit a new task to replace the one that just finished
try:
chunk = next(chunks_iter)
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
continue
# No more chunks left to submit.
pass

yield result
finally:
# Clean up any futures that were still running or pending when
# an exception occurred or the input was exhausted.
for future in futures:
future.cancel()
if futures:
wait(futures)
103 changes: 74 additions & 29 deletions laygo/transformers/strategies/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,52 +74,104 @@ def process_chunk(chunk: list[In]) -> list[Out]:

Args:
chunk: The data chunk to process.
shared_context: The shared context for processing.

Returns:
The processed chunk.
"""
return transformer(chunk, shared_context) # type: ignore
return transformer(chunk, shared_context)

def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
"""Generate results in their original order."""
futures: deque[Future[list[Out]]] = deque()
executor_shutdown = False

# Pre-submit initial batch of futures
for _ in range(min(self.max_workers, 10)): # Limit initial submissions
for _ in range(min(self.max_workers, 10)):
if executor_shutdown:
break
try:
chunk = next(chunks_iter)
futures.append(executor.submit(process_chunk, chunk))
except StopIteration:
break
except RuntimeError as e:
if "cannot schedule new futures after shutdown" in str(e):
executor_shutdown = True
break
raise

while futures:
# Get the next result and submit the next chunk
result = futures.popleft().result()
yield result

try:
chunk = next(chunks_iter)
futures.append(executor.submit(process_chunk, chunk))
except StopIteration:
continue
# Get the next result
result = futures.popleft().result()
yield result

# Try to submit the next chunk only if executor is not shutdown
if not executor_shutdown:
try:
chunk = next(chunks_iter)
futures.append(executor.submit(process_chunk, chunk))
except StopIteration:
continue
except RuntimeError as e:
if "cannot schedule new futures after shutdown" in str(e):
executor_shutdown = True
continue
raise
except Exception:
# Cancel remaining futures and re-raise
for future in futures:
try:
future.cancel()
except Exception:
pass # Ignore cancellation errors
futures.clear()
raise

def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
"""Generate results as they complete."""
futures = set()
executor_shutdown = False

# Pre-submit initial batch
futures = {
executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10))
}
for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10)):
if executor_shutdown:
break
try:
futures.add(executor.submit(process_chunk, chunk))
except RuntimeError as e:
if "cannot schedule new futures after shutdown" in str(e):
executor_shutdown = True
break
raise

while futures:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield future.result()
try:
chunk = next(chunks_iter)
futures.add(executor.submit(process_chunk, chunk))
except StopIteration:
continue
try:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield future.result()

# Try to submit next chunk only if executor is not shutdown
if not executor_shutdown:
try:
chunk = next(chunks_iter)
futures.add(executor.submit(process_chunk, chunk))
except StopIteration:
continue
except RuntimeError as e:
if "cannot schedule new futures after shutdown" in str(e):
executor_shutdown = True
continue
raise
except Exception:
# Cancel remaining futures and re-raise
for future in futures:
try:
future.cancel()
except Exception:
pass # Ignore cancellation errors
futures.clear()
raise

# Use the reusable thread pool instead of creating a new one
executor = self._get_thread_pool(self.max_workers)
Expand All @@ -129,10 +181,3 @@ def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolEx
# Process chunks using the reusable executor
for result_chunk in gen_func(chunks_to_process, executor):
yield from result_chunk

def __del__(self) -> None:
"""Shutdown all cached thread pools. Call this during application cleanup."""
with self._pool_lock:
for pool in self._thread_pools.values():
pool.shutdown(wait=True)
self._thread_pools.clear()
3 changes: 2 additions & 1 deletion laygo/transformers/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from laygo.errors import ErrorHandler
from laygo.helpers import is_context_aware
from laygo.helpers import is_context_aware_reduce
from laygo.transformers.strategies.process import ProcessStrategy
from laygo.transformers.strategies.sequential import SequentialStrategy
from laygo.transformers.strategies.threaded import ThreadedStrategy
from laygo.transformers.strategies.types import ExecutionStrategy
Expand Down Expand Up @@ -89,7 +90,7 @@ def create_process_transformer[T](
"""
return Transformer[T, T](
chunk_size=chunk_size,
strategy=ThreadedStrategy(
strategy=ProcessStrategy(
max_workers=max_workers,
ordered=ordered,
),
Expand Down
Loading