Skip to content

Commit

Permalink
implement SSE for payload attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
tersec committed Feb 2, 2024
1 parent 16b1f03 commit a7ea635
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 31 deletions.
1 change: 1 addition & 0 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type
finalQueue*: AsyncEventQueue[FinalizationInfoObject]
reorgQueue*: AsyncEventQueue[ReorgInfoObject]
contribQueue*: AsyncEventQueue[SignedContributionAndProof]
payloadAttributesQueue*: AsyncEventQueue[PayloadAttributesInfoObject]
finUpdateQueue*: AsyncEventQueue[
RestVersioned[ForkedLightClientFinalityUpdate]]
optUpdateQueue*: AsyncEventQueue[
Expand Down
12 changes: 8 additions & 4 deletions beacon_chain/consensus_object_pools/block_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type
## Root that can be used to retrieve block data from database

executionBlockHash*: Opt[Eth2Digest]
executionBlockNumber*: Opt[uint64]
executionValid*: bool

parent*: BlockRef ##\
Expand All @@ -55,19 +56,21 @@ template slot*(blck: BlockRef): Slot = blck.bid.slot

func init*(
T: type BlockRef, root: Eth2Digest,
executionBlockHash: Opt[Eth2Digest], executionValid: bool, slot: Slot):
BlockRef =
executionBlockHash: Opt[Eth2Digest], executionBlockNumber: Opt[uint64],
executionValid: bool, slot: Slot): BlockRef =
BlockRef(
bid: BlockId(root: root, slot: slot),
executionBlockHash: executionBlockHash, executionValid: executionValid)
executionBlockHash: executionBlockHash,
executionBlockNumber: executionBlockNumber, executionValid: executionValid)

