diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index bfc91957f..afb10a8ef 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -8,10 +8,6 @@ -- Add fine-grained Result types for the PowerDistributingActor. - Previously Result was one class with many fields. Now each result has its own class - that derives from Result parent class. - ## New Features diff --git a/benchmarks/timeseries/resampling.py b/benchmarks/timeseries/resampling.py new file mode 100644 index 000000000..6e85708dc --- /dev/null +++ b/benchmarks/timeseries/resampling.py @@ -0,0 +1,52 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Benchmark resampling.""" + +from datetime import datetime, timedelta, timezone +from timeit import timeit +from typing import Sequence + +from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries._resampling import ResamplerConfig, _ResamplingHelper + + +def nop( # pylint: disable=unused-argument + samples: Sequence[Sample], resampling_period_s: float +) -> float: + """Return 0.0.""" + return 0.0 + + +def _benchmark_resampling_helper(resamples: int, samples: int) -> None: + """Benchmark the resampling helper.""" + helper = _ResamplingHelper( + ResamplerConfig( + resampling_period_s=1.0, + max_data_age_in_periods=3.0, + resampling_function=nop, + initial_buffer_len=samples * 3, + ) + ) + now = datetime.now(timezone.utc) + + def _do_work() -> None: + nonlocal now + for _n_resample in range(resamples): + for _n_sample in range(samples): + now = now + timedelta(seconds=1 / samples) + helper.add_sample(Sample(now, 0.0)) + helper.resample(now) + + print(timeit(_do_work, number=5)) + + +def _benchmark() -> None: + for resamples in [10, 100, 1000]: + for samples in [10, 100, 1000]: + print(f"{resamples=} {samples=}") + _benchmark_resampling_helper(resamples, samples) + + +if __name__ == "__main__": + _benchmark() diff --git a/examples/resampling.py b/examples/resampling.py index 684490a91..6412a240d 100644 --- a/examples/resampling.py +++ b/examples/resampling.py @@ -18,7 +18,7 @@ ) from frequenz.sdk.microgrid.component import ComponentCategory, ComponentMetricId from frequenz.sdk.timeseries import Sample -from frequenz.sdk.timeseries._resampling import Resampler, Sink, Source +from frequenz.sdk.timeseries._resampling import Resampler, ResamplerConfig, Sink, Source HOST = "microgrid.sandbox.api.frequenz.io" PORT = 61060 @@ -65,7 +65,7 @@ async def run() -> None: # pylint: disable=too-many-locals channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_request_receiver, - resampling_period_s=1, + config=ResamplerConfig(resampling_period_s=1), ) components = await microgrid.get().api_client.components() @@ -104,7 +104,7 @@ async def run() -> None: # pylint: disable=too-many-locals # Create a channel to calculate an average for all the data average_chan = Broadcast[Sample]("average") - second_stage_resampler = Resampler(resampling_period_s=3.0) + second_stage_resampler = Resampler(ResamplerConfig(resampling_period_s=3.0)) second_stage_resampler.add_timeseries(average_chan.new_receiver(), _print_sample) average_sender = average_chan.new_sender() diff --git a/pyproject.toml b/pyproject.toml index b0de607c3..389fae441 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "sympy >= 1.10.1, < 2", "toml >= 0.10", "tqdm >= 4.38.0, < 5", + "typing_extensions >= 4.4.0, < 5", "watchfiles >= 0.15.0", ] dynamic = [ "version" ] diff --git a/src/frequenz/sdk/actor/__init__.py b/src/frequenz/sdk/actor/__init__.py index 875684daf..abc94a2aa 100644 --- a/src/frequenz/sdk/actor/__init__.py +++ b/src/frequenz/sdk/actor/__init__.py @@ -7,7 +7,7 @@ from ._config_managing import ConfigManagingActor from ._data_sourcing import ComponentMetricRequest, DataSourcingActor from ._decorator import actor -from ._resampling import ComponentMetricsResamplingActor +from ._resampling import ComponentMetricsResamplingActor, ResamplerConfig __all__ = [ "ChannelRegistry", @@ -15,5 +15,6 @@ "ComponentMetricsResamplingActor", "ConfigManagingActor", "DataSourcingActor", + "ResamplerConfig", "actor", ] diff --git a/src/frequenz/sdk/actor/_resampling.py b/src/frequenz/sdk/actor/_resampling.py index c79e980d0..db9aa3e5b 100644 --- a/src/frequenz/sdk/actor/_resampling.py +++ b/src/frequenz/sdk/actor/_resampling.py @@ -14,12 +14,7 @@ from frequenz.sdk.util.asyncio import cancel_and_await from ..timeseries import Sample -from ..timeseries._resampling import ( - Resampler, - ResamplingError, - ResamplingFunction, - average, -) +from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError from ._channel_registry import ChannelRegistry from ._data_sourcing import ComponentMetricRequest from ._decorator import actor @@ -37,9 +32,7 @@ def __init__( # pylint: disable=too-many-arguments channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], - resampling_period_s: float = 0.2, - max_data_age_in_periods: float = 3.0, - resampling_function: ResamplingFunction = average, + config: ResamplerConfig, ) -> None: """Initialize an instance. @@ -51,34 +44,16 @@ def __init__( # pylint: disable=too-many-arguments to subscribe to component metrics. resampling_request_receiver: The receiver to use to receive new resampmling subscription requests. - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. """ self._channel_registry: ChannelRegistry = channel_registry - self._resampling_period_s: float = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function self._data_sourcing_request_sender: Sender[ ComponentMetricRequest ] = data_sourcing_request_sender self._resampling_request_receiver: Receiver[ ComponentMetricRequest ] = resampling_request_receiver - self._resampler: Resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=max_data_age_in_periods, - resampling_function=resampling_function, - ) + self._resampler: Resampler = Resampler(config) self._active_req_channels: set[str] = set() async def _subscribe(self, request: ComponentMetricRequest) -> None: diff --git a/src/frequenz/sdk/timeseries/_base_types.py b/src/frequenz/sdk/timeseries/_base_types.py index 36a6e2bd5..6d54a35b3 100644 --- a/src/frequenz/sdk/timeseries/_base_types.py +++ b/src/frequenz/sdk/timeseries/_base_types.py @@ -3,12 +3,16 @@ """Timeseries basic types.""" -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from typing import Optional -@dataclass(frozen=True) +# Ordering by timestamp is a bit arbitrary, and it is not always what might be +# wanted. We are using this order now because usually we need to do binary +# searches on sequences of samples, and the Python `bisect` module doesn't +# support providing a key until Python 3.10. +@dataclass(frozen=True, order=True) class Sample: """A measurement taken at a particular point in time. @@ -17,5 +21,8 @@ class Sample: coherent view on a group of component metrics for a particular timestamp. """ - timestamp: datetime - value: Optional[float] = None + timestamp: datetime = field(compare=True) + """The time when this sample was generated.""" + + value: Optional[float] = field(compare=False, default=None) + """The value of this sample.""" diff --git a/src/frequenz/sdk/timeseries/_resampling.py b/src/frequenz/sdk/timeseries/_resampling.py index d550e3d0f..c60423918 100644 --- a/src/frequenz/sdk/timeseries/_resampling.py +++ b/src/frequenz/sdk/timeseries/_resampling.py @@ -6,9 +6,12 @@ from __future__ import annotations import asyncio +import itertools import logging import math +from bisect import bisect from collections import deque +from dataclasses import dataclass from datetime import datetime, timedelta from typing import AsyncIterator, Callable, Coroutine, Sequence @@ -20,6 +23,15 @@ _logger = logging.Logger(__name__) +DEFAULT_BUFFER_LEN_INIT = 16 +"""Default initial buffer length. + +Buffers will be created initially with this length, but they could grow or +shrink depending on the source characteristics, like sampling rate, to make +sure all the requested past sampling periods can be stored. +""" + + Source = AsyncIterator[Sample] """A source for a timeseries. @@ -85,6 +97,42 @@ def average(samples: Sequence[Sample], resampling_period_s: float) -> float: return sum(values) / len(values) +@dataclass(frozen=True) +class ResamplerConfig: + """Resampler configuration.""" + + resampling_period_s: float + """The resapmling period in seconds. + + This is the time it passes between resampled data should be calculated. + """ + + max_data_age_in_periods: float = 3.0 + """The maximum age a sample can have to be considered *relevant* for resampling. + + Expressed in number of resampling periods. For example if + `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, then data + older than 3*2 = 6 secods will be discarded when creating a new sample and + never passed to the resampling function. + """ + + resampling_function: ResamplingFunction = average + """The resampling function. + + This function will be applied to the sequence of relevant samples at + a given time. The result of the function is what is sent as the resampled + value. + """ + + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT + """The initial length of the resampling buffer. + + The buffer could grow or shrink depending on the source characteristics, + like sampling rate, to make sure all the requested past sampling periods + can be stored. + """ + + class SourceStoppedError(RuntimeError): """A timeseries stopped producing samples.""" @@ -166,34 +214,24 @@ class Resampler: no way to produce meaningful samples with the available data. """ - def __init__( - self, - *, - resampling_period_s: float, - resampling_function: ResamplingFunction = average, - max_data_age_in_periods: float = 3.0, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than `3*2 = 6` secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - *relevant* samples at a given time. The result of the function - is what is sent as the resampled data. + config: The configuration for the resampler. """ - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._resampling_function: ResamplingFunction = resampling_function + self._config = config self._resamplers: dict[Source, _StreamingHelper] = {} - self._timer: Timer = Timer(self._resampling_period_s) + self._timer: Timer = Timer(config.resampling_period_s) + + @property + def config(self) -> ResamplerConfig: + """Get the resampler configuration. + + Returns: + The resampler configuration. + """ + return self._config async def stop(self) -> None: """Cancel all receiving tasks.""" @@ -214,15 +252,7 @@ def add_timeseries(self, source: Source, sink: Sink) -> bool: if source in self._resamplers: return False - resampler = _StreamingHelper( - _ResamplingHelper( - resampling_period_s=self._resampling_period_s, - max_data_age_in_periods=self._max_data_age_in_periods, - resampling_function=self._resampling_function, - ), - source, - sink, - ) + resampler = _StreamingHelper(_ResamplingHelper(self._config), source, sink) self._resamplers[source] = resampler return True @@ -288,33 +318,14 @@ class _ResamplingHelper: when calling the `resample()` method. All older samples are discarded. """ - def __init__( - self, - *, - resampling_period_s: float, - max_data_age_in_periods: float, - resampling_function: ResamplingFunction, - ) -> None: + def __init__(self, config: ResamplerConfig) -> None: """Initialize an instance. Args: - resampling_period_s: The time it passes between resampled data - should be calculated (in seconds). - max_data_age_in_periods: The maximum age a sample can have to be - considered *relevant* for resampling purposes, expressed in the - number of resampling periods. For exapmle is - `resampling_period_s` is 3 and `max_data_age_in_periods` is 2, - then data older than 3*2 = 6 secods will be discarded when - creating a new sample and never passed to the resampling - function. - resampling_function: The function to be applied to the sequence of - relevant samples at a given time. The result of the function is - what is sent as the resampled data. + config: The configuration for the resampler. """ - self._resampling_period_s = resampling_period_s - self._max_data_age_in_periods: float = max_data_age_in_periods - self._buffer: deque[Sample] = deque() - self._resampling_function: ResamplingFunction = resampling_function + self._config = config + self._buffer: deque[Sample] = deque(maxlen=config.initial_buffer_len) def add_sample(self, sample: Sample) -> None: """Add a new sample to the internal buffer. @@ -324,30 +335,6 @@ def add_sample(self, sample: Sample) -> None: """ self._buffer.append(sample) - def _remove_outdated_samples(self, threshold: datetime) -> None: - """Remove samples that are older than the provided time threshold. - - It is assumed that items in the buffer are in a sorted order (ascending order - by timestamp). - - The removal works by traversing the buffer starting from the oldest sample - (smallest timestamp) and comparing sample's timestamp with the threshold. - If the sample's threshold is smaller than `threshold`, it means that the - sample is outdated and it is removed from the buffer. This continues until - the first sample that is with timestamp greater or equal to `threshold` is - encountered, then buffer is considered up to date. - - Args: - threshold: samples whose timestamp is older than the threshold are - considered outdated and should be remove from the buffer - """ - while self._buffer: - sample: Sample = self._buffer[0] - if sample.timestamp > threshold: - return - - self._buffer.popleft() - def resample(self, timestamp: datetime) -> Sample: """Generate a new sample based on all the current *relevant* samples. @@ -360,15 +347,25 @@ def resample(self, timestamp: datetime) -> Sample: If there are no *relevant* samples, then the new sample will have `None` as `value`. """ - threshold = timestamp - timedelta( - seconds=self._max_data_age_in_periods * self._resampling_period_s + conf = self._config + minimum_relevant_timestamp = timestamp - timedelta( + seconds=conf.resampling_period_s * conf.max_data_age_in_periods ) - self._remove_outdated_samples(threshold=threshold) - + # We need to pass a dummy Sample to bisect because it only support + # specifying a key extraction function in Python 3.10, so we need to + # compare samples at the moment. + cut_index = bisect(self._buffer, Sample(minimum_relevant_timestamp, None)) + # Using itertools for slicing doesn't look very efficient, but + # experiements with a custom (ring) buffer that can slice showed that + # it is not that bad. See: + # https://github.com/frequenz-floss/frequenz-sdk-python/pull/130 + # So if we need more performance beyond this point, we probably need to + # resort to some C (or similar) implementation. + relevant_samples = list(itertools.islice(self._buffer, cut_index, None)) value = ( - None - if not self._buffer - else self._resampling_function(self._buffer, self._resampling_period_s) + conf.resampling_function(relevant_samples, conf.resampling_period_s) + if relevant_samples + else None ) return Sample(timestamp, value) diff --git a/tests/actor/test_data_sourcing.py b/tests/actor/test_data_sourcing.py index 7481d186b..7ca9cd774 100644 --- a/tests/actor/test_data_sourcing.py +++ b/tests/actor/test_data_sourcing.py @@ -93,5 +93,5 @@ async def test_data_sourcing_actor(self) -> None: assert sample is not None assert 100.0 == sample.value - await server.stop(0.1) + assert await server.graceful_shutdown() _microgrid._MICROGRID = None # pylint: disable=protected-access diff --git a/tests/actor/test_decorator.py b/tests/actor/test_decorator.py index f81f9e039..72644307f 100644 --- a/tests/actor/test_decorator.py +++ b/tests/actor/test_decorator.py @@ -96,6 +96,8 @@ async def test_basic_actor() -> None: msg = await echo_rx.receive() assert msg is False + # pylint: disable=protected-access,no-member + await _echo_actor._stop() # type: ignore[attr-defined] async def test_actor_does_not_restart() -> None: diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 102a2d71d..af5439995 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -16,6 +16,7 @@ ChannelRegistry, ComponentMetricRequest, ComponentMetricsResamplingActor, + ResamplerConfig, ) from frequenz.sdk.microgrid.component import ComponentMetricId from frequenz.sdk.timeseries import Sample @@ -123,8 +124,10 @@ async def test_single_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( @@ -167,8 +170,10 @@ async def test_duplicate_request( channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), - resampling_period_s=0.2, - max_data_age_in_periods=2, + config=ResamplerConfig( + resampling_period_s=0.2, + max_data_age_in_periods=2, + ), ) subs_req = ComponentMetricRequest( diff --git a/tests/microgrid/mock_api.py b/tests/microgrid/mock_api.py index 0655ee47f..d3a6750d8 100644 --- a/tests/microgrid/mock_api.py +++ b/tests/microgrid/mock_api.py @@ -10,6 +10,8 @@ from __future__ import annotations +import asyncio + # pylint: disable=invalid-name,no-name-in-module,unused-import from concurrent import futures from typing import Iterable, Iterator, List, Optional, Tuple @@ -268,10 +270,31 @@ async def start(self) -> None: """Start the server.""" await self._server.start() - async def stop(self, grace: Optional[float]) -> None: + async def _stop(self, grace: Optional[float]) -> None: """Stop the server.""" await self._server.stop(grace) - async def wait_for_termination(self, timeout: Optional[float] = None) -> None: + async def _wait_for_termination(self, timeout: Optional[float] = None) -> None: """Wait for termination.""" await self._server.wait_for_termination(timeout) + + async def graceful_shutdown( + self, stop_timeout: float = 0.1, terminate_timeout: float = 0.2 + ) -> bool: + """Shutdown server gracefully. + + Args: + stop_timeout: Argument for self.stop method + terminate_timeout: Argument for self.wait_for_termination method. + + Returns: + True if server was stopped in given timeout. False otherwise. + """ + await self._stop(stop_timeout) + try: + await asyncio.wait_for( + self._wait_for_termination(None), timeout=terminate_timeout + ) + except TimeoutError: + return False + return True diff --git a/tests/microgrid/test_client.py b/tests/microgrid/test_client.py index 5819be99b..9ba53c992 100644 --- a/tests/microgrid/test_client.py +++ b/tests/microgrid/test_client.py @@ -130,7 +130,7 @@ async def test_components(self) -> None: } finally: - await server.stop(grace=1.0) + assert await server.graceful_shutdown() async def test_connections(self) -> None: servicer = mock_api.MockMicrogridServicer() @@ -284,7 +284,7 @@ async def test_connections(self) -> None: } finally: - await server.stop(grace=1.0) + assert await server.graceful_shutdown() async def test_bad_connections(self) -> None: """Validate that the client does not apply connection filters itself.""" @@ -355,7 +355,7 @@ def ListAllComponents( ) finally: - await server.stop(grace=1.0) + assert await server.graceful_shutdown() async def test_meter_data(self) -> None: servicer = mock_api.MockMicrogridServicer() @@ -383,7 +383,7 @@ async def test_meter_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, MeterData) @@ -415,7 +415,7 @@ async def test_battery_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, BatteryData) @@ -447,7 +447,7 @@ async def test_inverter_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, InverterData) @@ -479,7 +479,7 @@ async def test_ev_charger_data(self) -> None: await asyncio.sleep(0.2) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() latest = peekable.peek() assert isinstance(latest, EVChargerData) @@ -506,7 +506,7 @@ async def test_charge(self) -> None: assert servicer.latest_charge.power_w == 12 finally: - await server.stop(0.1) + assert await server.graceful_shutdown() async def test_discharge(self) -> None: """Check if discharge is able to discharge component.""" @@ -528,7 +528,7 @@ async def test_discharge(self) -> None: assert servicer.latest_discharge.component_id == 73 assert servicer.latest_discharge.power_w == 15 finally: - await server.stop(0.1) + assert await server.graceful_shutdown() async def test_set_bounds(self) -> None: servicer = mock_api.MockMicrogridServicer() @@ -558,7 +558,7 @@ async def test_set_bounds(self) -> None: await asyncio.sleep(0.1) finally: - await server.stop(0.1) + assert await server.graceful_shutdown() assert len(expected_bounds) == len(servicer.get_bounds()) diff --git a/tests/microgrid/test_graph.py b/tests/microgrid/test_graph.py index a3877b342..1fd1e2354 100644 --- a/tests/microgrid/test_graph.py +++ b/tests/microgrid/test_graph.py @@ -766,7 +766,7 @@ async def test_refresh_from_api(self) -> None: } graph.validate() - await server.stop(1) + assert await server.graceful_shutdown() def test_validate(self) -> None: # `validate` will fail if any of the following are the case: diff --git a/tests/microgrid/test_mock_api.py b/tests/microgrid/test_mock_api.py index fbdf68266..50c8fa8da 100644 --- a/tests/microgrid/test_mock_api.py +++ b/tests/microgrid/test_mock_api.py @@ -239,7 +239,7 @@ async def test_MockGrpcServer() -> None: Connection(start=2, end=3), ] - await server1.stop(1) + await server1.graceful_shutdown() servicer2 = mock_api.MockMicrogridServicer( components=[ @@ -285,4 +285,4 @@ async def test_MockGrpcServer() -> None: Connection(start=77, end=9999), ] - await server2.wait_for_termination(0.1) + await server2.graceful_shutdown() diff --git a/tests/microgrid/test_timeout.py b/tests/microgrid/test_timeout.py index 561ba5c96..dcadf38c0 100644 --- a/tests/microgrid/test_timeout.py +++ b/tests/microgrid/test_timeout.py @@ -30,10 +30,6 @@ # error and needs to be greater than `GRPC_CALL_TIMEOUT`. GRPC_SERVER_DELAY: float = 0.3 -# How long a mocked Microgrid server should be running for a single test function, -# before it gets shut down so that the tests can finish. -GRPC_SERVER_SHUTDOWN_DELAY: float = 1.0 - @patch( "frequenz.sdk.microgrid.client._client.DEFAULT_GRPC_CALL_TIMEOUT", GRPC_CALL_TIMEOUT @@ -59,7 +55,7 @@ def mock_list_components( with pytest.raises(grpc.aio.AioRpcError) as err_ctx: _ = await client.components() assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED - await server.wait_for_termination(GRPC_SERVER_SHUTDOWN_DELAY) + assert await server.graceful_shutdown() @patch( @@ -86,7 +82,7 @@ def mock_list_connections( with pytest.raises(grpc.aio.AioRpcError) as err_ctx: _ = await client.connections() assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED - await server.wait_for_termination(GRPC_SERVER_SHUTDOWN_DELAY) + assert await server.graceful_shutdown() @patch( @@ -117,4 +113,4 @@ def mock_set_power( _ = await client.set_power(component_id=1, power_w=power_w) assert err_ctx.value.code() == grpc.StatusCode.DEADLINE_EXCEEDED - await server.stop(GRPC_SERVER_SHUTDOWN_DELAY) + assert await server.graceful_shutdown() diff --git a/tests/timeseries/mock_microgrid.py b/tests/timeseries/mock_microgrid.py index 6eeed00ff..106ea0b56 100644 --- a/tests/timeseries/mock_microgrid.py +++ b/tests/timeseries/mock_microgrid.py @@ -27,6 +27,7 @@ ComponentMetricRequest, ComponentMetricsResamplingActor, DataSourcingActor, + ResamplerConfig, ) from tests.microgrid import mock_api @@ -236,12 +237,12 @@ async def _init_client_and_actors( channel_registry=channel_registry, data_sourcing_request_sender=data_source_request_sender, resampling_request_receiver=resampling_actor_request_receiver, - resampling_period_s=0.1, + config=ResamplerConfig(resampling_period_s=0.1), ) return (resampling_actor_request_sender, channel_registry) async def cleanup(self) -> None: """Clean up after a test.""" - await self._server.stop(0.1) + await self._server.graceful_shutdown() microgrid._microgrid._MICROGRID = None # pylint: disable=protected-access diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index b81d8f79d..61b0fdb28 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -17,6 +17,7 @@ from frequenz.sdk.timeseries import Sample from frequenz.sdk.timeseries._resampling import ( Resampler, + ResamplerConfig, ResamplingError, ResamplingFunction, Sink, @@ -90,9 +91,11 @@ async def test_resampling_with_one_window( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -180,9 +183,11 @@ async def test_resampling_with_one_and_a_half_windows( # pylint: disable=too-ma spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=1.5, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=1.5, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -311,9 +316,11 @@ async def test_resampling_with_two_windows( # pylint: disable=too-many-statemen spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -442,9 +449,11 @@ async def test_receiving_stopped_resampling_error( spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) source_recvr = source_chan.new_receiver() @@ -499,9 +508,11 @@ async def test_receiving_resampling_error(fake_time: time_machine.Coordinates) - spec=ResamplingFunction, return_value=expected_resampled_value ) resampler = Resampler( - resampling_period_s=resampling_period_s, - max_data_age_in_periods=2.0, - resampling_function=resampling_fun_mock, + ResamplerConfig( + resampling_period_s=resampling_period_s, + max_data_age_in_periods=2.0, + resampling_function=resampling_fun_mock, + ) ) class TestException(Exception):