Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: [xdepthmaker] add profit fixer #1559

Merged
merged 5 commits into from Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 85 additions & 0 deletions pkg/strategy/xdepthmaker/profitfixer.go
@@ -0,0 +1,85 @@
package xdepthmaker

import (
"context"
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)

type ProfitFixerConfig struct {
TradesSince types.Time `json:"tradesSince,omitempty"`
}

// ProfitFixer implements a trade history based profit fixer
type ProfitFixer struct {
market types.Market

sessions map[string]types.ExchangeTradeHistoryService
}

func NewProfitFixer(market types.Market) *ProfitFixer {
return &ProfitFixer{
market: market,
sessions: make(map[string]types.ExchangeTradeHistoryService),
}
}

func (f *ProfitFixer) AddExchange(sessionName string, service types.ExchangeTradeHistoryService) {
f.sessions[sessionName] = service
}

func (f *ProfitFixer) batchQueryTrades(
ctx context.Context,
service types.ExchangeTradeHistoryService,
symbol string,
since time.Time,
c9s marked this conversation as resolved.
Show resolved Hide resolved
) ([]types.Trade, error) {

now := time.Now()
q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service}
trades, err := q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &since,
EndTime: &now,
})

return trades, err
}

func (f *ProfitFixer) Fix(ctx context.Context, since time.Time) (*types.ProfitStats, error) {
stats := types.NewProfitStats(f.market)

var mu sync.Mutex
var allTrades = make([]types.Trade, 0, 1000)

g, subCtx := errgroup.WithContext(ctx)
for _, service := range f.sessions {
g.Go(func() error {
trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since)
c9s marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.WithError(err).Errorf("unable to batch query trades for fixer")
c9s marked this conversation as resolved.
Show resolved Hide resolved
return err
}

mu.Lock()
allTrades = append(allTrades, trades...)
mu.Unlock()
return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}

allTrades = types.SortTradesAscending(allTrades)
for _, trade := range allTrades {
stats.AddTrade(trade)
}

return stats, nil
}
42 changes: 29 additions & 13 deletions pkg/strategy/xdepthmaker/strategy.go
Expand Up @@ -193,6 +193,8 @@ type Strategy struct {
// Pips is the pips of the layer prices
Pips fixedpoint.Value `json:"pips"`

ProfitFixerConfig *ProfitFixerConfig `json:"profitFixer"`

// --------------------------------
// private fields
// --------------------------------
Expand Down Expand Up @@ -324,14 +326,30 @@ func (s *Strategy) CrossRun(

s.stopC = make(chan struct{})

if s.RecoverTrade {
// go s.runTradeRecover(ctx)
}

s.authedC = make(chan struct{}, 2)
s.authedC = make(chan struct{}, 5)
bindAuthSignal(ctx, s.makerSession.UserDataStream, s.authedC)
bindAuthSignal(ctx, s.hedgeSession.UserDataStream, s.authedC)

if s.ProfitFixerConfig != nil {
if s.ProfitFixerConfig.TradesSince.Time().IsZero() {
return errors.New("tradesSince time can not be zero")
}

fixer := NewProfitFixer(s.makerMarket)
fixer.AddExchange(s.makerSession.Name, s.makerSession.Exchange.(types.ExchangeTradeHistoryService))
fixer.AddExchange(s.hedgeSession.Name, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService))
profitStats, err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time())
if err2 != nil {
return err2
}

s.CrossExchangeMarketMakingStrategy.ProfitStats = profitStats
}

if s.RecoverTrade {
go s.runTradeRecover(ctx)
}

go func() {
log.Infof("waiting for user data stream to get authenticated")
select {
Expand Down Expand Up @@ -578,16 +596,14 @@ func (s *Strategy) runTradeRecover(ctx context.Context) {
case <-tradeScanTicker.C:
log.Infof("scanning trades from %s ago...", tradeScanInterval)

if s.RecoverTrade {
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)

if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}

if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/types/profit.go
Expand Up @@ -257,6 +257,10 @@ func (s *ProfitStats) AddTrade(trade Trade) {

// IsOver24Hours checks if the since time is over 24 hours
func (s *ProfitStats) IsOver24Hours() bool {
if s.TodaySince == 0 {
return false
}

return time.Since(time.Unix(s.TodaySince, 0)) >= 24*time.Hour
}

Expand Down