Skip to content

Commit ed12a7d

Browse files
xsahil03xclaude
andauthored
fix(llc, core): coalesce queryChannels on flaky-network reconnects (#2652)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1139f35 commit ed12a7d

9 files changed

Lines changed: 508 additions & 78 deletions

File tree

packages/stream_chat/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
- Fixed `StreamChatClient.queryDrafts` not forwarding the `filter` argument to the API.
66

7+
- Coalesced concurrent `queryChannels` calls with identical parameters via an in-flight cache.
8+
Rapid reconnect bursts no longer fire multiple duplicate requests in parallel.
9+
710
✅ Added
811

912
- Added `SortedListX` and `ListX` extensions on `List` for keyed merge, sorted insert/upsert and
@@ -13,6 +16,10 @@
1316
requested number. No-op when the channel isn't up to date. Prefer `StreamChannel.pruneOldest`
1417
from `stream_chat_flutter_core` when available — it also resets top-pagination.
1518

19+
- Added `StreamChatClient.recoverStateOnReconnect` setter (default `true`). Controls whether the
20+
client re-queries active channels on WebSocket reconnect. Set to `false` if your app handles its
21+
own state recovery — e.g. via channel-list controllers.
22+
1623
🚀 Performance
1724

1825
- Faster channel state updates, especially for read receipts, reactions, and other partial-state

packages/stream_chat/lib/src/client/client.dart

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import 'package:stream_chat/src/core/models/poll_vote.dart';
3737
import 'package:stream_chat/src/core/models/push_preference.dart';
3838
import 'package:stream_chat/src/core/models/thread.dart';
3939
import 'package:stream_chat/src/core/models/user.dart';
40+
import 'package:stream_chat/src/core/util/in_flight_cache.dart';
4041
import 'package:stream_chat/src/core/util/utils.dart';
4142
import 'package:stream_chat/src/db/chat_persistence_client.dart';
4243
import 'package:stream_chat/src/event_type.dart';
@@ -88,7 +89,8 @@ class StreamChatClient {
8889
StreamAttachmentFileUploader.new,
8990
Iterable<Interceptor>? chatApiInterceptors,
9091
HttpClientAdapter? httpClientAdapter,
91-
}) {
92+
bool recoverStateOnReconnect = true,
93+
}) : _recoverStateOnReconnect = recoverStateOnReconnect {
9294
logger.info('Initiating new StreamChatClient');
9395

9496
final options = StreamHttpClientOptions(
@@ -192,6 +194,21 @@ class StreamChatClient {
192194
/// The retry policy options getter
193195
RetryPolicy get retryPolicy => _retryPolicy;
194196

197+
/// Whether the client should automatically refresh local state from the
198+
/// server when the WebSocket connection recovers.
199+
///
200+
/// When `true` (default), the client re-queries the active channels on
201+
/// reconnect (capped at 30, ordered by `state.channels.keys`). The set of
202+
/// state recovered on reconnect may grow in the future to cover threads,
203+
/// reminders, etc.
204+
///
205+
/// Setting this to `false` disables that client-level recovery. Consumers
206+
/// that opt out are responsible for refreshing their own state when the
207+
/// [EventType.connectionRecovered] event fires — for example, by re-running
208+
/// their channel list query.
209+
set recoverStateOnReconnect(bool value) => _recoverStateOnReconnect = value;
210+
bool _recoverStateOnReconnect;
211+
195212
/// By default the Chat client will write all messages with level Warn or
196213
/// Error to stdout.
197214
///
@@ -530,13 +547,17 @@ class StreamChatClient {
530547
// connection recovered
531548
final cids = [...state.channels.keys.toSet()];
532549
if (cids.isNotEmpty) {
533-
await queryChannelsOnline(
534-
filter: Filter.in_('cid', cids),
535-
paginationParams: const PaginationParams(limit: 30),
536-
);
537-
538550
// Sync the persistence client if available
539551
if (persistenceEnabled) await sync(cids: cids);
552+
553+
// Recover the channels that were active before the connection was lost,
554+
// only if the client is configured to do so.
555+
if (_recoverStateOnReconnect) {
556+
await queryChannelsOnline(
557+
filter: Filter.in_('cid', cids),
558+
paginationParams: const PaginationParams(limit: 30),
559+
);
560+
}
540561
}
541562

542563
handleEvent(Event(
@@ -613,7 +634,7 @@ class StreamChatClient {
613634
});
614635
}
615636

616-
final _queryChannelsStreams = <String, Future<List<Channel>>>{};
637+
final _queryChannelsCache = InFlightCache<String, List<Channel>>();
617638

618639
/// Requests channels with a given query.
619640
Stream<List<Channel>> queryChannels({
@@ -643,19 +664,7 @@ class StreamChatClient {
643664
paginationParams,
644665
]);
645666

646-
// Return results from cache if available
647-
if (_queryChannelsStreams.containsKey(hash)) {
648-
try {
649-
yield await _queryChannelsStreams[hash]!;
650-
return;
651-
} catch (e, stk) {
652-
logger.severe('Error retrieving cached query results', e, stk);
653-
// Cache is invalid, continue with fresh query
654-
_queryChannelsStreams.remove(hash);
655-
}
656-
}
657-
658-
// Get offline results first
667+
// Per-caller offline emit — local persistence, not coalesced.
659668
var offlineChannels = <Channel>[];
660669
try {
661670
offlineChannels = await queryChannelsOffline(
@@ -671,31 +680,30 @@ class StreamChatClient {
671680
}
672681

673682
try {
674-
final newQueryChannelsFuture = queryChannelsOnline(
675-
filter: filter,
676-
sort: channelStateSort,
677-
state: state,
678-
watch: watch,
679-
presence: presence,
680-
memberLimit: memberLimit,
681-
messageLimit: messageLimit,
682-
paginationParams: paginationParams,
683-
waitForConnect: waitForConnect,
684-
).timeout(
685-
const Duration(seconds: 30),
686-
onTimeout: () {
687-
logger.warning('Online channel query timed out');
688-
throw TimeoutException('Channel query timed out');
689-
},
690-
).whenComplete(() {
691-
// Always clean up cache reference when done
692-
_queryChannelsStreams.remove(hash);
693-
});
694-
695-
// Store the future in cache
696-
_queryChannelsStreams[hash] = newQueryChannelsFuture;
697-
698-
yield await newQueryChannelsFuture;
683+
// Coalesce concurrent identical online queries — concurrent callers
684+
// share both success and failure outcomes. See [InFlightCache] for
685+
// the lifecycle details.
686+
final result = await _queryChannelsCache.run(
687+
hash,
688+
() => queryChannelsOnline(
689+
filter: filter,
690+
sort: channelStateSort,
691+
state: state,
692+
watch: watch,
693+
presence: presence,
694+
memberLimit: memberLimit,
695+
messageLimit: messageLimit,
696+
paginationParams: paginationParams,
697+
waitForConnect: waitForConnect,
698+
).timeout(
699+
const Duration(seconds: 30),
700+
onTimeout: () {
701+
logger.warning('Online channel query timed out');
702+
throw TimeoutException('Channel query timed out');
703+
},
704+
),
705+
);
706+
yield result;
699707
} catch (e, stk) {
700708
logger.severe('Error querying channels online', e, stk);
701709
// Only rethrow if we have no channels to show the user
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import 'dart:async';
2+
3+
/// Coalesces concurrent identical async calls by sharing a single in-flight
4+
/// [Future] keyed by [K].
5+
///
6+
/// When [run] is called and no [Future] is in flight for [key], the [work]
7+
/// closure runs and its [Future] is cached. Concurrent callers passing the
8+
/// same [key] receive the cached [Future] and share its eventual outcome —
9+
/// both success and failure. Sequential callers arriving after the [Future]
10+
/// settles see an empty cache and start fresh.
11+
///
12+
/// Sharing failures is intentional: falling through to a fresh call on
13+
/// error would defeat the dedup precisely when it matters most (e.g., a
14+
/// rate-limit storm), turning one rejected request into N.
15+
///
16+
/// Mirrors the `DistinctChatApi` / `DistinctCall` design used across the
17+
/// Android Stream Chat SDK.
18+
class InFlightCache<K, V> {
19+
final _inFlight = <K, Future<V>>{};
20+
21+
/// Returns the in-flight [Future] for [key] if one exists, otherwise runs
22+
/// [work], caches its [Future], and frees the slot when it settles.
23+
Future<V> run(K key, Future<V> Function() work) {
24+
if (_inFlight[key] case final existing?) return existing;
25+
26+
// [Future.sync] runs [work] synchronously on the current microtask and
27+
// wraps any synchronous throw in a rejected [Future] — so concurrent
28+
// callers in the same tick share the same single invocation.
29+
final future = Future.sync(work);
30+
_inFlight[key] = future;
31+
32+
// The cleanup chain runs regardless of outcome. The trailing [ignore]
33+
// silences unhandled-error reporting on the chained future since
34+
// [whenComplete] forwards the original error rather than handling it.
35+
future.whenComplete(() => _inFlight.remove(key)).ignore();
36+
37+
return future;
38+
}
39+
}

0 commit comments

Comments
 (0)