Skip to content

Commit

Permalink
adding blobsSource for blobsPromise
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Apr 19, 2024
1 parent 3da60d1 commit 55b7d65
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 34 deletions.
4 changes: 2 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {computeEpochAtSlot, computeTimeAtSlot, reconstructFullBlockOrContents} f
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {sleep, toHex} from "@lodestar/utils";
import {allForks, deneb, isSignedBlockContents, ProducedBlockSource} from "@lodestar/types";
import {BlockSource, getBlockInput, ImportBlockOpts, BlockInput, BlobSource} from "../../../../chain/blocks/types.js";
import {BlockSource, getBlockInput, ImportBlockOpts, BlockInput, BlobsSource} from "../../../../chain/blocks/types.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {computeBlobSidecars} from "../../../../util/blobs.js";
Expand Down Expand Up @@ -52,7 +52,7 @@ export function getBeaconBlockApi({
signedBlock,
BlockSource.api,
blobSidecars,
BlobSource.api,
BlobsSource.api,
// don't bundle any bytes for block and blobs
null,
blobSidecars.map(() => null)
Expand Down
13 changes: 8 additions & 5 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,20 @@ export async function importBlock(
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
setTimeout(async () => {
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

if (blockInput.type === BlockInputType.postDeneb) {
const {blobSource} = blockInput;
this.metrics?.importBlock.blobBySource.inc({blobSource});
for (const blobSidecar of blockInput.blobs) {
if (blockInput.type === BlockInputType.postDeneb || blockInput.type === BlockInputType.blobsPromise) {
const blobsData =
blockInput.type === BlockInputType.postDeneb ? blockInput : await blockInput.availabilityPromise;
const {blobsSource, blobs} = blobsData;

this.metrics?.importBlock.blobBySource.inc({blobsSource});
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
Expand Down
15 changes: 8 additions & 7 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export enum BlockSource {
}

/** Enum to represent where blobs come from */
export enum BlobSource {
export enum BlobsSource {
gossip = "gossip",
api = "api",
byRange = "req_resp_by_range",
Expand All @@ -32,7 +32,7 @@ export enum GossipedInputType {
}

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]; blobsSource: BlobsSource};
type CachedBlobs = {
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
Expand All @@ -41,7 +41,8 @@ type CachedBlobs = {

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| ({type: BlockInputType.postDeneb} & {blobSource: BlobSource} & BlockInputBlobs)
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
// the blobsSource here is added to BlockInputBlobs when availability is resolved
| ({type: BlockInputType.blobsPromise} & CachedBlobs)
);
export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise<BlockInput>} & CachedBlobs;
Expand Down Expand Up @@ -77,7 +78,7 @@ export const getBlockInput = {
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobs: deneb.BlobSidecars,
blobSource: BlobSource,
blobsSource: BlobsSource,
blockBytes: Uint8Array | null,
blobsBytes: (Uint8Array | null)[]
): BlockInput {
Expand All @@ -89,7 +90,7 @@ export const getBlockInput = {
block,
source,
blobs,
blobSource,
blobsSource,
blockBytes,
blobsBytes,
};
Expand Down Expand Up @@ -119,7 +120,7 @@ export const getBlockInput = {
},
};

export function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs {
export function getBlockInputBlobs(blobsCache: BlobsCache, blobsSource: BlobsSource): BlockInputBlobs {
const blobs = [];
const blobsBytes = [];

Expand All @@ -132,7 +133,7 @@ export function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs {
blobs.push(blobSidecar);
blobsBytes.push(blobBytes);
}
return {blobs, blobsBytes};
return {blobs, blobsBytes, blobsSource};
}

export enum AttestationImportOpt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
BlobsCache,
GossipedInputType,
getBlockInputBlobs,
BlobSource,
BlobsSource,
} from "../blocks/types.js";
import {Metrics} from "../../metrics/index.js";

Expand Down Expand Up @@ -135,16 +135,16 @@ export class SeenGossipBlockInput {
}

if (blobKzgCommitments.length === blobsCache.size) {
const allBlobs = getBlockInputBlobs(blobsCache);
const allBlobs = getBlockInputBlobs(blobsCache, BlobsSource.gossip);
resolveAvailability(allBlobs);
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP});
const {blobs, blobsBytes} = allBlobs;
const {blobs, blobsBytes, blobsSource} = allBlobs;
const blockInput = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobs,
BlobSource.gossip,
blobsSource,
blockBytes ?? null,
blobsBytes
);
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {EpochTransitionStep, StateCloneSource, StateHashTreeRootSource} from "@lodestar/state-transition";
import {allForks} from "@lodestar/types";
import {BlockSource, BlobSource} from "../../chain/blocks/types.js";
import {BlockSource, BlobsSource} from "../../chain/blocks/types.js";
import {JobQueueItemType} from "../../chain/bls/index.js";
import {BlockErrorCode} from "../../chain/errors/index.js";
import {InsertOutcome} from "../../chain/opPools/types.js";
Expand Down Expand Up @@ -800,10 +800,10 @@ export function createLodestarMetrics(
help: "Total number of imported blocks by source",
labelNames: ["source"],
}),
blobBySource: register.gauge<{blobSource: BlobSource}>({
blobBySource: register.gauge<{blobsSource: BlobsSource}>({
name: "lodestar_import_blob_by_source_total",
help: "Total number of imported blobs by source",
labelNames: ["blobSource"],
labelNames: ["blobsSource"],
}),
},
engineNotifyNewPayloadResult: register.gauge<{result: ExecutionPayloadStatus}>({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {deneb, Epoch, phase0, allForks, Slot} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";

import {BlobSource, BlockInput, BlockSource, getBlockInput} from "../../chain/blocks/types.js";
import {BlobsSource, BlockInput, BlockSource, getBlockInput} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork, WithBytes} from "../interface.js";

Expand Down Expand Up @@ -43,7 +43,7 @@ export async function beaconBlocksMaybeBlobsByRange(
network.sendBlobSidecarsByRange(peerId, request),
]);

return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, endSlot, BlockSource.byRange, BlobSource.byRange);
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, endSlot, BlockSource.byRange, BlobsSource.byRange);
}

// Post Deneb but old blobs
Expand All @@ -59,7 +59,7 @@ export function matchBlockWithBlobs(
allBlobSidecars: deneb.BlobSidecar[],
endSlot: Slot,
blockSource: BlockSource,
blobSource: BlobSource
blobsSource: BlobsSource
): BlockInput[] {
const blockInputs: BlockInput[] = [];
let blobSideCarIndex = 0;
Expand Down Expand Up @@ -102,7 +102,7 @@ export function matchBlockWithBlobs(
block.data,
blockSource,
blobSidecars,
blobSource,
blobsSource,
null,
Array.from({length: blobKzgCommitmentsLen}, () => null)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
getBlockInputBlobs,
getBlockInput,
NullBlockInput,
BlobSource,
BlobsSource,
} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";
Expand Down Expand Up @@ -48,7 +48,7 @@ export async function beaconBlocksMaybeBlobsByRoot(

// The last arg is to provide slot to which all blobs should be exausted in matching
// and here it should be infinity since all bobs should match
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, Infinity, BlockSource.byRoot, BlobSource.byRoot);
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, Infinity, BlockSource.byRoot, BlobsSource.byRoot);
}

export async function unavailableBeaconBlobsByRoot(
Expand Down Expand Up @@ -99,13 +99,13 @@ export async function unavailableBeaconBlobsByRoot(

// check and see if all blobs are now available and in that case resolve availability
// if not this will error and the leftover blobs will be tried from another peer
const allBlobs = getBlockInputBlobs(blobsCache);
const {blobs, blobsBytes} = allBlobs;
const allBlobs = getBlockInputBlobs(blobsCache, BlobsSource.byRoot);
const {blobs, blobsBytes, blobsSource} = allBlobs;
if (blobs.length !== blobKzgCommitmentsLen) {
throw Error(`Not all blobs fetched missingBlobs=${blobKzgCommitmentsLen - blobs.length}`);
}

resolveAvailability(allBlobs);
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC});
return getBlockInput.postDeneb(config, block, BlockSource.byRoot, blobs, BlobSource.byRoot, blockBytes, blobsBytes);
return getBlockInput.postDeneb(config, block, BlockSource.byRoot, blobs, blobsSource, blockBytes, blobsBytes);
}
4 changes: 2 additions & 2 deletions packages/beacon-node/test/spec/presets/fork_choice.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
AttestationImportOpt,
BlockSource,
BlobSidecarValidation,
BlobSource,
BlobsSource,
} from "../../../src/chain/blocks/types.js";
import {ZERO_HASH_HEX} from "../../../src/constants/constants.js";
import {PowMergeBlock} from "../../../src/eth1/interface.js";
Expand Down Expand Up @@ -215,7 +215,7 @@ const forkChoiceTest =
signedBlock,
BlockSource.gossip,
blobSidecars,
BlobSource.gossip,
BlobsSource.gossip,
null,
[null]
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {ssz, deneb} from "@lodestar/types";
import {createBeaconConfig, createChainForkConfig, defaultChainConfig} from "@lodestar/config";

import {beaconBlocksMaybeBlobsByRange} from "../../../src/network/reqresp/index.js";
import {BlockInputType, BlockSource, BlobSource} from "../../../src/chain/blocks/types.js";
import {BlockInputType, BlockSource, BlobsSource} from "../../../src/chain/blocks/types.js";
import {initCKZG, loadEthereumTrustedSetup} from "../../../src/util/kzg.js";
import {INetwork} from "../../../src/network/interface.js";
import {ZERO_HASH} from "../../../src/constants/constants.js";
Expand Down Expand Up @@ -104,7 +104,7 @@ describe("beaconBlocksMaybeBlobsByRange", () => {
block,
source: BlockSource.byRange,
blobs,
blobSource: BlobSource.byRange,
blobsSource: BlobsSource.byRange,
blockBytes: null,
blobsBytes: blobs.map(() => null),
};
Expand Down

0 comments on commit 55b7d65

Please sign in to comment.