Skip to content

Commit

Permalink
fix(wallet): cleanup token_balances table on account removed
Browse files Browse the repository at this point in the history
Updates #4937
  • Loading branch information
IvanBelyakoff committed Mar 22, 2024
1 parent 0527d60 commit 1586e23
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 13 deletions.
2 changes: 1 addition & 1 deletion protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func NewMessenger(
if c.tokenManager != nil {
managerOptions = append(managerOptions, communities.WithTokenManager(c.tokenManager))
} else if c.rpcClient != nil {
tokenManager := token.NewTokenManager(c.walletDb, c.rpcClient, community.NewManager(database, c.httpServer, nil), c.rpcClient.NetworkManager, database, c.httpServer, nil)
tokenManager := token.NewTokenManager(c.walletDb, c.rpcClient, community.NewManager(database, c.httpServer, nil), c.rpcClient.NetworkManager, database, c.httpServer, nil, nil, nil)
managerOptions = append(managerOptions, communities.WithTokenManager(communities.NewDefaultTokenManager(tokenManager)))
}

Expand Down
4 changes: 3 additions & 1 deletion services/wallet/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func NewService(

communityManager := community.NewManager(db, mediaServer, feed)
balanceCacher := balance.NewCacherWithTTL(5 * time.Minute)
tokenManager := token.NewTokenManager(db, rpcClient, communityManager, rpcClient.NetworkManager, appDB, mediaServer, feed)
tokenManager := token.NewTokenManager(db, rpcClient, communityManager, rpcClient.NetworkManager, appDB, mediaServer, feed, accountFeed, accountsDB)
tokenManager.Start()
savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed)
blockChainState := blockchainstate.NewBlockChainState()
Expand Down Expand Up @@ -262,6 +263,7 @@ func (s *Service) Stop() error {
s.history.Stop()
s.activity.Stop()
s.collectibles.Stop()
s.tokenManager.Stop()
s.started = false
log.Info("wallet stopped")
return nil
Expand Down
110 changes: 99 additions & 11 deletions services/wallet/token/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"github.com/status-im/status-go/contracts/ethscan"
"github.com/status-im/status-go/contracts/ierc20"
eth_node_types "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/server"
"github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/communitytokens"
"github.com/status-im/status-go/services/utils"
"github.com/status-im/status-go/services/wallet/async"
Expand Down Expand Up @@ -109,6 +111,9 @@ type Manager struct {
communityManager *community.Manager
mediaServer *server.MediaServer
walletFeed *event.Feed
accountFeed *event.Feed
accountWatcher *accountsevent.Watcher
accountsDB *accounts.Database

tokens []*Token

Expand All @@ -130,17 +135,7 @@ func mergeTokens(sliceLists [][]*Token) []*Token {
return res
}

func NewTokenManager(
db *sql.DB,
RPCClient *rpc.Client,
communityManager *community.Manager,
networkManager *network.Manager,
appDB *sql.DB,
mediaServer *server.MediaServer,
walletFeed *event.Feed,
) *Manager {
maker, _ := contracts.NewContractMaker(RPCClient)
stores := []store{newUniswapStore(), newDefaultStore()}
func prepareTokens(networkManager *network.Manager, stores []store) []*Token {
tokens := make([]*Token, 0)

networks, err := networkManager.GetAll()
Expand All @@ -163,6 +158,23 @@ func NewTokenManager(

tokens = mergeTokens([][]*Token{tokens, validTokens})
}
return tokens
}

func NewTokenManager(
db *sql.DB,
RPCClient *rpc.Client,
communityManager *community.Manager,
networkManager *network.Manager,
appDB *sql.DB,
mediaServer *server.MediaServer,
walletFeed *event.Feed,
accountFeed *event.Feed,
accountsDB *accounts.Database,
) *Manager {
maker, _ := contracts.NewContractMaker(RPCClient)
stores := []store{newUniswapStore(), newDefaultStore()}
tokens := prepareTokens(networkManager, stores)

return &Manager{
db: db,
Expand All @@ -175,6 +187,32 @@ func NewTokenManager(
tokens: tokens,
mediaServer: mediaServer,
walletFeed: walletFeed,
accountFeed: accountFeed,
accountsDB: accountsDB,
}
}

func (tm *Manager) Start() {
tm.startAccountsWatcher()
}

func (tm *Manager) startAccountsWatcher() {
if tm.accountWatcher != nil {
return
}

tm.accountWatcher = accountsevent.NewWatcher(tm.accountsDB, tm.accountFeed, tm.onAccountsChange)
tm.accountWatcher.Start()
}

func (tm *Manager) Stop() {
tm.stopAccountsWatcher()
}

func (tm *Manager) stopAccountsWatcher() {
if tm.accountWatcher != nil {
tm.accountWatcher.Stop()
tm.accountWatcher = nil
}
}

Expand Down Expand Up @@ -319,6 +357,7 @@ func (tm *Manager) FindOrCreateTokenByAddress(ctx context.Context, chainID uint6
}

func (tm *Manager) MarkAsPreviouslyOwnedToken(token *Token, owner common.Address) (bool, error) {
log.Info("Marking token as previously owned", "token", token, "owner", owner)
if token == nil {
return false, errors.New("token is nil")
}
Expand Down Expand Up @@ -885,3 +924,52 @@ func (tm *Manager) GetTokenHistoricalBalance(account common.Address, chainID uin
}
return &balance, nil
}

func (tm *Manager) GetPreviouslyOwnedTokens() (map[common.Address][]*Token, error) {
tokenMap := make(map[common.Address][]*Token)
rows, err := tm.db.Query("SELECT user_address, token_name, token_symbol, token_address, token_decimals, chain_id FROM token_balances")
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
token := &Token{}
var addressStr, tokenAddressStr string
err := rows.Scan(&addressStr, &token.Name, &token.Symbol, &tokenAddressStr, &token.Decimals, &token.ChainID)
if err != nil {
return nil, err
}
address := common.HexToAddress(addressStr)
if (address == common.Address{}) {
continue
}
token.Address = common.HexToAddress(tokenAddressStr)
if (token.Address == common.Address{}) {
continue
}

if _, ok := tokenMap[address]; !ok {
tokenMap[address] = make([]*Token, 0)
}
tokenMap[address] = append(tokenMap[address], token)
}

return tokenMap, nil
}

func (tm *Manager) removeTokenBalances(account common.Address) error {
_, err := tm.db.Exec("DELETE FROM token_balances WHERE user_address = ?", account.String())
return err
}

func (tm *Manager) onAccountsChange(changedAddresses []common.Address, eventType accountsevent.EventType, currentAddresses []common.Address) {
if eventType == accountsevent.EventTypeRemoved {
for _, account := range changedAddresses {
err := tm.removeTokenBalances(account)
if err != nil {
log.Error("token.Manager: can't remove token balances", "error", err)
}
}
}
}
84 changes: 84 additions & 0 deletions services/wallet/token/token_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
package token

import (
"errors"
"math/big"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
gethrpc "github.com/ethereum/go-ethereum/rpc"

"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/rpc/network"
mediaserver "github.com/status-im/status-go/server"
"github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/services/wallet/community"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
"github.com/status-im/status-go/transactions/fake"
"github.com/status-im/status-go/walletdatabase"
)

Expand Down Expand Up @@ -297,3 +311,73 @@ func TestGetTokenHistoricalBalance(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expectedBalance, balance)
}

func Test_removeTokenBalanceOnEventAccountRemoved(t *testing.T) {
appDB, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
require.NoError(t, err)

walletDB, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
require.NoError(t, err)

accountsDB, err := accounts.NewDB(appDB)
require.NoError(t, err)

address := common.HexToAddress("0x1234")
accountFeed := event.Feed{}
chainID := uint64(1)
txServiceMockCtrl := gomock.NewController(t)
server, _ := fake.NewTestServer(txServiceMockCtrl)
client := gethrpc.DialInProc(server)
rpcClient, _ := rpc.NewClient(client, chainID, params.UpstreamRPCConfig{}, nil, nil)
rpcClient.UpstreamChainID = chainID
nm := network.NewManager(appDB)
mediaServer, err := mediaserver.NewMediaServer(appDB, nil, nil, walletDB)
require.NoError(t, err)

manager := NewTokenManager(walletDB, rpcClient, nil, nm, appDB, mediaServer, nil, &accountFeed, accountsDB)

// Insert balances for address
manager.MarkAsPreviouslyOwnedToken(&Token{
Address: common.HexToAddress("0x1234"),
Symbol: "Dummy",
Decimals: 18,
ChainID: 1,
}, address)

tokenByAddress, err := manager.GetPreviouslyOwnedTokens()
require.NoError(t, err)
require.Len(t, tokenByAddress, 1)

// Start service
manager.startAccountsWatcher()

// Watching accounts must start before sending event.
// To avoid running goroutine immediately and let the controller subscribe first,
// use any delay.
group := sync.WaitGroup{}
group.Add(1)
go func() {
defer group.Done()
time.Sleep(1 * time.Millisecond)

accountFeed.Send(accountsevent.Event{
Type: accountsevent.EventTypeRemoved,
Accounts: []common.Address{address},
})

require.NoError(t, utils.Eventually(func() error {
tokenByAddress, err := manager.GetPreviouslyOwnedTokens()
if err == nil && len(tokenByAddress) == 0 {
return nil
}
return errors.New("Token not removed")
}, 100*time.Millisecond, 10*time.Millisecond))
}()

group.Wait()

// Stop service
txServiceMockCtrl.Finish()
server.Stop()
manager.stopAccountsWatcher()
}

0 comments on commit 1586e23

Please sign in to comment.