Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fiatrates: Implement a fiat rate oracle for tatanka #2736

Merged
merged 10 commits into from
May 14, 2024
11 changes: 11 additions & 0 deletions dex/fiatrates/fiatrates.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,22 @@ func FetchCoinpaprikaRates(ctx context.Context, assets []*CoinpaprikaAsset, log
}

func getRates(ctx context.Context, url string, thing any) error {
return getRatesWithHeader(ctx, url, thing, nil)
}

func getRatesWithHeader(ctx context.Context, url string, thing any, header map[string]string) error {
ctx, cancel := context.WithTimeout(ctx, fiatRequestTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return err
}

for key, value := range header {
req.Header.Add(key, value)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
Expand Down
189 changes: 189 additions & 0 deletions dex/fiatrates/oracle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package fiatrates

import (
"context"
"sync"
"time"

"decred.org/dcrdex/dex"
)

const (
// FiatRateDataExpiry : Any data older than FiatRateDataExpiry will be
// discarded.
FiatRateDataExpiry = 60 * time.Minute

// averageRateRefreshInterval is how long it'll take before a fresh fiat
// average rate is calculated.
averageRateRefreshInterval = defaultRefreshInterval + 1*time.Minute
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
)

// Oracle manages and retrieves fiat rate information from all enabled rate
// sources.
type Oracle struct {
sources []*source
ratesMtx sync.RWMutex
fiatRates map[string]*FiatRateInfo
}

func NewFiatOracle(cfg Config) *Oracle {
return &Oracle{
fiatRates: make(map[string]*FiatRateInfo),
sources: fiatSources(cfg),
}
}

// assets retrieves all assets that data can be fetched for.
func (o *Oracle) assets() []string {
o.ratesMtx.Lock()
defer o.ratesMtx.Unlock()
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
var assets []string
for sym := range o.fiatRates {
assets = append(assets, sym)
}
return assets
}

// Rate returns the current fiat rate information for the provided symbol.
// Returns zero if there are no rates for the provided symbol yet.
func (o *Oracle) Rate(symbol string) float64 {
o.ratesMtx.Lock()
defer o.ratesMtx.Unlock()
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved

symbol = parseSymbol(symbol)
rateInfo := o.fiatRates[symbol]
hasRateInfo := rateInfo != nil
if hasRateInfo && time.Since(rateInfo.LastUpdate) < FiatRateDataExpiry && rateInfo.Rate > 0 {
return rateInfo.Rate
}

if !hasRateInfo {
// Initiate an entry for this asset. Data for this asset will be fetched
// in the next refresh cycle.
o.fiatRates[symbol] = new(FiatRateInfo)
}

return 0
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
}

// Run starts goroutines that refreshes fiat rate every source.refreshInterval.
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
// This should be called in a goroutine as it's blocking.
func (o *Oracle) Run(ctx context.Context, log dex.Logger) {
var wg sync.WaitGroup
var sourcesEnabled int
for i := range o.sources {
fiatSource := o.sources[i]
if fiatSource.disabled {
log.Infof("Fiat rate source %q is disabled...", fiatSource.name)
continue
}

wg.Add(1)
go func(s *source) {
defer wg.Done()
ticker := time.NewTicker(s.requestInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
if len(o.fiatRates) == 0 || s.disabled { // nothing to fetch.
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
continue
}

if len(s.rates) > 0 && time.Since(s.lastRefresh) > FiatRateDataExpiry {
s.mtx.Lock()
s.rates = nil // empty previous rates
s.disabled = true
s.mtx.Unlock()
log.Errorf("Fiat rate source %q has been disabled due to lack of fresh data...", s.name)
return
}
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved

assets := o.assets()
if len(assets) == 0 {
continue // nothing to do
}

newRates, err := s.getRates(ctx, assets, log)
if err != nil {
log.Errorf("%s.getRates error: %v", s.name, err)
continue
}

s.mtx.Lock()
s.rates = newRates
s.lastRefresh = time.Now()
s.mtx.Unlock()
}
}
}(fiatSource)

sourcesEnabled++
}

if sourcesEnabled > 0 {
// Start a goroutine to generate an average fiat rate based on fresh
// data from all enabled sources. This is done every
// averageRateRefreshInterval.
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(averageRateRefreshInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(o.fiatRates) == 0 {
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
continue // nothing to do
}

newRatesInfo := make(map[string]*fiatRateAndSourceCount)
for i := range o.sources {
s := o.sources[i]
if s.disabled {
continue
}
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved

s.mtx.RLock()
sourceRates := s.rates
s.mtx.RUnlock()

for sym, rate := range sourceRates {
if rate == 0 {
continue
}

info, ok := newRatesInfo[sym]
if !ok {
info = new(fiatRateAndSourceCount)
newRatesInfo[sym] = info
}

info.sources++
info.totalFiatRate += rate
}
}

now := time.Now()
o.ratesMtx.Lock()
for sym := range o.fiatRates {
rateInfo := newRatesInfo[sym]
if rateInfo != nil {
o.fiatRates[sym].Rate = rateInfo.totalFiatRate / float64(rateInfo.sources)
o.fiatRates[sym].LastUpdate = now
}
}
o.ratesMtx.Unlock()
}
}
}()
}

wg.Wait()
}