feat: add historical state regen #6033

Open

wemeetagain wants to merge 22 commits into unstable (base) from cayman/historical-state-regen

Changes from 2 commits

Commits (22)
583bc18 feat: add historical state regen (wemeetagain, Oct 11, 2023)
8c1790b chore: wire up metrics (wemeetagain, Oct 11, 2023)
93c1981 chore: make historical state regen module optional (wemeetagain, Oct 13, 2023)
06e58af chore: persist pubkey cache across historical state regen runs (wemeetagain, Oct 13, 2023)
623b58c chore: cleanup worker termination (wemeetagain, Oct 13, 2023)
deba946 Merge branch 'unstable' into cayman/historical-state-regen (wemeetagain, Oct 13, 2023)
b8d6113 chore: fix worker usage (wemeetagain, Oct 23, 2023)
0315e1a fix: swap Level for ClassicLevel for multithreading (matthewkeil, Nov 6, 2023)
cdf3732 fix: getStateV2 state handling hack (matthewkeil, Nov 6, 2023)
a8f719d Merge branch 'unstable' into cayman/historical-state-regen (wemeetagain, Jan 22, 2024)
6d9d256 chore: update classic-level (wemeetagain, Jan 22, 2024)
5a812e1 chore: fix build errors (wemeetagain, Jan 22, 2024)
dabd767 chore: add comments (wemeetagain, Jan 22, 2024)
51fc29a chore: fix test worker path (wemeetagain, Jan 22, 2024)
21bb96b chore: simplify function naming (wemeetagain, Jan 22, 2024)
dc45097 chore: optimize getSlotFromOffset (wemeetagain, Jan 22, 2024)
d32f103 chore: refactor to avoid needless deserialization (wemeetagain, Jan 22, 2024)
25b1c9f fix: update metrics names (wemeetagain, Jan 22, 2024)
b6fd577 feat: add historical state regen dashboard (wemeetagain, Jan 22, 2024)
e8c9d27 fix: update vm dashboards with historical state worker (wemeetagain, Jan 22, 2024)
03a241c chore: fix test data (wemeetagain, Jan 22, 2024)
6001521 feat: transfer state across worker boundary (wemeetagain, Jan 22, 2024)
15 changes: 11 additions & 4 deletions packages/beacon-node/src/chain/chain.ts
@@ -75,6 +75,7 @@ import {BlockAttributes, produceBlockBody} from "./produceBlock/produceBlockBody
 import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
 import {BlockInput} from "./blocks/types.js";
 import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
+import {HistoricalStateRegen} from "./historicalState/index.js";
 
 /**
  * Arbitrary constants, blobs should be consumed immediately in the same slot they are produced.
@@ -105,6 +106,7 @@ export class BeaconChain implements IBeaconChain {
   readonly regen: QueuedStateRegenerator;
   readonly lightClientServer: LightClientServer;
   readonly reprocessController: ReprocessController;
+  readonly historicalStateRegen: HistoricalStateRegen;
 
   // Ops pool
   readonly attestationPool: AttestationPool;
@@ -162,6 +164,7 @@
       eth1,
       executionEngine,
       executionBuilder,
+      historicalStateRegen,
     }: {
       config: BeaconConfig;
       db: IBeaconDb;
@@ -174,6 +177,7 @@
       eth1: IEth1ForBlockProduction;
       executionEngine: IExecutionEngine;
       executionBuilder?: IExecutionBuilder;
+      historicalStateRegen: HistoricalStateRegen;
     }
   ) {
     this.opts = opts;
@@ -188,6 +192,7 @@
     this.eth1 = eth1;
     this.executionEngine = executionEngine;
     this.executionBuilder = executionBuilder;
+    this.historicalStateRegen = historicalStateRegen;
     const signal = this.abortController.signal;
     const emitter = new ChainEventEmitter();
     // by default, verify signatures on both main threads and worker threads
@@ -390,11 +395,13 @@
         return state && {state, executionOptimistic: isOptimisticBlock(block)};
       }
     } else {
-      // request for finalized state
-      // do not attempt regen, just check if state is already in DB
-      const state = await this.db.stateArchive.get(slot);
-      return state && {state, executionOptimistic: false};
+      // request for finalized state using historical state regen
+      const stateSerialized = await this.historicalStateRegen.getHistoricalState(slot);
+      const state = this.config
+        .getForkTypes(slot)
+        .BeaconState.deserialize(stateSerialized) as unknown as BeaconStateAllForks;
+      return {state, executionOptimistic: false};
     }
   }
 
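A note on the new finalized-state path above: the worker hands back raw SSZ bytes rather than a structured state object, so the caller must select the BeaconState SSZ type for the fork active at the requested slot before decoding. A minimal standalone sketch of that fork-aware decode (the helper name is illustrative, not part of this diff):

// Sketch: decode worker-returned SSZ bytes using the fork active at `slot`
function deserializeStateAtSlot(config: BeaconConfig, slot: number, bytes: Uint8Array): BeaconStateAllForks {
  // getForkTypes(slot) picks the BeaconState container matching the fork at that slot
  return config.getForkTypes(slot).BeaconState.deserialize(bytes) as unknown as BeaconStateAllForks;
}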
56 changes: 56 additions & 0 deletions packages/beacon-node/src/chain/historicalState/getHistoricalState.ts
@@ -0,0 +1,56 @@
import {
  BeaconStateTransitionMetrics,
  CachedBeaconStateAllForks,
  DataAvailableStatus,
  ExecutionPayloadStatus,
  PubkeyIndexMap,
  createCachedBeaconState,
  stateTransition,
} from "@lodestar/state-transition";
import {SignedBeaconBlock} from "@lodestar/types/allForks";
import {BeaconConfig} from "@lodestar/config";
import {IBeaconDb} from "../../db/index.js";

/**
 * Fetch the most recent archived state at or below the given slot
 */
export async function getClosestState(
  slot: number,
  config: BeaconConfig,
  db: IBeaconDb
): Promise<CachedBeaconStateAllForks> {
  const states = await db.stateArchive.values({limit: 1, lte: slot, reverse: true});
  if (!states.length) {
    throw new Error("No close state found in the database");
  }
  return createCachedBeaconState(states[0], {
    config,
    pubkey2index: new PubkeyIndexMap(),
    index2pubkey: [],
  });
}

export function getBlocksBetween(from: number, to: number, db: IBeaconDb): AsyncIterable<SignedBeaconBlock> {
  return db.blockArchive.valuesStream({gt: from, lte: to});
}
Comment on lines +62 to +64

Reviewer (Member): Is there a specific reason you broke this out? Can we just directly call db.blockArchive.valuesStream in the for await below and save a stack frame?

wemeetagain (Member, Author): no reason, just started the implementation of getHistoricalState in pseudocode and this stuck around. I can remove
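For reference, a sketch of the inlined variant suggested above (same semantics, one fewer stack frame; the stateTransition options object is elided here and shown in full in getHistoricalState below):

// Sketch: stream archived blocks directly instead of going through getBlocksBetween
for await (const block of db.blockArchive.valuesStream({gt: state.slot, lte: slot})) {
  state = stateTransition(state, block, stateTransitionOpts, metrics); // opts as in getHistoricalState
}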


export async function getHistoricalState(
  slot: number,
  config: BeaconConfig,
  db: IBeaconDb,
  metrics?: BeaconStateTransitionMetrics
): Promise<Uint8Array> {
  // Start from the nearest archived state at or below the target slot
  let state = await getClosestState(slot, config, db);
  // Replay every archived block in (state.slot, slot] on top of it
  for await (const block of getBlocksBetween(state.slot, slot, db)) {
    state = stateTransition(
      state,
      block,
      {
        // Archived blocks are already finalized, so skip verification
        verifyProposer: false,
        verifySignatures: false,
        verifyStateRoot: false,
        executionPayloadStatus: ExecutionPayloadStatus.valid,
        dataAvailableStatus: DataAvailableStatus.available,
      },
      metrics
    );
  }
  return state.serialize();
}
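A hypothetical usage of getHistoricalState (the slot value is illustrative; config and db are wired as elsewhere in this package):

// Illustrative only: regenerate the state at an example slot and decode the bytes
const targetSlot = 4_500_000;
const stateBytes = await getHistoricalState(targetSlot, config, db);
const state = config.getForkTypes(targetSlot).BeaconState.deserialize(stateBytes);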
72 changes: 72 additions & 0 deletions packages/beacon-node/src/chain/historicalState/index.ts
@@ -0,0 +1,72 @@
import {ModuleThread, Worker, spawn} from "@chainsafe/threads";
import {chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {terminateWorkerThread} from "../../util/workerEvents.js";
import {
HistoricalStateRegenInitModules,
HistoricalStateRegenModules,
HistoricalStateWorkerApi,
HistoricalStateWorkerData,
} from "./types.js";

const HISTORICAL_STATE_WORKER_EXIT_RETRY_COUNT = 3;
const HISTORICAL_STATE_WORKER_EXIT_TIMEOUT_MS = 1000;

/**
* HistoricalStateRegen limits the damage from recreating historical states
* by running regen in a separate worker thread.
*/
export class HistoricalStateRegen implements HistoricalStateWorkerApi {
  private readonly api: ModuleThread<HistoricalStateWorkerApi>;
  private readonly logger: LoggerNode;

  constructor(modules: HistoricalStateRegenModules) {
    this.api = modules.api;
    this.logger = modules.logger;
  }

  static async init(modules: HistoricalStateRegenInitModules): Promise<HistoricalStateRegen> {
    const workerData: HistoricalStateWorkerData = {
      chainConfigJson: chainConfigToJson(modules.config),
      genesisValidatorsRoot: modules.config.genesisValidatorsRoot,
      genesisTime: modules.opts.genesisTime,
      maxConcurrency: 1,
      maxLength: 50,
      dbLocation: modules.opts.dbLocation,
      metricsEnabled: Boolean(modules.metrics),
      loggerOpts: modules.logger.toOpts(),
    };

    const worker = new Worker("./worker.js", {
      workerData,
    } as ConstructorParameters<typeof Worker>[1]);

    // eslint-disable-next-line @typescript-eslint/no-explicit-any

Member: I didn't notice that this needs to be here. Deleted with no issue

    const api = await spawn<HistoricalStateWorkerApi>(worker, {
      // A Lodestar node may do very expensive tasks at start that block the event loop and cause
      // the initialization to time out. The number below is big enough to almost disable the timeout
      timeout: 5 * 60 * 1000,
    });

    return new HistoricalStateRegen({...modules, api});
  }

  async scrapeMetrics(): Promise<string> {
    return this.api.scrapeMetrics();
  }

  async close(): Promise<void> {
    await this.api.close();
    this.logger.debug("Terminating historical state worker");
    await terminateWorkerThread({
      worker: this.api,
      retryCount: HISTORICAL_STATE_WORKER_EXIT_RETRY_COUNT,
      retryMs: HISTORICAL_STATE_WORKER_EXIT_TIMEOUT_MS,
      logger: this.logger,
    });
    this.logger.debug("Terminated historical state worker");
  }

  async getHistoricalState(slot: number): Promise<Uint8Array> {
    return this.api.getHistoricalState(slot);
  }
}
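A sketch of the intended lifecycle of this class, assuming the modules shape from types.ts below (field values are illustrative):

// init() spawns the worker and wraps it in a typed ModuleThread proxy
const historicalStateRegen = await HistoricalStateRegen.init({
  opts: {genesisTime, dbLocation},
  config,
  logger,
  metrics: null,
});
// Method calls are forwarded to the worker, where they run through its job queue
const stateBytes = await historicalStateRegen.getHistoricalState(slot);
// close() asks the worker to abort its queue, then terminates the thread with retries
await historicalStateRegen.close();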
34 changes: 34 additions & 0 deletions packages/beacon-node/src/chain/historicalState/types.ts
@@ -0,0 +1,34 @@
import {ModuleThread} from "@chainsafe/threads";
import {BeaconConfig} from "@lodestar/config";
import {LoggerNode, LoggerNodeOpts} from "@lodestar/logger/node";
import {Metrics} from "../../metrics/index.js";

export type HistoricalStateRegenInitModules = {
  opts: {
    genesisTime: number;
    dbLocation: string;
  };
  config: BeaconConfig;
  logger: LoggerNode;
  metrics: Metrics | null;
};

export type HistoricalStateRegenModules = HistoricalStateRegenInitModules & {
  api: ModuleThread<HistoricalStateWorkerApi>;
};

export type HistoricalStateWorkerData = {
  chainConfigJson: Record<string, string>;
  genesisValidatorsRoot: Uint8Array;
  genesisTime: number;
  maxConcurrency: number;
  maxLength: number;
  dbLocation: string;
  metricsEnabled: boolean;
  loggerOpts: LoggerNodeOpts;
};

export type HistoricalStateWorkerApi = {
  close(): Promise<void>;
  scrapeMetrics(): Promise<string>;
  getHistoricalState(slot: number): Promise<Uint8Array>;
};
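Everything in HistoricalStateWorkerData crosses the thread boundary via workerData, so each field must be structured-cloneable; that is why the chain config travels as a JSON string map plus a genesisValidatorsRoot byte array and is re-hydrated inside the worker, as in worker.ts below:

// Worker-side re-hydration of the config from cloneable fields (mirrors worker.ts)
const config = createBeaconConfig(chainConfigFromJson(workerData.chainConfigJson), workerData.genesisValidatorsRoot);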
158 changes: 158 additions & 0 deletions packages/beacon-node/src/chain/historicalState/worker.ts
@@ -0,0 +1,158 @@
import worker from "node:worker_threads";
import {expose} from "@chainsafe/threads";
import {createBeaconConfig, chainConfigFromJson} from "@lodestar/config";
import {getNodeLogger} from "@lodestar/logger/node";
import {BeaconStateTransitionMetrics} from "@lodestar/state-transition";
import {LevelDbController} from "@lodestar/db";
import {RegistryMetricCreator, collectNodeJSMetrics} from "../../metrics/index.js";
import {JobFnQueue} from "../../util/queue/fnQueue.js";
import {QueueMetrics} from "../../util/queue/options.js";
import {BeaconDb} from "../../db/index.js";
import {HistoricalStateWorkerApi, HistoricalStateWorkerData} from "./types.js";
import {getHistoricalState as _getHistoricalState} from "./getHistoricalState.js";

// most of this setup copied from networkCoreWorker.ts

const workerData = worker.workerData as HistoricalStateWorkerData;

// TODO: Pass options from main thread for logging
// TODO: Logging won't be visible in file loggers

Member: As in we will not see this in our logs on hetzner?

const logger = getNodeLogger(workerData.loggerOpts);

logger.info("Historical state worker started");

const config = createBeaconConfig(chainConfigFromJson(workerData.chainConfigJson), workerData.genesisValidatorsRoot);

const db = new BeaconDb(config, await LevelDbController.create({name: workerData.dbLocation}, {logger}));

const abortController = new AbortController();

Member: Why do we actually need this AbortController here if it will not be synchronized with the main thread? The kill signal comes from the outside.


// Set up metrics, nodejs, state transition, queue
const metricsRegister = workerData.metricsEnabled ? new RegistryMetricCreator() : null;
let stateTransitionMetrics: BeaconStateTransitionMetrics | undefined;
let queueMetrics: QueueMetrics | undefined;
if (metricsRegister) {
  const closeMetrics = collectNodeJSMetrics(metricsRegister, "lodestar_historical_state_worker_");
  abortController.signal.addEventListener("abort", closeMetrics, {once: true});

  stateTransitionMetrics = {
    epochTransitionTime: metricsRegister.histogram({
      name: "lodestar_historical_state_stfn_epoch_transition_seconds",

Comment on lines +41 to +46

Member: Doesn't lodestar_historical_state_worker_ get prepended to all of these metric names? If so we will have some super long names and the lodestar_historical_state_ will be redundant on all of them

      help: "Time to process a single epoch transition in seconds",
      // Epoch transitions are 100ms on very fast clients, and average 800ms on heavy networks
      buckets: [0.01, 0.05, 0.1, 0.2, 0.5, 0.75, 1, 1.25, 1.5, 3, 10],
    }),
    epochTransitionCommitTime: metricsRegister.histogram({
      name: "lodestar_historical_state_stfn_epoch_transition_commit_seconds",
      help: "Time to call commit after process a single epoch transition in seconds",
      buckets: [0.01, 0.05, 0.1, 0.2, 0.5, 0.75, 1],
    }),
    processBlockTime: metricsRegister.histogram({
      name: "lodestar_historical_state_stfn_process_block_seconds",
      help: "Time to process a single block in seconds",
      // TODO: Add metrics for each step
      // Block processing can take 5-40ms, 100ms max
      buckets: [0.005, 0.01, 0.02, 0.05, 0.1, 1],
    }),
    processBlockCommitTime: metricsRegister.histogram({
      name: "lodestar_historical_state_stfn_process_block_commit_seconds",
      help: "Time to call commit after process a single block in seconds",
      buckets: [0.005, 0.01, 0.02, 0.05, 0.1, 1],
    }),
    stateHashTreeRootTime: metricsRegister.histogram({
      name: "lodestar_stfn_hash_tree_root_seconds",
      help: "Time to compute the hash tree root of a post state in seconds",
      buckets: [0.005, 0.01, 0.02, 0.05, 0.1, 1],
    }),
    preStateBalancesNodesPopulatedMiss: metricsRegister.gauge<"source">({
      name: "lodestar_historical_state_stfn_balances_nodes_populated_miss_total",
      help: "Total count state.balances nodesPopulated is false on stfn",
      labelNames: ["source"],
    }),
    preStateBalancesNodesPopulatedHit: metricsRegister.gauge<"source">({
      name: "lodestar_historical_state_stfn_balances_nodes_populated_hit_total",
      help: "Total count state.balances nodesPopulated is true on stfn",
      labelNames: ["source"],
    }),
    preStateValidatorsNodesPopulatedMiss: metricsRegister.gauge<"source">({
      name: "lodestar_historical_state_stfn_validators_nodes_populated_miss_total",
      help: "Total count state.validators nodesPopulated is false on stfn",
      labelNames: ["source"],
    }),
    preStateValidatorsNodesPopulatedHit: metricsRegister.gauge<"source">({
      name: "lodestar_historical_state_stfn_validators_nodes_populated_hit_total",
      help: "Total count state.validators nodesPopulated is true on stfn",
      labelNames: ["source"],
    }),
    preStateClonedCount: metricsRegister.histogram({
      name: "lodestar_historical_state_stfn_state_cloned_count",
      help: "Histogram of cloned count per state every time state.clone() is called",
      buckets: [1, 2, 5, 10, 50, 250],
    }),
    postStateBalancesNodesPopulatedHit: metricsRegister.gauge({
      name: "lodestar_historical_state_stfn_post_state_balances_nodes_populated_hit_total",
      help: "Total count state.validators nodesPopulated is true on stfn for post state",
    }),
    postStateBalancesNodesPopulatedMiss: metricsRegister.gauge({
      name: "lodestar_historical_state_stfn_post_state_balances_nodes_populated_miss_total",
      help: "Total count state.validators nodesPopulated is false on stfn for post state",
    }),
    postStateValidatorsNodesPopulatedHit: metricsRegister.gauge({
      name: "lodestar_historical_state_stfn_post_state_validators_nodes_populated_hit_total",
      help: "Total count state.validators nodesPopulated is true on stfn for post state",
    }),
    postStateValidatorsNodesPopulatedMiss: metricsRegister.gauge({
      name: "lodestar_historical_state_stfn_post_state_validators_nodes_populated_miss_total",
      help: "Total count state.validators nodesPopulated is false on stfn for post state",
    }),
    registerValidatorStatuses: () => {},
  };

  queueMetrics = {
    length: metricsRegister.gauge({
      name: "lodestar_historical_state_queue_length",
      help: "Count of total regen queue length",
    }),
    droppedJobs: metricsRegister.gauge({
      name: "lodestar_historical_state_queue_dropped_jobs_total",
      help: "Count of total regen queue dropped jobs",
    }),
    jobTime: metricsRegister.histogram({
      name: "lodestar_historical_state_queue_job_time_seconds",
      help: "Time to process regen queue job in seconds",
      buckets: [0.01, 0.1, 1, 10, 100],
    }),
    jobWaitTime: metricsRegister.histogram({
      name: "lodestar_historical_state_queue_job_wait_time_seconds",
      help: "Time from job added to the regen queue to starting in seconds",
      buckets: [0.01, 0.1, 1, 10, 100],
    }),
    concurrency: metricsRegister.gauge({
      name: "lodestar_historical_state_queue_concurrency",
      help: "Current concurrency of regen queue",
    }),
  };
}

const queue = new JobFnQueue(
  {
    maxConcurrency: workerData.maxConcurrency,
    maxLength: workerData.maxLength,
    signal: abortController.signal,
  },
  queueMetrics
);

const api: HistoricalStateWorkerApi = {
  async close() {
    // Aborting stops the job queue and, via the abort listener above, metrics collection
    abortController.abort();
  },
  async scrapeMetrics() {
    return metricsRegister?.metrics() ?? "";
  },
  async getHistoricalState(slot) {
    // Serialize regen requests through the queue; resolves with the state as SSZ bytes
    return queue.push(() => _getHistoricalState(slot, config, db, stateTransitionMetrics));
  },
};

expose(api);
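For completeness, how this exposed api pairs with the main thread: spawn() (as used in HistoricalStateRegen.init above) returns a ModuleThread whose methods proxy these functions across the thread boundary. A minimal sketch:

// Main-thread side, mirroring HistoricalStateRegen.init
const worker = new Worker("./worker.js", {workerData} as ConstructorParameters<typeof Worker>[1]);
const api = await spawn<HistoricalStateWorkerApi>(worker);
const stateBytes = await api.getHistoricalState(slot); // runs _getHistoricalState via this worker's queue
await api.close(); // triggers abortController.abort() above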