Skip to content

Commit 56add86

Browse files
committed
Build a single hub manager command
Signed-off-by: Jian Qiu <[email protected]>
1 parent 4cb6e38 commit 56add86

File tree

5 files changed

+328
-53
lines changed

5 files changed

+328
-53
lines changed

cmd/registration-operator/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func newNucleusCommand() *cobra.Command {
4747
}
4848

4949
cmd.AddCommand(hub.NewHubOperatorCmd())
50+
cmd.AddCommand(hub.NewHubManagerCmd())
5051
cmd.AddCommand(spoke.NewKlusterletOperatorCmd())
5152
cmd.AddCommand(spoke.NewKlusterletAgentCmd())
5253

pkg/cmd/hub/operator.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
1010
"open-cluster-management.io/ocm/pkg/operator/operators/clustermanager"
11+
"open-cluster-management.io/ocm/pkg/singleton/hub"
1112
"open-cluster-management.io/ocm/pkg/version"
1213
)
1314

@@ -33,3 +34,16 @@ func NewHubOperatorCmd() *cobra.Command {
3334
opts.AddFlags(flags)
3435
return cmd
3536
}
37+
38+
func NewHubManagerCmd() *cobra.Command {
39+
opts := hub.NewHubOption()
40+
commonOpts := opts.CommonOption
41+
cmd := commonOpts.NewControllerCommandConfig("hub-manager", version.Get(), opts.RunManager, clock.RealClock{}).
42+
NewCommandWithContext(context.TODO())
43+
cmd.Use = "hub-manager"
44+
cmd.Short = "Start the hub manager"
45+
46+
flags := cmd.Flags()
47+
opts.AddFlags(flags)
48+
return cmd
49+
}

