Skip to content

Commit

Permalink
store/tikv: Make maxTxnTimeUse a configurable value (#10480)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored and zz-jason committed Jul 3, 2019
1 parent 3affa57 commit 9ddab59
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ type TiKVClient struct {
GrpcKeepAliveTimeout uint `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"`
// CommitTimeout is the max time which command 'commit' will wait.
CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
// MaxTxnTimeUse is the max time a Txn may use (in seconds) from its startTS to commitTS.
MaxTxnTimeUse uint `toml:"max-txn-time-use" json:"max-txn-time-use"`
}

// Plugin is the config for plugin
Expand Down Expand Up @@ -357,6 +359,7 @@ var defaultConf = Config{
GrpcKeepAliveTime: 10,
GrpcKeepAliveTimeout: 3,
CommitTimeout: "41s",
MaxTxnTimeUse: 590,
},
Binlog: Binlog{
WriteTimeout: "15s",
Expand Down
5 changes: 5 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ grpc-keepalive-timeout = 3
# max time for commit command, must be twice bigger than raft election timeout.
commit-timeout = "41s"

# The max time a Txn may use (in seconds) from its startTS to commitTS.
# We use it to guarantee GC worker will not influence any active txn. Please make sure that this
# value is less than gc_life_time - 10s.
max-txn-time-use = 590

[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
Expand Down
36 changes: 20 additions & 16 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/binloginfo"
Expand Down Expand Up @@ -79,6 +80,11 @@ type twoPhaseCommitter struct {
connID uint64 // connID is used for log.
cleanWg sync.WaitGroup
detail *execdetails.CommitDetails

// The max time a Txn may use (in ms) from its startTS to commitTS.
// We use it to guarantee GC worker will not influence any active txn. The value
// should be less than GC life time.
maxTxnTimeUse uint64
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
Expand Down Expand Up @@ -159,20 +165,23 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
return nil, errors.Trace(err)
}

// Convert from sec to ms
maxTxnTimeUse := uint64(config.GetGlobalConfig().TiKVClient.MaxTxnTimeUse) * 1000
commitDetail := &execdetails.CommitDetails{WriteSize: size, WriteKeys: len(keys)}
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
startTS: txn.StartTS(),
keys: keys,
mutations: mutations,
lockTTL: txnLockTTL(txn.startTime, size),
priority: getTxnPriority(txn),
syncLog: getTxnSyncLog(txn),
connID: connID,
detail: commitDetail,
store: txn.store,
txn: txn,
startTS: txn.StartTS(),
keys: keys,
mutations: mutations,
lockTTL: txnLockTTL(txn.startTime, size),
priority: getTxnPriority(txn),
syncLog: getTxnSyncLog(txn),
connID: connID,
detail: commitDetail,
maxTxnTimeUse: maxTxnTimeUse,
}, nil
}

Expand Down Expand Up @@ -593,11 +602,6 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCleanup, keys)
}

// The max time a Txn may use (in ms) from its startTS to commitTS.
// We use it to guarantee GC worker will not influence any active txn. The value
// should be less than `gcRunInterval`.
const maxTxnTimeUse = 590000

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {
Expand Down Expand Up @@ -677,7 +681,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
return errors.Trace(err)
}

if c.store.oracle.IsExpired(c.startTS, maxTxnTimeUse) {
if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) {
err = errors.Errorf("conn%d txn takes too much time, txnStartTS: %d, comm: %d",
c.connID, c.startTS, c.commitTS)
return errors.Annotate(err, txnRetryableMark)
Expand Down

0 comments on commit 9ddab59

Please sign in to comment.