Skip to content

Commit b3b9080

Browse files
authored
chore: fix fiber types in the codebase (#2574)
Reduce reliance on core/fibers Signed-off-by: Roman Gershman <[email protected]>
1 parent 4000adf commit b3b9080

13 files changed

+45
-54
lines changed

src/core/fibers.h

-11
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,9 @@
1212

1313
namespace dfly {
1414

15-
using util::fb2::Barrier;
16-
using util::fb2::BlockingCounter;
17-
using util::fb2::CondVar;
18-
using util::fb2::Done;
19-
using util::fb2::EventCount;
2015
using util::fb2::Fiber;
21-
using util::fb2::Future;
2216
using util::fb2::Launch;
2317
using util::fb2::Mutex;
24-
using util::fb2::Promise;
2518
using util::fb2::SimpleChannel;
2619

2720
} // namespace dfly
28-
29-
namespace util {
30-
using fb2::SharedMutex;
31-
}

src/facade/dragonfly_connection.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ class Connection : public util::Connection {
308308
// Block until memory usage is below limit, can be called from any thread
309309
void EnsureBelowLimit();
310310

311-
dfly::EventCount ec;
311+
util::fb2::EventCount ec;
312312
std::atomic_size_t subscriber_bytes = 0;
313313

314314
size_t subscriber_thread_limit = 0; // cached flag subscriber_thread_limit
@@ -377,7 +377,7 @@ class Connection : public util::Connection {
377377
void DecreaseStatsOnClose();
378378

379379
std::deque<MessageHandle> dispatch_q_; // dispatch queue
380-
dfly::EventCount evc_; // dispatch queue waker
380+
util::fb2::EventCount evc_; // dispatch queue waker
381381
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)
382382

383383
size_t pending_pipeline_cmd_cnt_ = 0; // how many queued async commands in dispatch_q

src/server/acl/user_registry.cc

+11-9
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,23 @@
1414

1515
ABSL_DECLARE_FLAG(std::string, requirepass);
1616

17+
using namespace util;
18+
1719
namespace dfly::acl {
1820

1921
void UserRegistry::MaybeAddAndUpdate(std::string_view username, User::UpdateRequest req) {
20-
std::unique_lock<util::SharedMutex> lock(mu_);
22+
std::unique_lock<fb2::SharedMutex> lock(mu_);
2123
auto& user = registry_[username];
2224
user.Update(std::move(req));
2325
}
2426

2527
bool UserRegistry::RemoveUser(std::string_view username) {
26-
std::unique_lock<util::SharedMutex> lock(mu_);
28+
std::unique_lock<fb2::SharedMutex> lock(mu_);
2729
return registry_.erase(username);
2830
}
2931

3032
UserRegistry::UserCredentials UserRegistry::GetCredentials(std::string_view username) const {
31-
std::shared_lock<util::SharedMutex> lock(mu_);
33+
std::shared_lock<fb2::SharedMutex> lock(mu_);
3234
auto it = registry_.find(username);
3335
if (it == registry_.end()) {
3436
return {};
@@ -37,7 +39,7 @@ UserRegistry::UserCredentials UserRegistry::GetCredentials(std::string_view user
3739
}
3840

3941
bool UserRegistry::IsUserActive(std::string_view username) const {
40-
std::shared_lock<util::SharedMutex> lock(mu_);
42+
std::shared_lock<fb2::SharedMutex> lock(mu_);
4143
auto it = registry_.find(username);
4244
if (it == registry_.end()) {
4345
return false;
@@ -46,7 +48,7 @@ bool UserRegistry::IsUserActive(std::string_view username) const {
4648
}
4749

4850
bool UserRegistry::AuthUser(std::string_view username, std::string_view password) const {
49-
std::shared_lock<util::SharedMutex> lock(mu_);
51+
std::shared_lock<fb2::SharedMutex> lock(mu_);
5052
const auto& user = registry_.find(username);
5153
if (user == registry_.end()) {
5254
return false;
@@ -56,23 +58,23 @@ bool UserRegistry::AuthUser(std::string_view username, std::string_view password
5658
}
5759

5860
UserRegistry::RegistryViewWithLock UserRegistry::GetRegistryWithLock() const {
59-
std::shared_lock<util::SharedMutex> lock(mu_);
61+
std::shared_lock<fb2::SharedMutex> lock(mu_);
6062
return {std::move(lock), registry_};
6163
}
6264

6365
UserRegistry::RegistryWithWriteLock UserRegistry::GetRegistryWithWriteLock() {
64-
std::unique_lock<util::SharedMutex> lock(mu_);
66+
std::unique_lock<fb2::SharedMutex> lock(mu_);
6567
return {std::move(lock), registry_};
6668
}
6769

68-
UserRegistry::UserWithWriteLock::UserWithWriteLock(std::unique_lock<util::SharedMutex> lk,
70+
UserRegistry::UserWithWriteLock::UserWithWriteLock(std::unique_lock<fb2::SharedMutex> lk,
6971
const User& user, bool exists)
7072
: user(user), exists(exists), registry_lk_(std::move(lk)) {
7173
}
7274

7375
UserRegistry::UserWithWriteLock UserRegistry::MaybeAddAndUpdateWithLock(std::string_view username,
7476
User::UpdateRequest req) {
75-
std::unique_lock<util::SharedMutex> lock(mu_);
77+
std::unique_lock<fb2::SharedMutex> lock(mu_);
7678
const bool exists = registry_.contains(username);
7779
auto& user = registry_[username];
7880
user.Update(std::move(req));

src/server/acl/user_registry.h

+6-7
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@
55
#pragma once
66

77
#include <absl/container/flat_hash_map.h>
8-
#include <absl/synchronization/mutex.h>
98

109
#include <algorithm>
1110
#include <shared_mutex>
1211
#include <string>
1312
#include <utility>
1413
#include <vector>
1514

16-
#include "core/fibers.h"
1715
#include "server/acl/user.h"
16+
#include "util/fibers/synchronization.h"
1817

1918
namespace dfly::acl {
2019

@@ -70,12 +69,12 @@ class UserRegistry {
7069
// Helper class for accessing a user with a ReadLock outside the scope of UserRegistry
7170
class UserWithWriteLock {
7271
public:
73-
UserWithWriteLock(std::unique_lock<util::SharedMutex> lk, const User& user, bool exists);
72+
UserWithWriteLock(std::unique_lock<util::fb2::SharedMutex> lk, const User& user, bool exists);
7473
const User& user;
7574
const bool exists;
7675

7776
private:
78-
std::unique_lock<util::SharedMutex> registry_lk_;
77+
std::unique_lock<util::fb2::SharedMutex> registry_lk_;
7978
};
8079

8180
UserWithWriteLock MaybeAddAndUpdateWithLock(std::string_view username, User::UpdateRequest req);
@@ -84,18 +83,18 @@ class UserRegistry {
8483

8584
private:
8685
RegistryType registry_;
87-
mutable util::SharedMutex mu_;
86+
mutable util::fb2::SharedMutex mu_;
8887

8988
// Helper class for accessing the registry with a ReadLock outside the scope of UserRegistry
9089
template <template <typename T> typename LockT, typename RegT> class RegistryWithLock {
9190
public:
92-
RegistryWithLock(LockT<util::SharedMutex> lk, RegT reg)
91+
RegistryWithLock(LockT<util::fb2::SharedMutex> lk, RegT reg)
9392
: registry(reg), registry_lk_(std::move(lk)) {
9493
}
9594
RegT registry;
9695

9796
private:
98-
LockT<util::SharedMutex> registry_lk_;
97+
LockT<util::fb2::SharedMutex> registry_lk_;
9998
};
10099
};
101100

src/server/engine_shard_set.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ class EngineShard {
244244

245245
uint32_t defrag_task_ = 0;
246246
Fiber fiber_periodic_;
247-
Done fiber_periodic_done_;
247+
util::fb2::Done fiber_periodic_done_;
248248

249249
DefragTaskState defrag_state_;
250250
std::unique_ptr<TieredStorage> tiered_storage_;
@@ -322,7 +322,7 @@ class EngineShardSet {
322322
// The functions running inside the shard queue run atomically (sequentially)
323323
// with respect each other on the same shard.
324324
template <typename U> void AwaitRunningOnShardQueue(U&& func) {
325-
BlockingCounter bc{unsigned(shard_queue_.size())};
325+
util::fb2::BlockingCounter bc{unsigned(shard_queue_.size())};
326326
for (size_t i = 0; i < shard_queue_.size(); ++i) {
327327
Add(i, [&func, bc]() mutable {
328328
func(EngineShard::tlocal());
@@ -347,7 +347,7 @@ class EngineShardSet {
347347

348348
template <typename U, typename P>
349349
void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const {
350-
BlockingCounter bc{0};
350+
util::fb2::BlockingCounter bc{0};
351351

352352
for (uint32_t i = 0; i < size(); ++i) {
353353
if (!pred(i))
@@ -364,7 +364,7 @@ void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const {
364364
}
365365

366366
template <typename U, typename P> void EngineShardSet::RunBlockingInParallel(U&& func, P&& pred) {
367-
BlockingCounter bc{0};
367+
util::fb2::BlockingCounter bc{0};
368368
static_assert(std::is_invocable_v<U, EngineShard*>,
369369
"Argument must be invocable EngineShard* as argument.");
370370
static_assert(std::is_void_v<std::invoke_result_t<U, EngineShard*>>,

src/server/io_utils.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ class BufferedStreamerBase : public io::Sink {
6464
bool IsStopped();
6565

6666
protected:
67-
bool producer_done_ = false; // whether producer is done
68-
unsigned buffered_ = 0; // how many entries are buffered
69-
EventCount waker_; // two sided waker
67+
bool producer_done_ = false; // whether producer is done
68+
unsigned buffered_ = 0; // how many entries are buffered
69+
util::fb2::EventCount waker_; // two sided waker
7070

7171
const Cancellation* cll_; // global cancellation
7272

src/server/journal/journal_slice.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class JournalSlice {
6868
std::optional<base::RingBuffer<JournalItem>> ring_buffer_;
6969
base::IoBuf ring_serialize_buf_;
7070

71-
mutable util::SharedMutex cb_mu_;
71+
mutable util::fb2::SharedMutex cb_mu_;
7272
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);
7373

7474
LSN lsn_ = 1;

src/server/journal/tx_executor.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ struct JournalReader;
1616
class MultiShardExecution {
1717
public:
1818
struct TxExecutionSync {
19-
Barrier barrier;
19+
util::fb2::Barrier barrier;
2020
std::atomic_uint32_t counter;
21-
BlockingCounter block;
21+
util::fb2::BlockingCounter block;
2222

2323
explicit TxExecutionSync(uint32_t counter)
2424
: barrier(counter), counter(counter), block(counter) {

src/server/replica.h

+5-4
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class Replica : ProtocolClient {
148148
// In redis replication mode.
149149
Fiber sync_fb_;
150150
Fiber acks_fb_;
151-
EventCount waker_;
151+
util::fb2::EventCount waker_;
152152

153153
std::vector<std::unique_ptr<DflyShardReplica>> shard_flows_;
154154
// A vector of the last executer LSNs when a replication is interrupted.
@@ -182,13 +182,14 @@ class DflyShardReplica : public ProtocolClient {
182182

183183
// Start replica initialized as dfly flow.
184184
// Sets is_full_sync when successful.
185-
io::Result<bool> StartSyncFlow(BlockingCounter block, Context* cntx, std::optional<LSN>);
185+
io::Result<bool> StartSyncFlow(util::fb2::BlockingCounter block, Context* cntx,
186+
std::optional<LSN>);
186187

187188
// Transition into stable state mode as dfly flow.
188189
std::error_code StartStableSyncFlow(Context* cntx);
189190

190191
// Single flow full sync fiber spawned by StartFullSyncFlow.
191-
void FullSyncDflyFb(std::string eof_token, BlockingCounter block, Context* cntx);
192+
void FullSyncDflyFb(std::string eof_token, util::fb2::BlockingCounter block, Context* cntx);
192193

193194
// Single flow stable state sync fiber spawned by StartStableSyncFlow.
194195
void StableSyncDflyReadFb(Context* cntx);
@@ -213,7 +214,7 @@ class DflyShardReplica : public ProtocolClient {
213214

214215
std::queue<std::pair<TransactionData, bool>> trans_data_queue_;
215216
static constexpr size_t kYieldAfterItemsInQueue = 50;
216-
EventCount waker_; // waker for trans_data_queue_
217+
util::fb2::EventCount waker_; // waker for trans_data_queue_
217218
bool use_multi_shard_exe_sync_;
218219

219220
std::unique_ptr<JournalExecutor> executor_;

src/server/server_family.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -807,12 +807,12 @@ struct AggregateLoadResult {
807807
// Load starts as many fibers as there are files to load each one separately.
808808
// It starts one more fiber that waits for all load fibers to finish and returns the first
809809
// error (if any occured) with a future.
810-
Future<GenericError> ServerFamily::Load(const std::string& load_path) {
810+
fb2::Future<GenericError> ServerFamily::Load(const std::string& load_path) {
811811
auto paths_result = snapshot_storage_->LoadPaths(load_path);
812812
if (!paths_result) {
813813
LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();
814814

815-
Promise<GenericError> ec_promise;
815+
fb2::Promise<GenericError> ec_promise;
816816
ec_promise.set_value(paths_result.error());
817817
return ec_promise.get_future();
818818
}
@@ -856,8 +856,8 @@ Future<GenericError> ServerFamily::Load(const std::string& load_path) {
856856
load_fibers.push_back(proactor->LaunchFiber(std::move(load_fiber)));
857857
}
858858

859-
Promise<GenericError> ec_promise;
860-
Future<GenericError> ec_future = ec_promise.get_future();
859+
fb2::Promise<GenericError> ec_promise;
860+
fb2::Future<GenericError> ec_future = ec_promise.get_future();
861861

862862
// Run fiber that empties the channel and sets ec_promise.
863863
auto load_join_fiber = [this, aggregated_result, load_fibers = std::move(load_fibers),

src/server/server_family.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class ServerFamily {
171171

172172
// Load snapshot from file (.rdb file or summary.dfs file) and return
173173
// future with error_code.
174-
Future<GenericError> Load(const std::string& file_name);
174+
util::fb2::Future<GenericError> Load(const std::string& file_name);
175175

176176
bool IsSaving() const {
177177
return is_saving_.load(std::memory_order_relaxed);
@@ -261,7 +261,7 @@ class ServerFamily {
261261
void SendInvalidationMessages() const;
262262

263263
Fiber snapshot_schedule_fb_;
264-
Future<GenericError> load_result_;
264+
util::fb2::Future<GenericError> load_result_;
265265

266266
uint32_t stats_caching_task_ = 0;
267267
Service& service_;
@@ -294,11 +294,11 @@ class ServerFamily {
294294
// be --dbfilename.
295295
bool save_on_shutdown_{true};
296296

297-
Done schedule_done_;
297+
util::fb2::Done schedule_done_;
298298
std::unique_ptr<util::fb2::FiberQueueThreadPool> fq_threadpool_;
299299
std::shared_ptr<detail::SnapshotStorage> snapshot_storage_;
300300

301-
mutable Mutex peak_stats_mu_;
301+
mutable util::fb2::Mutex peak_stats_mu_;
302302
mutable PeakStats peak_stats_;
303303
};
304304

src/server/server_state.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ class ServerState { // public struct - to allow initialization.
285285
// should subscribe to `client_pause_ec_` through `AwaitPauseState` to be
286286
// notified when the break is over.
287287
int client_pauses_[2] = {};
288-
EventCount client_pause_ec_;
288+
util::fb2::EventCount client_pause_ec_;
289289

290290
using Counter = util::SlidingCounter<7>;
291291
Counter qps_;

src/server/transaction.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ class Transaction {
427427
uint32_t DEBUG_Count() const; // Get current counter value
428428
private:
429429
std::atomic_uint32_t count_{0};
430-
EventCount ec_{};
430+
util::fb2::EventCount ec_{};
431431
};
432432

433433
// "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
@@ -446,7 +446,7 @@ class Transaction {
446446
private:
447447
std::atomic_bool claimed_{false};
448448
std::atomic_bool closed_{false};
449-
EventCount ec_{};
449+
util::fb2::EventCount ec_{};
450450
};
451451

452452
private:

0 commit comments

Comments
 (0)