Skip to content

Commit c855963

Browse files
committed
comment
Signed-off-by: Yupeng Fu <[email protected]>
1 parent 17abbaf commit c855963

File tree

1 file changed

+41
-1
lines changed

1 file changed

+41
-1
lines changed

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/IngestFromKinesisIT.java

+41-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@
2828
import java.util.stream.Stream;
2929

3030
import org.testcontainers.containers.localstack.LocalStackContainer;
31+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
32+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
33+
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
34+
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
35+
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
36+
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
37+
import software.amazon.awssdk.services.kinesis.model.Record;
38+
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
3139

3240
import static org.hamcrest.Matchers.is;
3341
import static org.awaitility.Awaitility.await;
@@ -98,7 +106,11 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
98106
String sequenceNumber = produceData("3", "name3", "20");
99107
logger.info("Produced message with sequence number: {}", sequenceNumber);
100108
produceData("4", "name4", "21");
101-
Thread.sleep(2000);
109+
110+
await()
111+
.atMost(5, TimeUnit.SECONDS)
112+
.until(() -> isRewinded(sequenceNumber));
113+
102114

103115
// create an index with ingestion source from kinesis
104116
createIndex(
@@ -128,4 +140,32 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
128140
assertThat(response.getHits().getTotalHits().value(), is(2L));
129141
});
130142
}
143+
144+
private boolean isRewinded(String sequenceNumber) {
145+
DescribeStreamResponse describeStreamResponse =
146+
kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(streamName).build());
147+
148+
String shardId = describeStreamResponse.streamDescription().shards().get(0).shardId();
149+
150+
GetShardIteratorRequest iteratorRequest = GetShardIteratorRequest.builder()
151+
.streamName(streamName)
152+
.shardId(shardId)
153+
.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
154+
.startingSequenceNumber(sequenceNumber)
155+
.build();
156+
157+
GetShardIteratorResponse iteratorResponse = kinesisClient.getShardIterator(iteratorRequest);
158+
String shardIterator = iteratorResponse.shardIterator();
159+
160+
// Use the iterator to read the record
161+
GetRecordsRequest recordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).limit(1) // Adjust as needed
162+
.build();
163+
164+
GetRecordsResponse recordsResponse = kinesisClient.getRecords(recordsRequest);
165+
List<Record> records = recordsResponse.records();
166+
if (records.size() != 1) {
167+
return false;
168+
}
169+
return records.get(0).partitionKey().equals("3");
170+
}
131171
}

0 commit comments

Comments
 (0)