Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions docs/reference/bulk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# Bulk ingest [bulk-ingest]

The Bulk API can be cumbersome to use directly, due to its payload formatting
requirements. The PHP client includes a bulk helper function with a simplified
interface.

With the bulk helper, the application has to provide a generator or iterable
that yields the individual bulk actions. The helper then formats and submits
the bulk request, optionally splitting the data into chunks based on maximum
number of actions or payload size.

The following example submits a bulk request that indexes three documents:

```php
use Elastic\Elasticsearch\Helper\Bulk;

function get_next_document()
{
yield Bulk::index_action([
'title' => 'document 1 title',
'body' => 'document 1 body',
]);
yield Bulk::index_action([
'title' => 'document 2 title',
'body' => 'document 2 body',
]);
yield Bulk::index_action([
'title' => 'document 3 title',
'body' => 'document 3 body',
]);
}

Bulk::bulk($client, 'my_index', get_next_document())
```

## The bulk helper function

The bulk helper can be called as follows:

```php
use Elastic\Elasticsearch\Helper\Bulk;

$response = Bulk::bulk($client, $index, $actions, $stats_only = false, $chunk_size = 500, $max_chunk_bytes = 100 * 1024 * 1024);
```

This function has three required arguments:

- `$client` is the client to use to submit Bulk API requests,
- `$index` is the default index that actions will be applied to,
- `$actions` is the iterable that yields the bulk actions, normally implemented
as a generator.

The `$stats_only` optional argument controls whether details of each individual
operation are included in the response. When this argument is set to `true`,
these details are omitted. The default is `false`.

The two optional arguments `$chunk_size` and`$max_chunk_bytes` determine how
often Bulk API requests are issued. The helper stores actions in memory and
only submits a Bulk API request when the action count reaches `$chunk_size` or
the payload size reaches `$max_chunk_bytes`, whichever happens first. The
application can also trigger an explicit Bulk API request to be issued by
yielding a `flush` action from its generator.

The return value of the `bulk()` function is an array with three elements:

- The total number of actions that were processed
- The count of errors
- An array with the status of each operation, as returned by the bulk API. This
array is omitted when the `$stats_only` argument is set to `true`.

## Bulk actions

A Bulk API request includes a list of operations, called *actions*. The Bulk
API supports four different actions: `index`, `create`, `update` and`delete`.
A `flush` action that is specific to this helper is also available.

### Index

The `index` action indexes the specified document. If the document already
exists, it replaces it and increments the version.

```php
yield Bulk::index_action($document, $id = null, $other_metadata = null);
```

The `$document` argument is an array with the document to index. The document's
unique ID can be passed in the `$id` argument, if desired. When `$id` is not
provided, the server generates a unique document ID. Any other attributes of
the index action can be passed in the `$other_metadata` argument.

The following example indexes a document with ID `123`:

```php
yield Bulk::index_action([
'field1' => 'value1',
], '123');
```

The next example explicitly names the index that the action applies to:

```php
yield Bulk::index_action([
'field1' => 'value1',
], '123', ['_index' => 'some-other-index']);
```

### Create

The `create` action indexes the specified document if it does not already
exist.

```php
yield Bulk::create_action($document, $id = null, $other_metadata = null);
```

The `$document` argument is an array with the document to create. The
document's unique ID can be passed in the `$id` argument, if desired. When
`$id` is not provided, the server generates a unique document ID. Any other
attributes of the index action can be passed in the `$other_metadata` argument.

The following example creates a document:

```php
yield Bulk::create_action([
'field1' => 'value1',
]);
```

### Update

The `update` action performs a partial document update.

```php
yield Bulk::update_action($document_updates, $id, $other_metadata = null);
```

