Skip to content

Commit

Permalink
Improve GOAWAY handling
Browse files Browse the repository at this point in the history
Currently GOAWAY is detected/handled only in the connection phase but
sometimes can happen in the transfer phase as well.

This now wraps the whole request inside a block that is retried if a
GOAWAY is detected.

(cherry picked from commit deb31c7)
  • Loading branch information
laeubi committed Feb 12, 2024
1 parent 8edc7b4 commit 7a5858b
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 140 deletions.
@@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (c) 2024 Christoph Läubrich and others.
* This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Christoph Läubrich - initial API and implementation
*******************************************************************************/
package org.eclipse.tycho.p2maven.transport;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.List;
import java.util.Map;

public interface Headers extends AutoCloseable {

String ENCODING_IDENTITY = "identity";
String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
String HEADER_CONTENT_ENCODING = "Content-Encoding";
String ENCODING_GZIP = "gzip";
String ETAG_HEADER = "ETag";
String LAST_MODIFIED_HEADER = "Last-Modified";
String EXPIRES_HEADER = "Expires";
String CACHE_CONTROL_HEADER = "Cache-Control";
String MAX_AGE_DIRECTIVE = "max-age";
String MUST_REVALIDATE_DIRECTIVE = "must-revalidate";

int statusCode() throws IOException;

Map<String, List<String>> headers();

@Override
void close();

URI getURI();

String getHeader(String header);

long getLastModified();

default void checkResponseCode() throws FileNotFoundException, IOException {
int code = statusCode();
if (code >= HttpURLConnection.HTTP_BAD_REQUEST) {
if (code == HttpURLConnection.HTTP_NOT_FOUND || code == HttpURLConnection.HTTP_GONE) {
throw new FileNotFoundException(getURI().toString());
} else {
throw new java.io.IOException("Server returned HTTP code: " + code + " for URL " + getURI().toString());
}
}
}

}
@@ -1,14 +1,15 @@
package org.eclipse.tycho.p2maven.transport;

import java.io.IOException;
import java.io.InputStream;

import org.eclipse.tycho.p2maven.transport.Response.ResponseConsumer;

public interface HttpTransport {

void setHeader(String key, String value);

Response<InputStream> get() throws IOException;
<T> T get(ResponseConsumer<T> consumer) throws IOException;

Response<Void> head() throws IOException;
Headers head() throws IOException;

}
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
Expand Down Expand Up @@ -42,6 +43,7 @@
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.eclipse.tycho.p2maven.helper.ProxyHelper;
import org.eclipse.tycho.p2maven.transport.Response.ResponseConsumer;

/**
* A transport using Java11 HttpClient
Expand Down Expand Up @@ -77,7 +79,7 @@ public class Java11HttpTransportFactory implements HttpTransportFactory, Initial
@Override
public HttpTransport createTransport(URI uri) {
Java11HttpTransport transport = new Java11HttpTransport(client, clientHttp1, HttpRequest.newBuilder().uri(uri),
logger);
uri, logger);
authenticator.preemtiveAuth((k, v) -> transport.setHeader(k, v), uri);
return transport;
}
Expand All @@ -88,11 +90,13 @@ private static final class Java11HttpTransport implements HttpTransport {
private HttpClient client;
private Logger logger;
private HttpClient clientHttp1;
private URI uri;

public Java11HttpTransport(HttpClient client, HttpClient clientHttp1, Builder builder, Logger logger) {
public Java11HttpTransport(HttpClient client, HttpClient clientHttp1, Builder builder, URI uri, Logger logger) {
this.client = client;
this.clientHttp1 = clientHttp1;
this.builder = builder;
this.uri = uri;
this.logger = logger;
}

Expand All @@ -102,78 +106,106 @@ public void setHeader(String key, String value) {
}

@Override
public Response<InputStream> get() throws IOException {
public <T> T get(ResponseConsumer<T> consumer) throws IOException {
try {
HttpResponse<InputStream> response = performGet();
return new ResponseImplementation<>(response) {

@Override
public void close() {
if (response.version() == Version.HTTP_1_1) {
// discard any remaining data and close the stream to return the connection to
// the pool..
try (InputStream stream = body()) {
int discarded = 0;
while (discarded < MAX_DISCARD) {
int read = stream.read(DUMMY_BUFFER);
if (read < 0) {
break;
}
discarded += read;
}
} catch (IOException e) {
// don't care...
}
} else {
// just closing should be enough to signal to the framework...
try (InputStream stream = body()) {
} catch (IOException e) {
// don't care...
}
}
try {
return performGet(consumer, client);
} catch (IOException e) {
if (isGoaway(e)) {
logger.info("Received GOAWAY from server " + uri.getHost() + " will retry with Http/1...");
TimeUnit.SECONDS.sleep(1);
return performGet(consumer, clientHttp1);
}
};
throw e;
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

private HttpResponse<InputStream> performGet() throws IOException, InterruptedException {
private <T> T performGet(ResponseConsumer<T> consumer, HttpClient httpClient)
throws IOException, InterruptedException {
HttpRequest request = builder.GET().timeout(Duration.ofSeconds(TIMEOUT_SECONDS)).build();
try {
return client.send(request, BodyHandlers.ofInputStream());
} catch (IOException e) {
if (isGoaway(e)) {
logger.warn("Received GOAWAY from server " + request.uri().getHost()
+ " will retry after one second with Http/1...");
TimeUnit.SECONDS.sleep(1);
return clientHttp1.send(request, BodyHandlers.ofInputStream());
HttpResponse<InputStream> response = httpClient.send(request, BodyHandlers.ofInputStream());
try (ResponseImplementation<InputStream> implementation = new ResponseImplementation<>(response) {

@Override
public void close() {
if (response.version() == Version.HTTP_1_1) {
// discard any remaining data and close the stream to return the connection to
// the pool..
try (InputStream stream = response.body()) {
int discarded = 0;
while (discarded < MAX_DISCARD) {
int read = stream.read(DUMMY_BUFFER);
if (read < 0) {
break;
}
discarded += read;
}
} catch (IOException e) {
// don't care...

This comment has been minimized.

Copy link
@basilevs

basilevs Feb 29, 2024

Contributor

We want to know If the response has failed to close for any reason, as it leads to resource leaks.

This comment has been minimized.

Copy link
@laeubi

laeubi Mar 1, 2024

Author Member

Please explain where there is a leak, this is actually to not leak reasources, if reading the stream failed it means that someone has hung-up so nothing to cleanup anyways.

This comment has been minimized.

Copy link
@basilevs

basilevs Mar 1, 2024

Contributor

Please explain where there is a leak, this is actually to not leak reasources, if reading the stream failed it means that someone has hung-up so nothing to cleanup anyways.

try-with-resource block closes the requested resource on exit from "try" section. If the resource's close method throws (indicating potential leak, inability to free the resource), then "catch" section of try-with-resource handles that exception. In this case "catch" section handles both read failures and close failures. While read failures may be unimportant, close failures always are. Even if we are unable to handle them in another meaningful way, they should at least be logged.

This comment has been minimized.

Copy link
@laeubi

laeubi Mar 1, 2024

Author Member

Close failures are nothing we can handle, and this is not an indication of a leak as API has no there way to free a resource than calling close, so if it would leaking its a bug in the implementation not on calling side.

This comment has been minimized.

Copy link
@basilevs

basilevs Mar 1, 2024

Contributor

There may be a bug in implementation and suppression of this error makes it harder to isolate.
See, for example, #3540 where proper propagation of errors in the previous revision of this code allows to eliminate some potential causes.

This comment has been minimized.

Copy link
@laeubi

laeubi Mar 1, 2024

Author Member

Tycho is not a debugging facility. The "bug" is already that Java does not return connections when they are failed and there is no way for client code to tell a request is "done", so this is already a mitigation beside that I have never seen any implementation that actually throws any meaningful exception from close.

}
} else {
// just closing should be enough to signal to the framework...
try (InputStream stream = response.body()) {
} catch (IOException e) {
// don't care...
}
}
}
throw e;

@Override
public void transferTo(OutputStream outputStream, ContentEncoding transportEncoding)
throws IOException {
transportEncoding.decode(response.body()).transferTo(outputStream);
}
}) {
return consumer.handleResponse(implementation);
}
}

@Override
public Response<Void> head() throws IOException {
public Response head() throws IOException {
try {
HttpResponse<Void> response = client.send(
builder.method("HEAD", BodyPublishers.noBody()).timeout(Duration.ofSeconds(TIMEOUT_SECONDS))
.build(),
BodyHandlers.discarding());
return new ResponseImplementation<>(response) {
@Override
public void close() {
// nothing...
try {
return doHead(client);
} catch (IOException e) {
if (isGoaway(e)) {
logger.debug("Received GOAWAY from server " + uri.getHost()
+ " will retry with Http/1...");
TimeUnit.SECONDS.sleep(1);
return doHead(clientHttp1);
}
};
throw e;
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

private Response doHead(HttpClient httpClient) throws IOException, InterruptedException {
HttpResponse<Void> response = httpClient.send(
builder.method("HEAD", BodyPublishers.noBody()).timeout(Duration.ofSeconds(TIMEOUT_SECONDS))
.build(),
BodyHandlers.discarding());
return new ResponseImplementation<>(response) {
@Override
public void close() {
// nothing...
}

@Override
public void transferTo(OutputStream outputStream, ContentEncoding transportEncoding)
throws IOException {
throw new IOException("HEAD returns no body");
}
};
}

}

private static abstract class ResponseImplementation<T> implements Response<T> {
private static abstract class ResponseImplementation<T> implements Response {
private final HttpResponse<T> response;

private ResponseImplementation(HttpResponse<T> response) {
Expand All @@ -190,11 +222,6 @@ public Map<String, List<String>> headers() {
return response.headers().map();
}

@Override
public T body() {
return response.body();
}

@Override
public String getHeader(String header) {
return response.headers().firstValue(header).orElse(null);
Expand Down
Expand Up @@ -12,39 +12,32 @@
*******************************************************************************/
package org.eclipse.tycho.p2maven.transport;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.List;
import java.util.Map;

public interface Response<T> extends AutoCloseable {

int statusCode() throws IOException;

Map<String, List<String>> headers();

@Override
void close();

T body() throws IOException;

URI getURI();
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

public interface Response extends Headers {

default void transferTo(OutputStream outputStream) throws IOException {
String encoding = getHeader(Headers.HEADER_CONTENT_ENCODING);
if (Headers.ENCODING_GZIP.equals(encoding)) {
transferTo(outputStream, GZIPInputStream::new);
} else if (encoding == null || encoding.isEmpty() || Headers.ENCODING_IDENTITY.equals(encoding)) {
transferTo(outputStream, stream -> stream);
} else {
throw new IOException("Unknown content encoding: " + encoding);
}
}

String getHeader(String header);
void transferTo(OutputStream outputStream, ContentEncoding transportEncoding) throws IOException;

long getLastModified();
interface ContentEncoding {
InputStream decode(InputStream raw) throws IOException;
}

default void checkResponseCode() throws FileNotFoundException, IOException {
int code = statusCode();
if (code >= HttpURLConnection.HTTP_BAD_REQUEST) {
if (code == HttpURLConnection.HTTP_NOT_FOUND || code == HttpURLConnection.HTTP_GONE) {
throw new FileNotFoundException(getURI().toString());
} else {
throw new java.io.IOException("Server returned HTTP code: " + code + " for URL " + getURI().toString());
}
}
interface ResponseConsumer<T> {
T handleResponse(Response response) throws IOException;
}

}

0 comments on commit 7a5858b

Please sign in to comment.