Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions awx/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ def get_related(self, obj):
access_list=self.reverse('api:project_access_list', kwargs={'pk': obj.pk}),
object_roles=self.reverse('api:project_object_roles_list', kwargs={'pk': obj.pk}),
copy=self.reverse('api:project_copy', kwargs={'pk': obj.pk}),
instance_groups=self.reverse('api:project_instance_groups_list', kwargs={'pk': obj.pk}),
)
)
if obj.organization:
Expand Down
2 changes: 2 additions & 0 deletions awx/api/urls/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ProjectNotificationTemplatesSuccessList,
ProjectObjectRolesList,
ProjectAccessList,
ProjectInstanceGroupsList,
ProjectCopy,
)

Expand Down Expand Up @@ -48,6 +49,7 @@
),
re_path(r'^(?P<pk>[0-9]+)/object_roles/$', ProjectObjectRolesList.as_view(), name='project_object_roles_list'),
re_path(r'^(?P<pk>[0-9]+)/access_list/$', ProjectAccessList.as_view(), name='project_access_list'),
re_path(r'^(?P<pk>[0-9]+)/instance_groups/$', ProjectInstanceGroupsList.as_view(), name='project_instance_groups_list'),
re_path(r'^(?P<pk>[0-9]+)/copy/$', ProjectCopy.as_view(), name='project_copy'),
]

Expand Down
9 changes: 9 additions & 0 deletions awx/api/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,15 @@ class ProjectCopy(CopyAPIView):
resource_purpose = 'copy of a project'


class ProjectInstanceGroupsList(SubListAttachDetachAPIView):
    """List the instance groups of a project, or (dis)associate them.

    Attach/detach permission decisions are delegated to the access layer
    (ProjectAccess.can_attach / can_unattach) via the parent model.
    """

    model = models.InstanceGroup
    serializer_class = serializers.InstanceGroupSerializer
    # The sublist hangs off Project.instance_groups
    parent_model = models.Project
    relationship = 'instance_groups'
    # NOTE(review): presumably disables per-object read filtering of the
    # sublist (access to the parent project is sufficient) — confirm against
    # SubListAPIView semantics.
    filter_read_permission = False
    resource_purpose = 'instance groups of a project'


class UserList(ListCreateAPIView):
model = models.User
serializer_class = serializers.UserSerializer
Expand Down
14 changes: 14 additions & 0 deletions awx/main/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,20 @@ def can_start(self, obj, validate_license=True):
def can_delete(self, obj):
return self.can_change(obj, None)

