1 change: 1 addition & 0 deletions .gitignore
@@ -72,3 +72,4 @@ rest-api-spec
yaml-rest-tests
generated-tests
schema
base64-data
1 change: 1 addition & 0 deletions package.json
@@ -70,6 +70,7 @@
"chai": "5.2.1",
"cross-zip": "4.0.1",
"desm": "1.3.1",
"inly": "^5.0.1",
"into-stream": "8.0.1",
"js-yaml": "4.1.0",
"license-checker": "25.0.1",
5 changes: 5 additions & 0 deletions test/ingest/README.md
@@ -0,0 +1,5 @@
To run the base64 benchmark test:

1. Install dependencies: `npm install`
2. Set the environment variables `ES_URL`, `ES_USERNAME`, and `ES_PASSWORD`
3. Run the script: `node test/ingest/base64.mjs`
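
For example, against a hypothetical local cluster (the values below are placeholders, not defaults): `ES_URL=http://localhost:9200 ES_USERNAME=elastic ES_PASSWORD=changeme node test/ingest/base64.mjs`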
170 changes: 170 additions & 0 deletions test/ingest/base64.mjs
@@ -0,0 +1,170 @@
/*
 * Copyright Elasticsearch B.V. and contributors
 * SPDX-License-Identifier: Apache-2.0
 */

import { readFile } from 'node:fs/promises'
import { existsSync, createWriteStream, mkdirSync } from 'node:fs'
import { Readable } from 'node:stream'
import { finished } from 'node:stream/promises'
import inly from 'inly'
import { Client } from '../../index.js'
import { Serializer } from '@elastic/transport'

const client = new Client({
  node: process.env.ES_URL,
  auth: {
    username: process.env.ES_USERNAME,
    password: process.env.ES_PASSWORD,
  },
  compression: false
})

Review comment (Member): lgtm, just noting that when we're testing we should use http instead of https, as suggested in the doc

const indexName = 'b64-test'

const indexSettings = {
  index: indexName,
  wait_for_active_shards: 'all',
  mappings: {
    properties: {
      docid: { type: 'keyword' },
      emb: {
        dims: 1536,
        index: true,
        index_options: { type: 'flat' },
        similarity: 'cosine',
        type: 'dense_vector'
      },
      text: {
        fields: {
          keyword: {
            ignore_above: 256,
            type: 'keyword'
          }
        },
        type: 'text'
      },
      title: {
        type: 'text'
      }
    }
  }
}
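// For reference, a corpus document matching this mapping looks roughly like
// (field values illustrative; real rows come from the Rally track fetched below):
//   { docid: "0", title: "...", text: "...", emb: [0.0123, -0.0456, /* ...1536 floats total */] }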

const dataset_size = 20000

/**
 * Fetches the vector dataset, downloading and extracting the archive on first run
 */
async function fetchDataSet () {
  const url = 'https://rally-tracks.elastic.co/openai_vector/open_ai_corpus-initial-indexing-1k.json.bz2'
  const dir = 'base64-data'
  const filePath = `./${dir}/open_ai_corpus-initial-indexing-1k.json`
  const filePathBz2 = `./${dir}/open_ai_corpus-initial-indexing-1k.json.bz2`

  if (!existsSync(filePath)) {
    mkdirSync(dir, { recursive: true })

    // download archive
    if (!existsSync(filePathBz2)) {
      console.log(`Downloading ${url}`)
      const { body } = await fetch(url)
      const stream = createWriteStream(filePathBz2)
      await finished(Readable.fromWeb(body).pipe(stream))
    }

    // extract archive
    await new Promise((resolve, reject) => {
      console.log(`Extracting ${filePathBz2} to ${dir}`)
      const extract = inly(filePathBz2, dir)
      extract.on('error', reject)
      extract.on('end', resolve)
    })
  }

  // the extracted file is newline-delimited JSON: one document per line
  const file = await readFile(filePath, 'utf8')
  return file.split(/[\r\n]+/)
    .filter(row => row.trim().length > 0)
    .map(row => JSON.parse(row.trim()))
}

/**
 * Loops over an array until a certain number of records has been yielded
 */
function* loopDataSet (data) {
  let count = 0
  while (true) {
    for (const item of data) {
      yield item
      count++
      if (count >= dataset_size) return
    }
  }
}

/**
 * Bulk ingests the dataset
 * @param {number} chunkSize Number of documents to serialize before running a bulk request
 * @param {boolean} base64 If true, encode each float32 embedding array as a base64 string
 * @returns {number} Milliseconds the serialize+index operations took
 */
async function index (chunkSize, base64 = false) {
  const data = await fetchDataSet()
  const serializer = new Serializer()
  let chunk = []

  await client.indices.create(indexSettings)

Review comment (Member): should add a refresh call to ensure the index is fully created before starting the ingestion
Reply (Member, author): Added in 63277f6

  await client.indices.refresh({ index: indexName })

  const start = Date.now()

  for (const doc of loopDataSet(data)) {
    if (base64) doc.emb = serializer.encodeFloat32Vector(doc.emb)
    chunk.push(doc)
    if (chunk.length >= chunkSize) {
      const operations = chunk.flatMap(doc => [{ index: { _index: indexName } }, doc])
      await client.bulk({ operations })
      chunk = []
    }
  }

  const duration = Date.now() - start

  await client.indices.delete({ index: indexName })

Review comment (Member): The index is deleted here, but there's no check/delete at the start of each pass before client.indices.create(). If a previous run crashed, the index might already exist... to be extra safe.
Reply (Member, author): Added in 47044ff.

  return duration
}
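
// For context: base64-encoding a float32 vector amounts to reinterpreting the
// Float32Array's raw bytes. A minimal sketch of the idea, as an assumption for
// illustration (not necessarily how @elastic/transport implements
// encodeFloat32Vector):
//
//   const floats = Float32Array.from(doc.emb)
//   const b64 = Buffer.from(floats.buffer, floats.byteOffset, floats.byteLength).toString('base64')
//
// This shrinks the payload: 4 raw bytes per float become about 5.3 base64
// characters, versus 10 or more characters per float serialized as decimal JSON.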

async function run () {
  // clean up any existing index, in case a previous run ended in a partially finished state
  await client.indices.delete({
    index: indexName,
    allow_no_indices: true
  })

  const measurements = []

  for (const chunk_size of [100, 250, 500, 1000]) {
    const measurement = { dataset_size, chunk_size }

    const float32Durations = []
    const base64Durations = []

    // run each configuration three times and average the durations
    for (const _ of [1, 2, 3]) {
      float32Durations.push(await index(chunk_size))
      base64Durations.push(await index(chunk_size, true))
    }

    measurement.float32 = { duration: float32Durations.reduce((a, b) => a + b, 0) / float32Durations.length }
    measurement.base64 = { duration: base64Durations.reduce((a, b) => a + b, 0) / base64Durations.length }

    measurements.push(measurement)
  }

  console.log(JSON.stringify(measurements, null, 2))
}
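
// The printed measurements take this shape, one entry per chunk size, with
// durations averaged over three runs, in milliseconds:
//   [{ dataset_size, chunk_size, float32: { duration }, base64: { duration } }, ...]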

run().catch(err => {
  console.error(err)
  process.exit(1)
})