Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Motivated by "The load balancing issue in Poco::ActiveThreadPool" #4544 #4548

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Foundation/include/Poco/ActiveThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@ class Foundation_API ActiveThreadPool
/// The thread pool always keeps fixed number of threads running.
/// Use case for this pool is running many (more than os-max-thread-count) short live tasks
/// Round-robin model allow efficiently utilize cpu cores
/// Using redistributeTasks option allows optimize reusage of idle threads
{
public:
ActiveThreadPool(int capacity = static_cast<int>(Environment::processorCount()) + 1,
int stackSize = POCO_THREAD_STACK_SIZE);
int stackSize = POCO_THREAD_STACK_SIZE,
bool redistributeTasks = false);
/// Creates a thread pool with fixed capacity threads.
/// Threads are created with given stack size.

ActiveThreadPool(std::string name,
int capacity = static_cast<int>(Environment::processorCount()) + 1,
int stackSize = POCO_THREAD_STACK_SIZE);
int stackSize = POCO_THREAD_STACK_SIZE,
bool redistributeTasks = false);
/// Creates a thread pool with the given name and fixed capacity threads.
/// Threads are created with given stack size.

Expand Down Expand Up @@ -107,11 +110,11 @@ class Foundation_API ActiveThreadPool
/// Returns a reference to the default
/// thread pool.

protected:
private:
ActiveThread* getThread();
ActiveThread* createThread();
void recreateThreads();

private:
ActiveThreadPool(const ActiveThreadPool& pool);
ActiveThreadPool& operator = (const ActiveThreadPool& pool);

Expand All @@ -124,6 +127,7 @@ class Foundation_API ActiveThreadPool
ThreadVec _threads;
mutable FastMutex _mutex;
std::atomic<size_t> _lastThreadIndex{0};
bool _redistributeTasks;
};


Expand Down
129 changes: 90 additions & 39 deletions Foundation/src/ActiveThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,29 +79,37 @@ class NewActionNotification: public Notification
class ActiveThread: public Runnable
{
public:
ActiveThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
ActiveThread(const std::string& name, std::vector<ActiveThread*> &threads, int stackSize = POCO_THREAD_STACK_SIZE);
~ActiveThread() override = default;

void start();
void start(Thread::Priority priority, Runnable& target);
void start(Thread::Priority priority, Runnable& target, const std::string& name);
void start(Notification::Ptr notification);
void setRedistributeOption(bool redistributeTask);
void join();
bool idle() const;
int id() const;
bool isRunning() const;
void release();
void run() override;

private:
NotificationQueue _pTargetQueue;
std::string _name;
std::vector<ActiveThread*> &_threads;
Thread _thread;
Event _targetCompleted;
FastMutex _mutex;
const long JOIN_TIMEOUT = 10000;
std::atomic<bool> _needToStop{false};
std::atomic<bool> _idle{true};
bool _redistributeTasks{false};
};


ActiveThread::ActiveThread(const std::string& name, int stackSize):
ActiveThread::ActiveThread(const std::string& name, std::vector<ActiveThread*> &threads, int stackSize):
_name(name),
_threads(threads),
_thread(name),
_targetCompleted(false)
{
Expand All @@ -115,28 +123,51 @@ void ActiveThread::start()
_thread.start(*this);
}


void ActiveThread::start(Thread::Priority priority, Runnable& target)
{
_pTargetQueue.enqueueNotification(Poco::makeAuto<NewActionNotification>(priority, target, _name));
}

void ActiveThread::start(Notification::Ptr notification)
{
if (!notification.isNull()){
_pTargetQueue.enqueueNotification(std::move(notification));
}
}

void ActiveThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
{
_pTargetQueue.enqueueNotification(Poco::makeAuto<NewActionNotification>(priority, target, name));
}

void ActiveThread::setRedistributeOption(bool redistributeTask)
{
_redistributeTasks = redistributeTask;
}

void ActiveThread::join()
{
_pTargetQueue.wakeUpAll();
if (!_pTargetQueue.empty())
{
_targetCompleted.wait();
}
}

inline bool ActiveThread::idle() const
{
return _idle;
}

inline int ActiveThread::id() const
{
return _thread.id();
}

inline bool ActiveThread::isRunning() const
{
return _thread.isRunning();
}

void ActiveThread::release()
{
Expand All @@ -157,14 +188,29 @@ void ActiveThread::release()
}
}


void ActiveThread::run()
{
do
{
AutoPtr<Notification> pN = _pTargetQueue.waitDequeueNotification();
while (pN)
{
_idle = false;
if (_redistributeTasks)
{
for (const auto &thr : _threads)
{
if (thr && thr->isRunning() && (thr->id() != _thread.id()) && thr->idle())
{
thr->start(std::move(pN));
pN = _pTargetQueue.waitDequeueNotification(1000);
}
}
if (pN.isNull())
{
break;
}
}
NewActionNotification::Ptr pNAN = pN.cast<NewActionNotification>();
Runnable& target = pNAN->runnable();
_thread.setPriority(pNAN->priority());
Expand All @@ -191,47 +237,36 @@ void ActiveThread::run()
pN = _pTargetQueue.waitDequeueNotification(1000);
}
_targetCompleted.set();
_idle = true;
}
while (_needToStop == false);
}


ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize):
ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize, bool redistributeTasks):
_capacity(capacity),
_serial(0),
_stackSize(stackSize),
_lastThreadIndex(0)
_lastThreadIndex(0),
_redistributeTasks(redistributeTasks)
{
poco_assert (_capacity >= 1);

_threads.reserve(_capacity);

for (int i = 0; i < _capacity; i++)
{
ActiveThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}

recreateThreads();
}


ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize):
ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize, bool redistributeTasks):
_name(std::move(name)),
_capacity(capacity),
_serial(0),
_stackSize(stackSize),
_lastThreadIndex(0)
_lastThreadIndex(0),
_redistributeTasks(redistributeTasks)
{
poco_assert (_capacity >= 1);

_threads.reserve(_capacity);

for (int i = 0; i < _capacity; i++)
{
ActiveThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}

recreateThreads();
}


Expand Down Expand Up @@ -298,22 +333,15 @@ void ActiveThreadPool::joinAll()
{
pThread->join();
}

_threads.clear();
_threads.reserve(_capacity);

for (int i = 0; i < _capacity; i++)
{
ActiveThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}

recreateThreads();
}

ActiveThread* ActiveThreadPool::getThread()
{
auto thrSize = _threads.size();
auto i = (_lastThreadIndex++) % thrSize;
auto i = (_lastThreadIndex++);
i = i % thrSize;
ActiveThread* pThread = _threads[i];
return pThread;
}
Expand All @@ -323,9 +351,32 @@ ActiveThread* ActiveThreadPool::createThread()
{
std::ostringstream name;
name << _name << "[#active-thread-" << ++_serial << "]";
return new ActiveThread(name.str(), _stackSize);
return new ActiveThread(name.str(), _threads, _stackSize);
}

void ActiveThreadPool::recreateThreads()
{
_threads.clear();
_threads.reserve(_capacity);

for (int i = 0; i < _capacity; i++)
{
ActiveThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}
if (_redistributeTasks)
{
for (auto& thr : _threads)
{
while (!thr->isRunning())
{
Poco::Thread::sleep(100);
}
thr->setRedistributeOption(_redistributeTasks);
}
}
}

class ActiveThreadPoolSingletonHolder
{
Expand Down
67 changes: 67 additions & 0 deletions Foundation/testsuite/src/ActiveThreadPoolTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include "Poco/Environment.h"
#include <vector>


using Poco::ActiveThreadPool;
Expand Down Expand Up @@ -76,6 +77,71 @@ void ActiveThreadPoolTest::testActiveThreadPool()
}


void ActiveThreadPoolTest::testActiveThreadLoadBalancing()
{
Poco::AtomicCounter lttCount;
ptrdiff_t lttPerTIDCount = 0;
Poco::FastMutex mutex;
class LongTimeTask : public Poco::Runnable
{
Poco::AtomicCounter &_counter;
ptrdiff_t &_tidCounter;
Poco::FastMutex &_mutex;
public:
LongTimeTask(Poco::AtomicCounter &counter, ptrdiff_t &tidCounter, Poco::FastMutex &mutex) :
_counter(counter),
_tidCounter(tidCounter),
_mutex(mutex)
{}
void run() override
{
_counter++;
{
Poco::FastMutex::ScopedLock lock(_mutex);
if (_tidCounter >= 0)
{
_tidCounter -= (ptrdiff_t)Poco::Thread::currentTid();
}
else
{
_tidCounter += (ptrdiff_t)Poco::Thread::currentTid();
}
}
Poco::Thread::sleep(1 * 110);
}
};

Poco::AtomicCounter sttCount;
class ShortTimeTask : public Poco::Runnable
{
Poco::AtomicCounter &_counter;
public:
ShortTimeTask(Poco::AtomicCounter &counter) : _counter(counter) {}
void run() override
{
_counter++;
}
};

const int capacity = 2;
const int taskCount = 200;
const bool redistributeTasks = true;
Poco::ActiveThreadPool pool(capacity, POCO_THREAD_STACK_SIZE, redistributeTasks);
std::vector<std::unique_ptr<LongTimeTask>> lttVec(taskCount);
std::vector<std::unique_ptr<ShortTimeTask>> sttVec(taskCount);
for (int i = 0; i < taskCount; i++) {
lttVec[i] = std::make_unique<LongTimeTask>(lttCount, lttPerTIDCount, mutex);
pool.start(*(lttVec[i]));
sttVec[i] = std::make_unique<ShortTimeTask>(sttCount);
pool.start(*(sttVec[i]));
}

pool.joinAll();
assertEqual(taskCount, lttCount.value());
assertEqual(taskCount, sttCount.value());
assertTrue(lttPerTIDCount != 0); // without optimization all tasks runs on single thread and this counter equal to 0, othrewise - no
}

void ActiveThreadPoolTest::setUp()
{
_count = 0;
Expand All @@ -98,6 +164,7 @@ CppUnit::Test* ActiveThreadPoolTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest");

CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool);
CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadLoadBalancing);

return pSuite;
}
1 change: 1 addition & 0 deletions Foundation/testsuite/src/ActiveThreadPoolTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ActiveThreadPoolTest: public CppUnit::TestCase
~ActiveThreadPoolTest();

void testActiveThreadPool();
void testActiveThreadLoadBalancing();

void setUp();
void tearDown();
Expand Down
Loading