Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add shouldReturnRawInputStream option to Get requests #872

Merged
merged 2 commits into from Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -117,6 +117,8 @@ private Storage.BlobSourceOption toSourceOptions(BlobInfo blobInfo) {
return Storage.BlobSourceOption.decryptionKey((String) getValue());
case USER_PROJECT:
return Storage.BlobSourceOption.userProject((String) getValue());
case RETURN_RAW_INPUT_STREAM:
return Storage.BlobSourceOption.shouldReturnRawInputStream((boolean) getValue());
default:
throw new AssertionError("Unexpected enum value");
}
Expand All @@ -136,6 +138,8 @@ private Storage.BlobGetOption toGetOption(BlobInfo blobInfo) {
return Storage.BlobGetOption.userProject((String) getValue());
case CUSTOMER_SUPPLIED_KEY:
return Storage.BlobGetOption.decryptionKey((String) getValue());
case RETURN_RAW_INPUT_STREAM:
return Storage.BlobGetOption.shouldReturnRawInputStream((boolean) getValue());
default:
throw new AssertionError("Unexpected enum value");
}
Expand Down Expand Up @@ -200,6 +204,14 @@ public static BlobSourceOption userProject(String userProject) {
return new BlobSourceOption(StorageRpc.Option.USER_PROJECT, userProject);
}

/**
* Returns an option for whether the request should return the raw input stream, instead of
* automatically decompressing the content. By default, this is true.
*/
public static BlobSourceOption shouldReturnRawInputStream(boolean shouldReturnRawInputStream) {
return new BlobSourceOption(StorageRpc.Option.RETURN_RAW_INPUT_STREAM, shouldReturnRawInputStream);
}

static Storage.BlobSourceOption[] toSourceOptions(
BlobInfo blobInfo, BlobSourceOption... options) {
Storage.BlobSourceOption[] convertedOptions = new Storage.BlobSourceOption[options.length];
Expand Down
Expand Up @@ -847,6 +847,14 @@ public static BlobSourceOption decryptionKey(String key) {
public static BlobSourceOption userProject(String userProject) {
return new BlobSourceOption(StorageRpc.Option.USER_PROJECT, userProject);
}

/**
* Returns an option for whether the request should return the raw input stream, instead of
* automatically decompressing the content. By default, this is true.
*/
public static BlobSourceOption shouldReturnRawInputStream(boolean shouldReturnRawInputStream) {
return new BlobSourceOption(StorageRpc.Option.RETURN_RAW_INPUT_STREAM, shouldReturnRawInputStream);
}
}

/** Class for specifying blob get options. */
Expand All @@ -862,6 +870,10 @@ private BlobGetOption(StorageRpc.Option rpcOption, String value) {
super(rpcOption, value);
}

private BlobGetOption(StorageRpc.Option rpcOption, boolean value) {
super(rpcOption, value);
}

/**
* Returns an option for blob's data generation match. If this option is used the request will
* fail if blob's generation does not match. The generation value to compare with the actual
Expand Down Expand Up @@ -953,6 +965,14 @@ public static BlobGetOption decryptionKey(Key key) {
public static BlobGetOption decryptionKey(String key) {
return new BlobGetOption(StorageRpc.Option.CUSTOMER_SUPPLIED_KEY, key);
}

/**
* Returns an option for whether the request should return the raw input stream, instead of
* automatically decompressing the content. By default, this is true.
*/
public static BlobGetOption shouldReturnRawInputStream(boolean shouldReturnRawInputStream) {
return new BlobGetOption(StorageRpc.Option.RETURN_RAW_INPUT_STREAM, shouldReturnRawInputStream);
}
}

/** Class for specifying bucket list options. */
Expand Down
Expand Up @@ -681,7 +681,6 @@ private Get createReadRequest(StorageObject from, Map<Option, ?> options) throws
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
.setUserProject(Option.USER_PROJECT.getString(options));
setEncryptionHeaders(req.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, options);
req.setReturnRawInputStream(true);
return req;
}

Expand All @@ -692,9 +691,15 @@ public long read(
Scope scope = tracer.withSpan(span);
try {
Get req = createReadRequest(from, options);
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
if (shouldReturnRawInputStream != null ) {
req.setReturnRawInputStream(shouldReturnRawInputStream);
} else {
req.setReturnRawInputStream(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default value is different here so wondering how this will be reconciled.

}
req.getMediaHttpDownloader().setBytesDownloaded(position);
req.getMediaHttpDownloader().setDirectDownloadEnabled(true);
req.executeMediaAndDownloadTo(outputStream);
req.executeMedia().download(outputStream);
return req.getMediaHttpDownloader().getNumBytesDownloaded();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
Expand All @@ -717,6 +722,12 @@ public Tuple<String, byte[]> read(
try {
checkArgument(position >= 0, "Position should be non-negative, is " + position);
Get req = createReadRequest(from, options);
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
if (shouldReturnRawInputStream != null ) {
req.setReturnRawInputStream(shouldReturnRawInputStream);
} else {
req.setReturnRawInputStream(true);
}
StringBuilder range = new StringBuilder();
range.append("bytes=").append(position).append("-").append(position + bytes - 1);
HttpHeaders requestHeaders = req.getRequestHeaders();
Expand Down
Expand Up @@ -68,7 +68,8 @@ enum Option {
SERVICE_ACCOUNT_EMAIL("serviceAccount"),
SHOW_DELETED_KEYS("showDeletedKeys"),
REQUESTED_POLICY_VERSION("optionsRequestedPolicyVersion"),
DETECT_CONTENT_TYPE("detectContentType");
DETECT_CONTENT_TYPE("detectContentType"),
RETURN_RAW_INPUT_STREAM("returnRawInputStream");

private final String value;

Expand Down
Expand Up @@ -104,6 +104,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
Expand All @@ -127,6 +128,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.crypto.spec.SecretKeySpec;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
Expand Down Expand Up @@ -802,6 +804,36 @@ public void testGetBlobFailNonExistingGeneration() {
}
}

@Test
public void testGetBlobRawInput() throws IOException {
Path file = File.createTempFile("temp", ".txt").toPath();
Files.write(file, "hello world".getBytes());

File gzippedFile = File.createTempFile("temp", ".gz");

GZIPOutputStream gzipOutputStream = new GZIPOutputStream(new FileOutputStream(gzippedFile));
Files.copy(file, gzipOutputStream);
gzipOutputStream.close();

String blobName = "zipped_blob";
BlobId blobId = BlobId.of(BUCKET, blobName);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentEncoding("gzip").setContentType("text/plain").build();

storage.createFrom(blobInfo, gzippedFile.toPath());

Path rawInputGzippedFile = File.createTempFile("rawinputgzippedfile", ".txt").toPath();
Blob blob = storage.get(blobId);

blob.downloadTo(rawInputGzippedFile, Blob.BlobSourceOption.shouldReturnRawInputStream(true));

assertArrayEquals(Files.readAllBytes(gzippedFile.toPath()), Files.readAllBytes(rawInputGzippedFile));

Path unzippedFile = File.createTempFile("unzippedfile", ".txt").toPath();
storage.get(blobId).downloadTo(unzippedFile, Blob.BlobSourceOption.shouldReturnRawInputStream(false));

assertArrayEquals("hello world".getBytes(), Files.readAllBytes(unzippedFile));
}

@Test(timeout = 5000)
public void testListBlobsSelectedFields() throws InterruptedException {
String[] blobNames = {
Expand Down