
Conversation

alparish
Contributor

Description

This PR introduces the LeaderOnlyTokenCrawler, a performance-optimized implementation that eliminates redundant API calls by processing complete event content in the leader thread without worker partitions.

  • Created new LeaderOnlyTokenCrawler class that handles both content retrieval and buffer writing
  • Added support for both acknowledged and unacknowledged buffer writing
  • Implemented periodic checkpoint updates and failure handling

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

*
* @param buffer The buffer to write events to
*/
void setBuffer(Buffer<Record<Event>> buffer);
Contributor Author

I will remove this method (setBuffer) and add buffer as a parameter to writeBatchToBuffer in a new revision
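A minimal sketch of the revised shape, assuming the buffer simply becomes a parameter of `writeBatchToBuffer` as the comment describes (the `Event`, `Record`, and `Buffer` types here are simplified stand-ins, not the actual Data Prepper classes):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins for the real Data Prepper types.
class Event { final String payload; Event(String payload) { this.payload = payload; } }
class Record<E> { final E data; Record(E data) { this.data = data; } }
class Buffer<T> {
    final List<T> written = new ArrayList<>();
    void writeAll(List<T> records) { written.addAll(records); }
}

public class Main {
    // Revised shape: the buffer is passed per call, so the setBuffer() setter goes away.
    static void writeBatchToBuffer(List<Record<Event>> batch, Buffer<Record<Event>> buffer) {
        buffer.writeAll(batch);
    }

    public static void main(String[] args) {
        Buffer<Record<Event>> buffer = new Buffer<>();
        writeBatchToBuffer(List.of(new Record<>(new Event("a"))), buffer);
        System.out.println(buffer.written.size()); // 1
    }
}
```

Passing the buffer per call also keeps the crawler stateless with respect to the buffer, which avoids ordering constraints between `setBuffer` and the crawl.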

long startTime = System.currentTimeMillis();
Instant lastCheckpointTime = Instant.now();

try {
Contributor

Why do we need this try-catch?

Contributor Author

You're right, the outer try-catch is not needed since we already have a try-catch around processBatch. I will remove it.


Iterator<ItemInfo> itemIterator = client.listItems(lastToken);

while (itemIterator.hasNext()) {
Contributor

How come we switched from a do-while loop to a while loop?

Contributor Author

If itemIterator.hasNext() is false, then client.listItems() returned an empty iterator and there are no items to process. So there's no need to process anything if hasNext() is false on the first check. That's why a regular while loop is appropriate here.
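A minimal sketch of the point being made: with a plain while loop, an empty iterator from `client.listItems(...)` skips the body entirely, whereas a do-while would process at least once (the `drain` helper is illustrative):

```java
import java.util.Iterator;
import java.util.List;

public class Main {
    // A plain while loop naturally handles the empty-first-page case:
    // if hasNext() is false on the first check, the body never runs.
    static int drain(Iterator<String> itemIterator) {
        int processed = 0;
        while (itemIterator.hasNext()) {
            itemIterator.next();
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of("a", "b").iterator())); // 2
        System.out.println(drain(List.<String>of().iterator())); // 0: empty page, body skipped
    }
}
```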

throw new RuntimeException("Crawl operation failed", e);
}

log.info("Crawl completed in {} ms", System.currentTimeMillis() - startTime);
Contributor

Contributor Author

Included crawl metric

private final LeaderOnlyTokenCrawlerClient client;
private final PluginMetrics pluginMetrics;
@Setter
private boolean acknowledgementsEnabled;
Contributor

Is this being set from the Source plugin?

Contributor Author

Yes

private void processBatch(List<ItemInfo> batch,
LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator) {
if (acknowledgementsEnabled && acknowledgementSetManager != null) {
Contributor

If we expect acknowledgementSetManager to always exist if acknowledgementsEnabled is true, does it make sense to remove this null check? I am referring to WorkerScheduler

Open to your feedback.

Contributor Author

Yes, you're right. Since acknowledgementSetManager is required when acknowledgements are enabled, the null check is redundant. I will remove it.
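A sketch of one way to make the invariant explicit so the per-batch null check can safely go: enforce it once, up front. The types here are simplified stand-ins, and note the actual class sets `acknowledgementsEnabled` via a `@Setter` rather than a constructor, so this is an alternative design, not the PR's code:

```java
import java.util.Objects;

public class Main {
    interface AcknowledgementSetManager {}

    static final class Crawler {
        private final boolean acknowledgementsEnabled;
        private final AcknowledgementSetManager ackManager;

        Crawler(boolean acknowledgementsEnabled, AcknowledgementSetManager ackManager) {
            this.acknowledgementsEnabled = acknowledgementsEnabled;
            // Fail fast here instead of re-checking null on every batch.
            this.ackManager = acknowledgementsEnabled
                    ? Objects.requireNonNull(ackManager,
                        "acknowledgementSetManager is required when acknowledgements are enabled")
                    : ackManager;
        }
    }

    public static void main(String[] args) {
        try {
            new Crawler(true, null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```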

} else {
// On failure: give up partition
log.error("Batch processing failed for token: {}", lastToken);
coordinator.giveUpPartition(leaderPartition);
Contributor

I'm not sure if we should be giving up the partition, since this indicates the partition is shutting down and if this is for leader partition, then it will indicate the entire plugin is shutting down which is not the case:

Should be called by the source when it is shutting down to indicate that it will no longer be able to perform work on partitions

Contributor Author

Collaborator

@alparish what happens if acknowledgements do not come back? Do you have a retry mechanism?

Collaborator

I agree with @bbenner7635 Giving up is probably not right here. Please test it locally to confirm the behavior

Collaborator
@kkondaka, Oct 16, 2025

Ideally, acknowledgement timeouts should be very large (or INT_MAX), because retrying events that have not been acknowledged means duplicate events will be sent. And if buffer or sink issues caused the acknowledgements to time out, then injecting the same events again will make the problem worse.

AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
success -> {
if (success) {
// On success: update checkpoint
Contributor

Do we need these plugin metrics?

Contributor Author

Added these metrics

success -> {
if (success) {
// On success: update checkpoint
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
Contributor

String is not a primitive, and updateLeaderProgressState here is invoked from an async callback that might run a while later.

Is it possible that lastToken has already changed to a different value by then?

Contributor Author

The callback is processed right after writing to the buffer, and before processing the next batch. Since we only update lastToken when processing a new batch, its value will remain consistent during the acknowledgment handling for the current batch.
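A small sketch of the ordering the author describes: each batch's acknowledgement callback runs before `lastToken` advances to the next batch, and capturing the token in a per-batch local makes the value the lambda sees stable regardless (names and the `Consumer` callback shape are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class Main {
    static final List<String> checkpointed = new ArrayList<>();

    static void processBatches(List<String> tokens) {
        for (String token : tokens) {
            final String lastToken = token;               // captured per batch
            Consumer<Boolean> onAck = success -> {
                if (success) checkpointed.add(lastToken); // sees this batch's token
            };
            onAck.accept(true);                           // invoked before the next iteration
        }
    }

    public static void main(String[] args) {
        processBatches(List.of("t1", "t2"));
        System.out.println(checkpointed); // [t1, t2]
    }
}
```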

Contributor

I see. Then this could be a big performance risk, because it waits until the events are ingested before moving on to the next batch.

Say the buffer wait time is 30 seconds; that means we can only process 50 events every 30 seconds?

If that is the case, shall we increase the batch size?

Contributor

If we need to increase the batch size, we need a reasonably high number to ensure the minimal speed as well as not breaching buffer size.

Contributor Author

@san81 Do you have any recommendation for the batch size?

Collaborator

It is hard to give just one number. I would say, really test with different batch sizes and see what gives the best performance. The factors to consider are:

  • vendor API support for max page size, and latency with respect to the increase in page size
  • size of the API response payload with respect to the increase in page size. Just giving an example here: for S3-based pipelines, OSI pipelines process at least 20 Mbps per OCU.
  • In these 3P connector pipelines, I don't think we will ever fill up the buffer. Each OCU comes with an 8 GB buffer. The biggest bottlenecks are vendor API latency and network latency.

Contributor

If that is the case, I think we can easily go up to a 5000-event batch for now, and we will do a load test to verify it works without problems. The max event size is no more than 10 KB, so the total size is at most 50 MB, far less than 8 GB.

Contributor

After rethinking this, we should make it configurable per connector.

For Okta, each API call can return at most 100 events, so a 5000-event batch requires 50 calls, which can easily run into a failure mode where one API failure blocks the other 49 calls.
A reasonable balance is 5 to 10 calls.
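A minimal sketch of the per-connector sizing idea: derive the batch size from the vendor's max page size and a target number of API calls per batch (the helper name and the target of 5 calls are illustrative assumptions):

```java
public class Main {
    // Per-connector batch size = vendor page limit x target API calls per batch.
    // Keeping calls per batch small (5-10) limits the blast radius of one API failure.
    static int batchSize(int maxPageSize, int targetCallsPerBatch) {
        return maxPageSize * targetCallsPerBatch;
    }

    public static void main(String[] args) {
        // e.g. a vendor that caps pages at 100 events, 5 calls per batch -> 500 events
        System.out.println(batchSize(100, 5));  // 500
        // a vendor with 1000-event pages reaches the 5000-event batch in 5 calls
        System.out.println(batchSize(1000, 5)); // 5000
    }
}
```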

@alparish force-pushed the feature/leaderOnlyTokenCrawler branch from f9b45fc to c2fd947 (October 15, 2025 03:53)
Signed-off-by: Alekhya Parisha <[email protected]>
@alparish force-pushed the feature/leaderOnlyTokenCrawler branch from c2fd947 to ca88f1c (October 15, 2025 03:57)
processBatch(batch, leaderPartition, coordinator);
} catch (Exception e) {
batchesFailedCounter.increment();
log.error("Failed to process batch ending with token {}", lastToken, e);
Collaborator

Using NOISY is preferred for this case since we are printing the entire stack trace

Member

I'm not sure about this. This is happening at a batch level (not per-event level). And it should be more of a defensive programming catch.

Member
@dlvenable left a comment

Something about this seems to mix concerns. We already have four crawlers. Each one has a different way to crawl. This change is still the token crawler, but it is using only the leader node. We should decouple the approach for crawling from the nodes that make use of it. Otherwise, we may end up with 8 crawlers.

@Named
public class LeaderOnlyTokenCrawler implements Crawler {
private static final Logger log = LoggerFactory.getLogger(LeaderOnlyTokenCrawler.class);
private static final Duration BUFFER_WRITE_TIMEOUT = Duration.ofSeconds(15);
Member

Shouldn't the client be providing these values?
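A sketch of the reviewer's suggestion: let the client supply the timeout (which can then vary per connector) instead of hard-coding a constant in the crawler. The `getBufferWriteTimeout` method and the client types are illustrative, not the actual interface:

```java
import java.time.Duration;

public class Main {
    // The client owns connector-specific tuning values; the crawler just asks for them.
    interface CrawlerClient {
        default Duration getBufferWriteTimeout() { return Duration.ofSeconds(15); }
    }

    // A connector with slower vendor APIs can override the default.
    static final class SlowVendorClient implements CrawlerClient {
        @Override public Duration getBufferWriteTimeout() { return Duration.ofSeconds(60); }
    }

    public static void main(String[] args) {
        CrawlerClient defaultClient = new CrawlerClient() {};
        System.out.println(defaultClient.getBufferWriteTimeout().getSeconds());      // 15
        System.out.println(new SlowVendorClient().getBufferWriteTimeout().getSeconds()); // 60
    }
}
```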

import java.util.concurrent.TimeUnit;

@Named
public class LeaderOnlyTokenCrawler implements Crawler {
Member

This should not use the raw type. You need Crawler<...>.
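A minimal sketch of the raw-type fix: parameterize the `implements` clause so the item type is checked at compile time. The type parameter and the interface's exact shape are assumptions for illustration:

```java
import java.util.Iterator;

public class Main {
    // Simplified stand-in for the Crawler interface with a type parameter.
    interface Crawler<T> {
        void crawl(Iterator<T> items);
    }

    record ItemInfo(String id) {}

    // "implements Crawler<ItemInfo>" instead of the raw "implements Crawler".
    static final class LeaderOnlyTokenCrawler implements Crawler<ItemInfo> {
        int seen = 0;
        @Override public void crawl(Iterator<ItemInfo> items) {
            while (items.hasNext()) { items.next(); seen++; }
        }
    }

    public static void main(String[] args) {
        LeaderOnlyTokenCrawler crawler = new LeaderOnlyTokenCrawler();
        crawler.crawl(java.util.List.of(new ItemInfo("a"), new ItemInfo("b")).iterator());
        System.out.println(crawler.seen); // 2
    }
}
```

With the raw type, `crawl` would accept an iterator of anything and the compiler would emit unchecked warnings instead of catching mismatched item types.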
