diff --git a/launch_graph/generate.py b/launch_graph/generate.py index 380bb4c..745d90c 100755 --- a/launch_graph/generate.py +++ b/launch_graph/generate.py @@ -22,25 +22,37 @@ import tempfile from launch import LaunchContext, LaunchDescription -from launch.actions import IncludeLaunchDescription, GroupAction +from launch.actions import IncludeLaunchDescription, GroupAction, TimerAction from launch_ros.actions import Node def resolve_substitutions(subst, context): + """ + Resolves launch substitutions within a given context. + Handles lists of substitutions, single substitution objects, or plain strings. + """ try: if isinstance(subst, list): + # If it's a list, join the results of performing each substitution return ''.join( s.perform(context) if hasattr(s, 'perform') else str(s) for s in subst ) elif hasattr(subst, 'perform'): + # If it's a single substitution object, perform it return subst.perform(context) else: + # Otherwise, treat it as a string return str(subst) except Exception as e: + # Return an error message if resolution fails return f'' def load_launch_description_from_file(path): + """ + Loads a LaunchDescription object from a Python launch file. + It expects the file to define a 'generate_launch_description()' function. + """ spec = importlib.util.spec_from_file_location('launch_module', path) launch_module = importlib.util.module_from_spec(spec) sys.modules['launch_module'] = launch_module @@ -51,7 +63,11 @@ def load_launch_description_from_file(path): return launch_module.generate_launch_description() + def print_launch_tree(ld: LaunchDescription, indent=0, context=None): + """ + Recursively prints the structure of a LaunchDescription. + """ if context is None: context = LaunchContext() for action in ld.entities: @@ -59,9 +75,13 @@ def print_launch_tree(ld: LaunchDescription, indent=0, context=None): def print_action_tree(action, indent, context): + """ + Prints a single launch action and its nested actions. + """ prefix = ' ' * indent if isinstance(action, IncludeLaunchDescription): + # Handle IncludeLaunchDescription actions filename = '' try: source = action.launch_description_source @@ -82,6 +102,7 @@ def print_action_tree(action, indent, context): print(prefix + f'Include: {filename}') try: + # Recursively print the included launch description nested_ld = source.get_launch_description(context) if isinstance(nested_ld, LaunchDescription): print_launch_tree(nested_ld, indent + 1, context) @@ -91,6 +112,7 @@ def print_action_tree(action, indent, context): print(prefix + f'(Could not load: {e})') elif isinstance(action, Node): + # Handle Node actions try: raw_exe = getattr(action, 'node_executable', '') exe = resolve_substitutions(raw_exe, context) @@ -99,15 +121,33 @@ def print_action_tree(action, indent, context): print(prefix + f'Node: (error: {e})') elif isinstance(action, GroupAction): + # Handle GroupAction actions print(prefix + 'Group:') try: + # Recursively print sub-actions within the group for sub_action in action.get_sub_entities(): print_action_tree(sub_action, indent + 1, context) except Exception as e: print(prefix + f'(Could not evaluate group: {e})') + elif isinstance(action, TimerAction): + # Handle TimerAction actions + print(prefix + f'TimerAction (period: {action.period}):') + try: + # Recursively print actions nested within the TimerAction + for sub_action in action.actions: + print_action_tree(sub_action, indent + 1, context) + except Exception as e: + print(prefix + f'(Could not evaluate TimerAction: {e})') + else: + # Handle any other unhandled action types (optional, for debugging) + print(prefix + f'Unknown Action Type: {type(action).__name__}') + def collect_edges(ld, parent, context=None, edges=None, node_shapes=None): + """ + Recursively collects edges (parent-child relationships) and node shapes for a graph. + """ if context is None: context = LaunchContext() if edges is None: @@ -115,6 +155,7 @@ def collect_edges(ld, parent, context=None, edges=None, node_shapes=None): if node_shapes is None: node_shapes = {} + # The parent itself is a 'box' (representing a launch file or group) node_shapes[parent] = 'box' for action in ld.entities: @@ -124,9 +165,13 @@ def collect_edges(ld, parent, context=None, edges=None, node_shapes=None): def collect_action_edges(action, parent, context, edges, node_shapes): + """ + Collects edges and node shapes for a single launch action. + """ if isinstance(action, IncludeLaunchDescription): try: source = action.launch_description_source + path_str = '' if hasattr(source, 'launch_file_path'): path_str = source.launch_file_path elif hasattr(source, 'launch_file_path_substitutions'): @@ -140,12 +185,14 @@ def collect_action_edges(action, parent, context, edges, node_shapes): filename = os.path.basename(path_str) edges.add((parent, filename)) - node_shapes[filename] = 'box' + node_shapes[filename] = 'box' # Launch files are boxes nested_ld = source.get_launch_description(context) if isinstance(nested_ld, LaunchDescription): + # Recursively collect edges for the included launch description collect_edges(nested_ld, filename, context, edges, node_shapes) except Exception: + # Ignore errors during collection to continue building the graph as much as possible pass elif isinstance(action, Node): @@ -153,32 +200,50 @@ def collect_action_edges(action, parent, context, edges, node_shapes): raw_exe = getattr(action, 'node_executable', '') exe = resolve_substitutions(raw_exe, context) edges.add((parent, exe)) - node_shapes[exe] = 'ellipse' + node_shapes[exe] = 'ellipse' # Nodes are ellipses except Exception: pass elif isinstance(action, GroupAction): try: for sub_action in action.get_sub_entities(): + # Recursively collect edges for sub-actions in a group, with the same parent + collect_action_edges(sub_action, parent, context, edges, node_shapes) + except Exception: + pass + + elif isinstance(action, TimerAction): + try: + for sub_action in action.actions: + # Recursively collect edges for actions within a TimerAction, with the same parent collect_action_edges(sub_action, parent, context, edges, node_shapes) except Exception: pass def generate_dot_content(edges, node_shapes): + """ + Generates the content for a DOT graph file from collected edges and node shapes. + """ lines = [] lines.append('digraph LaunchTree {') - lines.append(' node [fontname="Arial"];') - lines.append(' rankdir=TB;') + lines.append(' node [fontname="Arial"];') # Set default font for nodes + lines.append(' rankdir=TB;') # Top-to-bottom layout for node, shape in node_shapes.items(): + # Define node shapes (box for launch files/groups, ellipse for nodes) lines.append(f' "{node}" [shape={shape}];') for parent, child in sorted(edges): + # Define directed edges lines.append(f' "{parent}" -> "{child}";') lines.append('}') return '\n'.join(lines) def main(): + """ + Main function to parse arguments, load the launch file, analyze it, + and generate the graph. + """ parser = argparse.ArgumentParser( description='Generate a graph from a ROS 2 launch file.' ) @@ -198,9 +263,11 @@ def main(): args = parser.parse_args() + # Set default output filename if not provided if not args.out: args.out = f'launch_tree.{args.format}' + # Check if the launch file exists if not os.path.exists(args.launch_file): print(f'Error: file does not exist: {args.launch_file}') sys.exit(1) @@ -210,27 +277,41 @@ def main(): root = os.path.basename(args.launch_file) print(f'Launch Tree for {args.launch_file}:\n') - print_launch_tree(ld) + print_launch_tree(ld) # Print the tree structure to console + # Collect edges and node shapes for graph generation edges, node_shapes = collect_edges(ld, parent=root) dot_content = generate_dot_content(edges, node_shapes) if args.format == 'dot': + # If DOT format is requested, write directly to the output file with open(args.out, 'w') as f: f.write(dot_content) print(f'DOT graph written to: {args.out}') else: + # For image formats (png, pdf), use Graphviz 'dot' command with tempfile.NamedTemporaryFile('w', suffix='.dot', delete=False) as f: f.write(dot_content) tmp_dot_path = f.name try: + # Call dot command to render the graph subprocess.run(['dot', f'-T{args.format}', tmp_dot_path, '-o', args.out], - check=True) + check=True) # check=True raises an exception for non-zero exit codes print(f'{args.format.upper()} generated at: {args.out}') + except FileNotFoundError: + print('Error: Graphviz "dot" command not found.') + print('Please install Graphviz to generate image formats (png, pdf).') + print('On Ubuntu/Debian: sudo apt install graphviz') + print('On macOS (with Homebrew): brew install graphviz') + print('On Windows: Download from graphviz.org and add to PATH.') + except subprocess.CalledProcessError as e: + print(f'Error running Graphviz dot command: {e}') + print(f'Check if the generated DOT file ({tmp_dot_path}) is valid.') except Exception as e: print(f'Failed to generate {args.format.upper()}: {e}') finally: + # Clean up the temporary DOT file os.remove(tmp_dot_path) except Exception as e: