-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmixed_batch_test.go
More file actions
222 lines (189 loc) · 5.4 KB
/
mixed_batch_test.go
File metadata and controls
222 lines (189 loc) · 5.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package comet
import (
"context"
"testing"
)
// TestMixedBatchCompression verifies that a single Append of interleaved
// small and large entries compresses only the entries at or above the
// configured MinCompressSize threshold, while still writing every entry.
func TestMixedBatchCompression(t *testing.T) {
	dir := t.TempDir()

	// Enable compression for payloads of 100 bytes or more.
	cfg := DefaultCometConfig()
	cfg.Compression.MinCompressSize = 100 // Compress entries >= 100 bytes

	client, err := NewClient(dir, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	ctx := context.Background()
	stream := "test:v1:shard:0000"

	// One payload below the threshold and one above it. The large payload
	// uses a repeating alphabet pattern so it compresses well.
	small := []byte("small")   // 5 bytes - won't be compressed
	large := make([]byte, 200) // 200 bytes - will be compressed
	for i := range large {
		large[i] = byte('A' + i%26)
	}

	// Interleave 5 small and 5 large entries into one mixed batch.
	batch := make([][]byte, 0, 10)
	for i := 0; i < 5; i++ {
		batch = append(batch, small, large)
	}

	// Write the whole batch in a single call.
	ids, err := client.Append(ctx, stream, batch)
	if err != nil {
		t.Fatal(err)
	}
	if len(ids) != len(batch) {
		t.Errorf("Expected %d IDs, got %d", len(batch), len(ids))
	}

	// Sync so the entries are durable and reflected in the index.
	if err := client.Sync(ctx); err != nil {
		t.Fatal(err)
	}

	// Inspect the client-wide counters to confirm compression behavior.
	stats := client.GetStats()
	t.Logf("Stats after mixed batch:")
	t.Logf("  Total entries: %d", stats.TotalEntries)
	t.Logf("  Compressed entries: %d", stats.CompressedEntries)
	t.Logf("  Skipped compression: %d", stats.SkippedCompression)
	t.Logf("  Total bytes: %d", stats.TotalBytes)
	t.Logf("  Total compressed: %d", stats.TotalCompressed)

	// Both paths must have been exercised by the mixed batch.
	if stats.CompressedEntries == 0 {
		t.Error("Expected some entries to be compressed")
	}
	if stats.SkippedCompression == 0 {
		t.Error("Expected some entries to skip compression")
	}

	// The batch contributed exactly 5 compressed and 5 skipped entries;
	// the counters are cumulative, so check a lower bound.
	wantCompressed := int64(5)
	wantSkipped := int64(5)
	if stats.CompressedEntries < wantCompressed {
		t.Errorf("Expected at least %d compressed entries, got %d", wantCompressed, stats.CompressedEntries)
	}
	if stats.SkippedCompression < wantSkipped {
		t.Errorf("Expected at least %d entries to skip compression, got %d", wantSkipped, stats.SkippedCompression)
	}

	// Every entry in the batch must have been written.
	if stats.TotalEntries < int64(len(batch)) {
		t.Errorf("Expected at least %d total entries written, got %d", len(batch), stats.TotalEntries)
	}

	// Confirm the shard's index agrees with the number of appended entries.
	shard, err := client.getOrCreateShard(0)
	if err != nil {
		t.Fatal(err)
	}
	if shard.index.CurrentEntryNumber != int64(len(batch)) {
		t.Errorf("Expected shard to have %d entries, but CurrentEntryNumber is %d",
			len(batch), shard.index.CurrentEntryNumber)
	}

	t.Logf("All %d entries were written successfully:", len(batch))
	t.Logf("  - %d were compressed (large entries >= 100 bytes)", wantCompressed)
	t.Logf("  - %d were written uncompressed (small entries < 100 bytes)", wantSkipped)
}
// TestMixedBatchAllocations reports the per-Append allocation count for
// batches that are all-small, all-large, and mixed, so allocation
// regressions on the compression paths are visible in test output.
func TestMixedBatchAllocations(t *testing.T) {
	dir := t.TempDir()

	cfg := DefaultCometConfig()
	cfg.Compression.MinCompressSize = 100

	client, err := NewClient(dir, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	ctx := context.Background()
	stream := "test:v1:shard:0000"

	// Three batch shapes: below-threshold only, above-threshold only,
	// and an interleaving of both.
	cases := []struct {
		name  string
		batch [][]byte
	}{
		{
			name: "AllSmall",
			batch: [][]byte{
				[]byte("small1"),
				[]byte("small2"),
				[]byte("small3"),
				[]byte("small4"),
				[]byte("small5"),
			},
		},
		{
			name: "AllLarge",
			batch: [][]byte{
				make([]byte, 200),
				make([]byte, 200),
				make([]byte, 200),
				make([]byte, 200),
				make([]byte, 200),
			},
		},
		{
			name: "Mixed",
			batch: [][]byte{
				[]byte("small1"),
				make([]byte, 200),
				[]byte("small2"),
				make([]byte, 200),
				[]byte("small3"),
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Average allocations per Append over 100 runs.
			allocs := testing.AllocsPerRun(100, func() {
				if _, err := client.Append(ctx, stream, tc.batch); err != nil {
					t.Fatal(err)
				}
			})
			t.Logf("%s: %.1f allocations", tc.name, allocs)
		})
	}
}
// BenchmarkMixedBatch measures Append throughput for a realistic batch
// where one third of the entries are large and compressible and the rest
// are short log lines below the compression threshold.
func BenchmarkMixedBatch(b *testing.B) {
	dir := b.TempDir()

	config := DefaultCometConfig()
	config.Compression.MinCompressSize = 100

	client, err := NewClient(dir, config)
	if err != nil {
		b.Fatal(err)
	}
	defer client.Close()

	ctx := context.Background()
	streamName := "test:v1:shard:0000"

	// Create a realistic mixed batch of 100 entries.
	entries := make([][]byte, 100)
	for i := range entries {
		if i%3 == 0 {
			// 1/3 are large, compressible: repeating ABCD... pattern.
			entries[i] = make([]byte, 500)
			for j := range entries[i] {
				entries[i][j] = byte('A' + j%26)
			}
		} else {
			// 2/3 are small, not compressed.
			entries[i] = []byte("log entry " + string(rune('0'+i%10)))
		}
	}

	b.ResetTimer()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		if _, err := client.Append(ctx, streamName, entries); err != nil {
			b.Fatal(err)
		}
	}
	// Keep the post-loop stats collection out of the timed region.
	b.StopTimer()

	// GetStats returns cumulative counters across all b.N iterations, so
	// divide by b.N to report them per iteration as ReportMetric expects.
	stats := client.GetStats()
	b.ReportMetric(float64(stats.CompressedEntries)/float64(b.N), "compressed/op")
	b.ReportMetric(float64(stats.SkippedCompression)/float64(b.N), "skipped/op")
}