Skip to content

Commit

Permalink
[FLINK-32087][checkpoint] Introduce space amplification statistics of…
Browse files Browse the repository at this point in the history
… file merging (#24762)
  • Loading branch information
fredia committed May 14, 2024
1 parent 4165bac commit 9a5a99b
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -268,4 +269,78 @@ public String toString() {
"%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism);
}
}

/** Space usage statistics of a managed directory. */
final class SpaceStat {

AtomicLong physicalFileCount;
AtomicLong physicalFileSize;

AtomicLong logicalFileCount;
AtomicLong logicalFileSize;

public SpaceStat() {
this(0, 0, 0, 0);
}

public SpaceStat(
long physicalFileCount,
long physicalFileSize,
long logicalFileCount,
long logicalFileSize) {
this.physicalFileCount = new AtomicLong(physicalFileCount);
this.physicalFileSize = new AtomicLong(physicalFileSize);
this.logicalFileCount = new AtomicLong(logicalFileCount);
this.logicalFileSize = new AtomicLong(logicalFileSize);
}

public void onLogicalFileCreate(long size) {
physicalFileSize.addAndGet(size);
logicalFileSize.addAndGet(size);
logicalFileCount.incrementAndGet();
}

public void onLogicalFileDelete(long size) {
logicalFileSize.addAndGet(-size);
logicalFileCount.decrementAndGet();
}

public void onPhysicalFileCreate() {
physicalFileCount.incrementAndGet();
}

public void onPhysicalFileDelete(long size) {
physicalFileSize.addAndGet(-size);
physicalFileCount.decrementAndGet();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SpaceStat spaceStat = (SpaceStat) o;
return physicalFileCount.get() == spaceStat.physicalFileCount.get()
&& physicalFileSize.get() == spaceStat.physicalFileSize.get()
&& logicalFileCount.get() == spaceStat.logicalFileCount.get()
&& logicalFileSize.get() == spaceStat.logicalFileSize.get();
}

@Override
public String toString() {
return "SpaceStat{"
+ "physicalFileCount="
+ physicalFileCount.get()
+ ", physicalFileSize="
+ physicalFileSize.get()
+ ", logicalFileCount="
+ logicalFileCount.get()
+ ", logicalFileSize="
+ logicalFileSize.get()
+ '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,16 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
*/
protected DirectoryStreamStateHandle managedExclusiveStateDirHandle;

/** The current space statistic, updated on file creation/deletion. */
protected SpaceStat spaceStat;

public FileMergingSnapshotManagerBase(
String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) {
this.id = id;
this.maxPhysicalFileSize = maxFileSize;
this.filePoolType = filePoolType;
this.ioExecutor = ioExecutor;
this.spaceStat = new SpaceStat();
}

@Override
Expand Down Expand Up @@ -215,6 +219,9 @@ protected LogicalFile createLogicalFile(
LogicalFileId fileID = LogicalFileId.generateRandomId();
LogicalFile file = new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
knownLogicalFiles.put(fileID, file);
if (physicalFile.isOwned()) {
spaceStat.onLogicalFileCreate(length);
}
return file;
}

Expand Down Expand Up @@ -305,7 +312,6 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
}

// deal with physicalFile file
physicalFile.incSize(stateSize);
returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);

return new SegmentFileStateHandle(
Expand All @@ -321,7 +327,7 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
public void closeStreamExceptionally() throws IOException {
if (physicalFile != null) {
if (logicalFile != null) {
logicalFile.discardWithCheckpointId(checkpointId);
discardSingleLogicalFile(logicalFile, checkpointId);
} else {
// The physical file should be closed anyway. This is because the
// last segmented write on this file is likely to have failed, and
Expand All @@ -336,6 +342,7 @@ public void closeStreamExceptionally() throws IOException {

private void updateFileCreationMetrics(Path path) {
// TODO: FLINK-32091 add io metrics
spaceStat.onPhysicalFileCreate();
LOG.debug("Create a new physical file {} for checkpoint file merging.", path);
}

Expand Down Expand Up @@ -364,11 +371,12 @@ boolean isResponsibleForFile(Path filePath) {
*
* @param filePath the given file path to delete.
*/
protected final void deletePhysicalFile(Path filePath) {
protected final void deletePhysicalFile(Path filePath, long size) {
ioExecutor.execute(
() -> {
try {
fs.delete(filePath, false);
spaceStat.onPhysicalFileDelete(size);
LOG.debug("Physical file deleted: {}.", filePath);
} catch (IOException e) {
LOG.warn("Fail to delete file: {}", filePath);
Expand Down Expand Up @@ -494,6 +502,14 @@ public void reusePreviousStateHandle(
}
}

public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId)
throws IOException {
logicalFile.discardWithCheckpointId(checkpointId);
if (logicalFile.getPhysicalFile().isOwned()) {
spaceStat.onLogicalFileDelete(logicalFile.getLength());
}
}

private boolean discardLogicalFiles(
SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> logicalFiles)
throws Exception {
Expand All @@ -502,7 +518,7 @@ private boolean discardLogicalFiles(
LogicalFile logicalFile = logicalFileIterator.next();
if (logicalFile.getSubtaskKey().equals(subtaskKey)
&& logicalFile.getLastUsedCheckpointID() <= checkpointId) {
logicalFile.discardWithCheckpointId(checkpointId);
discardSingleLogicalFile(logicalFile, checkpointId);
logicalFileIterator.remove();
knownLogicalFiles.remove(logicalFile.getFileId());
}
Expand Down Expand Up @@ -600,15 +616,20 @@ public void restoreStateHandles(
knownPhysicalFiles.computeIfAbsent(
fileHandle.getFilePath(),
path -> {
PhysicalFileDeleter fileDeleter =
(isManagedByFileMergingManager(
path,
subtaskKey,
fileHandle.getScope()))
? physicalFileDeleter
: null;
boolean managedByFileMergingManager =
isManagedByFileMergingManager(
path,
subtaskKey,
fileHandle.getScope());
if (managedByFileMergingManager) {
spaceStat.onPhysicalFileCreate();
}
return new PhysicalFile(
null, path, fileDeleter, fileHandle.getScope());
null,
path,
physicalFileDeleter,
fileHandle.getScope(),
managedByFileMergingManager);
});

LogicalFileId logicalFileId = fileHandle.getLogicalFileId();
Expand All @@ -619,6 +640,10 @@ public void restoreStateHandles(
fileHandle.getStartPos(),
fileHandle.getStateSize(),
subtaskKey);

if (physicalFile.isOwned()) {
spaceStat.onLogicalFileCreate(logicalFile.getLength());
}
knownLogicalFiles.put(logicalFileId, logicalFile);
logicalFile.advanceLastCheckpointId(checkpointId);
restoredLogicalFiles.add(logicalFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public LogicalFile(
this.length = length;
this.subtaskKey = subtaskKey;
physicalFile.incRefCount();
physicalFile.incSize(length);
}

public LogicalFileId getFileId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class PhysicalFile {
@FunctionalInterface
public interface PhysicalFileDeleter {
/** Delete the file. */
void perform(Path filePath) throws IOException;
void perform(Path filePath, long size) throws IOException;
}

/** Functional interface to create the physical file. */
Expand Down Expand Up @@ -88,18 +88,34 @@ PhysicalFile perform(
*/
private boolean deleted = false;

/**
* If a physical file is owned by current {@link FileMergingSnapshotManager}, the current {@link
* FileMergingSnapshotManager} should not delete or count it if not owned.
*/
private boolean isOwned;

public PhysicalFile(
@Nullable FSDataOutputStream outputStream,
Path filePath,
@Nullable PhysicalFileDeleter deleter,
CheckpointedStateScope scope) {
this(outputStream, filePath, deleter, scope, true);
}

public PhysicalFile(
@Nullable FSDataOutputStream outputStream,
Path filePath,
@Nullable PhysicalFileDeleter deleter,
CheckpointedStateScope scope,
boolean owned) {
this.filePath = filePath;
this.outputStream = outputStream;
this.closed = outputStream == null;
this.deleter = deleter;
this.scope = scope;
this.size = new AtomicLong(0);
this.logicalFileRefCount = new AtomicInteger(0);
this.isOwned = owned;
}

@Nullable
Expand Down Expand Up @@ -141,8 +157,8 @@ public void deleteIfNecessary() throws IOException {
LOG.warn("Fail to close output stream when deleting file: {}", filePath);
}
}
if (deleter != null) {
deleter.perform(filePath);
if (deleter != null && isOwned) {
deleter.perform(filePath, size.get());
} else {
LOG.debug(
"Skip deleting this file {} because it is not owned by FileMergingManager.",
Expand Down Expand Up @@ -201,6 +217,10 @@ public CheckpointedStateScope getScope() {
return scope;
}

public boolean isOwned() {
return isOwned;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -211,13 +231,13 @@ public boolean equals(Object o) {
}

PhysicalFile that = (PhysicalFile) o;
return filePath.equals(that.filePath);
return isOwned == that.isOwned && filePath.equals(that.filePath);
}

@Override
public String toString() {
return String.format(
"Physical File: [%s], closed: %s, logicalFileRefCount: %d",
filePath, closed, logicalFileRefCount.get());
"Physical File: [%s], owned: %s, closed: %s, logicalFileRefCount: %d",
filePath, isOwned, closed, logicalFileRefCount.get());
}
}

0 comments on commit 9a5a99b

Please sign in to comment.