Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement SSE for payload attributes #5821

Draft
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type
finalQueue*: AsyncEventQueue[FinalizationInfoObject]
reorgQueue*: AsyncEventQueue[ReorgInfoObject]
contribQueue*: AsyncEventQueue[SignedContributionAndProof]
payloadAttributesQueue*: AsyncEventQueue[PayloadAttributesInfoObject]
finUpdateQueue*: AsyncEventQueue[
RestVersioned[ForkedLightClientFinalityUpdate]]
optUpdateQueue*: AsyncEventQueue[
Expand Down
12 changes: 8 additions & 4 deletions beacon_chain/consensus_object_pools/block_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type
## Root that can be used to retrieve block data from database

executionBlockHash*: Opt[Eth2Digest]
executionBlockNumber*: Opt[uint64]
executionValid*: bool

parent*: BlockRef ##\
Expand All @@ -55,19 +56,21 @@ template slot*(blck: BlockRef): Slot = blck.bid.slot

func init*(
T: type BlockRef, root: Eth2Digest,
executionBlockHash: Opt[Eth2Digest], executionValid: bool, slot: Slot):
BlockRef =
executionBlockHash: Opt[Eth2Digest], executionBlockNumber: Opt[uint64],
executionValid: bool, slot: Slot): BlockRef =
BlockRef(
bid: BlockId(root: root, slot: slot),
executionBlockHash: executionBlockHash, executionValid: executionValid)
executionBlockHash: executionBlockHash,
executionBlockNumber: executionBlockNumber, executionValid: executionValid)

