Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Marenz <mathias.baumann@frequenz.com>
- Loading branch information
Showing
3 changed files
with
499 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
# License: MIT | ||
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH | ||
|
||
""" | ||
Performance test for the `OrderedRingBuffer` class | ||
""" | ||
|
||
from frequenz.sdk.util.ringbuffer import RingBuffer, ArrayContainer, ListContainer, OrderedRingBuffer | ||
|
||
import numpy as np | ||
import random | ||
from datetime import datetime, timedelta | ||
|
||
MINUTES_IN_29_DAYS=29 * 24 * 60 | ||
|
||
def fill_buffer(days, buffer):
    """Feed `days` days of per-minute samples into `buffer`.

    The samples of each day are pushed in a shuffled order. The random
    generator is re-seeded with 0 on every call, so the order is the same
    each time. Each sample's value is its minute-of-day; its timestamp is
    that minute on the given day plus a 0-2 second offset (`minute % 3`).

    Args:
        days: Number of consecutive days to generate, starting 2022-01-01.
        buffer: Object with an `update(timestamp, value)` method.
    """
    minutes_per_day = 24 * 60
    first_day = datetime(2022, 1, 1)
    random.seed(0)

    for day_offset in range(days):
        day_start = first_day + timedelta(days=day_offset)
        # Push in random order
        shuffled_minutes = random.sample(range(minutes_per_day), minutes_per_day)
        for minute in shuffled_minutes:
            timestamp = day_start + timedelta(minutes=minute, seconds=minute % 3)
            buffer.update(timestamp, minute)
|
||
|
||
def test_days(days, buffer):
    """Fill `buffer` with `days` days of data, then request each day once.

    A single "." is printed (and flushed) first as a progress marker.
    The data returned by every `window()` call is discarded.

    Args:
        days: Number of days to fill and then read back.
        buffer: Object providing `update()` and `window(start, end)`.
    """
    print(".", end="", flush=True)
    fill_buffer(days, buffer)

    first_day = datetime(2022, 1, 1)
    one_day = timedelta(days=1)

    for day_offset in range(days):
        day_start = first_day + timedelta(days=day_offset)
        # Only the call itself matters here; the result is not inspected.
        buffer.window(day_start, day_start + one_day)
|
||
def test_slices(days, buffer):
    """Fill `buffer`, then repeatedly fetch every day and reduce it.

    A single "." is printed (and flushed) first as a progress marker.
    Five passes are made over all days; on each window the average is
    computed twice and the median once. The formatted strings and the
    running total are never printed or returned.

    Args:
        days: Number of days to fill and then read back.
        buffer: Object providing `update()` and `window(start, end)`.
    """
    print(".", end="", flush=True)
    fill_buffer(days, buffer)

    # Chose uneven starting point so that for the first/last window data has
    # to be copied
    first_window = datetime(2022, 1, 1, 0, 5, 13, 88)
    one_day = timedelta(days=1)
    passes = 5

    total = 0

    for _pass in range(passes):
        for day_offset in range(days):
            window_start = first_window + timedelta(days=day_offset)
            minutes = buffer.window(window_start, window_start + one_day)

            total += np.average(minutes)
            # Built and thrown away on purpose: presumably this keeps the
            # formatting and reduction cost in the measurement without
            # flooding the output.
            _discarded = f"{window_start} average {np.average(minutes)}"
            _discarded = f"{window_start} median {np.median(minutes)}"
|
||
def test_29_days_list():
    """Run `test_days` for 29 days on a buffer backed by a Python list.

    The list has one slot per minute and the buffer is created with a
    resolution of 60 seconds.
    """
    storage = [0] * MINUTES_IN_29_DAYS
    ring = OrderedRingBuffer(storage, 60)
    test_days(29, ring)
|
||
def test_29_days_array():
    """Run `test_days` for 29 days on a buffer backed by a numpy array.

    The array is uninitialised (`np.empty`), has one slot per minute and
    the buffer is created with a resolution of 60 seconds.
    """
    storage = np.empty(shape=MINUTES_IN_29_DAYS)
    ring = OrderedRingBuffer(storage, 60)
    test_days(29, ring)
|
||
|
||
def test_29_days_slicing_list():
    """Run `test_slices` for 29 days on a buffer backed by a Python list.

    The list has one slot per minute and the buffer is created with a
    resolution of 60 seconds.
    """
    storage = [0] * MINUTES_IN_29_DAYS
    ring = OrderedRingBuffer(storage, 60)
    test_slices(29, ring)
|
||
def test_29_days_slicing_array():
    """Run `test_slices` for 29 days on a buffer backed by a numpy array.

    The array is uninitialised (`np.empty`), has one slot per minute and
    the buffer is created with a resolution of 60 seconds.
    """
    storage = np.empty(shape=MINUTES_IN_29_DAYS)
    ring = OrderedRingBuffer(storage, 60)
    test_slices(29, ring)
import timeit | ||
|
||
# Number of times each benchmark function is executed by timeit. Every run
# prints one "." so the row of dots grows underneath the "=" header line.
num_runs = 10

# --- Benchmark 1: fill the buffer and fetch every day once ---------------
print(" " + "=" * (num_runs + 1))
print("Array: ", end="")
duration = timeit.Timer(test_29_days_array).timeit(number=num_runs)
print("\nList: ", end="")
durationa = timeit.Timer(test_29_days_list).timeit(number=num_runs)
print("")

# timeit returns the total for all runs, so divide to get the per-run time.
print(
    "Time to fill 29 days with data:\n\t"
    f"Array: {duration / num_runs} seconds\n\t"
    f"List: {durationa / num_runs} seconds\n\t"
    f"Diff: {duration / num_runs - durationa / num_runs}"
)

# --- Benchmark 2: fill the buffer, then slice every day five times -------
print(" " + "=" * (num_runs + 1))
print("Array: ", end="")
duration = timeit.Timer(test_29_days_slicing_array).timeit(number=num_runs)
print("\nList: ", end="")
durationa = timeit.Timer(test_29_days_slicing_list).timeit(number=num_runs)
print("")

# This summary used to repeat the label of the first benchmark, which made
# the two result blocks indistinguishable in the output.
print(
    "Time to fill 29 days with data and slice each day 5 times:\n\t"
    f"Array: {duration / num_runs} seconds\n\t"
    f"List: {durationa / num_runs} seconds\n\t"
    f"Diff: {duration / num_runs - durationa / num_runs}"
)
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,258 @@ | ||
# License: MIT | ||
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH | ||
|
||
"""Ringbuffer implementation with focus on time & memory efficiency""" | ||
|
||
from typing import TypeVar, Generic, Sequence, MutableSequence, Optional, Any | ||
from abc import ABC, abstractmethod | ||
|
||
from collections import deque | ||
|
||
from enum import Enum, auto | ||
|
||
import numpy as np | ||
from itertools import chain | ||
|
||
T = TypeVar('T') | ||
|
||
class AbstractRingBufferContainer(Generic[T], ABC): | ||
def __init__(self, size): | ||
self.size = size | ||
|
||
@abstractmethod | ||
def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None: | ||
"""Write to value to the requested index. | ||
Args: | ||
index: Position to write the value at. | ||
value: Value to write. | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def __getitem__(self, index: int | slice) -> T: | ||
"""Returns the value at the requested index. | ||
Args: | ||
index: Position to write the value at. | ||
Returns: | ||
The requested item. | ||
""" | ||
pass | ||
|
||
def __len__(self): | ||
return self.size | ||
|
||
class RingBuffer(Generic[T]): | ||
|
||
def __init__(self, container: AbstractRingBufferContainer[T]): | ||
"""Initialize the ring buffer with the given container. | ||
Args: | ||
container: Container to store the data in. | ||
""" | ||
self.container = container | ||
self.write_index = 0 | ||
self.read_index = 0 | ||
self.size = 0 | ||
|
||
def __len__(self): | ||
"""Return the amount of items that this container currently holds.""" | ||
return self.size | ||
|
||
def max_size(self): | ||
"""Return the max amount of items this container can hold""" | ||
return len(self.container) | ||
|
||
def push(self, value: T) -> int: | ||
"""Pushes a new value into the ring buffer, returns the index in the | ||
ringbuffer. | ||
Args: | ||
value: value to push into the ring buffer. | ||
""" | ||
|
||
if self.size == len(self.container): | ||
# Move read position one forward, dropping the oldest written value | ||
self.read_index = (self.read_index + 1) % self.container.size | ||
else: | ||
self.size += 1 | ||
|
||
self.container[self.write_index] = value | ||
value_index = self.write_index | ||
self.write_index = (self.write_index + 1) % self.container.size | ||
|
||
return value_index | ||
|
||
def pop(self) -> T: | ||
"""Remove the oldest value from the ring buffer and return it.""" | ||
|
||
if self.size == 0: | ||
raise | ||
|
||
val = self.container[self.read_index] | ||
self.read_index = (self.read_index + 1) % self.container.size | ||
self.size -= 1; | ||
|
||
return val | ||
|
||
def wrap(self, index) -> int: | ||
return index % len(self.container) | ||
|
||
def full(self) -> bool: | ||
return len(self) == len(self.container) | ||
|
||
def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None: | ||
self.container[index] = value | ||
|
||
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: | ||
"""Returns the value at the given index or value range at the given | ||
slice. Does not support wrap-around or copying of data.""" | ||
return self.container[index_or_slice] | ||
|
||
|
||
class ListContainer(AbstractRingBufferContainer, ABC):
    """Ring buffer container that stores its items in a Python list."""

    def __init__(self, size: int) -> None:
        """Create a list-backed container with `size` slots.

        Args:
            size: Number of slots the container provides.
        """
        super().__init__(size)
        # Unwritten slots hold None. They used to hold the TypeVar object
        # `T` itself, which is not a meaningful placeholder value.
        self.data = [None] * size

    def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
        """Write the value(s) to the given index or slice of the list."""
        self.data[index] = value

    def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
        """Return the item at the index, or a new list for a slice."""
        return self.data[index_or_slice]
|
||
|
||
class ArrayContainer(AbstractRingBufferContainer, ABC):
    """Ring buffer container that stores its items in a numpy array."""

    def __init__(self, size: int, dtype: Any = object) -> None:
        """Create an array-backed container with `size` slots.

        Args:
            size: Number of slots the container provides.
            dtype: numpy dtype of the backing array. The default, `object`,
                is what this container has always used; pass e.g.
                `np.float64` to get a compact numeric array.
        """
        super().__init__(size)
        # The dtype used to be chosen with `T == float`, but `T` is the
        # TypeVar object and never equals `float`, so the float64 branch was
        # unreachable. The dtype is now an explicit argument instead.
        self.data = np.empty(shape=(size,), dtype=dtype)

    def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
        """Write the value(s) to the given index or slice of the array."""
        self.data[index] = value

    def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
        """Return the item at the index, or whatever numpy returns for a slice."""
        return self.data[index_or_slice]
|
||
|
||
from datetime import datetime, timedelta | ||
|
||
class OrderedRingBuffer(Generic[T]): | ||
"""Time aware ringbuffer that keeps its entries sorted time. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
buffer: Any, | ||
resolution_in_seconds: int, | ||
window_border: datetime = datetime(1, 1, 1), | ||
) -> None: | ||
"""Initialize the time aware ringbuffer. | ||
Args: | ||
buffer: instance of a buffer container to use internally | ||
resolution_in_seconds: resolution of the incoming timestamps in | ||
seconds | ||
window_border: datetime depicting point in time to use as border | ||
beginning, useful to make data start at the beginning of the day or | ||
hour. | ||
""" | ||
self.buffer = buffer | ||
self.resolution_in_seconds = resolution_in_seconds | ||
self.window_start = window_border | ||
|
||
self.missing_windows = [] | ||
self.datetime_newest = datetime.min | ||
self.datetime_oldest = datetime.max | ||
|
||
def max_size(self): | ||
"""Return the max amount of items this container can hold""" | ||
return len(self.buffer) | ||
|
||
|
||
def wrap(self, value): | ||
return value % self.max_size() | ||
|
||
# will be called with None | ||
def update(self, datetime: datetime, value: T, missing: bool = False): | ||
# Update timestamps | ||
if self.datetime_newest < datetime: | ||
self.datetime_newest = datetime | ||
|
||
#if self.datetime_newest - timedelta(seconds=self.resolution_in_seconds * self.max_size()) | ||
|
||
if self.datetime_oldest > datetime: | ||
self.datetime_oldest = datetime | ||
# TODO validate this if | ||
elif self.datetime_oldest < self.datetime_newest - timedelta(seconds=len(self.buffer) * self.resolution_in_seconds): | ||
self.datetime_oldest = self.datetime_newest - timedelta(len(self.buffer) * self.resolution_in_seconds) | ||
|
||
# Update data | ||
insert_index = self.datetime_to_index(datetime) | ||
|
||
self.buffer[insert_index] = value | ||
|
||
# Update missing windows | ||
# | ||
# We always append to the last pending window. | ||
# A window is pending when end == None | ||
if missing: | ||
# Create new if no pending window | ||
if len(self.missing_windows) == 0 or self.missing_windows[-1].end != None: | ||
self.missing_windows.append( | ||
{ 'start': datetime, 'end': None }) | ||
elif len(self.missing_windows) > 0: | ||
# Finalize a pending window | ||
if self.missing_windows[-1].end == None: | ||
self.missing_windows[-1].end = datetime | ||
|
||
# Delete out-to-date windows | ||
if len(self.missing_windows) > 0 and self.missing_windows[0].end != None: | ||
if self.missing_windows[0].end <= self.datetime_oldest: | ||
self.missing_windows = self.missing_windows[1:] | ||
|
||
def datetime_to_index(self, datetime: datetime) -> int: | ||
assert datetime >= self.datetime_oldest | ||
return self.wrap(int(abs((self.window_start - datetime).total_seconds()))) | ||
|
||
def window(self, start: datetime, end: datetime) -> Any: | ||
assert start < end | ||
|
||
start_index = self.datetime_to_index(start) | ||
end_index = self.datetime_to_index(end) | ||
|
||
# Requested window wraps around the ends | ||
if start_index > end_index: | ||
slice = self.buffer[start_index:] | ||
|
||
if end_index > 0: | ||
if type(self.buffer) == list: | ||
slice += self.buffer[0:end_index] | ||
else: | ||
slice = np.concatenate((slice, self.buffer[0:end_index])) | ||
return slice | ||
|
||
|
||
def in_window(window): | ||
if window.start < start and window.end > start: | ||
return True | ||
if window.start < end and window.end > end: | ||
return True | ||
|
||
return False | ||
|
||
# Return a copy if there are none-values in the data | ||
if any(map(in_window, self.missing_windows)): | ||
import copy | ||
return copy.deepcopy(self.buffer[start_index:end_index]) | ||
|
||
return self.buffer[start_index:end_index] | ||
|
||
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: | ||
return self.buffer[index_or_slice] |
Oops, something went wrong.