Skip to content

Commit 230e064

Browse files
authored
Merge branch 'main' into tel_metric_attributes
2 parents 2453f69 + 4397ebc commit 230e064

File tree

31 files changed

+209
-109
lines changed

31 files changed

+209
-109
lines changed
+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Update queue size after the element is done exported
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12399]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: After this change the active queue size will include elements in the process of being exported.
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: []

.chloggen/rm-not-component.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: mdatagen
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove unused not_component config for mdatagen
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12237]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

.github/workflows/fossa.yml

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
name: FOSSA scanning
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
8+
permissions:
9+
contents: read
10+
11+
jobs:
12+
fossa:
13+
runs-on: ubuntu-latest
14+
steps:
15+
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
16+
17+
- uses: fossas/fossa-action@93a52ecf7c3ac7eb40f5de77fd69b1a19524de94 # v1.5.0
18+
with:
19+
api-key: ${{secrets.FOSSA_API_KEY}}
20+
team: OpenTelemetry

cmd/mdatagen/internal/command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func run(ymlPath string) error {
7878
codeDir := filepath.Join(ymlDir, "internal", md.GeneratedPackageName)
7979
toGenerate := map[string]string{}
8080
if md.Status != nil {
81-
if md.Status.Class != "cmd" && md.Status.Class != "pkg" && !md.Status.NotComponent {
81+
if md.Status.Class != "cmd" && md.Status.Class != "pkg" {
8282
toGenerate[filepath.Join(tmplDir, "status.go.tmpl")] = filepath.Join(codeDir, "generated_status.go")
8383
if err = generateFile(filepath.Join(tmplDir, "component_test.go.tmpl"),
8484
filepath.Join(ymlDir, "generated_component_test.go"), md, packageName); err != nil {

cmd/mdatagen/internal/status.go

-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ type Status struct {
4646
Warnings []string `mapstructure:"warnings"`
4747
Codeowners *Codeowners `mapstructure:"codeowners"`
4848
UnsupportedPlatforms []string `mapstructure:"unsupported_platforms"`
49-
NotComponent bool `mapstructure:"not_component"`
5049
}
5150

5251
func (s *Status) SortedDistributions() []string {

cmd/otelcorecol/go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ require (
8080
github.com/tklauser/numcpus v0.6.1 // indirect
8181
github.com/yusufpapurcu/wmi v1.2.4 // indirect
8282
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
83-
go.opentelemetry.io/collector v0.119.0 // indirect
83+
go.opentelemetry.io/collector v0.119.1-0.20250217085205-36e046737251 // indirect
8484
go.opentelemetry.io/collector/client v1.25.0 // indirect
8585
go.opentelemetry.io/collector/component/componentstatus v0.119.0 // indirect
8686
go.opentelemetry.io/collector/component/componenttest v0.119.0 // indirect
@@ -101,7 +101,7 @@ require (
101101
go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.119.0 // indirect
102102
go.opentelemetry.io/collector/consumer/consumertest v0.119.0 // indirect
103103
go.opentelemetry.io/collector/consumer/xconsumer v0.119.0 // indirect
104-
go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.0 // indirect
104+
go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.1-0.20250217142445-a567a0176541 // indirect
105105
go.opentelemetry.io/collector/exporter/exportertest v0.119.0 // indirect
106106
go.opentelemetry.io/collector/exporter/xexporter v0.119.0 // indirect
107107
go.opentelemetry.io/collector/extension/auth v0.119.0 // indirect

connector/xconnector/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ type: xconnector
22
github_project: open-telemetry/opentelemetry-collector
33

44
status:
5-
class: connector
5+
class: pkg
66
codeowners:
77
active:
88
- mx-psi

exporter/debugexporter/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
go.opentelemetry.io/collector/confmap v1.25.0
1111
go.opentelemetry.io/collector/consumer v1.25.0
1212
go.opentelemetry.io/collector/exporter v0.119.0
13-
go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.0
13+
go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.119.1-0.20250217142445-a567a0176541
1414
go.opentelemetry.io/collector/exporter/exportertest v0.119.0
1515
go.opentelemetry.io/collector/exporter/xexporter v0.119.0
1616
go.opentelemetry.io/collector/pdata v1.25.0

exporter/exporterhelper/internal/batch_sender_test.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func TestBatchSender_Merge(t *testing.T) {
9090
assert.Equal(t, int64(1), sink.RequestsCount())
9191
assert.Eventually(t, func() bool {
9292
return sink.RequestsCount() == 2 && sink.ItemsCount() == 15
93-
}, 100*time.Millisecond, 10*time.Millisecond)
93+
}, 1*time.Second, 10*time.Millisecond)
9494
})
9595
}
9696
for _, tt := range tests {
@@ -160,12 +160,12 @@ func TestBatchSender_BatchExportError(t *testing.T) {
160160
errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink}
161161
require.NoError(t, be.Send(context.Background(), errReq))
162162

163-
// the batch should be dropped since the queue doesn't have requeuing enabled.
163+
// the batch should be dropped since the queue doesn't have re-queuing enabled.
164164
assert.Eventually(t, func() bool {
165165
return sink.RequestsCount() == tt.expectedRequests &&
166166
sink.ItemsCount() == tt.expectedItems &&
167167
be.queue.Size() == 0
168-
}, 100*time.Millisecond, 10*time.Millisecond)
168+
}, 1*time.Second, 10*time.Millisecond)
169169

170170
require.NoError(t, be.Shutdown(context.Background()))
171171
})
@@ -194,13 +194,13 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
194194
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink}))
195195
assert.Eventually(t, func() bool {
196196
return sink.RequestsCount() == 1 && sink.ItemsCount() == 8
197-
}, 500*time.Millisecond, 10*time.Millisecond)
197+
}, 1*time.Second, 10*time.Millisecond)
198198

