Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,11 @@ response = client.ingestion.get_state(
"shard": 0,
"poller_state": "POLLING",
"error_policy": "DROP",
"poller_paused": false
"poller_paused": false,
"write_block_enabled" : false,
"batch_start_pointer" : "KafkaOffset{offset=2}",
"is_primary" : true,
"node" : "node_name"
}
]
}
Expand Down
20 changes: 18 additions & 2 deletions _api-reference/document-apis/pull-based-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ Before using pull-based ingestion, ensure that the following prerequisites are m
* Install an ingestion plugin for your streaming source using the command `bin/opensearch-plugin install <plugin-name>`. For more information, see [Additional plugins]({{site.url}}{{site.baseurl}}/install-and-configure/additional-plugins/index/). OpenSearch supports the following ingestion plugins:
- `ingestion-kafka`
- `ingestion-kinesis`
* Enable [segment replication]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/segment-replication/index/) with [remote-backed storage]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/remote-store/index/). Pull-based ingestion is not compatible with document replication.
* Configure pull-based ingestion during [index creation](#creating-an-index-for-pull-based-ingestion). You cannot convert an existing push-based index to a pull-based one.

## Creating an index for pull-based ingestion

To ingest data from a streaming source, first create an index with pull-based ingestion settings. The following request creates an index that pulls data from a Kafka topic:
To ingest data from a streaming source, first create an index with pull-based ingestion settings. The following request creates an index that pulls data from a Kafka topic in segment replication mode. See [ingestion modes](#ingestion-modes) for other available modes.

```json
PUT /my-index
Expand Down Expand Up @@ -75,8 +74,25 @@ The `ingestion_source` parameters control how OpenSearch pulls data from the str
| `poll.timeout` | The maximum time to wait for data in each poll operation. Optional. |
| `num_processor_threads` | The number of threads for processing ingested data. Optional. Default is 1. |
| `internal_queue_size` | The size of the internal blocking queue for advanced tuning. Valid values are from 1 to 100,000, inclusive. Optional. Default is 100. |
| `all_active` | Boolean value determining if all-active ingestion mode should be enabled. This is not supported in segment replication mode. Default value is false. See [ingestion modes](#ingestion-modes) |
| `param` | Source-specific configuration parameters. Required. <br>&ensp;&#x2022; The `ingest-kafka` plugin requires:<br>&ensp;&ensp;- `topic`: The Kafka topic to consume from<br>&ensp;&ensp;- `bootstrap_servers`: The Kafka server addresses<br>&ensp;&ensp;Optionally, you can provide additional standard Kafka consumer parameters (such as `fetch.min.bytes`). These parameters are passed directly to the Kafka consumer. <br>&ensp;&#x2022; The `ingest-kinesis` plugin requires:<br>&ensp;&ensp;- `stream`: The Kinesis stream name<br>&ensp;&ensp;- `region`: The AWS Region<br>&ensp;&ensp;- `access_key`: The AWS access key<br>&ensp;&ensp;- `secret_key`: The AWS secret key<br>&ensp;&ensp;Optionally, you can provide an `endpoint_override`. |


### Ingestion modes

Pull-based ingestion supports the following modes:

#### Segment replication mode

Pull-based ingestion can be used in the [segment replication]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/segment-replication/index/) mode.
In this mode, the primary shards ingest events from a streaming source and index the documents. Segments are then copied over to the replica shards.
It is recommended to use this mode with a [remote-backed storage]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/remote-store/index/).

#### All-active mode

All-active mode is the document replication equivalent in pull-based ingestion. Primary and replica shards ingest events from the streaming source and index documents independently.
There is no replication or coordination between the primary and replica shards. Replica shards however may depend on the primary shard during bootstrapping to copy the segment files if a local copy is unavailable.

### Stream position

When creating an index, you can specify where OpenSearch should start reading from the stream by configuring the `pointer.init.reset` and `pointer.init.reset.value` settings in the `ingestion_source` parameter. OpenSearch will resume reading from the last commited position for existing indexes.
Expand Down