Skip to content

Commit 43286c6

Browse files
committed
Remove deprecated version attribute and Bump CP 7.9.4
1 parent 01daac8 commit 43286c6

File tree

5 files changed

+21
-22
lines changed

5 files changed

+21
-22
lines changed

dsl/docker-compose.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
---
2-
version: '2'
32
services:
43
broker:
5-
image: confluentinc/cp-kafka:7.7.0
4+
image: confluentinc/cp-kafka:7.9.4
65
hostname: broker
76
container_name: broker
87
networks:
@@ -31,7 +30,7 @@ services:
3130
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
3231

3332
control-center:
34-
image: confluentinc/cp-enterprise-control-center:7.7.0
33+
image: confluentinc/cp-enterprise-control-center:7.9.4
3534
hostname: control-center
3635
container_name: control-center
3736
networks:
@@ -49,7 +48,7 @@ services:
4948
PORT: 9021
5049

5150
init-kafka:
52-
image: confluentinc/cp-kafka:7.7.0
51+
image: confluentinc/cp-kafka:7.9.4
5352
hostname: init-kafka
5453
container_name: init-kafka
5554
networks:

dsl/src/main/java/com/michelin/kafka/error/handling/dsl/KafkaStreamsApp.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,20 @@ public static void main(String[] args) {
4747
StreamsBuilder streamsBuilder = new StreamsBuilder();
4848
buildTopology(streamsBuilder);
4949

50-
try (KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties)) {
51-
kafkaStreams.setUncaughtExceptionHandler(
52-
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
53-
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
50+
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
5451

55-
kafkaStreams.start();
56-
}
52+
kafkaStreams.setUncaughtExceptionHandler(
53+
_ -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
54+
55+
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
56+
57+
kafkaStreams.start();
5758
}
5859

5960
public static void buildTopology(StreamsBuilder streamsBuilder) {
6061
streamsBuilder.stream("delivery_booked_topic", Consumed.with(Serdes.String(), Serdes.String()))
6162
.mapValues(KafkaStreamsApp::parseFromJson) // JsonSyntaxException
62-
.filter((key, value) -> {
63+
.filter((_, value) -> {
6364
if (value.getNumberOfTires() < 0) {
6465
throw new InvalidDeliveryException("Number of tires cannot be negative");
6566
}

processor-api/docker-compose.yml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
---
2-
version: '2'
32
services:
43
broker:
5-
image: confluentinc/cp-kafka:7.7.0
4+
image: confluentinc/cp-kafka:7.9.4
65
hostname: broker
76
container_name: broker
87
networks:
@@ -31,7 +30,7 @@ services:
3130
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
3231

3332
control-center:
34-
image: confluentinc/cp-enterprise-control-center:7.7.0
33+
image: confluentinc/cp-enterprise-control-center:7.9.4
3534
hostname: control-center
3635
container_name: control-center
3736
networks:
@@ -49,7 +48,7 @@ services:
4948
PORT: 9021
5049

5150
init-kafka:
52-
image: confluentinc/cp-kafka:7.7.0
51+
image: confluentinc/cp-kafka:7.9.4
5352
hostname: init-kafka
5453
container_name: init-kafka
5554
networks:

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/CustomProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class CustomProcessor extends ContextualProcessor<String, DeliveryBooked,
5050
@Override
5151
public void init(ProcessorContext<String, String> context) {
5252
super.init(context);
53-
context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
53+
context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, _ -> {
5454
// Simulate an intentional exception
5555
throw new KaboomException("Simulated exception in punctuation");
5656
});

processor-api/src/main/java/com/michelin/kafka/error/handling/papi/KafkaStreamsApp.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ public static void main(String[] args) {
4848
StreamsBuilder streamsBuilder = new StreamsBuilder();
4949
buildTopology(streamsBuilder);
5050

51-
try (KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties)) {
52-
kafkaStreams.setUncaughtExceptionHandler(
53-
exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
54-
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
51+
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
5552

56-
kafkaStreams.start();
57-
}
53+
kafkaStreams.setUncaughtExceptionHandler(
54+
_ -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
55+
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
56+
57+
kafkaStreams.start();
5858
}
5959

6060
public static void buildTopology(StreamsBuilder streamsBuilder) {

0 commit comments

Comments
 (0)