Skip to content

Commit 57649f6

Browse files
authored
Separate multiple process for reading connection information in the access log module (#151)
1 parent f015297 commit 57649f6

File tree

4 files changed

+110
-53
lines changed

4 files changed

+110
-53
lines changed

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Release Notes.
55
0.8.0
66
------------------
77
#### Features
8+
* Separate multiple process for reading connection information in the access log module.
89

910
#### Bug Fixes
1011
* Fix the base image cannot run in the arm64.

configs/rover_configs.yaml

+7
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,13 @@ access_log:
140140
max_count: ${ROVER_ACCESS_LOG_FLUSH_MAX_COUNT:10000}
141141
# The period of flush access log to the backend
142142
period: ${ROVER_ACCESS_LOG_FLUSH_PERIOD:5s}
143+
connection_analyze:
144+
# The size of connection buffer on each CPU
145+
per_cpu_buffer: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PER_CPU_BUFFER:200KB}
146+
# The count of parallel connection analyzer
147+
parallels: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_PARALLELS:1}
148+
# The size of per paralleled analyzer queue
149+
queue_size: ${ROVER_ACCESS_LOG_CONNECTION_ANALYZE_QUEUE_SIZE:2000}
143150
protocol_analyze:
144151
# The size of socket data buffer on each CPU
145152
per_cpu_buffer: ${ROVER_ACCESS_LOG_PROTOCOL_ANALYZE_PER_CPU_BUFFER:400KB}

pkg/accesslog/collector/connect.go

+90-48
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818
package collector
1919

2020
import (
21+
"context"
2122
"encoding/binary"
23+
"fmt"
2224
"net"
25+
"os"
26+
27+
"github.com/docker/go-units"
2328

2429
"github.com/sirupsen/logrus"
2530

@@ -29,6 +34,7 @@ import (
2934
"github.com/apache/skywalking-rover/pkg/logger"
3035
"github.com/apache/skywalking-rover/pkg/module"
3136
"github.com/apache/skywalking-rover/pkg/tools"
37+
"github.com/apache/skywalking-rover/pkg/tools/btf"
3238
"github.com/apache/skywalking-rover/pkg/tools/enums"
3339
"github.com/apache/skywalking-rover/pkg/tools/ip"
3440

@@ -43,72 +49,108 @@ var connectLogger = logger.GetLogger("access_log", "collector", "connect")
4349
var connectCollectInstance = NewConnectCollector()
4450

4551
type ConnectCollector struct {
46-
connTracker *ip.ConnTrack
52+
eventQueue *btf.EventQueue
4753
}
4854

4955
func NewConnectCollector() *ConnectCollector {
56+
return &ConnectCollector{}
57+
}
58+
59+
func (c *ConnectCollector) Start(_ *module.Manager, ctx *common.AccessLogContext) error {
60+
perCPUBufferSize, err := units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize)
61+
if err != nil {
62+
return err
63+
}
64+
if int(perCPUBufferSize) < os.Getpagesize() {
65+
return fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize())
66+
}
67+
if ctx.Config.ConnectionAnalyze.Parallels < 1 {
68+
return fmt.Errorf("the parallels cannot be small than 1")
69+
}
70+
if ctx.Config.ConnectionAnalyze.QueueSize < 1 {
71+
return fmt.Errorf("the queue size be small than 1")
72+
}
5073
track, err := ip.NewConnTrack()
5174
if err != nil {
5275
connectLogger.Warnf("cannot create the connection tracker, %v", err)
5376
}
54-
return &ConnectCollector{connTracker: track}
55-
}
77+
c.eventQueue = btf.NewEventQueue(ctx.Config.ConnectionAnalyze.Parallels, ctx.Config.ConnectionAnalyze.QueueSize, func() btf.PartitionContext {
78+
return newConnectionPartitionContext(ctx, track)
79+
})
80+
c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), func() interface{} {
81+
return &events.SocketConnectEvent{}
82+
}, func(data interface{}) string {
83+
return fmt.Sprintf("%d", data.(*events.SocketConnectEvent).ConID)
84+
})
85+
c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker)
5686

