Skip to content
This repository has been archived by the owner on Nov 2, 2018. It is now read-only.

Commit

Permalink
Merge pull request #621 from NebulousLabs/host-finding
Browse files Browse the repository at this point in the history
Host Scanning
  • Loading branch information
lukechampine committed Jun 22, 2015
2 parents 81eba8c + c1ba924 commit deaa8ea
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 74 deletions.
27 changes: 23 additions & 4 deletions modules/hostdb/hostdb.go
Expand Up @@ -18,6 +18,11 @@ const (
// Hosts will not be removed if there are fewer than this many hosts.
// Eventually, this number should be in the low thousands.
MinHostThreshold = 5

// scanPoolSize sets the buffer size of the channel that holds hosts which
// need to be scanned. A thread pool pulls from the scan pool to query
// hosts that are due for an update.
scanPoolSize = 1000
)

var (
Expand Down Expand Up @@ -46,13 +51,20 @@ type HostDB struct {
// including hosts that are currently offline.
allHosts map[modules.NetAddress]*hostEntry

// the scanPool is a set of hosts that need to be scanned. There are a
// handful of goroutines constantly waiting on the channel for hosts to
// scan.
scanPool chan *hostEntry

subscribers []chan struct{}

mu *sync.RWMutex
}

// New returns an empty HostDatabase.
// New returns a host database that will still crawling the hosts it finds on
// the blockchain.
func New(cs *consensus.State, g modules.Gateway) (hdb *HostDB, err error) {
// Check for nil dependencies.
if cs == nil {
err = ErrNilConsensusSet
return
Expand All @@ -62,18 +74,25 @@ func New(cs *consensus.State, g modules.Gateway) (hdb *HostDB, err error) {
return
}

// Build an empty hostdb.
hdb = &HostDB{
consensusSet: cs,
gateway: g,

activeHosts: make(map[modules.NetAddress]*hostNode),
allHosts: make(map[modules.NetAddress]*hostEntry),

allHosts: make(map[modules.NetAddress]*hostEntry),

scanPool: make(chan *hostEntry, scanPoolSize),

mu: sync.New(modules.SafeMutexDelay, 1),
}

cs.ConsensusSetSubscribe(hdb)
// Begin listening to consensus and looking for hosts.
for i := 0; i < scanningThreads; i++ {
go hdb.threadedProbeHosts()
}
go hdb.threadedScan()

cs.ConsensusSetSubscribe(hdb)
return
}
5 changes: 3 additions & 2 deletions modules/hostdb/hostentry.go
Expand Up @@ -14,6 +14,7 @@ var (
baseWeight = types.NewCurrency(new(big.Int).Exp(big.NewInt(10), big.NewInt(120), nil))
)

// A hostEntry represents a host on the network.
type hostEntry struct {
modules.HostSettings
weight types.Currency
Expand All @@ -40,12 +41,12 @@ func (hdb *HostDB) insertHost(host modules.HostSettings) {
// Add the host to allHosts.
entry := &hostEntry{
HostSettings: host,
reliability: InactiveReliability,
reliability: DefaultReliability,
}
_, exists := hdb.allHosts[entry.IPAddress]
if !exists {
hdb.allHosts[entry.IPAddress] = entry
go hdb.threadedProbeHost(entry)
hdb.scanHostEntry(entry)
}
}

Expand Down
148 changes: 84 additions & 64 deletions modules/hostdb/scan.go
Expand Up @@ -17,22 +17,38 @@ import (
)

const (
DefaultScanSleep = 4 * time.Hour
MaxScanSleep = 8 * time.Hour
DefaultScanSleep = 2*time.Hour + 18*time.Minute
MaxScanSleep = 6 * time.Hour
MinScanSleep = 1 * time.Hour

MaxActiveHosts = 200
InactiveHostCheckupQuantity = 100
MaxActiveHosts = 500
InactiveHostCheckupQuantity = 250

maxSettingsLen = 1024

hostRequestTimeout = 5 * time.Second

// scanningThreads is the number of threads that will be probing hosts for
// their settings and checking for reliability.
scanningThreads = 25
)

var (
ActiveReliability = types.NewCurrency64(20)
InactiveReliability = types.NewCurrency64(10)
UnreachablePenalty = types.NewCurrency64(1)
MaxReliability = types.NewCurrency64(50) // Given the scanning defaults, about 1 week of survival.
DefaultReliability = types.NewCurrency64(20) // Given the scanning defaults, about 3 days of survival.
UnreachablePenalty = types.NewCurrency64(1)
)

// addHostToScanPool creates a gofunc that adds a host to the scan pool. If the
// scan pool is currently full, the blocking gofunc will not cause a deadlock.
// The gofunc is created inside of this function to eliminate the burden of
// needing to remember to call 'go addHostToScanPool'.
func (hdb *HostDB) scanHostEntry(entry *hostEntry) {
go func() {
hdb.scanPool <- entry
}()
}

// decrementReliability reduces the reliability of a node, moving it out of the
// set of active hosts or deleting it entirely if necessary.
func (hdb *HostDB) decrementReliability(addr modules.NetAddress, penalty types.Currency) {
Expand All @@ -43,10 +59,10 @@ func (hdb *HostDB) decrementReliability(addr modules.NetAddress, penalty types.C
}
entry.reliability = entry.reliability.Sub(penalty)

// If the entry is in the active database and has fallen below
// InactiveReliability, remove it from the active database.
// If the entry is in the active database, remove it from the active
// database.
node, exists := hdb.activeHosts[addr]
if exists && entry.reliability.Cmp(InactiveReliability) < 0 {
if exists {
delete(hdb.activeHosts, entry.IPAddress)
node.removeNode()
hdb.notifySubscribers()
Expand All @@ -62,46 +78,50 @@ func (hdb *HostDB) decrementReliability(addr modules.NetAddress, penalty types.C
// threadedProbeHost tries to fetch the settings of a host. If successful, the
// host is put in the set of active hosts. If unsuccessful, the host id deleted
// from the set of active hosts.
func (hdb *HostDB) threadedProbeHost(entry *hostEntry) {
// Request the most recent set of settings from the host.
var settings modules.HostSettings
err := func() error {
conn, err := net.DialTimeout("tcp", string(entry.IPAddress), 10e9)
if err != nil {
return err
}
defer conn.Close()
err = encoding.WriteObject(conn, [8]byte{'S', 'e', 't', 't', 'i', 'n', 'g', 's'})
if err != nil {
return err
}
return encoding.ReadObject(conn, &settings, maxSettingsLen)
}()

// Now that network communication is done, lock the hostdb to modify the
// host entry.
id := hdb.mu.Lock()
defer hdb.mu.Unlock(id)
func (hdb *HostDB) threadedProbeHosts() {
for hostEntry := range hdb.scanPool {
// Request settings from the queued host entry.
var settings modules.HostSettings
err := func() error {
conn, err := net.DialTimeout("tcp", string(hostEntry.IPAddress), hostRequestTimeout)
if err != nil {
return err
}
defer conn.Close()
err = encoding.WriteObject(conn, [8]byte{'S', 'e', 't', 't', 'i', 'n', 'g', 's'})
if err != nil {
return err
}
return encoding.ReadObject(conn, &settings, maxSettingsLen)
}()

if err != nil {
hdb.decrementReliability(entry.IPAddress, UnreachablePenalty)
return
}
// Now that network communication is done, lock the hostdb to modify the
// host entry.
id := hdb.mu.Lock()
{
if err != nil {
hdb.decrementReliability(hostEntry.IPAddress, UnreachablePenalty)
hdb.mu.Unlock(id)
continue
}

// Update the host settings, reliability, and weight. The old IPAddress
// must be preserved.
settings.IPAddress = entry.HostSettings.IPAddress
entry.HostSettings = settings
entry.reliability = ActiveReliability
entry.weight = hdb.hostWeight(*entry)

// If the host is not already in the database and 'MaxActiveHosts' has not
// been reached, add the host to the database.
_, exists1 := hdb.activeHosts[entry.IPAddress]
_, exists2 := hdb.allHosts[entry.IPAddress]
if !exists1 && exists2 && len(hdb.activeHosts) < MaxActiveHosts {
hdb.insertNode(entry)
hdb.notifySubscribers()
// Update the host settings, reliability, and weight. The old IPAddress
// must be preserved.
settings.IPAddress = hostEntry.HostSettings.IPAddress
hostEntry.HostSettings = settings
hostEntry.reliability = MaxReliability
hostEntry.weight = hdb.hostWeight(*hostEntry)

// If the host is not already in the database and 'MaxActiveHosts' has not
// been reached, add the host to the database.
_, exists1 := hdb.activeHosts[hostEntry.IPAddress]
_, exists2 := hdb.allHosts[hostEntry.IPAddress]
if !exists1 && exists2 && len(hdb.activeHosts) < MaxActiveHosts {
hdb.insertNode(hostEntry)
hdb.notifySubscribers()
}
}
hdb.mu.Unlock(id)
}
}

Expand All @@ -114,13 +134,12 @@ func (hdb *HostDB) threadedScan() {
// inactive hosts.
id := hdb.mu.Lock()
{
// Check all of the active hosts.
// Scan all active hosts.
for _, host := range hdb.activeHosts {
go hdb.threadedProbeHost(host.hostEntry)
hdb.scanHostEntry(host.hostEntry)
}

// Assemble all of the inactive hosts into a single array and
// shuffle it.
// Assemble all of the inactive hosts into a single array.
var random []*hostEntry
for _, entry := range hdb.allHosts {
entry2, exists := hdb.activeHosts[entry.IPAddress]
Expand Down Expand Up @@ -154,32 +173,33 @@ func (hdb *HostDB) threadedScan() {
}

// Select the first InactiveHostCheckupQuantity hosts from the
// shuffled list.
// shuffled list and scan them.
n := InactiveHostCheckupQuantity
if len(random) < InactiveHostCheckupQuantity {
n = len(random)
}
for i := 0; i < n; i++ {
go hdb.threadedProbeHost(random[i])
hdb.scanHostEntry(random[i])
}
}
hdb.mu.Unlock(id)

// Sleep for a random amount of time between 4 and 24 hours. The time
// is randomly generated so that hosts who are only on at certain times
// of the day or week will still be included. Random times also make it
// harder for hosts to game the system.
randSleep, err := rand.Int(rand.Reader, big.NewInt(int64(MaxScanSleep)))
// Sleep for a random amount of time before doing another round of
// scanning. The minimums and maximums keep the scan time reasonable,
// while the randomness prevents the scanning from always happening at
// the same time of day or week.
maxBig := big.NewInt(int64(MaxScanSleep))
minBig := big.NewInt(int64(MinScanSleep))
randSleep, err := rand.Int(rand.Reader, maxBig.Sub(maxBig, minBig))
if err != nil {
if build.DEBUG {
panic(err)
} else {
// If there's an error generating the random number, just sleep
// for 15 hours because it'll hit all times of the day after
// enough iterations.
randSleep = big.NewInt(int64(DefaultScanSleep))
// If there's an error, sleep for the default amount of time.
defaultBig := big.NewInt(int64(DefaultScanSleep))
randSleep = defaultBig.Sub(defaultBig, minBig)
}
}
time.Sleep(time.Duration(randSleep.Int64()) + MinScanSleep)
time.Sleep(time.Duration(randSleep.Int64()) + MinScanSleep) // this means the MaxScanSleep is actual Max+Min.
}
}
2 changes: 1 addition & 1 deletion modules/renter/negotiate.go
Expand Up @@ -110,7 +110,7 @@ func (r *Renter) negotiateContract(host modules.HostSettings, up modules.FileUpl
terms := modules.ContractTerms{
FileSize: filesize,
Duration: up.Duration,
DurationStart: height - 1,
DurationStart: height - 3,
WindowSize: defaultWindowSize,
Price: host.Price,
Collateral: host.Collateral,
Expand Down
6 changes: 3 additions & 3 deletions siae/main_test.go
Expand Up @@ -22,9 +22,9 @@ func TestMain(t *testing.T) {
"siad",
"-n",
"-a",
"localhost:45150",
"localhost:45350",
"-r",
"localhost:45151",
"localhost:45351",
"-d",
testDir,
}
Expand All @@ -34,7 +34,7 @@ func TestMain(t *testing.T) {
// daemon.
<-started
time.Sleep(250 * time.Millisecond)
resp, err := http.Get("http://localhost:45150/daemon/stop")
resp, err := http.Get("http://localhost:45350/daemon/stop")
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit deaa8ea

Please sign in to comment.