Skip to content

Commit b91a1aa

Browse files
committed
feat: add opentelemetry trace to record the sync trace
1 parent acb47d5 commit b91a1aa

File tree

7 files changed

+134
-2
lines changed

7 files changed

+134
-2
lines changed

go.mod

+2
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,8 @@ require (
1818
k8s.io/apimachinery v0.32.2
1919
k8s.io/cli-runtime v0.32.2
2020
k8s.io/client-go v0.32.2
21+
go.opentelemetry.io/otel v1.28.0
22+
go.opentelemetry.io/otel/trace v1.28.0
2123
k8s.io/klog/v2 v2.130.1
2224
k8s.io/kube-aggregator v0.32.2
2325
k8s.io/kube-openapi v0.0.0-20241212222426-2c72e554b1e7

pkg/engine/engine.go

+2-1
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/argoproj/gitops-engine/pkg/sync"
2424
"github.com/argoproj/gitops-engine/pkg/sync/common"
2525
"github.com/argoproj/gitops-engine/pkg/utils/kube"
26+
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
2627
)
2728

2829
const (
@@ -84,7 +85,7 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
8485
return nil, err
8586
}
8687
opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))
87-
syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), opts...)
88+
syncCtx, cleanup, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, e.cache.GetOpenAPISchema(), tracing.NopTracer{}, "", "", opts...)
8889
if err != nil {
8990
return nil, err
9091
}

pkg/sync/sync_context.go

+45-1
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/argoproj/gitops-engine/pkg/sync/hook"
3333
resourceutil "github.com/argoproj/gitops-engine/pkg/sync/resource"
3434
kubeutil "github.com/argoproj/gitops-engine/pkg/utils/kube"
35+
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
3536
)
3637

