-
-
Notifications
You must be signed in to change notification settings - Fork 266
/
index.ts
338 lines (291 loc) Β· 13.5 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import mitt from "mitt";
import {init as initBls} from "@chainsafe/bls/switchable";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {EPOCHS_PER_SYNC_COMMITTEE_PERIOD} from "@lodestar/params";
import {phase0, RootHex, Slot, SyncPeriod, allForks} from "@lodestar/types";
import {createBeaconConfig, BeaconConfig, ChainForkConfig} from "@lodestar/config";
import {isErrorAborted, sleep} from "@lodestar/utils";
import {getCurrentSlot, slotWithFutureTolerance, timeUntilNextEpoch} from "./utils/clock.js";
import {isNode} from "./utils/utils.js";
import {chunkifyInclusiveRange} from "./utils/chunkify.js";
import {LightclientEmitter, LightclientEvent} from "./events.js";
import {getLcLoggerConsole, ILcLogger} from "./utils/logger.js";
import {computeSyncPeriodAtEpoch, computeSyncPeriodAtSlot, computeEpochAtSlot} from "./utils/clock.js";
import {LightclientSpec} from "./spec/index.js";
import {validateLightClientBootstrap} from "./spec/validateLightClientBootstrap.js";
import {ProcessUpdateOpts} from "./spec/processLightClientUpdate.js";
import {LightClientTransport} from "./transport/interface.js";
// Re-export types
export {LightclientEvent} from "./events.js";
export type {SyncCommitteeFast} from "./types.js";
export {upgradeLightClientFinalityUpdate, upgradeLightClientOptimisticUpdate} from "./spec/utils.js";
export type GenesisData = {
genesisTime: number;
genesisValidatorsRoot: RootHex | Uint8Array;
};
export type LightclientOpts = ProcessUpdateOpts;
export type LightclientInitArgs = {
config: ChainForkConfig;
logger?: ILcLogger;
opts?: LightclientOpts;
genesisData: GenesisData;
transport: LightClientTransport;
bootstrap: allForks.LightClientBootstrap;
};
/** Provides some protection against a server client sending header updates too far away in the future */
const MAX_CLOCK_DISPARITY_SEC = 10;
/** Prevent responses that are too big and get truncated. No specific reasoning for 32 */
const MAX_PERIODS_PER_REQUEST = 32;
/** For mainnet preset 8 epochs, for minimal preset `EPOCHS_PER_SYNC_COMMITTEE_PERIOD / 2` */
const LOOKAHEAD_EPOCHS_COMMITTEE_SYNC = Math.min(8, Math.ceil(EPOCHS_PER_SYNC_COMMITTEE_PERIOD / 2));
/** Prevent infinite loops caused by sync errors */
const ON_ERROR_RETRY_MS = 1000;
// TODO: Customize with option
const ALLOW_FORCED_UPDATES = true;
export enum RunStatusCode {
uninitialized,
started,
syncing,
stopped,
}
type RunStatus =
| {code: RunStatusCode.uninitialized}
| {code: RunStatusCode.started; controller: AbortController}
| {code: RunStatusCode.syncing}
| {code: RunStatusCode.stopped};
/**
* Server-based Lightclient. Current architecture diverges from the spec's proposed updated splitting them into:
* - Sync period updates: To advance to the next sync committee
* - Header updates: To get a more recent header signed by a known sync committee
*
* To stay synced to the current sync period it needs:
* - GET lightclient/committee_updates at least once per period.
*
* To get continuous header updates:
* - subscribe to SSE type lightclient_update
*
* To initialize, it needs:
* - GenesisData: To initialize the clock and verify signatures
* - For known networks it's hardcoded in the source
* - For unknown networks it can be provided by the user with a manual input
* - For unknown test networks it can be queried from a trusted node at GET beacon/genesis
* - `beaconApiUrl`: To connect to a trustless beacon node
* - `LightclientStore`: To have an initial trusted SyncCommittee to start the sync
* - For new lightclient instances, it can be queries from a trustless node at GET lightclient/bootstrap
* - For existing lightclient instances, it should be retrieved from storage
*
* When to trigger a committee update sync:
*
* period 0 period 1 period 2
* -|----------------|----------------|----------------|-> time
* | now
* - active current_sync_committee
* - known next_sync_committee, signed by current_sync_committee
*
* - No need to query for period 0 next_sync_committee until the end of period 0
* - During most of period 0, current_sync_committee known, next_sync_committee unknown
* - At the end of period 0, get a sync committee update, and populate period 1's committee
*
* syncCommittees: Map<SyncPeriod, SyncCommittee>, limited to max of 2 items
*/
export class Lightclient {
readonly emitter: LightclientEmitter = mitt();
readonly config: BeaconConfig;
readonly logger: ILcLogger;
readonly genesisValidatorsRoot: Uint8Array;
readonly genesisTime: number;
private readonly transport: LightClientTransport;
private readonly lightclientSpec: LightclientSpec;
private runStatus: RunStatus = {code: RunStatusCode.stopped};
constructor({config, logger, genesisData, bootstrap, transport}: LightclientInitArgs) {
this.genesisTime = genesisData.genesisTime;
this.genesisValidatorsRoot =
typeof genesisData.genesisValidatorsRoot === "string"
? fromHexString(genesisData.genesisValidatorsRoot)
: genesisData.genesisValidatorsRoot;
this.config = createBeaconConfig(config, this.genesisValidatorsRoot);
this.logger = logger ?? getLcLoggerConsole();
this.transport = transport;
this.runStatus = {code: RunStatusCode.uninitialized};
this.lightclientSpec = new LightclientSpec(
this.config,
{
allowForcedUpdates: ALLOW_FORCED_UPDATES,
onSetFinalizedHeader: (header) => {
this.emitter.emit(LightclientEvent.lightClientFinalityHeader, header);
this.logger.debug("Updated state.finalizedHeader", {slot: header.beacon.slot});
},
onSetOptimisticHeader: (header) => {
this.emitter.emit(LightclientEvent.lightClientOptimisticHeader, header);
this.logger.debug("Updated state.optimisticHeader", {slot: header.beacon.slot});
},
},
bootstrap
);
}
get status(): RunStatusCode {
return this.runStatus.code;
}
// Embed lightweight clock. The epoch cycles are handled with `this.runLoop()`
get currentSlot(): number {
return getCurrentSlot(this.config, this.genesisTime);
}
static async initializeFromCheckpointRoot(
args: Omit<LightclientInitArgs, "bootstrap"> & {
checkpointRoot: phase0.Checkpoint["root"];
}
): Promise<Lightclient> {
const {transport, checkpointRoot} = args;
// Initialize the BLS implementation. This may requires initializing the WebAssembly instance
// so why it's an async process. This should be initialized once before any bls operations.
// This process has to be done manually because of an issue in Karma runner
// https://github.com/karma-runner/karma/issues/3804
await initBls(isNode ? "blst-native" : "herumi");
// Fetch bootstrap state with proof at the trusted block root
const {data: bootstrap} = await transport.getBootstrap(toHexString(checkpointRoot));
validateLightClientBootstrap(args.config, checkpointRoot, bootstrap);
return new Lightclient({...args, bootstrap});
}
/**
* @returns a `Promise` that will resolve once `LightclientEvent.statusChange` with `RunStatusCode.started` value is emitted
*/
start(): Promise<void> {
const startPromise = new Promise<void>((resolve) => {
const lightclientStarted = (status: RunStatusCode): void => {
if (status === RunStatusCode.started) {
this.emitter.off(LightclientEvent.statusChange, lightclientStarted);
resolve();
}
};
this.emitter.on(LightclientEvent.statusChange, lightclientStarted);
});
void this.runLoop();
return startPromise;
}
stop(): void {
if (this.runStatus.code !== RunStatusCode.started) return;
this.runStatus.controller.abort();
this.updateRunStatus({code: RunStatusCode.stopped});
}
getHead(): allForks.LightClientHeader {
return this.lightclientSpec.store.optimisticHeader;
}
getFinalized(): allForks.LightClientHeader {
return this.lightclientSpec.store.finalizedHeader;
}
async sync(fromPeriod: SyncPeriod, toPeriod: SyncPeriod): Promise<void> {
// Initialize the BLS implementation. This may requires initializing the WebAssembly instance
// so why it's a an async process. This should be initialized once before any bls operations.
// This process has to be done manually because of an issue in Karma runner
// https://github.com/karma-runner/karma/issues/3804
await initBls(isNode ? "blst-native" : "herumi");
const periodRanges = chunkifyInclusiveRange(fromPeriod, toPeriod, MAX_PERIODS_PER_REQUEST);
for (const [fromPeriodRng, toPeriodRng] of periodRanges) {
const count = toPeriodRng + 1 - fromPeriodRng;
const updates = await this.transport.getUpdates(fromPeriodRng, count);
for (const update of updates) {
this.processSyncCommitteeUpdate(update.data);
this.logger.debug("processed sync update", {slot: update.data.attestedHeader.beacon.slot});
// Yield to the macro queue, verifying updates is somewhat expensive and we want responsiveness
await new Promise((r) => setTimeout(r, 0));
}
}
}
private async runLoop(): Promise<void> {
// Initialize the BLS implementation. This may requires initializing the WebAssembly instance
// so why it's a an async process. This should be initialized once before any bls operations.
// This process has to be done manually because of an issue in Karma runner
// https://github.com/karma-runner/karma/issues/3804
await initBls(isNode ? "blst-native" : "herumi");
// eslint-disable-next-line no-constant-condition
while (true) {
const currentPeriod = computeSyncPeriodAtSlot(this.currentSlot);
// Check if we have a sync committee for the current clock period
if (!this.lightclientSpec.store.syncCommittees.has(currentPeriod)) {
// Stop head tracking
if (this.runStatus.code === RunStatusCode.started) {
this.runStatus.controller.abort();
}
// Go into sync mode
this.updateRunStatus({code: RunStatusCode.syncing});
const headPeriod = computeSyncPeriodAtSlot(this.getHead().beacon.slot);
this.logger.debug("Syncing", {lastPeriod: headPeriod, currentPeriod});
try {
await this.sync(headPeriod, currentPeriod);
this.logger.debug("Synced", {currentPeriod});
} catch (e) {
this.logger.error("Error sync", {}, e as Error);
// Retry in 1 second
await new Promise((r) => setTimeout(r, ON_ERROR_RETRY_MS));
continue;
}
}
// After successfully syncing, track head if not already
if (this.runStatus.code !== RunStatusCode.started) {
const controller = new AbortController();
this.updateRunStatus({code: RunStatusCode.started, controller});
this.logger.debug("Started tracking the head");
// Fetch latest optimistic head to prevent a potential 12 seconds lag between syncing and getting the first head,
// Don't retry, this is a non-critical UX improvement
try {
const update = await this.transport.getOptimisticUpdate();
this.processOptimisticUpdate(update.data);
} catch (e) {
this.logger.error("Error fetching getLatestHeadUpdate", {currentPeriod}, e as Error);
}
this.transport.onOptimisticUpdate(this.processOptimisticUpdate.bind(this));
this.transport.onFinalityUpdate(this.processFinalizedUpdate.bind(this));
}
// When close to the end of a sync period poll for sync committee updates
// Limit lookahead in case EPOCHS_PER_SYNC_COMMITTEE_PERIOD is configured to be very short
const currentEpoch = computeEpochAtSlot(this.currentSlot);
const epochsIntoPeriod = currentEpoch % EPOCHS_PER_SYNC_COMMITTEE_PERIOD;
// Start fetching updates with some lookahead
if (EPOCHS_PER_SYNC_COMMITTEE_PERIOD - epochsIntoPeriod <= LOOKAHEAD_EPOCHS_COMMITTEE_SYNC) {
const period = computeSyncPeriodAtEpoch(currentEpoch);
try {
await this.sync(period, period);
} catch (e) {
this.logger.error("Error re-syncing period", {period}, e as Error);
}
}
// Wait for the next epoch
if (this.runStatus.code !== RunStatusCode.started) {
return;
} else {
try {
await sleep(timeUntilNextEpoch(this.config, this.genesisTime), this.runStatus.controller.signal);
} catch (e) {
if (isErrorAborted(e)) {
return;
}
throw e;
}
}
}
}
/**
* Processes new optimistic header updates in only known synced sync periods.
* This headerUpdate may update the head if there's enough participation.
*/
private processOptimisticUpdate(optimisticUpdate: allForks.LightClientOptimisticUpdate): void {
this.lightclientSpec.onOptimisticUpdate(this.currentSlotWithTolerance(), optimisticUpdate);
}
/**
* Processes new header updates in only known synced sync periods.
* This headerUpdate may update the head if there's enough participation.
*/
private processFinalizedUpdate(finalizedUpdate: allForks.LightClientFinalityUpdate): void {
this.lightclientSpec.onFinalityUpdate(this.currentSlotWithTolerance(), finalizedUpdate);
}
private processSyncCommitteeUpdate(update: allForks.LightClientUpdate): void {
this.lightclientSpec.onUpdate(this.currentSlotWithTolerance(), update);
}
private currentSlotWithTolerance(): Slot {
return slotWithFutureTolerance(this.config, this.genesisTime, MAX_CLOCK_DISPARITY_SEC);
}
private updateRunStatus(runStatus: RunStatus): void {
this.runStatus = runStatus;
this.emitter.emit(LightclientEvent.statusChange, this.runStatus.code);
}
}