Skip to content

Commit 8574146

Browse files
xekcursoragent
andcommitted
Auto-enable and remove federation plugins on RabbitmqCluster
When a RabbitMQFederation is reconciled, the controller now ensures that rabbitmq_federation and rabbitmq_federation_management plugins are enabled on the target RabbitmqCluster CR's additionalPlugins. When the plugins are first added, reconciliation is requeued for 30s to allow RabbitMQ to load them before configuring upstreams/policies. On deletion, if no other RabbitMQFederation CRs reference the same cluster, the federation plugins are removed from additionalPlugins. This causes RabbitMQ to restart without those plugins, ensuring a clean state. Assisted-by: Claude 4.6 Opus Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 296ae86 commit 8574146

2 files changed

Lines changed: 168 additions & 11 deletions

File tree

internal/controller/rabbitmq/rabbitmq_controller.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,16 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ct
380380
return ctrl.Result{}, fmt.Errorf("error configuring RabbitmqCluster: %w", err)
381381
}
382382

383+
// Preserve additionalPlugins that may have been set by the federation controller
384+
// on the existing RabbitmqCluster CR, since MarshalInto from the RabbitMq spec
385+
// does not include them.
386+
existingCluster := &rabbitmqv2.RabbitmqCluster{}
387+
if err := r.Client.Get(ctx, types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, existingCluster); err == nil {
388+
if len(existingCluster.Spec.Rabbitmq.AdditionalPlugins) > 0 && len(rabbitmqCluster.Spec.Rabbitmq.AdditionalPlugins) == 0 {
389+
rabbitmqCluster.Spec.Rabbitmq.AdditionalPlugins = existingCluster.Spec.Rabbitmq.AdditionalPlugins
390+
}
391+
}
392+
383393
rabbitmqImplCluster := impl.NewRabbitMqCluster(rabbitmqCluster, 5)
384394
rmqres, rmqerr := rabbitmqImplCluster.CreateOrPatch(ctx, helper)
385395
if rmqerr != nil {

internal/controller/rabbitmq/rabbitmqfederation_controller.go

Lines changed: 158 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
rabbitmqapi "github.com/openstack-k8s-operators/infra-operator/pkg/rabbitmq/api"
3333
condition "github.com/openstack-k8s-operators/lib-common/modules/common/condition"
3434
helper "github.com/openstack-k8s-operators/lib-common/modules/common/helper"
35+
rabbitmqclusterv2 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
3536
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
3637
"k8s.io/apimachinery/pkg/runtime"
3738
"k8s.io/apimachinery/pkg/types"
@@ -57,7 +58,7 @@ type RabbitMQFederationReconciler struct {
5758
//+kubebuilder:rbac:groups=rabbitmq.openstack.org,resources=rabbitmqvhosts/finalizers,verbs=update
5859
//+kubebuilder:rbac:groups=rabbitmq.openstack.org,resources=rabbitmqs,verbs=get;list;watch;update;patch
5960
//+kubebuilder:rbac:groups=rabbitmq.openstack.org,resources=rabbitmqs/finalizers,verbs=update
60-
//+kubebuilder:rbac:groups=rabbitmq.com,resources=rabbitmqclusters,verbs=get;list;watch
61+
//+kubebuilder:rbac:groups=rabbitmq.com,resources=rabbitmqclusters,verbs=get;list;watch;update;patch
6162
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
6263

6364
// Reconcile reconciles a RabbitMQFederation object
@@ -90,9 +91,8 @@ func (r *RabbitMQFederationReconciler) Reconcile(ctx context.Context, req ctrl.R
9091
// Restore condition timestamps if they haven't changed
9192
condition.RestoreLastTransitionTimes(&instance.Status.Conditions, savedConditions)
9293

93-
if instance.Status.Conditions.IsUnknown(condition.ReadyCondition) {
94-
instance.Status.Conditions.Set(instance.Status.Conditions.Mirror(condition.ReadyCondition))
95-
}
94+
// Always mirror the ReadyCondition from the federation-specific condition
95+
instance.Status.Conditions.Set(instance.Status.Conditions.Mirror(condition.ReadyCondition))
9696
if err := h.PatchInstance(ctx, instance); err != nil {
9797
Log.Error(err, "Failed to patch instance")
9898
}
@@ -221,6 +221,23 @@ func (r *RabbitMQFederationReconciler) reconcileNormal(ctx context.Context, inst
221221
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, err
222222
}
223223

224+
// Ensure federation plugins are enabled on the RabbitmqCluster
225+
pluginsUpdated, err := r.ensureFederationPlugins(ctx, instance.Spec.RabbitmqClusterName, instance.Namespace)
226+
if err != nil {
227+
instance.Status.Conditions.Set(condition.FalseCondition(
228+
rabbitmqv1.RabbitMQFederationReadyCondition,
229+
condition.ErrorReason,
230+
condition.SeverityWarning,
231+
rabbitmqv1.RabbitMQFederationReadyErrorMessage,
232+
fmt.Sprintf("failed to enable federation plugins: %v", err)))
233+
return ctrl.Result{RequeueAfter: time.Duration(10) * time.Second}, err
234+
}
235+
if pluginsUpdated {
236+
// Plugins were just enabled - requeue to give RabbitMQ time to load them
237+
Log.Info("Federation plugins enabled, requeueing to allow RabbitMQ to load them")
238+
return ctrl.Result{RequeueAfter: time.Duration(30) * time.Second}, nil
239+
}
240+
224241
// Get RabbitMQ cluster connection details
225242
apiClient, err := r.getRabbitMQClient(ctx, instance.Spec.RabbitmqClusterName, instance.Namespace)
226243
if err != nil {
@@ -287,10 +304,13 @@ func (r *RabbitMQFederationReconciler) reconcileNormal(ctx context.Context, inst
287304

288305
Log.Info("Created/updated federation policy", "policy", policyName, "vhost", vhostName)
289306

290-
// Set ready condition
307+
// Set ready conditions
291308
instance.Status.Conditions.Set(condition.TrueCondition(
292309
rabbitmqv1.RabbitMQFederationReadyCondition,
293310
rabbitmqv1.RabbitMQFederationReadyMessage))
311+
instance.Status.Conditions.Set(condition.TrueCondition(
312+
condition.ReadyCondition,
313+
rabbitmqv1.RabbitMQFederationReadyMessage))
294314

295315
return ctrl.Result{}, nil
296316
}
@@ -357,6 +377,12 @@ func (r *RabbitMQFederationReconciler) reconcileDelete(ctx context.Context, inst
357377
Log.Info("Deleted federation upstream", "upstream", instance.Spec.UpstreamName, "vhost", vhostName)
358378
}
359379

380+
// Remove federation plugins if no other federations reference this cluster
381+
if err := r.removeFederationPluginsIfUnused(ctx, instance.Spec.RabbitmqClusterName, instance.Namespace, instance.Name); err != nil {
382+
Log.Info("Failed to remove federation plugins", "cluster", instance.Spec.RabbitmqClusterName, "error", err)
383+
// Continue anyway - don't block deletion on cleanup failures
384+
}
385+
360386
// Remove finalizer from vhost if VhostRef is set
361387
if instance.Spec.VhostRef != "" {
362388
vhostFinalizer := rabbitmqv1.FederationVhostFinalizerPrefix + instance.Name
@@ -468,6 +494,17 @@ func (r *RabbitMQFederationReconciler) buildUpstreamURI(ctx context.Context, ins
468494

469495
// getRabbitMQClient creates a RabbitMQ API client for the specified cluster
470496
func (r *RabbitMQFederationReconciler) getRabbitMQClient(ctx context.Context, clusterName, namespace string) (*rabbitmqapi.Client, error) {
497+
h, err := helper.NewHelper(&rabbitmqv1.RabbitMQFederation{}, r.Client, r.Kclient, r.Scheme, log.FromContext(ctx))
498+
if err != nil {
499+
return nil, fmt.Errorf("failed to create helper: %w", err)
500+
}
501+
502+
// Get the RabbitmqCluster CR to determine TLS settings
503+
rmqCluster := &rabbitmqclusterv2.RabbitmqCluster{}
504+
if err := r.Get(ctx, types.NamespacedName{Name: clusterName, Namespace: namespace}, rmqCluster); err != nil {
505+
return nil, fmt.Errorf("failed to get RabbitmqCluster %s: %w", clusterName, err)
506+
}
507+
471508
// Get the RabbitMQ cluster credentials
472509
secretName := fmt.Sprintf("%s-default-user", clusterName)
473510
secret := &corev1.Secret{}
@@ -479,18 +516,128 @@ func (r *RabbitMQFederationReconciler) getRabbitMQClient(ctx context.Context, cl
479516
if !ok {
480517
return nil, fmt.Errorf("secret %s does not contain 'username' key", secretName)
481518
}
482-
483519
password, ok := secret.Data["password"]
484520
if !ok {
485521
return nil, fmt.Errorf("secret %s does not contain 'password' key", secretName)
486522
}
487523

488-
// Build the management API URL
489-
serviceName := fmt.Sprintf("%s.%s.svc.cluster.local", clusterName, namespace)
490-
baseURL := fmt.Sprintf("http://%s:15672", serviceName)
524+
// Build the management API URL using the shared helper (handles TLS)
525+
baseURL := getManagementURL(rmqCluster, secret)
526+
527+
// Get TLS CA cert if TLS is enabled
528+
caCert, err := getTLSCACert(ctx, h, rmqCluster, namespace)
529+
if err != nil {
530+
return nil, fmt.Errorf("failed to get TLS CA cert: %w", err)
531+
}
532+
533+
tlsEnabled := rmqCluster.Spec.TLS.SecretName != ""
534+
return rabbitmqapi.NewClient(baseURL, string(username), string(password), tlsEnabled, caCert), nil
535+
}
536+
537+
// federationPlugins are the RabbitMQ plugins required for federation
538+
var federationPlugins = []rabbitmqclusterv2.Plugin{
539+
"rabbitmq_federation",
540+
"rabbitmq_federation_management",
541+
}
542+
543+
// ensureFederationPlugins ensures that the federation plugins are enabled on the RabbitmqCluster CR.
544+
// The rabbitmq controller preserves additionalPlugins set here when reconciling.
545+
// Returns true if the cluster was updated (plugins were added).
546+
func (r *RabbitMQFederationReconciler) ensureFederationPlugins(ctx context.Context, clusterName, namespace string) (bool, error) {
547+
Log := log.FromContext(ctx)
548+
549+
rmqCluster := &rabbitmqclusterv2.RabbitmqCluster{}
550+
if err := r.Get(ctx, types.NamespacedName{Name: clusterName, Namespace: namespace}, rmqCluster); err != nil {
551+
return false, fmt.Errorf("failed to get RabbitmqCluster %s: %w", clusterName, err)
552+
}
553+
554+
existing := rmqCluster.Spec.Rabbitmq.AdditionalPlugins
555+
updated := false
556+
for _, plugin := range federationPlugins {
557+
found := false
558+
for _, p := range existing {
559+
if p == plugin {
560+
found = true
561+
break
562+
}
563+
}
564+
if !found {
565+
existing = append(existing, plugin)
566+
updated = true
567+
}
568+
}
569+
570+
if updated {
571+
rmqCluster.Spec.Rabbitmq.AdditionalPlugins = existing
572+
if err := r.Update(ctx, rmqCluster); err != nil {
573+
return false, fmt.Errorf("failed to update RabbitmqCluster %s with federation plugins: %w", clusterName, err)
574+
}
575+
Log.Info("Enabled federation plugins on RabbitmqCluster", "cluster", clusterName,
576+
"plugins", federationPlugins)
577+
}
578+
579+
return updated, nil
580+
}
581+
582+
// removeFederationPluginsIfUnused removes federation plugins from the RabbitmqCluster CR
583+
// if no other RabbitMQFederation CRs reference this cluster.
584+
func (r *RabbitMQFederationReconciler) removeFederationPluginsIfUnused(ctx context.Context, clusterName, namespace, excludeFederationName string) error {
585+
Log := log.FromContext(ctx)
586+
587+
// Check if any other RabbitMQFederation CRs still reference this cluster
588+
fedList := &rabbitmqv1.RabbitMQFederationList{}
589+
if err := r.List(ctx, fedList, client.InNamespace(namespace)); err != nil {
590+
return fmt.Errorf("failed to list RabbitMQFederation resources: %w", err)
591+
}
592+
593+
for i := range fedList.Items {
594+
fed := &fedList.Items[i]
595+
// Skip the federation being deleted and any that are also being deleted
596+
if fed.Name == excludeFederationName || !fed.DeletionTimestamp.IsZero() {
597+
continue
598+
}
599+
if fed.Spec.RabbitmqClusterName == clusterName {
600+
Log.Info("Other federation still references this cluster, keeping plugins",
601+
"cluster", clusterName, "federation", fed.Name)
602+
return nil
603+
}
604+
}
605+
606+
// No other federations reference this cluster - remove the plugins
607+
rmqCluster := &rabbitmqclusterv2.RabbitmqCluster{}
608+
if err := r.Get(ctx, types.NamespacedName{Name: clusterName, Namespace: namespace}, rmqCluster); err != nil {
609+
if k8s_errors.IsNotFound(err) {
610+
return nil
611+
}
612+
return fmt.Errorf("failed to get RabbitmqCluster %s: %w", clusterName, err)
613+
}
614+
615+
filtered := make([]rabbitmqclusterv2.Plugin, 0, len(rmqCluster.Spec.Rabbitmq.AdditionalPlugins))
616+
removed := false
617+
for _, p := range rmqCluster.Spec.Rabbitmq.AdditionalPlugins {
618+
isFedPlugin := false
619+
for _, fp := range federationPlugins {
620+
if p == fp {
621+
isFedPlugin = true
622+
break
623+
}
624+
}
625+
if !isFedPlugin {
626+
filtered = append(filtered, p)
627+
} else {
628+
removed = true
629+
}
630+
}
631+
632+
if removed {
633+
rmqCluster.Spec.Rabbitmq.AdditionalPlugins = filtered
634+
if err := r.Update(ctx, rmqCluster); err != nil {
635+
return fmt.Errorf("failed to remove federation plugins from RabbitmqCluster %s: %w", clusterName, err)
636+
}
637+
Log.Info("Removed federation plugins from RabbitmqCluster", "cluster", clusterName)
638+
}
491639

492-
// Create and return the client
493-
return rabbitmqapi.NewClient(baseURL, string(username), string(password), false, nil), nil
640+
return nil
494641
}
495642

496643
// SetupWithManager sets up the controller with the Manager.

0 commit comments

Comments
 (0)