Skip to content

Commit ca4a2cd

Browse files
authored
CPP-935: Fix latency-aware policy (#502)
Latency-aware policy update timer wasn't started on request processors.
1 parent 7f193cb commit ca4a2cd

File tree

3 files changed

+66
-0
lines changed

3 files changed

+66
-0
lines changed

src/latency_aware_policy.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ Host::Ptr LatencyAwarePolicy::LatencyAwareQueryPlan::compute_next() {
102102
return host;
103103
}
104104

105+
LOG_TRACE("Skipping %s because latency is too high %f", host->address_string().c_str(),
106+
static_cast<double>(latency.average) / 1e6);
105107
skipped_.push_back(host);
106108
}
107109

src/request_processor.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop
214214
for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(); it != policies.end(); ++it) {
215215
// Initialize the load balancing policies
216216
(*it)->init(connected_host, hosts, random, local_dc);
217+
(*it)->register_handles(event_loop_->loop());
217218
}
218219

219220
listener_->on_connect(this);
@@ -308,6 +309,10 @@ void RequestProcessor::on_requires_flush() {
308309
}
309310

310311
void RequestProcessor::on_close(ConnectionPoolManager* manager) {
312+
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin();
313+
it != load_balancing_policies_.end(); ++it) {
314+
(*it)->close_handles();
315+
}
311316
async_.close_handle();
312317
prepare_.close_handle();
313318
timer_.stop();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
Copyright (c) DataStax, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#include "cassandra.h"
18+
#include "integration.hpp"
19+
20+
#include <algorithm>
21+
#include <iterator>
22+
23+
class LatencyAwarePolicyTest : public Integration {
24+
public:
25+
void SetUp() {
26+
number_dc1_nodes_ = 3;
27+
is_session_requested_ = false;
28+
Integration::SetUp();
29+
}
30+
};
31+
32+
/**
33+
* Validates that latency-aware policy is enabled and updating the minimum average latency.
34+
*
35+
* @since 2.16.1
36+
* @jira_ticket CPP-935
37+
* @test_category load_balancing_policy:latency_aware
38+
*/
39+
CASSANDRA_INTEGRATION_TEST_F(LatencyAwarePolicyTest, IsEnabled) {
40+
CHECK_FAILURE
41+
cluster_ = default_cluster();
42+
cluster_.with_load_balance_round_robin();
43+
cass_cluster_set_token_aware_routing(cluster_.get(), cass_false);
44+
cass_cluster_set_latency_aware_routing(cluster_.get(), cass_true);
45+
cass_cluster_set_latency_aware_routing_settings(cluster_.get(), 2.0, 100LL * 1000LL * 1000LL,
46+
10LL * 1000LL * 1000LL * 1000LL, 100, 1);
47+
connect(cluster_);
48+
49+
logger_.reset();
50+
logger_.add_critera("Calculated new minimum:");
51+
52+
for (int i = 0; i < 9; ++i) { // Greater than min measured
53+
session_.execute("SELECT release_version FROM system.local");
54+
}
55+
56+
msleep(250);
57+
58+
EXPECT_GT(logger_.count(), 0u);
59+
}

0 commit comments

Comments
 (0)