Skip to content

Commit

Permalink
dcrwallet: Fix syncer not reconnecting
Browse files Browse the repository at this point in the history
This fixes an issue where the wallet would lose connection to the
underlying dcrd node and never restart it.

This bug was introduced when upgrading to the wallet v3 package.

The RPCSyncer now correctly attempts to maintain the wallet synchronized
until it is signalled to shutdown.
  • Loading branch information
matheusd committed Jan 17, 2020
1 parent 958d2d9 commit 2bbc804
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 33 deletions.
76 changes: 53 additions & 23 deletions lnwallet/dcrwallet/rpcsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package dcrwallet

import (
"context"
"sync"
"time"

"github.com/decred/dcrd/chaincfg/v2"
"github.com/decred/dcrd/rpcclient/v5"
Expand All @@ -13,9 +15,14 @@ import (
// RPCSyncer implements the required methods for synchronizing a DcrWallet
// instance using a full node dcrd backend.
type RPCSyncer struct {
cancel func()
rpcConfig rpcclient.ConnConfig
net *chaincfg.Params

mtx sync.Mutex

// The following fields are protected by mtx.

cancel func()
}

// NewRPCSyncer initializes a new syncer backed by a full dcrd node. It
Expand All @@ -31,42 +38,65 @@ func NewRPCSyncer(rpcConfig rpcclient.ConnConfig, net *chaincfg.Params) (*RPCSyn
// start the syncer backend and begin synchronizing the given wallet.
func (s *RPCSyncer) start(w *DcrWallet) error {

dcrwLog.Debugf("Starting rpc syncer")

// This context will be canceled by `w` once its Stop() method is
// called.
var ctx context.Context
ctx, s.cancel = context.WithCancel(context.Background())

chainRpcOpts := chain.RPCOptions{
Address: s.rpcConfig.Host,
User: s.rpcConfig.User,
Pass: s.rpcConfig.Pass,
CA: s.rpcConfig.Certificates,
}
syncer := chain.NewSyncer(w.wallet, &chainRpcOpts)
syncer.SetCallbacks(&chain.Callbacks{
Synced: w.onRPCSyncerSynced,
})

go func() {
err := syncer.Run(ctx)
dcrwLog.Debugf("RPCsyncer shutting down")

// TODO: convert to errors.Is
if werr, is := err.(*errors.Error); is && werr.Err == context.Canceled {
// This was a graceful shutdown, so ignore the error.
dcrwLog.Debugf("RPCsyncer shutting down")
return
}
for {
// This context will be canceled by `w` once its Stop() method is
// called.
ctx, cancel := context.WithCancel(context.Background())
s.mtx.Lock()
s.cancel = cancel
s.mtx.Unlock()

dcrwLog.Errorf("RPCSyncer error: %v", err)
syncer := chain.NewSyncer(w.wallet, &chainRpcOpts)
syncer.SetCallbacks(&chain.Callbacks{
Synced: w.onRPCSyncerSynced,
})

dcrwLog.Debugf("Starting rpc syncer")
err := syncer.Run(ctx)
w.rpcSyncerFinished()

// TODO: convert to errors.Is
if werr, is := err.(*errors.Error); is && werr.Err == context.Canceled {
// This was a graceful shutdown, so ignore the error.
dcrwLog.Debugf("RPCsyncer shutting down")
return
}
dcrwLog.Errorf("RPCSyncer error: %v", err)

// Backoff for 5 seconds.
select {
case <-ctx.Done():
// Graceful shutdown.
dcrwLog.Debugf("RPCsyncer shutting down")
return
case <-time.After(5 * time.Second):
}

// Clear and call s.cancel() so we don't leak it.
s.mtx.Lock()
s.cancel = nil
s.mtx.Unlock()
cancel()
}
}()

return nil
}

func (s *RPCSyncer) stop() {
dcrwLog.Debugf("RPCSyncer requested shutdown")
s.cancel()
s.mtx.Lock()
if s.cancel != nil {
s.cancel()
s.cancel = nil
}
s.mtx.Unlock()
}
50 changes: 40 additions & 10 deletions lnwallet/dcrwallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ const (
scriptVersion = uint16(0)
)

const (
// The following values are used by atomicWalletSync. Their
// interpretation is the following:
//
// - Unsynced: wallet just started and hasn't performed the first sync
// yet.
// - Synced: wallet is currently synced.
// - LostSync: wallet was synced in the past but lost the connection to
// the network and it's unknown whether it's synced or not.
syncStatusUnsynced uint32 = 0
syncStatusSynced = 1
syncStatusLostSync = 2
)

// DcrWallet is an implementation of the lnwallet.WalletController interface
// backed by an active instance of dcrwallet. At the time of the writing of
// this documentation, this implementation requires a full dcrd node to
Expand All @@ -42,9 +56,12 @@ const (
// wallet has been fully synced.
type DcrWallet struct {
// wallet is an active instance of dcrwallet.
wallet *base.Wallet
loader *walletloader.Loader
atomicWalletSynced uint32 // CAS (synced=1) when wallet syncing complete
wallet *base.Wallet
loader *walletloader.Loader

// atomicWalletSync controls the current sync status of the wallet. It
// MUST be used atomically.
atomicWalletSynced uint32

// syncedChan is a channel that is closed once the wallet has initially
// synced to the network. It is protected by atomicWalletSynced.
Expand Down Expand Up @@ -107,12 +124,13 @@ func New(cfg Config) (*DcrWallet, error) {
}

return &DcrWallet{
cfg: &cfg,
wallet: wallet,
loader: loader,
syncer: syncer,
syncedChan: make(chan struct{}),
netParams: cfg.NetParams,
cfg: &cfg,
wallet: wallet,
loader: loader,
syncer: syncer,
syncedChan: make(chan struct{}),
atomicWalletSynced: syncStatusUnsynced,
netParams: cfg.NetParams,
}, nil
}

Expand Down Expand Up @@ -807,6 +825,12 @@ func (b *DcrWallet) InitialSyncChannel() <-chan struct{} {
func (b *DcrWallet) onRPCSyncerSynced(synced bool) {
dcrwLog.Debug("RPC syncer notified wallet is synced")

if atomic.CompareAndSwapUint32(&b.atomicWalletSynced, syncStatusLostSync, syncStatusSynced) {
// No need to recreate the keyring or close the initial sync
// channel, so just return.
return
}

// Now that the wallet is synced and address discovery has ended, we
// can create the keyring. We can only do this here (after sync)
// because address discovery might upgrade the underlying dcrwallet
Expand All @@ -821,7 +845,13 @@ func (b *DcrWallet) onRPCSyncerSynced(synced bool) {
}

// Signal that the wallet is synced by closing the channel.
if atomic.CompareAndSwapUint32(&b.atomicWalletSynced, 0, 1) {
if atomic.CompareAndSwapUint32(&b.atomicWalletSynced, syncStatusUnsynced, syncStatusSynced) {
close(b.syncedChan)
}
}

func (b *DcrWallet) rpcSyncerFinished() {
// The RPC syncer stopped, so if we were previously synced we need to
// signal that we aren't anymore.
atomic.CompareAndSwapUint32(&b.atomicWalletSynced, syncStatusSynced, syncStatusLostSync)
}

0 comments on commit 2bbc804

Please sign in to comment.