From c4690c9e424bca8ba1ffc26ffafce467930bc436 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Fri, 17 Oct 2025 11:33:26 -0700 Subject: [PATCH 01/11] feat(go): Add IAM authentication support with automatic token refresh Implement IAM authentication for ElastiCache and MemoryDB clusters in the Go client: - Add IamAuthConfig with configurable refresh intervals - Extend ServerCredentials to support IAM authentication mode - Implement RefreshIamToken method for manual token refresh - Add FFI binding for refresh_iam_token - Include comprehensive examples for ElastiCache, MemoryDB, and cluster modes - Add unit tests for IAM configuration and credentials The client automatically refreshes IAM tokens based on the configured interval, with support for manual refresh when needed. IAM and password authentication modes are mutually exclusive. Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- ffi/src/lib.rs | 40 +++++ go/base_client.go | 87 ++++++++++ go/config/config.go | 100 ++++++++++- go/config/config_test.go | 69 ++++++++ go/examples/iam_authentication_example.go | 203 ++++++++++++++++++++++ 5 files changed, 496 insertions(+), 3 deletions(-) create mode 100644 go/examples/iam_authentication_example.go diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index ddb3179e7a..d325255ea2 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -1587,6 +1587,46 @@ pub unsafe extern "C-unwind" fn update_connection_password( }) } +/// Manually refresh the IAM authentication token. +/// +/// This function triggers an immediate refresh of the IAM token and updates the connection. +/// It is only available if the client was created with IAM authentication. +/// +/// # Parameters +/// +/// * `client_adapter_ptr`: Pointer to a valid client returned from [`create_client`]. +/// * `request_id`: Unique identifier for a valid payload buffer created in the calling language. +/// +/// # Returns +/// +/// * A pointer to a [`CommandResult`] containing "OK" on success, or an error if: +/// - The client is not using IAM authentication +/// - Token generation fails +/// - Authentication with the new token fails +/// +/// # Safety +/// +/// * `client_adapter_ptr` must not be `null` and must be obtained from the `ConnectionResponse` returned from [`create_client`]. +/// * `client_adapter_ptr` must be able to be safely casted to a valid [`Arc`] via [`Arc::from_raw`]. +/// * `request_id` must be valid until it is passed in a call to [`free_command_response`]. +/// * This function should only be called with a `client_adapter_ptr` created by [`create_client`], before [`close_client`] was called with the pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C-unwind" fn refresh_iam_token( + client_adapter_ptr: *const c_void, + request_id: usize, +) -> *mut CommandResult { + let client_adapter = unsafe { + // we increment the strong count to ensure that the client is not dropped just because we turned it into an Arc. + Arc::increment_strong_count(client_adapter_ptr); + Arc::from_raw(client_adapter_ptr as *mut ClientAdapter) + }; + + let mut client = client_adapter.core.client.clone(); + client_adapter.execute_request(request_id, async move { + client.refresh_iam_token().await.map(|_| Value::Okay) + }) +} + /// Executes a Lua script. /// /// # Parameters diff --git a/go/base_client.go b/go/base_client.go index 87f8ce3162..c123355a9f 100644 --- a/go/base_client.go +++ b/go/base_client.go @@ -731,6 +731,93 @@ func (client *baseClient) ResetConnectionPassword(ctx context.Context) (string, return client.submitConnectionPasswordUpdate(ctx, "", false) } +func (client *baseClient) submitRefreshIamToken(ctx context.Context) (string, error) { + // Check if context is already done + select { + case <-ctx.Done(): + return models.DefaultStringResponse, ctx.Err() + default: + // Continue with execution + } + + // Create a channel to receive the result + resultChannel := make(chan payload, 1) + resultChannelPtr := unsafe.Pointer(&resultChannel) + + pinner := pinner{} + pinnedChannelPtr := uintptr(pinner.Pin(resultChannelPtr)) + defer pinner.Unpin() + + client.mu.Lock() + if client.coreClient == nil { + client.mu.Unlock() + return models.DefaultStringResponse, NewClosingError("RefreshIamToken failed. The client is closed.") + } + client.pending[resultChannelPtr] = struct{}{} + + C.refresh_iam_token( + client.coreClient, + C.uintptr_t(pinnedChannelPtr), + ) + client.mu.Unlock() + + // Wait for result or context cancellation + var payload payload + select { + case <-ctx.Done(): + client.mu.Lock() + if client.pending != nil { + delete(client.pending, resultChannelPtr) + } + client.mu.Unlock() + // Start cleanup goroutine + go func() { + // Wait for payload on separate channel + if payload := <-resultChannel; payload.value != nil { + C.free_command_response(payload.value) + } + }() + return models.DefaultStringResponse, ctx.Err() + case payload = <-resultChannel: + // Continue with normal processing + } + + client.mu.Lock() + if client.pending != nil { + delete(client.pending, resultChannelPtr) + } + client.mu.Unlock() + + if payload.error != nil { + return models.DefaultStringResponse, payload.error + } + + return handleOkResponse(payload.value) +} + +// RefreshIamToken manually refreshes the IAM token for the current connection. +// +// This method is only available if the client was created with IAM authentication. +// It triggers an immediate refresh of the IAM token and updates the connection. +// +// Parameters: +// +// ctx - The context for controlling the command execution. +// +// Return value: +// +// `"OK"` response on success. +// +// Example: +// +// result, err := client.RefreshIamToken(context.Background()) +// if err != nil { +// // handle error +// } +func (client *baseClient) RefreshIamToken(ctx context.Context) (string, error) { + return client.submitRefreshIamToken(ctx) +} + // Set the given key with the given value. The return value is a response from Valkey containing the string "OK". // // See [valkey.io] for details. diff --git a/go/config/config.go b/go/config/config.go index 0689ec2379..13012e487f 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -34,18 +34,86 @@ func (addr *NodeAddress) toProtobuf() *protobuf.NodeAddress { return &protobuf.NodeAddress{Host: addr.Host, Port: uint32(addr.Port)} } +// ServiceType represents the types of AWS services that can be used for IAM authentication. +type ServiceType int + +const ( + // Elasticache represents Amazon ElastiCache service. + Elasticache ServiceType = iota + // MemoryDB represents Amazon MemoryDB service. + MemoryDB +) + +// IamAuthConfig represents configuration settings for IAM authentication. +type IamAuthConfig struct { + // The name of the ElastiCache/MemoryDB cluster. + clusterName string + // The type of service being used (ElastiCache or MemoryDB). + service ServiceType + // The AWS region where the ElastiCache/MemoryDB cluster is located. + region string + // Optional refresh interval in seconds for renewing IAM authentication tokens. + // If not provided, defaults to 300 seconds (5 min). + refreshIntervalSeconds *uint32 +} + +// NewIamAuthConfig returns an [IamAuthConfig] struct with the given configuration. +func NewIamAuthConfig(clusterName string, service ServiceType, region string) *IamAuthConfig { + defaultRefresh := uint32(300) + return &IamAuthConfig{ + clusterName: clusterName, + service: service, + region: region, + refreshIntervalSeconds: &defaultRefresh, + } +} + +// WithRefreshIntervalSeconds sets the refresh interval in seconds for IAM token renewal. +func (config *IamAuthConfig) WithRefreshIntervalSeconds(seconds uint32) *IamAuthConfig { + config.refreshIntervalSeconds = &seconds + return config +} + +func (config *IamAuthConfig) toProtobuf() *protobuf.IamCredentials { + iamCreds := &protobuf.IamCredentials{ + ClusterName: config.clusterName, + Region: config.region, + } + + if config.service == Elasticache { + iamCreds.ServiceType = protobuf.ServiceType_ELASTICACHE + } else { + iamCreds.ServiceType = protobuf.ServiceType_MEMORYDB + } + + if config.refreshIntervalSeconds != nil { + iamCreds.RefreshIntervalSeconds = config.refreshIntervalSeconds + } + + return iamCreds +} + // ServerCredentials represents the credentials for connecting to servers. +// Supports two authentication modes: +// - Password-based authentication: Use username and password +// - IAM authentication: Use username (required) and iamConfig +// +// These modes are mutually exclusive. type ServerCredentials struct { // The username that will be used for authenticating connections to the servers. If not supplied, "default" - // will be used. + // will be used for password-based authentication. Required for IAM authentication. username string // The password that will be used for authenticating connections to the servers. + // Mutually exclusive with iamConfig. password string + // IAM authentication configuration. Mutually exclusive with password. + // The client will automatically generate and refresh the authentication token based on the provided configuration. + iamConfig *IamAuthConfig } // NewServerCredentials returns a [ServerCredentials] struct with the given username and password. func NewServerCredentials(username string, password string) *ServerCredentials { - return &ServerCredentials{username, password} + return &ServerCredentials{username: username, password: password} } // NewServerCredentialsWithDefaultUsername returns a [ServerCredentials] struct with a default username of "default" and the @@ -54,8 +122,34 @@ func NewServerCredentialsWithDefaultUsername(password string) *ServerCredentials return &ServerCredentials{password: password} } +// NewServerCredentialsWithIam returns a [ServerCredentials] struct configured for IAM authentication. +// The username is required for IAM authentication. +func NewServerCredentialsWithIam(username string, iamConfig *IamAuthConfig) (*ServerCredentials, error) { + if username == "" { + return nil, errors.New("username is required for IAM authentication") + } + if iamConfig == nil { + return nil, errors.New("iamConfig cannot be nil") + } + return &ServerCredentials{username: username, iamConfig: iamConfig}, nil +} + func (creds *ServerCredentials) toProtobuf() *protobuf.AuthenticationInfo { - return &protobuf.AuthenticationInfo{Username: creds.username, Password: creds.password} + authInfo := &protobuf.AuthenticationInfo{ + Username: creds.username, + Password: creds.password, + } + + if creds.iamConfig != nil { + authInfo.IamCredentials = creds.iamConfig.toProtobuf() + } + + return authInfo +} + +// IsIamAuth returns true if this credential is configured for IAM authentication. +func (creds *ServerCredentials) IsIamAuth() bool { + return creds.iamConfig != nil } // ReadFrom represents the client's read from strategy. diff --git a/go/config/config_test.go b/go/config/config_test.go index 4fa9f5013c..1b76e16d61 100644 --- a/go/config/config_test.go +++ b/go/config/config_test.go @@ -197,6 +197,75 @@ func TestServerCredentials(t *testing.T) { }, } + for _, param := range parameters { + result := param.input.toProtobuf() + assert.Equal(t, param.expected, result) + } +} + +func TestServerCredentialsWithIam(t *testing.T) { + iamConfig := NewIamAuthConfig("my-cluster", Elasticache, "us-east-1") + creds, err := NewServerCredentialsWithIam("myUser", iamConfig) + + assert.Nil(t, err) + assert.NotNil(t, creds) + assert.True(t, creds.IsIamAuth()) + + authInfo := creds.toProtobuf() + assert.Equal(t, "myUser", authInfo.Username) + assert.Equal(t, "", authInfo.Password) + assert.NotNil(t, authInfo.IamCredentials) + assert.Equal(t, "my-cluster", authInfo.IamCredentials.ClusterName) + assert.Equal(t, "us-east-1", authInfo.IamCredentials.Region) + assert.Equal(t, protobuf.ServiceType_ELASTICACHE, authInfo.IamCredentials.ServiceType) + assert.Equal(t, uint32(300), *authInfo.IamCredentials.RefreshIntervalSeconds) +} + +func TestServerCredentialsWithIamCustomRefresh(t *testing.T) { + iamConfig := NewIamAuthConfig("my-cluster", MemoryDB, "us-west-2"). + WithRefreshIntervalSeconds(600) + creds, err := NewServerCredentialsWithIam("myUser", iamConfig) + + assert.Nil(t, err) + assert.NotNil(t, creds) + + authInfo := creds.toProtobuf() + assert.Equal(t, protobuf.ServiceType_MEMORYDB, authInfo.IamCredentials.ServiceType) + assert.Equal(t, uint32(600), *authInfo.IamCredentials.RefreshIntervalSeconds) +} + +func TestServerCredentialsWithIamRequiresUsername(t *testing.T) { + iamConfig := NewIamAuthConfig("my-cluster", Elasticache, "us-east-1") + creds, err := NewServerCredentialsWithIam("", iamConfig) + + assert.NotNil(t, err) + assert.Nil(t, creds) + assert.Contains(t, err.Error(), "username is required") +} + +func TestServerCredentialsWithIamRequiresConfig(t *testing.T) { + creds, err := NewServerCredentialsWithIam("myUser", nil) + + assert.NotNil(t, err) + assert.Nil(t, creds) + assert.Contains(t, err.Error(), "iamConfig cannot be nil") +} + +func TestOldServerCredentialsTests(t *testing.T) { + parameters := []struct { + input *ServerCredentials + expected *protobuf.AuthenticationInfo + }{ + { + NewServerCredentials("username", "password"), + &protobuf.AuthenticationInfo{Username: "username", Password: "password"}, + }, + { + NewServerCredentialsWithDefaultUsername("password"), + &protobuf.AuthenticationInfo{Password: "password"}, + }, + } + for i, parameter := range parameters { t.Run(fmt.Sprintf("Testing [%v]", i), func(t *testing.T) { result := parameter.input.toProtobuf() diff --git a/go/examples/iam_authentication_example.go b/go/examples/iam_authentication_example.go new file mode 100644 index 0000000000..35797c7c42 --- /dev/null +++ b/go/examples/iam_authentication_example.go @@ -0,0 +1,203 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package examples + +import ( + "context" + "fmt" + + "github.com/valkey-io/valkey-glide/go/v2" + "github.com/valkey-io/valkey-glide/go/v2/config" +) + +// Example demonstrating IAM authentication with Valkey GLIDE client. +// +// This example shows how to: +// - Configure IAM authentication for ElastiCache or MemoryDB +// - Create a client with IAM credentials +// - Manually refresh IAM tokens + +// ElasticacheExample demonstrates connecting to ElastiCache with IAM authentication +func ElasticacheExample() { + // Configure IAM authentication + iamConfig := config.NewIamAuthConfig( + "my-elasticache-cluster", + config.Elasticache, + "us-east-1", + ) // Uses default 300 second refresh interval + + // Create credentials with IAM config + credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) + if err != nil { + fmt.Printf("Failed to create IAM credentials: %v\n", err) + return + } + + // Create client configuration + clientConfig := config.NewClientConfiguration(). + WithAddress(&config.NodeAddress{ + Host: "my-cluster.cache.amazonaws.com", + Port: 6379, + }). + WithCredentials(credentials). + WithUseTLS(true) // Recommended for AWS services + + // Create and use the client + client, err := glide.NewClient(clientConfig) + if err != nil { + fmt.Printf("Failed to create client: %v\n", err) + return + } + defer client.Close() + + // The client will automatically refresh IAM tokens based on the refresh interval + response, err := client.Set(context.Background(), "key", "value") + if err != nil { + fmt.Printf("SET failed: %v\n", err) + return + } + fmt.Printf("ElastiCache SET response: %s\n", response) + + value, err := client.Get(context.Background(), "key") + if err != nil { + fmt.Printf("GET failed: %v\n", err) + return + } + fmt.Printf("ElastiCache GET response: %s\n", value.Value()) +} + +// MemoryDBExample demonstrates connecting to MemoryDB with custom refresh interval +func MemoryDBExample() { + // Configure IAM authentication with custom refresh interval + iamConfig := config.NewIamAuthConfig( + "my-memorydb-cluster", + config.MemoryDB, + "us-west-2", + ).WithRefreshIntervalSeconds(600) // Refresh every 10 minutes + + credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) + if err != nil { + fmt.Printf("Failed to create IAM credentials: %v\n", err) + return + } + + clientConfig := config.NewClientConfiguration(). + WithAddress(&config.NodeAddress{ + Host: "my-cluster.memorydb.amazonaws.com", + Port: 6379, + }). + WithCredentials(credentials). + WithUseTLS(true) + + client, err := glide.NewClient(clientConfig) + if err != nil { + fmt.Printf("Failed to create client: %v\n", err) + return + } + defer client.Close() + + response, err := client.Set(context.Background(), "memorydb-key", "memorydb-value") + if err != nil { + fmt.Printf("SET failed: %v\n", err) + return + } + fmt.Printf("MemoryDB SET response: %s\n", response) +} + +// ManualRefreshExample demonstrates manually refreshing IAM token +func ManualRefreshExample() { + iamConfig := config.NewIamAuthConfig( + "my-cluster", + config.Elasticache, + "us-east-1", + ) + + credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) + if err != nil { + fmt.Printf("Failed to create IAM credentials: %v\n", err) + return + } + + clientConfig := config.NewClientConfiguration(). + WithAddress(&config.NodeAddress{ + Host: "my-cluster.cache.amazonaws.com", + Port: 6379, + }). + WithCredentials(credentials). + WithUseTLS(true) + + client, err := glide.NewClient(clientConfig) + if err != nil { + fmt.Printf("Failed to create client: %v\n", err) + return + } + defer client.Close() + + // Perform some operations + _, err = client.Set(context.Background(), "key1", "value1") + if err != nil { + fmt.Printf("SET failed: %v\n", err) + return + } + + // Manually refresh the IAM token if needed + // (normally this happens automatically based on refreshIntervalSeconds) + refreshResponse, err := client.RefreshIamToken(context.Background()) + if err != nil { + fmt.Printf("Token refresh failed: %v\n", err) + return + } + fmt.Printf("Token refresh response: %s\n", refreshResponse) + + // Continue with operations using the refreshed token + _, err = client.Set(context.Background(), "key2", "value2") + if err != nil { + fmt.Printf("SET failed: %v\n", err) + return + } +} + +// ClusterExample demonstrates IAM authentication with cluster mode +func ClusterExample() { + iamConfig := config.NewIamAuthConfig( + "my-cluster", + config.Elasticache, + "us-east-1", + ) + + credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) + if err != nil { + fmt.Printf("Failed to create IAM credentials: %v\n", err) + return + } + + clusterConfig := config.NewClusterClientConfiguration(). + WithAddress(&config.NodeAddress{ + Host: "my-cluster.cache.amazonaws.com", + Port: 6379, + }). + WithCredentials(credentials). + WithUseTLS(true) + + client, err := glide.NewClusterClient(clusterConfig) + if err != nil { + fmt.Printf("Failed to create cluster client: %v\n", err) + return + } + defer client.Close() + + response, err := client.Set(context.Background(), "cluster-key", "cluster-value") + if err != nil { + fmt.Printf("SET failed: %v\n", err) + return + } + fmt.Printf("Cluster SET response: %s\n", response) + + // Manually refresh IAM token for cluster client + refreshResponse, err := client.RefreshIamToken(context.Background()) + if err != nil { + fmt.Printf("Token refresh failed: %v\n", err) + return + } + fmt.Printf("Token refresh response: %s\n", refreshResponse) +} From ba4faf62ae12b8763fab086c570e8f4372cc671c Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Sun, 19 Oct 2025 19:14:10 -0700 Subject: [PATCH 02/11] fix miri tests Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- ffi/miri-tests/mock-glide-core/src/client.rs | 4 ++++ ffi/src/lib.rs | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ffi/miri-tests/mock-glide-core/src/client.rs b/ffi/miri-tests/mock-glide-core/src/client.rs index 969ec612bc..de05aa76ab 100644 --- a/ffi/miri-tests/mock-glide-core/src/client.rs +++ b/ffi/miri-tests/mock-glide-core/src/client.rs @@ -83,4 +83,8 @@ impl Client { ) -> RedisResult { todo!() } + + pub async fn refresh_iam_token(&mut self) -> RedisResult<()> { + todo!() + } } diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index d325255ea2..710b58173f 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -19,9 +19,8 @@ use redis::ErrorKind; use redis::ObjectType; use redis::ScanStateRC; use redis::cluster_routing::{ - MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, + MultipleNodeRoutingInfo, Route, RoutingInfo, ResponsePolicy, SingleNodeRoutingInfo, SlotAddr, }; -use redis::cluster_routing::{ResponsePolicy, Routable}; use redis::{ClusterScanArgs, RedisError}; use redis::{Cmd, Pipeline, PipelineRetryStrategy, RedisResult, Value}; use std::ffi::CStr; From 122b6240f10b81f3c57404de98270dec5bd510c9 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Sun, 19 Oct 2025 19:17:50 -0700 Subject: [PATCH 03/11] lint fix Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- ffi/src/lib.rs | 2 +- go/config/config_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 710b58173f..bed4fa6e21 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -19,7 +19,7 @@ use redis::ErrorKind; use redis::ObjectType; use redis::ScanStateRC; use redis::cluster_routing::{ - MultipleNodeRoutingInfo, Route, RoutingInfo, ResponsePolicy, SingleNodeRoutingInfo, SlotAddr, + MultipleNodeRoutingInfo, ResponsePolicy, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, }; use redis::{ClusterScanArgs, RedisError}; use redis::{Cmd, Pipeline, PipelineRetryStrategy, RedisResult, Value}; diff --git a/go/config/config_test.go b/go/config/config_test.go index 1b76e16d61..ddfb3e5484 100644 --- a/go/config/config_test.go +++ b/go/config/config_test.go @@ -206,11 +206,11 @@ func TestServerCredentials(t *testing.T) { func TestServerCredentialsWithIam(t *testing.T) { iamConfig := NewIamAuthConfig("my-cluster", Elasticache, "us-east-1") creds, err := NewServerCredentialsWithIam("myUser", iamConfig) - + assert.Nil(t, err) assert.NotNil(t, creds) assert.True(t, creds.IsIamAuth()) - + authInfo := creds.toProtobuf() assert.Equal(t, "myUser", authInfo.Username) assert.Equal(t, "", authInfo.Password) @@ -225,10 +225,10 @@ func TestServerCredentialsWithIamCustomRefresh(t *testing.T) { iamConfig := NewIamAuthConfig("my-cluster", MemoryDB, "us-west-2"). WithRefreshIntervalSeconds(600) creds, err := NewServerCredentialsWithIam("myUser", iamConfig) - + assert.Nil(t, err) assert.NotNil(t, creds) - + authInfo := creds.toProtobuf() assert.Equal(t, protobuf.ServiceType_MEMORYDB, authInfo.IamCredentials.ServiceType) assert.Equal(t, uint32(600), *authInfo.IamCredentials.RefreshIntervalSeconds) @@ -237,7 +237,7 @@ func TestServerCredentialsWithIamCustomRefresh(t *testing.T) { func TestServerCredentialsWithIamRequiresUsername(t *testing.T) { iamConfig := NewIamAuthConfig("my-cluster", Elasticache, "us-east-1") creds, err := NewServerCredentialsWithIam("", iamConfig) - + assert.NotNil(t, err) assert.Nil(t, creds) assert.Contains(t, err.Error(), "username is required") @@ -245,7 +245,7 @@ func TestServerCredentialsWithIamRequiresUsername(t *testing.T) { func TestServerCredentialsWithIamRequiresConfig(t *testing.T) { creds, err := NewServerCredentialsWithIam("myUser", nil) - + assert.NotNil(t, err) assert.Nil(t, creds) assert.Contains(t, err.Error(), "iamConfig cannot be nil") From 83b60cefa5338ea0553cedf4f893a2b9601509c5 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Sun, 19 Oct 2025 19:39:07 -0700 Subject: [PATCH 04/11] fix ffi Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- ffi/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index bed4fa6e21..d325255ea2 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -19,8 +19,9 @@ use redis::ErrorKind; use redis::ObjectType; use redis::ScanStateRC; use redis::cluster_routing::{ - MultipleNodeRoutingInfo, ResponsePolicy, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, + MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr, }; +use redis::cluster_routing::{ResponsePolicy, Routable}; use redis::{ClusterScanArgs, RedisError}; use redis::{Cmd, Pipeline, PipelineRetryStrategy, RedisResult, Value}; use std::ffi::CStr; From 0ebcd4aa021dc81b0fd2271f040638267c864cf7 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Mon, 20 Oct 2025 14:07:29 -0700 Subject: [PATCH 05/11] clean up tests Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- go/config/config_test.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/go/config/config_test.go b/go/config/config_test.go index ddfb3e5484..49a20cfc21 100644 --- a/go/config/config_test.go +++ b/go/config/config_test.go @@ -251,30 +251,6 @@ func TestServerCredentialsWithIamRequiresConfig(t *testing.T) { assert.Contains(t, err.Error(), "iamConfig cannot be nil") } -func TestOldServerCredentialsTests(t *testing.T) { - parameters := []struct { - input *ServerCredentials - expected *protobuf.AuthenticationInfo - }{ - { - NewServerCredentials("username", "password"), - &protobuf.AuthenticationInfo{Username: "username", Password: "password"}, - }, - { - NewServerCredentialsWithDefaultUsername("password"), - &protobuf.AuthenticationInfo{Password: "password"}, - }, - } - - for i, parameter := range parameters { - t.Run(fmt.Sprintf("Testing [%v]", i), func(t *testing.T) { - result := parameter.input.toProtobuf() - - assert.Equal(t, parameter.expected, result) - }) - } -} - func TestConfig_AzAffinity(t *testing.T) { hosts := []string{"host1", "host2"} ports := []int{1234, 5678} From 3f483bc524053f40ad1481a1f37d419dccf204af Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:06:41 -0700 Subject: [PATCH 06/11] Update go/config/config.go Co-authored-by: Taylor Curran Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- go/config/config.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/config/config.go b/go/config/config.go index 13012e487f..4fdabe6d58 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -38,10 +38,10 @@ func (addr *NodeAddress) toProtobuf() *protobuf.NodeAddress { type ServiceType int const ( - // Elasticache represents Amazon ElastiCache service. - Elasticache ServiceType = iota - // MemoryDB represents Amazon MemoryDB service. - MemoryDB + // Amazon ElastiCache service. + ElastiCache ServiceType = iota + // Amazon MemoryDB service. + MemoryDb ) // IamAuthConfig represents configuration settings for IAM authentication. From d69f955eea3715f8ba1f94f0f1e0828e5b638348 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:32:24 -0700 Subject: [PATCH 07/11] refactored refreshIntervalSeconds reverted config_tests change fix documentation Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- go/base_client.go | 51 +++++++++++++++++++++++++++++++++++----- go/config/config.go | 10 ++++---- go/config/config_test.go | 9 ++++--- 3 files changed, 55 insertions(+), 15 deletions(-) diff --git a/go/base_client.go b/go/base_client.go index c123355a9f..4223c2c1fc 100644 --- a/go/base_client.go +++ b/go/base_client.go @@ -731,6 +731,21 @@ func (client *baseClient) ResetConnectionPassword(ctx context.Context) (string, return client.submitConnectionPasswordUpdate(ctx, "", false) } +// submitRefreshIamToken is the internal implementation for manually refreshing the IAM authentication token. +// +// This method sends a refresh request to the core client to generate a new IAM token and update +// the connection. It handles context cancellation and manages the asynchronous communication with +// the underlying C client. +// +// Parameters: +// +// ctx - The context for controlling the command execution and cancellation. +// +// Return value: +// +// Returns "OK" on successful token refresh, or an error if the operation fails. +// +// Note: This is an internal method. Use RefreshIamToken() for the public API. func (client *baseClient) submitRefreshIamToken(ctx context.Context) (string, error) { // Check if context is already done select { @@ -795,25 +810,49 @@ func (client *baseClient) submitRefreshIamToken(ctx context.Context) (string, er return handleOkResponse(payload.value) } -// RefreshIamToken manually refreshes the IAM token for the current connection. +// RefreshIamToken manually refreshes the IAM authentication token for the current connection. // -// This method is only available if the client was created with IAM authentication. -// It triggers an immediate refresh of the IAM token and updates the connection. +// This method is only available if the client was created with IAM authentication +// (using [ServerCredentials] with [IamAuthConfig]). It triggers an immediate refresh of the +// IAM token and updates the connection with the new token. +// +// Normally, IAM tokens are refreshed automatically based on the refresh interval configured +// in [IamAuthConfig]. Use this method when you need to force an immediate token refresh, +// such as when credentials have been rotated or when troubleshooting authentication issues. // // Parameters: // -// ctx - The context for controlling the command execution. +// ctx - The context for controlling the command execution and cancellation. // // Return value: // -// `"OK"` response on success. +// Returns "OK" on successful token refresh. +// +// Errors: +// +// Returns an error if: +// - The client was not configured with IAM authentication +// - The token refresh operation fails +// - The context is cancelled +// - The client is closed // // Example: // +// // Create client with IAM authentication +// iamConfig := config.NewIamAuthConfig("my-cluster", config.ElastiCache, "us-east-1") +// creds, _ := config.NewServerCredentialsWithIam("myuser", iamConfig) +// clientConfig := config.NewClientConfiguration().WithCredentials(creds) +// client, _ := glide.NewClient(clientConfig) +// +// // Manually refresh the token // result, err := client.RefreshIamToken(context.Background()) // if err != nil { -// // handle error +// log.Printf("Token refresh failed: %v", err) +// return // } +// log.Printf("Token refreshed: %s", result) // "OK" +// +// See also: [IamAuthConfig], [ServerCredentials], [NewServerCredentialsWithIam] func (client *baseClient) RefreshIamToken(ctx context.Context) (string, error) { return client.submitRefreshIamToken(ctx) } diff --git a/go/config/config.go b/go/config/config.go index 4fdabe6d58..cfebe889f7 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -53,18 +53,16 @@ type IamAuthConfig struct { // The AWS region where the ElastiCache/MemoryDB cluster is located. region string // Optional refresh interval in seconds for renewing IAM authentication tokens. - // If not provided, defaults to 300 seconds (5 min). + // If not provided, the core will use its default value. refreshIntervalSeconds *uint32 } // NewIamAuthConfig returns an [IamAuthConfig] struct with the given configuration. func NewIamAuthConfig(clusterName string, service ServiceType, region string) *IamAuthConfig { - defaultRefresh := uint32(300) return &IamAuthConfig{ - clusterName: clusterName, - service: service, - region: region, - refreshIntervalSeconds: &defaultRefresh, + clusterName: clusterName, + service: service, + region: region, } } diff --git a/go/config/config_test.go b/go/config/config_test.go index 49a20cfc21..1e206dc3dc 100644 --- a/go/config/config_test.go +++ b/go/config/config_test.go @@ -197,9 +197,12 @@ func TestServerCredentials(t *testing.T) { }, } - for _, param := range parameters { - result := param.input.toProtobuf() - assert.Equal(t, param.expected, result) + for i, parameter := range parameters { + t.Run(fmt.Sprintf("Testing [%v]", i), func(t *testing.T) { + result := parameter.input.toProtobuf() + + assert.Equal(t, parameter.expected, result) + }) } } From 472c426eabb29fe833d7aedd77b65ad01a08e095 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:33:40 -0700 Subject: [PATCH 08/11] added changelog Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fe4f50037..a8db2af78d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * JAVA: Add refreshTopologyFromInitialNodes Configuration Option ([#4870](https://github.com/valkey-io/valkey-glide/pull/4870)) * GO: add RefreshTopologyFromInitialNodes configuration option ([#4871](https://github.com/valkey-io/valkey-glide/pull/4871)) * Node: add refreshTopologyFromInitialNodes configuration option ([#4872](https://github.com/valkey-io/valkey-glide/pull/4872)) +* FFI/GO: Add IAM authentication support with automatic token refresh ([#4892](https://github.com/valkey-io/valkey-glide/pull/4892)) #### Fixes From 1010e28937988bdbf2a958ce37242aaa0550c9e8 Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 22 Oct 2025 15:58:39 -0700 Subject: [PATCH 09/11] fix ElastiCache spelling Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- go/config/config.go | 4 ++-- go/config/config_test.go | 4 ++-- go/examples/iam_authentication_example.go | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go/config/config.go b/go/config/config.go index cfebe889f7..b722ec25ee 100644 --- a/go/config/config.go +++ b/go/config/config.go @@ -41,7 +41,7 @@ const ( // Amazon ElastiCache service. ElastiCache ServiceType = iota // Amazon MemoryDB service. - MemoryDb + MemoryDB ) // IamAuthConfig represents configuration settings for IAM authentication. @@ -78,7 +78,7 @@ func (config *IamAuthConfig) toProtobuf() *protobuf.IamCredentials { Region: config.region, } - if config.service == Elasticache { + if config.service == ElastiCache { iamCreds.ServiceType = protobuf.ServiceType_ELASTICACHE } else { iamCreds.ServiceType = protobuf.ServiceType_MEMORYDB diff --git a/go/config/config_test.go b/go/config/config_test.go index 1e206dc3dc..deefd62892 100644 --- a/go/config/config_test.go +++ b/go/config/config_test.go @@ -207,7 +207,7 @@ func TestServerCredentials(t *testing.T) { } func TestServerCredentialsWithIam(t *testing.T) { - iamConfig := NewIamAuthConfig("my-cluster", Elasticache, "us-east-1") + iamConfig := NewIamAuthConfig("my-cluster", ElastiCache, "us-east-1") creds, err := NewServerCredentialsWithIam("myUser", iamConfig) assert.Nil(t, err) @@ -238,7 +238,7 @@ func TestServerCredentialsWithIamCustomRefresh(t *testing.T) { } func TestServerCredentialsWithIamRequiresUsername(t *testing.T) { - iamConfig := NewIamAuthConfig("my-cluster", Elasticache, "us-east-1") + iamConfig := NewIamAuthConfig("my-cluster", ElastiCache, "us-east-1") creds, err := NewServerCredentialsWithIam("", iamConfig) assert.NotNil(t, err) diff --git a/go/examples/iam_authentication_example.go b/go/examples/iam_authentication_example.go index 35797c7c42..ef99bdab82 100644 --- a/go/examples/iam_authentication_example.go +++ b/go/examples/iam_authentication_example.go @@ -17,12 +17,12 @@ import ( // - Create a client with IAM credentials // - Manually refresh IAM tokens -// ElasticacheExample demonstrates connecting to ElastiCache with IAM authentication -func ElasticacheExample() { +// ElastiCacheExample demonstrates connecting to ElastiCache with IAM authentication +func ElastiCacheExample() { // Configure IAM authentication iamConfig := config.NewIamAuthConfig( "my-elasticache-cluster", - config.Elasticache, + config.ElastiCache, "us-east-1", ) // Uses default 300 second refresh interval @@ -108,7 +108,7 @@ func MemoryDBExample() { func ManualRefreshExample() { iamConfig := config.NewIamAuthConfig( "my-cluster", - config.Elasticache, + config.ElastiCache, "us-east-1", ) @@ -161,7 +161,7 @@ func ManualRefreshExample() { func ClusterExample() { iamConfig := config.NewIamAuthConfig( "my-cluster", - config.Elasticache, + config.ElastiCache, "us-east-1", ) From 1a8f3a0de330f195bf41daba5c710fb8faefe4ec Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Wed, 22 Oct 2025 16:39:39 -0700 Subject: [PATCH 10/11] fix go testing Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- go/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/config/config_test.go b/go/config/config_test.go index deefd62892..51b0a218c3 100644 --- a/go/config/config_test.go +++ b/go/config/config_test.go @@ -221,7 +221,7 @@ func TestServerCredentialsWithIam(t *testing.T) { assert.Equal(t, "my-cluster", authInfo.IamCredentials.ClusterName) assert.Equal(t, "us-east-1", authInfo.IamCredentials.Region) assert.Equal(t, protobuf.ServiceType_ELASTICACHE, authInfo.IamCredentials.ServiceType) - assert.Equal(t, uint32(300), *authInfo.IamCredentials.RefreshIntervalSeconds) + assert.Nil(t, authInfo.IamCredentials.RefreshIntervalSeconds) } func TestServerCredentialsWithIamCustomRefresh(t *testing.T) { From 9b4f5d07a7f03e067f457a00965ac9da4fda0f5c Mon Sep 17 00:00:00 2001 From: affonsov <67347924+affonsov@users.noreply.github.com> Date: Thu, 23 Oct 2025 10:26:19 -0700 Subject: [PATCH 11/11] removing int tests Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com> --- go/examples/iam_authentication_example.go | 203 ---------------------- 1 file changed, 203 deletions(-) delete mode 100644 go/examples/iam_authentication_example.go diff --git a/go/examples/iam_authentication_example.go b/go/examples/iam_authentication_example.go deleted file mode 100644 index ef99bdab82..0000000000 --- a/go/examples/iam_authentication_example.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 - -package examples - -import ( - "context" - "fmt" - - "github.com/valkey-io/valkey-glide/go/v2" - "github.com/valkey-io/valkey-glide/go/v2/config" -) - -// Example demonstrating IAM authentication with Valkey GLIDE client. -// -// This example shows how to: -// - Configure IAM authentication for ElastiCache or MemoryDB -// - Create a client with IAM credentials -// - Manually refresh IAM tokens - -// ElastiCacheExample demonstrates connecting to ElastiCache with IAM authentication -func ElastiCacheExample() { - // Configure IAM authentication - iamConfig := config.NewIamAuthConfig( - "my-elasticache-cluster", - config.ElastiCache, - "us-east-1", - ) // Uses default 300 second refresh interval - - // Create credentials with IAM config - credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) - if err != nil { - fmt.Printf("Failed to create IAM credentials: %v\n", err) - return - } - - // Create client configuration - clientConfig := config.NewClientConfiguration(). - WithAddress(&config.NodeAddress{ - Host: "my-cluster.cache.amazonaws.com", - Port: 6379, - }). - WithCredentials(credentials). - WithUseTLS(true) // Recommended for AWS services - - // Create and use the client - client, err := glide.NewClient(clientConfig) - if err != nil { - fmt.Printf("Failed to create client: %v\n", err) - return - } - defer client.Close() - - // The client will automatically refresh IAM tokens based on the refresh interval - response, err := client.Set(context.Background(), "key", "value") - if err != nil { - fmt.Printf("SET failed: %v\n", err) - return - } - fmt.Printf("ElastiCache SET response: %s\n", response) - - value, err := client.Get(context.Background(), "key") - if err != nil { - fmt.Printf("GET failed: %v\n", err) - return - } - fmt.Printf("ElastiCache GET response: %s\n", value.Value()) -} - -// MemoryDBExample demonstrates connecting to MemoryDB with custom refresh interval -func MemoryDBExample() { - // Configure IAM authentication with custom refresh interval - iamConfig := config.NewIamAuthConfig( - "my-memorydb-cluster", - config.MemoryDB, - "us-west-2", - ).WithRefreshIntervalSeconds(600) // Refresh every 10 minutes - - credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) - if err != nil { - fmt.Printf("Failed to create IAM credentials: %v\n", err) - return - } - - clientConfig := config.NewClientConfiguration(). - WithAddress(&config.NodeAddress{ - Host: "my-cluster.memorydb.amazonaws.com", - Port: 6379, - }). - WithCredentials(credentials). - WithUseTLS(true) - - client, err := glide.NewClient(clientConfig) - if err != nil { - fmt.Printf("Failed to create client: %v\n", err) - return - } - defer client.Close() - - response, err := client.Set(context.Background(), "memorydb-key", "memorydb-value") - if err != nil { - fmt.Printf("SET failed: %v\n", err) - return - } - fmt.Printf("MemoryDB SET response: %s\n", response) -} - -// ManualRefreshExample demonstrates manually refreshing IAM token -func ManualRefreshExample() { - iamConfig := config.NewIamAuthConfig( - "my-cluster", - config.ElastiCache, - "us-east-1", - ) - - credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) - if err != nil { - fmt.Printf("Failed to create IAM credentials: %v\n", err) - return - } - - clientConfig := config.NewClientConfiguration(). - WithAddress(&config.NodeAddress{ - Host: "my-cluster.cache.amazonaws.com", - Port: 6379, - }). - WithCredentials(credentials). - WithUseTLS(true) - - client, err := glide.NewClient(clientConfig) - if err != nil { - fmt.Printf("Failed to create client: %v\n", err) - return - } - defer client.Close() - - // Perform some operations - _, err = client.Set(context.Background(), "key1", "value1") - if err != nil { - fmt.Printf("SET failed: %v\n", err) - return - } - - // Manually refresh the IAM token if needed - // (normally this happens automatically based on refreshIntervalSeconds) - refreshResponse, err := client.RefreshIamToken(context.Background()) - if err != nil { - fmt.Printf("Token refresh failed: %v\n", err) - return - } - fmt.Printf("Token refresh response: %s\n", refreshResponse) - - // Continue with operations using the refreshed token - _, err = client.Set(context.Background(), "key2", "value2") - if err != nil { - fmt.Printf("SET failed: %v\n", err) - return - } -} - -// ClusterExample demonstrates IAM authentication with cluster mode -func ClusterExample() { - iamConfig := config.NewIamAuthConfig( - "my-cluster", - config.ElastiCache, - "us-east-1", - ) - - credentials, err := config.NewServerCredentialsWithIam("myIamUser", iamConfig) - if err != nil { - fmt.Printf("Failed to create IAM credentials: %v\n", err) - return - } - - clusterConfig := config.NewClusterClientConfiguration(). - WithAddress(&config.NodeAddress{ - Host: "my-cluster.cache.amazonaws.com", - Port: 6379, - }). - WithCredentials(credentials). - WithUseTLS(true) - - client, err := glide.NewClusterClient(clusterConfig) - if err != nil { - fmt.Printf("Failed to create cluster client: %v\n", err) - return - } - defer client.Close() - - response, err := client.Set(context.Background(), "cluster-key", "cluster-value") - if err != nil { - fmt.Printf("SET failed: %v\n", err) - return - } - fmt.Printf("Cluster SET response: %s\n", response) - - // Manually refresh IAM token for cluster client - refreshResponse, err := client.RefreshIamToken(context.Background()) - if err != nil { - fmt.Printf("Token refresh failed: %v\n", err) - return - } - fmt.Printf("Token refresh response: %s\n", refreshResponse) -}