-
Notifications
You must be signed in to change notification settings - Fork 17
/
ringbuffer.py
111 lines (78 loc) · 3.07 KB
/
ringbuffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
"""Ringbuffer implementation with focus on time & memory efficiency"""
from typing import TypeVar, Generic, Sequence
from abc import ABC, abstractmethod
from collections import deque
from enum import Enum
import numpy as np
from itertools import chain
T = TypeVar('T')
class AbstractRingBufferContainer(Generic[T], ABC):
def __init__(self, size):
self.size = size
@abstractmethod
def write(self, value: T, index) -> None:
"""Write to value to the requested index"""
pass
@abstractmethod
def __getitem__(self, index):
"""Returns the value at the requested index"""
pass
def __len__(self):
return self.size
class RingBuffer(Generic[T]):
# class syntax
class Overwrite(Enum):
GREEN = 2
BLUE = 3
def __init__(self, container: AbstractRingBufferContainer[T]):
self.container = container
self.write_index = 0
self.read_index = 0
self.size = 0
def __len__(self):
return self.size
def push(self, value: T) -> int:
"""Pushes a new value into the ring buffer, returns the index in the ringbuffer"""
if self.size == self.container.size:
# Move read position one forward, dropping the oldest written value
self.read_index = (self.read_index + 1) % self.container.size
else:
self.size += 1
self.container.write(value, self.write_index)
value_index = self.write_index
self.write_index = (self.write_index + 1) % self.container.size
return value_index
def pop(self) -> T:
"""Remove the oldest value from the ring buffer and return it"""
if self.size == 0:
raise
val = self.container[self.read_index]
self.read_index = (self.read_index + 1) % self.container.size
self.size -= 1;
return val
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
"""Returns the value at the given index or value range at the given slice"""
return self.container[index_or_slice]
class ListContainer(AbstractRingBufferContainer, ABC):
def __init__(self, size: int):
AbstractRingBufferContainer.__init__(self, size)
self.data = list([T] * size)
def write(self, value: T, index: int) -> None:
""""""
self.data[index] = value
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
return self.data.__getitem__(index_or_slice)
class ArrayContainer(AbstractRingBufferContainer, ABC):
def __init__(self, size: int):
AbstractRingBufferContainer.__init__(self, size)
if (T == float):
self.data = np.empty(shape=(size,), dtype=np.float64)
else:
self.data = np.empty(shape=(size,), dtype=object)
def write(self, value: T, index: int) -> None:
""""""
self.data[index] = value
def __getitem__(self, index_or_slice: int | slice) -> T | Sequence[T]:
return self.data.__getitem__(index_or_slice)