Introduce StorageManager and an Hdfs Implementation for Storage #90

Merged · 2 commits · Apr 29, 2024
com/linkedin/openhouse/cluster/storage/BaseStorage.java (new file)
@@ -0,0 +1,44 @@
package com.linkedin.openhouse.cluster.storage;

import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;

/**
* The BaseStorage class is an abstract class that implements the Storage interface. It provides
* common functionality for all storage implementations.
*/
public abstract class BaseStorage implements Storage {

@Autowired private StorageProperties storageProperties;

/**
* Check if the storage is configured.
*
* <p>The storage is considered configured if its type is defined in the storage properties.
*
* @return true if the storage is configured, false otherwise
*/
@Override
public boolean isConfigured() {
return Optional.ofNullable(storageProperties.getTypes())
.map(types -> types.containsKey(getType().getValue()))
.orElse(false);
}

/**
* Get the properties of the storage.
*
* @return a copy of the storage's properties map
*/
@Override
public Map<String, String> getProperties() {
return Optional.ofNullable(storageProperties.getTypes())
.map(types -> types.get(getType().getValue()))
.map(StorageProperties.StorageTypeProperties::getParameters)
.map(HashMap::new)
.orElseGet(HashMap::new);
}
}
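
For orientation, a minimal sketch (not part of this PR) of what a new backend on top of BaseStorage would look like: only getType() and getClient() need to be supplied, since isConfigured() and getProperties() are inherited. S3Storage, S3StorageClient, and StorageType.S3 are illustrative names assumed for the example.

package com.linkedin.openhouse.cluster.storage.s3; // hypothetical package

import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component
public class S3Storage extends BaseStorage {

  // Lazy, so the client is only instantiated if this backend is actually used.
  @Autowired @Lazy private S3StorageClient s3StorageClient;

  @Override
  public StorageType.Type getType() {
    return StorageType.S3; // assumes a new Type("s3") constant registered in StorageType
  }

  @Override
  public StorageClient<?> getClient() {
    return s3StorageClient;
  }
}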
com/linkedin/openhouse/cluster/storage/StorageManager.java (new file)
@@ -0,0 +1,96 @@
package com.linkedin.openhouse.cluster.storage;

import static com.linkedin.openhouse.cluster.storage.StorageType.LOCAL;

import com.google.common.base.Preconditions;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
* The StorageManager class is responsible for managing the storage types and providing the
* appropriate storage implementation based on the configuration.
*/
@Component
public class StorageManager {

@Autowired StorageProperties storageProperties;

@Autowired StorageType storageType;

@Autowired List<Storage> storages;

/**
* Validate the storage properties.
*
* <p>The following configurations are considered valid (see the illustrative yaml sketch after
* this method):
*
* <p>1. If any storage type is configured, a default type must be set; conversely, if no default
* type is set, the storage types must also be null or empty.
*
* <p>2. If default-type is set, its value must exist in the configured storage types.
*
* <p>All other configurations are invalid.
*/
@PostConstruct
public void validateProperties() {
String clusterYamlError = "Cluster yaml is incorrectly configured: ";
if (StringUtils.hasText(storageProperties.getDefaultType())) {
// default-type is configured, types should contain the default-type
Preconditions.checkArgument(
!CollectionUtils.isEmpty(storageProperties.getTypes())
&& storageProperties.getTypes().containsKey(storageProperties.getDefaultType()),
clusterYamlError
+ "storage types should contain the default-type: "
+ storageProperties.getDefaultType());
} else {
// default-type is not configured, types should be null or empty
Preconditions.checkArgument(
CollectionUtils.isEmpty(storageProperties.getTypes()),
clusterYamlError + "default-type must be set if storage types are configured");
}
try {
Optional.ofNullable(storageProperties.getDefaultType()).ifPresent(storageType::fromString);
Optional.ofNullable(storageProperties.getTypes())
.map(Map::keySet)
.ifPresent(keyset -> keyset.forEach(key -> storageType.fromString(key)));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(clusterYamlError + e.getMessage());
}
}
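
// Illustrative only: a cluster yaml accepted by the validation above might look like
//
//   storages:
//     default-type: "hdfs"
//     types:
//       hdfs:
//         endpoint: "hdfs://localhost:9000"
//         rootpath: "/data/openhouse"
//
// whereas declaring types without default-type, or a default-type missing from
// types, fails the Preconditions checks. The key names are inferred from how
// StorageProperties is used in this PR, not copied from the repo.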

/**
* Get the default storage.
*
* @return the default storage
*/
public Storage getDefaultStorage() {
if (!StringUtils.hasText(storageProperties.getDefaultType())) {
return getStorage(LOCAL);
}
return getStorage(storageType.fromString(storageProperties.getDefaultType()));
}

/**
* Get the storage based on the storage type.
*
* @param storageType the storage type
* @return the storage
*/
public Storage getStorage(StorageType.Type storageType) {
for (Storage storage : storages) {
if (storage.getType().equals(storageType) && storage.isConfigured()) {
return storage;
}
}
throw new IllegalArgumentException(
"No configured storage found for type: " + storageType.getValue());
}
}
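
A short usage sketch (not part of this diff), assuming a Spring-managed caller: getDefaultStorage() falls back to LOCAL when no default-type is configured, and getStorage(...) throws for types that are missing or unconfigured. The class and method names below are made up for illustration.

package com.linkedin.openhouse.cluster.storage.example; // hypothetical package

import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import com.linkedin.openhouse.cluster.storage.StorageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class StorageManagerUsage {

  @Autowired private StorageManager storageManager;

  public String describeDefaultStorage() {
    // Resolves to LOCAL when the cluster yaml sets no default-type.
    Storage storage = storageManager.getDefaultStorage();
    return storage.getType().getValue() + " -> " + storage.getProperties();
  }

  public Storage requireHdfs() {
    // Throws IllegalArgumentException if HDFS is absent from the configured types.
    return storageManager.getStorage(StorageType.HDFS);
  }
}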
com/linkedin/openhouse/cluster/storage/StorageType.java
@@ -3,13 +3,15 @@
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.springframework.stereotype.Component;

/**
* Enum for supported storage types.
*
* <p>New types should be added here as public static final fields, and their corresponding
* implementations should be added to the fromString method.
*/
@Component
public class StorageType {
public static final Type HDFS = new Type("hdfs");
public static final Type LOCAL = new Type("local");
// … (rest of file collapsed)
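
The fromString implementation is collapsed in this view; based on the class javadoc, a plausible shape (an assumption, not the committed code) would be:

  public Type fromString(String type) {
    if (HDFS.getValue().equals(type)) {
      return HDFS;
    } else if (LOCAL.getValue().equals(type)) {
      return LOCAL;
    }
    // StorageManager.validateProperties relies on this throwing for unknown types.
    throw new IllegalArgumentException("Unknown storage type: " + type);
  }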
com/linkedin/openhouse/cluster/storage/hdfs/HdfsStorage.java (new file)
@@ -0,0 +1,39 @@
package com.linkedin.openhouse.cluster.storage.hdfs;

import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
* The HdfsStorage class is an implementation of the Storage interface for HDFS storage. It uses a
* HdfsStorageClient to interact with the HDFS file system. The HdfsStorageClient uses the {@link
* org.apache.hadoop.fs.FileSystem} class to interact with the HDFS file system.
*/
@Component
public class HdfsStorage extends BaseStorage {

@Autowired @Lazy private HdfsStorageClient hdfsStorageClient;

/**
* Get the type of the HDFS storage.
*
* @return the type of the HDFS storage
*/
@Override
public StorageType.Type getType() {
return StorageType.HDFS;
}

/**
* Get the HDFS storage client.
*
* @return the HDFS storage client
*/
@Override
public StorageClient<?> getClient() {
return hdfsStorageClient;
}
}
com/linkedin/openhouse/cluster/storage/hdfs/HdfsStorageClient.java (new file)
@@ -0,0 +1,66 @@
package com.linkedin.openhouse.cluster.storage.hdfs;

import com.google.common.base.Preconditions;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.io.IOException;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

/**
* The HdfsStorageClient class is an implementation of the StorageClient interface for HDFS Storage.
* It uses the {@link FileSystem} class to interact with the HDFS file system.
*/
@Slf4j
@Lazy
@Component
public class HdfsStorageClient implements StorageClient<FileSystem> {

private FileSystem fs;

@Autowired private StorageProperties storageProperties;

private static final StorageType.Type HDFS_TYPE = StorageType.HDFS;

/** Initialize the HdfsStorageClient when the bean is accessed for the first time. */
@PostConstruct
public synchronized void init() throws IOException {
validateProperties();
StorageProperties.StorageTypeProperties hdfsStorageProperties =
storageProperties.getTypes().get(HDFS_TYPE.getValue());
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.set("fs.defaultFS", hdfsStorageProperties.getEndpoint());
fs = FileSystem.get(configuration);
}

/** Validate the storage properties. */
private void validateProperties() {
log.info("Initializing storage client for type: " + HDFS_TYPE);
Preconditions.checkArgument(
!CollectionUtils.isEmpty(storageProperties.getTypes())
&& storageProperties.getTypes().containsKey(HDFS_TYPE.getValue()),
"Storage properties doesn't contain type: " + HDFS_TYPE.getValue());
StorageProperties.StorageTypeProperties hdfsStorageProperties =
storageProperties.getTypes().get(HDFS_TYPE.getValue());
Preconditions.checkArgument(
hdfsStorageProperties != null,
"Storage properties doesn't contain type: " + HDFS_TYPE.getValue());
Preconditions.checkArgument(
hdfsStorageProperties.getEndpoint() != null,
"Storage properties doesn't contain endpoint for: " + HDFS_TYPE.getValue());
Preconditions.checkArgument(
hdfsStorageProperties.getRootPath() != null,
"Storage properties doesn't contain rootpath for: " + HDFS_TYPE.getValue());
}

@Override
public FileSystem getNativeClient() {
return fs;
}
}
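
A consumption sketch (illustrative, not part of this PR): once HDFS is configured with endpoint and rootpath, the native Hadoop FileSystem handle can be used directly. HdfsProbe and the probed path are made up for the example.

package com.linkedin.openhouse.cluster.storage.example; // hypothetical package

import com.linkedin.openhouse.cluster.storage.hdfs.HdfsStorageClient;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component
public class HdfsProbe {

  @Autowired @Lazy private HdfsStorageClient hdfsStorageClient;

  public boolean pathExists(String path) throws IOException {
    // getNativeClient() exposes the FileSystem created in HdfsStorageClient.init().
    FileSystem fs = hdfsStorageClient.getNativeClient();
    return fs.exists(new Path(path));
  }
}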
com/linkedin/openhouse/cluster/storage/local/LocalStorage.java
@@ -1,12 +1,9 @@
package com.linkedin.openhouse.cluster.storage.local;

import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.BaseStorage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@@ -17,7 +14,7 @@
* Hadoop FileSystem to interact with the local file system.
*/
@Component
public class LocalStorage implements Storage {
public class LocalStorage extends BaseStorage {

private static final StorageType.Type LOCAL_TYPE = StorageType.LOCAL;

@@ -45,20 +42,6 @@ public boolean isConfigured() {
}
}

/**
* Get the properties of the local storage.
*
* @return a copy of map of properties of the local storage
*/
@Override
public Map<String, String> getProperties() {
return Optional.ofNullable(storageProperties.getTypes())
.map(types -> types.get(LOCAL_TYPE.getValue()))
.map(StorageProperties.StorageTypeProperties::getParameters)
.map(HashMap::new)
.orElseGet(HashMap::new);
}

@Override
public StorageType.Type getType() {
return LOCAL_TYPE;
// … (rest of file collapsed)
com/linkedin/openhouse/tables/mock/properties/CustomClusterPropertiesTest.java
@@ -1,11 +1,13 @@
package com.linkedin.openhouse.tables.mock.properties;

import com.linkedin.openhouse.cluster.configs.ClusterProperties;
import com.linkedin.openhouse.cluster.storage.StorageManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@@ -14,6 +16,12 @@ public class CustomClusterPropertiesTest {

@Autowired private ClusterProperties clusterProperties;

/**
* StorageManager validates storage properties on startup; 'cluster-test-properties.yaml'
* deliberately contains an invalid storage type called "objectstore" for testing, so the bean is
* mocked out here.
*/
@MockBean private StorageManager storageManager;

@Test
public void testClusterProperties() {
Assertions.assertEquals("TestCluster", clusterProperties.getClusterName());
// … (rest of file collapsed)