@check_superuser
def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False):
    """Permission check for associating ``sub_obj`` with a project.

    For the ``instance_groups`` relationship the project must belong to an
    organization, and the requester needs use_role on the instance group as
    well as admin_role on the project.  Any other relationship falls through
    to the base implementation.  Superusers are short-circuited to True by
    the decorator.
    """
    if relationship != "instance_groups":
        return super(ProjectAccess, self).can_attach(obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
    # Organization-less ("orphaned") projects cannot pin instance groups.
    if not obj.organization:
        return False
    can_use_group = self.user in sub_obj.use_role
    is_project_admin = self.user in obj.admin_role
    return can_use_group and is_project_admin

@check_superuser
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
    """Permission check for disassociating ``sub_obj`` from a project.

    Removing an instance group requires exactly the same permissions as
    adding one; everything else defers to the base implementation.
    """
    if relationship != "instance_groups":
        return super(ProjectAccess, self).can_unattach(obj, sub_obj, relationship, *args, **kwargs)
    # Mirror the attach rule: org required, use_role on group, admin on project.
    return self.can_attach(obj, sub_obj, relationship, *args, **kwargs)


class ProjectUpdateAccess(BaseAccess):
"""
Expand Down
20 changes: 20 additions & 0 deletions awx/main/models/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,26 @@ def get_notification_templates(self):
def get_notification_friendly_name(self):
return "Project Update"

@property
def preferred_instance_groups(self):
    """Instance groups this project update should be dispatched to.

    When the parent project has instance groups assigned, those are used;
    otherwise the update stays on the control plane (the prior behavior).
    """
    project = self.project
    if project and project.instance_groups.exists():
        return list(project.instance_groups.all())
    # NOTE(review): fallback returns whatever control_plane_instance_group
    # yields (presumably a list, so both branches agree) — confirm.
    return self.control_plane_instance_group

@property
def capacity_type(self):
    """Return 'execution' when the project pins instance groups, else 'control'."""
    project = self.project
    uses_project_groups = bool(project) and project.instance_groups.exists()
    return 'execution' if uses_project_groups else 'control'

@property
def is_remote_update(self):
    """True when this update ran on a mesh node other than its controller."""
    execution, controller = self.execution_node, self.controller_node
    if not execution or not controller:
        # Either node unset/empty: not a remote execution.
        return False
    return execution != controller

def save(self, *args, **kwargs):
added_update_fields = []
if not self.job_tags:
Expand Down
22 changes: 22 additions & 0 deletions awx/main/tasks/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,28 @@ def __init__(self, *args, **kwargs):
super(RunnerCallbackForProjectUpdate, self).__init__(*args, **kwargs)
self.playbook_new_revision = None
self.host_map = {}
self.project_artifact_path = None
self.roles_artifact_path = None
self.collections_artifact_path = None

def artifacts_handler(self, artifact_dir):
    """Process artifacts including project files for remote updates.

    After the base handler runs, record the on-disk location of each
    project-update artifact directory (project tree, roles, collections)
    on the callback object so post_run_hook can copy them back.
    """
    super().artifacts_handler(artifact_dir)

    # (artifact subdirectory, attribute to record it on, label for logging).
    # Consolidates three copy-pasted stanzas; log output is unchanged.
    artifact_specs = (
        ('project_update', 'project_artifact_path', 'project'),
        ('project_roles', 'roles_artifact_path', 'roles'),
        ('project_collections', 'collections_artifact_path', 'collections'),
    )
    for subdir, attr, label in artifact_specs:
        path = os.path.join(artifact_dir, subdir)
        if os.path.exists(path):
            setattr(self, attr, path)
            logger.debug(f'Found {label} artifacts at {path}')

def event_handler(self, event_data):
super_return_value = super(RunnerCallbackForProjectUpdate, self).event_handler(event_data)
Expand Down
115 changes: 111 additions & 4 deletions awx/main/tasks/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,30 @@ def _wrapped(self, *args, **kwargs):
return _wrapped


def select_execution_node_by_capacity(instances):
    """Pick the best instance for a project sync by remaining capacity.

    Helper for spawn_project_sync jobs, which must choose a node outside
    the task-manager context.  Selection order:

    1. among instances with positive total capacity, the one with the
       largest non-negative remaining_capacity (first wins on ties);
    2. failing that, the least-overloaded positive-capacity instance
       (largest remaining_capacity, even if negative);
    3. None when no instance has positive capacity.
    """
    usable = [inst for inst in instances if inst.capacity > 0]
    if not usable:
        return None

    with_headroom = [inst for inst in usable if inst.remaining_capacity >= 0]
    # Fall back to the full usable pool when every node is over capacity.
    pool = with_headroom or usable
    return max(pool, key=lambda inst: inst.remaining_capacity)


@task(on_duplicate='queue_one', bind=True, queue=get_task_queuename)
def dispatch_waiting_jobs(binder):
for uj in UnifiedJob.objects.filter(status='waiting', controller_node=settings.CLUSTER_HOST_ID).only('id', 'status', 'polymorphic_ctype', 'celery_task_id'):
Expand Down Expand Up @@ -832,8 +856,20 @@ def get_sync_needs(self, project, scm_branch=None):
return sync_needs

def spawn_project_sync(self, project, sync_needs, scm_branch=None):
cn = Instance.objects.my_hostname()

pu_ig = self.instance.instance_group
pu_en = Instance.objects.my_hostname()
pu_en = cn

if project.instance_groups.exists():
for project_ig in project.instance_groups.all():
candidate_instances = project_ig.instances.filter(enabled=True, node_type__in=['execution', 'hybrid'])
execution_instance = select_execution_node_by_capacity(candidate_instances)

if execution_instance and execution_instance.hostname != cn:
pu_ig = project_ig
pu_en = execution_instance.hostname
break

sync_metafields = dict(
launch_type="sync",
Expand All @@ -842,7 +878,7 @@ def spawn_project_sync(self, project, sync_needs, scm_branch=None):
status='running',
instance_group=pu_ig,
execution_node=pu_en,
controller_node=pu_en,
controller_node=cn,
celery_task_id=self.instance.celery_task_id,
)
if scm_branch and scm_branch != project.scm_branch:
Expand Down Expand Up @@ -1384,8 +1420,15 @@ def build_args(self, project_update, private_data_dir, passwords):
args = []
if getattr(settings, 'PROJECT_UPDATE_VVV', False):
args.append('-vvv')
if project_update.job_tags:
args.extend(['-t', project_update.job_tags])

job_tags = project_update.job_tags

# Add remote_update tag for automation mesh execution. (value set in pre_run_hook)
if getattr(self, '_is_remote_update', False):
job_tags = f"{job_tags},remote_update" if job_tags else "remote_update"

if job_tags:
args.extend(['-t', job_tags])
return args

def build_extra_vars_file(self, project_update, private_data_dir):
Expand Down Expand Up @@ -1458,6 +1501,9 @@ def get_password_prompts(self, passwords={}):
def pre_run_hook(self, instance, private_data_dir):
super(RunProjectUpdate, self).pre_run_hook(instance, private_data_dir)
# re-create root project folder if a natural disaster has destroyed it
# Store remote update decision for use in build_args and post_run_hook
self.private_data_dir = private_data_dir
self._is_remote_update = instance.is_remote_update
project_path = instance.project.get_project_path(check_if_exists=False)

if instance.launch_type != 'sync':
Expand Down Expand Up @@ -1537,6 +1583,11 @@ def post_run_hook(self, instance, status):
instance.scm_revision = self.runner_callback.playbook_new_revision
instance.save(update_fields=['scm_revision'])

# Handle remote execution - copy project from artifacts
if getattr(self, '_is_remote_update', False) and status == 'successful':
self._copy_project_from_artifacts(instance)
self._copy_requirements_from_artifacts(instance)

# Roles and collection folders copy to durable cache
base_path = instance.get_cache_path()
stage_path = os.path.join(base_path, 'stage')
Expand Down Expand Up @@ -1573,6 +1624,62 @@ def post_run_hook(self, instance, status):
p.inventory_files = p.inventories
p.save(update_fields=['scm_revision', 'playbook_files', 'inventory_files'])

def _copy_project_from_artifacts(self, instance):
    """Copy project files from artifacts to projects_root after remote execution.

    Raises PostRunError when the artifacts are missing or the copy fails,
    marking the project update as errored.
    """
    source_path = self.runner_callback.project_artifact_path

    if source_path is None or not os.path.exists(source_path):
        raise PostRunError(
            f'{instance.log_format} project artifacts not found after remote update: {source_path}',
            status='error',
        )

    destination = instance.get_project_path(check_if_exists=False)
    try:
        # Replace any stale local checkout wholesale with the freshly-synced tree.
        if os.path.exists(destination):
            shutil.rmtree(destination)
        shutil.copytree(source_path, destination, symlinks=True)
    except Exception:
        raise PostRunError(
            f'{instance.log_format} failed to copy project artifacts to {destination}',
            status='error',
            tb=traceback.format_exc(),
        )

    logger.info(f'Copied project from remote execution for {instance.log_format}')
Comment thread
dhageman marked this conversation as resolved.

def _copy_requirements_from_artifacts(self, instance):
    """Copy roles and collections from artifacts to cache stage directory after remote execution.

    Missing artifact directories are non-fatal (the update may simply have
    no requirements); a failed copy raises PostRunError.
    """
    stage_path = os.path.join(instance.get_cache_path(), 'stage')

    # (runner-callback attribute holding the artifact path, destination
    # subdirectory under stage/)
    requirement_specs = (
        ('roles_artifact_path', 'requirements_roles'),
        ('collections_artifact_path', 'requirements_collections'),
    )

    for attr, dest_subdir in requirement_specs:
        artifact_path = getattr(self.runner_callback, attr, None)
        if artifact_path is None or not os.path.exists(artifact_path):
            logger.debug(f'{instance.log_format} {attr} path does not exist: {artifact_path}')
            continue

        destination = os.path.join(stage_path, dest_subdir)
        try:
            shutil.copytree(artifact_path, destination, symlinks=True, dirs_exist_ok=True)
        except Exception:
            raise PostRunError(
                f'{instance.log_format} failed to copy {dest_subdir} from remote execution artifacts',
                status='error',
                tb=traceback.format_exc(),
            )

        logger.info(f'{instance.log_format} copied {dest_subdir} from remote execution artifacts')

def build_execution_environment_params(self, instance, private_data_dir):
if settings.IS_K8S:
return {}
Expand Down
Loading
Loading