diff --git a/benchmarks/benchmark_ringbuffer.py b/benchmarks/benchmark_ringbuffer.py
new file mode 100644
index 000000000..d9583692c
--- /dev/null
+++ b/benchmarks/benchmark_ringbuffer.py
@@ -0,0 +1,141 @@
+# License: MIT
+# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
+
+"""Performance test for the `OrderedRingBuffer` class.
+
+Compares a `list` and a numpy array as backing container, both for filling the
+buffer and for repeatedly reading day-sized windows out of it.
+"""
+
+import random
+import timeit
+from datetime import datetime, timedelta
+
+import numpy as np
+
+from frequenz.sdk.util.ringbuffer import OrderedRingBuffer, RingBuffer
+
+MINUTES_IN_DAY = 24 * 60
+MINUTES_IN_29_DAYS = 29 * MINUTES_IN_DAY
+NUM_RUNS = 40
+
+
+def fill_buffer(days, buffer):
+    """Fill the buffer with the given amount of days, one sample per minute.
+
+    The samples of each day are pushed in a seeded, reproducible random order.
+
+    Args:
+        days: number of days to fill.
+        buffer: buffer to fill through its `update()` method.
+    """
+    random.seed(0)
+    basetime = datetime(2022, 1, 1)
+
+    for day in range(days):
+        # Push in random order
+        for i in random.sample(range(MINUTES_IN_DAY), MINUTES_IN_DAY):
+            buffer.update(basetime + timedelta(days=day, minutes=i, seconds=i % 3), i)
+
+
+def test_days(days, buffer):
+    """Fill the buffer completely, then fetch the window of each single day.
+
+    Args:
+        days: number of days to fill and to read back.
+        buffer: buffer under test.
+    """
+    print(".", end="", flush=True)
+
+    fill_buffer(days, buffer)
+
+    basetime = datetime(2022, 1, 1)
+
+    for day in range(days):
+        # Only the lookup is timed, the returned data is not needed
+        buffer.window(
+            basetime + timedelta(days=day), basetime + timedelta(days=day + 1)
+        )
+
+
+def test_slices(days, buffer):
+    """Fill the buffer, then repeatedly compute average and median of each day.
+
+    Args:
+        days: number of days to fill and to read back.
+        buffer: buffer under test.
+    """
+    print(".", end="", flush=True)
+    fill_buffer(days, buffer)
+
+    # Choose an uneven starting point so that for the first/last window data
+    # has to be copied
+    basetime = datetime(2022, 1, 1, 0, 5, 13, 88)
+
+    for _ in range(5):
+        for day in range(days):
+            minutes = buffer.window(
+                basetime + timedelta(days=day), basetime + timedelta(days=day + 1)
+            )
+
+            # Only the computation is timed, the results are not needed
+            np.average(minutes)
+            np.median(minutes)
+
+
+def test_29_days_list():
+    """Benchmark filling and reading 29 days of data backed by a `list`."""
+    test_days(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60))
+
+
+def test_29_days_array():
+    """Benchmark filling and reading 29 days of data backed by a numpy array."""
+    test_days(29, OrderedRingBuffer(np.empty(shape=MINUTES_IN_29_DAYS), 60))
+
+
+def test_29_days_slicing_list():
+    """Benchmark statistics over 29 days of data backed by a `list`."""
+    test_slices(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60))
+
+
+def test_29_days_slicing_array():
+    """Benchmark statistics over 29 days of data backed by a numpy array."""
+    test_slices(29, OrderedRingBuffer(np.empty(shape=MINUTES_IN_29_DAYS), 60))
+
+
+def _compare(title, array_benchmark, list_benchmark):
+    """Time both benchmarks and print their average duration per run.
+
+    Args:
+        title: headline of the printed report.
+        array_benchmark: callable exercising the numpy backed buffer.
+        list_benchmark: callable exercising the list backed buffer.
+    """
+    # Ruler for the progress dots that every benchmark run prints
+    print(f"       {'=' * (NUM_RUNS + 1)}")
+    print("Array: ", end="")
+    duration_array = timeit.Timer(array_benchmark).timeit(number=NUM_RUNS)
+    print("\nList:  ", end="")
+    duration_list = timeit.Timer(list_benchmark).timeit(number=NUM_RUNS)
+    print("")
+
+    print(
+        f"{title}:\n\t"
+        f"Array: {duration_array / NUM_RUNS} seconds\n\t"
+        f"List:  {duration_list / NUM_RUNS} seconds\n\t"
+        f"Diff:  {duration_array / NUM_RUNS - duration_list / NUM_RUNS}"
+    )
+
+
+def main():
+    """Run all benchmarks and print the results."""
+    _compare("Time to fill 29 days with data", test_29_days_array, test_29_days_list)
+    _compare(
+        "Filling 29 days and running average & median on every day",
+        test_29_days_slicing_array,
+        test_29_days_slicing_list,
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/frequenz/sdk/util/ringbuffer.py b/src/frequenz/sdk/util/ringbuffer.py
new file mode 100644
index 000000000..ae5881234
--- /dev/null
+++ b/src/frequenz/sdk/util/ringbuffer.py
@@ -0,0 +1,314 @@
+# License: MIT
+# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
+
+"""Ringbuffer implementation with focus on time & memory efficiency."""
+
+import copy
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime, timedelta
+from typing import Any, Generic, Sequence, TypeVar
+
+import numpy as np
+
+T = TypeVar("T")
+
+
+class RingBuffer(Generic[T]):
+    """Fixed-size FIFO buffer that drops its oldest value when it is full."""
+
+    def __init__(self, container: Any) -> None:
+        """Initialize the ring buffer with the given container.
+
+        Args:
+            container: Container to store the data in. Its length is the
+                capacity of the ring buffer.
+        """
+        self.container = container
+        self.write_index = 0
+        self.read_index = 0
+        self.size = 0
+
+    def __len__(self) -> int:
+        """Return the amount of items that this container currently holds."""
+        return self.size
+
+    def max_size(self) -> int:
+        """Return the max amount of items this container can hold."""
+        return len(self.container)
+
+    def push(self, value: T) -> int:
+        """Push a new value into the ring buffer.
+
+        When the buffer is full, the oldest value is dropped to make room.
+
+        Args:
+            value: value to push into the ring buffer.
+
+        Returns:
+            the index in the container that the value was written to.
+        """
+        capacity = len(self.container)
+
+        if self.size == capacity:
+            # Move read position one forward, dropping the oldest written value
+            self.read_index = (self.read_index + 1) % capacity
+        else:
+            self.size += 1
+
+        value_index = self.write_index
+        self.container[value_index] = value
+        self.write_index = (value_index + 1) % capacity
+
+        return value_index
+
+    def pop(self) -> T:
+        """Remove the oldest value from the ring buffer and return it.
+
+        Returns:
+            the oldest value in the buffer.
+
+        Raises:
+            IndexError: when the buffer is empty.
+        """
+        if self.size == 0:
+            raise IndexError("pop from an empty ring buffer")
+
+        value = self.container[self.read_index]
+        self.read_index = (self.read_index + 1) % len(self.container)
+        self.size -= 1
+
+        return value
+
+    def wrap(self, index: int) -> int:
+        """Normalize the given index to fit in the buffer by wrapping it around.
+
+        Args:
+            index: index to normalize.
+
+        Returns:
+            an index that will be within max_size.
+        """
+        return index % len(self.container)
+
+    def full(self) -> bool:
+        """Return whether the buffer holds as many items as it can."""
+        return len(self) == len(self.container)
+
+    def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
+        """Write the value(s) directly into the container.
+
+        The read and write positions of the ring buffer are not changed.
+
+        Args:
+            index: index or slice of the container to write to.
+            value: value(s) to store.
+        """
+        self.container[index] = value
+
+    def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
+        """Return the value at the given index or the values of the given slice.
+
+        Does not support wrap-around or copying of data.
+
+        Args:
+            index_or_slice: index or slice of the container to read.
+
+        Returns:
+            the requested value(s), exactly as returned by the container.
+        """
+        return self.container[index_or_slice]
+
+
+def RingBufferArray(size: int) -> RingBuffer[float]:
+    """Create a ring buffer that is backed by a numpy `float64` array.
+
+    Args:
+        size: capacity of the ring buffer.
+
+    Returns:
+        an empty ring buffer that can hold up to `size` values.
+    """
+    return RingBuffer[float](np.empty(shape=(size,), dtype=np.float64))
+
+
+@dataclass
+class _MissingWindow:
+    """Period of time for which the entries were reported as missing."""
+
+    # Timestamp of the first missing entry
+    start: datetime
+    # Timestamp of the first entry after the gap, `None` while the gap is open
+    end: datetime | None = None
+
+    def overlaps(self, start: datetime, end: datetime) -> bool:
+        """Return whether this period intersects the period from start to end."""
+        return self.start < end and (self.end is None or self.end > start)
+
+
+class OrderedRingBuffer(Generic[T]):
+    """Time aware ringbuffer that keeps its entries sorted by time."""
+
+    def __init__(
+        self,
+        buffer: Any,
+        resolution_in_seconds: int,
+        window_border: datetime = datetime(1, 1, 1),
+    ) -> None:
+        """Initialize the time aware ringbuffer.
+
+        Args:
+            buffer: instance of a buffer container to use internally
+            resolution_in_seconds: resolution of the incoming timestamps in
+                seconds
+            window_border: datetime depicting point in time to use as border
+                beginning, useful to make data start at the beginning of the day
+                or hour.
+
+        Raises:
+            ValueError: when the resolution is not positive.
+        """
+        if resolution_in_seconds <= 0:
+            raise ValueError("resolution_in_seconds must be positive")
+
+        self.buffer = buffer
+        self.resolution_in_seconds = resolution_in_seconds
+        self.window_start = window_border
+
+        self.missing_windows: list[_MissingWindow] = []
+        self.datetime_newest = datetime.min
+        self.datetime_oldest = datetime.max
+
+    def max_size(self) -> int:
+        """Return the max amount of items this container can hold."""
+        return len(self.buffer)
+
+    def update(self, datetime: datetime, value: T, missing: bool = False) -> None:
+        """Update the buffer with a new value for the given timestamp.
+
+        Args:
+            datetime: Timestamp of the new value
+            value: value to add
+            missing: if true, the given timestamp will be recorded as missing.
+                The value will still be written.
+        """
+        # Time span that fits into the buffer
+        span = timedelta(seconds=len(self.buffer) * self.resolution_in_seconds)
+
+        # Update timestamps
+        if self.datetime_newest < datetime:
+            self.datetime_newest = datetime
+
+        if self.datetime_oldest > datetime:
+            self.datetime_oldest = datetime
+        elif self.datetime_oldest < self.datetime_newest - span:
+            self.datetime_oldest = self.datetime_newest - span
+
+        # Update data
+        self.buffer[self.datetime_to_index(datetime)] = value
+
+        # Update list of missing windows
+        #
+        # We always append to the last pending window.
+        # A window is pending when its end is None
+        pending = bool(self.missing_windows) and self.missing_windows[-1].end is None
+        if missing:
+            # Create new if no pending window
+            if not pending:
+                self.missing_windows.append(_MissingWindow(start=datetime))
+        elif pending:
+            # Finalize a pending window
+            self.missing_windows[-1].end = datetime
+
+        # Delete out-of-date windows
+        while self.missing_windows:
+            end = self.missing_windows[0].end
+            if end is None or end > self.datetime_oldest:
+                break
+            del self.missing_windows[0]
+
+    def datetime_to_index(self, datetime: datetime) -> int:
+        """Convert the given timestamp to an index.
+
+        Args:
+            datetime: Timestamp to convert.
+
+        Returns:
+            index where the value for the given timestamp can be found.
+
+        Raises:
+            IndexError: when the timestamp is older than the oldest entry of
+                this buffer.
+        """
+        if datetime < self.datetime_oldest:
+            raise IndexError(f"{datetime} is older than the oldest buffer entry")
+
+        # One slot per resolution step. The signed floor division keeps
+        # timestamps before the window border in chronological order.
+        resolution = timedelta(seconds=self.resolution_in_seconds)
+        return self.wrap((datetime - self.window_start) // resolution)
+
+    def window(self, start: datetime, end: datetime) -> Any:
+        """Request a view on the data between start timestamp and end timestamp.
+
+        Will return a copy in the following cases:
+        * The requested time period is crossing the start/end of the buffer
+        * The requested time period contains missing entries.
+
+        This means, if the caller needs to modify the data to account for
+        missing entries, they can safely do so.
+
+        Args:
+            start: start time of the window (inclusive)
+            end: end time of the window (exclusive)
+
+        Returns:
+            the requested window
+
+        Raises:
+            ValueError: when start is not before end.
+            IndexError: when start or end is older than the oldest entry of
+                this buffer.
+        """
+        if start >= end:
+            raise ValueError("start must be before end")
+
+        start_index = self.datetime_to_index(start)
+        end_index = self.datetime_to_index(end)
+
+        # Equal indexes mean either less than one resolution step (no data) or
+        # a window spanning the whole buffer, which wraps around as well.
+        resolution = timedelta(seconds=self.resolution_in_seconds)
+        wraps_around = start_index > end_index or (
+            start_index == end_index and end - start >= resolution
+        )
+
+        if wraps_around:
+            head = self.buffer[start_index:]
+            tail = self.buffer[:end_index]
+            if isinstance(self.buffer, list):
+                return head + tail
+            return np.concatenate((head, tail))
+
+        data = self.buffer[start_index:end_index]
+
+        # Return a copy if there are missing entries in the data
+        if any(gap.overlaps(start, end) for gap in self.missing_windows):
+            return copy.deepcopy(data)
+
+        return data
+
+    def wrap(self, index: int) -> int:
+        """Normalize the given index to fit in the buffer by wrapping it around.
+
+        Args:
+            index: index to normalize.
+
+        Returns:
+            an index that will be within max_size.
+        """
+        return index % self.max_size()
+
+    def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
+        """Return the value(s) stored at the given index or slice of the buffer."""
+        return self.buffer[index_or_slice]
diff --git a/tests/utils/ringbuffer.py b/tests/utils/ringbuffer.py
new file mode 100644
index 000000000..bfd048742
--- /dev/null
+++ b/tests/utils/ringbuffer.py
@@ -0,0 +1,206 @@
+# License: MIT
+# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
+
+"""Tests for the `RingBuffer` and `OrderedRingBuffer` classes."""
+
+# NOTE(review): pytest collects only `test_*.py` and `*_test.py` files by
+# default; confirm that this file name is picked up by the test run.
+
+import random
+from datetime import datetime, timedelta
+from itertools import cycle, islice
+from typing import Any, Type, TypeVar
+
+import numpy as np
+
+from frequenz.sdk.util.ringbuffer import (
+    OrderedRingBuffer,
+    RingBuffer,
+)
+
+
+def _test_simple_push_pop(container: Any) -> None:
+    """Fill the buffer completely and pop everything in insertion order."""
+    buffer = RingBuffer(container)
+
+    for i in range(len(container)):
+        buffer.push(i)
+
+    assert len(buffer) == len(container)
+
+    for i in range(len(container)):
+        assert i == buffer.pop()
+
+
+def test_simple_push_pop_list() -> None:
+    """Push and pop with a list as container."""
+    _test_simple_push_pop([0] * 50000)
+
+
+def test_simple_push_pop_array() -> None:
+    """Push and pop with a numpy array as container."""
+    _test_simple_push_pop(np.empty(shape=(50000,), dtype=np.float64))
+
+
+def _test_push_pop_over_limit(container: Any) -> None:
+    """Push more values than fit and check that only the newest ones remain."""
+    buffer = RingBuffer(container)
+
+    over_limit_pushes = 1000
+
+    for i in range(len(container) + over_limit_pushes):
+        buffer.push(i)
+
+    assert len(buffer) == len(container)
+
+    for i in range(len(container)):
+        assert i + over_limit_pushes == buffer.pop()
+
+    assert len(buffer) == 0
+
+
+def test_push_pop_over_limit_list() -> None:
+    """Overflow a buffer that uses a list as container."""
+    _test_push_pop_over_limit([0] * 50000)
+
+
+def test_push_pop_over_limit_array() -> None:
+    """Overflow a buffer that uses a numpy array as container."""
+    _test_push_pop_over_limit(np.empty(shape=(50000,), dtype=np.float64))
+
+
+def test_pop_empty_raises() -> None:
+    """Popping from an empty buffer raises an `IndexError`."""
+    buffer = RingBuffer([0] * 4)
+
+    try:
+        buffer.pop()
+    except IndexError:
+        return
+
+    raise AssertionError("pop() on an empty buffer did not raise IndexError")
+
+
+def _test_slicing(container: Any) -> None:
+    """Fill the buffer and read it back through a slice."""
+    buffer = RingBuffer(container)
+
+    for i in range(len(container)):
+        buffer.push(i)
+
+    # Wrap in extra list() otherwise pytest complains about numpy arrays
+    assert list(buffer[0:]) == list(range(len(container)))
+
+
+def test_simple_slicing_list() -> None:
+    """Slice a buffer that uses a list as container."""
+    _test_slicing([0] * 2000)
+
+
+def test_simple_slicing_array() -> None:
+    """Slice a buffer that uses a numpy array as container."""
+    _test_slicing(np.empty(shape=(2000,), dtype=np.float64))
+
+
+def _test_timestamp_ringbuffer(buffer: Any) -> None:
+    """Push one value per second in random order and check every window."""
+    size = buffer.max_size()
+
+    random.seed(0)
+
+    # Push in random order
+    for i in random.sample(range(size), size):
+        buffer.update(datetime.fromtimestamp(200 + i), i)
+
+    # Check all possible window sizes and start positions
+    for i in range(size):
+        start = datetime.fromtimestamp(200 + i)
+        for j in range(1, size):
+            end = datetime.fromtimestamp(200 + j + i)
+            expected = list(islice(cycle(range(size)), i, i + j))
+            assert list(buffer.window(start, end)) == expected
+
+
+def test_timestamp_ringbuffer_list() -> None:
+    """Check the windows of a buffer that uses a list as container."""
+    _test_timestamp_ringbuffer(OrderedRingBuffer([0] * 24, 1))
+
+
+def test_timestamp_ringbuffer_array() -> None:
+    """Check the windows of a buffer that uses a numpy array as container."""
+    _test_timestamp_ringbuffer(
+        OrderedRingBuffer(np.empty(shape=(24,), dtype=np.float64), 1)
+    )
+
+
+def _test_timestamp_ringbuffer_overwrite(buffer: Any) -> None:
+    """Write every timestamp twice and check that the newer values win."""
+    size = buffer.max_size()
+
+    random.seed(0)
+
+    # Push in random order
+    for i in random.sample(range(size), size):
+        buffer.update(datetime.fromtimestamp(200 + i), i)
+
+    # Push the same timestamps again, with different values
+    for i in random.sample(range(size), size):
+        buffer.update(datetime.fromtimestamp(200 + i), i * 2)
+
+    # Check all possible window sizes and start positions
+    for i in range(size):
+        start = datetime.fromtimestamp(200 + i)
+        for j in range(1, size):
+            end = datetime.fromtimestamp(200 + j + i)
+            expected = list(islice(cycle(range(0, size * 2, 2)), i, i + j))
+            assert list(buffer.window(start, end)) == expected
+
+
+def test_timestamp_ringbuffer_overwrite_list() -> None:
+    """Overwrite the values of a buffer that uses a list as container."""
+    _test_timestamp_ringbuffer_overwrite(OrderedRingBuffer([0] * 24, 1))
+
+
+def test_timestamp_ringbuffer_overwrite_array() -> None:
+    """Overwrite the values of a buffer that uses a numpy array as container."""
+    _test_timestamp_ringbuffer_overwrite(
+        OrderedRingBuffer(np.empty(shape=(24,), dtype=np.float64), 1)
+    )
+
+
+def test_timestamp_ringbuffer_resolution() -> None:
+    """Check that timestamps are mapped to one slot per resolution step."""
+    buffer = OrderedRingBuffer([0] * 10, 60)
+    base = datetime(2022, 1, 1)
+
+    for minute in range(10):
+        # A jitter below the resolution must not change the slot
+        buffer.update(base + timedelta(minutes=minute, seconds=minute % 3), minute)
+
+    assert list(buffer.window(base, base + timedelta(minutes=5))) == [0, 1, 2, 3, 4]
+    # A window that is as long as the buffer returns every entry
+    assert list(buffer.window(base, base + timedelta(minutes=10))) == list(range(10))
+
+
+def test_timestamp_ringbuffer_missing_entries() -> None:
+    """Check that only windows with missing entries are returned as copies."""
+    buffer = OrderedRingBuffer(np.zeros(shape=(10,), dtype=np.float64), 1)
+    base = datetime(2022, 1, 1)
+
+    for second in range(10):
+        buffer.update(
+            base + timedelta(seconds=second), second, missing=second in (3, 4)
+        )
+
+    assert len(buffer.missing_windows) == 1
+    assert buffer.missing_windows[0].start == base + timedelta(seconds=3)
+    assert buffer.missing_windows[0].end == base + timedelta(seconds=5)
+
+    with_gap = buffer.window(base + timedelta(seconds=2), base + timedelta(seconds=7))
+    without_gap = buffer.window(
+        base + timedelta(seconds=6), base + timedelta(seconds=9)
+    )
+
+    assert list(with_gap) == [2, 3, 4, 5, 6]
+    assert not np.shares_memory(with_gap, buffer.buffer)
+    assert np.shares_memory(without_gap, buffer.buffer)