-
Notifications
You must be signed in to change notification settings - Fork 17
/
ringbuffer.py
321 lines (243 loc) · 9.92 KB
/
ringbuffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
"""Ringbuffer implementation with focus on time & memory efficiency."""
from __future__ import annotations
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, Generic, Sequence, TypeVar, Union
import numpy as np
# Generic element type stored in the ring buffers defined in this module.
T = TypeVar("T")
# Backing storage type accepted by `RingBuffer`: a plain list or a numpy array.
Container = Union[list, np.ndarray]
class RingBuffer(Generic[T]):
    """A ring buffer with a fixed size.

    The capacity equals the length of the container handed to the
    constructor. Once the buffer is full, pushing a new value overwrites the
    oldest stored value.

    Should work with most backends, tested with list and np.ndarrays.
    """

    class NoElements(Exception):
        """Exception class, thrown when no elements are available when calling pop()."""

    def __init__(self, container: Container) -> None:
        """Initialize the ring buffer with the given container.

        Args:
            container: Container to store the data in. Its length defines the
                capacity of the ring buffer; it is used as-is, not copied.
        """
        self._container = container
        # Position the next push() writes to.
        self._write_index = 0
        # Position of the oldest value, i.e. what the next pop() returns.
        self._read_index = 0
        # Number of values currently stored (never exceeds maxlen).
        self._len = 0

    def __len__(self) -> int:
        """Get current amount of elements.

        Returns:
            the amount of items that this container currently holds.
        """
        return self._len

    @property
    def maxlen(self) -> int:
        """Get the max length.

        Returns:
            the max amount of items this container can hold.
        """
        return len(self._container)

    def push(self, value: T) -> int:
        """Push a new value into the ring buffer.

        When the buffer is already full, the oldest value is dropped to make
        room for the new one.

        Args:
            value: value to push into the ring buffer.

        Returns:
            the index in the ringbuffer.
        """
        if self._len == self.maxlen:
            # Move read position one forward, dropping the oldest written value
            self._read_index = self._wrap(self._read_index + 1)
        else:
            self._len += 1

        value_index = self._write_index
        self._container[value_index] = value
        self._write_index = self._wrap(value_index + 1)
        return value_index

    def pop(self) -> T:
        """Remove the oldest value from the ring buffer and return it.

        Raises:
            NoElements: when no elements exist to pop.

        Returns:
            Oldest value found in the ring buffer.
        """
        if self._len == 0:
            raise RingBuffer.NoElements("pop() called on an empty ring buffer")

        val = self._container[self._read_index]
        self._read_index = self._wrap(self._read_index + 1)
        self._len -= 1
        return val

    @property
    def full(self) -> bool:
        """Check if the container is full.

        Returns:
            True when the container is full.
        """
        return self._len == self.maxlen

    def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
        """Write the given value to the requested position.

        The index addresses the underlying container directly; it is not
        relative to the read position and does not change the element count.

        Args:
            index: Position to write the value to.
            value: Value to write.
        """
        self._container[index] = value

    def __getitem__(self, index_or_slice: int | slice) -> T | Container:
        """Request a value or slice.

        Does not support wrap-around or copying of data. The index addresses
        the underlying container directly; it is not relative to the read
        position.

        Args:
            index_or_slice: Index or slice specification of the requested data

        Returns:
            the value at the given index or value range at the given slice.
        """
        return self._container[index_or_slice]

    def _wrap(self, index: int) -> int:
        """Normalize the given index to fit in the buffer by wrapping it around.

        Args:
            index: index to normalize.

        Returns:
            an index within the bounds of the underlying container.
        """
        return index % len(self._container)
class OrderedRingBuffer(Generic[T]):
    """Time aware ringbuffer that keeps its entries sorted by time.

    Every timestamp is mapped to a fixed slot of the underlying buffer, based
    on its distance from the window border and on the configured resolution.
    The buffer keeps track of the newest and the oldest timestamp it currently
    covers; that range never spans more slots than the buffer has.

    Timestamps are compared against the naive sentinels `datetime.min` and
    `datetime.max`, so they (and the window border) must be timezone-naive.
    """

    def __init__(
        self,
        buffer: Any,
        resolution_in_seconds: int,
        window_border: datetime = datetime(1, 1, 1),
    ) -> None:
        """Initialize the time aware ringbuffer.

        Args:
            buffer: instance of a buffer container to use internally
            resolution_in_seconds: resolution of the incoming timestamps in
                seconds
            window_border: datetime depicting point in time to use as border
                beginning, useful to make data start at the beginning of the day or
                hour.

        Raises:
            ValueError: when the resolution is not a positive number, because
                timestamps could not be mapped to slots then.
        """
        if resolution_in_seconds <= 0:
            raise ValueError(
                f"resolution_in_seconds must be positive, got {resolution_in_seconds}"
            )

        self._buffer = buffer
        self._resolution_in_seconds = resolution_in_seconds
        # Duration covered by a single slot of the buffer.
        self._resolution = timedelta(seconds=resolution_in_seconds)
        self._window_start = window_border
        # Chronological list of gaps, each a dict with the keys "start"
        # (first missing timestamp) and "end" (first valid timestamp after the
        # gap, or None while the gap is still pending).
        self._missing_windows: list[dict[str, datetime | None]] = []
        # Sentinels marking an empty buffer.
        self._datetime_newest = datetime.min
        self._datetime_oldest = datetime.max

    @property
    def maxlen(self) -> int:
        """Get the max length.

        Returns:
            the max amount of items this container can hold.
        """
        return len(self._buffer)

    def update(self, timestamp: datetime, value: T, missing: bool = False) -> None:
        """Update the buffer with a new value for the given timestamp.

        Args:
            timestamp: Timestamp of the new value
            value: value to add
            missing: if true, the given timestamp will be recorded as missing.
                The value will still be written.

        Raises:
            IndexError: when the timestamp is too old to fit in the range the
                buffer can hold. The buffer is left unchanged in that case.
        """
        newest = max(self._datetime_newest, timestamp)
        # The newest slot plus the (maxlen - 1) slots before it are all the
        # buffer can hold; anything older would overwrite newer data.
        horizon = newest - self._resolution * (self.maxlen - 1)
        if timestamp < horizon:
            raise IndexError(
                f"Requested timestamp {timestamp} is "
                f"outside the range [{horizon} - {newest}]"
            )

        # Update timestamps (only after validation, to keep the state intact
        # when the update is rejected)
        self._datetime_newest = newest
        self._datetime_oldest = max(min(self._datetime_oldest, timestamp), horizon)

        # Update data
        self._buffer[self._timestamp_to_slot(timestamp)] = value

        # Update list of missing windows
        #
        # We always append to the last pending window.
        # A window is pending when its end is None
        gaps = self._missing_windows
        if missing:
            # Create new if no pending window
            if not gaps or gaps[-1]["end"] is not None:
                gaps.append({"start": timestamp, "end": None})
        elif gaps and gaps[-1]["end"] is None:
            # Finalize a pending window
            gaps[-1]["end"] = timestamp

        # Delete out-of-date windows, i.e. those that ended at or before the
        # oldest timestamp still covered by the buffer
        while gaps:
            gap_end = gaps[0]["end"]
            if gap_end is None or gap_end > self._datetime_oldest:
                break
            del gaps[0]

    def datetime_to_index(self, timestamp: datetime) -> int:
        """Convert the given timestamp to an index.

        Throws an index error when the timestamp is not found within this
        buffer.

        Args:
            timestamp: Timestamp to convert.

        Raises:
            IndexError: when requesting a timestamp outside the range this container holds

        Returns:
            index where the value for the given timestamp can be found.
        """
        if self._datetime_newest < timestamp or timestamp < self._datetime_oldest:
            raise IndexError(
                f"Requested timestamp {timestamp} is "
                f"outside the range [{self._datetime_oldest} - {self._datetime_newest}]"
            )
        return self._timestamp_to_slot(timestamp)

    def window(self, start: datetime, end: datetime) -> Container:
        """Request a view on the data between start timestamp and end timestamp.

        The end timestamp is exclusive and, like the start timestamp, has to
        lie within the range currently held by the buffer.

        Will return a copy in the following cases:
        * The requested time period is crossing the start/end of the buffer
        * The requested time period contains missing entries.

        This means, if the caller needs to modify the data to account for
        missing entries, they can safely do so.

        Args:
            start: start time of the window
            end: end time of the window

        Raises:
            IndexError: when start or end is outside the range this container
                holds.

        Returns:
            the requested window
        """
        assert start < end

        start_index = self.datetime_to_index(start)
        end_index = self.datetime_to_index(end)

        # Requested window wraps around the ends
        if start_index > end_index > 0:
            head = self._buffer[start_index:]
            tail = self._buffer[:end_index]
            if isinstance(self._buffer, list):
                return head + tail
            return np.concatenate((head, tail))

        # When the end falls exactly on slot 0, the window runs up to the
        # physical end of the buffer without actually crossing it.
        stop = end_index if start_index <= end_index else self.maxlen
        segment = self._buffer[start_index:stop]

        def overlaps(gap: dict[str, datetime | None]) -> bool:
            # Half-open interval overlap of [gap start, gap end) with
            # [start, end); a pending gap (end is None) is open-ended.
            if gap["start"] >= end:
                return False
            return gap["end"] is None or start < gap["end"]

        # Return a copy if there are missing entries in the data
        if any(overlaps(gap) for gap in self._missing_windows):
            return deepcopy(segment)
        return segment

    def _timestamp_to_slot(self, timestamp: datetime) -> int:
        """Map a timestamp to its slot in the buffer, without range checking.

        Args:
            timestamp: Timestamp to convert.

        Returns:
            the slot index for the timestamp, wrapped into the buffer.
        """
        # Signed floor division keeps consecutive timestamps in consecutive
        # slots even for timestamps lying before the window border.
        steps = (timestamp - self._window_start) // self._resolution
        return self._wrap(steps)

    def _wrap(self, index: int) -> int:
        """Normalize the given index to fit in the buffer by wrapping it around.

        Args:
            index: index to normalize.

        Returns:
            an index that will be within max_size.
        """
        return index % self.maxlen

    def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
        """Get item or slice at requested position.

        The position addresses the underlying buffer directly.

        Args:
            index_or_slice: Index or slice specification of the requested data.

        Returns:
            The requested value or slice.
        """
        return self._buffer[index_or_slice]

    def __len__(self) -> int:
        """Return the amount of items that this container currently holds.

        This is the number of slots between the oldest and the newest
        timestamp (both included), so entries recorded as missing are counted
        as well.

        Returns:
            The length.
        """
        if self._datetime_newest == datetime.min:
            return 0

        start_index = self._timestamp_to_slot(self._datetime_oldest)
        end_index = self._timestamp_to_slot(self._datetime_newest)
        # The modulo accounts for the newest slot having wrapped around.
        return (end_index - start_index) % self.maxlen + 1