• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.51
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/io/io_backend_factory.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/core/tasks/task.h>
8
#include <dftracer/utils/core/utilities/monitor.h>
9
#include <zlib.h>
10

11
#include <chrono>
12
#include <coroutine>
13
#include <exception>
14
#include <mutex>
15
#include <vector>
16

17
namespace dftracer::utils {
18

19
namespace {
20

21
// Force zlib-ng's lazy CPU-feature functable init single-threaded before any
22
// worker spawns, so concurrent first-use does not race on the global table.
23
void warmup_vendored_libs() noexcept {
264✔
24
    unsigned char src[64];
25
    for (std::size_t i = 0; i < sizeof(src); ++i) {
17,160✔
26
        src[i] = static_cast<unsigned char>(i);
16,896✔
27
    }
8,448✔
28
    unsigned char comp[128];
29
    unsigned char back[64];
30

31
    z_stream def{};
264✔
32
    if (deflateInit2(&def, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 31, 8,
264!
33
                     Z_DEFAULT_STRATEGY) == Z_OK) {
264!
34
        def.next_in = src;
264✔
35
        def.avail_in = sizeof(src);
264✔
36
        def.next_out = comp;
264✔
37
        def.avail_out = sizeof(comp);
264✔
38
        deflate(&def, Z_FINISH);
264!
39
        uInt comp_len = static_cast<uInt>(sizeof(comp) - def.avail_out);
264✔
40
        deflateEnd(&def);
264!
41

42
        z_stream inf{};
264✔
43
        if (inflateInit2(&inf, 31) == Z_OK) {
264!
44
            inf.next_in = comp;
264✔
45
            inf.avail_in = comp_len;
264✔
46
            inf.next_out = back;
264✔
47
            inf.avail_out = sizeof(back);
264✔
48
            inflate(&inf, Z_FINISH);
264!
49
            inflateEnd(&inf);
264!
50
        }
132✔
51
    }
132✔
52
}
264✔
53

54
}  // namespace
55

56
// Opaque per-thread pointer to the calling thread's worker context.
// Stays nullptr on a thread until set_current_worker_context() stores one.
static thread_local void* tls_current_worker_context = nullptr;

// Returns whatever set_current_worker_context() last stored on the calling
// thread, or nullptr if nothing has been stored.
void* get_current_worker_context() {
    return tls_current_worker_context;
}

// Stores `context` as the calling thread's worker context; passing nullptr
// clears it. Only the calling thread's slot is affected.
void set_current_worker_context(void* context) {
    tls_current_worker_context = context;
}
6,703✔
63

64
// Executor associated with the calling thread; nullptr when none is set.
static thread_local Executor* tls_current_executor = nullptr;

// Returns the executor registered for the calling thread, or nullptr.
Executor* Executor::current() noexcept {
    return tls_current_executor;
}

// Registers `e` as the calling thread's executor and hands back the
// previously registered one, so a caller can restore it afterwards.
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
73

74
// Thread-local list of coroutine handles to destroy after the current
75
// resume() returns.  FinalAwaiter pushes here instead of the shared
76
// destroy_queue_ to avoid another worker freeing the frame while
77
// the coroutine-suspend machinery is still accessing it.
78
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
79

80
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
10,292✔
81
    tls_pending_destroys.push_back(h);
10,292✔
82
}
10,149✔
83

84
void drain_thread_local_destroys() {
37,565✔
85
    Executor* exec = Executor::current();
37,565✔
86
    for (auto h : tls_pending_destroys) {
47,793✔
87
        if (!h) continue;
10,325✔
88
        if (exec) {
10,261!
89
            exec->schedule_destroy(h);
10,261!
90
        } else {
5,018✔
91
            h.destroy();
×
92
        }
93
    }
94
    tls_pending_destroys.clear();
37,485✔
95
}
37,379✔
96

