From 15d573194c95b95ccf4a5480d8e40a7765a00929 Mon Sep 17 00:00:00 2001 From: Morph <39850852+Morph1984@users.noreply.github.com> Date: Sun, 19 Mar 2023 04:01:47 -0400 Subject: bounded_threadsafe_queue: Add TryPush --- src/common/bounded_threadsafe_queue.h | 71 +++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/src/common/bounded_threadsafe_queue.h b/src/common/bounded_threadsafe_queue.h index e03427539..eb88cc1d1 100644 --- a/src/common/bounded_threadsafe_queue.h +++ b/src/common/bounded_threadsafe_queue.h @@ -22,6 +22,55 @@ class SPSCQueue { static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two."); public: + bool TryPush(T&& t) { + const size_t write_index = m_write_index.load(); + + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Push into the queue. + m_data[pos] = std::move(t); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + + template + bool TryPush(Args&&... args) { + const size_t write_index = m_write_index.load(); + + // Check if we have free slots to write to. + if ((write_index - m_read_index.load()) == Capacity) { + return false; + } + + // Determine the position to write to. + const size_t pos = write_index % Capacity; + + // Emplace into the queue. + std::construct_at(std::addressof(m_data[pos]), std::forward(args)...); + + // Increment the write index. + ++m_write_index; + + // Notify the consumer that we have pushed into the queue. + std::scoped_lock lock{cv_mutex}; + cv.notify_one(); + + return true; + } + void Push(T&& t) { const size_t write_index = m_write_index.load(); @@ -153,6 +202,17 @@ private: template class MPSCQueue { public: + bool TryPush(T&& t) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::move(t)); + } + + template + bool TryPush(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::forward(args)...); + } + void Push(T&& t) { std::scoped_lock lock{write_mutex}; spsc_queue.Push(std::move(t)); @@ -196,6 +256,17 @@ private: template class MPMCQueue { public: + bool TryPush(T&& t) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::move(t)); + } + + template + bool TryPush(Args&&... args) { + std::scoped_lock lock{write_mutex}; + return spsc_queue.TryPush(std::forward(args)...); + } + void Push(T&& t) { std::scoped_lock lock{write_mutex}; spsc_queue.Push(std::move(t)); -- cgit v1.2.3