Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
OlegDokuka committed May 18, 2023
1 parent 5547cb1 commit 9a43436
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 30 deletions.
Expand Up @@ -142,7 +142,7 @@ public final void request(long n) {

this.requested = Operators.addCap(this.requested, n);

long previousState = addRequestN(STATE, this, n, this.requesterLeaseTracker == null);
long previousState = markRequestAdded(STATE, this, n, this.requesterLeaseTracker == null);
if (isTerminated(previousState)) {
return;
}
Expand Down
Expand Up @@ -183,7 +183,7 @@ public void request(long n) {

this.requested = Operators.addCap(this.requested, n);

long previousState = StateUtils.addRequestN(STATE, this, n);
long previousState = StateUtils.markRequestAdded(STATE, this, n);
if (isTerminated(previousState)) {
// full termination can be the result of both sides completion / cancelFrame / remote or local
// error
Expand Down
Expand Up @@ -138,7 +138,7 @@ public final void request(long n) {

final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker;
final boolean leaseEnabled = requesterLeaseTracker != null;
final long previousState = addRequestN(STATE, this, n, !leaseEnabled);
final long previousState = markRequestAdded(STATE, this, n, !leaseEnabled);

if (isTerminated(previousState) || hasRequested(previousState)) {
return;
Expand Down
Expand Up @@ -61,11 +61,15 @@ final class RequestStreamRequesterFlux extends Flux<Payload>
static final AtomicLongFieldUpdater<RequestStreamRequesterFlux> STATE =
AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "state");

volatile long requested;
static final AtomicLongFieldUpdater<RequestStreamRequesterFlux> REQUESTED =
AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "requested");


int streamId;
CoreSubscriber<? super Payload> inboundSubscriber;
CompositeByteBuf frames;
boolean done;
long requested;
long produced;

RequestStreamRequesterFlux(Payload payload, RequesterResponderSupport requesterResponderSupport) {
Expand Down Expand Up @@ -136,18 +140,19 @@ public final void request(long n) {
return;
}

this.requested = Operators.addCap(this.requested, n);
if (Operators.addCap(REQUESTED, this, n) > 0) {
return;
}

final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker;
final boolean leaseEnabled = requesterLeaseTracker != null;
final long previousState = addRequestN(STATE, this, n, !leaseEnabled);
final long previousState = markRequestAdded(STATE, this, !leaseEnabled);
if (isTerminated(previousState)) {
return;
}

if (hasRequested(previousState)) {
if (isFirstFrameSent(previousState)
&& !isMaxAllowedRequestN(extractRequestN(previousState))) {
if (isFirstFrameSent(previousState)) {
final int streamId = this.streamId;
final ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n);
this.connection.sendFrame(streamId, requestNFrame);
Expand All @@ -171,7 +176,7 @@ public boolean handlePermit() {
return false;
}

sendFirstPayload(this.payload, extractRequestN(previousState));
sendFirstPayload(this.payload, this.requested);
return true;
}

Expand Down Expand Up @@ -209,6 +214,9 @@ void sendFirstPayload(Payload payload, long initialRequestN) {
}

try {
if (initialRequestN != Long.MAX_VALUE) {
REQUESTED.addAndGet()
}
sendReleasingPayload(
streamId,
FrameType.REQUEST_STREAM,
Expand Down
28 changes: 7 additions & 21 deletions rsocket-core/src/main/java/io/rsocket/core/StateUtils.java
Expand Up @@ -437,12 +437,12 @@ static boolean isSubscribedOrTerminated(long state) {
return state == TERMINATED_STATE || (state & SUBSCRIBED_FLAG) == SUBSCRIBED_FLAG;
}

static <T> long addRequestN(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) {
return addRequestN(updater, instance, toAdd, false);
static <T> long markRequestAdded(AtomicLongFieldUpdater<T> updater, T instance) {
return markRequestAdded(updater, instance, false);
}

static <T> long addRequestN(
AtomicLongFieldUpdater<T> updater, T instance, long toAdd, boolean markPrepared) {
static <T> long markRequestAdded(
AtomicLongFieldUpdater<T> updater, T instance, boolean markPrepared) {
long currentState, flags, requestN, nextRequestN;
for (; ; ) {
currentState = updater.get(instance);
Expand All @@ -457,16 +457,16 @@ static <T> long addRequestN(
}

flags = (currentState & FLAGS_MASK) | (markPrepared ? READY_TO_SEND_FIRST_FRAME_FLAG : 0);
nextRequestN = addRequestN(requestN, toAdd);
nextRequestN = incrementRequestField(requestN);

if (updater.compareAndSet(instance, currentState, nextRequestN | flags)) {
return currentState;
}
}
}

static long addRequestN(long a, long b) {
long res = a + b;
static long incrementRequestField(long a) {
long res = a + 1;
if (res < 0 || res > REQUEST_MASK) {
return REQUEST_MASK;
}
Expand All @@ -476,18 +476,4 @@ static long addRequestN(long a, long b) {
static boolean hasRequested(long state) {
return (state & REQUEST_MASK) > 0;
}

static long extractRequestN(long state) {
long requestN = state & REQUEST_MASK;

if (requestN == REQUEST_MASK) {
return REQUEST_MASK;
}

return requestN;
}

static boolean isMaxAllowedRequestN(long n) {
return n >= REQUEST_MASK;
}
}

0 comments on commit 9a43436

Please sign in to comment.