Skip to content

Commit

Permalink
add data column to types repo and network
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Apr 22, 2024
1 parent 04400ae commit 0a896f2
Show file tree
Hide file tree
Showing 23 changed files with 510 additions and 10 deletions.
15 changes: 15 additions & 0 deletions packages/beacon-node/src/chain/errors/dataColumnSidecarError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import {Slot} from "@lodestar/types";
import {GossipActionError} from "./gossipValidation.js";

export enum DataColumnSidecarErrorCode {
INVALID_INDEX = "BLOB_SIDECAR_ERROR_INVALID_INDEX",

// following errors are adapted from the block errors
FUTURE_SLOT = "BLOB_SIDECAR_ERROR_FUTURE_SLOT",
}

export type DataColumnSidecarErrorType =
| {code: DataColumnSidecarErrorCode.INVALID_INDEX; columnIndex: number; gossipIndex: number}
| {code: DataColumnSidecarErrorCode.FUTURE_SLOT; blockSlot: Slot; currentSlot: Slot};

export class DataColumnSidecarGossipError extends GossipActionError<DataColumnSidecarErrorType> {}
51 changes: 51 additions & 0 deletions packages/beacon-node/src/chain/validation/dataColumnSidecar.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import {
KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH,
KZG_COMMITMENTS_SUBTREE_INDEX,
DATA_COLUMN_SIDECAR_SUBNET_COUNT,
} from "lodestar/params";
import {electra, ssz} from "@lodestar/types";
import {verifyMerkleBranch} from "@lodestar/utils";

import {DataColumnSidecarGossipError, DataColumnSidecarErrorCode} from "../errors/dataColumnSidecarError.js";
import {GossipAction} from "../errors/gossipValidation.js";
import {IBeaconChain} from "../interface.js";

export async function validateGossipDataColumnSidecar(
chain: IBeaconChain,
dataColumnSideCar: electra.DataColumnSidecar,
gossipIndex: number
): Promise<void> {
const dataColumnSlot = dataColumnSideCar.signedBlockHeader.message.slot;

if (dataColumnSideCar.index % DATA_COLUMN_SIDECAR_SUBNET_COUNT !== gossipIndex) {
throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
code: DataColumnSidecarErrorCode.INVALID_INDEX,
columnIndex: dataColumnSideCar.index,
gossipIndex,
});
}

// [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) --
// i.e. validate that sidecar.slot <= current_slot (a client MAY queue future blocks for processing at
// the appropriate slot).
const currentSlotWithGossipDisparity = chain.clock.currentSlotWithGossipDisparity;
if (currentSlotWithGossipDisparity < dataColumnSlot) {
throw new DataColumnSidecarGossipError(GossipAction.IGNORE, {
code: DataColumnSidecarErrorCode.FUTURE_SLOT,
currentSlot: currentSlotWithGossipDisparity,
blockSlot: dataColumnSlot,
});
}

validateInclusionProof(dataColumnSideCar);
}

function validateInclusionProof(dataColumnSideCar: electra.DataColumnSidecar): boolean {
return verifyMerkleBranch(
ssz.deneb.BlobKzgCommitments.hashTreeRoot(dataColumnSideCar.kzgCommitments),
dataColumnSideCar.kzgCommitmentsInclusionProof,
KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH,
KZG_COMMITMENTS_SUBTREE_INDEX,
dataColumnSideCar.signedBlockHeader.message.bodyRoot
);
}
3 changes: 3 additions & 0 deletions packages/beacon-node/src/db/buckets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ export enum Bucket {
// 54 was for bestPartialLightClientUpdate, allocate a fresh one
// lightClient_bestLightClientUpdate = 55, // SyncPeriod -> LightClientUpdate // DEPRECATED on v1.5.0
lightClient_bestLightClientUpdate = 56, // SyncPeriod -> [Slot, LightClientUpdate]

allForks_dataColumnSidecars = 57, // ELECTRA BeaconBlockRoot -> DataColumnSidecars
allForks_dataColumnSidecarsArchive = 58, // ELECTRA BeaconBlockSlot -> DataColumnSidecars
}

export function getBucketNameByValue<T extends Bucket>(enumValue: T): keyof typeof Bucket {
Expand Down
44 changes: 44 additions & 0 deletions packages/beacon-node/src/db/repositories/dataColumnSidecars.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import {ValueOf, ContainerType, ByteVectorType} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {Db, Repository} from "@lodestar/db";
import {ssz} from "@lodestar/types";
import {MAX_BLOB_COMMITMENTS_PER_BLOCK, NUMBER_OF_COLUMNS} from "@lodestar/params";

import {Bucket, getBucketNameByValue} from "../buckets.js";

export const dataColumnSidecarsWrapperSsz = new ContainerType(
{
blockRoot: ssz.Root,
slot: ssz.Slot,
columnsSize: ssz.Uint8,
// each byte[i] tells what index (1 based) the column i is stored, 0 means not custodied
custodyColumns: new ByteVectorType(NUMBER_OF_COLUMNS),
dataColumnSidecars: ssz.electra.DataColumnSidecars,
},
{typeName: "DataColumnSidecarsWrapper", jsonCase: "eth2"}
);

export type DataColumnSidecarsWrapper = ValueOf<typeof dataColumnSidecarsWrapperSsz>;
export const COLUMN_SIZE_IN_WRAPPER_INDEX = 44;
export const CUSTODY_COLUMNS_IN_IN_WRAPPER_INDEX = 45;
export const DATA_COLUMN_SIDECARS_IN_WRAPPER_INDEX = 46 + MAX_BLOB_COMMITMENTS_PER_BLOCK;

/**
* dataColumnSidecarsWrapper by block root (= hash_tree_root(SignedBeaconBlock.message))
*
* Used to store unfinalized DataColumnSidecars
*/
export class DataColumnSidecarsRepository extends Repository<Uint8Array, DataColumnSidecarsWrapper> {
constructor(config: ChainForkConfig, db: Db) {
const bucket = Bucket.allForks_dataColumnSidecars;
super(config, db, bucket, dataColumnSidecarsWrapperSsz, getBucketNameByValue(bucket));
}

/**
* Id is hashTreeRoot of unsigned BeaconBlock
*/
getId(value: DataColumnSidecarsWrapper): Uint8Array {
const {blockRoot} = value;
return blockRoot;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import {ChainForkConfig} from "@lodestar/config";
import {Db, Repository} from "@lodestar/db";
import {Slot} from "@lodestar/types";
import {bytesToInt} from "@lodestar/utils";
import {Bucket, getBucketNameByValue} from "../buckets.js";
import {dataColumnSidecarsWrapperSsz, DataColumnSidecarsWrapper} from "./dataColumnSidecars.js";

/**
* dataColumnSidecarsWrapper by slot
*
* Used to store finalized DataColumnSidecars
*/
export class BlobSidecarsArchiveRepository extends Repository<Slot, DataColumnSidecarsWrapper> {
constructor(config: ChainForkConfig, db: Db) {
const bucket = Bucket.allForks_dataColumnSidecarsArchive;
super(config, db, bucket, dataColumnSidecarsWrapperSsz, getBucketNameByValue(bucket));
}

// Handle key as slot

getId(value: DataColumnSidecarsWrapper): Slot {
return value.slot;
}

decodeKey(data: Uint8Array): number {
return bytesToInt(super.decodeKey(data) as unknown as Uint8Array, "be");
}
}
6 changes: 5 additions & 1 deletion packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Libp2p} from "libp2p";
import {Message, TopicValidatorResult} from "@libp2p/interface";
import {PeerIdStr} from "@chainsafe/libp2p-gossipsub/types";
import {ForkName} from "@lodestar/params";
import {allForks, altair, capella, deneb, phase0, Slot} from "@lodestar/types";
import {allForks, altair, capella, deneb, phase0, Slot, electra} from "@lodestar/types";
import {BeaconConfig} from "@lodestar/config";
import {Logger} from "@lodestar/utils";
import {IBeaconChain} from "../../chain/index.js";
Expand All @@ -13,6 +13,7 @@ import {GossipActionError} from "../../chain/errors/gossipValidation.js";
export enum GossipType {
beacon_block = "beacon_block",
blob_sidecar = "blob_sidecar",
data_column_sidecar = "data_column_sidecar",
beacon_aggregate_and_proof = "beacon_aggregate_and_proof",
beacon_attestation = "beacon_attestation",
voluntary_exit = "voluntary_exit",
Expand Down Expand Up @@ -41,6 +42,7 @@ export interface IGossipTopic {
export type GossipTopicTypeMap = {
[GossipType.beacon_block]: {type: GossipType.beacon_block};
[GossipType.blob_sidecar]: {type: GossipType.blob_sidecar; index: number};
[GossipType.data_column_sidecar]: {type: GossipType.data_column_sidecar; index: number};
[GossipType.beacon_aggregate_and_proof]: {type: GossipType.beacon_aggregate_and_proof};
[GossipType.beacon_attestation]: {type: GossipType.beacon_attestation; subnet: number};
[GossipType.voluntary_exit]: {type: GossipType.voluntary_exit};
Expand Down Expand Up @@ -71,6 +73,7 @@ export type SSZTypeOfGossipTopic<T extends GossipTopic> = T extends {type: infer
export type GossipTypeMap = {
[GossipType.beacon_block]: allForks.SignedBeaconBlock;
[GossipType.blob_sidecar]: deneb.BlobSidecar;
[GossipType.data_column_sidecar]: electra.DataColumnSidecar;
[GossipType.beacon_aggregate_and_proof]: phase0.SignedAggregateAndProof;
[GossipType.beacon_attestation]: phase0.Attestation;
[GossipType.voluntary_exit]: phase0.SignedVoluntaryExit;
Expand All @@ -86,6 +89,7 @@ export type GossipTypeMap = {
export type GossipFnByType = {
[GossipType.beacon_block]: (signedBlock: allForks.SignedBeaconBlock) => Promise<void> | void;
[GossipType.blob_sidecar]: (blobSidecar: deneb.BlobSidecar) => Promise<void> | void;
[GossipType.data_column_sidecar]: (blobSidecar: electra.DataColumnSidecar) => Promise<void> | void;
[GossipType.beacon_aggregate_and_proof]: (aggregateAndProof: phase0.SignedAggregateAndProof) => Promise<void> | void;
[GossipType.beacon_attestation]: (attestation: phase0.Attestation) => Promise<void> | void;
[GossipType.voluntary_exit]: (voluntaryExit: phase0.SignedVoluntaryExit) => Promise<void> | void;
Expand Down
20 changes: 20 additions & 0 deletions packages/beacon-node/src/network/gossip/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
SYNC_COMMITTEE_SUBNET_COUNT,
isForkLightClient,
MAX_BLOBS_PER_BLOCK,
DATA_COLUMN_SIDECAR_SUBNET_COUNT,
} from "@lodestar/params";

import {GossipAction, GossipActionError, GossipErrorCode} from "../../chain/errors/gossipValidation.js";
Expand Down Expand Up @@ -75,6 +76,8 @@ function stringifyGossipTopicType(topic: GossipTopic): string {
return `${topic.type}_${topic.subnet}`;
case GossipType.blob_sidecar:
return `${topic.type}_${topic.index}`;
case GossipType.data_column_sidecar:
return `${topic.type}_${topic.index}`;
}
}

Expand All @@ -86,6 +89,8 @@ export function getGossipSSZType(topic: GossipTopic) {
return ssz[topic.fork].SignedBeaconBlock;
case GossipType.blob_sidecar:
return ssz.deneb.BlobSidecar;
case GossipType.data_column_sidecar:
return ssz.electra.DataColumnSidecar;
case GossipType.beacon_aggregate_and_proof:
return ssz.phase0.SignedAggregateAndProof;
case GossipType.beacon_attestation:
Expand Down Expand Up @@ -189,6 +194,13 @@ export function parseGossipTopic(forkDigestContext: ForkDigestContext, topicStr:
return {type: GossipType.blob_sidecar, index, fork, encoding};
}

if (gossipTypeStr.startsWith(GossipType.data_column_sidecar)) {
const indexStr = gossipTypeStr.slice(GossipType.data_column_sidecar.length + 1); // +1 for '_' concatenating the topic name and the index
const index = parseInt(indexStr, 10);
if (Number.isNaN(index)) throw Error(`index ${indexStr} is not a number`);
return {type: GossipType.data_column_sidecar, index, fork, encoding};
}

throw Error(`Unknown gossip type ${gossipTypeStr}`);
} catch (e) {
(e as Error).message = `Invalid gossip topic ${topicStr}: ${(e as Error).message}`;
Expand All @@ -212,6 +224,13 @@ export function getCoreTopicsAtFork(
{type: GossipType.attester_slashing},
];

// After Electra also track data_column_sidecar_{index}
if (ForkSeq[fork] >= ForkSeq.electra) {
for (let index = 0; index < DATA_COLUMN_SIDECAR_SUBNET_COUNT; index++) {
topics.push({type: GossipType.data_column_sidecar, index});
}
}

// After Deneb also track blob_sidecar_{index}
if (ForkSeq[fork] >= ForkSeq.deneb) {
for (let index = 0; index < MAX_BLOBS_PER_BLOCK; index++) {
Expand Down Expand Up @@ -262,6 +281,7 @@ function parseEncodingStr(encodingStr: string): GossipEncoding {
export const gossipTopicIgnoreDuplicatePublishError: Record<GossipType, boolean> = {
[GossipType.beacon_block]: true,
[GossipType.blob_sidecar]: true,
[GossipType.data_column_sidecar]: true,
[GossipType.beacon_aggregate_and_proof]: true,
[GossipType.beacon_attestation]: true,
[GossipType.voluntary_exit]: true,
Expand Down
11 changes: 10 additions & 1 deletion packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
import type {AddressManager, ConnectionManager, Registrar, TransportManager} from "@libp2p/interface-internal";
import type {Datastore} from "interface-datastore";
import {Identify} from "@chainsafe/libp2p-identify";
import {Slot, SlotRootHex, allForks, altair, capella, deneb, phase0} from "@lodestar/types";
import {Slot, SlotRootHex, allForks, altair, capella, deneb, phase0, electra} from "@lodestar/types";
import {PeerIdStr} from "../util/peerId.js";
import {INetworkEventBus} from "./events.js";
import {INetworkCorePublic} from "./core/types.js";
Expand Down Expand Up @@ -57,10 +57,19 @@ export interface INetwork extends INetworkCorePublic {
): Promise<WithBytes<allForks.SignedBeaconBlock>[]>;
sendBlobSidecarsByRange(peerId: PeerIdStr, request: deneb.BlobSidecarsByRangeRequest): Promise<deneb.BlobSidecar[]>;
sendBlobSidecarsByRoot(peerId: PeerIdStr, request: deneb.BlobSidecarsByRootRequest): Promise<deneb.BlobSidecar[]>;
sendDataColumnSidecarsByRange(
peerId: PeerIdStr,
request: electra.DataColumnSidecarsByRangeRequest
): Promise<electra.DataColumnSidecar[]>;
sendDataColumnSidecarsByRoot(
peerId: PeerIdStr,
request: electra.DataColumnsSidecarByRootRequest
): Promise<electra.DataColumnSidecar[]>;

// Gossip
publishBeaconBlock(signedBlock: allForks.SignedBeaconBlock): Promise<number>;
publishBlobSidecar(blobSidecar: deneb.BlobSidecar): Promise<number>;
publishDataColumnSidecar(dataColumnSideCar: electra.DataColumnSidecar): Promise<number>;
publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<number>;
publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<number>;
publishVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<number>;
Expand Down
27 changes: 25 additions & 2 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import {BeaconConfig} from "@lodestar/config";
import {sleep} from "@lodestar/utils";
import {LoggerNode} from "@lodestar/logger/node";
import {computeStartSlotAtEpoch, computeTimeAtSlot} from "@lodestar/state-transition";
import {phase0, allForks, deneb, altair, Root, capella, SlotRootHex} from "@lodestar/types";
import {phase0, allForks, deneb, altair, Root, capella, SlotRootHex, electra} from "@lodestar/types";
import {routes} from "@lodestar/api";
import {ResponseIncoming} from "@lodestar/reqresp";
import {ForkSeq, MAX_BLOBS_PER_BLOCK} from "@lodestar/params";
import {ForkSeq, MAX_BLOBS_PER_BLOCK, MAX_DATA_COLUMNS_PER_BLOCK} from "@lodestar/params";
import {Metrics, RegistryMetricCreator} from "../metrics/index.js";
import {IBeaconChain} from "../chain/index.js";
import {IBeaconDb} from "../db/interface.js";
Expand Down Expand Up @@ -502,6 +502,29 @@ export class Network implements INetwork {
);
}

async sendDataColumnSidecarsByRange(
peerId: PeerIdStr,
request: electra.DataColumnSidecarsByRangeRequest
): Promise<electra.DataColumnSidecar[]> {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.DataColumnSidecarsByRange, [Version.V1], request),
// request's count represent the slots, so the actual max count received could be slots * blobs per slot
request.count * MAX_DATA_COLUMNS_PER_BLOCK,
responseSszTypeByMethod[ReqRespMethod.DataColumnSidecarsByRange]
);
}

async sendDataColumnSidecarsByRoot(
peerId: PeerIdStr,
request: electra.BlobSidecarsByRootRequest
): Promise<electra.DataColumnSidecar[]> {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.DataColumnSidecarsByRoot, [Version.V1], request),
request.length,
responseSszTypeByMethod[ReqRespMethod.DataColumnSidecarsByRoot]
);
}

private sendReqRespRequest<Req>(
peerId: PeerIdStr,
method: ReqRespMethod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
getSlotFromSignedAggregateAndProofSerialized,
getSlotFromBlobSidecarSerialized,
getSlotFromSignedBeaconBlockSerialized,
getSlotFromDataColumnSidecarSerialized,
} from "../../util/sszBytes.js";
import {GossipType} from "../gossip/index.js";
import {ExtractSlotRootFns} from "./types.js";
Expand Down Expand Up @@ -45,6 +46,14 @@ export function createExtractBlockSlotRootFns(): ExtractSlotRootFns {
[GossipType.blob_sidecar]: (data: Uint8Array): SlotOptionalRoot | null => {
const slot = getSlotFromBlobSidecarSerialized(data);

if (slot === null) {
return null;
}
return {slot};
},
[GossipType.data_column_sidecar]: (data: Uint8Array): SlotOptionalRoot | null => {
const slot = getSlotFromDataColumnSidecarSerialized(data);

if (slot === null) {
return null;
}
Expand Down
7 changes: 7 additions & 0 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,13 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
}
},

[GossipType.data_column_sidecar]: async ({
gossipData,
topic,
peerIdStr,
seenTimestampSec,
}: GossipHandlerParamGeneric<GossipType.data_column_sidecar>) => {},

[GossipType.beacon_aggregate_and_proof]: async ({
gossipData,
topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const defaultGossipQueueOpts: {
type: QueueType.FIFO,
dropOpts: {type: DropType.count, count: 1},
},
[GossipType.data_column_sidecar]: {
maxLength: 4096,
type: QueueType.FIFO,
dropOpts: {type: DropType.count, count: 1},
},
// lighthoue has aggregate_queue 4096 and unknown_block_aggregate_queue 1024, we use single queue
[GossipType.beacon_aggregate_and_proof]: {
maxLength: 5120,
Expand Down
8 changes: 7 additions & 1 deletion packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type WorkOpts = {
const executeGossipWorkOrderObj: Record<GossipType, WorkOpts> = {
[GossipType.beacon_block]: {bypassQueue: true},
[GossipType.blob_sidecar]: {bypassQueue: true},
[GossipType.data_column_sidecar]: {bypassQueue: true},
[GossipType.beacon_aggregate_and_proof]: {},
[GossipType.voluntary_exit]: {},
[GossipType.bls_to_execution_change]: {},
Expand Down Expand Up @@ -267,7 +268,12 @@ export class NetworkProcessor {
});
return;
}
if (slot === clockSlot && (topicType === GossipType.beacon_block || topicType === GossipType.blob_sidecar)) {
if (
slot === clockSlot &&
(topicType === GossipType.beacon_block ||
topicType === GossipType.blob_sidecar ||
topicType === GossipType.data_column_sidecar)
) {
// in the worse case if the current slot block is not valid, this will be reset in the next slot
this.isProcessingCurrentSlotBlock = true;
}
Expand Down

0 comments on commit 0a896f2

Please sign in to comment.