Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2092,15 +2092,21 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) {
if (updatedMember.userEndpoint().isPresent()) {
// If no user endpoint is defined, there is no change in the endpoint information.
// Otherwise, bump the endpoint information epoch
group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
}
}

if (group.endpointInformationEpoch() != memberEndpointEpoch) {
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, updatedMember));
}
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
if (groups.containsKey(group.groupId())) {
// If we just created the group and the endpoint information epoch will not be persisted, so return epoch 0.
// Otherwise, return the bumped epoch.
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
}

Map<String, CreatableTopic> internalTopicsToBeCreated = Collections.emptyMap();
if (updatedConfiguredTopology.topicConfigurationException().isPresent()) {
Expand Down Expand Up @@ -2216,20 +2222,20 @@ private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildE
StreamsGroupMember updatedMember) {
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
final Map<String, StreamsGroupMember> members = group.members();
// Build endpoint information for all members except the updated member
for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) {
final String memberIdForAssignment = entry.getKey();
final Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = members.get(memberIdForAssignment).userEndpoint();
StreamsGroupMember groupMember = updatedMember != null && memberIdForAssignment.equals(updatedMember.memberId()) ? updatedMember : members.get(memberIdForAssignment);
if (endpointOptional.isPresent()) {
final StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get();
final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
responseEndpoint.setHost(endpoint.host());
responseEndpoint.setPort(endpoint.port());
StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group, metadataImage);
endpointToPartitionsList.add(endpointToPartitions);
if (updatedMember != null && entry.getKey().equals(updatedMember.memberId())) {
continue;
}
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group, metadataImage)
.ifPresent(endpointToPartitionsList::add);
}
// Always build endpoint information for the updated member (whether new or existing)
if (updatedMember != null) {
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group, metadataImage)
.ifPresent(endpointToPartitionsList::add);
}
return endpointToPartitionsList.isEmpty() ? null : endpointToPartitionsList;
return endpointToPartitionsList;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static class DeadlineAndEpoch {
* The current epoch for endpoint information, this is used to determine when to send
* updated endpoint information to members of the group.
*/
private int endpointInformationEpoch = -1;
private int endpointInformationEpoch = 0;

/**
* The last used assignment configurations for this streams group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;

Expand All @@ -35,6 +36,33 @@ public class EndpointToPartitionsManager {
private EndpointToPartitionsManager() {
}

/**
* Creates endpoint-to-partitions mapping for a member if the member has a user endpoint.
* Returns empty if the member has no user endpoint.
*
* @param streamsGroupMember The streams group member.
* @param streamsGroup The streams group.
* @param metadataImage The metadata image.
* @return An Optional containing the EndpointToPartitions if the member has an endpoint, empty otherwise.
*/
public static Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeEndpointToPartitions(
final StreamsGroupMember streamsGroupMember,
final StreamsGroup streamsGroup,
final CoordinatorMetadataImage metadataImage
) {
Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = streamsGroupMember.userEndpoint();
if (endpointOptional.isEmpty()) {
return Optional.empty();
}

StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get();
StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
responseEndpoint.setHost(endpoint.host());
responseEndpoint.setPort(endpoint.port());

return Optional.of(endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup, metadataImage));
}

public static StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember,
final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint,
final StreamsGroup streamsGroup,
Expand Down
Loading
Loading