Skip to content

Commit 4f734da

Browse files
committed
x/ref/lib/vxray: add support for logging eks cluster, container info
1 parent 99e7c58 commit 4f734da

File tree

8 files changed

+303
-37
lines changed

8 files changed

+303
-37
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FROM alpine
2+
COPY echo /bin/echo
3+
COPY creds/ /bin/creds/
4+
ENTRYPOINT ["/bin/echo"]
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FROM alpine
2+
COPY echod /bin/echod
3+
COPY creds/ /bin/creds/
4+
ENTRYPOINT ["/bin/echod"]
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
FROM alpine
2+
COPY mounttabled /bin/mounttabled
3+
COPY creds/ /bin/creds/
4+
ENTRYPOINT ["/bin/mounttabled"]

x/ref/examples/echo/echo/echo.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package main
77

88
import (
9+
"errors"
910
"flag"
1011
"fmt"
1112
"io"
@@ -18,6 +19,7 @@ import (
1819
"github.com/aws/aws-xray-sdk-go/xray"
1920
v23 "v.io/v23"
2021
"v.io/v23/context"
22+
"v.io/v23/naming"
2123
"v.io/v23/vtrace"
2224
"v.io/x/ref/examples/echo"
2325
"v.io/x/ref/lib/aws/vxray"
@@ -50,7 +52,20 @@ func main() {
5052
ctx, shutdown := v23.Init()
5153
defer shutdown()
5254

53-
ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true))
55+
ctx, _ = vxray.InitXRay(ctx,
56+
v23.GetRuntimeFlags().VtraceFlags,
57+
xray.Config{ServiceVersion: ""},
58+
vxray.EC2Plugin(),
59+
vxray.EKSCluster(),
60+
vxray.ContainerIDAndHost(),
61+
vxray.MergeLogging(true))
62+
63+
servers := strings.Split(serverFlag, ",")
64+
if len(servers) > 0 {
65+
ctx.Infof("waiting for: %v servers: %v", len(servers), servers)
66+
waitForServers(ctx, servers)
67+
ctx.Infof("servers ready: %v", servers)
68+
}
5469

5570
client := echo.EchoServiceClient(nameFlag)
5671

@@ -73,7 +88,6 @@ func main() {
7388
close(done)
7489
}()
7590

76-
servers := strings.Split(serverFlag, ",")
7791
samplingRequest := &vtrace.SamplingRequest{
7892
Name: nameFlag,
7993
}
@@ -169,7 +183,7 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer,
169183
}
170184
result, err := client.Ping(ctx, now, servers)
171185
if err != nil {
172-
ctx.Errorf("%v.%v failed: %v", nameFlag, "ping", err)
186+
ctx.Errorf("%v.%v failed: %v", servers, "ping", err)
173187
}
174188
if len(result) < 100 {
175189
fmt.Fprintln(out, result)
@@ -178,3 +192,27 @@ func callPing(ctx *context.T, client echo.EchoServiceClientStub, out io.Writer,
178192
}
179193
return err
180194
}
195+
196+
func waitForServers(ctx *context.T, servers []string) {
197+
var wg sync.WaitGroup
198+
wg.Add(len(servers))
199+
ns := v23.GetNamespace(ctx)
200+
for _, server := range servers {
201+
go func(server string) {
202+
for {
203+
_, err := ns.Resolve(ctx, server)
204+
ctx.Infof("%v: %v: %v", server, err, errors.Is(err, naming.ErrNoSuchName))
205+
if errors.Is(err, naming.ErrNoSuchName) {
206+
time.Sleep(time.Second)
207+
continue
208+
}
209+
if err == nil {
210+
break
211+
}
212+
ctx.Infof("%v: %v\n", server, err)
213+
}
214+
wg.Done()
215+
}(server)
216+
}
217+
wg.Wait()
218+
}

x/ref/examples/echo/echod/echod.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ func main() {
7676
ctx, shutdown := v23.Init()
7777
defer shutdown()
7878

79-
ctx, _ = vxray.InitXRay(ctx, v23.GetRuntimeFlags().VtraceFlags, xray.Config{ServiceVersion: ""}, vxray.EC2Plugin(), vxray.MergeLogging(true))
79+
ctx, _ = vxray.InitXRay(ctx,
80+
v23.GetRuntimeFlags().VtraceFlags,
81+
xray.Config{ServiceVersion: ""},
82+
vxray.EC2Plugin(),
83+
vxray.EKSCluster(),
84+
vxray.ContainerIDAndHost(),
85+
vxray.MergeLogging(true))
8086

8187
ctx, server, err := v23.WithNewServer(ctx, nameFlag, echo.EchoServiceServer(&echod{}), securityflag.NewAuthorizerOrDie(ctx))
8288
if err != nil {

x/ref/lib/aws/vxray/config.go

Lines changed: 88 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package vxray
99

1010
import (
1111
"fmt"
12+
"os"
1213

1314
"github.com/aws/aws-xray-sdk-go/awsplugins/beanstalk"
1415
"github.com/aws/aws-xray-sdk-go/awsplugins/ec2"
@@ -18,15 +19,18 @@ import (
1819
"v.io/v23/context"
1920
"v.io/v23/logging"
2021
"v.io/v23/vtrace"
22+
"v.io/x/ref/lib/aws/vxray/internal"
2123
"v.io/x/ref/lib/flags"
2224
libvtrace "v.io/x/ref/lib/vtrace"
2325
)
2426

2527
type options struct {
26-
mergeLogging bool
27-
mapToHTTP bool
28-
newStore bool
29-
newStoreFlags flags.VtraceFlags
28+
mergeLogging bool
29+
mapToHTTP bool
30+
newStore bool
31+
newStoreFlags flags.VtraceFlags
32+
configMap, configMapKey string
33+
containerized bool
3034
}
3135

3236
// Option represents an option to InitXRay.
@@ -53,6 +57,43 @@ func BeanstalkPlugin() Option {
5357
}
5458
}
5559

60+
// KubernetesCluster configures obtaining information about the process'
61+
// current environment when running under Kubernetes (k8s), whether managed by
62+
// AWS EKS or any other control plane implementation. It requires that the
63+
// K8S configuration creates a configmap that contains the cluster name.
64+
// The configMap argument names that configmap and configMapKey
65+
// is the key in that configmap for the cluster name. For example, when using
66+
// the AWS cloudwatch/insights/xray-daemon daemonset the values for those
67+
// would be:
68+
// /api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info
69+
// cluster.name
70+
//
71+
// When configured, xray segments will contain a 'cluster_name' annotation.
72+
func KubernetesCluster(configMap, configMapKey string) Option {
73+
return func(o *options) {
74+
o.configMap, o.configMapKey = configMap, configMapKey
75+
}
76+
}
77+
78+
// EKSCluster calls KubernetesCluster with the values commonly used
79+
// with EKS clusters.
80+
func EKSCluster() Option {
81+
return KubernetesCluster("/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info", "cluster.name")
82+
}
83+
84+
// ContainerIDAndHost requests that container id and host information be
85+
// obtained and added to traces. The container id is obtained by parsing
86+
// the /proc/self/cgroup file, and the host by calling the operating system's
87+
// hostname function. When running under kubernetes, for example, the host name is typically the pod name.
88+
//
89+
// When configured, xray segments will contain 'container_id' and 'container_host'
90+
// annotations.
91+
func ContainerIDAndHost() Option {
92+
return func(o *options) {
93+
o.containerized = true
94+
}
95+
}
96+
5697
// MergeLogging arranges for xray logging messages to be merged with vanadium
5798
// log messages.
5899
func MergeLogging(v bool) Option {
@@ -105,44 +146,73 @@ func (xl *xraylogger) Log(level xraylog.LogLevel, msg fmt.Stringer) {
105146
}
106147
}
107148

108-
func initXRay(ctx *context.T, config xray.Config, opts []Option) (*context.T, *options, error) {
109-
o := &options{mapToHTTP: true}
110-
for _, fn := range opts {
111-
fn(o)
112-
}
149+
func (m *manager) initXRay(ctx *context.T, config xray.Config) (*context.T, error) {
113150
if err := xray.Configure(config); err != nil {
114151
ctx.Errorf("failed to configure xray context: %v", err)
115-
return ctx, nil, err
152+
return ctx, err
116153
}
117-
if o.mergeLogging {
154+
if m.options.mergeLogging {
118155
xray.SetLogger(&xraylogger{context.LoggerFromContext(ctx)})
119156
}
120157
ctx, err := WithConfig(ctx, config)
121-
return ctx, o, err
158+
return ctx, err
122159
}
123160

124161
// InitXRay configures the AWS xray service and returns a context containing
125162
// the xray configuration. This should only be called once. The vflags argument
126163
// is used solely to check if xray tracing is enabled and not to create a
127-
// new vtrace.Store, if a new store is required, the
164+
// new vtrace.Store, if a new/alternate store is required, the WithNewStore option
165+
// should be used to specify the store to be used.
128166
func InitXRay(ctx *context.T, vflags flags.VtraceFlags, config xray.Config, opts ...Option) (*context.T, error) {
129167
if !vflags.EnableAWSXRay {
130168
return ctx, nil
131169
}
132170
octx := ctx
133-
ctx, options, err := initXRay(ctx, config, opts)
171+
mgr := &manager{}
172+
mgr.options.mapToHTTP = true
173+
for _, fn := range opts {
174+
fn(&mgr.options)
175+
}
176+
ctx, err := mgr.initXRay(ctx, config)
134177
if err != nil {
135178
return octx, err
136179
}
137-
138-
if options.newStore {
139-
store, err := libvtrace.NewStore(options.newStoreFlags)
180+
if mgr.options.newStore {
181+
store, err := libvtrace.NewStore(mgr.options.newStoreFlags)
140182
if err != nil {
141183
return octx, err
142184
}
143185
ctx = vtrace.WithStore(ctx, store)
144186
}
145-
mgr := &manager{mapToHTTP: options.mapToHTTP}
187+
if mgr.options.containerized {
188+
if hostNameErr == nil {
189+
mgr.containerHost = hostName
190+
} else {
191+
ctx.Infof("failed to obtain host name from: %v", hostNameErr)
192+
}
193+
cgroupFile := "/proc/self/cgroup"
194+
if cid, err := internal.GetContainerID(cgroupFile); err == nil {
195+
mgr.containerID = cid
196+
} else {
197+
ctx.Infof("failed to obtain container id", err)
198+
}
199+
}
200+
if cm := mgr.options.configMap; len(cm) > 0 {
201+
if clusterName, err := internal.GetEKSClusterName(ctx, cm, mgr.options.configMapKey); err == nil {
202+
mgr.clusterName = clusterName
203+
} else {
204+
ctx.Infof("failed to obtain cluster name from %v.%v: %v", cm, mgr.options.configMapKey, err)
205+
}
206+
}
146207
ctx = vtrace.WithManager(ctx, mgr)
147208
return ctx, nil
148209
}
210+
211+
// hostName and hostNameErr hold the result of os.Hostname, evaluated once
// during package initialization.
var hostName, hostNameErr = os.Hostname()
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package internal
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"context"
7+
"crypto/tls"
8+
"crypto/x509"
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"net/http"
13+
"net/url"
14+
"os"
15+
"path/filepath"
16+
)
17+
18+
// Locations and names used to reach the Kubernetes API server with the
// service account credentials read from the local filesystem.
const (
	k8sServiceAccountPrefix = "/var/run/secrets/kubernetes.io/serviceaccount"
	k8sCert                 = "ca.crt"
	k8sToken                = "token"
	k8sAPIHost              = "kubernetes.default.svc"
)

// kContainerIDLen is the number of trailing characters of a cgroup file
// line that are taken to be the container id.
const kContainerIDLen = 64
25+
26+
func GetEKSClusterName(ctx context.Context, configMap, keyName string) (string, error) {
27+
cm, err := getConfigMap(ctx, configMap)
28+
if err != nil {
29+
return "", err
30+
}
31+
name, ok := cm[keyName]
32+
if !ok {
33+
return "", fmt.Errorf("cluster name key %v not found", keyName)
34+
}
35+
return name, nil
36+
}
37+
38+
func getConfigMap(ctx context.Context, configMap string) (map[string]string, error) {
39+
rootPEM, err := os.ReadFile(filepath.Join(k8sServiceAccountPrefix, k8sCert))
40+
if err != nil {
41+
return nil, err
42+
}
43+
token, err := os.ReadFile(filepath.Join(k8sServiceAccountPrefix, k8sToken))
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
roots := x509.NewCertPool()
49+
ok := roots.AppendCertsFromPEM([]byte(rootPEM))
50+
if !ok {
51+
panic("failed to parse root certificate")
52+
}
53+
tr := &http.Transport{
54+
TLSClientConfig: &tls.Config{
55+
RootCAs: roots,
56+
},
57+
}
58+
client := &http.Client{Transport: tr}
59+
u := &url.URL{
60+
Scheme: "https",
61+
Host: k8sAPIHost,
62+
Path: configMap,
63+
}
64+
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
65+
if err != nil {
66+
return nil, err
67+
}
68+
req.Header.Set("Authorization", "Bearer "+string(token))
69+
resp, err := client.Do(req)
70+
if err != nil {
71+
return nil, err
72+
}
73+
defer resp.Body.Close()
74+
buf := bytes.NewBuffer(make([]byte, 0, 1024))
75+
io.Copy(buf, resp.Body)
76+
if resp.StatusCode != http.StatusOK {
77+
return nil, fmt.Errorf("%s: ERROR: %v", buf.String(), resp.StatusCode)
78+
}
79+
cm := struct {
80+
API string `json:"apiVersion"`
81+
Data map[string]string `json:"data"`
82+
}{}
83+
if err := json.Unmarshal(buf.Bytes(), &cm); err != nil {
84+
return nil, fmt.Errorf("%s: %v", buf.String(), err)
85+
}
86+
if cm.API != "v1" && len(cm.Data) == 0 {
87+
return nil, fmt.Errorf("API version has changed to %v: found no config map data", cm.API)
88+
89+
}
90+
return cm.Data, nil
91+
}
92+
93+
func GetContainerID(cgroupFile string) (string, error) {
94+
rd, err := os.Open(cgroupFile)
95+
if err != nil {
96+
return "", err
97+
}
98+
sc := bufio.NewScanner(rd)
99+
cid := ""
100+
for sc.Scan() {
101+
line := sc.Text()
102+
if l := len(line); l > kContainerIDLen {
103+
cid = line[l-kContainerIDLen:]
104+
break
105+
}
106+
}
107+
if len(cid) == 0 {
108+
return "", fmt.Errorf("failed to find a container id in %v", cgroupFile)
109+
}
110+
return cid, nil
111+
}

0 commit comments

Comments
 (0)