From 95a5e542ba06f1d880721883ce3cdcf1887c7f4c Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 5 Mar 2024 18:12:30 +0800 Subject: [PATCH 1/5] xdepthmaker: add profitx fixer --- pkg/strategy/xdepthmaker/profitfixer.go | 85 +++++++++++++++++++++++++ pkg/strategy/xdepthmaker/strategy.go | 42 ++++++++---- pkg/types/profit.go | 4 ++ 3 files changed, 118 insertions(+), 13 deletions(-) create mode 100644 pkg/strategy/xdepthmaker/profitfixer.go diff --git a/pkg/strategy/xdepthmaker/profitfixer.go b/pkg/strategy/xdepthmaker/profitfixer.go new file mode 100644 index 0000000000..f662b5bb12 --- /dev/null +++ b/pkg/strategy/xdepthmaker/profitfixer.go @@ -0,0 +1,85 @@ +package xdepthmaker + +import ( + "context" + "sync" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/c9s/bbgo/pkg/exchange/batch" + "github.com/c9s/bbgo/pkg/types" +) + +type ProfitFixerConfig struct { + TradesSince types.Time `json:"tradesSince,omitempty"` +} + +// ProfitFixer implements a trade history based profit fixer +type ProfitFixer struct { + market types.Market + + sessions map[string]types.ExchangeTradeHistoryService +} + +func NewProfitFixer(market types.Market) *ProfitFixer { + return &ProfitFixer{ + market: market, + sessions: make(map[string]types.ExchangeTradeHistoryService), + } +} + +func (f *ProfitFixer) AddExchange(sessionName string, service types.ExchangeTradeHistoryService) { + f.sessions[sessionName] = service +} + +func (f *ProfitFixer) batchQueryTrades( + ctx context.Context, + service types.ExchangeTradeHistoryService, + symbol string, + since time.Time, +) ([]types.Trade, error) { + + now := time.Now() + q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service} + trades, err := q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ + StartTime: &since, + EndTime: &now, + }) + + return trades, err +} + +func (f *ProfitFixer) Fix(ctx context.Context, since time.Time) (*types.ProfitStats, error) { + stats := types.NewProfitStats(f.market) + + var mu sync.Mutex + var allTrades = make([]types.Trade, 0, 1000) + + g, subCtx := errgroup.WithContext(ctx) + for _, service := range f.sessions { + g.Go(func() error { + trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since) + if err != nil { + log.WithError(err).Errorf("unable to batch query trades for fixer") + return err + } + + mu.Lock() + allTrades = append(allTrades, trades...) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + allTrades = types.SortTradesAscending(allTrades) + for _, trade := range allTrades { + stats.AddTrade(trade) + } + + return stats, nil +} diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 26c24ebea1..20fae833a6 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -193,6 +193,8 @@ type Strategy struct { // Pips is the pips of the layer prices Pips fixedpoint.Value `json:"pips"` + ProfitFixerConfig *ProfitFixerConfig `json:"profitFixer"` + // -------------------------------- // private fields // -------------------------------- @@ -324,14 +326,30 @@ func (s *Strategy) CrossRun( s.stopC = make(chan struct{}) - if s.RecoverTrade { - // go s.runTradeRecover(ctx) - } - - s.authedC = make(chan struct{}, 2) + s.authedC = make(chan struct{}, 5) bindAuthSignal(ctx, s.makerSession.UserDataStream, s.authedC) bindAuthSignal(ctx, s.hedgeSession.UserDataStream, s.authedC) + if s.ProfitFixerConfig != nil { + if s.ProfitFixerConfig.TradesSince.Time().IsZero() { + return errors.New("tradesSince time can not be zero") + } + + fixer := NewProfitFixer(s.makerMarket) + fixer.AddExchange(s.makerSession.Name, s.makerSession.Exchange.(types.ExchangeTradeHistoryService)) + fixer.AddExchange(s.hedgeSession.Name, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService)) + profitStats, err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time()) + if err2 != nil { + return err2 + } + + s.CrossExchangeMarketMakingStrategy.ProfitStats = profitStats + } + + if s.RecoverTrade { + go s.runTradeRecover(ctx) + } + go func() { log.Infof("waiting for user data stream to get authenticated") select { @@ -578,16 +596,14 @@ func (s *Strategy) runTradeRecover(ctx context.Context) { case <-tradeScanTicker.C: log.Infof("scanning trades from %s ago...", tradeScanInterval) - if s.RecoverTrade { - startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod) + startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod) - if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { - log.WithError(err).Errorf("query trades error") - } + if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + log.WithError(err).Errorf("query trades error") + } - if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { - log.WithError(err).Errorf("query trades error") - } + if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + log.WithError(err).Errorf("query trades error") } } } diff --git a/pkg/types/profit.go b/pkg/types/profit.go index cca9884148..8011efc889 100644 --- a/pkg/types/profit.go +++ b/pkg/types/profit.go @@ -257,6 +257,10 @@ func (s *ProfitStats) AddTrade(trade Trade) { // IsOver24Hours checks if the since time is over 24 hours func (s *ProfitStats) IsOver24Hours() bool { + if s.TodaySince == 0 { + return false + } + return time.Since(time.Unix(s.TodaySince, 0)) >= 24*time.Hour } From a518cf71c0604f9cfbcd891e22db9778ee5b135f Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 5 Mar 2024 18:15:25 +0800 Subject: [PATCH 2/5] xdepthmaker: fix both profit stats and position --- pkg/strategy/xdepthmaker/profitfixer.go | 9 ++++----- pkg/strategy/xdepthmaker/strategy.go | 9 +++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/strategy/xdepthmaker/profitfixer.go b/pkg/strategy/xdepthmaker/profitfixer.go index f662b5bb12..4b8be3570d 100644 --- a/pkg/strategy/xdepthmaker/profitfixer.go +++ b/pkg/strategy/xdepthmaker/profitfixer.go @@ -50,9 +50,7 @@ func (f *ProfitFixer) batchQueryTrades( return trades, err } -func (f *ProfitFixer) Fix(ctx context.Context, since time.Time) (*types.ProfitStats, error) { - stats := types.NewProfitStats(f.market) - +func (f *ProfitFixer) Fix(ctx context.Context, since time.Time, stats *types.ProfitStats, position *types.Position) error { var mu sync.Mutex var allTrades = make([]types.Trade, 0, 1000) @@ -73,13 +71,14 @@ func (f *ProfitFixer) Fix(ctx context.Context, since time.Time) (*types.ProfitSt } if err := g.Wait(); err != nil { - return nil, err + return err } allTrades = types.SortTradesAscending(allTrades) for _, trade := range allTrades { stats.AddTrade(trade) + position.AddTrade(trade) } - return stats, nil + return nil } diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 20fae833a6..7182848d6f 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -338,12 +338,13 @@ func (s *Strategy) CrossRun( fixer := NewProfitFixer(s.makerMarket) fixer.AddExchange(s.makerSession.Name, s.makerSession.Exchange.(types.ExchangeTradeHistoryService)) fixer.AddExchange(s.hedgeSession.Name, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService)) - profitStats, err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time()) - if err2 != nil { + + s.CrossExchangeMarketMakingStrategy.Position = types.NewPositionFromMarket(s.makerMarket) + s.CrossExchangeMarketMakingStrategy.ProfitStats = types.NewProfitStats(s.makerMarket) + + if err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time(), s.CrossExchangeMarketMakingStrategy.ProfitStats, s.CrossExchangeMarketMakingStrategy.Position); err2 != nil { return err2 } - - s.CrossExchangeMarketMakingStrategy.ProfitStats = profitStats } if s.RecoverTrade { From 4bed29ad022e46cc331cc71c3af825e4646d97cc Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 5 Mar 2024 21:11:51 +0800 Subject: [PATCH 3/5] xdepthmaker: pull out until argument --- pkg/strategy/xdepthmaker/profitfixer.go | 14 +++++--------- pkg/strategy/xdepthmaker/strategy.go | 4 ++-- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pkg/strategy/xdepthmaker/profitfixer.go b/pkg/strategy/xdepthmaker/profitfixer.go index 4b8be3570d..e2d0b48832 100644 --- a/pkg/strategy/xdepthmaker/profitfixer.go +++ b/pkg/strategy/xdepthmaker/profitfixer.go @@ -37,27 +37,23 @@ func (f *ProfitFixer) batchQueryTrades( ctx context.Context, service types.ExchangeTradeHistoryService, symbol string, - since time.Time, + since, until time.Time, ) ([]types.Trade, error) { - - now := time.Now() q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service} - trades, err := q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ + return q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ StartTime: &since, - EndTime: &now, + EndTime: &until, }) - - return trades, err } -func (f *ProfitFixer) Fix(ctx context.Context, since time.Time, stats *types.ProfitStats, position *types.Position) error { +func (f *ProfitFixer) Fix(ctx context.Context, since, until time.Time, stats *types.ProfitStats, position *types.Position) error { var mu sync.Mutex var allTrades = make([]types.Trade, 0, 1000) g, subCtx := errgroup.WithContext(ctx) for _, service := range f.sessions { g.Go(func() error { - trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since) + trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since, until) if err != nil { log.WithError(err).Errorf("unable to batch query trades for fixer") return err diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 7182848d6f..4ab8027399 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -341,8 +341,8 @@ func (s *Strategy) CrossRun( s.CrossExchangeMarketMakingStrategy.Position = types.NewPositionFromMarket(s.makerMarket) s.CrossExchangeMarketMakingStrategy.ProfitStats = types.NewProfitStats(s.makerMarket) - - if err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time(), s.CrossExchangeMarketMakingStrategy.ProfitStats, s.CrossExchangeMarketMakingStrategy.Position); err2 != nil { + + if err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time(), time.Now(), s.CrossExchangeMarketMakingStrategy.ProfitStats, s.CrossExchangeMarketMakingStrategy.Position); err2 != nil { return err2 } } From 26c34618b2eb2f21e52cd5864d2819ef752501ba Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 5 Mar 2024 21:13:19 +0800 Subject: [PATCH 4/5] xdepthmaker: improve fixer logging --- pkg/strategy/xdepthmaker/profitfixer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/strategy/xdepthmaker/profitfixer.go b/pkg/strategy/xdepthmaker/profitfixer.go index e2d0b48832..93234eb869 100644 --- a/pkg/strategy/xdepthmaker/profitfixer.go +++ b/pkg/strategy/xdepthmaker/profitfixer.go @@ -51,8 +51,10 @@ func (f *ProfitFixer) Fix(ctx context.Context, since, until time.Time, stats *ty var allTrades = make([]types.Trade, 0, 1000) g, subCtx := errgroup.WithContext(ctx) - for _, service := range f.sessions { + for n, service := range f.sessions { + sessionName := n g.Go(func() error { + log.Infof("batch querying %s trade history from %s since %s until %s", f.market.Symbol, sessionName, since.String(), until.String()) trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since, until) if err != nil { log.WithError(err).Errorf("unable to batch query trades for fixer") From 0d3483e7c36448937c3e465655fd3d0b3f03966e Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 5 Mar 2024 21:16:35 +0800 Subject: [PATCH 5/5] xdepthmaker: fix loopvar issue --- pkg/strategy/xdepthmaker/profitfixer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/strategy/xdepthmaker/profitfixer.go b/pkg/strategy/xdepthmaker/profitfixer.go index 93234eb869..66905580eb 100644 --- a/pkg/strategy/xdepthmaker/profitfixer.go +++ b/pkg/strategy/xdepthmaker/profitfixer.go @@ -51,8 +51,10 @@ func (f *ProfitFixer) Fix(ctx context.Context, since, until time.Time, stats *ty var allTrades = make([]types.Trade, 0, 1000) g, subCtx := errgroup.WithContext(ctx) - for n, service := range f.sessions { + for n, s := range f.sessions { + // allocate a copy of the iteration variables sessionName := n + service := s g.Go(func() error { log.Infof("batch querying %s trade history from %s since %s until %s", f.market.Symbol, sessionName, since.String(), until.String()) trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since, until)