diff --git a/copy_with_port_portname.py b/copy_with_port_portname.py index 1a0a033..1398092 100644 --- a/copy_with_port_portname.py +++ b/copy_with_port_portname.py @@ -5,21 +5,40 @@ import logging import json -def run_specialization_script(template_script_path, output_dir, edge_params_list, python_exe, copy_script_path): +def _normalize_output_relpath(template_script_path, output_relpath=None): + if output_relpath: + relpath = output_relpath.replace("\\", "/").lstrip("/") + else: + relpath = os.path.basename(template_script_path) + if not relpath: + raise ValueError("Output relative path cannot be empty.") + return relpath + + +def _join_output_path(output_dir, output_relpath): + return os.path.join(output_dir, *output_relpath.split("/")) + + +def run_specialization_script( + template_script_path, + output_dir, + edge_params_list, + python_exe, + copy_script_path, + output_relpath=None +): """ Calls the copy script to generate a specialized version of a node's script. Returns the basename of the generated script on success, None on failure. """ - # The new copy script generates a standardized filename, e.g., "original.py" base_template_name = os.path.basename(template_script_path) - template_root, template_ext = os.path.splitext(base_template_name) - output_filename = f"{template_root}{template_ext}" - expected_output_path = os.path.join(output_dir, output_filename) + output_relpath = _normalize_output_relpath(template_script_path, output_relpath) + expected_output_path = _join_output_path(output_dir, output_relpath) # If the specialized file already exists, we don't need to regenerate it. if os.path.exists(expected_output_path): logging.info(f"Specialized script '{expected_output_path}' already exists. Using existing.") - return output_filename + return output_relpath # Convert the list of parameters to a JSON string for command line argument edge_params_json_str = json.dumps(edge_params_list) @@ -31,13 +50,15 @@ def run_specialization_script(template_script_path, output_dir, edge_params_list output_dir, edge_params_json_str # Pass the JSON string as the last argument ] + if output_relpath: + cmd.append(output_relpath) logging.info(f"Running specialization for '{base_template_name}': {' '.join(cmd)}") try: result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding='utf-8') - logging.info(f"Successfully generated specialized script '{output_filename}'.") + logging.info(f"Successfully generated specialized script '{output_relpath}'.") if result.stdout: logging.debug(f"copy_with_port_portname.py stdout:\n{result.stdout.strip()}") if result.stderr: logging.warning(f"copy_with_port_portname.py stderr:\n{result.stderr.strip()}") - return output_filename + return output_relpath except subprocess.CalledProcessError as e: logging.error(f"Error calling specialization script for '{template_script_path}':") logging.error(f"Command: {' '.join(e.cmd)}") @@ -50,7 +71,7 @@ def run_specialization_script(template_script_path, output_dir, edge_params_list return None -def create_modified_script(template_script_path, output_dir, edge_params_json_str): +def create_modified_script(template_script_path, output_dir, edge_params_json_str, output_relpath=None): """ Creates a modified Python script by injecting ZMQ port and port name definitions from a JSON object. @@ -121,17 +142,16 @@ def create_modified_script(template_script_path, output_dir, edge_params_json_st modified_lines = lines[:insert_index] + definitions + lines[insert_index:] # --- Determine and create output file --- - base_template_name = os.path.basename(template_script_path) - template_root, template_ext = os.path.splitext(base_template_name) - - # Standardized output filename for a node with one or more specializations - output_filename = f"{template_root}{template_ext}" - output_script_path = os.path.join(output_dir, output_filename) + output_relpath = _normalize_output_relpath(template_script_path, output_relpath) + output_script_path = _join_output_path(output_dir, output_relpath) try: if not os.path.exists(output_dir): os.makedirs(output_dir) print(f"Created output directory: {output_dir}") + output_parent = os.path.dirname(output_script_path) + if output_parent and not os.path.exists(output_parent): + os.makedirs(output_parent, exist_ok=True) with open(output_script_path, 'w') as f: f.writelines(modified_lines) @@ -149,8 +169,8 @@ def create_modified_script(template_script_path, output_dir, edge_params_json_st datefmt='%Y-%m-%d %H:%M:%S' ) - if len(sys.argv) != 4: - print("\nUsage: python3 copy_with_port_portname.py ''\n") + if len(sys.argv) not in [4, 5]: + print("\nUsage: python3 copy_with_port_portname.py '' [OUTPUT_RELATIVE_PATH]\n") print("Example JSON: '[{\"port\": \"2355\", \"port_name\": \"FUNBODY_REP_1\", \"source_node_label\": \"nodeA\", \"target_node_label\": \"nodeB\"}]'") print("Note: The JSON string must be enclosed in single quotes in shell.\n") sys.exit(1) @@ -158,5 +178,6 @@ def create_modified_script(template_script_path, output_dir, edge_params_json_st template_script_path_arg = sys.argv[1] output_directory_arg = sys.argv[2] json_params_arg = sys.argv[3] + output_relpath_arg = sys.argv[4] if len(sys.argv) == 5 else None - create_modified_script(template_script_path_arg, output_directory_arg, json_params_arg) \ No newline at end of file + create_modified_script(template_script_path_arg, output_directory_arg, json_params_arg, output_relpath_arg) diff --git a/mkconcore.py b/mkconcore.py index 0a11948..000d2df 100644 --- a/mkconcore.py +++ b/mkconcore.py @@ -418,40 +418,69 @@ def cleanup_script_files(): logging.warning(f"Error processing edge for parameter aggregation: {e}") # --- Now, run the specialization for each node that has aggregated parameters --- -if node_edge_params: - logging.info("Running script specialization process...") - specialized_scripts_output_dir = os.path.abspath(os.path.join(outdir, "src")) - os.makedirs(specialized_scripts_output_dir, exist_ok=True) - - for node_id, params_list in node_edge_params.items(): - current_node_full_label = nodes_dict[node_id] - try: - container_name, original_script = current_node_full_label.split(':', 1) - except ValueError: - continue # Skip if label format is wrong - - if not original_script or "." not in original_script: - continue # Skip if not a script file - - template_script_full_path = os.path.join(sourcedir, original_script) - if not os.path.exists(template_script_full_path): - logging.error(f"Cannot specialize: Original script '{template_script_full_path}' not found in '{sourcedir}'.") - continue - - new_script_basename = copy_with_port_portname.run_specialization_script( - template_script_full_path, - specialized_scripts_output_dir, - params_list, - python_executable, - copy_script_py_path - ) - - if new_script_basename: - # Update nodes_dict to point to the new comprehensive specialized script - nodes_dict[node_id] = f"{container_name}:{new_script_basename}" - logging.info(f"Node ID '{node_id}' ('{container_name}') updated to use specialized script '{new_script_basename}'.") - else: - logging.error(f"Failed to generate specialized script for node ID '{node_id}'. It will retain its original script.") +if node_edge_params: + logging.info("Running script specialization process...") + specialized_scripts_output_dir = os.path.abspath(os.path.join(outdir, "src")) + os.makedirs(specialized_scripts_output_dir, exist_ok=True) + + # Build one specialization plan per source script. This avoids collisions + # when multiple nodes reference the same script and need different ZMQ params. + script_edge_params = {} + script_nodes = {} + for node_id, params_list in node_edge_params.items(): + current_node_full_label = nodes_dict.get(node_id, "") + try: + container_name, original_script = current_node_full_label.split(':', 1) + except ValueError: + continue + + if not original_script or "." not in original_script: + continue + + script_nodes.setdefault(original_script, []).append((node_id, container_name)) + script_edge_params.setdefault(original_script, []) + seen_keys = { + ( + p.get("port"), + p.get("port_name"), + p.get("source_node_label"), + p.get("target_node_label") + ) + for p in script_edge_params[original_script] + } + for edge_param in params_list: + edge_key = ( + edge_param.get("port"), + edge_param.get("port_name"), + edge_param.get("source_node_label"), + edge_param.get("target_node_label") + ) + if edge_key not in seen_keys: + script_edge_params[original_script].append(edge_param) + seen_keys.add(edge_key) + + for original_script, merged_params in script_edge_params.items(): + template_script_full_path = os.path.join(sourcedir, original_script) + if not os.path.exists(template_script_full_path): + logging.error(f"Cannot specialize: Original script '{template_script_full_path}' not found in '{sourcedir}'.") + continue + + new_script_relpath = copy_with_port_portname.run_specialization_script( + template_script_full_path, + specialized_scripts_output_dir, + merged_params, + python_executable, + copy_script_py_path, + output_relpath=original_script + ) + + if not new_script_relpath: + logging.error(f"Failed to generate specialized script for source '{original_script}'.") + continue + + for node_id, container_name in script_nodes.get(original_script, []): + nodes_dict[node_id] = f"{container_name}:{new_script_relpath}" + logging.info(f"Node ID '{node_id}' ('{container_name}') updated to use specialized script '{new_script_relpath}'.") #not right for PM2_1_1 and PM2_1_2 volswr = len(nodes_dict)*[''] diff --git a/tests/test_cli.py b/tests/test_cli.py index f852d5c..a48ea4f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -138,6 +138,47 @@ def test_run_command_subdir_source(self): self.assertEqual(result.exit_code, 0) self.assertTrue(Path('out/src/subdir/script.py').exists()) + def test_run_command_shared_source_specialization_merges_edge_params(self): + with self.runner.isolated_filesystem(temp_dir=self.temp_dir): + Path('src').mkdir() + Path('src/common.py').write_text( + "import concore\n\n" + "def step():\n" + " return None\n" + ) + + workflow = """ + + + + + A:common.py + B:common.py + C:common.py + 0x1000_AB + 0x1001_BC + + +""" + Path('workflow.graphml').write_text(workflow) + + result = self.runner.invoke(cli, [ + 'run', + 'workflow.graphml', + '--source', 'src', + '--output', 'out', + '--type', 'posix' + ]) + self.assertEqual(result.exit_code, 0) + + specialized_script = Path('out/src/common.py') + self.assertTrue(specialized_script.exists()) + content = specialized_script.read_text() + self.assertIn('PORT_NAME_A_B', content) + self.assertIn('PORT_A_B', content) + self.assertIn('PORT_NAME_B_C', content) + self.assertIn('PORT_B_C', content) + def test_run_command_existing_output(self): with self.runner.isolated_filesystem(temp_dir=self.temp_dir): result = self.runner.invoke(cli, ['init', 'test-project'])