From 8d53e0926968fb002be07b1fcb8ef0629c395a61 Mon Sep 17 00:00:00 2001 From: KiritoDev <36680385+KiritoDv@users.noreply.github.com> Date: Mon, 21 Mar 2022 21:38:16 -0600 Subject: [PATCH] Thread pool prototype --- OTRGui/src/ctpl/ctpl_stl.h | 251 ++++++++++++++++++++++++ OTRGui/src/game/game.cpp | 4 +- OTRGui/src/impl/extractor/extractor.cpp | 9 +- 3 files changed, 260 insertions(+), 4 deletions(-) create mode 100644 OTRGui/src/ctpl/ctpl_stl.h diff --git a/OTRGui/src/ctpl/ctpl_stl.h b/OTRGui/src/ctpl/ctpl_stl.h new file mode 100644 index 000000000..8e2d9c908 --- /dev/null +++ b/OTRGui/src/ctpl/ctpl_stl.h @@ -0,0 +1,251 @@ +/********************************************************* +* +* Copyright (C) 2014 by Vitaliy Vitsentiy +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*********************************************************/ + + +#ifndef __ctpl_stl_thread_pool_H__ +#define __ctpl_stl_thread_pool_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +// thread pool to run user's functors with signature +// ret func(int id, other_params) +// where id is the index of the thread that runs the functor +// ret is some return type + + +namespace ctpl { + + namespace detail { + template + class Queue { + public: + bool push(T const & value) { + std::unique_lock lock(this->mutex); + this->q.push(value); + return true; + } + // deletes the retrieved element, do not use for non integral types + bool pop(T & v) { + std::unique_lock lock(this->mutex); + if (this->q.empty()) + return false; + v = this->q.front(); + this->q.pop(); + return true; + } + bool empty() { + std::unique_lock lock(this->mutex); + return this->q.empty(); + } + private: + std::queue q; + std::mutex mutex; + }; + } + + class thread_pool { + + public: + + thread_pool() { this->init(); } + thread_pool(int nThreads) { this->init(); this->resize(nThreads); } + + // the destructor waits for all the functions in the queue to be finished + ~thread_pool() { + this->stop(true); + } + + // get the number of running threads in the pool + int size() { return static_cast(this->threads.size()); } + + // number of idle threads + int n_idle() { return this->nWaiting; } + std::thread & get_thread(int i) { return *this->threads[i]; } + + // change the number of threads in the pool + // should be called from one thread, otherwise be careful to not interleave, also with this->stop() + // nThreads must be >= 0 + void resize(int nThreads) { + if (!this->isStop && !this->isDone) { + int oldNThreads = static_cast(this->threads.size()); + if (oldNThreads <= nThreads) { // if the number of threads is increased + this->threads.resize(nThreads); + this->flags.resize(nThreads); + + for (int i = oldNThreads; i < nThreads; ++i) { + this->flags[i] = std::make_shared>(false); + this->set_thread(i); + } + } + else { // the number of threads is decreased + for (int i = oldNThreads - 1; i >= nThreads; --i) { + *this->flags[i] = true; // this thread will finish + this->threads[i]->detach(); + } + { + // stop the detached threads that were waiting + std::unique_lock lock(this->mutex); + this->cv.notify_all(); + } + this->threads.resize(nThreads); // safe to delete because the threads are detached + this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals + } + } + } + + // empty the queue + void clear_queue() { + std::function * _f; + while (this->q.pop(_f)) + delete _f; // empty the queue + } + + // pops a functional wrapper to the original function + std::function pop() { + std::function * _f = nullptr; + this->q.pop(_f); + std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred + std::function f; + if (_f) + f = *_f; + return f; + } + + // wait for all computing threads to finish and stop all threads + // may be called asynchronously to not pause the calling thread while waiting + // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions + void stop(bool isWait = false) { + if (!isWait) { + if (this->isStop) + return; + this->isStop = true; + for (int i = 0, n = this->size(); i < n; ++i) { + *this->flags[i] = true; // command the threads to stop + } + this->clear_queue(); // empty the queue + } + else { + if (this->isDone || this->isStop) + return; + this->isDone = true; // give the waiting threads a command to finish + } + { + std::unique_lock lock(this->mutex); + this->cv.notify_all(); // stop all waiting threads + } + for (int i = 0; i < static_cast(this->threads.size()); ++i) { // wait for the computing threads to finish + if (this->threads[i]->joinable()) + this->threads[i]->join(); + } + // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads + // therefore delete them here + this->clear_queue(); + this->threads.clear(); + this->flags.clear(); + } + + template + auto push(F && f, Rest&&... rest) ->std::future { + auto pck = std::make_shared>( + std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...) + ); + auto _f = new std::function([pck](int id) { + (*pck)(id); + }); + this->q.push(_f); + std::unique_lock lock(this->mutex); + this->cv.notify_one(); + return pck->get_future(); + } + + // run the user's function that excepts argument int - id of the running thread. returned value is templatized + // operator returns std::future, where the user can get the result and rethrow the catched exceptins + template + auto push(F && f) ->std::future { + auto pck = std::make_shared>(std::forward(f)); + auto _f = new std::function([pck](int id) { + (*pck)(id); + }); + this->q.push(_f); + std::unique_lock lock(this->mutex); + this->cv.notify_one(); + return pck->get_future(); + } + + + private: + + // deleted + thread_pool(const thread_pool &);// = delete; + thread_pool(thread_pool &&);// = delete; + thread_pool & operator=(const thread_pool &);// = delete; + thread_pool & operator=(thread_pool &&);// = delete; + + void set_thread(int i) { + std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag + auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { + std::atomic & _flag = *flag; + std::function * _f; + bool isPop = this->q.pop(_f); + while (true) { + while (isPop) { // if there is anything in the queue + std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred + (*_f)(i); + if (_flag) + return; // the thread is wanted to stop, return even if the queue is not empty yet + else + isPop = this->q.pop(_f); + } + // the queue is empty here, wait for the next command + std::unique_lock lock(this->mutex); + ++this->nWaiting; + this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); + --this->nWaiting; + if (!isPop) + return; // if the queue is empty and this->isDone == true or *flag then return + } + }; + this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() + } + + void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } + + std::vector> threads; + std::vector>> flags; + detail::Queue *> q; + std::atomic isDone; + std::atomic isStop; + std::atomic nWaiting; // how many threads are waiting + + std::mutex mutex; + std::condition_variable cv; + }; + +} + +#endif // __ctpl_stl_thread_pool_H__ \ No newline at end of file diff --git a/OTRGui/src/game/game.cpp b/OTRGui/src/game/game.cpp index 65103f732..3fc6438a8 100644 --- a/OTRGui/src/game/game.cpp +++ b/OTRGui/src/game/game.cpp @@ -21,7 +21,7 @@ Vector2 dragOffset; std::string sohFolder = NULLSTR; bool extracting = false; bool rom_ready = false; -bool single_thread = true; +bool single_thread = false; bool hide_second_btn = false; RomVersion version; const char* patched_rom = "tmp/rom.z64"; @@ -127,7 +127,7 @@ void OTRGame::draw() { UIUtils::GuiShadowText(("Rom Type: " + version.version).c_str(), 32, text_y, 10, WHITE, BLACK); UIUtils::GuiShadowText("Tool Version: 1.0", 32, text_y + 15, 10, WHITE, BLACK); UIUtils::GuiShadowText("OTR Version: 1.0", 32, text_y + 30, 10, WHITE, BLACK); - // UIUtils::GuiToggle(&single_thread, "Single Thread", 32, text_y + 40, currentStep != NULLSTR); + UIUtils::GuiToggle(&single_thread, "Single Thread", 32, text_y + 40, currentStep != NULLSTR); if(!hide_second_btn && UIUtils::GuiIconButton("Folder", "Open\nShip Folder", 109, 50, currentStep != NULLSTR, "Select your Ship of Harkinian Folder\n\nYou could use another folder\nfor development purposes")) { const std::string path = NativeFS->LaunchFileExplorer(LaunchType::FOLDER); diff --git a/OTRGui/src/impl/extractor/extractor.cpp b/OTRGui/src/impl/extractor/extractor.cpp index ebe32eee9..ec435a9e4 100644 --- a/OTRGui/src/impl/extractor/extractor.cpp +++ b/OTRGui/src/impl/extractor/extractor.cpp @@ -3,6 +3,7 @@ #include "impl.h" #include "utils/mutils.h" +#include "ctpl/ctpl_stl.h" #include #ifdef _WIN32 @@ -68,6 +69,9 @@ void startWorker() { std::vector files; Util::dirscan(path, files); std::vector xmlFiles; + + const int num_threads = std::thread::hardware_concurrency(); + ctpl::thread_pool pool(num_threads / 2); for(auto &file : files) { if (file.find(".xml") != std::string::npos) xmlFiles.push_back(file); } @@ -76,8 +80,9 @@ void startWorker() { if(single_thread) { ExtractFunc(file); } else { - std::thread thread_object(ExtractFunc, file); - thread_object.detach(); + pool.push([file](int) { + ExtractFunc(file); + }); } }