• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 24057299873

07 Apr 2026 12:01AM UTC coverage: 52.076% (+0.8%) from 51.228%
24057299873

push

github

rayandrew
feat(rocksdb): migrate SQLite indexing to RocksDB

Replace SQLite-backed indexing and provenance storage with RocksDB-backed stores.

  Key changes:
  - add RocksDB async/database/db-manager/filesystem/key-codec layers
  - migrate index and provenance databases from SQLite to RocksDB
  - update index builder, trace reader, reorganize, view, stats, and comparator paths for
  RocksDB
  - harden transaction atomicity and rollback behavior with TransactionScope
  - add iterator status checking for prefix scans
  - harden gzip/tar indexer cache state and metadata handling
  - capture executor context in RocksDB awaitables
  - clean up failed RocksDB open paths and manager lifecycle behavior
  - vendor CPM 0.42.1 and update CI/build integration
  - refresh docs, Python bindings, and C++/Python test coverage for the new backend

  Validation:
  - full test suite passed
  - Ubuntu 22.04 Docker run passed
  - focused RocksDB/indexer regression tests passed.

24097 of 59624 branches covered (40.41%)

Branch coverage included in aggregate %.

2516 of 3144 new or added lines in 75 files covered. (80.03%)

72 existing lines in 15 files now uncovered.

20858 of 26701 relevant lines covered (78.12%)

14113.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.83
/src/dftracer/utils/core/pipeline/executor.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/platform_compat.h>
3
#include <dftracer/utils/core/coro/yield.h>
4
#include <dftracer/utils/core/io/io_backend_factory.h>
5
#include <dftracer/utils/core/io/io_thread_pool.h>
6
#include <dftracer/utils/core/pipeline/executor.h>
7
#include <dftracer/utils/core/tasks/coro_scope.h>
8
#include <dftracer/utils/core/tasks/task.h>
9

10
#include <chrono>
11
#include <coroutine>
12
#include <exception>
13
#include <vector>
14

