Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 88 additions & 7 deletions launch_graph/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,37 @@
import tempfile

from launch import LaunchContext, LaunchDescription
from launch.actions import IncludeLaunchDescription, GroupAction
from launch.actions import IncludeLaunchDescription, GroupAction, TimerAction
from launch_ros.actions import Node


def resolve_substitutions(subst, context):
"""
Resolves launch substitutions within a given context.
Handles lists of substitutions, single substitution objects, or plain strings.
"""
try:
if isinstance(subst, list):
# If it's a list, join the results of performing each substitution
return ''.join(
s.perform(context) if hasattr(s, 'perform') else str(s) for s in subst
)
elif hasattr(subst, 'perform'):
# If it's a single substitution object, perform it
return subst.perform(context)
else:
# Otherwise, treat it as a string
return str(subst)
except Exception as e:
# Return an error message if resolution fails
return f'<could not resolve: {e}>'


def load_launch_description_from_file(path):
"""
Loads a LaunchDescription object from a Python launch file.
It expects the file to define a 'generate_launch_description()' function.
"""
spec = importlib.util.spec_from_file_location('launch_module', path)
launch_module = importlib.util.module_from_spec(spec)
sys.modules['launch_module'] = launch_module
Expand All @@ -51,17 +63,25 @@ def load_launch_description_from_file(path):

return launch_module.generate_launch_description()


def print_launch_tree(ld: LaunchDescription, indent=0, context=None):
"""
Recursively prints the structure of a LaunchDescription.
"""
if context is None:
context = LaunchContext()
for action in ld.entities:
print_action_tree(action, indent, context)


def print_action_tree(action, indent, context):
"""
Prints a single launch action and its nested actions.
"""
prefix = ' ' * indent

if isinstance(action, IncludeLaunchDescription):
# Handle IncludeLaunchDescription actions
filename = '<unknown>'
try:
source = action.launch_description_source
Expand All @@ -82,6 +102,7 @@ def print_action_tree(action, indent, context):
print(prefix + f'Include: {filename}')

try:
# Recursively print the included launch description
nested_ld = source.get_launch_description(context)
if isinstance(nested_ld, LaunchDescription):
print_launch_tree(nested_ld, indent + 1, context)
Expand All @@ -91,6 +112,7 @@ def print_action_tree(action, indent, context):
print(prefix + f'(Could not load: {e})')

elif isinstance(action, Node):
# Handle Node actions
try:
raw_exe = getattr(action, 'node_executable', '<missing>')
exe = resolve_substitutions(raw_exe, context)
Expand All @@ -99,22 +121,41 @@ def print_action_tree(action, indent, context):
print(prefix + f'Node: <missing> (error: {e})')

elif isinstance(action, GroupAction):
# Handle GroupAction actions
print(prefix + 'Group:')
try:
# Recursively print sub-actions within the group
for sub_action in action.get_sub_entities():
print_action_tree(sub_action, indent + 1, context)
except Exception as e:
print(prefix + f'(Could not evaluate group: {e})')

elif isinstance(action, TimerAction):
# Handle TimerAction actions
print(prefix + f'TimerAction (period: {action.period}):')
try:
# Recursively print actions nested within the TimerAction
for sub_action in action.actions:
print_action_tree(sub_action, indent + 1, context)
except Exception as e:
print(prefix + f'(Could not evaluate TimerAction: {e})')
else:
# Handle any other unhandled action types (optional, for debugging)
print(prefix + f'Unknown Action Type: {type(action).__name__}')


def collect_edges(ld, parent, context=None, edges=None, node_shapes=None):
"""
Recursively collects edges (parent-child relationships) and node shapes for a graph.
"""
if context is None:
context = LaunchContext()
if edges is None:
edges = set()
if node_shapes is None:
node_shapes = {}

# The parent itself is a 'box' (representing a launch file or group)
node_shapes[parent] = 'box'

for action in ld.entities:
Expand All @@ -124,9 +165,13 @@ def collect_edges(ld, parent, context=None, edges=None, node_shapes=None):


def collect_action_edges(action, parent, context, edges, node_shapes):
"""
Collects edges and node shapes for a single launch action.
"""
if isinstance(action, IncludeLaunchDescription):
try:
source = action.launch_description_source
path_str = '<unknown>'
if hasattr(source, 'launch_file_path'):
path_str = source.launch_file_path
elif hasattr(source, 'launch_file_path_substitutions'):
Expand All @@ -140,45 +185,65 @@ def collect_action_edges(action, parent, context, edges, node_shapes):

