Skip to content

Commit 48046bf

Browse files
committed
feat: 完成jobflow controller大致功能
1 parent fc300eb commit 48046bf

14 files changed

+566
-101
lines changed

README.md

+101-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,101 @@
1-
# job-flow-operator
1+
## jobflow-operator 简易型 Job 树状引擎控制器
2+
3+
### 项目思路与设计
4+
设计背景:k8s 中原生的 Job 资源对象在执行时,并不具备相互依赖的编排特性(ex: Job a 完成后 -> 再执行 Job b ...)。
5+
本项目针对此需求,基于 k8s 的扩展功能,实现 JobFlow 自定义资源及其控制器,提供一个能以树状依赖关系执行多个 Job 的 operator 应用。
6+
7+
![](./image/%E6%97%A0%E6%A0%87%E9%A2%98-2023-08-10-2343.png?raw=true)
8+
9+
- crd 资源对象如下,更多信息可以参考 [示例文件](./yaml/example1.yaml)
10+
- name: flow 名称,多个 flow 名称不能重复
11+
- dependencies: 定义依赖项,如果有多个依赖可以填写多个
12+
- jobTemplate: job 模版,支持 k8s 原生 job spec 全部字段
13+
```yaml
14+
apiVersion: api.practice.com/v1alpha1
15+
kind: JobFlow
16+
metadata:
17+
name: jobflow-example
18+
spec:
19+
# 可填写多个 flow 流程
20+
# 每个 flow 中重要字段 分别为:
21+
# name: flow 名称,多个 flow 名称不能重复
22+
# dependencies: 定义依赖项,如果有多个依赖可以填写多个
23+
# jobTemplate: job 模版,支持 k8s 原生 job spec 全部字段
24+
flows:
25+
- name: job1
26+
dependencies: []
27+
jobTemplate:
28+
template:
29+
spec:
30+
containers:
31+
- image: busybox:1.28
32+
command:
33+
- sh
34+
- -c
35+
- sleep 10s
36+
imagePullPolicy: IfNotPresent
37+
name: nginx
38+
restartPolicy: OnFailure
39+
- name: job2
40+
jobTemplate:
41+
template:
42+
spec:
43+
containers:
44+
- image: busybox:1.28
45+
command:
46+
- sh
47+
- -c
48+
- sleep 100s
49+
imagePullPolicy: IfNotPresent
50+
name: nginx
51+
restartPolicy: OnFailure
52+
dependencies:
53+
- job1 # 代表 job2 依赖 job1 完成后才开始启动
54+
- name: job3
55+
jobTemplate:
56+
template:
57+
spec:
58+
containers:
59+
- image: busybox:1.28
60+
command:
61+
- sh
62+
- -c
63+
- sleep 100s
64+
imagePullPolicy: IfNotPresent
65+
name: nginx
66+
restartPolicy: OnFailure
67+
dependencies:
68+
# 代表 job3 依赖 job1 job2 完成后才开始启动
69+
- job1
70+
- job2
71+
- name: job4
72+
jobTemplate:
73+
template:
74+
spec:
75+
containers:
76+
- image: busybox:1.28
77+
command:
78+
- sh
79+
- -c
80+
- sleep 10s
81+
imagePullPolicy: IfNotPresent
82+
name: nginx
83+
restartPolicy: OnFailure
84+
- name: job5
85+
dependencies:
86+
# 代表 job5 依赖 job2 job4 完成后才开始启动
87+
- job4
88+
- job2
89+
jobTemplate:
90+
template:
91+
spec:
92+
containers:
93+
- image: busybox:1.28
94+
command:
95+
- sh
96+
- -c
97+
- sleep 10s
98+
imagePullPolicy: IfNotPresent
99+
name: nginx
100+
restartPolicy: OnFailure
101+
```

image/无标题-2023-08-10-2343.png

415 KB
Loading

main.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@ import (
44
jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
55
"github.com/myoperator/jobflowoperator/pkg/controller"
66
"github.com/myoperator/jobflowoperator/pkg/k8sconfig"
7+
batchv1 "k8s.io/api/batch/v1"
78
_ "k8s.io/code-generator"
89
"k8s.io/klog/v2"
910
"os"
1011
"sigs.k8s.io/controller-runtime/pkg/builder"
12+
"sigs.k8s.io/controller-runtime/pkg/handler"
1113
logf "sigs.k8s.io/controller-runtime/pkg/log"
1214
"sigs.k8s.io/controller-runtime/pkg/log/zap"
1315
"sigs.k8s.io/controller-runtime/pkg/manager"
1416
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
17+
"sigs.k8s.io/controller-runtime/pkg/source"
1518
"time"
1619
)
1720

