Skip to content
This repository has been archived by the owner on Nov 2, 2018. It is now read-only.

Commit

Permalink
add RPC call Metadata requesting sector IDs
Browse files Browse the repository at this point in the history
This is second attempt of #2321

The whole list of IDs can be expensive for a host to provide for free,
but a short list of sector IDs (up to 2^17) is cheap. At the same time
it opens doors for stateless clients, i.e. recovering everything from
seed only.
  • Loading branch information
Boris Nagaev authored and starius committed Jul 15, 2018
1 parent 5b75e1f commit 7833125
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 0 deletions.
1 change: 1 addition & 0 deletions modules/host.go
Expand Up @@ -105,6 +105,7 @@ type (
RenewCalls uint64 `json:"renewcalls"`
ReviseCalls uint64 `json:"revisecalls"`
SettingsCalls uint64 `json:"settingscalls"`
MetadataCalls uint64 `json:"metadatacalls"`
UnrecognizedCalls uint64 `json:"unrecognizedcalls"`
}

Expand Down
1 change: 1 addition & 0 deletions modules/host/host.go
Expand Up @@ -122,6 +122,7 @@ type Host struct {
atomicRenewCalls uint64
atomicReviseCalls uint64
atomicSettingsCalls uint64
atomicMetadataCalls uint64
atomicUnrecognizedCalls uint64

// Error management. There are a few different types of errors returned by
Expand Down
59 changes: 59 additions & 0 deletions modules/host/negotiatemetadata.go
@@ -0,0 +1,59 @@
package host

import (
"errors"
"net"

"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
)

// managedRPCMetadata accepts a request to get list of sector ids.
func (h *Host) managedRPCMetadata(conn net.Conn) error {
// Perform the file contract revision exchange, giving the renter the most
// recent file contract revision and getting the storage obligation that
// will be used to get sector ids.
_, so, err := h.managedRPCRecentRevision(conn)
if err != nil {
return extendErr("RPCRecentRevision failed: ", err)
}
// The storage obligation is received with a lock on it. Defer a call to
// unlock the storage obligation.
defer func() {
h.managedUnlockStorageObligation(so.id())
}()
// Receive boundaries of so.SectorRoots to return.
var begin, end uint64
err = encoding.ReadObject(conn, &begin, 8)
if err != nil {
return extendErr("unable to read 'begin': ", ErrorConnection(err.Error()))
}
err = encoding.ReadObject(conn, &end, 8)
if err != nil {
return extendErr("unable to read 'end': ", ErrorConnection(err.Error()))
}
if end < begin {
err = errors.New("Range error")
modules.WriteNegotiationRejection(conn, err)
return err
}
if end > uint64(len(so.SectorRoots)) {
err = errors.New("Range out of bounds error")
modules.WriteNegotiationRejection(conn, err)
return err
}
if end-begin > modules.NegotiateMetadataMaxSliceSize {
err = errors.New("The range is too long")
modules.WriteNegotiationRejection(conn, err)
return err
}
if err = modules.WriteNegotiationAcceptance(conn); err != nil {
return extendErr("failed to write [begin,end) acceptance: ", ErrorConnection(err.Error()))
}
// Write roots of all sectors.
err = encoding.WriteObject(conn, so.SectorRoots[begin:end])
if err != nil {
return extendErr("cound not write sectors: ", ErrorConnection(err.Error()))
}
return nil
}
3 changes: 3 additions & 0 deletions modules/host/network.go
Expand Up @@ -281,6 +281,9 @@ func (h *Host) threadedHandleConn(conn net.Conn) {
case modules.RPCDownload:
atomic.AddUint64(&h.atomicDownloadCalls, 1)
err = extendErr("incoming RPCDownload failed: ", h.managedRPCDownload(conn))
case modules.RPCMetadata:
atomic.AddUint64(&h.atomicMetadataCalls, 1)
err = extendErr("incoming RPCMetadata failed: ", h.managedRPCMetadata(conn))
case modules.RPCRenewContract:
atomic.AddUint64(&h.atomicRenewCalls, 1)
err = extendErr("incoming RPCRenewContract failed: ", h.managedRPCRenewContract(conn))
Expand Down
18 changes: 18 additions & 0 deletions modules/negotiate.go
Expand Up @@ -30,6 +30,13 @@ const (
// the negotiation.
NegotiateDownloadTime = 600 * time.Second

// NegotiateMetadataTime establishes the minimum amount of time that
// the connection deadline is expected to be set to when a metadata
// is being requested from the host. The deadline is long
// enough that the connection should be successful even if both parties are
// running Tor.
NegotiateMetadataTime = 120 * time.Second

// NegotiateFileContractRevisionTime defines the minimum amount of time
// that the renter and host have to negotiate a file contract revision. The
// time is set high enough that a full 4MB can be piped through a
Expand Down Expand Up @@ -154,6 +161,9 @@ var (
// RPCDownload is the specifier for downloading a file from a host.
RPCDownload = types.Specifier{'D', 'o', 'w', 'n', 'l', 'o', 'a', 'd', 2}

// RPCMetadata is the specifier for getting the list of sector roots.
RPCMetadata = types.Specifier{'M', 'e', 't', 'a', 'd', 'a', 't', 'a'}

// RPCFormContract is the specifier for forming a contract with a host.
RPCFormContract = types.Specifier{'F', 'o', 'r', 'm', 'C', 'o', 'n', 't', 'r', 'a', 'c', 't', 2}

Expand All @@ -177,6 +187,14 @@ var (
Standard: uint64(1 << 22), // 4 MiB
Testing: uint64(1 << 12), // 4 KiB
}).(uint64)

// NegotiateMetadataMaxSliceSize establishes the maximum allowed length
// of the list of sectors returned by the Metadata RPC.
NegotiateMetadataMaxSliceSize = build.Select(build.Var{
Dev: uint64(1 << 17),
Standard: uint64(1 << 17),
Testing: uint64(1 << 4),
}).(uint64)
)

type (
Expand Down
112 changes: 112 additions & 0 deletions modules/renter/contractor/host_integration_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/NebulousLabs/Sia/modules/host"
"github.com/NebulousLabs/Sia/modules/miner"
"github.com/NebulousLabs/Sia/modules/renter/hostdb"
"github.com/NebulousLabs/Sia/modules/renter/proto"
"github.com/NebulousLabs/Sia/modules/transactionpool"
modWallet "github.com/NebulousLabs/Sia/modules/wallet"
"github.com/NebulousLabs/Sia/types"
Expand Down Expand Up @@ -670,3 +671,114 @@ func TestContractPresenceLeak(t *testing.T) {
t.Fatalf("Expected to get equal errors, got %q and %q.", errors[0], errors[1])
}
}

// TestIntegrationMetadata tests the Metadata RPC.
func TestIntegrationMetadata(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
// create testing trio
h, c, _, err := newTestingTrio(t.Name())
if err != nil {
t.Fatal(err)
}
defer h.Close()
defer c.Close()

// get the host's entry from the db
hostEntry, ok := c.hdb.Host(h.PublicKey())
if !ok {
t.Fatal("no entry for host in db")
}

// form a contract with the host
contract, err := c.managedNewContract(hostEntry, types.SiacoinPrecision.Mul64(10), c.blockHeight+100)
if err != nil {
t.Fatal(err)
}
sc, has := c.staticContracts.Acquire(contract.ID)
if !has {
t.Fatal("c.staticContracts.Acquire returned false")
}
secketKey := sc.Metadata().SecretKey
windowStart := sc.Metadata().EndHeight
c.staticContracts.Return(sc)

// get revision and no sector ids from the host
lastRevision, _, err := proto.GetMetadata(hostEntry, contract.ID, secketKey, windowStart, 0, 0, nil)
if err != nil {
t.Fatalf("RPCMetadata returned error: %v", err)
}
wantSize := uint64(0)
if lastRevision.NewFileSize != wantSize {
t.Errorf("lastRevision.NewFileSize = %d, want %d", lastRevision.NewFileSize, wantSize)
}

n := modules.NegotiateMetadataMaxSliceSize * 2

// revise the contract
editor, err := c.Editor(contract.HostPublicKey, nil)
if err != nil {
t.Fatal(err)
}
var want []crypto.Hash
for i := 0; i < int(n); i++ {
data := fastrand.Bytes(int(modules.SectorSize))
root, err := editor.Upload(data)
if err != nil {
t.Fatal(err)
}
want = append(want, root)
}
err = editor.Close()
if err != nil {
t.Fatal(err)
}

// check correct ranges
correctRanges := []struct{ begin, end uint64 }{
{0, 0},
{0, 1},
{0, 2},
{0, modules.NegotiateMetadataMaxSliceSize},
{1, modules.NegotiateMetadataMaxSliceSize + 1},
{modules.NegotiateMetadataMaxSliceSize, 2 * modules.NegotiateMetadataMaxSliceSize},
{uint64(n - 1), uint64(n)},
{uint64(n - 1), uint64(n - 1)},
{uint64(n), uint64(n)},
}
for _, r := range correctRanges {
// get revision and sector IDs from the host
lastRevision, got, err := proto.GetMetadata(hostEntry, contract.ID, secketKey, windowStart, r.begin, r.end, nil)
size := r.end - r.begin
if err != nil {
t.Fatalf("RPCMetadata returned error for case %v: %v", r, err)
}
wantSize = n * modules.SectorSize
if lastRevision.NewFileSize != wantSize {
t.Errorf("case %v, lastRevision.NewFileSize = %d, want %d", r, lastRevision.NewFileSize, wantSize)
}
if uint64(len(got)) != size {
t.Fatalf("case %v, list length: want %d, got %d", r, size, len(got))
}
for i := r.begin; i < r.end; i++ {
if got[i-r.begin] != want[i] {
t.Errorf("RPCMetadata returned wrong sector id for case %v for sector %d", r, i)
}
}
}

// check incorrect ranges
incorrectRanges := []struct{ begin, end uint64 }{
{5, 4},
{0, modules.NegotiateMetadataMaxSliceSize + 1},
{uint64(n - 1), uint64(n + 1)},
}
for _, r := range incorrectRanges {
_, _, err := proto.GetMetadata(hostEntry, contract.ID, secketKey, windowStart, r.begin, r.end, nil)
if err == nil {
t.Fatalf("RPCMetadata succeeded for case %v, want error", r)
}
}
}
57 changes: 57 additions & 0 deletions modules/renter/proto/metadata.go
@@ -0,0 +1,57 @@
package proto

import (
"errors"
"net"
"time"

"github.com/NebulousLabs/Sia/crypto"
"github.com/NebulousLabs/Sia/encoding"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/types"
)

// GetMetadata downloads sector IDs from the host.
func GetMetadata(host modules.HostDBEntry, fcid types.FileContractID, sk crypto.SecretKey, windowStart types.BlockHeight, begin, end uint64, cancel <-chan struct{}) (lastRevision types.FileContractRevision, ids []crypto.Hash, err error) {
conn, err := (&net.Dialer{
Cancel: cancel,
Timeout: 15 * time.Second,
}).Dial("tcp", string(host.NetAddress))
if err != nil {
return
}
defer conn.Close()
// allot 2 minutes for RPC request + revision exchange
extendDeadline(conn, modules.NegotiateMetadataTime)
if err = encoding.WriteObject(conn, modules.RPCMetadata); err != nil {
err = errors.New("couldn't initiate RPC: " + err.Error())
return
}
lastRevision, err = getRecentRevision(conn, fcid, sk, windowStart, host.Version)
if err != nil {
return
}
if err = encoding.WriteObject(conn, begin); err != nil {
err = errors.New("unable to write 'begin': " + err.Error())
return
}
if err = encoding.WriteObject(conn, end); err != nil {
err = errors.New("unable to write 'end': " + err.Error())
return
}
// read acceptance
if err = modules.ReadNegotiationAcceptance(conn); err != nil {
err = errors.New("host did not accept [begin,end): " + err.Error())
return
}
numSectors := end - begin
if err = encoding.ReadObject(conn, &ids, numSectors*crypto.HashSize+8); err != nil {
err = errors.New("unable to read 'ids': " + err.Error())
return
}
if uint64(len(ids)) != end-begin {
err = errors.New("the host returned too short list of sector IDs")
return
}
return
}

0 comments on commit 7833125

Please sign in to comment.