Skip to content

Commit

Permalink
Refactor kernel and protocol log when sending to backend (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Mar 6, 2025
1 parent 16b0438 commit ecb6e8c
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 54 deletions.
4 changes: 2 additions & 2 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *Partit
if host == "" && originalRequest.URL != nil {
host = originalRequest.URL.Host
}
forwarder.SendTransferProtocolEvent(p.ctx, details, &v3.AccessLogProtocolLogs{
forwarder.SendTransferProtocolEvent(p.ctx, common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime: forwarder.BuildOffsetTimestamp(details[0].GetStartTime()),
Expand All @@ -265,7 +265,7 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *Partit
},
},
},
})
}))
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/accesslog/collector/protocols/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream *HTTP2S
if streamHost == "" {
streamHost = stream.ReqHeader[":host"]
}
forwarder.SendTransferProtocolEvent(r.ctx, details, &v3.AccessLogProtocolLogs{
forwarder.SendTransferProtocolEvent(r.ctx, common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, details[0]).GetStartTime()),
Expand All @@ -286,7 +286,7 @@ func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream *HTTP2S
},
},
},
})
}))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessL
SocketFD: activateConn.SocketFD,
Success: 0,
})
accessLogContext.Queue.AppendKernelLog(LogTypeClose, wapperedEvent)
accessLogContext.Queue.AppendKernelLog(NewKernelLogEvent(LogTypeClose, wapperedEvent))
}

case <-ctx.Done():
Expand Down
64 changes: 64 additions & 0 deletions pkg/accesslog/common/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package common

import (
"github.com/apache/skywalking-rover/pkg/accesslog/events"

v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)

type kernelLogEvent struct {
logType LogType
event events.Event
}

func NewKernelLogEvent(logType LogType, event events.Event) KernelLog {
return &kernelLogEvent{
logType: logType,
event: event,
}
}

func (k *kernelLogEvent) Type() LogType {
return k.logType
}

func (k *kernelLogEvent) Event() events.Event {
return k.event
}

type ProtocolEventData struct {
KernelLogs []events.SocketDetail
ProtocolLogData *v3.AccessLogProtocolLogs
}

func (r *ProtocolEventData) RelateKernelLogs() []events.SocketDetail {
return r.KernelLogs
}

func (r *ProtocolEventData) ProtocolLog() *v3.AccessLogProtocolLogs {
return r.ProtocolLogData
}

func NewProtocolLogEvent(kernelLogs []events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) ProtocolLog {
return &ProtocolEventData{
KernelLogs: kernelLogs,
ProtocolLogData: protocolData,
}
}
36 changes: 15 additions & 21 deletions pkg/accesslog/common/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ import (

var log = logger.GetLogger("access_log", "common")

type KernelLog struct {
Type LogType
Event events.Event
type KernelLog interface {
Type() LogType
Event() events.Event
}

type ProtocolLog struct {
KernelLogs []events.SocketDetail
Protocol *v3.AccessLogProtocolLogs
type ProtocolLog interface {
RelateKernelLogs() []events.SocketDetail
ProtocolLog() *v3.AccessLogProtocolLogs
}

type Queue struct {
kernelLogs chan *KernelLog
protocolLogs chan *ProtocolLog
kernelLogs chan KernelLog
protocolLogs chan ProtocolLog

maxFlushCount int
period time.Duration
Expand All @@ -57,39 +57,33 @@ type Queue struct {
}

type QueueConsumer interface {
Consume(kernels chan *KernelLog, protocols chan *ProtocolLog)
Consume(kernels chan KernelLog, protocols chan ProtocolLog)
}

func NewQueue(maxFlushCount int, period time.Duration, consumer QueueConsumer) *Queue {
return &Queue{
kernelLogs: make(chan *KernelLog, maxFlushCount*3),
protocolLogs: make(chan *ProtocolLog, maxFlushCount*3),
kernelLogs: make(chan KernelLog, maxFlushCount*3),
protocolLogs: make(chan ProtocolLog, maxFlushCount*3),
maxFlushCount: maxFlushCount,
period: period,
consumer: consumer,
consumeLock: &sync.Mutex{},
}
}

func (q *Queue) AppendKernelLog(tp LogType, event events.Event) {
func (q *Queue) AppendKernelLog(log KernelLog) {
select {
case q.kernelLogs <- &KernelLog{
Type: tp,
Event: event,
}:
case q.kernelLogs <- log:
default:
atomic.AddInt64(&q.dropKernelLogCount, 1)
return
}
q.consumeIfNeed()
}

func (q *Queue) AppendProtocolLog(kernelLogs []events.SocketDetail, protocol *v3.AccessLogProtocolLogs) {
func (q *Queue) AppendProtocolLog(log ProtocolLog) {
select {
case q.protocolLogs <- &ProtocolLog{
KernelLogs: kernelLogs,
Protocol: protocol,
}:
case q.protocolLogs <- log:
default:
atomic.AddInt64(&q.dropProtocolLogCount, 1)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/forwarder/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func init() {
}

func SendCloseEvent(context *common.AccessLogContext, event *common.CloseEventWithNotify) {
context.Queue.AppendKernelLog(common.LogTypeClose, event)
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeClose, event))
}

func closeLogBuilder(event events.Event) *v3.AccessLogKernelLog {
Expand Down
4 changes: 2 additions & 2 deletions pkg/accesslog/forwarder/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func init() {
}

func SendConnectEvent(context *common.AccessLogContext, event *events.SocketConnectEvent, socketPair *ip.SocketPair) {
context.Queue.AppendKernelLog(common.LogTypeConnect, &common.ConnectEventWithSocket{
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeConnect, &common.ConnectEventWithSocket{
SocketConnectEvent: event,
SocketPair: socketPair,
})
}))
}

func connectLogBuilder(event events.Event) *v3.AccessLogKernelLog {
Expand Down
7 changes: 2 additions & 5 deletions pkg/accesslog/forwarder/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ package forwarder

import (
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"

v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)

func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs []events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) {
context.Queue.AppendProtocolLog(kernelLogs, protocolData)
func SendTransferProtocolEvent(context *common.AccessLogContext, event common.ProtocolLog) {
context.Queue.AppendProtocolLog(event)
}
2 changes: 1 addition & 1 deletion pkg/accesslog/forwarder/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func init() {
}

func SendTransferNoProtocolEvent(context *common.AccessLogContext, event events.SocketDetail) {
context.Queue.AppendKernelLog(common.LogTypeKernelTransfer, event)
context.Queue.AppendKernelLog(common.NewKernelLogEvent(common.LogTypeKernelTransfer, event))
}

func kernelTransferLogBuilder(event events.Event) *v3.AccessLogKernelLog {
Expand Down
39 changes: 20 additions & 19 deletions pkg/accesslog/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *Runner) Start(ctx context.Context) error {
return nil
}

func (r *Runner) Consume(kernels chan *common.KernelLog, protocols chan *common.ProtocolLog) {
func (r *Runner) Consume(kernels chan common.KernelLog, protocols chan common.ProtocolLog) {
if r.backendOp.GetConnectionStatus() != backend.Connected {
log.Warnf("failure to connect to the backend, skip generating access log")
return
Expand All @@ -117,21 +117,21 @@ func (r *Runner) Consume(kernels chan *common.KernelLog, protocols chan *common.
r.sender.AddBatch(batch)
}

func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan *common.KernelLog, protocols chan *common.ProtocolLog) {
func (r *Runner) buildConnectionLogs(batch *sender.BatchLogs, kernels chan common.KernelLog, protocols chan common.ProtocolLog) {
r.buildKernelLogs(kernels, batch)
r.buildProtocolLogs(protocols, batch)

r.context.ConnectionMgr.OnBuildConnectionLogFinished()
}

func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch *sender.BatchLogs) {
delayAppends := make([]*common.KernelLog, 0)
func (r *Runner) buildKernelLogs(kernels chan common.KernelLog, batch *sender.BatchLogs) {
delayAppends := make([]common.KernelLog, 0)
for {
select {
case kernelLog := <-kernels:
connection, curLog, delay := r.buildKernelLog(kernelLog)
log.Debugf("building kernel log result, connetaion ID: %d, random ID: %d, exist connection: %t, delay: %t",
kernelLog.Event.GetConnectionID(), kernelLog.Event.GetRandomID(), connection != nil, delay)
kernelLog.Event().GetConnectionID(), kernelLog.Event().GetRandomID(), connection != nil, delay)
if connection != nil && curLog != nil {
batch.AppendKernelLog(connection, curLog)
} else if delay {
Expand All @@ -150,17 +150,18 @@ func (r *Runner) buildKernelLogs(kernels chan *common.KernelLog, batch *sender.B
}
}

func (r *Runner) buildProtocolLogs(protocols chan *common.ProtocolLog, batch *sender.BatchLogs) {
delayAppends := make([]*common.ProtocolLog, 0)
func (r *Runner) buildProtocolLogs(protocols chan common.ProtocolLog, batch *sender.BatchLogs) {
delayAppends := make([]common.ProtocolLog, 0)
for {
select {
case protocolLog := <-protocols:
connection, kernelLogs, protocolLogs, delay := r.buildProtocolLog(protocolLog)
if log.Enable(logrus.DebugLevel) {
kernelLogCount := len(protocolLog.KernelLogs)
kernelLogCount := len(protocolLog.RelateKernelLogs())
var conID, randomID uint64
if kernelLogCount > 0 {
conID, randomID = protocolLog.KernelLogs[0].GetConnectionID(), protocolLog.KernelLogs[0].GetRandomID()
conID, randomID = protocolLog.RelateKernelLogs()[0].GetConnectionID(),
protocolLog.RelateKernelLogs()[0].GetRandomID()
}
log.Debugf("building protocol log result, connetaion ID: %d, random ID: %d, connection exist: %t, delay: %t",
conID, randomID, connection != nil, delay)
Expand Down Expand Up @@ -200,12 +201,12 @@ func (r *Runner) shouldReportProcessLog(pid uint32) bool {
return true
}

func (r *Runner) buildProtocolLog(protocolLog *common.ProtocolLog) (*common.ConnectionInfo,
func (r *Runner) buildProtocolLog(protocolLog common.ProtocolLog) (*common.ConnectionInfo,
[]*v3.AccessLogKernelLog, *v3.AccessLogProtocolLogs, bool) {
if len(protocolLog.KernelLogs) == 0 {
if len(protocolLog.RelateKernelLogs()) == 0 {
return nil, nil, nil, false
}
firstKernelLog := protocolLog.KernelLogs[0]
firstKernelLog := protocolLog.RelateKernelLogs()[0]
pid, _ := events.ParseConnectionID(firstKernelLog.GetConnectionID())
// if the process not monitoring, then ignore it
if !r.shouldReportProcessLog(pid) {
Expand All @@ -221,33 +222,33 @@ func (r *Runner) buildProtocolLog(protocolLog *common.ProtocolLog) (*common.Conn
return nil, nil, nil, true
}
kernelLogs := make([]*v3.AccessLogKernelLog, 0)
for _, kl := range protocolLog.KernelLogs {
for _, kl := range protocolLog.RelateKernelLogs() {
event := forwarder.BuildKernelLogFromEvent(common.LogTypeKernelTransfer, kl)
if event == nil {
continue
}
kernelLogs = append(kernelLogs, event)
}

return connection, kernelLogs, protocolLog.Protocol, false
return connection, kernelLogs, protocolLog.ProtocolLog(), false
}

func (r *Runner) buildKernelLog(kernelLog *common.KernelLog) (*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) {
pid, _ := events.ParseConnectionID(kernelLog.Event.GetConnectionID())
func (r *Runner) buildKernelLog(kernelLog common.KernelLog) (*common.ConnectionInfo, *v3.AccessLogKernelLog, bool) {
pid, _ := events.ParseConnectionID(kernelLog.Event().GetConnectionID())
// if the process not monitoring, then ignore it
if !r.shouldReportProcessLog(pid) {
return nil, nil, false
}
connection := r.context.ConnectionMgr.Find(kernelLog.Event)
connection := r.context.ConnectionMgr.Find(kernelLog.Event())
if connection == nil {
// if the connection cannot be found, it means that the connection have not been established
// just re-add into the queue for checking in the next period
if time.Since(kernelLog.Event.Timestamp()) > kernelAccessLogCacheTime {
if time.Since(kernelLog.Event().Timestamp()) > kernelAccessLogCacheTime {
return nil, nil, false
}
return nil, nil, true
}
event := forwarder.BuildKernelLogFromEvent(kernelLog.Type, kernelLog.Event)
event := forwarder.BuildKernelLogFromEvent(kernelLog.Type(), kernelLog.Event())
return connection, event, false
}

Expand Down

0 comments on commit ecb6e8c

Please sign in to comment.