static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);

#pragma warning(disable: 4307)
#pragma warning(disable: 4309)

static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
: producerListTail(nullptr),
initialBlockPoolIndex(0),
nextExplicitConsumerId(0),
globalExplicitConsumerOffset(0)
implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
populate_initial_implicit_producer_hash();
populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
explicitProducers.store(nullptr, std::memory_order_relaxed);
implicitProducers.store(nullptr, std::memory_order_relaxed);
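// The sized constructor below pre-allocates enough blocks for the requested
// minimum capacity plus per-producer headroom, so (barring more producers than
// declared) enqueue operations should not need to allocate further blocks.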
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: producerListTail(nullptr),
initialBlockPoolIndex(0),
nextExplicitConsumerId(0),
globalExplicitConsumerOffset(0)
implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
populate_initial_implicit_producer_hash();
size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
populate_initial_block_list(blocks);
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
explicitProducers.store(nullptr, std::memory_order_relaxed);
implicitProducers.store(nullptr, std::memory_order_relaxed);
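// Destructor teardown: walk the producer list and detach any outstanding
// producer tokens, destroy every implicit-producer hash table, then release
// the blocks still sitting on the free list and the initial block pool.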
auto ptr = producerListTail.load(std::memory_order_relaxed);
while (ptr != nullptr) {
auto next = ptr->next_prod();
if (ptr->token != nullptr) {
ptr->token->producer = nullptr;

auto hash = implicitProducerHash.load(std::memory_order_relaxed);
while (hash != nullptr) {
auto prev = hash->prev;
if (prev != nullptr) {
for (size_t i = 0; i != hash->capacity; ++i) {
hash->entries[i].~ImplicitProducerKVP();
hash->~ImplicitProducerHash();
(Traits::free)(hash);

auto block = freeList.head_unsafe();
while (block != nullptr) {
auto next = block->freeListNext.load(std::memory_order_relaxed);
if (block->dynamicallyAllocated) {
destroy_array(initialBlockPool, initialBlockPoolSize);

: producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
producerCount(other.producerCount.load(std::memory_order_relaxed)),
initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
initialBlockPool(other.initialBlockPool),
initialBlockPoolSize(other.initialBlockPoolSize),
freeList(std::move(other.freeList)),
nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
populate_initial_implicit_producer_hash();
swap_implicit_producer_hashes(other);
other.producerListTail.store(nullptr, std::memory_order_relaxed);
other.producerCount.store(0, std::memory_order_relaxed);
other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
other.explicitProducers.store(nullptr, std::memory_order_relaxed);
implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
other.implicitProducers.store(nullptr, std::memory_order_relaxed);
other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
other.initialBlockPoolSize = 0;
other.initialBlockPool = nullptr;
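// Move-assignment and swap both funnel into swap_internal(), which exchanges
// every member and then calls reown_producers() so each producer's parent
// pointer is re-bound to the queue that now owns it.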
return swap_internal(other);

swap_internal(other);

if (this == &other) {
details::swap_relaxed(producerListTail, other.producerListTail);
details::swap_relaxed(producerCount, other.producerCount);
details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
std::swap(initialBlockPool, other.initialBlockPool);
std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
freeList.swap(other.freeList);
details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
swap_implicit_producer_hashes(other);
other.reown_producers();
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
details::swap_relaxed(explicitProducers, other.explicitProducers);
details::swap_relaxed(implicitProducers, other.implicitProducers);

else return inner_enqueue<CanAlloc>(item);
else return inner_enqueue<CanAlloc>(std::move(item));
return inner_enqueue<CanAlloc>(token, item);
return inner_enqueue<CanAlloc>(token, std::move(item));

template<typename It>
else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);

template<typename It>
return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);

else return inner_enqueue<CannotAlloc>(item);
else return inner_enqueue<CannotAlloc>(std::move(item));
return inner_enqueue<CannotAlloc>(token, item);
return inner_enqueue<CannotAlloc>(token, std::move(item));

template<typename It>
else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);

template<typename It>
return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
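// Minimal usage sketch for the wrappers above (illustrative only; assumes the
// public moodycamel::ConcurrentQueue API):
//
//   moodycamel::ConcurrentQueue<int> q;
//   q.enqueue(1);                 // CanAlloc: may allocate a new block
//   bool ok = q.try_enqueue(2);   // CannotAlloc: fails instead of allocating
//   int v;
//   if (q.try_dequeue(v)) { /* consume v */ }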
template<typename U>
size_t nonEmptyCount = 0;
ProducerBase* best = nullptr;
size_t bestSize = 0;
for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
auto size = ptr->size_approx();
if (size > bestSize) {
if (nonEmptyCount > 0) {
if ((details::likely)(best->dequeue(item))) {
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
if (ptr != best && ptr->dequeue(item)) {

template<typename U>
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
if (ptr->dequeue(item)) {

template<typename U>
if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
if (!update_current_producer_after_rotation(token)) {
if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
auto tail = producerListTail.load(std::memory_order_acquire);
auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
if (ptr == nullptr) {
while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
if (ptr->dequeue(item)) {
token.currentProducer = ptr;
token.itemsConsumedFromCurrent = 1;
ptr = ptr->next_prod();
if (ptr == nullptr) {

template<typename It>
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
count += ptr->dequeue_bulk(itemFirst, max - count);

template<typename It>
if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
if (!update_current_producer_after_rotation(token)) {
size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
auto tail = producerListTail.load(std::memory_order_acquire);
auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
if (ptr == nullptr) {
while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
auto dequeued = ptr->dequeue_bulk(itemFirst, max);
if (dequeued != 0) {
token.currentProducer = ptr;
token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
if (dequeued == max) {
ptr = ptr->next_prod();
if (ptr == nullptr) {

template<typename U>
return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);

template<typename It>
return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);

for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
size += ptr->size_approx();

struct ExplicitProducer;
friend struct ExplicitProducer;
struct ImplicitProducer;
friend struct ImplicitProducer;

enum AllocationMode { CanAlloc, CannotAlloc };

template<AllocationMode canAlloc, typename U>

template<AllocationMode canAlloc, typename U>
inline bool inner_enqueue(U&& element)
auto producer = get_or_add_implicit_producer();
return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));

template<AllocationMode canAlloc, typename It>
inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)

template<AllocationMode canAlloc, typename It>
inline bool inner_enqueue_bulk(It itemFirst, size_t count)
auto producer = get_or_add_implicit_producer();
return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
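// CanAlloc/CannotAlloc controls whether a failed block acquisition may fall
// back to heap allocation. The token-less inner_enqueue overloads route the
// operation to the calling thread's implicit producer via
// get_or_add_implicit_producer().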
inline bool update_current_producer_after_rotation(consumer_token_t& token)
auto tail = producerListTail.load(std::memory_order_acquire);
if (token.desiredProducer == nullptr && tail == nullptr) {
auto prodCount = producerCount.load(std::memory_order_relaxed);
auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
if ((details::unlikely)(token.desiredProducer == nullptr)) {
std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
token.desiredProducer = tail;
for (std::uint32_t i = 0; i != offset; ++i) {
token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
if (token.desiredProducer == nullptr) {
token.desiredProducer = tail;
std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
if (delta >= prodCount) {
delta = delta % prodCount;
for (std::uint32_t i = 0; i != delta; ++i) {
token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
if (token.desiredProducer == nullptr) {
token.desiredProducer = tail;
token.lastKnownGlobalOffset = globalOffset;
token.currentProducer = token.desiredProducer;
token.itemsConsumedFromCurrent = 0;
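// Next: the lock-free free list used to recycle spent blocks. Each node
// carries a reference count plus a SHOULD_BE_ON_FREELIST flag so a node can be
// re-added while concurrent try_get() callers still hold transient references.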
template <typename N>
FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
std::atomic<std::uint32_t> freeListRefs;
std::atomic<N*> freeListNext;

template<typename N>
FreeList() : freeListHead(nullptr) { }
FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }

inline void add(N* node)
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
debug::DebugLock lock(mutex);
if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
add_knowing_refcount_is_zero(node);

#ifdef MCDBGQ_NOLOCKFREE_FREELIST
debug::DebugLock lock(mutex);
auto head = freeListHead.load(std::memory_order_acquire);
while (head != nullptr) {
auto prevHead = head;
auto refs = head->freeListRefs.load(std::memory_order_relaxed);
if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
head = freeListHead.load(std::memory_order_acquire);
auto next = head->freeListNext.load(std::memory_order_relaxed);
if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
head->freeListRefs.fetch_sub(2, std::memory_order_release);
refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
if (refs == SHOULD_BE_ON_FREELIST + 1) {
add_knowing_refcount_is_zero(prevHead);

N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }

inline void add_knowing_refcount_is_zero(N* node)
auto head = freeListHead.load(std::memory_order_relaxed);
node->freeListNext.store(head, std::memory_order_relaxed);
node->freeListRefs.store(1, std::memory_order_release);
if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {

std::atomic<N*> freeListHead;
static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
#ifdef MCDBGQ_NOLOCKFREE_FREELIST
debug::DebugMutex mutex;
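// Block: storage for BLOCK_SIZE elements. Emptiness is tracked either with
// per-slot emptyFlags or with the elementsCompletelyDequeued counter compared
// against BLOCK_SIZE, depending on the InnerQueueContext and block size.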
enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };

: next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), dynamicallyAllocated(true)
#ifdef MCDBGQ_TRACKMEM

template<InnerQueueContext context>
inline bool is_empty() const
if (!emptyFlags[i].load(std::memory_order_relaxed)) {
std::atomic_thread_fence(std::memory_order_acquire);
if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
std::atomic_thread_fence(std::memory_order_acquire);
assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);

template<InnerQueueContext context>
auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);

template<InnerQueueContext context>
std::atomic_thread_fence(std::memory_order_release);
for (size_t j = 0; j != count; ++j) {
assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
emptyFlags[i + j].store(true, std::memory_order_relaxed);
auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);

template<InnerQueueContext context>
inline void set_all_empty()
emptyFlags[i].store(true, std::memory_order_relaxed);
elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);

template<InnerQueueContext context>
inline void reset_empty()
emptyFlags[i].store(false, std::memory_order_relaxed);
elementsCompletelyDequeued.store(0, std::memory_order_relaxed);

static_assert(std::alignment_of<T>::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time");

std::atomic<size_t> elementsCompletelyDequeued;
std::atomic<std::uint32_t> freeListRefs;
std::atomic<Block*> freeListNext;
bool dynamicallyAllocated;
#ifdef MCDBGQ_TRACKMEM

static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
#ifdef MCDBGQ_TRACKMEM
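// ProducerBase: common base of explicit and implicit producers. It owns the
// head/tail indices and the optimistic dequeue counters, and links producers
// into the lock-free producer list traversed by the consumer paths above.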
struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
dequeueOptimisticCount(0),
dequeueOvercommit(0),
isExplicit(isExplicit_),

virtual ~ProducerBase() { }

template<typename U>
inline bool dequeue(U& element)

template<typename It>
inline size_t dequeue_bulk(It& itemFirst, size_t max)

inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }

inline size_t size_approx() const
auto tail = tailIndex.load(std::memory_order_relaxed);
auto head = headIndex.load(std::memory_order_relaxed);
return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;

inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }

std::atomic<index_t> tailIndex;
std::atomic<index_t> headIndex;
std::atomic<index_t> dequeueOptimisticCount;
std::atomic<index_t> dequeueOvercommit;

#ifdef MCDBGQ_TRACKMEM
friend struct MemStats;
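// ExplicitProducer (used with producer tokens): keeps a circular list of
// blocks plus a private block index (pr_blockIndex*) mapping a global element
// index to the block that holds it.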
ProducerBase(parent_, true),
blockIndex(nullptr),
pr_blockIndexSlotsUsed(0),
pr_blockIndexFront(0),
pr_blockIndexEntries(nullptr),
pr_blockIndexRaw(nullptr)
size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
if (poolBasedIndexSize > pr_blockIndexSize) {
pr_blockIndexSize = poolBasedIndexSize;

if (this->tailBlock != nullptr) {
Block* halfDequeuedBlock = nullptr;
if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
i = (i + 1) & (pr_blockIndexSize - 1);
assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
halfDequeuedBlock = pr_blockIndexEntries[i].block;
auto block = this->tailBlock;
block = block->next;
if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
if (block == halfDequeuedBlock) {
i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
(*block)[i++]->~T();
} while (block != this->tailBlock);

if (this->tailBlock != nullptr) {
auto block = this->tailBlock;
auto nextBlock = block->next;
this->parent->add_block_to_free_list(block);
} while (block != this->tailBlock);

auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
while (header != nullptr) {
auto prev = static_cast<BlockIndexHeader*>(header->prev);
header->~BlockIndexHeader();
(Traits::free)(header);

template<AllocationMode allocMode, typename U>
inline bool enqueue(U&& element)
index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
index_t newTailIndex = 1 + currentTailIndex;
auto startBlock = this->tailBlock;
auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
this->tailBlock = this->tailBlock->next;
this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(currentTailIndex, head));
if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
else if (!new_block_index(pr_blockIndexSlotsUsed)) {
auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
if (newBlock == nullptr) {
#ifdef MCDBGQ_TRACKMEM
newBlock->owner = this;
newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
if (this->tailBlock == nullptr) {
newBlock->next = newBlock;
newBlock->next = this->tailBlock->next;
this->tailBlock->next = newBlock;
this->tailBlock = newBlock;
++pr_blockIndexSlotsUsed;
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
(void)originalBlockIndexSlotsUsed;
auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
entry.base = currentTailIndex;
entry.block = this->tailBlock;
blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
this->tailIndex.store(newTailIndex, std::memory_order_release);
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
this->tailIndex.store(newTailIndex, std::memory_order_release);
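// Dequeue is optimistic: dequeueOptimisticCount is incremented first, and if
// the element turns out not to exist the failure is recorded in
// dequeueOvercommit rather than rolling the count back.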
template<typename U>
bool dequeue(U& element)
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
std::atomic_thread_fence(std::memory_order_acquire);
auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
tail = this->tailIndex.load(std::memory_order_acquire);
if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
auto& el = *((*block)[index]);
(*block)[index]->~T();
block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
} guard = { block, index };
element = std::move(el);
element = std::move(el);
block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
this->dequeueOvercommit.fetch_add(1, std::memory_order_release);

template<AllocationMode allocMode, typename It>
index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
auto startBlock = this->tailBlock;
auto originalBlockIndexFront = pr_blockIndexFront;
auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
Block* firstAllocatedBlock = nullptr;
size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
if (blockBaseDiff > 0) {
while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
this->tailBlock = this->tailBlock->next;
firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
entry.base = currentTailIndex;
entry.block = this->tailBlock;
pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
while (blockBaseDiff > 0) {
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(currentTailIndex, head));
if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
else if (full || !new_block_index(originalBlockIndexSlotsUsed)) {
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
originalBlockIndexFront = originalBlockIndexSlotsUsed;
auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
if (newBlock == nullptr) {
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
#ifdef MCDBGQ_TRACKMEM
newBlock->owner = this;
newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
if (this->tailBlock == nullptr) {
newBlock->next = newBlock;
newBlock->next = this->tailBlock->next;
this->tailBlock->next = newBlock;
this->tailBlock = newBlock;
firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
++pr_blockIndexSlotsUsed;
auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
entry.base = currentTailIndex;
entry.block = this->tailBlock;
pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
auto block = firstAllocatedBlock;
block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
if (block == this->tailBlock) {
block = block->next;
blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
currentTailIndex = startTailIndex;
auto endBlock = this->tailBlock;
this->tailBlock = startBlock;
assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
this->tailBlock = firstAllocatedBlock;
if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
stopIndex = newTailIndex;
while (currentTailIndex != stopIndex) {
new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
while (currentTailIndex != stopIndex) {
new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
auto constructedStopIndex = currentTailIndex;
auto lastBlockEnqueued = this->tailBlock;
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
if (!details::is_trivially_destructible<T>::value) {
auto block = startBlock;
block = firstAllocatedBlock;
currentTailIndex = startTailIndex;
if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
stopIndex = constructedStopIndex;
while (currentTailIndex != stopIndex) {
(*block)[currentTailIndex++]->~T();
if (block == lastBlockEnqueued) {
block = block->next;
if (this->tailBlock == endBlock) {
assert(currentTailIndex == newTailIndex);
this->tailBlock = this->tailBlock->next;
if (firstAllocatedBlock != nullptr)
blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
this->tailIndex.store(newTailIndex, std::memory_order_release);
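// Bulk dequeue: reserve up to `max` elements in one shot using the same
// optimistic-count/overcommit scheme, then walk block by block, moving
// elements out and releasing whole runs with set_many_empty<>().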
template<typename It>
size_t dequeue_bulk(It& itemFirst, size_t max)
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
if (details::circular_less_than<size_t>(0, desiredCount)) {
desiredCount = desiredCount < max ? desiredCount : max;
std::atomic_thread_fence(std::memory_order_acquire);
auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
tail = this->tailIndex.load(std::memory_order_acquire);
auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
if (details::circular_less_than<size_t>(0, actualCount)) {
actualCount = desiredCount < actualCount ? desiredCount : actualCount;
if (actualCount < desiredCount) {
this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
auto index = firstIndex;
auto firstIndexInBlock = index;
endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
auto block = localBlockIndex->entries[indexIndex].block;
while (index != endIndex) {
auto& el = *((*block)[index]);
*itemFirst++ = std::move(el);
while (index != endIndex) {
auto& el = *((*block)[index]);
*itemFirst = std::move(el);
block = localBlockIndex->entries[indexIndex].block;
while (index != endIndex) {
(*block)[index++]->~T();
block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
firstIndexInBlock = index;
endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
} while (index != firstIndex + actualCount);
block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
} while (index != firstIndex + actualCount);
this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);

struct BlockIndexEntry
struct BlockIndexHeader
std::atomic<size_t> front;
BlockIndexEntry* entries;

bool new_block_index(size_t numberOfFilledSlotsToExpose)
auto prevBlockSizeMask = pr_blockIndexSize - 1;
pr_blockIndexSize <<= 1;
auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
if (newRawPtr == nullptr) {
pr_blockIndexSize >>= 1;
auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
if (pr_blockIndexSlotsUsed != 0) {
auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
i = (i + 1) & prevBlockSizeMask;
} while (i != pr_blockIndexFront);
auto header = new (newRawPtr) BlockIndexHeader;
header->size = pr_blockIndexSize;
header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
header->entries = newBlockIndexEntries;
header->prev = pr_blockIndexRaw;
pr_blockIndexFront = j;
pr_blockIndexEntries = newBlockIndexEntries;
pr_blockIndexRaw = newRawPtr;
blockIndex.store(header, std::memory_order_release);

std::atomic<BlockIndexHeader*> blockIndex;
size_t pr_blockIndexSlotsUsed;
size_t pr_blockIndexSize;
size_t pr_blockIndexFront;
BlockIndexEntry* pr_blockIndexEntries;
void* pr_blockIndexRaw;

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
#ifdef MCDBGQ_TRACKMEM
friend struct MemStats;
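// ImplicitProducer: created per producing thread when no producer_token is
// used. Instead of a private circular block list it maps indices to blocks
// through a resizable block index keyed by block base index.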
ProducerBase(parent_, false),
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
if (!this->inactive.load(std::memory_order_relaxed)) {

auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto index = this->headIndex.load(std::memory_order_relaxed);
Block* block = nullptr;
assert(index == tail || details::circular_less_than(index, tail));
bool forceFreeLastBlock = index != tail;
while (index != tail) {
if (block != nullptr) {
this->parent->add_block_to_free_list(block);
block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
((*block)[index])->~T();
if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
this->parent->add_block_to_free_list(this->tailBlock);

auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
if (localBlockIndex != nullptr) {
for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
localBlockIndex->index[i]->~BlockIndexEntry();
auto prev = localBlockIndex->prev;
localBlockIndex->~BlockIndexHeader();
(Traits::free)(localBlockIndex);
localBlockIndex = prev;
} while (localBlockIndex != nullptr);
template<AllocationMode allocMode, typename U>
inline bool enqueue(U&& element)
index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
index_t newTailIndex = 1 + currentTailIndex;
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(currentTailIndex, head));
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
BlockIndexEntry* idxEntry;
if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
if (newBlock == nullptr) {
rewind_block_index_tail();
idxEntry->value.store(nullptr, std::memory_order_relaxed);
#ifdef MCDBGQ_TRACKMEM
newBlock->owner = this;
newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
rewind_block_index_tail();
idxEntry->value.store(nullptr, std::memory_order_relaxed);
this->parent->add_block_to_free_list(newBlock);
idxEntry->value.store(newBlock, std::memory_order_relaxed);
this->tailBlock = newBlock;
this->tailIndex.store(newTailIndex, std::memory_order_release);
new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
this->tailIndex.store(newTailIndex, std::memory_order_release);
template<typename U>
bool dequeue(U& element)
index_t tail = this->tailIndex.load(std::memory_order_relaxed);
index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
std::atomic_thread_fence(std::memory_order_acquire);
index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
tail = this->tailIndex.load(std::memory_order_acquire);
if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
auto entry = get_block_index_entry_for_index(index);
auto block = entry->value.load(std::memory_order_relaxed);
auto& el = *((*block)[index]);
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(producer->mutex);
BlockIndexEntry* entry;
ConcurrentQueue* parent;
(*block)[index]->~T();
if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
entry->value.store(nullptr, std::memory_order_relaxed);
parent->add_block_to_free_list(block);
} guard = { block, index, entry, this->parent };
element = std::move(el);
element = std::move(el);
if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
entry->value.store(nullptr, std::memory_order_relaxed);
this->parent->add_block_to_free_list(block);
this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
#pragma warning(push)
#pragma warning(disable: 4706)
template<AllocationMode allocMode, typename It>
bool enqueue_bulk(It itemFirst, size_t count)
index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
auto startBlock = this->tailBlock;
Block* firstAllocatedBlock = nullptr;
auto endBlock = this->tailBlock;
size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
if (blockBaseDiff > 0) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
BlockIndexEntry* idxEntry = nullptr;
bool indexInserted = false;
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(currentTailIndex, head));
if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
if (indexInserted) {
rewind_block_index_tail();
idxEntry->value.store(nullptr, std::memory_order_relaxed);
currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
idxEntry = get_block_index_entry_for_index(currentTailIndex);
idxEntry->value.store(nullptr, std::memory_order_relaxed);
rewind_block_index_tail();
this->parent->add_blocks_to_free_list(firstAllocatedBlock);
this->tailBlock = startBlock;
#ifdef MCDBGQ_TRACKMEM
newBlock->owner = this;
newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
newBlock->next = nullptr;
idxEntry->value.store(newBlock, std::memory_order_relaxed);
if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
assert(this->tailBlock != nullptr);
this->tailBlock->next = newBlock;
this->tailBlock = newBlock;
endBlock = newBlock;
firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
} while (blockBaseDiff > 0);
currentTailIndex = startTailIndex;
this->tailBlock = startBlock;
assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
this->tailBlock = firstAllocatedBlock;
if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
stopIndex = newTailIndex;
while (currentTailIndex != stopIndex) {
new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
while (currentTailIndex != stopIndex) {
new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
auto constructedStopIndex = currentTailIndex;
auto lastBlockEnqueued = this->tailBlock;
if (!details::is_trivially_destructible<T>::value) {
auto block = startBlock;
block = firstAllocatedBlock;
currentTailIndex = startTailIndex;
if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
stopIndex = constructedStopIndex;
while (currentTailIndex != stopIndex) {
(*block)[currentTailIndex++]->~T();
if (block == lastBlockEnqueued) {
block = block->next;
currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
idxEntry->value.store(nullptr, std::memory_order_relaxed);
rewind_block_index_tail();
this->parent->add_blocks_to_free_list(firstAllocatedBlock);
this->tailBlock = startBlock;
if (this->tailBlock == endBlock) {
assert(currentTailIndex == newTailIndex);
this->tailBlock = this->tailBlock->next;
this->tailIndex.store(newTailIndex, std::memory_order_release);
template<typename It>
size_t dequeue_bulk(It& itemFirst, size_t max)
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
if (details::circular_less_than<size_t>(0, desiredCount)) {
desiredCount = desiredCount < max ? desiredCount : max;
std::atomic_thread_fence(std::memory_order_acquire);
auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
tail = this->tailIndex.load(std::memory_order_acquire);
auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
if (details::circular_less_than<size_t>(0, actualCount)) {
actualCount = desiredCount < actualCount ? desiredCount : actualCount;
if (actualCount < desiredCount) {
this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
auto index = firstIndex;
BlockIndexHeader* localBlockIndex;
auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
auto blockStartIndex = index;
endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
auto entry = localBlockIndex->index[indexIndex];
auto block = entry->value.load(std::memory_order_relaxed);
while (index != endIndex) {
auto& el = *((*block)[index]);
*itemFirst++ = std::move(el);
while (index != endIndex) {
auto& el = *((*block)[index]);
*itemFirst = std::move(el);
entry = localBlockIndex->index[indexIndex];
block = entry->value.load(std::memory_order_relaxed);
while (index != endIndex) {
(*block)[index++]->~T();
if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
entry->value.store(nullptr, std::memory_order_relaxed);
this->parent->add_block_to_free_list(block);
indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
blockStartIndex = index;
endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
} while (index != firstIndex + actualCount);
if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
entry->value.store(nullptr, std::memory_order_relaxed);
this->parent->add_block_to_free_list(block);
indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
} while (index != firstIndex + actualCount);
this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
static const index_t INVALID_BLOCK_BASE = 1;

struct BlockIndexEntry
std::atomic<index_t> key;
std::atomic<Block*> value;

struct BlockIndexHeader
std::atomic<size_t> tail;
BlockIndexEntry* entries;
BlockIndexEntry** index;
BlockIndexHeader* prev;

template<AllocationMode allocMode>
inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
if (localBlockIndex == nullptr) {
size_t newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
idxEntry = localBlockIndex->index[newTail];
if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE || idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
localBlockIndex->tail.store(newTail, std::memory_order_release);
else if (!new_block_index()) {
localBlockIndex = blockIndex.load(std::memory_order_relaxed);
newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
idxEntry = localBlockIndex->index[newTail];
assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
localBlockIndex->tail.store(newTail, std::memory_order_release);

inline void rewind_block_index_tail()
auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);

inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
BlockIndexHeader* localBlockIndex;
auto idx = get_block_index_index_for_index(index, localBlockIndex);
return localBlockIndex->index[idx];

inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
localBlockIndex = blockIndex.load(std::memory_order_acquire);
auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
assert(tailBase != INVALID_BLOCK_BASE);
auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
bool new_block_index()
auto prev = blockIndex.load(std::memory_order_relaxed);
size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
auto raw = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount + std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
if (raw == nullptr) {
auto header = new (raw) BlockIndexHeader;
auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
if (prev != nullptr) {
auto prevTail = prev->tail.load(std::memory_order_relaxed);
auto prevPos = prevTail;
prevPos = (prevPos + 1) & (prev->capacity - 1);
index[i++] = prev->index[prevPos];
} while (prevPos != prevTail);
assert(i == prevCapacity);
for (size_t i = 0; i != entryCount; ++i) {
new (entries + i) BlockIndexEntry;
entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
index[prevCapacity + i] = entries + i;
header->prev = prev;
header->entries = entries;
header->index = index;
header->capacity = nextBlockIndexCapacity;
header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
blockIndex.store(header, std::memory_order_release);
nextBlockIndexCapacity <<= 1;

size_t nextBlockIndexCapacity;
std::atomic<BlockIndexHeader*> blockIndex;

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
details::ThreadExitListener threadExitListener;
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
mutable debug::DebugMutex mutex;
#ifdef MCDBGQ_TRACKMEM
friend struct MemStats;
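// Initial block pool and block acquisition: requisition_block<>() tries the
// pre-allocated pool first, then the shared free list, and only in CanAlloc
// mode falls back to creating a fresh Block.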
void populate_initial_block_list(size_t blockCount)
initialBlockPoolSize = blockCount;
if (initialBlockPoolSize == 0) {
initialBlockPool = nullptr;
initialBlockPool = create_array<Block>(blockCount);
if (initialBlockPool == nullptr) {
initialBlockPoolSize = 0;
for (size_t i = 0; i < initialBlockPoolSize; ++i) {
initialBlockPool[i].dynamicallyAllocated = false;

inline Block* try_get_block_from_initial_pool()
if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;

inline void add_block_to_free_list(Block* block)
#ifdef MCDBGQ_TRACKMEM
block->owner = nullptr;
if (!Traits::RECYCLE_ALLOCATED_BLOCKS && block->dynamicallyAllocated) {
freeList.add(block);

inline void add_blocks_to_free_list(Block* block)
while (block != nullptr) {
auto next = block->next;
add_block_to_free_list(block);

inline Block* try_get_block_from_free_list()
return freeList.try_get();

template<AllocationMode canAlloc>
Block* requisition_block()
auto block = try_get_block_from_initial_pool();
if (block != nullptr) {
block = try_get_block_from_free_list();
if (block != nullptr) {
return create<Block>();
#ifdef MCDBGQ_TRACKMEM
size_t allocatedBlocks;
size_t ownedBlocksExplicit;
size_t ownedBlocksImplicit;
size_t implicitProducers;
size_t explicitProducers;
size_t elementsEnqueued;
size_t blockClassBytes;
size_t queueClassBytes;
size_t implicitBlockIndexBytes;
size_t explicitBlockIndexBytes;

friend class ConcurrentQueue;

static MemStats getFor(ConcurrentQueue* q)
MemStats stats = { 0 };
stats.elementsEnqueued = q->size_approx();

auto block = q->freeList.head_unsafe();
while (block != nullptr) {
++stats.allocatedBlocks;
block = block->freeListNext.load(std::memory_order_relaxed);

for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
stats.implicitProducers += implicit ? 1 : 0;
stats.explicitProducers += implicit ? 0 : 1;

auto prod = static_cast<ImplicitProducer*>(ptr);
stats.queueClassBytes += sizeof(ImplicitProducer);
auto head = prod->headIndex.load(std::memory_order_relaxed);
auto tail = prod->tailIndex.load(std::memory_order_relaxed);
auto hash = prod->blockIndex.load(std::memory_order_relaxed);
if (hash != nullptr) {
for (size_t i = 0; i != hash->capacity; ++i) {
if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
++stats.allocatedBlocks;
++stats.ownedBlocksImplicit;
stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
for (; hash != nullptr; hash = hash->prev) {
stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {

auto prod = static_cast<ExplicitProducer*>(ptr);
stats.queueClassBytes += sizeof(ExplicitProducer);
auto tailBlock = prod->tailBlock;
bool wasNonEmpty = false;
if (tailBlock != nullptr) {
auto block = tailBlock;
++stats.allocatedBlocks;
if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
wasNonEmpty = wasNonEmpty || block != tailBlock;
++stats.ownedBlocksExplicit;
block = block->next;
} while (block != tailBlock);
auto index = prod->blockIndex.load(std::memory_order_relaxed);
while (index != nullptr) {
stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);

auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
stats.allocatedBlocks += freeOnInitialPool;
stats.freeBlocks += freeOnInitialPool;

stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
stats.queueClassBytes += sizeof(ConcurrentQueue);

MemStats getMemStats()
return MemStats::getFor(this);
friend struct MemStats;
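// Producer registration: recycle an inactive producer of the matching kind if
// one exists, otherwise create a new one and CAS it onto the head of the
// lock-free singly linked producer list.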
ProducerBase* recycle_or_create_producer(bool isExplicit)
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
debug::DebugLock lock(implicitProdMutex);
for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
bool expected = true;
if (ptr->inactive.compare_exchange_strong(expected, false, std::memory_order_acquire, std::memory_order_relaxed)) {
return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));

ProducerBase* add_producer(ProducerBase* producer)
if (producer == nullptr) {
producerCount.fetch_add(1, std::memory_order_relaxed);
auto prevTail = producerListTail.load(std::memory_order_relaxed);
producer->next = prevTail;
} while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
if (producer->isExplicit) {
auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
} while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
} while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));

void reown_producers()
for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
struct ImplicitProducerKVP
{
    std::atomic<details::thread_id_t> key;
    ImplicitProducer* value;  // No atomicity needed; only read by the thread that sets it
    ImplicitProducerKVP() : value(nullptr) { }
    ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
    {
        key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
        value = other.value;
    }
    inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
    {
        if (this != &other) {
            details::swap_relaxed(key, other.key);
            std::swap(value, other.value);
        }
    }
};
template<typename XT, typename XTraits>
friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;

struct ImplicitProducerHash
{
    size_t capacity;
    ImplicitProducerKVP* entries;
    ImplicitProducerHash* prev;
};
inline void populate_initial_implicit_producer_hash()
{
    if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;

    implicitProducerHashCount.store(0, std::memory_order_relaxed);
    auto hash = &initialImplicitProducerHash;
    hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
    hash->entries = &initialImplicitProducerHashEntries[0];
    for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
        initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
    }
    hash->prev = nullptr;
    implicitProducerHash.store(hash, std::memory_order_relaxed);
}
void swap_implicit_producer_hashes(ConcurrentQueue& other)
{
    // Swap the initial entries (assumes our implicit producer hash is initialized)
    initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
    initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
    other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];

    details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
    details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);

    // If either queue is still using its embedded initial hash, re-point the swapped-in
    // chain at the corresponding local copy
    if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
        implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
    }
    else {
        ImplicitProducerHash* hash;
        for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) { }
        hash->prev = &initialImplicitProducerHash;
    }
    if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
        other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
    }
    else {
        ImplicitProducerHash* hash;
        for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) { }
        hash->prev = &other.initialImplicitProducerHash;
    }
}
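// Editor's note (illustrative, not part of the original header): the implicit producer
// hash is a chain of tables linked through `prev`, and the oldest table is the one
// embedded in the queue object itself (initialImplicitProducerHash):
//
//     newest table -> ... -> embedded initial table (lives inside the ConcurrentQueue)
//
// Because that first table is a member of the queue, a swap/move cannot just exchange
// pointers -- the chain now owned by *this* may still terminate at *other*'s embedded
// table, so the code above walks to the end of each chain and re-points the final
// `prev` link at the local embedded table.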
ImplicitProducer* get_or_add_implicit_producer()
{
    // Note that since the data is essentially thread-local (the key is the thread ID),
    // there's a reduced need for fences, except around the current table itself
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugLock lock(implicitProdMutex);
#endif
    auto id = details::thread_id();
    auto hashedId = details::hash_thread_id(id);

    auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
    assert(mainHash != nullptr);  // the hash is never null
    for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
        // Look for the thread id in this hash
        auto index = hashedId;
        while (true) {  // not an infinite loop: at least one slot is always free
            index &= hash->capacity - 1u;

            auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
            if (probedKey == id) {
                // Found it -- if it was in an older table, lazily copy it into the main hash
                auto value = hash->entries[index].value;
                if (hash != mainHash) {
                    index = hashedId;
                    while (true) {
                        index &= mainHash->capacity - 1u;
                        auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                        auto reusable = details::invalid_thread_id2;
                        if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) ||
                            mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#else
                        if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#endif
                            mainHash->entries[index].value = value;
                            break;
                        }
                        ++index;
                    }
                }
                return value;
            }
            if (probedKey == details::invalid_thread_id) {
                break;  // not in this hash table
            }
            ++index;
        }
    }
    // Insert!
    auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
    while (true) {
        // Grab the resize "lock" if the table is at least half full and nobody else is resizing
        if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
            // The acquire synchronizes with the release when the flag is cleared, so this
            // reload yields the most recent table
            mainHash = implicitProducerHash.load(std::memory_order_acquire);
            if (newCount >= (mainHash->capacity >> 1)) {
                size_t newCapacity = mainHash->capacity << 1;
                while (newCount >= (newCapacity >> 1)) {
                    newCapacity <<= 1;
                }
                auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
                if (raw == nullptr) {
                    // Allocation failed: undo the count bump and release the resize flag
                    implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                    implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                    return nullptr;
                }
                auto newHash = new (raw) ImplicitProducerHash;
                newHash->capacity = static_cast<size_t>(newCapacity);
                newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
                for (size_t i = 0; i != newCapacity; ++i) {
                    new (newHash->entries + i) ImplicitProducerKVP;
                    newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                }
                newHash->prev = mainHash;
                implicitProducerHash.store(newHash, std::memory_order_release);
                implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                mainHash = newHash;
            }
            else {
                implicitProducerHashResizeInProgress.clear(std::memory_order_release);
            }
        }
        // If the main table is less than three-quarters full, add to it directly (no
        // need to wait for any in-progress resize to complete)
        if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
            auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false));
            if (producer == nullptr) {
                implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                return nullptr;
            }

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
            producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
            producer->threadExitListener.userData = producer;
            details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

            auto index = hashedId;
            while (true) {
                index &= mainHash->capacity - 1u;
                auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                auto reusable = details::invalid_thread_id2;
                if (mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                    implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);  // slot was already counted
                    mainHash->entries[index].value = producer;
                    break;
                }
#endif
                if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                    mainHash->entries[index].value = producer;
                    break;
                }
                ++index;
            }
            return producer;
        }

        // Otherwise the table is quite full and another thread is busy allocating a
        // bigger one; reload and try again once it's published
        mainHash = implicitProducerHash.load(std::memory_order_acquire);
    }
}
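// Editor's note (minimal standalone sketch, not part of the original header): the
// lookup/insert above follows the lock-free, open-addressed hash table described at
// http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table -- linear
// probing where a slot is claimed by CAS'ing its key from "empty" to the new key.
// Stripped of the resizing and producer recycling, the core idea is just
// (CAPACITY, hash() and Value are placeholders):
//
//     std::atomic<std::uint64_t> keys[CAPACITY];             // 0 == empty
//     Value*                     values[CAPACITY];
//
//     void insert(std::uint64_t key, Value* v) {
//         for (std::size_t i = hash(key); ; ++i) {
//             i &= CAPACITY - 1;                              // CAPACITY is a power of two
//             std::uint64_t expected = 0;
//             if (keys[i].compare_exchange_strong(expected, key) || expected == key) {
//                 values[i] = v;                              // slot is now ours
//                 return;
//             }
//         }
//     }
//
// Entries are never truly removed (only tombstoned with invalid_thread_id2), which is
// what keeps concurrent probes correct without any locking.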
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
void implicit_producer_thread_exited(ImplicitProducer* producer)
{
    // Remove from the hash
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugLock lock(implicitProdMutex);
#endif
    auto hash = implicitProducerHash.load(std::memory_order_acquire);
    assert(hash != nullptr);  // the exit listener is only registered if we were added to a hash
    auto id = details::thread_id();
    auto hashedId = details::hash_thread_id(id);
    details::thread_id_t probedKey;

    // Tombstone our entry in every table in the chain, in case other threads are still
    // probing the older tables
    for (; hash != nullptr; hash = hash->prev) {
        auto index = hashedId;
        do {
            index &= hash->capacity - 1u;
            probedKey = id;
            if (hash->entries[index].key.compare_exchange_strong(probedKey, details::invalid_thread_id2, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                break;
            }
            ++index;
        } while (probedKey != details::invalid_thread_id);  // we may never have been added to this particular table
    }

    // Mark the producer as recyclable
    producer->inactive.store(true, std::memory_order_release);
}
static void implicit_producer_thread_exited_callback(void* userData)
{
    auto producer = static_cast<ImplicitProducer*>(userData);
    auto queue = producer->parent;
    queue->implicit_producer_thread_exited(producer);
}
#endif
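// Editor's note (illustrative sketch, not part of the original header): the exit
// callback above is driven by a thread_local object whose destructor runs when the
// thread ends. The general pattern, with hypothetical names, looks like:
//
//     struct ThreadExitWatcher {
//         void (*callback)(void*) = nullptr;
//         void* userData = nullptr;
//         ~ThreadExitWatcher() { if (callback) callback(userData); }
//     };
//
//     static thread_local ThreadExitWatcher watcher;   // one per thread
//     // on first implicit enqueue from a thread:
//     //   watcher.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
//     //   watcher.userData = producer;
//
// so a thread that goes away hands its implicit producer back to the queue for reuse
// instead of leaking it.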
template<typename TAlign>
static inline void* aligned_malloc(size_t size)
{
    if (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value) {
        // The allocator's natural alignment already suffices
        return (Traits::malloc)(size);
    }
    else {
        // Over-allocate, align the user pointer, and stash the raw pointer just before it
        size_t alignment = std::alignment_of<TAlign>::value;
        void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
        if (!raw) {
            return nullptr;
        }
        char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
        *(reinterpret_cast<void**>(ptr) - 1) = raw;
        return ptr;
    }
}
template<typename TAlign>
static inline void aligned_free(void* ptr)
{
    if (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value) {
        return (Traits::free)(ptr);
    }
    else {
        // Recover the original raw pointer stored just before the aligned block
        (Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
    }
}
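// Editor's note (standalone sketch of the same over-allocation trick, not part of the
// original header; uses plain malloc/free instead of the queue's Traits hooks, and the
// my_* names are placeholders):
//
//     #include <cstdlib>
//     #include <cstdint>
//
//     void* my_aligned_malloc(std::size_t size, std::size_t alignment) {
//         // room for the payload, worst-case padding, and the stashed raw pointer
//         void* raw = std::malloc(size + alignment - 1 + sizeof(void*));
//         if (!raw) return nullptr;
//         auto addr = reinterpret_cast<std::uintptr_t>(raw) + sizeof(void*);
//         addr = (addr + alignment - 1) & ~(alignment - 1);    // round up (power-of-two alignment)
//         void* ptr = reinterpret_cast<void*>(addr);
//         *(reinterpret_cast<void**>(ptr) - 1) = raw;          // remember where the block really starts
//         return ptr;
//     }
//
//     void my_aligned_free(void* ptr) {
//         if (ptr) std::free(*(reinterpret_cast<void**>(ptr) - 1));
//     }
//
// details::align_for<TAlign> above performs the same rounding, keyed off the type's
// alignment rather than an explicit argument.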
template<typename U>
static inline U* create_array(size_t count)
{
    assert(count > 0);
    U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
    if (p == nullptr) {
        return nullptr;
    }

    for (size_t i = 0; i != count; ++i) {
        new (p + i) U();
    }
    return p;
}
template<typename U>
static inline void destroy_array(U* p, size_t count)
{
    if (p != nullptr) {
        assert(count > 0);
        for (size_t i = count; i != 0; ) {
            (p + --i)->~U();
        }
    }
    aligned_free<U>(p);
}
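// Editor's note (illustrative, not part of the original header): create_array and
// destroy_array separate allocation from object lifetime -- raw aligned storage is
// obtained first, elements are constructed in place, and later destroyed back-to-front
// (mirroring how built-in arrays are destroyed) before the storage is released:
//
//     #include <new>
//     template<typename T>
//     void unmake_n(T* p, std::size_t n) {      // placeholder name for illustration
//         while (n != 0) p[--n].~T();           // destroy in reverse construction order
//     }
//
// The real helpers additionally tolerate a null allocation and route the storage
// through aligned_malloc / aligned_free so the Traits allocator stays in control.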
template<typename U>
static inline U* create()
{
    void* p = aligned_malloc<U>(sizeof(U));
    return p != nullptr ? new (p) U : nullptr;
}
template<typename U, typename A1>
static inline U* create(A1&& a1)
{
    void* p = aligned_malloc<U>(sizeof(U));
    return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
}
template<typename U>
static inline void destroy(U* p)
{
    if (p != nullptr) p->~U();
    aligned_free<U>(p);
}
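// Editor's note (illustrative, not part of the original header): create<U>() and
// destroy() are the Traits-aware counterparts of new/delete used throughout the queue,
// e.g. create<ExplicitProducer>(this) in recycle_or_create_producer above. They keep
// every allocation under the Traits-supplied malloc/free while still running
// constructors and destructors:
//
//     auto prod = create<ExplicitProducer>(this);  // aligned_malloc + placement new
//     // ... use prod ...
//     destroy(prod);                               // ~ExplicitProducer() + aligned_free
//
// Returning nullptr on allocation failure (rather than throwing) is what lets the
// enqueue paths report failure by simply returning false.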
private:
    std::atomic<ProducerBase*> producerListTail;
    std::atomic<std::uint32_t> producerCount;

    std::atomic<size_t> initialBlockPoolIndex;
    Block* initialBlockPool;
    size_t initialBlockPoolSize;

#ifndef MCDBGQ_USEDEBUGFREELIST
    FreeList<Block> freeList;
#else
    debug::DebugFreeList<Block> freeList;
#endif

    std::atomic<ImplicitProducerHash*> implicitProducerHash;
    std::atomic<size_t> implicitProducerHashCount;  // number of slots logically used
    ImplicitProducerHash initialImplicitProducerHash;
    std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
    std::atomic_flag implicitProducerHashResizeInProgress;

    std::atomic<std::uint32_t> nextExplicitConsumerId;
    std::atomic<std::uint32_t> globalExplicitConsumerOffset;

#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugMutex implicitProdMutex;
#endif

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
    std::atomic<ExplicitProducer*> explicitProducers;
    std::atomic<ImplicitProducer*> implicitProducers;
#endif