• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23529483807

25 Mar 2026 07:17AM UTC coverage: 48.515% (-1.6%) from 50.098%
23529483807

Pull #57

github

web-flow
Merge 5b1e117ad into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18829 of 49412 branches covered (38.11%)

Branch coverage included in aggregate %.

1584 of 1933 new or added lines in 14 files covered. (81.95%)

3552 existing lines in 135 files now uncovered.

18474 of 27477 relevant lines covered (67.23%)

241072.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.42
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/coro/yield.h>
3
#include <dftracer/utils/core/pipeline/executor.h>
4
#include <dftracer/utils/core/sqlite/vfs.h>
5
#include <dftracer/utils/core/tasks/coro_scope.h>
6
#include <dftracer/utils/core/tasks/task.h>
7

8
#include <chrono>
9
#include <coroutine>
10
#include <exception>
11
#include <vector>
12

13
#include "../io/io_backend_factory.h"
14
#include "../io/io_thread_pool.h"
15

16
namespace dftracer::utils {
17

18
// Per-thread pointer to the worker context of the calling thread.  It is
// null unless set_current_worker_context() has been called on this thread
// (Executor::worker_thread() does so when a worker starts and clears it on
// exit).
static thread_local void* tls_current_worker_context = nullptr;

// Returns the worker context registered for the calling thread, or null.
void* get_current_worker_context() {
    void* const registered = tls_current_worker_context;
    return registered;
}

// Registers `context` as the calling thread's worker context; passing
// nullptr clears the registration.
void set_current_worker_context(void* context) {
    tls_current_worker_context = context;
}
25

26
// Executor associated with the calling thread.  It is null unless installed
// via Executor::set_current() or by Executor::worker_thread().
static thread_local Executor* tls_current_executor = nullptr;

// Returns the calling thread's current executor (may be null).
Executor* Executor::current() noexcept {
    return tls_current_executor;
}

// Installs `e` as the calling thread's current executor and returns the one
// that was installed before (e.g. so a caller can restore it afterwards).
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
35

36
// Non-owning pointer to the SQLite I/O thread pool.  start() creates the pool
// and shutdown() resets it, so this is null outside that window.
io::IoThreadPool* Executor::sqlite_pool() noexcept {
    io::IoThreadPool* const pool = sqlite_pool_.get();
    return pool;
}
39

40
// Thread-local list of coroutine handles to destroy after the current
41
// resume() returns.  FinalAwaiter pushes here instead of the shared
42
// destroy_queue_ to avoid another worker freeing the frame while
43
// the coroutine-suspend machinery is still accessing it.
44
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
45

46
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
2,718✔
47
    tls_pending_destroys.push_back(h);
2,718✔
48
}
2,718✔
49

50
void drain_thread_local_destroys() {
10,795✔
51
    Executor* exec = Executor::current();
10,795✔
52
    for (auto h : tls_pending_destroys) {
13,506✔
53
        if (!h) continue;
2,711!
54
        if (exec) {
2,711!
55
            exec->schedule_destroy(h);
2,711✔
56
        } else {
2,711✔
UNCOV
57
            h.destroy();
×
58
        }
59
    }
60
    tls_pending_destroys.clear();
10,795✔
61
}
10,795✔
62

