Skip to content

Commit

Permalink
[LOGGING] snap pipeline errors and peer reputation (#6924)
Browse files Browse the repository at this point in the history
* add logs for snap errors

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>

* add logs for requests that are returning with errors

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>

* spotless

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>

* turn down the logging

Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>

* transaction message sender logging

Signed-off-by: Jason Frame <jason.frame@consensys.net>

* log peer with peer reputation event

Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>

* formatting

Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>

---------

Signed-off-by: Gabriel Fukushima <gabrielfukushima@gmail.com>
Signed-off-by: Sally MacFarlane <macfarla.github@gmail.com>
Signed-off-by: Jason Frame <jason.frame@consensys.net>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
Co-authored-by: Jason Frame <jason.frame@consensys.net>
  • Loading branch information
3 people committed May 8, 2024
1 parent 892fc5e commit 8601438
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 17 deletions.
Expand Up @@ -215,7 +215,7 @@ public void recordRequestTimeout(final int requestCode) {
.addArgument(this::getLoggableId)
.log();
LOG.trace("Timed out while waiting for response from peer {}", this);
reputation.recordRequestTimeout(requestCode).ifPresent(this::disconnect);
reputation.recordRequestTimeout(requestCode, this).ifPresent(this::disconnect);
}

public void recordUselessResponse(final String requestType) {
Expand All @@ -224,7 +224,7 @@ public void recordUselessResponse(final String requestType) {
.addArgument(requestType)
.addArgument(this::getLoggableId)
.log();
reputation.recordUselessResponse(System.currentTimeMillis()).ifPresent(this::disconnect);
reputation.recordUselessResponse(System.currentTimeMillis(), this).ifPresent(this::disconnect);
}

public void recordUsefulResponse() {
Expand Down
Expand Up @@ -62,13 +62,15 @@ public PeerReputation(final int initialScore, final int maxScore) {
this.score = initialScore;
}

public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
public Optional<DisconnectReason> recordRequestTimeout(
final int requestCode, final EthPeer peer) {
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}",
"Disconnection triggered by {} repeated timeouts for requestCode {} for peer {}",
newTimeoutCount,
requestCode);
requestCode,
peer.getLoggableId());
score -= LARGE_ADJUSTMENT;
return Optional.of(DisconnectReason.TIMEOUT);
} else {
Expand All @@ -89,14 +91,17 @@ public Map<Integer, AtomicInteger> timeoutCounts() {
return timeoutCountByRequestType;
}

public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
public Optional<DisconnectReason> recordUselessResponse(
final long timestamp, final EthPeer peer) {
uselessResponseTimes.add(timestamp);
while (shouldRemove(uselessResponseTimes.peek(), timestamp)) {
uselessResponseTimes.poll();
}
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
score -= LARGE_ADJUSTMENT;
LOG.debug("Disconnection triggered by exceeding useless response threshold");
LOG.debug(
"Disconnection triggered by exceeding useless response threshold for peer {}",
peer.getLoggableId());
return Optional.of(DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
} else {
score -= SMALL_ADJUSTMENT;
Expand Down
Expand Up @@ -105,6 +105,14 @@ public CompletableFuture<Task<SnapDataRequest>> requestAccount(
accountDataRequest.addResponse(
worldStateProofProvider, response.accounts(), response.proofs());
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling account download accounts ({} - {}) task: {}")
.addArgument(accountDataRequest.getStartKeyHash())
.addArgument(accountDataRequest.getEndKeyHash())
.addArgument(error)
.log();
}
return requestTask;
});
}
Expand Down Expand Up @@ -167,6 +175,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestStorage(
LOG.error("Error while processing storage range response", e);
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling storage range request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down Expand Up @@ -200,6 +214,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestCode(
}
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling code request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down Expand Up @@ -240,6 +260,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestTrieNodeByPath(
}
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling trie node request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down
Expand Up @@ -52,11 +52,13 @@ void sendTransactionsToPeer(final EthPeer peer) {
LOG.atTrace()
.setMessage(
"Sending transactions to peer {} all transactions count {}, "
+ "single message transactions {}, single message list {}")
+ "single message transactions {}, single message list {}, transactions {}, AgreedCapabilities {}")
.addArgument(peer)
.addArgument(allTxToSend::size)
.addArgument(includedTransactions::size)
.addArgument(() -> toHashList(includedTransactions))
.addArgument(() -> includedTransactions)
.addArgument(peer::getAgreedCapabilities)
.log();
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
try {
Expand Down
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.TIMEOUT;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES;
import static org.mockito.Mockito.mock;

import org.hyperledger.besu.ethereum.eth.messages.EthPV62;

Expand All @@ -28,6 +29,7 @@ public class PeerReputationTest {
private static final int INITIAL_SCORE = 25;
private static final int MAX_SCORE = 50;
private final PeerReputation reputation = new PeerReputation(INITIAL_SCORE, MAX_SCORE);
private final EthPeer mockEthPeer = mock(EthPeer.class);

@Test
public void shouldThrowOnInvalidInitialScore() {
Expand All @@ -37,16 +39,19 @@ public void shouldThrowOnInvalidInitialScore() {
@Test
public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
sendRequestTimeouts(EthPV62.GET_BLOCK_HEADERS, PeerReputation.TIMEOUT_THRESHOLD - 1);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldTrackTimeoutsSeparatelyForDifferentRequestTypes() {
sendRequestTimeouts(EthPV62.GET_BLOCK_HEADERS, PeerReputation.TIMEOUT_THRESHOLD - 1);
sendRequestTimeouts(EthPV62.GET_BLOCK_BODIES, PeerReputation.TIMEOUT_THRESHOLD - 1);

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
Expand All @@ -55,14 +60,16 @@ public void shouldResetTimeoutCountForRequestType() {
sendRequestTimeouts(EthPV62.GET_BLOCK_BODIES, PeerReputation.TIMEOUT_THRESHOLD - 1);

reputation.resetTimeoutCount(EthPV62.GET_BLOCK_HEADERS);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldOnlyDisconnectWhenEmptyResponseThresholdReached() {
sendUselessResponses(1001, PeerReputation.USELESS_RESPONSE_THRESHOLD - 1);
assertThat(reputation.recordUselessResponse(1005)).contains(USELESS_PEER_USELESS_RESPONSES);
assertThat(reputation.recordUselessResponse(1005, mockEthPeer))
.contains(USELESS_PEER_USELESS_RESPONSES);
}

@Test
Expand All @@ -73,7 +80,7 @@ public void shouldDiscardEmptyResponseRecordsAfterTimeWindowElapses() {
// But then the next empty response doesn't come in until after the window expires on the first
assertThat(
reputation.recordUselessResponse(
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1))
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1, mockEthPeer))
.isEmpty();
}

Expand All @@ -93,13 +100,13 @@ public void shouldNotIncreaseScoreOverMax() {

private void sendRequestTimeouts(final int requestType, final int repeatCount) {
for (int i = 0; i < repeatCount; i++) {
assertThat(reputation.recordRequestTimeout(requestType)).isEmpty();
assertThat(reputation.recordRequestTimeout(requestType, mockEthPeer)).isEmpty();
}
}

private void sendUselessResponses(final long timestamp, final int repeatCount) {
for (int i = 0; i < repeatCount; i++) {
assertThat(reputation.recordUselessResponse(timestamp + i)).isEmpty();
assertThat(reputation.recordUselessResponse(timestamp + i, mockEthPeer)).isEmpty();
}
}
}

0 comments on commit 8601438

Please sign in to comment.