diff --git a/docs/reference/bulk.md b/docs/reference/bulk.md new file mode 100644 index 000000000..6d2a64389 --- /dev/null +++ b/docs/reference/bulk.md @@ -0,0 +1,180 @@ +# Bulk ingest [bulk-ingest] + +The Bulk API can be cumbersome to use directly, due to its payload formatting +requirements. The PHP client includes a bulk helper function with a simplified +interface. + +With the bulk helper, the application has to provide a generator or iterable +that yields the individual bulk actions. The helper then formats and submits +the bulk request, optionally splitting the data into chunks based on maximum +number of actions or payload size. + +The following example submits a bulk request that indexes three documents: + +```php +use Elastic\Elasticsearch\Helper\Bulk; + +function get_next_document() +{ + yield Bulk::index_action([ + 'title' => 'document 1 title', + 'body' => 'document 1 body', + ]); + yield Bulk::index_action([ + 'title' => 'document 2 title', + 'body' => 'document 2 body', + ]); + yield Bulk::index_action([ + 'title' => 'document 3 title', + 'body' => 'document 3 body', + ]); +} + +Bulk::bulk($client, 'my_index', get_next_document()) +``` + +## The bulk helper function + +The bulk helper can be called as follows: + +```php +use Elastic\Elasticsearch\Helper\Bulk; + +$response = Bulk::bulk($client, $index, $actions, $stats_only = false, $chunk_size = 500, $max_chunk_bytes = 100 * 1024 * 1024); +``` + +This function has three required arguments: + +- `$client` is the client to use to submit Bulk API requests, +- `$index` is the default index that actions will be applied to, +- `$actions` is the iterable that yields the bulk actions, normally implemented + as a generator. + +The `$stats_only` optional argument controls whether details of each individual +operation are included in the response. When this argument is set to `true`, +these details are omitted. The default is `false`. + +The two optional arguments `$chunk_size` and`$max_chunk_bytes` determine how +often Bulk API requests are issued. The helper stores actions in memory and +only submits a Bulk API request when the action count reaches `$chunk_size` or +the payload size reaches `$max_chunk_bytes`, whichever happens first. The +application can also trigger an explicit Bulk API request to be issued by +yielding a `flush` action from its generator. + +The return value of the `bulk()` function is an array with three elements: + +- The total number of actions that were processed +- The count of errors +- An array with the status of each operation, as returned by the bulk API. This + array is omitted when the `$stats_only` argument is set to `true`. + +## Bulk actions + +A Bulk API request includes a list of operations, called *actions*. The Bulk +API supports four different actions: `index`, `create`, `update` and`delete`. +A `flush` action that is specific to this helper is also available. + +### Index + +The `index` action indexes the specified document. If the document already +exists, it replaces it and increments the version. + +```php +yield Bulk::index_action($document, $id = null, $other_metadata = null); +``` + +The `$document` argument is an array with the document to index. The document's +unique ID can be passed in the `$id` argument, if desired. When `$id` is not +provided, the server generates a unique document ID. Any other attributes of +the index action can be passed in the `$other_metadata` argument. + +The following example indexes a document with ID `123`: + +```php +yield Bulk::index_action([ + 'field1' => 'value1', +], '123'); +``` + +The next example explicitly names the index that the action applies to: + +```php +yield Bulk::index_action([ + 'field1' => 'value1', +], '123', ['_index' => 'some-other-index']); +``` + +### Create + +The `create` action indexes the specified document if it does not already +exist. + +```php +yield Bulk::create_action($document, $id = null, $other_metadata = null); +``` + +The `$document` argument is an array with the document to create. The +document's unique ID can be passed in the `$id` argument, if desired. When +`$id` is not provided, the server generates a unique document ID. Any other +attributes of the index action can be passed in the `$other_metadata` argument. + +The following example creates a document: + +```php +yield Bulk::create_action([ + 'field1' => 'value1', +]); +``` + +### Update + +The `update` action performs a partial document update. + +```php +yield Bulk::update_action($document_updates, $id, $other_metadata = null); +``` + +The `$document_updates` argument is an array with the desired updates to the +document, formatted as required by the Bulk API +[update action](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk#operation-bulk-body-application-json-updateaction-object). +The `$id` argument is the ID of the document to update. Any other attributes of +the index action can be passed in the `$other_metadata` argument. + +The following example updates the value of field `field2` in the document with +ID `123`: + +```php +yield Bulk::update_action([ + 'doc' => [ + 'field2' => 'value2', + ], +], '123'); +``` + +### Delete + +The `delete` action removes the specified document from the index. + +```php +yield Bulk::delete_action($id, $other_metadata = null); +``` + +The `$id` argument is the ID of the document delete. Any other attributes of +the index action can be passed in the `$other_metadata` argument. + +The following example deletes the document with ID `123`: + +```php +yield Bulk::delete_action('123'); +``` + +### Flush + +The `flush` action instructs the bulk helper to submit a Bulk API request with +all the actions accumulated up to that point. This allows the application to +override the logic that decides when to submit in a Bulk API request based on +count of actions or payload size. + +```php +yield Bulk::flush_action(); +``` diff --git a/docs/reference/client-helpers.md b/docs/reference/client-helpers.md index fec194e67..14877d403 100644 --- a/docs/reference/client-helpers.md +++ b/docs/reference/client-helpers.md @@ -9,6 +9,7 @@ The PHP client comes with the following helpers: * [Iterators](/reference/iterators.md) * [ES|QL](/reference/esql.md) +* [Bulk ingest](/reference/bulk.md) diff --git a/docs/reference/toc.yml b/docs/reference/toc.yml index e58d0e544..e9421d6a5 100644 --- a/docs/reference/toc.yml +++ b/docs/reference/toc.yml @@ -24,4 +24,5 @@ toc: - file: client-helpers.md children: - file: iterators.md - - file: esql.md \ No newline at end of file + - file: esql.md + - file: bulk.md diff --git a/examples/dense_vector_benchmark.php b/examples/dense_vector_benchmark.php new file mode 100644 index 000000000..791eb4d03 --- /dev/null +++ b/examples/dense_vector_benchmark.php @@ -0,0 +1,172 @@ + $doc['docid'], + 'title' => $doc['title'], + 'text' => $doc['text'], + 'emb' => $packed ? Vectors::packDenseVector($doc['emb']) : $doc['emb'] + ]); + } +} + +function upload($client, $index, $dataset, $chunk_size, $repetitions, $packed) { + // create index + if ($client->indices()->exists(['index' => $index])->getStatusCode() != 404) { + $client->indices()->delete(['index' => $index]); + } + $client->indices()->create([ + 'index' => $index, + 'body' => [ + 'mappings' => [ + 'properties' => [ + 'docid' => [ + 'type' => 'keyword', + ], + 'title' => [ + 'type' => 'text', + ], + 'text' => [ + 'type' => 'text', + ], + 'emb' => [ + 'type' => 'dense_vector', + 'index_options' => [ + 'type' => 'flat', + ], + ], + ], + ], + ], + ]); + $client->indices()->refresh(['index' => $index]); + + // run the bulk upload + $len = sizeof($dataset); + $body = []; + $start = microtime(true); + $response = Bulk::bulk($client, $index, get_next_document($dataset, $repetitions, $packed), true, $chunk_size); + assert ($response[0] == sizeof($dataset)); // make sure all items were ingested + assert ($response[1] == 0); // make sure there were no errors + return microtime(true) - $start; +} + +$opts = getopt('s:r:', array('url:', 'json', 'runs:', 'help'), $rest_index); +if (array_key_exists('help', $opts)) { + echo "Usage: " . $argv[0] . "[-s CHUNK_SIZES] [-r REPETITIONS] [--url URL] [--json] [--runs RUNS] DATASET_FILE\n"; + echo " -s CHUNK_SIZES List of chunk sizes to use, separated by commas (default: 100,250,500,1000)\n"; + echo " -r REPETITIONS Number of times the dataset is repeated (default: 20)\n"; + echo " --url URL The Elasticsearch connection URL\n"; + echo " --json Output benchmark results in JSON format\n"; + echo " --runs Number of runs that are averaged for each chunk size (default: 3)\n"; + exit(0); +} +if (!array_key_exists('url', $opts)) { + echo 'Error: --url argument is required.'; + exit(1); +} +else { + $ELASTICSEARCH_URL = $opts['url']; +} +if (array_key_exists('s', $opts)) { + $chunk_sizes = array_map(fn($v) => intval($v), explode(',', $opts['s'])); +} +if (array_key_exists('r', $opts)) { + $repetitions = intval($opts['r']); +} +if (array_key_exists('json', $opts)) { + $json_output = TRUE; +} +if (array_key_exists('runs', $opts)) { + $runs = intval($opts['runs']); +} +if (!$argv[$rest_index]) { + echo 'Error'; + exit(1); +} +else { + $dataset_file = $argv[$rest_index]; +} + +// read CSV dataset +$f = fopen($dataset_file, 'rt'); +while (!feof($f)) { + $line = fgets($f); + if ($line !== FALSE) { + $dataset[] = json_decode($line, true); + } +} +fclose($f); + +// initialize client +$client = Elastic\Elasticsearch\ClientBuilder::create() + ->setHosts([$ELASTICSEARCH_URL]) + ->build(); + +// run the benchmark +$results = []; +foreach ($chunk_sizes as $chunk_size) { + if (!$json_output) { + echo 'Uploading ' . $dataset_file . ' with chunk size ' . $chunk_size . "...\n"; + } + $normal_runs = []; + $packed_runs = []; + for ($run = 0; $run < $runs; $run++) { + $normal_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, FALSE); + $packed_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, TRUE); + } + $t = array_sum($normal_runs) / $runs; + $pt = array_sum($packed_runs) / $runs; + $result = [ + 'dataset_size' => sizeof($dataset) * $repetitions, + 'chunk_size' => $chunk_size, + 'float32' => [ + 'duration' => intval($t * 1000 + 0.5), + ], + 'base64' => [ + 'duration' => intval($pt * 1000 + 0.5), + ], + ]; + $results[] = $result; + if (!$json_output) { + echo 'Size: ' . $result['dataset_size'] . "\n"; + echo 'float duration: ' . number_format($t, 2) . 's (' . number_format($result['dataset_size'] / $t, 2) . " docs/s)\n"; + echo 'base64 duration: ' . number_format($pt, 2) . 's (' . number_format($result['dataset_size'] / $pt, 2) . " docs/s)\n"; + echo 'Speed up: ' . number_format($t / $pt, 2) . "x\n"; + } +} + +if ($json_output) { + echo json_encode($results, JSON_PRETTY_PRINT); +} diff --git a/src/Exception/BulkHelperException.php b/src/Exception/BulkHelperException.php new file mode 100644 index 000000000..ffe598708 --- /dev/null +++ b/src/Exception/BulkHelperException.php @@ -0,0 +1,26 @@ + an array where the first and second elements are the + * total number of operations processed and the total number of errors. + * If $stats_only is false, a third element is returned, with an array of + * individual operation results, as returned by the bulk API. + * @throws ClientResponseException + * @throws ServerResponseException + */ + public static function bulk( + Client $client, + string $index, + Iterator $actions, + bool $stats_only = false, + int $chunk_size = 500, + int $max_chunk_bytes = 100 * 1024 * 1024, + ): array { + $totalCount = 0; + $totalErrors = 0; + $results = []; + $chunkCount = 0; + $chunkBytes = 0; + $body = []; + foreach ($actions as $action) { + foreach ($action as $item) { + $encodedItem = json_encode($item); + $chunkBytes += strlen($encodedItem) + 1; + $body[] = $encodedItem; + } + if (count($action) > 0) { + $chunkCount++; + $totalCount++; + } + if (count($action) == 0 || $chunkCount >= $chunk_size || $chunkBytes >= $max_chunk_bytes) { + $response = $client->bulk(['index' => $index, 'body' => $body]); + foreach ($response['items'] as $item) { + $status = $item[array_key_first($item)]['status']; + if ($status < 200 || $status >= 300) { + $totalErrors += 1; + } + if (!$stats_only) { + $results[] = $item; + } + } + $body = []; + $chunkCount = 0; + $chunkBytes = 0; + unset($response); + } + } + if (!empty($body)) { + $response = $client->bulk(['index' => $index, 'body' => $body]); + foreach ($response['items'] as $item) { + $status = $item[array_key_first($item)]['status']; + if ($status < 200 || $status >= 300) { + $totalErrors += 1; + } + $results[] = $item; + } + unset($body); + unset($response); + } + return [$totalCount, $totalErrors, $results]; + } + + private static function action(string $action, array $metadata, ?array $document): array { + if (count($metadata) == 0) { + $metadata = new class {}; + } + if (isset($document)) { + return [ + [$action => $metadata], + $document, + ]; + } + else { + return [ + [$action => $metadata], + ]; + } + } + + /** + * Return an index action for the bulk helper. + * + * @param $document The document to index. + * @param $id The id of the document. If omitted, a new document id will be assigned. + * @param $metadata Additional metadata to include. + * @return array + */ + public static function index_action(array $document, ?string $id = null, ?array $other_metadata = null): array { + $metadata = []; + if (isset($id)) { + $metadata['_id'] = $id; + } + if (isset($other_metadata)) { + $metadata = array_merge($other_metadata, $metadata); + } + return Bulk::action('index', $metadata, $document); + } + + /** + * Return a create action for the bulk helper. + * + * @param $document The document to create. + * @param $id The id of the document. If omitted, a new document id will be assigned. + * @param $metadata Additional metadata to include. + * @return array + */ + public static function create_action(array $document, ?string $id = null, ?array $other_metadata = null): array { + $metadata = []; + if (isset($id)) { + $metadata['_id'] = $id; + } + if (isset($other_metadata)) { + $metadata = array_merge($other_metadata, $metadata); + } + return Bulk::action('create', $metadata, $document); + } + + /** + * Return an partial update action for the bulk helper. + * + * @param $document The document to index. + * @param $id The id of the document. + * @param $metadata Additional metadata to include. + * @return array + */ + public static function update_action(array $document_updates, string $id, ?array $other_metadata = null): array { + $metadata = ['_id' => $id]; + if (isset($other_metadata)) { + $metadata = array_merge($other_metadata, $metadata); + } + return Bulk::action('update', $metadata, ['doc' => $document_updates]); + } + + /** + * Return a delete action for the bulk helper. + * + * @param $id The id of the document. If omitted, a new document id will be assigned. + * @param $other_metadata Additional metadata to include. + * @return array + */ + public static function delete_action(string $id, ?array $other_metadata = null): array { + $metadata = ['_id' => $id]; + if (isset($other_metadata)) { + $metadata = array_merge($other_metadata, $metadata); + } + return Bulk::action('delete', $metadata, null); + } + + /** + * Return a flush action for the bulk helper. + * + * This isn't a proper action that is sent to the server, but just an indicator + * for the bulk helper to write all the entries that have been accumulated so + * far, even if they do not meet the count or size criteria for writing. + * + * @return array + */ + public static function flush_action(): array { + return []; + } +} diff --git a/tests/Integration/BulkTest.php b/tests/Integration/BulkTest.php index ea6e23388..4b21bffb2 100644 --- a/tests/Integration/BulkTest.php +++ b/tests/Integration/BulkTest.php @@ -15,10 +15,23 @@ namespace Elastic\Elasticsearch\Tests\Integration; use Elastic\Elasticsearch\Client; +use Elastic\Elasticsearch\Helper\Bulk; use Elastic\Elasticsearch\Helper\Vectors; use Elastic\Elasticsearch\Tests\Utility; use PHPUnit\Framework\TestCase; + +function readIndex($client, $index) { + $response = $client->indices()->refresh([ + 'index' => $index, + ]); + $response = $client->search([ + 'index' => $index, + 'body' => ['query' => ['match_all' => new \ArrayObject([])]] + ]); + return $response; +} + /** * @group integration */ @@ -133,4 +146,185 @@ public function testBulkIndexWithBase64Vector() $this->assertEquals(200, $response->getStatusCode()); $this->assertCount(2, $response['items']); } + + public function testBulkHelperFlushByCount() + { + function flushByCountActions($client, $index) { + yield Bulk::create_action(['data' => 'one']); + yield Bulk::create_action(['data' => 'two'], '2'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::index_action(['data' => 'three']); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::index_action(['data' => 'fuor'], '4'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 4); + + yield Bulk::update_action(['data' => 'four'], '4'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 4); + + yield Bulk::delete_action('2'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 3); + } + + $response = Bulk::bulk( + $this->client, self::TEST_INDEX, + flushByCountActions($this->client, self::TEST_INDEX), true, 2 + ); + $this->assertEquals($response[0], 6); + $this->assertEquals($response[1], 0); + $this->assertEquals($response[2], []); // only stats in this response + + $response = readIndex($this->client, self::TEST_INDEX); + $this->assertEquals(200, $response->getStatusCode()); + $this->assertEquals(3, $response['hits']['total']['value']); + foreach ($response['hits']['hits'] as $hit) { + switch ($hit['_source']['data']) { + case 'one': + break; + case 'three': + break; + case 'four': + $this->assertEquals($hit['_id'], '4'); + break; + default: + $this->assertFalse(true, 'Unexpected data: ' . $hit['_source']['data']); + break; + } + } + } + + public function testBulkHelperFlushBySize() + { + function flushBySizeActions($client, $index) { + yield Bulk::create_action(['data' => 'one']); + yield Bulk::create_action(['data' => 'two'], '2'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::index_action(['data' => 'three']); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::index_action(['data' => 'fuor'], '4'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 4); + + yield Bulk::update_action(['data' => 'four'], '4'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 4); + + yield Bulk::delete_action('2'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 4); + } + + $response = Bulk::bulk( + $this->client, self::TEST_INDEX, + flushBySizeActions($this->client, self::TEST_INDEX), + false, // include individual item results + 500, 40, + ); + $this->assertEquals($response[0], 6); + $this->assertEquals($response[1], 0); + $this->assertEquals(sizeof($response[2]), 6); + + $response = readIndex($this->client, self::TEST_INDEX); + $this->assertEquals(200, $response->getStatusCode()); + $this->assertEquals(3, $response['hits']['total']['value']); + foreach ($response['hits']['hits'] as $hit) { + switch ($hit['_source']['data']) { + case 'one': + break; + case 'three': + break; + case 'four': + $this->assertEquals($hit['_id'], '4'); + break; + default: + $this->assertFalse(true, 'Unexpected data: ' . $hit['_source']['data']); + break; + } + } + } + + + public function testBulkHelperExplicitFlush() + { + function explicitFlushActions($client, $index) { + yield Bulk::create_action(['data' => 'one']); + yield Bulk::create_action(['data' => 'two'], '2'); + yield Bulk::flush_action(); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::index_action(['data' => 'three']); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::index_action(['data' => 'fuor'], '4'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::update_action(['data' => 'four'], '4'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::delete_action('2'); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 2); + + yield Bulk::flush_action(); + $response = readIndex($client, $index); + assert($response->getStatusCode() == 200); + assert($response['hits']['total']['value'] == 3); + } + + $response = Bulk::bulk( + $this->client, self::TEST_INDEX, + explicitFlushActions($this->client, self::TEST_INDEX), + true, // stats only + ); + $this->assertEquals($response[0], 6); + $this->assertEquals($response[1], 0); + $this->assertEquals($response[2], []); + + $response = readIndex($this->client, self::TEST_INDEX); + $this->assertEquals(200, $response->getStatusCode()); + $this->assertEquals(3, $response['hits']['total']['value']); + foreach ($response['hits']['hits'] as $hit) { + switch ($hit['_source']['data']) { + case 'one': + break; + case 'three': + break; + case 'four': + $this->assertEquals($hit['_id'], '4'); + break; + default: + $this->assertFalse(true, 'Unexpected data: ' . $hit['_source']['data']); + break; + } + } + } }