Skip to content

Commit

Permalink
Merge pull request #1148 from subutai-io/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
crioto committed Nov 5, 2018
2 parents 0239354 + fed82f6 commit 84a5ff7
Show file tree
Hide file tree
Showing 9 changed files with 1,537 additions and 54 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG
Expand Up @@ -5,10 +5,10 @@
* Implemented latency measurement for peers (#445)
* Implemented latency measurement for proxies (#907)
* Added reconnect timeout for failing DHT connections (#1114)
* Refactored save/restore sybsysten (#1106)
* Save dump now uses YAML (#1105)
* Refactored save/restore sybsystem (#1106)
* Dump file now uses YAML (#1105)
* Output to stderr instead of stdout on non-zero exit code (#1102)
* Fixed non-zero exit code on failed start attempt (#1132)
* Fixed non-zero exit code on failed start attempts (#1132)

## [8.0.1] 10/09/2018

Expand Down
103 changes: 88 additions & 15 deletions lib/dht_callbacks.go
Expand Up @@ -12,6 +12,10 @@ import (
type dhtCallback func(*protocol.DHTPacket) error

func (p *PeerToPeer) setupTCPCallbacks() {
if p.Dht == nil {
Log(Error, "Can't setup TCP callbacks: DHT is nil")
return
}
p.Dht.TCPCallbacks = make(map[protocol.DHTPacketType]dhtCallback)
p.Dht.TCPCallbacks[protocol.DHTPacketType_BadProxy] = p.packetBadProxy
p.Dht.TCPCallbacks[protocol.DHTPacketType_Connect] = p.packetConnect
Expand Down Expand Up @@ -39,6 +43,12 @@ func (p *PeerToPeer) packetBadProxy(packet *protocol.DHTPacket) error {

// Handshake response should be handled here.
func (p *PeerToPeer) packetConnect(packet *protocol.DHTPacket) error {
if p.Dht == nil {
return fmt.Errorf("nil dht")
}
if packet == nil {
return fmt.Errorf("nil packet")
}
if len(packet.Id) != 36 {
return fmt.Errorf("Received malformed ID")
}
Expand All @@ -49,6 +59,12 @@ func (p *PeerToPeer) packetConnect(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetDHCP(packet *protocol.DHTPacket) error {
if p.Dht == nil {
return fmt.Errorf("nil dht")
}
if packet == nil {
return fmt.Errorf("nil packet")
}
if packet.Data != "" && packet.Extra != "" {
ip, network, err := net.ParseCIDR(fmt.Sprintf("%s/%s", packet.Data, packet.Extra))
if err != nil {
Expand All @@ -63,6 +79,9 @@ func (p *PeerToPeer) packetDHCP(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetError(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}
lvl := LogLevel(Trace)
if packet.Data == "" {
lvl = Error
Expand All @@ -76,14 +95,26 @@ func (p *PeerToPeer) packetError(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetFind(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}
if p.Dht == nil {
return fmt.Errorf("nil dht")
}
if len(packet.Arguments) == 0 {
Log(Warning, "Received empty peer list")
return nil
}
if packet.Data == p.Dht.ID {
Log(Debug, "Skipping self")
Log(Debug, "Skipping self [%s = %s]", packet.Data, p.Dht.ID)
return nil
}
if p.Peers == nil {
return fmt.Errorf("nil peer list")
}
if p.ProxyManager == nil {
return fmt.Errorf("nil proxy manager")
}

Log(Debug, "Received `find`: %+v", packet)
peer := p.Peers.GetPeer(packet.Data)
Expand Down Expand Up @@ -140,11 +171,14 @@ func (p *PeerToPeer) packetFind(packet *protocol.DHTPacket) error {
Log(Debug, "Adding proxy: %s", addr.String())
}
}
peer.SetState(PeerStateInit, p)
peer.LastFind = time.Now()
p.Peers.Update(peer.ID, peer)
p.Peers.RunPeer(peer.ID, p)
if packet.GetExtra() != "skip" {
peer.SetState(PeerStateInit, p)
peer.LastFind = time.Now()
p.Peers.Update(peer.ID, peer)
p.Peers.RunPeer(peer.ID, p)
}
} else {
// This is an existing peer
peer.LastFind = time.Now()

ips := []*net.UDPAddr{}
Expand Down Expand Up @@ -200,6 +234,13 @@ func (p *PeerToPeer) packetForward(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetNode(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}

if p.Peers == nil {
return fmt.Errorf("nil peer list")
}

if len(packet.Arguments) == 0 {
return fmt.Errorf("Empty IP's list")
Expand Down Expand Up @@ -238,6 +279,18 @@ func (p *PeerToPeer) packetPing(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetProxy(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}
if p.UDPSocket == nil {
return fmt.Errorf("nil socket")
}
if p.ProxyManager == nil {
return fmt.Errorf("nil proxy manager")
}
if p.Dht == nil {
return fmt.Errorf("nil dht")
}
Log(Debug, "Received list of proxies")
for _, proxy := range packet.Proxies {
proxyAddr, err := net.ResolveUDPAddr("udp4", proxy)
Expand All @@ -258,6 +311,9 @@ func (p *PeerToPeer) packetProxy(packet *protocol.DHTPacket) error {

// packetRequestProxy received when we was requesting proxy to connect to some peer
func (p *PeerToPeer) packetRequestProxy(packet *protocol.DHTPacket) error {
if p.Peers == nil {
return fmt.Errorf("nil peer list")
}
list := []*net.UDPAddr{}
for _, proxy := range packet.Proxies {
addr, err := net.ResolveUDPAddr("udp4", proxy)
Expand All @@ -268,16 +324,6 @@ func (p *PeerToPeer) packetRequestProxy(packet *protocol.DHTPacket) error {
list = append(list, addr)
}

// peers := p.Peers.Get()
// for _, proxy := range list {
// for _, existingPeer := range peers {
// if existingPeer.Endpoint.String() == proxy.String() && existingPeer.ID != packet.Data {
// existingPeer.SetState(PeerStateDisconnect, p)
// Log(Info, "Peer %s was associated with address %s. Disconnecting", existingPeer.ID, proxy.String())
// }
// }
// }

peer := p.Peers.GetPeer(packet.Data)
if peer != nil {
peer.Proxies = list
Expand All @@ -291,6 +337,9 @@ func (p *PeerToPeer) packetReportProxy(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetRegisterProxy(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}
if packet.Data == "OK" {
Log(Info, "Proxy registration confirmed")
}
Expand All @@ -302,6 +351,12 @@ func (p *PeerToPeer) packetReportLoad(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetState(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}
if p.Peers == nil {
return fmt.Errorf("nil peer list")
}
if len(packet.Data) != 36 {
return fmt.Errorf("Receied state packet for unknown/broken ID")
}
Expand Down Expand Up @@ -330,6 +385,18 @@ func (p *PeerToPeer) packetStop(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetUnknown(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}
if p.Dht == nil {
return fmt.Errorf("nil dht")
}
if p.ProxyManager == nil {
return fmt.Errorf("nil proxy manager")
}
if p.Interface == nil {
return fmt.Errorf("nil interface")
}
Log(Debug, "Received unknown packet")
p.FindNetworkAddresses()
if len(packet.Data) > 0 && packet.Data == "DHCP" {
Expand All @@ -342,6 +409,12 @@ func (p *PeerToPeer) packetUnknown(packet *protocol.DHTPacket) error {
}

func (p *PeerToPeer) packetUnsupported(packet *protocol.DHTPacket) error {
if packet == nil {
return fmt.Errorf("nil packet")
}
if p.Dht == nil {
return fmt.Errorf("nil dht")
}
Log(Error, "Bootstap node doesn't support our version. Shutting down")
return p.Dht.Close()
}

0 comments on commit 84a5ff7

Please sign in to comment.