Commit 81c0887

Refactor: Remove @LegacyFileIO and Make DelegationRefreshToken use new cluster.yaml (#100)

HotSushi committed May 13, 2024
1 parent f9b2417 commit 81c0887
Showing 13 changed files with 287 additions and 134 deletions.

This file was deleted.

@@ -1,4 +1,4 @@
-package com.linkedin.openhouse.cluster.storage.filesystem;
+package com.linkedin.openhouse.cluster.storage.hdfs;

import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;

@@ -7,6 +7,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.beans.factory.annotation.Autowired;
@@ -19,23 +20,36 @@
* basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always updated.
*/
@Slf4j
-public class DelegationTokenRefresher {
+public class HdfsDelegationTokenRefresher {

-  @Autowired private FsStorageProvider fsStorageProvider;
+  @Autowired HdfsStorage hdfsStorage;

/**
* Schedule credential refresh (hadoop delegation tokens) daily twice. The schedule cron
-   * expression represented by #{clusterProperties.clusterStorageHadoopTokenRefreshScheduleCron}
-   * sets the cron to run every 12 hours i.e. daily twice. Hadoop delegation token is valid for 24
-   * hours and hence the token must be refreshed before that. The hadoop delegation token file is
-   * pointed by environment variable i.e. HADOOP_TOKEN_FILE_LOCATION. The renewal of the delegation
-   * token must be done before it expires. This code assumes that hadoop delegation tokens are
-   * renewed on a regular basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always
-   * updated. So, this methods reads the token file and updates the current user
-   * UserGroupInformation (UGI) with the renewed token and this update is done daily twice.
+   * expression represented by HdfsStorage specific parameter "token.refresh.schedule.cron" sets the
+   * cron to run every 12 hours i.e. daily twice. Hadoop delegation token is valid for 24 hours and
+   * hence the token must be refreshed before that. The hadoop delegation token file is pointed by
+   * environment variable i.e. HADOOP_TOKEN_FILE_LOCATION. The renewal of the delegation token must
+   * be done before it expires. This code assumes that hadoop delegation tokens are renewed on a
+   * regular basis and token file as pointed by HADOOP_TOKEN_FILE_LOCATION is always updated. So,
+   * this methods reads the token file and updates the current user UserGroupInformation (UGI) with
+   * the renewed token and this update is done daily twice. The relevant configuration in the
+   * cluster YAML file is as follows:
+   *
+   * <pre>
+   * cluster:
+   *   storages:
+   *     hdfs:
+   *       parameter:
+   *         token.refresh.enabled: true
+   *         token.refresh.schedule.cron: 0 0 0/12 * * ?
+   * </pre>
*/
-  @Scheduled(cron = "#{clusterProperties.clusterStorageHadoopTokenRefreshScheduleCron}")
+  @Scheduled(
+      cron =
+          "#{hdfsStorage.getProperties().getOrDefault('token.refresh.schedule.cron', '0 0 0/12 * * ?')}")
public void refresh() {
log.info("Refreshing HDFS delegation token");
String tokenFileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
try {
log.info(
@@ -55,9 +69,8 @@ public void refresh() {
+ HADOOP_TOKEN_FILE_LOCATION
+ " not found");
}
-    Credentials cred =
-        Credentials.readTokenStorageFile(
-            tokenFile, fsStorageProvider.storageClient().getConf());
+    FileSystem fs = (FileSystem) hdfsStorage.getClient().getNativeClient();
+    Credentials cred = Credentials.readTokenStorageFile(tokenFile, fs.getConf());
log.info("Loaded {} tokens", cred.numberOfTokens());
UserGroupInformation.getCurrentUser().addCredentials(cred);
log.info(
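Note on the hunks above: the refresher now takes both its schedule and its Hadoop configuration from the HdfsStorage bean instead of clusterProperties and FsStorageProvider. Below is a minimal sketch of the resulting flow, not the shipped class; it assumes HdfsStorage#getProperties() returns a Map<String, String> of the cluster.yaml "parameter" entries and that getClient().getNativeClient() yields a Hadoop FileSystem, as the diff implies.

    // Sketch only: how the refreshed token logic resolves its inputs after this change.
    import java.io.File;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;

    class HdfsTokenRefreshSketch {
      void refresh(HdfsStorage hdfsStorage) throws IOException {
        // The cron comes from the per-storage parameter map, falling back to twice daily.
        Map<String, String> params = hdfsStorage.getProperties();
        String cron = params.getOrDefault("token.refresh.schedule.cron", "0 0 0/12 * * ?");
        System.out.println("refresh schedule: " + cron);

        // The Hadoop configuration now comes from the HDFS storage client, not FsStorageProvider.
        FileSystem fs = (FileSystem) hdfsStorage.getClient().getNativeClient();
        File tokenFile = new File(System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        Credentials cred = Credentials.readTokenStorageFile(tokenFile, fs.getConf());
        UserGroupInformation.getCurrentUser().addCredentials(cred);
      }
    }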
@@ -0,0 +1,55 @@
package com.linkedin.openhouse.cluster.storage.hdfs;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
* Configuration class to conditionally enable/disable creation of {@link
* HdfsDelegationTokenRefresher} bean and enable delegation token refresh scheduling.
*/
@Slf4j
@Configuration
@EnableScheduling
public class HdfsDelegationTokenRefresherConfig {

@Autowired private HdfsStorage hdfsStorage;

private static final String HDFS_TOKEN_REFRESH_ENABLED = "token.refresh.enabled";

/**
* Conditionally provide the HdfsDelegationTokenRefresher bean if the parameter for token refresh
* is enabled in the HdfsStorage properties. The relevant configuration in the cluster YAML file
* is as follows:
*
* <pre>
 * cluster:
 *   storages:
 *     hdfs:
 *       parameter:
 *         token.refresh.enabled: true
* </pre>
*
* @return HdfsDelegationTokenRefresher
*/
@Bean
public HdfsDelegationTokenRefresher getDelegationTokenRefresher() {
if (!hdfsStorage.isConfigured()) {
log.debug(
"Hdfs storage is not configured, ignoring HdfsDelegationTokenRefresher bean creation");
return null;
}
String refreshEnabled =
hdfsStorage.getProperties().getOrDefault(HDFS_TOKEN_REFRESH_ENABLED, "false");
if (Boolean.parseBoolean(refreshEnabled)) {
log.info("Creating HdfsDelegationTokenRefresher bean");
return new HdfsDelegationTokenRefresher();
} else {
log.debug(
"Hdfs storage token refresh is not enabled, ignoring HdfsDelegationTokenRefresher bean creation");
return null;
}
}
}
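The config class above gates scheduling on two conditions: HdfsStorage must be configured, and the per-storage parameter token.refresh.enabled must be "true"; otherwise the @Bean method returns null, so no refresher instance exists and no scheduled refresh runs. A condensed sketch of the same decision follows, with constructor-style parameter injection used purely for illustration.

    // Illustrative condensation of HdfsDelegationTokenRefresherConfig's decision logic.
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;

    @Configuration
    @EnableScheduling
    class ConditionalRefresherSketch {
      @Bean
      HdfsDelegationTokenRefresher refresher(HdfsStorage hdfsStorage) {
        if (!hdfsStorage.isConfigured()) {
          return null; // storage not configured: no refresher, no schedule
        }
        boolean enabled =
            Boolean.parseBoolean(
                hdfsStorage.getProperties().getOrDefault("token.refresh.enabled", "false"));
        return enabled ? new HdfsDelegationTokenRefresher() : null;
      }
    }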
@@ -3,6 +3,8 @@
import static com.linkedin.openhouse.internal.catalog.InternalCatalogMetricsConstant.METRICS_PREFIX;

import com.linkedin.openhouse.cluster.metrics.micrometer.MetricsReporter;
+import com.linkedin.openhouse.cluster.storage.StorageManager;
+import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
@@ -21,7 +23,6 @@
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
@@ -35,9 +36,9 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

@Autowired HouseTableRepository houseTableRepository;

-  @Autowired
-  @Qualifier("LegacyFileIO")
-  FileIO fileIO;
+  @Autowired FileIOManager fileIOManager;
+
+  @Autowired StorageManager storageManager;

@Autowired SnapshotInspector snapshotInspector;

@@ -49,7 +50,7 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new OpenHouseInternalTableOperations(
houseTableRepository,
-        fileIO,
+        fileIOManager.getFileIO(storageManager.getDefaultStorage().getType()),
snapshotInspector,
houseTableMapper,
tableIdentifier,
@@ -100,6 +101,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}
if (purge) {
// Delete data and metadata files from storage.
+      FileIO fileIO = fileIOManager.getFileIO(storageManager.getDefaultStorage().getType());
if (fileIO instanceof SupportsPrefixOperations) {
log.debug("Deleting files for table {}", tableLocation);
((SupportsPrefixOperations) fileIO).deletePrefix(tableLocation);
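With the @Qualifier("LegacyFileIO") binding gone, the catalog resolves a FileIO at the point of use by asking FileIOManager for the implementation that matches the cluster's default storage type. A minimal sketch of that lookup is below, relying only on the two calls visible in the diff; the concrete return type of getType() is not shown in this commit and is left to local type inference.

    // Sketch of the FileIO resolution now used in newTableOps() and dropTable().
    import org.apache.iceberg.io.FileIO;

    class FileIOResolutionSketch {
      FileIO resolve(FileIOManager fileIOManager, StorageManager storageManager) {
        // The default storage type comes from cluster.yaml via StorageManager.
        var storageType = storageManager.getDefaultStorage().getType();
        // FileIOManager maps each configured storage type to its FileIO implementation.
        return fileIOManager.getFileIO(storageType);
      }
    }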
@@ -1,6 +1,9 @@
package com.linkedin.openhouse.internal.catalog;

+import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.filesystem.FsStorageProvider;
+import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
+import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Consumer;
@@ -11,7 +14,6 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.io.FileIO;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.mock.mockito.MockBean;
@@ -25,7 +27,11 @@ public static void main(String[] args) {

@MockBean Catalog openHouseInternalCatalog;

-  @MockBean FileIO fileIO;
+  @MockBean StorageManager storageManager;
+
+  @MockBean FileIOManager fileIOManager;
+
+  @MockBean FileIOConfig fileIOConfig;

@MockBean FsStorageProvider fsStorageProvider;

@@ -1,9 +1,6 @@
package com.linkedin.openhouse.internal.catalog;

-import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
-import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
-import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapperTest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -40,7 +37,6 @@
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;

@SpringBootTest
@@ -51,11 +47,6 @@ class SnapshotInspectorTest {

@TempDir static Path tempDir;

-  @MockBean StorageManager storageManager;
-
-  @MockBean FileIOManager fileIOManager;
-
-  @MockBean FileIOConfig fileIOConfig;
private static final TableMetadata noSnapshotsMetadata =
TableMetadata.newTableMetadata(
new Schema(
@@ -4,14 +4,10 @@
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepositoryImpl;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.io.FileIO;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;

@SpringBootTest
public class HouseTableMapperTest {
@@ -35,13 +31,6 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}
-
-    @Primary
-    @Bean
-    @Qualifier("LegacyFileIO")
-    public FileIO provideFileIO() {
-      return new HadoopFileIO();
-    }
}

@Autowired protected HouseTableMapper houseTableMapper;
@@ -4,14 +4,11 @@
import static org.assertj.core.api.Assertions.*;

import com.google.gson.Gson;
-import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.housetables.client.api.UserTableApi;
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.housetables.client.model.EntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.GetAllEntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.UserTable;
-import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
-import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTable;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
@@ -24,8 +21,6 @@
import java.util.List;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
-import org.apache.iceberg.hadoop.HadoopFileIO;
-import org.apache.iceberg.io.FileIO;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -34,9 +29,7 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.ReflectionUtils;

@@ -51,12 +44,6 @@ public class HouseTableRepositoryImplTest {

@Autowired HouseTableMapper houseTableMapper;

-  @MockBean StorageManager storageManager;
-
-  @MockBean FileIOManager fileIOManager;
-
-  @MockBean FileIOConfig fileIOConfig;
-
@TestConfiguration
public static class MockWebServerConfiguration {
/**
@@ -85,13 +72,6 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}
-
-    @Primary
-    @Bean
-    @Qualifier("LegacyFileIO")
-    public FileIO provideFileIO() {
-      return new HadoopFileIO();
-    }
}

/**