Conversation

@grzesir (Contributor) commented Sep 17, 2025

Description by Korbit AI

What change is being made?

Fix data flow and reliability across brokers and data sources: rate-limited REST calls with caching and backoff for Tradovate, live-data delegation and improved timezone handling for DataBento and Polars, ETA overrides for backtests, and minor logging safeguards.

Why are these changes being made?

Improve robustness and performance in live/rest data paths (Tradovate, DataBento, ProjectX), ensure consistent UTC-aware timestamps, and provide more informative backtesting progress with accurate ETA. These changes also nudge toward safer defaults and resilience against rate limits and missing live data.

@korbit-ai korbit-ai bot left a comment

Review by Korbit AI

Korbit automatically attempts to detect when you fix issues in new commits.
- Documentation: Unclear request wrapper docstring
- Readability: Overly Nested Logging Logic
- Documentation: Incomplete rate limiter docstring
- Performance: Inefficient ETA calculation on every update
- Readability: Unexplained ETA Configuration Constants
- Functionality: Rate limit retry without backoff
- Readability: Unexplained HTTP Status Code
- Documentation: Incomplete timezone handling documentation
- Error Handling: Silent import failure with generic exception handling
- Readability: Redundant Conditional Logic
Files scanned:
setup.py
lumibot/data_sources/data_source_backtesting.py
lumibot/data_sources/databento_data.py
lumibot/tools/helpers.py
lumibot/entities/bars.py
lumibot/data_sources/projectx_data.py
lumibot/tools/databento_helper.py
lumibot/tools/databento_helper_polars.py
lumibot/brokers/tradovate.py
lumibot/data_sources/databento_data_polars.py
lumibot/strategies/strategy_executor.py
lumibot/strategies/strategy.py


Comment on lines 383 to 391
```diff
 if color:
-    colored_message = colored(message, color)
-    self.logger.info(colored_message)
+    if color in COLORS:
+        colored_message = colored(message, color)
+        self.logger.info(colored_message)
+    else:
+        self.logger.warning(f"Unsupported log color '{color}' for message: {message}")
+        self.logger.info(message)
 else:
     self.logger.info(message)
```

Overly Nested Logging Logic (category: Readability)

What is the issue?

Nested if statements in log_message() make the control flow harder to follow than necessary.

Why this matters

Nested conditionals increase cognitive load and make code harder to scan. This can lead to confusion about the different execution paths.

Suggested change:

```python
# Flatten the logic
if not color:
    self.logger.info(message)
    return

if color not in COLORS:
    self.logger.warning(f"Unsupported log color '{color}' for message: {message}")
    self.logger.info(message)
    return

colored_message = colored(message, color)
self.logger.info(colored_message)
```
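The flattened version can be exercised end-to-end with a small self-contained sketch; note that the `COLORS` set and `colored()` helper below are stand-ins for the real ones in the strategy module, used here only so the example runs on its own:

```python
import logging

COLORS = {"red", "green", "blue"}  # stand-in for the real supported-color set

def colored(message, color):
    # Stand-in for termcolor.colored; the real helper wraps text in ANSI codes
    return f"[{color}]{message}"

logger = logging.getLogger("strategy")

def log_message(message, color=None):
    """Log a message, optionally colored, using early returns instead of nesting."""
    if not color:
        logger.info(message)
        return
    if color not in COLORS:
        logger.warning(f"Unsupported log color '{color}' for message: {message}")
        logger.info(message)
        return
    logger.info(colored(message, color))
```

Each branch now exits immediately, so a reader can verify every execution path without tracking nested state.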

Comment on lines +123 to +125
```python
self._eta_history.append((now_wall, percent))
while self._eta_history and (now_wall - self._eta_history[0][0]).total_seconds() > self._eta_window_seconds:
    self._eta_history.popleft()
```

Inefficient ETA calculation on every update (category: Performance)

What is the issue?

The ETA calculation logic executes on every datetime update, performing deque operations and time calculations even when progress hasn't meaningfully changed.

Why this matters

This creates unnecessary computational overhead during backtesting, especially for high-frequency strategies where _update_datetime is called frequently. The performance impact compounds with longer backtests.

Suggested change:

Add a condition to only perform ETA calculations when sufficient time has elapsed or progress has changed meaningfully, similar to the existing logging throttle pattern:

```python
if (self._last_eta_time is None) or ((now_wall - self._last_eta_time).total_seconds() >= 1):
    self._last_eta_time = now_wall
    # Perform ETA calculations here
```
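Put together, the throttled pattern looks roughly like the following standalone sketch. The attribute names mirror the snippet above, but the class itself is illustrative, not the actual lumibot code:

```python
from collections import deque
from datetime import datetime, timedelta, timezone

class EtaTracker:
    """Sketch of a throttled ETA tracker: deque maintenance runs at most once per second."""

    def __init__(self, window_seconds=30):
        self._eta_history = deque()          # (timestamp, percent) samples
        self._eta_window_seconds = window_seconds
        self._last_eta_time = None
        self.eta_seconds = None

    def update(self, percent, now_wall=None):
        now_wall = now_wall or datetime.now(timezone.utc)
        # Throttle: skip the heavy work unless at least 1 second has elapsed
        if self._last_eta_time is not None and (now_wall - self._last_eta_time).total_seconds() < 1:
            return
        self._last_eta_time = now_wall
        self._eta_history.append((now_wall, percent))
        # Drop samples older than the rolling window
        while self._eta_history and (now_wall - self._eta_history[0][0]).total_seconds() > self._eta_window_seconds:
            self._eta_history.popleft()
        # Estimate ETA from the rate of progress across the retained window
        (t0, p0), (t1, p1) = self._eta_history[0], self._eta_history[-1]
        if p1 > p0:
            rate = (p1 - p0) / (t1 - t0).total_seconds()  # percent per second
            self.eta_seconds = (100 - p1) / rate
```

High-frequency calls now hit only the cheap timestamp comparison, while the ETA stays current to within a second.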

Comment on lines +60 to +62
```python
self._eta_window_seconds = 30
self._eta_calibration_seconds = 25
self._eta_calibration_progress = 4  # percent
```

Unexplained ETA Configuration Constants (category: Readability)

What is the issue?

Magic numbers are used for ETA calculation configuration without clear explanation of their significance or why these specific values were chosen.

Why this matters

Future maintainers won't understand why these specific values were chosen or how adjusting them might impact the ETA calculation behavior.

Suggested change:

Add class constants with descriptive names and comments explaining the rationale:

```python
# Window of time (in seconds) to keep ETA calculation history
ETA_WINDOW_SECONDS = 30
# Minimum elapsed seconds before starting ETA calibration
ETA_CALIBRATION_SECONDS = 25
# Minimum progress percentage before starting ETA calibration
ETA_CALIBRATION_PROGRESS_PERCENT = 4
```

Comment on lines +199 to +209
```python
if status_code == 429 or "rate limit" in message.lower():
    if cached_balances is not None:
        self.strategy.logger.warning(
            "Broker balance refresh hit rate limit; using cached values and continuing"
        )
        broker_balances = cached_balances
    else:
        self.strategy.logger.warning(
            "Broker balance refresh hit rate limit and no cached value is available; retrying"
        )
        broker_balances = None
```

Rate limit retry without backoff (category: Functionality)

What is the issue?

The rate limit handling logic sets broker_balances to None when no cached values are available, but this will trigger the existing retry mechanism that may not respect rate limits, potentially causing repeated rate limit violations.

Why this matters

When rate limits are hit and no cached values exist, the code falls back to the legacy retry path which will immediately retry the same operation that just failed due to rate limits. This can lead to aggressive retrying that violates rate limits further and may result in temporary bans or escalated rate limiting from the broker.

Suggested change:

Implement proper backoff when rate limits are hit and no cached values are available. Add a delay before retrying, or skip this iteration entirely:

```python
if status_code == 429 or "rate limit" in message.lower():
    if cached_balances is not None:
        self.strategy.logger.warning(
            "Broker balance refresh hit rate limit; using cached values and continuing"
        )
        broker_balances = cached_balances
    else:
        self.strategy.logger.warning(
            "Broker balance refresh hit rate limit and no cached value is available; skipping balance update this iteration"
        )
        # Skip the balance update this iteration to avoid an immediate retry
        break  # exit the while loop rather than hammering the rate-limited endpoint
```
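If a delay is preferred over skipping, exponential backoff with jitter is the usual approach. A standalone helper might look like this; the function name and default values are illustrative, not part of the PR:

```python
import random

def rate_limit_backoff(attempt, base_delay=2.0, max_delay=60.0, jitter=0.25):
    """Compute an exponential backoff delay (in seconds) for the given retry attempt.

    Attempt 0 waits ~base_delay, attempt 1 ~2x base_delay, and so on, capped at
    max_delay. A +/- jitter fraction of randomness is applied so that multiple
    clients hitting the same rate limit do not retry in lockstep.
    """
    delay = min(base_delay * (2 ** attempt), max_delay)
    return delay * (1 + random.uniform(-jitter, jitter))
```

The retry loop would then call `time.sleep(rate_limit_backoff(attempt))` before the next attempt instead of retrying immediately.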

```python
message = str(balance_exc)
cached_balances = getattr(self.broker, "_cached_balances", None)

if status_code == 429 or "rate limit" in message.lower():
```

Unexplained HTTP Status Code (category: Readability)

What is the issue?

Magic number 429 is used without explanation of its meaning as an HTTP status code for rate limiting.

Why this matters

Using unexplained HTTP status codes makes the code harder to understand for developers not familiar with all HTTP codes.

Suggested change:

```python
HTTP_TOO_MANY_REQUESTS = 429  # HTTP status code for rate limiting
if status_code == HTTP_TOO_MANY_REQUESTS or "rate limit" in message.lower():
```
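Alternatively, Python's standard library already provides a named constant for this code, so no hand-rolled constant is needed. A small predicate (the helper name here is illustrative) makes the intent self-documenting:

```python
from http import HTTPStatus

def is_rate_limited(status_code, message):
    """True when a broker response indicates rate limiting."""
    # HTTPStatus.TOO_MANY_REQUESTS is an IntEnum member equal to 429
    return status_code == HTTPStatus.TOO_MANY_REQUESTS or "rate limit" in message.lower()
```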

```python
    else:
        logger.warning(f"Unexpected contract format: {resolved_contract}, using as-is")
        return [resolved_contract]

def _ensure_polars_datetime_timezone(df: pl.DataFrame, column: str = "datetime", tz: str = "UTC") -> pl.DataFrame:
```

Incomplete timezone handling documentation (category: Documentation)

What is the issue?

The docstring lacks explanation of the function's behavior with different datetime types and timezone scenarios.

Why this matters

Users may not understand how the function handles naive vs aware datetimes, or different time units, leading to potential timezone-related bugs.

Suggested change:

```python
"""Ensure the specified datetime column is timezone-aware in the given timezone.

Handles various datetime scenarios:
- Naive datetimes: assigns the specified timezone
- Aware datetimes: converts to the specified timezone
- Different time units: normalizes to nanoseconds

Parameters
----------
df : pl.DataFrame
    Input DataFrame
column : str, default "datetime"
    Name of the datetime column to process
tz : str, default "UTC"
    Target timezone

Returns
-------
pl.DataFrame
    DataFrame with a normalized timezone-aware datetime column
"""
```


Comment on lines +12 to +13
```python
except Exception:  # pragma: no cover - optional dependency path
    DataBentoDataPolars = None
```

Silent import failure with generic exception handling (category: Error Handling)

What is the issue?

Generic catch-all exception handler without logging or contextual information about the import failure.

Why this matters

When the DataBentoDataPolars import fails, developers have no visibility into what went wrong, making debugging difficult and potentially masking important configuration or dependency issues.

Suggested change:

Add logging to capture the import failure details:

```python
except Exception as e:  # pragma: no cover - optional dependency path
    logger.debug(f"DataBentoDataPolars not available: {e}")
    DataBentoDataPolars = None
```
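A self-contained version of the pattern; note that `optional_polars_backend` below is a deliberate placeholder module name for illustration, not the real import path:

```python
import logging

logger = logging.getLogger(__name__)

try:
    # "optional_polars_backend" is a placeholder name; the import is expected
    # to fail here, exercising the optional-dependency fallback path
    from optional_polars_backend import DataBentoDataPolars
except Exception as e:  # pragma: no cover - optional dependency path
    logger.debug("DataBentoDataPolars not available: %s", e)
    DataBentoDataPolars = None
```

Logging at debug level keeps normal runs quiet while still leaving a trail when someone needs to diagnose why the optional backend is missing.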

```python
event_time = trade_entry.get("event_time", event_time)
age_ms = int((datetime.now(timezone.utc) - trade_entry["received_at"]).total_seconds() * 1000)

tick = 0.25 if price is not None else 0.25
```

Redundant Conditional Logic (category: Readability)

What is the issue?

The ternary expression is redundant since both conditions return the same value, making the code unnecessarily complex.

Why this matters

This adds cognitive overhead for no benefit and could confuse readers trying to understand the logic.

Suggested change:

Simplify to a constant:

```python
tick = 0.25  # Default tick size for futures
```

Comment on lines +103 to +104
```python
def _throttle_rest(self):
    """Ensure REST calls respect a soft per-minute cap."""
```

Incomplete rate limiter docstring (category: Documentation)

What is the issue?

The docstring lacks information about the rate limiting parameters and behavior.

Why this matters

Without understanding the rate limiting parameters, developers may not be able to properly configure or troubleshoot rate limiting issues.

Suggested change:

```python
def _throttle_rest(self):
    """Ensure REST calls respect a soft per-minute cap.

    Uses a rolling window of request timestamps to enforce the rate limit
    specified by self._rate_limit_per_minute. When the limit is reached,
    sleeps with a small jitter until a request slot becomes available.
    """
```

Comment on lines +133 to +134
```python
def _request(self, method: str, url: str, **kwargs):
    """Wrapper around requests.request with throttling."""
```

Unclear request wrapper docstring (category: Documentation)

What is the issue?

The docstring doesn't explain the return value or throttling behavior.

Why this matters

Developers need to understand what the method returns and how throttling affects the request flow.

Suggested change:

```python
def _request(self, method: str, url: str, **kwargs):
    """Wrapper around requests.request with throttling.

    Applies rate limiting via _throttle_rest before executing the request.
    Returns the raw requests.Response object.
    """
```

…ple files

- Fix line length issues in tradovate.py and thetadata_backtesting.py
- Fix unused loop variable in botspot_error_reporting_example.py
- Auto-fixed whitespace and formatting issues from ruff

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>