Skip to content

Commit d3735a8

Browse files
committed
address comments
Signed-off-by: Yupeng Fu <[email protected]>
1 parent 916e3ec commit d3735a8

File tree

5 files changed

+66
-26
lines changed

5 files changed

+66
-26
lines changed

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

+13
Original file line number | Diff line number | Diff line change
@@ -136,6 +136,19 @@ public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
136136
return records;
137137
}
138138

139+
@Override
140+
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(long maxMessages, int timeoutMillis) throws TimeoutException {
141+
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
142+
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
143+
lastFetchedOffset,
144+
false,
145+
maxMessages,
146+
timeoutMillis
147+
)
148+
);
149+
return records;
150+
}
151+
139152
@Override
140153
public IngestionShardPointer earliestPointer() {
141154
long startOffset = AccessController.doPrivileged(

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/KinesisIngestionBaseIT.java

+1
Original file line number | Diff line number | Diff line change
@@ -91,6 +91,7 @@ private void stopKinesis() {
9191

9292
if (localstack != null) {
9393
localstack.stop();
94+
localstack = null;
9495
}
9596
}
9697

plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisShardConsumer.java

+41-21
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ public class KinesisShardConsumer implements IngestionShardConsumer<SequenceNumb
4949
*/
5050
;
5151
private KinesisClient kinesisClient;
52-
private String lastFetchedSequenceNumber = "";
52+
private String lastShardIterator;
5353
final String clientId;
5454
final String kinesisShardId;
5555
final int shardId;
@@ -124,6 +124,7 @@ public List<ReadResult<SequenceNumber, KinesisMessage>> readNext(
124124
int timeoutMillis
125125
) throws TimeoutException {
126126
List<ReadResult<SequenceNumber, KinesisMessage>> records = fetch(
127+
null,
127128
sequenceNumber.getSequenceNumber(),
128129
includeStart,
129130
maxMessages,
@@ -132,6 +133,14 @@ public List<ReadResult<SequenceNumber, KinesisMessage>> readNext(
132133
return records;
133134
}
134135

136+
@Override
137+
public List<ReadResult<SequenceNumber, KinesisMessage>> readNext(long maxMessages, int timeoutMillis) throws TimeoutException {
138+
if (lastShardIterator == null) {
139+
throw new IllegalStateException("No shard iterator available");
140+
}
141+
return fetch(lastShardIterator, null, false, maxMessages, timeoutMillis);
142+
}
143+
135144
@Override
136145
public IngestionShardPointer earliestPointer() {
137146
return getSequenceNumber(ShardIteratorType.TRIM_HORIZON, null, 0);
@@ -142,40 +151,50 @@ public IngestionShardPointer latestPointer() {
142151
return getSequenceNumber(ShardIteratorType.LATEST, null, 0);
143152
}
144153

145-
private List<Record> fetchRecords(ShardIteratorType shardIteratorType, String startingSequenceNumber, long timestampMillis, int limit) {
146-
// Get a shard iterator AFTER the given sequence number
147-
GetShardIteratorRequest.Builder builder = GetShardIteratorRequest.builder()
148-
.streamName(config.getStream())
149-
.shardId(kinesisShardId)
150-
.shardIteratorType(shardIteratorType);
154+
private List<Record> fetchRecords(
155+
String shardIterator,
156+
ShardIteratorType shardIteratorType,
157+
String startingSequenceNumber,
158+
long timestampMillis,
159+
int limit
160+
) {
161+
String shardIteratorToUse = shardIterator;
151162

152-
if (startingSequenceNumber != null) {
153-
builder = builder.startingSequenceNumber(startingSequenceNumber);
154-
}
163+
if (shardIterator == null) {
164+
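// no shard iterator was supplied by the caller: request a new one from Kinesis for the given iterator type / sequence number / timestamp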
165+
GetShardIteratorRequest.Builder builder = GetShardIteratorRequest.builder()
166+
.streamName(config.getStream())
167+
.shardId(kinesisShardId)
168+
.shardIteratorType(shardIteratorType);
169+
170+
if (startingSequenceNumber != null) {
171+
builder = builder.startingSequenceNumber(startingSequenceNumber);
172+
}
155173

156-
if (timestampMillis != 0) {
157-
builder = builder.timestamp(Instant.ofEpochMilli(timestampMillis));
158-
}
174+
if (timestampMillis != 0) {
175+
builder = builder.timestamp(Instant.ofEpochMilli(timestampMillis));
176+
}
159177

160-
GetShardIteratorRequest shardIteratorRequest = builder.build();
178+
GetShardIteratorRequest shardIteratorRequest = builder.build();
161179

162-
GetShardIteratorResponse shardIteratorResponse = kinesisClient.getShardIterator(shardIteratorRequest);
163-
String shardIterator = shardIteratorResponse.shardIterator();
180+
GetShardIteratorResponse shardIteratorResponse = kinesisClient.getShardIterator(shardIteratorRequest);
181+
shardIteratorToUse = shardIteratorResponse.shardIterator();
182+
}
164183

165-
if (shardIterator == null) {
184+
if (shardIteratorToUse == null) {
166185
return new ArrayList<>();
167186
}
168187

169188
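// Fetch the next records and cache the response's next shard iterator in lastShardIterator for readNext(maxMessages, timeoutMillis).
// NOTE(review): getSequenceNumber() (used by earliestPointer()/latestPointer()) also goes through this method, so those lookups overwrite the cached iterator - confirm this is intended.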
170-
GetRecordsRequest recordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).limit(limit).build();
171-
189+
GetRecordsRequest recordsRequest = GetRecordsRequest.builder().shardIterator(shardIteratorToUse).limit(limit).build();
172190
GetRecordsResponse recordsResponse = kinesisClient.getRecords(recordsRequest);
191+
lastShardIterator = recordsResponse.nextShardIterator();
173192
List<Record> records = recordsResponse.records();
174193
return records;
175194
}
176195

177196
private SequenceNumber getSequenceNumber(ShardIteratorType shardIteratorType, String startingSequenceNumber, long timestampMillis) {
178-
List<Record> records = fetchRecords(shardIteratorType, startingSequenceNumber, timestampMillis, 1);
197+
List<Record> records = fetchRecords(null, shardIteratorType, startingSequenceNumber, timestampMillis, 1);
179198

180199
if (!records.isEmpty()) {
181200
Record nextRecord = records.get(0);
@@ -197,6 +216,7 @@ public IngestionShardPointer pointerFromOffset(String offset) {
197216
}
198217

199218
private synchronized List<ReadResult<SequenceNumber, KinesisMessage>> fetch(
219+
String shardIterator,
200220
String sequenceNumber,
201221
boolean includeStart,
202222
long maxMessages,
@@ -208,7 +228,7 @@ private synchronized List<ReadResult<SequenceNumber, KinesisMessage>> fetch(
208228

209229
ShardIteratorType iteratorType = includeStart ? ShardIteratorType.AT_SEQUENCE_NUMBER : ShardIteratorType.AFTER_SEQUENCE_NUMBER;
210230

211-
List<Record> records = fetchRecords(iteratorType, sequenceNumber, 0, (int) limit);
231+
List<Record> records = fetchRecords(shardIterator, iteratorType, sequenceNumber, 0, (int) limit);
212232

213233
List<ReadResult<SequenceNumber, KinesisMessage>> results = new ArrayList<>();
214234

server/src/main/java/org/opensearch/index/IngestionShardConsumer.java

+10
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,16 @@ public M getMessage() {
6969
List<ReadResult<T, M>> readNext(T pointer, boolean includeStart, long maxMessages, int timeoutMillis)
7070
throws java.util.concurrent.TimeoutException;
7171

72+
/**
73+
* Read the next set of messages from the source using the previous pointer. An exception is thrown if no previous pointer is available.
74+
* This method is used as an optimization for consecutive reads.
75+
* @param maxMessages the maximum number of messages to read, or -1 for no limit
76+
* @param timeoutMillis the maximum time to wait for messages
77+
* @return a list of messages read from the source
78+
* @throws java.util.concurrent.TimeoutException if the read operation times out
79+
*/
80+
List<ReadResult<T, M>> readNext(long maxMessages, int timeoutMillis) throws java.util.concurrent.TimeoutException;
81+
7282
/**
7383
* @return the earliest pointer in the shard
7484
*/

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+1-5
Original file line number | Diff line number | Diff line change
@@ -149,9 +149,6 @@ protected void startPoll() {
149149
}
150150
logger.info("Starting poller for shard {}", consumer.getShardId());
151151

152-
// track the last record successfully written to the blocking queue
153-
IngestionShardPointer lastSuccessfulPointer = null;
154-
155152
while (true) {
156153
try {
157154
if (closed) {
@@ -204,7 +201,7 @@ protected void startPoll() {
204201
if (includeBatchStartPointer) {
205202
results = consumer.readNext(batchStartPointer, true, MAX_POLL_SIZE, POLL_TIMEOUT);
206203
} else {
207-
results = consumer.readNext(lastSuccessfulPointer, false, MAX_POLL_SIZE, POLL_TIMEOUT);
204+
results = consumer.readNext(MAX_POLL_SIZE, POLL_TIMEOUT);
208205
}
209206

210207
if (results.isEmpty()) {
@@ -221,7 +218,6 @@ protected void startPoll() {
221218
batchStartPointer = result.getPointer();
222219
firstInBatch = false;
223220
}
224-
lastSuccessfulPointer = result.getPointer();
225221

226222
// check if the message is already processed
227223
if (isProcessed(result.getPointer())) {

0 commit comments

Comments
 (0)