Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public KafkaFuture<Void> all() {
.collect(Collectors.toList());
for (ApiException exception : topicPartitionErrorsMap.values()) {
if (exception != null) {
throw Errors.forException(exception).exception(
"Failed altering group offsets for the following partitions: " + partitionsFailed);
throw Errors.forException(exception).exception(exception.getMessage() +
" Failed altering group offsets for the following partitions: " + partitionsFailed);
}
}
return null;
Expand Down
1 change: 1 addition & 0 deletions docs/getting-started/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type: docs
* The `ClientQuotaCallback#updateClusterMetadata` method is deprecated and will be removed in Kafka 5.0. Custom implementations of `ClientQuotaCallback` no longer need to override this method, as a default no-op implementation is now provided. For further details, please refer to [KIP-1200](https://cwiki.apache.org/confluence/x/axBJFg).
* The in-memory keystores (used for PEM certificates) now use the default type provided by `KeyStore.getDefaultType()` instead of the hardcoded PKCS12 type.
* Storage directories formatted by the `kafka-storage` tool are no longer forward-compatible. A Kafka broker must be the same version as, or newer than, the `kafka-storage` tool that formatted its directory, regardless of the `--release-version` chosen at format time. For further details, please refer to [KIP-1170](https://cwiki.apache.org/confluence/x/ZYoEFQ).
* The `kafka-share-groups.sh` tool can be used to initialize share group offsets from a file. This enables a share group to be initialized from the committed offsets of a consumer group. For further details, please refer to [KIP-1323](https://cwiki.apache.org/confluence/x/vY0mGQ).
* Several Yammer-based group coordinator metrics are deprecated and will be removed in Kafka 5.0 in favor of equivalent Kafka Metrics.
Please use `kafka.server:type=group-coordinator-metrics,name=group-count,protocol=classic` instead of `kafka.coordinator.group:type=GroupMetadataManager,name=NumGroups`,
`kafka.server:type=group-coordinator-metrics,name=offset-count` instead of `kafka.coordinator.group:type=GroupMetadataManager,name=NumOffsets`, and
Expand Down
23 changes: 19 additions & 4 deletions docs/operations/basic-kafka-operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,9 @@ To reset offsets of a consumer group, "--reset-offsets" option can be used. This

It has 3 execution options:

* (default) to display which offsets to reset.
* \--dry-run : to display which offsets to reset. (the default)
* \--execute : to execute --reset-offsets process.
* \--export : to export the results to a CSV format.
* \--export : to generate offset reset information in CSV format for export to a file.



Expand Down Expand Up @@ -297,10 +297,11 @@ You can see that both members have been assigned the same partition which they a

To reset the offsets of a share group, use the "--reset-offsets" option:

It has 2 execution options:
It has 3 execution options:

* \--dry-run: to display which offsets to reset.
* \--dry-run : to display which offsets to reset.
* \--execute : to execute --reset-offsets process.
* \--export : to generate offset reset information in CSV format for export to a file.



Expand All @@ -309,6 +310,9 @@ It has 2 execution options:
* \--to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'
* \--to-earliest : Reset offsets to earliest offset.
* \--to-latest : Reset offsets to latest offset.
* \--from-file : Reset offsets to values defined in CSV file.
* \--to-current : Resets offsets to current offset.
* \--to-offset : Reset offsets to a specific offset.



Expand All @@ -320,6 +324,17 @@ GROUP TOPIC PARTITION NEW-OFFSET
my-share-group topic1 0 10
```

For example, you can export the current offsets from an inactive consumer group and use them to set the offsets for a share group:

```bash
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-group --all-topics --to-current --dry-run --export > FILE.CSV

$ bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group my-share-group --from-file FILE.CSV --execute
GROUP TOPIC PARTITION NEW-OFFSET
my-share-group topic1 0 10
```


To delete the offsets of individual topics in the share group, use the "--delete-offsets" option:

```bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,57 +28,55 @@
import static org.apache.kafka.tools.ToolsUtils.minus;

public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
private static final String NL = System.lineSeparator();

private static final String BOOTSTRAP_SERVER_DOC = "The server(s) to connect to. REQUIRED for all options except for --validate-regex.";
private static final String GROUP_DOC = "The consumer group we wish to act on.";
private static final String TOPIC_DOC = "The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. " +
"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
"Reset-offsets also supports multiple topic inputs.";
private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process.";
private static final String GROUP_DOC = "The consumer group id.";
private static final String TOPIC_DOC = "The topic whose offset information should be deleted or reset. " +
"In `reset-offsets` case, partitions can be specified using this format: `topic:0,1,2`, where 0,1,2 are the partitions to be included. " +
"Reset-offsets also supports multiple topics.";
private static final String ALL_TOPICS_DOC = "Apply to all topics.";
private static final String LIST_DOC = "List all consumer groups.";
private static final String DESCRIBE_DOC = "Describe consumer group and list offset lag (number of messages not yet processed) related to given group.";
private static final String ALL_GROUPS_DOC = "Apply to all consumer groups.";
private static final String NL = System.lineSeparator();
private static final String DELETE_DOC = "Pass in groups to delete topic partition offsets and ownership information " +
"over the entire consumer group. For instance --group g1 --group g2";
private static final String DELETE_DOC = "Delete topic partition offsets and ownership information for one or more consumer groups.";
private static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " +
"or is going through some changes).";
private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL +
private static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at a time, and instances should be inactive" + NL +
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " +
"Additionally, the --export option is used to export the offsets in CSV format." + NL +
"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL +
"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'.";
private static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
private static final String EXPORT_DOC = "Export offset information in CSV format. Supported operations: reset-offsets.";
"To define the scope, use --all-topics or --topic. The scope must be specified unless you use --from-file.";
private static final String DRY_RUN_DOC = "Output offset reset information without executing the operation. Supported operation: reset-offsets.";
private static final String EXECUTE_DOC = "Execute the offset reset operation. Supported operation: reset-offsets.";
private static final String EXPORT_DOC = "Generate offset reset information in CSV format for export to a file. Supported operation: reset-offsets.";
private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset.";
private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file.";
private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'";
private static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'";
private static final String RESET_BY_DURATION_DOC = "Reset offsets by duration from current timestamp. Format: 'PnDTnHnMnS'";
private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset.";
private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset.";
private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset.";
private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.";
private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only." + NL +
private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the --describe option only." + NL +
"Example: --bootstrap-server localhost:9092 --describe --group group1 --members";
private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " +
"with '--offsets'/'--members'/'--state' and '--bootstrap-server' options only." + NL + "Example: --bootstrap-server localhost:9092 --describe --group group1 --members --verbose";
private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used with the --describe option only.";
private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset lag. " +
"This is the default sub-action of and may be used with '--describe' and '--bootstrap-server' options only." + NL +
"This is the default sub-action of and may be used with the --describe option only." + NL +
"Example: --bootstrap-server localhost:9092 --describe --group group1 --offsets";
private static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL +
private static final String STATE_DOC = "When specified with --describe, it displays the state of the group." + NL +
"Example: --bootstrap-server localhost:9092 --describe --group group1 --state" + NL +
"When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states." + NL +
"When specified with --list, it displays the states of the groups. It can also be used to list groups with specific states." + NL +
"Example: --bootstrap-server localhost:9092 --list --state stable,empty" + NL +
"This option may be used with '--describe', '--list' and '--bootstrap-server' options only.";
private static final String TYPE_DOC = "When specified with '--list', it displays the types of all the groups. It can also be used to list groups with specific types." + NL +
"This option may be used with --describe and --list options only.";
private static final String TYPE_DOC = "When specified with --list, it displays the types of the groups. It can also be used to list groups with specific types." + NL +
"Example: --bootstrap-server localhost:9092 --list --type classic,consumer" + NL +
"This option may be used with the '--list' option only.";
private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at the time, and multiple topics.";
private static final String VALIDATE_REGEX_DOC = "Validate that the syntax of the provided regular expression is valid according to the RE2 format.";
"This option may be used with the --list option only.";
private static final String DELETE_OFFSETS_DOC = "Delete offsets of consumer group. Supports one consumer group at a time, and multiple topics.";
private static final String VALIDATE_REGEX_DOC = "Validate the syntax of the regular expression according to the RE2 format.";

final OptionSpec<String> bootstrapServerOpt;
final OptionSpec<String> groupOpt;
Expand Down Expand Up @@ -130,7 +128,7 @@ private ConsumerGroupCommandOptions(String[] args) {
.ofType(String.class);
groupOpt = parser.accepts("group", GROUP_DOC)
.withRequiredArg()
.describedAs("consumer group")
.describedAs("group id")
.ofType(String.class);
topicOpt = parser.accepts("topic", TOPIC_DOC)
.withRequiredArg()
Expand Down Expand Up @@ -208,7 +206,7 @@ private ConsumerGroupCommandOptions(String[] args) {

@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
void checkArgs() {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.");
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, delete and manage the offsets of consumer groups.");

if (!options.has(validateRegexOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ Map<String, Throwable> deleteShareGroups() {
for (String groupId : groupIdSet) {
Optional<GroupListing> listing = shareGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny();
if (listing.isEmpty()) {
errGroups.put(groupId, new IllegalArgumentException("Group '" + groupId + "' is not a share group."));
errGroups.put(groupId, new GroupIdNotFoundException("Group '" + groupId + "' is not a share group."));
} else {
Optional<GroupState> groupState = listing.get().groupState();
groupState.ifPresent(state -> {
Expand Down
Loading
Loading