Skip to content

Commit 21aaa09

Browse files
support message mappers to support different input formats and raw payloads
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 66dd2c7 commit 21aaa09

File tree

18 files changed

+644
-52
lines changed

18 files changed

+644
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))
2727
- Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098))
2828
- Implement GRPC FunctionScoreQuery ([#19888](https://github.com/opensearch-project/OpenSearch/pull/19888))
29+
- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)]
2930

3031
### Changed
3132
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,123 @@ public void testAllActiveIngestionPeriodicFlush() throws Exception {
778778

779779
waitForSearchableDocs(10, Arrays.asList(nodeA));
780780
waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1);
781-
782781
}
782+
783+
// public void testRawPayloadMapperIngestion() throws Exception {
784+
// // Start cluster
785+
// internalCluster().startClusterManagerOnlyNode();
786+
// final String nodeA = internalCluster().startDataOnlyNode();
787+
//
788+
// // Publish 2 valid messages
789+
// String validMessage1 = "{\"name\":\"alice\",\"age\":30}";
790+
// String validMessage2 = "{\"name\":\"bob\",\"age\":25}";
791+
// produceData(validMessage1);
792+
// produceData(validMessage2);
793+
//
794+
// // Create index with raw_payload mapper
795+
// createIndex(
796+
// indexName,
797+
// Settings.builder()
798+
// .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
799+
// .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
800+
// .put("ingestion_source.type", "kafka")
801+
// .put("ingestion_source.param.topic", topicName)
802+
// .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
803+
// .put("ingestion_source.pointer.init.reset", "earliest")
804+
// .put("ingestion_source.mapper_type", "raw_payload")
805+
// .put("ingestion_source.error_strategy", "drop")
806+
// .put("ingestion_source.all_active", true)
807+
// .build(),
808+
// "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
809+
// );
810+
//
811+
// ensureGreen(indexName);
812+
//
813+
// // Wait for both messages to be indexed
814+
// waitForSearchableDocs(2, List.of(nodeA));
815+
//
816+
// // Verify stats show 2 processed messages
817+
// waitForState(() -> {
818+
// PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
819+
// .getPollingIngestStats();
820+
// return stats != null
821+
// && stats.getMessageProcessorStats().totalProcessedCount() == 2L
822+
// && stats.getConsumerStats().totalPolledCount() == 2L
823+
// && stats.getConsumerStats().totalPollerMessageFailureCount() == 0L
824+
// && stats.getConsumerStats().totalPollerMessageDroppedCount() == 0L
825+
// && stats.getMessageProcessorStats().totalInvalidMessageCount() == 0L;
826+
// });
827+
//
828+
// // Validate document content
829+
// SearchResponse searchResponse = client().prepareSearch(indexName).get();
830+
// assertEquals(2, searchResponse.getHits().getHits().length);
831+
// for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
832+
// Map<String, Object> source = searchResponse.getHits().getHits()[i].getSourceAsMap();
833+
// assertTrue(source.containsKey("name"));
834+
// assertTrue(source.containsKey("age"));
835+
// }
836+
//
837+
// // Publish invalid JSON message
838+
// String invalidJsonMessage = "{ invalid json";
839+
// produceData(invalidJsonMessage);
840+
//
841+
// // Wait for consumer to encounter the error and drop it
842+
// waitForState(() -> {
843+
// PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
844+
// .getPollingIngestStats();
845+
// return stats != null
846+
// && stats.getConsumerStats().totalPolledCount() == 3L
847+
// && stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
848+
// && stats.getConsumerStats().totalPollerMessageDroppedCount() == 1L
849+
// && stats.getMessageProcessorStats().totalProcessedCount() == 2L;
850+
// });
851+
//
852+
// // Publish message with invalid content that will fail at processor level
853+
// String invalidFieldTypeMessage = "{\"name\":123,\"age\":\"not a number\"}";
854+
// produceData(invalidFieldTypeMessage);
855+
//
856+
// // Wait for processor to encounter the error
857+
// waitForState(() -> {
858+
// PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
859+
// .getPollingIngestStats();
860+
// return stats != null
861+
// && stats.getConsumerStats().totalPolledCount() == 4L
862+
// && stats.getConsumerStats().totalPollerMessageFailureCount() == 1L
863+
// && stats.getMessageProcessorStats().totalProcessedCount() == 3L
864+
// && stats.getMessageProcessorStats().totalFailedCount() == 1L
865+
// && stats.getMessageProcessorStats().totalFailuresDroppedCount() == 1L;
866+
// });
867+
//
868+
// // Pause ingestion, reset to offset 0, and resume
869+
// pauseIngestion(indexName);
870+
// waitForState(() -> {
871+
// GetIngestionStateResponse ingestionState = getIngestionState(indexName);
872+
// return ingestionState.getShardStates().length == 1
873+
// && ingestionState.getFailedShards() == 0
874+
// && ingestionState.getShardStates()[0].isPollerPaused()
875+
// && ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("paused");
876+
// });
877+
//
878+
// // Resume with reset to offset 0 (will re-process the 2 valid messages)
879+
// resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0");
880+
// waitForState(() -> {
881+
// GetIngestionStateResponse ingestionState = getIngestionState(indexName);
882+
// return ingestionState.getShardStates().length == 1
883+
// && ingestionState.getShardStates()[0].isPollerPaused() == false
884+
// && (ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("polling")
885+
// || ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("processing"));
886+
// });
887+
//
888+
// // Wait for the 3 messages to be processed by the processor after reset (1 will be dropped by the poller)
889+
// waitForState(() -> {
890+
// PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
891+
// .getPollingIngestStats();
892+
// return stats != null && stats.getMessageProcessorStats().totalProcessedCount() == 3L;
893+
// });
894+
//
895+
// // Verify still only 2 documents (no duplicates must be indexed)
896+
// RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
897+
// SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
898+
// assertThat(response.getHits().getTotalHits().value(), is(2L));
899+
// }
783900
}

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.opensearch.index.seqno.SequenceNumbers;
7373
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
7474
import org.opensearch.indices.pollingingest.StreamPoller;
75+
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;
7576
import org.opensearch.indices.replication.SegmentReplicationSource;
7677
import org.opensearch.indices.replication.common.ReplicationType;
7778

@@ -920,6 +921,18 @@ public Iterator<Setting<?>> settings() {
920921
Property.Final
921922
);
922923

924+
/**
925+
* Defines how the incoming ingestion message payload is mapped to the internal message format.
926+
*/
927+
public static final String SETTING_INGESTION_SOURCE_MAPPER_TYPE = "index.ingestion_source.mapper_type";
928+
public static final Setting<IngestionMessageMapper.MapperType> INGESTION_SOURCE_MAPPER_TYPE_SETTING = new Setting<>(
929+
SETTING_INGESTION_SOURCE_MAPPER_TYPE,
930+
IngestionMessageMapper.MapperType.DEFAULT.getName(),
931+
IngestionMessageMapper.MapperType::fromString,
932+
Property.IndexScope,
933+
Property.Final
934+
);
935+
923936
/**
924937
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
925938
* streaming source and process the updates. In the default document replication mode, this setting must be enabled.
@@ -1225,6 +1238,7 @@ public IngestionSource getIngestionSource() {
12251238
final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings);
12261239
final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings);
12271240
final TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings);
1241+
final IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.get(settings);
12281242

12291243
return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams)
12301244
.setPointerInitReset(pointerInitReset)
@@ -1235,6 +1249,7 @@ public IngestionSource getIngestionSource() {
12351249
.setBlockingQueueSize(blockingQueueSize)
12361250
.setAllActiveIngestion(allActiveIngestionEnabled)
12371251
.setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval)
1252+
.setMapperType(mapperType)
12381253
.build();
12391254
}
12401255
return null;

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
import org.opensearch.common.unit.TimeValue;
1414
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
1515
import org.opensearch.indices.pollingingest.StreamPoller;
16+
import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper;
1617

1718
import java.util.HashMap;
1819
import java.util.Map;
1920
import java.util.Objects;
2021

2122
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING;
2223
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING;
24+
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING;
2325
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE;
2426
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING;
2527
import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING;
@@ -40,6 +42,7 @@ public class IngestionSource {
4042
private int blockingQueueSize;
4143
private final boolean allActiveIngestion;
4244
private final TimeValue pointerBasedLagUpdateInterval;
45+
private final IngestionMessageMapper.MapperType mapperType;
4346

4447
private IngestionSource(
4548
String type,
@@ -51,7 +54,8 @@ private IngestionSource(
5154
int numProcessorThreads,
5255
int blockingQueueSize,
5356
boolean allActiveIngestion,
54-
TimeValue pointerBasedLagUpdateInterval
57+
TimeValue pointerBasedLagUpdateInterval,
58+
IngestionMessageMapper.MapperType mapperType
5559
) {
5660
this.type = type;
5761
this.pointerInitReset = pointerInitReset;
@@ -63,6 +67,7 @@ private IngestionSource(
6367
this.blockingQueueSize = blockingQueueSize;
6468
this.allActiveIngestion = allActiveIngestion;
6569
this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval;
70+
this.mapperType = mapperType;
6671
}
6772

6873
public String getType() {
@@ -105,6 +110,10 @@ public TimeValue getPointerBasedLagUpdateInterval() {
105110
return pointerBasedLagUpdateInterval;
106111
}
107112

113+
public IngestionMessageMapper.MapperType getMapperType() {
114+
return mapperType;
115+
}
116+
108117
@Override
109118
public boolean equals(Object o) {
110119
if (this == o) return true;
@@ -119,7 +128,8 @@ public boolean equals(Object o) {
119128
&& Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads)
120129
&& Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize)
121130
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion)
122-
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval);
131+
&& Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval)
132+
&& Objects.equals(mapperType, ingestionSource.mapperType);
123133
}
124134

125135
@Override
@@ -134,7 +144,8 @@ public int hashCode() {
134144
numProcessorThreads,
135145
blockingQueueSize,
136146
allActiveIngestion,
137-
pointerBasedLagUpdateInterval
147+
pointerBasedLagUpdateInterval,
148+
mapperType
138149
);
139150
}
140151

@@ -164,6 +175,9 @@ public String toString() {
164175
+ allActiveIngestion
165176
+ ", pointerBasedLagUpdateInterval="
166177
+ pointerBasedLagUpdateInterval
178+
+ ", mapperType='"
179+
+ mapperType
180+
+ '\''
167181
+ '}';
168182
}
169183

@@ -225,6 +239,7 @@ public static class Builder {
225239
private TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault(
226240
Settings.EMPTY
227241
);
242+
private IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.getDefault(Settings.EMPTY);
228243

229244
public Builder(String type) {
230245
this.type = type;
@@ -239,6 +254,7 @@ public Builder(IngestionSource ingestionSource) {
239254
this.blockingQueueSize = ingestionSource.blockingQueueSize;
240255
this.allActiveIngestion = ingestionSource.allActiveIngestion;
241256
this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval;
257+
this.mapperType = ingestionSource.mapperType;
242258
}
243259

244260
public Builder setPointerInitReset(PointerInitReset pointerInitReset) {
@@ -291,6 +307,11 @@ public Builder setPointerBasedLagUpdateInterval(TimeValue pointerBasedLagUpdateI
291307
return this;
292308
}
293309

310+
public Builder setMapperType(IngestionMessageMapper.MapperType mapperType) {
311+
this.mapperType = mapperType;
312+
return this;
313+
}
314+
294315
public IngestionSource build() {
295316
return new IngestionSource(
296317
type,
@@ -302,7 +323,8 @@ public IngestionSource build() {
302323
numProcessorThreads,
303324
blockingQueueSize,
304325
allActiveIngestion,
305-
pointerBasedLagUpdateInterval
326+
pointerBasedLagUpdateInterval,
327+
mapperType
306328
);
307329
}
308330

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
285285
IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING,
286286
IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING,
287287
IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING,
288+
IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING,
288289

289290
// Settings for search replica
290291
IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING,

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ private void initializeStreamPoller(
146146
.numProcessorThreads(ingestionSource.getNumProcessorThreads())
147147
.blockingQueueSize(ingestionSource.getBlockingQueueSize())
148148
.pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis())
149+
.mapperType(ingestionSource.getMapperType())
149150
.build();
150151
registerStreamPollerListener();
151152

0 commit comments

Comments
 (0)