ShareConsumerTest.java
@@ -2009,7 +2009,7 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() {
     public void testShareConsumerAfterCoordinatorMovement() throws Exception {
         String topicName = "multipart";
         String groupId = "multipartGrp";
-        Uuid topicId = createTopic(topicName, 3, 3);
+        Uuid topicId = createTopic(topicName, 1, 3);
         alterShareAutoOffsetReset(groupId, "earliest");
         ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 
@@ -2253,7 +2253,7 @@ public void testBehaviorOnDeliveryCountBoundary() {
     public void testComplexShareConsumer() throws Exception {
         String topicName = "multipart";
         String groupId = "multipartGrp";
-        createTopic(topicName, 3, 3);
+        createTopic(topicName, 1, 3);
         TopicPartition multiTp = new TopicPartition(topicName, 0);
 
         ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
@@ -2307,7 +2307,6 @@ public void testComplexShareConsumer() throws Exception {
         serverProperties = {
             @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
             @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"),
-            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
             @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"),
             @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"),
             @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
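Both tests now create their topic with a single partition (replication factor 3), and the explicit group.share.enable server property is dropped; one plausible reading is that listing share in group.coordinator.rebalance.protocols already enables share groups, making the flag redundant. The createTopic call is a helper of this test suite; a rough standalone equivalent via the Admin API might look like the sketch below (the bootstrap address is a placeholder):

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Uuid;

// Rough stand-in for the suite's createTopic(topicName, 1, 3) helper:
// one partition, replication factor 3.
public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("multipart", 1, (short) 3);
            Uuid topicId = admin.createTopics(List.of(topic)).topicId("multipart").get();
            System.out.println("Created " + topic.name() + " with id " + topicId);
        }
    }
}
```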
ShareConsumeRequestManager.java
@@ -89,6 +89,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateListener {
     private final IdempotentCloser idempotentCloser = new IdempotentCloser();
     private Uuid memberId;
     private boolean fetchMoreRecords = false;
+    private final AtomicInteger fetchRecordsNodeId = new AtomicInteger(-1);
     private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsToSend;
     private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsInFlight;
     private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
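The new fetchRecordsNodeId field acts as a single-owner slot: -1 means no node is currently chosen to return records, and compareAndSet claims or frees the slot atomically. A minimal standalone sketch of that discipline, with hypothetical names (the release side mirrors the compareAndSet calls in handleShareFetchSuccess and handleShareFetchFailure further down):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of a single-owner slot: -1 means "free"; any other value
// is the id of the node currently allowed to return records.
public class FetchNodeSlot {
    private final AtomicInteger fetchRecordsNodeId = new AtomicInteger(-1);

    /** Claims the slot for nodeId; returns true only if the slot was free. */
    boolean tryClaim(int nodeId) {
        return fetchRecordsNodeId.compareAndSet(-1, nodeId);
    }

    /** Frees the slot, but only if nodeId still owns it. */
    void release(int nodeId) {
        fetchRecordsNodeId.compareAndSet(nodeId, -1);
    }

    public static void main(String[] args) {
        FetchNodeSlot slot = new FetchNodeSlot();
        System.out.println(slot.tryClaim(1)); // true  - node 1 wins the slot
        System.out.println(slot.tryClaim(2)); // false - already owned
        slot.release(2);                      // no-op: node 2 is not the owner
        slot.release(1);                      // frees the slot
        System.out.println(slot.tryClaim(2)); // true
    }
}
```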
@@ -169,7 +170,7 @@ public PollResult poll(long currentTimeMs) {
             if (nodesWithPendingRequests.contains(node.id())) {
                 log.trace("Skipping fetch for partition {} because previous fetch request to {} has not been processed", partition, node.id());
             } else {
-                // if there is a leader and no in-flight requests, issue a new fetch
+                // If there is a leader and no in-flight requests, issue a new fetch.
                 ShareSessionHandler handler = handlerMap.computeIfAbsent(node,
                     k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));
 
@@ -196,11 +197,15 @@
                     }
                     topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
 
+                    // If we have not chosen a node for fetching records yet, choose now, and rotate the
+                    // assigned partitions so the next poll starts on a different partition.
+                    if (fetchRecordsNodeId.compareAndSet(-1, node.id())) {
+                        subscriptions.movePartitionToEnd(partition);
+                    }
                     log.debug("Added fetch request for partition {} to node {}", tip, node.id());
                 }
             }
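Here subscriptions.movePartitionToEnd rotates the partition that triggered the fetch to the back of the assignment, so the next poll starts from a different partition. A toy illustration of that fairness idea, using a plain Deque as a stand-in for the real SubscriptionState:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model of the rotation: the partition chosen for this fetch moves to
// the end, so the next poll considers a different partition first.
public class PartitionRotation {
    private final Deque<String> assigned =
            new ArrayDeque<>(List.of("multipart-0", "multipart-1", "multipart-2"));

    String chooseAndRotate() {
        String partition = assigned.pollFirst(); // partition chosen this poll
        assigned.addLast(partition);             // rotate it to the end
        return partition;
    }

    public static void main(String[] args) {
        PartitionRotation rotation = new PartitionRotation();
        for (int i = 0; i < 4; i++) {
            System.out.println(rotation.chooseAndRotate());
        }
        // Prints: multipart-0, multipart-1, multipart-2, multipart-0
    }
}
```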
 
-
         // Iterate over the session handlers to see if there are acknowledgements to be sent for partitions
         // which are no longer part of the current subscription.
         // We fail acknowledgements for records fetched from a previous leader.
@@ -226,7 +231,7 @@
                     topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
                     log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId);
                 } else {
-                    log.debug("Leader for the partition is down or has changed, failing Acknowledgements for partition {}", tip);
+                    log.debug("Leader for the partition is down or has changed, failing acknowledgements for partition {}", tip);
                     acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
                     maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks));
                 }
@@ -238,13 +243,27 @@
             }
         });
 
-        // Iterate over the share session handlers and build a list of UnsentRequests
+        // Iterate over the share session handlers and build a list of UnsentRequests.
         List<UnsentRequest> requests = handlerMap.entrySet().stream().map(entry -> {
             Node target = entry.getKey();
             ShareSessionHandler handler = entry.getValue();
 
             log.trace("Building ShareFetch request to send to node {}", target.id());
             ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, fetchConfig);
+            // We only send a full ShareFetch to a single node at a time. We prepare to
+            // build ShareFetch requests for all nodes with session handlers to permit
+            // piggy-backing of acknowledgements, and also to adjust the topic-partitions
+            // in the share session.
+            if (target.id() != fetchRecordsNodeId.get()) {
+                ShareFetchRequestData data = requestBuilder.data();
+                // If there's nothing to send, just skip building the record.
+                if (data.topics().isEmpty() && data.forgottenTopicsData().isEmpty()) {
+                    return null;
+                } else {
+                    // There is something to send, but we don't want to fetch any records.
+                    requestBuilder.data().setMaxRecords(0);
+                }
+            }
 
             nodesWithPendingRequests.add(target.id());
 
@@ -256,11 +275,20 @@
                 }
             };
             return new UnsentRequest(requestBuilder, Optional.of(target)).whenComplete(responseHandler);
-        }).collect(Collectors.toList());
+        }).filter(Objects::nonNull).collect(Collectors.toList());
 
         return new PollResult(requests);
     }
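With filter(Objects::nonNull) in place, the map stage can produce three outcomes per node: a full fetch for the chosen node, a zero-record ShareFetch that only carries acknowledgements and session adjustments, or no request at all. A condensed sketch of that decision, using method and enum names that are illustrative rather than the PR's:

```java
// Condensed view of the per-node decision in poll(): only the chosen fetch
// node may return records; other nodes get a zero-record ShareFetch purely
// to carry acknowledgements and session changes, or no request at all.
public class FetchDecision {
    enum Kind { FULL_FETCH, ZERO_RECORD_FETCH, SKIP }

    static Kind classify(int nodeId, int fetchRecordsNodeId,
                         boolean hasTopics, boolean hasForgottenTopics) {
        if (nodeId == fetchRecordsNodeId) {
            return Kind.FULL_FETCH;        // records wanted from this node
        }
        if (!hasTopics && !hasForgottenTopics) {
            return Kind.SKIP;              // nothing to say; build no request
        }
        return Kind.ZERO_RECORD_FETCH;     // piggy-back acks, maxRecords = 0
    }

    public static void main(String[] args) {
        System.out.println(classify(1, 1, true, false));  // FULL_FETCH
        System.out.println(classify(2, 1, false, false)); // SKIP
        System.out.println(classify(2, 1, true, false));  // ZERO_RECORD_FETCH
    }
}
```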
 
+    @Override
+    public long maximumTimeToWait(long currentTimeMs) {
+        // When fetching records and there is no chosen node for fetching, we do not want to wait for the next poll
+        if (fetchMoreRecords && subscriptions.numAssignedPartitions() > 0 && fetchRecordsNodeId.get() == -1) {
+            return 0L;
+        }
+        return Long.MAX_VALUE;
+    }
+
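Returning 0 from maximumTimeToWait stops the consumer's background thread from blocking while records are wanted but no fetch node has been chosen, so the next poll can claim one immediately. A toy standalone rendering of that contract, with the manager's state reduced to hard-coded statics:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy rendering of the wait-time contract: when records are wanted, partitions
// are assigned, but no fetch node has been chosen, do not block before the
// next poll; otherwise there is nothing urgent to do.
public class WaitTimeSketch {
    static final AtomicInteger fetchRecordsNodeId = new AtomicInteger(-1);
    static boolean fetchMoreRecords = true;
    static int numAssignedPartitions = 1;

    static long maximumTimeToWait() {
        if (fetchMoreRecords && numAssignedPartitions > 0 && fetchRecordsNodeId.get() == -1) {
            return 0L;          // poll again immediately to choose a fetch node
        }
        return Long.MAX_VALUE;  // nothing pending; the event loop may block
    }

    public static void main(String[] args) {
        System.out.println(maximumTimeToWait()); // 0
        fetchRecordsNodeId.set(3);               // node 3 now owns the fetch
        System.out.println(maximumTimeToWait()); // 9223372036854775807
    }
}
```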
     /**
      *
      * @return True if we can add acknowledgements to the share session.
@@ -849,6 +877,7 @@ private void handleShareFetchSuccess(Node fetchTarget,
             metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
         } finally {
             log.debug("Removing pending request for node {} - success", fetchTarget.id());
+            fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1);
             nodesWithPendingRequests.remove(fetchTarget.id());
         }
     }
@@ -887,6 +916,7 @@ private void handleShareFetchFailure(Node fetchTarget,
             }));
         } finally {
             log.debug("Removing pending request for node {} - failed", fetchTarget.id());
+            fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1);
             nodesWithPendingRequests.remove(fetchTarget.id());
         }
     }