Skip to content

Commit a06616b

Browse files
committed
feat(chdump): add OTLP logs ingester
1 parent 9ffc737 commit a06616b

File tree

2 files changed

+133
-0
lines changed

2 files changed

+133
-0
lines changed

cmd/otelbench/chdump/ingester.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package chdump
2+
3+
import (
4+
"context"
5+
6+
"go.opentelemetry.io/collector/pdata/plog"
7+
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
8+
"golang.org/x/sync/errgroup"
9+
)
10+
11+
// IngestLogs loads logs from dump and sends them to the collector.
12+
type IngestLogs struct {
13+
// Workers is the number of workers to use.
14+
//
15+
// Defaults to 1.
16+
Workers int
17+
}
18+
19+
// Run ingests logs.
20+
func (lg IngestLogs) Run(ctx context.Context, client plogotlp.GRPCClient, tr TableReader) error {
21+
var (
22+
workers = max(lg.Workers, 1)
23+
batcnCh = make(chan plog.Logs, workers)
24+
)
25+
grp, grpCtx := errgroup.WithContext(ctx)
26+
grp.Go(func() error {
27+
defer close(batcnCh)
28+
return Consume(tr, ConsumeOptions{
29+
OnLogs: func(t *Logs) error {
30+
ctx := grpCtx
31+
32+
batch := plog.NewLogs()
33+
t.ToOTLP(batch)
34+
35+
select {
36+
case <-ctx.Done():
37+
return ctx.Err()
38+
case batcnCh <- batch:
39+
return nil
40+
}
41+
},
42+
})
43+
})
44+
for range workers {
45+
grp.Go(func() error {
46+
ctx := grpCtx
47+
for batch := range batcnCh {
48+
if _, err := client.Export(ctx, plogotlp.NewExportRequestFromLogs(batch)); err != nil {
49+
return err
50+
}
51+
}
52+
return nil
53+
})
54+
}
55+
return grp.Wait()
56+
}

cmd/otelbench/chdump/logs.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"slices"
66

77
"github.com/ClickHouse/ch-go/proto"
8+
"go.opentelemetry.io/collector/pdata/pcommon"
9+
"go.opentelemetry.io/collector/pdata/plog"
810

911
"github.com/go-faster/oteldb/internal/otelstorage"
1012
)
@@ -75,6 +77,81 @@ func (c *Logs) Reset() {
7577
}
7678
}
7779

80+
// ToOTLP appends data from [Logs] to given batch.
81+
func (c *Logs) ToOTLP(batch plog.Logs) {
82+
resMap := map[otelstorage.Hash]plog.ResourceLogs{}
83+
resLogs := batch.ResourceLogs()
84+
for i := range resLogs.Len() {
85+
resLog := resLogs.At(i)
86+
attrs := otelstorage.Attrs(resLog.Resource().Attributes())
87+
resMap[attrs.Hash()] = resLog
88+
}
89+
90+
getResLog := func(resourceAttrs otelstorage.Attrs) plog.ResourceLogs {
91+
hash := resourceAttrs.Hash()
92+
93+
resLog, ok := resMap[hash]
94+
if !ok {
95+
resLog = resLogs.AppendEmpty()
96+
resource := resLog.Resource()
97+
resourceAttrs.AsMap().CopyTo(resource.Attributes())
98+
99+
resMap[hash] = resLog
100+
}
101+
return resLog
102+
}
103+
getScopeLog := func(resLog plog.ResourceLogs, scopeAttrs otelstorage.Attrs, scopeName, scopeVersion string) plog.ScopeLogs {
104+
scopeLogs := resLog.ScopeLogs()
105+
scopeAttrsHash := scopeAttrs.Hash()
106+
107+
for i := range scopeLogs.Len() {
108+
scopeLog := scopeLogs.At(i)
109+
scope := scopeLog.Scope()
110+
if scope.Name() == scopeName &&
111+
scope.Version() == scopeVersion &&
112+
otelstorage.Attrs(scope.Attributes()).Hash() == scopeAttrsHash {
113+
return scopeLog
114+
}
115+
}
116+
scopeLog := scopeLogs.AppendEmpty()
117+
118+
scope := scopeLog.Scope()
119+
scope.SetName(scopeName)
120+
scope.SetVersion(scopeVersion)
121+
scopeAttrs.AsMap().CopyTo(scope.Attributes())
122+
123+
return scopeLog
124+
}
125+
126+
for row := range c.Body.Rows() {
127+
timestamp := c.Timestamp.Row(row)
128+
severityText := c.SeverityText.Row(row)
129+
severityNumber := c.SeverityNumber.Row(row)
130+
traceFlags := c.TraceFlags.Row(row)
131+
traceID := c.TraceID.Row(row)
132+
spanID := c.SpanID.Row(row)
133+
body := c.Body.Row(row)
134+
attributes := c.Attributes.Row(row)
135+
resource := c.Resource.Row(row)
136+
scope := c.Scope.Row(row)
137+
scopeName := c.ScopeName.Row(row)
138+
scopeVersion := c.ScopeVersion.Row(row)
139+
140+
resLog := getResLog(resource)
141+
scopeLog := getScopeLog(resLog, scope, scopeName, scopeVersion)
142+
record := scopeLog.LogRecords().AppendEmpty()
143+
144+
record.SetTimestamp(otelstorage.NewTimestampFromTime(timestamp))
145+
record.SetSeverityText(severityText)
146+
record.SetSeverityNumber(plog.SeverityNumber(severityNumber))
147+
record.SetFlags(plog.LogRecordFlags(traceFlags))
148+
record.SetTraceID(pcommon.TraceID(traceID))
149+
record.SetSpanID(pcommon.SpanID(spanID))
150+
record.Body().SetStr(body)
151+
attributes.CopyTo(record.Attributes())
152+
}
153+
}
154+
78155
func (c *Logs) columns() iter.Seq[proto.ResultColumn] {
79156
return func(yield func(proto.ResultColumn) bool) {
80157
for _, col := range []proto.ResultColumn{

0 commit comments

Comments
 (0)