Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions authz/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/knadh/koanf/providers/file v1.2.1
github.com/knadh/koanf/providers/rawbytes v1.0.0
github.com/knadh/koanf/v2 v2.3.2
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.42.0
go.opentelemetry.io/otel/trace v1.42.0
golang.org/x/net v0.51.0
Expand All @@ -19,17 +20,20 @@ require (

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.5.0 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/metric v1.42.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260226221140-a57be14db171 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
137 changes: 137 additions & 0 deletions authz/loader/atomic_rename_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package loader_test

import (
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/common-go/authz"
"github.com/redpanda-data/common-go/authz/loader"
)

// TestWatchPolicyFile_AtomicRename verifies that WatchPolicyFile keeps
// delivering reloads when the policy file is replaced the way an editor's
// backup-style save (e.g. neovim) does it:
//
//  1. rename the original to a backup (the original inode moves away),
//  2. write the new content at the original path (a new inode),
//  3. remove the backup.
//
// A watch that is tied to the original inode is lost by step 1, so the save
// is performed twice: the second round checks that the watcher is still alive
// after it has recovered once.
func TestWatchPolicyFile_AtomicRename(t *testing.T) {
	dir := t.TempDir()
	policyPath := filepath.Join(dir, "policy.yaml")

	initialPolicy := []byte(`roles:
  - id: reader
    permissions:
      - read
bindings:
  - role: reader
    principal: "User:alice@example.com"
    scope: "organizations/test"
`)
	require.NoError(t, os.WriteFile(policyPath, initialPolicy, 0o644))

	// The callback runs on watcher goroutines; mu guards what it records.
	var mu sync.Mutex
	var reloadedPolicy *authz.Policy
	var reloadErr error

	policy, unwatch, err := loader.WatchPolicyFile(policyPath, func(p authz.Policy, err error) {
		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			reloadErr = err
			return
		}
		reloadedPolicy = &p
	})
	require.NoError(t, err)
	// Best-effort cleanup; an unwatch error cannot affect the assertions.
	defer func() { _ = unwatch() }()

	// Verify initial load.
	require.Len(t, policy.Roles, 1)
	assert.Equal(t, "reader", string(policy.Roles[0].ID))

	// saveViaBackup replaces the policy file using the rename-to-backup
	// sequence described in the test's doc comment.
	saveViaBackup := func(content []byte) {
		t.Helper()
		backupPath := policyPath + "~"
		require.NoError(t, os.Rename(policyPath, backupPath))
		require.NoError(t, os.WriteFile(policyPath, content, 0o644))
		require.NoError(t, os.Remove(backupPath))
	}

	// waitForReload polls until a reload carrying exactly one role with the
	// expected ID has been observed. Reloads that do not match are skipped
	// rather than failed on: a save can trigger more than one reload, so a
	// late duplicate from the previous round or a read taken while the file
	// was still being written must not abort the test.
	waitForReload := func(t *testing.T, expected string, timeout time.Duration) {
		t.Helper()
		deadline := time.After(timeout)
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()

		lastSeen := "no reload observed"
		for {
			select {
			case <-deadline:
				mu.Lock()
				e := reloadErr
				mu.Unlock()
				if e != nil {
					t.Fatalf("watcher error: %v", e)
				}
				t.Fatalf("timed out waiting for role %q (%s)", expected, lastSeen)
			case <-ticker.C:
				mu.Lock()
				got := reloadedPolicy
				reloadedPolicy = nil // consume it so the next tick sees only newer reloads
				mu.Unlock()

				if got == nil {
					continue
				}
				if len(got.Roles) == 1 && string(got.Roles[0].ID) == expected {
					return
				}
				if len(got.Roles) == 0 {
					lastSeen = "last reload had no roles"
				} else {
					lastSeen = "last reload had first role " + string(got.Roles[0].ID)
				}
			}
		}
	}

	// First save.
	saveViaBackup([]byte(`roles:
  - id: admin
    permissions:
      - read
      - write
      - delete
bindings:
  - role: admin
    principal: "User:alice@example.com"
    scope: "organizations/test"
`))
	waitForReload(t, "admin", 3*time.Second)

	// Second save -- this is where the watcher typically breaks.
	saveViaBackup([]byte(`roles:
  - id: superadmin
    permissions:
      - everything
bindings:
  - role: superadmin
    principal: "User:alice@example.com"
    scope: "organizations/test"
`))
	waitForReload(t, "superadmin", 3*time.Second)
}
131 changes: 89 additions & 42 deletions authz/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ package loader

import (
"fmt"
"runtime"
"strings"
"log/slog"
"sync"
"time"

"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/file"
Expand Down Expand Up @@ -51,6 +52,8 @@ func LoadPolicyFromBytes(data []byte) (authz.Policy, error) {

// PolicyCallback is called when a policy is loaded or reloaded.
// On success err is nil and policy holds the loaded policy. If an error occurs
// during loading, policy will be empty and err will be set.
// The callback must be safe for concurrent use — it may be called from
// multiple goroutines (e.g. during watcher restart).
type PolicyCallback func(policy authz.Policy, err error)

// PolicyUnwatch stops watching the policy file for changes.
Expand All @@ -75,53 +78,97 @@ func (e *InitializeWatchError) Unwrap() error {
// WatchPolicyFile watches a YAML policy file for changes and calls the callback
// when the file is modified. This is particularly useful for Kubernetes ConfigMap
// mounted files which are updated via symlink changes.
//
// Koanf's file watcher exits on transient errors (e.g. symlink briefly removed
// during a ConfigMap update). This function runs the watcher in a loop,
// restarting it automatically when it exits. Call the returned PolicyUnwatch
// to stop the loop and clean up.
func WatchPolicyFile(path string, callback PolicyCallback) (authz.Policy, PolicyUnwatch, error) {
k := koanf.New(".")
fp := file.Provider(path)
if err := k.Load(fp, yaml.Parser()); err != nil {
policy, err := LoadPolicyFromFile(path)
if err != nil {
return authz.Policy{}, nil, fmt.Errorf("failed to load policy file %s: %w", path, err)
}
var policy authz.Policy
if err := k.Unmarshal("", &policy); err != nil {
return authz.Policy{}, nil, fmt.Errorf("failed to unmarshal file %s: %w", path, err)
}
// Watch for changes using the file provider's Watch method
// The watchFunc will be called whenever the file changes
watchFunc := func(_ any, watchErr error) {
// On macOS (kqueue), fsnotify watches inodes. An atomic rename —
// write to a temp file then rename over the watched path — replaces
// the inode, so kqueue fires a REMOVE on the old inode and never
// fires a CREATE for the new one. koanf surfaces this as
// "file <path> was removed". On Linux (inotify), the same operation
// fires a CREATE on the destination path, so the reload succeeds
// without hitting this branch. We therefore only see this error in
// local macOS development, which is why it wasn't caught by CI.
//
// When we detect a REMOVE on darwin, try reloading from the path
// directly. If the file is there (atomic rename), we get the updated
// policy. If it's genuinely gone, the reload errors and we fall
// through to propagate the original watcher error.
isMacOSRemove := runtime.GOOS == "darwin" && watchErr != nil && strings.Contains(watchErr.Error(), "was removed")
if watchErr != nil && !isMacOSRemove {
callback(authz.Policy{}, fmt.Errorf("watcher error: %w", watchErr))
return
}
// Reload the policy
k := koanf.New(".")
if err := k.Load(fp, yaml.Parser()); err != nil {
callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, err))

stopCh := make(chan struct{})
diedCh := make(chan struct{}, 1)

watchFunc := func(_ any, err error) {
if err != nil {
slog.Warn("Policy file watcher exited, will restart",
"path", path, "error", err)
select {
case diedCh <- struct{}{}:
default:
}
return
}
var policy authz.Policy
if err := k.Unmarshal("", &policy); err != nil {
callback(authz.Policy{}, fmt.Errorf("failed to unmarshal policy: %w", err))
p, loadErr := LoadPolicyFromFile(path)
if loadErr != nil {
callback(authz.Policy{}, fmt.Errorf("failed to reload policy file %s: %w", path, loadErr))
return
}
callback(policy, nil)
callback(p, nil)
}
err := fp.Watch(watchFunc)
if err != nil {
return authz.Policy{}, nil, &InitializeWatchError{err}

// Start the initial watch synchronously so it's active before we return.
// Each iteration uses a fresh file.Provider because koanf's internal
// mutex can deadlock if we reuse a provider whose goroutine is still
// cleaning up.
fp := file.Provider(path)
if err := fp.Watch(watchFunc); err != nil {
return authz.Policy{}, nil, &InitializeWatchError{Err: err}
}

// currentFP tracks the active provider so unwatch can stop it.
var mu sync.Mutex
currentFP := fp

// Restart loop: when the watcher dies, restart it after a backoff.
go func() {
for {
select {
case <-stopCh:
return
case <-diedCh:
}

select {
case <-stopCh:
return
case <-time.After(time.Second):
}

newFP := file.Provider(path)
if err := newFP.Watch(watchFunc); err != nil {
slog.Warn("Failed to restart policy file watcher, will retry",
"path", path, "error", err)
select {
case diedCh <- struct{}{}:
default:
}
continue
}

mu.Lock()
currentFP = newFP
mu.Unlock()

slog.Info("Policy file watcher restarted", "path", path)

// Reload — we may have missed updates while dead.
if p, err := LoadPolicyFromFile(path); err == nil {
callback(p, nil)
}
}
}()

var closeOnce sync.Once
unwatch := func() error {
closeOnce.Do(func() { close(stopCh) })
mu.Lock()
defer mu.Unlock()
return currentFP.Unwatch()
}
return policy, fp.Unwatch, nil

return policy, unwatch, nil
}
Loading
Loading