diff --git a/pkg/strategy/xdepthmaker/profitfixer.go b/pkg/strategy/xdepthmaker/profitfixer.go new file mode 100644 index 0000000000..66905580eb --- /dev/null +++ b/pkg/strategy/xdepthmaker/profitfixer.go @@ -0,0 +1,84 @@ +package xdepthmaker + +import ( + "context" + "sync" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/c9s/bbgo/pkg/exchange/batch" + "github.com/c9s/bbgo/pkg/types" +) + +type ProfitFixerConfig struct { + TradesSince types.Time `json:"tradesSince,omitempty"` +} + +// ProfitFixer implements a trade history based profit fixer +type ProfitFixer struct { + market types.Market + + sessions map[string]types.ExchangeTradeHistoryService +} + +func NewProfitFixer(market types.Market) *ProfitFixer { + return &ProfitFixer{ + market: market, + sessions: make(map[string]types.ExchangeTradeHistoryService), + } +} + +func (f *ProfitFixer) AddExchange(sessionName string, service types.ExchangeTradeHistoryService) { + f.sessions[sessionName] = service +} + +func (f *ProfitFixer) batchQueryTrades( + ctx context.Context, + service types.ExchangeTradeHistoryService, + symbol string, + since, until time.Time, +) ([]types.Trade, error) { + q := &batch.TradeBatchQuery{ExchangeTradeHistoryService: service} + return q.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ + StartTime: &since, + EndTime: &until, + }) +} + +func (f *ProfitFixer) Fix(ctx context.Context, since, until time.Time, stats *types.ProfitStats, position *types.Position) error { + var mu sync.Mutex + var allTrades = make([]types.Trade, 0, 1000) + + g, subCtx := errgroup.WithContext(ctx) + for n, s := range f.sessions { + // allocate a copy of the iteration variables + sessionName := n + service := s + g.Go(func() error { + log.Infof("batch querying %s trade history from %s since %s until %s", f.market.Symbol, sessionName, since.String(), until.String()) + trades, err := f.batchQueryTrades(subCtx, service, f.market.Symbol, since, until) + if err != nil { + log.WithError(err).Errorf("unable to batch query trades for fixer") + return err + } + + mu.Lock() + allTrades = append(allTrades, trades...) + mu.Unlock() + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + allTrades = types.SortTradesAscending(allTrades) + for _, trade := range allTrades { + stats.AddTrade(trade) + position.AddTrade(trade) + } + + return nil +} diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 26c24ebea1..4ab8027399 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -193,6 +193,8 @@ type Strategy struct { // Pips is the pips of the layer prices Pips fixedpoint.Value `json:"pips"` + ProfitFixerConfig *ProfitFixerConfig `json:"profitFixer"` + // -------------------------------- // private fields // -------------------------------- @@ -324,14 +326,31 @@ func (s *Strategy) CrossRun( s.stopC = make(chan struct{}) - if s.RecoverTrade { - // go s.runTradeRecover(ctx) - } - - s.authedC = make(chan struct{}, 2) + s.authedC = make(chan struct{}, 5) bindAuthSignal(ctx, s.makerSession.UserDataStream, s.authedC) bindAuthSignal(ctx, s.hedgeSession.UserDataStream, s.authedC) + if s.ProfitFixerConfig != nil { + if s.ProfitFixerConfig.TradesSince.Time().IsZero() { + return errors.New("tradesSince time can not be zero") + } + + fixer := NewProfitFixer(s.makerMarket) + fixer.AddExchange(s.makerSession.Name, s.makerSession.Exchange.(types.ExchangeTradeHistoryService)) + fixer.AddExchange(s.hedgeSession.Name, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService)) + + s.CrossExchangeMarketMakingStrategy.Position = types.NewPositionFromMarket(s.makerMarket) + s.CrossExchangeMarketMakingStrategy.ProfitStats = types.NewProfitStats(s.makerMarket) + + if err2 := fixer.Fix(ctx, s.ProfitFixerConfig.TradesSince.Time(), time.Now(), s.CrossExchangeMarketMakingStrategy.ProfitStats, s.CrossExchangeMarketMakingStrategy.Position); err2 != nil { + return err2 + } + } + + if s.RecoverTrade { + go s.runTradeRecover(ctx) + } + go func() { log.Infof("waiting for user data stream to get authenticated") select { @@ -578,16 +597,14 @@ func (s *Strategy) runTradeRecover(ctx context.Context) { case <-tradeScanTicker.C: log.Infof("scanning trades from %s ago...", tradeScanInterval) - if s.RecoverTrade { - startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod) + startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod) - if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { - log.WithError(err).Errorf("query trades error") - } + if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + log.WithError(err).Errorf("query trades error") + } - if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { - log.WithError(err).Errorf("query trades error") - } + if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + log.WithError(err).Errorf("query trades error") } } } diff --git a/pkg/types/profit.go b/pkg/types/profit.go index cca9884148..8011efc889 100644 --- a/pkg/types/profit.go +++ b/pkg/types/profit.go @@ -257,6 +257,10 @@ func (s *ProfitStats) AddTrade(trade Trade) { // IsOver24Hours checks if the since time is over 24 hours func (s *ProfitStats) IsOver24Hours() bool { + if s.TodaySince == 0 { + return false + } + return time.Since(time.Unix(s.TodaySince, 0)) >= 24*time.Hour }