
Commit 2ed2f29

HIVE-29238: Upgrade Kafka version from 2.5.0 to 3.9.1 (#6110)
1 parent a947925 commit 2ed2f29

12 files changed: +90 -99 lines changed


itests/qtest-druid/pom.xml

Lines changed: 7 additions & 3 deletions
@@ -36,7 +36,6 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.test.version>2.5.0</kafka.test.version>
     <druid.guice.version>4.1.0</druid.guice.version>
     <slf4j.version>1.7.30</slf4j.version>
   </properties>
@@ -219,12 +218,17 @@
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.12</artifactId>
-        <version>${kafka.test.version}</version>
+        <version>${kafka.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
-        <version>${kafka.test.version}</version>
+        <version>${kafka.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-server</artifactId>
+        <version>${kafka.version}</version>
       </dependency>
       <dependency>
         <groupId>org.slf4j</groupId>

itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java

Lines changed: 9 additions & 7 deletions
@@ -19,7 +19,7 @@
 package org.apache.hive.kafka;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
 
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.IntStream;
+import scala.Option;
 
 /**
  * This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
   private static final String LOCALHOST = "localhost";
 
 
-  private final KafkaServerStartable serverStartable;
+  private final KafkaServer server;
   private final int brokerPort;
   private final String kafkaServer;
 
@@ -81,8 +83,8 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
 
     properties.setProperty("zookeeper.connect", zkString);
     properties.setProperty("broker.id", String.valueOf(1));
-    properties.setProperty("host.name", LOCALHOST);
-    properties.setProperty("port", Integer.toString(brokerPort));
+    properties.setProperty("listeners", "PLAINTEXT://" + LOCALHOST + ":" + Integer.toString(brokerPort));
+    properties.setProperty("advertised.listeners", "PLAINTEXT://" + LOCALHOST + ":" + Integer.toString(brokerPort));
     properties.setProperty("log.dir", logDir);
     // This property is very important, we are sending form records with a specific time
     // Thus need to make sure that they don't get DELETED
@@ -94,21 +96,21 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
     properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
     properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
 
-    this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
+    this.server = new KafkaServer(KafkaConfig.fromProps(properties), Time.SYSTEM, Option.empty(), false);
   }
 
 
   @Override
   protected void serviceStart() throws Exception {
-    serverStartable.startup();
+    server.startup();
     log.info("Kafka Server Started on port {}", brokerPort);
 
   }
 
   @Override
   protected void serviceStop() throws Exception {
     log.info("Stopping Kafka Server");
-    serverStartable.shutdown();
+    server.shutdown();
     log.info("Kafka Server Stopped");
   }
 
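Kafka 3.x drops the KafkaServerStartable wrapper and the old host.name/port broker properties, so the embedded test cluster now builds a kafka.server.KafkaServer directly and declares its endpoint through listeners/advertised.listeners. A minimal standalone sketch of the same pattern (the port, log dir and ZooKeeper address are placeholders; the four-argument constructor is the one used in the diff above):

    import java.util.Properties;

    import kafka.server.KafkaConfig;
    import kafka.server.KafkaServer;
    import org.apache.kafka.common.utils.Time;
    import scala.Option;

    public class EmbeddedBrokerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");   // assumes a ZooKeeper instance is already running
        props.setProperty("broker.id", "1");
        // Kafka 3.x brokers declare endpoints via listeners, not host.name/port.
        props.setProperty("listeners", "PLAINTEXT://localhost:9093");
        props.setProperty("advertised.listeners", "PLAINTEXT://localhost:9093");
        props.setProperty("log.dir", "/tmp/embedded-kafka-logs");
        props.setProperty("offsets.topic.replication.factor", "1");

        // (config, time, threadNamePrefix, enableForwarding) -- same constructor as in the change above.
        KafkaServer server = new KafkaServer(KafkaConfig.fromProps(props), Time.SYSTEM, Option.empty(), false);
        server.startup();
        // ... run clients against localhost:9093 ...
        server.shutdown();
        server.awaitShutdown();
      }
    }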

kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java

Lines changed: 37 additions & 10 deletions
@@ -33,6 +33,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.Uuid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +68,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
     kafkaProducer = new KafkaProducer<>(properties);
   }
 
+  @Override
+  public Uuid clientInstanceId(Duration timeout) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override public void initTransactions() {
     kafkaProducer.initTransactions();
   }
@@ -138,11 +144,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
 
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
 
-    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
+    Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
     invoke(transactionManager,
         "transitionTo",
         getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(topicPartitionBookkeeper, "reset");
+    invoke(txnPartitionMap, "reset");
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
@@ -189,14 +195,35 @@ private void flushNewPartitions() {
 
   private synchronized TransactionalRequestResult enqueueNewPartitions() {
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
-    Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-    invoke(transactionManager,
-        "enqueueRequest",
-        new Class[] {txnRequestHandler.getClass().getSuperclass()},
-        new Object[] {txnRequestHandler});
-    return (TransactionalRequestResult) getValue(txnRequestHandler,
-        txnRequestHandler.getClass().getSuperclass(),
-        "result");
+    synchronized (transactionManager) {
+      Object newPartitionsInTransaction =
+          getValue(transactionManager, "newPartitionsInTransaction");
+      Object newPartitionsInTransactionIsEmpty =
+          invoke(newPartitionsInTransaction, "isEmpty");
+      TransactionalRequestResult result;
+      if (newPartitionsInTransactionIsEmpty instanceof Boolean
+          && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+        Object txnRequestHandler =
+            invoke(transactionManager, "addPartitionsToTransactionHandler");
+        invoke(
+            transactionManager,
+            "enqueueRequest",
+            new Class[]{txnRequestHandler.getClass().getSuperclass()},
+            new Object[]{txnRequestHandler});
+
+        result = (TransactionalRequestResult)
+            getValue(
+                txnRequestHandler,
+                txnRequestHandler.getClass().getSuperclass(),
+                "result");
+      } else {
+        // we don't have an operation but this operation string is also used in
+        // addPartitionsToTransactionHandler.
+        result = new TransactionalRequestResult("AddPartitionsToTxn");
+        result.done();
+      }
+      return result;
+    }
   }
 
   @SuppressWarnings("unchecked") private static Enum<?> getEnum(String enumFullName) {
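resumeTransaction() and enqueueNewPartitions() reach into KafkaProducer's private TransactionManager via reflection, so this upgrade mostly re-points field and method names at the 3.x internals: topicPartitionBookkeeper became txnPartitionMap, and the partitions handler is only enqueued when newPartitionsInTransaction is non-empty, otherwise an already-completed result is returned. The getValue/invoke helpers referenced here already exist in HiveKafkaProducer; a rough sketch of what such helpers look like (illustrative bodies only; the real ones also walk superclasses):

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;

    final class ReflectionHelpersSketch {

      // Read a private field by name, e.g. getValue(kafkaProducer, "transactionManager").
      static Object getValue(Object target, String fieldName) {
        try {
          Field field = target.getClass().getDeclaredField(fieldName);
          field.setAccessible(true);
          return field.get(target);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      }

      // Invoke a private no-arg method by name, e.g. invoke(txnPartitionMap, "reset").
      static Object invoke(Object target, String methodName) {
        try {
          Method method = target.getClass().getDeclaredMethod(methodName);
          method.setAccessible(true);
          return method.invoke(target);
        } catch (ReflectiveOperationException e) {
          throw new RuntimeException(e);
        }
      }

      private ReflectionHelpersSketch() {
      }
    }

Because these names are client internals rather than public API, any future Kafka upgrade can rename them again; the change effectively re-pins the reflection to the 3.9 layout.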

kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
       }
     } else {
       // case seek to beginning of stream
-      consumer.seekToBeginning(Collections.singleton(topicPartition));
+      consumer.seekToBeginning(topicPartitionList);
       // seekToBeginning is lazy thus need to call position() or poll(0)
       this.startOffset = consumer.position(topicPartition);
       LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
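As the retained comment notes, seekToBeginning() is lazy: it only records the intent to rewind, and the actual offset lookup happens on the next position() or poll() call. Passing the prebuilt topicPartitionList also avoids allocating a fresh singleton set on every seek. A small consumer-side sketch of the same pattern (broker address and topic name are placeholders):

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class SeekToBeginningSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          TopicPartition topicPartition = new TopicPartition("some_topic", 0);
          List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);

          consumer.assign(topicPartitionList);
          consumer.seekToBeginning(topicPartitionList);          // lazy: nothing is fetched yet
          long startOffset = consumer.position(topicPartition);  // forces the actual offset lookup
          System.out.println("start offset = " + startOffset);
        }
      }
    }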

kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java

Lines changed: 2 additions & 1 deletion
@@ -30,6 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Properties;
 
@@ -150,7 +151,7 @@ private synchronized void initialize(KafkaInputSplit inputSplit, Configuration j
     LOG.trace("total read bytes [{}]", readBytes);
     if (consumer != null) {
       consumer.wakeup();
-      consumer.close();
+      consumer.close(Duration.ZERO);
     }
   }
 
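The parameterless close() waits for the default close timeout (30 seconds in recent clients) while the consumer tries to finish outstanding work; in this cleanup path another thread may still be blocked in poll(), so the reader first calls wakeup() and then closes with Duration.ZERO to return immediately. A hedged sketch of that cleanup shape:

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.Consumer;

    final class ConsumerCleanupSketch {

      // Unblock any concurrent poll() (it throws WakeupException), then close without waiting.
      static void closeQuietly(Consumer<byte[], byte[]> consumer) {
        if (consumer != null) {
          consumer.wakeup();
          consumer.close(Duration.ZERO);
        }
      }

      private ConsumerCleanupSketch() {
      }
    }

The same close(Duration.ZERO) change is applied in VectorizedKafkaRecordReader and in the test teardown further down.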

kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java

Lines changed: 2 additions & 1 deletion
@@ -37,6 +37,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Properties;
 
@@ -150,7 +151,7 @@ private void cleanRowBoat() {
     LOG.trace("total read bytes [{}]", readBytes);
     if (consumer != null) {
       consumer.wakeup();
-      consumer.close();
+      consumer.close(Duration.ZERO);
     }
   }
 

kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java

Lines changed: 10 additions & 3 deletions
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -158,7 +159,9 @@
   @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(3434L, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -169,7 +172,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 
@@ -180,7 +185,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(45L, epoch);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 }
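These negative tests previously passed ImmutableMap.of() and relied on the resulting broker round trip to reject the bogus producer id/epoch; the switch to a real single-entry offsets map suggests the newer client short-circuits sendOffsetsToTransaction() when the map is empty, so an actual partition offset is needed to trigger the expected failure. A minimal sketch of building such a map (topic, partition and offset are the dummy values used by the tests):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    final class DummyOffsetsSketch {

      // One committed offset for partition 0 of the dummy topic used by the tests.
      static Map<TopicPartition, OffsetAndMetadata> dummyOffsets() {
        return Collections.singletonMap(new TopicPartition("dummy_topic", 0), new OffsetAndMetadata(0L));
      }

      private DummyOffsetsSketch() {
      }
    }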

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java

Lines changed: 6 additions & 4 deletions
@@ -26,7 +26,7 @@
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.IPStackUtils;
-import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.rules.ExternalResource;
@@ -41,6 +41,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import scala.Option;
 
 /**
  * Test Helper Class to start and stop a kafka broker.
@@ -106,7 +107,8 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
       brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
       brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
       truststoreFile = File.createTempFile("kafka_truststore", "jks");
-      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
+          .createNewTrustStore(truststoreFile).build());
       brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
     }
     brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +118,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
     kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
     kafkaServer.startup();
     kafkaServer.zkClient();
-    adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+    adminZkClient = new AdminZkClient(kafkaServer.zkClient(), Option.empty());
     LOG.info("Creating kafka TOPIC [{}]", TOPIC);
-    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
   }
 
   /**

kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java

Lines changed: 2 additions & 1 deletion
@@ -44,6 +44,7 @@
 
 import javax.annotation.Nullable;
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -304,7 +305,7 @@ private static void sendData(List<ConsumerRecord<byte[], byte[]>> recordList, @N
   @After public void tearDown() {
     this.kafkaRecordIterator = null;
     if (this.consumer != null) {
-      this.consumer.close();
+      this.consumer.close(Duration.ZERO);
     }
   }
 

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -173,7 +173,7 @@
     <junit.version>4.13.2</junit.version>
     <junit.jupiter.version>5.13.3</junit.jupiter.version>
     <junit.vintage.version>5.13.3</junit.vintage.version>
-    <kafka.version>2.5.0</kafka.version>
+    <kafka.version>3.9.1</kafka.version>
     <kryo.version>5.5.0</kryo.version>
     <reflectasm.version>1.11.9</reflectasm.version>
     <kudu.version>1.17.0</kudu.version>
