Skip to content

Commit 1889f16

Browse files
authored
Merge pull request #1805 Topic listener parallelization
2 parents 9dc2525 + d5ff176 commit 1889f16

24 files changed

+4084
-465
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added read partitions in parallel for topic listener.
2+
13
## v3.109.0
24
* Added control plane fields for split-merge topics (Create,Alter,Describe)
35

internal/grpcwrapper/rawtopic/rawtopiccommon/server_message_metadata.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,25 @@ func (m *ServerMessageMetadata) StatusData() ServerMessageMetadata {
3232
func (m *ServerMessageMetadata) SetStatus(status rawydb.StatusCode) {
3333
m.Status = status
3434
}
35+
36+
// Equals compares this ServerMessageMetadata with another ServerMessageMetadata for equality
37+
func (m *ServerMessageMetadata) Equals(other *ServerMessageMetadata) bool {
38+
if m == nil && other == nil {
39+
return true
40+
}
41+
if m == nil || other == nil {
42+
return false
43+
}
44+
45+
// Compare status using StatusCode's Equals method
46+
if !m.Status.Equals(other.Status) {
47+
return false
48+
}
49+
50+
// Compare issues using Issues' Equals method
51+
if !m.Issues.Equals(other.Issues) {
52+
return false
53+
}
54+
55+
return true
56+
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package rawtopiccommon
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
9+
)
10+
11+
func TestServerMessageMetadataInterface_Equals(t *testing.T) {
12+
t.Run("NilComparison", func(t *testing.T) {
13+
var meta1, meta2 *ServerMessageMetadata
14+
require.True(t, meta1.Equals(meta2)) // both nil
15+
16+
meta1 = &ServerMessageMetadata{}
17+
require.False(t, meta1.Equals(meta2)) // one nil
18+
require.False(t, meta2.Equals(meta1)) // reversed nil
19+
})
20+
21+
t.Run("IdenticalEmpty", func(t *testing.T) {
22+
meta1 := &ServerMessageMetadata{}
23+
meta2 := &ServerMessageMetadata{}
24+
require.True(t, meta1.Equals(meta2))
25+
require.True(t, meta2.Equals(meta1)) // symmetric
26+
})
27+
28+
t.Run("IdenticalWithStatus", func(t *testing.T) {
29+
meta1 := &ServerMessageMetadata{
30+
Status: rawydb.StatusSuccess,
31+
}
32+
meta2 := &ServerMessageMetadata{
33+
Status: rawydb.StatusSuccess,
34+
}
35+
require.True(t, meta1.Equals(meta2))
36+
})
37+
38+
t.Run("DifferentStatus", func(t *testing.T) {
39+
meta1 := &ServerMessageMetadata{
40+
Status: rawydb.StatusSuccess,
41+
}
42+
meta2 := &ServerMessageMetadata{
43+
Status: rawydb.StatusInternalError,
44+
}
45+
require.False(t, meta1.Equals(meta2))
46+
})
47+
48+
t.Run("IdenticalWithIssues", func(t *testing.T) {
49+
issues := rawydb.Issues{
50+
{Code: 100, Message: "test issue"},
51+
}
52+
meta1 := &ServerMessageMetadata{
53+
Issues: issues,
54+
}
55+
meta2 := &ServerMessageMetadata{
56+
Issues: issues,
57+
}
58+
require.True(t, meta1.Equals(meta2))
59+
})
60+
61+
t.Run("DifferentIssues", func(t *testing.T) {
62+
meta1 := &ServerMessageMetadata{
63+
Issues: rawydb.Issues{
64+
{Code: 100, Message: "issue1"},
65+
},
66+
}
67+
meta2 := &ServerMessageMetadata{
68+
Issues: rawydb.Issues{
69+
{Code: 200, Message: "issue2"},
70+
},
71+
}
72+
require.False(t, meta1.Equals(meta2))
73+
})
74+
75+
t.Run("IdenticalStatusAndIssues", func(t *testing.T) {
76+
meta1 := &ServerMessageMetadata{
77+
Status: rawydb.StatusSuccess,
78+
Issues: rawydb.Issues{
79+
{Code: 100, Message: "warning"},
80+
{Code: 200, Message: "info"},
81+
},
82+
}
83+
meta2 := &ServerMessageMetadata{
84+
Status: rawydb.StatusSuccess,
85+
Issues: rawydb.Issues{
86+
{Code: 100, Message: "warning"},
87+
{Code: 200, Message: "info"},
88+
},
89+
}
90+
require.True(t, meta1.Equals(meta2))
91+
})
92+
93+
t.Run("ComplexNestedIssues", func(t *testing.T) {
94+
meta1 := &ServerMessageMetadata{
95+
Status: rawydb.StatusInternalError,
96+
Issues: rawydb.Issues{
97+
{
98+
Code: 100,
99+
Message: "parent issue",
100+
Issues: rawydb.Issues{
101+
{Code: 101, Message: "child issue 1"},
102+
{Code: 102, Message: "child issue 2"},
103+
},
104+
},
105+
},
106+
}
107+
meta2 := &ServerMessageMetadata{
108+
Status: rawydb.StatusInternalError,
109+
Issues: rawydb.Issues{
110+
{
111+
Code: 100,
112+
Message: "parent issue",
113+
Issues: rawydb.Issues{
114+
{Code: 101, Message: "child issue 1"},
115+
{Code: 102, Message: "child issue 2"},
116+
},
117+
},
118+
},
119+
}
120+
require.True(t, meta1.Equals(meta2))
121+
})
122+
123+
t.Run("DifferentNestedIssues", func(t *testing.T) {
124+
meta1 := &ServerMessageMetadata{
125+
Status: rawydb.StatusInternalError,
126+
Issues: rawydb.Issues{
127+
{
128+
Code: 100,
129+
Message: "parent issue",
130+
Issues: rawydb.Issues{
131+
{Code: 101, Message: "child issue 1"},
132+
},
133+
},
134+
},
135+
}
136+
meta2 := &ServerMessageMetadata{
137+
Status: rawydb.StatusInternalError,
138+
Issues: rawydb.Issues{
139+
{
140+
Code: 100,
141+
Message: "parent issue",
142+
Issues: rawydb.Issues{
143+
{Code: 102, Message: "child issue 2"}, // different nested issue
144+
},
145+
},
146+
},
147+
}
148+
require.False(t, meta1.Equals(meta2))
149+
})
150+
}
151+
152+
func TestServerMessageMetadataImpl_EdgeCases(t *testing.T) {
153+
t.Run("EmptyVsNilIssues", func(t *testing.T) {
154+
meta1 := &ServerMessageMetadata{
155+
Status: rawydb.StatusSuccess,
156+
Issues: nil,
157+
}
158+
meta2 := &ServerMessageMetadata{
159+
Status: rawydb.StatusSuccess,
160+
Issues: make(rawydb.Issues, 0),
161+
}
162+
require.True(t, meta1.Equals(meta2)) // nil slice equals empty slice
163+
})
164+
165+
t.Run("OneFieldDifferent", func(t *testing.T) {
166+
// Same status, different issues
167+
meta1 := &ServerMessageMetadata{
168+
Status: rawydb.StatusSuccess,
169+
Issues: rawydb.Issues{{Code: 1, Message: "test"}},
170+
}
171+
meta2 := &ServerMessageMetadata{
172+
Status: rawydb.StatusSuccess,
173+
Issues: rawydb.Issues{{Code: 2, Message: "test"}},
174+
}
175+
require.False(t, meta1.Equals(meta2))
176+
177+
// Different status, same issues
178+
meta3 := &ServerMessageMetadata{
179+
Status: rawydb.StatusInternalError,
180+
Issues: rawydb.Issues{{Code: 1, Message: "test"}},
181+
}
182+
meta4 := &ServerMessageMetadata{
183+
Status: rawydb.StatusSuccess,
184+
Issues: rawydb.Issues{{Code: 1, Message: "test"}},
185+
}
186+
require.False(t, meta3.Equals(meta4))
187+
})
188+
189+
t.Run("SelfComparison", func(t *testing.T) {
190+
meta := &ServerMessageMetadata{
191+
Status: rawydb.StatusSuccess,
192+
Issues: rawydb.Issues{
193+
{Code: 100, Message: "test"},
194+
},
195+
}
196+
require.True(t, meta.Equals(meta)) // self comparison
197+
})
198+
}

internal/grpcwrapper/rawydb/issues.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,37 @@ func (issue *Issue) String() string {
5858

5959
return fmt.Sprintf("message: %v, code: %v%v", issue.Message, issue.Code, innerIssues)
6060
}
61+
62+
// Equals compares this Issue with another Issue for equality
63+
func (issue *Issue) Equals(other *Issue) bool {
64+
if issue == nil && other == nil {
65+
return true
66+
}
67+
if issue == nil || other == nil {
68+
return false
69+
}
70+
71+
if issue.Code != other.Code {
72+
return false
73+
}
74+
if issue.Message != other.Message {
75+
return false
76+
}
77+
78+
return issue.Issues.Equals(other.Issues)
79+
}
80+
81+
// Equals compares this Issues slice with another Issues slice for equality
82+
func (issuesPointer Issues) Equals(other Issues) bool {
83+
if len(issuesPointer) != len(other) {
84+
return false
85+
}
86+
87+
for i := range issuesPointer {
88+
if !issuesPointer[i].Equals(&other[i]) {
89+
return false
90+
}
91+
}
92+
93+
return true
94+
}

0 commit comments

Comments
 (0)