The `$document_updates` argument is an array with the desired updates to the
document, formatted as required by the Bulk API
[update action](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk#operation-bulk-body-application-json-updateaction-object).
The `$id` argument is the ID of the document to update. Any other attributes of
the index action can be passed in the `$other_metadata` argument.

The following example updates the value of field `field2` in the document with
ID `123`:

```php
yield Bulk::update_action([
'doc' => [
'field2' => 'value2',
],
], '123');
```

### Delete

The `delete` action removes the specified document from the index.

```php
yield Bulk::delete_action($id, $other_metadata = null);
```

The `$id` argument is the ID of the document delete. Any other attributes of
the index action can be passed in the `$other_metadata` argument.

The following example deletes the document with ID `123`:

```php
yield Bulk::delete_action('123');
```

### Flush

The `flush` action instructs the bulk helper to submit a Bulk API request with
all the actions accumulated up to that point. This allows the application to
override the logic that decides when to submit in a Bulk API request based on
count of actions or payload size.

```php
yield Bulk::flush_action();
```
1 change: 1 addition & 0 deletions docs/reference/client-helpers.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The PHP client comes with the following helpers:

* [Iterators](/reference/iterators.md)
* [ES|QL](/reference/esql.md)
* [Bulk ingest](/reference/bulk.md)



3 changes: 2 additions & 1 deletion docs/reference/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ toc:
- file: client-helpers.md
children:
- file: iterators.md
- file: esql.md
- file: esql.md
- file: bulk.md
172 changes: 172 additions & 0 deletions examples/dense_vector_benchmark.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
<?php
/**
* Elasticsearch PHP Client
*
* @link https://github.com/elastic/elasticsearch-php
* @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
* @license https://opensource.org/licenses/MIT MIT License
*
* Licensed to Elasticsearch B.V under one or more agreements.
* Elasticsearch B.V licenses this file to you under the MIT License.
* See the LICENSE file in the project root for more information.
*/
declare(strict_types = 1);
ini_set('memory_limit', '1024M');

require_once __DIR__ . '/../vendor/autoload.php';

use Elastic\Elasticsearch\Helper\Bulk;
use Elastic\Elasticsearch\Helper\Vectors;

$ELASTICSEARCH_URL = '';
$chunk_sizes = array(100, 250, 500, 1000);
$repetitions = 20;
$json_output = FALSE;
$runs = 3;
$dataset_file = '';
$rest_index = null;
$dataset = [];
$index = 'benchmark';

function get_next_document($dataset, $repetitions, $packed) {
$len = sizeof($dataset);
for ($i = 1; $i <= $len * $repetitions; $i++) {
$doc = $dataset[($i - 1) % $len];
yield Bulk::index_action([
'docid' => $doc['docid'],
'title' => $doc['title'],
'text' => $doc['text'],
'emb' => $packed ? Vectors::packDenseVector($doc['emb']) : $doc['emb']
]);
}
}

function upload($client, $index, $dataset, $chunk_size, $repetitions, $packed) {
// create index
if ($client->indices()->exists(['index' => $index])->getStatusCode() != 404) {
$client->indices()->delete(['index' => $index]);
}
$client->indices()->create([
'index' => $index,
'body' => [
'mappings' => [
'properties' => [
'docid' => [
'type' => 'keyword',
],
'title' => [
'type' => 'text',
],
'text' => [
'type' => 'text',
],
'emb' => [
'type' => 'dense_vector',
'index_options' => [
'type' => 'flat',
],
],
],
],
],
]);
$client->indices()->refresh(['index' => $index]);

// run the bulk upload
$len = sizeof($dataset);
$body = [];
$start = microtime(true);
$response = Bulk::bulk($client, $index, get_next_document($dataset, $repetitions, $packed), true, $chunk_size);
assert ($response[0] == sizeof($dataset)); // make sure all items were ingested
assert ($response[1] == 0); // make sure there were no errors
return microtime(true) - $start;
}

$opts = getopt('s:r:', array('url:', 'json', 'runs:', 'help'), $rest_index);
if (array_key_exists('help', $opts)) {
echo "Usage: " . $argv[0] . "[-s CHUNK_SIZES] [-r REPETITIONS] [--url URL] [--json] [--runs RUNS] DATASET_FILE\n";
echo " -s CHUNK_SIZES List of chunk sizes to use, separated by commas (default: 100,250,500,1000)\n";
echo " -r REPETITIONS Number of times the dataset is repeated (default: 20)\n";
echo " --url URL The Elasticsearch connection URL\n";
echo " --json Output benchmark results in JSON format\n";
echo " --runs Number of runs that are averaged for each chunk size (default: 3)\n";
exit(0);
}
if (!array_key_exists('url', $opts)) {
echo 'Error: --url argument is required.';
exit(1);
}
else {
$ELASTICSEARCH_URL = $opts['url'];
}
if (array_key_exists('s', $opts)) {
$chunk_sizes = array_map(fn($v) => intval($v), explode(',', $opts['s']));
}
if (array_key_exists('r', $opts)) {
$repetitions = intval($opts['r']);
}
if (array_key_exists('json', $opts)) {
$json_output = TRUE;
}
if (array_key_exists('runs', $opts)) {
$runs = intval($opts['runs']);
}
if (!$argv[$rest_index]) {
echo 'Error';
exit(1);
}
else {
$dataset_file = $argv[$rest_index];
}

// read CSV dataset
$f = fopen($dataset_file, 'rt');
while (!feof($f)) {
$line = fgets($f);
if ($line !== FALSE) {
$dataset[] = json_decode($line, true);
}
}
fclose($f);

// initialize client
$client = Elastic\Elasticsearch\ClientBuilder::create()
->setHosts([$ELASTICSEARCH_URL])
->build();

// run the benchmark
$results = [];
foreach ($chunk_sizes as $chunk_size) {
if (!$json_output) {
echo 'Uploading ' . $dataset_file . ' with chunk size ' . $chunk_size . "...\n";
}
$normal_runs = [];
$packed_runs = [];
for ($run = 0; $run < $runs; $run++) {
$normal_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, FALSE);
$packed_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, TRUE);
}
$t = array_sum($normal_runs) / $runs;
$pt = array_sum($packed_runs) / $runs;
$result = [
'dataset_size' => sizeof($dataset) * $repetitions,
'chunk_size' => $chunk_size,
'float32' => [
'duration' => intval($t * 1000 + 0.5),
],
'base64' => [
'duration' => intval($pt * 1000 + 0.5),
],
];
$results[] = $result;
if (!$json_output) {
echo 'Size: ' . $result['dataset_size'] . "\n";
echo 'float duration: ' . number_format($t, 2) . 's (' . number_format($result['dataset_size'] / $t, 2) . " docs/s)\n";
echo 'base64 duration: ' . number_format($pt, 2) . 's (' . number_format($result['dataset_size'] / $pt, 2) . " docs/s)\n";
echo 'Speed up: ' . number_format($t / $pt, 2) . "x\n";
}
}

if ($json_output) {
echo json_encode($results, JSON_PRETTY_PRINT);
}
26 changes: 26 additions & 0 deletions src/Exception/BulkHelperException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php
/**
* Elasticsearch PHP Client
*
* @link https://github.com/elastic/elasticsearch-php
* @copyright Copyright (c) Elasticsearch B.V (https://www.elastic.co)
* @license https://opensource.org/licenses/MIT MIT License
*
* Licensed to Elasticsearch B.V under one or more agreements.
* Elasticsearch B.V licenses this file to you under the MIT License.
* See the LICENSE file in the project root for more information.
*/
declare(strict_types=1);

namespace Elastic\Elasticsearch\Exception;

use Elastic\Elasticsearch\Traits\ResponseTrait;
use Exception;

/**
* Bulk Helper error
*/
class BulkHelperException extends Exception implements ElasticsearchException
{
use ResponseTrait;
}
Loading
Loading