Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,27 @@ execution:

If the server does not respond within the timeout, the job fails instead of hanging indefinitely.


### NVCF Reliability Defaults

For NVCF-backed endpoints such as `https://integrate.api.nvidia.com`, the Local executor now applies safer defaults during config generation:

- If `target.api_endpoint.stream` is unset, chat/completions evaluations default to `stream: true`. This reduces gateway idle timeouts for long reasoning requests.
- If you explicitly set `target.api_endpoint.stream: false`, the launcher injects `target.api_endpoint.headers.NVCF-POLL-SECONDS: "1800"` unless you already set that header yourself.
- NVCF `504` responses and request timeouts are treated as non-retryable in the built-in BYOB and client paths to avoid duplicating work that may still be running behind the gateway.

You can still override these defaults explicitly in your config:

```yaml
target:
api_endpoint:
stream: false
headers:
NVCF-POLL-SECONDS: "1800"
```

For large reasoning models, keep `parallelism` conservative and set `request_timeout` high enough to cover long generations even when streaming is enabled.

## Rerunning Evaluations

The Local executor generates reusable scripts for rerunning evaluations:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import shlex
import sys
from dataclasses import dataclass
from typing import Optional
from typing import Any, Optional
from urllib.parse import urlparse

import yaml
from omegaconf import DictConfig, OmegaConf
Expand All @@ -29,6 +30,30 @@
from nemo_evaluator_launcher.common.logging_utils import logger

CONTAINER_RESULTS_DIR = "/results"
# Kept in sync with nemo_evaluator.common.nvcf; intentionally duplicated so the
# launcher does not bind to a specific core version beyond what pyproject pins.
DEFAULT_NVCF_POLL_SECONDS = "1800"
_NVCF_HOSTNAMES = frozenset({"integrate.api.nvidia.com", "api.nvcf.nvidia.com"})


def _is_nvcf_endpoint(url: str | None) -> bool:
if not url:
return False
hostname = urlparse(url).hostname or ""
return hostname in _NVCF_HOSTNAMES


def _apply_nvcf_defaults(merged_config: dict[str, Any]) -> None:
api_endpoint = merged_config.setdefault("target", {}).setdefault("api_endpoint", {})
if not _is_nvcf_endpoint(api_endpoint.get("url")):
return

if api_endpoint.get("stream") is None and api_endpoint.get("type") in {"chat", "completions"}:
api_endpoint["stream"] = True

if not api_endpoint.get("stream"):
headers = api_endpoint.setdefault("headers", {})
headers.setdefault("NVCF-POLL-SECONDS", DEFAULT_NVCF_POLL_SECONDS)


@dataclass(frozen=True)
Expand Down Expand Up @@ -222,6 +247,26 @@ def get_eval_factory_command(
["target", "api_endpoint", "type"],
task_definition["endpoint_type"],
)

api_endpoint_cfg = cfg.get("target", {}).get("api_endpoint", {})
stream = api_endpoint_cfg.get("stream", None)
if stream is not None:
_set_nested_optionally_overriding(
merged_nemo_evaluator_config,
["target", "api_endpoint", "stream"],
stream,
)
headers = api_endpoint_cfg.get("headers", None)
if headers is not None:
if OmegaConf.is_config(headers):
headers = OmegaConf.to_container(headers, resolve=True)
_set_nested_optionally_overriding(
merged_nemo_evaluator_config,
["target", "api_endpoint", "headers"],
headers,
)

_apply_nvcf_defaults(merged_nemo_evaluator_config)
# For unlisted tasks, use the full harness.task format
# For listed tasks, use just the task name (existing behavior)
if task_definition.get("is_unlisted"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,106 @@ def test_get_eval_factory_command_basic(monkeypatch):

# The command to run eval is present
assert "$cmd run_eval --run_config config_ef.yaml" in result.cmd


def test_get_eval_factory_command_applies_nvcf_stream_default(monkeypatch):
monkeypatch.setattr(
"nemo_evaluator_launcher.common.helpers.get_versions", lambda: "TEST_VER"
)

cfg = OmegaConf.create(
{
"evaluation": {"nemo_evaluator_config": {"config": {}}},
"deployment": {"type": "none"},
"target": {
"api_endpoint": {
"url": "https://integrate.api.nvidia.com/v1/chat/completions",
"model_id": "model-123",
"api_key_name": "MY_API_KEY",
}
},
}
)
user_task_config = OmegaConf.create({"nemo_evaluator_config": {"config": {}}})
task_definition = {"endpoint_type": "chat", "task": "my_task"}

result = get_eval_factory_command(cfg, user_task_config, task_definition)
b64 = _extract_b64_from_echo_cmd(result.cmd)
merged = yaml.safe_load(base64.b64decode(b64.encode("utf-8")).decode("utf-8"))

assert merged["target"]["api_endpoint"]["stream"] is True


def test_get_eval_factory_command_applies_nvcf_poll_header_when_stream_disabled(monkeypatch):
monkeypatch.setattr(
"nemo_evaluator_launcher.common.helpers.get_versions", lambda: "TEST_VER"
)

cfg = OmegaConf.create(
{
"evaluation": {
"nemo_evaluator_config": {
"target": {"api_endpoint": {"stream": False}},
"config": {},
}
},
"deployment": {"type": "none"},
"target": {
"api_endpoint": {
"url": "https://integrate.api.nvidia.com/v1/chat/completions",
"model_id": "model-123",
"api_key_name": "MY_API_KEY",
}
},
}
)
user_task_config = OmegaConf.create({"nemo_evaluator_config": {"config": {}}})
task_definition = {"endpoint_type": "chat", "task": "my_task"}

result = get_eval_factory_command(cfg, user_task_config, task_definition)
b64 = _extract_b64_from_echo_cmd(result.cmd)
merged = yaml.safe_load(base64.b64decode(b64.encode("utf-8")).decode("utf-8"))

assert merged["target"]["api_endpoint"]["stream"] is False
assert (
merged["target"]["api_endpoint"]["headers"]["NVCF-POLL-SECONDS"]
== "1800"
)


def test_get_eval_factory_command_preserves_top_level_endpoint_stream_and_headers(monkeypatch):
monkeypatch.setattr(
"nemo_evaluator_launcher.common.helpers.get_versions", lambda: "TEST_VER"
)

cfg = OmegaConf.create(
{
"evaluation": {"nemo_evaluator_config": {"config": {}}},
"deployment": {"type": "none"},
"target": {
"api_endpoint": {
"url": "https://integrate.api.nvidia.com/v1/chat/completions",
"model_id": "model-123",
"api_key_name": "MY_API_KEY",
"stream": True,
"headers": {
"NVCF-POLL-SECONDS": "3600",
"X-Test-Header": "present",
},
}
},
}
)
user_task_config = OmegaConf.create({"nemo_evaluator_config": {"config": {}}})
task_definition = {"endpoint_type": "chat", "task": "my_task"}

result = get_eval_factory_command(cfg, user_task_config, task_definition)
b64 = _extract_b64_from_echo_cmd(result.cmd)
merged = yaml.safe_load(base64.b64decode(b64.encode("utf-8")).decode("utf-8"))

assert merged["target"]["api_endpoint"]["stream"] is True
assert merged["target"]["api_endpoint"]["headers"] == {
"NVCF-POLL-SECONDS": "3600",
"X-Test-Header": "present",
}

Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,14 @@ def intercept_request(
start_time = time.time()

# This is a final interceptor, we'll need the flask_request and api
headers = {k: v for k, v in ar.r.headers if k.lower() != "host"}
for key, value in (context.request_headers or {}).items():
headers.setdefault(key, value)

raw_response = requests.request(
method=ar.r.method,
url=context.url,
headers={k: v for k, v in ar.r.headers if k.lower() != "host"},
headers=headers,
json=ar.r.json,
cookies=ar.r.cookies,
allow_redirects=False,
Expand Down
14 changes: 13 additions & 1 deletion packages/nemo-evaluator/src/nemo_evaluator/adapters/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def _run_adapter_server(
adapter_config: AdapterConfig,
port: int,
model_name: str | None = None,
request_headers: dict[str, str] | None = None,
) -> None:
"""Internal function to run the adapter server."""
_increase_file_descriptor_limit()
Expand All @@ -142,6 +143,7 @@ def _run_adapter_server(
adapter_config=adapter_config,
port=port,
model_name=model_name,
request_headers=request_headers,
)

def signal_handler(signum, frame):
Expand Down Expand Up @@ -189,6 +191,7 @@ def __init__(
adapter_config: AdapterConfig,
port: int = DEFAULT_ADAPTER_PORT,
model_name: str | None = None,
request_headers: dict[str, str] | None = None,
):
"""
Initialize the adapter server.
Expand Down Expand Up @@ -217,6 +220,7 @@ def __init__(
self.output_dir = output_dir
self.adapter_config = adapter_config
self.model_name = model_name
self.request_headers = request_headers

# Initialize the shared adapter pipeline
self.pipeline = AdapterPipeline(adapter_config, output_dir, model_name)
Expand Down Expand Up @@ -337,6 +341,7 @@ def _handler(self, path: str) -> flask.Response:
output_dir=self.output_dir,
url=self.api_url,
model_name=self.model_name,
request_headers=self.request_headers,
)

# Create adapter request
Expand Down Expand Up @@ -542,7 +547,14 @@ def __enter__(self):
self.process = multiprocessing.get_context("spawn").Process(
target=_run_adapter_server,
daemon=True,
args=(self.original_url, output_dir, adapter_config, self.port, model_name),
kwargs={
"api_url": self.original_url,
"output_dir": output_dir,
"adapter_config": adapter_config,
"port": self.port,
"model_name": model_name,
"request_headers": self.evaluation.target.api_endpoint.headers,
},
)
self.process.start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class AdapterGlobalContext:
output_dir: str # Directory for output files
url: str # The upstream API URL to forward requests to
model_name: str | None = None # Model name for logging context
request_headers: dict[str, str] | None = None # Extra headers to inject upstream

@property
def metrics_path(self) -> Path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class ApiEndpoint(BaseModel):
stream: Optional[bool] = Field(
description="Whether responses should be streamed", default=None
)
headers: Optional[Dict[str, str]] = Field(
description="Extra HTTP headers to send to the endpoint", default=None
)
type: Optional[EndpointType] = Field(
description="The type of the target", default=None
)
Expand All @@ -94,6 +97,9 @@ class EndpointModelConfig(BaseModel):
stream: Optional[bool] = Field(
description="Whether responses should be streamed", default=None
)
headers: Optional[Dict[str, str]] = Field(
description="Extra HTTP headers to send to the endpoint", default=None
)
type: Optional[EndpointType] = Field(
description="The type of the target", default=None
)
Expand Down
Loading
Loading