diff --git a/benchmarks/util/benchmark_ringbuffer.py b/benchmarks/util/benchmark_ringbuffer.py new file mode 100644 index 000000000..9aae062ec --- /dev/null +++ b/benchmarks/util/benchmark_ringbuffer.py @@ -0,0 +1,168 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Performance test for the `Ringbuffer` class.""" + +import random +import timeit +from datetime import datetime, timedelta +from typing import TypeVar + +import numpy as np + +from frequenz.sdk.util.ringbuffer import OrderedRingBuffer + +MINUTES_IN_A_DAY = 24 * 60 +MINUTES_IN_29_DAYS = 29 * MINUTES_IN_A_DAY + + +T = TypeVar("T") + + +def fill_buffer(days: int, buffer: OrderedRingBuffer[T], element_type: type) -> None: + """Fill the given buffer up to the given amount of days, one sample per minute.""" + random.seed(0) + basetime = datetime(2022, 1, 1) + + for day in range(days): + # Push in random order + for i in random.sample(range(MINUTES_IN_A_DAY), MINUTES_IN_A_DAY): + buffer.update( + basetime + timedelta(days=day, minutes=i, seconds=i % 3), + element_type(i), + ) + + +def test_days(days: int, buffer: OrderedRingBuffer[int]) -> None: + """Fills a buffer completely up and then gets the data for each of the 29 days.""" + print(".", end="", flush=True) + + fill_buffer(days, buffer, int) + + basetime = datetime(2022, 1, 1) + + for day in range(days): + # pylint: disable=unused-variable + minutes = buffer.window( + basetime + timedelta(days=day), basetime + timedelta(days=day + 1) + ) + + +def test_slices(days: int, buffer: OrderedRingBuffer[T]) -> None: + """Benchmark slicing. + + Takes a buffer, fills it up and then excessively gets + the data for each day to calculate the average/median. + """ + print(".", end="", flush=True) + fill_buffer(days, buffer, float) + + # Chose uneven starting point so that for the first/last window data has to + # be copied + basetime = datetime(2022, 1, 1, 0, 5, 13, 88) + + total_avg = 0.0 + total_median = 0.0 + + for _ in range(5): + for day in range(days): + minutes = buffer.window( + basetime + timedelta(days=day), basetime + timedelta(days=day + 1) + ) + + total_avg += float(np.average(minutes)) + total_median += float(np.median(minutes)) + + +def test_29_days_list() -> None: + """Run the 29 day test on the list backend.""" + test_days(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60)) + + +def test_29_days_array() -> None: + """Run the 29 day test on the array backend.""" + test_days( + 29, + OrderedRingBuffer( + np.empty( + shape=MINUTES_IN_29_DAYS, + ), + 60, + ), + ) + + +def test_29_days_slicing_list() -> None: + """Run slicing tests on list backend.""" + test_slices(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60)) + + +def test_29_days_slicing_array() -> None: + """Run slicing tests on array backend.""" + test_slices( + 29, + OrderedRingBuffer( + np.empty( + shape=MINUTES_IN_29_DAYS, + ), + 60, + ), + ) + + +def main() -> None: + """Run benchmark. + + Result of previous run: + + Date: Do 22. Dez 15:03:05 CET 2022 + Result: + + ========================================= + Array: ........................................ + List: ........................................ + Time to fill 29 days with data: + Array: 0.09411649959984061 seconds + List: 0.0906366748000437 seconds + Diff: 0.0034798247997969156 + ========================================= + Array: ........................................ + List: ........................................ + Filling 29 days and running average & mean on every day: + Array: 0.09842290654996759 seconds + List: 0.1316629376997298 seconds + Diff: -0.03324003114976222 + """ + num_runs = 40 + + print(f" {''.join(['='] * (num_runs + 1))}") + print("Array: ", end="") + duration_array = timeit.Timer(test_29_days_array).timeit(number=num_runs) + print("\nList: ", end="") + duration_list = timeit.Timer(test_29_days_list).timeit(number=num_runs) + print("") + + print( + "Time to fill 29 days with data:\n\t" + + f"Array: {duration_array/num_runs} seconds\n\t" + + f"List: {duration_list/num_runs} seconds\n\t" + + f"Diff: {duration_array/num_runs - duration_list/num_runs}" + ) + + print(f" {''.join(['='] * (num_runs + 1))}") + print("Array: ", end="") + duration_array = timeit.Timer(test_29_days_slicing_array).timeit(number=num_runs) + print("\nList: ", end="") + duration_list = timeit.Timer(test_29_days_slicing_list).timeit(number=num_runs) + print("") + + print( + "Filling 29 days and running average & mean on every day:\n\t" + + f"Array: {duration_array/num_runs} seconds\n\t" + + f"List: {duration_list/num_runs} seconds\n\t" + + f"Diff: {duration_array/num_runs - duration_list/num_runs}" + ) + + +if __name__ == "__main__": + main() diff --git a/src/frequenz/sdk/util/ringbuffer.py b/src/frequenz/sdk/util/ringbuffer.py new file mode 100644 index 000000000..00f75d2d1 --- /dev/null +++ b/src/frequenz/sdk/util/ringbuffer.py @@ -0,0 +1,332 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Ringbuffer implementation with focus on time & memory efficiency.""" + +from __future__ import annotations + +from copy import deepcopy +from datetime import datetime, timedelta +from typing import Any, Generic, Sequence, TypeVar, Union + +import numpy as np + +T = TypeVar("T") + +Container = Union[list, np.ndarray] + + +class RingBuffer(Generic[T]): + """A ring buffer with a fixed size. + + Should work with most backends, tested with list and np.ndarrays. + """ + + def __init__(self, container: Container) -> None: + """Initialize the ring buffer with the given container. + + Args: + container: Container to store the data in. + """ + self._container = container + self._write_index = 0 + self._read_index = 0 + self._len = 0 + + def __len__(self) -> int: + """Get current amount of elements. + + Returns: + The amount of items that this container currently holds. + """ + return self._len + + @property + def maxlen(self) -> int: + """Get the max length. + + Returns: + The max amount of items this container can hold. + """ + return len(self._container) + + def push(self, value: T) -> int: + """Push a new value into the ring buffer. + + Args: + value: Value to push into the ring buffer. + + Returns: + The index in the ringbuffer. + """ + if self._len == len(self._container): + # Move read position one forward, dropping the oldest written value + self._read_index = self._wrap(self._read_index + 1) + else: + self._len += 1 + + self._container[self._write_index] = value + value_index = self._write_index + self._write_index = self._wrap(self._write_index + 1) + + return value_index + + def pop(self) -> T: + """Remove the oldest value from the ring buffer and return it. + + Raises: + IndexError: when no elements exist to pop. + + Returns: + Oldest value found in the ring buffer. + """ + if self._len == 0: + raise IndexError() + + val = self._container[self._read_index] + self._read_index = (self._read_index + 1) % len(self._container) + self._len -= 1 + + return val + + @property + def is_full(self) -> bool: + """Check if the container is full. + + Returns: + True when the container is full. + """ + return len(self) == len(self._container) + + def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None: + """Write the given value to the requested position. + + Args: + index: Position to write the value to. + value: Value to write. + """ + if isinstance(index, int): + self._container[self._wrap(index + self._read_index)] = value + else: + wstart = self._wrap(index.start) + wstop = self._wrap(index.stop) + if wstop < wstart: + self._container[wstart:] = value[:self.maxlen-wstart] + self._container[:wstop] = value[self.maxlen-wstart:] + else + self._container[wstart:wstop] = value + + + + def __getitem__(self, index_or_slice: int | slice) -> T | Container: + """Request a value or slice. + + Does not support wrap-around or copying of data. + + Args: + index_or_slice: Index or slice specification of the requested data + + Returns: + the value at the given index or value range at the given slice. + """ + return self._container[index_or_slice] + + def _wrap(self, index: int) -> int: + return index % len(self._container) + + +class OrderedRingBuffer(Generic[T]): + """Time aware ringbuffer that keeps its entries sorted time.""" + + def __init__( + self, + buffer: Any, + sample_rate: int, + window_border: datetime = datetime(1, 1, 1), + ) -> None: + """Initialize the time aware ringbuffer. + + Args: + buffer: instance of a buffer container to use internally + sample_rate: resolution of the incoming timestamps in + seconds + window_border: datetime depicting point in time to use as border + beginning, useful to make data start at the beginning of the day or + hour. + """ + self._buffer = buffer + self._sample_rate = sample_rate + self._window_start = window_border + + self._missing_windows = [] + self._datetime_newest = datetime.min + self._datetime_oldest = datetime.max + + @property + def maxlen(self) -> int: + """Get the max length. + + Returns: + the max amount of items this container can hold. + """ + return len(self._buffer) + + def update(self, timestamp: datetime, value: T, missing: bool = False) -> None: + """Update the buffer with a new value for the given timestamp. + + Args: + timestamp: Timestamp of the new value + value: value to add + missing: if true, the given timestamp will be recorded as missing. + The value will still be written. + + Returns: + Nothing. + """ + # Update timestamps + self._datetime_newest = max(self._datetime_newest, timestamp) + self._datetime_oldest = min(self._datetime_oldest, timestamp) + + if self._datetime_oldest < self._datetime_newest - timedelta( + seconds=len(self._buffer) * self._sample_rate + ): + self._datetime_oldest = self._datetime_newest - timedelta( + len(self._buffer) * self._sample_rate + ) + + # Update data + insert_index = self.datetime_to_index(timestamp) + + self._buffer[insert_index] = value + + # Update list of missing windows + # + # We always append to the last pending window. + # A window is pending when end is None + if missing: + # Create new if no pending window + if ( + len(self._missing_windows) == 0 + or self._missing_windows[-1].end is not None + ): + self._missing_windows.append({"start": timestamp, "end": None}) + elif len(self._missing_windows) > 0: + # Finalize a pending window + if self._missing_windows[-1].end is None: + self._missing_windows[-1].end = timestamp + + # Delete out-to-date windows + if len(self._missing_windows) > 0 and self._missing_windows[0].end is not None: + if self._missing_windows[0].end <= self._datetime_oldest: + self._missing_windows = self._missing_windows[1:] + + def datetime_to_index(self, timestamp: datetime) -> int: + """Convert the given timestamp to an index. + + Throws an index error when the timestamp is not found within this + buffer. + + Args: + timestamp: Timestamp to convert. + + Raises: + IndexError: when requesting a timestamp outside the range this container holds + + Returns: + index where the value for the given timestamp can be found. + """ + if self._datetime_newest < timestamp or timestamp < self._datetime_oldest: + raise IndexError( + f"Requested timestamp {timestamp} is is " + f"outside the range [{self._datetime_oldest} - {self._datetime_newest}]" + ) + + return self._wrap(int(abs((self._window_start - timestamp).total_seconds()))) + + def window(self, start: datetime, end: datetime, force_copy: bool = False) -> Container: + """Request a view on the data between start timestamp and end timestamp. + + Will return a copy in the following cases: + * The requested time period is crossing the start/end of the buffer + * The requested time period contains missing entries. + * The force_copy parameter was set to True (default False) + + This means, if the caller needs to modify the data to account for + missing entries, they can safely do so. + + Args: + start: start time of the window + end: end time of the window + + Returns: + the requested window + """ + assert start < end + + start_index = self.datetime_to_index(start) + end_index = self.datetime_to_index(end) + + # Requested window wraps around the ends + if start_index > end_index: + window = self._buffer[start_index:] + + if end_index > 0: + if isinstance(self._buffer, list): + window += self._buffer[0:end_index] + else: + window = np.concatenate((window, self._buffer[0:end_index])) + return window + + def in_window(window): + if window.start <= start < window.end: + return True + if window.start <= end < window.end: + return True + + return False + + # Return a copy if there are none-values in the data + if any(map(in_window, self._missing_windows)) or force_copy: + return deepcopy(self._buffer[start_index:end_index]) + + return self._buffer[start_index:end_index] + + def _wrap(self, index: int) -> int: + """Normalize the given index to fit in the buffer by wrapping it around. + + Args: + index: index to normalize. + + Returns: + an index that will be within maxlen. + """ + return index % self.maxlen + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + """Get item or slice at requested position. + + No wrapping of the index will be done. + + Args: + index_or_slice: Index or slice specification of the requested data. + + Returns: + The requested value or slice. + """ + return self._buffer[index_or_slice] + + def __len__(self) -> int: + """Return the amount of items that this container currently holds. + + Returns: + The length. + """ + if self._datetime_newest == datetime.min: + return 0 + + start_index = self.datetime_to_index(self._datetime_oldest) + end_index = self.datetime_to_index(self._datetime_newest) + + if end_index < start_index: + return len(self._buffer) - start_index + end_index + return start_index - end_index diff --git a/tests/utils/ringbuffer.py b/tests/utils/ringbuffer.py new file mode 100644 index 000000000..915de7d12 --- /dev/null +++ b/tests/utils/ringbuffer.py @@ -0,0 +1,140 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Tests for the `Ringbuffer` class.""" + +from __future__ import annotations + +import random +from datetime import datetime +from itertools import cycle, islice +from typing import TypeVar + +import numpy as np +import pytest + +from frequenz.sdk.util.ringbuffer import OrderedRingBuffer, RingBuffer + +T = TypeVar("T") + + +@pytest.mark.parametrize( + "buffer", + [ + RingBuffer[int]([0] * 50000), + RingBuffer[int](np.empty(shape=(50000,), dtype=np.float64)), + ], +) +def test_simple_push_pop(buffer: RingBuffer[int]) -> None: + """Test simple pushing/popping of RingBuffer.""" + for i in range(buffer.maxlen): + buffer.push(i) + + assert len(buffer) == buffer.maxlen + + for i in range(buffer.maxlen): + assert i == buffer.pop() + + +@pytest.mark.parametrize( + "buffer", + [ + RingBuffer[int]([0] * 50000), + RingBuffer[int](np.empty(shape=(50000,), dtype=np.float64)), + ], +) +def test_push_pop_over_limit(buffer: RingBuffer[int]) -> None: + """Test pushing over the limit and the expected loss of data.""" + over_limit_pushes = 1000 + + for i in range(buffer.maxlen + over_limit_pushes): + buffer.push(i) + + assert len(buffer) == buffer.maxlen + + for i in range(buffer.maxlen): + assert i + over_limit_pushes == buffer.pop() + + assert len(buffer) == 0 + + +@pytest.mark.parametrize( + "buffer, element_type", + [ + (RingBuffer[int]([0] * 5000), int), + (RingBuffer[float](np.empty(shape=(5000,), dtype=np.float64)), float), + ], +) +def test_slicing(buffer: RingBuffer[T], element_type: type) -> None: + """Test slicing method.""" + for i in range(buffer.maxlen): + buffer.push(element_type(i)) + + # Wrap in extra list() otherwise pytest complains about numpy arrays + # pylint: disable=protected-access + assert list(buffer._container) == list(range(buffer.maxlen)) + + +@pytest.mark.parametrize( + "buffer", + [ + OrderedRingBuffer[int]([0] * 24, 1), + OrderedRingBuffer[float](np.empty(shape=(24,), dtype=np.float64), 1), + ], +) +def test_timestamp_ringbuffer(buffer: OrderedRingBuffer[float | int]) -> None: + """Test ordered ring buffer.""" + size = buffer.maxlen + + # import pdb; pdb.set_trace() + random.seed(0) + + # Push in random order + for i in random.sample(range(size), size): + buffer.update(datetime.fromtimestamp(200 + i), i) + + # Check all possible window sizes and start positions + for i in range(size): + for j in range(1, size): + start = datetime.fromtimestamp(200 + i) + end = datetime.fromtimestamp(200 + j + i) + + tmp = list(islice(cycle(range(0, size)), i, i + j)) + assert list(buffer.window(start, end)) == list(tmp) + + +@pytest.mark.parametrize( + "buffer", + [ + (OrderedRingBuffer[float]([0] * 24, 1)), + (OrderedRingBuffer[float](np.empty(shape=(24,), dtype=np.float64), 1)), + ], +) +def test_timestamp_ringbuffer_overwrite(buffer: OrderedRingBuffer[float | int]) -> None: + """Test overwrite behavior and correctness.""" + size = buffer.maxlen + + # import pdb; pdb.set_trace() + random.seed(0) + + # Push in random order + for i in random.sample(range(size), size): + buffer.update(datetime.fromtimestamp(200 + i), i) + + # Push the same amount twice + for i in random.sample(range(size), size): + buffer.update(datetime.fromtimestamp(200 + i), i * 2) + + # Check all possible window sizes and start positions + for i in range(size): + for j in range(1, size): + start = datetime.fromtimestamp(200 + i) + end = datetime.fromtimestamp(200 + j + i) + + tmp = islice(cycle(range(0, size * 2, 2)), i, i + j) + # assert list(buffer.window(start, end)) == list(tmp) + actual: float + for actual, expectation in zip(buffer.window(start, end), tmp): + assert actual == expectation + + assert j == len(buffer.window(start, end))