Skip to content

Commit 8c94bcc

Browse files
ozevrenistio-testing
authored andcommitted
Fix MCP dial-out mode. (istio#13399)
* Fix MCP dial-out mode. + The MCP dial-out mode sends an initial trigger response to trigger proper server/client communication. This is needed under certain scenarios. The original code expected a NACK response to this using a synchronous wait. However, this caused problems as the NACK can be sent *after* the actual resource requests are enqueued in the gRPC stream. This PR fixes the issue by making the handling of the trigger response in-line, as part of regular stream handling. + Adding a new dial-out integration tests capturing the basic scenario. + Adding a sleep in the Galley integration component, as the component startup is inherently racy. There is a race between setting the os signal event handlers during startup and applicatrion of configuration (and subsequent event trigger). The stop-gap solution is to sleep. The right solution is to go back to the correct ordering model for the startup of Galley. * Add an explicit name to the trigger collection to avoid collisions. * Fix lint issues. * Fix lint issues. * Remove failing test case. * Update code coverage.
1 parent cd4158d commit 8c94bcc

File tree

11 files changed

+269
-27
lines changed

11 files changed

+269
-27
lines changed

codecov.threshold

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ istio.io/istio/pilot/pkg/proxy/envoy/v2=10
2727
istio.io/istio/pilot/pkg/proxy/envoy/v2/rds.go=25
2828
istio.io/istio/pilot/pkg/serviceregistry/consul/monitor.go=20
2929
istio.io/istio/pkg/mcp/creds/watcher.go=100
30+
istio.io/istio/pkg/mcp/source/client_source.go=90
3031
istio.io/istio/pkg/mcp/source/source.go=90
3132
istio.io/istio/security/pkg/nodeagent=15
3233
istio.io/istio/istioctl/cmd/istioctl/kubeinject.go=41

pkg/appsignals/watcher.go

+3
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,13 @@ func Watch(c chan<- Signal) {
6464
func Notify(trigger string, signal os.Signal) {
6565
handlers.Lock()
6666
defer handlers.Unlock()
67+
log.Infof("watcher.Notify: (trigger: %q, signal: %v)", trigger, signal)
6768
for _, v := range handlers.listeners {
69+
log.Debugf("watcher.Notify: Dispatching to listener '%v' (trigger: %q, signal: %v)", v, trigger, signal)
6870
select {
6971
case v <- Signal{trigger, signal}:
7072
default:
73+
log.Warnf("watcher.Notify: Signal channel is full (trigger: %q, signal: %v)", trigger, signal)
7174
}
7275
}
7376
}

pkg/mcp/sink/client_sink.go

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Client struct {
4141
reporter monitoring.Reporter
4242
}
4343

44+
// NewClient returns a new instance of Client.
4445
func NewClient(client mcp.ResourceSourceClient, options *Options) *Client {
4546
return &Client{
4647
Sink: New(options),

pkg/mcp/source/client_source.go

+12-19
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@ package source
1616

1717
import (
1818
"context"
19-
"fmt"
2019
"io"
2120
"time"
2221

23-
"google.golang.org/grpc/codes"
24-
2522
"github.com/gogo/status"
23+
"google.golang.org/grpc/codes"
2624

2725
mcp "istio.io/api/mcp/v1alpha1"
26+
2827
"istio.io/istio/pkg/mcp/monitoring"
2928
)
3029

3130
var (
3231
// try to re-establish the bi-directional grpc stream after this delay.
3332
reestablishStreamDelay = time.Second
33+
triggerCollection = "$triggerCollection"
3434
)
3535

3636
// Client implements the client for the MCP sink service. The client is the
@@ -44,6 +44,7 @@ type Client struct {
4444
source *Source
4545
}
4646

47+
// NewClient returns a new instance of Client.
4748
func NewClient(client mcp.ResourceSinkClient, options *Options) *Client {
4849
return &Client{
4950
source: New(options),
@@ -59,31 +60,23 @@ var reconnectTestProbe = func() {}
5960
// trigger response which we expect the server to NACK.
6061
func (c *Client) sendTriggerResponse(stream Stream) error {
6162
trigger := &mcp.Resources{
62-
Collection: "", // unimplemented collection
63+
Collection: triggerCollection,
6364
}
6465

6566
if err := stream.Send(trigger); err != nil {
6667
return status.Errorf(status.Code(err), "could not send trigger request %v", err)
6768
}
6869

69-
msg, err := stream.Recv()
70-
if err != nil {
71-
return status.Errorf(status.Code(err),
72-
"could not receive expected nack response: %v", err)
73-
}
74-
75-
if msg.ErrorDetail == nil {
76-
return fmt.Errorf("server should have nacked, did not get an error")
77-
}
78-
errCode := codes.Code(msg.ErrorDetail.Code)
79-
if errCode != codes.Unimplemented {
80-
return fmt.Errorf("server should have nacked with code=%v: got %v",
81-
codes.Unimplemented, errCode)
82-
}
83-
8470
return nil
8571
}
8672

73+
// isTriggerResponse checks whether the given RequestResources object is an expected NACK response to a previous
74+
// trigger message.
75+
func isTriggerResponse(msg *mcp.RequestResources) bool {
76+
return msg.Collection == triggerCollection && msg.ErrorDetail != nil && codes.Code(msg.ErrorDetail.Code) == codes.Unimplemented
77+
}
78+
79+
// Run implements mcpClient
8780
func (c *Client) Run(ctx context.Context) {
8881
// The first attempt is immediate.
8982
retryDelay := time.Nanosecond

pkg/mcp/source/client_source_test.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func TestClientSource(t *testing.T) {
8585
Resources: []*mcp.Resource{test.Type2A[0].Resource},
8686
})
8787

88-
h.requestsChan <- test.MakeRequest(false, "", "", codes.Unimplemented)
88+
h.requestsChan <- test.MakeRequest(false, triggerCollection, "", codes.Unimplemented)
8989
h.requestsChan <- test.MakeRequest(false, test.FakeType0Collection, "", codes.OK)
9090
h.requestsChan <- test.MakeRequest(false, test.FakeType1Collection, "", codes.OK)
9191
h.requestsChan <- test.MakeRequest(false, test.FakeType2Collection, "", codes.OK)
@@ -124,11 +124,6 @@ func TestClientSource(t *testing.T) {
124124
h.requestsChan <- test.MakeRequest(false, "", "", codes.OK)
125125
proceed <- true
126126

127-
<-waiting
128-
h.client = true
129-
h.requestsChan <- test.MakeRequest(false, "", "", codes.InvalidArgument)
130-
proceed <- true
131-
132127
<-waiting
133128
h.setRecvError(errors.New("fake recv error"))
134129
h.client = true

pkg/mcp/source/source.go

+4
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ func (con *connection) close() {
399399
}
400400

401401
func (con *connection) processClientRequest(req *mcp.RequestResources) error {
402+
if isTriggerResponse(req) {
403+
return nil
404+
}
405+
402406
collection := req.Collection
403407

404408
con.reporter.RecordRequestSize(collection, con.id, internal.ProtoSize(req))

pkg/test/framework/components/galley/galley.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,12 @@ type Instance interface {
5151
WaitForSnapshot(collection string, validator SnapshotValidatorFunc) error
5252
}
5353

54-
// Configuration for Galley
54+
// Config for Galley
5555
type Config struct {
56+
57+
// SinkAddress to dial-out to, if set.
58+
SinkAddress string
59+
5660
// MeshConfig to use for this instance.
5761
MeshConfig string
5862
}

pkg/test/framework/components/galley/native.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const (
4949
meshConfigFile = "meshconfig.yaml"
5050
)
5151

52-
// NewNativeComponent factory function for the component
52+
// newNative returns the native implementation of galley.Instance.
5353
func newNative(ctx resource.Context, cfg Config) (Instance, error) {
5454

5555
n := &nativeComponent{
@@ -337,6 +337,11 @@ func (c *nativeComponent) restart() error {
337337
// Bind to an arbitrary port.
338338
a.APIAddress = "tcp://0.0.0.0:0"
339339

340+
if c.cfg.SinkAddress != "" {
341+
a.SinkAddress = c.cfg.SinkAddress
342+
a.SinkAuthMode = "NONE"
343+
}
344+
340345
s, err := server.New(a)
341346
if err != nil {
342347
scopes.Framework.Errorf("Error starting Galley: %v", err)
@@ -347,6 +352,10 @@ func (c *nativeComponent) restart() error {
347352

348353
go s.Run()
349354

355+
// TODO: This is due to Galley start-up being racy. We should go back to the "Start" based model where
356+
// return from s.Start() guarantees that all the setup is complete.
357+
time.Sleep(time.Second)
358+
350359
c.client = &client{
351360
address: fmt.Sprintf("tcp://%s", s.Address().String()),
352361
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2019 Istio Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package mcpserver
16+
17+
import (
18+
"testing"
19+
20+
"istio.io/istio/pkg/mcp/sink"
21+
"istio.io/istio/pkg/test/framework/components/environment"
22+
"istio.io/istio/pkg/test/framework/resource"
23+
)
24+
25+
// Instance is a new mcpserver instance. MCP Server is a generic MCP server implementation for testing purposes.
26+
type Instance interface {
27+
Address() string
28+
GetCollectionStateOrFail(t *testing.T, collection string) []*sink.Object
29+
}
30+
31+
// SinkConfig is configuration for the mcpserver for sink mode.
32+
type SinkConfig struct {
33+
Collections []string
34+
}
35+
36+
// NewSink returns a new instance of MCP Server in Sink mode.
37+
func NewSink(ctx resource.Context, cfg SinkConfig) (i Instance, err error) {
38+
err = resource.UnsupportedEnvironment(ctx.Environment())
39+
ctx.Environment().Case(environment.Native, func() {
40+
i, err = newSinkNative(ctx, cfg)
41+
})
42+
return
43+
}
44+
45+
// NewSinkOrFail returns a new instance of MCP server in Sink mode or fails.
46+
func NewSinkOrFail(t *testing.T, c resource.Context, cfg SinkConfig) Instance {
47+
i, err := NewSink(c, cfg)
48+
if err != nil {
49+
t.Fatalf("mcpserver.NewOrFail: %v", err)
50+
}
51+
return i
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2019 Istio Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package mcpserver
16+
17+
import (
18+
"io"
19+
"net"
20+
"testing"
21+
"time"
22+
23+
"google.golang.org/grpc"
24+
25+
mcp "istio.io/api/mcp/v1alpha1"
26+
"istio.io/istio/pkg/mcp/rate"
27+
"istio.io/istio/pkg/mcp/server"
28+
"istio.io/istio/pkg/mcp/sink"
29+
"istio.io/istio/pkg/mcp/testing/monitoring"
30+
"istio.io/istio/pkg/test/framework/resource"
31+
"istio.io/istio/pkg/test/scopes"
32+
)
33+
34+
type native struct {
35+
id resource.ID
36+
l net.Listener
37+
s *grpc.Server
38+
u *sink.InMemoryUpdater
39+
}
40+
41+
var _ Instance = &native{}
42+
var _ resource.Resource = &native{}
43+
var _ io.Closer = &native{}
44+
45+
func newSinkNative(ctx resource.Context, cfg SinkConfig) (*native, error) {
46+
n := &native{}
47+
n.id = ctx.TrackResource(n)
48+
u := sink.NewInMemoryUpdater()
49+
50+
so := sink.Options{
51+
ID: "mcpserver.sink",
52+
CollectionOptions: sink.CollectionOptionsFromSlice(cfg.Collections),
53+
Updater: u,
54+
Reporter: monitoring.NewInMemoryStatsContext(),
55+
}
56+
57+
l, err := net.Listen("tcp", "0.0.0.0:0")
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
s := grpc.NewServer()
63+
srv := sink.NewServer(&so, &sink.ServerOptions{
64+
AuthChecker: &server.AllowAllChecker{},
65+
RateLimiter: rate.NewRateLimiter(time.Millisecond, 1000).Create(),
66+
})
67+
68+
mcp.RegisterResourceSinkServer(s, srv)
69+
70+
go func() {
71+
if err := s.Serve(l); err != nil {
72+
scopes.Framework.Errorf("mcpserver.Serve: %v", err)
73+
}
74+
}()
75+
76+
n.l = l
77+
n.s = s
78+
n.u = u
79+
80+
return n, nil
81+
}
82+
83+
// ID implements resource.Resource
84+
func (n *native) ID() resource.ID {
85+
return n.id
86+
}
87+
88+
// Address implements Instance
89+
func (n *native) Address() string {
90+
return n.l.Addr().String()
91+
}
92+
93+
// GetCollectionStateOrFail implements Instance
94+
func (n *native) GetCollectionStateOrFail(t *testing.T, collection string) []*sink.Object {
95+
return n.u.Get(collection)
96+
}
97+
98+
// Close implements io.Closer
99+
func (n *native) Close() error {
100+
n.s.Stop()
101+
return nil
102+
}

0 commit comments

Comments
 (0)