Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 39 additions & 18 deletions copy_with_port_portname.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,40 @@
import logging
import json

def run_specialization_script(template_script_path, output_dir, edge_params_list, python_exe, copy_script_path):
def _normalize_output_relpath(template_script_path, output_relpath=None):
if output_relpath:
relpath = output_relpath.replace("\\", "/").lstrip("/")
else:
relpath = os.path.basename(template_script_path)
if not relpath:
raise ValueError("Output relative path cannot be empty.")
return relpath


def _join_output_path(output_dir, output_relpath):
return os.path.join(output_dir, *output_relpath.split("/"))


def run_specialization_script(
template_script_path,
output_dir,
edge_params_list,
python_exe,
copy_script_path,
output_relpath=None
):
"""
Calls the copy script to generate a specialized version of a node's script.
Returns the basename of the generated script on success, None on failure.
"""
# The new copy script generates a standardized filename, e.g., "original.py"
base_template_name = os.path.basename(template_script_path)
template_root, template_ext = os.path.splitext(base_template_name)
output_filename = f"{template_root}{template_ext}"
expected_output_path = os.path.join(output_dir, output_filename)
output_relpath = _normalize_output_relpath(template_script_path, output_relpath)
expected_output_path = _join_output_path(output_dir, output_relpath)

# If the specialized file already exists, we don't need to regenerate it.
if os.path.exists(expected_output_path):
logging.info(f"Specialized script '{expected_output_path}' already exists. Using existing.")
return output_filename
return output_relpath

# Convert the list of parameters to a JSON string for command line argument
edge_params_json_str = json.dumps(edge_params_list)
Expand All @@ -31,13 +50,15 @@ def run_specialization_script(template_script_path, output_dir, edge_params_list
output_dir,
edge_params_json_str # Pass the JSON string as the last argument
]
if output_relpath:
cmd.append(output_relpath)
logging.info(f"Running specialization for '{base_template_name}': {' '.join(cmd)}")
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding='utf-8')
logging.info(f"Successfully generated specialized script '{output_filename}'.")
logging.info(f"Successfully generated specialized script '{output_relpath}'.")
if result.stdout: logging.debug(f"copy_with_port_portname.py stdout:\n{result.stdout.strip()}")
if result.stderr: logging.warning(f"copy_with_port_portname.py stderr:\n{result.stderr.strip()}")
return output_filename
return output_relpath
except subprocess.CalledProcessError as e:
logging.error(f"Error calling specialization script for '{template_script_path}':")
logging.error(f"Command: {' '.join(e.cmd)}")
Expand All @@ -50,7 +71,7 @@ def run_specialization_script(template_script_path, output_dir, edge_params_list
return None


def create_modified_script(template_script_path, output_dir, edge_params_json_str):
def create_modified_script(template_script_path, output_dir, edge_params_json_str, output_relpath=None):
"""
Creates a modified Python script by injecting ZMQ port and port name
definitions from a JSON object.
Expand Down Expand Up @@ -121,17 +142,16 @@ def create_modified_script(template_script_path, output_dir, edge_params_json_st
modified_lines = lines[:insert_index] + definitions + lines[insert_index:]

# --- Determine and create output file ---
base_template_name = os.path.basename(template_script_path)
template_root, template_ext = os.path.splitext(base_template_name)

# Standardized output filename for a node with one or more specializations
output_filename = f"{template_root}{template_ext}"
output_script_path = os.path.join(output_dir, output_filename)
output_relpath = _normalize_output_relpath(template_script_path, output_relpath)
output_script_path = _join_output_path(output_dir, output_relpath)

try:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"Created output directory: {output_dir}")
output_parent = os.path.dirname(output_script_path)
if output_parent and not os.path.exists(output_parent):
os.makedirs(output_parent, exist_ok=True)

with open(output_script_path, 'w') as f:
f.writelines(modified_lines)
Expand All @@ -149,14 +169,15 @@ def create_modified_script(template_script_path, output_dir, edge_params_json_st
datefmt='%Y-%m-%d %H:%M:%S'
)

if len(sys.argv) != 4:
print("\nUsage: python3 copy_with_port_portname.py <TEMPLATE_SCRIPT_PATH> <OUTPUT_DIRECTORY> '<JSON_PARAMETERS>'\n")
if len(sys.argv) not in [4, 5]:
print("\nUsage: python3 copy_with_port_portname.py <TEMPLATE_SCRIPT_PATH> <OUTPUT_DIRECTORY> '<JSON_PARAMETERS>' [OUTPUT_RELATIVE_PATH]\n")
print("Example JSON: '[{\"port\": \"2355\", \"port_name\": \"FUNBODY_REP_1\", \"source_node_label\": \"nodeA\", \"target_node_label\": \"nodeB\"}]'")
print("Note: The JSON string must be enclosed in single quotes in shell.\n")
sys.exit(1)

template_script_path_arg = sys.argv[1]
output_directory_arg = sys.argv[2]
json_params_arg = sys.argv[3]
output_relpath_arg = sys.argv[4] if len(sys.argv) == 5 else None

create_modified_script(template_script_path_arg, output_directory_arg, json_params_arg)
create_modified_script(template_script_path_arg, output_directory_arg, json_params_arg, output_relpath_arg)
97 changes: 63 additions & 34 deletions mkconcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,40 +418,69 @@ def cleanup_script_files():
logging.warning(f"Error processing edge for parameter aggregation: {e}")

# --- Now, run the specialization for each node that has aggregated parameters ---
if node_edge_params:
logging.info("Running script specialization process...")
specialized_scripts_output_dir = os.path.abspath(os.path.join(outdir, "src"))
os.makedirs(specialized_scripts_output_dir, exist_ok=True)

for node_id, params_list in node_edge_params.items():
current_node_full_label = nodes_dict[node_id]
try:
container_name, original_script = current_node_full_label.split(':', 1)
except ValueError:
continue # Skip if label format is wrong

if not original_script or "." not in original_script:
continue # Skip if not a script file

template_script_full_path = os.path.join(sourcedir, original_script)
if not os.path.exists(template_script_full_path):
logging.error(f"Cannot specialize: Original script '{template_script_full_path}' not found in '{sourcedir}'.")
continue

new_script_basename = copy_with_port_portname.run_specialization_script(
template_script_full_path,
specialized_scripts_output_dir,
params_list,
python_executable,
copy_script_py_path
)

if new_script_basename:
# Update nodes_dict to point to the new comprehensive specialized script
nodes_dict[node_id] = f"{container_name}:{new_script_basename}"
logging.info(f"Node ID '{node_id}' ('{container_name}') updated to use specialized script '{new_script_basename}'.")
else:
logging.error(f"Failed to generate specialized script for node ID '{node_id}'. It will retain its original script.")
if node_edge_params:
logging.info("Running script specialization process...")
specialized_scripts_output_dir = os.path.abspath(os.path.join(outdir, "src"))
os.makedirs(specialized_scripts_output_dir, exist_ok=True)

# Build one specialization plan per source script. This avoids collisions
# when multiple nodes reference the same script and need different ZMQ params.
script_edge_params = {}
script_nodes = {}
for node_id, params_list in node_edge_params.items():
current_node_full_label = nodes_dict.get(node_id, "")
try:
container_name, original_script = current_node_full_label.split(':', 1)
except ValueError:
continue

if not original_script or "." not in original_script:
continue

script_nodes.setdefault(original_script, []).append((node_id, container_name))
script_edge_params.setdefault(original_script, [])
seen_keys = {
(
p.get("port"),
p.get("port_name"),
p.get("source_node_label"),
p.get("target_node_label")
)
for p in script_edge_params[original_script]
}
for edge_param in params_list:
edge_key = (
edge_param.get("port"),
edge_param.get("port_name"),
edge_param.get("source_node_label"),
edge_param.get("target_node_label")
)
if edge_key not in seen_keys:
script_edge_params[original_script].append(edge_param)
seen_keys.add(edge_key)

for original_script, merged_params in script_edge_params.items():
template_script_full_path = os.path.join(sourcedir, original_script)
if not os.path.exists(template_script_full_path):
logging.error(f"Cannot specialize: Original script '{template_script_full_path}' not found in '{sourcedir}'.")
continue

new_script_relpath = copy_with_port_portname.run_specialization_script(
template_script_full_path,
specialized_scripts_output_dir,
merged_params,
python_executable,
copy_script_py_path,
output_relpath=original_script
)

if not new_script_relpath:
logging.error(f"Failed to generate specialized script for source '{original_script}'.")
continue

for node_id, container_name in script_nodes.get(original_script, []):
nodes_dict[node_id] = f"{container_name}:{new_script_relpath}"
logging.info(f"Node ID '{node_id}' ('{container_name}') updated to use specialized script '{new_script_relpath}'.")

#not right for PM2_1_1 and PM2_1_2
volswr = len(nodes_dict)*['']
Expand Down
41 changes: 41 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,47 @@ def test_run_command_subdir_source(self):
self.assertEqual(result.exit_code, 0)
self.assertTrue(Path('out/src/subdir/script.py').exists())

def test_run_command_shared_source_specialization_merges_edge_params(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
Path('src').mkdir()
Path('src/common.py').write_text(
"import concore\n\n"
"def step():\n"
" return None\n"
)

workflow = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns http://www.yworks.com/xml/schema/graphml/1.1/ygraphml.xsd" xmlns:y="http://www.yworks.com/xml/graphml">
<key for="node" id="d6" yfiles.type="nodegraphics"/>
<key for="edge" id="d10" yfiles.type="edgegraphics"/>
<graph edgedefault="directed" id="G">
<node id="n1"><data key="d6"><y:ShapeNode><y:NodeLabel>A:common.py</y:NodeLabel></y:ShapeNode></data></node>
<node id="n2"><data key="d6"><y:ShapeNode><y:NodeLabel>B:common.py</y:NodeLabel></y:ShapeNode></data></node>
<node id="n3"><data key="d6"><y:ShapeNode><y:NodeLabel>C:common.py</y:NodeLabel></y:ShapeNode></data></node>
<edge source="n1" target="n2"><data key="d10"><y:PolyLineEdge><y:EdgeLabel>0x1000_AB</y:EdgeLabel></y:PolyLineEdge></data></edge>
<edge source="n2" target="n3"><data key="d10"><y:PolyLineEdge><y:EdgeLabel>0x1001_BC</y:EdgeLabel></y:PolyLineEdge></data></edge>
</graph>
</graphml>
"""
Path('workflow.graphml').write_text(workflow)

result = self.runner.invoke(cli, [
'run',
'workflow.graphml',
'--source', 'src',
'--output', 'out',
'--type', 'posix'
])
self.assertEqual(result.exit_code, 0)

specialized_script = Path('out/src/common.py')
self.assertTrue(specialized_script.exists())
content = specialized_script.read_text()
self.assertIn('PORT_NAME_A_B', content)
self.assertIn('PORT_A_B', content)
self.assertIn('PORT_NAME_B_C', content)
self.assertIn('PORT_B_C', content)

def test_run_command_existing_output(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
result = self.runner.invoke(cli, ['init', 'test-project'])
Expand Down