199199
// big request should be broken down into two requests, both are sent right away.
200200
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink}))
201201
assert.Eventually(t, func() bool {
202202
return sink.RequestsCount() == 3 && sink.ItemsCount() == 25
203-
}, 500*time.Millisecond, 10*time.Millisecond)
203+
}, 1*time.Second, 10*time.Millisecond)
204204

205205
// request that cannot be split should be dropped.
206206
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{
@@ -212,7 +212,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) {
212212
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink}))
213213
assert.Eventually(t, func() bool {
214214
return sink.RequestsCount() == 5 && sink.ItemsCount() == 38
215-
}, 500*time.Millisecond, 10*time.Millisecond)
215+
}, 1*time.Second, 10*time.Millisecond)
216216
require.NoError(t, be.Shutdown(context.Background()))
217217
})
218218
}
@@ -370,20 +370,20 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
370370

371371
assert.Eventually(t, func() bool {
372372
return sink.RequestsCount() == 1 && sink.ItemsCount() == 4
373-
}, 100*time.Millisecond, 10*time.Millisecond)
373+
}, 1*time.Second, 10*time.Millisecond)
374374

375375
// the 3rd request should be flushed by itself due to flush interval
376376
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
377377
assert.Eventually(t, func() bool {
378378
return sink.RequestsCount() == 2 && sink.ItemsCount() == 6
379-
}, 100*time.Millisecond, 10*time.Millisecond)
379+
}, 1*time.Second, 10*time.Millisecond)
380380

381381
// the 4th and 5th request should be flushed in the same batched request by max concurrency limit.
382382
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
383383
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink}))
384384
assert.Eventually(t, func() bool {
385385
return sink.RequestsCount() == 3 && sink.ItemsCount() == 10
386-
}, 100*time.Millisecond, 10*time.Millisecond)
386+
}, 1*time.Second, 10*time.Millisecond)
387387

388388
// do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling.
389389
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink}))
@@ -392,15 +392,15 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) {
392392
// in case of MaxSizeItems=10, wait for the leftover request to send
393393
assert.Eventually(t, func() bool {
394394
return sink.RequestsCount() == 5 && sink.ItemsCount() == 21
395-
}, 50*time.Millisecond, 10*time.Millisecond)
395+
}, 1*time.Second, 10*time.Millisecond)
396396
}
397397

398398
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink}))
399399
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink}))
400400
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink}))
401401
assert.Eventually(t, func() bool {
402402
return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems
403-
}, 100*time.Millisecond, 10*time.Millisecond)
403+
}, 1*time.Second, 10*time.Millisecond)
404404
})
405405
}
406406
}
@@ -648,7 +648,7 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) {
648648
assert.EventuallyWithT(t, func(c *assert.CollectT) {
649649
assert.LessOrEqual(c, int64(1), sink.RequestsCount())
650650
assert.EqualValues(c, 8, sink.ItemsCount())
651-
}, 500*time.Millisecond, 10*time.Millisecond)
651+
}, 1*time.Second, 10*time.Millisecond)
652652

653653
require.NoError(t, be.Shutdown(context.Background()))
654654
}

exporter/exporterhelper/xexporterhelper/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ type: xexporterhelper
22
github_project: open-telemetry/opentelemetry-collector
33

44
status:
5-
class: exporter
5+
class: pkg
66
codeowners:
77
active:
88
- mx-psi

exporter/exporterqueue/async_queue_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
7070
wg.Add(1)
7171
go func() {
7272
defer wg.Done()
73-
for j := 0; j < 11; j++ {
73+
for j := 0; j < 10; j++ {
7474
assert.NoError(t, ac.Offer(ctx, 1))
7575
}
7676
assert.ErrorIs(t, ac.Offer(ctx, 3), context.Canceled)

exporter/exporterqueue/disabled_queue.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ var donePool = sync.Pool{
1919

2020
func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
2121
return &disabledQueue[T]{
22+
sizer: &requestSizer[T]{},
2223
consumeFunc: consumeFunc,
2324
size: &atomic.Int64{},
2425
}
@@ -28,14 +29,18 @@ type disabledQueue[T any] struct {
2829
component.StartFunc
2930
component.ShutdownFunc
3031
consumeFunc ConsumeFunc[T]
32+
sizer sizer[T]
3133
size *atomic.Int64
3234
}
3335

3436
func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
37+
elSize := d.sizer.Sizeof(req)
38+
d.size.Add(elSize)
39+
3540
done := donePool.Get().(*blockingDone)
36-
d.size.Add(1)
41+
done.queue = d
42+
done.elSize = elSize
3743
d.consumeFunc(ctx, req, done)
38-
defer d.size.Add(-1)
3944
// Only re-add the blockingDone instance back to the pool if successfully received the
4045
// message from the consumer which guarantees consumer will not use that anymore,
4146
// otherwise no guarantee about when the consumer will add the message to the channel so cannot reuse or close.
@@ -48,6 +53,10 @@ func (d *disabledQueue[T]) Offer(ctx context.Context, req T) error {
4853
}
4954
}
5055

56+
func (d *disabledQueue[T]) onDone(elSize int64) {
57+
d.size.Add(-elSize)
58+
}
59+
5160
// Size returns the current number of blocked requests waiting to be processed.
5261
func (d *disabledQueue[T]) Size() int64 {
5362
return d.size.Load()
@@ -59,9 +68,14 @@ func (d *disabledQueue[T]) Capacity() int64 {
5968
}
6069

6170
type blockingDone struct {
62-
ch chan error
71+
queue interface {
72+
onDone(int64)
73+
}
74+
elSize int64
75+
ch chan error
6376
}
6477

6578
func (d *blockingDone) OnDone(err error) {
79+
d.queue.onDone(d.elSize)
6680
d.ch <- err
6781
}

exporter/exporterqueue/memory_queue.go

+37-7
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ import (
1111
"go.opentelemetry.io/collector/component"
1212
)
1313

14+
var sizeDonePool = sync.Pool{
15+
New: func() any {
16+
return &sizeDone{}
17+
},
18+
}
19+
1420
var errInvalidSize = errors.New("invalid element size")
1521

1622
// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -91,11 +97,11 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
9197
defer sq.mu.Unlock()
9298

9399
for {
94-
if sq.size > 0 {
100+
if sq.items.hasElements() {
95101
elCtx, el, elSize := sq.items.pop()
96-
sq.size -= elSize
97-
sq.hasMoreSpace.Signal()
98-
return elCtx, el, noopDoneInst, true
102+
sd := sizeDonePool.Get().(*sizeDone)
103+
sd.reset(elSize, sq)
104+
return elCtx, el, sd, true
99105
}
100106

101107
if sq.stopped {
@@ -109,6 +115,13 @@ func (sq *memoryQueue[T]) Read(context.Context) (context.Context, T, Done, bool)
109115
}
110116
}
111117

118+
func (sq *memoryQueue[T]) onDone(elSize int64) {
119+
sq.mu.Lock()
120+
defer sq.mu.Unlock()
121+
sq.size -= elSize
122+
sq.hasMoreSpace.Signal()
123+
}
124+
112125
// Shutdown closes the queue channel to initiate draining of the queue.
113126
func (sq *memoryQueue[T]) Shutdown(context.Context) error {
114127
sq.mu.Lock()
@@ -142,6 +155,7 @@ type linkedQueue[T any] struct {
142155

143156
func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
144157
n := &node[T]{ctx: ctx, data: data, size: size}
158+
// If tail is nil means list is empty so update both head and tail to point to same element.
145159
if l.tail == nil {
146160
l.head = n
147161
l.tail = n
@@ -151,18 +165,34 @@ func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
151165
l.tail = n
152166
}
153167

168+
func (l *linkedQueue[T]) hasElements() bool {
169+
return l.head != nil
170+
}
171+
154172
func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
155173
n := l.head
156174
l.head = n.next
175+
// If it gets to the last element, then update tail as well.
157176
if l.head == nil {
158177
l.tail = nil
159178
}
160179
n.next = nil
161180
return n.ctx, n.data, n.size
162181
}
163182

164-
type noopDone struct{}
183+
type sizeDone struct {
184+
size int64
185+
queue interface {
186+
onDone(int64)
187+
}
188+
}
165189

166-
func (*noopDone) OnDone(error) {}
190+
func (sd *sizeDone) reset(size int64, queue interface{ onDone(int64) }) {
191+
sd.size = size
192+
sd.queue = queue
193+
}
167194

168-
var noopDoneInst = &noopDone{}
195+
func (sd *sizeDone) OnDone(error) {
196+
defer sizeDonePool.Put(sd)
197+
sd.queue.onDone(sd.size)
198+
}

0 commit comments

Comments
 (0)