pkg/engine/compat.go (3 changes: 2 additions & 1 deletion)
@@ -126,7 +126,8 @@ func (b *streamsResultBuilder) CollectRecord(rec arrow.RecordBatch) {
		})

	// One of the parsed columns
-	case ident.ColumnType() == types.ColumnTypeParsed:
+	case ident.ColumnType() == types.ColumnTypeParsed || (ident.ColumnType() == types.ColumnTypeGenerated &&
Contributor: is this added to handle error columns that are of type Generated?
+		shortName == types.ColumnNameError || shortName == types.ColumnNameErrorDetails):
		parsedCol := col.(*array.String)

		// TODO: keep errors if --strict is set
pkg/engine/internal/executor/aggregator.go (111 changes: 70 additions & 41 deletions)
@@ -1,7 +1,6 @@
package executor

import (
-	"fmt"
	"maps"
	"slices"
	"strings"
@@ -12,14 +11,14 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/cespare/xxhash/v2"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)

type groupState struct {
value float64 // aggregated value
labelValues []string // grouping label values
value float64 // aggregated value
count int64 // values counter
labels []arrow.Field // grouping labels
labelValues []string // grouping label values
}

type aggregationOperation int
@@ -29,23 +28,23 @@ const (
	aggregationOperationMax
	aggregationOperationMin
	aggregationOperationCount
+	aggregationOperationAvg
+	aggregationOperationStddev
+	aggregationOperationStdvar
+	aggregationOperationBytes
)

// aggregator is used to aggregate sample values by a set of grouping keys for each point in time.
type aggregator struct {
-	groupBy   []physical.ColumnExpression          // columns to group by
	points    map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series
	digest    *xxhash.Digest                       // used to compute key for each group
	operation aggregationOperation                 // aggregation type
+	labels    []arrow.Field                        // combined list of all label fields for all sample values
}

-// newAggregator creates a new aggregator with the specified groupBy columns.
-// empty groupBy indicates no grouping. All values are aggregated into a single group.
-// TODO: add without argument to support `without(...)` grouping.
-// A special case of `without()` that has empty groupBy is used for Noop grouping which retains the input labels as is.
-func newAggregator(groupBy []physical.ColumnExpression, pointsSizeHint int, operation aggregationOperation) *aggregator {
+// newAggregator creates a new aggregator with the specified grouping.
+func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregator {
	a := aggregator{
-		groupBy:   groupBy,
		digest:    xxhash.New(),
		operation: operation,
	}
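
For orientation, a minimal sketch of a call site under the new API (the driver function is hypothetical; newAggregator, Add, BuildRecord, and aggregationOperationAvg are the names from this diff). Grouping columns are no longer fixed at construction; each Add call carries its own label fields:

	// Hypothetical driver inside the executor package; assumes the time
	// and arrow imports already present in aggregator.go.
	func exampleAggregatorUsage() {
		agg := newAggregator(1024, aggregationOperationAvg)

		// labels and labelValues must have equal length and matching order.
		labels := []arrow.Field{
			{Name: "service", Type: arrow.BinaryTypes.String},
			{Name: "level", Type: arrow.BinaryTypes.String},
		}
		agg.Add(time.Now(), 0.25, labels, []string{"api", "error"})
		agg.Add(time.Now(), 0.75, labels, []string{"api", "error"})

		// BuildRecord emits one row per (timestamp, group); for avg the
		// value column holds sum/count.
		rec, err := agg.BuildRecord()
		_, _ = rec, err
	}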
@@ -61,21 +60,35 @@ func newAggregator(groupBy []physical.ColumnExpression, pointsSizeHint int, operation aggregationOperation) *aggregator {

// Add adds a new sample value to the aggregation for the given timestamp and grouping label values.
// It expects labelValues to be in the same order as the groupBy columns.
-func (a *aggregator) Add(ts time.Time, value float64, labelValues []string) {
+func (a *aggregator) Add(ts time.Time, value float64, labels []arrow.Field, labelValues []string) {
+	if len(labels) != len(labelValues) {
+		panic("len(labels) != len(labelValues)")
+	}
+
+	for _, label := range labels {
Contributor: this can be expensive as we run it for each row. can we check the benchmarks for a metric test with this change?

+		if !slices.ContainsFunc(a.labels, func(l arrow.Field) bool {
+			return label.Equal(l)
+		}) {
+			a.labels = append(a.labels, label)
+		}
+	}
+
	point, ok := a.points[ts]
	if !ok {
		point = make(map[uint64]*groupState)
		a.points[ts] = point
	}

	var key uint64
-	if len(a.groupBy) != 0 {
+	if len(labelValues) != 0 {
		a.digest.Reset()
		for i, val := range labelValues {
			if i > 0 {
				_, _ = a.digest.Write([]byte{0}) // separator
			}

+			_, _ = a.digest.WriteString(labels[i].Name)
+			_, _ = a.digest.Write([]byte("="))
			_, _ = a.digest.WriteString(val)
		}
		key = a.digest.Sum64()
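
Regarding the per-row cost flagged above, a micro-benchmark along these lines could answer the reviewer's question (a sketch only; the benchmark name is hypothetical and it assumes a _test.go file in the executor package with the testing, time, and arrow imports):

	// Measures Add on the hot path: two already-seen labels per row, so
	// every call pays the slices.ContainsFunc scan without growing a.labels.
	func BenchmarkAggregatorAdd(b *testing.B) {
		labels := []arrow.Field{
			{Name: "service", Type: arrow.BinaryTypes.String},
			{Name: "level", Type: arrow.BinaryTypes.String},
		}
		values := []string{"api", "error"}
		agg := newAggregator(1, aggregationOperationCount)
		ts := time.Unix(0, 0)

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			agg.Add(ts, 1, labels, values)
		}
	}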
@@ -96,26 +109,23 @@ func (a *aggregator) Add(ts time.Time, value float64, labelValues []string) {
			if value < state.value {
				state.value = value
			}
-		case aggregationOperationCount:
-			state.value = state.value + 1

+		case aggregationOperationAvg:
+			state.value += value
		}

+		state.count++
	} else {
-		v := value
-		if a.operation == aggregationOperationCount {
-			v = 1
-		}
+		count := int64(1)

-		if len(a.groupBy) == 0 {
+		if len(labels) == 0 {
			// special case: All values aggregated into a single group.
			// This applies to queries like `sum(...)`, `sum by () (...)`, `count_over_time by () (...)`.
			point[key] = &groupState{
-				value: v,
+				value: value,
+				count: count,
			}
			return
		}

		// create a new slice since labelValues is reused by the calling code
		labelValuesCopy := make([]string, len(labelValues))
		for i, v := range labelValues {
			// copy the value as this is backed by the arrow array data buffer.
@@ -126,26 +136,27 @@ func (a *aggregator) Add(ts time.Time, value float64, labelValues []string) {

		// TODO: add limits on number of groups
		point[key] = &groupState{
+			labels:      labels,
			labelValues: labelValuesCopy,
-			value:       v,
+			value:       value,
+			count:       count,
		}
	}
}

func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
-	fields := make([]arrow.Field, 0, len(a.groupBy)+2)
+	fields := make([]arrow.Field, 0, len(a.labels)+2)
	fields = append(fields,
		semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
		semconv.FieldFromIdent(semconv.ColumnIdentValue, false),
	)

-	for _, column := range a.groupBy {
-		colExpr, ok := column.(*physical.ColumnExpr)
-		if !ok {
-			panic(fmt.Sprintf("invalid column expression type %T", column))
-		}
-		ident := semconv.NewIdentifier(colExpr.Ref.Column, colExpr.Ref.Type, types.Loki.String)
-		fields = append(fields, semconv.FieldFromIdent(ident, true))
+	for _, label := range a.labels {
+		fields = append(fields, arrow.Field{
+			Name:     label.Name,
+			Type:     label.Type,
+			Nullable: true,
+			Metadata: label.Metadata,
+		})
	}

	schema := arrow.NewSchema(fields, nil)
@@ -156,16 +167,34 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
		tsValue, _ := arrow.TimestampFromTime(ts, arrow.Nanosecond)

		for _, entry := range a.points[ts] {
+			var value float64
+			switch a.operation {
+			case aggregationOperationAvg:
+				value = entry.value / float64(entry.count)
+			case aggregationOperationCount:
+				value = float64(entry.count)
+			default:
+				value = entry.value
+			}
+
			rb.Field(0).(*array.TimestampBuilder).Append(tsValue)
-			rb.Field(1).(*array.Float64Builder).Append(entry.value)
+			rb.Field(1).(*array.Float64Builder).Append(value)

-			for col, val := range entry.labelValues {
-				builder := rb.Field(col + 2) // offset by 2 as the first 2 fields are timestamp and value
-				// TODO: differentiate between null and actual empty string
-				if val == "" {
+			for i, label := range a.labels {
+				builder := rb.Field(2 + i) // offset by 2 as the first 2 fields are timestamp and value
+
+				j := slices.IndexFunc(entry.labels, func(l arrow.Field) bool {
+					return l.Name == label.Name
+				})
+				if j == -1 {
					builder.(*array.StringBuilder).AppendNull()
				} else {
-					builder.(*array.StringBuilder).Append(val)
+					// TODO: differentiate between null and actual empty string
+					if entry.labelValues[j] == "" {
+						builder.(*array.StringBuilder).AppendNull()
+					} else {
+						builder.(*array.StringBuilder).Append(entry.labelValues[j])
+					}
				}
			}
		}
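
To make the two-phase average concrete: Add only accumulates a running sum (state.value) and a count per group, and BuildRecord performs the division at emit time. A small worked example under that reading (hypothetical driver in the executor package; empty labels fall into the single-group special case):

	func exampleAverage() {
		// Three samples 2, 4, 9 in one global group accumulate
		// state.value = 15 and state.count = 3; BuildRecord emits 15/3 = 5.
		agg := newAggregator(1, aggregationOperationAvg)
		ts := time.Unix(0, 0)
		for _, v := range []float64{2, 4, 9} {
			agg.Add(ts, v, nil, nil) // no grouping labels: single-group special case
		}
		rec, err := agg.BuildRecord() // value column holds 5.0
		_, _ = rec, err
	}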