• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26165774062

20 May 2026 01:29PM UTC coverage: 51.981% (-0.2%) from 52.2%
26165774062

Pull #68

github

web-flow
Merge a4eaed4d4 into 6c9aaa7c9
Pull Request #68: feat(aggregator): offset metrics, per-event-name system metrics, and time-bucket persistence

36911 of 92534 branches covered (39.89%)

Branch coverage included in aggregate %.

89 of 185 new or added lines in 8 files covered. (48.11%)

276 existing lines in 9 files now uncovered.

33359 of 42649 relevant lines covered (78.22%)

20407.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.84
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/io/io_backend_factory.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/core/tasks/task.h>
8

9
#include <chrono>
10
#include <coroutine>
11
#include <exception>
12
#include <vector>
13

14
namespace dftracer::utils {
15

16
// Per-thread slot holding the calling thread's worker context; stays
// nullptr until set_current_worker_context() stores something.
static thread_local void* tls_current_worker_context = nullptr;

// Returns whatever context pointer was last stored on this thread.
void* get_current_worker_context() {
    void* const ctx = tls_current_worker_context;
    return ctx;
}

// Stores `context` in the calling thread's slot (nullptr clears it).
void set_current_worker_context(void* context) {
    void*& slot = tls_current_worker_context;
    slot = context;
}
23

24
// Executor bound to the calling thread; nullptr when none is installed.
static thread_local Executor* tls_current_executor = nullptr;

// Returns the executor bound to the calling thread (may be nullptr).
Executor* Executor::current() noexcept {
    Executor* const bound = tls_current_executor;
    return bound;
}

// Binds `e` to the calling thread and hands back the previous binding so the
// caller can restore it afterwards.
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
33

34
// Thread-local list of coroutine handles to destroy after the current
35
// resume() returns.  FinalAwaiter pushes here instead of the shared
36
// destroy_queue_ to avoid another worker freeing the frame while
37
// the coroutine-suspend machinery is still accessing it.
38
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
39

40
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
10,275✔
41
    tls_pending_destroys.push_back(h);
10,275✔
42
}
10,174✔
43

44
void drain_thread_local_destroys() {
36,460✔
45
    Executor* exec = Executor::current();
36,460✔
46
    for (auto h : tls_pending_destroys) {
46,718✔
47
        if (!h) continue;
10,272✔
48
        if (exec) {
10,191!
49
            exec->schedule_destroy(h);
10,191!
50
        } else {
5,038✔
51
            h.destroy();
×
52
        }
53
    }
54
    tls_pending_destroys.clear();
36,493✔
55
}
36,403✔
56

57
// Builds an executor from `config` without starting any threads (see start()).
//
// A thread count or I/O pool size of 0 in the config means "use
// dftracer_utils_hardware_concurrency()".  If that also yields 0, the value
// falls back to 2.  (Presumably that helper mirrors
// std::thread::hardware_concurrency(), which may report 0 -- confirm.)
// last_activity_ns_ is seeded with the current steady_clock tick count.
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0
                       ? dftracer_utils_hardware_concurrency()
                       : config.num_threads),
      last_activity_ns_(
          std::chrono::steady_clock::now().time_since_epoch().count()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size == 0
                        ? dftracer_utils_hardware_concurrency()
                        : config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold) {
    if (num_threads_ == 0) {
        num_threads_ = 2;
    }
    if (io_pool_size_ == 0) {
        io_pool_size_ = 2;
    }
    // %lld requires a `long long` argument, but duration::count() returns
    // the duration's rep (e.g. `long` on LP64 Linux); passing it unconverted
    // through a printf-style varargs call is undefined behaviour.
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
81

82
// Tears the executor down.  First shutdown(), which returns immediately if
// the executor is not running.  Then any frames still waiting in the destroy
// queue are released.
Executor::~Executor() {
    this->shutdown();
    this->drain_destroy_queue();
}
86

87
void Executor::start() {
1,074✔
88
    if (running_) {
1,074!
89
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
×
90
        return;
×
91
    }
92

93
    running_ = true;
1,074✔
94
    workers_.clear();
1,074✔
95
    workers_.reserve(num_threads_);
1,074✔
96

97
    timer_service_.start();
1,074✔
98

99
    // Create and start I/O backend before workers so workers
100
    // can use it immediately.
101
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
2,148✔
102
                                        io_batch_threshold_);
1,074✔
103
    io_backend_->start();
1,074✔
104

105
    // Create all worker contexts first so workers_ is stable before any
106
    // worker thread can try to iterate/steal from it.
107
    for (std::size_t i = 0; i < num_threads_; ++i) {
4,390✔
108
        auto worker = std::make_unique<WorkerContext>(i);
3,316!
109
        worker->last_activity = std::chrono::steady_clock::now();
3,316✔
110
        workers_.push_back(std::move(worker));
3,316!
111
    }
3,316✔
112

113
    // Start worker threads after all contexts are in place.
114
    for (auto& worker : workers_) {
4,390✔
115
        worker->thread =
3,316✔
116
            std::thread(&Executor::worker_thread, this, worker.get());
5,035!
117
    }
118

119
    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
120
                             num_threads_);
121
}
537✔
122

123
void Executor::shutdown() {
2,768✔
124
    if (!running_) {
2,768✔
125
        return;
1,694✔
126
    }
127

128
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
129
    running_ = false;
1,074✔
130
    wake_all_workers();
1,074✔
131

132
    // Join all worker threads (must happen before io_backend_ is
133
    // destroyed, since workers call io_backend_->poll() when idle).
134
    for (auto& worker : workers_) {
4,390✔
135
        if (worker->thread.joinable()) {
3,316✔
136
            worker->thread.join();
3,316!
137
        }
1,597✔
138
    }
139

140
    // Stop I/O backend AFTER joining workers (workers may poll the
141
    // backend) but BEFORE clearing workers_.  The I/O backend's
142
    // completion thread may still call enqueue() -> wake_all_workers()
143
    // which accesses WorkerContext cv/mutex, so workers_ must remain
144
    // alive until the completion thread has exited.
145
    if (io_backend_) {
1,074✔
146
        io_backend_->stop();
1,074✔
147
        io_backend_.reset();
1,074✔
148
    }
537✔
149

150
    // Destroy deferred frames and orphaned run-queue entries BEFORE
151
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
152
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
153
    drain_destroy_queue();
1,074✔
154
    {
155
        std::coroutine_handle<> orphan;
1,074✔
156
        while (run_queue_.try_dequeue(orphan)) {
1,103✔
157
            if (orphan) {
29✔
158
                orphan.destroy();
29!
159
            }
14✔
160
        }
161
    }
162

163
    workers_.clear();
1,074✔
164
    timer_service_.stop();
1,074✔
165

166
    // Drain the main thread's thread-local destroy list (for
167
    // coroutines whose FinalAwaiter ran on the main thread).
168
    drain_thread_local_destroys();
1,074✔
169

170
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
171
}
1,384✔
172

173
void Executor::reset() {
×
174
    // Queue will be reset by caller if needed
175
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
176
}
×
177

178
// Installs the callback that notify_completion() invokes for each finished
// task, replacing any callback installed earlier.
void Executor::set_completion_callback(CompletionCallback callback) {
    this->completion_callback_ = std::move(callback);
}
181

182
// Body of each worker thread (launched by start()).  Publishes `context` and
// this executor in thread-local storage, then loops while running_:
//   - pop a coroutine handle from the shared run_queue_ and resume it, or,
//   - when the queue is empty, mark the worker idle, flush and poll the I/O
//     backend, and block on work_signal_ until its value changes.
// On exit it drains this thread's deferred-destroy list and clears the
// thread-local bindings.
void Executor::worker_thread(WorkerContext* context) {
3,316✔
183
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);
184

185
    set_current_worker_context(context);
3,316✔
186
    tls_current_executor = this;
3,316✔
187
    coro::reset_timeslice();
3,316✔
188

189
    while (running_) {
84,099✔
190
        std::coroutine_handle<> pending_resume;
80,591✔
191

192
        // Snapshot the work signal BEFORE checking any queues.
193
        // This ensures that any signal increment (from enqueue +
194
        // signal_global_work) that happens AFTER this load will be detected by
195
        // the wait predicate below, even if the actual queue check sees the
196
        // queue as empty. Loading it inside the else branch (after queue
197
        // checks) creates a race: work can arrive between the queue check and
198
        // the signal load, causing the worker to sleep with the updated signal
199
        // value while work sits in the queue.
200
        const std::uint64_t observed_signal =
32,931✔
201
            work_signal_.load(std::memory_order_acquire);
80,591✔
202

203
        // Run queue: coroutine handles from enqueue() and
204
        // schedule_coroutine_resumption().
205
        if (run_queue_.try_dequeue(pending_resume)) {
80,613✔
206
            coro::reset_timeslice();
32,103✔
207
            context->is_idle.store(false, std::memory_order_relaxed);
32,102✔
208
            if (pending_resume && !pending_resume.done()) {
32,151✔
209
                // NOTE(review): every dequeued handle is reinterpreted as a
                // CoroPromise frame in order to read task_id.  That is only
                // valid if all handles passed to enqueue() belong to
                // coro::Coro coroutines -- confirm that no other promise type
                // (e.g. CoroTask's PromiseBase) is ever queued here.
                auto typed =
210
                    std::coroutine_handle<coro::CoroPromise>::from_address(
32,116✔
211
                        pending_resume.address());
15,556✔
212
                TaskIndex tid = typed.promise().task_id;
32,077✔
213
                if (tid >= 0) {
32,049✔
214
                    // Non-negative task_id: if the registry entry is still
                    // QUEUED, mark it RUNNING on this worker and count it as
                    // started.
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
21,828!
215
                    auto it = task_registry_.find(tid);
21,837!
216
                    if (it != task_registry_.end() &&
21,837!
217
                        it->second.state == TaskInfo::QUEUED) {
×
218
                        it->second.state = TaskInfo::RUNNING;
×
219
                        it->second.started_at =
×
220
                            std::chrono::steady_clock::now();
×
221
                        it->second.worker_id = context->worker_id;
×
222
                        it->second.location = TaskInfo::EXECUTING;
×
223
                        ++tasks_started_;
×
224
                    }
225
                }
21,837✔
226
                DFTRACER_TSAN_ACQUIRE(pending_resume.address());
227
                pending_resume.resume();
32,058!
228
            }
15,556✔
229
            // Destroy coroutine frames that FinalAwaiter deferred to this
230
            // thread.  Safe: resume() has fully returned, so the frame
231
            // is suspended at final_suspend and no code references it.
232
            drain_thread_local_destroys();
32,259!
233
            drain_destroy_queue();
32,162!
234
        }
15,650✔
235
        // No work available -- sleep until signaled.
236
        else {
237
            context->is_idle.store(true, std::memory_order_relaxed);
48,620✔
238
            // Flush any batched I/O operations before sleeping.
239
            // This ensures pending SQEs are submitted even when no
240
            // new work is arriving (drain trigger).
241
            if (io_backend_) {
48,871✔
242
                io_backend_->flush();
48,528!
243
            }
17,288✔
244
            // Opportunistic I/O polling before sleeping.
245
            // If the backend has completions, they'll enqueue new
246
            // work, so skip the sleep and retry the run queue.
247
            if (io_backend_) {
49,055✔
248
                auto reaped = io_backend_->poll(0);
49,060!
249
                if (reaped > 0) {
49,055!
250
                    drain_destroy_queue();
×
251
                    continue;
×
252
                }
253
            }
17,377✔
254
            drain_destroy_queue();
49,055!
255
            // Blocks until work_signal_ no longer equals the snapshot taken
            // at the top of this iteration; returns at once if it already
            // changed (e.g. an enqueue or wake_all_workers() in between).
            work_signal_.wait(observed_signal, std::memory_order_acquire);
49,028✔
256
        }
257
    }
258

259
    // Final drain: destroy any frames deferred during the last resume().
260
    drain_thread_local_destroys();
3,167✔
261

262
    tls_current_executor = nullptr;
3,303✔
263
    set_current_worker_context(nullptr);
3,303✔
264

265
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
266
}
3,307✔
267

268
void Executor::notify_completion(std::shared_ptr<Task> task) {
1,545✔
269
    // No mutex needed -- the callback is set exactly once during Scheduler
270
    // construction (before any task is submitted) and never modified after.
271
    // on_task_completed uses only atomics, lock-free queues, and properly-
272
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
273
    if (completion_callback_) {
1,545✔
274
        completion_callback_(task);
1,545!
275
    }
773✔
276
}
1,544✔
277

278
void Executor::request_shutdown() {
10✔
279
    if (shutdown_requested_.load()) {
10!
280
        return;  // Already requested
×
281
    }
282

283
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
284
    shutdown_requested_ = true;
10✔
285
}
5✔
286

287
// Makes a suspended coroutine runnable again.  Resumptions share the single
// enqueue() path used for all coroutine handles.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    this->enqueue(handle);
}
291

292
void Executor::signal_global_work() {
32,271✔
293
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
32,271✔
294
    // Wake one worker, not all. Each enqueue adds one unit of work,
295
    // so one worker is sufficient. Avoids thundering herd where all
296
    // N threads wake, N-1 find no work, and go back to sleep.
297
    wake_one_worker();
32,271✔
298
}
32,268✔
299

300
void Executor::wake_one_worker() { work_signal_.notify_one(); }
32,284✔
301

302
void Executor::wake_all_workers() {
1,074✔
303
    work_signal_.fetch_add(1, std::memory_order_release);
1,074✔
304
    work_signal_.notify_all();
1,074✔
305
}
1,074✔
306

307
// Helper function for when_all.h (avoids circular dependency)
308
void schedule_coroutine_resumption_helper(Executor* executor,
12,730✔
309
                                          std::coroutine_handle<> handle) {
310
    if (executor) {
12,730✔
311
        executor->schedule_coroutine_resumption(handle);
12,729✔
312
    }
5,937✔
313
}
12,731✔
314

315
// Helper function for when_any.h (avoids circular dependency)
316
void schedule_destroy_helper(Executor* executor,
13✔
317
                             std::coroutine_handle<> handle) {
318
    if (executor && handle) {
13!
319
        executor->schedule_destroy(handle);
13✔
320
    }
6✔
321
}
13✔
322

323
// Pushes a runnable coroutine onto the shared run queue and wakes one
// worker.  Null handles and coroutines already suspended at their final
// suspend point are dropped without any signal.
void Executor::enqueue(std::coroutine_handle<> handle) {
    const bool runnable = handle && !handle.done();
    if (!runnable) {
        return;  // Invalid or already completed
    }
    DFTRACER_TSAN_RELEASE(handle.address());
    run_queue_.enqueue(handle);
    signal_global_work();
}
332

333
bool Executor::is_responsive() const {
4✔
334
    // If shutdown was requested, consider unresponsive
335
    if (shutdown_requested_.load()) {
4!
336
        return false;
×
337
    }
338

339
    // If not running, not responsive
340
    if (!running_.load()) {
4✔
341
        return false;
×
342
    }
343

344
    // Check if all threads might be deadlocked
345
    // (all threads busy but no progress for a while)
346
    std::size_t started = tasks_started_.load();
4✔
347
    std::size_t completed = tasks_completed_.load();
4✔
348
    std::size_t active = started - completed;
4✔
349

350
    if (active >= num_threads_) {
4✔
351
        // All threads busy - check if making progress
352
        auto now = std::chrono::steady_clock::now();
2✔
353
        auto last_ns = last_activity_ns_.load(std::memory_order_acquire);
2✔
354
        auto last_tp = std::chrono::steady_clock::time_point(
1✔
355
            std::chrono::steady_clock::duration(last_ns));
2✔
356
        auto idle_time = now - last_tp;
2!
357

358
        // If all threads busy but no activity for deadlock_timeout,
359
        // likely deadlocked
360
        if (idle_time > deadlock_timeout_) {
2!
361
            DFTRACER_UTILS_LOG_WARN(
×
362
                "Executor appears deadlocked: %zu threads, %zu active "
363
                "tasks, idle for %lld ms",
364
                num_threads_, active,
365
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
366
                    .count());
367
            return false;
×
368
        }
369
    }
1✔
370

371
    return true;
4✔
372
}
2✔
373

374
void Executor::mark_activity() {
3,088✔
375
    last_activity_ns_.store(
4,629✔
376
        std::chrono::steady_clock::now().time_since_epoch().count(),
3,088✔
377
        std::memory_order_release);
378
}
3,079✔
379

380
void Executor::update_task_location(TaskIndex task_id,
×
381
                                    TaskInfo::Location location,
382
                                    std::size_t worker_id) {
383
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
384
    auto it = task_registry_.find(task_id);
×
385
    if (it != task_registry_.end()) {
×
386
        it->second.location = location;
×
387
        if (location == TaskInfo::LOCAL_QUEUE ||
×
388
            location == TaskInfo::EXECUTING) {
389
            it->second.worker_id = worker_id;
×
390
        }
391
    }
392
}
×
393

394
// Builds a point-in-time snapshot of executor progress, holding the registry
// under a shared lock for the whole call.  The snapshot contains:
//   - the submitted/completed counters;
//   - per-state task counts;
//   - the messages of failed tasks;
//   - one task tree per root task (parent_task_id == -1);
//   - one status entry per worker.
// Workers no longer own local queues, so every queue depth is reported as 0.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> registry_guard(registry_mutex_);

    ExecutorProgress snapshot;
    snapshot.total_tasks_submitted = total_tasks_submitted_.load();
    snapshot.tasks_completed = tasks_completed_.load();
    snapshot.tasks_queued = 0;
    snapshot.tasks_running = 0;
    snapshot.tasks_failed = 0;

    // One pass over the registry: tally states, collect failure messages
    // and expand each root into its tree.  `visited` is shared by all roots
    // so no task is emitted twice.
    std::unordered_set<TaskIndex> visited;
    for (const auto& [id, info] : task_registry_) {
        switch (info.state) {
            case TaskInfo::QUEUED:
                ++snapshot.tasks_queued;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                ++snapshot.tasks_running;
                break;
            case TaskInfo::COMPLETED:
                // Already reflected in tasks_completed.
                break;
            case TaskInfo::FAILED:
                ++snapshot.tasks_failed;
                if (!info.error_message.empty()) {
                    snapshot.recent_errors.push_back({id, info.error_message});
                }
                break;
        }

        if (info.parent_task_id == -1) {
            snapshot.root_tasks.push_back(build_task_progress_tree(id, visited));
        }
    }

    for (const auto& worker : workers_) {
        // No per-worker local queues anymore; depth is always 0.
        snapshot.worker_queue_depths.push_back(0);

        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();
        status.local_queue_depth = 0;

        const TaskIndex running_id = worker->current_task_id.load();
        if (running_id != -1) {
            status.current_task_id = running_id;
            std::lock_guard<std::mutex> name_guard(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        snapshot.workers.push_back(status);
    }

    return snapshot;
}
463

464
// Recursively converts the registry entry for `task_id`, and all of its
// children, into a TaskProgress node.  `processed` collects every id already
// visited; revisiting one produces a "[Cycle Detected]" placeholder.  An id
// missing from the registry produces "[Not Found]".  task_registry_ is read
// without locking here; the caller in this file (get_progress) holds
// registry_mutex_ in shared mode.
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    TaskProgress node;
    node.task_id = task_id;

    if (!processed.insert(task_id).second) {
        // Avoid cycles
        node.name = "[Cycle Detected]";
        return node;
    }

    auto entry = task_registry_.find(task_id);
    if (entry == task_registry_.end()) {
        node.name = "[Not Found]";
        return node;
    }

    const TaskInfo& info = entry->second;
    node.name = info.name;

    // Human-readable state.
    const char* state_label = nullptr;
    switch (info.state) {
        case TaskInfo::QUEUED:
            state_label = "queued";
            break;
        case TaskInfo::RUNNING:
            state_label = "running";
            break;
        case TaskInfo::WAITING:
            state_label = "waiting";
            break;
        case TaskInfo::COMPLETED:
            state_label = "completed";
            break;
        case TaskInfo::FAILED:
            state_label = "failed";
            break;
    }
    if (state_label != nullptr) {
        node.state = state_label;
    }

    // Timing in milliseconds.  The queued span ends at started_at once the
    // task has run.  The execution span ends "now" while the task is in
    // flight and at completed_at once it is finished.
    using Millis = std::chrono::duration<double, std::milli>;
    const auto now = std::chrono::steady_clock::now();
    const bool in_flight = info.state == TaskInfo::RUNNING ||
                           info.state == TaskInfo::WAITING;
    if (info.state == TaskInfo::QUEUED) {
        node.queued_duration_ms = Millis(now - info.queued_at).count();
        node.execution_duration_ms = 0;
    } else {
        node.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();
        if (in_flight) {
            node.execution_duration_ms = Millis(now - info.started_at).count();
        } else {  // COMPLETED or FAILED
            node.execution_duration_ms =
                Millis(info.completed_at - info.started_at).count();
        }
    }

    // Progress: fraction of children completed, or 0/100 for a leaf.
    node.total_subtasks = info.child_task_ids.size();
    node.completed_subtasks = info.completed_children.load();
    if (node.total_subtasks > 0) {
        node.progress_percentage =
            100.0 * static_cast<double>(node.completed_subtasks) /
            static_cast<double>(node.total_subtasks);
    } else {
        node.progress_percentage =
            info.state == TaskInfo::COMPLETED ? 100.0 : 0.0;
    }

    // Where the task currently lives.
    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            node.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            node.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            node.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            node.location = "done";
            break;
    }

    // Depth-first expansion of the children.
    for (const TaskIndex child : info.child_task_ids) {
        node.children.push_back(build_task_progress_tree(child, processed));
    }

    return node;
}
569

570
// ============================================================================
571
// Phase 3: Coro-based task execution
572
// ============================================================================
573

574
// Coroutine that runs a single Task on this executor (created by
// submit_task()).  It proceeds in this order:
//   1. record the start in the worker context and the task registry;
//   2. await task->execute() inside a CoroScope;
//   3. always join that scope;
//   4. record COMPLETED, or FAILED together with the error text;
//   5. bump the activity/completion counters and call notify_completion().
// An exception thrown while executing the task is caught and forwarded to
// task->set_exception().
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
9,278!
575
                              std::shared_ptr<std::any> input) {
788✔
576
    // Get worker context from TLS (set by worker_thread at start).
577
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());
2,295✔
578

579
    // NOTE(review): this early exit records nothing in the registry and never
    // calls notify_completion().  submit_task() dereferences `task` before it
    // creates this coroutine, and worker threads always set the TLS context,
    // so this is presumably unreachable from that path -- confirm for other
    // callers.
    if (!context || !task) {
2,295✔
580
        co_return;
3,832✔
581
    }
582

583
    // Update worker context
584
    context->current_task_id = task->get_id();
2,295✔
585
    {
586
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
774!
587
        context->current_task_name = task->get_name();
774✔
588
    }
774✔
589
    context->last_activity = std::chrono::steady_clock::now();
772✔
590

591
    // Mark task start
592
    mark_activity();
772✔
593
    ++tasks_started_;
773✔
594

595
    // Update task registry to RUNNING
596
    {
597
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
773!
598
        auto it = task_registry_.find(task->get_id());
773!
599
        if (it != task_registry_.end()) {
773!
600
            it->second.state = TaskInfo::RUNNING;
773!
601
            it->second.started_at = std::chrono::steady_clock::now();
773!
602
            it->second.worker_id = context->worker_id;
773!
603
            it->second.location = TaskInfo::EXECUTING;
773!
604
        }
773✔
605
    }
773✔
606

607
    task->result().mark_running();
773!
608

609
    {
610
        CoroScope scope(this);
2,293!
611
        std::exception_ptr task_error;
2,293✔
612

613
        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
614
                                 context->worker_id, task->get_id(),
615
                                 task->get_name());
616

617
        try {
618
            // NOTE(review): `input` is dereferenced without a null check;
            // callers are assumed to always pass a non-null input.
            auto coro_task = task->execute(scope, *input);
2,293!
619
            // Propagate executor to the CoroTask's PromiseBase so that
620
            // nested awaitables (when_any, channel, etc.) can schedule
621
            // resumptions.  run_task() is a Coro (CoroPromise, not
622
            // PromiseBase), so the normal PromiseBase propagation in
623
            // CoroTask::await_suspend doesn't fire.
624
            coro_task.handle().promise().set_executor(this);
2,293✔
625
            std::any result = co_await std::move(coro_task);
3,065!
626

627
            // Set result via TaskResult
628
            task->set_result(std::move(result));
760!
629
        } catch (...) {
773✔
630
            task_error = std::current_exception();
13✔
631
        }
13!
632

633
        // Always join scope to wait for any sub-spawned coroutines.
634
        // Without this, the scope's JoinHandle would be destroyed
635
        // while FinalAwaiters still reference it (use-after-free).
636
        co_await scope.join();
3,088!
637

638
        // NOTE(review): `context` was read from TLS before the co_await
        // points above.  Resumed handles go through enqueue() onto the shared
        // run_queue_, which any worker may dequeue.  This code may therefore
        // run on a different worker thread than the one `context` describes
        // -- confirm how CoroTask / CoroScope resume their awaiters.  If it
        // can, the context->tasks_executed, current_task_id and
        // current_task_name updates below touch another worker's context, and
        // may clear the id of a task that worker has started since.
        if (!task_error) {
773✔
639
            DFTRACER_UTILS_LOG_DEBUG(
640
                "Task ID %ld ('%s') completed successfully", task->get_id(),
641
                task->get_name());
642

643
            // Mark task completion
644
            mark_activity();
760!
645
            ++tasks_completed_;
760✔
646
            context->tasks_executed++;
760✔
647

648
            // Update task registry to COMPLETED
649
            {
650
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
760!
651
                auto it = task_registry_.find(task->get_id());
760!
652
                if (it != task_registry_.end()) {
760!
653
                    it->second.state = TaskInfo::COMPLETED;
760!
654
                    it->second.completed_at = std::chrono::steady_clock::now();
760!
655
                    it->second.location = TaskInfo::DONE;
760!
656

657
                    // Update parent's completed children count
658
                    if (it->second.parent_task_id != -1) {
760!
659
                        auto parent_it =
414✔
660
                            task_registry_.find(it->second.parent_task_id);
414!
661
                        if (parent_it != task_registry_.end()) {
414!
662
                            parent_it->second.completed_children++;
410!
663
                        }
410✔
664
                    }
414✔
665
                }
760✔
666
            }
760✔
667

668
            // Notify scheduler
669
            notify_completion(task);
760!
670

671
        } else {
760✔
672
            try {
673
                std::rethrow_exception(task_error);
13!
674
            } catch (const std::exception& e) {
13!
675
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
13!
676
                                         task->get_id(), task->get_name(),
677
                                         e.what());
678

679
                task->set_exception(task_error);
13!
680

681
                mark_activity();
13!
682
                ++tasks_completed_;
13✔
683
                context->tasks_executed++;
13✔
684

685
                // Unlike the success path, a failure does not increment the
                // parent's completed_children counter.
                {
686
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
13!
687
                    auto it = task_registry_.find(task->get_id());
13!
688
                    if (it != task_registry_.end()) {
13!
689
                        it->second.state = TaskInfo::FAILED;
13!
690
                        it->second.completed_at =
13!
691
                            std::chrono::steady_clock::now();
13✔
692
                        it->second.error_message = e.what();
13!
693
                        it->second.location = TaskInfo::DONE;
13!
694
                    }
13✔
695
                }
13✔
696

697
                notify_completion(task);
13!
698

699
            } catch (...) {
13!
700
                DFTRACER_UTILS_LOG_ERROR(
×
701
                    "Task ID %ld ('%s') failed with unknown exception",
702
                    task->get_id(), task->get_name());
703

704
                task->set_exception(task_error);
×
705

706
                mark_activity();
×
707
                ++tasks_completed_;
708
                context->tasks_executed++;
709

710
                {
711
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
712
                    auto it = task_registry_.find(task->get_id());
×
713
                    if (it != task_registry_.end()) {
×
714
                        it->second.state = TaskInfo::FAILED;
×
715
                        it->second.completed_at =
×
716
                            std::chrono::steady_clock::now();
717
                        it->second.error_message = "Unknown exception";
×
718
                        it->second.location = TaskInfo::DONE;
×
719
                    }
720
                }
721

722
                notify_completion(task);
×
723
            }
13!
724
        }
725
    }
2,317✔
726

727
    // Clear current task info
728
    context->current_task_id = -1;
773✔
729
    {
730
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
773!
731
        context->current_task_name.clear();
773✔
732
    }
773✔
733

734
    co_return;
773✔
735
}
13,065!
736

737
void Executor::submit_task(std::shared_ptr<Task> task,
1,574✔
738
                           std::shared_ptr<std::any> input,
739
                           TaskIndex parent_task_id) {
740
    // Register in task registry
741
    {
742
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
1,574!
743

744
        auto [it, inserted] = task_registry_.emplace(
4,722!
745
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
1,574!
746
            std::forward_as_tuple());
2,361✔
747

748
        if (inserted) {
1,574!
749
            it->second.task_id = task->get_id();
1,574!
750
            it->second.parent_task_id = parent_task_id;
2,361!
751
            it->second.name = task->get_name();
1,574!
752
            it->second.state = TaskInfo::QUEUED;
1,574!
753
            it->second.queued_at = std::chrono::steady_clock::now();
2,361!
754
            it->second.location = TaskInfo::SHARED_QUEUE;
1,574!
755
            it->second.worker_id = static_cast<std::size_t>(-1);
1,574!
756

757
            if (parent_task_id != -1) {
1,574✔
758
                auto parent_it = task_registry_.find(parent_task_id);
870!
759
                if (parent_it != task_registry_.end()) {
870✔
760
                    parent_it->second.child_task_ids.push_back(task->get_id());
862!
761
                }
435✔
762
            }
439✔
763
        }
787✔
764
    }
1,574✔
765

766
    ++total_tasks_submitted_;
1,574✔
767

768
    // Create Coro, set executor on promise, enqueue released handle
769
    auto coro = run_task(std::move(task), std::move(input));
2,361!
770
    coro.handle().promise().executor = this;
1,574!
771
    enqueue(coro.release());
1,574!
772
}
1,574✔
773

774
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    // Tracked coroutines draw ids from their own counter, which counts
    // downward (fetch_sub). Presumably this keeps them apart from ids
    // produced by Task::get_id() — TODO confirm.
    const TaskIndex assigned_id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = assigned_id;
    promise.executor = this;

    // Register the coroutine as a parentless QUEUED entry on the shared queue.
    // An existing entry for the same id is left unchanged.
    {
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);
        auto [entry, is_new] = task_registry_.emplace(
            std::piecewise_construct, std::forward_as_tuple(assigned_id),
            std::forward_as_tuple());
        if (is_new) {
            const auto queued_time = std::chrono::steady_clock::now();
            auto& info = entry->second;
            info.task_id = assigned_id;
            info.parent_task_id = -1;
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = queued_time;
            info.location = TaskInfo::SHARED_QUEUE;
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }

    ++total_tasks_submitted_;

    // Publish the id (release) before the handle is passed to enqueue(), so
    // an observer of tid_out never sees it later than the enqueue.
    if (tid_out) {
        tid_out->store(assigned_id, std::memory_order_release);
    }

    enqueue(coro.release());
    return assigned_id;
}
807

808
void Executor::mark_coro_completed(TaskIndex id) {
    // Move the registry entry for `id` to COMPLETED/DONE at most once. The
    // completion counter is bumped outside the lock, and only when this call
    // performed the transition. Unknown ids and entries that are already
    // COMPLETED are ignored.
    // NOTE(review): any other prior state (e.g. FAILED) is overwritten with
    // COMPLETED here — confirm that is intended.
    bool transitioned = false;
    {
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);
        const auto found = task_registry_.find(id);
        if (found != task_registry_.end()) {
            auto& info = found->second;
            if (info.state != TaskInfo::COMPLETED) {
                const auto finished_at = std::chrono::steady_clock::now();
                // A zero time-since-epoch marks "never started". Backfill it
                // so the entry reports a zero-length run instead of a bogus
                // start time.
                if (info.started_at.time_since_epoch().count() == 0) {
                    info.started_at = finished_at;
                }
                info.state = TaskInfo::COMPLETED;
                info.completed_at = finished_at;
                info.location = TaskInfo::DONE;
                transitioned = true;
            }
        }
    }
    if (transitioned) {
        ++tasks_completed_;
    }
}
2,224✔
827

828
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    // Defer destruction: non-null handles are pushed onto destroy_queue_, and
    // drain_destroy_queue() destroys them later. Null handles are ignored.
    // NOTE(review): the result of destroy_queue_.enqueue() is not checked; if
    // it can report failure, the frame would leak — confirm.
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
10,313✔
833

834
void Executor::drain_destroy_queue() {
83,239✔
835
    std::coroutine_handle<> to_destroy;
83,239✔
836
    while (destroy_queue_.try_dequeue(to_destroy)) {
93,660✔
837
        if (to_destroy) {
10,424✔
838
            to_destroy.destroy();
10,407!
839
        }
5,081✔
840
    }
841
}
83,243✔
842

843
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc