diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 7e83e64b0..08cff3450 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -75,6 +75,7 @@ set(HEADERS synchronized_wrapper.h telemetry.h thread.h + thread_pool.h thread_queue_list.h timer.h vector_math.h diff --git a/src/common/thread_pool.h b/src/common/thread_pool.h new file mode 100644 index 000000000..eb723db17 --- /dev/null +++ b/src/common/thread_pool.h @@ -0,0 +1,121 @@ +// Copyright 2016 Citra Emulator Project / PPSSPP Project +// Licensed under GPLv2 or any later version +// Refer to the license.txt file included. + +#include +#include +#include +#include +#include +#include +#include + +#include "common/assert.h" + +namespace Common { + +class ThreadPool { +private: + explicit ThreadPool(unsigned int num_threads) : num_threads(num_threads), workers(num_threads) { + ASSERT(num_threads); + } + +public: + static ThreadPool& GetPool() { + static ThreadPool thread_pool(std::thread::hardware_concurrency()); + return thread_pool; + } + + template + auto push(F&& f, Args&&... args) { + auto ret = workers[next_worker].push(std::forward(f), std::forward(args)...); + next_worker = (next_worker + 1) % num_threads; + return ret; + } + + unsigned int total_threads() { + return num_threads; + } + +private: + template + class ThreadsafeQueue { + private: + const size_t capacity; + std::vector queue_storage; + std::mutex mutex; + std::condition_variable queue_changed; + + public: + explicit ThreadsafeQueue(const size_t capacity) : capacity(capacity) { + queue_storage.reserve(capacity); + } + + void push(const T& element) { + std::unique_lock lock(mutex); + while (queue_storage.size() >= capacity) { + queue_changed.wait(lock); + } + queue_storage.push_back(element); + queue_changed.notify_one(); + } + + T pop() { + std::unique_lock lock(mutex); + while (queue_storage.empty()) { + queue_changed.wait(lock); + } + T element(std::move(queue_storage.back())); + queue_storage.pop_back(); + queue_changed.notify_one(); + return element; + } + + void push(T&& element) { + std::unique_lock lock(mutex); + while (queue_storage.size() >= capacity) { + queue_changed.wait(lock); + } + queue_storage.emplace_back(element); + queue_changed.notify_one(); + } + }; + + class Worker { + private: + ThreadsafeQueue> queue; + std::thread thread; + static constexpr size_t MAX_QUEUE_CAPACITY = 100; + + public: + Worker() : queue(MAX_QUEUE_CAPACITY), thread([this] { loop(); }) {} + + ~Worker() { + queue.push(nullptr); // Exit the loop + thread.join(); + } + + void loop() { + for (;;) { + std::function fn(queue.pop()); + if (!fn) // a nullptr function is the signal to exit the loop + break; + fn(); + } + } + + template + auto push(F&& f, Args&&... args) { + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...)); + queue.push([task]() { (*task)(); }); + return task->get_future(); + } + }; + + const unsigned int num_threads; + int next_worker = 0; + std::vector workers; +}; + +} // namespace Common