Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Peer Logging #6831

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -215,7 +215,7 @@ public void recordRequestTimeout(final int requestCode) {
.addArgument(this::getLoggableId)
.log();
LOG.trace("Timed out while waiting for response from peer {}", this);
reputation.recordRequestTimeout(requestCode).ifPresent(this::disconnect);
reputation.recordRequestTimeout(requestCode, this).ifPresent(this::disconnect);
}

public void recordUselessResponse(final String requestType) {
Expand All @@ -224,7 +224,7 @@ public void recordUselessResponse(final String requestType) {
.addArgument(requestType)
.addArgument(this::getLoggableId)
.log();
reputation.recordUselessResponse(System.currentTimeMillis()).ifPresent(this::disconnect);
reputation.recordUselessResponse(System.currentTimeMillis(), this).ifPresent(this::disconnect);
}

public void recordUsefulResponse() {
Expand Down
Expand Up @@ -62,13 +62,15 @@ public PeerReputation(final int initialScore, final int maxScore) {
this.score = initialScore;
}

public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
public Optional<DisconnectReason> recordRequestTimeout(
final int requestCode, final EthPeer peer) {
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}",
"Disconnection triggered by {} repeated timeouts for requestCode {} for peer {}",
newTimeoutCount,
requestCode);
requestCode,
peer.getLoggableId());
score -= LARGE_ADJUSTMENT;
return Optional.of(DisconnectReason.TIMEOUT);
} else {
Expand All @@ -89,14 +91,17 @@ public Map<Integer, AtomicInteger> timeoutCounts() {
return timeoutCountByRequestType;
}

public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
public Optional<DisconnectReason> recordUselessResponse(
final long timestamp, final EthPeer peer) {
uselessResponseTimes.add(timestamp);
while (shouldRemove(uselessResponseTimes.peek(), timestamp)) {
uselessResponseTimes.poll();
}
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
score -= LARGE_ADJUSTMENT;
LOG.debug("Disconnection triggered by exceeding useless response threshold");
LOG.debug(
"Disconnection triggered by exceeding useless response threshold for peer {}",
peer.getLoggableId());
return Optional.of(DisconnectReason.USELESS_PEER);
} else {
score -= SMALL_ADJUSTMENT;
Expand Down
Expand Up @@ -52,11 +52,13 @@ void sendTransactionsToPeer(final EthPeer peer) {
LOG.atTrace()
.setMessage(
"Sending transactions to peer {} all transactions count {}, "
+ "single message transactions {}, single message list {}")
+ "single message transactions {}, single message list {}, transactions {}, AgreedCapabilities {}")
.addArgument(peer)
.addArgument(allTxToSend::size)
.addArgument(includedTransactions::size)
.addArgument(() -> toHashList(includedTransactions))
.addArgument(() -> includedTransactions)
.addArgument(peer::getAgreedCapabilities)
.log();
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
try {
Expand Down
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.TIMEOUT;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.USELESS_PEER;
import static org.mockito.Mockito.mock;

import org.hyperledger.besu.ethereum.eth.messages.EthPV62;

Expand All @@ -28,6 +29,7 @@ public class PeerReputationTest {
private static final int INITIAL_SCORE = 25;
private static final int MAX_SCORE = 50;
private final PeerReputation reputation = new PeerReputation(INITIAL_SCORE, MAX_SCORE);
private final EthPeer mockEthPeer = mock(EthPeer.class);

@Test
public void shouldThrowOnInvalidInitialScore() {
Expand All @@ -36,56 +38,60 @@ public void shouldThrowOnInvalidInitialScore() {

@Test
public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldTrackTimeoutsSeparatelyForDifferentRequestTypes() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer)).isEmpty();

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldResetTimeoutCountForRequestType() {
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer)).isEmpty();

reputation.resetTimeoutCount(EthPV62.GET_BLOCK_HEADERS);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldOnlyDisconnectWhenEmptyResponseThresholdReached() {
assertThat(reputation.recordUselessResponse(1001)).isEmpty();
assertThat(reputation.recordUselessResponse(1002)).isEmpty();
assertThat(reputation.recordUselessResponse(1003)).isEmpty();
assertThat(reputation.recordUselessResponse(1004)).isEmpty();
assertThat(reputation.recordUselessResponse(1005)).contains(USELESS_PEER);
assertThat(reputation.recordUselessResponse(1001, mockEthPeer)).isEmpty();
assertThat(reputation.recordUselessResponse(1002, mockEthPeer)).isEmpty();
assertThat(reputation.recordUselessResponse(1003, mockEthPeer)).isEmpty();
assertThat(reputation.recordUselessResponse(1004, mockEthPeer)).isEmpty();
assertThat(reputation.recordUselessResponse(1005, mockEthPeer)).contains(USELESS_PEER);
}

@Test
public void shouldDiscardEmptyResponseRecordsAfterTimeWindowElapses() {
// Bring it to the brink of disconnection.
assertThat(reputation.recordUselessResponse(1001)).isEmpty();
assertThat(reputation.recordUselessResponse(1002)).isEmpty();
assertThat(reputation.recordUselessResponse(1003)).isEmpty();
assertThat(reputation.recordUselessResponse(1004)).isEmpty();
assertThat(reputation.recordUselessResponse(1001, mockEthPeer)).isEmpty();
assertThat(reputation.recordUselessResponse(1002, mockEthPeer)).isEmpty();
assertThat(reputation.recordUselessResponse(1003, mockEthPeer)).isEmpty();
assertThat(reputation.recordUselessResponse(1004, mockEthPeer)).isEmpty();

// But then the next empty response doesn't come in until after the window expires on the first
assertThat(
reputation.recordUselessResponse(
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1))
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1, mockEthPeer))
.isEmpty();
}

Expand Down