Skip to content

Commit

Permalink
fix: use exponential backoff strategy for reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumelamirand committed Mar 27, 2024
1 parent 893dba6 commit 052a6ad
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 129 deletions.
Expand Up @@ -45,6 +45,11 @@
<artifactId>gravitee-exchange-connector-embedded</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.gravitee.common</groupId>
<artifactId>gravitee-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand All @@ -65,11 +70,6 @@
<artifactId>rxjava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.gravitee.common</groupId>
<artifactId>gravitee-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.gravitee.exchange</groupId>
<artifactId>gravitee-exchange-api</artifactId>
Expand Down
Expand Up @@ -17,6 +17,7 @@

import static io.gravitee.exchange.api.controller.ws.WebsocketControllerConstants.EXCHANGE_PROTOCOL_HEADER;

import io.gravitee.common.utils.RxHelper;
import io.gravitee.exchange.api.command.Command;
import io.gravitee.exchange.api.command.CommandAdapter;
import io.gravitee.exchange.api.command.CommandHandler;
Expand All @@ -30,7 +31,6 @@
import io.gravitee.exchange.connector.websocket.client.WebSocketConnectorClientFactory;
import io.gravitee.exchange.connector.websocket.exception.WebSocketConnectorException;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.http.WebSocketConnectOptions;
Expand Down Expand Up @@ -87,15 +87,16 @@ public Completable initialize() {
})
);
})
.retryWhen(errors ->
errors.flatMap(err -> {
if (err instanceof WebSocketConnectorException connectorException && connectorException.isRetryable()) {
return Flowable.timer(5000, TimeUnit.MILLISECONDS);
}
log.error("Unable to connect to Exchange Connect Endpoint, stop retrying.");
return Flowable.error(err);
})
);
.retryWhen(
RxHelper.retryExponentialBackoff(
1,
300,
TimeUnit.SECONDS,
0.5,
throwable -> throwable instanceof WebSocketConnectorException connectorException && connectorException.isRetryable()
)
)
.doOnError(throwable -> log.error("Unable to connect to Exchange Connect Endpoint."));
}

private Single<WebSocket> connect() {
Expand All @@ -104,7 +105,7 @@ private Single<WebSocket> connect() {
.switchIfEmpty(
Maybe.fromRunnable(() -> {
throw new WebSocketConnectorException(
"No Exchange Controller Endpoint is defined or available. Please check your configuration",
"No Exchange Controller Endpoint is defined. Please check your configuration",
false
);
})
Expand All @@ -123,28 +124,18 @@ private Single<WebSocket> connect() {
return httpClient
.rxWebSocket(webSocketConnectOptions)
.doOnSuccess(webSocket -> {
webSocketEndpoint.resetRetryCount();
webSocketConnectorClientFactory.resetEndpointRetries();
log.info(
"Connector is now connected to Exchange Controller through websocket via [{}]",
webSocketEndpoint.getUri().toString()
);
})
.onErrorResumeNext(throwable -> {
int retryCount = webSocketEndpoint.getRetryCount();
int maxRetryCount = webSocketEndpoint.getMaxRetryCount();
if (retryCount < maxRetryCount) {
log.error(
"Unable to connect to Exchange Connect Endpoint: {}/{} time, retrying...",
retryCount,
maxRetryCount,
throwable
);
} else {
log.error(
"Unable to connect to Exchange Connect Endpoint. Max retries attempt reached, changing endpoint.",
throwable
);
}
log.error(
"Unable to connect to Exchange Connect Endpoint {} times, retrying...",
webSocketConnectorClientFactory.endpointRetries(),
throwable
);
// Force the HTTP client to close after a defect.
return httpClient
.close()
Expand Down
Expand Up @@ -31,8 +31,6 @@
public class WebSocketClientConfiguration {

public static final String HEADERS_KEY = "connector.ws.headers";
public static final String MAX_RETRY_KEY = "connector.ws.maxRetry";
public static final int MAX_RETRY_DEFAULT = 20;
public static final String TRUST_ALL_KEY = "connector.ws.ssl.trustAll";
public static final boolean TRUST_ALL_DEFAULT = false;
public static final String VERIFY_HOST_KEY = "connector.ws.ssl.verifyHost";
Expand Down Expand Up @@ -76,10 +74,6 @@ private Map<String, String> readHeaders() {
return computedHeaders;
}

public int maxRetry() {
return identifyConfiguration.getProperty(MAX_RETRY_KEY, Integer.class, MAX_RETRY_DEFAULT);
}

public boolean trustAll() {
return identifyConfiguration.getProperty(TRUST_ALL_KEY, Boolean.class, TRUST_ALL_DEFAULT);
}
Expand Down Expand Up @@ -156,8 +150,7 @@ private List<WebSocketEndpoint> readEndpoints() {
List<WebSocketEndpoint> endpointsConfiguration = new ArrayList<>();
List<String> propertyList = identifyConfiguration.getPropertyList(ENDPOINTS_KEY);
if (propertyList != null) {
int maxRetryCount = maxRetry();
endpointsConfiguration.addAll(propertyList.stream().map(url -> new WebSocketEndpoint(url, maxRetryCount)).toList());
endpointsConfiguration.addAll(propertyList.stream().map(WebSocketEndpoint::new).toList());
}
return endpointsConfiguration;
}
Expand Down
Expand Up @@ -53,21 +53,15 @@ public WebSocketEndpoint nextEndpoint() {
if (endpoints.isEmpty()) {
return null;
}
return endpoints.get(Math.abs(counter.getAndIncrement() % endpoints.size()));
}

WebSocketEndpoint endpoint = endpoints.get(Math.abs(counter.getAndIncrement() % endpoints.size()));

endpoint.incrementRetryCount();
if (endpoint.isRemovable()) {
log.info(
"Websocket Exchange Connector connects to endpoint at {} more than {} times. Removing instance...",
endpoint.getUri().toString(),
endpoint.getMaxRetryCount()
);
configuration.endpoints().remove(endpoint);
return nextEndpoint();
}
public void resetEndpointRetries() {
counter.set(0);
}

return endpoint;
public int endpointRetries() {
return counter.get();
}

public HttpClient createHttpClient(WebSocketEndpoint websocketEndpoint) {
Expand Down
Expand Up @@ -29,25 +29,12 @@ public class WebSocketEndpoint {
private static final String HTTPS_SCHEME = "https";
private static final int DEFAULT_HTTP_PORT = 80;
private static final int DEFAULT_HTTPS_PORT = 443;
public static final int DEFAULT_MAX_RETRY_COUNT = 5;

private final URI uri;
private final int maxRetryCount;
private int retryCount;

@Builder
public WebSocketEndpoint(final String url, final int maxRetryCount) {
public WebSocketEndpoint(final String url) {
this.uri = URI.create(url);
this.maxRetryCount = maxRetryCount > 0 ? maxRetryCount : DEFAULT_MAX_RETRY_COUNT;
this.retryCount = 0;
}

public void incrementRetryCount() {
this.retryCount++;
}

public void resetRetryCount() {
this.retryCount = 0;
}

public int getPort() {
Expand All @@ -67,8 +54,4 @@ public String getHost() {
public String resolvePath(String path) {
return uri.resolve(path).getRawPath();
}

public boolean isRemovable() {
return this.retryCount > maxRetryCount;
}
}
Expand Up @@ -17,7 +17,11 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.gravitee.exchange.api.command.Command;
import io.gravitee.exchange.api.command.hello.HelloReply;
import io.gravitee.exchange.api.configuration.IdentifyConfiguration;
import io.gravitee.exchange.api.websocket.protocol.ProtocolAdapter;
import io.gravitee.exchange.api.websocket.protocol.ProtocolExchange;
import io.gravitee.exchange.api.websocket.protocol.ProtocolVersion;
import io.gravitee.exchange.connector.websocket.client.WebSocketClientConfiguration;
import io.gravitee.exchange.connector.websocket.client.WebSocketConnectorClientFactory;
Expand Down Expand Up @@ -85,7 +89,40 @@ void should_not_fail_with_timeout_when_initializing_connector_without_endpoint()
}

@Test
void should_reconnect_after_unexpected_close(VertxTestContext testContext) throws InterruptedException {
void should_not_reconnect_after_hello_handshake_failure(VertxTestContext testContext) throws InterruptedException {
AtomicReference<ServerWebSocket> ws = new AtomicReference<>();
Checkpoint checkpoint = testContext.checkpoint(1);
websocketServerHandler =
serverWebSocket -> {
ProtocolAdapter protocolAdapter = protocolAdapter(ProtocolVersion.V1);
serverWebSocket.binaryMessageHandler(buffer -> {
ProtocolExchange websocketExchange = protocolAdapter.read(buffer);
Command<?> command = websocketExchange.asCommand();
HelloReply helloReply = new HelloReply(command.getId(), "onError");
serverWebSocket
.writeBinaryMessage(
protocolAdapter.write(
ProtocolExchange
.builder()
.type(ProtocolExchange.Type.REPLY)
.exchangeType(helloReply.getType())
.exchange(helloReply)
.build()
)
)
.subscribe();
});
ws.set(serverWebSocket);
checkpoint.flag();
};
// Initialize first
websocketExchangeConnector.initialize().test().awaitDone(10, TimeUnit.SECONDS).assertError(WebSocketConnectorException.class);

assertThat(testContext.awaitCompletion(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void should_retry_initialize_on_retryable_exception(VertxTestContext testContext) throws InterruptedException {
AtomicReference<ServerWebSocket> ws = new AtomicReference<>();
Checkpoint checkpoint = testContext.checkpoint(2);
websocketServerHandler =
Expand Down
Expand Up @@ -72,17 +72,6 @@ void should_return_empty_headers_without_configuration() {
assertThat(cut.headers()).isEmpty();
}

@Test
void should_return_max_retry() {
environment.withProperty("%s.connector.ws.maxRetry".formatted(prefix), "123456");
assertThat(cut.maxRetry()).isEqualTo(123456);
}

@Test
void should_return_default_max_retry_without_configuration() {
assertThat(cut.maxRetry()).isEqualTo(5);
}

@Test
void should_return_trust_all() {
environment.withProperty("%s.connector.ws.ssl.trustAll".formatted(prefix), "true");
Expand Down
Expand Up @@ -78,13 +78,15 @@ void should_return_next_endpoint() {
}

@Test
void should_return_null_when_max_retry_reach() {
void should_loop_over_endpoint_on_each_next_endpoint() {
environment.setProperty("exchange.connector.ws.endpoints[0]", "http://endpoint:1234");
for (int i = 0; i < WebSocketEndpoint.DEFAULT_MAX_RETRY_COUNT; i++) {
cut.nextEndpoint();
}
WebSocketEndpoint webSocketEndpoint = cut.nextEndpoint();
assertThat(webSocketEndpoint).isNull();
environment.setProperty("exchange.connector.ws.endpoints[1]", "http://endpoint:5678");
WebSocketEndpoint webSocketEndpoint1 = cut.nextEndpoint();
WebSocketEndpoint webSocketEndpoint2 = cut.nextEndpoint();
WebSocketEndpoint webSocketEndpoint3 = cut.nextEndpoint();
assertThat(webSocketEndpoint1.getPort()).isEqualTo(1234);
assertThat(webSocketEndpoint2.getPort()).isEqualTo(5678);
assertThat(webSocketEndpoint3.getPort()).isEqualTo(1234);
}

@Test
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.util.stream.Stream;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -47,51 +46,15 @@ private static Stream<Arguments> urls() {
@ParameterizedTest
@MethodSource("urls")
void should_create_default_websocket_endpoint(String baseUrl, String host, int port) {
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl, -1);
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl);
assertThat(endpoint.getHost()).isEqualTo(host);
assertThat(endpoint.getPort()).isEqualTo(port);
assertThat(endpoint.getMaxRetryCount()).isEqualTo(5);
assertThat(endpoint.getRetryCount()).isZero();
}

@ParameterizedTest
@MethodSource("urls")
void should_resolve_path(String baseUrl) {
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl, -1);
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl);
assertThat(endpoint.resolvePath("/path")).isEqualTo("/path");
}

@Test
void should_create_websocket_endpoint_with_max_retry() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", 10);
assertThat(endpoint.getMaxRetryCount()).isEqualTo(10);
assertThat(endpoint.getRetryCount()).isZero();
}

@Test
void should_increment_retry_counter() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", -1);
assertThat(endpoint.getRetryCount()).isZero();
assertThat(endpoint.getMaxRetryCount()).isEqualTo(5);
endpoint.incrementRetryCount();
assertThat(endpoint.getRetryCount()).isEqualTo(1);
}

@Test
void should_resent__retry_counter() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", -1);
endpoint.incrementRetryCount();
assertThat(endpoint.getRetryCount()).isEqualTo(1);
endpoint.resetRetryCount();
assertThat(endpoint.getRetryCount()).isZero();
}

@Test
void should_be_removable_when_retry_is_higher_than_max() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", 1);
endpoint.incrementRetryCount();
assertThat(endpoint.isRemovable()).isFalse();
endpoint.incrementRetryCount();
assertThat(endpoint.isRemovable()).isTrue();
}
}
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -35,7 +35,7 @@

<properties>
<gravitee-bom.version>6.0.35</gravitee-bom.version>
<gravitee-common.version>3.4.1</gravitee-common.version>
<gravitee-common.version>4.2.0</gravitee-common.version>
<gravitee-node.version>5.10.0</gravitee-node.version>

<commons-lang3.version>3.14.0</commons-lang3.version>
Expand Down

0 comments on commit 052a6ad

Please sign in to comment.