Skip to content

Commit 81b99ac

Browse files
maurermiHalosGhost
authored andcommitted
raft: make replicate_sync truly blocking
Signed-off-by: Michael Maurer <[email protected]>
1 parent 7ce192d commit 81b99ac

File tree

2 files changed

+25
-11
lines changed

2 files changed

+25
-11
lines changed

src/util/raft/node.cpp

+20-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
#include "node.hpp"
77

8+
#include <future>
9+
810
namespace cbdc::raft {
911
node::node(int node_id,
1012
std::vector<network::endpoint_t> raft_endpoints,
@@ -99,11 +101,26 @@ namespace cbdc::raft {
99101
auto node::replicate_sync(const nuraft::ptr<nuraft::buffer>& new_log) const
100102
-> std::optional<nuraft::ptr<nuraft::buffer>> {
101103
auto ret = m_raft_instance->append_entries({new_log});
102-
if(!ret->get_accepted()
103-
|| ret->get_result_code() != nuraft::cmd_result_code::OK) {
104+
if(!ret->get_accepted()) {
105+
return std::nullopt;
106+
}
107+
auto result_code = nuraft::cmd_result_code::RESULT_NOT_EXIST_YET;
108+
auto blocking_promise = std::promise<void>();
109+
auto blocking_future = blocking_promise.get_future();
110+
ret->when_ready([&result_code,
111+
&blocking_promise](raft::result_type& r,
112+
nuraft::ptr<std::exception>& err) {
113+
if(err) {
114+
result_code = nuraft::cmd_result_code::FAILED;
115+
} else {
116+
result_code = r.get_result_code();
117+
}
118+
blocking_promise.set_value();
119+
});
120+
blocking_future.wait();
121+
if(result_code != nuraft::cmd_result_code::OK) {
104122
return std::nullopt;
105123
}
106-
107124
return ret->get();
108125
}
109126

tests/unit/raft_test.cpp

+5-8
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,10 @@ class raft_test : public ::testing::Test {
173173
auto new_log
174174
= cbdc::make_buffer<uint64_t, nuraft::ptr<nuraft::buffer>>(1);
175175

176+
auto res = nodes[0]->replicate_sync(new_log);
177+
ASSERT_TRUE(res.has_value());
178+
ASSERT_EQ(nodes[0]->last_log_idx(), 2UL);
179+
176180
cbdc::raft::callback_type result_fn = nullptr;
177181
auto result_done = std::atomic<bool>(false);
178182
if(!blocking) {
@@ -190,14 +194,7 @@ class raft_test : public ::testing::Test {
190194
while(!result_done) {
191195
std::this_thread::sleep_for(std::chrono::milliseconds(250));
192196
}
193-
ASSERT_EQ(nodes[0]->last_log_idx(), 2UL);
194-
195-
if(blocking) {
196-
// Replicate sync will only return a value in the blocking context
197-
auto res = nodes[0]->replicate_sync(new_log);
198-
ASSERT_TRUE(res.has_value());
199-
ASSERT_EQ(nodes[0]->last_log_idx(), 3UL);
200-
}
197+
ASSERT_EQ(nodes[0]->last_log_idx(), 3UL);
201198

202199
for(size_t i{0}; i < nodes.size(); i++) {
203200
ASSERT_EQ(nodes[i]->get_sm(), sms[i].get());

0 commit comments

Comments
 (0)