Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delayed messages #102

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

Delayed messages #102

wants to merge 2 commits into from

Conversation

fjarri
Copy link

@fjarri fjarri commented Nov 2, 2020

This PR implements delayed message delivery, useful for creating event loops which can still be terminated at a moment's notice. Possibly fixes #44. Also fixes an unrelated bug in eventlet.Event that caused event.wait() to result in an AttributeError.

Supporting delay in ask/tell is pretty straightforward, but doing it for proxy() requires some additional indirection. It is in the commit by itself now, because I understand that the approach I used may be questionable; I also hasn't changed the docs/tests because there is a probability the interface will be changed. Currently one would use delayed proxies as

proxy = actor_ref.proxy(extended=True)
proxy.delayed(1).something()
proxy.delayed(2).some.attr = 1

All the introspection is only done once when proxy_base is created.

This PR has some intersection of functionality with PR #95, but is simpler and gives actors more control of the process (specifically, it is more convenient for event loop organization).

@codecov
Copy link

codecov bot commented Nov 2, 2020

Codecov Report

Merging #102 (50dac57) into main (ac006f7) will decrease coverage by 2.68%.
The diff coverage is 84.96%.

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #102      +/-   ##
==========================================
- Coverage   94.59%   91.91%   -2.69%     
==========================================
  Files          12       12              
  Lines         481      581     +100     
==========================================
+ Hits          455      534      +79     
- Misses         26       47      +21     
Impacted Files Coverage Δ
src/pykka/_proxy.py 88.31% <77.33%> (-10.70%) ⬇️
src/pykka/_ref.py 98.11% <88.88%> (-1.89%) ⬇️
src/pykka/_actor.py 93.90% <95.12%> (-0.46%) ⬇️
src/pykka/_envelope.py 100.00% <100.00%> (ø)
src/pykka/_threading.py 96.42% <100.00%> (+0.20%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ac006f7...50dac57. Read the comment docs.

@fjarri fjarri force-pushed the delayed-delivery branch 3 times, most recently from 2890e7d to 29072ed Compare November 5, 2020 07:41
@fjarri fjarri changed the title [WIP] Delayed messages Delayed messages Nov 5, 2020
@fjarri
Copy link
Author

fjarri commented Nov 5, 2020

@jodal the PR is ready for the initial review now

@djmattyg007
Copy link

My initial thought upon reading the patch is that monotonic timers should be used. Right now it looks like a collision course for problems around daylight saving time changes and leap seconds.

@fjarri
Copy link
Author

fjarri commented Jan 5, 2021

Isn't system timer always monotonic and not affected by timezones/daylight savings etc?

@djmattyg007
Copy link

If that was true there wouldn't be any point for time.monotonic() to exist. The output of time.time() abaolutely is not guaranteed to be monotonic.

@fjarri
Copy link
Author

fjarri commented Jan 5, 2021

My bad, I thought time.time() takes values from the system timer. Seems like an easy fix then?

@djmattyg007
Copy link

I should have looked this up initially:

https://docs.python.org/3/library/time.html?highlight=monotonic#time.time

While this function normally returns non-decreasing values, it can return a lower value than a previous call if the system clock has been set back between the two calls.

You definitely want time.monotonic(). I'm not sure how difficult it would be to switch everything over, but hopefully in most cases if all you're doing is diffing timestamps it should be fine.

@fjarri
Copy link
Author

fjarri commented Jan 9, 2021

@djmattyg007 , updated the PR, thanks for pointing out this problem. It was pretty easy, there were just two places where time.time() was used.

Base automatically changed from develop to main March 6, 2021 18:16
@jodal jodal force-pushed the delayed-delivery branch 2 times, most recently from ac7574e to b9c16d0 Compare March 6, 2021 23:55
Copy link

@adamcik adamcik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these comments are more nits and open questions. I would not run off and try and address everything/anything I pointed out but perhaps see what @jodal thinks about these before spending time on them.

Perhaps doing a wider documentation pass explaining the architecture and how things fit together makes sense after this is merged?

return len(self.timestamps) == 0

def add(self, envelope):
idx = bisect.bisect(self.timestamps, envelope.timestamp)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this use heapq for a priority queue instead of using bisect like this? If there is a reason for preferring it as I would add it as a comment to help future readers og the code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, I admit I wasn't aware of heapq's existence.

if len(self.timestamps) == 0:
return None
else:
return max(self.timestamps[0] - time.monotonic(), 0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being able to sub in a different time source could be useful for tests. Probably not required for this initial PR, but worth keeping in mind.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what kind of API is needed for that, given that it has to match the call in _envelope.py. Perhaps it will be easier to just mock time.monotonic in tests?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to read up on the clock pattern:
https://agilewarrior.wordpress.com/2017/03/03/clock-pattern/

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how is that different from mocking time.monotonic?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mocking time.monotonic() in particular may not be feasible. A dedicated clock object can provide more flexibility in this regard, as well as a nicer interface to manage the clock in tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that's true. But there's still a problem of similarly affecting the timeouted methods of Queue or Event.

@@ -98,6 +124,11 @@ def _create_actor_inbox():
"""Internal method for implementors of new actor types."""
raise NotImplementedError("Use a subclass of Actor")

@staticmethod
def _queue_empty_exception():
"""Internal method for implementors of new actor types."""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this return a type or an instance, could we document this?

# Take all the messages out of the inbox and put them
# in our internal inbox where they're sorted by timestamps
try:
envelope = self.actor_inbox.get(timeout=next_event_in)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when timeout=0

Copy link
Author

@fjarri fjarri Mar 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same as with any other timeout, really. It either returns an Envelope (if there's one in the queue), or goes into the except self.__queue_empty_exception branch.

next_event_in = self.__timed_inbox.next_event_in()

# Check if there's something to be processed right now.
if next_event_in > 0:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought next event in can be zero? Is the check here so that we skip any future messages that have delays left and only run those that are ready. Perhaps makes this more clear with a comment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather, if there are no messages to execute right now (that is, we received some messages, but they were all delayed), we go into the waiting mode again. If there is something to execute, next_event_in would be 0. I'm not sure how can I make it clearer in the comment; could you propose an alternative?

def __setattr__(self, name, value):
if name == "actor_ref" or name.startswith("_"):
return super().__setattr__(name, value)
message_factory = self._message_builder.setattr(name)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading this line I almost assumed it was changing an attr on the builder, but I assume it's creating a builder that has the job of setting an attr? Could we make this more clear with better naming or comments?

@@ -263,9 +343,10 @@ class CallableProxy:
proxy.do_work.defer()
"""

def __init__(self, actor_ref, attr_path):
def __init__(self, actor_ref, attr_path, delay=0):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type, units and documentation for delay?


def ask(self, message, block=True, timeout=None):
def ask(self, message, block=True, timeout=None, delay=0):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document, type, units and semantics of delay in ask?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also should we allow this to be called with None and is that treated the same as zero? Or a negative delay? Or a duration from datetime?

Should we support an at=None API passing in a time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also should we allow this to be called with None and is that treated the same as zero?

That's possible.

Or a negative delay?

That already works, although the resulting effect is that the message effectively gets a higher priority (because it will be put at the start of the TimedInbox). Not sure if that's a useful thing to allow or not.

Or a duration from datetime?

Not sure what you mean here.

Should we support an at=None API passing in a time?

That's kinda dangerous in view of the earlier discussion about time.monotonic(). We'll have to translate at into the monotonic time, and then the user adjusts the clock, or daylight savings happen, and the execution time will change.

actor_ref.tell(
{"command": "callback", "callback": lambda: lst.append(1)}, delay=0.5
)
event_set = event.wait()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have some largish timeout just in case, so the test exists even if this breaks at some point?

},
delay=delay,
)
event_set = event.wait()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have some largish timeout just in case, so the test exists even if this breaks at some point?

@fjarri
Copy link
Author

fjarri commented Mar 7, 2021

@adamcik , thanks for the comments. I was postponing writing the docs until I'm sure @jodal is happy with the API, especially that of ActorProxy (and with the PR in general).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Actors that drive their own event processing (or maybe have idle processing)
3 participants