/
server.go
4321 lines (3850 loc) · 139 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2024 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"context"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"errors"
"fmt"
"math"
"net"
"os"
"path"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/decred/dcrd/addrmgr/v2"
"github.com/decred/dcrd/blockchain/stake/v5"
"github.com/decred/dcrd/blockchain/standalone/v2"
"github.com/decred/dcrd/certgen"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/chaincfg/v3"
"github.com/decred/dcrd/connmgr/v3"
"github.com/decred/dcrd/container/apbf"
"github.com/decred/dcrd/database/v3"
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/internal/blockchain"
"github.com/decred/dcrd/internal/blockchain/indexers"
"github.com/decred/dcrd/internal/fees"
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/mining"
"github.com/decred/dcrd/internal/mining/cpuminer"
"github.com/decred/dcrd/internal/netsync"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/internal/version"
"github.com/decred/dcrd/math/uint256"
"github.com/decred/dcrd/peer/v3"
"github.com/decred/dcrd/txscript/v4"
"github.com/decred/dcrd/wire"
"github.com/syndtr/goleveldb/leveldb"
)
const (
// defaultServices describes the default services that are supported by
// the server.
defaultServices = wire.SFNodeNetwork
// defaultRequiredServices describes the default services that are
// required to be supported by outbound peers.
defaultRequiredServices = wire.SFNodeNetwork
// defaultTargetOutbound is the default number of outbound peers to
// target.
defaultTargetOutbound = 8
// defaultMaximumVoteAge is the threshold of blocks before the tip
// that can be voted on.
defaultMaximumVoteAge = 1440
// connectionRetryInterval is the base amount of time to wait in between
// retries when connecting to persistent peers. It is adjusted by the
// number of retries such that there is a retry backoff.
connectionRetryInterval = time.Second * 5
// maxProtocolVersion is the max protocol version the server supports.
maxProtocolVersion = wire.BatchedCFiltersV2Version
// These fields are used to track known addresses on a per-peer basis.
//
// maxKnownAddrsPerPeer is the maximum number of items to track.
//
// knownAddrsFPRate is the false positive rate for the APBF used to track
// them. It is set to a rate of 1 per 1000 since addresses are not very
// large and they only need to be filtered once per connection, so an extra
// 10 of them being sent (on average) again even though they technically
// wouldn't need to is a good tradeoff.
//
// These values result in about 40 KiB memory usage including overhead.
maxKnownAddrsPerPeer = 10000
knownAddrsFPRate = 0.001
// maxCachedNaSubmissions is the maximum number of network address
// submissions cached.
maxCachedNaSubmissions = 20
// These constants control the maximum number of simultaneous pending
// getdata messages and the individual data item requests they make without
// being disconnected.
//
// Since each getdata message is comprised of several individual data item
// requests, the limiting is applied on both dimensions to offer more
// flexibility while still keeping memory usage bounded to reasonable
// limits.
//
// maxConcurrentGetDataReqs is the maximum number of simultaneous pending
// getdata message requests.
//
// maxPendingGetDataItemReqs is the maximum number of overall total
// simultaneous pending individual data item requests.
//
// In other words, when combined, a peer may mix and match simultaneous
// getdata requests for varying amounts of data items so long as it does not
// exceed the maximum specified number of simultaneous pending getdata
// messages or the maximum number of total overall pending individual data
// item requests.
maxConcurrentGetDataReqs = 1000
maxPendingGetDataItemReqs = 2 * wire.MaxInvPerMsg
// maxReorgDepthNotify specifies the maximum reorganization depth for which
// winning ticket notifications will be sent over RPC. The reorg depth is
// the number of blocks that would be reorganized out of the current best
// chain if a side chain being considered for notifications were to
// ultimately be extended to be longer than the current one.
//
// In effect, this helps to prevent large reorgs by refusing to send the
// winning ticket information to RPC clients, such as voting wallets, which
// depend on it to cast votes.
//
// This check also doubles to help reduce exhaustion attacks that could
// otherwise arise from sending old orphan blocks and forcing nodes to do
// expensive lottery data calculations for them.
maxReorgDepthNotify = 6
// These fields are used to track recently confirmed transactions.
//
// maxRecentlyConfirmedTxns specifies the maximum number to track and is set
// to target tracking the maximum number of transactions of the minimum
// realistic size (~206 bytes) in approximately one hour of blocks on the
// main network.
//
// recentlyConfirmedTxnsFPRate is the false positive rate for the APBF used
// to track them and is set to a rate of 1 per 1 million which supports up
// to ~11.5 transactions/s before a single false positive would be seen on
// average and thus allows for plenty of future growth.
//
// These values result in about 183 KiB memory usage including overhead.
maxRecentlyConfirmedTxns = 23000
recentlyConfirmedTxnsFPRate = 0.000001
)
// userAgentName is the name this node reports in its user agent so other
// Decred peers can identify it.
var userAgentName = "dcrd"

// userAgentVersion is the dotted major.minor.patch version this node reports
// in its user agent so other peers can identify it.
var userAgentVersion = fmt.Sprintf(
	"%d.%d.%d",
	version.Major,
	version.Minor,
	version.Patch,
)
// simpleAddr is a minimal net.Addr implementation that simply stores the
// network name and the address as two strings.
type simpleAddr struct {
	net  string
	addr string
}

// Compile-time proof that simpleAddr satisfies the net.Addr interface.
var _ net.Addr = simpleAddr{}

// Network reports the name of the network the address belongs to.
//
// This is part of the net.Addr interface.
func (sa simpleAddr) Network() string { return sa.net }

// String reports the address in its string form.
//
// This is part of the net.Addr interface.
func (sa simpleAddr) String() string { return sa.addr }
// broadcastMsg provides the ability to house a Decred message to be broadcast
// to all connected peers except specified excluded peers.
type broadcastMsg struct {
// message is the wire message to broadcast.
message wire.Message
// excludePeers lists the peers that must not receive the message.
excludePeers []*serverPeer
}
// broadcastInventoryAdd is a type used to declare that the InvVect it contains
// needs to be added to the rebroadcast map.  It shares the underlying layout
// of relayMsg.
type broadcastInventoryAdd relayMsg
// broadcastInventoryDel is a type used to declare that the InvVect it contains
// needs to be removed from the rebroadcast map.
type broadcastInventoryDel *wire.InvVect
// broadcastPruneInventory is a type used to declare that rebroadcast
// inventory entries need to be filtered and removed where necessary.  It
// carries no data and acts purely as a signal.
type broadcastPruneInventory struct{}
// relayMsg packages an inventory vector along with the newly discovered
// inventory and a flag that determines if the relay should happen immediately
// (it will be put into a trickle queue if false) so the relay has access to
// that information.
type relayMsg struct {
// invVect identifies the inventory being relayed.
invVect *wire.InvVect
// data is the newly discovered inventory associated with invVect.
data interface{}
// immediate requests an immediate relay when true; otherwise the inventory
// is placed into a trickle queue.
immediate bool
// NOTE(review): presumably the service flags a peer must advertise in order
// to be relayed to -- confirm against the relay handler.
reqServices wire.ServiceFlag
}
// naSubmission represents a network address submission from an outbound peer.
type naSubmission struct {
// na is the submitted network address.  The string form of its IP is used
// as the key in the submission cache.
na *wire.NetAddress
// netType is the network type used to filter submissions when selecting the
// best one.
netType addrmgr.NetAddressType
// reach is compared against the listener address family when deciding
// whether the submission is a plausible external endpoint.
reach addrmgr.NetAddressReach
// score starts at one when the submission is cached and is incremented each
// time the same address is submitted again.
score uint32
// lastAccessed is the Unix time, in seconds, the submission was last added
// or had its score incremented.
lastAccessed int64
}
// naSubmissionCache represents a bounded map for network address submissions.
type naSubmissionCache struct {
// cache maps the string form of a submitted IP to its submission.
cache map[string]*naSubmission
// limit is the number of entries at which adding a submission evicts the
// least recently accessed one.
limit int
// mtx protects access to cache.
mtx sync.Mutex
}
// add caches the provided address submission keyed by the string form of its
// IP address, resetting its score to one and refreshing its last accessed
// time.
//
// When the cache has reached its limit and the submission is not already
// tracked, the least recently accessed entry is evicted to make room.
// Replacing an entry that is already tracked never evicts an unrelated one.
//
// An error is returned when the submission, its network address, or the IP of
// that network address is missing.
func (sc *naSubmissionCache) add(sub *naSubmission) error {
	if sub == nil {
		return fmt.Errorf("submission cannot be nil")
	}
	if sub.na == nil {
		return fmt.Errorf("submission network address cannot be nil")
	}

	// net.IP.String never returns an empty string (an empty IP is rendered as
	// "<nil>"), so the IP itself must be checked in order to reject
	// submissions that would otherwise produce a meaningless key.
	if len(sub.na.IP) == 0 {
		return fmt.Errorf("submission key cannot be an empty string")
	}
	key := sub.na.IP.String()

	sc.mtx.Lock()
	defer sc.mtx.Unlock()

	// Remove the oldest submission if the cache limit has been reached.  This
	// is only needed when a new key is being introduced since replacing an
	// existing entry does not grow the cache.
	if _, tracked := sc.cache[key]; !tracked && len(sc.cache) >= sc.limit {
		var oldestKey string
		var oldest *naSubmission
		for cachedKey, cached := range sc.cache {
			if oldest == nil || cached.lastAccessed < oldest.lastAccessed {
				oldestKey, oldest = cachedKey, cached
			}
		}
		if oldest != nil {
			delete(sc.cache, oldestKey)
		}
	}

	sub.score = 1
	sub.lastAccessed = time.Now().Unix()
	sc.cache[key] = sub
	return nil
}
// exists reports whether a submission is cached under the provided key.  An
// empty key is never considered to exist.
func (sc *naSubmissionCache) exists(key string) bool {
	if len(key) == 0 {
		return false
	}

	sc.mtx.Lock()
	defer sc.mtx.Unlock()

	_, found := sc.cache[key]
	return found
}
// incrementScore bumps the score of the submission cached under the provided
// key by one and refreshes its last accessed time.
//
// An error is returned when the key is empty or is not present in the cache.
func (sc *naSubmissionCache) incrementScore(key string) error {
	if len(key) == 0 {
		return fmt.Errorf("submission key cannot be an empty string")
	}

	sc.mtx.Lock()
	defer sc.mtx.Unlock()

	entry, found := sc.cache[key]
	if !found {
		return fmt.Errorf("submission key not found: %s", key)
	}

	// The cache stores pointers, so the entry is updated in place and does
	// not need to be written back to the map.
	entry.score += 1
	entry.lastAccessed = time.Now().Unix()
	return nil
}
// bestSubmission returns the cached submission with the highest score among
// those matching the provided network type, or nil when none match.
func (sc *naSubmissionCache) bestSubmission(net addrmgr.NetAddressType) *naSubmission {
	sc.mtx.Lock()
	defer sc.mtx.Unlock()

	var top *naSubmission
	for _, candidate := range sc.cache {
		if candidate.netType != net {
			continue
		}

		// The first match seeds the result and is only displaced by a
		// strictly higher score.
		if top == nil || candidate.score > top.score {
			top = candidate
		}
	}
	return top
}
// peerState maintains state of inbound, persistent, outbound peers as well
// as banned peers and outbound groups.
type peerState struct {
// inboundPeers, outboundPeers, and persistentPeers hold the currently known
// peers of each kind.  All three are included in the total peer count.
inboundPeers map[int32]*serverPeer
outboundPeers map[int32]*serverPeer
persistentPeers map[int32]*serverPeer
// NOTE(review): presumably maps a banned host to the time its ban expires --
// confirm against the ban handling code.
banned map[string]time.Time
// NOTE(review): presumably counts outbound connections per address group --
// confirm against the connection handling code.
outboundGroups map[string]int
// subCache houses the network address submissions consulted when resolving
// the local address.
subCache *naSubmissionCache
}
// ConnectionsWithIP returns how many inbound, outbound, and persistent peers
// are connected from the given IP.
func (ps *peerState) ConnectionsWithIP(ip net.IP) int {
	peerSets := [...]map[int32]*serverPeer{
		ps.inboundPeers,
		ps.outboundPeers,
		ps.persistentPeers,
	}

	matches := 0
	for _, peers := range peerSets {
		for _, sp := range peers {
			if ip.Equal(sp.NA().IP) {
				matches++
			}
		}
	}
	return matches
}
// Count returns the total number of known peers across the inbound, outbound,
// and persistent sets.
func (ps *peerState) Count() int {
	total := len(ps.inboundPeers)
	total += len(ps.outboundPeers)
	total += len(ps.persistentPeers)
	return total
}
// forAllOutboundPeers invokes closure once for every outbound peer followed by
// every persistent peer known to peerState.
func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
	visit := func(peers map[int32]*serverPeer) {
		for _, sp := range peers {
			closure(sp)
		}
	}

	visit(ps.outboundPeers)
	visit(ps.persistentPeers)
}
// forAllPeers invokes closure once for every peer known to peerState: the
// inbound peers first, then the outbound and persistent peers.
func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
	for _, inbound := range ps.inboundPeers {
		closure(inbound)
	}

	// Outbound and persistent peers are covered by the dedicated helper.
	ps.forAllOutboundPeers(closure)
}
// ResolveLocalAddress selects the highest scoring address submission for the
// provided network type and, when enough outbound peers agree on it, adds it
// to the address manager as a local address for each configured listener it
// plausibly corresponds to.
//
// Processing stops at the first listener that cannot be parsed.
func (ps *peerState) ResolveLocalAddress(netType addrmgr.NetAddressType, addrMgr *addrmgr.AddrManager, services wire.ServiceFlag) {
	best := ps.subCache.bestSubmission(netType)
	if best == nil {
		return
	}

	// The number of outbound peers considered is the configured max peers
	// capped at the default outbound target.
	numOutbound := cfg.MaxPeers
	if numOutbound > defaultTargetOutbound {
		numOutbound = defaultTargetOutbound
	}

	// A valid best address suggestion must have a majority (60 percent
	// majority) of outbound peers concluding on the same result.
	requiredScore := uint32(math.Ceil(float64(numOutbound) * 0.6))
	if best.score < requiredScore {
		return
	}

	// registerLocal adds the suggested host with the given listener port as a
	// local address unless the address manager already has it.
	registerLocal := func(suggestion string, port uint16) {
		na, err := addrMgr.HostToNetAddress(suggestion, port, services)
		if err != nil {
			amgrLog.Errorf("unable to generate network address using host %v: "+
				"%v", suggestion, err)
			return
		}
		if addrMgr.HasLocalAddress(na) {
			return
		}
		if err := addrMgr.AddLocalAddress(na, addrmgr.ManualPrio); err != nil {
			amgrLog.Errorf("unable to add local address: %v", err)
		}
	}

	for _, listener := range cfg.Listeners {
		host, portStr, err := net.SplitHostPort(listener)
		if err != nil {
			amgrLog.Errorf("unable to split network address: %v", err)
			return
		}
		parsedPort, err := strconv.ParseUint(portStr, 10, 16)
		if err != nil {
			amgrLog.Errorf("unable to parse port: %v", err)
			return
		}
		port := uint16(parsedPort)

		// Strip IPv6 zone id if present.
		if zoneIndex := strings.LastIndex(host, "%"); zoneIndex > 0 {
			host = host[:zoneIndex]
		}

		// The suggestion applies directly when a listener references it
		// explicitly or when the listener is generic (applies for both IPv4
		// and IPv6).
		bestHost := best.na.IP.String()
		isGeneric := host == "" || (host == "*" && runtime.GOOS == "plan9")
		if host == bestHost || isGeneric {
			registerLocal(bestHost, port)
			continue
		}

		listenerIP := net.ParseIP(host)
		if listenerIP == nil {
			amgrLog.Errorf("unable to parse listener: %v", host)
			return
		}

		// Otherwise the suggestion is only accepted when its reachability
		// class matches the address family of the listener, which makes it a
		// probable external endpoint of that listener.
		lNa := wire.NewNetAddressIPPort(listenerIP, port, services)
		var validExternal bool
		if lNa.IP.To4() != nil {
			validExternal = best.reach == addrmgr.Ipv4
		} else {
			validExternal = best.reach == addrmgr.Ipv6Weak ||
				best.reach == addrmgr.Ipv6Strong ||
				best.reach == addrmgr.Teredo
		}
		if validExternal {
			registerLocal(bestHost, port)
		}
	}
}
// server provides a Decred server for handling communications to and from
// Decred peers.
type server struct {
bytesReceived atomic.Uint64 // Total bytes received from all peers since start.
bytesSent atomic.Uint64 // Total bytes sent by all peers since start.
shutdown atomic.Bool
// minKnownWork houses the minimum known work from the associated network
// params converted to a uint256 so the conversion only needs to be
// performed once when the server is initialized. Ideally, the chain params
// should be updated to use the new type, but that will be a major version
// bump, so a one-time conversion is a good tradeoff in the mean time.
minKnownWork uint256.Uint256
chainParams *chaincfg.Params
addrManager *addrmgr.AddrManager
connManager *connmgr.ConnManager
sigCache *txscript.SigCache
subsidyCache *standalone.SubsidyCache
rpcServer *rpcserver.Server
syncManager *netsync.SyncManager
bg *mining.BgBlkTmplGenerator
chain *blockchain.BlockChain
txMemPool *mempool.TxPool
feeEstimator *fees.Estimator
cpuMiner *cpuminer.CPUMiner
modifyRebroadcastInv chan interface{}
newPeers chan *serverPeer
donePeers chan *serverPeer
banPeers chan *serverPeer
query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
nat *upnpNAT
db database.DB
timeSource blockchain.MedianTimeSource
services wire.ServiceFlag
quit chan struct{}
// The following fields are used for optional indexes. They will be nil
// if the associated index is not enabled. These fields are set during
// initial creation of the server and never changed afterwards, so they
// do not need to be protected for concurrent access.
indexSubscriber *indexers.IndexSubscriber
txIndex *indexers.TxIndex
existsAddrIndex *indexers.ExistsAddrIndex
// The following fields are used to filter duplicate block lottery data
// announcements.
lotteryDataBroadcastMtx sync.RWMutex
lotteryDataBroadcast map[chainhash.Hash]struct{}
// recentlyConfirmedTxns tracks transactions that have been confirmed in the
// most recent blocks.
recentlyConfirmedTxns *apbf.Filter
}
// serverPeer extends the peer to maintain state shared by the server.
type serverPeer struct {
*peer.Peer
connReq *connmgr.ConnReq
server *server
persistent bool
// continueHash, when set, is the hash of the block whose delivery via
// getdata triggers sending a follow-up inventory message for the current
// best block.
continueHash atomic.Pointer[chainhash.Hash]
disableRelayTx atomic.Bool
// isWhitelisted exempts the peer from ban score increases.
isWhitelisted bool
// knownAddresses tracks the addresses already known to the peer in order to
// avoid sending duplicates.
knownAddresses *apbf.Filter
banScore connmgr.DynamicBanScore
// quit is closed once the peer has disconnected in order to shut down the
// remaining peer goroutines.
quit chan struct{}
// syncMgrPeer houses the network sync manager peer instance that wraps the
// underlying peer similar to the way this server peer itself wraps it.
syncMgrPeer *netsync.Peer
// addrsSent, getMiningStateSent and initStateSent all track whether or not
// the peer has already sent the respective request. It is used to
// prevent more than one response per connection.
addrsSent bool
getMiningStateSent bool
initStateSent bool
// The following fields are used to synchronize the net sync manager and
// server.
txProcessed chan struct{}
blockProcessed chan struct{}
// peerNa is the network address of the peer connected to.
peerNa atomic.Pointer[wire.NetAddress]
// announcedBlock tracks the most recent block announced to this peer and is
// used to filter duplicates.
announcedBlock *chainhash.Hash
// The following fields are used to serve getdata requests asynchronously as
// opposed to directly in the peer input handler.
//
// getDataQueue is a buffered channel for queueing up concurrent getdata
// requests.
//
// numPendingGetDataItemReqs tracks the total number of pending individual
// data item requests that still need to be served.
getDataQueue chan []*wire.InvVect
numPendingGetDataItemReqs atomic.Uint32
}
// newServerPeer creates a serverPeer bound to the given server with its known
// address filter, quit channel, processing notification channels, and getdata
// queue initialized.  The underlying peer needs to be set by the caller.
func newServerPeer(s *server, isPersistent bool) *serverPeer {
	sp := &serverPeer{server: s, persistent: isPersistent}
	sp.knownAddresses = apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate)
	sp.quit = make(chan struct{})
	sp.txProcessed = make(chan struct{}, 1)
	sp.blockProcessed = make(chan struct{}, 1)
	sp.getDataQueue = make(chan []*wire.InvVect, maxConcurrentGetDataReqs)
	return sp
}
// handleServeGetData is the primary logic for servicing queued getdata
// requests.
//
// Each requested inventory vector is looked up in the transaction pool or the
// chain depending on its type.  Items that are found are queued to be sent to
// the peer one by one while items that are not found are collected and
// reported in a single notfound message once the entire batch is processed.
//
// It makes use of the given send done channel and semaphore to provide
// a little pipelining of database loads while keeping the memory usage bounded
// to reasonable limits.
//
// It returns early without processing the remaining items when the peer quit
// channel is closed while waiting for the semaphore.
//
// It is invoked from the serveGetData goroutine.
func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
sendDoneChan chan struct{}, semaphore chan struct{}) {
var notFoundMsg *wire.MsgNotFound
for _, iv := range invVects {
var sendInv bool
var dataMsg wire.Message
switch iv.Type {
case wire.InvTypeTx:
// Attempt to fetch the requested transaction from the pool. A call
// could be made to check for existence first, but simply trying to
// fetch a missing transaction results in the same behavior. Do not
// allow peers to request transactions already in a block but are
// unconfirmed, as they may be expensive. Restrict that to the
// authenticated RPC only.
txHash := &iv.Hash
tx, err := sp.server.txMemPool.FetchTransaction(txHash)
if err != nil {
peerLog.Tracef("Unable to fetch tx %v from transaction pool: %v",
txHash, err)
// This only exits the switch, which leaves the data message unset so
// the item is reported as not found below.
break
}
dataMsg = tx.MsgTx()
case wire.InvTypeBlock:
blockHash := &iv.Hash
block, err := sp.server.chain.BlockByHash(blockHash)
if err != nil {
peerLog.Tracef("Unable to fetch requested block hash %v: %v",
blockHash, err)
// This only exits the switch, which leaves the data message unset so
// the item is reported as not found below.
break
}
dataMsg = block.MsgBlock()
// When the peer requests the final block that was advertised in
// response to a getblocks message which requested more blocks than
// would fit into a single message, it requires a new inventory
// message to trigger it to issue another getblocks message for the
// next batch of inventory.
//
// However, that inventory message should not be sent until after
// the block itself is sent, so keep a flag for later use.
//
// Note that this is to support the legacy syncing model that is no
// longer used in dcrd which is now based on a much more robust
// headers-based syncing model. Nevertheless, this behavior is
// still a required part of the getblocks protocol semantics. It
// can be removed if a future protocol upgrade also removes the
// getblocks message.
continueHash := sp.continueHash.Load()
sendInv = continueHash != nil && *continueHash == *blockHash
default:
// NOTE(review): unlike the other paths, this one skips the item without
// decrementing numPendingGetDataItemReqs.  If the counter is incremented
// for every requested item regardless of type, unknown types would leave
// it permanently elevated -- confirm against the getdata handler.
peerLog.Warnf("Unknown type '%d' in inventory request from %s",
iv.Type, sp)
continue
}
if dataMsg == nil {
// Keep track of all items that were not found in order to send a
// consolidated message once the entire batch is processed.
//
// The error when adding the inventory vector is ignored because the
// only way it could fail would be by exceeding the max allowed
// number of items which is impossible given the getdata message is
// enforced to not exceed that same maximum limit.
if notFoundMsg == nil {
notFoundMsg = wire.NewMsgNotFound()
}
notFoundMsg.AddInvVect(iv)
// There is no need to wait for the semaphore below when there is
// not any data to send.
//
// Adding the bitwise complement of zero decrements the unsigned counter
// by one.
sp.numPendingGetDataItemReqs.Add(^uint32(0))
continue
}
// Limit the number of items that can be queued to prevent wasting a
// bunch of memory by queuing far more data than can be sent in a
// reasonable time. The waiting occurs after the database fetch for the
// next one to provide a little pipelining.
//
// This also monitors the channel that is notified when queued messages
// are sent in order to release the semaphore without needing a separate
// monitoring goroutine.
for semAcquired := false; !semAcquired; {
select {
case <-sp.quit:
return
case semaphore <- struct{}{}:
semAcquired = true
case <-sendDoneChan:
// Release semaphore.
<-semaphore
}
}
// Decrement the pending data item requests accordingly and queue the
// data to be sent to the peer.
sp.numPendingGetDataItemReqs.Add(^uint32(0))
sp.QueueMessage(dataMsg, sendDoneChan)
// Send a new inventory message to trigger the peer to issue another
// getblocks message for the next batch of inventory if needed.
if sendInv {
best := sp.server.chain.BestSnapshot()
invMsg := wire.NewMsgInvSizeHint(1)
iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
invMsg.AddInvVect(iv)
sp.QueueMessage(invMsg, nil)
sp.continueHash.Store(nil)
}
}
if notFoundMsg != nil {
sp.QueueMessage(notFoundMsg, nil)
}
}
// serveGetData is the asynchronous queue handler that services all data
// requested via getdata requests.  A peer may mix and match simultaneous
// getdata requests for varying amounts of data items so long as it does not
// exceed the maximum number of simultaneous pending getdata messages or the
// maximum number of total overall pending data item requests.
//
// It runs until the peer quit channel is closed and must be run in a
// goroutine.
func (sp *serverPeer) serveGetData() {
	// maxPendingSend bounds the number of items that may be loaded from the
	// database/mempool and queued for send at any given time.
	const maxPendingSend = 3

	var (
		pendingSlots = make(chan struct{}, maxPendingSend)
		sentChan     = make(chan struct{}, maxPendingSend+1)
	)

	for {
		select {
		case requested := <-sp.getDataQueue:
			sp.handleServeGetData(requested, sentChan, pendingSlots)

		case <-sentChan:
			// A queued message was sent, so free up its slot.
			<-pendingSlots

		case <-sp.quit:
			return
		}
	}
}
// Run launches the asynchronous getdata servicing for the peer and then blocks
// until the peer disconnects.  Once that happens, it notifies the server and
// the net sync manager, evicts any remaining orphans sent by the peer when its
// version is known, and shuts down the goroutines it started before returning.
func (sp *serverPeer) Run() {
	var workers sync.WaitGroup
	workers.Add(1)
	go func() {
		defer workers.Done()
		sp.serveGetData()
	}()

	// Block until the peer goes away and then let the server and the net sync
	// manager know about it.
	sp.WaitForDisconnect()
	s := sp.server
	s.DonePeer(sp)
	s.syncManager.PeerDisconnected(sp.syncMgrPeer)

	// Evict any remaining orphans that were sent by the peer.
	if sp.VersionKnown() {
		evicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
		if evicted > 0 {
			srvrLog.Debugf("Evicted %d %s from peer %v (id %d)", evicted,
				pickNoun(evicted, "orphan", "orphans"), sp, sp.ID())
		}
	}

	// Signal the remaining peer goroutines to stop and wait for them.
	close(sp.quit)
	workers.Wait()
}
// newestBlock reports the hash and height of the current best block in the
// form required by the configuration for the peer package.  The returned error
// is always nil.
func (sp *serverPeer) newestBlock() (*chainhash.Hash, int64, error) {
	snapshot := sp.server.chain.BestSnapshot()
	hash, height := &snapshot.Hash, snapshot.Height
	return hash, height, nil
}
// addKnownAddress records the given address as known to the peer so it is not
// sent to the peer again.
func (sp *serverPeer) addKnownAddress(na *addrmgr.NetAddress) {
	key := []byte(na.Key())
	sp.knownAddresses.Add(key)
}
// addKnownAddresses records every one of the given addresses as known to the
// peer so they are not sent to the peer again.
func (sp *serverPeer) addKnownAddresses(addresses []*addrmgr.NetAddress) {
	for i := range addresses {
		sp.addKnownAddress(addresses[i])
	}
}
// addressKnown returns true if the given address is already known to the peer
// according to its known addresses filter.
func (sp *serverPeer) addressKnown(na *addrmgr.NetAddress) bool {
	key := []byte(na.Key())
	return sp.knownAddresses.Contains(key)
}
// wireToAddrmgrNetAddress builds an address manager NetAddress from the IP,
// port, services, and timestamp of the given wire NetAddress.
func wireToAddrmgrNetAddress(netAddr *wire.NetAddress) *addrmgr.NetAddress {
	converted := addrmgr.NewNetAddressIPPort(
		netAddr.IP,
		netAddr.Port,
		netAddr.Services,
	)
	converted.Timestamp = netAddr.Timestamp
	return converted
}
// wireToAddrmgrNetAddresses converts each of the given wire net addresses to
// an address manager net address, preserving their order.
func wireToAddrmgrNetAddresses(netAddr []*wire.NetAddress) []*addrmgr.NetAddress {
	converted := make([]*addrmgr.NetAddress, 0, len(netAddr))
	for _, wireAddr := range netAddr {
		converted = append(converted, wireToAddrmgrNetAddress(wireAddr))
	}
	return converted
}
// addrmgrToWireNetAddress builds a wire net address from the timestamp,
// services, IP, and port of the given address manager net address.
func addrmgrToWireNetAddress(netAddr *addrmgr.NetAddress) *wire.NetAddress {
	timestamp, services := netAddr.Timestamp, netAddr.Services
	return wire.NewNetAddressTimestamp(timestamp, services, netAddr.IP,
		netAddr.Port)
}
// pushAddrMsg sends an addr message to the connected peer containing those of
// the provided addresses that are not already known to it and then records the
// addresses reported back by the send as known.
//
// The peer is disconnected when the message cannot be pushed.
func (sp *serverPeer) pushAddrMsg(addresses []*addrmgr.NetAddress) {
	// Skip anything the peer already knows about.
	unknown := make([]*wire.NetAddress, 0, len(addresses))
	for _, candidate := range addresses {
		if sp.addressKnown(candidate) {
			continue
		}
		unknown = append(unknown, addrmgrToWireNetAddress(candidate))
	}

	sent, err := sp.PushAddrMsg(unknown)
	if err != nil {
		peerLog.Errorf("Can't push address message to %s: %v", sp, err)
		sp.Disconnect()
		return
	}

	sp.addKnownAddresses(wireToAddrmgrNetAddresses(sent))
}
// addBanScore increases the persistent and decaying ban score fields by the
// values passed as parameters and returns whether the peer was banned as a
// result.
//
// Nothing is scored when banning is disabled or the peer is whitelisted.  A
// warning that includes the provided reason is logged whenever the resulting
// score exceeds half of the ban threshold, and the peer is banned and
// disconnected once the score exceeds the ban threshold itself.
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool {
	switch {
	case cfg.DisableBanning:
		// No warning is logged and no score is calculated if banning is
		// disabled.
		return false

	case sp.isWhitelisted:
		peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
		return false
	}

	warnThreshold := cfg.BanThreshold >> 1

	// The score is not being increased when both amounts are zero, but a
	// warning message is still logged if the score is above the warn
	// threshold.
	if persistent == 0 && transient == 0 {
		if current := sp.banScore.Int(); current > warnThreshold {
			peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
				"it was not increased this time", sp, reason, current)
		}
		return false
	}

	updated := sp.banScore.Increase(persistent, transient)
	if updated <= warnThreshold {
		return false
	}
	peerLog.Warnf("Misbehaving peer %s: %s -- ban score increased to %d",
		sp, reason, updated)

	if updated <= cfg.BanThreshold {
		return false
	}
	peerLog.Warnf("Misbehaving peer %s -- banning and disconnecting",
		sp)
	sp.server.BanPeer(sp)
	sp.Disconnect()
	return true
}
// hasServices reports whether every service flag set in desired is also set in
// advertised.
func hasServices(advertised, desired wire.ServiceFlag) bool {
	// Any bit left over after clearing the advertised flags from the desired
	// ones is a desired service that was not advertised.
	missing := desired &^ advertised
	return missing == 0
}
// OnVersion is invoked when a peer receives a version wire message and is used
// to negotiate the protocol version details as well as kick start the
// communications.
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// Update the address manager with the advertised services for outbound
// connections in case they have changed. This is not done for inbound
// connections to help prevent malicious behavior and is skipped when
// running on the simulation and regression test networks since they are
// only intended to connect to specified peers and actively avoid
// advertising and connecting to discovered peers.
//
// NOTE: This is done before rejecting peers that are too old to ensure
// it is updated regardless in the case a new minimum protocol version is
// enforced and the remote node has not upgraded yet.
isInbound := sp.Inbound()
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
addrManager := sp.server.addrManager
if !cfg.SimNet && !cfg.RegNet && !isInbound {
err := addrManager.SetServices(remoteAddr, msg.Services)
if err != nil {
srvrLog.Errorf("Setting services for address failed: %v", err)
}
}
// Enforce the minimum protocol limit on outbound connections.
if !isInbound && msg.ProtocolVersion < int32(wire.RemoveRejectVersion) {
srvrLog.Debugf("Rejecting outbound peer %s with protocol version %d prior to "+
"the required version %d", sp, msg.ProtocolVersion,
wire.RemoveRejectVersion)
sp.Disconnect()
return
}
// Reject peers that have a protocol version that is too old.
const reqProtocolVersion = int32(wire.RemoveRejectVersion)
if msg.ProtocolVersion < reqProtocolVersion {
srvrLog.Debugf("Rejecting peer %s with protocol version %d prior to "+
"the required version %d", sp, msg.ProtocolVersion,
reqProtocolVersion)
sp.Disconnect()
return
}
// Reject outbound peers that are not full nodes.
wantServices := wire.SFNodeNetwork
if !isInbound && !hasServices(msg.Services, wantServices) {
missingServices := wantServices & ^msg.Services
srvrLog.Debugf("Rejecting peer %s with services %v due to not "+
"providing desired services %v", sp, msg.Services, missingServices)
sp.Disconnect()
return
}
// Update the address manager and request known addresses from the
// remote peer for outbound connections. This is skipped when running
// on the simulation and regression test networks since they are only
// intended to connect to specified peers and actively avoid advertising
// and connecting to discovered peers.
if !cfg.SimNet && !cfg.RegNet && !isInbound {
// Advertise the local address when the server accepts incoming
// connections and it believes itself to be close to the best
// known tip.
if !cfg.DisableListen && sp.server.syncManager.IsCurrent() {
// Get address that best matches.
lna := addrManager.GetBestLocalAddress(remoteAddr)
if lna.IsRoutable() {
// Filter addresses the peer already knows about.
addresses := []*addrmgr.NetAddress{lna}
sp.pushAddrMsg(addresses)
}
}
// Request known addresses if the server address manager needs
// more.
if addrManager.NeedMoreAddresses() {
sp.QueueMessage(wire.NewMsgGetAddr(), nil)
}
// Mark the address as a known good address.
err := addrManager.Good(remoteAddr)
if err != nil {
srvrLog.Errorf("Marking address as good failed: %v", err)