Skip to content

Commit

Permalink
enable custom community storenodes (#4532)
Browse files Browse the repository at this point in the history
* enable custom community store nodes

* fix

* fix

* fix

* fix

* cleanup

* fix

* migration

* fix

* cleanup

* fix

* cleanup

* fix

* fix

* cleanup

* message to update the community storenodes

* rename

* fix test

* wait for availability only if global storenode

* fix test

* fix typo

* sync community storenodes

* remove unused

* add tests

* fix imports

* fix todo

* unused

* pr comments

* pr feedback

* revert merge deleted

* fix lint

* fix db and perform ms request

* typo

* fix log

* fix go imports

* refactor handle message

* cleanup public message

* add tests

* fix test

* cleanup test

* fix test

* avoid making one file to big to keep codeclimate from complaining

* fix lint

* revert

* Update protocol/storenodes/database.go

Co-authored-by: richΛrd <info@richardramos.me>

* Update protocol/messenger_mailserver_cycle.go

Co-authored-by: richΛrd <info@richardramos.me>

* PR comment

* fix tx

* proto files

* pr comment

---------

Co-authored-by: richΛrd <info@richardramos.me>
  • Loading branch information
plopezlpz and richard-ramos committed Feb 20, 2024
1 parent 515dbdf commit 3f19972
Show file tree
Hide file tree
Showing 29 changed files with 1,862 additions and 316 deletions.
235 changes: 129 additions & 106 deletions appdatabase/migrations/bindata.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions appdatabase/migrations/sql/1706955596_community_storenodes.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS community_storenodes (
community_id BLOB NOT NULL,
storenode_id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
address VARCHAR NOT NULL,
fleet VARCHAR NOT NULL,
version INT NOT NULL,
clock INT NOT NULL DEFAULT 0,
removed BOOLEAN NOT NULL DEFAULT FALSE,
deleted_at INT NOT NULL DEFAULT 0,
PRIMARY KEY (community_id, storenode_id) -- One to many relationship between communities and storenodes: one community might have multiple storenodes
) WITHOUT ROWID;
10 changes: 8 additions & 2 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/status-im/status-go/protocol/pushnotificationserver"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/sqlite"
"github.com/status-im/status-go/protocol/storenodes"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/protocol/verification"
Expand Down Expand Up @@ -134,6 +135,7 @@ type Messenger struct {
modifiedInstallations *stringBoolMap
installationID string
mailserverCycle mailserverCycle
communityStorenodes *storenodes.CommunityStorenodes
database *sql.DB
multiAccounts *multiaccounts.Database
settings *accounts.Database
Expand Down Expand Up @@ -164,7 +166,7 @@ type Messenger struct {

// TODO(samyoul) Determine if/how the remaining usage of this mutex can be removed
mutex sync.Mutex
mailPeersMutex sync.Mutex
mailPeersMutex sync.RWMutex
handleMessagesMutex sync.Mutex
handleImportMessagesMutex sync.Mutex

Expand Down Expand Up @@ -544,6 +546,7 @@ func NewMessenger(
availabilitySubscriptions: make([]chan struct{}, 0),
},
mailserversDatabase: c.mailserversDatabase,
communityStorenodes: storenodes.NewCommunityStorenodes(storenodes.NewDB(database), logger),
account: c.account,
quit: make(chan struct{}),
ctx: ctx,
Expand Down Expand Up @@ -847,6 +850,10 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
return nil, err
}

if err := m.communityStorenodes.ReloadFromDB(); err != nil {
return nil, err
}

controlledCommunities, err := m.communitiesManager.Controlled()
if err != nil {
return nil, err
Expand Down Expand Up @@ -2206,7 +2213,6 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
return rawMessage, err
}
case ChatTypeCommunityChat:

community, err := m.communitiesManager.GetByIDString(chat.CommunityID)
if err != nil {
return rawMessage, err
Expand Down
48 changes: 47 additions & 1 deletion protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -2216,6 +2216,51 @@ func (m *Messenger) RemovePubsubTopicPrivateKey(topic string) error {
return m.transport.RemovePubsubTopicKey(topic)
}

func (m *Messenger) SetCommunityStorenodes(request *requests.SetCommunityStorenodes) (*MessengerResponse, error) {
if err := request.Validate(); err != nil {
return nil, err
}
community, err := m.communitiesManager.GetByID(request.CommunityID)
if err != nil {
return nil, err
}
if !community.IsControlNode() {
return nil, errors.New("not admin or owner")
}

if err := m.communityStorenodes.UpdateStorenodesInDB(request.CommunityID, request.Storenodes, 0); err != nil {
return nil, err
}
err = m.sendCommunityPublicStorenodesInfo(community, request.Storenodes)
if err != nil {
return nil, err
}
response := &MessengerResponse{
CommunityStorenodes: request.Storenodes,
}
return response, nil
}

func (m *Messenger) GetCommunityStorenodes(communityID types.HexBytes) (*MessengerResponse, error) {
community, err := m.communitiesManager.GetByID(communityID)
if err != nil {
return nil, err
}
if community == nil {
return nil, communities.ErrOrgNotFound
}

snodes, err := m.communityStorenodes.GetStorenodesFromDB(communityID)
if err != nil {
return nil, err
}

response := &MessengerResponse{
CommunityStorenodes: snodes,
}
return response, nil
}

func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error {
defaultFilters := m.DefaultFilters(community)
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats()))
Expand Down Expand Up @@ -3377,7 +3422,8 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
}

// Request possibly missed waku messages for community
_, err = m.syncFiltersFrom(filters, uint32(latestWakuMessageTimestamp))
ms := m.getActiveMailserver(c.ID().String())
_, err = m.syncFiltersFrom(*ms, filters, uint32(latestWakuMessageTimestamp))
if err != nil {
m.communitiesManager.LogStdout("failed to request missing messages", zap.Error(err))
continue
Expand Down
32 changes: 12 additions & 20 deletions protocol/messenger_community_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package protocol

import (
"context"
"crypto/ecdsa"
"errors"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -75,13 +74,7 @@ func (m *Messenger) HandleCommunityPublicShardInfo(state *ReceivedMessageState,
m.logger.Error("HandleCommunityPublicShardInfo failed: ", zap.Error(err), zap.String("communityID", types.EncodeHex(publicShardInfo.CommunityId)))
}

signer, err := recoverCommunityShardInfoSignature(a)
if err != nil {
logError(err)
return err
}

err = m.verifyCommunityPublicShardInfo(publicShardInfo, signer)
err = m.verifyCommunitySignature(a.Payload, a.Signature, publicShardInfo.CommunityId, publicShardInfo.ChainId)
if err != nil {
logError(err)
return err
Expand All @@ -95,26 +88,25 @@ func (m *Messenger) HandleCommunityPublicShardInfo(state *ReceivedMessageState,
return nil
}

func recoverCommunityShardInfoSignature(rawShardInfo *protobuf.CommunityPublicShardInfo) (*ecdsa.PublicKey, error) {
if rawShardInfo.Signature == nil || len(rawShardInfo.Signature) == 0 {
return nil, errors.New("missing shard info signature")
func (m *Messenger) verifyCommunitySignature(payload, signature, communityID []byte, chainID uint64) error {
if len(signature) == 0 {
return errors.New("missing signature")
}

return crypto.SigToPub(crypto.Keccak256(rawShardInfo.Payload), rawShardInfo.Signature)
}

func (m *Messenger) verifyCommunityPublicShardInfo(publicShardInfo *protobuf.PublicShardInfo, signer *ecdsa.PublicKey) error {
pubKeyStr := common.PubkeyToHex(signer)
pubKey, err := crypto.SigToPub(crypto.Keccak256(payload), signature)
if err != nil {
return err
}
pubKeyStr := common.PubkeyToHex(pubKey)

var ownerPublicKey string
if publicShardInfo.ChainId > 0 {
owner, err := m.communitiesManager.SafeGetSignerPubKey(publicShardInfo.ChainId, types.EncodeHex(publicShardInfo.CommunityId))
if chainID > 0 {
owner, err := m.communitiesManager.SafeGetSignerPubKey(chainID, types.EncodeHex(communityID))
if err != nil {
return err
}
ownerPublicKey = owner
} else {
communityPubkey, err := crypto.DecompressPubkey(publicShardInfo.CommunityId)
communityPubkey, err := crypto.DecompressPubkey(communityID)
if err != nil {
return err
}
Expand Down
92 changes: 92 additions & 0 deletions protocol/messenger_community_storenodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package protocol

import (
"context"
"errors"

"github.com/golang/protobuf/proto"
"go.uber.org/zap"

"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/storenodes"
v1protocol "github.com/status-im/status-go/protocol/v1"
)

func (m *Messenger) sendCommunityPublicStorenodesInfo(community *communities.Community, snodes storenodes.Storenodes) error {
if !community.IsControlNode() {
return communities.ErrNotControlNode
}

clock, _ := m.getLastClockWithRelatedChat()
pb := &protobuf.CommunityStorenodes{
Clock: clock,
CommunityId: community.ID(),
Storenodes: snodes.ToProtobuf(),
ChainId: communities.CommunityDescriptionTokenOwnerChainID(community.Description()),
}
snPayload, err := proto.Marshal(pb)
if err != nil {
return err
}
signature, err := crypto.Sign(crypto.Keccak256(snPayload), community.PrivateKey())
if err != nil {
return err
}
signedStorenodesInfo := &protobuf.CommunityPublicStorenodesInfo{
Signature: signature,
Payload: snPayload,
}
signedPayload, err := proto.Marshal(signedStorenodesInfo)
if err != nil {
return err
}

rawMessage := common.RawMessage{
Payload: signedPayload,
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_STORENODES_INFO,
PubsubTopic: community.PubsubTopic(),
}

_, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage)
return err
}

// HandleCommunityPublicStorenodesInfo will process the control message sent by the community owner on updating the community storenodes for his community (sendCommunityPublicStorenodesInfo).
// The message will be received by many peers that are not interested on that community, so if we don't have this community in our DB we just ignore this message.
func (m *Messenger) HandleCommunityPublicStorenodesInfo(state *ReceivedMessageState, a *protobuf.CommunityPublicStorenodesInfo, statusMessage *v1protocol.StatusMessage) error {
sn := &protobuf.CommunityStorenodes{}
err := proto.Unmarshal(a.Payload, sn)
if err != nil {
return err
}
logger := m.logger.Named("HandleCommunityPublicStorenodesInfo").With(zap.String("communityID", types.EncodeHex(sn.CommunityId)))

err = m.verifyCommunitySignature(a.Payload, a.Signature, sn.CommunityId, sn.ChainId)
if err != nil {
logger.Error("failed to verify community signature", zap.Error(err))
return err
}

// verify if we are interested in this control message
_, err = m.communitiesManager.GetByID(sn.CommunityId)
if err != nil {
if errors.Is(err, communities.ErrOrgNotFound) {
logger.Debug("ignoring control message, community not found")
return nil
}
logger.Error("failed get community by id", zap.Error(err))
return err
}

if err := m.communityStorenodes.UpdateStorenodesInDB(sn.CommunityId, storenodes.FromProtobuf(sn.Storenodes, sn.Clock), sn.Clock); err != nil {
logger.Error("failed to update storenodes for community", zap.Error(err))
return err
}
return nil
}
18 changes: 17 additions & 1 deletion protocol/messenger_handlers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 3f19972

Please sign in to comment.