Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions laygo/context/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""
Laygo Context Management Package.

This package provides different strategies for managing state (context)
within a data pipeline, from simple in-memory dictionaries to
process-safe managers for parallel execution.
"""

from .parallel import ParallelContextManager
from .simple import SimpleContextManager
from .types import IContextHandle
from .types import IContextManager

# Names re-exported as the public API of the laygo.context package;
# each entry corresponds to one of the imports above.
__all__ = [
"IContextManager",
"IContextHandle",
"SimpleContextManager",
"ParallelContextManager",
]
138 changes: 138 additions & 0 deletions laygo/context/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
A context manager for parallel and distributed processing using
multiprocessing.Manager to share state across processes.
"""

from collections.abc import Callable
from collections.abc import Iterator
import multiprocessing as mp
from multiprocessing.managers import DictProxy
import threading
from threading import Lock
from typing import Any
from typing import TypeVar

from laygo.context.types import IContextHandle
from laygo.context.types import IContextManager

R = TypeVar("R")


class ParallelContextHandle(IContextHandle):
    """
    Carrier object for the shared state of a ParallelContextManager.

    The handle bundles the shared DictProxy and Lock so both can be passed
    to a worker process, where create_proxy() wraps them in a new manager.
    """

    def __init__(self, shared_dict: DictProxy, lock: Lock):
        # Store both shared objects so they travel together as one unit.
        self._shared_dict, self._lock = shared_dict, lock

    def create_proxy(self) -> "IContextManager":
        """
        Build a ParallelContextManager around the objects held by this handle.

        Returns:
            A new ParallelContextManager constructed with ``handle=self``.
        """
        proxy_manager = ParallelContextManager(handle=self)
        return proxy_manager


class ParallelContextManager(IContextManager):
    """
    A context manager that enables state sharing across processes.

    It operates in two modes:
    1. Main Mode: When created normally, it starts a multiprocessing.Manager
       and creates a shared dictionary and lock.
    2. Proxy Mode: When created from a handle, it wraps a DictProxy and Lock
       that were received from another process. It does not own the manager.

    Locking is re-entrant per thread and per instance: a thread that already
    holds the lock (for example inside a ``with`` block) may nest further
    ``with`` blocks or call the locking methods, and the shared lock is only
    released when the outermost holder exits.
    """

    def __init__(self, initial_context: dict[str, Any] | None = None, handle: ParallelContextHandle | None = None):
        """
        Initializes the manager.

        Args:
            initial_context: Optional mapping used to seed the shared
                dictionary. Only used in main mode.
            handle: Optional handle carrying an existing shared dict and lock.
                When given, the instance is initialized in proxy mode.
        """
        if handle is not None:
            # --- PROXY MODE INITIALIZATION ---
            # This instance is a client wrapping objects from an existing server.
            self._manager = None  # Proxies do not own the manager process.
            self._shared_dict = handle._shared_dict
            self._lock = handle._lock
        else:
            # --- MAIN MODE INITIALIZATION ---
            # This instance owns the manager and its shared objects.
            self._manager = mp.Manager()
            self._shared_dict = self._manager.dict(initial_context or {})
            self._lock = self._manager.Lock()

        # Thread-local nesting depth of lock acquisitions made through this
        # instance. A counter (rather than a boolean) is required so that an
        # inner release does not drop the lock an outer scope still relies on.
        self._local = threading.local()

    def _lock_context(self) -> None:
        """Acquire the shared lock, or deepen the nesting if already held."""
        depth = getattr(self._local, "lock_depth", 0)
        if depth == 0:
            self._lock.acquire()
        self._local.lock_depth = depth + 1

    def _unlock_context(self) -> None:
        """Undo one _lock_context() call; release the lock at depth zero."""
        depth = getattr(self._local, "lock_depth", 0)
        if depth == 0:
            # Not held by this thread: nothing to release.
            return
        if depth == 1:
            self._lock.release()
        self._local.lock_depth = depth - 1

    def _execute_locked(self, operation: Callable[[], R]) -> R:
        """
        Run ``operation`` while this thread holds the shared lock.

        Args:
            operation: A zero-argument callable to execute.

        Returns:
            Whatever ``operation`` returns.
        """
        self._lock_context()
        try:
            return operation()
        finally:
            self._unlock_context()

    def get_handle(self) -> ParallelContextHandle:
        """
        Returns a handle containing the shared dict and lock.
        Only the main instance can generate handles.

        Raises:
            TypeError: If this instance was created in proxy mode.
        """
        if self._manager is None:
            raise TypeError("Cannot get a handle from a proxy context instance.")

        return ParallelContextHandle(self._shared_dict, self._lock)

    def shutdown(self) -> None:
        """
        Shuts down the background manager process.
        This is a no-op for proxy instances.
        """
        if self._manager is not None:
            self._manager.shutdown()

    def __enter__(self) -> "ParallelContextManager":
        """Acquires the lock for use in a 'with' statement."""
        self._lock_context()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Releases the lock taken by the matching __enter__."""
        self._unlock_context()

    def __getitem__(self, key: str) -> Any:
        # Reads go straight to the shared dict without taking the lock.
        return self._shared_dict[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._execute_locked(lambda: self._shared_dict.__setitem__(key, value))

    def __delitem__(self, key: str) -> None:
        self._execute_locked(lambda: self._shared_dict.__delitem__(key))

    def __iter__(self) -> Iterator[str]:
        # Iterate over a snapshot of the keys taken while the lock is held.
        return self._execute_locked(lambda: iter(list(self._shared_dict.keys())))

    def __len__(self) -> int:
        return self._execute_locked(lambda: len(self._shared_dict))

    def to_dict(self) -> dict[str, Any]:
        """Returns a plain dict copy of the shared dict, taken under the lock."""
        return self._execute_locked(lambda: dict(self._shared_dict))
97 changes: 97 additions & 0 deletions laygo/context/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
A simple, dictionary-based context manager for single-process pipelines.
"""

