Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 55 additions & 4 deletions orchestration/flows/bl832/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class FlowParameterMapper:
"alcf_recon_flow/alcf_recon_flow": [
"file_path",
"config"],
"alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [
"file_path",
"config"],
# From move.py
"new_832_file_flow/new_file_832": [
"file_path",
Expand All @@ -25,6 +28,14 @@ class FlowParameterMapper:
# From nersc.py
"nersc_recon_flow/nersc_recon_flow": [
"file_path",
"config"],
"nersc_recon_multinode_flow/nersc_recon_multinode_flow": [
"file_path",
"num_nodes",
"config"],
"nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": [
"file_path",
"num_nodes",
"config"]
}

Expand Down Expand Up @@ -55,23 +66,39 @@ class DecisionFlowInputModel(BaseModel):


@task(name="setup_decision_settings")
def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict:
def setup_decision_settings(
alcf_recon: bool = False,
alcf_forge_recon_segment: bool = False,
nersc_recon: bool = False,
nersc_recon_multinode: bool = False,
nersc_forge_recon_segment: bool = False,
new_file_832: bool = True
) -> dict:
"""
This task is used to define the settings for the decision making process of the BL832 beamline.

:param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow.
:param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF Forge reconstruction segment flow.
:param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow.
:param nersc_move: Boolean indicating whether to move files to NERSC.
:param nersc_recon_multinode: Boolean indicating whether to run the NERSC multinode reconstruction flow.
:param new_file_832: Boolean indicating whether to run the new 832 file processing flow.
:return: A dictionary containing the settings for each flow.
"""
logger = get_run_logger()
try:
logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, "
f"nersc_recon={nersc_recon}, new_file_832={new_file_832}")
f"alcf_forge_recon_segment={alcf_forge_recon_segment}, "
f"nersc_recon={nersc_recon}, "
f"nersc_recon_multinode={nersc_recon_multinode}, "
f"nersc_forge_recon_segment={nersc_forge_recon_segment}, "
f"new_file_832={new_file_832}")
# Define which flows to run based on the input settings
settings = {
"alcf_recon_flow/alcf_recon_flow": alcf_recon,
"alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment,
"nersc_recon_flow/nersc_recon_flow": nersc_recon,
"nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode,
"nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": nersc_forge_recon_segment,
"new_832_file_flow/new_file_832": new_file_832
}
# Save the settings in a JSON block for later retrieval by other flows
Expand Down Expand Up @@ -145,10 +172,27 @@ async def dispatcher(
alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params)
tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params))

if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"):
alcf_forge_params = FlowParameterMapper.get_flow_parameters(
"alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow",
available_params
)
tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params))

if decision_settings.get("nersc_recon_flow/nersc_recon_flow"):
nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params)
tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params))

if decision_settings.get("nersc_recon_multinode_flow/nersc_recon_multinode_flow"):
nersc_multinode_params = FlowParameterMapper.get_flow_parameters(
"nersc_recon_multinode_flow/nersc_recon_multinode_flow", available_params)
tasks.append(run_recon_flow_async("nersc_recon_multinode_flow/nersc_recon_multinode_flow", nersc_multinode_params))

if decision_settings.get("nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow"):
nersc_forge_recon_segment_params = FlowParameterMapper.get_flow_parameters(
"nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", available_params)
tasks.append(run_recon_flow_async(
"nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", nersc_forge_recon_segment_params))
# Run ALCF and NERSC flows in parallel, if any
if tasks:
try:
Expand All @@ -169,7 +213,14 @@ async def dispatcher(
"""
try:
# Setup decision settings based on input parameters
setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True)
setup_decision_settings(
alcf_recon=True,
alcf_forge_recon_segment=False,
nersc_recon=True,
nersc_recon_multinode=False,
nersc_forge_recon_segment=False,
new_file_832=True
)
# Run the main decision flow with the specified parameters
# asyncio.run(dispatcher(
# config={}, # PYTEST, ALCF, NERSC
Expand Down