diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/pom.xml b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/pom.xml index 5958923..30bfc56 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/pom.xml +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/pom.xml @@ -45,6 +45,11 @@ gravitee-exchange-connector-embedded ${project.version} + + io.gravitee.common + gravitee-common + provided + org.springframework spring-context @@ -65,11 +70,6 @@ rxjava provided - - io.gravitee.common - gravitee-common - test - io.gravitee.exchange gravitee-exchange-api diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnector.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnector.java index 18392f0..dbb90b3 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnector.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnector.java @@ -17,6 +17,7 @@ import static io.gravitee.exchange.api.controller.ws.WebsocketControllerConstants.EXCHANGE_PROTOCOL_HEADER; +import io.gravitee.common.utils.RxHelper; import io.gravitee.exchange.api.command.Command; import io.gravitee.exchange.api.command.CommandAdapter; import io.gravitee.exchange.api.command.CommandHandler; @@ -30,7 +31,6 @@ import io.gravitee.exchange.connector.websocket.client.WebSocketConnectorClientFactory; import io.gravitee.exchange.connector.websocket.exception.WebSocketConnectorException; import io.reactivex.rxjava3.core.Completable; -import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; import io.vertx.core.http.WebSocketConnectOptions; @@ -87,15 +87,16 @@ public Completable initialize() { }) ); }) - .retryWhen(errors -> - errors.flatMap(err -> { - if (err instanceof WebSocketConnectorException connectorException && connectorException.isRetryable()) { - return Flowable.timer(5000, TimeUnit.MILLISECONDS); - } - log.error("Unable to connect to Exchange Connect Endpoint, stop retrying."); - return Flowable.error(err); - }) - ); + .retryWhen( + RxHelper.retryExponentialBackoff( + 1, + 300, + TimeUnit.SECONDS, + 0.5, + throwable -> throwable instanceof WebSocketConnectorException connectorException && connectorException.isRetryable() + ) + ) + .doOnError(throwable -> log.error("Unable to connect to Exchange Connect Endpoint.")); } private Single connect() { @@ -104,7 +105,7 @@ private Single connect() { .switchIfEmpty( Maybe.fromRunnable(() -> { throw new WebSocketConnectorException( - "No Exchange Controller Endpoint is defined or available. Please check your configuration", + "No Exchange Controller Endpoint is defined. Please check your configuration", false ); }) @@ -123,28 +124,18 @@ private Single connect() { return httpClient .rxWebSocket(webSocketConnectOptions) .doOnSuccess(webSocket -> { - webSocketEndpoint.resetRetryCount(); + webSocketConnectorClientFactory.resetEndpointRetries(); log.info( "Connector is now connected to Exchange Controller through websocket via [{}]", webSocketEndpoint.getUri().toString() ); }) .onErrorResumeNext(throwable -> { - int retryCount = webSocketEndpoint.getRetryCount(); - int maxRetryCount = webSocketEndpoint.getMaxRetryCount(); - if (retryCount < maxRetryCount) { - log.error( - "Unable to connect to Exchange Connect Endpoint: {}/{} time, retrying...", - retryCount, - maxRetryCount, - throwable - ); - } else { - log.error( - "Unable to connect to Exchange Connect Endpoint. Max retries attempt reached, changing endpoint.", - throwable - ); - } + log.error( + "Unable to connect to Exchange Connect Endpoint {} times, retrying...", + webSocketConnectorClientFactory.endpointRetries(), + throwable + ); // Force the HTTP client to close after a defect. return httpClient .close() diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfiguration.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfiguration.java index d2378c9..d3a111f 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfiguration.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfiguration.java @@ -31,8 +31,6 @@ public class WebSocketClientConfiguration { public static final String HEADERS_KEY = "connector.ws.headers"; - public static final String MAX_RETRY_KEY = "connector.ws.maxRetry"; - public static final int MAX_RETRY_DEFAULT = 5; public static final String TRUST_ALL_KEY = "connector.ws.ssl.trustAll"; public static final boolean TRUST_ALL_DEFAULT = false; public static final String VERIFY_HOST_KEY = "connector.ws.ssl.verifyHost"; @@ -76,10 +74,6 @@ private Map readHeaders() { return computedHeaders; } - public int maxRetry() { - return identifyConfiguration.getProperty(MAX_RETRY_KEY, Integer.class, MAX_RETRY_DEFAULT); - } - public boolean trustAll() { return identifyConfiguration.getProperty(TRUST_ALL_KEY, Boolean.class, TRUST_ALL_DEFAULT); } @@ -156,8 +150,7 @@ private List readEndpoints() { List endpointsConfiguration = new ArrayList<>(); List propertyList = identifyConfiguration.getPropertyList(ENDPOINTS_KEY); if (propertyList != null) { - int maxRetryCount = maxRetry(); - endpointsConfiguration.addAll(propertyList.stream().map(url -> new WebSocketEndpoint(url, maxRetryCount)).toList()); + endpointsConfiguration.addAll(propertyList.stream().map(WebSocketEndpoint::new).toList()); } return endpointsConfiguration; } diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactory.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactory.java index 25b5c8f..e139ccf 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactory.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactory.java @@ -53,21 +53,15 @@ public WebSocketEndpoint nextEndpoint() { if (endpoints.isEmpty()) { return null; } + return endpoints.get(Math.abs(counter.getAndIncrement() % endpoints.size())); + } - WebSocketEndpoint endpoint = endpoints.get(Math.abs(counter.getAndIncrement() % endpoints.size())); - - endpoint.incrementRetryCount(); - if (endpoint.isRemovable()) { - log.info( - "Websocket Exchange Connector connects to endpoint at {} more than {} times. Removing instance...", - endpoint.getUri().toString(), - endpoint.getMaxRetryCount() - ); - configuration.endpoints().remove(endpoint); - return nextEndpoint(); - } + public void resetEndpointRetries() { + counter.set(0); + } - return endpoint; + public int endpointRetries() { + return counter.get(); } public HttpClient createHttpClient(WebSocketEndpoint websocketEndpoint) { diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpoint.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpoint.java index fefaea9..ced5f46 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpoint.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/main/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpoint.java @@ -29,25 +29,12 @@ public class WebSocketEndpoint { private static final String HTTPS_SCHEME = "https"; private static final int DEFAULT_HTTP_PORT = 80; private static final int DEFAULT_HTTPS_PORT = 443; - public static final int DEFAULT_MAX_RETRY_COUNT = 5; private final URI uri; - private final int maxRetryCount; - private int retryCount; @Builder - public WebSocketEndpoint(final String url, final int maxRetryCount) { + public WebSocketEndpoint(final String url) { this.uri = URI.create(url); - this.maxRetryCount = maxRetryCount > 0 ? maxRetryCount : DEFAULT_MAX_RETRY_COUNT; - this.retryCount = 0; - } - - public void incrementRetryCount() { - this.retryCount++; - } - - public void resetRetryCount() { - this.retryCount = 0; } public int getPort() { @@ -67,8 +54,4 @@ public String getHost() { public String resolvePath(String path) { return uri.resolve(path).getRawPath(); } - - public boolean isRemovable() { - return this.retryCount > maxRetryCount; - } } diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnectorTest.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnectorTest.java index ecc524b..f537af7 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnectorTest.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/WebSocketExchangeConnectorTest.java @@ -17,7 +17,11 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.gravitee.exchange.api.command.Command; +import io.gravitee.exchange.api.command.hello.HelloReply; import io.gravitee.exchange.api.configuration.IdentifyConfiguration; +import io.gravitee.exchange.api.websocket.protocol.ProtocolAdapter; +import io.gravitee.exchange.api.websocket.protocol.ProtocolExchange; import io.gravitee.exchange.api.websocket.protocol.ProtocolVersion; import io.gravitee.exchange.connector.websocket.client.WebSocketClientConfiguration; import io.gravitee.exchange.connector.websocket.client.WebSocketConnectorClientFactory; @@ -85,7 +89,40 @@ void should_not_fail_with_timeout_when_initializing_connector_without_endpoint() } @Test - void should_reconnect_after_unexpected_close(VertxTestContext testContext) throws InterruptedException { + void should_not_reconnect_after_hello_handshake_failure(VertxTestContext testContext) throws InterruptedException { + AtomicReference ws = new AtomicReference<>(); + Checkpoint checkpoint = testContext.checkpoint(1); + websocketServerHandler = + serverWebSocket -> { + ProtocolAdapter protocolAdapter = protocolAdapter(ProtocolVersion.V1); + serverWebSocket.binaryMessageHandler(buffer -> { + ProtocolExchange websocketExchange = protocolAdapter.read(buffer); + Command command = websocketExchange.asCommand(); + HelloReply helloReply = new HelloReply(command.getId(), "onError"); + serverWebSocket + .writeBinaryMessage( + protocolAdapter.write( + ProtocolExchange + .builder() + .type(ProtocolExchange.Type.REPLY) + .exchangeType(helloReply.getType()) + .exchange(helloReply) + .build() + ) + ) + .subscribe(); + }); + ws.set(serverWebSocket); + checkpoint.flag(); + }; + // Initialize first + websocketExchangeConnector.initialize().test().awaitDone(10, TimeUnit.SECONDS).assertError(WebSocketConnectorException.class); + + assertThat(testContext.awaitCompletion(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void should_retry_initialize_on_retryable_exception(VertxTestContext testContext) throws InterruptedException { AtomicReference ws = new AtomicReference<>(); Checkpoint checkpoint = testContext.checkpoint(2); websocketServerHandler = diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfigurationTest.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfigurationTest.java index 4d530ee..65b0b2e 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfigurationTest.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketClientConfigurationTest.java @@ -72,17 +72,6 @@ void should_return_empty_headers_without_configuration() { assertThat(cut.headers()).isEmpty(); } - @Test - void should_return_max_retry() { - environment.withProperty("%s.connector.ws.maxRetry".formatted(prefix), "123456"); - assertThat(cut.maxRetry()).isEqualTo(123456); - } - - @Test - void should_return_default_max_retry_without_configuration() { - assertThat(cut.maxRetry()).isEqualTo(5); - } - @Test void should_return_trust_all() { environment.withProperty("%s.connector.ws.ssl.trustAll".formatted(prefix), "true"); diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactoryTest.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactoryTest.java index 6f3fef3..12337d0 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactoryTest.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketConnectorClientFactoryTest.java @@ -78,13 +78,15 @@ void should_return_next_endpoint() { } @Test - void should_return_null_when_max_retry_reach() { + void should_loop_over_endpoint_on_each_next_endpoint() { environment.setProperty("exchange.connector.ws.endpoints[0]", "http://endpoint:1234"); - for (int i = 0; i < WebSocketEndpoint.DEFAULT_MAX_RETRY_COUNT; i++) { - cut.nextEndpoint(); - } - WebSocketEndpoint webSocketEndpoint = cut.nextEndpoint(); - assertThat(webSocketEndpoint).isNull(); + environment.setProperty("exchange.connector.ws.endpoints[1]", "http://endpoint:5678"); + WebSocketEndpoint webSocketEndpoint1 = cut.nextEndpoint(); + WebSocketEndpoint webSocketEndpoint2 = cut.nextEndpoint(); + WebSocketEndpoint webSocketEndpoint3 = cut.nextEndpoint(); + assertThat(webSocketEndpoint1.getPort()).isEqualTo(1234); + assertThat(webSocketEndpoint2.getPort()).isEqualTo(5678); + assertThat(webSocketEndpoint3.getPort()).isEqualTo(1234); } @Test diff --git a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpointTest.java b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpointTest.java index 7770cec..37d75df 100644 --- a/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpointTest.java +++ b/gravitee-exchange-connector/gravitee-exchange-connector-websocket/src/test/java/io/gravitee/exchange/connector/websocket/client/WebSocketEndpointTest.java @@ -20,7 +20,6 @@ import java.util.stream.Stream; import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.DisplayNameGenerator; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -47,51 +46,15 @@ private static Stream urls() { @ParameterizedTest @MethodSource("urls") void should_create_default_websocket_endpoint(String baseUrl, String host, int port) { - WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl, -1); + WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl); assertThat(endpoint.getHost()).isEqualTo(host); assertThat(endpoint.getPort()).isEqualTo(port); - assertThat(endpoint.getMaxRetryCount()).isEqualTo(5); - assertThat(endpoint.getRetryCount()).isZero(); } @ParameterizedTest @MethodSource("urls") void should_resolve_path(String baseUrl) { - WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl, -1); + WebSocketEndpoint endpoint = new WebSocketEndpoint(baseUrl); assertThat(endpoint.resolvePath("/path")).isEqualTo("/path"); } - - @Test - void should_create_websocket_endpoint_with_max_retry() { - WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", 10); - assertThat(endpoint.getMaxRetryCount()).isEqualTo(10); - assertThat(endpoint.getRetryCount()).isZero(); - } - - @Test - void should_increment_retry_counter() { - WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", -1); - assertThat(endpoint.getRetryCount()).isZero(); - assertThat(endpoint.getMaxRetryCount()).isEqualTo(5); - endpoint.incrementRetryCount(); - assertThat(endpoint.getRetryCount()).isEqualTo(1); - } - - @Test - void should_resent__retry_counter() { - WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", -1); - endpoint.incrementRetryCount(); - assertThat(endpoint.getRetryCount()).isEqualTo(1); - endpoint.resetRetryCount(); - assertThat(endpoint.getRetryCount()).isZero(); - } - - @Test - void should_be_removable_when_retry_is_higher_than_max() { - WebSocketEndpoint endpoint = new WebSocketEndpoint("http://localhost:8062", 1); - endpoint.incrementRetryCount(); - assertThat(endpoint.isRemovable()).isFalse(); - endpoint.incrementRetryCount(); - assertThat(endpoint.isRemovable()).isTrue(); - } } diff --git a/pom.xml b/pom.xml index c62fbce..e44435b 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,7 @@ 6.0.35 - 3.4.1 + 4.2.0 5.10.0 3.14.0