Skip to content

Commit b88bc09

Browse files
committed
Fix data race problem related to leap array and event loop
1 parent 780220f commit b88bc09

File tree

6 files changed

+40
-20
lines changed

6 files changed

+40
-20
lines changed

sentinel-core/log/block/block_log_task.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void BlockLogTask::Log(const std::string& resource, const std::string& cause) {
8181
}
8282
auto key = absl::StrFormat("%s|%s", resource, cause);
8383
{
84-
absl::ReaderMutexLock lck(&mtx_);
84+
absl::WriterMutexLock lck(&mtx_);
8585
auto it = map_.find(key);
8686
if (it != map_.end()) {
8787
it->second.last_block_ = TimeUtils::CurrentTimeMillis().count();

sentinel-core/statistic/base/leap_array.h

+12-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class LeapArray {
4848
const int32_t bucket_length_ms_; // time length of each bucket
4949
private:
5050
const std::unique_ptr<WindowWrapSharedPtr<T>[]> array_;
51-
std::mutex mtx_;
51+
mutable std::mutex leap_array_mtx_;
5252

5353
int32_t CalculateTimeIdx(/*@Valid*/ int64_t time_millis) const;
5454
int64_t CalculateWindowStart(/*@Valid*/ int64_t time_millis) const;
@@ -78,9 +78,12 @@ WindowWrapSharedPtr<T> LeapArray<T>::CurrentWindow(int64_t time_millis) {
7878
int64_t bucket_start = CalculateWindowStart(time_millis);
7979

8080
while (true) {
81+
std::unique_lock<std::mutex> lck(leap_array_mtx_, std::defer_lock);
82+
// TODO: lock granularity is too coarse; needs to be optimized.
83+
leap_array_mtx_.lock();
8184
WindowWrapSharedPtr<T> old = array_[idx];
85+
leap_array_mtx_.unlock();
8286
if (old == nullptr) {
83-
std::unique_lock<std::mutex> lck(mtx_, std::defer_lock);
8487
if (lck.try_lock() && array_[idx] == nullptr) {
8588
WindowWrapSharedPtr<T> bucket = std::make_shared<WindowWrap<T>>(
8689
bucket_length_ms_, bucket_start, NewEmptyBucket(time_millis));
@@ -90,7 +93,7 @@ WindowWrapSharedPtr<T> LeapArray<T>::CurrentWindow(int64_t time_millis) {
9093
} else if (bucket_start == old->BucketStart()) {
9194
return old;
9295
} else if (bucket_start > old->BucketStart()) {
93-
std::unique_lock<std::mutex> lck(mtx_, std::defer_lock);
96+
std::unique_lock<std::mutex> lck(leap_array_mtx_, std::defer_lock);
9497
if (lck.try_lock()) {
9598
ResetWindowTo(old, bucket_start);
9699
return old;
@@ -148,7 +151,10 @@ std::vector<WindowWrapSharedPtr<T>> LeapArray<T>::Buckets(
148151
}
149152
int size = sample_count_; // array_.size()
150153
for (int i = 0; i < size; i++) {
154+
// TODO: lock granularity is too coarse; needs to be optimized.
155+
leap_array_mtx_.lock();
151156
auto w = array_[i];
157+
leap_array_mtx_.unlock();
152158
if (w == nullptr || IsBucketDeprecated(time_millis, w)) {
153159
continue;
154160
}
@@ -166,7 +172,10 @@ std::vector<std::shared_ptr<T>> LeapArray<T>::Values(
166172
}
167173
int size = sample_count_; // array_.size()
168174
for (int i = 0; i < size; i++) {
175+
// TODO: lock granularity is too coarse; needs to be optimized.
176+
leap_array_mtx_.lock();
169177
WindowWrapSharedPtr<T> w = array_[i];
178+
leap_array_mtx_.unlock();
170179
if (w == nullptr || IsBucketDeprecated(time_millis, w)) {
171180
continue;
172181
}

sentinel-core/transport/common/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ cc_library(
1111
],
1212
deps = [
1313
"//:libevent",
14+
"@com_google_absl//absl/synchronization",
1415
],
1516
visibility = ["//visibility:public"],
1617
)

sentinel-core/transport/common/event_loop_thread.cc

+13-9
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@
77
namespace Sentinel {
88
namespace Transport {
99

10-
EventLoopThread::EventLoopThread() : stoped_(true) {}
10+
EventLoopThread::EventLoopThread() = default;
1111

1212
bool EventLoopThread::Start() {
1313
std::promise<bool> start_promise;
1414
auto start_future = start_promise.get_future();
1515

16-
thd_.reset(
17-
new std::thread([this, &start_promise] { this->Work(start_promise); }));
16+
thd_.reset(new std::thread(
17+
[start_promise = std::move(start_promise), this]() mutable {
18+
this->Work(std::move(start_promise));
19+
}));
1820

1921
return start_future.get();
2022
}
@@ -30,7 +32,7 @@ void EventLoopThread::Stop() {
3032
thd_->join();
3133
}
3234

33-
void EventLoopThread::Work(std::promise<bool>& promise) {
35+
void EventLoopThread::Work(std::promise<bool>&& promise) {
3436
auto ret = InitEventBase();
3537
if (!ret) {
3638
promise.set_value(false);
@@ -41,7 +43,9 @@ void EventLoopThread::Work(std::promise<bool>& promise) {
4143

4244
Dispatch();
4345

44-
ClearEventBase();
46+
// Cleanup must be done from outside, by whoever uses this event loop's
47+
// event_base struct (e.g., HttpServer). Not following this rule leads to
48+
// unexpected data races. Disabled: ClearEventBase();
4549
}
4650

4751
bool EventLoopThread::InitEventBase() {
@@ -100,15 +104,15 @@ void EventLoopThread::Dispatch() {
100104
}
101105
}
102106

103-
void EventLoopThread::RunTask(Functor func) {
107+
void EventLoopThread::RunTask(Functor&& func) {
104108
if (IsInLoopThread()) {
105109
func();
106110
return;
107111
}
108112

109113
{
110-
std::lock_guard<std::mutex> lock(task_mutex_);
111-
pending_tasks_.emplace_back(func);
114+
absl::WriterMutexLock lck(&task_mutex_);
115+
pending_tasks_.emplace_back(std::move(func));
112116
}
113117

114118
Wakeup();
@@ -141,7 +145,7 @@ void EventLoopThread::DoPendingTasks() {
141145
std::vector<Functor> functors;
142146

143147
{
144-
std::lock_guard<std::mutex> lock(task_mutex_);
148+
absl::WriterMutexLock lck(&task_mutex_);
145149
functors.swap(pending_tasks_);
146150
}
147151

sentinel-core/transport/common/event_loop_thread.h

+7-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
#include <event2/event.h>
99

10+
#include "absl/synchronization/mutex.h"
11+
1012
namespace Sentinel {
1113
namespace Transport {
1214

@@ -22,7 +24,7 @@ class EventLoopThread {
2224

2325
struct event_base *GetEventBase();
2426

25-
void RunTask(Functor func);
27+
void RunTask(Functor &&func);
2628

2729
bool IsInLoopThread() const;
2830

@@ -31,7 +33,7 @@ class EventLoopThread {
3133
void ClearEventBase();
3234

3335
void Dispatch();
34-
void Work(std::promise<bool> &promise);
36+
void Work(std::promise<bool> &&promise);
3537
void Wakeup();
3638
void DoPendingTasks();
3739

@@ -42,11 +44,11 @@ class EventLoopThread {
4244
struct event_base *base_ = nullptr;
4345

4446
std::unique_ptr<std::thread> thd_;
45-
std::atomic<bool> stoped_;
47+
std::atomic<bool> stoped_{true};
4648
evutil_socket_t wakeup_fd_[2]; // 0:read 1:write
4749

48-
std::mutex task_mutex_;
49-
std::vector<Functor> pending_tasks_;
50+
absl::Mutex task_mutex_;
51+
std::vector<Functor> pending_tasks_ GUARDED_BY(task_mutex_);
5052
};
5153

5254
} // namespace Transport

tests/tsan-flow.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ void doAnotherEntry() { doEntry("big_brother_service:foo()"); }
3636
int main() {
3737
// Initialize for Sentinel.
3838
Sentinel::Log::Logger::InitDefaultLogger();
39-
Sentinel::Transport::HttpCommandCenterInitTarget command_center_init;
40-
command_center_init.Initialize();
39+
Sentinel::Transport::HttpCommandCenterInitTarget* p_command_center_init =
40+
new Sentinel::Transport::HttpCommandCenterInitTarget();
41+
p_command_center_init->Initialize();
4142
Sentinel::Log::MetricLogTask metric_log_task;
4243
metric_log_task.Initialize();
4344

@@ -73,5 +74,8 @@ int main() {
7374
t4.join();
7475
t5.join();
7576
t6.join();
77+
78+
delete p_command_center_init;
79+
7680
return 0;
7781
}

0 commit comments

Comments
 (0)