Skip to content

Commit

Permalink
Improve Vert.x implementation for edge cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
Chavjoh committed Nov 26, 2023
1 parent c88442b commit 4ab8730
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 36 deletions.
Expand Up @@ -8,14 +8,12 @@
import com.chavaillaz.client.common.AbstractHttpClient;
import com.chavaillaz.client.common.security.Authentication;
import com.fasterxml.jackson.databind.JavaType;
import io.vertx.core.Vertx;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -29,88 +27,89 @@ public class AbstractVertxHttpClient extends AbstractHttpClient implements AutoC
/**
* Creates a new abstract client based on Vert.x HTTP client.
*
* @param vertx The Vert.x instance to create the web client
* @param options The web client options
* @param client The web client to use
* @param baseUrl The base URL of endpoints
* @param authentication The authentication information
*/
protected AbstractVertxHttpClient(Vertx vertx, WebClientOptions options, String baseUrl, Authentication authentication) {
protected AbstractVertxHttpClient(WebClient client, String baseUrl, Authentication authentication) {
super(baseUrl, authentication);
this.client = WebClient.create(vertx, options);
}

/**
* Creates a new abstract client based on Vert.x HTTP client.
*
* @param httpClient The HTTP client to wrap in the Vert.x web client
* @param baseUrl The base URL of endpoints
* @param authentication The authentication information
*/
protected AbstractVertxHttpClient(HttpClient httpClient, String baseUrl, Authentication authentication) {
super(baseUrl, authentication);
this.client = WebClient.wrap(httpClient);
this.client = client;
}

/**
* Creates a request based on the given URL and replaces the parameters in it by the given ones.
*
* @param method The HTTP method for the HTTP request to build
* @param method The HTTP method for the request to build
* @param url The URL with possible parameters in it (using braces)
* @param parameters The parameters value to replace in the URL (in the right order)
* @return The request having the URL and authorization header set
* @return The request having the URL and authentication set
*/
protected HttpRequest<Buffer> requestBuilder(HttpMethod method, String url, Object... parameters) {
var request = client.request(method, url(url, parameters).toString())
var request = client.requestAbs(method, url(url, parameters).toString())
.putHeader(HEADER_CONTENT_TYPE, HEADER_CONTENT_JSON);
getAuthentication().fillHeaders(request::putHeader);
getCookieHeader(getAuthentication()).ifPresent(value -> request.putHeader(HEADER_COOKIE, value));
return request;
}

/**
* Sends a request and returns a domain object.
* Creates a body buffer containing the given object serialized as JSON.
*
* @param request The request
* @param object The object to serialize
* @return The corresponding buffer for the request
*/
protected Buffer body(Object object) {
return Buffer.buffer(serialize(object));
}

/**
* Handles the request sent and returns a domain object.
*
* @param future The future response
* @param returnType The domain object type class
* @param <T> The domain object type
* @return A {@link CompletableFuture} with the deserialized domain object
*/
protected <T> CompletableFuture<T> sendAsync(HttpRequest<Buffer> request, Class<T> returnType) {
return sendAsync(request, objectMapper.constructType(returnType));
protected <T> CompletableFuture<T> handleAsync(Future<HttpResponse<Buffer>> future, Class<T> returnType) {
return handleAsync(future, objectMapper.constructType(returnType));
}

/**
* Sends a request and returns a domain object.
* Handles the request sent and returns a domain object.
*
* @param request The request
* @param future The future response
* @param returnType The domain object type class
* @param <T> The domain object type
* @return A {@link CompletableFuture} with the deserialized domain object
*/
protected <T> CompletableFuture<T> sendAsync(HttpRequest<Buffer> request, JavaType returnType) {
protected <T> CompletableFuture<T> handleAsync(Future<HttpResponse<Buffer>> future, JavaType returnType) {
CompletableFuture<HttpResponse<Buffer>> completableFuture = new CompletableFuture<>();
request.send()
.onSuccess(response -> handleResponse(response, completableFuture))
future.onSuccess(response -> handleResponse(response, completableFuture))
.onFailure(completableFuture::completeExceptionally);
return completableFuture.thenApply(HttpResponse::bodyAsString)
.thenApply(body -> deserialize(body, returnType));
}

/**
* Sends a request and returns an input stream.
* Handles the request sent and returns an input stream.
*
* @param request The request
* @param future The future response
* @return A {@link CompletableFuture} with the input stream
*/
protected CompletableFuture<InputStream> sendAsync(HttpRequest<Buffer> request) {
protected CompletableFuture<InputStream> handleAsync(Future<HttpResponse<Buffer>> future) {
CompletableFuture<HttpResponse<Buffer>> completableFuture = new CompletableFuture<>();
request.send()
.onSuccess(response -> handleResponse(response, completableFuture))
future.onSuccess(response -> handleResponse(response, completableFuture))
.onFailure(completableFuture::completeExceptionally);
return completableFuture.thenApply(HttpResponse::body)
.thenApply(VertxInputStream::new);
}

/**
* Handles the response by transmitting its state to the given {@link CompletableFuture}.
*
* @param response The HTTP response
* @param completableFuture The completable future to update
*/
protected void handleResponse(HttpResponse<Buffer> response, CompletableFuture<HttpResponse<Buffer>> completableFuture) {
if (response.statusCode() >= 400) {
completableFuture.completeExceptionally(responseException(response.statusCode(), response.bodyAsString()));
Expand Down
Expand Up @@ -6,6 +6,9 @@
import io.vertx.core.buffer.Buffer;
import lombok.RequiredArgsConstructor;

/**
* Input stream reading the content of a Vert.x {@link Buffer}.
*/
@RequiredArgsConstructor
public class VertxInputStream extends InputStream {

Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/chavaillaz/client/common/vertx/VertxUtils.java
@@ -1,10 +1,15 @@
package com.chavaillaz.client.common.vertx;

import static java.nio.file.Files.probeContentType;

import java.io.File;
import java.util.Optional;

import com.chavaillaz.client.common.utility.ProxyConfiguration;
import io.vertx.core.net.ProxyOptions;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.multipart.MultipartForm;
import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;

/**
Expand All @@ -30,4 +35,19 @@ public static WebClientOptions newWebClientOptions(ProxyConfiguration proxy) {
.setIdleTimeout(30_000);
}

/**
* Generates a new multipart form with the given files.
*
* @param files The list of files to include
* @return The multipart form
*/
@SneakyThrows
public static MultipartForm multipartWithFiles(File... files) {
MultipartForm form = MultipartForm.create();
for (File file : files) {
form.binaryFileUpload("file", file.getName(), file.getAbsolutePath(), probeContentType(file.toPath()));
}
return form;
}

}

0 comments on commit 4ab8730

Please sign in to comment.