LeaderOnlyTokenCrawler.java
@@ -0,0 +1,189 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import lombok.Setter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Named
public class LeaderOnlyTokenCrawler implements Crawler {
Member:
This should not use the raw type. You need Crawler<...>.
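
A minimal sketch of the parameterized declaration being requested, assuming the leader-only crawler reuses the worker progress state type from its client interface (the actual type parameters of Crawler are not visible in this diff):

public class LeaderOnlyTokenCrawler implements Crawler<PaginationCrawlerWorkerProgressState> {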

private static final Logger log = LoggerFactory.getLogger(LeaderOnlyTokenCrawler.class);
private static final Duration BUFFER_WRITE_TIMEOUT = Duration.ofSeconds(15);
Member:
Shouldn't the client be providing these values?

private static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(1);
private static final Duration DEFAULT_LEASE_DURATION = Duration.ofMinutes(15);
private static final int BATCH_SIZE = 50;

private static final String METRIC_BATCHES_FAILED = "batchesFailed";
private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime";
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";

private final LeaderOnlyTokenCrawlerClient client;
private final Timer crawlingTimer;
private final PluginMetrics pluginMetrics;
@Setter
private boolean acknowledgementsEnabled;
Contributor:
Is this being set from the Source plugin?

Contributor (author):
Yes
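
For context, a sketch of that wiring; the source plugin and its config accessor names here are hypothetical, but the setters match this class:

// In the source plugin that owns this crawler (sourceConfig accessor is hypothetical):
crawler.setAcknowledgementsEnabled(sourceConfig.isAcknowledgmentsEnabled());
crawler.setAcknowledgementSetManager(acknowledgementSetManager);
crawler.setBuffer(buffer);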

@Setter
private AcknowledgementSetManager acknowledgementSetManager;
@Setter
private Buffer<Record<Event>> buffer;
private final Counter batchesFailedCounter;
private final Counter acknowledgementSetSuccesses;
private final Counter acknowledgementSetFailures;
private final Timer bufferWriteTimer;

private String lastToken;

public LeaderOnlyTokenCrawler(
LeaderOnlyTokenCrawlerClient client,
PluginMetrics pluginMetrics) {
this.client = client;
this.pluginMetrics = pluginMetrics;
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
this.batchesFailedCounter = pluginMetrics.counter(METRIC_BATCHES_FAILED);
this.bufferWriteTimer = pluginMetrics.timer(METRIC_BUFFER_WRITE_TIME);
this.acknowledgementSetSuccesses = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME);
this.acknowledgementSetFailures = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME);
}

@Override
public Instant crawl(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator) {
long startTime = System.currentTimeMillis();
Instant lastCheckpointTime = Instant.now();
TokenPaginationCrawlerLeaderProgressState leaderProgressState =
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
lastToken = leaderProgressState.getLastToken();

log.info("Starting leader-only crawl with token: {}", lastToken);

Iterator<ItemInfo> itemIterator = client.listItems(lastToken);

while (itemIterator.hasNext()) {
List<ItemInfo> batch = collectBatch(itemIterator);
if (batch.isEmpty()) {
continue;
}

ItemInfo lastItem = batch.get(batch.size() - 1);
lastToken = lastItem.getItemId();

try {
processBatch(batch, leaderPartition, coordinator);
} catch (Exception e) {
batchesFailedCounter.increment();
log.error("Failed to process batch ending with token {}", lastToken, e);
Collaborator:
Using NOISY is preferred for this case since we are printing the entire stack trace

Member:
I'm not sure about this. This is happening at a batch level (not per-event level). And it should be more of a defensive programming catch.
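
For reference, a sketch of the NOISY suggestion, assuming the SLF4J marker from org.opensearch.dataprepper.logging.DataPrepperMarkers used elsewhere in Data Prepper:

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

// Marks the full stack trace as noisy so it can be filtered by the logging configuration.
log.error(NOISY, "Failed to process batch ending with token {}", lastToken, e);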

throw e;
}

// Periodic checkpoint if not using acknowledgments
if (!acknowledgementsEnabled &&
Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) >= 0) {
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
lastCheckpointTime = Instant.now();
}
}

// Final checkpoint of the last processed token
if (!acknowledgementsEnabled) {
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
}

long crawlTimeMillis = System.currentTimeMillis() - startTime;
log.debug("Crawling completed in {} ms", crawlTimeMillis);
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
return Instant.now();
}

@Override
public void executePartition(SaasWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
    // No-op: this leader-only crawler writes to the buffer directly and never creates worker partitions.
}

private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {
List<ItemInfo> batch = new ArrayList<>();
for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
ItemInfo item = iterator.next();
if (item != null) {
batch.add(item);
}
}
return batch;
}

private void processBatch(List<ItemInfo> batch,
LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator) {
if (acknowledgementsEnabled) {
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
success -> {
if (success) {
// On success: update checkpoint
Contributor:
Do we need these plugin metrics?

Contributor (author):
Added these metrics

acknowledgementSetSuccesses.increment();
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
Contributor:
String is not a primitive, so the lambda captures the field, and updateLeaderProgressState here runs in an async callback that might be invoked a while later.

Is it possible that lastToken has already changed to a different value by the time it runs?

Contributor (author):
The callback is processed right after writing to the buffer, and before processing the next batch. Since we only update lastToken when processing a new batch, its value will remain consistent during the acknowledgment handling for the current batch.
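
Even so, a defensive variant would snapshot the token into an effectively final local before creating the acknowledgement set, so the callback no longer reads the mutable field; a sketch:

final String batchToken = lastToken; // snapshot for this batch's callback
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
        success -> {
            if (success) {
                // Checkpoint the snapshot, not whatever lastToken holds when the callback fires.
                updateLeaderProgressState(leaderPartition, batchToken, coordinator);
            }
            // failure handling unchanged
        },
        BUFFER_WRITE_TIMEOUT);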

Contributor:
I see. Then it could be a big performance risk, because it waits until the events are ingested before moving to the next batch.

Say the buffer wait time is 30 seconds; that means we can only process 50 events every 30 seconds?

If that is the case, shall we increase the batch size?

Contributor:
If we need to increase the batch size, we need a reasonably high number that ensures a minimum throughput without breaching the buffer size.

Contributor (author):
@san81 Do you have any recommendation for the batch size?

Collaborator:
It is hard to just name one number. I would say, really test with different batch sizes and see what gives the best performance. The factors to consider are:

  • vendor API support for max page size, and latency with respect to the increase in page size
  • size of the API response payload with respect to the increase in page size. Just giving an example here: for S3-based pipelines, OSI pipelines process at least 20 Mbps per OCU.
  • in these 3P connector pipelines, I don't think we will ever fill up the buffer. Each OCU comes with an 8 GB buffer. The biggest bottlenecks are vendor API latency and network latency.

Contributor:
If that is the case, I think we can easily go up to a 5000-event batch for now, and we will run a load test to verify it works without problems. The max event size is no more than 10 KB, so the total is at most 50 MB, way less than 8 GB.

Contributor:
After rethinking this, we should make it configurable per connector.

For Okta, each API call can return at most 100 events, so a 5000-event batch requires 50 calls, which can easily run into a failure mode where one API failure blocks the other 49 calls. A reasonable balance is 5 to 10 calls.
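
A minimal sketch of that per-connector configuration; the getBatchSize() accessor on the client is hypothetical and not part of this PR:

private static final int DEFAULT_BATCH_SIZE = 50;

// Resolve the batch size from the connector client, e.g. roughly 5-10 vendor API pages
// (for Okta, 100 events per call), falling back to the current default.
private int resolveBatchSize() {
    Integer configured = client.getBatchSize(); // hypothetical per-connector accessor
    return configured != null ? configured : DEFAULT_BATCH_SIZE;
}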

} else {
// On failure: give up partition
acknowledgementSetFailures.increment();
log.error("Batch processing failed for token: {}", lastToken);
coordinator.giveUpPartition(leaderPartition);
Contributor:
I'm not sure we should be giving up the partition: it indicates the partition is shutting down, and for the leader partition that would mean the entire plugin is shutting down, which is not the case. From the giveUpPartition javadoc:

Should be called by the source when it is shutting down to indicate that it will no longer be able to perform work on partitions

Collaborator:
@alparish what happens if acknowledgements do not come back? Do you have a retry mechanism?

Collaborator:
I agree with @bbenner7635. Giving up is probably not right here. Please test it locally to confirm the behavior.

Collaborator (@kkondaka, Oct 16, 2025):

Ideally, acknowledgement timeouts should be very large (or INT_MAX), because retrying events that have not been acknowledged means duplicate events will be sent. And if buffer or sink issues caused the acknowledgements to time out, then injecting the same events again will only make the problem worse.
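
A sketch of that suggestion: create the acknowledgement set with a separate, effectively unbounded timeout instead of reusing the 15-second BUFFER_WRITE_TIMEOUT (the constant name is illustrative):

// Effectively unbounded acknowledgement timeout, per the comment above (name illustrative).
private static final Duration ACKNOWLEDGEMENT_TIMEOUT = Duration.ofSeconds(Integer.MAX_VALUE);

AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(callback, ACKNOWLEDGEMENT_TIMEOUT);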

}
},
BUFFER_WRITE_TIMEOUT
);

bufferWriteTimer.record(() -> {
try {
client.writeBatchToBuffer(batch, buffer, acknowledgementSet);
acknowledgementSet.complete();
} catch (Exception e) {
log.error("Failed to write batch to buffer", e);
acknowledgementSet.complete(); // This will trigger the failure callback
throw e;
}
});
} else {
// Without Acknowledgments:
// Write directly and update checkpoint
bufferWriteTimer.record(() -> {
try {
client.writeBatchToBuffer(batch, buffer, null);
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
} catch (Exception e) {
log.error("Failed to write batch to buffer", e);
throw e;
}
});
}
}

private void updateLeaderProgressState(LeaderPartition leaderPartition,
String updatedToken,
EnhancedSourceCoordinator coordinator) {
TokenPaginationCrawlerLeaderProgressState leaderProgressState =
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
String oldToken = leaderProgressState.getLastToken();
leaderProgressState.setLastToken(updatedToken);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_LEASE_DURATION);
log.info("Updated leader progress state: old lastToken={}, new lastToken={}", oldToken, updatedToken);
}
}
LeaderOnlyTokenCrawlerClient.java
@@ -0,0 +1,28 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;

import java.util.List;

/**
* Interface for leader-only token-based crawler client that extends TokenCrawlerClient.
* This interface adds an additional method for direct buffer writing,
* optimized for single-leader processing without worker partitions.
*/
public interface LeaderOnlyTokenCrawlerClient extends TokenCrawlerClient<PaginationCrawlerWorkerProgressState> {
/**
* Writes a batch of items directly to the buffer.
*
* @param items The batch of items to write
* @param buffer The buffer to write events to
* @param acknowledgementSet Optional acknowledgment set for tracking write completion.
* If provided, items will be added to this set for acknowledgment tracking.
* Can be null if acknowledgments are disabled.
*/
void writeBatchToBuffer(List<ItemInfo> items, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet);
}
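
A minimal sketch of an implementation of this interface, assuming a connector-specific toEvent conversion (hypothetical) and a local BUFFER_WRITE_TIMEOUT constant; events are added to the acknowledgement set before the buffer write so their completion is tracked:

@Override
public void writeBatchToBuffer(List<ItemInfo> items, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
    List<Record<Event>> records = new ArrayList<>();
    for (ItemInfo item : items) {
        Event event = toEvent(item); // connector-specific conversion (hypothetical)
        if (acknowledgementSet != null) {
            acknowledgementSet.add(event); // register for acknowledgment tracking
        }
        records.add(new Record<>(event));
    }
    try {
        buffer.writeAll(records, (int) BUFFER_WRITE_TIMEOUT.toMillis());
    } catch (Exception e) {
        throw new RuntimeException("Failed to write batch to buffer", e);
    }
}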
LeaderOnlyTokenCrawlerTest.java
@@ -0,0 +1,143 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;

import static org.mockito.internal.verification.VerificationModeFactory.times;

@ExtendWith(MockitoExtension.class)
class LeaderOnlyTokenCrawlerTest {
private static final int BATCH_SIZE = 50;
private static final String INITIAL_TOKEN = "initial-token";
private static final Duration BUFFER_WRITE_TIMEOUT = Duration.ofSeconds(15);

@Mock
private LeaderOnlyTokenCrawlerClient client;
@Mock
private EnhancedSourceCoordinator coordinator;
@Mock
private LeaderPartition leaderPartition;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private AcknowledgementSet acknowledgementSet;
@Mock
private Buffer<Record<Event>> buffer;

private LeaderOnlyTokenCrawler crawler;
private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler");

@BeforeEach
void setup() {
crawler = new LeaderOnlyTokenCrawler(client, pluginMetrics);
crawler.setAcknowledgementSetManager(acknowledgementSetManager);
crawler.setBuffer(buffer);
when(leaderPartition.getProgressState())
.thenReturn(Optional.of(new TokenPaginationCrawlerLeaderProgressState(INITIAL_TOKEN)));
}

@Test
void testCrawlWithEmptyList() {
when(client.listItems(INITIAL_TOKEN)).thenReturn(Collections.emptyIterator());
crawler.crawl(leaderPartition, coordinator);
verify(client, never()).writeBatchToBuffer(any(), any(), any());
}

@Test
void testSimpleCrawl() {
List<ItemInfo> items = createTestItems(1);
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
crawler.crawl(leaderPartition, coordinator);
verify(client).writeBatchToBuffer(items, buffer, null);
}

@Test
void testCrawlWithAcknowledgmentsEnabled() {
List<ItemInfo> items = createTestItems(1);
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
when(acknowledgementSetManager.create(any(), any(Duration.class)))
.thenReturn(acknowledgementSet);

crawler.setAcknowledgementsEnabled(true);
crawler.crawl(leaderPartition, coordinator);

verify(client).writeBatchToBuffer(items, buffer, acknowledgementSet);
verify(acknowledgementSet).complete();
}

@Test
void testCrawlWithMultipleBatches() {
List<ItemInfo> items = createTestItems(BATCH_SIZE + 1); // Create more than one batch
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());

crawler.setAcknowledgementsEnabled(false);
crawler.crawl(leaderPartition, coordinator);

verify(client, times(2)).writeBatchToBuffer(any(), eq(buffer), any());
}

@Test
void testBufferWriteFailure() {
List<ItemInfo> items = createTestItems(1);
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
doThrow(new RuntimeException("Buffer write failed"))
.when(client).writeBatchToBuffer(any(), any(), any());

assertThrows(RuntimeException.class, () ->
crawler.crawl(leaderPartition, coordinator));
}

@Test
void testProgressStateUpdate() {
List<ItemInfo> items = createTestItems(1);
String newToken = "id0"; // This will be the ID of the created test item
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());

crawler.setAcknowledgementsEnabled(false);
crawler.crawl(leaderPartition, coordinator);

verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
TokenPaginationCrawlerLeaderProgressState state =
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
assertEquals(newToken, state.getLastToken());
}

private List<ItemInfo> createTestItems(int count) {
List<ItemInfo> items = new ArrayList<>();
for (int i = 0; i < count; i++) {
items.add(new TestItemInfo("id" + i, new HashMap<>(), Instant.now()));
}
return items;
}
}