
STEllAR-GROUP / hpx — build #869 (push, web-flow)
20 Jan 2023 12:14AM UTC — coverage: 86.397% (-0.09% from 86.487%)
Merge pull request #6142 from msimberg/update-daint-jenkins-perftest-references
Update performance test references for Piz Daint
174481 of 201952 relevant lines covered (86.4%)
2150263.37 hits per line

Source File: /libs/core/concurrency/include/hpx/concurrency/concurrentqueue.hpp (file coverage: 70.23%)
1
//  Copyright (c) 2013-2016, Cameron Desrochers.
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
// hpx-no-inspect : disable automatic code inspection for this file
8

9
// ---------------------------------------------------------------------------
10
// This file has been taken from https://github.com/cameron314/concurrentqueue
11
// commit dea078cf5b6e742cd67a0d725e36f872feca4de4
12
// The boost copyright has been added to this file in accordance with the
13
// dual license terms for the concurrentqueue and conformance with the HPX policy
14
// https://github.com/cameron314/concurrentqueue/blob/master/LICENSE.md
15
// ---------------------------------------------------------------------------
16

17
// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
18
// An overview, including benchmark results, is provided here:
19
//     http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
20
// The full design is also described in excruciating detail at:
21
//    http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
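
A minimal usage sketch of the queue this header provides (illustrative only; it assumes the header is reachable through the usual HPX include path):

    #include <hpx/concurrency/concurrentqueue.hpp>
    #include <cassert>

    void basic_usage_sketch()
    {
        hpx::concurrency::ConcurrentQueue<int> q;   // default capacity: 6 * BLOCK_SIZE slots

        bool ok = q.enqueue(42);     // may allocate; only fails if allocation fails
        assert(ok);

        int item = 0;
        if (q.try_dequeue(item))     // never allocates; false if the queue appeared empty
            assert(item == 42);
    }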
22

23
// Simplified BSD license:
24
// Copyright (c) 2013-2016, Cameron Desrochers.
25
// All rights reserved.
26
//
27
// Redistribution and use in source and binary forms, with or without modification,
28
// are permitted provided that the following conditions are met:
29
//
30
// - Redistributions of source code must retain the above copyright notice, this list of
31
// conditions and the following disclaimer.
32
// - Redistributions in binary form must reproduce the above copyright notice, this list of
33
// conditions and the following disclaimer in the documentation and/or other materials
34
// provided with the distribution.
35
//
36
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
37
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
38
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
39
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
40
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
41
// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
42
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
43
// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
44
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
45

46
// clang-format off
47

48
#pragma once
49

50
#include <hpx/config/move.hpp>
51
#include <hpx/config/forward.hpp>
52

53
#if defined(__GNUC__)
54
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
55
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
56
// upon assigning any computed values)
57
#pragma GCC diagnostic push
58
#pragma GCC diagnostic ignored "-Wconversion"
59

60
#ifdef MCDBGQ_USE_RELACY
61
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
62
#endif
63
#endif
64

65
#if defined(__APPLE__)
66
#include "TargetConditionals.h"
67
#endif
68

69
#ifdef MCDBGQ_USE_RELACY
70
#include "relacy/relacy_std.hpp"
71
#include "relacy_shims.h"
72
// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
73
// We'll override the default trait malloc ourselves without a macro.
74
#undef new
75
#undef delete
76
#undef malloc
77
#undef free
78
#else
79
#include <atomic>    // Requires C++11. Sorry VS2010.
80
#include <cassert>
81
#endif
82
#include <cstddef>              // for max_align_t
83
#include <cstdint>
84
#include <cstdlib>
85
#include <type_traits>
86
#include <algorithm>
87
#include <utility>
88
#include <limits>
89
#include <climits>    // for CHAR_BIT
90
#include <array>
91
#include <thread>    // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
92

93
// Platform-specific definitions of a numeric thread ID type and an invalid value
94
namespace hpx { namespace concurrency { namespace details {
95
    template<typename thread_id_t> struct thread_id_converter {
96
        typedef thread_id_t thread_id_numeric_size_t;
97
        typedef thread_id_t thread_id_hash_t;
98
        static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
279,067✔
99
    };
100
} } }
101
#if defined(MCDBGQ_USE_RELACY)
102
namespace hpx { namespace concurrency { namespace details {
103
    typedef std::uint32_t thread_id_t;
104
    static const thread_id_t invalid_thread_id  = 0xFFFFFFFFU;
105
    static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
106
    static inline thread_id_t thread_id() { return rl::thread_index(); }
107
} } }
108
#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
109
// No sense pulling in windows.h in a header, we'll manually declare the function
110
// we use and rely on backwards-compatibility for this not to break
111
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
112
namespace hpx { namespace concurrency { namespace details {
113
    static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
114
    typedef std::uint32_t thread_id_t;
115
    static const thread_id_t invalid_thread_id  = 0;      // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
116
    static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU;  // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
117
    static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
118
} } }
119
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
120
namespace hpx { namespace concurrency { namespace details {
121
    static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
122

123
    typedef std::thread::id thread_id_t;
124
    static const thread_id_t invalid_thread_id;         // Default ctor creates invalid ID
125

126
    // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
127
    // only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
128
    // be.
129
    static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
130

131
    template<std::size_t> struct thread_id_size { };
132
    template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
133
    template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
134

135
    template<> struct thread_id_converter<thread_id_t> {
136
        typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
137
#if !defined(__APPLE__)
138
        typedef std::size_t thread_id_hash_t;
139
#else
140
        typedef thread_id_numeric_size_t thread_id_hash_t;
141
#endif
142

143
        static thread_id_hash_t prehash(thread_id_t const& x)
144
        {
145
#if !defined(__APPLE__)
146
            return std::hash<std::thread::id>()(x);
147
#else
148
            return *reinterpret_cast<thread_id_hash_t const*>(&x);
149
#endif
150
        }
151
    };
152
} } }
153
#else
154
// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
155
// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
156
// static variable's address as a thread identifier :-)
157
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
158
#define MOODYCAMEL_THREADLOCAL __thread
159
#elif defined(_MSC_VER)
160
#define MOODYCAMEL_THREADLOCAL __declspec(thread)
161
#else
162
// Assume C++11 compliant compiler
163
#define MOODYCAMEL_THREADLOCAL thread_local
164
#endif
165
namespace hpx { namespace concurrency { namespace details {
166
    typedef std::uintptr_t thread_id_t;
167
    static const thread_id_t invalid_thread_id  = 0;    // Address can't be nullptr
168
    static const thread_id_t invalid_thread_id2 = 1;    // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
169
    static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
279,086✔
170
} } }
171
#endif
172

173
// Exceptions
174
#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
175
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
176
#define MOODYCAMEL_EXCEPTIONS_ENABLED
177
#endif
178
#endif
179
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
180
#define MOODYCAMEL_TRY try
181
#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
182
#define MOODYCAMEL_RETHROW throw
183
#define MOODYCAMEL_THROW(expr) throw (expr)
184
#else
185
#define MOODYCAMEL_TRY if (true)
186
#define MOODYCAMEL_CATCH(...) else if (false)
187
#define MOODYCAMEL_RETHROW
188
#define MOODYCAMEL_THROW(expr)
189
#endif
190

191
#if !defined(MOODYCAMEL_NOEXCEPT)
192
#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
193
#define MOODYCAMEL_NOEXCEPT
194
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
195
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
196
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
197
// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
198
// We have to assume *all* non-trivial constructors may throw on VS2012!
199
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
200
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
201
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
202
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
203
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
204
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
205
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
206
#else
207
#define MOODYCAMEL_NOEXCEPT noexcept
208
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
209
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
210
#endif
211
#endif
212

213
#if !defined(MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED)
214
#ifdef MCDBGQ_USE_RELACY
215
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
216
#else
217
// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
218
// g++ <=4.7 doesn't support thread_local either.
219
// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
220
#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
221
// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
222
//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED    // always disabled for now since several users report having problems with it on
223
#endif
224
#endif
225
#endif
226

227
// VS2012 doesn't support deleted functions.
228
// In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
229
#if !defined(MOODYCAMEL_DELETE_FUNCTION)
230
#if defined(_MSC_VER) && _MSC_VER < 1800
231
#define MOODYCAMEL_DELETE_FUNCTION
232
#else
233
#define MOODYCAMEL_DELETE_FUNCTION = delete
234
#endif
235
#endif
236

237
// Compiler-specific likely/unlikely hints
238
namespace hpx { namespace concurrency { namespace details {
239
#if defined(__GNUC__)
240
    static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
558,231✔
241
    static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
242
#else
243
    static inline bool (likely)(bool x) { return x; }
244
    static inline bool (unlikely)(bool x) { return x; }
245
#endif
246
} } }
247

248
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
249
#include "internal/concurrentqueue_internal_debug.h"
250
#endif
251

252
namespace hpx { namespace concurrency {
253
namespace details {
254
    template<typename T>
255
    struct const_numeric_max {
256
        static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
257
        static const T value = std::numeric_limits<T>::is_signed
258
            ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
259
            : static_cast<T>(-1);
260
    };
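
The helper above evaluates, at compile time, to the largest representable value of the given integer type. Two illustrative checks (assuming the header and <cstdint> are included):

    static_assert(hpx::concurrency::details::const_numeric_max<std::uint8_t>::value == 0xFF,
        "all bits set for unsigned types");
    static_assert(hpx::concurrency::details::const_numeric_max<std::int8_t>::value == 0x7F,
        "largest positive value for signed types");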
261

262
#if defined(__GLIBCXX__)
263
    typedef ::max_align_t std_max_align_t;      // libstdc++ forgot to add it to std:: for a while
264
#else
265
    typedef std::max_align_t std_max_align_t;   // Others (e.g. MSVC) insist it can *only* be accessed via std::
266
#endif
267

268
    // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
269
    // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
270
    typedef union {
271
        std_max_align_t x;
272
        long long y;
273
        void* z;
274
    } max_align_t;
275
}
276

277
// Default traits for the ConcurrentQueue. To change some of the
278
// traits without re-implementing all of them, inherit from this
279
// struct and shadow the declarations you wish to be different;
280
// since the traits are used as a template type parameter, the
281
// shadowed declarations will be used where defined, and the defaults
282
// otherwise.
283
struct ConcurrentQueueDefaultTraits
284
{
285
    // General-purpose size type. std::size_t is strongly recommended.
286
    typedef std::size_t size_t;
287

288
    // The type used for the enqueue and dequeue indices. Must be at least as
289
    // large as size_t. Should be significantly larger than the number of elements
290
    // you expect to hold at once, especially if you have a high turnover rate;
291
    // for example, on 32-bit x86, if you expect to have over a hundred million
292
    // elements or pump several million elements through your queue in a very
293
    // short space of time, using a 32-bit type *may* trigger a race condition.
294
    // A 64-bit int type is recommended in that case, and in practice will
295
    // prevent a race condition no matter the usage of the queue. Note that
296
    // whether the queue is lock-free with a 64-bit int type depends on whether
297
    // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
298
    typedef std::size_t index_t;
299

300
    // Internally, all elements are enqueued and dequeued from multi-element
301
    // blocks; this is the smallest controllable unit. If you expect few elements
302
    // but many producers, a smaller block size should be favoured. For few producers
303
    // and/or many elements, a larger block size is preferred. A sane default
304
    // is provided. Must be a power of 2.
305
    static const size_t BLOCK_SIZE = 32;
306

307
    // For explicit producers (i.e. when using a producer token), the block is
308
    // checked for being empty by iterating through a list of flags, one per element.
309
    // For large block sizes, this is too inefficient, and switching to an atomic
310
    // counter-based approach is faster. The switch is made for block sizes strictly
311
    // larger than this threshold.
312
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
313

314
    // How many full blocks can be expected for a single explicit producer? This should
315
    // reflect that number's maximum for optimal performance. Must be a power of 2.
316
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
317

318
    // How many full blocks can be expected for a single implicit producer? This should
319
    // reflect that number's maximum for optimal performance. Must be a power of 2.
320
    static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
321

322
    // The initial size of the hash table mapping thread IDs to implicit producers.
323
    // Note that the hash is resized every time it becomes half full.
324
    // Must be a power of two, and either 0 or at least 1. If 0, implicit production
325
    // (using the enqueue methods without an explicit producer token) is disabled.
326
    static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
327

328
    // Controls the number of items that an explicit consumer (i.e. one with a token)
329
    // must consume before it causes all consumers to rotate and move on to the next
330
    // internal queue.
331
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
332

333
    // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
334
    // Enqueue operations that would cause this limit to be surpassed will fail. Note
335
    // that this limit is enforced at the block level (for performance reasons), i.e.
336
    // it's rounded up to the nearest block size.
337
    static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
338

339

340
#if !defined(MCDBGQ_USE_RELACY)
341
    // Memory allocation can be customized if needed.
342
    // malloc should return nullptr on failure, and handle alignment like std::malloc.
343
#if defined(malloc) || defined(free)
344
    // Gah, this is 2015, stop defining macros that break standard code already!
345
    // Work around malloc/free being special macros:
346
    static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
347
    static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
348
    static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
349
    static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
350
#else
351
    static inline void* malloc(size_t size) { return std::malloc(size); }
5,455✔
352
    static inline void free(void* ptr) { return std::free(ptr); }
5,455✔
353
#endif
354
#else
355
    // Debug versions when running under the Relacy race detector (ignore
356
    // these in user code)
357
    static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
358
    static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
359
#endif
360
};
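
As described in the comment above the struct, individual traits are overridden by inheriting and shadowing only the members that should differ. A sketch (the type name is chosen here for illustration) that enlarges the block size and widens the index type, as the index_t comment recommends for high-turnover use on 32-bit platforms:

    #include <cstddef>
    #include <cstdint>
    #include <hpx/concurrency/concurrentqueue.hpp>

    struct big_block_traits : hpx::concurrency::ConcurrentQueueDefaultTraits
    {
        static const std::size_t BLOCK_SIZE = 256;   // must remain a power of 2
        typedef std::uint64_t index_t;               // avoids index wrap-around races on 32-bit targets
    };

    void custom_traits_sketch()
    {
        hpx::concurrency::ConcurrentQueue<int, big_block_traits> q;
        q.enqueue(1);
    }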
361

362

363
// When producing or consuming many elements, the most efficient way is to:
364
//    1) Use one of the bulk-operation methods of the queue with a token
365
//    2) Failing that, use the bulk-operation methods without a token
366
//    3) Failing that, create a token and use that with the single-item methods
367
//    4) Failing that, use the single-parameter methods of the queue
368
// Having said that, don't create tokens willy-nilly -- ideally there should be
369
// a maximum of one token per thread (of each kind).
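
Following the preference order above, a token-based bulk producer/consumer sketch (one token of each kind per thread):

    #include <cstddef>
    #include <hpx/concurrency/concurrentqueue.hpp>

    void token_bulk_sketch()
    {
        hpx::concurrency::ConcurrentQueue<int> q;

        hpx::concurrency::ProducerToken ptok(q);
        hpx::concurrency::ConsumerToken ctok(q);

        int in[4] = { 1, 2, 3, 4 };
        q.enqueue_bulk(ptok, in, 4);                      // bulk operation with a token

        int out[4] = {};
        std::size_t n = q.try_dequeue_bulk(ctok, out, 4); // number of items actually dequeued
        (void) n;
    }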
370
struct ProducerToken;
371
struct ConsumerToken;
372

373
template<typename T, typename Traits> class ConcurrentQueue;
374
template<typename T, typename Traits> class BlockingConcurrentQueue;
375
class ConcurrentQueueTests;
376

377

378
namespace details
379
{
380
    struct ConcurrentQueueProducerTypelessBase
381
    {
382
        ConcurrentQueueProducerTypelessBase* next;
383
        std::atomic<bool> inactive;
384
        ProducerToken* token;
385

386
        ConcurrentQueueProducerTypelessBase()
1,697✔
387
            : next(nullptr), inactive(false), token(nullptr)
1,697✔
388
        {
389
        }
1,697✔
390
    };
391

392
    template<bool use32> struct _hash_32_or_64 {
393
        static inline std::uint32_t hash(std::uint32_t h)
394
        {
395
            // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
396
            // Since the thread ID is already unique, all we really want to do is propagate that
397
            // uniqueness evenly across all the bits, so that we can use a subset of the bits while
398
            // reducing collisions significantly
399
            h ^= h >> 16;
400
            h *= 0x85ebca6b;
401
            h ^= h >> 13;
402
            h *= 0xc2b2ae35;
403
            return h ^ (h >> 16);
404
        }
405
    };
406
    template<> struct _hash_32_or_64<1> {
407
        static inline std::uint64_t hash(std::uint64_t h)
279,089✔
408
        {
409
            h ^= h >> 33;
279,089✔
410
            h *= 0xff51afd7ed558ccd;
279,089✔
411
            h ^= h >> 33;
279,089✔
412
            h *= 0xc4ceb9fe1a85ec53;
279,089✔
413
            return h ^ (h >> 33);
279,089✔
414
        }
415
    };
416
    template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {  };
417

418
    static inline size_t hash_thread_id(thread_id_t id)
279,092✔
419
    {
420
        static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
421
        return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
279,092✔
422
            thread_id_converter<thread_id_t>::prehash(id)));
279,092✔
423
    }
424

425
#ifdef _MSC_VER
426
#pragma warning(push)
427
#pragma warning(disable: 4554)
428
#endif
429
    template<typename T>
430
    static inline bool circular_less_than(T a, T b)
998,775✔
431
    {
432
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
433
        return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
998,779✔
434
    }
435
#ifdef _MSC_VER
436
#pragma warning(pop)
437
#endif
438

439
    template<typename U>
440
    static inline char* align_for(char* ptr)
3,510✔
441
    {
442
        const std::size_t alignment = std::alignment_of<U>::value;
3,510✔
443
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
3,510✔
444
    }
445

446
    template<typename T>
447
    static inline T ceil_to_pow_2(T x)
×
448
    {
449
        static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
450

451
        // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
452
        --x;
×
453
        x |= x >> 1;
×
454
        x |= x >> 2;
×
455
        x |= x >> 4;
×
456
        for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
×
457
            x |= x >> (i << 3);
×
458
        }
×
459
        ++x;
×
460
        return x;
×
461
    }
462

463
    template<typename T>
464
    static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
465
    {
466
        T temp = HPX_MOVE(left.load(std::memory_order_relaxed));
467
        left.store(HPX_MOVE(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
468
        right.store(HPX_MOVE(temp), std::memory_order_relaxed);
469
    }
470

471
    template<typename T>
472
    static inline T const& nomove(T const& x)
473
    {
474
        return x;
475
    }
476

477
    template<bool Enable>
478
    struct nomove_if
479
    {
480
        template<typename T>
481
        static inline T const& eval(T const& x)
482
        {
483
            return x;
484
        }
485
    };
486

487
    template<>
488
    struct nomove_if<false>
489
    {
490
        template<typename U>
491
        static inline auto eval(U&& x)
492
            -> decltype(HPX_FORWARD(U, x))
493
        {
494
            return HPX_FORWARD(U, x);
495
        }
496
    };
497

498
    template<typename It>
499
    static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
500
    {
501
        return *it;
502
    }
503

504
#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
505
    template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
506
#else
507
    template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
508
#endif
509

510
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
511
#ifdef MCDBGQ_USE_RELACY
512
    typedef RelacyThreadExitListener ThreadExitListener;
513
    typedef RelacyThreadExitNotifier ThreadExitNotifier;
514
#else
515
    struct ThreadExitListener
516
    {
517
        typedef void (*callback_t)(void*);
518
        callback_t callback;
519
        void* userData;
520

521
        ThreadExitListener* next;    // reserved for use by the ThreadExitNotifier
522
    };
523

524

525
    class ThreadExitNotifier
526
    {
527
    public:
528
        static void subscribe(ThreadExitListener* listener)
529
        {
530
            auto& tlsInst = instance();
531
            listener->next = tlsInst.tail;
532
            tlsInst.tail = listener;
533
        }
534

535
        static void unsubscribe(ThreadExitListener* listener)
536
        {
537
            auto& tlsInst = instance();
538
            ThreadExitListener** prev = &tlsInst.tail;
539
            for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
540
                if (ptr == listener) {
541
                    *prev = ptr->next;
542
                    break;
543
                }
544
                prev = &ptr->next;
545
            }
546
        }
547

548
    private:
549
        ThreadExitNotifier() : tail(nullptr) { }
550
        ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
551
        ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
552

553
        ~ThreadExitNotifier()
554
        {
555
            // This thread is about to exit, let everyone know!
556
            assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
557
            for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
558
                ptr->callback(ptr->userData);
559
            }
560
        }
561

562
        // Thread-local
563
        static inline ThreadExitNotifier& instance()
564
        {
565
            static thread_local ThreadExitNotifier notifier;
566
            return notifier;
567
        }
568

569
    private:
570
        ThreadExitListener* tail;
571
    };
572
#endif
573
#endif
574

575
    template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
576
    template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
577
    template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
578
    template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
579
    template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
580
    template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
581
    template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> {  };
582
    template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
583
    template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
584
}
585

586

587
struct ProducerToken
588
{
589
    template<typename T, typename Traits>
590
    explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
591

592
    template<typename T, typename Traits>
593
    explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
594

595
    ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
596
        : producer(other.producer)
597
    {
598
        other.producer = nullptr;
599
        if (producer != nullptr) {
600
            producer->token = this;
601
        }
602
    }
603

604
    inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
605
    {
606
        swap(other);
607
        return *this;
608
    }
609

610
    void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
611
    {
612
        std::swap(producer, other.producer);
613
        if (producer != nullptr) {
614
            producer->token = this;
615
        }
616
        if (other.producer != nullptr) {
617
            other.producer->token = &other;
618
        }
619
    }
620

621
    // A token is always valid unless:
622
    //     1) Memory allocation failed during construction
623
    //     2) It was moved via the move constructor
624
    //        (Note: assignment does a swap, leaving both potentially valid)
625
    //     3) The associated queue was destroyed
626
    // Note that if valid() returns true, that only indicates
627
    // that the token is valid for use with a specific queue,
628
    // but not which one; that's up to the user to track.
629
    inline bool valid() const { return producer != nullptr; }
630

631
    ~ProducerToken()
632
    {
633
        if (producer != nullptr) {
634
            producer->token = nullptr;
635
            producer->inactive.store(true, std::memory_order_release);
636
        }
637
    }
638

639
    // Disable copying and assignment
640
    ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
641
    ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
642

643
private:
644
    template<typename T, typename Traits> friend class ConcurrentQueue;
645
    friend class ConcurrentQueueTests;
646

647
protected:
648
    details::ConcurrentQueueProducerTypelessBase* producer;
649
};
650

651

652
struct ConsumerToken
653
{
654
    template<typename T, typename Traits>
655
    explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
656

657
    template<typename T, typename Traits>
658
    explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
659

660
    ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
661
        : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
662
    {
663
    }
664

665
    inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
666
    {
667
        swap(other);
668
        return *this;
669
    }
670

671
    void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
672
    {
673
        std::swap(initialOffset, other.initialOffset);
674
        std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
675
        std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
676
        std::swap(currentProducer, other.currentProducer);
677
        std::swap(desiredProducer, other.desiredProducer);
678
    }
679

680
    // Disable copying and assignment
681
    ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
682
    ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
683

684
private:
685
    template<typename T, typename Traits> friend class ConcurrentQueue;
686
    friend class ConcurrentQueueTests;
687

688
private: // but shared with ConcurrentQueue
689
    std::uint32_t initialOffset;
690
    std::uint32_t lastKnownGlobalOffset;
691
    std::uint32_t itemsConsumedFromCurrent;
692
    details::ConcurrentQueueProducerTypelessBase* currentProducer;
693
    details::ConcurrentQueueProducerTypelessBase* desiredProducer;
694
};
695

696
// Need to forward-declare this swap because it's in a namespace.
697
// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
698
template<typename T, typename Traits>
699
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
700

701

702
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
703
class ConcurrentQueue
704
{
705
public:
706
    typedef ::hpx::concurrency::ProducerToken producer_token_t;
707
    typedef ::hpx::concurrency::ConsumerToken consumer_token_t;
708

709
    typedef typename Traits::index_t index_t;
710
    typedef typename Traits::size_t size_t;
711

712
    static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
713
    static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
714
    static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
715
    static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
716
    static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
717
    static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
718
#ifdef _MSC_VER
719
#pragma warning(push)
720
#pragma warning(disable: 4307)    // + integral constant overflow (that's what the ternary expression is for!)
721
#pragma warning(disable: 4309)    // static_cast: Truncation of constant value
722
#endif
723
    static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
724
#ifdef _MSC_VER
725
#pragma warning(pop)
726
#endif
727

728
    static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
729
    static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
730
    static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
731
    static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
732
    static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
733
    static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
734
    static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
735
    static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
736
    static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
737

738
public:
739
    // Creates a queue with at least `capacity` element slots; note that the
740
    // actual number of elements that can be inserted without additional memory
741
    // allocation depends on the number of producers and the block size (e.g. if
742
    // the block size is equal to `capacity`, only a single block will be allocated
743
    // up-front, which means only a single producer will be able to enqueue elements
744
    // without an extra allocation -- blocks aren't shared between producers).
745
    // This method is not thread safe -- it is up to the user to ensure that the
746
    // queue is fully constructed before it starts being used by other threads (this
747
    // includes making the memory effects of construction visible, possibly with a
748
    // memory barrier).
749
    explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
972✔
750
        : producerListTail(nullptr),
972✔
751
        producerCount(0),
972✔
752
        initialBlockPoolIndex(0),
972✔
753
        nextExplicitConsumerId(0),
972✔
754
        globalExplicitConsumerOffset(0)
972✔
755
    {
756
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
972✔
757
        populate_initial_implicit_producer_hash();
972✔
758
        populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
972✔
759

760
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
761
        // Track all the producers using a fully-resolved typed list for
762
        // each kind; this makes it possible to debug them starting from
763
        // the root queue object (otherwise wacky casts are needed that
764
        // don't compile in the debugger's expression evaluator).
765
        explicitProducers.store(nullptr, std::memory_order_relaxed);
766
        implicitProducers.store(nullptr, std::memory_order_relaxed);
767
#endif
768
    }
972✔
769

770
    // Computes the correct amount of pre-allocated blocks for you based
771
    // on the minimum number of elements you want available at any given
772
    // time, and the maximum concurrent number of each type of producer.
773
    ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
774
        : producerListTail(nullptr),
775
        producerCount(0),
776
        initialBlockPoolIndex(0),
777
        nextExplicitConsumerId(0),
778
        globalExplicitConsumerOffset(0)
779
    {
780
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
781
        populate_initial_implicit_producer_hash();
782
        size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
783
        populate_initial_block_list(blocks);
784

785
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
786
        explicitProducers.store(nullptr, std::memory_order_relaxed);
787
        implicitProducers.store(nullptr, std::memory_order_relaxed);
788
#endif
789
    }
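
A short sketch of the two pre-sizing strategies described above (the numbers are arbitrary); note that blocks are not shared between producers, so how long either queue avoids further allocation also depends on how many producers are active:

    #include <hpx/concurrency/concurrentqueue.hpp>

    void presize_sketch()
    {
        // At least 1024 element slots, rounded up to whole blocks.
        hpx::concurrency::ConcurrentQueue<int> q1(1024);

        // Capacity for 1024 elements with up to 4 explicit and 8 implicit
        // producers; the required blocks are allocated up front.
        hpx::concurrency::ConcurrentQueue<int> q2(1024, 4, 8);
    }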
790

791
    // Note: The queue should not be accessed concurrently while it's
792
    // being deleted. It's up to the user to synchronize this.
793
    // This method is not thread safe.
794
    ~ConcurrentQueue()
972✔
795
    {
796
        // Destroy producers
797
        auto ptr = producerListTail.load(std::memory_order_relaxed);
972✔
798
        while (ptr != nullptr) {
2,669✔
799
            auto next = ptr->next_prod();
1,697✔
800
            if (ptr->token != nullptr) {
1,697✔
801
                ptr->token->producer = nullptr;
×
802
            }
×
803
            destroy(ptr);
1,697✔
804
            ptr = next;
1,697✔
805
        }
806

807
        // Destroy implicit producer hash tables
808
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
809
            auto hash = implicitProducerHash.load(std::memory_order_relaxed);
972✔
810
            while (hash != nullptr) {
2,014✔
811
                auto prev = hash->prev;
1,042✔
812
                if (prev != nullptr) {    // The last hash is part of this object and was not allocated dynamically
1,042✔
813
                    for (size_t i = 0; i != hash->capacity; ++i) {
4,550✔
814
                        hash->entries[i].~ImplicitProducerKVP();
4,480✔
815
                    }
4,480✔
816
                    hash->~ImplicitProducerHash();
70✔
817
                    (Traits::free)(hash);
70✔
818
                }
70✔
819
                hash = prev;
1,042✔
820
            }
821
        }
822

823
        // Destroy global free list
824
        auto block = freeList.head_unsafe();
972✔
825
        while (block != nullptr) {
4,625✔
826
            auto next = block->freeListNext.load(std::memory_order_relaxed);
3,653✔
827
            if (block->dynamicallyAllocated) {
3,653✔
828
                destroy(block);
996✔
829
            }
996✔
830
            block = next;
3,653✔
831
        }
832

833
        // Destroy initial free list
834
        destroy_array(initialBlockPool, initialBlockPoolSize);
972✔
835
    }
972✔
836

837
    // Disable copying and copy assignment
838
    ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
839
    ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
840

841
    // Moving is supported, but note that it is *not* a thread-safe operation.
842
    // Nobody can use the queue while it's being moved, and the memory effects
843
    // of that move must be propagated to other threads before they can use it.
844
    // Note: When a queue is moved, its tokens are still valid but can only be
845
    // used with the destination queue (i.e. semantically they are moved along
846
    // with the queue itself).
847
    ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
848
        : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
849
        producerCount(other.producerCount.load(std::memory_order_relaxed)),
850
        initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
851
        initialBlockPool(other.initialBlockPool),
852
        initialBlockPoolSize(other.initialBlockPoolSize),
853
        freeList(HPX_MOVE(other.freeList)),
854
        nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
855
        globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
856
    {
857
        // Move the other one into this, and leave the other one as an empty queue
858
        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
859
        populate_initial_implicit_producer_hash();
860
        swap_implicit_producer_hashes(other);
861

862
        other.producerListTail.store(nullptr, std::memory_order_relaxed);
863
        other.producerCount.store(0, std::memory_order_relaxed);
864
        other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
865
        other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
866

867
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
868
        explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
869
        other.explicitProducers.store(nullptr, std::memory_order_relaxed);
870
        implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
871
        other.implicitProducers.store(nullptr, std::memory_order_relaxed);
872
#endif
873

874
        other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
875
        other.initialBlockPoolSize = 0;
876
        other.initialBlockPool = nullptr;
877

878
        reown_producers();
879
    }
880

881
    inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
882
    {
883
        return swap_internal(other);
884
    }
885

886
    // Swaps this queue's state with the other's. Not thread-safe.
887
    // Swapping two queues does not invalidate their tokens, however
888
    // the tokens that were created for one queue must be used with
889
    // only the swapped queue (i.e. the tokens are tied to the
890
    // queue's movable state, not the object itself).
891
    inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
892
    {
893
        swap_internal(other);
894
    }
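
A sketch of the move and swap semantics described above (none of these operations are thread-safe, and tokens stay tied to the queue state they were created for):

    #include <hpx/concurrency/concurrentqueue.hpp>
    #include <utility>

    void move_swap_sketch()
    {
        hpx::concurrency::ConcurrentQueue<int> a(128), b(128);
        a.enqueue(1);

        hpx::concurrency::ConcurrentQueue<int> c(std::move(a));   // 'a' is left empty
        c.swap(b);                                                 // the element now lives in 'b'

        int item = 0;
        b.try_dequeue(item);                                       // retrieves the 1 enqueued into 'a'
    }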
895

896
private:
897
    ConcurrentQueue& swap_internal(ConcurrentQueue& other)
898
    {
899
        if (this == &other) {
900
            return *this;
901
        }
902

903
        details::swap_relaxed(producerListTail, other.producerListTail);
904
        details::swap_relaxed(producerCount, other.producerCount);
905
        details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
906
        std::swap(initialBlockPool, other.initialBlockPool);
907
        std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
908
        freeList.swap(other.freeList);
909
        details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
910
        details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
911

912
        swap_implicit_producer_hashes(other);
913

914
        reown_producers();
915
        other.reown_producers();
916

917
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
918
        details::swap_relaxed(explicitProducers, other.explicitProducers);
919
        details::swap_relaxed(implicitProducers, other.implicitProducers);
920
#endif
921

922
        return *this;
923
    }
924

925
public:
926
    // Enqueues a single item (by copying it).
927
    // Allocates memory if required. Only fails if memory allocation fails (or implicit
928
    // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
929
    // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
930
    // Thread-safe.
931
    inline bool enqueue(T const& item)
932
    {
933
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
934
        return inner_enqueue<CanAlloc>(item);
935
    }
936

937
    // Enqueues a single item (by moving it, if possible).
938
    // Allocates memory if required. Only fails if memory allocation fails (or implicit
939
    // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
940
    // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
941
    // Thread-safe.
942
    inline bool enqueue(T&& item)
279,115✔
943
    {
944
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
945
        return inner_enqueue<CanAlloc>(HPX_MOVE(item));
279,115✔
946
    }
947

948
    // Enqueues a single item (by copying it) using an explicit producer token.
949
    // Allocates memory if required. Only fails if memory allocation fails (or
950
    // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
951
    // Thread-safe.
952
    inline bool enqueue(producer_token_t const& token, T const& item)
953
    {
954
        return inner_enqueue<CanAlloc>(token, item);
955
    }
956

957
    // Enqueues a single item (by moving it, if possible) using an explicit producer token.
958
    // Allocates memory if required. Only fails if memory allocation fails (or
959
    // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
960
    // Thread-safe.
961
    inline bool enqueue(producer_token_t const& token, T&& item)
962
    {
963
        return inner_enqueue<CanAlloc>(token, HPX_MOVE(item));
964
    }
965

966
    // Enqueues several items.
967
    // Allocates memory if required. Only fails if memory allocation fails (or
968
    // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
969
    // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
970
    // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
971
    // Thread-safe.
972
    template<typename It>
973
    bool enqueue_bulk(It itemFirst, size_t count)
974
    {
975
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
976
        return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
977
    }
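
Per the note above, std::make_move_iterator makes enqueue_bulk move rather than copy the source elements; a sketch:

    #include <hpx/concurrency/concurrentqueue.hpp>
    #include <iterator>
    #include <string>
    #include <vector>

    void bulk_move_sketch()
    {
        hpx::concurrency::ConcurrentQueue<std::string> q;

        std::vector<std::string> items(8, std::string(64, 'x'));

        // Each string is moved into the queue; 'items' is left holding
        // moved-from strings.
        q.enqueue_bulk(std::make_move_iterator(items.begin()), items.size());
    }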
978

979
    // Enqueues several items using an explicit producer token.
980
    // Allocates memory if required. Only fails if memory allocation fails
981
    // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
982
    // Note: Use std::make_move_iterator if the elements should be moved
983
    // instead of copied.
984
    // Thread-safe.
985
    template<typename It>
986
    bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
987
    {
988
        return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
989
    }
990

991
    // Enqueues a single item (by copying it).
992
    // Does not allocate memory. Fails if not enough room to enqueue (or implicit
993
    // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
994
    // is 0).
995
    // Thread-safe.
996
    inline bool try_enqueue(T const& item)
997
    {
998
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
999
        return inner_enqueue<CannotAlloc>(item);
1000
    }
1001

1002
    // Enqueues a single item (by moving it, if possible).
1003
    // Does not allocate memory (except for one-time implicit producer).
1004
    // Fails if not enough room to enqueue (or implicit production is
1005
    // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1006
    // Thread-safe.
1007
    inline bool try_enqueue(T&& item)
1008
    {
1009
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1010
        return inner_enqueue<CannotAlloc>(HPX_MOVE(item));
1011
    }
1012

1013
    // Enqueues a single item (by copying it) using an explicit producer token.
1014
    // Does not allocate memory. Fails if not enough room to enqueue.
1015
    // Thread-safe.
1016
    inline bool try_enqueue(producer_token_t const& token, T const& item)
1017
    {
1018
        return inner_enqueue<CannotAlloc>(token, item);
1019
    }
1020

1021
    // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1022
    // Does not allocate memory. Fails if not enough room to enqueue.
1023
    // Thread-safe.
1024
    inline bool try_enqueue(producer_token_t const& token, T&& item)
1025
    {
1026
        return inner_enqueue<CannotAlloc>(token, HPX_MOVE(item));
1027
    }
1028

1029
    // Enqueues several items.
1030
    // Does not allocate memory (except for one-time implicit producer).
1031
    // Fails if not enough room to enqueue (or implicit production is
1032
    // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1033
    // Note: Use std::make_move_iterator if the elements should be moved
1034
    // instead of copied.
1035
    // Thread-safe.
1036
    template<typename It>
1037
    bool try_enqueue_bulk(It itemFirst, size_t count)
1038
    {
1039
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1040
        return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1041
    }
1042

1043
    // Enqueues several items using an explicit producer token.
1044
    // Does not allocate memory. Fails if not enough room to enqueue.
1045
    // Note: Use std::make_move_iterator if the elements should be moved
1046
    // instead of copied.
1047
    // Thread-safe.
1048
    template<typename It>
1049
    bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1050
    {
1051
        return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1052
    }
1053

1054

1055

1056
    // Attempts to dequeue from the queue.
1057
    // Returns false if all producer streams appeared empty at the time they
1058
    // were checked (so, the queue is likely but not guaranteed to be empty).
1059
    // Never allocates. Thread-safe.
1060
    template<typename U>
1061
    bool try_dequeue(U& item)
301,729✔
1062
    {
1063
        // Instead of simply trying each producer in turn (which could cause needless contention on the first
1064
        // producer), we score them heuristically.
1065
        size_t nonEmptyCount = 0;
301,765✔
1066
        ProducerBase* best = nullptr;
301,765✔
1067
        size_t bestSize = 0;
301,765✔
1068
        for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
722,044✔
1069
            auto size = ptr->size_approx();
420,324✔
1070
            if (size > 0) {
420,324✔
1071
                if (size > bestSize) {
279,341✔
1072
                    bestSize = size;
279,219✔
1073
                    best = ptr;
279,219✔
1074
                }
279,219✔
1075
                ++nonEmptyCount;
279,341✔
1076
            }
279,341✔
1077
        }
420,315✔
1078

1079
        // If there was at least one non-empty queue but it appears empty at the time
1080
        // we try to dequeue from it, we need to make sure every queue's been tried
1081
        if (nonEmptyCount > 0) {
301,763✔
1082
            if ((details::likely)(best->dequeue(item))) {
279,242✔
1083
                return true;
279,106✔
1084
            }
1085
            for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
307✔
1086
                if (ptr != best && ptr->dequeue(item)) {
171✔
1087
                    return true;
×
1088
                }
1089
            }
171✔
1090
        }
136✔
1091
        return false;
22,685✔
1092
    }
301,761✔
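
Because a false return only means every producer stream appeared empty when it was checked, a consumer that must drain the queue typically keeps retrying until some external signal says production has finished. A sketch (the done_producing flag is hypothetical, set by the producers when they stop):

    #include <atomic>
    #include <hpx/concurrency/concurrentqueue.hpp>

    void drain_sketch(hpx::concurrency::ConcurrentQueue<int>& q,
        std::atomic<bool> const& done_producing)
    {
        int item = 0;
        while (true)
        {
            if (q.try_dequeue(item))
            {
                // ... process 'item' ...
            }
            else if (done_producing.load(std::memory_order_acquire))
            {
                // The queue appeared empty and producers are done: one final
                // attempt catches items enqueued just before the flag was set.
                if (!q.try_dequeue(item))
                    break;
                // ... process 'item' ...
            }
        }
    }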
1093

1094
    // Attempts to dequeue from the queue.
1095
    // Returns false if all producer streams appeared empty at the time they
1096
    // were checked (so, the queue is likely but not guaranteed to be empty).
1097
    // This differs from the try_dequeue(item) method in that this one does
1098
    // not attempt to reduce contention by interleaving the order that producer
1099
    // streams are dequeued from. So, using this method can reduce overall throughput
1100
    // under contention, but will give more predictable results in single-threaded
1101
    // consumer scenarios. This is mostly only useful for internal unit tests.
1102
    // Never allocates. Thread-safe.
1103
    template<typename U>
1104
    bool try_dequeue_non_interleaved(U& item)
1105
    {
1106
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1107
            if (ptr->dequeue(item)) {
1108
                return true;
1109
            }
1110
        }
1111
        return false;
1112
    }
1113

1114
    // Attempts to dequeue from the queue using an explicit consumer token.
1115
    // Returns false if all producer streams appeared empty at the time they
1116
    // were checked (so, the queue is likely but not guaranteed to be empty).
1117
    // Never allocates. Thread-safe.
1118
    template<typename U>
1119
    bool try_dequeue(consumer_token_t& token, U& item)
1120
    {
1121
        // The idea is roughly as follows:
1122
        // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1123
        // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1124
        // If there's no items where you're supposed to be, keep moving until you find a producer with some items
1125
        // If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
1126

1127
        if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1128
            if (!update_current_producer_after_rotation(token)) {
1129
                return false;
1130
            }
1131
        }
1132

1133
        // If there was at least one non-empty queue but it appears empty at the time
1134
        // we try to dequeue from it, we need to make sure every queue's been tried
1135
        if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1136
            if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1137
                globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1138
            }
1139
            return true;
1140
        }
1141

1142
        auto tail = producerListTail.load(std::memory_order_acquire);
1143
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1144
        if (ptr == nullptr) {
1145
            ptr = tail;
1146
        }
1147
        while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1148
            if (ptr->dequeue(item)) {
1149
                token.currentProducer = ptr;
1150
                token.itemsConsumedFromCurrent = 1;
1151
                return true;
1152
            }
1153
            ptr = ptr->next_prod();
1154
            if (ptr == nullptr) {
1155
                ptr = tail;
1156
            }
1157
        }
1158
        return false;
1159
    }
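    // -----------------------------------------------------------------------
    // Sketch of the explicit consumer-token path implemented above
    // (illustrative only, not part of the upstream file). A long-lived
    // ConsumerToken lets the rotation scheme described in the comments spread
    // consumers across producer streams instead of re-discovering a producer
    // on every call. Namespace and the enqueue() front-end are assumptions as
    // noted earlier; compiled out with #if 0.
#if 0
    #include <hpx/concurrency/concurrentqueue.hpp>

    void drain_with_token(hpx::concurrency::ConcurrentQueue<int>& q)
    {
        // One token per consumer thread, tied to the queue it was built from.
        hpx::concurrency::ConsumerToken tok(q);

        int item;
        while (q.try_dequeue(tok, item))
        {
            // ... process item. After EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_
            // BEFORE_ROTATE items from one producer, the global offset is
            // bumped and every token-holding consumer rotates, as above.
        }
    }
#endif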
1160

1161
    // Attempts to dequeue several elements from the queue.
1162
    // Returns the number of items actually dequeued.
1163
    // Returns 0 if all producer streams appeared empty at the time they
1164
    // were checked (so, the queue is likely but not guaranteed to be empty).
1165
    // Never allocates. Thread-safe.
1166
    template<typename It>
1167
    size_t try_dequeue_bulk(It itemFirst, size_t max)
1168
    {
1169
        size_t count = 0;
1170
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1171
            count += ptr->dequeue_bulk(itemFirst, max - count);
1172
            if (count == max) {
1173
                break;
1174
            }
1175
        }
1176
        return count;
1177
    }
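    // -----------------------------------------------------------------------
    // Bulk-dequeue sketch for try_dequeue_bulk() above (illustrative only,
    // not part of the upstream file; namespace as assumed earlier). Dequeuing
    // in batches amortizes the per-call producer walk when a consumer can
    // process items in chunks.
#if 0
    #include <hpx/concurrency/concurrentqueue.hpp>
    #include <array>
    #include <cstddef>

    std::size_t drain_some(hpx::concurrency::ConcurrentQueue<int>& q)
    {
        std::array<int, 64> buf;

        // Visits producer streams in order until up to buf.size() items have
        // been dequeued or every stream looked empty.
        std::size_t n = q.try_dequeue_bulk(buf.data(), buf.size());

        // buf[0] .. buf[n-1] now hold the dequeued items.
        return n;
    }
#endif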
1178

1179
    // Attempts to dequeue several elements from the queue using an explicit consumer token.
1180
    // Returns the number of items actually dequeued.
1181
    // Returns 0 if all producer streams appeared empty at the time they
1182
    // were checked (so, the queue is likely but not guaranteed to be empty).
1183
    // Never allocates. Thread-safe.
1184
    template<typename It>
1185
    size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1186
    {
1187
        if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1188
            if (!update_current_producer_after_rotation(token)) {
1189
                return 0;
1190
            }
1191
        }
1192

1193
        size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1194
        if (count == max) {
1195
            if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1196
                globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1197
            }
1198
            return max;
1199
        }
1200
        token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1201
        max -= count;
1202

1203
        auto tail = producerListTail.load(std::memory_order_acquire);
1204
        auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1205
        if (ptr == nullptr) {
1206
            ptr = tail;
1207
        }
1208
        while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1209
            auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1210
            count += dequeued;
1211
            if (dequeued != 0) {
1212
                token.currentProducer = ptr;
1213
                token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1214
            }
1215
            if (dequeued == max) {
1216
                break;
1217
            }
1218
            max -= dequeued;
1219
            ptr = ptr->next_prod();
1220
            if (ptr == nullptr) {
1221
                ptr = tail;
1222
            }
1223
        }
1224
        return count;
1225
    }
1226

1227

1228

1229
    // Attempts to dequeue from a specific producer's inner queue.
1230
    // If you happen to know which producer you want to dequeue from, this
1231
    // is significantly faster than using the general-case try_dequeue methods.
1232
    // Returns false if the producer's queue appeared empty at the time it
1233
    // was checked (so, the queue is likely but not guaranteed to be empty).
1234
    // Never allocates. Thread-safe.
1235
    template<typename U>
1236
    inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1237
    {
1238
        return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1239
    }
1240

1241
    // Attempts to dequeue several elements from a specific producer's inner queue.
1242
    // Returns the number of items actually dequeued.
1243
    // If you happen to know which producer you want to dequeue from, this
1244
    // is significantly faster than using the general-case try_dequeue methods.
1245
    // Returns 0 if the producer's queue appeared empty at the time it
1246
    // was checked (so, the queue is likely but not guaranteed to be empty).
1247
    // Never allocates. Thread-safe.
1248
    template<typename It>
1249
    inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1250
    {
1251
        return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1252
    }
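    // -----------------------------------------------------------------------
    // Sketch pairing a ProducerToken with the producer-specific dequeue
    // methods above (illustrative only, not part of the upstream file;
    // namespace and the token-taking enqueue() front-end are assumed to match
    // the upstream API). Knowing the producer lets the consumer skip the
    // producer search entirely.
#if 0
    #include <hpx/concurrency/concurrentqueue.hpp>

    void producer_pinned_roundtrip(hpx::concurrency::ConcurrentQueue<int>& q)
    {
        // The token gives this thread its own explicit producer stream.
        hpx::concurrency::ProducerToken ptok(q);
        q.enqueue(ptok, 7);

        int item = 0;
        if (q.try_dequeue_from_producer(ptok, item))
        {
            // item == 7; order is FIFO within this single producer's stream.
        }
    }
#endif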
1253

1254

1255
    // Returns an estimate of the total number of elements currently in the queue. This
1256
    // estimate is only accurate if the queue has completely stabilized before it is called
1257
    // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1258
    // visible on the calling thread, and no further operations start while this method is
1259
    // being called).
1260
    // Thread-safe.
1261
    size_t size_approx() const
1262
    {
1263
        size_t size = 0;
1264
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1265
            size += ptr->size_approx();
1266
        }
1267
        return size;
1268
    }
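    // -----------------------------------------------------------------------
    // size_approx() sums per-producer estimates, so under concurrent use it
    // is only a hint. A minimal sketch of a reasonable use (illustrative
    // only; the 1024 threshold is arbitrary, namespace as assumed earlier):
#if 0
    bool roughly_backlogged(hpx::concurrency::ConcurrentQueue<int> const& q)
    {
        // Suitable for metrics or backpressure heuristics, not for deciding
        // whether a subsequent try_dequeue() will succeed.
        return q.size_approx() > 1024;
    }
#endif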
1269

1270

1271
    // Returns true if the underlying atomic variables used by
1272
    // the queue are lock-free (they should be on most platforms).
1273
    // Thread-safe.
1274
    static bool is_lock_free()
1275
    {
1276
        return
1277
            details::static_is_lock_free<bool>::value == 2 &&
1278
            details::static_is_lock_free<size_t>::value == 2 &&
1279
            details::static_is_lock_free<std::uint32_t>::value == 2 &&
1280
            details::static_is_lock_free<index_t>::value == 2 &&
1281
            details::static_is_lock_free<void*>::value == 2 &&
1282
            details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
1283
    }
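    // -----------------------------------------------------------------------
    // is_lock_free() only reports whether the atomics the queue relies on are
    // lock-free on the current platform, so it is typically checked once at
    // startup. A minimal sketch (illustrative only, namespace as assumed
    // earlier):
#if 0
    #include <cstdio>

    void report_lock_freedom()
    {
        if (!hpx::concurrency::ConcurrentQueue<int>::is_lock_free())
        {
            std::puts("note: ConcurrentQueue atomics fall back to locks here");
        }
    }
#endif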
1284

1285

1286
private:
1287
    friend struct ProducerToken;
1288
    friend struct ConsumerToken;
1289
    struct ExplicitProducer;
1290
    friend struct ExplicitProducer;
1291
    struct ImplicitProducer;
1292
    friend struct ImplicitProducer;
1293
    friend class ConcurrentQueueTests;
1294

1295
    enum AllocationMode { CanAlloc, CannotAlloc };
1296

1297

1298
    ///////////////////////////////
1299
    // Queue methods
1300
    ///////////////////////////////
1301

1302
    template<AllocationMode canAlloc, typename U>
1303
    inline bool inner_enqueue(producer_token_t const& token, U&& element)
1304
    {
1305
        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(HPX_FORWARD(U, element));
1306
    }
1307

1308
    template<AllocationMode canAlloc, typename U>
1309
    inline bool inner_enqueue(U&& element)
279,115✔
1310
    {
1311
        auto producer = get_or_add_implicit_producer();
279,107✔
1312
        return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(HPX_FORWARD(U, element));
279,115✔
1313
    }
1314

1315
    template<AllocationMode canAlloc, typename It>
1316
    inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1317
    {
1318
        return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1319
    }
1320

1321
    template<AllocationMode canAlloc, typename It>
1322
    inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1323
    {
1324
        auto producer = get_or_add_implicit_producer();
1325
        return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1326
    }
1327

1328
    inline bool update_current_producer_after_rotation(consumer_token_t& token)
1329
    {
1330
        // Ah, there's been a rotation, figure out where we should be!
1331
        auto tail = producerListTail.load(std::memory_order_acquire);
1332
        if (token.desiredProducer == nullptr && tail == nullptr) {
1333
            return false;
1334
        }
1335
        auto prodCount = producerCount.load(std::memory_order_relaxed);
1336
        auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1337
        if ((details::unlikely)(token.desiredProducer == nullptr)) {
1338
            // Aha, first time we're dequeueing anything.
1339
            // Figure out our local position
1340
            // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1341
            std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1342
            token.desiredProducer = tail;
1343
            for (std::uint32_t i = 0; i != offset; ++i) {
1344
                token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1345
                if (token.desiredProducer == nullptr) {
1346
                    token.desiredProducer = tail;
1347
                }
1348
            }
1349
        }
1350

1351
        std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1352
        if (delta >= prodCount) {
1353
            delta = delta % prodCount;
1354
        }
1355
        for (std::uint32_t i = 0; i != delta; ++i) {
1356
            token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1357
            if (token.desiredProducer == nullptr) {
1358
                token.desiredProducer = tail;
1359
            }
1360
        }
1361

1362
        token.lastKnownGlobalOffset = globalOffset;
1363
        token.currentProducer = token.desiredProducer;
1364
        token.itemsConsumedFromCurrent = 0;
1365
        return true;
1366
    }
1367

1368

1369
    ///////////////////////////
1370
    // Free list
1371
    ///////////////////////////
1372

1373
    template <typename N>
1374
    struct FreeListNode
1375
    {
1376
        FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
1377

1378
        std::atomic<std::uint32_t> freeListRefs;
1379
        std::atomic<N*> freeListNext;
1380
    };
1381

1382
    // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1383
    // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1384
    // speedy under low contention.
1385
    template<typename N>    // N must inherit FreeListNode or have the same fields (and initialization of them)
1386
    struct FreeList
1387
    {
1388
        FreeList() : freeListHead(nullptr) { }
972✔
1389
        FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
1390
        void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1391

1392
        FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1393
        FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1394

1395
        inline void add(N* node)
10,197✔
1396
        {
1397
#if MCDBGQ_NOLOCKFREE_FREELIST
1398
            debug::DebugLock lock(mutex);
1399
#endif
1400
            // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1401
            // set it using a fetch_add
1402
            if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
10,197✔
1403
                // Oh look! We were the last ones referencing this node, and we know
1404
                // we want to add it to the free list, so let's do it!
1405
                add_knowing_refcount_is_zero(node);
10,197✔
1406
            }
10,197✔
1407
        }
10,197✔
1408

1409
        inline N* try_get()
7,540✔
1410
        {
1411
#if MCDBGQ_NOLOCKFREE_FREELIST
1412
            debug::DebugLock lock(mutex);
1413
#endif
1414
            auto head = freeListHead.load(std::memory_order_acquire);
7,540✔
1415
            while (head != nullptr) {
7,540✔
1416
                auto prevHead = head;
6,544✔
1417
                auto refs = head->freeListRefs.load(std::memory_order_relaxed);
6,544✔
1418
                if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
6,544✔
1419
                    head = freeListHead.load(std::memory_order_acquire);
×
1420
                    continue;
×
1421
                }
1422

1423
                // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1424
                // next and not worry about it changing between now and the time we do the CAS
1425
                auto next = head->freeListNext.load(std::memory_order_relaxed);
6,544✔
1426
                if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
6,544✔
1427
                    // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1428
                    // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1429
                    assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
6,544✔
1430

1431
                    // Decrease refcount twice, once for our ref, and once for the list's ref
1432
                    head->freeListRefs.fetch_sub(2, std::memory_order_release);
6,544✔
1433
                    return head;
6,544✔
1434
                }
1435

1436
                // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1437
                // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1438
                // count decrement happens-after the CAS on the head.
1439
                refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
×
1440
                if (refs == SHOULD_BE_ON_FREELIST + 1) {
×
1441
                    add_knowing_refcount_is_zero(prevHead);
×
1442
                }
×
1443
            }
1444

1445
            return nullptr;
996✔
1446
        }
7,540✔
1447

1448
        // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1449
        N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
972✔
1450

1451
    private:
1452
        inline void add_knowing_refcount_is_zero(N* node)
10,197✔
1453
        {
1454
            // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1455
            // only one copy of this method per node at a time, i.e. the single thread case), then we know
1456
            // we can safely change the next pointer of the node; however, once the refcount is back above
1457
            // zero, then other threads could increase it (happens under heavy contention, when the refcount
1458
            // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1459
            // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1460
            // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1461
            // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1462
            auto head = freeListHead.load(std::memory_order_relaxed);
10,197✔
1463
            while (true) {
10,197✔
1464
                node->freeListNext.store(head, std::memory_order_relaxed);
10,197✔
1465
                node->freeListRefs.store(1, std::memory_order_release);
10,197✔
1466
                if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
10,197✔
1467
                    // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1468
                    if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
×
1469
                        continue;
×
1470
                    }
1471
                }
×
1472
                return;
10,197✔
1473
            }
1474
        }
1475

1476
    private:
1477
        // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1478
        std::atomic<N*> freeListHead;
1479

1480
        static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1481
        static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1482

1483
#if MCDBGQ_NOLOCKFREE_FREELIST
1484
        debug::DebugMutex mutex;
1485
#endif
1486
    };
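    // -----------------------------------------------------------------------
    // Simplified illustration of the CAS loop shape used by FreeList above
    // (illustrative only, not part of the upstream file). This stripped-down
    // version omits the freeListRefs reference counting, so unlike the real
    // FreeList it assumes nodes are never reused or freed while another
    // thread may still hold a stale head pointer; the refcounts above close
    // exactly that gap.
#if 0
    #include <atomic>

    struct DemoNode { DemoNode* next = nullptr; };

    struct DemoFreeList
    {
        std::atomic<DemoNode*> head{nullptr};

        void add(DemoNode* node)
        {
            DemoNode* h = head.load(std::memory_order_relaxed);
            do {
                // Link the node in front of the current head and try to
                // publish it; on CAS failure h is refreshed and we retry.
                node->next = h;
            } while (!head.compare_exchange_weak(h, node,
                std::memory_order_release, std::memory_order_relaxed));
        }

        DemoNode* try_get()
        {
            DemoNode* h = head.load(std::memory_order_acquire);
            while (h != nullptr && !head.compare_exchange_weak(h, h->next,
                std::memory_order_acquire, std::memory_order_relaxed))
            {
                // h was refreshed by the failed CAS; loop and retry.
            }
            return h;    // nullptr if the list looked empty
        }
    };
#endif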
1487

1488

1489
    ///////////////////////////
1490
    // Block
1491
    ///////////////////////////
1492

1493
    enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1494

1495
    struct Block
1496
    {
1497
        Block()
32,100✔
1498
            : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
64,200✔
1499
        {
1500
#if MCDBGQ_TRACKMEM
1501
            owner = nullptr;
1502
#endif
1503
        }
32,100✔
1504

1505
        template<InnerQueueContext context>
1506
        inline bool is_empty() const
×
1507
        {
1508
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1509
                // Check flags
1510
                for (size_t i = 0; i < BLOCK_SIZE; ++i) {
×
1511
                    if (!emptyFlags[i].load(std::memory_order_relaxed)) {
×
1512
                        return false;
×
1513
                    }
1514
                }
×
1515

1516
                // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1517
                std::atomic_thread_fence(std::memory_order_acquire);
×
1518
                return true;
×
1519
            }
1520
            else {
1521
                // Check counter
1522
                if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1523
                    std::atomic_thread_fence(std::memory_order_acquire);
1524
                    return true;
1525
                }
1526
                assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1527
                return false;
1528
            }
1529
        }
×
1530

1531
        // Returns true if the block is now empty (does not apply in explicit context)
1532
        template<InnerQueueContext context>
1533
        inline bool set_empty(index_t i)
279,092✔
1534
        {
1535
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1536
                // Set flag
1537
                assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
×
1538
                emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
×
1539
                return false;
×
1540
            }
1541
            else {
1542
                // Increment counter
1543
                auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
279,109✔
1544
                assert(prevVal < BLOCK_SIZE);
279,092✔
1545
                return prevVal == BLOCK_SIZE - 1;
279,109✔
1546
            }
1547
        }
1548

1549
        // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1550
        // Returns true if the block is now empty (does not apply in explicit context).
1551
        template<InnerQueueContext context>
1552
        inline bool set_many_empty(index_t i, size_t count)
1553
        {
1554
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1555
                // Set flags
1556
                std::atomic_thread_fence(std::memory_order_release);
1557
                i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1558
                for (size_t j = 0; j != count; ++j) {
1559
                    assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1560
                    emptyFlags[i + j].store(true, std::memory_order_relaxed);
1561
                }
1562
                return false;
1563
            }
1564
            else {
1565
                // Increment counter
1566
                auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1567
                assert(prevVal + count <= BLOCK_SIZE);
1568
                return prevVal + count == BLOCK_SIZE;
1569
            }
1570
        }
1571

1572
        template<InnerQueueContext context>
1573
        inline void set_all_empty()
1574
        {
1575
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1576
                // Set all flags
1577
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1578
                    emptyFlags[i].store(true, std::memory_order_relaxed);
1579
                }
1580
            }
1581
            else {
1582
                // Reset counter
1583
                elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1584
            }
1585
        }
1586

1587
        template<InnerQueueContext context>
1588
        inline void reset_empty()
10,197✔
1589
        {
1590
            if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1591
                // Reset flags
1592
                for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1593
                    emptyFlags[i].store(false, std::memory_order_relaxed);
1594
                }
1595
            }
1596
            else {
1597
                // Reset counter
1598
                elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
10,197✔
1599
            }
1600
        }
10,197✔
1601

1602
        inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
558,142✔
1603
        inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1604

1605
    private:
1606
        // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of
1607
        // addresses returned by malloc, that alignment will be preserved. Apparently clang actually
1608
        // generates code that uses this assumption for AVX instructions in some cases. Ideally, we
1609
        // should also align Block to the alignment of T in case it's higher than malloc's 16-byte
1610
        // alignment, but this is hard to do in a cross-platform way. Assert for this case:
1611
        static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time");
1612
        // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since
1613
        // otherwise the appropriate padding will not be added at the end of Block in order to make
1614
        // arrays of Blocks all be properly aligned (not just the first one). We use a union to force
1615
        // this.
1616
        union {
1617
            // NOLINTNEXTLINE(bugprone-sizeof-expression)
1618
            char elements[sizeof(T) * BLOCK_SIZE];
1619
            details::max_align_t dummy;
1620
        };
1621
    public:
1622
        Block* next;
1623
        std::atomic<size_t> elementsCompletelyDequeued;
1624
        std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1625
    public:
1626
        std::atomic<std::uint32_t> freeListRefs;
1627
        std::atomic<Block*> freeListNext;
1628
        std::atomic<bool> shouldBeOnFreeList;
1629
        bool dynamicallyAllocated;    // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1630

1631
#if MCDBGQ_TRACKMEM
1632
        void* owner;
1633
#endif
1634
    };
1635
    static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
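    // -----------------------------------------------------------------------
    // The union-with-max_align_t trick used inside Block, in isolation
    // (illustrative only, not part of the upstream file): the dummy member
    // forces the wrapper -- and therefore arrays of it -- to be padded and
    // aligned like max_align_t without changing how the byte buffer is used.
#if 0
    #include <cstddef>

    struct RawStorage
    {
        union {
            unsigned char bytes[64];     // the payload actually used
            std::max_align_t dummy;      // only here to force alignment
        };
    };

    static_assert(alignof(RawStorage) >= alignof(std::max_align_t),
        "the union forces max_align_t alignment and padding");
#endif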
1636

1637

1638
#if MCDBGQ_TRACKMEM
1639
public:
1640
    struct MemStats;
1641
private:
1642
#endif
1643

1644
    ///////////////////////////
1645
    // Producer base
1646
    ///////////////////////////
1647

1648
    struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
1649
    {
1650
        ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
1,697✔
1651
            tailIndex(0),
1,697✔
1652
            headIndex(0),
1,697✔
1653
            dequeueOptimisticCount(0),
1,697✔
1654
            dequeueOvercommit(0),
1,697✔
1655
            tailBlock(nullptr),
1,697✔
1656
            isExplicit(isExplicit_),
1,697✔
1657
            parent(parent_)
1,697✔
1658
        {
3,394✔
1659
        }
1,697✔
1660

1661
        virtual ~ProducerBase() { }
1,697✔
1662

1663
        template<typename U>
1664
        inline bool dequeue(U& element)
279,248✔
1665
        {
1666
            if (isExplicit) {
279,248✔
1667
                return static_cast<ExplicitProducer*>(this)->dequeue(element);
×
1668
            }
1669
            else {
1670
                return static_cast<ImplicitProducer*>(this)->dequeue(element);
279,273✔
1671
            }
1672
        }
279,273✔
1673

1674
        template<typename It>
1675
        inline size_t dequeue_bulk(It& itemFirst, size_t max)
1676
        {
1677
            if (isExplicit) {
1678
                return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1679
            }
1680
            else {
1681
                return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1682
            }
1683
        }
1684

1685
        inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
436,593✔
1686

1687
        inline size_t size_approx() const
420,340✔
1688
        {
1689
            auto tail = tailIndex.load(std::memory_order_relaxed);
420,314✔
1690
            auto head = headIndex.load(std::memory_order_relaxed);
420,314✔
1691
            return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
420,340✔
1692
        }
1693

1694
        inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1695
    protected:
1696
        std::atomic<index_t> tailIndex;    // Where to enqueue to next
1697
        std::atomic<index_t> headIndex;    // Where to dequeue from next
1698

1699
        std::atomic<index_t> dequeueOptimisticCount;
1700
        std::atomic<index_t> dequeueOvercommit;
1701

1702
        Block* tailBlock;
1703

1704
    public:
1705
        bool isExplicit;
1706
        ConcurrentQueue* parent;
1707

1708
    protected:
1709
#if MCDBGQ_TRACKMEM
1710
        friend struct MemStats;
1711
#endif
1712
    };
1713

1714

1715
    ///////////////////////////
1716
    // Explicit queue
1717
    ///////////////////////////
1718

1719
    struct ExplicitProducer : public ProducerBase
1720
    {
1721
        explicit ExplicitProducer(ConcurrentQueue* parent) :
×
1722
            ProducerBase(parent, true),
×
1723
            blockIndex(nullptr),
×
1724
            pr_blockIndexSlotsUsed(0),
×
1725
            pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
×
1726
            pr_blockIndexFront(0),
×
1727
            pr_blockIndexEntries(nullptr),
×
1728
            pr_blockIndexRaw(nullptr)
×
1729
        {
×
1730
            size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
×
1731
            if (poolBasedIndexSize > pr_blockIndexSize) {
×
1732
                pr_blockIndexSize = poolBasedIndexSize;
×
1733
            }
×
1734

1735
            new_block_index(0);    // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
×
1736
        }
×
1737

1738
        ~ExplicitProducer()
×
1739
        {
×
1740
            // Destruct any elements not yet dequeued.
1741
            // Since we're in the destructor, we can assume all elements
1742
            // are either completely dequeued or completely not (no halfways).
1743
            if (this->tailBlock != nullptr) {    // Note this means there must be a block index too
×
1744
                // First find the block that's partially dequeued, if any
1745
                Block* halfDequeuedBlock = nullptr;
×
1746
                if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
×
1747
                    // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1748
                    // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1749
                    size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
×
1750
                    while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
×
1751
                        i = (i + 1) & (pr_blockIndexSize - 1);
×
1752
                    }
1753
                    assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
×
1754
                    halfDequeuedBlock = pr_blockIndexEntries[i].block;
×
1755
                }
×
1756

1757
                // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1758
                auto block = this->tailBlock;
×
1759
                do {
×
1760
                    block = block->next;
×
1761
                    if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
×
1762
                        continue;
×
1763
                    }
1764

1765
                    size_t i = 0;  // Offset into block
×
1766
                    if (block == halfDequeuedBlock) {
×
1767
                        i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
×
1768
                    }
×
1769

1770
                    // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1771
                    auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
×
1772
                    while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
×
1773
                        (*block)[i++]->~T();
×
1774
                    }
1775
                } while (block != this->tailBlock);
×
1776
            }
×
1777

1778
            // Destroy all blocks that we own
1779
            if (this->tailBlock != nullptr) {
×
1780
                auto block = this->tailBlock;
×
1781
                do {
×
1782
                    auto nextBlock = block->next;
×
1783
                    if (block->dynamicallyAllocated) {
×
1784
                        destroy(block);
×
1785
                    }
×
1786
                    else {
1787
                        this->parent->add_block_to_free_list(block);
×
1788
                    }
1789
                    block = nextBlock;
×
1790
                } while (block != this->tailBlock);
×
1791
            }
×
1792

1793
            // Destroy the block indices
1794
            auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
×
1795
            while (header != nullptr) {
×
1796
                auto prev = static_cast<BlockIndexHeader*>(header->prev);
×
1797
                header->~BlockIndexHeader();
×
1798
                (Traits::free)(header);
×
1799
                header = prev;
×
1800
            }
1801
        }
×
1802

1803
        template<AllocationMode allocMode, typename U>
1804
        inline bool enqueue(U&& element)
1805
        {
1806
            index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1807
            index_t newTailIndex = 1 + currentTailIndex;
1808
            if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1809
                // We reached the end of a block, start a new one
1810
                auto startBlock = this->tailBlock;
1811
                auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1812
                if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1813
                    // We can re-use the block ahead of us, it's empty!
1814
                    this->tailBlock = this->tailBlock->next;
1815
                    this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1816

1817
                    // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1818
                    // last block from it first -- except instead of removing then adding, we can just overwrite).
1819
                    // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1820
                    // it would have been re-attempted when adding the first block to the queue; since there is such
1821
                    // a block, a block index must have been successfully allocated.
1822
                }
1823
                else {
1824
                    // Whatever head value we see here is >= the last value we saw here (relatively),
1825
                    // and <= its current value. Since we have the most recent tail, the head must be
1826
                    // <= to it.
1827
                    auto head = this->headIndex.load(std::memory_order_relaxed);
1828
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1829
                    if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1830
                        || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1831
                        // We can't enqueue in another block because there's not enough leeway -- the
1832
                        // tail could surpass the head by the time the block fills up! (Or we'll exceed
1833
                        // the size limit, if the second part of the condition was true.)
1834
                        return false;
1835
                    }
1836
                    // We're going to need a new block; check that the block index has room
1837
                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1838
                        // Hmm, the circular block index is already full -- we'll need
1839
                        // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1840
                        // the initial allocation failed in the constructor.
1841

1842
                        if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) {
1843
                            return false;
1844
                        }
1845
                    }
1846

1847
                    // Insert a new block in the circular linked list
1848
                    auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1849
                    if (newBlock == nullptr) {
1850
                        return false;
1851
                    }
1852
#if MCDBGQ_TRACKMEM
1853
                    newBlock->owner = this;
1854
#endif
1855
                    newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1856
                    if (this->tailBlock == nullptr) {
1857
                        newBlock->next = newBlock;
1858
                    }
1859
                    else {
1860
                        newBlock->next = this->tailBlock->next;
1861
                        this->tailBlock->next = newBlock;
1862
                    }
1863
                    this->tailBlock = newBlock;
1864
                    ++pr_blockIndexSlotsUsed;
1865
                }
1866

1867
                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(HPX_FORWARD(U, element)))) {
1868
                    // The constructor may throw. We want the element not to appear in the queue in
1869
                    // that case (without corrupting the queue):
1870
                    MOODYCAMEL_TRY {
1871
                        new ((*this->tailBlock)[currentTailIndex]) T(HPX_FORWARD(U, element));
1872
                    }
1873
                    MOODYCAMEL_CATCH (...) {
1874
                        // Revert change to the current block, but leave the new block available
1875
                        // for next time
1876
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1877
                        this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1878
                        MOODYCAMEL_RETHROW;
1879
                    }
1880
                }
1881
                else {
1882
                    (void)startBlock;
1883
                    (void)originalBlockIndexSlotsUsed;
1884
                }
1885

1886
                // Add block to block index
1887
                auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1888
                entry.base = currentTailIndex;
1889
                entry.block = this->tailBlock;
1890
                blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
1891
                pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1892

1893
                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(HPX_FORWARD(U, element)))) {
1894
                    this->tailIndex.store(newTailIndex, std::memory_order_release);
1895
                    return true;
1896
                }
1897
            }
1898

1899
            // Enqueue
1900
            new ((*this->tailBlock)[currentTailIndex]) T(HPX_FORWARD(U, element));
1901

1902
            this->tailIndex.store(newTailIndex, std::memory_order_release);
1903
            return true;
1904
        }
1905

1906
        template<typename U>
1907
        bool dequeue(U& element)
×
1908
        {
1909
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
×
1910
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
×
1911
            if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
×
1912
                // Might be something to dequeue, let's give it a try
1913

1914
                // Note that this if is purely for performance purposes in the common case when the queue is
1915
                // empty and the values are eventually consistent -- we may enter here spuriously.
1916

1917
                // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1918
                // change them) and must be the same value at this point (inside the if) as when the if condition was
1919
                // evaluated.
1920

1921
                // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1922
                // This ensures that whatever value ended up loaded into overcommit, the load of dequeueOptimisticCount in
1923
                // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1924
                // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1925
                // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1926
                // unfortunately that can't be shown to be correct using only the C++11 standard.
1927
                // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1928
                std::atomic_thread_fence(std::memory_order_acquire);
×
1929

1930
                // Increment optimistic counter, then check if it went over the boundary
1931
                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
×
1932

1933
                // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1934
                // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1935
                // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1936
                // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1937
                // However, we can't assert this since both dequeueOptimisticCount and dequeueOvercommit may (independently)
1938
                // overflow; in such a case, though, the logic still holds since the difference between the two is maintained.
1939

1940
                // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1941
                // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1942
                // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1943
                tail = this->tailIndex.load(std::memory_order_acquire);
×
1944
                if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
×
1945
                    // Guaranteed to be at least one element to dequeue!
1946

1947
                    // Get the index. Note that since there's guaranteed to be at least one element, this
1948
                    // will never exceed tail. We need to do an acquire-release fence here since it's possible
1949
                    // that whatever condition got us to this point was for an earlier enqueued element (that
1950
                    // we already see the memory effects for), but that by the time we increment somebody else
1951
                    // has incremented it, and we need to see the memory effects for *that* element, which is
1952
                    // in such a case necessarily visible on the thread that incremented it in the first
1953
                    // place with the more current condition (they must have acquired a tail that is at least
1954
                    // as recent).
1955
                    auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
×
1956

1957

1958
                    // Determine which block the element is in
1959

1960
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
×
1961
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
×
1962

1963
                    // We need to be careful here about subtracting and dividing because of index wrap-around.
1964
                    // When an index wraps, we need to preserve the sign of the offset when dividing it by the
1965
                    // block size (in order to get a correct signed block count offset in all cases):
1966
                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
×
1967
                    auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
×
1968
                    auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
×
1969
                    auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
×
1970

1971
                    // Dequeue
1972
                    auto& el = *((*block)[index]);
×
1973
                    if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = HPX_MOVE(el))) {
1974
                        // Make sure the element is still fully dequeued and destroyed even if the assignment
1975
                        // throws
1976
                        struct Guard {
1977
                            Block* block;
1978
                            index_t index;
1979

1980
                            ~Guard()
1981
                            {
1982
                                (*block)[index]->~T();
1983
                                block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1984
                            }
1985
                        } guard = { block, index };
1986

1987
                        element = HPX_MOVE(el); // NOLINT
1988
                    }
1989
                    else {
1990
                        element = HPX_MOVE(el); // NOLINT
×
1991
                        el.~T(); // NOLINT
×
1992
                        block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
×
1993
                    }
1994

1995
                    return true;
×
1996
                }
1997
                else {
1998
                    // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
1999
                    this->dequeueOvercommit.fetch_add(1, std::memory_order_release);    // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
×
2000
                }
2001
            }
×
2002

2003
            return false;
×
2004
        }
×
2005

2006
        template<AllocationMode allocMode, typename It>
2007
        bool enqueue_bulk(It itemFirst, size_t count)
2008
        {
2009
            // First, we need to make sure we have enough room to enqueue all of the elements;
2010
            // this means pre-allocating blocks and putting them in the block index (but only if
2011
            // all the allocations succeeded).
2012
            index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2013
            auto startBlock = this->tailBlock;
2014
            auto originalBlockIndexFront = pr_blockIndexFront;
2015
            auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2016

2017
            Block* firstAllocatedBlock = nullptr;
2018

2019
            // Figure out how many blocks we'll need to allocate, and do so
2020
            size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2021
            index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2022
            if (blockBaseDiff > 0) {
2023
                // Allocate as many blocks as possible from ahead
2024
                while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2025
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2026
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2027

2028
                    this->tailBlock = this->tailBlock->next;
2029
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2030

2031
                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2032
                    entry.base = currentTailIndex;
2033
                    entry.block = this->tailBlock;
2034
                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2035
                }
2036

2037
                // Now allocate as many blocks as necessary from the block pool
2038
                while (blockBaseDiff > 0) {
2039
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2040
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2041

2042
                    auto head = this->headIndex.load(std::memory_order_relaxed);
2043
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2044
                    bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2045
                    if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2046
                        if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) {
2047
                            // Failed to allocate, undo changes (but keep injected blocks)
2048
                            pr_blockIndexFront = originalBlockIndexFront;
2049
                            pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2050
                            this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2051
                            return false;
2052
                        }
2053

2054
                        // pr_blockIndexFront is updated inside new_block_index, so we need to
2055
                        // update our fallback value too (since we keep the new index even if we
2056
                        // later fail)
2057
                        originalBlockIndexFront = originalBlockIndexSlotsUsed;
2058
                    }
2059

2060
                    // Insert a new block in the circular linked list
2061
                    auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2062
                    if (newBlock == nullptr) {
2063
                        pr_blockIndexFront = originalBlockIndexFront;
2064
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2065
                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2066
                        return false;
2067
                    }
2068

2069
#if MCDBGQ_TRACKMEM
2070
                    newBlock->owner = this;
2071
#endif
2072
                    newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2073
                    if (this->tailBlock == nullptr) {
2074
                        newBlock->next = newBlock;
2075
                    }
2076
                    else {
2077
                        newBlock->next = this->tailBlock->next;
2078
                        this->tailBlock->next = newBlock;
2079
                    }
2080
                    this->tailBlock = newBlock;
2081
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2082

2083
                    ++pr_blockIndexSlotsUsed;
2084

2085
                    auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2086
                    entry.base = currentTailIndex;
2087
                    entry.block = this->tailBlock;
2088
                    pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2089
                }
2090

2091
                // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2092
                // publish the new block index front
2093
                auto block = firstAllocatedBlock;
2094
                while (true) {
2095
                    block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2096
                    if (block == this->tailBlock) {
2097
                        break;
2098
                    }
2099
                    block = block->next;
2100
                }
2101

2102
                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2103
                    blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2104
                }
2105
            }
2106

2107
            // Enqueue, one block at a time
2108
            index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2109
            currentTailIndex = startTailIndex;
2110
            auto endBlock = this->tailBlock;
2111
            this->tailBlock = startBlock;
2112
            assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2113
            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2114
                this->tailBlock = firstAllocatedBlock;
2115
            }
2116
            while (true) {
2117
                auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2118
                if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2119
                    stopIndex = newTailIndex;
2120
                }
2121
                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2122
                    while (currentTailIndex != stopIndex) {
2123
                        new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2124
                    }
2125
                }
2126
                else {
2127
                    MOODYCAMEL_TRY {
2128
                        while (currentTailIndex != stopIndex) {
2129
                            // Must use copy constructor even if move constructor is available
2130
                            // because we may have to revert if there's an exception.
2131
                            // Sorry about the horrible templated next line, but it was the only way
2132
                            // to disable moving *at compile time*, which is important because a type
2133
                            // may only define a (noexcept) move constructor, and so calls to the
2134
                            // cctor will not compile, even if they are in an if branch that will never
2135
                            // be executed
2136
                            new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2137
                            ++currentTailIndex;
2138
                            ++itemFirst;
2139
                        }
2140
                    }
2141
                    MOODYCAMEL_CATCH (...) {
2142
                        // Oh dear, an exception's been thrown -- destroy the elements that
2143
                        // were enqueued so far and revert the entire bulk operation (we'll keep
2144
                        // any allocated blocks in our linked list for later, though).
2145
                        auto constructedStopIndex = currentTailIndex;
2146
                        auto lastBlockEnqueued = this->tailBlock;
2147

2148
                        pr_blockIndexFront = originalBlockIndexFront;
2149
                        pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2150
                        this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2151

2152
                        if (!details::is_trivially_destructible<T>::value) {
2153
                            auto block = startBlock;
2154
                            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2155
                                block = firstAllocatedBlock;
2156
                            }
2157
                            currentTailIndex = startTailIndex;
2158
                            while (true) {
2159
                                stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2160
                                if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2161
                                    stopIndex = constructedStopIndex;
2162
                                }
2163
                                while (currentTailIndex != stopIndex) {
2164
                                    (*block)[currentTailIndex++]->~T();
2165
                                }
2166
                                if (block == lastBlockEnqueued) {
2167
                                    break;
2168
                                }
2169
                                block = block->next;
2170
                            }
2171
                        }
2172
                        MOODYCAMEL_RETHROW;
2173
                    }
2174
                }
2175

2176
                if (this->tailBlock == endBlock) {
2177
                    assert(currentTailIndex == newTailIndex);
2178
                    break;
2179
                }
2180
                this->tailBlock = this->tailBlock->next;
2181
            }
2182

2183
            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) {
2184
                blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2185
            }
2186

2187
            this->tailIndex.store(newTailIndex, std::memory_order_release);
2188
            return true;
2189
        }
2190
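
        // Editorial sketch (not part of the vendored implementation): the
        // MOODYCAMEL_NOEXCEPT_CTOR test used throughout enqueue_bulk asks, at
        // compile time, whether constructing T from *itemFirst can throw. If it
        // cannot, elements are constructed directly and the block index front is
        // published before construction begins; if it can, construction goes
        // through details::nomove_if<true>::eval, which forces the copy
        // constructor so that a thrown exception can be rolled back by destroying
        // the elements constructed so far, and the front is only published once
        // everything succeeded. A hypothetical element type such as
        //
        //     struct Widget { Widget(Widget const&); /* may throw */ };   // illustrative only
        //
        // takes the slow, revertible path, whereas a type that is nothrow-
        // constructible from the source iterator takes the fast path.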

2191
        template<typename It>
2192
        size_t dequeue_bulk(It& itemFirst, size_t max)
2193
        {
2194
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
2195
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2196
            auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2197
            if (details::circular_less_than<size_t>(0, desiredCount)) {
2198
                desiredCount = desiredCount < max ? desiredCount : max;
2199
                std::atomic_thread_fence(std::memory_order_acquire);
2200

2201
                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2202

2203
                tail = this->tailIndex.load(std::memory_order_acquire);
2204
                auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2205
                if (details::circular_less_than<size_t>(0, actualCount)) {
2206
                    actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2207
                    if (actualCount < desiredCount) {
2208
                        this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2209
                    }
2210

2211
                    // Get the first index. Note that since there are guaranteed to be at least actualCount elements, this
2212
                    // will never exceed tail.
2213
                    auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2214

2215
                    // Determine which block the first element is in
2216
                    auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2217
                    auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2218

2219
                    auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2220
                    auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2221
                    auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
2222
                    auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2223

2224
                    // Iterate the blocks and dequeue
2225
                    auto index = firstIndex;
2226
                    do {
2227
                        auto firstIndexInBlock = index;
2228
                        auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2229
                        endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2230
                        auto block = localBlockIndex->entries[indexIndex].block;
2231
                        if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = HPX_MOVE((*(*block)[index])))) {
2232
                            while (index != endIndex) {
2233
                                auto& el = *((*block)[index]);
2234
                                *itemFirst++ = HPX_MOVE(el);
2235
                                el.~T();
2236
                                ++index;
2237
                            }
2238
                        }
2239
                        else {
2240
                            MOODYCAMEL_TRY {
2241
                                while (index != endIndex) {
2242
                                    auto& el = *((*block)[index]);
2243
                                    *itemFirst = HPX_MOVE(el);
2244
                                    ++itemFirst;
2245
                                    el.~T();
2246
                                    ++index;
2247
                                }
2248
                            }
2249
                            MOODYCAMEL_CATCH (...) {
2250
                                // It's too late to revert the dequeue, but we can make sure that all
2251
                                // the dequeued objects are properly destroyed and the block index
2252
                                // (and empty count) are properly updated before we propagate the exception
2253
                                do {
2254
                                    block = localBlockIndex->entries[indexIndex].block;
2255
                                    while (index != endIndex) {
2256
                                        (*block)[index++]->~T();
2257
                                    }
2258
                                    block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2259
                                    indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2260

2261
                                    firstIndexInBlock = index;
2262
                                    endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2263
                                    endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2264
                                } while (index != firstIndex + actualCount);
2265

2266
                                MOODYCAMEL_RETHROW;
2267
                            }
2268
                        }
2269
                        block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2270
                        indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2271
                    } while (index != firstIndex + actualCount);
2272

2273
                    return actualCount;
2274
                }
2275
                else {
2276
                    // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2277
                    this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2278
                }
2279
            }
2280

2281
            return 0;
2282
        }
2283
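
        // Editorial note on the accounting above: dequeueOptimisticCount is bumped
        // speculatively and dequeueOvercommit records how far it ran ahead of what
        // was actually dequeued. For example, with tail == 100, an optimistic
        // count of 90 and overcommit == 0, desiredCount starts at 10; with
        // max == 4 it is clamped to 4, headIndex advances by actualCount (<= 4),
        // and any shortfall (desiredCount - actualCount) is added back via
        // dequeueOvercommit so the counters stay eventually consistent.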

2284
    private:
2285
        struct BlockIndexEntry
2286
        {
2287
            index_t base;
2288
            Block* block;
2289
        };
2290

2291
        struct BlockIndexHeader
2292
        {
2293
            size_t size;
2294
            std::atomic<size_t> front;    // Current slot (not next, like pr_blockIndexFront)
2295
            BlockIndexEntry* entries;
2296
            void* prev;
2297
        };
2298

2299

2300
        bool new_block_index(size_t numberOfFilledSlotsToExpose)
×
2301
        {
2302
            auto prevBlockSizeMask = pr_blockIndexSize - 1;
×
2303

2304
            // Create the new block
2305
            pr_blockIndexSize <<= 1;
×
2306
            auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
×
2307
            if (newRawPtr == nullptr) {
×
2308
                pr_blockIndexSize >>= 1;    // Reset to allow graceful retry
×
2309
                return false;
×
2310
            }
2311

2312
            auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
×
2313

2314
            // Copy in all the old indices, if any
2315
            size_t j = 0;
×
2316
            if (pr_blockIndexSlotsUsed != 0) {
×
2317
                auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
×
2318
                do {
×
2319
                    newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
×
2320
                    i = (i + 1) & prevBlockSizeMask;
×
2321
                } while (i != pr_blockIndexFront);
×
2322
            }
×
2323

2324
            // Update everything
2325
            auto header = new (newRawPtr) BlockIndexHeader;
×
2326
            header->size = pr_blockIndexSize;
×
2327
            header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
×
2328
            header->entries = newBlockIndexEntries;
×
2329
            header->prev = pr_blockIndexRaw;    // we link the new block to the old one so we can free it later
×
2330

2331
            pr_blockIndexFront = j;
×
2332
            pr_blockIndexEntries = newBlockIndexEntries;
×
2333
            pr_blockIndexRaw = newRawPtr;
×
2334
            blockIndex.store(header, std::memory_order_release);
×
2335

2336
            return true;
×
2337
        }
×
2338
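
        // Editorial note: the explicit producer's block index is a power-of-two
        // ring that doubles on growth. With an old capacity of, say, 32, the
        // currently used slots are copied into the new 64-entry ring, `front` is
        // re-exposed as numberOfFilledSlotsToExpose - 1, and the old allocation is
        // kept reachable through `prev` so it can still be read until the whole
        // chain is freed when the producer is destroyed.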

2339
    private:
2340
        std::atomic<BlockIndexHeader*> blockIndex;
2341

2342
        // To be used by the producer only -- consumers must use the ones referenced by blockIndex
2343
        size_t pr_blockIndexSlotsUsed;
2344
        size_t pr_blockIndexSize;
2345
        size_t pr_blockIndexFront;    // Next slot (not current)
2346
        BlockIndexEntry* pr_blockIndexEntries;
2347
        void* pr_blockIndexRaw;
2348

2349
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2350
    public:
2351
        ExplicitProducer* nextExplicitProducer;
2352
    private:
2353
#endif
2354

2355
#if MCDBGQ_TRACKMEM
2356
        friend struct MemStats;
2357
#endif
2358
    };
2359

2360

2361
    //////////////////////////////////
2362
    // Implicit queue
2363
    //////////////////////////////////
2364

2365
    struct ImplicitProducer : public ProducerBase
2366
    {
2367
        ImplicitProducer(ConcurrentQueue* parent) :
1,697✔
2368
            ProducerBase(parent, false),
1,697✔
2369
            nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
1,697✔
2370
            blockIndex(nullptr)
1,697✔
2371
        {
3,394✔
2372
            new_block_index();
1,697✔
2373
        }
1,697✔
2374

2375
        ~ImplicitProducer()
1,697✔
2376
        {
1,697✔
2377
            // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2378
            // completed already; this means that all undequeued elements are placed contiguously across
2379
            // contiguous blocks, and that only the first and last remaining blocks can be only partially
2380
            // empty (all other remaining blocks must be completely full).
2381

2382
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2383
            // Unregister ourselves for thread termination notification
2384
            if (!this->inactive.load(std::memory_order_relaxed)) {
2385
                details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2386
            }
2387
#endif
2388

2389
            // Destroy all remaining elements!
2390
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
1,697✔
2391
            auto index = this->headIndex.load(std::memory_order_relaxed);
1,697✔
2392
            Block* block = nullptr;
1,697✔
2393
            assert(index == tail || details::circular_less_than(index, tail));
1,697✔
2394
            bool forceFreeLastBlock = index != tail;    // If we enter the loop, then the last (tail) block will not be freed
1,697✔
2395
            while (index != tail) {
1,697✔
2396
                if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
×
2397
                    if (block != nullptr) {
×
2398
                        // Free the old block
2399
                        this->parent->add_block_to_free_list(block);
×
2400
                    }
×
2401

2402
                    block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
×
2403
                }
×
2404

2405
                ((*block)[index])->~T();
×
2406
                ++index;
×
2407
            }
2408
            // Even if the queue is empty, there's still one block that's not on the free list
2409
            // (unless the head index reached the end of it, in which case the tail will be poised
2410
            // to create a new block).
2411
            if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
1,697✔
2412
                this->parent->add_block_to_free_list(this->tailBlock);
1,696✔
2413
            }
1,696✔
2414

2415
            // Destroy block index
2416
            auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
1,697✔
2417
            if (localBlockIndex != nullptr) {
1,697✔
2418
                for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
57,505✔
2419
                    localBlockIndex->index[i]->~BlockIndexEntry();
55,808✔
2420
                }
55,808✔
2421
                do {
1,697✔
2422
                    auto prev = localBlockIndex->prev;
1,720✔
2423
                    localBlockIndex->~BlockIndexHeader();
1,720✔
2424
                    (Traits::free)(localBlockIndex);
1,720✔
2425
                    localBlockIndex = prev;
1,720✔
2426
                } while (localBlockIndex != nullptr);
1,720✔
2427
            }
1,697✔
2428
        }
1,697✔
2429

2430
        template<AllocationMode allocMode, typename U>
2431
        inline bool enqueue(U&& element)
279,107✔
2432
        {
2433
            index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
279,107✔
2434
            index_t newTailIndex = 1 + currentTailIndex;
279,107✔
2435
            if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
279,107✔
2436
                // We reached the end of a block, start a new one
2437
                auto head = this->headIndex.load(std::memory_order_relaxed);
10,197✔
2438
                assert(!details::circular_less_than<index_t>(currentTailIndex, head));
10,197✔
2439
                if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
10,197✔
2440
                    return false;
×
2441
                }
2442
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2443
                debug::DebugLock lock(mutex);
2444
#endif
2445
                // Find out where we'll be inserting this block in the block index
2446
                BlockIndexEntry* idxEntry;
2447
                if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
10,197✔
2448
                    return false;
×
2449
                }
2450

2451
                // Get ahold of a new block
2452
                auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
10,197✔
2453
                if (newBlock == nullptr) {
10,197✔
2454
                    rewind_block_index_tail();
×
2455
                    idxEntry->value.store(nullptr, std::memory_order_relaxed);
×
2456
                    return false;
×
2457
                }
2458
#if MCDBGQ_TRACKMEM
2459
                newBlock->owner = this;
2460
#endif
2461
                newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
10,196✔
2462

2463
                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(HPX_FORWARD(U, element)))) {
2464
                    // May throw, try to insert now before we publish the fact that we have this new block
2465
                    MOODYCAMEL_TRY {
2466
                        new ((*newBlock)[currentTailIndex]) T(HPX_FORWARD(U, element));
2467
                    }
2468
                    MOODYCAMEL_CATCH (...) {
2469
                        rewind_block_index_tail();
2470
                        idxEntry->value.store(nullptr, std::memory_order_relaxed);
2471
                        this->parent->add_block_to_free_list(newBlock);
2472
                        MOODYCAMEL_RETHROW;
2473
                    }
2474
                }
2475

2476
                // Insert the new block into the index
2477
                idxEntry->value.store(newBlock, std::memory_order_relaxed);
10,196✔
2478

2479
                this->tailBlock = newBlock;
10,196✔
2480

2481
                if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(HPX_FORWARD(U, element)))) {
2482
                    this->tailIndex.store(newTailIndex, std::memory_order_release);
2483
                    return true;
2484
                }
2485
            }
10,196✔
2486

2487
            // Enqueue
2488
            new ((*this->tailBlock)[currentTailIndex]) T(HPX_FORWARD(U, element));
279,106✔
2489

2490
            this->tailIndex.store(newTailIndex, std::memory_order_release);
279,106✔
2491
            return true;
279,106✔
2492
        }
279,106✔
2493
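
        // Editorial note: (currentTailIndex & (BLOCK_SIZE - 1)) == 0 detects a
        // block boundary. With the default traits' BLOCK_SIZE of 32, tail indices
        // 0, 32, 64, ... make this producer requisition a block (initial pool,
        // free list, or heap) and insert a new entry into its block index before
        // the element is constructed.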

2494
        template<typename U>
2495
        bool dequeue(U& element)
279,250✔
2496
        {
2497
            // See ExplicitProducer::dequeue for rationale and explanation
2498
            index_t tail = this->tailIndex.load(std::memory_order_relaxed);
279,244✔
2499
            index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
279,244✔
2500
            if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
279,244✔
2501
                std::atomic_thread_fence(std::memory_order_acquire);
279,136✔
2502

2503
                index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
279,136✔
2504
                tail = this->tailIndex.load(std::memory_order_acquire);
279,136✔
2505
                if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
279,136✔
2506
                    index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
279,109✔
2507

2508
                    // Determine which block the element is in
2509
                    auto entry = get_block_index_entry_for_index(index);
279,109✔
2510

2511
                    // Dequeue
2512
                    auto block = entry->value.load(std::memory_order_relaxed);
279,109✔
2513
                    auto& el = *((*block)[index]);
279,109✔
2514

2515
                    if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = HPX_MOVE(el))) {
2516
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2517
                        // Note: Acquiring the mutex with every dequeue instead of only when a block
2518
                        // is released is very sub-optimal, but it is, after all, purely debug code.
2519
                        debug::DebugLock lock(producer->mutex);
2520
#endif
2521
                        struct Guard {
2522
                            Block* block;
2523
                            index_t index;
2524
                            BlockIndexEntry* entry;
2525
                            ConcurrentQueue* parent;
2526

2527
                            ~Guard()
2528
                            {
2529
                                (*block)[index]->~T();
2530
                                if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2531
                                    entry->value.store(nullptr, std::memory_order_relaxed);
2532
                                    parent->add_block_to_free_list(block);
2533
                                }
2534
                            }
2535
                        } guard = { block, index, entry, this->parent };
2536

2537
                        element = HPX_MOVE(el); // NOLINT
2538
                    }
2539
                    else {
2540
                        element = HPX_MOVE(el); // NOLINT
279,109✔
2541
                        el.~T(); // NOLINT
279,109✔
2542

2543
                        if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
279,109✔
2544
                            {
2545
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2546
                                debug::DebugLock lock(mutex);
2547
#endif
2548
                                // Add the block back into the global free pool (and remove from block index)
2549
                                entry->value.store(nullptr, std::memory_order_relaxed);
8,501✔
2550
                            }
2551
                            this->parent->add_block_to_free_list(block);    // releases the above store
8,501✔
2552
                        }
8,501✔
2553
                    }
2554

2555
                    return true;
279,107✔
2556
                }
2557
                else {
2558
                    this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
33✔
2559
                }
2560
            }
33✔
2561

2562
            return false;
171✔
2563
        }
279,273✔
2564
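
        // Editorial note: the local Guard above is a small RAII helper used only
        // for element types whose move-assignment can throw. Whether the
        // assignment `element = HPX_MOVE(el)` succeeds or throws, the guard's
        // destructor runs the element's destructor, marks the slot empty and, if
        // the whole block became empty, detaches the block from the index and
        // returns it to the free list -- the same cleanup the noexcept branch
        // performs inline.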

2565
        template<AllocationMode allocMode, typename It>
2566
        bool enqueue_bulk(It itemFirst, size_t count)
2567
        {
2568
            // First, we need to make sure we have enough room to enqueue all of the elements;
2569
            // this means pre-allocating blocks and putting them in the block index (but only if
2570
            // all the allocations succeeded).
2571

2572
            // Note that the tailBlock we start off with may not be owned by us any more;
2573
            // this happens if it was filled up exactly to the top (setting tailIndex to
2574
            // the first index of the next block which is not yet allocated), then dequeued
2575
            // completely (putting it on the free list) before we enqueue again.
2576

2577
            index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2578
            auto startBlock = this->tailBlock;
2579
            Block* firstAllocatedBlock = nullptr;
2580
            auto endBlock = this->tailBlock;
2581

2582
            // Figure out how many blocks we'll need to allocate, and do so
2583
            size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2584
            index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2585
            if (blockBaseDiff > 0) {
2586
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2587
                debug::DebugLock lock(mutex);
2588
#endif
2589
                do {
2590
                    blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2591
                    currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2592

2593
                    // Find out where we'll be inserting this block in the block index
2594
                    BlockIndexEntry* idxEntry = nullptr;  // initialization here unnecessary but compiler can't always tell
2595
                    Block* newBlock;
2596
                    bool indexInserted = false;
2597
                    auto head = this->headIndex.load(std::memory_order_relaxed);
2598
                    assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2599
                    bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2600
                    if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2601
                        // Index allocation or block allocation failed; revert any other allocations
2602
                        // and index insertions done so far for this operation
2603
                        if (indexInserted) {
2604
                            rewind_block_index_tail();
2605
                            idxEntry->value.store(nullptr, std::memory_order_relaxed);
2606
                        }
2607
                        currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2608
                        for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2609
                            currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2610
                            idxEntry = get_block_index_entry_for_index(currentTailIndex);
2611
                            idxEntry->value.store(nullptr, std::memory_order_relaxed);
2612
                            rewind_block_index_tail();
2613
                        }
2614
                        this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2615
                        this->tailBlock = startBlock;
2616

2617
                        return false;
2618
                    }
2619

2620
#if MCDBGQ_TRACKMEM
2621
                    newBlock->owner = this;
2622
#endif
2623
                    newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2624
                    newBlock->next = nullptr;
2625

2626
                    // Insert the new block into the index
2627
                    idxEntry->value.store(newBlock, std::memory_order_relaxed);
2628

2629
                    // Store the chain of blocks so that we can undo if later allocations fail,
2630
                    // and so that we can find the blocks when we do the actual enqueueing
2631
                    if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2632
                        assert(this->tailBlock != nullptr);
2633
                        this->tailBlock->next = newBlock;
2634
                    }
2635
                    this->tailBlock = newBlock;
2636
                    endBlock = newBlock;
2637
                    firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2638
                } while (blockBaseDiff > 0);
2639
            }
2640

2641
            // Enqueue, one block at a time
2642
            index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2643
            currentTailIndex = startTailIndex;
2644
            this->tailBlock = startBlock;
2645
            assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2646
            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2647
                this->tailBlock = firstAllocatedBlock;
2648
            }
2649
            while (true) {
2650
                auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2651
                if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2652
                    stopIndex = newTailIndex;
2653
                }
2654
                if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2655
                    while (currentTailIndex != stopIndex) {
2656
                        new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2657
                    }
2658
                }
2659
                else {
2660
                    MOODYCAMEL_TRY {
2661
                        while (currentTailIndex != stopIndex) {
2662
                            new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2663
                            ++currentTailIndex;
2664
                            ++itemFirst;
2665
                        }
2666
                    }
2667
                    MOODYCAMEL_CATCH (...) {
2668
                        auto constructedStopIndex = currentTailIndex;
2669
                        auto lastBlockEnqueued = this->tailBlock;
2670

2671
                        if (!details::is_trivially_destructible<T>::value) {
2672
                            auto block = startBlock;
2673
                            if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2674
                                block = firstAllocatedBlock;
2675
                            }
2676
                            currentTailIndex = startTailIndex;
2677
                            while (true) {
2678
                                stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2679
                                if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2680
                                    stopIndex = constructedStopIndex;
2681
                                }
2682
                                while (currentTailIndex != stopIndex) {
2683
                                    (*block)[currentTailIndex++]->~T();
2684
                                }
2685
                                if (block == lastBlockEnqueued) {
2686
                                    break;
2687
                                }
2688
                                block = block->next;
2689
                            }
2690
                        }
2691

2692
                        currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2693
                        for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2694
                            currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2695
                            auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2696
                            idxEntry->value.store(nullptr, std::memory_order_relaxed);
2697
                            rewind_block_index_tail();
2698
                        }
2699
                        this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2700
                        this->tailBlock = startBlock;
2701
                        MOODYCAMEL_RETHROW;
2702
                    }
2703
                }
2704

2705
                if (this->tailBlock == endBlock) {
2706
                    assert(currentTailIndex == newTailIndex);
2707
                    break;
2708
                }
2709
                this->tailBlock = this->tailBlock->next;
2710
            }
2711
            this->tailIndex.store(newTailIndex, std::memory_order_release);
2712
            return true;
2713
        }
2714
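
        // Editorial sketch of the blockBaseDiff arithmetic above (BLOCK_SIZE == 32
        // assumed purely for illustration): enqueuing count == 40 items starting
        // at startTailIndex == 60 touches the block bases 32, 64 and 96, so
        //     ((60 + 40 - 1) & ~31) - ((60 - 1) & ~31) == 96 - 32 == 64,
        // i.e. 64 / BLOCK_SIZE == 2 additional blocks must be requisitioned and
        // entered into the block index before any element is constructed.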

2715
        template<typename It>
2716
        size_t dequeue_bulk(It& itemFirst, size_t max)
2717
        {
2718
            auto tail = this->tailIndex.load(std::memory_order_relaxed);
2719
            auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2720
            auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2721
            if (details::circular_less_than<size_t>(0, desiredCount)) {
2722
                desiredCount = desiredCount < max ? desiredCount : max;
2723
                std::atomic_thread_fence(std::memory_order_acquire);
2724

2725
                auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2726

2727
                tail = this->tailIndex.load(std::memory_order_acquire);
2728
                auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2729
                if (details::circular_less_than<size_t>(0, actualCount)) {
2730
                    actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2731
                    if (actualCount < desiredCount) {
2732
                        this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2733
                    }
2734

2735
                    // Get the first index. Note that since there are guaranteed to be at least actualCount elements, this
2736
                    // will never exceed tail.
2737
                    auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2738

2739
                    // Iterate the blocks and dequeue
2740
                    auto index = firstIndex;
2741
                    BlockIndexHeader* localBlockIndex;
2742
                    auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2743
                    do {
2744
                        auto blockStartIndex = index;
2745
                        auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2746
                        endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2747

2748
                        auto entry = localBlockIndex->index[indexIndex];
2749
                        auto block = entry->value.load(std::memory_order_relaxed);
2750
                        if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = HPX_MOVE((*(*block)[index])))) {
2751
                            while (index != endIndex) {
2752
                                auto& el = *((*block)[index]);
2753
                                *itemFirst++ = HPX_MOVE(el);
2754
                                el.~T();
2755
                                ++index;
2756
                            }
2757
                        }
2758
                        else {
2759
                            MOODYCAMEL_TRY {
2760
                                while (index != endIndex) {
2761
                                    auto& el = *((*block)[index]);
2762
                                    *itemFirst = HPX_MOVE(el);
2763
                                    ++itemFirst;
2764
                                    el.~T();
2765
                                    ++index;
2766
                                }
2767
                            }
2768
                            MOODYCAMEL_CATCH (...) {
2769
                                do {
2770
                                    entry = localBlockIndex->index[indexIndex];
2771
                                    block = entry->value.load(std::memory_order_relaxed);
2772
                                    while (index != endIndex) {
2773
                                        (*block)[index++]->~T();
2774
                                    }
2775

2776
                                    if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2777
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2778
                                        debug::DebugLock lock(mutex);
2779
#endif
2780
                                        entry->value.store(nullptr, std::memory_order_relaxed);
2781
                                        this->parent->add_block_to_free_list(block);
2782
                                    }
2783
                                    indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2784

2785
                                    blockStartIndex = index;
2786
                                    endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2787
                                    endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2788
                                } while (index != firstIndex + actualCount);
2789

2790
                                MOODYCAMEL_RETHROW;
2791
                            }
2792
                        }
2793
                        if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2794
                            {
2795
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2796
                                debug::DebugLock lock(mutex);
2797
#endif
2798
                                // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2799
                                // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2800
                                entry->value.store(nullptr, std::memory_order_relaxed);
2801
                            }
2802
                            this->parent->add_block_to_free_list(block);    // releases the above store
2803
                        }
2804
                        indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2805
                    } while (index != firstIndex + actualCount);
2806

2807
                    return actualCount;
2808
                }
2809
                else {
2810
                    this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2811
                }
2812
            }
2813

2814
            return 0;
2815
        }
2816

2817
    private:
2818
        // The block size must be > 1, so any number with the low bit set is an invalid block base index
2819
        static const index_t INVALID_BLOCK_BASE = 1;
2820

2821
        struct BlockIndexEntry
2822
        {
2823
            std::atomic<index_t> key;
2824
            std::atomic<Block*> value;
2825
        };
2826

2827
        struct BlockIndexHeader
2828
        {
2829
            size_t capacity;
2830
            std::atomic<size_t> tail;
2831
            BlockIndexEntry* entries;
2832
            BlockIndexEntry** index;
2833
            BlockIndexHeader* prev;
2834
        };
2835

2836
        template<AllocationMode allocMode>
2837
        inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
10,197✔
2838
        {
2839
            auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);    // We're the only writer thread, relaxed is OK
10,197✔
2840
            if (localBlockIndex == nullptr) {
10,197✔
2841
                return false;  // this can happen if new_block_index failed in the constructor
×
2842
            }
2843
            auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
10,197✔
2844
            idxEntry = localBlockIndex->index[newTail];
10,197✔
2845
            if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
10,197✔
2846
                idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
6,085✔
2847

2848
                idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
10,174✔
2849
                localBlockIndex->tail.store(newTail, std::memory_order_release);
10,174✔
2850
                return true;
10,174✔
2851
            }
2852

2853
            // No room in the old block index, try to allocate another one!
2854
            if (allocMode == CannotAlloc || !new_block_index()) {
23✔
2855
                return false;
×
2856
            }
2857
            localBlockIndex = blockIndex.load(std::memory_order_relaxed);
23✔
2858
            newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
23✔
2859
            idxEntry = localBlockIndex->index[newTail];
23✔
2860
            assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
23✔
2861
            idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
23✔
2862
            localBlockIndex->tail.store(newTail, std::memory_order_release);
23✔
2863
            return true;
23✔
2864
        }
10,197✔
2865
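
        // Editorial note: a slot in the ring is reusable either because it was
        // never used (key == INVALID_BLOCK_BASE, which cannot be a real block base
        // since block bases are multiples of BLOCK_SIZE > 1) or because the block
        // it referenced was already handed back to the free list (value ==
        // nullptr). Only when neither holds does the index grow via
        // new_block_index().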

2866
        inline void rewind_block_index_tail()
×
2867
        {
2868
            auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
×
2869
            localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
×
2870
        }
×
2871

2872
        inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
279,116✔
2873
        {
2874
            BlockIndexHeader* localBlockIndex;
2875
            auto idx = get_block_index_index_for_index(index, localBlockIndex);
279,116✔
2876
            return localBlockIndex->index[idx];
279,116✔
2877
        }
2878

2879
        inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
279,113✔
2880
        {
2881
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2882
            debug::DebugLock lock(mutex);
2883
#endif
2884
            index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
279,089✔
2885
            localBlockIndex = blockIndex.load(std::memory_order_acquire);
279,089✔
2886
            auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
279,089✔
2887
            auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
279,089✔
2888
            assert(tailBase != INVALID_BLOCK_BASE);
279,113✔
2889
            // Note: Must use division instead of shift because the index may wrap around, causing a negative
2890
            // offset, whose negativity we want to preserve
2891
            auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
279,088✔
2892
            size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
279,088✔
2893
            assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
279,089✔
2894
            return idx;
279,088✔
2895
        }
2896
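
        // Editorial example for the signed division above (index_t assumed to be a
        // 32-bit unsigned type purely for illustration): if tailBase has wrapped
        // around to 32 while a still-live index sits at 2^32 - 32, then
        // index - tailBase is 2^32 - 64 as an unsigned value but -64 when
        // reinterpreted as signed; dividing by BLOCK_SIZE (32) yields -2, and
        // adding that offset modulo the capacity walks two slots backwards in the
        // ring, whereas an unsigned shift would have produced a bogus forward
        // offset.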

2897
        bool new_block_index()
1,720✔
2898
        {
2899
            auto prev = blockIndex.load(std::memory_order_relaxed);
1,720✔
2900
            size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
1,720✔
2901
            auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
1,720✔
2902
            auto raw = static_cast<char*>((Traits::malloc)(
1,720✔
2903
                sizeof(BlockIndexHeader) +
2904
                std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
1,720✔
2905
                // NOLINTNEXTLINE(bugprone-sizeof-expression)
2906
                std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
1,720✔
2907
            if (raw == nullptr) {
1,720✔
2908
                return false;
×
2909
            }
2910

2911
            auto header = new (raw) BlockIndexHeader;
1,720✔
2912
            auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
1,720✔
2913
            auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
1,720✔
2914
            if (prev != nullptr) {
1,720✔
2915
                auto prevTail = prev->tail.load(std::memory_order_relaxed);
23✔
2916
                auto prevPos = prevTail;
23✔
2917
                size_t i = 0;
23✔
2918
                do {
23✔
2919
                    prevPos = (prevPos + 1) & (prev->capacity - 1);
1,504✔
2920
                    index[i++] = prev->index[prevPos];
1,504✔
2921
                } while (prevPos != prevTail);
1,504✔
2922
                assert(i == prevCapacity);
23✔
2923
            }
23✔
2924
            for (size_t i = 0; i != entryCount; ++i) {
57,528✔
2925
                new (entries + i) BlockIndexEntry;
55,808✔
2926
                entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
55,808✔
2927
                index[prevCapacity + i] = entries + i;
55,808✔
2928
            }
55,808✔
2929
            header->prev = prev;
1,720✔
2930
            header->entries = entries;
1,720✔
2931
            header->index = index;
1,720✔
2932
            header->capacity = nextBlockIndexCapacity;
1,720✔
2933
            header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
1,720✔
2934

2935
            blockIndex.store(header, std::memory_order_release);
1,720✔
2936

2937
            nextBlockIndexCapacity <<= 1;
1,720✔
2938

2939
            return true;
1,720✔
2940
        }
1,720✔
2941
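
        // Editorial note: unlike the explicit producer's index, growing this index
        // allocates a full new pointer ring but only capacity/2 fresh entries; the
        // surviving entries of the previous generation are carried over by pointer
        // into index[0..prevCapacity-1] rather than copied, and the old headers
        // remain linked through `prev` so the destructor can free every
        // generation.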

2942
    private:
2943
        size_t nextBlockIndexCapacity;
2944
        std::atomic<BlockIndexHeader*> blockIndex;
2945

2946
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2947
    public:
2948
        details::ThreadExitListener threadExitListener;
2949
    private:
2950
#endif
2951

2952
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2953
    public:
2954
        ImplicitProducer* nextImplicitProducer;
2955
    private:
2956
#endif
2957

2958
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2959
        mutable debug::DebugMutex mutex;
2960
#endif
2961
#if MCDBGQ_TRACKMEM
2962
        friend struct MemStats;
2963
#endif
2964
    };
2965

2966

2967
    //////////////////////////////////
2968
    // Block pool manipulation
2969
    //////////////////////////////////
2970

2971
    void populate_initial_block_list(size_t blockCount)
972✔
2972
    {
2973
        initialBlockPoolSize = blockCount;
972✔
2974
        if (initialBlockPoolSize == 0) {
972✔
2975
            initialBlockPool = nullptr;
×
2976
            return;
×
2977
        }
2978

2979
        initialBlockPool = create_array<Block>(blockCount);
972✔
2980
        if (initialBlockPool == nullptr) {
972✔
2981
            initialBlockPoolSize = 0;
×
2982
        }
×
2983
        for (size_t i = 0; i < initialBlockPoolSize; ++i) {
32,076✔
2984
            initialBlockPool[i].dynamicallyAllocated = false;
31,104✔
2985
        }
31,104✔
2986
    }
972✔
2987

2988
    inline Block* try_get_block_from_initial_pool()
10,196✔
2989
    {
2990
        if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
10,196✔
2991
            return nullptr;
7,540✔
2992
        }
2993

2994
        auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
2,657✔
2995

2996
        return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
2,656✔
2997
    }
10,197✔
2998

2999
    inline void add_block_to_free_list(Block* block)
10,197✔
3000
    {
3001
#if MCDBGQ_TRACKMEM
3002
        block->owner = nullptr;
3003
#endif
3004
        freeList.add(block);
10,197✔
3005
    }
10,197✔
3006

3007
    inline void add_blocks_to_free_list(Block* block)
3008
    {
3009
        while (block != nullptr) {
3010
            auto next = block->next;
3011
            add_block_to_free_list(block);
3012
            block = next;
3013
        }
3014
    }
3015

3016
    inline Block* try_get_block_from_free_list()
7,540✔
3017
    {
3018
        return freeList.try_get();
7,540✔
3019
    }
3020

3021
    // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3022
    template<AllocationMode canAlloc>
3023
    Block* requisition_block()
10,197✔
3024
    {
3025
        auto block = try_get_block_from_initial_pool();
10,197✔
3026
        if (block != nullptr) {
10,197✔
3027
            return block;
2,657✔
3028
        }
3029

3030
        block = try_get_block_from_free_list();
7,540✔
3031
        if (block != nullptr) {
7,540✔
3032
            return block;
6,544✔
3033
        }
3034

3035
        if (canAlloc == CanAlloc) {
3036
            return create<Block>();
996✔
3037
        }
3038

3039
        return nullptr;
3040
    }
10,197✔
3041
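
    // Editorial note: block acquisition is tiered -- the pre-allocated initial
    // pool (sized from the capacity passed to the queue's constructor) is tried
    // first, then the lock-free free list of recycled blocks, and only then, if
    // the caller used CanAlloc, a fresh allocation via create<Block>() (which
    // goes through Traits::malloc). Callers using CannotAlloc, such as the
    // try_* enqueue paths, simply report failure instead.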

3042

3043
#if MCDBGQ_TRACKMEM
3044
    public:
3045
        struct MemStats {
3046
            size_t allocatedBlocks;
3047
            size_t usedBlocks;
3048
            size_t freeBlocks;
3049
            size_t ownedBlocksExplicit;
3050
            size_t ownedBlocksImplicit;
3051
            size_t implicitProducers;
3052
            size_t explicitProducers;
3053
            size_t elementsEnqueued;
3054
            size_t blockClassBytes;
3055
            size_t queueClassBytes;
3056
            size_t implicitBlockIndexBytes;
3057
            size_t explicitBlockIndexBytes;
3058

3059
            friend class ConcurrentQueue;
3060

3061
        private:
3062
            static MemStats getFor(ConcurrentQueue* q)
3063
            {
3064
                MemStats stats = { 0 };
3065

3066
                stats.elementsEnqueued = q->size_approx();
3067

3068
                auto block = q->freeList.head_unsafe();
3069
                while (block != nullptr) {
3070
                    ++stats.allocatedBlocks;
3071
                    ++stats.freeBlocks;
3072
                    block = block->freeListNext.load(std::memory_order_relaxed);
3073
                }
3074

3075
                for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3076
                    bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3077
                    stats.implicitProducers += implicit ? 1 : 0;
3078
                    stats.explicitProducers += implicit ? 0 : 1;
3079

3080
                    if (implicit) {
3081
                        auto prod = static_cast<ImplicitProducer*>(ptr);
3082
                        stats.queueClassBytes += sizeof(ImplicitProducer);
3083
                        auto head = prod->headIndex.load(std::memory_order_relaxed);
3084
                        auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3085
                        auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3086
                        if (hash != nullptr) {
3087
                            for (size_t i = 0; i != hash->capacity; ++i) {
3088
                                if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3089
                                    ++stats.allocatedBlocks;
3090
                                    ++stats.ownedBlocksImplicit;
3091
                                }
3092
                            }
3093
                            stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3094
                            for (; hash != nullptr; hash = hash->prev) {
3095
                                stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3096
                            }
3097
                        }
3098
                        for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3099
                            //auto block = prod->get_block_index_entry_for_index(head);
3100
                            ++stats.usedBlocks;
3101
                        }
3102
                    }
3103
                    else {
3104
                        auto prod = static_cast<ExplicitProducer*>(ptr);
3105
                        stats.queueClassBytes += sizeof(ExplicitProducer);
3106
                        auto tailBlock = prod->tailBlock;
3107
                        bool wasNonEmpty = false;
3108
                        if (tailBlock != nullptr) {
3109
                            auto block = tailBlock;
3110
                            do {
3111
                                ++stats.allocatedBlocks;
3112
                                if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3113
                                    ++stats.usedBlocks;
3114
                                    wasNonEmpty = wasNonEmpty || block != tailBlock;
3115
                                }
3116
                                ++stats.ownedBlocksExplicit;
3117
                                block = block->next;
3118
                            } while (block != tailBlock);
3119
                        }
3120
                        auto index = prod->blockIndex.load(std::memory_order_relaxed);
3121
                        while (index != nullptr) {
3122
                            stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3123
                            index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3124
                        }
3125
                    }
3126
                }
3127

3128
                auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3129
                stats.allocatedBlocks += freeOnInitialPool;
3130
                stats.freeBlocks += freeOnInitialPool;
3131

3132
                stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3133
                stats.queueClassBytes += sizeof(ConcurrentQueue);
3134

3135
                return stats;
3136
            }
3137
        };
3138

3139
        // For debugging only. Not thread-safe.
3140
        MemStats getMemStats()
3141
        {
3142
            return MemStats::getFor(this);
3143
        }
3144
    private:
3145
        friend struct MemStats;
3146
#endif
3147

3148

3149
    //////////////////////////////////
3150
    // Producer list manipulation
3151
    //////////////////////////////////
3152

3153
    ProducerBase* recycle_or_create_producer(bool isExplicit)
3154
    {
3155
        bool recycled;
3156
        return recycle_or_create_producer(isExplicit, recycled);
3157
    }
3158

3159
    ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled)
    {
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif
        // Try to re-use one first
        for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
            if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
                bool expected = true;
                if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
                    // We caught one! It's been marked as activated, the caller can have it
                    recycled = true;
                    return ptr;
                }
            }
        }

        recycled = false;
        return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
    }

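    // Note (added for clarity, not upstream documentation): the producer list is
    // a lock-free, prepend-only singly-linked list. New producers are pushed onto
    // the tail pointer with a CAS loop; release ordering on the successful
    // exchange publishes the fully constructed producer to threads that later
    // walk the list with an acquire load.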
    ProducerBase* add_producer(ProducerBase* producer)
    {
        // Handle failed memory allocation
        if (producer == nullptr) {
            return nullptr;
        }

        producerCount.fetch_add(1, std::memory_order_relaxed);

        // Add it to the lock-free list
        auto prevTail = producerListTail.load(std::memory_order_relaxed);
        do {
            producer->next = prevTail;
        } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
        if (producer->isExplicit) {
            auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
            do {
                static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
            } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
        }
        else {
            auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
            do {
                static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
            } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
        }
#endif

        return producer;
    }

    void reown_producers()
    {
        // After another instance is moved-into/swapped-with this one, all the
        // producers we stole still think their parents are the other queue.
        // So fix them up!
        for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
            ptr->parent = this;
        }
    }


    //////////////////////////////////
    // Implicit producer hash
    //////////////////////////////////

    struct ImplicitProducerKVP
    {
        std::atomic<details::thread_id_t> key;
        ImplicitProducer* value;    // No need for atomicity since it's only read by the thread that sets it in the first place

        ImplicitProducerKVP() : value(nullptr) { }

        ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
        {
            key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
            value = other.value;
        }

        inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
        {
            swap(other);
            return *this;
        }

        inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
        {
            if (this != &other) {
                details::swap_relaxed(key, other.key);
                std::swap(value, other.value);
            }
        }
    };

    template<typename XT, typename XTraits>
    friend void hpx::concurrency::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;

    struct ImplicitProducerHash
    {
        size_t capacity;
        ImplicitProducerKVP* entries;
        ImplicitProducerHash* prev;
    };

    inline void populate_initial_implicit_producer_hash()
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;

        implicitProducerHashCount.store(0, std::memory_order_relaxed);
        auto hash = &initialImplicitProducerHash;
        hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
        hash->entries = &initialImplicitProducerHashEntries[0];
        for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
            initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
        }
        hash->prev = nullptr;
        implicitProducerHash.store(hash, std::memory_order_relaxed);
    }

    void swap_implicit_producer_hashes(ConcurrentQueue& other)
    {
        if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;

        // Swap (assumes our implicit producer hash is initialized)
        initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
        initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
        other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];

        details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);

        details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
        if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
            implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
        }
        else {
            ImplicitProducerHash* hash;
            for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
                continue;
            }
            hash->prev = &initialImplicitProducerHash;
        }
        if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
            other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
        }
        else {
            ImplicitProducerHash* hash;
            for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
                continue;
            }
            hash->prev = &other.initialImplicitProducerHash;
        }
    }

    // Only fails (returns nullptr) if memory allocation fails
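    // Note (added for clarity, not upstream documentation): the hash tables below
    // use open addressing with linear probing and power-of-two capacities, so
    // `index &= capacity - 1` is the cheap equivalent of `index % capacity`.
    // For example, with a capacity of 32, a hashed thread id of 70 starts probing
    // at slot 70 & 31 == 6 and walks forward until it finds its key or an empty
    // slot.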
    ImplicitProducer* get_or_add_implicit_producer()
    {
        // Note that since the data is essentially thread-local (key is thread ID),
        // there's a reduced need for fences (memory ordering is already consistent
        // for any individual thread), except for the current table itself.

        // Start by looking for the thread ID in the current and all previous hash tables.
        // If it's not found, it must not be in there yet, since this same thread would
        // have added it previously to one of the tables that we traversed.

        // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table

#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif

        auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);

        auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
        for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
            // Look for the id in this hash
            auto index = hashedId;
            while (true) {    // Not an infinite loop because at least one slot is free in the hash table
                index &= hash->capacity - 1;

                auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
                if (probedKey == id) {
                    // Found it! If we had to search several hashes deep, though, we should lazily add it
                    // to the current main hash table to avoid the extended search next time.
                    // Note there's guaranteed to be room in the current hash table since every subsequent
                    // table implicitly reserves space for all previous tables (there's only one
                    // implicitProducerHashCount).
                    auto value = hash->entries[index].value;
                    if (hash != mainHash) {
                        index = hashedId;
                        while (true) {
                            index &= mainHash->capacity - 1;
                            probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
                            auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                            auto reusable = details::invalid_thread_id2;
                            if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
                                (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
#else
                            if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed))) {
#endif
                                mainHash->entries[index].value = value;
                                break;
                            }
                            ++index;
                        }
                    }

                    return value;
                }
                if (probedKey == details::invalid_thread_id) {
                    break;    // Not in this hash table
                }
                ++index;
            }
        }

        // Insert!
        auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
        while (true) {
            if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
                // We've acquired the resize lock, try to allocate a bigger hash table.
                // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
                // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
                // locked block).
                mainHash = implicitProducerHash.load(std::memory_order_acquire);
                if (newCount >= (mainHash->capacity >> 1)) {
                    auto newCapacity = mainHash->capacity << 1;
                    while (newCount >= (newCapacity >> 1)) {
                        newCapacity <<= 1;
                    }
                    auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
                    if (raw == nullptr) {
                        // Allocation failed
                        implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                        implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
                        return nullptr;
                    }

                    auto newHash = new (raw) ImplicitProducerHash;
                    newHash->capacity = newCapacity;
                    newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
                    for (size_t i = 0; i != newCapacity; ++i) {
                        new (newHash->entries + i) ImplicitProducerKVP;
                        newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
                    }
                    newHash->prev = mainHash;
                    implicitProducerHash.store(newHash, std::memory_order_release);
                    implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                    mainHash = newHash;
                }
                else {
                    implicitProducerHashResizeInProgress.clear(std::memory_order_release);
                }
            }

            // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
            // to finish being allocated by another thread (and if we just finished allocating above, the condition will
            // always be true)
            if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
                bool recycled;
                auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
                if (producer == nullptr) {
                    implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                    return nullptr;
                }
                if (recycled) {
                    implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
                }

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
                producer->threadExitListener.userData = producer;
                details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
#endif

                auto index = hashedId;
                while (true) {
                    index &= mainHash->capacity - 1;
                    auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);

                    auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
                    auto reusable = details::invalid_thread_id2;
                    if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
                        (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
#else
                    if ((probedKey == empty    && mainHash->entries[index].key.compare_exchange_strong(empty,    id, std::memory_order_relaxed, std::memory_order_relaxed))) {
#endif
                        mainHash->entries[index].value = producer;
                        break;
                    }
                    ++index;
                }
                return producer;
            }

            // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
            // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
            // we try to allocate ourselves).
            mainHash = implicitProducerHash.load(std::memory_order_acquire);
        }
    }
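    // Note on the resize policy above (added for clarity, not upstream
    // documentation): a resize is attempted once the table is half full, and the
    // new capacity is doubled until the count falls below half of it. For
    // example, with a capacity of 64 the 32nd registration triggers growth to 128
    // slots, while registrations keep going into the old table as long as the
    // logical count stays below 48 (half plus a quarter of the capacity).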

#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
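    // Note (added for clarity, not upstream documentation): this is the cleanup
    // path for implicit producers. When a thread that enqueued without a token
    // exits, its hash entries are tombstoned (marked reusable) and the producer
    // is flagged inactive, so a later recycle_or_create_producer() call can hand
    // it to a new thread instead of allocating a fresh one.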
    void implicit_producer_thread_exited(ImplicitProducer* producer)
    {
        // Remove from thread exit listeners
        details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);

        // Remove from hash
#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
        debug::DebugLock lock(implicitProdMutex);
#endif
        auto hash = implicitProducerHash.load(std::memory_order_acquire);
        assert(hash != nullptr);    // The thread exit listener is only registered if we were added to a hash in the first place
        auto id = details::thread_id();
        auto hashedId = details::hash_thread_id(id);
        details::thread_id_t probedKey;

        // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
        // trying to add an entry thinking there's a free slot (because they reused a producer)
        for (; hash != nullptr; hash = hash->prev) {
            auto index = hashedId;
            do {
                index &= hash->capacity - 1;
                probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
                if (probedKey == id) {
                    hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
                    break;
                }
                ++index;
            } while (probedKey != details::invalid_thread_id);    // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
        }

        // Mark the queue as being recyclable
        producer->inactive.store(true, std::memory_order_release);
    }

    static void implicit_producer_thread_exited_callback(void* userData)
    {
        auto producer = static_cast<ImplicitProducer*>(userData);
        auto queue = producer->parent;
        queue->implicit_producer_thread_exited(producer);
    }
#endif

    //////////////////////////////////
    // Utility functions
    //////////////////////////////////

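    // Note (added for clarity, not upstream documentation): these helpers pair
    // (Traits::malloc)/(Traits::free) with placement new and explicit destructor
    // calls, keeping element construction separate from raw allocation.
    // destroy_array() destroys elements in reverse order of construction,
    // mirroring built-in array semantics.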
    template<typename U>
    static inline U* create_array(size_t count)
    {
        assert(count > 0);
        auto p = static_cast<U*>((Traits::malloc)(sizeof(U) * count));
        if (p == nullptr) {
            return nullptr;
        }

        for (size_t i = 0; i != count; ++i) {
            new (p + i) U();
        }
        return p;
    }

    template<typename U>
    static inline void destroy_array(U* p, size_t count)
    {
        if (p != nullptr) {
            assert(count > 0);
            for (size_t i = count; i != 0; ) {
                (p + --i)->~U();
            }
            (Traits::free)(p);
        }
    }

    template<typename U>
    static inline U* create()
    {
        auto p = (Traits::malloc)(sizeof(U));
        return p != nullptr ? new (p) U : nullptr;
    }

    template<typename U, typename A1>
    static inline U* create(A1&& a1)
    {
        auto p = (Traits::malloc)(sizeof(U));
        return p != nullptr ? new (p) U(HPX_FORWARD(A1, a1)) : nullptr;
    }

    template<typename U>
    static inline void destroy(U* p)
    {
        if (p != nullptr) {
            p->~U();
        }
        (Traits::free)(p);
    }

private:
    std::atomic<ProducerBase*> producerListTail;
    std::atomic<std::uint32_t> producerCount;

    std::atomic<size_t> initialBlockPoolIndex;
    Block* initialBlockPool;
    size_t initialBlockPoolSize;

#if !MCDBGQ_USEDEBUGFREELIST
    FreeList<Block> freeList;
#else
    debug::DebugFreeList<Block> freeList;
#endif

    std::atomic<ImplicitProducerHash*> implicitProducerHash;
    std::atomic<size_t> implicitProducerHashCount;    // Number of slots logically used
    ImplicitProducerHash initialImplicitProducerHash;
    std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
    std::atomic_flag implicitProducerHashResizeInProgress;

    std::atomic<std::uint32_t> nextExplicitConsumerId;
    std::atomic<std::uint32_t> globalExplicitConsumerOffset;

#if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
    debug::DebugMutex implicitProdMutex;
#endif

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
    std::atomic<ExplicitProducer*> explicitProducers;
    std::atomic<ImplicitProducer*> implicitProducers;
#endif
};


template<typename T, typename Traits>
ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
    : producer(queue.recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}

template<typename T, typename Traits>
ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
{
    if (producer != nullptr) {
        producer->token = this;
    }
}

template<typename T, typename Traits>
ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = -1;
}

template<typename T, typename Traits>
ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
    : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
{
    initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
    lastKnownGlobalOffset = -1;
}
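// Illustrative usage sketch (added for clarity, not part of the upstream file):
// tokens cache per-producer and per-consumer state so the queue can skip the
// implicit-producer hash lookup on the hot path. Assuming a plain
// ConcurrentQueue<int> instantiation:
//
//     hpx::concurrency::ConcurrentQueue<int> q;
//     hpx::concurrency::ProducerToken ptok(q);
//     hpx::concurrency::ConsumerToken ctok(q);
//     q.enqueue(ptok, 42);
//     int value;
//     if (q.try_dequeue(ctok, value)) { /* value == 42 */ }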

template<typename T, typename Traits>
inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

template<typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
{
    a.swap(b);
}

} }

#if defined(__GNUC__)
#pragma GCC diagnostic pop
#endif

// clang-format on