Skip to content

Commit

Permalink
Introduce FileIOManager and FileIO implementations for HDFS and Local…
Browse files Browse the repository at this point in the history
… Storage (#96)

Laying foundations for storage part 4: `FileIOManager` and FileIO
implementations for `HDFS` and `Local`
  • Loading branch information
HotSushi committed May 6, 2024
1 parent d824024 commit 9339553
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 3 deletions.
Expand Up @@ -21,6 +21,7 @@
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
Expand All @@ -34,7 +35,9 @@ public class OpenHouseInternalCatalog extends BaseMetastoreCatalog {

@Autowired HouseTableRepository houseTableRepository;

@Autowired FileIO fileIO;
@Autowired
@Qualifier("LegacyFileIO")
FileIO fileIO;

@Autowired SnapshotInspector snapshotInspector;

Expand Down
@@ -0,0 +1,65 @@
package com.linkedin.openhouse.internal.catalog.fileio;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.StorageType;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configures the FileIO beans for storages configured in {@link StorageManager}.
 *
 * <p>Each storage type should have a corresponding FileIO bean defined in this class. The return
 * value of the bean is null if the storage type is not configured (Spring does not register a bean
 * when the factory method returns null, which is why consumers autowire these beans with
 * {@code required = false}). The return class of the bean is the FileIO implementation for the
 * respective storage type. If conflicting classes could be returned for the same storage type, the
 * bean name should be annotated with Qualifier to distinguish between them.
 */
@Slf4j
@Configuration
public class FileIOConfig {

  // Source of truth for which storage types are configured; getStorage throws
  // IllegalArgumentException for unconfigured types (handled below by returning null).
  @Autowired StorageManager storageManager;

  /**
   * Provides the HdfsFileIO bean for HDFS storage type.
   *
   * @return HdfsFileIO bean for HDFS storage type, or null if HDFS storage type is not configured
   */
  @Bean("HdfsFileIO")
  HadoopFileIO provideHdfsFileIO() {
    try {
      // The native client for HDFS storage is a Hadoop FileSystem; its Configuration is
      // all HadoopFileIO needs to resolve paths against the cluster.
      FileSystem fs =
          (FileSystem) storageManager.getStorage(StorageType.HDFS).getClient().getNativeClient();
      return new HadoopFileIO(fs.getConf());
    } catch (IllegalArgumentException e) {
      // If the HDFS storage type is not configured, return null
      // Spring doesn't define the bean if the return value is null
      log.debug("HDFS storage type is not configured", e);
      return null;
    }
  }

  /**
   * Provides the LocalFileIO bean for Local storage type.
   *
   * <p>Local storage is also backed by a Hadoop {@link FileSystem} (local implementation), so the
   * returned instance is a {@link HadoopFileIO}, exposed under the {@link FileIO} interface.
   *
   * @return FileIO bean for Local storage type, or null if Local storage type is not configured
   */
  @Bean("LocalFileIO")
  FileIO provideLocalFileIO() {
    try {
      FileSystem fs =
          (FileSystem) storageManager.getStorage(StorageType.LOCAL).getClient().getNativeClient();
      return new HadoopFileIO(fs.getConf());
    } catch (IllegalArgumentException e) {
      // If the Local storage type is not configured, return null
      // Spring doesn't define the bean if the return value is null
      log.debug("Local storage type is not configured", e);
      return null;
    }
  }
}
@@ -0,0 +1,51 @@
package com.linkedin.openhouse.internal.catalog.fileio;

import static com.linkedin.openhouse.cluster.storage.StorageType.HDFS;
import static com.linkedin.openhouse.cluster.storage.StorageType.LOCAL;

import com.linkedin.openhouse.cluster.storage.StorageType;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * This is the main class that provides the FileIO implementation based on the storage type. Each
 * storage type should have a corresponding FileIO bean field defined in this class and the
 * corresponding FileIO bean should be returned for appropriate storage type in the method {@link
 * #getFileIO(StorageType.Type)}. If the storage type is not configured, the method should throw an
 * IllegalArgumentException.
 */
@Component
public class FileIOManager {

  // Optional wiring: the bean is absent when HDFS storage is not configured,
  // leaving this field null.
  @Autowired(required = false)
  @Qualifier("HdfsFileIO")
  HadoopFileIO hdfsFileIO;

  // Optional wiring: the bean is absent when Local storage is not configured,
  // leaving this field null.
  @Autowired(required = false)
  @Qualifier("LocalFileIO")
  FileIO localFileIO;

  /**
   * Returns the FileIO implementation for the given storage type.
   *
   * @param storageType, the storage type for which the FileIO implementation is required
   * @return FileIO implementation for the given storage type
   * @throws IllegalArgumentException if the storage type is not configured
   */
  public FileIO getFileIO(StorageType.Type storageType) throws IllegalArgumentException {
    // Resolve the candidate bean first; unknown storage types fail immediately.
    FileIO resolved;
    if (HDFS.equals(storageType)) {
      resolved = hdfsFileIO;
    } else if (LOCAL.equals(storageType)) {
      resolved = localFileIO;
    } else {
      throw new IllegalArgumentException("FileIO not supported for storage type: " + storageType);
    }
    // A null field means the storage type is known but was not configured at startup.
    if (resolved == null) {
      throw new IllegalArgumentException(storageType.getValue() + " is not configured");
    }
    return resolved;
  }
}
@@ -1,6 +1,9 @@
package com.linkedin.openhouse.internal.catalog;

import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.internal.catalog.exception.InvalidIcebergSnapshotException;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapperTest;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -37,6 +40,7 @@
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;

@SpringBootTest
Expand All @@ -47,6 +51,11 @@ class SnapshotInspectorTest {

@TempDir static Path tempDir;

@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;
private static final TableMetadata noSnapshotsMetadata =
TableMetadata.newTableMetadata(
new Schema(
Expand Down
@@ -0,0 +1,37 @@
package com.linkedin.openhouse.internal.catalog.fileio;

import com.linkedin.openhouse.cluster.storage.StorageType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/** Integration test for {@link FileIOManager} against a context with only Local storage wired. */
@SpringBootTest(classes = FileIOManagerTest.FileIOManagerTestConfig.class)
public class FileIOManagerTest {

  @Autowired FileIOManager fileIOManager;

  @Test
  public void testGetLocalFileIO() {
    // LOCAL storage is part of the scanned configuration, so a FileIO must resolve.
    Object localFileIO = fileIOManager.getFileIO(StorageType.LOCAL);
    Assertions.assertNotNull(localFileIO);
  }

  @Test
  public void testGetUndefinedFileIOThrowsException() {
    // HDFS storage is not part of the scanned configuration; lookup must fail fast.
    IllegalArgumentException thrown =
        Assertions.assertThrows(
            IllegalArgumentException.class, () -> fileIOManager.getFileIO(StorageType.HDFS));
    Assertions.assertNotNull(thrown);
  }

  /** Minimal component scan pulling in the fileio beans plus storage/cluster config. */
  @Configuration
  @ComponentScan(
      basePackages = {
        "com.linkedin.openhouse.internal.catalog.fileio",
        "com.linkedin.openhouse.cluster.storage",
        "com.linkedin.openhouse.cluster.configs"
      })
  public static class FileIOManagerTestConfig {}
}
Expand Up @@ -4,10 +4,14 @@
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepositoryImpl;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

@SpringBootTest
public class HouseTableMapperTest {
Expand All @@ -31,6 +35,13 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}

@Primary
@Bean
@Qualifier("LegacyFileIO")
public FileIO provideFileIO() {
return new HadoopFileIO();
}
}

@Autowired protected HouseTableMapper houseTableMapper;
Expand Down
Expand Up @@ -4,11 +4,14 @@
import static org.assertj.core.api.Assertions.*;

import com.google.gson.Gson;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.housetables.client.api.UserTableApi;
import com.linkedin.openhouse.housetables.client.invoker.ApiClient;
import com.linkedin.openhouse.housetables.client.model.EntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.GetAllEntityResponseBodyUserTable;
import com.linkedin.openhouse.housetables.client.model.UserTable;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOConfig;
import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager;
import com.linkedin.openhouse.internal.catalog.mapper.HouseTableMapper;
import com.linkedin.openhouse.internal.catalog.model.HouseTable;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
Expand All @@ -21,6 +24,8 @@
import java.util.List;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -29,7 +34,9 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.ReflectionUtils;

Expand All @@ -44,6 +51,12 @@ public class HouseTableRepositoryImplTest {

@Autowired HouseTableMapper houseTableMapper;

@MockBean StorageManager storageManager;

@MockBean FileIOManager fileIOManager;

@MockBean FileIOConfig fileIOConfig;

@TestConfiguration
public static class MockWebServerConfiguration {
/**
Expand Down Expand Up @@ -72,6 +85,13 @@ public UserTableApi provideMockHtsApiInstance() {
public HouseTableRepository provideRealHtsRepository() {
return new HouseTableRepositoryImpl();
}

@Primary
@Bean
@Qualifier("LegacyFileIO")
public FileIO provideFileIO() {
return new HadoopFileIO();
}
}

/**
Expand Down
6 changes: 6 additions & 0 deletions infra/recipes/docker-compose/oh-hadoop-spark/cluster.yaml
Expand Up @@ -4,6 +4,12 @@ cluster:
type: "hadoop"
uri: "hdfs://namenode:9000/"
root-path: "/data/openhouse"
storages:
default-type: "hdfs"
types:
hdfs:
rootpath: "/data/openhouse"
endpoint: "hdfs://namenode:9000/"
iceberg:
write:
format:
Expand Down
6 changes: 6 additions & 0 deletions infra/recipes/docker-compose/oh-hadoop/cluster.yaml
Expand Up @@ -4,6 +4,12 @@ cluster:
type: "hadoop"
uri: "hdfs://namenode:9000/"
root-path: "/data/openhouse"
storages:
default-type: "hdfs"
types:
hdfs:
rootpath: "/data/openhouse"
endpoint: "hdfs://namenode:9000/"
iceberg:
write:
format:
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.springdoc.core.customizers.OpenApiCustomiser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -48,6 +49,7 @@ public class MainApplicationConfig extends BaseApplicationConfig {
*
* @return Iceberg's File abstraction {@link FileIO}.
*/
@Qualifier("LegacyFileIO")
@Bean
public FileIO provideIcebergFileIO() {
if ("hadoop".equals(fsStorageProvider.storageType())) {
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Import;
Expand All @@ -56,7 +57,9 @@ public class RepositoryTestWithSettableComponents {

@Autowired Catalog catalog;

@Autowired FileIO fileIO;
@Autowired
@Qualifier("LegacyFileIO")
FileIO fileIO;

@Autowired SnapshotInspector snapshotInspector;

Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.security.test.context.support.WithMockUser;
Expand All @@ -64,7 +65,9 @@ public class SnapshotsControllerTest {

@Autowired FsStorageProvider fsStorageProvider;

@Autowired FileIO fileIo;
@Autowired
@Qualifier("LegacyFileIO")
FileIO fileIo;

/** For now starting with a naive object feeder. */
private static Stream<GetTableResponseBody> responseBodyFeeder() {
Expand Down

0 comments on commit 9339553

Please sign in to comment.