63
// Builds a stopped executor from `config`.  No threads are created until
// start().  A config.num_threads of 0 selects
// std::thread::hardware_concurrency(), falling back to 2 threads when that
// reports 0.
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0 ? std::thread::hardware_concurrency()
                                           : config.num_threads),
      last_activity_time_(std::chrono::steady_clock::now()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold),
      sqlite_pool_size_(config.sqlite_pool_size) {
    if (num_threads_ == 0) {
        num_threads_ = 2;  // Fallback if hardware_concurrency returns 0
    }
    // duration::count() returns the duration's `rep` type, which need not be
    // `long long` (it is `long` on LP64 Linux).  Cast explicitly so the
    // arguments match the %lld conversions.
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
81

82
// Destroys the executor.  shutdown() returns immediately when the executor
// is not running.  The destroy queue is then drained one final time.
Executor::~Executor() {
    this->shutdown();
    this->drain_destroy_queue();
}
86

87
void Executor::start() {
427✔
88
    if (running_) {
427!
UNCOV
89
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
×
UNCOV
90
        return;
×
91
    }
92

93
    running_ = true;
427✔
94
    workers_.clear();
427✔
95
    workers_.reserve(num_threads_);
427✔
96

97
    timer_service_.start();
427✔
98

99
    // Create and start I/O backend before workers so workers
100
    // can use it immediately.
101
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
854✔
102
                                        io_batch_threshold_);
427✔
103
    io_backend_->start();
427✔
104
    sqlite::register_dftracer_sqlite_vfs(io_backend_.get(), this);
427✔
105

106
    sqlite_pool_ = std::make_unique<io::IoThreadPool>(sqlite_pool_size_);
427✔
107
    sqlite_pool_->start();
427✔
108

109
    // Create all worker contexts first so workers_ is stable before any
110
    // worker thread can try to iterate/steal from it.
111
    for (std::size_t i = 0; i < num_threads_; ++i) {
1,726✔
112
        auto worker = std::make_unique<WorkerContext>(i);
1,299✔
113
        worker->last_activity = std::chrono::steady_clock::now();
1,299✔
114
        workers_.push_back(std::move(worker));
1,299!
115
    }
1,299✔
116

117
    // Start worker threads after all contexts are in place.
118
    for (auto& worker : workers_) {
1,726✔
119
        worker->thread =
1,299✔
120
            std::thread(&Executor::worker_thread, this, worker.get());
1,299✔
121
    }
122

123
    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
124
                             num_threads_);
125
}
427✔
126

127
void Executor::shutdown() {
1,131✔
128
    if (!running_) {
1,131✔
129
        return;
704✔
130
    }
131

132
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
133
    running_ = false;
427✔
134
    wake_all_workers();
427✔
135

136
    // Join all worker threads (must happen before io_backend_ is
137
    // destroyed, since workers call io_backend_->poll() when idle).
138
    for (auto& worker : workers_) {
1,726✔
139
        if (worker->thread.joinable()) {
1,299!
140
            worker->thread.join();
1,299✔
141
        }
1,299✔
142
    }
143

144
    // Stop I/O backend AFTER joining workers (workers may poll the
145
    // backend) but BEFORE clearing workers_.  The I/O backend's
146
    // completion thread may still call enqueue() -> wake_all_workers()
147
    // which accesses WorkerContext cv/mutex, so workers_ must remain
148
    // alive until the completion thread has exited.
149
    if (sqlite_pool_) {
427!
150
        sqlite_pool_->stop();
427✔
151
        sqlite_pool_.reset();
427✔
152
    }
427✔
153

154
    sqlite::unregister_dftracer_sqlite_vfs();
427✔
155

156
    if (io_backend_) {
427!
157
        io_backend_->stop();
427✔
158
        io_backend_.reset();
427✔
159
    }
427✔
160

161
    // Destroy deferred frames and orphaned run-queue entries BEFORE
162
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
163
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
164
    drain_destroy_queue();
427✔
165
    {
166
        std::coroutine_handle<> orphan;
427✔
167
        while (run_queue_.try_dequeue(orphan)) {
441✔
168
            if (orphan) {
14!
169
                orphan.destroy();
14✔
170
            }
14✔
171
        }
172
    }
173

174
    workers_.clear();
427✔
175
    timer_service_.stop();
427✔
176

177
    // Drain the main thread's thread-local destroy list (for
178
    // coroutines whose FinalAwaiter ran on the main thread).
179
    drain_thread_local_destroys();
427✔
180

181
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
182
}
1,131✔
183

UNCOV
184
void Executor::reset() {
×
185
    // Queue will be reset by caller if needed
186
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
UNCOV
187
}
×
188

189
// Installs the callback that notify_completion() invokes for each task that
// finishes, successfully or not.  The assignment is unsynchronised, so set
// it before any task can complete.
void Executor::set_completion_callback(CompletionCallback callback) {
    this->completion_callback_ = std::move(callback);
}
192

193
// Body of each worker thread spawned by start().
//
// Each iteration snapshots work_signal_ and then tries to pop a coroutine
// handle from run_queue_.
//  * No handle: mark the worker idle and flush the I/O backend.  Poll the
//    backend once without blocking, and retry at once if that reaped
//    completions.  Otherwise sleep on the worker's condition variable until
//    running_ is cleared or work_signal_ differs from the snapshot.
//  * A handle: reset the coroutine time slice and mark the worker busy.
//    Resume the handle if it is non-null and not finished; when its promise
//    carries a non-negative task id whose registry entry is still QUEUED,
//    first promote that entry to RUNNING.  Then drain frames deferred to
//    this thread.
// On exit the thread drains its deferred destroys and clears its TLS.
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    while (running_) {
        // Take the snapshot BEFORE inspecting the queue.  Any
        // enqueue() + signal_global_work() that happens after this load
        // changes work_signal_, so the wait predicate below notices it even
        // if try_dequeue() raced with that enqueue and saw an empty queue.
        // Loading the signal after the queue check would let the worker
        // sleep on an already-updated value while work sits in the queue.
        const std::uint64_t signal_snapshot =
            work_signal_.load(std::memory_order_acquire);

        std::coroutine_handle<> next;
        if (!run_queue_.try_dequeue(next)) {
            // ---- idle path ----
            context->is_idle.store(true, std::memory_order_relaxed);
            if (io_backend_) {
                // Submit batched I/O before sleeping (drain trigger), then
                // poll once without blocking.  Reaped completions enqueue new
                // work, so go straight back to the run queue in that case.
                io_backend_->flush();
                if (io_backend_->poll(0) > 0) {
                    continue;
                }
            }
            std::unique_lock<std::mutex> sleep_lock(context->queue_mutex);
            context->cv.wait(sleep_lock, [this, signal_snapshot] {
                return !running_.load(std::memory_order_acquire) ||
                       work_signal_.load(std::memory_order_acquire) !=
                           signal_snapshot;
            });
            continue;
        }

        // ---- busy path ----
        coro::reset_timeslice();
        context->is_idle.store(false, std::memory_order_relaxed);
        if (next && !next.done()) {
            // NOTE(review): every queued handle is reinterpreted as a
            // CoroPromise coroutine.  Handles arriving through
            // schedule_coroutine_resumption() may belong to other promise
            // types; confirm their layout makes reading task_id valid.
            auto as_coro =
                std::coroutine_handle<coro::CoroPromise>::from_address(
                    next.address());
            const TaskIndex tid = as_coro.promise().task_id;
            if (tid >= 0) {
                std::unique_lock<std::shared_mutex> registry_lock(
                    registry_mutex_);
                auto entry = task_registry_.find(tid);
                if (entry != task_registry_.end() &&
                    entry->second.state == TaskInfo::QUEUED) {
                    auto& info = entry->second;
                    info.state = TaskInfo::RUNNING;
                    info.started_at = std::chrono::steady_clock::now();
                    info.worker_id = context->worker_id;
                    info.location = TaskInfo::EXECUTING;
                    ++tasks_started_;
                }
            }  // registry lock released before resuming
            next.resume();
        }
        // Destroy coroutine frames that FinalAwaiter deferred to this
        // thread.  Safe: resume() has fully returned, so the frame is
        // suspended at final_suspend and no code references it.
        drain_thread_local_destroys();
    }

    // Final drain: destroy any frames deferred during the last resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
279

280
void Executor::notify_completion(std::shared_ptr<Task> task) {
699✔
281
    // No mutex needed -- the callback is set exactly once during Scheduler
282
    // construction (before any task is submitted) and never modified after.
283
    // on_task_completed uses only atomics, lock-free queues, and properly-
284
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
285
    if (completion_callback_) {
699!
286
        completion_callback_(task);
699!
287
    }
699✔
288
}
699✔
289

290
void Executor::request_shutdown() {
5✔
291
    if (shutdown_requested_.load()) {
5!
UNCOV
292
        return;  // Already requested
×
293
    }
294

295
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
296
    shutdown_requested_ = true;
5✔
297
}
5✔
298

299
// Asks a worker to resume `handle`.  This is simply the named entry point
// for enqueue(), which ignores null or already-finished handles.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    this->enqueue(handle);
}
303

304
void Executor::signal_global_work() {
9,091✔
305
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
9,091✔
306
    wake_all_workers();
9,091✔
307
}
9,091✔
308

UNCOV
309
void Executor::wake_one_worker() {
×
UNCOV
310
    const std::size_t worker_count = workers_.size();
×
311
    if (worker_count == 0) {
×
312
        return;
×
313
    }
314

UNCOV
315
    const std::size_t worker_index =
×
UNCOV
316
        next_worker_.fetch_add(1, std::memory_order_relaxed) % worker_count;
×
317
    // Lock-then-unlock the worker's mutex before notifying.
318
    // This ensures the worker is either before its predicate check (and will
319
    // see the updated atomic state) or inside cv.wait (and will receive the
320
    // notification). Without this, a notification sent between predicate
321
    // evaluation and cv.wait entry is lost, causing the worker to hang.
UNCOV
322
    workers_[worker_index]->queue_mutex.lock();
×
UNCOV
323
    workers_[worker_index]->queue_mutex.unlock();
×
324
    workers_[worker_index]->cv.notify_one();
×
325
}
×
326

327
void Executor::wake_all_workers() {
9,525✔
328
    for (auto& worker : workers_) {
41,889✔
329
        // Lock-then-unlock ensures the worker is either before its predicate
330
        // check or inside cv.wait before the notification is sent.
331
        // See wake_one_worker() for detailed rationale.
332
        worker->queue_mutex.lock();
32,364✔
333
        worker->queue_mutex.unlock();
32,364✔
334
        worker->cv.notify_all();
32,364✔
335
    }
336
}
9,525✔
337

338
// Helper function for when_all.h (avoids circular dependency)
339
void schedule_coroutine_resumption_helper(Executor* executor,
4,743✔
340
                                          std::coroutine_handle<> handle) {
341
    if (executor) {
4,743!
342
        executor->schedule_coroutine_resumption(handle);
4,743✔
343
    }
4,743✔
344
}
4,743✔
345

346
// Helper function for when_any.h (avoids circular dependency)
347
void schedule_destroy_helper(Executor* executor,
8✔
348
                             std::coroutine_handle<> handle) {
349
    if (executor && handle) {
8!
350
        executor->schedule_destroy(handle);
8✔
351
    }
8✔
352
}
8✔
353

354
// Pushes `handle` onto the shared run queue and signals the workers.
// Null handles and handles that have already finished are dropped silently.
void Executor::enqueue(std::coroutine_handle<> handle) {
    const bool runnable = handle && !handle.done();
    if (runnable) {
        run_queue_.enqueue(handle);
        signal_global_work();
    }
}
362

363
bool Executor::is_responsive() const {
2✔
364
    // If shutdown was requested, consider unresponsive
365
    if (shutdown_requested_.load()) {
2!
UNCOV
366
        return false;
×
367
    }
368

369
    // If not running, not responsive
370
    if (!running_.load()) {
2!
UNCOV
371
        return false;
×
372
    }
373

374
    // Check if all threads might be deadlocked
375
    // (all threads busy but no progress for a while)
376
    std::size_t started = tasks_started_.load();
2✔
377
    std::size_t completed = tasks_completed_.load();
2✔
378
    std::size_t active = started - completed;
2✔
379

380
    if (active >= num_threads_) {
2✔
381
        // All threads busy - check if making progress
382
        std::lock_guard<std::mutex> lock(activity_mutex_);
1✔
383
        auto now = std::chrono::steady_clock::now();
1✔
384
        auto idle_time = now - last_activity_time_;
1!
385

386
        // If all threads busy but no activity for deadlock_timeout,
387
        // likely deadlocked
388
        if (idle_time > deadlock_timeout_) {
1!
UNCOV
389
            DFTRACER_UTILS_LOG_WARN(
×
390
                "Executor appears deadlocked: %zu threads, %zu active "
391
                "tasks, idle for %lld ms",
392
                num_threads_, active,
393
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
394
                    .count());
UNCOV
395
            return false;
×
396
        }
397
    }
1!
398

399
    return true;
2✔
400
}
2✔
401

402
void Executor::mark_activity() {
1,394✔
403
    std::lock_guard<std::mutex> lock(activity_mutex_);
1,394✔
404
    last_activity_time_ = std::chrono::steady_clock::now();
1,394✔
405
}
1,394✔
406

UNCOV
407
void Executor::update_task_location(TaskIndex task_id,
×
408
                                    TaskInfo::Location location,
409
                                    std::size_t worker_id) {
UNCOV
410
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
UNCOV
411
    auto it = task_registry_.find(task_id);
×
412
    if (it != task_registry_.end()) {
×
413
        it->second.location = location;
×
414
        if (location == TaskInfo::LOCAL_QUEUE ||
×
415
            location == TaskInfo::EXECUTING) {
×
416
            it->second.worker_id = worker_id;
×
UNCOV
417
        }
×
418
    }
×
UNCOV
419
}
×
420

421
// Builds a point-in-time progress snapshot, holding registry_mutex_ in
// shared mode throughout.  The snapshot contains:
//  * submitted/completed totals from the atomic counters;
//  * queued, running (RUNNING or WAITING) and failed counts, plus
//    (task id, message) pairs for FAILED tasks with a non-empty message;
//  * one progress tree per root task (parent_task_id == -1);
//  * per-worker status: idle flag and, when set, current task id/name.
//    Queue depths are always reported as 0 because workers have no local
//    queues anymore.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> registry_lock(registry_mutex_);
    ExecutorProgress snapshot;

    snapshot.total_tasks_submitted = total_tasks_submitted_.load();
    snapshot.tasks_completed = tasks_completed_.load();
    snapshot.tasks_queued = 0;
    snapshot.tasks_running = 0;
    snapshot.tasks_failed = 0;

    // One pass over the registry: tally states, collect failure messages and
    // expand each root task into its subtree.
    std::unordered_set<TaskIndex> visited;
    for (const auto& entry : task_registry_) {
        const TaskIndex id = entry.first;
        const TaskInfo& info = entry.second;

        switch (info.state) {
            case TaskInfo::QUEUED:
                ++snapshot.tasks_queued;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                ++snapshot.tasks_running;
                break;
            case TaskInfo::COMPLETED:
                // Already counted via tasks_completed_.
                break;
            case TaskInfo::FAILED:
                ++snapshot.tasks_failed;
                break;
        }

        if (info.state == TaskInfo::FAILED && !info.error_message.empty()) {
            snapshot.recent_errors.push_back({id, info.error_message});
        }

        if (info.parent_task_id == -1) {  // Root task
            snapshot.root_tasks.push_back(
                build_task_progress_tree(id, visited));
        }
    }

    // One pass over the workers: queue depth plus status.
    for (const auto& worker : workers_) {
        // No per-worker local queues anymore; report 0.
        snapshot.worker_queue_depths.push_back(0);

        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        const TaskIndex running_id = worker->current_task_id.load();
        if (running_id != -1) {
            status.current_task_id = running_id;
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;
        snapshot.workers.push_back(status);
    }

    return snapshot;
}
490

491
// Recursively converts the registry entry for `task_id` (and its children)
// into a TaskProgress node.  The caller must hold registry_mutex_.
//
// `processed` collects every id already visited.  A repeat visit yields a
// stub named "[Cycle Detected]", and an id missing from the registry yields
// a stub named "[Not Found]".  Durations are in milliseconds: queued time
// runs from queued_at to started_at (or to now while still QUEUED);
// execution time runs from started_at to now, or to completed_at for
// finished tasks.
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    TaskProgress node;

    // insert().second is false when the id had already been visited.
    if (!processed.insert(task_id).second) {
        // Avoid cycles
        node.task_id = task_id;
        node.name = "[Cycle Detected]";
        return node;
    }

    const auto found = task_registry_.find(task_id);
    if (found == task_registry_.end()) {
        node.task_id = task_id;
        node.name = "[Not Found]";
        return node;
    }

    const TaskInfo& info = found->second;
    node.task_id = task_id;
    node.name = info.name;

    switch (info.state) {
        case TaskInfo::QUEUED:
            node.state = "queued";
            break;
        case TaskInfo::RUNNING:
            node.state = "running";
            break;
        case TaskInfo::WAITING:
            node.state = "waiting";
            break;
        case TaskInfo::COMPLETED:
            node.state = "completed";
            break;
        case TaskInfo::FAILED:
            node.state = "failed";
            break;
    }

    // Any steady_clock interval -> fractional milliseconds.
    const auto as_ms = [](auto span) {
        return std::chrono::duration<double, std::milli>(span).count();
    };
    const auto now = std::chrono::steady_clock::now();
    const bool in_flight =
        info.state == TaskInfo::RUNNING || info.state == TaskInfo::WAITING;

    if (info.state == TaskInfo::QUEUED) {
        node.queued_duration_ms = as_ms(now - info.queued_at);
        node.execution_duration_ms = 0;
    } else if (in_flight) {
        node.queued_duration_ms = as_ms(info.started_at - info.queued_at);
        node.execution_duration_ms = as_ms(now - info.started_at);
    } else {  // COMPLETED or FAILED
        node.queued_duration_ms = as_ms(info.started_at - info.queued_at);
        node.execution_duration_ms =
            as_ms(info.completed_at - info.started_at);
    }

    node.total_subtasks = info.child_task_ids.size();
    node.completed_subtasks = info.completed_children.load();
    if (node.total_subtasks == 0) {
        node.progress_percentage =
            (info.state == TaskInfo::COMPLETED) ? 100.0 : 0.0;
    } else {
        node.progress_percentage =
            (100.0 * static_cast<double>(node.completed_subtasks)) /
            static_cast<double>(node.total_subtasks);
    }

    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            node.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            node.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            node.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            node.location = "done";
            break;
    }

    for (const TaskIndex child : info.child_task_ids) {
        node.children.push_back(build_task_progress_tree(child, processed));
    }

    return node;
}
596

597
// ============================================================================
598
// Phase 3: Coro-based task execution
599
// ============================================================================
600

601
// Coroutine that runs one submitted Task on the current worker thread.
//
// Flow:
//   1. Record the task on the worker context and mark it RUNNING in the
//      registry.
//   2. Run task->execute() inside a CoroScope, then always join that scope.
//   3. Record success (COMPLETED; the parent's completed_children is
//      bumped) or failure (FAILED, with the exception's message).
//   4. Call notify_completion(), then clear the worker's current-task fields.
//
// An exception thrown by the task is captured as an exception_ptr rather
// than handled in place: a co_await (scope.join()) must follow, and
// co_await is not allowed inside a catch handler.
//
// @param task  Task to run.  A null task is ignored.
// @param input Argument dereferenced and passed to task->execute().
//              NOTE(review): assumed non-null; confirm every caller
//              provides one.
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
                              std::shared_ptr<std::any> input) {
    // Get worker context from TLS (set by worker_thread at start).
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());

    if (!context || !task) {
        if (task) {
            // Without a worker context the task is neither executed nor
            // reported to notify_completion(); make the drop visible.
            DFTRACER_UTILS_LOG_WARN(
                "Task ID %ld ('%s') dropped: run_task resumed outside a "
                "worker thread",
                task->get_id(), task->get_name().c_str());
        }
        co_return;
    }

    // Update worker context
    context->current_task_id = task->get_id();
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name = task->get_name();
    }
    context->last_activity = std::chrono::steady_clock::now();

    // Mark task start
    mark_activity();
    ++tasks_started_;

    // Update task registry to RUNNING
    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto it = task_registry_.find(task->get_id());
        if (it != task_registry_.end()) {
            it->second.state = TaskInfo::RUNNING;
            it->second.started_at = std::chrono::steady_clock::now();
            it->second.worker_id = context->worker_id;
            it->second.location = TaskInfo::EXECUTING;
        }
    }

    task->result().mark_running();

    {
        CoroScope scope(this);
        std::exception_ptr task_error;

        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
                                 context->worker_id, task->get_id(),
                                 task->get_name().c_str());

        try {
            auto coro_task = task->execute(scope, *input);
            // Propagate executor to the CoroTask's PromiseBase so that
            // nested awaitables (when_any, channel, etc.) can schedule
            // resumptions.  run_task() is a Coro (CoroPromise, not
            // PromiseBase), so the normal PromiseBase propagation in
            // CoroTask::await_suspend doesn't fire.
            coro_task.handle().promise().set_executor(this);
            std::any result = co_await std::move(coro_task);

            // Set result via TaskResult
            task->set_result(std::move(result));
        } catch (...) {
            task_error = std::current_exception();
        }

        // Always join scope to wait for any sub-spawned coroutines.
        // Without this, the scope's JoinHandle would be destroyed
        // while FinalAwaiters still reference it (use-after-free).
        co_await scope.join();

        if (!task_error) {
            DFTRACER_UTILS_LOG_DEBUG(
                "Task ID %ld ('%s') completed successfully", task->get_id(),
                task->get_name().c_str());

            // Mark task completion
            mark_activity();
            ++tasks_completed_;
            context->tasks_executed++;

            // Update task registry to COMPLETED
            {
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto it = task_registry_.find(task->get_id());
                if (it != task_registry_.end()) {
                    it->second.state = TaskInfo::COMPLETED;
                    it->second.completed_at = std::chrono::steady_clock::now();
                    it->second.location = TaskInfo::DONE;

                    // Update parent's completed children count
                    if (it->second.parent_task_id != -1) {
                        auto parent_it =
                            task_registry_.find(it->second.parent_task_id);
                        if (parent_it != task_registry_.end()) {
                            parent_it->second.completed_children++;
                        }
                    }
                }
            }

            // Notify scheduler
            notify_completion(task);
        } else {
            // Extract a readable message from the captured exception.  Both
            // kinds of failure are then recorded by the same code below;
            // only the log line and message text differ.
            bool is_std_exception = false;
            std::string error_message = "Unknown exception";
            try {
                std::rethrow_exception(task_error);
            } catch (const std::exception& e) {
                is_std_exception = true;
                error_message = e.what();
            } catch (...) {
                // Not derived from std::exception: keep the generic message.
            }

            if (is_std_exception) {
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
                                         task->get_id(),
                                         task->get_name().c_str(),
                                         error_message.c_str());
            } else {
                DFTRACER_UTILS_LOG_ERROR(
                    "Task ID %ld ('%s') failed with unknown exception",
                    task->get_id(), task->get_name().c_str());
            }

            task->set_exception(task_error);

            mark_activity();
            ++tasks_completed_;
            context->tasks_executed++;

            {
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto it = task_registry_.find(task->get_id());
                if (it != task_registry_.end()) {
                    it->second.state = TaskInfo::FAILED;
                    it->second.completed_at =
                        std::chrono::steady_clock::now();
                    it->second.error_message = std::move(error_message);
                    it->second.location = TaskInfo::DONE;
                    // NOTE(review): unlike the success path, a failed child
                    // does not bump its parent's completed_children, so the
                    // parent never reaches 100%.  Confirm this is intended.
                }
            }

            notify_completion(task);
        }
    }

    // Clear current task info
    context->current_task_id = -1;
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name.clear();
    }

    co_return;
}
763

764
void Executor::submit_task(std::shared_ptr<Task> task,
713✔
765
                           std::shared_ptr<std::any> input,
766
                           TaskIndex parent_task_id) {
767
    // Register in task registry
768
    {
769
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
713✔
770

771
        auto [it, inserted] = task_registry_.emplace(
2,139!
772
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
713!
773
            std::forward_as_tuple());
713✔
774

775
        if (inserted) {
713!
776
            it->second.task_id = task->get_id();
713!
777
            it->second.parent_task_id = parent_task_id;
1,426!
778
            it->second.name = task->get_name();
713!
779
            it->second.state = TaskInfo::QUEUED;
713!
780
            it->second.queued_at = std::chrono::steady_clock::now();
1,426!
781
            it->second.location = TaskInfo::SHARED_QUEUE;
713!
782
            it->second.worker_id = static_cast<std::size_t>(-1);
713!
783

784
            if (parent_task_id != -1) {
713✔
785
                auto parent_it = task_registry_.find(parent_task_id);
411!
786
                if (parent_it != task_registry_.end()) {
411!
787
                    parent_it->second.child_task_ids.push_back(task->get_id());
411!
788
                }
411✔
789
            }
411✔
790
        }
713✔
791
    }
713✔
792

793
    ++total_tasks_submitted_;
713✔
794

795
    // Create Coro, set executor on promise, enqueue released handle
796
    auto coro = run_task(std::move(task), std::move(input));
713!
797
    coro.handle().promise().executor = this;
713!
798
    enqueue(coro.release());
713!
799
}
713✔
800

801
// Schedule a bare coroutine and track it in the task registry.
//
// A fresh id is taken from next_coro_task_id_, which is decremented on every
// call. This presumably keeps coroutine ids in a range distinct from Task
// ids (TODO confirm). The id and this executor are stored on the coroutine's
// promise. A QUEUED registry entry with no parent (-1) is created if the id is
// not already present, and the submission counter is bumped.
//
// @param coro     Coroutine to run; its released handle is passed to enqueue().
// @param name     Display name for the registry entry (moved in only when a
//                 new entry is created).
// @param tid_out  Optional slot; when non-null it receives the assigned id
//                 (release store) before the coroutine is enqueued.
// @return The id assigned to the coroutine.
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    const TaskIndex coro_id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = coro_id;
    promise.executor = this;

    {
        std::unique_lock<std::shared_mutex> registry_lock(registry_mutex_);
        auto [entry, is_new] = task_registry_.emplace(
            std::piecewise_construct, std::forward_as_tuple(coro_id),
            std::forward_as_tuple());
        if (is_new) {
            auto& info = entry->second;
            info.task_id = coro_id;
            info.parent_task_id = -1;  // -1 = no parent
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = std::chrono::steady_clock::now();
            info.location = TaskInfo::SHARED_QUEUE;
            // Not yet assigned to any worker.
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }
    ++total_tasks_submitted_;

    // Publish the id to the caller before the coroutine can start running.
    if (tid_out != nullptr) {
        tid_out->store(coro_id, std::memory_order_release);
    }

    enqueue(coro.release());
    return coro_id;
}
834

835
// Mark the registry entry for `id` as COMPLETED.
//
// Unknown ids and entries already in COMPLETED are left alone, so repeated
// calls for the same id count it only once. On the first transition, a
// started_at that was never set (zero since epoch) is back-filled with the
// completion time. The entry is then moved to DONE, and tasks_completed_ is
// incremented after the registry lock has been released.
//
// @param id  Registry id of the coroutine task.
void Executor::mark_coro_completed(TaskIndex id) {
    const bool newly_completed = [&] {
        std::unique_lock<std::shared_mutex> registry_lock(registry_mutex_);

        auto entry = task_registry_.find(id);
        if (entry == task_registry_.end()) {
            return false;
        }
        auto& info = entry->second;
        if (info.state == TaskInfo::COMPLETED) {
            return false;
        }

        const auto finished_at = std::chrono::steady_clock::now();
        if (info.started_at.time_since_epoch().count() == 0) {
            info.started_at = finished_at;
        }
        info.state = TaskInfo::COMPLETED;
        info.completed_at = finished_at;
        info.location = TaskInfo::DONE;
        return true;
    }();

    if (newly_completed) {
        ++tasks_completed_;
    }
}
854

855
// Defer destruction of a coroutine frame: non-null handles are pushed onto
// destroy_queue_ and destroyed later by drain_destroy_queue(). Null handles
// are ignored.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
860

861
void Executor::drain_destroy_queue() {
859✔
862
    std::coroutine_handle<> to_destroy;
859✔
863
    while (destroy_queue_.try_dequeue(to_destroy)) {
3,589✔
864
        if (to_destroy) {
2,730!
865
            to_destroy.destroy();
2,730✔
866
        }
2,730✔
867
    }
868
}
859✔
869

870
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc