Skip to content

Commit

Permalink
[GOBBLIN-2051] Rework FlowLaunchHandler, DagActionStore, and rela…
Browse files Browse the repository at this point in the history
…ted classes for clarity (#3927)

Rework `FlowLaunchHandler`, `DagActionStore`, and related class' javadoc and method naming for clarity
  • Loading branch information
phet committed Apr 23, 2024
1 parent 36cdf2b commit a74d17a
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String

try {
// Schedule the Quartz job with a trigger built from the job configuration
Trigger trigger = createTriggerForJob(job.getKey(), jobProps, Optional.absent());
Trigger trigger = createTriggerForJob(job.getKey(), jobProps, java.util.Optional.empty());
this.scheduler.getScheduler().scheduleJob(job, trigger);
logNewlyScheduledJob(job, trigger);
} catch (SchedulerException se) {
Expand Down Expand Up @@ -585,11 +585,11 @@ public void close() throws IOException {
* Get a {@link org.quartz.Trigger} from the given job configuration properties. If triggerSuffix is provided, appends
* it to the end of the flow name. The suffix is used to add multiple unique triggers associated with the same job
*/
public static Trigger createTriggerForJob(JobKey jobKey, Properties jobProps, Optional<String> triggerSuffix) {
public static Trigger createTriggerForJob(JobKey jobKey, Properties jobProps, java.util.Optional<String> triggerSuffix) {
// Build a trigger for the job with the given cron-style schedule
return TriggerBuilder.newTrigger()
.withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)
+ triggerSuffix.transform(s -> "_" + s).or(""),
+ triggerSuffix.map(s -> "_" + s).orElse(""),
Strings.nullToEmpty(jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY)))
.forJob(jobKey)
.withSchedule(CronScheduleBuilder.cronSchedule(jobProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.gobblin.scheduler;

import com.google.common.base.Optional;
import java.util.Optional;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.junit.Assert;
Expand All @@ -39,7 +39,7 @@ public void testCreateUniqueTriggersForJob() {
jobProps.put(ConfigurationKeys.JOB_GROUP_KEY, jobGroup);
jobProps.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?");

Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.absent());
Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.empty());
Trigger trigger2 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.of("suffix"));

Assert.assertFalse(trigger1.getKey().equals(trigger2.getKey()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class DagAction {
final String jobName;
final DagActionType dagActionType;

public static DagAction forFlow(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) {
return new DagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType);
}

public FlowId getFlowId() {
return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
}
Expand Down Expand Up @@ -90,6 +94,16 @@ public DagNodeId getDagNodeId() {
*/
boolean exists(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException, SQLException;

/** Persist the {@link DagAction} in {@link DagActionStore} for durability */
default void addDagAction(DagAction dagAction) throws IOException {
addJobDagAction(
dagAction.getFlowGroup(),
dagAction.getFlowName(),
dagAction.getFlowExecutionId(),
dagAction.getJobName(),
dagAction.getDagActionType());
}

/**
* Persist the dag action in {@link DagActionStore} for durability
* @param flowGroup flow group for the dag action
Expand All @@ -109,11 +123,13 @@ public DagNodeId getDagNodeId() {
* @param dagActionType the value of the dag action
* @throws IOException
*/
void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException;
default void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException {
addDagAction(DagAction.forFlow(flowGroup, flowName, flowExecutionId, dagActionType));
}

/**
* delete the dag action from {@link DagActionStore}
* @param DagAction containing all information needed to identify dag and specific action value
* @param dagAction containing all information needed to identify dag and specific action value
* @throws IOException
* @return true if we successfully delete one record, return false if the record does not exist
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt
*/
protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
throws SchedulerException {
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagAction(),
leaseStatus.getMinimumLingerDurationMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;

Expand All @@ -35,7 +36,6 @@
import org.quartz.impl.JobDetailImpl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.typesafe.config.Config;

import javax.inject.Inject;
Expand Down Expand Up @@ -68,7 +68,7 @@
@Slf4j
public class FlowLaunchHandler {
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
private Optional<DagActionStore> dagActionStore;
private DagActionStore dagActionStore;
private final MetricContext metricContext;
private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
Expand All @@ -80,9 +80,14 @@ public class FlowLaunchHandler {
@Inject
public FlowLaunchHandler(Config config,
@Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter leaseArbiter,
SchedulerService schedulerService, Optional<DagActionStore> dagActionStore) {
SchedulerService schedulerService, com.google.common.base.Optional<DagActionStore> optDagActionStore) {
this.multiActiveLeaseArbiter = leaseArbiter;
this.dagActionStore = dagActionStore;

if (!optDagActionStore.isPresent()) {
throw new RuntimeException("DagActionStore MUST be present for flow launch handling!");
}
this.dagActionStore = optDagActionStore.get();

this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
this.schedulerService = schedulerService;
Expand All @@ -99,51 +104,46 @@ public FlowLaunchHandler(Config config,
* This method is used in the multi-active scheduler case for one or more hosts to respond to a launch dag action
* event triggered by the scheduler by attempting a lease for the launch event and processing the result depending on
* the status of the attempt.
* @param jobProps
* @param dagAction
* @param eventTimeMillis
* @param isReminderEvent
* @param skipFlowExecutionIdReplacement
* @throws IOException
*/
public void handleFlowLaunchTriggerEvent(Properties jobProps, DagActionStore.DagAction dagAction,
long eventTimeMillis, boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws IOException {
LeaseAttemptStatus
leaseAttemptStatus = this.multiActiveLeaseArbiter
.tryAcquireLease(dagAction, eventTimeMillis, isReminderEvent, skipFlowExecutionIdReplacement);
if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus;
if (persistDagAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getDagAction(),
leaseObtainedStatus.getEventTimeMillis());
return;
}
// If persisting the dag action failed, then we set another trigger for this event to occur immediately to
// re-attempt handling the event
scheduleReminderForEvent(jobProps,
new LeaseAttemptStatus.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(), 0L), eventTimeMillis);
} else if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeasedToAnotherStatus) {
scheduleReminderForEvent(jobProps, (LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttemptStatus,
eventTimeMillis);
}
// Otherwise leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus & no need to do anything
long eventTimeMillis, boolean isReminderEvent, boolean adoptConsensusFlowExecutionId) throws IOException {
LeaseAttemptStatus leaseAttempt = this.multiActiveLeaseArbiter.tryAcquireLease(
dagAction, eventTimeMillis, isReminderEvent, adoptConsensusFlowExecutionId);
if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
&& persistDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseAttempt.getConsensusDagAction(),
((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis());
} else { // when NOT successfully `persistDagAction`, set a reminder to re-attempt handling (unless leasing finished)
calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
scheduleReminderForEvent(jobProps, leasedToAnother, eventTimeMillis));
}
}

// Called after obtaining a lease to persist the dag action to {@link DagActionStore} and mark the lease as done
private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
if (this.dagActionStore.isPresent()) {
try {
DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId(), dagAction.getDagActionType());
// If the dag action has been persisted to the {@link DagActionStore} we can close the lease
this.numFlowsSubmitted.mark();
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
} catch (IOException e) {
throw new RuntimeException(e);
}
/** @return {@link Optional} status for reminding, unless {@link LeaseAttemptStatus.NoLongerLeasingStatus} (hence nothing to do) */
private Optional<LeaseAttemptStatus.LeasedToAnotherStatus> calcLeasedToAnotherStatusForReminder(LeaseAttemptStatus leaseAttempt) {
if (leaseAttempt instanceof LeaseAttemptStatus.NoLongerLeasingStatus) { // all done: nothing to remind about
return Optional.empty();
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttempt);
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) { // remind w/o delay to immediately re-attempt handling
return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 0L));
} else {
throw new RuntimeException("DagActionStore is " + (this.dagActionStore.isPresent() ? "" : "NOT") + " present.");
throw new RuntimeException("unexpected `LeaseAttemptStatus` derived type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
}
}

/**
* Called after obtaining a lease to both persist to the {@link DagActionStore} and
* {@link MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
*/
private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
try {
this.dagActionStore.addDagAction(leaseStatus.getConsensusDagAction());
this.numFlowsSubmitted.mark();
// after successfully persisting, close the lease
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

Expand All @@ -156,7 +156,7 @@ private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseSta
*/
private void scheduleReminderForEvent(Properties jobProps, LeaseAttemptStatus.LeasedToAnotherStatus status,
long triggerEventTimeMillis) {
DagActionStore.DagAction dagAction = status.getDagAction();
DagActionStore.DagAction dagAction = status.getConsensusDagAction();
JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
try {
Expand Down Expand Up @@ -196,7 +196,7 @@ protected Trigger createAndScheduleReminder(JobKey origJobKey, LeaseAttemptStatu
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, getJobPropertiesFromJobDetail(jobDetail),
Optional.of(reminderSuffix));
log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {} with "
+ "reminderJobKey {} and reminderTriggerKey {}", status.getDagAction(), triggerEventTimeMillis,
+ "reminderJobKey {} and reminderTriggerKey {}", status.getConsensusDagAction(), triggerEventTimeMillis,
status.getEventTimeMillis(), reminderJobKey, reminderTrigger.getKey());
this.schedulerService.getScheduler().scheduleJob(jobDetail, reminderTrigger);
return reminderTrigger;
Expand Down Expand Up @@ -258,7 +258,7 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
// Saves the following properties in jobProps to retrieve when the trigger fires
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
// Use the db laundered timestamp for the reminder to ensure consensus between hosts. Participant trigger timestamps
// Use the db consensus timestamp for the reminder to ensure inter-host agreement. Participant trigger timestamps
// can differ between participants and be interpreted as a reminder for a distinct flow trigger which will cause
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@


/**
Class used to encapsulate status of lease acquisition attempts made by {@link MultiActiveLeaseArbiter} and contains
information specific to the status that results. The {@link LeaseAttemptStatus#getDagAction} and
{@link LeaseAttemptStatus#getMinimumLingerDurationMillis} are meant to be
overridden and used by relevant derived classes.
* Hierarchy to convey the specific outcome of attempted lease acquisition via the {@link MultiActiveLeaseArbiter},
* with each derived type carrying outcome-specific status info.
*
* IMPL. NOTE: {@link LeaseAttemptStatus#getConsensusDagAction} and {@link LeaseAttemptStatus#getMinimumLingerDurationMillis}
* intended for `@Override`.
*/
public abstract class LeaseAttemptStatus {
public DagActionStore.DagAction getDagAction() {
/**
* @return the {@link DagActionStore.DagAction}, which may now have an updated flowExecutionId that MUST henceforth be
* used; {@see MultiActiveLeaseArbiter#tryAcquireLease}
*/
public DagActionStore.DagAction getConsensusDagAction() {
return null;
}

Expand All @@ -53,7 +58,7 @@ public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
*/
@Data
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction dagAction;
private final DagActionStore.DagAction consensusDagAction;
private final long leaseAcquisitionTimestamp;
private final long minimumLingerDurationMillis;
@Getter(AccessLevel.NONE)
Expand All @@ -63,7 +68,7 @@ public static class LeaseObtainedStatus extends LeaseAttemptStatus {
* @return event time in millis since epoch for the event of this lease acquisition
*/
public long getEventTimeMillis() {
return Long.parseLong(dagAction.getFlowExecutionId());
return Long.parseLong(consensusDagAction.getFlowExecutionId());
}

/**
Expand All @@ -85,15 +90,15 @@ public boolean completeLease() throws IOException {
*/
@Data
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.DagAction dagAction;
private final DagActionStore.DagAction consensusDagAction;
private final long minimumLingerDurationMillis;

/**
* Returns event time in millis since epoch for the event whose lease was obtained by another participant.
* @return
*/
public long getEventTimeMillis() {
return Long.parseLong(dagAction.getFlowExecutionId());
return Long.parseLong(consensusDagAction.getFlowExecutionId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public interface MultiActiveLeaseArbiter {
* @param eventTimeMillis is the time this dag action was triggered
* @param isReminderEvent true if the dag action event we're checking on is a reminder event
* @param adoptConsensusFlowExecutionId if true then replaces the dagAction flowExecutionId returned in
* LeaseAttemptStatuses with the consensus eventTime
* LeaseAttemptStatuses with the consensus eventTime, accessed via
* {@link LeaseAttemptStatus#getConsensusDagAction()}
*
* @return LeaseAttemptStatus, containing a dag action that will have an updated flow execution id if `
* adoptConsensusFlowExecutionId` is true. The caller should use the newer version of the dag action to easily track
* the action moving forward.
* @return {@link LeaseAttemptStatus}, containing, when `adoptConsensusFlowExecutionId`, a universally-agreed-upon
* {@link DagActionStore.DagAction} with a possibly updated ("laundered") flow execution id that MUST be used thereafter
* @throws IOException
*/
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, long eventTimeMillis, boolean isReminderEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,6 @@ public void addJobDagAction(String flowGroup, String flowName, String flowExecut
}}, true);
}

@Override
public void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType)
throws IOException {
addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType);
}

@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
return dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, tableName), deleteStatement -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ protected static void completeUpdatePreparedStatement(PreparedStatement statemen
@Override
public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status)
throws IOException {
DagActionStore.DagAction dagAction = status.getDagAction();
DagActionStore.DagAction dagAction = status.getConsensusDagAction();
return dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName),
updateStatement -> {
int i = 0;
Expand Down

0 comments on commit a74d17a

Please sign in to comment.