diff --git a/.gitignore b/.gitignore
index 07e49ff7b..469bccb47 100644
--- a/.gitignore
+++ b/.gitignore
@@ -72,3 +72,4 @@ rest-api-spec
 yaml-rest-tests
 generated-tests
 schema
+base64-data
diff --git a/package.json b/package.json
index aa28671f8..8c2a90edb 100644
--- a/package.json
+++ b/package.json
@@ -70,6 +70,7 @@
   "chai": "5.2.1",
   "cross-zip": "4.0.1",
   "desm": "1.3.1",
+  "inly": "^5.0.1",
   "into-stream": "8.0.1",
   "js-yaml": "4.1.0",
   "license-checker": "25.0.1",
diff --git a/test/ingest/README.md b/test/ingest/README.md
new file mode 100644
index 000000000..c63f26a53
--- /dev/null
+++ b/test/ingest/README.md
@@ -0,0 +1,5 @@
+To run the base64 benchmark test:
+
+1. Install dependencies: `npm install`
+2. Set environment variables `ES_URL`, `ES_USERNAME` and `ES_PASSWORD`
+3. Run the script: `node test/ingest/base64.mjs`
diff --git a/test/ingest/base64.mjs b/test/ingest/base64.mjs
new file mode 100644
index 000000000..dcf290b55
--- /dev/null
+++ b/test/ingest/base64.mjs
@@ -0,0 +1,170 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { readFile } from "node:fs/promises"
+import { existsSync, createWriteStream, mkdirSync } from "node:fs"
+import { Readable } from "node:stream"
+import { finished } from "node:stream/promises"
+import inly from 'inly'
+import { Client } from '../../index.js'
+import { Serializer } from '@elastic/transport'
+
+const client = new Client({
+  node: process.env.ES_URL,
+  auth: {
+    username: process.env.ES_USERNAME,
+    password: process.env.ES_PASSWORD,
+  },
+  compression: false
+})
+
+const indexName = "b64-test"
+
+const indexSettings = {
+  index: indexName,
+  wait_for_active_shards: 'all',
+  mappings: {
+    properties: {
+      docid: { type: "keyword" },
+      emb: {
+        dims: 1536,
+        index: true,
+        index_options: { type: "flat" },
+        similarity: "cosine",
+        type: "dense_vector"
+      },
+      text: {
+        fields: {
+          keyword: {
+            ignore_above: 256,
+            type: "keyword"
+          }
+        },
+        type: "text"
+      },
+      title: {
+        type: "text"
+      }
+    }
+  }
+}
+
+const dataset_size = 20000
+
+/**
+ * Fetches the vector dataset
+ */
+async function fetchDataSet () {
+  const url = 'https://rally-tracks.elastic.co/openai_vector/open_ai_corpus-initial-indexing-1k.json.bz2'
+  const dir = 'base64-data'
+  const filePath = `./${dir}/open_ai_corpus-initial-indexing-1k.json`
+  const filePathBz2 = `./${dir}/open_ai_corpus-initial-indexing-1k.json.bz2`
+
+  if (!existsSync(filePath)) {
+    mkdirSync(dir, { recursive: true })
+
+    // download archive
+    if (!existsSync(filePathBz2)) {
+      console.log(`Downloading ${url}`)
+      const { body } = await fetch(url)
+      const stream = createWriteStream(filePathBz2)
+      await finished(Readable.fromWeb(body).pipe(stream))
+    }
+
+    // extract archive
+    await new Promise((resolve, reject) => {
+      console.log(`Extracting ${filePathBz2} to ${dir}`)
+      const extract = inly(filePathBz2, dir)
+      extract.on('error', reject)
+      extract.on('end', resolve)
+    })
+  }
+
+  const file = await readFile(filePath, 'utf8')
+  return file.split(/[\r\n]+/)
+    .filter(row => row.trim().length > 0)
+    .map(row => JSON.parse(row.trim()))
+}
+
+/**
+ * Loops over an array until a certain number of records has been yielded
+ */
+function* loopDataSet (data) {
+  let count = 0
+  while (true) {
+    for (const item of data) {
+      yield item
+      count++
+      if (count >= dataset_size) return
+    }
+  }
+}
+
+/**
+ * Bulk ingests the dataset
+ * @param {number} chunkSize Number of documents to serialize before running a bulk request
+ * @param {boolean} base64 If true, encode the float32 embeddings array as a base64 string
+ * @returns {number} Milliseconds the serialize+index operations took
+ */
+async function index (chunkSize, base64 = false) {
+  const data = await fetchDataSet()
+  const serializer = new Serializer()
+  let chunk = []
+
+  await client.indices.create(indexSettings)
+  await client.indices.refresh({ index: indexName })
+
+  const start = Date.now()
+
+  for (const doc of loopDataSet(data)) {
+    if (base64) doc.emb = serializer.encodeFloat32Vector(doc.emb)
+    chunk.push(doc)
+    if (chunk.length >= chunkSize) {
+      const operations = chunk.flatMap(doc => [{ index: { _index: indexName } }, doc])
+      await client.bulk({ operations })
+      chunk = []
+    }
+  }
+
+  const duration = Date.now() - start
+
+  await client.indices.delete({ index: indexName })
+
+  return duration
+}
+
+async function run () {
+  // clean up any existing index, in case a previous run ended in a partially finished state
+  await client.indices.delete({
+    index: indexName,
+    allow_no_indices: true
+  })
+
+  const measurements = []
+
+  for (const chunk_size of [100, 250, 500, 1000]) {
+    const measurement = { dataset_size, chunk_size }
+
+    const float32Duration = []
+    const base64Duration = []
+
+    for (const _ of [1, 2, 3]) {
+      float32Duration.push(await index(chunk_size))
+      base64Duration.push(await index(chunk_size, true))
+    }
+
+    measurement.float32 = { duration: float32Duration.reduce((a, b) => a + b, 0) / float32Duration.length }
+    measurement.base64 = { duration: base64Duration.reduce((a, b) => a + b, 0) / base64Duration.length }
+
+    measurements.push(measurement)
+  }
+
+  console.log(JSON.stringify(measurements, null, 2))
+}
+
+run().catch(err => {
+  console.error(err)
+  process.exit(1)
+})
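
For context on what the benchmark compares: the float32 path sends `emb` as a plain JSON array of numbers, while the base64 path sends the string produced by `Serializer.encodeFloat32Vector` from `@elastic/transport`. The snippet below is a minimal sketch of that idea only, assuming little-endian IEEE 754 float32 packing; it is not the transport's actual implementation, and the helper names are hypothetical.

```js
// Illustrative sketch only — the benchmark itself relies on
// Serializer.encodeFloat32Vector from @elastic/transport.
// Assumption: little-endian float32 packing, 4 bytes per dimension.
import { Buffer } from 'node:buffer'

function toBase64Float32 (values) {
  const buf = Buffer.alloc(values.length * 4)
  values.forEach((v, i) => buf.writeFloatLE(v, i * 4))
  return buf.toString('base64')
}

function fromBase64Float32 (str) {
  const buf = Buffer.from(str, 'base64')
  return Array.from({ length: buf.length / 4 }, (_, i) => buf.readFloatLE(i * 4))
}

const emb = [0.1, -0.2, 0.3]
const encoded = toBase64Float32(emb)
console.log(encoded)                    // "zczMPc3MTL6amZk+"
console.log(fromBase64Float32(encoded)) // round-trips within float32 precision
```

A base64 string costs roughly 4/3 bytes per packed byte, which is typically smaller than the JSON text of the same floats and cheaper to serialize; the payload-size and serialization savings are what the `float32` vs `base64` timings in this script are meant to measure.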