57-
func (c *ConnectCollector) Start(_ *module.Manager, context *common.AccessLogContext) error {
58-
context.BPF.AddTracePoint("syscalls", "sys_enter_connect", context.BPF.TracepointEnterConnect)
59-
context.BPF.AddTracePoint("syscalls", "sys_exit_connect", context.BPF.TracepointExitConnect)
60-
context.BPF.AddTracePoint("syscalls", "sys_enter_accept", context.BPF.TracepointEnterAccept)
61-
context.BPF.AddTracePoint("syscalls", "sys_exit_accept", context.BPF.TracepointExitAccept)
62-
context.BPF.AddTracePoint("syscalls", "sys_enter_accept4", context.BPF.TracepointEnterAccept)
63-
context.BPF.AddTracePoint("syscalls", "sys_exit_accept4", context.BPF.TracepointExitAccept)
87+
ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", ctx.BPF.TracepointEnterConnect)
88+
ctx.BPF.AddTracePoint("syscalls", "sys_exit_connect", ctx.BPF.TracepointExitConnect)
89+
ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept", ctx.BPF.TracepointEnterAccept)
90+
ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept", ctx.BPF.TracepointExitAccept)
91+
ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4", ctx.BPF.TracepointEnterAccept)
92+
ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4", ctx.BPF.TracepointExitAccept)
6493

65-
context.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
66-
"tcp_connect": context.BPF.TcpConnect,
94+
ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
95+
"tcp_connect": ctx.BPF.TcpConnect,
6796
})
68-
context.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
69-
"sock_alloc": context.BPF.SockAllocRet,
97+
ctx.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{
98+
"sock_alloc": ctx.BPF.SockAllocRet,
7099
})
71-
context.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
72-
"ip4_datagram_connect": context.BPF.Ip4UdpDatagramConnect,
100+
ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{
101+
"ip4_datagram_connect": ctx.BPF.Ip4UdpDatagramConnect,
73102
})
74103

