
STEllAR-GROUP / hpx / #866

14 Jan 2023 04:16PM UTC coverage: 85.951% (-0.5%) from 86.431%
Build #866 (push) by StellarBot
Merge #6134

6134: Adding notification function for parcelports to be called after early parcel handling r=hkaiser a=hkaiser

Parcelports can now override a new function `void initialized()` that will be called after early parcel handling has finished and before the thread pools are operational (i.e. before background work starts).

`@JiakunYan` please let me know if this is what you requested.

Co-authored-by: Hartmut Kaiser <hartmut.kaiser@gmail.com>
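
For parcelport authors, adopting the hook only requires overriding the new member function. A minimal sketch follows; the `hpx::parcelset::parcelport` base-class spelling and any registration details are assumptions, since the PR itself only establishes that a parcelport may override `void initialized()`:

    // Hypothetical parcelport using the new notification hook; the name
    // my_parcelport and the base class shown are illustrative only.
    struct my_parcelport : hpx::parcelset::parcelport
    {
        // Called once early parcel handling has finished and before the
        // thread pools are operational (i.e. before background work starts).
        void initialized() override
        {
            // a safe place for one-time setup that must not race with
            // early parcels or background work
        }
    };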

4 of 4 new or added lines in 2 files covered. (100.0%)

173540 of 201905 relevant lines covered (85.95%)

1871917.05 hits per line

Source File: /libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp (76.6% of lines covered)

//  Copyright (c) 2007-2022 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/execution_base/this_thread.hpp>
#include <hpx/functional/move_only_function.hpp>
#include <hpx/hardware/timestamp.hpp>
#include <hpx/modules/itt_notify.hpp>
#include <hpx/thread_pools/detail/scheduling_log.hpp>
#include <hpx/threading_base/scheduler_base.hpp>
#include <hpx/threading_base/scheduler_state.hpp>
#include <hpx/threading_base/thread_data.hpp>

#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
#include <hpx/thread_pools/detail/scoped_background_timer.hpp>
#endif

#if defined(HPX_HAVE_APEX)
#include <hpx/threading_base/external_timer.hpp>
#endif

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <utility>

namespace hpx::threads::detail {

    ///////////////////////////////////////////////////////////////////////
    // helper class for switching thread state in and out during execution
    class switch_status
    {
    public:
        switch_status(
            thread_id_ref_type const& t, thread_state prev_state) noexcept
          : thread_(get_thread_id_data(t))
          , prev_state_(prev_state)
          , next_thread_id_(nullptr)
          , need_restore_state_(thread_->set_state_tagged(
                thread_schedule_state::active, prev_state_, orig_state_))
        {
        }

        ~switch_status()
        {
            if (need_restore_state_)
            {
                store_state(prev_state_);
            }
        }

        constexpr bool is_valid() const noexcept
        {
            return need_restore_state_;
        }

        // allow to change the state the thread will be switched to after
        // execution
        thread_state operator=(thread_result_type&& new_state) noexcept
        {
            prev_state_ = thread_state(
                new_state.first, prev_state_.state_ex(), prev_state_.tag() + 1);
            if (new_state.second != nullptr)
            {
                next_thread_id_ = HPX_MOVE(new_state.second);
            }
            return prev_state_;
        }

        // Get the state this thread was in before execution (usually pending),
        // this helps making sure no other worker-thread is started to execute
        // this HPX-thread in the meantime.
        thread_schedule_state get_previous() const noexcept
        {
            return prev_state_.state();
        }

        // This restores the previous state, while making sure that the original
        // state has not been changed since we started executing this thread.
        // The function returns true if the state has been set, false otherwise.
        bool store_state(thread_state& newstate) noexcept
        {
            disable_restore();

            if (thread_->restore_state(prev_state_, orig_state_))
            {
                newstate = prev_state_;
                return true;
            }
            return false;
        }

        // disable default handling in destructor
        void disable_restore() noexcept
        {
            need_restore_state_ = false;
        }

        constexpr thread_id_ref_type const& get_next_thread() const noexcept
        {
            return next_thread_id_;
        }

        thread_id_ref_type move_next_thread() noexcept
        {
            return HPX_MOVE(next_thread_id_);
        }

    private:
        thread_data* thread_;
        thread_state prev_state_;
        thread_state orig_state_;
        thread_id_ref_type next_thread_id_;
        bool need_restore_state_;
    };

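    // Variant of switch_status used for the special background-work HPX
    // thread (see call_background_thread below). It behaves identically,
    // except that the transition to 'active' passes std::memory_order_relaxed
    // to set_state_tagged.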
    class switch_status_background
    {
    public:
        switch_status_background(
            thread_id_ref_type const& t, thread_state prev_state) noexcept
          : thread_(get_thread_id_data(t))
          , prev_state_(prev_state)
          , next_thread_id_(nullptr)
          , need_restore_state_(
                thread_->set_state_tagged(thread_schedule_state::active,
                    prev_state_, orig_state_, std::memory_order_relaxed))
        {
        }

        ~switch_status_background()
        {
            if (need_restore_state_)
            {
                store_state(prev_state_);
            }
        }

        constexpr bool is_valid() const noexcept
        {
            return need_restore_state_;
        }

        // allow to change the state the thread will be switched to after
        // execution
        thread_state operator=(thread_result_type&& new_state) noexcept
        {
            prev_state_ = thread_state(
                new_state.first, prev_state_.state_ex(), prev_state_.tag() + 1);
            if (new_state.second != nullptr)
            {
                next_thread_id_ = HPX_MOVE(new_state.second);
            }
            return prev_state_;
        }

        // Get the state this thread was in before execution (usually pending),
        // this helps making sure no other worker-thread is started to execute
        // this HPX-thread in the meantime.
        thread_schedule_state get_previous() const noexcept
        {
            return prev_state_.state();
        }

        // This restores the previous state, while making sure that the original
        // state has not been changed since we started executing this thread.
        // The function returns true if the state has been set, false otherwise.
        bool store_state(thread_state& newstate) noexcept
        {
            disable_restore();
            if (thread_->restore_state(prev_state_, orig_state_))
            {
                newstate = prev_state_;
                return true;
            }
            return false;
        }

        // disable default handling in destructor
        void disable_restore() noexcept
        {
            need_restore_state_ = false;
        }

        constexpr thread_id_ref_type const& get_next_thread() const noexcept
        {
            return next_thread_id_;
        }

        thread_id_ref_type move_next_thread() noexcept
        {
            return HPX_MOVE(next_thread_id_);
        }

    private:
        thread_data* thread_;
        thread_state prev_state_;
        thread_state orig_state_;
        thread_id_ref_type next_thread_id_;
        bool need_restore_state_;
    };

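    // Idle-rate instrumentation: with HPX_HAVE_THREAD_IDLE_RATES defined,
    // the wrappers below sample hardware timestamps to accumulate the
    // thread-function and execution times; otherwise they are empty
    // constexpr stand-ins that compile away at their call sites.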
#ifdef HPX_HAVE_THREAD_IDLE_RATES
213
    struct idle_collect_rate
214
    {
215
        idle_collect_rate(
216
            std::int64_t& tfunc_time, std::int64_t& exec_time) noexcept
217
          : start_timestamp_(util::hardware::timestamp())
218
          , tfunc_time_(tfunc_time)
219
          , exec_time_(exec_time)
220
        {
221
        }
222

223
        void collect_exec_time(std::int64_t timestamp) noexcept
224
        {
225
            exec_time_ += util::hardware::timestamp() - timestamp;
226
        }
227

228
        void take_snapshot() noexcept
229
        {
230
            if (tfunc_time_ == std::int64_t(-1))
231
            {
232
                start_timestamp_ = util::hardware::timestamp();
233
                tfunc_time_ = 0;
234
                exec_time_ = 0;
235
            }
236
            else
237
            {
238
                tfunc_time_ = util::hardware::timestamp() - start_timestamp_;
239
            }
240
        }
241

242
        std::int64_t start_timestamp_;
243

244
        std::int64_t& tfunc_time_;
245
        std::int64_t& exec_time_;
246
    };
247

248
    struct exec_time_wrapper
249
    {
250
        explicit exec_time_wrapper(idle_collect_rate& idle_rate) noexcept
251
          : timestamp_(util::hardware::timestamp())
252
          , idle_rate_(idle_rate)
253
        {
254
        }
255
        ~exec_time_wrapper()
256
        {
257
            idle_rate_.collect_exec_time(timestamp_);
258
        }
259

260
        std::int64_t timestamp_;
261
        idle_collect_rate& idle_rate_;
262
    };
263

264
    struct tfunc_time_wrapper
265
    {
266
        explicit constexpr tfunc_time_wrapper(
267
            idle_collect_rate& idle_rate) noexcept
268
          : idle_rate_(idle_rate)
269
        {
270
        }
271
        ~tfunc_time_wrapper()
272
        {
273
            idle_rate_.take_snapshot();
274
        }
275

276
        idle_collect_rate& idle_rate_;
277
    };
278
#else
279
    struct idle_collect_rate
280
    {
281
        explicit constexpr idle_collect_rate(
4,091✔
282
            std::int64_t&, std::int64_t&) noexcept
283
        {
284
        }
4,095✔
285
    };
286

287
    struct exec_time_wrapper
288
    {
289
        explicit constexpr exec_time_wrapper(idle_collect_rate&) noexcept {}
9,536,248✔
290
    };
291

292
    struct tfunc_time_wrapper
293
    {
294
        explicit constexpr tfunc_time_wrapper(idle_collect_rate&) noexcept {}
19,072,043✔
295
    };
296
#endif
297

298
    ///////////////////////////////////////////////////////////////////////////
299
    struct is_active_wrapper
300
    {
301
        explicit is_active_wrapper(bool& is_active) noexcept
9,536,261✔
302
          : is_active_(is_active)
9,536,262✔
303
        {
304
            is_active = true;
9,536,262✔
305
        }
9,536,262✔
306
        ~is_active_wrapper()
9,535,282✔
307
        {
308
            is_active_ = false;
9,535,204✔
309
        }
9,535,204✔
310

311
        bool& is_active_;
312
    };
313

314
    ///////////////////////////////////////////////////////////////////////////
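    // Per-worker counters updated by the scheduling loop. When both
    // background-thread counters and thread idle rates are enabled, the
    // struct additionally tracks the duration of background work, sends,
    // and receives.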
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
    struct scheduling_counters
    {
        scheduling_counters(std::int64_t& executed_threads,
            std::int64_t& executed_thread_phases, std::int64_t& tfunc_time,
            std::int64_t& exec_time, std::int64_t& idle_loop_count,
            std::int64_t& busy_loop_count, bool& is_active,
            std::int64_t& background_work_duration,
            std::int64_t& background_send_duration,
            std::int64_t& background_receive_duration) noexcept
          : executed_threads_(executed_threads)
          , executed_thread_phases_(executed_thread_phases)
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
          , idle_loop_count_(idle_loop_count)
          , busy_loop_count_(busy_loop_count)
          , background_work_duration_(background_work_duration)
          , background_send_duration_(background_send_duration)
          , background_receive_duration_(background_receive_duration)
          , is_active_(is_active)
        {
        }

        std::int64_t& executed_threads_;
        std::int64_t& executed_thread_phases_;
        std::int64_t& tfunc_time_;
        std::int64_t& exec_time_;
        std::int64_t& idle_loop_count_;
        std::int64_t& busy_loop_count_;
        std::int64_t& background_work_duration_;
        std::int64_t& background_send_duration_;
        std::int64_t& background_receive_duration_;
        bool& is_active_;
    };
#else
    struct scheduling_counters
    {
        scheduling_counters(std::int64_t& executed_threads,
            std::int64_t& executed_thread_phases, std::int64_t& tfunc_time,
            std::int64_t& exec_time, std::int64_t& idle_loop_count,
            std::int64_t& busy_loop_count, bool& is_active) noexcept
          : executed_threads_(executed_threads)
          , executed_thread_phases_(executed_thread_phases)
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
          , idle_loop_count_(idle_loop_count)
          , busy_loop_count_(busy_loop_count)
          , is_active_(is_active)
        {
        }

        std::int64_t& executed_threads_;
        std::int64_t& executed_thread_phases_;
        std::int64_t& tfunc_time_;
        std::int64_t& exec_time_;
        std::int64_t& idle_loop_count_;
        std::int64_t& busy_loop_count_;
        bool& is_active_;
    };

#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS

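    // Bundles the callbacks and tuning parameters handed to scheduling_loop:
    // 'outer_' is invoked after prolonged idling, 'inner_' on idle
    // iterations, and 'background_' performs background work (returning true
    // if any work was performed); the remaining members bound the number of
    // background threads and the idle-/busy-loop spin counts.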
    struct scheduling_callbacks
    {
        using callback_type = hpx::move_only_function<void()>;
        using background_callback_type = hpx::move_only_function<bool()>;

        explicit scheduling_callbacks(callback_type&& outer,
            callback_type&& inner = callback_type(),
            background_callback_type&& background = background_callback_type(),
            std::size_t max_background_threads =
                (std::numeric_limits<std::size_t>::max)(),
            std::size_t max_idle_loop_count = HPX_IDLE_LOOP_COUNT_MAX,
            std::size_t max_busy_loop_count = HPX_BUSY_LOOP_COUNT_MAX)
          : outer_(HPX_MOVE(outer))
          , inner_(HPX_MOVE(inner))
          , background_(HPX_MOVE(background))
          , max_background_threads_(max_background_threads)
          , max_idle_loop_count_(max_idle_loop_count)
          , max_busy_loop_count_(max_busy_loop_count)
        {
        }

        callback_type outer_;
        callback_type inner_;
        background_callback_type background_;
        std::size_t const max_background_threads_;
        std::int64_t const max_idle_loop_count_;
        std::int64_t const max_busy_loop_count_;
    };

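    // Creates the HPX thread that repeatedly runs the background work
    // callback (yielding after every invocation) until *background_running
    // becomes false. The thread is created in suspended state so that it
    // cannot be scheduled prematurely and is flipped to pending only once
    // everything is set up.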
    template <typename SchedulingPolicy>
    thread_id_ref_type create_background_thread(SchedulingPolicy& scheduler,
        scheduling_callbacks& callbacks,
        std::shared_ptr<bool>& background_running, std::size_t num_thread,
        std::int64_t& idle_loop_count)
    {
        threads::thread_schedule_hint schedulehint(
            static_cast<std::int16_t>(num_thread));

        thread_id_ref_type background_thread;
        background_running.reset(new bool(true));
        thread_init_data background_init(
            [&, background_running](
                thread_restart_state) -> thread_result_type {
                while (*background_running)
                {
                    if (callbacks.background_())
                    {
                        // we only update the idle_loop_count if
                        // background_running is true. If it was false, this
                        // task was given back to the scheduler.
                        if (*background_running)
                            idle_loop_count = 0;
                    }
                    // Force yield...
                    hpx::execution_base::this_thread::yield("background_work");
                }

                return thread_result_type(
                    thread_schedule_state::terminated, invalid_thread_id);
            },
            hpx::threads::thread_description("background_work"),
            thread_priority::high_recursive, schedulehint,
            thread_stacksize::large,
            // Create in suspended to prevent the thread from being scheduled
            // directly...
            thread_schedule_state::suspended, true, &scheduler);

        scheduler.SchedulingPolicy::create_thread(
            background_init, &background_thread, hpx::throws);
        HPX_ASSERT(background_thread);
        scheduler.SchedulingPolicy::increment_background_thread_count();
        // We can now set the state to pending
        get_thread_id_data(background_thread)
            ->set_state(thread_schedule_state::pending);
        return background_thread;
    }

    // This function tries to invoke the background work thread. It returns
    // false when we need to give the background thread back to scheduler and
    // create a new one that is supposed to be executed inside the
    // scheduling_loop, true otherwise
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
    template <typename SchedulingPolicy>
    bool call_background_thread(thread_id_ref_type& background_thread,
        thread_id_ref_type& next_thrd, SchedulingPolicy& scheduler,
        std::size_t num_thread, bool /* running */,
        std::int64_t& background_work_exec_time_init,
        hpx::execution_base::this_thread::detail::agent_storage*
            context_storage)
#else
    template <typename SchedulingPolicy>
    bool call_background_thread(thread_id_ref_type& background_thread,
        thread_id_ref_type& next_thrd, SchedulingPolicy& scheduler,
        std::size_t num_thread, bool /* running */,
        hpx::execution_base::this_thread::detail::agent_storage*
            context_storage)
#endif
    {
        if (HPX_UNLIKELY(background_thread))
        {
            thread_state state =
                get_thread_id_data(background_thread)->get_state();
            thread_schedule_state state_val = state.state();

            // we should only deal with pending here.
            HPX_ASSERT(thread_schedule_state::pending == state_val);

            // tries to set state to active (only if state is still
            // the same as 'state')
            detail::switch_status_background thrd_stat(
                background_thread, state);

            if (HPX_LIKELY(thrd_stat.is_valid() &&
                    thrd_stat.get_previous() == thread_schedule_state::pending))
            {
#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
                // measure background work duration
                background_work_duration_counter bg_work_duration(
                    background_work_exec_time_init);
                background_exec_time_wrapper bg_exec_time(bg_work_duration);
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS

                // invoke background thread
                thrd_stat =
                    (*get_thread_id_data(background_thread))(context_storage);

                thread_id_ref_type next = thrd_stat.move_next_thread();
                if (next != nullptr && next != background_thread)
                {
                    if (next_thrd == nullptr)
                    {
                        next_thrd = HPX_MOVE(next);
                    }
                    else
                    {
                        auto* scheduler =
                            get_thread_id_data(next)->get_scheduler_base();
                        scheduler->schedule_thread(HPX_MOVE(next),
                            threads::thread_schedule_hint(
                                static_cast<std::int16_t>(num_thread)),
                            true);
                        scheduler->do_some_work(num_thread);
                    }
                }
            }
            thrd_stat.store_state(state);
            state_val = state.state();

            if (HPX_LIKELY(state_val == thread_schedule_state::pending_boost))
            {
                get_thread_id_data(background_thread)
                    ->set_state(thread_schedule_state::pending);
            }
            else if (thread_schedule_state::terminated == state_val)
            {
                scheduler.SchedulingPolicy::decrement_background_thread_count();
                background_thread = thread_id_type();
            }
            else if (thread_schedule_state::suspended == state_val)
            {
                return false;
            }
        }
        return true;
    }

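    // The scheduling loop proper, executed by every worker thread: pull the
    // next HPX thread from the scheduler (stealing if enabled), run it via
    // the switch_status protocol above, reschedule it according to the state
    // it returned, interleave background work, and spin, suspend, or exit
    // once the queues drain and the pool state allows it.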
    template <typename SchedulingPolicy>
    void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
        scheduling_counters& counters, scheduling_callbacks& params)
    {
        std::atomic<hpx::state>& this_state = scheduler.get_state(num_thread);

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
        util::itt::stack_context ctx;    // helper for itt support
        util::itt::thread_domain thread_domain;
        util::itt::id threadid(thread_domain, &scheduler);
        util::itt::string_handle task_id("task_id");
        util::itt::string_handle task_phase("task_phase");
        // util::itt::frame_context fctx(thread_domain);
#endif

        std::int64_t& idle_loop_count = counters.idle_loop_count_;
        std::int64_t& busy_loop_count = counters.busy_loop_count_;

#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
        std::int64_t& bg_work_exec_time_init =
            counters.background_work_duration_;
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS

        idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
        [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(idle_rate);

        // spin for some time after queues have become empty
        bool may_exit = false;

        std::shared_ptr<bool> background_running = nullptr;
        thread_id_ref_type background_thread;

        if (scheduler.SchedulingPolicy::has_scheduler_mode(
                policies::scheduler_mode::do_background_work) &&
            num_thread < params.max_background_threads_ &&
            !params.background_.empty())
        {
            background_thread = create_background_thread(scheduler, params,
                background_running, num_thread, idle_loop_count);
        }

        hpx::execution_base::this_thread::detail::agent_storage*
            context_storage =
                hpx::execution_base::this_thread::detail::get_agent_storage();

        std::size_t added = std::size_t(-1);
        thread_id_ref_type next_thrd;
        while (true)
        {
            thread_id_ref_type thrd = HPX_MOVE(next_thrd);

            // Get the next HPX thread from the queue
            bool running = this_state.load(std::memory_order_relaxed) <
                hpx::state::pre_sleep;

            // extract the stealing mode once per loop iteration
            bool enable_stealing =
                scheduler.SchedulingPolicy::has_scheduler_mode(
                    policies::scheduler_mode::enable_stealing);

            // stealing staged threads is enabled if:
            // - fast idle mode is on: same as normal stealing
            // - fast idle mode off: only after normal stealing has failed for
            //                       a while
            bool enable_stealing_staged = enable_stealing;
            if (enable_stealing_staged &&
                !scheduler.SchedulingPolicy::has_scheduler_mode(
                    policies::scheduler_mode::fast_idle_mode))
            {
                enable_stealing_staged =
                    idle_loop_count > params.max_idle_loop_count_ / 2;
            }

            if (HPX_LIKELY(thrd ||
                    scheduler.SchedulingPolicy::get_next_thread(
                        num_thread, running, thrd, enable_stealing)))
            {
                [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(
                    idle_rate);
                HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() ==
                    &scheduler);

                idle_loop_count = 0;
                ++busy_loop_count;

                may_exit = false;

                // Only pending HPX threads will be executed. Any non-pending
                // HPX threads are leftovers from a set_state() call for a
                // previously pending HPX thread (see comments above).
                thread_state state = get_thread_id_data(thrd)->get_state();
                thread_schedule_state state_val = state.state();

                if (HPX_LIKELY(thread_schedule_state::pending == state_val))
                {
                    // switch the state of the thread to active and back to what
                    // the thread reports as its return value

                    {
                        // tries to set state to active (only if state is still
                        // the same as 'state')
                        detail::switch_status thrd_stat(thrd, state);
                        if (HPX_LIKELY(thrd_stat.is_valid() &&
                                thrd_stat.get_previous() ==
                                    thread_schedule_state::pending))
                        {
                            detail::write_state_log(scheduler, num_thread, thrd,
                                thrd_stat.get_previous(),
                                thread_schedule_state::active);

                            [[maybe_unused]] tfunc_time_wrapper
                                tfunc_time_collector(idle_rate);

                            // thread returns new required state store the
                            // returned state in the thread
                            {
                                is_active_wrapper utilization(
                                    counters.is_active_);
                                auto* thrdptr = get_thread_id_data(thrd);
#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
                                util::itt::caller_context cctx(ctx);
                                // util::itt::undo_frame_context undoframe(fctx);
                                util::itt::task task =
                                    thrdptr->get_description().get_task_itt(
                                        thread_domain);
                                task.add_metadata(task_id, thrdptr);
                                task.add_metadata(
                                    task_phase, thrdptr->get_thread_phase());
#endif
                                // Record time elapsed in thread changing state
                                // and add to aggregate execution time.
                                [[maybe_unused]] exec_time_wrapper
                                    exec_time_collector(idle_rate);

#if defined(HPX_HAVE_APEX)
                                // get the APEX data pointer, in case we are
                                // resuming the thread and have to restore any
                                // leaf timers from direct actions, etc.

                                // the address of tmp_data is getting stored by
                                // APEX during this call
                                util::external_timer::scoped_timer profiler(
                                    thrdptr->get_timer_data());

                                thrd_stat = (*thrdptr)(context_storage);

                                if (thrd_stat.get_previous() ==
                                    thread_schedule_state::terminated)
                                {
                                    profiler.stop();
                                    // just in case, clean up the now dead pointer.
                                    thrdptr->set_timer_data(nullptr);
                                }
                                else
                                {
                                    profiler.yield();
                                }
#else
                                thrd_stat = (*thrdptr)(context_storage);
#endif
                            }

                            detail::write_state_log(scheduler, num_thread, thrd,
                                thread_schedule_state::active,
                                thrd_stat.get_previous());

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                            ++counters.executed_thread_phases_;
#endif
                        }
                        else
                        {
                            // some other worker-thread got in between and
                            // started executing this HPX-thread, we just
                            // continue with the next one
                            thrd_stat.disable_restore();
                            detail::write_state_log_warning(scheduler,
                                num_thread, thrd, state_val, "no execution");
                            continue;
                        }

                        // store and retrieve the new state in the thread
                        if (HPX_UNLIKELY(!thrd_stat.store_state(state)))
                        {
                            // some other worker-thread got in between and
                            // changed the state of this thread, we just
                            // continue with the next one
                            detail::write_state_log_warning(scheduler,
                                num_thread, thrd, state_val, "no state change");
                            continue;
                        }

                        state_val = state.state();

                        // any exception thrown from the thread will reset its
                        // state at this point

                        // handle next thread id if given (switch directly to
                        // this thread)
                        next_thrd = thrd_stat.move_next_thread();
                    }

                    // Re-add this work item to our list of work items if the
                    // HPX thread should be re-scheduled. If the HPX thread is
                    // suspended now we just keep it in the map of threads.
                    if (HPX_UNLIKELY(
                            state_val == thread_schedule_state::pending))
                    {
                        if (HPX_LIKELY(next_thrd == nullptr))
                        {
                            // schedule other work
                            scheduler.SchedulingPolicy::wait_or_add_new(
                                num_thread, running, idle_loop_count,
                                enable_stealing_staged, added);
                        }

                        // schedule this thread again, make sure it ends up at
                        // the end of the queue
                        scheduler.SchedulingPolicy::schedule_thread_last(
                            HPX_MOVE(thrd),
                            threads::thread_schedule_hint(
                                static_cast<std::int16_t>(num_thread)),
                            true);
                        scheduler.SchedulingPolicy::do_some_work(num_thread);
                    }
                    else if (HPX_UNLIKELY(state_val ==
                                 thread_schedule_state::pending_boost))
                    {
                        get_thread_id_data(thrd)->set_state(
                            thread_schedule_state::pending);

                        if (HPX_LIKELY(next_thrd == nullptr))
                        {
                            // reschedule this thread right away if the
                            // background work will be triggered
                            if (HPX_UNLIKELY(busy_loop_count >
                                    params.max_busy_loop_count_))
                            {
                                next_thrd = HPX_MOVE(thrd);
                            }
                            else
                            {
                                // schedule other work
                                scheduler.SchedulingPolicy::wait_or_add_new(
                                    num_thread, running, idle_loop_count,
                                    enable_stealing_staged, added);

                                // schedule this thread again immediately with
                                // boosted priority
                                scheduler.SchedulingPolicy::schedule_thread(
                                    HPX_MOVE(thrd),
                                    threads::thread_schedule_hint(
                                        static_cast<std::int16_t>(num_thread)),
                                    true, thread_priority::boost);
                                scheduler.SchedulingPolicy::do_some_work(
                                    num_thread);
                            }
                        }
                        else if (HPX_LIKELY(next_thrd != thrd))
                        {
                            // schedule this thread again immediately with
                            // boosted priority
                            scheduler.SchedulingPolicy::schedule_thread(
                                HPX_MOVE(thrd),
                                threads::thread_schedule_hint(
                                    static_cast<std::int16_t>(num_thread)),
                                true, thread_priority::boost);
                            scheduler.SchedulingPolicy::do_some_work(
                                num_thread);
                        }
                    }
                }
                else if (HPX_UNLIKELY(
                             thread_schedule_state::active == state_val))
                {
                    write_rescheduling_log_warning(scheduler, num_thread, thrd);

                    // re-schedule thread, if it is still marked as active this
                    // might happen, if some thread has been added to the
                    // scheduler queue already but the state has not been reset
                    // yet
                    auto* thrdptr = get_thread_id_data(thrd);
                    auto priority = thrdptr->get_priority();
                    scheduler.SchedulingPolicy::schedule_thread(HPX_MOVE(thrd),
                        threads::thread_schedule_hint(
                            static_cast<std::int16_t>(num_thread)),
                        true, priority);
                    scheduler.SchedulingPolicy::do_some_work(num_thread);
                }

                // Remove the mapping from thread_map_ if HPX thread is depleted
                // or terminated, this will delete the HPX thread. REVIEW: what
                // has to be done with depleted HPX threads?
                if (HPX_LIKELY(state_val == thread_schedule_state::depleted ||
                        state_val == thread_schedule_state::terminated))
                {
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                    ++counters.executed_threads_;
#endif
                    thrd = thread_id_type();
                }
            }

            // if nothing else has to be done either wait or terminate
            else
            {
                ++idle_loop_count;

                if (scheduler.SchedulingPolicy::wait_or_add_new(num_thread,
                        running, idle_loop_count, enable_stealing_staged,
                        added))
                {
                    // Clean up terminated threads before trying to exit
                    bool can_exit = !running &&
                        scheduler.SchedulingPolicy::cleanup_terminated(
                            num_thread, true) &&
                        scheduler.SchedulingPolicy::get_queue_length(
                            num_thread) == 0;

                    if (this_state.load(std::memory_order_relaxed) ==
                        hpx::state::pre_sleep)
                    {
                        if (can_exit)
                        {
                            scheduler.SchedulingPolicy::suspend(num_thread);
                        }
                    }
                    else
                    {
                        can_exit = can_exit &&
                            scheduler.SchedulingPolicy::get_thread_count(
                                thread_schedule_state::suspended,
                                thread_priority::default_, num_thread) == 0;

                        if (can_exit)
                        {
                            if (!scheduler.SchedulingPolicy::has_scheduler_mode(
                                    policies::scheduler_mode::delay_exit))
                            {
                                // If this is an inner scheduler, try to exit
                                // immediately
                                if (background_thread != nullptr)
                                {
                                    HPX_ASSERT(background_running);
                                    *background_running = false;    //-V522
                                    auto priority =
                                        get_thread_id_data(background_thread)
                                            ->get_priority();

                                    scheduler.SchedulingPolicy::
                                        decrement_background_thread_count();
                                    scheduler.SchedulingPolicy::schedule_thread(
                                        HPX_MOVE(background_thread),
                                        threads::thread_schedule_hint(
                                            static_cast<std::int16_t>(
                                                num_thread)),
                                        true, priority);
                                    scheduler.SchedulingPolicy::do_some_work(
                                        num_thread);

                                    background_thread = thread_id_type();
                                    background_running.reset();
                                }
                                else
                                {
                                    this_state.store(hpx::state::stopped);
                                    break;
                                }
                            }
                            else
                            {
                                // Otherwise, keep idling for some time
                                if (!may_exit)
                                    idle_loop_count = 0;
                                may_exit = true;
                            }
                        }
                    }
                }
                else if (!may_exit && added == 0 &&
                    (scheduler.SchedulingPolicy::has_scheduler_mode(
                        policies::scheduler_mode::fast_idle_mode)))
                {
                    // speed up idle suspend if no work was stolen
                    idle_loop_count += params.max_idle_loop_count_ / 1024;
                    added = std::size_t(-1);
                }

#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
                // do background work in parcel layer and in agas
                if (!call_background_thread(background_thread, next_thrd,
                        scheduler, num_thread, running, bg_work_exec_time_init,
                        context_storage))
#else
                if (!call_background_thread(background_thread, next_thrd,
                        scheduler, num_thread, running, context_storage))
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
                {
                    // Let the current background thread terminate as soon as
                    // possible. No need to reschedule, as another LCO will set
                    // it to pending and schedule it back eventually
                    HPX_ASSERT(background_thread);
                    HPX_ASSERT(background_running);
                    *background_running = false;
                    scheduler
                        .SchedulingPolicy::decrement_background_thread_count();

                    // Create a new one that will replace the current such we
                    // avoid deadlock situations, if all background threads are
                    // blocked.
                    background_thread =
                        create_background_thread(scheduler, params,
                            background_running, num_thread, idle_loop_count);
                }
                // call back into invoking context
                if (!params.inner_.empty())
                {
                    params.inner_();
                    context_storage = hpx::execution_base::this_thread::detail::
                        get_agent_storage();
                }
            }

            if (scheduler.custom_polling_function() ==
                policies::detail::polling_status::busy)
            {
                idle_loop_count = 0;
            }

            // something went badly wrong, give up
            if (HPX_UNLIKELY(this_state.load(std::memory_order_relaxed) ==
                    hpx::state::terminating))
            {
                break;
            }

            if (busy_loop_count > params.max_busy_loop_count_)
            {
                busy_loop_count = 0;

#if defined(HPX_HAVE_BACKGROUND_THREAD_COUNTERS) &&                            \
    defined(HPX_HAVE_THREAD_IDLE_RATES)
                // do background work in parcel layer and in agas
                if (!call_background_thread(background_thread, next_thrd,
                        scheduler, num_thread, running, bg_work_exec_time_init,
                        context_storage))
#else
                // do background work in parcel layer and in agas
                if (!call_background_thread(background_thread, next_thrd,
                        scheduler, num_thread, running, context_storage))
#endif    // HPX_HAVE_BACKGROUND_THREAD_COUNTERS
                {
                    // Let the current background thread terminate as soon as
                    // possible. No need to reschedule, as another LCO will set
                    // it to pending and schedule it back eventually
                    HPX_ASSERT(background_thread);
                    HPX_ASSERT(background_running);
                    *background_running = false;
                    scheduler
                        .SchedulingPolicy::decrement_background_thread_count();
                    // Create a new one which will replace the current such we
                    // avoid deadlock situations, if all background threads are
                    // blocked.
                    background_thread =
                        create_background_thread(scheduler, params,
                            background_running, num_thread, idle_loop_count);
                }
            }
            else if (idle_loop_count > params.max_idle_loop_count_ || may_exit)
            {
                if (idle_loop_count > params.max_idle_loop_count_)
                    idle_loop_count = 0;

                // call back into invoking context
                if (!params.outer_.empty())
                {
                    params.outer_();
                    context_storage = hpx::execution_base::this_thread::detail::
                        get_agent_storage();
                }

                // break if we were idling after 'may_exit'
                if (may_exit)
                {
                    HPX_ASSERT(this_state.load(std::memory_order_relaxed) !=
                        hpx::state::pre_sleep);

                    if (background_thread)
                    {
                        HPX_ASSERT(background_running);
                        *background_running = false;
                        auto priority = get_thread_id_data(background_thread)
                                            ->get_priority();

                        scheduler.SchedulingPolicy::
                            decrement_background_thread_count();
                        scheduler.SchedulingPolicy::schedule_thread(
                            HPX_MOVE(background_thread),
                            threads::thread_schedule_hint(
                                static_cast<std::int16_t>(num_thread)),
                            true, priority);
                        scheduler.SchedulingPolicy::do_some_work(num_thread);

                        background_thread = thread_id_type();
                        background_running.reset();
                    }
                    else
                    {
                        bool can_exit = !running &&
                            scheduler.SchedulingPolicy::cleanup_terminated(
                                true) &&
                            scheduler.SchedulingPolicy::get_thread_count(
                                thread_schedule_state::suspended,
                                thread_priority::default_, num_thread) == 0 &&
                            scheduler.SchedulingPolicy::get_queue_length(
                                num_thread) == 0;

                        if (can_exit)
                        {
                            this_state.store(hpx::state::stopped);
                            break;
                        }
                    }

                    may_exit = false;
                }
                else
                {
                    scheduler.SchedulingPolicy::cleanup_terminated(true);
                }
            }
        }
    }
}    // namespace hpx::threads::detail

// NOTE: This line only exists to please doxygen. Without the line doxygen
// generates incomplete xml output.
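
The central idiom in this file is the switch_status RAII guard: atomically tag a thread as active, run it, and restore the previous state on scope exit unless the scope explicitly committed a new one. The following self-contained sketch reproduces that idiom with a plain std::atomic; it is an illustrative analogue only, not HPX's implementation (which additionally carries state tags and a next-thread hint):

#include <atomic>
#include <cstdio>

enum class schedule_state { pending, active, suspended, terminated };

// RAII analogue of switch_status: flip a state from 'pending' to 'active'
// on construction; on destruction, roll back to the previous state unless
// the scope committed a new one via store_state().
class switch_status_analogue
{
public:
    explicit switch_status_analogue(std::atomic<schedule_state>& state)
      : state_(state)
      , prev_(schedule_state::pending)
      , need_restore_(
            state.compare_exchange_strong(prev_, schedule_state::active))
    {
    }

    ~switch_status_analogue()
    {
        if (need_restore_)    // nothing was committed: roll back
            state_.store(prev_);
    }

    // true if we won the race and now own the thread
    bool is_valid() const noexcept
    {
        return need_restore_;
    }

    // commit the state the thread should have after execution
    void store_state(schedule_state next)
    {
        need_restore_ = false;
        state_.store(next);
    }

private:
    std::atomic<schedule_state>& state_;
    schedule_state prev_;
    bool need_restore_;
};

int main()
{
    std::atomic<schedule_state> state{schedule_state::pending};
    {
        switch_status_analogue guard(state);
        if (guard.is_valid())    // we won the race: "run" the thread
            guard.store_state(schedule_state::terminated);
    }    // the destructor would roll back had nothing been committed
    std::printf("terminated: %s\n",
        state.load() == schedule_state::terminated ? "yes" : "no");
    return 0;
}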