Skip to content

Commit

Permalink
Refactor the socket detail message in the Access log module (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Nov 11, 2024
1 parent 5c6fe8b commit 5d72fa4
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 32 deletions.
6 changes: 3 additions & 3 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (p *HTTP1Protocol) handleResponse(metrics ProtocolMetrics, b *buffer.Buffer
}

func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) {
detailEvents := make([]*events.SocketDetailEvent, 0)
detailEvents := make([]events.SocketDetail, 0)
detailEvents = appendSocketDetailsFromBuffer(detailEvents, request.HeaderBuffer())
detailEvents = appendSocketDetailsFromBuffer(detailEvents, request.BodyBuffer())
detailEvents = appendSocketDetailsFromBuffer(detailEvents, response.HeaderBuffer())
Expand All @@ -166,8 +166,8 @@ func (p *HTTP1Protocol) handleHTTPData(metrics *HTTP1Metrics, request *reader.Re
forwarder.SendTransferProtocolEvent(p.ctx, detailEvents, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime: forwarder.BuildOffsetTimestamp(detailEvents[0].StartTime),
EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].EndTime),
StartTime: forwarder.BuildOffsetTimestamp(detailEvents[0].GetStartTime()),
EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
Version: v3.AccessLogHTTPProtocolVersion_HTTP1,
Request: &v3.AccessLogHTTPProtocolRequest{
Method: transformHTTPMethod(originalRequest.Method),
Expand Down
10 changes: 5 additions & 5 deletions pkg/accesslog/collector/protocols/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
}

func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
detailEvents := make([]*events.SocketDetailEvent, 0)
detailEvents := make([]events.SocketDetail, 0)
detailEvents = appendSocketDetailsFromBuffer(detailEvents, stream.reqHeaderBuffer)
detailEvents = appendSocketDetailsFromBuffer(detailEvents, stream.reqBodyBuffer)
detailEvents = appendSocketDetailsFromBuffer(detailEvents, stream.respHeaderBuffer)
Expand All @@ -239,8 +239,8 @@ func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) {
forwarder.SendTransferProtocolEvent(r.ctx, detailEvents, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime: forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer, detailEvents[0]).StartTime),
EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].EndTime),
StartTime: forwarder.BuildOffsetTimestamp(r.firstDetail(stream.reqBodyBuffer, detailEvents[0]).GetStartTime()),
EndTime: forwarder.BuildOffsetTimestamp(detailEvents[len(detailEvents)-1].GetEndTime()),
Version: v3.AccessLogHTTPProtocolVersion_HTTP2,
Request: &v3.AccessLogHTTPProtocolRequest{
Method: r.parseHTTPMethod(stream),
Expand Down Expand Up @@ -271,11 +271,11 @@ func (r *HTTP2Protocol) parseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogH
return transformHTTPMethod(strings.ToUpper(method))
}

func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def *events.SocketDetailEvent) *events.SocketDetailEvent {
func (r *HTTP2Protocol) firstDetail(buf *buffer.Buffer, def events.SocketDetail) events.SocketDetail {
if buf == nil || buf.Details() == nil || buf.Details().Len() == 0 {
return def
}
return buf.Details().Front().Value.(*events.SocketDetailEvent)
return buf.Details().Front().Value.(events.SocketDetail)
}

func (r *HTTP2Protocol) bufferSizeOfZero(buf *buffer.Buffer) uint64 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/accesslog/collector/protocols/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ type Protocol interface {
Analyze(metrics ProtocolMetrics, buffer *buffer.Buffer, helper *AnalyzeHelper) error
}

func appendSocketDetailsFromBuffer(result []*events.SocketDetailEvent, buf *buffer.Buffer) []*events.SocketDetailEvent {
func appendSocketDetailsFromBuffer(result []events.SocketDetail, buf *buffer.Buffer) []events.SocketDetail {
if buf == nil || buf.DetailLength() == 0 {
return result
}
for e := buf.Details().Front(); e != nil; e = e.Next() {
if len(result) > 0 && result[len(result)-1] == e.Value {
continue
}
result = append(result, e.Value.(*events.SocketDetailEvent))
result = append(result, e.Value.(events.SocketDetail))
}
return result
}
Expand Down
29 changes: 19 additions & 10 deletions pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type AnalyzeQueue struct {
context *common.AccessLogContext
eventQueue *btf.EventQueue
perCPUBuffer int64

detailSupplier func() events.SocketDetail
}

func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
Expand All @@ -71,14 +73,17 @@ func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
eventQueue: btf.NewEventQueue(ctx.Config.ProtocolAnalyze.Parallels, ctx.Config.ProtocolAnalyze.QueueSize, func() btf.PartitionContext {
return NewPartitionContext(ctx)
}),
detailSupplier: func() events.SocketDetail {
return &events.SocketDetailEvent{}
},
}, nil
}

func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer), func() interface{} {
return &events.SocketDetailEvent{}
return q.detailSupplier()
}, func(data interface{}) string {
return fmt.Sprintf("%d", data.(*events.SocketDetailEvent).GetConnectionID())
return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID())
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer), func() interface{} {
return &events.SocketDataUploadEvent{}
Expand All @@ -89,6 +94,10 @@ func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue.Start(ctx, q.context.BPF.Linker)
}

func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func() events.SocketDetail) {
q.detailSupplier = supplier
}

type PartitionContext struct {
context *common.AccessLogContext
protocolMgr *ProtocolManager
Expand Down Expand Up @@ -164,18 +173,18 @@ func (p *PartitionContext) Start(ctx context.Context) {

func (p *PartitionContext) Consume(data interface{}) {
switch event := data.(type) {
case *events.SocketDetailEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
case events.SocketDetail:
pid, _ := events.ParseConnectionID(event.GetConnectionID())
log.Debugf("receive the socket detail event, connection ID: %d, random ID: %d, pid: %d, data id: %d, "+
"function name: %s, package count: %d, package size: %d, l4 duration: %d, ssl: %d",
event.ConnectionID, event.RandomID, pid, event.DataID0, event.FunctionName,
event.L4PackageCount, event.L4TotalPackageSize, event.L4Duration, event.SSL)
if event.Protocol == enums.ConnectionProtocolUnknown {
"function name: %s, package count: %d, package size: %d, ssl: %d",
event.GetConnectionID(), event.GetRandomID(), pid, event.DataID(), event.GetFunctionName(),
event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL())
if event.GetProtocol() == enums.ConnectionProtocolUnknown {
// if the connection protocol is unknown, we just needs to add this into the kernel log
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.Protocol)
connection := p.getConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol())
connection.appendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
Expand Down Expand Up @@ -303,7 +312,7 @@ type PartitionConnection struct {
lastCheckCloseTime time.Time
}

func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, detail *events.SocketDetailEvent) {
func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, detail events.SocketDetail) {
if p.skipAllDataAnalyze {
// if the connection is already skip all data analyze, then just send the detail event
forwarder.SendTransferNoProtocolEvent(ctx, detail)
Expand Down
8 changes: 4 additions & 4 deletions pkg/accesslog/common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, eve
// if not all processor finished, then add into the map
c.allUnfinishedConnections[fmt.Sprintf("%d_%d", event.GetConnectionID(), event.GetRandomID())] = &e.allProcessorFinished
}
case *events.SocketDetailEvent:
if e.SSL == 1 && connection.RPCConnection.TlsMode == v3.AccessLogConnectionTLSMode_Plain {
case events.SocketDetail:
if e.GetSSL() == 1 && connection.RPCConnection.TlsMode == v3.AccessLogConnectionTLSMode_Plain {
connection.RPCConnection.TlsMode = v3.AccessLogConnectionTLSMode_TLS
}
if e.Protocol != enums.ConnectionProtocolUnknown && connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
switch e.Protocol {
if e.GetProtocol() != enums.ConnectionProtocolUnknown && connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP {
switch e.GetProtocol() {
case enums.ConnectionProtocolHTTP:
connection.RPCConnection.Protocol = v3.AccessLogProtocolType_HTTP_1
case enums.ConnectionProtocolHTTP2:
Expand Down
4 changes: 2 additions & 2 deletions pkg/accesslog/common/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type KernelLog struct {
}

type ProtocolLog struct {
KernelLogs []*events.SocketDetailEvent
KernelLogs []events.SocketDetail
Protocol *v3.AccessLogProtocolLogs
}

Expand Down Expand Up @@ -84,7 +84,7 @@ func (q *Queue) AppendKernelLog(tp LogType, event events.Event) {
q.consumeIfNeed()
}

func (q *Queue) AppendProtocolLog(kernelLogs []*events.SocketDetailEvent, protocol *v3.AccessLogProtocolLogs) {
func (q *Queue) AppendProtocolLog(kernelLogs []events.SocketDetail, protocol *v3.AccessLogProtocolLogs) {
select {
case q.protocolLogs <- &ProtocolLog{
KernelLogs: kernelLogs,
Expand Down
44 changes: 44 additions & 0 deletions pkg/accesslog/events/detail.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,26 @@ package events
import (
"time"

"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
)

type SocketDetail interface {
Event
buffer.SocketDataDetail

GetStartTime() uint64
GetEndTime() uint64

GetL4PackageCount() uint8
GetL4TotalPackageSize() uint64

GetFunctionName() enums.SocketFunctionName
GetProtocol() enums.ConnectionProtocol
GetSSL() uint8
}

type SocketDetailEvent struct {
ConnectionID uint64
RandomID uint64
Expand Down Expand Up @@ -71,3 +87,31 @@ func (d *SocketDetailEvent) Timestamp() time.Time {
func (d *SocketDetailEvent) DataID() uint64 {
return d.DataID0
}

func (d *SocketDetailEvent) GetStartTime() uint64 {
return d.StartTime
}

func (d *SocketDetailEvent) GetEndTime() uint64 {
return d.EndTime
}

func (d *SocketDetailEvent) GetL4PackageCount() uint8 {
return d.L4PackageCount
}

func (d *SocketDetailEvent) GetL4TotalPackageSize() uint64 {
return d.L4TotalPackageSize
}

func (d *SocketDetailEvent) GetFunctionName() enums.SocketFunctionName {
return d.FunctionName
}

func (d *SocketDetailEvent) GetProtocol() enums.ConnectionProtocol {
return d.Protocol
}

func (d *SocketDetailEvent) GetSSL() uint8 {
return d.SSL
}
2 changes: 1 addition & 1 deletion pkg/accesslog/forwarder/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func init() {
registerKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
RegisterKernelLogBuilder(common.LogTypeClose, closeLogBuilder)
}

func SendCloseEvent(context *common.AccessLogContext, event *common.CloseEventWithNotify) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/forwarder/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func init() {
registerKernelLogBuilder(common.LogTypeConnect, connectLogBuilder)
RegisterKernelLogBuilder(common.LogTypeConnect, connectLogBuilder)
}

func SendConnectEvent(context *common.AccessLogContext, event *events.SocketConnectEvent, socketPair *ip.SocketPair) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type KernelLogBuilder func(data events.Event) *v3.AccessLogKernelLog

var kernelLogBuilders = make([]KernelLogBuilder, 10)

func registerKernelLogBuilder(tp common.LogType, builder KernelLogBuilder) {
func RegisterKernelLogBuilder(tp common.LogType, builder KernelLogBuilder) {
kernelLogBuilders[tp] = builder
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/accesslog/forwarder/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ import (
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)

func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs []*events.SocketDetailEvent, protocolData *v3.AccessLogProtocolLogs) {
func SendTransferProtocolEvent(context *common.AccessLogContext, kernelLogs []events.SocketDetail, protocolData *v3.AccessLogProtocolLogs) {
context.Queue.AppendProtocolLog(kernelLogs, protocolData)
}
4 changes: 2 additions & 2 deletions pkg/accesslog/forwarder/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import (
)

func init() {
registerKernelLogBuilder(common.LogTypeKernelTransfer, kernelTransferLogBuilder)
RegisterKernelLogBuilder(common.LogTypeKernelTransfer, kernelTransferLogBuilder)
}

func SendTransferNoProtocolEvent(context *common.AccessLogContext, event *events.SocketDetailEvent) {
func SendTransferNoProtocolEvent(context *common.AccessLogContext, event events.SocketDetail) {
context.Queue.AppendKernelLog(common.LogTypeKernelTransfer, event)
}

Expand Down

0 comments on commit 5d72fa4

Please sign in to comment.