3738
type reconciledResource struct {
@@ -209,6 +210,8 @@ func NewSyncContext(
209210
kubectl kubeutil.Kubectl,
210211
namespace string,
211212
openAPISchema openapi.Resources,
213+
syncTracer tracing.Tracer,
214+
syncTraceID, syncTraceRootSpanID string,
212215
opts ...SyncOpt,
213216
) (SyncContext, func(), error) {
214217
dynamicIf, err := dynamic.NewForConfig(restConfig)
@@ -246,6 +249,9 @@ func NewSyncContext(
246249
permissionValidator: func(_ *unstructured.Unstructured, _ *metav1.APIResource) error {
247250
return nil
248251
},
252+
syncTracer: syncTracer,
253+
syncTraceID: syncTraceID,
254+
syncTraceRootSpanID: syncTraceRootSpanID,
249255
}
250256
for _, opt := range opts {
251257
opt(ctx)
@@ -357,6 +363,11 @@ type syncContext struct {
357363
// lock to protect concurrent updates of the result list
358364
lock sync.Mutex
359365

366+
// tracer for tracing the sync operation
367+
syncTraceID string
368+
syncTraceRootSpanID string
369+
syncTracer tracing.Tracer
370+
360371
// syncNamespace is a function that will determine if the managed
361372
// namespace should be synced
362373
syncNamespace func(*unstructured.Unstructured, *unstructured.Unstructured) (bool, error)
@@ -1262,6 +1273,8 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
12621273
ss.Go(func(state runState) runState {
12631274
logCtx := sc.log.WithValues("dryRun", dryRun, "task", t)
12641275
logCtx.V(1).Info("Pruning")
1276+
span := sc.syncTracer.StartSpanFromTraceParent("pruneObject", sc.syncTraceID, sc.syncTraceRootSpanID)
1277+
defer span.Finish()
12651278
result, message := sc.pruneObject(t.liveObj, sc.prune, dryRun)
12661279
if result == common.ResultCodeSyncFailed {
12671280
state = failed
@@ -1270,6 +1283,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
12701283
if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed {
12711284
sc.setResourceResult(t, result, operationPhases[result], message)
12721285
}
1286+
sc.setBaggageItemForTasks(&span, t, message, result, operationPhases[result])
12731287
return state
12741288
})
12751289
}
@@ -1289,19 +1303,27 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
12891303
ss.Go(func(state runState) runState {
12901304
sc.log.WithValues("dryRun", dryRun, "task", t).V(1).Info("Deleting")
12911305
if !dryRun {
1306+
span := sc.syncTracer.StartSpanFromTraceParent("hooksDeletion", sc.syncTraceID, sc.syncTraceRootSpanID)
1307+
defer span.Finish()
12921308
err := sc.deleteResource(t)
1309+
message := "deleted"
1310+
operationPhase := common.OperationRunning
12931311
if err != nil {
12941312
// it is possible to get a race condition here, such that the resource does not exist when
12951313
// delete is requested, we treat this as a nop
12961314
if !apierrors.IsNotFound(err) {
12971315
state = failed
1298-
sc.setResourceResult(t, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
1316+
message = fmt.Sprintf("failed to delete resource: %v", err)
1317+
operationPhase = common.OperationError
1318+
sc.setResourceResult(t, "", operationPhase, message)
12991319
}
13001320
} else {
13011321
// if there is anything that needs deleting, we are at best now in pending and
13021322
// want to return and wait for sync to be invoked again
13031323
state = pending
1324+
operationPhase = common.OperationSucceeded
13041325
}
1326+
sc.setBaggageItemForTasks(&span, t, message, "", operationPhase)
13051327
}
13061328
return state
13071329
})
@@ -1330,6 +1352,24 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
13301352
return state
13311353
}
13321354

1355+
func (sc *syncContext) createSpan(operation string, dryrun bool) tracing.Span {
1356+
// skip tracing if dryrun
1357+
if dryrun {
1358+
return tracing.NopTracer{}.StartSpan(operation)
1359+
}
1360+
return sc.syncTracer.StartSpanFromTraceParent(operation, sc.syncTraceID, sc.syncTraceRootSpanID)
1361+
}
1362+
1363+
func (sc *syncContext) setBaggageItemForTasks(span *tracing.Span, t *syncTask, message string, result common.ResultCode, operationPhase common.OperationPhase) {
1364+
resourceKey := t.resourceKey()
1365+
(*span).SetBaggageItem("resource", resourceKey.String())
1366+
(*span).SetBaggageItem("result", string(result))
1367+
(*span).SetBaggageItem("operationPhase", string(operationPhase))
1368+
(*span).SetBaggageItem("message", message)
1369+
(*span).SetBaggageItem("phase", string(t.phase))
1370+
(*span).SetBaggageItem("wave", fmt.Sprint(t.wave()))
1371+
}
1372+
13331373
func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRun bool) runState {
13341374
ss := newStateSync(state)
13351375
for _, task := range tasks {
@@ -1341,11 +1381,14 @@ func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRu
13411381
logCtx := sc.log.WithValues("dryRun", dryRun, "task", t)
13421382
logCtx.V(1).Info("Applying")
13431383
validate := sc.validate && !resourceutil.HasAnnotationOption(t.targetObj, common.AnnotationSyncOptions, common.SyncOptionsDisableValidation)
1384+
span := sc.syncTracer.StartSpanFromTraceParent("applyObject", sc.syncTraceID, sc.syncTraceRootSpanID)
1385+
defer span.Finish()
13441386
result, message := sc.applyObject(t, dryRun, validate)
13451387
if result == common.ResultCodeSyncFailed {
13461388
logCtx.WithValues("message", message).Info("Apply failed")
13471389
state = failed
13481390
}
1391+
var phase common.OperationPhase
13491392
if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed {
13501393
phase := operationPhases[result]
13511394
// no resources are created in dry-run, so running phase means validation was
@@ -1355,6 +1398,7 @@ func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRu
13551398
}
13561399
sc.setResourceResult(t, result, phase, message)
13571400
}
1401+
sc.setBaggageItemForTasks(&span, t, message, result, phase)
13581402
return state
13591403
})
13601404
}

pkg/utils/tracing/api.go

+3
Original file line number | Diff line number | Diff line change
@@ -8,9 +8,12 @@ package tracing
88

99
type Tracer interface {
1010
StartSpan(operationName string) Span
11+
StartSpanFromTraceParent(operationName string, parentTraceId, parentSpanId string) Span
1112
}
1213

1314
// Span is a single traced operation produced by a Tracer.
type Span interface {
	// SetBaggageItem attaches a key/value pair to the span.
	SetBaggageItem(key string, value any)

	// Finish ends the span.
	Finish()

	// SpanID returns the span's ID as a string.
	SpanID() string

	// TraceID returns the ID of the trace the span belongs to as a string.
	TraceID() string
}

pkg/utils/tracing/logging.go

+14
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,12 @@ func (l LoggingTracer) StartSpan(operationName string) Span {
3030
}
3131
}
3232

33+
// loggingSpan is not a real distributed tracing system.
34+
// so no need to implement real StartSpanFromTraceParent method.
35+
func (l LoggingTracer) StartSpanFromTraceParent(operationName string, parentTraceId, parentSpanId string) Span {
36+
return l.StartSpan(operationName)
37+
}
38+
3339
type loggingSpan struct {
3440
logger logr.Logger
3541
operationName string
@@ -54,3 +60,11 @@ func baggageToVals(baggage map[string]any) []any {
5460
}
5561
return result
5662
}
63+
64+
func (s loggingSpan) TraceID() string {
65+
return ""
66+
}
67+
68+
func (s loggingSpan) SpanID() string {
69+
return ""
70+
}

pkg/utils/tracing/nop.go

+12
Original file line number | Diff line number | Diff line change
@@ -11,10 +11,22 @@ func (n NopTracer) StartSpan(_ string) Span {
1111
return nopSpan{}
1212
}
1313

14+
func (n NopTracer) StartSpanFromTraceParent(_, _, _ string) Span {
15+
return nopSpan{}
16+
}
17+
1418
// nopSpan is a span whose methods do nothing: baggage items are discarded,
// Finish has no effect, and both IDs are empty.
type nopSpan struct{}

// SetBaggageItem discards the key/value pair.
func (n nopSpan) SetBaggageItem(_ string, _ any) {
}

// Finish does nothing.
func (n nopSpan) Finish() {
}

// TraceID always returns the empty string.
func (n nopSpan) TraceID() string {
	return ""
}

// SpanID always returns the empty string.
func (n nopSpan) SpanID() string {
	return ""
}

pkg/utils/tracing/opentelemetry.go

+56
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,56 @@
1+
package tracing
2+
3+
import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)
9+
10+
type OpenTelemetryTracer struct {
11+
realTracer trace.Tracer
12+
}
13+
14+
func NewOpenTelemetryTracer(t trace.Tracer) Tracer {
15+
return &OpenTelemetryTracer{
16+
realTracer: t,
17+
}
18+
}
19+
20+
func (t OpenTelemetryTracer) StartSpan(operationName string) Span {
21+
_, realspan := t.realTracer.Start(context.Background(), operationName)
22+
return openTelemetrySpan{realSpan: realspan}
23+
}
24+
25+
func (t OpenTelemetryTracer) StartSpanFromTraceParent(operationName string, parentTraceId, parentSpanId string) Span {
26+
traceID, _ := trace.TraceIDFromHex(parentTraceId)
27+
parentSpanID, _ := trace.SpanIDFromHex(parentSpanId)
28+
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
29+
TraceID: traceID,
30+
SpanID: parentSpanID,
31+
TraceFlags: trace.FlagsSampled,
32+
})
33+
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
34+
_, realSpan := t.realTracer.Start(ctx, operationName)
35+
return openTelemetrySpan{realSpan: realSpan}
36+
}
37+
38+
type openTelemetrySpan struct {
39+
realSpan trace.Span
40+
}
41+
42+
func (s openTelemetrySpan) SetBaggageItem(key string, value interface{}) {
43+
s.realSpan.SetAttributes(attribute.Key(key).String(value.(string)))
44+
}
45+
46+
func (s openTelemetrySpan) Finish() {
47+
s.realSpan.End()
48+
}
49+
50+
func (s openTelemetrySpan) TraceID() string {
51+
return s.realSpan.SpanContext().TraceID().String()
52+
}
53+
54+
func (s openTelemetrySpan) SpanID() string {
55+
return s.realSpan.SpanContext().SpanID().String()
56+
}

0 commit comments

Comments
 (0)