
Unify activity interruption #239

Open · jeffschoner wants to merge 10 commits into master from worker-stopped-timeout
Conversation

@jeffschoner (Contributor) commented May 1, 2023

Summary

Activities can be interrupted for three reasons:

  1. The activity has been canceled by its workflow, or as a side effect of its workflow being terminated
  2. The activity has timed out
  3. The worker executing the activity is shutting down

This change:

  • Unifies all three interruption modes as subclassed errors of ActivityInterrupted that can be raised by calling heartbeat_interrupted. This provides a convenient way to trigger automatic behavior when an activity should be interrupted (see the sketch after this list). For those who want more granular control, the individual errors can be rescued, or methods on the activity context can be used to inspect each case.
  • Allows activities to detect when the worker they are executing on is shutting down, both by raising one of the errors above and via a new shutting_down? method on the activity context. This is wired into the activity task processor, which was already tracking shutdown state. Because that state is now accessed from multiple threads, it is wrapped in a mutex.
  • Allows activities to detect when their start-to-close timeout has been exceeded. This is done by determining a deadline when the activity task is first started. The Go SDK already does this by setting a deadline on the Go context. In discussions with Temporal, they recommended it as a good addition to the Java SDK as well.
  • Fixes a small race condition that occasionally crops up in the worker spec. This was reliably reproducible on Windows for me, but not on Mac, for whatever reason.
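
As a rough sketch of the unified model (heartbeat_interrupted, ActivityInterrupted, and the context query methods are the names introduced by this PR; the activity body, the Temporal:: error namespacing, and cleanup_partial_state are illustrative assumptions):

class FileCleanupActivity < Temporal::Activity
  def execute(paths)
    paths.each do |path|
      # Raises a subclass of ActivityInterrupted if the activity has been
      # canceled, its deadline has passed, or the worker is shutting down
      activity.heartbeat_interrupted

      File.delete(path)
    end
  rescue Temporal::ActivityInterrupted
    cleanup_partial_state # hypothetical helper
    raise # fail this attempt so it can be retried on another worker
  end
end

Activities that want finer control can rescue the individual subclasses instead, or check cancel_requested, timed_out?, and shutting_down? on the context at critical points.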

Testing

There is a new example test for worker shutdown:

cd examples
bundle exec rspec examples/spec/integration/activity_shutting_down_spec.rb

This test is novel in that it spins up its own workers and does not rely on those started via bin/worker. The activity worker must be spun down while the workflow worker continues to run, in order for the test to pass. Moreover, the workflow and activity used here are not interesting as samples in their own right.

A similar standalone test has also been added for activity timeout:

cd examples
bundle exec rspec examples/spec/integration/activity_timeout_spec.rb

This test needs to observe the error-raising behavior inside the activity, since the workflow will only see the activity timeout from the Temporal server. This isn't possible unless the worker is running in the same process as the spec.

There are many new and updated unit specs in:

bundle exec rspec spec/unit/lib/temporal/activity/context_spec.rb
bundle exec rspec spec/unit/lib/temporal/activity/poller_spec.rb
bundle exec rspec spec/unit/lib/temporal/activity/task_processor_spec.rb
bundle exec rspec spec/unit/lib/temporal/worker_spec.rb # updated spec with race condition

@jeffschoner (Author):

cc @calum-stripe @DeRauk

@jeffschoner force-pushed the worker-stopped-timeout branch 2 times, most recently from 35ad297 to c8e989f on May 8, 2023
@jeffschoner (Author):

Rebased master to resolve conflicts with heartbeat throttling and to get test reliability fixes

@antstorm (Contributor) left a comment:

Hey @jeffschoner, thank you for the PR!

I've pointed out a few things inline, but the notable decision we'll need to make is around the cancellation behaviour in general. Please let me know if you wanna sync on it and figure out a path forward, since it's potentially a breaking change.

# Returns true if the activity has been canceled directly by its workflow or indirectly by
# its workflow being terminated. This will only be set following a call to heartbeat that is
# not throttled.
def cancel_requested
@antstorm (Contributor):

nit: let's go with cancel_requested? to be consistent with other query-type methods

@jeffschoner (Author):

Looks like we may want to call this cancelled? instead, based on the sdk-ruby API?

@jeffschoner (Author):

@antstorm This is just moving an existing method from an attr_reader to a proper method so that I can more clearly put a comment on it. I'm happy to alias this as cancelled? and mark the existing method as deprecated if that's the direction you want to move instead.
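
For illustration, the alias-and-deprecate approach could look roughly like this (a sketch; the deprecation message wording is an assumption):

module Temporal
  class Activity
    class Context
      # New query-style predicate, consistent with sdk-ruby naming
      def cancelled?
        @cancel_requested
      end

      # Existing reader kept for backward compatibility
      def cancel_requested
        warn '[DEPRECATION] cancel_requested is deprecated; use cancelled? instead'
        cancelled?
      end
    end
  end
end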

require 'temporal/uuid'
require 'temporal/activity/async_token'

module Temporal
class Activity
class Context
def initialize(connection, metadata, config, heartbeat_thread_pool)
def initialize(connection, metadata, config, heartbeat_thread_pool, is_shutting_down)
@antstorm (Contributor):

nit: is_shutting_down -> shutting_down_proc? Otherwise it feels like a boolean is being passed
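
A sketch of what the proc-based naming could look like (assuming the poller hands the context a callable that reads its live shutdown state):

# Poller side: pass a callable instead of a snapshot boolean
context = Activity::Context.new(
  connection, metadata, config, heartbeat_thread_pool, -> { shutting_down? }
)

# Context side: the name now makes the contract obvious
def initialize(connection, metadata, config, heartbeat_thread_pool, shutting_down_proc)
  @shutting_down_proc = shutting_down_proc
end

def shutting_down?
  @shutting_down_proc.call
end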

# This returns true if the worker has started shutting down upon receiving a
# TERM or INT signal. Once this happens, your activity should finish processing
# quickly or raise an error to fail the activity attempt.
def shutting_down?
@antstorm (Contributor):

In the new SDK we've taken a slightly different approach, and I wonder if it might be a good idea to steer this in the same direction. Basically, instead of asking whether the worker is shutting down, the worker would attempt to cancel the activity (a similar process to activity cancellation, except using a different exception).

The main reason here is to make sure that cancellations are explicitly handled (including explicitly ignored). By exposing the shutting_down? method we're still defaulting to unhandled behaviour and I assume most activities won't use it at all.

@jeffschoner (Author):

I'd be curious to learn more about the API and mechanics of this. Is the idea that the worker propagates the cancellation request to the activity code, and the activity then acks the cancellation once it's ready to shut down? How is it different from simply raising an error that can be rescued and re-raised later?

@jeffschoner (Author):

I spent some time looking through the sdk-ruby code. This is much clearer to me now. I agree this pattern should work better.

However, I do have concerns about changing the behavior of the existing API, where there are currently no errors raised either directly by the heartbeat method or through thread interruption. For now, here's how I'm thinking of dealing with it (sketched after the list below):

  • Add these methods onto the activity context:
    • cancel for communicating cancellation reasons via an error argument
    • cancelled? an alias to cancel_requested that returns a Boolean. This can be checked by activities that don't want to deal with errors being raised by heartbeat! or who don't want to heartbeat at all, but rather check for cancellation at certain critical points of the activity.
  • Use the cancel method to communicate cancellation to the context for the three cases in this PR. This should help clean up some icky code like is_shutting_down_proc
  • Eliminate ActivityInterrupted, instead making ActivityCanceled the base class for naming consistency. Having subclassed errors is still useful, but I'm open to collapsing these into a single error like in sdk-ruby too.
  • For now, instead of raising an error onto the thread, these errors will only be raised out of the heartbeat_interrupted/heartbeat! method already added in this PR
  • Eventually, an activity or worker option can be added for thread interruption behavior. A shield method can also be written to protect certain sections of activity code from being interrupted.
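
A rough sketch of how those pieces could fit together (the method names come from the list above; the error subclasses and internals are assumptions, not a final design):

module Temporal
  class ActivityCanceled < Error; end                      # proposed base class
  class ActivityWorkerShuttingDown < ActivityCanceled; end # hypothetical subclass
  class ActivityTimedOut < ActivityCanceled; end           # hypothetical subclass

  class Activity
    class Context
      # Called by the worker to record why the activity is being
      # interrupted; the error is surfaced later by heartbeat!
      def cancel(error)
        @cancel_error = error
      end

      def cancelled?
        !@cancel_error.nil?
      end

      # Heartbeats, then raises the stored cancellation error, if any
      def heartbeat!(details = nil)
        heartbeat(details)
        raise @cancel_error if @cancel_error
      end
    end
  end
end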


# Activity cancellation because the workflow canceled the activity or because the workflow
# was terminated, is only communicated back to the activity by heartbeating.
if activity.cancel_requested
@antstorm (Contributor):

# shutting down. This flag defaults to false, in which these states can be detected by
# inspecting the heartbeat response for cancelation, or calling timed_out? or shutting_down?
# methods.
def heartbeat_interrupted(details = nil)
@antstorm (Contributor):

nit: heartbeat!? Similar to Rails' save! and update! which raise in case of a failure

# inspecting the heartbeat response for cancelation, or calling timed_out? or shutting_down?
# methods.
def heartbeat_interrupted(details = nil)
if deadline_exceeded?(schedule_to_close_deadline)
@antstorm (Contributor):

Semantically it's a bit weird to couple timeouts and shutdowns to the heartbeat mechanism — they don't really have anything to do with one another. Therefore a cancellation mechanism (similar to other SDKs) with a specific exception type might be a better solution overall (see my other comment)

@jeffschoner (Author):

They are disparate, but this is basically the single point where activity execution can be pre-empted without resorting to something like Thread#raise, which is much more disruptive to the existing activity programming model. There is precedent for this in the Java SDK, including guidance on the heartbeat method to rescue/catch the superclass of all these errors/exceptions as the proper way to handle interruption.

It sounds like the cancellation mechanism you describe would provide a single way to be made aware of cancellation/interruption for any of the three pre-emption reasons: worker shutdown, activity cancellation, and activity task timeout.
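
Concretely, the single pre-emption point added in this PR looks roughly like this (a sketch; deadline_exceeded? and the deadline names appear in the diff above, while the specific error subclasses raised are hypothetical):

def heartbeat_interrupted(details = nil)
  heartbeat(details)

  # The one place activity execution can be pre-empted without
  # resorting to Thread#raise
  raise ActivityWorkerShuttingDown if shutting_down?
  raise ActivityTimedOut if deadline_exceeded?(start_to_close_deadline) ||
                            deadline_exceeded?(schedule_to_close_deadline)
  raise ActivityCanceled if cancel_requested
end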

@@ -21,16 +21,21 @@ def initialize(namespace, task_queue, activity_lookup, config, middleware = [],
@config = config
@middleware = middleware
@shutting_down = false
@shutting_down_mutex = Mutex.new
@antstorm (Contributor):

I'm not sure this mutex is actually needed since it only wraps accessing the @shutting_down instance variable, which itself is pretty much atomic and won't cause a context switch
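
For reference, the guarded access under discussion is roughly (a simplified sketch of the poller; under MRI's GVL a bare read of a single instance variable is effectively atomic, which is the reviewer's point):

# Called from the shutdown/signal handling path
def stop
  @shutting_down_mutex.synchronize { @shutting_down = true }
end

# Called from activity threads via the context
def shutting_down?
  @shutting_down_mutex.synchronize { @shutting_down }
end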

@jeffschoner (Author):

I'm happy to remove it if you don't think it's necessary

@jeffschoner (Author):

@antstorm I'm mostly familiar with how the Java SDK handles this. I have a related PR over there right now to add timeout detection too (temporalio/sdk-java#1771), where there's another discussion going on about the correct model.

Do you have a code pointer to the sdk-ruby or one of the other new SDKs for the suggested cancellation API? Or perhaps a sketch of what you're thinking?
