Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fiatrates: Implement a fiat rate oracle for tatanka #2736

Merged
merged 10 commits into from
May 14, 2024
11 changes: 11 additions & 0 deletions dex/fiatrates/fiatrates.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,22 @@ func FetchCoinpaprikaRates(ctx context.Context, assets []*CoinpaprikaAsset, log
}

func getRates(ctx context.Context, url string, thing any) error {
return getRatesWithHeader(ctx, url, thing, nil)
}

func getRatesWithHeader(ctx context.Context, url string, thing any, header map[string]string) error {
ctx, cancel := context.WithTimeout(ctx, fiatRequestTimeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return err
}

for key, value := range header {
req.Header.Add(key, value)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
Expand Down
274 changes: 274 additions & 0 deletions dex/fiatrates/oracle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package fiatrates

import (
"context"
"fmt"
"strings"
"sync"
"time"

"decred.org/dcrdex/dex"
)

const (
// FiatRateDataExpiry : Any data older than FiatRateDataExpiry will be
// discarded.
FiatRateDataExpiry = 60 * time.Minute

// averageRateRefreshInterval is how long it'll take before a fresh fiat
// average rate is calculated.
averageRateRefreshInterval = defaultRefreshInterval + time.Minute
)

// Oracle manages and retrieves fiat rate information from all enabled rate
// sources.
type Oracle struct {
log dex.Logger
sources []*source
ratesMtx sync.RWMutex
rates map[string]*FiatRateInfo
}

func NewFiatOracle(cfg Config) (*Oracle, error) {
fiatOracle := &Oracle{
rates: make(map[string]*FiatRateInfo),
sources: fiatSources(cfg),
}

assets := strings.Split(cfg.Assets, ",")
for _, asset := range assets {
_, ok := dex.BipSymbolID(strings.ToLower(asset))
if !ok {
return nil, fmt.Errorf("unknown asset %s", asset)
}

// Initialize entry for this asset.
fiatOracle.rates[parseSymbol(asset)] = new(FiatRateInfo)
}

return fiatOracle, nil
}

// Rate returns the current fiat rate information for the provided symbol.
// Returns zero if there are no rates for the provided symbol yet.
func (o *Oracle) Rate(ctx context.Context, symbol string) float64 {
o.ratesMtx.Lock()
defer o.ratesMtx.Unlock()

symbol = parseSymbol(symbol)
rateInfo := o.rates[symbol]
hasRateInfo := rateInfo != nil
if hasRateInfo && time.Since(rateInfo.LastUpdate) < FiatRateDataExpiry && rateInfo.Rate > 0 {
return rateInfo.Rate
}

if !hasRateInfo {
o.fetchFiatRateNow(ctx, symbol)
}

return 0
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
}

// fetchFiatRateNow attempts to retrieve fiat rate from enabled sources and
// creates an entry for the provided asset. It'll try until one source returns a
// non-zero value for the provided symbol. symbol must have been parsed by
// parseSymbol function. Must be called with a write lock held on o.ratesMtx.
func (o *Oracle) fetchFiatRateNow(ctx context.Context, symbol string) {
var fiatRate float64
defer func() {
// Initiate an entry for this asset. Even if fiatRate is still zero when
// we get here, data for this asset will be fetched in the next refresh
// cycle.
o.rates[symbol] = &FiatRateInfo{
Rate: fiatRate,
LastUpdate: time.Now(),
}
}()

for _, s := range o.sources {
if s.isDisabled() {
continue
}

newRate, err := s.getRates(ctx, []string{symbol}, o.log)
if err != nil {
o.log.Errorf("%s.getRates error: %v", s.name, err)
continue
}

fiatRate = newRate[symbol]
if fiatRate > 0 {
break
}
}
}

// assets retrieves all assets that data can be fetched for.
func (o *Oracle) assets() []string {
o.ratesMtx.RLock()
defer o.ratesMtx.RUnlock()
var assets []string
for sym := range o.rates {
assets = append(assets, sym)
}
return assets
}

// Run starts goroutines that refresh fiat rates every source.refreshInterval.
// This should be called in a goroutine as it's blocking.
func (o *Oracle) Run(ctx context.Context, log dex.Logger) {
o.log = log
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved

var wg sync.WaitGroup
var sourcesEnabled int
initializedWithAssets := o.hasAssets()
for i := range o.sources {
fiatSource := o.sources[i]
if fiatSource.isDisabled() {
o.log.Infof("Fiat rate source %q is disabled...", fiatSource.name)
continue
}

o.fetchFromSource(ctx, fiatSource, &wg)
sourcesEnabled++

if !initializedWithAssets {
continue
}

// Fetch rates now.
newRates, err := fiatSource.getRates(ctx, o.assets(), o.log)
if err != nil {
o.log.Errorf("failed to retrieve rate from %s: %v", fiatSource.name, err)
continue
}

fiatSource.mtx.Lock()
fiatSource.rates = newRates
fiatSource.lastRefresh = time.Now()
fiatSource.mtx.Unlock()
}

if initializedWithAssets {
// Calculate average fiat rate now.
o.calculateAverageRate(ctx, &wg)
}

if sourcesEnabled > 0 {
// Start a goroutine to generate an average fiat rate based on fresh
// data from all enabled sources. This is done every
// averageRateRefreshInterval.
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(averageRateRefreshInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !o.hasAssets() {
continue // nothing to do
}

o.calculateAverageRate(ctx, &wg)
}
}
}()
}

wg.Wait()
}

// calculateAverageRate is a shared function to support fiat average rate
// calculations before and after averageRateRefreshInterval.
func (o *Oracle) calculateAverageRate(ctx context.Context, wg *sync.WaitGroup) {
newRatesInfo := make(map[string]*fiatRateAndSourceCount)
for i := range o.sources {
s := o.sources[i]
if s.isDisabled() {
if s.checkIfSourceCanReactivate() {
// Start a new goroutine for this source.
o.fetchFromSource(ctx, s, wg)
}
continue
}

s.mtx.RLock()
sourceRates := s.rates
s.mtx.RUnlock()

for sym, rate := range sourceRates {
if rate == 0 {
continue
}

info, ok := newRatesInfo[sym]
if !ok {
info = new(fiatRateAndSourceCount)
newRatesInfo[sym] = info
}

info.sources++
info.totalFiatRate += rate
}
}

now := time.Now()
o.ratesMtx.Lock()
for sym := range o.rates {
rateInfo := newRatesInfo[sym]
if rateInfo != nil {
o.rates[sym].Rate = rateInfo.totalFiatRate / float64(rateInfo.sources)
o.rates[sym].LastUpdate = now
}
}
o.ratesMtx.Unlock()
}

func (o *Oracle) hasAssets() bool {
o.ratesMtx.RLock()
defer o.ratesMtx.RUnlock()
return len(o.rates) != 0
}

// fetchFromSource starts a goroutine that retrieves fiat rate from the provided
// source.
func (o *Oracle) fetchFromSource(ctx context.Context, s *source, wg *sync.WaitGroup) {
wg.Add(1)
go func(s *source) {
defer wg.Done()
ticker := time.NewTicker(s.requestInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !o.hasAssets() || s.isDisabled() { // nothing to fetch.
continue
}

if s.hasAssets() && s.isExpired() {
s.deactivate()
o.log.Errorf("Fiat rate source %q has been disabled due to lack of fresh data. It will be re-enabled after %d hours.", s.name, reactivateDuration.Hours())
return
}

newRates, err := s.getRates(ctx, o.assets(), o.log)
if err != nil {
o.log.Errorf("%s.getRates error: %v", s.name, err)
continue
}

s.mtx.Lock()
s.rates = newRates
s.lastRefresh = time.Now()
s.mtx.Unlock()
}
}
}(s)
}