• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28312924461

28 Jun 2026 05:45AM UTC coverage: 52.278% (-0.08%) from 52.356%
28312924461

push

github

hariharan-devarajan
feat(ci): add Valgrind memory checking for C++ and Python tests

- Add valgrind-cpp and valgrind-python CI jobs with ccache/CPM caching
- Add Makefile targets and helper scripts for native/Docker Valgrind runs
- Add suppression files for C++, Python, and MPI runtime noise
- Update test binaries to skip timing checks under Valgrind
- Add dict_set_steal helper to reduce Python reference counting errors
- Fix memory handling in Arrow column builder and Python objects
- Refactor executor queue to track task IDs for better leak attribution

37422 of 93043 branches covered (40.22%)

Branch coverage included in aggregate %.

129 of 144 new or added lines in 11 files covered. (89.58%)

21 existing lines in 9 files now uncovered.

33719 of 43039 relevant lines covered (78.35%)

20334.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.58
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/io/io_backend_factory.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/core/tasks/task.h>
8

9
#include <chrono>
10
#include <coroutine>
11
#include <exception>
12
#include <vector>
13

14
namespace dftracer::utils {
15

16
// Opaque per-thread pointer to the current worker context. It is read via
// get_current_worker_context() and written via set_current_worker_context().
static thread_local void* tls_current_worker_context = nullptr;
17

18
// Returns the calling thread's worker-context pointer, or nullptr when the
// thread has not registered one.
void* get_current_worker_context() {
    return tls_current_worker_context;
}
3,116✔
19

20
void set_current_worker_context(void* context) {
6,615✔
21
    tls_current_worker_context = context;
6,615✔
22
}
6,615✔
23

24
// Executor associated with the calling thread. It stays nullptr until set
// through Executor::set_current() or by a worker thread when it starts.
static thread_local Executor* tls_current_executor = nullptr;
25

26
// Returns the executor bound to the calling thread, or nullptr if none is.
Executor* Executor::current() noexcept {
    return tls_current_executor;
}
720,659✔
27

28
// Binds `e` to the calling thread and returns the executor that was bound
// before the call, so the caller can restore it later.
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
33

34
// Thread-local list of coroutine handles to destroy after the current
35
// resume() returns.  FinalAwaiter pushes here instead of the shared
36
// destroy_queue_ to avoid another worker freeing the frame while
37
// the coroutine-suspend machinery is still accessing it.
38
// One list per thread (thread_local). schedule_thread_local_destroy() appends
// to it and drain_thread_local_destroys() empties it.
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
39

40
// Records `h` on the calling thread's pending-destroy list. The frame is not
// touched here; it is handled later by drain_thread_local_destroys().
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
    tls_pending_destroys.emplace_back(h);
}
10,358✔
43

44
void drain_thread_local_destroys() {
37,473✔
45
    Executor* exec = Executor::current();
37,473✔
46
    for (auto h : tls_pending_destroys) {
47,861✔
47
        if (!h) continue;
10,498✔
48
        if (exec) {
10,426!
49
            exec->schedule_destroy(h);
10,426!
50
        } else {
5,149✔
51
            h.destroy();
×
52
        }
53
    }
54
    tls_pending_destroys.clear();
37,456✔
55
}
37,304✔
56

57
// Builds an executor from `config`. Worker threads are created in start(),
// not here.
//
// A zero `num_threads` or `io_pool_size` in the config selects
// dftracer_utils_hardware_concurrency(); if that also yields zero the
// corresponding pool size falls back to 2.
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0
                       ? dftracer_utils_hardware_concurrency()
                       : config.num_threads),
      last_activity_ns_(
          std::chrono::steady_clock::now().time_since_epoch().count()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size == 0
                        ? dftracer_utils_hardware_concurrency()
                        : config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold) {
    // Hardware concurrency may be reported as 0; never run with empty pools.
    if (num_threads_ == 0) {
        num_threads_ = 2;
    }
    if (io_pool_size_ == 0) {
        io_pool_size_ = 2;
    }
#ifdef DFTRACER_UTILS_VALGRIND_MODE
    // Per-Executor thread churn dominates runtime when Valgrind serializes and
    // instruments every thread, so cap the pools.
    if (num_threads_ > 2) num_threads_ = 2;
    if (io_pool_size_ > 2) io_pool_size_ = 2;
#endif
    // duration::count() returns the duration's rep, which is not guaranteed
    // to be `long long`; cast so the arguments always match %lld.
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
1,626✔
87

88
// Stops the executor if it is still running, then drains the destroy queue
// once more. shutdown() returns immediately when the executor is not
// running, so the explicit drain here still runs in that case.
Executor::~Executor() {
    shutdown();
    drain_destroy_queue();
}
1,626✔
92

93
void Executor::start() {
1,074✔
94
    if (running_) {
1,074!
95
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
×
96
        return;
×
97
    }
98

99
    running_ = true;
1,074✔
100
    workers_.clear();
1,074✔
101
    workers_.reserve(num_threads_);
1,074✔
102

103
    timer_service_.start();
1,074✔
104

105
    // Create and start I/O backend before workers so workers
106
    // can use it immediately.
107
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
2,148✔
108
                                        io_batch_threshold_);
1,074✔
109
    io_backend_->start();
1,074✔
110

111
    // Create all worker contexts first so workers_ is stable before any
112
    // worker thread can try to iterate/steal from it.
113
    for (std::size_t i = 0; i < num_threads_; ++i) {
4,390✔
114
        auto worker = std::make_unique<WorkerContext>(i);
3,316!
115
        worker->last_activity = std::chrono::steady_clock::now();
3,316✔
116
        workers_.push_back(std::move(worker));
3,316!
117
    }
3,316✔
118

119
    // Start worker threads after all contexts are in place.
120
    for (auto& worker : workers_) {
4,390✔
121
        worker->thread =
3,316✔
122
            std::thread(&Executor::worker_thread, this, worker.get());
5,035!
123
    }
124

125
    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
126
                             num_threads_);
127
}
537✔
128

129
void Executor::shutdown() {
2,768✔
130
    if (!running_) {
2,768✔
131
        return;
1,694✔
132
    }
133

134
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
135
    running_ = false;
1,074✔
136
    wake_all_workers();
1,074✔
137

138
    // Join all worker threads (must happen before io_backend_ is
139
    // destroyed, since workers call io_backend_->poll() when idle).
140
    for (auto& worker : workers_) {
4,390✔
141
        if (worker->thread.joinable()) {
3,316✔
142
            worker->thread.join();
3,316!
143
        }
1,597✔
144
    }
145

146
    // Stop I/O backend AFTER joining workers (workers may poll the
147
    // backend) but BEFORE clearing workers_.  The I/O backend's
148
    // completion thread may still call enqueue() -> wake_all_workers()
149
    // which accesses WorkerContext cv/mutex, so workers_ must remain
150
    // alive until the completion thread has exited.
151
    if (io_backend_) {
1,074✔
152
        io_backend_->stop();
1,074✔
153
        io_backend_.reset();
1,074✔
154
    }
537✔
155

156
    // Destroy deferred frames and orphaned run-queue entries BEFORE
157
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
158
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
159
    drain_destroy_queue();
1,074✔
160
    {
161
        RunQueueEntry orphan;
1,074✔
162
        while (run_queue_.try_dequeue(orphan)) {
1,103✔
163
            if (orphan.handle) {
29✔
164
                orphan.handle.destroy();
29!
165
            }
14✔
166
        }
167
    }
168

169
    workers_.clear();
1,074✔
170
    timer_service_.stop();
1,074✔
171

172
    // Drain the main thread's thread-local destroy list (for
173
    // coroutines whose FinalAwaiter ran on the main thread).
174
    drain_thread_local_destroys();
1,074✔
175

176
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
177
}
1,384✔
178

179
void Executor::reset() {
×
180
    // Queue will be reset by caller if needed
181
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
182
}
×
183

184
// Installs the callback that notify_completion() invokes for each finished
// task, replacing any callback set earlier.
void Executor::set_completion_callback(CompletionCallback callback) {
    completion_callback_ = std::move(callback);
}
634✔
187

188
// Body of every worker thread; runs until running_ becomes false.
//
// Each iteration either resumes one coroutine taken from run_queue_ or, when
// the queue is empty, flushes and polls the I/O backend and then blocks on
// work_signal_ until it changes.
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    // Publish this thread's identity through TLS before touching any work.
    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    while (running_) {
        RunQueueEntry next;

        // Sample the work signal BEFORE inspecting the queue. Any increment
        // made by enqueue() + signal_global_work() after this load is then
        // seen by the wait below, even if the dequeue attempt found the queue
        // empty. Sampling after the queue check would open a window in which
        // work arrives, the signal is bumped, and the worker then sleeps on
        // the already-updated value while the work sits in the queue.
        const std::uint64_t signal_snapshot =
            work_signal_.load(std::memory_order_acquire);

        if (!run_queue_.try_dequeue(next)) {
            // Nothing runnable: prepare to sleep.
            context->is_idle.store(true, std::memory_order_relaxed);

            if (io_backend_) {
                // Submit any batched I/O first so pending SQEs go out even
                // when no new work is arriving.
                io_backend_->flush();

                // Opportunistic poll: reaped completions enqueue new work,
                // so skip the sleep and go straight back to the run queue.
                if (io_backend_->poll(0) > 0) {
                    drain_destroy_queue();
                    continue;
                }
            }

            drain_destroy_queue();
            work_signal_.wait(signal_snapshot, std::memory_order_acquire);
            continue;
        }

        // A handle arrived via enqueue() / schedule_coroutine_resumption().
        coro::reset_timeslice();
        context->is_idle.store(false, std::memory_order_relaxed);

        std::coroutine_handle<> runnable = next.handle;
        if (runnable && !runnable.done()) {
            // The id travels in the queue entry rather than in the promise,
            // because foreign promise types have no task_id field.
            const TaskIndex tracked_id = next.task_id;
            if (tracked_id >= 0) {
                std::unique_lock<std::shared_mutex> guard(registry_mutex_);
                auto found = task_registry_.find(tracked_id);
                if (found != task_registry_.end() &&
                    found->second.state == TaskInfo::QUEUED) {
                    auto& info = found->second;
                    info.state = TaskInfo::RUNNING;
                    info.started_at = std::chrono::steady_clock::now();
                    info.worker_id = context->worker_id;
                    info.location = TaskInfo::EXECUTING;
                    ++tasks_started_;
                }
            }
            DFTRACER_TSAN_ACQUIRE(runnable.address());
            runnable.resume();
        }

        // resume() has fully returned, so any frame that FinalAwaiter
        // deferred to this thread is parked at final_suspend and nothing
        // references it any more; it is safe to release it now.
        drain_thread_local_destroys();
        drain_destroy_queue();
    }

    // One last pass for frames deferred during the final resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
3,281✔
273

274
void Executor::notify_completion(std::shared_ptr<Task> task) {
1,545✔
275
    // No mutex needed -- the callback is set exactly once during Scheduler
276
    // construction (before any task is submitted) and never modified after.
277
    // on_task_completed uses only atomics, lock-free queues, and properly-
278
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
279
    if (completion_callback_) {
1,545✔
280
        completion_callback_(task);
1,545!
281
    }
773✔
282
}
1,544✔
283

284
void Executor::request_shutdown() {
10✔
285
    if (shutdown_requested_.load()) {
10!
286
        return;  // Already requested
×
287
    }
288

289
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
290
    shutdown_requested_ = true;
10✔
291
}
5✔
292

293
// Queues `handle` for resumption on a worker. This is a thin alias for
// enqueue(), so every coroutine handle goes through one path.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    enqueue(handle);
}
13,451✔
297

298
void Executor::signal_global_work() {
33,207✔
299
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
33,207✔
300
    // Wake one worker, not all. Each enqueue adds one unit of work,
301
    // so one worker is sufficient. Avoids thundering herd where all
302
    // N threads wake, N-1 find no work, and go back to sleep.
303
    wake_one_worker();
33,207✔
304
}
33,207✔
305

306
// Wakes at most one thread blocked in work_signal_.wait().
void Executor::wake_one_worker() {
    work_signal_.notify_one();
}
33,223✔
307

308
void Executor::wake_all_workers() {
1,074✔
309
    work_signal_.fetch_add(1, std::memory_order_release);
1,074✔
310
    work_signal_.notify_all();
1,074✔
311
}
1,074✔
312

313
// Helper function for when_all.h (avoids circular dependency)
314
void schedule_coroutine_resumption_helper(Executor* executor,
13,455✔
315
                                          std::coroutine_handle<> handle) {
316
    if (executor) {
13,455✔
317
        executor->schedule_coroutine_resumption(handle);
13,455✔
318
    }
6,539✔
319
}
13,453✔
320

321
// Helper function for when_any.h (avoids circular dependency)
322
void schedule_destroy_helper(Executor* executor,
14✔
323
                             std::coroutine_handle<> handle) {
324
    if (executor && handle) {
14!
325
        executor->schedule_destroy(handle);
14✔
326
    }
8✔
327
}
14✔
328

329
// Puts `handle` (tagged with `task_id`) on the run queue and signals that
// work is available. Null handles and handles that are already done are
// ignored.
void Executor::enqueue(std::coroutine_handle<> handle, TaskIndex task_id) {
    if (!handle) {
        return;  // Invalid
    }
    if (handle.done()) {
        return;  // Already completed
    }

    DFTRACER_TSAN_RELEASE(handle.address());
    run_queue_.enqueue(RunQueueEntry{handle, task_id});
    signal_global_work();
}
16,298✔
338

339
bool Executor::is_responsive() const {
4✔
340
    // If shutdown was requested, consider unresponsive
341
    if (shutdown_requested_.load()) {
4!
342
        return false;
×
343
    }
344

345
    // If not running, not responsive
346
    if (!running_.load()) {
4✔
347
        return false;
×
348
    }
349

350
    // Check if all threads might be deadlocked
351
    // (all threads busy but no progress for a while)
352
    std::size_t started = tasks_started_.load();
4✔
353
    std::size_t completed = tasks_completed_.load();
4✔
354
    std::size_t active = started - completed;
4✔
355

356
    if (active >= num_threads_) {
4✔
357
        // All threads busy - check if making progress
358
        auto now = std::chrono::steady_clock::now();
2✔
359
        auto last_ns = last_activity_ns_.load(std::memory_order_acquire);
2✔
360
        auto last_tp = std::chrono::steady_clock::time_point(
1✔
361
            std::chrono::steady_clock::duration(last_ns));
2✔
362
        auto idle_time = now - last_tp;
2!
363

364
        // If all threads busy but no activity for deadlock_timeout,
365
        // likely deadlocked
366
        if (idle_time > deadlock_timeout_) {
2!
367
            DFTRACER_UTILS_LOG_WARN(
×
368
                "Executor appears deadlocked: %zu threads, %zu active "
369
                "tasks, idle for %lld ms",
370
                num_threads_, active,
371
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
372
                    .count());
373
            return false;
×
374
        }
375
    }
1✔
376

377
    return true;
4✔
378
}
2✔
379

380
void Executor::mark_activity() {
3,088✔
381
    last_activity_ns_.store(
4,626✔
382
        std::chrono::steady_clock::now().time_since_epoch().count(),
3,088✔
383
        std::memory_order_release);
384
}
3,087✔
385

386
void Executor::update_task_location(TaskIndex task_id,
×
387
                                    TaskInfo::Location location,
388
                                    std::size_t worker_id) {
389
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
390
    auto it = task_registry_.find(task_id);
×
391
    if (it != task_registry_.end()) {
×
392
        it->second.location = location;
×
393
        if (location == TaskInfo::LOCAL_QUEUE ||
×
394
            location == TaskInfo::EXECUTING) {
395
            it->second.worker_id = worker_id;
×
396
        }
397
    }
398
}
×
399

400
// Builds a snapshot of the executor's state: overall counters, per-state task
// counts, recent error messages, one progress tree per root task, and the
// status of every worker.
//
// The registry is held under a shared lock for the whole call, including the
// recursive build_task_progress_tree() calls.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> lock(registry_mutex_);
    ExecutorProgress progress;

    // Overall stats
    progress.total_tasks_submitted = total_tasks_submitted_.load();
    progress.tasks_completed = tasks_completed_.load();

    // Count task states
    progress.tasks_queued = 0;
    progress.tasks_running = 0;
    progress.tasks_failed = 0;

    for (const auto& [task_id, info] : task_registry_) {
        switch (info.state) {
            case TaskInfo::QUEUED:
                progress.tasks_queued++;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                progress.tasks_running++;
                break;
            case TaskInfo::COMPLETED:
                // Already counted via tasks_completed_.
                break;
            case TaskInfo::FAILED:
                progress.tasks_failed++;
                if (!info.error_message.empty()) {
                    progress.recent_errors.push_back(
                        {task_id, info.error_message});
                }
                break;
        }
    }

    for (std::size_t i = 0; i < workers_.size(); ++i) {
        // No per-worker local queues anymore; report 0.
        progress.worker_queue_depths.push_back(0);
    }

    // Build task trees (find root tasks). The freshly built tree is pushed
    // as a temporary so it is moved into the result rather than deep-copied,
    // matching how children are appended in build_task_progress_tree().
    std::unordered_set<TaskIndex> processed;
    for (const auto& [task_id, info] : task_registry_) {
        if (info.parent_task_id == -1) {  // Root task
            progress.root_tasks.push_back(
                build_task_progress_tree(task_id, processed));
        }
    }

    // Worker states
    for (const auto& worker : workers_) {
        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        const TaskIndex current_id = worker->current_task_id.load();
        if (current_id != -1) {
            status.current_task_id = current_id;
            // The name is guarded by its own mutex, separate from the
            // registry lock.
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;

        // `status` is not used again; move it to avoid copying the name.
        progress.workers.push_back(std::move(status));
    }

    return progress;
}
42!
469

470
// Recursively converts the registry entry for `task_id` and all of its
// children into a TaskProgress tree.
//
// `processed` collects every id visited so far. An id seen a second time
// yields a "[Cycle Detected]" placeholder node, and an id missing from the
// registry yields a "[Not Found]" placeholder. This function takes no lock
// itself.
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    using Millis = std::chrono::duration<double, std::milli>;

    TaskProgress node;

    // insert() reports whether the id was new, which doubles as the cycle
    // check.
    const bool first_visit = processed.insert(task_id).second;
    if (!first_visit) {
        node.task_id = task_id;
        node.name = "[Cycle Detected]";
        return node;
    }

    const auto entry = task_registry_.find(task_id);
    if (entry == task_registry_.end()) {
        node.task_id = task_id;
        node.name = "[Not Found]";
        return node;
    }

    const TaskInfo& info = entry->second;
    node.task_id = task_id;
    node.name = info.name;

    // Human-readable state.
    switch (info.state) {
        case TaskInfo::QUEUED:
            node.state = "queued";
            break;
        case TaskInfo::RUNNING:
            node.state = "running";
            break;
        case TaskInfo::WAITING:
            node.state = "waiting";
            break;
        case TaskInfo::COMPLETED:
            node.state = "completed";
            break;
        case TaskInfo::FAILED:
            node.state = "failed";
            break;
    }

    // Durations in milliseconds. A queued task has not started, so only its
    // queue time (up to now) is known. Otherwise queue time ends at
    // started_at and execution runs until now (in flight) or completed_at.
    const auto now = std::chrono::steady_clock::now();
    if (info.state == TaskInfo::QUEUED) {
        node.queued_duration_ms = Millis(now - info.queued_at).count();
        node.execution_duration_ms = 0;
    } else {
        node.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();

        const bool in_flight = info.state == TaskInfo::RUNNING ||
                               info.state == TaskInfo::WAITING;
        if (in_flight) {
            node.execution_duration_ms =
                Millis(now - info.started_at).count();
        } else {  // COMPLETED or FAILED
            node.execution_duration_ms =
                Millis(info.completed_at - info.started_at).count();
        }
    }

    // Completion percentage: share of finished children, or all-or-nothing
    // for a task without children.
    node.total_subtasks = info.child_task_ids.size();
    node.completed_subtasks = info.completed_children.load();
    if (node.total_subtasks > 0) {
        node.progress_percentage =
            (100.0 * static_cast<double>(node.completed_subtasks)) /
            static_cast<double>(node.total_subtasks);
    } else if (info.state == TaskInfo::COMPLETED) {
        node.progress_percentage = 100.0;
    } else {
        node.progress_percentage = 0.0;
    }

    // Human-readable location.
    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            node.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            node.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            node.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            node.location = "done";
            break;
    }

    // Descend into the children.
    for (TaskIndex child_id : info.child_task_ids) {
        node.children.push_back(build_task_progress_tree(child_id, processed));
    }

    return node;
}
48!
575

576
// ============================================================================
577
// Phase 3: Coro-based task execution
578
// ============================================================================
579

580
// Coroutine wrapper that executes one Task on a worker.
//
// Steps: mark the task RUNNING in the worker context and the registry, await
// the task's own coroutine, join the CoroScope, then record COMPLETED or
// FAILED and notify the completion callback. Returns immediately when no
// worker context is registered on the calling thread or `task` is null.
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
                              std::shared_ptr<std::any> input) {
    // The worker context comes from TLS (set by worker_thread at start).
    // NOTE(review): `worker` is captured before the first suspension point.
    // If this coroutine can be resumed by a different worker thread after a
    // co_await, the bookkeeping further down still targets the original
    // worker's context -- confirm that is intended.
    auto* worker = static_cast<WorkerContext*>(get_current_worker_context());

    if (!worker || !task) {
        co_return;
    }

    // Advertise the task on the worker context.
    worker->current_task_id = task->get_id();
    {
        std::lock_guard<std::mutex> name_guard(worker->task_name_mutex);
        worker->current_task_name = task->get_name();
    }
    worker->last_activity = std::chrono::steady_clock::now();

    // Count the start.
    mark_activity();
    ++tasks_started_;

    // Registry: QUEUED -> RUNNING.
    {
        std::unique_lock<std::shared_mutex> guard(registry_mutex_);
        auto entry = task_registry_.find(task->get_id());
        if (entry != task_registry_.end()) {
            auto& info = entry->second;
            info.state = TaskInfo::RUNNING;
            info.started_at = std::chrono::steady_clock::now();
            info.worker_id = worker->worker_id;
            info.location = TaskInfo::EXECUTING;
        }
    }

    task->result().mark_running();

    {
        CoroScope scope(this);
        std::exception_ptr task_error;

        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
                                 worker->worker_id, task->get_id(),
                                 task->get_name());

        try {
            auto coro_task = task->execute(scope, *input);
            // Hand the executor to the CoroTask's PromiseBase so nested
            // awaitables (when_any, channel, etc.) can schedule resumptions.
            // run_task() itself is a Coro (CoroPromise, not PromiseBase), so
            // the usual propagation in CoroTask::await_suspend does not fire.
            coro_task.handle().promise().set_executor(this);
            std::any result = co_await std::move(coro_task);

            // Publish the value through TaskResult.
            task->set_result(std::move(result));
        } catch (...) {
            task_error = std::current_exception();
        }

        // The scope is joined on both the success and the failure path so
        // sub-spawned coroutines finish first. Otherwise the scope's
        // JoinHandle would be destroyed while FinalAwaiters still reference
        // it (use-after-free).
        co_await scope.join();

        if (task_error) {
            // Bookkeeping shared by both failure handlers below. `reason`
            // becomes the registry's error message.
            auto record_failure = [&](const char* reason) {
                task->set_exception(task_error);

                mark_activity();
                ++tasks_completed_;
                worker->tasks_executed++;

                {
                    std::unique_lock<std::shared_mutex> guard(registry_mutex_);
                    auto entry = task_registry_.find(task->get_id());
                    if (entry != task_registry_.end()) {
                        entry->second.state = TaskInfo::FAILED;
                        entry->second.completed_at =
                            std::chrono::steady_clock::now();
                        entry->second.error_message = reason;
                        entry->second.location = TaskInfo::DONE;
                    }
                }

                notify_completion(task);
            };

            // Rethrow only to learn the exception's type and message.
            try {
                std::rethrow_exception(task_error);
            } catch (const std::exception& e) {
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
                                         task->get_id(), task->get_name(),
                                         e.what());
                record_failure(e.what());
            } catch (...) {
                DFTRACER_UTILS_LOG_ERROR(
                    "Task ID %ld ('%s') failed with unknown exception",
                    task->get_id(), task->get_name());
                record_failure("Unknown exception");
            }
        } else {
            DFTRACER_UTILS_LOG_DEBUG(
                "Task ID %ld ('%s') completed successfully", task->get_id(),
                task->get_name());

            // Count the completion.
            mark_activity();
            ++tasks_completed_;
            worker->tasks_executed++;

            // Registry: RUNNING -> COMPLETED.
            {
                std::unique_lock<std::shared_mutex> guard(registry_mutex_);
                auto entry = task_registry_.find(task->get_id());
                if (entry != task_registry_.end()) {
                    entry->second.state = TaskInfo::COMPLETED;
                    entry->second.completed_at =
                        std::chrono::steady_clock::now();
                    entry->second.location = TaskInfo::DONE;

                    // Credit the parent, if there is one and it is known.
                    if (entry->second.parent_task_id != -1) {
                        auto parent =
                            task_registry_.find(entry->second.parent_task_id);
                        if (parent != task_registry_.end()) {
                            parent->second.completed_children++;
                        }
                    }
                }
            }

            // Tell the scheduler.
            notify_completion(task);
        }
    }

    // The worker no longer runs this task.
    worker->current_task_id = -1;
    {
        std::lock_guard<std::mutex> name_guard(worker->task_name_mutex);
        worker->current_task_name.clear();
    }

    co_return;
}
13,060!
742

743
void Executor::submit_task(std::shared_ptr<Task> task,
1,574✔
744
                           std::shared_ptr<std::any> input,
745
                           TaskIndex parent_task_id) {
746
    // Register in task registry
747
    {
748
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
1,574!
749

750
        auto [it, inserted] = task_registry_.emplace(
2,361!
751
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
1,574!
752
            std::forward_as_tuple());
2,361✔
753

754
        if (inserted) {
1,574!
755
            it->second.task_id = task->get_id();
1,574!
756
            it->second.parent_task_id = parent_task_id;
1,574!
757
            it->second.name = task->get_name();
1,574!
758
            it->second.state = TaskInfo::QUEUED;
1,574!
759
            it->second.queued_at = std::chrono::steady_clock::now();
1,574!
760
            it->second.location = TaskInfo::SHARED_QUEUE;
1,574!
761
            it->second.worker_id = static_cast<std::size_t>(-1);
1,574!
762

763
            if (parent_task_id != -1) {
1,574✔
764
                auto parent_it = task_registry_.find(parent_task_id);
882!
765
                if (parent_it != task_registry_.end()) {
882✔
766
                    parent_it->second.child_task_ids.push_back(task->get_id());
874!
767
                }
441✔
768
            }
445✔
769
        }
787✔
770
    }
1,574✔
771

772
    ++total_tasks_submitted_;
1,574✔
773

774
    // Create Coro, set executor on promise, enqueue released handle
775
    auto coro = run_task(std::move(task), std::move(input));
2,361!
776
    coro.handle().promise().executor = this;
1,574!
777
    enqueue(coro.release());
1,574!
778
}
1,574✔
779

780
// Enqueue a bare coroutine while tracking it in the task registry.
//
// Draws an ID from next_coro_task_id_ (the counter is decremented on each
// call), stamps the ID and this executor onto the coroutine's promise,
// registers a QUEUED entry under that ID, bumps the submitted-task counter,
// optionally publishes the ID through `tid_out`, and finally enqueues the
// released handle together with the ID.
//
// coro     coroutine to schedule; its handle is released to the queue.
// name     label stored in the registry entry.
// tid_out  optional sink that receives the assigned ID before the enqueue.
//
// Returns the ID assigned to the coroutine.
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    const TaskIndex assigned_id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = assigned_id;
    promise.executor = this;

    {
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);
        auto [slot, is_new] = task_registry_.emplace(
            std::piecewise_construct, std::forward_as_tuple(assigned_id),
            std::forward_as_tuple());
        if (is_new) {
            const auto queued_time = std::chrono::steady_clock::now();
            auto& info = slot->second;
            info.task_id = assigned_id;
            info.parent_task_id = -1;
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = queued_time;
            info.location = TaskInfo::SHARED_QUEUE;
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }
    ++total_tasks_submitted_;

    // Publish the ID before the handle is handed to the queue.
    if (tid_out) {
        tid_out->store(assigned_id, std::memory_order_release);
    }

    enqueue(coro.release(), assigned_id);
    return assigned_id;
}
813

814
// Mark the registry entry for `id` as COMPLETED.
//
// Does nothing when the ID is not registered or the entry is already
// COMPLETED, so repeated calls for the same ID count the completion once.
// tasks_completed_ is incremented after the registry lock is released.
void Executor::mark_coro_completed(TaskIndex id) {
    bool newly_completed = false;
    {
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);
        const auto entry = task_registry_.find(id);
        const bool needs_update =
            entry != task_registry_.end() &&
            entry->second.state != TaskInfo::COMPLETED;
        if (needs_update) {
            auto& info = entry->second;
            const auto finished_at = std::chrono::steady_clock::now();
            // A zero-valued start time is treated as "never recorded" and is
            // backfilled with the completion time.
            if (info.started_at.time_since_epoch().count() == 0) {
                info.started_at = finished_at;
            }
            info.state = TaskInfo::COMPLETED;
            info.completed_at = finished_at;
            info.location = TaskInfo::DONE;
            newly_completed = true;
        }
    }
    if (newly_completed) {
        ++tasks_completed_;
    }
}
2,280✔
833

834
// Queue a coroutine handle on destroy_queue_ for later destruction by
// drain_destroy_queue(). Null handles are ignored.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
10,442✔
839

840
void Executor::drain_destroy_queue() {
89,092✔
841
    std::coroutine_handle<> to_destroy;
89,092✔
842
    while (destroy_queue_.try_dequeue(to_destroy)) {
99,640✔
843
        if (to_destroy) {
10,536✔
844
            to_destroy.destroy();
10,516!
845
        }
5,158✔
846
    }
847
}
89,075✔
848

849
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc