
Commit 8874bbb

Add XCCL backend for distributed recipes (#2605)

1 parent 6b24e31 · commit 8874bbb

7 files changed: +63 / -16 lines

recipes/dev/lora_finetune_distributed_multi_dataset.py
13 additions & 3 deletions

@@ -137,6 +137,16 @@ def __init__(self, cfg: DictConfig) -> None:
                 "full fp16 training is not supported with this recipe. Please use bf16 or fp32 instead."
             )

+        # Set up the backend for distributed training (NCCL, GLOO, etc.)
+        self._enable_async_checkpointing = cfg.get("enable_async_checkpointing", False)
+        self.fsdp_cpu_offload = cfg.get("fsdp_cpu_offload", False)
+        self.distributed_backend = training.get_distributed_backend(
+            cfg.device,
+            offload_ops_to_cpu=self.fsdp_cpu_offload
+            or self._enable_async_checkpointing,
+        )
+        init_process_group(self.distributed_backend)
+
         self.world_size, self.rank = utils.get_world_size_and_rank()

         self._is_rank_zero = self.rank == 0
@@ -146,9 +156,10 @@ def __init__(self, cfg: DictConfig) -> None:
         self._log_every_n_steps = cfg.get("log_every_n_steps", 1)
         self._log_peak_memory_stats = cfg.get("log_peak_memory_stats", False)

-        if self._log_peak_memory_stats and self._device.type != "cuda":
+        if self._log_peak_memory_stats and self._device.type not in {"cuda", "xpu"}:
             log.info(
-                "log_peak_memory_stats was set to True, however, training does not use cuda. Setting log_peak_memory_stats=False."
+                "log_peak_memory_stats was set to True, however, training does not use cuda or xpu."
+                "Setting log_peak_memory_stats=False."
             )
             self._log_peak_memory_stats = False

@@ -938,7 +949,6 @@ def recipe_main(cfg: DictConfig) -> None:
             "Distributed finetune recipe should be run via a distributed launcher."
             "If using tune CLI, please specify --nnodes 1 and --nproc_per_node [num_gpus]"
         )
-    init_process_group("cuda:nccl,cpu:gloo")
     if cfg.get("fsdp_cpu_offload", False):
         # Utilize all available CPU cores for intra-op parallelism. This provides ~2x
         # speed up when benchmarking fused AdamW on CPU
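
Taken together, the recipe-side change is small: instead of hard-coding "cuda:nccl,cpu:gloo", each recipe now asks training.get_distributed_backend for a backend string derived from cfg.device and initializes the process group with it. A minimal, self-contained sketch of that pattern is shown below; the setup_distributed helper and the inline config are illustrative only and not part of the commit.

from omegaconf import DictConfig, OmegaConf
from torch.distributed import init_process_group

from torchtune import training


def setup_distributed(cfg: DictConfig) -> str:
    # Mirror the recipe logic: collectives must also work on CPU tensors when
    # FSDP CPU offload or async checkpointing is enabled.
    offload_ops_to_cpu = cfg.get("fsdp_cpu_offload", False) or cfg.get(
        "enable_async_checkpointing", False
    )
    backend = training.get_distributed_backend(
        cfg.device, offload_ops_to_cpu=offload_ops_to_cpu
    )
    init_process_group(backend)
    return backend


# Example (run under torchrun): "xpu" is expected to map to XCCL and "cuda" to
# NCCL, with offloading adding a CPU-capable (gloo) component, as in the
# removed hard-coded "cuda:nccl,cpu:gloo" string.
cfg = OmegaConf.create({"device": "xpu", "fsdp_cpu_offload": True})
print(setup_distributed(cfg))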

recipes/full_dpo_distributed.py
11 additions & 4 deletions

@@ -129,12 +129,21 @@ def __init__(self, cfg: DictConfig) -> None:
         self._log_every_n_steps = cfg.get("log_every_n_steps", 1)
         self._log_peak_memory_stats = cfg.get("log_peak_memory_stats", False)

-        if self._log_peak_memory_stats and self._device.type != "cuda":
+        if self._log_peak_memory_stats and self._device.type not in {"cuda", "xpu"}:
             log.info(
-                "log_peak_memory_stats was set to True, however, training does not use cuda. Setting log_peak_memory_stats=False."
+                "log_peak_memory_stats was set to True, however, training does not use cuda or xpu."
+                "Setting log_peak_memory_stats=False."
             )
             self._log_peak_memory_stats = False

+        # Set up the backend for distributed training (NCCL, GLOO, etc.)
+        self._enable_async_checkpointing = cfg.get("enable_async_checkpointing", False)
+        self.fsdp_cpu_offload = cfg.get("fsdp_cpu_offload", False)
+        self.distributed_backend = training.get_distributed_backend(
+            cfg.device, offload_ops_to_cpu=True
+        )
+        init_process_group(self.distributed_backend)
+
         self.world_size, self.rank = get_world_size_and_rank()
         self._is_rank_zero = self.rank == 0

@@ -1070,8 +1079,6 @@ def recipe_main(cfg: DictConfig) -> None:
             "Distributed finetune recipe should be run via a distributed launcher."
             "If using tune CLI, please specify --nnodes 1 and --nproc_per_node [num_gpus]"
         )
-
-    init_process_group("cuda:nccl,cpu:gloo")
     if cfg.get("fsdp_cpu_offload", False):
         # Utilize all available CPU cores for intra-op parallelism. This provides ~2x
         # speed up when benchmarking fused AdamW on CPU

recipes/full_finetune_distributed.py
3 additions & 2 deletions

@@ -177,9 +177,10 @@ def __init__(self, cfg: DictConfig) -> None:
         self._output_dir = cfg.output_dir
         self._log_every_n_steps = cfg.get("log_every_n_steps", 1)
         self._log_peak_memory_stats = cfg.get("log_peak_memory_stats", False)
-        if self._log_peak_memory_stats and device_type != "cuda":
+        if self._log_peak_memory_stats and self._device.type not in {"cuda", "xpu"}:
             log.info(
-                "log_peak_memory_stats was set to True, however, training does not use cuda. Setting log_peak_memory_stats=False."
+                "log_peak_memory_stats was set to True, however, training does not use cuda or xpu."
+                "Setting log_peak_memory_stats=False."
             )
             self._log_peak_memory_stats = False


recipes/knowledge_distillation_distributed.py
10 additions & 1 deletion

@@ -117,6 +117,16 @@ def __init__(self, cfg: DictConfig) -> None:
                 "fp16 precision is not supported in this recipe. Please use fp32 or bf16."
             )

+        # Set up the backend for distributed training (NCCL, GLOO, etc.)
+        self._enable_async_checkpointing = cfg.get("enable_async_checkpointing", False)
+        self.fsdp_cpu_offload = cfg.get("fsdp_cpu_offload", False)
+        self.distributed_backend = training.get_distributed_backend(
+            cfg.device,
+            offload_ops_to_cpu=self.fsdp_cpu_offload
+            or self._enable_async_checkpointing,
+        )
+        init_process_group(self.distributed_backend)
+
         self.world_size, self.rank = utils.get_world_size_and_rank()

         self._is_rank_zero = self.rank == 0
@@ -959,7 +969,6 @@ def recipe_main(cfg: DictConfig) -> None:
             "Distributed finetune recipe should be run via a distributed launcher."
             "If using tune CLI, please specify --nnodes 1 and --nproc_per_node [num_gpus]"
         )
-    init_process_group("cuda:nccl,cpu:gloo")
     if cfg.get("fsdp_cpu_offload", False):
         # Utilize all available CPU cores for intra-op parallelism. This provides ~2x
         # speed up when benchmarking fused AdamW on CPU

recipes/lora_dpo_distributed.py
11 additions & 3 deletions

@@ -134,6 +134,14 @@ def __init__(self, cfg: DictConfig) -> None:
                 "full fp16 training is not supported with this recipe. Please use bf16 or fp32 instead."
             )

+        # Set up the backend for distributed training (NCCL, GLOO, etc.)
+        self._enable_async_checkpointing = cfg.get("enable_async_checkpointing", False)
+        self.fsdp_cpu_offload = cfg.get("fsdp_cpu_offload", False)
+        self.distributed_backend = training.get_distributed_backend(
+            cfg.device, offload_ops_to_cpu=True
+        )
+        init_process_group(self.distributed_backend)
+
         self.world_size, self.rank = utils.get_world_size_and_rank()

         self._is_rank_zero = self.rank == 0
@@ -143,9 +151,10 @@ def __init__(self, cfg: DictConfig) -> None:
         self._log_every_n_steps = cfg.get("log_every_n_steps", 1)
         self._log_peak_memory_stats = cfg.get("log_peak_memory_stats", False)

-        if self._log_peak_memory_stats and self._device.type != "cuda":
+        if self._log_peak_memory_stats and self._device.type not in {"cuda", "xpu"}:
             log.info(
-                "log_peak_memory_stats was set to True, however, training does not use cuda. Setting log_peak_memory_stats=False."
+                "log_peak_memory_stats was set to True, however, training does not use cuda or xpu."
+                "Setting log_peak_memory_stats=False."
             )
             self._log_peak_memory_stats = False

@@ -851,7 +860,6 @@ def recipe_main(cfg: DictConfig) -> None:
             "Distributed finetune recipe should be run via a distributed launcher."
             "If using tune CLI, please specify --nnodes 1 and --nproc_per_node [num_gpus]"
         )
-    init_process_group("cuda:nccl,cpu:gloo")
     if cfg.get("fsdp_cpu_offload", False):
         # Utilize all available CPU cores for intra-op parallelism. This provides ~2x
         # speed up when benchmarking fused AdamW on CPU

recipes/lora_finetune_distributed.py
13 additions & 3 deletions

@@ -135,6 +135,16 @@ def __init__(self, cfg: DictConfig) -> None:
                 "full fp16 training is not supported with this recipe. Please use bf16 or fp32 instead."
             )

+        # Set up the backend for distributed training (NCCL, GLOO, etc.)
+        self._enable_async_checkpointing = cfg.get("enable_async_checkpointing", False)
+        self.fsdp_cpu_offload = cfg.get("fsdp_cpu_offload", False)
+        self.distributed_backend = training.get_distributed_backend(
+            cfg.device,
+            offload_ops_to_cpu=self.fsdp_cpu_offload
+            or self._enable_async_checkpointing,
+        )
+        init_process_group(self.distributed_backend)
+
         self.world_size, self.rank = utils.get_world_size_and_rank()

         self._is_rank_zero = self.rank == 0
@@ -144,9 +154,10 @@ def __init__(self, cfg: DictConfig) -> None:
         self._log_every_n_steps = cfg.get("log_every_n_steps", 1)
         self._log_peak_memory_stats = cfg.get("log_peak_memory_stats", False)

-        if self._log_peak_memory_stats and self._device.type != "cuda":
+        if self._log_peak_memory_stats and self._device.type not in {"cuda", "xpu"}:
             log.info(
-                "log_peak_memory_stats was set to True, however, training does not use cuda. Setting log_peak_memory_stats=False."
+                "log_peak_memory_stats was set to True, however, training does not use cuda or xpu."
+                "Setting log_peak_memory_stats=False."
             )
             self._log_peak_memory_stats = False

@@ -977,7 +988,6 @@ def recipe_main(cfg: DictConfig) -> None:
             "Distributed finetune recipe should be run via a distributed launcher."
             "If using tune CLI, please specify --nnodes 1 and --nproc_per_node [num_gpus]"
         )
-    init_process_group("cuda:nccl,cpu:gloo")
     if cfg.get("fsdp_cpu_offload", False):
         # Utilize all available CPU cores for intra-op parallelism. This provides ~2x
         # speed up when benchmarking fused AdamW on CPU

torchtune/training/_distributed.py
2 additions & 0 deletions

@@ -165,6 +165,8 @@ def _broadcast_tensor(tensor: torch.Tensor, src: int = 0) -> torch.Tensor:
         device = tensor.device
         if dist.get_backend() == "nccl":
             tensor = tensor.to(get_device("cuda"))
+        elif dist.get_backend() == "xccl":
+            tensor = tensor.to(get_device("xpu"))
         dist.broadcast(tensor, src=src, group=None)
         return tensor.to(device)
     else:
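
The only library-side change is in _broadcast_tensor, which now moves tensors to the XPU device when the process-group backend is "xccl", mirroring the existing NCCL/CUDA branch. For context, a rough approximation of the device-to-backend mapping that get_distributed_backend presumably performs is sketched below; it is an assumption inferred from the backend strings visible in this commit ("nccl", "xccl", "gloo", "cuda:nccl,cpu:gloo"), not torchtune's actual implementation.

# Assumed behavior only -- not torchtune's actual get_distributed_backend.
def approximate_backend(device_type: str, offload_ops_to_cpu: bool = False) -> str:
    """Guess the process-group backend string for a given device type."""
    defaults = {"cuda": "nccl", "xpu": "xccl", "cpu": "gloo"}
    backend = defaults.get(device_type, "gloo")
    if offload_ops_to_cpu and device_type != "cpu":
        # Pair the accelerator backend with gloo so collectives on CPU tensors
        # (e.g. offloaded FSDP ops, async checkpointing) also have a backend,
        # e.g. "cuda:nccl,cpu:gloo" or "xpu:xccl,cpu:gloo".
        backend = f"{device_type}:{backend},cpu:gloo"
    return backend


assert approximate_backend("cuda", offload_ops_to_cpu=True) == "cuda:nccl,cpu:gloo"
assert approximate_backend("xpu") == "xccl"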
