Skip to content

Commit 7329283

Browse files
authored
Remove redundant code and fix a race in tests (#24957) (#27752)
2 parents 063ae96 + 42819a6 commit 7329283

File tree

7 files changed

+70
-417
lines changed

7 files changed

+70
-417
lines changed

ydb/library/workload/tpcc/task_queue.h

Lines changed: 0 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -19,196 +19,6 @@ using Clock = std::chrono::steady_clock;
1919

2020
//-----------------------------------------------------------------------------
2121

22-
// We have two types of coroutines:
23-
// * Outter or Internal (in TPC-C this is a terminal task). It can sleep (co_awaiting) or wait for another "inner" task.
24-
// * Inner or External (in TPC-C this is a transaction task). E.g. it might perform async requests to YDB.
25-
// when it is finished, terminal task continues. It is called external, because usually it has to use
26-
// thread-safe interface of ITaskQueue (future used to co_await is normally set by another thread).
27-
28-
template <typename T>
29-
struct TTask {
30-
struct TPromiseType;
31-
using TCoroHandle = std::coroutine_handle<TPromiseType>;
32-
33-
struct TPromiseType {
34-
TTask get_return_object() {
35-
return TTask{std::coroutine_handle<TPromiseType>::from_promise(*this)};
36-
}
37-
38-
std::suspend_always initial_suspend() { return {}; }
39-
40-
auto final_suspend() noexcept {
41-
struct TFinalAwaiter {
42-
bool await_ready() noexcept { return false; }
43-
std::coroutine_handle<> await_suspend(TCoroHandle h) noexcept {
44-
if (h.promise().Continuation) {
45-
return h.promise().Continuation;
46-
}
47-
return std::noop_coroutine();
48-
}
49-
void await_resume() noexcept {}
50-
};
51-
return TFinalAwaiter{};
52-
}
53-
54-
template <typename U>
55-
void return_value(U&& v) {
56-
Value = std::forward<U>(v);
57-
}
58-
59-
void unhandled_exception() {
60-
Exception = std::current_exception();
61-
}
62-
63-
T Value;
64-
std::exception_ptr Exception;
65-
std::coroutine_handle<> Continuation;
66-
};
67-
68-
using promise_type = TPromiseType;
69-
70-
TTask(TCoroHandle h)
71-
: Handle(h)
72-
{
73-
}
74-
75-
// note, default move constructors doesn't null Handle,
76-
// so that we need to add our own
77-
TTask(TTask&& other) noexcept
78-
: Handle(std::exchange(other.Handle, nullptr))
79-
{}
80-
81-
TTask& operator=(TTask&& other) noexcept {
82-
if (this != &other) {
83-
if (Handle) {
84-
Handle.destroy();
85-
}
86-
Handle = std::exchange(other.Handle, nullptr);
87-
}
88-
return *this;
89-
}
90-
91-
TTask(const TTask&) = delete;
92-
TTask& operator=(const TTask&) = delete;
93-
94-
~TTask() {
95-
if (Handle) {
96-
Handle.destroy();
97-
}
98-
}
99-
100-
// awaitable task
101-
102-
bool await_ready() const noexcept {
103-
return Handle.done();
104-
}
105-
106-
void await_suspend(std::coroutine_handle<> awaiting) {
107-
Handle.promise().Continuation = awaiting;
108-
Handle.resume(); // start inner task
109-
}
110-
111-
T&& await_resume() {
112-
if (Handle.promise().Exception) {
113-
std::rethrow_exception(Handle.promise().Exception);
114-
}
115-
116-
return std::move(Handle.promise().Value);
117-
}
118-
119-
TCoroHandle Handle;
120-
};
121-
122-
template <>
123-
struct TTask<void> {
124-
struct TPromiseType;
125-
using TCoroHandle = std::coroutine_handle<TPromiseType>;
126-
127-
struct TPromiseType {
128-
TTask get_return_object() {
129-
return TTask{std::coroutine_handle<TPromiseType>::from_promise(*this)};
130-
}
131-
132-
std::suspend_always initial_suspend() { return {}; }
133-
134-
auto final_suspend() noexcept {
135-
struct TFinalAwaiter {
136-
bool await_ready() noexcept { return false; }
137-
std::coroutine_handle<> await_suspend(TCoroHandle h) noexcept {
138-
if (h.promise().Continuation) {
139-
return h.promise().Continuation;
140-
}
141-
return std::noop_coroutine();
142-
}
143-
void await_resume() noexcept {}
144-
};
145-
return TFinalAwaiter{};
146-
}
147-
148-
void return_void() {}
149-
150-
void unhandled_exception() {
151-
Exception = std::current_exception();
152-
}
153-
154-
std::exception_ptr Exception;
155-
std::coroutine_handle<> Continuation;
156-
};
157-
158-
using promise_type = TPromiseType;
159-
160-
TTask(TCoroHandle h)
161-
: Handle(h)
162-
{
163-
}
164-
165-
// note, default move constructors doesn't null Handle,
166-
// so that we need to add our own
167-
TTask(TTask&& other) noexcept
168-
: Handle(std::exchange(other.Handle, nullptr))
169-
{}
170-
171-
TTask& operator=(TTask&& other) noexcept {
172-
if (this != &other) {
173-
if (Handle) {
174-
Handle.destroy();
175-
}
176-
Handle = std::exchange(other.Handle, nullptr);
177-
}
178-
return *this;
179-
}
180-
181-
TTask(const TTask&) = delete;
182-
TTask& operator=(const TTask&) = delete;
183-
184-
~TTask() {
185-
if (Handle) {
186-
Handle.destroy();
187-
}
188-
}
189-
190-
// awaitable task
191-
192-
bool await_ready() const noexcept {
193-
return Handle.done();
194-
}
195-
196-
void await_suspend(std::coroutine_handle<> awaiting) {
197-
Handle.promise().Continuation = awaiting;
198-
Handle.resume(); // start inner task
199-
}
200-
201-
void await_resume() {
202-
if (Handle.promise().Exception) {
203-
std::rethrow_exception(Handle.promise().Exception);
204-
}
205-
}
206-
207-
TCoroHandle Handle;
208-
};
209-
210-
//-----------------------------------------------------------------------------
211-
21222
class ITaskQueue {
21323
public:
21424
struct TThreadStats {

ydb/library/workload/tpcc/terminal.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include "util.h"
55
#include "constants.h"
66

7-
#include <util/generic/scope.h>
7+
#include <library/cpp/threading/future/core/coroutine_traits.h>
88

99
#include <array>
1010

@@ -89,21 +89,20 @@ TTerminal::TTerminal(size_t terminalID,
8989
, StopToken(stopToken)
9090
, StopWarmup(stopWarmup)
9191
, Stats(stats)
92-
, Task(Run())
9392
{
9493
}
9594

9695
void TTerminal::Start() {
9796
if (!Started) {
98-
TaskQueue.TaskReadyThreadSafe(Task.Handle, Context.TerminalID);
97+
TaskFuture = Run();
9998
Started = true;
10099
}
101100
}
102101

103-
TTerminalTask TTerminal::Run() {
102+
NThreading::TFuture<void> TTerminal::Run() {
104103
auto& Log = Context.Log; // to make LOG_* macros working
105104

106-
Y_DEFER { Stopped = true; };
105+
co_await TTaskReady(TaskQueue, Context.TerminalID);
107106

108107
LOG_D("Terminal " << Context.TerminalID << " has started");
109108

@@ -218,7 +217,7 @@ bool TTerminal::IsDone() const {
218217
return true;
219218
}
220219

221-
return Stopped;
220+
return TaskFuture.HasValue() || TaskFuture.HasException();
222221
}
223222

224223
} // namespace NYdb::NTPCC

ydb/library/workload/tpcc/terminal.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ class TTerminalStats {
128128
std::atomic<bool> WasCleared{false};
129129
};
130130

131-
using TTerminalTask = TTask<void>;
132-
133131
//-----------------------------------------------------------------------------
134132

135133
class alignas(64) TTerminal {
@@ -164,7 +162,7 @@ class alignas(64) TTerminal {
164162
bool IsDone() const;
165163

166164
private:
167-
TTerminalTask Run();
165+
NThreading::TFuture<void> Run();
168166

169167
private:
170168
ITaskQueue& TaskQueue;
@@ -174,12 +172,10 @@ class alignas(64) TTerminal {
174172
std::atomic<bool>& StopWarmup;
175173
std::shared_ptr<TTerminalStats> Stats;
176174

177-
TTerminalTask Task;
175+
NThreading::TFuture<void> TaskFuture;
178176

179177
bool Started = false;
180178
bool WarmupWasStopped = false;
181-
182-
std::atomic<bool> Stopped = false;
183179
};
184180

185181
} // namespace NYdb::NTPCC

0 commit comments

Comments
 (0)