75-
_ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
76-
"__nf_conntrack_hash_insert": context.BPF.NfConntrackHashInsert,
104+
_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
105+
"__nf_conntrack_hash_insert": ctx.BPF.NfConntrackHashInsert,
77106
})
78-
_ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
79-
"nf_confirm": context.BPF.NfConfirm,
107+
_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
108+
"nf_confirm": ctx.BPF.NfConfirm,
80109
})
81-
_ = context.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
82-
"ctnetlink_fill_info": context.BPF.NfCtnetlinkFillInfo,
83-
})
84-
85-
context.BPF.ReadEventAsync(context.BPF.SocketConnectionEventQueue, func(data interface{}) {
86-
event := data.(*events.SocketConnectEvent)
87-
connectLogger.Debugf("receive connect event, connection ID: %d, randomID: %d, "+
88-
"pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, conntrack exist: %t",
89-
event.ConID, event.RandomID, event.PID, event.SocketFD, enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
90-
event.SocketFamily, event.ConnectSuccess, event.ConnTrackUpstreamPort != 0)
91-
socketPair := c.buildSocketFromConnectEvent(event)
92-
if socketPair == nil {
93-
connectLogger.Debugf("cannot found the socket paire from connect event, connection ID: %d, randomID: %d",
94-
event.ConID, event.RandomID)
95-
return
96-
}
97-
connectLogger.Debugf("build socket pair success, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
98-
event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
99-
context.ConnectionMgr.OnConnectEvent(event, socketPair)
100-
forwarder.SendConnectEvent(context, event, socketPair)
101-
}, func() interface{} {
102-
return &events.SocketConnectEvent{}
110+
_ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{
111+
"ctnetlink_fill_info": ctx.BPF.NfCtnetlinkFillInfo,
103112
})
104-
105113
return nil
106114
}
107115

108116
func (c *ConnectCollector) Stop() {
109117
}
110118

111-
func (c *ConnectCollector) fixSocketFamilyIfNeed(event *events.SocketConnectEvent, result *ip.SocketPair) {
119+
type ConnectionPartitionContext struct {
120+
context *common.AccessLogContext
121+
connTracker *ip.ConnTrack
122+
}
123+
124+
func newConnectionPartitionContext(ctx *common.AccessLogContext, connTracker *ip.ConnTrack) *ConnectionPartitionContext {
125+
return &ConnectionPartitionContext{
126+
context: ctx,
127+
connTracker: connTracker,
128+
}
129+
}
130+
131+
func (c *ConnectionPartitionContext) Start(ctx context.Context) {
132+
133+
}
134+
135+
func (c *ConnectionPartitionContext) Consume(data interface{}) {
136+
event := data.(*events.SocketConnectEvent)
137+
connectLogger.Debugf("receive connect event, connection ID: %d, randomID: %d, "+
138+
"pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, conntrack exist: %t",
139+
event.ConID, event.RandomID, event.PID, event.SocketFD, enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName),
140+
event.SocketFamily, event.ConnectSuccess, event.ConnTrackUpstreamPort != 0)
141+
socketPair := c.buildSocketFromConnectEvent(event)
142+
if socketPair == nil {
143+
connectLogger.Debugf("cannot found the socket paire from connect event, connection ID: %d, randomID: %d",
144+
event.ConID, event.RandomID)
145+
return
146+
}
147+
connectLogger.Debugf("build socket pair success, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d",
148+
event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort)
149+
c.context.ConnectionMgr.OnConnectEvent(event, socketPair)
150+
forwarder.SendConnectEvent(c.context, event, socketPair)
151+
}
152+
153+
func (c *ConnectionPartitionContext) fixSocketFamilyIfNeed(event *events.SocketConnectEvent, result *ip.SocketPair) {
112154
if result == nil {
113155
return
114156
}
@@ -128,7 +170,7 @@ func (c *ConnectCollector) fixSocketFamilyIfNeed(event *events.SocketConnectEven
128170
}
129171
}
130172

131-
func (c *ConnectCollector) buildSocketFromConnectEvent(event *events.SocketConnectEvent) *ip.SocketPair {
173+
func (c *ConnectionPartitionContext) buildSocketFromConnectEvent(event *events.SocketConnectEvent) *ip.SocketPair {
132174
if event.SocketFamily != unix.AF_INET && event.SocketFamily != unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown {
133175
// if not ipv4, ipv6 or unknown, ignore
134176
return nil
@@ -160,7 +202,7 @@ func (c *ConnectCollector) buildSocketFromConnectEvent(event *events.SocketConne
160202
return pair
161203
}
162204

163-
func (c *ConnectCollector) isOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool {
205+
func (c *ConnectionPartitionContext) isOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool {
164206
if socketPair == nil {
165207
return false
166208
}
@@ -172,7 +214,7 @@ func (c *ConnectCollector) isOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool
172214
return socketPair.IsValid()
173215
}
174216

175-
func (c *ConnectCollector) buildSocketPair(event *events.SocketConnectEvent) *ip.SocketPair {
217+
func (c *ConnectionPartitionContext) buildSocketPair(event *events.SocketConnectEvent) *ip.SocketPair {
176218
var result *ip.SocketPair
177219
haveConnTrack := false
178220
if event.SocketFamily == unix.AF_INET {
@@ -232,7 +274,7 @@ func (c *ConnectCollector) buildSocketPair(event *events.SocketConnectEvent) *ip
232274
return result
233275
}
234276

235-
func (c *ConnectCollector) tryToUpdateSocketFromConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) {
277+
func (c *ConnectionPartitionContext) tryToUpdateSocketFromConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) {
236278
if socket != nil && socket.IsValid() && c.connTracker != nil && !tools.IsLocalHostAddress(socket.DestIP) &&
237279
event.FuncName != enums.SocketFunctionNameAccept { // accept event don't need to update the remote address
238280
// if no contract and socket data is valid, then trying to get the remote address from the socket

pkg/accesslog/common/config.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,25 @@ import "github.com/apache/skywalking-rover/pkg/module"
2222
type Config struct {
2323
module.Config
2424

25-
Active bool `mapstructure:"active"`
26-
ExcludeNamespaces string `mapstructure:"exclude_namespaces"`
27-
ExcludeClusters string `mapstructure:"exclude_cluster"`
28-
Flush FlushConfig `mapstructure:"flush"`
29-
ProtocolAnalyze ProtocolAnalyzeConfig `mapstructure:"protocol_analyze"`
25+
Active bool `mapstructure:"active"`
26+
ExcludeNamespaces string `mapstructure:"exclude_namespaces"`
27+
ExcludeClusters string `mapstructure:"exclude_cluster"`
28+
Flush FlushConfig `mapstructure:"flush"`
29+
ConnectionAnalyze ConnectionAnalyzeConfig `mapstructure:"connection_analyze"`
30+
ProtocolAnalyze ProtocolAnalyzeConfig `mapstructure:"protocol_analyze"`
3031
}
3132

3233
type FlushConfig struct {
3334
MaxCountOneStream int `mapstructure:"max_count"`
3435
Period string `mapstructure:"period"`
3536
}
3637

38+
type ConnectionAnalyzeConfig struct {
39+
PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
40+
Parallels int `mapstructure:"parallels"`
41+
QueueSize int `mapstructure:"queue_size"`
42+
}
43+
3744
type ProtocolAnalyzeConfig struct {
3845
PerCPUBufferSize string `mapstructure:"per_cpu_buffer"`
3946
Parallels int `mapstructure:"parallels"`

0 commit comments

Comments
 (0)