Skip to content

Commit

Permalink
more fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
  • Loading branch information
OlegDokuka committed Jun 9, 2023
1 parent 9a43436 commit ed5bb14
Show file tree
Hide file tree
Showing 14 changed files with 287 additions and 167 deletions.
Expand Up @@ -74,6 +74,10 @@ final class RequestChannelRequesterFlux extends Flux<Payload>
static final AtomicLongFieldUpdater<RequestChannelRequesterFlux> STATE =
AtomicLongFieldUpdater.newUpdater(RequestChannelRequesterFlux.class, "state");

volatile long requestN;
static final AtomicLongFieldUpdater<RequestChannelRequesterFlux> REQUEST_N =
AtomicLongFieldUpdater.newUpdater(RequestChannelRequesterFlux.class, "requestN");

int streamId;

boolean isFirstSignal = true;
Expand Down Expand Up @@ -141,18 +145,22 @@ public final void request(long n) {
}

this.requested = Operators.addCap(this.requested, n);
Operators.addCap(REQUEST_N, this, n);

long previousState = markRequestAdded(STATE, this, n, this.requesterLeaseTracker == null);
long previousState = markRequestAdded(STATE, this, this.requesterLeaseTracker == null);
if (isTerminated(previousState)) {
return;
}

if (hasRequested(previousState)) {
if (isFirstFrameSent(previousState)
&& !isMaxAllowedRequestN(extractRequestN(previousState))) {
final int streamId = this.streamId;
final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n);
this.connection.sendFrame(streamId, requestNFrame);
if (isFirstFrameSent(previousState)) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final int streamId = this.streamId;
final ByteBuf requestNFrame =
RequestNFrameCodec.encode(this.allocator, streamId, requestN);
this.connection.sendFrame(streamId, requestNFrame);
}
}
return;
}
Expand Down Expand Up @@ -192,7 +200,7 @@ public void onNext(Payload p) {
return;
}
// TODO: check if source is Scalar | Callable | Mono
sendFirstPayload(p, extractRequestN(state), false);
sendFirstPayload(p, false);
}
} else {
sendFollowingPayload(p);
Expand All @@ -210,12 +218,11 @@ public boolean handlePermit() {
final Payload firstPayload = this.firstPayload;
this.firstPayload = null;

sendFirstPayload(
firstPayload, extractRequestN(previousState), isOutboundTerminated(previousState));
sendFirstPayload(firstPayload, isOutboundTerminated(previousState));
return true;
}

void sendFirstPayload(Payload firstPayload, long initialRequestN, boolean completed) {
void sendFirstPayload(Payload firstPayload, boolean completed) {
int mtu = this.mtu;
try {
if (!isValid(mtu, this.maxFrameLength, firstPayload, true)) {
Expand Down Expand Up @@ -304,6 +311,8 @@ void sendFirstPayload(Payload firstPayload, long initialRequestN, boolean comple
requestInterceptor.onStart(streamId, FrameType.REQUEST_CHANNEL, firstPayload.metadata());
}

long initialRequestN = extractRequestN(REQUEST_N, this);

try {
sendReleasingPayload(
streamId,
Expand Down Expand Up @@ -387,21 +396,17 @@ void sendFirstPayload(Payload firstPayload, long initialRequestN, boolean comple
connection.sendFrame(streamId, completeFrame);
}

if (isMaxAllowedRequestN(initialRequestN)) {
return;
}

long requestN = extractRequestN(previousState);
if (isMaxAllowedRequestN(requestN)) {
final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
connection.sendFrame(streamId, requestNFrame);
if (initialRequestN > Integer.MAX_VALUE) {
return;
}

if (requestN > initialRequestN) {
final ByteBuf requestNFrame =
RequestNFrameCodec.encode(allocator, streamId, requestN - initialRequestN);
connection.sendFrame(streamId, requestNFrame);
long requestedTimes = requestedTimes(previousState);
if (requestedTimes > 1) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
connection.sendFrame(streamId, requestNFrame);
}
}
}

Expand Down Expand Up @@ -731,6 +736,16 @@ public final void handlePayload(Payload value) {

this.produced = produced + 1;

if (this.produced % LIMIT == 0) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final int streamId = this.streamId;
final ByteBuf requestNFrame =
RequestNFrameCodec.encode(this.allocator, streamId, requestN);
this.connection.sendFrame(streamId, requestNFrame);
}
}

this.inboundSubscriber.onNext(value);
}
}
Expand Down
Expand Up @@ -73,6 +73,10 @@ final class RequestChannelResponderSubscriber extends Flux<Payload>
static final AtomicLongFieldUpdater<RequestChannelResponderSubscriber> STATE =
AtomicLongFieldUpdater.newUpdater(RequestChannelResponderSubscriber.class, "state");

volatile long requestN;
static final AtomicLongFieldUpdater<RequestChannelResponderSubscriber> REQUEST_N =
AtomicLongFieldUpdater.newUpdater(RequestChannelResponderSubscriber.class, "requestN");

Payload firstPayload;

Subscription outboundSubscription;
Expand Down Expand Up @@ -182,8 +186,9 @@ public void request(long n) {
}

this.requested = Operators.addCap(this.requested, n);
Operators.addCap(REQUEST_N, this, n);

long previousState = StateUtils.markRequestAdded(STATE, this, n);
long previousState = StateUtils.markRequestAdded(STATE, this);
if (isTerminated(previousState)) {
// full termination can be the result of both sides completion / cancelFrame / remote or local
// error
Expand Down Expand Up @@ -234,11 +239,14 @@ public void request(long n) {
}

if (hasRequested(previousState)) {
if (isFirstFrameSent(previousState)
&& !isMaxAllowedRequestN(StateUtils.extractRequestN(previousState))) {
final int streamId = this.streamId;
final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n);
this.connection.sendFrame(streamId, requestNFrame);
if (isFirstFrameSent(previousState)) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final int streamId = this.streamId;
final ByteBuf requestNFrame =
RequestNFrameCodec.encode(this.allocator, streamId, requestN);
this.connection.sendFrame(streamId, requestNFrame);
}
}
return;
}
Expand Down Expand Up @@ -279,13 +287,13 @@ public void request(long n) {
return;
}

long requestN = StateUtils.extractRequestN(previousState);
if (isMaxAllowedRequestN(requestN)) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN == Long.MAX_VALUE) {
final int streamId = this.streamId;
final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, requestN);
this.connection.sendFrame(streamId, requestNFrame);
} else {
long firstRequestN = requestN - 1;
long firstRequestN = Math.min(Integer.MAX_VALUE, requestN) - 1;
if (firstRequestN > 0) {
final int streamId = this.streamId;
final ByteBuf requestNFrame =
Expand Down Expand Up @@ -480,6 +488,16 @@ final void handlePayload(Payload p) {

this.produced = produced + 1;

if (this.produced % LIMIT == 0) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final int streamId = this.streamId;
final ByteBuf requestNFrame =
RequestNFrameCodec.encode(this.allocator, streamId, requestN);
this.connection.sendFrame(streamId, requestNFrame);
}
}

this.inboundSubscriber.onNext(p);
}
}
Expand Down
Expand Up @@ -138,7 +138,7 @@ public final void request(long n) {

final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker;
final boolean leaseEnabled = requesterLeaseTracker != null;
final long previousState = markRequestAdded(STATE, this, n, !leaseEnabled);
final long previousState = markRequestAdded(STATE, this, !leaseEnabled);

if (isTerminated(previousState) || hasRequested(previousState)) {
return;
Expand Down
Expand Up @@ -61,15 +61,15 @@ final class RequestStreamRequesterFlux extends Flux<Payload>
static final AtomicLongFieldUpdater<RequestStreamRequesterFlux> STATE =
AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "state");

volatile long requested;
static final AtomicLongFieldUpdater<RequestStreamRequesterFlux> REQUESTED =
AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "requested");

volatile long requestN;
static final AtomicLongFieldUpdater<RequestStreamRequesterFlux> REQUEST_N =
AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "requestN");

int streamId;
CoreSubscriber<? super Payload> inboundSubscriber;
CompositeByteBuf frames;
boolean done;
long requested;
long produced;

RequestStreamRequesterFlux(Payload payload, RequesterResponderSupport requesterResponderSupport) {
Expand Down Expand Up @@ -140,9 +140,8 @@ public final void request(long n) {
return;
}

if (Operators.addCap(REQUESTED, this, n) > 0) {
return;
}
this.requested = Operators.addCap(this.requested, n);
Operators.addCap(REQUEST_N, this, n);

final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker;
final boolean leaseEnabled = requesterLeaseTracker != null;
Expand All @@ -153,9 +152,13 @@ public final void request(long n) {

if (hasRequested(previousState)) {
if (isFirstFrameSent(previousState)) {
final int streamId = this.streamId;
final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n);
this.connection.sendFrame(streamId, requestNFrame);
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final int streamId = this.streamId;
final ByteBuf requestNFrame =
RequestNFrameCodec.encode(this.allocator, streamId, requestN);
this.connection.sendFrame(streamId, requestNFrame);
}
}
return;
}
Expand All @@ -165,7 +168,7 @@ public final void request(long n) {
return;
}

sendFirstPayload(this.payload, n);
sendFirstPayload(this.payload);
}

@Override
Expand All @@ -176,11 +179,11 @@ public boolean handlePermit() {
return false;
}

sendFirstPayload(this.payload, this.requested);
sendFirstPayload(this.payload);
return true;
}

void sendFirstPayload(Payload payload, long initialRequestN) {
void sendFirstPayload(Payload payload) {

final RequesterResponderSupport sm = this.requesterResponderSupport;
final DuplexConnection connection = this.connection;
Expand Down Expand Up @@ -213,10 +216,9 @@ void sendFirstPayload(Payload payload, long initialRequestN) {
requestInterceptor.onStart(streamId, FrameType.REQUEST_STREAM, payload.metadata());
}

long initialRequestN = extractRequestN(REQUEST_N, this);

try {
if (initialRequestN != Long.MAX_VALUE) {
REQUESTED.addAndGet()
}
sendReleasingPayload(
streamId,
FrameType.REQUEST_STREAM,
Expand Down Expand Up @@ -257,21 +259,17 @@ void sendFirstPayload(Payload payload, long initialRequestN) {
return;
}

if (isMaxAllowedRequestN(initialRequestN)) {
if (initialRequestN > Integer.MAX_VALUE) {
return;
}

long requestN = extractRequestN(previousState);
if (isMaxAllowedRequestN(requestN)) {
final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
connection.sendFrame(streamId, requestNFrame);
return;
}

if (requestN > initialRequestN) {
final ByteBuf requestNFrame =
RequestNFrameCodec.encode(allocator, streamId, requestN - initialRequestN);
connection.sendFrame(streamId, requestNFrame);
long requestedTimes = requestedTimes(previousState);
if (requestedTimes > 1) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
connection.sendFrame(streamId, requestNFrame);
}
}
}

Expand Down Expand Up @@ -309,6 +307,7 @@ public final void handlePayload(Payload p) {
}

final long produced = this.produced;
// check overflow
if (this.requested == produced) {
p.release();

Expand Down Expand Up @@ -337,6 +336,15 @@ public final void handlePayload(Payload p) {

this.produced = produced + 1;

if (this.produced % LIMIT == 0) {
long requestN = extractRequestN(REQUEST_N, this);
if (requestN > 0) {
final int streamId = this.streamId;
final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, requestN);
this.connection.sendFrame(streamId, requestNFrame);
}
}

this.inboundSubscriber.onNext(p);
}

Expand Down Expand Up @@ -444,7 +452,7 @@ public Object scanUnsafe(Attr key) {
long state = this.state;

if (key == Attr.TERMINATED) return isTerminated(state);
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return extractRequestN(state);
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return this.requested;

return null;
}
Expand Down
25 changes: 25 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/StateUtils.java
Expand Up @@ -476,4 +476,29 @@ static long incrementRequestField(long a) {
static boolean hasRequested(long state) {
return (state & REQUEST_MASK) > 0;
}

static int requestedTimes(long state) {
return (int) (state & REQUEST_MASK);
}

static <T> long extractRequestN(AtomicLongFieldUpdater<T> updater, T instance) {
for (; ; ) {
long requestN = updater.get(instance);

if (requestN == 0) {
return 0;
}

if (requestN == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}

long rsocketRequestN = Math.min(requestN, Integer.MAX_VALUE);
if (updater.compareAndSet(instance, requestN, (requestN - rsocketRequestN))) {
return requestN;
}
}
}

static final long LIMIT = (long) Integer.MAX_VALUE << 2;
}
Expand Up @@ -25,6 +25,6 @@ public static long requestN(ByteBuf byteBuf) {
byteBuf.skipBytes(FrameHeaderCodec.size());
int i = byteBuf.readInt();
byteBuf.resetReaderIndex();
return i == Integer.MAX_VALUE ? Long.MAX_VALUE : i;
return i;
}
}

0 comments on commit ed5bb14

Please sign in to comment.