Skip to content

Commit

Permalink
fix: make CloudStorageFileSystem#forBucket thread safe (#719)
Browse files Browse the repository at this point in the history
Replace manual attempt at caching via a HashMap with a guava Cache.

Fix #691
Fix #698
  • Loading branch information
BenWhitehead committed Sep 27, 2021
1 parent 4de2c06 commit ac8bfee
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 35 deletions.
6 changes: 6 additions & 0 deletions google-cloud-nio/pom.xml
Expand Up @@ -71,6 +71,12 @@
<artifactId>truth</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Expand Up @@ -18,14 +18,19 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -38,10 +43,9 @@
import java.nio.file.attribute.FileTime;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -66,8 +70,21 @@ public final class CloudStorageFileSystem extends FileSystem {
private final CloudStorageFileSystemProvider provider;
private final String bucket;
private final CloudStorageConfiguration config;
private static final Map<CloudStorageConfiguration, Set<CloudStorageFileSystemProvider>>
CONFIG_TO_PROVIDERS_MAP = new HashMap<>();
private static final LoadingCache<ProviderCacheKey, CloudStorageFileSystemProvider>
PROVIDER_CACHE_BY_CONFIG =
CacheBuilder.newBuilder()
.build(
new CacheLoader<ProviderCacheKey, CloudStorageFileSystemProvider>() {
@Override
public CloudStorageFileSystemProvider load(ProviderCacheKey key) {
CloudStorageConfiguration config = key.cloudStorageConfiguration;
StorageOptions storageOptions = key.storageOptions;
String userProject = config.userProject();
return (storageOptions == null)
? new CloudStorageFileSystemProvider(userProject)
: new CloudStorageFileSystemProvider(userProject, storageOptions);
}
});

// Users can change this: then this affects every filesystem object created
// later, including via SPI. This is meant to be done only once, at the beginning
Expand Down Expand Up @@ -144,32 +161,7 @@ public static CloudStorageFileSystem forBucket(String bucket) {
*/
@CheckReturnValue
public static CloudStorageFileSystem forBucket(String bucket, CloudStorageConfiguration config) {
checkArgument(
!bucket.startsWith(URI_SCHEME + ":"), "Bucket name must not have schema: %s", bucket);
checkNotNull(config);
return new CloudStorageFileSystem(
getCloudStorageFileSystemProvider(config, null), bucket, config);
}

private static CloudStorageFileSystemProvider getCloudStorageFileSystemProvider(
CloudStorageConfiguration config, StorageOptions storageOptions) {
CloudStorageFileSystemProvider newProvider =
(storageOptions == null)
? new CloudStorageFileSystemProvider(config.userProject())
: new CloudStorageFileSystemProvider(config.userProject(), storageOptions);
Set<CloudStorageFileSystemProvider> existingProviders = CONFIG_TO_PROVIDERS_MAP.get(config);
if (existingProviders == null) {
existingProviders = new HashSet<>();
} else {
for (CloudStorageFileSystemProvider existiningProvider : existingProviders) {
if (existiningProvider.equals(newProvider)) {
return existiningProvider;
}
}
}
existingProviders.add(newProvider);
CONFIG_TO_PROVIDERS_MAP.put(config, existingProviders);
return newProvider;
return forBucket(bucket, config, null);
}

/**
Expand All @@ -192,8 +184,16 @@ public static CloudStorageFileSystem forBucket(
String bucket, CloudStorageConfiguration config, @Nullable StorageOptions storageOptions) {
checkArgument(
!bucket.startsWith(URI_SCHEME + ":"), "Bucket name must not have schema: %s", bucket);
return new CloudStorageFileSystem(
getCloudStorageFileSystemProvider(config, storageOptions), bucket, checkNotNull(config));
checkNotNull(config);
CloudStorageFileSystemProvider result;
ProviderCacheKey providerCacheKey = new ProviderCacheKey(config, storageOptions);
try {
result = PROVIDER_CACHE_BY_CONFIG.get(providerCacheKey);
} catch (ExecutionException | UncheckedExecutionException e) {
throw new IllegalStateException(
"Unable to resolve CloudStorageFileSystemProvider for the provided configuration", e);
}
return new CloudStorageFileSystem(result, bucket, config);
}

CloudStorageFileSystem(
Expand Down Expand Up @@ -335,4 +335,50 @@ public String toString() {
throw new AssertionError(e);
}
}

/**
* In order to cache a {@link CloudStorageFileSystemProvider} we track the config used to
* instantiate that provider. This class creates an immutable key encapsulating the config to
* allow reliable resolution from the cache.
*/
private static final class ProviderCacheKey {
private final CloudStorageConfiguration cloudStorageConfiguration;
@Nullable private final StorageOptions storageOptions;

public ProviderCacheKey(
CloudStorageConfiguration cloudStorageConfiguration,
@Nullable StorageOptions storageOptions) {
this.cloudStorageConfiguration =
requireNonNull(cloudStorageConfiguration, "cloudStorageConfiguration must be non null");
this.storageOptions = storageOptions;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ProviderCacheKey)) {
return false;
}
ProviderCacheKey that = (ProviderCacheKey) o;
return cloudStorageConfiguration.equals(that.cloudStorageConfiguration)
&& Objects.equals(storageOptions, that.storageOptions);
}

@Override
public int hashCode() {
return Objects.hash(cloudStorageConfiguration, storageOptions);
}

@Override
public String toString() {
return "ConfigTuple{"
+ "cloudStorageConfiguration="
+ cloudStorageConfiguration
+ ", storageOptions="
+ storageOptions
+ '}';
}
}
}
Expand Up @@ -23,12 +23,16 @@
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;

import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials;
import com.google.auth.Credentials;
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import com.google.cloud.testing.junit4.MultipleAttemptsRule;
import com.google.common.testing.EqualsTester;
import com.google.common.testing.NullPointerTester;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileSystem;
Expand Down Expand Up @@ -374,6 +378,46 @@ public void testDifferentProvider() throws IOException {
assertEquals("new-bucket", destFileSystem.bucket());
}
}

// port of test from
// https://github.com/broadinstitute/cromwell/pull/6491/files#diff-758dbbe823e71cc26fee7bc89cd5c434dfb76e604d51005b8327db59aab96068R300-R336
@Test
public void ensureMultipleInstancesDoNotCorruptCredentials() throws Exception {

CloudStorageConfiguration config =
CloudStorageConfiguration.builder()
.permitEmptyPathComponents(true)
.stripPrefixSlash(true)
.usePseudoDirectories(true)
.build();

Credentials noCredentials = NoCredentials.getInstance();
Credentials saCredentials = new QuotaProjectIdHidingCredentials(noCredentials);

StorageOptions noOptions =
StorageOptions.newBuilder()
.setProjectId("public-project")
.setCredentials(noCredentials)
.build();

StorageOptions saOptions =
StorageOptions.newBuilder()
.setProjectId("private-project")
.setCredentials(saCredentials)
.build();

CloudStorageFileSystem noFs =
CloudStorageFileSystem.forBucket("public-bucket", config, noOptions);
CloudStorageFileSystem saFs =
CloudStorageFileSystem.forBucket("private-bucket", config, saOptions);

CloudStoragePath noPath = noFs.getPath("public-file");
CloudStoragePath saPath = saFs.getPath("private-file");

assertThat(credentialsForPath(noPath)).isEqualTo(noCredentials);
assertThat(credentialsForPath(saPath)).isEqualTo(saCredentials);
}

/**
* Delete the given directory and all of its contents if non-empty.
*
Expand Down Expand Up @@ -402,4 +446,19 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx
private void assertMatches(FileSystem fs, PathMatcher matcher, String toMatch, boolean expected) {
assertThat(matcher.matches(fs.getPath(toMatch).getFileName())).isEqualTo(expected);
}

private static Credentials credentialsForPath(Path p)
throws NoSuchFieldException, IllegalAccessException {
CloudStorageFileSystemProvider cloudFilesystemProvider =
(CloudStorageFileSystemProvider) p.getFileSystem().provider();
Field storageOptionsField =
cloudFilesystemProvider.getClass().getDeclaredField("storageOptions");
storageOptionsField.setAccessible(true);
StorageOptions storageOptions =
(StorageOptions) storageOptionsField.get(cloudFilesystemProvider);
Field credentialsField =
storageOptions.getClass().getSuperclass().getDeclaredField("credentials");
credentialsField.setAccessible(true);
return (Credentials) credentialsField.get(storageOptions);
}
}
Expand Up @@ -32,6 +32,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand All @@ -40,12 +41,17 @@
public class CloudStorageIsDirectoryTest {
@Rule public final MultipleAttemptsRule multipleAttemptsRule = new MultipleAttemptsRule(3);

@Rule public final TestName testName = new TestName();

private StorageOptions mockOptions;
private Storage mockStorage;

@Before
public void before() {
mockOptions = mock(StorageOptions.class);
mockOptions =
mock(
StorageOptions.class,
String.format("storage-options-mock_%s", testName.getMethodName()));
mockStorage = mock(Storage.class);
when(mockOptions.getService()).thenReturn(mockStorage);
CloudStorageFileSystemProvider.setStorageOptions(mockOptions);
Expand All @@ -54,7 +60,7 @@ public void before() {
@Test
public void testIsDirectoryNoUserProject() {
CloudStorageFileSystem fs =
CloudStorageFileSystem.forBucket("bucket", CloudStorageConfiguration.DEFAULT);
CloudStorageFileSystem.forBucket("bucket", CloudStorageConfiguration.DEFAULT, mockOptions);
when(mockStorage.get(BlobId.of("bucket", "test", null)))
.thenThrow(new IllegalArgumentException());
Page<Blob> pages = mock(Page.class);
Expand All @@ -74,7 +80,9 @@ public void testIsDirectoryNoUserProject() {
public void testIsDirectoryWithUserProject() {
CloudStorageFileSystem fs =
CloudStorageFileSystem.forBucket(
"bucket", CloudStorageConfiguration.builder().userProject("project-id").build());
"bucket",
CloudStorageConfiguration.builder().userProject("project-id").build(),
mockOptions);
when(mockStorage.get(BlobId.of("bucket", "test", null)))
.thenThrow(new IllegalArgumentException());
Page<Blob> pages = mock(Page.class);
Expand Down

0 comments on commit ac8bfee

Please sign in to comment.