Skip to content

Commit e072a42

Browse files
feat(ingest): adds get_entities_v3 method to DataHubGraph (#13045)
1 parent 40106be commit e072a42

File tree

5 files changed

+325
-2
lines changed

5 files changed

+325
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import argparse
2+
3+
from datahub.ingestion.graph.client import DataHubGraph
4+
from datahub.ingestion.graph.config import DatahubClientConfig
5+
6+
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for this example script.

    Returns:
        A parser exposing ``--token``, ``--server_url``, ``--entity_name``,
        ``--urn`` (repeatable, required), ``--aspect`` (repeatable) and
        ``--with-system-metadata``.
    """
    parser = argparse.ArgumentParser(
        description="Fetch entities from DataHub using get_entities"
    )
    parser.add_argument("--token", required=False, help="DataHub access token")
    parser.add_argument(
        "--server_url",
        required=False,
        default="http://localhost:8080",
        help="DataHub server URL (defaults to http://localhost:8080)",
    )
    parser.add_argument(
        "--entity_name",
        required=True,
        help="Entity type name (e.g., dataset, dashboard, chart)",
    )
    # required=True together with action="append" means argparse itself rejects
    # an invocation without any --urn, so no manual emptiness check is needed.
    parser.add_argument(
        "--urn",
        required=True,
        action="append",
        dest="urns",
        help="Entity URN(s) to fetch. Can specify multiple times.",
    )
    parser.add_argument(
        "--aspect",
        action="append",
        dest="aspects",
        help="Aspect name(s) to fetch. Can specify multiple times. If none provided, all aspects will be fetched.",
    )
    parser.add_argument(
        "--with-system-metadata",
        action="store_true",
        help="Include system metadata in the response.",
    )
    return parser


def main(argv=None) -> None:
    """Fetch the requested entities and print every aspect that came back.

    Args:
        argv: Argument list to parse. ``None`` lets argparse fall back to the
            process command line.
    """
    args = build_parser().parse_args(argv)

    client = DataHubGraph(
        config=DatahubClientConfig(
            server=args.server_url,
            token=args.token,
        )
    )

    response = client.get_entities(
        entity_name=args.entity_name,
        urns=args.urns,
        aspects=args.aspects,
        with_system_metadata=args.with_system_metadata,
    )

    print(f"Received {len(response)} entities")

    for urn, entity in response.items():
        print(f"Entity: {urn}")

        if not entity:
            print("\tNo aspects found for this entity")
            continue

        for aspect_name, (aspect, system_metadata) in entity.items():
            print(f"\tAspect: {aspect_name} Type: {type(aspect).__name__}")
            print(f"\t\t{aspect}")
            if system_metadata:
                print(f"\tSystem Metadata: {system_metadata}")

        print()


if __name__ == "__main__":
    main()

metadata-ingestion/src/datahub/ingestion/graph/client.py

+104
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from requests.models import HTTPError
2828
from typing_extensions import deprecated
2929

30+
from datahub._codegen.aspect import _Aspect
3031
from datahub.cli import config_utils
3132
from datahub.configuration.common import ConfigModel, GraphError, OperationalError
3233
from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
@@ -1697,6 +1698,7 @@ def run_assertions_for_asset(
16971698

16981699
return res["runAssertionsForAsset"]
16991700

1701+
@deprecated("Use get_entities instead which returns typed aspects")
17001702
def get_entities_v2(
17011703
self,
17021704
entity_name: str,
@@ -1736,6 +1738,108 @@ def get_entities_v2(
17361738
retval[entity_urn][aspect_key] = aspect_value
17371739
return retval
17381740

1741+
def get_entities(
    self,
    entity_name: str,
    urns: List[str],
    aspects: Optional[List[str]] = None,
    with_system_metadata: bool = False,
) -> Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]]:
    """
    Get entities using the OpenAPI v3 endpoint, deserializing aspects into typed objects.

    Args:
        entity_name: The entity type name
        urns: List of entity URNs to fetch. An empty list returns an empty
            result without contacting the server.
        aspects: Optional list of aspect names to fetch. If None, all aspects will be fetched.
        with_system_metadata: If True, return system metadata along with each aspect.

    Returns:
        A dictionary mapping URNs to a dictionary of aspect name to tuples of
        (typed aspect object, system metadata). If with_system_metadata is False,
        the system metadata in the tuple will be None.

        Aspects whose name is not present in ASPECT_NAME_MAP, or whose payload
        carries no "value", are skipped with a warning. Response entries without
        a "urn", and entities for which no aspect was deserialized, are omitted
        from the result.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an unsuccessful HTTP
        response, and any exception raised while deserializing an aspect (it is
        logged and re-raised unchanged).
    """
    if not urns:
        # Nothing was requested, so skip the network round trip entirely.
        return {}

    aspects = aspects or []

    # One request object per URN; every requested aspect is sent as an empty
    # object keyed by its name.
    request_payload = []
    for urn in urns:
        entity_request: Dict[str, Any] = {"urn": urn}
        for aspect_name in aspects:
            entity_request[aspect_name] = {}
        request_payload.append(entity_request)

    headers: Dict[str, Any] = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }

    url = f"{self.config.server}/openapi/v3/entity/{entity_name}/batchGet"
    if with_system_metadata:
        url += "?systemMetadata=true"

    response = self._session.post(
        url, data=json.dumps(request_payload), headers=headers
    )
    response.raise_for_status()
    entities = response.json()

    result: Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]] = {}

    for entity in entities:
        entity_urn = entity.get("urn")
        if entity_urn is None:
            logger.warning(
                f"Missing URN in entity response: {entity}, skipping deserialization"
            )
            continue

        entity_aspects: Dict[
            str, Tuple[_Aspect, Optional[SystemMetadataClass]]
        ] = {}

        # Every key other than "urn" is treated as an aspect name.
        for aspect_name, aspect_obj in entity.items():
            if aspect_name == "urn":
                continue

            aspect_class = ASPECT_NAME_MAP.get(aspect_name)
            if aspect_class is None:
                logger.warning(
                    f"Unknown aspect type {aspect_name}, skipping deserialization"
                )
                continue

            aspect_value = aspect_obj.get("value")
            if aspect_value is None:
                logger.warning(
                    f"Unknown aspect value for aspect {aspect_name}, skipping deserialization"
                )
                continue

            try:
                post_json_obj = post_json_transform(aspect_value)
                typed_aspect = aspect_class.from_obj(post_json_obj)
                # Sanity check on the deserialized object; a failure is logged
                # and re-raised by the handler below like any other error.
                assert isinstance(typed_aspect, aspect_class) and isinstance(
                    typed_aspect, _Aspect
                )

                system_metadata = None
                if with_system_metadata:
                    system_metadata_obj = aspect_obj.get("systemMetadata")
                    if system_metadata_obj:
                        system_metadata = SystemMetadataClass.from_obj(
                            system_metadata_obj
                        )

                entity_aspects[aspect_name] = (typed_aspect, system_metadata)
            except Exception as e:
                logger.error(f"Error deserializing aspect {aspect_name}: {e}")
                raise

        # Entities that yielded no usable aspect are left out of the result.
        if entity_aspects:
            result[entity_urn] = entity_aspects

    return result
1842+
17391843
def upsert_custom_assertion(
17401844
self,
17411845
urn: Optional[str],

metadata-ingestion/tests/test_helpers/graph_helpers.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from datetime import timedelta
22
from pathlib import Path
3-
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union
3+
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
44

5+
from datahub._codegen.aspect import _Aspect
56
from datahub.emitter.mce_builder import Aspect
67
from datahub.emitter.mcp import MetadataChangeProposalWrapper
78
from datahub.emitter.mcp_builder import mcps_from_mce
@@ -17,6 +18,7 @@
1718
from datahub.metadata.schema_classes import (
1819
ASPECT_NAME_MAP,
1920
DomainPropertiesClass,
21+
SystemMetadataClass,
2022
UsageAggregationClass,
2123
)
2224

@@ -128,6 +130,27 @@ def emit_mcp(
128130
) -> None:
129131
self.emitted.append(mcp)
130132

133+
def get_entities(
    self,
    entity_name: str,
    urns: List[str],
    aspects: Optional[List[str]] = None,
    with_system_metadata: bool = False,
) -> Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]]:
    """Serve ``get_entities`` from the in-memory ``entity_graph``.

    Args:
        entity_name: Accepted for signature compatibility; not used here.
        urns: URNs to look up. URNs absent from ``entity_graph`` are left out
            of the result.
        aspects: Aspect names to keep. ``None`` or an empty list keeps every
            stored aspect.
        with_system_metadata: Accepted for signature compatibility; this mock
            never produces system metadata.

    Returns:
        A dictionary mapping each matching URN to a dictionary of aspect name
        to ``(aspect, None)``. A matching URN with no selected aspects maps to
        an empty dictionary.
    """
    # Build the lookup sets once instead of scanning the lists for every
    # entity and every aspect.
    wanted_urns = set(urns)
    wanted_aspects = set(aspects) if aspects else None

    result: Dict[str, Dict[str, Tuple[_Aspect, Optional[SystemMetadataClass]]]] = {}
    for urn, entity in self.entity_graph.items():
        if urn not in wanted_urns:
            continue
        # Mock implementation always returns None for system metadata
        result[urn] = {
            aspect_name: (aspect, None)
            for aspect_name, aspect in entity.items()
            if wanted_aspects is None or aspect_name in wanted_aspects
        }
    return result
153+
131154
def get_emitted(
132155
self,
133156
) -> List[
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import pytest
2+
3+
from datahub.metadata.schema_classes import DatasetPropertiesClass
4+
from tests.test_helpers.graph_helpers import MockDataHubGraph
5+
6+
7+
@pytest.fixture
def mock_datahub_graph(pytestconfig):
    """Return a MockDataHubGraph loaded from examples/demo_data/demo_data.json."""
    demo_file = pytestconfig.rootpath.joinpath(
        "examples", "demo_data", "demo_data.json"
    )
    mock_graph = MockDataHubGraph()
    mock_graph.import_file(demo_file)
    return mock_graph
13+
14+
15+
def test_get_entities(mock_datahub_graph):
    """Check aspect filtering and the always-None system metadata of the mock."""
    aspect_name = "datasetProperties"
    # urn_1 misses the datasetProperties aspect, urn_2 has it.
    urn_1 = "urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.hospital_beds,PROD)"
    urn_2 = "urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.city_congestion,PROD)"

    entities = mock_datahub_graph.get_entities(
        entity_name="dataset",
        urns=[urn_1, urn_2],
        aspects=[aspect_name],
    )

    # Verify results
    assert entities

    # The entity without the aspect is still present, with nothing selected.
    assert urn_1 in entities
    assert aspect_name not in entities[urn_1]
    assert entities[urn_1] == {}

    # The entity with the aspect yields a typed object and no system metadata.
    assert urn_2 in entities
    assert aspect_name in entities[urn_2]
    assert isinstance(entities[urn_2][aspect_name][0], DatasetPropertiesClass)
    assert entities[urn_2][aspect_name][1] is None

    # Test system_metadata is always None regardless of flag
    entities_with_metadata = mock_datahub_graph.get_entities(
        entity_name="dataset",
        urns=[urn_2],
        aspects=[aspect_name],
        with_system_metadata=True,
    )
    assert urn_2 in entities_with_metadata
    assert aspect_name in entities_with_metadata[urn_2]
    assert isinstance(
        entities_with_metadata[urn_2][aspect_name][0], DatasetPropertiesClass
    )
    assert entities_with_metadata[urn_2][aspect_name][1] is None

smoke-test/tests/cli/datahub_graph_test.py

+65-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
import tenacity
55

66
from datahub.ingestion.graph.client import DataHubGraph
7-
from datahub.metadata.schema_classes import KafkaSchemaClass, SchemaMetadataClass
7+
from datahub.metadata.schema_classes import (
8+
DatasetPropertiesClass,
9+
KafkaSchemaClass,
10+
OwnershipClass,
11+
SchemaMetadataClass,
12+
SystemMetadataClass,
13+
)
814
from tests.utils import delete_urns_from_file, get_sleep_info, ingest_file_via_rest
915

1016
sleep_sec, sleep_times = get_sleep_info()
@@ -41,6 +47,64 @@ def test_get_aspect_v2(graph_client, ingest_cleanup_data):
4147
)
4248

4349

50+
def test_get_entities_v3(graph_client, ingest_cleanup_data):
    """Fetch typed aspects for the ingested dataset, without and with system metadata."""
    ownership_aspect_name = "ownership"
    dataset_properties_aspect_name = "datasetProperties"
    urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,test-rollback,PROD)"

    entities = graph_client.get_entities(
        entity_name="dataset",
        urns=[urn],
        aspects=[ownership_aspect_name, dataset_properties_aspect_name],
    )

    assert entities
    assert len(entities) == 1
    assert urn in entities

    # Exactly the two requested aspects come back.
    fetched = entities[urn]
    assert len(fetched) == 2
    assert ownership_aspect_name in fetched
    assert dataset_properties_aspect_name in fetched

    # Each one is deserialized into its typed class.
    assert isinstance(fetched[ownership_aspect_name][0], OwnershipClass)
    assert isinstance(
        fetched[dataset_properties_aspect_name][0], DatasetPropertiesClass
    )

    # System metadata was not requested, so the second tuple slot is None.
    assert fetched[ownership_aspect_name][1] is None
    assert fetched[dataset_properties_aspect_name][1] is None

    owner_urns = {owner.owner for owner in fetched[ownership_aspect_name][0].owners}
    assert owner_urns == {
        "urn:li:corpuser:datahub",
        "urn:li:corpuser:jdoe",
    }
    assert not fetched[dataset_properties_aspect_name][0].description
    assert fetched[dataset_properties_aspect_name][0].customProperties == {
        "prop1": "fakeprop",
        "prop2": "pikachu",
    }

    # Test with system metadata
    entities_with_metadata = graph_client.get_entities(
        entity_name="dataset",
        urns=[urn],
        aspects=[ownership_aspect_name],
        with_system_metadata=True,
    )

    assert entities_with_metadata
    assert len(entities_with_metadata) == 1
    assert urn in entities_with_metadata

    assert ownership_aspect_name in entities_with_metadata[urn]
    assert entities_with_metadata[urn][ownership_aspect_name][0]
    assert isinstance(
        entities_with_metadata[urn][ownership_aspect_name][0], OwnershipClass
    )
    assert entities_with_metadata[urn][ownership_aspect_name][1]
    assert isinstance(
        entities_with_metadata[urn][ownership_aspect_name][1], SystemMetadataClass
    )
107+
44108
@tenacity.retry(
45109
stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
46110
)

0 commit comments

Comments
 (0)