Skip to content

Commit

Permalink
During eviction, set is_replaying and raise special exception (#524)
Browse files Browse the repository at this point in the history
Fixes #523
  • Loading branch information
cretz committed May 15, 2024
1 parent f96679b commit a52f25d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,3 @@ jobs:
python-repo-path: ${{github.event.pull_request.head.repo.full_name}}
version: ${{github.event.pull_request.head.ref}}
version-is-repo-ref: true
features-repo-ref: http-connect-proxy-python
11 changes: 10 additions & 1 deletion temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,9 @@ def _apply_remove_from_cache(
) -> None:
self._deleting = True
self._cancel_requested = True
# We consider eviction to be under replay so that certain code like
# logging that avoids replaying doesn't run during eviction either
self._is_replaying = True
# Cancel everything
for task in self._tasks:
task.cancel()
Expand Down Expand Up @@ -1514,7 +1517,9 @@ def _assert_not_read_only(
self, action_attempted: str, *, allow_during_delete: bool = False
) -> None:
if self._deleting and not allow_during_delete:
raise RuntimeError(f"Ignoring {action_attempted} while deleting")
raise _WorkflowBeingEvictedError(
f"Ignoring {action_attempted} while evicting workflow. This is not an error."
)
if self._read_only:
raise temporalio.workflow.ReadOnlyContextError(
f"While in read-only function, action attempted: {action_attempted}"
Expand Down Expand Up @@ -2614,3 +2619,7 @@ def set(
) -> None:
if not temporalio.workflow.unsafe.is_replaying():
self._underlying.set(value, additional_attributes)


class _WorkflowBeingEvictedError(BaseException):
pass
46 changes: 46 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3429,6 +3429,52 @@ async def signal_count() -> int:
assert not hook_calls


@dataclass
class CapturedEvictionException:
is_replaying: bool
exception: BaseException


captured_eviction_exceptions: List[CapturedEvictionException] = []


@workflow.defn(sandboxed=False)
class EvictionCaptureExceptionWorkflow:
@workflow.run
async def run(self) -> None:
# Going to sleep so we can force eviction
try:
await asyncio.sleep(0.01)
except BaseException as err:
captured_eviction_exceptions.append(
CapturedEvictionException(
is_replaying=workflow.unsafe.is_replaying(), exception=err
)
)


async def test_workflow_eviction_exception(client: Client):
assert not captured_eviction_exceptions

# Run workflow with no cache (forces eviction every step)
async with new_worker(
client, EvictionCaptureExceptionWorkflow, max_cached_workflows=0
) as worker:
await client.execute_workflow(
EvictionCaptureExceptionWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

# Confirm expected eviction replaying state and exception type
assert len(captured_eviction_exceptions) == 1
assert captured_eviction_exceptions[0].is_replaying
assert (
type(captured_eviction_exceptions[0].exception).__name__
== "_WorkflowBeingEvictedError"
)


@dataclass
class DynamicWorkflowValue:
some_string: str
Expand Down

0 comments on commit a52f25d

Please sign in to comment.