Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: Add getaddrv2 and addrv2. #2627

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
8 changes: 8 additions & 0 deletions peer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@ go 1.11

require (
github.com/davecgh/go-spew v1.1.1
github.com/decred/dcrd/addrmgr/v2 v2.0.0 // indirect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not appear to be correct. It's a direct dependency since it's now imported.

github.com/decred/dcrd/chaincfg/chainhash v1.0.3
github.com/decred/dcrd/lru v1.1.0
github.com/decred/dcrd/txscript/v4 v4.0.0
github.com/decred/dcrd/wire v1.5.0
github.com/decred/go-socks v1.1.0
github.com/decred/slog v1.2.0
)

replace (
github.com/decred/dcrd/addrmgr/v2 => ../addrmgr
github.com/decred/dcrd/dcrec/secp256k1/v4 => ../dcrec/secp256k1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe any of these other replacements aside from the addrmgr are needed.

github.com/decred/dcrd/dcrutil/v4 => ../dcrutil
github.com/decred/dcrd/txscript/v4 => ../txscript
)
99 changes: 55 additions & 44 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"time"

"github.com/davecgh/go-spew/spew"
"github.com/decred/dcrd/addrmgr/v2"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/lru"
"github.com/decred/dcrd/wire"
"github.com/decred/go-socks/socks"
"github.com/decred/slog"
)

Expand Down Expand Up @@ -81,6 +81,9 @@ var (
// connection detecting and disconnect logic since they intentionally
// do so for testing purposes.
allowSelfConns bool

// zeroIPv4 is an unroutable IPv4 address consisting of all zeros.
zeroIPv4 = net.IP([]byte{0, 0, 0, 0})
)

// MessageListeners defines callback function pointers to invoke with message
Expand Down Expand Up @@ -284,40 +287,30 @@ func minUint32(a, b uint32) uint32 {

// newNetAddress attempts to extract the IP address and port from the passed
// net.Addr interface and create a NetAddress structure using that information.
func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
// addr will be a net.TCPAddr when not using a proxy.
if tcpAddr, ok := addr.(*net.TCPAddr); ok {
ip := tcpAddr.IP
port := uint16(tcpAddr.Port)
na := wire.NewNetAddressIPPort(ip, port, services)
return na, nil
}

// addr will be a socks.ProxiedAddr when using a proxy.
if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
ip := net.ParseIP(proxiedAddr.Host)
if ip == nil {
ip = net.ParseIP("0.0.0.0")
}
port := uint16(proxiedAddr.Port)
na := wire.NewNetAddressIPPort(ip, port, services)
return na, nil
func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*addrmgr.NetAddress, error) {
host, portStr, err := net.SplitHostPort(addr.String())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is introducing a bunch of extra allocations that were previously avoided for the most common cases which is why it was the way it was already.

if err != nil {
return nil, err
}

// For the most part, addr should be one of the two above cases, but
// to be safe, fall back to trying to parse the information from the
// address string as a last resort.
host, portStr, err := net.SplitHostPort(addr.String())
addrType, addrBytes, err := addrmgr.ParseHost(host)
if err != nil {
return nil, err
}
ip := net.ParseIP(host)

if addrType == addrmgr.UnknownAddressType {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably move after the port parse since there is no reason to do it if the parse fails.

addrType = addrmgr.IPv4Address
addrBytes = zeroIPv4
}

port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return nil, err
}
na := wire.NewNetAddressIPPort(ip, uint16(port), services)
return na, nil

timestamp := time.Unix(time.Now().Unix(), 0)
return addrmgr.NewNetAddressByType(addrType, addrBytes, uint16(port),
timestamp, services)
}

// outMsg is used to house a message to be sent along with a channel to signal
Expand Down Expand Up @@ -385,7 +378,7 @@ type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress
// HostToNetAddrFunc is a func which takes a host, port, services and returns
// the netaddress.
type HostToNetAddrFunc func(host string, port uint16,
services wire.ServiceFlag) (*wire.NetAddress, error)
services wire.ServiceFlag) (*addrmgr.NetAddress, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a major breaking change to the public API of the peer module and thus will need a major module version bump.


// NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound
// messages are read via the inHandler goroutine and generally dispatched to
Expand Down Expand Up @@ -431,7 +424,7 @@ type Peer struct {
inbound bool

flagsMtx sync.Mutex // protects the peer flags below
na *wire.NetAddress
na *addrmgr.NetAddress
id int32
userAgent string
services wire.ServiceFlag
Expand Down Expand Up @@ -564,7 +557,7 @@ func (p *Peer) ID() int32 {
// NA returns the peer network address.
//
// This function is safe for concurrent access.
func (p *Peer) NA() *wire.NetAddress {
func (p *Peer) NA() *addrmgr.NetAddress {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here regarding being a breaking change to the API.

p.flagsMtx.Lock()
if p.na == nil {
p.flagsMtx.Unlock()
Expand Down Expand Up @@ -1820,6 +1813,37 @@ func (p *Peer) readRemoteVersionMsg() error {
return nil
}

// addrmgrToWireNetAddress converts an IPv4, IPv6, or TORv2 address manager
// network address to a wire network address. The host name must not match the
// configured proxy host name.
//
// If the address cannot be converted, an IPv4 network address consisting of all
// zeroes is returned.
func addrmgrToWireNetAddress(addr *addrmgr.NetAddress, proxyAddr string) *wire.NetAddress {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little bit worried about how this is being split out and named as if it is for generic use, but it's not due to the additional proxy handling. It's really specifically for use during version negotiation.

if addr.Type != addrmgr.IPv4Address &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to change it if you prefer the code how it is, but what do you think about using a switch here so it's easier to expand in the future if needed? e.g.

switch addr.Type {
case addrmgr.IPv4Address, addrmgr.IPv6Address, addrmgr.TORv2Address:
	// Recognized types.  Handled below.
default:
	return wire.NewNetAddressIPPort(zeroIPv4, 0, addr.Services)
}

addr.Type != addrmgr.IPv6Address &&
addr.Type != addrmgr.TORv2Address {
return wire.NewNetAddressIPPort(zeroIPv4, 0, addr.Services)
}

// If we are behind a proxy and the connection comes from the proxy then
// return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if proxyAddr != "" {
proxyHost, _, err := net.SplitHostPort(proxyAddr)
if err != nil {
return wire.NewNetAddressIPPort(zeroIPv4, 0, addr.Services)
}
peerAddress, _, err := net.SplitHostPort(addr.Key())
if err != nil || peerAddress == proxyHost {
return wire.NewNetAddressIPPort(zeroIPv4, 0, addr.Services)
}
}

return wire.NewNetAddressTimestamp(addr.Timestamp, addr.Services, addr.IP,
addr.Port)
}

// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
Expand All @@ -1832,20 +1856,6 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
}
}

theirNA := p.NA()

// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0,
theirNA.Services)
}
}

// Create a wire.NetAddress with only the services set to use as the
// "addrme" in the version message.
//
Expand All @@ -1869,6 +1879,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
sentNonces.Add(nonce)

// Version message.
theirNA := addrmgrToWireNetAddress(p.na, p.cfg.Proxy)
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, int32(blockNum))
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
Expand Down Expand Up @@ -2077,7 +2088,7 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
}
p.na = na
} else {
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
p.na = addrmgr.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
}

return p, nil
Expand Down
42 changes: 20 additions & 22 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package main

import (
"bytes"
"context"
"crypto/elliptic"
"crypto/rand"
Expand Down Expand Up @@ -313,20 +314,20 @@ type peerState struct {
}

// ConnectionsWithIP returns the number of connections with the given IP.
func (ps *peerState) ConnectionsWithIP(ip net.IP) int {
func (ps *peerState) ConnectionsWithIP(ip []byte) int {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this is being changed for an upcoming commit, but it doesn't really make sense to me to make this change in isolation as of this commit.

Would you mind elaborating on the thought process for the change?

var total int
for _, p := range ps.inboundPeers {
if ip.Equal(p.NA().IP) {
if bytes.Equal(ip, p.NA().IP) {
total++
}
}
for _, p := range ps.outboundPeers {
if ip.Equal(p.NA().IP) {
if bytes.Equal(ip, p.NA().IP) {
total++
}
}
for _, p := range ps.persistentPeers {
if ip.Equal(p.NA().IP) {
if bytes.Equal(ip, p.NA().IP) {
total++
}
}
Expand Down Expand Up @@ -739,7 +740,7 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// enforced and the remote node has not upgraded yet.
isInbound := sp.Inbound()
msgProtocolVersion := uint32(msg.ProtocolVersion)
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
addrManager := sp.server.addrManager
if !cfg.SimNet && !cfg.RegNet && !isInbound {
err := addrManager.SetServices(remoteAddr, msg.Services)
Expand Down Expand Up @@ -1458,7 +1459,7 @@ func (sp *serverPeer) OnAddr(p *peer.Peer, msg *wire.MsgAddr) {
// Add addresses to server address manager. The address manager handles
// the details of things such as preventing duplicate addresses, max
// addresses, and last seen updates.
remoteAddr := wireToAddrmgrNetAddress(p.NA())
remoteAddr := p.NA()
sp.server.addrManager.AddAddresses(addrList, remoteAddr)
}

Expand Down Expand Up @@ -1711,7 +1712,10 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
// Limit max number of connections from a single IP. However, allow
// whitelisted inbound peers and localhost connections regardless.
isInboundWhitelisted := sp.isWhitelisted && sp.Inbound()
peerIP := sp.NA().IP

// Since an address manager IP is just a byte array, cast it to access
// convenience methods on net.IP.
peerIP := net.IP(sp.NA().IP)
if cfg.MaxSameIP > 0 && !isInboundWhitelisted && !peerIP.IsLoopback() &&
state.ConnectionsWithIP(peerIP)+1 > cfg.MaxSameIP {
srvrLog.Infof("Max connections with %s reached [%d] - "+
Expand Down Expand Up @@ -1753,7 +1757,7 @@ func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
}
}
} else {
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
state.outboundGroups[remoteAddr.GroupKey()]++
if sp.persistent {
state.persistentPeers[sp.ID()] = sp
Expand Down Expand Up @@ -1840,7 +1844,7 @@ func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
}
if _, ok := list[sp.ID()]; ok {
if !sp.Inbound() && sp.VersionKnown() {
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
state.outboundGroups[remoteAddr.GroupKey()]--
}
if !sp.Inbound() && sp.connReq != nil {
Expand All @@ -1858,7 +1862,7 @@ func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
// Update the address' last seen time if the peer has acknowledged
// our version and has sent us its version as well.
if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
err := s.addrManager.Connected(remoteAddr)
if err != nil {
srvrLog.Debugf("Marking address as connected failed: %v", err)
Expand Down Expand Up @@ -2072,7 +2076,7 @@ func (s *server) handleQuery(state *peerState, querymsg interface{}) {
found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
// Keep group counts ok since we remove from
// the list now.
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
state.outboundGroups[remoteAddr.GroupKey()]--

peerLog.Debugf("Removing persistent peer %s (reqid %d)", remoteAddr,
Expand Down Expand Up @@ -2128,7 +2132,7 @@ func (s *server) handleQuery(state *peerState, querymsg interface{}) {
found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
// Keep group counts ok since we remove from
// the list now.
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
state.outboundGroups[remoteAddr.GroupKey()]--
})
if found {
Expand All @@ -2137,7 +2141,7 @@ func (s *server) handleQuery(state *peerState, querymsg interface{}) {
// peers are found.
for found {
found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
state.outboundGroups[remoteAddr.GroupKey()]--
})
}
Expand Down Expand Up @@ -2206,14 +2210,8 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnWrite: sp.OnWrite,
OnNotFound: sp.OnNotFound,
},
NewestBlock: sp.newestBlock,
HostToNetAddress: func(host string, port uint16, services wire.ServiceFlag) (*wire.NetAddress, error) {
address, err := sp.server.addrManager.HostToNetAddress(host, port, services)
if err != nil {
return nil, err
}
return addrmgrToWireNetAddress(address), nil
},
NewestBlock: sp.newestBlock,
HostToNetAddress: sp.server.addrManager.HostToNetAddress,
Proxy: cfg.Proxy,
UserAgentName: userAgentName,
UserAgentVersion: userAgentVersion,
Expand Down Expand Up @@ -2257,7 +2255,7 @@ func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
sp.AssociateConnection(conn)
go s.peerDoneHandler(sp)

remoteAddr := wireToAddrmgrNetAddress(sp.NA())
remoteAddr := sp.NA()
err = s.addrManager.Attempt(remoteAddr)
if err != nil {
srvrLog.Debugf("Marking address as attempted failed: %v", err)
Expand Down