Skip to content

Commit

Permalink
Simple ringbuffer with tests
Browse files Browse the repository at this point in the history
Signed-off-by: Marenz <mathias.baumann@frequenz.com>
  • Loading branch information
Marenz authored and Mathias L. Baumann committed Dec 20, 2022
1 parent ef43a71 commit 2057ab9
Show file tree
Hide file tree
Showing 3 changed files with 537 additions and 0 deletions.
135 changes: 135 additions & 0 deletions benchmarks/benchmark_ringbuffer.py
@@ -0,0 +1,135 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""
Performance test for the `Ringbuffer` class
"""

import random
from datetime import datetime, timedelta

import numpy as np

from frequenz.sdk.util.ringbuffer import (
ArrayContainer,
ListContainer,
OrderedRingBuffer,
RingBuffer,
)

MINUTES_IN_29_DAYS = 29 * 24 * 60


def fill_buffer(days, buffer):
"""Fill the given buffer up to the given amount of days, one sample per
minute"""
random.seed(0)
basetime = datetime(2022, 1, 1)

for day in range(days):
# Push in random order
for i in random.sample(range(24 * 60), 24 * 60):
buffer.update(basetime + timedelta(days=day, minutes=i, seconds=i % 3), i)


# Fills a buffer completely up and then gets the data for each of the 29 days
def test_days(days, buffer):
print(".", end="", flush=True)

fill_buffer(days, buffer)

basetime = datetime(2022, 1, 1)

for day in range(days):
minutes = buffer.window(
basetime + timedelta(days=day), basetime + timedelta(days=day + 1)
)

# Takes a buffer, fills it up and then excessively gets the data for each day to
# calculate the average/median.
def test_slices(days, buffer):
print(".", end="", flush=True)
fill_buffer(days, buffer)

# Chose uneven starting point so that for the first/last window data has to
# be copied
basetime = datetime(2022, 1, 1, 0, 5, 13, 88)

total = 0

for _ in range(5):
for day in range(days):
minutes = buffer.window(
basetime + timedelta(days=day), basetime + timedelta(days=day + 1)
)

total += np.average(minutes)
(f"{basetime + timedelta(days=day)} average {np.average(minutes)}")
(f"{basetime + timedelta(days=day)} median {np.median(minutes)}")
# print(total)


# Fills a buffer completely up and then gets the data for each of the 29 days
def test_29_days_list():
test_days(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60))


def test_29_days_array():
test_days(
29,
OrderedRingBuffer(
np.empty(
shape=MINUTES_IN_29_DAYS,
),
60,
),
)


def test_29_days_slicing_list():
test_slices(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60))


def test_29_days_slicing_array():
test_slices(
29,
OrderedRingBuffer(
np.empty(
shape=MINUTES_IN_29_DAYS,
),
60,
),
)


import timeit

num_runs = 40

print(f" {''.join(['='] * (num_runs + 1))}")
print("Array: ", end="")
duration_array = timeit.Timer(test_29_days_array).timeit(number=num_runs)
print("\nList: ", end="")
duration_list = timeit.Timer(test_29_days_list).timeit(number=num_runs)
print("")

print(
f"Time to fill 29 days with data:\n\t"
+ f"Array: {duration_array/num_runs} seconds\n\t"
+ f"List: {duration_list/num_runs} seconds\n\t"
+ f"Diff: {duration_array/num_runs - duration_list/num_runs}"
)

print(f" {''.join(['='] * (num_runs + 1))}")
print("Array: ", end="")
duration_array = timeit.Timer(test_29_days_slicing_array).timeit(number=num_runs)
print("\nList: ", end="")
duration_list = timeit.Timer(test_29_days_slicing_list).timeit(number=num_runs)
print("")

print(
f"Filling 29 days and running average & mean on every day:\n\t"
+ f"Array: {duration_array/num_runs} seconds\n\t"
+ f"List: {duration_list/num_runs} seconds\n\t"
+ f"Diff: {duration_array/num_runs - duration_list/num_runs}"
)
240 changes: 240 additions & 0 deletions src/frequenz/sdk/util/ringbuffer.py
@@ -0,0 +1,240 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Ringbuffer implementation with focus on time & memory efficiency"""

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Generic, Sequence, TypeVar

import numpy as np

T = TypeVar("T")

def RingBufferArray(size: int):
return RingBuffer[T](np.empty(shape=(size,), dtype=np.float64))



class RingBuffer(Generic[T]):
def __init__(self, container: Any):
"""Initialize the ring buffer with the given container.
Args:
container: Container to store the data in.
"""
self.container = container
self.write_index = 0
self.read_index = 0
self.size = 0

def __len__(self):
"""Return the amount of items that this container currently holds."""
return self.size

def max_size(self):
"""Return the max amount of items this container can hold"""
return len(self.container)

def push(self, value: T) -> int:
"""Pushes a new value into the ring buffer, returns the index in the
ringbuffer.
Args:
value: value to push into the ring buffer.
"""

if self.size == len(self.container):
# Move read position one forward, dropping the oldest written value
self.read_index = (self.read_index + 1) % len(self.container)
else:
self.size += 1

self.container[self.write_index] = value
value_index = self.write_index
self.write_index = (self.write_index + 1) % len(self.container)

return value_index

def pop(self) -> T:
"""Remove the oldest value from the ring buffer and return it."""

if self.size == 0:
raise

val = self.container[self.read_index]
self.read_index = (self.read_index + 1) % len(self.container)
self.size -= 1

return val

def wrap(self, index) -> int:
return index % len(self.container)

def full(self) -> bool:
return len(self) == len(self.container)

def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
self.container[index] = value

def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
"""Returns the value at the given index or value range at the given
slice. Does not support wrap-around or copying of data."""
return self.container[index_or_slice]


class OrderedRingBuffer(Generic[T]):
"""Time aware ringbuffer that keeps its entries sorted time."""

def __init__(
self,
buffer: Any,
resolution_in_seconds: int,
window_border: datetime = datetime(1, 1, 1),
) -> None:
"""Initialize the time aware ringbuffer.
Args:
buffer: instance of a buffer container to use internally
resolution_in_seconds: resolution of the incoming timestamps in
seconds
window_border: datetime depicting point in time to use as border
beginning, useful to make data start at the beginning of the day or
hour.
"""
self.buffer = buffer
self.resolution_in_seconds = resolution_in_seconds
self.window_start = window_border

self.missing_windows = []
self.datetime_newest = datetime.min
self.datetime_oldest = datetime.max

def max_size(self):
"""Return the max amount of items this container can hold."""
return len(self.buffer)

def update(self, datetime: datetime, value: T, missing: bool = False):
"""Update the buffer with a new value for the given timestamp.
Args:
datetime: Timestamp of the new value
value: value to add
missing: if true, the given timestamp will be recorded as missing.
The value will still be written.
"""
# Update timestamps
if self.datetime_newest < datetime:
self.datetime_newest = datetime

if self.datetime_oldest > datetime:
self.datetime_oldest = datetime
elif self.datetime_oldest < self.datetime_newest - timedelta(
seconds=len(self.buffer) * self.resolution_in_seconds
):
self.datetime_oldest = self.datetime_newest - timedelta(
len(self.buffer) * self.resolution_in_seconds
)

# Update data
insert_index = self.datetime_to_index(datetime)

self.buffer[insert_index] = value

# Update list of missing windows
#
# We always append to the last pending window.
# A window is pending when end == None
if missing:
# Create new if no pending window
if len(self.missing_windows) == 0 or self.missing_windows[-1].end != None:
self.missing_windows.append({"start": datetime, "end": None})
elif len(self.missing_windows) > 0:
# Finalize a pending window
if self.missing_windows[-1].end == None:
self.missing_windows[-1].end = datetime

# Delete out-to-date windows
if len(self.missing_windows) > 0 and self.missing_windows[0].end != None:
if self.missing_windows[0].end <= self.datetime_oldest:
self.missing_windows = self.missing_windows[1:]

def datetime_to_index(self, datetime: datetime) -> int:
"""Convert the given timestamp to an index.
Throws an index error when the timestamp is not found within this
buffer.
Args:
datetime: Timestamp to convert.
Returns:
index where the value for the given timestamp can be found.
"""
if datetime < self.datetime_oldest:
raise IndexError()

return self.wrap(int(abs((self.window_start - datetime).total_seconds())))

def window(self, start: datetime, end: datetime) -> Any:
"""Request a view on the data between start timestamp and end timestamp.
Will return a copy in the following cases:
* The requested time period is crossing the start/end of the buffer
* The requested time period contains missing entries.
This means, if the caller needs to modify the data to account for
missing entries, they can safely do so.
Args:
start: start time of the window
end: end time of the window
Returns:
the requested window
"""

assert start < end

start_index = self.datetime_to_index(start)
end_index = self.datetime_to_index(end)

# Requested window wraps around the ends
if start_index > end_index:
slice = self.buffer[start_index:]

if end_index > 0:
if type(self.buffer) == list:
slice += self.buffer[0:end_index]
else:
slice = np.concatenate((slice, self.buffer[0:end_index]))
return slice

def in_window(window):
if window.start < start and window.end > start:
return True
if window.start < end and window.end > end:
return True

return False

# Return a copy if there are none-values in the data
if any(map(in_window, self.missing_windows)):
import copy

return copy.deepcopy(self.buffer[start_index:end_index])

return self.buffer[start_index:end_index]

def wrap(self, index: int):
"""Normalizes the given index to fit in the buffer by wrapping it
around.
Args:
index: index to normalize.
Returns:
an index that will be within max_size.
"""
return index % self.max_size()

def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
return self.buffer[index_or_slice]

0 comments on commit 2057ab9

Please sign in to comment.