Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions csharp/src/Drivers/Databricks/DatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,8 @@ protected override HttpMessageHandler CreateHttpHandler()
{
// Add retry handler for 408, 502, 503, 504 responses with Retry-After support
// This must be INSIDE ThriftErrorMessageHandler so retries happen before exceptions are thrown
baseHandler = new RetryHttpHandler(baseHandler, TemporarilyUnavailableRetryTimeout);
baseAuthHandler = new RetryHttpHandler(baseAuthHandler, TemporarilyUnavailableRetryTimeout);
baseHandler = new RetryHttpHandler(baseHandler, this, TemporarilyUnavailableRetryTimeout);
baseAuthHandler = new RetryHttpHandler(baseAuthHandler, this, TemporarilyUnavailableRetryTimeout);
}

// Add Thrift error message handler AFTER retry handler (OUTSIDE in the chain)
Expand Down
190 changes: 115 additions & 75 deletions csharp/src/Drivers/Databricks/RetryHttpHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
*/

using System;
using System.Diagnostics;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using Apache.Arrow.Adbc.Tracing;

namespace Apache.Arrow.Adbc.Drivers.Databricks
{
/// <summary>
/// HTTP handler that implements retry behavior for 408, 502, 503, and 504 responses.
/// Uses Retry-After header if present, otherwise uses exponential backoff.
/// </summary>
internal class RetryHttpHandler : DelegatingHandler
internal class RetryHttpHandler : DelegatingHandler, IActivityTracer
{
private readonly IActivityTracer _activityTracer;
private readonly int _retryTimeoutSeconds;
private readonly int _initialBackoffSeconds = 1;
private readonly int _maxBackoffSeconds = 32;
Expand All @@ -38,114 +41,151 @@ internal class RetryHttpHandler : DelegatingHandler
/// Initializes a new instance of the <see cref="RetryHttpHandler"/> class.
/// </summary>
/// <param name="innerHandler">The inner handler to delegate to.</param>
/// <param name="activityTracer">The activity tracer for logging and diagnostics.</param>
/// <param name="retryTimeoutSeconds">Maximum total time in seconds to retry before failing.</param>
public RetryHttpHandler(HttpMessageHandler innerHandler, int retryTimeoutSeconds)
public RetryHttpHandler(HttpMessageHandler innerHandler, IActivityTracer activityTracer, int retryTimeoutSeconds)
: base(innerHandler)
{
_activityTracer = activityTracer;
_retryTimeoutSeconds = retryTimeoutSeconds;
}

// IActivityTracer implementation - delegates to the connection
ActivityTrace IActivityTracer.Trace => _activityTracer.Trace;
string? IActivityTracer.TraceParent => _activityTracer.TraceParent;
string IActivityTracer.AssemblyVersion => _activityTracer.AssemblyVersion;
string IActivityTracer.AssemblyName => _activityTracer.AssemblyName;

/// <summary>
/// Sends an HTTP request to the inner handler with retry logic for retryable status codes.
/// </summary>
protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request,
CancellationToken cancellationToken)
{
// Clone the request content if it's not null so we can reuse it for retries
var requestContentClone = request.Content != null
? await CloneHttpContentAsync(request.Content)
: null;

HttpResponseMessage response;
string? lastErrorMessage = null;
DateTime startTime = DateTime.UtcNow;
int attemptCount = 0;
int currentBackoffSeconds = _initialBackoffSeconds;
int totalRetrySeconds = 0;

do
return await this.TraceActivityAsync(async activity =>
{
// Set the content for each attempt (if needed)
if (requestContentClone != null && request.Content == null)
// Clone the request content if it's not null so we can reuse it for retries
var requestContentClone = request.Content != null
? await CloneHttpContentAsync(request.Content)
: null;

HttpResponseMessage response;
string? lastErrorMessage = null;
DateTime startTime = DateTime.UtcNow;
int attemptCount = 0;
int currentBackoffSeconds = _initialBackoffSeconds;
int totalRetrySeconds = 0;

do
{
request.Content = await CloneHttpContentAsync(requestContentClone);
}
// Set the content for each attempt (if needed)
if (requestContentClone != null && request.Content == null)
{
request.Content = await CloneHttpContentAsync(requestContentClone);
}

response = await base.SendAsync(request, cancellationToken);
response = await base.SendAsync(request, cancellationToken);

// If it's not a retryable status code, return immediately
if (!IsRetryableStatusCode(response.StatusCode))
{
return response;
}
// If it's not a retryable status code, return immediately
if (!IsRetryableStatusCode(response.StatusCode))
{
// Only log retry summary if retries occurred
if (attemptCount > 0)
{
activity?.SetTag("http.retry.total_attempts", attemptCount);
activity?.SetTag("http.response.status_code", (int)response.StatusCode);
}
return response;
}

attemptCount++;
attemptCount++;

// Check if we've exceeded the timeout
TimeSpan elapsedTime = DateTime.UtcNow - startTime;
if (_retryTimeoutSeconds > 0 && elapsedTime.TotalSeconds > _retryTimeoutSeconds)
{
// We've exceeded the timeout, so break out of the loop
break;
}
activity?.SetTag("http.retry.attempt", attemptCount);
activity?.SetTag("http.response.status_code", (int)response.StatusCode);

int waitSeconds;
// Check if we've exceeded the timeout
TimeSpan elapsedTime = DateTime.UtcNow - startTime;
if (_retryTimeoutSeconds > 0 && elapsedTime.TotalSeconds > _retryTimeoutSeconds)
{
// We've exceeded the timeout, so break out of the loop
break;
}

// Check for Retry-After header
if (response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
{
// Parse the Retry-After value
string retryAfterValue = string.Join(",", retryAfterValues);
if (int.TryParse(retryAfterValue, out int retryAfterSeconds) && retryAfterSeconds > 0)
int waitSeconds;

// Check for Retry-After header
if (response.Headers.TryGetValues("Retry-After", out var retryAfterValues))
{
// Use the Retry-After value
waitSeconds = retryAfterSeconds;
lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using server-specified retry after {waitSeconds} seconds. Attempt {attemptCount}.";
// Parse the Retry-After value
string retryAfterValue = string.Join(",", retryAfterValues);
if (int.TryParse(retryAfterValue, out int retryAfterSeconds) && retryAfterSeconds > 0)
{
// Use the Retry-After value
waitSeconds = retryAfterSeconds;
lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using server-specified retry after {waitSeconds} seconds. Attempt {attemptCount}.";
}
else
{
// Invalid Retry-After value, use exponential backoff
waitSeconds = CalculateBackoffWithJitter(currentBackoffSeconds);
lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Invalid Retry-After header, using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
}
}
else
{
// Invalid Retry-After value, use exponential backoff
// No Retry-After header, use exponential backoff
waitSeconds = CalculateBackoffWithJitter(currentBackoffSeconds);
lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Invalid Retry-After header, using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
}
}
else
{
// No Retry-After header, use exponential backoff
waitSeconds = CalculateBackoffWithJitter(currentBackoffSeconds);
lastErrorMessage = $"Service temporarily unavailable (HTTP {(int)response.StatusCode}). Using exponential backoff of {waitSeconds} seconds. Attempt {attemptCount}.";
}

// Dispose the response before retrying
response.Dispose();
// Dispose the response before retrying
response.Dispose();

// Reset the request content for the next attempt
request.Content = null;
// Reset the request content for the next attempt
request.Content = null;

// Update total retry time
totalRetrySeconds += waitSeconds;
if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
{
// We've exceeded the timeout, so break out of the loop
break;
}
// Update total retry time
totalRetrySeconds += waitSeconds;
if (_retryTimeoutSeconds > 0 && totalRetrySeconds > _retryTimeoutSeconds)
{
// We've exceeded the timeout, so break out of the loop
break;
}

// Wait for the calculated time
await Task.Delay(TimeSpan.FromSeconds(waitSeconds), cancellationToken);
// Wait for the calculated time
await Task.Delay(TimeSpan.FromSeconds(waitSeconds), cancellationToken);

// Increase backoff for next attempt (exponential backoff)
currentBackoffSeconds = Math.Min(currentBackoffSeconds * 2, _maxBackoffSeconds);
} while (!cancellationToken.IsCancellationRequested);
// Increase backoff for next attempt (exponential backoff)
currentBackoffSeconds = Math.Min(currentBackoffSeconds * 2, _maxBackoffSeconds);
} while (!cancellationToken.IsCancellationRequested);

// If we get here, we've either exceeded the timeout or been cancelled
if (cancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
}
activity?.SetTag("http.retry.total_attempts", attemptCount);
activity?.SetTag("http.response.status_code", (int)response.StatusCode);

// If we get here, we've either exceeded the timeout or been cancelled
if (cancellationToken.IsCancellationRequested)
{
activity?.SetTag("http.retry.outcome", "cancelled");
var cancelEx = new OperationCanceledException("Request cancelled during retry wait", cancellationToken);
activity?.AddException(cancelEx, [
new("error.context", "http.retry.cancelled"),
new("attempts", attemptCount)
]);
throw cancelEx;
}

throw new DatabricksException(lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded", AdbcStatusCode.IOError)
.SetSqlState("08001");
// Timeout exceeded
activity?.SetTag("http.retry.outcome", "timeout_exceeded");
var exception = new DatabricksException(lastErrorMessage ?? "Service temporarily unavailable and retry timeout exceeded", AdbcStatusCode.IOError).SetSqlState("08001");
activity?.AddException(exception, [
new("error.context", "http.retry.timeout_exceeded"),
new("attempts", attemptCount),
new("total_retry_seconds", totalRetrySeconds),
new("timeout_seconds", _retryTimeoutSeconds)
]);
throw exception;
}, activityName: "RetryHttp");
}

/// <summary>
Expand Down
36 changes: 28 additions & 8 deletions csharp/test/Drivers/Databricks/Unit/RetryHttpHandlerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Databricks;
using Apache.Arrow.Adbc.Tracing;
using Xunit;

namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit
Expand All @@ -34,6 +35,17 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks.Unit
/// </summary>
public class RetryHttpHandlerTest
{
/// <summary>
/// Minimal <see cref="IActivityTracer"/> stand-in used by the retry-handler tests.
/// </summary>
private class MockActivityTracer : IActivityTracer
{
    // Settable so a test can simulate an inherited W3C trace parent; null by default.
    public string? TraceParent { get; set; }

    // Built on every access so the trace always reflects the current TraceParent value.
    public ActivityTrace Trace
    {
        get { return new ActivityTrace("TestSource", "1.0.0", TraceParent); }
    }

    public string AssemblyVersion { get; } = "1.0.0";

    public string AssemblyName { get; } = "TestAssembly";
}

/// <summary>
/// Tests that the RetryHttpHandler properly processes 503 responses with Retry-After headers.
/// </summary>
Expand All @@ -49,7 +61,8 @@ public async Task RetryAfterHandlerProcesses503Response()
});

// Create the RetryHttpHandler with retry enabled and a 5-second timeout
var retryHandler = new RetryHttpHandler(mockHandler, 5);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 5);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down Expand Up @@ -84,7 +97,8 @@ public async Task RetryAfterHandlerThrowsWhenTimeoutExceeded()
});

// Create the RetryHttpHandler with retry enabled and a 1-second timeout
var retryHandler = new RetryHttpHandler(mockHandler, 1);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 1);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down Expand Up @@ -115,7 +129,8 @@ public async Task RetryAfterHandlerHandlesNonRetryableResponse()
});

// Create the RetryHttpHandler with retry enabled
var retryHandler = new RetryHttpHandler(mockHandler, 5);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 5);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down Expand Up @@ -143,7 +158,8 @@ public async Task RetryHandlerUsesExponentialBackoffFor503WithoutRetryAfterHeade
});

// Create the RetryHttpHandler with retry enabled
var retryHandler = new RetryHttpHandler(mockHandler, 5);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 5);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down Expand Up @@ -191,7 +207,8 @@ public async Task RetryHandlerUsesExponentialBackoffForInvalidRetryAfterHeader()
});

// Create the RetryHttpHandler with retry enabled
var retryHandler = new RetryHttpHandler(mockHandler, 5);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 5);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down Expand Up @@ -223,7 +240,8 @@ public async Task RetryHandlerProcessesRetryableStatusCodes(HttpStatusCode statu
});

// Create the RetryHttpHandler with retry enabled
var retryHandler = new RetryHttpHandler(mockHandler, 5);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 5);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down Expand Up @@ -257,7 +275,8 @@ public async Task RetryHandlerHandlesMultipleRetriesWithExponentialBackoff()
});

// Create the RetryHttpHandler with retry enabled and a generous timeout
var retryHandler = new RetryHttpHandler(mockHandler, 10);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 10);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down Expand Up @@ -296,7 +315,8 @@ public async Task RetryHandlerThrowsWhenServerNeverRecovers(HttpStatusCode statu
});

// Create the RetryHttpHandler with a short timeout to make the test run faster
var retryHandler = new RetryHttpHandler(mockHandler, 3);
var mockTracer = new MockActivityTracer();
var retryHandler = new RetryHttpHandler(mockHandler, mockTracer, 3);

// Create an HttpClient with our handler
var httpClient = new HttpClient(retryHandler);
Expand Down
Loading