• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered (35.29%).

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.07
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/io/io_backend_factory.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/core/tasks/task.h>
8

9
#include <chrono>
10
#include <coroutine>
11
#include <exception>
12
#include <vector>
13

14
namespace dftracer::utils {
15

16
// Per-thread slot for the WorkerContext of the executor worker running on
// this thread, stored type-erased as void*.  worker_thread() installs it on
// entry and clears it on exit; run_task() casts it back to WorkerContext*.
static thread_local void* tls_current_worker_context = nullptr;

/// @brief Return the worker context installed on the calling thread.
/// @return The installed pointer, or nullptr if none has been installed.
void* get_current_worker_context() {
    void* const installed = tls_current_worker_context;
    return installed;
}

/// @brief Install @p context as the calling thread's worker context.
/// @param context Opaque pointer; pass nullptr to clear the slot.
void set_current_worker_context(void* context) {
    void*& slot = tls_current_worker_context;
    slot = context;
}
23

24
// Executor associated with the calling thread.  worker_thread() sets it to the
// owning executor for the lifetime of a worker; other threads can install one
// through Executor::set_current().
static thread_local Executor* tls_current_executor = nullptr;

/// @brief Executor associated with the calling thread, or nullptr.
Executor* Executor::current() noexcept {
    Executor* const active = tls_current_executor;
    return active;
}

/// @brief Associate @p e with the calling thread.
/// @param e Executor to install (may be nullptr).
/// @return The executor that was associated before this call (may be
///         nullptr), e.g. so a caller can restore it afterwards.
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
33

34
// Thread-local list of coroutine handles to destroy after the current
35
// resume() returns.  FinalAwaiter pushes here instead of the shared
36
// destroy_queue_ to avoid another worker freeing the frame while
37
// the coroutine-suspend machinery is still accessing it.
38
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
39

40
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
5,246✔
41
    tls_pending_destroys.push_back(h);
5,246✔
42
}
5,166✔
43

44
void drain_thread_local_destroys() {
19,165✔
45
    Executor* exec = Executor::current();
19,165✔
46
    for (auto h : tls_pending_destroys) {
24,403✔
47
        if (!h) continue;
5,268!
48
        if (exec) {
5,197!
49
            exec->schedule_destroy(h);
5,197!
50
        } else {
51
            h.destroy();
×
52
        }
53
    }
54
    tls_pending_destroys.clear();
19,137✔
55
}
19,035✔
56

57
/// @brief Build an executor from @p config without starting any threads.
///
/// A zero num_threads / io_pool_size in @p config selects
/// dftracer_utils_hardware_concurrency().  If that also yields 0, a pool of 2
/// is used.  Threads are only created by start().
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0
                       ? dftracer_utils_hardware_concurrency()
                       : config.num_threads),
      last_activity_ns_(
          std::chrono::steady_clock::now().time_since_epoch().count()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size == 0
                        ? dftracer_utils_hardware_concurrency()
                        : config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold) {
    // Guard against the hardware-concurrency query reporting 0.
    if (num_threads_ == 0) {
        num_threads_ = 2;
    }
    if (io_pool_size_ == 0) {
        io_pool_size_ = 2;
    }
    // %lld requires a `long long` argument.  chrono::duration::count()
    // returns the duration's rep, whose exact integer type is
    // implementation-defined (e.g. `long` on LP64 Linux), so convert
    // explicitly to keep the format string well-defined.
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
81

82
/// @brief Stop the executor (a no-op if it is not running), then destroy any
///        coroutine frames still sitting in the deferred-destroy queue.
Executor::~Executor() {
    shutdown();

    // Catch frames queued after shutdown() finished, or when it had nothing
    // to do because the executor was never started.
    drain_destroy_queue();
}
86

87
void Executor::start() {
537✔
88
    if (running_) {
537!
89
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
×
90
        return;
×
91
    }
92

93
    running_ = true;
537✔
94
    workers_.clear();
537✔
95
    workers_.reserve(num_threads_);
537✔
96

97
    timer_service_.start();
537✔
98

99
    // Create and start I/O backend before workers so workers
100
    // can use it immediately.
101
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
1,074✔
102
                                        io_batch_threshold_);
537✔
103
    io_backend_->start();
537✔
104

105
    // Create all worker contexts first so workers_ is stable before any
106
    // worker thread can try to iterate/steal from it.
107
    for (std::size_t i = 0; i < num_threads_; ++i) {
2,256✔
108
        auto worker = std::make_unique<WorkerContext>(i);
1,719!
109
        worker->last_activity = std::chrono::steady_clock::now();
1,719✔
110
        workers_.push_back(std::move(worker));
1,719!
111
    }
1,719✔
112

113
    // Start worker threads after all contexts are in place.
114
    for (auto& worker : workers_) {
2,256✔
115
        worker->thread =
1,719✔
116
            std::thread(&Executor::worker_thread, this, worker.get());
3,438!
117
    }
118

119
    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
120
                             num_threads_);
121
}
122

123
void Executor::shutdown() {
1,384✔
124
    if (!running_) {
1,384✔
125
        return;
847✔
126
    }
127

128
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
129
    running_ = false;
537✔
130
    wake_all_workers();
537✔
131

132
    // Join all worker threads (must happen before io_backend_ is
133
    // destroyed, since workers call io_backend_->poll() when idle).
134
    for (auto& worker : workers_) {
2,256✔
135
        if (worker->thread.joinable()) {
1,719!
136
            worker->thread.join();
1,719!
137
        }
138
    }
139

140
    // Stop I/O backend AFTER joining workers (workers may poll the
141
    // backend) but BEFORE clearing workers_.  The I/O backend's
142
    // completion thread may still call enqueue() -> wake_all_workers()
143
    // which accesses WorkerContext cv/mutex, so workers_ must remain
144
    // alive until the completion thread has exited.
145
    if (io_backend_) {
537!
146
        io_backend_->stop();
537✔
147
        io_backend_.reset();
537✔
148
    }
149

150
    // Destroy deferred frames and orphaned run-queue entries BEFORE
151
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
152
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
153
    drain_destroy_queue();
537✔
154
    {
155
        std::coroutine_handle<> orphan;
537✔
156
        while (run_queue_.try_dequeue(orphan)) {
552!
157
            if (orphan) {
15!
158
                orphan.destroy();
15!
159
            }
160
        }
161
    }
162

163
    workers_.clear();
537✔
164
    timer_service_.stop();
537✔
165

166
    // Drain the main thread's thread-local destroy list (for
167
    // coroutines whose FinalAwaiter ran on the main thread).
168
    drain_thread_local_destroys();
537✔
169

170
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
171
}
172

173
void Executor::reset() {
×
174
    // Queue will be reset by caller if needed
175
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
176
}
×
177

178
/// @brief Install the callback that notify_completion() invokes for every
///        finished task.  Replaces any previously installed callback.
/// @param callback Callback to store (taken by value and moved in).
void Executor::set_completion_callback(CompletionCallback callback) {
    completion_callback_ = std::move(callback);
}
181

182
/// @brief Main loop executed by each worker thread.
///
/// Installs this thread's worker context and current executor, then loops
/// while running_ is set.
///   * Run path: dequeue one handle from run_queue_, resume it, then release
///     frames deferred during that resume.
///   * Idle path: flush and poll the I/O backend.  If nothing was reaped,
///     block on work_signal_ until its value changes.
/// On exit it drains this thread's deferred destroys and clears the TLS state.
///
/// @param context This worker's context (owned by workers_).
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    while (running_) {
        // Snapshot the work signal BEFORE checking any queues.
        // This ensures that any signal increment (from enqueue +
        // signal_global_work) that happens AFTER this load will be detected by
        // the wait predicate below, even if the actual queue check sees the
        // queue as empty. Loading it after the queue checks creates a race:
        // work can arrive between the queue check and the signal load, causing
        // the worker to sleep with the updated signal value while work sits in
        // the queue.
        const std::uint64_t observed_signal =
            work_signal_.load(std::memory_order_acquire);

        // Run queue: coroutine handles from enqueue() and
        // schedule_coroutine_resumption().
        std::coroutine_handle<> pending_resume;
        if (!run_queue_.try_dequeue(pending_resume)) {
            // ---- Idle path: nothing runnable right now. ----
            context->is_idle.store(true, std::memory_order_relaxed);

            if (io_backend_) {
                // Submit any batched I/O before sleeping, so pending SQEs go
                // out even when no new work is arriving (drain trigger).
                io_backend_->flush();
            }
            if (io_backend_) {
                // Reaped completions enqueue new work: retry the run queue
                // instead of sleeping.
                const auto reaped = io_backend_->poll(0);
                if (reaped > 0) {
                    drain_destroy_queue();
                    continue;
                }
            }

            drain_destroy_queue();
            work_signal_.wait(observed_signal, std::memory_order_acquire);
            continue;
        }

        // ---- Run path. ----
        coro::reset_timeslice();
        context->is_idle.store(false, std::memory_order_relaxed);

        if (pending_resume && !pending_resume.done()) {
            // NOTE(review): every dequeued handle is reinterpreted as a
            // CoroPromise handle.  This assumes all handles on run_queue_
            // have a promise layout compatible with coro::CoroPromise --
            // confirm.  Also, enqueue_tracked() draws ids from
            // next_coro_task_id_.fetch_sub(); if those ids are negative, the
            // tid >= 0 guard never performs their QUEUED -> RUNNING
            // transition (mark_coro_completed() later backfills started_at).
            auto typed =
                std::coroutine_handle<coro::CoroPromise>::from_address(
                    pending_resume.address());
            const TaskIndex tid = typed.promise().task_id;

            if (tid >= 0) {
                // First resume of a registered, still-QUEUED task: mark it
                // RUNNING on this worker.  The lock is released before resume().
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto found = task_registry_.find(tid);
                const bool first_run = found != task_registry_.end() &&
                                       found->second.state == TaskInfo::QUEUED;
                if (first_run) {
                    auto& info = found->second;
                    info.state = TaskInfo::RUNNING;
                    info.started_at = std::chrono::steady_clock::now();
                    info.worker_id = context->worker_id;
                    info.location = TaskInfo::EXECUTING;
                    ++tasks_started_;
                }
            }

            // Pairs with DFTRACER_TSAN_RELEASE in enqueue().
            DFTRACER_TSAN_ACQUIRE(pending_resume.address());
            pending_resume.resume();
        }

        // Destroy coroutine frames that FinalAwaiter deferred to this
        // thread.  Safe: resume() has fully returned, so the frame
        // is suspended at final_suspend and no code references it.
        drain_thread_local_destroys();
        drain_destroy_queue();
    }

    // Final drain: destroy any frames deferred during the last resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
267

268
void Executor::notify_completion(std::shared_ptr<Task> task) {
772✔
269
    // No mutex needed -- the callback is set exactly once during Scheduler
270
    // construction (before any task is submitted) and never modified after.
271
    // on_task_completed uses only atomics, lock-free queues, and properly-
272
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
273
    if (completion_callback_) {
772!
274
        completion_callback_(task);
772!
275
    }
276
}
772✔
277

278
void Executor::request_shutdown() {
5✔
279
    if (shutdown_requested_.load()) {
5!
280
        return;  // Already requested
×
281
    }
282

283
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
284
    shutdown_requested_ = true;
5✔
285
}
286

287
/// @brief Make @p handle runnable on this executor.
///
/// A thin alias for enqueue(), which is the single path for all coroutine
/// handles; null or already-finished handles are ignored there.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    return enqueue(handle);
}
291

292
void Executor::signal_global_work() {
16,992✔
293
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
16,992✔
294
    // Wake one worker, not all. Each enqueue adds one unit of work,
295
    // so one worker is sufficient. Avoids thundering herd where all
296
    // N threads wake, N-1 find no work, and go back to sleep.
297
    wake_one_worker();
16,992✔
298
}
16,996✔
299

300
/// @brief Unblock at most one thread waiting on work_signal_.
void Executor::wake_one_worker() {
    work_signal_.notify_one();
}
301

302
void Executor::wake_all_workers() {
537✔
303
    work_signal_.fetch_add(1, std::memory_order_release);
537✔
304
    work_signal_.notify_all();
537✔
305
}
537✔
306

307
// Helper function for when_all.h (avoids circular dependency)
308
void schedule_coroutine_resumption_helper(Executor* executor,
7,107✔
309
                                          std::coroutine_handle<> handle) {
310
    if (executor) {
7,107✔
311
        executor->schedule_coroutine_resumption(handle);
7,106✔
312
    }
313
}
7,110✔
314

315
// Helper function for when_any.h (avoids circular dependency)
316
void schedule_destroy_helper(Executor* executor,
7✔
317
                             std::coroutine_handle<> handle) {
318
    if (executor && handle) {
7!
319
        executor->schedule_destroy(handle);
7✔
320
    }
321
}
7✔
322

323
/// @brief Push @p handle onto the shared run queue and wake one worker.
///
/// Null handles and coroutines already suspended at their final suspend
/// point are dropped.
void Executor::enqueue(std::coroutine_handle<> handle) {
    const bool runnable = handle && !handle.done();
    if (!runnable) {
        return;  // Invalid or already completed
    }

    // Pairs with DFTRACER_TSAN_ACQUIRE in worker_thread() before resume().
    DFTRACER_TSAN_RELEASE(handle.address());
    run_queue_.enqueue(handle);
    signal_global_work();
}
332

333
bool Executor::is_responsive() const {
2✔
334
    // If shutdown was requested, consider unresponsive
335
    if (shutdown_requested_.load()) {
2!
336
        return false;
×
337
    }
338

339
    // If not running, not responsive
340
    if (!running_.load()) {
2!
341
        return false;
×
342
    }
343

344
    // Check if all threads might be deadlocked
345
    // (all threads busy but no progress for a while)
346
    std::size_t started = tasks_started_.load();
2✔
347
    std::size_t completed = tasks_completed_.load();
2✔
348
    std::size_t active = started - completed;
2✔
349

350
    if (active >= num_threads_) {
2✔
351
        // All threads busy - check if making progress
352
        auto now = std::chrono::steady_clock::now();
1✔
353
        auto last_ns = last_activity_ns_.load(std::memory_order_acquire);
1✔
354
        auto last_tp = std::chrono::steady_clock::time_point(
355
            std::chrono::steady_clock::duration(last_ns));
1✔
356
        auto idle_time = now - last_tp;
1!
357

358
        // If all threads busy but no activity for deadlock_timeout,
359
        // likely deadlocked
360
        if (idle_time > deadlock_timeout_) {
1!
361
            DFTRACER_UTILS_LOG_WARN(
×
362
                "Executor appears deadlocked: %zu threads, %zu active "
363
                "tasks, idle for %lld ms",
364
                num_threads_, active,
365
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
366
                    .count());
367
            return false;
×
368
        }
369
    }
370

371
    return true;
2✔
372
}
373

374
void Executor::mark_activity() {
1,539✔
375
    last_activity_ns_.store(
1,529✔
376
        std::chrono::steady_clock::now().time_since_epoch().count(),
1,539✔
377
        std::memory_order_release);
378
}
1,527✔
379

380
void Executor::update_task_location(TaskIndex task_id,
×
381
                                    TaskInfo::Location location,
382
                                    std::size_t worker_id) {
383
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
384
    auto it = task_registry_.find(task_id);
×
385
    if (it != task_registry_.end()) {
×
386
        it->second.location = location;
×
387
        if (location == TaskInfo::LOCAL_QUEUE ||
×
388
            location == TaskInfo::EXECUTING) {
389
            it->second.worker_id = worker_id;
×
390
        }
391
    }
392
}
×
393

394
/// @brief Snapshot of executor-wide progress.
///
/// The snapshot contains:
///   * task counters,
///   * per-state task counts,
///   * errors of failed tasks,
///   * one progress tree per root task (parent_task_id == -1),
///   * per-worker status.
/// Registry data is read under a shared lock on registry_mutex_.
/// NOTE(review): workers_ is read without any lock.  Presumably this is never
/// called concurrently with start()/shutdown(), which rebuild/clear it --
/// confirm.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> lock(registry_mutex_);
    ExecutorProgress progress;

    // Overall stats
    progress.total_tasks_submitted = total_tasks_submitted_.load();
    progress.tasks_completed = tasks_completed_.load();

    // Count task states
    progress.tasks_queued = 0;
    progress.tasks_running = 0;
    progress.tasks_failed = 0;

    for (const auto& [task_id, info] : task_registry_) {
        switch (info.state) {
            case TaskInfo::QUEUED:
                progress.tasks_queued++;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                progress.tasks_running++;
                break;
            case TaskInfo::COMPLETED:
                // Already counted
                break;
            case TaskInfo::FAILED:
                progress.tasks_failed++;
                break;
        }

        if (info.state == TaskInfo::FAILED && !info.error_message.empty()) {
            progress.recent_errors.push_back({task_id, info.error_message});
        }
    }

    for (std::size_t i = 0; i < workers_.size(); ++i) {
        // No per-worker local queues anymore; report 0.
        progress.worker_queue_depths.push_back(0);
    }

    // Build task trees (find root tasks).  Each tree is built as a temporary
    // and moved into place, so the (recursive) children vectors are not
    // copied.
    std::unordered_set<TaskIndex> processed;
    for (const auto& [task_id, info] : task_registry_) {
        if (info.parent_task_id == -1) {  // Root task
            progress.root_tasks.push_back(
                build_task_progress_tree(task_id, processed));
        }
    }

    // Worker states
    for (const auto& worker : workers_) {
        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        TaskIndex current_id = worker->current_task_id.load();
        if (current_id != -1) {
            status.current_task_id = current_id;
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;

        // `status` is not used after this point; move its strings instead of
        // copying them.
        progress.workers.push_back(std::move(status));
    }

    return progress;
}
463

464
/// @brief Build the progress subtree rooted at @p task_id, recursing into
///        child_task_ids.
///
/// Reads task_registry_ without locking.  The caller in this file,
/// get_progress(), holds a shared lock on registry_mutex_.
///
/// @param task_id   Registry id of the subtree root.
/// @param processed Ids already visited.  Revisiting one yields a
///                  "[Cycle Detected]" placeholder instead of recursing.
/// @return The populated node, or a "[Not Found]" placeholder if @p task_id
///         is not registered.
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    TaskProgress progress;

    // insert() both tests membership and records the visit with a single
    // hash lookup; .second is false when task_id was already present.
    if (!processed.insert(task_id).second) {
        // Avoid cycles
        progress.task_id = task_id;
        progress.name = "[Cycle Detected]";
        return progress;
    }

    auto it = task_registry_.find(task_id);
    if (it == task_registry_.end()) {
        progress.task_id = task_id;
        progress.name = "[Not Found]";
        return progress;
    }

    const TaskInfo& info = it->second;
    progress.task_id = task_id;
    progress.name = info.name;

    // State
    switch (info.state) {
        case TaskInfo::QUEUED:
            progress.state = "queued";
            break;
        case TaskInfo::RUNNING:
            progress.state = "running";
            break;
        case TaskInfo::WAITING:
            progress.state = "waiting";
            break;
        case TaskInfo::COMPLETED:
            progress.state = "completed";
            break;
        case TaskInfo::FAILED:
            progress.state = "failed";
            break;
    }

    // Timing (milliseconds, as double)
    auto now = std::chrono::steady_clock::now();
    if (info.state == TaskInfo::QUEUED) {
        progress.queued_duration_ms =
            std::chrono::duration<double, std::milli>(now - info.queued_at)
                .count();
        progress.execution_duration_ms = 0;
    } else if (info.state == TaskInfo::RUNNING ||
               info.state == TaskInfo::WAITING) {
        progress.queued_duration_ms = std::chrono::duration<double, std::milli>(
                                          info.started_at - info.queued_at)
                                          .count();
        progress.execution_duration_ms =
            std::chrono::duration<double, std::milli>(now - info.started_at)
                .count();
    } else {  // COMPLETED or FAILED
        progress.queued_duration_ms = std::chrono::duration<double, std::milli>(
                                          info.started_at - info.queued_at)
                                          .count();
        progress.execution_duration_ms =
            std::chrono::duration<double, std::milli>(info.completed_at -
                                                      info.started_at)
                .count();
    }

    // Progress: fraction of children completed, or 0/100 for leaf tasks.
    progress.total_subtasks = info.child_task_ids.size();
    progress.completed_subtasks = info.completed_children.load();
    if (progress.total_subtasks > 0) {
        progress.progress_percentage =
            (100.0 * static_cast<double>(progress.completed_subtasks)) /
            static_cast<double>(progress.total_subtasks);
    } else {
        progress.progress_percentage =
            (info.state == TaskInfo::COMPLETED) ? 100.0 : 0.0;
    }

    // Location
    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            progress.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            progress.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            progress.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            progress.location = "done";
            break;
    }

    // Build children recursively
    for (TaskIndex child_id : info.child_task_ids) {
        progress.children.push_back(
            build_task_progress_tree(child_id, processed));
    }

    return progress;
}
569

570
// ============================================================================
571
// Phase 3: Coro-based task execution
572
// ============================================================================
573

574
/// @brief Coroutine that executes one submitted Task on the current worker.
///
/// Steps:
///   1. Publish the task as this worker's current task and mark its registry
///      entry RUNNING.
///   2. Await the coroutine returned by Task::execute().
///   3. Join every coroutine spawned on the task's CoroScope.
///   4. Record either success (result stored, COMPLETED) or failure
///      (exception stored, FAILED), then call notify_completion().
///
/// @param task  Task to run.  If null (or no worker context is installed),
///              the coroutine returns immediately.
/// @param input Argument forwarded to Task::execute().  It is dereferenced
///              without a null check.
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
                              std::shared_ptr<std::any> input) {
    // Get worker context from TLS (set by worker_thread at start).
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());

    if (!context || !task) {
        // NOTE(review): nothing is recorded on this path.  The registry entry
        // stays QUEUED and notify_completion() is never called -- confirm
        // this cannot happen for submitted tasks.
        co_return;
    }

    // Update worker context
    context->current_task_id = task->get_id();
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name = task->get_name();
    }
    context->last_activity = std::chrono::steady_clock::now();

    // Mark task start
    mark_activity();
    ++tasks_started_;

    // Update task registry to RUNNING
    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto it = task_registry_.find(task->get_id());
        if (it != task_registry_.end()) {
            it->second.state = TaskInfo::RUNNING;
            it->second.started_at = std::chrono::steady_clock::now();
            it->second.worker_id = context->worker_id;
            it->second.location = TaskInfo::EXECUTING;
        }
    }

    task->result().mark_running();

    {
        CoroScope scope(this);
        std::exception_ptr task_error;

        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
                                 context->worker_id, task->get_id(),
                                 task->get_name());

        try {
            auto coro_task = task->execute(scope, *input);
            // Propagate executor to the CoroTask's PromiseBase so that
            // nested awaitables (when_any, channel, etc.) can schedule
            // resumptions.  run_task() is a Coro (CoroPromise, not
            // PromiseBase), so the normal PromiseBase propagation in
            // CoroTask::await_suspend doesn't fire.
            coro_task.handle().promise().set_executor(this);
            std::any result = co_await std::move(coro_task);

            // Set result via TaskResult
            task->set_result(std::move(result));
        } catch (...) {
            task_error = std::current_exception();
        }

        // Always join scope to wait for any sub-spawned coroutines.
        // Without this, the scope's JoinHandle would be destroyed
        // while FinalAwaiters still reference it (use-after-free).
        co_await scope.join();

        // Outcome-specific step: log, and on failure attach the exception to
        // the task and capture the message recorded in the registry.
        std::string error_message;
        if (!task_error) {
            DFTRACER_UTILS_LOG_DEBUG(
                "Task ID %ld ('%s') completed successfully", task->get_id(),
                task->get_name());
        } else {
            try {
                std::rethrow_exception(task_error);
            } catch (const std::exception& e) {
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
                                         task->get_id(), task->get_name(),
                                         e.what());
                error_message = e.what();
            } catch (...) {
                DFTRACER_UTILS_LOG_ERROR(
                    "Task ID %ld ('%s') failed with unknown exception",
                    task->get_id(), task->get_name());
                error_message = "Unknown exception";
            }
            task->set_exception(task_error);
        }

        // Completion bookkeeping shared by the success and failure paths
        // (previously duplicated once per outcome).
        mark_activity();
        ++tasks_completed_;
        context->tasks_executed++;

        {
            std::unique_lock<std::shared_mutex> lock(registry_mutex_);
            auto it = task_registry_.find(task->get_id());
            if (it != task_registry_.end()) {
                auto& info = it->second;
                info.completed_at = std::chrono::steady_clock::now();
                info.location = TaskInfo::DONE;
                if (!task_error) {
                    info.state = TaskInfo::COMPLETED;
                    // Only a successful child bumps its parent's
                    // completed-children count.
                    if (info.parent_task_id != -1) {
                        auto parent_it =
                            task_registry_.find(info.parent_task_id);
                        if (parent_it != task_registry_.end()) {
                            parent_it->second.completed_children++;
                        }
                    }
                } else {
                    info.state = TaskInfo::FAILED;
                    info.error_message = std::move(error_message);
                }
            }
        }

        // Notify scheduler
        notify_completion(task);
    }

    // Clear current task info
    context->current_task_id = -1;
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name.clear();
    }

    co_return;
}
736

737
void Executor::submit_task(std::shared_ptr<Task> task,
787✔
738
                           std::shared_ptr<std::any> input,
739
                           TaskIndex parent_task_id) {
740
    // Register in task registry
741
    {
742
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
787!
743

744
        auto [it, inserted] = task_registry_.emplace(
1,574!
745
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
787✔
746
            std::forward_as_tuple());
1,574✔
747

748
        if (inserted) {
787!
749
            it->second.task_id = task->get_id();
787✔
750
            it->second.parent_task_id = parent_task_id;
787✔
751
            it->second.name = task->get_name();
787!
752
            it->second.state = TaskInfo::QUEUED;
787✔
753
            it->second.queued_at = std::chrono::steady_clock::now();
787✔
754
            it->second.location = TaskInfo::SHARED_QUEUE;
787✔
755
            it->second.worker_id = static_cast<std::size_t>(-1);
787✔
756

757
            if (parent_task_id != -1) {
787✔
758
                auto parent_it = task_registry_.find(parent_task_id);
427!
759
                if (parent_it != task_registry_.end()) {
427✔
760
                    parent_it->second.child_task_ids.push_back(task->get_id());
423!
761
                }
762
            }
763
        }
764
    }
787✔
765

766
    ++total_tasks_submitted_;
787✔
767

768
    // Create Coro, set executor on promise, enqueue released handle
769
    auto coro = run_task(std::move(task), std::move(input));
1,574!
770
    coro.handle().promise().executor = this;
787✔
771
    enqueue(coro.release());
787!
772
}
787✔
773

774
/// @brief Register a standalone coroutine under a fresh synthetic id and
///        schedule it.
///
/// @param coro    Coroutine to run; ownership is released to the run queue.
/// @param name    Display name stored in the registry entry.
/// @param tid_out Optional; when non-null, receives the assigned id
///                (release store).
/// @return The assigned id, taken from next_coro_task_id_ (which counts
///         downward).
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    const TaskIndex id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = id;
    promise.executor = this;

    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto [slot, fresh] = task_registry_.emplace(
            std::piecewise_construct, std::forward_as_tuple(id),
            std::forward_as_tuple());
        if (fresh) {
            auto& info = slot->second;
            info.task_id = id;
            info.parent_task_id = -1;  // tracked coroutines are always roots
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = std::chrono::steady_clock::now();
            info.location = TaskInfo::SHARED_QUEUE;
            info.worker_id = static_cast<std::size_t>(-1);  // no worker yet
        }
    }
    ++total_tasks_submitted_;

    if (tid_out) {
        tid_out->store(id, std::memory_order_release);
    }

    enqueue(coro.release());
    return id;
}
807

808
/// @brief Mark the registry entry @p id as COMPLETED.
///
/// Unknown ids and entries that are already COMPLETED are left alone, so
/// tasks_completed_ is incremented at most once per entry.  If the entry
/// never recorded a start time, the completion time is used as its start.
void Executor::mark_coro_completed(TaskIndex id) {
    bool transitioned = false;
    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto found = task_registry_.find(id);
        const bool pending = found != task_registry_.end() &&
                             found->second.state != TaskInfo::COMPLETED;
        if (pending) {
            auto& info = found->second;
            const auto stamp = std::chrono::steady_clock::now();
            if (info.started_at.time_since_epoch().count() == 0) {
                info.started_at = stamp;
            }
            info.state = TaskInfo::COMPLETED;
            info.completed_at = stamp;
            info.location = TaskInfo::DONE;
            transitioned = true;
        }
    }

    if (transitioned) {
        ++tasks_completed_;
    }
}
827

828
/// @brief Queue @p handle for destruction by a later drain_destroy_queue().
///        Null handles are ignored.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
833

834
void Executor::drain_destroy_queue() {
49,798✔
835
    std::coroutine_handle<> to_destroy;
49,798✔
836
    while (destroy_queue_.try_dequeue(to_destroy)) {
55,120!
837
        if (to_destroy) {
5,338!
838
            to_destroy.destroy();
5,335!
839
        }
840
    }
841
}
49,744✔
842

843
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc