• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28356348514

29 Jun 2026 07:40AM UTC coverage: 52.174% (-0.1%) from 52.278%
28356348514

Pull #83

github

web-flow
Merge 278203630 into 2efed6649
Pull Request #83: refactor and improve code QoL

37276 of 92891 branches covered (40.13%)

Branch coverage included in aggregate %.

671 of 1173 new or added lines in 58 files covered. (57.2%)

66 existing lines in 30 files now uncovered.

33619 of 42991 relevant lines covered (78.2%)

20387.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.88
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/io/io_backend_factory.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/core/tasks/task.h>
8
#include <dftracer/utils/core/utilities/monitor.h>
9

10
#include <chrono>
11
#include <coroutine>
12
#include <exception>
13
#include <vector>
14

15
namespace dftracer::utils {
16

17
// Opaque per-thread pointer to the worker context bound to the calling
// thread.  It stays null until set_current_worker_context() is called on
// that thread.
static thread_local void* tls_current_worker_context = nullptr;

// Returns the worker context bound to the calling thread, or nullptr when
// nothing has been bound.
void* get_current_worker_context() {
    return tls_current_worker_context;
}

// Binds `context` to the calling thread; passing nullptr removes the binding.
void set_current_worker_context(void* context) {
    tls_current_worker_context = context;
}
6,619✔
24

25
// Executor bound to the calling thread, or nullptr when none is bound.
static thread_local Executor* tls_current_executor = nullptr;

// Returns the executor bound to the calling thread (nullptr if none).
Executor* Executor::current() noexcept {
    return tls_current_executor;
}

// Binds `e` to the calling thread and returns the executor that was bound
// before the call.
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
34

35
// Thread-local list of coroutine handles to destroy after the current
36
// resume() returns.  FinalAwaiter pushes here instead of the shared
37
// destroy_queue_ to avoid another worker freeing the frame while
38
// the coroutine-suspend machinery is still accessing it.
39
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
40

41
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
10,323✔
42
    tls_pending_destroys.push_back(h);
10,323✔
43
}
10,205✔
44

45
void drain_thread_local_destroys() {
36,852✔
46
    Executor* exec = Executor::current();
36,852✔
47
    for (auto h : tls_pending_destroys) {
47,192✔
48
        if (!h) continue;
10,398✔
49
        if (exec) {
10,343!
50
            exec->schedule_destroy(h);
10,343!
51
        } else {
5,074✔
52
            h.destroy();
×
53
        }
54
    }
55
    tls_pending_destroys.clear();
36,787✔
56
}
36,678✔
57

58
// Captures the pool sizes, timeouts and I/O backend settings from `config`.
// No thread is started here; that happens in start().
//
// A zero `num_threads` / `io_pool_size` in the config selects
// dftracer_utils_hardware_concurrency(); if the resulting value is still 0,
// the pool size falls back to 2.
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0
                       ? dftracer_utils_hardware_concurrency()
                       : config.num_threads),
      last_activity_ns_(
          std::chrono::steady_clock::now().time_since_epoch().count()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size == 0
                        ? dftracer_utils_hardware_concurrency()
                        : config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold) {
    if (num_threads_ == 0) {
        num_threads_ = 2;
    }
    if (io_pool_size_ == 0) {
        io_pool_size_ = 2;
    }
#ifdef DFTRACER_UTILS_VALGRIND_MODE
    // Per-Executor thread churn dominates runtime when Valgrind serializes and
    // instruments every thread, so cap the pools.
    if (num_threads_ > 2) num_threads_ = 2;
    if (io_pool_size_ > 2) io_pool_size_ = 2;
#endif
    // duration::count() returns the duration's rep type, which is not
    // `long long` on every platform; cast so the arguments always match %lld.
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
1,626✔
88

89
// Stops the executor if it is still running, then drains the destroy queue.
// shutdown() returns early when the executor is not running, so for an
// executor that was never started (or was already shut down) the drain below
// is the only one performed here.
Executor::~Executor() {
    shutdown();
    drain_destroy_queue();
}
1,626✔
93

94
void Executor::start() {
1,074✔
95
    if (running_) {
1,074✔
96
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
×
97
        return;
×
98
    }
99

100
    running_ = true;
1,074✔
101
    workers_.clear();
1,074✔
102
    workers_.reserve(num_threads_);
1,074✔
103

104
    timer_service_.start();
1,074✔
105

106
    // Create and start I/O backend before workers so workers
107
    // can use it immediately.
108
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
2,148✔
109
                                        io_batch_threshold_);
1,074✔
110
    io_backend_->start();
1,074✔
111

112
    // Create all worker contexts first so workers_ is stable before any
113
    // worker thread can try to iterate/steal from it.
114
    for (std::size_t i = 0; i < num_threads_; ++i) {
4,390✔
115
        auto worker = std::make_unique<WorkerContext>(i);
3,316!
116
        worker->last_activity = std::chrono::steady_clock::now();
3,316✔
117
        workers_.push_back(std::move(worker));
3,316!
118
    }
3,316✔
119

120
    // Start worker threads after all contexts are in place.
121
    for (auto& worker : workers_) {
4,390✔
122
        worker->thread =
3,316✔
123
            std::thread(&Executor::worker_thread, this, worker.get());
5,035!
124
    }
125

126
    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
1,074!
127
                             num_threads_);
128
}
537✔
129

130
void Executor::shutdown() {
2,768✔
131
    if (!running_) {
2,768✔
132
        return;
1,694✔
133
    }
134

135
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
1,074!
136
    running_ = false;
1,074✔
137
    wake_all_workers();
1,074✔
138

139
    // Join all worker threads (must happen before io_backend_ is
140
    // destroyed, since workers call io_backend_->poll() when idle).
141
    for (auto& worker : workers_) {
4,390✔
142
        if (worker->thread.joinable()) {
3,316✔
143
            worker->thread.join();
3,316!
144
        }
1,597✔
145
    }
146

147
    // Stop I/O backend AFTER joining workers (workers may poll the
148
    // backend) but BEFORE clearing workers_.  The I/O backend's
149
    // completion thread may still call enqueue() -> wake_all_workers()
150
    // which accesses WorkerContext cv/mutex, so workers_ must remain
151
    // alive until the completion thread has exited.
152
    if (io_backend_) {
1,074✔
153
        io_backend_->stop();
1,074✔
154
        io_backend_.reset();
1,074✔
155
    }
537✔
156

157
    // Destroy deferred frames and orphaned run-queue entries BEFORE
158
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
159
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
160
    drain_destroy_queue();
1,074✔
161
    {
162
        RunQueueEntry orphan;
1,074✔
163
        while (run_queue_.try_dequeue(orphan)) {
1,102✔
164
            if (orphan.handle) {
28✔
165
                orphan.handle.destroy();
28!
166
            }
13✔
167
        }
168
    }
169

170
    workers_.clear();
1,074✔
171
    timer_service_.stop();
1,074✔
172

173
    // Drain the main thread's thread-local destroy list (for
174
    // coroutines whose FinalAwaiter ran on the main thread).
175
    drain_thread_local_destroys();
1,074✔
176

177
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
1,074!
178
}
1,384✔
179

180
void Executor::reset() {
×
181
    // Queue will be reset by caller if needed
UNCOV
182
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
×
183
}
×
184

185
// Installs the callback invoked by notify_completion().  The callback is
// taken by value and moved into place.  No lock is taken here; see
// notify_completion() for the assumption that makes this safe.
void Executor::set_completion_callback(CompletionCallback callback) {
    completion_callback_ = std::move(callback);
}
634✔
188

