Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 30 additions & 37 deletions messaging/common/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ const (
var RekeyCompatibility = true

type MessageSender struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
datasyncPersistence datasyncnode.Persistence
transport *transport.Transport
protocol *encryption.Protocol
logger *zap.Logger
persistence messagingtypes.MessageSenderPersistence
segmentationPersistence segmentation.Persistence
publisher *pubsub.Publisher
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
datasyncPersistence datasyncnode.Persistence
transport *transport.Transport
segmenter *segmentation.Segmenter
protocol *encryption.Protocol
logger *zap.Logger
persistence messagingtypes.MessageSenderPersistence
publisher *pubsub.Publisher

datasyncEnabled bool

Expand All @@ -79,16 +79,16 @@ func NewMessageSender(
logger *zap.Logger,
) (*MessageSender, error) {
p := &MessageSender{
identity: identity,
datasyncPersistence: datasyncPersistence,
datasyncEnabled: true, // FIXME
protocol: enc,
persistence: persistence,
segmentationPersistence: segmentationPersistence,
publisher: pubsub.NewPublisher(),
transport: transport,
logger: logger,
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
identity: identity,
datasyncPersistence: datasyncPersistence,
datasyncEnabled: true, // FIXME
transport: transport,
segmenter: segmentation.NewSegmenter(segmentationPersistence, logger),
protocol: enc,
persistence: persistence,
publisher: pubsub.NewPublisher(),
logger: logger,
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
}

return p, nil
Expand Down Expand Up @@ -837,15 +837,13 @@ func (s *MessageSender) HandleMessages(msg *messagingtypes.ReceivedMessage) (*me
return nil, s.persistence.SaveHashRatchetMessage(info.GroupID, info.KeyID, msg)
}

// The current message segment has been successfully retrieved.
// However, the collection of segments is not yet complete.
if err == ErrMessageSegmentsIncomplete {
return nil, nil
}

return nil, err
}

if response == nil {
return nil, nil
}

// Process queued hash ratchet messages
for _, hashRatchetInfo := range response.Message.EncryptionLayer.HashRatchetInfo {
messages, err := s.persistence.GetHashRatchetMessages(hashRatchetInfo.KeyID)
Expand Down Expand Up @@ -906,8 +904,7 @@ func (h *handleMessageResponse) Messages() []*messagingtypes.Message {
}

func (s *MessageSender) handleMessage(receivedMsg *messagingtypes.ReceivedMessage) (*handleMessageResponse, error) {
logger := s.logger.With(zap.String("site", "handleMessage"))
hlogger := logger.With(zap.String("hash", types.EncodeHex(receivedMsg.Hash)))
hlogger := s.logger.Named("handleMessage").With(zap.String("hash", types.EncodeHex(receivedMsg.Hash)))

message := &messagingtypes.Message{}

Expand All @@ -924,18 +921,14 @@ func (s *MessageSender) handleMessage(receivedMsg *messagingtypes.ReceivedMessag
return nil, err
}

err = s.handleSegmentationLayer(message)
isSegmentMessage, completed, err := s.handleSegmentationLayer(message)
if err != nil {
// Segments not completed yet, stop processing
if err == ErrMessageSegmentsIncomplete {
return nil, err
}
// Segments already completed, stop processing
if err == ErrMessageSegmentsAlreadyCompleted {
return nil, err
}
return nil, err
}

// Not a critical error; message wasn't segmented, proceed with next layers.
// Segments not completed yet, stop processing
if isSegmentMessage && !completed {
return nil, nil
}

err = s.handleEncryptionLayer(context.Background(), message)
Expand Down
80 changes: 80 additions & 0 deletions messaging/common/message_sender_segmentation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package common

import (
"time"

"github.com/jinzhu/copier"
"go.uber.org/zap"

"github.com/status-im/status-go/messaging/layers/segmentation"
"github.com/status-im/status-go/messaging/types"
wakutypes "github.com/status-im/status-go/messaging/waku/types"
)

// reducedMaxMessageSize returns the max message size reduced to 3/4 to leave room for segment metadata
func (s *MessageSender) reducedMaxMessageSize() uint32 {
return s.transport.MaxMessageSize() * 3 / 4
}

func (s *MessageSender) segmentMessage(newMessage *wakutypes.NewMessage) ([]*wakutypes.NewMessage, error) {
return s.segmentMessageWithSize(newMessage, int(s.reducedMaxMessageSize()))
}

func (s *MessageSender) segmentMessageWithSize(newMessage *wakutypes.NewMessage, segmentSize int) ([]*wakutypes.NewMessage, error) {
segments, err := s.segmenter.Segment(newMessage.Payload, segmentSize)
if err != nil {
return nil, err
}

replicateMessage := func(payload []byte) (*wakutypes.NewMessage, error) {
copy := &wakutypes.NewMessage{}
err := copier.Copy(copy, newMessage)
if err != nil {
return nil, err
}

copy.Payload = payload
return copy, nil
}

newMessages := make([]*wakutypes.NewMessage, 0, len(segments))
for _, segment := range segments {
segmentMessage, err := replicateMessage(segment)
if err != nil {
return nil, err
}
newMessages = append(newMessages, segmentMessage)
}

s.logger.Debug("message segmented", zap.Int("segments", len(newMessages)))

return newMessages, err
}

// handleSegmentationLayer is capable of reconstructing the message from both complete and partial sets of data segments.
func (s *MessageSender) handleSegmentationLayer(message *types.Message) (segmented, completed bool, err error) {
var reconstructedPayload []byte
reconstructedPayload, err = s.segmenter.Reconstruct(message.TransportLayer.Payload, message.TransportLayer.SigPubKey)

switch err {
case nil:
message.TransportLayer.Payload = reconstructedPayload
segmented = true
completed = true
case segmentation.ErrIncomplete:
segmented = true
completed = false
err = nil
case segmentation.ErrInvalidPayload:
segmented = false
completed = false
err = nil
}

return
}

func (s *MessageSender) CleanupSegments() error {
monthAgo := time.Now().AddDate(0, -1, 0)
return s.segmenter.CleanupStaleSegments(monthAgo)
}
4 changes: 2 additions & 2 deletions messaging/common/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *MessageSenderSuite) TestHandleSegmentMessages() {
wrappedPayload, err := v1protocol.WrapMessageV1(encodedPayload, protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, authorKey)
s.Require().NoError(err)

segmentedMessages, err := segmentMessage(&wakutypes.NewMessage{Payload: wrappedPayload}, int(math.Ceil(float64(len(wrappedPayload))/2)))
segmentedMessages, err := s.sender.segmentMessageWithSize(&wakutypes.NewMessage{Payload: wrappedPayload}, int(math.Ceil(float64(len(wrappedPayload))/2)))
s.Require().NoError(err)
s.Require().Len(segmentedMessages, 2)

Expand All @@ -379,7 +379,7 @@ func (s *MessageSenderSuite) TestHandleSegmentMessages() {

// Receiving another segment after the message has been reassembled is considered an error
_, err = s.sender.HandleMessages(message)
s.Require().ErrorIs(err, ErrMessageSegmentsAlreadyCompleted)
s.Require().ErrorIs(err, segmentation.ErrAlreadyCompleted)
}

func (s *MessageSenderSuite) TestGetEphemeralKey() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package types
package segmentation

import "github.com/status-im/status-go/protocol/protobuf"
import "github.com/status-im/status-go/messaging/layers/segmentation/protobuf"

type SegmentMessage struct {
type Message struct {
*protobuf.SegmentMessage
}

func (s *SegmentMessage) IsValid() bool {
func (s *Message) IsValid() bool {
// Check if the hash length is valid (32 bytes for Keccak256)
if len(s.EntireMessageHash) != 32 {
return false
Expand All @@ -25,6 +25,6 @@ func (s *SegmentMessage) IsValid() bool {
return s.SegmentsCount >= 2 || s.ParitySegmentsCount > 0
}

func (s *SegmentMessage) IsParityMessage() bool {
func (s *Message) IsParityMessage() bool {
return s.SegmentsCount == 0 && s.ParitySegmentsCount > 0
}
6 changes: 2 additions & 4 deletions messaging/layers/segmentation/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package segmentation

import (
"crypto/ecdsa"

"github.com/status-im/status-go/messaging/types"
)

type Persistence interface {
IsMessageAlreadyCompleted(hash []byte) (bool, error)
SaveMessageSegment(segment *types.SegmentMessage, sigPubKey *ecdsa.PublicKey, timestamp int64) error
GetMessageSegments(hash []byte, sigPubKey *ecdsa.PublicKey) ([]*types.SegmentMessage, error)
SaveMessageSegment(segment *Message, sigPubKey *ecdsa.PublicKey, timestamp int64) error
GetMessageSegments(hash []byte, sigPubKey *ecdsa.PublicKey) ([]*Message, error)
CompleteMessageSegments(hash []byte, sigPubKey *ecdsa.PublicKey, timestamp int64) error
RemoveMessageSegmentsOlderThan(timestamp int64) error
RemoveMessageSegmentsCompletedOlderThan(timestamp int64) error
Expand Down
3 changes: 3 additions & 0 deletions messaging/layers/segmentation/protobuf/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package protobuf

//go:generate protoc --go_out=. ./segment_message.proto
Loading
Loading