Skip to content

Commit 76f4a56

Browse files
committed
feat: 新增jobflow中 job 任务出错的后续逻辑
1 parent 48046bf commit 76f4a56

File tree

5 files changed

+64
-56
lines changed

5 files changed

+64
-56
lines changed

Dockerfile

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ RUN go mod download
2020
COPY main.go main.go
2121
COPY pkg/ pkg/
2222
# Build
23-
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o myclusterconfigoperator main.go
23+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o jobflowoperator main.go
2424

2525

2626
FROM alpine:3.12
2727
WORKDIR /app
2828
# 需要的文件需要复制过来
29-
COPY --from=builder /app/myclusterconfigoperator .
29+
COPY --from=builder /app/jobflowoperator .
3030
USER 65532:65532
3131

32-
ENTRYPOINT ["./myclusterconfigoperator"]
32+
ENTRYPOINT ["./jobflowoperator"]

pkg/controller/helper.go

+21-15
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@ func (r *JobFlowController) deployJobFlow(ctx context.Context, jobFlow jobflowv1
4545
job.Name = jobName
4646
job.Namespace = jobFlow.Namespace
4747
job.Spec = flow.JobTemplate
48+
job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever
4849
// 直接创建
4950
if err = r.client.Create(ctx, job); err != nil {
5051
if errors.IsAlreadyExists(err) {
5152
continue
5253
}
5354
return err
5455
}
55-
r.Recorder.Eventf(&jobFlow, v1.EventTypeNormal, "Created", fmt.Sprintf("create a job named %v without dependencies", job.Name))
56+
r.event.Eventf(&jobFlow, v1.EventTypeNormal, "Created", fmt.Sprintf("create a job named %v without dependencies", job.Name))
5657
} else {
5758
// 如果有依赖的情况
5859
// query dependency meets the requirements
@@ -64,6 +65,9 @@ func (r *JobFlowController) deployJobFlow(ctx context.Context, jobFlow jobflowv1
6465
job.Name = jobName
6566
job.Namespace = jobFlow.Namespace
6667
job.Spec = flow.JobTemplate
68+
job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever
69+
var cc int32
70+
job.Spec.BackoffLimit = &cc
6771

6872
targetJobName := getJobName(jobFlow.Name, targetName)
6973
namespacedName := types.NamespacedName{
@@ -85,6 +89,11 @@ func (r *JobFlowController) deployJobFlow(ctx context.Context, jobFlow jobflowv1
8589
if dependenciesJob.Status.Succeeded != 1 {
8690
flag = false
8791
}
92+
93+
// 如果依赖的 job 出错,直接退出
94+
if dependenciesJob.Status.Failed == 1 {
95+
return errors.NewBadRequest(fmt.Sprintf("dependencies Job %s execute error", dependenciesJob.Name))
96+
}
8897
}
8998
// 如果已经完成,就进行下去
9099
if flag {
@@ -94,7 +103,7 @@ func (r *JobFlowController) deployJobFlow(ctx context.Context, jobFlow jobflowv1
94103
}
95104
return err
96105
}
97-
r.Recorder.Eventf(&jobFlow, v1.EventTypeNormal, "Created", fmt.Sprintf("create job named %v for next step", job.Name))
106+
r.event.Eventf(&jobFlow, v1.EventTypeNormal, "Created", fmt.Sprintf("create job named %v for next step", job.Name))
98107
}
99108
}
100109
continue
@@ -116,14 +125,17 @@ func (r *JobFlowController) updateJobFlowStatus(ctx context.Context, jobFlow *jo
116125
allJobList := new(batchv1.JobList)
117126
err := r.client.List(ctx, allJobList)
118127
if err != nil {
119-
klog.Error(err, "")
128+
klog.Error("list error: ", err)
120129
return err
121130
}
122131
jobFlowStatus, err := getAllJobStatus(jobFlow, allJobList)
123132
if err != nil {
124133
return err
125134
}
126135
jobFlow.Status = *jobFlowStatus
136+
if jobFlowStatus.State == jobflowv1alpha1.Succeed || jobFlowStatus.State == jobflowv1alpha1.Failed {
137+
r.event.Eventf(jobFlow, v1.EventTypeNormal, jobFlowStatus.State, fmt.Sprintf("finshed JobFlow named %s", jobFlow.Name))
138+
}
127139
if err = r.client.Status().Update(ctx, jobFlow); err != nil {
128140
if errors.IsNotFound(err) {
129141
return nil
@@ -133,17 +145,13 @@ func (r *JobFlowController) updateJobFlowStatus(ctx context.Context, jobFlow *jo
133145
return nil
134146
}
135147

136-
const (
137-
JobFlow = "JobFlow"
138-
)
139-
140-
// getAllJobStatus Get the information of all created jobs
148+
// getAllJobStatus 记录 Job Status
141149
func getAllJobStatus(jobFlow *jobflowv1alpha1.JobFlow, allJobList *batchv1.JobList) (*jobflowv1alpha1.JobFlowStatus, error) {
142-
// 过去掉只留 job flow 相关的
150+
// 过滤掉无关的 job,只保留 jobflow 相关的 job
143151
jobListRes := make([]batchv1.Job, 0)
144152
for _, job := range allJobList.Items {
145153
for _, reference := range job.OwnerReferences {
146-
if reference.Kind == JobFlow && reference.Name == jobFlow.Name {
154+
if reference.Kind == jobflowv1alpha1.JobFlowKind && reference.Name == jobFlow.Name {
147155
jobListRes = append(jobListRes, job)
148156
}
149157
}
@@ -176,7 +184,7 @@ func getAllJobStatus(jobFlow *jobflowv1alpha1.JobFlow, allJobList *batchv1.JobLi
176184
}
177185
}
178186

179-
// 确认 jobflow 狀態
187+
// 确认 jobFlow 狀態
180188
if jobFlow.DeletionTimestamp != nil {
181189
jobFlowStatus.State = jobflowv1alpha1.Terminating
182190
} else {
@@ -198,9 +206,8 @@ func getAllJobStatus(jobFlow *jobflowv1alpha1.JobFlow, allJobList *batchv1.JobLi
198206

199207
func (r *JobFlowController) OnUpdateJobHandler(event event.UpdateEvent, limitingInterface workqueue.RateLimitingInterface) {
200208
for _, ref := range event.ObjectNew.GetOwnerReferences() {
201-
fmt.Println("ccccccc")
202209
if ref.Kind == jobflowv1alpha1.JobFlowKind && ref.APIVersion == jobflowv1alpha1.JobFlowApiVersion {
203-
// 重新放入Reconcile调协方法
210+
// 重新放入 Reconcile 调协方法
204211
limitingInterface.Add(reconcile.Request{
205212
NamespacedName: types.NamespacedName{
206213
Name: ref.Name, Namespace: event.ObjectNew.GetNamespace(),
@@ -212,9 +219,8 @@ func (r *JobFlowController) OnUpdateJobHandler(event event.UpdateEvent, limiting
212219

213220
func (r *JobFlowController) OnDeleteJobHandler(event event.DeleteEvent, limitingInterface workqueue.RateLimitingInterface) {
214221
for _, ref := range event.Object.GetOwnerReferences() {
215-
fmt.Println("ddddddddd")
216222
if ref.Kind == jobflowv1alpha1.JobFlowKind && ref.APIVersion == jobflowv1alpha1.JobFlowApiVersion {
217-
// 重新入列,这样删除pod后,就会进入调和loop,发现ownerReference还在,会立即创建出新的pod。
223+
// 重新入列
218224
klog.Info("delete pod: ", event.Object.GetName(), event.Object.GetObjectKind())
219225
limitingInterface.Add(reconcile.Request{
220226
NamespacedName: types.NamespacedName{Name: ref.Name,

pkg/controller/jobflow_controller.go

+19-12
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@ import (
1616
)
1717

1818
type JobFlowController struct {
19-
client client.Client
20-
Scheme *runtime.Scheme
21-
Recorder record.EventRecorder
22-
log logr.Logger
19+
client client.Client
20+
Scheme *runtime.Scheme
21+
event record.EventRecorder
22+
log logr.Logger
2323
}
2424

25-
func NewJobFlowController(client client.Client, log logr.Logger, scheme *runtime.Scheme, recorder record.EventRecorder) *JobFlowController {
25+
func NewJobFlowController(client client.Client, log logr.Logger, scheme *runtime.Scheme, event record.EventRecorder) *JobFlowController {
2626
return &JobFlowController{
27-
client: client,
28-
log: log,
29-
Recorder: recorder,
30-
Scheme: scheme,
27+
client: client,
28+
log: log,
29+
event: event,
30+
Scheme: scheme,
3131
}
3232
}
3333

@@ -47,11 +47,11 @@ func (r *JobFlowController) Reconcile(ctx context.Context, req reconcile.Request
4747
return reconcile.Result{}, nil
4848
}
4949
klog.Error(err, err.Error())
50-
r.Recorder.Eventf(jobFlow, v1.EventTypeWarning, "Created", err.Error())
50+
r.event.Eventf(jobFlow, v1.EventTypeWarning, "Created", err.Error())
5151
return reconcile.Result{}, err
5252
}
5353

54-
if jobFlow.Status.State == jobflowv1alpha1.Succeed {
54+
if jobFlow.Status.State == jobflowv1alpha1.Failed {
5555
return reconcile.Result{}, nil
5656
}
5757

@@ -61,13 +61,20 @@ func (r *JobFlowController) Reconcile(ctx context.Context, req reconcile.Request
6161
// deploy job by dependence order.
6262
if err = r.deployJobFlow(ctx, *jobFlow); err != nil {
6363
klog.Error("deployJob error: ", err)
64-
return reconcile.Result{}, err
64+
r.event.Eventf(jobFlow, v1.EventTypeWarning, "Failed", err.Error())
65+
// 如果是依赖的 job 执行出错(BadRequest),不再返回错误,直接跳转到更新状态
66+
if errors.IsBadRequest(err) {
67+
goto continueExecution
68+
}
69+
return reconcile.Result{RequeueAfter: time.Second * 60}, err
6570
}
6671

72+
continueExecution:
6773
// update status
6874
// 修改 job 狀態,list 出所有相關的 job ,並查看其狀態,並存在 status 中
6975
if err = r.updateJobFlowStatus(ctx, jobFlow); err != nil {
7076
klog.Error("update jobFlow status error: ", err)
77+
r.event.Eventf(jobFlow, v1.EventTypeWarning, "Failed", err.Error())
7178
return reconcile.Result{}, err
7279
}
7380
klog.Info("end jobFlow Reconcile........")

yaml/example1.yaml renamed to yaml/example-job-error.yaml

+3-25
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
apiVersion: api.practice.com/v1alpha1
22
kind: JobFlow
33
metadata:
4-
name: jobflow-example
4+
name: jobflow-example-error
55
spec:
66
# 可填写多个 flow 流程
77
# 每个 flow 中重要字段 分别为:
@@ -22,8 +22,7 @@ spec:
2222
- sleep 10s
2323
imagePullPolicy: IfNotPresent
2424
name: nginx
25-
restartPolicy: OnFailure
26-
- name: job2
25+
- name: job2-error-container
2726
jobTemplate:
2827
template:
2928
spec:
@@ -32,10 +31,9 @@ spec:
3231
command:
3332
- sh
3433
- -c
35-
- sleep 100s
34+
- sleep 10s && exit 1
3635
imagePullPolicy: IfNotPresent
3736
name: nginx
38-
restartPolicy: OnFailure
3937
dependencies:
4038
- job1 # 代表 job2 依赖 job1 完成后才开始启动
4139
- name: job3
@@ -50,7 +48,6 @@ spec:
5048
- sleep 100s
5149
imagePullPolicy: IfNotPresent
5250
name: nginx
53-
restartPolicy: OnFailure
5451
dependencies:
5552
# 代表 job3 依赖 job1 job2 完成后才开始启动
5653
- job1
@@ -67,22 +64,3 @@ spec:
6764
- sleep 10s
6865
imagePullPolicy: IfNotPresent
6966
name: nginx
70-
restartPolicy: OnFailure
71-
- name: job5
72-
dependencies:
73-
# 代表依赖 job2 job4 后才执行
74-
- job4
75-
- job2
76-
jobTemplate:
77-
template:
78-
spec:
79-
containers:
80-
- image: busybox:1.28
81-
command:
82-
- sh
83-
- -c
84-
- sleep 10s
85-
imagePullPolicy: IfNotPresent
86-
name: nginx
87-
restartPolicy: OnFailure
88-

yaml/example.yaml

+18-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
apiVersion: api.practice.com/v1alpha1
22
kind: JobFlow
33
metadata:
4-
name: my-sequential-job
4+
name: jobflow-example
55
spec:
66
# 可填写多个 flow 流程
77
# 每个 flow 中重要字段 分别为:
@@ -68,4 +68,21 @@ spec:
6868
imagePullPolicy: IfNotPresent
6969
name: nginx
7070
restartPolicy: OnFailure
71+
- name: job5
72+
dependencies:
73+
# 代表 job5 依赖 job4 job2 完成后才开始启动
74+
- job4
75+
- job2
76+
jobTemplate:
77+
template:
78+
spec:
79+
containers:
80+
- image: busybox:1.28
81+
command:
82+
- sh
83+
- -c
84+
- sleep 10s
85+
imagePullPolicy: IfNotPresent
86+
name: nginx
87+
restartPolicy: OnFailure
7188

0 commit comments

Comments
 (0)