• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28278922221

27 Jun 2026 04:43AM UTC coverage: 52.227% (+0.1%) from 52.111%
28278922221

Pull #79

github

web-flow
Merge 478a5fe2e into 53ad1e86c
Pull Request #79: Add Valgrind memory checking (C++, Python, MPI) and fix the bugs it found

37166 of 92481 branches covered (40.19%)

Branch coverage included in aggregate %.

129 of 144 new or added lines in 11 files covered. (89.58%)

39 existing lines in 9 files now uncovered.

33496 of 42816 relevant lines covered (78.23%)

20388.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.42
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/io/io_backend_factory.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/core/tasks/task.h>
8

9
#include <chrono>
10
#include <coroutine>
11
#include <exception>
12
#include <vector>
13

14
namespace dftracer::utils {
15

16
static thread_local void* tls_current_worker_context = nullptr;
17

18
/// Returns the opaque worker-context pointer recorded for the calling thread.
/// The slot is thread-local and starts out as nullptr.
void* get_current_worker_context() {
    return tls_current_worker_context;
}
3,119✔
19

20
void set_current_worker_context(void* context) {
6,607✔
21
    tls_current_worker_context = context;
6,607✔
22
}
6,607✔
23

24
static thread_local Executor* tls_current_executor = nullptr;
25

26
/// Returns the executor bound to the calling thread (nullptr when unbound).
Executor* Executor::current() noexcept {
    return tls_current_executor;
}
715,128✔
27

28
/// Binds `e` to the calling thread and hands back the executor that was
/// bound before, so the caller can restore it later.
Executor* Executor::set_current(Executor* e) noexcept {
    Executor* const previous = tls_current_executor;
    tls_current_executor = e;
    return previous;
}
33

34
// Thread-local list of coroutine handles to destroy after the current
35
// resume() returns.  FinalAwaiter pushes here instead of the shared
36
// destroy_queue_ to avoid another worker freeing the frame while
37
// the coroutine-suspend machinery is still accessing it.
38
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
39

40
/// Appends `h` to the calling thread's pending-destroy list.  The handle is
/// not destroyed here; drain_thread_local_destroys() processes the list.
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
    tls_pending_destroys.emplace_back(h);
}
10,310✔
43

44
void drain_thread_local_destroys() {
37,817✔
45
    Executor* exec = Executor::current();
37,817✔
46
    for (auto h : tls_pending_destroys) {
48,242✔
47
        if (!h) continue;
10,476✔
48
        if (exec) {
10,399!
49
            exec->schedule_destroy(h);
10,399!
50
        } else {
5,139✔
51
            h.destroy();
×
52
        }
53
    }
54
    tls_pending_destroys.clear();
37,836✔
55
}
37,673✔
56

57
/// Builds an executor from `config` without starting any threads.
///
/// A `num_threads` or `io_pool_size` of 0 in the config means "use
/// dftracer_utils_hardware_concurrency()"; if that also yields 0 the value
/// falls back to 2.  The activity timestamp is seeded with the current
/// steady-clock tick count.
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0
                       ? dftracer_utils_hardware_concurrency()
                       : config.num_threads),
      last_activity_ns_(
          std::chrono::steady_clock::now().time_since_epoch().count()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size == 0
                        ? dftracer_utils_hardware_concurrency()
                        : config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold) {
    if (num_threads_ == 0) {
        num_threads_ = 2;
    }
    if (io_pool_size_ == 0) {
        io_pool_size_ = 2;
    }
    // duration::count() returns the duration's rep, which is not guaranteed
    // to be `long long`; cast so the argument really matches %lld.
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, static_cast<long long>(idle_timeout_.count()),
        static_cast<long long>(deadlock_timeout_.count()));
}
1,626✔
81

82
/// Stops the executor if it is still running, then drains the deferred
/// destroy queue one more time.
Executor::~Executor() {
    shutdown();             // returns immediately when not running
    drain_destroy_queue();  // catch anything queued after shutdown's drain
}
1,626✔
86

87
void Executor::start() {
1,074✔
88
    if (running_) {
1,074!
89
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
×
90
        return;
×
91
    }
92

93
    running_ = true;
1,074✔
94
    workers_.clear();
1,074✔
95
    workers_.reserve(num_threads_);
1,074✔
96

97
    timer_service_.start();
1,074✔
98

99
    // Create and start I/O backend before workers so workers
100
    // can use it immediately.
101
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
2,148✔
102
                                        io_batch_threshold_);
1,074✔
103
    io_backend_->start();
1,074✔
104

105
    // Create all worker contexts first so workers_ is stable before any
106
    // worker thread can try to iterate/steal from it.
107
    for (std::size_t i = 0; i < num_threads_; ++i) {
4,390✔
108
        auto worker = std::make_unique<WorkerContext>(i);
3,316!
109
        worker->last_activity = std::chrono::steady_clock::now();
3,316✔
110
        workers_.push_back(std::move(worker));
3,316!
111
    }
3,316✔
112

113
    // Start worker threads after all contexts are in place.
114
    for (auto& worker : workers_) {
4,390✔
115
        worker->thread =
3,316✔
116
            std::thread(&Executor::worker_thread, this, worker.get());
5,035!
117
    }
118

119
    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
120
                             num_threads_);
121
}
537✔
122

123
void Executor::shutdown() {
2,768✔
124
    if (!running_) {
2,768✔
125
        return;
1,694✔
126
    }
127

128
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
129
    running_ = false;
1,074✔
130
    wake_all_workers();
1,074✔
131

132
    // Join all worker threads (must happen before io_backend_ is
133
    // destroyed, since workers call io_backend_->poll() when idle).
134
    for (auto& worker : workers_) {
4,390✔
135
        if (worker->thread.joinable()) {
3,316✔
136
            worker->thread.join();
3,316!
137
        }
1,597✔
138
    }
139

140
    // Stop I/O backend AFTER joining workers (workers may poll the
141
    // backend) but BEFORE clearing workers_.  The I/O backend's
142
    // completion thread may still call enqueue() -> wake_all_workers()
143
    // which accesses WorkerContext cv/mutex, so workers_ must remain
144
    // alive until the completion thread has exited.
145
    if (io_backend_) {
1,074✔
146
        io_backend_->stop();
1,074✔
147
        io_backend_.reset();
1,074✔
148
    }
537✔
149

150
    // Destroy deferred frames and orphaned run-queue entries BEFORE
151
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
152
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
153
    drain_destroy_queue();
1,074✔
154
    {
155
        RunQueueEntry orphan;
1,074✔
156
        while (run_queue_.try_dequeue(orphan)) {
1,099✔
157
            if (orphan.handle) {
25✔
158
                orphan.handle.destroy();
25!
159
            }
10✔
160
        }
161
    }
162

163
    workers_.clear();
1,074✔
164
    timer_service_.stop();
1,074✔
165

166
    // Drain the main thread's thread-local destroy list (for
167
    // coroutines whose FinalAwaiter ran on the main thread).
168
    drain_thread_local_destroys();
1,074✔
169

170
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
171
}
1,384✔
172

173
void Executor::reset() {
×
174
    // Queue will be reset by caller if needed
175
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
176
}
×
177

178
/// Installs the callback that notify_completion() invokes for each finished
/// task.  The argument is moved into the executor.
void Executor::set_completion_callback(CompletionCallback callback) {
    completion_callback_ = std::move(callback);
}
634✔
181

182
/// Body of each worker thread.
///
/// Binds `context` and this executor to the thread's TLS slots, then loops
/// while running_ is set: dequeue one coroutine handle from the shared run
/// queue and resume it, or -- when the queue is empty -- flush/poll the I/O
/// backend and block on work_signal_ until something changes.  On exit the
/// thread drains its pending-destroy list and clears its TLS bindings.
///
/// @param context  Per-worker state owned by workers_; must outlive the
///                 thread.
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    while (running_) {
        RunQueueEntry pending_entry;

        // Snapshot the work signal BEFORE checking any queues.
        // Any increment (enqueue + signal_global_work, or shutdown's
        // wake_all_workers) that happens AFTER this load is then detected by
        // the wait below even if the queue check sees an empty queue.
        // Loading it after the queue check would let work arrive in between
        // and leave the worker asleep with work sitting in the queue.
        const std::uint64_t observed_signal =
            work_signal_.load(std::memory_order_acquire);

        // Run queue: coroutine handles from enqueue() and
        // schedule_coroutine_resumption().
        if (run_queue_.try_dequeue(pending_entry)) {
            coro::reset_timeslice();
            context->is_idle.store(false, std::memory_order_relaxed);
            std::coroutine_handle<> pending_resume = pending_entry.handle;
            if (pending_resume && !pending_resume.done()) {
                // Id comes from the entry, not the promise: foreign promise
                // types have no task_id field.
                TaskIndex tid = pending_entry.task_id;
                if (tid >= 0) {
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                    auto it = task_registry_.find(tid);
                    if (it != task_registry_.end() &&
                        it->second.state == TaskInfo::QUEUED) {
                        it->second.state = TaskInfo::RUNNING;
                        it->second.started_at =
                            std::chrono::steady_clock::now();
                        it->second.worker_id = context->worker_id;
                        it->second.location = TaskInfo::EXECUTING;
                        ++tasks_started_;
                    }
                }
                DFTRACER_TSAN_ACQUIRE(pending_resume.address());
                pending_resume.resume();
            }
            // Destroy coroutine frames that FinalAwaiter deferred to this
            // thread.  Safe: resume() has fully returned, so the frame
            // is suspended at final_suspend and no code references it.
            drain_thread_local_destroys();
            drain_destroy_queue();
        }
        // No work available -- sleep until signaled.
        else {
            context->is_idle.store(true, std::memory_order_relaxed);
            if (io_backend_) {
                // Flush any batched I/O operations before sleeping so
                // pending submissions go out even when no new work arrives.
                io_backend_->flush();
                // Opportunistic poll: completions enqueue new work, so skip
                // the sleep and retry the run queue.
                auto reaped = io_backend_->poll(0);
                if (reaped > 0) {
                    drain_destroy_queue();
                    continue;
                }
            }
            drain_destroy_queue();

            // Shutdown race guard.  shutdown() clears running_ and THEN
            // bumps work_signal_.  If both happened between the loop
            // condition and the snapshot above, observed_signal already
            // holds the bumped value and wait() would block forever (no
            // further notify is coming), hanging the join in shutdown().
            // Having acquired the bumped signal, this load is guaranteed to
            // see running_ == false.  If the snapshot predates the bump,
            // wait() returns immediately because the value differs.
            if (!running_.load()) {
                break;
            }
            work_signal_.wait(observed_signal, std::memory_order_acquire);
        }
    }

    // Final drain: destroy any frames deferred during the last resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
3,258✔
267

268
void Executor::notify_completion(std::shared_ptr<Task> task) {
1,549✔
269
    // No mutex needed -- the callback is set exactly once during Scheduler
270
    // construction (before any task is submitted) and never modified after.
271
    // on_task_completed uses only atomics, lock-free queues, and properly-
272
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
273
    if (completion_callback_) {
1,549✔
274
        completion_callback_(task);
1,549!
275
    }
777✔
276
}
1,548✔
277

278
void Executor::request_shutdown() {
10✔
279
    if (shutdown_requested_.load()) {
10!
280
        return;  // Already requested
×
281
    }
282

283
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
284
    shutdown_requested_ = true;
10✔
285
}
5✔
286

287
/// Queues `handle` for resumption on a worker.  Thin alias for enqueue(),
/// which is the single path for all coroutine handles.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    enqueue(handle);
}
13,859✔
291

292
void Executor::signal_global_work() {
33,588✔
293
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
33,588✔
294
    // Wake one worker, not all. Each enqueue adds one unit of work,
295
    // so one worker is sufficient. Avoids thundering herd where all
296
    // N threads wake, N-1 find no work, and go back to sleep.
297
    wake_one_worker();
33,588✔
298
}
33,586✔
299

300
/// Wakes at most one thread blocked in work_signal_.wait().
void Executor::wake_one_worker() {
    work_signal_.notify_one();
}
33,604✔
301

302
void Executor::wake_all_workers() {
1,074✔
303
    work_signal_.fetch_add(1, std::memory_order_release);
1,074✔
304
    work_signal_.notify_all();
1,074✔
305
}
1,074✔
306

307
// Helper function for when_all.h (avoids circular dependency)
308
void schedule_coroutine_resumption_helper(Executor* executor,
13,859✔
309
                                          std::coroutine_handle<> handle) {
310
    if (executor) {
13,859✔
311
        executor->schedule_coroutine_resumption(handle);
13,859✔
312
    }
6,841✔
313
}
13,857✔
314

315
// Helper function for when_any.h (avoids circular dependency)
316
void schedule_destroy_helper(Executor* executor,
14✔
317
                             std::coroutine_handle<> handle) {
318
    if (executor && handle) {
14!
319
        executor->schedule_destroy(handle);
14✔
320
    }
7✔
321
}
14✔
322

323
/// Pushes `handle` (tagged with `task_id`) onto the shared run queue and
/// signals that work is available.
///
/// Null handles and coroutines that have already finished are ignored.
void Executor::enqueue(std::coroutine_handle<> handle, TaskIndex task_id) {
    const bool runnable = handle && !handle.done();
    if (!runnable) {
        return;
    }

    DFTRACER_TSAN_RELEASE(handle.address());
    run_queue_.enqueue(RunQueueEntry{handle, task_id});
    signal_global_work();
}
16,590✔
332

333
bool Executor::is_responsive() const {
4✔
334
    // If shutdown was requested, consider unresponsive
335
    if (shutdown_requested_.load()) {
4!
336
        return false;
×
337
    }
338

339
    // If not running, not responsive
340
    if (!running_.load()) {
4✔
341
        return false;
×
342
    }
343

344
    // Check if all threads might be deadlocked
345
    // (all threads busy but no progress for a while)
346
    std::size_t started = tasks_started_.load();
4✔
347
    std::size_t completed = tasks_completed_.load();
4✔
348
    std::size_t active = started - completed;
4✔
349

350
    if (active >= num_threads_) {
4✔
351
        // All threads busy - check if making progress
352
        auto now = std::chrono::steady_clock::now();
2✔
353
        auto last_ns = last_activity_ns_.load(std::memory_order_acquire);
2✔
354
        auto last_tp = std::chrono::steady_clock::time_point(
1✔
355
            std::chrono::steady_clock::duration(last_ns));
2✔
356
        auto idle_time = now - last_tp;
2!
357

358
        // If all threads busy but no activity for deadlock_timeout,
359
        // likely deadlocked
360
        if (idle_time > deadlock_timeout_) {
2!
361
            DFTRACER_UTILS_LOG_WARN(
×
362
                "Executor appears deadlocked: %zu threads, %zu active "
363
                "tasks, idle for %lld ms",
364
                num_threads_, active,
365
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
366
                    .count());
367
            return false;
×
368
        }
369
    }
1✔
370

371
    return true;
4✔
372
}
2✔
373

374
void Executor::mark_activity() {
3,093✔
375
    last_activity_ns_.store(
4,640✔
376
        std::chrono::steady_clock::now().time_since_epoch().count(),
3,093✔
377
        std::memory_order_release);
378
}
3,089✔
379

380
void Executor::update_task_location(TaskIndex task_id,
×
381
                                    TaskInfo::Location location,
382
                                    std::size_t worker_id) {
383
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
384
    auto it = task_registry_.find(task_id);
×
385
    if (it != task_registry_.end()) {
×
386
        it->second.location = location;
×
387
        if (location == TaskInfo::LOCAL_QUEUE ||
×
388
            location == TaskInfo::EXECUTING) {
389
            it->second.worker_id = worker_id;
×
390
        }
391
    }
392
}
×
393

394
/// Builds a snapshot of executor progress under a shared lock on the task
/// registry.
///
/// The snapshot contains the submitted/completed counters, per-state task
/// counts, error messages of failed tasks, one task tree per root task
/// (parent_task_id == -1), and one status entry per worker.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> lock(registry_mutex_);
    ExecutorProgress progress;

    // Overall stats
    progress.total_tasks_submitted = total_tasks_submitted_.load();
    progress.tasks_completed = tasks_completed_.load();

    // Count task states
    progress.tasks_queued = 0;
    progress.tasks_running = 0;
    progress.tasks_failed = 0;

    for (const auto& [task_id, info] : task_registry_) {
        switch (info.state) {
            case TaskInfo::QUEUED:
                progress.tasks_queued++;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                progress.tasks_running++;
                break;
            case TaskInfo::COMPLETED:
                // Already counted via tasks_completed_
                break;
            case TaskInfo::FAILED:
                progress.tasks_failed++;
                if (!info.error_message.empty()) {
                    progress.recent_errors.push_back(
                        {task_id, info.error_message});
                }
                break;
        }
    }

    for (std::size_t i = 0; i < workers_.size(); ++i) {
        // No per-worker local queues anymore; report 0.
        progress.worker_queue_depths.push_back(0);
    }

    // Build task trees (find root tasks).  The freshly built tree is passed
    // straight into push_back so it is moved, not deep-copied (children are
    // nested by value).
    std::unordered_set<TaskIndex> processed;
    for (const auto& [task_id, info] : task_registry_) {
        if (info.parent_task_id == -1) {  // Root task
            progress.root_tasks.push_back(
                build_task_progress_tree(task_id, processed));
        }
    }

    // Worker states
    for (const auto& worker : workers_) {
        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        TaskIndex current_id = worker->current_task_id.load();
        if (current_id != -1) {
            status.current_task_id = current_id;
            // The task name is guarded by its own mutex in the worker.
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;

        progress.workers.push_back(std::move(status));
    }

    return progress;
}
42!
463

464
/// Recursively converts the registry entry for `task_id` (and its children)
/// into a TaskProgress tree.
///
/// @param task_id    Task to describe.
/// @param processed  Ids already visited during this traversal; a repeated
///                   id yields a "[Cycle Detected]" placeholder node, an
///                   unknown id a "[Not Found]" placeholder.
/// @return The populated node.  Reads task_registry_ without taking a lock
///         itself (get_progress() calls it with registry_mutex_ held).
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    TaskProgress node;
    node.task_id = task_id;

    // insert() reports whether the id was new, so one lookup does both the
    // cycle check and the bookkeeping.
    const bool first_visit = processed.insert(task_id).second;
    if (!first_visit) {
        node.name = "[Cycle Detected]";
        return node;
    }

    const auto entry = task_registry_.find(task_id);
    if (entry == task_registry_.end()) {
        node.name = "[Not Found]";
        return node;
    }

    const TaskInfo& info = entry->second;
    node.name = info.name;

    // State label
    switch (info.state) {
        case TaskInfo::QUEUED:
            node.state = "queued";
            break;
        case TaskInfo::RUNNING:
            node.state = "running";
            break;
        case TaskInfo::WAITING:
            node.state = "waiting";
            break;
        case TaskInfo::COMPLETED:
            node.state = "completed";
            break;
        case TaskInfo::FAILED:
            node.state = "failed";
            break;
    }

    // Timing, in fractional milliseconds
    using Millis = std::chrono::duration<double, std::milli>;
    const auto now = std::chrono::steady_clock::now();
    const bool still_queued = info.state == TaskInfo::QUEUED;
    const bool in_flight = info.state == TaskInfo::RUNNING ||
                           info.state == TaskInfo::WAITING;
    if (still_queued) {
        node.queued_duration_ms = Millis(now - info.queued_at).count();
        node.execution_duration_ms = 0;
    } else if (in_flight) {
        node.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();
        node.execution_duration_ms = Millis(now - info.started_at).count();
    } else {  // COMPLETED or FAILED
        node.queued_duration_ms =
            Millis(info.started_at - info.queued_at).count();
        node.execution_duration_ms =
            Millis(info.completed_at - info.started_at).count();
    }

    // Progress: share of finished children, or all-or-nothing for leaves
    node.total_subtasks = info.child_task_ids.size();
    node.completed_subtasks = info.completed_children.load();
    if (node.total_subtasks > 0) {
        node.progress_percentage =
            (100.0 * static_cast<double>(node.completed_subtasks)) /
            static_cast<double>(node.total_subtasks);
    } else {
        node.progress_percentage =
            (info.state == TaskInfo::COMPLETED) ? 100.0 : 0.0;
    }

    // Location label
    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            node.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            node.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            node.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            node.location = "done";
            break;
    }

    // Children, depth-first
    for (TaskIndex child_id : info.child_task_ids) {
        node.children.push_back(build_task_progress_tree(child_id, processed));
    }

    return node;
}
48!
569

570
// ============================================================================
571
// Phase 3: Coro-based task execution
572
// ============================================================================
573

574
/// Coroutine wrapper that executes one Task on a worker thread.
///
/// Steps: publish the task as the worker's current task, mark it RUNNING in
/// the registry, await task->execute(), join the CoroScope so sub-spawned
/// coroutines finish, record COMPLETED or FAILED in the registry, and notify
/// the completion callback.
///
/// @param task   Task to run.  If it is null, or the calling thread has no
///               worker context, the coroutine returns immediately.
/// @param input  Input handed to task->execute().  A null pointer is
///               replaced by an empty std::any instead of being
///               dereferenced.
///
/// NOTE(review): `context` is captured before the first suspension.  All
/// workers pull from one shared run queue, so after a co_await this
/// coroutine may presumably resume on a different worker than the one
/// `context` describes -- confirm whether tasks_executed / current_task_*
/// are meant to track the starting worker.
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
                              std::shared_ptr<std::any> input) {
    // Get worker context from TLS (set by worker_thread at start).
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());

    if (!context || !task) {
        co_return;
    }

    // Never dereference a null input below; an empty std::any stands in.
    // Stored in the parameter so it lives as long as the coroutine frame.
    if (!input) {
        input = std::make_shared<std::any>();
    }

    // Update worker context
    context->current_task_id = task->get_id();
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name = task->get_name();
    }
    context->last_activity = std::chrono::steady_clock::now();

    // Mark task start
    mark_activity();
    ++tasks_started_;

    // Update task registry to RUNNING
    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto it = task_registry_.find(task->get_id());
        if (it != task_registry_.end()) {
            it->second.state = TaskInfo::RUNNING;
            it->second.started_at = std::chrono::steady_clock::now();
            it->second.worker_id = context->worker_id;
            it->second.location = TaskInfo::EXECUTING;
        }
    }

    task->result().mark_running();

    {
        CoroScope scope(this);
        std::exception_ptr task_error;

        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
                                 context->worker_id, task->get_id(),
                                 task->get_name());

        try {
            auto coro_task = task->execute(scope, *input);
            // Propagate executor to the CoroTask's PromiseBase so that
            // nested awaitables (when_any, channel, etc.) can schedule
            // resumptions.  run_task() is a Coro (CoroPromise, not
            // PromiseBase), so the normal PromiseBase propagation in
            // CoroTask::await_suspend doesn't fire.
            coro_task.handle().promise().set_executor(this);
            std::any result = co_await std::move(coro_task);

            // Set result via TaskResult
            task->set_result(std::move(result));
        } catch (...) {
            task_error = std::current_exception();
        }

        // Always join scope to wait for any sub-spawned coroutines.
        // Without this, the scope's JoinHandle would be destroyed
        // while FinalAwaiters still reference it (use-after-free).
        co_await scope.join();

        const bool failed = static_cast<bool>(task_error);
        std::string error_message;

        if (!failed) {
            DFTRACER_UTILS_LOG_DEBUG(
                "Task ID %ld ('%s') completed successfully", task->get_id(),
                task->get_name());
        } else {
            // Rethrow only to classify the error and extract its message;
            // all bookkeeping happens once, below, for both kinds.
            bool has_description = false;
            try {
                std::rethrow_exception(task_error);
            } catch (const std::exception& e) {
                error_message = e.what();
                has_description = true;
            } catch (...) {
                error_message = "Unknown exception";
            }

            if (has_description) {
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
                                         task->get_id(), task->get_name(),
                                         error_message.c_str());
            } else {
                DFTRACER_UTILS_LOG_ERROR(
                    "Task ID %ld ('%s') failed with unknown exception",
                    task->get_id(), task->get_name());
            }

            task->set_exception(task_error);
        }

        // Mark task completion (failed tasks count as completed too)
        mark_activity();
        ++tasks_completed_;
        context->tasks_executed++;

        // Update task registry to its terminal state
        {
            std::unique_lock<std::shared_mutex> lock(registry_mutex_);
            auto it = task_registry_.find(task->get_id());
            if (it != task_registry_.end()) {
                it->second.state =
                    failed ? TaskInfo::FAILED : TaskInfo::COMPLETED;
                it->second.completed_at = std::chrono::steady_clock::now();
                if (failed) {
                    it->second.error_message = error_message;
                }
                it->second.location = TaskInfo::DONE;

                // Only successful children bump the parent's counter.
                if (!failed && it->second.parent_task_id != -1) {
                    auto parent_it =
                        task_registry_.find(it->second.parent_task_id);
                    if (parent_it != task_registry_.end()) {
                        parent_it->second.completed_children++;
                    }
                }
            }
        }

        // Notify scheduler
        notify_completion(task);
    }

    // Clear current task info
    context->current_task_id = -1;
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name.clear();
    }

    co_return;
}
13,098!
736

737
void Executor::submit_task(std::shared_ptr<Task> task,
1,573✔
738
                           std::shared_ptr<std::any> input,
739
                           TaskIndex parent_task_id) {
740
    // Register in task registry
741
    {
742
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
1,573!
743

744
        auto [it, inserted] = task_registry_.emplace(
2,360✔
745
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
1,573✔
746
            std::forward_as_tuple());
2,361✔
747

748
        if (inserted) {
1,573✔
749
            it->second.task_id = task->get_id();
1,574!
750
            it->second.parent_task_id = parent_task_id;
1,574!
751
            it->second.name = task->get_name();
1,574!
752
            it->second.state = TaskInfo::QUEUED;
1,574!
753
            it->second.queued_at = std::chrono::steady_clock::now();
1,574!
754
            it->second.location = TaskInfo::SHARED_QUEUE;
1,574!
755
            it->second.worker_id = static_cast<std::size_t>(-1);
1,574!
756

757
            if (parent_task_id != -1) {
1,574✔
758
                auto parent_it = task_registry_.find(parent_task_id);
873!
759
                if (parent_it != task_registry_.end()) {
873✔
760
                    parent_it->second.child_task_ids.push_back(task->get_id());
865!
761
                }
437✔
762
            }
441✔
763
        }
787✔
764
    }
1,577✔
765

766
    ++total_tasks_submitted_;
1,575✔
767

768
    // Create Coro, set executor on promise, enqueue released handle
769
    auto coro = run_task(std::move(task), std::move(input));
2,362!
770
    coro.handle().promise().executor = this;
1,575!
771
    enqueue(coro.release());
1,574!
772
}
1,577✔
773

774
/// Gives `coro` the next coroutine task id, records it in the task registry
/// and enqueues it.
///
/// @param coro     Coroutine to enqueue; its handle is released and passed to
///                 enqueue().
/// @param name     Name stored in the new registry record.
/// @param tid_out  When non-null, receives the assigned id (release store)
///                 before the coroutine is enqueued.
/// @return The id assigned to the coroutine.
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    // Ids for tracked coroutines come from a counter that counts downwards.
    TaskIndex assigned_id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = assigned_id;
    promise.executor = this;

    {
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);
        auto [entry, is_new] = task_registry_.emplace(
            std::piecewise_construct, std::forward_as_tuple(assigned_id),
            std::forward_as_tuple());
        // A record that already exists for this id is left untouched.
        if (is_new) {
            auto& info = entry->second;
            info.task_id = assigned_id;
            info.parent_task_id = -1;
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = std::chrono::steady_clock::now();
            info.location = TaskInfo::SHARED_QUEUE;
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }

    ++total_tasks_submitted_;

    // Publish the id before the coroutine is handed to enqueue().
    if (tid_out) {
        tid_out->store(assigned_id, std::memory_order_release);
    }

    auto released = coro.release();
    enqueue(released, assigned_id);
    return assigned_id;
}
807

808
/// Marks the registry record for `id` as completed.
///
/// Does nothing when `id` is not in the registry or its record is already in
/// the COMPLETED state, so the completed-task counter is bumped at most once
/// per transition into COMPLETED.
///
/// @param id  Task id to look up in the registry.
void Executor::mark_coro_completed(TaskIndex id) {
    bool newly_completed = false;

    {
        std::unique_lock<std::shared_mutex> registry_guard(registry_mutex_);
        const auto entry = task_registry_.find(id);
        if (entry != task_registry_.end()) {
            auto& info = entry->second;
            if (info.state != TaskInfo::COMPLETED) {
                const auto finished_at = std::chrono::steady_clock::now();
                // A start time still at the clock's epoch was never set;
                // use the completion time for it.
                if (info.started_at.time_since_epoch().count() == 0) {
                    info.started_at = finished_at;
                }
                info.state = TaskInfo::COMPLETED;
                info.completed_at = finished_at;
                info.location = TaskInfo::DONE;
                newly_completed = true;
            }
        }
    }

    // The counter is updated after the registry lock has been released.
    if (newly_completed) {
        ++tasks_completed_;
    }
}
2,280✔
827

828
/// Queues a coroutine handle on destroy_queue_ for later destruction.
///
/// @param handle  Handle to queue; a null handle is ignored.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
10,451✔
833

834
void Executor::drain_destroy_queue() {
89,863✔
835
    std::coroutine_handle<> to_destroy;
89,863✔
836
    while (destroy_queue_.try_dequeue(to_destroy)) {
100,407✔
837
        if (to_destroy) {
10,533✔
838
            to_destroy.destroy();
10,505!
839
        }
5,158✔
840
    }
841
}
89,880✔
842

843
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc