Skip to content

Commit

Permalink
Simple ringbuffer with tests
Browse files Browse the repository at this point in the history
Signed-off-by: Marenz <mathias.baumann@frequenz.com>
  • Loading branch information
Marenz authored and Mathias L. Baumann committed Dec 20, 2022
1 parent ef43a71 commit 96de0fa
Show file tree
Hide file tree
Showing 3 changed files with 499 additions and 0 deletions.
107 changes: 107 additions & 0 deletions benchmarks/benchmark_ringbuffer.py
@@ -0,0 +1,107 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""
Performance test for the `Ringbuffer` class
"""

from frequenz.sdk.util.ringbuffer import RingBuffer, ArrayContainer, ListContainer, OrderedRingBuffer

import numpy as np
import random
from datetime import datetime, timedelta

MINUTES_IN_29_DAYS=29 * 24 * 60

def fill_buffer(days, buffer):
"""Fill the given buffer up to the given amount of days, one sample per
minute"""
random.seed(0)
basetime = datetime(2022, 1, 1)

for day in range(days):
# Push in random order
for i in random.sample(range(24 * 60), 24 * 60):
buffer.update(basetime + timedelta(days=day, minutes=i, seconds=i%3), i)


# Fills a buffer completely up and then gets the data for each of the 29 days
def test_days(days, buffer):
print(".", end="", flush=True)

fill_buffer(days, buffer)

basetime = datetime(2022, 1, 1)

for day in range(days):
minutes = (buffer.window(basetime + timedelta(days=day),
basetime+timedelta(days=day+1)))

#(f"{basetime + timedelta(days=day)} median {np.average(minutes)}")
#print("%s avg: %s" % (basetime + timedelta(days=day),
# sum(minutes) / len(minutes)))

# Takes a buffer, fills it up and then excessively gets the data for each day to
# calculate the average/median.
def test_slices(days, buffer):
print(".", end="", flush=True)
fill_buffer(days, buffer)

# Chose uneven starting point so that for the first/last window data has to
# be copied
basetime = datetime(2022, 1, 1, 0, 5, 13, 88)

total = 0

for _ in range(5):
for day in range(days):
minutes = (buffer.window(basetime + timedelta(days=day),
basetime+timedelta(days=day+1)))


total += np.average(minutes)
(f"{basetime + timedelta(days=day)} average {np.average(minutes)}")
(f"{basetime + timedelta(days=day)} median {np.median(minutes)}")
#print(total)

# Fills a buffer completely up and then gets the data for each of the 29 days
def test_29_days_list():
test_days(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60))

def test_29_days_array():
test_days(29, OrderedRingBuffer(np.empty(shape=MINUTES_IN_29_DAYS,), 60))


def test_29_days_slicing_list():
test_slices(29, OrderedRingBuffer([0] * MINUTES_IN_29_DAYS, 60))

def test_29_days_slicing_array():
test_slices(29, OrderedRingBuffer(np.empty(shape=MINUTES_IN_29_DAYS,), 60))
import timeit

num_runs = 40

print(f" {''.join(['='] * (num_runs + 1))}")
print("Array: ", end="")
duration = timeit.Timer(test_29_days_array).timeit(number = num_runs)
print("\nList: ", end="")
durationa = timeit.Timer(test_29_days_list).timeit(number = num_runs)
print("")

print(f'Time to fill 29 days with data:\n\t' +
f'Array: {duration/num_runs} seconds\n\t' +
f'List: {durationa/num_runs} seconds\n\t' +
f'Diff: {duration/num_runs - durationa/num_runs}')

print(f" {''.join(['='] * (num_runs + 1))}")
print("Array: ", end="")
duration = timeit.Timer(test_29_days_slicing_array).timeit(number = num_runs)
print("\nList: ", end="")
durationa = timeit.Timer(test_29_days_slicing_list).timeit(number = num_runs)
print("")

print(f'Filling 29 days and running average & mean on every day:\n\t' +
f'Array: {duration/num_runs} seconds\n\t' +
f'List: {durationa/num_runs} seconds\n\t' +
f'Diff: {duration/num_runs - durationa/num_runs}')

258 changes: 258 additions & 0 deletions src/frequenz/sdk/util/ringbuffer.py
@@ -0,0 +1,258 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Ringbuffer implementation with focus on time & memory efficiency"""

from typing import TypeVar, Generic, Sequence, MutableSequence, Optional, Any
from abc import ABC, abstractmethod

from collections import deque

from enum import Enum, auto

import numpy as np
from itertools import chain

T = TypeVar('T')

class AbstractRingBufferContainer(Generic[T], ABC):
def __init__(self, size):
self.size = size

@abstractmethod
def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
"""Write to value to the requested index.
Args:
index: Position to write the value at.
value: Value to write.
"""
pass

@abstractmethod
def __getitem__(self, index: int | slice) -> T:
"""Returns the value at the requested index.
Args:
index: Position to write the value at.
Returns:
The requested item.
"""
pass

def __len__(self):
return self.size

class RingBuffer(Generic[T]):

def __init__(self, container: AbstractRingBufferContainer[T]):
"""Initialize the ring buffer with the given container.
Args:
container: Container to store the data in.
"""
self.container = container
self.write_index = 0
self.read_index = 0
self.size = 0

