-
Notifications
You must be signed in to change notification settings - Fork 54k
Handle exceptions in Kafka stream application #18928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| } | ||
|
|
||
| @Override | ||
| public void configure(Map<String, ?> configs) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this isn't using the Baeldung formatter - do you have access to the Baeldung IDE formatter profile? Similar elsewhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found it from the earlier baeldung docs and made the changes.
| try { | ||
| return mapper.writeValueAsBytes(user); | ||
| } catch (JsonProcessingException ex) { | ||
| log.error("Error deserializing the user {} with exception {}", user, ex.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Best practice is to include the exception as the last parameter to the logger call too so the stack trace is preserved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, i've updated to include it.
| try { | ||
| return mapper.readValue(bytes, User.class); | ||
| } catch (IOException e) { | ||
| log.error("Error deserializing the message {} for topic {}", bytes, topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Best practice is to include the exception as the last parameter to the logger call too so the stack trace is preserved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, i've updated to include it.
| .untilAsserted(() -> { | ||
| ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500)); | ||
| Map<String, Long> counts = StreamSupport.stream(records.spliterator(), false) | ||
| .collect(Collectors.toMap(ConsumerRecord::key, ConsumerRecord::value, (a, b) -> b)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason for the (a, b) -> b part?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to avoid any map error if duplicate present and so provided with a merge function for resolution. But can be removed as well in our case as duplicate not possible.
No description provided.