Skip to content

Commit

Permalink
[FLINK-35302][rest] Ignore unknown fields in REST request deserializa…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
vancior98 committed May 13, 2024
1 parent e4972c0 commit 36b1d2a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public abstract class AbstractHandler<

protected final Logger log = LoggerFactory.getLogger(getClass());

protected static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();
protected static final ObjectMapper MAPPER = RestMapperUtils.getFlexibleObjectMapper();

/**
* Other response payload overhead (in bytes). If we truncate response payload, we should leave
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ void testFileMultipart() throws Exception {
fileHandler.getMessageHeaders().getTargetRestEndpointURL(),
new MultipartUploadExtension.TestRequestBody());
try (Response response = client.newCall(jsonRequest).execute()) {
// JSON payload did not match expected format
assertThat(response.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code());
// explicitly rejected by the test handler implementation
assertThat(response.code()).isEqualTo(HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
}

Request fileRequest =
Expand All @@ -347,8 +347,9 @@ void testFileMultipart() throws Exception {
fileHandler.getMessageHeaders().getTargetRestEndpointURL(),
new MultipartUploadExtension.TestRequestBody());
try (Response response = client.newCall(mixedRequest).execute()) {
// JSON payload did not match expected format
assertThat(response.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code());
// unknown field in TestRequestBody is ignored
assertThat(response.code())
.isEqualTo(fileHandler.getMessageHeaders().getResponseStatusCode().code());
}

verifyNoFileIsRegisteredToDeleteOnExitHook();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.FlinkHttpObjectAggregator;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.router.RouteResult;
Expand All @@ -43,10 +44,14 @@

import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpStatusClass;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
Expand All @@ -58,6 +63,7 @@

import java.io.File;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
Expand All @@ -68,6 +74,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -182,6 +189,62 @@ void testFileCleanup(@TempDir File temporaryFolder) throws Exception {
assertThat(Files.exists(file)).isFalse();
}

@Test
void testIgnoringUnknownFields() {
RestfulGateway mockRestfulGateway = new TestingRestfulGateway.Builder().build();

CompletableFuture<Void> requestProcessingCompleteFuture = new CompletableFuture<>();
TestHandler handler =
new TestHandler(requestProcessingCompleteFuture, mockGatewayRetriever);

RouteResult<?> routeResult =
new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), "");
String requestBody = "{\"unknown_field_should_be_ignore\": true}";
HttpRequest request =
new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.POST,
TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
Unpooled.wrappedBuffer(requestBody.getBytes(StandardCharsets.UTF_8)));
RoutedRequest<?> routerRequest = new RoutedRequest<>(routeResult, request);

AtomicReference<HttpResponse> response = new AtomicReference<>();

FlinkHttpObjectAggregator aggregator =
new FlinkHttpObjectAggregator(
RestOptions.SERVER_MAX_CONTENT_LENGTH.defaultValue(),
Collections.emptyMap());
ChannelPipeline pipeline = mock(ChannelPipeline.class);
when(pipeline.get(eq(FlinkHttpObjectAggregator.class))).thenReturn(aggregator);

ChannelFuture succeededFuture = mock(ChannelFuture.class);
when(succeededFuture.isSuccess()).thenReturn(true);

Attribute<FileUploads> attribute = new SimpleAttribute();
Channel channel = mock(Channel.class);
when(channel.attr(any(AttributeKey.class))).thenReturn(attribute);

ChannelHandlerContext context = mock(ChannelHandlerContext.class);
when(context.pipeline()).thenReturn(pipeline);
when(context.channel()).thenReturn(channel);
when(context.write(any()))
.thenAnswer(
invocation -> {
if (invocation.getArguments().length > 0
&& invocation.getArgument(0) instanceof HttpResponse) {
response.set(invocation.getArgument(0));
}
return succeededFuture;
});
when(context.writeAndFlush(any())).thenReturn(succeededFuture);

handler.respondAsLeader(context, routerRequest, mockRestfulGateway);
assertThat(
response.get() == null
|| response.get().status().codeClass() == HttpStatusClass.SUCCESS)
.isTrue();
}

private static class SimpleAttribute implements Attribute<FileUploads> {

private static final AttributeKey<FileUploads> KEY = AttributeKey.valueOf("test");
Expand Down

0 comments on commit 36b1d2a

Please sign in to comment.