Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ShareAcquireMode;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
private final IdempotentCloser idempotentCloser = new IdempotentCloser();
private Uuid memberId;
private boolean fetchMoreRecords = false;
private final AtomicInteger fetchRecordsNodeId = new AtomicInteger(-1);
private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsToSend;
private final Map<Integer, Map<TopicIdPartition, Acknowledgements>> fetchAcknowledgementsInFlight;
private final Map<Integer, Tuple<AcknowledgeRequestState>> acknowledgeRequestStates;
Expand Down Expand Up @@ -196,6 +198,13 @@ public PollResult poll(long currentTimeMs) {
}
topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());

// If we have not chosen a node for fetching records yet,
// choose now, and rotate the assigned partitions so the next poll starts on a different partition.
// This is only applicable for record_limit mode.
if (isShareAcquireModeRecordLimit() && fetchRecordsNodeId.compareAndSet(-1, node.id())) {
subscriptions.movePartitionToEnd(partition);
}

log.debug("Added fetch request for partition {} to node {}", tip, node.id());
}
}
Expand Down Expand Up @@ -246,6 +255,21 @@ public PollResult poll(long currentTimeMs) {
log.trace("Building ShareFetch request to send to node {}", target.id());
ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, shareFetchConfig);

// For record_limit mode, we only send a full ShareFetch to a single node at a time.
// We prepare to build ShareFetch requests for all nodes with session handlers to permit
// piggy-backing of acknowledgements, and also to adjust the topic-partitions
// in the share session.
if (isShareAcquireModeRecordLimit() && target.id() != fetchRecordsNodeId.get()) {
ShareFetchRequestData data = requestBuilder.data();
// If there's nothing to send, just skip building the record.
if (data.topics().isEmpty() && data.forgottenTopicsData().isEmpty()) {
return null;
} else {
// There is something to send, but we don't want to fetch any records.
requestBuilder.data().setMaxRecords(0);
}
}

nodesWithPendingRequests.add(target.id());

BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
Expand All @@ -256,11 +280,15 @@ public PollResult poll(long currentTimeMs) {
}
};
return new UnsentRequest(requestBuilder, Optional.of(target)).whenComplete(responseHandler);
}).collect(Collectors.toList());
}).filter(Objects::nonNull).collect(Collectors.toList());

return new PollResult(requests);
}

private boolean isShareAcquireModeRecordLimit() {
return shareFetchConfig.shareAcquireMode == ShareAcquireMode.RECORD_LIMIT;
}

/**
*
* @return True if we can add acknowledgements to the share session.
Expand Down Expand Up @@ -736,6 +764,15 @@ private boolean isLeaderKnownToHaveChanged(int nodeId, TopicIdPartition topicIdP
return false;
}

@Override
public long maximumTimeToWait(long currentTimeMs) {
// When fetching records and there is no chosen node for fetching, we do not want to wait for the next poll in record_limit mode.
if (isShareAcquireModeRecordLimit() && fetchMoreRecords && subscriptions.numAssignedPartitions() > 0 && fetchRecordsNodeId.get() == -1) {
return 0L;
}
return Long.MAX_VALUE;
}

private void handleShareFetchSuccess(Node fetchTarget,
@SuppressWarnings("unused") ShareFetchRequestData requestData,
ClientResponse resp) {
Expand Down Expand Up @@ -854,6 +891,9 @@ private void handleShareFetchSuccess(Node fetchTarget,
metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs());
} finally {
log.debug("Removing pending request for node {} - success", fetchTarget.id());
if (isShareAcquireModeRecordLimit()) {
fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1);
}
nodesWithPendingRequests.remove(fetchTarget.id());
}
}
Expand Down Expand Up @@ -892,6 +932,9 @@ private void handleShareFetchFailure(Node fetchTarget,
}));
} finally {
log.debug("Removing pending request for node {} - failed", fetchTarget.id());
if (isShareAcquireModeRecordLimit()) {
fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1);
}
nodesWithPendingRequests.remove(fetchTarget.id());
}
}
Expand Down
Loading