Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
TheRocketCat committed Jan 26, 2021
1 parent ff18b7b commit 3d0e151
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 39 deletions.
14 changes: 10 additions & 4 deletions pkg/apiOracle/valueOracle.go
Expand Up @@ -11,12 +11,14 @@ import (
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/tellor-io/telliot/pkg/config"
"github.com/tellor-io/telliot/pkg/util"
)

var logger = util.NewLogger("apiOracle", "valueOracle")
var logger = log.With(util.SetupLogger("debug"), "apiOracle", "valueOracle")

// maps symbol to a time window of values.
var valueHistory map[string]*Window
Expand Down Expand Up @@ -67,7 +69,7 @@ func writeOutHistory() {
// this function is single threaded, but we need mutex to access multithreaded history.
valueHistoryMutex.Unlock()
if err != nil {
logger.Error("failed to marshal PSR values: %s", err.Error())
level.Error(logger).Log("msg", "failed to marshal PSR values", "err", err.Error())
return
}

Expand All @@ -76,13 +78,17 @@ func writeOutHistory() {
psrSavedDataTmp := psrSavedData + ".tmp"
err = ioutil.WriteFile(psrSavedDataTmp, data, 0644)
if err != nil {
logger.Error("failed to write out PSR values to %s: %s", psrSavedDataTmp, err.Error())
level.Error(logger).Log(
"msg", "failed to write out PSR values",
"psrSavedDataTmp", psrSavedDataTmp,
"err", err.Error(),
)
return
}
// Rename tmp file to old file (should be atomic on most modern OS)
err = os.Rename(psrSavedDataTmp, psrSavedData)
if err != nil {
logger.Error("failed move new PSR save onto old: %s", err.Error())
level.Error(logger).Log("msg", "failed move new PSR save onto old", "err", err.Error())
return
}
}
Expand Down
64 changes: 47 additions & 17 deletions pkg/db/proxy.go
Expand Up @@ -15,6 +15,8 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/tellor-io/telliot/pkg/config"
Expand Down Expand Up @@ -49,7 +51,7 @@ type DataServerProxy interface {
// how long a signed request is good for before reject it. Semi-protection against replays.
const _validityThreshold = 2 //seconds

var rdbLog *util.Logger
var rdbLog log.Logger

/***************************************************************************************
** NOTE: This component is used to proxy data requests from approved miner processes. Miner
Expand All @@ -69,7 +71,7 @@ type remoteImpl struct {
localDB DB
whitelist map[string]bool
postURL string
log *util.Logger
logger log.Logger
wlHistory map[string]*lru.ARCCache
isRemote bool
rwLock sync.RWMutex
Expand All @@ -85,11 +87,11 @@ func OpenLocal(cfg *config.Config, localDB DB) (DataServerProxy, error) {

// OpenRemoteDB establishes a proxy to a remote data server.
func open(cfg *config.Config, localDB DB, isRemote bool) (DataServerProxy, error) {
rdbLog = util.NewLogger("db", "RemoteDBProxy")
rdbLog = log.With(util.SetupLogger("debug"), "db", "RemoteDBProxy")

privateKey, err := crypto.HexToECDSA(os.Getenv(config.PrivateKeyEnvName))
if err != nil {
fmt.Println("Problem decoding private key", err)
level.Error(rdbLog).Log("msg", "problem decoding private key", "err", err)
return nil, err
}
//get address from config
Expand Down Expand Up @@ -120,10 +122,14 @@ func open(cfg *config.Config, localDB DB, isRemote bool) (DataServerProxy, error
postURL: url,
whitelist: wlMap,
wlHistory: wlLRU,
log: util.NewLogger("db", "RemoteDB"),
logger: log.With(util.SetupLogger("debug"), "db", "RemoteDB"),
isRemote: isRemote,
}
i.log.Info("Created Remote data proxy connector for %s:%d\n", cfg.Mine.RemoteDBHost, cfg.Mine.RemoteDBPort)
level.Info(i.logger).Log(
"msg", "created remote data proxy connector",
"host", cfg.Mine.RemoteDBHost,
"port", cfg.Mine.RemoteDBPort,
)
return i, nil
}

Expand All @@ -142,7 +148,7 @@ func (i *remoteImpl) hasAddressPrefix(key string) bool {
func (i *remoteImpl) IncomingRequest(data []byte) ([]byte, error) {
req, err := decodeRequest(data, i)
if err != nil {
rdbLog.Error("Problem decoding incoming request: %v", err)
level.Error(i.logger).Log("msg", "problem decoding incoding request", "err", err)
return errorResponse(err.Error())
}

Expand Down Expand Up @@ -189,21 +195,21 @@ func (i *remoteImpl) IncomingRequest(data []byte) ([]byte, error) {
defer i.rwLock.RUnlock()
}

i.log.Info("Getting remote request for keys: %v", req.dbKeys)
level.Info(i.logger).Log("msg", "getting remote request for keys", req.dbKeys)

outMap := map[string][]byte{}
for _, k := range req.dbKeys {
if req.dbValues == nil && !isKnownKey(k) {
return errorResponse("Invalid lookup key: " + k)
}
rdbLog.Debug("Looking up local DB key: %v", k)
level.Debug(i.logger).Log("looking up for local DB key", "key", k)
bts, err := i.localDB.Get(k)

if err != nil {
return errorResponse(err.Error())
}
if bts != nil {
rdbLog.Debug("Result to %d bytes of content", len(bts))
level.Debug(i.logger).Log("msg", "get bytes of result", "bytes", len(bts))
outMap[k] = bts
}
}
Expand Down Expand Up @@ -243,7 +249,11 @@ func (i *remoteImpl) BatchGet(keys []string) (map[string][]byte, error) {
outMap[k] = bts
}
}
i.log.Debug("Requested keys: %v, resulting output:%v", keys, outMap)
level.Debug(i.logger).Log(
"msg", "requested keys result in output",
"keys", keys,
"outMap", outMap,
)
return outMap, nil
}
req, err := createRequest(keys, nil, i)
Expand Down Expand Up @@ -334,9 +344,13 @@ func (i *remoteImpl) Verify(hash []byte, timestamp int64, sig []byte) error {
}
addr := crypto.PubkeyToAddress(*pubKey)
ashex := strings.ToLower(addr.Hex())
rdbLog.Debug("Verifying signature from %v request against whitelist: %v", ashex, i.whitelist[ashex])
level.Debug(i.logger).Log(
"msg", "verifying signature against whitelist",
"address", ashex,
"whitlisted", i.whitelist[ashex],
)
if !i.whitelist[ashex] {
rdbLog.Warn("Unauthorized miner detected with address: %v", ashex)
level.Warn(i.logger).Log("msg", "unauthorized miner detected", "address", ashex)
return errors.Errorf("Unauthorized")
}

Expand All @@ -345,17 +359,33 @@ func (i *remoteImpl) Verify(hash []byte, timestamp int64, sig []byte) error {
return errors.Errorf("No history found for address")
}
if cache.Contains(timestamp) {
rdbLog.Debug("Miner %v already made request at %v", ashex, timestamp)
level.Debug(i.logger).Log(
"msg", "miner already made request",
"miner", ashex,
"timestamp", timestamp,
)
expr := time.Unix(timestamp+_validityThreshold, 0)
now := time.Now()
if now.After(expr) {
rdbLog.Warn("Request time %v expired (%v)", time.Unix(timestamp, 0), now)
level.Warn(i.logger).Log(
"msg", "request time expired",
"timestamp", time.Unix(timestamp, 0),
"now", now,
)
return errors.Errorf("Request expired")
}
rdbLog.Debug("Time of last request: %v compared to %v", expr, now)
level.Debug(i.logger).Log(
"msg", "time of last request",
"comparing", expr,
"to", now,
)

} else {
rdbLog.Debug("Never seen miner before: %v at time %v", ashex, timestamp)
level.Debug(i.logger).Log(
"msg", "never seen miner before",
"address", ashex,
"timestamp", timestamp,
)
}
cache.Add(timestamp, true)
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/remoteDataProxy.go
Expand Up @@ -308,7 +308,7 @@ func (i *remoteImpl) Verify(hash []byte, timestamp int64, sig []byte) error {
}
level.Debug(i.logger).Log(
"msg", "time of last request",
"compared", expr,
"comparing", expr,
"to", now,
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ops/miningManager.go
Expand Up @@ -77,7 +77,7 @@ func CreateMiningManager(
account *rpc.Account,
) (*MiningMgr, error) {

group, err := pow.SetupMiningGroup(logger, cfg, exitCh)
group, err := pow.SetupMiningGroup(cfg, logger, exitCh)
if err != nil {
return nil, errors.Wrap(err, "setup miners")
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/rest/server.go
Expand Up @@ -6,15 +6,16 @@ package rest
import (
"context"
"fmt"
"log"
"net/http"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/tellor-io/telliot/pkg/db"
"github.com/tellor-io/telliot/pkg/util"
)

var serverLog = util.NewLogger("rest", "Server")
var serverLog = log.With(util.SetupLogger("debug"), "rest", "Server")

// Server wraps http server with pre-configured paths.
type Server struct {
Expand All @@ -41,21 +42,21 @@ func Create(ctx context.Context, proxy db.DataServerProxy, host string, port uin
// Start the server listening for incoming requests.
func (s *Server) Start() {
go func() {
serverLog.Info("Starting server on %+v\n", s.server.Addr)
level.Info(serverLog).Log("msg", "starting server", "addr", s.server.Addr)
// returns ErrServerClosed on graceful close
if err := s.server.ListenAndServe(); err != http.ErrServerClosed {
// NOTE: there is a chance that next line won't have time to run,
// as main() doesn't wait for this goroutine to stop. don't use
// code with race conditions like these for production. see post
// comments below on more discussion on how to handle this.
// TODO remove this log and return error instead.
log.Fatalf("ListenAndServe(): %s", err)
level.Error(serverLog).Log("msg", "ListenAndServe()", "err", err)
}
}()
}

// Stop stops the server listening.
func (s *Server) Stop() error {
serverLog.Info("Stopping server")
level.Info(serverLog).Log("msg", "stopping server")
return s.server.Close()
}
25 changes: 20 additions & 5 deletions pkg/tracker/retryFetcher.go
Expand Up @@ -8,6 +8,8 @@ import (
"net/http"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/tellor-io/telliot/pkg/util"
)
Expand All @@ -19,7 +21,7 @@ func init() {
client = http.Client{}
}

var retryFetchLog = util.NewLogger("tracker", "FetchWithRetries")
var retryFetchLog = log.With(util.SetupLogger("debug"), "tracker", "fetchWithRetries")

// FetchRequest holds info for a request.
// TODO: add mock fetch.
Expand All @@ -33,15 +35,23 @@ func fetchWithRetries(req *FetchRequest) ([]byte, error) {
}

func _recFetch(req *FetchRequest, expiration time.Time) ([]byte, error) {
retryFetchLog.Debug("Fetch request will expire at: %v (timeout: %v)", expiration, req.timeout)
level.Debug(retryFetchLog).Log(
"msg", "fetch request will expire",
"at", expiration,
"timeout", req.timeout,
)

now := clck.Now()
client.Timeout = expiration.Sub(now)

r, err := client.Get(req.queryURL)
if err != nil {
//log local non-timeout errors for now
retryFetchLog.Warn("Problem fetching data from: %s. %v", req.queryURL, err)
level.Warn(retryFetchLog).Log(
"msg", "problem fetching data",
"from", req.queryURL,
"err", err,
)
now := clck.Now()
if now.After(expiration) {
return nil, errors.Wrap(err, "retry timeout expired, last error is wrapped")
Expand All @@ -50,7 +60,7 @@ func _recFetch(req *FetchRequest, expiration time.Time) ([]byte, error) {
time.Sleep(1000 * time.Millisecond)

//try again
retryFetchLog.Warn("Trying fetch again...")
level.Warn(retryFetchLog).Log("msg", "trying fetch again")
return _recFetch(req, expiration)
}

Expand All @@ -61,7 +71,12 @@ func _recFetch(req *FetchRequest, expiration time.Time) ([]byte, error) {
}

if r.StatusCode < 200 || r.StatusCode > 299 {
retryFetchLog.Warn("Response from fetching %s. Response code: %d, payload: %s", req.queryURL, r.StatusCode, data)
level.Warn(retryFetchLog).Log(
"msg", "response from fetching",
"queryURL", req.queryURL,
"statusCode", r.StatusCode,
"payload", data,
)
//log local non-timeout errors for now
// this is a duplicated error that is unlikely to be triggered since expiration is updated above
now := clck.Now()
Expand Down

0 comments on commit 3d0e151

Please sign in to comment.