From 16ad023cfe3a9883bd18766ad94f5c867b244e23 Mon Sep 17 00:00:00 2001 From: Marenz Date: Tue, 13 Dec 2022 17:40:00 +0100 Subject: [PATCH] Simple ringbuffer with tests Signed-off-by: Marenz --- src/frequenz/sdk/util/ringbuffer.py | 111 ++++++++++++++++++++++++++++ tests/utils/ringbuffer.py | 44 +++++++++++ 2 files changed, 155 insertions(+) create mode 100644 src/frequenz/sdk/util/ringbuffer.py create mode 100644 tests/utils/ringbuffer.py diff --git a/src/frequenz/sdk/util/ringbuffer.py b/src/frequenz/sdk/util/ringbuffer.py new file mode 100644 index 000000000..95588acc3 --- /dev/null +++ b/src/frequenz/sdk/util/ringbuffer.py @@ -0,0 +1,111 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Ringbuffer implementation with focus on time & memory efficiency""" + +from typing import TypeVar, Generic, Sequence +from abc import ABC, abstractmethod + +from collections import deque + +from enum import Enum + +import numpy as np +from itertools import chain + + +T = TypeVar('T') + +class AbstractRingBufferContainer(Generic[T], ABC): + def __init__(self, size): + self.size = size + + @abstractmethod + def write(self, value: T, index) -> None: + """Write to value to the requested index""" + pass + + @abstractmethod + def __getitem__(self, index): + """Returns the value at the requested index""" + pass + + def __len__(self): + return self.size + +class RingBuffer(Generic[T]): + + # class syntax + class Overwrite(Enum): + GREEN = 2 + BLUE = 3 + + def __init__(self, container: AbstractRingBufferContainer[T]): + self.container = container + self.write_index = 0 + self.read_index = 0 + self.size = 0 + + def __len__(self): + return self.size + + def push(self, value: T) -> int: + """Pushes a new value into the ring buffer, returns the index in the ringbuffer""" + + if self.size == self.container.size: + # Move read position one forward, dropping the oldest written value + self.read_index = (self.read_index + 1) % self.container.size + else: + self.size += 1 + + self.container.write(value, self.write_index) + value_index = self.write_index + self.write_index = (self.write_index + 1) % self.container.size + + return value_index + + def pop(self) -> T: + """Remove the oldest value from the ring buffer and return it""" + + if self.size == 0: + raise + + val = self.container[self.read_index] + self.read_index = (self.read_index + 1) % self.container.size + self.size -= 1; + + return val + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + """Returns the value at the given index or value range at the given slice""" + return self.container[index_or_slice] + + +class ListContainer(AbstractRingBufferContainer, ABC): + def __init__(self, size: int): + AbstractRingBufferContainer.__init__(self, size) + self.data = list([T] * size) + + + def write(self, value: T, index: int) -> None: + """""" + self.data[index] = value + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + return self.data.__getitem__(index_or_slice) + + +class ArrayContainer(AbstractRingBufferContainer, ABC): + def __init__(self, size: int): + AbstractRingBufferContainer.__init__(self, size) + if (T == float): + self.data = np.empty(shape=(size,), dtype=np.float64) + else: + self.data = np.empty(shape=(size,), dtype=object) + + def write(self, value: T, index: int) -> None: + """""" + self.data[index] = value + + def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: + return self.data.__getitem__(index_or_slice) diff --git a/tests/utils/ringbuffer.py b/tests/utils/ringbuffer.py new file mode 100644 index 000000000..74e85765e --- /dev/null +++ b/tests/utils/ringbuffer.py @@ -0,0 +1,44 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +""" +Tests for the `Ringbuffer` class +""" + +from frequenz.sdk.util.ringbuffer import RingBuffer, ArrayContainer, ListContainer + +def _test_simple_push_pop(container): + buffer = RingBuffer(container) + + for i in range(len(container)): + buffer.push(i) + + assert len(buffer) == len(container) + + for i in range(len(container)): + assert i == buffer.pop() + +def test_simple_push_pop() -> None: + _test_simple_push_pop(ListContainer(50000)) + _test_simple_push_pop(ArrayContainer(50000)) + + + +def _test_push_pop_over_limit(container): + buffer = RingBuffer(container) + + over_limit_pushes = 1000 + + for i in range(len(container) + over_limit_pushes): + buffer.push(i) + + assert len(buffer) == len(container) + + for i in range(len(container)): + assert i + over_limit_pushes == buffer.pop() + + assert len(buffer) == 0 + +def test_push_pop_over_limit() -> None: + _test_push_pop_over_limit(ListContainer(50000)) + _test_push_pop_over_limit(ArrayContainer(50000))