diff --git a/packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts b/packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts
index 75ff32660..265fb7621 100644
--- a/packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts
+++ b/packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts
@@ -75,6 +75,14 @@ export type DeleteEmbeddedContentArgs = {
   inverseDataSources?: boolean;
 };
 
+export interface GetSourcesMatchParams {
+  sourceNames?: string[];
+  chunkAlgoHash: {
+    hashValue: string;
+    operation: "equals" | "notEquals";
+  };
+}
+
 /**
   Data store of the embedded content.
  */
@@ -114,4 +122,9 @@ export type EmbeddedContentStore = VectorStore<EmbeddedContent> & {
     Initialize the store.
    */
   init?: () => Promise<void>;
+
+  /**
+    Get the names of ingested data sources that match the given query.
+   */
+  getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]>;
 };
diff --git a/packages/mongodb-rag-core/src/contentStore/MongoDbEmbeddedContentStore.ts b/packages/mongodb-rag-core/src/contentStore/MongoDbEmbeddedContentStore.ts
index bcd3a1470..198c1ae5f 100644
--- a/packages/mongodb-rag-core/src/contentStore/MongoDbEmbeddedContentStore.ts
+++ b/packages/mongodb-rag-core/src/contentStore/MongoDbEmbeddedContentStore.ts
@@ -1,6 +1,10 @@
-import { pageIdentity } from ".";
+import { pageIdentity, PersistedPage } from ".";
 import { DatabaseConnection } from "../DatabaseConnection";
-import { EmbeddedContent, EmbeddedContentStore } from "./EmbeddedContent";
+import {
+  EmbeddedContent,
+  EmbeddedContentStore,
+  GetSourcesMatchParams,
+} from "./EmbeddedContent";
 import { FindNearestNeighborsOptions, WithScore } from "../VectorStore";
 import {
   MakeMongoDbDatabaseConnectionParams,
@@ -58,6 +62,21 @@ export type MongoDbEmbeddedContentStore = EmbeddedContentStore &
     init(): Promise<void>;
   };
 
+function makeMatchQuery({ sourceNames, chunkAlgoHash }: GetSourcesMatchParams) {
+  const operator = chunkAlgoHash.operation === "equals" ? "$eq" : "$ne";
+  return {
+    chunkAlgoHash: { [operator]: chunkAlgoHash.hashValue },
+    // run on specific source names if specified, run on all if not
+    ...(sourceNames
+      ? {
+          sourceName: {
+            $in: sourceNames,
+          },
+        }
+      : undefined),
+  };
+}
+
 export function makeMongoDbEmbeddedContentStore({
   connectionUri,
   databaseName,
@@ -232,5 +251,22 @@
         }
       }
     },
+
+    async getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]> {
+      const result = await embeddedContentCollection
+        .aggregate([
+          { $match: makeMatchQuery(matchQuery) },
+          {
+            $group: {
+              _id: null,
+              uniqueSources: { $addToSet: "$sourceName" },
+            },
+          },
+          { $project: { _id: 0, uniqueSources: 1 } },
+        ])
+        .toArray();
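A quick illustration of the new interface (annotation only, not part of the diff; the hash and source names are placeholder values, and the import assumes the package's top-level re-exports):

import { GetSourcesMatchParams } from "mongodb-rag-core";

// Match sources whose stored chunks were built by the current algorithm...
const current: GetSourcesMatchParams = {
  sourceNames: ["source1", "source2"], // optional; omit to match all sources
  chunkAlgoHash: { hashValue: "abc123", operation: "equals" },
};

// ...or sources whose chunks predate it and therefore need re-chunking.
const stale: GetSourcesMatchParams = {
  chunkAlgoHash: { hashValue: "abc123", operation: "notEquals" },
};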
+      const uniqueSources = result.length > 0 ? result[0].uniqueSources : [];
+      return uniqueSources;
+    },
   };
 }
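A minimal usage sketch of the getDataSources implementation above (annotation only, not part of the diff; the connection values are placeholders, and the import assumes the package's top-level re-exports):

import { makeMongoDbEmbeddedContentStore } from "mongodb-rag-core";

// Lists sources that have at least one chunk whose chunkAlgoHash differs
// from the current hash, i.e. sources that need re-chunking.
async function listStaleSources(
  currentChunkAlgoHash: string
): Promise<string[]> {
  const store = makeMongoDbEmbeddedContentStore({
    connectionUri: "mongodb://localhost:27017", // placeholder
    databaseName: "rag-db", // placeholder
    searchIndex: { embeddingName: "my-embedding" }, // placeholder
  });
  try {
    return await store.getDataSources({
      chunkAlgoHash: {
        hashValue: currentChunkAlgoHash,
        operation: "notEquals",
      },
      // omitting sourceNames makes makeMatchQuery skip the $in filter,
      // so every ingested source is considered
    });
  } finally {
    await store.close();
  }
}

Note that the $group/$addToSet stage dedupes source names server-side, so the method returns each matching source once.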
"test-all-command"; + embedStore = makeMongoDbEmbeddedContentStore({ + connectionUri: uri, + databaseName, + searchIndex: { embeddingName: "test-embedding" }, + }); + pageStore = makeMongoDbPageStore({ + connectionUri: uri, + databaseName, + }); + // create pages and verify that they have been created + await updatePages({ sources: mockDataSources, pageStore }); + pages = await pageStore.loadPages(); + assert(pages.length == 2); + // create embeddings for the pages and verify that they have been created + await updateEmbeddedContent({ + since: new Date(0), + embeddedContentStore: embedStore, + pageStore, + sourceNames: mockDataSourceNames, + embedder, + }); + page1Embedding = await embedStore.loadEmbeddedContent({ + page: pages[0], + }); + page2Embedding = await embedStore.loadEmbeddedContent({ + page: pages[1], + }); + assert(page1Embedding.length); + assert(page2Embedding.length); + }); + + afterEach(async () => { + await pageStore?.drop(); + await embedStore?.drop(); + }); + afterAll(async () => { + await pageStore?.close(); + await embedStore?.close(); + await mongoClient?.close(); + await mongod?.stop(); + }); + + it("should update embedded content only for pages that have been updated (copy change) after the 'since' date provided", async () => { + // Modify dates of pages and embedded content for testing + const sinceDate = new Date("2024-01-01"); + const beforeSinceDate = new Date("2023-01-01"); + const afterSinceDate = new Date("2025-01-01"); + // set pages[0] to be last updated before sinceDate (should not be modified) + await mongoClient + .db(databaseName) + .collection("pages") + .updateOne({ ...pages[0] }, { $set: { updated: beforeSinceDate } }); + await mongoClient + .db(databaseName) + .collection("embedded_content") + .updateOne( + { sourceName: mockDataSourceNames[0] }, + { $set: { updated: beforeSinceDate } } + ); + // set pages[1] to be last updated after sinceDate (should be re-chunked) + await mongoClient + .db(databaseName) + .collection("pages") + .updateOne({ ...pages[1] }, { $set: { updated: afterSinceDate } }); + await mongoClient + .db(databaseName) + .collection("embedded_content") + .updateOne( + { sourceName: mockDataSourceNames[1] }, + { $set: { updated: afterSinceDate } } + ); + const originalPage1Embedding = await embedStore.loadEmbeddedContent({ + page: pages[0], + }); + const originalPage2Embedding = await embedStore.loadEmbeddedContent({ + page: pages[1], + }); + await updateEmbeddedContent({ + since: sinceDate, + embeddedContentStore: embedStore, + pageStore, + sourceNames: mockDataSourceNames, + embedder, + }); + const updatedPage1Embedding = await embedStore.loadEmbeddedContent({ + page: pages[0], + }); + const updatedPage2Embedding = await embedStore.loadEmbeddedContent({ + page: pages[1], + }); + assert(updatedPage1Embedding.length); + assert(updatedPage2Embedding.length); + expect(updatedPage1Embedding[0].updated.getTime()).toBe( + originalPage1Embedding[0].updated.getTime() + ); + expect(updatedPage2Embedding[0].updated.getTime()).not.toBe( + originalPage2Embedding[0].updated.getTime() + ); + }); + it("should update embedded content when only chunk algo has changed", async () => { + // change the chunking algo for the second page, but not the first + await updateEmbeddedContent({ + since: new Date(), + embeddedContentStore: embedStore, + pageStore, + sourceNames: [mockDataSourceNames[0]], + embedder, + }); + await updateEmbeddedContent({ + since: new Date(), + embeddedContentStore: embedStore, + pageStore, + sourceNames: [mockDataSourceNames[1]], 
+      embedder,
+      chunkOptions: { chunkOverlap: 2 },
+    });
+    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[1],
+    });
+    assert(updatedPage1Embedding.length);
+    assert(updatedPage2Embedding.length);
+    expect(updatedPage1Embedding[0].chunkAlgoHash).toBe(
+      page1Embedding[0].chunkAlgoHash
+    );
+    expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
+      page2Embedding[0].chunkAlgoHash
+    );
+  });
+  it("should update embedded content when either chunk algo has changed or copy has changed", async () => {
+    // SETUP: Modify dates of pages and embedded content for this test case
+    const sinceDate = new Date("2024-01-01");
+    const afterSinceDate = new Date("2025-01-01");
+    await mongoClient
+      .db(databaseName)
+      .collection("pages")
+      .updateOne({ ...pages[0] }, { $set: { updated: afterSinceDate } });
+    await mongoClient
+      .db(databaseName)
+      .collection("embedded_content")
+      .updateOne(
+        { sourceName: mockDataSourceNames[0] },
+        { $set: { updated: afterSinceDate } }
+      );
+    const originalPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    // END SETUP
+    await updateEmbeddedContent({
+      since: sinceDate,
+      embeddedContentStore: embedStore,
+      pageStore,
+      sourceNames: mockDataSourceNames,
+      embedder,
+      chunkOptions: { chunkOverlap: 2 },
+    });
+    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[0],
+    });
+    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
+      page: pages[1],
+    });
+    assert(updatedPage1Embedding.length);
+    assert(updatedPage2Embedding.length);
+    // both pages should be updated
+    expect(updatedPage1Embedding[0].chunkAlgoHash).not.toBe(
+      originalPage1Embedding[0].chunkAlgoHash
+    );
+    expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
+      page2Embedding[0].chunkAlgoHash
+    );
+  });
+});
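The tests above force a new chunk-algo hash by changing chunkOptions (chunkOverlap: 2). The file below contains the real hashing code; as a rough standalone illustration of the scheme (annotation only; the helper name is hypothetical, using Node's built-in crypto):

import { createHash } from "crypto";

// The hash input is the serialized chunk options plus the chunk function's
// own source text, so changing either one produces a different chunkAlgoHash.
const hashFor = (f: Function, o?: object): string =>
  createHash("sha256")
    .update(JSON.stringify(o ?? {}) + f.toString())
    .digest("hex");

const chunk = (text: string) => text.split("\n\n");
console.log(hashFor(chunk) !== hashFor(chunk, { chunkOverlap: 2 })); // true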
diff --git a/packages/mongodb-rag-core/src/contentStore/updateEmbeddedContent.ts b/packages/mongodb-rag-core/src/contentStore/updateEmbeddedContent.ts
index 801ddf9a4..961d6f1ab 100644
--- a/packages/mongodb-rag-core/src/contentStore/updateEmbeddedContent.ts
+++ b/packages/mongodb-rag-core/src/contentStore/updateEmbeddedContent.ts
@@ -18,9 +18,42 @@ export interface EmbedConcurrencyOptions {
   createChunks?: number;
 }
 
+const getHashForFunc = (
+  f: ChunkFunc,
+  o?: Partial<ChunkOptions>
+): string => {
+  const data = JSON.stringify(o ?? {}) + f.toString();
+  const hash = createHash("sha256");
+  hash.update(data);
+  const digest = hash.digest("hex");
+  return digest;
+};
+
 /**
-  (Re-)embeddedContent the pages in the page store that have changed since the given date
-  and stores the embeddedContent in the embeddedContent store.
+  Updates embedded content for pages that have changed since a given date or have different chunking hashes.
+
+  @param options - The configuration options for updating embedded content
+  @param options.since - The date from which to check for content changes
+  @param options.embeddedContentStore - The store containing embedded content
+  @param options.pageStore - The store containing pages
+  @param options.sourceNames - Optional array of source names to filter pages by
+  @param options.embedder - The embedder instance used to generate embeddings
+  @param options.chunkOptions - Optional configuration for the chunking algorithm
+  @param options.concurrencyOptions - Optional configuration for controlling concurrency during embedding
+
+  @remarks
+  The function performs the following steps:
+  1. Finds pages with content changes since the specified date
+  2. Identifies pages with different chunking hashes from the current algorithm
+  3. Processes both sets of pages by either:
+     - Deleting embedded content for deleted pages
+     - Updating embedded content for created/updated pages
+
+  Processing is done with configurable concurrency to manage system resources.
+
+  @throws Will throw an error if there are issues accessing stores or processing pages
+
+  @returns Promise that resolves when all updates are complete
  */
 export const updateEmbeddedContent = async ({
   since,
@@ -39,15 +72,45 @@
   sourceNames?: string[];
   concurrencyOptions?: EmbedConcurrencyOptions;
 }): Promise<void> => {
-  const changedPages = await pageStore.loadPages({
+  // find all pages with changed content
+  const pagesWithChangedContent = await pageStore.loadPages({
     updated: since,
     sources: sourceNames,
   });
   logger.info(
-    `Found ${changedPages.length} changed pages since ${since}${
+    `Found ${pagesWithChangedContent.length} changed pages since ${since}${
       sourceNames ? ` in sources: ${sourceNames.join(", ")}` : ""
     }`
   );
+  // find all pages with embeddings created using chunkingHashes different from the current chunkingHash
+  const chunkAlgoHash = getHashForFunc(
+    chunkPage,
+    chunkOptions
+  );
+  const dataSourcesWithChangedChunking =
+    await embeddedContentStore.getDataSources({
+      chunkAlgoHash: {
+        hashValue: chunkAlgoHash,
+        operation: "notEquals",
+      },
+      sourceNames,
+    });
+  // find all pages with changed chunking, ignoring since date because
+  // we want to re-chunk all pages with the new chunkAlgoHash, even if there were no other changes to the page
+  const pagesWithChangedChunking = await pageStore.loadPages({
+    sources: dataSourcesWithChangedChunking,
+  });
+  logger.info(
+    `Found ${
+      pagesWithChangedChunking.length
+    } pages with changed chunkingHashes ${
+      sourceNames ? ` in sources: ${sourceNames.join(", ")}` : ""
+    }`
+  );
+  const changedPages = [
+    ...pagesWithChangedContent,
+    ...pagesWithChangedChunking,
+  ];
   await PromisePool.withConcurrency(concurrencyOptions?.processPages ?? 1)
     .for(changedPages)
     .process(async (page) => {
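One consequence of building changedPages by concatenation in the hunk above: a page whose copy changed and whose source also has a stale chunk hash appears in both arrays and is processed twice. If that ever matters, a dedupe keyed on page identity could be applied first; a sketch (annotation only, not part of the diff; assumes sourceName plus url identify a page):

const dedupedChangedPages = [
  ...new Map(
    [...pagesWithChangedContent, ...pagesWithChangedChunking].map(
      (page) => [`${page.sourceName}:${page.url}`, page] as const
    )
  ).values(),
];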
@@ -68,38 +131,26 @@
         chunkOptions,
         embedder,
         concurrencyOptions,
+        chunkAlgoHash,
       });
     }
   });
 };
 
-const chunkAlgoHashes = new Map<string, string>();
-
-const getHashForFunc = (f: ChunkFunc, o?: Partial<ChunkOptions>): string => {
-  const data = JSON.stringify(o ?? {}) + f.toString();
-  const existingHash = chunkAlgoHashes.get(data);
-  if (existingHash) {
-    return existingHash;
-  }
-  const hash = createHash("sha256");
-  hash.update(data);
-  const digest = hash.digest("hex");
-  chunkAlgoHashes.set(data, digest);
-  return digest;
-};
-
 export const updateEmbeddedContentForPage = async ({
   page,
   store,
   embedder,
   chunkOptions,
   concurrencyOptions,
+  chunkAlgoHash,
 }: {
   page: PersistedPage;
   store: EmbeddedContentStore;
   embedder: Embedder;
   chunkOptions?: Partial<ChunkOptions>;
   concurrencyOptions?: EmbedConcurrencyOptions;
+  chunkAlgoHash: string;
 }): Promise<void> => {
   const contentChunks = await chunkPage(page, chunkOptions);
@@ -122,7 +173,6 @@
   const existingContent = await store.loadEmbeddedContent({
     page,
   });
-  const chunkAlgoHash = getHashForFunc(chunkPage, chunkOptions);
   if (
     existingContent.length &&
     existingContent[0].updated > page.updated &&
diff --git a/packages/mongodb-rag-core/src/findContent/BoostOnAtlasSearchFilter.test.ts b/packages/mongodb-rag-core/src/findContent/BoostOnAtlasSearchFilter.test.ts
index 96e5f520f..24454bc6a 100644
--- a/packages/mongodb-rag-core/src/findContent/BoostOnAtlasSearchFilter.test.ts
+++ b/packages/mongodb-rag-core/src/findContent/BoostOnAtlasSearchFilter.test.ts
@@ -69,6 +69,9 @@ describe("makeBoostOnAtlasSearchFilter()", () => {
       async findNearestNeighbors() {
         return mockBoostedResults;
       },
+      async getDataSources(matchQuery) {
+        return [];
+      },
       metadata: {
         embeddingName,
       },
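Taken together, a typical re-ingest run with the new behavior looks like this (sketch only; store, page store, and embedder construction are elided, and the names mirror the tests above):

// Even with no copy changes since the given date, any source whose stored
// chunkAlgoHash no longer matches the hash of (chunkPage + chunkOptions)
// is re-chunked and re-embedded on the next run.
await updateEmbeddedContent({
  since: new Date(),
  embeddedContentStore: embedStore,
  pageStore,
  embedder,
  chunkOptions: { chunkOverlap: 2 }, // changed config => new chunkAlgoHash
});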