Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use exponential backoff strategy for reconnection retry #25

Merged
merged 1 commit into from Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -45,6 +45,11 @@
<artifactId>gravitee-exchange-connector-embedded</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.gravitee.common</groupId>
<artifactId>gravitee-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
Expand All @@ -65,11 +70,6 @@
<artifactId>rxjava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.gravitee.common</groupId>
<artifactId>gravitee-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.gravitee.exchange</groupId>
<artifactId>gravitee-exchange-api</artifactId>
Expand Down
Expand Up @@ -17,6 +17,7 @@

import static io.gravitee.exchange.api.controller.ws.WebsocketControllerConstants.EXCHANGE_PROTOCOL_HEADER;

import io.gravitee.common.utils.RxHelper;
import io.gravitee.exchange.api.command.Command;
import io.gravitee.exchange.api.command.CommandAdapter;
import io.gravitee.exchange.api.command.CommandHandler;
Expand All @@ -30,7 +31,6 @@
import io.gravitee.exchange.connector.websocket.client.WebSocketConnectorClientFactory;
import io.gravitee.exchange.connector.websocket.exception.WebSocketConnectorException;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.http.WebSocketConnectOptions;
Expand Down Expand Up @@ -87,15 +87,16 @@ public Completable initialize() {
})
);
})
.retryWhen(errors ->
errors.flatMap(err -> {
if (err instanceof WebSocketConnectorException connectorException && connectorException.isRetryable()) {
return Flowable.timer(5000, TimeUnit.MILLISECONDS);
}
log.error("Unable to connect to Exchange Connect Endpoint, stop retrying.");
return Flowable.error(err);
})
);
.retryWhen(
RxHelper.retryExponentialBackoff(
1,
300,
TimeUnit.SECONDS,
0.5,
throwable -> throwable instanceof WebSocketConnectorException connectorException && connectorException.isRetryable()
)
)
.doOnError(throwable -> log.error("Unable to connect to Exchange Connect Endpoint."));
}

private Single<WebSocket> connect() {
Expand All @@ -104,7 +105,7 @@ private Single<WebSocket> connect() {
.switchIfEmpty(
Maybe.fromRunnable(() -> {
throw new WebSocketConnectorException(
"No Exchange Controller Endpoint is defined or available. Please check your configuration",
"No Exchange Controller Endpoint is defined. Please check your configuration",
false
);
})
Expand All @@ -123,28 +124,18 @@ private Single<WebSocket> connect() {
return httpClient
.rxWebSocket(webSocketConnectOptions)
.doOnSuccess(webSocket -> {
webSocketEndpoint.resetRetryCount();
webSocketConnectorClientFactory.resetEndpointRetries();
log.info(
"Connector is now connected to Exchange Controller through websocket via [{}]",
webSocketEndpoint.getUri().toString()
);
})
.onErrorResumeNext(throwable -> {
int retryCount = webSocketEndpoint.getRetryCount();
int maxRetryCount = webSocketEndpoint.getMaxRetryCount();
if (retryCount < maxRetryCount) {
log.error(
"Unable to connect to Exchange Connect Endpoint: {}/{} time, retrying...",
retryCount,
maxRetryCount,
throwable
);
} else {
log.error(
"Unable to connect to Exchange Connect Endpoint. Max retries attempt reached, changing endpoint.",
throwable
);
}
log.error(
"Unable to connect to Exchange Connect Endpoint {} times, retrying...",
webSocketConnectorClientFactory.endpointRetries(),
throwable
);
// Force the HTTP client to close after a defect.
return httpClient
.close()
Expand Down
Expand Up @@ -31,8 +31,6 @@
public class WebSocketClientConfiguration {

public static final String HEADERS_KEY = "connector.ws.headers";
public static final String MAX_RETRY_KEY = "connector.ws.maxRetry";
public static final int MAX_RETRY_DEFAULT = 5;
public static final String TRUST_ALL_KEY = "connector.ws.ssl.trustAll";
public static final boolean TRUST_ALL_DEFAULT = false;
public static final String VERIFY_HOST_KEY = "connector.ws.ssl.verifyHost";
Expand Down Expand Up @@ -76,10 +74,6 @@ private Map<String, String> readHeaders() {
return computedHeaders;
}

public int maxRetry() {
return identifyConfiguration.getProperty(MAX_RETRY_KEY, Integer.class, MAX_RETRY_DEFAULT);
}

public boolean trustAll() {
return identifyConfiguration.getProperty(TRUST_ALL_KEY, Boolean.class, TRUST_ALL_DEFAULT);
}
Expand Down Expand Up @@ -156,8 +150,7 @@ private List<WebSocketEndpoint> readEndpoints() {
List<WebSocketEndpoint> endpointsConfiguration = new ArrayList<>();
List<String> propertyList = identifyConfiguration.getPropertyList(ENDPOINTS_KEY);
if (propertyList != null) {
int maxRetryCount = maxRetry();
endpointsConfiguration.addAll(propertyList.stream().map(url -> new WebSocketEndpoint(url, maxRetryCount)).toList());
endpointsConfiguration.addAll(propertyList.stream().map(WebSocketEndpoint::new).toList());
}
return endpointsConfiguration;
}
Expand Down
Expand Up @@ -53,21 +53,15 @@ public WebSocketEndpoint nextEndpoint() {
if (endpoints.isEmpty()) {
return null;
}
return endpoints.get(Math.abs(counter.getAndIncrement() % endpoints.size()));
}

WebSocketEndpoint endpoint = endpoints.get(Math.abs(counter.getAndIncrement() % endpoints.size()));

endpoint.incrementRetryCount();
if (endpoint.isRemovable()) {
log.info(
"Websocket Exchange Connector connects to endpoint at {} more than {} times. Removing instance...",
endpoint.getUri().toString(),
endpoint.getMaxRetryCount()
);
configuration.endpoints().remove(endpoint);
return nextEndpoint();
}
public void resetEndpointRetries() {
counter.set(0);
}

return endpoint;
public int endpointRetries() {
return counter.get();
}

public HttpClient createHttpClient(WebSocketEndpoint websocketEndpoint) {
Expand Down
Expand Up @@ -29,25 +29,12 @@ public class WebSocketEndpoint {
private static final String HTTPS_SCHEME = "https";
private static final int DEFAULT_HTTP_PORT = 80;
private static final int DEFAULT_HTTPS_PORT = 443;
public static final int DEFAULT_MAX_RETRY_COUNT = 5;

private final URI uri;
private final int maxRetryCount;
private int retryCount;

@Builder
public WebSocketEndpoint(final String url, final int maxRetryCount) {
public WebSocketEndpoint(final String url) {
this.uri = URI.create(url);
this.maxRetryCount = maxRetryCount > 0 ? maxRetryCount : DEFAULT_MAX_RETRY_COUNT;
this.retryCount = 0;
}

public void incrementRetryCount() {
this.retryCount++;
}

public void resetRetryCount() {
this.retryCount = 0;
}

public int getPort() {
Expand All @@ -67,8 +54,4 @@ public String getHost() {
public String resolvePath(String path) {
return uri.resolve(path).getRawPath();
}

public boolean isRemovable() {
return this.retryCount > maxRetryCount;
}
}
Expand Up @@ -17,7 +17,11 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.gravitee.exchange.api.command.Command;
import io.gravitee.exchange.api.command.hello.HelloReply;
import io.gravitee.exchange.api.configuration.IdentifyConfiguration;
import io.gravitee.exchange.api.websocket.protocol.ProtocolAdapter;
import io.gravitee.exchange.api.websocket.protocol.ProtocolExchange;
import io.gravitee.exchange.api.websocket.protocol.ProtocolVersion;
import io.gravitee.exchange.connector.websocket.client.WebSocketClientConfiguration;
import io.gravitee.exchange.connector.websocket.client.WebSocketConnectorClientFactory;
Expand Down Expand Up @@ -85,7 +89,40 @@ void should_not_fail_with_timeout_when_initializing_connector_without_endpoint()
}

@Test
void should_reconnect_after_unexpected_close(VertxTestContext testContext) throws InterruptedException {
void should_not_reconnect_after_hello_handshake_failure(VertxTestContext testContext) throws InterruptedException {
AtomicReference<ServerWebSocket> ws = new AtomicReference<>();
Checkpoint checkpoint = testContext.checkpoint(1);
websocketServerHandler =
serverWebSocket -> {
ProtocolAdapter protocolAdapter = protocolAdapter(ProtocolVersion.V1);
serverWebSocket.binaryMessageHandler(buffer -> {
ProtocolExchange websocketExchange = protocolAdapter.read(buffer);
Command<?> command = websocketExchange.asCommand();
HelloReply helloReply = new HelloReply(command.getId(), "onError");
serverWebSocket
.writeBinaryMessage(
protocolAdapter.write(
ProtocolExchange
.builder()
.type(ProtocolExchange.Type.REPLY)
.exchangeType(helloReply.getType())
.exchange(helloReply)
.build()
)
)
.subscribe();
});
ws.set(serverWebSocket);
checkpoint.flag();
};
// Initialize first
websocketExchangeConnector.initialize().test().awaitDone(10, TimeUnit.SECONDS).assertError(WebSocketConnectorException.class);

assertThat(testContext.awaitCompletion(10, TimeUnit.SECONDS)).isTrue();
}

@Test
void should_retry_initialize_on_retryable_exception(VertxTestContext testContext) throws InterruptedException {
AtomicReference<ServerWebSocket> ws = new AtomicReference<>();
Checkpoint checkpoint = testContext.checkpoint(2);
websocketServerHandler =
Expand Down
Expand Up @@ -72,17 +72,6 @@ void should_return_empty_headers_without_configuration() {
assertThat(cut.headers()).isEmpty();
}

@Test
void should_return_max_retry() {
environment.withProperty("%s.connector.ws.maxRetry".formatted(prefix), "123456");
assertThat(cut.maxRetry()).isEqualTo(123456);
}

@Test
void should_return_default_max_retry_without_configuration() {
assertThat(cut.maxRetry()).isEqualTo(5);
}

@Test
void should_return_trust_all() {
environment.withProperty("%s.connector.ws.ssl.trustAll".formatted(prefix), "true");
Expand Down
Expand Up @@ -78,13 +78,15 @@ void should_return_next_endpoint() {
}

@Test
void should_return_null_when_max_retry_reach() {
void should_loop_over_endpoint_on_each_next_endpoint() {
environment.setProperty("exchange.connector.ws.endpoints[0]", "http://endpoint:1234");
for (int i = 0; i < WebSocketEndpoint.DEFAULT_MAX_RETRY_COUNT; i++) {
cut.nextEndpoint();
}
WebSocketEndpoint webSocketEndpoint = cut.nextEndpoint();
assertThat(webSocketEndpoint).isNull();
environment.setProperty("exchange.connector.ws.endpoints[1]", "http://endpoint:5678");
WebSocketEndpoint webSocketEndpoint1 = cut.nextEndpoint();
WebSocketEndpoint webSocketEndpoint2 = cut.nextEndpoint();
WebSocketEndpoint webSocketEndpoint3 = cut.nextEndpoint();
assertThat(webSocketEndpoint1.getPort()).isEqualTo(1234);
assertThat(webSocketEndpoint2.getPort()).isEqualTo(5678);
assertThat(webSocketEndpoint3.getPort()).isEqualTo(1234);
}

@Test
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.util.stream.Stream;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -47,51 +46,15 @@ private static Stream<Arguments> urls() {
@ParameterizedTest
@MethodSource("urls")
void should_create_default_websocket_endpoint(String baseUrl, String host, int port) {
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl, -1);
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl);
assertThat(endpoint.getHost()).isEqualTo(host);
assertThat(endpoint.getPort()).isEqualTo(port);
assertThat(endpoint.getMaxRetryCount()).isEqualTo(5);
assertThat(endpoint.getRetryCount()).isZero();
}

@ParameterizedTest
@MethodSource("urls")
void should_resolve_path(String baseUrl) {
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl, -1);
WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl);
assertThat(endpoint.resolvePath("/path")).isEqualTo("/path");
}

@Test
void should_create_websocket_endpoint_with_max_retry() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", 10);
assertThat(endpoint.getMaxRetryCount()).isEqualTo(10);
assertThat(endpoint.getRetryCount()).isZero();
}

@Test
void should_increment_retry_counter() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", -1);
assertThat(endpoint.getRetryCount()).isZero();
assertThat(endpoint.getMaxRetryCount()).isEqualTo(5);
endpoint.incrementRetryCount();
assertThat(endpoint.getRetryCount()).isEqualTo(1);
}

@Test
void should_resent__retry_counter() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", -1);
endpoint.incrementRetryCount();
assertThat(endpoint.getRetryCount()).isEqualTo(1);
endpoint.resetRetryCount();
assertThat(endpoint.getRetryCount()).isZero();
}

@Test
void should_be_removable_when_retry_is_higher_than_max() {
WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", 1);
endpoint.incrementRetryCount();
assertThat(endpoint.isRemovable()).isFalse();
endpoint.incrementRetryCount();
assertThat(endpoint.isRemovable()).isTrue();
}
}
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -35,7 +35,7 @@

<properties>
<gravitee-bom.version>6.0.35</gravitee-bom.version>
<gravitee-common.version>3.4.1</gravitee-common.version>
<gravitee-common.version>4.2.0</gravitee-common.version>
<gravitee-node.version>5.10.0</gravitee-node.version>

<commons-lang3.version>3.14.0</commons-lang3.version>
Expand Down