Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry to get peers best block head #6833

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -24,6 +24,7 @@
public abstract class AbstractPeerTask<R> extends AbstractEthTask<PeerTaskResult<R>> {
protected Optional<EthPeer> assignedPeer = Optional.empty();
protected final EthContext ethContext;
protected EthPeer fixedPeer;

protected AbstractPeerTask(final EthContext ethContext, final MetricsSystem metricsSystem) {
super(metricsSystem);
Expand All @@ -35,6 +36,12 @@ public AbstractPeerTask<R> assignPeer(final EthPeer peer) {
return this;
}

/**
 * Assigns {@code peer} to this task and additionally remembers it as the fixed peer.
 *
 * <p>Both {@code assignedPeer} and {@code fixedPeer} are overwritten. A {@code null} argument
 * clears the assignment ({@code assignedPeer} becomes empty and {@code fixedPeer} becomes
 * {@code null}).
 *
 * @param peer the peer to pin this task to; may be {@code null}
 * @return this task, to allow call chaining
 */
public AbstractPeerTask<R> assignFixedPeer(final EthPeer peer) {
  this.fixedPeer = peer;
  this.assignedPeer = Optional.ofNullable(peer);
  return this;
}

public static class PeerTaskResult<T> {
private final EthPeer peer;
private final T result;
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest;
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.plugin.services.MetricsSystem;

Expand Down Expand Up @@ -119,6 +120,9 @@ public static AbstractGetHeadersFromPeerTask forSingleHash(
protected PendingPeerRequest sendRequest() {
return sendRequestToPeer(
peer -> {
if (fixedPeer != null && !peer.equals(fixedPeer)) {
throw new PeerDisconnectedException(fixedPeer);
}
LOG.atTrace()
.setMessage("Requesting {} headers (hash {}...) from peer {}...")
.addArgument(count)
Expand Down
Expand Up @@ -20,11 +20,18 @@
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers.ConnectCallback;
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.slf4j.Logger;
Expand All @@ -33,6 +40,7 @@
public class ChainHeadTracker implements ConnectCallback {

private static final Logger LOG = LoggerFactory.getLogger(ChainHeadTracker.class);
public static final int MAX_RETRIES = 4;

private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
Expand Down Expand Up @@ -70,38 +78,87 @@ public void onPeerConnected(final EthPeer peer) {
.setMessage("Requesting chain head info from {}...")
.addArgument(peer::getLoggableId)
.log();
GetHeadersFromPeerByHashTask.forSingleHash(
createAndRunGetHeaderTask(peer).whenComplete(checkResultAndMaybeRetry(peer, MAX_RETRIES));
}

/**
 * Builds and runs a header request for the best block hash currently recorded in the given
 * peer's chain state, with the task assigned to that same peer.
 *
 * <p>NOTE(review): this span appears to come from a flattened diff view. The inline
 * {@code whenComplete} callback between {@code .run()} and {@code .exceptionally(...)} looks like
 * the pre-change code that the change moves into {@code checkResultAndMaybeRetry}; as shown the
 * chain is terminated by {@code ;} before {@code .exceptionally(...)} and is not a single valid
 * statement. Likewise {@code .assignPeer(peer)} is presumably superseded by
 * {@code .assignFixedPeer(peer)} -- confirm against the real file.
 *
 * @param peer the peer to query
 * @return the future of the started task, with {@code handleException(peer)} attached as its
 *     exceptional-completion handler
 */
private CompletableFuture<AbstractPeerTask.PeerTaskResult<List<BlockHeader>>>
createAndRunGetHeaderTask(final EthPeer peer) {
return GetHeadersFromPeerByHashTask.forSingleHash(
protocolSchedule,
ethContext,
Hash.wrap(peer.chainState().getBestBlock().getHash()),
0,
metricsSystem)
.assignPeer(peer)
.assignFixedPeer(
peer) // want to make sure we are using this peer. If it can't even provide this header,
// it's useless!
.run()
.whenComplete(
(peerResult, error) -> {
if (peerResult != null && !peerResult.getResult().isEmpty()) {
final BlockHeader chainHeadHeader = peerResult.getResult().get(0);
peer.chainState().update(chainHeadHeader);
trailingPeerLimiter.enforceTrailingPeerLimit();
LOG.atDebug()
.setMessage("Retrieved chain head info {} from {}...")
.addArgument(
() ->
chainHeadHeader.getNumber()
+ " ("
+ chainHeadHeader.getBlockHash()
+ ")")
.addArgument(peer::getLoggableId)
.log();
} else {
LOG.atDebug()
.setMessage("Failed to retrieve chain head info. Disconnecting {}... {}")
.addArgument(peer::getLoggableId)
.addArgument(error)
.log();
peer.disconnect(DisconnectReason.USELESS_PEER);
}
});
.exceptionally(handleException(peer));
}

/**
 * Creates the fallback used when the chain head request for {@code peer} completes
 * exceptionally.
 *
 * <p>The returned function logs the failure at debug level, disconnects the peer with
 * {@link DisconnectReason#USELESS_PEER} and yields {@code null} as the replacement result.
 *
 * <p>NOTE(review): the disconnect happens here on the first failure, while
 * {@code checkResultAndMaybeRetry} still schedules retries when it sees a {@code null} result --
 * confirm that retrying against an already disconnected peer is intended.
 *
 * @param peer the peer the request was sent to
 * @return a function mapping the failure to a {@code null} task result
 */
private static Function<Throwable, AbstractPeerTask.PeerTaskResult<List<BlockHeader>>>
    handleException(final EthPeer peer) {
  return failure -> {
    LOG.atDebug()
        .setMessage("Failed to retrieve chain head info from {}. Reason: {}")
        .addArgument(peer::getLoggableId)
        .addArgument(failure::toString)
        .log();
    peer.disconnect(DisconnectReason.USELESS_PEER);
    // There is no header to hand on; downstream stages receive null instead of a result.
    return null;
  };
}

/**
 * Creates the completion callback for one attempt at fetching the chain head header of
 * {@code peer}.
 *
 * <p>The callback handles three cases:
 *
 * <ul>
 *   <li>A non-empty result: the peer's chain state is updated with the first returned header and
 *       the trailing peer limit is enforced.
 *   <li>No usable result and {@code retries > 0}: a new request is started after a two second
 *       delay, with a callback that has one retry less.
 *   <li>No usable result and no retries left: the peer is disconnected with
 *       {@link DisconnectReason#USELESS_PEER}.
 * </ul>
 *
 * @param peer the peer whose chain head header is being requested
 * @param retries how many further attempts may still be made after the observed one
 * @return a callback suitable for {@link CompletableFuture#whenComplete}
 */
private BiConsumer<AbstractPeerTask.PeerTaskResult<List<BlockHeader>>, Throwable>
    checkResultAndMaybeRetry(final EthPeer peer, final int retries) {
  return (peerResult, error) -> {
    if (peerResult != null && !peerResult.getResult().isEmpty()) {
      final BlockHeader chainHeadHeader = peerResult.getResult().get(0);
      peer.chainState().update(chainHeadHeader);
      trailingPeerLimiter.enforceTrailingPeerLimit();
      LOG.atDebug()
          .setMessage("Retrieved chain head info {} from {}...")
          .addArgument(
              () -> chainHeadHeader.getNumber() + " (" + chainHeadHeader.getBlockHash() + ")")
          .addArgument(peer::getLoggableId)
          .log();
      return;
    }

    if (retries > 0) {
      LOG.atDebug()
          .setMessage("Failed to retrieve chain head info from {}. Reason: {}. {} retries left")
          .addArgument(peer::getLoggableId)
          .addArgument(() -> getReason(peerResult, error))
          .addArgument(() -> retries)
          .log();
      // Give the peer a short pause before asking again.
      final Executor delayed = CompletableFuture.delayedExecutor(2L, TimeUnit.SECONDS);
      delayed.execute(
          () ->
              createAndRunGetHeaderTask(peer)
                  .whenComplete(checkResultAndMaybeRetry(peer, retries - 1)));
      return;
    }

    LOG.atDebug()
        .setMessage(
            "Failed to retrieve chain head info from {}. Disconnecting after "
                // Parenthesized so the sum is computed numerically (initial try + retries)
                // instead of being appended digit by digit to the string.
                + (MAX_RETRIES + 1)
                + " tries. Reason: {}.")
        .addArgument(peer::getLoggableId)
        .addArgument(() -> getReason(peerResult, error))
        .log();
    // If the peer cannot provide the block header of its best block after MAX_RETRIES
    // retries, it's useless. The best block of that peer was set based on the status message
    // that the peer sent moments ago.
    peer.disconnect(DisconnectReason.USELESS_PEER);
  };
}

/**
 * Describes, for log output, why a chain head request did not yield a usable header.
 *
 * @param peerResult the task result, or {@code null} when the task produced none
 * @param error the failure reported for the task, or {@code null} when none was reported
 * @return {@code "Empty result"} when a result without headers is present; otherwise the string
 *     form of {@code error}, or a fixed placeholder when {@code error} is {@code null}
 */
private String getReason(
    final AbstractPeerTask.PeerTaskResult<List<BlockHeader>> peerResult, final Throwable error) {
  if (peerResult != null && peerResult.getResult().isEmpty()) {
    return "Empty result";
  }
  // Both arguments can be null at once: after a failure the request future is completed with
  // null by its exceptionally(...) handler, which hides the original error from later stages.
  return error != null ? error.toString() : "No result returned";
}
}