STEllAR-GROUP / hpx, build #850 (push, via StellarBot)
15 Dec 2022 03:00PM UTC, coverage: 86.404% (+0.7%) from 85.715%

Merge #6098: Forking Boost.Lockfree (r=hkaiser a=hkaiser), working towards #3440
Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

1285 of 1285 new or added lines in 18 files covered (100.0%)
174060 of 201449 relevant lines covered (86.4%)
1882623.65 hits per line

Source file: /libs/core/concurrency/include/hpx/concurrency/queue.hpp (94.38% covered)
//  Copyright (C) 2008-2013 Tim Blechmann
//  Copyright (c) 2022 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
//  lock-free queue from
//  Michael, M. M. and Scott, M. L.,
//  "simple, fast and practical non-blocking and blocking concurrent queue algorithms"

#pragma once

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/concurrency/cache_line_data.hpp>
#include <hpx/concurrency/detail/copy_payload.hpp>
#include <hpx/concurrency/detail/freelist_stack.hpp>
#include <hpx/concurrency/detail/tagged_ptr.hpp>

#include <atomic>
#include <cstddef>
#include <memory>
#include <type_traits>

namespace hpx::lockfree {
    /**
     * The queue class provides a multi-writer/multi-reader queue whose push
     * and pop operations are lock-free; construction and destruction have to
     * be synchronized. It uses a freelist for memory management: freed nodes
     * are pushed to the freelist and are not returned to the OS before the
     * queue is destroyed.
     *
     *  \b Policies:
     *  - \ref hpx::lockfree::fixed_sized, defaults to \c
     *    hpx::lockfree::fixed_sized<false> \n Can be used to completely
     *    disable dynamic memory allocations during push in order to ensure
     *    lockfree behavior. \n If the data structure is configured as
     *    fixed-sized, the internal nodes are stored inside an array and they
     *    are addressed by array indexing. This limits the possible size of the
     *    queue to the number of elements that can be addressed by the index
     *    type (usually 2**16-2), but on platforms that lack double-width
     *    compare-and-exchange instructions, this is the best way to achieve
     *    lock-freedom.
     *
     *  - \ref hpx::lockfree::capacity, optional \n If this template argument
     *    is passed to the options, the size of the queue is set at
     *    compile-time.\n This option implies \c fixed_sized<true>
     *
     *  - \ref hpx::lockfree::allocator, defaults to \c
     *    hpx::lockfree::allocator<std::allocator<void>> \n Specifies the
     *    allocator that is used for the internal freelist
     *
     *  \b Requirements:
     *   - T must have a copy constructor
     *   - T must have a trivial assignment operator
     *   - T must have a trivial destructor
     */
    template <typename T, typename Allocator = std::allocator<T>,
        std::size_t Capacity = 0, bool IsFixedSize = false>
    class queue
    {
    private:
        static_assert(std::is_trivially_destructible_v<T>);
        static_assert(std::is_trivially_copyable_v<T>);

        static constexpr bool has_capacity = Capacity != 0;
        static constexpr bool fixed_sized = IsFixedSize;
        static constexpr bool node_based = !(has_capacity || fixed_sized);
        static constexpr bool compile_time_sized = has_capacity;

        // the queue uses one dummy node
        static constexpr std::size_t capacity = Capacity + 1;

        struct node;
        struct node_data
        {
            using tagged_node_handle =
                typename detail::select_tagged_handle<node,
                    node_based>::tagged_handle_type;
            using handle_type = typename detail::select_tagged_handle<node,
                node_based>::handle_type;

            template <typename T_>
            node_data(T_&& t, handle_type null_handle) noexcept(
                noexcept(std::is_nothrow_constructible_v<T, T_&&>))
              : data(HPX_FORWARD(T_, t))
            {
                /* increment tag to avoid ABA problem */
                tagged_node_handle old_next =
                    next.load(std::memory_order_relaxed);
                tagged_node_handle new_next(
                    null_handle, old_next.get_next_tag());
                next.store(new_next, std::memory_order_release);
            }

            explicit node_data(handle_type null_handle) noexcept
              : next(tagged_node_handle(null_handle, 0))
            {
            }

            node_data() noexcept {}

            std::atomic<tagged_node_handle> next;
            T data;
        };

        struct node : hpx::util::cache_aligned_data_derived<node_data>
        {
            using hpx::util::cache_aligned_data_derived<
                node_data>::cache_aligned_data_derived;
        };

        using node_allocator = typename std::allocator_traits<
            Allocator>::template rebind_alloc<node>;

        using pool_t = typename detail::select_freelist<node, node_allocator,
            compile_time_sized, fixed_sized, capacity>::type;

        using tagged_node_handle = typename pool_t::tagged_node_handle;
        using handle_type = typename detail::select_tagged_handle<node,
            node_based>::handle_type;

        using index_t = typename tagged_node_handle::index_t;

        static constexpr auto null_index_t() noexcept
        {
            if constexpr (std::is_pointer_v<index_t>)
            {
                return nullptr;
            }
            else
            {
                return static_cast<index_t>(0);
            }
        }

        void initialize()
        {
            node* n = pool.template construct<true, false>(pool.null_handle());
            tagged_node_handle dummy_node(pool.get_handle(n), 0);
            head_.store(dummy_node, std::memory_order_relaxed);
            tail_.store(dummy_node, std::memory_order_release);
        }

        struct implementation_defined
        {
            using allocator = node_allocator;
            using size_type = std::size_t;
        };

        queue(queue const&) = delete;
        queue& operator=(queue const&) = delete;

    public:
        using value_type = T;
        using allocator = typename implementation_defined::allocator;
        using size_type = typename implementation_defined::size_type;

        /**
         * \return true, if implementation is lock-free.
         *
         * \warning It only checks whether the queue head and tail nodes and
         *          the freelist can be modified in a lock-free manner. On
         *          most platforms, the whole implementation is lock-free if
         *          this is true. Using C++11-style atomics, there is no way
         *          to provide a completely accurate implementation, because
         *          one would need to test every internal node, which is
         *          impossible if further nodes are allocated from the
         *          operating system.
         */
        constexpr bool is_lock_free() const noexcept
        {
            return head_.is_lock_free() && tail_.is_lock_free() &&
                pool.is_lock_free();
        }

        /** Construct a fixed-sized queue
         *
         *  \pre Must specify a capacity<> argument
         */
        queue()
          : head_(tagged_node_handle(null_index_t(), 0))
          , tail_(tagged_node_handle(null_index_t(), 0))
          , pool(node_allocator(), capacity)
        {
            // Don't use static_assert() here since it will be evaluated when
            // compiling this function and this function may be compiled even
            // when it isn't being used.
            HPX_ASSERT(has_capacity);
            initialize();
        }

        /** Construct a fixed-sized queue with a custom allocator
         *
         *  \pre Must specify a capacity<> argument
         */
        template <typename U>
        explicit queue(typename std::allocator_traits<
            Allocator>::template rebind_alloc<U> const& alloc)
          : head_(tagged_node_handle(null_index_t(), 0))
          , tail_(tagged_node_handle(null_index_t(), 0))
          , pool(alloc, capacity)
        {
            static_assert(has_capacity);
            initialize();
        }

        /**
         * Construct a fixed-sized queue with a custom allocator
         *
         *  \pre Must specify a capacity<> argument
         */
        explicit queue(allocator const& alloc)
          : head_(tagged_node_handle(null_index_t(), 0))
          , tail_(tagged_node_handle(null_index_t(), 0))
          , pool(alloc, capacity)
        {
            // Don't use static_assert() here since it will be evaluated when
            // compiling this function and this function may be compiled even
            // when it isn't being used.
            HPX_ASSERT(has_capacity);
            initialize();
        }

        /**
         * Construct a variable-sized queue
         *
         *  Allocate n nodes initially for the freelist
         *
         *  \pre Must \b not specify a capacity<> argument
         */
        explicit queue(size_type n)
          : head_(tagged_node_handle(null_index_t(), 0))
          , tail_(tagged_node_handle(null_index_t(), 0))
          , pool(node_allocator(), n + 1)
        {
            // Don't use static_assert() here since it will be evaluated when
            // compiling this function and this function may be compiled even
            // when it isn't being used.
            HPX_ASSERT(!has_capacity);
            initialize();
        }

        /**
         * Construct a variable-sized queue with a custom allocator
         *
         *  Allocate n nodes initially for the freelist
         *
         *  \pre Must \b not specify a capacity<> argument
         */
        template <typename U>
        queue(size_type n,
            typename std::allocator_traits<Allocator>::template rebind_alloc<
                U> const& alloc)
          : head_(tagged_node_handle(null_index_t(), 0))
          , tail_(tagged_node_handle(null_index_t(), 0))
          , pool(alloc, n + 1)
        {
            static_assert(!has_capacity);
            initialize();
        }

        /** \copydoc hpx::lockfree::stack::reserve
         */
        void reserve(size_type n)
        {
            pool.template reserve<true>(n);
        }

        /** \copydoc hpx::lockfree::stack::reserve_unsafe
         */
        void reserve_unsafe(size_type n)
        {
            pool.template reserve<false>(n);
        }

        /** Destroys the queue and frees all nodes from the freelist.
         */
        ~queue()
        {
            T dummy;
            while (unsynchronized_pop(dummy))
            {
            }

            pool.template destruct<false>(
                head_.load(std::memory_order_relaxed));
        }

        /**
         * Check if the queue is empty
         *
         * \return true, if the queue is empty, false otherwise
         * \note The result is only accurate, if no other thread modifies the
         *       queue. Therefore it is rarely practical to use this value in
         *       program logic.
         */
        constexpr bool empty() const noexcept
        {
            return pool.get_handle(head_.load()) ==
                pool.get_handle(tail_.load());
        }

        /**
         * Pushes object t to the queue.
         *
         * \post object will be pushed to the queue, if internal node can be
         *       allocated
         * \returns true, if the push operation is successful.
         *
         * \note Thread-safe. If internal memory pool is exhausted and the
         *       memory pool is not fixed-sized, a new node will be allocated
         *       from the OS. This may not be lock-free.
         */
        template <typename T_>
        bool push(T_&& t)
        {
            return do_push<false>(HPX_FORWARD(T_, t));
        }

        /**
         * Pushes object t to the queue.
         *
         * \post object will be pushed to the queue, if internal node can be
         *       allocated
         * \returns true, if the push operation is successful.
         *
         * \note Thread-safe and non-blocking. If internal memory pool is
         *       exhausted, operation will fail
         * \throws if memory allocator throws
         */
        template <typename T_>
        bool bounded_push(T_&& t)
        {
            return do_push<true>(HPX_FORWARD(T_, t));
        }

    private:
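        // Lock-free enqueue (Michael & Scott): allocate a node from the
        // freelist, then loop: load tail_, load tail->next, and re-load tail_
        // to verify a consistent snapshot. If tail->next is null, try to link
        // the new node with a CAS on tail->next and, on success, swing tail_
        // forward to it; otherwise tail_ lags behind, so help by advancing
        // tail_ to the observed next node and retry.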
        template <bool Bounded, typename T_>
        bool do_push(T_&& t)
        {
            node* n = pool.template construct<true, Bounded>(
                HPX_FORWARD(T_, t), pool.null_handle());
            handle_type node_handle = pool.get_handle(n);

            if (n == nullptr)
                return false;

            for (;;)
            {
                tagged_node_handle tail = tail_.load(std::memory_order_acquire);
                node* tail_node = pool.get_pointer(tail);
                tagged_node_handle next =
                    tail_node->next.load(std::memory_order_acquire);
                node* next_ptr = pool.get_pointer(next);

                tagged_node_handle tail2 =
                    tail_.load(std::memory_order_acquire);
                if (HPX_LIKELY(tail == tail2))
                {
                    if (next_ptr == nullptr)
                    {
                        tagged_node_handle new_tail_next(
                            node_handle, next.get_next_tag());
                        if (tail_node->next.compare_exchange_weak(
                                next, new_tail_next))
                        {
                            tagged_node_handle new_tail(
                                node_handle, tail.get_next_tag());
                            tail_.compare_exchange_strong(tail, new_tail);
                            return true;
                        }
                    }
                    else
                    {
                        tagged_node_handle new_tail(
                            pool.get_handle(next_ptr), tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                    }
                }
            }
        }

    public:
        /**
         * Pushes object t to the queue.
         *
         * \post object will be pushed to the queue, if internal node can be
         *       allocated
         * \returns true, if the push operation is successful.
         *
         * \note Not thread-safe. If internal memory pool is exhausted and the
         *       memory pool is not fixed-sized, a new node will be allocated
         *       from the OS. This may not be lock-free.
         * \throws if memory allocator throws
         */
        template <typename T_>
        bool unsynchronized_push(T_&& t)
        {
            node* n = pool.template construct<false, false>(
                HPX_FORWARD(T_, t), pool.null_handle());

            if (n == nullptr)
                return false;

            for (;;)
            {
                tagged_node_handle tail = tail_.load(std::memory_order_relaxed);
                tagged_node_handle next =
                    tail->next.load(std::memory_order_relaxed);
                node* next_ptr = next.get_ptr();

                if (next_ptr == nullptr)
                {
                    tail->next.store(tagged_node_handle(n, next.get_next_tag()),
                        std::memory_order_relaxed);
                    tail_.store(tagged_node_handle(n, tail.get_next_tag()),
                        std::memory_order_relaxed);
                    return true;
                }
                else
                {
                    tail_.store(
                        tagged_node_handle(next_ptr, tail.get_next_tag()),
                        std::memory_order_relaxed);
                }
            }
        }

        /**
         * Pops object from queue.
         *
         * \post if pop operation is successful, object will be copied to ret.
         * \returns true, if the pop operation is successful, false if queue was
         *          empty.
         *
         * \note Thread-safe and non-blocking
         */
        bool pop(T& ret) noexcept(
            noexcept(std::is_nothrow_copy_constructible_v<T>))
        {
            return pop<T>(ret);
        }

        /**
         * Pops object from queue.
         *
         * \pre type U must be constructible by T and copyable, or T must be
         *      convertible to U
         * \post if pop operation is successful, object will be copied to ret.
         * \returns true, if the pop operation is successful, false if queue was
         *          empty.
         *
         * \note Thread-safe and non-blocking
         */
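        // Lock-free dequeue (Michael & Scott): loop: load head_, tail_, and
        // head->next, and re-load head_ to verify a consistent snapshot. If
        // head and tail refer to the same node, the queue is either empty
        // (next is null) or tail_ lags behind and is helped forward. Otherwise
        // the payload is copied out of head->next before head_ is swung
        // forward with a CAS; on success the retired node is returned to the
        // freelist.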
        template <typename U>
        bool pop(U& ret) noexcept(
            noexcept(std::is_nothrow_constructible_v<U, T>))
        {
            for (;;)
            {
                tagged_node_handle head = head_.load(std::memory_order_acquire);
                node* head_ptr = pool.get_pointer(head);

                tagged_node_handle tail = tail_.load(std::memory_order_acquire);
                tagged_node_handle next =
                    head_ptr->next.load(std::memory_order_acquire);
                node* next_ptr = pool.get_pointer(next);

                tagged_node_handle head2 =
                    head_.load(std::memory_order_acquire);
                if (HPX_LIKELY(head == head2))
                {
                    if (pool.get_handle(head) == pool.get_handle(tail))
                    {
                        if (next_ptr == nullptr)
                            return false;

                        tagged_node_handle new_tail(
                            pool.get_handle(next), tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                    }
                    else
                    {
                        if (next_ptr == nullptr)
                        {
                            // this check is not part of the original algorithm
                            // as published by michael and scott
                            //
                            // however we reuse the tagged_ptr part for the
                            // freelist and clear the next part during node
                            // allocation. we can observe a null-pointer here.
                            continue;
                        }

                        detail::copy_payload(next_ptr->data, ret);

                        tagged_node_handle new_head(
                            pool.get_handle(next), head.get_next_tag());
                        if (head_.compare_exchange_weak(head, new_head))
                        {
                            pool.template destruct<true>(head);
                            return true;
                        }
                    }
                }
            }
        }

        /**
         * Pops object from queue.
         *
         * \post if pop operation is successful, object will be copied to ret.
         * \returns true, if the pop operation is successful, false if queue was
         *          empty.
         *
         * \note Not thread-safe, but non-blocking
         *
         */
        bool unsynchronized_pop(T& ret) noexcept(
            noexcept(std::is_nothrow_copy_constructible_v<T>))
        {
            return unsynchronized_pop<T>(ret);
        }

        /**
         * Pops object from queue.
         *
         * \pre type U must be constructible by T and copyable, or T must be
         *      convertible to U
         * \post if pop operation is successful, object will be copied to ret.
         * \returns true, if the pop operation is successful, false if queue was
         *          empty.
         *
         * \note Not thread-safe, but non-blocking
         *
         */
        template <typename U>
        bool unsynchronized_pop(U& ret) noexcept(
            noexcept(std::is_nothrow_constructible_v<U, T>))
        {
            for (;;)
            {
                tagged_node_handle head = head_.load(std::memory_order_relaxed);
                node* head_ptr = pool.get_pointer(head);
                tagged_node_handle tail = tail_.load(std::memory_order_relaxed);
                tagged_node_handle next =
                    head_ptr->next.load(std::memory_order_relaxed);
                node* next_ptr = pool.get_pointer(next);

                if (pool.get_handle(head) == pool.get_handle(tail))
                {
                    if (next_ptr == nullptr)
                        return false;

                    tagged_node_handle new_tail(
                        pool.get_handle(next), tail.get_next_tag());
                    tail_.store(new_tail);
                }
                else
                {
                    if (next_ptr == nullptr)
                    {
                        // this check is not part of the original algorithm as
                        // published by michael and scott
                        //
                        // however we reuse the tagged_ptr part for the freelist
                        // and clear the next part during node allocation. we
                        // can observe a null-pointer here.
                        continue;
                    }

                    detail::copy_payload(next_ptr->data, ret);
                    tagged_node_handle new_head(
                        pool.get_handle(next), head.get_next_tag());
                    head_.store(new_head);
                    pool.template destruct<false>(head);
                    return true;
                }
            }
        }

        /**
         * consumes one element via a functor
         *
         *  pops one element from the queue and applies the functor on this
         *  object
         *
         * \returns true, if one element was consumed
         *
         * \note Thread-safe and non-blocking, if functor is thread-safe and
         *       non-blocking
         */
        template <typename F>
        bool consume_one(F&& f)
        {
            T element;
            bool success = pop(element);
            if (success)
                f(element);

            return success;
        }

        /**
         * consumes all elements via a functor
         *
         * sequentially pops all elements from the queue and applies the functor
         * on each object
         *
         * \returns number of elements that are consumed
         *
         * \note Thread-safe and non-blocking, if functor is thread-safe and
         *       non-blocking
         */
        template <typename F>
        std::size_t consume_all(F&& f)
        {
            std::size_t element_count = 0;
            while (consume_one(f))
                ++element_count;

            return element_count;
        }

    private:
        util::cache_aligned_data_derived<std::atomic<tagged_node_handle>> head_;
        util::cache_aligned_data_derived<std::atomic<tagged_node_handle>> tail_;

        pool_t pool;
    };
}    // namespace hpx::lockfree
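A minimal usage sketch, not part of queue.hpp: it relies only on the interface shown above (the variable-sized constructor, push, pop, and consume_all) and assumes the header is reachable as <hpx/concurrency/queue.hpp>; the element count, capacity, and lambda are illustrative.

#include <hpx/concurrency/queue.hpp>

#include <cstddef>
#include <iostream>

int main()
{
    // Variable-sized queue of ints; pre-allocate 128 nodes for the freelist.
    // T = int satisfies the trivially-copyable/destructible requirements.
    hpx::lockfree::queue<int> q(128);

    // Enqueue a few values; push may allocate if the freelist is exhausted.
    for (int i = 0; i != 10; ++i)
        q.push(i);

    // Dequeue a single element.
    int value = 0;
    if (q.pop(value))
        std::cout << "popped " << value << '\n';

    // Drain the remaining elements, counting how many were consumed.
    std::size_t count = q.consume_all([](int v) { std::cout << v << ' '; });
    std::cout << "\nconsumed " << count << " elements\n";

    return 0;
}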