vk_scheduler: split work queue waits and execution waits
This commit is contained in:
		| @@ -47,14 +47,15 @@ Scheduler::Scheduler(const Device& device_, StateTracker& state_tracker_) | ||||
| Scheduler::~Scheduler() = default; | ||||
|  | ||||
| void Scheduler::Flush(VkSemaphore signal_semaphore, VkSemaphore wait_semaphore) { | ||||
|     // When flushing, we only send data to the worker thread; no waiting is necessary. | ||||
|     SubmitExecution(signal_semaphore, wait_semaphore); | ||||
|     AllocateNewContext(); | ||||
| } | ||||
|  | ||||
| void Scheduler::Finish(VkSemaphore signal_semaphore, VkSemaphore wait_semaphore) { | ||||
|     // When finishing, we need to wait for the submission to have executed on the device. | ||||
|     const u64 presubmit_tick = CurrentTick(); | ||||
|     SubmitExecution(signal_semaphore, wait_semaphore); | ||||
|     WaitWorker(); | ||||
|     Wait(presubmit_tick); | ||||
|     AllocateNewContext(); | ||||
| } | ||||
| @@ -63,8 +64,13 @@ void Scheduler::WaitWorker() { | ||||
|     MICROPROFILE_SCOPE(Vulkan_WaitForWorker); | ||||
|     DispatchWork(); | ||||
|  | ||||
|     std::unique_lock lock{work_mutex}; | ||||
|     wait_cv.wait(lock, [this] { return work_queue.empty(); }); | ||||
|     // Ensure the queue is drained. | ||||
|     std::unique_lock ql{queue_mutex}; | ||||
|     event_cv.wait(ql, [this] { return work_queue.empty(); }); | ||||
|  | ||||
|     // Now wait for execution to finish. | ||||
|     // This needs to be done in the same order as WorkerThread. | ||||
|     std::unique_lock el{execution_mutex}; | ||||
| } | ||||
|  | ||||
| void Scheduler::DispatchWork() { | ||||
| @@ -72,10 +78,10 @@ void Scheduler::DispatchWork() { | ||||
|         return; | ||||
|     } | ||||
|     { | ||||
|         std::scoped_lock lock{work_mutex}; | ||||
|         std::scoped_lock ql{queue_mutex}; | ||||
|         work_queue.push(std::move(chunk)); | ||||
|     } | ||||
|     work_cv.notify_one(); | ||||
|     event_cv.notify_all(); | ||||
|     AcquireNewChunk(); | ||||
| } | ||||
|  | ||||
| @@ -137,30 +143,55 @@ bool Scheduler::UpdateRescaling(bool is_rescaling) { | ||||
|  | ||||
| void Scheduler::WorkerThread(std::stop_token stop_token) { | ||||
|     Common::SetCurrentThreadName("VulkanWorker"); | ||||
|     do { | ||||
|         std::unique_ptr<CommandChunk> work; | ||||
|         bool has_submit{false}; | ||||
|         { | ||||
|             std::unique_lock lock{work_mutex}; | ||||
|  | ||||
|     const auto TryPopQueue{[this](auto& work) -> bool { | ||||
|         if (work_queue.empty()) { | ||||
|                 wait_cv.notify_all(); | ||||
|             } | ||||
|             Common::CondvarWait(work_cv, lock, stop_token, [&] { return !work_queue.empty(); }); | ||||
|             if (stop_token.stop_requested()) { | ||||
|                 continue; | ||||
|             return false; | ||||
|         } | ||||
|  | ||||
|         work = std::move(work_queue.front()); | ||||
|         work_queue.pop(); | ||||
|         event_cv.notify_all(); | ||||
|         return true; | ||||
|     }}; | ||||
|  | ||||
|             has_submit = work->HasSubmit(); | ||||
|             work->ExecuteAll(current_cmdbuf); | ||||
|     while (!stop_token.stop_requested()) { | ||||
|         std::unique_ptr<CommandChunk> work; | ||||
|  | ||||
|         { | ||||
|             std::unique_lock lk{queue_mutex}; | ||||
|  | ||||
|             // Wait for work. | ||||
|             Common::CondvarWait(event_cv, lk, stop_token, [&] { return TryPopQueue(work); }); | ||||
|  | ||||
|             // If we've been asked to stop, we're done. | ||||
|             if (stop_token.stop_requested()) { | ||||
|                 return; | ||||
|             } | ||||
|  | ||||
|             // Exchange lock ownership so that we take the execution lock before | ||||
|             // the queue lock goes out of scope. This allows us to force execution | ||||
|             // to complete in the next step. | ||||
|             std::exchange(lk, std::unique_lock{execution_mutex}); | ||||
|  | ||||
|             // Perform the work, tracking whether the chunk was a submission | ||||
|             // before executing. | ||||
|             const bool has_submit = work->HasSubmit(); | ||||
|             work->ExecuteAll(current_cmdbuf); | ||||
|  | ||||
|             // If the chunk was a submission, reallocate the command buffer. | ||||
|             if (has_submit) { | ||||
|                 AllocateWorkerCommandBuffer(); | ||||
|             } | ||||
|         std::scoped_lock reserve_lock{reserve_mutex}; | ||||
|         chunk_reserve.push_back(std::move(work)); | ||||
|     } while (!stop_token.stop_requested()); | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             std::scoped_lock rl{reserve_mutex}; | ||||
|  | ||||
|             // Recycle the chunk back to the reserve. | ||||
|             chunk_reserve.emplace_back(std::move(work)); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| void Scheduler::AllocateWorkerCommandBuffer() { | ||||
| @@ -289,13 +320,16 @@ void Scheduler::EndRenderPass() { | ||||
| } | ||||
|  | ||||
| void Scheduler::AcquireNewChunk() { | ||||
|     std::scoped_lock lock{reserve_mutex}; | ||||
|     std::scoped_lock rl{reserve_mutex}; | ||||
|  | ||||
|     if (chunk_reserve.empty()) { | ||||
|         // If we don't have anything reserved, we need to make a new chunk. | ||||
|         chunk = std::make_unique<CommandChunk>(); | ||||
|     } else { | ||||
|         // Otherwise, we can just take from the reserve. | ||||
|         chunk = std::make_unique<CommandChunk>(); | ||||
|         return; | ||||
|     } | ||||
|     chunk = std::move(chunk_reserve.back()); | ||||
|         chunk_reserve.pop_back(); | ||||
|     } | ||||
| } | ||||
|  | ||||
| } // namespace Vulkan | ||||
|   | ||||
| @@ -232,10 +232,10 @@ private: | ||||
|  | ||||
|     std::queue<std::unique_ptr<CommandChunk>> work_queue; | ||||
|     std::vector<std::unique_ptr<CommandChunk>> chunk_reserve; | ||||
|     std::mutex execution_mutex; | ||||
|     std::mutex reserve_mutex; | ||||
|     std::mutex work_mutex; | ||||
|     std::condition_variable_any work_cv; | ||||
|     std::condition_variable wait_cv; | ||||
|     std::mutex queue_mutex; | ||||
|     std::condition_variable_any event_cv; | ||||
|     std::jthread worker_thread; | ||||
| }; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Liam
					Liam