Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to MovingWindow that waits for a number of samples #387

Conversation

matthias-wende-frequenz
Copy link
Contributor

Up to now there was no clean way to wait until the MovingWindow got updated with a certain number of samples.
In this PR we introduce a wait_for_samples method that finishes once the number of samples arrived.

@matthias-wende-frequenz matthias-wende-frequenz requested a review from a team as a code owner May 15, 2023 11:58
@github-actions github-actions bot added part:data-pipeline Affects the data pipeline part:tests Affects the unit, integration and performance (benchmarks) tests labels May 15, 2023
@matthias-wende-frequenz matthias-wende-frequenz removed the part:tests Affects the unit, integration and performance (benchmarks) tests label May 15, 2023
@matthias-wende-frequenz matthias-wende-frequenz added this to the v0.21.0 milestone May 15, 2023
@matthias-wende-frequenz
Copy link
Contributor Author

FYI @idlir-shkurti-frequenz

@github-actions github-actions bot added the part:tests Affects the unit, integration and performance (benchmarks) tests label May 15, 2023
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also feel something smells here, I don't like having some class variable to only track a counter that is used by one method. My instinct says that there should be a way to split this functionality to a different class that uses, can be plugged to or proxies to the MovingWindow, but I don't have anything in particular to propose right. At least nothing that's not a lot of extra work (and probably worsen the usability of this too), so I'm fine to leave it as is, but just mentioning it in case anyone suddenly gets a brilliant idea 💡

Comment on lines 141 to 142
self.count_samples = 0
"""The number of samples that have been received."""

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this one should probably be a read-only public attribute. Also for attributes it's better to use nouns instead of verbs (and it's missing the type), for example:

Suggested change
self.count_samples = 0
"""The number of samples that have been received."""
self._received_samples_count: int = 0
"""The number of samples that have been received."""

Plus

    @property
    def received_samples_count(self) -> int
        return self._received_samples_count

Comment on lines 143 to 146
self.wait_for_num_samples = 0
"""The number of samples to wait for before
the wait_for_samples method triggers."""
self.wait_for_samples_event = asyncio.Event()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two really look like they should be private. Same about names (and missing types):

Suggested change
self.wait_for_num_samples = 0
"""The number of samples to wait for before
the wait_for_samples method triggers."""
self.wait_for_samples_event = asyncio.Event()
self._expected_samples_count: int = 0
"""The number of samples to wait for before `wait_for_samples()` triggers."""
self._wait_for_samples_event: asyncio.Event = asyncio.Event()
"""The event to signal `wait_for_samples()` that the wait is over."""

src/frequenz/sdk/timeseries/_moving_window.py Outdated Show resolved Hide resolved
@leandro-lucarella-frequenz
Copy link
Contributor

Oh, I also noticed in your last PRs that it seems you have your editor configured to a maximum line length of maybe 50 chars? Ideally it should be 88 to match black's config.

return

self.wait_for_num_samples = num_samples
await self.wait_for_samples_event.wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you never call self.wait_for_samples_event.clear() it will always fire immediately in .wait()

Up to now there was no clean way to wait until
the MovingWindow got updated with a certain number
of samples.

In this commit we introduce a `wait_for_samples`
method that finishes once the number of samples
arrived.

Signed-off-by: Matthias Wende <matthias.wende@frequenz.com>
@matthias-wende-frequenz
Copy link
Contributor Author

I've reworked the implementation. Now the user can get a channel that sends a None when the a certain number of samples arrived. That has the advantage that we can plug it into our channels select mechanism.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides a few comments on improving consistency, the API and the tests, the commit message needs to be updated because it mentions a wait_for_samples method.

Comment on lines +141 to +143
self._wait_for_num_samples: int = 0
"""The number of samples to wait for before the wait_for_num_samples channels
sends out an event."""

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using nouns for non-boolean attributes:

Suggested change
self._wait_for_num_samples: int = 0
"""The number of samples to wait for before the wait_for_num_samples channels
sends out an event."""
self._num_samples_to_wait_for: int = 0
"""The number of samples to wait for before triggering an event through the channel."""

Comment on lines +144 to +146
self._wait_for_samples_channel = Broadcast[None](
"Wait for number of samples channel."
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the text here is for debugging purposes only, right @shsms?

I'd suggest using a shorter string for that and using the long string as documentation for the variable:

Suggested change
self._wait_for_samples_channel = Broadcast[None](
"Wait for number of samples channel."
)
self._wait_for_samples_channel = Broadcast[None]("wait-for-samples")
"""Channel to send events to when wait for number of samples is triggered."""

@@ -169,6 +176,9 @@ async def _run_impl(self) -> None:
Raises:
asyncio.CancelledError: if the MovingWindow task is cancelled.
"""
received_samples_count = 0
wait_for_samples_sender = self._wait_for_samples_channel.new_sender()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess once a MW was stopped there is no way to start it again, right? Otherwise the sender should probably be created in the constructor instead to avoid leaking sender objects.

Comment on lines +195 to +196
received_samples_count = 0
await wait_for_samples_sender.send(None)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's free and even when the user should know it beforehand anyway, we could also send the number of samples received instead of None, maybe it could become a handy shortcut.

Suggested change
received_samples_count = 0
await wait_for_samples_sender.send(None)
await wait_for_samples_sender.send(received_samples_count)
received_samples_count = 0

Comment on lines +204 to +217
def set_sample_counter(self, num_samples: int) -> None:
"""Set the number of samples to wait for until the sample counter triggers.

Args:
num_samples: The number of samples to wait for.

Raises:
ValueError: if the number of samples is less than or equal to zero.
"""
if num_samples <= 0:
raise ValueError(
"The number of samples to wait for should be greater than zero."
)
self._wait_for_num_samples = num_samples

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this indirectly enables sending the events if num_samples > 0, right? If so I would rename this method to something that makes it easier to realize that happens. What about:

@property
def is_wait_for_samples_event_enabled(self) -> bool:
    # Returns `self._wait_for_num_samples != 0`

def enable_wait_for_samples_event(self, num_samples: int) -> bool:
    # Same as above but check for `num_samples > 0`, return whether it was enabled before

def disable_wait_for_samples_event(self) -> bool:
    # Set `num_samples = 0`, return whether it was enabled before

)
self._wait_for_num_samples = num_samples

def new_sample_count_receiver(self) -> Receiver[None]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the above suggestion is applied, I would rename this for consistency:

Suggested change
def new_sample_count_receiver(self) -> Receiver[None]:
def new_wait_for_samples_event_receiver(self) -> Receiver[None]:

We could go with something different than wait_for_samples_event as it is a bit long, but the 4 methods should use the same term IMHO.


window.set_sample_counter(samples_to_wait_for)

# asyncio.create_task(push_data_delayed())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# asyncio.create_task(push_data_delayed())

?

Comment on lines +121 to +125
for i in range(0, samples_to_wait_for):
await sender.send(
Sample(datetime.now(tz=timezone.utc) + timedelta(seconds=i), 1.0)
)
await sample_count_recv.receive()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some failures that this will not detect or detect wrongly. For example if you trigger before the samples_to_wait_for this will succeed too. If the event doesn't trigger, this test will hang forever, which might be quite annoying.

Maybe could check for the internal counter to verify it is incremented? If you also add my suggestion to return the number of samples received you could add the check here too.

Maybe you can also move the await sample_count_recv.receive() to a different task so you can check it didn't fire before you sent enough samples and that it was fired after you sent all the expected samples but without blocking the main thread if it fails?

Maybe you could also push different sample values and then check that the state of the moving window is what it would be expected after receiving all that samples, so if the even triggered before that check should also fail.

@llucax llucax removed this from the v0.22.0 milestone Jun 27, 2023
@cwasicki
Copy link
Collaborator

Will this not be part of v1 milestone?

@matthias-wende-frequenz
Copy link
Contributor Author

Will this not be part of v1 milestone?

Let's see. This change is not breaking so it can be v1.1.x too. But let's try to get it in.

@matthias-wende-frequenz matthias-wende-frequenz added this to the post-v1.0 milestone Aug 23, 2023
@thomas-nicolai-frequenz thomas-nicolai-frequenz added priority:high Address this as soon as possible type:enhancement New feature or enhancement visitble to users labels Aug 23, 2023
@matthias-wende-frequenz

This comment was marked as off-topic.

@cwasicki

This comment was marked as off-topic.

@matthias-wende-frequenz

This comment was marked as off-topic.

@llucax llucax changed the base branch from v0.x.x to v1.x.x October 11, 2023 07:21
@llucax llucax modified the milestones: post-v1.0, v1.0.0-rc5, v1.0.0-rc6 Jan 29, 2024
@llucax llucax modified the milestones: v1.0.0-rc6, v1.0.0-rc7 Mar 26, 2024
@matthias-wende-frequenz
Copy link
Contributor Author

This is quite old. It we need it we can reopen the PR or start from scratch.

@cwasicki
Copy link
Collaborator

cwasicki commented Jun 5, 2024

Is there another way to know if the moving window was updated?

@matthias-wende-frequenz
Copy link
Contributor Author

Is there another way to know if the moving window was updated?

No there isn't.
This pr was closed, not because we don't want to support it, but rather since nobody is working nor anyone was asking for this feature. So as it appears there is no need for this feature.

@shsms
Copy link
Contributor

shsms commented Jun 6, 2024

It turns out this is required in the Forecast actor. Only yesterday evening, Christoph showed me the place where he's using a while loop that could be replaced with this, just a little bit after we were talking about closing this issue. But I think not top priority, so we can pick this up again soon.

@llucax llucax modified the milestones: v1.0.0-rc800, Dropped Jun 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
part:data-pipeline Affects the data pipeline part:tests Affects the unit, integration and performance (benchmarks) tests priority:high Address this as soon as possible type:enhancement New feature or enhancement visitble to users
Projects
Development

Successfully merging this pull request may close these issues.

None yet

7 participants