-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlogs_service.go
More file actions
193 lines (157 loc) · 5.35 KB
/
logs_service.go
File metadata and controls
193 lines (157 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package main
import (
"context"
"fmt"
"log/slog"
"time"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
otellogs "go.opentelemetry.io/proto/otlp/logs/v1"
)
// dash0LogsServiceServer is the logs service implementation defined in this
// file. For each accepted log record it asks the extractor for a value and
// hands that value to the counter (see processLogs).
type dash0LogsServiceServer struct {
// addr is the address passed to newServer; it is only read back by GetStats,
// which reports it as "server_addr".
addr string
// config supplies AttributeKey (used for extraction and logging) and
// WindowDuration (reported by GetStats).
config *Config
// counter receives one AddCount call per processed record. It is started in
// newServer and stopped in Stop.
counter *AttributeCounter
// extractor resolves the attribute value for a record given its resource,
// scope and the record itself.
extractor *AttributeExtractor
// Embedded so the type can be returned as a collogspb.LogsServiceServer
// (newServer does exactly that).
collogspb.UnimplementedLogsServiceServer
}
// newServer wires up a dash0LogsServiceServer for the given address and
// configuration.
//
// It builds the attribute counter from config, builds an attribute extractor
// for config.AttributeKey, and starts the counter before returning. If the
// counter cannot be created the wrapped error is returned and no server is
// produced.
func newServer(addr string, config *Config) (collogspb.LogsServiceServer, error) {
	attrCounter, err := NewAttributeCounter(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create attribute counter: %w", err)
	}

	srv := &dash0LogsServiceServer{
		addr:      addr,
		config:    config,
		counter:   attrCounter,
		extractor: NewAttributeExtractor(config.AttributeKey),
	}

	// Kick off the counter's background processes only once the server value
	// has been fully assembled.
	attrCounter.Start()

	return srv, nil
}
// Export handles one OTLP ExportLogsServiceRequest.
//
// A nil request or a request without any ResourceLogs is acknowledged with an
// empty response. Otherwise every log record is run through processLogs under
// a 30 second timeout derived from ctx, the number of processed records is
// added to logsReceivedCounter, and a summary line is logged.
//
// The returned error is always nil. When processLogs reports rejected records
// the response carries a PartialSuccess with the rejected count and a
// human-readable message.
func (l *dash0LogsServiceServer) Export(ctx context.Context, request *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) {
	// Upper bound on the time spent processing a single request.
	const exportTimeout = 30 * time.Second

	startTime := time.Now()

	// Validate first so a nil request does not allocate a timeout context.
	if request == nil {
		slog.WarnContext(ctx, "Received nil ExportLogsServiceRequest")
		return &collogspb.ExportLogsServiceResponse{}, nil
	}

	// Add timeout to prevent long-running requests
	processCtx, cancel := context.WithTimeout(ctx, exportTimeout)
	defer cancel()

	resourceLogsCount := len(request.GetResourceLogs())
	slog.DebugContext(processCtx, "Received ExportLogsServiceRequest",
		"resource_logs_count", resourceLogsCount)

	if resourceLogsCount == 0 {
		return &collogspb.ExportLogsServiceResponse{}, nil
	}

	// Process logs and count attributes
	processedRecords, rejectedRecords := l.processLogs(processCtx, request)

	// Update global metrics
	logsReceivedCounter.Add(processCtx, processedRecords)

	processingDuration := time.Since(startTime)
	// The field is labelled "_ms", so report milliseconds. Microsecond
	// resolution is kept as a fraction because most requests finish in well
	// under a millisecond.
	processingMillis := float64(processingDuration.Microseconds()) / 1000

	slog.InfoContext(processCtx, "Processed log export request",
		"processed_records", processedRecords,
		"rejected_records", rejectedRecords,
		"processing_duration_ms", processingMillis,
		"attribute_key", l.config.AttributeKey)

	response := &collogspb.ExportLogsServiceResponse{}

	// Include partial success information if there were rejections
	if rejectedRecords > 0 {
		response.PartialSuccess = &collogspb.ExportLogsPartialSuccess{
			RejectedLogRecords: rejectedRecords,
			ErrorMessage:       fmt.Sprintf("Rejected %d log records due to processing errors", rejectedRecords),
		}
	}

	return response, nil
}
// processLogs walks every log record in request, extracts the configured
// attribute value from it and adds that value to the counter.
//
// It returns (processed, rejected):
//   - processed counts records whose value was handed to the counter;
//   - rejected counts nil records, records with neither a body nor attributes,
//     and records that were never visited because ctx was cancelled.
//
// nil ResourceLogs and ScopeLogs entries are skipped and contribute to
// neither count.
func (l *dash0LogsServiceServer) processLogs(ctx context.Context, request *collogspb.ExportLogsServiceRequest) (int64, int64) {
	var processedRecords int64
	var rejectedRecords int64

	// Process each ResourceLogs entry
	for _, resourceLogs := range request.GetResourceLogs() {
		if resourceLogs == nil {
			continue
		}

		for _, scopeLogs := range resourceLogs.GetScopeLogs() {
			if scopeLogs == nil {
				continue
			}

			for _, logRecord := range scopeLogs.GetLogRecords() {
				if logRecord == nil {
					rejectedRecords++
					slog.DebugContext(ctx, "Rejected nil log record", "scope", scopeLogs, "resource", resourceLogs)
					continue
				}

				// Treat log records with no body and no attributes as malformed
				if logRecord.Body == nil && len(logRecord.Attributes) == 0 {
					rejectedRecords++
					slog.DebugContext(ctx, "Rejected empty log record", "scope", scopeLogs, "resource", resourceLogs)
					continue
				}

				// Extract the configured attribute value
				attributeValue := l.extractor.ExtractValue(resourceLogs, scopeLogs, logRecord)

				// Add to counter
				l.counter.AddCount(ctx, attributeValue)
				processedRecords++

				// Only the first few records are logged to keep debug output bounded
				if processedRecords <= 5 {
					slog.DebugContext(ctx, "Processed log record",
						"record_number", processedRecords,
						"attribute_key", l.config.AttributeKey,
						"attribute_value", attributeValue,
						"log_body", l.getLogBodyPreview(logRecord))
				}

				// Check for context cancellation
				select {
				case <-ctx.Done():
					// Everything not yet visited is dropped. Count those
					// records as rejected so the caller's partial-success
					// response reflects them instead of silently losing them.
					var totalRecords int64
					for _, rl := range request.GetResourceLogs() {
						for _, sl := range rl.GetScopeLogs() {
							totalRecords += int64(len(sl.GetLogRecords()))
						}
					}
					unprocessed := totalRecords - processedRecords - rejectedRecords
					rejectedRecords += unprocessed

					slog.WarnContext(ctx, "Context cancelled during log processing",
						"processed_so_far", processedRecords,
						"unprocessed_records", unprocessed)
					return processedRecords, rejectedRecords
				default:
					// Continue processing
				}
			}
		}
	}

	return processedRecords, rejectedRecords
}
// getLogBodyPreview returns a short, log-friendly preview of a record's body.
//
// Results:
//   - "[empty]" when the record has no body;
//   - "[non-string body]" when GetStringValue yields "";
//   - the string itself when it is at most 50 bytes long;
//   - otherwise at most the first 50 bytes followed by "...", shortened as
//     needed so the cut never falls inside a multi-byte UTF-8 character.
//
// NOTE(review): a body that is a string with the value "" is also reported as
// "[non-string body]", because the getter result alone cannot tell the two
// cases apart.
func (l *dash0LogsServiceServer) getLogBodyPreview(logRecord *otellogs.LogRecord) string {
	const maxPreviewBytes = 50

	body := logRecord.GetBody()
	if body == nil {
		return "[empty]"
	}

	text := body.GetStringValue()
	if text == "" {
		return "[non-string body]"
	}
	if len(text) <= maxPreviewBytes {
		return text
	}

	// Ranging over a string yields the byte offset of each rune start, so the
	// last offset that does not exceed the limit is a safe place to cut.
	cut := 0
	for offset := range text {
		if offset > maxPreviewBytes {
			break
		}
		cut = offset
	}
	return text[:cut] + "..."
}
// Stop shuts the server down by stopping its attribute counter. It does
// nothing when no counter is attached.
func (l *dash0LogsServiceServer) Stop() {
	if l.counter == nil {
		return
	}
	l.counter.Stop()
}
// GetStats reports a snapshot of service statistics as a freshly built map.
//
// The map always contains "server_addr", "attribute_key" and
// "window_duration". When a counter is attached, every entry from its
// GetCurrentStats result is merged in, and a counter entry replaces a base
// entry that has the same key.
func (l *dash0LogsServiceServer) GetStats() map[string]interface{} {
	result := map[string]interface{}{
		"server_addr":     l.addr,
		"attribute_key":   l.config.AttributeKey,
		"window_duration": l.config.WindowDuration.String(),
	}

	if l.counter == nil {
		return result
	}

	for key, value := range l.counter.GetCurrentStats() {
		result[key] = value
	}
	return result
}