Unravel Engine C++ Reference
concurrentqueue.h
1// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
2// An overview, including benchmark results, is provided here:
3// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4// The full design is also described in excruciating detail at:
5// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6
7// Simplified BSD license:
8// Copyright (c) 2013-2020, Cameron Desrochers.
9// All rights reserved.
10//
11// Redistribution and use in source and binary forms, with or without modification,
12// are permitted provided that the following conditions are met:
13//
14// - Redistributions of source code must retain the above copyright notice, this list of
15// conditions and the following disclaimer.
16// - Redistributions in binary form must reproduce the above copyright notice, this list of
17// conditions and the following disclaimer in the documentation and/or other materials
18// provided with the distribution.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
25// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
28// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30// Also dual-licensed under the Boost Software License (see LICENSE.md)
31
32#pragma once
33
34#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
35// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
37// upon assigning any computed values)
38#pragma GCC diagnostic push
39#pragma GCC diagnostic ignored "-Wconversion"
40
41#ifdef MCDBGQ_USE_RELACY
42#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43#endif
44#endif
45
46#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
47// VS2019 with /W4 warns about constant conditional expressions, but `if constexpr` is not
48// available unless /std:c++17 or higher is used, so we have no choice but to simply disable the warning
49#pragma warning(push)
50#pragma warning(disable: 4127) // conditional expression is constant
51#endif
52
53#if defined(__APPLE__)
54#include "TargetConditionals.h"
55#endif
56
57#ifdef MCDBGQ_USE_RELACY
58#include "relacy/relacy_std.hpp"
59#include "relacy_shims.h"
60// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
61// We'll override the default trait malloc ourselves without a macro.
62#undef new
63#undef delete
64#undef malloc
65#undef free
66#else
67#include <atomic> // Requires C++11. Sorry VS2010.
68#include <cassert>
69#endif
70#include <cstddef> // for max_align_t
71#include <cstdint>
72#include <cstdlib>
73#include <type_traits>
74#include <algorithm>
75#include <utility>
76#include <limits>
77#include <climits> // for CHAR_BIT
78#include <array>
79#include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
80#include <mutex> // used for thread exit synchronization
81
82// Platform-specific definitions of a numeric thread ID type and an invalid value
83namespace moodycamel { namespace details {
84 template<typename thread_id_t> struct thread_id_converter {
85 typedef thread_id_t thread_id_numeric_size_t;
86 typedef thread_id_t thread_id_hash_t;
87 static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
88 };
89} }
90#if defined(MCDBGQ_USE_RELACY)
91namespace moodycamel { namespace details {
92 typedef std::uint32_t thread_id_t;
93 static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
94 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
95 static inline thread_id_t thread_id() { return rl::thread_index(); }
96} }
97#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
98// No sense pulling in windows.h in a header, we'll manually declare the function
99// we use and rely on backwards-compatibility for this not to break
100extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
101namespace moodycamel { namespace details {
102 static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
103 typedef std::uint32_t thread_id_t;
104 static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
105 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
106 static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
107} }
108#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || defined(__MVS__) || defined(MOODYCAMEL_NO_THREAD_LOCAL)
109namespace moodycamel { namespace details {
110 static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
111
112 typedef std::thread::id thread_id_t;
113 static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
114
115 // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
116 // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
117 // be.
118 static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
119
120 template<std::size_t> struct thread_id_size { };
121 template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
122 template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
123
124 template<> struct thread_id_converter<thread_id_t> {
125 typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
126#ifndef __APPLE__
127 typedef std::size_t thread_id_hash_t;
128#else
129 typedef thread_id_numeric_size_t thread_id_hash_t;
130#endif
131
132 static thread_id_hash_t prehash(thread_id_t const& x)
133 {
134#ifndef __APPLE__
135 return std::hash<std::thread::id>()(x);
136#else
137 return *reinterpret_cast<thread_id_hash_t const*>(&x);
138#endif
139 }
140 };
141} }
142#else
143// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
144// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
145// static variable's address as a thread identifier :-)
146#if defined(__GNUC__) || defined(__INTEL_COMPILER)
147#define MOODYCAMEL_THREADLOCAL __thread
148#elif defined(_MSC_VER)
149#define MOODYCAMEL_THREADLOCAL __declspec(thread)
150#else
151// Assume C++11 compliant compiler
152#define MOODYCAMEL_THREADLOCAL thread_local
153#endif
154namespace moodycamel { namespace details {
155 typedef std::uintptr_t thread_id_t;
156 static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
157 static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
158 inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
159} }
160#endif
161
162// Constexpr if
163#ifndef MOODYCAMEL_CONSTEXPR_IF
164#if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
165#define MOODYCAMEL_CONSTEXPR_IF if constexpr
166#define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
167#else
168#define MOODYCAMEL_CONSTEXPR_IF if
169#define MOODYCAMEL_MAYBE_UNUSED
170#endif
171#endif
172
173// Exceptions
174#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
175#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
176#define MOODYCAMEL_EXCEPTIONS_ENABLED
177#endif
178#endif
179#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
180#define MOODYCAMEL_TRY try
181#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
182#define MOODYCAMEL_RETHROW throw
183#define MOODYCAMEL_THROW(expr) throw (expr)
184#else
185#define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF (true)
186#define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF (false)
187#define MOODYCAMEL_RETHROW
188#define MOODYCAMEL_THROW(expr)
189#endif
190
191#ifndef MOODYCAMEL_NOEXCEPT
192#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
193#define MOODYCAMEL_NOEXCEPT
194#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
195#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
196#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
197// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
198// We have to assume *all* non-trivial constructors may throw on VS2012!
199#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
200#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
201#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
202#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
203#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
204#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
205#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
206#else
207#define MOODYCAMEL_NOEXCEPT noexcept
208#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
209#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
210#endif
211#endif
212
213#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
214#ifdef MCDBGQ_USE_RELACY
215#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
216#else
217// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
218// g++ <=4.7 doesn't support thread_local either.
219// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
220#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__) && !defined(__MVS__)
221// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
222#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // tentatively enabled for now; years ago several users reported having problems with it
223#endif
224#endif
225#endif
226
227// VS2012 doesn't support deleted functions.
228// In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
229#ifndef MOODYCAMEL_DELETE_FUNCTION
230#if defined(_MSC_VER) && _MSC_VER < 1800
231#define MOODYCAMEL_DELETE_FUNCTION
232#else
233#define MOODYCAMEL_DELETE_FUNCTION = delete
234#endif
235#endif
236
237namespace moodycamel { namespace details {
238#ifndef MOODYCAMEL_ALIGNAS
239// VS2013 doesn't support alignas or alignof, and align() requires a constant literal
240#if defined(_MSC_VER) && _MSC_VER <= 1800
241#define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
242#define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
243#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) typename details::Vs2013Aligned<std::alignment_of<obj>::value, T>::type
244 template<int Align, typename T> struct Vs2013Aligned { }; // default, unsupported alignment
245 template<typename T> struct Vs2013Aligned<1, T> { typedef __declspec(align(1)) T type; };
246 template<typename T> struct Vs2013Aligned<2, T> { typedef __declspec(align(2)) T type; };
247 template<typename T> struct Vs2013Aligned<4, T> { typedef __declspec(align(4)) T type; };
248 template<typename T> struct Vs2013Aligned<8, T> { typedef __declspec(align(8)) T type; };
249 template<typename T> struct Vs2013Aligned<16, T> { typedef __declspec(align(16)) T type; };
250 template<typename T> struct Vs2013Aligned<32, T> { typedef __declspec(align(32)) T type; };
251 template<typename T> struct Vs2013Aligned<64, T> { typedef __declspec(align(64)) T type; };
252 template<typename T> struct Vs2013Aligned<128, T> { typedef __declspec(align(128)) T type; };
253 template<typename T> struct Vs2013Aligned<256, T> { typedef __declspec(align(256)) T type; };
254#else
255 template<typename T> struct identity { typedef T type; };
256#define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
257#define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
258#define MOODYCAMEL_ALIGNED_TYPE_LIKE(T, obj) alignas(alignof(obj)) typename details::identity<T>::type
259#endif
260#endif
261} }
262
263
264// TSAN can falsely report races in lock-free code. To enable TSAN to be used from projects that use this one,
265// we can apply per-function compile-time suppression.
266// See https://clang.llvm.org/docs/ThreadSanitizer.html#has-feature-thread-sanitizer
267#define MOODYCAMEL_NO_TSAN
268#if defined(__has_feature)
269 #if __has_feature(thread_sanitizer)
270 #undef MOODYCAMEL_NO_TSAN
271 #define MOODYCAMEL_NO_TSAN __attribute__((no_sanitize("thread")))
272 #endif // TSAN
273#endif // TSAN
274
275// Compiler-specific likely/unlikely hints
276namespace moodycamel { namespace details {
277#if defined(__GNUC__)
278 static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
279 static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
280#else
281 static inline bool (likely)(bool x) { return x; }
282 static inline bool (unlikely)(bool x) { return x; }
283#endif
284} }
285
286#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
287#include "internal/concurrentqueue_internal_debug.h"
288#endif
289
290namespace moodycamel {
291namespace details {
292 template<typename T>
293 struct const_numeric_max {
294 static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
295 static const T value = std::numeric_limits<T>::is_signed
296 ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
297 : static_cast<T>(-1);
298 };
299
300#if defined(__GLIBCXX__)
301 typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
302#else
303 typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
304#endif
305
306 // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
307 // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
308 typedef union {
309 std_max_align_t x;
310 long long y;
311 void* z;
312 } max_align_t;
313}
314
315// Default traits for the ConcurrentQueue. To change some of the
316// traits without re-implementing all of them, inherit from this
317// struct and shadow the declarations you wish to be different;
318// since the traits are used as a template type parameter, the
319// shadowed declarations will be used where defined, and the defaults
320// otherwise.
321struct ConcurrentQueueDefaultTraits
322{
323 // General-purpose size type. std::size_t is strongly recommended.
324 typedef std::size_t size_t;
325
326 // The type used for the enqueue and dequeue indices. Must be at least as
327 // large as size_t. Should be significantly larger than the number of elements
328 // you expect to hold at once, especially if you have a high turnover rate;
329 // for example, on 32-bit x86, if you expect to have over a hundred million
330 // elements or pump several million elements through your queue in a very
331 // short space of time, using a 32-bit type *may* trigger a race condition.
332 // A 64-bit int type is recommended in that case, and in practice will
333 // prevent a race condition no matter the usage of the queue. Note that
334 // whether the queue is lock-free with a 64-bit int type depends on whether
335 // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
336 typedef std::size_t index_t;
337
338 // Internally, all elements are enqueued and dequeued from multi-element
339 // blocks; this is the smallest controllable unit. If you expect few elements
340 // but many producers, a smaller block size should be favoured. For few producers
341 // and/or many elements, a larger block size is preferred. A sane default
342 // is provided. Must be a power of 2.
343 static const size_t BLOCK_SIZE = 32;
344
345 // For explicit producers (i.e. when using a producer token), the block is
346 // checked for being empty by iterating through a list of flags, one per element.
347 // For large block sizes, this is too inefficient, and switching to an atomic
348 // counter-based approach is faster. The switch is made for block sizes strictly
349 // larger than this threshold.
350 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
351
352 // How many full blocks can be expected for a single explicit producer? This should
353 // reflect that number's maximum for optimal performance. Must be a power of 2.
354 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
355
356 // How many full blocks can be expected for a single implicit producer? This should
357 // reflect that number's maximum for optimal performance. Must be a power of 2.
358 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
359
360 // The initial size of the hash table mapping thread IDs to implicit producers.
361 // Note that the hash is resized every time it becomes half full.
362 // Must be 0 or a power of two (i.e. at least 1). If 0, implicit production
363 // (using the enqueue methods without an explicit producer token) is disabled.
364 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
365
366 // Controls the number of items that an explicit consumer (i.e. one with a token)
367 // must consume before it causes all consumers to rotate and move on to the next
368 // internal queue.
369 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
370
371 // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
372 // Enqueue operations that would cause this limit to be surpassed will fail. Note
373 // that this limit is enforced at the block level (for performance reasons), i.e.
374 // it's rounded up to the nearest block size.
375 static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
376
377 // The number of times to spin before sleeping when waiting on a semaphore.
378 // Recommended values are on the order of 1000-10000 unless the number of
379 // consumer threads exceeds the number of idle cores (in which case try 0-100).
380 // Only affects instances of the BlockingConcurrentQueue.
381 static const int MAX_SEMA_SPINS = 10000;
382
383 // Whether to recycle dynamically-allocated blocks into an internal free list or
384 // not. If false, only pre-allocated blocks (controlled by the constructor
385 // arguments) will be recycled, and all others will be `free`d back to the heap.
386 // Note that blocks consumed by explicit producers are only freed on destruction
387 // of the queue (not following destruction of the token) regardless of this trait.
388 static const bool RECYCLE_ALLOCATED_BLOCKS = false;
389
390
391#ifndef MCDBGQ_USE_RELACY
392 // Memory allocation can be customized if needed.
393 // malloc should return nullptr on failure, and handle alignment like std::malloc.
394#if defined(malloc) || defined(free)
395 // Gah, this is 2015, stop defining macros that break standard code already!
396 // Work around malloc/free being special macros:
397 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
398 static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
399 static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
400 static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
401#else
402 static inline void* malloc(size_t size) { return std::malloc(size); }
403 static inline void free(void* ptr) { return std::free(ptr); }
404#endif
405#else
406 // Debug versions when running under the Relacy race detector (ignore
407 // these in user code)
408 static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
409 static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
410#endif
411};
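// Usage sketch (illustrative, not part of concurrentqueue.h): traits are customized by
// deriving from ConcurrentQueueDefaultTraits and shadowing only the members you want to
// change. `LargeBlockTraits` and its values are hypothetical examples, not recommendations.
//
//     struct LargeBlockTraits : public moodycamel::ConcurrentQueueDefaultTraits
//     {
//         static const size_t BLOCK_SIZE = 256;              // must stay a power of 2
//         static const size_t MAX_SUBQUEUE_SIZE = 1 << 20;   // enforced per sub-queue, rounded up to a block
//     };
//
//     moodycamel::ConcurrentQueue<int, LargeBlockTraits> queue;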
412
413
414// When producing or consuming many elements, the most efficient way is to:
415// 1) Use one of the bulk-operation methods of the queue with a token
416// 2) Failing that, use the bulk-operation methods without a token
417// 3) Failing that, create a token and use that with the single-item methods
418// 4) Failing that, use the single-parameter methods of the queue
419// Having said that, don't create tokens willy-nilly -- ideally there should be
420// a maximum of one token per thread (of each kind).
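// Usage sketch (illustrative, not part of concurrentqueue.h) of the preference order above:
// per-thread tokens plus bulk operations. The names `q`, `batch` and `received` are hypothetical.
//
//     moodycamel::ConcurrentQueue<int> q;
//
//     // Producer thread: one ProducerToken for this thread, enqueue in bulk.
//     moodycamel::ProducerToken ptok(q);
//     int batch[64];
//     for (int i = 0; i != 64; ++i) batch[i] = i;
//     q.enqueue_bulk(ptok, batch, 64);
//
//     // Consumer thread: one ConsumerToken for this thread, dequeue in bulk.
//     moodycamel::ConsumerToken ctok(q);
//     int received[64];
//     std::size_t n = q.try_dequeue_bulk(ctok, received, 64);   // n <= 64 items actually dequeued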
421struct ProducerToken;
422struct ConsumerToken;
423
424template<typename T, typename Traits> class ConcurrentQueue;
425template<typename T, typename Traits> class BlockingConcurrentQueue;
426class ConcurrentQueueTests;
427
428
429namespace details
430{
431 struct ConcurrentQueueProducerTypelessBase
432 {
433 ConcurrentQueueProducerTypelessBase* next;
434 std::atomic<bool> inactive;
435 ProducerToken* token;
436
437 ConcurrentQueueProducerTypelessBase()
438 : next(nullptr), inactive(false), token(nullptr)
439 {
440 }
441 };
442
443 template<bool use32> struct _hash_32_or_64 {
444 static inline std::uint32_t hash(std::uint32_t h)
445 {
446 // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
447 // Since the thread ID is already unique, all we really want to do is propagate that
448 // uniqueness evenly across all the bits, so that we can use a subset of the bits while
449 // reducing collisions significantly
450 h ^= h >> 16;
451 h *= 0x85ebca6b;
452 h ^= h >> 13;
453 h *= 0xc2b2ae35;
454 return h ^ (h >> 16);
455 }
456 };
457 template<> struct _hash_32_or_64<1> {
458 static inline std::uint64_t hash(std::uint64_t h)
459 {
460 h ^= h >> 33;
461 h *= 0xff51afd7ed558ccd;
462 h ^= h >> 33;
463 h *= 0xc4ceb9fe1a85ec53;
464 return h ^ (h >> 33);
465 }
466 };
467 template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
468
469 static inline size_t hash_thread_id(thread_id_t id)
470 {
471 static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
472 return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
473 thread_id_converter<thread_id_t>::prehash(id)));
474 }
475
476 template<typename T>
477 static inline bool circular_less_than(T a, T b)
478 {
479 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
480 return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
481 // Note: extra parens around rhs of operator<< is MSVC bug: https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931
482 // silencing the bug requires #pragma warning(disable: 4554) around the calling code and has no effect when done here.
483 }
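 // For example, with T = std::uint8_t: circular_less_than<std::uint8_t>(250, 5) is true,
 // since a - b == 245 exceeds half the type's range (128), i.e. index 5 is "ahead of" 250
 // after wrap-around; circular_less_than<std::uint8_t>(5, 250) is false (a - b == 11).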
484
485 template<typename U>
486 static inline char* align_for(char* ptr)
487 {
488 const std::size_t alignment = std::alignment_of<U>::value;
489 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
490 }
491
492 template<typename T>
493 static inline T ceil_to_pow_2(T x)
494 {
495 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
496
497 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
498 --x;
499 x |= x >> 1;
500 x |= x >> 2;
501 x |= x >> 4;
502 for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
503 x |= x >> (i << 3);
504 }
505 ++x;
506 return x;
507 }
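 // For example, ceil_to_pow_2<std::uint32_t>(48) == 64; values that are already powers of
 // two are returned unchanged (ceil_to_pow_2<std::uint32_t>(64) == 64).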
508
509 template<typename T>
510 static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
511 {
512 T temp = std::move(left.load(std::memory_order_relaxed));
513 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
514 right.store(std::move(temp), std::memory_order_relaxed);
515 }
516
517 template<typename T>
518 static inline T const& nomove(T const& x)
519 {
520 return x;
521 }
522
523 template<bool Enable>
524 struct nomove_if
525 {
526 template<typename T>
527 static inline T const& eval(T const& x)
528 {
529 return x;
530 }
531 };
532
533 template<>
534 struct nomove_if<false>
535 {
536 template<typename U>
537 static inline auto eval(U&& x)
538 -> decltype(std::forward<U>(x))
539 {
540 return std::forward<U>(x);
541 }
542 };
543
544 template<typename It>
545 static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
546 {
547 return *it;
548 }
549
550#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
551 template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
552#else
553 template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
554#endif
555
556#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
557#ifdef MCDBGQ_USE_RELACY
558 typedef RelacyThreadExitListener ThreadExitListener;
559 typedef RelacyThreadExitNotifier ThreadExitNotifier;
560#else
561 class ThreadExitNotifier;
562
563 struct ThreadExitListener
564 {
565 typedef void (*callback_t)(void*);
566 callback_t callback;
567 void* userData;
568
569 ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
570 ThreadExitNotifier* chain; // reserved for use by the ThreadExitNotifier
571 };
572
573 class ThreadExitNotifier
574 {
575 public:
576 static void subscribe(ThreadExitListener* listener)
577 {
578 auto& tlsInst = instance();
579 std::lock_guard<std::mutex> guard(mutex());
580 listener->next = tlsInst.tail;
581 listener->chain = &tlsInst;
582 tlsInst.tail = listener;
583 }
584
585 static void unsubscribe(ThreadExitListener* listener)
586 {
587 std::lock_guard<std::mutex> guard(mutex());
588 if (!listener->chain) {
589 return; // race with ~ThreadExitNotifier
590 }
591 auto& tlsInst = *listener->chain;
592 listener->chain = nullptr;
593 ThreadExitListener** prev = &tlsInst.tail;
594 for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
595 if (ptr == listener) {
596 *prev = ptr->next;
597 break;
598 }
599 prev = &ptr->next;
600 }
601 }
602
603 private:
604 ThreadExitNotifier() : tail(nullptr) { }
605 ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
606 ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
607
608 ~ThreadExitNotifier()
609 {
610 // This thread is about to exit, let everyone know!
611 assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
612 std::lock_guard<std::mutex> guard(mutex());
613 for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
614 ptr->chain = nullptr;
615 ptr->callback(ptr->userData);
616 }
617 }
618
619 // Thread-local
620 static inline ThreadExitNotifier& instance()
621 {
622 static thread_local ThreadExitNotifier notifier;
623 return notifier;
624 }
625
626 static inline std::mutex& mutex()
627 {
628 // Must be static because the ThreadExitNotifier could be destroyed while unsubscribe is called
629 static std::mutex mutex;
630 return mutex;
631 }
632
633 private:
634 ThreadExitListener* tail;
635 };
636#endif
637#endif
638
639 template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
640 template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
641 template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
642 template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
643 template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
644 template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
645 template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
646 template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
647 template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
648}
649
650
651struct ProducerToken
652{
653 template<typename T, typename Traits>
654 explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
655
656 template<typename T, typename Traits>
657 explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
658
659 ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
660 : producer(other.producer)
661 {
662 other.producer = nullptr;
663 if (producer != nullptr) {
664 producer->token = this;
665 }
666 }
667
668 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
669 {
670 swap(other);
671 return *this;
672 }
673
674 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
675 {
676 std::swap(producer, other.producer);
677 if (producer != nullptr) {
678 producer->token = this;
679 }
680 if (other.producer != nullptr) {
681 other.producer->token = &other;
682 }
683 }
684
685 // A token is always valid unless:
686 // 1) Memory allocation failed during construction
687 // 2) It was moved via the move constructor
688 // (Note: assignment does a swap, leaving both potentially valid)
689 // 3) The associated queue was destroyed
690 // Note that if valid() returns true, that only indicates
691 // that the token is valid for use with a specific queue,
692 // but not which one; that's up to the user to track.
693 inline bool valid() const { return producer != nullptr; }
694
695 ~ProducerToken()
696 {
697 if (producer != nullptr) {
698 producer->token = nullptr;
699 producer->inactive.store(true, std::memory_order_release);
700 }
701 }
702
703 // Disable copying and assignment
704 ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
705 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
706
707private:
708 template<typename T, typename Traits> friend class ConcurrentQueue;
709 friend class ConcurrentQueueTests;
710
711protected:
712 details::ConcurrentQueueProducerTypelessBase* producer;
713};
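// Usage sketch (illustrative, not part of concurrentqueue.h): valid() only says whether the
// token can be used at all, not which queue it belongs to. `q`, `tok` and `tok2` are hypothetical.
//
//     moodycamel::ConcurrentQueue<int> q;
//     moodycamel::ProducerToken tok(q);
//     assert(tok.valid());                              // freshly constructed, allocation succeeded
//     moodycamel::ProducerToken tok2(std::move(tok));
//     // tok2.valid() is now true; tok.valid() is false, since tok was moved from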
714
715
716struct ConsumerToken
717{
718 template<typename T, typename Traits>
719 explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
720
721 template<typename T, typename Traits>
722 explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
723
724 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
725 : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
726 {
727 }
728
729 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
730 {
731 swap(other);
732 return *this;
733 }
734
735 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
736 {
737 std::swap(initialOffset, other.initialOffset);
738 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
739 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
740 std::swap(currentProducer, other.currentProducer);
741 std::swap(desiredProducer, other.desiredProducer);
742 }
743
744 // Disable copying and assignment
745 ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
746 ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
747
748private:
749 template<typename T, typename Traits> friend class ConcurrentQueue;
750 friend class ConcurrentQueueTests;
751
752private: // but shared with ConcurrentQueue
753 std::uint32_t initialOffset;
754 std::uint32_t lastKnownGlobalOffset;
755 std::uint32_t itemsConsumedFromCurrent;
756 details::ConcurrentQueueProducerTypelessBase* currentProducer;
757 details::ConcurrentQueueProducerTypelessBase* desiredProducer;
758};
759
760// Need to forward-declare this swap because it's in a namespace.
761// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
762template<typename T, typename Traits>
763inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
764
765
766template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
767class ConcurrentQueue
768{
769public:
770 typedef ::moodycamel::ProducerToken producer_token_t;
771 typedef ::moodycamel::ConsumerToken consumer_token_t;
772
773 typedef typename Traits::index_t index_t;
774 typedef typename Traits::size_t size_t;
775
776 static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
777 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
778 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
779 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
780 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
781 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
782#ifdef _MSC_VER
783#pragma warning(push)
784#pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
785#pragma warning(disable: 4309) // static_cast: Truncation of constant value
786#endif
787 static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
788#ifdef _MSC_VER
789#pragma warning(pop)
790#endif
791
792 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
793 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
794 static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
795 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
796 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
797 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
798 static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
799 static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
800 static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
801
802public:
803 // Creates a queue with at least `capacity` element slots; note that the
804 // actual number of elements that can be inserted without additional memory
805 // allocation depends on the number of producers and the block size (e.g. if
806 // the block size is equal to `capacity`, only a single block will be allocated
807 // up-front, which means only a single producer will be able to enqueue elements
808 // without an extra allocation -- blocks aren't shared between producers).
809 // This method is not thread safe -- it is up to the user to ensure that the
810 // queue is fully constructed before it starts being used by other threads (this
811 // includes making the memory effects of construction visible, possibly with a
812 // memory barrier).
813 explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
814 : producerListTail(nullptr),
815 producerCount(0),
816 initialBlockPoolIndex(0),
817 nextExplicitConsumerId(0),
818 globalExplicitConsumerOffset(0)
819 {
820 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
821 populate_initial_implicit_producer_hash();
822 populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
823
824#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
825 // Track all the producers using a fully-resolved typed list for
826 // each kind; this makes it possible to debug them starting from
827 // the root queue object (otherwise wacky casts are needed that
828 // don't compile in the debugger's expression evaluator).
829 explicitProducers.store(nullptr, std::memory_order_relaxed);
830 implicitProducers.store(nullptr, std::memory_order_relaxed);
831#endif
832 }
833
834 // Computes the correct number of pre-allocated blocks for you based
835 // on the minimum number of elements you want available at any given
836 // time, and the maximum concurrent number of each type of producer.
837 ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
838 : producerListTail(nullptr),
839 producerCount(0),
840 initialBlockPoolIndex(0),
841 nextExplicitConsumerId(0),
842 globalExplicitConsumerOffset(0)
843 {
844 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
845 populate_initial_implicit_producer_hash();
846 size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
847 populate_initial_block_list(blocks);
848
849#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
850 explicitProducers.store(nullptr, std::memory_order_relaxed);
851 implicitProducers.store(nullptr, std::memory_order_relaxed);
852#endif
853 }
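// Usage sketch (illustrative, not part of concurrentqueue.h) of the two constructors above;
// the capacities and producer counts are hypothetical.
//
//     // At least 1024 element slots pre-allocated (rounded up to whole blocks):
//     moodycamel::ConcurrentQueue<int> q1(1024);
//
//     // Pre-allocate for >= 1024 elements shared among up to 2 token-based (explicit)
//     // producers and 4 tokenless (implicit) producer threads:
//     moodycamel::ConcurrentQueue<int> q2(1024, 2, 4);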
854
855 // Note: The queue should not be accessed concurrently while it's
856 // being deleted. It's up to the user to synchronize this.
857 // This method is not thread safe.
858 ~ConcurrentQueue()
859 {
860 // Destroy producers
861 auto ptr = producerListTail.load(std::memory_order_relaxed);
862 while (ptr != nullptr) {
863 auto next = ptr->next_prod();
864 if (ptr->token != nullptr) {
865 ptr->token->producer = nullptr;
866 }
867 destroy(ptr);
868 ptr = next;
869 }
870
871 // Destroy implicit producer hash tables
872 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
873 auto hash = implicitProducerHash.load(std::memory_order_relaxed);
874 while (hash != nullptr) {
875 auto prev = hash->prev;
876 if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
877 for (size_t i = 0; i != hash->capacity; ++i) {
878 hash->entries[i].~ImplicitProducerKVP();
879 }
880 hash->~ImplicitProducerHash();
881 (Traits::free)(hash);
882 }
883 hash = prev;
884 }
885 }
886
887 // Destroy global free list
888 auto block = freeList.head_unsafe();
889 while (block != nullptr) {
890 auto next = block->freeListNext.load(std::memory_order_relaxed);
891 if (block->dynamicallyAllocated) {
892 destroy(block);
893 }
894 block = next;
895 }
896
897 // Destroy initial free list
898 destroy_array(initialBlockPool, initialBlockPoolSize);
899 }
900
901 // Disable copying and copy assignment
902 ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
903 ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
904
905 // Moving is supported, but note that it is *not* a thread-safe operation.
906 // Nobody can use the queue while it's being moved, and the memory effects
907 // of that move must be propagated to other threads before they can use it.
908 // Note: When a queue is moved, its tokens are still valid but can only be
909 // used with the destination queue (i.e. semantically they are moved along
910 // with the queue itself).
911 ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
912 : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
913 producerCount(other.producerCount.load(std::memory_order_relaxed)),
914 initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
915 initialBlockPool(other.initialBlockPool),
916 initialBlockPoolSize(other.initialBlockPoolSize),
917 freeList(std::move(other.freeList)),
918 nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
919 globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
920 {
921 // Move the other one into this, and leave the other one as an empty queue
922 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
923 populate_initial_implicit_producer_hash();
924 swap_implicit_producer_hashes(other);
925
926 other.producerListTail.store(nullptr, std::memory_order_relaxed);
927 other.producerCount.store(0, std::memory_order_relaxed);
928 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
929 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
930
931#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
932 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
933 other.explicitProducers.store(nullptr, std::memory_order_relaxed);
934 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
935 other.implicitProducers.store(nullptr, std::memory_order_relaxed);
936#endif
937
938 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
939 other.initialBlockPoolSize = 0;
940 other.initialBlockPool = nullptr;
941
942 reown_producers();
943 }
944
945 inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
946 {
947 return swap_internal(other);
948 }
949
950 // Swaps this queue's state with the other's. Not thread-safe.
951 // Swapping two queues does not invalidate their tokens, however
952 // the tokens that were created for one queue must be used with
953 // only the swapped queue (i.e. the tokens are tied to the
954 // queue's movable state, not the object itself).
955 inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
956 {
957 swap_internal(other);
958 }
959
960private:
961 ConcurrentQueue& swap_internal(ConcurrentQueue& other)
962 {
963 if (this == &other) {
964 return *this;
965 }
966
967 details::swap_relaxed(producerListTail, other.producerListTail);
968 details::swap_relaxed(producerCount, other.producerCount);
969 details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
970 std::swap(initialBlockPool, other.initialBlockPool);
971 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
972 freeList.swap(other.freeList);
973 details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
974 details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
975
976 swap_implicit_producer_hashes(other);
977
978 reown_producers();
979 other.reown_producers();
980
981#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
982 details::swap_relaxed(explicitProducers, other.explicitProducers);
983 details::swap_relaxed(implicitProducers, other.implicitProducers);
984#endif
985
986 return *this;
987 }
988
989public:
990 // Enqueues a single item (by copying it).
991 // Allocates memory if required. Only fails if memory allocation fails (or implicit
992 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
993 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
994 // Thread-safe.
995 inline bool enqueue(T const& item)
996 {
997 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
998 else return inner_enqueue<CanAlloc>(item);
999 }
1000
1001 // Enqueues a single item (by moving it, if possible).
1002 // Allocates memory if required. Only fails if memory allocation fails (or implicit
1003 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
1004 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1005 // Thread-safe.
1006 inline bool enqueue(T&& item)
1007 {
1008 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1009 else return inner_enqueue<CanAlloc>(std::move(item));
1010 }
1011
1012 // Enqueues a single item (by copying it) using an explicit producer token.
1013 // Allocates memory if required. Only fails if memory allocation fails (or
1014 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1015 // Thread-safe.
1016 inline bool enqueue(producer_token_t const& token, T const& item)
1017 {
1018 return inner_enqueue<CanAlloc>(token, item);
1019 }
1020
1021 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1022 // Allocates memory if required. Only fails if memory allocation fails (or
1023 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1024 // Thread-safe.
1025 inline bool enqueue(producer_token_t const& token, T&& item)
1026 {
1027 return inner_enqueue<CanAlloc>(token, std::move(item));
1028 }
1029
1030 // Enqueues several items.
1031 // Allocates memory if required. Only fails if memory allocation fails (or
1032 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1033 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1034 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
1035 // Thread-safe.
1036 template<typename It>
1037 bool enqueue_bulk(It itemFirst, size_t count)
1038 {
1039 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1040 else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1041 }
1042
1043 // Enqueues several items using an explicit producer token.
1044 // Allocates memory if required. Only fails if memory allocation fails
1045 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1046 // Note: Use std::make_move_iterator if the elements should be moved
1047 // instead of copied.
1048 // Thread-safe.
1049 template<typename It>
1050 bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1051 {
1052 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1053 }
1054
1055 // Enqueues a single item (by copying it).
1056 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
1057 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1058 // is 0).
1059 // Thread-safe.
1060 inline bool try_enqueue(T const& item)
1061 {
1062 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1063 else return inner_enqueue<CannotAlloc>(item);
1064 }
1065
1066 // Enqueues a single item (by moving it, if possible).
1067 // Does not allocate memory (except for one-time implicit producer).
1068 // Fails if not enough room to enqueue (or implicit production is
1069 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1070 // Thread-safe.
1071 inline bool try_enqueue(T&& item)
1072 {
1073 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1074 else return inner_enqueue<CannotAlloc>(std::move(item));
1075 }
1076
1077 // Enqueues a single item (by copying it) using an explicit producer token.
1078 // Does not allocate memory. Fails if not enough room to enqueue.
1079 // Thread-safe.
1080 inline bool try_enqueue(producer_token_t const& token, T const& item)
1081 {
1082 return inner_enqueue<CannotAlloc>(token, item);
1083 }
1084
1085 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1086 // Does not allocate memory. Fails if not enough room to enqueue.
1087 // Thread-safe.
1088 inline bool try_enqueue(producer_token_t const& token, T&& item)
1089 {
1090 return inner_enqueue<CannotAlloc>(token, std::move(item));
1091 }
1092
1093 // Enqueues several items.
1094 // Does not allocate memory (except for one-time implicit producer).
1095 // Fails if not enough room to enqueue (or implicit production is
1096 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1097 // Note: Use std::make_move_iterator if the elements should be moved
1098 // instead of copied.
1099 // Thread-safe.
1100 template<typename It>
1101 bool try_enqueue_bulk(It itemFirst, size_t count)
1102 {
1103 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1104 else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1105 }
1106
1107 // Enqueues several items using an explicit producer token.
1108 // Does not allocate memory. Fails if not enough room to enqueue.
1109 // Note: Use std::make_move_iterator if the elements should be moved
1110 // instead of copied.
1111 // Thread-safe.
1112 template<typename It>
1113 bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1114 {
1115 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1116 }
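// Usage sketch (illustrative, not part of concurrentqueue.h) contrasting the allocating and
// non-allocating variants above; `q` is hypothetical.
//
//     moodycamel::ConcurrentQueue<std::string> q;
//     q.enqueue("always succeeds unless the allocator fails");   // may allocate a new block
//     if (!q.try_enqueue("fits only if a slot is already free")) {
//         // No pre-allocated room was available -- apply back-pressure here.
//     }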
1117
1118
1119
1120 // Attempts to dequeue from the queue.
1121 // Returns false if all producer streams appeared empty at the time they
1122 // were checked (so, the queue is likely but not guaranteed to be empty).
1123 // Never allocates. Thread-safe.
1124 template<typename U>
1125 bool try_dequeue(U& item)
1126 {
1127 // Instead of simply trying each producer in turn (which could cause needless contention on the first
1128 // producer), we score them heuristically.
1129 size_t nonEmptyCount = 0;
1130 ProducerBase* best = nullptr;
1131 size_t bestSize = 0;
1132 for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1133 auto size = ptr->size_approx();
1134 if (size > 0) {
1135 if (size > bestSize) {
1136 bestSize = size;
1137 best = ptr;
1138 }
1139 ++nonEmptyCount;
1140 }
1141 }
1142
1143 // If there was at least one non-empty queue but it appears empty at the time
1144 // we try to dequeue from it, we need to make sure every queue's been tried
1145 if (nonEmptyCount > 0) {
1146 if ((details::likely)(best->dequeue(item))) {
1147 return true;
1148 }
1149 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1150 if (ptr != best && ptr->dequeue(item)) {
1151 return true;
1152 }
1153 }
1154 }
1155 return false;
1156 }
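// Usage sketch (illustrative, not part of concurrentqueue.h): a typical polling consumer built
// on try_dequeue. Remember that a false return only means the queue *appeared* empty at that
// moment. `q`, `running` and `process` are hypothetical.
//
//     int item;
//     while (running.load(std::memory_order_relaxed)) {
//         if (q.try_dequeue(item)) {
//             process(item);
//         } else {
//             std::this_thread::yield();
//         }
//     }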
1157
1158 // Attempts to dequeue from the queue.
1159 // Returns false if all producer streams appeared empty at the time they
1160 // were checked (so, the queue is likely but not guaranteed to be empty).
1161 // This differs from the try_dequeue(item) method in that this one does
1162 // not attempt to reduce contention by interleaving the order that producer
1163 // streams are dequeued from. So, using this method can reduce overall throughput
1164 // under contention, but will give more predictable results in single-threaded
1165 // consumer scenarios. This is mostly only useful for internal unit tests.
1166 // Never allocates. Thread-safe.
1167 template<typename U>
1168 bool try_dequeue_non_interleaved(U& item)
1169 {
1170 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1171 if (ptr->dequeue(item)) {
1172 return true;
1173 }
1174 }
1175 return false;
1176 }
1177
1178 // Attempts to dequeue from the queue using an explicit consumer token.
1179 // Returns false if all producer streams appeared empty at the time they
1180 // were checked (so, the queue is likely but not guaranteed to be empty).
1181 // Never allocates. Thread-safe.
1182 template<typename U>
1183 bool try_dequeue(consumer_token_t& token, U& item)
1184 {
1185 // The idea is roughly as follows:
1186 // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1187 // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1188 // If there are no items where you're supposed to be, keep moving until you find a producer with some items
1189 // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
1190
1191 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1192 if (!update_current_producer_after_rotation(token)) {
1193 return false;
1194 }
1195 }
1196
1197 // If there was at least one non-empty queue but it appears empty at the time
1198 // we try to dequeue from it, we need to make sure every queue's been tried
1199 if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1200 if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1201 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1202 }
1203 return true;
1204 }
1205
1206 auto tail = producerListTail.load(std::memory_order_acquire);
1207 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1208 if (ptr == nullptr) {
1209 ptr = tail;
1210 }
1211 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1212 if (ptr->dequeue(item)) {
1213 token.currentProducer = ptr;
1214 token.itemsConsumedFromCurrent = 1;
1215 return true;
1216 }
1217 ptr = ptr->next_prod();
1218 if (ptr == nullptr) {
1219 ptr = tail;
1220 }
1221 }
1222 return false;
1223 }
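// Usage sketch (illustrative, not part of concurrentqueue.h): a dedicated consumer thread using
// a ConsumerToken so it participates in the rotation scheme described above. `q` and `process`
// are hypothetical.
//
//     moodycamel::ConsumerToken ctok(q);
//     int item;
//     while (q.try_dequeue(ctok, item)) {
//         process(item);
//     }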
1224
1225 // Attempts to dequeue several elements from the queue.
1226 // Returns the number of items actually dequeued.
1227 // Returns 0 if all producer streams appeared empty at the time they
1228 // were checked (so, the queue is likely but not guaranteed to be empty).
1229 // Never allocates. Thread-safe.
1230 template<typename It>
1231 size_t try_dequeue_bulk(It itemFirst, size_t max)
1232 {
1233 size_t count = 0;
1234 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1235 count += ptr->dequeue_bulk(itemFirst, max - count);
1236 if (count == max) {
1237 break;
1238 }
1239 }
1240 return count;
1241 }
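// Usage sketch (illustrative, not part of concurrentqueue.h): draining up to 32 items per call
// with the tokenless bulk method above. `q` and `process` are hypothetical.
//
//     int buffer[32];
//     std::size_t count = q.try_dequeue_bulk(buffer, 32);
//     for (std::size_t i = 0; i != count; ++i) {
//         process(buffer[i]);
//     }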
1242
1243 // Attempts to dequeue several elements from the queue using an explicit consumer token.
1244 // Returns the number of items actually dequeued.
1245 // Returns 0 if all producer streams appeared empty at the time they
1246 // were checked (so, the queue is likely but not guaranteed to be empty).
1247 // Never allocates. Thread-safe.
1248 template<typename It>
1249 size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1250 {
1251 if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1252 if (!update_current_producer_after_rotation(token)) {
1253 return 0;
1254 }
1255 }
1256
1257 size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1258 if (count == max) {
1259 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1260 globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1261 }
1262 return max;
1263 }
1264 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1265 max -= count;
1266
1267 auto tail = producerListTail.load(std::memory_order_acquire);
1268 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1269 if (ptr == nullptr) {
1270 ptr = tail;
1271 }
1272 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1273 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1274 count += dequeued;
1275 if (dequeued != 0) {
1276 token.currentProducer = ptr;
1277 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1278 }
1279 if (dequeued == max) {
1280 break;
1281 }
1282 max -= dequeued;
1283 ptr = ptr->next_prod();
1284 if (ptr == nullptr) {
1285 ptr = tail;
1286 }
1287 }
1288 return count;
1289 }
1290
1291
1292
1293 // Attempts to dequeue from a specific producer's inner queue.
1294 // If you happen to know which producer you want to dequeue from, this
1295 // is significantly faster than using the general-case try_dequeue methods.
1296 // Returns false if the producer's queue appeared empty at the time it
1297 // was checked (so, the queue is likely but not guaranteed to be empty).
1298 // Never allocates. Thread-safe.
1299 template<typename U>
1300 inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1301 {
1302 return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1303 }
1304
1305 // Attempts to dequeue several elements from a specific producer's inner queue.
1306 // Returns the number of items actually dequeued.
1307 // If you happen to know which producer you want to dequeue from, this
1308 // is significantly faster than using the general-case try_dequeue methods.
1309 // Returns 0 if the producer's queue appeared empty at the time it
1310 // was checked (so, the queue is likely but not guaranteed to be empty).
1311 // Never allocates. Thread-safe.
1312 template<typename It>
1313 inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1314 {
1315 return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1316 }
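// Usage sketch (illustrative, not part of concurrentqueue.h): when a consumer is paired with a
// known producer, dequeueing straight from that producer's sub-queue via its token is the
// fastest path. `q` is hypothetical.
//
//     moodycamel::ProducerToken ptok(q);       // owned by the producing thread
//     q.enqueue(ptok, 42);
//     int item;
//     bool ok = q.try_dequeue_from_producer(ptok, item);   // item == 42 if ok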
1317
1318
1319 // Returns an estimate of the total number of elements currently in the queue. This
1320 // estimate is only accurate if the queue has completely stabilized before it is called
1321 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1322 // visible on the calling thread, and no further operations start while this method is
1323 // being called).
1324 // Thread-safe.
1325 size_t size_approx() const
1326 {
1327 size_t size = 0;
1328 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1329 size += ptr->size_approx();
1330 }
1331 return size;
1332 }
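// Editor's note (not part of the original header): because size_approx() merely
// sums per-producer estimates, treat the result as a hint. A minimal sketch:
//
//     moodycamel::ConcurrentQueue<int> q;
//     q.enqueue(42);
//     // Prints 1 if no other thread is touching the queue; under concurrent
//     // enqueues/dequeues the value may lag behind the true count.
//     std::printf("%zu\n", q.size_approx());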
1333
1334
1335 // Returns true if the underlying atomic variables used by
1336 // the queue are lock-free (they should be on most platforms).
1337 // Thread-safe.
1338 static constexpr bool is_lock_free()
1339 {
1340 return
1341 details::static_is_lock_free<bool>::value == 2 &&
1342 details::static_is_lock_free<size_t>::value == 2 &&
1343 details::static_is_lock_free<std::uint32_t>::value == 2 &&
1344 details::static_is_lock_free<index_t>::value == 2 &&
1345 details::static_is_lock_free<void*>::value == 2 &&
1346 details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
1347 }
1348
1349
1350private:
1351 friend struct ProducerToken;
1352 friend struct ConsumerToken;
1353 struct ExplicitProducer;
1354 friend struct ExplicitProducer;
1355 struct ImplicitProducer;
1356 friend struct ImplicitProducer;
1358
1359 enum AllocationMode { CanAlloc, CannotAlloc };
1360
1361
1363 // Queue methods
1365
1366 template<AllocationMode canAlloc, typename U>
1367 inline bool inner_enqueue(producer_token_t const& token, U&& element)
1368 {
1369 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1370 }
1371
1372 template<AllocationMode canAlloc, typename U>
1373 inline bool inner_enqueue(U&& element)
1374 {
1375 auto producer = get_or_add_implicit_producer();
1376 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1377 }
1378
1379 template<AllocationMode canAlloc, typename It>
1380 inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1381 {
1382 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1383 }
1384
1385 template<AllocationMode canAlloc, typename It>
1386 inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1387 {
1388 auto producer = get_or_add_implicit_producer();
1389 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1390 }
1391
1392 inline bool update_current_producer_after_rotation(consumer_token_t& token)
1393 {
1394 // Ah, there's been a rotation, figure out where we should be!
1395 auto tail = producerListTail.load(std::memory_order_acquire);
1396 if (token.desiredProducer == nullptr && tail == nullptr) {
1397 return false;
1398 }
1399 auto prodCount = producerCount.load(std::memory_order_relaxed);
1400 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1401 if ((details::unlikely)(token.desiredProducer == nullptr)) {
1402 // Aha, first time we're dequeueing anything.
1403 // Figure out our local position
1404 // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1405 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1406 token.desiredProducer = tail;
1407 for (std::uint32_t i = 0; i != offset; ++i) {
1408 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1409 if (token.desiredProducer == nullptr) {
1410 token.desiredProducer = tail;
1411 }
1412 }
1413 }
1414
1415 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1416 if (delta >= prodCount) {
1417 delta = delta % prodCount;
1418 }
1419 for (std::uint32_t i = 0; i != delta; ++i) {
1420 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1421 if (token.desiredProducer == nullptr) {
1422 token.desiredProducer = tail;
1423 }
1424 }
1425
1426 token.lastKnownGlobalOffset = globalOffset;
1427 token.currentProducer = token.desiredProducer;
1428 token.itemsConsumedFromCurrent = 0;
1429 return true;
1430 }
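// Editor's note (worked example, not part of the original source): with
// prodCount == 3 and token.initialOffset == 4, the first-time placement above
// walks prodCount - 1 - (initialOffset % prodCount) == 3 - 1 - 1 == 1 producer
// from the list tail. If the global offset has since advanced by delta == 5
// relative to token.lastKnownGlobalOffset, the token walks a further
// 5 % 3 == 2 producers, wrapping back to the tail whenever next_prod()
// returns nullptr.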
1431
1432
1434 // Free list
1436
1437 template <typename N>
1438 struct FreeListNode
1439 {
1440 FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
1441
1442 std::atomic<std::uint32_t> freeListRefs;
1443 std::atomic<N*> freeListNext;
1444 };
1445
1446 // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1447 // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1448 // speedy under low contention.
1449 template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1450 struct FreeList
1451 {
1452 FreeList() : freeListHead(nullptr) { }
1453 FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
1454 void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1455
1456 FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1457 FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1458
1459 inline void add(N* node)
1460 {
1461#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1462 debug::DebugLock lock(mutex);
1463#endif
1464 // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1465 // set it using a fetch_add
1466 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1467 // Oh look! We were the last ones referencing this node, and we know
1468 // we want to add it to the free list, so let's do it!
1469 add_knowing_refcount_is_zero(node);
1470 }
1471 }
1472
1473 inline N* try_get()
1474 {
1475#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1476 debug::DebugLock lock(mutex);
1477#endif
1478 auto head = freeListHead.load(std::memory_order_acquire);
1479 while (head != nullptr) {
1480 auto prevHead = head;
1481 auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1482 if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1483 head = freeListHead.load(std::memory_order_acquire);
1484 continue;
1485 }
1486
1487 // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1488 // next and not worry about it changing between now and the time we do the CAS
1489 auto next = head->freeListNext.load(std::memory_order_relaxed);
1490 if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1491 // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1492 // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1493 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1494
1495 // Decrease refcount twice, once for our ref, and once for the list's ref
1496 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1497 return head;
1498 }
1499
1500 // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1501 // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1502 // count decrement happens-after the CAS on the head.
1503 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1504 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1505 add_knowing_refcount_is_zero(prevHead);
1506 }
1507 }
1508
1509 return nullptr;
1510 }
1511
1512 // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1513 N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1514
1515 private:
1516 inline void add_knowing_refcount_is_zero(N* node)
1517 {
1518 // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1519 // only one copy of this method per node at a time, i.e. the single-thread case), we know
1520 // we can safely change the next pointer of the node; however, once the refcount is back above
1521 // zero, then other threads could increase it (happens under heavy contention, when the refcount
1522 // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1523 // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1524 // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1525 // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1526 auto head = freeListHead.load(std::memory_order_relaxed);
1527 while (true) {
1528 node->freeListNext.store(head, std::memory_order_relaxed);
1529 node->freeListRefs.store(1, std::memory_order_release);
1530 if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1531 // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1532 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1533 continue;
1534 }
1535 }
1536 return;
1537 }
1538 }
1539
1540 private:
1541 // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1542 std::atomic<N*> freeListHead;
1543
1544 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1545 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1546
1547#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1548 debug::DebugMutex mutex;
1549#endif
1550 };
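// Editor's sketch (not part of the original source): how this internal FreeList
// is meant to be used. `MyNode` is hypothetical; the real user here is Block,
// via its freeListRefs/freeListNext members defined further below.
//
//     struct MyNode : FreeListNode<MyNode> { int payload; };
//
//     FreeList<MyNode> fl;
//     MyNode a, b;
//     fl.add(&a);                 // links the node once its refcount drops to zero
//     fl.add(&b);
//     MyNode* n = fl.try_get();   // pops some node (order unspecified) or nullptr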
1551
1552
1554 // Block
1556
1557 enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1558
1559 struct Block
1560 {
1561 Block()
1562 : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), dynamicallyAllocated(true)
1563 {
1564#ifdef MCDBGQ_TRACKMEM
1565 owner = nullptr;
1566#endif
1567 }
1568
1569 template<InnerQueueContext context>
1570 inline bool is_empty() const
1571 {
1572 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1573 // Check flags
1574 for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1575 if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1576 return false;
1577 }
1578 }
1579
1580 // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1581 std::atomic_thread_fence(std::memory_order_acquire);
1582 return true;
1583 }
1584 else {
1585 // Check counter
1586 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1587 std::atomic_thread_fence(std::memory_order_acquire);
1588 return true;
1589 }
1590 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1591 return false;
1592 }
1593 }
1594
1595 // Returns true if the block is now empty (does not apply in explicit context)
1596 template<InnerQueueContext context>
1597 inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
1598 {
1599 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1600 // Set flag
1601 assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
1602 emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
1603 return false;
1604 }
1605 else {
1606 // Increment counter
1607 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1608 assert(prevVal < BLOCK_SIZE);
1609 return prevVal == BLOCK_SIZE - 1;
1610 }
1611 }
1612
1613 // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1614 // Returns true if the block is now empty (does not apply in explicit context).
1615 template<InnerQueueContext context>
1616 inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
1617 {
1618 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1619 // Set flags
1620 std::atomic_thread_fence(std::memory_order_release);
1621 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1622 for (size_t j = 0; j != count; ++j) {
1623 assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1624 emptyFlags[i + j].store(true, std::memory_order_relaxed);
1625 }
1626 return false;
1627 }
1628 else {
1629 // Increment counter
1630 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1631 assert(prevVal + count <= BLOCK_SIZE);
1632 return prevVal + count == BLOCK_SIZE;
1633 }
1634 }
1635
1636 template<InnerQueueContext context>
1637 inline void set_all_empty()
1638 {
1639 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1640 // Set all flags
1641 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1642 emptyFlags[i].store(true, std::memory_order_relaxed);
1643 }
1644 }
1645 else {
1646 // Reset counter
1647 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1648 }
1649 }
1650
1651 template<InnerQueueContext context>
1652 inline void reset_empty()
1653 {
1654 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1655 // Reset flags
1656 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1657 emptyFlags[i].store(false, std::memory_order_relaxed);
1658 }
1659 }
1660 else {
1661 // Reset counter
1662 elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1663 }
1664 }
1665
1666 inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1667 inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1668
1669 private:
1670 static_assert(std::alignment_of<T>::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time");
1671 MOODYCAMEL_ALIGNED_TYPE_LIKE(char[sizeof(T) * BLOCK_SIZE], T) elements;
1672 public:
1673 Block* next;
1674 std::atomic<size_t> elementsCompletelyDequeued;
1675 std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1676 public:
1677 std::atomic<std::uint32_t> freeListRefs;
1678 std::atomic<Block*> freeListNext;
1679 bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1680
1681#ifdef MCDBGQ_TRACKMEM
1682 void* owner;
1683#endif
1684 };
1685 static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
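// Editor's note (not part of the original source): Block tracks emptiness in one
// of two ways, selected by InnerQueueContext at each call site. In the explicit
// context with BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD it flips one
// atomic flag per slot (emptyFlags); otherwise it counts completed dequeues, and
// set_empty returns true exactly when elementsCompletelyDequeued reaches
// BLOCK_SIZE. For example, with BLOCK_SIZE == 32 in the implicit context, the
// 32nd set_empty call on a block returns true and the block can be recycled.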
1686
1687
1688#ifdef MCDBGQ_TRACKMEM
1689public:
1690 struct MemStats;
1691private:
1692#endif
1693
1695 // Producer base
1697
1698 struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
1699 {
1700 ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
1701 tailIndex(0),
1702 headIndex(0),
1703 dequeueOptimisticCount(0),
1704 dequeueOvercommit(0),
1705 tailBlock(nullptr),
1706 isExplicit(isExplicit_),
1707 parent(parent_)
1708 {
1709 }
1710
1711 virtual ~ProducerBase() { }
1712
1713 template<typename U>
1714 inline bool dequeue(U& element)
1715 {
1716 if (isExplicit) {
1717 return static_cast<ExplicitProducer*>(this)->dequeue(element);
1718 }
1719 else {
1720 return static_cast<ImplicitProducer*>(this)->dequeue(element);
1721 }
1722 }
1723
1724 template<typename It>
1725 inline size_t dequeue_bulk(It& itemFirst, size_t max)
1726 {
1727 if (isExplicit) {
1728 return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1729 }
1730 else {
1731 return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1732 }
1733 }
1734
1735 inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1736
1737 inline size_t size_approx() const
1738 {
1739 auto tail = tailIndex.load(std::memory_order_relaxed);
1740 auto head = headIndex.load(std::memory_order_relaxed);
1741 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1742 }
1743
1744 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1745 protected:
1746 std::atomic<index_t> tailIndex; // Where to enqueue to next
1747 std::atomic<index_t> headIndex; // Where to dequeue from next
1748
1749 std::atomic<index_t> dequeueOptimisticCount;
1750 std::atomic<index_t> dequeueOvercommit;
1751
1752 Block* tailBlock;
1753
1754 public:
1755 bool isExplicit;
1756 ConcurrentQueue* parent;
1757
1758 protected:
1759#ifdef MCDBGQ_TRACKMEM
1760 friend struct MemStats;
1761#endif
1762 };
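// Editor's note (worked example, not part of the original source): size_approx()
// above relies on details::circular_less_than, which handles index wrap-around.
// For illustration, pretend index_t were 8 bits wide: with head == 250 and
// tail == 4 (tail has wrapped), circular_less_than(250, 4) is true and
// static_cast<size_t>(tail - head) evaluates to 10 by unsigned wrap-around,
// which is the correct element count.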
1763
1764
1766 // Explicit queue
1768
1769 struct ExplicitProducer : public ProducerBase
1770 {
1771 explicit ExplicitProducer(ConcurrentQueue* parent_) :
1772 ProducerBase(parent_, true),
1773 blockIndex(nullptr),
1774 pr_blockIndexSlotsUsed(0),
1775 pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
1776 pr_blockIndexFront(0),
1777 pr_blockIndexEntries(nullptr),
1778 pr_blockIndexRaw(nullptr)
1779 {
1780 size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
1781 if (poolBasedIndexSize > pr_blockIndexSize) {
1782 pr_blockIndexSize = poolBasedIndexSize;
1783 }
1784
1785 new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1786 }
1787
1788 ~ExplicitProducer()
1789 {
1790 // Destruct any elements not yet dequeued.
1791 // Since we're in the destructor, we can assume all elements
1792 // are either completely dequeued or completely not (no halfways).
1793 if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1794 // First find the block that's partially dequeued, if any
1795 Block* halfDequeuedBlock = nullptr;
1796 if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1797 // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1798 // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1799 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1800 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
1801 i = (i + 1) & (pr_blockIndexSize - 1);
1802 }
1803 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
1804 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1805 }
1806
1807 // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1808 auto block = this->tailBlock;
1809 do {
1810 block = block->next;
1811 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1812 continue;
1813 }
1814
1815 size_t i = 0; // Offset into block
1816 if (block == halfDequeuedBlock) {
1817 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1818 }
1819
1820 // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1821 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1822 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1823 (*block)[i++]->~T();
1824 }
1825 } while (block != this->tailBlock);
1826 }
1827
1828 // Destroy all blocks that we own
1829 if (this->tailBlock != nullptr) {
1830 auto block = this->tailBlock;
1831 do {
1832 auto nextBlock = block->next;
1833 this->parent->add_block_to_free_list(block);
1834 block = nextBlock;
1835 } while (block != this->tailBlock);
1836 }
1837
1838 // Destroy the block indices
1839 auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
1840 while (header != nullptr) {
1841 auto prev = static_cast<BlockIndexHeader*>(header->prev);
1842 header->~BlockIndexHeader();
1843 (Traits::free)(header);
1844 header = prev;
1845 }
1846 }
1847
1848 template<AllocationMode allocMode, typename U>
1849 inline bool enqueue(U&& element)
1850 {
1851 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1852 index_t newTailIndex = 1 + currentTailIndex;
1853 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1854 // We reached the end of a block, start a new one
1855 auto startBlock = this->tailBlock;
1856 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1857 if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1858 // We can re-use the block ahead of us, it's empty!
1859 this->tailBlock = this->tailBlock->next;
1860 this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1861
1862 // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1863 // last block from it first -- except instead of removing then adding, we can just overwrite).
1864 // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1865 // it would have been re-attempted when adding the first block to the queue; since there is such
1866 // a block, a block index must have been successfully allocated.
1867 }
1868 else {
1869 // Whatever head value we see here is >= the last value we saw here (relatively),
1870 // and <= its current value. Since we have the most recent tail, the head must be
1871 // <= to it.
1872 auto head = this->headIndex.load(std::memory_order_relaxed);
1873 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1874 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1875 || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1876 // We can't enqueue in another block because there's not enough leeway -- the
1877 // tail could surpass the head by the time the block fills up! (Or we'll exceed
1878 // the size limit, if the second part of the condition was true.)
1879 return false;
1880 }
1881 // We're going to need a new block; check that the block index has room
1882 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1883 // Hmm, the circular block index is already full -- we'll need
1884 // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1885 // the initial allocation failed in the constructor.
1886
1887 MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
1888 return false;
1889 }
1890 else if (!new_block_index(pr_blockIndexSlotsUsed)) {
1891 return false;
1892 }
1893 }
1894
1895 // Insert a new block in the circular linked list
1896 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1897 if (newBlock == nullptr) {
1898 return false;
1899 }
1900#ifdef MCDBGQ_TRACKMEM
1901 newBlock->owner = this;
1902#endif
1903 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1904 if (this->tailBlock == nullptr) {
1905 newBlock->next = newBlock;
1906 }
1907 else {
1908 newBlock->next = this->tailBlock->next;
1909 this->tailBlock->next = newBlock;
1910 }
1911 this->tailBlock = newBlock;
1912 ++pr_blockIndexSlotsUsed;
1913 }
1914
1915 MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
1916 // The constructor may throw. We want the element not to appear in the queue in
1917 // that case (without corrupting the queue):
1918 MOODYCAMEL_TRY {
1919 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1920 }
1921 MOODYCAMEL_CATCH (...) {
1922 // Revert change to the current block, but leave the new block available
1923 // for next time
1924 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1925 this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1926 MOODYCAMEL_RETHROW;
1927 }
1928 }
1929 else {
1930 (void)startBlock;
1931 (void)originalBlockIndexSlotsUsed;
1932 }
1933
1934 // Add block to block index
1935 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1936 entry.base = currentTailIndex;
1937 entry.block = this->tailBlock;
1938 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
1939 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1940
1941 MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
1942 this->tailIndex.store(newTailIndex, std::memory_order_release);
1943 return true;
1944 }
1945 }
1946
1947 // Enqueue
1948 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1949
1950 this->tailIndex.store(newTailIndex, std::memory_order_release);
1951 return true;
1952 }
1953
1954 template<typename U>
1955 bool dequeue(U& element)
1956 {
1957 auto tail = this->tailIndex.load(std::memory_order_relaxed);
1958 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
1959 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
1960 // Might be something to dequeue, let's give it a try
1961
1962 // Note that this if is purely for performance purposes in the common case when the queue is
1963 // empty and the values are eventually consistent -- we may enter here spuriously.
1964
1965 // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1966 // change them) and must be the same value at this point (inside the if) as when the if condition was
1967 // evaluated.
1968
1969 // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1970 // This ensures that whatever value we loaded into overcommit, the load of dequeueOptimisticCount in
1971 // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1972 // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1973 // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1974 // unfortunately that can't be shown to be correct using only the C++11 standard.
1975 // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1976 std::atomic_thread_fence(std::memory_order_acquire);
1977
1978 // Increment optimistic counter, then check if it went over the boundary
1979 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
1980
1981 // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1982 // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1983 // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1984 // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1985 // However, we can't assert this since both dequeueOptimisticCount and dequeueOvercommit may (independently)
1986 // overflow; in such a case, though, the logic still holds since the difference between the two is maintained.
1987
1988 // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1989 // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1990 // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1991 tail = this->tailIndex.load(std::memory_order_acquire);
1992 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1993 // Guaranteed to be at least one element to dequeue!
1994
1995 // Get the index. Note that since there's guaranteed to be at least one element, this
1996 // will never exceed tail. We need to do an acquire-release fence here since it's possible
1997 // that whatever condition got us to this point was for an earlier enqueued element (that
1998 // we already see the memory effects for), but that by the time we increment somebody else
1999 // has incremented it, and we need to see the memory effects for *that* element, which
2000 // in such a case is necessarily visible on the thread that incremented it in the first
2001 // place with the more current condition (they must have acquired a tail that is at least
2002 // as recent).
2003 auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2004
2005
2006 // Determine which block the element is in
2007
2008 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2009 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2010
2011 // We need to be careful here about subtracting and dividing because of index wrap-around.
2012 // When an index wraps, we need to preserve the sign of the offset when dividing it by the
2013 // block size (in order to get a correct signed block count offset in all cases):
2014 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2015 auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
2016 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
2017 auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
2018
2019 // Dequeue
2020 auto& el = *((*block)[index]);
2021 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2022 // Make sure the element is still fully dequeued and destroyed even if the assignment
2023 // throws
2024 struct Guard {
2025 Block* block;
2026 index_t index;
2027
2028 ~Guard()
2029 {
2030 (*block)[index]->~T();
2031 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2032 }
2033 } guard = { block, index };
2034
2035 element = std::move(el); // NOLINT
2036 }
2037 else {
2038 element = std::move(el); // NOLINT
2039 el.~T(); // NOLINT
2040 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
2041 }
2042
2043 return true;
2044 }
2045 else {
2046 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2047 this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
2048 }
2049 }
2050
2051 return false;
2052 }
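// Editor's note (worked numeric example, not part of the original source) for the
// optimistic-count scheme in dequeue() above: suppose tail == 10,
// dequeueOptimisticCount == 8 and dequeueOvercommit == 0. Two consumers race:
// both pass the first check (8 - 0 < 10), and fetch_add hands them
// myDequeueCount == 8 and 9. Both still satisfy myDequeueCount - overcommit < tail,
// so both claim a real slot via headIndex.fetch_add. Had tail been 9 instead, the
// consumer that drew 9 would fail the recheck and bump dequeueOvercommit by 1,
// keeping dequeueOptimisticCount - dequeueOvercommit in line with the number of
// elements actually taken.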
2053
2054 template<AllocationMode allocMode, typename It>
2055 bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count)
2056 {
2057 // First, we need to make sure we have enough room to enqueue all of the elements;
2058 // this means pre-allocating blocks and putting them in the block index (but only if
2059 // all the allocations succeeded).
2060 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2061 auto startBlock = this->tailBlock;
2062 auto originalBlockIndexFront = pr_blockIndexFront;
2063 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2064
2065 Block* firstAllocatedBlock = nullptr;
2066
2067 // Figure out how many blocks we'll need to allocate, and do so
2068 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2069 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
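// Editor's note (worked example, not part of the original source): with
// BLOCK_SIZE == 32, startTailIndex == 40 and count == 100, the bulk enqueue spans
// indices [40, 140). blockBaseDiff == (139 & ~31) - (39 & ~31) == 128 - 32 == 96,
// i.e. three additional 32-slot blocks are needed beyond the one already holding
// index 40, and currentTailIndex starts at 32, the base of that existing block.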
2070 if (blockBaseDiff > 0) {
2071 // Allocate as many blocks as possible from ahead
2072 while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2073 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2074 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2075
2076 this->tailBlock = this->tailBlock->next;
2077 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2078
2079 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2080 entry.base = currentTailIndex;
2081 entry.block = this->tailBlock;
2082 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2083 }
2084
2085 // Now allocate as many blocks as necessary from the block pool
2086 while (blockBaseDiff > 0) {
2087 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2088 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2089
2090 auto head = this->headIndex.load(std::memory_order_relaxed);
2091 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2092 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2093 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2094 MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
2095 // Failed to allocate, undo changes (but keep injected blocks)
2096 pr_blockIndexFront = originalBlockIndexFront;
2097 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2098 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2099 return false;
2100 }
2101 else if (full || !new_block_index(originalBlockIndexSlotsUsed)) {
2102 // Failed to allocate, undo changes (but keep injected blocks)
2103 pr_blockIndexFront = originalBlockIndexFront;
2104 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2105 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2106 return false;
2107 }
2108
2109 // pr_blockIndexFront is updated inside new_block_index, so we need to
2110 // update our fallback value too (since we keep the new index even if we
2111 // later fail)
2112 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2113 }
2114
2115 // Insert a new block in the circular linked list
2116 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2117 if (newBlock == nullptr) {
2118 pr_blockIndexFront = originalBlockIndexFront;
2119 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2120 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2121 return false;
2122 }
2123
2124#ifdef MCDBGQ_TRACKMEM
2125 newBlock->owner = this;
2126#endif
2127 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2128 if (this->tailBlock == nullptr) {
2129 newBlock->next = newBlock;
2130 }
2131 else {
2132 newBlock->next = this->tailBlock->next;
2133 this->tailBlock->next = newBlock;
2134 }
2135 this->tailBlock = newBlock;
2136 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2137
2138 ++pr_blockIndexSlotsUsed;
2139
2140 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2141 entry.base = currentTailIndex;
2142 entry.block = this->tailBlock;
2143 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2144 }
2145
2146 // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2147 // publish the new block index front
2148 auto block = firstAllocatedBlock;
2149 while (true) {
2150 block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2151 if (block == this->tailBlock) {
2152 break;
2153 }
2154 block = block->next;
2155 }
2156
2157 MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
2158 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2159 }
2160 }
2161
2162 // Enqueue, one block at a time
2163 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2164 currentTailIndex = startTailIndex;
2165 auto endBlock = this->tailBlock;
2166 this->tailBlock = startBlock;
2167 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2168 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2169 this->tailBlock = firstAllocatedBlock;
2170 }
2171 while (true) {
2172 index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2173 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2174 stopIndex = newTailIndex;
2175 }
2176 MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
2177 while (currentTailIndex != stopIndex) {
2178 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2179 }
2180 }
2181 else {
2182 MOODYCAMEL_TRY {
2183 while (currentTailIndex != stopIndex) {
2184 // Must use copy constructor even if move constructor is available
2185 // because we may have to revert if there's an exception.
2186 // Sorry about the horrible templated next line, but it was the only way
2187 // to disable moving *at compile time*, which is important because a type
2188 // may only define a (noexcept) move constructor, and so calls to the
2189 // cctor will not compile, even if they are in an if branch that will never
2190 // be executed
2191 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2192 ++currentTailIndex;
2193 ++itemFirst;
2194 }
2195 }
2196 MOODYCAMEL_CATCH (...) {
2197 // Oh dear, an exception's been thrown -- destroy the elements that
2198 // were enqueued so far and revert the entire bulk operation (we'll keep
2199 // any allocated blocks in our linked list for later, though).
2200 auto constructedStopIndex = currentTailIndex;
2201 auto lastBlockEnqueued = this->tailBlock;
2202
2203 pr_blockIndexFront = originalBlockIndexFront;
2204 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2205 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2206
2207 if (!details::is_trivially_destructible<T>::value) {
2208 auto block = startBlock;
2209 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2210 block = firstAllocatedBlock;
2211 }
2212 currentTailIndex = startTailIndex;
2213 while (true) {
2214 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2215 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2216 stopIndex = constructedStopIndex;
2217 }
2218 while (currentTailIndex != stopIndex) {
2219 (*block)[currentTailIndex++]->~T();
2220 }
2221 if (block == lastBlockEnqueued) {
2222 break;
2223 }
2224 block = block->next;
2225 }
2226 }
2227 MOODYCAMEL_RETHROW;
2228 }
2229 }
2230
2231 if (this->tailBlock == endBlock) {
2232 assert(currentTailIndex == newTailIndex);
2233 break;
2234 }
2235 this->tailBlock = this->tailBlock->next;
2236 }
2237
2238 MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
2239 if (firstAllocatedBlock != nullptr)
2240 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2241 }
2242
2243 this->tailIndex.store(newTailIndex, std::memory_order_release);
2244 return true;
2245 }
2246
2247 template<typename It>
2248 size_t dequeue_bulk(It& itemFirst, size_t max)
2249 {
2250 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2251 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2252 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2253 if (details::circular_less_than<size_t>(0, desiredCount)) {
2254 desiredCount = desiredCount < max ? desiredCount : max;
2255 std::atomic_thread_fence(std::memory_order_acquire);
2256
2257 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2258
2259 tail = this->tailIndex.load(std::memory_order_acquire);
2260 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2261 if (details::circular_less_than<size_t>(0, actualCount)) {
2262 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2263 if (actualCount < desiredCount) {
2264 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2265 }
2266
2267 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2268 // will never exceed tail.
2269 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2270
2271 // Determine which block the first element is in
2272 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2273 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2274
2275 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2276 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2277 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
2278 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2279
2280 // Iterate the blocks and dequeue
2281 auto index = firstIndex;
2282 do {
2283 auto firstIndexInBlock = index;
2284 index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2285 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2286 auto block = localBlockIndex->entries[indexIndex].block;
2287 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2288 while (index != endIndex) {
2289 auto& el = *((*block)[index]);
2290 *itemFirst++ = std::move(el);
2291 el.~T();
2292 ++index;
2293 }
2294 }
2295 else {
2296 MOODYCAMEL_TRY {
2297 while (index != endIndex) {
2298 auto& el = *((*block)[index]);
2299 *itemFirst = std::move(el);
2300 ++itemFirst;
2301 el.~T();
2302 ++index;
2303 }
2304 }
2305 MOODYCAMEL_CATCH (...) {
2306 // It's too late to revert the dequeue, but we can make sure that all
2307 // the dequeued objects are properly destroyed and the block index
2308 // (and empty count) are properly updated before we propagate the exception
2309 do {
2310 block = localBlockIndex->entries[indexIndex].block;
2311 while (index != endIndex) {
2312 (*block)[index++]->~T();
2313 }
2314 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2315 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2316
2317 firstIndexInBlock = index;
2318 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2319 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2320 } while (index != firstIndex + actualCount);
2321
2322 MOODYCAMEL_RETHROW;
2323 }
2324 }
2325 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2326 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2327 } while (index != firstIndex + actualCount);
2328
2329 return actualCount;
2330 }
2331 else {
2332 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2333 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2334 }
2335 }
2336
2337 return 0;
2338 }
2339
2340 private:
2341 struct BlockIndexEntry
2342 {
2343 index_t base;
2344 Block* block;
2345 };
2346
2347 struct BlockIndexHeader
2348 {
2349 size_t size;
2350 std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2351 BlockIndexEntry* entries;
2352 void* prev;
2353 };
2354
2355
2356 bool new_block_index(size_t numberOfFilledSlotsToExpose)
2357 {
2358 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2359
2360 // Create the new block
2361 pr_blockIndexSize <<= 1;
2362 auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2363 if (newRawPtr == nullptr) {
2364 pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2365 return false;
2366 }
2367
2368 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
2369
2370 // Copy in all the old indices, if any
2371 size_t j = 0;
2372 if (pr_blockIndexSlotsUsed != 0) {
2373 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2374 do {
2375 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2376 i = (i + 1) & prevBlockSizeMask;
2377 } while (i != pr_blockIndexFront);
2378 }
2379
2380 // Update everything
2381 auto header = new (newRawPtr) BlockIndexHeader;
2382 header->size = pr_blockIndexSize;
2383 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2384 header->entries = newBlockIndexEntries;
2385 header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2386
2387 pr_blockIndexFront = j;
2388 pr_blockIndexEntries = newBlockIndexEntries;
2389 pr_blockIndexRaw = newRawPtr;
2390 blockIndex.store(header, std::memory_order_release);
2391
2392 return true;
2393 }
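// Editor's note (not part of the original source): new_block_index() above doubles
// the circular index when it runs out of room. For example, an index of size 32
// with all 32 slots used grows to 64 entries; the 32 live entries are copied to
// slots 0..31 of the new array, pr_blockIndexFront becomes 32, and the old
// allocation stays on the `prev` chain (consumers may still be reading it) until
// the destructor frees the whole chain.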
2394
2395 private:
2396 std::atomic<BlockIndexHeader*> blockIndex;
2397
2398 // To be used by the producer only -- consumers must use the ones referenced by blockIndex
2399 size_t pr_blockIndexSlotsUsed;
2400 size_t pr_blockIndexSize;
2401 size_t pr_blockIndexFront; // Next slot (not current)
2402 BlockIndexEntry* pr_blockIndexEntries;
2403 void* pr_blockIndexRaw;
2404
2405#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2406 public:
2407 ExplicitProducer* nextExplicitProducer;
2408 private:
2409#endif
2410
2411#ifdef MCDBGQ_TRACKMEM
2412 friend struct MemStats;
2413#endif
2414 };
2415
2416
2418 // Implicit queue
2420
2421 struct ImplicitProducer : public ProducerBase
2422 {
2423 ImplicitProducer(ConcurrentQueue* parent_) :
2424 ProducerBase(parent_, false),
2425 nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
2426 blockIndex(nullptr)
2427 {
2428 new_block_index();
2429 }
2430
2431 ~ImplicitProducer()
2432 {
2433 // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2434 // completed already; this means that all undequeued elements are placed contiguously across
2435 // contiguous blocks, and that only the first and last remaining blocks can be only partially
2436 // empty (all other remaining blocks must be completely full).
2437
2438#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2439 // Unregister ourselves for thread termination notification
2440 if (!this->inactive.load(std::memory_order_relaxed)) {
2441 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2442 }
2443#endif
2444
2445 // Destroy all remaining elements!
2446 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2447 auto index = this->headIndex.load(std::memory_order_relaxed);
2448 Block* block = nullptr;
2449 assert(index == tail || details::circular_less_than(index, tail));
2450 bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed
2451 while (index != tail) {
2452 if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2453 if (block != nullptr) {
2454 // Free the old block
2455 this->parent->add_block_to_free_list(block);
2456 }
2457
2458 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2459 }
2460
2461 ((*block)[index])->~T();
2462 ++index;
2463 }
2464 // Even if the queue is empty, there's still one block that's not on the free list
2465 // (unless the head index reached the end of it, in which case the tail will be poised
2466 // to create a new block).
2467 if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2468 this->parent->add_block_to_free_list(this->tailBlock);
2469 }
2470
2471 // Destroy block index
2472 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2473 if (localBlockIndex != nullptr) {
2474 for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2475 localBlockIndex->index[i]->~BlockIndexEntry();
2476 }
2477 do {
2478 auto prev = localBlockIndex->prev;
2479 localBlockIndex->~BlockIndexHeader();
2480 (Traits::free)(localBlockIndex);
2481 localBlockIndex = prev;
2482 } while (localBlockIndex != nullptr);
2483 }
2484 }
2485
2486 template<AllocationMode allocMode, typename U>
2487 inline bool enqueue(U&& element)
2488 {
2489 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2490 index_t newTailIndex = 1 + currentTailIndex;
2491 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2492 // We reached the end of a block, start a new one
2493 auto head = this->headIndex.load(std::memory_order_relaxed);
2494 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2495 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2496 return false;
2497 }
2498#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2499 debug::DebugLock lock(mutex);
2500#endif
2501 // Find out where we'll be inserting this block in the block index
2502 BlockIndexEntry* idxEntry;
2503 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2504 return false;
2505 }
2506
2507 // Get ahold of a new block
2508 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2509 if (newBlock == nullptr) {
2510 rewind_block_index_tail();
2511 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2512 return false;
2513 }
2514#ifdef MCDBGQ_TRACKMEM
2515 newBlock->owner = this;
2516#endif
2517 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2518
2519 MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
2520 // May throw, try to insert now before we publish the fact that we have this new block
2521 MOODYCAMEL_TRY {
2522 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2523 }
2524 MOODYCAMEL_CATCH (...) {
2525 rewind_block_index_tail();
2526 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2527 this->parent->add_block_to_free_list(newBlock);
2528 MOODYCAMEL_RETHROW;
2529 }
2530 }
2531
2532 // Insert the new block into the index
2533 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2534
2535 this->tailBlock = newBlock;
2536
2537 MOODYCAMEL_CONSTEXPR_IF (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (static_cast<T*>(nullptr)) T(std::forward<U>(element)))) {
2538 this->tailIndex.store(newTailIndex, std::memory_order_release);
2539 return true;
2540 }
2541 }
2542
2543 // Enqueue
2544 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2545
2546 this->tailIndex.store(newTailIndex, std::memory_order_release);
2547 return true;
2548 }
2549
2550 template<typename U>
2551 bool dequeue(U& element)
2552 {
2553 // See ExplicitProducer::dequeue for rationale and explanation
2554 index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2555 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2556 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2557 std::atomic_thread_fence(std::memory_order_acquire);
2558
2559 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2560 tail = this->tailIndex.load(std::memory_order_acquire);
2561 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2562 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2563
2564 // Determine which block the element is in
2565 auto entry = get_block_index_entry_for_index(index);
2566
2567 // Dequeue
2568 auto block = entry->value.load(std::memory_order_relaxed);
2569 auto& el = *((*block)[index]);
2570
2571 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2572#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2573 // Note: Acquiring the mutex with every dequeue instead of only when a block
2574 // is released is very sub-optimal, but it is, after all, purely debug code.
2575 debug::DebugLock lock(producer->mutex);
2576#endif
2577 struct Guard {
2578 Block* block;
2579 index_t index;
2580 BlockIndexEntry* entry;
2581 ConcurrentQueue* parent;
2582
2583 ~Guard()
2584 {
2585 (*block)[index]->~T();
2586 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2587 entry->value.store(nullptr, std::memory_order_relaxed);
2588 parent->add_block_to_free_list(block);
2589 }
2590 }
2591 } guard = { block, index, entry, this->parent };
2592
2593 element = std::move(el); // NOLINT
2594 }
2595 else {
2596 element = std::move(el); // NOLINT
2597 el.~T(); // NOLINT
2598
2599 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2600 {
2601#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2602 debug::DebugLock lock(mutex);
2603#endif
2604 // Add the block back into the global free pool (and remove from block index)
2605 entry->value.store(nullptr, std::memory_order_relaxed);
2606 }
2607 this->parent->add_block_to_free_list(block); // releases the above store
2608 }
2609 }
2610
2611 return true;
2612 }
2613 else {
2614 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2615 }
2616 }
2617
2618 return false;
2619 }
2620
2621#ifdef _MSC_VER
2622#pragma warning(push)
2623#pragma warning(disable: 4706) // assignment within conditional expression
2624#endif
2625 template<AllocationMode allocMode, typename It>
2626 bool enqueue_bulk(It itemFirst, size_t count)
2627 {
2628 // First, we need to make sure we have enough room to enqueue all of the elements;
2629 // this means pre-allocating blocks and putting them in the block index (but only if
2630 // all the allocations succeeded).
2631
2632 // Note that the tailBlock we start off with may not be owned by us any more;
2633 // this happens if it was filled up exactly to the top (setting tailIndex to
2634 // the first index of the next block which is not yet allocated), then dequeued
2635 // completely (putting it on the free list) before we enqueue again.
2636
2637 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2638 auto startBlock = this->tailBlock;
2639 Block* firstAllocatedBlock = nullptr;
2640 auto endBlock = this->tailBlock;
2641
2642 // Figure out how many blocks we'll need to allocate, and do so
2643 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2644 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2645 if (blockBaseDiff > 0) {
2646#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2647 debug::DebugLock lock(mutex);
2648#endif
2649 do {
2650 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2651 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2652
2653 // Find out where we'll be inserting this block in the block index
2654 BlockIndexEntry* idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2655 Block* newBlock;
2656 bool indexInserted = false;
2657 auto head = this->headIndex.load(std::memory_order_relaxed);
2658 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2659 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2660
2661 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2662 // Index allocation or block allocation failed; revert any other allocations
2663 // and index insertions done so far for this operation
2664 if (indexInserted) {
2665 rewind_block_index_tail();
2666 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2667 }
2668 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2669 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2670 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2671 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2672 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2673 rewind_block_index_tail();
2674 }
2675 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2676 this->tailBlock = startBlock;
2677
2678 return false;
2679 }
2680
2681#ifdef MCDBGQ_TRACKMEM
2682 newBlock->owner = this;
2683#endif
2684 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2685 newBlock->next = nullptr;
2686
2687 // Insert the new block into the index
2688 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2689
2690 // Store the chain of blocks so that we can undo if later allocations fail,
2691 // and so that we can find the blocks when we do the actual enqueueing
2692 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2693 assert(this->tailBlock != nullptr);
2694 this->tailBlock->next = newBlock;
2695 }
2696 this->tailBlock = newBlock;
2697 endBlock = newBlock;
2698 firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2699 } while (blockBaseDiff > 0);
2700 }
2701
2702 // Enqueue, one block at a time
2703 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2704 currentTailIndex = startTailIndex;
2705 this->tailBlock = startBlock;
2706 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2707 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2708 this->tailBlock = firstAllocatedBlock;
2709 }
2710 while (true) {
2711 index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2712 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2713 stopIndex = newTailIndex;
2714 }
2715 MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
2716 while (currentTailIndex != stopIndex) {
2717 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2718 }
2719 }
2720 else {
2721 MOODYCAMEL_TRY {
2722 while (currentTailIndex != stopIndex) {
2723 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2724 ++currentTailIndex;
2725 ++itemFirst;
2726 }
2727 }
2728 MOODYCAMEL_CATCH (...) {
2729 auto constructedStopIndex = currentTailIndex;
2730 auto lastBlockEnqueued = this->tailBlock;
2731
2732 if (!details::is_trivially_destructible<T>::value) {
2733 auto block = startBlock;
2734 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2735 block = firstAllocatedBlock;
2736 }
2737 currentTailIndex = startTailIndex;
2738 while (true) {
2739 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2740 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2741 stopIndex = constructedStopIndex;
2742 }
2743 while (currentTailIndex != stopIndex) {
2744 (*block)[currentTailIndex++]->~T();
2745 }
2746 if (block == lastBlockEnqueued) {
2747 break;
2748 }
2749 block = block->next;
2750 }
2751 }
2752
2753 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2754 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2755 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2756 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2757 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2758 rewind_block_index_tail();
2759 }
2760 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2761 this->tailBlock = startBlock;
2762 MOODYCAMEL_RETHROW;
2763 }
2764 }
2765
2766 if (this->tailBlock == endBlock) {
2767 assert(currentTailIndex == newTailIndex);
2768 break;
2769 }
2770 this->tailBlock = this->tailBlock->next;
2771 }
2772 this->tailIndex.store(newTailIndex, std::memory_order_release);
2773 return true;
2774 }
2775#ifdef _MSC_VER
2776#pragma warning(pop)
2777#endif
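 // Illustrative sketch (not from the original source): the bulk path above backs the
 // token-less ConcurrentQueue::enqueue_bulk overload. A minimal usage sketch, assuming only
 // the public API declared in this header; the function name below is hypothetical.
#if 0
 #include "concurrentqueue.h"
 #include <vector>

 void example_bulk_enqueue()
 {
     moodycamel::ConcurrentQueue<int> q;
     std::vector<int> items{1, 2, 3, 4, 5};
     // Enqueues all five items in one operation; the calling thread's implicit producer
     // (the code above) pre-allocates the needed blocks before committing anything.
     bool ok = q.enqueue_bulk(items.data(), items.size());
     (void)ok; // only fails if memory allocation fails
 }
#endif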
2778
2779 template<typename It>
2780 size_t dequeue_bulk(It& itemFirst, size_t max)
2781 {
2782 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2783 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2784 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2785 if (details::circular_less_than<size_t>(0, desiredCount)) {
2786 desiredCount = desiredCount < max ? desiredCount : max;
2787 std::atomic_thread_fence(std::memory_order_acquire);
2788
2789 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2790
2791 tail = this->tailIndex.load(std::memory_order_acquire);
2792 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2793 if (details::circular_less_than<size_t>(0, actualCount)) {
2794 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2795 if (actualCount < desiredCount) {
2796 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2797 }
2798
2799 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2800 // will never exceed tail.
2801 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2802
2803 // Iterate the blocks and dequeue
2804 auto index = firstIndex;
2805 BlockIndexHeader* localBlockIndex;
2806 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2807 do {
2808 auto blockStartIndex = index;
2809 index_t endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2810 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2811
2812 auto entry = localBlockIndex->index[indexIndex];
2813 auto block = entry->value.load(std::memory_order_relaxed);
2814 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2815 while (index != endIndex) {
2816 auto& el = *((*block)[index]);
2817 *itemFirst++ = std::move(el);
2818 el.~T();
2819 ++index;
2820 }
2821 }
2822 else {
2823 MOODYCAMEL_TRY {
2824 while (index != endIndex) {
2825 auto& el = *((*block)[index]);
2826 *itemFirst = std::move(el);
2827 ++itemFirst;
2828 el.~T();
2829 ++index;
2830 }
2831 }
2832 MOODYCAMEL_CATCH (...) {
2833 do {
2834 entry = localBlockIndex->index[indexIndex];
2835 block = entry->value.load(std::memory_order_relaxed);
2836 while (index != endIndex) {
2837 (*block)[index++]->~T();
2838 }
2839
2840 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2841#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2842 debug::DebugLock lock(mutex);
2843#endif
2844 entry->value.store(nullptr, std::memory_order_relaxed);
2845 this->parent->add_block_to_free_list(block);
2846 }
2847 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2848
2849 blockStartIndex = index;
2850 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2851 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2852 } while (index != firstIndex + actualCount);
2853
2854 MOODYCAMEL_RETHROW;
2855 }
2856 }
2857 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2858 {
2859#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2860 debug::DebugLock lock(mutex);
2861#endif
2862 // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2863 // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2864 entry->value.store(nullptr, std::memory_order_relaxed);
2865 }
2866 this->parent->add_block_to_free_list(block); // releases the above store
2867 }
2868 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2869 } while (index != firstIndex + actualCount);
2870
2871 return actualCount;
2872 }
2873 else {
2874 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2875 }
2876 }
2877
2878 return 0;
2879 }
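 // Illustrative sketch (not from the original source): the accounting at the top of
 // dequeue_bulk can be read as plain integer arithmetic. The struct and field names below
 // are hypothetical stand-ins for tailIndex, dequeueOptimisticCount and dequeueOvercommit;
 // the wrap-around (circular_less_than) guard is omitted for brevity.
#if 0
 #include <algorithm>
 #include <cstddef>

 struct DequeueMath {
     std::size_t tail;        // tailIndex: next index a producer will write
     std::size_t optimistic;  // dequeueOptimisticCount: dequeue tickets handed out so far
     std::size_t overcommit;  // dequeueOvercommit: tickets that found nothing to take

     // How many elements this consumer may try to claim, capped at `max`.
     std::size_t desired(std::size_t max) const {
         std::size_t available = tail - (optimistic - overcommit);
         return std::min(available, max);
     }
 };
#endif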
2880
2881 private:
2882 // The block size must be > 1, so any number with the low bit set is an invalid block base index
2883 static const index_t INVALID_BLOCK_BASE = 1;
2884
2885 struct BlockIndexEntry
2886 {
2887 std::atomic<index_t> key;
2888 std::atomic<Block*> value;
2889 };
2890
2891 struct BlockIndexHeader
2892 {
2893 size_t capacity;
2894 std::atomic<size_t> tail;
2895 BlockIndexEntry* entries;
2896 BlockIndexEntry** index;
2897 BlockIndexHeader* prev;
2898 };
2899
2900 template<AllocationMode allocMode>
2901 inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
2902 {
2903 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
2904 if (localBlockIndex == nullptr) {
2905 return false; // this can happen if new_block_index failed in the constructor
2906 }
2907 size_t newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2908 idxEntry = localBlockIndex->index[newTail];
2909 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2910 idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2911
2912 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2913 localBlockIndex->tail.store(newTail, std::memory_order_release);
2914 return true;
2915 }
2916
2917 // No room in the old block index, try to allocate another one!
2918 MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
2919 return false;
2920 }
2921 else if (!new_block_index()) {
2922 return false;
2923 }
2924 else {
2925 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2926 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2927 idxEntry = localBlockIndex->index[newTail];
2928 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2929 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2930 localBlockIndex->tail.store(newTail, std::memory_order_release);
2931 return true;
2932 }
2933 }
2934
2935 inline void rewind_block_index_tail()
2936 {
2937 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2938 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2939 }
2940
2941 inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
2942 {
2943 BlockIndexHeader* localBlockIndex;
2944 auto idx = get_block_index_index_for_index(index, localBlockIndex);
2945 return localBlockIndex->index[idx];
2946 }
2947
2948 inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
2949 {
2950#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2951 debug::DebugLock lock(mutex);
2952#endif
2953 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2954 localBlockIndex = blockIndex.load(std::memory_order_acquire);
2955 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2956 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2957 assert(tailBase != INVALID_BLOCK_BASE);
2958 // Note: Must use division instead of shift because the index may wrap around, causing a negative
2959 // offset, whose negativity we want to preserve
2960 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
2961 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
2962 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2963 return idx;
2964 }
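 // Illustrative sketch (not from the original source): why the offset computation above must
 // use a signed division. An `index` logically behind `tailBase` has to produce a negative
 // block offset; an unsigned shift or divide would wrap to a huge positive value instead.
 // The helper name is hypothetical.
#if 0
 #include <cstdint>

 static std::int64_t block_offset(std::uint64_t index, std::uint64_t tailBase)
 {
     const std::int64_t kBlockSize = 32; // stand-in for BLOCK_SIZE
     // Subtract in unsigned space (well-defined wraparound), then reinterpret as signed so the
     // sign of the offset is preserved; both values are block-aligned, so the quotient is exact.
     return static_cast<std::int64_t>(index - tailBase) / kBlockSize;
 }
#endif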
2965
2966 bool new_block_index()
2967 {
2968 auto prev = blockIndex.load(std::memory_order_relaxed);
2969 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2970 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
2971 auto raw = static_cast<char*>((Traits::malloc)(
2972 sizeof(BlockIndexHeader) +
2973 std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2974 std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
2975 if (raw == nullptr) {
2976 return false;
2977 }
2978
2979 auto header = new (raw) BlockIndexHeader;
2980 auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
2981 auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
2982 if (prev != nullptr) {
2983 auto prevTail = prev->tail.load(std::memory_order_relaxed);
2984 auto prevPos = prevTail;
2985 size_t i = 0;
2986 do {
2987 prevPos = (prevPos + 1) & (prev->capacity - 1);
2988 index[i++] = prev->index[prevPos];
2989 } while (prevPos != prevTail);
2990 assert(i == prevCapacity);
2991 }
2992 for (size_t i = 0; i != entryCount; ++i) {
2993 new (entries + i) BlockIndexEntry;
2994 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2995 index[prevCapacity + i] = entries + i;
2996 }
2997 header->prev = prev;
2998 header->entries = entries;
2999 header->index = index;
3000 header->capacity = nextBlockIndexCapacity;
3001 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
3002
3003 blockIndex.store(header, std::memory_order_release);
3004
3005 nextBlockIndexCapacity <<= 1;
3006
3007 return true;
3008 }
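 // Illustrative sketch (not from the original source): the resize above copies the old circular
 // index into the front of a twice-as-large one, oldest entry first, so that
 // (tail + offset) & (capacity - 1) keeps resolving older block bases. The helper below is a
 // hypothetical, simplified restatement of that copy loop using ints instead of entry pointers.
#if 0
 #include <cstddef>
 #include <vector>

 static std::vector<int> grow_circular(const std::vector<int>& old, std::size_t oldTail)
 {
     // assumes old.size() is a power of two; -1 plays the role of an empty slot
     std::vector<int> grown(old.size() * 2, -1);
     std::size_t pos = oldTail;
     std::size_t i = 0;
     do {
         pos = (pos + 1) & (old.size() - 1); // walk forward from just past the tail
         grown[i++] = old[pos];              // oldest surviving entry lands at slot 0
     } while (pos != oldTail);
     return grown;                           // the new tail becomes old.size() - 1
 }
#endif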
3009
3010 private:
3011 size_t nextBlockIndexCapacity;
3012 std::atomic<BlockIndexHeader*> blockIndex;
3013
3014#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3015 public:
3016 details::ThreadExitListener threadExitListener;
3017 private:
3018#endif
3019
3020#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3021 public:
3022 ImplicitProducer* nextImplicitProducer;
3023 private:
3024#endif
3025
3026#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3027 mutable debug::DebugMutex mutex;
3028#endif
3029#ifdef MCDBGQ_TRACKMEM
3030 friend struct MemStats;
3031#endif
3032 };
3033
3034
3035 //////////////////////////////////
3036 // Block pool manipulation
3037 //////////////////////////////////
3038
3039 void populate_initial_block_list(size_t blockCount)
3040 {
3041 initialBlockPoolSize = blockCount;
3042 if (initialBlockPoolSize == 0) {
3043 initialBlockPool = nullptr;
3044 return;
3045 }
3046
3047 initialBlockPool = create_array<Block>(blockCount);
3048 if (initialBlockPool == nullptr) {
3049 initialBlockPoolSize = 0;
3050 }
3051 for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3052 initialBlockPool[i].dynamicallyAllocated = false;
3053 }
3054 }
3055
3056 inline Block* try_get_block_from_initial_pool()
3057 {
3058 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
3059 return nullptr;
3060 }
3061
3062 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3063
3064 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3065 }
3066
3067 inline void add_block_to_free_list(Block* block)
3068 {
3069#ifdef MCDBGQ_TRACKMEM
3070 block->owner = nullptr;
3071#endif
3072 if (!Traits::RECYCLE_ALLOCATED_BLOCKS && block->dynamicallyAllocated) {
3073 destroy(block);
3074 }
3075 else {
3076 freeList.add(block);
3077 }
3078 }
3079
3080 inline void add_blocks_to_free_list(Block* block)
3081 {
3082 while (block != nullptr) {
3083 auto next = block->next;
3084 add_block_to_free_list(block);
3085 block = next;
3086 }
3087 }
3088
3089 inline Block* try_get_block_from_free_list()
3090 {
3091 return freeList.try_get();
3092 }
3093
3094 // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3095 template<AllocationMode canAlloc>
3096 Block* requisition_block()
3097 {
3098 auto block = try_get_block_from_initial_pool();
3099 if (block != nullptr) {
3100 return block;
3101 }
3102
3103 block = try_get_block_from_free_list();
3104 if (block != nullptr) {
3105 return block;
3106 }
3107
3108 MOODYCAMEL_CONSTEXPR_IF (canAlloc == CanAlloc) {
3109 return create<Block>();
3110 }
3111 else {
3112 return nullptr;
3113 }
3114 }
3115
3116
3117#ifdef MCDBGQ_TRACKMEM
3118 public:
3119 struct MemStats {
3120 size_t allocatedBlocks;
3121 size_t usedBlocks;
3122 size_t freeBlocks;
3123 size_t ownedBlocksExplicit;
3124 size_t ownedBlocksImplicit;
3125 size_t implicitProducers;
3126 size_t explicitProducers;
3127 size_t elementsEnqueued;
3128 size_t blockClassBytes;
3129 size_t queueClassBytes;
3130 size_t implicitBlockIndexBytes;
3131 size_t explicitBlockIndexBytes;
3132
3133 friend class ConcurrentQueue;
3134
3135 private:
3136 static MemStats getFor(ConcurrentQueue* q)
3137 {
3138 MemStats stats = { 0 };
3139
3140 stats.elementsEnqueued = q->size_approx();
3141
3142 auto block = q->freeList.head_unsafe();
3143 while (block != nullptr) {
3144 ++stats.allocatedBlocks;
3145 ++stats.freeBlocks;
3146 block = block->freeListNext.load(std::memory_order_relaxed);
3147 }
3148
3149 for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3150 bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3151 stats.implicitProducers += implicit ? 1 : 0;
3152 stats.explicitProducers += implicit ? 0 : 1;
3153
3154 if (implicit) {
3155 auto prod = static_cast<ImplicitProducer*>(ptr);
3156 stats.queueClassBytes += sizeof(ImplicitProducer);
3157 auto head = prod->headIndex.load(std::memory_order_relaxed);
3158 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3159 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3160 if (hash != nullptr) {
3161 for (size_t i = 0; i != hash->capacity; ++i) {
3162 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3163 ++stats.allocatedBlocks;
3164 ++stats.ownedBlocksImplicit;
3165 }
3166 }
3167 stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3168 for (; hash != nullptr; hash = hash->prev) {
3169 stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3170 }
3171 }
3172 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3173 //auto block = prod->get_block_index_entry_for_index(head);
3174 ++stats.usedBlocks;
3175 }
3176 }
3177 else {
3178 auto prod = static_cast<ExplicitProducer*>(ptr);
3179 stats.queueClassBytes += sizeof(ExplicitProducer);
3180 auto tailBlock = prod->tailBlock;
3181 bool wasNonEmpty = false;
3182 if (tailBlock != nullptr) {
3183 auto block = tailBlock;
3184 do {
3185 ++stats.allocatedBlocks;
3186 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3187 ++stats.usedBlocks;
3188 wasNonEmpty = wasNonEmpty || block != tailBlock;
3189 }
3190 ++stats.ownedBlocksExplicit;
3191 block = block->next;
3192 } while (block != tailBlock);
3193 }
3194 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3195 while (index != nullptr) {
3196 stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3197 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3198 }
3199 }
3200 }
3201
3202 auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3203 stats.allocatedBlocks += freeOnInitialPool;
3204 stats.freeBlocks += freeOnInitialPool;
3205
3206 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3207 stats.queueClassBytes += sizeof(ConcurrentQueue);
3208
3209 return stats;
3210 }
3211 };
3212
3213 // For debugging only. Not thread-safe.
3214 MemStats getMemStats()
3215 {
3216 return MemStats::getFor(this);
3217 }
3218 private:
3219 friend struct MemStats;
3220#endif
3221
3222
3223 //////////////////////////////////
3224 // Producer list manipulation
3225 //////////////////////////////////
3226
3227 ProducerBase* recycle_or_create_producer(bool isExplicit)
3228 {
3229#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3230 debug::DebugLock lock(implicitProdMutex);
3231#endif
3232 // Try to re-use one first
3233 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3234 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3235 bool expected = true;
3236 if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
3237 // We caught one! It's been marked as activated, the caller can have it
3238 return ptr;
3239 }
3240 }
3241 }
3242
3243 return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
3244 }
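 // Illustrative sketch (not from the original source): the reuse check above is a plain
 // compare-and-swap on a boolean flag; only one thread can flip `inactive` from true to false,
 // so a recycled producer is handed to exactly one caller. The helper name is hypothetical.
#if 0
 #include <atomic>

 static bool try_claim(std::atomic<bool>& inactive)
 {
     bool expected = true; // only succeed if the producer is currently parked
     return inactive.compare_exchange_strong(expected, false,
                                             std::memory_order_acquire,
                                             std::memory_order_relaxed);
 }
#endif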
3245
3246 ProducerBase* add_producer(ProducerBase* producer)
3247 {
3248 // Handle failed memory allocation
3249 if (producer == nullptr) {
3250 return nullptr;
3251 }
3252
3253 producerCount.fetch_add(1, std::memory_order_relaxed);
3254
3255 // Add it to the lock-free list
3256 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3257 do {
3258 producer->next = prevTail;
3259 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3260
3261#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3262 if (producer->isExplicit) {
3263 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3264 do {
3265 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3266 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3267 }
3268 else {
3269 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3270 do {
3271 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3272 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3273 }
3274#endif
3275
3276 return producer;
3277 }
3278
3279 void reown_producers()
3280 {
3281 // After another instance is moved-into/swapped-with this one, all the
3282 // producers we stole still think their parents are the other queue.
3283 // So fix them up!
3284 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3285 ptr->parent = this;
3286 }
3287 }
3288
3289
3290 //////////////////////////////////
3291 // Implicit producer hash
3292 //////////////////////////////////
3293
3294 struct ImplicitProducerKVP
3295 {
3296 std::atomic<details::thread_id_t> key;
3297 ImplicitProducer* value; // No need for atomicity since it's only read by the thread that sets it in the first place
3298
3299 ImplicitProducerKVP() : value(nullptr) { }
3300
3301 ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3302 {
3303 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3304 value = other.value;
3305 }
3306
3307 inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3308 {
3309 swap(other);
3310 return *this;
3311 }
3312
3313 inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
3314 {
3315 if (this != &other) {
3316 details::swap_relaxed(key, other.key);
3317 std::swap(value, other.value);
3318 }
3319 }
3320 };
3321
3322 template<typename XT, typename XTraits>
3323 friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
3324
3325 struct ImplicitProducerHash
3326 {
3327 size_t capacity;
3328 ImplicitProducerKVP* entries;
3329 ImplicitProducerHash* prev;
3330 };
3331
3332 inline void populate_initial_implicit_producer_hash()
3333 {
3334 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
3335 return;
3336 }
3337 else {
3338 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3339 auto hash = &initialImplicitProducerHash;
3340 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3341 hash->entries = &initialImplicitProducerHashEntries[0];
3342 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3343 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3344 }
3345 hash->prev = nullptr;
3346 implicitProducerHash.store(hash, std::memory_order_relaxed);
3347 }
3348 }
3349
3350 void swap_implicit_producer_hashes(ConcurrentQueue& other)
3351 {
3352 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
3353 return;
3354 }
3355 else {
3356 // Swap (assumes our implicit producer hash is initialized)
3357 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3358 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3359 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3360
3361 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3362
3363 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3364 if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3365 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3366 }
3367 else {
3368 ImplicitProducerHash* hash;
3369 for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3370 continue;
3371 }
3372 hash->prev = &initialImplicitProducerHash;
3373 }
3374 if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3375 other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3376 }
3377 else {
3378 ImplicitProducerHash* hash;
3379 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3380 continue;
3381 }
3382 hash->prev = &other.initialImplicitProducerHash;
3383 }
3384 }
3385 }
3386
3387 // Only fails (returns nullptr) if memory allocation fails
3388 ImplicitProducer* get_or_add_implicit_producer()
3389 {
3390 // Note that since the data is essentially thread-local (key is thread ID),
3391 // there's a reduced need for fences (memory ordering is already consistent
3392 // for any individual thread), except for the current table itself.
3393
3394 // Start by looking for the thread ID in the current and all previous hash tables.
3395 // If it's not found, it must not be in there yet, since this same thread would
3396 // have added it previously to one of the tables that we traversed.
3397
3398 // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3399
3400#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3401 debug::DebugLock lock(implicitProdMutex);
3402#endif
3403
3404 auto id = details::thread_id();
3405 auto hashedId = details::hash_thread_id(id);
3406
3407 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3408 assert(mainHash != nullptr); // silence clang-tidy and MSVC warnings (hash cannot be null)
3409 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3410 // Look for the id in this hash
3411 auto index = hashedId;
3412 while (true) { // Not an infinite loop because at least one slot is free in the hash table
3413 index &= hash->capacity - 1u;
3414
3415 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3416 if (probedKey == id) {
3417 // Found it! If we had to search several hashes deep, though, we should lazily add it
3418 // to the current main hash table to avoid the extended search next time.
3419 // Note there's guaranteed to be room in the current hash table since every subsequent
3420 // table implicitly reserves space for all previous tables (there's only one
3421 // implicitProducerHashCount).
3422 auto value = hash->entries[index].value;
3423 if (hash != mainHash) {
3424 index = hashedId;
3425 while (true) {
3426 index &= mainHash->capacity - 1u;
3427 auto empty = details::invalid_thread_id;
3428#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3429 auto reusable = details::invalid_thread_id2;
3430 if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) ||
3431 mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3432#else
3433 if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3434#endif
3435 mainHash->entries[index].value = value;
3436 break;
3437 }
3438 ++index;
3439 }
3440 }
3441
3442 return value;
3443 }
3444 if (probedKey == details::invalid_thread_id) {
3445 break; // Not in this hash table
3446 }
3447 ++index;
3448 }
3449 }
3450
3451 // Insert!
3452 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3453 while (true) {
3454 // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
3455 if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3456 // We've acquired the resize lock, try to allocate a bigger hash table.
3457 // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3458 // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3459 // locked block).
3460 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3461 if (newCount >= (mainHash->capacity >> 1)) {
3462 size_t newCapacity = mainHash->capacity << 1;
3463 while (newCount >= (newCapacity >> 1)) {
3464 newCapacity <<= 1;
3465 }
3466 auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
3467 if (raw == nullptr) {
3468 // Allocation failed
3469 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3470 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3471 return nullptr;
3472 }
3473
3474 auto newHash = new (raw) ImplicitProducerHash;
3475 newHash->capacity = static_cast<size_t>(newCapacity);
3476 newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3477 for (size_t i = 0; i != newCapacity; ++i) {
3478 new (newHash->entries + i) ImplicitProducerKVP;
3479 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3480 }
3481 newHash->prev = mainHash;
3482 implicitProducerHash.store(newHash, std::memory_order_release);
3483 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3484 mainHash = newHash;
3485 }
3486 else {
3487 implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3488 }
3489 }
3490
3491 // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3492 // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3493 // always be true)
3494 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3495 auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false));
3496 if (producer == nullptr) {
3497 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3498 return nullptr;
3499 }
3500
3501#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3502 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3503 producer->threadExitListener.userData = producer;
3504 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3505#endif
3506
3507 auto index = hashedId;
3508 while (true) {
3509 index &= mainHash->capacity - 1u;
3510 auto empty = details::invalid_thread_id;
3511#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3512 auto reusable = details::invalid_thread_id2;
3513 if (mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3514 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed); // already counted as a used slot
3515 mainHash->entries[index].value = producer;
3516 break;
3517 }
3518#endif
3519 if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3520 mainHash->entries[index].value = producer;
3521 break;
3522 }
3523 ++index;
3524 }
3525 return producer;
3526 }
3527
3528 // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3529 // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3530 // we try to allocate ourselves).
3531 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3532 }
3533 }
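 // Illustrative sketch (not from the original source): the insert above uses the linear-probing
 // CAS scheme that the comment credits to preshing.com -- probe from hash(id) and claim the first
 // empty slot by CAS-ing its key from `invalid` to our thread id. All names below are hypothetical.
#if 0
 #include <atomic>
 #include <cstddef>
 #include <cstdint>

 struct Slot { std::atomic<std::uint64_t> key; void* value; };
 static constexpr std::uint64_t kEmpty = 0; // stand-in for details::invalid_thread_id

 // `table` has power-of-two `capacity` and is guaranteed to have at least one free slot.
 static void insert(Slot* table, std::size_t capacity, std::uint64_t id, void* value)
 {
     std::size_t index = static_cast<std::size_t>(id); // hypothetical hash of the thread id
     while (true) {
         index &= capacity - 1;
         std::uint64_t expected = kEmpty;
         if (table[index].key.compare_exchange_strong(expected, id)) {
             table[index].value = value; // safe: the slot is now owned by this thread
             return;
         }
         ++index; // slot taken, probe the next one
     }
 }
#endif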
3534
3535#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3536 void implicit_producer_thread_exited(ImplicitProducer* producer)
3537 {
3538 // Remove from hash
3539#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3540 debug::DebugLock lock(implicitProdMutex);
3541#endif
3542 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3543 assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3544 auto id = details::thread_id();
3545 auto hashedId = details::hash_thread_id(id);
3546 details::thread_id_t probedKey;
3547
3548 // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3549 // trying to add an entry thinking there's a free slot (because they reused a producer)
3550 for (; hash != nullptr; hash = hash->prev) {
3551 auto index = hashedId;
3552 do {
3553 index &= hash->capacity - 1u;
3554 probedKey = id;
3555 if (hash->entries[index].key.compare_exchange_strong(probedKey, details::invalid_thread_id2, std::memory_order_seq_cst, std::memory_order_relaxed)) {
3556 break;
3557 }
3558 ++index;
3559 } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3560 }
3561
3562 // Mark the queue as being recyclable
3563 producer->inactive.store(true, std::memory_order_release);
3564 }
3565
3566 static void implicit_producer_thread_exited_callback(void* userData)
3567 {
3568 auto producer = static_cast<ImplicitProducer*>(userData);
3569 auto queue = producer->parent;
3570 queue->implicit_producer_thread_exited(producer);
3571 }
3572#endif
3573
3574 //////////////////////////////////
3575 // Utility functions
3576 //////////////////////////////////
3577
3578 template<typename TAlign>
3579 static inline void* aligned_malloc(size_t size)
3580 {
3581 MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3582 return (Traits::malloc)(size);
3583 else {
3584 size_t alignment = std::alignment_of<TAlign>::value;
3585 void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
3586 if (!raw)
3587 return nullptr;
3588 char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
3589 *(reinterpret_cast<void**>(ptr) - 1) = raw;
3590 return ptr;
3591 }
3592 }
3593
3594 template<typename TAlign>
3595 static inline void aligned_free(void* ptr)
3596 {
3597 MOODYCAMEL_CONSTEXPR_IF (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3598 return (Traits::free)(ptr);
3599 else
3600 (Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
3601 }
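 // Illustrative sketch (not from the original source): the over-aligned branch above
 // over-allocates, stores the raw pointer just below the aligned address, and aligned_free()
 // reads it back. A standalone restatement of the same trick with hypothetical names:
#if 0
 #include <cstdint>
 #include <cstdlib>

 static void* over_aligned_malloc(std::size_t size, std::size_t alignment)
 {
     // alignment must be a power of two
     void* raw = std::malloc(size + alignment - 1 + sizeof(void*));
     if (raw == nullptr) return nullptr;
     auto addr = reinterpret_cast<std::uintptr_t>(raw) + sizeof(void*);
     addr = (addr + alignment - 1) & ~(alignment - 1);   // round up to the alignment
     auto ptr = reinterpret_cast<void*>(addr);
     *(reinterpret_cast<void**>(ptr) - 1) = raw;         // stash the original pointer
     return ptr;
 }

 static void over_aligned_free(void* ptr)
 {
     if (ptr != nullptr) std::free(*(reinterpret_cast<void**>(ptr) - 1));
 }
#endif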
3602
3603 template<typename U>
3604 static inline U* create_array(size_t count)
3605 {
3606 assert(count > 0);
3607 U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
3608 if (p == nullptr)
3609 return nullptr;
3610
3611 for (size_t i = 0; i != count; ++i)
3612 new (p + i) U();
3613 return p;
3614 }
3615
3616 template<typename U>
3617 static inline void destroy_array(U* p, size_t count)
3618 {
3619 if (p != nullptr) {
3620 assert(count > 0);
3621 for (size_t i = count; i != 0; )
3622 (p + --i)->~U();
3623 }
3624 aligned_free<U>(p);
3625 }
3626
3627 template<typename U>
3628 static inline U* create()
3629 {
3630 void* p = aligned_malloc<U>(sizeof(U));
3631 return p != nullptr ? new (p) U : nullptr;
3632 }
3633
3634 template<typename U, typename A1>
3635 static inline U* create(A1&& a1)
3636 {
3637 void* p = aligned_malloc<U>(sizeof(U));
3638 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3639 }
3640
3641 template<typename U>
3642 static inline void destroy(U* p)
3643 {
3644 if (p != nullptr)
3645 p->~U();
3646 aligned_free<U>(p);
3647 }
3648
3649private:
3650 std::atomic<ProducerBase*> producerListTail;
3651 std::atomic<std::uint32_t> producerCount;
3652
3653 std::atomic<size_t> initialBlockPoolIndex;
3654 Block* initialBlockPool;
3655 size_t initialBlockPoolSize;
3656
3657#ifndef MCDBGQ_USEDEBUGFREELIST
3658 FreeList<Block> freeList;
3659#else
3660 debug::DebugFreeList<Block> freeList;
3661#endif
3662
3663 std::atomic<ImplicitProducerHash*> implicitProducerHash;
3664 std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3665 ImplicitProducerHash initialImplicitProducerHash;
3666 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3667 std::atomic_flag implicitProducerHashResizeInProgress;
3668
3669 std::atomic<std::uint32_t> nextExplicitConsumerId;
3670 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3671
3672#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3673 debug::DebugMutex implicitProdMutex;
3674#endif
3675
3676#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3677 std::atomic<ExplicitProducer*> explicitProducers;
3678 std::atomic<ImplicitProducer*> implicitProducers;
3679#endif
3680};
3681
3682
3683template<typename T, typename Traits>
3684ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
3685 : producer(queue.recycle_or_create_producer(true))
3686{
3687 if (producer != nullptr) {
3688 producer->token = this;
3689 }
3690}
3691
3692template<typename T, typename Traits>
3693ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
3694 : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3695{
3696 if (producer != nullptr) {
3697 producer->token = this;
3698 }
3699}
3700
3701template<typename T, typename Traits>
3702ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
3703 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3704{
3705 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3706 lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3707}
3708
3709template<typename T, typename Traits>
3710ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
3711 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3712{
3713 initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3714 lastKnownGlobalOffset = static_cast<std::uint32_t>(-1);
3715}
3716
3717template<typename T, typename Traits>
3718inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
3719{
3720 a.swap(b);
3721}
3722
3723inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
3724{
3725 a.swap(b);
3726}
3727
3728inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
3729{
3730 a.swap(b);
3731}
3732
3733template<typename T, typename Traits>
3734inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
3735{
3736 a.swap(b);
3737}
3738
3739}
3740
3741#if defined(_MSC_VER) && (!defined(_HAS_CXX17) || !_HAS_CXX17)
3742#pragma warning(pop)
3743#endif
3744
3745#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
3746#pragma GCC diagnostic pop
3747#endif
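// Illustrative usage sketch (not part of the original header): exercises only the public API
// declared above -- enqueue() from several producer threads, each getting its own implicit
// producer, and try_dequeue() from a single consumer.
#if 0
#include "concurrentqueue.h"
#include <cstddef>
#include <thread>
#include <vector>

int main()
{
    moodycamel::ConcurrentQueue<int> q;

    std::vector<std::thread> producers;
    for (int t = 0; t != 4; ++t) {
        producers.emplace_back([&q, t] {
            for (int i = 0; i != 1000; ++i) {
                q.enqueue(t * 1000 + i); // lock-free; no token, so an implicit producer is used
            }
        });
    }
    for (auto& th : producers) th.join();

    int item;
    std::size_t received = 0;
    while (q.try_dequeue(item)) {
        ++received; // drains all 4000 items (single consumer here)
    }
    return received == 4000 ? 0 : 1;
}
#endif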