diff --git a/pkg/debuginfo/fake.go b/pkg/debuginfo/fake.go new file mode 100644 index 000000000..4515c339c --- /dev/null +++ b/pkg/debuginfo/fake.go @@ -0,0 +1,47 @@ +package debuginfo + +import ( + "context" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + + parcadebuginfov1 "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1" +) + +type fakeDebugInfo struct { + parcadebuginfov1.UnimplementedDebugInfoServiceServer + + logger log.Logger +} + +func New(logger log.Logger) parcadebuginfov1.DebugInfoServiceServer { + return &fakeDebugInfo{ + logger: logger, + } +} + +// Exists returns true if the given build_id has debug info uploaded for it. +func (f *fakeDebugInfo) Exists(ctx context.Context, req *parcadebuginfov1.ExistsRequest) (*parcadebuginfov1.ExistsResponse, error) { + level.Warn(f.logger).Log("msg", "received exists request", "buildid", req.GetBuildId(), "hash", req.GetHash()) + + return &parcadebuginfov1.ExistsResponse{ + Exists: false, + }, nil +} + +// Upload ingests debug info for a given build_id +func (f *fakeDebugInfo) Upload(u parcadebuginfov1.DebugInfoService_UploadServer) error { + req, err := u.Recv() + if err != nil { + return err + } + level.Warn(f.logger).Log("msg", "received upload", "buildid", req.GetInfo().GetBuildId(), "hash", req.GetInfo().GetHash()) + + return nil +} + +// Download returns the debug info for a given build_id. +func (_ *fakeDebugInfo) Download(*parcadebuginfov1.DownloadRequest, parcadebuginfov1.DebugInfoService_DownloadServer) error { + return nil +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 3eebe3d75..e85f66af6 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -12,11 +12,13 @@ import ( "github.com/bufbuild/connect-go" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/google/uuid" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/opentracing/opentracing-go" + parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1" "github.com/parca-dev/parca/pkg/scrape" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -147,6 +149,8 @@ func (d *Distributor) Push(ctx context.Context, req *connect.Request[pushv1.Push p.Normalize() + level.Warn(d.logger).Log("msg", "received sample", "labels", firemodel.LabelPairsString(series.Labels), "type", p.StringTable[p.SampleType[0].Type]) + // reuse the data buffer if possible size := p.SizeVT() if cap(data) < size { @@ -309,3 +313,44 @@ func TokenFor(tenantID, labels string) uint32 { _, _ = h.Write([]byte(labels)) return h.Sum32() } + +func (d *Distributor) ParcaProfileStore() parcastorev1.ProfileStoreServiceServer { + return &ParcaProfileStore{ + distributor: d, + } +} + +type ParcaProfileStore struct { + parcastorev1.UnimplementedProfileStoreServiceServer + distributor *Distributor +} + +func (s *ParcaProfileStore) WriteRaw(ctx context.Context, req *parcastorev1.WriteRawRequest) (*parcastorev1.WriteRawResponse, error) { + nReq := &pushv1.PushRequest{ + Series: make([]*pushv1.RawProfileSeries, len(req.Series)), + } + for idxSeries, series := range req.Series { + nReq.Series[idxSeries] = &pushv1.RawProfileSeries{ + Samples: make([]*pushv1.RawSample, len(series.Samples)), + Labels: make([]*commonv1.LabelPair, len(series.Labels.Labels)), + } + for idx, l := range series.Labels.Labels { + nReq.Series[idxSeries].Labels[idx] = &commonv1.LabelPair{ + Name: l.Name, + Value: l.Value, + } + } + for idx, s := range series.Samples { + nReq.Series[idxSeries].Samples[idx] = &pushv1.RawSample{ + RawProfile: s.RawProfile, + } + } + level.Warn(s.distributor.logger).Log("msg", "converted parca sample", "labels", firemodel.LabelPairsString(nReq.Series[idxSeries].Labels)) + } + + if _, err := s.distributor.Push(ctx, connect.NewRequest(nReq)); err != nil { + return nil, err + } + + return &parcastorev1.WriteRawResponse{}, nil +} diff --git a/pkg/fire/modules.go b/pkg/fire/modules.go index a872d60a4..b72cd3828 100644 --- a/pkg/fire/modules.go +++ b/pkg/fire/modules.go @@ -13,8 +13,9 @@ import ( "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" grpcgw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + parcadebuginfov1 "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1" + parcastorev1 "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/version" "github.com/thanos-io/thanos/pkg/discovery/dns" @@ -24,6 +25,7 @@ import ( "golang.org/x/net/http2/h2c" "github.com/grafana/fire/pkg/agent" + "github.com/grafana/fire/pkg/debuginfo" "github.com/grafana/fire/pkg/distributor" "github.com/grafana/fire/pkg/firedb" agentv1 "github.com/grafana/fire/pkg/gen/agent/v1" @@ -107,7 +109,13 @@ func (f *Fire) initDistributor() (services.Service, error) { // initialise direct pusher, this overwrites the default HTTP client f.pusherClient = d + // register pusher pushv1connect.RegisterPusherServiceHandler(f.Server.HTTP, d) + + // register parca compatible profile store + parcastorev1.RegisterProfileStoreServiceServer(f.Server.GRPC, d.ParcaProfileStore()) + parcadebuginfov1.RegisterDebugInfoServiceServer(f.Server.GRPC, debuginfo.New(f.logger)) + return d, nil } diff --git a/pkg/firedb/head_test.go b/pkg/firedb/head_test.go index 936cdf98c..e22abcdb9 100644 --- a/pkg/firedb/head_test.go +++ b/pkg/firedb/head_test.go @@ -301,6 +301,7 @@ func TestHeadIngestRealProfiles(t *testing.T) { profilePaths := []string{ "testdata/heap", "testdata/profile", + "testdata/parca-agent", } head, err := NewHead(t.TempDir()) diff --git a/pkg/firedb/locations.go b/pkg/firedb/locations.go index ca9416903..199b0118d 100644 --- a/pkg/firedb/locations.go +++ b/pkg/firedb/locations.go @@ -53,7 +53,10 @@ func (*locationsHelper) addToRewriter(r *rewriter, elemRewriter idConversionTabl } func (*locationsHelper) rewrite(r *rewriter, l *profilev1.Location) error { - r.mappings.rewriteUint64(&l.MappingId) + // ignore mappingIDs of 0, as they indicate that it has already been symbolized. + if l.MappingId != 0 { + r.mappings.rewriteUint64(&l.MappingId) + } for pos := range l.Line { r.functions.rewriteUint64(&l.Line[pos].FunctionId) diff --git a/pkg/firedb/profiles.go b/pkg/firedb/profiles.go index becb36010..600461693 100644 --- a/pkg/firedb/profiles.go +++ b/pkg/firedb/profiles.go @@ -280,7 +280,6 @@ func (*profilesHelper) key(s *schemav1.Profile) noKey { } func (*profilesHelper) addToRewriter(r *rewriter, elemRewriter idConversionTable) { - r.locations = elemRewriter } func (*profilesHelper) rewrite(r *rewriter, s *schemav1.Profile) error { diff --git a/pkg/firedb/testdata/parca-agent b/pkg/firedb/testdata/parca-agent new file mode 100644 index 000000000..78aed6b76 Binary files /dev/null and b/pkg/firedb/testdata/parca-agent differ