From e86845a37b65f7eb36305abe28e4784824cd7a1a Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Wed, 24 Sep 2025 19:23:12 +0800 Subject: [PATCH] Configure the timeout and retry count between the master and worker --- dlrover/python/elastic_agent/torch/training.py | 11 +++++++++-- dlrover/python/master/args.py | 6 ++++++ dlrover/python/master/main.py | 1 + dlrover/python/master/node/dist_job_manager.py | 2 +- dlrover/python/scheduler/job.py | 1 + dlrover/trainer/torch/elastic_run.py | 18 ++++++++++++++++++ 6 files changed, 36 insertions(+), 3 deletions(-) diff --git a/dlrover/python/elastic_agent/torch/training.py b/dlrover/python/elastic_agent/torch/training.py index cbb5652dff..7bb1c6f421 100644 --- a/dlrover/python/elastic_agent/torch/training.py +++ b/dlrover/python/elastic_agent/torch/training.py @@ -218,6 +218,8 @@ class ElasticLaunchConfig(LaunchConfig): training_log_file: str = "" failure_node_errors: str = "" numa_affinity: bool = False + connect_master_timeout: int = 300 + connect_master_max_retry: int = 5 def set_node_unit(self, node_unit): """Set the number unit of nodes.""" @@ -316,6 +318,7 @@ def __init__( node_rank, rdzv_params: RendezvousParameters, local_world_size, + connect_master_timeout=300, ): self._name = name self._node_rank = node_rank @@ -328,7 +331,7 @@ ) self.pend_timeout = float(rdzv_params.get("pend_timeout", "inf")) self._client = MasterClient.singleton_instance() - self._store = MasterKVStore(self._name, timedelta(seconds=300)) + self._store = MasterKVStore(self._name, timedelta(seconds=connect_master_timeout)) lastcall_timeout = int(rdzv_params.get("lastcall_timeout", 60)) node_unit = int(rdzv_params.get("node_unit", "1")) self._client.report_rdzv_params( @@ -521,6 +524,7 @@ def __init__( training_log_file: str = "", failure_node_errors: str = "", with_diagnostician: bool = True, + connect_master_max_retry=5, ): if version_less_than_230(): super().__init__( @@ -557,6 +561,7 @@ node_rank=node_rank,
local_world_size=config.nproc_per_node, ) + self.connect_master_max_retry = connect_master_max_retry self._agent_context = get_agent_context() self._rank_cpu_affinity = {} if self._config.numa_affinity: @@ -722,7 +727,7 @@ def _get_master_addr_port(self, store: Store) -> Tuple[str, int]: return master_addr, master_port def _safe_get_master_addr_port(self, store: Store) -> Tuple[str, int]: - for _ in range(5): + for _ in range(self.connect_master_max_retry): try: return self._get_master_addr_port(store) except Exception as e: @@ -1414,6 +1419,7 @@ def launch_agent( training_log_file=config.training_log_file, failure_node_errors=config.failure_node_errors, exit_barrier_timeout=900, + connect_master_max_retry=config.connect_master_max_retry ) shutdown_rdzv = True @@ -1512,6 +1518,7 @@ def _create_worker_spec( node_rank, rdzv_parameters, local_world_size=config.nproc_per_node, + connect_master_timeout=config.connect_master_timeout, ) spec = WorkerSpec( role=config.role, diff --git a/dlrover/python/master/args.py b/dlrover/python/master/args.py index a0bb002978..023992baa2 100644 --- a/dlrover/python/master/args.py +++ b/dlrover/python/master/args.py @@ -104,6 +104,12 @@ def _build_master_args_parser(): type=pos_int, help="The timeout value of worker task process(For PS type job).", ) + parser.add_argument( + "--dead_node_timeout", + default=600, + type=int, + help="dead node timeout in seconds", + ) return parser diff --git a/dlrover/python/master/main.py b/dlrover/python/master/main.py index 6fe744b274..40ece35557 100644 --- a/dlrover/python/master/main.py +++ b/dlrover/python/master/main.py @@ -81,6 +81,7 @@ def run(args): else: from dlrover.python.master.dist_master import DistributedJobMaster + job_args.dead_node_timeout = args.dead_node_timeout update_context(job_args) master = DistributedJobMaster(_dlrover_context.master_port, job_args) master.prepare() diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py 
index 7859128bdd..00413357c0 100644 --- a/dlrover/python/master/node/dist_job_manager.py +++ b/dlrover/python/master/node/dist_job_manager.py @@ -475,7 +475,7 @@ def _monitor_nodes(self): def _monitor_node_heart_beat(self): with self._lock: try: - events = self._get_dead_node_event() + events = self._get_dead_node_event(window_interval=self._job_args.dead_node_timeout) except Exception as e: logger.warning(e) events = [] diff --git a/dlrover/python/scheduler/job.py b/dlrover/python/scheduler/job.py index ad2cc5132d..a8adfb35c4 100644 --- a/dlrover/python/scheduler/job.py +++ b/dlrover/python/scheduler/job.py @@ -107,6 +107,7 @@ def __init__(self, platform, namespace, job_name): self.cordon_fault_node = False self.xpu_type: Accelerators = Accelerators.GENERIC_CPU self.enable_suspended = False + self.dead_node_timeout = 600 @abstractmethod def initilize(self): diff --git a/dlrover/trainer/torch/elastic_run.py b/dlrover/trainer/torch/elastic_run.py index fe234dc779..a95624231b 100644 --- a/dlrover/trainer/torch/elastic_run.py +++ b/dlrover/trainer/torch/elastic_run.py @@ -214,6 +214,22 @@ def parse_args(args): action=check_env, help="Whether to test the communication performance.", ) + parser.add_argument( + "--connect-master-timeout", + "--connect_master_timeout", + type=int, + action=env, + default=30, + help="Connect master timeout in seconds.", + ) + parser.add_argument( + "--connect-master-max-retry", + "--connect_master_max_retry", + type=int, + action=env, + default=2, + help="Connect master max retry times.", + ) return parser.parse_args(args) @@ -402,6 +418,8 @@ def _elastic_config_from_args( elastic_config.rdzv_endpoint = "" join_timeout = elastic_config.rdzv_configs.get("join_timeout", 600) elastic_config.rdzv_configs["timeout"] = join_timeout + elastic_config.connect_master_timeout = args.connect_master_timeout + elastic_config.connect_master_max_retry = args.connect_master_max_retry return elastic_config, cmd, cmd_args