@@ -778,6 +778,123 @@ public void testAllActiveIngestionPeriodicFlush() throws Exception {
778778
779779 waitForSearchableDocs (10 , Arrays .asList (nodeA ));
780780 waitForState (() -> getPeriodicFlushCount (nodeA , indexName ) >= 1 );
781+ }
782+
783+ public void testRawPayloadMapperIngestion () throws Exception {
784+ // Start cluster
785+ internalCluster ().startClusterManagerOnlyNode ();
786+ final String nodeA = internalCluster ().startDataOnlyNode ();
787+
788+ // Publish 2 valid messages
789+ String validMessage1 = "{\" name\" :\" alice\" ,\" age\" :30}" ;
790+ String validMessage2 = "{\" name\" :\" bob\" ,\" age\" :25}" ;
791+ produceData (validMessage1 );
792+ produceData (validMessage2 );
793+
794+ // Create index with raw_payload mapper
795+ createIndex (
796+ indexName ,
797+ Settings .builder ()
798+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
799+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
800+ .put ("ingestion_source.type" , "kafka" )
801+ .put ("ingestion_source.param.topic" , topicName )
802+ .put ("ingestion_source.param.bootstrap_servers" , kafka .getBootstrapServers ())
803+ .put ("ingestion_source.pointer.init.reset" , "earliest" )
804+ .put ("ingestion_source.mapper_type" , "raw_payload" )
805+ .put ("ingestion_source.error_strategy" , "drop" )
806+ .put ("ingestion_source.all_active" , true )
807+ .build (),
808+ "{\" properties\" :{\" name\" :{\" type\" : \" text\" },\" age\" :{\" type\" : \" integer\" }}}}"
809+ );
810+
811+ ensureGreen (indexName );
812+
813+ // Wait for both messages to be indexed
814+ waitForSearchableDocs (2 , List .of (nodeA ));
815+
816+ // Verify stats show 2 processed messages
817+ waitForState (() -> {
818+ PollingIngestStats stats = client ().admin ().indices ().prepareStats (indexName ).get ().getIndex (indexName ).getShards ()[0 ]
819+ .getPollingIngestStats ();
820+ return stats != null
821+ && stats .getMessageProcessorStats ().totalProcessedCount () == 2L
822+ && stats .getConsumerStats ().totalPolledCount () == 2L
823+ && stats .getConsumerStats ().totalPollerMessageFailureCount () == 0L
824+ && stats .getConsumerStats ().totalPollerMessageDroppedCount () == 0L
825+ && stats .getMessageProcessorStats ().totalInvalidMessageCount () == 0L ;
826+ });
827+
828+ // Validate document content
829+ SearchResponse searchResponse = client ().prepareSearch (indexName ).get ();
830+ assertEquals (2 , searchResponse .getHits ().getHits ().length );
831+ for (int i = 0 ; i < searchResponse .getHits ().getHits ().length ; i ++) {
832+ Map <String , Object > source = searchResponse .getHits ().getHits ()[i ].getSourceAsMap ();
833+ assertTrue (source .containsKey ("name" ));
834+ assertTrue (source .containsKey ("age" ));
835+ }
836+
837+ // Publish invalid JSON message
838+ String invalidJsonMessage = "{ invalid json" ;
839+ produceData (invalidJsonMessage );
840+
841+ // Wait for consumer to encounter the error and drop it
842+ waitForState (() -> {
843+ PollingIngestStats stats = client ().admin ().indices ().prepareStats (indexName ).get ().getIndex (indexName ).getShards ()[0 ]
844+ .getPollingIngestStats ();
845+ return stats != null
846+ && stats .getConsumerStats ().totalPolledCount () == 3L
847+ && stats .getConsumerStats ().totalPollerMessageFailureCount () == 1L
848+ && stats .getConsumerStats ().totalPollerMessageDroppedCount () == 1L
849+ && stats .getMessageProcessorStats ().totalProcessedCount () == 2L ;
850+ });
851+
852+ // Publish message with invalid content that will fail at processor level
853+ String invalidFieldTypeMessage = "{\" name\" :123,\" age\" :\" not a number\" }" ;
854+ produceData (invalidFieldTypeMessage );
855+
856+ // Wait for processor to encounter the error
857+ waitForState (() -> {
858+ PollingIngestStats stats = client ().admin ().indices ().prepareStats (indexName ).get ().getIndex (indexName ).getShards ()[0 ]
859+ .getPollingIngestStats ();
860+ return stats != null
861+ && stats .getConsumerStats ().totalPolledCount () == 4L
862+ && stats .getConsumerStats ().totalPollerMessageFailureCount () == 1L
863+ && stats .getMessageProcessorStats ().totalProcessedCount () == 3L
864+ && stats .getMessageProcessorStats ().totalFailedCount () == 1L
865+ && stats .getMessageProcessorStats ().totalFailuresDroppedCount () == 1L ;
866+ });
867+
868+ // Pause ingestion, reset to offset 0, and resume
869+ pauseIngestion (indexName );
870+ waitForState (() -> {
871+ GetIngestionStateResponse ingestionState = getIngestionState (indexName );
872+ return ingestionState .getShardStates ().length == 1
873+ && ingestionState .getFailedShards () == 0
874+ && ingestionState .getShardStates ()[0 ].isPollerPaused ()
875+ && ingestionState .getShardStates ()[0 ].getPollerState ().equalsIgnoreCase ("paused" );
876+ });
877+
878+ // Resume with reset to offset 0 (will re-process the 2 valid messages)
879+ resumeIngestion (indexName , 0 , ResumeIngestionRequest .ResetSettings .ResetMode .OFFSET , "0" );
880+ waitForState (() -> {
881+ GetIngestionStateResponse ingestionState = getIngestionState (indexName );
882+ return ingestionState .getShardStates ().length == 1
883+ && ingestionState .getShardStates ()[0 ].isPollerPaused () == false
884+ && (ingestionState .getShardStates ()[0 ].getPollerState ().equalsIgnoreCase ("polling" )
885+ || ingestionState .getShardStates ()[0 ].getPollerState ().equalsIgnoreCase ("processing" ));
886+ });
781887
888+ // Wait for the 3 messages to be processed by the processor after reset (1 will be dropped by the poller)
889+ waitForState (() -> {
890+ PollingIngestStats stats = client ().admin ().indices ().prepareStats (indexName ).get ().getIndex (indexName ).getShards ()[0 ]
891+ .getPollingIngestStats ();
892+ return stats != null && stats .getMessageProcessorStats ().totalProcessedCount () == 3L ;
893+ });
894+
895+ // Verify still only 2 documents (no duplicates must be indexed)
896+ RangeQueryBuilder query = new RangeQueryBuilder ("age" ).gte (0 );
897+ SearchResponse response = client ().prepareSearch (indexName ).setQuery (query ).get ();
898+ assertThat (response .getHits ().getTotalHits ().value (), is (2L ));
782899 }
783900}
0 commit comments