diff --git a/laygo/transformers/strategies/process.py b/laygo/transformers/strategies/process.py index 10c3e0d..ed50e5e 100644 --- a/laygo/transformers/strategies/process.py +++ b/laygo/transformers/strategies/process.py @@ -1,9 +1,9 @@ from collections import deque from collections.abc import Iterator -from concurrent.futures import FIRST_COMPLETED from concurrent.futures import wait import itertools +from loky import as_completed from loky import get_reusable_executor from laygo.context.types import IContextHandle @@ -55,21 +55,40 @@ def _ordered_generator( executor, context_handle: IContextHandle, ) -> Iterator[list[Out]]: - """Generate results in their original order.""" + """Generate results in their original order, with robust error handling.""" futures = deque() + chunks_iter = iter(chunks_iter) + + # Submit the initial batch of tasks for _ in range(self.max_workers + 1): try: chunk = next(chunks_iter) futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) except StopIteration: break - while futures: - yield futures.popleft().result() - try: - chunk = next(chunks_iter) - futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) - except StopIteration: - continue + + try: + while futures: + # Get the result of the oldest task. If it failed or the pool + # is broken, .result() will raise an exception. + result = futures.popleft().result() + + # If successful, submit a new task. + try: + chunk = next(chunks_iter) + futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) + except StopIteration: + # No more chunks to process. + pass + + yield result + finally: + # This cleanup runs if the loop finishes or if an exception occurs. + # It prevents orphaned processes by cancelling pending tasks. + for future in futures: + future.cancel() + if futures: + wait(list(futures)) def _unordered_generator( self, @@ -78,17 +97,35 @@ def _unordered_generator( executor, context_handle: IContextHandle, ) -> Iterator[list[Out]]: - """Generate results as they complete.""" + """Generate results as they complete, with robust error handling.""" futures = { executor.submit(_worker_process_chunk, transformer, context_handle, chunk) for chunk in itertools.islice(chunks_iter, self.max_workers + 1) } - while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() + + try: + # as_completed is ideal for this "process as they finish" pattern + for future in as_completed(futures): + # Get the result. This raises an exception if the task failed, + # which immediately stops the loop and proceeds to finally. + result = future.result() + + # Remove the completed future from our tracking set + futures.remove(future) + + # Try to submit a new task to replace the one that just finished try: chunk = next(chunks_iter) futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk)) except StopIteration: - continue + # No more chunks left to submit. + pass + + yield result + finally: + # Clean up any futures that were still running or pending when + # an exception occurred or the input was exhausted. + for future in futures: + future.cancel() + if futures: + wait(futures) diff --git a/laygo/transformers/strategies/threaded.py b/laygo/transformers/strategies/threaded.py index 2ed8c54..5e8444f 100644 --- a/laygo/transformers/strategies/threaded.py +++ b/laygo/transformers/strategies/threaded.py @@ -74,52 +74,104 @@ def process_chunk(chunk: list[In]) -> list[Out]: Args: chunk: The data chunk to process. - shared_context: The shared context for processing. Returns: The processed chunk. """ - return transformer(chunk, shared_context) # type: ignore + return transformer(chunk, shared_context) def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: """Generate results in their original order.""" futures: deque[Future[list[Out]]] = deque() + executor_shutdown = False # Pre-submit initial batch of futures - for _ in range(min(self.max_workers, 10)): # Limit initial submissions + for _ in range(min(self.max_workers, 10)): + if executor_shutdown: + break try: chunk = next(chunks_iter) futures.append(executor.submit(process_chunk, chunk)) except StopIteration: break + except RuntimeError as e: + if "cannot schedule new futures after shutdown" in str(e): + executor_shutdown = True + break + raise while futures: - # Get the next result and submit the next chunk - result = futures.popleft().result() - yield result - try: - chunk = next(chunks_iter) - futures.append(executor.submit(process_chunk, chunk)) - except StopIteration: - continue + # Get the next result + result = futures.popleft().result() + yield result + + # Try to submit the next chunk only if executor is not shutdown + if not executor_shutdown: + try: + chunk = next(chunks_iter) + futures.append(executor.submit(process_chunk, chunk)) + except StopIteration: + continue + except RuntimeError as e: + if "cannot schedule new futures after shutdown" in str(e): + executor_shutdown = True + continue + raise + except Exception: + # Cancel remaining futures and re-raise + for future in futures: + try: + future.cancel() + except Exception: + pass # Ignore cancellation errors + futures.clear() + raise def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]: """Generate results as they complete.""" + futures = set() + executor_shutdown = False + # Pre-submit initial batch - futures = { - executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10)) - } + for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10)): + if executor_shutdown: + break + try: + futures.add(executor.submit(process_chunk, chunk)) + except RuntimeError as e: + if "cannot schedule new futures after shutdown" in str(e): + executor_shutdown = True + break + raise while futures: - done, futures = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - yield future.result() - try: - chunk = next(chunks_iter) - futures.add(executor.submit(process_chunk, chunk)) - except StopIteration: - continue + try: + done, futures = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + yield future.result() + + # Try to submit next chunk only if executor is not shutdown + if not executor_shutdown: + try: + chunk = next(chunks_iter) + futures.add(executor.submit(process_chunk, chunk)) + except StopIteration: + continue + except RuntimeError as e: + if "cannot schedule new futures after shutdown" in str(e): + executor_shutdown = True + continue + raise + except Exception: + # Cancel remaining futures and re-raise + for future in futures: + try: + future.cancel() + except Exception: + pass # Ignore cancellation errors + futures.clear() + raise # Use the reusable thread pool instead of creating a new one executor = self._get_thread_pool(self.max_workers) @@ -129,10 +181,3 @@ def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolEx # Process chunks using the reusable executor for result_chunk in gen_func(chunks_to_process, executor): yield from result_chunk - - def __del__(self) -> None: - """Shutdown all cached thread pools. Call this during application cleanup.""" - with self._pool_lock: - for pool in self._thread_pools.values(): - pool.shutdown(wait=True) - self._thread_pools.clear() diff --git a/laygo/transformers/transformer.py b/laygo/transformers/transformer.py index 4a3084e..90cafbd 100644 --- a/laygo/transformers/transformer.py +++ b/laygo/transformers/transformer.py @@ -17,6 +17,7 @@ from laygo.errors import ErrorHandler from laygo.helpers import is_context_aware from laygo.helpers import is_context_aware_reduce +from laygo.transformers.strategies.process import ProcessStrategy from laygo.transformers.strategies.sequential import SequentialStrategy from laygo.transformers.strategies.threaded import ThreadedStrategy from laygo.transformers.strategies.types import ExecutionStrategy @@ -89,7 +90,7 @@ def create_process_transformer[T]( """ return Transformer[T, T]( chunk_size=chunk_size, - strategy=ThreadedStrategy( + strategy=ProcessStrategy( max_workers=max_workers, ordered=ordered, ),