Skip to content

Commit ca88f1c

Browse files
author
Alekhya Parisha
committed
Implement LeaderOnlyTokenCrawler
Signed-off-by: Alekhya Parisha <[email protected]>
1 parent 8dbdfeb commit ca88f1c

File tree

3 files changed

+360
-0
lines changed

3 files changed

+360
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.Timer;
5+
import lombok.Setter;
6+
import org.opensearch.dataprepper.metrics.PluginMetrics;
7+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
8+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
9+
import org.opensearch.dataprepper.model.buffer.Buffer;
10+
import org.opensearch.dataprepper.model.event.Event;
11+
import org.opensearch.dataprepper.model.record.Record;
12+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
13+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
14+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import javax.inject.Named;
20+
import java.time.Duration;
21+
import java.time.Instant;
22+
import java.util.ArrayList;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
import java.util.concurrent.TimeUnit;
26+
27+
@Named
28+
public class LeaderOnlyTokenCrawler implements Crawler {
29+
private static final Logger log = LoggerFactory.getLogger(LeaderOnlyTokenCrawler.class);
30+
private static final Duration BUFFER_WRITE_TIMEOUT = Duration.ofSeconds(15);
31+
private static final Duration CHECKPOINT_INTERVAL = Duration.ofMinutes(1);
32+
private static final Duration DEFAULT_LEASE_DURATION = Duration.ofMinutes(15);
33+
private static final int BATCH_SIZE = 50;
34+
35+
private static final String METRIC_BATCHES_FAILED = "batchesFailed";
36+
private static final String METRIC_BUFFER_WRITE_TIME = "bufferWriteTime";
37+
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
38+
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
39+
40+
private final LeaderOnlyTokenCrawlerClient client;
41+
private final Timer crawlingTimer;
42+
private final PluginMetrics pluginMetrics;
43+
@Setter
44+
private boolean acknowledgementsEnabled;
45+
@Setter
46+
private AcknowledgementSetManager acknowledgementSetManager;
47+
@Setter
48+
private Buffer<Record<Event>> buffer;
49+
private final Counter batchesFailedCounter;
50+
private final Counter acknowledgementSetSuccesses;
51+
private final Counter acknowledgementSetFailures;
52+
private final Timer bufferWriteTimer;
53+
54+
private String lastToken;
55+
56+
public LeaderOnlyTokenCrawler(
57+
LeaderOnlyTokenCrawlerClient client,
58+
PluginMetrics pluginMetrics) {
59+
this.client = client;
60+
this.pluginMetrics = pluginMetrics;
61+
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
62+
this.batchesFailedCounter = pluginMetrics.counter(METRIC_BATCHES_FAILED);
63+
this.bufferWriteTimer = pluginMetrics.timer(METRIC_BUFFER_WRITE_TIME);
64+
this.acknowledgementSetSuccesses = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME);
65+
this.acknowledgementSetFailures = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME);
66+
}
67+
68+
@Override
69+
public Instant crawl(LeaderPartition leaderPartition,
70+
EnhancedSourceCoordinator coordinator) {
71+
long startTime = System.currentTimeMillis();
72+
Instant lastCheckpointTime = Instant.now();
73+
TokenPaginationCrawlerLeaderProgressState leaderProgressState =
74+
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
75+
lastToken = leaderProgressState.getLastToken();
76+
77+
log.info("Starting leader-only crawl with token: {}", lastToken);
78+
79+
Iterator<ItemInfo> itemIterator = client.listItems(lastToken);
80+
81+
while (itemIterator.hasNext()) {
82+
List<ItemInfo> batch = collectBatch(itemIterator);
83+
if (batch.isEmpty()) {
84+
continue;
85+
}
86+
87+
ItemInfo lastItem = batch.get(batch.size() - 1);
88+
lastToken = lastItem.getItemId();
89+
90+
try {
91+
processBatch(batch, leaderPartition, coordinator);
92+
} catch (Exception e) {
93+
batchesFailedCounter.increment();
94+
log.error("Failed to process batch ending with token {}", lastToken, e);
95+
throw e;
96+
}
97+
98+
// Periodic checkpoint if not using acknowledgments
99+
if (!acknowledgementsEnabled &&
100+
Duration.between(lastCheckpointTime, Instant.now()).compareTo(CHECKPOINT_INTERVAL) >= 0) {
101+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
102+
lastCheckpointTime = Instant.now();
103+
}
104+
}
105+
106+
// Final flush of any remaining items
107+
if (!acknowledgementsEnabled) {
108+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
109+
}
110+
111+
long crawlTimeMillis = System.currentTimeMillis() - startTime;
112+
log.debug("Crawling completed in {} ms", crawlTimeMillis);
113+
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
114+
return Instant.now();
115+
}
116+
117+
@Override
118+
public void executePartition(SaasWorkerProgressState state, Buffer buffer, AcknowledgementSet acknowledgementSet) {
119+
120+
}
121+
122+
private List<ItemInfo> collectBatch(Iterator<ItemInfo> iterator) {
123+
List<ItemInfo> batch = new ArrayList<>();
124+
for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
125+
ItemInfo item = iterator.next();
126+
if (item != null) {
127+
batch.add(item);
128+
}
129+
}
130+
return batch;
131+
}
132+
133+
private void processBatch(List<ItemInfo> batch,
134+
LeaderPartition leaderPartition,
135+
EnhancedSourceCoordinator coordinator) {
136+
if (acknowledgementsEnabled) {
137+
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create(
138+
success -> {
139+
if (success) {
140+
// On success: update checkpoint
141+
acknowledgementSetSuccesses.increment();
142+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
143+
} else {
144+
// On failure: give up partition
145+
acknowledgementSetFailures.increment();
146+
log.error("Batch processing failed for token: {}", lastToken);
147+
coordinator.giveUpPartition(leaderPartition);
148+
}
149+
},
150+
BUFFER_WRITE_TIMEOUT
151+
);
152+
153+
bufferWriteTimer.record(() -> {
154+
try {
155+
client.writeBatchToBuffer(batch, buffer, acknowledgementSet);
156+
acknowledgementSet.complete();
157+
} catch (Exception e) {
158+
log.error("Failed to write batch to buffer", e);
159+
acknowledgementSet.complete(); // This will trigger the failure callback
160+
throw e;
161+
}
162+
});
163+
} else {
164+
// Without Acknowledgments:
165+
// Write directly and update checkpoint
166+
bufferWriteTimer.record(() -> {
167+
try {
168+
client.writeBatchToBuffer(batch, buffer, null);
169+
updateLeaderProgressState(leaderPartition, lastToken, coordinator);
170+
} catch (Exception e) {
171+
log.error("Failed to write batch to buffer", e);
172+
throw e;
173+
}
174+
});
175+
}
176+
}
177+
178+
private void updateLeaderProgressState(LeaderPartition leaderPartition,
179+
String updatedToken,
180+
EnhancedSourceCoordinator coordinator) {
181+
TokenPaginationCrawlerLeaderProgressState leaderProgressState =
182+
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
183+
String oldToken = leaderProgressState.getLastToken();
184+
leaderProgressState.setLastToken(updatedToken);
185+
leaderPartition.setLeaderProgressState(leaderProgressState);
186+
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_LEASE_DURATION);
187+
log.info("Updated leader progress state: old lastToken={}, new lastToken={}", oldToken, updatedToken);
188+
}
189+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
4+
import org.opensearch.dataprepper.model.buffer.Buffer;
5+
import org.opensearch.dataprepper.model.event.Event;
6+
import org.opensearch.dataprepper.model.record.Record;
7+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.PaginationCrawlerWorkerProgressState;
8+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
9+
10+
import java.util.List;
11+
12+
/**
 * A {@link TokenCrawlerClient} for sources that are crawled entirely by the
 * leader node. In addition to the inherited token-pagination contract, this
 * client can write a batch of {@link ItemInfo} directly to the buffer,
 * bypassing worker partitions.
 */
public interface LeaderOnlyTokenCrawlerClient extends TokenCrawlerClient<PaginationCrawlerWorkerProgressState> {
    /**
     * Converts a batch of items to events and writes them directly to the buffer.
     *
     * @param items              the batch of items to write
     * @param buffer             the buffer to write the resulting events to
     * @param acknowledgementSet acknowledgement set used to track delivery of the
     *                           written events; if provided, events are added to
     *                           it for acknowledgment tracking. May be
     *                           {@code null} when acknowledgments are disabled.
     */
    void writeBatchToBuffer(List<ItemInfo> items, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet);
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import org.junit.jupiter.api.BeforeEach;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.Mock;
7+
import org.mockito.junit.jupiter.MockitoExtension;
8+
import org.opensearch.dataprepper.metrics.PluginMetrics;
9+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
10+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
11+
import org.opensearch.dataprepper.model.buffer.Buffer;
12+
import org.opensearch.dataprepper.model.event.Event;
13+
import org.opensearch.dataprepper.model.record.Record;
14+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
16+
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.TokenPaginationCrawlerLeaderProgressState;
17+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
18+
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
19+
20+
import java.time.Duration;
21+
import java.time.Instant;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Optional;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertThrows;
30+
import static org.mockito.ArgumentMatchers.any;
31+
import static org.mockito.ArgumentMatchers.eq;
32+
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.when;
34+
import static org.mockito.Mockito.doThrow;
35+
import static org.mockito.Mockito.never;
36+
37+
import static org.mockito.internal.verification.VerificationModeFactory.times;
38+
39+
@ExtendWith(MockitoExtension.class)
40+
class LeaderOnlyTokenCrawlerTest {
41+
private static final int BATCH_SIZE = 50;
42+
private static final String INITIAL_TOKEN = "initial-token";
43+
private static final Duration BUFFER_WRITE_TIMEOUT = Duration.ofSeconds(15);
44+
45+
@Mock
46+
private LeaderOnlyTokenCrawlerClient client;
47+
@Mock
48+
private EnhancedSourceCoordinator coordinator;
49+
@Mock
50+
private LeaderPartition leaderPartition;
51+
@Mock
52+
private AcknowledgementSetManager acknowledgementSetManager;
53+
@Mock
54+
private AcknowledgementSet acknowledgementSet;
55+
@Mock
56+
private Buffer<Record<Event>> buffer;
57+
58+
private LeaderOnlyTokenCrawler crawler;
59+
private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("CrawlerTest", "crawler");
60+
61+
@BeforeEach
62+
void setup() {
63+
crawler = new LeaderOnlyTokenCrawler(client, pluginMetrics);
64+
crawler.setAcknowledgementSetManager(acknowledgementSetManager);
65+
crawler.setBuffer(buffer);
66+
when(leaderPartition.getProgressState())
67+
.thenReturn(Optional.of(new TokenPaginationCrawlerLeaderProgressState(INITIAL_TOKEN)));
68+
}
69+
70+
@Test
71+
void testCrawlWithEmptyList() {
72+
when(client.listItems(INITIAL_TOKEN)).thenReturn(Collections.emptyIterator());
73+
crawler.crawl(leaderPartition, coordinator);
74+
verify(client, never()).writeBatchToBuffer(any(), any(), any());
75+
}
76+
77+
@Test
78+
void testSimpleCrawl() {
79+
List<ItemInfo> items = createTestItems(1);
80+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
81+
crawler.crawl(leaderPartition, coordinator);
82+
verify(client).writeBatchToBuffer(items, buffer, null);
83+
}
84+
85+
@Test
86+
void testCrawlWithAcknowledgmentsEnabled() {
87+
List<ItemInfo> items = createTestItems(1);
88+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
89+
when(acknowledgementSetManager.create(any(), any(Duration.class)))
90+
.thenReturn(acknowledgementSet);
91+
92+
crawler.setAcknowledgementsEnabled(true);
93+
crawler.crawl(leaderPartition, coordinator);
94+
95+
verify(client).writeBatchToBuffer(items, buffer, acknowledgementSet);
96+
verify(acknowledgementSet).complete();
97+
}
98+
99+
@Test
100+
void testCrawlWithMultipleBatches() {
101+
List<ItemInfo> items = createTestItems(BATCH_SIZE + 1); // Create more than one batch
102+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
103+
104+
crawler.setAcknowledgementsEnabled(false);
105+
crawler.crawl(leaderPartition, coordinator);
106+
107+
verify(client, times(2)).writeBatchToBuffer(any(), eq(buffer), any());
108+
}
109+
110+
@Test
111+
void testBufferWriteFailure() {
112+
List<ItemInfo> items = createTestItems(1);
113+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
114+
doThrow(new RuntimeException("Buffer write failed"))
115+
.when(client).writeBatchToBuffer(any(), any(), any());
116+
117+
assertThrows(RuntimeException.class, () ->
118+
crawler.crawl(leaderPartition, coordinator));
119+
}
120+
121+
@Test
122+
void testProgressStateUpdate() {
123+
List<ItemInfo> items = createTestItems(1);
124+
String newToken = "id0"; // This will be the ID of the created test item
125+
when(client.listItems(INITIAL_TOKEN)).thenReturn(items.iterator());
126+
127+
crawler.setAcknowledgementsEnabled(false);
128+
crawler.crawl(leaderPartition, coordinator);
129+
130+
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
131+
TokenPaginationCrawlerLeaderProgressState state =
132+
(TokenPaginationCrawlerLeaderProgressState) leaderPartition.getProgressState().get();
133+
assertEquals(newToken, state.getLastToken());
134+
}
135+
136+
private List<ItemInfo> createTestItems(int count) {
137+
List<ItemInfo> items = new ArrayList<>();
138+
for (int i = 0; i < count; i++) {
139+
items.add(new TestItemInfo("id" + i, new HashMap<>(), Instant.now()));
140+
}
141+
return items;
142+
}
143+
}

0 commit comments

Comments
 (0)