1+ package org .opensearch .dataprepper .plugins .source .source_crawler .base ;
2+
3+ import com .google .common .annotations .VisibleForTesting ;
4+ import io .micrometer .core .instrument .Counter ;
5+ import io .micrometer .core .instrument .Timer ;
6+ import lombok .Setter ;
7+ import org .opensearch .dataprepper .metrics .PluginMetrics ;
8+ import org .opensearch .dataprepper .model .acknowledgements .AcknowledgementSet ;
9+ import org .opensearch .dataprepper .model .acknowledgements .AcknowledgementSetManager ;
10+ import org .opensearch .dataprepper .model .buffer .Buffer ;
11+ import org .opensearch .dataprepper .model .event .Event ;
12+ import org .opensearch .dataprepper .model .record .Record ;
13+ import org .opensearch .dataprepper .model .source .coordinator .enhanced .EnhancedSourceCoordinator ;
14+ import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .partition .LeaderPartition ;
15+ import org .opensearch .dataprepper .plugins .source .source_crawler .coordination .state .TokenPaginationCrawlerLeaderProgressState ;
16+ import org .opensearch .dataprepper .plugins .source .source_crawler .model .ItemInfo ;
17+ import org .slf4j .Logger ;
18+ import org .slf4j .LoggerFactory ;
19+
20+ import javax .inject .Named ;
21+ import java .time .Duration ;
22+ import java .time .Instant ;
23+ import java .util .ArrayList ;
24+ import java .util .Iterator ;
25+ import java .util .List ;
26+ import java .util .concurrent .TimeUnit ;
27+ import java .util .concurrent .atomic .AtomicBoolean ;
28+
29+ @ Named
30+ public class LeaderOnlyTokenCrawler implements Crawler <SaasWorkerProgressState > {
31+ private static final Logger log = LoggerFactory .getLogger (LeaderOnlyTokenCrawler .class );
32+ private static final Duration NO_ACK_TIME_OUT_SECONDS = Duration .ofSeconds (900 );
33+ private static final Duration CHECKPOINT_INTERVAL = Duration .ofMinutes (1 );
34+ private static final Duration DEFAULT_LEASE_DURATION = Duration .ofMinutes (15 );
35+ private static final int BATCH_SIZE = 1000 ;
36+
37+ private static final String METRIC_BATCHES_FAILED = "batchesFailed" ;
38+ private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime" ;
39+ public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses" ;
40+ public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures" ;
41+
42+ private final LeaderOnlyTokenCrawlerClient client ;
43+ private final Timer crawlingTimer ;
44+ private final PluginMetrics pluginMetrics ;
45+ @ Setter
46+ private boolean acknowledgementsEnabled ;
47+ @ Setter
48+ private AcknowledgementSetManager acknowledgementSetManager ;
49+ @ Setter
50+ private Buffer <Record <Event >> buffer ;
51+ private final Counter batchesFailedCounter ;
52+ private final Counter acknowledgementSetSuccesses ;
53+ private final Counter acknowledgementSetFailures ;
54+ private final Timer bufferWriteTimer ;
55+
56+ private String lastToken ;
57+ private boolean shouldStopCrawl = false ;
58+ private Duration noAckTimeout ;
59+
60+ public LeaderOnlyTokenCrawler (
61+ LeaderOnlyTokenCrawlerClient client ,
62+ PluginMetrics pluginMetrics ) {
63+ this .client = client ;
64+ this .pluginMetrics = pluginMetrics ;
65+ this .crawlingTimer = pluginMetrics .timer ("crawlingTime" );
66+ this .batchesFailedCounter = pluginMetrics .counter (METRIC_BATCHES_FAILED );
67+ this .bufferWriteTimer = pluginMetrics .timer (METRIC_BUFFER_WRITE_TIME );
68+ this .acknowledgementSetSuccesses = pluginMetrics .counter (ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME );
69+ this .acknowledgementSetFailures = pluginMetrics .counter (ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME );
70+ this .noAckTimeout = NO_ACK_TIME_OUT_SECONDS ;
71+ }
72+
73+ @ Override
74+ public Instant crawl (LeaderPartition leaderPartition ,
75+ EnhancedSourceCoordinator coordinator ) {
76+ shouldStopCrawl = false ;
77+ long startTime = System .currentTimeMillis ();
78+ Instant lastCheckpointTime = Instant .now ();
79+ TokenPaginationCrawlerLeaderProgressState leaderProgressState =
80+ (TokenPaginationCrawlerLeaderProgressState ) leaderPartition .getProgressState ().get ();
81+ lastToken = leaderProgressState .getLastToken ();
82+
83+ log .info ("Starting leader-only crawl with token: {}" , lastToken );
84+
85+ Iterator <ItemInfo > itemIterator = client .listItems (lastToken );
86+
87+ while (itemIterator .hasNext () && !shouldStopCrawl ) {
88+ List <ItemInfo > batch = collectBatch (itemIterator );
89+ if (batch .isEmpty ()) {
90+ continue ;
91+ }
92+
93+ ItemInfo lastItem = batch .get (batch .size () - 1 );
94+ lastToken = lastItem .getItemId ();
95+
96+ try {
97+ processBatch (batch , leaderPartition , coordinator );
98+ } catch (Exception e ) {
99+ batchesFailedCounter .increment ();
100+ log .error ("Failed to process batch ending with token {}" , lastToken , e );
101+ throw e ;
102+ }
103+
104+ // Periodic checkpoint if not using acknowledgments
105+ if (!acknowledgementsEnabled &&
106+ Duration .between (lastCheckpointTime , Instant .now ()).compareTo (CHECKPOINT_INTERVAL ) >= 0 ) {
107+ updateLeaderProgressState (leaderPartition , lastToken , coordinator );
108+ lastCheckpointTime = Instant .now ();
109+ }
110+ }
111+
112+ // Final flush of any remaining items
113+ if (!acknowledgementsEnabled ) {
114+ updateLeaderProgressState (leaderPartition , lastToken , coordinator );
115+ }
116+
117+ long crawlTimeMillis = System .currentTimeMillis () - startTime ;
118+ log .debug ("Crawling completed in {} ms" , crawlTimeMillis );
119+ crawlingTimer .record (crawlTimeMillis , TimeUnit .MILLISECONDS );
120+ return Instant .now ();
121+ }
122+
123+ @ Override
124+ public void executePartition (SaasWorkerProgressState state , Buffer buffer , AcknowledgementSet acknowledgementSet ) {
125+
126+ }
127+
128+ private List <ItemInfo > collectBatch (Iterator <ItemInfo > iterator ) {
129+ List <ItemInfo > batch = new ArrayList <>();
130+ for (int i = 0 ; i < BATCH_SIZE && iterator .hasNext (); i ++) {
131+ ItemInfo item = iterator .next ();
132+ if (item != null ) {
133+ batch .add (item );
134+ }
135+ }
136+ return batch ;
137+ }
138+
139+ private void processBatch (List <ItemInfo > batch ,
140+ LeaderPartition leaderPartition ,
141+ EnhancedSourceCoordinator coordinator ) {
142+ if (acknowledgementsEnabled ) {
143+ AtomicBoolean ackReceived = new AtomicBoolean (false );
144+ long createTimestamp = System .currentTimeMillis ();
145+ AcknowledgementSet acknowledgementSet = acknowledgementSetManager .create (
146+ success -> {
147+ ackReceived .set (true );
148+ if (success ) {
149+ // On success: update checkpoint
150+ acknowledgementSetSuccesses .increment ();
151+ updateLeaderProgressState (leaderPartition , lastToken , coordinator );
152+ } else {
153+ // On failure: Stop the crawl
154+ acknowledgementSetFailures .increment ();
155+ log .warn ("Batch processing received negative acknowledgment for token: {}. Stopping current crawl." , lastToken );
156+ shouldStopCrawl = true ;
157+ }
158+ },
159+ noAckTimeout
160+ );
161+
162+ bufferWriteTimer .record (() -> {
163+ try {
164+ client .writeBatchToBuffer (batch , buffer , acknowledgementSet );
165+ acknowledgementSet .complete ();
166+ // Check every 15 seconds until either:
167+ // 1. We get an ack (positive/negative)
168+ // 2. Or timeout duration is reached
169+ while (!ackReceived .get ()) {
170+ Thread .sleep (Duration .ofSeconds (15 ).toMillis ());
171+ Duration ackWaitDuration = Duration .between (Instant .ofEpochMilli (createTimestamp ), Instant .now ());
172+
173+ if (!ackWaitDuration .minus (noAckTimeout ).isNegative ()) {
174+ // No ack received within NO_ACK_TIME_OUT_SECONDS
175+ log .warn ("Acknowledgment not received for batch with token {} past wait time. Stopping current crawl." , lastToken );
176+ shouldStopCrawl = true ;
177+ break ;
178+ }
179+ }
180+ } catch (InterruptedException e ) {
181+ Thread .currentThread ().interrupt ();
182+ throw new RuntimeException ("Interrupted while waiting for acknowledgment" , e );
183+ } catch (Exception e ) {
184+ log .error ("Failed to process batch ending with token {}" , lastToken , e );
185+ acknowledgementSet .complete ();
186+ throw e ;
187+ }
188+ });
189+ } else {
190+ // Without Acknowledgments:
191+ // Write directly and update checkpoint
192+ bufferWriteTimer .record (() -> {
193+ try {
194+ client .writeBatchToBuffer (batch , buffer , null );
195+ updateLeaderProgressState (leaderPartition , lastToken , coordinator );
196+ } catch (Exception e ) {
197+ log .error ("Failed to write batch to buffer" , e );
198+ throw e ;
199+ }
200+ });
201+ }
202+ }
203+
204+ private void updateLeaderProgressState (LeaderPartition leaderPartition ,
205+ String updatedToken ,
206+ EnhancedSourceCoordinator coordinator ) {
207+ TokenPaginationCrawlerLeaderProgressState leaderProgressState =
208+ (TokenPaginationCrawlerLeaderProgressState ) leaderPartition .getProgressState ().get ();
209+ String oldToken = leaderProgressState .getLastToken ();
210+ leaderProgressState .setLastToken (updatedToken );
211+ leaderPartition .setLeaderProgressState (leaderProgressState );
212+ coordinator .saveProgressStateForPartition (leaderPartition , DEFAULT_LEASE_DURATION );
213+ log .info ("Updated leader progress state: old lastToken={}, new lastToken={}" , oldToken , updatedToken );
214+ }
215+
216+ @ VisibleForTesting
217+ void setNoAckTimeout (Duration timeout ) {
218+ this .noAckTimeout = timeout ;
219+ }
220+ }
0 commit comments