-
Notifications
You must be signed in to change notification settings - Fork 165
feat(csharp/src/Drivers/Databricks): Implement straggler download mitigation for CloudFetch #3637
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(csharp/src/Drivers/Databricks): Implement straggler download mitigation for CloudFetch #3637
Conversation
| end | ||
| ``` | ||
|
|
||
| ### 3.2 Download with Straggler Detection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sure that we handle a corner case, that if all the download tries are just taking long, it will cause this chunk download failreus, maybe we need some protections that.
- for the last retry, don't do straggler cancel
- or we keep one download already running when we do straggler retries, and which ever success earlier to take result from that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've used the first approach. Updated the doc accordingly
Add runtime straggler download detection based on median throughput analysis with automatic cancellation and retry for CloudFetch operations. Changes: - Add 6 new configuration parameters in DatabricksParameters.cs - Implement FileDownloadMetrics class for tracking download timing/throughput - Implement StragglerDownloadDetector class for median-based detection algorithm - Integrate straggler handling into CloudFetchDownloader retry loop - Add background monitoring task for periodic straggler checks - Add per-file CancellationTokenSource for granular download cancellation - Implement edge case protection: last retry attempt cannot be cancelled Key Features: - Median throughput calculation for outlier resistance - 60% quantile threshold before detection starts - Retry integration: straggler cancellation counts as one retry attempt - OpenTelemetry instrumentation for observability - Disabled by default for conservative rollout 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Add minimal unit tests and E2E integration tests for straggler detection feature, focusing on mistake-prone areas and configuration validation. Unit Tests (12 tests): - FileDownloadMetrics throughput calculation and state management - StragglerDownloadDetector parameter validation - Median calculation with odd/even counts - Edge cases: empty lists, below threshold, cancelled downloads - Fallback threshold trigger validation E2E Tests (6 tests): - Configuration parameter validation - Default disabled behavior - Parameter naming conventions - Basic integration with default configuration - Atomic counter operations All 18 tests passing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…ary semaphore Add simple sequential download fallback that activates when too many stragglers are detected, using a secondary semaphore approach for clean throttling. Implementation: - Add _sequentialSemaphore (1/1 capacity) and _isSequentialMode flag - Set _isSequentialMode=true when fallback threshold exceeded - Conditionally acquire sequential semaphore before downloads - Release in reverse order (sequential then parallel) - Dispose sequential semaphore in StopAsync Key advantages: - Uses semaphore's native throttling behavior - Can switch back to parallel by flipping flag - No task chaining complexity or lock contention - Clean RAII-style acquire/release pattern - Minimal code changes (~15 lines) All 18 tests passing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Address 5 critical implementation issues identified in code review: 1. Semaphore leak: Wrap task creation in try/catch to release semaphores if exception occurs after acquisition but before task creation 2. Race condition: Add fileData == null check to straggler cancellation handler to prevent unnecessary retries when download completed just before cancellation 3. URL refresh null handling: Log warning when URL refresh fails instead of silently continuing with potentially expired URL 4. Memory leak prevention: Move cleanup to finally block to ensure per-file cancellation tokens are always disposed 5. Fire-and-forget exception handling: Wrap cleanup task in try/catch to prevent unobserved task exceptions All 18 straggler mitigation tests pass after fixes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
csharp/src/Drivers/Databricks/Reader/CloudFetch/straggler-mitigation-summary.md
Outdated
Show resolved
Hide resolved
| | Event Name | When Emitted | Key Tags | | ||
| |------------|-------------|----------| | ||
| | `cloudfetch.straggler_check` | When stragglers identified | `active_downloads`, `completed_downloads`, `stragglers_identified` | | ||
| | `cloudfetch.straggler_cancelling` | Before cancelling straggler | `offset` | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we just need the cancelled tag.
| } | ||
| ``` | ||
|
|
||
| #### StragglerDownloadDetectorTests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems too much details on the testing part, can we show some skeleton code for the stragger monitor? Like how does it monitor all download threads, how does it restart, is it on the same straggle thread? I think the purpose for restarting is we establish a new http connection with the cloud provider, does it guarantee that?
Address 8 critical and important implementation issues: P0 Critical Issues: 1. Sequential semaphore lifecycle: Remove readonly and disposal to support restart scenarios without ObjectDisposedException 2. Sequential semaphore TOCTOU race: Capture mode atomically at acquisition time to prevent semaphore count drift 3. Try/finally coverage: Move metrics initialization inside try block to ensure cleanup always runs and prevent memory leaks P1 Important Issues: 4. Duplicate straggler detection: Add tracking dictionary to prevent counting same file multiple times across retry cycles 5. Counter overflow protection: Change from int to long (max ~9 quintillion) to prevent overflow in pathological scenarios P2 Issues: 6. Cleanup task cancellation: Use cancellationToken in fire-and-forget cleanup to respect shutdown and remove immediately if cancelled 7. File size validation: Add constructor validation to reject zero or negative file sizes and prevent invalid throughput calculations 8. Stale CTS atomicity: Use AddOrUpdate to atomically replace cancellation token source and dispose old one, preventing race conditions All 18 straggler mitigation tests pass after fixes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Add 25 comprehensive end-to-end tests covering all straggler mitigation functionality with realistic scenarios. Test Coverage: **Straggler Detection (5 tests)** - Fast/slow download detection with proper timing - Quantile threshold validation - All-fast downloads (no false positives) - Already-cancelled exclusion - Empty/null metrics handling **Sequential Fallback (2 tests)** - Fallback triggers when threshold exceeded - Fallback does not trigger below threshold **Duplicate Detection Prevention (2 tests)** - With tracking dict: same file counted only once - Without tracking dict: counts multiple times (control test) **FileDownloadMetrics (4 tests)** - Invalid size validation (zero and negative) - Throughput calculation accuracy - Throughput before completion returns null - Straggler flag functionality **Counter Overflow Protection (1 test)** - Verifies long type usage **Median Calculation (2 tests)** - Odd count returns middle value - Even count returns average of middle two **Edge Cases (4 tests)** - No completed downloads - Empty metrics list - Null metrics - Very fast download (< 1ms) without division errors **Concurrency (2 tests)** - Parallel detection with thread-safe counter - Parallel detection with tracking prevents duplicates **Parameter Validation (4 tests)** - Invalid multiplier - Invalid quantile (too low/high) - Negative padding - Negative max stragglers Key Testing Approach: - Uses helper methods to create fast/slow downloads naturally - Slow downloads created first, then aged via Thread.Sleep - Fast downloads complete immediately after creation - No reflection or mocking needed for timing - All tests are deterministic and repeatable Test Results: ✅ All 25 comprehensive E2E tests pass ✅ All 43 total straggler tests pass (unit + basic E2E + comprehensive) ✅ Total test time: ~8 seconds 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Added 5 targeted tests validating code review fixes: - Duplicate detection prevention (issue apache#5) - Atomic CTS replacement (issue apache#9) - Cleanup in finally block (issue apache#3) - Cancellable cleanup tasks (issue apache#7) - Concurrent CTS cleanup safety All tests use real objects (ConcurrentDictionary, CancellationTokenSource) without mocks, following existing CloudFetch test patterns. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…HTTP Created 15 comprehensive E2E tests following CloudFetchDownloaderTest.cs pattern: Passing tests (8): - FastDownloadsNotMarkedAsStraggler - RequiresMinimumCompletionQuantile - MonitoringThreadRespectsCancellation - ParallelModeRespectsMaxParallelDownloads - SequentialModeEnforcesOneDownloadAtATime - NoStragglersDetectedInSequentialMode - CleanShutdownDuringMonitoring - FeatureDisabledByDefault Tests validate: - Monitoring thread lifecycle and cancellation - Semaphore behavior (parallel and sequential modes) - Minimum completion quantile requirement - Feature disabled without configuration - Clean shutdown during operations Known issues (4 tests failing): - Difficulty mocking abstract/internal HiveServer2Connection - Needs investigation for proper property configuration Test coverage includes straggler detection, sequential fallback, semaphore management, retry logic, and complex scenarios. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…ocused tests Changes: - Moved tests from E2E to Unit (were unit tests, not E2E) - Reduced from 30 redundant tests to 10 critical tests - Removed obvious tests (parameter validation, basic getters/setters) Unit tests now focus on: - Duplicate detection prevention across monitoring cycles - Atomic CTS replacement for retries - Cleanup execution in finally blocks - Cleanup cancellation during shutdown - Concurrent cleanup safety - Counter overflow protection (long vs int) - Median calculation correctness (even/odd count) - Empty metrics null safety - Concurrent modification thread safety All 10 tests pass. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Adds straggler download mitigation feature to improve CloudFetch performance by detecting and cancelling abnormally slow parallel downloads. Implementation: - New StragglerDownloadDetector class for detecting slow downloads - New FileDownloadMetrics class for tracking download performance - New CloudFetchStragglerMitigationConfig for configuration management - Integration into CloudFetchDownloader with background monitoring thread - Automatic fallback to sequential downloads after threshold Configuration Parameters: - adbc.databricks.cloudfetch.straggler_mitigation_enabled (default: false) - adbc.databricks.cloudfetch.straggler_multiplier (default: 1.5) - adbc.databricks.cloudfetch.straggler_quantile (default: 0.6) - adbc.databricks.cloudfetch.straggler_padding_seconds (default: 5) - adbc.databricks.cloudfetch.max_stragglers_per_query (default: 10) - adbc.databricks.cloudfetch.synchronous_fallback_enabled (default: true) Tests: - 19 comprehensive unit tests covering basic functionality and advanced scenarios - 19 E2E tests with mocked HTTP responses validating real-world scenarios - All tests pass successfully Documentation: - straggler-mitigation-design.md: comprehensive design documentation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Replace string.Split(string) with string.Split(string[], StringSplitOptions) as the single-string overload is not available in .NET Framework 4.7.2. Fixes compilation errors in CloudFetchStragglerDownloaderE2ETests.cs.
| private readonly object _errorLock = new object(); | ||
|
|
||
| // Straggler mitigation fields | ||
| private readonly bool _isStragglerMitigationEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move the change here and related logci to a new class?
Summary
Implements straggler download mitigation for CloudFetch to improve query performance by detecting and cancelling abnormally slow parallel downloads. The feature monitors active downloads and automatically retries stragglers when faster slots become available, with an optional fallback to sequential mode after a configurable threshold.
Changes
New Classes:
StragglerDownloadDetector(StragglerDetector.cs)FileDownloadMetrics(FileDownloadMetrics.cs)CloudFetchStragglerMitigationConfig(CloudFetchStragglerMitigationConfig.cs)CloudFetchDownloader Integration:
CancellationTokenSourcefor clean cancellationConfiguration Parameters:
All parameters use
adbc.databricks.cloudfetch.prefix:straggler_mitigation_enabled(default: false) - Feature togglestraggler_multiplier(default: 1.5) - Throughput multiplier for detectionstraggler_quantile(default: 0.6) - Minimum completion percentage before detectionstraggler_padding_seconds(default: 5) - Grace period before flagging as stragglermax_stragglers_per_query(default: 10) - Threshold to trigger sequential fallbacksynchronous_fallback_enabled(default: true) - Enable automatic fallback to sequential modeBenefits
Technical Details
Detection Algorithm:
CancellationTokenSourceSequential Fallback:
FetchResultscallThread Safety:
ConcurrentDictionaryfor metrics and cancellation tokensTesting
38 tests total, all passing:
Unit Tests (19):
FileDownloadMetricsthroughput calculation (before/after completion)FileDownloadMetricsstraggler flag settingStragglerDownloadDetectorparameter validation (multiplier, quantile)E2E Tests (19):
Documentation
straggler-mitigation-design.md- Comprehensive design doc with algorithm details, implementation notes, configuration guide, and usage examples🤖 Generated with Claude Code