Skip to content

Commit

Permalink
Merge branch 'v0.x.x' into simple-ringbuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
mathias-baumann-frequenz committed Dec 27, 2022
2 parents ffa7b7c + 9f074e2 commit 0464117
Show file tree
Hide file tree
Showing 18 changed files with 238 additions and 171 deletions.
4 changes: 0 additions & 4 deletions RELEASE_NOTES.md
Expand Up @@ -8,10 +8,6 @@

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->

- Add fine-grained Result types for the PowerDistributingActor.
Previously Result was one class with many fields. Now each result has its own class
that derives from Result parent class.

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
Expand Down
52 changes: 52 additions & 0 deletions benchmarks/timeseries/resampling.py
@@ -0,0 +1,52 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Benchmark resampling."""

from datetime import datetime, timedelta, timezone
from timeit import timeit
from typing import Sequence

from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper


def nop( # pylint: disable=unused-argument
samples: Sequence[Sample], resampling_period_s: float
) -> float:
"""Return 0.0."""
return 0.0


def _benchmark_resampling_helper(resamples: int, samples: int) -> None:
"""Benchmark the resampling helper."""
helper = _ResamplingHelper(
ResamplerConfig(
resampling_period_s=1.0,
max_data_age_in_periods=3.0,
resampling_function=nop,
initial_buffer_len=samples * 3,
)
)
now = datetime.now(timezone.utc)

def _do_work() -> None:
nonlocal now
for _n_resample in range(resamples):
for _n_sample in range(samples):
now = now + timedelta(seconds=1 / samples)
helper.add_sample(Sample(now, 0.0))
helper.resample(now)

print(timeit(_do_work, number=5))


def _benchmark() -> None:
for resamples in [10, 100, 1000]:
for samples in [10, 100, 1000]:
print(f"{resamples=} {samples=}")
_benchmark_resampling_helper(resamples, samples)


if __name__ == "__main__":
_benchmark()
6 changes: 3 additions & 3 deletions examples/resampling.py
Expand Up @@ -18,7 +18,7 @@
)
from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId
from frequenz.sdk.timeseries import Sample
from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source
from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source

HOST = "microgrid.sandbox.api.frequenz.io"
PORT = 61060
Expand Down Expand Up @@ -65,7 +65,7 @@ async def run() -> None: # pylint: disable=too-many-locals
channel_registry=channel_registry,
data_sourcing_request_sender=data_source_request_sender,
resampling_request_receiver=resampling_request_receiver,
resampling_period_s=1,
config=ResamplerConfig(resampling_period_s=1),
)

components = await microgrid.get().api_client.components()
Expand Down Expand Up @@ -104,7 +104,7 @@ async def run() -> None: # pylint: disable=too-many-locals
# Create a channel to calculate an average for all the data
average_chan = Broadcast[Sample]("average")

second_stage_resampler = Resampler(resampling_period_s=3.0)
second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0))
second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample)

average_sender = average_chan.new_sender()
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"sympy >= 1.10.1, < 2",
"toml >= 0.10",
"tqdm >= 4.38.0, < 5",
"typing_extensions >= 4.4.0, < 5",
"watchfiles >= 0.15.0",
]
dynamic = [ "version" ]
Expand Down
3 changes: 2 additions & 1 deletion src/frequenz/sdk/actor/__init__.py
Expand Up @@ -7,13 +7,14 @@
from ._config_managing import ConfigManagingActor
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
from ._decorator import actor
from ._resampling import ComponentMetricsResamplingActor
from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig

__all__ = [
"ChannelRegistry",
"ComponentMetricRequest",
"ComponentMetricsResamplingActor",
"ConfigManagingActor",
"DataSourcingActor",
"ResamplerConfig",
"actor",
]
33 changes: 4 additions & 29 deletions src/frequenz/sdk/actor/_resampling.py
Expand Up @@ -14,12 +14,7 @@
from frequenz.sdk.util.asyncio import cancel_and_await

from ..timeseries import Sample
from ..timeseries._resampling import (
Resampler,
ResamplingError,
ResamplingFunction,
average,
)
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
from ._channel_registry import ChannelRegistry
from ._data_sourcing import ComponentMetricRequest
from ._decorator import actor
Expand All @@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments
channel_registry: ChannelRegistry,
data_sourcing_request_sender: Sender[ComponentMetricRequest],
resampling_request_receiver: Receiver[ComponentMetricRequest],
resampling_period_s: float = 0.2,
max_data_age_in_periods: float = 3.0,
resampling_function: ResamplingFunction = average,
config: ResamplerConfig,
) -> None:
"""Initialize an instance.
Expand All @@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments
to subscribe to component metrics.
resampling_request_receiver: The receiver to use to receive new
resampmling subscription requests.
resampling_period_s: The time it passes between resampled data
should be calculated (in seconds).
max_data_age_in_periods: The maximum age a sample can have to be
considered *relevant* for resampling purposes, expressed in the
number of resampling periods. For exapmle is
`resampling_period_s` is 3 and `max_data_age_in_periods` is 2,
then data older than `3*2 = 6` secods will be discarded when
creating a new sample and never passed to the resampling
function.
resampling_function: The function to be applied to the sequence of
*relevant* samples at a given time. The result of the function
is what is sent as the resampled data.
config: The configuration for the resampler.
"""
self._channel_registry: ChannelRegistry = channel_registry
self._resampling_period_s: float = resampling_period_s
self._max_data_age_in_periods: float = max_data_age_in_periods
self._resampling_function: ResamplingFunction = resampling_function
self._data_sourcing_request_sender: Sender[
ComponentMetricRequest
] = data_sourcing_request_sender
self._resampling_request_receiver: Receiver[
ComponentMetricRequest
] = resampling_request_receiver
self._resampler: Resampler = Resampler(
resampling_period_s=resampling_period_s,
max_data_age_in_periods=max_data_age_in_periods,
resampling_function=resampling_function,
)
self._resampler: Resampler = Resampler(config)
self._active_req_channels: set[str] = set()

async def _subscribe(self, request: ComponentMetricRequest) -> None:
Expand Down
15 changes: 11 additions & 4 deletions src/frequenz/sdk/timeseries/_base_types.py
Expand Up @@ -3,12 +3,16 @@

"""Timeseries basic types."""

from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
# Ordering by timestamp is a bit arbitrary, and it is not always what might be
# wanted. We are using this order now because usually we need to do binary
# searches on sequences of samples, and the Python `bisect` module doesn't
# support providing a key until Python 3.10.
@dataclass(frozen=True, order=True)
class Sample:
"""A measurement taken at a particular point in time.
Expand All @@ -17,5 +21,8 @@ class Sample:
coherent view on a group of component metrics for a particular timestamp.
"""

timestamp: datetime
value: Optional[float] = None
timestamp: datetime = field(compare=True)
"""The time when this sample was generated."""

value: Optional[float] = field(compare=False, default=None)
"""The value of this sample."""

0 comments on commit 0464117

Please sign in to comment.