Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/asset/{dcr,btc}: RPC wallets TX History #2693

Merged
merged 6 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
174 changes: 130 additions & 44 deletions client/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"net/http"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -787,6 +788,7 @@ type baseWallet struct {
localFeeRate func(context.Context, RawRequester, uint64) (uint64, error)
externalFeeRate func(context.Context, dex.Network) (uint64, error)
decodeAddr dexbtc.AddressDecoder
walletDir string

deserializeTx func([]byte) (*wire.MsgTx, error)
serializeTx func(*wire.MsgTx) ([]byte, error)
Expand Down Expand Up @@ -820,8 +822,7 @@ type baseWallet struct {
// txHistoryDB.
receiveTxLastQuery atomic.Uint64

txHistoryDB atomic.Value // *BadgerTxDB
txHistoryDBPath string
txHistoryDB atomic.Value // *BadgerTxDB

ar *AddressRecycler
}
Expand Down Expand Up @@ -1136,7 +1137,9 @@ func newRPCWallet(requester RawRequester, cfg *BTCCloneCFG, parsedCfg *RPCWallet
baseWallet: btc,
txFeeEstimator: node,
tipRedeemer: node,
txLister: node,
}

w.prepareRedemptionFinder()
return w, nil
}
Expand Down Expand Up @@ -1254,7 +1257,7 @@ func newUnconnectedWallet(cfg *BTCCloneCFG, walletCfg *WalletConfig) (*baseWalle
txVersion: txVersion,
Network: cfg.Network,
pendingTxs: make(map[chainhash.Hash]ExtendedWalletTx),
txHistoryDBPath: filepath.Join(walletDir, "txhistory.db"),
walletDir: walletDir,
ar: addressRecyler,
}
w.cfgV.Store(baseCfg)
Expand Down Expand Up @@ -1296,12 +1299,6 @@ func OpenSPVWallet(cfg *BTCCloneCFG, walletConstructor BTCWalletConstructor) (*E
return nil, err
}

txHistoryDB, err := NewBadgerTxDB(btc.txHistoryDBPath, btc.log.SubLogger("TXHISTORYDB"))
if err != nil {
return nil, fmt.Errorf("failed to create tx history db: %v", err)
}
btc.txHistoryDB.Store(txHistoryDB)

spvw := &spvWallet{
chainParams: cfg.ChainParams,
cfg: walletCfg,
Expand Down Expand Up @@ -1392,6 +1389,113 @@ func (btc *baseWallet) Info() *asset.WalletInfo {
return btc.walletInfo
}

func (btc *baseWallet) txHistoryDBPath(walletID string) string {
return filepath.Join(btc.walletDir, fmt.Sprintf("txhistorydb-%s", walletID))
}

// findExistingAddressBasedTxHistoryDB finds the path of a tx history db that
// was created using an address controlled by the wallet. This should only be
// used for wallets that are unable to generate a fingerprint.
func (btc *baseWallet) findExistingAddressBasedTxHistoryDB() (string, error) {
dir, err := os.Open(btc.walletDir)
if err != nil {
return "", fmt.Errorf("error opening wallet directory: %w", err)
}
defer dir.Close()

entries, err := dir.Readdir(0)
if err != nil {
return "", fmt.Errorf("error reading wallet directory: %w", err)
}

pattern := regexp.MustCompile(`^txhistorydb-(.+)$`)

for _, entry := range entries {
if !entry.IsDir() {
continue
}
buck54321 marked this conversation as resolved.
Show resolved Hide resolved

match := pattern.FindStringSubmatch(entry.Name())
if match == nil {
continue
}

address := match[1]
owns, err := btc.OwnsDepositAddress(address)
if err != nil {
continue
}
if owns {
return filepath.Join(btc.walletDir, entry.Name()), nil
}
}

return "", nil
}

func (btc *baseWallet) startTxHistoryDB(ctx context.Context) (*sync.WaitGroup, error) {
var dbPath string
fingerPrint, err := btc.node.fingerprint()
if err == nil && fingerPrint != "" {
dbPath = btc.txHistoryDBPath(fingerPrint)
}

if dbPath == "" {
addressPath, err := btc.findExistingAddressBasedTxHistoryDB()
if err != nil {
return nil, err
}
if addressPath != "" {
dbPath = addressPath
}
}

if dbPath == "" {
depositAddr, err := btc.DepositAddress()
if err != nil {
return nil, fmt.Errorf("error getting deposit address: %w", err)
}
dbPath = btc.txHistoryDBPath(depositAddr)
}

btc.log.Debugf("Using tx history db at %s", dbPath)

db := NewBadgerTxDB(dbPath, btc.log)
btc.txHistoryDB.Store(db)

wg, err := db.Connect(ctx)
if err != nil {
return nil, err
}

pendingTxs, err := db.GetPendingTxs()
if err != nil {
return nil, fmt.Errorf("failed to load unconfirmed txs: %v", err)
}

btc.pendingTxsMtx.Lock()
for _, tx := range pendingTxs {
txHash, err := chainhash.NewHashFromStr(tx.ID)
if err != nil {
btc.log.Errorf("Invalid txid %v from tx history db: %v", tx.ID, err)
continue
}
btc.pendingTxs[*txHash] = *tx
}
btc.pendingTxsMtx.Unlock()

lastQuery, err := db.GetLastReceiveTxQuery()
if errors.Is(err, ErrNeverQueried) {
lastQuery = 0
} else if err != nil {
return nil, fmt.Errorf("failed to load last query time: %v", err)
}

btc.receiveTxLastQuery.Store(lastQuery)

return wg, nil
}

// connect is shared between Wallet implementations that may have different
// monitoring goroutines or other configuration set after connect. For example
// an asset.Wallet implementation that embeds baseWallet may override Connect to
Expand All @@ -1403,6 +1507,7 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) {
if err := btc.node.connect(ctx, &wg); err != nil {
return nil, err
}

// Initialize the best block.
bestBlockHdr, err := btc.node.getBestBlockHeader()
if err != nil {
Expand All @@ -1425,39 +1530,6 @@ func (btc *baseWallet) connect(ctx context.Context) (*sync.WaitGroup, error) {
btc.tipMtx.Unlock()
atomic.StoreInt64(&btc.tipAtConnect, btc.currentTip.Height)

if txHistoryDB := btc.txDB(); txHistoryDB != nil {
pendingTxs, err := txHistoryDB.GetPendingTxs()
if err != nil {
return nil, fmt.Errorf("failed to load unconfirmed txs: %v", err)
}

btc.pendingTxsMtx.Lock()
for _, tx := range pendingTxs {
txHash, err := chainhash.NewHashFromStr(tx.ID)
if err != nil {
btc.log.Errorf("Invalid txid %v from tx history db: %v", tx.ID, err)
continue
}
btc.pendingTxs[*txHash] = *tx
}
btc.pendingTxsMtx.Unlock()

lastQuery, err := txHistoryDB.GetLastReceiveTxQuery()
if errors.Is(err, ErrNeverQueried) {
lastQuery = 0
} else if err != nil {
return nil, fmt.Errorf("failed to load last query time: %v", err)
}

btc.receiveTxLastQuery.Store(lastQuery)

wg.Add(1)
go func() {
defer wg.Done()
txHistoryDB.Run(ctx)
}()
}

wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -1476,17 +1548,30 @@ func (btc *intermediaryWallet) Connect(ctx context.Context) (*sync.WaitGroup, er
return nil, err
}

dbWG, err := btc.startTxHistoryDB(ctx)
if err != nil {
return nil, err
}

wg.Add(1)
go func() {
defer wg.Done()
dbWG.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
btc.watchBlocks(ctx)
btc.rf.CancelRedemptionSearches()
}()

wg.Add(1)
go func() {
defer wg.Done()
btc.monitorPeers(ctx)
}()

return wg, nil
}

Expand Down Expand Up @@ -5365,6 +5450,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
} else {
blockToQuery = tip - blockQueryBuffer
}

recentTxs, err := btc.txLister.listTransactionsSinceBlock(int32(blockToQuery))
if err != nil {
btc.log.Errorf("Error listing transactions since block %d: %v", blockToQuery, err)
Expand Down Expand Up @@ -5495,7 +5581,7 @@ func (btc *intermediaryWallet) checkPendingTxs(tip uint64) {
// WalletTransaction returns a transaction that either the wallet has made or
// one in which the wallet has received funds. The txID can be either a byte
// reversed tx hash or a hex encoded coin ID.
func (btc *ExchangeWalletSPV) WalletTransaction(ctx context.Context, txID string) (*asset.WalletTransaction, error) {
func (btc *intermediaryWallet) WalletTransaction(ctx context.Context, txID string) (*asset.WalletTransaction, error) {
coinID, err := hex.DecodeString(txID)
if err == nil {
txHash, _, err := decodeCoinID(coinID)
Expand All @@ -5520,7 +5606,7 @@ func (btc *ExchangeWalletSPV) WalletTransaction(ctx context.Context, txID string
// If past is true, the transactions prior to the refID are returned, otherwise
// the transactions after the refID are returned. n is the number of
// transactions to return. If n is <= 0, all the transactions will be returned.
func (btc *ExchangeWalletSPV) TxHistory(n int, refID *string, past bool) ([]*asset.WalletTransaction, error) {
func (btc *intermediaryWallet) TxHistory(n int, refID *string, past bool) ([]*asset.WalletTransaction, error) {
txHistoryDB := btc.txDB()
if txHistoryDB == nil {
return nil, fmt.Errorf("tx database not initialized")
Expand Down
4 changes: 4 additions & 0 deletions client/asset/btc/electrum_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,10 @@ func (ew *electrumWallet) getWalletTransaction(txHash *chainhash.Hash) (*GetTran
}, nil
}

func (ew *electrumWallet) fingerprint() (string, error) {
return "", fmt.Errorf("fingerprint not implemented")
}

// part of the walletTxChecker interface
func (ew *electrumWallet) swapConfirmations(txHash *chainhash.Hash, vout uint32, contract []byte, startTime time.Time) (confs uint32, spent bool, err error) {
// To determine if it is spent, we need the address of the output.
Expand Down
26 changes: 26 additions & 0 deletions client/asset/btc/rpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
methodGetNetworkInfo = "getnetworkinfo"
methodGetBlockchainInfo = "getblockchaininfo"
methodFundRawTransaction = "fundrawtransaction"
methodListSinceBlock = "listsinceblock"
)

// IsTxNotFoundErr will return true if the error indicates that the requested
Expand Down Expand Up @@ -638,6 +639,16 @@ func (wc *rpcClient) listDescriptors(private bool) (*listDescriptorsResult, erro
return descriptors, wc.call(methodListDescriptors, anylist{private}, descriptors)
}

func (wc *rpcClient) listTransactionsSinceBlock(blockHeight int32) ([]btcjson.ListTransactionsResult, error) {
blockHash, err := wc.getBlockHash(int64(blockHeight))
if err != nil {
return nil, fmt.Errorf("getBlockHash error: %w", err)
}

result := new(listTransactionsResult)
return result.Transactions, wc.call(methodListSinceBlock, anylist{blockHash.String()}, result)
martonp marked this conversation as resolved.
Show resolved Hide resolved
}

// privKeyForAddress retrieves the private key associated with the specified
// address.
func (wc *rpcClient) privKeyForAddress(addr string) (*btcec.PrivateKey, error) {
Expand Down Expand Up @@ -910,6 +921,21 @@ func (wc *rpcClient) GetWalletInfo() (*GetWalletInfoResult, error) {
return wi, wc.call(methodGetWalletInfo, nil, wi)
}

// fingerprint returns an identifier for this wallet. Only HD wallets will have
// an identifier. Descriptor wallets will not.
func (wc *rpcClient) fingerprint() (string, error) {
walletInfo, err := wc.GetWalletInfo()
if err != nil {
return "", err
}

if walletInfo.HdSeedID == "" {
return "", fmt.Errorf("fingerprint not availble")
}

return walletInfo.HdSeedID, nil
}

// GetAddressInfo gets information about the given address by calling
// getaddressinfo RPC command.
func (wc *rpcClient) getAddressInfo(addr btcutil.Address, method string) (*GetAddressInfoResult, error) {
Expand Down
20 changes: 20 additions & 0 deletions client/asset/btc/spv_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,26 @@ func (w *spvWallet) numDerivedAddresses() (internal, external uint32, err error)
return props.InternalKeyCount, props.ExternalKeyCount, nil
}

// fingerprint returns an identifier for this wallet. It is the hash of the
// compressed serialization of the account pub key.
func (w *spvWallet) fingerprint() (string, error) {
props, err := w.wallet.AccountProperties(waddrmgr.KeyScopeBIP0084, w.acctNum)
if err != nil {
return "", err
}

if props.AccountPubKey == nil {
return "", fmt.Errorf("no account key available")
}

pk, err := props.AccountPubKey.ECPubKey()
if err != nil {
return "", err
}

return hex.EncodeToString(btcutil.Hash160(pk.SerializeCompressed())), nil
}

// getTxOut finds an unspent transaction output and its number of confirmations.
// To match the behavior of the RPC method, even if an output is found, if it's
// known to be spent, no *wire.TxOut and no error will be returned.
Expand Down