Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GPU Support for rlaunch multi #495

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 45 additions & 6 deletions fireworks/features/multi_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
import time
from multiprocessing import Manager, Process
import math

from fireworks.core.rocket_launcher import rapidfire
from fireworks.fw_config import (
Expand All @@ -20,6 +21,7 @@
get_my_host,
log_multi,
)
from warnings import warn

__author__ = "Xiaohui Qu, Anubhav Jain"
__copyright__ = "Copyright 2013, The Material Project & The Electrolyte Genome Project"
Expand Down Expand Up @@ -54,7 +56,8 @@ def ping_multilaunch(port, stop_event):


def rapidfire_process(
fworker, nlaunches, sleep, loglvl, port, node_list, sub_nproc, timeout, running_ids_dict, local_redirect
fworker, nlaunches, sleep, loglvl, port, node_list, sub_nproc, timeout, running_ids_dict, local_redirect,
gpu_id=None
):
"""
Initializes shared data with multiprocessing parameters and starts a rapidfire.
Expand All @@ -70,7 +73,13 @@ def rapidfire_process(
sub_nproc (int): number of processors of the sub job
timeout (int): # of seconds after which to stop the rapidfire process
local_redirect (bool): redirect standard input and output to local file
gpu_id (int): GPU ID to use for the sub job
"""
if gpu_id is not None:
# If the sub job is using GPU, set the CUDA_VISIBLE_DEVICES environment variable
# This will limit the GPU usage to only the specified GPU
os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id)

ds = DataServer(address=("127.0.0.1", port), authkey=DS_PASSWORD)
ds.connect()
launchpad = ds.LaunchPad()
Expand Down Expand Up @@ -147,6 +156,7 @@ def start_rockets(
timeout=None,
running_ids_dict=None,
local_redirect=False,
gpu_lists=None
):
"""
Create each sub job and start a rocket launch in each one
Expand All @@ -168,17 +178,18 @@ def start_rockets(
processes = [
Process(
target=rapidfire_process,
args=(fworker, nlaunches, sleep, loglvl, port, nl, sub_nproc, timeout, running_ids_dict, local_redirect),
args=(fworker, nlaunches, sleep, loglvl, port, nl, sub_nproc,
timeout, running_ids_dict, local_redirect, gpu_id),
)
for nl, sub_nproc in zip(node_lists, sub_nproc_list)
for nl, sub_nproc, gpu_id in zip(node_lists, sub_nproc_list, gpu_lists)
]
for p in processes:
p.start()
time.sleep(0.15)
return processes


def split_node_lists(num_jobs, total_node_list=None, ppn=24):
def split_node_lists(num_jobs, total_node_list=None, ppn=24, gpus_per_node=None):
"""
Parse node list and processor list from nodefile contents

Expand All @@ -198,10 +209,19 @@ def split_node_lists(num_jobs, total_node_list=None, ppn=24):
sub_nnodes = nnodes // num_jobs
sub_nproc_list = [sub_nnodes * ppn] * num_jobs
node_lists = [orig_node_list[i : i + sub_nnodes] for i in range(0, nnodes, sub_nnodes)]

if gpus_per_node is not None:
gpu_lists = list(range(gpus_per_node)) * nnodes
else:
gpu_lists = [None] * nnodes
else:
sub_nproc_list = [ppn] * num_jobs
node_lists = [None] * num_jobs
return node_lists, sub_nproc_list
if gpus_per_node is not None:
gpu_lists = (list(range(gpus_per_node))*math.ceil(num_jobs/gpus_per_node))[:num_jobs]
else:
gpu_lists = [None] * num_jobs
return node_lists, sub_nproc_list, gpu_lists


# TODO: why is loglvl a required parameter??? Also nlaunches and sleep_time could have a sensible default??
Expand All @@ -217,6 +237,7 @@ def launch_multiprocess(
timeout=None,
exclude_current_node=False,
local_redirect=False,
use_gpu=False
):
"""
Launch the jobs in the job packing mode.
Expand All @@ -234,6 +255,23 @@ def launch_multiprocess(
exclude_current_node: Don't use the script launching node as a compute node
local_redirect (bool): redirect standard input and output to local file
"""
gpus_per_node = None
cuda_devices = os.environ.get("CUDA_VISIBLE_DEVICES", None)
if use_gpu and cuda_devices is not None:
# Count the number of GPUs on each node
gpus_per_node = len(os.environ["CUDA_VISIBLE_DEVICES"].split(','))
num_gpu = gpus_per_node
if total_node_list is not None:
# If the node list is specified, we need to multiply the number of GPUs by the
# number of nodes. Else we assume it is a single node job.
num_gpu = gpus_per_node * len(total_node_list)
if num_jobs > num_gpu:
raise ValueError(f"More jobs than GPUs requested. num_jobs={num_jobs},"
f" num_gpu={num_gpu}")
else:
warn('No node list specified, assuming the number of requested jobs is less'
' than the number of total GPUs available.')

# parse node file contents
if exclude_current_node:
host = get_my_host()
Expand All @@ -244,7 +282,7 @@ def launch_multiprocess(
total_node_list.remove(host)
else:
log_multi(l_logger, "The current node is not in the node list, keep the node list as is")
node_lists, sub_nproc_list = split_node_lists(num_jobs, total_node_list, ppn)
node_lists, sub_nproc_list, gpu_lists = split_node_lists(num_jobs, total_node_list, ppn, gpus_per_node)

# create shared dataserver
ds = DataServer.setup(launchpad)
Expand All @@ -264,6 +302,7 @@ def launch_multiprocess(
timeout=timeout,
running_ids_dict=running_ids_dict,
local_redirect=local_redirect,
gpu_lists=gpu_lists
)
FWData().Running_IDs = running_ids_dict

Expand Down
2 changes: 2 additions & 0 deletions fireworks/scripts/rlaunch_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def rlaunch(argv: Optional[Sequence[str]] = None) -> int:
multi_parser.add_argument(
"--local_redirect", help="Redirect stdout and stderr to the launch directory", action="store_true"
)
multi_parser.add_argument("--use_gpu", help="Whether or not the job uses GPU compute", default=False, type=bool)

parser.add_argument("-l", "--launchpad_file", help="path to launchpad file")
parser.add_argument("-w", "--fworker_file", help="path to fworker file")
Expand Down Expand Up @@ -187,6 +188,7 @@ def rlaunch(argv: Optional[Sequence[str]] = None) -> int:
timeout=args.timeout,
exclude_current_node=args.exclude_current_node,
local_redirect=args.local_redirect,
use_gpu=args.use_gpu
)
else:
launch_rocket(launchpad, fworker, args.fw_id, args.loglvl, pdb_on_exception=args.pdb)
Expand Down