diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index ace7a6ef0..bca2faaff 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -2,24 +2,19 @@ - com/google/cloud/storage/Storage - com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostFieldsV4, com.google.cloud.storage.PostPolicyV4$PostConditionsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[]) 7012 - - com/google/cloud/storage/Storage - com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostFieldsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[]) - 7012 + *.Blob createFrom(*.BlobInfo, java.nio.file.Path, *.Storage$BlobWriteOption[]) - com/google/cloud/storage/Storage - com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.PostPolicyV4$PostConditionsV4, com.google.cloud.storage.Storage$PostPolicyV4Option[]) 7012 + com/google/cloud/storage/Storage + *.Blob createFrom(*.BlobInfo, java.io.InputStream, *.Storage$BlobWriteOption[]) - com/google/cloud/storage/Storage - com.google.cloud.storage.PostPolicyV4 generateSignedPostPolicyV4(com.google.cloud.storage.BlobInfo, long, java.util.concurrent.TimeUnit, com.google.cloud.storage.Storage$PostPolicyV4Option[]) 7012 + com/google/cloud/storage/spi/v1/StorageRpc + *.StorageObject writeWithResponse(*.String, byte[], int, long, int, boolean) com/google/cloud/storage/BucketInfo$Builder diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index 
ec5376697..0c9520849 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -19,6 +19,7 @@ import static com.google.cloud.RetryHelper.runWithRetries; import static java.util.concurrent.Executors.callable; +import com.google.api.services.storage.model.StorageObject; import com.google.cloud.BaseWriteChannel; import com.google.cloud.RestorableState; import com.google.cloud.RetryHelper; @@ -47,6 +48,13 @@ class BlobWriteChannel extends BaseWriteChannel { super(options, null, uploadId); } + // Contains metadata of the updated object or null if upload is not completed. + private StorageObject storageObject; + + StorageObject getStorageObject() { + return storageObject; + } + @Override protected void flushBuffer(final int length, final boolean last) { try { @@ -55,9 +63,11 @@ protected void flushBuffer(final int length, final boolean last) { new Runnable() { @Override public void run() { - getOptions() - .getStorageRpcV1() - .write(getUploadId(), getBuffer(), 0, getPosition(), length, last); + storageObject = + getOptions() + .getStorageRpcV1() + .writeWithResponse( + getUploadId(), getBuffer(), 0, getPosition(), length, last); } }), getOptions().getRetrySettings(), diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java index 988f87d39..eb15dd37a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java @@ -39,9 +39,11 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.io.BaseEncoding; +import java.io.IOException; import java.io.InputStream; import java.io.Serializable; import java.net.URL; +import java.nio.file.Path; import java.security.Key; import java.util.Arrays; import 
java.util.Collections; @@ -1821,7 +1823,7 @@ public static Builder newBuilder() { * Blob blob = storage.create(blobInfo); * } * - * @return a [@code Blob} with complete information + * @return a {@code Blob} with complete information * @throws StorageException upon failure */ Blob create(BlobInfo blobInfo, BlobTargetOption... options); @@ -1842,7 +1844,7 @@ public static Builder newBuilder() { * Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8)); * } * - * @return a [@code Blob} with complete information + * @return a {@code Blob} with complete information * @throws StorageException upon failure * @see Hashes and ETags */ @@ -1865,7 +1867,7 @@ public static Builder newBuilder() { * Blob blob = storage.create(blobInfo, "Hello, World!".getBytes(UTF_8), 7, 5); * } * - * @return a [@code Blob} with complete information + * @return a {@code Blob} with complete information * @throws StorageException upon failure * @see Hashes and ETags */ @@ -1908,12 +1910,124 @@ Blob create( * Blob blob = storage.create(blobInfo, content, BlobWriteOption.encryptionKey(encryptionKey)); * } * - * @return a [@code Blob} with complete information + * @return a {@code Blob} with complete information * @throws StorageException upon failure */ @Deprecated Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options); + /** + * Uploads {@code path} to the blob using {@link #writer}. By default any MD5 and CRC32C values in + * the given {@code blobInfo} are ignored unless requested via the {@link + * BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is + * not supported. + * + *

Example of uploading a file: + * + *

{@code
+   * String bucketName = "my-unique-bucket";
+   * String fileName = "readme.txt";
+   * BlobId blobId = BlobId.of(bucketName, fileName);
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
+   * storage.createFrom(blobInfo, Paths.get(fileName));
+   * }
+ * + * @param blobInfo blob to create + * @param path file to upload + * @param options blob write options + * @return a {@code Blob} with complete information + * @throws IOException on I/O error + * @throws StorageException on server side error + * @see #createFrom(BlobInfo, Path, int, BlobWriteOption...) + */ + Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) throws IOException; + + /** + * Uploads {@code path} to the blob using {@link #writer} and {@code bufferSize}. By default any + * MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested via the {@link + * BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. Folder upload is + * not supported. + * + *

{@link #createFrom(BlobInfo, Path, BlobWriteOption...)} invokes this method with a buffer + * size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the upload + * performance but require more memory. This can cause an OutOfMemoryError or add significant + * garbage collection overhead. Smaller buffer sizes reduce memory consumption, which is noticeable + * when uploading many objects in parallel. Buffer sizes less than 256 KiB are treated as 256 KiB. + * + *

Example of uploading a humongous file: + * + *

{@code
+   * BlobId blobId = BlobId.of(bucketName, blobName);
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
+   *
+   * int largeBufferSize = 150 * 1024 * 1024;
+   * Path file = Paths.get("humongous.file");
+   * storage.createFrom(blobInfo, file, largeBufferSize);
+   * }
+ * + * @param blobInfo blob to create + * @param path file to upload + * @param bufferSize size of the buffer I/O operations + * @param options blob write options + * @return a {@code Blob} with complete information + * @throws IOException on I/O error + * @throws StorageException on server side error + */ + Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) + throws IOException; + + /** + * Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer}. By + * default any MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested + * via the {@link BlobWriteOption#md5Match()} and {@link BlobWriteOption#crc32cMatch()} options. + * + *

Example of uploading data with CRC32C checksum: + * + *

{@code
+   * BlobId blobId = BlobId.of(bucketName, blobName);
+   * byte[] content = "Hello, world".getBytes(StandardCharsets.UTF_8);
+   * Hasher hasher = Hashing.crc32c().newHasher().putBytes(content);
+   * String crc32c = BaseEncoding.base64().encode(Ints.toByteArray(hasher.hash().asInt()));
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(crc32c).build();
+   * storage.createFrom(blobInfo, new ByteArrayInputStream(content), Storage.BlobWriteOption.crc32cMatch());
+   * }
+ * + * @param blobInfo blob to create + * @param content input stream to read from + * @param options blob write options + * @return a {@code Blob} with complete information + * @throws IOException on I/O error + * @throws StorageException on server side error + * @see #createFrom(BlobInfo, InputStream, int, BlobWriteOption...) + */ + Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) + throws IOException; + + /** + * Reads bytes from an input stream and uploads those bytes to the blob using {@link #writer} and + * {@code bufferSize}. By default any MD5 and CRC32C values in the given {@code blobInfo} are + * ignored unless requested via the {@link BlobWriteOption#md5Match()} and {@link + * BlobWriteOption#crc32cMatch()} options. + * + *

{@link #createFrom(BlobInfo, InputStream, BlobWriteOption...)} invokes this method with a + * buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the + * upload performance but require more memory. This can cause an OutOfMemoryError or add + * significant garbage collection overhead. Smaller buffer sizes reduce memory consumption, which + * is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are + * treated as 256 KiB. + * + * @param blobInfo blob to create + * @param content input stream to read from + * @param bufferSize size of the buffer I/O operations + * @param options blob write options + * @return a {@code Blob} with complete information + * @throws IOException on I/O error + * @throws StorageException on server side error + */ + Blob createFrom( + BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options) + throws IOException; + /** + * Returns the requested bucket or {@code null} if not found. 
* diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 0e24521eb..b1f2bfe3c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -48,6 +48,7 @@ import com.google.cloud.ReadChannel; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.Tuple; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Acl.Entity; import com.google.cloud.storage.HmacKey.HmacKeyMetadata; import com.google.cloud.storage.PostPolicyV4.ConditionV4Type; @@ -70,12 +71,18 @@ import com.google.common.io.BaseEncoding; import com.google.common.primitives.Ints; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.net.URLEncoder; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collections; @@ -99,6 +106,9 @@ final class StorageImpl extends BaseService implements Storage { private static final String STORAGE_XML_URI_HOST_NAME = "storage.googleapis.com"; + private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024; + private static final int MIN_BUFFER_SIZE = 256 * 1024; + private static final Function, Boolean> DELETE_FUNCTION = new Function, Boolean>() { @Override @@ -211,6 +221,60 @@ public StorageObject call() { } } + @Override + public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... 
options) + throws IOException { + return createFrom(blobInfo, path, DEFAULT_BUFFER_SIZE, options); + } + + @Override + public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) + throws IOException { + if (Files.isDirectory(path)) { + throw new StorageException(0, path + " is a directory"); + } + try (InputStream input = Files.newInputStream(path)) { + return createFrom(blobInfo, input, bufferSize, options); + } + } + + @Override + public Blob createFrom(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) + throws IOException { + return createFrom(blobInfo, content, DEFAULT_BUFFER_SIZE, options); + } + + @Override + public Blob createFrom( + BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options) + throws IOException { + + BlobWriteChannel blobWriteChannel; + try (WriteChannel writer = writer(blobInfo, options)) { + blobWriteChannel = (BlobWriteChannel) writer; + uploadHelper(Channels.newChannel(content), writer, bufferSize); + } + StorageObject objectProto = blobWriteChannel.getStorageObject(); + return Blob.fromPb(this, objectProto); + } + + /* + * Uploads the given content to the storage using specified write channel and the given buffer + * size. This method does not close any channels. + */ + private static void uploadHelper(ReadableByteChannel reader, WriteChannel writer, int bufferSize) + throws IOException { + bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE); + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + writer.setChunkSize(bufferSize); + + while (reader.read(buffer) >= 0) { + buffer.flip(); + writer.write(buffer); + buffer.clear(); + } + } + @Override public Bucket get(String bucket, BucketGetOption... 
options) { final com.google.api.services.storage.model.Bucket bucketPb = BucketInfo.of(bucket).toPb(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index e211c0277..519a9e9fa 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -725,11 +725,23 @@ public void write( long destOffset, int length, boolean last) { + writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last); + } + + @Override + public StorageObject writeWithResponse( + String uploadId, + byte[] toWrite, + int toWriteOffset, + long destOffset, + int length, + boolean last) { Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_WRITE); Scope scope = tracer.withSpan(span); + StorageObject updatedBlob = null; try { if (length == 0 && !last) { - return; + return updatedBlob; } GenericUrl url = new GenericUrl(uploadId); HttpRequest httpRequest = @@ -750,6 +762,9 @@ public void write( range.append('*'); } httpRequest.getHeaders().setContentRange(range.toString()); + if (last) { + httpRequest.setParser(storage.getObjectParser()); + } int code; String message; IOException exception = null; @@ -758,6 +773,13 @@ public void write( response = httpRequest.execute(); code = response.getStatusCode(); message = response.getStatusMessage(); + String contentType = response.getContentType(); + if (last + && (code == 200 || code == 201) + && contentType != null + && contentType.startsWith("application/json")) { + updatedBlob = response.parseAs(StorageObject.class); + } } catch (HttpResponseException ex) { exception = ex; code = ex.getStatusCode(); @@ -783,6 +805,7 @@ public void write( scope.close(); span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS); } + return updatedBlob; } @Override diff --git 
a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java index 36c7a5ff1..7ae9c8ec1 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java @@ -325,6 +325,27 @@ void write( int length, boolean last); + /** + * Writes the provided bytes to a storage object at the provided location. If {@code last=true} + * returns metadata of the updated object, otherwise returns {@code null}. + * + * @param uploadId resumable upload ID + * @param toWrite a portion of the content + * @param toWriteOffset starting position in the {@code toWrite} array + * @param destOffset starting position in the destination data + * @param length the number of bytes to be uploaded + * @param last true, if {@code toWrite} is the final content portion + * @return the metadata of the updated object if {@code last} is {@code true}, otherwise {@code null} + * @throws StorageException upon failure + */ + StorageObject writeWithResponse( + String uploadId, + byte[] toWrite, + int toWriteOffset, + long destOffset, + int length, + boolean last); + /** + * Sends a rewrite request to open a rewrite channel. 
* diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java index 6bd2d487e..7733f13eb 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java @@ -139,6 +139,17 @@ public void write( throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public StorageObject writeWithResponse( + String uploadId, + byte[] toWrite, + int toWriteOffset, + long destOffset, + int length, + boolean last) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public RewriteResponse openRewrite(RewriteRequest rewriteRequest) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index fdbb932b0..a18345be8 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -27,10 +27,14 @@ import static org.easymock.EasyMock.verify; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.api.services.storage.model.StorageObject; import com.google.cloud.RestorableState; import com.google.cloud.WriteChannel; import com.google.cloud.storage.spi.StorageRpcFactory; @@ -57,6 +61,7 @@ public class BlobWriteChannelTest { private 
static final String BLOB_NAME = "n"; private static final String UPLOAD_ID = "uploadid"; private static final BlobInfo BLOB_INFO = BlobInfo.newBuilder(BUCKET_NAME, BLOB_NAME).build(); + private static final StorageObject UPDATED_BLOB = new StorageObject(); private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); private static final int MIN_CHUNK_SIZE = 256 * 1024; private static final int DEFAULT_CHUNK_SIZE = 60 * MIN_CHUNK_SIZE; // 15MiB @@ -94,6 +99,7 @@ public void testCreate() { replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); assertTrue(writer.isOpen()); + assertNull(writer.getStorageObject()); } @Test @@ -104,6 +110,7 @@ public void testCreateRetryableError() { replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); assertTrue(writer.isOpen()); + assertNull(writer.getStorageObject()); } @Test @@ -131,28 +138,44 @@ public void testWriteWithoutFlush() throws IOException { public void testWriteWithFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write( - eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(CUSTOM_CHUNK_SIZE), + eq(false))) + .andReturn(null); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); writer.setChunkSize(CUSTOM_CHUNK_SIZE); ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE); assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer)); assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + assertNull(writer.getStorageObject()); } @Test public void testWritesAndFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - 
storageRpcMock.write( - eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(DEFAULT_CHUNK_SIZE), + eq(false))) + .andReturn(null); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE]; for (int i = 0; i < buffers.length; i++) { buffers[i] = randomBuffer(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffers[i])); + assertNull(writer.getStorageObject()); } for (int i = 0; i < buffers.length; i++) { assertArrayEquals( @@ -166,13 +189,17 @@ public void testWritesAndFlush() throws IOException { public void testCloseWithoutFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) + .andReturn(UPDATED_BLOB); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); assertTrue(writer.isOpen()); writer.close(); assertArrayEquals(new byte[0], capturedBuffer.getValue()); - assertTrue(!writer.isOpen()); + assertFalse(writer.isOpen()); + assertSame(UPDATED_BLOB, writer.getStorageObject()); } @Test @@ -180,8 +207,15 @@ public void testCloseWithFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); - storageRpcMock.write( - eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + 
capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(true))) + .andReturn(UPDATED_BLOB); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); assertTrue(writer.isOpen()); @@ -189,14 +223,18 @@ public void testCloseWithFlush() throws IOException { writer.close(); assertEquals(DEFAULT_CHUNK_SIZE, capturedBuffer.getValue().length); assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE)); - assertTrue(!writer.isOpen()); + assertFalse(writer.isOpen()); + assertSame(UPDATED_BLOB, writer.getStorageObject()); } @Test public void testWriteClosed() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) + .andReturn(UPDATED_BLOB); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); writer.close(); @@ -206,6 +244,7 @@ public void testWriteClosed() throws IOException { } catch (IOException ex) { // expected } + assertSame(UPDATED_BLOB, writer.getStorageObject()); } @Test @@ -213,13 +252,15 @@ public void testSaveAndRestore() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(CaptureType.ALL); Capture capturedPosition = Capture.newInstance(CaptureType.ALL); - storageRpcMock.write( - eq(UPLOAD_ID), - capture(capturedBuffer), - eq(0), - captureLong(capturedPosition), - eq(DEFAULT_CHUNK_SIZE), - eq(false)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + captureLong(capturedPosition), + eq(DEFAULT_CHUNK_SIZE), + eq(false))) + .andReturn(null); expectLastCall().times(2); 
replay(storageRpcMock); ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE); @@ -239,7 +280,10 @@ public void testSaveAndRestore() throws IOException { public void testSaveAndRestoreClosed() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) + .andReturn(UPDATED_BLOB); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); writer.close(); @@ -283,8 +327,15 @@ public void testWriteWithSignedURLAndWithoutFlush() throws IOException { public void testWriteWithSignedURLAndWithFlush() throws IOException { expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write( - eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(CUSTOM_CHUNK_SIZE), + eq(false))) + .andReturn(null); replay(storageRpcMock); writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); writer.setChunkSize(CUSTOM_CHUNK_SIZE); @@ -297,8 +348,15 @@ public void testWriteWithSignedURLAndWithFlush() throws IOException { public void testWriteWithSignedURLAndFlush() throws IOException { expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write( - eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(DEFAULT_CHUNK_SIZE), + eq(false))) + .andReturn(null); replay(storageRpcMock); writer = new 
BlobWriteChannel(options, new URL(SIGNED_URL)); ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE]; @@ -318,7 +376,10 @@ public void testWriteWithSignedURLAndFlush() throws IOException { public void testCloseWithSignedURLWithoutFlush() throws IOException { expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) + .andReturn(UPDATED_BLOB); replay(storageRpcMock); writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); assertTrue(writer.isOpen()); @@ -332,8 +393,15 @@ public void testCloseWithSignedURLWithFlush() throws IOException { expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); - storageRpcMock.write( - eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(true))) + .andReturn(UPDATED_BLOB); replay(storageRpcMock); writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); assertTrue(writer.isOpen()); @@ -348,7 +416,10 @@ public void testCloseWithSignedURLWithFlush() throws IOException { public void testWriteWithSignedURLClosed() throws IOException { expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(); - storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true))) + .andReturn(UPDATED_BLOB); replay(storageRpcMock); writer = new BlobWriteChannel(options, new URL(SIGNED_URL)); 
writer.close(); @@ -365,13 +436,15 @@ public void testSaveAndRestoreWithSignedURL() throws IOException { expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID); Capture capturedBuffer = Capture.newInstance(CaptureType.ALL); Capture capturedPosition = Capture.newInstance(CaptureType.ALL); - storageRpcMock.write( - eq(UPLOAD_ID), - capture(capturedBuffer), - eq(0), - captureLong(capturedPosition), - eq(DEFAULT_CHUNK_SIZE), - eq(false)); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + captureLong(capturedPosition), + eq(DEFAULT_CHUNK_SIZE), + eq(false))) + .andReturn(null); expectLastCall().times(2); replay(storageRpcMock); ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java index e8525c5c6..9eaefc66f 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplMockitoTest.java @@ -41,7 +41,11 @@ import com.google.common.io.BaseEncoding; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.security.Key; import java.security.KeyFactory; import java.security.NoSuchAlgorithmException; @@ -961,6 +965,185 @@ public void testCreateBlobFromStreamRetryableException() throws IOException { } } + @Test + public void testCreateFromDirectory() throws IOException { + initializeService(); + Path dir = Files.createTempDirectory("unit_"); + try { + storage.createFrom(BLOB_INFO1, dir); + fail(); + } catch (StorageException e) { + assertEquals(dir + " is a directory", e.getMessage()); + } + } + + private BlobInfo initializeUpload(byte[] bytes) { + return 
initializeUpload(bytes, DEFAULT_BUFFER_SIZE, EMPTY_RPC_OPTIONS); + } + + private BlobInfo initializeUpload(byte[] bytes, int bufferSize) { + return initializeUpload(bytes, bufferSize, EMPTY_RPC_OPTIONS); + } + + private BlobInfo initializeUpload( + byte[] bytes, int bufferSize, Map rpcOptions) { + String uploadId = "upload-id"; + byte[] buffer = new byte[bufferSize]; + System.arraycopy(bytes, 0, buffer, 0, bytes.length); + BlobInfo blobInfo = BLOB_INFO1.toBuilder().setMd5(null).setCrc32c(null).build(); + StorageObject storageObject = new StorageObject(); + storageObject.setBucket(BLOB_INFO1.getBucket()); + storageObject.setName(BLOB_INFO1.getName()); + storageObject.setSize(BigInteger.valueOf(bytes.length)); + doReturn(uploadId) + .doThrow(UNEXPECTED_CALL_EXCEPTION) + .when(storageRpcMock) + .open(blobInfo.toPb(), rpcOptions); + + doReturn(storageObject) + .doThrow(UNEXPECTED_CALL_EXCEPTION) + .when(storageRpcMock) + .writeWithResponse(uploadId, buffer, 0, 0L, bytes.length, true); + + initializeService(); + expectedUpdated = Blob.fromPb(storage, storageObject); + return blobInfo; + } + + @Test + public void testCreateFromFile() throws Exception { + byte[] dataToSend = {1, 2, 3, 4}; + Path tempFile = Files.createTempFile("testCreateFrom", ".tmp"); + Files.write(tempFile, dataToSend); + + BlobInfo blobInfo = initializeUpload(dataToSend); + Blob blob = storage.createFrom(blobInfo, tempFile); + assertEquals(expectedUpdated, blob); + } + + @Test + public void testCreateFromStream() throws Exception { + byte[] dataToSend = {1, 2, 3, 4, 5}; + ByteArrayInputStream stream = new ByteArrayInputStream(dataToSend); + + BlobInfo blobInfo = initializeUpload(dataToSend); + Blob blob = storage.createFrom(blobInfo, stream); + assertEquals(expectedUpdated, blob); + } + + @Test + public void testCreateFromWithOptions() throws Exception { + byte[] dataToSend = {1, 2, 3, 4, 5, 6}; + ByteArrayInputStream stream = new ByteArrayInputStream(dataToSend); + + BlobInfo blobInfo = 
initializeUpload(dataToSend, DEFAULT_BUFFER_SIZE, KMS_KEY_NAME_OPTIONS); + Blob blob = + storage.createFrom(blobInfo, stream, Storage.BlobWriteOption.kmsKeyName(KMS_KEY_NAME)); + assertEquals(expectedUpdated, blob); + } + + @Test + public void testCreateFromWithBufferSize() throws Exception { + byte[] dataToSend = {1, 2, 3, 4, 5, 6}; + ByteArrayInputStream stream = new ByteArrayInputStream(dataToSend); + int bufferSize = MIN_BUFFER_SIZE * 2; + + BlobInfo blobInfo = initializeUpload(dataToSend, bufferSize); + Blob blob = storage.createFrom(blobInfo, stream, bufferSize); + assertEquals(expectedUpdated, blob); + } + + @Test + public void testCreateFromWithBufferSizeAndOptions() throws Exception { + byte[] dataToSend = {1, 2, 3, 4, 5, 6}; + ByteArrayInputStream stream = new ByteArrayInputStream(dataToSend); + int bufferSize = MIN_BUFFER_SIZE * 2; + + BlobInfo blobInfo = initializeUpload(dataToSend, bufferSize, KMS_KEY_NAME_OPTIONS); + Blob blob = + storage.createFrom( + blobInfo, stream, bufferSize, Storage.BlobWriteOption.kmsKeyName(KMS_KEY_NAME)); + assertEquals(expectedUpdated, blob); + } + + @Test + public void testCreateFromWithSmallBufferSize() throws Exception { + byte[] dataToSend = new byte[100_000]; + ByteArrayInputStream stream = new ByteArrayInputStream(dataToSend); + int smallBufferSize = 100; + + BlobInfo blobInfo = initializeUpload(dataToSend, MIN_BUFFER_SIZE); + Blob blob = storage.createFrom(blobInfo, stream, smallBufferSize); + assertEquals(expectedUpdated, blob); + } + + @Test + public void testCreateFromWithException() throws Exception { + initializeService(); + String uploadId = "id-exception"; + byte[] bytes = new byte[10]; + byte[] buffer = new byte[MIN_BUFFER_SIZE]; + System.arraycopy(bytes, 0, buffer, 0, bytes.length); + BlobInfo info = BLOB_INFO1.toBuilder().setMd5(null).setCrc32c(null).build(); + doReturn(uploadId) + .doThrow(UNEXPECTED_CALL_EXCEPTION) + .when(storageRpcMock) + .open(info.toPb(), EMPTY_RPC_OPTIONS); + + Exception 
runtimeException = new RuntimeException("message"); + doThrow(runtimeException) + .when(storageRpcMock) + .writeWithResponse(uploadId, buffer, 0, 0L, bytes.length, true); + + InputStream input = new ByteArrayInputStream(bytes); + try { + storage.createFrom(info, input, MIN_BUFFER_SIZE); + fail(); + } catch (StorageException e) { + assertSame(runtimeException, e.getCause()); + } + } + + @Test + public void testCreateFromMultipleParts() throws Exception { + initializeService(); + String uploadId = "id-multiple-parts"; + int extraBytes = 10; + int totalSize = MIN_BUFFER_SIZE + extraBytes; + byte[] dataToSend = new byte[totalSize]; + dataToSend[0] = 42; + dataToSend[MIN_BUFFER_SIZE + 1] = 43; + + StorageObject storageObject = new StorageObject(); + storageObject.setBucket(BLOB_INFO1.getBucket()); + storageObject.setName(BLOB_INFO1.getName()); + storageObject.setSize(BigInteger.valueOf(totalSize)); + + BlobInfo info = BLOB_INFO1.toBuilder().setMd5(null).setCrc32c(null).build(); + doReturn(uploadId) + .doThrow(UNEXPECTED_CALL_EXCEPTION) + .when(storageRpcMock) + .open(info.toPb(), EMPTY_RPC_OPTIONS); + + byte[] buffer1 = new byte[MIN_BUFFER_SIZE]; + System.arraycopy(dataToSend, 0, buffer1, 0, MIN_BUFFER_SIZE); + doReturn(null) + .doThrow(UNEXPECTED_CALL_EXCEPTION) + .when(storageRpcMock) + .writeWithResponse(uploadId, buffer1, 0, 0L, MIN_BUFFER_SIZE, false); + + byte[] buffer2 = new byte[MIN_BUFFER_SIZE]; + System.arraycopy(dataToSend, MIN_BUFFER_SIZE, buffer2, 0, extraBytes); + doReturn(storageObject) + .doThrow(UNEXPECTED_CALL_EXCEPTION) + .when(storageRpcMock) + .writeWithResponse(uploadId, buffer2, 0, (long) MIN_BUFFER_SIZE, extraBytes, true); + + InputStream input = new ByteArrayInputStream(dataToSend); + Blob blob = storage.createFrom(info, input, MIN_BUFFER_SIZE); + assertEquals(Blob.fromPb(storage, storageObject), blob); + } + private void verifyChannelRead(ReadChannel channel, byte[] bytes) throws IOException { assertNotNull(channel); 
assertTrue(channel.isOpen()); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java index 86bbdef79..6aa76a03d 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageImplTest.java @@ -109,7 +109,8 @@ public class StorageImplTest { "projects/gcloud-devel/locations/us/keyRings/gcs_kms_key_ring_us/cryptoKeys/key"; private static final Long RETENTION_PERIOD = 10L; private static final String USER_PROJECT = "test-project"; - + private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024; + private static final int MIN_BUFFER_SIZE = 256 * 1024; // BucketInfo objects private static final BucketInfo BUCKET_INFO1 = BucketInfo.newBuilder(BUCKET_NAME1).setMetageneration(42L).build(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java index 7966c9281..3d3bda7a9 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java @@ -109,6 +109,7 @@ import java.net.URLConnection; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.Path; import java.security.Key; import java.util.ArrayList; import java.util.Arrays; @@ -3322,4 +3323,45 @@ public void testDeleteLifecycleRules() throws ExecutionException, InterruptedExc RemoteStorageHelper.forceDelete(storage, bucketName, 5, TimeUnit.SECONDS); } } + + @Test + public void testUploadFromDownloadTo() throws Exception { + String blobName = "test-uploadFrom-downloadTo-blob"; + BlobId blobId = BlobId.of(BUCKET, blobName); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + + Path tempFileFrom = Files.createTempFile("ITStorageTest_", 
".tmp"); + Files.write(tempFileFrom, BLOB_BYTE_CONTENT); + Blob blob = storage.createFrom(blobInfo, tempFileFrom); + assertEquals(BUCKET, blob.getBucket()); + assertEquals(blobName, blob.getName()); + assertEquals(BLOB_BYTE_CONTENT.length, (long) blob.getSize()); + + Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp"); + storage.get(blobId).downloadTo(tempFileTo); + byte[] readBytes = Files.readAllBytes(tempFileTo); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes); + } + + @Test + public void testUploadWithEncryption() throws Exception { + String blobName = "test-upload-withEncryption"; + BlobId blobId = BlobId.of(BUCKET, blobName); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + + ByteArrayInputStream content = new ByteArrayInputStream(BLOB_BYTE_CONTENT); + Blob blob = storage.createFrom(blobInfo, content, Storage.BlobWriteOption.encryptionKey(KEY)); + + try { + blob.getContent(); + fail("StorageException was expected"); + } catch (StorageException e) { + String expectedMessage = + "The target object is encrypted by a customer-supplied encryption key."; + assertTrue(e.getMessage().contains(expectedMessage)); + assertEquals(400, e.getCode()); + } + byte[] readBytes = blob.getContent(Blob.BlobSourceOption.decryptionKey(KEY)); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes); + } }