Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3553 from v1r3n/support_domains
Browse files Browse the repository at this point in the history
Support task domains in the worker spring configuration
  • Loading branch information
v1r3n committed Apr 20, 2023
2 parents 56acc35 + 5443798 commit d39b4bf
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public int getThreadCount(String taskName) {
String key = "conductor.worker." + taskName + ".threadCount";
return environment.getProperty(key, Integer.class, 0);
}

@Override
public String getDomain(String taskName) {
String key = "conductor.worker." + taskName + ".domain";
return environment.getProperty(key, String.class, null);
}
}
3 changes: 2 additions & 1 deletion client-spring/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
conductor.client.rootUri=http://localhost:8080/api/
conductor.worker.hello.threadCount=100
conductor.worker.hello.threadCount=100
conductor.worker.hello_again.domain=test
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.conductor.sdk.workflow.task.WorkerTask;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.reflect.ClassPath;

public class AnnotatedWorkerExecutor {
Expand All @@ -42,6 +43,8 @@ public class AnnotatedWorkerExecutor {

private Map<String, Integer> workerToPollingInterval = new HashMap<>();

private Map<String, String> workerDomains = new HashMap<>();

private Map<String, Object> workerClassObjs = new HashMap<>();

private static Set<String> scannedPackages = new HashSet<>();
Expand Down Expand Up @@ -163,6 +166,14 @@ private void addMethod(WorkerTask annotation, Method method, Object bean) {
}
workerToPollingInterval.put(name, pollingInterval);

String domain = workerConfiguration.getDomain(name);
if (Strings.isNullOrEmpty(domain)) {
domain = annotation.domain();
}
if (!Strings.isNullOrEmpty(domain)) {
workerDomains.put(name, domain);
}

workerClassObjs.put(name, bean);
workerExecutors.put(name, method);
LOGGER.info(
Expand All @@ -187,10 +198,12 @@ public void startPolling() {
}

LOGGER.info("Starting workers with threadCount {}", workerToThreadCount);
LOGGER.info("Worker domains {}", workerDomains);

taskRunner =
new TaskRunnerConfigurer.Builder(taskClient, executors)
.withTaskThreadCount(workerToThreadCount)
.withTaskToDomain(workerDomains)
.build();

taskRunner.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ public int getPollingInterval(String taskName) {
public int getThreadCount(String taskName) {
return 0;
}

public String getDomain(String taskName) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@
int threadCount() default 1;

int pollingInterval() default 100;

String domain() default "";
}

0 comments on commit d39b4bf

Please sign in to comment.