diff --git a/Foundation/include/Poco/ActiveThreadPool.h b/Foundation/include/Poco/ActiveThreadPool.h index 5c04a8ff02..66db1822ae 100644 --- a/Foundation/include/Poco/ActiveThreadPool.h +++ b/Foundation/include/Poco/ActiveThreadPool.h @@ -41,16 +41,19 @@ class Foundation_API ActiveThreadPool /// The thread pool always keeps fixed number of threads running. /// Use case for this pool is running many (more than os-max-thread-count) short live tasks /// Round-robin model allow efficiently utilize cpu cores + /// Using redistributeTasks option allows optimize reusage of idle threads { public: ActiveThreadPool(int capacity = static_cast(Environment::processorCount()) + 1, - int stackSize = POCO_THREAD_STACK_SIZE); + int stackSize = POCO_THREAD_STACK_SIZE, + bool redistributeTasks = false); /// Creates a thread pool with fixed capacity threads. /// Threads are created with given stack size. ActiveThreadPool(std::string name, int capacity = static_cast(Environment::processorCount()) + 1, - int stackSize = POCO_THREAD_STACK_SIZE); + int stackSize = POCO_THREAD_STACK_SIZE, + bool redistributeTasks = false); /// Creates a thread pool with the given name and fixed capacity threads. /// Threads are created with given stack size. @@ -107,11 +110,11 @@ class Foundation_API ActiveThreadPool /// Returns a reference to the default /// thread pool. -protected: +private: ActiveThread* getThread(); ActiveThread* createThread(); + void recreateThreads(); -private: ActiveThreadPool(const ActiveThreadPool& pool); ActiveThreadPool& operator = (const ActiveThreadPool& pool); @@ -124,6 +127,7 @@ class Foundation_API ActiveThreadPool ThreadVec _threads; mutable FastMutex _mutex; std::atomic _lastThreadIndex{0}; + bool _redistributeTasks; }; diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp index 628c78d6da..b867f8123d 100644 --- a/Foundation/src/ActiveThreadPool.cpp +++ b/Foundation/src/ActiveThreadPool.cpp @@ -79,29 +79,37 @@ class NewActionNotification: public Notification class ActiveThread: public Runnable { public: - ActiveThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); + ActiveThread(const std::string& name, std::vector &threads, int stackSize = POCO_THREAD_STACK_SIZE); ~ActiveThread() override = default; void start(); void start(Thread::Priority priority, Runnable& target); void start(Thread::Priority priority, Runnable& target, const std::string& name); + void start(Notification::Ptr notification); + void setRedistributeOption(bool redistributeTask); void join(); + bool idle() const; + int id() const; + bool isRunning() const; void release(); void run() override; private: NotificationQueue _pTargetQueue; std::string _name; + std::vector &_threads; Thread _thread; Event _targetCompleted; FastMutex _mutex; const long JOIN_TIMEOUT = 10000; std::atomic _needToStop{false}; + std::atomic _idle{true}; + bool _redistributeTasks{false}; }; - -ActiveThread::ActiveThread(const std::string& name, int stackSize): +ActiveThread::ActiveThread(const std::string& name, std::vector &threads, int stackSize): _name(name), + _threads(threads), _thread(name), _targetCompleted(false) { @@ -115,18 +123,28 @@ void ActiveThread::start() _thread.start(*this); } - void ActiveThread::start(Thread::Priority priority, Runnable& target) { _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, _name)); } +void ActiveThread::start(Notification::Ptr notification) +{ + if (!notification.isNull()){ + _pTargetQueue.enqueueNotification(std::move(notification)); + } +} void ActiveThread::start(Thread::Priority priority, Runnable& target, const std::string& name) { _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, name)); } +void ActiveThread::setRedistributeOption(bool redistributeTask) +{ + _redistributeTasks = redistributeTask; +} + void ActiveThread::join() { _pTargetQueue.wakeUpAll(); @@ -134,9 +152,22 @@ void ActiveThread::join() { _targetCompleted.wait(); } +} + +inline bool ActiveThread::idle() const +{ + return _idle; +} +inline int ActiveThread::id() const +{ + return _thread.id(); } +inline bool ActiveThread::isRunning() const +{ + return _thread.isRunning(); +} void ActiveThread::release() { @@ -157,7 +188,6 @@ void ActiveThread::release() } } - void ActiveThread::run() { do @@ -165,6 +195,22 @@ void ActiveThread::run() AutoPtr pN = _pTargetQueue.waitDequeueNotification(); while (pN) { + _idle = false; + if (_redistributeTasks) + { + for (const auto &thr : _threads) + { + if (thr && thr->isRunning() && (thr->id() != _thread.id()) && thr->idle()) + { + thr->start(std::move(pN)); + pN = _pTargetQueue.waitDequeueNotification(1000); + } + } + if (pN.isNull()) + { + break; + } + } NewActionNotification::Ptr pNAN = pN.cast(); Runnable& target = pNAN->runnable(); _thread.setPriority(pNAN->priority()); @@ -191,47 +237,36 @@ void ActiveThread::run() pN = _pTargetQueue.waitDequeueNotification(1000); } _targetCompleted.set(); + _idle = true; } while (_needToStop == false); } -ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize): +ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize, bool redistributeTasks): _capacity(capacity), _serial(0), _stackSize(stackSize), - _lastThreadIndex(0) + _lastThreadIndex(0), + _redistributeTasks(redistributeTasks) { poco_assert (_capacity >= 1); - - _threads.reserve(_capacity); - - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } + + recreateThreads(); } -ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize): +ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize, bool redistributeTasks): _name(std::move(name)), _capacity(capacity), _serial(0), _stackSize(stackSize), - _lastThreadIndex(0) + _lastThreadIndex(0), + _redistributeTasks(redistributeTasks) { poco_assert (_capacity >= 1); - - _threads.reserve(_capacity); - - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } + + recreateThreads(); } @@ -298,22 +333,15 @@ void ActiveThreadPool::joinAll() { pThread->join(); } - - _threads.clear(); - _threads.reserve(_capacity); - - for (int i = 0; i < _capacity; i++) - { - ActiveThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } + + recreateThreads(); } ActiveThread* ActiveThreadPool::getThread() { auto thrSize = _threads.size(); - auto i = (_lastThreadIndex++) % thrSize; + auto i = (_lastThreadIndex++); + i = i % thrSize; ActiveThread* pThread = _threads[i]; return pThread; } @@ -323,9 +351,32 @@ ActiveThread* ActiveThreadPool::createThread() { std::ostringstream name; name << _name << "[#active-thread-" << ++_serial << "]"; - return new ActiveThread(name.str(), _stackSize); + return new ActiveThread(name.str(), _threads, _stackSize); } +void ActiveThreadPool::recreateThreads() +{ + _threads.clear(); + _threads.reserve(_capacity); + + for (int i = 0; i < _capacity; i++) + { + ActiveThread* pThread = createThread(); + _threads.push_back(pThread); + pThread->start(); + } + if (_redistributeTasks) + { + for (auto& thr : _threads) + { + while (!thr->isRunning()) + { + Poco::Thread::sleep(100); + } + thr->setRedistributeOption(_redistributeTasks); + } + } +} class ActiveThreadPoolSingletonHolder { diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp index 1bef49d0bb..e3643aedc0 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -16,6 +16,7 @@ #include "Poco/Exception.h" #include "Poco/Thread.h" #include "Poco/Environment.h" +#include using Poco::ActiveThreadPool; @@ -76,6 +77,71 @@ void ActiveThreadPoolTest::testActiveThreadPool() } +void ActiveThreadPoolTest::testActiveThreadLoadBalancing() +{ + Poco::AtomicCounter lttCount; + ptrdiff_t lttPerTIDCount = 0; + Poco::FastMutex mutex; + class LongTimeTask : public Poco::Runnable + { + Poco::AtomicCounter &_counter; + ptrdiff_t &_tidCounter; + Poco::FastMutex &_mutex; + public: + LongTimeTask(Poco::AtomicCounter &counter, ptrdiff_t &tidCounter, Poco::FastMutex &mutex) : + _counter(counter), + _tidCounter(tidCounter), + _mutex(mutex) + {} + void run() override + { + _counter++; + { + Poco::FastMutex::ScopedLock lock(_mutex); + if (_tidCounter >= 0) + { + _tidCounter -= (ptrdiff_t)Poco::Thread::currentTid(); + } + else + { + _tidCounter += (ptrdiff_t)Poco::Thread::currentTid(); + } + } + Poco::Thread::sleep(1 * 110); + } + }; + + Poco::AtomicCounter sttCount; + class ShortTimeTask : public Poco::Runnable + { + Poco::AtomicCounter &_counter; + public: + ShortTimeTask(Poco::AtomicCounter &counter) : _counter(counter) {} + void run() override + { + _counter++; + } + }; + + const int capacity = 2; + const int taskCount = 200; + const bool redistributeTasks = true; + Poco::ActiveThreadPool pool(capacity, POCO_THREAD_STACK_SIZE, redistributeTasks); + std::vector> lttVec(taskCount); + std::vector> sttVec(taskCount); + for (int i = 0; i < taskCount; i++) { + lttVec[i] = std::make_unique(lttCount, lttPerTIDCount, mutex); + pool.start(*(lttVec[i])); + sttVec[i] = std::make_unique(sttCount); + pool.start(*(sttVec[i])); + } + + pool.joinAll(); + assertEqual(taskCount, lttCount.value()); + assertEqual(taskCount, sttCount.value()); + assertTrue(lttPerTIDCount != 0); // without optimization all tasks runs on single thread and this counter equal to 0, othrewise - no +} + void ActiveThreadPoolTest::setUp() { _count = 0; @@ -98,6 +164,7 @@ CppUnit::Test* ActiveThreadPoolTest::suite() CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool); + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadLoadBalancing); return pSuite; } diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.h b/Foundation/testsuite/src/ActiveThreadPoolTest.h index 51df837355..44f33a751d 100644 --- a/Foundation/testsuite/src/ActiveThreadPoolTest.h +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.h @@ -27,6 +27,7 @@ class ActiveThreadPoolTest: public CppUnit::TestCase ~ActiveThreadPoolTest(); void testActiveThreadPool(); + void testActiveThreadLoadBalancing(); void setUp(); void tearDown();