diff --git a/pkg/exchange/retry/order.go b/pkg/exchange/retry/order.go index 2575f51aaa..0191380b52 100644 --- a/pkg/exchange/retry/order.go +++ b/pkg/exchange/retry/order.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strconv" + "time" "github.com/cenkalti/backoff/v4" @@ -119,6 +120,54 @@ func QueryOpenOrdersUntilSuccessfulLite( return openOrders, err } +func QueryClosedOrdersUntilSuccessful( + ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64, +) (closedOrders []types.Order, err error) { + var op = func() (err2 error) { + closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID) + return err2 + } + + err = GeneralBackoff(ctx, op) + return closedOrders, err +} + +func QueryClosedOrdersUntilSuccessfulLite( + ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64, +) (closedOrders []types.Order, err error) { + var op = func() (err2 error) { + closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID) + return err2 + } + + err = GeneralLiteBackoff(ctx, op) + return closedOrders, err +} + +func QueryOrderTradesUntilSuccessful( + ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery, +) (trades []types.Trade, err error) { + var op = func() (err2 error) { + trades, err2 = ex.QueryOrderTrades(ctx, q) + return err2 + } + + err = GeneralBackoff(ctx, op) + return trades, err +} + +func QueryOrderTradesUntilSuccessfulLite( + ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery, +) (trades []types.Trade, err error) { + var op = func() (err2 error) { + trades, err2 = ex.QueryOrderTrades(ctx, q) + return err2 + } + + err = GeneralLiteBackoff(ctx, op) + return trades, err +} + func QueryAccountUntilSuccessful( ctx context.Context, ex types.ExchangeAccountService, ) (account *types.Account, err error) { diff --git a/pkg/strategy/dca2/active_order_recover.go b/pkg/strategy/dca2/active_order_recover.go index 08894fd232..cbde64e632 100644 --- a/pkg/strategy/dca2/active_order_recover.go +++ b/pkg/strategy/dca2/active_order_recover.go @@ -10,7 +10,7 @@ import ( ) func (s *Strategy) recoverPeriodically(ctx context.Context) { - s.logger.Info("[DCA] monitor and recover periodically") + s.logger.Info("monitor and recover periodically") interval := util.MillisecondsJitter(10*time.Minute, 5*60*1000) ticker := time.NewTicker(interval) defer ticker.Stop() diff --git a/pkg/strategy/dca2/open_position.go b/pkg/strategy/dca2/open_position.go index db6f0b23ca..d9004e054d 100644 --- a/pkg/strategy/dca2/open_position.go +++ b/pkg/strategy/dca2/open_position.go @@ -15,7 +15,7 @@ type cancelOrdersByGroupIDApi interface { } func (s *Strategy) placeOpenPositionOrders(ctx context.Context) error { - s.logger.Infof("[DCA] start placing open position orders") + s.logger.Infof("start placing open position orders") price, err := getBestPriceUntilSuccess(ctx, s.ExchangeSession.Exchange, s.Symbol) if err != nil { return err diff --git a/pkg/strategy/dca2/strategy.go b/pkg/strategy/dca2/strategy.go index 3c3462c176..c335cec2d6 100644 --- a/pkg/strategy/dca2/strategy.go +++ b/pkg/strategy/dca2/strategy.go @@ -203,7 +203,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. // order executor s.OrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { - s.logger.Infof("[DCA] POSITION UPDATE: %s", s.Position.String()) + s.logger.Infof("POSITION UPDATE: %s", s.Position.String()) bbgo.Sync(ctx, s) // update take profit price here @@ -211,7 +211,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. }) s.OrderExecutor.ActiveMakerOrders().OnFilled(func(o types.Order) { - s.logger.Infof("[DCA] FILLED ORDER: %s", o.String()) + s.logger.Infof("FILLED ORDER: %s", o.String()) openPositionSide := types.SideTypeBuy takeProfitSide := types.SideTypeSell @@ -221,7 +221,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. case takeProfitSide: s.emitNextState(WaitToOpenPosition) default: - s.logger.Infof("[DCA] unsupported side (%s) of order: %s", o.Side, o) + s.logger.Infof("unsupported side (%s) of order: %s", o.Side, o) } // update metrics when filled @@ -244,7 +244,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. }) session.UserDataStream.OnAuth(func() { - s.logger.Info("[DCA] user data stream authenticated") + s.logger.Info("user data stream authenticated") time.AfterFunc(3*time.Second, func() { if isInitialize := s.initializeNextStateC(); !isInitialize { @@ -255,16 +255,29 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.updateState(WaitToOpenPosition) } else { // recover - if err := s.recover(ctx); err != nil { - s.logger.WithError(err).Error("[DCA] something wrong when state recovering") - return + maxTry := 3 + for try := 1; try <= maxTry; try++ { + s.logger.Infof("try #%d recover", try) + + err := s.recover(ctx) + if err == nil { + s.logger.Infof("recover successfully at #%d", try) + break + } + + s.logger.WithError(err).Warnf("failed to recover at #%d", try) + + if try == 3 { + s.logger.Errorf("failed to recover after %d trying, please check it", maxTry) + return + } } } - s.logger.Infof("[DCA] state: %d", s.state) - s.logger.Infof("[DCA] position %s", s.Position.String()) - s.logger.Infof("[DCA] profit stats %s", s.ProfitStats.String()) - s.logger.Infof("[DCA] startTimeOfNextRound %s", s.startTimeOfNextRound) + s.logger.Infof("state: %d", s.state) + s.logger.Infof("position %s", s.Position.String()) + s.logger.Infof("profit stats %s", s.ProfitStats.String()) + s.logger.Infof("startTimeOfNextRound %s", s.startTimeOfNextRound) s.updateTakeProfitPrice() @@ -299,17 +312,17 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. func (s *Strategy) updateTakeProfitPrice() { takeProfitRatio := s.TakeProfitRatio s.takeProfitPrice = s.Market.TruncatePrice(s.Position.AverageCost.Mul(fixedpoint.One.Add(takeProfitRatio))) - s.logger.Infof("[DCA] cost: %s, ratio: %s, price: %s", s.Position.AverageCost, takeProfitRatio, s.takeProfitPrice) + s.logger.Infof("cost: %s, ratio: %s, price: %s", s.Position.AverageCost, takeProfitRatio, s.takeProfitPrice) } func (s *Strategy) Close(ctx context.Context) error { - s.logger.Infof("[DCA] closing %s dca2", s.Symbol) + s.logger.Infof("closing %s dca2", s.Symbol) defer s.EmitClosed() err := s.OrderExecutor.GracefulCancel(ctx) if err != nil { - s.logger.WithError(err).Errorf("[DCA] there are errors when cancelling orders at close") + s.logger.WithError(err).Errorf("there are errors when cancelling orders at close") } bbgo.Sync(ctx, s) @@ -370,10 +383,12 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error { // TODO: pagination for it // query the orders - orders, err := historyService.QueryClosedOrders(ctx, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID) + s.logger.Infof("query %s closed orders from order id #%d", s.Symbol, s.ProfitStats.FromOrderID) + orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, historyService, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID) if err != nil { return err } + s.logger.Infof("there are %d closed orders from order id #%d", len(orders), s.ProfitStats.FromOrderID) var rounds []Round var round Round @@ -398,18 +413,20 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error { } } + s.logger.Infof("there are %d rounds from order id #%d", len(rounds), s.ProfitStats.FromOrderID) for _, round := range rounds { + debugRoundOrders(s.logger, "calculate", round) var roundOrders []types.Order = round.OpenPositionOrders roundOrders = append(roundOrders, round.TakeProfitOrder) for _, order := range roundOrders { - s.logger.Infof("[DCA] calculate profit stats from order: %s", order.String()) + s.logger.Infof("calculate profit stats from order: %s", order.String()) // skip no trade orders if order.ExecutedQuantity.Sign() == 0 { continue } - trades, err := queryService.QueryOrderTrades(ctx, types.OrderQuery{ + trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, queryService, types.OrderQuery{ Symbol: order.Symbol, OrderID: strconv.FormatUint(order.OrderID, 10), }) @@ -419,7 +436,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error { } for _, trade := range trades { - s.logger.Infof("[DCA] calculate profit stats from trade: %s", trade.String()) + s.logger.Infof("calculate profit stats from trade: %s", trade.String()) s.ProfitStats.AddTrade(trade) } } @@ -430,7 +447,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error { // store into persistence bbgo.Sync(ctx, s) - s.logger.Infof("[DCA] profit stats:\n%s", s.ProfitStats.String()) + s.logger.Infof("profit stats:\n%s", s.ProfitStats.String()) // emit profit s.EmitProfit(s.ProfitStats) diff --git a/pkg/strategy/dca2/take_profit.go b/pkg/strategy/dca2/take_profit.go index adfa0bb579..8fed7a03ca 100644 --- a/pkg/strategy/dca2/take_profit.go +++ b/pkg/strategy/dca2/take_profit.go @@ -8,7 +8,7 @@ import ( ) func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error { - s.logger.Info("[DCA] start placing take profit orders") + s.logger.Info("start placing take profit orders") order := generateTakeProfitOrder(s.Market, s.TakeProfitRatio, s.Position, s.OrderGroupID) createdOrders, err := s.OrderExecutor.SubmitOrders(ctx, order) if err != nil {