-
Notifications
You must be signed in to change notification settings - Fork 415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TEZ-4547: Add Tez AM JobID to the JobConf #339
base: master
Are you sure you want to change the base?
TEZ-4547: Add Tez AM JobID to the JobConf #339
Conversation
Some committers require a job-wide UUID to function correctly. Adding the AM JobID to the JobConf will allow applications to pass that to the committers that need it.
25cae6d
to
9040a2f
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
@@ -417,6 +418,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException | |||
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), | |||
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), | |||
getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput); | |||
jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this to line above TaskAttemptID taskAttemptId =
assertNotEquals(parentJobID,invalidJobID); | ||
assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID),parentJobID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix code formatting. space after ,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If my understanding is correct, Hive/Pig would use the value from mapreduce.parent.job.id
to set the correct committer UUID right?
@@ -119,6 +119,7 @@ public void abortOutput(VertexStatus.State finalState) throws IOException { | |||
|| jobConf.getBoolean("mapred.mapper.new-api", false)) { | |||
newApiCommitter = true; | |||
} | |||
jobConf.set(MRJobConfig.MR_PARENT_JOB_ID, new org.apache.hadoop.mapred.JobID(String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have String.valueOf(getContext().getApplicationId().getClusterTimestamp()), getContext().getApplicationId().getId()).toString(
inside a method and re-use it in MROutput.java as well ?
Yes, that was the plan. The property name was just chosen arbitrarily so I could put the PR up, any suggestions for a better one are welcome. |
This commit also adds the DAG identifier to the job UUID to ensure that multiple jobs within the same session will be assigned different UUIDs.
This comment was marked as outdated.
This comment was marked as outdated.
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
Outdated
Show resolved
Hide resolved
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
Outdated
Show resolved
Hide resolved
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
Outdated
Show resolved
Hide resolved
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
Outdated
Show resolved
Hide resolved
This comment was marked as outdated.
This comment was marked as outdated.
@shameersss1 We could actually just set fs.s3a.committer.uuid directly instead of the indirection through the other setting. |
Switch UUID property name to the one required by S3A committers.
This comment was marked as outdated.
This comment was marked as outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
@abstractdog - Could you please review the same ? |
Refactors the implementation to reuse Tez's DAGID type instead of hand-rolling our own.
🎊 +1 overall
This message was automatically generated. |
@abstractdog @shameersss1 Is there anything else needed? |
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
Outdated
Show resolved
Hide resolved
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
Outdated
Show resolved
Hide resolved
🎊 +1 overall
This message was automatically generated. |
Some committers require a job-wide UUID to function correctly. Adding the AM JobID to the JobConf
will allow applications to pass that to
the committers that need it.