@@ -2,16 +2,24 @@ package controller
2
2
3
3
import (
4
4
"context"
5
+ "fmt"
5
6
"github.com/go-logr/logr"
7
+ jobflowv1alpha1 "github.com/myoperator/jobflowoperator/pkg/apis/jobflow/v1alpha1"
8
+ v1 "k8s.io/api/core/v1"
9
+ "k8s.io/apimachinery/pkg/api/errors"
6
10
"k8s.io/apimachinery/pkg/runtime"
11
+ "k8s.io/client-go/tools/record"
12
+ "k8s.io/klog/v2"
7
13
"sigs.k8s.io/controller-runtime/pkg/client"
8
14
"sigs.k8s.io/controller-runtime/pkg/reconcile"
15
+ "time"
9
16
)
10
17
11
18
type JobFlowController struct {
12
- client client.Client
13
- Scheme * runtime.Scheme
14
- log logr.Logger
19
+ client client.Client
20
+ Scheme * runtime.Scheme
21
+ Recorder record.EventRecorder
22
+ log logr.Logger
15
23
}
16
24
17
25
func NewJobFlowController (client client.Client , log logr.Logger , scheme * runtime.Scheme ) * JobFlowController {
@@ -25,5 +33,27 @@ func NewJobFlowController(client client.Client, log logr.Logger, scheme *runtime
25
33
// Reconcile 调协 loop
26
34
func (r * JobFlowController ) Reconcile (ctx context.Context , req reconcile.Request ) (reconcile.Result , error ) {
27
35
36
+ klog .Info ("start jobFlow Reconcile.........." )
37
+ klog .Info (fmt .Sprintf ("req.%v" , req ))
38
+
39
+ // load JobFlow by namespace
40
+ jobFlow := & jobflowv1alpha1.JobFlow {}
41
+ time .Sleep (time .Second )
42
+ err := r .client .Get (ctx , req .NamespacedName , jobFlow )
43
+ if err != nil {
44
+ // If no instance is found, it will be returned directly
45
+ if errors .IsNotFound (err ) {
46
+ klog .Info (fmt .Sprintf ("not found jobFlow : %v" , req .Name ))
47
+ return reconcile.Result {}, nil
48
+ }
49
+ klog .Error (err , err .Error ())
50
+ r .Recorder .Eventf (jobFlow , v1 .EventTypeWarning , "Created" , err .Error ())
51
+ return reconcile.Result {}, err
52
+ }
53
+
54
+ // 启动 依序启动 job 任务
55
+
56
+ // 改变 job 状态
57
+
28
58
return reconcile.Result {}, nil
29
59
}
0 commit comments