From 04e5cb091c7c0d650ddc7c9538a3abf72e6414bb Mon Sep 17 00:00:00 2001
From: Alekhya Parisha
Date: Tue, 28 Oct 2025 18:25:24 -0700
Subject: [PATCH] Implement LeaderOnlyTokenCrawler

Signed-off-by: Alekhya Parisha
---
 .../base/LeaderOnlyTokenCrawler.java          | 220 ++++++++++++++++++
 .../base/LeaderOnlyTokenCrawlerClient.java    |  28 +++
 .../base/LeaderOnlyTokenCrawlerTest.java      | 198 ++++++++++++++++
 3 files changed, 446 insertions(+)
 create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java
 create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerClient.java
 create mode 100644 data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerTest.java

diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java
new file mode 100644
index 0000000000..4c2cd7adf0
--- /dev/null
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawler.java
@@ -0,0 +1,220 @@
+package org.opensearch.dataprepper.plugins.source.source_crawler.base;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Timer;
+import lombok.Setter;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
+import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Named;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Named
+public class LeaderOnlyTokenCrawler implements Crawler {
+    private static final Logger log = LoggerFactory.getLogger(LeaderOnlyTokenCrawler.class);
+    private static final Duration NO_ACK_TIME_OUT_SECONDS = Duration.ofSeconds(900);
+    private static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(1);
+    private static final Duration DEFAULT_LEASE_DURATION = Duration.ofMinutes(15);
+    private static final int BATCH_SIZE = 1000;
+
+    private static final String METRIC_BATCHES_FAILED = "batchesFailed";
+    private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime";
+    public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
+    public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
+
+    private final LeaderOnlyTokenCrawlerClient client;
+    private final Timer crawlingTimer;
+    private final PluginMetrics pluginMetrics;
+    @Setter
+    private boolean acknowledgementsEnabled;
+    @Setter
+    private AcknowledgementSetManager acknowledgementSetManager;
+    @Setter
+    private Buffer<Record<Event>> buffer;
+    private final Counter batchesFailedCounter;
+    private final Counter acknowledgementSetSuccesses;
+    private final Counter acknowledgementSetFailures;
+    private final Timer bufferWriteTimer;
+
+    private String lastToken;
+    private boolean shouldStopCrawl = false;
+    private Duration noAckTimeout;
+
+    public LeaderOnlyTokenCrawler(
+            LeaderOnlyTokenCrawlerClient client,
+            PluginMetrics pluginMetrics) {
+        this.client = client;
+        this.pluginMetrics = pluginMetrics;
+        this.crawlingTimer = pluginMetrics.timer("crawlingTime");
+        this.batchesFailedCounter = pluginMetrics.counter(METRIC_BATCHES_FAILED);
+        this.bufferWriteTimer = pluginMetrics.timer(METRIC_BUFFER_WRITE_TIME);
+        this.acknowledgementSetSuccesses = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME);
+        this.acknowledgementSetFailures = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME);
+        this.noAckTimeout = NO_ACK_TIME_OUT_SECONDS;
+    }
+
+    @Override
+    public Instant crawl(LeaderPartition leaderPartition,
+                         EnhancedSourceCoordinator coordinator) {
+        shouldStopCrawl = false;
+        long startTime = System.currentTimeMillis();
+        Instant lastCheckpointTime = Instant.now();
+        TokenPaginationCrawlerLeaderProgressState leaderProgressState =
+                (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
+        lastToken = leaderProgressState.getLastToken();
+
+        log.info("Starting leader-only crawl with token: {}", lastToken);
+
+        Iterator<ItemInfo> itemIterator = client.listItems(lastToken);
+
+        while (itemIterator.hasNext() && !shouldStopCrawl) {
+            List<ItemInfo> batch = collectBatch(itemIterator);
+            if (batch.isEmpty()) {
+                continue;
+            }
+
+            ItemInfo lastItem = batch.get(batch.size() - 1);
+            lastToken = lastItem.getItemId();
+
+            try {
+                processBatch(batch, leaderPartition, coordinator);
+            } catch (Exception e) {
+                batchesFailedCounter.increment();
+                log.error("Failed to process batch ending with token {}", lastToken, e);
+                throw e;
+            }
+
+            // Periodic checkpoint if not using acknowledgments
+            if (!acknowledgementsEnabled &&
+                    Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) >= 0) {
+                updateLeaderProgressState(leaderPartition, lastToken, coordinator);
+                lastCheckpointTime = Instant.now();
+            }
+        }
+
+        // Final checkpoint of the last processed token when acknowledgments are disabled
+        if (!acknowledgementsEnabled) {
+            updateLeaderProgressState(leaderPartition, lastToken, coordinator);
+        }
+
+        long crawlTimeMillis = System.currentTimeMillis() - startTime;
+        log.debug("Crawling completed in {} ms", crawlTimeMillis);
+        crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
+        return Instant.now();
+    }
+
+    @Override
+    public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
+        // No-op: this crawler writes to the buffer directly from the leader and never creates worker partitions.
+    }
+
+    private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {
+        List<ItemInfo> batch = new ArrayList<>();
+        for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
+            ItemInfo item = iterator.next();
+            if (item != null) {
+                batch.add(item);
+            }
+        }
+        return batch;
+    }
+
+    private void processBatch(List<ItemInfo> batch,
+                              LeaderPartition leaderPartition,
+                              EnhancedSourceCoordinator coordinator) {
+        if (acknowledgementsEnabled) {
+            AtomicBoolean ackReceived = new AtomicBoolean(false);
+            long createTimestamp = System.currentTimeMillis();
+            AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
+                    success -> {
+                        ackReceived.set(true);
+                        if (success) {
+                            // On success: update the checkpoint
+                            acknowledgementSetSuccesses.increment();
+                            updateLeaderProgressState(leaderPartition, lastToken, coordinator);
+                        } else {
+                            // On failure: stop the crawl
+                            acknowledgementSetFailures.increment();
+                            log.warn("Batch processing received negative acknowledgment for token: {}. Stopping current crawl.", lastToken);
+                            shouldStopCrawl = true;
+                        }
+                    },
+                    noAckTimeout
+            );
+
+            bufferWriteTimer.record(() -> {
+                try {
+                    client.writeBatchToBuffer(batch, buffer, acknowledgementSet);
+                    acknowledgementSet.complete();
+                    // Check every 15 seconds until either:
+                    // 1. an ack (positive or negative) arrives, or
+                    // 2. the noAckTimeout duration is reached
+                    while (!ackReceived.get()) {
+                        Thread.sleep(Duration.ofSeconds(15).toMillis());
+                        Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(createTimestamp), Instant.now());
+
+                        if (!ackWaitDuration.minus(noAckTimeout).isNegative()) {
+                            // No ack received within noAckTimeout
+                            log.warn("Acknowledgment not received for batch with token {} past wait time. Stopping current crawl.", lastToken);
+                            shouldStopCrawl = true;
+                            break;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while waiting for acknowledgment", e);
+                } catch (Exception e) {
+                    log.error("Failed to process batch ending with token {}", lastToken, e);
+                    acknowledgementSet.complete();
+                    throw e;
+                }
+            });
+        } else {
+            // Without acknowledgments: write directly and update the checkpoint
+            bufferWriteTimer.record(() -> {
+                try {
+                    client.writeBatchToBuffer(batch, buffer, null);
+                    updateLeaderProgressState(leaderPartition, lastToken, coordinator);
+                } catch (Exception e) {
+                    log.error("Failed to write batch to buffer", e);
+                    throw e;
+                }
+            });
+        }
+    }
+
+    private void updateLeaderProgressState(LeaderPartition leaderPartition,
+                                           String updatedToken,
+                                           EnhancedSourceCoordinator coordinator) {
+        TokenPaginationCrawlerLeaderProgressState leaderProgressState =
+                (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
+        String oldToken = leaderProgressState.getLastToken();
+        leaderProgressState.setLastToken(updatedToken);
+        leaderPartition.setLeaderProgressState(leaderProgressState);
+        coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_LEASE_DURATION);
+        log.info("Updated leader progress state: old lastToken={}, new lastToken={}", oldToken, updatedToken);
+    }
+
+    @VisibleForTesting
+    void setNoAckTimeout(Duration timeout) {
+        this.noAckTimeout = timeout;
+    }
+}
\ No newline at end of file
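
Note: for orientation, this is a minimal sketch of how a source plugin's leader loop might drive the crawler above. The scheduler class, its polling interval, and the assumption that the LEADER partition has already been acquired are illustrative, not part of this patch:

    import java.time.Duration;
    import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
    import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;

    // Hypothetical wiring sketch: runs LeaderOnlyTokenCrawler on a leader thread.
    public class ExampleLeaderScheduler implements Runnable {
        private final LeaderOnlyTokenCrawler crawler;
        private final EnhancedSourceCoordinator coordinator;
        private final LeaderPartition leaderPartition; // assumed already acquired via the coordinator

        public ExampleLeaderScheduler(LeaderOnlyTokenCrawler crawler,
                                      EnhancedSourceCoordinator coordinator,
                                      LeaderPartition leaderPartition) {
            this.crawler = crawler;
            this.coordinator = coordinator;
            this.leaderPartition = leaderPartition;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                // Each pass resumes from the checkpointed token and returns once the
                // source iterator is exhausted or the crawl is stopped by a nack/timeout.
                crawler.crawl(leaderPartition, coordinator);
                try {
                    Thread.sleep(Duration.ofMinutes(5).toMillis()); // assumed poll interval
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
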
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerClient.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerClient.java
new file mode 100644
index 0000000000..c5cab4d7c2
--- /dev/null
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerClient.java
@@ -0,0 +1,28 @@
+package org.opensearch.dataprepper.plugins.source.source_crawler.base;
+
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
+import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
+
+import java.util.List;
+
+/**
+ * Interface for a leader-only, token-based crawler client that extends TokenCrawlerClient.
+ * It adds a method for writing directly to the buffer, optimized for single-leader
+ * processing without worker partitions.
+ */
+public interface LeaderOnlyTokenCrawlerClient extends TokenCrawlerClient<PaginationCrawlerWorkerProgressState> {
+    /**
+     * Writes a batch of items directly to the buffer.
+     *
+     * @param items              The batch of items to write
+     * @param buffer             The buffer to write events to
+     * @param acknowledgementSet Optional acknowledgment set for tracking write completion.
+     *                           If provided, items will be added to this set for acknowledgment tracking.
+     *                           Can be null if acknowledgments are disabled.
+     */
+    void writeBatchToBuffer(List<ItemInfo> items, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet);
+}
\ No newline at end of file
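
To illustrate the contract above, a minimal client implementation might convert each item to an event, register the event with the acknowledgment set when one is supplied, and write the whole batch in a single call. The `ExampleCrawlerClient` class, the "DOCUMENT" event type, the use of item metadata as the payload, and the write timeout are all assumptions for this sketch, not part of this patch:

    import java.util.ArrayList;
    import java.util.List;
    import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
    import org.opensearch.dataprepper.model.buffer.Buffer;
    import org.opensearch.dataprepper.model.event.Event;
    import org.opensearch.dataprepper.model.event.JacksonEvent;
    import org.opensearch.dataprepper.model.record.Record;
    import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;

    // Hypothetical sketch; abstract because the inherited TokenCrawlerClient
    // methods (e.g. listItems) are source-specific and omitted here.
    public abstract class ExampleCrawlerClient implements LeaderOnlyTokenCrawlerClient {
        private static final int WRITE_TIMEOUT_MILLIS = 5_000; // assumed timeout

        @Override
        public void writeBatchToBuffer(List<ItemInfo> items,
                                       Buffer<Record<Event>> buffer,
                                       AcknowledgementSet acknowledgementSet) {
            List<Record<Event>> records = new ArrayList<>(items.size());
            for (ItemInfo item : items) {
                Event event = JacksonEvent.builder()
                        .withEventType("DOCUMENT")
                        .withData(item.getMetadata()) // assumes metadata carries the payload
                        .build();
                if (acknowledgementSet != null) {
                    // Track each event so the set can deliver a positive or negative ack later.
                    acknowledgementSet.add(event);
                }
                records.add(new Record<>(event));
            }
            try {
                buffer.writeAll(records, WRITE_TIMEOUT_MILLIS);
            } catch (Exception e) {
                throw new RuntimeException("Failed to write batch to buffer", e);
            }
        }
    }
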
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerTest.java
new file mode 100644
index 0000000000..d5faa93862
--- /dev/null
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/LeaderOnlyTokenCrawlerTest.java
@@ -0,0 +1,198 @@
+package org.opensearch.dataprepper.plugins.source.source_crawler.base;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
+import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.record.Record;
+import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
+import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
+import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+@ExtendWith(MockitoExtension.class)
+class LeaderOnlyTokenCrawlerTest {
+    private static final int BATCH_SIZE = 1000;
+    private static final String INITIAL_TOKEN = "initial-token";
+    private static final Duration TEST_TIMEOUT = Duration.ofMillis(100);
+
+    @Mock
+    private LeaderOnlyTokenCrawlerClient client;
+    @Mock
+    private EnhancedSourceCoordinator coordinator;
+    @Mock
+    private LeaderPartition leaderPartition;
+    @Mock
+    private AcknowledgementSetManager acknowledgementSetManager;
+    @Mock
+    private AcknowledgementSet acknowledgementSet;
+    @Mock
+    private Buffer<Record<Event>> buffer;
+
+    private LeaderOnlyTokenCrawler crawler;
+    private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler");
+
+    @BeforeEach
+    void setup() {
+        crawler = new LeaderOnlyTokenCrawler(client, pluginMetrics);
+        crawler.setAcknowledgementSetManager(acknowledgementSetManager);
+        crawler.setBuffer(buffer);
+        crawler.setNoAckTimeout(TEST_TIMEOUT); // Use a short timeout in tests
+        when(leaderPartition.getProgressState())
+                .thenReturn(Optional.of(new TokenPaginationCrawlerLeaderProgressState(INITIAL_TOKEN)));
+    }
+
+    @Test
+    void testCrawlWithEmptyList() {
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(Collections.emptyIterator());
+        crawler.crawl(leaderPartition, coordinator);
+        verify(client, never()).writeBatchToBuffer(any(), any(), any());
+    }
+
+    @Test
+    void testSimpleCrawl() {
+        List<ItemInfo> items = createTestItems(1);
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
+        crawler.crawl(leaderPartition, coordinator);
+        verify(client).writeBatchToBuffer(items, buffer, null);
+    }
+
+    @Test
+    void testCrawlWithAcknowledgmentsEnabled() {
+        List<ItemInfo> items = createTestItems(1);
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
+        when(acknowledgementSetManager.create(any(), eq(TEST_TIMEOUT)))
+                .thenReturn(acknowledgementSet);
+
+        ArgumentCaptor<Consumer<Boolean>> callbackCaptor = ArgumentCaptor.forClass(Consumer.class);
+
+        crawler.setAcknowledgementsEnabled(true);
+        crawler.crawl(leaderPartition, coordinator);
+
+        verify(acknowledgementSetManager).create(callbackCaptor.capture(), eq(TEST_TIMEOUT));
+
+        // Simulate an immediate successful acknowledgment
+        callbackCaptor.getValue().accept(true);
+
+        verify(client).writeBatchToBuffer(items, buffer, acknowledgementSet);
+        verify(acknowledgementSet).complete();
+        verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
+    }
+
+    @Test
+    void testCrawlWithMultipleBatches() {
+        List<ItemInfo> items = createTestItems(BATCH_SIZE + 1);
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
+
+        crawler.setAcknowledgementsEnabled(false);
+        crawler.crawl(leaderPartition, coordinator);
+
+        verify(client, times(2)).writeBatchToBuffer(any(), eq(buffer), any());
+    }
+
+    @Test
+    void testBufferWriteFailure() {
+        List<ItemInfo> items = createTestItems(1);
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
+        doThrow(new RuntimeException("Buffer write failed"))
+                .when(client).writeBatchToBuffer(any(), any(), any());
+
+        assertThrows(RuntimeException.class, () ->
+                crawler.crawl(leaderPartition, coordinator));
+    }
+
+    @Test
+    void testProgressStateUpdate() {
+        List<ItemInfo> items = createTestItems(1);
+        String newToken = "id0";
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
+
+        crawler.setAcknowledgementsEnabled(false);
+        crawler.crawl(leaderPartition, coordinator);
+
+        verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
+        TokenPaginationCrawlerLeaderProgressState state =
+                (TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
+        assertEquals(newToken, state.getLastToken());
+    }
+
+    @Test
+    void testNegativeAcknowledgment() {
+        List<ItemInfo> items = createTestItems(BATCH_SIZE + 1);
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
+        when(acknowledgementSetManager.create(any(), eq(TEST_TIMEOUT)))
+                .thenReturn(acknowledgementSet);
+
+        ArgumentCaptor<Consumer<Boolean>> callbackCaptor = ArgumentCaptor.forClass(Consumer.class);
+
+        crawler.setAcknowledgementsEnabled(true);
+        crawler.crawl(leaderPartition, coordinator);
+
+        verify(acknowledgementSetManager).create(callbackCaptor.capture(), eq(TEST_TIMEOUT));
+
+        // Simulate a negative acknowledgment
+        callbackCaptor.getValue().accept(false);
+
+        verify(client, times(1)).writeBatchToBuffer(any(), any(), any());
+        verify(coordinator, never()).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
+    }
+
+    @Test
+    void testAcknowledgmentTimeout() {
+        List<ItemInfo> items = createTestItems(BATCH_SIZE + 1);
+        when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
+        when(acknowledgementSetManager.create(any(), eq(TEST_TIMEOUT)))
+                .thenReturn(acknowledgementSet);
+
+        ArgumentCaptor<Consumer<Boolean>> callbackCaptor = ArgumentCaptor.forClass(Consumer.class);
+
+        crawler.setAcknowledgementsEnabled(true);
+        crawler.crawl(leaderPartition, coordinator);
+
+        verify(acknowledgementSetManager).create(callbackCaptor.capture(), eq(TEST_TIMEOUT));
+
+        // Verify:
+        // 1. Only the first batch was processed
+        verify(client, times(1)).writeBatchToBuffer(any(), any(), any());
+        // 2. No checkpoint update happened
+        verify(coordinator, never()).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
+        // 3. The acknowledgment set was completed
+        verify(acknowledgementSet).complete();
+    }
+
+    private List<ItemInfo> createTestItems(int count) {
+        List<ItemInfo> items = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            items.add(new TestItemInfo("id" + i, new HashMap<>(), Instant.now()));
+        }
+        return items;
+    }
+}
\ No newline at end of file
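
One design note on the acknowledgment wait in processBatch: the crawler sleeps in 15-second increments until the callback flips ackReceived or noAckTimeout elapses. The same wait can be expressed with a CountDownLatch that wakes as soon as the callback fires instead of at the next poll tick. A small self-contained sketch of that alternative (illustrative only, not what this patch implements):

    import java.time.Duration;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Illustrative alternative to the sleep/poll loop; not part of this patch.
    final class AckWaiter {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean success;

        // Suitable as the body of the callback passed to acknowledgementSetManager.create(...)
        void onAck(boolean ackSuccess) {
            success = ackSuccess;
            latch.countDown();
        }

        // Returns true only if an ack arrived within the timeout and was positive.
        // A false return covers both negative acks and timeouts, in which case the
        // caller would stop the crawl, matching the patch's behavior.
        boolean awaitPositiveAck(Duration timeout) throws InterruptedException {
            return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS) && success;
        }
    }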