Skip to content

Commit

Permalink
[GOBBLIN-2048] Integrate AutomaticTroubleshooter with gobblin-on-te…
Browse files Browse the repository at this point in the history
…mporal `CommitActivityImpl` (#3928)
  • Loading branch information
phet committed Apr 18, 2024
1 parent e79375a commit c8db780
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
Expand All @@ -69,12 +73,15 @@ public int commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
String jobName = UNDEFINED_JOB_NAME;
AutomaticTroubleshooter troubleshooter = null;
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
jobName = jobState.getJobName();
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter.start();
JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
// TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
Expand All @@ -97,6 +104,10 @@ public int commit(WUProcessingSpec workSpec) {
new IOException(e),
null
);
} finally {
String errCorrelator = String.format("Commit [%s]", calcCommitId(workSpec));
EventSubmitter eventSubmitter = workSpec.getEventSubmitterContext().create();
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, errCorrelator);
}
}

Expand Down Expand Up @@ -163,6 +174,11 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
}
}

/** @return id/correlator for this particular commit activity */
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
}

/**
* Organize task states by dataset urns.
* @param taskStates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,15 @@ protected static void doGaaSFlowExecutionContextPropagation(String flowGroup, St
* refine {@link AutomaticTroubleshooter} issues then report them to the {@link EventSubmitter} and log an issues summary via `logger`;
* gracefully handle `null` `troubleshooter`
*/
public static void finalizeTroubleshooting(AutomaticTroubleshooter troubleshooter, EventSubmitter eventSubmitter, Logger logger, String correlator) {
public static void finalizeTroubleshooting(AutomaticTroubleshooter troubleshooter, EventSubmitter eventSubmitter, Logger logger, String errCorrelator) {
try {
if (troubleshooter == null) {
logger.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
logger.warn("{} - No troubleshooter to report issues from automatic troubleshooter", errCorrelator);
} else {
Help.reportTroubleshooterIssues(troubleshooter, eventSubmitter);
}
} catch (TroubleshooterException e) {
logger.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
logger.error(String.format("%s - Failed to report issues from automatic troubleshooter", errCorrelator), e);
}
}

Expand Down

0 comments on commit c8db780

Please sign in to comment.