Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions libs/aws/langchain_aws/graphs/neptune_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def get_details(self) -> Any:


class BaseNeptuneGraph(ABC):
def __init__(
self, property_descriptions: Optional[Dict[Tuple[str, str], str]] = None
):
self.property_descriptions = property_descriptions or {}

@property
def get_schema(self) -> str:
"""Returns the schema of the Neptune database"""
Expand Down Expand Up @@ -170,6 +175,19 @@ def _get_edge_properties(

return edge_properties

def _inject_property_descriptions(self, properties_list: List) -> List:
"""Inject property descriptions into node and edge properties."""
if not self.property_descriptions:
return properties_list

for item in properties_list:
for prop in item["properties"]:
key_field = "labels" if "labels" in item else "type"
key = (item[key_field], prop["property"])
if key in self.property_descriptions:
prop["description"] = self.property_descriptions[key]
return properties_list

def _refresh_schema(self) -> None:
"""Refreshes the Neptune graph schema information."""

Expand All @@ -183,8 +201,12 @@ def _refresh_schema(self) -> None:
}
n_labels, e_labels = self._get_labels()
triple_schema = self._get_triples(e_labels)
node_properties = self._get_node_properties(n_labels, types)
edge_properties = self._get_edge_properties(e_labels, types)
node_properties = self._inject_property_descriptions(
self._get_node_properties(n_labels, types)
)
edge_properties = self._inject_property_descriptions(
self._get_edge_properties(e_labels, types)
)

self.schema = f"""
Node properties are the following:
Expand Down Expand Up @@ -236,9 +258,10 @@ def __init__(
aws_session_token: Optional[SecretStr] = None,
endpoint_url: Optional[str] = None,
config: Optional["Config"] = None,
property_descriptions: Optional[Dict[Tuple[str, str], str]] = None,
) -> None:
"""Create a new Neptune Analytics graph wrapper instance."""

super().__init__(property_descriptions)
self.graph_identifier = graph_identifier

if client is not None:
Expand Down Expand Up @@ -319,8 +342,12 @@ def _refresh_schema(self) -> None:
data = self.query(pg_schema_query)
raw_schema = data[0]["schema"]
triple_schema = _format_triples(raw_schema["labelTriples"])
node_properties = _format_node_properties(raw_schema["nodeLabelDetails"])
edge_properties = _format_edge_properties(raw_schema["edgeLabelDetails"])
node_properties = self._inject_property_descriptions(
_format_node_properties(raw_schema["nodeLabelDetails"])
)
edge_properties = self._inject_property_descriptions(
_format_edge_properties(raw_schema["edgeLabelDetails"])
)

self.schema = f"""
Node properties are the following:
Expand Down Expand Up @@ -384,8 +411,10 @@ def __init__(
aws_session_token: Optional[SecretStr] = None,
endpoint_url: Optional[str] = None,
config: Optional["Config"] = None,
property_descriptions: Optional[Dict[Tuple[str, str], str]] = None,
) -> None:
"""Create a new Neptune graph wrapper instance."""
super().__init__(property_descriptions)

try:
if client is not None:
Expand Down