189
// Body of each worker thread.  Binds `context` and this executor to the
// thread, then loops while running_ is set: dequeue one run-queue entry and
// resume it, or, when the queue is empty, flush/poll the I/O backend and
// block on work_signal_ until it changes.
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    // Fixed for the process lifetime; read once to keep the resume loop cheap.
    const bool monitor = utilities::monitoring_enabled();
    if (monitor) {
        utilities::monitor_set_worker(static_cast<int>(context->worker_id));
    }

    while (running_) {
        RunQueueEntry entry;

        // Take the work-signal snapshot BEFORE looking at any queue.  Any
        // increment made by enqueue() + signal_global_work() after this load
        // then differs from the snapshot, so the wait() below returns even
        // if the queue looked empty when it was checked.  Loading the signal
        // after the queue check would leave a window in which work arrives
        // between the check and the load, and the worker would go to sleep
        // on the already-updated value while work sits in the queue.
        const std::uint64_t signal_snapshot =
            work_signal_.load(std::memory_order_acquire);

        // The run queue carries coroutine handles from enqueue() and
        // schedule_coroutine_resumption().
        if (!run_queue_.try_dequeue(entry)) {
            // Nothing to run -- prepare to sleep until signaled.
            context->is_idle.store(true, std::memory_order_relaxed);

            // Flush batched I/O first so pending SQEs are submitted even
            // when no new work is arriving (drain trigger).
            if (io_backend_) {
                io_backend_->flush();
            }

            // Opportunistic poll: completions enqueue new work, so when any
            // were reaped, skip the sleep and retry the run queue.
            if (io_backend_) {
                auto reaped = io_backend_->poll(0);
                if (reaped > 0) {
                    drain_destroy_queue();
                    continue;
                }
            }

            drain_destroy_queue();
            work_signal_.wait(signal_snapshot, std::memory_order_acquire);
            continue;
        }

        coro::reset_timeslice();
        context->is_idle.store(false, std::memory_order_relaxed);

        std::coroutine_handle<> coro_handle = entry.handle;
        if (coro_handle && !coro_handle.done()) {
            // The id is taken from the entry, not the promise: foreign
            // promise types have no task_id field.
            const TaskIndex tid = entry.task_id;
            if (tid >= 0) {
                // Promote a still-QUEUED registry record to RUNNING.
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto it = task_registry_.find(tid);
                if (it != task_registry_.end() &&
                    it->second.state == TaskInfo::QUEUED) {
                    TaskInfo& info = it->second;
                    info.state = TaskInfo::RUNNING;
                    info.started_at = std::chrono::steady_clock::now();
                    info.worker_id = context->worker_id;
                    info.location = TaskInfo::EXECUTING;
                    ++tasks_started_;
                }
            }

            DFTRACER_TSAN_ACQUIRE(coro_handle.address());
            if (monitor) {
                utilities::monitor_resume_begin(entry.monitor_id);
            }
            coro_handle.resume();
            if (monitor) {
                // NOTE(review): done() is queried after resume() returned;
                // confirm the frame cannot have been re-enqueued, completed
                // and destroyed by another worker by this point.
                utilities::monitor_resume_end(coro_handle.address(),
                                              coro_handle.done());
            }
        }

        // Destroy the frames FinalAwaiter deferred to this thread.  This is
        // safe here because resume() has fully returned: the frame is
        // suspended at final_suspend and no code references it.
        drain_thread_local_destroys();
        drain_destroy_queue();
    }

    // Final drain: destroy any frames deferred during the last resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
3,291✔
287

288
void Executor::notify_completion(std::shared_ptr<Task> task) {
1,546✔
289
    // No mutex needed -- the callback is set exactly once during Scheduler
290
    // construction (before any task is submitted) and never modified after.
291
    // on_task_completed uses only atomics, lock-free queues, and properly-
292
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
293
    if (completion_callback_) {
1,546✔
294
        completion_callback_(task);
1,545!
295
    }
774✔
296
}
1,545✔
297

298
void Executor::request_shutdown() {
10✔
299
    if (shutdown_requested_.load()) {
10!
300
        return;  // Already requested
×
301
    }
302

303
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
10!
304
    shutdown_requested_ = true;
10✔
305
}
5✔
306

