diff --git a/benchmarks/benchmark_ringbuffer.py b/benchmarks/benchmark_ringbuffer.py new file mode 100644 index 000000000..1b79753a8 --- /dev/null +++ b/benchmarks/benchmark_ringbuffer.py @@ -0,0 +1,107 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +""" +Performance test for the `Ringbuffer` class +""" + +from frequenz.sdk.util.ringbuffer import RingBuffer, ArrayContainer, ListContainer, OrderedRingBuffer + +import numpy as np +import random +from datetime import datetime, timedelta + +MINUTES_IN_29_DAYS=29 * 24 * 60 + +def fill_buffer(days, buffer): + """Fill the given buffer up to the given amount of days, one sample per + minute""" + random.seed(0) + basetime = datetime(2022, 1, 1) + + for day in range(days): + # Push in random order + for i in random.sample(range(24 * 60), 24 * 60): + buffer.update(basetime + timedelta(days=day, minutes=i, seconds=i%3), i) + + +# Fills a buffer completely up and then gets the data for each of the 29 days +def test_days(days, buffer): + print(".", end="", flush=True) + + fill_buffer(days, buffer) + + basetime = datetime(2022, 1, 1) + + for day in range(days): + minutes = (buffer.window(basetime + timedelta(days=day), + basetime+timedelta(days=day+1))) + + #(f"{basetime + timedelta(days=day)} median {np.average(minutes)}") + #print("%s avg: %s" % (basetime + timedelta(days=day), + # sum(minutes) / len(minutes))) + +# Takes a buffer, fills it up and then excessively gets the data for each day to +# calculate the average/median. +def test_slices(days, buffer): + print(".", end="", flush=True) + fill_buffer(days, buffer) + + # Chose uneven starting point so that for the first/last window data has to + # be copied + basetime = datetime(2022, 1, 1, 0, 5, 13, 88) + + total = 0 + + for _ in range(5): + for day in range(days): + minutes = (buffer.window(basetime + timedelta(days=day), + basetime+timedelta(days=day+1))) + + + total += np.average(minutes) + (f"{basetime + timedelta(days=day)} average {np.average(minutes)}") + (f"{basetime + timedelta(days=day)} median {np.median(minutes)}") + #print(total) + +# Fills a buffer completely up and then gets the data for each of the 29 days +def test_29_days_list(): + test_days(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60)) + +def test_29_days_array(): + test_days(29, OrderedRingBuffer(np.empty(shape=MINUTES_IN_29_DAYS,), 60)) + + +def test_29_days_slicing_list(): + test_slices(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60)) + +def test_29_days_slicing_array(): + test_slices(29, OrderedRingBuffer(np.empty(shape=MINUTES_IN_29_DAYS,), 60)) +import timeit + +num_runs = 10 + +print(f" {''.join(['='] * (num_runs + 1))}") +print("Array: ", end="") +duration = timeit.Timer(test_29_days_array).timeit(number = num_runs) +print("\nList: ", end="") +durationa = timeit.Timer(test_29_days_list).timeit(number = num_runs) +print("") + +print(f'Time to fill 29 days with data:\n\t' + + f'Array: {duration/num_runs} seconds\n\t' + + f'List: {durationa/num_runs} seconds\n\t' + + f'Diff: {duration/num_runs - durationa/num_runs}') + +print(f" {''.join(['='] * (num_runs + 1))}") +print("Array: ", end="") +duration = timeit.Timer(test_29_days_slicing_array).timeit(number = num_runs) +print("\nList: ", end="") +durationa = timeit.Timer(test_29_days_slicing_list).timeit(number = num_runs) +print("") + +print(f'Time to fill 29 days with data:\n\t' + + f'Array: {duration/num_runs} seconds\n\t' + + f'List: {durationa/num_runs} seconds\n\t' + + f'Diff: {duration/num_runs - durationa/num_runs}') + diff --git a/src/frequenz/sdk/util/ringbuffer.py b/src/frequenz/sdk/util/ringbuffer.py new file mode 100644 index 000000000..bf6326a71 --- /dev/null +++ b/src/frequenz/sdk/util/ringbuffer.py @@ -0,0 +1,258 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Ringbuffer implementation with focus on time & memory efficiency""" + +from typing import TypeVar, Generic, Sequence, MutableSequence, Optional, Any +from abc import ABC, abstractmethod + +from collections import deque + +from enum import Enum, auto + +import numpy as np +from itertools import chain + +T = TypeVar('T') + +class AbstractRingBufferContainer(Generic[T], ABC): + def __init__(self, size): + self.size = size + + @abstractmethod + def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None: + """Write to value to the requested index. + + Args: + index: Position to write the value at. + value: Value to write. + """ + pass + + @abstractmethod + def __getitem__(self, index: int | slice) -> T: + """Returns the value at the requested index. + + Args: + index: Position to write the value at. + + Returns: + The requested item. + """ + pass + + def __len__(self): + return self.size + +class RingBuffer(Generic[T]): + + def __init__(self, container: AbstractRingBufferContainer[T]): + """Initialize the ring buffer with the given container. + + Args: + container: Container to store the data in. + """ + self.container = container + self.write_index = 0 + self.read_index = 0 + self.size = 0 + + def __len__(self): + """Return the amount of items that this container currently holds.""" + return self.size + + def max_size(self): + """Return the max amount of items this container can hold""" + return len(self.container) + + def push(self, value: T) -> int: + """Pushes a new value into the ring buffer, returns the index in the + ringbuffer. + + Args: + value: value to push into the ring buffer. + """ + + if self.size == len(self.container): + # Move read position one forward, dropping the oldest written value + self.read_index = (self.read_index + 1) % self.container.size + else: + self.size += 1 + + self.container[self.write_index] = value + value_index = self.write_index + self.write_index = (self.write_index + 1) % self.container.size + + return value_index + + def pop(self) -> T: + """Remove the oldest value from the ring buffer and return it.""" + + if self.size == 0: + raise + + val = self.container[self.read_index] + self.read_index = (self.read_index + 1) % self.container.size + self.size -= 1; + + return val + + def wrap(self, index) -> int: + return index % len(self.container) + + def full(self) -> bool: + return len(self) == len(self.container) + + def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None: + self.container[index] = value + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + """Returns the value at the given index or value range at the given + slice. Does not support wrap-around or copying of data.""" + return self.container[index_or_slice] + + +class ListContainer(AbstractRingBufferContainer, ABC): + def __init__(self, size: int): + AbstractRingBufferContainer.__init__(self, size) + self.data = list([T] * size) + + def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None: + """""" + self.data.__setitem__(index, value) + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + return self.data.__getitem__(index_or_slice) + + +class ArrayContainer(AbstractRingBufferContainer, ABC): + def __init__(self, size: int): + AbstractRingBufferContainer.__init__(self, size) + if (T == float): + self.data = np.empty(shape=(size,), dtype=np.float64) + else: + self.data = np.empty(shape=(size,), dtype=object) + + def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None: + """""" + self.data[index] = value + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + return self.data.__getitem__(index_or_slice) + + +from datetime import datetime, timedelta + +class OrderedRingBuffer(Generic[T]): + """Time aware ringbuffer that keeps its entries sorted time. + """ + + def __init__( + self, + buffer: Any, + resolution_in_seconds: int, + window_border: datetime = datetime(1, 1, 1), + ) -> None: + """Initialize the time aware ringbuffer. + + Args: + buffer: instance of a buffer container to use internally + resolution_in_seconds: resolution of the incoming timestamps in + seconds + window_border: datetime depicting point in time to use as border + beginning, useful to make data start at the beginning of the day or + hour. + """ + self.buffer = buffer + self.resolution_in_seconds = resolution_in_seconds + self.window_start = window_border + + self.missing_windows = [] + self.datetime_newest = datetime.min + self.datetime_oldest = datetime.max + + def max_size(self): + """Return the max amount of items this container can hold""" + return len(self.buffer) + + + def wrap(self, value): + return value % self.max_size() + + # will be called with None + def update(self, datetime: datetime, value: T, missing: bool = False): + # Update timestamps + if self.datetime_newest < datetime: + self.datetime_newest = datetime + + #if self.datetime_newest - timedelta(seconds=self.resolution_in_seconds * self.max_size()) + + if self.datetime_oldest > datetime: + self.datetime_oldest = datetime + # TODO validate this if + elif self.datetime_oldest < self.datetime_newest - timedelta(seconds=len(self.buffer) * self.resolution_in_seconds): + self.datetime_oldest = self.datetime_newest - timedelta(len(self.buffer) * self.resolution_in_seconds) + + # Update data + insert_index = self.datetime_to_index(datetime) + + self.buffer[insert_index] = value + + # Update missing windows + # + # We always append to the last pending window. + # A window is pending when end == None + if missing: + # Create new if no pending window + if len(self.missing_windows) == 0 or self.missing_windows[-1].end != None: + self.missing_windows.append( + { 'start': datetime, 'end': None }) + elif len(self.missing_windows) > 0: + # Finalize a pending window + if self.missing_windows[-1].end == None: + self.missing_windows[-1].end = datetime + + # Delete out-to-date windows + if len(self.missing_windows) > 0 and self.missing_windows[0].end != None: + if self.missing_windows[0].end <= self.datetime_oldest: + self.missing_windows = self.missing_windows[1:] + + def datetime_to_index(self, datetime: datetime) -> int: + assert datetime >= self.datetime_oldest + return self.wrap(int(abs((self.window_start - datetime).total_seconds()))) + + def window(self, start: datetime, end: datetime) -> Any: + assert start < end + + start_index = self.datetime_to_index(start) + end_index = self.datetime_to_index(end) + + # Requested window wraps around the ends + if start_index > end_index: + slice = self.buffer[start_index:] + + if end_index > 0: + if type(self.buffer) == list: + slice += self.buffer[0:end_index] + else: + slice = np.concatenate((slice, self.buffer[0:end_index])) + return slice + + + def in_window(window): + if window.start < start and window.end > start: + return True + if window.start < end and window.end > end: + return True + + return False + + # Return a copy if there are none-values in the data + if any(map(in_window, self.missing_windows)): + import copy + return copy.deepcopy(self.buffer[start_index:end_index]) + + return self.buffer[start_index:end_index] + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + return self.buffer[index_or_slice] diff --git a/tests/utils/ringbuffer.py b/tests/utils/ringbuffer.py new file mode 100644 index 000000000..ac39b29ec --- /dev/null +++ b/tests/utils/ringbuffer.py @@ -0,0 +1,134 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +""" +Tests for the `Ringbuffer` class +""" + +from frequenz.sdk.util.ringbuffer import RingBuffer, ArrayContainer, ListContainer, OrderedRingBuffer + +from datetime import datetime, timedelta + +import random + +def _test_simple_push_pop(container): + buffer = RingBuffer(container) + + for i in range(len(container)): + buffer.push(i) + + assert len(buffer) == len(container) + + for i in range(len(container)): + assert i == buffer.pop() + +def test_simple_push_pop_list() -> None: + _test_simple_push_pop(ListContainer(50000)) + +def test_simple_push_pop_array() -> None: + _test_simple_push_pop(ArrayContainer(50000)) + + +def _test_push_pop_over_limit(container): + buffer = RingBuffer(container) + + over_limit_pushes = 1000 + + for i in range(len(container) + over_limit_pushes): + buffer.push(i) + + assert len(buffer) == len(container) + + for i in range(len(container)): + assert i + over_limit_pushes == buffer.pop() + + assert len(buffer) == 0 + +def test_push_pop_over_limit_list() -> None: + _test_push_pop_over_limit(ListContainer(50000)) +def test_push_pop_over_limit_array() -> None: + _test_push_pop_over_limit(ArrayContainer(50000)) + + + +def _test_slicing(container): + buffer = RingBuffer(container) + + for i in range(len(container)): + buffer.push(i) + + # Wrap in extra list() otherwise pytest complains about numpy arrays + assert list(buffer[0:]) == list(range(len(container))) + +def test_simple_slicing_list() -> None: + _test_slicing(ListContainer(2000)) +def test_simple_slicing_array() -> None: + _test_slicing(ArrayContainer(2000)) + + +from typing import TypeVar, Type, Any + +def _test_timestamp_ringbuffer(buffer, make_container: Any) -> None: + size = buffer.max_size() + +# import pdb; pdb.set_trace() + random.seed(0) + + # Push in random order + for i in random.sample(range(size), size): + buffer.update(datetime.fromtimestamp(200 + i), i) + + + # Check all possible window sizes and start positions + for i in range(size): + for j in range(1, size): + start = datetime.fromtimestamp(200 + i) + end = datetime.fromtimestamp(200 + j + i) + from itertools import cycle, islice + tmp = list(islice(cycle(range(0, size)), i, i+j)) + assert list(buffer.window(start, end)) == list(tmp) + +#map(lambda x: datetime(1, 1, 1, second=x), range(size)) + + +def test_timestamp_ringbuffer_list() -> None: + _test_timestamp_ringbuffer(OrderedRingBuffer([0] * 24, 1), lambda x: list(x)) + +def test_timestamp_ringbuffer_array() -> None: + import numpy as np + _test_timestamp_ringbuffer(OrderedRingBuffer(np.empty(shape=(24,), dtype=np.float64), 1), lambda x: np.array(x)) + +def _test_timestamp_ringbuffer_overwrite(buffer, make_container: Any) -> None: + size = buffer.max_size() + +# import pdb; pdb.set_trace() + random.seed(0) + + # Push in random order + for i in random.sample(range(size), size): + buffer.update(datetime.fromtimestamp(200 + i), i) + + # Push the same amount twice + for i in random.sample(range(size), size): + buffer.update(datetime.fromtimestamp(200 + i), i*2) + + # Check all possible window sizes and start positions + for i in range(size): + for j in range(1, size): + start = datetime.fromtimestamp(200 + i) + end = datetime.fromtimestamp(200 + j + i) + from itertools import cycle, islice + tmp = islice(cycle(range(0, size*2, 2)), i, i+j) + #assert list(buffer.window(start, end)) == list(tmp) + for actual, expectation in zip(buffer.window(start, end), tmp): + assert actual == expectation + + assert j == len(buffer.window(start, end)) + + + +def test_timestamp_ringbuffer_overwrite_list() -> None: + _test_timestamp_ringbuffer_overwrite(OrderedRingBuffer([0] * 24, 1), lambda x: list(x)) +def test_timestamp_ringbuffer_overwrite_array() -> None: + import numpy as np + _test_timestamp_ringbuffer_overwrite(OrderedRingBuffer(np.empty(shape=(24,), dtype=np.float64), 1), lambda x: np.array(x))