Skip to content

Commit

Permalink
[GOBBLIN-2050] Add settings to allow for full cleanup in GobblinYarnA…
Browse files Browse the repository at this point in the history
…ppLauncher (#3931)

Allow for explicit path definitions for token locations and work directories to allow easy cleanup after job completion
  • Loading branch information
Will-Lo committed Apr 25, 2024
1 parent a74d17a commit 773da76
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public class GobblinClusterConfigurationKeys {
public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
// Root working directory for Gobblin cluster
public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir";
// Root working dir without appending the application name, keeping CLUSTER_WORK_DIR property for backward compatibility
// This is used in scenarios where we want to encapsulate multiple files inside of this work dir without coupling it to the YARN application
// Example: Yarn security token refresh location, gobblin cluster worker directories.
// However for concurrent jobs need to ensure that this property is distinct for each job otherwise it can lead to folder conflicts and pre-emptive deletion of files.
public static final String CLUSTER_EXACT_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "exact.workDir";

public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled";
public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ public static String getHostname() throws UnknownHostException {
*/
public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
String applicationName, String applicationId) {
if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR)) {
return new Path(new Path(fs.getUri()), config.getString(GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR));
} else if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
return new Path(new Path(fs.getUri()), PathUtils.combinePaths(config.getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR),
getAppWorkDirPath(applicationName, applicationId)));
} else {
return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId));
}
return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId));
}

/**
Expand Down Expand Up @@ -254,4 +257,13 @@ public static FileSystem buildFileSystem(Config config, Configuration conf)
.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
: FileSystem.get(conf);
}

public static FileSystem createFileSystem(Config config, Configuration conf) throws IOException {
Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
//Add any Hadoop-specific overrides into the Configuration object
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides), conf);
return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
.newInstance(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
: FileSystem.newInstance(conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -870,8 +870,7 @@ private Path getHdfsLogDir(Path appWorkDir) throws IOException {
* @throws IOException
*/
private AbstractTokenRefresher buildTokenRefreshManager() throws IOException {
Path tokenFilePath = new Path(this.fs.getHomeDirectory(), this.applicationName + Path.SEPARATOR +
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
Path tokenFilePath = YarnContainerSecurityManager.getYarnTokenFilePath(this.config, this.fs);
String securityManagerClassName = ConfigUtils.getString(config, GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS);

try {
Expand All @@ -892,10 +891,12 @@ private AbstractTokenRefresher buildTokenRefreshManager() throws IOException {

@VisibleForTesting
void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, applicationId.toString());
if (this.fs.exists(appWorkDir)) {
// Create a new filesystem as this.fs may have been closed by the Yarn Application, and FS.get() will return a cached instance of the closed FS
FileSystem fs = GobblinClusterUtils.createFileSystem(this.config, this.yarnConfiguration);
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, fs, this.applicationName, applicationId.toString());
if (fs.exists(appWorkDir)) {
LOGGER.info("Deleting application working directory " + appWorkDir);
this.fs.delete(appWorkDir, true);
fs.delete(appWorkDir, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class GobblinYarnConfigurationKeys {
public static final String KEYTAB_FILE_PATH = GOBBLIN_YARN_PREFIX + "keytab.file.path";
public static final String KEYTAB_PRINCIPAL_NAME = GOBBLIN_YARN_PREFIX + "keytab.principal.name";
public static final String TOKEN_FILE_NAME = ".token";
public static final String TOKEN_FILE_PATH_KEY = GOBBLIN_YARN_PREFIX + "token.file.path";
public static final String LOGIN_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + "login.interval.minutes";
public static final Long DEFAULT_LOGIN_INTERVAL_IN_MINUTES = Long.MAX_VALUE;
public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import java.io.IOException;

import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -63,9 +65,7 @@ public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus event

public YarnContainerSecurityManager(Config config, FileSystem fs, EventBus eventBus, LogCopier logCopier) {
this.fs = fs;
this.tokenFilePath = new Path(this.fs.getHomeDirectory(),
config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY) + Path.SEPARATOR
+ GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
this.tokenFilePath = getYarnTokenFilePath(config, fs);
this.eventBus = eventBus;
this.logCopier = logCopier;
}
Expand Down Expand Up @@ -111,4 +111,19 @@ void addCredentials(Credentials credentials) throws IOException {
}
UserGroupInformation.getCurrentUser().addCredentials(credentials);
}

/**
* A utility method to get the location of the generated security token
* @param config - the configuration that contains the application name and the token file path
* @param fs - the Filesystem that stores the security token
* @return the path to the security token
*/
static Path getYarnTokenFilePath(Config config, FileSystem fs) {
if (config.hasPath(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY)) {
return new Path(config.getString(GobblinYarnConfigurationKeys.TOKEN_FILE_PATH_KEY), GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
}
// Default to storing the token file in the home directory of the user
return new Path(fs.getHomeDirectory(), PathUtils.combinePaths(config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY),
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME));
}
}

0 comments on commit 773da76

Please sign in to comment.