from collections.abc import Iterator
from typing import Any

from laygo.context.types import IContextHandle
from laygo.context.types import IContextManager


class SimpleContextHandle(IContextHandle):
    """
    Handle for the SimpleContextManager that keeps a reference to the
    manager it was created for.

    create_proxy() hands that same manager back, so every user of the handle
    works against one and the same context object.
    """

    def __init__(self, manager_instance: "IContextManager"):
        # Remember the manager; it is returned unchanged by create_proxy().
        self._manager_instance = manager_instance

    def create_proxy(self) -> "IContextManager":
        """
        Return the manager instance this handle was constructed with.

        No new object is created: the "proxy" is the original manager itself.
        """
        origin = self._manager_instance
        return origin


class SimpleContextManager(IContextManager):
    """
    A basic context manager that uses a standard Python dictionary for state.

    This manager is suitable for single-threaded, single-process pipelines where
    no state needs to be shared across process boundaries. It is the default
    context manager for a Laygo pipeline.
    """

    def __init__(self, initial_context: dict[str, Any] | None = None) -> None:
        """
        Initializes the context manager with an optional dictionary.

        Args:
            initial_context: An optional dictionary to populate the context with.
                Its items are copied, so later changes to the caller's dict
                do not affect this manager.
        """
        self._context = dict(initial_context or {})

    def get_handle(self) -> IContextHandle:
        """
        Returns a handle that holds a reference back to this same instance.
        """
        return SimpleContextHandle(self)

    def __enter__(self) -> "SimpleContextManager":
        """
        Provides 'with' statement compatibility. No lock is taken; the
        manager itself is returned.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Provides 'with' statement compatibility. Nothing is released or
        shut down on exit.
        """
        pass

    def __getitem__(self, key: str) -> Any:
        return self._context[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._context[key] = value

    def __delitem__(self, key: str) -> None:
        del self._context[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._context)

    def __len__(self) -> int:
        return len(self._context)

    def shutdown(self) -> None:
        """No-op for the simple context manager."""
        pass

    def to_dict(self) -> dict[str, Any]:
        """
        Returns a copy of the entire context as a standard Python dictionary.

        The copy is shallow: adding or removing keys on the returned dict does
        not affect this manager, but mutable values are shared with it.
        """
        # Copy so callers cannot mutate the manager's internal state through
        # the returned dict.
        return dict(self._context)
104 changes: 104 additions & 0 deletions laygo/context/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""
Defines the abstract base classes for context management in Laygo.

This module provides the core interfaces (IContextHandle and IContextManager)
that all context managers must implement, ensuring a consistent API for
state management across different execution environments (simple, threaded, parallel).
"""

from abc import ABC
from abc import abstractmethod
from collections.abc import MutableMapping
from typing import Any


class IContextHandle(ABC):
    """
    Abstract interface for a picklable handle to a context manager.

    A handle carries whatever a worker process needs in order to build its
    own proxy to the shared context.
    """

    @abstractmethod
    def create_proxy(self) -> "IContextManager":
        """
        Build the context proxy described by this handle.

        Intended to be called inside a worker process so that it can set up
        its own connection to the shared state.

        Returns:
            An IContextManager proxy instance.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()


class IContextManager(MutableMapping[str, Any], ABC):
    """
    Abstract interface for the shared state (context) of a pipeline.

    By inheriting from `collections.abc.MutableMapping`, every implementation
    exposes a dictionary-like API. On top of that the interface adds
    distribution (get_handle), resource cleanup (shutdown), 'with' statement
    support (__enter__, __exit__) and snapshotting (to_dict).
    """

    @abstractmethod
    def get_handle(self) -> IContextHandle:
        """
        Return a picklable handle for connecting from a worker process.

        The handle is meant to be serialized, sent to workers, and used there
        to create a proxy to the shared context.

        Returns:
            A picklable IContextHandle instance.
        """
        raise NotImplementedError()

    @abstractmethod
    def shutdown(self) -> None:
        """
        Perform final synchronization and release any held resources.

        Implementations use this to close connections, stop background
        processes, or do whatever other cleanup they require.
        """
        raise NotImplementedError()

    def __enter__(self) -> "IContextManager":
        """
        Enter the runtime context of this object.

        Returns:
            This context manager instance.
        """
        return self

    def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None:
        """
        Leave the runtime context; the default behavior is to call shutdown().

        Args:
            exc_type: The exception type, if an exception was raised.
            exc_val: The exception instance, if an exception was raised.
            exc_tb: The traceback object, if an exception was raised.
        """
        self.shutdown()

    def to_dict(self) -> dict[str, Any]:
        """
        Return a copy of the entire shared context as a standard dict.

        This base implementation provides no snapshot logic; concrete
        managers are expected to override it.

        Returns:
            A standard dict containing a copy of the shared context.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()
8 changes: 4 additions & 4 deletions laygo/errors.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from collections.abc import Callable

from laygo.helpers import PipelineContext
from laygo.context.types import IContextManager

ChunkErrorHandler = Callable[[list, Exception, PipelineContext], None]
ChunkErrorHandler = Callable[[list, Exception, IContextManager], None]


def raise_error(chunk: list, error: Exception, context: PipelineContext) -> None:
def raise_error(chunk: list, error: Exception, context: IContextManager) -> None:
"""Handler that always re-raises the error, stopping execution.

This is a default error handler that provides fail-fast behavior by
Expand Down Expand Up @@ -47,7 +47,7 @@ def on_error(self, handler: ChunkErrorHandler) -> "ErrorHandler":
self._handlers.insert(0, handler)
return self

def handle(self, chunk: list, error: Exception, context: PipelineContext) -> None:
def handle(self, chunk: list, error: Exception, context: IContextManager) -> None:
"""Execute all handlers in the chain.

Handlers are executed in reverse order of addition. Execution stops
Expand Down
Loading
Loading