func init*(
T: type BlockRef, root: Eth2Digest, executionValid: bool,
blck: phase0.SomeBeaconBlock | altair.SomeBeaconBlock |
phase0.TrustedBeaconBlock | altair.TrustedBeaconBlock): BlockRef =
# Use same formal parameters for simplicity, but it's impossible for these
# blocks to be optimistic.
BlockRef.init(root, Opt.some ZERO_HASH, executionValid = true, blck.slot)
BlockRef.init(
root, Opt.some ZERO_HASH, Opt.some 0'u64, executionValid = true, blck.slot)

func init*(
T: type BlockRef, root: Eth2Digest, executionValid: bool,
Expand All @@ -76,6 +79,7 @@ func init*(
deneb.SomeBeaconBlock | deneb.TrustedBeaconBlock): BlockRef =
BlockRef.init(
root, Opt.some Eth2Digest(blck.body.execution_payload.block_hash),
Opt.some blck.body.execution_payload.block_number,
executionValid =
executionValid or blck.body.execution_payload.block_hash == ZERO_HASH,
blck.slot)
Expand Down
31 changes: 24 additions & 7 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
# `VALID` fcU from an EL plus markBlockVerified. Pre-merge blocks still get
# marked as `VALID`.
let newRef = BlockRef.init(
blck.root, Opt.none Eth2Digest, executionValid = false,
blck.root, Opt.none Eth2Digest, Opt.none uint64, executionValid = false,
blck.summary.slot)
if headRef == nil:
headRef = newRef
Expand Down Expand Up @@ -2226,21 +2226,38 @@ proc pruneHistory*(dag: ChainDAGRef, startup = false) =
if dag.db.clearBlocks(fork):
break

proc loadExecutionBlockHash*(dag: ChainDAGRef, bid: BlockId): Eth2Digest =
proc loadExecutionBlockHashAndNumber(dag: ChainDAGRef, bid: BlockId):
(Eth2Digest, uint64) =
let blockData = dag.getForkedBlock(bid).valueOr:
return ZERO_HASH
return (ZERO_HASH, 0)

withBlck(blockData):
when consensusFork >= ConsensusFork.Bellatrix:
forkyBlck.message.body.execution_payload.block_hash
(forkyBlck.message.body.execution_payload.block_hash,
forkyBlck.message.body.execution_payload.block_number)
else:
ZERO_HASH
(ZERO_HASH, 0)

proc loadExecutionBlockHash*(dag: ChainDAGRef, bid: BlockId): Eth2Digest =
let (executionBlockHash, _) = dag.loadExecutionBlockHashAndNumber(bid)
executionBlockHash

proc ensureExecutionBlockHashAndNumberLoaded(dag: ChainDAGRef, blck: BlockRef) =
# The expensive part is loading the block, not copying a few additional bytes
if blck.executionBlockHash.isNone or blck.executionBlockNumber.isNone:
let (executionBlockHash, executionBlockNumber) =
dag.loadExecutionBlockHashAndNumber(blck.bid)
blck.executionBlockHash = Opt.some executionBlockHash
blck.executionBlockNumber = Opt.some executionBlockNumber

proc loadExecutionBlockHash*(dag: ChainDAGRef, blck: BlockRef): Eth2Digest =
if blck.executionBlockHash.isNone:
blck.executionBlockHash = Opt.some dag.loadExecutionBlockHash(blck.bid)
dag.ensureExecutionBlockHashAndNumberLoaded(blck)
blck.executionBlockHash.unsafeGet

proc loadExecutionBlockNumber*(dag: ChainDAGRef, blck: BlockRef): uint64 =
dag.ensureExecutionBlockHashAndNumberLoaded(blck)
blck.executionBlockNumber.unsafeGet

from std/packedsets import PackedSet, incl, items

func getValidatorChangeStatuses(
Expand Down
15 changes: 7 additions & 8 deletions beacon_chain/fork_choice/fork_choice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,15 @@ func extend[T](s: var seq[T], minLen: int) =

proc update_justified(
self: var Checkpoints, dag: ChainDAGRef, blck: BlockRef, epoch: Epoch) =
let
epochRef = dag.getEpochRef(blck, epoch, false).valueOr:
# Shouldn't happen for justified data unless out of sync with ChainDAG
warn "Skipping justified checkpoint update, no EpochRef - report bug",
blck, epoch, error
return
justified = Checkpoint(root: blck.root, epoch: epochRef.epoch)
let epochRef = dag.getEpochRef(blck, epoch, false).valueOr:
# Shouldn't happen for justified data unless out of sync with ChainDAG
warn "Skipping justified checkpoint update, no EpochRef - report bug",
blck, epoch, error
return

trace "Updating justified",
store = self.justified.checkpoint, state = justified
store = self.justified.checkpoint,
state = Checkpoint(root: blck.root, epoch: epochRef.epoch)
self.justified = BalanceCheckpoint(
checkpoint: Checkpoint(root: blck.root, epoch: epochRef.epoch),
total_active_balance: epochRef.total_active_balance,
Expand Down
69 changes: 66 additions & 3 deletions beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import
stew/results,
chronicles, chronos, metrics,
../spec/[signatures, signatures_batch],
../spec/[
eth2_apis/eth2_rest_serialization, signatures, signatures_batch],
../sszdump

from std/deques import Deque, addLast, contains, initDeque, items, len, shrink
Expand All @@ -21,7 +22,7 @@ from ../consensus_object_pools/consensus_manager import
updateHeadWithExecution
from ../consensus_object_pools/blockchain_dag import
getBlockRef, getProposer, forkAtEpoch, loadExecutionBlockHash,
markBlockVerified, validatorKey
loadExecutionBlockNumber, markBlockVerified, validatorKey
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import
Expand Down Expand Up @@ -66,6 +67,9 @@ type
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource

OnPayloadAttributesCallback =
proc(data: PayloadAttributesInfoObject) {.gcsafe, raises: [].}

BlockProcessor* = object
## This manages the processing of blocks from different sources
## Blocks and attestations are enqueued in a gossip-validated state
Expand Down Expand Up @@ -109,6 +113,10 @@ type
## The slot at which we sent a payload to the execution client the last
## time

# SSE callback
# ----------------------------------------------------------------
onPayloadAttributesAdded: OnPayloadAttributesCallback

NewPayloadStatus {.pure.} = enum
valid
notValid
Expand All @@ -129,7 +137,9 @@ proc new*(T: type BlockProcessor,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
blobQuarantine: ref BlobQuarantine,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
getBeaconTime: GetBeaconTimeFn,
onPayloadAttributesAdded: OnPayloadAttributesCallback):
ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
Expand All @@ -139,6 +149,7 @@ proc new*(T: type BlockProcessor,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
getBeaconTime: getBeaconTime,
onPayloadAttributesAdded: onPayloadAttributesAdded,
verifier: BatchVerifier.init(rng, taskpool)
)

Expand Down Expand Up @@ -407,6 +418,52 @@ proc enqueueBlock*(
except AsyncQueueFullError:
raiseAssert "unbounded queue"

proc sendNewPayloadAttributeNotification(
self: BlockProcessor, wallSlot: Slot, newHead: BlockRef) =
if not(isNil(self.onPayloadAttributesAdded)):
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml#L95-L124
# `payload_attributes`: beacon API encoding of `PayloadAttributesV<N>` as
# defined by the `execution-apis` specification. The version `N` must match
# the payload attributes for the hard fork matching `version`.
#
# The beacon API encoded object must have equivalent fields to its
# counterpart in `execution-apis` with two differences: 1) `snake_case`
# identifiers must be used rather than `camelCase`; 2) integers must be
# encoded as quoted decimals rather than big-endian hex.
let
proposal_slot = wallSlot + 1
payload_attributes =
case self.consensusManager.dag.cfg.consensusForkAtEpoch(
proposal_slot.epoch)
of ConsensusFork.Deneb:
RestJson.encode(RestPayloadAttributesV3(
timestamp: 0'u64,
prev_randao: ZERO_HASH,
suggested_fee_recipient: default(Eth1Address),
withdrawals: @[],
parent_beacon_block_root: ZERO_HASH))
of ConsensusFork.Capella:
RestJson.encode(RestPayloadAttributesV2(
timestamp: 0'u64,
prev_randao: ZERO_HASH,
suggested_fee_recipient: default(Eth1Address),
withdrawals: @[]))
of ConsensusFork.Phase0 .. ConsensusFork.Bellatrix:
RestJson.encode(RestPayloadAttributesV1(
timestamp: 0'u64,
prev_randao: ZERO_HASH,
suggested_fee_recipient: default(Eth1Address)))

self.onPayloadAttributesAdded(PayloadAttributesInfoObject(
proposal_slot: proposal_slot,
parent_block_root: newHead.root,
parent_block_number:
self.consensusManager.dag.loadExecutionBlockNumber(newHead),
parent_block_hash:
self.consensusManager.dag.loadExecutionBlockHash(newHead),
proposer_index: 0'u64, # FIXME
payload_attributes: payload_attributes))

proc storeBlock(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock,
Expand Down Expand Up @@ -654,6 +711,7 @@ proc storeBlock(
headExecutionPayloadHash =
dag.loadExecutionBlockHash(newHead.get.blck)
wallSlot = self.getBeaconTime().slotOrZero

if headExecutionPayloadHash.isZero or
NewPayloadStatus.noResponse == payloadStatus:
# Blocks without execution payloads can't be optimistic, and don't try
Expand Down Expand Up @@ -682,6 +740,8 @@ proc storeBlock(
ConsensusFork.Bellatrix:
callExpectValidFCU(payloadAttributeType = PayloadAttributesV1)

self[].sendNewPayloadAttributeNotification(wallSlot, newHead.get.blck)

if self.consensusManager.checkNextProposer(wallSlot).isNone:
# No attached validator is next proposer, so use non-proposal fcU
callForkChoiceUpdated()
Expand All @@ -695,6 +755,9 @@ proc storeBlock(
else:
await self.consensusManager.updateHeadWithExecution(
newHead.get, self.getBeaconTime)

self[].sendNewPayloadAttributeNotification(wallSlot, newHead.get.blck)

else:
warn "Head selection failed, using previous head",
head = shortLog(dag.head), wallSlot
Expand Down
4 changes: 0 additions & 4 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,6 @@ template checkedReject(
pool: ValidatorChangePool, error: ValidationError): untyped =
pool.dag.checkedReject(error)

template checkedResult(
pool: ValidatorChangePool, error: ValidationError): untyped =
pool.dag.checkedResult(error)

template validateBeaconBlockBellatrix(
signed_beacon_block: phase0.SignedBeaconBlock | altair.SignedBeaconBlock,
parent: BlockRef): untyped =
Expand Down
4 changes: 3 additions & 1 deletion beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ proc initFullNode(
node.eventBus.propSlashQueue.emit(data)
proc onAttesterSlashingAdded(data: AttesterSlashing) =
node.eventBus.attSlashQueue.emit(data)
proc onPayloadAttributesAdded(data: PayloadAttributesInfoObject) =
node.eventBus.payloadAttributesQueue.emit(data)
proc onBlobSidecarAdded(data: BlobSidecar) =
node.eventBus.blobSidecarQueue.emit(
BlobSidecarInfoObject(
Expand Down Expand Up @@ -349,7 +351,7 @@ proc initFullNode(
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor,
blobQuarantine, getBeaconTime)
blobQuarantine, getBeaconTime, onPayloadAttributesAdded)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] =
Expand Down
4 changes: 4 additions & 0 deletions beacon_chain/rpc/rest_event_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
let handler = response.eventHandler(node.eventBus.attSlashQueue,
"attester_slashing")
res.add(handler)
if EventTopic.PayloadAttributes in eventTopics:
let handler = response.eventHandler(node.eventBus.payloadAttributesQueue,
"payload_attributes")
res.add(handler)
if EventTopic.BlobSidecar in eventTopics:
let handler = response.eventHandler(node.eventBus.blobSidecarQueue,
"blob_sidecar")
Expand Down
10 changes: 10 additions & 0 deletions beacon_chain/spec/datatypes/deneb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -822,3 +822,13 @@ template asTrusted*(
SigVerifiedSignedBeaconBlock |
MsgTrustedSignedBeaconBlock): TrustedSignedBeaconBlock =
isomorphicCast[TrustedSignedBeaconBlock](x)

type
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml#L95-L124
PayloadAttributesInfoObject* = object
proposal_slot*: Slot
parent_block_root*: Eth2Digest
parent_block_number*: uint64
parent_block_hash*: Eth2Digest
proposer_index*: uint64
payload_attributes*: string
12 changes: 12 additions & 0 deletions beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ RestJson.useDefaultSerializationFor(
KeystoreInfo,
ListFeeRecipientResponse,
ListGasLimitResponse,
PayloadAttributesInfoObject,
PendingAttestation,
PostKeystoresResponse,
PrepareBeaconProposer,
Expand Down Expand Up @@ -141,6 +142,9 @@ RestJson.useDefaultSerializationFor(
RestNodeExtraData,
RestNodePeer,
RestNodeVersion,
RestPayloadAttributesV1,
RestPayloadAttributesV2,
RestPayloadAttributesV3,
RestPeerCount,
RestProposerDuty,
RestRoot,
Expand Down Expand Up @@ -307,8 +311,12 @@ type
ImportDistributedKeystoresBody |
ImportRemoteKeystoresBody |
KeystoresAndSlashingProtection |
PayloadAttributesInfoObject |
PrepareBeaconProposer |
ProposerSlashing |
RestPayloadAttributesV1 |
RestPayloadAttributesV2 |
RestPayloadAttributesV3 |
SetFeeRecipientRequest |
SetGasLimitRequest |
bellatrix_mev.SignedBlindedBeaconBlock |
Expand Down Expand Up @@ -4175,6 +4183,8 @@ proc decodeString*(t: typedesc[EventTopic],
ok(EventTopic.ProposerSlashing)
of "attester_slashing":
ok(EventTopic.AttesterSlashing)
of "payload_attributes":
ok(EventTopic.PayloadAttributes)
of "blob_sidecar":
ok(EventTopic.BlobSidecar)
of "finalized_checkpoint":
Expand Down Expand Up @@ -4206,6 +4216,8 @@ proc encodeString*(value: set[EventTopic]): Result[string, cstring] =
res.add("proposer_slashing,")
if EventTopic.AttesterSlashing in value:
res.add("attester_slashing,")
if EventTopic.PayloadAttributes in value:
res.add("payload_attributes,")
if EventTopic.BlobSidecar in value:
res.add("blob_sidecar,")
if EventTopic.FinalizedCheckpoint in value:
Expand Down

0 comments on commit a7ea635

Please sign in to comment.