• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5053

13 May 2026 12:00PM UTC coverage: 73.397% (+0.06%) from 73.338%
#5053

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

627 existing lines in 131 files now uncovered.

281694 of 383795 relevant lines covered (73.4%)

132505311.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.0
/source/dnode/vnode/src/vnd/vnodeAsync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17
#include "vnodeHash.h"
18

19
typedef struct SVAsync    SVAsync;
typedef struct SVATask    SVATask;
typedef struct SVAChannel SVAChannel;

// Worker pool sizing: threads are launched lazily, never beyond the hard cap.
#define VNODE_ASYNC_DEFAULT_WORKERS 4
#define VNODE_ASYNC_MAX_WORKERS     256

// priority

// Number of distinct priority levels (EVA_PRIORITY_* values are 0-based).
#define EVA_PRIORITY_MAX (EVA_PRIORITY_LOW + 1)

// worker
typedef enum {
  EVA_WORKER_STATE_UINIT = 0,  // slot never started, or its thread already joined
  EVA_WORKER_STATE_ACTIVE,     // thread running (may be executing a task)
  EVA_WORKER_STATE_IDLE,       // thread blocked waiting on hasTask
  EVA_WORKER_STATE_STOP,       // thread exited but not yet joined
} EVWorkerState;

typedef struct {
  SVAsync      *async;        // owning async instance
  int32_t       workerId;     // index of this slot in SVAsync::workers
  EVWorkerState state;
  TdThread      thread;
  SVATask      *runningTask;  // task currently being executed, or NULL
} SVWorker;

// task
typedef enum {
  EVA_TASK_STATE_WAITTING = 0,  // queued, not yet picked up by a worker
  EVA_TASK_STATE_RUNNING,
} EVATaskState;

struct SVATask {
  int64_t     taskId;          // unique id, assigned from SVAsync::nextTaskId
  EVAPriority priority;        // priority requested at submission
  int32_t     priorScore;      // aging counter; raises effective priority over time
  SVAChannel *channel;         // owning channel, or NULL for channel-less tasks
  int32_t (*execute)(void *);  // work callback, run on a worker thread
  void (*cancel)(void *);      // optional callback invoked if the task is cancelled
  void        *arg;            // argument passed to execute/cancel
  EVATaskState state;

  // wait
  int32_t      numWait;   // number of threads blocked in vnodeAWait on this task
  TdThreadCond waitCond;  // signaled when the task finishes or is cancelled

  // queue (intrusive doubly-linked list node; sentinel-based)
  struct SVATask *prev;
  struct SVATask *next;
};

// Cancel callback captured while holding the mutex, to be invoked after
// the mutex is released (avoids running user code under the lock).
typedef struct {
  void (*cancel)(void *);
  void *arg;
} SVATaskCancelInfo;

// Effective priority: every 4 aging points promote the task one level.
// (Macro name is a long-standing typo for "PRIORITY".)
#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4))

// async channel
typedef enum {
  EVA_CHANNEL_STATE_OPEN = 0,
  EVA_CHANNEL_STATE_CLOSE,  // destroy was requested while a task was still running
} EVAChannelState;

struct SVAChannel {
  int64_t         channelId;
  EVAChannelState state;
  SVATask         queue[EVA_PRIORITY_MAX];  // per-priority sentinel nodes
  SVATask        *scheduled;                // the single task handed to the global queue/worker

  SVAChannel *prev;
  SVAChannel *next;
};

// async handle
struct SVAsync {
  const char *label;  // points just past the struct (same allocation)

  TdThreadMutex mutex;    // guards every field below
  TdThreadCond  hasTask;  // signaled when work becomes available
  bool          stop;

  // worker
  int32_t  numWorkers;        // current cap (<= VNODE_ASYNC_MAX_WORKERS)
  int32_t  numLaunchWorkers;  // threads actually created and not yet stopped
  int32_t  numIdleWorkers;    // threads currently blocked on hasTask
  SVWorker workers[VNODE_ASYNC_MAX_WORKERS];

  // channel
  int64_t      nextChannelId;
  int32_t      numChannels;
  SVAChannel   chList;  // sentinel of the doubly-linked channel list
  SVHashTable *channelTable;

  // task
  int64_t      nextTaskId;
  int32_t      numTasks;
  SVATask      queue[EVA_PRIORITY_MAX];  // global per-priority sentinel queues
  SVHashTable *taskTable;
};

// Global async instances, indexed by the public async id (1-based; slot 0 unused).
struct {
  const char *label;
  SVAsync    *async;
} GVnodeAsyncs[] = {
    [0] = {NULL, NULL},
    [1] = {"vnode-commit", NULL},
    [2] = {"vnode-merge", NULL},
    [3] = {"vnode-compact", NULL},
    [4] = {"vnode-retention", NULL},
    [5] = {"vnode-scan", NULL},
};

#define MIN_ASYNC_ID 1
#define MAX_ASYNC_ID (sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]) - 1)
135

136
// Finalize a task: if it was its channel's scheduled task, pick the channel's
// next task (with priority aging) and move it onto the global queue; then
// drop the task from the task table and either free it (no waiters) or wake
// the waiters, who free it themselves (see vnodeAWait).
// Caller must hold async->mutex.
static void vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
  int32_t ret;

  if (task->channel != NULL && task->channel->scheduled == task) {
    task->channel->scheduled = NULL;
    if (task->channel->state == EVA_CHANNEL_STATE_CLOSE) {
      // channel destroy was deferred until this running task finished
      taosMemoryFree(task->channel);
    } else {
      // scan the channel's per-priority queues: the first task found becomes
      // the new scheduled task; every other head task ages by one point
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *nextTask = task->channel->queue[i].next;
        if (nextTask != &task->channel->queue[i]) {
          if (task->channel->scheduled == NULL) {
            task->channel->scheduled = nextTask;
            nextTask->next->prev = nextTask->prev;
            nextTask->prev->next = nextTask->next;
          } else {
            nextTask->priorScore++;
            int32_t newPriority = VATASK_PIORITY(nextTask);
            if (newPriority != i) {
              // remove from current priority queue
              nextTask->prev->next = nextTask->next;
              nextTask->next->prev = nextTask->prev;
              // add to new priority queue
              nextTask->next = &task->channel->queue[newPriority];
              nextTask->prev = task->channel->queue[newPriority].prev;
              nextTask->next->prev = nextTask;
              nextTask->prev->next = nextTask;
            }
          }
        }
      }

      // hand the newly scheduled task to the global queue so a worker can run it
      if (task->channel->scheduled != NULL) {
        int32_t priority = VATASK_PIORITY(task->channel->scheduled);
        task->channel->scheduled->next = &async->queue[priority];
        task->channel->scheduled->prev = async->queue[priority].prev;
        task->channel->scheduled->next->prev = task->channel->scheduled;
        task->channel->scheduled->prev->next = task->channel->scheduled;
      }
    }
  }

  ret = vHashDrop(async->taskTable, task);
  TAOS_UNUSED(ret);
  async->numTasks--;

  if (task->numWait == 0) {
    // nobody is waiting: this function owns the free
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
  } else if (task->numWait == 1) {
    // single waiter: it will free the task after waking (vnodeAWait)
    (void)taosThreadCondSignal(&task->waitCond);
  } else {
    (void)taosThreadCondBroadcast(&task->waitCond);
  }
}
191

192
// Cancel every task still pending in the global priority queues, collecting
// their cancel callbacks into cancelArray so the caller can invoke them after
// releasing async->mutex (which must be held here).
//
// The outer loop repeats because vnodeAsyncTaskDone() may move a channel's
// next task back onto the global queues while we drain them.
//
// Fix: the original outer condition hard-coded queues [0], [1] and [2],
// silently assuming EVA_PRIORITY_MAX == 3; it now iterates over all
// EVA_PRIORITY_MAX queues so adding a priority level cannot leak tasks.
static void vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
  bool drained = false;
  while (!drained) {
    drained = true;
    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (async->queue[i].next != &async->queue[i]) {
        drained = false;  // we removed something; re-scan in case of requeues
        SVATask *task = async->queue[i].next;
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel task into array");
          };
        }
        vnodeAsyncTaskDone(async, task);
      }
    }
  }
}
213

