Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Apr 25, 2024
1 parent 7cb225e commit cf49232
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,9 @@ private Mono<Void> updateDisposition(String deliveryTag, DeliveryState deliveryS
// RecoverableReactorReceiver, Or downstream cancels parent RecoverableReactorReceiver.
// 1 & 2 means we don't have to read the 'volatile' variables 'parent.done', 'parent.cancelled'
//
final String state
= String.format("[link.done:%b link.cancelled:%b parent.done:%b parent.cancelled:%b]", done,
s == CANCELLED_SUBSCRIPTION, parent.done, parent.cancelled);

final DeliveryNotOnLinkException dispositionError
= DeliveryNotOnLinkException.linkClosed(deliveryTag, deliveryState);
Expand All @@ -994,16 +997,10 @@ private Mono<Void> updateDisposition(String deliveryTag, DeliveryState deliveryS
if (upstreamError != null) {
dispositionError.addSuppressed(upstreamError);
}

LoggingEventBuilder log = logger.atError()
return monoError(logger.atError()
.addKeyValue(DELIVERY_TAG_KEY, deliveryTag)
.addKeyValue(DELIVERY_STATE_KEY, deliveryState)
.addKeyValue("link.done", done)
.addKeyValue("link.cancelled", s == CANCELLED_SUBSCRIPTION)
.addKeyValue("parent.done", parent.done)
.addKeyValue("parent.cancelled", parent.cancelled);

return monoError(log, dispositionError);
.addKeyValue("messageFluxState", state), dispositionError);
}
return receiver.updateDisposition(deliveryTag, deliveryState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void run(Selectable selectable) {
ioSignal.sink().close();
}
} catch (IOException ioException) {
logger.warning("CloseHandler.sink().close() failed with an error.", ioException);
logger.error("CloseHandler.sink().close() failed with an error.", ioException);
}

workScheduler.run(null);
Expand All @@ -257,7 +257,7 @@ public void run(Selectable selectable) {
ioSignal.source().close();
}
} catch (IOException ioException) {
logger.warning("CloseHandler.source().close() failed with an error.", ioException);
logger.error("CloseHandler.source().close() failed with an error.", ioException);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPat
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.addKeyValue(LINK_NAME_KEY, linkName);

return monoError(logBuilder, Exceptions.propagate(new AmqpException(true,
"Cannot create receive link from a closed session.", sessionHandler.getErrorContext())));
return monoError(logBuilder, new AmqpException(true,
"Cannot create receive link from a closed session.", sessionHandler.getErrorContext()));
}

final LinkSubscription<AmqpReceiveLink> existingLink = openReceiveLinks.get(linkName);
Expand Down Expand Up @@ -448,8 +448,8 @@ private Mono<AmqpSendLink> createProducer(String linkName, String entityPath,
.addKeyValue(ENTITY_PATH_KEY, entityPath)
.addKeyValue(LINK_NAME_KEY, linkName);

return monoError(logBuilder, Exceptions.propagate(new AmqpException(true,
"Cannot create send link from a closed session.", sessionHandler.getErrorContext())));
return monoError(logBuilder, new AmqpException(true,
"Cannot create send link from a closed session.", sessionHandler.getErrorContext()));
}

final LinkSubscription<AmqpSendLink> existing = openSendLinks.get(linkName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,9 @@ public Flux<RuleProperties> listRules() {
} else if (statusCode == AmqpResponseCode.NO_CONTENT) {
list = Collections.emptyList();
} else {
throw logger.logExceptionAsWarning(Exceptions.propagate(new AmqpException(true,
throw logger.logExceptionAsWarning(new AmqpException(true,
"Get rules response error. Could not get rules.",
getErrorContext())));
getErrorContext()));
}

return Flux.fromIterable(list);
Expand Down

0 comments on commit cf49232

Please sign in to comment.