def __len__(self):
"""Return the amount of items that this container currently holds."""
return self.size

def max_size(self):
"""Return the max amount of items this container can hold"""
return len(self.container)

def push(self, value: T) -> int:
"""Pushes a new value into the ring buffer, returns the index in the
ringbuffer.
Args:
value: value to push into the ring buffer.
"""

if self.size == len(self.container):
# Move read position one forward, dropping the oldest written value
self.read_index = (self.read_index + 1) % self.container.size
else:
self.size += 1

self.container[self.write_index] = value
value_index = self.write_index
self.write_index = (self.write_index + 1) % self.container.size

return value_index

def pop(self) -> T:
"""Remove the oldest value from the ring buffer and return it."""

if self.size == 0:
raise

val = self.container[self.read_index]
self.read_index = (self.read_index + 1) % self.container.size
self.size -= 1;

return val

def wrap(self, index) -> int:
return index % len(self.container)

def full(self) -> bool:
return len(self) == len(self.container)

def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
self.container[index] = value

def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
"""Returns the value at the given index or value range at the given
slice. Does not support wrap-around or copying of data."""
return self.container[index_or_slice]


class ListContainer(AbstractRingBufferContainer, ABC):
def __init__(self, size: int):
AbstractRingBufferContainer.__init__(self, size)
self.data = list([T] * size)

def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
""""""
self.data.__setitem__(index, value)

def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
return self.data.__getitem__(index_or_slice)


class ArrayContainer(AbstractRingBufferContainer, ABC):
def __init__(self, size: int):
AbstractRingBufferContainer.__init__(self, size)
if (T == float):
self.data = np.empty(shape=(size,), dtype=np.float64)
else:
self.data = np.empty(shape=(size,), dtype=object)

def __setitem__(self, index: int | slice, value: T | Sequence[T]) -> None:
""""""
self.data[index] = value

def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
return self.data.__getitem__(index_or_slice)


from datetime import datetime, timedelta

class OrderedRingBuffer(Generic[T]):
"""Time aware ringbuffer that keeps its entries sorted time.
"""

def __init__(
self,
buffer: Any,
resolution_in_seconds: int,
window_border: datetime = datetime(1, 1, 1),
) -> None:
"""Initialize the time aware ringbuffer.
Args:
buffer: instance of a buffer container to use internally
resolution_in_seconds: resolution of the incoming timestamps in
seconds
window_border: datetime depicting point in time to use as border
beginning, useful to make data start at the beginning of the day or
hour.
"""
self.buffer = buffer
self.resolution_in_seconds = resolution_in_seconds
self.window_start = window_border

self.missing_windows = []
self.datetime_newest = datetime.min
self.datetime_oldest = datetime.max

def max_size(self):
"""Return the max amount of items this container can hold"""
return len(self.buffer)


def wrap(self, value):
return value % self.max_size()

# will be called with None
def update(self, datetime: datetime, value: T, missing: bool = False):
# Update timestamps
if self.datetime_newest < datetime:
self.datetime_newest = datetime

#if self.datetime_newest - timedelta(seconds=self.resolution_in_seconds * self.max_size())

if self.datetime_oldest > datetime:
self.datetime_oldest = datetime
# TODO validate this if
elif self.datetime_oldest < self.datetime_newest - timedelta(seconds=len(self.buffer) * self.resolution_in_seconds):
self.datetime_oldest = self.datetime_newest - timedelta(len(self.buffer) * self.resolution_in_seconds)

# Update data
insert_index = self.datetime_to_index(datetime)

self.buffer[insert_index] = value

# Update missing windows
#
# We always append to the last pending window.
# A window is pending when end == None
if missing:
# Create new if no pending window
if len(self.missing_windows) == 0 or self.missing_windows[-1].end != None:
self.missing_windows.append(
{ 'start': datetime, 'end': None })
elif len(self.missing_windows) > 0:
# Finalize a pending window
if self.missing_windows[-1].end == None:
self.missing_windows[-1].end = datetime

# Delete out-to-date windows
if len(self.missing_windows) > 0 and self.missing_windows[0].end != None:
if self.missing_windows[0].end <= self.datetime_oldest:
self.missing_windows = self.missing_windows[1:]

def datetime_to_index(self, datetime: datetime) -> int:
assert datetime >= self.datetime_oldest
return self.wrap(int(abs((self.window_start - datetime).total_seconds())))

def window(self, start: datetime, end: datetime) -> Any:
assert start < end

start_index = self.datetime_to_index(start)
end_index = self.datetime_to_index(end)

# Requested window wraps around the ends
if start_index > end_index:
slice = self.buffer[start_index:]

if end_index > 0:
if type(self.buffer) == list:
slice += self.buffer[0:end_index]
else:
slice = np.concatenate((slice, self.buffer[0:end_index]))
return slice


def in_window(window):
if window.start < start and window.end > start:
return True
if window.start < end and window.end > end:
return True

return False

# Return a copy if there are none-values in the data
if any(map(in_window, self.missing_windows)):
import copy
return copy.deepcopy(self.buffer[start_index:end_index])

return self.buffer[start_index:end_index]

def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
return self.buffer[index_or_slice]

0 comments on commit 96de0fa

Please sign in to comment.