pkg/singleton/hub/manager.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package hub
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/openshift/library-go/pkg/controller/controllercmd"
8+
"github.com/spf13/pflag"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/client-go/dynamic"
11+
"k8s.io/client-go/dynamic/dynamicinformer"
12+
kubeinformers "k8s.io/client-go/informers"
13+
"k8s.io/client-go/kubernetes"
14+
"k8s.io/client-go/metadata"
15+
"k8s.io/client-go/rest"
16+
"k8s.io/klog/v2"
17+
cpclientset "sigs.k8s.io/cluster-inventory-api/client/clientset/versioned"
18+
cpinformerv1alpha1 "sigs.k8s.io/cluster-inventory-api/client/informers/externalversions"
19+
20+
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
21+
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
22+
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
23+
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
24+
workv1client "open-cluster-management.io/api/client/work/clientset/versioned"
25+
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions"
26+
clusterv1 "open-cluster-management.io/api/cluster/v1"
27+
ocmfeature "open-cluster-management.io/api/feature"
28+
29+
"open-cluster-management.io/ocm/pkg/addon"
30+
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
31+
"open-cluster-management.io/ocm/pkg/features"
32+
placementcontrollers "open-cluster-management.io/ocm/pkg/placement/controllers"
33+
registrationhub "open-cluster-management.io/ocm/pkg/registration/hub"
34+
workhub "open-cluster-management.io/ocm/pkg/work/hub"
35+
)
36+
37+
type HubOption struct {
38+
CommonOption *commonoptions.Options
39+
registrationOption *registrationhub.HubManagerOptions
40+
workOption *workhub.WorkHubManagerOptions
41+
}
42+
43+
func NewHubOption() *HubOption {
44+
return &HubOption{
45+
CommonOption: commonoptions.NewOptions(),
46+
registrationOption: registrationhub.NewHubManagerOptions(),
47+
workOption: workhub.NewWorkHubManagerOptions(),
48+
}
49+
}
50+
51+
func (o *HubOption) AddFlags(fs *pflag.FlagSet) {
52+
o.CommonOption.AddFlags(fs)
53+
o.registrationOption.AddFlags(fs)
54+
o.workOption.AddFlags(fs)
55+
}
56+
57+
func (o *HubOption) RunManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
58+
kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
59+
if err != nil {
60+
return err
61+
}
62+
63+
dynamicClient, err := dynamic.NewForConfig(controllerContext.KubeConfig)
64+
if err != nil {
65+
return err
66+
}
67+
68+
// copy a separate config for gc controller and increase the gc controller's throughput.
69+
metadataKubeConfig := rest.CopyConfig(controllerContext.KubeConfig)
70+
metadataKubeConfig.QPS = controllerContext.KubeConfig.QPS * 2
71+
metadataKubeConfig.Burst = controllerContext.KubeConfig.Burst * 2
72+
metadataClient, err := metadata.NewForConfig(metadataKubeConfig)
73+
if err != nil {
74+
return err
75+
}
76+
77+
clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
78+
if err != nil {
79+
return err
80+
}
81+
82+
clusterProfileClient, err := cpclientset.NewForConfig(controllerContext.KubeConfig)
83+
if err != nil {
84+
return err
85+
}
86+
87+
workClient, err := workv1client.NewForConfig(controllerContext.KubeConfig)
88+
if err != nil {
89+
return err
90+
}
91+
92+
addOnClient, err := addonclient.NewForConfig(controllerContext.KubeConfig)
93+
if err != nil {
94+
return err
95+
}
96+
97+
clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
98+
clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
99+
workInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
100+
kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
101+
func(listOptions *metav1.ListOptions) {
102+
// Note all kube resources managed by registration should have the cluster label, and should not have
103+
// the addon label.
104+
selector := &metav1.LabelSelector{
105+
MatchExpressions: []metav1.LabelSelectorRequirement{
106+
{
107+
Key: clusterv1.ClusterNameLabelKey,
108+
Operator: metav1.LabelSelectorOpExists,
109+
},
110+
},
111+
}
112+
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
113+
}))
114+
addOnInformers := addoninformers.NewSharedInformerFactory(addOnClient, 30*time.Minute)
115+
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 30*time.Minute)
116+
117+
// start registration controller
118+
go func() {
119+
err := o.registrationOption.RunControllerManagerWithInformers(
120+
ctx, controllerContext,
121+
kubeClient, metadataClient, clusterClient, clusterProfileClient, addOnClient,
122+
kubeInfomers, clusterInformers, clusterProfileInformers, workInformers, addOnInformers)
123+
if err != nil {
124+
klog.Fatal(err)
125+
}
126+
}()
127+
128+
// start placement controller
129+
go func() {
130+
err := placementcontrollers.RunControllerManagerWithInformers(
131+
ctx, controllerContext, kubeClient, clusterClient, clusterInformers)
132+
if err != nil {
133+
klog.Fatal(err)
134+
}
135+
}()
136+
137+
// start addon controller
138+
if features.HubMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
139+
go func() {
140+
err := addon.RunControllerManagerWithInformers(
141+
ctx, controllerContext, kubeClient, addOnClient, workClient,
142+
clusterInformers, addOnInformers, workInformers, dynamicInformers)
143+
if err != nil {
144+
klog.Fatal(err)
145+
}
146+
}()
147+
}
148+
149+
// start work controller
150+
if features.HubMutableFeatureGate.Enabled(ocmfeature.ManifestWorkReplicaSet) {
151+
go func() {
152+
hubConfig := workhub.NewWorkHubManagerConfig(o.workOption)
153+
err := hubConfig.RunControllerManagerWithInformers(
154+
ctx, controllerContext, workClient, workInformers, workInformers, clusterInformers)
155+
if err != nil {
156+
klog.Fatal(err)
157+
}
158+
}()
159+
}
160+
161+
<-ctx.Done()
162+
return nil
163+
}

