• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #857

28 Dec 2022 11:12PM UTC coverage: 86.543% (-0.06%) from 86.602%
#857

push

StellarBot
Merge #6118

6118: Modernize modules from level 17, 18, 19, and 20 r=hkaiser a=hkaiser

working towards https://github.com/STEllAR-GROUP/hpx/issues/5497

Modules:
- core/threading_base
- full/command_line_handling
- core/io_service
- core/schedulers
- core/synchronization
- core/futures
- core/thread_pools
- core/lcos_local
- core/pack_traversal
- core/resource_partitioner
- core/threading
- full/naming_base


Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>

849 of 849 new or added lines in 98 files covered. (100.0%)

174389 of 201505 relevant lines covered (86.54%)

1916353.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.3
/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
1
//  Copyright (c) 2007-2022 Hartmut Kaiser
2
//
3
//  SPDX-License-Identifier: BSL-1.0
4
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
5
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6

7
#pragma once
8

9
#include <hpx/config.hpp>
10
#include <hpx/assert.hpp>
11
#include <hpx/execution_base/this_thread.hpp>
12
#include <hpx/functional/move_only_function.hpp>
13
#include <hpx/hardware/timestamp.hpp>
14
#include <hpx/modules/itt_notify.hpp>
15
#include <hpx/thread_pools/detail/scheduling_log.hpp>
16
#include <hpx/threading_base/scheduler_base.hpp>
17
#include <hpx/threading_base/scheduler_state.hpp>
18
#include <hpx/threading_base/thread_data.hpp>
19

20
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
21
    defined(HPX_HAVE_THREAD_IDLE_RATES)
22
#include <hpx/thread_pools/detail/scoped_background_timer.hpp>
23
#endif
24

25
#if defined(HPX_HAVE_APEX)
26
#include <hpx/threading_base/external_timer.hpp>
27
#endif
28

29
#include <atomic>
30
#include <cstddef>
31
#include <cstdint>
32
#include <limits>
33
#include <memory>
34
#include <utility>
35

36
namespace hpx::threads::detail {
37

38
    ///////////////////////////////////////////////////////////////////////
    // helper class for switching thread state in and out during execution
    //
    // RAII guard: the constructor attempts to tag the thread as 'active',
    // remembering the state it had before (usually 'pending'); the
    // destructor restores a proper state unless store_state() or
    // disable_restore() was called explicitly.
    class switch_status
    {
    public:
        // Attempt the tagged transition to 'active'. need_restore_state_
        // records whether the transition succeeded (callers must check via
        // is_valid() before executing the thread).
        switch_status(
            thread_id_ref_type const& t, thread_state prev_state) noexcept
          : thread_(get_thread_id_data(t))
          , prev_state_(prev_state)
          , next_thread_id_(nullptr)
          , need_restore_state_(thread_->set_state_tagged(
                thread_schedule_state::active, prev_state_, orig_state_))
        {
        }

        // Fallback: restore the remembered state if nobody did so explicitly.
        ~switch_status()
        {
            if (need_restore_state_)
            {
                store_state(prev_state_);
            }
        }

        // True if the constructor succeeded in switching the thread to
        // 'active'.
        constexpr bool is_valid() const noexcept
        {
            return need_restore_state_;
        }

        // allow to change the state the thread will be switched to after
        // execution; also captures a possible continuation thread supplied
        // by the thread function (new_state.second)
        thread_state operator=(thread_result_type&& new_state) noexcept
        {
            // bump the tag so stale set_state() attempts are detected
            prev_state_ = thread_state(
                new_state.first, prev_state_.state_ex(), prev_state_.tag() + 1);
            if (new_state.second != nullptr)
            {
                next_thread_id_ = HPX_MOVE(new_state.second);
            }
            return prev_state_;
        }

        // Get the state this thread was in before execution (usually pending),
        // this helps making sure no other worker-thread is started to execute
        // this HPX-thread in the meantime.
        thread_schedule_state get_previous() const noexcept
        {
            return prev_state_.state();
        }

        // This restores the previous state, while making sure that the original
        // state has not been changed since we started executing this thread.
        // The function returns true if the state has been set, false otherwise.
        bool store_state(thread_state& newstate) noexcept
        {
            disable_restore();

            if (thread_->restore_state(prev_state_, orig_state_))
            {
                newstate = prev_state_;
                return true;
            }
            return false;
        }

        // disable default handling in destructor
        void disable_restore() noexcept
        {
            need_restore_state_ = false;
        }

        // Continuation thread (if any) reported by operator=; may be null.
        constexpr thread_id_ref_type const& get_next_thread() const noexcept
        {
            return next_thread_id_;
        }

        // Transfer ownership of the continuation thread to the caller.
        thread_id_ref_type move_next_thread() noexcept
        {
            return HPX_MOVE(next_thread_id_);
        }

    private:
        thread_data* thread_;              // non-owning; data of the thread being run
        thread_state prev_state_;          // state to restore / report
        thread_state orig_state_;          // tagged state captured at switch time
        thread_id_ref_type next_thread_id_;    // optional continuation thread
        bool need_restore_state_;          // destructor restore still pending
    };
125

126
    // Same contract as switch_status, but used for the dedicated background
    // thread: the tagged transition to 'active' uses relaxed memory ordering
    // (this helper runs on the background-work fast path).
    class switch_status_background
    {
    public:
        switch_status_background(
            thread_id_ref_type const& t, thread_state prev_state) noexcept
          : thread_(get_thread_id_data(t))
          , prev_state_(prev_state)
          , next_thread_id_(nullptr)
          , need_restore_state_(
                // relaxed ordering: the only difference to switch_status
                thread_->set_state_tagged(thread_schedule_state::active,
                    prev_state_, orig_state_, std::memory_order_relaxed))
        {
        }

        // Fallback: restore the remembered state if nobody did so explicitly.
        ~switch_status_background()
        {
            if (need_restore_state_)
            {
                store_state(prev_state_);
            }
        }

        // True if the constructor succeeded in switching the thread to
        // 'active'.
        constexpr bool is_valid() const noexcept
        {
            return need_restore_state_;
        }

        // allow to change the state the thread will be switched to after
        // execution; also captures a possible continuation thread supplied
        // by the thread function (new_state.second)
        thread_state operator=(thread_result_type&& new_state) noexcept
        {
            // bump the tag so stale set_state() attempts are detected
            prev_state_ = thread_state(
                new_state.first, prev_state_.state_ex(), prev_state_.tag() + 1);
            if (new_state.second != nullptr)
            {
                next_thread_id_ = HPX_MOVE(new_state.second);
            }
            return prev_state_;
        }

        // Get the state this thread was in before execution (usually pending),
        // this helps making sure no other worker-thread is started to execute
        // this HPX-thread in the meantime.
        thread_schedule_state get_previous() const noexcept
        {
            return prev_state_.state();
        }

        // This restores the previous state, while making sure that the original
        // state has not been changed since we started executing this thread.
        // The function returns true if the state has been set, false otherwise.
        bool store_state(thread_state& newstate) noexcept
        {
            disable_restore();
            if (thread_->restore_state(prev_state_, orig_state_))
            {
                newstate = prev_state_;
                return true;
            }
            return false;
        }

        // disable default handling in destructor
        void disable_restore() noexcept
        {
            need_restore_state_ = false;
        }

        // Continuation thread (if any) reported by operator=; may be null.
        constexpr thread_id_ref_type const& get_next_thread() const noexcept
        {
            return next_thread_id_;
        }

        // Transfer ownership of the continuation thread to the caller.
        thread_id_ref_type move_next_thread() noexcept
        {
            return HPX_MOVE(next_thread_id_);
        }

    private:
        thread_data* thread_;              // non-owning; data of the thread being run
        thread_state prev_state_;          // state to restore / report
        thread_state orig_state_;          // tagged state captured at switch time
        thread_id_ref_type next_thread_id_;    // optional continuation thread
        bool need_restore_state_;          // destructor restore still pending
    };
211

212
#ifdef HPX_HAVE_THREAD_IDLE_RATES
    // Accumulates the overall time spent in the scheduling loop (tfunc_time)
    // and the time spent actually executing HPX threads (exec_time) —
    // presumably the inputs to the thread idle-rate counters; confirm
    // against the counter implementation.
    struct idle_collect_rate
    {
        idle_collect_rate(
            std::int64_t& tfunc_time, std::int64_t& exec_time) noexcept
          : start_timestamp_(util::hardware::timestamp())
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
        {
        }

        // Add the time elapsed since 'timestamp' to the execution time.
        void collect_exec_time(std::int64_t timestamp) noexcept
        {
            exec_time_ += util::hardware::timestamp() - timestamp;
        }

        // Refresh the overall loop time. A sentinel of -1 in tfunc_time_
        // requests a reset of the measurement interval (both counters are
        // zeroed and the start timestamp is re-taken).
        void take_snapshot() noexcept
        {
            if (tfunc_time_ == std::int64_t(-1))
            {
                start_timestamp_ = util::hardware::timestamp();
                tfunc_time_ = 0;
                exec_time_ = 0;
            }
            else
            {
                tfunc_time_ = util::hardware::timestamp() - start_timestamp_;
            }
        }

        std::int64_t start_timestamp_;    // timestamp at interval start

        std::int64_t& tfunc_time_;    // overall scheduling-loop time
        std::int64_t& exec_time_;     // time spent executing HPX threads
    };

    // RAII helper: on destruction adds the duration of its own lifetime to
    // the accumulated execution time.
    struct exec_time_wrapper
    {
        explicit exec_time_wrapper(idle_collect_rate& idle_rate) noexcept
          : timestamp_(util::hardware::timestamp())
          , idle_rate_(idle_rate)
        {
        }
        ~exec_time_wrapper()
        {
            idle_rate_.collect_exec_time(timestamp_);
        }

        std::int64_t timestamp_;          // construction time
        idle_collect_rate& idle_rate_;    // accumulator to update
    };

    // RAII helper: on destruction refreshes the overall loop-time snapshot.
    struct tfunc_time_wrapper
    {
        explicit constexpr tfunc_time_wrapper(
            idle_collect_rate& idle_rate) noexcept
          : idle_rate_(idle_rate)
        {
        }
        ~tfunc_time_wrapper()
        {
            idle_rate_.take_snapshot();
        }

        idle_collect_rate& idle_rate_;    // accumulator to update
    };
#else
    // No-op stand-ins used when idle-rate measurement is compiled out; they
    // keep the scheduling loop free of #ifdefs at the use sites.
    struct idle_collect_rate
    {
        explicit constexpr idle_collect_rate(
            std::int64_t&, std::int64_t&) noexcept
        {
        }
    };

    struct exec_time_wrapper
    {
        explicit constexpr exec_time_wrapper(idle_collect_rate&) noexcept {}
    };

    struct tfunc_time_wrapper
    {
        explicit constexpr tfunc_time_wrapper(idle_collect_rate&) noexcept {}
    };
#endif
297

298
    ///////////////////////////////////////////////////////////////////////////
    // RAII guard that flags a worker thread as actively executing an HPX
    // thread: the referenced flag is raised on construction and cleared
    // again when the guard goes out of scope.
    struct is_active_wrapper
    {
        explicit is_active_wrapper(bool& is_active) noexcept
          : is_active_(is_active)
        {
            is_active_ = true;
        }
        ~is_active_wrapper()
        {
            is_active_ = false;
        }

        bool& is_active_;    // flag owned by the caller
    };
313

314
    ///////////////////////////////////////////////////////////////////////////
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
    // Bundle of references to the per-worker-thread statistics updated by the
    // scheduling loop. This variant additionally exposes the background-work
    // duration counters.
    struct scheduling_counters
    {
        scheduling_counters(std::int64_t& executed_threads,
            std::int64_t& executed_thread_phases, std::int64_t& tfunc_time,
            std::int64_t& exec_time, std::int64_t& idle_loop_count,
            std::int64_t& busy_loop_count, bool& is_active,
            std::int64_t& background_work_duration,
            std::int64_t& background_send_duration,
            std::int64_t& background_receive_duration) noexcept
          : executed_threads_(executed_threads)
          , executed_thread_phases_(executed_thread_phases)
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
          , idle_loop_count_(idle_loop_count)
          , busy_loop_count_(busy_loop_count)
          , background_work_duration_(background_work_duration)
          , background_send_duration_(background_send_duration)
          , background_receive_duration_(background_receive_duration)
          , is_active_(is_active)
        {
        }

        // All members alias storage owned by the caller; this struct has no
        // state of its own.
        std::int64_t& executed_threads_;
        std::int64_t& executed_thread_phases_;
        std::int64_t& tfunc_time_;
        std::int64_t& exec_time_;
        std::int64_t& idle_loop_count_;
        std::int64_t& busy_loop_count_;
        std::int64_t& background_work_duration_;
        std::int64_t& background_send_duration_;
        std::int64_t& background_receive_duration_;
        bool& is_active_;
    };
#else
    // Bundle of references to the per-worker-thread statistics updated by the
    // scheduling loop (background-work timing support compiled out).
    struct scheduling_counters
    {
        scheduling_counters(std::int64_t& executed_threads,
            std::int64_t& executed_thread_phases, std::int64_t& tfunc_time,
            std::int64_t& exec_time, std::int64_t& idle_loop_count,
            std::int64_t& busy_loop_count, bool& is_active) noexcept
          : executed_threads_(executed_threads)
          , executed_thread_phases_(executed_thread_phases)
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
          , idle_loop_count_(idle_loop_count)
          , busy_loop_count_(busy_loop_count)
          , is_active_(is_active)
        {
        }

        // All members alias storage owned by the caller; this struct has no
        // state of its own.
        std::int64_t& executed_threads_;
        std::int64_t& executed_thread_phases_;
        std::int64_t& tfunc_time_;
        std::int64_t& exec_time_;
        std::int64_t& idle_loop_count_;
        std::int64_t& busy_loop_count_;
        bool& is_active_;
    };

#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
377

378
    // Aggregates the callbacks and tuning limits handed to the scheduling
    // loop. 'background_' returns true when it actually performed work;
    // outer_/inner_ are plain notifications (presumably invoked on the idle
    // path of the loop — confirm in scheduling_loop).
    struct scheduling_callbacks
    {
        using callback_type = hpx::move_only_function<void()>;
        using background_callback_type = hpx::move_only_function<bool()>;

        // NOTE: the loop-count limits are accepted as std::size_t but stored
        // as std::int64_t (the loop compares them against signed counters);
        // values above INT64_MAX would be implementation-defined here.
        explicit scheduling_callbacks(callback_type&& outer,
            callback_type&& inner = callback_type(),
            background_callback_type&& background = background_callback_type(),
            std::size_t max_background_threads =
                (std::numeric_limits<std::size_t>::max)(),
            std::size_t max_idle_loop_count = HPX_IDLE_LOOP_COUNT_MAX,
            std::size_t max_busy_loop_count = HPX_BUSY_LOOP_COUNT_MAX)
          : outer_(HPX_MOVE(outer))
          , inner_(HPX_MOVE(inner))
          , background_(HPX_MOVE(background))
          , max_background_threads_(max_background_threads)
          , max_idle_loop_count_(max_idle_loop_count)
          , max_busy_loop_count_(max_busy_loop_count)
        {
        }

        callback_type outer_;
        callback_type inner_;
        background_callback_type background_;
        std::size_t const max_background_threads_;
        std::int64_t const max_idle_loop_count_;
        std::int64_t const max_busy_loop_count_;
    };
406

407
    template <typename SchedulingPolicy>
408
    thread_id_ref_type create_background_thread(SchedulingPolicy& scheduler,
540✔
409
        scheduling_callbacks& callbacks,
410
        std::shared_ptr<bool>& background_running,
411
        threads::thread_schedule_hint schedulehint,
412
        std::int64_t& idle_loop_count)
413
    {
414
        thread_id_ref_type background_thread;
518✔
415
        background_running.reset(new bool(true));
540✔
416
        thread_init_data background_init(
486✔
417
            [&, background_running](
2,802✔
418
                thread_restart_state) -> thread_result_type {
419
                while (*background_running)
73,916,734✔
420
                {
421
                    if (callbacks.background_())
73,916,164✔
422
                    {
423
                        // we only update the idle_loop_count if
424
                        // background_running is true. If it was false, this
425
                        // task was given back to the scheduler.
426
                        if (*background_running)
×
427
                            idle_loop_count = 0;
×
428
                    }
×
429
                    // Force yield...
430
                    hpx::execution_base::this_thread::yield("background_work");
74,528,604✔
431
                }
432

433
                return thread_result_type(
599✔
434
                    thread_schedule_state::terminated, invalid_thread_id);
599✔
435
            },
436
            hpx::threads::thread_description("background_work"),
556✔
437
            thread_priority::high_recursive, schedulehint,
556✔
438
            thread_stacksize::large,
439
            // Create in suspended to prevent the thread from being scheduled
440
            // directly...
441
            thread_schedule_state::suspended, true, &scheduler);
556✔
442

443
        scheduler.SchedulingPolicy::create_thread(
599✔
444
            background_init, &background_thread, hpx::throws);
445
        HPX_ASSERT(background_thread);
599✔
446
        scheduler.SchedulingPolicy::increment_background_thread_count();
591✔
447
        // We can now set the state to pending
448
        get_thread_id_data(background_thread)
591✔
449
            ->set_state(thread_schedule_state::pending);
591✔
450
        return background_thread;
591✔
451
    }
599✔
452

453
    // This function tries to invoke the background work thread. It returns
454
    // false when we need to give the background thread back to scheduler and
455
    // create a new one that is supposed to be executed inside the
456
    // scheduling_loop, true otherwise
457
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
458
    defined(HPX_HAVE_THREAD_IDLE_RATES)
459
    template <typename SchedulingPolicy>
460
    bool call_background_thread(thread_id_ref_type& background_thread,
461
        thread_id_ref_type& next_thrd, SchedulingPolicy& scheduler,
462
        std::size_t num_thread, bool /* running */,
463
        std::int64_t& background_work_exec_time_init,
464
        hpx::execution_base::this_thread::detail::agent_storage*
465
            context_storage)
466
#else
467
    template <typename SchedulingPolicy>
468
    bool call_background_thread(thread_id_ref_type& background_thread,
473,403,952✔
469
        thread_id_ref_type& next_thrd, SchedulingPolicy& scheduler,
470
        std::size_t num_thread, bool /* running */,
471
        hpx::execution_base::this_thread::detail::agent_storage*
472
            context_storage)
473
#endif
474
    {
475
        if (HPX_UNLIKELY(background_thread))
477,273,907✔
476
        {
477
            thread_state state =
478
                get_thread_id_data(background_thread)->get_state();
73,818,526✔
479
            thread_schedule_state state_val = state.state();
73,818,526✔
480

481
            if (HPX_LIKELY(thread_schedule_state::pending == state_val))
73,818,526✔
482
            {
483
                {
484
                    // tries to set state to active (only if state is still
485
                    // the same as 'state')
486
                    detail::switch_status_background thrd_stat(
74,575,557✔
487
                        background_thread, state);
74,575,557✔
488

489
                    if (HPX_LIKELY(thrd_stat.is_valid() &&
73,818,526✔
490
                            thrd_stat.get_previous() ==
491
                                thread_schedule_state::pending))
492
                    {
493
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
494
    defined(HPX_HAVE_THREAD_IDLE_RATES)
495
                        // measure background work duration
496
                        background_work_duration_counter bg_work_duration(
497
                            background_work_exec_time_init);
498
                        background_exec_time_wrapper bg_exec_time(
499
                            bg_work_duration);
500
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
501

502
                        // invoke background thread
503
                        thrd_stat = (*get_thread_id_data(background_thread))(
148,328,070✔
504
                            context_storage);
73,787,336✔
505

506
                        thread_id_ref_type next = thrd_stat.move_next_thread();
73,947,481✔
507
                        if (next != nullptr && next != background_thread)
73,787,336✔
508
                        {
509
                            if (next_thrd == nullptr)
×
510
                            {
511
                                next_thrd = HPX_MOVE(next);
×
512
                            }
×
513
                            else
514
                            {
515
                                auto* scheduler = get_thread_id_data(next)
×
516
                                                      ->get_scheduler_base();
×
517
                                scheduler->schedule_thread(HPX_MOVE(next),
×
518
                                    threads::thread_schedule_hint(
×
519
                                        static_cast<std::int16_t>(num_thread)),
×
520
                                    true);
521
                                scheduler->do_some_work(num_thread);
×
522
                            }
523
                        }
×
524
                    }
73,947,481✔
525
                    thrd_stat.store_state(state);
74,271,725✔
526
                    state_val = state.state();
74,271,725✔
527

528
                    if (HPX_LIKELY(
74,271,725✔
529
                            state_val == thread_schedule_state::pending_boost))
530
                    {
531
                        get_thread_id_data(background_thread)
8,386✔
532
                            ->set_state(thread_schedule_state::pending);
8,386✔
533
                    }
8,386✔
534
                    else if (thread_schedule_state::terminated == state_val)
74,266,534✔
535
                    {
536
                        scheduler.SchedulingPolicy::
×
537
                            decrement_background_thread_count();
×
538
                        background_thread = thread_id_type();
×
539
                    }
×
540
                    else if (thread_schedule_state::suspended == state_val)
74,273,393✔
541
                    {
542
                        return false;
×
543
                    }
544
                }
74,244,108✔
545
                return true;
74,109,737✔
546
            }
547
            // This should never be reached ... we should only deal with pending
548
            // here.
549
            HPX_ASSERT(false);
×
550
        }
×
551
        return true;
402,590,827✔
552
    }
476,700,564✔
553

554
    template <typename SchedulingPolicy>
555
    void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
4,003✔
556
        scheduling_counters& counters, scheduling_callbacks& params)
557
    {
558
        std::atomic<hpx::state>& this_state = scheduler.get_state(num_thread);
3,691✔
559

560
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
561
        util::itt::stack_context ctx;    // helper for itt support
562
        util::itt::thread_domain thread_domain;
563
        util::itt::id threadid(thread_domain, &scheduler);
564
        util::itt::string_handle task_id("task_id");
565
        util::itt::string_handle task_phase("task_phase");
566
        // util::itt::frame_context fctx(thread_domain);
567
#endif
568

569
        std::int64_t& idle_loop_count = counters.idle_loop_count_;
3,691✔
570
        std::int64_t& busy_loop_count = counters.busy_loop_count_;
3,691✔
571

572
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
573
    defined(HPX_HAVE_THREAD_IDLE_RATES)
574
        std::int64_t& bg_work_exec_time_init =
575
            counters.background_work_duration_;
576
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
577

578
        idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
3,691✔
579
        [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(idle_rate);
3,691✔
580

581
        // spin for some time after queues have become empty
582
        bool may_exit = false;
3,691✔
583

584
        std::shared_ptr<bool> background_running = nullptr;
3,691✔
585
        thread_id_ref_type background_thread;
3,691✔
586

587
        if (scheduler.SchedulingPolicy::has_scheduler_mode(
4,492✔
588
                policies::scheduler_mode::do_background_work) &&
3,691✔
589
            num_thread < params.max_background_threads_ &&
3,669✔
590
            !params.background_.empty())
486✔
591
        {
592
            background_thread =
590✔
593
                create_background_thread(scheduler, params, background_running,
972✔
594
                    thread_schedule_hint(static_cast<std::int16_t>(num_thread)),
593✔
595
                    idle_loop_count);
593✔
596
        }
590✔
597

598
        hpx::execution_base::this_thread::detail::agent_storage*
599
            context_storage =
4,296✔
600
                hpx::execution_base::this_thread::detail::get_agent_storage();
4,328✔
601

602
        std::size_t added = std::size_t(-1);
4,296✔
603
        thread_id_ref_type next_thrd;
4,296✔
604
        while (true)
483,707,979✔
605
        {
606
            thread_id_ref_type thrd = HPX_MOVE(next_thrd);
484,039,845✔
607

608
            // Get the next HPX thread from the queue
609
            bool running = this_state.load(std::memory_order_relaxed) <
484,039,845✔
610
                hpx::state::pre_sleep;
611

612
            // extract the stealing mode once per loop iteration
613
            bool enable_stealing =
484,039,845✔
614
                scheduler.SchedulingPolicy::has_scheduler_mode(
484,039,845✔
615
                    policies::scheduler_mode::enable_stealing);
616

617
            // stealing staged threads is enabled if:
618
            // - fast idle mode is on: same as normal stealing
619
            // - fast idle mode off: only after normal stealing has failed for
620
            //                       a while
621
            bool enable_stealing_staged = enable_stealing;
484,039,845✔
622
            if (!scheduler.SchedulingPolicy::has_scheduler_mode(
484,039,845✔
623
                    policies::scheduler_mode::fast_idle_mode))
624
            {
625
                enable_stealing_staged = enable_stealing_staged &&
943,156,719✔
626
                    idle_loop_count > params.max_idle_loop_count_ / 2;
459,116,874✔
627
            }
484,958,275✔
628

629
            if (HPX_LIKELY(thrd ||
484,958,275✔
630
                    scheduler.SchedulingPolicy::get_next_thread(
631
                        num_thread, running, thrd, enable_stealing)))
632
            {
633
                [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(
9,690,088✔
634
                    idle_rate);
635
                HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() ==
9,690,098✔
636
                    &scheduler);
637

638
                idle_loop_count = 0;
9,690,032✔
639
                ++busy_loop_count;
9,690,032✔
640

641
                may_exit = false;
9,690,032✔
642

643
                // Only pending HPX threads will be executed. Any non-pending
644
                // HPX threads are leftovers from a set_state() call for a
645
                // previously pending HPX thread (see comments above).
646
                thread_state state = get_thread_id_data(thrd)->get_state();
9,690,032✔
647
                thread_schedule_state state_val = state.state();
9,690,032✔
648

649
                if (HPX_LIKELY(thread_schedule_state::pending == state_val))
9,690,032✔
650
                {
651
                    // switch the state of the thread to active and back to what
652
                    // the thread reports as its return value
653

654
                    {
655
                        // tries to set state to active (only if state is still
656
                        // the same as 'state')
657
                        detail::switch_status thrd_stat(thrd, state);
9,690,132✔
658
                        if (HPX_LIKELY(thrd_stat.is_valid() &&
9,690,032✔
659
                                thrd_stat.get_previous() ==
660
                                    thread_schedule_state::pending))
661
                        {
662
                            detail::write_state_log(scheduler, num_thread, thrd,
19,380,144✔
663
                                thrd_stat.get_previous(),
9,690,327✔
664
                                thread_schedule_state::active);
665

666
                            [[maybe_unused]] tfunc_time_wrapper
667
                                tfunc_time_collector(idle_rate);
9,689,163✔
668

669
                            // thread returns new required state store the
670
                            // returned state in the thread
671
                            {
672
                                is_active_wrapper utilization(
9,689,163✔
673
                                    counters.is_active_);
9,689,163✔
674
                                auto* thrdptr = get_thread_id_data(thrd);
9,689,163✔
675
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
676
                                util::itt::caller_context cctx(ctx);
677
                                // util::itt::undo_frame_context undoframe(fctx);
678
                                util::itt::task task =
679
                                    thrdptr->get_description().get_task_itt(
680
                                        thread_domain);
681
                                task.add_metadata(task_id, thrdptr);
682
                                task.add_metadata(
683
                                    task_phase, thrdptr->get_thread_phase());
684
#endif
685
                                // Record time elapsed in thread changing state
686
                                // and add to aggregate execution time.
687
                                [[maybe_unused]] exec_time_wrapper
688
                                    exec_time_collector(idle_rate);
9,689,163✔
689

690
#if defined(HPX_HAVE_APEX)
691
                                // get the APEX data pointer, in case we are
692
                                // resuming the thread and have to restore any
693
                                // leaf timers from direct actions, etc.
694

695
                                // the address of tmp_data is getting stored by
696
                                // APEX during this call
697
                                util::external_timer::scoped_timer profiler(
698
                                    thrdptr->get_timer_data());
699

700
                                thrd_stat = (*thrdptr)(context_storage);
701

702
                                if (thrd_stat.get_previous() ==
703
                                    thread_schedule_state::terminated)
704
                                {
705
                                    profiler.stop();
706
                                    // just in case, clean up the now dead pointer.
707
                                    thrdptr->set_timer_data(nullptr);
708
                                }
709
                                else
710
                                {
711
                                    profiler.yield();
712
                                }
713
#else
714
                                thrd_stat = (*thrdptr)(context_storage);
9,690,327✔
715
#endif
716
                            }
9,689,163✔
717

718
                            detail::write_state_log(scheduler, num_thread, thrd,
19,378,326✔
719
                                thread_schedule_state::active,
720
                                thrd_stat.get_previous());
9,689,982✔
721

722
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
723
                            ++counters.executed_thread_phases_;
9,690,185✔
724
#endif
725
                        }
9,690,185✔
726
                        else
727
                        {
728
                            // some other worker-thread got in between and
729
                            // started executing this HPX-thread, we just
730
                            // continue with the next one
731
                            thrd_stat.disable_restore();
×
732
                            detail::write_state_log_warning(scheduler,
×
733
                                num_thread, thrd, state_val, "no execution");
×
734
                            continue;
×
735
                        }
736

737
                        // store and retrieve the new state in the thread
738
                        if (HPX_UNLIKELY(!thrd_stat.store_state(state)))
9,690,241✔
739
                        {
740
                            // some other worker-thread got in between and
741
                            // changed the state of this thread, we just
742
                            // continue with the next one
743
                            detail::write_state_log_warning(scheduler,
×
744
                                num_thread, thrd, state_val, "no state change");
×
745
                            continue;
×
746
                        }
747

748
                        state_val = state.state();
9,690,100✔
749

750
                        // any exception thrown from the thread will reset its
751
                        // state at this point
752

753
                        // handle next thread id if given (switch directly to
754
                        // this thread)
755
                        next_thrd = thrd_stat.move_next_thread();
9,690,100✔
756
                    }
9,690,241✔
757

758
                    // Re-add this work item to our list of work items if the
759
                    // HPX thread should be re-scheduled. If the HPX thread is
760
                    // suspended now we just keep it in the map of threads.
761
                    if (HPX_UNLIKELY(
9,690,161✔
762
                            state_val == thread_schedule_state::pending))
763
                    {
764
                        if (HPX_LIKELY(next_thrd == nullptr))
2,332,128✔
765
                        {
766
                            // schedule other work
767
                            scheduler.SchedulingPolicy::wait_or_add_new(
4,635,964✔
768
                                num_thread, running, idle_loop_count,
2,317,983✔
769
                                enable_stealing_staged, added);
2,317,983✔
770
                        }
2,317,983✔
771

772
                        // schedule this thread again, make sure it ends up at
773
                        // the end of the queue
774
                        scheduler.SchedulingPolicy::schedule_thread_last(
4,664,258✔
775
                            HPX_MOVE(thrd),
2,332,112✔
776
                            threads::thread_schedule_hint(
2,332,112✔
777
                                static_cast<std::int16_t>(num_thread)),
2,332,112✔
778
                            true);
779
                        scheduler.SchedulingPolicy::do_some_work(num_thread);
2,332,109✔
780
                    }
2,332,099✔
781
                    else if (HPX_UNLIKELY(state_val ==
7,358,032✔
782
                                 thread_schedule_state::pending_boost))
783
                    {
784
                        get_thread_id_data(thrd)->set_state(
461,033✔
785
                            thread_schedule_state::pending);
786

787
                        if (HPX_LIKELY(next_thrd == nullptr))
461,033✔
788
                        {
789
                            // reschedule this thread right away if the
790
                            // background work will be triggered
791
                            if (HPX_UNLIKELY(busy_loop_count >
461,033✔
792
                                    params.max_busy_loop_count_))
793
                            {
794
                                next_thrd = HPX_MOVE(thrd);
222✔
795
                            }
222✔
796
                            else
797
                            {
798
                                // schedule other work
799
                                scheduler.SchedulingPolicy::wait_or_add_new(
921,622✔
800
                                    num_thread, running, idle_loop_count,
460,811✔
801
                                    enable_stealing_staged, added);
460,811✔
802

803
                                // schedule this thread again immediately with
804
                                // boosted priority
805
                                scheduler.SchedulingPolicy::schedule_thread(
921,622✔
806
                                    HPX_MOVE(thrd),
460,810✔
807
                                    threads::thread_schedule_hint(
460,810✔
808
                                        static_cast<std::int16_t>(num_thread)),
460,810✔
809
                                    true, thread_priority::boost);
810
                                scheduler.SchedulingPolicy::do_some_work(
921,620✔
811
                                    num_thread);
460,811✔
812
                            }
813
                        }
461,033✔
814
                        else if (HPX_LIKELY(next_thrd != thrd))
×
815
                        {
816
                            // schedule this thread again immediately with
817
                            // boosted priority
818
                            scheduler.SchedulingPolicy::schedule_thread(
×
819
                                HPX_MOVE(thrd),
×
820
                                threads::thread_schedule_hint(
×
821
                                    static_cast<std::int16_t>(num_thread)),
×
822
                                true, thread_priority::boost);
823
                            scheduler.SchedulingPolicy::do_some_work(
×
824
                                num_thread);
×
825
                        }
×
826
                    }
461,033✔
827
                }
9,690,056✔
828
                else if (HPX_UNLIKELY(
×
829
                             thread_schedule_state::active == state_val))
830
                {
831
                    write_rescheduling_log_warning(scheduler, num_thread, thrd);
×
832

833
                    // re-schedule thread, if it is still marked as active this
834
                    // might happen, if some thread has been added to the
835
                    // scheduler queue already but the state has not been reset
836
                    // yet
837
                    auto* thrdptr = get_thread_id_data(thrd);
×
838
                    auto priority = thrdptr->get_priority();
×
839
                    scheduler.SchedulingPolicy::schedule_thread(HPX_MOVE(thrd),
×
840
                        threads::thread_schedule_hint(
×
841
                            static_cast<std::int16_t>(num_thread)),
×
842
                        true, priority);
×
843
                    scheduler.SchedulingPolicy::do_some_work(num_thread);
×
844
                }
×
845

846
                // Remove the mapping from thread_map_ if HPX thread is depleted
847
                // or terminated, this will delete the HPX thread. REVIEW: what
848
                // has to be done with depleted HPX threads?
849
                if (HPX_LIKELY(state_val == thread_schedule_state::depleted ||
9,690,056✔
850
                        state_val == thread_schedule_state::terminated))
851
                {
852
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
853
                    ++counters.executed_threads_;
5,757,438✔
854
#endif
855
                    thrd = thread_id_type();
5,757,438✔
856
                }
5,757,438✔
857
            }
9,690,186✔
858

859
            // if nothing else has to be done either wait or terminate
860
            else
861
            {
862
                ++idle_loop_count;
472,050,568✔
863

864
                if (scheduler.SchedulingPolicy::wait_or_add_new(num_thread,
934,251,850✔
865
                        running, idle_loop_count, enable_stealing_staged,
472,050,568✔
866
                        added))
867
                {
868
                    // Clean up terminated threads before trying to exit
869
                    bool can_exit = !running &&
441,221,594✔
870
                        scheduler.SchedulingPolicy::cleanup_terminated(
68,980✔
871
                            num_thread, true) &&
34,490✔
872
                        scheduler.SchedulingPolicy::get_queue_length(
69,510✔
873
                            num_thread) == 0;
34,755✔
874

875
                    if (this_state.load(std::memory_order_relaxed) ==
443,315,409✔
876
                        hpx::state::pre_sleep)
877
                    {
878
                        if (can_exit)
30,359✔
879
                        {
880
                            scheduler.SchedulingPolicy::suspend(num_thread);
876✔
881
                        }
876✔
882
                    }
30,369✔
883
                    else
884
                    {
885
                        can_exit = can_exit &&
443,289,897✔
886
                            scheduler.SchedulingPolicy::get_thread_count(
9,524✔
887
                                thread_schedule_state::suspended,
888
                                thread_priority::default_, num_thread) == 0;
4,762✔
889

890
                        if (can_exit)
442,917,592✔
891
                        {
892
                            if (!scheduler.SchedulingPolicy::has_scheduler_mode(
4,804✔
893
                                    policies::scheduler_mode::delay_exit))
894
                            {
895
                                // If this is an inner scheduler, try to exit
896
                                // immediately
897
                                if (background_thread != nullptr)
8✔
898
                                {
899
                                    HPX_ASSERT(background_running);
×
900
                                    *background_running = false;
×
901
                                    auto priority =
×
902
                                        get_thread_id_data(background_thread)
×
903
                                            ->get_priority();
×
904

905
                                    scheduler.SchedulingPolicy::
×
906
                                        decrement_background_thread_count();
×
907
                                    scheduler.SchedulingPolicy::schedule_thread(
×
908
                                        HPX_MOVE(background_thread),
×
909
                                        threads::thread_schedule_hint(
×
910
                                            static_cast<std::int16_t>(
911
                                                num_thread)),
×
912
                                        true, priority);
×
913
                                    scheduler.SchedulingPolicy::do_some_work(
×
914
                                        num_thread);
×
915

916
                                    background_thread = thread_id_type();
×
917
                                    background_running.reset();
×
918
                                }
×
919
                                else
920
                                {
921
                                    this_state.store(hpx::state::stopped);
7✔
922
                                    break;
7✔
923
                                }
924
                            }
×
925
                            else
926
                            {
927
                                // Otherwise, keep idling for some time
928
                                if (!may_exit)
4,790✔
929
                                    idle_loop_count = 0;
4,791✔
930
                                may_exit = true;
4,790✔
931
                            }
932
                        }
4,792✔
933
                    }
934
                }
442,268,563✔
935
                else if (!may_exit && added == 0 &&
32,583,945✔
936
                    (scheduler.SchedulingPolicy::has_scheduler_mode(
30,417,143✔
937
                        policies::scheduler_mode::fast_idle_mode)))
938
                {
939
                    // speed up idle suspend if no work was stolen
940
                    idle_loop_count += params.max_idle_loop_count_ / 1024;
×
941
                    added = std::size_t(-1);
×
942
                }
×
943

944
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
945
    defined(HPX_HAVE_THREAD_IDLE_RATES)
946
                // do background work in parcel layer and in agas
947
                if (!call_background_thread(background_thread, next_thrd,
948
                        scheduler, num_thread, running, bg_work_exec_time_init,
949
                        context_storage))
950
#else
951
                if (!call_background_thread(background_thread, next_thrd,
474,425,767✔
952
                        scheduler, num_thread, running, context_storage))
476,297,859✔
953
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
954
                {
955
                    // Let the current background thread terminate as soon as
956
                    // possible. No need to reschedule, as another LCO will set
957
                    // it to pending and schedule it back eventually
958
                    HPX_ASSERT(background_thread);
×
959
                    HPX_ASSERT(background_running);
×
960
                    *background_running = false;
×
961
                    scheduler
×
962
                        .SchedulingPolicy::decrement_background_thread_count();
×
963
                    // Create a new one which will replace the current such we
964
                    // avoid deadlock situations, if all background threads are
965
                    // blocked.
966
                    background_thread = create_background_thread(scheduler,
×
967
                        params, background_running,
×
968
                        thread_schedule_hint(
×
969
                            static_cast<std::int16_t>(num_thread)),
×
970
                        idle_loop_count);
×
971
                }
×
972
                // call back into invoking context
973
                if (!params.inner_.empty())
475,984,200✔
974
                {
975
                    params.inner_();
×
976
                    context_storage = hpx::execution_base::this_thread::detail::
×
977
                        get_agent_storage();
978
                }
×
979
            }
980

981
            if (scheduler.custom_polling_function() ==
485,321,955✔
982
                policies::detail::polling_status::busy)
983
            {
984
                idle_loop_count = 0;
×
985
            }
×
986

987
            // something went badly wrong, give up
988
            if (HPX_UNLIKELY(this_state.load(std::memory_order_relaxed) ==
481,659,867✔
989
                    hpx::state::terminating))
990
            {
991
                break;
×
992
            }
993

994
            if (busy_loop_count > params.max_busy_loop_count_)
481,863,261✔
995
            {
996
                busy_loop_count = 0;
4,452✔
997

998
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
999
    defined(HPX_HAVE_THREAD_IDLE_RATES)
1000
                // do background work in parcel layer and in agas
1001
                if (!call_background_thread(background_thread, next_thrd,
1002
                        scheduler, num_thread, running, bg_work_exec_time_init,
1003
                        context_storage))
1004
#else
1005
                // do background work in parcel layer and in agas
1006
                if (!call_background_thread(background_thread, next_thrd,
4,452✔
1007
                        scheduler, num_thread, running, context_storage))
4,452✔
1008
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
1009
                {
1010
                    // Let the current background thread terminate as soon as
1011
                    // possible. No need to reschedule, as another LCO will set
1012
                    // it to pending and schedule it back eventually
1013
                    HPX_ASSERT(background_thread);
×
1014
                    HPX_ASSERT(background_running);
×
1015
                    *background_running = false;
×
1016
                    scheduler
×
1017
                        .SchedulingPolicy::decrement_background_thread_count();
×
1018
                    // Create a new one which will replace the current such we
1019
                    // avoid deadlock situations, if all background threads are
1020
                    // blocked.
1021
                    background_thread = create_background_thread(scheduler,
×
1022
                        params, background_running,
×
1023
                        thread_schedule_hint(
×
1024
                            static_cast<std::int16_t>(num_thread)),
×
1025
                        idle_loop_count);
×
1026
                }
×
1027
            }
4,452✔
1028
            else if (idle_loop_count > params.max_idle_loop_count_ || may_exit)
481,858,809✔
1029
            {
1030
                if (idle_loop_count > params.max_idle_loop_count_)
5,226✔
1031
                    idle_loop_count = 0;
474✔
1032

1033
                // call back into invoking context
1034
                if (!params.outer_.empty())
5,239✔
1035
                {
1036
                    params.outer_();
5,412✔
1037
                    context_storage = hpx::execution_base::this_thread::detail::
5,412✔
1038
                        get_agent_storage();
1039
                }
5,413✔
1040

1041
                // break if we were idling after 'may_exit'
1042
                if (may_exit)
5,407✔
1043
                {
1044
                    HPX_ASSERT(this_state.load(std::memory_order_relaxed) !=
4,933✔
1045
                        hpx::state::pre_sleep);
1046

1047
                    if (background_thread)
4,927✔
1048
                    {
1049
                        HPX_ASSERT(background_running);
594✔
1050
                        *background_running = false;
599✔
1051
                        auto priority = get_thread_id_data(background_thread)
1,182✔
1052
                                            ->get_priority();
599✔
1053

1054
                        scheduler.SchedulingPolicy::
599✔
1055
                            decrement_background_thread_count();
599✔
1056
                        scheduler.SchedulingPolicy::schedule_thread(
1,182✔
1057
                            HPX_MOVE(background_thread),
599✔
1058
                            threads::thread_schedule_hint(
599✔
1059
                                static_cast<std::int16_t>(num_thread)),
599✔
1060
                            true, priority);
599✔
1061
                        scheduler.SchedulingPolicy::do_some_work(num_thread);
599✔
1062

1063
                        background_thread = thread_id_type();
599✔
1064
                        background_running.reset();
599✔
1065
                    }
599✔
1066
                    else
1067
                    {
1068
                        bool can_exit = !running &&
8,545✔
1069
                            scheduler.SchedulingPolicy::cleanup_terminated(
4,224✔
1070
                                true) &&
4,238✔
1071
                            scheduler.SchedulingPolicy::get_thread_count(
8,476✔
1072
                                thread_schedule_state::suspended,
1073
                                thread_priority::default_, num_thread) == 0 &&
4,238✔
1074
                            scheduler.SchedulingPolicy::get_queue_length(
8,616✔
1075
                                num_thread) == 0;
4,308✔
1076

1077
                        if (can_exit)
4,213✔
1078
                        {
1079
                            this_state.store(hpx::state::stopped);
4,320✔
1080
                            break;
4,320✔
1081
                        }
1082
                    }
1083

1084
                    may_exit = false;
599✔
1085
                }
599✔
1086
                else
1087
                {
1088
                    scheduler.SchedulingPolicy::cleanup_terminated(true);
474✔
1089
                }
1090
            }
1,073✔
1091
        }
480,189,838✔
1092
    }
4,298✔
1093
}    // namespace hpx::threads::detail
1094

1095
// NOTE: This line only exists to please doxygen. Without the line doxygen
1096
// generates incomplete xml output.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc