Skip to content

Commit 8f4bb9d

Browse files
committed
2 parents 8f6c93c + 1967a1d commit 8f4bb9d

File tree

1 file changed

+69
-47
lines changed

1 file changed

+69
-47
lines changed

include/LockFreeSpscQueue.h

Lines changed: 69 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -172,14 +172,6 @@ class LockFreeSpscQueue
172172
}
173173
}
174174

175-
/**
176-
* @brief Releases ownership of the transaction, preventing the destructor
177-
* from automatically committing the write.
178-
* @details This is an advanced feature used when another object, like a
179-
* WriteTransaction, needs to take over responsibility for the commit.
180-
*/
181-
void release() noexcept { m_owner_queue = nullptr; }
182-
183175
// This RAII object is move-only to ensure single ownership of a transaction.
184176
WriteScope(const WriteScope&) = delete;
185177
WriteScope& operator=(const WriteScope&) = delete;
@@ -551,38 +543,13 @@ class LockFreeSpscQueue
551543
*/
552544
[[nodiscard]] WriteScope prepare_write(size_t num_items_to_write)
553545
{
554-
// "Fast path" calculation using the producer's local cache of the read
555-
// pointer (which involves no cross-core communication) and `write_pos`.
556-
// Relaxed load is safe for the `write_pos` index as this is the only
557-
// thread that modifies it.
558-
const size_t current_write_pos
559-
= m_producer_data.write_pos.load(std::memory_order_relaxed);
560-
size_t available_space
561-
= m_capacity - (current_write_pos - m_producer_data.cached_read_pos);
562-
563-
// Note: The subtraction `current_write_pos - cached_read_pos`
564-
// calculates the number of items currently in the queue, even when the
565-
// 64-bit indices wrap around, due to the defined behavior of unsigned
566-
// integer arithmetic.
567-
568-
if (available_space < num_items_to_write) {
569-
// "Slow path": our cache is out of date.
570-
// Perform an expensive acquire load to get the true position from the consumer.
571-
m_producer_data.cached_read_pos
572-
= m_consumer_data.read_pos.load(std::memory_order_acquire);
573-
// Recalculate available space with the updated value.
574-
available_space
575-
= m_capacity - (current_write_pos - m_producer_data.cached_read_pos);
546+
auto [start_index, block_size1, start_index2, block_size2]
547+
= get_write_reservation(num_items_to_write);
548+
if (block_size1 + block_size2 == 0) {
549+
return { 0, 0, 0, 0, nullptr };
576550
}
577551

578-
const size_t items_to_write = std::min(num_items_to_write, available_space);
579-
if (items_to_write == 0) { return {0, 0, 0, 0, nullptr}; }
580-
581-
const size_t start_index = current_write_pos & m_capacity_mask;
582-
const size_t space_to_end = m_capacity - start_index;
583-
const size_t block_size1 = std::min(items_to_write, space_to_end);
584-
const size_t block_size2 = items_to_write - block_size1;
585-
return {start_index, block_size1, 0, block_size2, this};
552+
return { start_index, block_size1, start_index2, block_size2, this };
586553
}
587554

588555
/**
@@ -632,19 +599,20 @@ class LockFreeSpscQueue
632599
*/
633600
[[nodiscard]] std::optional<WriteTransaction> try_start_write(size_t num_items)
634601
{
635-
auto scope = prepare_write(num_items);
636-
if (scope.get_items_written() == 0) {
602+
auto [start_index, block_size1, start_index2, block_size2]
603+
= get_write_reservation(num_items);
604+
605+
const size_t items_reserved = block_size1 + block_size2;
606+
if (items_reserved == 0) {
637607
return std::nullopt;
638608
}
639609

640-
// Create the transaction using the spans from the scope.
641-
auto transaction = WriteTransaction(this, scope.get_block1(), scope.get_block2());
642-
643-
// Release the scope so its destructor does nothing.
644-
// The WriteTransaction is now solely responsible for the final commit.
645-
scope.release();
610+
auto block1 = m_buffer.subspan(start_index, block_size1);
611+
auto block2 = (block_size2 > 0)
612+
? m_buffer.subspan(start_index2, block_size2)
613+
: std::span<T>{};
646614

647-
return transaction;
615+
return WriteTransaction(this, block1, block2);
648616
}
649617

650618
/** @brief Returns the total capacity of the queue (the size of the buffer). */
@@ -729,6 +697,60 @@ class LockFreeSpscQueue
729697
friend struct WriteScope;
730698
friend struct ReadScope;
731699

700+
/**
701+
* @brief Performs the core logic to calculate a reservation for a write operation.
702+
* @details This is a private helper that centralizes the write reservation logic,
703+
* which is shared by `prepare_write` and `try_start_write`. It implements
704+
* the "fast path/slow path" optimization by first checking against the
705+
* producer's cached `read_pos` and only performing an expensive `acquire`
706+
* load on the consumer's true `read_pos` when necessary.
707+
* @note This function is responsible only for the calculation of a reservation;
708+
* it does not perform the final "commit" that makes the space available. While
709+
* it may update the producer's internal `cached_read_pos` as a performance
710+
* optimization, it never modifies the queue's main `write_pos` index.
711+
* The actual commit (advancing `write_pos`) is the exclusive responsibility
712+
* of the RAII scope objects (`WriteScope`, `WriteTransaction`).
713+
* @return A tuple containing {start_index1, block_size1, start_index2, block_size2}.
714+
* If no space is available, all values in the tuple will be zero.
715+
*/
716+
[[nodiscard]] std::tuple<size_t, size_t, size_t, size_t>
717+
get_write_reservation(size_t num_items_to_write) noexcept
718+
{
719+
// "Fast path" calculation using the producer's local cache of the read
720+
// pointer (which involves no cross-core communication) and `write_pos`.
721+
// Relaxed load is safe for the `write_pos` index as this is the only
722+
// thread that modifies it.
723+
const size_t current_write_pos
724+
= m_producer_data.write_pos.load(std::memory_order_relaxed);
725+
size_t available_space
726+
= m_capacity - (current_write_pos - m_producer_data.cached_read_pos);
727+
728+
// Note: The subtraction `current_write_pos - cached_read_pos`
729+
// calculates the number of items currently in the queue, even when the
730+
// 64-bit indices wrap around, due to the defined behavior of unsigned
731+
// integer arithmetic.
732+
733+
if (available_space < num_items_to_write) {
734+
// "Slow path": our cache is out of date.
735+
// Perform an expensive acquire load to get the true position from the consumer.
736+
m_producer_data.cached_read_pos
737+
= m_consumer_data.read_pos.load(std::memory_order_acquire);
738+
available_space = m_capacity - (current_write_pos - m_producer_data.cached_read_pos);
739+
}
740+
741+
const size_t items_to_reserve = std::min(num_items_to_write, available_space);
742+
743+
if (items_to_reserve == 0) {
744+
return { 0, 0, 0, 0 };
745+
}
746+
747+
const size_t start_index = current_write_pos & m_capacity_mask;
748+
const size_t space_to_end = m_capacity - start_index;
749+
const size_t block_size1 = std::min(items_to_reserve, space_to_end);
750+
const size_t block_size2 = items_to_reserve - block_size1;
751+
return { start_index, block_size1, 0, block_size2 };
752+
}
753+
732754
void commit_write(size_t num_items_written) noexcept
733755
{
734756
// This function uses a load-then-store sequence, which is a deliberate

0 commit comments

Comments
 (0)