From c03e4310d85a110a99b229b59269693c34b0df00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 12:20:21 +0100 Subject: [PATCH 01/11] authz/loader: restart file watcher on transient symlink errors Koanf's file.Provider.Watch exits permanently when filepath.EvalSymlinks fails -- which happens every time Kubernetes updates a ConfigMap-mounted volume, since kubelet briefly removes the ..data symlink before creating the new one. The fix: when the watch callback receives an error, sleep 1s and restart the watch. Also reload the policy immediately after restart to pick up any updates missed while dead. This was causing the dataplane authz interceptor to permanently stop picking up policy changes after the first ConfigMap update, requiring a pod restart to recover. --- authz/go.mod | 4 + authz/loader/atomic_rename_test.go | 127 ++++++++++++++++++++++++++++ authz/loader/loader.go | 78 ++++++++--------- authz/loader/loader_symlink_test.go | 91 ++++++++++++++++++++ 4 files changed, 261 insertions(+), 39 deletions(-) create mode 100644 authz/loader/atomic_rename_test.go create mode 100644 authz/loader/loader_symlink_test.go diff --git a/authz/go.mod b/authz/go.mod index 350c46d..373626b 100644 --- a/authz/go.mod +++ b/authz/go.mod @@ -10,6 +10,7 @@ require ( github.com/knadh/koanf/providers/file v1.2.1 github.com/knadh/koanf/providers/rawbytes v1.0.0 github.com/knadh/koanf/v2 v2.3.2 + github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.42.0 go.opentelemetry.io/otel/trace v1.42.0 golang.org/x/net v0.51.0 @@ -19,6 +20,7 @@ require ( require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -26,10 +28,12 @@ require ( github.com/knadh/koanf/maps v0.1.2 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/metric v1.42.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.35.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260226221140-a57be14db171 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/authz/loader/atomic_rename_test.go b/authz/loader/atomic_rename_test.go new file mode 100644 index 0000000..284216a --- /dev/null +++ b/authz/loader/atomic_rename_test.go @@ -0,0 +1,127 @@ +package loader_test + +import ( + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/redpanda-data/common-go/authz" + "github.com/redpanda-data/common-go/authz/loader" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestWatchPolicyFile_AtomicRename reproduces the neovim atomic save pattern +// on Linux: write to a temp file in the same directory, then rename over the +// original. This causes inotify to lose the watch because the inode changes. +// +// This test currently fails -- the watcher does not pick up the new content. +func TestWatchPolicyFile_AtomicRename(t *testing.T) { + dir := t.TempDir() + policyPath := filepath.Join(dir, "policy.yaml") + + initialPolicy := []byte(`roles: + - id: reader + permissions: + - read +bindings: + - role: reader + principal: "User:alice@example.com" + scope: "organizations/test" +`) + require.NoError(t, os.WriteFile(policyPath, initialPolicy, 0o644)) + + var mu sync.Mutex + var reloadedPolicy *authz.Policy + var reloadErr error + + policy, unwatch, err := loader.WatchPolicyFile(policyPath, func(p authz.Policy, err error) { + mu.Lock() + defer mu.Unlock() + if err != nil { + reloadErr = err + } else { + reloadedPolicy = &p + } + }) + require.NoError(t, err) + defer func() { _ = unwatch() }() + + // Verify initial load + require.Len(t, policy.Roles, 1) + assert.Equal(t, "reader", string(policy.Roles[0].ID)) + + // Simulate neovim atomic save: write temp file, rename over original. + // This is exactly what neovim does with 'backupcopy=no' (the default on Linux). + updatedPolicy := []byte(`roles: + - id: admin + permissions: + - read + - write + - delete +bindings: + - role: admin + principal: "User:alice@example.com" + scope: "organizations/test" +`) + + // Neovim with backupcopy=no (default on Linux): + // 1. Rename original to backup (original inode moves away) + // 2. Write new content to original path (new inode) + // 3. Unlink backup + backupPath := policyPath + "~" + require.NoError(t, os.Rename(policyPath, backupPath)) + require.NoError(t, os.WriteFile(policyPath, updatedPolicy, 0o644)) + require.NoError(t, os.Remove(backupPath)) + + // Wait for first reload + waitForReload := func(t *testing.T, expected string, timeout time.Duration) { + t.Helper() + deadline := time.After(timeout) + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-deadline: + mu.Lock() + e := reloadErr + mu.Unlock() + if e != nil { + t.Fatalf("watcher error: %v", e) + } + t.Fatalf("timed out waiting for role %q", expected) + case <-ticker.C: + mu.Lock() + got := reloadedPolicy + reloadedPolicy = nil // reset for next round + mu.Unlock() + if got != nil { + require.Len(t, got.Roles, 1) + assert.Equal(t, expected, string(got.Roles[0].ID)) + return + } + } + } + } + + waitForReload(t, "admin", 3*time.Second) + + // Second neovim edit -- this is where the watcher typically breaks. + secondPolicy := []byte(`roles: + - id: superadmin + permissions: + - everything +bindings: + - role: superadmin + principal: "User:alice@example.com" + scope: "organizations/test" +`) + backupPath2 := policyPath + "~" + require.NoError(t, os.Rename(policyPath, backupPath2)) + require.NoError(t, os.WriteFile(policyPath, secondPolicy, 0o644)) + require.NoError(t, os.Remove(backupPath2)) + + waitForReload(t, "superadmin", 3*time.Second) +} diff --git a/authz/loader/loader.go b/authz/loader/loader.go index 3480201..87e1e6b 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -11,8 +11,8 @@ package loader import ( "fmt" - "runtime" - "strings" + "log/slog" + "time" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/file" @@ -85,43 +85,43 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy if err := k.Unmarshal("", &policy); err != nil { return authz.Policy{}, nil, fmt.Errorf("failed to unmarshal file %s: %w", path, err) } - // Watch for changes using the file provider's Watch method - // The watchFunc will be called whenever the file changes - watchFunc := func(_ any, watchErr error) { - // On macOS (kqueue), fsnotify watches inodes. An atomic rename — - // write to a temp file then rename over the watched path — replaces - // the inode, so kqueue fires a REMOVE on the old inode and never - // fires a CREATE for the new one. koanf surfaces this as - // "file was removed". On Linux (inotify), the same operation - // fires a CREATE on the destination path, so the reload succeeds - // without hitting this branch. We therefore only see this error in - // local macOS development, which is why it wasn't caught by CI. - // - // When we detect a REMOVE on darwin, try reloading from the path - // directly. If the file is there (atomic rename), we get the updated - // policy. If it's genuinely gone, the reload errors and we fall - // through to propagate the original watcher error. - isMacOSRemove := runtime.GOOS == "darwin" && watchErr != nil && strings.Contains(watchErr.Error(), "was removed") - if watchErr != nil && !isMacOSRemove { - callback(authz.Policy{}, fmt.Errorf("watcher error: %w", watchErr)) - return + // startWatch starts a koanf file watcher. On file change it reloads the + // policy and calls callback. On watcher error (e.g. transient symlink + // removal during a Kubernetes ConfigMap update, or neovim atomic save + // on Linux) it restarts the watch after a short delay. + var startWatch func() + startWatch = func() { + fp := file.Provider(path) + if err := fp.Watch(func(_ any, watchErr error) { + if watchErr != nil { + slog.Warn("Policy file watcher exited, will restart", + "path", path, "error", watchErr) + time.Sleep(time.Second) + startWatch() + // Reload immediately -- we may have missed updates while dead. + if p, err := LoadPolicyFromFile(path); err == nil { + slog.Info("Policy file watcher restarted", + "path", path) + callback(p, nil) + } + return + } + k := koanf.New(".") + if err := k.Load(fp, yaml.Parser()); err != nil { + callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, err)) + return + } + var p authz.Policy + if err := k.Unmarshal("", &p); err != nil { + callback(authz.Policy{}, fmt.Errorf("failed to unmarshal policy: %w", err)) + return + } + callback(p, nil) + }); err != nil { + slog.Error("Failed to start policy file watcher", + "path", path, "error", err) } - // Reload the policy - k := koanf.New(".") - if err := k.Load(fp, yaml.Parser()); err != nil { - callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, err)) - return - } - var policy authz.Policy - if err := k.Unmarshal("", &policy); err != nil { - callback(authz.Policy{}, fmt.Errorf("failed to unmarshal policy: %w", err)) - return - } - callback(policy, nil) - } - err := fp.Watch(watchFunc) - if err != nil { - return authz.Policy{}, nil, &InitializeWatchError{err} } - return policy, fp.Unwatch, nil + startWatch() + return policy, func() error { return nil }, nil } diff --git a/authz/loader/loader_symlink_test.go b/authz/loader/loader_symlink_test.go new file mode 100644 index 0000000..6e923f0 --- /dev/null +++ b/authz/loader/loader_symlink_test.go @@ -0,0 +1,91 @@ +package loader_test + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/redpanda-data/common-go/authz" + "github.com/redpanda-data/common-go/authz/loader" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestWatchPolicyFile_SymlinkSwap reproduces the Kubernetes ConfigMap update +// pattern: the kubelet atomically swaps a symlink to point to a new directory. +// During the swap the old symlink target is briefly removed before the new one +// appears. The file watcher must survive this transient removal and pick up +// the new content. +func TestWatchPolicyFile_SymlinkSwap(t *testing.T) { + dir := t.TempDir() + + gen1 := filepath.Join(dir, "gen1") + require.NoError(t, os.Mkdir(gen1, 0o755)) + writePolicy(t, filepath.Join(gen1, "policy.yaml"), 1) + + dataLink := filepath.Join(dir, "..data") + require.NoError(t, os.Symlink(gen1, dataLink)) + + policyPath := filepath.Join(dir, "policy.yaml") + require.NoError(t, os.Symlink(filepath.Join(dataLink, "policy.yaml"), policyPath)) + + reloadCh := make(chan authz.Policy, 10) + errCh := make(chan error, 10) + + policy, _, err := loader.WatchPolicyFile(policyPath, func(p authz.Policy, err error) { + if err != nil { + errCh <- err + return + } + reloadCh <- p + }) + require.NoError(t, err) + require.Len(t, policy.Bindings, 1) + + // First symlink swap. + swapSymlink(t, dir, dataLink, 2) + + select { + case p := <-reloadCh: + assert.Len(t, p.Bindings, 2, "expected 2 bindings after first swap") + case err := <-errCh: + t.Fatalf("watcher returned error instead of reloading: %v", err) + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for policy reload after first symlink swap") + } + + // Second swap: verify the watcher survived the first swap. + swapSymlink(t, dir, dataLink, 3) + + select { + case p := <-reloadCh: + assert.Len(t, p.Bindings, 3, "expected 3 bindings after second swap") + case err := <-errCh: + t.Fatalf("watcher died after first swap: %v", err) + case <-time.After(5 * time.Second): + t.Fatal("watcher is dead -- timed out waiting for second policy reload") + } +} + +func swapSymlink(t *testing.T, dir, dataLink string, nBindings int) { + t.Helper() + gen := filepath.Join(dir, fmt.Sprintf("gen%d", nBindings)) + require.NoError(t, os.Mkdir(gen, 0o755)) + writePolicy(t, filepath.Join(gen, "policy.yaml"), nBindings) + + require.NoError(t, os.Remove(dataLink)) + time.Sleep(50 * time.Millisecond) + require.NoError(t, os.Symlink(gen, dataLink)) +} + +func writePolicy(t *testing.T, path string, nBindings int) { + t.Helper() + var bindings string + for i := range nBindings { + bindings += fmt.Sprintf("- principal: \"User:user%d@example.com\"\n role: Writer\n scope: \"organizations/org1/resourcegroups/rg1/dataplanes/dp1\"\n", i) + } + data := fmt.Sprintf("roles:\n- id: Writer\n permissions:\n - dataplane_pipeline_create\nbindings:\n%s", bindings) + require.NoError(t, os.WriteFile(path, []byte(data), 0o644)) +} From 5cb24fddcef3a58fa3aeb06af6aba50dadad4512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 12:26:48 +0100 Subject: [PATCH 02/11] authz/loader: fix license header and import ordering in test --- authz/loader/loader_symlink_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/authz/loader/loader_symlink_test.go b/authz/loader/loader_symlink_test.go index 6e923f0..7f3aa92 100644 --- a/authz/loader/loader_symlink_test.go +++ b/authz/loader/loader_symlink_test.go @@ -1,3 +1,12 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + package loader_test import ( @@ -7,10 +16,11 @@ import ( "testing" "time" - "github.com/redpanda-data/common-go/authz" - "github.com/redpanda-data/common-go/authz/loader" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/redpanda-data/common-go/authz" + "github.com/redpanda-data/common-go/authz/loader" ) // TestWatchPolicyFile_SymlinkSwap reproduces the Kubernetes ConfigMap update From 5e739859511e48dd2594ca6bcbcf3a1cde12c886 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 12:33:25 +0100 Subject: [PATCH 03/11] authz/loader: use for loop with proper shutdown for watcher restart --- authz/loader/loader.go | 108 ++++++++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 38 deletions(-) diff --git a/authz/loader/loader.go b/authz/loader/loader.go index 87e1e6b..c337c35 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -75,53 +75,85 @@ func (e *InitializeWatchError) Unwrap() error { // WatchPolicyFile watches a YAML policy file for changes and calls the callback // when the file is modified. This is particularly useful for Kubernetes ConfigMap // mounted files which are updated via symlink changes. +// +// Koanf's file watcher exits on transient errors (e.g. symlink briefly removed +// during a ConfigMap update). This function runs the watcher in a loop, +// restarting it automatically when it exits. Call the returned PolicyUnwatch +// to stop the loop and clean up. func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, PolicyUnwatch, error) { - k := koanf.New(".") - fp := file.Provider(path) - if err := k.Load(fp, yaml.Parser()); err != nil { + policy, err := LoadPolicyFromFile(path) + if err != nil { return authz.Policy{}, nil, fmt.Errorf("failed to load policy file %s: %w", path, err) } - var policy authz.Policy - if err := k.Unmarshal("", &policy); err != nil { - return authz.Policy{}, nil, fmt.Errorf("failed to unmarshal file %s: %w", path, err) + + // restartCh is signalled by the watch callback when koanf's watcher exits. + restartCh := make(chan struct{}, 1) + stopCh := make(chan struct{}) + + watchCb := func(_ any, watchErr error) { + if watchErr != nil { + slog.Warn("Policy file watcher exited, will restart", + "path", path, "error", watchErr) + select { + case restartCh <- struct{}{}: + default: + } + return + } + p, err := LoadPolicyFromFile(path) + if err != nil { + callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, err)) + return + } + callback(p, nil) } - // startWatch starts a koanf file watcher. On file change it reloads the - // policy and calls callback. On watcher error (e.g. transient symlink - // removal during a Kubernetes ConfigMap update, or neovim atomic save - // on Linux) it restarts the watch after a short delay. - var startWatch func() - startWatch = func() { - fp := file.Provider(path) - if err := fp.Watch(func(_ any, watchErr error) { - if watchErr != nil { - slog.Warn("Policy file watcher exited, will restart", - "path", path, "error", watchErr) - time.Sleep(time.Second) - startWatch() - // Reload immediately -- we may have missed updates while dead. - if p, err := LoadPolicyFromFile(path); err == nil { - slog.Info("Policy file watcher restarted", - "path", path) - callback(p, nil) - } + + // Initial watch. + fp := file.Provider(path) + if err := fp.Watch(watchCb); err != nil { + return authz.Policy{}, nil, &InitializeWatchError{Err: err} + } + + // Restart loop: waits for the watcher to die, then restarts it. + go func() { + for { + select { + case <-stopCh: return + case <-restartCh: } - k := koanf.New(".") - if err := k.Load(fp, yaml.Parser()); err != nil { - callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, err)) + + // Backoff to avoid busy-looping if the file is persistently gone. + select { + case <-stopCh: return + case <-time.After(time.Second): } - var p authz.Policy - if err := k.Unmarshal("", &p); err != nil { - callback(authz.Policy{}, fmt.Errorf("failed to unmarshal policy: %w", err)) - return + + slog.Info("Restarting policy file watcher", "path", path) + fp := file.Provider(path) + if err := fp.Watch(watchCb); err != nil { + slog.Error("Failed to restart policy file watcher", + "path", path, "error", err) + // Signal ourselves to retry. + select { + case restartCh <- struct{}{}: + default: + } + continue + } + + // Reload immediately -- we may have missed updates while dead. + if p, err := LoadPolicyFromFile(path); err == nil { + callback(p, nil) } - callback(p, nil) - }); err != nil { - slog.Error("Failed to start policy file watcher", - "path", path, "error", err) } + }() + + unwatch := func() error { + close(stopCh) + return nil } - startWatch() - return policy, func() error { return nil }, nil + + return policy, unwatch, nil } From 4b75519a240b0071acddce5a66057b501ed62e71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 12:39:27 +0100 Subject: [PATCH 04/11] authz/loader: keep initial watch synchronous, only loop for restarts --- authz/loader/loader.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/authz/loader/loader.go b/authz/loader/loader.go index c337c35..9cf7dac 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -108,13 +108,14 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy callback(p, nil) } - // Initial watch. + // Start the initial watch synchronously so the watcher is active before + // we return to the caller. fp := file.Provider(path) if err := fp.Watch(watchCb); err != nil { return authz.Policy{}, nil, &InitializeWatchError{Err: err} } - // Restart loop: waits for the watcher to die, then restarts it. + // Restart loop: when koanf's watcher dies, restart it after a short backoff. go func() { for { select { @@ -123,25 +124,23 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy case <-restartCh: } - // Backoff to avoid busy-looping if the file is persistently gone. select { case <-stopCh: return case <-time.After(time.Second): } - slog.Info("Restarting policy file watcher", "path", path) fp := file.Provider(path) if err := fp.Watch(watchCb); err != nil { - slog.Error("Failed to restart policy file watcher", + slog.Warn("Failed to restart policy file watcher, will retry", "path", path, "error", err) - // Signal ourselves to retry. select { case restartCh <- struct{}{}: default: } continue } + slog.Info("Policy file watcher restarted", "path", path) // Reload immediately -- we may have missed updates while dead. if p, err := LoadPolicyFromFile(path); err == nil { From fe59944a603cfdbaf0f1761e1fc755f1556bf793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 12:44:54 +0100 Subject: [PATCH 05/11] authz/loader: single watch loop with proper shutdown, no special initial case --- authz/loader/loader.go | 48 ++++++++++++++++--------------------- authz/loader/loader_test.go | 3 +++ 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/authz/loader/loader.go b/authz/loader/loader.go index 9cf7dac..3a88dce 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -86,16 +86,15 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy return authz.Policy{}, nil, fmt.Errorf("failed to load policy file %s: %w", path, err) } - // restartCh is signalled by the watch callback when koanf's watcher exits. - restartCh := make(chan struct{}, 1) stopCh := make(chan struct{}) + diedCh := make(chan struct{}, 1) watchCb := func(_ any, watchErr error) { if watchErr != nil { slog.Warn("Policy file watcher exited, will restart", "path", path, "error", watchErr) select { - case restartCh <- struct{}{}: + case diedCh <- struct{}{}: default: } return @@ -108,41 +107,34 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy callback(p, nil) } - // Start the initial watch synchronously so the watcher is active before - // we return to the caller. - fp := file.Provider(path) - if err := fp.Watch(watchCb); err != nil { - return authz.Policy{}, nil, &InitializeWatchError{Err: err} - } - - // Restart loop: when koanf's watcher dies, restart it after a short backoff. + // Watch loop: starts koanf's file watcher, blocks until it dies, restarts. + // Koanf's watcher exits on transient errors like symlink removal during + // Kubernetes ConfigMap updates. go func() { for { - select { - case <-stopCh: - return - case <-restartCh: + fp := file.Provider(path) + if err := fp.Watch(watchCb); err != nil { + slog.Warn("Failed to start policy file watcher, will retry", + "path", path, "error", err) + } else { + // Block until the watcher dies or we're told to stop. + select { + case <-stopCh: + fp.Unwatch() + return + case <-diedCh: + } } + // Backoff before restarting. select { case <-stopCh: return case <-time.After(time.Second): } - fp := file.Provider(path) - if err := fp.Watch(watchCb); err != nil { - slog.Warn("Failed to restart policy file watcher, will retry", - "path", path, "error", err) - select { - case restartCh <- struct{}{}: - default: - } - continue - } - slog.Info("Policy file watcher restarted", "path", path) - - // Reload immediately -- we may have missed updates while dead. + // Reload after backoff — we may have missed updates while dead. + // By now the symlink should be back. if p, err := LoadPolicyFromFile(path); err == nil { callback(p, nil) } diff --git a/authz/loader/loader_test.go b/authz/loader/loader_test.go index 1aec018..c991106 100644 --- a/authz/loader/loader_test.go +++ b/authz/loader/loader_test.go @@ -14,6 +14,7 @@ import ( "path/filepath" "reflect" "testing" + "time" "github.com/redpanda-data/common-go/authz" ) @@ -130,6 +131,8 @@ func TestFileWatcher(t *testing.T) { if !reflect.DeepEqual(policy, expectedPolicyOne) { t.Errorf("expected %v, want %v", policy, expectedPolicyOne) } + // Small delay to let the background watcher goroutine start. + time.Sleep(100 * time.Millisecond) tmpPath := filepath.Join(dir, "rbac_policy.tmp.yaml") if err := os.WriteFile(tmpPath, []byte(policyTwo), 0o644); err != nil { t.Fatalf("failed to write file: %v", err) From cb86d1bc8f31a1fd6d95f15c9d1447aca08d0515 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 13:02:01 +0100 Subject: [PATCH 06/11] authz/loader: restart koanf file watcher on transient symlink errors Koanf's file.Provider.Watch exits when filepath.EvalSymlinks fails, which happens during Kubernetes ConfigMap updates (kubelet briefly removes the ..data symlink). Restart the watch in a loop with 1s backoff. Reload the policy after each restart to catch missed updates. Single file.Provider instance, initial watch is synchronous, restart loop runs in a background goroutine. Unwatch closes the stop channel. --- authz/loader/loader.go | 56 +++++++++++++---------- authz/loader/loader_watch_test.go | 75 +++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 24 deletions(-) create mode 100644 authz/loader/loader_watch_test.go diff --git a/authz/loader/loader.go b/authz/loader/loader.go index 3a88dce..1d58985 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -86,55 +86,63 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy return authz.Policy{}, nil, fmt.Errorf("failed to load policy file %s: %w", path, err) } + fp := file.Provider(path) stopCh := make(chan struct{}) diedCh := make(chan struct{}, 1) - watchCb := func(_ any, watchErr error) { - if watchErr != nil { + watchFunc := func(_ any, err error) { + if err != nil { slog.Warn("Policy file watcher exited, will restart", - "path", path, "error", watchErr) + "path", path, "error", err) select { case diedCh <- struct{}{}: default: } return } - p, err := LoadPolicyFromFile(path) - if err != nil { - callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, err)) + p, loadErr := LoadPolicyFromFile(path) + if loadErr != nil { + callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, loadErr)) return } callback(p, nil) } - // Watch loop: starts koanf's file watcher, blocks until it dies, restarts. - // Koanf's watcher exits on transient errors like symlink removal during - // Kubernetes ConfigMap updates. + // Start the initial watch synchronously so it's active before we return. + if err := fp.Watch(watchFunc); err != nil { + return authz.Policy{}, nil, &InitializeWatchError{Err: err} + } + + // Restart loop: when the watcher dies, restart it after a backoff. + // Koanf sets isWatching=false when its goroutine exits, so fp.Watch + // can be called again on the same provider. go func() { for { - fp := file.Provider(path) - if err := fp.Watch(watchCb); err != nil { - slog.Warn("Failed to start policy file watcher, will retry", - "path", path, "error", err) - } else { - // Block until the watcher dies or we're told to stop. - select { - case <-stopCh: - fp.Unwatch() - return - case <-diedCh: - } + select { + case <-stopCh: + return + case <-diedCh: } - // Backoff before restarting. select { case <-stopCh: return case <-time.After(time.Second): } - // Reload after backoff — we may have missed updates while dead. - // By now the symlink should be back. + if err := fp.Watch(watchFunc); err != nil { + slog.Warn("Failed to restart policy file watcher, will retry", + "path", path, "error", err) + // Signal ourselves to retry next iteration. + select { + case diedCh <- struct{}{}: + default: + } + continue + } + slog.Info("Policy file watcher restarted", "path", path) + + // Reload — we may have missed updates while dead. if p, err := LoadPolicyFromFile(path); err == nil { callback(p, nil) } diff --git a/authz/loader/loader_watch_test.go b/authz/loader/loader_watch_test.go new file mode 100644 index 0000000..a336a0a --- /dev/null +++ b/authz/loader/loader_watch_test.go @@ -0,0 +1,75 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package loader_test + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/common-go/authz" + "github.com/redpanda-data/common-go/authz/loader" +) + +func TestWatchBasic(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "policy.yaml") + require.NoError(t, os.WriteFile(path, []byte(` +roles: +- id: Writer + permissions: + - p1 +bindings: +- principal: "User:alice@example.com" + role: Writer + scope: "orgs/o1" +`), 0o644)) + + ch := make(chan authz.Policy, 10) + fmt.Println("before") + policy, unwatch, err := loader.WatchPolicyFile(path, func(p authz.Policy, err error) { + t.Logf("callback: bindings=%d err=%v", len(p.Bindings), err) + if err == nil { + ch <- p + } + }) + + fmt.Println("after") + require.NoError(t, err) + defer unwatch() + require.Len(t, policy.Bindings, 1) + + t.Log("writing updated policy") + require.NoError(t, os.WriteFile(path, []byte(` +roles: +- id: Writer + permissions: + - p1 +bindings: +- principal: "User:alice@example.com" + role: Writer + scope: "orgs/o1" +- principal: "User:bob@example.com" + role: Writer + scope: "orgs/o1" +`), 0o644)) + + select { + case p := <-ch: + require.Len(t, p.Bindings, 2) + t.Log("got updated policy") + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for callback") + } +} From f13613f4fda7fa75ced8874730759e977fd7980d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 13:08:55 +0100 Subject: [PATCH 07/11] authz/loader: call fp.Unwatch on shutdown --- authz/loader/loader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/authz/loader/loader.go b/authz/loader/loader.go index 1d58985..6627a2c 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -151,7 +151,7 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy unwatch := func() error { close(stopCh) - return nil + return fp.Unwatch() } return policy, unwatch, nil From 7ff7e2fb687f62f73240052943331c8b786335e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 13:10:28 +0100 Subject: [PATCH 08/11] authz/loader: add unwatch test, remove debug prints --- authz/loader/loader_watch_test.go | 43 ++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/authz/loader/loader_watch_test.go b/authz/loader/loader_watch_test.go index a336a0a..53cb8e6 100644 --- a/authz/loader/loader_watch_test.go +++ b/authz/loader/loader_watch_test.go @@ -13,9 +13,11 @@ import ( "fmt" "os" "path/filepath" + "sync/atomic" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/redpanda-data/common-go/authz" @@ -37,15 +39,12 @@ bindings: `), 0o644)) ch := make(chan authz.Policy, 10) - fmt.Println("before") policy, unwatch, err := loader.WatchPolicyFile(path, func(p authz.Policy, err error) { t.Logf("callback: bindings=%d err=%v", len(p.Bindings), err) if err == nil { ch <- p } }) - - fmt.Println("after") require.NoError(t, err) defer unwatch() require.Len(t, policy.Bindings, 1) @@ -73,3 +72,41 @@ bindings: t.Fatal("timed out waiting for callback") } } + +func TestWatchUnwatch(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "policy.yaml") + + writeTestPolicy(t, path, 1) + + var calls atomic.Int32 + policy, unwatch, err := loader.WatchPolicyFile(path, func(p authz.Policy, err error) { + calls.Add(1) + }) + require.NoError(t, err) + require.Len(t, policy.Bindings, 1) + + // Verify watcher is alive: write and expect callback. + writeTestPolicy(t, path, 2) + time.Sleep(500 * time.Millisecond) + require.Greater(t, calls.Load(), int32(0), "expected at least one callback before unwatch") + + // Unwatch and reset counter. + require.NoError(t, unwatch()) + calls.Store(0) + + // Write again — callback should NOT fire. + writeTestPolicy(t, path, 3) + time.Sleep(500 * time.Millisecond) + assert.Equal(t, int32(0), calls.Load(), "callback should not fire after unwatch") +} + +func writeTestPolicy(t *testing.T, path string, nBindings int) { + t.Helper() + var bindings string + for i := range nBindings { + bindings += fmt.Sprintf("- principal: \"User:user%d@example.com\"\n role: Writer\n scope: \"orgs/o1\"\n", i) + } + data := fmt.Sprintf("roles:\n- id: Writer\n permissions:\n - p1\nbindings:\n%s", bindings) + require.NoError(t, os.WriteFile(path, []byte(data), 0o644)) +} From e2e42c7bd8654f35bf0a7f7c7b5976152602621a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 13:13:03 +0100 Subject: [PATCH 09/11] authz/loader: fix double-close panic, document callback concurrency --- authz/loader/loader.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/authz/loader/loader.go b/authz/loader/loader.go index 6627a2c..65fd4d2 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -12,6 +12,7 @@ package loader import ( "fmt" "log/slog" + "sync" "time" "github.com/knadh/koanf/parsers/yaml" @@ -51,6 +52,8 @@ func LoadPolicyFromBytes(data []byte) (authz.Policy, error) { // PolicyCallback is called when a policy is loaded or reloaded. // If an error occurs during loading, policy will be empty and err will be set. +// The callback must be safe for concurrent use — it may be called from +// multiple goroutines (e.g. during watcher restart). type PolicyCallback func(policy authz.Policy, err error) // PolicyUnwatch stops watching the policy file for changes. @@ -149,8 +152,9 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy } }() + var closeOnce sync.Once unwatch := func() error { - close(stopCh) + closeOnce.Do(func() { close(stopCh) }) return fp.Unwatch() } From bd02886000d5b0b1335616a128a7ac2a7265787a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 13:29:50 +0100 Subject: [PATCH 10/11] authz/loader: fresh file.Provider on each restart to avoid mutex deadlock --- authz/loader/loader.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/authz/loader/loader.go b/authz/loader/loader.go index 65fd4d2..d0d2be7 100644 --- a/authz/loader/loader.go +++ b/authz/loader/loader.go @@ -89,7 +89,6 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy return authz.Policy{}, nil, fmt.Errorf("failed to load policy file %s: %w", path, err) } - fp := file.Provider(path) stopCh := make(chan struct{}) diedCh := make(chan struct{}, 1) @@ -112,13 +111,19 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy } // Start the initial watch synchronously so it's active before we return. + // Each iteration uses a fresh file.Provider because koanf's internal + // mutex can deadlock if we reuse a provider whose goroutine is still + // cleaning up. + fp := file.Provider(path) if err := fp.Watch(watchFunc); err != nil { return authz.Policy{}, nil, &InitializeWatchError{Err: err} } + // currentFP tracks the active provider so unwatch can stop it. + var mu sync.Mutex + currentFP := fp + // Restart loop: when the watcher dies, restart it after a backoff. - // Koanf sets isWatching=false when its goroutine exits, so fp.Watch - // can be called again on the same provider. go func() { for { select { @@ -133,16 +138,21 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy case <-time.After(time.Second): } - if err := fp.Watch(watchFunc); err != nil { + newFP := file.Provider(path) + if err := newFP.Watch(watchFunc); err != nil { slog.Warn("Failed to restart policy file watcher, will retry", "path", path, "error", err) - // Signal ourselves to retry next iteration. select { case diedCh <- struct{}{}: default: } continue } + + mu.Lock() + currentFP = newFP + mu.Unlock() + slog.Info("Policy file watcher restarted", "path", path) // Reload — we may have missed updates while dead. @@ -155,7 +165,9 @@ func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, Policy var closeOnce sync.Once unwatch := func() error { closeOnce.Do(func() { close(stopCh) }) - return fp.Unwatch() + mu.Lock() + defer mu.Unlock() + return currentFP.Unwatch() } return policy, unwatch, nil From 10985ccb5f6ae25089ca4ee092e62caa84618294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 17 Mar 2026 13:43:04 +0100 Subject: [PATCH 11/11] authz/loader: fix unused param lint, go mod tidy --- authz/loader/atomic_rename_test.go | 14 ++++++++++++-- authz/loader/loader_watch_test.go | 2 +- kvstore/client_test.go | 8 +++----- kvstore/sync_test.go | 21 +++++++-------------- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/authz/loader/atomic_rename_test.go b/authz/loader/atomic_rename_test.go index 284216a..b749924 100644 --- a/authz/loader/atomic_rename_test.go +++ b/authz/loader/atomic_rename_test.go @@ -1,3 +1,12 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + package loader_test import ( @@ -7,10 +16,11 @@ import ( "testing" "time" - "github.com/redpanda-data/common-go/authz" - "github.com/redpanda-data/common-go/authz/loader" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/redpanda-data/common-go/authz" + "github.com/redpanda-data/common-go/authz/loader" ) // TestWatchPolicyFile_AtomicRename reproduces the neovim atomic save pattern diff --git a/authz/loader/loader_watch_test.go b/authz/loader/loader_watch_test.go index 53cb8e6..d225335 100644 --- a/authz/loader/loader_watch_test.go +++ b/authz/loader/loader_watch_test.go @@ -80,7 +80,7 @@ func TestWatchUnwatch(t *testing.T) { writeTestPolicy(t, path, 1) var calls atomic.Int32 - policy, unwatch, err := loader.WatchPolicyFile(path, func(p authz.Policy, err error) { + policy, unwatch, err := loader.WatchPolicyFile(path, func(_ authz.Policy, _ error) { calls.Add(1) }) require.NoError(t, err) diff --git a/kvstore/client_test.go b/kvstore/client_test.go index 3b44271..dde6f4f 100644 --- a/kvstore/client_test.go +++ b/kvstore/client_test.go @@ -185,11 +185,9 @@ func TestSync_NoSpuriousWakeups(t *testing.T) { // Send broadcasts at intermediate offsets var wg sync.WaitGroup for i := 1; i < 100; i++ { - wg.Add(1) - go func(offset int) { - defer wg.Done() - c.setOffset(int64(offset)) - }(i) + wg.Go(func() { + c.setOffset(int64(i)) + }) } wg.Wait() diff --git a/kvstore/sync_test.go b/kvstore/sync_test.go index 1a86f17..266245d 100644 --- a/kvstore/sync_test.go +++ b/kvstore/sync_test.go @@ -97,15 +97,13 @@ func TestSync_ConcurrentPuts(t *testing.T) { errCount := make(chan struct{}, numWrites) for i := 0; i < numWrites; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() + wg.Go(func() { key := []byte(fmt.Sprintf("key-%d", i)) value := fmt.Sprintf("value-%d", i) if err := client.Put(ctx, key, []byte(value)); err != nil { errCount <- struct{}{} } - }(i) + }) } wg.Wait() @@ -437,9 +435,7 @@ func TestMultiClient_ConcurrentWrites(t *testing.T) { // Each client writes its own set of keys concurrently var wg sync.WaitGroup for clientID := 0; clientID < numClients; clientID++ { - wg.Add(1) - go func(clientID int) { - defer wg.Done() + wg.Go(func() { client := clients[clientID] for i := 0; i < writesPerClient; i++ { key := []byte(fmt.Sprintf("client%d-key%d", clientID, i)) @@ -447,7 +443,7 @@ func TestMultiClient_ConcurrentWrites(t *testing.T) { err := client.Put(ctx, key, value) require.NoError(t, err) } - }(clientID) + }) } wg.Wait() @@ -579,11 +575,8 @@ func TestSync_StressReadYourOwnWrites(t *testing.T) { var totalOps atomic.Int64 var wg sync.WaitGroup - for p := 0; p < numProducers; p++ { - wg.Add(1) - go func(producerID int) { - defer wg.Done() - + for producerID := 0; producerID < numProducers; producerID++ { + wg.Go(func() { // Each producer has its own key - no concurrent writes to same key key := []byte(fmt.Sprintf("producer-%d-key", producerID)) @@ -618,7 +611,7 @@ func TestSync_StressReadYourOwnWrites(t *testing.T) { totalOps.Add(1) } - }(p) + }) } wg.Wait()