
Commit

Merge pull request #95 from salesforce/mountain-car-cont
cont mountain car
Emerald01 committed Apr 19, 2024
2 parents c1bb199 + 5dc8e1a commit 2fa1479
Showing 21 changed files with 673 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,8 @@
# Changelog
# Release 2.7.1 (2024-04-19)
- Add Continuous Mountain Car environment
- A2C algorithm supports conditional down-sampling of bad trajectories

# Release 2.7 (2024-02-17)
- Support continuous actions
- Add Pendulum environment that can run up to 100K concurrent replicates
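
For context, a minimal sketch of how the new environment can be exercised on the CPU backend. The module path is taken from the test imports further down; the dict-style action container is an assumption based on the single-agent helpers (map_to_single_agent, get_action_for_single_agent) used in the new environment file.

from example_envs.single_agent.classic_control.continuous_mountain_car.continuous_mountain_car import (
    ClassicControlContinuousMountainCarEnv,
)

# Fixed seed, no reset pool: every reset returns the same initial state.
env = ClassicControlContinuousMountainCarEnv(episode_length=200, reset_pool_size=0, seed=54231)
obs = env.reset()
# A single agent (id 0) applies a continuous force clipped to [min_action, max_action].
obs, rew, done, info = env.step({0: [0.5]})
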
126 changes: 126 additions & 0 deletions example_envs/single_agent/classic_control/continuous_mountain_car/continuous_mountain_car.py
@@ -0,0 +1,126 @@
import numpy as np
from warp_drive.utils.constants import Constants
from warp_drive.utils.data_feed import DataFeed
from warp_drive.utils.gpu_environment_context import CUDAEnvironmentContext

from example_envs.single_agent.base import SingleAgentEnv, map_to_single_agent, get_action_for_single_agent
from gym.envs.classic_control.continuous_mountain_car import Continuous_MountainCarEnv

_OBSERVATIONS = Constants.OBSERVATIONS
_ACTIONS = Constants.ACTIONS
_REWARDS = Constants.REWARDS


class ClassicControlContinuousMountainCarEnv(SingleAgentEnv):

name = "ClassicControlContinuousMountainCarEnv"

def __init__(self, episode_length, env_backend="cpu", reset_pool_size=0, seed=None):
super().__init__(episode_length, env_backend, reset_pool_size, seed=seed)

self.gym_env = Continuous_MountainCarEnv()

self.action_space = map_to_single_agent(self.gym_env.action_space)
self.observation_space = map_to_single_agent(self.gym_env.observation_space)

def step(self, action=None):
self.timestep += 1
action = get_action_for_single_agent(action)
state, reward, terminated, _, _ = self.gym_env.step(action)

obs = map_to_single_agent(state)
rew = map_to_single_agent(reward)
done = {"__all__": self.timestep >= self.episode_length or terminated}
info = {}

return obs, rew, done, info

def reset(self):
self.timestep = 0
if self.reset_pool_size < 2:
            # use the same fixed initial state for every reset
initial_state, _ = self.gym_env.reset(seed=self.seed)
else:
initial_state, _ = self.gym_env.reset(seed=None)
obs = map_to_single_agent(initial_state)

return obs


class CUDAClassicControlContinuousMountainCarEnv(ClassicControlContinuousMountainCarEnv, CUDAEnvironmentContext):

def get_data_dictionary(self):
data_dict = DataFeed()
initial_state, _ = self.gym_env.reset(seed=self.seed)

if self.reset_pool_size < 2:
data_dict.add_data(
name="state",
data=np.atleast_2d(initial_state),
save_copy_and_apply_at_reset=True,
)
else:
data_dict.add_data(
name="state",
data=np.atleast_2d(initial_state),
save_copy_and_apply_at_reset=False,
)

data_dict.add_data_list(
[
("min_action", self.gym_env.min_action),
("max_action", self.gym_env.max_action),
("min_position", self.gym_env.min_position),
("max_position", self.gym_env.max_position),
("max_speed", self.gym_env.max_speed),
("goal_position", self.gym_env.goal_position),
("goal_velocity", self.gym_env.goal_velocity),
("power", self.gym_env.power),
]
)
return data_dict

def get_tensor_dictionary(self):
tensor_dict = DataFeed()
return tensor_dict

def get_reset_pool_dictionary(self):
reset_pool_dict = DataFeed()
if self.reset_pool_size >= 2:
state_reset_pool = []
for _ in range(self.reset_pool_size):
initial_state, _ = self.gym_env.reset(seed=None)
state_reset_pool.append(np.atleast_2d(initial_state))
state_reset_pool = np.stack(state_reset_pool, axis=0)
assert len(state_reset_pool.shape) == 3 and state_reset_pool.shape[2] == 2

reset_pool_dict.add_pool_for_reset(name="state_reset_pool",
data=state_reset_pool,
reset_target="state")
return reset_pool_dict

def step(self, actions=None):
self.timestep += 1
args = [
"state",
_ACTIONS,
"_done_",
_REWARDS,
_OBSERVATIONS,
"min_action",
"max_action",
"min_position",
"max_position",
"max_speed",
"goal_position",
"goal_velocity",
"power",
"_timestep_",
("episode_length", "meta"),
]
if self.env_backend == "numba":
self.cuda_step[
self.cuda_function_manager.grid, self.cuda_function_manager.block
](*self.cuda_step_function_feed(args))
else:
raise Exception("CUDAClassicControlContinuousMountainCarEnv expects env_backend = 'numba' ")
@@ -0,0 +1,72 @@
import numba.cuda as numba_driver
import math


@numba_driver.jit
def _clip(v, min, max):
if v < min:
return min
if v > max:
return max
return v


@numba_driver.jit
def NumbaClassicControlContinuousMountainCarEnvStep(
state_arr,
action_arr,
done_arr,
reward_arr,
observation_arr,
min_action,
max_action,
min_position,
max_position,
max_speed,
goal_position,
goal_velocity,
power,
env_timestep_arr,
episode_length):

kEnvId = numba_driver.blockIdx.x
kThisAgentId = numba_driver.threadIdx.x

assert kThisAgentId == 0, "We only have one agent per environment"

env_timestep_arr[kEnvId] += 1

assert 0 < env_timestep_arr[kEnvId] <= episode_length

action = action_arr[kEnvId, kThisAgentId, 0]

position = state_arr[kEnvId, kThisAgentId, 0]
velocity = state_arr[kEnvId, kThisAgentId, 1]
force = _clip(action, min_action, max_action)

velocity += force * power - 0.0025 * math.cos(3 * position)
velocity = _clip(velocity, -max_speed, max_speed)

position += velocity
position = _clip(position, min_position, max_position)
if position == min_position and velocity < 0:
velocity = 0

state_arr[kEnvId, kThisAgentId, 0] = position
state_arr[kEnvId, kThisAgentId, 1] = velocity

observation_arr[kEnvId, kThisAgentId, 0] = state_arr[kEnvId, kThisAgentId, 0]
observation_arr[kEnvId, kThisAgentId, 1] = state_arr[kEnvId, kThisAgentId, 1]

terminated = bool(
position >= goal_position and velocity >= goal_velocity
)

rew = 0.0
if terminated:
rew = 100.0
rew -= math.pow(action, 2) * 0.1
reward_arr[kEnvId, kThisAgentId] = rew

if env_timestep_arr[kEnvId] == episode_length or terminated:
done_arr[kEnvId] = 1
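
As a cross-check, the same per-environment dynamics in plain NumPy. The constants shown as defaults are the values gym's Continuous_MountainCarEnv uses; the kernel itself reads them from the data dictionary, so treat this purely as a reference sketch.

import numpy as np

def reference_step(position, velocity, action,
                   power=0.0015, min_action=-1.0, max_action=1.0,
                   min_position=-1.2, max_position=0.6, max_speed=0.07,
                   goal_position=0.45, goal_velocity=0.0):
    # Mirrors the kernel above for a single (env, agent) pair.
    force = np.clip(action, min_action, max_action)
    velocity = np.clip(velocity + force * power - 0.0025 * np.cos(3 * position),
                       -max_speed, max_speed)
    position = np.clip(position + velocity, min_position, max_position)
    if position == min_position and velocity < 0:
        velocity = 0.0
    terminated = position >= goal_position and velocity >= goal_velocity
    reward = (100.0 if terminated else 0.0) - 0.1 * action ** 2
    return position, velocity, reward, terminated
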
@@ -64,5 +64,7 @@ def NumbaClassicControlMountainCarEnvStep(
    # as long as the env has not been reset, we assign reward -1; this is consistent with the original mountain car logic
reward_arr[kEnvId, kThisAgentId] = -1.0

-    if env_timestep_arr[kEnvId] == episode_length or terminated:
-        done_arr[kEnvId] = 1
+    if env_timestep_arr[kEnvId] == episode_length:
+        done_arr[kEnvId] = 1
+    elif terminated:
+        done_arr[kEnvId] = 2
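
The change above distinguishes a timeout (done == 1) from reaching the goal (done == 2). How the A2C trainer's conditional down-sampling consumes this flag is not part of this diff; the following is only an illustrative sketch of one way a boolean keep-mask over environments could be built from it.

import torch

def build_keep_mask(done_flags, keep_fraction=0.5):
    # done == 2: terminated at the goal; done == 1: ran out of episode_length ("bad" trajectory).
    reached_goal = done_flags == 2
    timed_out = done_flags == 1
    keep_timed_out = timed_out & (torch.rand(done_flags.shape, device=done_flags.device) < keep_fraction)
    return reached_goal | keep_timed_out
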
2 changes: 1 addition & 1 deletion setup.py
@@ -14,7 +14,7 @@

setup(
name="rl-warp-drive",
version="2.7",
version="2.7.1",
author="Tian Lan, Sunil Srinivasa, Brenton Chu, Stephan Zheng",
author_email="tian.lan@salesforce.com",
description="Framework for fast end-to-end "
@@ -63,7 +63,7 @@ def test_reset_pool(self):
).cuda()

state_values = {0: [], 1: [], 2: []}
-        for _ in range(10000):
+        for _ in range(30000):
env_wrapper.env_resetter.reset_when_done(env_wrapper.cuda_data_manager, mode="if_done", undo_done_after_reset=False)
res = env_wrapper.cuda_data_manager.pull_data_from_device("state")
state_values[0].append(res[0])
@@ -75,9 +75,9 @@ def test_reset_pool(self):
state_values_env2_mean = np.stack(state_values[2]).mean(axis=0).squeeze()

for i in range(len(reset_pool_mean)):
-            self.assertTrue(np.absolute(state_values_env0_mean[i] - reset_pool_mean[i]) < 0.1 * abs(reset_pool_mean[i]),
+            self.assertTrue(np.absolute(state_values_env0_mean[i] - reset_pool_mean[i]) < 0.2 * abs(reset_pool_mean[i]),
f"sampled mean: {state_values_env0_mean[i]}, expected mean: {reset_pool_mean[i]}")
-            self.assertTrue(np.absolute(state_values_env1_mean[i] - reset_pool_mean[i]) < 0.1 * abs(reset_pool_mean[i]),
+            self.assertTrue(np.absolute(state_values_env1_mean[i] - reset_pool_mean[i]) < 0.2 * abs(reset_pool_mean[i]),
f"sampled mean: {state_values_env1_mean[i]}, expected mean: {reset_pool_mean[i]}")
self.assertTrue(
np.absolute(
@@ -0,0 +1,91 @@
import unittest
import numpy as np
import torch

from warp_drive.env_cpu_gpu_consistency_checker import EnvironmentCPUvsGPU
from example_envs.single_agent.classic_control.continuous_mountain_car.continuous_mountain_car import \
ClassicControlContinuousMountainCarEnv, CUDAClassicControlContinuousMountainCarEnv
from warp_drive.env_wrapper import EnvWrapper


env_configs = {
"test1": {
"episode_length": 999,
"reset_pool_size": 0,
"seed": 32145,
},
"test2": {
"episode_length": 200,
"reset_pool_size": 0,
"seed": 54231,
},
}


class MyTestCase(unittest.TestCase):
"""
CPU v GPU consistency unit tests
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.testing_class = EnvironmentCPUvsGPU(
cpu_env_class=ClassicControlContinuousMountainCarEnv,
cuda_env_class=CUDAClassicControlContinuousMountainCarEnv,
env_configs=env_configs,
gpu_env_backend="numba",
num_envs=5,
num_episodes=2,
)

def test_env_consistency(self):
try:
self.testing_class.test_env_reset_and_step()
except AssertionError:
self.fail("ClassicControlContinuousMountainCarEnv environment consistency tests failed")

def test_reset_pool(self):
env_wrapper = EnvWrapper(
env_obj=CUDAClassicControlContinuousMountainCarEnv(episode_length=100, reset_pool_size=3),
num_envs=3,
env_backend="numba",
)
env_wrapper.reset_all_envs()
env_wrapper.env_resetter.init_reset_pool(env_wrapper.cuda_data_manager, seed=12345)
self.assertTrue(env_wrapper.cuda_data_manager.reset_target_to_pool["state"] == "state_reset_pool")

        # squeeze() the agent dimension, which is always 1
state_after_initial_reset = env_wrapper.cuda_data_manager.pull_data_from_device("state").squeeze()

reset_pool = env_wrapper.cuda_data_manager.pull_data_from_device(
env_wrapper.cuda_data_manager.get_reset_pool("state"))
reset_pool_mean = reset_pool.mean(axis=0).squeeze()

        # we only need to check the 0th element of state because state[1] (velocity) is always 0 after reset
self.assertTrue(reset_pool.std(axis=0).squeeze()[0] > 1e-4)

env_wrapper.cuda_data_manager.data_on_device_via_torch("_done_")[:] = torch.from_numpy(
np.array([1, 1, 0])
).cuda()

state_values = {0: [], 1: [], 2: []}
for _ in range(10000):
env_wrapper.env_resetter.reset_when_done(env_wrapper.cuda_data_manager, mode="if_done", undo_done_after_reset=False)
res = env_wrapper.cuda_data_manager.pull_data_from_device("state")
state_values[0].append(res[0])
state_values[1].append(res[1])
state_values[2].append(res[2])

state_values_env0_mean = np.stack(state_values[0]).mean(axis=0).squeeze()
state_values_env1_mean = np.stack(state_values[1]).mean(axis=0).squeeze()
state_values_env2_mean = np.stack(state_values[2]).mean(axis=0).squeeze()

self.assertTrue(np.absolute(state_values_env0_mean[0] - reset_pool_mean[0]) < 0.1 * abs(reset_pool_mean[0]))
self.assertTrue(np.absolute(state_values_env1_mean[0] - reset_pool_mean[0]) < 0.1 * abs(reset_pool_mean[0]))
self.assertTrue(
np.absolute(
state_values_env2_mean[0] - state_after_initial_reset[0][0]
) < 0.001 * abs(state_after_initial_reset[0][0])
)


@@ -63,7 +63,7 @@ def test_reset_pool(self):
).cuda()

state_values = {0: [], 1: [], 2: []}
-        for _ in range(10000):
+        for _ in range(30000):
env_wrapper.env_resetter.reset_when_done(env_wrapper.cuda_data_manager, mode="if_done", undo_done_after_reset=False)
res = env_wrapper.cuda_data_manager.pull_data_from_device("state")
state_values[0].append(res[0])
@@ -75,8 +75,8 @@ def test_reset_pool(self):
state_values_env2_mean = np.stack(state_values[2]).mean(axis=0).squeeze()

for i in range(len(reset_pool_mean)):
-            self.assertTrue(np.absolute(state_values_env0_mean[i] - reset_pool_mean[i]) < 0.1 * abs(reset_pool_mean[i]))
-            self.assertTrue(np.absolute(state_values_env1_mean[i] - reset_pool_mean[i]) < 0.1 * abs(reset_pool_mean[i]))
+            self.assertTrue(np.absolute(state_values_env0_mean[i] - reset_pool_mean[i]) < 0.2 * abs(reset_pool_mean[i]))
+            self.assertTrue(np.absolute(state_values_env1_mean[i] - reset_pool_mean[i]) < 0.2 * abs(reset_pool_mean[i]))
self.assertTrue(
np.absolute(
state_values_env2_mean[i] - state_after_initial_reset[0][i]
2 changes: 1 addition & 1 deletion tests/wd_training/pycuda_tests/test_env_training.py
@@ -11,7 +11,7 @@
import torch
import yaml

-from warp_drive.training.example_training_script_pycuda import setup_trainer_and_train
+from warp_drive.training.scripts.example_training_script_pycuda import setup_trainer_and_train
from warp_drive.training.utils.device_child_process.child_process_base import ProcessWrapper
from warp_drive.training.utils.distributed_train.distributed_trainer_pycuda import (
perform_distributed_training,
8 changes: 6 additions & 2 deletions warp_drive/env_wrapper.py
@@ -355,7 +355,7 @@ def repeat_across_env_dimension(array, num_envs):
def init_reset_pool(self, seed=None):
self.env_resetter.init_reset_pool(self.cuda_data_manager, seed)

-    def reset_only_done_envs(self):
+    def reset_only_done_envs(self, undo_done_after_reset=True):
"""
This function only works for GPU example_envs.
It will check all the running example_envs,
@@ -366,7 +366,11 @@ def reset_only_done_envs(self):
"for pycuda or numba backends and self.reset_on_host = False"
)

-        self.env_resetter.reset_when_done(self.cuda_data_manager, mode="if_done")
+        self.env_resetter.reset_when_done(
+            self.cuda_data_manager,
+            mode="if_done",
+            undo_done_after_reset=undo_done_after_reset
+        )
return {}

def custom_reset_all_envs(self, args=None, block=None, grid=None):
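
A minimal sketch of the new keyword in use, assuming an EnvWrapper constructed as in the tests above. Passing undo_done_after_reset=False keeps the "_done_" flags on the device after the in-place reset, so the caller can still see which environments just finished.

env_wrapper.reset_all_envs()
# ... step the environments on the device ...
env_wrapper.reset_only_done_envs(undo_done_after_reset=False)
done_flags = env_wrapper.cuda_data_manager.pull_data_from_device("_done_")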
