Skip to content

Commit 74200ee

Browse files
authored
fix(csharp/src/Drivers/BigQuery): improve selective handling of cancellation exception (#3615)
After adding cancellation functionality to BigQuery statements, there is a report that `task was cancelled` messages are now appearing. This is likely due to the change where the retry code stops processing when it gets a `OperationCancelledExecption`. This change now only exits early if the cancellation was requested on the passed cancellation token.
1 parent 00e9490 commit 74200ee

File tree

2 files changed

+17
-8
lines changed

2 files changed

+17
-8
lines changed

csharp/src/Drivers/BigQuery/BigQueryStatement.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
120120
}).ConfigureAwait(false);
121121
};
122122

123-
BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults, activity).ConfigureAwait(false);
123+
BigQueryResults results = await ExecuteWithRetriesAsync(getJobResults, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
124124

125125
TokenProtectedReadClientManger clientMgr = new TokenProtectedReadClientManger(Credential);
126126
clientMgr.UpdateToken = () => Task.Run(() =>
@@ -180,7 +180,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
180180
throw new AdbcException($"Unable to obtain result from statement [{statementIndex}]", AdbcStatusCode.InvalidData);
181181
};
182182

183-
results = await ExecuteWithRetriesAsync(getMultiJobResults, activity).ConfigureAwait(false);
183+
results = await ExecuteWithRetriesAsync(getMultiJobResults, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
184184
}
185185

186186
if (results?.TableReference == null)
@@ -213,7 +213,7 @@ private async Task<QueryResult> ExecuteQueryInternalAsync()
213213
return await GetArrowReaders(clientMgr, table, results.TableReference.ProjectId, maxStreamCount, activity, context.CancellationToken).ConfigureAwait(false);
214214
}).ConfigureAwait(false);
215215
};
216-
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity).ConfigureAwait(false);
216+
IEnumerable<IArrowReader> readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity, cancellationContext.CancellationToken).ConfigureAwait(false);
217217

218218
// Note: MultiArrowReader must dispose the cancellationContext.
219219
IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, new CancellationContext(cancellationRegistry));
@@ -287,7 +287,7 @@ private async Task<UpdateResult> ExecuteUpdateInternalAsync()
287287
return await context.Job.GetQueryResultsAsync(getQueryResultsOptions, context.CancellationToken).ConfigureAwait(false);
288288
}).ConfigureAwait(false);
289289
};
290-
BigQueryResults? result = await ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity);
290+
BigQueryResults? result = await ExecuteWithRetriesAsync(getQueryResultsAsyncFunc, activity, context.CancellationToken);
291291
long updatedRows = result?.NumDmlAffectedRows.HasValue == true ? result.NumDmlAffectedRows.Value : -1L;
292292

293293
activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, updatedRows);
@@ -562,7 +562,8 @@ private TableReference TryGetLargeDestinationTableReference(string datasetId, Ac
562562

563563
public bool TokenRequiresUpdate(Exception ex) => BigQueryUtils.TokenRequiresUpdate(ex);
564564

565-
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity) => await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs);
565+
private async Task<T> ExecuteWithRetriesAsync<T>(Func<Task<T>> action, Activity? activity, CancellationToken cancellationToken = default) =>
566+
await RetryManager.ExecuteWithRetriesAsync<T>(this, action, activity, MaxRetryAttempts, RetryDelayMs, cancellationToken);
566567

567568
private async Task<T> ExecuteCancellableJobAsync<T>(
568569
JobCancellationContext context,
@@ -573,8 +574,12 @@ private async Task<T> ExecuteCancellableJobAsync<T>(
573574
{
574575
return await func(context).ConfigureAwait(false);
575576
}
576-
catch (Exception ex) when (BigQueryUtils.ContainsException(ex, out OperationCanceledException? cancelledEx))
577+
catch (Exception ex)
578+
when (context.CancellationToken.IsCancellationRequested &&
579+
BigQueryUtils.ContainsException(ex, out OperationCanceledException? cancelledEx))
577580
{
581+
// Note: OperationCanceledException could be thrown from the call,
582+
// but we only want to handle when the cancellation was requested from the context.
578583
activity?.AddException(cancelledEx!);
579584
try
580585
{

csharp/src/Drivers/BigQuery/RetryManager.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
using System;
2020
using System.Diagnostics;
21+
using System.Threading;
2122
using System.Threading.Tasks;
2223

2324
namespace Apache.Arrow.Adbc.Drivers.BigQuery
@@ -32,7 +33,8 @@ public static async Task<T> ExecuteWithRetriesAsync<T>(
3233
Func<Task<T>> action,
3334
Activity? activity,
3435
int maxRetries = 5,
35-
int initialDelayMilliseconds = 200)
36+
int initialDelayMilliseconds = 200,
37+
CancellationToken cancellationToken = default)
3638
{
3739
if (action == null)
3840
{
@@ -49,8 +51,10 @@ public static async Task<T> ExecuteWithRetriesAsync<T>(
4951
T result = await action();
5052
return result;
5153
}
52-
catch (Exception ex) when (!BigQueryUtils.ContainsException(ex, out OperationCanceledException? _))
54+
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
5355
{
56+
// Note: OperationCanceledException could be thrown from the call,
57+
// but we only want to break out when the cancellation was requested from the caller.
5458
activity?.AddBigQueryTag("retry_attempt", retryCount);
5559
activity?.AddException(ex);
5660

0 commit comments

Comments
 (0)