Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
TheRocketCat committed Jan 26, 2021
1 parent ad1b057 commit 041d943
Show file tree
Hide file tree
Showing 25 changed files with 354 additions and 205 deletions.
12 changes: 6 additions & 6 deletions cmd/telliot/cmds.go
Expand Up @@ -118,7 +118,7 @@ func (b *balanceCmd) Run() error {
return errors.Wrapf(err, "creating config")
}

_, err = createLogger(cfg.Logger, cfg.LogLevel)
logger, err := createLogger(cfg.Logger, cfg.LogLevel)
if err != nil {
return errors.Wrapf(err, "creating logger")
}
Expand All @@ -141,7 +141,7 @@ func (b *balanceCmd) Run() error {
return errors.Wrapf(err, "parsing argument")
}
}
return ops.Balance(ctx, client, contract.Getter, addr.addr)
return ops.Balance(ctx, logger, client, contract.Getter, addr.addr)
}

type depositCmd struct {
Expand Down Expand Up @@ -249,7 +249,7 @@ func (n newDisputeCmd) Run() error {
return errors.Wrapf(err, "creating config")
}

_, err = createLogger(cfg.Logger, cfg.LogLevel)
logger, err := createLogger(cfg.Logger, cfg.LogLevel)
if err != nil {
return errors.Wrapf(err, "creating logger")
}
Expand All @@ -275,7 +275,7 @@ func (n newDisputeCmd) Run() error {
if err != nil {
return errors.Wrapf(err, "parsing argument")
}
return ops.Dispute(ctx, client, contract, account, requestID.Int, timestamp.Int, minerIndex.Int)
return ops.Dispute(ctx, logger, client, contract, account, requestID.Int, timestamp.Int, minerIndex.Int)
}

type voteCmd struct {
Expand All @@ -290,7 +290,7 @@ func (v voteCmd) Run() error {
return errors.Wrapf(err, "creating config")
}

_, err = createLogger(cfg.Logger, cfg.LogLevel)
logger, err := createLogger(cfg.Logger, cfg.LogLevel)
if err != nil {
return errors.Wrapf(err, "creating logger")
}
Expand All @@ -306,7 +306,7 @@ func (v voteCmd) Run() error {
if err != nil {
return errors.Wrapf(err, "parsing argument")
}
return ops.Vote(ctx, client, contract, account, disputeID.Int, v.support)
return ops.Vote(ctx, logger, client, contract, account, disputeID.Int, v.support)
}

type showCmd struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dataServer/dataServer_test.go
Expand Up @@ -5,14 +5,14 @@ package dataServer

import (
"context"
"fmt"
"math/big"

"net/http"
"strconv"
"testing"
"time"

"github.com/go-kit/kit/log/level"
"github.com/tellor-io/telliot/pkg/config"
"github.com/tellor-io/telliot/pkg/contracts"
"github.com/tellor-io/telliot/pkg/db"
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestDataServer(t *testing.T) {
resp, err := http.Get("http://" + cfg.DataServer.ListenHost + ":" + strconv.Itoa(int(cfg.DataServer.ListenPort)) + "/balance")
testutil.Ok(t, err)
defer resp.Body.Close()
fmt.Printf("Finished: %+v", resp)
level.Info(logger).Log("response finished", "resp", resp)
exitCh <- 1
time.Sleep(1 * time.Second)
testutil.Assert(t, ds.Stopped, "Did not stop server")
Expand Down
26 changes: 18 additions & 8 deletions pkg/db/db.go
Expand Up @@ -4,6 +4,8 @@
package db

import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
Expand Down Expand Up @@ -32,8 +34,8 @@ type DB interface {
}

type impl struct {
db *leveldb.DB
log *util.Logger
db *leveldb.DB
logger log.Logger
}

// Open the database using the given DB file as its data store.
Expand All @@ -52,13 +54,13 @@ func Open(file string) (DB, error) {
return nil, err
}

i := &impl{db: db, log: util.NewLogger("db", "DB")}
i.log.Info("Created DB at path: %s\n", file)
i := &impl{db: db, logger: log.With(util.SetupLogger("debug"), "db", "DB")}
level.Info(i.logger).Log("msg", "created DB", "at", file)
return i, nil
}

func (i *impl) Close() error {
i.log.Info("Closing DB...")
level.Info(i.logger).Log("msg", "closing db")
return i.db.Close()
}

Expand All @@ -67,20 +69,28 @@ func (i *impl) Has(key string) (bool, error) {
}

func (i *impl) Put(key string, value []byte) error {
i.log.Debug("Adding DB entry: %s with %d bytes of data", key, len(value))
level.Debug(i.logger).Log(
"msg", "adding DB entry",
"key", key,
"bytes", len(value),
)

return i.db.Put([]byte(key), value, nil)
}

func (i *impl) Get(key string) ([]byte, error) {
b, e := i.db.Get([]byte(key), nil)
if e == errors.ErrNotFound {
i.log.Debug("Did not find value for key: %s", key)
level.Debug(i.logger).Log(
"msg", "did not find value",
"key", key,
)
return nil, nil
}
return b, e
}

func (i *impl) Delete(key string) error {
i.log.Debug("Deleting key: %s", key)
level.Debug(i.logger).Log("msg", "deleting key", "key", key)
return i.db.Delete([]byte(key), nil)
}
16 changes: 11 additions & 5 deletions pkg/db/localDataProxy.go
Expand Up @@ -4,21 +4,23 @@
package db

import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/tellor-io/telliot/pkg/util"
)

type localProxy struct {
localDB DB
log *util.Logger
logger log.Logger
}

// OpenLocalProxy creates a local data proxy so that the miner operations are seamless regardless
// whether accessing data remotely or locally.
func OpenLocalProxy(localDB DB) (DataServerProxy, error) {
log := util.NewLogger("db", "LocalDataProxy")
log.Info("Using local data proxy to pull data from local DB")
return &localProxy{localDB: localDB, log: log}, nil
logger := log.With(util.SetupLogger("debug"), "db", "LocalDataProxy")
level.Info(logger).Log("msg", "using local data proxy to pull data from local DB")
return &localProxy{localDB: localDB, logger: logger}, nil
}

func (l *localProxy) Get(key string) ([]byte, error) {
Expand All @@ -43,7 +45,11 @@ func (l *localProxy) BatchGet(keys []string) (map[string][]byte, error) {
outMap[k] = bts
}
}
l.log.Debug("Requested keys: %v, resulting output:%v", keys, outMap)
level.Debug(l.logger).Log(
"msg", "get result from requested keys",
"keys", keys,
"output", outMap,
)
return outMap, nil
}

Expand Down
64 changes: 46 additions & 18 deletions pkg/db/remoteDataProxy.go
Expand Up @@ -5,7 +5,6 @@ package db

import (
"crypto/ecdsa"
"fmt"
"os"
"strconv"
"strings"
Expand All @@ -14,6 +13,8 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/tellor-io/telliot/pkg/config"
Expand All @@ -23,8 +24,6 @@ import (
// how long a signed request is good for before reject it. Semi-protection against replays.
const _validityThreshold = 2 //seconds

var rdbLog *util.Logger

/***************************************************************************************
** NOTE: This component is used to proxy data requests from approved miner processes. Miner
** public addresses are whitelisted and a small history of requests is retained to mitigate
Expand All @@ -43,18 +42,18 @@ type remoteImpl struct {
localDB DB
whitelist map[string]bool
postURL string
log *util.Logger
logger log.Logger
wlHistory map[string]*lru.ARCCache
rwLock sync.RWMutex
}

// OpenRemoteDB establishes a proxy to a remote data server.
func OpenRemoteDB(cfg *config.Config, localDB DB) (DataServerProxy, error) {
rdbLog = util.NewLogger("db", "RemoteDBProxy")
logger := log.With(util.SetupLogger("debug"), "db", "remoteDb")

privateKey, err := crypto.HexToECDSA(os.Getenv(config.PrivateKeyEnvName))
if err != nil {
fmt.Println("Problem decoding private key", err)
level.Error(logger).Log("msg", "problem decoding private key", "err", err)
return nil, err
}
//get address from config
Expand Down Expand Up @@ -85,9 +84,15 @@ func OpenRemoteDB(cfg *config.Config, localDB DB) (DataServerProxy, error) {
postURL: url,
whitelist: wlMap,
wlHistory: wlLRU,
log: util.NewLogger("db", "RemoteDB"),
logger: logger,
}
i.log.Info("Created Remote data proxy connector for %s:%d\n", cfg.Mine.RemoteDBHost, cfg.Mine.RemoteDBPort)

level.Info(i.logger).Log(
"msg", "created remote data proxy connector",
"host", cfg.Mine.RemoteDBHost,
"port", cfg.Mine.RemoteDBPort,
)

return i, nil
}

Expand All @@ -106,7 +111,7 @@ func (i *remoteImpl) hasAddressPrefix(key string) bool {
func (i *remoteImpl) IncomingRequest(data []byte) ([]byte, error) {
req, err := decodeRequest(data, i)
if err != nil {
rdbLog.Error("Problem decoding incoming request: %v", err)
level.Error(i.logger).Log("msg", "problem decoding incoming request", "err", err)
return errorResponse(err.Error())
}

Expand Down Expand Up @@ -153,21 +158,21 @@ func (i *remoteImpl) IncomingRequest(data []byte) ([]byte, error) {
defer i.rwLock.RUnlock()
}

i.log.Info("Getting remote request for keys: %v", req.dbKeys)
level.Info(i.logger).Log("msg", "getting remote request for keys", "dbKeys", req.dbKeys)

outMap := map[string][]byte{}
for _, k := range req.dbKeys {
if req.dbValues == nil && !isKnownKey(k) {
return errorResponse("Invalid lookup key: " + k)
}
rdbLog.Debug("Looking up local DB key: %v", k)
level.Debug(i.logger).Log("msg", "looking up local DB key", "key", k)
bts, err := i.localDB.Get(k)

if err != nil {
return errorResponse(err.Error())
}
if bts != nil {
rdbLog.Debug("Result to %d bytes of content", len(bts))
level.Debug(i.logger).Log("msg", "get result byte size", "bytes", len(bts))
outMap[k] = bts
}
}
Expand Down Expand Up @@ -268,9 +273,16 @@ func (i *remoteImpl) Verify(hash []byte, timestamp int64, sig []byte) error {
}
addr := crypto.PubkeyToAddress(*pubKey)
ashex := strings.ToLower(addr.Hex())
rdbLog.Debug("Verifying signature from %v request against whitelist: %v", ashex, i.whitelist[ashex])
level.Debug(i.logger).Log(
"msg", "verifying signature",
"address", ashex,
"whitelist", i.whitelist[ashex],
)
if !i.whitelist[ashex] {
rdbLog.Warn("Unauthorized miner detected with address: %v", ashex)
level.Warn(i.logger).Log(
"msg", "unauthorized miner detected",
"address", ashex,
)
return errors.Errorf("Unauthorized")
}

Expand All @@ -279,17 +291,33 @@ func (i *remoteImpl) Verify(hash []byte, timestamp int64, sig []byte) error {
return errors.Errorf("No history found for address")
}
if cache.Contains(timestamp) {
rdbLog.Debug("Miner %v already made request at %v", ashex, timestamp)
level.Debug(i.logger).Log(
"msg", "miner already made request",
"address", ashex,
"timestamp", timestamp,
)
expr := time.Unix(timestamp+_validityThreshold, 0)
now := time.Now()
if now.After(expr) {
rdbLog.Warn("Request time %v expired (%v)", time.Unix(timestamp, 0), now)
level.Warn(i.logger).Log(
"msg", "request time expired",
"timestamp", time.Unix(timestamp, 0),
"now", now,
)
return errors.Errorf("Request expired")
}
rdbLog.Debug("Time of last request: %v compared to %v", expr, now)
level.Debug(i.logger).Log(
"msg", "time of last request",
"compared", expr,
"to", now,
)

} else {
rdbLog.Debug("Never seen miner before: %v at time %v", ashex, timestamp)
level.Debug(i.logger).Log(
"msg", "never seen miner before",
"address", ashex,
"timestamp", timestamp,
)
}
cache.Add(timestamp, true)
return nil
Expand Down

0 comments on commit 041d943

Please sign in to comment.