Skip to content

Commit

Permalink
*wip* spv: Add batched cfilter fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
matheusd committed May 13, 2024
1 parent 5e8f132 commit 01bc63a
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 14 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.1.0
github.com/decred/dcrd/rpcclient/v8 v8.0.0
github.com/decred/dcrd/txscript/v4 v4.1.0
github.com/decred/dcrd/wire v1.6.0
github.com/decred/dcrd/wire v1.7.0
github.com/decred/go-socks v1.1.0
github.com/decred/slog v1.2.0
github.com/decred/vspd/client/v3 v3.0.0
Expand Down Expand Up @@ -55,5 +55,5 @@ require (
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ github.com/decred/dcrd/rpcclient/v8 v8.0.0 h1:O4B5d+8e2OjbeFW+c1XcZNQzyp++04ArWh
github.com/decred/dcrd/rpcclient/v8 v8.0.0/go.mod h1:gx4+DI5apuOEeLwPBJFlMoj3GFWq1I7/X8XCQmMTi8Q=
github.com/decred/dcrd/txscript/v4 v4.1.0 h1:uEdcibIOl6BuWj3AqmXZ9xIK/qbo6lHY9aNk29FtkrU=
github.com/decred/dcrd/txscript/v4 v4.1.0/go.mod h1:OVguPtPc4YMkgssxzP8B6XEMf/J3MB6S1JKpxgGQqi0=
github.com/decred/dcrd/wire v1.6.0 h1:YOGwPHk4nzGr6OIwUGb8crJYWDiVLpuMxfDBCCF7s/o=
github.com/decred/dcrd/wire v1.6.0/go.mod h1:XQ8Xv/pN/3xaDcb7sH8FBLS9cdgVctT7HpBKKGsIACk=
github.com/decred/dcrd/wire v1.7.0 h1:5JHiDjEQeS4XUl4PfnTZYLwAD/E/+LwBmPRec/fP76o=
github.com/decred/dcrd/wire v1.7.0/go.mod h1:lAqrzV0SU4kyV6INLEJgDtUjJaTaVKrbF4LHtaYl+zU=
github.com/decred/go-socks v1.1.0 h1:dnENcc0KIqQo3HSXdgboXAHgqsCIutkqq6ntQjYtm2U=
github.com/decred/go-socks v1.1.0/go.mod h1:sDhHqkZH0X4JjSa02oYOGhcGHYp12FsY1jQ/meV8md0=
github.com/decred/slog v1.2.0 h1:soHAxV52B54Di3WtKLfPum9OFfWqwtf/ygf9njdfnPM=
Expand Down Expand Up @@ -214,5 +214,5 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
lukechampine.com/blake3 v1.3.0 h1:sJ3XhFINmHSrYCgl958hscfIa3bw8x4DqMP3u1YvoYE=
lukechampine.com/blake3 v1.3.0/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
105 changes: 101 additions & 4 deletions p2p/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var uaVersion = version.String()
const minPver = wire.RemoveRejectVersion

// Pver is the maximum protocol version implemented by the LocalPeer.
const Pver = wire.RemoveRejectVersion
const Pver = wire.BatchedCFiltersV2Version

// stallTimeout is the amount of time allowed before a request to receive data
// that is known to exist at the RemotePeer times out with no matching reply.
Expand Down Expand Up @@ -92,9 +92,10 @@ type RemotePeer struct {
requestedBlocksMu sync.Mutex
requestedBlocks map[chainhash.Hash]*blockRequest

requestedCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedTxs map[chainhash.Hash]chan<- *wire.MsgTx
requestedTxsMu sync.Mutex
requestedCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedManyCFiltersV2 sync.Map // k=chainhash.Hash v=chan<- *wire.MsgCFilterV2
requestedTxs map[chainhash.Hash]chan<- *wire.MsgTx
requestedTxsMu sync.Mutex

// headers message management. Headers can either be fetched synchronously
// or used to push block notifications with sendheaders.
Expand Down Expand Up @@ -738,6 +739,8 @@ func (rp *RemotePeer) readMessages(ctx context.Context) error {
rp.receivedBlock(ctx, m)
case *wire.MsgCFilterV2:
rp.receivedCFilterV2(ctx, m)
case *wire.MsgCFiltersV2:
rp.receivedManyCFilterV2(ctx, m)
case *wire.MsgNotFound:
rp.receivedNotFound(ctx, m)
case *wire.MsgTx:
Expand Down Expand Up @@ -923,6 +926,42 @@ func (rp *RemotePeer) receivedBlock(ctx context.Context, msg *wire.MsgBlock) {
rp.requestedBlocksMu.Unlock()
}

func (rp *RemotePeer) addRequestedManyCFilterV2(hash *chainhash.Hash, c chan<- *wire.MsgCFiltersV2) (newRequest bool) {
_, loaded := rp.requestedManyCFiltersV2.LoadOrStore(*hash, c)
return !loaded
}

func (rp *RemotePeer) deleteRequestedManyCFilterV2(hash *chainhash.Hash) {
rp.requestedManyCFiltersV2.Delete(*hash)
}

func (rp *RemotePeer) receivedManyCFilterV2(ctx context.Context, msg *wire.MsgCFiltersV2) {
const opf = "remotepeer(%v).receivedCFilterV2(%v)"
if len(msg.CFilters) == 0 {
op := errors.Opf(opf, rp.raddr, chainhash.Hash{})
err := errors.E(op, errors.Protocol, "received empty cfiltersv2 message")
rp.Disconnect(err)
return

}

var k any = msg.CFilters[0].BlockHash
v, ok := rp.requestedManyCFiltersV2.Load(k)
if !ok {
op := errors.Opf(opf, rp.raddr, k)
err := errors.E(op, errors.Protocol, "received unrequested many cfilter")
rp.Disconnect(err)
return
}

rp.requestedManyCFiltersV2.Delete(k)
c := v.(chan<- *wire.MsgCFiltersV2)
select {
case <-ctx.Done():
case c <- msg:
}
}

func (rp *RemotePeer) addRequestedCFilterV2(hash *chainhash.Hash, c chan<- *wire.MsgCFilterV2) (newRequest bool) {
_, loaded := rp.requestedCFiltersV2.LoadOrStore(*hash, c)
return !loaded
Expand Down Expand Up @@ -1585,6 +1624,64 @@ func (rp *RemotePeer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.H
return filters, nil
}

// BatchedCFiltersV2 fetches all cfilters between the passed start and end
// blocks. The first block MUST be an ancestor of the final block.
func (rp *RemotePeer) BatchedCFiltersV2(ctx context.Context, startHash, endHash *chainhash.Hash) ([]filterProof, error) {
const opf = "remotepeer(%v).CFilterV2(%v)"

if rp.pver < wire.CFilterV2Version {
op := errors.Opf(opf, rp.raddr, startHash)
err := errors.Errorf("protocol version %v is too low to fetch cfiltersv2 from this peer", rp.pver)
return nil, errors.E(op, errors.Protocol, err)
}

m := wire.NewMsgGetCFsV2(startHash, endHash)
c := make(chan *wire.MsgCFiltersV2, 1)
if !rp.addRequestedManyCFilterV2(startHash, c) {
op := errors.Opf(opf, rp.raddr, startHash)
return nil, errors.E(op, errors.Invalid, "cfilterv2 is already being requested from this peer for this block")
}
stalled := time.NewTimer(stallTimeout)
out := rp.out
for {
select {
case <-ctx.Done():
rp.deleteRequestedManyCFilterV2(startHash)
go func() {
<-stalled.C
}()
return nil, ctx.Err()
case <-stalled.C:
rp.deleteRequestedManyCFilterV2(startHash)
op := errors.Opf(opf, rp.raddr, startHash)
err := errors.E(op, errors.IO, "peer appears stalled")
rp.Disconnect(err)
return nil, err
case <-rp.errc:
stalled.Stop()
return nil, rp.err
case out <- &msgAck{m, nil}:
out = nil
case m := <-c:
stalled.Stop()
res := make([]filterProof, len(m.CFilters))
for i := 0; i < len(m.CFilters); i++ {
var f *gcs.FilterV2
var err error
f, err = gcs.FromBytesV2(blockcf.B, blockcf.M, m.CFilters[i].Data)
if err != nil {
op := errors.Opf(opf, rp.raddr, startHash)
return nil, errors.E(op, err)
}
res[i].Filter = f
res[i].Proof = m.CFilters[i].ProofHashes
res[i].ProofIndex = m.CFilters[i].ProofIndex
}
return res, nil
}
}
}

// SendHeaders sends the remote peer a sendheaders message. This informs the
// peer to announce new blocks by immediately sending them in a headers message
// rather than sending an inv message containing the block hash.
Expand Down
21 changes: 17 additions & 4 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockN
cnet := s.wallet.ChainParams().Net

// Split fetching into batches of a max size.
const cfilterBatchSize = 100
const cfilterBatchSize = wire.MaxCFiltersV2PerBatch
if len(nodes) > cfilterBatchSize {
g, ctx := errgroup.WithContext(ctx)
for len(nodes) > cfilterBatchSize {
Expand Down Expand Up @@ -174,9 +174,22 @@ nextTry:

startTime := time.Now()

// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.

filters, err := rp.CFiltersV2(ctx, nodeHashes)
// If the node supports batched fetch, use it. Otherwise,
// fetch one by one.
var filters []wallet.FilterProof
if rp.Pver() >= wire.BatchedCFiltersV2Version {
filters, err = rp.BatchedCFiltersV2(ctx, nodes[0].Hash, nodes[len(nodes)-1].Hash)
if err == nil && len(filters) != len(nodes) {
errMsg := fmt.Errorf("peer returned unexpected "+
"number of filters (got %d, want %d)",
len(filters), len(nodes))
err = errors.E(errors.Protocol, errMsg)
rp.Disconnect(err)
continue nextTry
}
} else {
filters, err = rp.CFiltersV2(ctx, nodeHashes)
}
if err != nil {
log.Tracef("Unable to fetch cfilter batch for "+
"from %v: %v", rp, err)
Expand Down

0 comments on commit 01bc63a

Please sign in to comment.