Skip to content

Commit

Permalink
dca2: add more log and retry
Browse files Browse the repository at this point in the history
  • Loading branch information
kbearXD committed Mar 12, 2024
1 parent c5dbd4b commit 36b576b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 22 deletions.
49 changes: 49 additions & 0 deletions pkg/exchange/retry/order.go
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"time"

"github.com/cenkalti/backoff/v4"

Expand Down Expand Up @@ -119,6 +120,54 @@ func QueryOpenOrdersUntilSuccessfulLite(
return openOrders, err
}

func QueryClosedOrdersUntilSuccessful(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64,
) (closedOrders []types.Order, err error) {
var op = func() (err2 error) {
closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID)
return err2
}

err = GeneralBackoff(ctx, op)
return closedOrders, err
}

func QueryClosedOrdersUntilSuccessfulLite(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64,
) (closedOrders []types.Order, err error) {
var op = func() (err2 error) {
closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID)
return err2
}

err = GeneralLiteBackoff(ctx, op)
return closedOrders, err
}

func QueryOrderTradesUntilSuccessful(
ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery,
) (trades []types.Trade, err error) {
var op = func() (err2 error) {
trades, err2 = ex.QueryOrderTrades(ctx, q)
return err2
}

err = GeneralBackoff(ctx, op)
return trades, err
}

func QueryOrderTradesUntilSuccessfulLite(
ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery,
) (trades []types.Trade, err error) {
var op = func() (err2 error) {
trades, err2 = ex.QueryOrderTrades(ctx, q)
return err2
}

err = GeneralLiteBackoff(ctx, op)
return trades, err
}

func QueryAccountUntilSuccessful(
ctx context.Context, ex types.ExchangeAccountService,
) (account *types.Account, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategy/dca2/active_order_recover.go
Expand Up @@ -10,7 +10,7 @@ import (
)

func (s *Strategy) recoverPeriodically(ctx context.Context) {
s.logger.Info("[DCA] monitor and recover periodically")
s.logger.Info("monitor and recover periodically")
interval := util.MillisecondsJitter(10*time.Minute, 5*60*1000)
ticker := time.NewTicker(interval)
defer ticker.Stop()
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategy/dca2/open_position.go
Expand Up @@ -15,7 +15,7 @@ type cancelOrdersByGroupIDApi interface {
}

func (s *Strategy) placeOpenPositionOrders(ctx context.Context) error {
s.logger.Infof("[DCA] start placing open position orders")
s.logger.Infof("start placing open position orders")
price, err := getBestPriceUntilSuccess(ctx, s.ExchangeSession.Exchange, s.Symbol)
if err != nil {
return err
Expand Down
55 changes: 36 additions & 19 deletions pkg/strategy/dca2/strategy.go
Expand Up @@ -203,15 +203,15 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.

// order executor
s.OrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
s.logger.Infof("[DCA] POSITION UPDATE: %s", s.Position.String())
s.logger.Infof("POSITION UPDATE: %s", s.Position.String())
bbgo.Sync(ctx, s)

// update take profit price here
s.updateTakeProfitPrice()
})

s.OrderExecutor.ActiveMakerOrders().OnFilled(func(o types.Order) {
s.logger.Infof("[DCA] FILLED ORDER: %s", o.String())
s.logger.Infof("FILLED ORDER: %s", o.String())
openPositionSide := types.SideTypeBuy
takeProfitSide := types.SideTypeSell

Expand All @@ -221,7 +221,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
case takeProfitSide:
s.emitNextState(WaitToOpenPosition)
default:
s.logger.Infof("[DCA] unsupported side (%s) of order: %s", o.Side, o)
s.logger.Infof("unsupported side (%s) of order: %s", o.Side, o)
}

// update metrics when filled
Expand All @@ -244,7 +244,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
})

session.UserDataStream.OnAuth(func() {
s.logger.Info("[DCA] user data stream authenticated")
s.logger.Info("user data stream authenticated")
time.AfterFunc(3*time.Second, func() {
if isInitialize := s.initializeNextStateC(); !isInitialize {

Expand All @@ -255,16 +255,29 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.updateState(WaitToOpenPosition)
} else {
// recover
if err := s.recover(ctx); err != nil {
s.logger.WithError(err).Error("[DCA] something wrong when state recovering")
return
maxTry := 3
for try := 1; try <= maxTry; try++ {
s.logger.Infof("try #%d recover", try)

err := s.recover(ctx)
if err == nil {
s.logger.Infof("recover successfully at #%d", try)
break
}

s.logger.WithError(err).Warnf("failed to recover at #%d", try)

if try == 3 {
s.logger.Errorf("failed to recover after %d trying, please check it", maxTry)
return
}
}
}

s.logger.Infof("[DCA] state: %d", s.state)
s.logger.Infof("[DCA] position %s", s.Position.String())
s.logger.Infof("[DCA] profit stats %s", s.ProfitStats.String())
s.logger.Infof("[DCA] startTimeOfNextRound %s", s.startTimeOfNextRound)
s.logger.Infof("state: %d", s.state)
s.logger.Infof("position %s", s.Position.String())
s.logger.Infof("profit stats %s", s.ProfitStats.String())
s.logger.Infof("startTimeOfNextRound %s", s.startTimeOfNextRound)

s.updateTakeProfitPrice()

Expand Down Expand Up @@ -299,17 +312,17 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
func (s *Strategy) updateTakeProfitPrice() {
takeProfitRatio := s.TakeProfitRatio
s.takeProfitPrice = s.Market.TruncatePrice(s.Position.AverageCost.Mul(fixedpoint.One.Add(takeProfitRatio)))
s.logger.Infof("[DCA] cost: %s, ratio: %s, price: %s", s.Position.AverageCost, takeProfitRatio, s.takeProfitPrice)
s.logger.Infof("cost: %s, ratio: %s, price: %s", s.Position.AverageCost, takeProfitRatio, s.takeProfitPrice)
}

func (s *Strategy) Close(ctx context.Context) error {
s.logger.Infof("[DCA] closing %s dca2", s.Symbol)
s.logger.Infof("closing %s dca2", s.Symbol)

defer s.EmitClosed()

err := s.OrderExecutor.GracefulCancel(ctx)
if err != nil {
s.logger.WithError(err).Errorf("[DCA] there are errors when cancelling orders at close")
s.logger.WithError(err).Errorf("there are errors when cancelling orders at close")
}

bbgo.Sync(ctx, s)
Expand Down Expand Up @@ -370,10 +383,12 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {

// TODO: pagination for it
// query the orders
orders, err := historyService.QueryClosedOrders(ctx, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
s.logger.Infof("query %s closed orders from order id #%d", s.Symbol, s.ProfitStats.FromOrderID)
orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, historyService, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
if err != nil {
return err
}
s.logger.Infof("there are %d closed orders from order id #%d", len(orders), s.ProfitStats.FromOrderID)

var rounds []Round
var round Round
Expand All @@ -398,18 +413,20 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
}
}

s.logger.Infof("there are %d rounds from order id #%d", len(rounds), s.ProfitStats.FromOrderID)
for _, round := range rounds {
debugRoundOrders(s.logger, "calculate", round)
var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)
for _, order := range roundOrders {
s.logger.Infof("[DCA] calculate profit stats from order: %s", order.String())
s.logger.Infof("calculate profit stats from order: %s", order.String())

// skip no trade orders
if order.ExecutedQuantity.Sign() == 0 {
continue
}

trades, err := queryService.QueryOrderTrades(ctx, types.OrderQuery{
trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, queryService, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})
Expand All @@ -419,7 +436,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
}

for _, trade := range trades {
s.logger.Infof("[DCA] calculate profit stats from trade: %s", trade.String())
s.logger.Infof("calculate profit stats from trade: %s", trade.String())
s.ProfitStats.AddTrade(trade)
}
}
Expand All @@ -430,7 +447,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
// store into persistence
bbgo.Sync(ctx, s)

s.logger.Infof("[DCA] profit stats:\n%s", s.ProfitStats.String())
s.logger.Infof("profit stats:\n%s", s.ProfitStats.String())

// emit profit
s.EmitProfit(s.ProfitStats)
Expand Down
2 changes: 1 addition & 1 deletion pkg/strategy/dca2/take_profit.go
Expand Up @@ -8,7 +8,7 @@ import (
)

func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error {
s.logger.Info("[DCA] start placing take profit orders")
s.logger.Info("start placing take profit orders")
order := generateTakeProfitOrder(s.Market, s.TakeProfitRatio, s.Position, s.OrderGroupID)
createdOrders, err := s.OrderExecutor.SubmitOrders(ctx, order)
if err != nil {
Expand Down

0 comments on commit 36b576b

Please sign in to comment.