Skip to content

Commit

Permalink
further handle datacolumns sync by range by root and forkaware data h…
Browse files Browse the repository at this point in the history
…andling
  • Loading branch information
g11tech committed Apr 29, 2024
1 parent 17ab496 commit 393ef2f
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 30 deletions.
11 changes: 9 additions & 2 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {ForkName} from "@lodestar/params";
import {toHex} from "@lodestar/utils";
import {electra, ssz} from "@lodestar/types";
import {BeaconChain} from "../chain.js";
import {BlockInput, BlockInputType} from "./types.js";

Expand Down Expand Up @@ -47,11 +48,14 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInput: BlockI
});
} else {
const {numColumns, custodyColumns, dataColumns: dataColumnSidecars} = blockData;
const blobsLen = (block.message as electra.BeaconBlock).body.blobKzgCommitments.length;
const columnsSize = ssz.electra.Cell.fixedSize * blobsLen;
fnPromises.push(
this.db.blobSidecars.add({
this.db.dataColumnSidecars.add({
blockRoot,
slot: block.message.slot,
numColumns,
columnsSize,
custodyColumns,
dataColumnSidecars,
})
Expand Down Expand Up @@ -92,7 +96,10 @@ export async function removeEagerlyPersistedBlockInputs(this: BeaconChain, block
blobsToRemove.push({blockRoot, slot, blobSidecars});
} else {
const {numColumns, custodyColumns, dataColumns: dataColumnSidecars} = blockData;
dataColumnsToRemove.push({blockRoot, slot, numColumns, custodyColumns, dataColumnSidecars});
const blobsLen = (block.message as electra.BeaconBlock).body.blobKzgCommitments.length;
const columnsSize = ssz.electra.Cell.fixedSize * blobsLen;

dataColumnsToRemove.push({blockRoot, slot, numColumns, columnsSize, custodyColumns, dataColumnSidecars});
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions packages/beacon-node/src/chain/errors/dataColumnSidecarError.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import {Slot} from "@lodestar/types";
import {Slot, RootHex} from "@lodestar/types";
import {GossipActionError} from "./gossipValidation.js";

export enum DataColumnSidecarErrorCode {
INVALID_INDEX = "BLOB_SIDECAR_ERROR_INVALID_INDEX",
INVALID_INDEX = "DATA_COLUMN_SIDECAR_ERROR_INVALID_INDEX",

// following errors are adapted from the block errors
FUTURE_SLOT = "BLOB_SIDECAR_ERROR_FUTURE_SLOT",
FUTURE_SLOT = "DATA_COLUMN_SIDECAR_ERROR_FUTURE_SLOT",
PARENT_UNKNOWN = "DATA_COLUMN_SIDECAR_ERROR_PARENT_UNKNOWN",
}

export type DataColumnSidecarErrorType =
| {code: DataColumnSidecarErrorCode.INVALID_INDEX; columnIndex: number; gossipIndex: number}
| {code: DataColumnSidecarErrorCode.FUTURE_SLOT; blockSlot: Slot; currentSlot: Slot};
| {code: DataColumnSidecarErrorCode.FUTURE_SLOT; blockSlot: Slot; currentSlot: Slot}
| {code: DataColumnSidecarErrorCode.PARENT_UNKNOWN; parentRoot: RootHex};

export class DataColumnSidecarGossipError extends GossipActionError<DataColumnSidecarErrorType> {}
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ export class SeenGossipBlockInput {
const {dataColumnsCache} = cachedData;

// block is available, check if all blobs have shown up
const {slot, body} = signedBlock.message;
const {blobKzgCommitments} = body as deneb.BeaconBlockBody;
const {slot} = signedBlock.message;
const blockInfo = `blockHex=${blockHex}, slot=${slot}`;

if (NUMBER_OF_COLUMNS < dataColumnsCache.size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const dataColumnSidecarsWrapperSsz = new ContainerType(
{
blockRoot: ssz.Root,
slot: ssz.Slot,
numColumns: ssz.Uint8,
columnsSize: ssz.Uint8,
// each byte[i] tells what index (1 based) the column i is stored, 0 means not custodied
custodyColumns: new ByteVectorType(NUMBER_OF_COLUMNS),
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {toHexString} from "@chainsafe/ssz";
import {BeaconConfig, ChainForkConfig} from "@lodestar/config";
import {LogLevel, Logger, prettyBytes} from "@lodestar/utils";
import {Root, Slot, ssz, allForks, deneb, UintNum64} from "@lodestar/types";
import {Root, Slot, ssz, allForks, deneb, UintNum64, electra} from "@lodestar/types";
import {ForkName, ForkSeq} from "@lodestar/params";
import {routes} from "@lodestar/api";
import {computeTimeAtSlot} from "@lodestar/state-transition";
Expand Down Expand Up @@ -256,7 +256,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
}

async function validateBeaconDataColumn(
dataColumnSidecar: deneb.BlobSidecar,
dataColumnSidecar: electra.DataColumnSidecar,
dataColumnBytes: Uint8Array,
gossipIndex: number,
peerIdStr: string,
Expand Down Expand Up @@ -312,7 +312,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler

if (e.action === GossipAction.REJECT) {
chain.persistInvalidSszValue(
ssz.deneb.BlobSidecar,
ssz.electra.DataColumnSidecar,
dataColumnSidecar,
`gossip_reject_slot_${slot}_index_${dataColumnSidecar.index}`
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import {ChainForkConfig} from "@lodestar/config";
import {deneb, Epoch, phase0, allForks, Slot} from "@lodestar/types";
import {deneb, Epoch, phase0, allForks, Slot, electra} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";

import {BlobsSource, BlockInput, BlockSource, getBlockInput, BlockInputDataBlobs} from "../../chain/blocks/types.js";
import {
BlobsSource,
BlockInput,
BlockSource,
getBlockInput,
BlockInputDataBlobs,
BlockInputDataDataColumns,
DataColumnsSource,
} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {getCustodyColumnIndexes} from "../../util/dataColumns.js";
import {INetwork, WithBytes} from "../interface.js";

export async function beaconBlocksMaybeBlobsByRange(
Expand Down Expand Up @@ -125,3 +134,84 @@ export function matchBlockWithBlobs(
}
return blockInputs;
}

export function matchBlockWithDataColumns(
config: ChainForkConfig,
allBlocks: WithBytes<allForks.SignedBeaconBlock>[],
allDataColumnSidecars: electra.DataColumnSidecar[],
endSlot: Slot,
blockSource: BlockSource,
dataColumnsSource: DataColumnsSource
): BlockInput[] {
const blockInputs: BlockInput[] = [];
let dataColumnSideCarIndex = 0;
let lastMatchedSlot = -1;

// Match dataColumnSideCar with the block as some blocks would have no dataColumns and hence
// would be omitted from the response. If there are any inconsitencies in the
// response, the validations during import will reject the block and hence this
// entire segment.
//
// Assuming that the blocks and blobs will come in same sorted order
for (let i = 0; i < allBlocks.length; i++) {
const block = allBlocks[i];
const forkSeq = config.getForkSeq(block.data.message.slot);
if (forkSeq < ForkSeq.electra) {
throw Error(`Invalid block forkSeq=${forkSeq} < ForSeq.electra for matchBlockWithDataColumns`);
} else {
const dataColumnSidecars: electra.DataColumnSidecar[] = [];

let dataColumnSidecar: electra.DataColumnSidecar;
while (
(dataColumnSidecar = allDataColumnSidecars[dataColumnSideCarIndex])?.signedBlockHeader.message.slot ===
block.data.message.slot
) {
dataColumnSidecars.push(dataColumnSidecar);
lastMatchedSlot = block.data.message.slot;
dataColumnSideCarIndex++;
}

// Quick inspect how many blobSidecars was expected
const columnIndexes = getCustodyColumnIndexes(config);
const dataColumnIndexes = dataColumnSidecars.map((dataColumnSidecar) => dataColumnSidecar.index);
const custodyIndexesPresent = columnIndexes.reduce(
(acc, columnIndex) => acc && dataColumnIndexes.includes(columnIndex),
true
);

if (columnIndexes.length !== dataColumnSidecars.length || !custodyIndexesPresent) {
throw Error(
`Missing or mismatching dataColumnSidecars for blockSlot=${block.data.message.slot} with numColumns=${columnIndexes.length} dataColumnSidecars=${dataColumnSidecars.length} custodyIndexesPresent=${custodyIndexesPresent}`
);
}

const blockData = {
fork: config.getForkName(block.data.message.slot),
dataColumns: dataColumnSidecars,
dataColumnsSource,
dataColumnsBytes: Array.from({length: dataColumnSidecars.length}, () => null),
} as BlockInputDataDataColumns;

// TODO DENEB: instead of null, pass payload in bytes
blockInputs.push(getBlockInput.availableData(config, block.data, blockSource, null, blockData));
}
}

// If there are still unconsumed blobs this means that the response was inconsistent
// and matching was wrong and hence we should throw error
if (
allDataColumnSidecars[dataColumnSideCarIndex] !== undefined &&
// If there are no data columns, the data columns request can give 1 block outside the requested range
allDataColumnSidecars[dataColumnSideCarIndex].signedBlockHeader.message.slot <= endSlot
) {
throw Error(
`Unmatched blobSidecars, blocks=${allBlocks.length}, blobs=${
allDataColumnSidecars.length
} lastMatchedSlot=${lastMatchedSlot}, pending blobSidecars slots=${allDataColumnSidecars
.slice(dataColumnSideCarIndex)
.map((blb) => blb.signedBlockHeader.message.slot)
.join(",")}`
);
}
return blockInputs;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {fromHexString} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {phase0, deneb, electra} from "@lodestar/types";
import {ForkName, ForkSeq, NUMBER_OF_COLUMNS} from "@lodestar/params";
import {ForkName, ForkSeq} from "@lodestar/params";
import {
BlockInput,
BlockInputType,
Expand All @@ -11,12 +11,14 @@ import {
NullBlockInput,
BlobsSource,
BlockInputDataBlobs,
DataColumnsSource,
} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
import {Metrics} from "../../metrics/index.js";
import {matchBlockWithBlobs} from "./beaconBlocksMaybeBlobsByRange.js";
import {getCustodyColumns} from "../../util/dataColumns.js";
import {matchBlockWithBlobs, matchBlockWithDataColumns} from "./beaconBlocksMaybeBlobsByRange.js";

export async function beaconBlocksMaybeBlobsByRoot(
config: ChainForkConfig,
Expand Down Expand Up @@ -47,10 +49,10 @@ export async function beaconBlocksMaybeBlobsByRoot(
} else if (ForkSeq[fork] === ForkSeq.electra) {
dataColumnsDataBlocks.push(block);
// TODO get this exported from some util
const {numColumns, custodyColumns} = getCustodyColumns("");
const custodyColumns = getCustodyColumns(config, "");
for (let i = 0; i < custodyColumns.length; i++) {
if (custodyColumns[i] === 1) {
dataColumnsDataBlocks.push({blockRoot, index: i});
dataColumnIdentifiers.push({blockRoot, index: i});
}
}
} else {
Expand All @@ -61,6 +63,7 @@ export async function beaconBlocksMaybeBlobsByRoot(
let blockInputs = preDataBlocks.map((block) =>
getBlockInput.preData(config, block.data, BlockSource.byRoot, block.bytes)
);

if (blobsDataBlocks.length > 1) {
let allBlobSidecars: deneb.BlobSidecar[];
if (blobIdentifiers.length > 0) {
Expand All @@ -83,7 +86,24 @@ export async function beaconBlocksMaybeBlobsByRoot(
}

if (dataColumnsDataBlocks.length > 1) {
// TODO get the datablocks here and push
let allDataColumnsSidecars: electra.DataColumnSidecar[];
if (dataColumnIdentifiers.length > 0) {
allDataColumnsSidecars = await network.sendDataColumnSidecarsByRoot(peerId, blobIdentifiers);
} else {
allDataColumnsSidecars = [];
}

// The last arg is to provide slot to which all blobs should be exausted in matching
// and here it should be infinity since all bobs should match
const blockInputWithBlobs = matchBlockWithDataColumns(
config,
allBlocks,
allDataColumnsSidecars,
Infinity,
BlockSource.byRoot,
DataColumnsSource.byRoot
);
blockInputs = [...blockInputs, ...blockInputWithBlobs];
}

return blockInputs;
Expand All @@ -101,7 +121,7 @@ export async function unavailableBeaconBlobsByRoot(
}

// resolve the block if thats unavailable
let block, blobsCache, blockBytes, resolveAvailability, cachedData;
let block, blockBytes, resolveAvailability, cachedData;
if (unavailableBlockInput.block === null) {
const allBlocks = await network.sendBeaconBlocksByRoot(peerId, [fromHexString(unavailableBlockInput.blockRootHex)]);
block = allBlocks[0].data;
Expand Down Expand Up @@ -157,12 +177,3 @@ export async function unavailableBeaconBlobsByRoot(

return availableBlockInput;
}

export function getCustodyColumns(_nodeId: number | string): {custodyColumns: Uint8Array; numColumns: number} {
const numColumns = NUMBER_OF_COLUMNS;
const custodyColumns = new Uint8Array(NUMBER_OF_COLUMNS);
for (let i = 0; i < NUMBER_OF_COLUMNS; i++) {
custodyColumns[i] = 1;
}
return {custodyColumns, numColumns};
}
12 changes: 12 additions & 0 deletions packages/beacon-node/src/network/reqresp/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {
MAX_REQUEST_LIGHT_CLIENT_UPDATES,
MAX_BLOBS_PER_BLOCK,
MAX_REQUEST_BLOB_SIDECARS,
MAX_REQUEST_BLOCKS_DENEB,
NUMBER_OF_COLUMNS,
} from "@lodestar/params";
import {InboundRateLimitQuota} from "@lodestar/reqresp";
import {ReqRespMethod, RequestBodyByMethod} from "./types.js";
Expand Down Expand Up @@ -46,6 +48,16 @@ export const rateLimitQuotas: Record<ReqRespMethod, InboundRateLimitQuota> = {
byPeer: {quota: 128 * MAX_BLOBS_PER_BLOCK, quotaTimeMs: 10_000},
getRequestCount: getRequestCountFn(ReqRespMethod.BlobSidecarsByRoot, (req) => req.length),
},
[ReqRespMethod.DataColumnSidecarsByRange]: {
// Rationale: MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS
byPeer: {quota: MAX_REQUEST_BLOCKS_DENEB * NUMBER_OF_COLUMNS, quotaTimeMs: 10_000},
getRequestCount: getRequestCountFn(ReqRespMethod.BlobSidecarsByRange, (req) => req.count),
},
[ReqRespMethod.DataColumnSidecarsByRoot]: {
// Rationale: quota of BeaconBlocksByRoot * NUMBER_OF_COLUMNS
byPeer: {quota: 128 * NUMBER_OF_COLUMNS, quotaTimeMs: 10_000},
getRequestCount: getRequestCountFn(ReqRespMethod.BlobSidecarsByRoot, (req) => req.length),
},
[ReqRespMethod.LightClientBootstrap]: {
// As similar in the nature of `Status` protocol so we use the same rate limits.
byPeer: {quota: 5, quotaTimeMs: 15_000},
Expand Down
16 changes: 16 additions & 0 deletions packages/beacon-node/src/util/dataColumns.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import {NUMBER_OF_COLUMNS} from "@lodestar/params";
import {ColumnIndex} from "@lodestar/types";
import {ChainForkConfig} from "@lodestar/config";

export function getCustodyColumns(config: ChainForkConfig, _nodeId: number | string): Uint8Array {
const custodyColumnIndexes = getCustodyColumnIndexes(config);
const custodyColumns = new Uint8Array(NUMBER_OF_COLUMNS);
for (const columnIndex of custodyColumnIndexes) {
custodyColumns[columnIndex] = 1;
}
return custodyColumns;
}

export function getCustodyColumnIndexes(config: ChainForkConfig): ColumnIndex[] {
return Array.from({length: config.CUSTODY_REQUIREMENT}, (_val, i) => i);
}
2 changes: 1 addition & 1 deletion packages/config/src/chainConfig/configs/mainnet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,5 @@ export const chainConfig: ChainConfig = {

// Electra
SAMPLES_PER_SLOT: 8,
CUSTODY_REQUIREMENT: 1,
CUSTODY_REQUIREMENT: 128,
};
2 changes: 1 addition & 1 deletion packages/config/src/chainConfig/configs/minimal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,5 @@ export const chainConfig: ChainConfig = {

// Electra
SAMPLES_PER_SLOT: 8,
CUSTODY_REQUIREMENT: 1,
CUSTODY_REQUIREMENT: 128,
};

0 comments on commit 393ef2f

Please sign in to comment.