Skip to content

Commit 0b25f65

Browse files
louyutingsczyh30
authored andcommitted
Add initial work of the core pipeline (slot chain) (alibaba#3)
1 parent 77ad470 commit 0b25f65

19 files changed

+949
-1
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77

88
## Contributing
99

10-
Contributions are always welcomed! Please see [CONTRIBUTING](./CONTRIBUTING.md) for detailed guidelines.
10+
Contributions are always welcomed! Please see [CONTRIBUTING](./CONTRIBUTING.md) for detailed guidelines.

core/context.go

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package core
2+
3+
type Context struct {
4+
ResWrapper *ResourceWrapper
5+
Entry *CtxEntry
6+
Node node
7+
Count uint64
8+
Input *SentinelInput
9+
Output *SentinelOutput
10+
FeatureData map[interface{}]interface{}
11+
}
12+
13+
func NewContext() *Context {
14+
ctx := &Context{
15+
Input: newInput(),
16+
Output: newOutput(),
17+
}
18+
ctx.Input.Context = ctx
19+
ctx.Output.Context = ctx
20+
return ctx
21+
}
22+
23+
type SentinelInput struct {
24+
Context *Context
25+
// store some values in this context when calling context in slot.
26+
data map[interface{}]interface{}
27+
}
28+
29+
func newInput() *SentinelInput {
30+
return &SentinelInput{}
31+
}
32+
33+
type SentinelOutput struct {
34+
Context *Context
35+
CheckResult *RuleCheckResult
36+
msg string
37+
// store output data.
38+
data map[interface{}]interface{}
39+
}
40+
41+
func newOutput() *SentinelOutput {
42+
return &SentinelOutput{}
43+
}
44+
45+
// Reset init Context,
46+
func (ctx *Context) Reset() {
47+
// reset all fields of ctx
48+
ctx.ResWrapper = nil
49+
ctx.Entry = nil
50+
ctx.Node = nil
51+
ctx.Count = 0
52+
ctx.Input = nil
53+
ctx.Output = nil
54+
ctx.FeatureData = nil
55+
}

core/context_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package core

core/entry.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package core
2+
3+
// TrafficType describe the traffic type: Inbound or OutBound
4+
type TrafficType int32
5+
6+
const (
7+
InBound TrafficType = iota
8+
OutBound
9+
)
10+
11+
type ResourceWrapper struct {
12+
// global unique resource name
13+
ResourceName string
14+
// InBound or OutBound
15+
FlowType TrafficType
16+
}
17+
18+
// CtxEntry means Context entry,
19+
type CtxEntry struct {
20+
createTime uint64
21+
rs *ResourceWrapper
22+
// one entry with one context
23+
ctx *Context
24+
// each entry holds a slot chain.
25+
// it means this entry will go through the sc
26+
sc *SlotChain
27+
// caller node
28+
originNode node
29+
// current resource node
30+
currentNode node
31+
}
32+
33+
func NewCtEntry(ctx *Context, rw *ResourceWrapper, sc *SlotChain, cn node) *CtxEntry {
34+
return &CtxEntry{
35+
createTime: GetTimeMilli(),
36+
rs: rw,
37+
ctx: ctx,
38+
sc: sc,
39+
currentNode: cn,
40+
}
41+
}
42+
43+
func (e *CtxEntry) Exit() {
44+
e.ExitWithCnt(1)
45+
}
46+
47+
func (e *CtxEntry) ExitWithCnt(count int32) {
48+
e.exitForContext(e.ctx, count)
49+
}
50+
51+
func (e *CtxEntry) exitForContext(ctx *Context, count int32) {
52+
if e.sc != nil {
53+
e.sc.exit(ctx)
54+
}
55+
}
56+
57+
func (e *CtxEntry) GetCurrentNode() node {
58+
return e.currentNode
59+
}

core/entry_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package core

core/global_variable.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package core
2+
3+
const (
4+
TotalInBoundResourceName = "__total_inbound_traffic__"
5+
WindowLengthImMs uint32 = 200
6+
SampleCount uint32 = 5
7+
IntervalInMs uint32 = 1000
8+
)

core/logging.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package core
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"os"
7+
)
8+
9+
func InitDefaultLoggerToConsole() {
10+
fmt.Println("Init default log, output to console")
11+
log.SetFlags(log.LstdFlags | log.Lshortfile)
12+
log.SetPrefix("[sentinel]")
13+
}
14+
15+
// outputFile is the full path(absolute path)
16+
func NewFileLogger(outputFile, prefix string, flag int) *log.Logger {
17+
//get file info
18+
var logFile *os.File
19+
_, err := os.Stat(outputFile)
20+
if err == nil {
21+
logFile, err = os.Open(outputFile)
22+
if err != nil {
23+
log.Fatal("open log file error, ", err)
24+
}
25+
} else if err != nil && os.IsNotExist(err) {
26+
logFile, err = os.Create(outputFile)
27+
if err != nil {
28+
log.Fatal("create log file error, ", err)
29+
}
30+
} else {
31+
log.Fatal("open log file error, ", err)
32+
}
33+
return log.New(logFile, prefix, flag)
34+
}

