Skip to content

Commit

Permalink
review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Philemon Ukane <ukanephilemon@gmail.com>
  • Loading branch information
ukane-philemon committed Apr 26, 2024
1 parent 0573397 commit 2e8b16e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 60 deletions.
71 changes: 54 additions & 17 deletions dex/fiatrates/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
// Oracle manages and retrieves fiat rate information from all enabled rate
// sources.
type Oracle struct {
ctx context.Context
log dex.Logger
sources []*source
ratesMtx sync.RWMutex
rates map[string]*FiatRateInfo
Expand Down Expand Up @@ -62,14 +64,46 @@ func (o *Oracle) Rate(symbol string) float64 {
}

if !hasRateInfo {
// Initiate an entry for this asset. Data for this asset will be fetched
// in the next refresh cycle.
o.rates[symbol] = new(FiatRateInfo)
o.fetchFiatRateNow(symbol)
}

return 0
}

// fetchFiatRateNow attempts to retrieve fiat rate from enabled sources and
// creates an entry for the provided asset. It'll try until one source returns a
// non-zero value for the provided symbol. symbol must have been parsed by
// parseSymbol function. Must be called with a write lock held on o.ratesMtx.
func (o *Oracle) fetchFiatRateNow(symbol string) {
var fiatRate float64
defer func() {
// Initiate an entry for this asset. Even if fiatRate is still zero when
// we get here, data for this asset will be fetched in the next refresh
// cycle.
o.rates[symbol] = &FiatRateInfo{
Rate: fiatRate,
LastUpdate: time.Now(),
}
}()

for _, s := range o.sources {
if s.isDisabled() {
continue
}

newRate, err := s.getRates(o.ctx, []string{symbol}, o.log)
if err != nil {
o.log.Errorf("%s.getRates error: %v", s.name, err)
continue
}

fiatRate = newRate[symbol]
if fiatRate > 0 {
break
}
}
}

// assets retrieves all assets that data can be fetched for.
func (o *Oracle) assets() []string {
o.ratesMtx.RLock()
Expand All @@ -84,27 +118,30 @@ func (o *Oracle) assets() []string {
// Run starts goroutines that refresh fiat rates every source.refreshInterval.
// This should be called in a goroutine as it's blocking.
func (o *Oracle) Run(ctx context.Context, log dex.Logger) {
o.ctx = ctx
o.log = log

var wg sync.WaitGroup
var sourcesEnabled int
initializedWithAssets := o.hasAssets()
for i := range o.sources {
fiatSource := o.sources[i]
if fiatSource.isDisabled() {
log.Infof("Fiat rate source %q is disabled...", fiatSource.name)
o.log.Infof("Fiat rate source %q is disabled...", fiatSource.name)
continue
}

o.fetchFromSource(ctx, fiatSource, &wg, log)
o.fetchFromSource(fiatSource, &wg)
sourcesEnabled++

if !initializedWithAssets {
continue
}

// Fetch rates now.
newRates, err := fiatSource.getRates(ctx, o.assets(), log)
newRates, err := fiatSource.getRates(o.ctx, o.assets(), o.log)
if err != nil {
log.Errorf("failed to retrieve rate from %s: %v", fiatSource.name, err)
o.log.Errorf("failed to retrieve rate from %s: %v", fiatSource.name, err)
continue
}

Expand All @@ -116,7 +153,7 @@ func (o *Oracle) Run(ctx context.Context, log dex.Logger) {

if initializedWithAssets {
// Calculate average fiat rate now.
o.calculateAverageRate(ctx, &wg, log)
o.calculateAverageRate(&wg)
}

if sourcesEnabled > 0 {
Expand All @@ -131,14 +168,14 @@ func (o *Oracle) Run(ctx context.Context, log dex.Logger) {

for {
select {
case <-ctx.Done():
case <-o.ctx.Done():
return
case <-ticker.C:
if !o.hasAssets() {
continue // nothing to do
}

o.calculateAverageRate(ctx, &wg, log)
o.calculateAverageRate(&wg)
}
}
}()
Expand All @@ -149,14 +186,14 @@ func (o *Oracle) Run(ctx context.Context, log dex.Logger) {

// calculateAverageRate is a shared function to support fiat average rate
// calculations before and after averageRateRefreshInterval.
func (o *Oracle) calculateAverageRate(ctx context.Context, wg *sync.WaitGroup, log dex.Logger) {
func (o *Oracle) calculateAverageRate(wg *sync.WaitGroup) {
newRatesInfo := make(map[string]*fiatRateAndSourceCount)
for i := range o.sources {
s := o.sources[i]
if s.isDisabled() {
if s.checkIfSourceCanReactivate() {
// Start a new goroutine for this source.
o.fetchFromSource(ctx, s, wg, log)
o.fetchFromSource(s, wg)
}
continue
}
Expand Down Expand Up @@ -201,7 +238,7 @@ func (o *Oracle) hasAssets() bool {

// fetchFromSource starts a goroutine that retrieves fiat rate from the provided
// source.
func (o *Oracle) fetchFromSource(ctx context.Context, s *source, wg *sync.WaitGroup, log dex.Logger) {
func (o *Oracle) fetchFromSource(s *source, wg *sync.WaitGroup) {
wg.Add(1)
go func(s *source) {
defer wg.Done()
Expand All @@ -210,7 +247,7 @@ func (o *Oracle) fetchFromSource(ctx context.Context, s *source, wg *sync.WaitGr

for {
select {
case <-ctx.Done():
case <-o.ctx.Done():
return
case <-ticker.C:
if !o.hasAssets() || s.isDisabled() { // nothing to fetch.
Expand All @@ -219,13 +256,13 @@ func (o *Oracle) fetchFromSource(ctx context.Context, s *source, wg *sync.WaitGr

if s.hasAssets() && s.isExpired() {
s.deactivate()
log.Errorf("Fiat rate source %q has been disabled due to lack of fresh data. It will be re-enabled after %d hours.", s.name, reactivateDuration.Hours())
o.log.Errorf("Fiat rate source %q has been disabled due to lack of fresh data. It will be re-enabled after %d hours.", s.name, reactivateDuration.Hours())
return
}

newRates, err := s.getRates(ctx, o.assets(), log)
newRates, err := s.getRates(o.ctx, o.assets(), o.log)
if err != nil {
log.Errorf("%s.getRates error: %v", s.name, err)
o.log.Errorf("%s.getRates error: %v", s.name, err)
continue
}

Expand Down
2 changes: 1 addition & 1 deletion dex/fiatrates/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Config struct {
CryptoCompareAPIKey string `long:"ccdataapikey" description:"This is your free API Key from cryptocompare.com."`
EnableBinanceUS bool `long:"enablebinanceus" description:"Set to true, if running the tatanka mesh from a US based server."`
DisabledFiatSources string `long:"disabledfiatsources" description:"A list of disabled sources separated by comma. See fiatrate/sources.go."`
Assets string `long:"assets" description:"A list of comma separated assets to fetch rates for when the oracle if activated."`
Assets string `long:"assets" description:"A list of comma separated assets to fetch rates for when the oracle is activated."`
}

type source struct {
Expand Down
22 changes: 16 additions & 6 deletions tatanka/cmd/demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func mainErr() (err error) {
defer wg.Done()
defer cancel()

runServer(ctx, dir0, addrs[0], addrs[1], priv1.PubKey().SerializeCompressed())
runServer(ctx, dir0, addrs[0], addrs[1], priv1.PubKey().SerializeCompressed(), true)
}()

time.Sleep(time.Second)
Expand All @@ -102,7 +102,7 @@ func mainErr() (err error) {
defer wg.Done()
defer cancel()

runServer(ctx, dir1, addrs[1], addrs[0], priv0.PubKey().SerializeCompressed())
runServer(ctx, dir1, addrs[1], addrs[0], priv0.PubKey().SerializeCompressed(), false)
}()

time.Sleep(time.Second)
Expand Down Expand Up @@ -256,7 +256,9 @@ func mainErr() (err error) {
}

// Wait for rate messages.
<-time.After(9 * time.Minute) // rates are broadcasted every 8min, wait for 9min.
for i := 0; i < len(supportedDEXAssetIDs); i++ {
<-cl1.Next()
}

want := len(supportedDEXAssetIDs)
got := 0
Expand Down Expand Up @@ -308,7 +310,7 @@ func findOpenAddrs(n int) ([]net.Addr, error) {
return addrs, nil
}

func runServer(ctx context.Context, dir string, addr, peerAddr net.Addr, peerID []byte) {
func runServer(ctx context.Context, dir string, addr, peerAddr net.Addr, peerID []byte, startFiatRateOracle bool) {
n := newBootNode(peerAddr.String(), peerID)

log := logMaker.Logger(fmt.Sprintf("SRV[%s]", addr))
Expand Down Expand Up @@ -359,10 +361,18 @@ func runServer(ctx context.Context, dir string, addr, peerAddr net.Addr, peerID
},
ConfigPath: cfgPath,
WhiteList: []tatanka.BootNode{n},
FiatOracleCfg: fiatrates.Config{
Assets: strings.Join(assetStrs, ","),
FiatOracleConfig: tatanka.FiatOracleConfig{
Config: fiatrates.Config{
Assets: strings.Join(assetStrs, ","),
},
FiatRateBroadcastInterval: 1,
},
}

if !startFiatRateOracle {
cfg.FiatRateBroadcastInterval = 0
}

t, err := tatanka.New(cfg)
if err != nil {
log.Errorf("error creating Tatanka node: %v", err)
Expand Down
3 changes: 1 addition & 2 deletions tatanka/cmd/tatanka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"strings"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/fiatrates"
"decred.org/dcrdex/server/comms"
"decred.org/dcrdex/tatanka"
"github.com/jessevdk/go-flags"
Expand Down Expand Up @@ -115,7 +114,7 @@ type Config struct {

WebAddr string `long:"webaddr" description:"The public facing address by which peers should connect."`

FiatRateOracleCfg fiatrates.Config `group:"Fiat Oracle Config"`
tatanka.FiatOracleConfig `group:"Fiat Oracle Config"`
}

func config() (*dex.LoggerMaker, *Config) {
Expand Down

0 comments on commit 2e8b16e

Please sign in to comment.