Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fiatrates: Implement a fiat rate oracle for tatanka #2736

Merged
merged 10 commits into from
May 14, 2024
119 changes: 79 additions & 40 deletions dex/fiatrates/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@ const (
// Oracle manages and retrieves fiat rate information from all enabled rate
// sources.
type Oracle struct {
log dex.Logger
sources []*source
ratesMtx sync.RWMutex
rates map[string]*FiatRateInfo
rateBroadcastChan chan map[string]*FiatRateInfo
log dex.Logger
sources []*source
ratesMtx sync.RWMutex
rates map[string]*FiatRateInfo

listenersMtx sync.RWMutex
listeners map[string]chan<- map[string]*FiatRateInfo
}

func NewFiatOracle(cfg Config, tickerSymbols string, log dex.Logger) (*Oracle, error) {
fiatOracle := &Oracle{
log: log,
rates: make(map[string]*FiatRateInfo),
sources: fiatSources(cfg),
rateBroadcastChan: make(chan map[string]*FiatRateInfo),
log: log,
rates: make(map[string]*FiatRateInfo),
sources: fiatSources(cfg),
listeners: make(map[string]chan<- map[string]*FiatRateInfo),
}

tickers := strings.Split(tickerSymbols, ",")
Expand All @@ -57,11 +59,6 @@ func NewFiatOracle(cfg Config, tickerSymbols string, log dex.Logger) (*Oracle, e
return fiatOracle, nil
}

// BroadcastChan returns a read only channel to listen for latest rates.
func (o *Oracle) BroadcastChan() <-chan map[string]*FiatRateInfo {
return o.rateBroadcastChan
}

// tickers retrieves all tickers that data can be fetched for.
func (o *Oracle) tickers() []string {
o.ratesMtx.RLock()
Expand All @@ -73,12 +70,26 @@ func (o *Oracle) tickers() []string {
return tickers
}

// Rates returns the current fiat rates. Returns an empty map if there are no
// valid rates.
func (o *Oracle) Rates() map[string]*FiatRateInfo {
o.ratesMtx.Lock()
defer o.ratesMtx.Unlock()
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
rates := make(map[string]*FiatRateInfo, len(o.rates))
for ticker, rate := range o.rates {
if rate.Value > 0 && !rate.IsExpired() {
r := *rate
rates[ticker] = &r
}
}
return rates
}

// Run starts goroutines that refresh fiat rates every source.refreshInterval.
// This should be called in a goroutine as it's blocking.
func (o *Oracle) Run(ctx context.Context) {
var wg sync.WaitGroup
var sourcesEnabled int
initializedWithTicker := o.hasTicker()
for i := range o.sources {
fiatSource := o.sources[i]
if fiatSource.isDisabled() {
Expand All @@ -89,10 +100,6 @@ func (o *Oracle) Run(ctx context.Context) {
o.fetchFromSource(ctx, fiatSource, &wg)
sourcesEnabled++

if !initializedWithTicker {
continue
}

// Fetch rates now.
newRates, err := fiatSource.getRates(ctx, o.tickers(), o.log)
if err != nil {
Expand All @@ -106,10 +113,8 @@ func (o *Oracle) Run(ctx context.Context) {
fiatSource.mtx.Unlock()
}

if initializedWithTicker {
// Calculate average fiat rate now.
o.calculateAverageRate(ctx, &wg)
}
// Calculate average fiat rate now.
o.calculateAverageRate()

if sourcesEnabled > 0 {
// Start a goroutine to generate an average fiat rate based on fresh
Expand All @@ -126,30 +131,69 @@ func (o *Oracle) Run(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
if !o.hasTicker() {
continue // nothing to do
reActivatedSources := o.calculateAverageRate()
for _, index := range reActivatedSources {
s := o.sources[index]
// Start a new goroutine for this source.
o.fetchFromSource(ctx, s, &wg)
}

o.calculateAverageRate(ctx, &wg)
}
}
}()
}

wg.Wait()
close(o.rateBroadcastChan)

o.listenersMtx.Lock()
for id, rateChan := range o.listeners {
close(rateChan) // we are done sending fiat rates
delete(o.listeners, id)
}
o.listenersMtx.Unlock()
}

// AddFiatRateListener adds a new fiat rate listener for the provided uniqueID.
// Overrides existing rateChan if uniqueID already exists.
func (o *Oracle) AddFiatRateListener(uniqueID string, ratesChan chan<- map[string]*FiatRateInfo) {
o.listenersMtx.Lock()
defer o.listenersMtx.Unlock()
o.listeners[uniqueID] = ratesChan
}

// RemoveFiatRateListener removes a fiat rate listener. no-op if there's no
// listener for the provided uniqueID. The fiat rate chan will be closed to
// signal to readers that we are done sending.
func (o *Oracle) RemoveFiatRateListener(uniqueID string) {
o.listenersMtx.Lock()
defer o.listenersMtx.Unlock()
rateChan, ok := o.listeners[uniqueID]
if !ok {
return
}

delete(o.listeners, uniqueID)
close(rateChan) // we are done sending.
}

// notifyListeners sends the provided rates to all listener.
func (o *Oracle) notifyListeners(rates map[string]*FiatRateInfo) {
o.listenersMtx.RLock()
defer o.listenersMtx.RUnlock()
for _, rateChan := range o.listeners {
rateChan <- rates
}
}

// calculateAverageRate is a shared function to support fiat average rate
// calculations before and after averageRateRefreshInterval.
func (o *Oracle) calculateAverageRate(ctx context.Context, wg *sync.WaitGroup) {
func (o *Oracle) calculateAverageRate() []int {
var reActivatedSourceIndexes []int
newRatesInfo := make(map[string]*fiatRateAndSourceCount)
for i := range o.sources {
s := o.sources[i]
if s.isDisabled() {
if s.checkIfSourceCanReactivate() {
// Start a new goroutine for this source.
o.fetchFromSource(ctx, s, wg)
reActivatedSourceIndexes = append(reActivatedSourceIndexes, i)
}
continue
}
Expand Down Expand Up @@ -178,11 +222,10 @@ func (o *Oracle) calculateAverageRate(ctx context.Context, wg *sync.WaitGroup) {
broadcastRates := make(map[string]*FiatRateInfo)
o.ratesMtx.Lock()
for ticker := range o.rates {
oldRate := o.rates[ticker].Value
rateInfo := newRatesInfo[ticker]
if rateInfo != nil {
newRate := rateInfo.totalFiatRate / float64(rateInfo.sources)
if oldRate != newRate && newRate > 0 {
if newRate > 0 {
o.rates[ticker].Value = newRate
o.rates[ticker].LastUpdate = now
rate := *o.rates[ticker] // copy
Expand All @@ -193,14 +236,10 @@ func (o *Oracle) calculateAverageRate(ctx context.Context, wg *sync.WaitGroup) {
o.ratesMtx.Unlock()

if len(broadcastRates) > 0 {
o.rateBroadcastChan <- broadcastRates
o.notifyListeners(broadcastRates)
}
}

func (o *Oracle) hasTicker() bool {
o.ratesMtx.RLock()
defer o.ratesMtx.RUnlock()
return len(o.rates) != 0
return reActivatedSourceIndexes
}

// fetchFromSource starts a goroutine that retrieves fiat rate from the provided
Expand All @@ -217,7 +256,7 @@ func (o *Oracle) fetchFromSource(ctx context.Context, s *source, wg *sync.WaitGr
case <-ctx.Done():
return
case <-ticker.C:
if !o.hasTicker() || s.isDisabled() { // nothing to fetch.
if s.isDisabled() { // nothing to fetch.
continue
}

Expand Down
4 changes: 4 additions & 0 deletions dex/fiatrates/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ type FiatRateInfo struct {
Value float64
LastUpdate time.Time
}

func (f *FiatRateInfo) IsExpired() bool {
return time.Since(f.LastUpdate) > FiatRateDataExpiry
}
ukane-philemon marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 1 addition & 2 deletions tatanka/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,9 @@ func (c *TankaClient) FiatRate(assetID uint32) float64 {
defer c.fiatRatesMtx.RUnlock()
sym := dex.BipIDSymbol(assetID)
rateInfo := c.fiatRates[sym]
if rateInfo != nil && time.Since(rateInfo.LastUpdate) < fiatrates.FiatRateDataExpiry && rateInfo.Value > 0 {
if rateInfo != nil && !rateInfo.IsExpired() && rateInfo.Value > 0 {
return rateInfo.Value
}

return 0
}

Expand Down
24 changes: 22 additions & 2 deletions tatanka/client_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ func (t *Tatanka) handleSubscription(c *client, msg *msgjson.Message) *msgjson.E
// Send it to all other remote tatankas.
t.relayBroadcast(bcast, c.ID)

// Find it and broadcaast to locally-connected clients, or add the subject
// if it doesn't exist.
// Find it and broadcast to locally-connected clients, or add the subject if
// it doesn't exist.
t.clientMtx.Lock()
defer t.clientMtx.Unlock()
topic, exists := t.topics[sub.Topic]
Expand Down Expand Up @@ -346,9 +346,29 @@ func (t *Tatanka) handleSubscription(c *client, msg *msgjson.Message) *msgjson.E
}

t.sendResult(c, msg.ID, true)
t.replySubscription(c, sub.Topic)
return nil
}

// replySubscription sends a follow up reply to a sender's subscription after
// their message has been processed successfully.
func (t *Tatanka) replySubscription(cl tanka.Sender, topic tanka.Topic) {
switch topic {
case mj.TopicFiatRate:
if t.fiatOracleEnabled() {
rates := t.fiatRateOracle.Rates()
if len(rates) == 0 { // no data to send
return
}

t.send(cl, mj.MustNotification(mj.RouteRates, &mj.RateMessage{
Topic: mj.TopicFiatRate,
Rates: rates,
}))
}
}
}

func (t *Tatanka) unsub(peerID tanka.PeerID, topicID tanka.Topic, subjectID tanka.Subject) *msgjson.Error {
t.clientMtx.Lock()
defer t.clientMtx.Unlock()
Expand Down
32 changes: 24 additions & 8 deletions tatanka/tatanka.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ import (
"golang.org/x/text/language"
)

const version = 0
const (
version = 0

// tatankaUniqueID is the unique ID used to register a Tatanka as a fiat
// rate listener.
tatankaUniqueID = "Tatanka"
)

// remoteTatanka is a remote tatanka node. A remote tatanka node can either
// be outgoing (whitelist loop) or incoming via handleInboundTatankaConnect.
Expand Down Expand Up @@ -116,6 +122,7 @@ type Tatanka struct {
tatankas map[tanka.PeerID]*remoteTatanka

fiatRateOracle *fiatrates.Oracle
fiatRateChan chan map[string]*fiatrates.FiatRateInfo
}

// Config is the configuration of the Tatanka.
Expand Down Expand Up @@ -220,6 +227,10 @@ func New(cfg *Config) (*Tatanka, error) {
if err != nil {
return nil, fmt.Errorf("error initializing fiat oracle: %w", err)
}

// Register tatanka as a listener
t.fiatRateChan = make(chan map[string]*fiatrates.FiatRateInfo)
t.fiatRateOracle.AddFiatRateListener(tatankaUniqueID, t.fiatRateChan)
Comment on lines +231 to +233
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, but who will the other listeners be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pkg I think would be useful to other decred projects, e.g cryptopower.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it will behave differently for cryptopower? Trying to imagine why you would need multiple listeners for the same instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same pattern. Start an oracle, and register.

}

t.nets.Store(nets)
Expand Down Expand Up @@ -722,19 +733,24 @@ func (t *Tatanka) clientDisconnected(peerID tanka.PeerID) {
}
}

// broadcastRates sends market rates to all fiat rate subscribers every
// t.fiatRateBroadcastInterval. This method blocks and must be called from a
// goroutine.
// broadcastRates sends market rates to all fiat rate subscribers once new rates
// are received from the fiat oracle.
func (t *Tatanka) broadcastRates() {
broadcastChan := t.fiatRateOracle.BroadcastChan()
for {
select {
case <-t.ctx.Done():
return
case rates := <-broadcastChan:
case rates, ok := <-t.fiatRateChan:
if !ok {
t.log.Debug("Tatanka stopped listening for fiat rates.")
return
}

t.clientMtx.RLock()
defer t.clientMtx.RUnlock()
if topic := t.topics[mj.TopicFiatRate]; topic != nil && len(topic.subscribers) > 0 {
topic := t.topics[mj.TopicFiatRate]
t.clientMtx.RUnlock()

if topic != nil && len(topic.subscribers) > 0 {
t.batchSend(topic.subscribers, mj.MustNotification(mj.RouteRates, &mj.RateMessage{
Topic: mj.TopicFiatRate,
Rates: rates,
Expand Down