diff --git a/.gitignore b/.gitignore index f9082380e..952e167ae 100644 --- a/.gitignore +++ b/.gitignore @@ -162,3 +162,9 @@ pufferlib/ocean/impulse_wars/*-release/ pufferlib/ocean/impulse_wars/debug-*/ pufferlib/ocean/impulse_wars/release-*/ pufferlib/ocean/impulse_wars/benchmark/ +*.bak +pufferlib/resources/drive/binaries/ +data/ + +pufferlib/resources/drive/binaries/ +data/ \ No newline at end of file diff --git a/pufferlib/config/default.ini b/pufferlib/config/default.ini index 6073c651e..886446590 100644 --- a/pufferlib/config/default.ini +++ b/pufferlib/config/default.ini @@ -6,7 +6,13 @@ rnn_name = None max_suggestion_cost = 3600 [vec] -backend = Multiprocessing +backend = Multithreading +# For the Multithreading backend, you can try limiting the number of threads if the env c_step code is too quick/short. +# (Say 2 or 4 or even 0 to force single-threaded serial execution) +max_num_threads = 1024 + +# Revert to multi-processing backend if needed. +# backend = Multiprocessing num_envs = 2 num_workers = auto batch_size = auto diff --git a/pufferlib/ocean/asteroids/asteroids.py b/pufferlib/ocean/asteroids/asteroids.py index cc3f6f9eb..50a259912 100644 --- a/pufferlib/ocean/asteroids/asteroids.py +++ b/pufferlib/ocean/asteroids/asteroids.py @@ -5,7 +5,8 @@ from pufferlib.ocean.asteroids import binding class Asteroids(pufferlib.PufferEnv): - def __init__(self, num_envs=1, render_mode=None, log_interval=128, buf=None, seed=0, size=500, frameskip=4): + def __init__(self, num_envs=1, render_mode=None, log_interval=128, buf=None, seed=0, size=500, frameskip=4, + max_num_threads=0): obs_shape = 4 + 5 * 20 # player pos, player vel, [asteroid pos, asteroid vel, asteroid size] x num asteroids self.single_observation_space = gymnasium.spaces.Box(low=-5, high=5, shape=(obs_shape,), dtype=np.float32) @@ -13,7 +14,7 @@ def __init__(self, num_envs=1, render_mode=None, log_interval=128, buf=None, see self.render_mode = render_mode self.num_agents = num_envs - 
super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed, size=size, frameskip=frameskip) diff --git a/pufferlib/ocean/battle/battle.py b/pufferlib/ocean/battle/battle.py index 2beb888e5..d9430bb5a 100644 --- a/pufferlib/ocean/battle/battle.py +++ b/pufferlib/ocean/battle/battle.py @@ -9,7 +9,8 @@ class Battle(pufferlib.PufferEnv): def __init__(self, num_envs=1, width=1920, height=1080, size_x=1.0, size_y=1.0, size_z=1.0, num_agents=1024, num_factories=32, - num_armies=4, render_mode=None, log_interval=128, buf=None, seed=0): + num_armies=4, render_mode=None, log_interval=128, buf=None, seed=0, + max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(num_armies*3 + 4*16 + 22 + 8,), dtype=np.float32) self.single_action_space = gymnasium.spaces.Box( @@ -23,7 +24,7 @@ def __init__(self, num_envs=1, width=1920, height=1080, size_x=1.0, if num_agents % num_armies != 0: raise pufferlib.APIUsageError('num_agents must be a multiple of num_armies') - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): c_env = binding.env_init( diff --git a/pufferlib/ocean/blastar/blastar.py b/pufferlib/ocean/blastar/blastar.py index e93b3067e..82774104c 100644 --- a/pufferlib/ocean/blastar/blastar.py +++ b/pufferlib/ocean/blastar/blastar.py @@ -4,7 +4,8 @@ from pufferlib.ocean.blastar import binding class Blastar(pufferlib.PufferEnv): - def __init__(self, num_envs=1, render_mode=None, buf=None, seed=0): + def __init__(self, num_envs=1, render_mode=None, buf=None, seed=0, + max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box( low=0, high=1, shape=(10,), dtype=np.float32 ) @@ -15,7 +16,7 @@ def __init__(self, num_envs=1, render_mode=None, buf=None, seed=0): self.tick = 0 self.log_interval = 1 - super().__init__(buf) + 
super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init( self.observations, self.actions, diff --git a/pufferlib/ocean/boids/boids.py b/pufferlib/ocean/boids/boids.py index 329b36dea..46804c647 100644 --- a/pufferlib/ocean/boids/boids.py +++ b/pufferlib/ocean/boids/boids.py @@ -21,7 +21,8 @@ def __init__( margin_turn_factor=1.0, centering_factor=0.0, avoid_factor=0.0, - matching_factor=0.0 + matching_factor=0.0, + max_num_threads=0 ): ACTION_SPACE_SIZE = 2 self.num_agents = num_envs * num_boids @@ -39,7 +40,7 @@ def __init__( self.render_mode = render_mode self.report_interval = report_interval - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.actions = self.actions.astype(np.float32) # Create C binding with flattened action buffer diff --git a/pufferlib/ocean/breakout/breakout.py b/pufferlib/ocean/breakout/breakout.py index b5a302145..89d667816 100644 --- a/pufferlib/ocean/breakout/breakout.py +++ b/pufferlib/ocean/breakout/breakout.py @@ -10,11 +10,8 @@ def __init__(self, num_envs=1, render_mode=None, paddle_width=62, paddle_height=8, ball_width=32, ball_height=32, brick_width=32, brick_height=12, - brick_rows=6, brick_cols=18, - initial_ball_speed=256, max_ball_speed=448, - paddle_speed=620, - continuous=False, log_interval=128, - buf=None, seed=0): + brick_rows=6, brick_cols=18, continuous=False, log_interval=128, + buf=None, seed=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(10 + brick_rows*brick_cols,), dtype=np.float32) self.render_mode = render_mode @@ -29,7 +26,7 @@ def __init__(self, num_envs=1, render_mode=None, else: self.single_action_space = gymnasium.spaces.Discrete(3) - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) if continuous: self.actions = self.actions.flatten() else: diff --git a/pufferlib/ocean/cartpole/cartpole.py b/pufferlib/ocean/cartpole/cartpole.py index 7f0c3caf2..7483aaec3 100644 --- 
a/pufferlib/ocean/cartpole/cartpole.py +++ b/pufferlib/ocean/cartpole/cartpole.py @@ -4,10 +4,7 @@ from pufferlib.ocean.cartpole import binding class Cartpole(pufferlib.PufferEnv): - def __init__(self, num_envs=1, cart_mass=1.0, pole_mass=0.1, - pole_length=0.5, gravity=9.8, force_mag=10.0, dt=0.02, - render_mode='human', report_interval=1, continuous=False, - buf=None, seed=0): + def __init__(self, num_envs=1, render_mode='human', report_interval=1, continuous=False, buf=None, seed=0, max_num_threads=0): self.render_mode = render_mode self.num_agents = num_envs self.report_interval = report_interval @@ -27,7 +24,7 @@ def __init__(self, num_envs=1, cart_mass=1.0, pole_mass=0.1, else: self.single_action_space = gymnasium.spaces.Discrete(2) - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.actions = np.zeros(num_envs, dtype=np.float32) self.c_envs = binding.vec_init( diff --git a/pufferlib/ocean/checkers/checkers.py b/pufferlib/ocean/checkers/checkers.py index 8c2195cfa..2fdfe2f20 100644 --- a/pufferlib/ocean/checkers/checkers.py +++ b/pufferlib/ocean/checkers/checkers.py @@ -5,7 +5,7 @@ from pufferlib.ocean.checkers import binding class Checkers(pufferlib.PufferEnv): - def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=8, buf=None, seed=0): + def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=8, buf=None, seed=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(size*size,), dtype=np.uint8) num_move_types = 8 # Move types are: NW, NE, SW, SE, 2*NW, 2*NE, 2*SW, 2*SE, @@ -15,7 +15,7 @@ def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=8, buf=N self.num_agents = num_envs self.log_interval = log_interval - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed, size=size) diff --git 
a/pufferlib/ocean/connect4/connect4.py b/pufferlib/ocean/connect4/connect4.py index 27cdffb25..c45855a29 100644 --- a/pufferlib/ocean/connect4/connect4.py +++ b/pufferlib/ocean/connect4/connect4.py @@ -14,7 +14,7 @@ class Connect4(pufferlib.PufferEnv): def __init__(self, num_envs=1, render_mode=None, report_interval=128, - buf=None, seed=0): + buf=None, seed=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(42,), dtype=np.float32) @@ -23,7 +23,7 @@ def __init__(self, num_envs=1, render_mode=None, report_interval=128, self.render_mode = render_mode self.num_agents = num_envs - super().__init__(buf=buf) + super().__init__(buf=buf, binding=binding, max_num_threads=max_num_threads) self.c_envs = binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed) diff --git a/pufferlib/ocean/convert/convert.py b/pufferlib/ocean/convert/convert.py index 1f57cead8..c3972548e 100644 --- a/pufferlib/ocean/convert/convert.py +++ b/pufferlib/ocean/convert/convert.py @@ -8,7 +8,7 @@ class Convert(pufferlib.PufferEnv): def __init__(self, num_envs=1, width=1920, height=1080, num_agents=1024, num_factories=32, - num_resources=8, render_mode=None, log_interval=128, buf=None, seed=0): + num_resources=8, render_mode=None, log_interval=128, buf=None, seed=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(2*num_resources + 4 + num_resources,), dtype=np.float32) self.single_action_space = gymnasium.spaces.MultiDiscrete([9, 5]) @@ -20,7 +20,7 @@ def __init__(self, num_envs=1, width=1920, height=1080, num_agents=1024, num_fac if num_resources < 1 or num_resources > 8: raise pufferlib.APIUsageError('num_resources must be in [1, 8]') - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): c_env = binding.env_init( diff --git a/pufferlib/ocean/convert_circle/convert_circle.py 
b/pufferlib/ocean/convert_circle/convert_circle.py index f8682535b..571822ee8 100644 --- a/pufferlib/ocean/convert_circle/convert_circle.py +++ b/pufferlib/ocean/convert_circle/convert_circle.py @@ -8,7 +8,8 @@ class ConvertCircle(pufferlib.PufferEnv): def __init__(self, num_envs=1, width=1920, height=1080, num_agents=1024, num_factories=32, - num_resources=8, equidistant=0, radius=30, render_mode=None, log_interval=128, buf=None, seed=0): + num_resources=8, equidistant=0, radius=30, render_mode=None, log_interval=128, buf=None, + seed=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(2*num_resources + 4 + num_resources,), dtype=np.float32) self.single_action_space = gymnasium.spaces.MultiDiscrete([9, 5]) @@ -20,7 +21,7 @@ def __init__(self, num_envs=1, width=1920, height=1080, num_agents=1024, num_fac if num_resources < 1 or num_resources > 8: raise pufferlib.APIUsageError('num_resources must be in [1, 8]') - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): c_env = binding.env_init( diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index d6557e3f6..68d4430c9 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -20,7 +20,8 @@ def __init__(self, render_mode=None, report_interval=1, num_maps=100, num_agents=512, buf = None, - seed=1): + seed=1, + max_num_threads=0): # env self.render_mode = render_mode @@ -49,7 +50,7 @@ def __init__(self, render_mode=None, report_interval=1, self.agent_offsets = agent_offsets self.map_ids = map_ids self.num_envs = num_envs - super().__init__(buf=buf) + super().__init__(buf=buf, binding=binding, max_num_threads=max_num_threads) env_ids = [] for i in range(num_envs): cur = agent_offsets[i] diff --git a/pufferlib/ocean/drone_race/drone_race.py b/pufferlib/ocean/drone_race/drone_race.py index 750fb675d..4aa338847 100644 --- a/pufferlib/ocean/drone_race/drone_race.py +++ 
b/pufferlib/ocean/drone_race/drone_race.py @@ -14,6 +14,7 @@ def __init__( seed=0, max_rings=10, max_moves=1000, + max_num_threads=0 ): self.single_observation_space = gymnasium.spaces.Box( low=-1, @@ -31,7 +32,7 @@ def __init__( self.report_interval = report_interval self.tick = 0 - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.actions = self.actions.astype(np.float32) c_envs = [] diff --git a/pufferlib/ocean/drone_swarm/drone_swarm.py b/pufferlib/ocean/drone_swarm/drone_swarm.py index 3503a7aa4..141bff9f1 100644 --- a/pufferlib/ocean/drone_swarm/drone_swarm.py +++ b/pufferlib/ocean/drone_swarm/drone_swarm.py @@ -14,6 +14,7 @@ def __init__( report_interval=1024, buf=None, seed=0, + max_num_threads=0 ): self.single_observation_space = gymnasium.spaces.Box( low=-1, @@ -31,7 +32,7 @@ def __init__( self.report_interval = report_interval self.tick = 0 - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.actions = self.actions.astype(np.float32) c_envs = [] diff --git a/pufferlib/ocean/enduro/enduro.py b/pufferlib/ocean/enduro/enduro.py index 286e8219e..3fe98b5d5 100644 --- a/pufferlib/ocean/enduro/enduro.py +++ b/pufferlib/ocean/enduro/enduro.py @@ -9,7 +9,7 @@ class Enduro(pufferlib.PufferEnv): def __init__(self, num_envs=1, render_mode=None, width=152, height=210, car_width=16, car_height=11, max_enemies=10, frameskip=1, continuous=False, - log_interval=128, buf=None, seed=None): + log_interval=128, buf=None, seed=None, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box( low=0, high=1, shape=(8 + (5 * max_enemies) + 9 + 1,), dtype=np.float32 @@ -27,7 +27,7 @@ def __init__(self, num_envs=1, render_mode=None, self.seed = random.randint(1, 1000000) else: self.seed = 0 - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init( self.observations, self.actions, self.rewards, diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h 
index f64b6148b..13dee5fdb 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -1,6 +1,9 @@ #include #include +#include +#include + // Forward declarations for env-specific functions supplied by user static int my_log(PyObject* dict, Log* log); static int my_init(Env* env, PyObject* args, PyObject* kwargs); @@ -192,7 +195,7 @@ static PyObject* env_reset(PyObject* self, PyObject* args) { static PyObject* env_step(PyObject* self, PyObject* args) { int num_args = PyTuple_Size(args); if (num_args != 1) { - PyErr_SetString(PyExc_TypeError, "vec_render requires 1 argument"); + PyErr_SetString(PyExc_TypeError, "env_step requires 1 argument"); return NULL; } @@ -259,11 +262,150 @@ static PyObject* env_put(PyObject* self, PyObject* args, PyObject* kwargs) { Py_RETURN_NONE; } + +typedef struct +{ + atomic_int work_index; + atomic_int num_running_threads; + volatile int num_threads; + pthread_cond_t wake_cnd; + pthread_t* threads; +} ThreadData; + typedef struct { Env** envs; int num_envs; + ThreadData* thread_data; } VecEnv; +static int global_num_threads = 0; + +// Main worker thread; initializes itself and runs a tight loop running through c_step (after waiting for work signal). +static void* c_threadstep(void* arg) +{ + VecEnv* vec_env = (VecEnv*)arg; + + pthread_mutex_t mtx; + pthread_mutex_init(&mtx, NULL); + pthread_cond_t* wake = &vec_env->thread_data->wake_cnd; + + atomic_int* work_index = &vec_env->thread_data->work_index; + atomic_int* num_running_threads = &vec_env->thread_data->num_running_threads; + volatile int* num_threads = &vec_env->thread_data->num_threads; + int index; + atomic_fetch_add(num_running_threads, 1); + while (1) + { + // Wait for work + pthread_mutex_lock(&mtx); + pthread_cond_wait(wake, &mtx); + pthread_mutex_unlock(&mtx); + + if (*num_threads <= 0) { break; } // Exit thread gracefully. + + // Got work to do now. 
+ atomic_fetch_add(num_running_threads, 1); + do + { + // This is important: Go do a bunch of work in our thread, without context switches or locks + // or any new allocs. This is the main speedup and core to ensuring the threads do as little work + // as part of their main loop as possible. We can afford to do this as the load balancing naturally happens + // with mutually exclusive index values spread across threads. + index = atomic_fetch_sub(work_index, 1); + if (index >= 0) { c_step(vec_env->envs[index]); } + } + while (index > 0); + atomic_fetch_sub(num_running_threads, 1); + } + pthread_mutex_destroy(&mtx); + return NULL; +} + +// Waits for and exits all threads (if needed). +static void c_vecclose(VecEnv* vec_env) +{ + if (global_num_threads <= 2 || vec_env->num_envs <= 2 || !vec_env->thread_data || vec_env->thread_data->num_threads == 0) { return; } + if (vec_env->thread_data->threads) + { + int num_threads = vec_env->thread_data->num_threads; + atomic_store(&vec_env->thread_data->work_index, -1); + vec_env->thread_data->num_threads = 0; // Signal to threads to exit + pthread_cond_broadcast(&vec_env->thread_data->wake_cnd); + // Wait for them to exit. + while (atomic_load(&vec_env->thread_data->num_running_threads) > 0) {} + + for (int i = 0; i < num_threads; ++i) + { + pthread_join(vec_env->thread_data->threads[i], NULL); + } + pthread_cond_destroy(&vec_env->thread_data->wake_cnd); + free(vec_env->thread_data->threads); + vec_env->thread_data->threads = NULL; + } + free(vec_env->thread_data); +} + +// Inits multi-threading if enabled via vec_enable_mt. +static int c_vecinit(VecEnv* vec_env) +{ + // If we have only a couple envs, it's not worth parallelizing. Also, don't penalize the user as they + // may want to change the .ini dynamically without having to worry about this. 
+ if (global_num_threads <= 2 || vec_env->num_envs <= 2) + { + global_num_threads = 0; + return 1; + } + // NOTE: On failure, we may have semi-initialized state - but it's okay because we will quit the entire program at that point. + vec_env->thread_data = (ThreadData*)calloc(1, sizeof(ThreadData)); + vec_env->thread_data->num_threads = global_num_threads; + vec_env->thread_data->threads = (pthread_t*)calloc(vec_env->thread_data->num_threads, sizeof(pthread_t)); + if (!vec_env->thread_data->threads) { return 0; } + if (pthread_cond_init(&vec_env->thread_data->wake_cnd, NULL) != 0) { return 0; } + atomic_store(&vec_env->thread_data->num_running_threads, 0); + atomic_store(&vec_env->thread_data->work_index, -1); + + for (int i = 0; i < vec_env->thread_data->num_threads; ++i) + { + if (pthread_create(&vec_env->thread_data->threads[i], NULL, c_threadstep, vec_env) != 0) { return 0; } + } + + // Wait for all threads to initialize (okay to busy wait here). + while (atomic_load(&vec_env->thread_data->num_running_threads) < vec_env->thread_data->num_threads) {} + atomic_store_explicit(&vec_env->thread_data->num_running_threads, 0, memory_order_relaxed); + return 1; +} + +// Signals worker threads to step across all environments. This is called from the main thread. +// NOTE: Also uses the main thread to avoid having a signal/wait object. +static int c_vecstep(VecEnv* vec_env) +{ + if (vec_env->thread_data->num_threads == 0 || atomic_load(&vec_env->thread_data->work_index) >= 0) { return 0; } + + // Produce work for the worker threads. + atomic_int* work_index = &vec_env->thread_data->work_index; + atomic_store_explicit(work_index, vec_env->num_envs - 1, memory_order_relaxed); + + // Signal to other threads that there is new work to be done. + pthread_cond_broadcast(&vec_env->thread_data->wake_cnd); + + // Why waste a (main) thread? (Also no need for a lock/condition variable etc). 
+ int index; + do + { + index = atomic_fetch_sub(work_index, 1); + if (index >= 0) { c_step(vec_env->envs[index]); } + } + while (index > 0); + + // Wait for all threads to finish fully. + // TODO(perumaal): I think this is a bad idea though - we should never spin CPU cycles busy waiting. + // This is a simple initial solution and assumes SIMD-like work happening in the worker threads + // which significantly reduces the chance of busy waiting here. + while (atomic_load(&vec_env->thread_data->num_running_threads) > 0) {} + + return 1; +} + static VecEnv* unpack_vecenv(PyObject* args) { PyObject* handle_obj = PyTuple_GetItem(args, 0); if (!PyObject_TypeCheck(handle_obj, &PyLong_Type)) { @@ -285,6 +427,23 @@ static VecEnv* unpack_vecenv(PyObject* args) { return vec; } +static PyObject* vec_enable_mt(PyObject* self, PyObject* args) { + if (PyTuple_Size(args) != 1) { + PyErr_SetString(PyExc_TypeError, "vec_enable_mt requires 1 arguments"); + return NULL; + } + + PyObject* num_threads_arg = PyTuple_GetItem(args, 0); + if (!PyObject_TypeCheck(num_threads_arg, &PyLong_Type)) { + PyErr_SetString(PyExc_TypeError, "num_threads_arg must be an integer"); + return NULL; + } + global_num_threads = PyLong_AsLong(num_threads_arg); + Py_RETURN_NONE; +} + + + static PyObject* vec_init(PyObject* self, PyObject* args, PyObject* kwargs) { if (PyTuple_Size(args) != 7) { PyErr_SetString(PyExc_TypeError, "vec_init requires 6 arguments"); @@ -401,7 +560,6 @@ static PyObject* vec_init(PyObject* self, PyObject* args, PyObject* kwargs) { } else { Py_INCREF(kwargs); // We need to increment the reference since we'll be modifying it } - for (int i = 0; i < num_envs; i++) { Env* env = (Env*)calloc(1, sizeof(Env)); if (!env) { @@ -440,13 +598,18 @@ static PyObject* vec_init(PyObject* self, PyObject* args, PyObject* kwargs) { return NULL; } } + if (!c_vecinit(vec)) { + PyErr_SetString(PyExc_RuntimeError, "Failed to initialize vec env threads"); + return NULL; + } Py_DECREF(kwargs); return 
PyLong_FromVoidPtr(vec); } -// Python function to close the environment +// Python function to vectorize an array of environments and return a strong pointer +// to an internal structure (VecEnv) for use later. static PyObject* vectorize(PyObject* self, PyObject* args) { int num_envs = PyTuple_Size(args); if (num_envs == 0) { @@ -475,7 +638,10 @@ static PyObject* vectorize(PyObject* self, PyObject* args) { } vec->envs[i] = (Env*)PyLong_AsVoidPtr(handle_obj); } - + if (!c_vecinit(vec)) { + PyErr_SetString(PyExc_RuntimeError, "Failed to initialize vec env threads"); + return NULL; + } return PyLong_FromVoidPtr(vec); } @@ -496,7 +662,9 @@ static PyObject* vec_reset(PyObject* self, PyObject* args) { return NULL; } int seed = PyLong_AsLong(seed_arg); - + + // TODO(perumaal): Should this be multi-thread aware as well? (see vec_step below). + // Main issue is that srand is not thread-safe. But do we care? for (int i = 0; i < vec->num_envs; i++) { // Assumes each process has the same number of environments srand(i + seed*vec->num_envs); @@ -516,9 +684,13 @@ static PyObject* vec_step(PyObject* self, PyObject* arg) { if (!vec) { return NULL; } - - for (int i = 0; i < vec->num_envs; i++) { - c_step(vec->envs[i]); + if (global_num_threads > 2) { + c_vecstep(vec); + } + else { + for (int i = 0; i < vec->num_envs; i++) { + c_step(vec->envs[i]); + } } Py_RETURN_NONE; } @@ -603,6 +775,7 @@ static PyObject* vec_close(PyObject* self, PyObject* args) { return NULL; } + c_vecclose(vec); for (int i = 0; i < vec->num_envs; i++) { c_close(vec->envs[i]); free(vec->envs[i]); @@ -649,6 +822,7 @@ static PyMethodDef methods[] = { {"env_close", env_close, METH_VARARGS, "Close the environment"}, {"env_get", env_get, METH_VARARGS, "Get the environment state"}, {"env_put", (PyCFunction)env_put, METH_VARARGS | METH_KEYWORDS, "Put stuff into env"}, + {"vec_enable_mt", vec_enable_mt, METH_VARARGS, "Sets up multi-threading with provided number of threads"}, {"vectorize", vectorize, METH_VARARGS, "Make 
a vector of environment handles"}, {"vec_init", (PyCFunction)vec_init, METH_VARARGS | METH_KEYWORDS, "Initialize a vector of environments"}, {"vec_reset", vec_reset, METH_VARARGS, "Reset the vector of environments"}, diff --git a/pufferlib/ocean/freeway/freeway.py b/pufferlib/ocean/freeway/freeway.py index 6e33cea87..3694320d6 100644 --- a/pufferlib/ocean/freeway/freeway.py +++ b/pufferlib/ocean/freeway/freeway.py @@ -26,6 +26,7 @@ def __init__( log_interval=128, buf=None, seed=0, + max_num_threads=0, ): assert level < 8, "Level should be in {0, 1, 2, 3, 4, 5, 6, 7} or -1. Level -1 is a random mix of all 8 supported levels." self.single_observation_space = gymnasium.spaces.Box( @@ -38,7 +39,7 @@ def __init__( self.single_action_space = gymnasium.spaces.Discrete(3) - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init( self.observations, diff --git a/pufferlib/ocean/g2048/g2048.py b/pufferlib/ocean/g2048/g2048.py index a18bb0dc7..82c23a9f8 100644 --- a/pufferlib/ocean/g2048/g2048.py +++ b/pufferlib/ocean/g2048/g2048.py @@ -7,7 +7,7 @@ from pufferlib.ocean.g2048 import binding class G2048(pufferlib.PufferEnv): - def __init__(self, num_envs=1, render_mode=None, log_interval=128, buf=None, seed=0): + def __init__(self, num_envs=1, render_mode=None, log_interval=128, buf=None, seed=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box( low=0, high=100, shape=(4,4), dtype=np.uint8 ) @@ -16,7 +16,7 @@ def __init__(self, num_envs=1, render_mode=None, log_interval=128, buf=None, see self.num_agents = num_envs self.log_interval = log_interval - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init( self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed diff --git a/pufferlib/ocean/go/go.py b/pufferlib/ocean/go/go.py index 06b87155d..53fccf7d4 100644 --- a/pufferlib/ocean/go/go.py +++ b/pufferlib/ocean/go/go.py @@ 
-26,7 +26,7 @@ def __init__(self, num_envs=1, render_mode=None, log_interval=1, reward_move_valid = 0.1, reward_player_capture = 0.25, reward_opponent_capture = -0.25, - buf = None, seed=0): + buf = None, seed=0, max_num_threads=0): # env self.num_agents = num_envs @@ -39,7 +39,7 @@ def __init__(self, num_envs=1, render_mode=None, log_interval=1, shape=(self.num_obs,), dtype=np.float32) self.single_action_space = gymnasium.spaces.Discrete(self.num_act) - super().__init__(buf=buf) + super().__init__(buf, binding, max_num_threads) height = 64*(grid_size+1) self.c_envs = binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed, width=width, height=height, grid_size=grid_size, diff --git a/pufferlib/ocean/grid/grid.py b/pufferlib/ocean/grid/grid.py index 0229d6fe7..f6c961468 100644 --- a/pufferlib/ocean/grid/grid.py +++ b/pufferlib/ocean/grid/grid.py @@ -9,7 +9,7 @@ class Grid(pufferlib.PufferEnv): def __init__(self, render_mode='raylib', vision_range=5, num_envs=4096, num_maps=1000, map_size=-1, max_size=9, - report_interval=128, buf=None, seed=0): + report_interval=128, buf=None, seed=0, max_num_threads=0): assert map_size <= max_size self.obs_size = 2*vision_range + 1 self.single_observation_space = gymnasium.spaces.Box(low=0, high=255, @@ -18,7 +18,7 @@ def __init__(self, render_mode='raylib', vision_range=5, self.render_mode = render_mode self.num_agents = num_envs self.report_interval = report_interval - super().__init__(buf=buf) + super().__init__(buf, binding, max_num_threads) self.float_actions = np.zeros_like(self.actions).astype(np.float32) self.c_state = binding.shared(num_maps=num_maps, max_size=max_size, size=map_size) self.c_envs = binding.vec_init(self.observations, self.float_actions, diff --git a/pufferlib/ocean/impulse_wars/impulse_wars.py b/pufferlib/ocean/impulse_wars/impulse_wars.py index 6fc2f5d27..ab25fa1df 100644 --- a/pufferlib/ocean/impulse_wars/impulse_wars.py +++ 
b/pufferlib/ocean/impulse_wars/impulse_wars.py @@ -42,6 +42,7 @@ def __init__( render: bool = False, report_interval: int = 64, buf = None, + max_num_threads: int = 0, ): self.obsInfo = SimpleNamespace(**binding.get_consts(num_drones)) @@ -89,7 +90,7 @@ def __init__( self.report_interval = report_interval self.render_mode = "human" if render else None - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) if not self.continuous: self.actions = np.zeros((self.num_agents, self.obsInfo.contActionsSize), dtype=np.float32) diff --git a/pufferlib/ocean/matsci/matsci.py b/pufferlib/ocean/matsci/matsci.py index eda5df1a4..a69e94d08 100644 --- a/pufferlib/ocean/matsci/matsci.py +++ b/pufferlib/ocean/matsci/matsci.py @@ -7,7 +7,7 @@ from pufferlib.ocean.matsci import binding class Matsci(pufferlib.PufferEnv): - def __init__(self, num_envs=1, num_atoms=2, render_mode=None, log_interval=128, buf=None, seed=0): + def __init__(self, num_envs=1, num_atoms=2, render_mode=None, log_interval=128, buf=None, seed=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(3,), dtype=np.float32) self.single_action_space = gymnasium.spaces.Box( @@ -16,7 +16,7 @@ def __init__(self, num_envs=1, num_atoms=2, render_mode=None, log_interval=128, self.render_mode = render_mode self.num_agents = num_envs*num_atoms - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): c_envs.append(binding.env_init( diff --git a/pufferlib/ocean/moba/moba.py b/pufferlib/ocean/moba/moba.py index 4b4dd0be8..1b37c3292 100644 --- a/pufferlib/ocean/moba/moba.py +++ b/pufferlib/ocean/moba/moba.py @@ -15,7 +15,8 @@ class Moba(pufferlib.PufferEnv): def __init__(self, num_envs=4, vision_range=5, agent_speed=1.0, discretize=True, reward_death=-1.0, reward_xp=0.006, reward_distance=0.05, reward_tower=3.0, report_interval=32, - script_opponents=True, render_mode='human', buf=None, seed=0): + 
script_opponents=True, render_mode='human', buf=None, seed=0, + max_num_threads=0): self.report_interval = report_interval self.render_mode = render_mode @@ -25,7 +26,7 @@ def __init__(self, num_envs=4, vision_range=5, agent_speed=1.0, shape=(MAP_OBS_N + PLAYER_OBS_N,), dtype=np.uint8) self.single_action_space = gymnasium.spaces.MultiDiscrete([7, 7, 3, 2, 2, 2]) - super().__init__(buf=buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] players = 5 if script_opponents else 10 diff --git a/pufferlib/ocean/nmmo3/nmmo3.py b/pufferlib/ocean/nmmo3/nmmo3.py index 32623501b..10fe1eaa5 100644 --- a/pufferlib/ocean/nmmo3/nmmo3.py +++ b/pufferlib/ocean/nmmo3/nmmo3.py @@ -18,7 +18,8 @@ def __init__(self, width=8*[512], height=8*[512], num_envs=4, item_respawn_ticks=100, x_window=7, y_window=5, reward_combat_level=1.0, reward_prof_level=1.0, reward_item_level=0.5, reward_market=0.01, - reward_death=-1.0, log_interval=128, buf=None, seed=0): + reward_death=-1.0, log_interval=128, buf=None, seed=0, + max_num_threads=0): self.log_interval = log_interval @@ -145,7 +146,7 @@ def __init__(self, width=8*[512], height=8*[512], num_envs=4, self.single_action_space = gymnasium.spaces.Discrete(26) self.render_mode = 'human' - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) player_count = 0 enemy_count = 0 c_envs = [] diff --git a/pufferlib/ocean/pacman/pacman.py b/pufferlib/ocean/pacman/pacman.py index 1521b6ffb..6926a46b4 100644 --- a/pufferlib/ocean/pacman/pacman.py +++ b/pufferlib/ocean/pacman/pacman.py @@ -17,7 +17,8 @@ def __init__(self, num_envs=1, render_mode=None, scatter_mode_length = 70, chase_mode_length = 140, log_interval=128, - buf=None, seed=0): + buf=None, seed=0, + max_num_threads=0): ghost_observations_count = 9 player_observations_count = 11 @@ -40,7 +41,7 @@ def __init__(self, num_envs=1, render_mode=None, self.human_action = None self.tick = 0 - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = 
binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed, diff --git a/pufferlib/ocean/rware/rware.py b/pufferlib/ocean/rware/rware.py index 25fb18116..203eb1b66 100644 --- a/pufferlib/ocean/rware/rware.py +++ b/pufferlib/ocean/rware/rware.py @@ -22,7 +22,8 @@ def __init__(self, num_envs=1, render_mode=None, report_interval=1, grid_square_size=64, human_agent_idx=0, reward_type=1, - buf = None, seed=0): + buf = None, seed=0, + max_num_threads=0): # env self.num_agents = num_envs*num_agents @@ -34,7 +35,7 @@ def __init__(self, num_envs=1, render_mode=None, report_interval=1, shape=(self.num_obs,), dtype=np.float32) self.single_action_space = gymnasium.spaces.Discrete(5) - super().__init__(buf=buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): env_id = binding.env_init( diff --git a/pufferlib/ocean/shared_pool/shared_pool.py b/pufferlib/ocean/shared_pool/shared_pool.py index 885b44366..df44d006e 100644 --- a/pufferlib/ocean/shared_pool/shared_pool.py +++ b/pufferlib/ocean/shared_pool/shared_pool.py @@ -19,6 +19,7 @@ def __init__(self, render_mode=None, buf=None, seed=0, + max_num_threads=0 ): widths = num_envs*widths heights = num_envs*heights @@ -32,7 +33,7 @@ def __init__(self, self.tick = 0 self.report_interval = report_interval - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): n = num_agents[i] diff --git a/pufferlib/ocean/squared/squared.py b/pufferlib/ocean/squared/squared.py index 533778810..266f6fdca 100644 --- a/pufferlib/ocean/squared/squared.py +++ b/pufferlib/ocean/squared/squared.py @@ -7,7 +7,8 @@ from pufferlib.ocean.squared import binding class Squared(pufferlib.PufferEnv): - def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=11, buf=None, seed=0): + def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=11, buf=None, seed=0, + max_num_threads=0): 
self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(size*size,), dtype=np.uint8) self.single_action_space = gymnasium.spaces.Discrete(5) @@ -15,7 +16,7 @@ def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=11, buf= self.num_agents = num_envs self.log_interval = log_interval - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed, size=size) diff --git a/pufferlib/ocean/target/target.py b/pufferlib/ocean/target/target.py index f370a15b1..29aeb7c09 100644 --- a/pufferlib/ocean/target/target.py +++ b/pufferlib/ocean/target/target.py @@ -8,16 +8,23 @@ class Target(pufferlib.PufferEnv): def __init__(self, num_envs=1, width=1080, height=720, num_agents=8, - num_goals=4, render_mode=None, log_interval=128, size=11, buf=None, seed=0): + num_goals=4, render_mode=None, log_interval=128, size=11, buf=None, seed=0, + max_num_threads=0): + # Observation space: Each agent observes how close they are to the goals, and the other agents (including self). + # NOTE: Distance to self for each agent is (0, 0). + # 4 additional features (heading, reward, agent (self) speed, agent (self) heading, etc.) + # All x,y coordinates are normalized to [0, 1] range. self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(2*(num_agents+num_goals) + 4,), dtype=np.float32) + # See https://gymnasium.farama.org/api/spaces/fundamental/#gymnasium.spaces.MultiDiscrete + # Heading: 9 discrete actions, Speed: 5 discrete speeds. 
self.single_action_space = gymnasium.spaces.MultiDiscrete([9, 5]) self.render_mode = render_mode self.num_agents = num_envs*num_agents self.log_interval = log_interval - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): c_env = binding.env_init( diff --git a/pufferlib/ocean/template/template.py b/pufferlib/ocean/template/template.py index b73abd765..d10567f10 100644 --- a/pufferlib/ocean/template/template.py +++ b/pufferlib/ocean/template/template.py @@ -7,14 +7,15 @@ from pufferlib.ocean.template import binding class Template(pufferlib.PufferEnv): - def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=5, buf=None, seed=0): + def __init__(self, num_envs=1, render_mode=None, log_interval=128, size=5, buf=None, seed=0, + max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(1,), dtype=np.uint8) self.single_action_space = gymnasium.spaces.Discrete(2) self.render_mode = render_mode self.num_agents = num_envs - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed, size=size) self.size = size diff --git a/pufferlib/ocean/terraform/terraform.py b/pufferlib/ocean/terraform/terraform.py index 9bb886d17..26ac46706 100644 --- a/pufferlib/ocean/terraform/terraform.py +++ b/pufferlib/ocean/terraform/terraform.py @@ -11,7 +11,7 @@ class Terraform(pufferlib.PufferEnv): def __init__(self, num_envs=1, num_agents=8, map_size=64, render_mode=None, log_interval=32, buf=None, seed=0, reset_frequency=8192, - reward_scale=0.01): + reward_scale=0.01, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(2*OBS_SIZE*OBS_SIZE + 5 + 36*2,), dtype=np.float32) self.single_action_space = gymnasium.spaces.MultiDiscrete([5, 5, 3], dtype=np.int32) @@ -20,7 +20,7 @@ def __init__(self, num_envs=1, 
num_agents=8, map_size=64, self.log_interval = log_interval self.reset_frequency = reset_frequency self.reward_scale = reward_scale - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): c_env = binding.env_init( diff --git a/pufferlib/ocean/tetris/tetris.py b/pufferlib/ocean/tetris/tetris.py index a566272e2..ea38a23a6 100644 --- a/pufferlib/ocean/tetris/tetris.py +++ b/pufferlib/ocean/tetris/tetris.py @@ -15,7 +15,8 @@ def __init__( render_mode=None, log_interval=32, buf=None, - seed=0 + seed=0, + max_num_threads=0, ): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(n_cols*n_rows + 6 + 7 * 4 + n_noise_obs,), dtype=np.float32) @@ -24,7 +25,7 @@ def __init__( self.log_interval = log_interval self.num_agents = num_envs - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) self.n_cols = n_cols self.n_rows = n_rows self.c_envs = binding.vec_init( diff --git a/pufferlib/ocean/tower_climb/tower_climb.py b/pufferlib/ocean/tower_climb/tower_climb.py index 80b19dde8..e3f19dc6e 100644 --- a/pufferlib/ocean/tower_climb/tower_climb.py +++ b/pufferlib/ocean/tower_climb/tower_climb.py @@ -8,7 +8,7 @@ class TowerClimb(pufferlib.PufferEnv): def __init__(self, num_envs=4096, render_mode=None, report_interval=1, num_maps=50, reward_climb_row = .25, reward_fall_row = 0, reward_illegal_move = -0.01, - reward_move_block = 0.2, buf = None, seed=0): + reward_move_block = 0.2, buf = None, seed=0, max_num_threads=0): # env self.num_agents = num_envs @@ -20,7 +20,7 @@ def __init__(self, num_envs=4096, render_mode=None, report_interval=1, shape=(self.num_obs,), dtype=np.uint8) self.single_action_space = gymnasium.spaces.Discrete(6) - super().__init__(buf=buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] self.c_state = binding.shared(num_maps=num_maps) self.c_envs = binding.vec_init(self.observations, self.actions, diff --git a/pufferlib/ocean/trash_pickup/trash_pickup.py 
b/pufferlib/ocean/trash_pickup/trash_pickup.py index 57409d0ee..209e9ae34 100644 --- a/pufferlib/ocean/trash_pickup/trash_pickup.py +++ b/pufferlib/ocean/trash_pickup/trash_pickup.py @@ -6,7 +6,8 @@ class TrashPickupEnv(pufferlib.PufferEnv): def __init__(self, num_envs=1, render_mode=None, report_interval=1, buf=None, - grid_size=10, num_agents=3, num_trash=15, num_bins=2, max_steps=300, agent_sight_range=5, seed=0): + grid_size=10, num_agents=3, num_trash=15, num_bins=2, max_steps=300, agent_sight_range=5, seed=0, + max_num_threads=0): # Env Setup self.render_mode = render_mode self.report_interval = report_interval @@ -57,7 +58,7 @@ def __init__(self, num_envs=1, render_mode=None, report_interval=1, buf=None, shape=(self.num_obs,), dtype=np.int8) self.single_action_space = spaces.Discrete(4) - super().__init__(buf=buf) + super().__init__(buf, binding, max_num_threads) c_envs = [] for i in range(num_envs): env_id = binding.env_init( diff --git a/pufferlib/ocean/tripletriad/tripletriad.py b/pufferlib/ocean/tripletriad/tripletriad.py index fce49c4ca..e3ae50b93 100644 --- a/pufferlib/ocean/tripletriad/tripletriad.py +++ b/pufferlib/ocean/tripletriad/tripletriad.py @@ -6,7 +6,8 @@ class TripleTriad(pufferlib.PufferEnv): def __init__(self, num_envs=1, render_mode=None, report_interval=1, - width=990, height=690, card_width=192, card_height=224, buf=None, seed=0): + width=990, height=690, card_width=192, card_height=224, buf=None, seed=0, + max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(114,), dtype=np.float32) self.single_action_space = gymnasium.spaces.Discrete(14) @@ -14,7 +15,7 @@ def __init__(self, num_envs=1, render_mode=None, report_interval=1, self.render_mode = render_mode self.num_agents = num_envs - super().__init__(buf=buf) + super().__init__(buf, binding, max_num_threads) self.c_envs = binding.vec_init(self.observations, self.actions, self.rewards, self.terminals, self.truncations, num_envs, seed, width=width, 
height=height, card_width=card_width, card_height=card_height) diff --git a/pufferlib/ocean/whisker_racer/whisker_racer.py b/pufferlib/ocean/whisker_racer/whisker_racer.py index 5430ff3fe..5aeeece0f 100644 --- a/pufferlib/ocean/whisker_racer/whisker_racer.py +++ b/pufferlib/ocean/whisker_racer/whisker_racer.py @@ -17,8 +17,8 @@ def __init__(self, num_envs=1, render_mode=None, reward_yellow=0.25, reward_green=0.0, gamma=0.9, track_width=50, num_radial_sectors=16, num_points=4, bezier_resolution=16, w_ang=0.523, corner_thresh=0.5, ftmp1=0.1, ftmp2=0.1, ftmp3=0.1, ftmp4=0.1, - mode7=0, render_many=0, seed=42, - buf=None, rng=42, i=1, method=0): + render_many=0, seed=42, + buf=None, rng=42, i=1, method=0, max_num_threads=0): self.single_observation_space = gymnasium.spaces.Box(low=0, high=1, shape=(3,), dtype=np.float32) self.render_mode = render_mode @@ -32,7 +32,7 @@ def __init__(self, num_envs=1, render_mode=None, else: self.single_action_space = gymnasium.spaces.Discrete(3) - super().__init__(buf) + super().__init__(buf, binding, max_num_threads) if continuous: self.actions = self.actions.flatten() diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index f74fec44b..ec8a68c67 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -154,23 +154,15 @@ def __init__(self, config, vecenv, policy, logger=None): eps=config['adam_eps'], ) elif config['optimizer'] == 'muon': - import heavyball from heavyball import ForeachMuon warnings.filterwarnings(action='ignore', category=UserWarning, module=r'heavyball.*') - heavyball.utils.compile_mode = "default" - - # # optionally a little bit better/faster alternative to newtonschulz iteration - # import heavyball.utils - # heavyball.utils.zeroth_power_mode = 'thinky_polar_express' - - # heavyball_momentum=True introduced in heavyball 2.1.1 - # recovers heavyball-1.7.2 behaviour - previously swept hyperparameters work well + import heavyball.utils + heavyball.utils.compile_mode = config['compile_mode'] if 
config['compile'] else None optimizer = ForeachMuon( self.policy.parameters(), lr=config['learning_rate'], betas=(config['adam_beta1'], config['adam_beta2']), eps=config['adam_eps'], - heavyball_momentum=True, ) else: raise ValueError(f'Unknown optimizer: {config["optimizer"]}') @@ -903,6 +895,7 @@ def download(self): return f'{data_dir}/{model_file}' def train(env_name, args=None, vecenv=None, policy=None, logger=None, should_stop_early=None): + # If args is not provided, load config from config/default.ini and override with provided config/.ini args = args or load_config(env_name) # Assume TorchRun DDP is used if LOCAL_RANK is set @@ -943,11 +936,11 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None, should_sto all_logs = [] while pufferl.global_step < train_config['total_timesteps']: - if train_config['device'] == 'cuda': - torch.compiler.cudagraph_mark_step_begin() + # if train_config['device'] == 'cuda': + # torch.compiler.cudagraph_mark_step_begin() pufferl.evaluate() - if train_config['device'] == 'cuda': - torch.compiler.cudagraph_mark_step_begin() + # if train_config['device'] == 'cuda': + # torch.compiler.cudagraph_mark_step_begin() logs = pufferl.train() if logs is not None: @@ -959,6 +952,7 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None, should_sto pufferl.logger.close(model_path) return all_logs + print("Final eval") # Final eval. You can reset the env here, but depending on # your env, this can skew data (i.e. 
you only collect the shortest # rollouts within a fixed number of epochs) @@ -972,8 +966,10 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None, should_sto all_logs.append(logs) pufferl.print_dashboard() + print(f"Starting model save:") model_path = pufferl.close() pufferl.logger.close(model_path) + print(f"...Model saved to {model_path}") return all_logs def eval(env_name, args=None, vecenv=None, policy=None): @@ -1173,6 +1169,7 @@ def load_policy(args, vecenv, env_name=''): #optim_state = torch.load(state_path)['optimizer_state_dict'] #pufferl.optimizer.load_state_dict(optim_state) + print(f'Loaded model from {load_path}') return policy def load_config(env_name, parser=None): diff --git a/pufferlib/pufferlib.py b/pufferlib/pufferlib.py index 5bef2b0a5..3314ccd67 100644 --- a/pufferlib/pufferlib.py +++ b/pufferlib/pufferlib.py @@ -19,9 +19,10 @@ ''' -def set_buffers(env, buf=None): +def set_buffers(env, buf=None, is_multi_threaded=False): if buf is None: obs_space = env.single_observation_space + # TODO(perumaal): If is_multi_threaded, we are in a multithreaded env in a single process so we can use torch directly instead of via numpy transfers. 
env.observations = np.zeros((env.num_agents, *obs_space.shape), dtype=obs_space.dtype) env.rewards = np.zeros(env.num_agents, dtype=np.float32) env.terminals = np.zeros(env.num_agents, dtype=bool) @@ -43,7 +44,7 @@ def set_buffers(env, buf=None): env.actions = buf['actions'] class PufferEnv: - def __init__(self, buf=None): + def __init__(self, buf=None, binding=None, max_num_threads=0): if not hasattr(self, 'single_observation_space'): raise APIUsageError(ENV_ERROR.format('single_observation_space')) if not hasattr(self, 'single_action_space'): @@ -64,7 +65,19 @@ def __init__(self, buf=None): and not isinstance(self.single_action_space, pufferlib.spaces.Box)): raise APIUsageError('Native action_space must be a Discrete, MultiDiscrete, or Box') - set_buffers(self, buf) + set_buffers(self, buf, max_num_threads > 0) + + # Setup multi-threading (if enabled via config file). + if (binding != None) and max_num_threads > 2: + import psutil + num_cores = psutil.cpu_count(logical=False) + if (num_cores is not None) and (num_cores >= 4): + # Reserves the main thread to run steps as well. + num_threads = min(num_cores, max_num_threads) + num_threads = min(1024, num_threads) # Sanity check limit to 1024 threads - otherwise might bork. + num_threads -= 1 + binding.vec_enable_mt(num_threads) + print(f'Multithreading: Using {self.num_agents} total envs / {num_threads} threads in a single process. 
Available cores: {num_cores}.') self.action_space = pufferlib.spaces.joint_space(self.single_action_space, self.num_agents) self.observation_space = pufferlib.spaces.joint_space(self.single_observation_space, self.num_agents) diff --git a/pufferlib/vector.py b/pufferlib/vector.py index 78614f4d6..5ba997446 100644 --- a/pufferlib/vector.py +++ b/pufferlib/vector.py @@ -57,7 +57,7 @@ class Serial: def num_envs(self): return self.agents_per_batch - def __init__(self, env_creators, env_args, env_kwargs, num_envs, buf=None, seed=0, **kwargs): + def __init__(self, env_creators, env_args, env_kwargs, num_envs, max_num_threads=0,buf=None, seed=0, **kwargs): self.driver_env = env_creators[0](*env_args[0], **env_kwargs[0]) self.agents_per_batch = self.driver_env.num_agents * num_envs self.num_agents = self.agents_per_batch @@ -237,7 +237,8 @@ def num_envs(self): def __init__(self, env_creators, env_args, env_kwargs, num_envs, num_workers=None, batch_size=None, - zero_copy=True, sync_traj=True, overwork=False, seed=0, **kwargs): + zero_copy=True, sync_traj=True, overwork=False, seed=0, + max_num_threads=0, **kwargs): if batch_size is None: batch_size = num_envs if num_workers is None: @@ -257,7 +258,7 @@ def __init__(self, env_creators, env_args, env_kwargs, # This is so you can have n equal buffers raise pufferlib.APIUsageError( 'zero_copy: num_envs must be divisible by batch_size') - + # Note: this is the vector-level num_envs, not [env].num_envs. Each Python env sets up [env].num_envs native C envs. self.num_environments = num_envs envs_per_worker = num_envs // num_workers self.envs_per_worker = envs_per_worker @@ -385,7 +386,7 @@ def recv(self): self.ready_workers.pop(0) break elif self.workers_per_batch == self.num_workers: - # Slowest path. Zero-copy synchornized for all workers + # Slowest path. 
Zero-copy synchronized for all workers if len(self.ready_workers) < self.num_workers: continue @@ -496,7 +497,7 @@ class Ray(): step = step def __init__(self, env_creators, env_args, env_kwargs, num_envs, - num_workers=None, batch_size=None, **kwargs): + num_workers=None, batch_size=None, max_num_threads=0, **kwargs): if batch_size is None: batch_size = num_envs if num_workers is None: @@ -614,8 +615,146 @@ def close(self): self.ray.get([e.close.remote() for e in self.envs]) self.ray.shutdown() +class Multithreading: + '''Runs environments in parallel using native-C multithreading + Total number of envs = [vec].num_envs * [env].num_envs + ''' + reset = reset + step = step + + @property + def num_envs(self): + return self.agents_per_batch + + def __init__(self, env_creators, env_args, env_kwargs, num_envs, max_num_threads=0, + buf=None, seed=0, **kwargs): + # Convert Multiprocessing envs to multithreading envs + # - Convert [env] num_envs to be [vec].num_envs * [env].num_envs instead + # - Make [vec] num_envs and num_workers be 1 + # - Pass max_num_threads to each env to limit threads per env + if isinstance(env_kwargs[0], dict): + env_kwargs[0] = env_kwargs[0].copy() + if 'num_envs' in env_kwargs[0]: + env_kwargs[0]['num_envs'] *= num_envs + env_kwargs[0]['max_num_threads'] = max_num_threads + elif 'num_agents' in env_kwargs[0]: + env_kwargs[0]['max_num_threads'] = max_num_threads + + # Reset num_envs to 1 since multithreading is handled inside the env now. 
+ num_envs = 1 + + self.driver_env = env_creators[0](*env_args[0], **env_kwargs[0]) + self.agents_per_batch = self.driver_env.num_agents * num_envs + self.num_agents = self.agents_per_batch + + self.single_observation_space = self.driver_env.single_observation_space + self.single_action_space = self.driver_env.single_action_space + self.action_space = pufferlib.spaces.joint_space(self.single_action_space, self.agents_per_batch) + self.observation_space = pufferlib.spaces.joint_space(self.single_observation_space, self.agents_per_batch) + + set_buffers(self, buf, True) + + # TODO(perumaal): Refactor this from self.envs to just a self.env + self.envs = [] + ptr = 0 + for i in range(num_envs): + end = ptr + self.driver_env.num_agents + buf_i = dict( + observations=self.observations[ptr:end], + rewards=self.rewards[ptr:end], + terminals=self.terminals[ptr:end], + truncations=self.truncations[ptr:end], + masks=self.masks[ptr:end], + actions=self.actions[ptr:end] + ) + ptr = end + seed_i = seed + i if seed is not None else None + env = env_creators[i](*env_args[i], buf=buf_i, seed=seed_i, **env_kwargs[i]) + self.envs.append(env) + + self.driver_env = driver = self.envs[0] + self.emulated = self.driver_env.emulated + check_envs(self.envs, self.driver_env) + self.agents_per_env = [env.num_agents for env in self.envs] + assert sum(self.agents_per_env) == self.agents_per_batch + self.agent_ids = np.arange(self.num_agents) + self.initialized = False + self.flag = RESET + + def _avg_infos(self): + infos = {} + for e in self.infos: + for k, v in pufferlib.unroll_nested_dict(e): + if k not in infos: + infos[k] = [] + + if isinstance(v, list): + infos[k].append(np.mean(v)) + else: + infos[k].append(v) + + for k in list(infos.keys()): + try: + infos[k] = np.mean(infos[k]) + except: + del infos[k] + + def async_reset(self, seed=None): + self.flag = RECV + infos = [] + for i, env in enumerate(self.envs): + if seed is None: + ob, i = env.reset() + else: + ob, i = 
env.reset(seed=seed+i) + + if isinstance(i, list): + infos.extend(i) + else: + infos.append(i) -def make(env_creator_or_creators, env_args=None, env_kwargs=None, backend=PufferEnv, num_envs=1, seed=0, **kwargs): + self.infos = infos + self._avg_infos() + + def send(self, actions): + if not actions.flags.contiguous: + actions = np.ascontiguousarray(actions) + + actions = send_precheck(self, actions) + rewards, dones, truncateds, self.infos = [], [], [], [] + ptr = 0 + for idx, env in enumerate(self.envs): + end = ptr + self.agents_per_env[idx] + atns = actions[ptr:end] + if env.done: + o, i = env.reset() + else: + o, r, d, t, i = env.step(atns) + + if i: + if isinstance(i, list): + self.infos.extend(i) + else: + self.infos.append(i) + + ptr = end + + self._avg_infos() + + def notify(self): + for env in self.envs: + env.notify() + + def recv(self): + recv_precheck(self) + return (self.observations, self.rewards, self.terminals, self.truncations, + self.infos, self.agent_ids, self.masks) + + def close(self): + for env in self.envs: + env.close() +def make(env_creator_or_creators, env_args=None, env_kwargs=None, backend=PufferEnv, num_envs=1, seed=0, + max_num_threads=0, **kwargs): if num_envs < 1: raise pufferlib.APIUsageError('num_envs must be at least 1') if num_envs != int(num_envs): @@ -705,7 +844,7 @@ def make(env_creator_or_creators, env_args=None, env_kwargs=None, backend=Puffer # TODO: First step action space check - return backend(env_creators, env_args, env_kwargs, num_envs, **kwargs) + return backend(env_creators, env_args, env_kwargs, num_envs, max_num_threads=max_num_threads, **kwargs) def make_seeds(seed, num_envs): if isinstance(seed, int):