Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions csharp/src/Drivers/Databricks/DatabricksParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,42 @@ public class DatabricksParameters : SparkParameters
/// </summary>
public const string CloudFetchPrefetchEnabled = "adbc.databricks.cloudfetch.prefetch_enabled";

/// <summary>
/// Whether to enable straggler download detection and mitigation for CloudFetch operations.
/// The value is read as a boolean ("true" or "false").
/// Default value is false if not specified.
/// </summary>
public const string CloudFetchStragglerMitigationEnabled = "adbc.databricks.cloudfetch.straggler_mitigation_enabled";

/// <summary>
/// Multiplier used to determine straggler threshold based on median throughput.
/// The value is read as a floating-point number using the invariant culture (for example "1.5").
/// Default value is 1.5 if not specified.
/// </summary>
public const string CloudFetchStragglerMultiplier = "adbc.databricks.cloudfetch.straggler_multiplier";

/// <summary>
/// Fraction of downloads that must complete before straggler detection begins.
/// The value is read as a floating-point number using the invariant culture (for example "0.6").
/// Valid range: 0.0 to 1.0. Default value is 0.6 (60%) if not specified.
/// </summary>
public const string CloudFetchStragglerQuantile = "adbc.databricks.cloudfetch.straggler_quantile";

/// <summary>
/// Extra buffer time in seconds added to the straggler threshold calculation.
/// The value is read as a whole number of seconds (fractional values are not parsed).
/// Default value is 5 seconds if not specified.
/// </summary>
public const string CloudFetchStragglerPaddingSeconds = "adbc.databricks.cloudfetch.straggler_padding_seconds";

/// <summary>
/// Maximum number of stragglers detected per query before triggering sequential download fallback.
/// The value is read as an integer.
/// Default value is 10 if not specified.
/// </summary>
public const string CloudFetchMaxStragglersPerQuery = "adbc.databricks.cloudfetch.max_stragglers_per_query";

/// <summary>
/// Whether to automatically fall back to sequential downloads when max stragglers threshold is exceeded.
/// The value is read as a boolean ("true" or "false").
/// Default value is false if not specified.
/// </summary>
public const string CloudFetchSynchronousFallbackEnabled = "adbc.databricks.cloudfetch.synchronous_fallback_enabled";

/// <summary>
/// Maximum bytes per fetch request when retrieving query results from servers.
/// The value can be specified with unit suffixes: B (bytes), KB (kilobytes), MB (megabytes), GB (gigabytes).
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
/// <summary>
/// Immutable configuration for the CloudFetch straggler download mitigation feature.
/// Create it directly through the constructor, or from connection properties through
/// <see cref="Parse"/>, which also sanitizes user-supplied values.
/// </summary>
internal sealed class CloudFetchStragglerMitigationConfig
{
    // Single source of truth for the defaults shared by the constructor and Parse.
    private const double DefaultMultiplier = 1.5;
    private const double DefaultQuantile = 0.6;
    private const int DefaultPaddingSeconds = 5;
    private const int DefaultMaxStragglersBeforeFallback = 10;

    /// <summary>
    /// Gets a value indicating whether straggler mitigation is enabled.
    /// </summary>
    public bool Enabled { get; }

    /// <summary>
    /// Gets the straggler throughput multiplier.
    /// A download is considered a straggler if it takes more than (multiplier × expected_time) to complete.
    /// </summary>
    public double Multiplier { get; }

    /// <summary>
    /// Gets the minimum completion quantile before detection starts.
    /// Detection only begins after this fraction of downloads have completed (e.g., 0.6 = 60%).
    /// </summary>
    public double Quantile { get; }

    /// <summary>
    /// Gets the straggler detection padding time.
    /// Extra buffer time added before declaring a download as a straggler.
    /// </summary>
    public TimeSpan Padding { get; }

    /// <summary>
    /// Gets the maximum stragglers allowed before triggering fallback.
    /// </summary>
    public int MaxStragglersBeforeFallback { get; }

    /// <summary>
    /// Gets a value indicating whether synchronous fallback is enabled.
    /// </summary>
    public bool SynchronousFallbackEnabled { get; }

    /// <summary>
    /// Initializes a new instance of the <see cref="CloudFetchStragglerMitigationConfig"/> class.
    /// The arguments are stored as given; no range validation is performed here.
    /// </summary>
    /// <param name="enabled">Whether straggler mitigation is enabled.</param>
    /// <param name="multiplier">Straggler throughput multiplier (default: 1.5).</param>
    /// <param name="quantile">Minimum completion quantile (default: 0.6).</param>
    /// <param name="padding">Straggler detection padding (default: 5 seconds when null).</param>
    /// <param name="maxStragglersBeforeFallback">Maximum stragglers before fallback (default: 10).</param>
    /// <param name="synchronousFallbackEnabled">Whether synchronous fallback is enabled (default: false).</param>
    public CloudFetchStragglerMitigationConfig(
        bool enabled,
        double multiplier = DefaultMultiplier,
        double quantile = DefaultQuantile,
        TimeSpan? padding = null,
        int maxStragglersBeforeFallback = DefaultMaxStragglersBeforeFallback,
        bool synchronousFallbackEnabled = false)
    {
        Enabled = enabled;
        Multiplier = multiplier;
        Quantile = quantile;
        Padding = padding ?? TimeSpan.FromSeconds(DefaultPaddingSeconds);
        MaxStragglersBeforeFallback = maxStragglersBeforeFallback;
        SynchronousFallbackEnabled = synchronousFallbackEnabled;
    }

    /// <summary>
    /// Gets a disabled configuration (feature off).
    /// </summary>
    public static CloudFetchStragglerMitigationConfig Disabled { get; } =
        new CloudFetchStragglerMitigationConfig(enabled: false);

    /// <summary>
    /// Parses configuration from connection properties.
    /// Missing, unparseable, or out-of-range values fall back to their defaults rather than throwing.
    /// </summary>
    /// <param name="properties">Connection properties dictionary.</param>
    /// <returns>Parsed configuration, or Disabled if properties is null or feature not enabled.</returns>
    public static CloudFetchStragglerMitigationConfig Parse(
        IReadOnlyDictionary<string, string>? properties)
    {
        if (properties == null)
        {
            return Disabled;
        }

        bool enabled = ParseBooleanProperty(
            properties,
            DatabricksParameters.CloudFetchStragglerMitigationEnabled,
            defaultValue: false);

        // When the feature is off the remaining options are irrelevant; skip parsing them.
        if (!enabled)
        {
            return Disabled;
        }

        double multiplier = ParseDoubleProperty(
            properties,
            DatabricksParameters.CloudFetchStragglerMultiplier,
            defaultValue: DefaultMultiplier);
        if (multiplier <= 0.0)
        {
            // A zero or negative multiplier cannot describe a "slower than expected" threshold.
            multiplier = DefaultMultiplier;
        }

        double quantile = ParseDoubleProperty(
            properties,
            DatabricksParameters.CloudFetchStragglerQuantile,
            defaultValue: DefaultQuantile);
        if (quantile < 0.0 || quantile > 1.0)
        {
            // The quantile is a fraction of downloads; values outside [0, 1] are meaningless.
            quantile = DefaultQuantile;
        }

        int paddingSeconds = ParseIntProperty(
            properties,
            DatabricksParameters.CloudFetchStragglerPaddingSeconds,
            defaultValue: DefaultPaddingSeconds);
        if (paddingSeconds < 0)
        {
            // Negative padding would produce a negative TimeSpan.
            paddingSeconds = DefaultPaddingSeconds;
        }

        int maxStragglers = ParseIntProperty(
            properties,
            DatabricksParameters.CloudFetchMaxStragglersPerQuery,
            defaultValue: DefaultMaxStragglersBeforeFallback);
        if (maxStragglers < 0)
        {
            maxStragglers = DefaultMaxStragglersBeforeFallback;
        }

        bool synchronousFallback = ParseBooleanProperty(
            properties,
            DatabricksParameters.CloudFetchSynchronousFallbackEnabled,
            defaultValue: false);

        return new CloudFetchStragglerMitigationConfig(
            enabled: true,
            multiplier: multiplier,
            quantile: quantile,
            padding: TimeSpan.FromSeconds(paddingSeconds),
            maxStragglersBeforeFallback: maxStragglers,
            synchronousFallbackEnabled: synchronousFallback);
    }

    /// <summary>
    /// Reads a boolean property, returning <paramref name="defaultValue"/> when the key is
    /// missing or the value is not a valid boolean.
    /// </summary>
    private static bool ParseBooleanProperty(
        IReadOnlyDictionary<string, string> properties,
        string key,
        bool defaultValue)
    {
        if (properties.TryGetValue(key, out string? value) &&
            bool.TryParse(value, out bool result))
        {
            return result;
        }

        return defaultValue;
    }

    /// <summary>
    /// Reads an integer property using the invariant culture, returning
    /// <paramref name="defaultValue"/> when the key is missing or the value is not a valid integer.
    /// </summary>
    private static int ParseIntProperty(
        IReadOnlyDictionary<string, string> properties,
        string key,
        int defaultValue)
    {
        if (properties.TryGetValue(key, out string? value) &&
            int.TryParse(
                value,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int result))
        {
            return result;
        }

        return defaultValue;
    }

    /// <summary>
    /// Reads a floating-point property using the invariant culture, returning
    /// <paramref name="defaultValue"/> when the key is missing, the value is not a valid
    /// number, or the value is NaN or infinite.
    /// </summary>
    private static double ParseDoubleProperty(
        IReadOnlyDictionary<string, string> properties,
        string key,
        double defaultValue)
    {
        // NumberStyles.Float (not Any): Any permits thousands separators and currency symbols,
        // so a decimal-comma input such as "1,5" would be silently read as 15.
        if (properties.TryGetValue(key, out string? value) &&
            double.TryParse(
                value,
                System.Globalization.NumberStyles.Float,
                System.Globalization.CultureInfo.InvariantCulture,
                out double result) &&
            !double.IsNaN(result) &&
            !double.IsInfinity(result))
        {
            return result;
        }

        return defaultValue;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;

namespace Apache.Arrow.Adbc.Drivers.Databricks.Reader.CloudFetch
{
/// <summary>
/// Tracks timing and throughput metrics for individual file downloads.
/// Thread-safe for concurrent access: every read and write of mutable state goes through a private lock.
/// </summary>
/// <remarks>
/// The "Stragler" spelling in member names is kept as-is so existing callers continue to compile.
/// </remarks>
internal class FileDownloadMetrics
{
    // Minimum elapsed time to avoid unrealistic throughput calculations
    private const double MinimumElapsedSecondsForThroughput = 0.001;

    private readonly object _lock = new object();

    // Monotonic timestamps used for elapsed-time math; wall-clock time can jump
    // (NTP sync, manual adjustment) and must not influence throughput.
    private readonly long _startTimestamp;
    private long _endTimestamp;

    private DateTime? _downloadEndTime;
    private bool _wasCancelledAsStragler;

    /// <summary>
    /// Initializes a new instance of the <see cref="FileDownloadMetrics"/> class
    /// and records the current time as the download start.
    /// </summary>
    /// <param name="fileOffset">The file offset in the download batch.</param>
    /// <param name="fileSizeBytes">The size of the file in bytes.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown when <paramref name="fileSizeBytes"/> is zero or negative.
    /// </exception>
    public FileDownloadMetrics(long fileOffset, long fileSizeBytes)
    {
        if (fileSizeBytes <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(fileSizeBytes),
                fileSizeBytes,
                "File size must be positive");
        }

        FileOffset = fileOffset;
        FileSizeBytes = fileSizeBytes;
        DownloadStartTime = DateTime.UtcNow;
        _startTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
    }

    /// <summary>
    /// Gets the file offset in the download batch.
    /// </summary>
    public long FileOffset { get; }

    /// <summary>
    /// Gets the size of the file in bytes.
    /// </summary>
    public long FileSizeBytes { get; }

    /// <summary>
    /// Gets the time (UTC) when the download started.
    /// </summary>
    public DateTime DownloadStartTime { get; }

    /// <summary>
    /// Gets the time (UTC) when the download completed, or null if still in progress.
    /// </summary>
    public DateTime? DownloadEndTime
    {
        get
        {
            // Nullable<DateTime> is wider than a machine word, so an unlocked read
            // could observe a partially written value.
            lock (_lock)
            {
                return _downloadEndTime;
            }
        }
    }

    /// <summary>
    /// Gets a value indicating whether the download has completed.
    /// </summary>
    public bool IsDownloadCompleted
    {
        get
        {
            lock (_lock)
            {
                return _downloadEndTime.HasValue;
            }
        }
    }

    /// <summary>
    /// Gets a value indicating whether this download was cancelled as a straggler.
    /// </summary>
    public bool WasCancelledAsStragler
    {
        get
        {
            // Taking the lock guarantees visibility of a flag set on another thread.
            lock (_lock)
            {
                return _wasCancelledAsStragler;
            }
        }
    }

    /// <summary>
    /// Calculates the download throughput in bytes per second.
    /// Returns null if the download has not completed.
    /// Thread-safe.
    /// </summary>
    /// <returns>The throughput in bytes per second, or null if not completed.</returns>
    public double? CalculateThroughputBytesPerSecond()
    {
        long elapsedTimestampTicks;
        lock (_lock)
        {
            if (!_downloadEndTime.HasValue)
            {
                return null;
            }

            elapsedTimestampTicks = _endTimestamp - _startTimestamp;
        }

        double elapsedSeconds =
            elapsedTimestampTicks / (double)System.Diagnostics.Stopwatch.Frequency;

        // Avoid division by zero for very fast downloads
        if (elapsedSeconds < MinimumElapsedSecondsForThroughput)
        {
            elapsedSeconds = MinimumElapsedSecondsForThroughput;
        }

        return FileSizeBytes / elapsedSeconds;
    }

    /// <summary>
    /// Marks the download as completed and records the end time.
    /// Thread-safe - idempotent (can be called multiple times safely); only the first call records a time.
    /// </summary>
    public void MarkDownloadCompleted()
    {
        lock (_lock)
        {
            if (_downloadEndTime.HasValue)
            {
                return; // Already marked
            }

            _endTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
            _downloadEndTime = DateTime.UtcNow;
        }
    }

    /// <summary>
    /// Marks this download as having been cancelled due to being identified as a straggler.
    /// Thread-safe - idempotent.
    /// </summary>
    public void MarkCancelledAsStragler()
    {
        lock (_lock)
        {
            _wasCancelledAsStragler = true;
        }
    }
}
}
Loading