[GOBBLIN-2054] Fix CommitActivityImpl to succeed for TaskStates t…
Browse files Browse the repository at this point in the history
…hat did not originate with `CopySource` (#3934)
  • Loading branch information
phet committed Apr 25, 2024
1 parent 773da76 commit 77f9d38
Showing 5 changed files with 63 additions and 68 deletions.
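The crux of the fix appears in `CommitActivityImpl` (third changed file below): `TaskState extends WorkUnit`, and serializing a `TaskState` preserves its constituent `WorkUnit` but not its constituent `JobState`. `WorkUnit`s generated by `CopySource::FileSetWorkUnitGenerator` happen to carry every prop of their `JobState` themselves, so commit worked for them by accident; `TaskState`s from other sources lacked `JobState` props essential to commit/publish. A minimal sketch of the corrective step now performed inside the new `loadTaskStates` method (the class and method names here are illustrative, not part of the commit):

    import java.util.List;

    import org.apache.gobblin.runtime.JobState;
    import org.apache.gobblin.runtime.TaskState;

    /** Sketch of the fix's core step: re-attach the freshly loaded JobState to each TaskState. */
    public final class ReassociationSketch {
      static void reassociate(List<TaskState> taskStates, JobState jobState) {
        for (TaskState taskState : taskStates) {
          // TaskState serialization drops the JobState, so unless the source copied every
          // JobState prop into the WorkUnit itself (as CopySource's WorkUnits do), props
          // needed at commit/publish time would be unresolvable after deserialization:
          taskState.setJobState(jobState);
        }
      }
    }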
========== changed file 1 of 5 ==========
@@ -544,8 +544,7 @@ public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(
    * or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
    */
   public static boolean shouldCommitDataInJob(State state) {
-    boolean jobCommitPolicyIsFull =
-        JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+    boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
     boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
         ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
     boolean jobDataPublisherSpecified =
========== changed file 2 of 5 ==========
@@ -392,16 +392,21 @@ public List<TaskState> getTaskProgress() {
    * @return a {@link Map} from dataset URNs to {@link DatasetState}s representing the dataset states
    */
   public Map<String, DatasetState> createDatasetStatesByUrns() {
+    return calculateDatasetStatesByUrns(this.taskStates.values(), this.skippedTaskStates.values());
+  }
+
+  /** {@see JobState#createDatasetStatesByUrns} */
+  public Map<String, DatasetState> calculateDatasetStatesByUrns(Collection<TaskState> allTaskStates, Collection<TaskState> allSkippedTaskStates) {
     Map<String, DatasetState> datasetStatesByUrns = Maps.newHashMap();
 
-    for (TaskState taskState : this.taskStates.values()) {
+    for (TaskState taskState : allTaskStates) {
       String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
 
       datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
       datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
     }
 
-    for (TaskState taskState : this.skippedTaskStates.values()) {
+    for (TaskState taskState : allSkippedTaskStates) {
       String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
       datasetStatesByUrns.get(datasetUrn).addSkippedTaskState(taskState);
     }
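With `createDatasetStatesByUrns()` now delegating to the public `calculateDatasetStatesByUrns(...)` overload, a caller holding externally deserialized `TaskState`s can group them by dataset URN without first loading them into the `JobState` — exactly what `CommitActivityImpl` does below, passing `Lists.newArrayList()` for the skipped tasks. A short usage sketch, assuming an already-loaded `jobState` and `taskStates` (the wrapper class name is illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.gobblin.runtime.JobState;
    import org.apache.gobblin.runtime.TaskState;

    /** Sketch: grouping a caller's own TaskStates by dataset URN via the new overload. */
    public final class DatasetStatesSketch {
      static Map<String, JobState.DatasetState> byUrn(JobState jobState, List<TaskState> taskStates) {
        // the second argument holds skipped TaskStates; empty when none were skipped
        return jobState.calculateDatasetStatesByUrns(taskStates, new ArrayList<>());
      }
    }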
========== changed file 3 of 5 ==========
@@ -17,14 +17,7 @@
 
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
-import com.google.common.base.Function;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import io.temporal.failure.ApplicationFailure;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,13 +26,24 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import io.temporal.failure.ApplicationFailure;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.JobContext;
@@ -57,8 +61,6 @@
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 
 @Slf4j
@@ -72,37 +74,27 @@ public class CommitActivityImpl implements CommitActivity {
   public int commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
-    String jobName = UNDEFINED_JOB_NAME;
+    Optional<String> optJobName = Optional.empty();
     AutomaticTroubleshooter troubleshooter = null;
     try {
       FileSystem fs = Help.loadFileSystem(workSpec);
       JobState jobState = Help.loadJobState(workSpec, fs);
-      jobName = jobState.getJobName();
+      optJobName = Optional.ofNullable(jobState.getJobName());
       SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
       troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
       troubleshooter.start();
-      JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
-      // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
-      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
-      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
-      log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
-      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
-      Optional<Queue<TaskState>> taskStateQueueOpt =
-          TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads);
-      if (!taskStateQueueOpt.isPresent()) {
-        log.error("No task states found at " + jobOutputPath);
-        return 0;
+      List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads);
+      if (!taskStates.isEmpty()) {
+        JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
+        commitTaskStates(jobState, taskStates, jobContext);
       }
-      Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
-      commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext);
-      return taskStateQueue.size();
+      return taskStates.size();
     } catch (Exception e) {
       //TODO: IMPROVE GRANULARITY OF RETRIES
       throw ApplicationFailure.newNonRetryableFailureWithCause(
-          String.format("Failed to commit dataset state for some dataset(s) of job %s", jobName),
+          String.format("Failed to commit dataset state for some dataset(s) of job %s", optJobName.orElse(UNDEFINED_JOB_NAME)),
           IOException.class.toString(),
-          new IOException(e),
-          null
+          new IOException(e)
       );
     } finally {
       String errCorrelator = String.format("Commit [%s]", calcCommitId(workSpec));
@@ -118,8 +110,13 @@ public int commit(WUProcessingSpec workSpec) {
    * @param jobContext
    * @throws IOException
    */
-  private void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
-    Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+  private void commitTaskStates(JobState jobState, List<TaskState> taskStates, JobContext jobContext) throws IOException {
+    if (!taskStates.isEmpty()) {
+      TaskState firstTaskState = taskStates.get(0);
+      log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
+    }
+    //TODO: handle skipped tasks?
+    Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
     final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
     final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
     //TODO: Make this configurable
@@ -148,7 +145,7 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
           }).iterator(), numCommitThreads,
           // TODO: Rewrite executorUtils to use java util optional
           ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), com.google.common.base.Optional.of("Commit-thread-%d")))
-              .executeAndGetResults();
+          .executeAndGetResults();
 
       IteratorExecutor.logFailures(result, null, 10);
 
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
       }
       if (!IteratorExecutor.verifyAllSuccessful(result)) {
         // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
-        String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+        String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
         throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
       }
     } catch (InterruptedException exc) {
       throw new IOException(exc);
     }
   }
 
+  /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+  private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+    // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+    StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+    // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+    String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+    log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+    Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+    return taskStateQueueOpt.map(taskStateQueue ->
+        taskStateQueue.stream().peek(taskState ->
+            // CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+            // already themselves contain every prop of their `JobState`, not all do.
+            // `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
+            // given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+            taskState.setJobState(jobState)
+            // TODO - decide whether something akin necessary to streamline cumulative in-memory size of all issues: consumeTaskIssues(taskState);
+        ).collect(Collectors.toList())
+    ).orElseGet(() -> {
+      log.error("TaskStateStore successfully opened, but no task states found under (name) '{}'", jobIdPathName);
+      return Lists.newArrayList();
+    });
+  }
+
   /** @return id/correlator for this particular commit activity */
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
   }
-
-  /**
-   * Organize task states by dataset urns.
-   * @param taskStates
-   * @return A map of dataset urns to dataset task states.
-   */
-  public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
-    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-
-    //TODO: handle skipped tasks?
-    for (TaskState taskState : taskStates) {
-      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
-      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
-      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
-    }
-
-    return datasetStatesByUrns;
-  }
-
-  private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
-    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
-    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
-      JobState.DatasetState datasetState = new JobState.DatasetState();
-      datasetState.setDatasetUrn(datasetUrn);
-      datasetStatesByUrns.put(datasetUrn, datasetState);
-    }
-    return datasetUrn;
-  }
 }
========== changed file 4 of 5 ==========
@@ -138,7 +138,7 @@ public void onTaskCommitCompletion(Task task) {
     // TODO: if metrics configured, report them now
     log.info("WU [{} = {}] - finished commit after {}ms with state {}{}", wu.getCorrelator(), task.getTaskId(),
         taskState.getTaskDuration(), taskState.getWorkingState(),
-        taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
+        taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL) && taskState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)
             ? (" to: " + taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
     log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(), task.getTaskId(),
         taskState.toJsonString(shouldUseExtendedLogging(wu)));
========== changed file 5 of 5 ==========
@@ -104,8 +104,7 @@ public int execute(Properties jobProps, EventSubmitterContext eventSubmitterCont
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
           e.getClass().toString(),
-          e,
-          null
+          e
       );
     }
     return numWUsGenerated;
