Skip to content

Commit

Permalink
fixes with UnboundedProcessor and TCP / WS connections design
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka committed Dec 1, 2022
1 parent 608c9eb commit 5b0f592
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 183 deletions.
Expand Up @@ -388,7 +388,7 @@ private void terminate(Throwable e) {
requesterLeaseTracker.dispose(e);
}

final Collection<FrameHandler> activeStreamsCopy;
final Collection<FrameHandler> activeStreamsCopy; // in case of graceful shut down is empty
synchronized (this) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
activeStreamsCopy = new ArrayList<>(activeStreams.values());
Expand Down
Expand Up @@ -135,7 +135,8 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {

public synchronized boolean add(int streamId, FrameHandler frameHandler) {
if (this.terminating) {
throw new CanceledException("Disposed");
throw new CanceledException(
"This RSocket is either disposed or disposing, and no longer accepting new requests");
}

final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
Expand Down
Expand Up @@ -27,8 +27,9 @@ public abstract class BaseDuplexConnection implements DuplexConnection {
protected final UnboundedProcessor sender =
new UnboundedProcessor(
() -> {
System.out.println("queue is done");
onClose.tryEmitEmpty();
dispose();
// dispose();
},
(__) -> {});

Expand Down
Expand Up @@ -203,8 +203,8 @@ public boolean tryEmitFinal(ByteBuf t) {
return false;
}

this.done = true;
this.last = t;
this.done = true;

final long previousState = markValueAddedAndTerminated(this);
if (isFinalized(previousState)) {
Expand All @@ -216,6 +216,7 @@ public boolean tryEmitFinal(ByteBuf t) {
if (this.outputFused) {
// fast path for fusion
this.actual.onNext(null);
this.actual.onComplete();
return true;
}

Expand Down
Expand Up @@ -29,7 +29,6 @@
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -189,7 +188,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
}
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("RESUME_OK frame must be received before any others");
resumableConnection.dispose(connectionErrorException);
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();

Expand Down Expand Up @@ -227,7 +226,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
}
final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e);

resumableConnection.dispose(t);
resumableConnection.dispose(nextDuplexConnection, t);

nextDuplexConnection.sendErrorAndClose(t);
nextDuplexConnection.receive().subscribe().dispose();
Expand Down Expand Up @@ -278,7 +277,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]");

resumableConnection.dispose(connectionErrorException);
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);

nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
Expand All @@ -292,7 +291,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
exception);
}
if (exception instanceof RejectedResumeException) {
resumableConnection.dispose(exception);
resumableConnection.dispose(nextDuplexConnection, exception);
nextDuplexConnection.dispose();
nextDuplexConnection.receive().subscribe().dispose();
return;
Expand All @@ -309,7 +308,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
final ConnectionErrorException connectionErrorException =
new ConnectionErrorException("RESUME_OK frame must be received before any others");

resumableConnection.dispose(connectionErrorException);
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);

nextDuplexConnection.sendErrorAndClose(connectionErrorException);
nextDuplexConnection.receive().subscribe().dispose();
Expand Down Expand Up @@ -349,11 +348,7 @@ public void onError(Throwable t) {
Operators.onErrorDropped(t, currentContext());
}

if (t instanceof TimeoutException) {
resumableConnection.dispose();
} else {
resumableConnection.dispose(t);
}
resumableConnection.dispose();
}

@Override
Expand Down
Expand Up @@ -21,8 +21,8 @@
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.exceptions.ConnectionCloseException;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.internal.UnboundedProcessor;
import java.net.SocketAddress;
Expand Down Expand Up @@ -50,8 +50,8 @@ public class ResumableDuplexConnection extends Flux<ByteBuf>
final ResumableFramesStore resumableFramesStore;

final UnboundedProcessor savableFramesSender;
final Disposable framesSaverDisposable;
final Sinks.Empty<Void> onClose;
final Sinks.Empty<Void> onQueueClose;
final Sinks.Empty<Void> onLastConnectionClose;
final SocketAddress remoteAddress;
final Sinks.Many<Integer> onConnectionClosedSink;

Expand Down Expand Up @@ -79,11 +79,13 @@ public ResumableDuplexConnection(
this.session = session.toString(CharsetUtil.UTF_8);
this.onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer();
this.resumableFramesStore = resumableFramesStore;
this.savableFramesSender = new UnboundedProcessor();
this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe();
this.onClose = Sinks.empty();
this.onQueueClose = Sinks.unsafe().empty();
this.onLastConnectionClose = Sinks.unsafe().empty();
this.savableFramesSender = new UnboundedProcessor(onQueueClose::tryEmitEmpty, __ -> {});
this.remoteAddress = initialConnection.remoteAddress();

resumableFramesStore.saveFrames(savableFramesSender).subscribe();

ACTIVE_CONNECTION.lazySet(this, initialConnection);
}

Expand Down Expand Up @@ -120,10 +122,12 @@ void initConnection(DuplexConnection nextConnection) {
.resumeStream()
.subscribe(
f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f),
t -> sendErrorAndClose(new ConnectionErrorException(t.getMessage())),
t ->
nextConnection.sendErrorAndClose(
new ConnectionErrorException(t.getMessage(), t)),
() ->
sendErrorAndClose(
new ConnectionCloseException("Connection Closed Unexpectedly")));
nextConnection.sendErrorAndClose(
new ConnectionErrorException("Connection Closed Unexpectedly")));
nextConnection.receive().subscribe(frameReceivingSubscriber);
nextConnection
.onClose()
Expand Down Expand Up @@ -161,9 +165,9 @@ public void disconnect() {
@Override
public void sendFrame(int streamId, ByteBuf frame) {
if (streamId == 0) {
savableFramesSender.onNextPrioritized(frame);
savableFramesSender.tryEmitPrioritized(frame);
} else {
savableFramesSender.onNext(frame);
savableFramesSender.tryEmitNormal(frame);
}
}

Expand All @@ -184,32 +188,25 @@ public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
return;
}

activeConnection.sendErrorAndClose(rSocketErrorException);
savableFramesSender.tryEmitFinal(
ErrorFrameCodec.encode(activeConnection.alloc(), 0, rSocketErrorException));

activeConnection
.onClose()
.subscribe(
null,
t -> {
framesSaverDisposable.dispose();
activeReceivingSubscriber.dispose();
savableFramesSender.onComplete();
savableFramesSender.cancel();
onConnectionClosedSink.tryEmitComplete();

onClose.tryEmitError(t);
onLastConnectionClose.tryEmitEmpty();
},
() -> {
framesSaverDisposable.dispose();
activeReceivingSubscriber.dispose();
savableFramesSender.onComplete();
savableFramesSender.cancel();
onConnectionClosedSink.tryEmitComplete();

final Throwable cause = rSocketErrorException.getCause();
if (cause == null) {
onClose.tryEmitEmpty();
onLastConnectionClose.tryEmitEmpty();
} else {
onClose.tryEmitError(cause);
onLastConnectionClose.tryEmitError(cause);
}
});
}
Expand All @@ -226,50 +223,62 @@ public ByteBufAllocator alloc() {

@Override
public Mono<Void> onClose() {
return onClose.asMono();
return Mono.whenDelayError(
onQueueClose.asMono().log(side + "_queue"),
resumableFramesStore.onClose().log(side + "_frame_store"),
onLastConnectionClose.asMono().log(side + "_last_connection"));
}

@Override
public void dispose() {
dispose(null);
}

void dispose(@Nullable Throwable e) {
logger.info(side + "_disposing");
final DuplexConnection activeConnection =
ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
if (activeConnection == DisposedConnection.INSTANCE) {
return;
}

if (activeConnection != null) {
activeConnection.dispose();
}

if (logger.isDebugEnabled()) {
logger.debug(
"Side[{}]|Session[{}]|DuplexConnection[{}]. Disposing...",
side,
session,
connectionIndex);
}

framesSaverDisposable.dispose();
activeReceivingSubscriber.dispose();
logger.info(side + "_disposing2");
savableFramesSender.onComplete();
savableFramesSender.cancel();
onConnectionClosedSink.tryEmitComplete();
activeConnection
.onClose()
.subscribe(
null,
t -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
},
() -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
});
}

if (e != null) {
onClose.tryEmitError(e);
} else {
onClose.tryEmitEmpty();
void dispose(DuplexConnection nextConnection, @Nullable Throwable e) {
final DuplexConnection activeConnection =
ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
if (activeConnection == DisposedConnection.INSTANCE) {
return;
}
savableFramesSender.onComplete();
nextConnection
.onClose()
.subscribe(
null,
t -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
},
() -> {
onConnectionClosedSink.tryEmitComplete();
onLastConnectionClose.tryEmitEmpty();
});
}

@Override
@SuppressWarnings("ConstantConditions")
public boolean isDisposed() {
return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED);
return onQueueClose.scan(Scannable.Attr.TERMINATED)
|| onQueueClose.scan(Scannable.Attr.CANCELLED);
}

@Override
Expand All @@ -280,6 +289,7 @@ public SocketAddress remoteAddress() {
@Override
public void request(long n) {
if (state == 1 && STATE.compareAndSet(this, 1, 2)) {
// happens for the very first time with the initial connection
initConnection(this.activeConnection);
}
}
Expand Down
Expand Up @@ -29,35 +29,39 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Mono;

public final class ClientStreamingToServer {

private static final Logger logger = LoggerFactory.getLogger(ClientStreamingToServer.class);

public static void main(String[] args) throws InterruptedException {
RSocketServer.create(
SocketAcceptor.with(
new RSocket() {
final Sinks.Empty<Void> onGracefulShutdownSink = Sinks.unsafe().empty();
(setup, sendingSocket) -> {
sendingSocket.disposeGracefully();

@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.interval(Duration.ofMillis(100))
.takeUntilOther(onGracefulShutdownSink.asMono())
.map(aLong -> DefaultPayload.create("Interval: " + aLong));
}
return Mono.just(
new RSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.interval(Duration.ofMillis(100))
.map(aLong -> DefaultPayload.create("Interval: " + aLong));
}

@Override
public void disposeGracefully() {
// can be intercepted there
// onGracefulShutdownSink.tryEmitEmpty();
}
}))
@Override
public void disposeGracefully() {}
});
})
.bindNow(TcpServerTransport.create("localhost", 7000));

RSocket socket =
RSocketConnector.create()
.acceptor(
SocketAcceptor.with(
new RSocket() {
@Override
public void disposeGracefully() {}
}))
.setupPayload(DefaultPayload.create("test", "test"))
.connect(TcpClientTransport.create("localhost", 7000))
.block();
Expand All @@ -73,11 +77,14 @@ public void disposeGracefully() {
logger.debug(msg);
counter.incrementAndGet();
})
.take(100)
.subscribe();

logger.debug("dispose gracefully");
socket.disposeGracefully();
//
// Mono.delay(Duration.ofSeconds(10))
// .doFinally((__) -> socket.dispose())
// .subscribe();

socket.onClose().block();

Expand Down

0 comments on commit 5b0f592

Please sign in to comment.