@@ -50,7 +53,7 @@ func main() {
5053
var d time.Duration = 0
5154
// 1. 管理器初始化
5255
mgr, err := manager.New(k8sconfig.K8sRestConfig(), manager.Options{
53-
Logger: logf.Log.WithName("jobflow-operator"),
56+
Logger: logf.Log.WithName("JobFlow operator"),
5457
SyncPeriod: &d, // resync不设置触发
5558
})
5659
if err != nil {
@@ -66,9 +69,16 @@ func main() {
6669
}
6770

6871
// 3. 控制器相关
69-
jobFlowCtl := controller.NewJobFlowController(mgr.GetClient(), mgr.GetLogger(), mgr.GetScheme())
72+
jobFlowCtl := controller.NewJobFlowController(mgr.GetClient(), mgr.GetLogger(),
73+
mgr.GetScheme(), mgr.GetEventRecorderFor("JobFlow operator"))
7074

71-
err = builder.ControllerManagedBy(mgr).For(&jobflowv1alpha1.JobFlow{}).Complete(jobFlowCtl)
75+
err = builder.ControllerManagedBy(mgr).For(&jobflowv1alpha1.JobFlow{}).
76+
Watches(&source.Kind{Type: &batchv1.Job{}},
77+
handler.Funcs{
78+
UpdateFunc: jobFlowCtl.OnUpdateJobHandler,
79+
DeleteFunc: jobFlowCtl.OnDeleteJobHandler,
80+
},
81+
).Complete(jobFlowCtl)
7282

7383
errC := make(chan error)
7484

pkg/apis/jobflow/v1alpha1/register.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
)
88

99
const (
10-
ClusterConfigGroup = "api.practice.com"
11-
ClusterConfigVersion = "v1alpha1"
10+
JobFlowGroup = "api.practice.com"
11+
JobFlowVersion = "v1alpha1"
12+
JobFlowKind = "JobFlow"
13+
JobFlowApiVersion = "api.practice.com/v1alpha1"
1214
)
1315

1416
// SchemeGroupVersion is group version used to register these objects
15-
var SchemeGroupVersion = schema.GroupVersion{Group: ClusterConfigGroup, Version: ClusterConfigVersion}
17+
var SchemeGroupVersion = schema.GroupVersion{Group: JobFlowGroup, Version: JobFlowVersion}
1618

1719
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
1820
func Kind(kind string) schema.GroupKind {

pkg/apis/jobflow/v1alpha1/types.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,35 @@ type JobFlow struct {
2020
}
2121

2222
type JobFlowSpec struct {
23+
// Flows 多个 flow 步骤流程
2324
Flows []Flow `json:"flows,omitempty"`
2425
}
2526

2627
type Flow struct {
27-
// job name, namespace 就是默认 namespace
28+
// job name, namespace 就是默认 JobFlow namespace
2829
Name string `json:"name"`
29-
// 用于赋值
30-
JobTemplate v1.JobSpec `json:"jobTemplate"`
31-
// 依赖项
30+
// 用于赋值 job 模版
31+
JobTemplate v1.JobSpec `json:"jobTemplate,omitempty"`
32+
// Dependencies 依赖项,其中可以填写多个 依赖的 job name
33+
// ex: 如果 job3 依赖 job1 and job2, 就能在此填写 job1 与 job2 两个依赖项
3234
Dependencies []string `json:"dependencies"`
3335
}
3436

3537
type JobFlowStatus struct {
36-
// 用于存储 map 是 name/namespace 的方式 或是只要 name就行
38+
// 用于存储各 Job 的状态,map 的 key 采用 name/namespace 的形式
3739
JobStatusList map[string]v1.JobStatus `json:"jobStatusList,omitempty"`
40+
// 记录 JobFlow 状态
41+
State string `json:"state,omitempty"`
3842
}
3943

44+
const (
45+
Succeed = "Succeed" // 代表 JobFlow 中所有 Job 都執行成功
46+
Terminating = "Terminating" // 代表 JobFlow 正在被刪除
47+
Failed = "Failed" // 代表 JobFlow 執行失敗
48+
Running = "Running" // 代表 JobFlow 有任何一個 Job 正在執行
49+
Pending = "Pending" // 代表 JobFlow 正在等待
50+
)
51+
4052
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
4153

4254
// JobFlowList

pkg/apis/jobflow/v1alpha1/zz_generated.deepcopy.go

+6-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)