Skip to content

Commit c0651b4

Browse files
committed
Implementing the free on the PushStream
Signed-off-by: alanprot <[email protected]>
1 parent 3ed0ccd commit c0651b4

File tree

4 files changed

+217
-108
lines changed

4 files changed

+217
-108
lines changed

integration/grpc_server_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"flag"
99
"fmt"
10+
"io"
1011
"math/rand"
1112
"net"
1213
"strconv"
@@ -37,6 +38,25 @@ func (m mockGprcServer) QueryStream(req *ingester_client.QueryRequest, streamSer
3738
return streamServer.Send(createStreamResponse(i))
3839
}
3940

41+
func (m mockGprcServer) PushStream(srv ingester_client.Ingester_PushStreamServer) error {
42+
for {
43+
req, err := srv.Recv()
44+
if err == io.EOF {
45+
return nil
46+
}
47+
ctx := metadata.NewIncomingContext(srv.Context(), metadata.MD{"i": []string{req.TenantID}})
48+
res, err := m.Push(ctx, req.Request)
49+
req.Free()
50+
if err != nil {
51+
return err
52+
}
53+
err = srv.Send(res)
54+
if err != nil {
55+
return err
56+
}
57+
}
58+
}
59+
4060
func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
4161
defer request.Free()
4262
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
@@ -130,6 +150,38 @@ func TestConcurrentGrpcCalls(t *testing.T) {
130150
wg.Wait()
131151
},
132152
},
153+
"distributor push stream": {
154+
cfg: cfg,
155+
register: func(s *grpc.Server) {
156+
d := &mockGprcServer{}
157+
ingester_client.RegisterIngesterServer(s, d)
158+
},
159+
validate: func(t *testing.T, conn *grpc.ClientConn) {
160+
ctx := context.Background()
161+
client := ingester_client.NewIngesterClient(conn)
162+
wg := sync.WaitGroup{}
163+
n := 10000
164+
wg.Add(n)
165+
for i := 0; i < n; i++ {
166+
go func(i int) {
167+
defer wg.Done()
168+
stream, err := client.PushStream(ctx)
169+
require.NoError(t, err)
170+
171+
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
172+
err = stream.Send(&cortexpb.StreamWriteRequest{TenantID: strconv.Itoa(i), Request: createRequest(i)})
173+
require.NoError(t, err)
174+
_, err = stream.Recv()
175+
require.NoError(t, err)
176+
//err = stream.Send(&cortexpb.StreamWriteRequest{"i", createRequest(i + 1)})
177+
//require.NoError(t, err)
178+
require.NoError(t, stream.CloseSend())
179+
}(i)
180+
}
181+
182+
wg.Wait()
183+
},
184+
},
133185
"ingester": {
134186
cfg: cfg,
135187
register: func(s *grpc.Server) {

0 commit comments

Comments
 (0)