15
namespace dftracer::utils {
16

// Per-thread slot holding the WorkerContext of the worker running on
// this thread; null on threads that are not executor workers.
static thread_local void* tls_current_worker_context = nullptr;

// Return the calling thread's worker context, or nullptr when the
// caller is not an executor worker thread.
void* get_current_worker_context() {
    return tls_current_worker_context;
}

// Bind `context` to the calling thread.  Workers install their own
// WorkerContext at startup and clear it (nullptr) on exit.
void set_current_worker_context(void* context) {
    tls_current_worker_context = context;
}
24

25
static thread_local Executor* tls_current_executor = nullptr;
26

27
Executor* Executor::current() noexcept { return tls_current_executor; }
257,189✔
28

29
Executor* Executor::set_current(Executor* e) noexcept {
1,351,472✔
30
    auto* old = tls_current_executor;
1,351,472✔
31
    tls_current_executor = e;
1,351,472✔
32
    return old;
1,351,472✔
33
}
34

35
// Thread pool dedicated to database-bound work.  Created in start()
// and destroyed in shutdown(), so this may return nullptr outside the
// running window.
io::IoThreadPool* Executor::db_pool() noexcept { return db_pool_.get(); }
36

37
// Thread-local list of coroutine handles to destroy after the current
38
// resume() returns.  FinalAwaiter pushes here instead of the shared
39
// destroy_queue_ to avoid another worker freeing the frame while
40
// the coroutine-suspend machinery is still accessing it.
41
static thread_local std::vector<std::coroutine_handle<>> tls_pending_destroys;
42

43
void schedule_thread_local_destroy(std::coroutine_handle<> h) {
5,625✔
44
    tls_pending_destroys.push_back(h);
5,625✔
45
}
5,604✔
46

47
void drain_thread_local_destroys() {
20,881✔
48
    Executor* exec = Executor::current();
20,881✔
49
    for (auto h : tls_pending_destroys) {
26,584✔
50
        if (!h) continue;
5,633✔
51
        if (exec) {
5,625!
52
            exec->schedule_destroy(h);
5,625!
53
        } else {
2,812✔
54
            h.destroy();
×
55
        }
56
    }
57
    tls_pending_destroys.clear();
20,934✔
58
}
20,912✔
59

60
// Construct an executor from `config`.
//
// A num_threads of 0 requests auto-detection via
// dftracer_utils_hardware_concurrency(); the body additionally falls
// back to 2 threads for platforms where detection itself reports 0.
// Timeouts and I/O/DB pool parameters are copied verbatim; no threads
// or backends are created until start().
Executor::Executor(const ExecutorConfig& config)
    : num_threads_(config.num_threads == 0
                       ? dftracer_utils_hardware_concurrency()
                       : config.num_threads),
      last_activity_time_(std::chrono::steady_clock::now()),
      idle_timeout_(config.idle_timeout),
      deadlock_timeout_(config.deadlock_timeout),
      io_pool_size_(config.io_pool_size),
      io_backend_type_(config.io_backend_type),
      io_batch_threshold_(config.io_batch_threshold),
      db_pool_size_(config.db_pool_size) {
    if (num_threads_ == 0) {
        num_threads_ = 2;  // Fallback if hardware_concurrency returns 0
    }
    DFTRACER_UTILS_LOG_DEBUG(
        "Executor created with %zu threads, idle_timeout=%lld s, "
        "deadlock_timeout=%lld s",
        num_threads_, idle_timeout_.count(), deadlock_timeout_.count());
}
79

80
// Destructor: stop workers and backends via shutdown() (a no-op if
// already stopped), then free any coroutine frames still parked in the
// shared destroy queue.
Executor::~Executor() {
    shutdown();
    drain_destroy_queue();
}
84

85
// Bring the executor up: timer service, I/O backend, DB thread pool,
// then the worker threads.  Calling start() while already running logs
// a warning and returns.  The startup ordering below is load-bearing;
// see the inline comments.
void Executor::start() {
    if (running_) {
        DFTRACER_UTILS_LOG_WARN("%s", "Executor already running");
        return;
    }

    running_ = true;
    workers_.clear();
    workers_.reserve(num_threads_);

    timer_service_.start();

    // Create and start I/O backend before workers so workers
    // can use it immediately.
    io_backend_ = io::create_io_backend(*this, io_pool_size_, io_backend_type_,
                                        io_batch_threshold_);
    io_backend_->start();

    db_pool_ = std::make_unique<io::IoThreadPool>(db_pool_size_);
    db_pool_->start();

    // Create all worker contexts first so workers_ is stable before any
    // worker thread can try to iterate/steal from it.
    for (std::size_t i = 0; i < num_threads_; ++i) {
        auto worker = std::make_unique<WorkerContext>(i);
        worker->last_activity = std::chrono::steady_clock::now();
        workers_.push_back(std::move(worker));
    }

    // Start worker threads after all contexts are in place.
    for (auto& worker : workers_) {
        worker->thread =
            std::thread(&Executor::worker_thread, this, worker.get());
    }

    DFTRACER_UTILS_LOG_DEBUG("Executor started with %zu worker threads",
                             num_threads_);
}
123

124
void Executor::shutdown() {
2,340✔
125
    if (!running_) {
2,340✔
126
        return;
1,458✔
127
    }
128

129
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutting down executor");
130
    running_ = false;
882✔
131
    wake_all_workers();
882✔
132

133
    // Join all worker threads (must happen before io_backend_ is
134
    // destroyed, since workers call io_backend_->poll() when idle).
135
    for (auto& worker : workers_) {
3,678✔
136
        if (worker->thread.joinable()) {
2,796✔
137
            worker->thread.join();
2,796!
138
        }
1,344✔
139
    }
140

141
    // Stop I/O backend AFTER joining workers (workers may poll the
142
    // backend) but BEFORE clearing workers_.  The I/O backend's
143
    // completion thread may still call enqueue() -> wake_all_workers()
144
    // which accesses WorkerContext cv/mutex, so workers_ must remain
145
    // alive until the completion thread has exited.
146
    if (db_pool_) {
882✔
147
        db_pool_->stop();
882✔
148
        db_pool_.reset();
882✔
149
    }
441✔
150

151
    if (io_backend_) {
882✔
152
        io_backend_->stop();
882✔
153
        io_backend_.reset();
882✔
154
    }
441✔
155

156
    // Destroy deferred frames and orphaned run-queue entries BEFORE
157
    // clearing workers_. Frames may hold shared_ptr<Channel> whose
158
    // ConcurrentQueue has TLS producer tokens tied to worker threads.
159
    drain_destroy_queue();
882✔
160
    {
161
        std::coroutine_handle<> orphan;
882✔
162
        while (run_queue_.try_dequeue(orphan)) {
909✔
163
            if (orphan) {
27✔
164
                orphan.destroy();
27!
165
            }
11✔
166
        }
167
    }
168

169
    workers_.clear();
882✔
170
    timer_service_.stop();
882✔
171

172
    // Drain the main thread's thread-local destroy list (for
173
    // coroutines whose FinalAwaiter ran on the main thread).
174
    drain_thread_local_destroys();
882✔
175

176
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor shutdown complete");
177
}
1,170✔
178

179
// Reset hook between runs.  Intentionally minimal: the run queue is
// reset by the caller when needed, so this only logs.
void Executor::reset() {
    // Queue will be reset by caller if needed
    DFTRACER_UTILS_LOG_DEBUG("%s", "Executor reset");
}
183

184
// Install the completion hook invoked by notify_completion().  Must be
// set before any task is submitted: notify_completion() reads it
// without locking (see the rationale there).
void Executor::set_completion_callback(CompletionCallback callback) {
    completion_callback_ = std::move(callback);
}
187

188
// Main loop for a single worker thread.
//
// Binds executor/worker TLS for this thread, then loops: pop a
// coroutine handle from the shared run queue and resume it, or -- when
// no work is available -- flush/poll the I/O backend and park on the
// worker's condition variable until the global work signal changes.
// Deferred coroutine-frame destruction is drained after every resume
// and again before sleeping.
void Executor::worker_thread(WorkerContext* context) {
    DFTRACER_UTILS_LOG_DEBUG("Worker %zu started", context->worker_id);

    set_current_worker_context(context);
    tls_current_executor = this;
    coro::reset_timeslice();

    while (running_) {
        std::coroutine_handle<> pending_resume;

        // Snapshot the work signal BEFORE checking any queues.
        // This ensures that any signal increment (from enqueue +
        // signal_global_work) that happens AFTER this load will be detected by
        // the wait predicate below, even if the actual queue check sees the
        // queue as empty. Loading it inside the else branch (after queue
        // checks) creates a race: work can arrive between the queue check and
        // the signal load, causing the worker to sleep with the updated signal
        // value while work sits in the queue.
        const std::uint64_t observed_signal =
            work_signal_.load(std::memory_order_acquire);

        // Run queue: coroutine handles from enqueue() and
        // schedule_coroutine_resumption().
        if (run_queue_.try_dequeue(pending_resume)) {
            coro::reset_timeslice();
            context->is_idle.store(false, std::memory_order_relaxed);
            if (pending_resume && !pending_resume.done()) {
                auto typed =
                    std::coroutine_handle<coro::CoroPromise>::from_address(
                        pending_resume.address());
                TaskIndex tid = typed.promise().task_id;
                // Registry bookkeeping applies only to handles carrying a
                // non-negative task id: flip QUEUED -> RUNNING and record
                // which worker picked it up.
                if (tid >= 0) {
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                    auto it = task_registry_.find(tid);
                    if (it != task_registry_.end() &&
                        it->second.state == TaskInfo::QUEUED) {
                        it->second.state = TaskInfo::RUNNING;
                        it->second.started_at =
                            std::chrono::steady_clock::now();
                        it->second.worker_id = context->worker_id;
                        it->second.location = TaskInfo::EXECUTING;
                        ++tasks_started_;
                    }
                }
                DFTRACER_TSAN_ACQUIRE(pending_resume.address());
                pending_resume.resume();
            }
            // Destroy coroutine frames that FinalAwaiter deferred to this
            // thread.  Safe: resume() has fully returned, so the frame
            // is suspended at final_suspend and no code references it.
            drain_thread_local_destroys();
            drain_destroy_queue();
        }
        // No work available -- sleep until signaled.
        else {
            context->is_idle.store(true, std::memory_order_relaxed);
            // Flush any batched I/O operations before sleeping.
            // This ensures pending SQEs are submitted even when no
            // new work is arriving (drain trigger).
            if (io_backend_) {
                io_backend_->flush();
            }
            // Opportunistic I/O polling before sleeping.
            // If the backend has completions, they'll enqueue new
            // work, so skip the sleep and retry the run queue.
            if (io_backend_) {
                auto reaped = io_backend_->poll(0);
                if (reaped > 0) {
                    drain_destroy_queue();
                    continue;
                }
            }
            drain_destroy_queue();
            std::unique_lock<std::mutex> lock(context->queue_mutex);
            // Wake on shutdown or on any work-signal change relative to
            // the snapshot taken above.
            context->cv.wait(lock, [this, observed_signal] {
                return !running_.load(std::memory_order_acquire) ||
                       work_signal_.load(std::memory_order_acquire) !=
                           observed_signal;
            });
        }
    }

    // Final drain: destroy any frames deferred during the last resume().
    drain_thread_local_destroys();

    tls_current_executor = nullptr;
    set_current_worker_context(nullptr);

    DFTRACER_UTILS_LOG_DEBUG("Worker %zu terminated", context->worker_id);
}
278

279
void Executor::notify_completion(std::shared_ptr<Task> task) {
1,382✔
280
    // No mutex needed -- the callback is set exactly once during Scheduler
281
    // construction (before any task is submitted) and never modified after.
282
    // on_task_completed uses only atomics, lock-free queues, and properly-
283
    // locked shard mutexes, so concurrent calls from multiple workers are safe.
284
    if (completion_callback_) {
1,382✔
285
        completion_callback_(task);
1,382!
286
    }
693✔
287
}
1,375✔
288

289
void Executor::request_shutdown() {
10✔
290
    if (shutdown_requested_.load()) {
10!
291
        return;  // Already requested
×
292
    }
293

294
    DFTRACER_UTILS_LOG_DEBUG("%s", "Shutdown requested for executor");
295
    shutdown_requested_ = true;
10✔
296
}
5✔
297

298
// Schedule `handle` for resumption on a worker thread.
void Executor::schedule_coroutine_resumption(std::coroutine_handle<> handle) {
    // Delegate to enqueue() -- unified path for all coroutine handles.
    enqueue(handle);
}
302

303
// Publish one new unit of work: bump the work-signal generation counter
// (so sleeping workers' wait predicates see a change) and wake a worker.
void Executor::signal_global_work() {
    work_signal_.fetch_add(1, std::memory_order_acq_rel);
    // Wake one worker, not all. Each enqueue adds one unit of work,
    // so one worker is sufficient. Avoids thundering herd where all
    // N threads wake, N-1 find no work, and go back to sleep.
    wake_one_worker();
}
310

311
void Executor::wake_one_worker() {
17,357✔
312
    const std::size_t worker_count = workers_.size();
17,357✔
313
    if (worker_count == 0) {
17,356✔
314
        return;
×
315
    }
316

317
    const std::size_t worker_index =
8,875✔
318
        next_worker_.fetch_add(1, std::memory_order_relaxed) % worker_count;
17,356✔
319
    // Lock-then-unlock the worker's mutex before notifying.
320
    // This ensures the worker is either before its predicate check (and will
321
    // see the updated atomic state) or inside cv.wait (and will receive the
322
    // notification). Without this, a notification sent between predicate
323
    // evaluation and cv.wait entry is lost, causing the worker to hang.
324
    workers_[worker_index]->queue_mutex.lock();
17,356✔
325
    workers_[worker_index]->queue_mutex.unlock();
17,354✔
326
    workers_[worker_index]->cv.notify_one();
17,355✔
327
}
8,875✔
328

329
void Executor::wake_all_workers() {
882✔
330
    for (auto& worker : workers_) {
3,678✔
331
        // Lock-then-unlock ensures the worker is either before its predicate
332
        // check or inside cv.wait before the notification is sent.
333
        // See wake_one_worker() for detailed rationale.
334
        worker->queue_mutex.lock();
2,796!
335
        worker->queue_mutex.unlock();
2,796✔
336
        worker->cv.notify_all();
2,796✔
337
    }
338
}
882✔
339

340
// Helper function for when_all.h (avoids circular dependency)
341
void schedule_coroutine_resumption_helper(Executor* executor,
8,330✔
342
                                          std::coroutine_handle<> handle) {
343
    if (executor) {
8,330✔
344
        executor->schedule_coroutine_resumption(handle);
8,330✔
345
    }
4,365✔
346
}
8,333✔
347

348
// Helper function for when_any.h (avoids circular dependency)
349
void schedule_destroy_helper(Executor* executor,
14✔
350
                             std::coroutine_handle<> handle) {
351
    if (executor && handle) {
14!
352
        executor->schedule_destroy(handle);
14✔
353
    }
7✔
354
}
14✔
355

356
// Push a resumable coroutine handle onto the shared run queue and
// signal one worker.  Null or already-finished handles are dropped.
void Executor::enqueue(std::coroutine_handle<> handle) {
    const bool resumable = handle && !handle.done();
    if (!resumable) {
        return;  // Invalid or already completed
    }

    DFTRACER_TSAN_RELEASE(handle.address());
    run_queue_.enqueue(handle);
    signal_global_work();
}
365

366
bool Executor::is_responsive() const {
4✔
367
    // If shutdown was requested, consider unresponsive
368
    if (shutdown_requested_.load()) {
4!
369
        return false;
×
370
    }
371

372
    // If not running, not responsive
373
    if (!running_.load()) {
4✔
374
        return false;
×
375
    }
376

377
    // Check if all threads might be deadlocked
378
    // (all threads busy but no progress for a while)
379
    std::size_t started = tasks_started_.load();
4✔
380
    std::size_t completed = tasks_completed_.load();
4✔
381
    std::size_t active = started - completed;
4✔
382

383
    if (active >= num_threads_) {
4✔
384
        // All threads busy - check if making progress
385
        std::lock_guard<std::mutex> lock(activity_mutex_);
2!
386
        auto now = std::chrono::steady_clock::now();
2✔
387
        auto idle_time = now - last_activity_time_;
2!
388

389
        // If all threads busy but no activity for deadlock_timeout,
390
        // likely deadlocked
391
        if (idle_time > deadlock_timeout_) {
2!
392
            DFTRACER_UTILS_LOG_WARN(
×
393
                "Executor appears deadlocked: %zu threads, %zu active "
394
                "tasks, idle for %lld ms",
395
                num_threads_, active,
396
                std::chrono::duration_cast<std::chrono::milliseconds>(idle_time)
397
                    .count());
398
            return false;
×
399
        }
400
    }
2!
401

402
    return true;
4✔
403
}
2✔
404

405
void Executor::mark_activity() {
2,747✔
406
    std::lock_guard<std::mutex> lock(activity_mutex_);
2,747!
407
    last_activity_time_ = std::chrono::steady_clock::now();
2,763✔
408
}
2,763✔
409

410
void Executor::update_task_location(TaskIndex task_id,
×
411
                                    TaskInfo::Location location,
412
                                    std::size_t worker_id) {
413
    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
×
414
    auto it = task_registry_.find(task_id);
×
415
    if (it != task_registry_.end()) {
×
416
        it->second.location = location;
×
417
        if (location == TaskInfo::LOCAL_QUEUE ||
×
418
            location == TaskInfo::EXECUTING) {
419
            it->second.worker_id = worker_id;
×
420
        }
421
    }
422
}
×
423

424
// Snapshot executor-wide progress for reporting.
//
// Holds registry_mutex_ in shared (read) mode for the whole walk;
// counters are atomics read individually, so the snapshot is not
// atomic across fields -- acceptable for progress display.
ExecutorProgress Executor::get_progress() const {
    std::shared_lock<std::shared_mutex> lock(registry_mutex_);
    ExecutorProgress progress;

    // Overall stats
    progress.total_tasks_submitted = total_tasks_submitted_.load();
    progress.tasks_completed = tasks_completed_.load();

    // Count task states
    progress.tasks_queued = 0;
    progress.tasks_running = 0;
    progress.tasks_failed = 0;

    for (const auto& [task_id, info] : task_registry_) {
        switch (info.state) {
            case TaskInfo::QUEUED:
                progress.tasks_queued++;
                break;
            case TaskInfo::RUNNING:
            case TaskInfo::WAITING:
                // WAITING counts as running for reporting purposes.
                progress.tasks_running++;
                break;
            case TaskInfo::COMPLETED:
                // Already counted
                break;
            case TaskInfo::FAILED:
                progress.tasks_failed++;
                break;
        }

        if (info.state == TaskInfo::FAILED && !info.error_message.empty()) {
            progress.recent_errors.push_back({task_id, info.error_message});
        }
    }

    for (std::size_t i = 0; i < workers_.size(); ++i) {
        // No per-worker local queues anymore; report 0.
        progress.worker_queue_depths.push_back(0);
    }

    // Build task trees (find root tasks)
    std::unordered_set<TaskIndex> processed;
    for (const auto& [task_id, info] : task_registry_) {
        if (info.parent_task_id == -1) {  // Root task
            auto task_progress = build_task_progress_tree(task_id, processed);
            progress.root_tasks.push_back(task_progress);
        }
    }

    // Worker states
    for (const auto& worker : workers_) {
        ExecutorProgress::WorkerStatus status;
        status.worker_id = worker->worker_id;
        status.is_idle = worker->is_idle.load();

        TaskIndex current_id = worker->current_task_id.load();
        if (current_id != -1) {
            status.current_task_id = current_id;
            // current_task_name is written under this mutex in run_task.
            std::lock_guard<std::mutex> name_lock(worker->task_name_mutex);
            status.current_task_name = worker->current_task_name;
        }

        status.local_queue_depth = 0;

        progress.workers.push_back(status);
    }

    return progress;
}
493

494
// Recursively build the progress subtree rooted at `task_id`.
//
// `processed` tracks visited ids to break cycles.  Caller must hold
// registry_mutex_ (get_progress holds it in shared mode).  Unknown ids
// and revisited ids yield placeholder nodes rather than failing.
TaskProgress Executor::build_task_progress_tree(
    TaskIndex task_id, std::unordered_set<TaskIndex>& processed) const {
    TaskProgress progress;

    if (processed.count(task_id)) {
        // Avoid cycles
        progress.task_id = task_id;
        progress.name = "[Cycle Detected]";
        return progress;
    }
    processed.insert(task_id);

    auto it = task_registry_.find(task_id);
    if (it == task_registry_.end()) {
        progress.task_id = task_id;
        progress.name = "[Not Found]";
        return progress;
    }

    const TaskInfo& info = it->second;
    progress.task_id = task_id;
    progress.name = info.name;

    // State
    switch (info.state) {
        case TaskInfo::QUEUED:
            progress.state = "queued";
            break;
        case TaskInfo::RUNNING:
            progress.state = "running";
            break;
        case TaskInfo::WAITING:
            progress.state = "waiting";
            break;
        case TaskInfo::COMPLETED:
            progress.state = "completed";
            break;
        case TaskInfo::FAILED:
            progress.state = "failed";
            break;
    }

    // Timing: queued duration is measured to `now` while still queued,
    // to started_at once running; execution duration to `now` while
    // in flight, to completed_at once finished.
    auto now = std::chrono::steady_clock::now();
    if (info.state == TaskInfo::QUEUED) {
        progress.queued_duration_ms =
            std::chrono::duration<double, std::milli>(now - info.queued_at)
                .count();
        progress.execution_duration_ms = 0;
    } else if (info.state == TaskInfo::RUNNING ||
               info.state == TaskInfo::WAITING) {
        progress.queued_duration_ms = std::chrono::duration<double, std::milli>(
                                          info.started_at - info.queued_at)
                                          .count();
        progress.execution_duration_ms =
            std::chrono::duration<double, std::milli>(now - info.started_at)
                .count();
    } else {  // COMPLETED or FAILED
        progress.queued_duration_ms = std::chrono::duration<double, std::milli>(
                                          info.started_at - info.queued_at)
                                          .count();
        progress.execution_duration_ms =
            std::chrono::duration<double, std::milli>(info.completed_at -
                                                      info.started_at)
                .count();
    }

    // Progress: percentage from completed children when there are
    // subtasks; otherwise 100% iff the task itself completed.
    progress.total_subtasks = info.child_task_ids.size();
    progress.completed_subtasks = info.completed_children.load();
    if (progress.total_subtasks > 0) {
        progress.progress_percentage =
            (100.0 * static_cast<double>(progress.completed_subtasks)) /
            static_cast<double>(progress.total_subtasks);
    } else {
        progress.progress_percentage =
            (info.state == TaskInfo::COMPLETED) ? 100.0 : 0.0;
    }

    // Location
    switch (info.location) {
        case TaskInfo::SHARED_QUEUE:
            progress.location = "shared_queue";
            break;
        case TaskInfo::LOCAL_QUEUE:
            progress.location =
                "worker_" + std::to_string(info.worker_id) + "_local";
            break;
        case TaskInfo::EXECUTING:
            progress.location =
                "executing_on_worker_" + std::to_string(info.worker_id);
            break;
        case TaskInfo::DONE:
            progress.location = "done";
            break;
    }

    // Build children recursively
    for (TaskIndex child_id : info.child_task_ids) {
        progress.children.push_back(
            build_task_progress_tree(child_id, processed));
    }

    return progress;
}
599

600
// ============================================================================
601
// Phase 3: Coro-based task execution
602
// ============================================================================
603

604
// Coroutine wrapper that drives one Task to completion on a worker.
//
// Records worker/registry bookkeeping around Task::execute, awaits the
// task's CoroTask, joins the CoroScope (so sub-spawned coroutines
// finish before the scope dies), then publishes the result or the
// exception and notifies the scheduler.  Must be resumed on a worker
// thread: the WorkerContext comes from worker TLS.
coro::Coro Executor::run_task(std::shared_ptr<Task> task,
                              std::shared_ptr<std::any> input) {
    // Get worker context from TLS (set by worker_thread at start).
    auto* context = static_cast<WorkerContext*>(get_current_worker_context());

    if (!context || !task) {
        co_return;
    }

    // Update worker context
    context->current_task_id = task->get_id();
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name = task->get_name();
    }
    context->last_activity = std::chrono::steady_clock::now();

    // Mark task start
    mark_activity();
    ++tasks_started_;

    // Update task registry to RUNNING
    {
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
        auto it = task_registry_.find(task->get_id());
        if (it != task_registry_.end()) {
            it->second.state = TaskInfo::RUNNING;
            it->second.started_at = std::chrono::steady_clock::now();
            it->second.worker_id = context->worker_id;
            it->second.location = TaskInfo::EXECUTING;
        }
    }

    task->result().mark_running();

    {
        CoroScope scope(this);
        std::exception_ptr task_error;

        DFTRACER_UTILS_LOG_DEBUG("Worker %zu executing task ID %ld ('%s')",
                                 context->worker_id, task->get_id(),
                                 task->get_name());

        try {
            auto coro_task = task->execute(scope, *input);
            // Propagate executor to the CoroTask's PromiseBase so that
            // nested awaitables (when_any, channel, etc.) can schedule
            // resumptions.  run_task() is a Coro (CoroPromise, not
            // PromiseBase), so the normal PromiseBase propagation in
            // CoroTask::await_suspend doesn't fire.
            coro_task.handle().promise().set_executor(this);
            std::any result = co_await std::move(coro_task);

            // Set result via TaskResult
            task->set_result(std::move(result));
        } catch (...) {
            // Capture now; rethrown below after the scope join so the
            // failure bookkeeping can distinguish exception types.
            task_error = std::current_exception();
        }

        // Always join scope to wait for any sub-spawned coroutines.
        // Without this, the scope's JoinHandle would be destroyed
        // while FinalAwaiters still reference it (use-after-free).
        co_await scope.join();

        if (!task_error) {
            DFTRACER_UTILS_LOG_DEBUG(
                "Task ID %ld ('%s') completed successfully", task->get_id(),
                task->get_name());

            // Mark task completion
            mark_activity();
            ++tasks_completed_;
            context->tasks_executed++;

            // Update task registry to COMPLETED
            {
                std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                auto it = task_registry_.find(task->get_id());
                if (it != task_registry_.end()) {
                    it->second.state = TaskInfo::COMPLETED;
                    it->second.completed_at = std::chrono::steady_clock::now();
                    it->second.location = TaskInfo::DONE;

                    // Update parent's completed children count
                    if (it->second.parent_task_id != -1) {
                        auto parent_it =
                            task_registry_.find(it->second.parent_task_id);
                        if (parent_it != task_registry_.end()) {
                            parent_it->second.completed_children++;
                        }
                    }
                }
            }

            // Notify scheduler
            notify_completion(task);

        } else {
            // Rethrow to dispatch on the exception's static type:
            // std::exception gets its message recorded, anything else
            // is reported as "Unknown exception".
            try {
                std::rethrow_exception(task_error);
            } catch (const std::exception& e) {
                DFTRACER_UTILS_LOG_ERROR("Task ID %ld ('%s') failed: %s",
                                         task->get_id(), task->get_name(),
                                         e.what());

                task->set_exception(task_error);

                mark_activity();
                ++tasks_completed_;
                context->tasks_executed++;

                {
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                    auto it = task_registry_.find(task->get_id());
                    if (it != task_registry_.end()) {
                        it->second.state = TaskInfo::FAILED;
                        it->second.completed_at =
                            std::chrono::steady_clock::now();
                        it->second.error_message = e.what();
                        it->second.location = TaskInfo::DONE;
                    }
                }

                notify_completion(task);

            } catch (...) {
                DFTRACER_UTILS_LOG_ERROR(
                    "Task ID %ld ('%s') failed with unknown exception",
                    task->get_id(), task->get_name());

                task->set_exception(task_error);

                mark_activity();
                ++tasks_completed_;
                context->tasks_executed++;

                {
                    std::unique_lock<std::shared_mutex> lock(registry_mutex_);
                    auto it = task_registry_.find(task->get_id());
                    if (it != task_registry_.end()) {
                        it->second.state = TaskInfo::FAILED;
                        it->second.completed_at =
                            std::chrono::steady_clock::now();
                        it->second.error_message = "Unknown exception";
                        it->second.location = TaskInfo::DONE;
                    }
                }

                notify_completion(task);
            }
        }
    }

    // Clear current task info
    context->current_task_id = -1;
    {
        std::lock_guard<std::mutex> lock(context->task_name_mutex);
        context->current_task_name.clear();
    }

    co_return;
}
766

767
void Executor::submit_task(std::shared_ptr<Task> task,
1,406✔
768
                           std::shared_ptr<std::any> input,
769
                           TaskIndex parent_task_id) {
770
    // Register in task registry
771
    {
772
        std::unique_lock<std::shared_mutex> lock(registry_mutex_);
1,406!
773

774
        auto [it, inserted] = task_registry_.emplace(
4,224!
775
            std::piecewise_construct, std::forward_as_tuple(task->get_id()),
1,408!
776
            std::forward_as_tuple());
2,112✔
777

778
        if (inserted) {
1,408!
779
            it->second.task_id = task->get_id();
1,408!
780
            it->second.parent_task_id = parent_task_id;
2,112!
781
            it->second.name = task->get_name();
1,408!
782
            it->second.state = TaskInfo::QUEUED;
1,408!
783
            it->second.queued_at = std::chrono::steady_clock::now();
2,112!
784
            it->second.location = TaskInfo::SHARED_QUEUE;
1,408!
785
            it->second.worker_id = static_cast<std::size_t>(-1);
1,408!
786

787
            if (parent_task_id != -1) {
1,408✔
788
                auto parent_it = task_registry_.find(parent_task_id);
794!
789
                if (parent_it != task_registry_.end()) {
794!
790
                    parent_it->second.child_task_ids.push_back(task->get_id());
794!
791
                }
390✔
792
            }
390✔
793
        }
704✔
794
    }
1,408✔
795

796
    ++total_tasks_submitted_;
1,408✔
797

798
    // Create Coro, set executor on promise, enqueue released handle
799
    auto coro = run_task(std::move(task), std::move(input));
2,112!
800
    coro.handle().promise().executor = this;
1,408!
801
    enqueue(coro.release());
1,408!
802
}
1,408✔
803

804
/// Enqueue a raw coroutine as a tracked pseudo-task.
///
/// Allocates a fresh (negative, decreasing) id, tags the coroutine promise
/// with the id and this executor, registers a QUEUED entry in the task
/// registry, publishes the id through @p tid_out (if provided), and finally
/// schedules the released handle.
///
/// @param coro    Coroutine to run; its handle is released into the queue.
/// @param name    Human-readable name stored in the registry entry.
/// @param tid_out Optional atomic slot that receives the assigned id.
/// @return The registry id assigned to this coroutine.
TaskIndex Executor::enqueue_tracked(
    coro::Coro coro, std::string name,
    std::shared_ptr<std::atomic<TaskIndex>> tid_out) {
    const TaskIndex id =
        next_coro_task_id_.fetch_sub(1, std::memory_order_relaxed);

    auto& promise = coro.handle().promise();
    promise.task_id = id;
    promise.executor = this;

    // Publish a QUEUED registry entry for this coroutine.
    {
        std::unique_lock<std::shared_mutex> guard(registry_mutex_);
        auto [slot, is_new] = task_registry_.emplace(std::piecewise_construct,
                                                     std::forward_as_tuple(id),
                                                     std::forward_as_tuple());
        if (is_new) {
            const auto now = std::chrono::steady_clock::now();
            TaskInfo& info = slot->second;
            info.task_id = id;
            info.parent_task_id = -1;  // tracked coroutines have no parent
            info.name = std::move(name);
            info.state = TaskInfo::QUEUED;
            info.queued_at = now;
            info.location = TaskInfo::SHARED_QUEUE;
            // Sentinel: not assigned to any worker yet.
            info.worker_id = static_cast<std::size_t>(-1);
        }
    }
    ++total_tasks_submitted_;

    // Let the caller observe the id before the coroutine can start running.
    if (tid_out) {
        tid_out->store(id, std::memory_order_release);
    }

    enqueue(coro.release());
    return id;
}
/// Mark a tracked coroutine as COMPLETED in the task registry.
///
/// Idempotent with respect to completion: the entry is updated (and the
/// completed-task counter bumped) only the first time it transitions into
/// the COMPLETED state. A missing `started_at` timestamp is backfilled so
/// duration accounting stays well-formed.
///
/// @param id Registry id previously assigned by enqueue_tracked().
void Executor::mark_coro_completed(TaskIndex id) {
    bool newly_completed = false;
    {
        std::unique_lock<std::shared_mutex> guard(registry_mutex_);
        auto entry = task_registry_.find(id);
        if (entry != task_registry_.end() &&
            entry->second.state != TaskInfo::COMPLETED) {
            const auto now = std::chrono::steady_clock::now();
            TaskInfo& info = entry->second;
            // Backfill a start time if the coroutine was never observed
            // starting (epoch-zero means "not set").
            if (info.started_at.time_since_epoch().count() == 0) {
                info.started_at = now;
            }
            info.state = TaskInfo::COMPLETED;
            info.completed_at = now;
            info.location = TaskInfo::DONE;
            newly_completed = true;
        }
    }
    // Count each completion exactly once, outside the registry lock.
    if (newly_completed) {
        ++tasks_completed_;
    }
}
/// Defer destruction of a coroutine frame.
///
/// Null handles are ignored; everything else is pushed onto the destroy
/// queue to be torn down later by drain_destroy_queue().
///
/// @param handle Coroutine handle whose frame should be destroyed.
void Executor::schedule_destroy(std::coroutine_handle<> handle) {
    if (!handle) {
        return;
    }
    destroy_queue_.enqueue(handle);
}
void Executor::drain_destroy_queue() {
27,968✔
865
    std::coroutine_handle<> to_destroy;
27,968✔
866
    while (destroy_queue_.try_dequeue(to_destroy)) {
33,664✔
867
        if (to_destroy) {
5,688✔
868
            to_destroy.destroy();
5,688!
869
        }
2,825✔
870
    }
871
}
28,001✔
872

873
}  // namespace dftracer::utils
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc