[FLINK-34661][runtime] TaskExecutor supports retaining partitions after JM crashed.
JunRuiLee authored and zhuzhurk committed May 13, 2024
1 parent 36b1d2a commit 4e6b420
Showing 7 changed files with 173 additions and 24 deletions.
@@ -972,7 +972,8 @@ public CompletableFuture<?> stopTrackingAndReleasePartitions(
                 allFutures.add(
                         taskManager
                                 .getTaskExecutorGateway()
-                                .getPartitionWithMetrics(jobGraph.getJobID())));
+                                .getAndRetainPartitionWithMetrics(
+                                        jobGraph.getJobID())));
         return FutureUtils.combineAll(allFutures)
                 .thenApply(
                         partitions ->
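The hunk above is the JobMaster-side half of the change: during recovery it fans out one getAndRetainPartitionWithMetrics RPC per known TaskExecutor and folds the answers into a single future via FutureUtils.combineAll. As a minimal, stand-alone sketch of that combine step using plain java.util.concurrent (a stand-in for illustration, not Flink's FutureUtils):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    class Futures {
        // Combine one future per TaskExecutor into a single future of all results.
        // join() cannot block inside thenApply: allOf() has completed every future.
        static <T> CompletableFuture<List<T>> combineAll(List<CompletableFuture<T>> futures) {
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .thenApply(ignored ->
                            futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList()));
        }
    }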
ShuffleServiceOptions.java
@@ -25,6 +25,9 @@
 @SuppressWarnings("WeakerAccess")
 public class ShuffleServiceOptions {
 
+    public static final String NETTY_SHUFFLE_SERVICE_FACTORY_CLASS =
+            "org.apache.flink.runtime.io.network.NettyShuffleServiceFactory";
+
     private ShuffleServiceOptions() {}
 
     /**
@@ -33,7 +36,7 @@ private ShuffleServiceOptions() {}
     public static final ConfigOption<String> SHUFFLE_SERVICE_FACTORY_CLASS =
             ConfigOptions.key("shuffle-service-factory.class")
                     .stringType()
-                    .defaultValue("org.apache.flink.runtime.io.network.NettyShuffleServiceFactory")
+                    .defaultValue(NETTY_SHUFFLE_SERVICE_FACTORY_CLASS)
                     .withDescription(
                             "The full class name of the shuffle service factory implementation to be used by the cluster. "
                                     + "The default implementation uses Netty for network communication and local memory as well disk space "
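Hoisting the literal into NETTY_SHUFFLE_SERVICE_FACTORY_CLASS lets other components ask "is the default Netty shuffle service configured?" without duplicating the class name. A hedged sketch of such a check (Configuration#get is Flink's typed option accessor; the helper class and method names are illustrative):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.shuffle.ShuffleServiceOptions;

    class ShuffleCheck {
        // True when the cluster runs the default Netty shuffle service, the only
        // factory for which the partition-retention path in this commit applies.
        static boolean usesNettyShuffle(Configuration conf) {
            return conf.get(ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS)
                    .equals(ShuffleServiceOptions.NETTY_SHUFFLE_SERVICE_FACTORY_CLASS);
        }
    }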
TaskExecutor.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BatchExecutionOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.management.jmx.JMXService;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -189,6 +190,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.runtime.shuffle.ShuffleServiceOptions.NETTY_SHUFFLE_SERVICE_FACTORY_CLASS;
+import static org.apache.flink.runtime.shuffle.ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -313,6 +316,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
     private final ProfilingService profilingService;
 
+    private final Set<JobID> jobPartitionToCleanupSet = new HashSet<>();
+
     public TaskExecutor(
             RpcService rpcService,
             TaskManagerConfiguration taskManagerConfiguration,
@@ -409,6 +414,16 @@ public TaskExecutor(
                 resourceId, new JobManagerHeartbeatListener(), getMainThreadExecutor(), log);
     }
 
+    private boolean shouldRetainPartitionsOnJobManagerConnectionLost() {
+        return taskManagerConfiguration
+                        .getConfiguration()
+                        .get(BatchExecutionOptions.JOB_RECOVERY_ENABLED)
+                && taskManagerConfiguration
+                        .getConfiguration()
+                        .get(SHUFFLE_SERVICE_FACTORY_CLASS)
+                        .equals(NETTY_SHUFFLE_SERVICE_FACTORY_CLASS);
+    }
+
     @Override
     public CompletableFuture<Boolean> canBeReleased() {
         return CompletableFuture.completedFuture(
@@ -1365,15 +1380,15 @@ public void disconnectJobManager(JobID jobId, Exception cause) {
                     .ifPresent(
                             jobManagerConnection ->
                                     disconnectAndTryReconnectToJobManager(
-                                            jobManagerConnection, cause));
+                                            jobManagerConnection, cause, true));
         }
     }
 
     private void disconnectAndTryReconnectToJobManager(
-            JobTable.Connection jobManagerConnection, Exception cause) {
+            JobTable.Connection jobManagerConnection, Exception cause, boolean releasePartitions) {
         try (MdcCloseable ignored =
                 MdcUtils.withContext(MdcUtils.asContextData(jobManagerConnection.getJobId()))) {
-            disconnectJobManagerConnection(jobManagerConnection, cause);
+            disconnectJobManagerConnection(jobManagerConnection, cause, releasePartitions);
             jobLeaderService.reconnect(jobManagerConnection.getJobId());
         }
     }
@@ -1449,8 +1464,10 @@ public CompletableFuture<Acknowledge> updateDelegationTokens(
     }
 
     @Override
-    public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
+    public CompletableFuture<Collection<PartitionWithMetrics>> getAndRetainPartitionWithMetrics(
             JobID jobId) {
+        jobPartitionToCleanupSet.remove(jobId);
+
         Collection<TaskExecutorPartitionInfo> partitionInfoList =
                 partitionTracker.getTrackedPartitionsFor(jobId);
         List<PartitionWithMetrics> partitionWithMetrics = new ArrayList<>();
@@ -1825,7 +1842,8 @@ private void establishJobManagerConnection(
             } else {
                 disconnectJobManagerConnection(
                         oldJobManagerConnection,
-                        new Exception("Found new job leader for job id " + jobId + '.'));
+                        new Exception("Found new job leader for job id " + jobId + '.'),
+                        true);
             }
         }
 
@@ -1847,14 +1865,16 @@ private void closeJob(JobTable.Job job, Exception cause) {
         job.asConnection()
                 .ifPresent(
                         jobManagerConnection ->
-                                disconnectJobManagerConnection(jobManagerConnection, cause));
+                                disconnectJobManagerConnection(jobManagerConnection, cause, true));
 
         job.close();
     }
 
     private void disconnectJobManagerConnection(
-            JobTable.Connection jobManagerConnection, Exception cause) {
+            JobTable.Connection jobManagerConnection, Exception cause, boolean releasePartitions) {
         final JobID jobId = jobManagerConnection.getJobId();
+        jobPartitionToCleanupSet.add(jobId);
+
         log.info(
                 "Close JobManager connection for job {}.",
                 jobId,
@@ -1891,6 +1911,35 @@ private void disconnectJobManagerConnection(
             }
         }
 
+        if (!releasePartitions) {
+            // this branch is for job recovery from master failures
+            final Duration maxRegistrationDuration =
+                    taskManagerConfiguration.getMaxRegistrationDuration();
+
+            if (maxRegistrationDuration != null) {
+                log.info(
+                        "Waiting for {} ms for job {} to recover. If the job manager is not reconnected, "
+                                + "the job's partitions will be cleaned up.",
+                        maxRegistrationDuration.toMillis(),
+                        jobId);
+                scheduleRunAsync(
+                        () -> {
+                            // If the job has not recovered after waiting for this period,
+                            // clean up its partitions.
+                            Optional<JobTable.Job> job = jobTable.getJob(jobId);
+                            if (!job.isPresent()
+                                    || !job.get().isConnected()
+                                    || jobPartitionToCleanupSet.contains(jobId)) {
+                                scheduleResultPartitionCleanup(jobId);
+                            }
+                        },
+                        maxRegistrationDuration);
+            }
+        } else {
+            // cleanup remaining partitions once all tasks for this job have completed
+            scheduleResultPartitionCleanup(jobId);
+        }
+
         // 3. Disassociate from the JobManager
         try {
             jobManagerHeartbeatManager.unmonitorTarget(jobManagerConnection.getResourceId());
@@ -1935,9 +1984,6 @@ private void disassociateFromJobManager(
 
         final JobID jobId = jobManagerConnection.getJobId();
 
-        // cleanup remaining partitions once all tasks for this job have completed
-        scheduleResultPartitionCleanup(jobId);
-
         final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry();
 
         if (kvStateRegistry != null) {
@@ -2009,6 +2055,7 @@ private void scheduleResultPartitionCleanup(JobID jobId) {
                 .thenRunAsync(
                         () -> {
                             partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId);
+                            jobPartitionToCleanupSet.remove(jobId);
                         },
                         getMainThreadExecutor());
     }
@@ -2488,7 +2535,8 @@ public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
                                                 new Exception(
                                                         "Job leader for job id "
                                                                 + jobId
-                                                                + " lost leadership."))));
+                                                                + " lost leadership."),
+                                                !shouldRetainPartitionsOnJobManagerConnectionLost())));
     }
 
     @Override
@@ -2659,7 +2707,9 @@ private void handleJobManagerConnectionLoss(ResourceID resourceID, Exception cause) {
                 .ifPresent(
                         jobManagerConnection ->
                                 disconnectAndTryReconnectToJobManager(
-                                        jobManagerConnection, cause));
+                                        jobManagerConnection,
+                                        cause,
+                                        !shouldRetainPartitionsOnJobManagerConnectionLost()));
     }
 
     @Override
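Taken together, the TaskExecutor changes implement a retain-then-expire handshake: losing the JobManager marks the job in jobPartitionToCleanupSet, a recovered JobMaster's getAndRetainPartitionWithMetrics call unmarks it, and a timer bounded by the registration timeout releases whatever is still marked. A minimal stand-alone sketch of that pattern (all names hypothetical; Flink serializes these steps on the RPC main thread instead of a shared scheduler, and also re-checks the job's connection state before cleaning up):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class PartitionRetainer {
        private final Set<String> pendingCleanup = ConcurrentHashMap.newKeySet();
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        // JobManager connection lost: keep the partitions, but start the clock.
        void onJobManagerLost(String jobId, long graceMillis) {
            pendingCleanup.add(jobId);
            scheduler.schedule(
                    () -> {
                        // Only jobs still marked after the grace period are cleaned up.
                        if (pendingCleanup.remove(jobId)) {
                            releasePartitionsFor(jobId);
                        }
                    },
                    graceMillis,
                    TimeUnit.MILLISECONDS);
        }

        // A recovered JobMaster fetched the partitions: cancel the pending cleanup.
        void onPartitionsFetched(String jobId) {
            pendingCleanup.remove(jobId);
        }

        private void releasePartitionsFor(String jobId) {
            System.out.println("releasing partitions of job " + jobId);
        }
    }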
TaskExecutorGateway.java
@@ -318,13 +318,13 @@ CompletableFuture<Acknowledge> updateDelegationTokens(
             ResourceManagerId resourceManagerId, byte[] tokens);
 
     /**
-     * Get all partitions and their metrics located on this task executor, the metrics mainly
-     * includes the meta information of partition(partition bytes, etc).
+     * Get and retain all partitions and their metrics located on this task executor. The metrics
+     * mainly include the partition's meta information (partition bytes, etc.).
      *
      * @param jobId ID of the target job
      * @return All partitions belong to the target job and their metrics
      */
-    default CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(
+    default CompletableFuture<Collection<PartitionWithMetrics>> getAndRetainPartitionWithMetrics(
             JobID jobId) {
         throw new UnsupportedOperationException();
     }
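Note the compatibility pattern carried over from the old method: the renamed RPC keeps a default body that throws, so existing TaskExecutorGateway implementations (test stubs, decorators) still compile and only fail if the new code path is actually exercised. A self-contained illustration of the pattern (types simplified; not Flink's interface):

    import java.util.concurrent.CompletableFuture;

    interface Gateway {
        // New RPC with a throwing default: old implementors compile unchanged.
        default CompletableFuture<Integer> getAndRetainPartitionWithMetrics(String jobId) {
            throw new UnsupportedOperationException();
        }
    }

    // Compiles without overriding the new method; calling it would throw.
    class LegacyGateway implements Gateway {}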
