Skip to content

Commit

Permalink
*wip* spv: Add batched cfilter fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
matheusd committed May 13, 2024
1 parent 5e8f132 commit 9cdea5b
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 10 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ require (
google.golang.org/protobuf v1.27.1
)

replace github.com/decred/dcrd/wire => github.com/matheusd/dcrd/wire v1.5.1-0.20240513180946-5a28304e53f8

require (
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 // indirect
github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ github.com/decred/dcrd/rpcclient/v8 v8.0.0 h1:O4B5d+8e2OjbeFW+c1XcZNQzyp++04ArWh
github.com/decred/dcrd/rpcclient/v8 v8.0.0/go.mod h1:gx4+DI5apuOEeLwPBJFlMoj3GFWq1I7/X8XCQmMTi8Q=
github.com/decred/dcrd/txscript/v4 v4.1.0 h1:uEdcibIOl6BuWj3AqmXZ9xIK/qbo6lHY9aNk29FtkrU=
github.com/decred/dcrd/txscript/v4 v4.1.0/go.mod h1:OVguPtPc4YMkgssxzP8B6XEMf/J3MB6S1JKpxgGQqi0=
github.com/decred/dcrd/wire v1.6.0 h1:YOGwPHk4nzGr6OIwUGb8crJYWDiVLpuMxfDBCCF7s/o=
github.com/decred/dcrd/wire v1.6.0/go.mod h1:XQ8Xv/pN/3xaDcb7sH8FBLS9cdgVctT7HpBKKGsIACk=
github.com/decred/go-socks v1.1.0 h1:dnENcc0KIqQo3HSXdgboXAHgqsCIutkqq6ntQjYtm2U=
github.com/decred/go-socks v1.1.0/go.mod h1:sDhHqkZH0X4JjSa02oYOGhcGHYp12FsY1jQ/meV8md0=
github.com/decred/slog v1.2.0 h1:soHAxV52B54Di3WtKLfPum9OFfWqwtf/ygf9njdfnPM=
Expand Down Expand Up @@ -120,6 +118,8 @@ github.com/jrick/wsrpc/v2 v2.3.5 h1:CwdycaR/df09iGkPMXs1FxqAHMCQbdAiTGoHfOrtuds=
github.com/jrick/wsrpc/v2 v2.3.5/go.mod h1:7oBeDM/xMF6Yqy4GDAjpppuOf1hm6lWsaG3EaMrm+aA=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/matheusd/dcrd/wire v1.5.1-0.20240513180946-5a28304e53f8 h1:+Up02Z/ejzYXb4TK2R896wAmtMl4DO/v4tbjW8NnaoY=
github.com/matheusd/dcrd/wire v1.5.1-0.20240513180946-5a28304e53f8/go.mod h1:XQ8Xv/pN/3xaDcb7sH8FBLS9cdgVctT7HpBKKGsIACk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down
105 changes: 101 additions & 4 deletions p2p/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var uaVersion = version.String()
const minPver = wire.RemoveRejectVersion

// Pver is the maximum protocol version implemented by the LocalPeer.
const Pver = wire.RemoveRejectVersion
const Pver = wire.BatchedCFiltersV2Version

// stallTimeout is the amount of time allowed before a request to receive data
// that is known to exist at the RemotePeer times out with no matching reply.
Expand Down Expand Up @@ -92,9 +92,10 @@ type RemotePeer struct {
requestedBlocksMu sync.Mutex
requestedBlocks map[chainhash.Hash]*blockRequest

requestedCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedTxs map[chainhash.Hash]chan<- *wire.MsgTx
requestedTxsMu sync.Mutex
requestedCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedManyCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedTxs map[chainhash.Hash]chan<- *wire.MsgTx
requestedTxsMu sync.Mutex

// headers message management. Headers can either be fetched synchronously
// or used to push block notifications with sendheaders.
Expand Down Expand Up @@ -738,6 +739,8 @@ func (rp *RemotePeer) readMessages(ctx context.Context) error {
rp.receivedBlock(ctx, m)
case *wire.MsgCFilterV2:
rp.receivedCFilterV2(ctx, m)
case *wire.MsgCFiltersV2:
rp.receivedManyCFilterV2(ctx, m)
case *wire.MsgNotFound:
rp.receivedNotFound(ctx, m)
case *wire.MsgTx:
Expand Down Expand Up @@ -923,6 +926,42 @@ func (rp *RemotePeer) receivedBlock(ctx context.Context, msg *wire.MsgBlock) {
rp.requestedBlocksMu.Unlock()
}

func (rp *RemotePeer) addRequestedManyCFilterV2(hash *chainhash.Hash, c chan<- *wire.MsgCFiltersV2) (newRequest bool) {
_, loaded := rp.requestedManyCFiltersV2.LoadOrStore(*hash, c)
return !loaded
}

func (rp *RemotePeer) deleteRequestedManyCFilterV2(hash *chainhash.Hash) {
rp.requestedManyCFiltersV2.Delete(*hash)
}

func (rp *RemotePeer) receivedManyCFilterV2(ctx context.Context, msg *wire.MsgCFiltersV2) {
const opf = "remotepeer(%v).receivedCFilterV2(%v)"
if len(msg.CFilters) == 0 {
op := errors.Opf(opf, rp.raddr, chainhash.Hash{})
err := errors.E(op, errors.Protocol, "received empty cfiltersv2 message")
rp.Disconnect(err)
return

}

var k any = msg.CFilters[0].BlockHash
v, ok := rp.requestedManyCFiltersV2.Load(k)
if !ok {
op := errors.Opf(opf, rp.raddr, k)
err := errors.E(op, errors.Protocol, "received unrequested many cfilter")
rp.Disconnect(err)
return
}

rp.requestedManyCFiltersV2.Delete(k)
c := v.(chan<- *wire.MsgCFiltersV2)
select {
case <-ctx.Done():
case c <- msg:
}
}

func (rp *RemotePeer) addRequestedCFilterV2(hash *chainhash.Hash, c chan<- *wire.MsgCFilterV2) (newRequest bool) {
_, loaded := rp.requestedCFiltersV2.LoadOrStore(*hash, c)
return !loaded
Expand Down Expand Up @@ -1585,6 +1624,64 @@ func (rp *RemotePeer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.H
return filters, nil
}

// BatchedCFiltersV2 fetches all cfilters between the passed start and end
// blocks. The first block MUST be an ancestor of the final block.
func (rp *RemotePeer) BatchedCFiltersV2(ctx context.Context, startHash, endHash *chainhash.Hash) ([]filterProof, error) {
const opf = "remotepeer(%v).CFilterV2(%v)"

if rp.pver < wire.CFilterV2Version {
op := errors.Opf(opf, rp.raddr, startHash)
err := errors.Errorf("protocol version %v is too low to fetch cfiltersv2 from this peer", rp.pver)
return nil, errors.E(op, errors.Protocol, err)
}

m := wire.NewMsgGetCFsV2(startHash, endHash)
c := make(chan *wire.MsgCFiltersV2, 1)
if !rp.addRequestedManyCFilterV2(startHash, c) {
op := errors.Opf(opf, rp.raddr, startHash)
return nil, errors.E(op, errors.Invalid, "cfilterv2 is already being requested from this peer for this block")
}
stalled := time.NewTimer(stallTimeout)
out := rp.out
for {
select {
case <-ctx.Done():
rp.deleteRequestedManyCFilterV2(startHash)
go func() {
<-stalled.C
}()
return nil, ctx.Err()
case <-stalled.C:
rp.deleteRequestedManyCFilterV2(startHash)
op := errors.Opf(opf, rp.raddr, startHash)
err := errors.E(op, errors.IO, "peer appears stalled")
rp.Disconnect(err)
return nil, err
case <-rp.errc:
stalled.Stop()
return nil, rp.err
case out <- &msgAck{m, nil}:
out = nil
case m := <-c:
stalled.Stop()
res := make([]filterProof, len(m.CFilters))
for i := 0; i < len(m.CFilters); i++ {
var f *gcs.FilterV2
var err error
f, err = gcs.FromBytesV2(blockcf.B, blockcf.M, m.CFilters[i].Data)
if err != nil {
op := errors.Opf(opf, rp.raddr, startHash)
return nil, errors.E(op, err)
}
res[i].Filter = f
res[i].Proof = m.CFilters[i].ProofHashes
res[i].ProofIndex = m.CFilters[i].ProofIndex
}
return res, nil
}
}
}

// SendHeaders sends the remote peer a sendheaders message. This informs the
// peer to announce new blocks by immediately sending them in a headers message
// rather than sending an inv message containing the block hash.
Expand Down
21 changes: 17 additions & 4 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockN
cnet := s.wallet.ChainParams().Net

// Split fetching into batches of a max size.
const cfilterBatchSize = 100
const cfilterBatchSize = wire.MaxCFiltersV2PerBatch
if len(nodes) > cfilterBatchSize {
g, ctx := errgroup.WithContext(ctx)
for len(nodes) > cfilterBatchSize {
Expand Down Expand Up @@ -174,9 +174,22 @@ nextTry:

startTime := time.Now()

// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.

filters, err := rp.CFiltersV2(ctx, nodeHashes)
// If the node supports batched fetch, use it. Otherwise,
// fetch one by one.
var filters []wallet.FilterProof
if rp.Pver() >= wire.BatchedCFiltersV2Version {
filters, err = rp.BatchedCFiltersV2(ctx, nodes[0].Hash, nodes[len(nodes)-1].Hash)
if err == nil && len(filters) != len(nodes) {
errMsg := fmt.Errorf("peer returned unexpected "+
"number of filters (got %d, want %d)",
len(filters), len(nodes))
err = errors.E(errors.Protocol, errMsg)
rp.Disconnect(err)
continue nextTry
}
} else {
filters, err = rp.CFiltersV2(ctx, nodeHashes)
}
if err != nil {
log.Tracef("Unable to fetch cfilter batch for "+
"from %v: %v", rp, err)
Expand Down

0 comments on commit 9cdea5b

Please sign in to comment.