This repository has been archived by the owner on Nov 2, 2018. It is now read-only.
Make wallet rebroadcast transactions until they are confirmed #2611
Open
ChrisSchinnerl
wants to merge
12
commits into
master
Choose a base branch
from
wallet-rebroadcast
base: master
Could not load branches
Branch not found: {{ refName }}
Could not load tags
Nothing to show
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
41eeb45
Make wallet rebroadcast transactions until they are confirmed
ChrisSchinnerl f707467
move broadcast code to new file and use RespendTimeout instead of a t…
ChrisSchinnerl 6d54370
fix failing TestValidRevertedTransaction
ChrisSchinnerl cdc217b
fix race condition
ChrisSchinnerl dd3e2f7
persist tracked transaction sets
ChrisSchinnerl 4fc4302
make tpool broadcast transactions if there is a duplicate transaction…
ChrisSchinnerl 8a023f0
added api integration test
ChrisSchinnerl 538b838
export rebroadcast timeout for other modules and skip pruning testcase
ChrisSchinnerl 8c3091a
fix segfault
ChrisSchinnerl 3164e5b
reset txn height after rebroadcasting txn
ChrisSchinnerl 4635265
move broadcast out of lockedTryTransactionSet
ChrisSchinnerl 00e9604
changes requested in review
ChrisSchinnerl File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package modules | ||
|
||
import "github.com/NebulousLabs/Sia/types" | ||
|
||
// Consts that are required by multiple modules | ||
const ( | ||
// maxTxnAge determines the maximum age of a transaction (in block height) | ||
// allowed before the transaction is pruned from the transaction pool. | ||
MaxTxnAge = types.BlockHeight(24) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -331,21 +331,38 @@ func (tp *TransactionPool) AcceptTransactionSet(ts []types.Transaction) error { | |
return errors.New("consensus set does not support LockedTryTransactionSet method") | ||
} | ||
|
||
return cs.LockedTryTransactionSet(func(txnFn func(txns []types.Transaction) (modules.ConsensusChange, error)) error { | ||
tp.log.Println("Beginning broadcast of transaction set") | ||
tp.log.Println("Beginning broadcast of transaction set") | ||
err := cs.LockedTryTransactionSet(func(txnFn func(txns []types.Transaction) (modules.ConsensusChange, error)) error { | ||
tp.mu.Lock() | ||
defer tp.mu.Unlock() | ||
err := tp.acceptTransactionSet(ts, txnFn) | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
tp.log.Println("Transaction set broadcast has failed") | ||
return err | ||
} | ||
go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers()) | ||
// Notify subscribers of an accepted transaction set | ||
tp.updateSubscribersTransactions() | ||
tp.log.Println("Transaction set broadcast appears to have succeeded") | ||
return nil | ||
}) | ||
// In case of certain errors we still want to broadcast the set | ||
if err == nil || err == modules.ErrDuplicateTransactionSet || err == errLowMinerFees { | ||
// This set was broadcasted before if err != nil. We need to update | ||
// it's seen txn height when we rebroadcast it. If we don't do | ||
// that, the transaction will be pruned from the tpool and they | ||
// might no longer show up in the wallet while still beingt | ||
// tracked. | ||
if err != nil { | ||
tp.mu.Lock() | ||
for _, txn := range ts { | ||
tp.transactionHeights[txn.ID()] = tp.blockHeight | ||
} | ||
tp.mu.Unlock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah I missed this. This should happen under the same lock that you update the subscribers with. |
||
} | ||
go tp.gateway.Broadcast("RelayTransactionSet", ts, tp.gateway.Peers()) | ||
tp.log.Println("Transaction set broadcast appears to have succeeded") | ||
} else { | ||
tp.log.Println("Transaction set broadcast has failed") | ||
} | ||
return err | ||
} | ||
|
||
// relayTransactionSet is an RPC that accepts a transaction set from a peer. If | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
package wallet | ||
|
||
import ( | ||
"github.com/NebulousLabs/Sia/crypto" | ||
"github.com/NebulousLabs/Sia/modules" | ||
"github.com/NebulousLabs/Sia/types" | ||
"github.com/NebulousLabs/bolt" | ||
) | ||
|
||
// broadcastedTSet is a helper struct to keep track of transaction sets and to | ||
// help rebroadcast them. | ||
type broadcastedTSet struct { | ||
firstTry types.BlockHeight // first time the tSet was broadcasted | ||
lastTry types.BlockHeight // last time the tSet was broadcasted | ||
confirmedTxn map[types.TransactionID]bool // tracks confirmed txns of set | ||
transactions []types.Transaction // the tSet | ||
id modules.TransactionSetID // the tSet's ID | ||
w *Wallet | ||
} | ||
|
||
// persistBTS is the on-disk version of the broadcastedTSets structure. This is | ||
// necessary since we can't marshal a map directly. Instead we make sure that | ||
// confirmedTxn[i] corresponds to the confirmation state of transactions[i]. | ||
type persistBTS struct { | ||
FirstTry types.BlockHeight // first time the tSet was broadcasted | ||
LastTry types.BlockHeight // last time the tSet was broadcasted | ||
ConfirmedTxn []bool // tracks confirmed txns of set | ||
Transactions []types.Transaction // the tSet | ||
} | ||
|
||
// markConfirmation is a helper function that sets a certain transactions to markConfirmation | ||
// or unconfirmed. It also updates the state on disk. | ||
func (bts *broadcastedTSet) markConfirmation(txid types.TransactionID, confirmed bool) error { | ||
bts.confirmedTxn[txid] = confirmed | ||
return dbPutBroadcastedTSet(bts.w.dbTx, *bts) | ||
} | ||
|
||
// deleteBroadcastedTSet removes a broadcastedTSet from the wallet and disk | ||
func (w *Wallet) deleteBroadcastedTSet(tSetID modules.TransactionSetID) error { | ||
// Remove it from wallet | ||
delete(w.broadcastedTSets, tSetID) | ||
|
||
// Remove it from disk | ||
if err := dbDeleteBroadcastedTSet(w.dbTx, tSetID); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
// newBroadcastedTSet creates a broadcastedTSet from a normal tSet | ||
func (w *Wallet) newBroadcastedTSet(tSet []types.Transaction) (bts *broadcastedTSet, err error) { | ||
bts = &broadcastedTSet{ | ||
w: w, | ||
} | ||
// Set the height of the first and last try | ||
bts.firstTry, err = dbGetConsensusHeight(w.dbTx) | ||
if err != nil { | ||
return | ||
} | ||
bts.lastTry = bts.firstTry | ||
|
||
// Initialize confirmedTxn and transactions | ||
bts.confirmedTxn = make(map[types.TransactionID]bool) | ||
for _, txn := range tSet { | ||
bts.confirmedTxn[txn.ID()] = false | ||
bts.transactions = append(bts.transactions, txn) | ||
} | ||
|
||
// Persist the new tSet | ||
bts.id = modules.TransactionSetID(crypto.HashAll(tSet)) | ||
if err := dbPutBroadcastedTSet(w.dbTx, *bts); err != nil { | ||
return nil, err | ||
} | ||
return | ||
} | ||
|
||
// rebroadcastOldTransaction rebroadcasts transactions that haven't been | ||
// confirmed within rebroadcastInterval blocks | ||
func (w *Wallet) rebroadcastOldTransactions(tx *bolt.Tx, cc modules.ConsensusChange) error { | ||
// Get the current consensus height | ||
consensusHeight, err := dbGetConsensusHeight(tx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Build an index to quickly map a transaction to a set in broadcastedTSets | ||
broadcastedTxns := make(map[types.TransactionID]modules.TransactionSetID) | ||
for tSetID, bts := range w.broadcastedTSets { | ||
for _, txn := range bts.transactions { | ||
broadcastedTxns[txn.ID()] = tSetID | ||
} | ||
} | ||
|
||
// Mark reverted transactions as not confirmed | ||
for _, block := range cc.RevertedBlocks { | ||
for _, txn := range block.Transactions { | ||
if tSetID, exists := broadcastedTxns[txn.ID()]; exists { | ||
bts := w.broadcastedTSets[tSetID] | ||
if err := bts.markConfirmation(txn.ID(), false); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Mark applied transactions as confirmed | ||
for _, block := range cc.AppliedBlocks { | ||
for _, txn := range block.Transactions { | ||
if tSetID, exists := broadcastedTxns[txn.ID()]; exists { | ||
bts := w.broadcastedTSets[tSetID] | ||
if err := bts.markConfirmation(txn.ID(), true); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Check if all transactions of the set are confirmed | ||
for tSetID, bts := range w.broadcastedTSets { | ||
confirmed := true | ||
for _, c := range bts.confirmedTxn { | ||
if !c { | ||
confirmed = false | ||
break | ||
} | ||
} | ||
// If the transaction set has been confirmed for one broadcast cycle it | ||
// should be safe to remove it | ||
if confirmed && consensusHeight >= bts.lastTry+RebroadcastInterval { | ||
if err := w.deleteBroadcastedTSet(tSetID); err != nil { | ||
return err | ||
} | ||
continue | ||
} | ||
// If the transaction set has been confirmed recently we wait a little | ||
// bit longer before we remove it | ||
if confirmed { | ||
continue | ||
} | ||
// If the transaction set is not confirmed and hasn't been broadcasted | ||
// for rebroadcastInterval blocks we try to broadcast it again | ||
if consensusHeight >= bts.lastTry+RebroadcastInterval { | ||
bts.lastTry = consensusHeight | ||
go func(tSet []types.Transaction) { | ||
if err := w.tpool.AcceptTransactionSet(tSet); err != nil { | ||
w.log.Println("WARNING: Rebroadcast failed: ", err) | ||
} | ||
}(bts.transactions) | ||
// Delete the transaction set once we have tried for RespendTimeout | ||
// blocks | ||
if consensusHeight >= bts.firstTry+RebroadcastTimeout { | ||
if err := w.deleteBroadcastedTSet(tSetID); err != nil { | ||
w.log.Println("ERROR: Failed to delete broadcasted TSet from db: ", err) | ||
} | ||
} | ||
} | ||
} | ||
return nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that there are other consts throughout the modules package, we may want to collect them all here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should that be done in the same PR?