diff --git a/.semversioner/next-release/minor-20250721095433798586.json b/.semversioner/next-release/minor-20250721095433798586.json
new file mode 100644
index 0000000000..ddc2398bca
--- /dev/null
+++ b/.semversioner/next-release/minor-20250721095433798586.json
@@ -0,0 +1,4 @@
+{
+    "type": "minor",
+    "description": "Add ElasticSearch vector store support with full compatibility with existing vector store interface"
+}
diff --git a/graphrag/vector_stores/elasticsearch.py b/graphrag/vector_stores/elasticsearch.py
new file mode 100644
index 0000000000..060717ed44
--- /dev/null
+++ b/graphrag/vector_stores/elasticsearch.py
@@ -0,0 +1,249 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""The ElasticSearch vector storage implementation package."""
+
+import json
+from typing import Any
+
+import numpy as np
+from elasticsearch import Elasticsearch, NotFoundError
+from elasticsearch.helpers import bulk
+
+from graphrag.data_model.types import TextEmbedder
+from graphrag.vector_stores.base import (
+    BaseVectorStore,
+    VectorStoreDocument,
+    VectorStoreSearchResult,
+)
+
+# Dimension used when an index has to be created before any vector is seen.
+# Matches OpenAI text-embedding-3-small.
+# NOTE(review): indexed dense_vector fields were capped at 1024 dims before
+# ElasticSearch 8.8 -- confirm the minimum supported server version.
+_DEFAULT_VECTOR_DIM = 1536
+
+# Upper bound applied to the kNN num_candidates parameter.
+_MAX_NUM_CANDIDATES = 10000
+
+
+def _create_index_settings(vector_dim: int) -> dict:
+    """Create ElasticSearch index settings with dynamic vector dimensions.
+
+    Parameters
+    ----------
+    vector_dim : int
+        Number of dimensions of the vectors stored in the ``vector`` field.
+
+    Returns
+    -------
+    dict
+        Index creation body with ``settings`` and ``mappings``.
+    """
+    return {
+        "settings": {
+            "number_of_shards": 1,  # Single shard for development/local setup
+            "number_of_replicas": 0,  # No replicas for development/local setup
+        },
+        "mappings": {
+            "properties": {
+                "id": {"type": "keyword"},
+                "text": {"type": "text"},
+                "vector": {
+                    "type": "dense_vector",
+                    "dims": vector_dim,
+                    "index": True,
+                    "similarity": "cosine",
+                },
+                # Attributes are stored as a JSON-encoded string.
+                "attributes": {"type": "text"},
+            }
+        },
+    }
+
+
+class ElasticSearchVectorStore(BaseVectorStore):
+    """ElasticSearch vector storage implementation."""
+
+    def __init__(self, **kwargs: Any) -> None:
+        super().__init__(**kwargs)
+
+    def connect(self, **kwargs: Any) -> None:
+        """Connect to the vector storage.
+
+        ``url`` selects the cluster endpoint (default ``http://localhost:9200``)
+        and ``api_key``, when provided, is forwarded to the client. When a
+        collection name is set the cluster is probed once, so it fails here.
+        """
+        client_args: dict[str, Any] = {
+            "hosts": [kwargs.get("url", "http://localhost:9200")]
+        }
+        api_key = kwargs.get("api_key")
+        if api_key:
+            client_args["api_key"] = api_key
+        self.db_connection = Elasticsearch(**client_args)
+
+        # The client connects lazily, so issue one cheap request now: an
+        # unreachable cluster then fails here instead of on the first load or
+        # search. The answer itself is not needed.
+        if self.collection_name:
+            self.db_connection.indices.exists(index=self.collection_name)
+
+    def load_documents(
+        self, documents: list[VectorStoreDocument], overwrite: bool = True
+    ) -> None:
+        """Load documents into vector storage.
+
+        Documents without a vector are skipped. A missing index is created with
+        the dimension of the first vector, or ``_DEFAULT_VECTOR_DIM`` when there
+        is nothing to index. With ``overwrite`` an existing index is dropped and
+        recreated first; otherwise documents are added to it.
+        """
+        if self.db_connection is None:
+            msg = "Must connect to ElasticSearch before loading documents"
+            raise RuntimeError(msg)
+
+        rows = [
+            {
+                "id": document.id,
+                "text": document.text,
+                "vector": document.vector,
+                "attributes": json.dumps(document.attributes),
+            }
+            for document in documents
+            if document.vector is not None  # Skip documents without embeddings
+        ]
+
+        indices = self.db_connection.indices
+        index_exists = bool(indices.exists(index=self.collection_name))
+        if overwrite and index_exists:
+            indices.delete(index=self.collection_name)
+            index_exists = False
+
+        if not index_exists:
+            vector_dim = len(rows[0]["vector"]) if rows else _DEFAULT_VECTOR_DIM
+            indices.create(
+                index=self.collection_name,
+                body=_create_index_settings(vector_dim),
+            )
+
+        if not rows:
+            return
+
+        actions = (
+            {
+                "_index": self.collection_name,
+                "_id": str(row["id"]),
+                "_source": row,
+            }
+            for row in rows
+        )
+        bulk(self.db_connection, actions)
+        # ElasticSearch is near real-time: refresh so the new documents are
+        # searchable immediately.
+        indices.refresh(index=self.collection_name)
+
+    def filter_by_id(self, include_ids: list[str] | list[int]) -> Any:
+        """Build a query filter to filter documents by id.
+
+        The filter is stored on the store and applied by later similarity
+        searches; an empty list clears it.
+        """
+        if not include_ids:
+            self.query_filter = None
+        else:
+            self.query_filter = {
+                "terms": {"id": [str(doc_id) for doc_id in include_ids]}
+            }
+        return self.query_filter
+
+    def similarity_search_by_vector(
+        self, query_embedding: list[float], k: int = 10, **kwargs: Any
+    ) -> list[VectorStoreSearchResult]:
+        """Perform a vector-based similarity search.
+
+        Returns at most ``k`` results, restricted to the ids selected through
+        ``filter_by_id`` when a filter is active.
+        """
+        if self.db_connection is None:
+            msg = "Must connect to ElasticSearch before searching"
+            raise RuntimeError(msg)
+
+        if isinstance(query_embedding, np.ndarray):
+            # The request is sent as JSON, which cannot carry a NumPy array.
+            query_embedding = query_embedding.tolist()
+
+        knn: dict[str, Any] = {
+            "field": "vector",
+            "query_vector": query_embedding,
+            "k": k,
+            # Search more candidates for better recall in approximate KNN
+            # 10x multiplier balances accuracy vs performance
+            "num_candidates": min(k * 10, _MAX_NUM_CANDIDATES),
+        }
+        if self.query_filter:
+            # The filter has to live inside the knn clause. A top-level "query"
+            # is combined with the kNN hits as a disjunction, so it would add
+            # the listed documents to the results instead of restricting them.
+            knn["filter"] = self.query_filter
+
+        response = self.db_connection.search(
+            index=self.collection_name,
+            body={
+                "knn": knn,
+                # Without an explicit size at most 10 hits come back, whatever k is.
+                "size": k,
+                "_source": ["id", "text", "vector", "attributes"],
+            },
+        )
+
+        return [
+            VectorStoreSearchResult(
+                document=VectorStoreDocument(
+                    id=hit["_source"]["id"],
+                    text=hit["_source"]["text"],
+                    vector=hit["_source"]["vector"],
+                    attributes=json.loads(hit["_source"]["attributes"]),
+                ),
+                score=hit["_score"],
+            )
+            for hit in response["hits"]["hits"]
+        ]
+
+    def similarity_search_by_text(
+        self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any
+    ) -> list[VectorStoreSearchResult]:
+        """Perform a similarity search using a given input text.
+
+        Returns an empty list when the embedder yields no embedding.
+        """
+        query_embedding = text_embedder(text)
+        if query_embedding is None:
+            return []
+        return self.similarity_search_by_vector(query_embedding, k, **kwargs)
+
+    def search_by_id(self, id: str) -> VectorStoreDocument:
+        """Search for a document by id.
+
+        Returns a document whose text and vector are None when nothing is found.
+        """
+        if self.db_connection is None:
+            msg = "Must connect to ElasticSearch before searching"
+            raise RuntimeError(msg)
+
+        try:
+            response = self.db_connection.get(
+                index=self.collection_name,
+                id=str(id),
+            )
+        except NotFoundError:
+            # Return null object for consistency
+            return VectorStoreDocument(id=id, text=None, vector=None)
+
+        source = response["_source"]
+        return VectorStoreDocument(
+            id=source["id"],
+            text=source["text"],
+            vector=source["vector"],
+            attributes=json.loads(source["attributes"]),
+        )
diff --git a/graphrag/vector_stores/factory.py b/graphrag/vector_stores/factory.py
index d1dd3e42e3..ede71b9157 100644
--- a/graphrag/vector_stores/factory.py
+++ b/graphrag/vector_stores/factory.py
@@ -9,6 +9,7 @@
 from graphrag.vector_stores.azure_ai_search import AzureAISearchVectorStore
 from graphrag.vector_stores.base import BaseVectorStore
 from graphrag.vector_stores.cosmosdb import CosmosDBVectorStore
+from graphrag.vector_stores.elasticsearch import ElasticSearchVectorStore
 from graphrag.vector_stores.lancedb import LanceDBVectorStore
 
 
@@ -18,6 +19,7 @@ class VectorStoreType(str, Enum):
     LanceDB = "lancedb"
     AzureAISearch = "azure_ai_search"
     CosmosDB = "cosmosdb"
+    ElasticSearch = "elasticsearch"
 
 
 class VectorStoreFactory:
@@ -45,6 +47,8 @@ def create_vector_store(
                 return AzureAISearchVectorStore(**kwargs)
             case VectorStoreType.CosmosDB:
                 return CosmosDBVectorStore(**kwargs)
+            case VectorStoreType.ElasticSearch:
+                return ElasticSearchVectorStore(**kwargs)
             case _:
                 if vector_store_type in cls.vector_store_types:
                     return cls.vector_store_types[vector_store_type](**kwargs)
diff --git a/pyproject.toml b/pyproject.toml
index 972cfd0141..746a9d4975 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -50,6 +50,7 @@ environs = "^11.0.0"
 
 # Vector Stores
 azure-search-documents = "^11.5.2"
+elasticsearch = "^8.0.0"
 lancedb = "^0.17.0"
 
 # Async IO
diff --git a/tests/integration/vector_stores/test_elasticsearch.py b/tests/integration/vector_stores/test_elasticsearch.py
new file mode 100644
index 0000000000..c2579f0d8f
--- /dev/null
+++ b/tests/integration/vector_stores/test_elasticsearch.py
@@ -0,0 +1,326 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""Integration tests for ElasticSearch vector store implementation."""
+
+import contextlib
+from collections.abc import Callable, Iterator
+
+import numpy as np
+import pytest
+
+from graphrag.vector_stores.base import VectorStoreDocument
+from graphrag.vector_stores.elasticsearch import ElasticSearchVectorStore
+
+ES_URL = "http://localhost:9200"
+
+
+@pytest.fixture
+def connected_store() -> Iterator[Callable[[str], ElasticSearchVectorStore]]:
+    """Provide a factory for stores connected to the local test cluster.
+
+    The requesting test is skipped when the cluster cannot be reached, and the
+    indices of all stores handed out are deleted once the test has finished.
+    """
+    stores: list[ElasticSearchVectorStore] = []
+
+    def _connect(collection_name: str) -> ElasticSearchVectorStore:
+        store = ElasticSearchVectorStore(collection_name=collection_name)
+        try:
+            store.connect(url=ES_URL)
+        except Exception:  # noqa: BLE001
+            pytest.skip("ElasticSearch not available for testing")
+        stores.append(store)
+        return store
+
+    yield _connect
+
+    for store in stores:
+        # Best-effort cleanup: a failed delete must not fail the test itself.
+        with contextlib.suppress(Exception):
+            store.db_connection.indices.delete(
+                index=store.collection_name, ignore_unavailable=True
+            )
+
+
+def test_vector_store_operations(connected_store):
+    """Test basic vector store operations with ElasticSearch."""
+    vector_store = connected_store("test_collection")
+
+    # Test documents
+    docs = [
+        VectorStoreDocument(
+            id="1",
+            text="This is document 1",
+            vector=[0.1, 0.2, 0.3, 0.4, 0.5],
+            attributes={"title": "Doc 1", "category": "test"},
+        ),
+        VectorStoreDocument(
+            id="2",
+            text="This is document 2",
+            vector=[0.2, 0.3, 0.4, 0.5, 0.6],
+            attributes={"title": "Doc 2", "category": "test"},
+        ),
+        VectorStoreDocument(
+            id="3",
+            text="This is document 3",
+            vector=[0.3, 0.4, 0.5, 0.6, 0.7],
+            attributes={"title": "Doc 3", "category": "test"},
+        ),
+    ]
+
+    # Load documents (first 2)
+    vector_store.load_documents(docs[:2])
+
+    # Test search by ID
+    doc = vector_store.search_by_id("1")
+    assert doc.id == "1"
+    assert doc.text == "This is document 1"
+    assert doc.attributes["title"] == "Doc 1"
+
+    # Test vector similarity search
+    query_vector = [0.15, 0.25, 0.35, 0.45, 0.55]
+    results = vector_store.similarity_search_by_vector(query_vector, k=2)
+
+    assert len(results) == 2
+    assert all(result.score >= 0 for result in results)
+
+    # Verify results are ordered by score (descending)
+    scores = [result.score for result in results]
+    assert scores == sorted(scores, reverse=True)
+
+    # Test filter by ID
+    filter_query = vector_store.filter_by_id(["1"])
+    assert "terms" in filter_query
+    assert filter_query["terms"]["id"] == ["1"]
+
+    # The filter must restrict the search results, not merely be built
+    filtered = vector_store.similarity_search_by_vector(query_vector, k=2)
+    assert [result.document.id for result in filtered] == ["1"]
+    vector_store.filter_by_id([])
+
+    # Test loading additional documents (without overwrite)
+    vector_store.load_documents([docs[2]], overwrite=False)
+    assert vector_store.search_by_id("1").text == "This is document 1"
+    assert vector_store.search_by_id("3").text == "This is document 3"
+
+    # Test overwrite functionality
+    vector_store.load_documents(docs, overwrite=True)
+    results = vector_store.similarity_search_by_vector(query_vector, k=3)
+    assert {result.document.id for result in results} == {"1", "2", "3"}
+
+
+@pytest.mark.parametrize(
+    ("name", "vectors"),
+    [
+        ("small_vectors", [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]),
+        ("bert_vectors", [[0.1] * 768, [0.2] * 768]),
+        ("openai_vectors", [[0.1] * 1536, [0.2] * 1536]),
+    ],
+)
+def test_elasticsearch_dynamic_vector_dimensions(connected_store, name, vectors):
+    """Test ElasticSearch with different vector dimensions (like LanceDB)."""
+    dimension = len(vectors[0])
+    vector_store = connected_store(f"test_{name}")
+
+    # Test documents with specific dimensions
+    docs = [
+        VectorStoreDocument(
+            id=f"{i}",
+            text=f"Document {i}",
+            vector=vector,
+            attributes={"dimension": dimension},
+        )
+        for i, vector in enumerate(vectors)
+    ]
+
+    # Should automatically detect vector dimension like LanceDB
+    vector_store.load_documents(docs)
+
+    # Test search with same dimension
+    results = vector_store.similarity_search_by_vector([0.15] * dimension, k=1)
+
+    assert len(results) == 1
+    assert len(results[0].document.vector) == dimension
+
+
+def test_elasticsearch_with_authentication():
+    """Test ElasticSearch with authentication parameters."""
+    vector_store = ElasticSearchVectorStore(collection_name="test_auth")
+
+    # Should not raise errors during initialization
+    assert vector_store.collection_name == "test_auth"
+
+    # The client is built before the cluster is probed, so credentials are
+    # accepted even when no cluster is running
+    with contextlib.suppress(Exception):
+        vector_store.connect(url=ES_URL, api_key="test-api-key")
+    assert vector_store.db_connection is not None
+
+
+def test_elasticsearch_connection_kwargs():
+    """Test ElasticSearch with connection kwargs."""
+    vector_store = ElasticSearchVectorStore(collection_name="test_kwargs")
+
+    # Test that connection parameters can be passed to connect()
+    with contextlib.suppress(Exception):
+        vector_store.connect(url=ES_URL)
+
+
+def test_elasticsearch_error_handling():
+    """Test error handling for ElasticSearch operations."""
+    vector_store = ElasticSearchVectorStore(collection_name="test_errors")
+
+    # Test operations without connection should raise RuntimeError
+    with pytest.raises(RuntimeError, match="Must connect to ElasticSearch"):
+        vector_store.load_documents([])
+
+    with pytest.raises(RuntimeError, match="Must connect to ElasticSearch"):
+        vector_store.similarity_search_by_vector([0.1, 0.2, 0.3])
+
+    with pytest.raises(RuntimeError, match="Must connect to ElasticSearch"):
+        vector_store.search_by_id("test")
+
+
+def test_elasticsearch_empty_documents(connected_store):
+    """Test handling of empty document lists."""
+    vector_store = connected_store("test_empty")
+
+    # Loading empty list should create index with default dimension (1536)
+    vector_store.load_documents([])
+
+    index_name = vector_store.collection_name
+    assert vector_store.db_connection.indices.exists(index=index_name)
+
+
+def test_elasticsearch_documents_without_vectors(connected_store):
+    """Test handling of documents without vectors."""
+    vector_store = connected_store("test_no_vectors")
+
+    # Documents without vectors should be filtered out
+    docs = [
+        VectorStoreDocument(
+            id="1",
+            text="Document without vector",
+            vector=None,
+            attributes={},
+        ),
+        VectorStoreDocument(
+            id="2",
+            text="Document with vector",
+            vector=[0.1, 0.2, 0.3],
+            attributes={},
+        ),
+    ]
+
+    vector_store.load_documents(docs)
+
+    assert vector_store.search_by_id("1").text is None
+    assert vector_store.search_by_id("2").text == "Document with vector"
+
+
+def test_elasticsearch_text_embedder_search(connected_store):
+    """Test text-based similarity search with embedder function."""
+    vector_store = connected_store("test_text_search")
+
+    # Mock text embedder function
+    def mock_embedder(text: str) -> list[float]:
+        """Mock embedder that returns simple vector based on text length."""
+        text_length = len(text)
+        return [float(text_length % 10) / 10] * 5
+
+    # Test empty embedder response
+    def empty_embedder(text: str) -> None:
+        """Mock embedder that returns None."""
+        return
+
+    # Load test documents
+    docs = [
+        VectorStoreDocument(
+            id="1",
+            text="Short text",
+            vector=[0.1, 0.1, 0.1, 0.1, 0.1],
+            attributes={"type": "short"},
+        ),
+    ]
+
+    vector_store.load_documents(docs)
+
+    # Test with empty embedder (should return empty list like LanceDB)
+    results = vector_store.similarity_search_by_text("test query", empty_embedder, k=2)
+    assert len(results) == 0
+
+    # Test text similarity search
+    results = vector_store.similarity_search_by_text(
+        "Medium length query text", mock_embedder, k=2
+    )
+
+    assert len(results) <= 2
+    assert all(result.score >= 0 for result in results)
+
+
+def test_elasticsearch_numpy_vector_handling(connected_store):
+    """Test handling of numpy arrays as vectors."""
+    vector_store = connected_store("test_numpy")
+
+    # Mock text embedder that returns numpy array
+    def numpy_embedder(text: str) -> np.ndarray:
+        """Mock embedder that returns numpy array."""
+        return np.array([0.1, 0.2, 0.3, 0.4, 0.5])
+
+    # Load test document
+    docs = [
+        VectorStoreDocument(
+            id="1",
+            text="Test document",
+            vector=[0.1, 0.2, 0.3, 0.4, 0.5],
+            attributes={},
+        ),
+    ]
+
+    vector_store.load_documents(docs)
+
+    # Test with numpy array - should handle conversion automatically
+    results = vector_store.similarity_search_by_text("test query", numpy_embedder, k=1)
+
+    assert len(results) == 1
+
+
+def test_elasticsearch_search_returns_k_results(connected_store):
+    """Test that more than ten results are returned when k asks for them."""
+    vector_store = connected_store("test_k_results")
+
+    docs = [
+        VectorStoreDocument(
+            id=str(i),
+            text=f"Document {i}",
+            vector=[0.1 * (i + 1), 0.2, 0.3],
+            attributes={},
+        )
+        for i in range(15)
+    ]
+    vector_store.load_documents(docs)
+
+    results = vector_store.similarity_search_by_vector([0.1, 0.2, 0.3], k=15)
+    assert len(results) == 15
+
+
+def test_elasticsearch_search_by_id_not_found(connected_store):
+    """Test search_by_id when document is not found."""
+    vector_store = connected_store("test_not_found")
+
+    # Search for non-existent document should return empty document like LanceDB
+    doc = vector_store.search_by_id("nonexistent")
+    assert doc.id == "nonexistent"
+    assert doc.text is None
+    assert doc.vector is None
+
+
+def test_elasticsearch_filter_by_id_empty():
+    """Test filter_by_id with empty list."""
+    vector_store = ElasticSearchVectorStore(collection_name="test_filter_empty")
+
+    # Empty list should set query_filter to None
+    result = vector_store.filter_by_id([])
+    assert result is None
+    assert vector_store.query_filter is None