Skip to content

Commit

Permalink
[GOBBLIN-2045] Integrate AutomaticTroubleshooter with gobblin-on-te…
Browse files Browse the repository at this point in the history
…mporal `GenerateWorkUnitsImpl` (#3924)

Integrate `AutomaticTroubleshooter` with gobblin-on-temporal `GenerateWorkUnitsImpl`
  • Loading branch information
phet committed Apr 17, 2024
1 parent 99c5a76 commit cb5a215
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface AutomaticTroubleshooter {
* Those events can be consumed by upstream and analytical systems.
*
* Can be disabled with
* {@link org.apache.gobblin.configuration.ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING}.
* {@link org.apache.gobblin.configuration.ConfigurationKeys#TROUBLESHOOTER_DISABLE_EVENT_REPORTING}.
* */
void reportJobIssuesAsEvents(EventSubmitter eventSubmitter)
throws TroubleshooterException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;


Expand All @@ -65,6 +70,8 @@ public int generateWorkUnits(Properties jobProps, EventSubmitterContext eventSub
// jobState.setBroker(jobBroker);
// jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));

AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps));
troubleshooter.start();
try (Closer closer = Closer.create()) {
// before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
FileSystem fs = JobStateUtils.openFileSystem(jobState);
Expand All @@ -85,7 +92,8 @@ public int generateWorkUnits(Properties jobProps, EventSubmitterContext eventSub
log.error(errMsg, ioe);
throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: generating/writing workunits", ioe);
} finally {
// TODO: implement Troubleshooter integration!
EventSubmitter eventSubmitter = eventSubmitterContext.create();
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, jobState.getJobId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,7 @@ public int processWorkUnit(WorkUnitClaimCheck wu) {
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
try {
if (troubleshooter == null) {
log.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
} else {
troubleshooter.refineIssues();
troubleshooter.logIssueSummary();
troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
}
} catch (Exception e) {
log.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
}
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, correlator);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import com.typesafe.config.Config;

import org.slf4j.Logger;
import org.slf4j.MDC;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -41,8 +42,11 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
Expand Down Expand Up @@ -247,4 +251,32 @@ protected static void doGaaSFlowExecutionContextPropagation(String flowGroup, St
MDC.put(ConfigurationKeys.FLOW_NAME_KEY, String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY, flowName));
MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecId));
}

/**
 * Best-effort finalization of an {@link AutomaticTroubleshooter}: refine its issues, log an issue summary, and report
 * the issues to the {@link EventSubmitter} (all via {@link #reportTroubleshooterIssues}).
 *
 * Intended to be called from a `finally` block, so a failure while refining/logging/reporting is caught and logged
 * via `logger`, rather than thrown - otherwise it would replace whatever exception the enclosing `try` was already
 * propagating.  A `null` `troubleshooter` is handled gracefully: it is only logged as a warning.
 *
 * @param troubleshooter the troubleshooter to finalize; may be `null`
 * @param eventSubmitter where to report the troubleshooter's issues as events
 * @param logger where to log the `null`-troubleshooter warning or a reporting failure
 * @param correlator identifier prefixed to every log message emitted here (callers pass e.g. a job ID)
 */
public static void finalizeTroubleshooting(AutomaticTroubleshooter troubleshooter, EventSubmitter eventSubmitter, Logger logger, String correlator) {
  try {
    if (troubleshooter == null) {
      logger.warn("{} - No troubleshooter to report issues from automatic troubleshooter", correlator);
    } else {
      Help.reportTroubleshooterIssues(troubleshooter, eventSubmitter);
    }
  } catch (Exception e) {
    // deliberately broad (not merely `TroubleshooterException`): this runs inside callers' `finally` blocks, where an
    // escaping unchecked exception would mask the primary failure - troubleshooting must never fail the actual work
    logger.error(String.format("%s - Failed to report issues from automatic troubleshooter", correlator), e);
  }
}

/**
 * Run the closing {@link AutomaticTroubleshooter} sequence, in order: {@link AutomaticTroubleshooter#refineIssues()},
 * then {@link AutomaticTroubleshooter#logIssueSummary()}, and lastly
 * {@link AutomaticTroubleshooter#reportJobIssuesAsEvents(EventSubmitter)} with the given `eventSubmitter`
 *
 * ATTENTION: no `null` check is performed here - `troubleshooter` MUST NOT be `null`
 *
 * @param troubleshooter the (non-`null`) troubleshooter whose issues to refine, summarize, and report
 * @param eventSubmitter passed to the troubleshooter for reporting issues as events
 * @throws TroubleshooterException when propagated by the `troubleshooter`
 */
public static void reportTroubleshooterIssues(
    final AutomaticTroubleshooter troubleshooter,
    final EventSubmitter eventSubmitter) throws TroubleshooterException {
  // order matters: refine first, so that both the logged summary and the reported events reflect the refined issues
  troubleshooter.refineIssues();
  troubleshooter.logIssueSummary();
  troubleshooter.reportJobIssuesAsEvents(eventSubmitter);
}
}

0 comments on commit cb5a215

Please sign in to comment.