pkg/singleton/hub/manager_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package hub
2+
3+
import (
4+
"context"
5+
ocmfeature "open-cluster-management.io/api/feature"
6+
"open-cluster-management.io/ocm/pkg/features"
7+
"path/filepath"
8+
"testing"
9+
"time"
10+
11+
"github.com/onsi/ginkgo/v2"
12+
"github.com/onsi/gomega"
13+
"github.com/openshift/library-go/pkg/controller/controllercmd"
14+
"k8s.io/client-go/kubernetes/scheme"
15+
"k8s.io/client-go/rest"
16+
"sigs.k8s.io/controller-runtime/pkg/envtest"
17+
logf "sigs.k8s.io/controller-runtime/pkg/log"
18+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
19+
20+
workapiv1 "open-cluster-management.io/api/work/v1"
21+
22+
"open-cluster-management.io/ocm/test/integration/util"
23+
)
24+
25+
var testEnv *envtest.Environment
26+
var sourceConfigFileName string
27+
var cfg *rest.Config
28+
29+
var CRDPaths = []string{
30+
// hub
31+
"../../../vendor/open-cluster-management.io/api/work/v1/0000_00_work.open-cluster-management.io_manifestworks.crd.yaml",
32+
"../../../vendor/open-cluster-management.io/api/work/v1alpha1/0000_00_work.open-cluster-management.io_manifestworkreplicasets.crd.yaml",
33+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1"),
34+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1beta1"),
35+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1beta2"),
36+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "addon", "v1alpha1"),
37+
}
38+
39+
func TestWorkManager(t *testing.T) {
40+
gomega.RegisterFailHandler(ginkgo.Fail)
41+
ginkgo.RunSpecs(t, "Singleton Hub Manager Suite")
42+
}
43+
44+
var _ = ginkgo.BeforeSuite(func() {
45+
logf.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true)))
46+
ginkgo.By("bootstrapping test environment")
47+
var err error
48+
49+
// start a kube-apiserver
50+
testEnv = &envtest.Environment{
51+
ErrorIfCRDPathMissing: true,
52+
CRDDirectoryPaths: CRDPaths,
53+
}
54+
cfg, err = testEnv.Start()
55+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
56+
gomega.Expect(cfg).ToNot(gomega.BeNil())
57+
58+
err = workapiv1.Install(scheme.Scheme)
59+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
60+
61+
err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubRegistrationFeatureGates)
62+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
63+
64+
err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubWorkFeatureGates)
65+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
66+
67+
err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubAddonManagerFeatureGates)
68+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
69+
70+
// enable ManagedClusterAutoApproval feature gate
71+
err = features.HubMutableFeatureGate.Set("ManagedClusterAutoApproval=true")
72+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
73+
74+
// enable resourceCleanup feature gate
75+
err = features.HubMutableFeatureGate.Set("ResourceCleanup=true")
76+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
77+
78+
err = features.HubMutableFeatureGate.Set("ClusterImporter=true")
79+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
80+
81+
err = features.HubMutableFeatureGate.Set("ManifestWorkReplicaSet=true")
82+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
83+
})
84+
85+
var _ = ginkgo.AfterSuite(func() {
86+
ginkgo.By("tearing down the test environment")
87+
88+
err := testEnv.Stop()
89+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
90+
})
91+
92+
var _ = ginkgo.Describe("start hub manager", func() {
93+
ginkgo.It("start hub manager", func() {
94+
ctx, stopHub := context.WithCancel(context.Background())
95+
opts := NewHubOption()
96+
opts.workOption.WorkDriver = "kube"
97+
opts.workOption.WorkDriverConfig = sourceConfigFileName
98+
opts.registrationOption.ClusterAutoApprovalUsers = []string{util.AutoApprovalBootstrapUser}
99+
100+
// start hub controller
101+
go func() {
102+
err := opts.RunManager(ctx, &controllercmd.ControllerContext{
103+
KubeConfig: cfg,
104+
EventRecorder: util.NewIntegrationTestEventRecorder("hub"),
105+
})
106+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
107+
}()
108+
109+
time.Sleep(5 * time.Second)
110+
stopHub()
111+
})
112+
})

0 commit comments

Comments
 (0)