Skip to content

Commit

Permalink
Simple ringbuffer with tests
Browse files Browse the repository at this point in the history
Signed-off-by: Marenz <mathias.baumann@frequenz.com>
  • Loading branch information
Marenz authored and Mathias L. Baumann committed Dec 22, 2022
1 parent ef43a71 commit 123807e
Show file tree
Hide file tree
Showing 2 changed files with 412 additions and 0 deletions.
275 changes: 275 additions & 0 deletions src/frequenz/sdk/util/ringbuffer.py
@@ -0,0 +1,275 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Ringbuffer implementation with focus on time & memory efficiency."""

from __future__ import annotations

from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, Generic, Sequence, TypeVar

import numpy as np

T = TypeVar("T")

Container = list | np.ndarray


class RingBuffer(Generic[T]):
"""A ring buffer with a fixed size.
Should work with most backends, tested with list and np.ndarrays.
"""

class NoElements(Exception):
"""Exception class, thrown when no elements are available when calling pop()."""

def __init__(self, container: Container) -> None:
"""Initialize the ring buffer with the given container.
Args:
container: Container to store the data in.
"""
self._container = container
self._write_index = 0
self._read_index = 0
self._len = 0

def __len__(self) -> int:
"""Return the amount of items that this container currently holds."""
return self._len

@property
def maxlen(self) -> int:
"""Return the max amount of items this container can hold."""
return len(self._container)

def push(self, value: T) -> int:
"""Push a new value into the ring buffer.
Args:
value: value to push into the ring buffer.
Returns:
the index in the ringbuffer.
"""
if self._len == len(self._container):
# Move read position one forward, dropping the oldest written value
self._read_index = self._wrap(self._read_index + 1)
else:
self._len += 1

self._container[self._write_index] = value
value_index = self._write_index
self._write_index = self._wrap(self._write_index + 1)

return value_index

def pop(self) -> T:
"""Remove the oldest value from the ring buffer and return it."""
if self._len == 0:
raise RingBuffer.NoElements()

val = self._container[self._read_index]
self._read_index = (self._read_index + 1) % len(self._container)
self._len -= 1

return val

@property
def full(self) -> bool:
"""Return true when the container is full."""
return len(self) == len(self._container)

def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
"""Write the given value to the requested position.
Args:
index: Position to write the value to.
value: Value to write.
"""
self._container[index] = value

def __getitem__(self, index_or_slice: int | slice) -> T | Container:
"""Return the value at the given index or value range at the given slice.
Does not support wrap-around or copying of data.
"""
return self._container[index_or_slice]

def _wrap(self, index: int) -> int:
return index % len(self._container)


class OrderedRingBuffer(Generic[T]):
"""Time aware ringbuffer that keeps its entries sorted time."""

def __init__(
self,
buffer: Any,
resolution_in_seconds: int,
window_border: datetime = datetime(1, 1, 1),
) -> None:
"""Initialize the time aware ringbuffer.
Args:
buffer: instance of a buffer container to use internally
resolution_in_seconds: resolution of the incoming timestamps in
seconds
window_border: datetime depicting point in time to use as border
beginning, useful to make data start at the beginning of the day or
hour.
"""
self._buffer = buffer
self._resolution_in_seconds = resolution_in_seconds
self._window_start = window_border

self._missing_windows = []
self._datetime_newest = datetime.min
self._datetime_oldest = datetime.max

@property
def maxlen(self) -> int:
"""Return the max amount of items this container can hold."""
return len(self._buffer)

def update(self, timestamp: datetime, value: T, missing: bool = False) -> None:
"""Update the buffer with a new value for the given timestamp.
Args:
datetime: Timestamp of the new value
value: value to add
missing: if true, the given timestamp will be recorded as missing.
The value will still be written.
"""
# Update timestamps
self._datetime_newest = max(self._datetime_newest, timestamp)
self._datetime_oldest = min(self._datetime_oldest, timestamp)

if self._datetime_oldest < self._datetime_newest - timedelta(
seconds=len(self._buffer) * self._resolution_in_seconds
):
self._datetime_oldest = self._datetime_newest - timedelta(
len(self._buffer) * self._resolution_in_seconds
)

# Update data
insert_index = self.datetime_to_index(timestamp)

self._buffer[insert_index] = value

# Update list of missing windows
#
# We always append to the last pending window.
# A window is pending when end is None
if missing:
# Create new if no pending window
if (
len(self._missing_windows) == 0
or self._missing_windows[-1].end is not None
):
self._missing_windows.append({"start": timestamp, "end": None})
elif len(self._missing_windows) > 0:
# Finalize a pending window
if self._missing_windows[-1].end is None:
self._missing_windows[-1].end = timestamp

# Delete out-to-date windows
if len(self._missing_windows) > 0 and self._missing_windows[0].end is not None:
if self._missing_windows[0].end <= self._datetime_oldest:
self._missing_windows = self._missing_windows[1:]

def datetime_to_index(self, timestamp: datetime) -> int:
"""Convert the given timestamp to an index.
Throws an index error when the timestamp is not found within this
buffer.
Args:
datetime: Timestamp to convert.
Returns:
index where the value for the given timestamp can be found.
"""
if timestamp < self._datetime_oldest:
raise IndexError(
f"Requested timestamp {timestamp} is older "
f"than the oldest this container holds: {self._datetime_oldest}"
)

return self._wrap(int(abs((self._window_start - timestamp).total_seconds())))

def window(self, start: datetime, end: datetime) -> Container:
"""Request a view on the data between start timestamp and end timestamp.
Will return a copy in the following cases:
* The requested time period is crossing the start/end of the buffer
* The requested time period contains missing entries.
This means, if the caller needs to modify the data to account for
missing entries, they can safely do so.
Args:
start: start time of the window
end: end time of the window
Returns:
the requested window
"""
assert start < end

start_index = self.datetime_to_index(start)
end_index = self.datetime_to_index(end)

# Requested window wraps around the ends
if start_index > end_index:
window = self._buffer[start_index:]

if end_index > 0:
if isinstance(self._buffer, list):
window += self._buffer[0:end_index]
else:
window = np.concatenate((window, self._buffer[0:end_index]))
return window

def in_window(window):
if window.start <= start < window.end:
return True
if window.start <= end < window.end:
return True

return False

# Return a copy if there are none-values in the data
if any(map(in_window, self._missing_windows)):
return deepcopy(self._buffer[start_index:end_index])

return self._buffer[start_index:end_index]

def _wrap(self, index: int) -> int:
"""Normalize the given index to fit in the buffer by wrapping it around.
Args:
index: index to normalize.
Returns:
an index that will be within max_size.
"""
return index % self.maxlen

def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
"""Get item or slice at requested position."""
return self._buffer[index_or_slice]

def __len__(self) -> int:
"""Return the amount of items that this container currently holds."""
if self._datetime_newest == datetime.min:
return 0

start_index = self.datetime_to_index(self._datetime_oldest)
end_index = self.datetime_to_index(self._datetime_newest)

if end_index < start_index:
return len(self._buffer) - start_index + end_index
return start_index - end_index

0 comments on commit 123807e

Please sign in to comment.