func init*(
T: type BlockRef, root: Eth2Digest, executionValid: bool,
blck: phase0.SomeBeaconBlock | altair.SomeBeaconBlock |
phase0.TrustedBeaconBlock | altair.TrustedBeaconBlock): BlockRef =
# Use same formal parameters for simplicity, but it's impossible for these
# blocks to be optimistic.
BlockRef.init(root, Opt.some ZERO_HASH, executionValid = true, blck.slot)
BlockRef.init(
root, Opt.some ZERO_HASH, Opt.some 0'u64, executionValid = true, blck.slot)

func init*(
T: type BlockRef, root: Eth2Digest, executionValid: bool,
Expand All @@ -76,6 +79,7 @@ func init*(
deneb.SomeBeaconBlock | deneb.TrustedBeaconBlock): BlockRef =
BlockRef.init(
root, Opt.some Eth2Digest(blck.body.execution_payload.block_hash),
Opt.some blck.body.execution_payload.block_number,
executionValid =
executionValid or blck.body.execution_payload.block_hash == ZERO_HASH,
blck.slot)
Expand Down
31 changes: 24 additions & 7 deletions beacon_chain/consensus_object_pools/blockchain_dag.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
# `VALID` fcU from an EL plus markBlockVerified. Pre-merge blocks still get
# marked as `VALID`.
let newRef = BlockRef.init(
blck.root, Opt.none Eth2Digest, executionValid = false,
blck.root, Opt.none Eth2Digest, Opt.none uint64, executionValid = false,
blck.summary.slot)
if headRef == nil:
headRef = newRef
Expand Down Expand Up @@ -2226,21 +2226,38 @@ proc pruneHistory*(dag: ChainDAGRef, startup = false) =
if dag.db.clearBlocks(fork):
break

proc loadExecutionBlockHash*(dag: ChainDAGRef, bid: BlockId): Eth2Digest =
proc loadExecutionBlockHashAndNumber(dag: ChainDAGRef, bid: BlockId):
(Eth2Digest, uint64) =
let blockData = dag.getForkedBlock(bid).valueOr:
return ZERO_HASH
return (ZERO_HASH, 0)

withBlck(blockData):
when consensusFork >= ConsensusFork.Bellatrix:
forkyBlck.message.body.execution_payload.block_hash
(forkyBlck.message.body.execution_payload.block_hash,
forkyBlck.message.body.execution_payload.block_number)
else:
ZERO_HASH
(ZERO_HASH, 0)

proc loadExecutionBlockHash*(dag: ChainDAGRef, bid: BlockId): Eth2Digest =
let (executionBlockHash, _) = dag.loadExecutionBlockHashAndNumber(bid)
executionBlockHash

proc ensureExecutionBlockHashAndNumberLoaded(dag: ChainDAGRef, blck: BlockRef) =
# The expensive part is loading the block, not copying a few additional bytes
if blck.executionBlockHash.isNone or blck.executionBlockNumber.isNone:
let (executionBlockHash, executionBlockNumber) =
dag.loadExecutionBlockHashAndNumber(blck.bid)
blck.executionBlockHash = Opt.some executionBlockHash
blck.executionBlockNumber = Opt.some executionBlockNumber

proc loadExecutionBlockHash*(dag: ChainDAGRef, blck: BlockRef): Eth2Digest =
if blck.executionBlockHash.isNone:
blck.executionBlockHash = Opt.some dag.loadExecutionBlockHash(blck.bid)
dag.ensureExecutionBlockHashAndNumberLoaded(blck)
blck.executionBlockHash.unsafeGet

proc loadExecutionBlockNumber*(dag: ChainDAGRef, blck: BlockRef): uint64 =
dag.ensureExecutionBlockHashAndNumberLoaded(blck)
blck.executionBlockNumber.unsafeGet

from std/packedsets import PackedSet, incl, items

func getValidatorChangeStatuses(
Expand Down
15 changes: 7 additions & 8 deletions beacon_chain/fork_choice/fork_choice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,15 @@ func extend[T](s: var seq[T], minLen: int) =

proc update_justified(
self: var Checkpoints, dag: ChainDAGRef, blck: BlockRef, epoch: Epoch) =
let
epochRef = dag.getEpochRef(blck, epoch, false).valueOr:
# Shouldn't happen for justified data unless out of sync with ChainDAG
warn "Skipping justified checkpoint update, no EpochRef - report bug",
blck, epoch, error
return
justified = Checkpoint(root: blck.root, epoch: epochRef.epoch)
let epochRef = dag.getEpochRef(blck, epoch, false).valueOr:
# Shouldn't happen for justified data unless out of sync with ChainDAG
warn "Skipping justified checkpoint update, no EpochRef - report bug",
blck, epoch, error
return

trace "Updating justified",
store = self.justified.checkpoint, state = justified
store = self.justified.checkpoint,
state = Checkpoint(root: blck.root, epoch: epochRef.epoch)
self.justified = BalanceCheckpoint(
checkpoint: Checkpoint(root: blck.root, epoch: epochRef.epoch),
total_active_balance: epochRef.total_active_balance,
Expand Down
69 changes: 66 additions & 3 deletions beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import
stew/results,
chronicles, chronos, metrics,
../spec/[signatures, signatures_batch],
../spec/[
eth2_apis/eth2_rest_serialization, signatures, signatures_batch],
../sszdump

from std/deques import Deque, addLast, contains, initDeque, items, len, shrink
Expand All @@ -21,7 +22,7 @@ from ../consensus_object_pools/consensus_manager import
updateHeadWithExecution
from ../consensus_object_pools/blockchain_dag import
getBlockRef, getProposer, forkAtEpoch, loadExecutionBlockHash,
markBlockVerified, validatorKey
loadExecutionBlockNumber, markBlockVerified, validatorKey
from ../beacon_clock import GetBeaconTimeFn, toFloatSeconds
from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import
Expand Down Expand Up @@ -66,6 +67,9 @@ type
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource

OnPayloadAttributesCallback =
proc(data: PayloadAttributesInfoObject) {.gcsafe, raises: [].}

BlockProcessor* = object
## This manages the processing of blocks from different sources
## Blocks and attestations are enqueued in a gossip-validated state
Expand Down Expand Up @@ -109,6 +113,10 @@ type
## The slot at which we sent a payload to the execution client the last
## time

# SSE callback
# ----------------------------------------------------------------
onPayloadAttributesAdded: OnPayloadAttributesCallback

NewPayloadStatus {.pure.} = enum
valid
notValid
Expand All @@ -129,7 +137,9 @@ proc new*(T: type BlockProcessor,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
blobQuarantine: ref BlobQuarantine,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
getBeaconTime: GetBeaconTimeFn,
onPayloadAttributesAdded: OnPayloadAttributesCallback):
ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
Expand All @@ -139,6 +149,7 @@ proc new*(T: type BlockProcessor,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
getBeaconTime: getBeaconTime,
onPayloadAttributesAdded: onPayloadAttributesAdded,
verifier: BatchVerifier.init(rng, taskpool)
)

Expand Down Expand Up @@ -407,6 +418,52 @@ proc enqueueBlock*(
except AsyncQueueFullError:
raiseAssert "unbounded queue"

proc sendNewPayloadAttributeNotification(
self: BlockProcessor, wallSlot: Slot, newHead: BlockRef) =
if not(isNil(self.onPayloadAttributesAdded)):
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml#L95-L124
# `payload_attributes`: beacon API encoding of `PayloadAttributesV<N>` as
# defined by the `execution-apis` specification. The version `N` must match
# the payload attributes for the hard fork matching `version`.
#
# The beacon API encoded object must have equivalent fields to its
# counterpart in `execution-apis` with two differences: 1) `snake_case`
# identifiers must be used rather than `camelCase`; 2) integers must be
# encoded as quoted decimals rather than big-endian hex.
let
proposal_slot = wallSlot + 1
payload_attributes =
case self.consensusManager.dag.cfg.consensusForkAtEpoch(
proposal_slot.epoch)
of ConsensusFork.Deneb:
RestJson.encode(RestPayloadAttributesV3(
timestamp: 0'u64,
prev_randao: ZERO_HASH,
suggested_fee_recipient: default(Eth1Address),
withdrawals: @[],
parent_beacon_block_root: ZERO_HASH))
of ConsensusFork.Capella:
RestJson.encode(RestPayloadAttributesV2(
timestamp: 0'u64,
prev_randao: ZERO_HASH,
suggested_fee_recipient: default(Eth1Address),
withdrawals: @[]))
of ConsensusFork.Phase0 .. ConsensusFork.Bellatrix:
RestJson.encode(RestPayloadAttributesV1(
timestamp: 0'u64,
prev_randao: ZERO_HASH,
suggested_fee_recipient: default(Eth1Address)))

self.onPayloadAttributesAdded(PayloadAttributesInfoObject(
proposal_slot: proposal_slot,
parent_block_root: newHead.root,
parent_block_number:
self.consensusManager.dag.loadExecutionBlockNumber(newHead),
parent_block_hash:
self.consensusManager.dag.loadExecutionBlockHash(newHead),
proposer_index: 0'u64, # FIXME
payload_attributes: payload_attributes))

proc storeBlock(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock,
Expand Down Expand Up @@ -654,6 +711,7 @@ proc storeBlock(
headExecutionPayloadHash =
dag.loadExecutionBlockHash(newHead.get.blck)
wallSlot = self.getBeaconTime().slotOrZero

if headExecutionPayloadHash.isZero or
NewPayloadStatus.noResponse == payloadStatus:
# Blocks without execution payloads can't be optimistic, and don't try
Expand Down Expand Up @@ -682,6 +740,8 @@ proc storeBlock(
ConsensusFork.Bellatrix:
callExpectValidFCU(payloadAttributeType = PayloadAttributesV1)

self[].sendNewPayloadAttributeNotification(wallSlot, newHead.get.blck)

if self.consensusManager.checkNextProposer(wallSlot).isNone:
# No attached validator is next proposer, so use non-proposal fcU
callForkChoiceUpdated()
Expand All @@ -695,6 +755,9 @@ proc storeBlock(
else:
await self.consensusManager.updateHeadWithExecution(
newHead.get, self.getBeaconTime)

self[].sendNewPayloadAttributeNotification(wallSlot, newHead.get.blck)

else:
warn "Head selection failed, using previous head",
head = shortLog(dag.head), wallSlot
Expand Down
4 changes: 0 additions & 4 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,6 @@ template checkedReject(
pool: ValidatorChangePool, error: ValidationError): untyped =
pool.dag.checkedReject(error)

template checkedResult(
pool: ValidatorChangePool, error: ValidationError): untyped =
pool.dag.checkedResult(error)

template validateBeaconBlockBellatrix(
signed_beacon_block: phase0.SignedBeaconBlock | altair.SignedBeaconBlock,
parent: BlockRef): untyped =
Expand Down
4 changes: 3 additions & 1 deletion beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ proc initFullNode(
node.eventBus.propSlashQueue.emit(data)
proc onAttesterSlashingAdded(data: AttesterSlashing) =
node.eventBus.attSlashQueue.emit(data)
proc onPayloadAttributesAdded(data: PayloadAttributesInfoObject) =
node.eventBus.payloadAttributesQueue.emit(data)
proc onBlobSidecarAdded(data: BlobSidecar) =
node.eventBus.blobSidecarQueue.emit(
BlobSidecarInfoObject(
Expand Down Expand Up @@ -349,7 +351,7 @@ proc initFullNode(
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor,
blobQuarantine, getBeaconTime)
blobQuarantine, getBeaconTime, onPayloadAttributesAdded)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] =
Expand Down
4 changes: 4 additions & 0 deletions beacon_chain/rpc/rest_event_api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
let handler = response.eventHandler(node.eventBus.attSlashQueue,
"attester_slashing")
res.add(handler)
if EventTopic.PayloadAttributes in eventTopics:
let handler = response.eventHandler(node.eventBus.payloadAttributesQueue,
"payload_attributes")
res.add(handler)
if EventTopic.BlobSidecar in eventTopics:
let handler = response.eventHandler(node.eventBus.blobSidecarQueue,
"blob_sidecar")
Expand Down
10 changes: 10 additions & 0 deletions beacon_chain/spec/datatypes/deneb.nim
Original file line number Diff line number Diff line change
Expand Up @@ -822,3 +822,13 @@ template asTrusted*(
SigVerifiedSignedBeaconBlock |
MsgTrustedSignedBeaconBlock): TrustedSignedBeaconBlock =
isomorphicCast[TrustedSignedBeaconBlock](x)

type
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/apis/eventstream/index.yaml#L95-L124
PayloadAttributesInfoObject* = object
proposal_slot*: Slot
parent_block_root*: Eth2Digest
parent_block_number*: uint64
parent_block_hash*: Eth2Digest
proposer_index*: uint64
payload_attributes*: string
12 changes: 12 additions & 0 deletions beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ RestJson.useDefaultSerializationFor(
KeystoreInfo,
ListFeeRecipientResponse,
ListGasLimitResponse,
PayloadAttributesInfoObject,
PendingAttestation,
PostKeystoresResponse,
PrepareBeaconProposer,
Expand Down Expand Up @@ -141,6 +142,9 @@ RestJson.useDefaultSerializationFor(
RestNodeExtraData,
RestNodePeer,
RestNodeVersion,
RestPayloadAttributesV1,
RestPayloadAttributesV2,
RestPayloadAttributesV3,
RestPeerCount,
RestProposerDuty,
RestRoot,
Expand Down Expand Up @@ -307,8 +311,12 @@ type
ImportDistributedKeystoresBody |
ImportRemoteKeystoresBody |
KeystoresAndSlashingProtection |
PayloadAttributesInfoObject |
PrepareBeaconProposer |
ProposerSlashing |
RestPayloadAttributesV1 |
RestPayloadAttributesV2 |
RestPayloadAttributesV3 |
SetFeeRecipientRequest |
SetGasLimitRequest |
bellatrix_mev.SignedBlindedBeaconBlock |
Expand Down Expand Up @@ -4175,6 +4183,8 @@ proc decodeString*(t: typedesc[EventTopic],
ok(EventTopic.ProposerSlashing)
of "attester_slashing":
ok(EventTopic.AttesterSlashing)
of "payload_attributes":
ok(EventTopic.PayloadAttributes)
of "blob_sidecar":
ok(EventTopic.BlobSidecar)
of "finalized_checkpoint":
Expand Down Expand Up @@ -4206,6 +4216,8 @@ proc encodeString*(value: set[EventTopic]): Result[string, cstring] =
res.add("proposer_slashing,")
if EventTopic.AttesterSlashing in value:
res.add("attester_slashing,")
if EventTopic.PayloadAttributes in value:
res.add("payload_attributes,")
if EventTopic.BlobSidecar in value:
res.add("blob_sidecar,")
if EventTopic.FinalizedCheckpoint in value:
Expand Down