Skip to content
Closed
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions cognee/api/v1/cognify/ontology_graph_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import asyncio
from typing import Union, Dict, List, Tuple
from rdflib import Graph
from uuid import uuid4
from io import IOBase, BytesIO
from pydantic import Field

from cognee.low_level import DataPoint
from cognee.infrastructure.engine import Edge
from cognee.modules.users.methods import get_default_user
from cognee.modules.data.methods import create_dataset
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.tasks.storage import add_data_points
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.task import Task
import hashlib
from uuid import UUID


# -----------------------------
# STEP 1: Load ontology triples
# -----------------------------
async def load_ontology_data(ontology_file: Union[str, bytes, IOBase], format: str) -> list[dict]:
    """
    Parse an RDF/OWL ontology into subject-predicate-object triples.

    Args:
        ontology_file: Path/URL string, raw bytes, or an open file-like object.
        format: rdflib serialization format (e.g. "xml", "turtle", "nt").

    Returns:
        A list of {"subject", "predicate", "object"} dicts with each RDF term
        stringified.

    Raises:
        ValueError: If rdflib fails to parse the input; the original parse
            error is chained as the cause.
    """
    g = Graph()
    if isinstance(ontology_file, bytes):
        ontology_file = BytesIO(ontology_file)
    if isinstance(ontology_file, IOBase):
        try:
            # Rewind so a previously-read stream is parsed from the start.
            ontology_file.seek(0)
        except (OSError, AttributeError):
            # Some streams may not be seekable; continue with current position.
            pass
    try:
        g.parse(ontology_file, format=format)
    except Exception as e:
        # Chain the rdflib error so callers can inspect the root cause.
        raise ValueError(f"Failed to parse ontology file: {e}") from e

    return [
        {"subject": str(s), "predicate": str(p), "object": str(o)}
        for s, p, o in g
    ]


# -------------------------------------
# STEP 2: Convert RDF triples to DataPoints
# -------------------------------------
class OntologyEntity(DataPoint):
    """
    Represents an ontology resource as a Cognee DataPoint.

    `related_to` stores outgoing relationships as tuples of (Edge metadata, target entity).
    """

    # Human-readable local name derived from the URI (see _extract_label).
    name: str
    # Full URI of the ontology resource this entity represents.
    uri: str
    # Outgoing edges: (relationship metadata, target entity) pairs.
    related_to: List[Tuple[Edge, "OntologyEntity"]] = Field(default_factory=list)
    # NOTE(review): presumably "index_fields" marks `name` for indexing by the
    # DataPoint storage layer — confirm against the DataPoint contract.
    metadata: dict = {"index_fields": ["name"]}


# Resolve the self-referential "OntologyEntity" forward reference in `related_to`.
OntologyEntity.model_rebuild()


def _extract_label(uri: str) -> str:
"""Return the local name for a URI (last fragment or path component)."""
if "#" in uri:
return uri.rsplit("#", 1)[-1] or uri
if "/" in uri:
return uri.rstrip("/").rsplit("/", 1)[-1] or uri
return uri


async def ontology_to_datapoints(triples: list[dict]) -> list[DataPoint]:
    """
    Convert parsed triples into Cognee DataPoints (entities + relations).

    Every subject and object URI becomes an OntologyEntity; predicates become
    Edge metadata on the subject's `related_to` list. The ontology's graph
    structure is preserved.
    """
    nodes: Dict[str, OntologyEntity] = {}

    def _node_for(uri: str) -> OntologyEntity:
        """Return the entity for `uri`, creating it on first sight."""
        if uri not in nodes:
            nodes[uri] = OntologyEntity(
                # Deterministic id: md5 of the URI keeps ids stable across runs.
                id=UUID(hashlib.md5(uri.encode()).hexdigest()),
                name=_extract_label(uri),
                uri=uri,
            )
        return nodes[uri]

    for triple in triples:
        source = _node_for(triple["subject"])
        target = _node_for(triple["object"])
        predicate_uri = triple["predicate"]
        label = _extract_label(predicate_uri)

        # Skip exact duplicates: same relationship label to the same target URI.
        already_linked = any(
            existing.relationship_type == label and linked.uri == target.uri
            for existing, linked in source.related_to
        )
        if not already_linked:
            source.related_to.append(
                (Edge(relationship_type=label, properties={"uri": predicate_uri}), target)
            )

    return list(nodes.values())


# -------------------------------------
# STEP 3: Define the custom task function
# -------------------------------------
async def ontology_ingestion_task(inputs: list, format: str):
    """
    Custom Cognee Task: ingest an OWL/RDF ontology and store it as structured DataPoints.

    Takes the ontology source as the first element of `inputs`, parses it into
    triples, converts those to DataPoints, persists them, and returns them.
    """
    source = inputs[0]
    parsed_triples = await load_ontology_data(source, format)
    points = await ontology_to_datapoints(parsed_triples)
    await add_data_points(points)
    return points


# -------------------------------------
# STEP 4: Build and run the pipeline
# -------------------------------------
async def run_ontology_pipeline(ontology_file: Union[str, bytes, IOBase], format: str = "xml"):
    """
    Build and execute the ontology ingestion pipeline, yielding run statuses.

    Prunes existing data and system state for a clean run, sets up storage,
    creates the "ontology_dataset" dataset, then streams pipeline statuses
    from the single ingestion task to the caller.

    NOTE(review): rdflib's format auto-detection is unreliable, so callers
    should pass `format` explicitly rather than relying on the "xml" default.
    """
    import cognee
    from cognee.low_level import setup

    # Reset state for clean runs
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    await setup()

    current_user = await get_default_user()
    engine = get_relational_engine()

    async with engine.get_async_session() as session:
        ontology_dataset = await create_dataset("ontology_dataset", current_user, session)

    # Single-task pipeline wrapping the custom ingestion step.
    pipeline_tasks = [
        Task(ontology_ingestion_task, format=format, task_config={"batch_size": 50}),
    ]

    async for run_status in run_tasks(
        pipeline_tasks,
        ontology_dataset.id,
        ontology_file,
        current_user,
        "ontology_ingestion_pipeline",
    ):
        yield run_status

Loading