(EAI-152) check for change to chunkAlgoHash when updating embeddings #580

Open · wants to merge 21 commits into main
Showing changes from 16 of 21 commits.
15 changes: 14 additions & 1 deletion packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts
@@ -1,4 +1,4 @@
-import { Page } from ".";
+import { Page, PersistedPage } from ".";
import { VectorStore } from "../VectorStore";

/**
@@ -75,6 +75,14 @@ export type DeleteEmbeddedContentArgs = {
inverseDataSources?: boolean;
};

export interface GetSourcesMatchParams {
sourceNames?: string[];
chunkAlgoHash: {
hashValue: string;
operation: "equals" | "notEquals";
};
}

/**
Data store of the embedded content.
*/
@@ -114,4 +122,9 @@ export type EmbeddedContentStore = VectorStore<EmbeddedContent> & {
Initialize the store.
*/
init?: () => Promise<void>;

/**
Get the data sources that match the given query.
*/
getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]>;
};
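
Not part of the diff: a minimal sketch of how a consumer might call the new getDataSources contract to find data sources whose stored chunkAlgoHash no longer matches the current chunking algorithm. The findStaleSources helper and the currentChunkAlgoHash argument are hypothetical, not taken from this PR.

// Hypothetical usage sketch (not in this PR's diff).
import { EmbeddedContentStore } from "./EmbeddedContent";

// Sources chunked under a different algorithm need re-chunking even
// when their underlying pages are unchanged.
async function findStaleSources(
  store: EmbeddedContentStore,
  currentChunkAlgoHash: string // assumed to be computed by the caller
): Promise<string[]> {
  return store.getDataSources({
    chunkAlgoHash: {
      hashValue: currentChunkAlgoHash,
      operation: "notEquals",
    },
  });
}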
@@ -1,6 +1,10 @@
-import { pageIdentity } from ".";
+import { pageIdentity, PersistedPage } from ".";
import { DatabaseConnection } from "../DatabaseConnection";
-import { EmbeddedContent, EmbeddedContentStore } from "./EmbeddedContent";
+import {
+  EmbeddedContent,
+  EmbeddedContentStore,
+  GetSourcesMatchParams,
+} from "./EmbeddedContent";
import { FindNearestNeighborsOptions, WithScore } from "../VectorStore";
import {
MakeMongoDbDatabaseConnectionParams,
@@ -58,6 +62,27 @@ export type MongoDbEmbeddedContentStore = EmbeddedContentStore &
init(): Promise<void>;
};

function makeMatchQuery({ sourceNames, chunkAlgoHash }: GetSourcesMatchParams) {
let operator = "";
if (chunkAlgoHash.operation === "equals") {
operator = "$eq";
}
if (chunkAlgoHash.operation === "notEquals") {
operator = "$ne";
}
return {
chunkAlgoHash: { [operator]: chunkAlgoHash.hashValue },
// run on specific source names if specified, run on all if not
...(sourceNames
? {
sourceName: {
$in: sourceNames,
},
}
: undefined),
};
}
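
For reference, an example (not in the diff) of the filter this helper produces; the hash value and source names below are made up:

// makeMatchQuery({
//   sourceNames: ["source1", "source2"],
//   chunkAlgoHash: { hashValue: "abc123", operation: "notEquals" },
// })
// returns the MongoDB filter:
// {
//   chunkAlgoHash: { $ne: "abc123" },
//   sourceName: { $in: ["source1", "source2"] },
// }
// Omitting sourceNames drops the sourceName clause, so the query runs
// across all data sources.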

export function makeMongoDbEmbeddedContentStore({
connectionUri,
databaseName,
@@ -232,5 +257,22 @@ export function makeMongoDbEmbeddedContentStore({
}
}
},

async getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]> {
const result = await embeddedContentCollection
.aggregate([
{ $match: makeMatchQuery(matchQuery) },
{
$group: {
_id: null,
uniqueSources: { $addToSet: "$sourceName" },
},
},
{ $project: { _id: 0, uniqueSources: 1 } },
])
.toArray();
const uniqueSources = result.length > 0 ? result[0].uniqueSources : [];
return uniqueSources;
},
};
}
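
The aggregation in getDataSources collapses every matching document into a single group whose uniqueSources array holds the de-duplicated source names ($addToSet ignores repeats). The two possible result shapes, with illustrative values:

// result when at least one document matches:
//   [ { uniqueSources: ["source1", "source2"] } ]
// result when nothing matches:
//   []   <- hence the result.length > 0 guard before indexing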
@@ -2,12 +2,27 @@ import {
updateEmbeddedContent,
updateEmbeddedContentForPage,
} from "./updateEmbeddedContent";
-import { persistPages } from ".";
+import {
+  makeMongoDbEmbeddedContentStore,
+  makeMongoDbPageStore,
+  MongoDbEmbeddedContentStore,
+  MongoDbPageStore,
+  persistPages,
+  updatePages,
+} from ".";
import { makeMockPageStore } from "../test/MockPageStore";
import * as chunkPageModule from "../chunk/chunkPage";
-import { EmbeddedContentStore, EmbeddedContent } from "./EmbeddedContent";
+import {
+  EmbeddedContentStore,
+  EmbeddedContent,
+  GetSourcesMatchParams,
+} from "./EmbeddedContent";
import { Embedder } from "../embed";
import { Page, PersistedPage } from ".";
import { strict as assert } from "assert";
import { MongoMemoryReplSet } from "mongodb-memory-server";
import { DataSource } from "../dataSources";
import { MongoClient } from "mongodb";

export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
const content: Map<string /* page url */, EmbeddedContent[]> = new Map();
@@ -29,6 +44,9 @@ export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
metadata: {
embeddingName: "test",
},
async getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]> {
return [];
},
};
};

@@ -207,6 +225,7 @@ describe("updateEmbeddedContent", () => {
store: embeddedContentStore,
page,
concurrencyOptions: { createChunks: 2 },
chunkAlgoHash: "testchunkalgohash",
});

const embeddedContent = await embeddedContentStore.loadEmbeddedContent({
@@ -276,3 +295,231 @@ });
});
});
});

// These tests use "mongodb-memory-server", not the mock embedded content store
describe("updateEmbeddedContent", () => {
let mongod: MongoMemoryReplSet | undefined;
let pageStore: MongoDbPageStore;
let embedStore: MongoDbEmbeddedContentStore;
let uri: string;
let databaseName: string;
let mongoClient: MongoClient;
let page1Embedding: EmbeddedContent[], page2Embedding: EmbeddedContent[];
let pages: PersistedPage[] = [];

const embedder = {
async embed() {
return { embedding: [1, 2, 3] };
},
};
const mockDataSources: DataSource[] = [
{
name: "source1",
fetchPages: async () => [
{
url: "test1.com",
format: "html",
sourceName: "source1",
body: "hello source 1",
},
],
},
{
name: "source2",
fetchPages: async () => [
{
url: "test2.com",
format: "html",
sourceName: "source2",
body: "hello source 2",
},
],
},
];
const mockDataSourceNames = mockDataSources.map(
(dataSource) => dataSource.name
);
beforeAll(async () => {
mongod = await MongoMemoryReplSet.create();
uri = mongod.getUri();
mongoClient = new MongoClient(uri);
await mongoClient.connect();
});
beforeEach(async () => {
// set up the page store and embedded content store (the shared mongo client is connected in beforeAll)
databaseName = "test-all-command";
embedStore = makeMongoDbEmbeddedContentStore({
connectionUri: uri,
databaseName,
searchIndex: { embeddingName: "test-embedding" },
});
pageStore = makeMongoDbPageStore({
connectionUri: uri,
databaseName,
});
// create pages and verify that they have been created
await updatePages({ sources: mockDataSources, pageStore });
pages = await pageStore.loadPages();
assert(pages.length === 2);
// create embeddings for the pages and verify that they have been created
await updateEmbeddedContent({
since: new Date(0),
embeddedContentStore: embedStore,
pageStore,
sourceNames: mockDataSourceNames,
embedder,
});
page1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
page2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
assert(page1Embedding.length);
assert(page2Embedding.length);
});

afterEach(async () => {
await pageStore?.drop();
await embedStore?.drop();
});
afterAll(async () => {
await pageStore?.close();
await embedStore?.close();
await mongoClient?.close();
await mongod?.stop();
});

it("updates embedded content for pages that have been updated after the 'since' date provided", async () => {
// Modify dates of pages and embedded content for testing
const sinceDate = new Date("2024-01-01");
const beforeSinceDate = new Date("2023-01-01");
const afterSinceDate = new Date("2025-01-01");
// set pages[0] to be last updated before sinceDate (should not be modified)
await mongoClient
.db(databaseName)
.collection("pages")
.updateOne({ ...pages[0] }, { $set: { updated: beforeSinceDate } });
await mongoClient
.db(databaseName)
.collection("embedded_content")
.updateOne(
{ sourceName: mockDataSourceNames[0] },
{ $set: { updated: beforeSinceDate } }
);
// set pages[1] to be last updated after sinceDate (should be re-chunked)
await mongoClient
.db(databaseName)
.collection("pages")
.updateOne({ ...pages[1] }, { $set: { updated: afterSinceDate } });
await mongoClient
.db(databaseName)
.collection("embedded_content")
.updateOne(
{ sourceName: mockDataSourceNames[1] },
{ $set: { updated: afterSinceDate } }
);
const originalPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
const originalPage2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
await updateEmbeddedContent({
since: sinceDate,
embeddedContentStore: embedStore,
pageStore,
sourceNames: mockDataSourceNames,
embedder,
});
const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
assert(updatedPage1Embedding.length);
assert(updatedPage2Embedding.length);
expect(updatedPage1Embedding[0].updated.getTime()).toBe(
originalPage1Embedding[0].updated.getTime()
);
expect(updatedPage2Embedding[0].updated.getTime()).not.toBe(
originalPage2Embedding[0].updated.getTime()
);
});
it("updates embedded content when page has not changed, but chunk algo has, ignoring since date", async () => {
// change the chunking algo for the second page, but not the first
await updateEmbeddedContent({
since: new Date(),
embeddedContentStore: embedStore,
pageStore,
sourceNames: [mockDataSourceNames[0]],
embedder,
});
await updateEmbeddedContent({
since: new Date(),
embeddedContentStore: embedStore,
pageStore,
sourceNames: [mockDataSourceNames[1]],
embedder,
chunkOptions: { chunkOverlap: 2 },
});
const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
assert(updatedPage1Embedding.length);
assert(updatedPage2Embedding.length);
expect(updatedPage1Embedding[0].chunkAlgoHash).toBe(
page1Embedding[0].chunkAlgoHash
);
expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
page2Embedding[0].chunkAlgoHash
);
});
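// Not part of this PR's diff. The assertions above rely on
// updateEmbeddedContent deriving chunkAlgoHash from the chunking
// configuration, so different chunkOptions (here, chunkOverlap) produce a
// new hash and force re-embedding. A hypothetical sketch of such a
// fingerprint, not necessarily this repo's actual implementation:
import { createHash } from "crypto";

function computeChunkAlgoHash(chunkOptions: Record<string, unknown>): string {
  // Hypothetical: any change to the serialized options yields a new hash.
  return createHash("sha256").update(JSON.stringify(chunkOptions)).digest("hex");
}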
it("use a new chunking algo on data sources, some of which have pages that have been updated", async () => {
// SETUP: Modify dates of pages and embedded content for this test case
const sinceDate = new Date("2024-01-01");
const afterSinceDate = new Date("2025-01-01");
await mongoClient
.db(databaseName)
.collection("pages")
.updateOne({ ...pages[0] }, { $set: { updated: afterSinceDate } });
await mongoClient
.db(databaseName)
.collection("embedded_content")
.updateOne(
{ sourceName: mockDataSourceNames[0] },
{ $set: { updated: afterSinceDate } }
);
const originalPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
// END SETUP
await updateEmbeddedContent({
since: sinceDate,
embeddedContentStore: embedStore,
pageStore,
sourceNames: mockDataSourceNames,
embedder,
chunkOptions: { chunkOverlap: 2 },
});
const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
page: pages[0],
});
const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
page: pages[1],
});
assert(updatedPage1Embedding.length);
assert(updatedPage2Embedding.length);
// both pages should be updated
expect(updatedPage1Embedding[0].chunkAlgoHash).not.toBe(
originalPage1Embedding[0].chunkAlgoHash
);
expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
page2Embedding[0].chunkAlgoHash
);
});
});