• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531027933

25 Mar 2026 08:05AM UTC coverage: 48.592% (-1.5%) from 50.098%
23531027933

Pull #57

github

web-flow
Merge d1070e289 into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18900 of 49456 branches covered (38.22%)

Branch coverage included in aggregate %.

1604 of 1954 new or added lines in 25 files covered. (82.09%)

3407 existing lines in 135 files now uncovered.

18487 of 27485 relevant lines covered (67.26%)

240991.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.77
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/pipeline/executor.h>
5
#include <dftracer/utils/core/sqlite/vfs.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/core/tasks/task.h>
8

9
#include <chrono>
10
#include <coroutine>
11
#include <exception>
12
#include <vector>
13

14
#include "../io/io_backend_factory.h"
15
#include "../io/io_thread_pool.h"
16

17
namespace dftracer::utils {
18

19
// Per-thread pointer to the worker context of the executor worker running on
// this thread. worker_thread() installs it on start and resets it to nullptr
// on exit; it is nullptr on threads that never installed one.
static thread_local void* tls_current_worker_context = nullptr;

/// Returns the worker context registered for the calling thread (nullptr if
/// none has been registered).
void* get_current_worker_context() {
    void* const registered = tls_current_worker_context;
    return registered;
}

/// Registers `context` as the calling thread's worker context; passing
/// nullptr clears the registration.
void set_current_worker_context(void* context) {
    void*& slot = tls_current_worker_context;
    slot = context;
}
26

27
// Executor associated with the calling thread. worker_thread() sets it for
// worker threads; other threads can install one through set_current().
static thread_local Executor* tls_current_executor = nullptr;

/// Returns the executor installed on the calling thread, or nullptr.
Executor* Executor::current() noexcept {
    Executor* const installed = tls_current_executor;
    return installed;
}

/// Installs `e` as the calling thread's executor and returns whichever
/// executor was installed before (possibly nullptr).
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
36

37
/// Non-owning access to the thread pool reserved for SQLite work. The pool is
/// created in start() and released in shutdown(), so this returns nullptr
/// outside that window.
io::IoThreadPool* Executor::sqlite_pool() noexcept {
    io::IoThreadPool* const pool = sqlite_pool_.get();
    return pool;
}
40

41
// Thread-local list of coroutine handles to destroy after the current
42
// resume() returns.  FinalAwaiter pushes here instead of the shared
43
// destroy_queue_ to avoid another worker freeing the frame while
44
// the coroutine-suspend machinery is still accessing it.
45
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
46

47
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
2,732✔
48
    tls_pending_destroys.push_back(h);
2,732✔
49
}
2,732✔
50

51
void drain_thread_local_destroys() {
10,116✔
52
    Executor* exec = Executor::current();
10,116✔
53
    for (auto h : tls_pending_destroys) {
12,841✔
54
        if (!h) continue;
2,725!
55
        if (exec) {
2,725!
56
            exec->schedule_destroy(h);
2,725✔
57
        } else {
2,725✔
58
            h.destroy();
×
59
        }
60
    }
61
    tls_pending_destroys.clear();
10,116✔
62
}
10,116✔
63

64
/// Builds an executor from `config` without starting any threads; start()
/// launches the workers.
///
/// A `config.num_threads` of 0 means "use the hardware concurrency". If that
/// also reports 0, two worker threads are used.
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0
                       ? dftracer_utils_hardware_concurrency()
                       : config.num_threads),
      last_activity_time_(std::chrono::steady_clock::now()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold),
      sqlite_pool_size_(config.sqlite_pool_size) {
    if (num_threads_ == 0) {
        num_threads_ = 2;  // Fallback if hardware_concurrency returns 0
    }
    // duration::count() returns the duration's rep type, which is not
    // guaranteed to be `long long` (e.g. it is `long` on LP64 Linux), so
    // cast explicitly to match the %lld conversions.
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
83

84
/// Stops the executor (shutdown() returns early if it is not running), then
/// destroys any coroutine frames still waiting in the deferred destroy queue.
Executor::~Executor() {
    this->shutdown();
    this->drain_destroy_queue();
}
88

89
/// Starts the executor. In order, it starts the timer service, creates and
/// starts the I/O backend, registers the dftracer SQLite VFS, starts the
/// SQLite thread pool, and finally launches `num_threads_` threads running
/// worker_thread(). Calling start() while already running only logs a
/// warning.
///
/// NOTE(review): running_ is set before the backends are created. If a later
/// step throws, running_ stays true with some services already started.
/// shutdown() null-checks sqlite_pool_ and io_backend_, so it can probably
/// clean up, but confirm that this is the intended recovery path.
void Executor::start() {
    if (running_) {
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
        return;
    }

    // Set before any worker thread exists so each worker enters its
    // `while (running_)` loop as soon as it starts.
    running_ = true;
    workers_.clear();
    workers_.reserve(num_threads_);

    timer_service_.start();

    // Create and start I/O backend before workers so workers
    // can use it immediately.
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
                                        io_batch_threshold_);
    io_backend_->start();
    // The VFS is bound to this backend and executor; shutdown() unregisters
    // it before the backend is stopped.
    sqlite::register_dftracer_sqlite_vfs(io_backend_.get(), this);

    sqlite_pool_ = std::make_unique<io::IoThreadPool>(sqlite_pool_size_);
    sqlite_pool_->start();

    // Create all worker contexts first so workers_ is stable before any
    // worker thread can try to iterate/steal from it.
    for (std::size_t i = 0; i < num_threads_; ++i) {
        auto worker = std::make_unique<WorkerContext>(i);
        worker->last_activity = std::chrono::steady_clock::now();
        workers_.push_back(std::move(worker));
    }

    // Start worker threads after all contexts are in place.
    for (auto& worker : workers_) {
        worker->thread =
            std::thread(&Executor::worker_thread, this, worker.get());
    }

    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
                             num_threads_);
}
128

129
/// Stops the executor and releases what start() created. Returns immediately
/// when the executor is not running, so repeated sequential calls (including
/// the one in ~Executor) are harmless.
///
/// Teardown order:
///   1. clear running_ and wake every worker so each leaves its loop;
///   2. join the worker threads;
///   3. stop the SQLite pool, unregister the VFS, then stop the I/O backend;
///   4. destroy deferred frames and any handles still left in run_queue_;
///   5. drop the worker contexts and stop the timer service;
///   6. drain the calling thread's thread-local destroy list.
///
/// NOTE(review): the `!running_` test and the store of `false` are separate
/// operations, so two threads calling shutdown() concurrently could both get
/// past the check. `running_.exchange(false)` would close that window if
/// concurrent calls are possible.
/// NOTE(review): step 6 runs after drain_destroy_queue(). If the calling
/// thread has a current Executor, the handles forwarded to schedule_destroy()
/// in that step are only destroyed by a later drain (e.g. in ~Executor).
/// Confirm that this is intended.
void Executor::shutdown() {
    if (!running_) {
        return;
    }

    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
    running_ = false;
    // Workers sleeping on their condition variable re-check running_.
    wake_all_workers();

    // Join all worker threads (must happen before io_backend_ is
    // destroyed, since workers call io_backend_->poll() when idle).
    for (auto& worker : workers_) {
        if (worker->thread.joinable()) {
            worker->thread.join();
        }
    }

    // Stop the SQLite pool and the I/O backend AFTER joining workers
    // (workers may poll the backend) but BEFORE clearing workers_. The I/O
    // backend's completion thread may still call enqueue() ->
    // wake_all_workers(), which accesses WorkerContext cv/mutex, so workers_
    // must remain alive until the completion thread has exited.
    if (sqlite_pool_) {
        sqlite_pool_->stop();
        sqlite_pool_.reset();
    }

    sqlite::unregister_dftracer_sqlite_vfs();

    if (io_backend_) {
        io_backend_->stop();
        io_backend_.reset();
    }

    // Destroy deferred frames and orphaned run-queue entries BEFORE
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
    drain_destroy_queue();
    {
        // Handles still queued will never be resumed now; destroy them.
        std::coroutine_handle<> orphan;
        while (run_queue_.try_dequeue(orphan)) {
            if (orphan) {
                orphan.destroy();
            }
        }
    }

    workers_.clear();
    timer_service_.stop();

    // Drain the calling thread's thread-local destroy list (for
    // coroutines whose FinalAwaiter ran on this thread).
    drain_thread_local_destroys();

    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
}
185

186
void Executor::reset() {
×
187
    // Queue will be reset by caller if needed
188
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
189
}
×
190

191
/// Installs the callback that notify_completion() invokes for each finished
/// task. notify_completion() reads it without locking, so it is expected to
/// be set once, before any task is submitted.
void Executor::set_completion_callback(CompletionCallback callback) {
    this->completion_callback_ = std::move(callback);
}
194

195
/// Body of one worker thread, launched by start() with that worker's context.
///
/// Installs `context` and this executor as the thread's current worker
/// context and executor. Until running_ becomes false, it then repeats:
///   - If a handle can be dequeued from run_queue_, resume it (when it is
///     non-null and not done) and drain the thread-local destroy list.
///   - Otherwise, mark the worker idle, flush and poll the I/O backend, and
///     sleep on the worker's condition variable until work_signal_ changes
///     or the executor stops.
/// On exit it drains the thread-local destroy list once more and clears the
/// thread-local executor and worker context.
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    while (running_) {
        std::coroutine_handle<> pending_resume;

        // Snapshot the work signal BEFORE checking any queues.
        // This ensures that any signal increment (from enqueue +
        // signal_global_work) that happens AFTER this load will be detected by
        // the wait predicate below, even if the actual queue check sees the
        // queue as empty. Loading it inside the else branch (after queue
        // checks) creates a race: work can arrive between the queue check and
        // the signal load, causing the worker to sleep with the updated signal
        // value while work sits in the queue.
        const std::uint64_t observed_signal =
            work_signal_.load(std::memory_order_acquire);

        // Run queue: coroutine handles from enqueue() and
        // schedule_coroutine_resumption().
        if (run_queue_.try_dequeue(pending_resume)) {
            coro::reset_timeslice();
            context->is_idle.store(false, std::memory_order_relaxed);
            if (pending_resume && !pending_resume.done()) {
                // Reinterpret the type-erased handle as a CoroPromise handle
                // to read its task id.
                // NOTE(review): this is only well-defined if every handle on
                // run_queue_ belongs to a coroutine whose promise type is
                // coro::CoroPromise. Handles arriving via
                // schedule_coroutine_resumption() may belong to coroutines
                // with a different promise type (run_task() mentions
                // PromiseBase), in which case reading task_id here is
                // undefined behaviour. Confirm.
                auto typed =
                    std::coroutine_handle<coro::CoroPromise>::from_address(
                        pending_resume.address());
                TaskIndex tid = typed.promise().task_id;
                // NOTE(review): only ids >= 0 are considered here, while
                // enqueue_tracked() hands out ids with fetch_sub (possibly
                // negative). Also, ++tasks_started_ below duplicates the
                // increment run_task() does when it starts a task, which
                // would inflate is_responsive()'s active count if both ever
                // apply to the same task. Confirm which ids reach this branch.
                if (tid >= 0) {
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                    auto it = task_registry_.find(tid);
                    if (it != task_registry_.end() &&
                        it->second.state == TaskInfo::QUEUED) {
                        it->second.state = TaskInfo::RUNNING;
                        it->second.started_at =
                            std::chrono::steady_clock::now();
                        it->second.worker_id = context->worker_id;
                        it->second.location = TaskInfo::EXECUTING;
                        ++tasks_started_;
                    }
                }
                pending_resume.resume();
            }
            // Destroy coroutine frames that FinalAwaiter deferred to this
            // thread.  Safe: resume() has fully returned, so the frame
            // is suspended at final_suspend and no code references it.
            drain_thread_local_destroys();
        }
        // No work available -- sleep until signaled.
        else {
            context->is_idle.store(true, std::memory_order_relaxed);
            // Flush any batched I/O operations before sleeping.
            // This ensures pending SQEs are submitted even when no
            // new work is arriving (drain trigger).
            if (io_backend_) {
                io_backend_->flush();
            }
            // Opportunistic I/O polling before sleeping.
            // If the backend has completions, they'll enqueue new
            // work, so skip the sleep and retry the run queue.
            if (io_backend_) {
                auto reaped = io_backend_->poll(0);
                if (reaped > 0) {
                    continue;
                }
            }
            // Sleep until shutdown or until work_signal_ moves past the value
            // snapshotted at the top of this iteration.
            std::unique_lock<std::mutex> lock(context->queue_mutex);
            context->cv.wait(lock, [this, observed_signal] {
                return !running_.load(std::memory_order_acquire) ||
                       work_signal_.load(std::memory_order_acquire) !=
                           observed_signal;
            });
        }
    }

    // Final drain: destroy any frames deferred during the last resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
281

282
void Executor::notify_completion(std::shared_ptr<Task> task) {
703✔
283
    // No mutex needed -- the callback is set exactly once during Scheduler
284
    // construction (before any task is submitted) and never modified after.
285
    // on_task_completed uses only atomics, lock-free queues, and properly-
286
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
287
    if (completion_callback_) {
703!
288
        completion_callback_(task);
703!
289
    }
703✔
290
}
703✔
291

292
void Executor::request_shutdown() {
5✔
293
    if (shutdown_requested_.load()) {
5!
294
        return;  // Already requested
×
295
    }
296

297
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
298
    shutdown_requested_ = true;
5✔
299
}
5✔
300

301
/// Queues `handle` to be resumed by a worker. This simply forwards to
/// enqueue(), the single entry point for every coroutine handle.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    this->enqueue(handle);
}
305

306
void Executor::signal_global_work() {
8,407✔
307
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
8,407✔
308
    wake_all_workers();
8,407✔
309
}
8,407✔
310

311
void Executor::wake_one_worker() {
×
312
    const std::size_t worker_count = workers_.size();
×
313
    if (worker_count == 0) {
×
314
        return;
×
315
    }
316

UNCOV
317
    const std::size_t worker_index =
×
318
        next_worker_.fetch_add(1, std::memory_order_relaxed) % worker_count;
×
319
    // Lock-then-unlock the worker's mutex before notifying.
320
    // This ensures the worker is either before its predicate check (and will
321
    // see the updated atomic state) or inside cv.wait (and will receive the
322
    // notification). Without this, a notification sent between predicate
323
    // evaluation and cv.wait entry is lost, causing the worker to hang.
324
    workers_[worker_index]->queue_mutex.lock();
×
325
    workers_[worker_index]->queue_mutex.unlock();
×
326
    workers_[worker_index]->cv.notify_one();
×
UNCOV
327
}
×
328

329
void Executor::wake_all_workers() {
8,836✔
330
    for (auto& worker : workers_) {
35,823✔
331
        // Lock-then-unlock ensures the worker is either before its predicate
332
        // check or inside cv.wait before the notification is sent.
333
        // See wake_one_worker() for detailed rationale.
334
        worker->queue_mutex.lock();
26,987✔
335
        worker->queue_mutex.unlock();
26,987✔
336
        worker->cv.notify_all();
26,987✔
337
    }
338
}
8,836✔
339

340
// Helper function for when_all.h (avoids circular dependency)
341
void schedule_coroutine_resumption_helper(Executor* executor,
4,056✔
342
                                          std::coroutine_handle<> handle) {
343
    if (executor) {
4,056!
344
        executor->schedule_coroutine_resumption(handle);
4,056✔
345
    }
4,056✔
346
}
4,056✔
347

348
// Helper function for when_any.h (avoids circular dependency)
349
void schedule_destroy_helper(Executor* executor,
8✔
350
                             std::coroutine_handle<> handle) {
351
    if (executor && handle) {
8!
352
        executor->schedule_destroy(handle);
8✔
353
    }
8✔
354
}
8✔
355

356
/// Pushes `handle` onto the shared run queue and signals the workers. Null
/// handles and coroutines that have already finished are dropped.
void Executor::enqueue(std::coroutine_handle<> handle) {
    const bool runnable = handle && !handle.done();
    if (!runnable) {
        return;  // Invalid or already completed
    }
    run_queue_.enqueue(handle);
    signal_global_work();
}
364

365
bool Executor::is_responsive() const {
2✔
366
    // If shutdown was requested, consider unresponsive
367
    if (shutdown_requested_.load()) {
2!
368
        return false;
×
369
    }
370

371
    // If not running, not responsive
372
    if (!running_.load()) {
2!
373
        return false;
×
374
    }
375

376
    // Check if all threads might be deadlocked
377
    // (all threads busy but no progress for a while)
378
    std::size_t started = tasks_started_.load();
2✔
379
    std::size_t completed = tasks_completed_.load();
2✔
380
    std::size_t active = started - completed;
2✔
381

382
    if (active >= num_threads_) {
2✔
383
        // All threads busy - check if making progress
384
        std::lock_guard<std::mutex> lock(activity_mutex_);
1✔
385
        auto now = std::chrono::steady_clock::now();
1✔
386
        auto idle_time = now - last_activity_time_;
1!
387

388
        // If all threads busy but no activity for deadlock_timeout,
389
        // likely deadlocked
390
        if (idle_time > deadlock_timeout_) {
1!
391
            DFTRACER_UTILS_LOG_WARN(
×
392
                "Executor appears deadlocked: %zu threads, %zu active "
393
                "tasks, idle for %lld ms",
394
                num_threads_, active,
395
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
396
                    .count());
397
            return false;
×
398
        }
399
    }
1!
400

401
    return true;
2✔
402
}
2✔
403

404
void Executor::mark_activity() {
1,403✔
405
    std::lock_guard<std::mutex> lock(activity_mutex_);
1,403✔
406
    last_activity_time_ = std::chrono::steady_clock::now();
1,403✔
407
}
1,403✔
408

409
void Executor::update_task_location(TaskIndex task_id,
×
410
                                    TaskInfo::Location location,
411
                                    std::size_t worker_id) {
412
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
413
    auto it = task_registry_.find(task_id);
×
414
    if (it != task_registry_.end()) {
×
415
        it->second.location = location;
×
416
        if (location == TaskInfo::LOCAL_QUEUE ||
×
UNCOV
417
            location == TaskInfo::EXECUTING) {
×
418
            it->second.worker_id = worker_id;
×
UNCOV
419
        }
×
UNCOV
420
    }
×
421
}
×
422

423
/// Builds a progress snapshot while holding a shared lock on the task
/// registry. The snapshot contains:
///   - the submission/completion counters;
///   - counts of queued, running/waiting and failed tasks;
///   - the error messages of failed tasks;
///   - one task tree per root task (parent_task_id == -1);
///   - a status entry per worker.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> lock(registry_mutex_);
    ExecutorProgress progress;

    progress.total_tasks_submitted = total_tasks_submitted_.load();
    progress.tasks_completed = tasks_completed_.load();
    progress.tasks_queued = 0;
    progress.tasks_running = 0;
    progress.tasks_failed = 0;

    // Tally task states. COMPLETED tasks are already reflected in
    // tasks_completed above. Failed tasks with a message are also collected
    // into recent_errors.
    for (const auto& [task_id, info] : task_registry_) {
        if (info.state == TaskInfo::QUEUED) {
            progress.tasks_queued++;
        } else if (info.state == TaskInfo::RUNNING ||
                   info.state == TaskInfo::WAITING) {
            progress.tasks_running++;
        } else if (info.state == TaskInfo::FAILED) {
            progress.tasks_failed++;
            if (!info.error_message.empty()) {
                progress.recent_errors.push_back({task_id, info.error_message});
            }
        }
    }

    // Workers no longer have local queues; report a depth of 0 for each.
    for (std::size_t remaining = workers_.size(); remaining > 0; --remaining) {
        progress.worker_queue_depths.push_back(0);
    }

    // One tree per root task. `processed` guards against revisiting ids.
    std::unordered_set<TaskIndex> processed;
    for (const auto& [task_id, info] : task_registry_) {
        if (info.parent_task_id != -1) {
            continue;  // not a root task
        }
        progress.root_tasks.push_back(
            build_task_progress_tree(task_id, processed));
    }

    // Per-worker status.
    for (const auto& worker : workers_) {
        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        const TaskIndex running_id = worker->current_task_id.load();
        if (running_id != -1) {
            status.current_task_id = running_id;
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;
        progress.workers.push_back(status);
    }

    return progress;
}
492

493
/// Builds the TaskProgress node for `task_id` and, recursively, its children.
///
/// `processed` records every id already visited. A revisited id yields a
/// placeholder node named "[Cycle Detected]"; this also happens when a task
/// is merely reachable twice. An id missing from task_registry_ yields
/// "[Not Found]". The registry is read without locking; get_progress() calls
/// this while holding a shared lock on registry_mutex_.
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    TaskProgress progress;
    progress.task_id = task_id;

    // insert() reports `false` when the id was already visited.
    if (!processed.insert(task_id).second) {
        // Avoid cycles
        progress.name = "[Cycle Detected]";
        return progress;
    }

    const auto found = task_registry_.find(task_id);
    if (found == task_registry_.end()) {
        progress.name = "[Not Found]";
        return progress;
    }

    const TaskInfo& info = found->second;
    progress.name = info.name;

    // State label (left untouched for an unrecognised state).
    const auto state_label = [](auto state) -> const char* {
        switch (state) {
            case TaskInfo::QUEUED:
                return "queued";
            case TaskInfo::RUNNING:
                return "running";
            case TaskInfo::WAITING:
                return "waiting";
            case TaskInfo::COMPLETED:
                return "completed";
            case TaskInfo::FAILED:
                return "failed";
        }
        return nullptr;
    };
    if (const char* label = state_label(info.state)) {
        progress.state = label;
    }

    // Timing. A queued task has only waited so far. A running/waiting task
    // has executed until now. Any other task executed until completed_at.
    using Millis = std::chrono::duration<double, std::milli>;
    const auto now = std::chrono::steady_clock::now();
    if (info.state == TaskInfo::QUEUED) {
        progress.queued_duration_ms = Millis(now - info.queued_at).count();
        progress.execution_duration_ms = 0;
    } else {
        const bool in_progress = info.state == TaskInfo::RUNNING ||
                                 info.state == TaskInfo::WAITING;
        progress.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();
        progress.execution_duration_ms =
            in_progress ? Millis(now - info.started_at).count()
                        : Millis(info.completed_at - info.started_at).count();
    }

    // Subtask progress. A leaf task is 100% once completed, else 0%.
    progress.total_subtasks = info.child_task_ids.size();
    progress.completed_subtasks = info.completed_children.load();
    progress.progress_percentage =
        progress.total_subtasks > 0
            ? 100.0 * static_cast<double>(progress.completed_subtasks) /
                  static_cast<double>(progress.total_subtasks)
            : (info.state == TaskInfo::COMPLETED ? 100.0 : 0.0);

    // Location label.
    const auto where = info.location;
    if (where == TaskInfo::SHARED_QUEUE) {
        progress.location = "shared_queue";
    } else if (where == TaskInfo::LOCAL_QUEUE) {
        progress.location =
            "worker_" + std::to_string(info.worker_id) + "_local";
    } else if (where == TaskInfo::EXECUTING) {
        progress.location =
            "executing_on_worker_" + std::to_string(info.worker_id);
    } else if (where == TaskInfo::DONE) {
        progress.location = "done";
    }

    // Children, depth-first.
    for (const TaskIndex child : info.child_task_ids) {
        progress.children.push_back(build_task_progress_tree(child, processed));
    }

    return progress;
}
598

599
// ============================================================================
600
// Phase 3: Coro-based task execution
601
// ============================================================================
602

603
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
7,680!
604
                              std::shared_ptr<std::any> input) {
713!
605
    // Get worker context from TLS (set by worker_thread at start).
606
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());
2,076✔
607

608
    if (!context || !task) {
2,076✔
609
        co_return;
3,461✔
610
    }
611

612
    // Update worker context
613
    context->current_task_id = task->get_id();
2,076✔
614
    {
615
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
702!
616
        context->current_task_name = task->get_name();
702!
617
    }
702✔
618
    context->last_activity = std::chrono::steady_clock::now();
702✔
619

620
    // Mark task start
621
    mark_activity();
702✔
622
    ++tasks_started_;
703✔
623

624
    // Update task registry to RUNNING
625
    {
626
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
703!
627
        auto it = task_registry_.find(task->get_id());
703!
628
        if (it != task_registry_.end()) {
703!
629
            it->second.state = TaskInfo::RUNNING;
703!
630
            it->second.started_at = std::chrono::steady_clock::now();
703!
631
            it->second.worker_id = context->worker_id;
703!
632
            it->second.location = TaskInfo::EXECUTING;
703!
633
        }
703✔
634
    }
703✔
635

636
    task->result().mark_running();
703!
637

638
    {
639
        CoroScope scope(this);
2,076!
640
        std::exception_ptr task_error;
2,076✔
641

642
        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
643
                                 context->worker_id, task->get_id(),
644
                                 task->get_name().c_str());
645

646
        try {
647
            auto coro_task = task->execute(scope, *input);
2,076!
648
            // Propagate executor to the CoroTask's PromiseBase so that
649
            // nested awaitables (when_any, channel, etc.) can schedule
650
            // resumptions.  run_task() is a Coro (CoroPromise, not
651
            // PromiseBase), so the normal PromiseBase propagation in
652
            // CoroTask::await_suspend doesn't fire.
653
            coro_task.handle().promise().set_executor(this);
2,076✔
654
            std::any result = co_await std::move(coro_task);
2,778!
655

656
            // Set result via TaskResult
657
            task->set_result(std::move(result));
685!
658
        } catch (...) {
698✔
659
            task_error = std::current_exception();
13✔
660
        }
13!
661

662
        // Always join scope to wait for any sub-spawned coroutines.
663
        // Without this, the scope's JoinHandle would be destroyed
664
        // while FinalAwaiters still reference it (use-after-free).
665
        co_await scope.join();
2,776!
666

667
        if (!task_error) {
703✔
668
            DFTRACER_UTILS_LOG_DEBUG(
669
                "Task ID %ld ('%s') completed successfully", task->get_id(),
670
                task->get_name().c_str());
671

672
            // Mark task completion
673
            mark_activity();
690!
674
            ++tasks_completed_;
690✔
675
            context->tasks_executed++;
690✔
676

677
            // Update task registry to COMPLETED
678
            {
679
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
690!
680
                auto it = task_registry_.find(task->get_id());
690!
681
                if (it != task_registry_.end()) {
690!
682
                    it->second.state = TaskInfo::COMPLETED;
690!
683
                    it->second.completed_at = std::chrono::steady_clock::now();
690!
684
                    it->second.location = TaskInfo::DONE;
690!
685

686
                    // Update parent's completed children count
687
                    if (it->second.parent_task_id != -1) {
690!
688
                        auto parent_it =
404✔
689
                            task_registry_.find(it->second.parent_task_id);
404!
690
                        if (parent_it != task_registry_.end()) {
404!
691
                            parent_it->second.completed_children++;
404!
692
                        }
404✔
693
                    }
404✔
694
                }
690✔
695
            }
690✔
696

697
            // Notify scheduler
698
            notify_completion(task);
690!
699

700
        } else {
690✔
701
            try {
702
                std::rethrow_exception(task_error);
13!
703
            } catch (const std::exception& e) {
13!
704
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
13!
705
                                         task->get_id(),
706
                                         task->get_name().c_str(), e.what());
707

708
                task->set_exception(task_error);
13!
709

710
                mark_activity();
13!
711
                ++tasks_completed_;
13✔
712
                context->tasks_executed++;
13✔
713

714
                {
715
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
13!
716
                    auto it = task_registry_.find(task->get_id());
13!
717
                    if (it != task_registry_.end()) {
13!
718
                        it->second.state = TaskInfo::FAILED;
13!
719
                        it->second.completed_at =
13!
720
                            std::chrono::steady_clock::now();
13✔
721
                        it->second.error_message = e.what();
13!
722
                        it->second.location = TaskInfo::DONE;
13!
723
                    }
13✔
724
                }
13✔
725

726
                notify_completion(task);
13!
727

728
            } catch (...) {
13!
UNCOV
729
                DFTRACER_UTILS_LOG_ERROR(
×
730
                    "Task ID %ld ('%s') failed with unknown exception",
731
                    task->get_id(), task->get_name().c_str());
732

UNCOV
733
                task->set_exception(task_error);
×
734

UNCOV
735
                mark_activity();
×
UNCOV
736
                ++tasks_completed_;
×
UNCOV
737
                context->tasks_executed++;
×
738

739
                {
UNCOV
740
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
UNCOV
741
                    auto it = task_registry_.find(task->get_id());
×
UNCOV
742
                    if (it != task_registry_.end()) {
×
UNCOV
743
                        it->second.state = TaskInfo::FAILED;
×
UNCOV
744
                        it->second.completed_at =
×
UNCOV
745
                            std::chrono::steady_clock::now();
×
UNCOV
746
                        it->second.error_message = "Unknown exception";
×
UNCOV
747
                        it->second.location = TaskInfo::DONE;
×
UNCOV
748
                    }
×
UNCOV
749
                }
×
750

UNCOV
751
                notify_completion(task);
×
752
            }
13!
753
        }
754
    }
2,094✔
755

756
    // Clear current task info
757
    context->current_task_id = -1;
702✔
758
    {
759
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
702!
760
        context->current_task_name.clear();
702✔
761
    }
702✔
762

763
    co_return;
702✔
764
}
10,378✔
765

766
void Executor::submit_task(std::shared_ptr<Task> task,
712✔
767
                           std::shared_ptr<std::any> input,
768
                           TaskIndex parent_task_id) {
769
    // Register in task registry
770
    {
771
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
712✔
772

773
        auto [it, inserted] = task_registry_.emplace(
2,138✔
774
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
712✔
775
            std::forward_as_tuple());
713✔
776

777
        if (inserted) {
712✔
778
            it->second.task_id = task->get_id();
713!
779
            it->second.parent_task_id = parent_task_id;
1,426!
780
            it->second.name = task->get_name();
713!
781
            it->second.state = TaskInfo::QUEUED;
713!
782
            it->second.queued_at = std::chrono::steady_clock::now();
1,426!
783
            it->second.location = TaskInfo::SHARED_QUEUE;
713!
784
            it->second.worker_id = static_cast<std::size_t>(-1);
713!
785

786
            if (parent_task_id != -1) {
713✔
787
                auto parent_it = task_registry_.find(parent_task_id);
425!
788
                if (parent_it != task_registry_.end()) {
425!
789
                    parent_it->second.child_task_ids.push_back(task->get_id());
425!
790
                }
425✔
791
            }
425✔
792
        }
713✔
793
    }
714✔
794

795
    ++total_tasks_submitted_;
714✔
796

797
    // Create Coro, set executor on promise, enqueue released handle
798
    auto coro = run_task(std::move(task), std::move(input));
714!
799
    coro.handle().promise().executor = this;
714!
800
    enqueue(coro.release());
713!
801
}
716✔
802

803
/// Schedules an already-built coroutine and records it in the task registry
/// under a newly allocated ID.
///
/// IDs come from next_coro_task_id_ via fetch_sub(1), i.e. a decrementing
/// sequence. NOTE(review): presumably this keeps them apart from Task-based
/// IDs; confirm the counter's initial value.
///
/// @param coro     Coroutine to schedule. Its promise receives the new ID and
///                 this executor before the handle is released to enqueue().
/// @param name     Name stored in the registry entry (moved from).
/// @param tid_out  Optional. When non-null, the assigned ID is stored into it
///                 (release order) before the coroutine is enqueued.
/// @return The ID assigned to the coroutine.
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    const TaskIndex id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = id;
    promise.executor = this;

    {
        std::unique_lock<std::shared_mutex> registry_lock(registry_mutex_);
        auto [entry, is_fresh] = task_registry_.emplace(
            std::piecewise_construct, std::forward_as_tuple(id),
            std::forward_as_tuple());
        if (is_fresh) {
            auto& info = entry->second;
            info.task_id = id;
            info.parent_task_id = -1;  // tracked coroutines have no parent
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = std::chrono::steady_clock::now();
            info.location = TaskInfo::SHARED_QUEUE;
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }

    ++total_tasks_submitted_;

    if (tid_out != nullptr) {
        tid_out->store(id, std::memory_order_release);
    }

    enqueue(coro.release());
    return id;
}
×
836

837
/// Transitions the registry entry for `id` to COMPLETED / DONE.
///
/// Unknown IDs and entries that are already COMPLETED are left untouched, so
/// tasks_completed_ is incremented at most once per entry. An entry whose
/// started_at is still the zero time_point gets the completion instant as its
/// start time as well.
void Executor::mark_coro_completed(TaskIndex id) {
    {
        std::unique_lock<std::shared_mutex> registry_lock(registry_mutex_);

        auto entry = task_registry_.find(id);
        if (entry == task_registry_.end()) return;

        auto& info = entry->second;
        if (info.state == TaskInfo::COMPLETED) return;

        const auto finished_at = std::chrono::steady_clock::now();
        if (info.started_at.time_since_epoch().count() == 0) {
            info.started_at = finished_at;
        }
        info.state = TaskInfo::COMPLETED;
        info.completed_at = finished_at;
        info.location = TaskInfo::DONE;
    }
    // Counted outside the registry lock, only on a real state transition.
    ++tasks_completed_;
}
624✔
856

857
/// Defers destruction of a coroutine frame. A non-null `handle` is pushed onto
/// destroy_queue_, which drain_destroy_queue() later empties. Null handles are
/// ignored.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) return;
    destroy_queue_.enqueue(handle);
}
2,733✔
862

863
void Executor::drain_destroy_queue() {
859✔
864
    std::coroutine_handle<> to_destroy;
859✔
865
    while (destroy_queue_.try_dequeue(to_destroy)) {
3,598✔
866
        if (to_destroy) {
2,739!
867
            to_destroy.destroy();
2,739✔
868
        }
2,739✔
869
    }
870
}
859✔
871

872
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc