Skip to content

Commit 3989fb6

Browse files
Add OTLP translation (#36)
This adds the protocol/otlp package that can translate from OTLP metric types to SignalFx datapoints. It follows the same conversion process as the OTEL collector, but that code cannot be reused due to the internal format used within the OTEL Collector. This also adds a decoder that reads from an HTTP request and sends to a sink.
1 parent c7eec5c commit 3989fb6

10 files changed

+1370
-7
lines changed

go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ require (
2121
github.com/smartystreets/assertions v1.0.1
2222
github.com/smartystreets/goconvey v1.6.4
2323
github.com/stretchr/testify v1.7.0
24-
google.golang.org/grpc v1.40.0
24+
go.opentelemetry.io/proto/otlp v0.12.0
25+
google.golang.org/grpc v1.43.0
26+
google.golang.org/protobuf v1.27.1
2527
)
2628

2729
require (
@@ -39,6 +41,5 @@ require (
3941
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
4042
golang.org/x/text v0.3.6 // indirect
4143
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
42-
google.golang.org/protobuf v1.27.1 // indirect
4344
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
4445
)

go.sum

+9-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,11 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
155155
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
156156
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
157157
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
158+
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
158159
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
160+
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
161+
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
162+
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
159163
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
160164
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
161165
github.com/containerd/containerd v1.4.3/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
@@ -224,6 +228,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
224228
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
225229
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
226230
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
231+
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
227232
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
228233
github.com/esimonov/ifshort v1.0.1/go.mod h1:yZqNJUrNn20K8Q9n2CrjTKYyVEmX209Hgu+M1LBpeZE=
229234
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -1120,6 +1125,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
11201125
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
11211126
go.opentelemetry.io/collector v0.28.0/go.mod h1:AP/BTXwo1eedoJO7V+HQ68CSvJU1lcdqOzJCgt1VsNs=
11221127
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
1128+
go.opentelemetry.io/proto/otlp v0.12.0 h1:CMJ/3Wp7iOWES+CYLfnBv+DVmPbB+kmy9PJ92XvlR6c=
1129+
go.opentelemetry.io/proto/otlp v0.12.0/go.mod h1:TsIjwGWIx5VFYv9KGVlOpxoBl5Dy+63SUguV7GGvlSQ=
11231130
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
11241131
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
11251132
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@@ -1623,8 +1630,9 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
16231630
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
16241631
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
16251632
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
1626-
google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q=
16271633
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
1634+
google.golang.org/grpc v1.43.0 h1:Eeu7bZtDZ2DpRCsLhUlcrLnvYaMK1Gz86a+hMVvELmM=
1635+
google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
16281636
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
16291637
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
16301638
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

protocol/otlp/decoder.go

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package otlp
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"net/http"
7+
"sync"
8+
9+
"github.com/signalfx/golib/v3/datapoint/dpsink"
10+
"github.com/signalfx/golib/v3/log"
11+
"github.com/signalfx/ingest-protocols/logkey"
12+
"github.com/signalfx/ingest-protocols/protocol"
13+
"github.com/signalfx/ingest-protocols/protocol/signalfx"
14+
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
15+
"google.golang.org/protobuf/proto"
16+
)
17+
18+
type httpMetricDecoder struct {
19+
sink dpsink.Sink
20+
logger log.Logger
21+
buffs sync.Pool
22+
}
23+
24+
func NewHTTPMetricDecoder(sink dpsink.Sink, logger log.Logger) signalfx.ErrorReader {
25+
return &httpMetricDecoder{
26+
sink: sink,
27+
logger: log.NewContext(logger).With(logkey.Protocol, "otlp"),
28+
buffs: sync.Pool{
29+
New: func() interface{} {
30+
return new(bytes.Buffer)
31+
},
32+
},
33+
}
34+
}
35+
36+
func (d *httpMetricDecoder) Read(ctx context.Context, req *http.Request) (err error) {
37+
jeff := d.buffs.Get().(*bytes.Buffer)
38+
defer d.buffs.Put(jeff)
39+
jeff.Reset()
40+
if err = protocol.ReadFromRequest(jeff, req, d.logger); err != nil {
41+
return err
42+
}
43+
var msg metricsservicev1.ExportMetricsServiceRequest
44+
if err = proto.Unmarshal(jeff.Bytes(), &msg); err != nil {
45+
return err
46+
}
47+
dps := FromOTLPMetricRequest(&msg)
48+
if len(dps) > 0 {
49+
err = d.sink.AddDatapoints(ctx, dps)
50+
}
51+
return nil
52+
}

protocol/otlp/decoder_test.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package otlp
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"io"
8+
"net/http"
9+
"sync"
10+
"testing"
11+
12+
"github.com/signalfx/golib/v3/datapoint/dptest"
13+
"github.com/signalfx/golib/v3/log"
14+
. "github.com/smartystreets/goconvey/convey"
15+
metricsservicev1 "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
16+
commonv1 "go.opentelemetry.io/proto/otlp/common/v1"
17+
metricsv1 "go.opentelemetry.io/proto/otlp/metrics/v1"
18+
"google.golang.org/protobuf/proto"
19+
)
20+
21+
var errReadErr = errors.New("could not read")
22+
23+
type errorReader struct{}
24+
25+
func (errorReader *errorReader) Read([]byte) (int, error) {
26+
return 0, errReadErr
27+
}
28+
29+
func TestDecoder(t *testing.T) {
30+
Convey("httpMetricDecoder", t, func() {
31+
sendTo := dptest.NewBasicSink()
32+
decoder := NewHTTPMetricDecoder(sendTo, log.Discard)
33+
34+
Convey("Bad request reading", func() {
35+
req := &http.Request{
36+
Body: io.NopCloser(&errorReader{}),
37+
}
38+
req.ContentLength = 1
39+
ctx := context.Background()
40+
So(decoder.Read(ctx, req), ShouldEqual, errReadErr)
41+
})
42+
43+
Convey("Bad request content", func() {
44+
req := &http.Request{
45+
Body: io.NopCloser(bytes.NewBufferString("asdf")),
46+
}
47+
req.ContentLength = 4
48+
ctx := context.Background()
49+
So(decoder.Read(ctx, req), ShouldNotBeNil)
50+
})
51+
52+
Convey("Good request", func(c C) {
53+
var msg metricsservicev1.ExportMetricsServiceRequest
54+
msg.ResourceMetrics = []*metricsv1.ResourceMetrics{
55+
{
56+
InstrumentationLibraryMetrics: []*metricsv1.InstrumentationLibraryMetrics{
57+
{
58+
Metrics: []*metricsv1.Metric{
59+
{
60+
Name: "test",
61+
Data: &metricsv1.Metric_Gauge{
62+
Gauge: &metricsv1.Gauge{
63+
DataPoints: []*metricsv1.NumberDataPoint{
64+
{
65+
Attributes: []*commonv1.KeyValue{},
66+
StartTimeUnixNano: 1000,
67+
TimeUnixNano: 1000,
68+
Value: &metricsv1.NumberDataPoint_AsInt{AsInt: 4},
69+
},
70+
},
71+
},
72+
},
73+
},
74+
},
75+
},
76+
},
77+
},
78+
}
79+
b, _ := proto.Marshal(&msg)
80+
req := &http.Request{
81+
Body: io.NopCloser(bytes.NewBuffer(b)),
82+
}
83+
req.ContentLength = int64(len(b))
84+
ctx := context.Background()
85+
86+
var wg sync.WaitGroup
87+
wg.Add(1)
88+
go func() {
89+
dp := <-sendTo.PointsChan
90+
c.So(dp, ShouldNotBeNil)
91+
wg.Done()
92+
}()
93+
94+
So(decoder.Read(ctx, req), ShouldBeNil)
95+
96+
wg.Wait()
97+
})
98+
})
99+
}

0 commit comments

Comments
 (0)