filename = os.path.basename(path_str)
edges.add((parent, filename))
node_shapes[filename] = 'box'
node_shapes[filename] = 'box' # Launch files are boxes

nested_ld = source.get_launch_description(context)
if isinstance(nested_ld, LaunchDescription):
# Recursively collect edges for the included launch description
collect_edges(nested_ld, filename, context, edges, node_shapes)
except Exception:
# Ignore errors during collection to continue building the graph as much as possible
pass

elif isinstance(action, Node):
try:
raw_exe = getattr(action, 'node_executable', '<missing>')
exe = resolve_substitutions(raw_exe, context)
edges.add((parent, exe))
node_shapes[exe] = 'ellipse'
node_shapes[exe] = 'ellipse' # Nodes are ellipses
except Exception:
pass

elif isinstance(action, GroupAction):
try:
for sub_action in action.get_sub_entities():
# Recursively collect edges for sub-actions in a group, with the same parent
collect_action_edges(sub_action, parent, context, edges, node_shapes)
except Exception:
pass

elif isinstance(action, TimerAction):
try:
for sub_action in action.actions:
# Recursively collect edges for actions within a TimerAction, with the same parent
collect_action_edges(sub_action, parent, context, edges, node_shapes)
except Exception:
pass


def generate_dot_content(edges, node_shapes):
"""
Generates the content for a DOT graph file from collected edges and node shapes.
"""
lines = []
lines.append('digraph LaunchTree {')
lines.append(' node [fontname="Arial"];')
lines.append(' rankdir=TB;')
lines.append(' node [fontname="Arial"];') # Set default font for nodes
lines.append(' rankdir=TB;') # Top-to-bottom layout
for node, shape in node_shapes.items():
# Define node shapes (box for launch files/groups, ellipse for nodes)
lines.append(f' "{node}" [shape={shape}];')
for parent, child in sorted(edges):
# Define directed edges
lines.append(f' "{parent}" -> "{child}";')
lines.append('}')
return '\n'.join(lines)


def main():
"""
Main function to parse arguments, load the launch file, analyze it,
and generate the graph.
"""
parser = argparse.ArgumentParser(
description='Generate a graph from a ROS 2 launch file.'
)
Expand All @@ -198,9 +263,11 @@ def main():

args = parser.parse_args()

# Set default output filename if not provided
if not args.out:
args.out = f'launch_tree.{args.format}'

# Check if the launch file exists
if not os.path.exists(args.launch_file):
print(f'Error: file does not exist: {args.launch_file}')
sys.exit(1)
Expand All @@ -210,27 +277,41 @@ def main():
root = os.path.basename(args.launch_file)

print(f'Launch Tree for {args.launch_file}:\n')
print_launch_tree(ld)
print_launch_tree(ld) # Print the tree structure to console

# Collect edges and node shapes for graph generation
edges, node_shapes = collect_edges(ld, parent=root)
dot_content = generate_dot_content(edges, node_shapes)

if args.format == 'dot':
# If DOT format is requested, write directly to the output file
with open(args.out, 'w') as f:
f.write(dot_content)
print(f'DOT graph written to: {args.out}')
else:
# For image formats (png, pdf), use Graphviz 'dot' command
with tempfile.NamedTemporaryFile('w', suffix='.dot', delete=False) as f:
f.write(dot_content)
tmp_dot_path = f.name

try:
# Call dot command to render the graph
subprocess.run(['dot', f'-T{args.format}', tmp_dot_path, '-o', args.out],
check=True)
check=True) # check=True raises an exception for non-zero exit codes
print(f'{args.format.upper()} generated at: {args.out}')
except FileNotFoundError:
print('Error: Graphviz "dot" command not found.')
print('Please install Graphviz to generate image formats (png, pdf).')
print('On Ubuntu/Debian: sudo apt install graphviz')
print('On macOS (with Homebrew): brew install graphviz')
print('On Windows: Download from graphviz.org and add to PATH.')
except subprocess.CalledProcessError as e:
print(f'Error running Graphviz dot command: {e}')
print(f'Check if the generated DOT file ({tmp_dot_path}) is valid.')
except Exception as e:
print(f'Failed to generate {args.format.upper()}: {e}')
finally:
# Clean up the temporary DOT file
os.remove(tmp_dot_path)

except Exception as e:
Expand Down