diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 9f551a5d02cc2..81b4feb2f7524 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2092,7 +2092,9 @@ private CoordinatorResult stream response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs())); response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks())); response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks())); - if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) { + if (updatedMember.userEndpoint().isPresent()) { + // If no user endpoint is defined, there is no change in the endpoint information. + // Otherwise, bump the endpoint information epoch group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1); } } @@ -2100,7 +2102,11 @@ private CoordinatorResult stream if (group.endpointInformationEpoch() != memberEndpointEpoch) { response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, updatedMember)); } - response.setEndpointInformationEpoch(group.endpointInformationEpoch()); + if (groups.containsKey(group.groupId())) { + // If we just created the group, the endpoint information epoch will not be persisted, so return epoch 0. + // Otherwise, return the bumped epoch. + response.setEndpointInformationEpoch(group.endpointInformationEpoch()); + } Map internalTopicsToBeCreated = Collections.emptyMap(); if (updatedConfiguredTopology.topicConfigurationException().isPresent()) { @@ -2216,20 +2222,20 @@ private List maybeBuildE StreamsGroupMember updatedMember) { List endpointToPartitionsList = new ArrayList<>(); final Map members = group.members(); + // Build endpoint information for all members except the updated member for (Map.Entry entry : members.entrySet()) { - final String memberIdForAssignment = entry.getKey(); - final Optional endpointOptional = members.get(memberIdForAssignment).userEndpoint(); - StreamsGroupMember groupMember = updatedMember != null && memberIdForAssignment.equals(updatedMember.memberId()) ? updatedMember : members.get(memberIdForAssignment); - if (endpointOptional.isPresent()) { - final StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get(); - final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint(); - responseEndpoint.setHost(endpoint.host()); - responseEndpoint.setPort(endpoint.port()); - StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group, metadataImage); - endpointToPartitionsList.add(endpointToPartitions); + if (updatedMember != null && entry.getKey().equals(updatedMember.memberId())) { + continue; } + EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group, metadataImage) + .ifPresent(endpointToPartitionsList::add); + } + // Always build endpoint information for the updated member (whether new or existing) + if (updatedMember != null) { + EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group, metadataImage) + .ifPresent(endpointToPartitionsList::add); } - return endpointToPartitionsList.isEmpty() ? null : endpointToPartitionsList; + return endpointToPartitionsList; } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 192ddada4deae..5b8fa258cb3ba 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -208,7 +208,7 @@ public static class DeadlineAndEpoch { * The current epoch for endpoint information, this is used to determine when to send * updated endpoint information to members of the group. */ - private int endpointInformationEpoch = -1; + private int endpointInformationEpoch = 0; /** * The last used assignment configurations for this streams group. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java index 7b55f346f0983..455a584da4042 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.streams.StreamsGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; @@ -35,6 +36,33 @@ public class EndpointToPartitionsManager { private EndpointToPartitionsManager() { } + /** + * Creates endpoint-to-partitions mapping for a member if the member has a user endpoint. + * Returns empty if the member has no user endpoint. + * + * @param streamsGroupMember The streams group member. + * @param streamsGroup The streams group. + * @param metadataImage The metadata image. + * @return An Optional containing the EndpointToPartitions if the member has an endpoint, empty otherwise. + */ + public static Optional maybeEndpointToPartitions( + final StreamsGroupMember streamsGroupMember, + final StreamsGroup streamsGroup, + final CoordinatorMetadataImage metadataImage + ) { + Optional endpointOptional = streamsGroupMember.userEndpoint(); + if (endpointOptional.isEmpty()) { + return Optional.empty(); + } + + StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get(); + StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint(); + responseEndpoint.setHost(endpoint.host()); + responseEndpoint.setPort(endpoint.port()); + + return Optional.of(endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup, metadataImage)); + } + public static StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember, final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint, final StreamsGroup streamsGroup, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index cf80e19c4ff1b..6ac1587accd16 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -165,6 +165,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -16447,8 +16448,7 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.MISSING_SOURCE_TOPICS.code()) - .setStatusDetail("Source topics bar are missing."))) - .setEndpointInformationEpoch(-1), + .setStatusDetail("Source topics bar are missing."))), result.response().data() ); @@ -16533,8 +16533,7 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code()) - .setStatusDetail("Internal topics are missing: [bar]"))) - .setEndpointInformationEpoch(-1), + .setStatusDetail("Internal topics are missing: [bar]"))), result.response().data() ); @@ -16616,8 +16615,7 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code()) - .setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]"))) - .setEndpointInformationEpoch(-1), + .setStatusDetail("Following topics do not have the same number of partitions: [{bar=3, foo=6}]"))), result.response().data() ); @@ -16714,8 +16712,7 @@ public void testStreamsGroupMemberJoiningWithStaleTopology() { .setWarmupTasks(List.of()) .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.STALE_TOPOLOGY.code()) - .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))) - .setEndpointInformationEpoch(-1), + .setStatusDetail("The member's topology epoch 0 is behind the group's topology epoch 1."))), result.response().data() ); @@ -16809,8 +16806,7 @@ public void testStreamsGroupMemberRequestingShutdownApplication() { new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusDetail(statusDetail) - )) - .setEndpointInformationEpoch(-1), + )), result1.response().data() ); assertRecordsEquals(List.of(), result1.records()); @@ -16831,8 +16827,7 @@ public void testStreamsGroupMemberRequestingShutdownApplication() { new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusDetail(statusDetail) - )) - .setEndpointInformationEpoch(-1), + )), result2.response().data() ); @@ -16924,8 +16919,7 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() { new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(Status.SHUTDOWN_APPLICATION.code()) .setStatusDetail(statusDetail) - )) - .setEndpointInformationEpoch(-1), + )), result2.response().data() ); } @@ -17263,8 +17257,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(-1), + .setWarmupTasks(List.of()), result.response().data() ); @@ -17404,8 +17397,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) .setMemberEpoch(1) - .setHeartbeatIntervalMs(5000) - .setEndpointInformationEpoch(-1), + .setHeartbeatIntervalMs(5000), result.response().data() ); } @@ -17513,8 +17505,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(-1), + .setWarmupTasks(List.of()), result.response().data() ); @@ -17555,8 +17546,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setPartitions(List.of(0)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(0), + .setWarmupTasks(List.of()), result.response().data() ); @@ -17601,8 +17591,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setPartitions(List.of(2)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(1), + .setWarmupTasks(List.of()), result.response().data() ); @@ -17635,8 +17624,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) new StreamsGroupHeartbeatResponseData() .setMemberId(memberId3) .setMemberEpoch(11) - .setHeartbeatIntervalMs(5000) - .setEndpointInformationEpoch(1), + .setHeartbeatIntervalMs(5000), result.response().data() ); @@ -17675,8 +17663,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) .setMemberEpoch(11) - .setHeartbeatIntervalMs(5000) - .setEndpointInformationEpoch(1), + .setHeartbeatIntervalMs(5000), result.response().data() ); @@ -17705,8 +17692,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) new StreamsGroupHeartbeatResponseData() .setMemberId(memberId2) .setMemberEpoch(10) - .setHeartbeatIntervalMs(5000) - .setEndpointInformationEpoch(1), + .setHeartbeatIntervalMs(5000), result.response().data() ); @@ -17732,8 +17718,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setSubtopologyId(subtopology2) .setPartitions(List.of(1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(2), + .setWarmupTasks(List.of()), result.response().data() ); @@ -17763,8 +17748,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) new StreamsGroupHeartbeatResponseData() .setMemberId(memberId3) .setMemberEpoch(11) - .setHeartbeatIntervalMs(5000) - .setEndpointInformationEpoch(2), + .setHeartbeatIntervalMs(5000), result.response().data() ); @@ -17806,8 +17790,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setPartitions(List.of(2)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(3), + .setWarmupTasks(List.of()), result.response().data() ); @@ -17853,8 +17836,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setSubtopologyId(subtopology2) .setPartitions(List.of(1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(4), + .setWarmupTasks(List.of()), result.response().data() ); @@ -18192,8 +18174,7 @@ fooTopicName, computeTopicHash( .setPartitions(List.of(0, 1, 2, 3, 4, 5)) )) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(1), + .setWarmupTasks(List.of()), result.response().data() ); @@ -18454,8 +18435,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(-1), + .setWarmupTasks(List.of()), result.response().data() ); @@ -18483,8 +18463,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { .setSubtopologyId(subtopology1) .setPartitions(List.of(0, 1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(0), + .setWarmupTasks(List.of()), result.response().data() ); @@ -18616,8 +18595,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(-1), + .setWarmupTasks(List.of()), result.response().data() ); @@ -18644,8 +18622,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) .setSubtopologyId(subtopology1) .setPartitions(List.of(0, 1)))) .setStandbyTasks(List.of()) - .setWarmupTasks(List.of()) - .setEndpointInformationEpoch(0), + .setWarmupTasks(List.of()), result.response().data() ); @@ -18810,6 +18787,11 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() { .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092)) .setEndpointInformationEpoch(0)); + StreamsGroupHeartbeatResponseData.EndpointToPartitions expectedEndpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions() + .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)) + .setActivePartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0, 1)))) + .setStandbyPartitions(List.of()); + assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) @@ -18821,46 +18803,110 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() { .setPartitions(List.of(0, 1)))) .setStandbyTasks(List.of()) .setWarmupTasks(List.of()) - .setPartitionsByUserEndpoint(null), + .setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions)), result.response().data() ); + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092)) + .setMemberEpoch(result.response().data().memberEpoch()) + .setEndpointInformationEpoch(result.response().data().endpointInformationEpoch())); + assertNull(result.response().data().partitionsByUserEndpoint()); + } - context.groupMetadataManager.streamsGroup(groupId).setEndpointInformationEpoch(1); + @Test + public void testStreamsGroupEndpointInformationIncludesNewMember() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); - StreamsGroupHeartbeatResponseData.EndpointToPartitions expectedEndpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions() - .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092)) - .setActivePartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0, 1)))) - .setStandbyPartitions(List.of()); + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 4) + .buildCoordinatorMetadataImage()) + .build(); - result = context.streamsGroupHeartbeat( + // Prepare assignment for first member + assignor.prepareGroupAssignment( + Map.of(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)))); + + // First member joins + CoordinatorResult result = context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) - .setMemberId(memberId) - .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092)) - .setMemberEpoch(result.response().data().memberEpoch())); + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("host1").setPort(9092)) + .setEndpointInformationEpoch(0)); - assertNotNull(result.response().data().partitionsByUserEndpoint()); - StreamsGroupHeartbeatResponseData.EndpointToPartitions actualEndpointToPartitions = result.response().data().partitionsByUserEndpoint().get(0); - assertEquals(expectedEndpointToPartitions.userEndpoint(), actualEndpointToPartitions.userEndpoint()); - StreamsGroupHeartbeatResponseData.TopicPartition expectedEndpointTopicPartitions = expectedEndpointToPartitions.activePartitions().get(0); - StreamsGroupHeartbeatResponseData.TopicPartition actualEndpointTopicPartitions = actualEndpointToPartitions.activePartitions().get(0); + assertEquals(1, result.response().data().memberEpoch()); - assertEquals(expectedEndpointTopicPartitions.topic(), actualEndpointTopicPartitions.topic()); - List actualPartitions = actualEndpointTopicPartitions.partitions(); - Collections.sort(actualPartitions); - assertEquals(expectedEndpointTopicPartitions.partitions(), actualPartitions); + // Prepare assignment for both members + assignor.prepareGroupAssignment( + Map.of( + memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)), + memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 2, 3)) + )); + // Second member joins result = context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) - .setMemberId(memberId) - .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092)) - .setMemberEpoch(result.response().data().memberEpoch()) - .setEndpointInformationEpoch(result.response().data().endpointInformationEpoch())); + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setUserEndpoint(new StreamsGroupHeartbeatRequestData.Endpoint().setHost("host2").setPort(9093))); - assertNull(result.response().data().partitionsByUserEndpoint()); + // The response should include endpoint information because the member's epoch (0) differs from the group's (1) + assertNotNull(result.response().data().partitionsByUserEndpoint()); + List endpointsList = result.response().data().partitionsByUserEndpoint(); + assertEquals(2, endpointsList.size(), "Should include both members in endpoint information"); + + // Sort by port for consistent ordering + endpointsList.sort(Comparator.comparingInt(e -> e.userEndpoint().port())); + + // Verify first member's endpoint + StreamsGroupHeartbeatResponseData.EndpointToPartitions member1Endpoint = endpointsList.get(0); + assertEquals("host1", member1Endpoint.userEndpoint().host()); + assertEquals(9092, member1Endpoint.userEndpoint().port()); + assertEquals(1, member1Endpoint.activePartitions().size()); + StreamsGroupHeartbeatResponseData.TopicPartition member1Topic = member1Endpoint.activePartitions().get(0); + assertEquals("foo", member1Topic.topic()); + List member1Partitions = new ArrayList<>(member1Topic.partitions()); + Collections.sort(member1Partitions); + assertEquals(List.of(0, 1), member1Partitions); + + // Verify second member's endpoint (the new member) + StreamsGroupHeartbeatResponseData.EndpointToPartitions member2Endpoint = endpointsList.get(1); + assertEquals("host2", member2Endpoint.userEndpoint().host()); + assertEquals(9093, member2Endpoint.userEndpoint().port()); + assertEquals(1, member2Endpoint.activePartitions().size()); + StreamsGroupHeartbeatResponseData.TopicPartition member2Topic = member2Endpoint.activePartitions().get(0); + assertEquals("foo", member2Topic.topic()); + List member2Partitions = new ArrayList<>(member2Topic.partitions()); + Collections.sort(member2Partitions); + assertEquals(List.of(2, 3), member2Partitions); } @Test