Skip to content

Commit

Permalink
resampling: Don't subscribe twice to the same source (#123)
Browse files Browse the repository at this point in the history
When receiving a subcription, we need to check if we are already
handling a subscription with the same request parameters to avoid
pushing the data twice to the same channel.

Fixes #115.
  • Loading branch information
llucax committed Dec 20, 2022
2 parents bc9dd63 + 6c12205 commit ef43a71
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 46 deletions.
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,8 @@ module = [
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "async_solipsism"
module = [
"async_solipsism",
"async_solipsism.*",
]
ignore_missing_imports = true
23 changes: 18 additions & 5 deletions src/frequenz/sdk/actor/_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,37 @@ def __init__( # pylint: disable=too-many-arguments
*relevant* samples at a given time. The result of the function
is what is sent as the resampled data.
"""
self._channel_registry = channel_registry
self._resampling_period_s = resampling_period_s
self._channel_registry: ChannelRegistry = channel_registry
self._resampling_period_s: float = resampling_period_s
self._max_data_age_in_periods: float = max_data_age_in_periods
self._resampling_function: ResamplingFunction = resampling_function
self._data_sourcing_request_sender = data_sourcing_request_sender
self._resampling_request_receiver = resampling_request_receiver
self._resampler = Resampler(
self._data_sourcing_request_sender: Sender[
ComponentMetricRequest
] = data_sourcing_request_sender
self._resampling_request_receiver: Receiver[
ComponentMetricRequest
] = resampling_request_receiver
self._resampler: Resampler = Resampler(
resampling_period_s=resampling_period_s,
max_data_age_in_periods=max_data_age_in_periods,
resampling_function=resampling_function,
)
self._active_req_channels: set[str] = set()

async def _subscribe(self, request: ComponentMetricRequest) -> None:
"""Request data for a component metric.
Args:
request: The request for component metric data.
"""
request_channel_name = request.get_channel_name()

# If we are already handling this request, there is nothing to do.
if request_channel_name in self._active_req_channels:
return

self._active_req_channels.add(request_channel_name)

data_source_request = dataclasses.replace(
request, namespace=request.namespace + ":Source"
)
Expand Down
137 changes: 99 additions & 38 deletions tests/actor/test_resampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import async_solipsism
import pytest
import time_machine
from async_solipsism.socket import asyncio
from frequenz.channels import Broadcast

from frequenz.sdk.actor import (
Expand Down Expand Up @@ -42,49 +43,19 @@ def _now() -> datetime:
return datetime.now(timezone.utc)


# Even when it could be refactored to use smaller functions, I'm allowing
# too many statements because it makes following failures in tests more easy
# when the code is very flat.
async def test_component_metrics_resampling_actor( # pylint: disable=too-many-statements
async def _assert_resampling_works(
channel_registry: ChannelRegistry,
fake_time: time_machine.Coordinates,
*,
resampling_chan_name: str,
data_source_chan_name: str,
) -> None:
"""Run main functions that initializes and creates everything."""

channel_registry = ChannelRegistry(name="test")
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
data_source_req_recv = data_source_req_chan.new_receiver()
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
resampling_req_sender = resampling_req_chan.new_sender()

resampling_actor = ComponentMetricsResamplingActor(
channel_registry=channel_registry,
data_sourcing_request_sender=data_source_req_chan.new_sender(),
resampling_request_receiver=resampling_req_chan.new_receiver(),
resampling_period_s=0.2,
max_data_age_in_periods=2,
)

subs_req = ComponentMetricRequest(
namespace="Resampling",
component_id=9,
metric_id=ComponentMetricId.SOC,
start_time=None,
)

await resampling_req_sender.send(subs_req)
data_source_req = await data_source_req_recv.receive()
assert data_source_req is not None
assert data_source_req == dataclasses.replace(
subs_req, namespace="Resampling:Source"
)

timeseries_receiver = channel_registry.new_receiver(subs_req.get_channel_name())
timeseries_sender = channel_registry.new_sender(data_source_req.get_channel_name())
timeseries_receiver = channel_registry.new_receiver(resampling_chan_name)
timeseries_sender = channel_registry.new_sender(data_source_chan_name)

fake_time.shift(0.2)
new_sample = await timeseries_receiver.receive() # At 0.2s (timer)
assert new_sample is not None
assert new_sample.value is None
assert new_sample == Sample(_now(), None)

fake_time.shift(0.1)
sample = Sample(_now(), 3) # ts = 0.3s
Expand All @@ -105,6 +76,7 @@ async def test_component_metrics_resampling_actor( # pylint: disable=too-many-s
assert new_sample is not None
assert new_sample.value == 3.5 # avg(3, 4)
assert new_sample.timestamp >= sample.timestamp
assert new_sample.timestamp == _now()

fake_time.shift(0.05)
await timeseries_sender.send(Sample(_now(), 8)) # ts = 0.65s
Expand All @@ -118,19 +90,108 @@ async def test_component_metrics_resampling_actor( # pylint: disable=too-many-s
assert new_sample is not None
assert new_sample.value == 5.5 # avg(4, 8, 1, 9)
assert new_sample.timestamp >= sample.timestamp
assert new_sample.timestamp == _now()

# No more samples sent
fake_time.shift(0.2)
new_sample = await timeseries_receiver.receive() # At 1.0s (timer)
assert new_sample is not None
assert new_sample.value == 6 # avg(8, 1, 9)
assert new_sample.timestamp >= sample.timestamp
assert new_sample.timestamp == _now()

# No more samples sent
fake_time.shift(0.2)
new_sample = await timeseries_receiver.receive() # At 1.2s (timer)
assert new_sample is not None
assert new_sample.value is None
assert new_sample.timestamp == _now()


async def test_single_request(
fake_time: time_machine.Coordinates,
) -> None:
"""Run main functions that initializes and creates everything."""

channel_registry = ChannelRegistry(name="test")
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
data_source_req_recv = data_source_req_chan.new_receiver()
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
resampling_req_sender = resampling_req_chan.new_sender()

resampling_actor = ComponentMetricsResamplingActor(
channel_registry=channel_registry,
data_sourcing_request_sender=data_source_req_chan.new_sender(),
resampling_request_receiver=resampling_req_chan.new_receiver(),
resampling_period_s=0.2,
max_data_age_in_periods=2,
)

subs_req = ComponentMetricRequest(
namespace="Resampling",
component_id=9,
metric_id=ComponentMetricId.SOC,
start_time=None,
)

await resampling_req_sender.send(subs_req)
data_source_req = await data_source_req_recv.receive()
assert data_source_req is not None
assert data_source_req == dataclasses.replace(
subs_req, namespace="Resampling:Source"
)

await _assert_resampling_works(
channel_registry,
fake_time,
resampling_chan_name=subs_req.get_channel_name(),
data_source_chan_name=data_source_req.get_channel_name(),
)

await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
await resampling_actor._resampler.stop() # pylint: disable=protected-access


async def test_duplicate_request(
fake_time: time_machine.Coordinates,
) -> None:
"""Run main functions that initializes and creates everything."""

channel_registry = ChannelRegistry(name="test")
data_source_req_chan = Broadcast[ComponentMetricRequest]("data-source-req")
data_source_req_recv = data_source_req_chan.new_receiver()
resampling_req_chan = Broadcast[ComponentMetricRequest]("resample-req")
resampling_req_sender = resampling_req_chan.new_sender()

resampling_actor = ComponentMetricsResamplingActor(
channel_registry=channel_registry,
data_sourcing_request_sender=data_source_req_chan.new_sender(),
resampling_request_receiver=resampling_req_chan.new_receiver(),
resampling_period_s=0.2,
max_data_age_in_periods=2,
)

subs_req = ComponentMetricRequest(
namespace="Resampling",
component_id=9,
metric_id=ComponentMetricId.SOC,
start_time=None,
)

await resampling_req_sender.send(subs_req)
data_source_req = await data_source_req_recv.receive()

# Send duplicate request
await resampling_req_sender.send(subs_req)
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1)

await _assert_resampling_works(
channel_registry,
fake_time,
resampling_chan_name=subs_req.get_channel_name(),
data_source_chan_name=data_source_req.get_channel_name(),
)

await resampling_actor._stop() # type: ignore # pylint: disable=no-member,protected-access
await resampling_actor._resampler.stop() # pylint: disable=protected-access
2 changes: 1 addition & 1 deletion tests/timeseries/test_resampling.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# tests/timeseries/test_resampling.py:93: License: MIT
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/_a_sequence.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# tests/timeseries/test_resampling.py:93: License: MIT
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Helper class to compare two sequences without caring about the underlying type."""
Expand Down

0 comments on commit ef43a71

Please sign in to comment.