From a04c70cee8c3ad7ebaeec337f6304cd7594fc37c Mon Sep 17 00:00:00 2001 From: ela-kotulska-frequenz Date: Mon, 19 Dec 2022 19:44:23 +0100 Subject: [PATCH 1/9] Stop actor in test_decorator A non-stopped actor will raise the warning: `Task was destroyed but it is pending!` in the next test. Signed-off-by: ela-kotulska-frequenz --- tests/actor/test_decorator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/actor/test_decorator.py b/tests/actor/test_decorator.py index f81f9e039..72644307f 100644 --- a/tests/actor/test_decorator.py +++ b/tests/actor/test_decorator.py @@ -96,6 +96,8 @@ async def test_basic_actor() -> None: msg = await echo_rx.receive() assert msg is False + # pylint: disable=protected-access,no-member + await _echo_actor._stop() # type: ignore[attr-defined] async def test_actor_does_not_restart() -> None: From 01728befc9d2c0891dc0fe75194a5a9ccd332638 Mon Sep 17 00:00:00 2001 From: ela-kotulska-frequenz Date: Mon, 19 Dec 2022 20:58:01 +0100 Subject: [PATCH 2/9] Close MockServer (Fake server) gracefully in tests Previously only one of the 2 methods (stop, wait_for_termination) was called to shut down the server. But both should be called; otherwise the server will continue working. The server will then be destroyed by GC in a different test, causing misleading warnings. The `stop` method stops the server from servicing new RPCs. `wait_for_termination` blocks the current coroutine until the server stops. 
Signed-off-by: ela-kotulska-frequenz --- tests/actor/test_data_sourcing.py | 2 +- tests/microgrid/mock_api.py | 27 +++++++++++++++++++++++++-- tests/microgrid/test_client.py | 20 ++++++++++---------- tests/microgrid/test_graph.py | 2 +- tests/microgrid/test_mock_api.py | 4 ++-- tests/microgrid/test_timeout.py | 10 +++------- tests/timeseries/mock_microgrid.py | 2 +- 7 files changed, 43 insertions(+), 24 deletions(-) diff --git a/tests/actor/test_data_sourcing.py b/tests/actor/test_data_sourcing.py index 7481d186b..7ca9cd774 100644 --- a/tests/actor/test_data_sourcing.py +++ b/tests/actor/test_data_sourcing.py @@ -93,5 +93,5 @@ async def test_data_sourcing_actor(self) -> None: assert sample is not None assert 100.0 == sample.value - await server.stop(0.1) + assert await server.graceful_shutdown() _microgrid._MICROGRID = None # pylint: disable=protected-access diff --git a/tests/microgrid/mock_api.py b/tests/microgrid/mock_api.py index 0655ee47f..d3a6750d8 100644 --- a/tests/microgrid/mock_api.py +++ b/tests/microgrid/mock_api.py @@ -10,6 +10,8 @@ from __future__ import annotations +import asyncio + # pylint: disable=invalid-name,no-name-in-module,unused-import from concurrent import futures from typing import Iterable, Iterator, List, Optional, Tuple @@ -268,10 +270,31 @@ async def start(self) -> None: """Start the server.""" await self._server.start() - async def stop(self, grace: Optional[float]) -> None: + async def _stop(self, grace: Optional[float]) -> None: """Stop the server.""" await self._server.stop(grace) - async def wait_for_termination(self, timeout: Optional[float] = None) -> None: + async def _wait_for_termination(self, timeout: Optional[float] = None) -> None: """Wait for termination.""" await self._server.wait_for_termination(timeout) + + async def graceful_shutdown( + self, stop_timeout: float = 0.1, terminate_timeout: float = 0.2 + ) -> bool: + """Shut down the server gracefully. 
+ + Args: + stop_timeout: Argument for the self._stop method. + terminate_timeout: Argument for the self._wait_for_termination method. + + Returns: + True if the server was stopped within the given timeout, False otherwise. + """ + await self._stop(stop_timeout) + try: + await asyncio.wait_for( + self._wait_for_termination(None), timeout=terminate_timeout + ) + except asyncio.TimeoutError: + return False + return True diff --git a/tests/microgrid/test_client.py b/tests/microgrid/test_client.py index 5819be99b..9ba53c992 100644 --- a/tests/microgrid/test_client.py +++ b/tests/microgrid/test_client.py @@ -130,7 +130,7 @@ async def test_components(self) -> None: } finally: - await server.stop(grace=1.0) + assert await server.graceful_shutdown() async def test_connections(self) -> None: servicer = mock_api.MockMicrogridServicer() @@ -284,7 +284,7 @@ async def test_connections(self) -> None: } finally: - await server.stop(grace=1.0) + assert await server.graceful_shutdown() async def test_bad_connections(self) -> None: """Validate that the client does not apply connection filters itself.""" @@ -355,7 +355,7 @@ def ListAllComponents( ) finally: - await server.stop(grace=1.0) + assert await server.graceful_shutdown() async def test_meter_data(self) -> None: servicer = mock_api.MockMicrogridServicer() @@ -383,7 +383,7 @@ async def test_meter_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, MeterData) @@ -415,7 +415,7 @@ async def test_battery_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, BatteryData) @@ -447,7 +447,7 @@ async def test_inverter_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, InverterData) @@ -479,7 +479,7 @@ async def 
test_ev_charger_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, EVChargerData) @@ -506,7 +506,7 @@ async def test_charge(self) -> None: assert servicer.latest_charge.power_w == 12 finally: - await server.stop(0.1) + assert await server.graceful_shutdown() async def test_discharge(self) -> None: """Check if discharge is able to discharge component.""" @@ -528,7 +528,7 @@ async def test_discharge(self) -> None: assert servicer.latest_discharge.component_id == 73 assert servicer.latest_discharge.power_w == 15 finally: - await server.stop(0.1) + assert await server.graceful_shutdown() async def test_set_bounds(self) -> None: servicer = mock_api.MockMicrogridServicer() @@ -558,7 +558,7 @@ async def test_set_bounds(self) -> None: await asyncio.sleep(0.1) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() assert len(expected_bounds) == len(servicer.get_bounds()) diff --git a/tests/microgrid/test_graph.py b/tests/microgrid/test_graph.py index a3877b342..1fd1e2354 100644 --- a/tests/microgrid/test_graph.py +++ b/tests/microgrid/test_graph.py @@ -766,7 +766,7 @@ async def test_refresh_from_api(self) -> None: } graph.validate() - await server.stop(1) + assert await server.graceful_shutdown() def test_validate(self) -> None: # `validate` will fail if any of the following are the case: diff --git a/tests/microgrid/test_mock_api.py b/tests/microgrid/test_mock_api.py index fbdf68266..50c8fa8da 100644 --- a/tests/microgrid/test_mock_api.py +++ b/tests/microgrid/test_mock_api.py @@ -239,7 +239,7 @@ async def test_MockGrpcServer() -> None: Connection(start=2, end=3), ] - await server1.stop(1) + await server1.graceful_shutdown() servicer2 = mock_api.MockMicrogridServicer( components=[ @@ -285,4 +285,4 @@ async def test_MockGrpcServer() -> None: Connection(start=77, end=9999), ] - await server2.wait_for_termination(0.1) + await 
server2.graceful_shutdown() diff --git a/tests/microgrid/test_timeout.py b/tests/microgrid/test_timeout.py index 561ba5c96..dcadf38c0 100644 --- a/tests/microgrid/test_timeout.py +++ b/tests/microgrid/test_timeout.py @@ -30,10 +30,6 @@ # error and needs to be greater than `GRPC_CALL_TIMEOUT`. GRPC_SERVER_DELAY: float = 0.3 -# How long a mocked Microgrid server should be running for a single test function, -# before it gets shut down so that the tests can finish. -GRPC_SERVER_SHUTDOWN_DELAY: float = 1.0 - @patch( "frequenz.sdk.microgrid.client._client.DEFAULT_GRPC_CALL_TIMEOUT", GRPC_CALL_TIMEOUT @@ -59,7 +55,7 @@ def mock_list_components( with pytest.raises(grpc.aio.AioRpcError) as err_ctx: _ = await client.components() assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED - await server.wait_for_termination(GRPC_SERVER_SHUTDOWN_DELAY) + assert await server.graceful_shutdown() @patch( @@ -86,7 +82,7 @@ def mock_list_connections( with pytest.raises(grpc.aio.AioRpcError) as err_ctx: _ = await client.connections() assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED - await server.wait_for_termination(GRPC_SERVER_SHUTDOWN_DELAY) + assert await server.graceful_shutdown() @patch( @@ -117,4 +113,4 @@ def mock_set_power( _ = await client.set_power(component_id=1, power_w=power_w) assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED - await server.stop(GRPC_SERVER_SHUTDOWN_DELAY) + assert await server.graceful_shutdown() diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index 6eeed00ff..eebaaabbc 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -243,5 +243,5 @@ async def _init_client_and_actors( async def cleanup(self) -> None: """Clean up after a test.""" - await self._server.stop(0.1) + await self._server.graceful_shutdown() microgrid._microgrid._MICROGRID = None # pylint: disable=protected-access From e15d151891d5a2092852f387d9e96ec4238b75c3 Mon Sep 17 
00:00:00 2001 From: ela-kotulska-frequenz Date: Tue, 20 Dec 2022 16:23:00 +0100 Subject: [PATCH 3/9] Update release notes Signed-off-by: ela-kotulska-frequenz --- RELEASE_NOTES.md | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index bfc91957f..ab8a3d29d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,20 +2,16 @@ ## Summary - - ## Upgrading - - -- Add fine-grained Result types for the PowerDistributingActor. - Previously Result was one class with many fields. Now each result has its own class - that derives from Result parent class. +- Add fine-grained Result types for the PowerDistributingActor. Previously Result was one class with many fields. Now each type of the result has its own class that derives from Result parent class. ## New Features - +- Add inverter type to the Component data. Inverter type tells what kind of inverter it is (Battery, Solar etc). +- Add `FormulaGenerator` class for generating formulas from the component graph. +- Add formulas for: `grid_power`, `battery_power`, `PV power`. ## Bug Fixes - +- Fix ComponentMetricResamplingActor, to not subscribe twice to the same source. From 0b5b1af4fca89b15feffa79fbe9e6b2741f25d8a Mon Sep 17 00:00:00 2001 From: ela-kotulska-frequenz Date: Tue, 20 Dec 2022 21:51:31 +0100 Subject: [PATCH 4/9] Clear release notes Signed-off-by: ela-kotulska-frequenz --- RELEASE_NOTES.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index ab8a3d29d..afb10a8ef 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,16 +2,16 @@ ## Summary + + ## Upgrading -- Add fine-grained Result types for the PowerDistributingActor. Previously Result was one class with many fields. Now each type of the result has its own class that derives from Result parent class. + ## New Features -- Add inverter type to the Component data. 
Inverter type tells what kind of inverter it is (Battery, Solar etc). -- Add `FormulaGenerator` class for generating formulas from the component graph. -- Add formulas for: `grid_power`, `battery_power`, `PV power`. + ## Bug Fixes -- Fix ComponentMetricResamplingActor, to not subscribe twice to the same source. + From 5c110e55a83cbc1e449f9b268c962e8a8c6701d8 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 21 Dec 2022 12:46:47 -0300 Subject: [PATCH 5/9] Add typing_extensions to the dependencies Signed-off-by: Leandro Lucarella --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b0de607c3..389fae441 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "sympy >= 1.10.1, < 2", "toml >= 0.10", "tqdm >= 4.38.0, < 5", + "typing_extensions >= 4.4.0, < 5", "watchfiles >= 0.15.0", ] dynamic = [ "version" ] From a0f465783f446e4528890b4aa7f1185293ec3bfa Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 7 Dec 2022 12:27:49 +0100 Subject: [PATCH 6/9] Move resampler configuration to its own class This makes user code a little bit more verbose but makes the code much more maintainable, as we avoid having to copy and passing around a lot of configuration variables that are only (or mostly) only relevant to the internal resampling class. It also removes a lot of documentation duplication that can get easily out of sync and cause confusion. And we'll add quite a few more configuration variables in subsequent commits, which will just exacerbate the above mentioned issues without a config class. 
Signed-off-by: Leandro Lucarella --- examples/resampling.py | 6 +- src/frequenz/sdk/actor/__init__.py | 3 +- src/frequenz/sdk/actor/_resampling.py | 33 +------ src/frequenz/sdk/timeseries/_resampling.py | 107 ++++++++++----------- tests/actor/test_resampling.py | 13 ++- tests/timeseries/mock_microgrid.py | 3 +- tests/timeseries/test_resampling.py | 41 +++++--- 7 files changed, 97 insertions(+), 109 deletions(-) diff --git a/examples/resampling.py b/examples/resampling.py index 684490a91..6412a240d 100644 --- a/examples/resampling.py +++ b/examples/resampling.py @@ -18,7 +18,7 @@ ) from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source +from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source HOST = "microgrid.sandbox.api.frequenz.io" PORT = 61060 @@ -65,7 +65,7 @@ async def run() -> None: # pylint: disable=too-many-locals channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_request_receiver, - resampling_period_s=1, + config=ResamplerConfig(resampling_period_s=1), ) components = await microgrid.get().api_client.components() @@ -104,7 +104,7 @@ async def run() -> None: # pylint: disable=too-many-locals # Create a channel to calculate an average for all the data average_chan = Broadcast[Sample]("average") - second_stage_resampler = Resampler(resampling_period_s=3.0) + second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0)) second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample) average_sender = average_chan.new_sender() diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py index 875684daf..abc94a2aa 100644 --- a/src/frequenz/sdk/actor/__init__.py +++ b/src/frequenz/sdk/actor/__init__.py @@ -7,7 +7,7 @@ from ._config_managing import 
ConfigManagingActor from ._data_sourcing import ComponentMetricRequest, DataSourcingActor from ._decorator import actor -from ._resampling import ComponentMetricsResamplingActor +from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig __all__ = [ "ChannelRegistry", @@ -15,5 +15,6 @@ "ComponentMetricsResamplingActor", "ConfigManagingActor", "DataSourcingActor", + "ResamplerConfig", "actor", ] diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index c79e980d0..db9aa3e5b 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -14,12 +14,7 @@ from frequenz.sdk.util.asyncio import cancel_and_await from ..timeseries import Sample -from ..timeseries._resampling import ( - Resampler, - ResamplingError, - ResamplingFunction, - average, -) +from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError from ._channel_registry import ChannelRegistry from ._data_sourcing import ComponentMetricRequest from ._decorator import actor @@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], - resampling_period_s: float = 0.2, - max_data_age_in_periods: float = 3.0, - resampling_function: ResamplingFunction = average, + config: ResamplerConfig, ) -> None: """Initialize an instance. @@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments to subscribe to component metrics. resampling_request_receiver: The receiver to use to receive new resampmling subscription requests. - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. 
For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. """ self._channel_registry: ChannelRegistry = channel_registry - self._resampling_period_s: float = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function self._data_sourcing_request_sender: Sender[ ComponentMetricRequest ] = data_sourcing_request_sender self._resampling_request_receiver: Receiver[ ComponentMetricRequest ] = resampling_request_receiver - self._resampler: Resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=max_data_age_in_periods, - resampling_function=resampling_function, - ) + self._resampler: Resampler = Resampler(config) self._active_req_channels: set[str] = set() async def _subscribe(self, request: ComponentMetricRequest) -> None: diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index d550e3d0f..4abd54f00 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -9,6 +9,7 @@ import logging import math from collections import deque +from dataclasses import dataclass from datetime import datetime, timedelta from typing import AsyncIterator, Callable, Coroutine, Sequence @@ -85,6 +86,34 @@ def average(samples: Sequence[Sample], resampling_period_s: float) -> float: return sum(values) / len(values) +@dataclass(frozen=True) +class ResamplerConfig: + """Resampler configuration.""" + + resampling_period_s: float + """The resapmling period in seconds. 
+ + This is the time it passes between resampled data should be calculated. + """ + + max_data_age_in_periods: float = 3.0 + """The maximum age a sample can have to be considered *relevant* for resampling. + + Expressed in number of resampling periods. For example if + `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, then data + older than 3*2 = 6 secods will be discarded when creating a new sample and + never passed to the resampling function. + """ + + resampling_function: ResamplingFunction = average + """The resampling function. + + This function will be applied to the sequence of relevant samples at + a given time. The result of the function is what is sent as the resampled + value. + """ + + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" @@ -166,34 +195,24 @@ class Resampler: no way to produce meaningful samples with the available data. """ - def __init__( - self, - *, - resampling_period_s: float, - resampling_function: ResamplingFunction = average, - max_data_age_in_periods: float = 3.0, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. 
""" - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function + self._config = config self._resamplers: dict[Source, _StreamingHelper] = {} - self._timer: Timer = Timer(self._resampling_period_s) + self._timer: Timer = Timer(config.resampling_period_s) + + @property + def config(self) -> ResamplerConfig: + """Get the resampler configuration. + + Returns: + The resampler configuration. + """ + return self._config async def stop(self) -> None: """Cancel all receiving tasks.""" @@ -214,15 +233,7 @@ def add_timeseries(self, source: Source, sink: Sink) -> bool: if source in self._resamplers: return False - resampler = _StreamingHelper( - _ResamplingHelper( - resampling_period_s=self._resampling_period_s, - max_data_age_in_periods=self._max_data_age_in_periods, - resampling_function=self._resampling_function, - ), - source, - sink, - ) + resampler = _StreamingHelper(_ResamplingHelper(self._config), source, sink) self._resamplers[source] = resampler return True @@ -288,33 +299,14 @@ class _ResamplingHelper: when calling the `resample()` method. All older samples are discarded. """ - def __init__( - self, - *, - resampling_period_s: float, - max_data_age_in_periods: float, - resampling_function: ResamplingFunction, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than 3*2 = 6 secods will be discarded when - creating a new sample and never passed to the resampling - function. 
- resampling_function: The function to be applied to the sequence of - relevant samples at a given time. The result of the function is - what is sent as the resampled data. + config: The configuration for the resampler. """ - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods + self._config = config self._buffer: deque[Sample] = deque() - self._resampling_function: ResamplingFunction = resampling_function def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. @@ -361,14 +353,17 @@ def resample(self, timestamp: datetime) -> Sample: have `None` as `value`. """ threshold = timestamp - timedelta( - seconds=self._max_data_age_in_periods * self._resampling_period_s + seconds=self._config.max_data_age_in_periods + * self._config.resampling_period_s ) self._remove_outdated_samples(threshold=threshold) value = ( None if not self._buffer - else self._resampling_function(self._buffer, self._resampling_period_s) + else self._config.resampling_function( + self._buffer, self._config.resampling_period_s + ) ) return Sample(timestamp, value) diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 102a2d71d..af5439995 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -16,6 +16,7 @@ ChannelRegistry, ComponentMetricRequest, ComponentMetricsResamplingActor, + ResamplerConfig, ) from frequenz.sdk.microgrid.component import ComponentMetricId from frequenz.sdk.timeseries import Sample @@ -123,8 +124,10 @@ async def test_single_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( @@ -167,8 +170,10 @@ async def test_duplicate_request( 
channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index eebaaabbc..106ea0b56 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -27,6 +27,7 @@ ComponentMetricRequest, ComponentMetricsResamplingActor, DataSourcingActor, + ResamplerConfig, ) from tests.microgrid import mock_api @@ -236,7 +237,7 @@ async def _init_client_and_actors( channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_actor_request_receiver, - resampling_period_s=0.1, + config=ResamplerConfig(resampling_period_s=0.1), ) return (resampling_actor_request_sender, channel_registry) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index b81d8f79d..61b0fdb28 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -17,6 +17,7 @@ from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._resampling import ( Resampler, + ResamplerConfig, ResamplingError, ResamplingFunction, Sink, @@ -90,9 +91,11 @@ async def test_resampling_with_one_window( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -180,9 +183,11 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma 
spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.5, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.5, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -311,9 +316,11 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -442,9 +449,11 @@ async def test_receiving_stopped_resampling_error( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -499,9 +508,11 @@ async def test_receiving_resampling_error(fake_time: time_machine.Coordinates) - spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) class TestException(Exception): From 42143f55bafc3a9619d0d89ab818883bb633cf72 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 6 Dec 2022 16:42:50 +0100 Subject: [PATCH 7/9] Add a benchmark for the 
resampler Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 51 +++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 benchmarks/timeseries/resampling.py diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py new file mode 100644 index 000000000..cbf89f6a0 --- /dev/null +++ b/benchmarks/timeseries/resampling.py @@ -0,0 +1,51 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Benchmark resampling.""" + +from datetime import datetime, timedelta, timezone +from timeit import timeit +from typing import Sequence + +from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper + + +def nop( # pylint: disable=unused-argument + samples: Sequence[Sample], resampling_period_s: float +) -> float: + """Return 0.0.""" + return 0.0 + + +def _benchmark_resampling_helper(resamples: int, samples: int) -> None: + """Benchmark the resampling helper.""" + helper = _ResamplingHelper( + ResamplerConfig( + resampling_period_s=1.0, + max_data_age_in_periods=3.0, + resampling_function=nop, + ) + ) + now = datetime.now(timezone.utc) + + def _do_work() -> None: + nonlocal now + for _n_resample in range(resamples): + for _n_sample in range(samples): + now = now + timedelta(seconds=1 / samples) + helper.add_sample(Sample(now, 0.0)) + helper.resample(now) + + print(timeit(_do_work, number=5)) + + +def _benchmark() -> None: + for resamples in [10, 100, 1000]: + for samples in [10, 100, 1000]: + print(f"{resamples=} {samples=}") + _benchmark_resampling_helper(resamples, samples) + + +if __name__ == "__main__": + _benchmark() From 3a59bdda7d48a9810ec989a004bdccdddcc09c0d Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 7 Dec 2022 15:15:20 +0100 Subject: [PATCH 8/9] Add initial buffer length to the resampler This commit makes the resampler use a proper ring buffer instead of using an unbound buffer that only gets 
clear up when a resampling is done (which could easily end up in memory issues if the input sampling rate is much higher than the resampling rate). This also improves the performance of the resampling buy an average of 20% (on my local machine, your millage might vary), even when the current implementation now needs to copy the buffer when passed to the resampling function. Here are the results of running benchmarks/timeseries/resampling.py for the old implementation, the new implementation (without doing the bisection and copying) and the new complete implementation with the current fixable inefficiencies: OLD NEW NEW WITH BISECT resamples=10 samples=10 resamples=10 samples=10 resamples=10 samples=10 0.0008896420185919851 0.00062773801619187 0.0008012260077521205 resamples=10 samples=100 resamples=10 samples=100 resamples=10 samples=100 0.007817161997081712 0.005761806009104475 0.006307974021183327 resamples=10 samples=1000 resamples=10 samples=1000 resamples=10 samples=1000 0.07768873398890719 0.05851042701397091 0.0604277040110901 resamples=100 samples=10 resamples=100 samples=10 resamples=100 samples=10 0.008742492995224893 0.0062527229893021286 0.00808265499654226 resamples=100 samples=100 resamples=100 samples=100 resamples=100 samples=100 0.07808284999919124 0.057997508003609255 0.0624066719901748 resamples=100 samples=1000 resamples=100 samples=1000 resamples=100 samples=1000 0.782658567011822 0.5870920980232768 0.6098103950207587 resamples=1000 samples=10 resamples=1000 samples=10 resamples=1000 samples=10 0.08764891701866873 0.062448524025967345 0.07815460601705126 resamples=1000 samples=100 resamples=1000 samples=100 resamples=1000 samples=100 0.78426024899818 0.5858371119829826 0.6357307220168877 resamples=1000 samples=1000 resamples=1000 samples=1000 resamples=1000 samples=1000 7.513815971993608 5.984694316983223 6.42200836900156 Average improvement: 35.3% 19.7% This commit sadly introduces some nasty hack to be able to bisect the buffer, we need to 
make Sample ordered by timestamp because bisect doesn't support using a key extraction function until Python 3.10. Signed-off-by: Leandro Lucarella --- benchmarks/timeseries/resampling.py | 1 + src/frequenz/sdk/timeseries/_base_types.py | 15 +++-- src/frequenz/sdk/timeseries/_resampling.py | 70 +++++++++++----------- 3 files changed, 47 insertions(+), 39 deletions(-) diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py index cbf89f6a0..6e85708dc 100644 --- a/benchmarks/timeseries/resampling.py +++ b/benchmarks/timeseries/resampling.py @@ -25,6 +25,7 @@ def _benchmark_resampling_helper(resamples: int, samples: int) -> None: resampling_period_s=1.0, max_data_age_in_periods=3.0, resampling_function=nop, + initial_buffer_len=samples * 3, ) ) now = datetime.now(timezone.utc) diff --git a/src/frequenz/sdk/timeseries/_base_types.py b/src/frequenz/sdk/timeseries/_base_types.py index 36a6e2bd5..6d54a35b3 100644 --- a/src/frequenz/sdk/timeseries/_base_types.py +++ b/src/frequenz/sdk/timeseries/_base_types.py @@ -3,12 +3,16 @@ """Timeseries basic types.""" -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import Optional -@dataclass(frozen=True) +# Ordering by timestamp is a bit arbitrary, and it is not always what might be +# wanted. We are using this order now because usually we need to do binary +# searches on sequences of samples, and the Python `bisect` module doesn't +# support providing a key until Python 3.10. +@dataclass(frozen=True, order=True) class Sample: """A measurement taken at a particular point in time. @@ -17,5 +21,8 @@ class Sample: coherent view on a group of component metrics for a particular timestamp. 
""" - timestamp: datetime - value: Optional[float] = None + timestamp: datetime = field(compare=True) + """The time when this sample was generated.""" + + value: Optional[float] = field(compare=False, default=None) + """The value of this sample.""" diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 4abd54f00..3d060b220 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -6,8 +6,10 @@ from __future__ import annotations import asyncio +import itertools import logging import math +from bisect import bisect from collections import deque from dataclasses import dataclass from datetime import datetime, timedelta @@ -21,6 +23,15 @@ _logger = logging.Logger(__name__) +DEFAULT_BUFFER_LEN_INIT = 16 +"""Default initial buffer length. + +Buffers will be created initially with this length, but they could grow or +shrink depending on the source characteristics, like sampling rate, to make +sure all the requested past sampling periods can be stored. +""" + + Source = AsyncIterator[Sample] """A source for a timeseries. @@ -113,6 +124,14 @@ class ResamplerConfig: value. """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT + """The initial length of the resampling buffer. + + The buffer could grow or shrink depending on the source characteristics, + like sampling rate, to make sure all the requested past sampling periods + can be stored. + """ + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" @@ -306,7 +325,7 @@ def __init__(self, config: ResamplerConfig) -> None: config: The configuration for the resampler. """ self._config = config - self._buffer: deque[Sample] = deque() + self._buffer: deque[Sample] = deque(maxlen=config.initial_buffer_len) def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. 
@@ -316,30 +335,6 @@ def add_sample(self, sample: Sample) -> None: """ self._buffer.append(sample) - def _remove_outdated_samples(self, threshold: datetime) -> None: - """Remove samples that are older than the provided time threshold. - - It is assumed that items in the buffer are in a sorted order (ascending order - by timestamp). - - The removal works by traversing the buffer starting from the oldest sample - (smallest timestamp) and comparing sample's timestamp with the threshold. - If the sample's threshold is smaller than `threshold`, it means that the - sample is outdated and it is removed from the buffer. This continues until - the first sample that is with timestamp greater or equal to `threshold` is - encountered, then buffer is considered up to date. - - Args: - threshold: samples whose timestamp is older than the threshold are - considered outdated and should be remove from the buffer - """ - while self._buffer: - sample: Sample = self._buffer[0] - if sample.timestamp > threshold: - return - - self._buffer.popleft() - def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -352,18 +347,23 @@ def resample(self, timestamp: datetime) -> Sample: If there are no *relevant* samples, then the new sample will have `None` as `value`. """ - threshold = timestamp - timedelta( - seconds=self._config.max_data_age_in_periods - * self._config.resampling_period_s + conf = self._config + minimum_relevant_timestamp = timestamp - timedelta( + seconds=conf.resampling_period_s * conf.max_data_age_in_periods ) - self._remove_outdated_samples(threshold=threshold) - + # We need to pass a dummy Sample to bisect because it only support + # specifying a key extraction function in Python 3.10, so we need to + # compare samples at the moment. 
+ cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None)) + # pylint: disable=fixme + # FIXME: This is far from efficient, but we don't want to start new + # ring buffer implementation here that uses a list to overcome the + # deque limitation of not being able to get slices + relevant_samples = list(itertools.islice(self._buffer, cut_index, None)) value = ( - None - if not self._buffer - else self._config.resampling_function( - self._buffer, self._config.resampling_period_s - ) + conf.resampling_function(relevant_samples, conf.resampling_period_s) + if relevant_samples + else None ) return Sample(timestamp, value) From 30a47b11a3d54d336fd4035cb9a7ee9e5b5eda2b Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 21 Dec 2022 13:23:17 -0300 Subject: [PATCH 9/9] Update comment about bad performance of deque slicing Signed-off-by: Leandro Lucarella --- src/frequenz/sdk/timeseries/_resampling.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index 3d060b220..c60423918 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -355,10 +355,12 @@ def resample(self, timestamp: datetime) -> Sample: # specifying a key extraction function in Python 3.10, so we need to # compare samples at the moment. cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None)) - # pylint: disable=fixme - # FIXME: This is far from efficient, but we don't want to start new - # ring buffer implementation here that uses a list to overcome the - # deque limitation of not being able to get slices + # Using itertools for slicing doesn't look very efficient, but + # experiements with a custom (ring) buffer that can slice showed that + # it is not that bad. 
See: + # https://github.com/frequenz-floss/frequenz-sdk-python/pull/130 + # So if we need more performance beyond this point, we probably need to + # resort to some C (or similar) implementation. relevant_samples = list(itertools.islice(self._buffer, cut_index, None)) value = ( conf.resampling_function(relevant_samples, conf.resampling_period_s)