From a23c476ce8f39d2cff0bbede62c1846d897964e5 Mon Sep 17 00:00:00 2001 From: kbearXD Date: Fri, 15 Mar 2024 18:41:25 +0800 Subject: [PATCH] dca2: new struct RoundCollector for testing and use flag to decide recovery --- pkg/strategy/dca2/recover.go | 23 +++-- pkg/strategy/dca2/round_collector.go | 125 ++++++++++++++++++++++++ pkg/strategy/dca2/state.go | 6 +- pkg/strategy/dca2/strategy.go | 136 +++++++++------------------ 4 files changed, 185 insertions(+), 105 deletions(-) create mode 100644 pkg/strategy/dca2/round_collector.go diff --git a/pkg/strategy/dca2/recover.go b/pkg/strategy/dca2/recover.go index f8d811ac16..3dae11c6d9 100644 --- a/pkg/strategy/dca2/recover.go +++ b/pkg/strategy/dca2/recover.go @@ -45,20 +45,25 @@ func (s *Strategy) recover(ctx context.Context) error { } debugRoundOrders(s.logger, "current", currentRound) - // TODO: use flag // recover profit stats - /* + if s.DisableProfitStatsRecover { + s.logger.Info("disableProfitStatsRecover is set, skip profit stats recovery") + } else { if err := recoverProfitStats(ctx, s); err != nil { return err } s.logger.Info("recover profit stats DONE") - */ + } // recover position - if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil { - return err + if s.DisablePositionRecover { + s.logger.Info("disablePositionRecover is set, skip position recovery") + } else { + if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil { + return err + } + s.logger.Info("recover position DONE") } - s.logger.Info("recover position DONE") // recover startTimeOfNextRound startTimeOfNextRound := recoverStartTimeOfNextRound(ctx, currentRound, s.CoolDownInterval) @@ -206,16 +211,14 @@ func recoverPosition(ctx context.Context, position *types.Position, queryService return nil } -// TODO: use flag to decide which to recover -/* func recoverProfitStats(ctx context.Context, strategy *Strategy) error { if strategy.ProfitStats == nil { return fmt.Errorf("profit stats is nil, please check it") } - return strategy.CalculateAndEmitProfit(ctx) + _, err := strategy.UpdateProfitStats(ctx) + return err } -*/ func recoverStartTimeOfNextRound(ctx context.Context, currentRound Round, coolDownInterval types.Duration) time.Time { if currentRound.TakeProfitOrder.OrderID != 0 && currentRound.TakeProfitOrder.Status == types.OrderStatusFilled { diff --git a/pkg/strategy/dca2/round_collector.go b/pkg/strategy/dca2/round_collector.go new file mode 100644 index 0000000000..5e2dd0e6f4 --- /dev/null +++ b/pkg/strategy/dca2/round_collector.go @@ -0,0 +1,125 @@ +package dca2 + +import ( + "context" + "strconv" + "time" + + "github.com/c9s/bbgo/pkg/exchange" + maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" + "github.com/c9s/bbgo/pkg/exchange/retry" + "github.com/c9s/bbgo/pkg/types" + "github.com/sirupsen/logrus" +) + +type RoundCollector struct { + logger *logrus.Entry + symbol string + groupID uint32 + isMax bool + + // service + historyService types.ExchangeTradeHistoryService + queryService types.ExchangeOrderQueryService +} + +func NewRoundCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.Exchange) *RoundCollector { + isMax := exchange.IsMaxExchange(ex) + historyService, ok := ex.(types.ExchangeTradeHistoryService) + if !ok { + logger.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", ex.Name()) + return nil + } + + queryService, ok := ex.(types.ExchangeOrderQueryService) + if !ok { + logger.Errorf("exchange %s doesn't support ExchangeOrderQueryService", ex.Name()) + return nil + } + + return &RoundCollector{ + logger: logger, + symbol: symbol, + groupID: groupID, + isMax: isMax, + historyService: historyService, + queryService: queryService, + } +} + +func (rc *RoundCollector) CollectFinishRounds(ctx context.Context, fromOrderID uint64) ([]Round, error) { + // TODO: pagination for it + // query the orders + rc.logger.Infof("query %s closed orders from order id #%d", rc.symbol, fromOrderID) + orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, rc.historyService, rc.symbol, time.Time{}, time.Time{}, fromOrderID) + if err != nil { + return nil, err + } + rc.logger.Infof("there are %d closed orders from order id #%d", len(orders), fromOrderID) + + var rounds []Round + var round Round + for _, order := range orders { + // skip not this strategy order + if order.GroupID != rc.groupID { + continue + } + + switch order.Side { + case types.SideTypeBuy: + round.OpenPositionOrders = append(round.OpenPositionOrders, order) + case types.SideTypeSell: + if !rc.isMax { + if order.Status != types.OrderStatusFilled { + rc.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status) + continue + } + } else { + if !maxapi.IsFilledOrderState(maxapi.OrderState(order.OriginalStatus)) { + rc.logger.Infof("isMax and take-profit order is %s not done or finalizing, so this round is not finished. Skip it", order.OriginalStatus) + continue + } + } + + round.TakeProfitOrder = order + rounds = append(rounds, round) + round = Round{} + default: + rc.logger.Errorf("there is order with unsupported side") + } + } + + return rounds, nil +} + +func (rc *RoundCollector) CollectRoundTrades(ctx context.Context, round Round) ([]types.Trade, error) { + debugRoundOrders(rc.logger, "collect round trades", round) + + var roundTrades []types.Trade + + var roundOrders []types.Order = round.OpenPositionOrders + roundOrders = append(roundOrders, round.TakeProfitOrder) + + for _, order := range roundOrders { + rc.logger.Infof("collect trades from order: %s", order.String()) + if order.ExecutedQuantity.Sign() == 0 { + rc.logger.Info("collect trads from order but no executed quantity ", order.String()) + continue + } else { + rc.logger.Info("collect trades from order ", order.String()) + } + + trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, rc.queryService, types.OrderQuery{ + Symbol: order.Symbol, + OrderID: strconv.FormatUint(order.OrderID, 10), + }) + + if err != nil { + return nil, err + } + + roundTrades = append(roundTrades, trades...) + } + + return roundTrades, nil +} diff --git a/pkg/strategy/dca2/state.go b/pkg/strategy/dca2/state.go index 722a527748..dac5c5d8ea 100644 --- a/pkg/strategy/dca2/state.go +++ b/pkg/strategy/dca2/state.go @@ -198,10 +198,8 @@ func (s *Strategy) runTakeProfitReady(ctx context.Context, next State) { s.logger.Info("[State] TakeProfitReady - start reseting position and calculate quote investment for next round") - // reset position - - // calculate profit stats - if err := s.CalculateAndEmitProfitUntilSuccessful(ctx); err != nil { + // update profit stats + if err := s.UpdateProfitStatsUntilSuccessful(ctx); err != nil { s.logger.WithError(err).Warn("failed to calculate and emit profit") } diff --git a/pkg/strategy/dca2/strategy.go b/pkg/strategy/dca2/strategy.go index 6519be79d1..89307f98ab 100644 --- a/pkg/strategy/dca2/strategy.go +++ b/pkg/strategy/dca2/strategy.go @@ -4,18 +4,16 @@ import ( "context" "fmt" "math" - "strconv" "sync" "time" + "github.com/cenkalti/backoff/v4" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "go.uber.org/multierr" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/exchange" - maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/strategy/common" @@ -67,7 +65,9 @@ type Strategy struct { OrderGroupID uint32 `json:"orderGroupID"` // RecoverWhenStart option is used for recovering dca states - RecoverWhenStart bool `json:"recoverWhenStart"` + RecoverWhenStart bool `json:"recoverWhenStart"` + DisableProfitStatsRecover bool `json:"disableProfitStatsRecover"` + DisablePositionRecover bool `json:"disablePositionRecover"` // KeepOrdersWhenShutdown option is used for keeping the grid orders when shutting down bbgo KeepOrdersWhenShutdown bool `json:"keepOrdersWhenShutdown"` @@ -91,6 +91,7 @@ type Strategy struct { startTimeOfNextRound time.Time nextStateC chan State state State + roundCollector *RoundCollector // callbacks common.StatusCallbacks @@ -171,6 +172,12 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.Position = types.NewPositionFromMarket(s.Market) } + // round collector + s.roundCollector = NewRoundCollector(s.logger, s.Symbol, s.OrderGroupID, s.ExchangeSession.Exchange) + if s.roundCollector == nil { + return fmt.Errorf("failed to initialize round collector") + } + // prometheus if s.PrometheusLabels != nil { initMetrics(labelKeys(s.PrometheusLabels)) @@ -373,111 +380,57 @@ func (s *Strategy) CleanUp(ctx context.Context) error { return werr } -func (s *Strategy) CalculateAndEmitProfitUntilSuccessful(ctx context.Context) error { - fromOrderID := s.ProfitStats.FromOrderID - - historyService, ok := s.ExchangeSession.Exchange.(types.ExchangeTradeHistoryService) - if !ok { - return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.ExchangeSession.Exchange.Name()) - } - - queryService, ok := s.ExchangeSession.Exchange.(types.ExchangeOrderQueryService) - if !ok { - return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.ExchangeSession.Exchange.Name()) - } - +func (s *Strategy) UpdateProfitStatsUntilSuccessful(ctx context.Context) error { var op = func() error { - if err := s.CalculateAndEmitProfit(ctx, historyService, queryService); err != nil { - return errors.Wrapf(err, "failed to calculate and emit profit, please check it") - } - - if s.ProfitStats.FromOrderID == fromOrderID { - return fmt.Errorf("FromOrderID (%d) is not updated, retry it", s.ProfitStats.FromOrderID) + if updated, err := s.UpdateProfitStats(ctx); err != nil { + return errors.Wrapf(err, "failed to update profit stats, please check it") + } else if !updated { + return fmt.Errorf("there is no round to update profit stats, please check it") } return nil } - return retry.GeneralBackoff(ctx, op) + // exponential increased interval retry until success + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = 5 * time.Second + bo.MaxInterval = 20 * time.Minute + bo.MaxElapsedTime = 0 + + return backoff.Retry(op, backoff.WithContext(bo, ctx)) } -func (s *Strategy) CalculateAndEmitProfit(ctx context.Context, historyService types.ExchangeTradeHistoryService, queryService types.ExchangeOrderQueryService) error { - // TODO: pagination for it - // query the orders - s.logger.Infof("query %s closed orders from order id #%d", s.Symbol, s.ProfitStats.FromOrderID) - orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, historyService, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID) +// UpdateProfitStats will collect round from closed orders and emit update profit stats +// return true, nil -> there is at least one finished round and all the finished rounds we collect update profit stats successfully +// return false, nil -> there is no finished round! +// return true, error -> At least one round update profit stats successfully but there is error when collecting other rounds +func (s *Strategy) UpdateProfitStats(ctx context.Context) (bool, error) { + rounds, err := s.roundCollector.CollectFinishRounds(ctx, s.ProfitStats.FromOrderID) if err != nil { - return err + return false, errors.Wrapf(err, "failed to collect finish rounds from #%d", s.ProfitStats.FromOrderID) } - s.logger.Infof("there are %d closed orders from order id #%d", len(orders), s.ProfitStats.FromOrderID) - - isMax := exchange.IsMaxExchange(s.ExchangeSession.Exchange) - - var rounds []Round - var round Round - for _, order := range orders { - // skip not this strategy order - if order.GroupID != s.OrderGroupID { - continue - } - switch order.Side { - case types.SideTypeBuy: - round.OpenPositionOrders = append(round.OpenPositionOrders, order) - case types.SideTypeSell: - if !isMax { - if order.Status != types.OrderStatusFilled { - s.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status) - continue - } - } else { - if !maxapi.IsFilledOrderState(maxapi.OrderState(order.OriginalStatus)) { - s.logger.Infof("isMax and take-profit order is %s not done or finalizing, so this round is not finished. Skip it", order.OriginalStatus) - continue - } - } - - round.TakeProfitOrder = order - rounds = append(rounds, round) - round = Round{} - default: - s.logger.Errorf("there is order with unsupported side") - } - } - - s.logger.Infof("there are %d rounds from order id #%d", len(rounds), s.ProfitStats.FromOrderID) + var updated bool = false for _, round := range rounds { - debugRoundOrders(s.logger, strconv.FormatInt(s.ProfitStats.Round, 10), round) - var roundOrders []types.Order = round.OpenPositionOrders - roundOrders = append(roundOrders, round.TakeProfitOrder) - for _, order := range roundOrders { - s.logger.Infof("calculate profit stats from order: %s", order.String()) - - // skip no trade orders - if order.ExecutedQuantity.Sign() == 0 { - continue - } - - trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, queryService, types.OrderQuery{ - Symbol: order.Symbol, - OrderID: strconv.FormatUint(order.OrderID, 10), - }) - - if err != nil { - return err - } + trades, err := s.roundCollector.CollectRoundTrades(ctx, round) + if err != nil { + return updated, errors.Wrapf(err, "failed to collect the trades of round") + } - for _, trade := range trades { - s.logger.Infof("calculate profit stats from trade: %s", trade.String()) - s.ProfitStats.AddTrade(trade) - } + for _, trade := range trades { + s.logger.Infof("update profit stats from trade: %s", trade.String()) + s.ProfitStats.AddTrade(trade) } + // update profit stats FromOrderID to make sure we will not collect duplicated rounds s.ProfitStats.FromOrderID = round.TakeProfitOrder.OrderID + 1 + + // update quote investment s.ProfitStats.QuoteInvestment = s.ProfitStats.QuoteInvestment.Add(s.ProfitStats.CurrentRoundProfit) - // store into persistence + // sync to persistence bbgo.Sync(ctx, s) + updated = true s.logger.Infof("profit stats:\n%s", s.ProfitStats.String()) @@ -485,10 +438,11 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context, historyService ty s.EmitProfit(s.ProfitStats) updateProfitMetrics(s.ProfitStats.Round, s.ProfitStats.CurrentRoundProfit.Float64()) + // make profit stats forward to new round s.ProfitStats.NewRound() } - return nil + return updated, nil } func (s *Strategy) updateNumOfOrdersMetrics(ctx context.Context) {