Skip to content

Commit

Permalink
[GOBBLIN-2042] Fix bug where flowgraph validation initialization was …
Browse files Browse the repository at this point in the history
…creating multip… (#3921)

Fix bug where flowgraph validation initialization was creating multiple instances of the flow compiler
  • Loading branch information
Will-Lo committed Apr 15, 2024
1 parent 97fc864 commit 1fc5fb0
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,7 +68,6 @@
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;

Expand Down Expand Up @@ -117,26 +114,14 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager d
Optional<DagManagementStateStore> dagManagementStateStore,
FlowCompilationValidationHelper flowCompilationValidationHelper) throws IOException {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
ClassAliasResolver<SpecCompiler> aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerDecorator = flowTriggerDecorator;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.flowCatalog = flowCatalog;
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
}
_log.info("Using specCompiler class name/alias " + specCompilerClassName);

this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(specCompilerClassName)), config);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
throw new RuntimeException(e);
}

this.flowCompilationValidationHelper = flowCompilationValidationHelper;
this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
//At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManager.
this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
if (dagManagementStateStore.isPresent()) {
Expand All @@ -155,7 +140,6 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager d
quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.typesafe.config.Config;

import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
Expand Down Expand Up @@ -63,6 +64,7 @@
@Data
public class FlowCompilationValidationHelper {
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
@Getter
private final SpecCompiler specCompiler;
private final UserQuotaManager quotaManager;
private final EventSubmitter eventSubmitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -124,10 +125,12 @@ public void setup() throws Exception {
MostlyMySqlDagManagementStateStore dagManagementStateStore =
new MostlyMySqlDagManagementStateStore(config, null, null, null);

SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));

this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
Optional.of(mockFlowTriggerHandler), new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(
orchestratorProperties)), Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore), null);
Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
Expand Down

0 comments on commit 1fc5fb0

Please sign in to comment.