
Commit 82d1db6

MINOR: code cleanup (#6054)
Reviewers: Bill Bejeck <[email protected]>, John Roesler <[email protected]>, Guozhang Wang <[email protected]>, Ryanne Dolan <[email protected]>, Ismael Juma <[email protected]>
1 parent 0206e6c commit 82d1db6

24 files changed: +1052 −496 lines

checkstyle/suppressions.xml

Lines changed: 2 additions & 0 deletions
@@ -165,6 +165,8 @@
             files="KStreamKStreamJoinTest.java"/>
     <suppress checks="MethodLength"
               files="KStreamWindowAggregateTest.java"/>
+    <suppress checks="MethodLength"
+              files="RocksDBWindowStoreTest.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               files=".*[/\\]streams[/\\].*test[/\\].*.java"/>

streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java

Lines changed: 2 additions & 2 deletions
@@ -86,8 +86,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             joinThisStoreNames);
 
         if (materializedInternal != null) {
-            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder
-                = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
+            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder =
+                new KeyValueStoreMaterializer<>(materializedInternal).materialize();
             topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
         }
     }

streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

Lines changed: 20 additions & 30 deletions
@@ -16,10 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
@@ -61,6 +57,10 @@
 import org.junit.rules.TestName;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -215,7 +215,7 @@ public void testStateOneThreadDeadButRebalanceFinish() throws InterruptedException {
     @Test
     public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
         builder.globalTable("anyTopic");
-        final List<Node> nodes = asList(new Node(0, "localhost", 8121));
+        final List<Node> nodes = Collections.singletonList(new Node(0, "localhost", 8121));
         final Cluster cluster = new Cluster("mockClusterId", nodes,
             Collections.emptySet(), Collections.emptySet(),
             Collections.emptySet(), nodes.get(0));
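
The asList → singletonList change above is a common cleanup: for a single element, Collections.singletonList returns a smaller, fully immutable list and reads more clearly. A minimal standalone sketch of the difference (not part of this commit):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SingletonListDemo {
        public static void main(final String[] args) {
            // Arrays.asList allocates a varargs array plus a fixed-size view over it:
            // set() is allowed, add/remove throw UnsupportedOperationException.
            final List<String> viaAsList = Arrays.asList("only-element");

            // Collections.singletonList is a dedicated single-element list:
            // set/add/remove all throw; no backing array is allocated.
            final List<String> viaSingleton = Collections.singletonList("only-element");

            System.out.println(viaAsList.equals(viaSingleton)); // true: same contents
        }
    }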
@@ -326,14 +326,11 @@ public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
 
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             streams.start();
             fail("expected start() to time out and throw an exception.");
         } catch (final StreamsException expected) {
             // This is a result of not being able to connect to the broker.
-        } finally {
-            streams.close();
         }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
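
Several hunks in this file replace the manual try/finally-close idiom with try-with-resources, which works here because KafkaStreams can be used as an AutoCloseable resource (the compiled diff relies on exactly that). A minimal sketch of the pattern in isolation, where MyResource is a hypothetical stand-in for KafkaStreams or any other AutoCloseable:

    public class TryWithResourcesDemo {
        // Hypothetical resource standing in for KafkaStreams.
        static final class MyResource implements AutoCloseable {
            void start() {
                System.out.println("started");
            }

            @Override
            public void close() {
                System.out.println("closed");
            }
        }

        public static void main(final String[] args) {
            // Before: the resource must be closed explicitly in a finally block.
            final MyResource manual = new MyResource();
            try {
                manual.start();
            } finally {
                manual.close();
            }

            // After: close() runs automatically, even if start() throws.
            try (final MyResource auto = new MyResource()) {
                auto.start();
            }
        }
    }

Besides being shorter, the try-with-resources form also removes the double-close in tests like testInitializesAndDestroysMetricsReporters below, where close() was called both in the body and in the finally block.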
@@ -349,11 +346,8 @@ public void testLocalThreadCloseWithoutConnectingToBroker() {
 
         // make sure we have the global state thread running too
         builder.table("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             streams.start();
-        } finally {
-            streams.close();
         }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
@@ -362,9 +356,8 @@
     @Test
     public void testInitializesAndDestroysMetricsReporters() {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
             final int initDiff = newInitCount - oldInitCount;
             assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
@@ -373,8 +366,6 @@ public void testInitializesAndDestroysMetricsReporters() {
             final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
             streams.close();
             assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
-        } finally {
-            streams.close();
         }
     }
 
@@ -584,8 +575,7 @@ public void shouldCleanupOldStateDirs() throws InterruptedException {
 
         builder.table(topic, Materialized.as("store"));
 
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             final CountDownLatch latch = new CountDownLatch(1);
             streams.setStateListener((newState, oldState) -> {
                 if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
@@ -601,21 +591,16 @@ public void shouldCleanupOldStateDirs() throws InterruptedException {
             verifyCleanupStateDir(appDir, oldTaskDir);
             assertTrue(oldTaskDir.mkdirs());
             verifyCleanupStateDir(appDir, oldTaskDir);
-        } finally {
-            streams.close();
         }
     }
 
     @Test
     public void shouldThrowOnNegativeTimeoutForClose() {
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             streams.close(Duration.ofMillis(-1L));
             fail("should not accept negative close parameter");
         } catch (final IllegalArgumentException e) {
             // expected
-        } finally {
-            streams.close();
         }
     }
 
@@ -684,9 +669,12 @@ private Topology getStatefulTopology(final String inputTopic,
                                          final String globalStoreName,
                                          final boolean isPersistentStore) throws Exception {
         CLUSTER.createTopics(inputTopic, outputTopic, globalTopicName);
-        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(isPersistentStore ?
-            Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName),
-            Serdes.String(), Serdes.Long());
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
+            isPersistentStore ?
+                Stores.persistentKeyValueStore(storeName)
+                : Stores.inMemoryKeyValueStore(storeName),
+            Serdes.String(),
+            Serdes.Long());
         final Topology topology = new Topology();
         topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
             .addProcessor("process", () -> new AbstractProcessor<String, String>() {
@@ -769,7 +757,8 @@ private void startStreamsAndCheckDirExists(final Topology topology,
         }
     }
 
-    private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
+    private void verifyCleanupStateDir(final String appDir,
+                                       final File oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
         TestUtils.waitForCondition(
             () -> !oldTaskDir.exists() && taskDir.exists(),
@@ -785,7 +774,8 @@ public static class StateListenerStub implements KafkaStreams.StateListener {
         public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();
 
         @Override
-        public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+        public void onChange(final KafkaStreams.State newState,
+                             final KafkaStreams.State oldState) {
             final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0;
             numChanges++;
             this.oldState = oldState;

streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java

Lines changed: 25 additions & 31 deletions
@@ -73,11 +73,10 @@ public class EosIntegrationTest {
     private static final int MAX_WAIT_TIME_MS = 60 * 1000;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
-        {
-            put("auto.create.topics.enable", false);
-        }
-    });
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
+    );
 
     private static String applicationId;
     private final static int NUM_TOPIC_PARTITIONS = 2;
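
The anonymous Properties subclass removed here is the "double-brace initialization" idiom: it creates an extra anonymous class per literal and, in instance contexts, captures a reference to the enclosing object. Utils.mkProperties (from org.apache.kafka.common.utils.Utils, as used in the diff) builds a plain Properties from a Map instead. A minimal standalone sketch of the two styles:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.common.utils.Utils;

    public class PropertiesStyleDemo {
        public static void main(final String[] args) {
            // Avoided: double-brace initialization compiles to an anonymous
            // subclass of Properties for every such literal.
            final Properties doubleBrace = new Properties() {
                {
                    put("auto.create.topics.enable", "false");
                }
            };

            // Preferred: a plain Properties built from an immutable map.
            final Properties plain =
                Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"));

            System.out.println(doubleBrace.getClass()); // class PropertiesStyleDemo$1
            System.out.println(plain.getClass());       // class java.util.Properties
        }
    }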
@@ -154,23 +153,22 @@ private void runSimpleCopyTest(final int numberOfRestarts,
         }
         output.to(outputTopic);
 
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
         for (int i = 0; i < numberOfRestarts; ++i) {
             final Properties config = StreamsTestUtils.getStreamsConfig(
                 applicationId,
                 CLUSTER.bootstrapServers(),
                 Serdes.LongSerde.class.getName(),
                 Serdes.LongSerde.class.getName(),
-                new Properties() {
-                    {
-                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-                        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-                        put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-                        put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-                    }
-                });
+                properties);
 
             try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
                 streams.start();
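
Note that besides dropping the double-brace idiom, this hunk also hoists the override Properties out of the restart loop, so a single instance is built once and reused on every iteration instead of being re-created per restart.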
@@ -275,11 +273,10 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
                 CONSUMER_GROUP_ID,
                 LongDeserializer.class,
                 LongDeserializer.class,
-                new Properties() {
-                    {
-                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                    }
-                }),
+                Utils.mkProperties(Collections.singletonMap(
+                    ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                    IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))
+            ),
             SINGLE_PARTITION_OUTPUT_TOPIC,
             firstBurstOfData.size()
         );
@@ -300,11 +297,10 @@ public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
                 CONSUMER_GROUP_ID,
                 LongDeserializer.class,
                 LongDeserializer.class,
-                new Properties() {
-                    {
-                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                    }
-                }),
+                Utils.mkProperties(Collections.singletonMap(
+                    ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                    IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))
+            ),
             SINGLE_PARTITION_OUTPUT_TOPIC,
             secondBurstOfData.size()
         );
@@ -691,11 +687,9 @@ private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
             groupId,
             LongDeserializer.class,
             LongDeserializer.class,
-            new Properties() {
-                {
-                    put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                }
-            }),
+            Utils.mkProperties(Collections.singletonMap(
+                ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))),
         SINGLE_PARTITION_OUTPUT_TOPIC,
         numberOfRecords
     );
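
All three hunks above configure test consumers with isolation.level=read_committed, which is how a consumer skips records from aborted transactions in these exactly-once tests. A minimal sketch of a consumer configured the same way; the bootstrap address, group id, and topic name are placeholders:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;

    public class ReadCommittedConsumerDemo {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            // Only deliver records from committed transactions; aborted data is filtered out.
            config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (final KafkaConsumer<Long, Long> consumer = new KafkaConsumer<>(config)) {
                consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            }
        }
    }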

streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java

Lines changed: 20 additions & 12 deletions
@@ -33,31 +33,39 @@
 
 public class CopartitionedTopicsValidatorTest {
 
-    private final StreamsPartitionAssignor.CopartitionedTopicsValidator validator
-        = new StreamsPartitionAssignor.CopartitionedTopicsValidator("thread");
+    private final StreamsPartitionAssignor.CopartitionedTopicsValidator validator =
+        new StreamsPartitionAssignor.CopartitionedTopicsValidator("thread");
     private final Map<TopicPartition, PartitionInfo> partitions = new HashMap<>();
     private final Cluster cluster = Cluster.empty();
 
     @Before
     public void before() {
-        partitions.put(new TopicPartition("first", 0), new PartitionInfo("first", 0, null, null, null));
-        partitions.put(new TopicPartition("first", 1), new PartitionInfo("first", 1, null, null, null));
-        partitions.put(new TopicPartition("second", 0), new PartitionInfo("second", 0, null, null, null));
-        partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null));
+        partitions.put(
+            new TopicPartition("first", 0),
+            new PartitionInfo("first", 0, null, null, null));
+        partitions.put(
+            new TopicPartition("first", 1),
+            new PartitionInfo("first", 1, null, null, null));
+        partitions.put(
+            new TopicPartition("second", 0),
+            new PartitionInfo("second", 0, null, null, null));
+        partitions.put(
+            new TopicPartition("second", 1),
+            new PartitionInfo("second", 1, null, null, null));
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
         validator.validate(Collections.singleton("topic"),
-                           Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
+                           Collections.emptyMap(),
                            cluster);
     }
 
     @Test(expected = TopologyException.class)
     public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() {
         partitions.remove(new TopicPartition("second", 0));
         validator.validate(Utils.mkSet("first", "second"),
-                           Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
+                           Collections.emptyMap(),
                            cluster.withPartitions(partitions));
     }
 
@@ -100,11 +108,11 @@ public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartition
 
     private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
                                                                                final int partitions) {
-        final InternalTopicConfig repartitionTopicConfig
-            = new RepartitionTopicConfig(repartitionTopic, Collections.<String, String>emptyMap());
+        final InternalTopicConfig repartitionTopicConfig =
+            new RepartitionTopicConfig(repartitionTopic, Collections.emptyMap());
 
-        final StreamsPartitionAssignor.InternalTopicMetadata metadata
-            = new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
+        final StreamsPartitionAssignor.InternalTopicMetadata metadata =
+            new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
         metadata.numPartitions = partitions;
         return metadata;
     }
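
The repeated change from Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap() to a bare Collections.emptyMap() in this file relies on Java 8's improved target typing: the type arguments of a generic method are inferred from the parameter the call is passed to, so the explicit type witness is no longer needed. A minimal standalone sketch; the takesMap method is illustrative, standing in for CopartitionedTopicsValidator.validate:

    import java.util.Collections;
    import java.util.Map;

    public class TypeInferenceDemo {
        // Hypothetical method with a generic Map parameter.
        static void takesMap(final Map<String, Integer> map) {
            System.out.println(map.size());
        }

        public static void main(final String[] args) {
            // Pre-Java-8 style: an explicit type witness on the generic method.
            takesMap(Collections.<String, Integer>emptyMap());

            // Java 8+ target typing infers <String, Integer> from the parameter type.
            takesMap(Collections.emptyMap());
        }
    }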
