diff --git a/apis/fluentd/v1alpha1/tests/helper_test.go b/apis/fluentd/v1alpha1/tests/helper_test.go index c113c0d9e..f6730783c 100644 --- a/apis/fluentd/v1alpha1/tests/helper_test.go +++ b/apis/fluentd/v1alpha1/tests/helper_test.go @@ -21,134 +21,33 @@ const ( ) func Test_Cfg2ES(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clusterOutputsForCluster := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES} - cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster) - err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-namespaced-cfg-output-es.cfg"))).To(Equal(config)) - } + testNamespacedConfig(t, sl, Fluentd, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES}, "./expected/fluentd-namespaced-cfg-output-es.cfg") } func Test_ClusterCfgInputTail(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, FluentdInputTail.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-global-cfg-input-tail.cfg"))).To(Equal(config)) - } + testClusterConfigWithGlobalInputs(t, sl, FluentdInputTail, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-tail.cfg") } func Test_ClusterCfgInputSample(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, FluentdInputSample.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-global-cfg-input-sample.cfg"))).To(Equal(config)) - } + testClusterConfigWithGlobalInputs(t, sl, FluentdInputSample, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-sample.cfg") } func Test_ClusterCfgInputMonitorAgent(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, FluentdInputMonitorAgent.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-global-cfg-input-monitorAgent.cfg"))).To(Equal(config)) - } + testClusterConfigWithGlobalInputs(t, sl, FluentdInputMonitorAgent, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputTag}, "./expected/fluentd-global-cfg-input-monitorAgent.cfg") } func Test_ClusterCfgOutput2ES(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-es.cfg"))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES}, "./expected/fluentd-cluster-cfg-output-es.cfg", false) } func Test_ClusterCfgOutput2ESDataStream(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESDataStream} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-es-data-stream.cfg"))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESDataStream}, "./expected/fluentd-cluster-cfg-output-es-data-stream.cfg", false) } func Test_ClusterCfgOutput2CopyESDataStream(t *testing.T) { @@ -176,24 +75,8 @@ func Test_ClusterCfgOutput2CopyESDataStream(t *testing.T) { } func Test_Cfg2OpenSearch(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clusterOutputsForCluster := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2OpenSearch} - cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster) - err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-namespaced-cfg-output-opensearch.cfg"))).To(Equal(config)) - } + testNamespacedConfig(t, sl, Fluentd, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2OpenSearch}, "./expected/fluentd-namespaced-cfg-output-opensearch.cfg") } func Test_ClusterCfgOutput2OpenSearch(t *testing.T) { @@ -220,26 +103,8 @@ func Test_ClusterCfgOutput2OpenSearch(t *testing.T) { } func Test_ClusterCfgOutput2Kafka(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2kafka} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-kafka.cfg"))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1}, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2kafka}, "./expected/fluentd-cluster-cfg-output-kafka.cfg", false) } func Test_ClusterCfgOutput2Loki(t *testing.T) { @@ -266,97 +131,23 @@ func Test_ClusterCfgOutput2Loki(t *testing.T) { } func Test_MixedCfgs2ES(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterOutputsForCluster := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster) - err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-mixed-cfgs-output-es.cfg"))).To(Equal(config)) - } + testMixedConfigs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES}, "./expected/fluentd-mixed-cfgs-output-es.cfg") } func Test_ClusterCfgOutput2CloudWatch(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2CloudWatch} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(strings.TrimSpace(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-cloudwatch.cfg")))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1}, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2CloudWatch}, "./expected/fluentd-cluster-cfg-output-cloudwatch.cfg", true) } func Test_ClusterCfgOutput2Datadog(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2Datadog} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(strings.TrimSpace(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-datadog.cfg")))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1}, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2Datadog}, "./expected/fluentd-cluster-cfg-output-datadog.cfg", true) } func Test_ClusterCfgOutput2Null(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2Null} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(strings.TrimSpace(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-null.cfg")))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1}, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutput2Null}, "./expected/fluentd-cluster-cfg-output-null.cfg", true) } func Test_MixedCfgCopy1(t *testing.T) { @@ -390,61 +181,13 @@ func Test_MixedCfgCopy1(t *testing.T) { } func Test_MixedCfgCopy2(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig2) - g.Expect(err).NotTo(HaveOccurred()) - outputsForCluster := []fluentdv1alpha1.Output{FluentdOutputMixedCopy2} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig2.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{}) - cfgResources, _ := psr.PatchAndFilterNamespacedLevelResources(sl, FluentdConfig2.GetCfgId(), []fluentdv1alpha1.Input{}, []fluentdv1alpha1.Filter{}, outputsForCluster) - cfgResources.InputPlugins = append(cfgResources.InputPlugins, clustercfgResources.InputPlugins...) - cfgResources.FilterPlugins = append(cfgResources.FilterPlugins, clustercfgResources.FilterPlugins...) - cfgResources.OutputPlugins = append(cfgResources.OutputPlugins, clustercfgResources.OutputPlugins...) - err = psr.IdentifyCopyAndPatchOutput(cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-mixed-cfgs-output-copy-2.cfg"))).To(Equal(config)) - } + testMixedConfigWithCopy(t, sl, Fluentd, &FluentdConfig2, []fluentdv1alpha1.Output{FluentdOutputMixedCopy2}, []fluentdv1alpha1.ClusterOutput{}, "./expected/fluentd-mixed-cfgs-output-copy-2.cfg") } func Test_MixedCfgCopy3(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig2) - g.Expect(err).NotTo(HaveOccurred()) - outputsForCluster := []fluentdv1alpha1.Output{FluentdOutputMixedCopy3} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig2.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{}) - cfgResources, _ := psr.PatchAndFilterNamespacedLevelResources(sl, FluentdConfig2.GetCfgId(), []fluentdv1alpha1.Input{}, []fluentdv1alpha1.Filter{}, outputsForCluster) - cfgResources.InputPlugins = append(cfgResources.InputPlugins, clustercfgResources.InputPlugins...) - cfgResources.FilterPlugins = append(cfgResources.FilterPlugins, clustercfgResources.FilterPlugins...) - cfgResources.OutputPlugins = append(cfgResources.OutputPlugins, clustercfgResources.OutputPlugins...) - err = psr.IdentifyCopyAndPatchOutput(cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-mixed-cfgs-output-copy-3.cfg"))).To(Equal(config)) - } + testMixedConfigWithCopy(t, sl, Fluentd, &FluentdConfig2, []fluentdv1alpha1.Output{FluentdOutputMixedCopy3}, []fluentdv1alpha1.ClusterOutput{}, "./expected/fluentd-mixed-cfgs-output-copy-3.cfg") } func Test_MixedCfgCopy4(t *testing.T) { @@ -505,31 +248,8 @@ func Test_ClusterCfgOutput2StdoutAndLoki(t *testing.T) { } func Test_MixedCfgs2OpenSearch(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterOutputsForCluster := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2OpenSearch} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - cfgRouter, err := psr.BuildCfgRouter(&FluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputsForCluster) - err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-mixed-cfgs-output-opensearch.cfg"))).To(Equal(config)) - } + testMixedConfigs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, &FluentdConfig1, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2OpenSearch}, "./expected/fluentd-mixed-cfgs-output-opensearch.cfg") } func Test_MixedCfgs2MultiTenant(t *testing.T) { @@ -566,49 +286,13 @@ func Test_MixedCfgs2MultiTenant(t *testing.T) { } func Test_OutputWithBuffer(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputBuffer} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-buffer-example.cfg"))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1}, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputBuffer}, "./expected/fluentd-cluster-cfg-output-buffer-example.cfg", false) } func Test_OutputWithMemoryBuffer(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) - - psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) - - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) - g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputMemoryBuffer} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) - err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) - g.Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < maxRuntimes; i++ { - config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) - g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-memory-buffer.cfg"))).To(Equal(config)) - } + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{FluentdClusterFilter1}, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputMemoryBuffer}, "./expected/fluentd-cluster-cfg-output-memory-buffer.cfg", false) } func Test_DuplicateRemovalCRSpecs(t *testing.T) { @@ -815,25 +499,164 @@ func Test_DuplicateRemovalCRSpecs(t *testing.T) { } func Test_RecordTransformer(t *testing.T) { - g := NewGomegaWithT(t) sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{FluentdClusterRecordTransformerFilter}, []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputCluster}, "./expected/fluentd-cluster-cfg-filter-recordTransformer.cfg", false) +} + +// testNamespacedConfig tests a namespaced config with cluster outputs +func testNamespacedConfig( + t *testing.T, + sl plugins.SecretLoader, + fluentd fluentdv1alpha1.Fluentd, + config *fluentdv1alpha1.FluentdConfig, + clusterOutputs []fluentdv1alpha1.ClusterOutput, + expectedCfgPath string, +) { + g := NewGomegaWithT(t) psr := fluentdv1alpha1.NewGlobalPluginResources("main") - psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs) + psr.CombineGlobalInputsPlugins(sl, fluentd.Spec.GlobalInputs) - clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1) + cfgRouter, err := psr.BuildCfgRouter(config) g.Expect(err).NotTo(HaveOccurred()) - clusterFilters := []fluentdv1alpha1.ClusterFilter{FluentdClusterRecordTransformerFilter} - clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdClusterOutputCluster} - clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) + cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, config.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) + err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) + g.Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < maxRuntimes; i++ { + config, errs := psr.RenderMainConfig(false) + g.Expect(errs).NotTo(HaveOccurred()) + g.Expect(string(getExpectedCfg(expectedCfgPath))).To(Equal(config)) + } +} + +// testClusterConfigWithGlobalInputs tests a cluster config with custom global inputs +func testClusterConfigWithGlobalInputs( + t *testing.T, + sl plugins.SecretLoader, + fluentd fluentdv1alpha1.Fluentd, + config *fluentdv1alpha1.FluentdConfig, + clusterOutputs []fluentdv1alpha1.ClusterOutput, + expectedCfgPath string, +) { + g := NewGomegaWithT(t) + + psr := fluentdv1alpha1.NewGlobalPluginResources("main") + psr.CombineGlobalInputsPlugins(sl, fluentd.Spec.GlobalInputs) + + clustercfgRouter, err := psr.BuildCfgRouter(config) + g.Expect(err).NotTo(HaveOccurred()) + clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, config.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) g.Expect(err).NotTo(HaveOccurred()) for i := 0; i < maxRuntimes; i++ { config, errs := psr.RenderMainConfig(false) - // fmt.Println(config) g.Expect(errs).NotTo(HaveOccurred()) - g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-filter-recordTransformer.cfg"))).To(Equal(config)) + g.Expect(string(getExpectedCfg(expectedCfgPath))).To(Equal(config)) + } +} + +// testClusterConfigWithFiltersAndOutputs tests a cluster config with filters and outputs +func testClusterConfigWithFiltersAndOutputs( + t *testing.T, + sl plugins.SecretLoader, + fluentd fluentdv1alpha1.Fluentd, + clusterConfig *fluentdv1alpha1.ClusterFluentdConfig, + clusterFilters []fluentdv1alpha1.ClusterFilter, + clusterOutputs []fluentdv1alpha1.ClusterOutput, + expectedCfgPath string, + useTrimSpace bool, +) { + g := NewGomegaWithT(t) + + psr := fluentdv1alpha1.NewGlobalPluginResources("main") + psr.CombineGlobalInputsPlugins(sl, fluentd.Spec.GlobalInputs) + + clustercfgRouter, err := psr.BuildCfgRouter(clusterConfig) + g.Expect(err).NotTo(HaveOccurred()) + clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, clusterConfig.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs) + err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) + g.Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < maxRuntimes; i++ { + config, errs := psr.RenderMainConfig(false) + g.Expect(errs).NotTo(HaveOccurred()) + expectedCfg := string(getExpectedCfg(expectedCfgPath)) + if useTrimSpace { + g.Expect(strings.TrimSpace(expectedCfg)).To(Equal(config)) + } else { + g.Expect(expectedCfg).To(Equal(config)) + } + } +} + +// testMixedConfigs tests a combination of cluster and namespace configs +func testMixedConfigs( + t *testing.T, + sl plugins.SecretLoader, + fluentd fluentdv1alpha1.Fluentd, + clusterConfig *fluentdv1alpha1.ClusterFluentdConfig, + namespacedConfig *fluentdv1alpha1.FluentdConfig, + clusterOutputs []fluentdv1alpha1.ClusterOutput, + expectedCfgPath string, +) { + g := NewGomegaWithT(t) + + psr := fluentdv1alpha1.NewGlobalPluginResources("main") + psr.CombineGlobalInputsPlugins(sl, fluentd.Spec.GlobalInputs) + + clustercfgRouter, err := psr.BuildCfgRouter(clusterConfig) + g.Expect(err).NotTo(HaveOccurred()) + clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, clusterConfig.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) + err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources) + g.Expect(err).NotTo(HaveOccurred()) + + cfgRouter, err := psr.BuildCfgRouter(namespacedConfig) + g.Expect(err).NotTo(HaveOccurred()) + cfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, namespacedConfig.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) + err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) + g.Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < maxRuntimes; i++ { + config, errs := psr.RenderMainConfig(false) + g.Expect(errs).NotTo(HaveOccurred()) + g.Expect(string(getExpectedCfg(expectedCfgPath))).To(Equal(config)) + } +} + +// testMixedConfigWithCopy tests a mixed config with copy outputs +func testMixedConfigWithCopy( + t *testing.T, + sl plugins.SecretLoader, + fluentd fluentdv1alpha1.Fluentd, + namespacedConfig *fluentdv1alpha1.FluentdConfig, + outputs []fluentdv1alpha1.Output, + clusterOutputs []fluentdv1alpha1.ClusterOutput, + expectedCfgPath string, +) { + g := NewGomegaWithT(t) + + psr := fluentdv1alpha1.NewGlobalPluginResources("main") + psr.CombineGlobalInputsPlugins(sl, fluentd.Spec.GlobalInputs) + + cfgRouter, err := psr.BuildCfgRouter(namespacedConfig) + g.Expect(err).NotTo(HaveOccurred()) + clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, namespacedConfig.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, []fluentdv1alpha1.ClusterFilter{}, clusterOutputs) + cfgResources, _ := psr.PatchAndFilterNamespacedLevelResources(sl, namespacedConfig.GetCfgId(), []fluentdv1alpha1.Input{}, []fluentdv1alpha1.Filter{}, outputs) + cfgResources.InputPlugins = append(cfgResources.InputPlugins, clustercfgResources.InputPlugins...) + cfgResources.FilterPlugins = append(cfgResources.FilterPlugins, clustercfgResources.FilterPlugins...) + cfgResources.OutputPlugins = append(cfgResources.OutputPlugins, clustercfgResources.OutputPlugins...) + err = psr.IdentifyCopyAndPatchOutput(cfgResources) + g.Expect(err).NotTo(HaveOccurred()) + + err = psr.WithCfgResources(*cfgRouter.Label, cfgResources) + g.Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < maxRuntimes; i++ { + config, errs := psr.RenderMainConfig(false) + g.Expect(errs).NotTo(HaveOccurred()) + g.Expect(string(getExpectedCfg(expectedCfgPath))).To(Equal(config)) } } diff --git a/pkg/operator/fluentd-daemonset.go b/pkg/operator/fluentd-daemonset.go index 7ec3d3b50..30e74957f 100644 --- a/pkg/operator/fluentd-daemonset.go +++ b/pkg/operator/fluentd-daemonset.go @@ -164,41 +164,11 @@ func MakeFluentdDaemonSet(fd fluentdv1alpha1.Fluentd) *appsv1.DaemonSet { MountPath: "/fluentd/tail", }) } - // Mount host or emptydir VolumeSource - if fd.Spec.BufferVolume != nil && !fd.Spec.BufferVolume.DisableBufferVolume { - bufferVolName := fmt.Sprintf("%s-buffer", fd.Name) - bufferpv := fd.Spec.BufferVolume - - if bufferpv.HostPath != nil { - specTemplateSpec.Volumes = append(specTemplateSpec.Volumes, corev1.Volume{ - Name: bufferVolName, - VolumeSource: corev1.VolumeSource{ - HostPath: bufferpv.HostPath, - }, - }) - - specTemplateSpec.Containers[0].VolumeMounts = append(specTemplateSpec.Containers[0].VolumeMounts, corev1.VolumeMount{ - Name: bufferVolName, - MountPath: BufferMountPath, - }) - return &ds - } - - if bufferpv.EmptyDir != nil { - specTemplateSpec.Volumes = append(specTemplateSpec.Volumes, corev1.Volume{ - Name: bufferVolName, - VolumeSource: corev1.VolumeSource{ - EmptyDir: bufferpv.EmptyDir, - }, - }) - - specTemplateSpec.Containers[0].VolumeMounts = append(specTemplateSpec.Containers[0].VolumeMounts, corev1.VolumeMount{ - Name: bufferVolName, - MountPath: BufferMountPath, - }) - return &ds - } + // Mount host or emptydir VolumeSource + if configureBufferVolume(fd, specTemplateSpec) { + return &ds } + return &ds } diff --git a/pkg/operator/shared.go b/pkg/operator/shared.go new file mode 100644 index 000000000..215da7279 --- /dev/null +++ b/pkg/operator/shared.go @@ -0,0 +1,50 @@ +package operator + +import ( + "fmt" + + fluentdv1alpha1 "github.com/fluent/fluent-operator/v3/apis/fluentd/v1alpha1" + corev1 "k8s.io/api/core/v1" +) + +// configureBufferVolume configures the buffer volume for fluentd. +// Returns true if the caller should return early. +func configureBufferVolume(fd fluentdv1alpha1.Fluentd, specTemplateSpec *corev1.PodSpec) bool { + if fd.Spec.BufferVolume != nil && !fd.Spec.BufferVolume.DisableBufferVolume { + bufferVolName := fmt.Sprintf("%s-buffer", fd.Name) + bufferpv := fd.Spec.BufferVolume + + if bufferpv.HostPath != nil { + specTemplateSpec.Volumes = append(specTemplateSpec.Volumes, corev1.Volume{ + Name: bufferVolName, + VolumeSource: corev1.VolumeSource{ + HostPath: bufferpv.HostPath, + }, + }) + + specTemplateSpec.Containers[0].VolumeMounts = append(specTemplateSpec.Containers[0].VolumeMounts, corev1.VolumeMount{ + Name: bufferVolName, + MountPath: BufferMountPath, + }) + + return true + } + + if bufferpv.EmptyDir != nil { + specTemplateSpec.Volumes = append(specTemplateSpec.Volumes, corev1.Volume{ + Name: bufferVolName, + VolumeSource: corev1.VolumeSource{ + EmptyDir: bufferpv.EmptyDir, + }, + }) + + specTemplateSpec.Containers[0].VolumeMounts = append(specTemplateSpec.Containers[0].VolumeMounts, corev1.VolumeMount{ + Name: bufferVolName, + MountPath: BufferMountPath, + }) + + return true + } + } + return false +} diff --git a/pkg/operator/sts.go b/pkg/operator/sts.go index 0ada150af..dd7c50345 100644 --- a/pkg/operator/sts.go +++ b/pkg/operator/sts.go @@ -163,40 +163,8 @@ func MakeStatefulSet(fd fluentdv1alpha1.Fluentd) *appsv1.StatefulSet { } // Mount host or emptydir VolumeSource - if fd.Spec.BufferVolume != nil && !fd.Spec.BufferVolume.DisableBufferVolume { - bufferVolName := fmt.Sprintf("%s-buffer", fd.Name) - bufferpv := fd.Spec.BufferVolume - - if bufferpv.HostPath != nil { - specTemplateSpec.Volumes = append(specTemplateSpec.Volumes, corev1.Volume{ - Name: bufferVolName, - VolumeSource: corev1.VolumeSource{ - HostPath: bufferpv.HostPath, - }, - }) - - specTemplateSpec.Containers[0].VolumeMounts = append(specTemplateSpec.Containers[0].VolumeMounts, corev1.VolumeMount{ - Name: bufferVolName, - MountPath: BufferMountPath, - }) - return &sts - } - - if bufferpv.EmptyDir != nil { - specTemplateSpec.Volumes = append(specTemplateSpec.Volumes, corev1.Volume{ - Name: bufferVolName, - VolumeSource: corev1.VolumeSource{ - EmptyDir: bufferpv.EmptyDir, - }, - }) - - specTemplateSpec.Containers[0].VolumeMounts = append(specTemplateSpec.Containers[0].VolumeMounts, corev1.VolumeMount{ - Name: bufferVolName, - MountPath: BufferMountPath, - }) - - return &sts - } + if configureBufferVolume(fd, specTemplateSpec) { + return &sts } if fd.Spec.BufferVolume == nil || !fd.Spec.BufferVolume.DisableBufferVolume {