Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apis/fluentbit/v1alpha2/plugins/configmap_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func NewConfigMapLoader(c client.Client, ns string) ConfigMapLoader {

func (cl ConfigMapLoader) LoadConfigMap(selector v1.ConfigMapKeySelector, namespace string) (string, error) {
var configMap v1.ConfigMap
if err := cl.client.Get(context.Background(), client.ObjectKey{Name: selector.Name, Namespace: namespace}, &configMap); err != nil {
ctx := context.Background()
if err := cl.client.Get(ctx, client.ObjectKey{Name: selector.Name, Namespace: namespace}, &configMap); err != nil {
return "", err
}

Expand Down
3 changes: 2 additions & 1 deletion apis/fluentbit/v1alpha2/plugins/secret_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func NewSecretLoader(c client.Client, ns string) SecretLoader {

func (sl SecretLoader) LoadSecret(s Secret) (string, error) {
var secret corev1.Secret
if err := sl.Client.Get(context.Background(), client.ObjectKey{Name: s.ValueFrom.SecretKeyRef.Name, Namespace: sl.namespace}, &secret); err != nil {
ctx := context.Background()
if err := sl.Client.Get(ctx, client.ObjectKey{Name: s.ValueFrom.SecretKeyRef.Name, Namespace: sl.namespace}, &secret); err != nil {
return "", err
}

Expand Down
3 changes: 2 additions & 1 deletion apis/fluentd/v1alpha1/plugins/configmap_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func NewConfigMapLoader(c client.Client, ns string) ConfigMapLoader {

func (cl ConfigMapLoader) LoadConfigMap(selector v1.ConfigMapKeySelector) (string, error) {
var configMap v1.ConfigMap
if err := cl.client.Get(context.Background(), client.ObjectKey{Name: selector.Name, Namespace: cl.namespace}, &configMap); err != nil {
ctx := context.Background()
if err := cl.client.Get(ctx, client.ObjectKey{Name: selector.Name, Namespace: cl.namespace}, &configMap); err != nil {
return "", err
}

Expand Down
3 changes: 2 additions & 1 deletion apis/fluentd/v1alpha1/plugins/secret_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func NewSecretLoader(c client.Client, ns string, l logr.Logger) SecretLoader {

func (sl SecretLoaderStruct) LoadSecret(s Secret) (string, error) {
var secret corev1.Secret
if err := sl.client.Get(context.Background(), client.ObjectKey{Name: s.ValueFrom.SecretKeyRef.Name, Namespace: sl.namespace}, &secret); err != nil {
ctx := context.Background()
if err := sl.client.Get(ctx, client.ObjectKey{Name: s.ValueFrom.SecretKeyRef.Name, Namespace: sl.namespace}, &secret); err != nil {
return "", err
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/doc-gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
const (
firstParagraph = `# API Docs
This Document documents the types introduced by the %s Operator.
> Note this document is generated from code comments. When contributing a change to this document please do so by changing the code comments.`

Check failure on line 22 in cmd/doc-gen/main.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

The line is 142 characters long, which exceeds the maximum of 120 characters. (lll)

fluentbitPluginPath = "apis/fluentbit/v1alpha2/plugins/"
fluentdPluginPath = "apis/fluentd/v1alpha1/plugins/"
Expand Down Expand Up @@ -380,7 +380,7 @@
func fieldName(field *ast.Field) string {
jsonTag := ""
if field.Tag != nil {
jsonTag = reflect.StructTag(field.Tag.Value[1 : len(field.Tag.Value)-1]).Get("json") // Delete first and last quotation
jsonTag = reflect.StructTag(strings.Trim(field.Tag.Value, "`")).Get("json")
if strings.Contains(jsonTag, "inline") {
return "-"
}
Expand Down
28 changes: 18 additions & 10 deletions cmd/fluent-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,27 @@ func main() {
var disabledControllers string
var tlsOpts []func(*tls.Config)

flag.StringVar(&watchNamespaces, "watch-namespaces", "", "Optional comma separated list of namespaces to watch for resources in. Defaults to cluster scope.")
flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&watchNamespaces, "watch-namespaces", "",
"Optional comma separated list of namespaces to watch for resources in. Defaults to cluster scope.")
flag.StringVar(&metricsAddr, "metrics-bind-address", "0",
"The address the metrics endpoint binds to. Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", true, "If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
flag.BoolVar(&secureMetrics, "metrics-secure", true,
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.StringVar(&webhookCertPath, "webhook-cert-path", "",
"The directory that contains the webhook certificate.")
flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt",
"The name of the webhook certificate file.")
flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
flag.StringVar(&metricsCertPath, "metrics-cert-path", "", "The directory that contains the metrics server certificate.")
flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
flag.StringVar(&metricsCertPath, "metrics-cert-path", "",
"The directory that contains the metrics server certificate.")
flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt",
"The name of the metrics server certificate file.")
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.StringVar(&disabledControllers, "disable-component-controllers", "",
"Optional argument that accepts two values: fluent-bit and fluentd. "+
Expand Down Expand Up @@ -240,7 +247,8 @@ func main() {
case fluentdName:
fluentdEnabled = false
default:
setupLog.Error(errors.New("incorrect value for `-disable-component-controllers` and it will not be proceeded (possible values are: fluent-bit, fluentd)"), "")
err := errors.New("incorrect value for disable-component-controllers flag")
setupLog.Error(err, "it will not be proceeded (possible values are: fluent-bit, fluentd)")
}
}

Expand Down
29 changes: 16 additions & 13 deletions cmd/fluent-watcher/fluentbit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,30 @@
flag.BoolVar(&exitOnFailure, "exit-on-failure", false, "Deprecated: This has no effect anymore.")
flag.DurationVar(&flbTerminationTimeout, "flb-timeout", 0, "Deprecated: This has no effect anymore.")

signalCtx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
signalCtx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer cancel()

logger := log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "time", log.TimestampFormat(time.Now, time.RFC3339))
l := log.NewLogfmtLogger(os.Stdout)
l = log.With(l, "time", log.TimestampFormat(time.Now, time.RFC3339))

// check the config file format
_, err := os.Stat(defaultSecretYamlPath)
if os.IsNotExist(err) {
_ = level.Info(logger).Log("msg", "No fluent-bit secret yaml found, using classic one.")
_ = level.Info(l).Log("msg", "No fluent-bit secret yaml found, using classic one.")
flag.StringVar(&configPath, "c", defaultCfgPath, "The classic config file path.")
} else {
_ = level.Info(logger).Log("msg", "fluent-bit secret yaml found, using yaml one.")
_ = level.Info(l).Log("msg", "fluent-bit secret yaml found, using yaml one.")
flag.StringVar(&configPath, "c", defaultYamlCfgPath, "The yaml config file path.")
}

if exitOnFailure {
_ = level.Warn(logger).Log("--exit-on-failure is deprecated. The process will exit no matter what if fluent-bit exits so this can safely be removed.")
_ = level.Warn(l).
Log("--exit-on-failure is deprecated. The process will exit no matter what if fluent-bit exits so this can safely be removed.")

Check failure on line 71 in cmd/fluent-watcher/fluentbit/main.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

The line is 130 characters long, which exceeds the maximum of 120 characters. (lll)
}
if flbTerminationTimeout > 0 {
_ = level.Warn(logger).Log("--flb-timeout is deprecated. Consider setting the terminationGracePeriod field on the `(Cluster)FluentBit` instance.")
_ = level.Warn(l).
Log("--flb-timeout is deprecated. Consider setting the terminationGracePeriod field on the `(Cluster)FluentBit` instance.")

Check failure on line 75 in cmd/fluent-watcher/fluentbit/main.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

The line is 126 characters long, which exceeds the maximum of 120 characters. (lll)
}

flag.Parse()
Expand All @@ -83,12 +86,12 @@
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
_ = level.Error(logger).Log("msg", "failed to start fluent-bit", "error", err)
_ = level.Error(l).Log("msg", "failed to start fluent-bit", "error", err)
os.Exit(1)
}
_ = level.Info(logger).Log("msg", "fluent-bit watcher started")
_ = level.Info(l).Log("msg", "fluent-bit watcher started")

grp, grpCtx := errgroup.WithContext(context.Background())
grp, grpCtx := errgroup.WithContext(ctx)
grp.Go(func() error {
// Watch the process. If it exits, we want to crash immediately.
defer cancel()
Expand Down Expand Up @@ -124,7 +127,7 @@
if !isValidEvent(event) {
continue
}
_ = level.Info(logger).Log("msg", "Config file changed, reloading...")
_ = level.Info(l).Log("msg", "Config file changed, reloading...")
if err := cmd.Process.Signal(syscall.SIGHUP); err != nil {
return fmt.Errorf("failed to reload config: %w", err)
}
Expand All @@ -142,12 +145,12 @@
// Always try to gracefully shut down fluent-bit. This will allow `cmd.Wait` above to finish
// and thus allow `grp.Wait` below to return.
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil && !errors.Is(err, os.ErrProcessDone) {
_ = level.Error(logger).Log("msg", "Failed to send SIGTERM to fluent-bit", "error", err)
_ = level.Error(l).Log("msg", "Failed to send SIGTERM to fluent-bit", "error", err)
// Do not exit on error here. The process might've died and that's okay.
}

if err := grp.Wait(); err != nil {
_ = level.Error(logger).Log("msg", "Failure during the run time of fluent-bit", "error", err)
_ = level.Error(l).Log("msg", "Failure during the run time of fluent-bit", "error", err)
os.Exit(1)
}
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/fluent-watcher/fluentd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func main() {
flag.Parse()

logger = log.NewLogfmtLogger(os.Stdout)

timerCtx, timerCancel = context.WithCancel(context.Background())
ctx := context.Background()
timerCtx, timerCancel = context.WithCancel(ctx)

var g run.Group
{
// Termination handler.
g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM))
g.Add(run.SignalHandler(ctx, os.Interrupt, syscall.SIGTERM))
}
{
// Watch the Fluentd, if the Fluentd not exists or stopped, restart it.
Expand All @@ -100,7 +100,7 @@ func main() {
return err
}

timerCtx, timerCancel = context.WithCancel(context.Background())
timerCtx, timerCancel = context.WithCancel(ctx)

// After the fluentd exit, fluentd watcher restarts it with an exponential
// back-off delay (1s, 2s, 4s, ...), that is capped at five minutes.
Expand Down
89 changes: 58 additions & 31 deletions controllers/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=fluentbit.fluent.io,resources=fluentbits;fluentbitconfigs;collectors;inputs;filters;outputs,verbs=get;list;watch;create;update;patch;delete

Check failure on line 45 in controllers/collector_controller.go

View workflow job for this annotation

GitHub Actions / Run on Ubuntu

The line is 167 characters long, which exceeds the maximum of 120 characters. (lll)
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles,verbs=create
Expand Down Expand Up @@ -85,15 +85,22 @@

// Check if Secret exists and requeue when not found
var sec corev1.Secret
if err := r.Get(ctx, client.ObjectKey{Namespace: co.Namespace, Name: co.Spec.FluentBitConfigName}, &sec); err != nil {
objKey := client.ObjectKey{Namespace: co.Namespace, Name: co.Spec.FluentBitConfigName}
if err := r.Get(ctx, objKey, &sec); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{}, err
}

// Install RBAC resources for the filter plugin kubernetes
cr, sa, crb := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)
cr, sa, crb := operator.MakeRBACObjects(
co.Name,
co.Namespace,
"collector",
co.Spec.RBACRules,
co.Spec.ServiceAccountAnnotations,
)
// Deploy Fluent Bit Collector ClusterRole
if _, err := controllerutil.CreateOrPatch(ctx, r.Client, cr, r.mutate(cr, &co)); err != nil {
return ctrl.Result{}, err
Expand Down Expand Up @@ -127,14 +134,24 @@
func (r *CollectorReconciler) mutate(obj client.Object, co *fluentbitv1alpha2.Collector) controllerutil.MutateFn {
switch o := obj.(type) {
case *rbacv1.ClusterRole:
expected, _, _ := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)
expected, _, _ := operator.MakeRBACObjects(co.Name,
co.Namespace, "collector",
co.Spec.RBACRules,
co.Spec.ServiceAccountAnnotations,
)

return func() error {
o.Rules = expected.Rules
return nil
}
case *corev1.ServiceAccount:
_, expected, _ := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)
_, expected, _ := operator.MakeRBACObjects(
co.Name,
co.Namespace,
"collector",
co.Spec.RBACRules,
co.Spec.ServiceAccountAnnotations,
)

return func() error {
o.Annotations = expected.Annotations
Expand All @@ -144,7 +161,12 @@
return nil
}
case *rbacv1.ClusterRoleBinding:
_, _, expected := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)
_, _, expected := operator.MakeRBACObjects(co.Name,
co.Namespace,
"collector",
co.Spec.RBACRules,
co.Spec.ServiceAccountAnnotations,
)

return func() error {
o.RoleRef = expected.RoleRef
Expand Down Expand Up @@ -220,35 +242,40 @@
}

func (r *CollectorReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &corev1.ServiceAccount{}, collectorOwnerKey, func(rawObj client.Object) []string {
// grab the job object, extract the owner.
sa := rawObj.(*corev1.ServiceAccount)
owner := metav1.GetControllerOf(sa)
if owner == nil {
return nil
}
// Make sure it's a FluentBit. If so, return it.
if owner.APIVersion != fluentbitApiGVStr || owner.Kind != "Collector" {
return nil
}
return []string{owner.Name}
}); err != nil {
ctx := context.Background()
if err := mgr.GetFieldIndexer().IndexField(
ctx, &corev1.ServiceAccount{}, collectorOwnerKey,
func(rawObj client.Object) []string {
// grab the job object, extract the owner.
sa := rawObj.(*corev1.ServiceAccount)
owner := metav1.GetControllerOf(sa)
if owner == nil {
return nil
}
// Make sure it's a FluentBit. If so, return it.
if owner.APIVersion != fluentbitApiGVStr || owner.Kind != "Collector" {
return nil
}
return []string{owner.Name}
}); err != nil {
return err
}

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.StatefulSet{}, collectorOwnerKey, func(rawObj client.Object) []string {
// grab the job object, extract the owner.
sts := rawObj.(*appsv1.StatefulSet)
owner := metav1.GetControllerOf(sts)
if owner == nil {
return nil
}
// Make sure it's a FluentBit. If so, return it.
if owner.APIVersion != fluentbitApiGVStr || owner.Kind != "Collector" {
return nil
}
return []string{owner.Name}
}); err != nil {
if err := mgr.GetFieldIndexer().IndexField(
ctx, &appsv1.StatefulSet{}, collectorOwnerKey,
func(rawObj client.Object) []string {
// grab the job object, extract the owner.
sts := rawObj.(*appsv1.StatefulSet)
owner := metav1.GetControllerOf(sts)
if owner == nil {
return nil
}
// Make sure it's a FluentBit. If so, return it.
if owner.APIVersion != fluentbitApiGVStr || owner.Kind != "Collector" {
return nil
}
return []string{owner.Name}
}); err != nil {
return err
}

Expand Down
16 changes: 14 additions & 2 deletions controllers/fluent_controller_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ func (r *FluentdReconciler) mutate(obj client.Object, fd *fluentdv1alpha1.Fluent
return nil
}
case *corev1.ServiceAccount:
_, expected, _ := operator.MakeRBACObjects(fd.Name, fd.Namespace, "fluentd", fd.Spec.RBACRules, fd.Spec.ServiceAccountAnnotations)
_, expected, _ := operator.MakeRBACObjects(
fd.Name,
fd.Namespace,
"fluentd",
fd.Spec.RBACRules,
fd.Spec.ServiceAccountAnnotations,
)

return func() error {
o.Labels = expected.Labels
Expand All @@ -132,7 +138,13 @@ func (r *FluentdReconciler) mutate(obj client.Object, fd *fluentdv1alpha1.Fluent
return nil
}
case *rbacv1.ClusterRoleBinding:
_, _, expected := operator.MakeRBACObjects(fd.Name, fd.Namespace, "fluentd", fd.Spec.RBACRules, fd.Spec.ServiceAccountAnnotations)
_, _, expected := operator.MakeRBACObjects(
fd.Name,
fd.Namespace,
"fluentd",
fd.Spec.RBACRules,
fd.Spec.ServiceAccountAnnotations,
)

return func() error {
o.RoleRef = expected.RoleRef
Expand Down
Loading
Loading