/
test_pendulum.py
86 lines (69 loc) · 3.27 KB
/
test_pendulum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import unittest
import numpy as np
import torch
from warp_drive.env_cpu_gpu_consistency_checker import EnvironmentCPUvsGPU
from example_envs.single_agent.classic_control.pendulum.pendulum import \
ClassicControlPendulumEnv, CUDAClassicControlPendulumEnv
from warp_drive.env_wrapper import EnvWrapper
# Named environment configurations handed to the CPU-vs-GPU consistency
# checker; each entry maps a label to the keyword settings for one run.
# NOTE(review): reset_pool_size=0 presumably disables the reset pool for the
# consistency run -- confirm against the environment implementation.
env_configs = {
    "test1": dict(
        episode_length=200,
        reset_pool_size=0,
        seed=32145,
    ),
}
class MyTestCase(unittest.TestCase):
    """
    CPU v GPU consistency unit tests

    Covers two things for the classic-control pendulum environment:

    * ``test_env_consistency`` delegates to ``EnvironmentCPUvsGPU`` to compare
      the CPU and CUDA (numba backend) implementations on reset and step.
    * ``test_reset_pool`` checks that environments flagged as done are
      re-initialised from the reset pool while an environment that is not
      done keeps its state.
    """

    def __init__(self, *args, **kwargs):
        """Build the CPU-vs-GPU checker used by ``test_env_consistency``."""
        super().__init__(*args, **kwargs)
        self.testing_class = EnvironmentCPUvsGPU(
            cpu_env_class=ClassicControlPendulumEnv,
            cuda_env_class=CUDAClassicControlPendulumEnv,
            env_configs=env_configs,
            gpu_env_backend="numba",
            num_envs=5,
            num_episodes=2,
        )

    def test_env_consistency(self):
        """Fail with a readable message if the consistency checker asserts."""
        try:
            self.testing_class.test_env_reset_and_step()
        except AssertionError:
            self.fail("ClassicControlPendulumEnv environment consistency tests failed")

    def test_reset_pool(self):
        """Check that done environments are reset from the pool and others are untouched.

        Environments 0 and 1 are marked done and env 2 is not. After many
        ``reset_when_done`` calls, the mean state of envs 0 and 1 should match
        the mean of the reset pool, while env 2 should still hold the state it
        had right after the initial reset.
        """
        num_envs = 3
        num_draws = 10000

        env_wrapper = EnvWrapper(
            env_obj=CUDAClassicControlPendulumEnv(episode_length=100, reset_pool_size=8),
            num_envs=num_envs,
            env_backend="numba",
        )
        env_wrapper.reset_all_envs()
        data_manager = env_wrapper.cuda_data_manager
        env_wrapper.env_resetter.init_reset_pool(data_manager, seed=12345)

        self.assertEqual(data_manager.reset_target_to_pool["state"], "state_reset_pool")

        # squeeze() the agent dimension which is 1 always
        state_after_initial_reset = data_manager.pull_data_from_device("state").squeeze()

        reset_pool = data_manager.pull_data_from_device(data_manager.get_reset_pool("state"))
        reset_pool_mean = reset_pool.mean(axis=0).squeeze()
        # The pool must actually contain varied entries, otherwise the mean
        # comparison below could not tell pool draws from the initial state.
        self.assertGreater(reset_pool.std(axis=0).mean(), 1e-4)

        # Mark envs 0 and 1 as done; env 2 stays not done.
        data_manager.data_on_device_via_torch("_done_")[:] = torch.from_numpy(
            np.array([1, 1, 0])
        ).cuda()

        state_values = {env_id: [] for env_id in range(num_envs)}
        for _ in range(num_draws):
            # undo_done_after_reset=False keeps the done flags set, so envs 0
            # and 1 are reset again on every iteration.
            env_wrapper.env_resetter.reset_when_done(
                data_manager, mode="if_done", undo_done_after_reset=False
            )
            res = data_manager.pull_data_from_device("state")
            for env_id, samples in state_values.items():
                samples.append(res[env_id])

        mean_state = {
            env_id: np.stack(samples).mean(axis=0).squeeze()
            for env_id, samples in state_values.items()
        }

        # Done envs: the empirical mean over many resets should be within 10%
        # (relative) of the reset pool mean, component by component.
        # NOTE(review): a purely relative tolerance is tight when a pool-mean
        # component is close to zero -- consider adding an atol if this flakes.
        for env_id in (0, 1):
            np.testing.assert_allclose(
                mean_state[env_id],
                reset_pool_mean,
                rtol=0.1,
                err_msg=f"env {env_id} mean state does not match the reset pool mean",
            )

        # Not-done env: compare against env 2's OWN post-reset state (index 2,
        # not index 0) so the check stays valid even if environments do not
        # all start from the same state.
        np.testing.assert_allclose(
            mean_state[2],
            state_after_initial_reset[2],
            rtol=1e-3,
            err_msg="env 2 was not done but its state changed",
        )