Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT: Support Caching of Data Attributes in GoLang SDK #68

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion iwf/client.go
Expand Up @@ -95,7 +95,7 @@ type UnregisteredClient interface {
// GetWorkflowDataAttributes returns the data objects of a workflow execution
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowDataAttributes API instead
GetWorkflowDataAttributes(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]Object, error)
GetWorkflowDataAttributes(ctx context.Context, workflowId, workflowRunId string, keys []string, useMemoForDataAttributes bool) (map[string]Object, error)
// GetWorkflowSearchAttributes returns search attributes of a workflow execution
// workflowId is required, workflowRunId is optional and default to current runId of the workflowId
// keys is required to be non-empty. If you intend to return all data objects, use GetAllWorkflowSearchAttributes API instead
Expand Down
5 changes: 4 additions & 1 deletion iwf/client_impl.go
Expand Up @@ -55,6 +55,8 @@ func (c *clientImpl) StartWorkflow(ctx context.Context, workflow ObjectWorkflow,
}
unregOpt.InitialSearchAttributes = convertedSAs
}
schemaOptions := c.registry.getPersistenceSchemaOptions(wfType)
unregOpt.UsingMemoForDataAttributes = schemaOptions.CachingDataAttributesByMemo
return c.UnregisteredClient.StartWorkflow(ctx, wfType, startStateId, workflowId, timeoutSecs, input, unregOpt)
}

Expand Down Expand Up @@ -130,7 +132,8 @@ func (c *clientImpl) GetWorkflowDataAttributes(ctx context.Context, workflow Obj
return nil, fmt.Errorf("data object type %v is not registered", k)
}
}
return c.UnregisteredClient.GetWorkflowDataAttributes(ctx, workflowId, workflowRunId, keys)
persistenceSchemaOptions := c.registry.getPersistenceSchemaOptions(wfType)
return c.UnregisteredClient.GetWorkflowDataAttributes(ctx, workflowId, workflowRunId, keys, persistenceSchemaOptions.CachingDataAttributesByMemo)
}

func (c *clientImpl) GetWorkflowSearchAttributes(ctx context.Context, workflow ObjectWorkflow, workflowId, workflowRunId string, keys []string) (map[string]interface{}, error) {
Expand Down
4 changes: 4 additions & 0 deletions iwf/persistence.go
Expand Up @@ -60,3 +60,7 @@ type persistenceInternal interface {
searchAttributes []iwfidl.SearchAttribute,
)
}

type PersistenceSchemaOptions struct {
CachingDataAttributesByMemo bool
}
2 changes: 2 additions & 0 deletions iwf/registry.go
Expand Up @@ -14,6 +14,7 @@ type Registry interface {
getWorkflow(wfType string) ObjectWorkflow
getWorkflowStartingState(wfType string) WorkflowState
getWorkflowStateDef(wfType string, id string) StateDef
getPersistenceSchemaOptions(wfType string) PersistenceSchemaOptions
getWorkflowRPC(wfType string, rpcMethod string) CommunicationMethodDef
getWorkflowSignalNameStore(wfType string) map[string]bool
getWorkflowInternalChannelNameStore(wfType string) map[string]bool
Expand All @@ -31,5 +32,6 @@ func NewRegistry() Registry {
dataAttrsKeyStore: map[string]map[string]bool{},
searchAttributeTypeStore: map[string]map[string]iwfidl.SearchAttributeValueType{},
workflowRPCStore: map[string]map[string]CommunicationMethodDef{},
persistenceSchemaOptions: map[string]PersistenceSchemaOptions{},
}
}
13 changes: 13 additions & 0 deletions iwf/registry_impl.go
Expand Up @@ -13,6 +13,7 @@ type registryImpl struct {
internalChannelNameStore map[string]map[string]bool
dataAttrsKeyStore map[string]map[string]bool
searchAttributeTypeStore map[string]map[string]iwfidl.SearchAttributeValueType
persistenceSchemaOptions map[string]PersistenceSchemaOptions
}

