Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
255a4e0
feat(llc): Add message delivery receipts
xsahil03x Nov 4, 2025
caafe98
fix: Clear delivery candidates on cancel
xsahil03x Nov 4, 2025
46ab6d6
test: fix existing tests
xsahil03x Nov 4, 2025
2e5b853
feat: Add delivery receipts privacy setting
xsahil03x Nov 6, 2025
adbbcfa
fix: Ensure `deliveriesOf` considers already read messages
xsahil03x Nov 7, 2025
b9549bb
feat: Add delivered status to sending indicator
xsahil03x Nov 7, 2025
0b8ca0e
feat(sample): Add message info screen
xsahil03x Nov 7, 2025
b55799c
feat(sample): Add message info screen
xsahil03x Nov 7, 2025
0f943f5
test: Enhance StreamSendingIndicator tests for delivered and read mes…
xsahil03x Nov 7, 2025
6200b73
test: Add tests for delivery receipts privacy settings
xsahil03x Nov 7, 2025
70554a4
test: Add tests for markChannelsDelivered functionality
xsahil03x Nov 7, 2025
dab12df
test: Add tests for ChannelReadHelper functionality
xsahil03x Nov 7, 2025
6ea2c4b
test: Add tests for message delivery and read state preservation
xsahil03x Nov 7, 2025
a75e65c
test: Add delivery submission tests for channel.query
xsahil03x Nov 7, 2025
93b550a
test: Add test for channel delivery submission on new message
xsahil03x Nov 7, 2025
bd84adf
test: Add tests for ChannelDeliveryReporter functionality
xsahil03x Nov 7, 2025
76a5477
chore: fix lints
xsahil03x Nov 7, 2025
18f872b
fix: correct muted user check in message visibility logic
xsahil03x Nov 10, 2025
cb1e1d8
test: Add unit tests for MessageRules functionality
xsahil03x Nov 10, 2025
21a3d3f
Merge branch 'master' into feat/mark-message-delivered
xsahil03x Nov 10, 2025
5a5f7e9
test: add more tests
xsahil03x Nov 10, 2025
050d945
test: update merging logic to preserve last delivered message values
xsahil03x Nov 10, 2025
9332107
feat(persistence): Add support for message delivery receipts
xsahil03x Nov 10, 2025
28f74ee
Merge branch 'master' into feat/mark-message-delivered
xsahil03x Nov 14, 2025
1738b53
Discard changes to packages/stream_chat_persistence/CHANGELOG.md
xsahil03x Nov 14, 2025
95cd62e
chore: update CHANGELOG.md
xsahil03x Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
361 changes: 191 additions & 170 deletions packages/stream_chat/lib/src/client/channel.dart

Large diffs are not rendered by default.

200 changes: 200 additions & 0 deletions packages/stream_chat/lib/src/client/channel_delivery_reporter.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import 'package:logging/logging.dart';
import 'package:rate_limiter/rate_limiter.dart';
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/core/models/message.dart';
import 'package:stream_chat/src/core/models/message_delivery.dart';
import 'package:stream_chat/src/core/util/message_rules.dart';
import 'package:synchronized/synchronized.dart';

/// A callback that sends delivery receipts for multiple channels.
///
/// Each [MessageDeliveryInfo] represents an acknowledgment that the current
/// user has received a message.
typedef MarkChannelsDelivered = Future<void> Function(
Iterable<MessageDelivery> deliveries,
);

/// Manages the delivery reporting for channel messages.
///
/// Collects channels that need delivery acknowledgments and efficiently
/// reports them to the server.
class ChannelDeliveryReporter {
/// Creates a new channel delivery reporter.
///
/// The [onMarkChannelsDelivered] callback is invoked when delivery receipts
/// are ready to be sent to the server.
///
/// The [throttleDuration] controls how frequently delivery receipts are sent.
///
/// The optional [logger] logs warnings and errors during operation.
ChannelDeliveryReporter({
Logger? logger,
required this.onMarkChannelsDelivered,
Duration throttleDuration = const Duration(seconds: 1),
}) : _logger = logger,
_markAsDeliveredThrottleDuration = throttleDuration;

final Logger? _logger;
final Duration _markAsDeliveredThrottleDuration;

/// The callback invoked to send delivery receipts.
///
/// Receives delivery receipts acknowledging that messages were received.
final MarkChannelsDelivered onMarkChannelsDelivered;

final _deliveryCandidatesLock = Lock();
final _deliveryCandidates = <String /* cid */, Message /* message */ >{};

/// Submits [channels] for delivery reporting.
///
/// Marks each channel's last message as delivered if it meets the delivery
/// requirements according to [MessageRules.canMarkAsDelivered]. Channels
/// without a valid cid or last message are skipped.
///
/// Typically used after message.new events or initial channel queries. For
/// read/delivered events see [reconcileDelivery], for hidden/left channels
/// see [cancelDelivery].
Future<void> submitForDelivery(Iterable<Channel> channels) async {
await _deliveryCandidatesLock.synchronized(() {
for (final channel in channels) {
final channelCid = channel.cid;
if (channelCid == null) continue;

final lastMessage = channel.state?.lastMessage;
if (lastMessage == null) continue;

// Only submit for delivery if the message can be marked as delivered.
if (!MessageRules.canMarkAsDelivered(lastMessage, channel)) continue;

_logger?.fine(
'Submitted channel $channelCid for delivery '
'(message: ${lastMessage.id})',
);

// Update the latest message for the channel
_deliveryCandidates[channelCid] = lastMessage;
}
});

// Trigger mark channels delivered request
_throttledMarkCandidatesAsDelivered.call();
}

/// Reconciles delivery reporting for [channels] with their current state.
///
/// Re-evaluates whether messages still need to be marked as delivered based
/// on the channel's current state. Stops tracking messages that are already
/// read, delivered, or otherwise don't need delivery reporting.
///
/// This prevents duplicate delivery reports when a message is marked
/// delivered on another device, and avoids unnecessary reports when a user
/// reads a channel (since read supersedes delivered).
///
/// Typically used after message.read or message.delivered events. See
/// [cancelDelivery] to remove channels entirely, or [submitForDelivery]
/// to add new messages.
///
/// ```dart
/// // After a message.read or message.delivered event
/// reporter.reconcileDelivery([channel]);
/// ```
Future<void> reconcileDelivery(Iterable<Channel> channels) async {
return _deliveryCandidatesLock.synchronized(() {
for (final channel in channels) {
final channelCid = channel.cid;
if (channelCid == null) continue;

// Get the existing candidate message
final message = _deliveryCandidates[channelCid];
if (message == null) continue;

// If the message can still be marked as delivered, keep it
if (MessageRules.canMarkAsDelivered(message, channel)) continue;

_logger?.fine(
'Reconciled delivery for channel $channelCid '
'(message: ${message.id}), removing from candidates',
);

// Otherwise, remove it from the candidates
_deliveryCandidates.remove(channelCid);
}
});
}

/// Cancels pending delivery reports for [channels].
///
/// Prevents the specified channels from being marked as delivered. Typically
/// used when channels are hidden, left, or removed from view.
///
/// See [reconcileDelivery] to re-evaluate based on current read/delivered
/// state instead of removing channels entirely.
Future<void> cancelDelivery(Iterable<String> channels) {
return _deliveryCandidatesLock.synchronized(() {
for (final channelCid in channels) {
if (!_deliveryCandidates.containsKey(channelCid)) continue;

final message = _deliveryCandidates.remove(channelCid);

_logger?.fine(
'Canceled delivery for channel $channelCid '
'(message: ${message?.id})',
);
}
});
}

late final _throttledMarkCandidatesAsDelivered = Throttle(
leading: false,
_markCandidatesAsDelivered,
_markAsDeliveredThrottleDuration,
);

static const _maxCandidatesPerBatch = 100;
Future<void> _markCandidatesAsDelivered() async {
// We only process at-most 100 channels at a time to avoid large payloads.
final batch = {..._deliveryCandidates}.entries.take(_maxCandidatesPerBatch);
final messageDeliveries = batch.map(
(it) => MessageDelivery(channelCid: it.key, messageId: it.value.id),
);

if (messageDeliveries.isEmpty) return;

_logger?.info('Marking ${messageDeliveries.length} channels as delivered');

try {
await onMarkChannelsDelivered(messageDeliveries);

// Clear the successfully delivered candidates. If a channel's message ID
// has changed since we started delivery, keep it for the next batch.
await _deliveryCandidatesLock.synchronized(() {
for (final delivery in messageDeliveries) {
final deliveredChannelCid = delivery.channelCid;
final deliveredMessageId = delivery.messageId;

final currentMessage = _deliveryCandidates[deliveredChannelCid];
// Skip removal if a newer message has been added while we were
// processing the current batch.
if (currentMessage?.id != deliveredMessageId) continue;
_deliveryCandidates.remove(deliveredChannelCid);
}

// Schedule the next batch if there are remaining candidates.
if (_deliveryCandidates.isNotEmpty) {
_throttledMarkCandidatesAsDelivered.call();
}
});
} catch (e, stk) {
_logger?.warning('Failed to mark channels as delivered', e, stk);
}
}

/// Cancels all pending delivery reports.
///
/// Typically used when shutting down the reporter or permanently stopping
/// delivery reporting.
void cancel() {
_throttledMarkCandidatesAsDelivered.cancel();
_deliveryCandidatesLock.synchronized(_deliveryCandidates.clear);
}
}
54 changes: 46 additions & 8 deletions packages/stream_chat/lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stream_chat/src/client/channel.dart';
import 'package:stream_chat/src/client/channel_delivery_reporter.dart';
import 'package:stream_chat/src/client/retry_policy.dart';
import 'package:stream_chat/src/core/api/attachment_file_uploader.dart';
import 'package:stream_chat/src/core/api/requests.dart';
Expand All @@ -27,6 +28,7 @@ import 'package:stream_chat/src/core/models/event.dart';
import 'package:stream_chat/src/core/models/filter.dart';
import 'package:stream_chat/src/core/models/member.dart';
import 'package:stream_chat/src/core/models/message.dart';
import 'package:stream_chat/src/core/models/message_delivery.dart';
import 'package:stream_chat/src/core/models/message_reminder.dart';
import 'package:stream_chat/src/core/models/own_user.dart';
import 'package:stream_chat/src/core/models/poll.dart';
Expand Down Expand Up @@ -228,6 +230,15 @@ class StreamChatClient {

StreamSubscription<List<ConnectionStatus>>? _connectionStatusSubscription;

/// Manages delivery receipt reporting for channel messages.
///
/// Collects and batches delivery receipts to acknowledge message delivery
/// to senders across multiple channels.
late final channelDeliveryReporter = ChannelDeliveryReporter(
logger: detachedLogger('🧾'),
onMarkChannelsDelivered: markChannelsDelivered,
);

final _eventController = PublishSubject<Event>();

/// Stream of [Event] coming from [_ws] connection
Expand Down Expand Up @@ -776,6 +787,8 @@ class StreamChatClient {
logger.info('Got ${res.channels.length} channels from api');

final updateData = _mapChannelStateToChannel(channels);
// Submit delivery report for the channels fetched in this query.
await channelDeliveryReporter.submitForDelivery(updateData.value);

await chatPersistenceClient?.updateChannelQueries(
filter,
Expand Down Expand Up @@ -1661,6 +1674,29 @@ class StreamChatClient {
/// Mark all channels for this user as read
Future<EmptyResponse> markAllRead() => _chatApi.channel.markAllRead();

/// Sends delivery receipts for the latest messages in multiple channels.
///
/// Useful when receiving messages through push notifications where only
/// channel IDs and message IDs are available, without full channel/message
/// objects. For in-app message delivery, use [channelDeliveryReporter]
/// which handles this automatically.
///
/// ```dart
/// // From notification payload
/// final receipt = MessageDeliveryInfo(
/// channelCid: notificationData['channel_id'],
/// messageId: notificationData['message_id'],
/// );
/// await client.markChannelsDelivered([receipt]);
/// ```
///
/// Accepts up to 100 channels per call.
Future<EmptyResponse> markChannelsDelivered(
Iterable<MessageDelivery> deliveries,
) {
return _chatApi.channel.markChannelsDelivered([...deliveries]);
}

/// Send an event to a particular channel
Future<EmptyResponse> sendEvent(
String channelId,
Expand Down Expand Up @@ -2097,6 +2133,9 @@ class StreamChatClient {
Future<void> disconnectUser({bool flushChatPersistence = false}) async {
logger.info('Disconnecting user : ${state.currentUser?.id}');

// Cancelling delivery reporter.
channelDeliveryReporter.cancel();

// closing web-socket connection
closeConnection();

Expand Down Expand Up @@ -2235,10 +2274,12 @@ class ClientState {
void _listenAllChannelsRead() {
_eventsSubscription?.add(
_client.on(EventType.notificationMarkRead).listen((event) {
if (event.cid == null) {
channels.forEach((key, value) {
value.state?.unreadCount = 0;
});
// If a cid is provided, it means it's for a specific channel.
if (event.cid != null) return;

// Update all channels' unread count to 0.
for (final channel in channels.values) {
channel.state?.unreadCount = 0;
}
}),
);
Expand Down Expand Up @@ -2342,10 +2383,7 @@ class ClientState {

/// Adds a list of channels to the current list of cached channels
void addChannels(Map<String, Channel> channelMap) {
final newChannels = {
...channels,
...channelMap,
};
final newChannels = {...channels, ...channelMap};
channels = newChannels;
}

Expand Down
21 changes: 17 additions & 4 deletions packages/stream_chat/lib/src/core/api/channel_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import 'package:stream_chat/src/core/models/channel_state.dart';
import 'package:stream_chat/src/core/models/event.dart';
import 'package:stream_chat/src/core/models/filter.dart';
import 'package:stream_chat/src/core/models/message.dart';
import 'package:stream_chat/src/core/models/message_delivery.dart';

/// Defines the api dedicated to channel operations
class ChannelApi {
Expand Down Expand Up @@ -84,10 +85,7 @@ class ChannelApi {

/// Mark all channels for this user as read
Future<EmptyResponse> markAllRead() async {
final response = await _client.post(
'/channels/read',
data: {},
);
final response = await _client.post('/channels/read', data: {});
return EmptyResponse.fromJson(response.data);
}

Expand Down Expand Up @@ -395,4 +393,19 @@ class ChannelApi {
);
return PartialUpdateMemberResponse.fromJson(response.data);
}

/// Sends delivery receipts for the latest messages in multiple channels.
///
/// Accepts up to 100 channels per call.
Future<EmptyResponse> markChannelsDelivered(
List<MessageDelivery> deliveries,
) async {
final response = await _client.post(
'/channels/delivered',
data: jsonEncode({
'latest_delivered_messages': deliveries,
}),
);
return EmptyResponse.fromJson(response.data);
}
}
4 changes: 4 additions & 0 deletions packages/stream_chat/lib/src/core/models/channel_config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ChannelConfig {
this.skipLastMsgUpdateForSystemMsgs = false,
this.userMessageReminders = false,
this.markMessagesPending = false,
this.deliveryEvents = false,
}) : createdAt = createdAt ?? DateTime.now(),
updatedAt = updatedAt ?? DateTime.now();

Expand Down Expand Up @@ -95,6 +96,9 @@ class ChannelConfig {
/// Whether pending messages are enabled for this channel.
final bool markMessagesPending;

/// Whether delivery events are enabled for this channel.
final bool deliveryEvents;

/// Serialize to json
Map<String, dynamic> toJson() => _$ChannelConfigToJson(this);
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions packages/stream_chat/lib/src/core/models/channel_model.dart
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ extension type const ChannelCapability(String capability) implements String {
/// Ability to receive read events.
static const readEvents = ChannelCapability('read-events');

/// Ability to receive delivery events.
static const deliveryEvents = ChannelCapability('delivery-events');

/// Ability to receive connect events.
static const connectEvents = ChannelCapability('connect-events');

Expand Down
Loading
Loading