Skip to content

Commit de2f8f3

Browse files
authored
coalesce updates to reduce intermediate updates (#7328)
* coalesce updates to reduce redundant processing in subscription handler Signed-off-by: Huabing Zhao <[email protected]> * retain order Signed-off-by: Huabing Zhao <[email protected]> * keep intermediate delete updates Signed-off-by: Huabing Zhao <[email protected]> * minor wording Signed-off-by: Huabing Zhao <[email protected]> * treat delete as normal operations Signed-off-by: Huabing Zhao <[email protected]> * retain the original order of the last updates for each key Signed-off-by: Huabing Zhao <[email protected]> * address comments Signed-off-by: Huabing Zhao <[email protected]> * fix test Signed-off-by: Huabing Zhao <[email protected]> --------- Signed-off-by: Huabing Zhao <[email protected]>
1 parent c429eac commit de2f8f3

File tree

3 files changed

+108
-3
lines changed

3 files changed

+108
-3
lines changed

internal/message/watchutil.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,41 @@ func HandleSubscription[K comparable, V any](
9999
}
100100
for snapshot := range subscription {
101101
watchableDepth.With(meta.LabelValues()...).Record(float64(len(subscription)))
102-
for _, update := range snapshot.Updates {
102+
103+
for _, update := range coalesceUpdates(meta.Runner, snapshot.Updates) {
103104
handleWithCrashRecovery(handle, Update[K, V](update), meta, errChans)
104105
}
105106
}
106107
}
108+
109+
// coalesceUpdates merges multiple updates for the same key into a single update,
110+
// preserving the latest state for each key.
111+
// This helps reduce redundant processing and ensures that only the most recent update per key is handled.
112+
func coalesceUpdates[K comparable, V any](runner string, updates []watchable.Update[K, V]) []watchable.Update[K, V] {
113+
if len(updates) <= 1 {
114+
return updates
115+
}
116+
117+
seen := make(map[K]struct{}, len(updates))
118+
write := len(updates) - 1
119+
120+
for read := len(updates) - 1; read >= 0; read-- {
121+
update := updates[read]
122+
if _, ok := seen[update.Key]; ok {
123+
continue
124+
}
125+
seen[update.Key] = struct{}{}
126+
updates[write] = update
127+
write--
128+
}
129+
130+
result := updates[write+1:]
131+
if len(result) != len(updates) {
132+
logger.WithValues("runner", runner).Info(
133+
"coalesced updates",
134+
"count", len(result),
135+
"before", len(updates),
136+
)
137+
}
138+
return result
139+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright Envoy Gateway Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
// The full text of the Apache license is available in the LICENSE file at
4+
// the root of the repo.
5+
6+
package message
7+
8+
import (
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
"github.com/telepresenceio/watchable"
13+
)
14+
15+
func TestCoalesceUpdates(t *testing.T) {
16+
t.Parallel()
17+
18+
tests := []struct {
19+
name string
20+
input []watchable.Update[string, int]
21+
expected []watchable.Update[string, int]
22+
}{
23+
{
24+
name: "empty input returns nil",
25+
input: []watchable.Update[string, int]{},
26+
expected: []watchable.Update[string, int]{},
27+
},
28+
{
29+
name: "simple updates without repeats",
30+
input: []watchable.Update[string, int]{
31+
{Key: "foo", Value: 1},
32+
{Key: "bar", Value: 2},
33+
{Key: "baz", Value: 3},
34+
},
35+
expected: []watchable.Update[string, int]{
36+
{Key: "foo", Value: 1},
37+
{Key: "bar", Value: 2},
38+
{Key: "baz", Value: 3},
39+
},
40+
},
41+
{
42+
name: "latest update per key wins",
43+
input: []watchable.Update[string, int]{
44+
{Key: "foo", Value: 1},
45+
{Key: "bar", Delete: true, Value: 2},
46+
{Key: "baz", Value: 3},
47+
{Key: "bar", Value: 4},
48+
{Key: "foo", Value: 5},
49+
{Key: "baz", Delete: true, Value: 6},
50+
{Key: "bar", Value: 7},
51+
},
52+
expected: []watchable.Update[string, int]{
53+
{Key: "foo", Value: 5},
54+
{Key: "baz", Delete: true, Value: 6},
55+
{Key: "bar", Value: 7},
56+
},
57+
},
58+
}
59+
60+
for _, tc := range tests {
61+
t.Run(tc.name, func(t *testing.T) {
62+
t.Parallel()
63+
64+
actual := coalesceUpdates("test-runner", tc.input)
65+
require.Equal(t, tc.expected, actual)
66+
})
67+
}
68+
}

internal/message/watchutil_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestHandleSubscriptionAlreadyInitialized(t *testing.T) {
8989
}
9090
},
9191
)
92-
assert.Equal(t, 2, storeCalls)
92+
assert.LessOrEqual(t, 2, storeCalls) // updates can be coalesced
9393
assert.Equal(t, 1, deleteCalls)
9494
}
9595

@@ -268,7 +268,11 @@ func TestControllerResourceUpdate(t *testing.T) {
268268
m.GatewayAPIResources.Close()
269269
}
270270
})
271-
assert.Equal(t, tc.updates, updates)
271+
if tc.updates > 1 {
272+
assert.LessOrEqual(t, updates, tc.updates) // Updates can be coalesced
273+
} else {
274+
assert.Equal(t, 1, updates)
275+
}
272276
})
273277
}
274278
}

0 commit comments

Comments
 (0)