Skip to content

Commit b6bab95

Browse files
ctruedenclaude
andcommitted
Add Service.init() for early worker initialization
Enables execution of scripts before the worker's I/O loop starts, providing a general solution for imports that interfere with I/O operations (e.g., numpy hang on Windows - numpy/numpy#24290). The init script is passed to workers via a temporary file referenced by the APPOSE_INIT_SCRIPT environment variable. GroovyWorker and python_worker both support this mechanism. As per: * apposed/appose-java@fb6b861 * apposed/appose-java@5e3eb7f 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 1619cb6 commit b6bab95

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

src/appose/service.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from __future__ import annotations
1010

1111
import subprocess
12+
import tempfile
1213
import threading
1314
from enum import Enum
1415
from pathlib import Path
@@ -58,6 +59,7 @@ def __init__(
5859
self._stderr_thread: threading.Thread | None = None
5960
self._monitor_thread: threading.Thread | None = None
6061
self._debug_callback: Callable[[Any], Any] | None = None
62+
self._init_script: str | None = None
6163
self._syntax: ScriptSyntax | None = None
6264

6365
def debug(self, debug_callback: Callable[[Any], Any]) -> None:
@@ -70,6 +72,32 @@ def debug(self, debug_callback: Callable[[Any], Any]) -> None:
7072
"""
7173
self._debug_callback = debug_callback
7274

75+
def init(self, script: str) -> "Service":
76+
"""
77+
Register a script to be executed when the worker process first starts up,
78+
before any tasks are processed. This is useful for early initialization that
79+
must happen before the worker's main loop begins, such as importing libraries
80+
that may interfere with I/O operations.
81+
82+
Example: On Windows, importing numpy can hang when stdin is open for reading
83+
(described at https://github.com/numpy/numpy/issues/24290).
84+
Using service.init("import numpy") works around this by importing
85+
numpy before the worker's I/O loop starts.
86+
87+
Args:
88+
script: The script code to execute during worker initialization.
89+
90+
Returns:
91+
This service object, for chaining method calls.
92+
93+
Raises:
94+
RuntimeError: If the service has already started.
95+
"""
96+
if self._process is not None:
97+
raise RuntimeError("Service already started")
98+
self._init_script = script
99+
return self
100+
73101
def start(self) -> None:
74102
"""
75103
Explicitly launch the worker process associated with this service.
@@ -86,6 +114,19 @@ def start(self) -> None:
86114

87115
prefix = f"Appose-Service-{self._service_id}"
88116

117+
# If an init script is provided, write it to a temporary file
118+
# and pass its path via environment variable.
119+
if self._init_script:
120+
with tempfile.NamedTemporaryFile(
121+
mode="w",
122+
encoding="utf-8",
123+
prefix="appose-init-",
124+
suffix=".txt",
125+
delete=False,
126+
) as init_file:
127+
init_file.write(self._init_script)
128+
self._env_vars["APPOSE_INIT_SCRIPT"] = init_file.name
129+
89130
self._process = process.builder(self._cwd, self._env_vars, *self._args)
90131
self._stdout_thread = threading.Thread(
91132
target=self._stdout_loop, name=f"{prefix}-Stdout"

tests/test_service.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,62 @@ def test_main_thread_queue_python():
131131
assert TaskStatus.COMPLETE == task.status
132132
thread = task.outputs.get("thread")
133133
assert thread != "MainThread"
134+
135+
136+
def test_init():
137+
"""Tests that init script is executed before tasks run."""
138+
env = appose.system()
139+
with env.python().init("init_value = 'initialized'") as service:
140+
maybe_debug(service)
141+
142+
# Verify that the init script was executed and the variable is accessible.
143+
task = service.task("init_value").wait_for()
144+
assert TaskStatus.COMPLETE == task.status
145+
146+
result = task.result()
147+
assert result == "initialized", "Init script should set init_value variable"
148+
149+
150+
def test_init_numpy():
151+
"""Tests that NumPy works on every platform, even Windows."""
152+
env = (
153+
appose.pixi()
154+
.base("target/envs/test-init-numpy")
155+
.conda("numpy=2.3.4")
156+
.pypi("appose==0.7.2")
157+
.log_debug()
158+
.build()
159+
)
160+
with env.python().init("import numpy") as service:
161+
maybe_debug(service)
162+
163+
task = service.task(
164+
"import numpy\n"
165+
"narr = numpy.random.default_rng(seed=1337).random([3, 5])\n"
166+
"[float(v) for v in narr.flatten()]"
167+
).wait_for()
168+
assert TaskStatus.COMPLETE == task.status
169+
170+
result = task.outputs.get("result")
171+
assert isinstance(result, list)
172+
expected = [
173+
0.8781019003,
174+
0.1855279616,
175+
0.9209004548,
176+
0.9465658637,
177+
0.8745080903,
178+
0.1157427629,
179+
0.1937316623,
180+
0.3417371975,
181+
0.4957909002,
182+
0.8983712328,
183+
0.0064586191,
184+
0.2274114670,
185+
0.7936549524,
186+
0.4142867178,
187+
0.0838144031,
188+
]
189+
for i, expected_val in enumerate(expected):
190+
actual_val = result[i]
191+
assert isinstance(actual_val, (int, float))
192+
assert abs(actual_val - expected_val) < 1e-10

0 commit comments

Comments
 (0)