Skip to content

Commit

Permalink
fix(da): increase backoff for mempool errors (#1535)
Browse files Browse the repository at this point in the history
## Overview

This PR increases the backoff duration for mempool errors, avoiding the
scenario where a temporary mempool error e.g due to congestion can
exhaust all retries. Fixes #1522

## Checklist

- [ ] New and updated code has appropriate documentation
- [ ] New and updated code has new and/or updated testing
- [ ] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or
documentation updates
- [ ] Linked issues closed with keywords


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a default expiration time for mempool transactions to
improve transaction management.
- Enhanced block submission with handling for `maxBlobSize` and
`gasPrice`, offering better flexibility and efficiency.
- Added a new flag in `rollkit start` command for setting DA gas price
multiplier, facilitating more robust transaction retries.
- **Improvements**
- Extended the submission timeout period from 60 to 90 seconds, allowing
for more reliable block submissions under network delays.
- Enriched error messaging and introduced new status codes for block
submission to improve debugging and operational transparency.
- **Refactor**
- Replaced `MockDA` struct with `mock.MockDA` in tests for a more
streamlined testing approach.
- Implemented a mock `DA` interface in `da/mock/mock.go`, enhancing
testability and development efficiency.
- **Tests**
- Updated various test functions to accommodate new parameters and
improved error handling, ensuring the reliability of changes.
- **Documentation**
- Documented the addition of the DA gas price multiplier flag in
`rollkit start` command usage.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
tuxcanfly committed Feb 29, 2024
1 parent a5c1173 commit bec9ad0
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 90 deletions.
40 changes: 39 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const defaultDABlockTime = 15 * time.Second
// defaultBlockTime is used only if BlockTime is not configured for manager
const defaultBlockTime = 1 * time.Second

// defaultMempoolTTL is the number of blocks until transaction is dropped from mempool
const defaultMempoolTTL = 25

// maxSubmitAttempts defines how many times Rollkit will re-try to publish block to DA layer.
// This is temporary solution. It will be removed in future versions.
const maxSubmitAttempts = 30
Expand Down Expand Up @@ -164,6 +167,11 @@ func NewManager(
conf.BlockTime = defaultBlockTime
}

if conf.DAMempoolTTL == 0 {
logger.Info("Using default mempool ttl", "MempoolTTL", defaultMempoolTTL)
conf.DAMempoolTTL = defaultMempoolTTL
}

proposerAddress, err := getAddress(proposerKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -835,8 +843,15 @@ func (m *Manager) submitBlocksToDA(ctx context.Context) error {
blocksToSubmit := m.pendingBlocks.getPendingBlocks()
numSubmittedBlocks := 0
attempt := 0
maxBlobSize, err := m.dalc.DA.MaxBlobSize(ctx)
if err != nil {
return err
}
initialMaxBlobSize := maxBlobSize
initialGasPrice := m.dalc.GasPrice
gasPrice := m.dalc.GasPrice
for ctx.Err() == nil && !submittedAllBlocks && attempt < maxSubmitAttempts {
res := m.dalc.SubmitBlocks(ctx, blocksToSubmit)
res := m.dalc.SubmitBlocks(ctx, blocksToSubmit, maxBlobSize, gasPrice)
switch res.Code {
case da.StatusSuccess:
m.logger.Info("successfully submitted Rollkit blocks to DA layer", "daHeight", res.DAHeight, "count", res.SubmittedCount)
Expand All @@ -850,6 +865,29 @@ func (m *Manager) submitBlocksToDA(ctx context.Context) error {
}
m.pendingBlocks.removeSubmittedBlocks(submittedBlocks)
blocksToSubmit = notSubmittedBlocks
// reset submission options when successful
// scale back gasPrice gradually
backoff = initialBackoff
maxBlobSize = initialMaxBlobSize
gasPrice = gasPrice / m.dalc.GasMultiplier
if gasPrice < initialGasPrice {
gasPrice = initialGasPrice
}
m.logger.Debug("resetting DA layer submission options", "backoff", backoff, "gasPrice", gasPrice, "maxBlobSize", maxBlobSize)
case da.StatusNotIncludedInBlock, da.StatusAlreadyInMempool:
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
backoff = m.conf.DABlockTime * time.Duration(m.conf.DAMempoolTTL)
if m.dalc.GasMultiplier != -1 && gasPrice != -1 {
gasPrice = gasPrice * m.dalc.GasMultiplier
}
m.logger.Info("retrying DA layer submission with", "backoff", backoff, "gasPrice", gasPrice, "maxBlobSize", maxBlobSize)
time.Sleep(backoff)
backoff = m.exponentialBackoff(backoff)
case da.StatusTooBig:
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
maxBlobSize = maxBlobSize / 4
time.Sleep(backoff)
backoff = m.exponentialBackoff(backoff)
default:
m.logger.Error("DA layer submission failed", "error", res.Message, "attempt", attempt)
time.Sleep(backoff)
Expand Down
51 changes: 48 additions & 3 deletions block/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package block

import (
"bytes"
"context"
"testing"
"time"

cmtypes "github.com/cometbft/cometbft/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

goDA "github.com/rollkit/go-da"
goDATest "github.com/rollkit/go-da/test"

"github.com/rollkit/rollkit/da"
"github.com/rollkit/rollkit/da/mock"
"github.com/rollkit/rollkit/store"
test "github.com/rollkit/rollkit/test/log"
"github.com/rollkit/rollkit/types"
)

// Returns a minimalistic block manager
func getManager(t *testing.T) *Manager {
func getManager(t *testing.T, backend goDA.DA) *Manager {
logger := test.NewFileLoggerCustom(t, test.TempLogFileName(t, t.Name()))
return &Manager{
dalc: &da.DAClient{DA: goDATest.NewDummyDA(), GasPrice: -1, Logger: logger},
dalc: &da.DAClient{DA: backend, GasPrice: -1, GasMultiplier: -1, Logger: logger},
blockCache: NewBlockCache(),
logger: logger,
}
Expand Down Expand Up @@ -125,11 +129,52 @@ func TestIsDAIncluded(t *testing.T) {
require.True(m.IsDAIncluded(hash))
}

func TestSubmitBlocksToMockDA(t *testing.T) {
ctx := context.Background()

mockDA := &mock.MockDA{}
m := getManager(t, mockDA)
m.conf.DABlockTime = time.Millisecond
m.conf.DAMempoolTTL = 1
m.dalc.GasPrice = 1.0
m.dalc.GasMultiplier = 1.2

t.Run("handle_tx_already_in_mempool", func(t *testing.T) {
var blobs [][]byte
block := types.GetRandomBlock(1, 5)
blob, err := block.MarshalBinary()

require.NoError(t, err)
blobs = append(blobs, blob)
// Set up the mock to
// * throw timeout waiting for tx to be included exactly once
// * wait for tx to drop from mempool exactly DABlockTime * DAMempoolTTL seconds
// * retry with a higher gas price
// * successfully submit
mockDA.On("MaxBlobSize").Return(uint64(12345), nil)
mockDA.
On("Submit", blobs, 1.0, []byte(nil)).
Return([][]byte{}, da.ErrTxTimedout).Once()
mockDA.
On("Submit", blobs, 1.0*1.2, []byte(nil)).
Return([][]byte{}, da.ErrTxAlreadyInMempool).Times(int(m.conf.DAMempoolTTL))
mockDA.
On("Submit", blobs, 1.0*1.2*1.2, []byte(nil)).
Return([][]byte{bytes.Repeat([]byte{0x00}, 8)}, nil)

m.pendingBlocks = NewPendingBlocks()
m.pendingBlocks.addPendingBlock(block)
err = m.submitBlocksToDA(ctx)
require.NoError(t, err)
mockDA.AssertExpectations(t)
})
}

func TestSubmitBlocksToDA(t *testing.T) {
require := require.New(t)
ctx := context.Background()

m := getManager(t)
m := getManager(t, goDATest.NewDummyDA())

maxDABlobSizeLimit, err := m.dalc.DA.MaxBlobSize(ctx)
require.NoError(err)
Expand Down
1 change: 1 addition & 0 deletions cmd/rollkit/docs/rollkit_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rollkit start [flags]
--rollkit.block_time duration block time (for aggregator mode) (default 1s)
--rollkit.da_address string DA address (host:port) (default ":26650")
--rollkit.da_block_time duration DA chain block time (for syncing) (default 15s)
--rollkit.da_gas_multiplier float DA gas price multiplier for retrying blob transactions (default -1)
--rollkit.da_gas_price float DA gas price for blob transactions (default -1)
--rollkit.da_namespace string DA namespace to submit blob transactions
--rollkit.da_start_height uint starting DA block height (for syncing)
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
FlagDABlockTime = "rollkit.da_block_time"
// FlagDAGasPrice is a flag for specifying the data availability layer gas price
FlagDAGasPrice = "rollkit.da_gas_price"
// FlagDAGasMultiplier is a flag for specifying the data availability layer gas price retry multiplier
FlagDAGasMultiplier = "rollkit.da_gas_multiplier"
// FlagDAStartHeight is a flag for specifying the data availability layer start height
FlagDAStartHeight = "rollkit.da_start_height"
// FlagDANamespace is a flag for specifying the DA namespace ID
Expand Down Expand Up @@ -48,6 +50,7 @@ type NodeConfig struct {
LazyAggregator bool `mapstructure:"lazy_aggregator"`
Instrumentation *cmcfg.InstrumentationConfig `mapstructure:"instrumentation"`
DAGasPrice float64 `mapstructure:"da_gas_price"`
DAGasMultiplier float64 `mapstructure:"da_gas_multiplier"`

// CLI flags
DANamespace string `mapstructure:"da_namespace"`
Expand All @@ -66,6 +69,8 @@ type BlockManagerConfig struct {
DABlockTime time.Duration `mapstructure:"da_block_time"`
// DAStartHeight allows skipping first DAStartHeight-1 blocks when querying for blocks.
DAStartHeight uint64 `mapstructure:"da_start_height"`
// DAMempoolTTL is the number of DA blocks until transaction is dropped from the mempool.
DAMempoolTTL uint64 `mapstructure:"da_mempool_ttl"`
}

// GetNodeConfig translates Tendermint's configuration into Rollkit configuration.
Expand Down Expand Up @@ -102,6 +107,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
nc.Aggregator = v.GetBool(FlagAggregator)
nc.DAAddress = v.GetString(FlagDAAddress)
nc.DAGasPrice = v.GetFloat64(FlagDAGasPrice)
nc.DAGasMultiplier = v.GetFloat64(FlagDAGasMultiplier)
nc.DANamespace = v.GetString(FlagDANamespace)
nc.DAStartHeight = v.GetUint64(FlagDAStartHeight)
nc.DABlockTime = v.GetDuration(FlagDABlockTime)
Expand All @@ -124,6 +130,7 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().Duration(FlagBlockTime, def.BlockTime, "block time (for aggregator mode)")
cmd.Flags().Duration(FlagDABlockTime, def.DABlockTime, "DA chain block time (for syncing)")
cmd.Flags().Float64(FlagDAGasPrice, def.DAGasPrice, "DA gas price for blob transactions")
cmd.Flags().Float64(FlagDAGasMultiplier, def.DAGasMultiplier, "DA gas price multiplier for retrying blob transactions")
cmd.Flags().Uint64(FlagDAStartHeight, def.DAStartHeight, "starting DA block height (for syncing)")
cmd.Flags().String(FlagDANamespace, def.DANamespace, "DA namespace to submit blob transactions")
cmd.Flags().Bool(FlagLight, def.Light, "run light client")
Expand Down
7 changes: 4 additions & 3 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ var DefaultNodeConfig = NodeConfig{
BlockTime: 1 * time.Second,
DABlockTime: 15 * time.Second,
},
DAAddress: ":26650",
DAGasPrice: -1,
Light: false,
DAAddress: ":26650",
DAGasPrice: -1,
DAGasMultiplier: -1,
Light: false,
HeaderConfig: HeaderConfig{
TrustedHash: "",
},
Expand Down
57 changes: 41 additions & 16 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"strings"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -29,6 +30,21 @@ var (

// ErrBlobSizeOverLimit is used to indicate that the blob size is over limit
ErrBlobSizeOverLimit = errors.New("blob: over size limit")

// ErrTxTimedout is the error message returned by the DA when mempool is congested
ErrTxTimedout = errors.New("timed out waiting for tx to be included in a block")

// ErrTxAlreadyInMempool is the error message returned by the DA when tx is already in mempool
ErrTxAlreadyInMempool = errors.New("tx already in mempool")

// ErrTxIncorrectAccountSequence is the error message returned by the DA when tx has incorrect sequence
ErrTxIncorrectAccountSequence = errors.New("incorrect account sequence")

// ErrTxSizeTooBig is the error message returned by the DA when tx size is too big
ErrTxSizeTooBig = errors.New("tx size is too big")

// ErrContextDeadline is the error message returned by the DA when context deadline exceeds
ErrContextDeadline = errors.New("context deadline")
)

// StatusCode is a type for DA layer return status.
Expand All @@ -42,6 +58,10 @@ const (
StatusUnknown StatusCode = iota
StatusSuccess
StatusNotFound
StatusNotIncludedInBlock
StatusAlreadyInMempool
StatusTooBig
StatusContextDeadline
StatusError
)

Expand Down Expand Up @@ -75,25 +95,17 @@ type ResultRetrieveBlocks struct {

// DAClient is a new DA implementation.
type DAClient struct {
DA goDA.DA
Namespace goDA.Namespace
GasPrice float64
Logger log.Logger
DA goDA.DA
GasPrice float64
GasMultiplier float64
Namespace goDA.Namespace
Logger log.Logger
}

// SubmitBlocks submits blocks to DA.
func (dac *DAClient) SubmitBlocks(ctx context.Context, blocks []*types.Block) ResultSubmitBlocks {
func (dac *DAClient) SubmitBlocks(ctx context.Context, blocks []*types.Block, maxBlobSize uint64, gasPrice float64) ResultSubmitBlocks {
var blobs [][]byte
var blobSize uint64
maxBlobSize, err := dac.DA.MaxBlobSize(ctx)
if err != nil {
return ResultSubmitBlocks{
BaseResult: BaseResult{
Code: StatusError,
Message: "unable to get DA max blob size",
},
}
}
var submitted uint64
for i := range blocks {
blob, err := blocks[i].MarshalBinary()
Expand Down Expand Up @@ -123,11 +135,24 @@ func (dac *DAClient) SubmitBlocks(ctx context.Context, blocks []*types.Block) Re
}
ctx, cancel := context.WithTimeout(ctx, submitTimeout)
defer cancel()
ids, err := dac.DA.Submit(ctx, blobs, dac.GasPrice, dac.Namespace)
ids, err := dac.DA.Submit(ctx, blobs, gasPrice, dac.Namespace)
if err != nil {
status := StatusError
switch {
case strings.Contains(err.Error(), ErrTxTimedout.Error()):
status = StatusNotIncludedInBlock
case strings.Contains(err.Error(), ErrTxAlreadyInMempool.Error()):
status = StatusAlreadyInMempool
case strings.Contains(err.Error(), ErrTxIncorrectAccountSequence.Error()):
status = StatusAlreadyInMempool
case strings.Contains(err.Error(), ErrTxSizeTooBig.Error()):
status = StatusTooBig
case strings.Contains(err.Error(), ErrContextDeadline.Error()):
status = StatusContextDeadline
}
return ResultSubmitBlocks{
BaseResult: BaseResult{
Code: StatusError,
Code: status,
Message: "failed to submit blocks: " + err.Error(),
},
}
Expand Down

0 comments on commit bec9ad0

Please sign in to comment.