97
// Builds an executor from `config` without starting any thread.
//
// A zero `num_threads` / `io_pool_size` in the config means "use
// dftracer_utils_hardware_concurrency()".  If that helper also yields zero,
// the respective pool falls back to two.
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads != 0
                       ? config.num_threads
                       : dftracer_utils_hardware_concurrency()),
      last_activity_ns_(
          std::chrono::steady_clock::now().time_since_epoch().count()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size != 0
                        ? config.io_pool_size
                        : dftracer_utils_hardware_concurrency()),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold) {
    // Still zero here means the concurrency helper reported zero.
    if (num_threads_ == 0) {
        num_threads_ = 2;
    }
    if (io_pool_size_ == 0) {
        io_pool_size_ = 2;
    }

#ifdef DFTRACER_UTILS_VALGRIND_MODE
    // Valgrind serializes and instruments every thread, so per-Executor
    // thread churn dominates runtime; keep both pools at two at most.
    if (num_threads_ > 2) num_threads_ = 2;
    if (io_pool_size_ > 2) io_pool_size_ = 2;
#endif

    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
1,647✔
128

129
// Stops the executor (a no-op when it is not running) and then destroys any
// coroutine frames still waiting in the destroy queue.
Executor::~Executor() {
    shutdown();
    drain_destroy_queue();
}
1,647✔
133

134
void Executor::start() {
1,088✔
135
    if (running_) {
1,088!
136
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
×
137
        return;
×
138
    }
139

140
    running_ = true;
1,088✔
141
    workers_.clear();
1,088✔
142
    workers_.reserve(num_threads_);
1,088✔
143

144
    static std::once_flag warmup_once;
145
    std::call_once(warmup_once, warmup_vendored_libs);
1,088✔
146

147
    timer_service_.start();
1,088✔
148

149
    // Create and start I/O backend before workers so workers
150
    // can use it immediately.
151
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
2,176✔
152
                                        io_batch_threshold_);
1,088✔
153
    io_backend_->start();
1,088✔
154

155
    // Create all worker contexts first so workers_ is stable before any
156
    // worker thread can try to iterate/steal from it.
157
    for (std::size_t i = 0; i < num_threads_; ++i) {
4,450✔
158
        auto worker = std::make_unique<WorkerContext>(i);
3,362!
159
        worker->last_activity = std::chrono::steady_clock::now();
3,362✔
160
        workers_.push_back(std::move(worker));
3,362!
161
    }
3,362✔
162

163
    // Start worker threads after all contexts are in place.
164
    for (auto& worker : workers_) {
4,450✔
165
        worker->thread =
3,362✔
166
            std::thread(&Executor::worker_thread, this, worker.get());
5,107!
167
    }
168

169
    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
1,088✔
170
                             num_threads_);
171
}
544✔
172

173
void Executor::shutdown() {
2,808✔
174
    if (!running_) {
2,808✔
175
        return;
1,720✔
176
    }
177

178
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
1,088✔
179
    running_ = false;
1,088✔
180
    wake_all_workers();
1,088✔
181

182
    // Join all worker threads (must happen before io_backend_ is
183
    // destroyed, since workers call io_backend_->poll() when idle).
184
    for (auto& worker : workers_) {
4,450✔
185
        if (worker->thread.joinable()) {
3,362✔
186
            worker->thread.join();
3,362!
187
        }
1,617✔
188
    }
189

190
    // Stop I/O backend AFTER joining workers (workers may poll the
191
    // backend) but BEFORE clearing workers_.  The I/O backend's
192
    // completion thread may still call enqueue() -> wake_all_workers()
193
    // which accesses WorkerContext cv/mutex, so workers_ must remain
194
    // alive until the completion thread has exited.
195
    if (io_backend_) {
1,088✔
196
        io_backend_->stop();
1,088✔
197
        io_backend_.reset();
1,088✔
198
    }
544✔
199

200
    // Destroy deferred frames and orphaned run-queue entries BEFORE
201
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
202
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
203
    drain_destroy_queue();
1,088✔
204
    {
205
        RunQueueEntry orphan;
1,088✔
206
        while (run_queue_.try_dequeue(orphan)) {
1,116✔
207
            if (orphan.handle) {
28✔
208
                orphan.handle.destroy();
28!
209
            }
13✔
210
        }
211
    }
212

213
    workers_.clear();
1,088✔
214
    timer_service_.stop();
1,088✔
215

216
    // Drain the main thread's thread-local destroy list (for
217
    // coroutines whose FinalAwaiter ran on the main thread).
218
    drain_thread_local_destroys();
1,088✔
219

220
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
1,088✔
221
}
1,404✔
222

223
void Executor::reset() {
×
224
    // Queue will be reset by caller if needed
225
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
×
226
}
×
227

228
// Installs `callback` as the function notify_completion() invokes for each
// finished task, replacing any previously installed one.
void Executor::set_completion_callback(CompletionCallback callback) {
    completion_callback_ = std::move(callback);
}
646✔
231

232
// Body of every worker thread.
//
// Registers `context` and this executor in thread-local storage, then loops
// while running_ is set: pop one entry from the run queue and resume it, or
// -- when the queue is empty -- flush/poll the I/O backend and park on
// work_signal_ until something changes.  Thread-local state is cleared again
// before the function returns.
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    // Fixed for the process lifetime; sampled once so the resume loop stays
    // cheap.
    const bool monitor = utilities::monitoring_enabled();
    if (monitor) {
        utilities::monitor_set_worker(static_cast<int>(context->worker_id));
    }

    while (running_) {
        RunQueueEntry entry;

        // Take the signal snapshot BEFORE looking at any queue.  An enqueue
        // (+ signal_global_work) that lands after this load then changes the
        // value the wait() below compares against, so the worker cannot go
        // to sleep on a stale "queue is empty" observation.  Sampling after
        // the queue check would leave a window where work arrives in between
        // and the worker parks with the already-updated signal value.
        const std::uint64_t signal_snapshot =
            work_signal_.load(std::memory_order_acquire);

        if (!run_queue_.try_dequeue(entry)) {
            // ---- Idle path: nothing runnable. ----
            context->is_idle.store(true, std::memory_order_relaxed);

            if (io_backend_) {
                // Submit any batched I/O before sleeping, so pending SQEs go
                // out even when no new work is arriving.
                io_backend_->flush();

                // Opportunistic poll: reaped completions enqueue new work,
                // so skip the sleep and go straight back to the run queue.
                if (io_backend_->poll(0) > 0) {
                    drain_destroy_queue();
                    continue;
                }
            }

            drain_destroy_queue();

            // shutdown() may have bumped the signal after the snapshot was
            // taken; without this re-check wait() could park forever.
            if (!running_.load(std::memory_order_acquire)) {
                break;
            }
            work_signal_.wait(signal_snapshot, std::memory_order_acquire);
            continue;
        }

        // ---- Busy path: entry came from enqueue() or
        //      schedule_coroutine_resumption(). ----
        coro::reset_timeslice();
        context->is_idle.store(false, std::memory_order_relaxed);

        std::coroutine_handle<> coroutine = entry.handle;
        if (coroutine && !coroutine.done()) {
            // The id is taken from the queue entry rather than the promise:
            // foreign promise types have no task_id field.
            const TaskIndex tid = entry.task_id;
            if (tid >= 0) {
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto found = task_registry_.find(tid);
                if (found != task_registry_.end() &&
                    found->second.state == TaskInfo::QUEUED) {
                    auto& info = found->second;
                    info.state = TaskInfo::RUNNING;
                    info.started_at = std::chrono::steady_clock::now();
                    info.worker_id = context->worker_id;
                    info.location = TaskInfo::EXECUTING;
                    ++tasks_started_;
                }
            }

            DFTRACER_TSAN_ACQUIRE(coroutine.address());
            if (monitor) {
                utilities::monitor_resume_begin(entry.monitor_id);
            }
            coroutine.resume();
            if (monitor) {
                utilities::monitor_resume_end(coroutine.address(),
                                              coroutine.done());
            }
        }

        // resume() has fully returned, so frames that FinalAwaiter deferred
        // to this thread are suspended at final_suspend and no code still
        // references them; they can be handed off for destruction now.
        drain_thread_local_destroys();
        drain_destroy_queue();
    }

    // Final drain for frames deferred during the last resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
3,305✔
335

336
void Executor::notify_completion(std::shared_ptr<Task> task) {
1,569✔
337
    // No mutex needed -- the callback is set exactly once during Scheduler
338
    // construction (before any task is submitted) and never modified after.
339
    // on_task_completed uses only atomics, lock-free queues, and properly-
340
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
341
    if (completion_callback_) {
1,569✔
342
        completion_callback_(task);
1,570!
343
    }
786✔
344
}
1,568✔
345

346
void Executor::request_shutdown() {
10✔
347
    if (shutdown_requested_.load()) {
10!
348
        return;  // Already requested
×
349
    }
350

351
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
10✔
352
    shutdown_requested_ = true;
10✔
353
}
5✔
354

355
// Queues `handle` for resumption on a worker.  Thin forwarder to enqueue(),
// which is the single path for all coroutine handles.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    enqueue(handle);
}
13,573✔
359

360
void Executor::signal_global_work() {
33,280✔
361
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
33,280✔
362
    // Wake one worker, not all. Each enqueue adds one unit of work,
363
    // so one worker is sufficient. Avoids thundering herd where all
364
    // N threads wake, N-1 find no work, and go back to sleep.
365
    wake_one_worker();
33,280✔
366
}
33,282✔
367

368
// Wakes at most one thread blocked in work_signal_.wait().  The signal value
// itself is not changed here.
void Executor::wake_one_worker() {
    work_signal_.notify_one();
}
33,298✔
369

370
void Executor::wake_all_workers() {
1,088✔
371
    work_signal_.fetch_add(1, std::memory_order_release);
1,088✔
372
    work_signal_.notify_all();
1,088✔
373
}
1,088✔
374

375
// Helper function for when_all.h (avoids circular dependency)
376
void schedule_coroutine_resumption_helper(Executor* executor,
13,573✔
377
                                          std::coroutine_handle<> handle) {
378
    if (executor) {
13,573✔
379
        executor->schedule_coroutine_resumption(handle);
13,577✔
380
    }
6,593✔
381
}
13,580✔
382

383
// Helper function for when_any.h (avoids circular dependency)
384
void schedule_destroy_helper(Executor* executor,
15✔
385
                             std::coroutine_handle<> handle) {
386
    if (executor && handle) {
15!
387
        executor->schedule_destroy(handle);
15✔
388
    }
7✔
389
}
15✔
390

391
// Puts `handle` on the shared run queue and signals one worker.
//
// Null handles and coroutines that have already finished are silently
// ignored.  `task_id` travels with the queue entry; a value >= 0 marks a
// tracked submitted task.
void Executor::enqueue(std::coroutine_handle<> handle, TaskIndex task_id) {
    const bool runnable = handle && !handle.done();
    if (!runnable) {
        return;  // Invalid or already completed
    }

    long long monitor_id = -1;
    if (utilities::monitoring_enabled()) {
        // task_id >= 0 is a tracked submitted task; anything else is spawn
        // fan-out (or a re-enqueue of an already-seen coroutine).
        const auto kind = task_id >= 0 ? utilities::CoroKind::Task
                                       : utilities::CoroKind::Spawn;
        monitor_id = utilities::monitor_enqueue(handle.address(), kind);
    }

    DFTRACER_TSAN_RELEASE(handle.address());
    run_queue_.enqueue(RunQueueEntry{handle, task_id, monitor_id});
    signal_global_work();
}
16,321✔
408