307
// Schedules `handle` to be resumed by a worker.  This is a thin forwarder:
// enqueue() is the single path for all coroutine handles.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    enqueue(handle);
}
12,890✔
311

312
void Executor::signal_global_work() {
32,624✔
313
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
32,624✔
314
    // Wake one worker, not all. Each enqueue adds one unit of work,
315
    // so one worker is sufficient. Avoids thundering herd where all
316
    // N threads wake, N-1 find no work, and go back to sleep.
317
    wake_one_worker();
32,624✔
318
}
32,624✔
319

320
// Wakes at most one thread blocked in work_signal_.wait().  The signal value
// itself is not changed here.
void Executor::wake_one_worker() {
    work_signal_.notify_one();
}
32,688✔
321

322
void Executor::wake_all_workers() {
1,074✔
323
    work_signal_.fetch_add(1, std::memory_order_release);
1,074✔
324
    work_signal_.notify_all();
1,074✔
325
}
1,074✔
326

327
// Helper function for when_all.h (avoids circular dependency)
328
void schedule_coroutine_resumption_helper(Executor* executor,
12,882✔
329
                                          std::coroutine_handle<> handle) {
330
    if (executor) {
12,882✔
331
        executor->schedule_coroutine_resumption(handle);
12,883✔
332
    }
6,035✔
333
}
12,887✔
334

335
// Helper function for when_any.h (avoids circular dependency)
336
void schedule_destroy_helper(Executor* executor,
13✔
337
                             std::coroutine_handle<> handle) {
338
    if (executor && handle) {
13!
339
        executor->schedule_destroy(handle);
13✔
340
    }
7✔
341
}
13✔
342

343
// Puts `handle` on the run queue and signals that work is available.
// Null handles and handles of coroutines that are already done are ignored.
//
// `task_id` travels with the queue entry; the worker loop uses it (when
// >= 0) to update the task registry before resuming the coroutine.
void Executor::enqueue(std::coroutine_handle<> handle, TaskIndex task_id) {
    const bool runnable = handle && !handle.done();
    if (!runnable) {
        return;  // Invalid or already completed
    }

    long long monitor_id = -1;
    if (utilities::monitoring_enabled()) {
        // task_id >= 0 marks a tracked submitted task; otherwise it is spawn
        // fan-out (or a re-enqueue of an already-seen coroutine).
        const auto kind = (task_id >= 0) ? utilities::CoroKind::Task
                                         : utilities::CoroKind::Spawn;
        monitor_id = utilities::monitor_enqueue(handle.address(), kind);
    }

    DFTRACER_TSAN_RELEASE(handle.address());
    run_queue_.enqueue(RunQueueEntry{handle, task_id, monitor_id});
    signal_global_work();
}
15,789✔
360

361
bool Executor::is_responsive() const {
4✔
362
    // If shutdown was requested, consider unresponsive
363
    if (shutdown_requested_.load()) {
4!
364
        return false;
×
365
    }
366

367
    // If not running, not responsive
368
    if (!running_.load()) {
4✔
369
        return false;
×
370
    }
371

372
    // Check if all threads might be deadlocked
373
    // (all threads busy but no progress for a while)
374
    std::size_t started = tasks_started_.load();
4✔
375
    std::size_t completed = tasks_completed_.load();
4✔
376
    std::size_t active = started - completed;
4✔
377

378
    if (active >= num_threads_) {
4✔
379
        // All threads busy - check if making progress
380
        auto now = std::chrono::steady_clock::now();
2✔
381
        auto last_ns = last_activity_ns_.load(std::memory_order_acquire);
2✔
382
        auto last_tp = std::chrono::steady_clock::time_point(
1✔
383
            std::chrono::steady_clock::duration(last_ns));
2✔
384
        auto idle_time = now - last_tp;
2!
385

386
        // If all threads busy but no activity for deadlock_timeout,
387
        // likely deadlocked
388
        if (idle_time > deadlock_timeout_) {
2!
389
            DFTRACER_UTILS_LOG_WARN(
×
390
                "Executor appears deadlocked: %zu threads, %zu active "
391
                "tasks, idle for %lld ms",
392
                num_threads_, active,
393
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
394
                    .count());
395
            return false;
×
396
        }
397
    }
1✔
398

399
    return true;
4✔
400
}
2✔
401

402
void Executor::mark_activity() {
3,088✔
403
    last_activity_ns_.store(
4,624✔
404
        std::chrono::steady_clock::now().time_since_epoch().count(),
3,088✔
405
        std::memory_order_release);
406
}
3,077✔
407

408
void Executor::update_task_location(TaskIndex task_id,
×
409
                                    TaskInfo::Location location,
410
                                    std::size_t worker_id) {
411
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
412
    auto it = task_registry_.find(task_id);
×
413
    if (it != task_registry_.end()) {
×
414
        it->second.location = location;
×
415
        if (location == TaskInfo::LOCAL_QUEUE ||
×
416
            location == TaskInfo::EXECUTING) {
417
            it->second.worker_id = worker_id;
×
418
        }
419
    }
420
}
×
421

422
// Builds a snapshot of executor state: submitted/completed counters, per-state
// task counts, error messages of failed tasks, one progress tree per root
// task, and one status record per worker.
//
// The registry is held under a shared lock for the whole call, so the
// per-task trees and worker records are moved (not copied) into the result
// to keep the time spent under the lock down.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> lock(registry_mutex_);
    ExecutorProgress progress;

    // Overall stats
    progress.total_tasks_submitted = total_tasks_submitted_.load();
    progress.tasks_completed = tasks_completed_.load();

    // Count task states
    progress.tasks_queued = 0;
    progress.tasks_running = 0;
    progress.tasks_failed = 0;

    for (const auto& [task_id, info] : task_registry_) {
        switch (info.state) {
            case TaskInfo::QUEUED:
                progress.tasks_queued++;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                progress.tasks_running++;
                break;
            case TaskInfo::COMPLETED:
                // Already counted via tasks_completed_.
                break;
            case TaskInfo::FAILED:
                progress.tasks_failed++;
                if (!info.error_message.empty()) {
                    progress.recent_errors.push_back(
                        {task_id, info.error_message});
                }
                break;
        }
    }

    for (std::size_t i = 0; i < workers_.size(); ++i) {
        // No per-worker local queues anymore; report 0.
        progress.worker_queue_depths.push_back(0);
    }

    // Build task trees, one per root task (parent_task_id == -1).  The
    // `processed` set is shared across all trees.
    std::unordered_set<TaskIndex> processed;
    for (const auto& [task_id, info] : task_registry_) {
        if (info.parent_task_id == -1) {  // Root task
            progress.root_tasks.push_back(
                build_task_progress_tree(task_id, processed));
        }
    }

    // Worker states
    for (const auto& worker : workers_) {
        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        TaskIndex current_id = worker->current_task_id.load();
        if (current_id != -1) {
            status.current_task_id = current_id;
            // The name is a plain string guarded by its own mutex.
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;

        progress.workers.push_back(std::move(status));
    }

    return progress;
}
42!
491

492
// Builds the progress record for `task_id` and, recursively, for its children.
//
// `processed` collects every id visited so far.  An id seen a second time
// yields a placeholder named "[Cycle Detected]"; an id missing from the
// registry yields one named "[Not Found]".  The function reads
// task_registry_ without locking it itself (get_progress() calls it while
// holding registry_mutex_).
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    using Millis = std::chrono::duration<double, std::milli>;

    TaskProgress progress;
    progress.task_id = task_id;

    // insert() reports whether the id was new; a repeat means a cycle.
    if (!processed.insert(task_id).second) {
        progress.name = "[Cycle Detected]";
        return progress;
    }

    const auto entry = task_registry_.find(task_id);
    if (entry == task_registry_.end()) {
        progress.name = "[Not Found]";
        return progress;
    }

    const TaskInfo& info = entry->second;
    progress.name = info.name;

    // State
    switch (info.state) {
        case TaskInfo::QUEUED:
            progress.state = "queued";
            break;
        case TaskInfo::RUNNING:
            progress.state = "running";
            break;
        case TaskInfo::WAITING:
            progress.state = "waiting";
            break;
        case TaskInfo::COMPLETED:
            progress.state = "completed";
            break;
        case TaskInfo::FAILED:
            progress.state = "failed";
            break;
    }

    // Timing: a queued task has only been waiting so far; otherwise the
    // queue time is fixed and the execution time runs up to "now" (still in
    // flight) or up to completed_at (COMPLETED or FAILED).
    const auto now = std::chrono::steady_clock::now();
    if (info.state == TaskInfo::QUEUED) {
        progress.queued_duration_ms = Millis(now - info.queued_at).count();
        progress.execution_duration_ms = 0;
    } else {
        progress.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();

        const bool in_flight = info.state == TaskInfo::RUNNING ||
                               info.state == TaskInfo::WAITING;
        if (in_flight) {
            progress.execution_duration_ms =
                Millis(now - info.started_at).count();
        } else {
            progress.execution_duration_ms =
                Millis(info.completed_at - info.started_at).count();
        }
    }

    // Progress: the share of completed children, or all-or-nothing for a
    // task without children.
    progress.total_subtasks = info.child_task_ids.size();
    progress.completed_subtasks = info.completed_children.load();
    if (progress.total_subtasks > 0) {
        progress.progress_percentage =
            (100.0 * static_cast<double>(progress.completed_subtasks)) /
            static_cast<double>(progress.total_subtasks);
    } else if (info.state == TaskInfo::COMPLETED) {
        progress.progress_percentage = 100.0;
    } else {
        progress.progress_percentage = 0.0;
    }

    // Location
    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            progress.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            progress.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            progress.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            progress.location = "done";
            break;
    }

    // Children, depth-first, sharing the same visited set.
    for (const TaskIndex child_id : info.child_task_ids) {
        progress.children.push_back(
            build_task_progress_tree(child_id, processed));
    }

    return progress;
}
48!
597

598
// ============================================================================
599
// Phase 3: Coro-based task execution
600
// ============================================================================
601

602
// Coroutine that runs one Task on a worker thread.
//
// Steps: publish the task on the worker context, mark it RUNNING in the
// registry, execute it inside a CoroScope, join the scope, then record either
// COMPLETED (result stored) or FAILED (exception stored) and notify the
// completion callback.  Finally the worker context's "current task" is
// cleared.
//
// NOTE(review): `context` is read from TLS once, before the first co_await.
// If this coroutine can be resumed on a different worker thread afterwards,
// the later writes through `context` go to the original worker's record --
// confirm this is intended.
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
                              std::shared_ptr<std::any> input) {
    // Get worker context from TLS (set by worker_thread at start).
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());

    // NOTE(review): this early exit leaves the registry entry untouched and
    // never calls notify_completion(); confirm callers do not wait on it.
    if (!context || !task) {
        co_return;
    }

    // Update worker context
    context->current_task_id = task->get_id();
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name = task->get_name();
    }
    context->last_activity = std::chrono::steady_clock::now();

    // Mark task start
    mark_activity();
    ++tasks_started_;

    // Update task registry to RUNNING
    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto it = task_registry_.find(task->get_id());
        if (it != task_registry_.end()) {
            it->second.state = TaskInfo::RUNNING;
            it->second.started_at = std::chrono::steady_clock::now();
            it->second.worker_id = context->worker_id;
            it->second.location = TaskInfo::EXECUTING;
        }
    }

    task->result().mark_running();

    {
        CoroScope scope(this);
        std::exception_ptr task_error;

        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
                                 context->worker_id, task->get_id(),
                                 task->get_name());

        try {
            // NOTE(review): `input` is dereferenced without a null check;
            // presumably callers always pass a non-null pointer -- confirm.
            auto coro_task = task->execute(scope, *input);
            // Propagate executor to the CoroTask's PromiseBase so that
            // nested awaitables (when_any, channel, etc.) can schedule
            // resumptions.  run_task() is a Coro (CoroPromise, not
            // PromiseBase), so the normal PromiseBase propagation in
            // CoroTask::await_suspend doesn't fire.
            coro_task.handle().promise().set_executor(this);
            std::any result = co_await std::move(coro_task);

            // Set result via TaskResult
            task->set_result(std::move(result));
        } catch (...) {
            // Defer handling: the scope must be joined first (below).
            task_error = std::current_exception();
        }

        // Always join scope to wait for any sub-spawned coroutines.
        // Without this, the scope's JoinHandle would be destroyed
        // while FinalAwaiters still reference it (use-after-free).
        co_await scope.join();

        if (!task_error) {
            DFTRACER_UTILS_LOG_DEBUG(
                "Task ID %ld ('%s') completed successfully", task->get_id(),
                task->get_name());

            // Mark task completion
            mark_activity();
            ++tasks_completed_;
            context->tasks_executed++;

            // Update task registry to COMPLETED
            {
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto it = task_registry_.find(task->get_id());
                if (it != task_registry_.end()) {
                    it->second.state = TaskInfo::COMPLETED;
                    it->second.completed_at = std::chrono::steady_clock::now();
                    it->second.location = TaskInfo::DONE;

                    // Update parent's completed children count
                    if (it->second.parent_task_id != -1) {
                        auto parent_it =
                            task_registry_.find(it->second.parent_task_id);
                        if (parent_it != task_registry_.end()) {
                            parent_it->second.completed_children++;
                        }
                    }
                }
            }

            // Notify scheduler
            notify_completion(task);

        } else {
            // Bookkeeping shared by both failure handlers below.  It runs
            // synchronously inside the handler (no co_await), so capturing
            // the coroutine's locals by reference is safe.
            const auto record_failure = [&](const char* reason) {
                task->set_exception(task_error);

                mark_activity();
                ++tasks_completed_;
                context->tasks_executed++;

                {
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                    auto it = task_registry_.find(task->get_id());
                    if (it != task_registry_.end()) {
                        it->second.state = TaskInfo::FAILED;
                        it->second.completed_at =
                            std::chrono::steady_clock::now();
                        it->second.error_message = reason;
                        it->second.location = TaskInfo::DONE;
                    }
                }

                notify_completion(task);
            };

            // Rethrow only to classify the exception and pick the message.
            try {
                std::rethrow_exception(task_error);
            } catch (const std::exception& e) {
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
                                         task->get_id(), task->get_name(),
                                         e.what());
                record_failure(e.what());
            } catch (...) {
                DFTRACER_UTILS_LOG_ERROR(
                    "Task ID %ld ('%s') failed with unknown exception",
                    task->get_id(), task->get_name());
                record_failure("Unknown exception");
            }
        }
    }

    // Clear current task info
    context->current_task_id = -1;
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name.clear();
    }

    co_return;
}
16,091!
764

765
void Executor::submit_task(std::shared_ptr<Task> task,
1,574✔
766
                           std::shared_ptr<std::any> input,
767
                           TaskIndex parent_task_id) {
768
    // Register in task registry
769
    {
770
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
1,574!
771

772
        auto [it, inserted] = task_registry_.emplace(
2,361!
773
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
1,574!
774
            std::forward_as_tuple());
2,361✔
775

776
        if (inserted) {
1,574!
777
            it->second.task_id = task->get_id();
1,574!
778
            it->second.parent_task_id = parent_task_id;
1,574!
779
            it->second.name = task->get_name();
1,574!
780
            it->second.state = TaskInfo::QUEUED;
1,574!
781
            it->second.queued_at = std::chrono::steady_clock::now();
1,574!
782
            it->second.location = TaskInfo::SHARED_QUEUE;
1,574!
783
            it->second.worker_id = static_cast<std::size_t>(-1);
1,574!
784

785
            if (parent_task_id != -1) {
1,574✔
786
                auto parent_it = task_registry_.find(parent_task_id);
885!
787
                if (parent_it != task_registry_.end()) {
885✔
788
                    parent_it->second.child_task_ids.push_back(task->get_id());
877!
789
                }
445✔
790
            }
449✔
791
        }
787✔
792
    }
1,574✔
793

794
    ++total_tasks_submitted_;
1,574✔
795

796
    // Create Coro, set executor on promise, enqueue released handle
797
    auto coro = run_task(std::move(task), std::move(input));
2,361!
798
    coro.handle().promise().executor = this;
1,574!
799
    enqueue(coro.release());
1,574!
800
}
1,574✔
801

802
// Enqueues a bare coroutine while tracking it in the task registry.
//
// The coroutine receives a fresh id taken from the decrementing counter
// `next_coro_task_id_`; the id and this executor are stored on its promise.
// A registry entry (QUEUED / SHARED_QUEUE, no parent) is created for the id
// if one does not already exist.
//
// @param coro     Coroutine to schedule; its handle is released to the queue.
// @param name     Name recorded in the registry entry.
// @param tid_out  Optional slot that receives the assigned id; may be null.
// @return The id assigned to the coroutine.
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    const TaskIndex id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = id;
    promise.executor = this;

    {
        // Exclusive lock: the registry is mutated inside this scope.
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);
        auto [slot, is_new] = task_registry_.emplace(
            std::piecewise_construct, std::forward_as_tuple(id),
            std::forward_as_tuple());
        if (is_new) {
            const auto queued_time = std::chrono::steady_clock::now();
            auto& info = slot->second;
            info.task_id = id;
            info.parent_task_id = -1;
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = queued_time;
            info.location = TaskInfo::SHARED_QUEUE;
            // Max-value sentinel; presumably "no worker assigned yet".
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }

    ++total_tasks_submitted_;

    // The id is stored before the handle is enqueued.
    if (tid_out != nullptr) {
        tid_out->store(id, std::memory_order_release);
    }

    auto released = coro.release();
    enqueue(released, id);
    return id;
}
835

836
// Marks the registry entry for `id` as COMPLETED / DONE.
//
// Nothing happens when the id is not registered or the entry is already
// COMPLETED, so repeated calls count the completion only once.  If the entry
// never had a start time recorded (epoch count of zero), the completion
// timestamp is used for it as well.
//
// @param id  Registry id of the finished coroutine.
void Executor::mark_coro_completed(TaskIndex id) {
    // Evaluates to true only when this call performed the transition.  The
    // registry lock is released when the lambda returns, before the counter
    // below is touched.
    const bool newly_completed = [&]() -> bool {
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);

        const auto entry = task_registry_.find(id);
        if (entry == task_registry_.end()) {
            return false;
        }

        auto& info = entry->second;
        if (info.state == TaskInfo::COMPLETED) {
            return false;
        }

        const auto finished = std::chrono::steady_clock::now();
        if (info.started_at.time_since_epoch().count() == 0) {
            info.started_at = finished;
        }
        info.state = TaskInfo::COMPLETED;
        info.completed_at = finished;
        info.location = TaskInfo::DONE;
        return true;
    }();

    if (newly_completed) {
        ++tasks_completed_;
    }
}
2,280✔
855

856
// Queues a coroutine handle on `destroy_queue_` for later destruction by
// drain_destroy_queue().  Null handles are ignored.
//
// @param handle  Coroutine handle to destroy later; may be null.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
10,455✔
861

862
void Executor::drain_destroy_queue() {
86,164✔
863
    std::coroutine_handle<> to_destroy;
86,164✔
864
    while (destroy_queue_.try_dequeue(to_destroy)) {
96,705✔
865
        if (to_destroy) {
10,493✔
866
            to_destroy.destroy();
10,476!
867
        }
5,136✔
868
    }
869
}
86,253✔
870

871
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc