StorageManager & HDFS Impl
HotSushi committed Apr 29, 2024
1 parent 879c429 commit 20af749
Showing 11 changed files with 486 additions and 19 deletions.
BaseStorage.java
@@ -0,0 +1,44 @@
package com.linkedin.openhouse.cluster.storage;

import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;

/**
* The BaseStorage class is an abstract class that implements the Storage interface. It provides
* common functionality for all storage implementations.
*/
public abstract class BaseStorage implements Storage {

@Autowired private StorageProperties storageProperties;

/**
* Check if the storage is configured.
*
* <p>The storage is considered configured if its type is defined in the storage properties.
*
* @return true if the storage is configured, false otherwise
*/
@Override
public boolean isConfigured() {
return Optional.ofNullable(storageProperties.getTypes())
.map(types -> types.containsKey(getType().getValue()))
.orElse(false);
}

/**
* Get the properties of the storage.
*
* @return a copy of the storage's properties map
*/
@Override
public Map<String, String> getProperties() {
return Optional.ofNullable(storageProperties.getTypes())
.map(types -> types.get(getType().getValue()))
.map(StorageProperties.StorageTypeProperties::getParameters)
.map(HashMap::new)
.orElseGet(HashMap::new);
}
}
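For orientation, here is a sketch of a cluster yaml fragment that would make isConfigured() return true for HDFS and give getProperties() something to copy. The key names are assumptions inferred from the StorageProperties accessors this commit uses (getDefaultType, getTypes, getEndpoint, getRootPath, getParameters); the real schema may differ.

```yaml
# Hypothetical fragment -- key names inferred from StorageProperties accessors,
# not confirmed by this commit.
storages:
  default-type: "hdfs"
  types:
    hdfs:
      endpoint: "hdfs://localhost:9000"
      rootpath: "/data/openhouse"
      parameters:
        fs.permissions.umask-mode: "022"
```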
StorageManager.java
@@ -0,0 +1,96 @@
package com.linkedin.openhouse.cluster.storage;

import static com.linkedin.openhouse.cluster.storage.StorageType.LOCAL;

import com.google.common.base.Preconditions;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
* The StorageManager class is responsible for managing the storage types and providing the
* appropriate storage implementation based on the configuration.
*/
@Component
public class StorageManager {

@Autowired StorageProperties storageProperties;

@Autowired StorageType storageType;

@Autowired List<Storage> storages;

/**
* Validate the storage properties.
*
* <p>It validates storage properties as follows:
*
* <p>1. If any storage types are configured, then a default type must be set; equivalently, if no
* default type is set, the storage types must be null or empty. **valid**
*
* <p>2. If default-type is set, its value must exist among the configured storage types. **valid**
*
* <p>All other configurations are **invalid**.
*/
@PostConstruct
public void validateProperties() {
String clusterYamlError = "Cluster yaml is incorrectly configured: ";
if (StringUtils.hasText(storageProperties.getDefaultType())) {
// default-type is configured, types should contain the default-type
Preconditions.checkArgument(
!CollectionUtils.isEmpty(storageProperties.getTypes())
&& storageProperties.getTypes().containsKey(storageProperties.getDefaultType()),
clusterYamlError
+ "storage types should contain the default-type: "
+ storageProperties.getDefaultType());
} else {
// default-type is not configured, types should be null or empty
Preconditions.checkArgument(
CollectionUtils.isEmpty(storageProperties.getTypes()),
clusterYamlError + "default-type must be set if storage types are configured");
}
try {
Optional.ofNullable(storageProperties.getDefaultType()).ifPresent(storageType::fromString);
Optional.ofNullable(storageProperties.getTypes())
.map(Map::keySet)
.ifPresent(keyset -> keyset.forEach(key -> storageType.fromString(key)));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(clusterYamlError + e.getMessage());
}
}

/**
* Get the default storage.
*
* @return the default storage
*/
public Storage getDefaultStorage() {
if (!StringUtils.hasText(storageProperties.getDefaultType())) {
return getStorage(LOCAL);
}
return getStorage(storageType.fromString(storageProperties.getDefaultType()));
}

/**
* Get the storage based on the storage type.
*
* @param storageType the storage type
* @return the storage
*/
public Storage getStorage(StorageType.Type storageType) {
for (Storage storage : storages) {
if (storage.getType().equals(storageType) && storage.isConfigured()) {
return storage;
}
}
throw new IllegalArgumentException(
"No configured storage found for type: " + storageType.getValue());
}
}
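As a usage illustration (not part of this commit), a caller could let Spring inject the StorageManager and resolve storages as follows; the wrapping class is hypothetical.

```java
import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.StorageType;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// Hypothetical caller, for illustration only.
@Component
public class StorageManagerUsageExample {

  @Autowired private StorageManager storageManager;

  public void resolveStorages() {
    // Falls back to LOCAL when no default-type is set in the cluster yaml.
    Storage defaultStorage = storageManager.getDefaultStorage();

    // Throws IllegalArgumentException if HDFS is absent from the configured types.
    Storage hdfs = storageManager.getStorage(StorageType.HDFS);

    // BaseStorage returns a defensive copy of the configured parameters.
    Map<String, String> parameters = hdfs.getProperties();
  }
}
```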
StorageType.java
@@ -3,13 +3,15 @@
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import org.springframework.stereotype.Component;

/**
* Enum for supported storage types.
*
* <p>New types should be added here as public static final fields, and their corresponding
* implementations should be added to the fromString method.
*/
+@Component
public class StorageType {
public static final Type HDFS = new Type("hdfs");
public static final Type LOCAL = new Type("local");
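The fromString method itself is collapsed in this diff. Based on the javadoc above, and on the IllegalArgumentException that StorageManager.validateProperties catches, its shape is presumably something like this sketch (an assumption, not the committed code):

```java
// Assumed shape of StorageType.fromString; the committed body is collapsed
// in this diff, so treat this as illustrative only.
public Type fromString(String type) {
  if (HDFS.getValue().equals(type)) {
    return HDFS;
  }
  if (LOCAL.getValue().equals(type)) {
    return LOCAL;
  }
  throw new IllegalArgumentException("Unknown storage type: " + type);
}
```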
HdfsStorage.java
@@ -0,0 +1,39 @@
package com.linkedin.openhouse.cluster.storage.hdfs;

import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
* The HdfsStorage class is an implementation of the Storage interface for HDFS storage. It uses an
* HdfsStorageClient to interact with the HDFS file system; the client, in turn, relies on the
* {@link org.apache.hadoop.fs.FileSystem} class.
*/
@Component
public class HdfsStorage extends BaseStorage {

@Autowired @Lazy private HdfsStorageClient hdfsStorageClient;

/**
* Get the type of the HDFS storage.
*
* @return the type of the HDFS storage
*/
@Override
public StorageType.Type getType() {
return StorageType.HDFS;
}

/**
* Get the HDFS storage client.
*
* @return the HDFS storage client
*/
@Override
public StorageClient<?> getClient() {
return hdfsStorageClient;
}
}
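The same pattern generalizes: a new backend needs only a Type constant, a BaseStorage subclass annotated with @Component (so it lands in StorageManager's injected List<Storage>), and a client. A hypothetical object-store example, where every name below is assumed rather than taken from this commit:

```java
import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

// Hypothetical backend, for illustration only; none of these names exist in this commit.
@Component
public class S3Storage extends BaseStorage {

  @Autowired @Lazy private S3StorageClient s3StorageClient; // hypothetical client

  @Override
  public StorageType.Type getType() {
    // Assumes a new constant StorageType.S3 = new Type("s3") was added, plus a
    // matching branch in fromString, per the StorageType javadoc.
    return StorageType.S3;
  }

  @Override
  public StorageClient<?> getClient() {
    return s3StorageClient;
  }
}
```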
HdfsStorageClient.java
@@ -0,0 +1,66 @@
package com.linkedin.openhouse.cluster.storage.hdfs;

import com.google.common.base.Preconditions;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.io.IOException;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

/**
* The HdfsStorageClient class is an implementation of the StorageClient interface for HDFS Storage.
* It uses the {@link FileSystem} class to interact with the HDFS file system.
*/
@Slf4j
@Lazy
@Component
public class HdfsStorageClient implements StorageClient<FileSystem> {

private FileSystem fs;

@Autowired private StorageProperties storageProperties;

private static final StorageType.Type HDFS_TYPE = StorageType.HDFS;

/** Initialize the HdfsStorageClient when the bean is accessed for the first time. */
@PostConstruct
public synchronized void init() throws IOException {
validateProperties();
StorageProperties.StorageTypeProperties hdfsStorageProperties =
storageProperties.getTypes().get(HDFS_TYPE.getValue());
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.set("fs.defaultFS", hdfsStorageProperties.getEndpoint());
fs = FileSystem.get(configuration);
}

/** Validate the storage properties. */
private void validateProperties() {
log.info("Initializing storage client for type: " + HDFS_TYPE);
Preconditions.checkArgument(
!CollectionUtils.isEmpty(storageProperties.getTypes())
&& storageProperties.getTypes().containsKey(HDFS_TYPE.getValue()),
"Storage properties don't contain type: " + HDFS_TYPE.getValue());
StorageProperties.StorageTypeProperties hdfsStorageProperties =
storageProperties.getTypes().get(HDFS_TYPE.getValue());
Preconditions.checkArgument(
hdfsStorageProperties != null,
"Storage properties don't contain type: " + HDFS_TYPE.getValue());
Preconditions.checkArgument(
hdfsStorageProperties.getEndpoint() != null,
"Storage properties don't contain an endpoint for: " + HDFS_TYPE.getValue());
Preconditions.checkArgument(
hdfsStorageProperties.getRootPath() != null,
"Storage properties don't contain a rootpath for: " + HDFS_TYPE.getValue());
}

@Override
public FileSystem getNativeClient() {
return fs;
}
}
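Once initialized, callers reach the underlying Hadoop handle through getNativeClient(). A minimal sketch, assuming the client has already been constructed by Spring; the helper class is hypothetical:

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, for illustration only.
public final class HdfsPathChecks {

  private HdfsPathChecks() {}

  /** Returns true if the given location exists on the configured HDFS. */
  public static boolean exists(HdfsStorageClient client, String location) throws IOException {
    FileSystem fs = client.getNativeClient();
    // exists() issues an RPC against the NameNode configured via fs.defaultFS.
    return fs.exists(new Path(location));
  }
}
```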
LocalStorage.java
@@ -1,12 +1,9 @@
package com.linkedin.openhouse.cluster.storage.local;

-import com.linkedin.openhouse.cluster.storage.Storage;
+import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@@ -17,7 +14,7 @@
* Hadoop FileSystem to interact with the local file system.
*/
@Component
-public class LocalStorage implements Storage {
+public class LocalStorage extends BaseStorage {

private static final StorageType.Type LOCAL_TYPE = StorageType.LOCAL;

@@ -45,20 +42,6 @@ public boolean isConfigured() {
}
}

-/**
-* Get the properties of the local storage.
-*
-* @return a copy of map of properties of the local storage
-*/
-@Override
-public Map<String, String> getProperties() {
-return Optional.ofNullable(storageProperties.getTypes())
-.map(types -> types.get(LOCAL_TYPE.getValue()))
-.map(StorageProperties.StorageTypeProperties::getParameters)
-.map(HashMap::new)
-.orElseGet(HashMap::new);
-}

@Override
public StorageType.Type getType() {
return LOCAL_TYPE;
CustomClusterPropertiesTest.java
@@ -1,11 +1,13 @@
package com.linkedin.openhouse.tables.mock.properties;

import com.linkedin.openhouse.cluster.configs.ClusterProperties;
+import com.linkedin.openhouse.cluster.storage.StorageManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@@ -14,6 +16,12 @@ public class CustomClusterPropertiesTest {

@Autowired private ClusterProperties clusterProperties;

+/**
+* StorageManager validates storage properties; the 'cluster-test-properties.yaml' contains an
+* invalid storage type called "objectstore" for testing.
+*/
+@MockBean private StorageManager storageManager;

@Test
public void testClusterProperties() {
Assertions.assertEquals("TestCluster", clusterProperties.getClusterName());
