Skip to content

Commit

Permalink
[GOBBLIN-2061] Fix initialization for DagProcessingEngine (#3943)
Browse files Browse the repository at this point in the history
* Fix initialization for DagProcessingEngine
* Fix compile errors
* Fix dagProcessingEngineTest
* Remove unused import
---------
Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
  • Loading branch information
umustafi committed May 6, 2024
1 parent 354c78d commit 7d95f70
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,7 @@ binding time (optionally bound classes cannot have names associated with them),
annotatedWith(Names.named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME))
.toProvider(
DagActionProcessingMultiActiveLeaseArbiterFactory.class);
// Multi-active execution is only compatible with dagProcessingEngine configuration
if (serviceConfig.isMultiActiveExecutionEnabled()) {
binder.bind(DagActionReminderScheduler.class);
}
binder.bind(DagActionReminderScheduler.class);

binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
Expand Down Expand Up @@ -221,6 +222,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
@Inject(optional = true)
protected DagActionStoreChangeMonitor dagActionStoreChangeMonitor;

@Inject(optional = true)
protected DagProcessingEngine dagProcessingEngine;

@Inject
protected GobblinServiceManager(GobblinServiceConfiguration configuration) throws Exception {
this.configuration = Objects.requireNonNull(configuration);
Expand Down Expand Up @@ -382,7 +386,11 @@ private void registerServicesInLauncher(){
}
}

this.serviceLauncher.addService(dagManager);
// Register exactly one of the two services: the DagProcessingEngine when it is enabled, otherwise the DagManager.
// The 'else' is required; without it the second brace pair is a plain block that runs unconditionally.
if (configuration.isDagProcessingEngineEnabled()) {
  this.serviceLauncher.addService(dagProcessingEngine);
} else {
  this.serviceLauncher.addService(dagManager);
}

this.serviceLauncher.addService(databaseManager);
this.serviceLauncher.addService(issueRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class DagActionReminderScheduler {
@Inject
public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
throws SchedulerException {
// Create a new Scheduler to be used solely for the DagProc reminders
this.quartzScheduler = schedulerFactory.getScheduler(DAG_ACTION_REMINDER_SCHEDULER_KEY);
// Creates a new Scheduler to be used solely for the DagProc reminders
this.quartzScheduler = schedulerFactory.getScheduler();
}

/**
Expand Down Expand Up @@ -91,8 +91,7 @@ public void execute(JobExecutionContext context) {
String flowGroup = jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
String flowId = jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType = DagActionStore.DagActionType.valueOf(
jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
DagActionStore.DagActionType dagActionType = (DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);

log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", flowName: " + flowName
+ ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@
* indication the lease has been completed
* ({@link LeaseAttemptStatus}) then the {@link MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder for
* the flow action using {@link DagActionReminderScheduler} to reattempt the lease after the current leaseholder's grant
* would have expired.
* would have expired. The {@link DagActionReminderScheduler} is also used when multi-active execution is not enabled,
* so that reminders are available in the single {@link DagManagementTaskStreamImpl} case too.
* Note that if multi-active execution is NOT enabled, then all flow action events are selected by
* {@link DagManagementTaskStreamImpl#next()} by virtue of having no other contenders for the lease at the time
* {@link MultiActiveLeaseArbiter#tryAcquireLease} is called.
Expand Down Expand Up @@ -90,16 +91,16 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
in {@link GobblinServiceGuiceModule} which handles all possible configurations */
throw new RuntimeException("DagProcessingEngine should not be enabled without dagActionStore enabled.");
}
if (!dagActionReminderScheduler.isPresent()) {
throw new RuntimeException(String.format("DagProcessingEngine requires %s to be instantiated.",
DagActionReminderScheduler.class.getSimpleName()));
}
this.dagActionStore = dagActionStore;
this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
this.dagActionReminderScheduler = dagActionReminderScheduler;
this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
if (this.isMultiActiveExecutionEnabled && !this.dagActionReminderScheduler.isPresent()) {
throw new RuntimeException(String.format("Multi-active execution enabled but required "
+ "instance %s is absent.", DagActionReminderScheduler.class.getSimpleName()));
}
}

@Override
Expand Down Expand Up @@ -144,9 +145,8 @@ public DagTask next() {
*/
private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction dagAction)
throws IOException, SchedulerException {
LeaseAttemptStatus leaseAttemptStatus;
// TODO: need to handle reminder events and flag them
leaseAttemptStatus = this.dagActionProcessingLeaseArbiter
LeaseAttemptStatus leaseAttemptStatus = this.dagActionProcessingLeaseArbiter
.tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
/* Schedule a reminder for the event unless the lease has been completed to safeguard against the case where even
we, when we might become the lease owner still fail to complete processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
Expand All @@ -47,44 +49,72 @@
* encapsulates all processing inside {@link DagProc#process(DagManagementStateStore)}
*/

@AllArgsConstructor
@Alpha
@Slf4j
@Singleton
public class DagProcessingEngine {
public class DagProcessingEngine extends AbstractIdleService {

@Getter private final Optional<DagTaskStream> dagTaskStream;
@Getter Optional<DagManagementStateStore> dagManagementStateStore;
private final Config config;
private final Optional<DagProcFactory> dagProcFactory;
private ScheduledExecutorService scheduledExecutorPool;
private static final Integer TERMINATION_TIMEOUT = 30;

/**
 * Stores the injected collaborators and verifies that all of them were supplied. Worker threads are not created
 * here; that happens in {@link #startUp()}.
 *
 * @param config configuration kept for later use by {@link #startUp()}
 * @param dagTaskStream stream the worker threads pull dag tasks from; must be present
 * @param dagProcFactory factory handed to each worker thread; must be present
 * @param dagManagementStateStore state store handed to each worker thread; must be present
 * @throws RuntimeException if any of the three optional collaborators is absent
 */
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream> dagTaskStream,
    Optional<DagProcFactory> dagProcFactory, Optional<DagManagementStateStore> dagManagementStateStore) {
  this.config = config;
  this.dagProcFactory = dagProcFactory;
  this.dagTaskStream = dagTaskStream;
  this.dagManagementStateStore = dagManagementStateStore;
  if (!dagTaskStream.isPresent() || !dagProcFactory.isPresent() || !dagManagementStateStore.isPresent()) {
    // Note the trailing space before the concatenation; without it the message reads "followingclasses".
    throw new RuntimeException(String.format("DagProcessingEngine cannot be initialized without all of the following "
        + "classes present. DagTaskStream is %s, DagProcFactory is %s, DagManagementStateStore is %s",
        this.dagTaskStream.isPresent() ? "present" : "MISSING",
        this.dagProcFactory.isPresent() ? "present" : "MISSING",
        this.dagManagementStateStore.isPresent() ? "present" : "MISSING"));
  }
  log.info("DagProcessingEngine initialized.");
}

@Override
protected void startUp() {
Integer numThreads = ConfigUtils.getInt
(config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY, ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
ScheduledExecutorService scheduledExecutorPool =
this.scheduledExecutorPool =
Executors.newScheduledThreadPool(numThreads,
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("DagProcessingEngineThread")));
this.dagTaskStream = dagTaskStream;
this.dagManagementStateStore = dagManagementStateStore;

for (int i=0; i < numThreads; i++) {
// todo - set metrics for count of active DagProcEngineThread
DagProcEngineThread dagProcEngineThread = new DagProcEngineThread(dagTaskStream.get(), dagProcFactory.get(),
dagManagementStateStore.get());
scheduledExecutorPool.submit(dagProcEngineThread);
dagManagementStateStore.get(), i);
this.scheduledExecutorPool.submit(dagProcEngineThread);
}
}

/**
 * Stops the worker pool created in {@link #startUp()}.
 *
 * Requests an orderly shutdown and waits up to {@code TERMINATION_TIMEOUT} seconds. The worker runnables loop
 * indefinitely, so an orderly shutdown alone does not end them; if the pool has not terminated when the wait
 * elapses, the remaining workers are interrupted via {@code shutdownNow()}.
 *
 * @throws Exception if the calling thread is interrupted while waiting for termination
 */
@Override
protected void shutDown()
    throws Exception {
  log.info("DagProcessingEngine shutting down.");
  this.scheduledExecutorPool.shutdown();
  if (!this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS)) {
    // awaitTermination returns false on timeout; interrupt the still-running workers instead of leaving them behind.
    log.warn("DagProcessingEngine threads did not terminate within {} seconds; forcing shutdown.",
        TERMINATION_TIMEOUT);
    this.scheduledExecutorPool.shutdownNow();
  }
}

@AllArgsConstructor
@VisibleForTesting
static class DagProcEngineThread implements Runnable {
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
private DagManagementStateStore dagManagementStateStore;
private final int threadID;

@Override
public void run() {
while (true) {
log.info("Starting DagProcEngineThread to process dag tasks. Thread id: {}", threadID);
DagTask dagTask = dagTaskStream.next(); // blocking call
if (dagTask == null) {
//todo - add a metrics to count the times dagTask was null
Expand All @@ -100,8 +130,6 @@ public void run() {
log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
// todo mark lease success and releases it
//dagTaskStream.complete(dagTask);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,12 @@ protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dag
try {
FlowSpec flowSpec = dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId()));
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, getDagId().getFlowExecutionId());
return this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
} catch (URISyntaxException | SpecNotFoundException | InterruptedException e) {
Optional<Dag<JobExecutionPlan>> dag = this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
if (dag.isPresent()) {
dagManagementStateStore.checkpointDag(dag.get());
}
return dag;
} catch (URISyntaxException | SpecNotFoundException | InterruptedException | IOException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;

import lombok.Data;
Expand Down Expand Up @@ -62,6 +63,7 @@
*/
@Slf4j
@Data
@Singleton
public class FlowCompilationValidationHelper {
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void setUp() throws Exception {
false);
this.dagProcFactory = new DagProcFactory(null);
this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
this.dagManagementTaskStream, this.dagProcFactory, dagManagementStateStore);
this.dagManagementTaskStream, this.dagProcFactory, dagManagementStateStore, 0);
}

/* This tests adding and removal of dag actions from dag task stream with a launch task. It verifies that the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,17 @@ public void setUp() throws Exception {
this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config, Optional.of(mock(DagActionStore.class)),
mock(MultiActiveLeaseArbiter.class), Optional.empty(), false);
mock(MultiActiveLeaseArbiter.class), Optional.of(mock(DagActionReminderScheduler.class)), false);
this.dagProcFactory = new DagProcFactory(null);

DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
new DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore);
dagManagementStateStore, 0);
this.dagTaskStream = spy(new MockedDagTaskStream());
DagProcessingEngine dagProcessingEngine =
new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream), Optional.ofNullable(this.dagProcFactory),
Optional.ofNullable(dagManagementStateStore));
dagProcessingEngine.startAsync();
}

static class MockedDagTaskStream implements DagTaskStream {
Expand Down Expand Up @@ -171,14 +172,16 @@ protected void act(DagManagementStateStore dagManagementStateStore, Void state)
@Test
public void dagProcessingTest()
throws InterruptedException, TimeoutException, IOException {

// MAX_NUM_OF_TASKS dag tasks are returned, and then each thread makes one additional call that would normally wait
// indefinitely. In this unit test it does not wait indefinitely, because the mocked task stream throws an exception
// on the (MAX_NUM_OF_TASKS + 1)th call
int expectedNumOfInvocations = MockedDagTaskStream.MAX_NUM_OF_TASKS + ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS;
int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS / MockedDagTaskStream.FAILING_DAGS_FREQUENCY;

AssertWithBackoff.assertTrue(input -> Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() == expectedNumOfInvocations,
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times",
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. "
+ "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);

Assert.assertEquals(this.dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions);
Expand Down

0 comments on commit 7d95f70

Please sign in to comment.