29#include <seqan3/utility/parallel/detail/spin_delay.hpp>
31namespace seqan3::contrib
// Scoped enumeration backed by uint8_t. The enumerators used elsewhere in this listing are
// success, closed, empty and full; the enumerator list itself is not shown here.
35enum class queue_op_status : uint8_t
// Scoped enumeration backed by uint8_t (`enum struct` is equivalent to `enum class`).
// This listing references the enumerators dynamic and fixed.
43enum struct buffer_queue_policy : uint8_t
// End of the class template's parameter list: the buffer policy defaults to `dynamic`.
87 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
// Member type aliases. value_type and size_type are forwarded from the underlying buffer type.
91 using buffer_type = buffer_t;
92 using value_type =
typename buffer_type::value_type;
93 using size_type =
typename buffer_type::size_type;
// reference and const_reference are both declared as void.
94 using reference = void;
95 using const_reference = void;
// Default constructor: delegates to the size_type constructor with an initial capacity of 0.
98 buffer_queue() : buffer_queue{0u}
// The queue is neither copyable nor movable: both constructors and both assignment
// operators are deleted.
100 buffer_queue(buffer_queue
const &) =
delete;
101 buffer_queue(buffer_queue &&) =
delete;
102 buffer_queue & operator=(buffer_queue
const &) =
delete;
103 buffer_queue & operator=(buffer_queue &&) =
delete;
// The destructor is compiler-generated.
104 ~buffer_queue() =
default;
// Constructs a queue for the requested initial capacity. `explicit` prevents implicit
// conversion from an integer to a queue.
107 explicit buffer_queue(size_type
const init_capacity)
// The buffer is sized one slot larger than the requested capacity.
// NOTE(review): any further initialisation in this constructor is not part of this listing.
109 buffer.resize(init_capacity + 1);
// Constructs a queue with the given capacity (via the size_type constructor) and then
// copies the elements of the input range `r` into the buffer.
113 template <std::ranges::input_range range_type>
115 buffer_queue(size_type
const init_capacity, range_type && r) : buffer_queue{init_capacity}
// Elements are written starting at the first buffer slot.
// NOTE(review): no bounds check is visible in this listing — confirm that callers
// guarantee `r` fits into the buffer.
117 std::ranges::copy(r, std::ranges::begin(buffer));
// Pushes `value` into the queue; returns nothing.
// Throws the enumerator queue_op_status::closed (an enum value, not a std::exception
// subclass) when the obtained status is `closed`.
// NOTE(review): the statement that produces `status` and any surrounding retry loop are
// not part of this listing.
123 template <
typename value2_t>
125 void push(value2_t && value)
// Back-off helper; presumably used between attempts — its use is not visible here.
127 detail::spin_delay delay{};
132 if (status == queue_op_status::closed)
133 throw queue_op_status::closed;
134 else if (status == queue_op_status::success)
// Past this point the status is expected to be `full`: `empty` would be a logic error
// for a push operation.
137 assert(status != queue_op_status::empty);
138 assert(status == queue_op_status::full);
// Status-returning counterpart of push(): the result is reported as a queue_op_status
// return value.
// NOTE(review): the statement that produces `status` is not part of this listing.
143 template <
typename value2_t>
145 queue_op_status wait_push(value2_t && value)
147 detail::spin_delay delay{};
// Every status other than `full` takes this branch (its body is not shown here).
153 if (status != queue_op_status::full)
// Only `full` can reach these checks; `empty` would be a logic error for a push.
156 assert(status != queue_op_status::empty);
157 assert(status == queue_op_status::full);
// Pops an element and returns it by value.
// Throws the enumerator queue_op_status::closed (an enum value, not a std::exception
// subclass) when try_pop reports `closed`.
// NOTE(review): the declaration of `value` and any surrounding retry loop are not part
// of this listing.
162 value_type value_pop()
164 detail::spin_delay delay{};
// A pop is only attempted while the writer_waiting flag is not set.
169 if (!writer_waiting.load())
171 auto status = try_pop(value);
173 if (status == queue_op_status::closed)
174 throw queue_op_status::closed;
175 else if (status == queue_op_status::success)
// Past this point the status is expected to be `empty`: `full` would be a logic error
// for a pop operation.
178 assert(status != queue_op_status::full);
179 assert(status == queue_op_status::empty);
// Status-returning counterpart of value_pop(): the popped element is delivered through
// the `value` out-parameter and the outcome as a queue_op_status.
// NOTE(review): the statement that produces `status` is not part of this listing.
185 queue_op_status wait_pop(value_type & value)
187 detail::spin_delay delay{};
// Work is only attempted while the writer_waiting flag is not set.
192 if (!writer_waiting.load())
// `closed` and `success` share one branch (its body is not shown here).
196 if (status == queue_op_status::closed || status == queue_op_status::success)
// Only `empty` can reach these checks; `full` would be a logic error for a pop.
199 assert(status != queue_op_status::full);
200 assert(status == queue_op_status::empty);
// Declarations only: both members are defined out of class further down in this listing.
// try_push takes the value as a forwarding reference and reports the outcome as a
// queue_op_status.
211 template <
typename value2_t>
213 queue_op_status try_push(value2_t &&);
// try_pop writes the popped element through its reference parameter and reports the
// outcome as a queue_op_status.
215 queue_op_status try_pop(value_t &);
// NOTE(review): the header of the enclosing member function is not part of this listing.
// Sets writer_waiting to true and branches on the value it held before (assuming
// std::atomic<bool>::exchange semantics — confirm against the member declaration).
223 if (writer_waiting.exchange(
true))
// The flag is reset to false at two separate points (source lines 230 and 234);
// the control flow between them is not shown here.
230 writer_waiting.store(
false);
234 writer_waiting.store(
false);
// Const, noexcept query of the closed state; its body is not part of this listing.
239 bool is_closed() const noexcept
// NOTE(review): source lines 240-246 are missing, so this return most likely belongs to a
// separate member (an emptiness check) rather than to is_closed — confirm against the full file.
// It reports whether the pop-front and push-back positions coincide.
247 return pop_front_position == push_back_position;
// Reports whether the queue is full, i.e. whether the ring buffer is exhausted for the
// current pop-front and push-back positions.
250 bool is_full() const noexcept
253 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
// Returns the distance from the pop-front to the push-back position, measured in
// buffer slots.
256 size_type
size() const noexcept
// Front slot not behind the back slot: the size is the plain difference.
259 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
261 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
// Otherwise the back slot has wrapped around: subtract the gap between the two slots
// from the buffer size. The assert guards the unsigned subtraction against underflow.
265 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
266 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
// Returns true when position `to` is at least ring_buffer_capacity ahead of position `from`.
281 constexpr bool is_ring_buffer_exhausted(size_type
const from, size_type
const to)
const
// Precondition: `to` is at most one position beyond a full ring starting at `from`.
283 assert(to <= (from + ring_buffer_capacity + 1));
285 return to >= from + ring_buffer_capacity;
// Maps a position counter to an index by masking it with ring_buffer_capacity - 1.
// NOTE(review): the mask equals `position % ring_buffer_capacity` only if
// ring_buffer_capacity is a power of two — confirm where the capacity is assigned.
301 constexpr size_type to_buffer_position(size_type
const position)
const
303 return position & (ring_buffer_capacity - 1);
// Advances `position` (taken by value) by one.
// NOTE(review): the return statement is not part of this listing.
325 size_type cyclic_increment(size_type position)
// If the incremented position maps to an index at or beyond the end of the buffer,
// skip ahead by the difference between the ring capacity and the buffer size.
332 if (to_buffer_position(++position) >= buffer.size())
333 position += ring_buffer_capacity - buffer.size();
// Two overflow overloads with the same visible signature.
// NOTE(review): whatever distinguishes them (for example a constraint on the buffer
// policy) is not part of this listing.
// First overload: the parameter is unnamed, so the argument is not used by name; its
// body is not shown here.
337 template <
typename value2_t>
339 bool overflow(value2_t &&)
// Second overload: declaration only; an out-of-class definition of overflow appears
// further down in this listing.
344 template <
typename value2_t>
346 bool overflow(value2_t && value);
// Alias for a buffer_queue with the `fixed` buffer policy; the buffer type defaults to
// std::vector<value_t>.
361template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
362using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
// Alias for a buffer_queue with the `dynamic` buffer policy; the buffer type defaults to
// std::vector<value_t>.
365template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
366using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
// Out-of-class definition of overflow: grows the buffer by one slot and returns whether
// `value` was stored in the process.
// NOTE(review): several lines (braces, conditions, the store of `value`, any locking)
// are not part of this listing; the comments below cover only what is visible.
376template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
377template <
typename value2_t>
379inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
// Snapshot the current buffer size, ring capacity and both committed positions.
// The local ring_buffer_capacity shadows the member of the same name.
384 size_type old_size = buffer.size();
385 size_type ring_buffer_capacity = this->ring_buffer_capacity;
386 size_type local_front = this->pop_front_position;
387 size_type local_back = this->push_back_position;
// The committed positions must equal the pending ones, i.e. no push or pop may be in
// flight while the buffer is restructured.
390 assert(local_back == this->pending_push_back_position);
391 assert(local_front == this->pending_pop_front_position);
393 bool valueWasAppended =
false;
// Proceed only if one more push would exhaust the ring buffer.
397 if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
// Iterator to the slot at the current back position.
404 auto it = std::ranges::begin(buffer) + to_buffer_position(local_back);
// The back position is moved a full ring capacity ahead of the front.
406 local_back = local_front + ring_buffer_capacity;
407 valueWasAppended =
true;
410 assert(is_ring_buffer_exhausted(local_front, local_back));
// Translate both positions into buffer indices before the buffer changes size.
413 size_type front_buffer_position = to_buffer_position(local_front);
414 size_type back_buffer_position = to_buffer_position(local_back);
// Grow by one slot, then move the elements from the front index up to the old end
// so that they finish at the new end of the buffer (a shift right by one).
417 buffer.resize(old_size + 1);
419 std::ranges::move_backward(
std::span{buffer.data() + front_buffer_position, buffer.data() + old_size},
420 buffer.data() + buffer.size());
// Publish the updated committed and pending positions and the ring capacity.
425 this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
426 this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
428 this->ring_buffer_capacity = ring_buffer_capacity;
430 return valueWasAppended;
// Out-of-class definition of try_pop: attempts to move the front element into `result`.
// The visible return values are `closed`/`empty` when no element is available and
// `success` after an element was taken.
// NOTE(review): loop headers, braces and any locking are not part of this listing.
453template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
454inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
459 size_type local_pending_pop_front_position{};
460 size_type next_local_pop_front_position{};
461 detail::spin_delay spinDelay{};
// Read the pending pop position and the committed push position.
463 local_pending_pop_front_position = this->pending_pop_front_position;
467 size_type local_push_back_position = this->push_back_position;
469 assert(local_pending_pop_front_position <= local_push_back_position);
// Equal positions mean there is nothing to pop: report `closed` if the queue is closed,
// otherwise `empty`.
472 if (local_pending_pop_front_position == local_push_back_position)
474 return is_closed() ? queue_op_status::closed : queue_op_status::empty;
478 next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
// Claim the slot by advancing the pending pop position with a compare-and-exchange.
481 if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
482 next_local_pop_front_position))
// Move the element out of the claimed slot into `result`.
489 result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));
// Commit: advance the committed pop position from the claimed slot to the next one,
// retrying until the exchange succeeds.
493 detail::spin_delay delay{};
494 size_type acquired_slot = local_pending_pop_front_position;
495 while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
// Restore the expected value before retrying (assuming std::atomic semantics, a failed
// compare_exchange_weak overwrites it with the observed value).
497 acquired_slot = local_pending_pop_front_position;
502 return queue_op_status::success;
// Out-of-class definition of try_push: attempts to store `value` at the back of the queue.
// The visible return values are `closed`, `success` (at two points) and `full`.
// NOTE(review): loop headers, braces, the store of `value`, the condition guarding the
// `closed` return and any locking are not part of this listing.
528template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
529template <
typename value2_t>
531inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
535 detail::spin_delay delay{};
540 return queue_op_status::closed;
// Read the pending push position, compute its successor and read the committed pop position.
543 size_type local_pending_push_back_position = this->pending_push_back_position;
548 size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
549 size_type local_pop_front_position = this->pop_front_position;
// Check whether taking the next slot would exhaust the ring buffer (branch body not shown).
553 if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
// Claim the slot by advancing the pending push position with a compare-and-exchange.
558 if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
559 next_local_push_back_position))
// Iterator to the claimed slot.
563 auto it = std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
// Commit: advance the committed push position from the claimed slot to the next one.
// This `delay` is a second declaration in a nested scope.
569 detail::spin_delay delay{};
571 size_type acquired_slot = local_pending_push_back_position;
573 !this->push_back_position.compare_exchange_weak(acquired_slot, next_local_push_back_position))
// Restore the expected value before retrying (assuming std::atomic semantics, a failed
// compare_exchange_weak overwrites it with the observed value).
575 acquired_slot = local_pending_push_back_position;
579 return queue_op_status::success;
// NOTE(review): a second success return; the path leading to it is not shown here.
589 return queue_op_status::success;
593 return queue_op_status::full;
Provides various transformation traits used by the range module.
T current_exception(T... args)
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition new:54
seqan::stl::ranges::to to
Converts a range to a container. Note: this entity is not part of the SeqAn API...
Definition to.hpp:23
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:143
A more refined container concept than seqan3::container.
The <new> header from C++17's standard library.
T rethrow_exception(T... args)
Adaptations of concepts from the standard library.