Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: Add shouldReturnRawInputStream option to Get requests (#872)
* feat: Add shouldReturnRawInputStream option to Get requests

* lint, clarify documentation
  • Loading branch information
JesseLovelace committed Jun 14, 2021
1 parent ee7c236 commit 474dfae
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 3 deletions.
Expand Up @@ -117,6 +117,8 @@ private Storage.BlobSourceOption toSourceOptions(BlobInfo blobInfo) {
return Storage.BlobSourceOption.decryptionKey((String) getValue());
case USER_PROJECT:
return Storage.BlobSourceOption.userProject((String) getValue());
case RETURN_RAW_INPUT_STREAM:
return Storage.BlobSourceOption.shouldReturnRawInputStream((boolean) getValue());
default:
throw new AssertionError("Unexpected enum value");
}
Expand All @@ -136,6 +138,8 @@ private Storage.BlobGetOption toGetOption(BlobInfo blobInfo) {
return Storage.BlobGetOption.userProject((String) getValue());
case CUSTOMER_SUPPLIED_KEY:
return Storage.BlobGetOption.decryptionKey((String) getValue());
case RETURN_RAW_INPUT_STREAM:
return Storage.BlobGetOption.shouldReturnRawInputStream((boolean) getValue());
default:
throw new AssertionError("Unexpected enum value");
}
Expand Down Expand Up @@ -200,6 +204,16 @@ public static BlobSourceOption userProject(String userProject) {
return new BlobSourceOption(StorageRpc.Option.USER_PROJECT, userProject);
}

/**
* Returns an option for whether the request should return the raw input stream, instead of
* automatically decompressing the content. By default, this is false for Blob.downloadTo(), but
* true for ReadChannel.read().
*/
public static BlobSourceOption shouldReturnRawInputStream(boolean shouldReturnRawInputStream) {
return new BlobSourceOption(
StorageRpc.Option.RETURN_RAW_INPUT_STREAM, shouldReturnRawInputStream);
}

static Storage.BlobSourceOption[] toSourceOptions(
BlobInfo blobInfo, BlobSourceOption... options) {
Storage.BlobSourceOption[] convertedOptions = new Storage.BlobSourceOption[options.length];
Expand Down
Expand Up @@ -847,6 +847,16 @@ public static BlobSourceOption decryptionKey(String key) {
public static BlobSourceOption userProject(String userProject) {
return new BlobSourceOption(StorageRpc.Option.USER_PROJECT, userProject);
}

/**
* Returns an option for whether the request should return the raw input stream, instead of
* automatically decompressing the content. By default, this is false for Blob.downloadTo(), but
* true for ReadChannel.read().
*/
public static BlobSourceOption shouldReturnRawInputStream(boolean shouldReturnRawInputStream) {
return new BlobSourceOption(
StorageRpc.Option.RETURN_RAW_INPUT_STREAM, shouldReturnRawInputStream);
}
}

/** Class for specifying blob get options. */
Expand All @@ -862,6 +872,10 @@ private BlobGetOption(StorageRpc.Option rpcOption, String value) {
super(rpcOption, value);
}

private BlobGetOption(StorageRpc.Option rpcOption, boolean value) {
super(rpcOption, value);
}

/**
* Returns an option for blob's data generation match. If this option is used the request will
* fail if blob's generation does not match. The generation value to compare with the actual
Expand Down Expand Up @@ -953,6 +967,16 @@ public static BlobGetOption decryptionKey(Key key) {
public static BlobGetOption decryptionKey(String key) {
return new BlobGetOption(StorageRpc.Option.CUSTOMER_SUPPLIED_KEY, key);
}

/**
* Returns an option for whether the request should return the raw input stream, instead of
* automatically decompressing the content. By default, this is false for Blob.downloadTo(), but
* true for ReadChannel.read().
*/
public static BlobGetOption shouldReturnRawInputStream(boolean shouldReturnRawInputStream) {
return new BlobGetOption(
StorageRpc.Option.RETURN_RAW_INPUT_STREAM, shouldReturnRawInputStream);
}
}

/** Class for specifying bucket list options. */
Expand Down
Expand Up @@ -681,7 +681,6 @@ private Get createReadRequest(StorageObject from, Map<Option, ?> options) throws
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
.setUserProject(Option.USER_PROJECT.getString(options));
setEncryptionHeaders(req.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, options);
req.setReturnRawInputStream(true);
return req;
}

Expand All @@ -692,9 +691,15 @@ public long read(
Scope scope = tracer.withSpan(span);
try {
Get req = createReadRequest(from, options);
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
if (shouldReturnRawInputStream != null) {
req.setReturnRawInputStream(shouldReturnRawInputStream);
} else {
req.setReturnRawInputStream(false);
}
req.getMediaHttpDownloader().setBytesDownloaded(position);
req.getMediaHttpDownloader().setDirectDownloadEnabled(true);
req.executeMediaAndDownloadTo(outputStream);
req.executeMedia().download(outputStream);
return req.getMediaHttpDownloader().getNumBytesDownloaded();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
Expand All @@ -717,6 +722,12 @@ public Tuple<String, byte[]> read(
try {
checkArgument(position >= 0, "Position should be non-negative, is " + position);
Get req = createReadRequest(from, options);
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
if (shouldReturnRawInputStream != null) {
req.setReturnRawInputStream(shouldReturnRawInputStream);
} else {
req.setReturnRawInputStream(true);
}
StringBuilder range = new StringBuilder();
range.append("bytes=").append(position).append("-").append(position + bytes - 1);
HttpHeaders requestHeaders = req.getRequestHeaders();
Expand Down
Expand Up @@ -68,7 +68,8 @@ enum Option {
SERVICE_ACCOUNT_EMAIL("serviceAccount"),
SHOW_DELETED_KEYS("showDeletedKeys"),
REQUESTED_POLICY_VERSION("optionsRequestedPolicyVersion"),
DETECT_CONTENT_TYPE("detectContentType");
DETECT_CONTENT_TYPE("detectContentType"),
RETURN_RAW_INPUT_STREAM("returnRawInputStream");

private final String value;

Expand Down
Expand Up @@ -104,6 +104,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
Expand All @@ -127,6 +128,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.crypto.spec.SecretKeySpec;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
Expand Down Expand Up @@ -802,6 +804,40 @@ public void testGetBlobFailNonExistingGeneration() {
}
}

@Test
public void testGetBlobRawInput() throws IOException {
Path file = File.createTempFile("temp", ".txt").toPath();
Files.write(file, "hello world".getBytes());

File gzippedFile = File.createTempFile("temp", ".gz");

GZIPOutputStream gzipOutputStream = new GZIPOutputStream(new FileOutputStream(gzippedFile));
Files.copy(file, gzipOutputStream);
gzipOutputStream.close();

String blobName = "zipped_blob";
BlobId blobId = BlobId.of(BUCKET, blobName);
BlobInfo blobInfo =
BlobInfo.newBuilder(blobId).setContentEncoding("gzip").setContentType("text/plain").build();

storage.createFrom(blobInfo, gzippedFile.toPath());

Path rawInputGzippedFile = File.createTempFile("rawinputgzippedfile", ".txt").toPath();
Blob blob = storage.get(blobId);

blob.downloadTo(rawInputGzippedFile, Blob.BlobSourceOption.shouldReturnRawInputStream(true));

assertArrayEquals(
Files.readAllBytes(gzippedFile.toPath()), Files.readAllBytes(rawInputGzippedFile));

Path unzippedFile = File.createTempFile("unzippedfile", ".txt").toPath();
storage
.get(blobId)
.downloadTo(unzippedFile, Blob.BlobSourceOption.shouldReturnRawInputStream(false));

assertArrayEquals("hello world".getBytes(), Files.readAllBytes(unzippedFile));
}

@Test(timeout = 5000)
public void testListBlobsSelectedFields() throws InterruptedException {
String[] blobNames = {
Expand Down

0 comments on commit 474dfae

Please sign in to comment.