Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add storage.upload(path) #269

Merged
merged 14 commits into from Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 26 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
  <!-- 7012 (method added to an interface): the new upload(...) overloads on Storage.
       Presumably safe because Storage is not intended to be implemented outside this
       library - confirm against the project's compatibility policy. -->
  <difference>
    <differenceType>7012</differenceType>
    <className>com/google/cloud/storage/Storage</className>
    <method>*.Blob upload(*.BlobInfo, java.nio.file.Path, *.Storage$BlobWriteOption[])</method>
  </difference>
  <difference>
    <differenceType>7012</differenceType>
    <className>com/google/cloud/storage/Storage</className>
    <method>*.Blob upload(*.BlobInfo, java.io.InputStream, *.Storage$BlobWriteOption[])</method>
  </difference>
  <!-- 7006 (return type changed): StorageRpc.write and its HTTP implementation now
       return the StorageObject of the completed upload instead of void, so that
       upload() can build a Blob without an extra metadata fetch. -->
  <difference>
    <differenceType>7006</differenceType>
    <className>com/google/cloud/storage/spi/v1/StorageRpc</className>
    <method>void write(*.String, byte[], int, long, int, boolean)</method>
    <to>com.google.api.services.storage.model.StorageObject</to>
  </difference>
  <difference>
    <differenceType>7006</differenceType>
    <className>com/google/cloud/storage/spi/v1/HttpStorageRpc</className>
    <method>void write(*.String, byte[], int, long, int, boolean)</method>
    <to>com.google.api.services.storage.model.StorageObject</to>
  </difference>
</differences>
5 changes: 5 additions & 0 deletions google-cloud-storage/pom.xml
Expand Up @@ -156,6 +156,11 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<!-- Mockito, used by unit tests only (test scope); version is presumably managed
     by the parent POM's dependencyManagement - confirm. -->
<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-core</artifactId>
  <scope>test</scope>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you split out mockito tests into a separate PR because it got a bit confusing in review? Apologies for churn on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Done #284

<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
Expand Down Expand Up @@ -47,6 +48,13 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
super(options, null, uploadId);
}

// Contains metadata of the updated object or null if upload is not completed.
private StorageObject objectProto;
dmitry-fa marked this conversation as resolved.
Show resolved Hide resolved

StorageObject getObjectProto() {
dmitry-fa marked this conversation as resolved.
Show resolved Hide resolved
return objectProto;
}

@Override
protected void flushBuffer(final int length, final boolean last) {
try {
Expand All @@ -55,9 +63,10 @@ protected void flushBuffer(final int length, final boolean last) {
new Runnable() {
@Override
public void run() {
getOptions()
.getStorageRpcV1()
.write(getUploadId(), getBuffer(), 0, getPosition(), length, last);
objectProto =
getOptions()
.getStorageRpcV1()
.write(getUploadId(), getBuffer(), 0, getPosition(), length, last);
}
}),
getOptions().getRetrySettings(),
Expand Down
Expand Up @@ -37,9 +37,11 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Path;
import java.security.Key;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1718,7 +1720,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo);
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
*/
Blob create(BlobInfo blobInfo, BlobTargetOption... options);
Expand All @@ -1739,7 +1741,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8));
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
* @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
*/
Expand All @@ -1762,7 +1764,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8), 7, 5);
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
* @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
*/
Expand Down Expand Up @@ -1805,12 +1807,123 @@ Blob create(
* Blob blob = storage.create(blobInfo, content, BlobWriteOption.encryptionKey(encryptionKey));
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
*/
@Deprecated
Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options);

/**
 * Uploads {@code path} to the blob using {@link #writer}. By default any MD5 and CRC32C values in
 * the given {@code blobInfo} are ignored unless requested via the {@link
 * BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is
 * not supported.
 *
 * <p>This method delegates to {@link #upload(BlobInfo, Path, int, BlobWriteOption...)} with a
 * default buffer size of 15 MiB; use that overload to tune the buffer size.
 *
 * <p>Example of uploading a file:
 *
 * <pre>{@code
 * String bucketName = "my-unique-bucket";
 * String fileName = "readme.txt";
 * BlobId blobId = BlobId.of(bucketName, fileName);
 * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
 * storage.upload(blobInfo, Paths.get(fileName));
 * }</pre>
 *
 * @param blobInfo blob to create
 * @param path file to upload
 * @param options blob write options
 * @return a {@code Blob} with complete information
 * @throws IOException on I/O error
 * @throws StorageException on failure
 * @see #upload(BlobInfo, Path, int, BlobWriteOption...)
 */
Blob upload(BlobInfo blobInfo, Path path, BlobWriteOption... options) throws IOException;

/**
 * Uploads {@code path} to the blob using {@link #writer} and {@code bufferSize}. By default any
 * MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested via the {@link
 * BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is
 * not supported.
 *
 * <p>{@link #upload(BlobInfo, Path, BlobWriteOption...)} invokes this method with a buffer size
 * of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the upload
 * performance but require more memory. This can cause an OutOfMemoryError or add significant
 * garbage collection overhead. Smaller buffer sizes reduce memory consumption, that is noticeable
 * when uploading many objects in parallel. Buffer sizes less than 256 KiB are treated as 256 KiB.
 *
 * <p>Example of uploading a humongous file:
 *
 * <pre>{@code
 * BlobId blobId = BlobId.of(bucketName, blobName);
 * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
 *
 * int largeBufferSize = 150 * 1024 * 1024;
 * Path file = Paths.get("humongous.file");
 * storage.upload(blobInfo, file, largeBufferSize);
 * }</pre>
 *
 * @param blobInfo blob to create
 * @param path file to upload
 * @param bufferSize size of the buffer for I/O operations
 * @param options blob write options
 * @return a {@code Blob} with complete information
 * @throws IOException on I/O error
 * @throws StorageException on failure
 */
Blob upload(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
    throws IOException;

/**
 * Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer}. By
 * default any MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested
 * via the {@link BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options.
 *
 * <p>This method delegates to {@link #upload(BlobInfo, InputStream, int, BlobWriteOption...)}
 * with a default buffer size of 15 MiB; use that overload to tune the buffer size.
 *
 * <p>Example of uploading data with CRC32C checksum:
 *
 * <pre>{@code
 * BlobId blobId = BlobId.of(bucketName, blobName);
 * byte[] content = "Hello, world".getBytes(StandardCharsets.UTF_8);
 * Hasher hasher = Hashing.crc32c().newHasher().putBytes(content);
 * String crc32c = BaseEncoding.base64().encode(Ints.toByteArray(hasher.hash().asInt()));
 * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(crc32c).build();
 * storage.upload(blobInfo, new ByteArrayInputStream(content), Storage.BlobWriteOption.crc32cMatch());
 * }</pre>
 *
 * @param blobInfo blob to create
 * @param content input stream to read from
 * @param options blob write options
 * @return a {@code Blob} with complete information
 * @throws IOException on I/O error
 * @throws StorageException on failure
 * @see #upload(BlobInfo, InputStream, int, BlobWriteOption...)
 */
Blob upload(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)
    throws IOException;

/**
 * Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer} and
 * {@code bufferSize}. By default any MD5 and CRC32C values in the given {@code blobInfo} are
 * ignored unless requested via the {@link BlobWriteOption#md5Match()} and {@link
 * BlobWriteOption#crc32cMatch()} options.
 *
 * <p>{@link #upload(BlobInfo, InputStream, BlobWriteOption...)} invokes this method with a
 * buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the
 * upload performance but require more memory. This can cause an OutOfMemoryError or add
 * significant garbage collection overhead. Smaller buffer sizes reduce memory consumption, that
 * is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are
 * treated as 256 KiB.
 *
 * @param blobInfo blob to create
 * @param content input stream to read from
 * @param bufferSize size of the buffer for I/O operations
 * @param options blob write options
 * @return a {@code Blob} with complete information
 * @throws IOException on I/O error
 * @throws StorageException on failure
 */
Blob upload(BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)
    throws IOException;

/**
* Returns the requested bucket or {@code null} if not found.
*
Expand Down
Expand Up @@ -48,6 +48,7 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.spi.v1.StorageRpc;
Expand All @@ -66,12 +67,18 @@
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
Expand All @@ -92,6 +99,9 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage {

private static final String STORAGE_XML_URI_HOST_NAME = "storage.googleapis.com";

// Default upload buffer/chunk size: 15 MiB, as documented on Storage#upload.
private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024;
// Floor applied by uploadHelper: buffer sizes below 256 KiB are raised to 256 KiB,
// matching the contract documented on Storage#upload.
private static final int MIN_BUFFER_SIZE = 256 * 1024;

private static final Function<Tuple<Storage, Boolean>, Boolean> DELETE_FUNCTION =
new Function<Tuple<Storage, Boolean>, Boolean>() {
@Override
Expand Down Expand Up @@ -204,6 +214,59 @@ public StorageObject call() {
}
}

@Override
public Blob upload(BlobInfo blobInfo, Path path, BlobWriteOption... options) throws IOException {
  // Delegate to the buffer-size overload with the 15 MiB default.
  return upload(blobInfo, path, DEFAULT_BUFFER_SIZE, options);
}

@Override
public Blob upload(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
    throws IOException {
  // Folder upload is not supported (see the Storage interface javadoc); fail fast
  // before opening a stream.
  if (Files.isDirectory(path)) {
    throw new StorageException(0, path + " is a directory");
  }
  // try-with-resources guarantees the file stream is closed even if the upload fails.
  try (InputStream input = Files.newInputStream(path)) {
    return upload(blobInfo, input, bufferSize, options);
  }
}

@Override
public Blob upload(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)
    throws IOException {
  // Delegate to the buffer-size overload with the 15 MiB default.
  return upload(blobInfo, content, DEFAULT_BUFFER_SIZE, options);
}

@Override
public Blob upload(
    BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)
    throws IOException {
  // The channel returned by writer() is cast to BlobWriteChannel so that the proto
  // of the completed object can be read back after the channel is closed (closing
  // performs the final flush). Neither the content stream nor its channel wrapper
  // is closed here; the caller owns the input stream.
  BlobWriteChannel blobChannel;
  try (WriteChannel channel = writer(blobInfo, options)) {
    blobChannel = (BlobWriteChannel) channel;
    uploadHelper(Channels.newChannel(content), channel, bufferSize);
  }
  return Blob.fromPb(this, blobChannel.getObjectProto());
}

/*
 * Copies everything from the given reader into the given write channel using a
 * fixed-size buffer. Neither channel is closed here; callers own both.
 */
private static void uploadHelper(ReadableByteChannel reader, WriteChannel writer, int bufferSize)
    throws IOException {
  // Enforce the documented 256 KiB floor on the buffer/chunk size.
  int chunkSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
  writer.setChunkSize(chunkSize);
  ByteBuffer chunk = ByteBuffer.allocate(chunkSize);

  // read() returns -1 at end-of-stream; zero-byte reads simply iterate again.
  for (int bytesRead = reader.read(chunk); bytesRead >= 0; bytesRead = reader.read(chunk)) {
    chunk.flip();
    writer.write(chunk);
    chunk.clear();
  }
}

@Override
public Bucket get(String bucket, BucketGetOption... options) {
final com.google.api.services.storage.model.Bucket bucketPb = BucketInfo.of(bucket).toPb();
Expand Down
Expand Up @@ -713,7 +713,7 @@ public Tuple<String, byte[]> read(
}

@Override
public void write(
public StorageObject write(
String uploadId,
byte[] toWrite,
int toWriteOffset,
Expand All @@ -722,9 +722,10 @@ public void write(
boolean last) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_WRITE);
Scope scope = tracer.withSpan(span);
StorageObject updatedBlob = null;
try {
if (length == 0 && !last) {
return;
return updatedBlob;
}
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest =
Expand All @@ -745,6 +746,9 @@ public void write(
range.append('*');
}
httpRequest.getHeaders().setContentRange(range.toString());
if (last) {
httpRequest.setParser(storage.getObjectParser());
}
int code;
String message;
IOException exception = null;
Expand All @@ -753,6 +757,13 @@ public void write(
response = httpRequest.execute();
code = response.getStatusCode();
message = response.getStatusMessage();
String contentType = response.getContentType();
if (last
&& (code == 200 || code == 201)
&& contentType != null
&& contentType.startsWith("application/json")) {
updatedBlob = response.parseAs(StorageObject.class);
}
} catch (HttpResponseException ex) {
exception = ex;
code = ex.getStatusCode();
Expand All @@ -778,6 +789,7 @@ public void write(
scope.close();
span.end();
}
return updatedBlob;
}

@Override
Expand Down