[MultiNodePipelineBase] perf cluster pipeline if all the keys on same node #4148

Closed
xrayw opened this issue Apr 23, 2025 · 4 comments · Fixed by #4149

xrayw commented Apr 23, 2025

I think there are some issues here.

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
    = pipelinedResponses.entrySet().iterator();
while (pipelinedResponsesIterator.hasNext()) {
  Map.Entry<HostAndPort, Queue<Response<?>>> entry = pipelinedResponsesIterator.next();
  HostAndPort nodeKey = entry.getKey();
  Queue<Response<?>> queue = entry.getValue();
  Connection connection = connections.get(nodeKey);
  executorService.submit(() -> {                        // we could run the last entry on the current thread to save resources
    try {
      List<Object> unformatted = connection.getMany(queue.size());
      for (Object o : unformatted) {
        queue.poll().set(o);
      }
    } catch (JedisConnectionException jce) {
      log.error("Error with connection to " + nodeKey, jce);
      // cleanup the connection
      pipelinedResponsesIterator.remove();            // should not be used across threads: it's not thread-safe, and by the time this runs the iterator may already have reached the end (hasNext == false)
      connections.remove(nodeKey);
      IOUtils.closeQuietly(connection);               // the connection should be returned to the pool in a finally block
    } finally {
      countDownLatch.countDown();
    }
  });
}

And do we need to clear pipelinedResponses and connections at the end, instead of removing the failed entry in the catch block?
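
To make the suggestion concrete, here is a rough sketch of that shape (not the actual Jedis code; it reuses the executorService, pipelinedResponses and connections from the snippet above): worker threads only record the failed node instead of mutating the shared maps, the last entry runs on the calling thread, and all cleanup happens after the latch is released.

Queue<HostAndPort> failedNodes = new ConcurrentLinkedQueue<>();
List<Map.Entry<HostAndPort, Queue<Response<?>>>> entries
    = new ArrayList<>(pipelinedResponses.entrySet());
CountDownLatch countDownLatch = new CountDownLatch(entries.size());

for (int i = 0; i < entries.size(); i++) {
  Map.Entry<HostAndPort, Queue<Response<?>>> entry = entries.get(i);
  Runnable task = () -> {
    HostAndPort nodeKey = entry.getKey();
    Queue<Response<?>> queue = entry.getValue();
    Connection connection = connections.get(nodeKey);
    try {
      List<Object> unformatted = connection.getMany(queue.size());
      for (Object o : unformatted) {
        queue.poll().set(o);
      }
    } catch (JedisConnectionException jce) {
      log.error("Error with connection to " + nodeKey, jce);
      failedNodes.add(nodeKey);   // record only; cleanup happens after the latch is released
    } finally {
      countDownLatch.countDown();
    }
  };
  if (i < entries.size() - 1) {
    executorService.submit(task);
  } else {
    task.run();   // run the last entry on the calling thread to save a worker
  }
}

try {
  countDownLatch.await();
} catch (InterruptedException ie) {
  Thread.currentThread().interrupt();
}

// single-threaded cleanup after all workers are done
for (HostAndPort nodeKey : failedNodes) {
  pipelinedResponses.remove(nodeKey);
  IOUtils.closeQuietly(connections.remove(nodeKey));
}

With a single node, nothing is submitted to the executor at all, which is essentially the optimization this issue tracks.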

@xrayw xrayw changed the title [MultiNodePipelineBase] cluster pipeline perf and connection leak fix [MultiNodePipelineBase] perf cluster pipeline and fix connection leak Apr 23, 2025

xrayw commented Apr 23, 2025

} catch (JedisConnectionException jce) {
  log.error("Error with connection to " + nodeKey, jce);
  // cleanup the connection
  // TODO these operations are not thread-safe
  pipelinedResponsesIterator.remove();
  connections.remove(nodeKey);
}

The code also has a bug.


ggivo commented Apr 24, 2025

@xrayw
Hi, just to confirm we’re aligned — based on the discussion here, it seems there’s no actual connection leak.

The goal of this issue is to track a performance optimization in MultiNodePipelineBase.
The suggestion is:
if all Response objects in a pipeline are associated with the same node, we could skip using the ExecutorService and instead run the sync() logic directly in the invoking thread.

Let me know if I got that right.


xrayw commented Apr 24, 2025

Yes, no leak was found, but the connections are held for too long.

The goal of this issue is to improve pipeline performance when all the keys are on the same node.

@xrayw xrayw changed the title [MultiNodePipelineBase] perf cluster pipeline and fix connection leak [MultiNodePipelineBase] perf cluster pipeline if all the keys on same node Apr 24, 2025
@ggivo ggivo added this to the 6.1.0 milestone May 7, 2025

ggivo commented May 7, 2025

As discussed in the related PR, the connection lifecycle is bound to the pipeline: a connection to a given node is opened when the first command routed to that node is added, and closed when the pipeline is closed, so that command responses can still be read.

#4149 introduces an optimization: an executor pool is created only if more than one node needs to be contacted; otherwise, the sync logic runs directly in the invoking thread.
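
For illustration, a simplified sketch of that shape (syncNode here is a hypothetical helper wrapping the getMany()/set() loop from the snippet above; the actual change is in #4149):

if (pipelinedResponses.size() == 1) {
  // single node: read the responses directly on the calling thread, no executor needed
  Map.Entry<HostAndPort, Queue<Response<?>>> entry
      = pipelinedResponses.entrySet().iterator().next();
  syncNode(entry.getKey(), entry.getValue());
} else {
  ExecutorService executorService
      = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);
  CountDownLatch latch = new CountDownLatch(pipelinedResponses.size());
  pipelinedResponses.forEach((nodeKey, queue) -> executorService.submit(() -> {
    try {
      syncNode(nodeKey, queue);
    } finally {
      latch.countDown();
    }
  }));
  try {
    latch.await();
  } catch (InterruptedException ie) {
    Thread.currentThread().interrupt();
  }
  executorService.shutdown();
}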
