Skip to content

Commit f86ef09

Browse files
committed
[WIP] fix: ensure tests pass
In particular, several of the commits in this set assume their `blocking_queue`s will be empty by the time the destructor is called. However, this is not guaranteeable, and causes segfaults and/or indefinite hangs when encountered. This commit predominantly ensures that the queues are all `clear()`d appropriately. Signed-off-by: Sam Stuewe <[email protected]>
1 parent e31a309 commit f86ef09

File tree

11 files changed

+81
-49
lines changed

11 files changed

+81
-49
lines changed

CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ if(DEFINED CMAKE_PREFIX_PATH)
5454
endif()
5555

5656
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
57-
add_compile_options(-fprofile-arcs -ftest-coverage)
57+
add_compile_options(-fprofile-arcs -ftest-coverage -Og -ggdb3)
5858
endif()
5959

6060
if(CMAKE_BUILD_TYPE STREQUAL "Debug")

benchmarks/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY}
1818
shard
1919
watchtower
2020
locking_shard
21+
raft
2122
transaction
2223
rpc
2324
network
@@ -26,4 +27,5 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY}
2627
crypto
2728
secp256k1
2829
${LEVELDB_LIBRARY}
30+
${NURAFT_LIBRARY}
2931
${CMAKE_THREAD_LIBS_INIT})

src/uhs/twophase/coordinator/controller.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,7 @@ namespace cbdc::coordinator {
680680
m_start_thread.join();
681681
}
682682

683+
m_attestation_check_queue.clear();
683684
for(auto& t : m_attestation_check_threads) {
684685
if(t.joinable()) {
685686
t.join();
@@ -710,15 +711,15 @@ namespace cbdc::coordinator {
710711
tx,
711712
m_opts.m_sentinel_public_keys,
712713
m_opts.m_attestation_threshold);
713-
cb(std::move(tx), valid);
714+
cb(tx, valid);
714715
}
715716
}
716717
}
717718

718719
auto controller::check_tx_attestation(const transaction::compact_tx& tx,
719720
attestation_check_callback cb)
720721
-> bool {
721-
m_attestation_check_queue.push({std::move(tx), std::move(cb)});
722+
m_attestation_check_queue.push({tx, std::move(cb)});
722723
return true;
723724
}
724725

@@ -731,7 +732,7 @@ namespace cbdc::coordinator {
731732
}
732733

733734
return check_tx_attestation(
734-
std::move(tx),
735+
tx,
735736
[&,
736737
res_cb = std::move(result_callback)](transaction::compact_tx tx2,
737738
bool result) {
@@ -760,9 +761,8 @@ namespace cbdc::coordinator {
760761
auto idx = m_current_batch->add_tx(tx2);
761762
// Map the index of the tx to the transaction ID and
762763
// sentinel ID
763-
m_current_txs->emplace(
764-
tx2.m_id,
765-
std::make_pair(std::move(res_cb), idx));
764+
m_current_txs->emplace(tx2.m_id,
765+
std::make_pair(res_cb, idx));
766766
return true;
767767
}();
768768
if(added) {

src/uhs/twophase/locking_shard/controller.cpp

+21-9
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
namespace cbdc::locking_shard {
1818
controller::controller(size_t shard_id,
1919
size_t node_id,
20-
const cbdc::config::options& opts,
20+
cbdc::config::options opts,
2121
std::shared_ptr<logging::log> logger)
2222
: m_opts(std::move(opts)),
2323
m_logger(std::move(logger)),
@@ -112,6 +112,18 @@ namespace cbdc::locking_shard {
112112
return true;
113113
}
114114

115+
controller::~controller() {
116+
m_running = false;
117+
m_validation_queue.clear();
118+
for(auto& t : m_validation_threads) {
119+
if(t.joinable()) {
120+
t.join();
121+
}
122+
}
123+
m_validation_threads.clear();
124+
m_server.reset();
125+
}
126+
115127
auto controller::raft_callback(nuraft::cb_func::Type type,
116128
nuraft::cb_func::Param* /* param */)
117129
-> nuraft::cb_func::ReturnCode {
@@ -158,24 +170,24 @@ namespace cbdc::locking_shard {
158170
auto v = validation_request();
159171
if(m_validation_queue.pop(v)) {
160172
auto [req, cb] = v;
161-
validate_request(std::move(req), std::move(cb));
173+
validate_request(std::move(req), cb);
162174
}
163175
}
164176
}
165177

166178
auto
167-
controller::enqueue_validation(cbdc::buffer buf,
179+
controller::enqueue_validation(cbdc::buffer request,
168180
cbdc::raft::rpc::validation_callback cb)
169181
-> bool {
170-
m_validation_queue.push({std::move(buf), std::move(cb)});
182+
m_validation_queue.push({std::move(request), std::move(cb)});
171183
return true;
172184
}
173185

174-
auto controller::validate_request(cbdc::buffer buf,
175-
cbdc::raft::rpc::validation_callback cb)
176-
-> bool {
186+
auto controller::validate_request(
187+
cbdc::buffer request,
188+
const cbdc::raft::rpc::validation_callback& cb) -> bool {
177189
auto maybe_req
178-
= cbdc::from_buffer<cbdc::rpc::request<rpc::request>>(buf);
190+
= cbdc::from_buffer<cbdc::rpc::request<rpc::request>>(request);
179191
auto valid = true;
180192
if(maybe_req) {
181193
valid = std::visit(
@@ -208,7 +220,7 @@ namespace cbdc::locking_shard {
208220
valid = false;
209221
}
210222

211-
cb(std::move(buf), valid);
223+
cb(std::move(request), valid);
212224
return true;
213225
}
214226
}

src/uhs/twophase/locking_shard/controller.hpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ namespace cbdc::locking_shard {
2626
/// \param logger log to use for output.
2727
controller(size_t shard_id,
2828
size_t node_id,
29-
const cbdc::config::options& opts,
29+
cbdc::config::options opts,
3030
std::shared_ptr<logging::log> logger);
31-
~controller() = default;
31+
32+
~controller();
3233

3334
controller() = delete;
3435
controller(const controller&) = delete;
@@ -48,7 +49,8 @@ namespace cbdc::locking_shard {
4849
nuraft::cb_func::Param* param)
4950
-> nuraft::cb_func::ReturnCode;
5051
auto validate_request(cbdc::buffer request,
51-
cbdc::raft::rpc::validation_callback cb) -> bool;
52+
const cbdc::raft::rpc::validation_callback& cb)
53+
-> bool;
5254

5355
auto enqueue_validation(cbdc::buffer request,
5456
cbdc::raft::rpc::validation_callback cb)

src/uhs/twophase/sentinel_2pc/controller.cpp

+22-14
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,23 @@ namespace cbdc::sentinel_2pc {
111111
return true;
112112
}
113113

114+
controller::~controller() {
115+
stop();
116+
}
117+
114118
void controller::validation_worker() {
115119
while(m_running) {
116120
auto v = queued_validation();
117121
if(m_validation_queue.pop(v)) {
118122
auto [tx, cb] = v;
119-
cb(std::move(tx), transaction::validation::check_tx(tx));
123+
cb(tx, transaction::validation::check_tx(tx));
120124
}
121125
}
122126
}
123127

124128
auto controller::validate_tx(const transaction::full_tx& tx,
125129
validation_callback cb) -> bool {
126-
m_validation_queue.push({std::move(tx), std::move(cb)});
130+
m_validation_queue.push({tx, std::move(cb)});
127131
return true;
128132
}
129133

@@ -133,22 +137,22 @@ namespace cbdc::sentinel_2pc {
133137
if(m_attestation_queue.pop(v)) {
134138
auto [tx, cb] = v;
135139
auto compact_tx = cbdc::transaction::compact_tx(tx);
136-
cb(std::move(tx), compact_tx.sign(m_secp.get(), m_privkey));
140+
cb(tx, compact_tx.sign(m_secp.get(), m_privkey));
137141
}
138142
}
139143
}
140144

141145
auto controller::attest_tx(const transaction::full_tx& tx,
142146
attestation_callback cb) -> bool {
143-
m_attestation_queue.push({std::move(tx), std::move(cb)});
147+
m_attestation_queue.push({tx, std::move(cb)});
144148
return true;
145149
}
146150

147151
auto controller::execute_transaction(
148152
transaction::full_tx tx,
149153
execute_result_callback_type result_callback) -> bool {
150154
return controller::validate_tx(
151-
std::move(tx),
155+
tx,
152156
[&, result_callback](
153157
const transaction::full_tx& tx2,
154158
std::optional<cbdc::transaction::validation::tx_error> err) {
@@ -166,10 +170,7 @@ namespace cbdc::sentinel_2pc {
166170
}
167171

168172
auto compact_tx = cbdc::transaction::compact_tx(tx2);
169-
gather_attestations(std::move(tx2),
170-
std::move(result_callback),
171-
compact_tx,
172-
{});
173+
gather_attestations(tx2, result_callback, compact_tx, {});
173174
return;
174175
});
175176
}
@@ -194,7 +195,7 @@ namespace cbdc::sentinel_2pc {
194195
transaction::full_tx tx,
195196
validate_result_callback_type result_callback) -> bool {
196197
return controller::validate_tx(
197-
std::move(tx),
198+
tx,
198199
[&, result_callback](
199200
const transaction::full_tx& tx2,
200201
std::optional<cbdc::transaction::validation::tx_error> err) {
@@ -203,7 +204,7 @@ namespace cbdc::sentinel_2pc {
203204
return;
204205
}
205206
controller::attest_tx(
206-
std::move(tx2),
207+
tx2,
207208
[&, result_callback](
208209
const transaction::full_tx& /* tx3 */,
209210
std::optional<cbdc::sentinel::validate_response> res) {
@@ -233,17 +234,24 @@ namespace cbdc::sentinel_2pc {
233234

234235
void controller::stop() {
235236
m_running = false;
237+
m_rpc_server.reset();
238+
239+
m_validation_queue.clear();
240+
m_attestation_queue.clear();
241+
236242
for(auto& t : m_validation_threads) {
237243
if(t.joinable()) {
238244
t.join();
239245
}
240246
}
247+
m_validation_threads.clear();
241248

242249
for(auto& t : m_attestation_threads) {
243250
if(t.joinable()) {
244251
t.join();
245252
}
246253
}
254+
m_attestation_threads.clear();
247255
}
248256

249257
void controller::gather_attestations(
@@ -252,10 +260,10 @@ namespace cbdc::sentinel_2pc {
252260
const transaction::compact_tx& ctx,
253261
std::unordered_set<size_t> requested) {
254262
if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) {
255-
if(ctx.m_attestations.size() == 0) {
263+
if(ctx.m_attestations.empty()) {
256264
// Self-attest first
257265
controller::attest_tx(
258-
std::move(tx),
266+
tx,
259267
[&, ctx, result_callback](const transaction::full_tx& tx2,
260268
validate_result res) {
261269
validate_result_handler(res,
@@ -297,7 +305,7 @@ namespace cbdc::sentinel_2pc {
297305
void
298306
controller::send_compact_tx(const transaction::compact_tx& ctx,
299307
execute_result_callback_type result_callback) {
300-
auto cb = [&, this, ctx, res_cb = std::move(result_callback)](
308+
auto cb = [&, ctx, res_cb = std::move(result_callback)](
301309
std::optional<bool> res) {
302310
result_handler(res, res_cb);
303311
};

src/uhs/twophase/sentinel_2pc/controller.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace cbdc::sentinel_2pc {
3838
const config::options& opts,
3939
std::shared_ptr<logging::log> logger);
4040

41-
~controller() override = default;
41+
~controller() override;
4242

4343
/// Initializes the controller. Connects to the shard coordinator
4444
/// network and launches a server thread for external clients.

src/uhs/twophase/sentinel_2pc/server.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,20 @@ namespace cbdc::sentinel::rpc {
1717
m_srv->register_handler_callback(
1818
[&](const request& req,
1919
async_interface::result_callback_type callback) {
20-
auto req_item = request_queue_t{req, callback};
20+
auto req_item = request_queue_t{req, std::move(callback)};
2121
m_request_queue.push(req_item);
2222
return true;
2323
});
2424
}
25-
bool operator<(const request_queue_t& a, const request_queue_t& b) {
25+
auto operator<(const request_queue_t& a, const request_queue_t& b)
26+
-> bool {
2627
// Prioritize validate requests over execute requests
2728
return (std::holds_alternative<validate_request>(a.m_req)
2829
&& std::holds_alternative<execute_request>(b.m_req));
2930
}
3031
async_server::~async_server() {
3132
m_running = false;
33+
m_request_queue.clear();
3234
if(m_processing_thread.joinable()) {
3335
m_processing_thread.join();
3436
}

src/uhs/twophase/sentinel_2pc/server.hpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace cbdc::sentinel::rpc {
1818
async_interface::result_callback_type m_cb;
1919
};
2020

21-
bool operator<(const request_queue_t& a, const request_queue_t& b);
21+
auto operator<(const request_queue_t& a, const request_queue_t& b) -> bool;
2222
/// Asynchronous RPC server for a sentinel.
2323
class async_server {
2424
public:
@@ -33,6 +33,10 @@ namespace cbdc::sentinel::rpc {
3333
std::unique_ptr<cbdc::rpc::async_server<request, response>> srv);
3434

3535
~async_server();
36+
async_server(async_server&&) noexcept = delete;
37+
auto operator=(async_server&&) noexcept -> async_server& = delete;
38+
async_server(const async_server&) = delete;
39+
auto operator=(const async_server&) -> async_server& = delete;
3640

3741
private:
3842
void process();

src/util/raft/rpc_server.hpp

+7-6
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace cbdc::raft::rpc {
2626
/// \param impl pointer to the raft node.
2727
/// \see cbdc::rpc::server
2828
void register_raft_node(std::shared_ptr<node> impl) {
29-
register_raft_node(impl, std::nullopt);
29+
register_raft_node(std::move(impl), std::nullopt);
3030
}
3131

3232
/// Registers the raft node whose state machine handles RPC requests
@@ -42,10 +42,11 @@ namespace cbdc::raft::rpc {
4242
if(validate.has_value()) {
4343
m_validate_func = std::move(validate.value());
4444
} else {
45-
m_validate_func = [&](buffer b, validation_callback cb) {
46-
cb(std::move(b), true);
47-
return true;
48-
};
45+
m_validate_func
46+
= [&](buffer b, const validation_callback& cb) {
47+
cb(std::move(b), true);
48+
return true;
49+
};
4950
}
5051
cbdc::rpc::raw_async_server::register_handler_callback(
5152
[&](buffer req, response_callback_type resp_cb) {
@@ -84,7 +85,7 @@ namespace cbdc::raft::rpc {
8485

8586
auto success = m_impl->replicate(
8687
new_log,
87-
[&, resp_cb = std::move(res_cb), req_buf = new_log](
88+
[&, resp_cb = res_cb, req_buf = new_log](
8889
result_type& r,
8990
nuraft::ptr<std::exception>& err) {
9091
if(err) {

0 commit comments

Comments
 (0)