214
// Worker thread main loop. Repeatedly: finish the previous task, pick the
// highest-priority queued task (aging the ones it skips), run it outside the
// mutex, and exit when asked to stop or when this worker's id exceeds the
// (possibly reduced) worker cap. Collected cancel callbacks are invoked after
// the mutex is released, at _exit.
static void *vnodeAsyncLoop(void *arg) {
  SVWorker *worker = (SVWorker *)arg;
  SVAsync  *async = worker->async;
  SArray   *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    return NULL;
  }

  setThreadName(async->label);
  taosSetCpuAffinity(THREAD_CAT_WRITE);

  for (;;) {
    (void)taosThreadMutexLock(&async->mutex);

    // finish last running task
    if (worker->runningTask != NULL) {
      vnodeAsyncTaskDone(async, worker->runningTask);
      worker->runningTask = NULL;
    }

    for (;;) {
      // stop when shutting down, or when this slot is beyond the current cap
      // (vnodeAsyncSetWorkers may have shrunk numWorkers)
      if (async->stop || worker->workerId >= async->numWorkers) {
        if (async->stop) {  // cancel all tasks
          vnodeAsyncCancelAllTasks(async, cancelArray);
        }
        worker->state = EVA_WORKER_STATE_STOP;
        async->numLaunchWorkers--;
        (void)taosThreadMutexUnlock(&async->mutex);
        goto _exit;
      }

      // scan all priority queues: take the first (highest-priority) task,
      // and age the head of every lower queue we pass over
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *task = async->queue[i].next;
        if (task != &async->queue[i]) {
          if (worker->runningTask == NULL) {
            worker->runningTask = task;
            task->prev->next = task->next;
            task->next->prev = task->prev;
          } else {  // promote priority
            task->priorScore++;
            int32_t priority = VATASK_PIORITY(task);
            if (priority != i) {
              // remove from current priority queue
              task->prev->next = task->next;
              task->next->prev = task->prev;
              // add to new priority queue
              task->next = &async->queue[priority];
              task->prev = async->queue[priority].prev;
              task->next->prev = task;
              task->prev->next = task;
            }
          }
        }
      }

      if (worker->runningTask == NULL) {
        // nothing to do: go idle until new work is signaled
        worker->state = EVA_WORKER_STATE_IDLE;
        async->numIdleWorkers++;
        (void)taosThreadCondWait(&async->hasTask, &async->mutex);
        async->numIdleWorkers--;
        worker->state = EVA_WORKER_STATE_ACTIVE;
      } else {
        worker->runningTask->state = EVA_TASK_STATE_RUNNING;
        break;
      }
    }

    (void)taosThreadMutexUnlock(&async->mutex);

    // do run the task (outside the mutex)
    int32_t code = worker->runningTask->execute(worker->runningTask->arg);
    TAOS_UNUSED(code);
  }

_exit:
  // invoke cancel callbacks gathered during shutdown, outside the mutex
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);
  return NULL;
}
296

297
static uint32_t vnodeAsyncTaskHash(const void *obj) {
22,883,715✔
298
  SVATask *task = (SVATask *)obj;
22,883,715✔
299
  return MurmurHash3_32((const char *)(&task->taskId), sizeof(task->taskId));
22,883,715✔
300
}
301

302
static int32_t vnodeAsyncTaskCompare(const void *obj1, const void *obj2) {
8,953,310✔
303
  SVATask *task1 = (SVATask *)obj1;
8,953,310✔
304
  SVATask *task2 = (SVATask *)obj2;
8,953,310✔
305
  if (task1->taskId < task2->taskId) {
8,953,310✔
306
    return -1;
4,693✔
307
  } else if (task1->taskId > task2->taskId) {
8,948,617✔
308
    return 1;
2,248✔
309
  }
310
  return 0;
8,946,369✔
311
}
312

313
static uint32_t vnodeAsyncChannelHash(const void *obj) {
×
314
  SVAChannel *channel = (SVAChannel *)obj;
×
315
  return MurmurHash3_32((const char *)(&channel->channelId), sizeof(channel->channelId));
×
316
}
317

318
static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) {
×
319
  SVAChannel *channel1 = (SVAChannel *)obj1;
×
320
  SVAChannel *channel2 = (SVAChannel *)obj2;
×
321
  if (channel1->channelId < channel2->channelId) {
×
322
    return -1;
×
323
  } else if (channel1->channelId > channel2->channelId) {
×
324
    return 1;
×
325
  }
326
  return 0;
×
327
}
328

329
// Allocate and initialize one SVAsync instance labeled `label`.
// The label is copied inline right after the struct so a single free
// releases everything. Returns 0 on success or an error code; on failure
// all partially-created resources are released.
static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
  int32_t ret;

  if (async == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (label == NULL) {
    label = "anonymous";
  }

  // single allocation: struct + private copy of the label string
  (*async) = (SVAsync *)taosMemoryCalloc(1, sizeof(SVAsync) + strlen(label) + 1);
  if ((*async) == NULL) {
    return terrno;
  }

  memcpy((char *)((*async) + 1), label, strlen(label) + 1);
  (*async)->label = (const char *)((*async) + 1);

  (void)taosThreadMutexInit(&(*async)->mutex, NULL);
  (void)taosThreadCondInit(&(*async)->hasTask, NULL);
  (*async)->stop = false;

  // worker (threads are created lazily by vnodeAsyncLaunchWorker)
  (*async)->numWorkers = VNODE_ASYNC_DEFAULT_WORKERS;
  (*async)->numLaunchWorkers = 0;
  (*async)->numIdleWorkers = 0;
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    (*async)->workers[i].async = (*async);
    (*async)->workers[i].workerId = i;
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
    (*async)->workers[i].runningTask = NULL;
  }

  // channel: empty sentinel list + lookup table
  (*async)->nextChannelId = 0;
  (*async)->numChannels = 0;
  (*async)->chList.prev = &(*async)->chList;
  (*async)->chList.next = &(*async)->chList;
  ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare);
  if (ret != 0) {
    (void)taosThreadMutexDestroy(&(*async)->mutex);
    (void)taosThreadCondDestroy(&(*async)->hasTask);
    taosMemoryFree(*async);
    return ret;
  }

  // task: empty per-priority sentinel queues + lookup table
  (*async)->nextTaskId = 0;
  (*async)->numTasks = 0;
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    (*async)->queue[i].next = &(*async)->queue[i];
    (*async)->queue[i].prev = &(*async)->queue[i];
  }
  ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
  if (ret != 0) {
    vHashDestroy(&(*async)->channelTable);
    (void)taosThreadMutexDestroy(&(*async)->mutex);
    (void)taosThreadCondDestroy(&(*async)->hasTask);
    taosMemoryFree(*async);
    return ret;
  }

  return 0;
}
394

395
// Shut down and free an SVAsync instance: signal stop, join every launched
// worker (workers cancel all remaining tasks on their way out), free any
// remaining channels, then destroy sync primitives and tables.
// Sets *async to NULL on success.
static int32_t vnodeAsyncDestroy(SVAsync **async) {
  if ((*async) == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // set stop and broadcast
  (void)taosThreadMutexLock(&(*async)->mutex);
  (*async)->stop = true;
  (void)taosThreadCondBroadcast(&(*async)->hasTask);
  (void)taosThreadMutexUnlock(&(*async)->mutex);

  // join all workers
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    // read the state under the mutex; the thread may still be transitioning
    (void)taosThreadMutexLock(&(*async)->mutex);
    EVWorkerState state = (*async)->workers[i].state;
    (void)taosThreadMutexUnlock(&(*async)->mutex);

    if (state == EVA_WORKER_STATE_UINIT) {
      continue;  // this slot never had a thread
    }

    (void)taosThreadJoin((*async)->workers[i].thread, NULL);
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
  }

  // close all channels (workers are gone, so no further locking is needed)
  for (SVAChannel *channel = (*async)->chList.next; channel != &(*async)->chList; channel = (*async)->chList.next) {
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;

    int32_t ret = vHashDrop((*async)->channelTable, channel);
    TAOS_UNUSED(ret);
    (*async)->numChannels--;
    taosMemoryFree(channel);
  }

  (void)taosThreadMutexDestroy(&(*async)->mutex);
  (void)taosThreadCondDestroy(&(*async)->hasTask);

  vHashDestroy(&(*async)->channelTable);
  vHashDestroy(&(*async)->taskTable);
  taosMemoryFree(*async);
  *async = NULL;

  return 0;
}
441

442
// Launch one additional worker thread, reusing the first slot that is not
// ACTIVE (joining a STOPped slot's old thread first). At most one thread is
// created per call. Caller must hold async->mutex.
static void vnodeAsyncLaunchWorker(SVAsync *async) {
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  for (int32_t i = 0; i < async->numWorkers; i++) {
    if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
      continue;
    } else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
      // reap the exited thread before reusing this slot
      int32_t ret = taosThreadJoin(async->workers[i].thread, NULL);
      async->workers[i].state = EVA_WORKER_STATE_UINIT;
    }

    int32_t ret = taosThreadCreate(&async->workers[i].thread, &thAttr, vnodeAsyncLoop, &async->workers[i]);
    if (ret) {
      vError("failed to create worker thread since %s", tstrerror(ret));
    } else {
      async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
      async->numLaunchWorkers++;
    }
    break;  // only one worker per call, whether creation succeeded or not
  }
#ifdef TD_ASTRA
  // NOTE(review): the attr is destroyed only on TD_ASTRA builds — confirm
  // other platforms do not leak thread-attribute resources here.
  (void)taosThreadAttrDestroy(&thAttr);
#endif
}
470

471
// Create every global async instance in GVnodeAsyncs (ids 1..MAX_ASYNC_ID)
// and size each one's worker pool from the corresponding config variable.
// Returns 0 on success or the first failing step's error code.
int32_t vnodeAsyncOpen() {
  int32_t code = 0;
  int32_t lino = 0;

  // worker counts indexed to match GVnodeAsyncs slots
  int32_t numOfThreads[] = {
      0,                        //
      tsNumOfCommitThreads,     // vnode-commit
      tsNumOfCommitThreads,     // vnode-merge
      tsNumOfCompactThreads,    // vnode-compact
      tsNumOfRetentionThreads,  // vnode-retention
      2,                        // vnode-scan
  };

  for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
    code = vnodeAsyncInit(&GVnodeAsyncs[i].async, GVnodeAsyncs[i].label);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = vnodeAsyncSetWorkers(i, numOfThreads[i]);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
495

496
void vnodeAsyncClose() {
721,073✔
497
  for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
4,326,438✔
498
    int32_t ret = vnodeAsyncDestroy(&GVnodeAsyncs[i].async);
3,605,365✔
499
  }
500
}
721,073✔
501

502
int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *), void *arg,
7,417,312✔
503
                   SVATaskID *taskID) {
504
  SVAChannelID channelID = {
7,417,312✔
505
      .async = async,
506
      .id = 0,
507
  };
508
  return vnodeAsyncC(&channelID, priority, execute, complete, arg, taskID);
7,416,537✔
509
}
510

511
// Schedule a task on a channel (channelID->id > 0) or channel-less
// (channelID->id == 0). Each channel allows at most one task in the global
// queue at a time ("scheduled"); further submissions either queue inside the
// channel or displace the not-yet-running scheduled task if the new one has
// higher effective priority. On success, optionally returns the task id via
// taskID so callers can wait (vnodeAWait) or cancel (vnodeACancel).
int32_t vnodeAsyncC(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
                    void *arg, SVATaskID *taskID) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
      channelID->id < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret;
  int64_t  id;
  SVAsync *async = GVnodeAsyncs[channelID->async].async;

  // create task object
  SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
  if (task == NULL) {
    return terrno;
  }

  task->priority = priority;
  task->priorScore = 0;
  task->execute = execute;
  task->cancel = cancel;
  task->arg = arg;
  task->state = EVA_TASK_STATE_WAITTING;
  task->numWait = 0;
  (void)taosThreadCondInit(&task->waitCond, NULL);

  // schedule task
  (void)taosThreadMutexLock(&async->mutex);

  if (channelID->id == 0) {
    task->channel = NULL;  // channel-less task goes straight to the global queue
  } else {
    // resolve the channel; reject if it no longer exists
    SVAChannel channel = {
        .channelId = channelID->id,
    };
    ret = vHashGet(async->channelTable, &channel, (void **)&task->channel);
    TAOS_UNUSED(ret);
    if (task->channel == NULL) {
      (void)taosThreadMutexUnlock(&async->mutex);
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  task->taskId = id = ++async->nextTaskId;

  // add task to hash table
  ret = vHashPut(async->taskTable, task);
  if (ret != 0) {
    (void)taosThreadMutexUnlock(&async->mutex);
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
    return ret;
  }

  async->numTasks++;

  // add task to queue
  if (task->channel == NULL || task->channel->scheduled == NULL) {
    // add task to async->queue (channel slot is free, or no channel at all)
    if (task->channel) {
      task->channel->scheduled = task;
    }

    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;

    // signal worker or launch new worker
    if (async->numIdleWorkers > 0) {
      (void)taosThreadCondSignal(&(async->hasTask));
    } else if (async->numLaunchWorkers < async->numWorkers) {
      vnodeAsyncLaunchWorker(async);
    }
  } else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
             priority >= VATASK_PIORITY(task->channel->scheduled)) {
    // scheduled task can't be displaced (already running, or not lower
    // priority): add task to task->channel->queue
    task->next = &task->channel->queue[priority];
    task->prev = task->channel->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  } else {
    // new task outranks the waiting scheduled task: swap them.
    // remove task->channel->scheduled from queue
    task->channel->scheduled->prev->next = task->channel->scheduled->next;
    task->channel->scheduled->next->prev = task->channel->scheduled->prev;

    // promote priority and add task->channel->scheduled to task->channel->queue
    task->channel->scheduled->priorScore++;
    int32_t newPriority = VATASK_PIORITY(task->channel->scheduled);
    task->channel->scheduled->next = &task->channel->queue[newPriority];
    task->channel->scheduled->prev = task->channel->queue[newPriority].prev;
    task->channel->scheduled->next->prev = task->channel->scheduled;
    task->channel->scheduled->prev->next = task->channel->scheduled;

    // add task to queue
    task->channel->scheduled = task;
    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  if (taskID != NULL) {
    taskID->async = channelID->async;
    taskID->id = id;
  }

  return 0;
}
624

625
// Block until the task identified by taskID finishes. Returns immediately if
// the id is invalid or the task is already gone. The task is kept alive while
// waiters exist (numWait > 0, see vnodeAsyncTaskDone); the last waiter to
// wake frees the task object.
void vnodeAWait(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return;
  }

  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask *task = NULL;
  // stack key holding only the id, used for the hash lookup
  SVATask  task2 = {
       .taskId = taskID->id,
  };

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->taskTable, &task2, (void **)&task);
  if (task) {
    task->numWait++;
    // NOTE(review): this cond wait has no predicate re-check loop; a spurious
    // wakeup would release the waiter before the task completes — confirm the
    // taosThreadCondWait wrapper guards against spurious wakeups.
    (void)taosThreadCondWait(&task->waitCond, &async->mutex);
    task->numWait--;

    // last waiter out frees the task (vnodeAsyncTaskDone skipped the free)
    if (task->numWait == 0) {
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);
}
652

653
// Try to cancel a queued task. Only tasks still WAITTING can be cancelled;
// a RUNNING task yields TSDB_CODE_FAILED. The task's cancel callback (if any)
// is invoked after the mutex is released. An unknown/finished task returns 0.
int32_t vnodeACancel(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret = 0;
  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask *task = NULL;
  // stack key holding only the id, used for the hash lookup
  SVATask  task2 = {
       .taskId = taskID->id,
  };
  // cancel callback captured under the lock, invoked outside it
  void (*cancel)(void *) = NULL;
  void *arg = NULL;

  (void)taosThreadMutexLock(&async->mutex);

  ret = vHashGet(async->taskTable, &task2, (void **)&task);
  if (task) {
    if (task->state == EVA_TASK_STATE_WAITTING) {
      cancel = task->cancel;
      arg = task->arg;
      // unlink from whichever queue the task sits in, then finalize
      task->next->prev = task->prev;
      task->prev->next = task->next;
      vnodeAsyncTaskDone(async, task);
    } else {
      ret = TSDB_CODE_FAILED;  // already running: cannot cancel
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  if (cancel) {
    cancel(arg);
  }

  return ret;
}
690

691
// Adjust the worker-thread cap of a global async instance. Idle workers are
// woken so surplus ones can observe the reduced cap and exit.
int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
  bool badId = (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID);
  bool badCount = (numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS);
  if (badId || badCount) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync *async = GVnodeAsyncs[asyncID].async;

  (void)taosThreadMutexLock(&async->mutex);
  async->numWorkers = numWorkers;
  if (async->numIdleWorkers > 0) {
    (void)taosThreadCondBroadcast(&async->hasTask);
  }
  (void)taosThreadMutexUnlock(&async->mutex);

  return 0;
}
706

707
// Create a new channel on the given async instance: allocate it, assign the
// next channel id, register it in the channel table and the channel list.
// On success channelID receives both the async id and the new channel id.
int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
  if (channelID == NULL || asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync *async = GVnodeAsyncs[asyncID].async;

  // create channel object
  SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
  if (channel == NULL) {
    return terrno;
  }
  channel->state = EVA_CHANNEL_STATE_OPEN;
  // initialize per-priority sentinel queues to empty
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    channel->queue[i].next = &channel->queue[i];
    channel->queue[i].prev = &channel->queue[i];
  }
  channel->scheduled = NULL;

  // register channel
  (void)taosThreadMutexLock(&async->mutex);

  channel->channelId = channelID->id = ++async->nextChannelId;

  // add to hash table
  int32_t ret = vHashPut(async->channelTable, channel);
  if (ret != 0) {
    (void)taosThreadMutexUnlock(&async->mutex);
    taosMemoryFree(channel);
    return ret;
  }

  // add to list
  channel->next = &async->chList;
  channel->prev = async->chList.prev;
  channel->next->prev = channel;
  channel->prev->next = channel;

  async->numChannels++;

  (void)taosThreadMutexUnlock(&async->mutex);

  channelID->async = asyncID;
  return 0;
}
752

753
// Destroy a channel: unregister it, cancel every queued task, then deal with
// the scheduled task — cancel it if still waiting; if running, either wait
// for it (waitRunning) or mark the channel CLOSE so the worker frees it when
// the task completes (see vnodeAsyncTaskDone). Cancel callbacks are collected
// under the mutex and invoked after it is released.
int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || channelID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync    *async = GVnodeAsyncs[channelID->async].async;
  SVAChannel *channel = NULL;
  // stack key holding only the id, used for the hash lookup
  SVAChannel  channel2 = {
       .channelId = channelID->id,
  };
  SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->channelTable, &channel2, (void **)&channel);
  TAOS_UNUSED(ret);
  if (channel) {
    // unregister channel
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;
    ret = vHashDrop(async->channelTable, channel);
    async->numChannels--;

    // cancel all waiting tasks
    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (channel->queue[i].next != &channel->queue[i]) {
        SVATask *task = channel->queue[i].next;
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          };
        }
        vnodeAsyncTaskDone(async, task);
      }
    }

    // cancel or wait the scheduled task
    if (channel->scheduled == NULL || channel->scheduled->state == EVA_TASK_STATE_WAITTING) {
      if (channel->scheduled) {
        // still in the global queue: unlink and cancel it
        channel->scheduled->prev->next = channel->scheduled->next;
        channel->scheduled->next->prev = channel->scheduled->prev;
        if (channel->scheduled->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = channel->scheduled->cancel,
                                             .arg = channel->scheduled->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          }
        }
        vnodeAsyncTaskDone(async, channel->scheduled);
      }
      taosMemoryFree(channel);
    } else {
      if (waitRunning) {
        // wait task; last waiter frees the task object
        SVATask *task = channel->scheduled;
        task->numWait++;
        (void)taosThreadCondWait(&task->waitCond, &async->mutex);
        task->numWait--;
        if (task->numWait == 0) {
          (void)taosThreadCondDestroy(&task->waitCond);
          taosMemoryFree(task);
        }

        taosMemoryFree(channel);
      } else {
        // defer the free: the worker releases the channel when the task ends
        channel->state = EVA_CHANNEL_STATE_CLOSE;
      }
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);
  // run cancel callbacks outside the mutex
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);

  channelID->async = 0;
  channelID->id = 0;
  return 0;
}
843

844
// Map a task type to its human-readable name (matches GVnodeAsyncs labels).
const char *vnodeGetATaskName(EVATaskT taskType) {
  if (taskType == EVA_TASK_COMMIT) {
    return "vnode-commit";
  }
  if (taskType == EVA_TASK_MERGE) {
    return "vnode-merge";
  }
  if (taskType == EVA_TASK_COMPACT) {
    return "vnode-compact";
  }
  if (taskType == EVA_TASK_RETENTION) {
    return "vnode-retention";
  }
  return "unknown";
}
858

859
// Return true iff the task identified by taskID is still registered
// (i.e. queued or running) on its async instance.
bool vnodeATaskValid(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return false;
  }

  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask  key = {.taskId = taskID->id};
  SVATask *found = NULL;

  (void)taosThreadMutexLock(&async->mutex);
  int32_t ret = vHashGet(async->taskTable, &key, (void **)&found);
  (void)taosThreadMutexUnlock(&async->mutex);

  return ret == 0 && found != NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc