Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Marenz <mathias.baumann@frequenz.com>
- Loading branch information
Showing
2 changed files
with
155 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
# License: MIT | ||
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH | ||
|
||
"""Ringbuffer implementation with focus on time & memory efficiency""" | ||
|
||
from typing import TypeVar, Generic, Sequence | ||
from abc import ABC, abstractmethod | ||
|
||
from collections import deque | ||
|
||
from enum import Enum | ||
|
||
import numpy as np | ||
from itertools import chain | ||
|
||
|
||
T = TypeVar('T') | ||
|
||
class AbstractRingBufferContainer(Generic[T], ABC): | ||
def __init__(self, size): | ||
self.size = size | ||
|
||
@abstractmethod | ||
def write(self, value: T, index) -> None: | ||
"""Write to value to the requested index""" | ||
pass | ||
|
||
@abstractmethod | ||
def __getitem__(self, index): | ||
"""Returns the value at the requested index""" | ||
pass | ||
|
||
def __len__(self): | ||
return self.size | ||
|
||
class RingBuffer(Generic[T]): | ||
|
||
# class syntax | ||
class Overwrite(Enum): | ||
GREEN = 2 | ||
BLUE = 3 | ||
|
||
def __init__(self, container: AbstractRingBufferContainer[T]): | ||
self.container = container | ||
self.write_index = 0 | ||
self.read_index = 0 | ||
self.size = 0 | ||
|
||
def __len__(self): | ||
return self.size | ||
|
||
def push(self, value: T) -> int: | ||
"""Pushes a new value into the ring buffer, returns the index in the ringbuffer""" | ||
|
||
if self.size == self.container.size: | ||
# Move read position one forward, dropping the oldest written value | ||
self.read_index = (self.read_index + 1) % self.container.size | ||
else: | ||
self.size += 1 | ||
|
||
self.container.write(value, self.write_index) | ||
value_index = self.write_index | ||
self.write_index = (self.write_index + 1) % self.container.size | ||
|
||
return value_index | ||
|
||
def pop(self) -> T: | ||
"""Remove the oldest value from the ring buffer and return it""" | ||
|
||
if self.size == 0: | ||
raise | ||
|
||
val = self.container[self.read_index] | ||
self.read_index = (self.read_index + 1) % self.container.size | ||
self.size -= 1; | ||
|
||
return val | ||
|
||
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: | ||
"""Returns the value at the given index or value range at the given slice""" | ||
return self.container[index_or_slice] | ||
|
||
|
||
class ListContainer(AbstractRingBufferContainer, ABC): | ||
def __init__(self, size: int): | ||
AbstractRingBufferContainer.__init__(self, size) | ||
self.data = list([T] * size) | ||
|
||
|
||
def write(self, value: T, index: int) -> None: | ||
"""""" | ||
self.data[index] = value | ||
|
||
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: | ||
return self.data.__getitem__(index_or_slice) | ||
|
||
|
||
class ArrayContainer(AbstractRingBufferContainer, ABC): | ||
def __init__(self, size: int): | ||
AbstractRingBufferContainer.__init__(self, size) | ||
if (T == float): | ||
self.data = np.empty(shape=(size,), dtype=np.float64) | ||
else: | ||
self.data = np.empty(shape=(size,), dtype=object) | ||
|
||
def write(self, value: T, index: int) -> None: | ||
"""""" | ||
self.data[index] = value | ||
|
||
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]: | ||
return self.data.__getitem__(index_or_slice) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# License: MIT | ||
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH | ||
|
||
""" | ||
Tests for the `Ringbuffer` class | ||
""" | ||
|
||
from frequenz.sdk.util.ringbuffer import RingBuffer, ArrayContainer, ListContainer | ||
|
||
def _test_simple_push_pop(container): | ||
buffer = RingBuffer(container) | ||
|
||
for i in range(len(container)): | ||
buffer.push(i) | ||
|
||
assert len(buffer) == len(container) | ||
|
||
for i in range(len(container)): | ||
assert i == buffer.pop() | ||
|
||
def test_simple_push_pop() -> None: | ||
_test_simple_push_pop(ListContainer(50000)) | ||
_test_simple_push_pop(ArrayContainer(50000)) | ||
|
||
|
||
|
||
def _test_push_pop_over_limit(container): | ||
buffer = RingBuffer(container) | ||
|
||
over_limit_pushes = 1000 | ||
|
||
for i in range(len(container) + over_limit_pushes): | ||
buffer.push(i) | ||
|
||
assert len(buffer) == len(container) | ||
|
||
for i in range(len(container)): | ||
assert i + over_limit_pushes == buffer.pop() | ||
|
||
assert len(buffer) == 0 | ||
|
||
def test_push_pop_over_limit() -> None: | ||
_test_push_pop_over_limit(ListContainer(50000)) | ||
_test_push_pop_over_limit(ArrayContainer(50000)) |