func (r *registryImpl) AddWorkflows(workflows ...ObjectWorkflow) error {
Expand All @@ -35,6 +36,9 @@ func (r *registryImpl) AddWorkflow(wf ObjectWorkflow) error {
if err := r.registerWorkflowCommunicationSchema(wf); err != nil {
return err
}
if err := r.registerWorkflowPersistenceSchemaOptions(wf); err != nil {
return err
}
return r.registerWorkflowPersistenceSchema(wf)
}

Expand All @@ -50,6 +54,10 @@ func (r *registryImpl) getWorkflowStartingState(wfType string) WorkflowState {
return r.workflowStartingState[wfType]
}

func (r *registryImpl) getPersistenceSchemaOptions(wfType string) PersistenceSchemaOptions {
return r.persistenceSchemaOptions[wfType]
}

func (r *registryImpl) getWorkflowStateDef(wfType string, id string) StateDef {
return r.workflowStateStore[wfType][id]
}
Expand Down Expand Up @@ -156,3 +164,8 @@ func (r *registryImpl) registerWorkflowPersistenceSchema(wf ObjectWorkflow) erro
r.searchAttributeTypeStore[wfType] = searchAttributes
return nil
}
func (r *registryImpl) registerWorkflowPersistenceSchemaOptions(wf ObjectWorkflow) error {
wfType := GetFinalWorkflowType(wf)
r.persistenceSchemaOptions[wfType] = wf.GetPersistenceSchemaOptions()
return nil
}
4 changes: 4 additions & 0 deletions iwf/rpc_options.go
Expand Up @@ -11,4 +11,8 @@ type RPCOptions struct {
DataAttributesLoadingPolicy *iwfidl.PersistenceLoadingPolicy
// default is ALL_WITHOUT_LOCKING
SearchAttributesLoadingPolicy *iwfidl.PersistenceLoadingPolicy

// Only used when workflow has enabled CachingDataAttributesByMemo (see PersistenceSchemaOptions)
// By default, it's false for high throughput support. Flip to true to bypass the caching for a strong consistent read
BypassCachingForStrongConsistency bool
}
17 changes: 9 additions & 8 deletions iwf/unregistered_client_impl.go
Expand Up @@ -77,23 +77,24 @@ func (u *unregisteredClientImpl) SignalWorkflow(ctx context.Context, workflowId,
return u.processError(err, httpResp)
}

func (u *unregisteredClientImpl) GetWorkflowDataAttributes(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]Object, error) {
func (u *unregisteredClientImpl) GetWorkflowDataAttributes(ctx context.Context, workflowId, workflowRunId string, keys []string, useMemoForDataAttributes bool) (map[string]Object, error) {
if len(keys) == 0 {
return nil, fmt.Errorf("must specify keys to return, use GetAllWorkflowDataAttributes if intended to get all keys")
}
return u.doGetWorkflowDataObjects(ctx, workflowId, workflowRunId, keys)
return u.doGetWorkflowDataObjects(ctx, workflowId, workflowRunId, keys, useMemoForDataAttributes)
}

func (u *unregisteredClientImpl) GetAllWorkflowDataAttributes(ctx context.Context, workflowId, workflowRunId string) (map[string]Object, error) {
return u.doGetWorkflowDataObjects(ctx, workflowId, workflowRunId, nil)
func (u *unregisteredClientImpl) GetAllWorkflowDataAttributes(ctx context.Context, workflowId, workflowRunId string, useMemoForDataAttributes bool) (map[string]Object, error) {
return u.doGetWorkflowDataObjects(ctx, workflowId, workflowRunId, nil, useMemoForDataAttributes)
}

func (u *unregisteredClientImpl) doGetWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string, keys []string) (map[string]Object, error) {
func (u *unregisteredClientImpl) doGetWorkflowDataObjects(ctx context.Context, workflowId, workflowRunId string, keys []string, useMemoForDataAttributes bool) (map[string]Object, error) {
reqPost := u.apiClient.DefaultApi.ApiV1WorkflowDataobjectsGetPost(ctx)
resp, httpResp, err := reqPost.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: workflowId,
WorkflowRunId: iwfidl.PtrString(workflowRunId),
Keys: keys,
WorkflowId: workflowId,
WorkflowRunId: iwfidl.PtrString(workflowRunId),
Keys: keys,
UseMemoForDataAttributes: &useMemoForDataAttributes,
}).Execute()
if err := u.processError(err, httpResp); err != nil {
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions iwf/unregistered_workflow_options.go
Expand Up @@ -3,9 +3,10 @@ package iwf
import "github.com/indeedeng/iwf-golang-sdk/gen/iwfidl"

type UnregisteredWorkflowOptions struct {
WorkflowIdReusePolicy *iwfidl.IDReusePolicy
WorkflowCronSchedule *string
WorkflowRetryPolicy *iwfidl.WorkflowRetryPolicy
StartStateOptions *iwfidl.WorkflowStateOptions
InitialSearchAttributes []iwfidl.SearchAttribute
WorkflowIdReusePolicy *iwfidl.IDReusePolicy
WorkflowCronSchedule *string
WorkflowRetryPolicy *iwfidl.WorkflowRetryPolicy
StartStateOptions *iwfidl.WorkflowStateOptions
InitialSearchAttributes []iwfidl.SearchAttribute
UsingMemoForDataAttributes bool
}
9 changes: 9 additions & 0 deletions iwf/workflow.go
Expand Up @@ -31,6 +31,15 @@ type ObjectWorkflow interface {
// External applications can also use "SearchWorkflow" API to find workflows by SQL-like query
GetPersistenceSchema() []PersistenceFieldDef

// PersistenceSchemaOptions defines options for load/search attributes
//
// By default, RPC implementations will load data/search attributes using the Cadence/Temporal
// query API, which is not optimized for very high volume requests a single workflow execution(like 100 rps),
// because it could cause too many replay with history, especially when workflows are closed.
//
// You can use the PersistenceSchema Options to enable caching
GetPersistenceSchemaOptions() PersistenceSchemaOptions

// GetCommunicationSchema defines all the communication methods for this workflow, this includes
// 1. Signal channel
// 2. Interstate channel
Expand Down