Skip to content

Commit

Permalink
feat: wrap GZIPInputStream for connection reuse
Browse files Browse the repository at this point in the history
If a connection is closed and there are some bytes that have not
been read that connection can't be reused. Now GZIPInputStream
will have all of its bytes read on close automatically to promote
connection reuse.

Cherry-picked: googleapis#749
Fixes: googleapis#367
  • Loading branch information
codyoss committed Oct 14, 2019
1 parent abf01a5 commit 756987a
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 41 deletions.
@@ -0,0 +1,58 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.api.client.http;

import com.google.api.client.util.Preconditions;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;

/**
* This class in meant to wrap an {@link InputStream} so that all bytes in the steam are read and
* discarded on {@link InputStream#close()}. This ensures that the underlying connection has the
* option to be reused.
*/
final class ConsumingInputStream extends InputStream {
private InputStream inputStream;
private boolean closed = false;

ConsumingInputStream(InputStream inputStream) {
this.inputStream = Preconditions.checkNotNull(inputStream);
}

@Override
public int read() throws IOException {
return inputStream.read();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return inputStream.read(b, off, len);
}

@Override
public void close() throws IOException {
if (!closed && inputStream != null) {
try {
ByteStreams.exhaust(this);
inputStream.close();
} finally {
this.closed = true;
}
}
}
}
Expand Up @@ -331,8 +331,8 @@ public InputStream getContent() throws IOException {
if (!returnRawInputStream
&& contentEncoding != null
&& contentEncoding.contains("gzip")) {
lowLevelResponseContent = new ConsumingInputStream(
new GZIPInputStream(lowLevelResponseContent));
lowLevelResponseContent =
new ConsumingInputStream(new GZIPInputStream(lowLevelResponseContent));
}
// logging (wrap content with LoggingInputStream)
Logger logger = HttpTransport.LOGGER;
Expand All @@ -357,44 +357,6 @@ public InputStream getContent() throws IOException {
return content;
}

static class ConsumingInputStream extends InputStream {
private InputStream inputStream;
private boolean closed = false;

private ConsumingInputStream(InputStream inputStream) {
this.inputStream = Preconditions.checkNotNull(inputStream);
}

@Override
public int read() throws IOException {
return inputStream.read();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return inputStream.read(b, off, len);
}

@Override
public void close() throws IOException {
if (!closed && inputStream != null) {
try {
drainInputStream(this);
inputStream.close();
} finally {
this.closed = true;
}
}
}

static void drainInputStream(final InputStream inputStream) throws IOException {
byte buffer[] = new byte[1024];
while (inputStream.read(buffer) >= 0) {
// do nothing
}
}
}

/**
* Writes the content of the HTTP response into the given destination output stream.
*
Expand Down
@@ -0,0 +1,63 @@
/*
* Copyright 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.api.client.http;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.io.InputStream;
import org.junit.Test;

public class ConsumingInputStreamTest {

@Test
public void testClose_drainsBytesOnClose() throws IOException {
MockInputStream mockInputStream = new MockInputStream("abc123".getBytes());
InputStream consumingInputStream = new ConsumingInputStream(mockInputStream);

assertEquals(6, mockInputStream.getBytesToRead());

// read one byte
consumingInputStream.read();
assertEquals(5, mockInputStream.getBytesToRead());

// closing the stream should read the remaining bytes
consumingInputStream.close();
assertEquals(0, mockInputStream.getBytesToRead());
}

private class MockInputStream extends InputStream {
private int bytesToRead;

MockInputStream(byte[] data) {
this.bytesToRead = data.length;
}

@Override
public int read() throws IOException {
if (bytesToRead == 0) {
return -1;
}
bytesToRead--;
return 1;
}

int getBytesToRead() {
return bytesToRead;
}
}
}
Expand Up @@ -33,7 +33,6 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import junit.framework.TestCase;
import sun.net.www.http.ChunkedInputStream;

/**
* Tests {@link HttpResponse}.
Expand Down

0 comments on commit 756987a

Please sign in to comment.