409
bool Executor::is_responsive() const {
4✔
410
    // If shutdown was requested, consider unresponsive
411
    if (shutdown_requested_.load()) {
4!
412
        return false;
×
413
    }
414

415
    // If not running, not responsive
416
    if (!running_.load()) {
4✔
417
        return false;
×
418
    }
419

420
    // Check if all threads might be deadlocked
421
    // (all threads busy but no progress for a while)
422
    std::size_t started = tasks_started_.load();
4✔
423
    std::size_t completed = tasks_completed_.load();
4✔
424
    std::size_t active = started - completed;
4✔
425

426
    if (active >= num_threads_) {
4✔
427
        // All threads busy - check if making progress
428
        auto now = std::chrono::steady_clock::now();
2✔
429
        auto last_ns = last_activity_ns_.load(std::memory_order_acquire);
2✔
430
        auto last_tp = std::chrono::steady_clock::time_point(
1✔
431
            std::chrono::steady_clock::duration(last_ns));
2✔
432
        auto idle_time = now - last_tp;
2!
433

434
        // If all threads busy but no activity for deadlock_timeout,
435
        // likely deadlocked
436
        if (idle_time > deadlock_timeout_) {
2!
437
            DFTRACER_UTILS_LOG_WARN(
×
438
                "Executor appears deadlocked: %zu threads, %zu active "
439
                "tasks, idle for %lld ms",
440
                num_threads_, active,
441
                static_cast<long long>(
442
                    std::chrono::duration_cast<std::chrono::milliseconds>(
443
                        idle_time)
444
                        .count()));
445
            return false;
×
446
        }
447
    }
1✔
448

449
    return true;
4✔
450
}
2✔
451

452
void Executor::mark_activity() {
3,132✔
453
    last_activity_ns_.store(
4,697✔
454
        std::chrono::steady_clock::now().time_since_epoch().count(),
3,132✔
455
        std::memory_order_release);
456
}
3,131✔
457

458
void Executor::update_task_location(TaskIndex task_id,
×
459
                                    TaskInfo::Location location,
460
                                    std::size_t worker_id) {
461
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
462
    auto it = task_registry_.find(task_id);
×
463
    if (it != task_registry_.end()) {
×
464
        it->second.location = location;
×
465
        if (location == TaskInfo::LOCAL_QUEUE ||
×
466
            location == TaskInfo::EXECUTING) {
467
            it->second.worker_id = worker_id;
×
468
        }
469
    }
470
}
×
471

472
// Builds a point-in-time progress report: aggregate counters, per-state task
// counts, recent error messages, one task tree per root task, and one status
// record per worker.
//
// Holds registry_mutex_ in shared mode for the whole call.
// NOTE(review): workers_ is read here, while start()/shutdown() modify it
// without taking registry_mutex_ -- confirm callers never overlap those.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> lock(registry_mutex_);
    ExecutorProgress progress;

    // Overall stats
    progress.total_tasks_submitted = total_tasks_submitted_.load();
    progress.tasks_completed = tasks_completed_.load();

    // Per-state counts
    progress.tasks_queued = 0;
    progress.tasks_running = 0;
    progress.tasks_failed = 0;

    for (const auto& [task_id, info] : task_registry_) {
        switch (info.state) {
            case TaskInfo::QUEUED:
                progress.tasks_queued++;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                progress.tasks_running++;
                break;
            case TaskInfo::COMPLETED:
                // Already covered by tasks_completed above.
                break;
            case TaskInfo::FAILED:
                progress.tasks_failed++;
                if (!info.error_message.empty()) {
                    progress.recent_errors.push_back(
                        {task_id, info.error_message});
                }
                break;
        }
    }

    for (std::size_t i = 0; i < workers_.size(); ++i) {
        // No per-worker local queues anymore; report 0.
        progress.worker_queue_depths.push_back(0);
    }

    // Task trees, one per root task (parent id -1).  `processed` is shared
    // across roots so a task is expanded at most once.  The tree is appended
    // as a temporary, which avoids deep-copying it (strings plus nested
    // children) while the lock is held.
    std::unordered_set<TaskIndex> processed;
    for (const auto& [task_id, info] : task_registry_) {
        if (info.parent_task_id == -1) {
            progress.root_tasks.push_back(
                build_task_progress_tree(task_id, processed));
        }
    }

    // Worker states
    for (const auto& worker : workers_) {
        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        const TaskIndex current_id = worker->current_task_id.load();
        if (current_id != -1) {
            status.current_task_id = current_id;
            // The task name is guarded by the worker's own mutex.
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;

        // `status` is not used again, so hand it over instead of copying.
        progress.workers.push_back(std::move(status));
    }

    return progress;
}
42!
541

