Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LOGGING] snap pipeline errors and peer reputation #6924

Merged
merged 16 commits into from May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -215,7 +215,7 @@ public void recordRequestTimeout(final int requestCode) {
.addArgument(this::getLoggableId)
.log();
LOG.trace("Timed out while waiting for response from peer {}", this);
reputation.recordRequestTimeout(requestCode).ifPresent(this::disconnect);
reputation.recordRequestTimeout(requestCode, this).ifPresent(this::disconnect);
}

public void recordUselessResponse(final String requestType) {
Expand All @@ -224,7 +224,7 @@ public void recordUselessResponse(final String requestType) {
.addArgument(requestType)
.addArgument(this::getLoggableId)
.log();
reputation.recordUselessResponse(System.currentTimeMillis()).ifPresent(this::disconnect);
reputation.recordUselessResponse(System.currentTimeMillis(), this).ifPresent(this::disconnect);
}

public void recordUsefulResponse() {
Expand Down
Expand Up @@ -62,13 +62,15 @@ public PeerReputation(final int initialScore, final int maxScore) {
this.score = initialScore;
}

public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
public Optional<DisconnectReason> recordRequestTimeout(
final int requestCode, final EthPeer peer) {
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}",
"Disconnection triggered by {} repeated timeouts for requestCode {} for peer {}",
newTimeoutCount,
requestCode);
requestCode,
peer.getLoggableId());
score -= LARGE_ADJUSTMENT;
return Optional.of(DisconnectReason.TIMEOUT);
} else {
Expand All @@ -89,14 +91,17 @@ public Map<Integer, AtomicInteger> timeoutCounts() {
return timeoutCountByRequestType;
}

public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
public Optional<DisconnectReason> recordUselessResponse(
final long timestamp, final EthPeer peer) {
uselessResponseTimes.add(timestamp);
while (shouldRemove(uselessResponseTimes.peek(), timestamp)) {
uselessResponseTimes.poll();
}
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
score -= LARGE_ADJUSTMENT;
LOG.debug("Disconnection triggered by exceeding useless response threshold");
LOG.debug(
"Disconnection triggered by exceeding useless response threshold for peer {}",
peer.getLoggableId());
return Optional.of(DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
} else {
score -= SMALL_ADJUSTMENT;
Expand Down
Expand Up @@ -105,6 +105,14 @@ public CompletableFuture<Task<SnapDataRequest>> requestAccount(
accountDataRequest.addResponse(
worldStateProofProvider, response.accounts(), response.proofs());
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling account download accounts ({} - {}) task: {}")
.addArgument(accountDataRequest.getStartKeyHash())
.addArgument(accountDataRequest.getEndKeyHash())
.addArgument(error)
.log();
}
return requestTask;
});
}
Expand Down Expand Up @@ -167,6 +175,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestStorage(
LOG.error("Error while processing storage range response", e);
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling storage range request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down Expand Up @@ -200,6 +214,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestCode(
}
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling code request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down Expand Up @@ -240,6 +260,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestTrieNodeByPath(
}
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling trie node request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down
Expand Up @@ -52,11 +52,13 @@ void sendTransactionsToPeer(final EthPeer peer) {
LOG.atTrace()
.setMessage(
"Sending transactions to peer {} all transactions count {}, "
+ "single message transactions {}, single message list {}")
+ "single message transactions {}, single message list {}, transactions {}, AgreedCapabilities {}")
.addArgument(peer)
.addArgument(allTxToSend::size)
.addArgument(includedTransactions::size)
.addArgument(() -> toHashList(includedTransactions))
.addArgument(() -> includedTransactions)
.addArgument(peer::getAgreedCapabilities)
.log();
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
try {
Expand Down
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.TIMEOUT;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES;
import static org.mockito.Mockito.mock;

import org.hyperledger.besu.ethereum.eth.messages.EthPV62;

Expand All @@ -28,6 +29,7 @@ public class PeerReputationTest {
private static final int INITIAL_SCORE = 25;
private static final int MAX_SCORE = 50;
private final PeerReputation reputation = new PeerReputation(INITIAL_SCORE, MAX_SCORE);
private final EthPeer mockEthPeer = mock(EthPeer.class);

@Test
public void shouldThrowOnInvalidInitialScore() {
Expand All @@ -37,16 +39,19 @@ public void shouldThrowOnInvalidInitialScore() {
@Test
public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
sendRequestTimeouts(EthPV62.GET_BLOCK_HEADERS, PeerReputation.TIMEOUT_THRESHOLD - 1);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldTrackTimeoutsSeparatelyForDifferentRequestTypes() {
sendRequestTimeouts(EthPV62.GET_BLOCK_HEADERS, PeerReputation.TIMEOUT_THRESHOLD - 1);
sendRequestTimeouts(EthPV62.GET_BLOCK_BODIES, PeerReputation.TIMEOUT_THRESHOLD - 1);

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
Expand All @@ -55,14 +60,16 @@ public void shouldResetTimeoutCountForRequestType() {
sendRequestTimeouts(EthPV62.GET_BLOCK_BODIES, PeerReputation.TIMEOUT_THRESHOLD - 1);

reputation.resetTimeoutCount(EthPV62.GET_BLOCK_HEADERS);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldOnlyDisconnectWhenEmptyResponseThresholdReached() {
sendUselessResponses(1001, PeerReputation.USELESS_RESPONSE_THRESHOLD - 1);
assertThat(reputation.recordUselessResponse(1005)).contains(USELESS_PEER_USELESS_RESPONSES);
assertThat(reputation.recordUselessResponse(1005, mockEthPeer))
.contains(USELESS_PEER_USELESS_RESPONSES);
}

@Test
Expand All @@ -73,7 +80,7 @@ public void shouldDiscardEmptyResponseRecordsAfterTimeWindowElapses() {
// But then the next empty response doesn't come in until after the window expires on the first
assertThat(
reputation.recordUselessResponse(
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1))
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1, mockEthPeer))
.isEmpty();
}

Expand All @@ -93,13 +100,13 @@ public void shouldNotIncreaseScoreOverMax() {

private void sendRequestTimeouts(final int requestType, final int repeatCount) {
for (int i = 0; i < repeatCount; i++) {
assertThat(reputation.recordRequestTimeout(requestType)).isEmpty();
assertThat(reputation.recordRequestTimeout(requestType, mockEthPeer)).isEmpty();
}
}

private void sendUselessResponses(final long timestamp, final int repeatCount) {
for (int i = 0; i < repeatCount; i++) {
assertThat(reputation.recordUselessResponse(timestamp + i)).isEmpty();
assertThat(reputation.recordUselessResponse(timestamp + i, mockEthPeer)).isEmpty();
}
}
}