Skip to content

Commit

Permalink
Buck review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed May 2, 2024
1 parent 00259ff commit 8c8fbf5
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 338 deletions.
88 changes: 26 additions & 62 deletions client/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,6 @@ type baseWallet struct {
txHistoryDB atomic.Value // *BadgerTxDB

ar *AddressRecycler

// subprocessWg must be incremented every time the wallet starts a
// goroutine that will use node or the tx history DB.
subprocessWg sync.WaitGroup
}

func (w *baseWallet) fallbackFeeRate() uint64 {
Expand Down Expand Up @@ -1394,7 +1390,7 @@ func (btc *baseWallet) Info() *asset.WalletInfo {
}

func (btc *baseWallet) txHistoryDBPath(walletID string) string {
return filepath.Join(btc.walletDir, fmt.Sprintf("txhistory-%s.db", walletID))
return filepath.Join(btc.walletDir, fmt.Sprintf("txhistorydb-%s", walletID))
}

// findExistingAddressBasedTxHistoryDB finds the path of a tx history db that
Expand All @@ -1412,7 +1408,7 @@ func (btc *baseWallet) findExistingAddressBasedTxHistoryDB() (string, error) {
return "", fmt.Errorf("error reading wallet directory: %w", err)
}

pattern := regexp.MustCompile(`^txhistory-(.+)\.db$`)
pattern := regexp.MustCompile(`^txhistorydb-(.+)$`)

for _, entry := range entries {
if !entry.IsDir() {
Expand All @@ -1437,7 +1433,7 @@ func (btc *baseWallet) findExistingAddressBasedTxHistoryDB() (string, error) {
return "", nil
}

func (btc *baseWallet) startTxHistoryDB() error {
func (btc *baseWallet) startTxHistoryDB(ctx context.Context) (*sync.WaitGroup, error) {
var dbPath string
fingerPrint, err := btc.node.fingerprint()
if err == nil && fingerPrint != "" {
Expand All @@ -1447,7 +1443,7 @@ func (btc *baseWallet) startTxHistoryDB() error {
if dbPath == "" {
addressPath, err := btc.findExistingAddressBasedTxHistoryDB()
if err != nil {
return err
return nil, err
}
if addressPath != "" {
dbPath = addressPath
Expand All @@ -1457,30 +1453,24 @@ func (btc *baseWallet) startTxHistoryDB() error {
if dbPath == "" {
depositAddr, err := btc.DepositAddress()
if err != nil {
return fmt.Errorf("error getting deposit address: %w", err)
return nil, fmt.Errorf("error getting deposit address: %w", err)
}
dbPath = btc.txHistoryDBPath(depositAddr)
}

btc.log.Debugf("Using tx history db at %s", dbPath)

db, err := NewBadgerTxDB(dbPath, btc.log)
db := NewBadgerTxDB(dbPath, btc.log)
btc.txHistoryDB.Store(db)

wg, err := db.Connect(ctx)
if err != nil {
return fmt.Errorf("error opening tx history db: %w", err)
return nil, err
}

var success bool
defer func() {
if !success {
db.Close()
}
}()

btc.txHistoryDB.Store(db)

pendingTxs, err := db.GetPendingTxs()
if err != nil {
return fmt.Errorf("failed to load unconfirmed txs: %v", err)
return nil, fmt.Errorf("failed to load unconfirmed txs: %v", err)
}

btc.pendingTxsMtx.Lock()
Expand All @@ -1498,13 +1488,12 @@ func (btc *baseWallet) startTxHistoryDB() error {
if errors.Is(err, ErrNeverQueried) {
lastQuery = 0
} else if err != nil {
return fmt.Errorf("failed to load last query time: %v", err)
return nil, fmt.Errorf("failed to load last query time: %v", err)
}

btc.receiveTxLastQuery.Store(lastQuery)

success = true
return nil
return wg, nil
}

// connect is shared between Wallet implementations that may have different
Expand Down Expand Up @@ -1554,55 +1543,36 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) {
// Connect connects the wallet to the btc.Wallet backend and starts monitoring
// blocks and peers. Satisfies the dex.Connector interface.
func (btc *intermediaryWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) {
// A separate baseCtx is used to ensure that all subprocesses are complete
// before the node is shut down.
baseCtx, killBase := context.WithCancel(context.Background())

var success bool
defer func() {
if !success {
killBase()
}
}()

baseWg, err := btc.connect(baseCtx)
wg, err := btc.connect(ctx)
if err != nil {
return nil, err
}

err = btc.startTxHistoryDB()
dbWG, err := btc.startTxHistoryDB(ctx)
if err != nil {
return nil, err
}

success = true
wg.Add(1)
go func() {
defer wg.Done()
dbWG.Wait()
}()

btc.subprocessWg.Add(1)
wg.Add(1)
go func() {
defer btc.subprocessWg.Done()
defer wg.Done()
btc.watchBlocks(ctx)
btc.rf.CancelRedemptionSearches()
}()

btc.subprocessWg.Add(1)
wg.Add(1)
go func() {
defer btc.subprocessWg.Done()
defer wg.Done()
btc.monitorPeers(ctx)
}()

baseWg.Add(1)
go func() {
defer baseWg.Done()
<-ctx.Done()
btc.subprocessWg.Wait()
killBase()
db := btc.txDB()
if db != nil {
db.Close()
}
}()

return baseWg, nil
return wg, nil
}

// Reconfigure attempts to reconfigure the wallet.
Expand Down Expand Up @@ -4100,9 +4070,7 @@ func (btc *baseWallet) AuditContract(coinID, contract, txData dex.Bytes, rebroad
// Broadcast the transaction, but do not block because this is not required
// and does not affect the audit result.
if rebroadcast && tx != nil {
btc.subprocessWg.Add(1)
go func() {
defer btc.subprocessWg.Done()
if hashSent, err := btc.node.sendRawTransaction(tx); err != nil {
btc.log.Debugf("Rebroadcasting counterparty contract %v (THIS MAY BE NORMAL): %v", txHash, err)
} else if !hashSent.IsEqual(txHash) {
Expand Down Expand Up @@ -4680,11 +4648,7 @@ func (btc *intermediaryWallet) reportNewTip(ctx context.Context, newTip *BlockVe
btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.Height, prevTip.Hash, newTip.Height, newTip.Hash)
btc.emit.TipChange(uint64(newTip.Height))

btc.subprocessWg.Add(1)
go func() {
defer btc.subprocessWg.Done()
btc.checkPendingTxs(uint64(newTip.Height))
}()
go btc.checkPendingTxs(uint64(newTip.Height))

btc.rf.ReportNewTip(ctx, prevTip, newTip)
}
Expand Down
96 changes: 45 additions & 51 deletions client/asset/btc/txdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,12 @@ func txKey(txid string) []byte {

type BadgerTxDB struct {
*badger.DB
log dex.Logger
seq *badger.Sequence

running atomic.Bool
wg *sync.WaitGroup
ctx context.Context
die context.CancelFunc
filePath string
log dex.Logger
seq *badger.Sequence
running atomic.Bool
wg *sync.WaitGroup
ctx context.Context
}

// badgerLoggerWrapper wraps dex.Logger and translates Warnf to Warningf to
Expand All @@ -104,67 +103,73 @@ func (log *badgerLoggerWrapper) Warningf(s string, a ...interface{}) {
log.Warnf(s, a...)
}

func NewBadgerTxDB(filePath string, log dex.Logger) (*BadgerTxDB, error) {
func NewBadgerTxDB(filePath string, log dex.Logger) *BadgerTxDB {
return &BadgerTxDB{
filePath: filePath,
log: log,
wg: new(sync.WaitGroup),
}
}

func (db *BadgerTxDB) Connect(ctx context.Context) (*sync.WaitGroup, error) {
// If memory use is a concern, could try
// .WithValueLogLoadingMode(options.FileIO) // default options.MemoryMap
// .WithMaxTableSize(sz int64); // bytes, default 6MB
// .WithValueLogFileSize(sz int64), bytes, default 1 GB, must be 1MB <= sz <= 1GB
opts := badger.DefaultOptions(filePath).WithLogger(&badgerLoggerWrapper{log})
db, err := badger.Open(opts)
opts := badger.DefaultOptions(db.filePath).WithLogger(&badgerLoggerWrapper{db.log})
var err error
db.DB, err = badger.Open(opts)
if err == badger.ErrTruncateNeeded {
// Probably a Windows thing.
// https://github.com/dgraph-io/badger/issues/744
log.Warnf("newTxHistoryStore badger db: %v", err)
db.log.Warnf("newTxHistoryStore badger db: %v", err)
// Try again with value log truncation enabled.
opts.Truncate = true
log.Warnf("Attempting to reopen badger DB with the Truncate option set...")
db, err = badger.Open(opts)
db.log.Warnf("Attempting to reopen badger DB with the Truncate option set...")
db.DB, err = badger.Open(opts)
}
if err != nil {
return nil, err
}

seq, err := db.GetSequence([]byte("seq"), 10)
db.ctx = ctx
db.seq, err = db.GetSequence([]byte("seq"), 10)
if err != nil {
return nil, err
}

ctx, die := context.WithCancel(context.Background())
db.running.Store(true)

badgerDB := &BadgerTxDB{
DB: db,
log: log,
seq: seq,
wg: new(sync.WaitGroup),
die: die,
}
badgerDB.running.Store(true)
var wg sync.WaitGroup

badgerDB.wg.Add(1)
wg.Add(1)
go func() {
defer badgerDB.wg.Done()
defer wg.Done()

ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := db.RunValueLogGC(0.5)
if err != nil && !errors.Is(err, badger.ErrNoRewrite) {
log.Errorf("garbage collection error: %v", err)
db.log.Errorf("garbage collection error: %v", err)
}
case <-ctx.Done():
db.running.Store(false)
db.wg.Wait()
db.Close()
return
}
}
}()

return badgerDB, nil
return &wg, nil
}

// badgerDB returns ErrConflict when a read happening in a update (read/write)
// transaction is stale. This function retries updates multiple times in
// case of conflicts.
func handleConflictWithBackoff(update func() error) error {
func (db *BadgerTxDB) handleConflictWithBackoff(update func() error) error {
maxRetries := 10
sleepTime := 5 * time.Millisecond

Expand All @@ -175,24 +180,21 @@ func handleConflictWithBackoff(update func() error) error {
if err != badger.ErrConflict {
return err
}
time.Sleep(10 * time.Millisecond)
time.Sleep(sleepTime)
}

return err
}

func (db *BadgerTxDB) newBlockKey(txn *badger.Txn, blockNumber uint64) ([]byte, error) {
getKey := func(i uint64) []byte {
if blockNumber == 0 {
return pendingKey(i)
}
return blockKey(blockNumber, i)
}
func (db *BadgerTxDB) newBlockKey(blockNumber uint64) ([]byte, error) {
seq, err := db.seq.Next()
if err != nil {
return nil, err
}
return getKey(seq), nil
if blockNumber == 0 {
return pendingKey(seq), nil
}
return blockKey(blockNumber, seq), nil
}

func hasPrefix(b, prefix []byte) bool {
Expand Down Expand Up @@ -233,7 +235,7 @@ func (db *BadgerTxDB) storeTx(tx *ExtendedWalletTx) error {
}

if key == nil {
key, err = db.newBlockKey(txn, tx.BlockNumber)
key, err = db.newBlockKey(tx.BlockNumber)
if err != nil {
return err
}
Expand Down Expand Up @@ -261,7 +263,7 @@ func (db *BadgerTxDB) StoreTx(tx *ExtendedWalletTx) error {
return fmt.Errorf("database is not running")
}

return handleConflictWithBackoff(func() error { return db.storeTx(tx) })
return db.handleConflictWithBackoff(func() error { return db.storeTx(tx) })
}

func (db *BadgerTxDB) markTxAsSubmitted(txID string) error {
Expand Down Expand Up @@ -313,7 +315,7 @@ func (db *BadgerTxDB) MarkTxAsSubmitted(txID string) error {
return fmt.Errorf("database is not running")
}

return handleConflictWithBackoff(func() error { return db.markTxAsSubmitted(txID) })
return db.handleConflictWithBackoff(func() error { return db.markTxAsSubmitted(txID) })
}

// GetTxs retrieves n transactions from the database. refID optionally
Expand Down Expand Up @@ -472,7 +474,7 @@ func (db *BadgerTxDB) RemoveTx(txID string) error {
return fmt.Errorf("database is not running")
}

return handleConflictWithBackoff(func() error { return db.removeTx(txID) })
return db.handleConflictWithBackoff(func() error { return db.removeTx(txID) })
}

func (db *BadgerTxDB) setLastReceiveTxQuery(block uint64) error {
Expand All @@ -495,7 +497,7 @@ func (db *BadgerTxDB) SetLastReceiveTxQuery(block uint64) error {
return fmt.Errorf("database is not running")
}

return handleConflictWithBackoff(func() error { return db.setLastReceiveTxQuery(block) })
return db.handleConflictWithBackoff(func() error { return db.setLastReceiveTxQuery(block) })
}

const ErrNeverQueried = dex.ErrorKind("never queried")
Expand Down Expand Up @@ -527,11 +529,3 @@ func (db *BadgerTxDB) GetLastReceiveTxQuery() (uint64, error) {
})
return block, err
}

// Close closes the database.
func (db *BadgerTxDB) Close() error {
db.running.Store(false)
db.die()
db.wg.Wait()
return db.DB.Close()
}

0 comments on commit 8c8fbf5

Please sign in to comment.