From fb4fa60f115a33a8ae1d3029fd1b284420b4ef03 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:07:32 +0000 Subject: [PATCH 1/8] Initial plan From 949664409cd1a0b1973f9306bf3dc6468520369f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:17:51 +0000 Subject: [PATCH 2/8] Add ancestors and descendants support for hierarchical nodes - Added _hierarchy_support flag to detect hierarchical nodes - Initialized _hierarchical_data dict for ancestors/descendants - Created RelationshipManager instances for ancestors/descendants - Updated __getattr__ to return hierarchical data - Added query generation for ancestors/descendants in both async and sync versions Co-authored-by: BeArchiTek <1334310+BeArchiTek@users.noreply.github.com> --- infrahub_sdk/node/node.py | 149 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 72624467..d282a659 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -58,6 +58,9 @@ def __init__(self, schema: MainSchemaTypesAPI, branch: str, data: dict | None = self._artifact_support = hasattr(schema, "inherit_from") and "CoreArtifactTarget" in schema.inherit_from self._artifact_definition_support = schema.kind == "CoreArtifactDefinition" + + # Check if this node is hierarchical (supports ancestors/descendants) + self._hierarchy_support = hasattr(schema, "hierarchy") and schema.hierarchy is not None if not self.id: self._existing = False @@ -479,6 +482,7 @@ def __init__( self._relationship_cardinality_many_data: dict[str, RelationshipManager] = {} self._relationship_cardinality_one_data: dict[str, RelatedNode] = {} + self._hierarchical_data: dict[str, RelationshipManager] = {} super().__init__(schema=schema, branch=branch or client.default_branch, data=data) @@ -532,6 +536,46 @@ def _init_relationships(self, data: dict | RelatedNode | None = None) -> None: schema=rel_schema, data=rel_data, ) + + # Initialize ancestors and descendants for hierarchical nodes + if self._hierarchy_support: + from ..schema import RelationshipSchemaAPI + + # Create pseudo-schema for ancestors (read-only, many cardinality) + ancestors_schema = RelationshipSchemaAPI( + name="ancestors", + peer=self._schema.hierarchy, # type: ignore[attr-defined] + cardinality="many", + read_only=True, + optional=True, + ) + ancestors_data = data.get("ancestors", None) if isinstance(data, dict) else None + self._hierarchical_data["ancestors"] = RelationshipManager( + name="ancestors", + client=self._client, + node=self, + branch=self._branch, + schema=ancestors_schema, + data=ancestors_data, + ) + + # Create pseudo-schema for descendants (read-only, many cardinality) + descendants_schema = RelationshipSchemaAPI( + name="descendants", + peer=self._schema.hierarchy, # type: ignore[attr-defined] + cardinality="many", + read_only=True, + optional=True, + ) + descendants_data = data.get("descendants", None) if isinstance(data, dict) else None + self._hierarchical_data["descendants"] = RelationshipManager( + name="descendants", + client=self._client, + node=self, + branch=self._branch, + schema=descendants_schema, + data=descendants_data, + ) def __getattr__(self, name: str) -> Attribute | RelationshipManager | RelatedNode: if "_attribute_data" in self.__dict__ and name in self._attribute_data: @@ -540,6 +584,8 @@ def __getattr__(self, name: str) -> Attribute | RelationshipManager | RelatedNod return self._relationship_cardinality_many_data[name] if "_relationship_cardinality_one_data" in self.__dict__ and name in self._relationship_cardinality_one_data: return self._relationship_cardinality_one_data[name] + if "_hierarchical_data" in self.__dict__ and name in self._hierarchical_data: + return self._hierarchical_data[name] raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") @@ -772,6 +818,36 @@ async def generate_query_data_node( if insert_alias: data[rel_name]["@alias"] = f"__alias__{self._schema.kind}__{rel_name}" + + # Add ancestors and descendants for hierarchical nodes if included or if prefetch_relationships is True + if self._hierarchy_support: + for hierarchical_name in ["ancestors", "descendants"]: + if exclude and hierarchical_name in exclude: + continue + + # Only include if explicitly requested or if prefetch_relationships is True + should_fetch = prefetch_relationships or (include is not None and hierarchical_name in include) + if not should_fetch: + continue + + peer_data: dict[str, Any] = {} + if should_fetch: + peer_schema = await self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] + peer_node = InfrahubNode(client=self._client, schema=peer_schema, branch=self._branch) + peer_data = await peer_node.generate_query_data_node( + property=property, + ) + + hierarchical_data = RelationshipManager._generate_query_data(peer_data=peer_data, property=property) + # Use fragment for hierarchical fields similar to hierarchy relationships + data_node = hierarchical_data["edges"]["node"] + hierarchical_data["edges"]["node"] = {} + hierarchical_data["edges"]["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[attr-defined] + + data[hierarchical_name] = hierarchical_data + + if insert_alias: + data[hierarchical_name]["@alias"] = f"__alias__{self._schema.kind}__{hierarchical_name}" return data @@ -1110,6 +1186,7 @@ def __init__( self._relationship_cardinality_many_data: dict[str, RelationshipManagerSync] = {} self._relationship_cardinality_one_data: dict[str, RelatedNodeSync] = {} + self._hierarchical_data: dict[str, RelationshipManagerSync] = {} super().__init__(schema=schema, branch=branch or client.default_branch, data=data) @@ -1163,6 +1240,46 @@ def _init_relationships(self, data: dict | None = None) -> None: schema=rel_schema, data=rel_data, ) + + # Initialize ancestors and descendants for hierarchical nodes + if self._hierarchy_support: + from ..schema import RelationshipSchemaAPI + + # Create pseudo-schema for ancestors (read-only, many cardinality) + ancestors_schema = RelationshipSchemaAPI( + name="ancestors", + peer=self._schema.hierarchy, # type: ignore[attr-defined] + cardinality="many", + read_only=True, + optional=True, + ) + ancestors_data = data.get("ancestors", None) if isinstance(data, dict) else None + self._hierarchical_data["ancestors"] = RelationshipManagerSync( + name="ancestors", + client=self._client, + node=self, + branch=self._branch, + schema=ancestors_schema, + data=ancestors_data, + ) + + # Create pseudo-schema for descendants (read-only, many cardinality) + descendants_schema = RelationshipSchemaAPI( + name="descendants", + peer=self._schema.hierarchy, # type: ignore[attr-defined] + cardinality="many", + read_only=True, + optional=True, + ) + descendants_data = data.get("descendants", None) if isinstance(data, dict) else None + self._hierarchical_data["descendants"] = RelationshipManagerSync( + name="descendants", + client=self._client, + node=self, + branch=self._branch, + schema=descendants_schema, + data=descendants_data, + ) def __getattr__(self, name: str) -> Attribute | RelationshipManagerSync | RelatedNodeSync: if "_attribute_data" in self.__dict__ and name in self._attribute_data: @@ -1171,6 +1288,8 @@ def __getattr__(self, name: str) -> Attribute | RelationshipManagerSync | Relate return self._relationship_cardinality_many_data[name] if "_relationship_cardinality_one_data" in self.__dict__ and name in self._relationship_cardinality_one_data: return self._relationship_cardinality_one_data[name] + if "_hierarchical_data" in self.__dict__ and name in self._hierarchical_data: + return self._hierarchical_data[name] raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") @@ -1393,6 +1512,36 @@ def generate_query_data_node( if insert_alias: data[rel_name]["@alias"] = f"__alias__{self._schema.kind}__{rel_name}" + + # Add ancestors and descendants for hierarchical nodes if included or if prefetch_relationships is True + if self._hierarchy_support: + for hierarchical_name in ["ancestors", "descendants"]: + if exclude and hierarchical_name in exclude: + continue + + # Only include if explicitly requested or if prefetch_relationships is True + should_fetch = prefetch_relationships or (include is not None and hierarchical_name in include) + if not should_fetch: + continue + + peer_data: dict[str, Any] = {} + if should_fetch: + peer_schema = self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] + peer_node = InfrahubNodeSync(client=self._client, schema=peer_schema, branch=self._branch) + peer_data = peer_node.generate_query_data_node( + property=property, + ) + + hierarchical_data = RelationshipManagerSync._generate_query_data(peer_data=peer_data, property=property) + # Use fragment for hierarchical fields similar to hierarchy relationships + data_node = hierarchical_data["edges"]["node"] + hierarchical_data["edges"]["node"] = {} + hierarchical_data["edges"]["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[attr-defined] + + data[hierarchical_name] = hierarchical_data + + if insert_alias: + data[hierarchical_name]["@alias"] = f"__alias__{self._schema.kind}__{hierarchical_name}" return data From b79cef085e9533ff597e3bba0e9dd190c6d62cca Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:27:57 +0000 Subject: [PATCH 3/8] Add comprehensive tests for hierarchical nodes ancestors and descendants - Created 16 tests covering initialization, data handling, and query generation - Tests verify ancestors and descendants are properly initialized - Tests confirm query generation includes hierarchical fields - Tests validate both async and sync node implementations - All tests pass successfully Co-authored-by: BeArchiTek <1334310+BeArchiTek@users.noreply.github.com> --- tests/unit/sdk/test_hierarchical_nodes.py | 319 ++++++++++++++++++++++ 1 file changed, 319 insertions(+) create mode 100644 tests/unit/sdk/test_hierarchical_nodes.py diff --git a/tests/unit/sdk/test_hierarchical_nodes.py b/tests/unit/sdk/test_hierarchical_nodes.py new file mode 100644 index 00000000..7a75df3d --- /dev/null +++ b/tests/unit/sdk/test_hierarchical_nodes.py @@ -0,0 +1,319 @@ +"""Tests for hierarchical nodes ancestors and descendants functionality.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync +from infrahub_sdk.schema import NodeSchema + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient, InfrahubClientSync + +pytestmark = pytest.mark.httpx_mock(can_send_already_matched_responses=True) + + +@pytest.fixture +async def hierarchical_schema(): + """Schema for a hierarchical location node with hierarchy support.""" + data = { + "name": "Location", + "namespace": "Infra", + "default_filter": "name__value", + "attributes": [ + {"name": "name", "kind": "String", "unique": True}, + {"name": "description", "kind": "String", "optional": True}, + ], + "relationships": [ + { + "name": "parent", + "peer": "InfraLocation", + "optional": True, + "cardinality": "one", + "kind": "Hierarchy", + }, + { + "name": "children", + "peer": "InfraLocation", + "optional": True, + "cardinality": "many", + "kind": "Hierarchy", + }, + ], + } + schema_api = NodeSchema(**data).convert_api() # type: ignore + # Set hierarchy field manually since it's not part of NodeSchema but only NodeSchemaAPI + # This field would normally be set by the backend + schema_api.hierarchy = "InfraLocation" + return schema_api + + +@pytest.mark.parametrize("client_type", ["standard", "sync"]) +async def test_hierarchical_node_has_hierarchy_support( + client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type +): + """Test that hierarchical nodes are properly detected and support ancestors/descendants.""" + if client_type == "standard": + node = InfrahubNode(client=client, schema=hierarchical_schema) + else: + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) + + # Check that hierarchy support is detected + assert node._hierarchy_support is True + assert hierarchical_schema.hierarchy == "InfraLocation" + + +@pytest.mark.parametrize("client_type", ["standard", "sync"]) +async def test_hierarchical_node_has_ancestors_descendants( + client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type +): + """Test that hierarchical nodes have ancestors and descendants attributes.""" + if client_type == "standard": + node = InfrahubNode(client=client, schema=hierarchical_schema) + else: + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) + + # Check that ancestors and descendants are accessible + assert hasattr(node, "ancestors") + assert hasattr(node, "descendants") + + # Check they are initialized as RelationshipManager + ancestors = node.ancestors + descendants = node.descendants + + assert ancestors is not None + assert descendants is not None + assert ancestors.name == "ancestors" + assert descendants.name == "descendants" + assert ancestors.schema.read_only is True + assert descendants.schema.read_only is True + assert ancestors.schema.cardinality == "many" + assert descendants.schema.cardinality == "many" + + +@pytest.mark.parametrize("client_type", ["standard", "sync"]) +async def test_hierarchical_node_with_ancestors_data( + client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type +): + """Test that hierarchical nodes can be initialized with ancestors data.""" + data = { + "id": "location-1", + "name": {"value": "USA"}, + "description": {"value": "United States"}, + "ancestors": { + "edges": [ + {"node": {"id": "ancestor-1", "__typename": "InfraLocation", "display_label": "Earth"}}, + {"node": {"id": "ancestor-2", "__typename": "InfraLocation", "display_label": "Americas"}}, + ] + }, + } + + if client_type == "standard": + node = InfrahubNode(client=client, schema=hierarchical_schema, data=data) + else: + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema, data=data) + + # Check ancestors are properly initialized + assert node.ancestors.initialized is True + assert len(node.ancestors.peers) == 2 + assert node.ancestors.peers[0].id == "ancestor-1" + assert node.ancestors.peers[0].display_label == "Earth" + assert node.ancestors.peers[1].id == "ancestor-2" + assert node.ancestors.peers[1].display_label == "Americas" + + +@pytest.mark.parametrize("client_type", ["standard", "sync"]) +async def test_hierarchical_node_with_descendants_data( + client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type +): + """Test that hierarchical nodes can be initialized with descendants data.""" + data = { + "id": "location-1", + "name": {"value": "USA"}, + "description": {"value": "United States"}, + "descendants": { + "edges": [ + {"node": {"id": "descendant-1", "__typename": "InfraLocation", "display_label": "California"}}, + {"node": {"id": "descendant-2", "__typename": "InfraLocation", "display_label": "New York"}}, + {"node": {"id": "descendant-3", "__typename": "InfraLocation", "display_label": "Texas"}}, + ] + }, + } + + if client_type == "standard": + node = InfrahubNode(client=client, schema=hierarchical_schema, data=data) + else: + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema, data=data) + + # Check descendants are properly initialized + assert node.descendants.initialized is True + assert len(node.descendants.peers) == 3 + assert node.descendants.peers[0].id == "descendant-1" + assert node.descendants.peers[0].display_label == "California" + assert node.descendants.peers[1].id == "descendant-2" + assert node.descendants.peers[1].display_label == "New York" + assert node.descendants.peers[2].id == "descendant-3" + assert node.descendants.peers[2].display_label == "Texas" + + +@pytest.mark.parametrize("client_type", ["standard", "sync"]) +async def test_non_hierarchical_node_no_ancestors_descendants( + client: InfrahubClient, client_sync: InfrahubClientSync, location_schema, client_type +): + """Test that non-hierarchical nodes don't have ancestors/descendants.""" + if client_type == "standard": + node = InfrahubNode(client=client, schema=location_schema) + else: + node = InfrahubNodeSync(client=client_sync, schema=location_schema) + + # Check that hierarchy support is not detected + assert node._hierarchy_support is False + + # Check that accessing ancestors/descendants raises AttributeError + with pytest.raises(AttributeError, match="has no attribute 'ancestors'"): + _ = node.ancestors + + with pytest.raises(AttributeError, match="has no attribute 'descendants'"): + _ = node.descendants + + +async def test_hierarchical_node_query_generation_includes_ancestors( + client: InfrahubClient, hierarchical_schema +): + """Test that query generation includes ancestors when requested.""" + # Pre-populate schema cache to avoid fetching from server + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client.schema.set_cache(cache_data) + + node = InfrahubNode(client=client, schema=hierarchical_schema) + + # Generate query with ancestors included + query_data = await node.generate_query_data_node(include=["ancestors"]) + + # Check that ancestors is in the query + assert "ancestors" in query_data + assert "edges" in query_data["ancestors"] + assert "node" in query_data["ancestors"]["edges"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["ancestors"]["edges"]["node"] + + +async def test_hierarchical_node_query_generation_includes_descendants( + client: InfrahubClient, hierarchical_schema +): + """Test that query generation includes descendants when requested.""" + # Pre-populate schema cache to avoid fetching from server + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client.schema.set_cache(cache_data) + + node = InfrahubNode(client=client, schema=hierarchical_schema) + + # Generate query with descendants included + query_data = await node.generate_query_data_node(include=["descendants"]) + + # Check that descendants is in the query + assert "descendants" in query_data + assert "edges" in query_data["descendants"] + assert "node" in query_data["descendants"]["edges"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["descendants"]["edges"]["node"] + + +async def test_hierarchical_node_query_generation_prefetch_relationships( + client: InfrahubClient, hierarchical_schema +): + """Test that query generation includes ancestors/descendants with prefetch_relationships=True.""" + # Pre-populate schema cache to avoid fetching from server + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client.schema.set_cache(cache_data) + + node = InfrahubNode(client=client, schema=hierarchical_schema) + + # Generate query with prefetch_relationships + query_data = await node.generate_query_data_node(prefetch_relationships=True) + + # Check that both ancestors and descendants are in the query + assert "ancestors" in query_data + assert "descendants" in query_data + + +async def test_hierarchical_node_query_generation_exclude(client: InfrahubClient, hierarchical_schema): + """Test that query generation respects exclude for ancestors/descendants.""" + # Pre-populate schema cache to avoid fetching from server + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client.schema.set_cache(cache_data) + + node = InfrahubNode(client=client, schema=hierarchical_schema) + + # Generate query with ancestors excluded but prefetch_relationships enabled + query_data = await node.generate_query_data_node(prefetch_relationships=True, exclude=["ancestors"]) + + # Check that ancestors is excluded but descendants is present + assert "ancestors" not in query_data + assert "descendants" in query_data + + +def test_hierarchical_node_sync_query_generation_includes_ancestors( + client_sync: InfrahubClientSync, hierarchical_schema +): + """Test that sync query generation includes ancestors when requested.""" + # Set schema in cache to avoid HTTP request + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client_sync.schema.set_cache(cache_data) + + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) + + # Generate query with ancestors included + query_data = node.generate_query_data_node(include=["ancestors"]) + + # Check that ancestors is in the query + assert "ancestors" in query_data + assert "edges" in query_data["ancestors"] + assert "node" in query_data["ancestors"]["edges"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["ancestors"]["edges"]["node"] + + +def test_hierarchical_node_sync_query_generation_includes_descendants( + client_sync: InfrahubClientSync, hierarchical_schema +): + """Test that sync query generation includes descendants when requested.""" + # Set schema in cache to avoid HTTP request + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client_sync.schema.set_cache(cache_data) + + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) + + # Generate query with descendants included + query_data = node.generate_query_data_node(include=["descendants"]) + + # Check that descendants is in the query + assert "descendants" in query_data + assert "edges" in query_data["descendants"] + assert "node" in query_data["descendants"]["edges"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["descendants"]["edges"]["node"] From a5146ddfb6c9b333de5c550afc89af9f9d49d787 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:31:16 +0000 Subject: [PATCH 4/8] Address code review feedback - remove redundant conditional checks - Removed redundant 'if should_fetch' checks in both async and sync implementations - The checks were unnecessary since we already continue if should_fetch is False - All tests continue to pass Co-authored-by: BeArchiTek <1334310+BeArchiTek@users.noreply.github.com> --- infrahub_sdk/node/node.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index d282a659..2cc75b6a 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -830,13 +830,11 @@ async def generate_query_data_node( if not should_fetch: continue - peer_data: dict[str, Any] = {} - if should_fetch: - peer_schema = await self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] - peer_node = InfrahubNode(client=self._client, schema=peer_schema, branch=self._branch) - peer_data = await peer_node.generate_query_data_node( - property=property, - ) + peer_schema = await self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] + peer_node = InfrahubNode(client=self._client, schema=peer_schema, branch=self._branch) + peer_data = await peer_node.generate_query_data_node( + property=property, + ) hierarchical_data = RelationshipManager._generate_query_data(peer_data=peer_data, property=property) # Use fragment for hierarchical fields similar to hierarchy relationships @@ -1524,13 +1522,11 @@ def generate_query_data_node( if not should_fetch: continue - peer_data: dict[str, Any] = {} - if should_fetch: - peer_schema = self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] - peer_node = InfrahubNodeSync(client=self._client, schema=peer_schema, branch=self._branch) - peer_data = peer_node.generate_query_data_node( - property=property, - ) + peer_schema = self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] + peer_node = InfrahubNodeSync(client=self._client, schema=peer_schema, branch=self._branch) + peer_data = peer_node.generate_query_data_node( + property=property, + ) hierarchical_data = RelationshipManagerSync._generate_query_data(peer_data=peer_data, property=property) # Use fragment for hierarchical fields similar to hierarchy relationships From 2191f4e5d12beee8388f2165b0767c08da02f4e7 Mon Sep 17 00:00:00 2001 From: Bearchitek Date: Thu, 27 Nov 2025 17:42:03 +0100 Subject: [PATCH 5/8] Add parent and children as well --- infrahub_sdk/node/constants.py | 2 + infrahub_sdk/node/node.py | 311 ++++++++++++++++------ tests/unit/sdk/test_hierarchical_nodes.py | 285 +++++++++++++++++--- 3 files changed, 486 insertions(+), 112 deletions(-) diff --git a/infrahub_sdk/node/constants.py b/infrahub_sdk/node/constants.py index 7f5217d8..a05b5b9e 100644 --- a/infrahub_sdk/node/constants.py +++ b/infrahub_sdk/node/constants.py @@ -18,4 +18,6 @@ "calling generate is only supported for CoreArtifactDefinition nodes" ) +HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE = "Hierarchical fields are not supported for this node." + HFID_STR_SEPARATOR = "__" diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 2cc75b6a..9e5b8da5 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -7,13 +7,21 @@ from ..constants import InfrahubClientMode from ..exceptions import FeatureNotSupportedError, NodeNotFoundError, ResourceNotDefinedError, SchemaNotFoundError from ..graphql import Mutation, Query -from ..schema import GenericSchemaAPI, RelationshipCardinality, RelationshipKind +from ..schema import ( + GenericSchemaAPI, + ProfileSchemaAPI, + RelationshipCardinality, + RelationshipKind, + RelationshipSchemaAPI, + TemplateSchemaAPI, +) from ..utils import compare_lists, generate_short_id from .attribute import Attribute from .constants import ( ARTIFACT_DEFINITION_GENERATE_FEATURE_NOT_SUPPORTED_MESSAGE, ARTIFACT_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE, ARTIFACT_GENERATE_FEATURE_NOT_SUPPORTED_MESSAGE, + HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE, PROPERTIES_OBJECT, ) from .related_node import RelatedNode, RelatedNodeBase, RelatedNodeSync @@ -57,10 +65,21 @@ def __init__(self, schema: MainSchemaTypesAPI, branch: str, data: dict | None = self._relationships = [item.name for item in self._schema.relationships] self._artifact_support = hasattr(schema, "inherit_from") and "CoreArtifactTarget" in schema.inherit_from + if not isinstance(schema, GenericSchemaAPI): + self._artifact_support = getattr(schema, "inherit_from", None) is not None + else: + self._artifact_support = False self._artifact_definition_support = schema.kind == "CoreArtifactDefinition" - - # Check if this node is hierarchical (supports ancestors/descendants) - self._hierarchy_support = hasattr(schema, "hierarchy") and schema.hierarchy is not None + + # Check if this node is hierarchical (supports parent/children and ancestors/descendants) + if ( + not isinstance(schema, ProfileSchemaAPI) + or not isinstance(schema, GenericSchemaAPI) + or not isinstance(schema, TemplateSchemaAPI) + ): + self._hierarchy_support = getattr(schema, "hierarchy", None) is not None + else: + self._hierarchy_support = False if not self.id: self._existing = False @@ -387,6 +406,10 @@ def _validate_artifact_support(self, message: str) -> None: if not self._artifact_support: raise FeatureNotSupportedError(message) + def _validate_hierarchy_support(self, message: str) -> None: + if not self._hierarchy_support: + raise FeatureNotSupportedError(message) + def _validate_artifact_definition_support(self, message: str) -> None: if not self._artifact_definition_support: raise FeatureNotSupportedError(message) @@ -482,7 +505,7 @@ def __init__( self._relationship_cardinality_many_data: dict[str, RelationshipManager] = {} self._relationship_cardinality_one_data: dict[str, RelatedNode] = {} - self._hierarchical_data: dict[str, RelationshipManager] = {} + self._hierarchical_data: dict[str, RelatedNode | RelationshipManager] = {} super().__init__(schema=schema, branch=branch or client.default_branch, data=data) @@ -536,15 +559,45 @@ def _init_relationships(self, data: dict | RelatedNode | None = None) -> None: schema=rel_schema, data=rel_data, ) - - # Initialize ancestors and descendants for hierarchical nodes + # Initialize parent, children, ancestors and descendants for hierarchical nodes if self._hierarchy_support: - from ..schema import RelationshipSchemaAPI - + # Create pseudo-schema for parent (cardinality one) + parent_schema = RelationshipSchemaAPI( + name="parent", + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] + kind=RelationshipKind.HIERARCHY, + cardinality="one", + optional=True, + ) + parent_data = data.get("parent", None) if isinstance(data, dict) else None + self._hierarchical_data["parent"] = RelatedNode( + name="parent", + client=self._client, + branch=self._branch, + schema=parent_schema, + data=parent_data, + ) + # Create pseudo-schema for children (many cardinality) + children_schema = RelationshipSchemaAPI( + name="children", + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] + kind=RelationshipKind.HIERARCHY, + cardinality="many", + optional=True, + ) + children_data = data.get("children", None) if isinstance(data, dict) else None + self._hierarchical_data["children"] = RelationshipManager( + name="children", + client=self._client, + node=self, + branch=self._branch, + schema=children_schema, + data=children_data, + ) # Create pseudo-schema for ancestors (read-only, many cardinality) ancestors_schema = RelationshipSchemaAPI( name="ancestors", - peer=self._schema.hierarchy, # type: ignore[attr-defined] + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] cardinality="many", read_only=True, optional=True, @@ -558,11 +611,10 @@ def _init_relationships(self, data: dict | RelatedNode | None = None) -> None: schema=ancestors_schema, data=ancestors_data, ) - # Create pseudo-schema for descendants (read-only, many cardinality) descendants_schema = RelationshipSchemaAPI( name="descendants", - peer=self._schema.hierarchy, # type: ignore[attr-defined] + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] cardinality="many", read_only=True, optional=True, @@ -674,6 +726,56 @@ async def save( self._client.store.set(node=self) + async def _process_hierarchical_fields( + self, + data: dict[str, Any], + include: list[str] | None = None, + exclude: list[str] | None = None, + prefetch_relationships: bool = False, + insert_alias: bool = False, + property: bool = False, + ) -> None: + """Process hierarchical fields (parent, children, ancestors, descendants) for hierarchical nodes.""" + self._validate_hierarchy_support(HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE) + + for hierarchical_name in ["parent", "children", "ancestors", "descendants"]: + if exclude and hierarchical_name in exclude: + continue + + # Only include if explicitly requested or if prefetch_relationships is True + should_fetch = prefetch_relationships or (include is not None and hierarchical_name in include) + if not should_fetch: + continue + + peer_schema = await self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[union-attr, arg-type] + peer_node = InfrahubNode(client=self._client, schema=peer_schema, branch=self._branch) + # Exclude hierarchical fields from peer data to prevent infinite recursion + peer_exclude = list(exclude) if exclude else [] + peer_exclude.extend(["parent", "children", "ancestors", "descendants"]) + peer_data = await peer_node.generate_query_data_node( + exclude=peer_exclude, + property=property, + ) + + # Parent is cardinality one, others are cardinality many + if hierarchical_name == "parent": + hierarchical_data = RelatedNode._generate_query_data(peer_data=peer_data, property=property) + # Use fragment for hierarchical fields similar to hierarchy relationships + data_node = hierarchical_data["node"] + hierarchical_data["node"] = {} + hierarchical_data["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[union-attr] + else: + hierarchical_data = RelationshipManager._generate_query_data(peer_data=peer_data, property=property) + # Use fragment for hierarchical fields similar to hierarchy relationships + data_node = hierarchical_data["edges"]["node"] + hierarchical_data["edges"]["node"] = {} + hierarchical_data["edges"]["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[union-attr] + + data[hierarchical_name] = hierarchical_data + + if insert_alias: + data[hierarchical_name]["@alias"] = f"__alias__{self._schema.kind}__{hierarchical_name}" + async def generate_query_data( self, filters: dict[str, Any] | None = None, @@ -801,6 +903,7 @@ async def generate_query_data_node( property=property, ) + rel_data: dict[str, Any] if rel_schema and rel_schema.cardinality == "one": rel_data = RelatedNode._generate_query_data(peer_data=peer_data, property=property) # Nodes involved in a hierarchy are required to inherit from a common ancestor node, and graphql @@ -813,39 +916,23 @@ async def generate_query_data_node( rel_data["node"][f"...on {rel_schema.peer}"] = data_node elif rel_schema and rel_schema.cardinality == "many": rel_data = RelationshipManager._generate_query_data(peer_data=peer_data, property=property) + else: + continue data[rel_name] = rel_data if insert_alias: data[rel_name]["@alias"] = f"__alias__{self._schema.kind}__{rel_name}" - - # Add ancestors and descendants for hierarchical nodes if included or if prefetch_relationships is True - if self._hierarchy_support: - for hierarchical_name in ["ancestors", "descendants"]: - if exclude and hierarchical_name in exclude: - continue - - # Only include if explicitly requested or if prefetch_relationships is True - should_fetch = prefetch_relationships or (include is not None and hierarchical_name in include) - if not should_fetch: - continue - - peer_schema = await self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] - peer_node = InfrahubNode(client=self._client, schema=peer_schema, branch=self._branch) - peer_data = await peer_node.generate_query_data_node( - property=property, - ) - - hierarchical_data = RelationshipManager._generate_query_data(peer_data=peer_data, property=property) - # Use fragment for hierarchical fields similar to hierarchy relationships - data_node = hierarchical_data["edges"]["node"] - hierarchical_data["edges"]["node"] = {} - hierarchical_data["edges"]["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[attr-defined] - - data[hierarchical_name] = hierarchical_data - - if insert_alias: - data[hierarchical_name]["@alias"] = f"__alias__{self._schema.kind}__{hierarchical_name}" + + # Add parent, children, ancestors and descendants for hierarchical nodes + await self._process_hierarchical_fields( + data=data, + include=include, + exclude=exclude, + prefetch_relationships=prefetch_relationships, + insert_alias=insert_alias, + property=property, + ) return data @@ -1184,7 +1271,7 @@ def __init__( self._relationship_cardinality_many_data: dict[str, RelationshipManagerSync] = {} self._relationship_cardinality_one_data: dict[str, RelatedNodeSync] = {} - self._hierarchical_data: dict[str, RelationshipManagerSync] = {} + self._hierarchical_data: dict[str, RelatedNodeSync | RelationshipManagerSync] = {} super().__init__(schema=schema, branch=branch or client.default_branch, data=data) @@ -1238,15 +1325,48 @@ def _init_relationships(self, data: dict | None = None) -> None: schema=rel_schema, data=rel_data, ) - - # Initialize ancestors and descendants for hierarchical nodes + + # Initialize parent, children, ancestors and descendants for hierarchical nodes if self._hierarchy_support: - from ..schema import RelationshipSchemaAPI - + # Create pseudo-schema for parent (cardinality one) + parent_schema = RelationshipSchemaAPI( + name="parent", + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] + kind=RelationshipKind.HIERARCHY, + cardinality="one", + optional=True, + ) + parent_data = data.get("parent", None) if isinstance(data, dict) else None + self._hierarchical_data["parent"] = RelatedNodeSync( + name="parent", + client=self._client, + branch=self._branch, + schema=parent_schema, + data=parent_data, + ) + + # Create pseudo-schema for children (many cardinality) + children_schema = RelationshipSchemaAPI( + name="children", + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] + kind=RelationshipKind.HIERARCHY, + cardinality="many", + optional=True, + ) + children_data = data.get("children", None) if isinstance(data, dict) else None + self._hierarchical_data["children"] = RelationshipManagerSync( + name="children", + client=self._client, + node=self, + branch=self._branch, + schema=children_schema, + data=children_data, + ) + # Create pseudo-schema for ancestors (read-only, many cardinality) ancestors_schema = RelationshipSchemaAPI( name="ancestors", - peer=self._schema.hierarchy, # type: ignore[attr-defined] + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] cardinality="many", read_only=True, optional=True, @@ -1260,11 +1380,11 @@ def _init_relationships(self, data: dict | None = None) -> None: schema=ancestors_schema, data=ancestors_data, ) - + # Create pseudo-schema for descendants (read-only, many cardinality) descendants_schema = RelationshipSchemaAPI( name="descendants", - peer=self._schema.hierarchy, # type: ignore[attr-defined] + peer=self._schema.hierarchy, # type: ignore[union-attr, arg-type] cardinality="many", read_only=True, optional=True, @@ -1369,6 +1489,56 @@ def save( self._client.store.set(node=self) + def _process_hierarchical_fields( + self, + data: dict[str, Any], + include: list[str] | None = None, + exclude: list[str] | None = None, + prefetch_relationships: bool = False, + insert_alias: bool = False, + property: bool = False, + ) -> None: + """Process hierarchical fields (parent, children, ancestors, descendants) for hierarchical nodes.""" + self._validate_hierarchy_support(HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE) + + for hierarchical_name in ["parent", "children", "ancestors", "descendants"]: + if exclude and hierarchical_name in exclude: + continue + + # Only include if explicitly requested or if prefetch_relationships is True + should_fetch = prefetch_relationships or (include is not None and hierarchical_name in include) + if not should_fetch: + continue + + peer_schema = self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[union-attr, arg-type] + peer_node = InfrahubNodeSync(client=self._client, schema=peer_schema, branch=self._branch) + # Exclude hierarchical fields from peer data to prevent infinite recursion + peer_exclude = list(exclude) if exclude else [] + peer_exclude.extend(["parent", "children", "ancestors", "descendants"]) + peer_data = peer_node.generate_query_data_node( + exclude=peer_exclude, + property=property, + ) + + # Parent is cardinality one, others are cardinality many + if hierarchical_name == "parent": + hierarchical_data = RelatedNodeSync._generate_query_data(peer_data=peer_data, property=property) + # Use fragment for hierarchical fields similar to hierarchy relationships + data_node = hierarchical_data["node"] + hierarchical_data["node"] = {} + hierarchical_data["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[union-attr] + else: + hierarchical_data = RelationshipManagerSync._generate_query_data(peer_data=peer_data, property=property) + # Use fragment for hierarchical fields similar to hierarchy relationships + data_node = hierarchical_data["edges"]["node"] + hierarchical_data["edges"]["node"] = {} + hierarchical_data["edges"]["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[union-attr] + + data[hierarchical_name] = hierarchical_data + + if insert_alias: + data[hierarchical_name]["@alias"] = f"__alias__{self._schema.kind}__{hierarchical_name}" + def generate_query_data( self, filters: dict[str, Any] | None = None, @@ -1491,8 +1661,11 @@ def generate_query_data_node( if rel_schema and should_fetch_relationship: peer_schema = self._client.schema.get(kind=rel_schema.peer, branch=self._branch) peer_node = InfrahubNodeSync(client=self._client, schema=peer_schema, branch=self._branch) - peer_data = peer_node.generate_query_data_node(include=include, exclude=exclude, property=property) + peer_data = peer_node.generate_query_data_node( + property=property, + ) + rel_data: dict[str, Any] if rel_schema and rel_schema.cardinality == "one": rel_data = RelatedNodeSync._generate_query_data(peer_data=peer_data, property=property) # Nodes involved in a hierarchy are required to inherit from a common ancestor node, and graphql @@ -1505,39 +1678,23 @@ def generate_query_data_node( rel_data["node"][f"...on {rel_schema.peer}"] = data_node elif rel_schema and rel_schema.cardinality == "many": rel_data = RelationshipManagerSync._generate_query_data(peer_data=peer_data, property=property) + else: + continue data[rel_name] = rel_data if insert_alias: data[rel_name]["@alias"] = f"__alias__{self._schema.kind}__{rel_name}" - - # Add ancestors and descendants for hierarchical nodes if included or if prefetch_relationships is True - if self._hierarchy_support: - for hierarchical_name in ["ancestors", "descendants"]: - if exclude and hierarchical_name in exclude: - continue - - # Only include if explicitly requested or if prefetch_relationships is True - should_fetch = prefetch_relationships or (include is not None and hierarchical_name in include) - if not should_fetch: - continue - - peer_schema = self._client.schema.get(kind=self._schema.hierarchy, branch=self._branch) # type: ignore[attr-defined] - peer_node = InfrahubNodeSync(client=self._client, schema=peer_schema, branch=self._branch) - peer_data = peer_node.generate_query_data_node( - property=property, - ) - - hierarchical_data = RelationshipManagerSync._generate_query_data(peer_data=peer_data, property=property) - # Use fragment for hierarchical fields similar to hierarchy relationships - data_node = hierarchical_data["edges"]["node"] - hierarchical_data["edges"]["node"] = {} - hierarchical_data["edges"]["node"][f"...on {self._schema.hierarchy}"] = data_node # type: ignore[attr-defined] - - data[hierarchical_name] = hierarchical_data - - if insert_alias: - data[hierarchical_name]["@alias"] = f"__alias__{self._schema.kind}__{hierarchical_name}" + + # Add parent, children, ancestors and descendants for hierarchical nodes + self._process_hierarchical_fields( + data=data, + include=include, + exclude=exclude, + prefetch_relationships=prefetch_relationships, + insert_alias=insert_alias, + property=property, + ) return data diff --git a/tests/unit/sdk/test_hierarchical_nodes.py b/tests/unit/sdk/test_hierarchical_nodes.py index 7a75df3d..d30c137b 100644 --- a/tests/unit/sdk/test_hierarchical_nodes.py +++ b/tests/unit/sdk/test_hierarchical_nodes.py @@ -1,4 +1,4 @@ -"""Tests for hierarchical nodes ancestors and descendants functionality.""" +"""Tests for hierarchical nodes parent, children, ancestors and descendants functionality.""" from __future__ import annotations @@ -54,7 +54,7 @@ async def hierarchical_schema(): async def test_hierarchical_node_has_hierarchy_support( client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type ): - """Test that hierarchical nodes are properly detected and support ancestors/descendants.""" + """Test that hierarchical nodes are properly detected and support parent/children/ancestors/descendants.""" if client_type == "standard": node = InfrahubNode(client=client, schema=hierarchical_schema) else: @@ -66,33 +66,102 @@ async def test_hierarchical_node_has_hierarchy_support( @pytest.mark.parametrize("client_type", ["standard", "sync"]) -async def test_hierarchical_node_has_ancestors_descendants( +async def test_hierarchical_node_has_all_hierarchical_fields( client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type ): - """Test that hierarchical nodes have ancestors and descendants attributes.""" + """Test that hierarchical nodes have parent, children, ancestors and descendants attributes.""" if client_type == "standard": node = InfrahubNode(client=client, schema=hierarchical_schema) else: node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) - # Check that ancestors and descendants are accessible + # Check that all hierarchical fields are accessible + assert hasattr(node, "parent") + assert hasattr(node, "children") assert hasattr(node, "ancestors") assert hasattr(node, "descendants") - # Check they are initialized as RelationshipManager - ancestors = node.ancestors - descendants = node.descendants + # Check parent is initialized as RelatedNode + parent = node.parent + assert parent is not None + assert parent.name == "parent" + assert parent.schema.cardinality == "one" + + # Check children is initialized as RelationshipManager + children = node.children + assert children is not None + assert children.name == "children" + assert children.schema.cardinality == "many" + # Check ancestors is initialized as RelationshipManager + ancestors = node.ancestors assert ancestors is not None - assert descendants is not None assert ancestors.name == "ancestors" - assert descendants.name == "descendants" assert ancestors.schema.read_only is True - assert descendants.schema.read_only is True assert ancestors.schema.cardinality == "many" + + # Check descendants is initialized as RelationshipManager + descendants = node.descendants + assert descendants is not None + assert descendants.name == "descendants" + assert descendants.schema.read_only is True assert descendants.schema.cardinality == "many" +@pytest.mark.parametrize("client_type", ["standard", "sync"]) +async def test_hierarchical_node_with_parent_data( + client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type +): + """Test that hierarchical nodes can be initialized with parent data.""" + data = { + "id": "location-1", + "name": {"value": "California"}, + "description": {"value": "State in USA"}, + "parent": {"node": {"id": "parent-1", "__typename": "InfraLocation", "display_label": "USA"}}, + } + + if client_type == "standard": + node = InfrahubNode(client=client, schema=hierarchical_schema, data=data) + else: + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema, data=data) + + # Check parent is properly initialized + assert node.parent.initialized is True + assert node.parent.id == "parent-1" + assert node.parent.display_label == "USA" + + +@pytest.mark.parametrize("client_type", ["standard", "sync"]) +async def test_hierarchical_node_with_children_data( + client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type +): + """Test that hierarchical nodes can be initialized with children data.""" + data = { + "id": "location-1", + "name": {"value": "USA"}, + "description": {"value": "United States"}, + "children": { + "edges": [ + {"node": {"id": "child-1", "__typename": "InfraLocation", "display_label": "California"}}, + {"node": {"id": "child-2", "__typename": "InfraLocation", "display_label": "New York"}}, + ] + }, + } + + if client_type == "standard": + node = InfrahubNode(client=client, schema=hierarchical_schema, data=data) + else: + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema, data=data) + + # Check children are properly initialized + assert node.children.initialized is True + assert len(node.children.peers) == 2 + assert node.children.peers[0].id == "child-1" + assert node.children.peers[0].display_label == "California" + assert node.children.peers[1].id == "child-2" + assert node.children.peers[1].display_label == "New York" + + @pytest.mark.parametrize("client_type", ["standard", "sync"]) async def test_hierarchical_node_with_ancestors_data( client: InfrahubClient, client_sync: InfrahubClientSync, hierarchical_schema, client_type @@ -159,10 +228,10 @@ async def test_hierarchical_node_with_descendants_data( @pytest.mark.parametrize("client_type", ["standard", "sync"]) -async def test_non_hierarchical_node_no_ancestors_descendants( +async def test_non_hierarchical_node_no_hierarchical_fields( client: InfrahubClient, client_sync: InfrahubClientSync, location_schema, client_type ): - """Test that non-hierarchical nodes don't have ancestors/descendants.""" + """Test that non-hierarchical nodes don't have parent/children/ancestors/descendants.""" if client_type == "standard": node = InfrahubNode(client=client, schema=location_schema) else: @@ -171,7 +240,13 @@ async def test_non_hierarchical_node_no_ancestors_descendants( # Check that hierarchy support is not detected assert node._hierarchy_support is False - # Check that accessing ancestors/descendants raises AttributeError + # Check that accessing hierarchical fields raises AttributeError + with pytest.raises(AttributeError, match="has no attribute 'parent'"): + _ = node.parent + + with pytest.raises(AttributeError, match="has no attribute 'children'"): + _ = node.children + with pytest.raises(AttributeError, match="has no attribute 'ancestors'"): _ = node.ancestors @@ -179,9 +254,52 @@ async def test_non_hierarchical_node_no_ancestors_descendants( _ = node.descendants -async def test_hierarchical_node_query_generation_includes_ancestors( - client: InfrahubClient, hierarchical_schema -): +async def test_hierarchical_node_query_generation_includes_parent(client: InfrahubClient, hierarchical_schema): + """Test that query generation includes parent when requested.""" + # Pre-populate schema cache to avoid fetching from server + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client.schema.set_cache(cache_data) + + node = InfrahubNode(client=client, schema=hierarchical_schema) + + # Generate query with parent included + query_data = await node.generate_query_data_node(include=["parent"]) + + # Check that parent is in the query + assert "parent" in query_data + assert "node" in query_data["parent"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["parent"]["node"] + + +async def test_hierarchical_node_query_generation_includes_children(client: InfrahubClient, hierarchical_schema): + """Test that query generation includes children when requested.""" + # Pre-populate schema cache to avoid fetching from server + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client.schema.set_cache(cache_data) + + node = InfrahubNode(client=client, schema=hierarchical_schema) + + # Generate query with children included + query_data = await node.generate_query_data_node(include=["children"]) + + # Check that children is in the query + assert "children" in query_data + assert "edges" in query_data["children"] + assert "node" in query_data["children"]["edges"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["children"]["edges"]["node"] + + +async def test_hierarchical_node_query_generation_includes_ancestors(client: InfrahubClient, hierarchical_schema): """Test that query generation includes ancestors when requested.""" # Pre-populate schema cache to avoid fetching from server cache_data = { @@ -189,7 +307,7 @@ async def test_hierarchical_node_query_generation_includes_ancestors( "nodes": [hierarchical_schema.model_dump()], } client.schema.set_cache(cache_data) - + node = InfrahubNode(client=client, schema=hierarchical_schema) # Generate query with ancestors included @@ -204,9 +322,7 @@ async def test_hierarchical_node_query_generation_includes_ancestors( assert "...on InfraLocation" in query_data["ancestors"]["edges"]["node"] -async def test_hierarchical_node_query_generation_includes_descendants( - client: InfrahubClient, hierarchical_schema -): +async def test_hierarchical_node_query_generation_includes_descendants(client: InfrahubClient, hierarchical_schema): """Test that query generation includes descendants when requested.""" # Pre-populate schema cache to avoid fetching from server cache_data = { @@ -214,7 +330,7 @@ async def test_hierarchical_node_query_generation_includes_descendants( "nodes": [hierarchical_schema.model_dump()], } client.schema.set_cache(cache_data) - + node = InfrahubNode(client=client, schema=hierarchical_schema) # Generate query with descendants included @@ -229,46 +345,95 @@ async def test_hierarchical_node_query_generation_includes_descendants( assert "...on InfraLocation" in query_data["descendants"]["edges"]["node"] -async def test_hierarchical_node_query_generation_prefetch_relationships( - client: InfrahubClient, hierarchical_schema -): - """Test that query generation includes ancestors/descendants with prefetch_relationships=True.""" +async def test_hierarchical_node_query_generation_prefetch_relationships(client: InfrahubClient, hierarchical_schema): + """Test that query generation includes all hierarchical fields with prefetch_relationships=True.""" # Pre-populate schema cache to avoid fetching from server cache_data = { "version": "1.0", "nodes": [hierarchical_schema.model_dump()], } client.schema.set_cache(cache_data) - + node = InfrahubNode(client=client, schema=hierarchical_schema) # Generate query with prefetch_relationships query_data = await node.generate_query_data_node(prefetch_relationships=True) - # Check that both ancestors and descendants are in the query + # Check that all hierarchical fields are in the query + assert "parent" in query_data + assert "children" in query_data assert "ancestors" in query_data assert "descendants" in query_data async def test_hierarchical_node_query_generation_exclude(client: InfrahubClient, hierarchical_schema): - """Test that query generation respects exclude for ancestors/descendants.""" + """Test that query generation respects exclude for hierarchical fields.""" # Pre-populate schema cache to avoid fetching from server cache_data = { "version": "1.0", "nodes": [hierarchical_schema.model_dump()], } client.schema.set_cache(cache_data) - + node = InfrahubNode(client=client, schema=hierarchical_schema) - # Generate query with ancestors excluded but prefetch_relationships enabled - query_data = await node.generate_query_data_node(prefetch_relationships=True, exclude=["ancestors"]) + # Generate query with parent and ancestors excluded but prefetch_relationships enabled + query_data = await node.generate_query_data_node(prefetch_relationships=True, exclude=["parent", "ancestors"]) - # Check that ancestors is excluded but descendants is present + # Check that parent and ancestors are excluded but children and descendants are present + assert "parent" not in query_data assert "ancestors" not in query_data + assert "children" in query_data assert "descendants" in query_data +def test_hierarchical_node_sync_query_generation_includes_parent(client_sync: InfrahubClientSync, hierarchical_schema): + """Test that sync query generation includes parent when requested.""" + # Set schema in cache to avoid HTTP request + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client_sync.schema.set_cache(cache_data) + + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) + + # Generate query with parent included + query_data = node.generate_query_data_node(include=["parent"]) + + # Check that parent is in the query + assert "parent" in query_data + assert "node" in query_data["parent"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["parent"]["node"] + + +def test_hierarchical_node_sync_query_generation_includes_children( + client_sync: InfrahubClientSync, hierarchical_schema +): + """Test that sync query generation includes children when requested.""" + # Set schema in cache to avoid HTTP request + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client_sync.schema.set_cache(cache_data) + + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) + + # Generate query with children included + query_data = node.generate_query_data_node(include=["children"]) + + # Check that children is in the query + assert "children" in query_data + assert "edges" in query_data["children"] + assert "node" in query_data["children"]["edges"] + + # Check that the fragment is used for hierarchical fields + assert "...on InfraLocation" in query_data["children"]["edges"]["node"] + + def test_hierarchical_node_sync_query_generation_includes_ancestors( client_sync: InfrahubClientSync, hierarchical_schema ): @@ -279,7 +444,7 @@ def test_hierarchical_node_sync_query_generation_includes_ancestors( "nodes": [hierarchical_schema.model_dump()], } client_sync.schema.set_cache(cache_data) - + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) # Generate query with ancestors included @@ -304,7 +469,7 @@ def test_hierarchical_node_sync_query_generation_includes_descendants( "nodes": [hierarchical_schema.model_dump()], } client_sync.schema.set_cache(cache_data) - + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) # Generate query with descendants included @@ -317,3 +482,53 @@ def test_hierarchical_node_sync_query_generation_includes_descendants( # Check that the fragment is used for hierarchical fields assert "...on InfraLocation" in query_data["descendants"]["edges"]["node"] + + +async def test_hierarchical_node_no_infinite_recursion_with_children(client: InfrahubClient, hierarchical_schema): + """Test that including children does not cause infinite recursion.""" + # Pre-populate schema cache to avoid fetching from server + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client.schema.set_cache(cache_data) + + node = InfrahubNode(client=client, schema=hierarchical_schema) + + # This should not cause infinite recursion + query_data = await node.generate_query_data_node(include=["children"]) + + # Check that children is in the query + assert "children" in query_data + # But hierarchical fields should not be nested in the peer data + children_node_data = query_data["children"]["edges"]["node"]["...on InfraLocation"] + assert "parent" not in children_node_data + assert "children" not in children_node_data + assert "ancestors" not in children_node_data + assert "descendants" not in children_node_data + + +def test_hierarchical_node_sync_no_infinite_recursion_with_children( + client_sync: InfrahubClientSync, hierarchical_schema +): + """Test that including children does not cause infinite recursion in sync mode.""" + # Set schema in cache to avoid HTTP request + cache_data = { + "version": "1.0", + "nodes": [hierarchical_schema.model_dump()], + } + client_sync.schema.set_cache(cache_data) + + node = InfrahubNodeSync(client=client_sync, schema=hierarchical_schema) + + # This should not cause infinite recursion + query_data = node.generate_query_data_node(include=["children"]) + + # Check that children is in the query + assert "children" in query_data + # But hierarchical fields should not be nested in the peer data + children_node_data = query_data["children"]["edges"]["node"]["...on InfraLocation"] + assert "parent" not in children_node_data + assert "children" not in children_node_data + assert "ancestors" not in children_node_data + assert "descendants" not in children_node_data From e188689ccc49cda86e60a8ba091ca4996f0b65bc Mon Sep 17 00:00:00 2001 From: Bearchitek Date: Fri, 28 Nov 2025 10:49:25 +0100 Subject: [PATCH 6/8] Fix hierarchy support validation in InfrahubNode and InfrahubNodeSync --- infrahub_sdk/node/node.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 9e5b8da5..5048226a 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -736,7 +736,8 @@ async def _process_hierarchical_fields( property: bool = False, ) -> None: """Process hierarchical fields (parent, children, ancestors, descendants) for hierarchical nodes.""" - self._validate_hierarchy_support(HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE) + if not self._hierarchy_support: + return for hierarchical_name in ["parent", "children", "ancestors", "descendants"]: if exclude and hierarchical_name in exclude: @@ -1499,7 +1500,8 @@ def _process_hierarchical_fields( property: bool = False, ) -> None: """Process hierarchical fields (parent, children, ancestors, descendants) for hierarchical nodes.""" - self._validate_hierarchy_support(HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE) + if not self._hierarchy_support: + return for hierarchical_name in ["parent", "children", "ancestors", "descendants"]: if exclude and hierarchical_name in exclude: From 0259a7816fc287e151a42099bc97fd3729cc6fdd Mon Sep 17 00:00:00 2001 From: Bearchitek Date: Fri, 28 Nov 2025 10:52:58 +0100 Subject: [PATCH 7/8] linter --- infrahub_sdk/node/node.py | 1 - 1 file changed, 1 deletion(-) diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 5048226a..f070104d 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -21,7 +21,6 @@ ARTIFACT_DEFINITION_GENERATE_FEATURE_NOT_SUPPORTED_MESSAGE, ARTIFACT_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE, ARTIFACT_GENERATE_FEATURE_NOT_SUPPORTED_MESSAGE, - HIERARCHY_FETCH_FEATURE_NOT_SUPPORTED_MESSAGE, PROPERTIES_OBJECT, ) from .related_node import RelatedNode, RelatedNodeBase, RelatedNodeSync From f98b496573284e0ede57a32a74fb9ed7e8c1b15f Mon Sep 17 00:00:00 2001 From: Bearchitek Date: Fri, 28 Nov 2025 11:45:48 +0100 Subject: [PATCH 8/8] Fix _artifact_support to correctly check for CoreArtifactTarget --- infrahub_sdk/node/node.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index f070104d..d5e86186 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -63,11 +63,12 @@ def __init__(self, schema: MainSchemaTypesAPI, branch: str, data: dict | None = self._attributes = [item.name for item in self._schema.attributes] self._relationships = [item.name for item in self._schema.relationships] - self._artifact_support = hasattr(schema, "inherit_from") and "CoreArtifactTarget" in schema.inherit_from - if not isinstance(schema, GenericSchemaAPI): - self._artifact_support = getattr(schema, "inherit_from", None) is not None - else: + # GenericSchemaAPI doesn't have inherit_from, so we need to check the type first + if isinstance(schema, GenericSchemaAPI): self._artifact_support = False + else: + inherit_from = getattr(schema, "inherit_from", None) or [] + self._artifact_support = "CoreArtifactTarget" in inherit_from self._artifact_definition_support = schema.kind == "CoreArtifactDefinition" # Check if this node is hierarchical (supports parent/children and ancestors/descendants)