542
// Builds the progress node for `task_id` and, recursively, for its children.
//
// `processed` collects every id expanded so far.  An id seen a second time
// yields a placeholder node named "[Cycle Detected]"; an id missing from the
// registry yields one named "[Not Found]".  The function reads
// task_registry_ without locking, so the caller is expected to hold
// registry_mutex_ (get_progress() does).
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    using Millis = std::chrono::duration<double, std::milli>;

    TaskProgress progress;
    progress.task_id = task_id;

    // insert() reports whether the id was new; a repeat means a cycle.
    if (!processed.insert(task_id).second) {
        progress.name = "[Cycle Detected]";
        return progress;
    }

    const auto found = task_registry_.find(task_id);
    if (found == task_registry_.end()) {
        progress.name = "[Not Found]";
        return progress;
    }

    const TaskInfo& info = found->second;
    progress.name = info.name;

    // State label
    switch (info.state) {
        case TaskInfo::QUEUED:
            progress.state = "queued";
            break;
        case TaskInfo::RUNNING:
            progress.state = "running";
            break;
        case TaskInfo::WAITING:
            progress.state = "waiting";
            break;
        case TaskInfo::COMPLETED:
            progress.state = "completed";
            break;
        case TaskInfo::FAILED:
            progress.state = "failed";
            break;
    }

    // Timing, in milliseconds.
    const auto now = std::chrono::steady_clock::now();
    const bool still_queued = info.state == TaskInfo::QUEUED;
    const bool in_flight = info.state == TaskInfo::RUNNING ||
                           info.state == TaskInfo::WAITING;
    if (still_queued) {
        // Not started yet: the queue time is still growing.
        progress.queued_duration_ms = Millis(now - info.queued_at).count();
        progress.execution_duration_ms = 0;
    } else if (in_flight) {
        // Started but not finished: the execution time is still growing.
        progress.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();
        progress.execution_duration_ms =
            Millis(now - info.started_at).count();
    } else {
        // COMPLETED or FAILED: both intervals are final.
        progress.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();
        progress.execution_duration_ms =
            Millis(info.completed_at - info.started_at).count();
    }

    // Progress: share of finished children, or all-or-nothing for a leaf.
    progress.total_subtasks = info.child_task_ids.size();
    progress.completed_subtasks = info.completed_children.load();
    if (progress.total_subtasks == 0) {
        progress.progress_percentage =
            (info.state == TaskInfo::COMPLETED) ? 100.0 : 0.0;
    } else {
        progress.progress_percentage =
            (100.0 * static_cast<double>(progress.completed_subtasks)) /
            static_cast<double>(progress.total_subtasks);
    }

    // Location label
    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            progress.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            progress.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            progress.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            progress.location = "done";
            break;
    }

    // Children, depth first.
    for (const TaskIndex child_id : info.child_task_ids) {
        progress.children.push_back(
            build_task_progress_tree(child_id, processed));
    }

    return progress;
}
48!
647

648
// ============================================================================
649
// Phase 3: Coro-based task execution
650
// ============================================================================
651

652
// Coroutine wrapper that runs one Task on a worker.
//
// Steps: publish the task on the worker context, mark it RUNNING in the
// registry, execute it inside a CoroScope, join the scope, then record
// success or failure (registry, counters, TaskResult) and notify the
// completion callback.
//
// Returns immediately, without recording anything, when the calling thread
// has no worker context or `task` is null.
// NOTE(review): `input` is dereferenced unconditionally -- presumably callers
// always pass a non-null pointer; confirm.
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
                              std::shared_ptr<std::any> input) {
    // Worker context comes from TLS (set by worker_thread at start).
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());

    if (!context || !task) {
        co_return;
    }

    // Publish the task on the worker context.
    context->current_task_id = task->get_id();
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name = task->get_name();
    }
    context->last_activity = std::chrono::steady_clock::now();

    // Mark task start
    mark_activity();
    ++tasks_started_;

    // Registry: QUEUED -> RUNNING
    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto it = task_registry_.find(task->get_id());
        if (it != task_registry_.end()) {
            it->second.state = TaskInfo::RUNNING;
            it->second.started_at = std::chrono::steady_clock::now();
            it->second.worker_id = context->worker_id;
            it->second.location = TaskInfo::EXECUTING;
        }
    }

    task->result().mark_running();

    {
        CoroScope scope(this);
        std::exception_ptr task_error;

        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
                                 context->worker_id, task->get_id(),
                                 task->get_name());

        try {
            auto coro_task = task->execute(scope, *input);
            // Propagate executor to the CoroTask's PromiseBase so that
            // nested awaitables (when_any, channel, etc.) can schedule
            // resumptions.  run_task() is a Coro (CoroPromise, not
            // PromiseBase), so the normal PromiseBase propagation in
            // CoroTask::await_suspend doesn't fire.
            coro_task.handle().promise().set_executor(this);
            std::any result = co_await std::move(coro_task);

            // Set result via TaskResult
            task->set_result(std::move(result));
        } catch (...) {
            // Hold on to the error; it is reported after the scope is joined.
            task_error = std::current_exception();
        }

        // Always join scope to wait for any sub-spawned coroutines.
        // Without this, the scope's JoinHandle would be destroyed
        // while FinalAwaiters still reference it (use-after-free).
        co_await scope.join();

        // NOTE(review): `context` was captured before the co_await points
        // above.  If this coroutine can be resumed by a different worker
        // thread, the updates below go to the original worker's context --
        // confirm that is intended.

        if (!task_error) {
            DFTRACER_UTILS_LOG_DEBUG(
                "Task ID %ld ('%s') completed successfully", task->get_id(),
                task->get_name());

            // Mark task completion
            mark_activity();
            ++tasks_completed_;
            context->tasks_executed++;

            // Registry: RUNNING -> COMPLETED
            {
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto it = task_registry_.find(task->get_id());
                if (it != task_registry_.end()) {
                    it->second.state = TaskInfo::COMPLETED;
                    it->second.completed_at = std::chrono::steady_clock::now();
                    it->second.location = TaskInfo::DONE;

                    // Update parent's completed children count
                    if (it->second.parent_task_id != -1) {
                        auto parent_it =
                            task_registry_.find(it->second.parent_task_id);
                        if (parent_it != task_registry_.end()) {
                            parent_it->second.completed_children++;
                        }
                    }
                }
            }

            // Notify scheduler
            notify_completion(task);

        } else {
            // Classify the failure once: log it and derive the message
            // stored in the registry.  The bookkeeping that follows is the
            // same for std::exception and for anything else.
            std::string error_message;
            try {
                std::rethrow_exception(task_error);
            } catch (const std::exception& e) {
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
                                         task->get_id(), task->get_name(),
                                         e.what());
                error_message = e.what();
            } catch (...) {
                DFTRACER_UTILS_LOG_ERROR(
                    "Task ID %ld ('%s') failed with unknown exception",
                    task->get_id(), task->get_name());
                error_message = "Unknown exception";
            }

            task->set_exception(task_error);

            // A failed task still counts as finished.
            mark_activity();
            ++tasks_completed_;
            context->tasks_executed++;

            // Registry: RUNNING -> FAILED
            {
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto it = task_registry_.find(task->get_id());
                if (it != task_registry_.end()) {
                    it->second.state = TaskInfo::FAILED;
                    it->second.completed_at =
                        std::chrono::steady_clock::now();
                    it->second.error_message = error_message;
                    it->second.location = TaskInfo::DONE;
                }
            }

            notify_completion(task);
        }
    }

    // Clear current task info
    context->current_task_id = -1;
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name.clear();
    }

    co_return;
}
13,273!
814

815
void Executor::submit_task(std::shared_ptr<Task> task,
1,597✔
816
                           std::shared_ptr<std::any> input,
817
                           TaskIndex parent_task_id) {
818
    // Register in task registry
819
    {
820
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
1,597!
821

822
        auto [it, inserted] = task_registry_.emplace(
2,397!
823
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
1,598!
824
            std::forward_as_tuple());
2,397✔
825

826
        if (inserted) {
1,598!
827
            it->second.task_id = task->get_id();
1,598!
828
            it->second.parent_task_id = parent_task_id;
1,598!
829
            it->second.name = task->get_name();
1,598!
830
            it->second.state = TaskInfo::QUEUED;
1,598!
831
            it->second.queued_at = std::chrono::steady_clock::now();
1,598!
832
            it->second.location = TaskInfo::SHARED_QUEUE;
1,598!
833
            it->second.worker_id = static_cast<std::size_t>(-1);
1,598!
834

835
            if (parent_task_id != -1) {
1,598✔
836
                auto parent_it = task_registry_.find(parent_task_id);
886!
837
                if (parent_it != task_registry_.end()) {
886✔
838
                    parent_it->second.child_task_ids.push_back(task->get_id());
878!
839
                }
439✔
840
            }
443✔
841
        }
799✔
842
    }
1,598✔
843

844
    ++total_tasks_submitted_;
1,598✔
845

846
    // Create Coro, set executor on promise, enqueue released handle
847
    auto coro = run_task(std::move(task), std::move(input));
2,397!
848
    coro.handle().promise().executor = this;
1,598!
849
    enqueue(coro.release());
1,598!
850
}
1,598✔
851

852
/// Gives `coro` an id drawn from next_coro_task_id_, records it in the task
/// registry and enqueues it.
///
/// @param coro     Coroutine to schedule; its promise receives the id and a
///                 pointer to this executor.
/// @param name     Name stored in the registry record.
/// @param tid_out  Optional slot; when non-null the id is stored into it
///                 before the coroutine is enqueued.
/// @return The id assigned to the coroutine.
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    const TaskIndex id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto frame = coro.handle();
    frame.promise().task_id = id;
    frame.promise().executor = this;

    {
        std::unique_lock<std::shared_mutex> guard(registry_mutex_);
        auto [slot, is_new] = task_registry_.emplace(std::piecewise_construct,
                                                     std::forward_as_tuple(id),
                                                     std::forward_as_tuple());
        // An id that is already present keeps its existing record untouched.
        if (is_new) {
            const auto queued_time = std::chrono::steady_clock::now();
            auto& info = slot->second;
            info.task_id = id;
            info.parent_task_id = -1;
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = queued_time;
            info.location = TaskInfo::SHARED_QUEUE;
            // No worker has picked the coroutine up yet.
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }

    ++total_tasks_submitted_;

    // Publish the id to the caller-provided slot before the coroutine is
    // handed to the queue.
    if (tid_out) {
        tid_out->store(id, std::memory_order_release);
    }

    auto released = coro.release();
    enqueue(released, id);
    return id;
}
885

886
/// Marks the registry record for `id` as COMPLETED and counts the completion.
///
/// Does nothing when `id` is not registered or the record is already
/// COMPLETED, so repeated calls count a task at most once. A record in any
/// other state is overwritten with COMPLETED.
void Executor::mark_coro_completed(TaskIndex id) {
    {
        std::unique_lock<std::shared_mutex> guard(registry_mutex_);

        const auto entry = task_registry_.find(id);
        if (entry == task_registry_.end()) {
            return;
        }

        auto& info = entry->second;
        if (info.state == TaskInfo::COMPLETED) {
            return;
        }

        const auto finished = std::chrono::steady_clock::now();
        // A start time that was never set is backfilled with the completion
        // time.
        if (info.started_at.time_since_epoch().count() == 0) {
            info.started_at = finished;
        }
        info.state = TaskInfo::COMPLETED;
        info.completed_at = finished;
        info.location = TaskInfo::DONE;
    }

    // Reached only when the record was updated; the registry lock is already
    // released here.
    ++tasks_completed_;
}
2,288✔
905

906
/// Pushes `handle` onto destroy_queue_; a null handle is ignored.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
10,283✔
911

912
void Executor::drain_destroy_queue() {
90,921✔
913
    std::coroutine_handle<> to_destroy;
90,921✔
914
    while (destroy_queue_.try_dequeue(to_destroy)) {
101,374✔
915
        if (to_destroy) {
10,416✔
916
            to_destroy.destroy();
10,406!
917
        }
5,096✔
918
    }
919
}
90,851✔
920

921
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc