Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend peersyncing to sync 1-to-1 messages #4962

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2431,7 +2431,6 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
syncMessageType = peersyncing.SyncMessageCommunityType
} else if chat.PrivateGroupChat() {
syncMessageType = peersyncing.SyncMessagePrivateGroup

}

wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
Expand All @@ -2442,7 +2441,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
syncMessage := peersyncing.SyncMessage{
Type: syncMessageType,
ID: types.Hex2Bytes(rawMessage.ID),
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Payload: wrappedMessage,
Timestamp: m.transport.GetCurrentTime() / 1000,
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2457,7 +2457,7 @@ func (m *Messenger) addPeersyncingMessage(chat *Chat, msg *v1protocol.StatusMess
syncMessage := peersyncing.SyncMessage{
Type: syncMessageType,
ID: msg.ApplicationLayer.ID,
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Payload: msg.EncryptionLayer.Payload,
Timestamp: uint64(msg.TransportLayer.Message.Timestamp),
}
Expand Down
92 changes: 71 additions & 21 deletions protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,37 +118,47 @@ func (m *Messenger) sendDatasyncOffers() error {
return nil
}

communities, err := m.communitiesManager.Joined()
err = m.sendDatasyncOffersForCommunities()
if err != nil {
return err
}

for _, community := range communities {
err = m.sendDatasyncOffersForChats()
if err != nil {
return err
}

// Check all the group ids that need to be on offer
vitvly marked this conversation as resolved.
Show resolved Hide resolved
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}

func (m *Messenger) sendDatasyncOffersForCommunities() error {
joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return err
}

for _, community := range joinedCommunities {
var chatIDs [][]byte
for id := range community.Chats() {
chatIDs = append(chatIDs, []byte(community.IDString()+id))
}

if len(chatIDs) == 0 {
continue
}

availableMessages, err := m.peersyncing.AvailableMessagesByGroupIDs(chatIDs, maxAdvertiseMessages)
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs(chatIDs, maxAdvertiseMessages)
if err != nil {
return err
}
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
groupID := types.Bytes2Hex(m.GroupID)
availableMessagesMap[groupID] = append(availableMessagesMap[groupID], m.ID)
}

datasyncMessage := &datasyncproto.Payload{}
if len(availableMessages) == 0 {
if len(availableMessagesMap) == 0 {
continue
}
for groupID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(groupID), MessageIds: m})
for chatID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
Expand All @@ -164,12 +174,43 @@ func (m *Messenger) sendDatasyncOffers() error {
if err != nil {
return err
}
}
return nil
}

func (m *Messenger) sendDatasyncOffersForChats() error {
for _, chat := range m.Chats() {
chatIDBytes := []byte(chat.ID)
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs([][]byte{chatIDBytes}, maxAdvertiseMessages)
if err != nil {
return err
}
datasyncMessage := &datasyncproto.Payload{}
if len(availableMessagesMap) == 0 {
continue
}
for _, message := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: chatIDBytes, MessageIds: message})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
return err
}

publicKey, err := chat.PublicKey()
if err != nil {
return err
}
rawMessage := common.RawMessage{
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
}
_, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage)
if err != nil {
return err
}
}
// Check all the group ids that need to be on offer
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}

Expand All @@ -188,7 +229,7 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
var offeredMessages []peersyncing.SyncMessage

for _, o := range offers {
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{GroupID: o.GroupID, ID: o.MessageID})
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{ChatID: o.GroupID, ID: o.MessageID})
}

messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages)
Expand Down Expand Up @@ -235,7 +276,7 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
switch message.Type {
case peersyncing.SyncMessageCommunityType:
chat, ok := m.allChats.Load(string(message.GroupID))
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
Expand All @@ -245,7 +286,12 @@ func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ec
}

return m.canSyncCommunityMessageWith(chat, community, peer)

case peersyncing.SyncMessageOneToOneType:
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
return m.canSyncOneToOneMessageWith(chat, peer)
default:
return false, nil
}
Expand All @@ -258,6 +304,10 @@ func (m *Messenger) canSyncCommunityMessageWith(chat *Chat, community *communiti
return community.IsMemberInChat(peer, chat.CommunityChatID()), nil
}

func (m *Messenger) canSyncOneToOneMessageWith(chat *Chat, peer *ecdsa.PublicKey) (bool, error) {
return chat.HasMember(common.PubkeyToHex(peer)), nil
}

func (m *Messenger) OnDatasyncRequests(requester *ecdsa.PublicKey, messageIDs [][]byte) error {
if len(messageIDs) == 0 {
return nil
Expand Down
98 changes: 97 additions & 1 deletion protocol/messenger_peersyncing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package protocol

import (
"context"
"encoding/hex"
"testing"
"time"

"github.com/stretchr/testify/suite"
"go.uber.org/zap"

gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
Expand Down Expand Up @@ -243,7 +245,7 @@ func (s *MessengerPeersyncingSuite) TestCanSyncMessageWith() {

syncMessage := peersyncing.SyncMessage{
ID: []byte("test-id"),
GroupID: []byte(chat.ID),
ChatID: []byte(chat.ID),
Type: peersyncing.SyncMessageCommunityType,
Payload: []byte("some-payload"),
Timestamp: 1,
Expand All @@ -261,3 +263,97 @@ func (s *MessengerPeersyncingSuite) TestCanSyncMessageWith() {
s.Require().NoError(err)
s.Require().True(canSyncWithAlice)
}

func (s *MessengerPeersyncingSuite) TestSyncOneToOne() {
s.alice.featureFlags.Peersyncing = true
s.owner.featureFlags.Peersyncing = true

pkString := hex.EncodeToString(crypto.FromECDSAPub(&s.alice.identity.PublicKey))
chat := CreateOneToOneChat(pkString, &s.alice.identity.PublicKey, s.owner.transport)

chat.LastClockValue = uint64(100000000000000)
err := s.owner.SaveChat(chat)
s.NoError(err)
_, err = s.alice.Join(chat)
s.NoError(err)

chatID := chat.ID
inputMessage := common.NewMessage()
inputMessage.ChatId = chatID
inputMessage.ContentType = protobuf.ChatMessage_TEXT_PLAIN
inputMessage.Text = "some text"

ctx := context.Background()

// Send message, it should be received
response, err := s.alice.SendChatMessage(ctx, inputMessage)
s.Require().NoError(err)
s.Require().Len(response.Messages(), 1)
messageID := response.Messages()[0].ID

// Make sure the message makes it to the owner
response, err = WaitOnMessengerResponse(
s.owner,
func(r *MessengerResponse) bool {
return len(r.Messages()) == 1 && r.Messages()[0].ID == messageID
},
"message not received",
)
s.Require().NoError(err)
s.Require().NotNil(response)

msg, err := s.owner.peersyncing.AvailableMessages()
s.Require().NoError(err)
s.Require().Len(msg, 1)

// Alice should now send an offer
_, err = WaitOnMessengerResponse(
s.alice,
func(r *MessengerResponse) bool {
return s.alice.peersyncingOffers[messageID[2:]] != 0
},
"offer not sent",
)
s.Require().NoError(err)

// Owner should now reply to the offer
_, err = WaitOnMessengerResponse(
s.owner,
func(r *MessengerResponse) bool {
return s.owner.peersyncingRequests[s.alice.myHexIdentity()+messageID[2:]] != 0
},
"request not sent",
)
s.Require().NoError(err)
}

func (s *MessengerPeersyncingSuite) TestCanSyncOneToOneMessageWith() {
s.alice.featureFlags.Peersyncing = true
s.owner.featureFlags.Peersyncing = true

pkString := hex.EncodeToString(crypto.FromECDSAPub(&s.alice.identity.PublicKey))
chat := CreateOneToOneChat(pkString, &s.alice.identity.PublicKey, s.owner.transport)

chat.LastClockValue = uint64(100000000000000)
err := s.owner.SaveChat(chat)
s.NoError(err)
_, err = s.alice.Join(chat)
s.NoError(err)

syncMessage := peersyncing.SyncMessage{
ID: []byte("test-id"),
ChatID: []byte(chat.ID),
Type: peersyncing.SyncMessageOneToOneType,
Payload: []byte("some-payload"),
Timestamp: chat.LastClockValue,
}
s.Require().NoError(s.owner.peersyncing.Add(syncMessage))

canSyncWithBob, err := s.owner.canSyncOneToOneMessageWith(chat, &s.bob.identity.PublicKey)
s.Require().NoError(err)
s.Require().False(canSyncWithBob)

canSyncWithAlice, err := s.owner.canSyncOneToOneMessageWith(chat, &s.alice.identity.PublicKey)
s.Require().NoError(err)
s.Require().True(canSyncWithAlice)
}
36 changes: 31 additions & 5 deletions protocol/migrations/migrations.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP INDEX IF EXISTS peersyncing_messages_timestamp;

ALTER TABLE peersyncing_messages RENAME COLUMN group_id TO chat_id;

CREATE INDEX peersyncing_messages_timestamp ON peersyncing_messages(chat_id, timestamp);