Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 76 additions & 18 deletions internal/controller/dragonfly_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"errors"
"fmt"
"net"
"strconv"
Expand Down Expand Up @@ -135,8 +136,12 @@ func (dfi *DragonflyInstance) checkReplicaRole(ctx context.Context, pod *corev1.
}
}

if masterIp != redisMasterIp && masterIp != pod.Labels[resources.MasterIpLabelKey] {
return false, nil
// for compatibily, to be remove in futur version
// check if the masterIp matches either the label (for compatibility) or the annotation
if masterIp != redisMasterIp {
if masterIp != pod.Labels[resources.MasterIpLabel] && masterIp != pod.Annotations[resources.MasterIp] {
return false, nil
}
}

return true, nil
Expand Down Expand Up @@ -202,7 +207,7 @@ func (dfi *DragonflyInstance) checkAndConfigureReplicas(ctx context.Context, mas
if err != nil {
return err
}
// configuring to the right master
// Configure to the right master if not correct
if !ok {
dfi.log.Info("configuring pod as replica to the right master", "pod", pod.Name)
if err := dfi.configureReplica(ctx, &pod, masterIp); err != nil {
Expand All @@ -218,6 +223,7 @@ func (dfi *DragonflyInstance) checkAndConfigureReplicas(ctx context.Context, mas
}
}

dfi.log.Info("All pods are configured correctly", "dfi", dfi.df.Name)
return nil
}

Expand Down Expand Up @@ -299,6 +305,33 @@ func (dfi *DragonflyInstance) getMaster(ctx context.Context) (*corev1.Pod, error
return &healthyMasters[0], nil
}

// getMasterIp gets the IP of the master pod, checking annotations first for IPv6 support
func (dfi *DragonflyInstance) getMasterIp(ctx context.Context) (string, error) {
dfi.log.Info("retrieving IP of the master")
pods, err := dfi.getPods(ctx)
if err != nil {
return "", err
}

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodRunning &&
pod.Status.ContainerStatuses[0].Ready &&
pod.Labels[resources.RoleLabelKey] == resources.Master {

masterIp, hasMasterIp := pod.Annotations[resources.MasterIp]
if hasMasterIp {
dfi.log.Info("Retrieved Master IP from annotation", "masterIp", masterIp)
return masterIp, nil
}

masterIp = pod.Status.PodIP
return masterIp, nil
}
}

return "", errors.New("could not find master")
}

// getReplicas gets all the replicas for the dragonfly instance
func (dfi *DragonflyInstance) getReplicas(ctx context.Context) (*corev1.PodList, error) {
var replicas corev1.PodList
Expand Down Expand Up @@ -380,8 +413,11 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
})
defer redisClient.Close()

dfi.log.Info("trying to invoke SLAVE OF command", "pod", pod.Name, "master", masterIp, "addr", redisClient.Options().Addr)
resp, err := redisClient.SlaveOf(ctx, masterIp, fmt.Sprint(resources.DragonflyAdminPort)).Result()
// Sanitize masterIp in case ipv6
masterIp = sanitizeIp(masterIp)

dfi.log.Info("Trying to invoke SLAVE OF command", "pod", pod.Name, "master", masterIp, "addr", redisClient.Options().Addr)
resp, err := redisClient.SlaveOf(ctx, masterIp, strconv.Itoa(resources.DragonflyAdminPort)).Result()
if err != nil {
return fmt.Errorf("error running SLAVE OF command: %s", err)
}
Expand All @@ -390,12 +426,20 @@ func (dfi *DragonflyInstance) replicaOf(ctx context.Context, pod *corev1.Pod, ma
return fmt.Errorf("response of `SLAVE OF` on replica is not OK: %s", resp)
}

