Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async shuffling refactor #6521

Draft
wants to merge 11 commits into
base: unstable
Choose a base branch
from
Draft
3 changes: 2 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/state/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
computeStartSlotAtEpoch,
getCurrentEpoch,
getRandaoMix,
ShufflingCacheCaller,
} from "@lodestar/state-transition";
import {EPOCHS_PER_HISTORICAL_VECTOR} from "@lodestar/params";
import {ApiError} from "../../errors.js";
Expand Down Expand Up @@ -195,7 +196,7 @@ export function getBeaconStateApi({

const epoch = filters?.epoch ?? computeEpochAtSlot(state.slot);
const startSlot = computeStartSlotAtEpoch(epoch);
const shuffling = stateCached.epochCtx.getShufflingAtEpoch(epoch);
const shuffling = stateCached.epochCtx.getShufflingAtEpoch(epoch, ShufflingCacheCaller.getEpochCommittees);
const committees = shuffling.committees;
const committeesFlat = committees.flatMap((slotCommittees, slotInEpoch) => {
const slot = startSlot + slotInEpoch;
Expand Down
9 changes: 1 addition & 8 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ export async function importBlock(
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(blockSlot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
const recvToValLatency = Date.now() / 1000 - (opts.seenTimestampSec ?? Date.now() / 1000);
Expand Down Expand Up @@ -349,12 +348,6 @@ export async function importBlock(
this.logger.verbose("After importBlock caching postState without SSZ cache", {slot: postState.slot});
}

if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot});
}

if (blockSlot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
Expand All @@ -366,7 +359,7 @@ export async function importBlock(
// Note: in-lined code from previos handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));

const activeValidatorsCount = checkpointState.epochCtx.currentShuffling.activeIndices.length;
const activeValidatorsCount = checkpointState.epochCtx.currentActiveIndices.length;
this.metrics?.currentActiveValidators.set(activeValidatorsCount);
this.metrics?.currentValidators.set({status: "active"}, activeValidatorsCount);

Expand Down
57 changes: 31 additions & 26 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import {
Index2PubkeyCache,
PubkeyIndexMap,
EpochShuffling,
ShufflingCache,
IShufflingCache,
ShufflingCacheError,
ShufflingCacheErrorCode,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {
Expand Down Expand Up @@ -77,7 +81,6 @@ import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {BlockRewards, computeBlockRewards} from "./rewards/blockRewards.js";
import {ShufflingCache} from "./shufflingCache.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";
import {SeenGossipBlockInput} from "./seenCache/index.js";
import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";
Expand Down Expand Up @@ -136,7 +139,7 @@ export class BeaconChain implements IBeaconChain {

readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly shufflingCache: ShufflingCache;
readonly shufflingCache: IShufflingCache;
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedContentsCache = new Map<BlockHash, deneb.Contents>();

Expand Down Expand Up @@ -216,24 +219,27 @@ export class BeaconChain implements IBeaconChain {

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics, this.opts);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
// pubkeys takes ~30 seconds for 350k keys (mainnet 2022Q2).
// When the BeaconStateCache is created in eth1 genesis builder it may be incorrect. Until we can ensure that
// it's safe to re-use _ANY_ BeaconStateCache, this option is disabled by default and only used in tests.
const cachedState =
isCachedBeaconState(anchorState) && opts.skipCreateStateCacheIfAvailable
? anchorState
: createCachedBeaconState(anchorState, {
config,
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
});
this.shufflingCache.processState(cachedState, cachedState.epochCtx.previousShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.currentShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.nextShuffling.epoch);
let cachedState: CachedBeaconStateAllForks;
if (isCachedBeaconState(anchorState) && opts.skipCreateStateCacheIfAvailable) {
cachedState = anchorState;
cachedState.epochCtx.shufflingCache.addMetrics(metrics);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a funky edge case. I do not like adding the metrics here but there is an instance where the ShufflingCache need to be created before the metrics object (during genesis) and refactoring for that condition was not ideal. So in that circumstance the CachedBeaconState is built with a ShufflingCache and then when the seedState is passed into the chain the metrics are added to the class. This will be extended to also add the Logger when the worker thread for building is added.

this.shufflingCache = cachedState.epochCtx.shufflingCache;
} else {
this.shufflingCache = new ShufflingCache(metrics, this.opts);
cachedState = createCachedBeaconState(anchorState, {
config,
logger: this.logger,
shufflingCache: this.shufflingCache,
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
});
}

// Persist single global instance of state caches
this.pubkey2index = cachedState.epochCtx.pubkey2index;
Expand Down Expand Up @@ -711,22 +717,13 @@ export class BeaconChain implements IBeaconChain {
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<EpochShuffling> {
// this is to prevent multiple calls to get shuffling for the same epoch and dependent root
// any subsequent calls of the same epoch and dependent root will wait for this promise to resolve
this.shufflingCache.insertPromise(attEpoch, shufflingDependentRoot);
const blockEpoch = computeEpochAtSlot(attHeadBlock.slot);

let state: CachedBeaconStateAllForks;
if (blockEpoch < attEpoch - 1) {
// thanks to one epoch look ahead, we don't need to dial up to attEpoch
const targetSlot = computeStartSlotAtEpoch(attEpoch - 1);
this.metrics?.gossipAttestation.useHeadBlockStateDialedToTargetEpoch.inc({caller: regenCaller});
state = await this.regen.getBlockSlotState(
attHeadBlock.blockRoot,
targetSlot,
{dontTransferCache: true},
regenCaller
);
await this.regen.getBlockSlotState(attHeadBlock.blockRoot, targetSlot, {dontTransferCache: true}, regenCaller);
} else if (blockEpoch > attEpoch) {
// should not happen, handled inside attestation verification code
throw Error(`Block epoch ${blockEpoch} is after attestation epoch ${attEpoch}`);
Expand All @@ -735,11 +732,19 @@ export class BeaconChain implements IBeaconChain {
// it's not likely to hit this since these shufflings are cached already
// so handle just in case
this.metrics?.gossipAttestation.useHeadBlockState.inc({caller: regenCaller});
state = await this.regen.getState(attHeadBlock.stateRoot, regenCaller);
await this.regen.getState(attHeadBlock.stateRoot, regenCaller);
}

// resolve the promise to unblock other calls of the same epoch and dependent root
return this.shufflingCache.processState(state, attEpoch);
const shuffling = await this.shufflingCache.get(attEpoch, shufflingDependentRoot);
if (!shuffling) {
throw new ShufflingCacheError({
code: ShufflingCacheErrorCode.REGEN_ERROR_NO_SHUFFLING_FOUND,
epoch: attEpoch,
shufflingDecisionRoot: shufflingDependentRoot,
});
}
return shuffling;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/genesis/genesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class GenesisBuilder implements IGenesisBuilder {
}

// TODO - PENDING: Ensure EpochCacheImmutableData is created only once
this.state = createCachedBeaconState(stateView, createEmptyEpochCacheImmutableData(config, stateView));
this.state = createCachedBeaconState(stateView, createEmptyEpochCacheImmutableData(config, logger, stateView));
this.config = this.state.config;
this.activatedValidatorCount = getActiveValidatorIndices(stateView, GENESIS_EPOCH).length;
}
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
EpochShuffling,
IShufflingCache,
Index2PubkeyCache,
PubkeyIndexMap,
} from "@lodestar/state-transition";
Expand Down Expand Up @@ -51,7 +52,6 @@ import {IChainOptions} from "./options.js";
import {AssembledBlockType, BlockAttributes, BlockType} from "./produceBlock/produceBlockBody.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {SeenGossipBlockInput} from "./seenCache/index.js";
import {ShufflingCache} from "./shufflingCache.js";
import {BlockRewards} from "./rewards/blockRewards.js";
import {SyncCommitteeRewards} from "./rewards/syncCommitteeRewards.js";

Expand Down Expand Up @@ -114,7 +114,7 @@ export interface IBeaconChain {
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly producedContentsCache: Map<BlockHash, deneb.Contents>;
readonly producedBlockRoot: Map<RootHex, allForks.ExecutionPayload | null>;
readonly shufflingCache: ShufflingCache;
readonly shufflingCache: IShufflingCache;
readonly producedBlindedBlockRoot: Set<RootHex>;
readonly opts: IChainOptions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
computeEpochAtSlot,
computeStartSlotAtEpoch,
getBlockRootAtSlot,
ShufflingCacheCaller,
} from "@lodestar/state-transition";
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
import {toHex, MapDef} from "@lodestar/utils";
Expand Down Expand Up @@ -149,7 +150,7 @@ export class AggregatedAttestationPool {
}

const slotDelta = stateSlot - slot;
const shuffling = state.epochCtx.getShufflingAtEpoch(epoch);
const shuffling = state.epochCtx.getShufflingAtEpoch(epoch, ShufflingCacheCaller.getAttestationsForBlock);
const slotCommittees = shuffling.committees[slot % SLOTS_PER_EPOCH];
for (const [committeeIndex, attestationGroupByData] of attestationGroupByDataHashByIndex.entries()) {
// all attestations will be validated against the state in next step so we can get committee from the state
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import {SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY} from "@lodestar/params";
import {defaultOptions as defaultValidatorOptions} from "@lodestar/validator";
import {ShufflingCacheOptions} from "@lodestar/state-transition";
import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {ShufflingCacheOpts} from "./shufflingCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
ShufflingCacheOpts &
ShufflingCacheOptions &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
blsVerifyAllMultiThread?: boolean;
Expand Down