Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add storage.upload(path) #269

Merged
merged 14 commits into from Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 5 additions & 10 deletions google-cloud-storage/clirr-ignored-differences.xml
Expand Up @@ -2,24 +2,19 @@
<!-- see http://mojo.codehaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<className>com/google/cloud/storage/Storage</className>
<method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostFieldsV4, com.google.cloud.storage.PostPolicyV4$PostConditionsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
<differenceType>7012</differenceType>
</difference>
<difference>
<className>com/google/cloud/storage/Storage</className>
<method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostFieldsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
<differenceType>7012</differenceType>
<method>*.Blob createFrom(*.BlobInfo, java.nio.file.Path, *.Storage$BlobWriteOption[])</method>
</difference>
<difference>
<className>com/google/cloud/storage/Storage</className>
<method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostConditionsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/Storage</className>
<method>*.Blob createFrom(*.BlobInfo, java.io.InputStream, *.Storage$BlobWriteOption[])</method>
</difference>
<difference>
<className>com/google/cloud/storage/Storage</className>
<method>com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.Storage$PostPolicyV4Option[])</method>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
<method>*.StorageObject writeWithResponse(*.String, byte[], int, long, int, boolean)</method>
</difference>
<difference>
<className>com/google/cloud/storage/BucketInfo$Builder</className>
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
Expand Down Expand Up @@ -47,6 +48,13 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
super(options, null, uploadId);
}

  // Metadata of the uploaded object as returned by the service. Populated by flushBuffer()
  // when the final chunk is written; stays null while the upload is still in progress.
  private StorageObject storageObject;

  // Returns the service-reported metadata of the uploaded object, or null if the upload has
  // not completed yet.
  StorageObject getStorageObject() {
    return storageObject;
  }

@Override
protected void flushBuffer(final int length, final boolean last) {
try {
Expand All @@ -55,9 +63,11 @@ protected void flushBuffer(final int length, final boolean last) {
new Runnable() {
@Override
public void run() {
getOptions()
.getStorageRpcV1()
.write(getUploadId(), getBuffer(), 0, getPosition(), length, last);
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
}
}),
getOptions().getRetrySettings(),
Expand Down
Expand Up @@ -39,9 +39,11 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Path;
import java.security.Key;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1821,7 +1823,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo);
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
*/
Blob create(BlobInfo blobInfo, BlobTargetOption... options);
Expand All @@ -1842,7 +1844,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8));
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
* @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
*/
Expand All @@ -1865,7 +1867,7 @@ public static Builder newBuilder() {
* Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8), 7, 5);
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
* @see <a href="https://cloud.google.com/storage/docs/hashes-etags">Hashes and ETags</a>
*/
Expand Down Expand Up @@ -1908,12 +1910,124 @@ Blob create(
* Blob blob = storage.create(blobInfo, content, BlobWriteOption.encryptionKey(encryptionKey));
* }</pre>
*
* @return a [@code Blob} with complete information
* @return a {@code Blob} with complete information
* @throws StorageException upon failure
*/
@Deprecated
Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options);

  /**
   * Uploads {@code path} to the blob using {@link #writer}. By default any MD5 and CRC32C values in
   * the given {@code blobInfo} are ignored unless requested via the {@link
   * BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is
   * not supported.
   *
   * <p>The upload is buffered with a default buffer size of 15 MiB; use {@link
   * #createFrom(BlobInfo, Path, int, BlobWriteOption...)} to choose a different size.
   *
   * <p>Example of uploading a file:
   *
   * <pre>{@code
   * String bucketName = "my-unique-bucket";
   * String fileName = "readme.txt";
   * BlobId blobId = BlobId.of(bucketName, fileName);
   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
   * storage.createFrom(blobInfo, Paths.get(fileName));
   * }</pre>
   *
   * @param blobInfo blob to create
   * @param path file to upload
   * @param options blob write options
   * @return a {@code Blob} with complete information
   * @throws IOException on I/O error
   * @throws StorageException on server side error
   * @see #createFrom(BlobInfo, Path, int, BlobWriteOption...)
   */
  Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) throws IOException;

  /**
   * Uploads {@code path} to the blob using {@link #writer} and {@code bufferSize}. By default any
   * MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested via the {@link
   * BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is
   * not supported.
   *
   * <p>{@link #createFrom(BlobInfo, Path, BlobWriteOption...)} invokes this method with a buffer
   * size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the upload
   * performance but require more memory. This can cause an OutOfMemoryError or add significant
   * garbage collection overhead. Smaller buffer sizes reduce memory consumption, that is noticeable
   * when uploading many objects in parallel. Buffer sizes less than 256 KiB are treated as 256 KiB.
   *
   * <p>Example of uploading a humongous file:
   *
   * <pre>{@code
   * BlobId blobId = BlobId.of(bucketName, blobName);
   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
   *
   * int largeBufferSize = 150 * 1024 * 1024;
   * Path file = Paths.get("humongous.file");
   * storage.createFrom(blobInfo, file, largeBufferSize);
   * }</pre>
   *
   * @param blobInfo blob to create
   * @param path file to upload
   * @param bufferSize size of the buffer for I/O operations
   * @param options blob write options
   * @return a {@code Blob} with complete information
   * @throws IOException on I/O error
   * @throws StorageException on server side error
   */
  Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
      throws IOException;

  /**
   * Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer}. By
   * default any MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested
   * via the {@link BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options.
   *
   * <p>The upload is buffered with a default buffer size of 15 MiB; use {@link
   * #createFrom(BlobInfo, InputStream, int, BlobWriteOption...)} to choose a different size.
   *
   * <p>Example of uploading data with CRC32C checksum:
   *
   * <pre>{@code
   * BlobId blobId = BlobId.of(bucketName, blobName);
   * byte[] content = "Hello, world".getBytes(StandardCharsets.UTF_8);
   * Hasher hasher = Hashing.crc32c().newHasher().putBytes(content);
   * String crc32c = BaseEncoding.base64().encode(Ints.toByteArray(hasher.hash().asInt()));
   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(crc32c).build();
   * storage.createFrom(blobInfo, new ByteArrayInputStream(content), Storage.BlobWriteOption.crc32cMatch());
   * }</pre>
   *
   * @param blobInfo blob to create
   * @param content input stream to read from
   * @param options blob write options
   * @return a {@code Blob} with complete information
   * @throws IOException on I/O error
   * @throws StorageException on server side error
   * @see #createFrom(BlobInfo, InputStream, int, BlobWriteOption...)
   */
  Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)
      throws IOException;

  /**
   * Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer} and
   * {@code bufferSize}. By default any MD5 and CRC32C values in the given {@code blobInfo} are
   * ignored unless requested via the {@link BlobWriteOption#md5Match()} and {@link
   * BlobWriteOption#crc32cMatch()} options.
   *
   * <p>{@link #createFrom(BlobInfo, InputStream, BlobWriteOption...)} invokes this method with a
   * buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the
   * upload performance but require more memory. This can cause an OutOfMemoryError or add
   * significant garbage collection overhead. Smaller buffer sizes reduce memory consumption, that
   * is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are
   * treated as 256 KiB.
   *
   * @param blobInfo blob to create
   * @param content input stream to read from
   * @param bufferSize size of the buffer for I/O operations
   * @param options blob write options
   * @return a {@code Blob} with complete information
   * @throws IOException on I/O error
   * @throws StorageException on server side error
   */
  Blob createFrom(
      BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)
      throws IOException;

/**
* Returns the requested bucket or {@code null} if not found.
*
Expand Down
Expand Up @@ -48,6 +48,7 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
Expand All @@ -70,12 +71,18 @@
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -99,6 +106,9 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage {

private static final String STORAGE_XML_URI_HOST_NAME = "storage.googleapis.com";

  // Buffer size (15 MiB) used by the createFrom overloads that do not take an explicit size.
  private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024;
  // Smallest buffer (256 KiB) the upload path will use; smaller requests are rounded up to this
  // value by uploadHelper.
  private static final int MIN_BUFFER_SIZE = 256 * 1024;

private static final Function<Tuple<Storage, Boolean>, Boolean> DELETE_FUNCTION =
new Function<Tuple<Storage, Boolean>, Boolean>() {
@Override
Expand Down Expand Up @@ -211,6 +221,60 @@ public StorageObject call() {
}
}

  // Uploads a file to the blob; delegates to the buffered variant with the default 15 MiB buffer.
  @Override
  public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options)
      throws IOException {
    return createFrom(blobInfo, path, DEFAULT_BUFFER_SIZE, options);
  }

  // Uploads a file to the blob using the given buffer size. Folder upload is not supported.
  @Override
  public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)
      throws IOException {
    // Fail fast on directories before opening a stream.
    // NOTE(review): status code 0 presumably marks a client-side error with no associated HTTP
    // status — confirm against StorageException conventions.
    if (Files.isDirectory(path)) {
      throw new StorageException(0, path + " is a directory");
    }
    // try-with-resources guarantees the file stream is closed even if the upload fails.
    try (InputStream input = Files.newInputStream(path)) {
      return createFrom(blobInfo, input, bufferSize, options);
    }
  }

  // Uploads an input stream to the blob; delegates to the buffered variant with the default
  // 15 MiB buffer.
  @Override
  public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)
      throws IOException {
    return createFrom(blobInfo, content, DEFAULT_BUFFER_SIZE, options);
  }

@Override
public Blob createFrom(
BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)
throws IOException {

BlobWriteChannel blobWriteChannel;
try (WriteChannel writer = writer(blobInfo, options)) {
blobWriteChannel = (BlobWriteChannel) writer;
uploadHelper(Channels.newChannel(content), writer, bufferSize);
}
StorageObject objectProto = blobWriteChannel.getStorageObject();
return Blob.fromPb(this, objectProto);
}

/*
* Uploads the given content to the storage using specified write channel and the given buffer
* size. This method does not close any channels.
*/
private static void uploadHelper(ReadableByteChannel reader, WriteChannel writer, int bufferSize)
throws IOException {
bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
writer.setChunkSize(bufferSize);

while (reader.read(buffer) >= 0) {
buffer.flip();
writer.write(buffer);
buffer.clear();
}
}

@Override
public Bucket get(String bucket, BucketGetOption... options) {
final com.google.api.services.storage.model.Bucket bucketPb = BucketInfo.of(bucket).toPb();
Expand Down
Expand Up @@ -725,11 +725,23 @@ public void write(
long destOffset,
int length,
boolean last) {
writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last);
}

@Override
public StorageObject writeWithResponse(
String uploadId,
byte[] toWrite,
int toWriteOffset,
long destOffset,
int length,
boolean last) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_WRITE);
Scope scope = tracer.withSpan(span);
StorageObject updatedBlob = null;
try {
if (length == 0 && !last) {
return;
return updatedBlob;
}
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest =
Expand All @@ -750,6 +762,9 @@ public void write(
range.append('*');
}
httpRequest.getHeaders().setContentRange(range.toString());
if (last) {
httpRequest.setParser(storage.getObjectParser());
}
int code;
String message;
IOException exception = null;
Expand All @@ -758,6 +773,13 @@ public void write(
response = httpRequest.execute();
code = response.getStatusCode();
message = response.getStatusMessage();
String contentType = response.getContentType();
if (last
&& (code == 200 || code == 201)
&& contentType != null
&& contentType.startsWith("application/json")) {
updatedBlob = response.parseAs(StorageObject.class);
}
} catch (HttpResponseException ex) {
exception = ex;
code = ex.getStatusCode();
Expand All @@ -783,6 +805,7 @@ public void write(
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
return updatedBlob;
}

@Override
Expand Down