Skip to content

Commit

Permalink
[GOBBLIN-2049] Configure Gobblin Distcp Writer to fail if setPermissi…
Browse files Browse the repository at this point in the history
…on fails (#3929)

Configure Gobblin Distcp Writer to fail if setPermission fails
---------

Co-authored-by: Urmi Mustafi <umustafi@linkedin.com>
  • Loading branch information
umustafi and Urmi Mustafi committed Apr 19, 2024
1 parent b57ecbc commit 62f645c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public boolean isThreadSafe() {
protected final DataFileVersionStrategy dstDataFileVersionStrategy;
protected final boolean preserveDirModTime;
protected final boolean resyncDirOwnerAndPermission;
protected final boolean shouldFailWhenPermissionsFail;

/**
* Build a new {@link CopyDataPublisher} from {@link State}. The constructor expects the following to be set in the
Expand Down Expand Up @@ -135,6 +136,7 @@ public CopyDataPublisher(State state) throws IOException {
// Default to be true to preserve the original behavior
this.preserveDirModTime = state.getPropAsBoolean(CopyConfiguration.PRESERVE_MODTIME_FOR_DIR, true);
this.resyncDirOwnerAndPermission = state.getPropAsBoolean(CopyConfiguration.RESYNC_DIR_OWNER_AND_PERMISSION_FOR_MANIFEST_COPY, false);
this.shouldFailWhenPermissionsFail = state.getPropAsBoolean(FileAwareInputStreamDataWriter.GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL, FileAwareInputStreamDataWriter.DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL);
}

@Override
Expand Down Expand Up @@ -197,7 +199,7 @@ private void preserveFileAttrInPublisher(CopyableFile copyableFile) throws IOExc
FileStatus dstFile = this.fs.getFileStatus(copyableFile.getDestination());
// User specifically try to copy dir metadata, so we change the group and permissions on destination even when the dir already existed
log.info("Setting destination directory {} owner and permission to {}", dstFile.getPath(), copyableFile.getDestinationOwnerAndPermission().getFsPermission());
FileAwareInputStreamDataWriter.safeSetPathPermission(this.fs, dstFile, copyableFile.getDestinationOwnerAndPermission());
FileAwareInputStreamDataWriter.setPathPermission(this.fs, dstFile, copyableFile.getDestinationOwnerAndPermission(), this.shouldFailWhenPermissionsFail);
}
if (preserveDirModTime || copyableFile.getFileStatus().isFile()) {
// Preserving File ModTime, and set the access time to an initializing value when ModTime is declared to be preserved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit";
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false;
public static final String GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL = "gobblin.copy.shouldFailWhenPermissionsFail";
public static final boolean DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL = true;

protected final AtomicLong bytesWritten = new AtomicLong();
protected final AtomicLong filesWritten = new AtomicLong();
Expand All @@ -108,6 +110,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
private final Configuration conf;

protected final Meter copySpeedMeter;
protected final boolean shouldFailWhenPermissionsFail;

protected final Optional<String> writerAttemptIdOptional;
/**
Expand Down Expand Up @@ -181,6 +184,8 @@ public FileAwareInputStreamDataWriter(State state, FileSystem fileSystem, int nu
} else {
this.renameOptions = Options.Rename.NONE;
}
this.shouldFailWhenPermissionsFail = state.getPropAsBoolean(GOBBLIN_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL,
DEFAULT_COPY_SHOULD_FAIL_WHEN_PERMISSIONS_FAIL);
}

public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId)
Expand Down Expand Up @@ -350,10 +355,13 @@ public static Path getOutputDir(State state) {
}

/**
* Sets the {@link FsPermission}, owner, group for the path passed. It will not throw exceptions, if operations
* cannot be executed, will warn and continue.
* Sets the {@link FsPermission}, owner, group for the path passed. It uses `requirePermissionSetForSuccess` param
* to determine whether an exception will be thrown or only error log printed in the case of failure.
* @param requirePermissionSetForSuccess if true then throw exception, otherwise log error message and continue when
* operations cannot be executed.
*/
public static void safeSetPathPermission(FileSystem fs, FileStatus file, OwnerAndPermission ownerAndPermission) {
public static void setPathPermission(FileSystem fs, FileStatus file, OwnerAndPermission ownerAndPermission,
boolean requirePermissionSetForSuccess) throws IOException {

Path path = file.getPath();
OwnerAndPermission targetOwnerAndPermission = setOwnerExecuteBitIfDirectory(file, ownerAndPermission);
Expand All @@ -367,7 +375,12 @@ public static void safeSetPathPermission(FileSystem fs, FileStatus file, OwnerAn
fs.setPermission(path, targetOwnerAndPermission.getFsPermission());
}
} catch (IOException ioe) {
log.warn("Failed to set permission for directory " + path, ioe);
String permissionFailureMessage = "Failed to set permission for directory " + path;
if (requirePermissionSetForSuccess) {
throw new IOException(permissionFailureMessage, ioe);
} else {
log.error(permissionFailureMessage, ioe);
}
}

String owner = Strings.isNullOrEmpty(targetOwnerAndPermission.getOwner()) ? null : targetOwnerAndPermission.getOwner();
Expand All @@ -378,7 +391,12 @@ public static void safeSetPathPermission(FileSystem fs, FileStatus file, OwnerAn
fs.setOwner(path, owner, group);
}
} catch (IOException ioe) {
log.warn("Failed to set owner and/or group for path " + path + " to " + owner + ":" + group, ioe);
String ownerGroupFailureMessage = "Failed to set owner and/or group for path " + path + " to " + owner + ":" + group;
if (requirePermissionSetForSuccess) {
throw new IOException(ownerGroupFailureMessage, ioe);
} else {
log.error(ownerGroupFailureMessage, ioe);
}
}
}

Expand All @@ -394,7 +412,7 @@ private void setRecursivePermission(Path path, OwnerAndPermission ownerAndPermis
Collections.reverse(files);

for (FileStatus file : files) {
safeSetPathPermission(this.fs, file, ownerAndPermission);
setPathPermission(this.fs, file, ownerAndPermission, this.shouldFailWhenPermissionsFail);
}
}

Expand Down Expand Up @@ -480,7 +498,7 @@ public void commit()
try {
this.fs.delete(this.stagingDir, true);
} catch (IOException ioe) {
log.warn("Failed to delete staging path at " + this.stagingDir);
log.error("Failed to delete staging path at " + this.stagingDir);
}
}
}
Expand Down

0 comments on commit 62f645c

Please sign in to comment.