Skip to content

Commit

Permalink
review changes and refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Philemon Ukane <ukanephilemon@gmail.com>
  • Loading branch information
ukane-philemon committed Apr 21, 2024
1 parent fe74b6b commit 14be77e
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 134 deletions.
248 changes: 149 additions & 99 deletions dex/fiatrates/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fiatrates

import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -15,33 +17,35 @@ const (

// averageRateRefreshInterval is how long it'll take before a fresh fiat
// average rate is calculated.
averageRateRefreshInterval = defaultRefreshInterval + 1*time.Minute
averageRateRefreshInterval = defaultRefreshInterval + time.Minute
)

// Oracle manages and retrieves fiat rate information from all enabled rate
// sources.
type Oracle struct {
sources []*source
ratesMtx sync.RWMutex
fiatRates map[string]*FiatRateInfo
sources []*source
ratesMtx sync.RWMutex
rates map[string]*FiatRateInfo
}

func NewFiatOracle(cfg Config) *Oracle {
return &Oracle{
fiatRates: make(map[string]*FiatRateInfo),
sources: fiatSources(cfg),
func NewFiatOracle(cfg Config) (*Oracle, error) {
fiatOracle := &Oracle{
rates: make(map[string]*FiatRateInfo),
sources: fiatSources(cfg),
}
}

// assets retrieves all assets that data can be fetched for.
func (o *Oracle) assets() []string {
o.ratesMtx.Lock()
defer o.ratesMtx.Unlock()
var assets []string
for sym := range o.fiatRates {
assets = append(assets, sym)
assets := strings.Split(cfg.Assets, ",")
for _, asset := range assets {
_, ok := dex.BipSymbolID(strings.ToLower(asset))
if !ok {
return nil, fmt.Errorf("unknown asset %s", asset)
}

// Initialize entry for this asset.
fiatOracle.rates[parseSymbol(asset)] = new(FiatRateInfo)
}
return assets

return fiatOracle, nil
}

// Rate returns the current fiat rate information for the provided symbol.
Expand All @@ -51,7 +55,7 @@ func (o *Oracle) Rate(symbol string) float64 {
defer o.ratesMtx.Unlock()

symbol = parseSymbol(symbol)
rateInfo := o.fiatRates[symbol]
rateInfo := o.rates[symbol]
hasRateInfo := rateInfo != nil
if hasRateInfo && time.Since(rateInfo.LastUpdate) < FiatRateDataExpiry && rateInfo.Rate > 0 {
return rateInfo.Rate
Expand All @@ -60,68 +64,59 @@ func (o *Oracle) Rate(symbol string) float64 {
if !hasRateInfo {
// Initiate an entry for this asset. Data for this asset will be fetched
// in the next refresh cycle.
o.fiatRates[symbol] = new(FiatRateInfo)
o.rates[symbol] = new(FiatRateInfo)
}

return 0
}

// Run starts goroutines that refreshes fiat rate every source.refreshInterval.
// assets retrieves all assets that data can be fetched for.
func (o *Oracle) assets() []string {
o.ratesMtx.Lock()
defer o.ratesMtx.Unlock()
var assets []string
for sym := range o.rates {
assets = append(assets, sym)
}
return assets
}

// Run starts goroutines that refresh fiat rates every source.refreshInterval.
// This should be called in a goroutine as it's blocking.
func (o *Oracle) Run(ctx context.Context, log dex.Logger) {
var wg sync.WaitGroup
var sourcesEnabled int
initializedWithAssets := o.hasAssets()
for i := range o.sources {
fiatSource := o.sources[i]
if fiatSource.disabled {
if fiatSource.isDisabled() {
log.Infof("Fiat rate source %q is disabled...", fiatSource.name)
continue
}

wg.Add(1)
go func(s *source) {
defer wg.Done()
ticker := time.NewTicker(s.requestInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(o.fiatRates) == 0 || s.disabled { // nothing to fetch.
continue
}

if len(s.rates) > 0 && time.Since(s.lastRefresh) > FiatRateDataExpiry {
s.mtx.Lock()
s.rates = nil // empty previous rates
s.disabled = true
s.mtx.Unlock()
log.Errorf("Fiat rate source %q has been disabled due to lack of fresh data...", s.name)
return
}
o.fetchFromSource(ctx, fiatSource, &wg, log)
sourcesEnabled++

assets := o.assets()
if len(assets) == 0 {
continue // nothing to do
}
if !initializedWithAssets {
continue
}

newRates, err := s.getRates(ctx, assets, log)
if err != nil {
log.Errorf("%s.getRates error: %v", s.name, err)
continue
}
// Fetch rates now.
newRates, err := fiatSource.getRates(ctx, o.assets(), log)
if err != nil {
log.Errorf("failed to retrieve rate from %s: %v", fiatSource.name, err)
continue
}

s.mtx.Lock()
s.rates = newRates
s.lastRefresh = time.Now()
s.mtx.Unlock()
}
}
}(fiatSource)
fiatSource.mtx.Lock()
fiatSource.rates = newRates
fiatSource.lastRefresh = time.Now()
fiatSource.mtx.Unlock()
}

sourcesEnabled++
if initializedWithAssets {
// Calculate average fiat rate now.
o.calculateAverageRate(ctx, &wg, log)
}

if sourcesEnabled > 0 {
Expand All @@ -139,51 +134,106 @@ func (o *Oracle) Run(ctx context.Context, log dex.Logger) {
case <-ctx.Done():
return
case <-ticker.C:
if len(o.fiatRates) == 0 {
if !o.hasAssets() {
continue // nothing to do
}

newRatesInfo := make(map[string]*fiatRateAndSourceCount)
for i := range o.sources {
s := o.sources[i]
if s.disabled {
continue
}

s.mtx.RLock()
sourceRates := s.rates
s.mtx.RUnlock()

for sym, rate := range sourceRates {
if rate == 0 {
continue
}

info, ok := newRatesInfo[sym]
if !ok {
info = new(fiatRateAndSourceCount)
newRatesInfo[sym] = info
}

info.sources++
info.totalFiatRate += rate
}
}

now := time.Now()
o.ratesMtx.Lock()
for sym := range o.fiatRates {
rateInfo := newRatesInfo[sym]
if rateInfo != nil {
o.fiatRates[sym].Rate = rateInfo.totalFiatRate / float64(rateInfo.sources)
o.fiatRates[sym].LastUpdate = now
}
}
o.ratesMtx.Unlock()
o.calculateAverageRate(ctx, &wg, log)
}
}
}()
}

wg.Wait()
}

// calculateAverageRate is a shared function to support fiat average rate
// calculations before and after averageRateRefreshInterval.
func (o *Oracle) calculateAverageRate(ctx context.Context, wg *sync.WaitGroup, log dex.Logger) {
newRatesInfo := make(map[string]*fiatRateAndSourceCount)
for i := range o.sources {
s := o.sources[i]
if s.isDisabled() {
if s.checkIfSourceCanReactivate() {
// Start a new goroutine for this source.
o.fetchFromSource(ctx, s, wg, log)
}
continue
}

s.mtx.RLock()
sourceRates := s.rates
s.mtx.RUnlock()

for sym, rate := range sourceRates {
if rate == 0 {
continue
}

info, ok := newRatesInfo[sym]
if !ok {
info = new(fiatRateAndSourceCount)
newRatesInfo[sym] = info
}

info.sources++
info.totalFiatRate += rate
}
}

now := time.Now()
o.ratesMtx.Lock()
for sym := range o.rates {
rateInfo := newRatesInfo[sym]
if rateInfo != nil {
o.rates[sym].Rate = rateInfo.totalFiatRate / float64(rateInfo.sources)
o.rates[sym].LastUpdate = now
}
}
o.ratesMtx.Unlock()
}

func (o *Oracle) hasAssets() bool {
o.ratesMtx.RLock()
defer o.ratesMtx.RUnlock()
return len(o.rates) != 0
}

// fetchFromSource starts a goroutine that retrieves fiat rate from the provided
// source.
func (o *Oracle) fetchFromSource(ctx context.Context, s *source, wg *sync.WaitGroup, log dex.Logger) {
wg.Add(1)
go func(s *source) {
defer wg.Done()
ticker := time.NewTicker(s.requestInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !o.hasAssets() || s.isDisabled() { // nothing to fetch.
continue
}

if s.hasAssets() && s.isExpired() {
s.deactivate()
log.Errorf("Fiat rate source %q has been disabled due to lack of fresh data. It will be re-enabled after %d hours.", s.name, reactivateDuration.Hours())
return
}

newRates, err := s.getRates(ctx, o.assets(), log)
if err != nil {
log.Errorf("%s.getRates error: %v", s.name, err)
continue
}

s.mtx.Lock()
s.rates = newRates
s.lastRefresh = time.Now()
s.mtx.Unlock()
}
}
}(s)
}
4 changes: 4 additions & 0 deletions dex/fiatrates/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ func fiatSources(cfg Config) []*source {
},
}

for i := range sources {
sources[i].canReactivate = !sources[i].disabled
}

return sources
}

Expand Down

0 comments on commit 14be77e

Please sign in to comment.