Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ private static void throwIfStreamsGroupHeartbeatRequestIsInvalid(
private static void throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(
StreamsGroupHeartbeatRequestData request
) throws InvalidRequestException {
throwIfNotNull(request.instanceId(), "Static membership is not yet supported.");
throwIfNotNull(request.taskOffsets(), "TaskOffsets are not supported yet.");
throwIfNotNull(request.taskEndOffsets(), "TaskEndOffsets are not supported yet.");
throwIfNotNullOrEmpty(request.warmupTasks(), "WarmupTasks are not supported yet.");
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,40 @@ public Map<String, String> staticMembers() {
}

/**
* Returns the target assignment of the member.
* Returns true if the static member exists.
*
* @param instanceId The instance id.
*
* @return The StreamsGroupMemberAssignment or an EMPTY one if it does not exist.
* @return A boolean indicating whether the member exists or not.
*/
public TasksTuple targetAssignment(String memberId) {
return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
public boolean hasStaticMember(String instanceId) {
if (instanceId == null) return false;
return staticMembers.containsKey(instanceId);
}

/**
* Returns the target assignment of the member.
* <p>
* If {@code instanceId} is empty, the assignment is looked up by {@code memberId}.
* If {@code instanceId} is present, the assignment is looked up by the member ID
* associated with that static member instance ID.
*
* @param memberId The member id.
* @param instanceId The instance id.
*
* @return The StreamsGroupMemberAssignment for the resolved member ID, or {@link TasksTuple#EMPTY}
* if no assignment exists or no static member exists for {@code instanceId}.
*/
public TasksTuple targetAssignment(String memberId, Optional<String> instanceId) {
if (instanceId.isEmpty()) {
return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
} else {
StreamsGroupMember previousMember = staticMember(instanceId.get());
if (previousMember != null) {
return targetAssignment.getOrDefault(previousMember.memberId(), TasksTuple.EMPTY);
}
}
return TasksTuple.EMPTY;
}

/**
Expand Down Expand Up @@ -1274,11 +1302,13 @@ public void invalidateCachedEndpointToPartitions(String memberId) {
*
* @param updatedMember The member that was just updated (may have a stale entry in the members map).
* @param metadataImage The current metadata image for resolving topic partitions.
* @param maybeReplacedStaticMember The replaced static member. it can be null.
* @return The list of endpoint-to-partitions mappings for all members with endpoints.
*/
public List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> buildEndpointToPartitions(
StreamsGroupMember updatedMember,
CoordinatorMetadataImage metadataImage
CoordinatorMetadataImage metadataImage,
StreamsGroupMember maybeReplacedStaticMember
) {
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
if (updatedMember == null) {
Expand All @@ -1290,6 +1320,9 @@ public List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> buildEndpoin
if (entry.getKey().equals(updatedMember.memberId())) {
continue;
}
if (maybeReplacedStaticMember != null && entry.getKey().equals(maybeReplacedStaticMember.memberId())) {
continue;
}
getOrComputeEndpointToPartitions(entry.getValue(), metadataImage)
.ifPresent(endpointToPartitionsList::add);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -104,9 +105,13 @@ public Builder(String memberId) {
}

public Builder(StreamsGroupMember member) {
this(Objects.requireNonNull(member, "member cannot be null"), member.memberId);
}

public Builder(StreamsGroupMember member, String memberId) {
Objects.requireNonNull(member, "member cannot be null");

this.memberId = member.memberId;
this.memberId = memberId;
this.memberEpoch = member.memberEpoch;
this.previousMemberEpoch = member.previousMemberEpoch;
this.instanceId = member.instanceId;
Expand Down Expand Up @@ -315,6 +320,37 @@ public StreamsGroupMember build() {
tasksPendingRevocation
);
}

/**
* Resets the assignment epochs to 0 for all assigned active tasks.
* Used when a static member leaves, so that the rejoining member's
* active tasks will be assigned from epoch 0 to the new member ID.
* All commits using the old member ID will be fenced.
*/
public Builder resetAssignedTasksEpochsToZero() {
if (this.assignedTasks.isEmpty()) {
return this;
}

if (this.assignedTasks.activeTasksWithEpochs().isEmpty()) {
return this;
}

Map<String, Map<Integer, Integer>> resetActiveTasks = new HashMap<>();
for (Map.Entry<String, Map<Integer, Integer>> entry : this.assignedTasks.activeTasksWithEpochs().entrySet()) {
Map<Integer, Integer> resetActiveTaskEpochs = new HashMap<>();
for (Integer partitionId : entry.getValue().keySet()) {
resetActiveTaskEpochs.put(partitionId, 0);
}
resetActiveTasks.put(entry.getKey(), resetActiveTaskEpochs);
}
this.assignedTasks = new TasksTupleWithEpochs(
resetActiveTasks,
this.assignedTasks.standbyTasks(),
this.assignedTasks.warmupTasks()
);
return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,21 +591,6 @@ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Except
AuthorizableRequestContext context = mock(AuthorizableRequestContext.class);
when(context.requestVersion()).thenReturn((int) ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());

assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("Static membership is not yet supported."),
Map.of(),
-1
),
service.streamsGroupHeartbeat(
context,
new StreamsGroupHeartbeatRequestData()
.setInstanceId(Uuid.randomUuid().toString())
).get(5, TimeUnit.SECONDS)
);

assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
Expand Down Expand Up @@ -2188,7 +2173,7 @@ public void testStreamsGroupDescribeInvalidGroupId() throws ExecutionException,
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(new StreamsGroupDescribeResult(List.of(describedGroup), Map.of())));
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));

CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Arrays.asList("", null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22710,9 +22710,9 @@ public void testReplayStreamsGroupTargetAssignmentMember() {
TaskAssignmentTestUtil.mkTasks("subtopology-1", 6, 7, 8))
);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord("foo", "m1", tasks));
assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").activeTasks());
assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").standbyTasks());
assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").warmupTasks());
assertEquals(tasks.activeTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1", Optional.empty()).activeTasks());
assertEquals(tasks.standbyTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1", Optional.empty()).standbyTasks());
assertEquals(tasks.warmupTasks(), context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1", Optional.empty()).warmupTasks());
}

@Test
Expand Down Expand Up @@ -22743,7 +22743,7 @@ public void testReplayStreamsGroupTargetAssignmentMemberTombstoneExisting() {

context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord("foo", "m1"));

assertTrue(context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1").isEmpty());
assertTrue(context.groupMetadataManager.streamsGroup("foo").targetAssignment("m1", Optional.empty()).isEmpty());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,24 +734,28 @@ public CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<Ini
}

public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
StreamsGroupHeartbeatRequestData request
StreamsGroupHeartbeatRequestData request,
String clientId,
InetAddress clientAddress
) {
return streamsGroupHeartbeat(request, ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
return streamsGroupHeartbeat(request, clientId, clientAddress, ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
}

public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
StreamsGroupHeartbeatRequestData request,
String clientId,
InetAddress clientAddress,
short version
) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.STREAMS_GROUP_HEARTBEAT,
version,
"client",
clientId,
0
),
"1",
InetAddress.getLoopbackAddress(),
clientAddress,
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
Expand All @@ -770,6 +774,19 @@ public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streams
return result;
}

public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
StreamsGroupHeartbeatRequestData request
) {
return streamsGroupHeartbeat(request, "client", InetAddress.getLoopbackAddress());
}

public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(
StreamsGroupHeartbeatRequestData request,
short version
) {
return streamsGroupHeartbeat(request, "client", InetAddress.getLoopbackAddress(), version);
}

public List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> sleep(long ms) {
time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<CoordinatorRecord>> timeouts = timer.poll();
Expand Down
Loading
Loading