feat: add storage.upload(path) (#269)
* feat: add storage.upload(path)

* feat: use mockito framework

* feat: update upload to return blob

* feat: do not parse response from writers for signed urls

* fix reviewer's comments

* Update Storage.upload() functionality after merge

* feat: deprecate StorageRpc.write(), introduce StorageRpc.upload()

* feat: deprecate StorageRpc.write(), introduce StorageRpc.upload()

* rename upload to createFrom
dmitry-fa committed Jun 25, 2020
1 parent 3e02b9c commit 9457f3a
Showing 11 changed files with 589 additions and 52 deletions.
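
At a glance, the commit adds Storage.createFrom overloads that upload a file or an InputStream through the resumable write channel and return the finished Blob. A minimal usage sketch based on the javadoc in this diff (bucket, object, and file names are only examples):

Storage storage = StorageOptions.getDefaultInstance().getService();
BlobId blobId = BlobId.of("my-unique-bucket", "readme.txt");
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();

// Streams the file through a resumable upload and returns the created object's metadata.
Blob blob = storage.createFrom(blobInfo, Paths.get("readme.txt"));
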
15 changes: 5 additions & 10 deletions google-cloud-storage/clirr-ignored-differences.xml
@@ -2,24 +2,19 @@
 <!-- see http://mojo.codehaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
 <differences>
   <difference>
     <className>com/google/cloud/storage/Storage</className>
-    <method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostFieldsV4, com.google.cloud.storage.PostPolicyV4$PostConditionsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
-    <differenceType>7012</differenceType>
-  </difference>
-  <difference>
-    <className>com/google/cloud/storage/Storage</className>
-    <method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostFieldsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
     <differenceType>7012</differenceType>
+    <method>*.Blob createFrom(*.BlobInfo, java.nio.file.Path, *.Storage$BlobWriteOption[])</method>
   </difference>
   <difference>
-    <className>com/google/cloud/storage/Storage</className>
-    <method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostConditionsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
     <differenceType>7012</differenceType>
+    <className>com/google/cloud/storage/Storage</className>
+    <method>*.Blob createFrom(*.BlobInfo, java.io.InputStream, *.Storage$BlobWriteOption[])</method>
   </difference>
   <difference>
-    <className>com/google/cloud/storage/Storage</className>
-    <method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
     <differenceType>7012</differenceType>
+    <className>com/google/cloud/storage/spi/v1/StorageRpc</className>
+    <method>*.StorageObject writeWithResponse(*.String, byte[], int, long, int, boolean)</method>
   </difference>
   <difference>
     <className>com/google/cloud/storage/BucketInfo$Builder</className>
google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java
@@ -19,6 +19,7 @@
import static com.google.cloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
@@ -47,6 +48,13 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
super(options, null, uploadId);
}

// Contains metadata of the updated object or null if upload is not completed.
private StorageObject storageObject;

StorageObject getStorageObject() {
return storageObject;
}

@Override
protected void flushBuffer(final int length, final boolean last) {
try {
@@ -55,9 +63,11 @@ protected void flushBuffer(final int length, final boolean last) {
new Runnable() {
@Override
public void run() {
-  getOptions()
-      .getStorageRpcV1()
-      .write(getUploadId(), getBuffer(), 0, getPosition(), length, last);
+  storageObject =
+      getOptions()
+          .getStorageRpcV1()
+          .writeWithResponse(
+              getUploadId(), getBuffer(), 0, getPosition(), length, last);
}
}),
getOptions().getRetrySettings(),
google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java
@@ -39,9 +39,11 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Path;
import java.security.Key;
import java.util.Arrays;
import java.util.Collections;
@@ -1821,7 +1823,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo);
* }</pre>
*
- * @return a [@code Blob} with complete information
+ * @return a {@code Blob} with complete information
* @throws StorageException upon failure
*/
Blob create(BlobInfo blobInfo, BlobTargetOption... options);
@@ -1842,7 +1844,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8));
* }</pre>
*
- * @return a [@code Blob} with complete information
+ * @return a {@code Blob} with complete information
* @throws StorageException upon failure
* @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
*/
@@ -1865,7 +1867,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8), 7, 5);
* }</pre>
*
- * @return a [@code Blob} with complete information
+ * @return a {@code Blob} with complete information
* @throws StorageException upon failure
* @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
*/
@@ -1908,12 +1910,124 @@ Blob create(
* Blob blob = storage.create(blobInfo, content, BlobWriteOption.encryptionKey(encryptionKey));
* }</pre>
*
- * @return a [@code Blob} with complete information
+ * @return a {@code Blob} with complete information
* @throws StorageException upon failure
*/
@Deprecated
Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options);
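
This commit deprecates the stream-based create above in favor of the createFrom overloads added below, which upload through the retrying, resumable write channel and likewise return the finished Blob.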

/**
* Uploads {@code path} to the blob using {@link #writer}. By default any MD5 and CRC32C values in
* the given {@code blobInfo} are ignored unless requested via the {@link
* BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is
* not supported.
*
* <p>Example of uploading a file:
*
* <pre>{@code
* String bucketName = "my-unique-bucket";
* String fileName = "readme.txt";
* BlobId blobId = BlobId.of(bucketName, fileName);
* BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
* storage.createFrom(blobInfo, Paths.get(fileName));
* }</pre>
*
* @param blobInfo blob to create
* @param path file to upload
* @param options blob write options
* @return a {@code Blob} with complete information
* @throws IOException on I/O error
* @throws StorageException on server side error
* @see #createFrom(BlobInfo, Path, int, BlobWriteOption...)
*/
Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) throws IOException;

/**
* Uploads {@code path} to the blob using {@link #writer} and {@code bufferSize}. By default any
* MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested via the {@link
* BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is
* not supported.
*
* <p>{@link #createFrom(BlobInfo, Path, BlobWriteOption...)} invokes this method with a buffer
* size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the upload
* performance but require more memory. This can cause an OutOfMemoryError or add significant
* garbage collection overhead. Smaller buffer sizes reduce memory consumption, that is noticeable
* when uploading many objects in parallel. Buffer sizes less than 256 KiB are treated as 256 KiB.
*
* <p>Example of uploading a humongous file:
*
* <pre>{@code
* BlobId blobId = BlobId.of(bucketName, blobName);
* BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
*
* int largeBufferSize = 150 * 1024 * 1024;
* Path file = Paths.get("humongous.file");
* storage.createFrom(blobInfo, file, largeBufferSize);
* }</pre>
*
* @param blobInfo blob to create
* @param path file to upload
* @param bufferSize size of the buffer I/O operations
* @param options blob write options
* @return a {@code Blob} with complete information
* @throws IOException on I/O error
* @throws StorageException on server side error
*/
Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
throws IOException;

/**
* Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer}. By
* default any MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested
* via the {@link BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options.
*
* <p>Example of uploading data with CRC32C checksum:
*
* <pre>{@code
* BlobId blobId = BlobId.of(bucketName, blobName);
* byte[] content = "Hello, world".getBytes(StandardCharsets.UTF_8);
* Hasher hasher = Hashing.crc32c().newHasher().putBytes(content);
* String crc32c = BaseEncoding.base64().encode(Ints.toByteArray(hasher.hash().asInt()));
* BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(crc32c).build();
* storage.createFrom(blobInfo, new ByteArrayInputStream(content), Storage.BlobWriteOption.crc32cMatch());
* }</pre>
*
* @param blobInfo blob to create
* @param content input stream to read from
* @param options blob write options
* @return a {@code Blob} with complete information
* @throws IOException on I/O error
* @throws StorageException on server side error
* @see #createFrom(BlobInfo, InputStream, int, BlobWriteOption...)
*/
Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)
throws IOException;

/**
* Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer} and
* {@code bufferSize}. By default any MD5 and CRC32C values in the given {@code blobInfo} are
* ignored unless requested via the {@link BlobWriteOption#md5Match()} and {@link
* BlobWriteOption#crc32cMatch()} options.
*
* <p>{@link #createFrom(BlobInfo, InputStream, BlobWriteOption...)} invokes this method with a
* buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the
* upload performance but require more memory. This can cause an OutOfMemoryError or add
* significant garbage collection overhead. Smaller buffer sizes reduce memory consumption, that
* is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are
* treated as 256 KiB.
*
* @param blobInfo blob to create
* @param content input stream to read from
* @param bufferSize size of the buffer I/O operations
* @param options blob write options
* @return a {@code Blob} with complete information
* @throws IOException on I/O error
* @throws StorageException on server side error
*/
Blob createFrom(
BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)
throws IOException;
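
Of the four new overloads, this InputStream-plus-bufferSize variant is the only one whose javadoc has no example. A hypothetical call (a Storage instance named storage plus the bucket, object, and file names are assumed) might look like:

BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of("my-unique-bucket", "data.bin")).build();
try (InputStream in = Files.newInputStream(Paths.get("data.bin"))) {
  // Buffer sizes below 256 KiB are treated as 256 KiB.
  storage.createFrom(blobInfo, in, 1024 * 1024);
}
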

/**
* Returns the requested bucket or {@code null} if not found.
*
google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java
@@ -48,6 +48,7 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
@@ -70,12 +71,18 @@
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -99,6 +106,9 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage {

private static final String STORAGE_XML_URI_HOST_NAME = "storage.googleapis.com";

private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024;
private static final int MIN_BUFFER_SIZE = 256 * 1024;

private static final Function<Tuple<Storage, Boolean>, Boolean> DELETE_FUNCTION =
new Function<Tuple<Storage, Boolean>, Boolean>() {
@Override
@@ -211,6 +221,60 @@ public StorageObject call() {
}
}

@Override
public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options)
throws IOException {
return createFrom(blobInfo, path, DEFAULT_BUFFER_SIZE, options);
}

@Override
public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
throws IOException {
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}
try (InputStream input = Files.newInputStream(path)) {
return createFrom(blobInfo, input, bufferSize, options);
}
}
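
Both Path overloads delegate to the InputStream variant: a directory is rejected up front with a StorageException before any request is sent, and a regular file is streamed via Files.newInputStream rather than read fully into memory.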

@Override
public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)
throws IOException {
return createFrom(blobInfo, content, DEFAULT_BUFFER_SIZE, options);
}

@Override
public Blob createFrom(
BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)
throws IOException {

BlobWriteChannel blobWriteChannel;
try (WriteChannel writer = writer(blobInfo, options)) {
blobWriteChannel = (BlobWriteChannel) writer;
uploadHelper(Channels.newChannel(content), writer, bufferSize);
}
StorageObject objectProto = blobWriteChannel.getStorageObject();
return Blob.fromPb(this, objectProto);
}

/*
* Uploads the given content to the storage using specified write channel and the given buffer
* size. This method does not close any channels.
*/
private static void uploadHelper(ReadableByteChannel reader, WriteChannel writer, int bufferSize)
throws IOException {
bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
writer.setChunkSize(bufferSize);

while (reader.read(buffer) >= 0) {
buffer.flip();
writer.write(buffer);
buffer.clear();
}
}
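
All four createFrom overloads funnel into this loop: the write channel's chunk size is aligned with the (clamped) buffer size, and each buffered read is flipped, written, and cleared, with every flush going through BlobWriteChannel.flushBuffer and its retry wrapper around writeWithResponse. Closing the channel in the try-with-resources block above flushes the final chunk with last set to true, after which getStorageObject() holds the metadata parsed from the upload's final response and createFrom converts it to a Blob via Blob.fromPb. From the caller's side the only visible knobs are the buffer size and the returned metadata; a hypothetical call (names and sizes assumed):

Blob blob = storage.createFrom(blobInfo, Paths.get("large.bin"), 32 * 1024 * 1024);
System.out.println(blob.getSize() + " bytes, generation " + blob.getGeneration());
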

@Override
public Bucket get(String bucket, BucketGetOption... options) {
final com.google.api.services.storage.model.Bucket bucketPb = BucketInfo.of(bucket).toPb();
google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java
@@ -725,11 +725,23 @@ public void write(
long destOffset,
int length,
boolean last) {
writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last);
}

@Override
public StorageObject writeWithResponse(
String uploadId,
byte[] toWrite,
int toWriteOffset,
long destOffset,
int length,
boolean last) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_WRITE);
Scope scope = tracer.withSpan(span);
StorageObject updatedBlob = null;
try {
if (length == 0 && !last) {
-   return;
+   return updatedBlob;
}
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest =
@@ -750,6 +762,9 @@ public void write(
range.append('*');
}
httpRequest.getHeaders().setContentRange(range.toString());
if (last) {
httpRequest.setParser(storage.getObjectParser());
}
int code;
String message;
IOException exception = null;
@@ -758,6 +773,13 @@ public void write(
response = httpRequest.execute();
code = response.getStatusCode();
message = response.getStatusMessage();
String contentType = response.getContentType();
if (last
&& (code == 200 || code == 201)
&& contentType != null
&& contentType.startsWith("application/json")) {
updatedBlob = response.parseAs(StorageObject.class);
}
} catch (HttpResponseException ex) {
exception = ex;
code = ex.getStatusCode();
@@ -783,6 +805,7 @@ public void write(
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
return updatedBlob;
}
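
The object parser is attached only when last is true because intermediate chunks of a resumable upload do not carry the object resource; only the request that completes the session returns 200 or 201 with an application/json body, which is parsed into the StorageObject handed back to BlobWriteChannel (and from there to createFrom). For earlier chunks the method returns null, and the deprecated write() above simply discards the return value.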

@Override
