diff options
-rw-r--r-- | src/common/bounded_threadsafe_queue.h | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index 975215863..0fb2f42d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -45,12 +45,12 @@ public: } void PopWait(T& t, std::stop_token stop_token) { - Wait(stop_token); + ConsumerWait(stop_token); Pop(t); } T PopWait(std::stop_token stop_token) { - Wait(stop_token); + ConsumerWait(stop_token); T t; Pop(t); return t; @@ -88,9 +88,10 @@ private: } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } + std::unique_lock lock{producer_cv_mutex}; + producer_cv.wait(lock, [this, write_index] { + return (write_index - m_read_index.load()) < Capacity; + }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); } @@ -105,8 +106,8 @@ private: ++m_write_index; // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + std::scoped_lock lock{consumer_cv_mutex}; + consumer_cv.notify_one(); return true; } @@ -122,9 +123,10 @@ private: } } else if constexpr (Mode == PushMode::Wait) { // Wait until we have free slots to write to. - while ((write_index - m_read_index.load()) == Capacity) { - std::this_thread::yield(); - } + std::unique_lock lock{producer_cv_mutex}; + producer_cv.wait(lock, [this, write_index] { + return (write_index - m_read_index.load()) < Capacity; + }); } else { static_assert(Mode < PushMode::Count, "Invalid PushMode."); } @@ -139,8 +141,8 @@ private: ++m_write_index; // Notify the consumer that we have pushed into the queue. - std::scoped_lock lock{cv_mutex}; - cv.notify_one(); + std::scoped_lock lock{consumer_cv_mutex}; + consumer_cv.notify_one(); return true; } @@ -161,6 +163,10 @@ private: // Increment the read index. ++m_read_index; + + // Notify the producer that we have popped off the queue. + std::unique_lock lock{producer_cv_mutex}; + producer_cv.notify_one(); } bool Pop(T& t) { @@ -180,12 +186,16 @@ private: // Increment the read index. ++m_read_index; + // Notify the producer that we have popped off the queue. + std::scoped_lock lock{producer_cv_mutex}; + producer_cv.notify_one(); + return true; } - void Wait(std::stop_token stop_token) { - std::unique_lock lock{cv_mutex}; - Common::CondvarWait(cv, lock, stop_token, [this] { return !Empty(); }); + void ConsumerWait(std::stop_token stop_token) { + std::unique_lock lock{consumer_cv_mutex}; + Common::CondvarWait(consumer_cv, lock, stop_token, [this] { return !Empty(); }); } alignas(128) std::atomic_size_t m_read_index{0}; @@ -193,8 +203,10 @@ private: std::array<T, Capacity> m_data; - std::condition_variable_any cv; - std::mutex cv_mutex; + std::condition_variable_any producer_cv; + std::mutex producer_cv_mutex; + std::condition_variable_any consumer_cv; + std::mutex consumer_cv_mutex; }; template <typename T, size_t Capacity = detail::DefaultCapacity> |