Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dca2: new struct RoundCollector for testing and use flag to decide re… #1586

Merged
merged 1 commit into from Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 13 additions & 10 deletions pkg/strategy/dca2/recover.go
Expand Up @@ -45,20 +45,25 @@ func (s *Strategy) recover(ctx context.Context) error {
}
debugRoundOrders(s.logger, "current", currentRound)

// TODO: use flag
// recover profit stats
/*
if s.DisableProfitStatsRecover {
s.logger.Info("disableProfitStatsRecover is set, skip profit stats recovery")
} else {
if err := recoverProfitStats(ctx, s); err != nil {
return err
}
s.logger.Info("recover profit stats DONE")
*/
}

// recover position
if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil {
return err
if s.DisablePositionRecover {
s.logger.Info("disablePositionRecover is set, skip position recovery")
} else {
if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil {
return err
}
s.logger.Info("recover position DONE")
}
s.logger.Info("recover position DONE")

// recover startTimeOfNextRound
startTimeOfNextRound := recoverStartTimeOfNextRound(ctx, currentRound, s.CoolDownInterval)
Expand Down Expand Up @@ -206,16 +211,14 @@ func recoverPosition(ctx context.Context, position *types.Position, queryService
return nil
}

// TODO: use flag to decide which to recover
/*
func recoverProfitStats(ctx context.Context, strategy *Strategy) error {
if strategy.ProfitStats == nil {
return fmt.Errorf("profit stats is nil, please check it")
}

return strategy.CalculateAndEmitProfit(ctx)
_, err := strategy.UpdateProfitStats(ctx)
return err
}
*/

func recoverStartTimeOfNextRound(ctx context.Context, currentRound Round, coolDownInterval types.Duration) time.Time {
if currentRound.TakeProfitOrder.OrderID != 0 && currentRound.TakeProfitOrder.Status == types.OrderStatusFilled {
Expand Down
125 changes: 125 additions & 0 deletions pkg/strategy/dca2/round_collector.go
@@ -0,0 +1,125 @@
package dca2

import (
"context"
"strconv"
"time"

"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
)

type RoundCollector struct {
logger *logrus.Entry
symbol string
groupID uint32
isMax bool

// service
historyService types.ExchangeTradeHistoryService
queryService types.ExchangeOrderQueryService
}

func NewRoundCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.Exchange) *RoundCollector {
isMax := exchange.IsMaxExchange(ex)
historyService, ok := ex.(types.ExchangeTradeHistoryService)
if !ok {
logger.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", ex.Name())
return nil
}

queryService, ok := ex.(types.ExchangeOrderQueryService)
if !ok {
logger.Errorf("exchange %s doesn't support ExchangeOrderQueryService", ex.Name())
return nil
}

return &RoundCollector{
logger: logger,
symbol: symbol,
groupID: groupID,
isMax: isMax,
historyService: historyService,
queryService: queryService,
}
}

func (rc *RoundCollector) CollectFinishRounds(ctx context.Context, fromOrderID uint64) ([]Round, error) {
// TODO: pagination for it
// query the orders
rc.logger.Infof("query %s closed orders from order id #%d", rc.symbol, fromOrderID)
orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, rc.historyService, rc.symbol, time.Time{}, time.Time{}, fromOrderID)
if err != nil {
return nil, err
}
rc.logger.Infof("there are %d closed orders from order id #%d", len(orders), fromOrderID)

var rounds []Round
var round Round
for _, order := range orders {
// skip not this strategy order
if order.GroupID != rc.groupID {
continue
}

switch order.Side {
case types.SideTypeBuy:
round.OpenPositionOrders = append(round.OpenPositionOrders, order)
case types.SideTypeSell:
if !rc.isMax {
if order.Status != types.OrderStatusFilled {
rc.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status)
continue
}
} else {
if !maxapi.IsFilledOrderState(maxapi.OrderState(order.OriginalStatus)) {
rc.logger.Infof("isMax and take-profit order is %s not done or finalizing, so this round is not finished. Skip it", order.OriginalStatus)
continue
}
}

round.TakeProfitOrder = order
rounds = append(rounds, round)
round = Round{}
default:
rc.logger.Errorf("there is order with unsupported side")
}
}

return rounds, nil
}

func (rc *RoundCollector) CollectRoundTrades(ctx context.Context, round Round) ([]types.Trade, error) {
debugRoundOrders(rc.logger, "collect round trades", round)

var roundTrades []types.Trade

var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)

for _, order := range roundOrders {
rc.logger.Infof("collect trades from order: %s", order.String())
if order.ExecutedQuantity.Sign() == 0 {
rc.logger.Info("collect trads from order but no executed quantity ", order.String())
continue
} else {
rc.logger.Info("collect trades from order ", order.String())
}

trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, rc.queryService, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})

if err != nil {
return nil, err
}

roundTrades = append(roundTrades, trades...)
}

return roundTrades, nil
}
6 changes: 2 additions & 4 deletions pkg/strategy/dca2/state.go
Expand Up @@ -198,10 +198,8 @@ func (s *Strategy) runTakeProfitReady(ctx context.Context, next State) {

s.logger.Info("[State] TakeProfitReady - start reseting position and calculate quote investment for next round")

// reset position

// calculate profit stats
if err := s.CalculateAndEmitProfitUntilSuccessful(ctx); err != nil {
// update profit stats
if err := s.UpdateProfitStatsUntilSuccessful(ctx); err != nil {
s.logger.WithError(err).Warn("failed to calculate and emit profit")
}

Expand Down
136 changes: 45 additions & 91 deletions pkg/strategy/dca2/strategy.go
Expand Up @@ -4,18 +4,16 @@ import (
"context"
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"

"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/strategy/common"
Expand Down Expand Up @@ -67,7 +65,9 @@ type Strategy struct {
OrderGroupID uint32 `json:"orderGroupID"`

// RecoverWhenStart option is used for recovering dca states
RecoverWhenStart bool `json:"recoverWhenStart"`
RecoverWhenStart bool `json:"recoverWhenStart"`
DisableProfitStatsRecover bool `json:"disableProfitStatsRecover"`
DisablePositionRecover bool `json:"disablePositionRecover"`

// KeepOrdersWhenShutdown option is used for keeping the grid orders when shutting down bbgo
KeepOrdersWhenShutdown bool `json:"keepOrdersWhenShutdown"`
Expand All @@ -91,6 +91,7 @@ type Strategy struct {
startTimeOfNextRound time.Time
nextStateC chan State
state State
roundCollector *RoundCollector

// callbacks
common.StatusCallbacks
Expand Down Expand Up @@ -171,6 +172,12 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.Position = types.NewPositionFromMarket(s.Market)
}

// round collector
s.roundCollector = NewRoundCollector(s.logger, s.Symbol, s.OrderGroupID, s.ExchangeSession.Exchange)
if s.roundCollector == nil {
return fmt.Errorf("failed to initialize round collector")
}

// prometheus
if s.PrometheusLabels != nil {
initMetrics(labelKeys(s.PrometheusLabels))
Expand Down Expand Up @@ -373,122 +380,69 @@ func (s *Strategy) CleanUp(ctx context.Context) error {
return werr
}

func (s *Strategy) CalculateAndEmitProfitUntilSuccessful(ctx context.Context) error {
fromOrderID := s.ProfitStats.FromOrderID

historyService, ok := s.ExchangeSession.Exchange.(types.ExchangeTradeHistoryService)
if !ok {
return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.ExchangeSession.Exchange.Name())
}

queryService, ok := s.ExchangeSession.Exchange.(types.ExchangeOrderQueryService)
if !ok {
return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.ExchangeSession.Exchange.Name())
}

func (s *Strategy) UpdateProfitStatsUntilSuccessful(ctx context.Context) error {
var op = func() error {
if err := s.CalculateAndEmitProfit(ctx, historyService, queryService); err != nil {
return errors.Wrapf(err, "failed to calculate and emit profit, please check it")
}

if s.ProfitStats.FromOrderID == fromOrderID {
return fmt.Errorf("FromOrderID (%d) is not updated, retry it", s.ProfitStats.FromOrderID)
if updated, err := s.UpdateProfitStats(ctx); err != nil {
return errors.Wrapf(err, "failed to update profit stats, please check it")
} else if !updated {
return fmt.Errorf("there is no round to update profit stats, please check it")
}

return nil
}

return retry.GeneralBackoff(ctx, op)
// exponential increased interval retry until success
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 5 * time.Second
bo.MaxInterval = 20 * time.Minute
bo.MaxElapsedTime = 0

return backoff.Retry(op, backoff.WithContext(bo, ctx))
}

func (s *Strategy) CalculateAndEmitProfit(ctx context.Context, historyService types.ExchangeTradeHistoryService, queryService types.ExchangeOrderQueryService) error {
// TODO: pagination for it
// query the orders
s.logger.Infof("query %s closed orders from order id #%d", s.Symbol, s.ProfitStats.FromOrderID)
orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, historyService, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
// UpdateProfitStats will collect round from closed orders and emit update profit stats
// return true, nil -> there is at least one finished round and all the finished rounds we collect update profit stats successfully
// return false, nil -> there is no finished round!
// return true, error -> At least one round update profit stats successfully but there is error when collecting other rounds
func (s *Strategy) UpdateProfitStats(ctx context.Context) (bool, error) {
rounds, err := s.roundCollector.CollectFinishRounds(ctx, s.ProfitStats.FromOrderID)
if err != nil {
return err
return false, errors.Wrapf(err, "failed to collect finish rounds from #%d", s.ProfitStats.FromOrderID)
}
s.logger.Infof("there are %d closed orders from order id #%d", len(orders), s.ProfitStats.FromOrderID)

isMax := exchange.IsMaxExchange(s.ExchangeSession.Exchange)

var rounds []Round
var round Round
for _, order := range orders {
// skip not this strategy order
if order.GroupID != s.OrderGroupID {
continue
}

switch order.Side {
case types.SideTypeBuy:
round.OpenPositionOrders = append(round.OpenPositionOrders, order)
case types.SideTypeSell:
if !isMax {
if order.Status != types.OrderStatusFilled {
s.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status)
continue
}
} else {
if !maxapi.IsFilledOrderState(maxapi.OrderState(order.OriginalStatus)) {
s.logger.Infof("isMax and take-profit order is %s not done or finalizing, so this round is not finished. Skip it", order.OriginalStatus)
continue
}
}

round.TakeProfitOrder = order
rounds = append(rounds, round)
round = Round{}
default:
s.logger.Errorf("there is order with unsupported side")
}
}

s.logger.Infof("there are %d rounds from order id #%d", len(rounds), s.ProfitStats.FromOrderID)
var updated bool = false
for _, round := range rounds {
debugRoundOrders(s.logger, strconv.FormatInt(s.ProfitStats.Round, 10), round)
var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)
for _, order := range roundOrders {
s.logger.Infof("calculate profit stats from order: %s", order.String())

// skip no trade orders
if order.ExecutedQuantity.Sign() == 0 {
continue
}

trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, queryService, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})

if err != nil {
return err
}
trades, err := s.roundCollector.CollectRoundTrades(ctx, round)
if err != nil {
return updated, errors.Wrapf(err, "failed to collect the trades of round")
}

for _, trade := range trades {
s.logger.Infof("calculate profit stats from trade: %s", trade.String())
s.ProfitStats.AddTrade(trade)
}
for _, trade := range trades {
s.logger.Infof("update profit stats from trade: %s", trade.String())
s.ProfitStats.AddTrade(trade)
}

// update profit stats FromOrderID to make sure we will not collect duplicated rounds
s.ProfitStats.FromOrderID = round.TakeProfitOrder.OrderID + 1

// update quote investment
s.ProfitStats.QuoteInvestment = s.ProfitStats.QuoteInvestment.Add(s.ProfitStats.CurrentRoundProfit)

// store into persistence
// sync to persistence
bbgo.Sync(ctx, s)
updated = true

s.logger.Infof("profit stats:\n%s", s.ProfitStats.String())

// emit profit
s.EmitProfit(s.ProfitStats)
updateProfitMetrics(s.ProfitStats.Round, s.ProfitStats.CurrentRoundProfit.Float64())

// make profit stats forward to new round
s.ProfitStats.NewRound()
}

return nil
return updated, nil
}

func (s *Strategy) updateNumOfOrdersMetrics(ctx context.Context) {
Expand Down