From 7c118bdd7e48af2950302547db824a6eeb524067 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 17 Jun 2026 23:06:38 +0100 Subject: [PATCH 1/2] KAFKA-20524: Improve reset offset usability --- .../admin/AlterShareGroupOffsetsResult.java | 4 +- .../group/ConsumerGroupCommandOptions.java | 52 +++++++++---------- .../consumer/group/ShareGroupCommand.java | 2 +- .../group/ShareGroupCommandOptions.java | 40 +++++++------- .../streams/StreamsGroupCommandOptions.java | 31 ++++++----- 5 files changed, 61 insertions(+), 68 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java index 96582297f7543..a21eaade3674c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterShareGroupOffsetsResult.java @@ -76,8 +76,8 @@ public KafkaFuture all() { .collect(Collectors.toList()); for (ApiException exception : topicPartitionErrorsMap.values()) { if (exception != null) { - throw Errors.forException(exception).exception( - "Failed altering group offsets for the following partitions: " + partitionsFailed); + throw Errors.forException(exception).exception(exception.getMessage() + + " Failed altering group offsets for the following partitions: " + partitionsFailed); } } return null; diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java index 4b2ecaa4d01e1..8909613052232 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java @@ -28,57 +28,55 @@ import static org.apache.kafka.tools.ToolsUtils.minus; public class ConsumerGroupCommandOptions extends CommandDefaultOptions { + private static final String NL = System.lineSeparator(); private static final String BOOTSTRAP_SERVER_DOC = "The server(s) to connect to. REQUIRED for all options except for --validate-regex."; - private static final String GROUP_DOC = "The consumer group we wish to act on."; - private static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " + - "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + - "Reset-offsets also supports multiple topic inputs."; - private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; + private static final String GROUP_DOC = "The consumer group id."; + private static final String TOPIC_DOC = "The topic whose offset information should be deleted or reset. " + + "In `reset-offsets` case, partitions can be specified using this format: `topic:0,1,2`, where 0,1,2 are the partitions to be included. " + + "Reset-offsets also supports multiple topics."; + private static final String ALL_TOPICS_DOC = "Apply to all topics."; private static final String LIST_DOC = "List all consumer groups."; private static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group."; private static final String ALL_GROUPS_DOC = "Apply to all consumer groups."; - private static final String NL = System.lineSeparator(); - private static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " + - "over the entire consumer group. For instance --group g1 --group g2"; + private static final String DELETE_DOC = "Delete topic partition offsets and ownership information for one or more consumer groups."; private static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " + "or is going through some changes)."; private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer."; - private static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL + + private static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at a time, and instances should be inactive" + NL + "Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " + "Additionally, the --export option is used to export the offsets in CSV format." + NL + "You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + "--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + - "To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'."; - private static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets."; - private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; - private static final String EXPORT_DOC = "Export offset information in CSV format. Supported operations: reset-offsets."; + "To define the scope, use --all-topics or --topic. The scope must be specified unless you use --from-file."; + private static final String DRY_RUN_DOC = "Output offset reset information without executing the operation. Supported operations: reset-offsets."; + private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operations: reset-offsets."; + private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operations: reset-offsets."; private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'"; - private static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"; + private static final String RESET_BY_DURATION_DOC = "Reset offsets by duration from current timestamp. Format: 'PnDTnHnMnS'"; private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset."; private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset."; private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative."; - private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only." + NL + + private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the --describe option only." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --members"; - private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " + - "with '--offsets'/'--members'/'--state' and '--bootstrap-server' options only." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose"; + private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used with the --describe option only."; private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset lag. " + - "This is the default sub-action of and may be used with '--describe' and '--bootstrap-server' options only." + NL + + "This is the default sub-action of and may be used with the --describe option only." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets"; - private static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL + + private static final String STATE_DOC = "When specified with --describe, includes the state of the group." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --state" + NL + - "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states." + NL + + "When specified with --list, it displays the states of the groups. It can also be used to list groups with specific states." + NL + "Example: --bootstrap-server localhost:9092 --list --state stable,empty" + NL + - "This option may be used with '--describe', '--list' and '--bootstrap-server' options only."; - private static final String TYPE_DOC = "When specified with '--list', it displays the types of all the groups. It can also be used to list groups with specific types." + NL + + "This option may be used with --describe and --list options only."; + private static final String TYPE_DOC = "When specified with --list, it displays the types of the groups. It can also be used to list groups with specific types." + NL + "Example: --bootstrap-server localhost:9092 --list --type classic,consumer" + NL + - "This option may be used with the '--list' option only."; - private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics."; - private static final String VALIDATE_REGEX_DOC = "Validate that the syntax of the provided regular expression is valid according to the RE2 format."; + "This option may be used with the --list option only."; + private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at a time, and multiple topics."; + private static final String VALIDATE_REGEX_DOC = "Validate the syntax of the regular expression according to the RE2 format."; final OptionSpec bootstrapServerOpt; final OptionSpec groupOpt; @@ -130,7 +128,7 @@ private ConsumerGroupCommandOptions(String[] args) { .ofType(String.class); groupOpt = parser.accepts("group", GROUP_DOC) .withRequiredArg() - .describedAs("consumer group") + .describedAs("group id") .ofType(String.class); topicOpt = parser.accepts("topic", TOPIC_DOC) .withRequiredArg() @@ -208,7 +206,7 @@ private ConsumerGroupCommandOptions(String[] args) { @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) void checkArgs() { - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets."); + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, reset and delete consumer groups."); if (!options.has(validateRegexOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index d06ee58bbd8d7..2e4b5f50f1791 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -299,7 +299,7 @@ Map deleteShareGroups() { for (String groupId : groupIdSet) { Optional listing = shareGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny(); if (listing.isEmpty()) { - errGroups.put(groupId, new IllegalArgumentException("Group '" + groupId + "' is not a share group.")); + errGroups.put(groupId, new GroupIdNotFoundException("Group '" + groupId + "' is not a share group.")); } else { Optional groupState = listing.get().groupState(); groupState.ifPresent(state -> { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index a3f1eff877a0d..c36057498ae67 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -19,9 +19,6 @@ import org.apache.kafka.server.util.CommandDefaultOptions; import org.apache.kafka.server.util.CommandLineUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -31,45 +28,44 @@ import static org.apache.kafka.tools.ToolsUtils.minus; public class ShareGroupCommandOptions extends CommandDefaultOptions { - private static final Logger LOGGER = LoggerFactory.getLogger(ShareGroupCommandOptions.class); + private static final String NL = System.lineSeparator(); private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; - private static final String GROUP_DOC = "The share group we wish to act on."; - private static final String TOPIC_DOC = "The topic whose offset information should be deleted or included in the reset offset process. " + - "When resetting offsets, partitions can be specified using this format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included."; - private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process."; + private static final String GROUP_DOC = "The share group id."; + private static final String TOPIC_DOC = "The topic whose offset information should be deleted or reset. " + + "When resetting offsets, partitions can be specified using this format: `topic:0,1,2`, where 0,1,2 are the partitions to be included."; + private static final String ALL_TOPICS_DOC = "Apply to all topics. Supported operations: reset-offsets."; private static final String LIST_DOC = "List all share groups."; private static final String DESCRIBE_DOC = "Describe share group, members and offset information."; private static final String ALL_GROUPS_DOC = "Apply to all share groups."; - private static final String NL = System.lineSeparator(); private static final String DELETE_DOC = "Delete share group."; private static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes."; private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client."; - private static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL + + private static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at a time, and instances must be inactive." + NL + "Has 2 execution options: --dry-run to plan which offsets to reset, and --execute to reset the offsets. " + NL + "Additionally, the --export option is used to export the offsets in CSV format." + NL + "You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest, --from-file, --to-current, --to-offset." + NL + - "To define the scope, use --all-topics, --topic or --from-file." + NL + - "Fails if neither '--dry-run' nor '--execute' is specified."; - private static final String DRY_RUN_DOC = "Only show results without executing changes on share groups. Supported operations: reset-offsets."; - private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; - private static final String EXPORT_DOC = "Export offset information in CSV format. Supported operations: reset-offsets."; + "To define the scope, use --all-topics or --topic. The scope must be specified unless you use --from-file." + NL + + "Fails if neither --dry-run nor --execute is specified."; + private static final String DRY_RUN_DOC = "Output offset reset information without executing the operation. Supported operations: reset-offsets."; + private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operations: reset-offsets."; + private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operations: reset-offsets."; private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'"; private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset."; private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset."; - private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only."; + private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the --describe option only."; private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " + - "This is the default sub-action and may be used with the '--describe' option only."; - private static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL + - "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " + + "This is the default sub-action and may be used with the --describe option only."; + private static final String STATE_DOC = "When specified with --describe, includes the state of the group." + NL + + "When specified with --list, it displays the state of all groups. It can also be used to list groups with specific states. " + "Valid values are Empty, Stable and Dead."; private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " + - "with the '--describe' option only."; - private static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics."; + "with the --describe option only."; + private static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at a time, and multiple topics."; final OptionSpec bootstrapServerOpt; final OptionSpec groupOpt; @@ -113,7 +109,7 @@ public ShareGroupCommandOptions(String[] args) { .ofType(String.class); groupOpt = parser.accepts("group", GROUP_DOC) .withRequiredArg() - .describedAs("share group") + .describedAs("group id") .ofType(String.class); topicOpt = parser.accepts("topic", TOPIC_DOC) .withRequiredArg() diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java index 170dc241f7167..b569f2edced2b 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java @@ -31,39 +31,38 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { private static final String NL = System.lineSeparator(); private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; - private static final String GROUP_DOC = "The streams group we wish to act on."; + private static final String GROUP_DOC = "The streams group id."; private static final String ALL_GROUPS_DOC = "Apply to all streams groups."; private static final String INPUT_TOPIC_DOC = "The input topic whose committed offset should be deleted or reset. " + - "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + + "In --reset-offsets case, partitions can be specified using this format: `topic:0,1,2`, where 0,1,2 are the partitions to be included. " + "Multiple input topics can be specified. Supported operations: delete-offsets, reset-offsets."; - private static final String ALL_INPUT_TOPICS_DOC = "Consider all source topics used in the topology of the group. Supported operations: delete-offsets, reset-offsets."; + private static final String ALL_INPUT_TOPICS_DOC = "Consider all input topics used in the topology of the group. Supported operations: delete-offsets, reset-offsets."; private static final String LIST_DOC = "List all streams groups."; private static final String DESCRIBE_DOC = "Describe streams group and list offset lag related to given group."; - private static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " + - "over the entire streams group. For instance --group g1 --group g2"; - private static final String DELETE_OFFSETS_DOC = "Delete offsets of streams group. Supports one streams group at the time, and multiple topics."; + private static final String DELETE_DOC = "Delete topic partition offsets and ownership information for one or more streams groups."; + private static final String DELETE_OFFSETS_DOC = "Delete offsets of streams group. Supports one streams group at a time, and multiple topics."; private static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes."; private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client."; - private static final String STATE_DOC = "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " + + private static final String STATE_DOC = "When specified with --list, it displays the state of all groups. It can also be used to list groups with specific states. " + "Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and Dead."; - private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only."; + private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the --describe option only."; private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information." + - "This is the default sub-action and may be used with the '--describe' option only."; + "This is the default sub-action and may be used with the --describe option only."; private static final String RESET_OFFSETS_DOC = "Reset offsets of streams group. The instances should be inactive." + NL + "Has 2 execution options: --dry-run to plan which offsets to reset, and --execute to update the offsets." + NL + "If you use --execute, all internal topics linked to the group will also be deleted." + NL + "You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + "--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + - "To define the scope use --all-input-topics or --input-topic. One scope must be specified unless you use '--from-file'." + NL + - "Fails if neither '--dry-run' nor '--execute' is specified."; + "To define the scope, use --all-input-topics or --input-topic. The scope must be specified unless you use --from-file." + NL + + "Fails if neither --dry-run nor --execute is specified."; private static final String DRY_RUN_DOC = "Only show results without executing changes on streams group. Supported operations: reset-offsets."; - private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; - private static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets."; + private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operations: reset-offsets."; + private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operations: reset-offsets."; private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'"; - private static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"; + private static final String RESET_BY_DURATION_DOC = "Reset offsets by duration from current timestamp. Format: 'PnDTnHnMnS'"; private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset."; private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset."; @@ -128,7 +127,7 @@ public StreamsGroupCommandOptions(String[] args) { .ofType(String.class); groupOpt = parser.accepts("group", GROUP_DOC) .withRequiredArg() - .describedAs("streams group") + .describedAs("group id") .ofType(String.class); inputTopicOpt = parser.accepts("input-topic", INPUT_TOPIC_DOC) .withRequiredArg() @@ -204,7 +203,7 @@ public StreamsGroupCommandOptions(String[] args) { @SuppressWarnings("NPathComplexity") void checkArgs() { - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, or describe streams groups."); + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, reset and delete streams groups."); CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); From 2cced3b507eb07335ea64795ec1529bde09c3ce5 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 18 Jun 2026 13:28:31 +0100 Subject: [PATCH 2/2] Improve messages and documentation --- docs/getting-started/upgrade.md | 1 + docs/operations/basic-kafka-operations.md | 23 +++++++++++++++---- .../group/ConsumerGroupCommandOptions.java | 10 ++++---- .../group/ShareGroupCommandOptions.java | 12 +++++----- .../streams/StreamsGroupCommandOptions.java | 14 +++++------ 5 files changed, 38 insertions(+), 22 deletions(-) diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md index ff2f7e93077d6..07b13f2d826c7 100644 --- a/docs/getting-started/upgrade.md +++ b/docs/getting-started/upgrade.md @@ -35,6 +35,7 @@ type: docs * The `ClientQuotaCallback#updateClusterMetadata` method is deprecated and will be removed in Kafka 5.0. Custom implementations of `ClientQuotaCallback` no longer need to override this method, as a default no-op implementation is now provided. For further details, please refer to [KIP-1200](https://cwiki.apache.org/confluence/x/axBJFg). * The in-memory keystores (used for PEM certificates) now use the default type provided by `KeyStore.getDefaultType()` instead of the hardcoded PKCS12 type. * Storage directories formatted by the `kafka-storage` tool are no longer forward-compatible. A Kafka broker must be the same version as, or newer than, the `kafka-storage` tool that formatted its directory, regardless of the `--release-version` chosen at format time. For further details, please refer to [KIP-1170](https://cwiki.apache.org/confluence/x/ZYoEFQ). + * The `kafka-share-groups.sh` tool can be used to initialize share group offsets from a file. This enables a share group to be initialized from the committed offsets of a consumer group. For further details, please refer to [KIP-1323](https://cwiki.apache.org/confluence/x/vY0mGQ). * Several Yammer-based group coordinator metrics are deprecated and will be removed in Kafka 5.0 in favor of equivalent Kafka Metrics. Please use `kafka.server:type=group-coordinator-metrics,name=group-count,protocol=classic` instead of `kafka.coordinator.group:type=GroupMetadataManager,name=NumGroups`, `kafka.server:type=group-coordinator-metrics,name=offset-count` instead of `kafka.coordinator.group:type=GroupMetadataManager,name=NumOffsets`, and diff --git a/docs/operations/basic-kafka-operations.md b/docs/operations/basic-kafka-operations.md index 2baaeae674fcb..d05f119ca46c5 100644 --- a/docs/operations/basic-kafka-operations.md +++ b/docs/operations/basic-kafka-operations.md @@ -227,9 +227,9 @@ To reset offsets of a consumer group, "--reset-offsets" option can be used. This It has 3 execution options: - * (default) to display which offsets to reset. + * \--dry-run : to display which offsets to reset. (the default) * \--execute : to execute --reset-offsets process. - * \--export : to export the results to a CSV format. + * \--export : to generate offset reset information in CSV format for export to a file. @@ -297,10 +297,11 @@ You can see that both members have been assigned the same partition which they a To reset the offsets of a share group, use the "--reset-offsets" option: -It has 2 execution options: +It has 3 execution options: - * \--dry-run: to display which offsets to reset. + * \--dry-run : to display which offsets to reset. * \--execute : to execute --reset-offsets process. + * \--export : to generate offset reset information in CSV format for export to a file. @@ -309,6 +310,9 @@ It has 2 execution options: * \--to-datetime : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss' * \--to-earliest : Reset offsets to earliest offset. * \--to-latest : Reset offsets to latest offset. + * \--from-file : Reset offsets to values defined in CSV file. + * \--to-current : Resets offsets to current offset. + * \--to-offset : Reset offsets to a specific offset. @@ -320,6 +324,17 @@ GROUP TOPIC PARTITION NEW-OFFSET my-share-group topic1 0 10 ``` +For example, you can export the current offsets from an inactive consumer group and use them to set the offsets for a share group: + +```bash +$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-group --all-topics --to-current --dry-run --export > FILE.CSV + +$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-share-group --from-file FILE.CSV --execute +GROUP TOPIC PARTITION NEW-OFFSET +my-share-group topic1 0 10 +``` + + To delete the offsets of individual topics in the share group, use the "--delete-offsets" option: ```bash diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java index 8909613052232..a0291f8536fde 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java @@ -50,9 +50,9 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions { "You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + "--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + "To define the scope, use --all-topics or --topic. The scope must be specified unless you use --from-file."; - private static final String DRY_RUN_DOC = "Output offset reset information without executing the operation. Supported operations: reset-offsets."; - private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operations: reset-offsets."; - private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operations: reset-offsets."; + private static final String DRY_RUN_DOC = "Output offset reset information without executing the operation. Supported operation: reset-offsets."; + private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operation: reset-offsets."; + private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operation: reset-offsets."; private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'"; @@ -67,7 +67,7 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions { private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset lag. " + "This is the default sub-action of and may be used with the --describe option only." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets"; - private static final String STATE_DOC = "When specified with --describe, includes the state of the group." + NL + + private static final String STATE_DOC = "When specified with --describe, it displays the state of the group." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --state" + NL + "When specified with --list, it displays the states of the groups. It can also be used to list groups with specific states." + NL + "Example: --bootstrap-server localhost:9092 --list --state stable,empty" + NL + @@ -206,7 +206,7 @@ private ConsumerGroupCommandOptions(String[] args) { @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) void checkArgs() { - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, reset and delete consumer groups."); + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, delete and manage the offsets of consumer groups."); if (!options.has(validateRegexOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index c36057498ae67..73898675f28e6 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -34,7 +34,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { private static final String GROUP_DOC = "The share group id."; private static final String TOPIC_DOC = "The topic whose offset information should be deleted or reset. " + "When resetting offsets, partitions can be specified using this format: `topic:0,1,2`, where 0,1,2 are the partitions to be included."; - private static final String ALL_TOPICS_DOC = "Apply to all topics. Supported operations: reset-offsets."; + private static final String ALL_TOPICS_DOC = "Apply to all topics. Supported operation: reset-offsets."; private static final String LIST_DOC = "List all share groups."; private static final String DESCRIBE_DOC = "Describe share group, members and offset information."; private static final String ALL_GROUPS_DOC = "Apply to all share groups."; @@ -48,9 +48,9 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { "You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest, --from-file, --to-current, --to-offset." + NL + "To define the scope, use --all-topics or --topic. The scope must be specified unless you use --from-file." + NL + "Fails if neither --dry-run nor --execute is specified."; - private static final String DRY_RUN_DOC = "Output offset reset information without executing the operation. Supported operations: reset-offsets."; - private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operations: reset-offsets."; - private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operations: reset-offsets."; + private static final String DRY_RUN_DOC = "Output offset reset information without executing the operation. Supported operation: reset-offsets."; + private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operation: reset-offsets."; + private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operation: reset-offsets."; private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'"; @@ -60,7 +60,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the --describe option only."; private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " + "This is the default sub-action and may be used with the --describe option only."; - private static final String STATE_DOC = "When specified with --describe, includes the state of the group." + NL + + private static final String STATE_DOC = "When specified with --describe, it displays the state of the group." + NL + "When specified with --list, it displays the state of all groups. It can also be used to list groups with specific states. " + "Valid values are Empty, Stable and Dead."; private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " + @@ -173,7 +173,7 @@ public ShareGroupCommandOptions(String[] args) { @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) public void checkArgs() { - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, reset and delete share groups."); + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, delete and manage the offsets of share groups."); CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java index b569f2edced2b..82258ba006159 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java @@ -56,9 +56,9 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { "--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + "To define the scope, use --all-input-topics or --input-topic. The scope must be specified unless you use --from-file." + NL + "Fails if neither --dry-run nor --execute is specified."; - private static final String DRY_RUN_DOC = "Only show results without executing changes on streams group. Supported operations: reset-offsets."; - private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operations: reset-offsets."; - private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operations: reset-offsets."; + private static final String DRY_RUN_DOC = "Only show results without executing changes on streams group. Supported operation: reset-offsets."; + private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operation: reset-offsets."; + private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operation: reset-offsets."; private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'"; @@ -67,14 +67,14 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset."; private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative."; - private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified internal topic of the streams group. Supported operations: reset-offsets." + + private static final String DELETE_INTERNAL_TOPIC_DOC = "Delete specified internal topic of the streams group. Supported operation: reset-offsets." + "This option is applicable only when --execute is used."; private static final String DELETE_ALL_INTERNAL_TOPICS_DOC = "Delete all internal topics linked to the streams group. Supported operations: reset-offsets, delete." + "With reset-offsets, this option is applicable only when --execute is used."; private static final String VERBOSE_DOC = """ - Use with --describe --state to show group epoch and target assignment epoch. + Use with --describe --state to show group epoch and target assignment epoch. Use with --describe --members to show for each member the member epoch, target assignment epoch, current assignment, target assignment, and whether member is still using the classic rebalance protocol. - Use with --describe --offsets and --describe to show leader epochs for each partition."""; + Use with --describe --offsets and --describe to show leader epochs for each partition."""; final OptionSpec bootstrapServerOpt; final OptionSpec groupOpt; @@ -203,7 +203,7 @@ public StreamsGroupCommandOptions(String[] args) { @SuppressWarnings("NPathComplexity") void checkArgs() { - CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, reset and delete streams groups."); + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, delete and manage the offsets of streams groups."); CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);