Skip to content

Commit

Permalink
Downgrade the protocol of connection when protocol break (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Mar 4, 2025
1 parent f459a28 commit 1641294
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Release Notes.
* Reduce unessential `conntrack` query when detect new connection.
* Reduce CPU and memory usage in the access log module.
* Reduce handle connection event time in the access log module.
* Downgrade the protocol of connection when protocol break in the access log module.

#### Bug Fixes
* Fix the base image cannot run in the arm64.
Expand Down
12 changes: 11 additions & 1 deletion bpf/include/protocol_analyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ static __inline __u32 infer_http2_message(const char* buf, size_t count) {
bpf_probe_read(frame, sizeof(frame), buf + frameOffset);
frameOffset += (bpf_ntohl(*(__u32 *) frame) >> 8) + kFrameBasicSize;

// frametype only accept 0x00 - 0x09
if (frame[3] > 0x09) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}
// is header frame
if (frame[3] != kFrameTypeHeader) {
continue;
Expand All @@ -135,9 +139,15 @@ static __inline __u32 infer_http2_message(const char* buf, size_t count) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}

// stream ID cannot be 0
__u32 streamID = ((frame[5] & 0x7F) << 24) | (frame[6] << 16) | (frame[7] << 8) | frame[8];
if (streamID == 0) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}

// locate the header block fragment offset
headerBlockFragmentOffset = kFrameBasicSize;
if (frame[4] & 0x20) { // PADDED flag is set
if (frame[4] & 0x08) { // PADDED flag is set
headerBlockFragmentOffset += 1;
}
if (frame[4] & 0x20) { // PRIORITY flag is set
Expand Down
26 changes: 16 additions & 10 deletions pkg/accesslog/collector/protocols/http1.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,24 @@ import (
var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1")
var http1AnalyzeMaxRetryCount = 3

type HTTP1ProtocolAnalyze func(metrics *HTTP1Metrics, connection *PartitionConnection,
request *reader.Request, response *reader.Response) error
type HTTP1ProtocolAnalyzer interface {
HandleHTTPData(metrics *HTTP1Metrics, connection *PartitionConnection,
request *reader.Request, response *reader.Response) error
OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection)
}

type HTTP1Protocol struct {
ctx *common.AccessLogContext
analyze HTTP1ProtocolAnalyze
reader *reader.Reader
ctx *common.AccessLogContext
analyzer HTTP1ProtocolAnalyzer
reader *reader.Reader
}

func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyze) *HTTP1Protocol {
func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyzer) *HTTP1Protocol {
protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()}
if analyze == nil {
protocol.analyze = protocol.HandleHTTPData
protocol.analyzer = protocol
} else {
protocol.analyze = analyze
protocol.analyzer = analyze
}
return protocol
}
Expand Down Expand Up @@ -174,7 +177,7 @@ func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, connection *Partit
}

// getting the request and response, then send to the forwarder
if analyzeError := p.analyze(metrics, connection, request, response); analyzeError != nil {
if analyzeError := p.analyzer.HandleHTTPData(metrics, connection, request, response); analyzeError != nil {
p.appendAnalyzeUnFinished(metrics, request, response)
}
return enums.ParseResultSuccess, nil
Expand All @@ -191,7 +194,7 @@ func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *
func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics, connection *PartitionConnection) {
for element := m.analyzeUnFinished.Front(); element != nil; {
unFinished := element.Value.(*HTTP1AnalyzeUnFinished)
err := p.analyze(m, connection, unFinished.request, unFinished.response)
err := p.analyzer.HandleHTTPData(m, connection, unFinished.request, unFinished.response)
if err != nil {
unFinished.retryCount++
if unFinished.retryCount < http1AnalyzeMaxRetryCount {
Expand Down Expand Up @@ -266,6 +269,9 @@ func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *Partit
return nil
}

func (p *HTTP1Protocol) OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection) {
}

func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) {
if ioReader != nil {
_ = ioReader.Close()
Expand Down
74 changes: 41 additions & 33 deletions pkg/accesslog/collector/protocols/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,32 @@ var maxHTTP2StreamingTime = time.Minute * 3

var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")

type HTTP2StreamAnalyze func(stream *HTTP2Streaming) error
type HTTP2StreamAnalyzer interface {
HandleWholeStream(connection *PartitionConnection, stream *HTTP2Streaming) error
OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics)
}

type HTTP2Protocol struct {
ctx *common.AccessLogContext
analyze HTTP2StreamAnalyze
ctx *common.AccessLogContext
analyzer HTTP2StreamAnalyzer
}

func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyze HTTP2StreamAnalyze) *HTTP2Protocol {
func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyzer HTTP2StreamAnalyzer) *HTTP2Protocol {
protocol := &HTTP2Protocol{ctx: ctx}
if analyze == nil {
protocol.analyze = protocol.handleWholeStream
if analyzer == nil {
protocol.analyzer = protocol
} else {
protocol.analyze = analyze
protocol.analyzer = analyzer
}
return protocol
}

type HTTP2Metrics struct {
connectionID uint64
randomID uint64
hpackDecoder *hpack.Decoder
ConnectionID uint64
RandomID uint64
HpackDecoder *hpack.Decoder

streams map[uint32]*HTTP2Streaming
Streams map[uint32]*HTTP2Streaming
}

type HTTP2Streaming struct {
Expand All @@ -82,18 +85,18 @@ type HTTP2Streaming struct {

func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics {
return &HTTP2Metrics{
connectionID: connectionID,
randomID: randomID,
hpackDecoder: hpack.NewDecoder(4096, nil),
streams: make(map[uint32]*HTTP2Streaming),
ConnectionID: connectionID,
RandomID: randomID,
HpackDecoder: hpack.NewDecoder(4096, nil),
Streams: make(map[uint32]*HTTP2Streaming),
}
}

func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error {
http2Metrics := connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: %d, random ID: %d",
http2Metrics.connectionID, http2Metrics.randomID)
http2Metrics.ConnectionID, http2Metrics.RandomID)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
Expand All @@ -115,9 +118,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
result, protocolBreak, _ = r.handleHeader(connection, &header, startPosition, http2Metrics, buf)
result, protocolBreak, _ = r.HandleHeader(connection, &header, startPosition, http2Metrics, buf)
case http2.FrameData:
result, protocolBreak, _ = r.handleData(&header, startPosition, http2Metrics, buf)
result, protocolBreak, _ = r.HandleData(connection, &header, startPosition, http2Metrics, buf)
default:
tmp := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(tmp); err != nil {
Expand All @@ -134,8 +137,9 @@ func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *Analyze
// if the protocol break, then stop the loop and notify the caller to skip analyze all data(just sending the detail)
if protocolBreak {
http2Log.Warnf("the HTTP/2 protocol break, maybe not tracing the connection from beginning, skip all data analyze in this connection, "+
"connection ID: %d", http2Metrics.connectionID)
"connection ID: %d", http2Metrics.ConnectionID)
helper.ProtocolBreak = true
r.analyzer.OnProtocolBreak(connection, http2Metrics)
break
}

Expand All @@ -159,19 +163,19 @@ func (r *HTTP2Protocol) ForProtocol() enums.ConnectionProtocol {
return enums.ConnectionProtocolHTTP2
}

func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
func (r *HTTP2Protocol) HandleHeader(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, error) {
bytes := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(bytes); err != nil {
return enums.ParseResultSkipPackage, false, err
return enums.ParseResultSkipPackage, true, err
}
headerData, err := metrics.hpackDecoder.DecodeFull(bytes)
headerData, err := metrics.HpackDecoder.DecodeFull(bytes)
if err != nil {
// reading the header failure, maybe not tracing the connection from beginning
return enums.ParseResultSkipPackage, true, err
}
// saving stream
streaming := metrics.streams[header.StreamID]
streaming := metrics.Streams[header.StreamID]
headers := r.parseHeaders(headerData)
if streaming == nil {
streaming = &HTTP2Streaming{
Expand All @@ -180,7 +184,7 @@ func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header *ht
ReqHeaderBuffer: buf.Slice(true, startPos, buf.Position()),
Connection: connection,
}
metrics.streams[header.StreamID] = streaming
metrics.Streams[header.StreamID] = streaming
return enums.ParseResultSuccess, false, nil
}

Expand All @@ -207,14 +211,15 @@ func (r *HTTP2Protocol) handleHeader(connection *PartitionConnection, header *ht
// is end of stream and in the response
if header.Flags.Has(http2.FlagHeadersEndStream) {
// should be end of the stream and send to the protocol
_ = r.analyze(streaming)
_ = r.analyzer.HandleWholeStream(connection, streaming)
// delete streaming
delete(metrics.streams, header.StreamID)
delete(metrics.Streams, header.StreamID)
}
return enums.ParseResultSuccess, false, nil
}

func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id uint32, streaming *HTTP2Streaming) {
func (r *HTTP2Protocol) validateIsStreamOpenTooLong(connection *PartitionConnection,
metrics *HTTP2Metrics, id uint32, streaming *HTTP2Streaming) {
// if in the response mode or the request body is not nil, then skip
if streaming.IsInResponse || streaming.ReqBodyBuffer == nil {
return
Expand All @@ -227,9 +232,9 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
}
if time.Since(host.Time(socketBuffer.StartTime())) > maxHTTP2StreamingTime {
http2Log.Infof("detect the HTTP/2 stream is too long, split the stream, connection ID: %d, stream ID: %d, headers: %v",
metrics.connectionID, id, streaming.ReqHeader)
metrics.ConnectionID, id, streaming.ReqHeader)

_ = r.analyze(streaming)
_ = r.analyzer.HandleWholeStream(connection, streaming)

// clean sent buffers
if streaming.ReqBodyBuffer != nil {
Expand All @@ -238,7 +243,7 @@ func (r *HTTP2Protocol) validateIsStreamOpenTooLong(metrics *HTTP2Metrics, id ui
}
}

func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream *HTTP2Streaming) error {
details := make([]events.SocketDetail, 0)
var allInclude = true
var idRange *buffer.DataIDRange
Expand Down Expand Up @@ -285,6 +290,9 @@ func (r *HTTP2Protocol) handleWholeStream(stream *HTTP2Streaming) error {
return nil
}

func (r *HTTP2Protocol) OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics) {
}

func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogHTTPProtocolRequestMethod {
method := streaming.ReqHeader[":method"]
if method == "" {
Expand Down Expand Up @@ -318,10 +326,10 @@ func (r *HTTP2Protocol) AppendHeaders(exist, needAppends map[string]string) {
}
}

func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.Position,
func (r *HTTP2Protocol) HandleData(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, error) {
bytes := make([]byte, header.Length)
streaming := metrics.streams[header.StreamID]
streaming := metrics.Streams[header.StreamID]
if streaming == nil {
// cannot found the stream, maybe not tracing the connection from beginning
return enums.ParseResultSkipPackage, true, nil
Expand All @@ -335,7 +343,7 @@ func (r *HTTP2Protocol) handleData(header *http2.FrameHeader, startPos *buffer.P
streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
}

r.validateIsStreamOpenTooLong(metrics, header.StreamID, streaming)
r.validateIsStreamOpenTooLong(connection, metrics, header.StreamID, streaming)
return enums.ParseResultSuccess, false, nil
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/accesslog/collector/protocols/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,11 @@ func (p *PartitionContext) processConnectionEvents(connection *PartitionConnecti
if helper.ProtocolBreak {
// notify the connection manager to skip analyze all data(just sending the detail)
connection.skipAllDataAnalyze = true
p.context.ConnectionMgr.SkipAllDataAnalyze(connection.connectionID, connection.randomID)
p.context.ConnectionMgr.SkipAllDataAnalyzeAndDowngradeProtocol(connection.connectionID, connection.randomID)
for _, buf := range connection.dataBuffers {
for e := buf.BuildDetails().Front(); e != nil; e = e.Next() {
forwarder.SendTransferNoProtocolEvent(p.context, e.Value.(events.SocketDetail))
}
buf.Clean()
}
}
Expand Down
Loading

0 comments on commit 1641294

Please sign in to comment.