Skip to content

Commit

Permalink
Expose missing config for setVersionAtStart
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Feb 23, 2024
1 parent 06b9438 commit 5debdce
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 13 deletions.
5 changes: 4 additions & 1 deletion integ/basic_test.go
Expand Up @@ -50,7 +50,10 @@ func doTestBasicWorkflow(t *testing.T, backendType service.BackendType, config *
closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

closeFunc2 := startIwfService(backendType)
_, closeFunc2 := startIwfServiceByConfig(IwfServiceTestConfig{
BackendType: backendType,
SetVersionAtStart: true,
})
defer closeFunc2()

// start a workflow
Expand Down
7 changes: 4 additions & 3 deletions integ/config.go
Expand Up @@ -7,11 +7,12 @@ import (
const testWorkflowServerPort = "9714"
const testIwfServerPort = "9715"

func createTestConfig(failAtMemoCompatibility bool) config.Config {
func createTestConfig(failAtMemoCompatibility bool, setVersionAtStart bool) config.Config {
return config.Config{
Api: config.ApiConfig{
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
Port: 9715,
MaxWaitSeconds: 10, // use 10 so that we can test it in the waiting test
SetVersionAtStart: setVersionAtStart,
},
Interpreter: config.Interpreter{
VerboseDebug: false,
Expand Down
4 changes: 3 additions & 1 deletion integ/persistence_test.go
Expand Up @@ -96,7 +96,9 @@ func TestPersistenceWorkflowCadenceContinueAsNew(t *testing.T) {
}
}

func doTestPersistenceWorkflow(t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig) {
func doTestPersistenceWorkflow(
t *testing.T, backendType service.BackendType, useMemo, memoEncryption bool, config *iwfidl.WorkflowConfig,
) {
assertions := assert.New(t)
wfHandler := persistence.NewHandler()
closeFunc1 := startWorkflowWorker(wfHandler)
Expand Down
9 changes: 5 additions & 4 deletions integ/util.go
Expand Up @@ -66,6 +66,7 @@ type IwfServiceTestConfig struct {
BackendType service.BackendType
MemoEncryption bool
DisableFailAtMemoIncompatibility bool // default to false so that we will fail at test
SetVersionAtStart bool
}

func startIwfService(backendType service.BackendType) (closeFunc func()) {
Expand Down Expand Up @@ -110,7 +111,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = temporalapi.NewTemporalClient(temporalClient, testNamespace, dataConverter, config.MemoEncryption)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -122,7 +123,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter := temporal.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), temporalClient, service.TaskQueue, config.MemoEncryption, dataConverter, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand All @@ -141,7 +142,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
panic(err)
}
uclient = cadenceapi.NewCadenceClient(iwf.DefaultCadenceDomain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility), uclient, logger)
iwfService := api.NewService(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), uclient, logger)
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand All @@ -153,7 +154,7 @@ func doStartIwfServiceWithClient(config IwfServiceTestConfig) (uclient uclient.U
}()

// start iwf interpreter worker
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter := cadence.NewInterpreterWorker(createTestConfig(failAtMemoIncompatibility, config.SetVersionAtStart), serviceClient, iwf.DefaultCadenceDomain, service.TaskQueue, closeFunc, uclient)
interpreter.Start()
return uclient, func() {
iwfServer.Close()
Expand Down
17 changes: 13 additions & 4 deletions service/api/service.go
Expand Up @@ -56,14 +56,23 @@ func (s *serviceImpl) ApiV1WorkflowStartPost(
) (wresp *iwfidl.WorkflowStartResponse, retError *errors.ErrorAndStatus) {
defer func() { log.CapturePanic(recover(), s.logger, &retError) }()

var sysSAs map[string]interface{}
if s.config.Api.SetVersionAtStart {
sysSAs = map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions,
}
} else {
sysSAs = map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
}
}

workflowOptions := uclient.StartWorkflowOptions{
ID: req.GetWorkflowId(),
TaskQueue: s.taskQueue,
WorkflowExecutionTimeout: time.Duration(req.WorkflowTimeoutSeconds) * time.Second,
SearchAttributes: map[string]interface{}{
service.SearchAttributeIwfWorkflowType: req.IwfWorkflowType,
service.SearchAttributeGlobalVersion: versions.MaxOfAllVersions,
},
SearchAttributes: sysSAs,
}

var workflowConfig iwfidl.WorkflowConfig
Expand Down
3 changes: 3 additions & 0 deletions service/common/config/config.go
Expand Up @@ -26,6 +26,9 @@ type (
// Port is the port on which the API service will bind to
Port int `yaml:"port"`
MaxWaitSeconds int64 `yaml:"maxWaitSeconds"`
// SetVersionAtStart is the flag to set version at startAPI
// This is for optimizing the workflow actions(version marker, upsertSearchAttribute)
SetVersionAtStart bool `yaml:"setVersionAtStart"`
}

Interpreter struct {
Expand Down
3 changes: 3 additions & 0 deletions service/interpreter/globalVersioner.go
Expand Up @@ -36,6 +36,9 @@ func NewGlobalVersioner(workflowProvider WorkflowProvider, ctx UnifiedContext) (
panic("search attribute global version is not found")
}
version = int(attribute.GetIntegerValue())
if versions.MaxOfAllVersions < version {
panic("requesting for a version that is not supported, panic to retry in next workflow task")
}
isFromStart = true
}

Expand Down

0 comments on commit 5debdce

Please sign in to comment.