-
Notifications
You must be signed in to change notification settings - Fork 17
/
ringbuffer.py
240 lines (182 loc) · 7.78 KB
/
ringbuffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
"""Ringbuffer implementation with focus on time & memory efficiency"""
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Generic, Sequence, TypeVar
import numpy as np
T = TypeVar("T")
def RingBufferArray(size: int):
return RingBuffer[T](np.empty(shape=(size,), dtype=np.float64))
class RingBuffer(Generic[T]):
def __init__(self, container: Any):
"""Initialize the ring buffer with the given container.
Args:
container: Container to store the data in.
"""
self.container = container
self.write_index = 0
self.read_index = 0
self.size = 0
def __len__(self):
"""Return the amount of items that this container currently holds."""
return self.size
def max_size(self):
"""Return the max amount of items this container can hold"""
return len(self.container)
def push(self, value: T) -> int:
"""Pushes a new value into the ring buffer, returns the index in the
ringbuffer.
Args:
value: value to push into the ring buffer.
"""
if self.size == len(self.container):
# Move read position one forward, dropping the oldest written value
self.read_index = (self.read_index + 1) % len(self.container)
else:
self.size += 1
self.container[self.write_index] = value
value_index = self.write_index
self.write_index = (self.write_index + 1) % len(self.container)
return value_index
def pop(self) -> T:
"""Remove the oldest value from the ring buffer and return it."""
if self.size == 0:
raise
val = self.container[self.read_index]
self.read_index = (self.read_index + 1) % len(self.container)
self.size -= 1
return val
def wrap(self, index) -> int:
return index % len(self.container)
def full(self) -> bool:
return len(self) == len(self.container)
def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
self.container[index] = value
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
"""Returns the value at the given index or value range at the given
slice. Does not support wrap-around or copying of data."""
return self.container[index_or_slice]
class OrderedRingBuffer(Generic[T]):
"""Time aware ringbuffer that keeps its entries sorted time."""
def __init__(
self,
buffer: Any,
resolution_in_seconds: int,
window_border: datetime = datetime(1, 1, 1),
) -> None:
"""Initialize the time aware ringbuffer.
Args:
buffer: instance of a buffer container to use internally
resolution_in_seconds: resolution of the incoming timestamps in
seconds
window_border: datetime depicting point in time to use as border
beginning, useful to make data start at the beginning of the day or
hour.
"""
self.buffer = buffer
self.resolution_in_seconds = resolution_in_seconds
self.window_start = window_border
self.missing_windows = []
self.datetime_newest = datetime.min
self.datetime_oldest = datetime.max
def max_size(self):
"""Return the max amount of items this container can hold."""
return len(self.buffer)
def update(self, datetime: datetime, value: T, missing: bool = False):
"""Update the buffer with a new value for the given timestamp.
Args:
datetime: Timestamp of the new value
value: value to add
missing: if true, the given timestamp will be recorded as missing.
The value will still be written.
"""
# Update timestamps
if self.datetime_newest < datetime:
self.datetime_newest = datetime
if self.datetime_oldest > datetime:
self.datetime_oldest = datetime
elif self.datetime_oldest < self.datetime_newest - timedelta(
seconds=len(self.buffer) * self.resolution_in_seconds
):
self.datetime_oldest = self.datetime_newest - timedelta(
len(self.buffer) * self.resolution_in_seconds
)
# Update data
insert_index = self.datetime_to_index(datetime)
self.buffer[insert_index] = value
# Update list of missing windows
#
# We always append to the last pending window.
# A window is pending when end == None
if missing:
# Create new if no pending window
if len(self.missing_windows) == 0 or self.missing_windows[-1].end != None:
self.missing_windows.append({"start": datetime, "end": None})
elif len(self.missing_windows) > 0:
# Finalize a pending window
if self.missing_windows[-1].end == None:
self.missing_windows[-1].end = datetime
# Delete out-to-date windows
if len(self.missing_windows) > 0 and self.missing_windows[0].end != None:
if self.missing_windows[0].end <= self.datetime_oldest:
self.missing_windows = self.missing_windows[1:]
def datetime_to_index(self, datetime: datetime) -> int:
"""Convert the given timestamp to an index.
Throws an index error when the timestamp is not found within this
buffer.
Args:
datetime: Timestamp to convert.
Returns:
index where the value for the given timestamp can be found.
"""
if datetime < self.datetime_oldest:
raise IndexError()
return self.wrap(int(abs((self.window_start - datetime).total_seconds())))
def window(self, start: datetime, end: datetime) -> Any:
"""Request a view on the data between start timestamp and end timestamp.
Will return a copy in the following cases:
* The requested time period is crossing the start/end of the buffer
* The requested time period contains missing entries.
This means, if the caller needs to modify the data to account for
missing entries, they can safely do so.
Args:
start: start time of the window
end: end time of the window
Returns:
the requested window
"""
assert start < end
start_index = self.datetime_to_index(start)
end_index = self.datetime_to_index(end)
# Requested window wraps around the ends
if start_index > end_index:
slice = self.buffer[start_index:]
if end_index > 0:
if type(self.buffer) == list:
slice += self.buffer[0:end_index]
else:
slice = np.concatenate((slice, self.buffer[0:end_index]))
return slice
def in_window(window):
if window.start < start and window.end > start:
return True
if window.start < end and window.end > end:
return True
return False
# Return a copy if there are none-values in the data
if any(map(in_window, self.missing_windows)):
import copy
return copy.deepcopy(self.buffer[start_index:end_index])
return self.buffer[start_index:end_index]
def wrap(self, index: int):
"""Normalizes the given index to fit in the buffer by wrapping it
around.
Args:
index: index to normalize.
Returns:
an index that will be within max_size.
"""
return index % self.max_size()
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
return self.buffer[index_or_slice]