diff --git a/src/aiperf/__main__.py b/src/aiperf/__main__.py index 66b12ccf7..cc9e9d9c3 100644 --- a/src/aiperf/__main__.py +++ b/src/aiperf/__main__.py @@ -4,7 +4,7 @@ import sys from aiperf.cli import app -from aiperf.gpu_telemetry.constants import DEFAULT_DCGM_ENDPOINTS +from aiperf.common.environment import Environment def main() -> int: @@ -15,7 +15,7 @@ def main() -> int: if "--gpu-telemetry" in sys.argv: idx = sys.argv.index("--gpu-telemetry") if idx >= len(sys.argv) - 1 or sys.argv[idx + 1].startswith("-"): - for endpoint in reversed(DEFAULT_DCGM_ENDPOINTS): + for endpoint in reversed(Environment.GPU.DEFAULT_DCGM_ENDPOINTS): sys.argv.insert(idx + 1, endpoint) return app(sys.argv[1:]) diff --git a/src/aiperf/cli_utils.py b/src/aiperf/cli_utils.py index 8569c37a8..9768956d5 100644 --- a/src/aiperf/cli_utils.py +++ b/src/aiperf/cli_utils.py @@ -54,6 +54,7 @@ def raise_startup_error_and_exit( border_style=border_style, ) ) + console.file.flush() sys.exit(exit_code) @@ -115,3 +116,23 @@ def __exit__(self, exc_type, exc_value, traceback): title=self.title, exit_code=self.exit_code, ) + + +def print_developer_mode_warning() -> None: + """Print a warning message to the console if developer mode is enabled.""" + from rich.console import Console + from rich.panel import Panel + from rich.text import Text + + console = Console() + panel = Panel( + Text( + "Developer Mode is active. This is a developer-only feature. Use at your own risk.", + style="yellow", + ), + title="AIPerf Developer Mode", + border_style="bold yellow", + title_align="left", + ) + console.print(panel) + console.file.flush() diff --git a/src/aiperf/common/base_component_service.py b/src/aiperf/common/base_component_service.py index 9c41ae6d8..fe5383f21 100644 --- a/src/aiperf/common/base_component_service.py +++ b/src/aiperf/common/base_component_service.py @@ -6,13 +6,9 @@ from aiperf.common.base_service import BaseService from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import ( - DEFAULT_HEARTBEAT_INTERVAL, - DEFAULT_MAX_REGISTRATION_ATTEMPTS, - DEFAULT_REGISTRATION_INTERVAL, -) from aiperf.common.decorators import implements_protocol from aiperf.common.enums import CommandType, LifecycleState, ServiceType +from aiperf.common.environment import Environment from aiperf.common.hooks import ( background_task, on_command, @@ -57,7 +53,7 @@ def __init__( **kwargs, ) - @background_task(interval=DEFAULT_HEARTBEAT_INTERVAL, immediate=False) + @background_task(interval=Environment.SERVICE.HEARTBEAT_INTERVAL, immediate=False) async def _heartbeat_task(self) -> None: """Send a heartbeat notification to the system controller.""" await self.publish( @@ -83,12 +79,12 @@ async def _register_service_on_start(self) -> None: target_service_type=ServiceType.SYSTEM_CONTROLLER, state=self.state, ) - for _ in range(DEFAULT_MAX_REGISTRATION_ATTEMPTS): + for _ in range(Environment.SERVICE.REGISTRATION_MAX_ATTEMPTS): result = await self.send_command_and_wait_for_response( # NOTE: We keep the command id the same each time to ensure that the system controller # can ignore duplicate registration requests. 
command_message, - timeout=DEFAULT_REGISTRATION_INTERVAL, + timeout=Environment.SERVICE.REGISTRATION_INTERVAL, ) if isinstance(result, CommandResponse): self.debug( diff --git a/src/aiperf/common/bootstrap.py b/src/aiperf/common/bootstrap.py index 9faf96a7e..eb4f53d03 100644 --- a/src/aiperf/common/bootstrap.py +++ b/src/aiperf/common/bootstrap.py @@ -10,6 +10,7 @@ import sys from aiperf.common.config import ServiceConfig, UserConfig +from aiperf.common.environment import Environment from aiperf.common.protocols import ServiceProtocol @@ -52,7 +53,7 @@ def bootstrap_and_run_service( user_config = load_user_config() async def _run_service(): - if service_config.developer.enable_yappi: + if Environment.DEV.ENABLE_YAPPI: _start_yappi_profiling() from aiperf.module_loader import ensure_modules_loaded @@ -114,11 +115,11 @@ async def _run_service(): except Exception as e: service.exception(f"Unhandled exception in service: {e}") - if service_config.developer.enable_yappi: + if Environment.DEV.ENABLE_YAPPI: _stop_yappi_profiling(service.service_id, user_config) with contextlib.suppress(asyncio.CancelledError): - if not service_config.developer.disable_uvloop: + if not Environment.SERVICE.DISABLE_UVLOOP: import uvloop uvloop.run(_run_service()) diff --git a/src/aiperf/common/config/__init__.py b/src/aiperf/common/config/__init__.py index 023c50944..6fdaf17eb 100644 --- a/src/aiperf/common/config/__init__.py +++ b/src/aiperf/common/config/__init__.py @@ -18,14 +18,12 @@ ) from aiperf.common.config.cli_parameter import ( CLIParameter, - DeveloperOnlyCLI, DisableCLI, ) from aiperf.common.config.config_defaults import ( AudioDefaults, CLIDefaults, ConversationDefaults, - DevDefaults, EndpointDefaults, ImageDefaults, InputDefaults, @@ -58,10 +56,6 @@ TurnConfig, TurnDelayConfig, ) -from aiperf.common.config.dev_config import ( - DeveloperConfig, - print_developer_mode_warning, -) from aiperf.common.config.endpoint_config import ( EndpointConfig, ) @@ -128,9 +122,6 @@ "CLIParameter", "ConversationConfig", "ConversationDefaults", - "DevDefaults", - "DeveloperConfig", - "DeveloperOnlyCLI", "DisableCLI", "EndpointConfig", "EndpointDefaults", @@ -180,6 +171,5 @@ "parse_str_or_dict_as_tuple_list", "parse_str_or_list", "parse_str_or_list_of_positive_values", - "print_developer_mode_warning", "print_str_or_list", ] diff --git a/src/aiperf/common/config/cli_parameter.py b/src/aiperf/common/config/cli_parameter.py index ac6bb3625..ecfbf5b3e 100644 --- a/src/aiperf/common/config/cli_parameter.py +++ b/src/aiperf/common/config/cli_parameter.py @@ -3,9 +3,6 @@ from cyclopts import Parameter -from aiperf.common.config.groups import Groups -from aiperf.common.constants import AIPERF_DEV_MODE - class CLIParameter(Parameter): """Configuration for a CLI parameter. @@ -27,13 +24,3 @@ class DisableCLI(CLIParameter): def __init__(self, reason: str = "Not supported via command line", *args, **kwargs): super().__init__(*args, parse=False, **kwargs) - - -class DeveloperOnlyCLI(CLIParameter): - """Configuration for a CLI parameter that is only available to developers. - - This is a subclass of the CLIParameter class that is used to set a CLI parameter to only be available to developers. 
- """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, parse=AIPERF_DEV_MODE, group=Groups.DEVELOPER, **kwargs) diff --git a/src/aiperf/common/config/config_defaults.py b/src/aiperf/common/config/config_defaults.py index 73ebd255b..5a31658e8 100644 --- a/src/aiperf/common/config/config_defaults.py +++ b/src/aiperf/common/config/config_defaults.py @@ -4,7 +4,6 @@ from dataclasses import dataclass from pathlib import Path -from aiperf.common.constants import AIPERF_DEV_MODE from aiperf.common.enums import ( AIPerfLogLevel, AIPerfUIType, @@ -185,19 +184,3 @@ class LoadGeneratorDefaults: class WorkersDefaults: MIN = None MAX = None - - -@dataclass(frozen=True) -class DevDefaults: - if AIPERF_DEV_MODE: - ENABLE_YAPPI = False - DEBUG_SERVICES = None - TRACE_SERVICES = None - SHOW_INTERNAL_METRICS = True - DISABLE_UVLOOP = False - else: - ENABLE_YAPPI = False - DEBUG_SERVICES = None - TRACE_SERVICES = None - SHOW_INTERNAL_METRICS = False - DISABLE_UVLOOP = False diff --git a/src/aiperf/common/config/config_validators.py b/src/aiperf/common/config/config_validators.py index 53ccace10..e441077aa 100644 --- a/src/aiperf/common/config/config_validators.py +++ b/src/aiperf/common/config/config_validators.py @@ -7,7 +7,7 @@ import orjson -from aiperf.common.enums import ServiceType +from aiperf.common.enums.service_enums import ServiceType from aiperf.common.utils import load_json_str """ diff --git a/src/aiperf/common/config/dev_config.py b/src/aiperf/common/config/dev_config.py deleted file mode 100644 index 4e1d149f7..000000000 --- a/src/aiperf/common/config/dev_config.py +++ /dev/null @@ -1,104 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - - -from typing import Annotated - -from pydantic import BeforeValidator, Field - -from aiperf.common.config.base_config import BaseConfig -from aiperf.common.config.cli_parameter import DeveloperOnlyCLI -from aiperf.common.config.config_defaults import DevDefaults -from aiperf.common.config.config_validators import parse_service_types -from aiperf.common.constants import AIPERF_DEV_MODE -from aiperf.common.enums.service_enums import ServiceType - - -def print_developer_mode_warning() -> None: - """Print a warning message to the console if developer mode is enabled.""" - from rich.console import Console - from rich.panel import Panel - from rich.text import Text - - console = Console() - panel = Panel( - Text( - "Developer Mode is active. This is a developer-only feature. Use at your own risk.", - style="yellow", - ), - title="AIPerf Developer Mode", - border_style="bold yellow", - title_align="left", - ) - console.print(panel) - console.file.flush() - - -if AIPERF_DEV_MODE: - # Print a warning message to the console if developer mode is enabled, once at load time - print_developer_mode_warning() - - -class DeveloperConfig(BaseConfig): - """ - A configuration class for defining developer related settings. - - NOTE: These settings are only available in developer mode. - """ - - enable_yappi: Annotated[ - bool, - Field( - description="*[Developer use only]* Enable yappi profiling (Yet Another Python Profiler) to profile AIPerf's internal python code. " - "This can be used in the development of AIPerf in order to find performance bottlenecks across the various services. " - "The output '.prof' files can be viewed with snakeviz. Requires yappi and snakeviz to be installed. 
" - "Run 'pip install yappi snakeviz' to install them.", - ), - DeveloperOnlyCLI( - name=("--enable-yappi-profiling"), - ), - ] = DevDefaults.ENABLE_YAPPI - - debug_services: Annotated[ - set[ServiceType] | None, - Field( - description="*[Developer use only]* List of services to enable debug logging for. Can be a comma-separated list, a single service type, " - "or the cli flag can be used multiple times.", - ), - DeveloperOnlyCLI( - name=("--debug-service", "--debug-services"), - ), - BeforeValidator(parse_service_types), - ] = DevDefaults.DEBUG_SERVICES - - trace_services: Annotated[ - set[ServiceType] | None, - Field( - description="*[Developer use only]* List of services to enable trace logging for. Can be a comma-separated list, a single service type, " - "or the cli flag can be used multiple times.", - ), - DeveloperOnlyCLI( - name=("--trace-service", "--trace-services"), - ), - BeforeValidator(parse_service_types), - ] = DevDefaults.TRACE_SERVICES - - show_internal_metrics: Annotated[ - bool, - Field( - description="*[Developer use only]* Whether to show internal and hidden metrics in the output", - ), - DeveloperOnlyCLI( - name=("--show-internal-metrics"), - ), - ] = DevDefaults.SHOW_INTERNAL_METRICS - - disable_uvloop: Annotated[ - bool, - Field( - description="*[Developer use only]* Disable the use of uvloop, and use the default asyncio event loop instead.", - ), - DeveloperOnlyCLI( - name=("--disable-uvloop"), - ), - ] = DevDefaults.DISABLE_UVLOOP diff --git a/src/aiperf/common/config/service_config.py b/src/aiperf/common/config/service_config.py index a74a37082..ed694bf01 100644 --- a/src/aiperf/common/config/service_config.py +++ b/src/aiperf/common/config/service_config.py @@ -3,14 +3,12 @@ from typing import Annotated from pydantic import Field, model_validator -from pydantic_settings import BaseSettings, SettingsConfigDict from typing_extensions import Self from aiperf.common.aiperf_logger import AIPerfLogger -from aiperf.common.config.base_config import ADD_TO_TEMPLATE +from aiperf.common.config.base_config import ADD_TO_TEMPLATE, BaseConfig from aiperf.common.config.cli_parameter import CLIParameter, DisableCLI from aiperf.common.config.config_defaults import ServiceDefaults -from aiperf.common.config.dev_config import DeveloperConfig from aiperf.common.config.groups import Groups from aiperf.common.config.worker_config import WorkersConfig from aiperf.common.config.zmq_config import ( @@ -27,16 +25,9 @@ _logger = AIPerfLogger(__name__) -class ServiceConfig(BaseSettings): +class ServiceConfig(BaseConfig): """Base configuration for all services. 
It will be provided to all services during their __init__ function.""" - model_config = SettingsConfigDict( - env_prefix="AIPERF_", - env_file=".env", - env_file_encoding="utf-8", - extra="allow", - ) - _CLI_GROUP = Groups.SERVICE _comm_config: BaseZMQCommunicationConfig | None = None @@ -159,8 +150,6 @@ def validate_comm_config(self) -> Self: ), ] = ServiceDefaults.UI_TYPE - developer: DeveloperConfig = DeveloperConfig() - @property def comm_config(self) -> BaseZMQCommunicationConfig: """Get the communication configuration.""" diff --git a/src/aiperf/common/config/worker_config.py b/src/aiperf/common/config/worker_config.py index 22ab12410..72f0910bc 100644 --- a/src/aiperf/common/config/worker_config.py +++ b/src/aiperf/common/config/worker_config.py @@ -8,7 +8,7 @@ from aiperf.common.config.cli_parameter import CLIParameter, DisableCLI from aiperf.common.config.config_defaults import WorkersDefaults from aiperf.common.config.groups import Groups -from aiperf.common.constants import DEFAULT_MAX_WORKERS_CAP +from aiperf.common.environment import Environment class WorkersConfig(BaseConfig): @@ -29,7 +29,7 @@ class WorkersConfig(BaseConfig): Field( description="Maximum number of workers to create. If not specified, the number of" " workers will be determined by the formula `min(concurrency, (num CPUs * 0.75) - 1)`, " - f" with a default max cap of `{DEFAULT_MAX_WORKERS_CAP}`. Any value provided will still be capped by" + f" with a default max cap of `{Environment.WORKER.MAX_WORKERS_CAP}`. Any value provided will still be capped by" f" the concurrency value (if specified), but not by the max cap.", ), CLIParameter( diff --git a/src/aiperf/common/constants.py b/src/aiperf/common/constants.py index 8c377c55e..9fbc530fc 100644 --- a/src/aiperf/common/constants.py +++ b/src/aiperf/common/constants.py @@ -1,8 +1,6 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -import os - NANOS_PER_SECOND = 1_000_000_000 NANOS_PER_MILLIS = 1_000_000 MILLIS_PER_SECOND = 1000 @@ -24,112 +22,5 @@ "std", ] - -GRACEFUL_SHUTDOWN_TIMEOUT = 5.0 -"""Default timeout for shutting down services in seconds.""" - -DEFAULT_SHUTDOWN_ACK_TIMEOUT = 5.0 -"""Default timeout for waiting for a shutdown command response in seconds.""" - -DEFAULT_PROFILE_CANCEL_TIMEOUT = 10.0 -"""Default timeout for cancelling a profile run in seconds.""" - -TASK_CANCEL_TIMEOUT_SHORT = 2.0 -"""Maximum time to wait for simple tasks to complete when cancelling them.""" - -DEFAULT_COMMS_REQUEST_TIMEOUT = 90.0 -"""Default timeout for requests from req_clients to rep_clients in seconds.""" - -DEFAULT_PULL_CLIENT_MAX_CONCURRENCY = 100_000 -"""Default maximum concurrency for pull clients.""" - -DEFAULT_SERVICE_REGISTRATION_TIMEOUT = 30.0 -"""Default timeout for service registration in seconds.""" - -DEFAULT_SERVICE_START_TIMEOUT = 30.0 -"""Default timeout for service start in seconds.""" - -DEFAULT_COMMAND_RESPONSE_TIMEOUT = 30.0 -"""Default timeout for command responses in seconds.""" - -DEFAULT_CONNECTION_PROBE_INTERVAL = 0.1 -"""Default interval for connection probes in seconds until a response is received.""" - -DEFAULT_CONNECTION_PROBE_TIMEOUT = 30.0 -"""Maximum amount of time to wait for connection probe response.""" - -DEFAULT_PROFILE_CONFIGURE_TIMEOUT = 300.0 -"""Default timeout for profile configure command in seconds.""" - -DEFAULT_PROFILE_START_TIMEOUT = 60.0 -"""Default timeout for profile start command in seconds.""" - -DEFAULT_MAX_REGISTRATION_ATTEMPTS = 10 -"""Default maximum number of registration attempts for component services before giving up.""" - -DEFAULT_REGISTRATION_INTERVAL = 1.0 -"""Default interval between registration attempts in seconds for component services.""" - -DEFAULT_HEARTBEAT_INTERVAL = 5.0 -"""Default interval between heartbeat messages in seconds for component services.""" - -AIPERF_DEV_MODE = os.getenv("AIPERF_DEV_MODE", "false").lower() in ("true", "1") - -DEFAULT_UI_MIN_UPDATE_PERCENT = 1.0 -"""Default minimum percentage difference from the last update to trigger a UI update (for non-dashboard UIs).""" - -DEFAULT_WORKER_CHECK_INTERVAL = 1.0 -"""Default interval between worker checks in seconds for the WorkerManager.""" - -DEFAULT_WORKER_HIGH_LOAD_CPU_USAGE = 85.0 -"""Default CPU usage threshold for a worker to be considered high load.""" - -DEFAULT_WORKER_HIGH_LOAD_RECOVERY_TIME = 5.0 -"""Default time in seconds from the last time a worker was in high load before it is considered healthy again.""" - -DEFAULT_WORKER_ERROR_RECOVERY_TIME = 3.0 -"""Default time in seconds from the last time a worker had an error before it is considered healthy again.""" - -DEFAULT_WORKER_STALE_TIME = 10.0 -"""Default time in seconds from the last time a worker reported any status before it is considered stale.""" - -DEFAULT_WORKER_STATUS_SUMMARY_INTERVAL = 0.5 -"""Default interval in seconds between worker status summary messages.""" - -DEFAULT_REALTIME_METRICS_INTERVAL = 5.0 -"""Default interval in seconds between real-time metrics messages.""" - -DEFAULT_CREDIT_PROGRESS_REPORT_INTERVAL = 2.0 -"""Default interval in seconds between credit progress report messages.""" - -DEFAULT_RECORDS_PROGRESS_REPORT_INTERVAL = 2.0 -"""Default interval in seconds between records progress report messages.""" - -DEFAULT_WORKER_HEALTH_CHECK_INTERVAL = 2.0 -"""Default interval in seconds between worker health check messages.""" - -DEFAULT_RECORD_PROCESSOR_SCALE_FACTOR = 
4 -"""Default scale factor for the number of record processors to spawn based on the number of workers. -This will spawn 1 record processor for every X workers.""" - -DEFAULT_MAX_WORKERS_CAP = 32 -"""Default absolute maximum number of workers to spawn, regardless of the number of CPU cores. -Only applies if the user does not specify a max workers value.""" - -DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT = 10.0 -"""Default timeout for terminating the ZMQ context in seconds.""" - -AIPERF_HTTP_CONNECTION_LIMIT = int(os.environ.get("AIPERF_HTTP_CONNECTION_LIMIT", 2500)) -"""Maximum number of concurrent connections for HTTP clients.""" - GOOD_REQUEST_COUNT_TAG = "good_request_count" """GoodRequestCount metric tag""" - -DEFAULT_RECORD_EXPORT_BATCH_SIZE = 100 -"""Default batch size for record export results processor.""" - -DEFAULT_RAW_RECORD_EXPORT_BATCH_SIZE = 10 -"""Default batch size for raw record export results processor.""" - -DEFAULT_PUBLIC_DATASET_TIMEOUT = 300.0 -"""Default timeout for public dataset loader in seconds.""" diff --git a/src/aiperf/common/environment.py b/src/aiperf/common/environment.py new file mode 100644 index 000000000..cdd3f3172 --- /dev/null +++ b/src/aiperf/common/environment.py @@ -0,0 +1,671 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +""" +Environment Configuration Module + +Provides a hierarchical, type-safe configuration system using Pydantic BaseSettings. +All settings can be configured via environment variables with the AIPERF_ prefix. + +Structure: + Environment.DATASET.* - Dataset management + Environment.DEV.* - Development and debugging settings + Environment.GPU.* - GPU telemetry collection + Environment.HTTP.* - HTTP client socket and connection settings + Environment.LOGGING.* - Logging configuration + Environment.METRICS.* - Metrics collection and storage + Environment.RECORD.* - Record processing + Environment.SERVICE.* - Service lifecycle and communication + Environment.UI.* - User interface settings + Environment.WORKER.* - Worker management and scaling + Environment.ZMQ.* - ZMQ communication settings + +Examples: + # Via environment variables: + AIPERF_HTTP_SO_RCVBUF=20971520 + AIPERF_WORKER_CPU_UTILIZATION_FACTOR=0.8 + + # In code: + print(f"Buffer: {Environment.HTTP.SO_RCVBUF}") + print(f"Workers: {Environment.WORKER.CPU_UTILIZATION_FACTOR}") +""" + +from typing import Annotated + +from pydantic import BeforeValidator, Field, model_validator +from pydantic_settings import BaseSettings, SettingsConfigDict +from typing_extensions import Self + +from aiperf.common.aiperf_logger import AIPerfLogger +from aiperf.common.config.config_validators import ( + parse_service_types, + parse_str_or_csv_list, +) +from aiperf.common.enums.service_enums import ServiceType + +_logger = AIPerfLogger(__name__) + +__all__ = ["Environment"] + + +class _DatasetSettings(BaseSettings): + """Dataset loading and configuration. + + Controls timeouts and behavior for dataset loading operations. + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_DATASET_", + ) + + CONFIGURATION_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=300.0, + description="Timeout in seconds for dataset configuration operations", + ) + PUBLIC_DATASET_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=300.0, + description="Timeout in seconds for public dataset loading operations", + ) + + +class _DeveloperSettings(BaseSettings): + """Development and debugging configuration. 
+ + Controls developer-focused features like debug logging, profiling, and internal metrics. + These settings are typically disabled in production environments. + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_DEV_", + ) + + DEBUG_SERVICES: Annotated[ + set[ServiceType] | None, + BeforeValidator(parse_service_types), + ] = Field( + default=None, + description="List of services to enable DEBUG logging for (comma-separated or multiple flags)", + ) + ENABLE_YAPPI: bool = Field( + default=False, + description="Enable yappi profiling (Yet Another Python Profiler) for performance analysis. " + "Requires 'pip install yappi snakeviz'", + ) + MODE: bool = Field( + default=False, + description="Enable AIPerf Developer mode for internal metrics and debugging", + ) + SHOW_EXPERIMENTAL_METRICS: bool = Field( + default=False, + description="[Developer use only] Show experimental metrics in output (requires DEV_MODE)", + ) + SHOW_INTERNAL_METRICS: bool = Field( + default=False, + description="[Developer use only] Show internal and hidden metrics in output (requires DEV_MODE)", + ) + TRACE_SERVICES: Annotated[ + set[ServiceType] | None, + BeforeValidator(parse_service_types), + ] = Field( + default=None, + description="List of services to enable TRACE logging for (comma-separated or multiple flags)", + ) + + +class _GPUSettings(BaseSettings): + """GPU telemetry collection configuration. + + Controls GPU metrics collection frequency, endpoint detection, and shutdown behavior. + Metrics are collected from DCGM endpoints at the specified interval. + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_GPU_", + env_parse_enums=True, + ) + + COLLECTION_INTERVAL: float = Field( + ge=0.01, + le=300.0, + default=0.33, + description="GPU telemetry metrics collection interval in seconds (default: 330ms, ~3Hz)", + ) + DEFAULT_DCGM_ENDPOINTS: Annotated[ + str | list[str], + BeforeValidator(parse_str_or_csv_list), + ] = Field( + default=["http://localhost:9400/metrics", "http://localhost:9401/metrics"], + description="Default DCGM endpoint URLs to check for GPU telemetry (comma-separated string or JSON array)", + ) + REACHABILITY_TIMEOUT: int = Field( + ge=1, + le=300, + default=5, + description="Timeout in seconds for checking GPU telemetry endpoint reachability during init", + ) + SHUTDOWN_DELAY: float = Field( + ge=1.0, + le=300.0, + default=5.0, + description="Delay in seconds before shutting down GPU telemetry service to allow command response transmission", + ) + THREAD_JOIN_TIMEOUT: float = Field( + ge=1.0, + le=300.0, + default=5.0, + description="Timeout in seconds for joining GPU telemetry collection threads during shutdown", + ) + + +class _HTTPSettings(BaseSettings): + """HTTP client socket and connection configuration. + + Controls low-level socket options, keepalive settings, DNS caching, and connection + pooling for HTTP clients. These settings optimize performance for high-throughput + streaming workloads. 
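+
+    Example:
+        AIPERF_HTTP_SO_RCVBUF=20971520  # 20 MB receive buffer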
+ """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_HTTP_", + ) + + CONNECTION_LIMIT: int = Field( + ge=1, + le=65000, + default=2500, + description="Maximum number of concurrent HTTP connections", + ) + KEEPALIVE_TIMEOUT: int = Field( + ge=1, + le=10000, + default=300, + description="HTTP connection keepalive timeout in seconds for connection pooling", + ) + SO_RCVBUF: int = Field( + ge=1024, + default=10485760, # 10MB + description="Socket receive buffer size in bytes (default: 10MB for high-throughput streaming)", + ) + SO_RCVTIMEO: int = Field( + ge=1, + le=100000, + default=30, + description="Socket receive timeout in seconds", + ) + SO_SNDBUF: int = Field( + ge=1024, + default=10485760, # 10MB + description="Socket send buffer size in bytes (default: 10MB for high-throughput streaming)", + ) + SO_SNDTIMEO: int = Field( + ge=1, + le=100000, + default=30, + description="Socket send timeout in seconds", + ) + TCP_KEEPCNT: int = Field( + ge=1, + le=100, + default=1, + description="Maximum number of keepalive probes to send before considering the connection dead", + ) + TCP_KEEPIDLE: int = Field( + ge=1, + le=100000, + default=60, + description="Time in seconds before starting TCP keepalive probes on idle connections", + ) + TCP_KEEPINTVL: int = Field( + ge=1, + le=100000, + default=30, + description="Interval in seconds between TCP keepalive probes", + ) + TCP_USER_TIMEOUT: int = Field( + ge=1, + le=1000000, + default=30000, + description="TCP user timeout in milliseconds (Linux-specific, detects dead connections)", + ) + TTL_DNS_CACHE: int = Field( + ge=1, + le=1000000, + default=300, + description="DNS cache TTL in seconds for aiohttp client sessions", + ) + + +class _LoggingSettings(BaseSettings): + """Logging system configuration. + + Controls multiprocessing log queue size and other logging behavior. + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_LOGGING_", + ) + + QUEUE_MAXSIZE: int = Field( + ge=1, + le=1000000, + default=1000, + description="Maximum size of the multiprocessing logging queue", + ) + + +class _MetricsSettings(BaseSettings): + """Metrics collection and storage configuration. + + Controls metrics storage allocation and collection behavior. + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_METRICS_", + ) + + ARRAY_INITIAL_CAPACITY: int = Field( + ge=100, + le=1000000, + default=10000, + description="Initial array capacity for metric storage dictionaries to minimize reallocation", + ) + + +class _RecordSettings(BaseSettings): + """Record processing and export configuration. + + Controls batch sizes, processor scaling, and progress reporting for record processing. + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_RECORD_", + ) + + EXPORT_BATCH_SIZE: int = Field( + ge=1, + le=1000000, + default=100, + description="Batch size for record export results processor", + ) + RAW_EXPORT_BATCH_SIZE: int = Field( + ge=1, + le=1000000, + default=10, + description="Batch size for raw record writer processor", + ) + PROCESSOR_SCALE_FACTOR: int = Field( + ge=1, + le=100, + default=4, + description="Scale factor for number of record processors to spawn based on worker count. " + "Formula: 1 record processor for every X workers", + ) + PROGRESS_REPORT_INTERVAL: float = Field( + ge=0.1, + le=600.0, + default=2.0, + description="Interval in seconds between records progress report messages", + ) + + +class _ServiceSettings(BaseSettings): + """Service lifecycle and inter-service communication configuration. 
+ + Controls timeouts for service registration, startup, shutdown, command handling, + connection probing, heartbeats, and profile operations. + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_SERVICE_", + ) + + COMMAND_RESPONSE_TIMEOUT: float = Field( + ge=1.0, + le=1000.0, + default=30.0, + description="Timeout in seconds for command responses", + ) + COMMS_REQUEST_TIMEOUT: float = Field( + ge=1.0, + le=1000.0, + default=90.0, + description="Timeout in seconds for requests from req_clients to rep_clients", + ) + CONNECTION_PROBE_INTERVAL: float = Field( + ge=0.1, + le=600.0, + default=0.1, + description="Interval in seconds for connection probes while waiting for initial connection to the zmq message bus", + ) + CONNECTION_PROBE_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=90.0, + description="Maximum time in seconds to wait for connection probe response while waiting for initial connection to the zmq message bus", + ) + CREDIT_PROGRESS_REPORT_INTERVAL: float = Field( + ge=1, + le=100000.0, + default=2.0, + description="Interval in seconds between credit progress report messages", + ) + DISABLE_UVLOOP: bool = Field( + default=False, + description="Disable uvloop and use default asyncio event loop instead", + ) + HEARTBEAT_INTERVAL: float = Field( + ge=1.0, + le=100000.0, + default=5.0, + description="Interval in seconds between heartbeat messages for component services", + ) + PROFILE_CONFIGURE_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=300.0, + description="Timeout in seconds for profile configure command", + ) + PROFILE_START_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=60.0, + description="Timeout in seconds for profile start command", + ) + REGISTRATION_INTERVAL: float = Field( + ge=1.0, + le=100000.0, + default=1.0, + description="Interval in seconds between registration attempts for component services", + ) + REGISTRATION_MAX_ATTEMPTS: int = Field( + ge=1, + le=100000, + default=10, + description="Maximum number of registration attempts before giving up", + ) + REGISTRATION_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=30.0, + description="Timeout in seconds for service registration", + ) + START_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=30.0, + description="Timeout in seconds for service start operations", + ) + TASK_CANCEL_TIMEOUT_SHORT: float = Field( + ge=1.0, + le=100000.0, + default=2.0, + description="Maximum time in seconds to wait for simple tasks to complete when cancelling", + ) + + +class _UISettings(BaseSettings): + """User interface and dashboard configuration. + + Controls refresh rates, update thresholds, and notification behavior for the + various UI modes (dashboard, tqdm, etc.). 
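+
+    Example:
+        AIPERF_UI_MIN_UPDATE_PERCENT=5.0  # redraw only on >=5% progress change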
+ """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_UI_", + ) + + LOG_REFRESH_INTERVAL: float = Field( + ge=0.01, + le=100000.0, + default=0.1, + description="Log viewer refresh interval in seconds (default: 10 FPS)", + ) + MIN_UPDATE_PERCENT: float = Field( + ge=0.01, + le=100.0, + default=1.0, + description="Minimum percentage difference from last update to trigger a UI update (for non-dashboard UIs)", + ) + NOTIFICATION_TIMEOUT: int = Field( + ge=1, + le=100000, + default=3, + description="Duration in seconds to display UI notifications before auto-dismissing", + ) + REALTIME_METRICS_INTERVAL: float = Field( + ge=1.0, + le=1000.0, + default=5.0, + description="Interval in seconds between real-time metrics messages", + ) + SPINNER_REFRESH_RATE: float = Field( + ge=0.1, + le=100.0, + default=0.1, + description="Progress spinner refresh rate in seconds (default: 10 FPS)", + ) + + +class _WorkerSettings(BaseSettings): + """Worker management and auto-scaling configuration. + + Controls worker pool sizing, health monitoring, load detection, and recovery behavior. + The CPU_UTILIZATION_FACTOR is used in the auto-scaling formula: + max_workers = max(1, min(int(cpu_count * factor) - 1, MAX_WORKERS_CAP)) + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_WORKER_", + ) + + CHECK_INTERVAL: float = Field( + ge=0.1, + le=100000.0, + default=1.0, + description="Interval in seconds between worker status checks by WorkerManager", + ) + CPU_UTILIZATION_FACTOR: float = Field( + ge=0.1, + le=1.0, + default=0.75, + description="Factor multiplied by CPU count to determine default max workers (0.0-1.0). " + "Formula: max(1, min(int(cpu_count * factor) - 1, MAX_WORKERS_CAP))", + ) + ERROR_RECOVERY_TIME: float = Field( + ge=0.1, + le=1000.0, + default=3.0, + description="Time in seconds from last error before worker is considered healthy again", + ) + HEALTH_CHECK_INTERVAL: float = Field( + ge=0.1, + le=1000.0, + default=2.0, + description="Interval in seconds between worker health check messages", + ) + HIGH_LOAD_CPU_USAGE: float = Field( + ge=50.0, + le=100.0, + default=85.0, + description="CPU usage percentage threshold for considering a worker under high load", + ) + HIGH_LOAD_RECOVERY_TIME: float = Field( + ge=0.1, + le=1000.0, + default=5.0, + description="Time in seconds from last high load before worker is considered recovered", + ) + MAX_WORKERS_CAP: int = Field( + ge=1, + le=10000, + default=32, + description="Absolute maximum number of workers to spawn, regardless of CPU count", + ) + STALE_TIME: float = Field( + ge=0.1, + le=1000.0, + default=10.0, + description="Time in seconds from last status report before worker is considered stale", + ) + STATUS_SUMMARY_INTERVAL: float = Field( + ge=0.1, + le=1000.0, + default=0.5, + description="Interval in seconds between worker status summary messages", + ) + + +class _ZMQSettings(BaseSettings): + """ZMQ socket and communication configuration. + + Controls ZMQ socket timeouts, keepalive settings, retry behavior, and concurrency limits. + These settings affect reliability and performance of the internal message bus. 
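+
+    Example:
+        AIPERF_ZMQ_RCVTIMEO=600000  # raise socket receive timeout to 10 minutes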
+ """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_ZMQ_", + ) + + CONTEXT_TERM_TIMEOUT: float = Field( + ge=1.0, + le=100000.0, + default=10.0, + description="Timeout in seconds for terminating the ZMQ context during shutdown", + ) + PULL_MAX_CONCURRENCY: int = Field( + ge=1, + le=10000000, + default=100_000, + description="Maximum concurrency for ZMQ PULL clients", + ) + PUSH_MAX_RETRIES: int = Field( + ge=1, + le=100, + default=2, + description="Maximum number of retry attempts when pushing messages to ZMQ PUSH socket", + ) + PUSH_RETRY_DELAY: float = Field( + ge=0.1, + le=1000.0, + default=0.1, + description="Delay in seconds between retry attempts for ZMQ PUSH operations", + ) + RCVTIMEO: int = Field( + ge=1, + le=10000000, + default=300000, # 5 minutes + description="Socket receive timeout in milliseconds (default: 5 minutes)", + ) + SNDTIMEO: int = Field( + ge=1, + le=10000000, + default=300000, # 5 minutes + description="Socket send timeout in milliseconds (default: 5 minutes)", + ) + TCP_KEEPALIVE_IDLE: int = Field( + ge=1, + le=100000, + default=60, + description="Time in seconds before starting TCP keepalive probes on idle ZMQ connections", + ) + TCP_KEEPALIVE_INTVL: int = Field( + ge=1, + le=100000, + default=10, + description="Interval in seconds between TCP keepalive probes for ZMQ connections", + ) + + +class _Environment(BaseSettings): + """ + Root environment configuration with nested subsystem settings. + + This is a singleton instance that loads configuration from environment variables + with the AIPERF_ prefix. Settings are organized into logical subsystems for + better discoverability and maintainability. + + All nested settings can be configured via environment variables using the pattern: + AIPERF_{SUBSYSTEM}_{SETTING_NAME} + + Example: + AIPERF_HTTP_CONNECTION_LIMIT=5000 + AIPERF_WORKER_CPU_UTILIZATION_FACTOR=0.8 + AIPERF_ZMQ_RCVTIMEO=600000 + """ + + model_config = SettingsConfigDict( + env_prefix="AIPERF_", + env_file=".env", + env_file_encoding="utf-8", + extra="allow", + ) + + # Nested subsystem settings (alphabetically ordered) + DATASET: _DatasetSettings = Field( + default_factory=_DatasetSettings, + description="Dataset loading and configuration settings", + ) + DEV: _DeveloperSettings = Field( + default_factory=_DeveloperSettings, + description="Development and debugging settings", + ) + GPU: _GPUSettings = Field( + default_factory=_GPUSettings, + description="GPU telemetry collection settings", + ) + HTTP: _HTTPSettings = Field( + default_factory=_HTTPSettings, + description="HTTP client socket and connection settings", + ) + LOGGING: _LoggingSettings = Field( + default_factory=_LoggingSettings, + description="Logging system settings", + ) + METRICS: _MetricsSettings = Field( + default_factory=_MetricsSettings, + description="Metrics collection and storage settings", + ) + RECORD: _RecordSettings = Field( + default_factory=_RecordSettings, + description="Record processing and export settings", + ) + SERVICE: _ServiceSettings = Field( + default_factory=_ServiceSettings, + description="Service lifecycle and communication settings", + ) + UI: _UISettings = Field( + default_factory=_UISettings, + description="User interface and dashboard settings", + ) + WORKER: _WorkerSettings = Field( + default_factory=_WorkerSettings, + description="Worker management and scaling settings", + ) + ZMQ: _ZMQSettings = Field( + default_factory=_ZMQSettings, + description="ZMQ communication settings", + ) + + @model_validator(mode="after") + def 
validate_dev_mode(self) -> Self: + """Validate that developer mode is enabled for features that require it.""" + if self.DEV.SHOW_INTERNAL_METRICS and not self.DEV.MODE: + _logger.warning( + "Developer mode is not enabled, disabling AIPERF_DEV_SHOW_INTERNAL_METRICS" + ) + self.DEV.SHOW_INTERNAL_METRICS = False + + if self.DEV.SHOW_EXPERIMENTAL_METRICS and not self.DEV.MODE: + _logger.warning( + "Developer mode is not enabled, disabling AIPERF_DEV_SHOW_EXPERIMENTAL_METRICS" + ) + self.DEV.SHOW_EXPERIMENTAL_METRICS = False + + return self + + +# Global singleton instance +Environment = _Environment() diff --git a/src/aiperf/common/logging.py b/src/aiperf/common/logging.py index 59f80986b..bd38a03b1 100644 --- a/src/aiperf/common/logging.py +++ b/src/aiperf/common/logging.py @@ -14,11 +14,9 @@ from aiperf.common.config import ServiceConfig, ServiceDefaults, UserConfig from aiperf.common.config.config_defaults import OutputDefaults from aiperf.common.enums import AIPerfUIType, ServiceType +from aiperf.common.environment import Environment from aiperf.common.factories import ServiceFactory -LOG_QUEUE_MAXSIZE = 1000 - - _logger = AIPerfLogger(__name__) _global_log_queue: "multiprocessing.Queue | None" = None _log_queue_lock = threading.Lock() @@ -33,7 +31,9 @@ def get_global_log_queue() -> multiprocessing.Queue: if _global_log_queue is None: with _log_queue_lock: if _global_log_queue is None: - _global_log_queue = multiprocessing.Queue(maxsize=LOG_QUEUE_MAXSIZE) + _global_log_queue = multiprocessing.Queue( + maxsize=Environment.LOGGING.QUEUE_MAXSIZE + ) return _global_log_queue @@ -101,12 +101,12 @@ def setup_child_process_logging( if service_id: # If the service is in the trace or debug services, set the level to trace or debug - if service_config.developer.trace_services and _is_service_in_types( - service_id, service_config.developer.trace_services + if Environment.DEV.TRACE_SERVICES and _is_service_in_types( + service_id, Environment.DEV.TRACE_SERVICES ): level = _TRACE - elif service_config.developer.debug_services and _is_service_in_types( - service_id, service_config.developer.debug_services + elif Environment.DEV.DEBUG_SERVICES and _is_service_in_types( + service_id, Environment.DEV.DEBUG_SERVICES ): level = _DEBUG diff --git a/src/aiperf/common/mixins/command_handler_mixin.py b/src/aiperf/common/mixins/command_handler_mixin.py index 19fbbfc1f..94f8d6ef5 100644 --- a/src/aiperf/common/mixins/command_handler_mixin.py +++ b/src/aiperf/common/mixins/command_handler_mixin.py @@ -6,8 +6,8 @@ from typing import Any from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import DEFAULT_COMMAND_RESPONSE_TIMEOUT from aiperf.common.enums import MessageType +from aiperf.common.environment import Environment from aiperf.common.hooks import ( AIPerfHook, Hook, @@ -167,7 +167,9 @@ async def _publish_command_unhandled_response( ) async def send_command_and_wait_for_response( - self, message: CommandMessage, timeout: float = DEFAULT_COMMAND_RESPONSE_TIMEOUT + self, + message: CommandMessage, + timeout: float = Environment.SERVICE.COMMAND_RESPONSE_TIMEOUT, ) -> CommandResponse | ErrorDetails: """Send a single command message to a single service and wait for the response. This is useful communicating directly with a single service. 
@@ -189,7 +191,7 @@ async def send_command_and_wait_for_all_responses( self, command: CommandMessage, service_ids: list[str], - timeout: float = DEFAULT_COMMAND_RESPONSE_TIMEOUT, + timeout: float = Environment.SERVICE.COMMAND_RESPONSE_TIMEOUT, ) -> list[CommandResponse | ErrorDetails]: """Broadcast a command message to multiple services and wait for the responses from all of the specified services. This is useful for the system controller to send one command but wait for all services to respond. diff --git a/src/aiperf/common/mixins/message_bus_mixin.py b/src/aiperf/common/mixins/message_bus_mixin.py index 72becc8e4..9072ccbf2 100644 --- a/src/aiperf/common/mixins/message_bus_mixin.py +++ b/src/aiperf/common/mixins/message_bus_mixin.py @@ -7,13 +7,10 @@ from typing import Any from aiperf.common.config import ServiceConfig -from aiperf.common.constants import ( - DEFAULT_CONNECTION_PROBE_INTERVAL, - DEFAULT_CONNECTION_PROBE_TIMEOUT, -) from aiperf.common.decorators import implements_protocol from aiperf.common.enums import CommAddress from aiperf.common.enums.message_enums import MessageType +from aiperf.common.environment import Environment from aiperf.common.hooks import ( AIPerfHook, Hook, @@ -90,7 +87,7 @@ async def _probe_loop() -> None: try: await asyncio.wait_for( self._probe_and_wait_for_response(), - timeout=DEFAULT_CONNECTION_PROBE_INTERVAL, + timeout=Environment.SERVICE.CONNECTION_PROBE_INTERVAL, ) break except asyncio.TimeoutError: @@ -99,7 +96,9 @@ async def _probe_loop() -> None: ) await yield_to_event_loop() - await asyncio.wait_for(_probe_loop(), timeout=DEFAULT_CONNECTION_PROBE_TIMEOUT) + await asyncio.wait_for( + _probe_loop(), timeout=Environment.SERVICE.CONNECTION_PROBE_TIMEOUT + ) async def _process_connection_probe_message( self, message: ConnectionProbeMessage diff --git a/src/aiperf/common/mixins/task_manager_mixin.py b/src/aiperf/common/mixins/task_manager_mixin.py index befd7a414..822584441 100644 --- a/src/aiperf/common/mixins/task_manager_mixin.py +++ b/src/aiperf/common/mixins/task_manager_mixin.py @@ -4,8 +4,8 @@ import inspect from collections.abc import Callable, Coroutine -from aiperf.common.constants import TASK_CANCEL_TIMEOUT_SHORT from aiperf.common.decorators import implements_protocol +from aiperf.common.environment import Environment from aiperf.common.mixins.aiperf_logger_mixin import AIPerfLoggerMixin from aiperf.common.protocols import TaskManagerProtocol from aiperf.common.utils import yield_to_event_loop @@ -36,7 +36,7 @@ async def wait_for_tasks(self) -> list[BaseException | None]: return await asyncio.gather(*list(self.tasks), return_exceptions=True) async def cancel_all_tasks( - self, timeout: float = TASK_CANCEL_TIMEOUT_SHORT + self, timeout: float = Environment.SERVICE.TASK_CANCEL_TIMEOUT_SHORT ) -> None: """Cancel all tasks in the set and wait for up to timeout seconds for them to complete. 
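For reference, a minimal self-contained sketch (illustrative only, not part of this patch; assumes pydantic-settings v2) of how the nested AIPERF_* prefixes resolve. Because call sites like send_command_and_wait_for_response bind Environment.SERVICE.COMMAND_RESPONSE_TIMEOUT as a default argument, overrides must be exported before the aiperf modules are imported:

import os

from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict


class _ServiceSettings(BaseSettings):
    # Mirrors Environment.SERVICE: every field reads AIPERF_SERVICE_<NAME>.
    model_config = SettingsConfigDict(env_prefix="AIPERF_SERVICE_")

    COMMAND_RESPONSE_TIMEOUT: float = Field(default=30.0, ge=1.0, le=1000.0)


class _Environment(BaseSettings):
    # Root settings object; nested groups are built when this is instantiated.
    model_config = SettingsConfigDict(env_prefix="AIPERF_")

    SERVICE: _ServiceSettings = Field(default_factory=_ServiceSettings)


# The override must be in the environment before the singleton is created
# (in aiperf this happens at import of aiperf.common.environment).
os.environ["AIPERF_SERVICE_COMMAND_RESPONSE_TIMEOUT"] = "45"
Environment = _Environment()
assert Environment.SERVICE.COMMAND_RESPONSE_TIMEOUT == 45.0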
diff --git a/src/aiperf/common/protocols.py b/src/aiperf/common/protocols.py index 1ca5625cf..04fd14d7d 100644 --- a/src/aiperf/common/protocols.py +++ b/src/aiperf/common/protocols.py @@ -5,15 +5,8 @@ from collections.abc import Callable, Coroutine from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable -from aiperf.common.constants import ( - DEFAULT_COMMS_REQUEST_TIMEOUT, - DEFAULT_SERVICE_REGISTRATION_TIMEOUT, - DEFAULT_SERVICE_START_TIMEOUT, -) -from aiperf.common.enums import ( - CommClientType, - LifecycleState, -) +from aiperf.common.enums import CommClientType, LifecycleState +from aiperf.common.environment import Environment from aiperf.common.hooks import Hook, HookType from aiperf.common.models import ( MetricRecordMetadata, @@ -174,7 +167,7 @@ class RequestClientProtocol(CommunicationClientProtocol, Protocol): async def request( self, message: MessageT, - timeout: float = DEFAULT_COMMS_REQUEST_TIMEOUT, + timeout: float = Environment.SERVICE.COMMS_REQUEST_TIMEOUT, ) -> MessageOutputT: ... async def request_async( @@ -467,13 +460,13 @@ async def stop_services_by_type( async def wait_for_all_services_registration( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_REGISTRATION_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.REGISTRATION_TIMEOUT, ) -> None: ... async def wait_for_all_services_start( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_START_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.START_TIMEOUT, ) -> None: ... diff --git a/src/aiperf/controller/base_service_manager.py b/src/aiperf/controller/base_service_manager.py index 785ff1ffa..4dfe25e41 100644 --- a/src/aiperf/controller/base_service_manager.py +++ b/src/aiperf/controller/base_service_manager.py @@ -4,11 +4,8 @@ from abc import ABC, abstractmethod from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import ( - DEFAULT_SERVICE_REGISTRATION_TIMEOUT, - DEFAULT_SERVICE_START_TIMEOUT, -) from aiperf.common.decorators import implements_protocol +from aiperf.common.environment import Environment from aiperf.common.hooks import on_start, on_stop from aiperf.common.mixins import AIPerfLifecycleMixin from aiperf.common.models import ServiceRunInfo @@ -107,7 +104,7 @@ async def kill_all_services(self) -> list[BaseException | None]: async def wait_for_all_services_registration( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_REGISTRATION_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.REGISTRATION_TIMEOUT, ) -> None: pass @@ -115,6 +112,6 @@ async def wait_for_all_services_registration( async def wait_for_all_services_start( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_START_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.START_TIMEOUT, ) -> None: pass diff --git a/src/aiperf/controller/kubernetes_service_manager.py b/src/aiperf/controller/kubernetes_service_manager.py index 2f7ee267d..081efe585 100644 --- a/src/aiperf/controller/kubernetes_service_manager.py +++ b/src/aiperf/controller/kubernetes_service_manager.py @@ -5,12 +5,9 @@ from pydantic import BaseModel from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import ( - DEFAULT_SERVICE_REGISTRATION_TIMEOUT, - DEFAULT_SERVICE_START_TIMEOUT, -) from aiperf.common.decorators import implements_protocol from aiperf.common.enums import ServiceRunType +from aiperf.common.environment import Environment from aiperf.common.factories import 
ServiceManagerFactory from aiperf.common.protocols import ServiceManagerProtocol from aiperf.common.types import ServiceTypeT @@ -70,7 +67,7 @@ async def kill_all_services(self) -> list[BaseException | None]: async def wait_for_all_services_registration( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_REGISTRATION_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.REGISTRATION_TIMEOUT, ) -> None: """Wait for all required services to be registered in Kubernetes.""" self.logger.debug( @@ -84,7 +81,7 @@ async def wait_for_all_services_registration( async def wait_for_all_services_start( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_START_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.START_TIMEOUT, ) -> None: """Wait for all required services to be started in Kubernetes.""" self.logger.debug( diff --git a/src/aiperf/controller/multiprocess_service_manager.py b/src/aiperf/controller/multiprocess_service_manager.py index 5ea5e74e4..2bfbbdad2 100644 --- a/src/aiperf/controller/multiprocess_service_manager.py +++ b/src/aiperf/controller/multiprocess_service_manager.py @@ -10,13 +10,9 @@ from aiperf.common.bootstrap import bootstrap_and_run_service from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import ( - DEFAULT_SERVICE_REGISTRATION_TIMEOUT, - DEFAULT_SERVICE_START_TIMEOUT, - TASK_CANCEL_TIMEOUT_SHORT, -) from aiperf.common.decorators import implements_protocol from aiperf.common.enums import ServiceRegistrationStatus, ServiceRunType +from aiperf.common.environment import Environment from aiperf.common.exceptions import AIPerfError from aiperf.common.factories import ServiceFactory, ServiceManagerFactory from aiperf.common.protocols import ServiceManagerProtocol @@ -139,7 +135,7 @@ async def kill_all_services(self) -> list[BaseException | None]: async def wait_for_all_services_registration( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_REGISTRATION_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.REGISTRATION_TIMEOUT, ) -> None: """Wait for all required services to be registered. 
@@ -207,7 +203,7 @@ async def _wait_for_process(self, info: MultiProcessRunInfo) -> None: try: info.process.terminate() await asyncio.to_thread( - info.process.join, timeout=TASK_CANCEL_TIMEOUT_SHORT + info.process.join, timeout=Environment.SERVICE.TASK_CANCEL_TIMEOUT_SHORT ) self.debug( f"Service {info.service_type} process stopped (pid: {info.process.pid})" @@ -221,7 +217,7 @@ async def _wait_for_process(self, info: MultiProcessRunInfo) -> None: async def wait_for_all_services_start( self, stop_event: asyncio.Event, - timeout_seconds: float = DEFAULT_SERVICE_START_TIMEOUT, + timeout_seconds: float = Environment.SERVICE.START_TIMEOUT, ) -> None: """Wait for all required services to be started.""" self.debug("Waiting for all required services to start...") diff --git a/src/aiperf/controller/proxy_manager.py b/src/aiperf/controller/proxy_manager.py index 7327f8163..7404552e2 100644 --- a/src/aiperf/controller/proxy_manager.py +++ b/src/aiperf/controller/proxy_manager.py @@ -5,8 +5,8 @@ import zmq.asyncio from aiperf.common.config import ServiceConfig -from aiperf.common.constants import DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT from aiperf.common.enums import ZMQProxyType +from aiperf.common.environment import Environment from aiperf.common.factories import ZMQProxyFactory from aiperf.common.hooks import on_init, on_start, on_stop from aiperf.common.mixins import AIPerfLifecycleMixin @@ -56,7 +56,7 @@ async def _stop_proxies(self) -> None: self.debug("Terminating ZMQ context") await asyncio.wait_for( asyncio.to_thread(zmq.asyncio.Context.instance().term), - timeout=DEFAULT_ZMQ_CONTEXT_TERM_TIMEOUT, + timeout=Environment.ZMQ.CONTEXT_TERM_TIMEOUT, ) self.debug("ZMQ context terminated successfully") except BaseException as e: diff --git a/src/aiperf/controller/system_controller.py b/src/aiperf/controller/system_controller.py index a230b173c..9757991a5 100644 --- a/src/aiperf/controller/system_controller.py +++ b/src/aiperf/controller/system_controller.py @@ -8,16 +8,10 @@ from rich.console import Console +from aiperf.cli_utils import print_developer_mode_warning from aiperf.common.base_service import BaseService from aiperf.common.config import ServiceConfig, UserConfig from aiperf.common.config.config_defaults import OutputDefaults -from aiperf.common.config.dev_config import print_developer_mode_warning -from aiperf.common.constants import ( - AIPERF_DEV_MODE, - DEFAULT_PROFILE_CONFIGURE_TIMEOUT, - DEFAULT_PROFILE_START_TIMEOUT, - DEFAULT_RECORD_PROCESSOR_SCALE_FACTOR, -) from aiperf.common.enums import ( CommandResponseStatus, CommandType, @@ -25,6 +19,7 @@ ServiceRegistrationStatus, ServiceType, ) +from aiperf.common.environment import Environment from aiperf.common.exceptions import LifecycleOperationError from aiperf.common.factories import ( AIPerfUIFactory, @@ -86,6 +81,10 @@ def __init__( service_id=service_id, ) self.debug("Creating System Controller") + if Environment.DEV.MODE: + # Print a warning message to the console if developer mode is enabled, once at load time + print_developer_mode_warning() + self._was_cancelled = False # List of required service types, in no particular order # These are services that must be running before the system controller can start profiling @@ -209,7 +208,7 @@ async def _profile_configure_all_services(self) -> None: config=self.user_config, ), list(self.service_manager.service_id_map.keys()), - timeout=DEFAULT_PROFILE_CONFIGURE_TIMEOUT, + timeout=Environment.SERVICE.PROFILE_CONFIGURE_TIMEOUT, ) duration = time.perf_counter() - begin 
self._parse_responses_for_errors(responses, "Configure Profiling") @@ -223,7 +222,7 @@ async def _start_profiling_all_services(self) -> None: service_id=self.service_id, ), list(self.service_manager.service_id_map.keys()), - timeout=DEFAULT_PROFILE_START_TIMEOUT, + timeout=Environment.SERVICE.PROFILE_START_TIMEOUT, ) self._parse_responses_for_errors(responses, "Start Profiling") self.info("All services started profiling successfully") @@ -410,7 +409,9 @@ async def _handle_spawn_workers_command(self, message: SpawnWorkersCommand) -> N if self.scale_record_processors_with_workers: await self.service_manager.run_service( ServiceType.RECORD_PROCESSOR, - max(1, message.num_workers // DEFAULT_RECORD_PROCESSOR_SCALE_FACTOR), + max( + 1, message.num_workers // Environment.RECORD.PROCESSOR_SCALE_FACTOR + ), ) @on_command(CommandType.SHUTDOWN_WORKERS) @@ -560,7 +561,7 @@ async def _stop_system_controller(self) -> None: else: self._print_exit_errors_and_log_file() - if AIPERF_DEV_MODE: + if Environment.DEV.MODE: # Print a warning message to the console if developer mode is enabled, on exit after results print_developer_mode_warning() diff --git a/src/aiperf/dataset/__init__.py b/src/aiperf/dataset/__init__.py index 1cd7af5f3..05995de87 100644 --- a/src/aiperf/dataset/__init__.py +++ b/src/aiperf/dataset/__init__.py @@ -7,7 +7,6 @@ SyntheticDatasetComposer, ) from aiperf.dataset.dataset_manager import ( - DATASET_CONFIGURATION_TIMEOUT, DatasetManager, main, ) @@ -62,7 +61,6 @@ "CustomDatasetComposer", "CustomDatasetLoaderProtocol", "CustomDatasetT", - "DATASET_CONFIGURATION_TIMEOUT", "DEFAULT_CORPUS_FILE", "DatasetManager", "ImageGenerator", diff --git a/src/aiperf/dataset/dataset_manager.py b/src/aiperf/dataset/dataset_manager.py index cb50290fd..e3283c53b 100644 --- a/src/aiperf/dataset/dataset_manager.py +++ b/src/aiperf/dataset/dataset_manager.py @@ -17,6 +17,7 @@ MessageType, ServiceType, ) +from aiperf.common.environment import Environment from aiperf.common.factories import ( ComposerFactory, DatasetSamplingStrategyFactory, @@ -42,7 +43,6 @@ from aiperf.common.tokenizer import Tokenizer from aiperf.dataset.loader import ShareGPTLoader -DATASET_CONFIGURATION_TIMEOUT = 300.0 _logger = AIPerfLogger(__name__) @@ -350,7 +350,8 @@ async def _wait_for_dataset_configuration(self) -> None: "Dataset not configured. Waiting for dataset to be configured..." 
) await asyncio.wait_for( - self.dataset_configured.wait(), timeout=DATASET_CONFIGURATION_TIMEOUT + self.dataset_configured.wait(), + timeout=Environment.DATASET.CONFIGURATION_TIMEOUT, ) diff --git a/src/aiperf/dataset/loader/base_public_dataset.py b/src/aiperf/dataset/loader/base_public_dataset.py index 35a937fd2..d515a046b 100644 --- a/src/aiperf/dataset/loader/base_public_dataset.py +++ b/src/aiperf/dataset/loader/base_public_dataset.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Any, ClassVar -from aiperf.common.constants import DEFAULT_PUBLIC_DATASET_TIMEOUT +from aiperf.common.environment import Environment from aiperf.common.exceptions import DatasetLoaderError from aiperf.common.mixins import AIPerfLoggerMixin from aiperf.common.models import Conversation, RequestRecord @@ -45,7 +45,9 @@ class BasePublicDatasetLoader(AIPerfLoggerMixin): def __init__(self, **kwargs): super().__init__(**kwargs) - self.http_client = AioHttpClient(timeout=DEFAULT_PUBLIC_DATASET_TIMEOUT) + self.http_client = AioHttpClient( + timeout=Environment.DATASET.PUBLIC_DATASET_TIMEOUT + ) self.cache_filepath = AIPERF_DATASET_CACHE_DIR / self.filename async def load_dataset(self) -> dict[str, Any]: diff --git a/src/aiperf/exporters/experimental_metrics_console_exporter.py b/src/aiperf/exporters/experimental_metrics_console_exporter.py index 22067e1cc..938e29113 100644 --- a/src/aiperf/exporters/experimental_metrics_console_exporter.py +++ b/src/aiperf/exporters/experimental_metrics_console_exporter.py @@ -1,9 +1,9 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -from aiperf.common.constants import AIPERF_DEV_MODE from aiperf.common.decorators import implements_protocol from aiperf.common.enums import MetricFlags from aiperf.common.enums.data_exporter_enums import ConsoleExporterType +from aiperf.common.environment import Environment from aiperf.common.exceptions import ConsoleExporterDisabled from aiperf.common.factories import ConsoleExporterFactory from aiperf.common.models import MetricResult @@ -23,8 +23,8 @@ class ConsoleExperimentalMetricsExporter(ConsoleMetricsExporter): def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: super().__init__(exporter_config=exporter_config, **kwargs) - self._show_experimental_metrics = AIPERF_DEV_MODE and ( - exporter_config.service_config.developer.show_internal_metrics + self._show_experimental_metrics = ( + Environment.DEV.MODE and Environment.DEV.SHOW_EXPERIMENTAL_METRICS ) if not self._show_experimental_metrics: raise ConsoleExporterDisabled( diff --git a/src/aiperf/exporters/internal_metrics_console_exporter.py b/src/aiperf/exporters/internal_metrics_console_exporter.py index 100182b25..269b952a2 100644 --- a/src/aiperf/exporters/internal_metrics_console_exporter.py +++ b/src/aiperf/exporters/internal_metrics_console_exporter.py @@ -1,9 +1,9 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -from aiperf.common.constants import AIPERF_DEV_MODE from aiperf.common.decorators import implements_protocol from aiperf.common.enums import MetricFlags from aiperf.common.enums.data_exporter_enums import ConsoleExporterType +from aiperf.common.environment import Environment from aiperf.common.exceptions import ConsoleExporterDisabled from aiperf.common.factories import ConsoleExporterFactory from aiperf.common.models import MetricResult @@ -24,8 +24,8 @@ class ConsoleInternalMetricsExporter(ConsoleMetricsExporter): def __init__(self, exporter_config: ExporterConfig, **kwargs) -> None: super().__init__(exporter_config=exporter_config, **kwargs) - self._show_internal_metrics = AIPERF_DEV_MODE and ( - exporter_config.service_config.developer.show_internal_metrics + self._show_internal_metrics = ( + Environment.DEV.MODE and Environment.DEV.SHOW_INTERNAL_METRICS ) if not self._show_internal_metrics: raise ConsoleExporterDisabled( diff --git a/src/aiperf/gpu_telemetry/__init__.py b/src/aiperf/gpu_telemetry/__init__.py index 9758ece7d..a39b860a4 100644 --- a/src/aiperf/gpu_telemetry/__init__.py +++ b/src/aiperf/gpu_telemetry/__init__.py @@ -8,11 +8,7 @@ from aiperf.gpu_telemetry.constants import ( DCGM_TO_FIELD_MAPPING, - DEFAULT_COLLECTION_INTERVAL, - DEFAULT_DCGM_ENDPOINTS, SCALING_FACTORS, - THREAD_JOIN_TIMEOUT, - URL_REACHABILITY_TIMEOUT, ) from aiperf.gpu_telemetry.telemetry_data_collector import ( TelemetryDataCollector, @@ -23,11 +19,7 @@ __all__ = [ "DCGM_TO_FIELD_MAPPING", - "DEFAULT_COLLECTION_INTERVAL", - "DEFAULT_DCGM_ENDPOINTS", "SCALING_FACTORS", - "THREAD_JOIN_TIMEOUT", "TelemetryDataCollector", "TelemetryManager", - "URL_REACHABILITY_TIMEOUT", ] diff --git a/src/aiperf/gpu_telemetry/constants.py b/src/aiperf/gpu_telemetry/constants.py index 2aeb883e3..d7bd8985e 100644 --- a/src/aiperf/gpu_telemetry/constants.py +++ b/src/aiperf/gpu_telemetry/constants.py @@ -14,17 +14,6 @@ TemperatureMetricUnit, ) -# Default telemetry configuration -DEFAULT_DCGM_ENDPOINTS = [ - "http://localhost:9400/metrics", - "http://localhost:9401/metrics", -] -DEFAULT_COLLECTION_INTERVAL = 0.33 # in seconds, 330ms (~3Hz) - -# Timeouts for telemetry operations (seconds) -URL_REACHABILITY_TIMEOUT = 5 -THREAD_JOIN_TIMEOUT = 5.0 - # Unit conversion scaling factors SCALING_FACTORS = { "energy_consumption": 1e-9, # mJ to MJ diff --git a/src/aiperf/gpu_telemetry/telemetry_data_collector.py b/src/aiperf/gpu_telemetry/telemetry_data_collector.py index 6f10bc1ea..39c27a02d 100644 --- a/src/aiperf/gpu_telemetry/telemetry_data_collector.py +++ b/src/aiperf/gpu_telemetry/telemetry_data_collector.py @@ -9,14 +9,13 @@ import aiohttp from prometheus_client.parser import text_string_to_metric_families +from aiperf.common.environment import Environment from aiperf.common.hooks import background_task, on_init, on_stop from aiperf.common.mixins.aiperf_lifecycle_mixin import AIPerfLifecycleMixin from aiperf.common.models import ErrorDetails, TelemetryMetrics, TelemetryRecord from aiperf.gpu_telemetry.constants import ( DCGM_TO_FIELD_MAPPING, - DEFAULT_COLLECTION_INTERVAL, SCALING_FACTORS, - URL_REACHABILITY_TIMEOUT, ) __all__ = ["TelemetryDataCollector"] @@ -47,7 +46,7 @@ class TelemetryDataCollector(AIPerfLifecycleMixin): def __init__( self, dcgm_url: str, - collection_interval: float = DEFAULT_COLLECTION_INTERVAL, + collection_interval: float | None = None, record_callback: Callable[[list[TelemetryRecord], str], Awaitable[None] | None] | None = None, error_callback: 
Callable[[ErrorDetails, str], Awaitable[None] | None] @@ -55,7 +54,11 @@ def __init__( collector_id: str = "telemetry_collector", ) -> None: self._dcgm_url = dcgm_url - self._collection_interval = collection_interval + self._collection_interval = ( + collection_interval + if collection_interval is not None + else Environment.GPU.COLLECTION_INTERVAL + ) self._record_callback = record_callback self._error_callback = error_callback self._scaling_factors = SCALING_FACTORS @@ -70,7 +73,7 @@ async def _initialize_http_client(self) -> None: Called automatically by AIPerfLifecycleMixin during initialization phase. Creates an aiohttp ClientSession with appropriate timeout settings. """ - timeout = aiohttp.ClientTimeout(total=URL_REACHABILITY_TIMEOUT) + timeout = aiohttp.ClientTimeout(total=Environment.GPU.REACHABILITY_TIMEOUT) self._session = aiohttp.ClientSession(timeout=timeout) @on_stop @@ -116,7 +119,7 @@ async def is_url_reachable(self) -> bool: return False else: # Create a temporary session for reachability check - timeout = aiohttp.ClientTimeout(total=URL_REACHABILITY_TIMEOUT) + timeout = aiohttp.ClientTimeout(total=Environment.GPU.REACHABILITY_TIMEOUT) async with aiohttp.ClientSession(timeout=timeout) as temp_session: try: # Try HEAD first for efficiency diff --git a/src/aiperf/gpu_telemetry/telemetry_manager.py b/src/aiperf/gpu_telemetry/telemetry_manager.py index 2e3dfb4d3..bc8aa6898 100644 --- a/src/aiperf/gpu_telemetry/telemetry_manager.py +++ b/src/aiperf/gpu_telemetry/telemetry_manager.py @@ -12,6 +12,7 @@ CommandType, ServiceType, ) +from aiperf.common.environment import Environment from aiperf.common.factories import ServiceFactory from aiperf.common.hooks import on_command, on_init, on_stop from aiperf.common.messages import ( @@ -26,10 +27,6 @@ PushClientProtocol, ServiceProtocol, ) -from aiperf.gpu_telemetry.constants import ( - DEFAULT_COLLECTION_INTERVAL, - DEFAULT_DCGM_ENDPOINTS, -) from aiperf.gpu_telemetry.telemetry_data_collector import TelemetryDataCollector __all__ = ["TelemetryManager"] @@ -100,15 +97,20 @@ def __init__( # Store user-provided endpoints separately for display filtering (excluding auto-inserted defaults) self._user_provided_endpoints = [ - ep for ep in valid_endpoints if ep not in DEFAULT_DCGM_ENDPOINTS + ep + for ep in valid_endpoints + if ep not in Environment.GPU.DEFAULT_DCGM_ENDPOINTS ] # Combine defaults + user endpoints, preserving order and removing duplicates self._dcgm_endpoints = list( - dict.fromkeys(list(DEFAULT_DCGM_ENDPOINTS) + self._user_provided_endpoints) + dict.fromkeys( + list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) + + self._user_provided_endpoints + ) ) - self._collection_interval = DEFAULT_COLLECTION_INTERVAL + self._collection_interval = Environment.GPU.COLLECTION_INTERVAL @staticmethod def _normalize_dcgm_url(url: str) -> str: @@ -202,7 +204,9 @@ async def _profile_configure_command( # Determine which defaults are reachable for display filtering reachable_endpoints = list(self._collectors.keys()) reachable_defaults = [ - ep for ep in DEFAULT_DCGM_ENDPOINTS if ep in reachable_endpoints + ep + for ep in Environment.GPU.DEFAULT_DCGM_ENDPOINTS + if ep in reachable_endpoints ] endpoints_for_display = self._compute_endpoints_for_display(reachable_defaults) @@ -285,10 +289,10 @@ async def _telemetry_manager_stop(self) -> None: async def _delayed_shutdown(self) -> None: """Shutdown service after a delay to allow command response to be sent. 
- Waits 5 seconds before calling stop() to ensure the command response + Waits before calling stop() to ensure the command response has time to be published and transmitted to the SystemController. """ - await asyncio.sleep(5.0) + await asyncio.sleep(Environment.GPU.SHUTDOWN_DELAY) await self.stop() async def _send_telemetry_disabled_status_and_shutdown(self, reason: str) -> None: diff --git a/src/aiperf/metrics/metric_dicts.py b/src/aiperf/metrics/metric_dicts.py index 74d19840c..5624b79c3 100644 --- a/src/aiperf/metrics/metric_dicts.py +++ b/src/aiperf/metrics/metric_dicts.py @@ -12,6 +12,7 @@ MetricValueTypeT, MetricValueTypeVarT, ) +from aiperf.common.environment import Environment from aiperf.common.exceptions import MetricTypeError, MetricUnitError, NoMetricValue from aiperf.common.models.record_models import MetricResult, MetricValue from aiperf.common.types import MetricTagT @@ -59,7 +60,10 @@ class MetricRecordDict(BaseMetricDict[MetricValueTypeT]): """ def to_display_dict( - self, registry: "type[MetricRegistry]", show_internal: bool = False + self, + registry: "type[MetricRegistry]", + show_internal: bool = False, + show_experimental: bool = False, ) -> dict[str, MetricValue]: """Convert to display units with filtering applied. NOTE: This will not include metrics with the `NO_INDIVIDUAL_RECORDS` flag. @@ -81,11 +85,13 @@ def to_display_dict( _logger.warning(f"Metric {tag} not found in registry") continue - if not show_internal and not metric_class.missing_flags( - MetricFlags.EXPERIMENTAL | MetricFlags.INTERNAL + if ( + metric_class.has_flags(MetricFlags.EXPERIMENTAL) + and not show_experimental ): continue - + if metric_class.has_flags(MetricFlags.INTERNAL) and not show_internal: + continue if metric_class.has_flags(MetricFlags.NO_INDIVIDUAL_RECORDS): continue @@ -140,7 +146,9 @@ class MetricArray(Generic[MetricValueTypeVarT]): This is used to store the values of a metric over time. 
""" - def __init__(self, initial_capacity: int = 10000): + def __init__( + self, initial_capacity: int = Environment.METRICS.ARRAY_INITIAL_CAPACITY + ): """Initialize the array with the given initial capacity.""" if initial_capacity <= 0: raise ValueError("Initial capacity must be greater than 0") diff --git a/src/aiperf/post_processors/raw_record_writer_processor.py b/src/aiperf/post_processors/raw_record_writer_processor.py index 6ab1ab8b2..0b21aafd0 100644 --- a/src/aiperf/post_processors/raw_record_writer_processor.py +++ b/src/aiperf/post_processors/raw_record_writer_processor.py @@ -8,10 +8,10 @@ from aiperf.common.config import UserConfig from aiperf.common.config.config_defaults import OutputDefaults -from aiperf.common.constants import DEFAULT_RAW_RECORD_EXPORT_BATCH_SIZE from aiperf.common.decorators import implements_protocol from aiperf.common.enums.data_exporter_enums import DataExporterType, ExportLevel from aiperf.common.enums.post_processor_enums import RecordProcessorType +from aiperf.common.environment import Environment from aiperf.common.exceptions import DataExporterDisabled, PostProcessorDisabled from aiperf.common.factories import ( DataExporterFactory, @@ -76,7 +76,7 @@ def __init__( # Initialize the buffered writer mixin super().__init__( output_file=output_file, - batch_size=DEFAULT_RAW_RECORD_EXPORT_BATCH_SIZE, + batch_size=Environment.RECORD.RAW_EXPORT_BATCH_SIZE, service_id=service_id, user_config=user_config, **kwargs, diff --git a/src/aiperf/post_processors/record_export_results_processor.py b/src/aiperf/post_processors/record_export_results_processor.py index 2cca97552..73cc993dc 100644 --- a/src/aiperf/post_processors/record_export_results_processor.py +++ b/src/aiperf/post_processors/record_export_results_processor.py @@ -2,9 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import AIPERF_DEV_MODE, DEFAULT_RECORD_EXPORT_BATCH_SIZE from aiperf.common.decorators import implements_protocol from aiperf.common.enums import ExportLevel, ResultsProcessorType +from aiperf.common.environment import Environment from aiperf.common.exceptions import PostProcessorDisabled from aiperf.common.factories import ResultsProcessorFactory from aiperf.common.messages.inference_messages import MetricRecordsData @@ -43,13 +43,16 @@ def __init__( # Initialize parent classes with the output file super().__init__( output_file=output_file, - batch_size=DEFAULT_RECORD_EXPORT_BATCH_SIZE, + batch_size=Environment.RECORD.EXPORT_BATCH_SIZE, user_config=user_config, **kwargs, ) self.show_internal = ( - AIPERF_DEV_MODE and service_config.developer.show_internal_metrics + Environment.DEV.MODE and Environment.DEV.SHOW_INTERNAL_METRICS + ) + self.show_experimental = ( + Environment.DEV.MODE and Environment.DEV.SHOW_EXPERIMENTAL_METRICS ) self.info(f"Record metrics export enabled: {self.output_file}") @@ -57,7 +60,7 @@ async def process_result(self, record_data: MetricRecordsData) -> None: try: metric_dict = MetricRecordDict(record_data.metrics) display_metrics = metric_dict.to_display_dict( - MetricRegistry, self.show_internal + MetricRegistry, self.show_internal, self.show_experimental ) if not display_metrics: return diff --git a/src/aiperf/records/record_processor_service.py b/src/aiperf/records/record_processor_service.py index 7ab2fac35..d4e0d92ee 100644 --- a/src/aiperf/records/record_processor_service.py +++ b/src/aiperf/records/record_processor_service.py @@ -5,13 +5,13 @@ from 
aiperf.common.base_component_service import BaseComponentService from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import DEFAULT_PULL_CLIENT_MAX_CONCURRENCY from aiperf.common.enums import ( CommAddress, CommandType, MessageType, ServiceType, ) +from aiperf.common.environment import Environment from aiperf.common.exceptions import FactoryCreationError, PostProcessorDisabled from aiperf.common.factories import ( RecordProcessorFactory, @@ -60,7 +60,7 @@ def __init__( service_id=service_id, pull_client_address=CommAddress.RAW_INFERENCE_PROXY_BACKEND, pull_client_bind=False, - pull_client_max_concurrency=DEFAULT_PULL_CLIENT_MAX_CONCURRENCY, + pull_client_max_concurrency=Environment.ZMQ.PULL_MAX_CONCURRENCY, ) self.records_push_client: PushClientProtocol = self.comms.create_push_client( CommAddress.RECORDS, diff --git a/src/aiperf/records/records_manager.py b/src/aiperf/records/records_manager.py index 83def3ffe..55e828ee7 100644 --- a/src/aiperf/records/records_manager.py +++ b/src/aiperf/records/records_manager.py @@ -6,12 +6,7 @@ from aiperf.common.base_component_service import BaseComponentService from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import ( - DEFAULT_PULL_CLIENT_MAX_CONCURRENCY, - DEFAULT_REALTIME_METRICS_INTERVAL, - DEFAULT_RECORDS_PROGRESS_REPORT_INTERVAL, - NANOS_PER_SECOND, -) +from aiperf.common.constants import NANOS_PER_SECOND from aiperf.common.decorators import implements_protocol from aiperf.common.enums import ( AIPerfUIType, @@ -21,6 +16,7 @@ MessageType, ServiceType, ) +from aiperf.common.environment import Environment from aiperf.common.exceptions import PostProcessorDisabled from aiperf.common.factories import ResultsProcessorFactory, ServiceFactory from aiperf.common.hooks import background_task, on_command, on_message, on_pull_message @@ -83,7 +79,7 @@ def __init__( service_id=service_id, pull_client_address=CommAddress.RECORDS, pull_client_bind=True, - pull_client_max_concurrency=DEFAULT_PULL_CLIENT_MAX_CONCURRENCY, + pull_client_max_concurrency=Environment.ZMQ.PULL_MAX_CONCURRENCY, ) ######################################################### @@ -379,7 +375,9 @@ async def _on_credit_phase_complete( # all records before we have the final request count set. 
await self._check_if_all_records_received() - @background_task(interval=DEFAULT_RECORDS_PROGRESS_REPORT_INTERVAL, immediate=False) + @background_task( + interval=Environment.RECORD.PROGRESS_REPORT_INTERVAL, immediate=False + ) async def _report_records_task(self) -> None: """Report the records processing stats.""" if self.processing_stats.processed > 0 or self.processing_stats.errors > 0: @@ -425,7 +423,7 @@ async def _report_realtime_metrics_task(self) -> None: if self.service_config.ui_type != AIPerfUIType.DASHBOARD: return while not self.stop_requested: - await asyncio.sleep(DEFAULT_REALTIME_METRICS_INTERVAL) + await asyncio.sleep(Environment.UI.REALTIME_METRICS_INTERVAL) async with self.processing_status_lock: if ( self.processing_stats.total_records diff --git a/src/aiperf/timing/credit_issuing_strategy.py b/src/aiperf/timing/credit_issuing_strategy.py index 850df54ad..edc9bc2bd 100644 --- a/src/aiperf/timing/credit_issuing_strategy.py +++ b/src/aiperf/timing/credit_issuing_strategy.py @@ -5,11 +5,9 @@ import time from abc import ABC, abstractmethod -from aiperf.common.constants import ( - DEFAULT_CREDIT_PROGRESS_REPORT_INTERVAL, - NANOS_PER_SECOND, -) +from aiperf.common.constants import NANOS_PER_SECOND from aiperf.common.enums import CreditPhase, TimingMode +from aiperf.common.environment import Environment from aiperf.common.exceptions import ConfigurationError from aiperf.common.factories import AIPerfFactory from aiperf.common.messages import CreditReturnMessage @@ -342,7 +340,7 @@ async def _progress_report_loop(self) -> None: """Report the progress at a fixed interval.""" self.debug("Starting progress reporting loop") while not self.all_phases_complete_event.is_set(): - await asyncio.sleep(DEFAULT_CREDIT_PROGRESS_REPORT_INTERVAL) + await asyncio.sleep(Environment.SERVICE.CREDIT_PROGRESS_REPORT_INTERVAL) for phase, stats in self.phase_stats.items(): try: diff --git a/src/aiperf/transports/aiohttp_client.py b/src/aiperf/transports/aiohttp_client.py index 6ce2279e0..9d18b692f 100644 --- a/src/aiperf/transports/aiohttp_client.py +++ b/src/aiperf/transports/aiohttp_client.py @@ -156,19 +156,8 @@ def socket_factory(addr_info): SocketDefaults.apply_to_socket(sock) return sock - default_kwargs: dict[str, Any] = { - "limit": AioHttpDefaults.LIMIT, - "limit_per_host": AioHttpDefaults.LIMIT_PER_HOST, - "ttl_dns_cache": AioHttpDefaults.TTL_DNS_CACHE, - "use_dns_cache": AioHttpDefaults.USE_DNS_CACHE, - "enable_cleanup_closed": AioHttpDefaults.ENABLE_CLEANUP_CLOSED, - "force_close": AioHttpDefaults.FORCE_CLOSE, - "keepalive_timeout": AioHttpDefaults.KEEPALIVE_TIMEOUT, - "happy_eyeballs_delay": AioHttpDefaults.HAPPY_EYEBALLS_DELAY, - "family": AioHttpDefaults.SOCKET_FAMILY, - "socket_factory": socket_factory, - } - + default_kwargs: dict[str, Any] = AioHttpDefaults.get_default_kwargs() + default_kwargs["socket_factory"] = socket_factory default_kwargs.update(kwargs) return aiohttp.TCPConnector( diff --git a/src/aiperf/transports/http_defaults.py b/src/aiperf/transports/http_defaults.py index 4ec8309a1..d056fd526 100644 --- a/src/aiperf/transports/http_defaults.py +++ b/src/aiperf/transports/http_defaults.py @@ -2,8 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 import socket from dataclasses import dataclass +from typing import Any -from aiperf.common import constants +from aiperf.common.environment import Environment @dataclass(frozen=True) @@ -14,23 +15,11 @@ class SocketDefaults: TCP_NODELAY = 1 # Disable Nagle's algorithm TCP_QUICKACK = 1 # Quick ACK mode - SO_KEEPALIVE = 1 # Enable 
keepalive - TCP_KEEPIDLE = 60 # Start keepalive after 1 min idle - TCP_KEEPINTVL = 30 # Keepalive interval: 30 seconds - TCP_KEEPCNT = 1 # 1 failed keepalive probes = dead - SO_LINGER = 0 # Disable linger SO_REUSEADDR = 1 # Enable reuse address SO_REUSEPORT = 1 # Enable reuse port - SO_RCVBUF = 1024 * 1024 * 10 # 10MB receive buffer - SO_SNDBUF = 1024 * 1024 * 10 # 10MB send buffer - - SO_RCVTIMEO = 30 # 30 second receive timeout - SO_SNDTIMEO = 30 # 30 second send timeout - TCP_USER_TIMEOUT = 30000 # 30 sec user timeout - @classmethod def apply_to_socket(cls, sock: socket.socket) -> None: """Apply the default socket options to the given socket.""" @@ -43,13 +32,19 @@ def apply_to_socket(cls, sock: socket.socket) -> None: # Fine-tune keepalive timing (Linux-specific) if hasattr(socket, "TCP_KEEPIDLE"): - sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, cls.TCP_KEEPIDLE) - sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, cls.TCP_KEEPINTVL) - sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, cls.TCP_KEEPCNT) + sock.setsockopt( + socket.SOL_TCP, socket.TCP_KEEPIDLE, Environment.HTTP.TCP_KEEPIDLE + ) + sock.setsockopt( + socket.SOL_TCP, socket.TCP_KEEPINTVL, Environment.HTTP.TCP_KEEPINTVL + ) + sock.setsockopt( + socket.SOL_TCP, socket.TCP_KEEPCNT, Environment.HTTP.TCP_KEEPCNT + ) # Buffer size optimizations for streaming - sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, cls.SO_RCVBUF) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, cls.SO_SNDBUF) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, Environment.HTTP.SO_RCVBUF) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, Environment.HTTP.SO_SNDBUF) # Linux-specific TCP optimizations if hasattr(socket, "TCP_QUICKACK"): @@ -57,7 +52,9 @@ def apply_to_socket(cls, sock: socket.socket) -> None: if hasattr(socket, "TCP_USER_TIMEOUT"): sock.setsockopt( - socket.SOL_TCP, socket.TCP_USER_TIMEOUT, cls.TCP_USER_TIMEOUT + socket.SOL_TCP, + socket.TCP_USER_TIMEOUT, + Environment.HTTP.TCP_USER_TIMEOUT, ) @@ -66,15 +63,30 @@ class AioHttpDefaults: """Default values for aiohttp.ClientSession.""" LIMIT = ( - constants.AIPERF_HTTP_CONNECTION_LIMIT + Environment.HTTP.CONNECTION_LIMIT ) # Maximum number of concurrent connections LIMIT_PER_HOST = ( 0 # Maximum number of concurrent connections per host (0 will set to LIMIT) ) - TTL_DNS_CACHE = 300 # Time to live for DNS cache + TTL_DNS_CACHE = Environment.HTTP.TTL_DNS_CACHE # Time to live for DNS cache USE_DNS_CACHE = True # Enable DNS cache ENABLE_CLEANUP_CLOSED = False # Disable cleanup of closed connections FORCE_CLOSE = False # Disable force close connections - KEEPALIVE_TIMEOUT = 300 # Keepalive timeout + KEEPALIVE_TIMEOUT = Environment.HTTP.KEEPALIVE_TIMEOUT # Keepalive timeout HAPPY_EYEBALLS_DELAY = None # Happy eyeballs delay (None = disabled) SOCKET_FAMILY = socket.AF_INET # Family of the socket (IPv4) + + @classmethod + def get_default_kwargs(cls) -> dict[str, Any]: + """Get the default keyword arguments for aiohttp.ClientSession.""" + return { + "limit": cls.LIMIT, + "limit_per_host": cls.LIMIT_PER_HOST, + "ttl_dns_cache": cls.TTL_DNS_CACHE, + "use_dns_cache": cls.USE_DNS_CACHE, + "enable_cleanup_closed": cls.ENABLE_CLEANUP_CLOSED, + "force_close": cls.FORCE_CLOSE, + "keepalive_timeout": cls.KEEPALIVE_TIMEOUT, + "happy_eyeballs_delay": cls.HAPPY_EYEBALLS_DELAY, + "family": cls.SOCKET_FAMILY, + } diff --git a/src/aiperf/ui/dashboard/aiperf_textual_app.py b/src/aiperf/ui/dashboard/aiperf_textual_app.py index b204726cb..61946f98f 100644 --- 
a/src/aiperf/ui/dashboard/aiperf_textual_app.py +++ b/src/aiperf/ui/dashboard/aiperf_textual_app.py @@ -12,8 +12,8 @@ from textual.widgets import Footer from aiperf.common.config.service_config import ServiceConfig -from aiperf.common.constants import AIPERF_DEV_MODE from aiperf.common.enums import WorkerStatus +from aiperf.common.environment import Environment from aiperf.common.models import MetricResult, RecordsStats, RequestsStats, WorkerStats from aiperf.controller.system_controller import SystemController from aiperf.ui.dashboard.aiperf_theme import AIPERF_THEME @@ -38,9 +38,6 @@ class AIPerfTextualApp(App): ALLOW_IN_MAXIMIZED_VIEW = "ProgressHeader, Footer" """Allow the custom header and footer to be displayed when a panel is maximized.""" - NOTIFICATION_TIMEOUT = 3 - """The timeout for notifications in seconds.""" - CSS = """ #main-container { height: 100%; @@ -85,7 +82,7 @@ def __init__( super().__init__() self.title = "NVIDIA AIPerf" - if AIPERF_DEV_MODE: + if Environment.DEV.MODE: self.title = "NVIDIA AIPerf (Developer Mode)" self.log_viewer: RichLogViewer | None = None @@ -142,7 +139,9 @@ async def action_quit(self) -> None: self.progress_header = None self.realtime_metrics_dashboard = None self.log_viewer = None + # Forward the signal to the main process + # IMPORTANT: This is necessary, otherwise the process will hang os.kill(os.getpid(), signal.SIGINT) async def action_toggle_hide_log_viewer(self) -> None: diff --git a/src/aiperf/ui/dashboard/progress_dashboard.py b/src/aiperf/ui/dashboard/progress_dashboard.py index 7ab9a8bfb..c794707ca 100644 --- a/src/aiperf/ui/dashboard/progress_dashboard.py +++ b/src/aiperf/ui/dashboard/progress_dashboard.py @@ -18,6 +18,7 @@ from textual.widgets import Static from aiperf.common.enums import CreditPhase +from aiperf.common.environment import Environment from aiperf.common.models import RecordsStats, RequestsStats, StatsProtocol from aiperf.ui.dashboard.custom_widgets import MaximizableWidget from aiperf.ui.utils import format_elapsed_time, format_eta @@ -54,8 +55,6 @@ class ProgressDashboard(Container, MaximizableWidget): } """ - SPINNER_REFRESH_RATE = 0.1 # 10 FPS - def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.border_title = "Profile Progress" @@ -79,7 +78,7 @@ def __init__(self, **kwargs) -> None: def on_mount(self) -> None: """Set up the refresh timer when the widget is mounted.""" self.refresh_timer = self.set_interval( - self.SPINNER_REFRESH_RATE, self.refresh_timer_callback + Environment.UI.SPINNER_REFRESH_RATE, self.refresh_timer_callback ) def on_unmount(self) -> None: diff --git a/src/aiperf/ui/dashboard/realtime_metrics_dashboard.py b/src/aiperf/ui/dashboard/realtime_metrics_dashboard.py index 09d9bb19e..84703460d 100644 --- a/src/aiperf/ui/dashboard/realtime_metrics_dashboard.py +++ b/src/aiperf/ui/dashboard/realtime_metrics_dashboard.py @@ -16,6 +16,7 @@ from aiperf.common.aiperf_logger import AIPerfLogger from aiperf.common.config.service_config import ServiceConfig from aiperf.common.enums.metric_enums import MetricFlags, MetricUnitT +from aiperf.common.environment import Environment from aiperf.common.exceptions import MetricUnitError from aiperf.common.models.record_models import MetricResult from aiperf.metrics.base_metric import BaseMetric @@ -68,7 +69,7 @@ def _should_skip(self, metric: MetricResult) -> bool: metric_class.has_any_flags( MetricFlags.NO_CONSOLE | MetricFlags.INTERNAL | MetricFlags.EXPERIMENTAL ) - and not self.service_config.developer.show_internal_metrics + and not 
Environment.DEV.SHOW_INTERNAL_METRICS ) def _initialize_columns(self) -> None: diff --git a/src/aiperf/ui/dashboard/rich_log_viewer.py b/src/aiperf/ui/dashboard/rich_log_viewer.py index 071409939..831c4e9fd 100644 --- a/src/aiperf/ui/dashboard/rich_log_viewer.py +++ b/src/aiperf/ui/dashboard/rich_log_viewer.py @@ -9,6 +9,7 @@ from textual.events import Click from textual.widgets import RichLog +from aiperf.common.environment import Environment from aiperf.common.hooks import background_task from aiperf.common.mixins import AIPerfLifecycleMixin from aiperf.common.utils import yield_to_event_loop @@ -105,9 +106,7 @@ def __init__( self.log_queue = log_queue self.app = app - LOG_REFRESH_INTERVAL = 0.1 - - @background_task(immediate=True, interval=LOG_REFRESH_INTERVAL) + @background_task(immediate=True, interval=Environment.UI.LOG_REFRESH_INTERVAL) async def _consume_logs(self) -> None: """Consume log records from the queue and display them. diff --git a/src/aiperf/ui/tqdm_ui.py b/src/aiperf/ui/tqdm_ui.py index 9a0f19f05..db99ae1d6 100644 --- a/src/aiperf/ui/tqdm_ui.py +++ b/src/aiperf/ui/tqdm_ui.py @@ -2,9 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 from tqdm import tqdm -from aiperf.common.constants import DEFAULT_UI_MIN_UPDATE_PERCENT from aiperf.common.decorators import implements_protocol from aiperf.common.enums import AIPerfUIType +from aiperf.common.environment import Environment from aiperf.common.factories import AIPerfUIFactory from aiperf.common.hooks import ( on_profiling_progress, @@ -39,7 +39,7 @@ def __init__( **kwargs, ) self.total = total - self.update_threshold = DEFAULT_UI_MIN_UPDATE_PERCENT + self.update_threshold = Environment.UI.MIN_UPDATE_PERCENT self.last_percent = 0.0 self.last_value = 0.0 diff --git a/src/aiperf/workers/worker.py b/src/aiperf/workers/worker.py index 69ae77d98..e4a4d6cbb 100644 --- a/src/aiperf/workers/worker.py +++ b/src/aiperf/workers/worker.py @@ -10,8 +10,6 @@ from aiperf.common.base_component_service import BaseComponentService from aiperf.common.config import ServiceConfig, UserConfig from aiperf.common.constants import ( - AIPERF_HTTP_CONNECTION_LIMIT, - DEFAULT_WORKER_HEALTH_CHECK_INTERVAL, NANOS_PER_SECOND, ) from aiperf.common.enums import ( @@ -21,6 +19,7 @@ MessageType, ServiceType, ) +from aiperf.common.environment import Environment from aiperf.common.exceptions import NotInitializedError from aiperf.common.factories import ( ServiceFactory, @@ -76,13 +75,13 @@ def __init__( pull_client_bind=False, # NOTE: We set the max concurrency to the same as the HTTP connection limit to ensure # that the worker will not receive any more credits while the connection limit is reached. 
- pull_client_max_concurrency=AIPERF_HTTP_CONNECTION_LIMIT, + pull_client_max_concurrency=Environment.HTTP.CONNECTION_LIMIT, **kwargs, ) self.debug(lambda: f"Worker process __init__ (pid: {self._process.pid})") - self.health_check_interval = DEFAULT_WORKER_HEALTH_CHECK_INTERVAL + self.health_check_interval = Environment.WORKER.HEALTH_CHECK_INTERVAL self.task_stats: WorkerTaskStats = WorkerTaskStats() diff --git a/src/aiperf/workers/worker_manager.py b/src/aiperf/workers/worker_manager.py index 1446bce0b..d4a5b897f 100644 --- a/src/aiperf/workers/worker_manager.py +++ b/src/aiperf/workers/worker_manager.py @@ -8,18 +8,10 @@ from aiperf.common.base_component_service import BaseComponentService from aiperf.common.bootstrap import bootstrap_and_run_service from aiperf.common.config import ServiceConfig, UserConfig -from aiperf.common.constants import ( - DEFAULT_MAX_WORKERS_CAP, - DEFAULT_WORKER_CHECK_INTERVAL, - DEFAULT_WORKER_ERROR_RECOVERY_TIME, - DEFAULT_WORKER_HIGH_LOAD_CPU_USAGE, - DEFAULT_WORKER_HIGH_LOAD_RECOVERY_TIME, - DEFAULT_WORKER_STALE_TIME, - DEFAULT_WORKER_STATUS_SUMMARY_INTERVAL, - NANOS_PER_SECOND, -) +from aiperf.common.constants import NANOS_PER_SECOND from aiperf.common.enums import MessageType, ServiceType from aiperf.common.enums.worker_enums import WorkerStatus +from aiperf.common.environment import Environment from aiperf.common.factories import ServiceFactory from aiperf.common.hooks import background_task, on_message, on_start, on_stop from aiperf.common.messages import ( @@ -76,9 +68,13 @@ def __init__( self.max_concurrency = self.user_config.loadgen.concurrency self.max_workers = self.service_config.workers.max if self.max_workers is None: - # Default to 75% of the CPU cores - 1, with a cap of DEFAULT_MAX_WORKERS_CAP, and a minimum of 1 + # Default to Environment.WORKER.CPU_UTILIZATION_FACTOR of the CPU cores - 1, capped at Environment.WORKER.MAX_WORKERS_CAP, with a minimum of 1 self.max_workers = max( - 1, min(int(self.cpu_count * 0.75) - 1, DEFAULT_MAX_WORKERS_CAP) + 1, + min( + int(self.cpu_count * Environment.WORKER.CPU_UTILIZATION_FACTOR) - 1, + Environment.WORKER.MAX_WORKERS_CAP, + ), ) self.debug( lambda: f"Auto-setting max workers to {self.max_workers} due to no max workers specified." @@ -150,17 +146,17 @@ def _update_worker_status( if message.task_stats.failed > info.task_stats.failed: info.last_error_ns = time.time_ns() info.status = WorkerStatus.ERROR - elif (time.time_ns() - (info.last_error_ns or 0)) / NANOS_PER_SECOND < DEFAULT_WORKER_ERROR_RECOVERY_TIME: # fmt: skip + elif (time.time_ns() - (info.last_error_ns or 0)) / NANOS_PER_SECOND < Environment.WORKER.ERROR_RECOVERY_TIME: # fmt: skip info.status = WorkerStatus.ERROR # High Load Status - elif message.health.cpu_usage > DEFAULT_WORKER_HIGH_LOAD_CPU_USAGE: + elif message.health.cpu_usage > Environment.WORKER.HIGH_LOAD_CPU_USAGE: info.last_high_load_ns = time.time_ns() self.warning( f"CPU usage for {message.service_id} is {round(message.health.cpu_usage)}%. AIPerf results may be inaccurate."
) info.status = WorkerStatus.HIGH_LOAD - elif (time.time_ns() - (info.last_high_load_ns or 0)) / NANOS_PER_SECOND < DEFAULT_WORKER_HIGH_LOAD_RECOVERY_TIME: # fmt: skip + elif (time.time_ns() - (info.last_high_load_ns or 0)) / NANOS_PER_SECOND < Environment.WORKER.HIGH_LOAD_RECOVERY_TIME: # fmt: skip info.status = WorkerStatus.HIGH_LOAD # Idle Status @@ -174,16 +170,18 @@ def _update_worker_status( info.health = message.health info.task_stats = message.task_stats - @background_task(immediate=False, interval=DEFAULT_WORKER_CHECK_INTERVAL) + @background_task(immediate=False, interval=Environment.WORKER.CHECK_INTERVAL) async def _worker_status_loop(self) -> None: """Check the status of all workers.""" self.debug("Checking worker status") for _, info in self.worker_infos.items(): - if (time.time_ns() - (info.last_update_ns or 0)) / NANOS_PER_SECOND > DEFAULT_WORKER_STALE_TIME: # fmt: skip + if (time.time_ns() - (info.last_update_ns or 0)) / NANOS_PER_SECOND > Environment.WORKER.STALE_TIME: # fmt: skip info.status = WorkerStatus.STALE - @background_task(immediate=False, interval=DEFAULT_WORKER_STATUS_SUMMARY_INTERVAL) + @background_task( + immediate=False, interval=Environment.WORKER.STATUS_SUMMARY_INTERVAL + ) async def _worker_summary_loop(self) -> None: """Generate a summary of the worker status.""" summary = WorkerStatusSummaryMessage( diff --git a/src/aiperf/zmq/__init__.py b/src/aiperf/zmq/__init__.py index 06e7b5b73..4d03483a5 100644 --- a/src/aiperf/zmq/__init__.py +++ b/src/aiperf/zmq/__init__.py @@ -18,8 +18,6 @@ ZMQPullClient, ) from aiperf.zmq.push_client import ( - MAX_PUSH_RETRIES, - RETRY_DELAY_INTERVAL_SEC, ZMQPushClient, ) from aiperf.zmq.router_reply_client import ( @@ -59,10 +57,8 @@ "BaseZMQClient", "BaseZMQCommunication", "BaseZMQProxy", - "MAX_PUSH_RETRIES", "ProxyEndType", "ProxySocketClient", - "RETRY_DELAY_INTERVAL_SEC", "TOPIC_DELIMITER", "TOPIC_END", "TOPIC_END_ENCODED", diff --git a/src/aiperf/zmq/dealer_request_client.py b/src/aiperf/zmq/dealer_request_client.py index b8249c866..667a882ac 100644 --- a/src/aiperf/zmq/dealer_request_client.py +++ b/src/aiperf/zmq/dealer_request_client.py @@ -7,9 +7,9 @@ import zmq.asyncio -from aiperf.common.constants import DEFAULT_COMMS_REQUEST_TIMEOUT from aiperf.common.decorators import implements_protocol from aiperf.common.enums import CommClientType +from aiperf.common.environment import Environment from aiperf.common.exceptions import CommunicationError from aiperf.common.factories import CommunicationClientFactory from aiperf.common.hooks import background_task, on_stop @@ -127,7 +127,7 @@ async def request_async( async def request( self, message: Message, - timeout: float = DEFAULT_COMMS_REQUEST_TIMEOUT, + timeout: float = Environment.SERVICE.COMMS_REQUEST_TIMEOUT, ) -> Message: """Send a request and wait for a response up to timeout seconds. 
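Note for reviewers: the `aiperf/common/environment.py` module that every hunk in this diff imports is not itself included here. A minimal sketch of the pattern the call sites and tests imply is given below. The group names (`DEV`, `GPU`, etc.) and the defaults (the two localhost DCGM endpoints, the 0.33 s collection interval, the 5 s reachability timeout and shutdown delay) come from the constants this diff removes; the env-var names and parsing helpers are illustrative assumptions, not the project's actual code.

# Hypothetical sketch of aiperf/common/environment.py (not shown in this diff).
# Group/attribute names are inferred from the call sites above; env-var names
# and helper functions are assumptions for illustration only.
import os


def _env_bool(name: str, default: bool) -> bool:
    # Accept common truthy spellings; e.g. AIPERF_DEV_MODE=1 enables dev mode.
    return os.getenv(name, "1" if default else "0").strip().lower() in ("1", "true", "yes")


def _env_float(name: str, default: float) -> float:
    return float(os.getenv(name, str(default)))


class _Dev:
    # Developer-only toggles; call sites gate the others behind MODE.
    MODE = _env_bool("AIPERF_DEV_MODE", False)
    SHOW_INTERNAL_METRICS = _env_bool("AIPERF_DEV_SHOW_INTERNAL_METRICS", False)
    SHOW_EXPERIMENTAL_METRICS = _env_bool("AIPERF_DEV_SHOW_EXPERIMENTAL_METRICS", False)
    ENABLE_YAPPI = _env_bool("AIPERF_DEV_ENABLE_YAPPI", False)


class _Gpu:
    # Defaults mirror the constants removed from gpu_telemetry/constants.py.
    DEFAULT_DCGM_ENDPOINTS = os.getenv(
        "AIPERF_GPU_DCGM_ENDPOINTS",
        "http://localhost:9400/metrics,http://localhost:9401/metrics",
    ).split(",")
    COLLECTION_INTERVAL = _env_float("AIPERF_GPU_COLLECTION_INTERVAL", 0.33)
    REACHABILITY_TIMEOUT = _env_float("AIPERF_GPU_REACHABILITY_TIMEOUT", 5.0)
    SHUTDOWN_DELAY = _env_float("AIPERF_GPU_SHUTDOWN_DELAY", 5.0)


class Environment:
    """Namespaced runtime settings, resolved from environment variables at import time."""

    DEV = _Dev
    GPU = _Gpu
    # SERVICE, ZMQ, HTTP, WORKER, RECORD, UI, DATASET, and METRICS would follow
    # the same one-class-per-subsystem pattern.

Because the groups are plain classes with class attributes, the tests in this diff can override individual values with `patch.object(Environment.DEV, "MODE", True)` or `monkeypatch.setattr(Environment.SERVICE, "DISABLE_UVLOOP", True)`, and `test_dev_mode` can re-read the environment simply by reloading the module. One consequence worth flagging in review: values referenced in decorators (e.g. `@background_task(interval=Environment.WORKER.CHECK_INTERVAL)`) and in function-signature defaults (e.g. `timeout: float = Environment.SERVICE.COMMS_REQUEST_TIMEOUT`) are bound at import time, so environment variables only take effect if set before `aiperf` is imported.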
diff --git a/src/aiperf/zmq/pull_client.py b/src/aiperf/zmq/pull_client.py index dbef4362c..4c48be8de 100644 --- a/src/aiperf/zmq/pull_client.py +++ b/src/aiperf/zmq/pull_client.py @@ -6,9 +6,9 @@ import zmq.asyncio -from aiperf.common.constants import DEFAULT_PULL_CLIENT_MAX_CONCURRENCY from aiperf.common.decorators import implements_protocol from aiperf.common.enums import CommClientType +from aiperf.common.environment import Environment from aiperf.common.factories import CommunicationClientFactory from aiperf.common.hooks import background_task, on_stop from aiperf.common.messages import Message @@ -78,7 +78,7 @@ def __init__( self.semaphore = asyncio.Semaphore(value=max_pull_concurrency) else: self.semaphore = asyncio.Semaphore( - value=DEFAULT_PULL_CLIENT_MAX_CONCURRENCY + value=Environment.ZMQ.PULL_MAX_CONCURRENCY ) @background_task(immediate=True, interval=None) diff --git a/src/aiperf/zmq/push_client.py b/src/aiperf/zmq/push_client.py index ae58aab57..c2a7f4f4f 100644 --- a/src/aiperf/zmq/push_client.py +++ b/src/aiperf/zmq/push_client.py @@ -7,18 +7,13 @@ from aiperf.common.decorators import implements_protocol from aiperf.common.enums import CommClientType +from aiperf.common.environment import Environment from aiperf.common.exceptions import CommunicationError from aiperf.common.factories import CommunicationClientFactory from aiperf.common.messages import Message from aiperf.common.protocols import PushClientProtocol from aiperf.zmq.zmq_base_client import BaseZMQClient -MAX_PUSH_RETRIES = 2 -"""Maximum number of retries for pushing a message.""" - -RETRY_DELAY_INTERVAL_SEC = 0.1 -"""The interval to wait before retrying to push a message.""" - @implements_protocol(PushClientProtocol) @CommunicationClientFactory.register(CommClientType.PUSH) @@ -75,15 +70,18 @@ async def _push_message( self, message: Message, retry_count: int = 0, - max_retries: int = MAX_PUSH_RETRIES, + max_retries: int | None = None, ) -> None: """Push a message to the socket. Will retry up to max_retries times. Args: message: Message to be sent must be a Message object retry_count: Current retry count - max_retries: Maximum number of times to retry pushing the message + max_retries: Maximum number of times to retry pushing the message (defaults to Environment.ZMQ.PUSH_MAX_RETRIES) """ + if max_retries is None: + max_retries = Environment.ZMQ.PUSH_MAX_RETRIES + try: data_json = message.model_dump_json() await self.socket.send_string(data_json) @@ -98,7 +96,7 @@ async def _push_message( f"Failed to push data after {retry_count} retries: {e}", ) from e - await asyncio.sleep(RETRY_DELAY_INTERVAL_SEC) + await asyncio.sleep(Environment.ZMQ.PUSH_RETRY_DELAY) return await self._push_message(message, retry_count + 1, max_retries) except Exception as e: raise CommunicationError(f"Failed to push data: {e}") from e diff --git a/src/aiperf/zmq/zmq_defaults.py b/src/aiperf/zmq/zmq_defaults.py index 01d0e5e6b..bf37d0212 100644 --- a/src/aiperf/zmq/zmq_defaults.py +++ b/src/aiperf/zmq/zmq_defaults.py @@ -1,6 +1,8 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +from aiperf.common.environment import Environment + # ZMQ Constants TOPIC_END = "$" """This is used to add to the end of each topic to prevent the topic from being a prefix of another topic. @@ -27,14 +29,17 @@ class ZMQSocketDefaults: - """Default values for ZMQ sockets.""" + """Default values for ZMQ sockets. 
+ + Socket options are loaded from the Environment configuration to allow runtime tuning. + """ - # Socket Options - RCVTIMEO = 300000 # 5 minutes - SNDTIMEO = 300000 # 5 minutes + # Socket Options (loaded from Environment) + RCVTIMEO = Environment.ZMQ.RCVTIMEO + SNDTIMEO = Environment.ZMQ.SNDTIMEO TCP_KEEPALIVE = 1 - TCP_KEEPALIVE_IDLE = 60 - TCP_KEEPALIVE_INTVL = 10 + TCP_KEEPALIVE_IDLE = Environment.ZMQ.TCP_KEEPALIVE_IDLE + TCP_KEEPALIVE_INTVL = Environment.ZMQ.TCP_KEEPALIVE_INTVL TCP_KEEPALIVE_CNT = 3 IMMEDIATE = 1 # Don't queue messages LINGER = 0 # Don't wait on close diff --git a/tests/common/conftest.py b/tests/common/conftest.py index 4ce32e64c..71c465ac0 100644 --- a/tests/common/conftest.py +++ b/tests/common/conftest.py @@ -36,7 +36,11 @@ def mock_log_queue() -> MagicMock: @pytest.fixture -def service_config_no_uvloop(service_config: ServiceConfig) -> ServiceConfig: +def service_config_no_uvloop( + service_config: ServiceConfig, monkeypatch +) -> ServiceConfig: """Create a ServiceConfig with uvloop disabled for testing.""" - service_config.developer.disable_uvloop = True + from aiperf.common.environment import Environment + + monkeypatch.setattr(Environment.SERVICE, "DISABLE_UVLOOP", True) return service_config diff --git a/tests/config/test_dev_mode.py b/tests/config/test_dev_mode.py index 0b07851b5..9d4141eeb 100644 --- a/tests/config/test_dev_mode.py +++ b/tests/config/test_dev_mode.py @@ -6,38 +6,20 @@ class TestDevMode: - def test_dev_mode_on(self, monkeypatch, capsys): + def test_dev_mode_on(self, monkeypatch): monkeypatch.setenv("AIPERF_DEV_MODE", "1") - constants = importlib.reload(importlib.import_module("aiperf.common.constants")) - - assert constants.AIPERF_DEV_MODE is True - - monkeypatch.setattr("aiperf.common.config.dev_config.AIPERF_DEV_MODE", True) - _ = importlib.reload( - importlib.import_module("aiperf.common.config.service_config") + env_module = importlib.reload( + importlib.import_module("aiperf.common.environment") ) - _ = importlib.reload(importlib.import_module("aiperf.common.config.dev_config")) - _ = importlib.reload(importlib.import_module("aiperf.common.config")) - cli = importlib.reload(importlib.import_module("aiperf.cli")) + Environment = env_module.Environment - cli.app(["profile", "-h"]) - captured = capsys.readouterr() - assert "Developer Mode is active" in captured.out + assert Environment.DEV.MODE is True - def test_dev_mode_off(self, monkeypatch, capsys): + def test_dev_mode_off(self, monkeypatch): monkeypatch.setenv("AIPERF_DEV_MODE", "0") - constants = importlib.reload(importlib.import_module("aiperf.common.constants")) - - assert constants.AIPERF_DEV_MODE is False - - monkeypatch.setattr("aiperf.common.config.dev_config.AIPERF_DEV_MODE", False) - _ = importlib.reload( - importlib.import_module("aiperf.common.config.service_config") + env_module = importlib.reload( + importlib.import_module("aiperf.common.environment") ) - _ = importlib.reload(importlib.import_module("aiperf.common.config.dev_config")) - _ = importlib.reload(importlib.import_module("aiperf.common.config")) - cli = importlib.reload(importlib.import_module("aiperf.cli")) + Environment = env_module.Environment - cli.app(["-h"]) - captured = capsys.readouterr() - assert "Developer Mode is active" not in captured.out + assert Environment.DEV.MODE is False diff --git a/tests/data_exporters/test_console_exporter.py b/tests/data_exporters/test_console_exporter.py index cd3917b1b..5d76f277f 100644 --- a/tests/data_exporters/test_console_exporter.py +++ 
b/tests/data_exporters/test_console_exporter.py @@ -5,7 +5,6 @@ from rich.console import Console from aiperf.common.config import EndpointConfig, ServiceConfig, UserConfig -from aiperf.common.config.dev_config import DeveloperConfig from aiperf.common.constants import NANOS_PER_MILLIS from aiperf.common.enums import EndpointType from aiperf.common.models import MetricResult, ProfileResults @@ -128,9 +127,7 @@ def test_should_show_metrics_based_on_flags( ): """Test that metrics are shown/hidden based on their flags""" user_config = UserConfig(endpoint=mock_endpoint_config) - service_config = ServiceConfig( - developer=DeveloperConfig(show_internal_metrics=False) - ) + service_config = ServiceConfig() config = ExporterConfig( results=ProfileResults( records=[], diff --git a/tests/gpu_telemetry/test_telemetry_manager.py b/tests/gpu_telemetry/test_telemetry_manager.py index 00b869575..62519d7bc 100644 --- a/tests/gpu_telemetry/test_telemetry_manager.py +++ b/tests/gpu_telemetry/test_telemetry_manager.py @@ -6,6 +6,7 @@ import pytest from aiperf.common.config import UserConfig +from aiperf.common.environment import Environment from aiperf.common.messages import ( CommandAcknowledgedResponse, ProfileConfigureCommand, @@ -14,7 +15,6 @@ TelemetryStatusMessage, ) from aiperf.common.models import ErrorDetails -from aiperf.gpu_telemetry.constants import DEFAULT_DCGM_ENDPOINTS from aiperf.gpu_telemetry.telemetry_data_collector import TelemetryDataCollector from aiperf.gpu_telemetry.telemetry_manager import TelemetryManager @@ -50,7 +50,7 @@ def test_initialization_default_endpoint(self): mock_user_config.gpu_telemetry = None manager = self._create_manager_with_mocked_base(mock_user_config) - assert manager._dcgm_endpoints == list(DEFAULT_DCGM_ENDPOINTS) + assert manager._dcgm_endpoints == list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) def test_initialization_custom_endpoints(self): """Test initialization with custom user-provided endpoints.""" @@ -61,7 +61,7 @@ def test_initialization_custom_endpoints(self): manager = self._create_manager_with_mocked_base(mock_user_config) # Should have both defaults + custom endpoint - for default_endpoint in DEFAULT_DCGM_ENDPOINTS: + for default_endpoint in Environment.GPU.DEFAULT_DCGM_ENDPOINTS: assert default_endpoint in manager._dcgm_endpoints assert custom_endpoint in manager._dcgm_endpoints assert len(manager._dcgm_endpoints) == 3 @@ -74,7 +74,7 @@ def test_initialization_string_endpoint(self): manager = self._create_manager_with_mocked_base(mock_user_config) assert isinstance(manager._dcgm_endpoints, list) - for default_endpoint in DEFAULT_DCGM_ENDPOINTS: + for default_endpoint in Environment.GPU.DEFAULT_DCGM_ENDPOINTS: assert default_endpoint in manager._dcgm_endpoints assert "http://single-node:9401/metrics" in manager._dcgm_endpoints assert len(manager._dcgm_endpoints) == 3 @@ -94,7 +94,7 @@ def test_initialization_filters_invalid_urls(self): # Should have 2 defaults + 2 valid URLs assert len(manager._dcgm_endpoints) == 4 - for default_endpoint in DEFAULT_DCGM_ENDPOINTS: + for default_endpoint in Environment.GPU.DEFAULT_DCGM_ENDPOINTS: assert default_endpoint in manager._dcgm_endpoints assert "http://valid:9401/metrics" in manager._dcgm_endpoints assert "http://another-valid:9401/metrics" in manager._dcgm_endpoints @@ -112,8 +112,8 @@ def test_initialization_deduplicates_endpoints(self): # Should have 2 defaults + 2 unique user endpoints (duplicate removed) assert len(manager._dcgm_endpoints) == 4 - assert manager._dcgm_endpoints[0] == 
DEFAULT_DCGM_ENDPOINTS[0] - assert manager._dcgm_endpoints[1] == DEFAULT_DCGM_ENDPOINTS[1] + assert manager._dcgm_endpoints[0] == Environment.GPU.DEFAULT_DCGM_ENDPOINTS[0] + assert manager._dcgm_endpoints[1] == Environment.GPU.DEFAULT_DCGM_ENDPOINTS[1] assert manager._dcgm_endpoints[2] == "http://node1:9401/metrics" assert manager._dcgm_endpoints[3] == "http://node2:9401/metrics" @@ -130,8 +130,8 @@ def test_user_provides_default_endpoint(self): # Should have 2 defaults + 1 unique user endpoint (defaults not duplicated) assert len(manager._dcgm_endpoints) == 3 - assert manager._dcgm_endpoints[0] == DEFAULT_DCGM_ENDPOINTS[0] - assert manager._dcgm_endpoints[1] == DEFAULT_DCGM_ENDPOINTS[1] + assert manager._dcgm_endpoints[0] == Environment.GPU.DEFAULT_DCGM_ENDPOINTS[0] + assert manager._dcgm_endpoints[1] == Environment.GPU.DEFAULT_DCGM_ENDPOINTS[1] assert manager._dcgm_endpoints[2] == "http://node1:9401/metrics" # Verify user_provided_endpoints excludes the defaults assert len(manager._user_provided_endpoints) == 1 @@ -525,7 +525,7 @@ def test_invalid_endpoints_filtered_during_init(self): # Only 2 defaults + valid endpoint should remain assert len(manager._dcgm_endpoints) == 3 - for default_endpoint in DEFAULT_DCGM_ENDPOINTS: + for default_endpoint in Environment.GPU.DEFAULT_DCGM_ENDPOINTS: assert default_endpoint in manager._dcgm_endpoints assert "http://valid:9401/metrics" in manager._dcgm_endpoints @@ -568,10 +568,10 @@ def test_both_defaults_included_when_no_user_config(self): manager = self._create_manager_with_mocked_base(mock_user_config) - assert len(DEFAULT_DCGM_ENDPOINTS) == 2 - assert "http://localhost:9400/metrics" in DEFAULT_DCGM_ENDPOINTS - assert "http://localhost:9401/metrics" in DEFAULT_DCGM_ENDPOINTS - assert manager._dcgm_endpoints == list(DEFAULT_DCGM_ENDPOINTS) + assert len(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) == 2 + assert "http://localhost:9400/metrics" in Environment.GPU.DEFAULT_DCGM_ENDPOINTS + assert "http://localhost:9401/metrics" in Environment.GPU.DEFAULT_DCGM_ENDPOINTS + assert manager._dcgm_endpoints == list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) def test_user_explicitly_configured_telemetry_flag(self): """Test that _user_explicitly_configured_telemetry flag is set correctly.""" @@ -600,7 +600,7 @@ def _create_test_manager(self): manager = TelemetryManager.__new__(TelemetryManager) manager.service_id = "test_manager" manager._collectors = {} - manager._dcgm_endpoints = list(DEFAULT_DCGM_ENDPOINTS) + manager._dcgm_endpoints = list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) manager._user_provided_endpoints = [] manager._user_explicitly_configured_telemetry = False manager._collection_interval = 0.33 @@ -665,7 +665,7 @@ def _create_test_manager(self): manager = TelemetryManager.__new__(TelemetryManager) manager.service_id = "test_manager" manager._collectors = {} - manager._dcgm_endpoints = list(DEFAULT_DCGM_ENDPOINTS) + manager._dcgm_endpoints = list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) manager._user_provided_endpoints = [] manager._user_explicitly_configured_telemetry = False manager._collection_interval = 0.33 @@ -743,7 +743,9 @@ def _create_test_manager(self, user_requested, user_endpoints): manager = TelemetryManager.__new__(TelemetryManager) manager.service_id = "test_manager" manager._collectors = {} - manager._dcgm_endpoints = list(DEFAULT_DCGM_ENDPOINTS) + user_endpoints + manager._dcgm_endpoints = ( + list(Environment.GPU.DEFAULT_DCGM_ENDPOINTS) + user_endpoints + ) manager._user_provided_endpoints = user_endpoints 
manager._user_explicitly_configured_telemetry = user_requested manager._collection_interval = 0.33 @@ -759,12 +761,14 @@ async def test_hide_unreachable_defaults_when_one_default_reachable(self): # Manually simulate one reachable default by adding to collectors # This tests the smart visibility logic without complex mocking - manager._collectors[DEFAULT_DCGM_ENDPOINTS[0]] = MagicMock() + manager._collectors[Environment.GPU.DEFAULT_DCGM_ENDPOINTS[0]] = MagicMock() # Call the status reporting part directly reachable_endpoints = list(manager._collectors.keys()) reachable_defaults = [ - ep for ep in DEFAULT_DCGM_ENDPOINTS if ep in reachable_endpoints + ep + for ep in Environment.GPU.DEFAULT_DCGM_ENDPOINTS + if ep in reachable_endpoints ] # Test the smart visibility logic @@ -777,8 +781,8 @@ async def test_hide_unreachable_defaults_when_one_default_reachable(self): # Should only report reachable endpoint assert len(endpoints_to_report) == 1 - assert DEFAULT_DCGM_ENDPOINTS[0] in endpoints_to_report - assert DEFAULT_DCGM_ENDPOINTS[1] not in endpoints_to_report + assert Environment.GPU.DEFAULT_DCGM_ENDPOINTS[0] in endpoints_to_report + assert Environment.GPU.DEFAULT_DCGM_ENDPOINTS[1] not in endpoints_to_report @pytest.mark.asyncio async def test_show_custom_urls_when_defaults_unreachable(self): @@ -803,7 +807,7 @@ async def test_show_custom_urls_when_defaults_unreachable(self): assert len(call_args.endpoints_configured) == 1 # Just custom URL assert "http://custom:9401/metrics" in call_args.endpoints_configured # Defaults should NOT be in the tested list since they're unreachable - for endpoint in DEFAULT_DCGM_ENDPOINTS: + for endpoint in Environment.GPU.DEFAULT_DCGM_ENDPOINTS: assert endpoint not in call_args.endpoints_configured @pytest.mark.asyncio @@ -815,12 +819,14 @@ async def test_show_custom_and_reachable_defaults(self): manager.publish = AsyncMock() # Simulate one reachable default - manager._collectors[DEFAULT_DCGM_ENDPOINTS[0]] = MagicMock() + manager._collectors[Environment.GPU.DEFAULT_DCGM_ENDPOINTS[0]] = MagicMock() # Get the status logic results directly reachable_endpoints = list(manager._collectors.keys()) reachable_defaults = [ - ep for ep in DEFAULT_DCGM_ENDPOINTS if ep in reachable_endpoints + ep + for ep in Environment.GPU.DEFAULT_DCGM_ENDPOINTS + if ep in reachable_endpoints ] # Scenario 3 logic @@ -831,8 +837,8 @@ async def test_show_custom_and_reachable_defaults(self): # Should have both custom URL and reachable default assert len(endpoints_to_report) == 2 assert "http://custom:9401/metrics" in endpoints_to_report - assert DEFAULT_DCGM_ENDPOINTS[0] in endpoints_to_report - assert DEFAULT_DCGM_ENDPOINTS[1] not in endpoints_to_report + assert Environment.GPU.DEFAULT_DCGM_ENDPOINTS[0] in endpoints_to_report + assert Environment.GPU.DEFAULT_DCGM_ENDPOINTS[1] not in endpoints_to_report @pytest.mark.asyncio async def test_hide_defaults_when_not_requested_and_all_unreachable(self): diff --git a/tests/post_processors/test_record_export_results_processor.py b/tests/post_processors/test_record_export_results_processor.py index 6077b9773..493d35f38 100644 --- a/tests/post_processors/test_record_export_results_processor.py +++ b/tests/post_processors/test_record_export_results_processor.py @@ -14,9 +14,9 @@ ServiceConfig, UserConfig, ) -from aiperf.common.constants import DEFAULT_RECORD_EXPORT_BATCH_SIZE from aiperf.common.enums import CreditPhase, EndpointType from aiperf.common.enums.data_exporter_enums import ExportLevel +from aiperf.common.environment import Environment from 
aiperf.common.exceptions import PostProcessorDisabled from aiperf.common.messages import MetricRecordsMessage from aiperf.common.models.record_models import ( @@ -180,11 +180,11 @@ def test_init_sets_show_internal_in_dev_mode( service_config: ServiceConfig, ): """Test that show_internal is set based on dev mode.""" - with patch( - "aiperf.post_processors.record_export_results_processor.AIPERF_DEV_MODE", - True, + with ( + patch.object(Environment.DEV, "MODE", True), + patch.object(Environment.DEV, "SHOW_INTERNAL_METRICS", True), + patch.object(Environment.DEV, "SHOW_EXPERIMENTAL_METRICS", False), ): - service_config.developer.show_internal_metrics = True processor = RecordExportResultsProcessor( service_id="records-manager", service_config=service_config, @@ -596,7 +596,7 @@ async def test_lifecycle( with patch.object( MetricRecordDict, "to_display_dict", return_value=mock_display_dict ): - for i in range(DEFAULT_RECORD_EXPORT_BATCH_SIZE * 2): + for i in range(Environment.RECORD.EXPORT_BATCH_SIZE * 2): await processor.process_result( create_metric_records_message( x_request_id=f"record-{i}", @@ -612,14 +612,14 @@ async def test_lifecycle( finally: await processor.stop() - assert processor.lines_written == DEFAULT_RECORD_EXPORT_BATCH_SIZE * 2 + assert processor.lines_written == Environment.RECORD.EXPORT_BATCH_SIZE * 2 contents = mock_aiofiles_stringio.getvalue() lines = contents.splitlines() assert contents.endswith("\n"), ( f"Contents should end with newline but got: {repr(contents[-20:])}" ) - assert len(lines) == DEFAULT_RECORD_EXPORT_BATCH_SIZE * 2 + assert len(lines) == Environment.RECORD.EXPORT_BATCH_SIZE * 2 for i, line in enumerate(lines): record = MetricRecordInfo.model_validate_json(line) diff --git a/tests/test_main.py b/tests/test_main.py index e366ce50e..4bb699d2a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -6,7 +6,7 @@ import pytest -from aiperf.gpu_telemetry.constants import DEFAULT_DCGM_ENDPOINTS +from aiperf.common.environment import Environment class TestMainFunction: @@ -25,20 +25,20 @@ def _restore_argv(self): # --gpu-telemetry at end without value ( ["--gpu-telemetry"], - ["--gpu-telemetry", *DEFAULT_DCGM_ENDPOINTS], - DEFAULT_DCGM_ENDPOINTS, + ["--gpu-telemetry", *Environment.GPU.DEFAULT_DCGM_ENDPOINTS], + Environment.GPU.DEFAULT_DCGM_ENDPOINTS, ), # --gpu-telemetry followed by single dash flag ( ["--gpu-telemetry", "-v"], - ["--gpu-telemetry", *DEFAULT_DCGM_ENDPOINTS, "-v"], - DEFAULT_DCGM_ENDPOINTS, + ["--gpu-telemetry", *Environment.GPU.DEFAULT_DCGM_ENDPOINTS, "-v"], + Environment.GPU.DEFAULT_DCGM_ENDPOINTS, ), # --gpu-telemetry followed by double dash flag ( ["--gpu-telemetry", "--verbose"], - ["--gpu-telemetry", *DEFAULT_DCGM_ENDPOINTS, "--verbose"], - DEFAULT_DCGM_ENDPOINTS, + ["--gpu-telemetry", *Environment.GPU.DEFAULT_DCGM_ENDPOINTS, "--verbose"], + Environment.GPU.DEFAULT_DCGM_ENDPOINTS, ), # --gpu-telemetry with custom value ( @@ -75,4 +75,5 @@ def mock_run_system_controller(user_config, service_config): main() assert sys.argv == ["aiperf", "profile", "-m", "test-model", *expected_argv] - assert captured_user_config.gpu_telemetry == expected_gpu_telemetry + assert captured_user_config is not None + assert captured_user_config.gpu_telemetry == expected_gpu_telemetry diff --git a/tests/transports/test_tcp_connector.py b/tests/transports/test_tcp_connector.py index e9906ac40..9fdb06a51 100644 --- a/tests/transports/test_tcp_connector.py +++ b/tests/transports/test_tcp_connector.py @@ -11,9 +11,8 @@ import pytest -from aiperf.common import
constants +from aiperf.common.environment import Environment from aiperf.transports.aiohttp_client import create_tcp_connector -from aiperf.transports.http_defaults import SocketDefaults ################################################################################ # Test create_tcp_connector @@ -36,7 +35,7 @@ def test_create_default_connector(self) -> None: call_kwargs = mock_connector_class.call_args[1] # Verify default parameters - assert call_kwargs["limit"] == constants.AIPERF_HTTP_CONNECTION_LIMIT + assert call_kwargs["limit"] == Environment.HTTP.CONNECTION_LIMIT assert call_kwargs["limit_per_host"] == 0 assert call_kwargs["ttl_dns_cache"] == 300 assert call_kwargs["use_dns_cache"] is True @@ -103,8 +102,8 @@ def test_socket_factory_configuration(self, socket_factory_setup) -> None: expected_calls = [ (socket.SOL_TCP, socket.TCP_NODELAY, 1), (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), - (socket.SOL_SOCKET, socket.SO_RCVBUF, SocketDefaults.SO_RCVBUF), - (socket.SOL_SOCKET, socket.SO_SNDBUF, SocketDefaults.SO_SNDBUF), + (socket.SOL_SOCKET, socket.SO_RCVBUF, Environment.HTTP.SO_RCVBUF), + (socket.SOL_SOCKET, socket.SO_SNDBUF, Environment.HTTP.SO_SNDBUF), ] for option_level, option_name, option_value in expected_calls: @@ -122,26 +121,26 @@ def test_socket_factory_configuration(self, socket_factory_setup) -> None: True, "TCP_KEEPIDLE", socket.TCP_KEEPIDLE, - SocketDefaults.TCP_KEEPIDLE, + Environment.HTTP.TCP_KEEPIDLE, ), ( True, "TCP_KEEPINTVL", socket.TCP_KEEPINTVL, - SocketDefaults.TCP_KEEPINTVL, + Environment.HTTP.TCP_KEEPINTVL, ), - (True, "TCP_KEEPCNT", socket.TCP_KEEPCNT, SocketDefaults.TCP_KEEPCNT), + (True, "TCP_KEEPCNT", socket.TCP_KEEPCNT, Environment.HTTP.TCP_KEEPCNT), ( True, "TCP_QUICKACK", socket.TCP_QUICKACK, - SocketDefaults.TCP_QUICKACK, + 1, ), ( True, "TCP_USER_TIMEOUT", socket.TCP_USER_TIMEOUT, - SocketDefaults.TCP_USER_TIMEOUT, + Environment.HTTP.TCP_USER_TIMEOUT, ), (False, "TCP_KEEPIDLE", socket.TCP_KEEPIDLE, None), ], diff --git a/tests/ui/test_realtime_metrics_dashboard.py b/tests/ui/test_realtime_metrics_dashboard.py index 109d094c0..9fddf3bc6 100644 --- a/tests/ui/test_realtime_metrics_dashboard.py +++ b/tests/ui/test_realtime_metrics_dashboard.py @@ -1,10 +1,12 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 +from unittest.mock import patch + import pytest from aiperf.common.config import ServiceConfig -from aiperf.common.config.dev_config import DeveloperConfig +from aiperf.common.environment import Environment from aiperf.common.models import MetricResult from aiperf.metrics.types.benchmark_duration_metric import BenchmarkDurationMetric from aiperf.metrics.types.error_request_count import ErrorRequestCountMetric @@ -18,16 +20,6 @@ from aiperf.ui.dashboard.realtime_metrics_dashboard import RealtimeMetricsTable -@pytest.fixture -def service_config_show_internal_false(): - return ServiceConfig(developer=DeveloperConfig(show_internal_metrics=False)) - - -@pytest.fixture -def service_config_show_internal_true(): - return ServiceConfig(developer=DeveloperConfig(show_internal_metrics=True)) - - class TestRealtimeMetricsTable: @pytest.mark.parametrize( "metric_tag, show_internal, should_skip", @@ -56,16 +48,15 @@ def test_should_skip_logic_with_real_metrics( self, metric_tag, show_internal, should_skip ): """Test that metrics are skipped based on flags and configuration using real metrics""" - service_config = ServiceConfig( - developer=DeveloperConfig(show_internal_metrics=show_internal) - ) - table = RealtimeMetricsTable(service_config) - - metric_result = MetricResult( - tag=metric_tag, - header="Test Metric", - unit="ms", - avg=1.0, - ) - - assert table._should_skip(metric_result) is should_skip + with patch.object(Environment.DEV, "SHOW_INTERNAL_METRICS", show_internal): + service_config = ServiceConfig() + table = RealtimeMetricsTable(service_config) + + metric_result = MetricResult( + tag=metric_tag, + header="Test Metric", + unit="ms", + avg=1.0, + ) + + assert table._should_skip(metric_result) is should_skip diff --git a/tests/workers/test_worker_manager.py b/tests/workers/test_worker_manager.py index 76145b896..fe3d149a2 100644 --- a/tests/workers/test_worker_manager.py +++ b/tests/workers/test_worker_manager.py @@ -11,11 +11,8 @@ from aiperf.common.config import EndpointConfig, ServiceConfig, UserConfig from aiperf.common.config.loadgen_config import LoadGeneratorConfig from aiperf.common.config.worker_config import WorkersConfig -from aiperf.common.constants import ( - DEFAULT_WORKER_HIGH_LOAD_CPU_USAGE, - DEFAULT_WORKER_HIGH_LOAD_RECOVERY_TIME, -) from aiperf.common.enums.worker_enums import WorkerStatus +from aiperf.common.environment import Environment from aiperf.common.messages import WorkerHealthMessage from aiperf.common.models import ProcessHealth, WorkerTaskStats from aiperf.workers.worker_manager import WorkerManager, WorkerStatusInfo @@ -192,7 +189,7 @@ def test_cpu_at_threshold_boundary( ): """Test that CPU exactly at 85% threshold does not trigger warning (boundary condition).""" message = create_health_message( - time_traveler, cpu_usage=DEFAULT_WORKER_HIGH_LOAD_CPU_USAGE + time_traveler, cpu_usage=Environment.WORKER.HIGH_LOAD_CPU_USAGE ) worker_manager._update_worker_status(worker_info, message) @@ -203,8 +200,8 @@ def test_cpu_at_threshold_boundary( @pytest.mark.parametrize( "time_offset_seconds,expected_status", [ - (DEFAULT_WORKER_HIGH_LOAD_RECOVERY_TIME / 2, WorkerStatus.HIGH_LOAD), - (DEFAULT_WORKER_HIGH_LOAD_RECOVERY_TIME + 1, WorkerStatus.HEALTHY), + (Environment.WORKER.HIGH_LOAD_RECOVERY_TIME / 2, WorkerStatus.HIGH_LOAD), + (Environment.WORKER.HIGH_LOAD_RECOVERY_TIME + 1, WorkerStatus.HEALTHY), ], ) def test_high_cpu_recovery_behavior(