/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
//  Copyright (c) 2007-2025 Hartmut Kaiser
//
//  SPDX-License-Identifier: BSL-1.0
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/hardware/timestamp.hpp>
#include <hpx/modules/execution_base.hpp>
#include <hpx/modules/functional.hpp>
#include <hpx/modules/threading_base.hpp>
#include <hpx/thread_pools/detail/background_thread.hpp>
#include <hpx/thread_pools/detail/scheduling_callbacks.hpp>
#include <hpx/thread_pools/detail/scheduling_counters.hpp>
#include <hpx/thread_pools/detail/scheduling_log.hpp>

#if defined(HPX_HAVE_ITTNOTIFY) && HPX_HAVE_ITTNOTIFY != 0 &&                  \
    !defined(HPX_HAVE_APEX)
#include <hpx/modules/itt_notify.hpp>
#endif

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>

namespace hpx::threads::detail {

    ///////////////////////////////////////////////////////////////////////
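    // idle_collect_rate measures, via the hardware timestamp counter, the
    // overall time spent in the scheduling loop (tfunc_time_) and the part
    // of it spent executing HPX threads (exec_time_); these feed the
    // idle-rate statistics enabled by HPX_HAVE_THREAD_IDLE_RATES. Without
    // that option the struct degenerates to an empty placeholder so the
    // scheduling loop can construct it unconditionally.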
#ifdef HPX_HAVE_THREAD_IDLE_RATES
    struct idle_collect_rate
    {
        idle_collect_rate(
            std::int64_t& tfunc_time, std::int64_t& exec_time) noexcept
          : start_timestamp_(
                static_cast<std::int64_t>(util::hardware::timestamp()))
          , tfunc_time_(tfunc_time)
          , exec_time_(exec_time)
        {
        }

        void collect_exec_time(std::uint64_t timestamp) const noexcept
        {
            exec_time_ += static_cast<std::int64_t>(
                util::hardware::timestamp() - timestamp);
        }

        void take_snapshot() noexcept
        {
            if (tfunc_time_ == static_cast<std::int64_t>(-1))
            {
                start_timestamp_ =
                    static_cast<std::int64_t>(util::hardware::timestamp());
                tfunc_time_ = 0;
                exec_time_ = 0;
            }
            else
            {
                tfunc_time_ =
                    static_cast<std::int64_t>(util::hardware::timestamp()) -
                    start_timestamp_;
            }
        }

        std::int64_t start_timestamp_;
        std::int64_t& tfunc_time_;
        std::int64_t& exec_time_;
    };
#else
    struct idle_collect_rate
    {
        explicit constexpr idle_collect_rate(
            std::int64_t&, std::int64_t&) noexcept
        {
        }
    };
#endif

    ///////////////////////////////////////////////////////////////////////////
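    // scheduling_loop is the main driver run by each worker (OS) thread of a
    // thread pool: it repeatedly pulls the next pending HPX thread from the
    // scheduling policy, executes it, and then re-schedules, suspends, or
    // destroys it based on the state the thread returns. It also drives the
    // optional background thread (parcel layer and AGAS), invokes the
    // registered callbacks (params.inner_ / params.outer_), and decides when
    // the worker may exit once the queues have drained.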
    template <typename SchedulingPolicy>
    void scheduling_loop(std::size_t num_thread, SchedulingPolicy& scheduler,
        scheduling_counters& counters, scheduling_callbacks& params)
    {
        std::atomic<hpx::state>& this_state = scheduler.get_state(num_thread);

#if defined(HPX_HAVE_ITTNOTIFY) && HPX_HAVE_ITTNOTIFY != 0 &&                  \
    !defined(HPX_HAVE_APEX)
        util::itt::stack_context ctx;    // helper for itt support
        util::itt::thread_domain const thread_domain;
        util::itt::id threadid(thread_domain, &scheduler);
        util::itt::string_handle const task_id("task_id");
        util::itt::string_handle const task_phase("task_phase");
        // util::itt::frame_context fctx(thread_domain);
#endif

        std::int64_t& idle_loop_count = counters.idle_loop_count_;
        std::int64_t& busy_loop_count = counters.busy_loop_count_;

        background_work_exec_time bg_work_exec_time_init(counters);

#ifdef HPX_HAVE_THREAD_IDLE_RATES
        idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_);
        auto tfunc_time_collector = hpx::experimental::scope_exit(
            [&idle_rate] { idle_rate.take_snapshot(); });
#endif

        // spin for some time after queues have become empty
        bool may_exit = false;
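        // 'may_exit' is set once the exit conditions hold while the scheduler
        // runs in delay_exit mode; the loop then keeps idling and only stops
        // (in the 'else if (may_exit)' branch near the end of the loop) if no
        // new work shows up in the meantime.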

        std::shared_ptr<bool> background_running;
        thread_id_ref_type background_thread;
        bool const do_background_work =
            scheduler.has_scheduler_mode(
                policies::scheduler_mode::do_background_work, num_thread) &&
            num_thread < params.max_background_threads_ &&
            !params.background_.empty();

        if (do_background_work)
        {
            // do background work in parcel layer and in agas
            background_thread = create_background_thread(scheduler, num_thread,
                params, background_running, idle_loop_count);
        }

        hpx::execution_base::this_thread::detail::agent_storage*
            context_storage =
                hpx::execution_base::this_thread::detail::get_agent_storage();

        auto added = static_cast<std::size_t>(-1);
        thread_id_ref_type next_thrd;
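        // 'next_thrd' implements a direct hand-over: a finishing thread may
        // name its successor (see thrd_stat.move_next_thread() below), in
        // which case the loop runs that thread next without a round trip
        // through the scheduler queues.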
        while (true)
        {
            thread_id_ref_type thrd;
            if (HPX_UNLIKELY(next_thrd))
            {
                thrd = HPX_MOVE(next_thrd);
                next_thrd = thread_id_ref_type();
            }

            // Get the next HPX thread from the queue
            bool running = this_state.load(std::memory_order_relaxed) <
                hpx::state::pre_sleep;

            // extract the stealing mode once per loop iteration (except during
            // shutdown)
            bool enable_stealing = !may_exit &&
                scheduler.has_scheduler_mode(
                    policies::scheduler_mode::enable_stealing, num_thread);

            // stealing staged threads is enabled if:
            // - fast idle mode is on: same as normal stealing
            // - fast idle mode off: only after normal stealing has failed for
            //                       a while
            bool enable_stealing_staged = enable_stealing;
            if (enable_stealing_staged &&
                !scheduler.has_scheduler_mode(
                    policies::scheduler_mode::fast_idle_mode, num_thread))
            {
                enable_stealing_staged = !may_exit &&
                    idle_loop_count > params.max_idle_loop_count_ / 2;
            }
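
            // run either the thread handed over directly or the next thread
            // the scheduler returns (possibly stolen from another worker when
            // stealing is enabled)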
            if (HPX_LIKELY(thrd ||
                    scheduler.get_next_thread(
                        num_thread, running, thrd, enable_stealing)))
            {
                HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() ==
                    &scheduler);

                idle_loop_count = 0;
                ++busy_loop_count;

                may_exit = false;

                // Only pending HPX threads will be executed. Any non-pending
                // HPX threads are leftovers from a set_state() call for a
                // previously pending HPX thread (see comments above).
                auto* thrdptr = get_thread_id_data(thrd);
                thread_state state = thrdptr->get_state();
                thread_schedule_state state_val = state.state();

                if (HPX_LIKELY(thread_schedule_state::pending == state_val))
                {
                    // switch the state of the thread to active and back to what
                    // the thread reports as its return value

                    {
                        // tries to set state to active (only if state is still
                        // the same as 'state')
                        detail::switch_status thrd_stat(thrd, state);
                        if (HPX_LIKELY(thrd_stat.is_valid() &&
                                thrd_stat.get_previous() ==
                                    thread_schedule_state::pending))
                        {
                            detail::write_state_log(scheduler, num_thread, thrd,
                                thrd_stat.get_previous(),
                                thread_schedule_state::active);

#ifdef HPX_HAVE_THREAD_IDLE_RATES
                            auto tfunc_time_collector_inner =
                                hpx::experimental::scope_exit([&idle_rate] {
                                    idle_rate.take_snapshot();
                                });
#endif
                            // the thread returns its new required state; store
                            // the returned state in the thread
                            {
                                counters.is_active_ = true;
                                auto utilization =
                                    hpx::experimental::scope_exit(
                                        [&is_active = counters.is_active_] {
                                            is_active = false;
                                        });

#if defined(HPX_HAVE_ITTNOTIFY) && HPX_HAVE_ITTNOTIFY != 0 &&                  \
    !defined(HPX_HAVE_APEX)
                                util::itt::caller_context cctx(
                                    ctx, !thrdptr->is_stackless());
                                // util::itt::undo_frame_context undoframe(fctx);
                                util::itt::task task =
                                    thrdptr->get_description().get_task_itt(
                                        thread_domain);
                                task.add_metadata(task_id, thrdptr);
                                task.add_metadata(
                                    task_phase, thrdptr->get_thread_phase());
#endif
#ifdef HPX_HAVE_THREAD_IDLE_RATES
                                // Record time elapsed in thread changing state
                                // and add to aggregate execution time.
                                auto exec_time_collector =
                                    hpx::experimental::scope_exit(
                                        [&idle_rate,
                                            ts = util::hardware::timestamp()] {
                                            idle_rate.collect_exec_time(ts);
                                        });
#endif
#if defined(HPX_HAVE_APEX)
                                // get the APEX data pointer, in case we are
                                // resuming the thread and have to restore any
                                // leaf timers from direct actions, etc.

                                // the address of the timer data is stored by
                                // APEX during this call
                                util::external_timer::scoped_timer profiler(
                                    thrdptr->get_timer_data());

                                thrd_stat = (*thrdptr)(context_storage);

                                thread_schedule_state s =
                                    thrd_stat.get_previous();
                                if (s == thread_schedule_state::terminated ||
                                    s == thread_schedule_state::deleted)
                                {
                                    profiler.stop();
                                    // just in case, clean up the now dead pointer.
                                    thrdptr->set_timer_data(nullptr);
                                }
                                else
                                {
                                    profiler.yield();
                                }
#else
                                thrd_stat = (*thrdptr)(context_storage);
#endif
                            }

                            detail::write_state_log(scheduler, num_thread, thrd,
                                thread_schedule_state::active,
                                thrd_stat.get_previous());

#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                            ++counters.executed_thread_phases_;
#endif
                        }
                        else
                        {
                            // some other worker-thread got in between and
                            // started executing this HPX-thread; we just
                            // continue with the next one
                            thrd_stat.disable_restore();
                            detail::write_state_log_warning(scheduler,
                                num_thread, thrd, state_val, "no execution");
                            continue;
                        }

                        // store and retrieve the new state in the thread
                        if (HPX_UNLIKELY(!thrd_stat.store_state(state)))
                        {
                            // some other worker-thread got in between and
                            // changed the state of this thread; we just
                            // continue with the next one
                            detail::write_state_log_warning(scheduler,
                                num_thread, thrd, state_val, "no state change");
                            continue;
                        }

                        state_val = state.state();

                        // any exception thrown from the thread will reset its
                        // state at this point

                        // handle next thread id if given (switch directly to
                        // this thread)
                        next_thrd = thrd_stat.move_next_thread();
                    }

                    // Re-add this work item to our list of work items if the
                    // HPX thread should be re-scheduled. If the HPX thread is
                    // suspended now we just keep it in the map of threads.
                    if (HPX_UNLIKELY(
                            state_val == thread_schedule_state::pending))
                    {
                        if (HPX_LIKELY(next_thrd == nullptr))
                        {
                            // schedule other work
                            scheduler.wait_or_add_new(num_thread, running,
                                idle_loop_count, enable_stealing_staged, added);
                        }

                        // schedule this thread again, make sure it ends up at
                        // the end of the queue
                        auto priority = thrdptr->get_priority();
                        scheduler.SchedulingPolicy::schedule_thread_last(
                            HPX_MOVE(thrd),
                            threads::thread_schedule_hint(
                                static_cast<std::int16_t>(num_thread)),
                            priority != threads::thread_priority::bound,
                            priority);

                        scheduler.SchedulingPolicy::do_some_work(num_thread);
                    }
                    else if (HPX_UNLIKELY(state_val ==
                                 thread_schedule_state::pending_boost))
                    {
                        [[maybe_unused]] auto oldstate =
                            thrdptr->set_state(thread_schedule_state::pending);

                        if (HPX_LIKELY(!next_thrd))
                        {
                            // reschedule this thread right away if the
                            // background work will be triggered
                            if (HPX_UNLIKELY(busy_loop_count >
                                    params.max_busy_loop_count_))
                            {
                                next_thrd = HPX_MOVE(thrd);
                            }
                            else
                            {
                                // schedule other work
                                scheduler.wait_or_add_new(num_thread, running,
                                    idle_loop_count, enable_stealing_staged,
                                    added);

                                // schedule this thread again immediately with
                                // boosted priority
                                scheduler.SchedulingPolicy::schedule_thread(
                                    HPX_MOVE(thrd),
                                    threads::thread_schedule_hint(
                                        static_cast<std::int16_t>(num_thread)),
                                    true, thread_priority::boost);

                                scheduler.SchedulingPolicy::do_some_work(
                                    num_thread);
                            }
                        }
                        else if (HPX_LIKELY(next_thrd != thrd))
                        {
                            // schedule this thread again immediately with
                            // boosted priority
                            scheduler.SchedulingPolicy::schedule_thread(
                                HPX_MOVE(thrd),
                                threads::thread_schedule_hint(
                                    static_cast<std::int16_t>(num_thread)),
                                true, thread_priority::boost);

                            scheduler.SchedulingPolicy::do_some_work(
                                num_thread);
                        }
                    }
                }
                else if (HPX_UNLIKELY(
                             thread_schedule_state::active == state_val &&
                             !get_thread_id_data(thrd)->runs_as_child()))
                {
                    write_rescheduling_log_warning(scheduler, num_thread, thrd);

                    // re-schedule the thread if it is still marked as active;
                    // this might happen if some thread has been added to the
                    // scheduler queue already but its state has not been
                    // reset yet
                    auto priority = thrdptr->get_priority();
                    scheduler.SchedulingPolicy::schedule_thread(HPX_MOVE(thrd),
                        threads::thread_schedule_hint(
                            static_cast<std::int16_t>(num_thread)),
                        priority != threads::thread_priority::bound, priority);

                    scheduler.SchedulingPolicy::do_some_work(num_thread);
                }

                // Remove the mapping from thread_map_ if the HPX thread is
                // deleted or terminated; this will delete the HPX thread.
                if (HPX_LIKELY(state_val == thread_schedule_state::deleted ||
                        state_val == thread_schedule_state::terminated))
                {
#ifdef HPX_HAVE_THREAD_CUMULATIVE_COUNTS
                    ++counters.executed_threads_;
#endif
                    HPX_ASSERT(!thrdptr->runs_as_child());
                    thrd = thread_id_type();
                }
            }

            // if nothing else has to be done, either wait or terminate
            else
            {
                ++idle_loop_count;

                next_thrd = thread_id_ref_type();
                if (scheduler.wait_or_add_new(num_thread, running,
                        idle_loop_count, enable_stealing_staged, added,
                        &next_thrd))
                {
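                    // wait_or_add_new() returned true: the queues hold no more
                    // work, so check whether this worker thread is allowed to
                    // exit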
                    // Clean up terminated threads before trying to exit
                    bool can_exit = !running &&
                        scheduler.SchedulingPolicy::cleanup_terminated(
                            num_thread, true) &&
                        scheduler.SchedulingPolicy::get_queue_length(
                            num_thread) == 0;

                    if (this_state.load(std::memory_order_relaxed) ==
                        hpx::state::pre_sleep)
                    {
                        if (can_exit)
                        {
                            scheduler.SchedulingPolicy::suspend(num_thread);
                        }
                    }
                    else
                    {
                        can_exit = can_exit &&
                            scheduler.SchedulingPolicy::get_thread_count(
                                thread_schedule_state::suspended,
                                thread_priority::default_, num_thread) == 0;

                        if (can_exit)
                        {
                            if (!scheduler.has_scheduler_mode(
                                    policies::scheduler_mode::delay_exit,
                                    num_thread))
                            {
                                // If this is an inner scheduler, try to exit
                                // immediately
                                if (background_thread != nullptr)
                                {
                                    HPX_ASSERT(background_running);
                                    *background_running = false;    //-V522

                                    // do background work in parcel layer and in
                                    // agas
                                    [[maybe_unused]] bool const has_exited =
                                        call_background_thread(
                                            background_thread, next_thrd,
                                            scheduler, num_thread,
                                            bg_work_exec_time_init,
                                            context_storage);

                                    // the background thread should have exited
                                    HPX_ASSERT(has_exited);

                                    background_thread.reset();
                                    background_running.reset();
                                }
                                else
                                {
                                    this_state.store(hpx::state::stopped);
                                    break;
                                }
                            }
                            else
                            {
                                // Otherwise, keep idling for some time
                                if (!may_exit)
                                    idle_loop_count = 0;
                                may_exit = true;
                            }
                        }
                    }
                }
                else if (!may_exit && added == 0 &&
                    (scheduler.has_scheduler_mode(
                        policies::scheduler_mode::fast_idle_mode, num_thread)))
                {
                    // speed up idle suspend if no work was stolen
                    idle_loop_count += params.max_idle_loop_count_ / 1024;
                    added = static_cast<std::size_t>(-1);
                }

                // if stealing yielded a new task, run it first
                if (next_thrd != nullptr)
                {
                    continue;
                }

                if (do_background_work)
                {
                    // do background work in parcel layer and in agas
                    call_and_create_background_thread(background_thread,
                        next_thrd, scheduler, num_thread,
                        bg_work_exec_time_init, context_storage, params,
                        background_running, idle_loop_count);
                }

                // call back into invoking context
                if (!params.inner_.empty())
                {
                    params.inner_();
                    context_storage = hpx::execution_base::this_thread::detail::
                        get_agent_storage();
                }
            }

            // something went badly wrong, give up
            if (HPX_UNLIKELY(this_state.load(std::memory_order_relaxed) ==
                    hpx::state::terminating))
            {
                break;
            }

            if (busy_loop_count > params.max_busy_loop_count_)
            {
                busy_loop_count = 0;

                if (do_background_work)
                {
                    // do background work in parcel layer and in agas
                    call_and_create_background_thread(background_thread,
                        next_thrd, scheduler, num_thread,
                        bg_work_exec_time_init, context_storage, params,
                        background_running, idle_loop_count);
                }
            }
            else if (idle_loop_count > params.max_idle_loop_count_)
            {
                // call back into invoking context
                if (!params.outer_.empty() && params.outer_())
                {
                    idle_loop_count = 0;
                    context_storage = hpx::execution_base::this_thread::detail::
                        get_agent_storage();
                }

                scheduler.SchedulingPolicy::cleanup_terminated(true);
            }
            else if (may_exit)
            {
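                // the exit conditions were met earlier (delay_exit mode):
                // first shut down the background thread if one is running;
                // otherwise re-check that the pool is fully drained and, if
                // so, mark this worker as stopped and leave the loop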
                if (idle_loop_count > params.max_idle_loop_count_)
                    idle_loop_count = 0;

                // call back into invoking context
                if (!params.outer_.empty() && params.outer_())
                {
                    context_storage = hpx::execution_base::this_thread::detail::
                        get_agent_storage();
                }

                // break if we were idling after 'may_exit'
                HPX_ASSERT(this_state.load(std::memory_order_relaxed) !=
                    hpx::state::pre_sleep);

                if (background_thread)
                {
                    HPX_ASSERT(background_running);
                    *background_running = false;

                    // do background work in parcel layer and in agas
                    [[maybe_unused]] bool const has_exited =
                        call_background_thread(background_thread, next_thrd,
                            scheduler, num_thread, bg_work_exec_time_init,
                            context_storage);

                    // the background thread should have exited
                    HPX_ASSERT(has_exited);

                    background_thread.reset();
                    background_running.reset();
                }
                else
                {
                    bool const can_exit = !running &&
                        scheduler.SchedulingPolicy::cleanup_terminated(true) &&
                        scheduler.SchedulingPolicy::get_thread_count(
                            thread_schedule_state::suspended,
                            thread_priority::default_, num_thread) == 0 &&
                        scheduler.SchedulingPolicy::get_queue_length(
                            num_thread) == 0;

                    if (can_exit)
                    {
                        this_state.store(hpx::state::stopped);
                        break;
                    }
                }

                may_exit = false;
            }

#if defined(HPX_HAVE_MODULE_ASYNC_MPI) ||                                      \
    defined(HPX_HAVE_MODULE_ASYNC_CUDA) || defined(HPX_HAVE_MODULE_ASYNC_SYCL)
            if (scheduler.custom_polling_function() ==
                policies::detail::polling_status::busy)
            {
                idle_loop_count = 0;
            }
#endif
        }
    }
}    // namespace hpx::threads::detail

// NOTE: This line only exists to please doxygen. Without the line doxygen
// generates incomplete xml output.