dfi.log.Info("marking pod role as replica", "pod", pod.Name)
patchFrom := client.MergeFrom(pod.DeepCopy())
dfi.log.Info("Marking pod role as replica", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.RoleLabelKey] = resources.Replica
pod.Labels[resources.MasterIpLabelKey] = masterIp
if err := dfi.client.Patch(ctx, pod, patchFrom); err != nil {
return fmt.Errorf("failed to update the role label on the pod: %w", err)
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp

// for compatibily, to be remove in futur version
if !strings.Contains(masterIp, ":") {
pod.Labels[resources.MasterIpLabel] = masterIp
}

if err := dfi.client.Update(ctx, pod); err != nil {
return fmt.Errorf("could not update replica metadata: %w", err)
}

return nil
Expand All @@ -418,12 +462,22 @@ func (dfi *DragonflyInstance) replicaOfNoOne(ctx context.Context, pod *corev1.Po
return fmt.Errorf("response of `SLAVE OF NO ONE` on master is not OK: %s", resp)
}

dfi.log.Info("marking pod role as master", "pod", pod.Name)
patchFrom := client.MergeFrom(pod.DeepCopy())
masterIp := pod.Status.PodIP

dfi.log.Info("Marking pod role as master", "pod", pod.Name, "masterIp", masterIp)
pod.Labels[resources.RoleLabelKey] = resources.Master
delete(pod.Labels, resources.MasterIpLabelKey)
if err := dfi.client.Patch(ctx, pod, patchFrom); err != nil {
return fmt.Errorf("failed to update the role label on the pod: %w", err)
// for compatibily, to be remove in futur version
if !strings.Contains(masterIp, ":") {
pod.Labels[resources.MasterIpLabel] = masterIp
}

if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
pod.Annotations[resources.MasterIp] = masterIp

if err := dfi.client.Update(ctx, pod); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -654,12 +708,16 @@ func (dfi *DragonflyInstance) replTakeover(ctx context.Context, newMaster *corev
return fmt.Errorf("response of `REPLTAKEOVER` on replica is not OK: %s", resp)
}

patchFrom := client.MergeFrom(newMaster.DeepCopy())
masterIp := newMaster.Status.PodIP

newMaster.Labels[resources.RoleLabelKey] = resources.Master
delete(newMaster.Labels, resources.MasterIpLabelKey)
if newMaster.Annotations == nil {
newMaster.Annotations = make(map[string]string)
}
newMaster.Annotations[resources.MasterIp] = masterIp

// update the label on the pod
if err := dfi.client.Patch(ctx, newMaster, patchFrom); err != nil {
if err := dfi.client.Update(ctx, newMaster); err != nil {
return fmt.Errorf("failed to update the role label on the pod: %w", err)
}

Expand Down
22 changes: 22 additions & 0 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package controller
import (
"errors"
"fmt"
"strings"

"github.com/dragonflydb/dragonfly-operator/internal/resources"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -172,3 +174,23 @@ func isMasterError(err error) bool {
errors.Is(err, ErrNoHealthyMaster) ||
errors.Is(err, ErrIncorrectMasters)
}

// Helper function to parse Redis INFO data
func parseRedisInfo(info string) map[string]string {
data := map[string]string{}
for _, line := range strings.Split(info, "\n") {
if line == "" || strings.HasPrefix(line, "#") {
continue
}
kv := strings.Split(line, ":")
if len(kv) == 2 {
data[kv[0]] = strings.TrimSuffix(kv[1], "\r")
}
}
return data
}

// sanitizeIp Ipv6
func sanitizeIp(masterIp string) string {
return strings.Trim(masterIp, "[]")
}
7 changes: 5 additions & 2 deletions internal/resources/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ const (
KubernetesPartOfLabelKey = "app.kubernetes.io/part-of"
KubernetesPartOf = "dragonfly"

DragonflyNameLabelKey = "app"
MasterIpLabel string = "master-ip"
DragonflyNameLabelKey = "app"

MasterIpLabelKey = "master-ip"
MasterIp = "operator.dragonflydb.io/masterIP"

RoleLabelKey = "role"

Expand All @@ -98,6 +99,8 @@ const (
var DefaultDragonflyArgs = []string{
"--alsologtostderr",
"--primary_port_http_enabled=false",
"--bind=::",
"--admin_bind=::",
fmt.Sprintf("--admin_port=%d", DragonflyAdminPort),
"--admin_nopass",
}
Loading