core/node.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package core
2+
3+
type node interface {
4+
// total = pass + blocked
5+
TotalCountInMinute() uint64
6+
PassCountInMinute() uint64
7+
BlockCountInMinute() uint64
8+
CompleteCountInMinute() uint64
9+
ExceptionCountInMinute() uint64
10+
11+
TotalQps() uint64
12+
PassQps() uint64
13+
BlockQps() uint64
14+
CompleteQps() uint64
15+
ExceptionQps() uint64
16+
17+
AvgRt() uint64
18+
CurrentGoroutineNum() uint64
19+
20+
AddPassRequest(count uint64)
21+
AddRtAndCompleteRequest(rt, count uint64)
22+
AddBlockRequest(count uint64)
23+
AddExceptionRequest(count uint64)
24+
IncreaseGoroutineNum()
25+
DecreaseGoroutineNum()
26+
27+
Reset()
28+
}

core/node_test.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package core
2+
3+
import "github.com/stretchr/testify/mock"
4+
5+
type NodeMock struct {
6+
mock.Mock
7+
}
8+
9+
func (m *NodeMock) TotalCountInMinute() uint64 {
10+
args := m.Called()
11+
return uint64(args.Int(0))
12+
}
13+
func (m *NodeMock) PassCountInMinute() uint64 {
14+
args := m.Called()
15+
return uint64(args.Int(0))
16+
}
17+
func (m *NodeMock) BlockCountInMinute() uint64 {
18+
args := m.Called()
19+
return uint64(args.Int(0))
20+
}
21+
func (m *NodeMock) CompleteCountInMinute() uint64 {
22+
args := m.Called()
23+
return uint64(args.Int(0))
24+
}
25+
func (m *NodeMock) ExceptionCountInMinute() uint64 {
26+
args := m.Called()
27+
return uint64(args.Int(0))
28+
}
29+
30+
func (m *NodeMock) TotalQps() uint64 {
31+
args := m.Called()
32+
return uint64(args.Int(0))
33+
}
34+
func (m *NodeMock) PassQps() uint64 {
35+
args := m.Called()
36+
return uint64(*args.Get(0).(*int))
37+
}
38+
func (m *NodeMock) BlockQps() uint64 {
39+
args := m.Called()
40+
return uint64(args.Int(0))
41+
}
42+
func (m *NodeMock) CompleteQps() uint64 {
43+
args := m.Called()
44+
return uint64(args.Int(0))
45+
}
46+
func (m *NodeMock) ExceptionQps() uint64 {
47+
args := m.Called()
48+
return uint64(args.Int(0))
49+
}
50+
func (m *NodeMock) AvgRt() uint64 {
51+
args := m.Called()
52+
return uint64(args.Int(0))
53+
}
54+
func (m *NodeMock) CurrentGoroutineNum() uint64 {
55+
args := m.Called()
56+
return uint64(args.Int(0))
57+
}
58+
59+
func (m *NodeMock) AddPassRequest(count uint64) {
60+
m.Called(count)
61+
return
62+
}
63+
func (m *NodeMock) AddRtAndCompleteRequest(rt, count uint64) {
64+
m.Called(rt, count)
65+
return
66+
}
67+
func (m *NodeMock) AddBlockRequest(count uint64) {
68+
m.Called(count)
69+
return
70+
}
71+
func (m *NodeMock) AddExceptionRequest(count uint64) {
72+
m.Called(count)
73+
return
74+
}
75+
76+
func (m *NodeMock) IncreaseGoroutineNum() {
77+
m.Called()
78+
return
79+
}
80+
func (m *NodeMock) DecreaseGoroutineNum() {
81+
m.Called()
82+
return
83+
}
84+
85+
func (m *NodeMock) Reset() {
86+
m.Called()
87+
return
88+
}

core/result.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package core
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
type RuleBasedCheckBlockedEvent int8
8+
9+
const (
10+
UnknownEvent RuleBasedCheckBlockedEvent = iota
11+
)
12+
13+
type SlotResultStatus int8
14+
15+
const (
16+
ResultStatusPass SlotResultStatus = iota
17+
ResultStatusBlocked
18+
)
19+
20+
type RuleCheckResult struct {
21+
Status SlotResultStatus
22+
BlockedEvent RuleBasedCheckBlockedEvent
23+
BlockedMsg string
24+
}
25+
26+
func (r *RuleCheckResult) status() string {
27+
if r.Status == ResultStatusPass {
28+
return "ResultStatusPass"
29+
} else if r.Status == ResultStatusBlocked {
30+
return "ResultStatusBlocked"
31+
} else {
32+
return "Unknown"
33+
}
34+
}
35+
36+
func (r *RuleCheckResult) toString() string {
37+
return fmt.Sprintf("check result:%s; BlockedEvent is:%v; BlockedMsg is:%s;", r.status(), r.BlockedEvent, r.BlockedMsg)
38+
}
39+
40+
func NewSlotResultPass() *RuleCheckResult {
41+
return &RuleCheckResult{Status: ResultStatusPass}
42+
}
43+
44+
func NewSlotResultBlocked(blockEvent RuleBasedCheckBlockedEvent, blockReason string) *RuleCheckResult {
45+
return &RuleCheckResult{
46+
Status: ResultStatusBlocked,
47+
BlockedEvent: blockEvent,
48+
BlockedMsg: blockReason,
49+
}
50+
}

core/result_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package core
2+
3+
import "testing"
4+
5+
func TestNewSlotResultBlock_normal(t *testing.T) {
6+
r := NewSlotResultBlocked(UnknownEvent, "UnknownEvent")
7+
t.Log(r.toString())
8+
}

0 commit comments

Comments
 (0)