124 changes: 124 additions & 0 deletions cognee/api/v1/cognify/ontology_graph_pipeline.py
@@ -0,0 +1,124 @@
import asyncio
from typing import List, Union, Optional
from rdflib import Graph
from uuid import uuid4
from datetime import datetime, timezone

from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user
from cognee.shared.data_models import KnowledgeGraph
from cognee.tasks.storage import add_data_points
from cognee.tasks.graph import extract_graph_from_data
from cognee.modules.data.methods import create_dataset
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.chunking.models.DocumentChunk import DocumentChunk
from cognee.modules.data.processing.document_types import Document
from io import IOBase

# ---------- Step 1: Load Ontology (from file object) ----------
async def load_ontology_data(ontology_file: Union[str, bytes, IOBase], format: Optional[str] = None) -> list[dict]:
"""
Loads OWL/RDF ontology directly from a file-like object and extracts RDF triples.

Args:
ontology_file: File-like object or path to RDF data
format: RDF serialization format (xml, turtle, n3, json-ld, etc.).
If None, rdflib will attempt auto-detection.
"""
    g = Graph()
    try:
        g.parse(ontology_file, format=format)
    except Exception as e:
        raise ValueError(f"Failed to parse ontology file: {str(e)}") from e

    triples = []
    for s, p, o in g:
        triple = {
            "subject": str(s),
            "predicate": str(p),
            "object": str(o),
            "object_type": type(o).__name__,  # 'URIRef', 'Literal', 'BNode'
        }
        if hasattr(o, "datatype") and o.datatype:
            triple["object_datatype"] = str(o.datatype)
        if hasattr(o, "language") and o.language:
            triple["object_language"] = o.language
        triples.append(triple)
    return triples
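
As an illustration of what this step returns, here is a minimal sketch (not part of the diff) that feeds an in-memory Turtle document through load_ontology_data. The sample ontology and the asyncio.run wrapper are assumptions for demonstration only; rdflib accepts file-like objects, so a BytesIO buffer stands in for an uploaded file.

import asyncio
import io

SAMPLE_TTL = """
@prefix ex: <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
ex:Dog rdfs:subClassOf ex:Animal .
ex:Dog rdfs:label "Dog"@en .
"""

async def demo():
    # A bytes buffer behaves like an uploaded ontology file.
    triples = await load_ontology_data(io.BytesIO(SAMPLE_TTL.encode("utf-8")), format="turtle")
    for t in triples:
        # Each triple is a plain dict with subject/predicate/object strings,
        # plus object_type and optional datatype/language for literals.
        print(t["subject"], t["predicate"], t["object"], t["object_type"])

asyncio.run(demo())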


# ---------- Step 2: Convert Triples into Chunks ----------
def convert_triples_to_chunks(triples: list[dict], format: str = "xml") -> list[DocumentChunk]:
"""
Convert ontology triples into Cognee-compatible DocumentChunk objects.
"""

    # Map RDF formats to MIME types
    mime_types = {
        "xml": "application/rdf+xml",
        "turtle": "text/turtle",
        "n3": "text/n3",
        "nt": "application/n-triples",
        "json-ld": "application/ld+json",
    }
    chunks = []

    # Minimal valid Document (from your class)
    ontology_doc = Document(
        id=uuid4(),
        name="in_memory_ontology.owl",
        raw_data_location="in_memory_source",
        external_metadata=None,
        mime_type=mime_types.get(format, "application/rdf+xml"),
    )

    for i, t in enumerate(triples):
        text = f"{t['subject']} {t['predicate']} {t['object']}"
        chunk = DocumentChunk(
            id=uuid4(),
            text=text,
Contributor:

Placing triples as plain text won't have the desired effect: the relations (edges) between the entities will be lost. We need to keep the structure using proper DataPoints, which means extracting the proper types. It's not an easy task. Please take a look at the example in cognee-starter (custom-model).

            chunk_size=len(text.split()),
            chunk_index=i,
            cut_type="triple",
            is_part_of=ontology_doc,
            metadata={"triple": t, "index_fields": ["text"]},
        )
        chunks.append(chunk)
    return chunks
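
For context on the reviewer's comment above about losing edges: a rough sketch of the custom-DataPoint approach from cognee-starter (custom-model) might look like the following. The class names, field layout, and the DataPoint import path are assumptions based on that example, not part of this PR.

from cognee.low_level import DataPoint  # import path assumed, per the cognee-starter custom-model example

class OntologyEntity(DataPoint):
    # One data point per RDF resource (subject or object IRI).
    name: str
    metadata: dict = {"index_fields": ["name"]}

class OntologyStatement(DataPoint):
    # DataPoint-typed fields become graph edges, so the subject->object
    # relation survives instead of being flattened into plain chunk text.
    predicate: str
    subject: OntologyEntity
    object: OntologyEntity
    metadata: dict = {"index_fields": ["predicate"]}

Data points built this way could be handed directly to add_data_points, bypassing the chunk/LLM-extraction step for the parts of the graph that are already structured.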


# ---------- Step 3: Run Ontology Pipeline ----------
async def run_ontology_pipeline(ontology_file, format: str, dataset_name: str = "ontology_dataset"):
"""
Run the ontology ingestion pipeline directly from a file object (no file path).
"""
from cognee.low_level import setup
import cognee

await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await setup()
@coderabbitai coderabbitai bot Oct 10, 2025

⚠️ Potential issue | 🔴 Critical

Remove destructive prune operations or make them optional.

The prune_data() and prune_system() calls delete all existing data and system metadata without warning. This is unexpected behavior for a data ingestion pipeline and could result in data loss if a user runs this on a populated instance.

Either:

  1. Remove these calls entirely and let users manage cleanup separately, or
  2. Make them opt-in via a parameter:
-async def run_ontology_pipeline(ontology_file, dataset_name: str = "ontology_dataset"):
+async def run_ontology_pipeline(
+    ontology_file, 
+    dataset_name: str = "ontology_dataset",
+    clear_existing: bool = False
+):
     """
     Run the ontology ingestion pipeline directly from a file object (no file path).
+    
+    Args:
+        ontology_file: File-like object containing RDF data
+        dataset_name: Name for the dataset
+        clear_existing: If True, prune existing data and system metadata before ingestion
     """
     from cognee.low_level import setup
     import cognee
 
-    await cognee.prune.prune_data()
-    await cognee.prune.prune_system(metadata=True)
+    if clear_existing:
+        await cognee.prune.prune_data()
+        await cognee.prune.prune_system(metadata=True)
     await setup()
🤖 Prompt for AI Agents
In cognee/api/v1/cognify/ontology_graph_pipeline.py around lines 69 to 74, the
code calls cognee.prune.prune_data() and
cognee.prune.prune_system(metadata=True) which perform destructive deletions;
remove these calls or make them opt‑in by adding a parameter (e.g.,
allow_prune=False) to the surrounding function or entry point, wrap the prune
calls in an if allow_prune: block, add a clear warning log before performing
pruning, and ensure the default behavior is non‑destructive (no pruning) so
users must explicitly enable pruning to run those operations.

Vattikuti-Manideep-Sitaram:

Followed the same pattern as the code_graph_pipeline code. Would love to get some input from the reviewer before moving ahead.

@coderabbitai coderabbitai bot

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the code_graph_pipeline file
fd -t f "code_graph_pipeline"

# Search for files containing code_graph_pipeline
rg -l "code_graph_pipeline"

Length of output: 210


🏁 Script executed:

#!/bin/bash
# Examine the code_graph_pipeline.py file, particularly looking for prune operations
cat cognee/api/v1/cognify/code_graph_pipeline.py

Length of output: 4106


@Vattikuti-Manideep-Sitaram, you're absolutely right! I can confirm that code_graph_pipeline.py has the exact same pattern:

await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await setup()

This appears to be an established pattern for pipeline operations in the codebase. My concern about the destructive nature of these operations applies equally to both pipelines.

Given that you've followed the existing convention, here are some thoughts for the reviewers/maintainers:

  1. If this is intentional behavior: Consider adding clear documentation in both pipeline functions warning users that running these pipelines will clear all existing data and system metadata. For example:
async def run_ontology_pipeline(ontology_file, dataset_name: str = "ontology_dataset"):
    """
    Run the ontology ingestion pipeline directly from a file object.
    
    ⚠️ WARNING: This pipeline clears all existing data and system metadata 
    before processing. Ensure you have backups if needed.
    
    Args:
        ontology_file: File-like object containing RDF data
        dataset_name: Name for the dataset
    """
  2. Alternative approach: If the prune operations are meant for testing/example purposes, consider extracting them to a setup utility that both pipelines can optionally call, or making it configurable across all pipelines consistently.

For the current PR: Since you're following the established pattern, this implementation is consistent with the codebase. However, I'd recommend waiting for maintainer input on whether they want to:

  • Keep the current pattern as-is with better documentation
  • Refactor both pipelines to make pruning optional
  • Keep it as-is without changes

What are your thoughts, reviewers?
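
To make the "setup utility" alternative above concrete, a shared helper along these lines could be imported by both pipelines. The function name and module path are hypothetical, not existing cognee API; defaults stay non-destructive.

# hypothetical shared helper, e.g. cognee/api/v1/cognify/pipeline_setup.py
import logging

logger = logging.getLogger(__name__)

async def prepare_pipeline_environment(clear_existing: bool = False):
    """Optionally prune data and system state, then run cognee setup."""
    from cognee.low_level import setup
    import cognee

    if clear_existing:
        logger.warning("clear_existing=True: pruning all existing data and system metadata")
        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)
    await setup()

Both code_graph_pipeline and ontology_graph_pipeline could then call await prepare_pipeline_environment(clear_existing=...) instead of pruning unconditionally.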


✏️ Learnings added
Learnt from: Vattikuti-Manideep-Sitaram
PR: topoteretes/cognee#1529
File: cognee/api/v1/cognify/ontology_graph_pipeline.py:69-74
Timestamp: 2025-10-11T04:18:24.585Z
Learning: The code_graph_pipeline.py and ontology_graph_pipeline.py both follow an established pattern of calling cognee.prune.prune_data() and cognee.prune.prune_system(metadata=True) at the start of pipeline execution. This appears to be intentional behavior for pipeline operations in the cognee codebase.



    user = await get_default_user()
    db_engine = get_relational_engine()

    async with db_engine.get_async_session() as session:
        dataset = await create_dataset(dataset_name, user, session)

    # ✅ Process ontology file directly
    triples = await load_ontology_data(ontology_file, format)
    chunks = convert_triples_to_chunks(triples, format)

    # Define pipeline tasks
    tasks = [
        Task(
            extract_graph_from_data,
            graph_model=KnowledgeGraph,
            task_config={"batch_size": 20},
        ),
        Task(add_data_points, task_config={"batch_size": 20}),
    ]

    # Run tasks with chunks
    async for run_status in run_tasks(tasks, dataset.id, chunks, user, "ontology_pipeline"):
        yield run_status
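
For reference, a minimal way to drive this pipeline from a file object might look like the sketch below. The file name and the asyncio.run wrapper are illustrative assumptions, not part of the PR; run_ontology_pipeline is an async generator, so it is consumed with async for.

import asyncio

async def main():
    # Any file-like object (or path) accepted by rdflib works here; the path is hypothetical.
    with open("my_ontology.owl", "rb") as ontology_file:
        async for status in run_ontology_pipeline(
            ontology_file, format="xml", dataset_name="my_ontology_dataset"
        ):
            print(status)

asyncio.run(main())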