# exp_replay.py: experience replay buffers used during training.
import numpy as np


class ExperienceReplay(object):
    """Uniform experience replay backed by a fixed-capacity ring buffer."""

    def __init__(self, capacity, prob_alpha=0.6):
        # prob_alpha is unused here; it is kept only for signature parity
        # with PrioritizedExperienceReplay.
        self.prob_alpha = prob_alpha
        self.capacity = capacity
        self.buffer = []
        self.pos = 0  # next write position once the buffer is full

    def push(self, state, action, reward, next_state, done):
        assert state.ndim == next_state.ndim
        # Add a leading batch axis so transitions concatenate cleanly in sample().
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        if len(self.buffer) < self.capacity:
            self.buffer.append((state, action, reward, next_state, done))
        else:
            # Buffer is full: overwrite the oldest transition.
            self.buffer[self.pos] = (state, action, reward, next_state, done)
        self.pos = (self.pos + 1) % self.capacity

    def sample(self, batch_size, beta=0.4):
        # Uniform sampling with replacement; beta is unused and kept only for
        # signature parity with PrioritizedExperienceReplay.sample().
        indices = np.random.choice(len(self.buffer), batch_size)
        samples = [self.buffer[idx] for idx in indices]
        batch = list(zip(*samples))
        states = np.concatenate(batch[0])
        actions = batch[1]
        rewards = batch[2]
        next_states = np.concatenate(batch[3])
        dones = batch[4]
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)
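

# A minimal usage sketch (illustrative, not part of the original file): `env`
# and `agent` below are hypothetical stand-ins for a Gym-style environment
# and a policy.
#
#     replay = ExperienceReplay(capacity=10000)
#     state = env.reset()
#     while training:
#         action = agent.act(state)
#         next_state, reward, done, _ = env.step(action)
#         replay.push(state, action, reward, next_state, done)
#         state = env.reset() if done else next_state
#         if len(replay) >= batch_size:
#             batch = replay.sample(batch_size)
#             # ...compute TD targets from `batch` and update the network...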


class PrioritizedExperienceReplay(object):
    """Proportional prioritized experience replay (Schaul et al., 2015).

    A transition with priority p_i is sampled with probability
    P(i) = p_i**alpha / sum_k p_k**alpha, and the resulting bias is
    corrected with importance-sampling weights w_i = (N * P(i))**(-beta),
    normalized by their maximum.
    """

    def __init__(self, capacity, prob_alpha=0.6):
        self.prob_alpha = prob_alpha
        self.capacity = capacity
        self.buffer = []
        self.pos = 0
        self.priorities = np.zeros((capacity,), dtype=np.float32)

    def push(self, state, action, reward, next_state, done):
        assert state.ndim == next_state.ndim
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        # New transitions get the current maximum priority so that each one is
        # likely to be replayed at least once before its priority is updated.
        max_prio = self.priorities.max() if self.buffer else 1.0
        if len(self.buffer) < self.capacity:
            self.buffer.append((state, action, reward, next_state, done))
        else:
            self.buffer[self.pos] = (state, action, reward, next_state, done)
        self.priorities[self.pos] = max_prio
        self.pos = (self.pos + 1) % self.capacity

    def sample(self, batch_size, beta=0.4):
        # Only the filled portion of the priority array is meaningful.
        if len(self.buffer) == self.capacity:
            prios = self.priorities
        else:
            prios = self.priorities[:self.pos]
        probs = prios ** self.prob_alpha
        probs /= probs.sum()
        indices = np.random.choice(len(self.buffer), batch_size, p=probs)
        samples = [self.buffer[idx] for idx in indices]
        # Importance-sampling weights correct for the non-uniform sampling;
        # dividing by the maximum keeps them in (0, 1] for stable updates.
        total = len(self.buffer)
        weights = (total * probs[indices]) ** (-beta)
        weights /= weights.max()
        weights = np.array(weights, dtype=np.float32)
        batch = list(zip(*samples))
        states = np.concatenate(batch[0])
        actions = batch[1]
        rewards = batch[2]
        next_states = np.concatenate(batch[3])
        dones = batch[4]
        return states, actions, rewards, next_states, dones, indices, weights

    def update_priorities(self, batch_indices, batch_priorities):
        # Typically called after a learning step with |TD error| + epsilon.
        for idx, prio in zip(batch_indices, batch_priorities):
            self.priorities[idx] = prio

    def __len__(self):
        return len(self.buffer)
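

if __name__ == "__main__":
    # A minimal smoke test, added for illustration; the state shape (4,) and
    # the batch/capacity sizes below are arbitrary assumptions, not values
    # from the original file.
    rng = np.random.RandomState(0)
    uniform = ExperienceReplay(capacity=100)
    prioritized = PrioritizedExperienceReplay(capacity=100)
    for _ in range(50):
        s, s2 = rng.randn(4), rng.randn(4)
        a, r, d = rng.randint(2), rng.randn(), False
        uniform.push(s, a, r, s2, d)
        prioritized.push(s, a, r, s2, d)

    states, actions, rewards, next_states, dones = uniform.sample(8)
    print("uniform batch:", states.shape)  # -> (8, 4)

    (states, actions, rewards, next_states,
     dones, idxs, ws) = prioritized.sample(8, beta=0.4)
    print("prioritized batch:", states.shape, "weights:", ws.shape)

    # After a learning step, priorities would be set from the TD errors; the
    # random values here simply stand in for |TD error| + epsilon.
    fake_td_errors = rng.rand(8)
    prioritized.update_priorities(idxs, fake_td_errors + 1e-5)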