Skip to content

Commit 2d45881

Browse files
committed
Bytes based batching for profiles
Signed-off-by: Israel Blancas <[email protected]>
1 parent eb00c82 commit 2d45881

File tree

7 files changed

+196
-69
lines changed

7 files changed

+196
-69
lines changed

exporter/exporterbatcher/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ var SizerTypeRequests = exporterhelper.RequestSizerTypeRequests
2222
// Deprecated: [v0.123.0] use exporterhelper.RequestSizerTypeItems
2323
var SizerTypeItems = exporterhelper.RequestSizerTypeItems
2424

25-
// Deprecated: [v0.123.0] use exporterhelper.RequestSizerTypeRequests
25+
// Deprecated: [v0.123.0] use exporterhelper.RequestSizerTypeBytes
2626
var SizerTypeBytes = exporterhelper.RequestSizerTypeBytes
2727

2828
// Deprecated: [v0.123.0] use exporterhelper.NewDefaultBatcherConfig
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package sizer // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
5+
6+
import (
7+
"go.opentelemetry.io/collector/pdata/pprofile"
8+
)
9+
10+
type ProfilesSizer interface {
11+
ProfilesSize(pd pprofile.Profiles) int
12+
ResourceProfilesSize(rp pprofile.ResourceProfiles) int
13+
ScopeProfilesSize(sp pprofile.ScopeProfiles) int
14+
ProfileSize(p pprofile.Profile) int
15+
DeltaSize(newItemSize int) int
16+
}
17+
18+
// ProfilesBytesSizer returns the byte size of serialized protos.
19+
type ProfilesBytesSizer struct {
20+
pprofile.ProtoMarshaler
21+
protoDeltaSizer
22+
}
23+
24+
var _ ProfilesSizer = (*ProfilesBytesSizer)(nil)
25+
26+
// ProfilesCountSizer sizes profiles data by item count instead of serialized byte size.
27+
type ProfilesCountSizer struct{}
28+
29+
var _ ProfilesSizer = (*ProfilesCountSizer)(nil)
30+
31+
func (s *ProfilesCountSizer) ProfilesSize(pd pprofile.Profiles) int {
32+
return pd.SampleCount()
33+
}
34+
35+
func (s *ProfilesCountSizer) ResourceProfilesSize(rp pprofile.ResourceProfiles) int {
36+
count := 0
37+
for k := 0; k < rp.ScopeProfiles().Len(); k++ {
38+
count += rp.ScopeProfiles().At(k).Profiles().Len()
39+
}
40+
return count
41+
}
42+
43+
func (s *ProfilesCountSizer) ScopeProfilesSize(sp pprofile.ScopeProfiles) int {
44+
return sp.Profiles().Len()
45+
}
46+
47+
func (s *ProfilesCountSizer) ProfileSize(_ pprofile.Profile) int {
48+
return 1
49+
}
50+
51+
func (s *ProfilesCountSizer) DeltaSize(newItemSize int) int {
52+
return newItemSize
53+
}

exporter/exporterhelper/xexporterhelper/profiles.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"go.opentelemetry.io/collector/exporter/exporterhelper"
1818
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
1919
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
20+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
2021
"go.opentelemetry.io/collector/exporter/xexporter"
2122
"go.opentelemetry.io/collector/pdata/pprofile"
2223
"go.opentelemetry.io/collector/pipeline/xpipeline"
@@ -46,14 +47,14 @@ func NewProfilesQueueBatchSettings() exporterhelper.QueueBatchSettings {
4647
}
4748

4849
type profilesRequest struct {
49-
pd pprofile.Profiles
50-
cachedItemsCount int
50+
pd pprofile.Profiles
51+
cachedSize int
5152
}
5253

5354
func newProfilesRequest(pd pprofile.Profiles) exporterhelper.Request {
5455
return &profilesRequest{
55-
pd: pd,
56-
cachedItemsCount: pd.SampleCount(),
56+
pd: pd,
57+
cachedSize: -1,
5758
}
5859
}
5960

@@ -80,11 +81,18 @@ func (req *profilesRequest) OnError(err error) exporterhelper.Request {
8081
}
8182

8283
func (req *profilesRequest) ItemsCount() int {
83-
return req.cachedItemsCount
84+
return req.pd.SampleCount()
8485
}
8586

86-
func (req *profilesRequest) setCachedItemsCount(count int) {
87-
req.cachedItemsCount = count
87+
func (req *profilesRequest) size(sizer sizer.ProfilesSizer) int {
88+
if req.cachedSize == -1 {
89+
req.cachedSize = sizer.ProfilesSize(req.pd)
90+
}
91+
return req.cachedSize
92+
}
93+
94+
func (req *profilesRequest) setCachedSize(size int) {
95+
req.cachedSize = size
8896
}
8997

9098
type profileExporter struct {

exporter/exporterhelper/xexporterhelper/profiles_batch.go

Lines changed: 103 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,108 +8,157 @@ import (
88
"errors"
99

1010
"go.opentelemetry.io/collector/exporter/exporterhelper"
11+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1112
"go.opentelemetry.io/collector/pdata/pprofile"
1213
)
1314

1415
// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
15-
func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, _ exporterhelper.RequestSizerType, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
16+
func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt exporterhelper.RequestSizerType, r2 exporterhelper.Request) ([]exporterhelper.Request, error) {
17+
var sz sizer.ProfilesSizer
18+
switch szt {
19+
case exporterhelper.RequestSizerTypeItems:
20+
sz = &sizer.ProfilesCountSizer{}
21+
case exporterhelper.RequestSizerTypeBytes:
22+
sz = &sizer.ProfilesBytesSizer{}
23+
default:
24+
return nil, errors.New("unknown sizer type")
25+
}
26+
1627
if r2 != nil {
1728
req2, ok := r2.(*profilesRequest)
1829
if !ok {
1930
return nil, errors.New("invalid input type")
2031
}
21-
req2.mergeTo(req)
32+
req2.mergeTo(req, sz)
2233
}
2334

2435
// If no limit we can simply merge the new request into the current and return.
2536
if maxSize == 0 {
2637
return []exporterhelper.Request{req}, nil
2738
}
28-
return req.split(maxSize)
39+
return req.split(maxSize, sz)
2940
}
3041

31-
func (req *profilesRequest) mergeTo(dst *profilesRequest) {
32-
dst.setCachedItemsCount(dst.ItemsCount() + req.ItemsCount())
33-
req.setCachedItemsCount(0)
42+
func (req *profilesRequest) mergeTo(dst *profilesRequest, sz sizer.ProfilesSizer) {
43+
if sz != nil {
44+
dst.setCachedSize(dst.size(sz) + req.size(sz))
45+
req.setCachedSize(0)
46+
}
3447
req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles())
3548
}
3649

37-
func (req *profilesRequest) split(maxSize int) ([]exporterhelper.Request, error) {
50+
func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) []exporterhelper.Request {
3851
var res []exporterhelper.Request
39-
for req.ItemsCount() > maxSize {
40-
pd := extractProfiles(req.pd, maxSize)
41-
size := pd.SampleCount()
42-
req.setCachedItemsCount(req.ItemsCount() - size)
43-
res = append(res, &profilesRequest{pd: pd, cachedItemsCount: size})
52+
for req.size(sz) > maxSize {
53+
pd, rmSize := extractProfiles(req.pd, maxSize, sz)
54+
req.setCachedSize(req.size(sz) - rmSize)
55+
res = append(res, newProfilesRequest(pd))
4456
}
4557
res = append(res, req)
46-
return res, nil
58+
return res
4759
}
4860

4961
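// extractProfiles moves resource profiles out of srcProfiles into a new Profiles until the given capacity (as measured by sz) is used up, and returns the new Profiles together with the size removed from srcProfiles.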
50-
func extractProfiles(srcProfiles pprofile.Profiles, count int) pprofile.Profiles {
62+
func extractProfiles(srcProfiles pprofile.Profiles, capacity int, sz sizer.ProfilesSizer) (pprofile.Profiles, int) {
5163
destProfiles := pprofile.NewProfiles()
52-
srcProfiles.ResourceProfiles().RemoveIf(func(srcRS pprofile.ResourceProfiles) bool {
53-
if count == 0 {
64+
capacityLeft := capacity - sz.ProfilesSize(destProfiles)
65+
removedSize := 0
66+
srcProfiles.ResourceProfiles().RemoveIf(func(srcRP pprofile.ResourceProfiles) bool {
67+
// If there is no capacity left, just return.
68+
if capacityLeft == 0 {
5469
return false
5570
}
56-
needToExtract := samplesCount(srcRS) > count
57-
if needToExtract {
58-
srcRS = extractResourceProfiles(srcRS, count)
71+
rawRpSize := sz.ResourceProfilesSize(srcRP)
72+
rpSize := sz.DeltaSize(rawRpSize)
73+
74+
if rpSize > capacityLeft {
75+
extSrcRP, extRpSize := extractResourceProfiles(srcRP, capacityLeft, sz)
76+
// This cannot make it to exactly 0 for the bytes,
77+
// force it to be 0 since that is the stopping condition.
78+
capacityLeft = 0
79+
removedSize += extRpSize
80+
// This represents the difference between the delta sizes before and after the extraction.
81+
removedSize += rpSize - rawRpSize - (sz.DeltaSize(rawRpSize-extRpSize) - (rawRpSize - extRpSize))
82+
// It is possible that for the bytes scenario, the extracted field contains no profiles.
83+
// Do not add it to the destination if that is the case.
84+
if extSrcRP.ScopeProfiles().Len() > 0 {
85+
extSrcRP.MoveTo(destProfiles.ResourceProfiles().AppendEmpty())
86+
}
87+
return extSrcRP.ScopeProfiles().Len() != 0
5988
}
60-
count -= samplesCount(srcRS)
61-
srcRS.MoveTo(destProfiles.ResourceProfiles().AppendEmpty())
62-
return !needToExtract
89+
capacityLeft -= rpSize
90+
removedSize += rpSize
91+
srcRP.MoveTo(destProfiles.ResourceProfiles().AppendEmpty())
92+
return true
6393
})
64-
return destProfiles
94+
return destProfiles, removedSize
6595
}
6696

6797
// extractResourceProfiles moves scope profiles out of srcRP into a new ResourceProfiles until the given capacity (as measured by sz) is used up, and returns the new ResourceProfiles together with the removed size.
68-
func extractResourceProfiles(srcRS pprofile.ResourceProfiles, count int) pprofile.ResourceProfiles {
69-
destRS := pprofile.NewResourceProfiles()
70-
destRS.SetSchemaUrl(srcRS.SchemaUrl())
71-
srcRS.Resource().CopyTo(destRS.Resource())
72-
srcRS.ScopeProfiles().RemoveIf(func(srcSS pprofile.ScopeProfiles) bool {
73-
if count == 0 {
98+
func extractResourceProfiles(srcRP pprofile.ResourceProfiles, capacity int, sz sizer.ProfilesSizer) (pprofile.ResourceProfiles, int) {
99+
destRP := pprofile.NewResourceProfiles()
100+
destRP.SetSchemaUrl(srcRP.SchemaUrl())
101+
srcRP.Resource().CopyTo(destRP.Resource())
102+
// The extracted value can be at most "capacity" in size, so reserve room for the extra delta size it will need when it is added to its parent.
103+
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ResourceProfilesSize(destRP)
104+
removedSize := 0
105+
106+
srcRP.ScopeProfiles().RemoveIf(func(srcSS pprofile.ScopeProfiles) bool {
107+
// If there is no capacity left, just return.
108+
if capacityLeft == 0 {
74109
return false
75110
}
76-
needToExtract := srcSS.Profiles().Len() > count
77-
if needToExtract {
78-
srcSS = extractScopeProfiles(srcSS, count)
111+
112+
rawSlSize := sz.ScopeProfilesSize(srcSS)
113+
ssSize := sz.DeltaSize(rawSlSize)
114+
if ssSize > capacityLeft {
115+
extSrcSS, extSsSize := extractScopeProfiles(srcSS, capacityLeft, sz)
116+
// This cannot make it to exactly 0 for the bytes,
117+
// force it to be 0 since that is the stopping condition.
118+
capacityLeft = 0
119+
removedSize += extSsSize
120+
// This represents the difference between the delta sizes before and after the extraction.
121+
removedSize += ssSize - rawSlSize - (sz.DeltaSize(rawSlSize-extSsSize) - (rawSlSize - extSsSize))
122+
// It is possible that for the bytes scenario, the extracted field contains no profiles.
123+
// Do not add it to the destination if that is the case.
124+
if extSrcSS.Profiles().Len() > 0 {
125+
extSrcSS.MoveTo(destRP.ScopeProfiles().AppendEmpty())
126+
}
127+
return extSrcSS.Profiles().Len() != 0
79128
}
80-
count -= srcSS.Profiles().Len()
81-
srcSS.MoveTo(destRS.ScopeProfiles().AppendEmpty())
82-
return !needToExtract
129+
capacityLeft -= ssSize
130+
removedSize += ssSize
131+
srcSS.MoveTo(destRP.ScopeProfiles().AppendEmpty())
132+
return true
83133
})
84-
srcRS.Resource().CopyTo(destRS.Resource())
85-
return destRS
134+
135+
return destRP, removedSize
86136
}
87137

88138
// extractScopeProfiles moves profiles out of srcSS into a new ScopeProfiles until the given capacity (as measured by sz) is used up, and returns the new ScopeProfiles together with the removed size.
89-
func extractScopeProfiles(srcSS pprofile.ScopeProfiles, count int) pprofile.ScopeProfiles {
139+
func extractScopeProfiles(srcSS pprofile.ScopeProfiles, capacity int, sz sizer.ProfilesSizer) (pprofile.ScopeProfiles, int) {
90140
destSS := pprofile.NewScopeProfiles()
91141
destSS.SetSchemaUrl(srcSS.SchemaUrl())
92142
srcSS.Scope().CopyTo(destSS.Scope())
143+
// The extracted value can be at most "capacity" in size, so reserve room for the extra delta size it will need when it is added to its parent.
144+
capacityLeft := capacity - (sz.DeltaSize(capacity) - capacity) - sz.ScopeProfilesSize(destSS)
145+
removedSize := 0
93146
srcSS.Profiles().RemoveIf(func(srcProfile pprofile.Profile) bool {
94-
if count == 0 {
147+
// If there is no capacity left, just return.
148+
if capacityLeft == 0 {
149+
return false
150+
}
151+
rsSize := sz.DeltaSize(sz.ProfileSize(srcProfile))
152+
if rsSize > capacityLeft {
153+
// This cannot make it to exactly 0 for the bytes,
154+
// force it to be 0 since that is the stopping condition.
155+
capacityLeft = 0
95156
return false
96157
}
158+
capacityLeft -= rsSize
159+
removedSize += rsSize
97160
srcProfile.MoveTo(destSS.Profiles().AppendEmpty())
98-
count--
99161
return true
100162
})
101-
return destSS
102-
}
103-
104-
// resourceProfilessCount calculates the total number of profiles in the pdata.ResourceProfiles.
105-
func samplesCount(rs pprofile.ResourceProfiles) int {
106-
count := 0
107-
rs.ScopeProfiles().RemoveIf(func(ss pprofile.ScopeProfiles) bool {
108-
ss.Profiles().RemoveIf(func(sp pprofile.Profile) bool {
109-
count += sp.Sample().Len()
110-
return false
111-
})
112-
return false
113-
})
114-
return count
163+
return destSS, removedSize
115164
}

exporter/exporterhelper/xexporterhelper/profiles_batch_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"go.opentelemetry.io/collector/exporter/exporterhelper"
1414
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
15+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1516
"go.opentelemetry.io/collector/pdata/pprofile"
1617
"go.opentelemetry.io/collector/pdata/testdata"
1718
)
@@ -126,7 +127,11 @@ func TestMergeSplitProfiles(t *testing.T) {
126127
require.NoError(t, err)
127128
assert.Equal(t, len(tt.expected), len(res))
128129
for i, r := range res {
129-
assert.Equal(t, tt.expected[i], r)
130+
expected := tt.expected[i].(*profilesRequest)
131+
actual := r.(*profilesRequest)
132+
133+
assert.Equal(t, expected.size(&sizer.ProfilesCountSizer{}), actual.size(&sizer.ProfilesCountSizer{}))
134+
assert.Equal(t, expected.ItemsCount(), actual.ItemsCount())
130135
}
131136
})
132137
}
@@ -135,7 +140,7 @@ func TestMergeSplitProfiles(t *testing.T) {
135140
func TestExtractProfiles(t *testing.T) {
136141
for i := 0; i < 10; i++ {
137142
ld := testdata.GenerateProfiles(10)
138-
extractedProfiles := extractProfiles(ld, i)
143+
extractedProfiles, _ := extractProfiles(ld, i, &sizer.ProfilesCountSizer{})
139144
assert.Equal(t, i, extractedProfiles.SampleCount())
140145
assert.Equal(t, 10-i, ld.SampleCount())
141146
}

exporter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
go.opentelemetry.io/collector/extension/extensiontest v0.122.1
1717
go.opentelemetry.io/collector/extension/xextension v0.122.1
1818
go.opentelemetry.io/collector/pdata v1.28.1
19+
go.opentelemetry.io/collector/pdata/pprofile v0.122.1
1920
go.opentelemetry.io/collector/pdata/testdata v0.122.1
2021
go.opentelemetry.io/collector/pipeline v0.122.1
2122
go.opentelemetry.io/otel v1.35.0
@@ -50,7 +51,6 @@ require (
5051
go.opentelemetry.io/collector/exporter/xexporter v0.122.1 // indirect
5152
go.opentelemetry.io/collector/extension v1.28.1 // indirect
5253
go.opentelemetry.io/collector/featuregate v1.28.1 // indirect
53-
go.opentelemetry.io/collector/pdata/pprofile v0.122.1 // indirect
5454
go.opentelemetry.io/collector/receiver v1.28.1 // indirect
5555
go.opentelemetry.io/collector/receiver/receivertest v0.122.1 // indirect
5656
go.opentelemetry.io/collector/receiver/xreceiver v0.122.1 // indirect

0 commit comments

Comments
 (0)