Skip to content

Commit

Permalink
Add interfaces for StorageClient, Storage and its Implementation for …
Browse files Browse the repository at this point in the history
…Local
  • Loading branch information
HotSushi authored and ctrezzo committed Apr 25, 2024
1 parent eab621e commit bc4a84c
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.openhouse.cluster.storage;

import java.util.Map;

/**
 * The Storage interface represents a storage system in OpenHouse. It provides methods to check if
 * the storage is configured, retrieve properties of the storage, get the type of the storage, and
 * get a client to interact with the storage.
 *
 * <p>Implementations of this interface should provide the specific logic for each type of storage.
 * For example, the {@link com.linkedin.openhouse.cluster.storage.local.LocalStorage} class is an
 * implementation of this interface for local storage, and it uses a {@link
 * com.linkedin.openhouse.cluster.storage.local.LocalStorageClient} to interact with the local file
 * system.
 */
public interface Storage {

/**
 * Check if the storage is configured.
 *
 * <p>As a general rule the storage is considered configured if {@link
 * com.linkedin.openhouse.cluster.storage.configs.StorageProperties} has a type defined for it.
 * Implementations may relax this rule: for example, {@link
 * com.linkedin.openhouse.cluster.storage.local.LocalStorage} also reports itself as configured
 * when no default type is set or when no types are provided at all.
 *
 * @return true if the storage is configured, false otherwise
 */
boolean isConfigured();

/**
 * Get the properties of the storage.
 *
 * <p>Whether the returned map is a copy or a live view is up to the implementation; {@link
 * com.linkedin.openhouse.cluster.storage.local.LocalStorage} returns a fresh copy.
 *
 * @return a map of properties of the storage
 */
Map<String, String> getProperties();

/**
 * Get the type of the storage.
 *
 * <p>Please refer to {@link StorageType} for the list of supported storage types. For example,
 * the local storage implementation returns {@link StorageType#LOCAL}.
 *
 * @return the type of the storage
 */
StorageType.Type getType();

/**
 * Get a client to interact with the storage.
 *
 * <p>The client is returned with a wildcard type parameter, so callers that need the native
 * client must know (or check) the concrete type exposed by the implementation.
 *
 * @return a client to interact with the storage
 */
StorageClient<?> getClient();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.linkedin.openhouse.cluster.storage;

/**
 * The StorageClient interface represents a client to interact with a storage system in OpenHouse.
 * It provides a method to get the native client of the storage system.
 *
 * <p>Implementations of this interface should provide the specific logic for each type of storage
 * client. For example, the {@link com.linkedin.openhouse.cluster.storage.local.LocalStorageClient}
 * class is an implementation of this interface for local storage; it binds {@code T} to an Apache
 * Hadoop {@link org.apache.hadoop.fs.FileSystem}, which it uses to interact with the local file
 * system.
 *
 * @param <T> the type of the native client of the storage system
 */
public interface StorageClient<T> {

/**
 * Get the native client of the storage system.
 *
 * <p>The returned object is the underlying, storage-specific handle (for local storage, a Hadoop
 * {@code FileSystem}) that callers use to perform the actual storage operations.
 *
 * @return the native client of the storage system
 */
T getNativeClient();
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.openhouse.cluster.storage;

import lombok.*;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;

/**
* Enum for supported storage types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import com.linkedin.openhouse.cluster.configs.YamlPropertySourceFactory;
import java.util.HashMap;
import java.util.Map;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.linkedin.openhouse.cluster.storage.local;

import com.linkedin.openhouse.cluster.storage.Storage;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
 * {@link Storage} implementation for the local file system.
 *
 * <p>Configuration is read from {@link StorageProperties}; file-system access is delegated to a
 * lazily-injected {@link LocalStorageClient}, which in turn wraps an Apache Hadoop FileSystem.
 */
@Component
public class LocalStorage implements Storage {

  private static final StorageType.Type LOCAL_TYPE = StorageType.LOCAL;

  @Autowired private StorageProperties storageProperties;

  // The client bean is injected lazily, so it is only initialized when first used.
  @Autowired @Lazy private LocalStorageClient localStorageClient;

  /**
   * Tell whether local storage should be treated as configured.
   *
   * <p>Local storage acts as the fallback: it counts as configured when no default type is set,
   * when no storage types are declared at all, or when a "local" type is explicitly declared.
   *
   * @return true if the local storage is configured, false otherwise
   */
  @Override
  public boolean isConfigured() {
    if (storageProperties.getDefaultType() == null) {
      return true;
    }
    Map<String, ?> declaredTypes = storageProperties.getTypes();
    if (declaredTypes == null || declaredTypes.isEmpty()) {
      return true;
    }
    return declaredTypes.containsKey(LOCAL_TYPE.getValue());
  }

  /**
   * Return the parameters declared for the "local" storage type.
   *
   * @return a new mutable map holding a copy of the local storage parameters; empty when no types,
   *     no "local" entry, or no parameters are declared
   */
  @Override
  public Map<String, String> getProperties() {
    Optional<Map<String, String>> localParameters =
        Optional.ofNullable(storageProperties.getTypes())
            .map(declaredTypes -> declaredTypes.get(LOCAL_TYPE.getValue()))
            .map(localTypeProperties -> localTypeProperties.getParameters());
    if (localParameters.isPresent()) {
      return new HashMap<>(localParameters.get());
    }
    return new HashMap<>();
  }

  /**
   * Return the storage type handled by this implementation.
   *
   * @return {@link StorageType#LOCAL}
   */
  @Override
  public StorageType.Type getType() {
    return LOCAL_TYPE;
  }

  /**
   * Return the client used to talk to the local file system.
   *
   * @return the injected {@link LocalStorageClient}
   */
  @Override
  public StorageClient<?> getClient() {
    return localStorageClient;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.linkedin.openhouse.cluster.storage.local;

import com.google.common.base.Preconditions;
import com.linkedin.openhouse.cluster.storage.StorageClient;
import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

/**
* The LocalStorageClient class is an implementation of the StorageClient interface for local
* storage. It uses an Apache Hadoop FileSystem to interact with the local file system.
*/
@Slf4j
@Lazy
@Component
public class LocalStorageClient implements StorageClient<FileSystem> {

private FileSystem fs;

private static final StorageType.Type LOCAL_TYPE = StorageType.LOCAL;

private static final String DEFAULT_ENDPOINT = "file://";

private static final String DEFAULT_ROOTPATH = "/tmp";

@Autowired private StorageProperties storageProperties;

/** Initialize the LocalStorageClient when the bean is accessed for the first time. */
@PostConstruct
public synchronized void init() throws URISyntaxException, IOException {
log.info("Initializing storage client for type: " + LOCAL_TYPE);

URI uri;
if (storageProperties.getTypes() != null && !storageProperties.getTypes().isEmpty()) {
Preconditions.checkArgument(
storageProperties.getTypes().containsKey(LOCAL_TYPE.getValue()),
"Storage properties doesn't contain type: " + LOCAL_TYPE.getValue());
Preconditions.checkArgument(
storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getEndpoint() != null,
"Storage properties doesn't contain endpoint for: " + LOCAL_TYPE.getValue());
Preconditions.checkArgument(
storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getRootPath() != null,
"Storage properties doesn't contain rootpath for: " + LOCAL_TYPE.getValue());
Preconditions.checkArgument(
storageProperties
.getTypes()
.get(LOCAL_TYPE.getValue())
.getEndpoint()
.startsWith(DEFAULT_ENDPOINT),
"Storage properties endpoint was misconfigured for: " + LOCAL_TYPE.getValue());
try {
uri =
new URI(
storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getEndpoint()
+ storageProperties.getTypes().get(LOCAL_TYPE.getValue()).getRootPath());
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
"Storage properties 'endpoint', 'rootpath' was incorrectly configured for: "
+ LOCAL_TYPE.getValue(),
e);
}
} else {
uri = new URI(DEFAULT_ENDPOINT + DEFAULT_ROOTPATH);
}
this.fs = FileSystem.get(uri, new org.apache.hadoop.conf.Configuration());
Preconditions.checkArgument(
fs instanceof LocalFileSystem,
"Instantiation failed for LocalStorageClient, fileSystem is not a LocalFileSystem");
}

@Override
public FileSystem getNativeClient() {
return fs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.linkedin.openhouse.tables.mock.storage.local;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import com.linkedin.openhouse.cluster.storage.StorageType;
import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
import com.linkedin.openhouse.cluster.storage.local.LocalStorageClient;
import java.util.Collections;
import java.util.HashMap;
import javax.annotation.PostConstruct;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.ApplicationContext;

/**
 * Tests for {@link LocalStorageClient#init()} against different {@link StorageProperties}
 * configurations, using a mocked properties bean.
 */
@SpringBootTest
public class LocalStorageClientTest {

  @MockBean private StorageProperties storageProperties;

  @Autowired private ApplicationContext context;

  private LocalStorageClient localStorageClient;

  /**
   * Stub the properties with no declared types before fetching the client bean, so that fetching
   * the lazily-created bean happens against the default configuration.
   */
  @PostConstruct
  public void setupTest() {
    when(storageProperties.getTypes()).thenReturn(null);
    localStorageClient = context.getBean(LocalStorageClient.class);
  }

  /** A "local" entry without endpoint and rootpath must be rejected. */
  @Test
  public void testLocalStorageClientInvalidPropertiesMissingRootPathAndEndpoint() {
    when(storageProperties.getTypes())
        .thenReturn(
            new HashMap<>(
                Collections.singletonMap(
                    StorageType.LOCAL.getValue(), new StorageProperties.StorageTypeProperties())));
    assertThrows(IllegalArgumentException.class, () -> localStorageClient.init());
  }

  /** A null types map falls back to the default configuration. */
  @Test
  public void testLocalStorageClientNullProperties() {
    when(storageProperties.getTypes()).thenReturn(null);
    assertDoesNotThrow(() -> localStorageClient.init());
  }

  /** An empty types map falls back to the default configuration. */
  @Test
  public void testLocalStorageClientEmptyMap() {
    when(storageProperties.getTypes()).thenReturn(new HashMap<>());
    assertDoesNotThrow(() -> localStorageClient.init());
  }

  /** A "local" entry with a file:// endpoint and a rootpath is accepted. */
  @Test
  public void testLocalStorageClientValidProperties() {
    when(storageProperties.getTypes())
        .thenReturn(
            new HashMap<>(
                Collections.singletonMap(
                    StorageType.LOCAL.getValue(),
                    new StorageProperties.StorageTypeProperties(
                        "/tmp2", "file://", new HashMap<>()))));
    assertDoesNotThrow(() -> localStorageClient.init());
  }

  /** An endpoint that does not start with file:// must be rejected. */
  @Test
  public void testLocalStorageClientInValidEndpoint() {
    when(storageProperties.getTypes())
        .thenReturn(
            new HashMap<>(
                Collections.singletonMap(
                    StorageType.LOCAL.getValue(),
                    new StorageProperties.StorageTypeProperties(
                        "/tmp", "s3://", new HashMap<>()))));
    assertThrows(IllegalArgumentException.class, () -> localStorageClient.init());
  }

  /** After a default initialization the native client is a Hadoop LocalFileSystem. */
  @Test
  public void testLocalStorageClientInitialized() throws Exception {
    when(storageProperties.getTypes()).thenReturn(null);
    localStorageClient.init();
    Object client = localStorageClient.getNativeClient();
    // JUnit assertions are used instead of the `assert` keyword, which is skipped without -ea.
    assertNotNull(client, "native client must be set after init()");
    assertTrue(client instanceof LocalFileSystem, "native client must be a LocalFileSystem");
  }

  /** The native client can create, see and delete a file under the configured root. */
  @Test
  public void testLocalStorageCanCreateFile() throws Exception {
    java.util.Random random = new java.util.Random();
    // nextInt(bound) is always non-negative, unlike Math.abs(nextInt()) for Integer.MIN_VALUE.
    Path tempFile =
        new Path(String.format("/tmp/testFile%s.orc", random.nextInt(Integer.MAX_VALUE)));
    when(storageProperties.getTypes())
        .thenReturn(
            new HashMap<>(
                Collections.singletonMap(
                    StorageType.LOCAL.getValue(),
                    new StorageProperties.StorageTypeProperties(
                        "/tmp", "file://", new HashMap<>()))));
    localStorageClient.init();
    FileSystem nativeClient = localStorageClient.getNativeClient();

    assertTrue(nativeClient.createNewFile(tempFile), "temp file should be created");
    boolean existed;
    boolean deleted;
    try {
      existed = nativeClient.exists(tempFile);
    } finally {
      // Always clean up so a failing check does not leave the temp file behind.
      deleted = nativeClient.delete(tempFile, false);
    }
    assertTrue(existed, "created temp file should exist");
    assertTrue(deleted, "temp file should be deleted");
  }
}

0 comments on commit bc4a84c

Please sign in to comment.