Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pull-based Ingestion] Add Kinesis plugin support #17615

Merged
merged 8 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,34 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
}

@Override
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(KafkaOffset offset, long maxMessages, int timeoutMillis)
throws TimeoutException {
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
KafkaOffset offset,
boolean includeStart,
long maxMessages,
int timeoutMillis
) throws TimeoutException {
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(offset.getOffset(), maxMessages, timeoutMillis)
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
offset.getOffset(),
includeStart,
maxMessages,
timeoutMillis
)
);
return records;
}

@Override
public KafkaOffset nextPointer() {
return new KafkaOffset(lastFetchedOffset + 1);
}

@Override
public KafkaOffset nextPointer(KafkaOffset pointer) {
return new KafkaOffset(pointer.getOffset() + 1);
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(long maxMessages, int timeoutMillis) throws TimeoutException {
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
lastFetchedOffset,
false,
maxMessages,
timeoutMillis
)
);
return records;
}

@Override
Expand Down Expand Up @@ -191,18 +203,28 @@ public IngestionShardPointer pointerFromOffset(String offset) {
return new KafkaOffset(offsetValue);
}

private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, long maxMessages, int timeoutMillis) {
if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) {
logger.info("Seeking to offset {}", startOffset);
consumer.seek(topicPartition, startOffset);
private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
long startOffset,
boolean includeStart,
long maxMessages,
int timeoutMillis
) {
long kafkaStartOffset = startOffset;
if (!includeStart) {
kafkaStartOffset += 1;
}

if (lastFetchedOffset < 0 || lastFetchedOffset != kafkaStartOffset - 1) {
logger.info("Seeking to offset {}", kafkaStartOffset);
consumer.seek(topicPartition, kafkaStartOffset);
// update the last fetched offset so that we don't need to seek again if no more messages to fetch
lastFetchedOffset = startOffset - 1;
lastFetchedOffset = kafkaStartOffset - 1;
}

ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<byte[], byte[]>> messageAndOffsets = consumerRecords.records(topicPartition);

long endOffset = startOffset + maxMessages;
long endOffset = kafkaStartOffset + maxMessages;
List<ReadResult<KafkaOffset, KafkaMessage>> results = new ArrayList<>();

for (ConsumerRecord<byte[], byte[]> messageAndOffset : messageAndOffsets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ public void testReadNext() throws Exception {

when(mockConsumer.poll(any(Duration.class))).thenReturn(records);

List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> result = consumer.readNext(new KafkaOffset(0), 10, 1000);
List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> result = consumer.readNext(new KafkaOffset(0), true, 10, 1000);

assertEquals(1, result.size());
assertEquals("message", new String(result.get(0).getMessage().getPayload(), StandardCharsets.UTF_8));
assertEquals(1, consumer.nextPointer().getOffset());
assertEquals(0, consumer.getShardId());
assertEquals("client1", consumer.getClientId());
}
Expand Down
251 changes: 251 additions & 0 deletions plugins/ingestion-kinesis/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description = 'Pull-based ingestion plugin to consume from Kinesis'
classname = 'org.opensearch.plugin.kinesis.KinesisPlugin'
}

versions << [
'docker': '3.3.6',
'testcontainers': '1.19.7',
'ducttape': '1.0.8',
'snappy': '1.1.10.7',
]

dependencies {
// aws sdk
api "software.amazon.awssdk:sdk-core:${versions.aws}"
api "software.amazon.awssdk:annotations:${versions.aws}"
api "software.amazon.awssdk:aws-core:${versions.aws}"
api "software.amazon.awssdk:auth:${versions.aws}"
api "software.amazon.awssdk:identity-spi:${versions.aws}"
api "software.amazon.awssdk:checksums:${versions.aws}"
api "software.amazon.awssdk:checksums-spi:${versions.aws}"
api "software.amazon.awssdk.crt:aws-crt:${versions.awscrt}"
api "software.amazon.awssdk:http-auth:${versions.aws}"
api "software.amazon.awssdk:http-auth-aws:${versions.aws}"
api "software.amazon.awssdk:http-auth-spi:${versions.aws}"
api "software.amazon.awssdk:retries:${versions.aws}"
api "software.amazon.awssdk:retries-spi:${versions.aws}"
api "software.amazon.awssdk:endpoints-spi:${versions.aws}"
api "software.amazon.awssdk:http-client-spi:${versions.aws}"
api "software.amazon.awssdk:apache-client:${versions.aws}"
api "software.amazon.awssdk:metrics-spi:${versions.aws}"
api "software.amazon.awssdk:profiles:${versions.aws}"
api "software.amazon.awssdk:regions:${versions.aws}"
api "software.amazon.awssdk:utils:${versions.aws}"
api "software.amazon.awssdk:aws-json-protocol:${versions.aws}"
api "software.amazon.awssdk:protocol-core:${versions.aws}"
api "software.amazon.awssdk:json-utils:${versions.aws}"
api "software.amazon.awssdk:third-party-jackson-core:${versions.aws}"
api "software.amazon.awssdk:aws-xml-protocol:${versions.aws}"
api "software.amazon.awssdk:aws-json-protocol:${versions.aws}"
api "software.amazon.awssdk:aws-query-protocol:${versions.aws}"
api "software.amazon.awssdk:sts:${versions.aws}"
api "software.amazon.awssdk:netty-nio-client:${versions.aws}"
api "software.amazon.awssdk:kinesis:${versions.aws}"
api "software.amazon.awssdk:aws-cbor-protocol:${versions.aws}"
api "software.amazon.awssdk:third-party-jackson-dataformat-cbor:${versions.aws}"

api "org.apache.httpcomponents:httpclient:${versions.httpclient}"
api "org.apache.httpcomponents:httpcore:${versions.httpcore}"
api "commons-logging:commons-logging:${versions.commonslogging}"
api "commons-codec:commons-codec:${versions.commonscodec}"
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
api "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
api "joda-time:joda-time:${versions.joda}"
api "org.slf4j:slf4j-api:${versions.slf4j}"

// network stack
api "io.netty:netty-buffer:${versions.netty}"
api "io.netty:netty-codec:${versions.netty}"
api "io.netty:netty-codec-http:${versions.netty}"
api "io.netty:netty-codec-http2:${versions.netty}"
api "io.netty:netty-common:${versions.netty}"
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
api "io.netty:netty-transport-classes-epoll:${versions.netty}"


// test
testImplementation "com.github.docker-java:docker-java-api:${versions.docker}"
testImplementation "com.github.docker-java:docker-java-transport:${versions.docker}"
testImplementation "com.github.docker-java:docker-java-transport-zerodep:${versions.docker}"
testImplementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
testImplementation "org.testcontainers:testcontainers:${versions.testcontainers}"
testImplementation "org.testcontainers:localstack:${versions.testcontainers}"
testImplementation "org.rnorth.duct-tape:duct-tape:${versions.ducttape}"
testImplementation "org.apache.commons:commons-compress:${versions.commonscompress}"
testImplementation "commons-io:commons-io:${versions.commonsio}"
testImplementation 'org.awaitility:awaitility:4.2.0'
}

internalClusterTest{
environment 'TESTCONTAINERS_RYUK_DISABLED', 'true'
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}

tasks.named("dependencyLicenses").configure {
mapping from: /jackson-.*/, to: 'jackson'
mapping from: /netty-.*/, to: 'netty'
mapping from: /log4j-.*/, to: 'log4j'
}

thirdPartyAudit {
ignoreMissingClasses(
'com.aayushatharva.brotli4j.Brotli4jLoader',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Status',
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Wrapper',
'com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel',
'com.aayushatharva.brotli4j.encoder.Encoder$Mode',
'com.aayushatharva.brotli4j.encoder.Encoder$Parameters',

'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',

'org.apache.avalon.framework.logger.Logger',
'org.apache.log.Hierarchy',
'org.apache.log.Logger',
'org.apache.log4j.Level',
'org.apache.log4j.Logger',
'org.apache.log4j.Priority',

'org.slf4j.impl.StaticLoggerBinder',
'org.slf4j.impl.StaticMDCBinder',
'org.slf4j.impl.StaticMarkerBinder',

'org.graalvm.nativeimage.hosted.Feature',
'org.graalvm.nativeimage.hosted.Feature$AfterImageWriteAccess',

'com.ning.compress.BufferRecycler',
'com.ning.compress.lzf.ChunkDecoder',
'com.ning.compress.lzf.ChunkEncoder',
'com.ning.compress.lzf.LZFChunk',
'com.ning.compress.lzf.LZFEncoder',
'com.ning.compress.lzf.util.ChunkDecoderFactory',
'com.ning.compress.lzf.util.ChunkEncoderFactory',

'javax.servlet.ServletContextEvent',
'javax.servlet.ServletContextListener',

'io.netty.internal.tcnative.Buffer',
'io.netty.internal.tcnative.CertificateCompressionAlgo',
'io.netty.internal.tcnative.Library',
'io.netty.internal.tcnative.SSLContext',
'io.netty.internal.tcnative.SSLPrivateKeyMethod',

'io.netty.internal.tcnative.AsyncSSLPrivateKeyMethod',
'io.netty.internal.tcnative.AsyncTask',
'io.netty.internal.tcnative.CertificateCallback',
'io.netty.internal.tcnative.CertificateVerifier',
'io.netty.internal.tcnative.ResultCallback',
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',
'io.netty.internal.tcnative.SSL',
'io.netty.internal.tcnative.SSLSession',
'io.netty.internal.tcnative.SSLSessionCache',

'lzma.sdk.lzma.Encoder',
'net.jpountz.lz4.LZ4Compressor',
'net.jpountz.lz4.LZ4Factory',
'net.jpountz.lz4.LZ4FastDecompressor',
'net.jpountz.xxhash.XXHash32',
'net.jpountz.xxhash.XXHashFactory',

// from io.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
'org.bouncycastle.cert.X509v3CertificateBuilder',
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
'org.bouncycastle.openssl.PEMEncryptedKeyPair',
'org.bouncycastle.openssl.PEMParser',
'org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter',
'org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder',
'org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder',
'org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo',

'org.conscrypt.AllocatedBuffer',
'org.conscrypt.BufferAllocator',
'org.conscrypt.Conscrypt',
'org.conscrypt.HandshakeListener',

'org.eclipse.jetty.alpn.ALPN$ClientProvider',
'org.eclipse.jetty.alpn.ALPN$ServerProvider',
'org.eclipse.jetty.alpn.ALPN',

// from io.netty.handler.ssl.JettyNpnSslEngine (netty)
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
'org.eclipse.jetty.npn.NextProtoNego',

// from io.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
'org.jboss.marshalling.ByteInput',

// from io.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
'org.jboss.marshalling.ByteOutput',

// from io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
'org.jboss.marshalling.Marshaller',

// from io.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
'org.jboss.marshalling.MarshallerFactory',
'org.jboss.marshalling.MarshallingConfiguration',
'org.jboss.marshalling.Unmarshaller',

'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration',

'software.amazon.eventstream.HeaderValue',
'software.amazon.eventstream.Message',
'software.amazon.eventstream.MessageDecoder'
)

ignoreViolations (
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4',
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5',

'io.netty.util.internal.PlatformDependent0',
'io.netty.util.internal.PlatformDependent0$1',
'io.netty.util.internal.PlatformDependent0$2',
'io.netty.util.internal.PlatformDependent0$3',
'io.netty.util.internal.PlatformDependent0$4',
'io.netty.util.internal.PlatformDependent0$6',

'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields',
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields',
'io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueConsumerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerIndexField',
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
c5acc1da9567290302d80ffa1633785afa4ce630
Loading
Loading