Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,11 @@ response = client.ingestion.get_state(
"shard": 0,
"poller_state": "POLLING",
"error_policy": "DROP",
"poller_paused": false
"poller_paused": false,
"write_block_enabled" : false,
"batch_start_pointer" : "KafkaOffset{offset=2}",
"is_primary" : true,
"node" : "node_name"
}
]
}
Expand Down
26 changes: 24 additions & 2 deletions _api-reference/document-apis/pull-based-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ Before using pull-based ingestion, ensure that the following prerequisites are m
* Install an ingestion plugin for your streaming source using the command `bin/opensearch-plugin install <plugin-name>`. For more information, see [Additional plugins]({{site.url}}{{site.baseurl}}/install-and-configure/additional-plugins/index/). OpenSearch supports the following ingestion plugins:
- `ingestion-kafka`
- `ingestion-kinesis`
* Enable [segment replication]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/segment-replication/index/) with [remote-backed storage]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/remote-store/index/). Pull-based ingestion is not compatible with document replication.
* Configure pull-based ingestion during [index creation](#creating-an-index-for-pull-based-ingestion). You cannot convert an existing push-based index to a pull-based one.

## Creating an index for pull-based ingestion

To ingest data from a streaming source, first create an index with pull-based ingestion settings. The following request creates an index that pulls data from a Kafka topic:
To ingest data from a streaming source, first create an index with pull-based ingestion settings. The following request creates an index that pulls data from a Kafka topic in the segment replication mode. For other available modes, see [Ingestion modes](#ingestion-modes):

```json
PUT /my-index
Expand Down Expand Up @@ -75,8 +74,31 @@ The `ingestion_source` parameters control how OpenSearch pulls data from the str
| `poll.timeout` | The maximum time to wait for data in each poll operation. Optional. |
| `num_processor_threads` | The number of threads for processing ingested data. Optional. Default is 1. |
| `internal_queue_size` | The size of the internal blocking queue for advanced tuning. Valid values are from 1 to 100,000, inclusive. Optional. Default is 100. |
| `all_active` | Whether to enable the all-active ingestion mode. Cannot be enabled for indexes that use segment replication mode. Default is `false`. See [Ingestion modes](#ingestion-modes). |
| `param` | Source-specific configuration parameters. Required. <br>&ensp;&#x2022; The `ingest-kafka` plugin requires:<br>&ensp;&ensp;- `topic`: The Kafka topic to consume from<br>&ensp;&ensp;- `bootstrap_servers`: The Kafka server addresses<br>&ensp;&ensp;Optionally, you can provide additional standard Kafka consumer parameters (such as `fetch.min.bytes`). These parameters are passed directly to the Kafka consumer. <br>&ensp;&#x2022; The `ingest-kinesis` plugin requires:<br>&ensp;&ensp;- `stream`: The Kinesis stream name<br>&ensp;&ensp;- `region`: The AWS Region<br>&ensp;&ensp;- `access_key`: The AWS access key<br>&ensp;&ensp;- `secret_key`: The AWS secret key<br>&ensp;&ensp;Optionally, you can provide an `endpoint_override`. |


### Ingestion modes

Pull-based ingestion supports the following modes.

#### Segment replication mode

In segment replication mode, the primary shards ingest events from a streaming source and index the documents. The pull-based index is configured to use [segment replication]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/segment-replication/index/) to copy over the segment files from primary to replica shards, as shown in the following image.

![Pull-based ingestion segment replication mode]({{site.url}}{{site.baseurl}}/images/pull-based-ingestion/pull-based-segrep-mode.png){: width="50%" }

We recommend using this mode with [remote-backed storage]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/remote-store/index/).
{: .tip}

#### All-active mode

Enabling all-active mode allows both primary and replica shards to independently ingest and index events from the streaming source, as shown in the following image.

![Pull-based ingestion all active mode]({{site.url}}{{site.baseurl}}/images/pull-based-ingestion/pull-based-all-active-mode.png){: width="50%" }

There is no replication or coordination between the shards, although replica shards may fetch segment files from the primary shard during bootstrapping if a local copy is unavailable. This mode is currently not supported with segment replication.

### Stream position

When creating an index, you can specify where OpenSearch should start reading from the stream by configuring the `pointer.init.reset` and `pointer.init.reset.value` settings in the `ingestion_source` parameter. OpenSearch will resume reading from the last commited position for existing indexes.
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading