Skip to content

Commit

Permalink
When HttpOperations#afterMarkSentHeaders throws an error in HttpOpera…
Browse files Browse the repository at this point in the history
…tions#sendObject, ensure the ByteBuf is released just once (#3246)
  • Loading branch information
violetagg committed May 16, 2024
1 parent 44e3887 commit 311b02a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -151,13 +152,9 @@ public NettyOutbound sendObject(Object message) {
if (markSentHeaderAndBody(b)) {
HttpMessage msg = prepareHttpMessage(b);

try {
afterMarkSentHeaders();
}
catch (RuntimeException e) {
b.release();
throw e;
}
// If afterMarkSentHeaders throws an exception there is no need to release the ByteBuf here.
// It will be released by PostHeadersNettyOutbound as there are on error/cancel hooks
afterMarkSentHeaders();

return channel().writeAndFlush(msg);
}
Expand Down Expand Up @@ -472,7 +469,7 @@ HttpMessage prepareHttpMessage(ByteBuf buffer) {

static final Pattern SCHEME_PATTERN = Pattern.compile("^(https?|wss?)://.*$");

protected static final class PostHeadersNettyOutbound implements NettyOutbound, Consumer<Throwable>, Runnable {
protected static final class PostHeadersNettyOutbound extends AtomicBoolean implements NettyOutbound, Consumer<Throwable>, Runnable {

final Mono<Void> source;
final HttpOperations<?, ?> parent;
Expand All @@ -492,14 +489,14 @@ public PostHeadersNettyOutbound(Mono<Void> source, HttpOperations<?, ?> parent,

@Override
public void run() {
if (msg != null && msg.refCnt() > 0) {
if (msg != null && msg.refCnt() > 0 && compareAndSet(false, true)) {
msg.release();
}
}

@Override
public void accept(Throwable throwable) {
if (msg != null && msg.refCnt() > 0) {
if (msg != null && msg.refCnt() > 0 && compareAndSet(false, true)) {
msg.release();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,30 +23,39 @@
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.zip.GZIPInputStream;

import com.aayushatharva.brotli4j.decoder.DecoderJNI;
import com.aayushatharva.brotli4j.decoder.DirectDecompress;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.netty.BaseHttpTest;
import reactor.netty.DisposableServer;
import reactor.netty.SocketUtils;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerResponse;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

import static org.assertj.core.api.Assertions.assertThat;
import static reactor.netty.NettyPipeline.HttpCodec;

/**
* This test class verifies HTTP compression.
Expand Down Expand Up @@ -542,13 +551,24 @@ void testIssue825_1() {
}

@Test
void testIssue825_2() {
void testIssue825SendMono() {
doTestIssue825_2((b, out) -> out.send(Mono.just(b)));
}

@Test
void testIssue825SendObject() {
doTestIssue825_2((b, out) -> out.sendObject(b));
}

private void doTestIssue825_2(BiFunction<ByteBuf, HttpServerResponse, Publisher<Void>> serverFn) {
int port1 = SocketUtils.findAvailableTcpPort();
int port2 = SocketUtils.findAvailableTcpPort();

AtomicReference<Throwable> error = new AtomicReference<>();
AtomicReference<Throwable> bufferReleasedError = new AtomicReference<>();
DisposableServer server1 = null;
DisposableServer server2 = null;
Sinks.Empty<Void> bufferReleased = Sinks.empty();
try {
server1 =
createServer(port1)
Expand All @@ -557,11 +577,33 @@ void testIssue825_2() {
})
.handle((in, out) ->
createClient(port2)
.doOnChannelInit((obs, ch, addr) ->
ch.pipeline().addAfter(HttpCodec, "doTestIssue825_2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LastHttpContent last = null;
int expectedRefCount = 0;
if (msg instanceof LastHttpContent) {
last = (LastHttpContent) msg;
expectedRefCount = last.content().refCnt() - 1;
}
ctx.fireChannelRead(msg);
if (last != null) {
if (last.content().refCnt() == expectedRefCount) {
bufferReleased.tryEmitEmpty();
}
else {
bufferReleased.tryEmitError(new RuntimeException("The buffer is not released!"));
}
}
}
}))
.get()
.uri("/")
.responseContent()
.retain()
.flatMap(b -> out.send(Mono.just(b)))
.doOnError(bufferReleasedError::set)
.flatMap(b -> serverFn.apply(b, out))
.doOnError(error::set))
.bindNow();

Expand All @@ -579,8 +621,14 @@ void testIssue825_2() {
.expectError()
.verify(Duration.ofSeconds(30));

bufferReleased.asMono()
.as(StepVerifier::create)
.expectComplete()
.verify(Duration.ofSeconds(5));

assertThat(error.get()).isNotNull()
.isInstanceOf(RuntimeException.class);
assertThat(bufferReleasedError.get()).isNull();
}
finally {
if (server1 != null) {
Expand Down

0 comments on commit 311b02a

Please sign in to comment.