Skip to content

Commit

Permalink
Add attempt and firstAttemptTimestamp to context (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Jan 20, 2023
1 parent dcdab81 commit f892c02
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 3 deletions.
12 changes: 12 additions & 0 deletions integ/basic_workflow_state1.go
Expand Up @@ -14,10 +14,22 @@ func (b basicWorkflowState1) GetStateId() string {
}

func (b basicWorkflowState1) Start(ctx iwf.WorkflowContext, input iwf.Object, persistence iwf.Persistence, communication iwf.Communication) (*iwf.CommandRequest, error) {
if ctx.GetAttempt() <= 0 {
panic("attempt should be greater than zero")
}
if ctx.GetFirstAttemptTimestampSeconds() <= 0 {
panic("GetFirstAttemptTimestampSeconds should be greater than zero")
}
return iwf.EmptyCommandRequest(), nil
}

func (b basicWorkflowState1) Decide(ctx iwf.WorkflowContext, input iwf.Object, commandResults iwf.CommandResults, persistence iwf.Persistence, communication iwf.Communication) (*iwf.StateDecision, error) {
if ctx.GetAttempt() <= 0 {
panic("attempt should be greater than zero")
}
if ctx.GetFirstAttemptTimestampSeconds() <= 0 {
panic("GetFirstAttemptTimestampSeconds should be greater than zero")
}
var i int
err := input.Get(&i)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions iwf/worker_service_impl.go
Expand Up @@ -17,7 +17,9 @@ func (w *workerServiceImpl) HandleWorkflowStateStart(ctx context.Context, reques
stateDef := w.registry.getWorkflowStateDef(wfType, request.GetWorkflowStateId())
input := NewObject(request.StateInput, w.options.ObjectEncoder)
reqContext := request.GetContext()
wfCtx := newWorkflowContext(ctx, reqContext.GetWorkflowId(), reqContext.GetWorkflowRunId(), reqContext.GetStateExecutionId(), reqContext.GetWorkflowStartedTimestamp())
wfCtx := newWorkflowContext(
ctx, reqContext.GetWorkflowId(), reqContext.GetWorkflowRunId(), reqContext.GetStateExecutionId(), reqContext.GetWorkflowStartedTimestamp(),
int(reqContext.GetAttempt()), reqContext.GetFirstAttemptTimestamp())

pers, err := newPersistence(w.options.ObjectEncoder, w.registry.getWorkflowDataObjectKeyStore(wfType), w.registry.getSearchAttributeTypeStore(wfType), request.DataObjects, request.SearchAttributes, nil)
if err != nil {
Expand Down Expand Up @@ -96,7 +98,9 @@ func (w *workerServiceImpl) HandleWorkflowStateDecide(ctx context.Context, reque
stateDef := w.registry.getWorkflowStateDef(wfType, request.GetWorkflowStateId())
input := NewObject(request.StateInput, w.options.ObjectEncoder)
reqContext := request.GetContext()
wfCtx := newWorkflowContext(ctx, reqContext.GetWorkflowId(), reqContext.GetWorkflowRunId(), reqContext.GetStateExecutionId(), reqContext.GetWorkflowStartedTimestamp())
wfCtx := newWorkflowContext(
ctx, reqContext.GetWorkflowId(), reqContext.GetWorkflowRunId(), reqContext.GetStateExecutionId(), reqContext.GetWorkflowStartedTimestamp(),
int(reqContext.GetAttempt()), reqContext.GetFirstAttemptTimestamp())

commandResults, err := fromIdlCommandResults(request.CommandResults, w.options.ObjectEncoder)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions iwf/workflow_context.go
Expand Up @@ -8,4 +8,10 @@ type WorkflowContext interface {
GetWorkflowStartTimestampSeconds() int64
GetStateExecutionId() string
GetWorkflowRunId() string
// GetFirstAttemptTimestampSeconds returns the start time of the first attempt of the API call. It's from ScheduledTimestamp of Cadence/Temporal activity.GetInfo
// require server version 1.2.2+, return 0 if server version is lower
GetFirstAttemptTimestampSeconds() int64
// GetAttempt returns an attempt number, which starts from 1, and increased by 1 for every retry if retry policy is specified. It's from Attempt of Cadence/Temporal activity.GetInfo
// require server version 1.2.2+, return 0 if server version is lower
GetAttempt() int
}
17 changes: 16 additions & 1 deletion iwf/workflow_context_impl.go
Expand Up @@ -8,15 +8,22 @@ type workflowContextImpl struct {
workflowRunId string
stateExecutionId string
workflowStartTimestampSeconds int64
attempt int
firstAttemptTimestampSeconds int64
}

func newWorkflowContext(ctx context.Context, workflowId string, workflowRunId string, stateExecutionId string, workflowStartTimestampSeconds int64) WorkflowContext {
func newWorkflowContext(
ctx context.Context, workflowId string, workflowRunId string, stateExecutionId string, workflowStartTimestampSeconds int64,
attempt int, firstAttemptTimestampSeconds int64,
) WorkflowContext {
return &workflowContextImpl{
Context: ctx,
workflowId: workflowId,
workflowRunId: workflowRunId,
stateExecutionId: stateExecutionId,
workflowStartTimestampSeconds: workflowStartTimestampSeconds,
attempt: attempt,
firstAttemptTimestampSeconds: firstAttemptTimestampSeconds,
}
}

Expand All @@ -35,3 +42,11 @@ func (w workflowContextImpl) GetStateExecutionId() string {
func (w workflowContextImpl) GetWorkflowRunId() string {
return w.workflowRunId
}

func (w workflowContextImpl) GetFirstAttemptTimestampSeconds() int64 {
return w.firstAttemptTimestampSeconds
}

func (w workflowContextImpl) GetAttempt() int {
return w.attempt
}

0 comments on commit f892c02

Please sign in to comment.