diff --git a/src/openrct2/core/JobPool.cpp b/src/openrct2/core/JobPool.cpp deleted file mode 100644 index 4f319ea056..0000000000 --- a/src/openrct2/core/JobPool.cpp +++ /dev/null @@ -1,103 +0,0 @@ -#include "JobPool.hpp" - -JobPool::JobPool() - : _shouldStop(false), - _processing(0) -{ - for (size_t n = 0; n < std::thread::hardware_concurrency(); n++) - { - _threads.emplace_back(&JobPool::processQueue, this); - } -} - -JobPool::~JobPool() -{ - { - unique_lock lock(_mutex); - _shouldStop = true; - _condPending.notify_all(); - } - - for (auto&& th : _threads) - { - if(th.joinable()) - th.join(); - } -} - -void JobPool::addTask(std::function workFn, std::function completionFn) -{ - unique_lock lock(_mutex); - _pending.push_back(TaskData_t{workFn, completionFn}); - _condPending.notify_one(); -} - -void JobPool::addTask(std::function workFn) -{ - unique_lock lock(_mutex); - _pending.push_back(TaskData_t{ workFn, nullptr }); - _condPending.notify_one(); -} - -void JobPool::join() -{ - while (true) - { - unique_lock lock(_mutex); - _condComplete.wait(lock, [this]() - { - return (_pending.empty() && _processing == 0) || - (_completed.empty() == false); - }); - - if (_completed.empty() && - _pending.empty() && - _processing == 0) - { - break; - } - - auto taskData = _completed.front(); - _completed.pop_front(); - - lock.unlock(); - - taskData.completionFn(); - } -} - -void JobPool::processQueue() -{ - while (true) - { - unique_lock lock(_mutex); - _condPending.wait(lock, [this]() { - return _shouldStop || !_pending.empty(); - }); - if (!_pending.empty()) - { - _processing++; - - auto taskData = _pending.front(); - _pending.pop_front(); - - lock.unlock(); - - taskData.workFn(); - - lock.lock(); - - if (taskData.completionFn) - { - _completed.push_back(taskData); - } - - _processing--; - _condComplete.notify_one(); - } - if(_shouldStop) - break; - } -} - - diff --git a/src/openrct2/core/JobPool.hpp b/src/openrct2/core/JobPool.hpp index 684e060986..bf6f47d723 100644 --- a/src/openrct2/core/JobPool.hpp +++ b/src/openrct2/core/JobPool.hpp @@ -14,6 +14,8 @@ *****************************************************************************/ #pragma endregion +#pragma once + #include #include #include @@ -31,8 +33,8 @@ private: const std::function completionFn; }; - std::atomic_bool _shouldStop; - std::atomic _processing; + std::atomic_bool _shouldStop = false; + std::atomic _processing = 0; std::vector _threads; std::deque _pending; std::deque _completed; @@ -43,16 +45,108 @@ private: typedef std::unique_lock unique_lock; public: - JobPool(); - ~JobPool(); + JobPool() + { + for (size_t n = 0; n < std::thread::hardware_concurrency(); n++) + { + _threads.emplace_back(&JobPool::processQueue, this); + } + } + + ~JobPool() + { + { + unique_lock lock(_mutex); + _shouldStop = true; + _condPending.notify_all(); + } + + for (auto&& th : _threads) + { + if (th.joinable()) + th.join(); + } + } void addTask(std::function workFn, - std::function completionFn); + std::function completionFn) + { + { + unique_lock lock(_mutex); + _pending.push_back(TaskData_t{ workFn, completionFn }); + _condPending.notify_one(); + } + } - void addTask(std::function workFn); + void addTask(std::function workFn) + { + return addTask(workFn, nullptr); + } - void join(); + void join() + { + while (true) + { + unique_lock lock(_mutex); + _condComplete.wait(lock, [this]() + { + return (_pending.empty() && _processing == 0) || + (_completed.empty() == false); + }); + + if (_completed.empty() && + _pending.empty() && + _processing == 0) + { + break; + } + + if (!_completed.empty()) + { + auto taskData = _completed.front(); + _completed.pop_front(); + + lock.unlock(); + + taskData.completionFn(); + + lock.lock(); + } + } + } private: - void processQueue(); + void processQueue() + { + while (true) + { + unique_lock lock(_mutex); + _condPending.wait(lock, [this]() { + return _shouldStop || !_pending.empty(); + }); + if (!_pending.empty()) + { + _processing++; + + auto taskData = _pending.front(); + _pending.pop_front(); + + lock.unlock(); + + taskData.workFn(); + + lock.lock(); + + if (taskData.completionFn) + { + _completed.push_back(taskData); + } + + _processing--; + _condComplete.notify_one(); + } + if (_shouldStop) + break; + } + } };