• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4929

16 Jan 2026 02:32AM UTC coverage: 66.723% (+0.02%) from 66.708%
#4929

push

travis-ci

web-flow
enh: interp supports using non-null prev/next values to fill (#34236)

281 of 327 new or added lines in 11 files covered. (85.93%)

539 existing lines in 127 files now uncovered.

203225 of 304580 relevant lines covered (66.72%)

128590777.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.9
/source/dnode/vnode/src/vnd/vnodeAsync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17
#include "vnodeHash.h"
18

19
typedef struct SVAsync    SVAsync;
typedef struct SVATask    SVATask;
typedef struct SVAChannel SVAChannel;

// Worker-thread pool limits per async handle.
#define VNODE_ASYNC_DEFAULT_WORKERS 4
#define VNODE_ASYNC_MAX_WORKERS     256

// priority

// Number of priority levels; assumes EVA_PRIORITY_LOW is the last enum value — TODO confirm against the enum.
#define EVA_PRIORITY_MAX (EVA_PRIORITY_LOW + 1)

// worker
// Lifecycle states of a worker thread; transitions happen under SVAsync::mutex.
typedef enum {
  EVA_WORKER_STATE_UINIT = 0,  // thread not created (or already joined)
  EVA_WORKER_STATE_ACTIVE,     // thread running / executing tasks
  EVA_WORKER_STATE_IDLE,       // thread blocked on SVAsync::hasTask
  EVA_WORKER_STATE_STOP,       // thread left its loop; waiting to be joined
} EVWorkerState;

// One worker thread bound to a single SVAsync handle.
typedef struct {
  SVAsync      *async;        // owning async handle
  int32_t       workerId;     // index into SVAsync::workers
  EVWorkerState state;        // protected by SVAsync::mutex
  TdThread      thread;
  SVATask      *runningTask;  // task being executed; NULL when not running one
} SVWorker;

// task
typedef enum {
  EVA_TASK_STATE_WAITTING = 0,  // queued, not yet picked up by a worker
  EVA_TASK_STATE_RUNNING,       // currently executing on a worker
} EVATaskState;

// An async task; lives in an intrusive doubly-linked queue (sentinel-based).
struct SVATask {
  int64_t     taskId;      // unique within the owning SVAsync
  EVAPriority priority;    // base priority at submission time
  int32_t     priorScore;  // aging counter; see VATASK_PIORITY
  SVAChannel *channel;     // owning channel, NULL for channel-less tasks
  int32_t (*execute)(void *);  // task body run by a worker
  void (*cancel)(void *);      // optional cancel callback (invoked without the mutex)
  void        *arg;            // argument for execute/cancel
  EVATaskState state;

  // wait
  int32_t      numWait;   // threads blocked in vnodeAWait on this task
  TdThreadCond waitCond;  // signalled/broadcast when the task completes

  // queue
  struct SVATask *prev;
  struct SVATask *next;
};

// Snapshot of a task's cancel callback, taken under the mutex so the
// callback itself can run after the lock is released.
typedef struct {
  void (*cancel)(void *);
  void *arg;
} SVATaskCancelInfo;

// Effective priority: base priority promoted one level per 4 aging points.
#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4))

// async channel
typedef enum {
  EVA_CHANNEL_STATE_OPEN = 0,
  EVA_CHANNEL_STATE_CLOSE,  // destroy requested while its scheduled task was running
} EVAChannelState;

// A channel serializes its tasks: at most one task per channel (the
// "scheduled" one) sits in the global queue / runs at any time.
struct SVAChannel {
  int64_t         channelId;
  EVAChannelState state;
  SVATask         queue[EVA_PRIORITY_MAX];  // per-priority sentinel nodes for waiting tasks
  SVATask        *scheduled;                // task currently handed to the global queue

  SVAChannel *prev;
  SVAChannel *next;
};

// async handle
struct SVAsync {
  const char *label;  // points at the bytes allocated just past this struct

  TdThreadMutex mutex;    // guards every field below
  TdThreadCond  hasTask;  // signalled when work is queued or on shutdown
  bool          stop;

  // worker
  int32_t  numWorkers;        // configured limit (<= VNODE_ASYNC_MAX_WORKERS)
  int32_t  numLaunchWorkers;  // threads actually created and not yet stopped
  int32_t  numIdleWorkers;    // threads blocked on hasTask
  SVWorker workers[VNODE_ASYNC_MAX_WORKERS];

  // channel
  int64_t      nextChannelId;
  int32_t      numChannels;
  SVAChannel   chList;  // sentinel of the doubly-linked channel list
  SVHashTable *channelTable;

  // task
  int64_t      nextTaskId;
  int32_t      numTasks;
  SVATask      queue[EVA_PRIORITY_MAX];  // global per-priority sentinels
  SVHashTable *taskTable;
};

// Global async handles indexed by async id; slot 0 is intentionally unused.
struct {
  const char *label;
  SVAsync    *async;
} GVnodeAsyncs[] = {
    [0] = {NULL, NULL},
    [1] = {"vnode-commit", NULL},
    [2] = {"vnode-merge", NULL},
    [3] = {"vnode-compact", NULL},
    [4] = {"vnode-retention", NULL},
    [5] = {"vnode-scan", NULL},
};

#define MIN_ASYNC_ID 1
#define MAX_ASYNC_ID (sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]) - 1)
135

136
// Finalize a task: if it was its channel's scheduled task, promote the
// channel's next waiting task into the global queue (or free a closing
// channel); then drop the task from the task table and either free it or
// wake waiters. Caller must hold async->mutex.
static void vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
  int32_t ret;

  if (task->channel != NULL && task->channel->scheduled == task) {
    task->channel->scheduled = NULL;
    if (task->channel->state == EVA_CHANNEL_STATE_CLOSE) {
      // channel destroy was deferred until its running task finished
      taosMemoryFree(task->channel);
    } else {
      // pick the highest-priority waiting task as the channel's next
      // scheduled task; age (promote) the others
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *nextTask = task->channel->queue[i].next;
        if (nextTask != &task->channel->queue[i]) {
          if (task->channel->scheduled == NULL) {
            // first non-empty queue wins: unlink it from the channel queue
            task->channel->scheduled = nextTask;
            nextTask->next->prev = nextTask->prev;
            nextTask->prev->next = nextTask->next;
          } else {
            // not chosen this round: bump its aging score and move it to
            // its new effective-priority queue if that changed
            nextTask->priorScore++;
            int32_t newPriority = VATASK_PIORITY(nextTask);
            if (newPriority != i) {
              // remove from current priority queue
              nextTask->prev->next = nextTask->next;
              nextTask->next->prev = nextTask->prev;
              // add to new priority queue
              nextTask->next = &task->channel->queue[newPriority];
              nextTask->prev = task->channel->queue[newPriority].prev;
              nextTask->next->prev = nextTask;
              nextTask->prev->next = nextTask;
            }
          }
        }
      }

      // hand the newly scheduled task to the global queue
      if (task->channel->scheduled != NULL) {
        int32_t priority = VATASK_PIORITY(task->channel->scheduled);
        task->channel->scheduled->next = &async->queue[priority];
        task->channel->scheduled->prev = async->queue[priority].prev;
        task->channel->scheduled->next->prev = task->channel->scheduled;
        task->channel->scheduled->prev->next = task->channel->scheduled;
      }
    }
  }

  ret = vHashDrop(async->taskTable, task);
  TAOS_UNUSED(ret);
  async->numTasks--;

  // no waiters: free immediately; otherwise the last waiter in vnodeAWait
  // frees the task after being woken
  if (task->numWait == 0) {
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
  } else if (task->numWait == 1) {
    (void)taosThreadCondSignal(&task->waitCond);
  } else {
    (void)taosThreadCondBroadcast(&task->waitCond);
  }
}
191

192
// Return true when every global priority queue of `async` is empty.
// Caller must hold async->mutex.
static bool vnodeAsyncAllQueuesEmpty(SVAsync *async) {
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    if (async->queue[i].next != &async->queue[i]) {
      return false;
    }
  }
  return true;
}

// Drain every waiting task from the global queues, collecting their cancel
// callbacks into cancelArray (so callers can invoke them without the lock)
// and finalizing each task. The outer loop re-checks emptiness because
// vnodeAsyncTaskDone may move a channel's next task back into the global
// queues. Caller must hold async->mutex.
//
// Fix: the original outer condition hardcoded queues [0], [1] and [2],
// which silently breaks if EVA_PRIORITY_MAX ever changes; it now checks
// all EVA_PRIORITY_MAX queues.
static void vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
  while (!vnodeAsyncAllQueuesEmpty(async)) {
    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (async->queue[i].next != &async->queue[i]) {
        SVATask *task = async->queue[i].next;
        // unlink the task from its queue
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel task into array");
          }
        }
        vnodeAsyncTaskDone(async, task);
      }
    }
  }
}
213

214
// Worker thread main loop: repeatedly finalize the previous task, pick the
// next task from the global priority queues (aging the ones passed over),
// sleep when idle, and exit when the handle stops or this worker's id
// exceeds the configured worker count.
static void *vnodeAsyncLoop(void *arg) {
  SVWorker *worker = (SVWorker *)arg;
  SVAsync  *async = worker->async;
  // collected cancel callbacks, invoked after the mutex is released
  SArray   *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    return NULL;
  }

  setThreadName(async->label);

  for (;;) {
    (void)taosThreadMutexLock(&async->mutex);

    // finish last running task
    if (worker->runningTask != NULL) {
      vnodeAsyncTaskDone(async, worker->runningTask);
      worker->runningTask = NULL;
    }

    for (;;) {
      // exit when stopping, or when this worker is beyond the (possibly
      // lowered) numWorkers limit
      if (async->stop || worker->workerId >= async->numWorkers) {
        if (async->stop) {  // cancel all tasks
          vnodeAsyncCancelAllTasks(async, cancelArray);
        }
        worker->state = EVA_WORKER_STATE_STOP;
        async->numLaunchWorkers--;
        (void)taosThreadMutexUnlock(&async->mutex);
        goto _exit;
      }

      // scan all priority queues: take the first task found; age the head
      // of every other non-empty queue so starved tasks eventually rise
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *task = async->queue[i].next;
        if (task != &async->queue[i]) {
          if (worker->runningTask == NULL) {
            worker->runningTask = task;
            task->prev->next = task->next;
            task->next->prev = task->prev;
          } else {  // promote priority
            task->priorScore++;
            int32_t priority = VATASK_PIORITY(task);
            if (priority != i) {
              // remove from current priority queue
              task->prev->next = task->next;
              task->next->prev = task->prev;
              // add to new priority queue
              task->next = &async->queue[priority];
              task->prev = async->queue[priority].prev;
              task->next->prev = task;
              task->prev->next = task;
            }
          }
        }
      }

      if (worker->runningTask == NULL) {
        // nothing to do: go idle until new work is signalled
        worker->state = EVA_WORKER_STATE_IDLE;
        async->numIdleWorkers++;
        (void)taosThreadCondWait(&async->hasTask, &async->mutex);
        async->numIdleWorkers--;
        worker->state = EVA_WORKER_STATE_ACTIVE;
      } else {
        worker->runningTask->state = EVA_TASK_STATE_RUNNING;
        break;
      }
    }

    (void)taosThreadMutexUnlock(&async->mutex);

    // do run the task
    int32_t code = worker->runningTask->execute(worker->runningTask->arg);
    TAOS_UNUSED(code);
  }

_exit:
  // run collected cancel callbacks outside the mutex
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);
  return NULL;
}
295

296
static uint32_t vnodeAsyncTaskHash(const void *obj) {
19,554,859✔
297
  SVATask *task = (SVATask *)obj;
19,554,859✔
298
  return MurmurHash3_32((const char *)(&task->taskId), sizeof(task->taskId));
19,554,859✔
299
}
300

301
static int32_t vnodeAsyncTaskCompare(const void *obj1, const void *obj2) {
7,425,861✔
302
  SVATask *task1 = (SVATask *)obj1;
7,425,861✔
303
  SVATask *task2 = (SVATask *)obj2;
7,425,861✔
304
  if (task1->taskId < task2->taskId) {
7,425,861✔
305
    return -1;
18,506✔
306
  } else if (task1->taskId > task2->taskId) {
7,407,355✔
307
    return 1;
15,846✔
308
  }
309
  return 0;
7,391,509✔
310
}
311

312
static uint32_t vnodeAsyncChannelHash(const void *obj) {
×
313
  SVAChannel *channel = (SVAChannel *)obj;
×
314
  return MurmurHash3_32((const char *)(&channel->channelId), sizeof(channel->channelId));
×
315
}
316

317
static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) {
×
318
  SVAChannel *channel1 = (SVAChannel *)obj1;
×
319
  SVAChannel *channel2 = (SVAChannel *)obj2;
×
320
  if (channel1->channelId < channel2->channelId) {
×
321
    return -1;
×
322
  } else if (channel1->channelId > channel2->channelId) {
×
323
    return 1;
×
324
  }
325
  return 0;
×
326
}
327

328
// Allocate and initialize an SVAsync handle. The label string is copied
// into the same allocation, just past the struct. Returns 0 on success or
// an error code; on failure no handle is leaked.
static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
  int32_t ret;

  if (async == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (label == NULL) {
    label = "anonymous";
  }

  // single allocation: struct followed by the label bytes
  (*async) = (SVAsync *)taosMemoryCalloc(1, sizeof(SVAsync) + strlen(label) + 1);
  if ((*async) == NULL) {
    return terrno;
  }

  memcpy((char *)((*async) + 1), label, strlen(label) + 1);
  (*async)->label = (const char *)((*async) + 1);

  (void)taosThreadMutexInit(&(*async)->mutex, NULL);
  (void)taosThreadCondInit(&(*async)->hasTask, NULL);
  (*async)->stop = false;

  // worker
  (*async)->numWorkers = VNODE_ASYNC_DEFAULT_WORKERS;
  (*async)->numLaunchWorkers = 0;
  (*async)->numIdleWorkers = 0;
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    (*async)->workers[i].async = (*async);
    (*async)->workers[i].workerId = i;
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
    (*async)->workers[i].runningTask = NULL;
  }

  // channel
  (*async)->nextChannelId = 0;
  (*async)->numChannels = 0;
  // empty circular list: sentinel points at itself
  (*async)->chList.prev = &(*async)->chList;
  (*async)->chList.next = &(*async)->chList;
  ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare);
  if (ret != 0) {
    // unwind mutex/cond and the allocation
    (void)taosThreadMutexDestroy(&(*async)->mutex);
    (void)taosThreadCondDestroy(&(*async)->hasTask);
    taosMemoryFree(*async);
    return ret;
  }

  // task
  (*async)->nextTaskId = 0;
  (*async)->numTasks = 0;
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    (*async)->queue[i].next = &(*async)->queue[i];
    (*async)->queue[i].prev = &(*async)->queue[i];
  }
  ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
  if (ret != 0) {
    // unwind everything initialized so far
    vHashDestroy(&(*async)->channelTable);
    (void)taosThreadMutexDestroy(&(*async)->mutex);
    (void)taosThreadCondDestroy(&(*async)->hasTask);
    taosMemoryFree(*async);
    return ret;
  }

  return 0;
}
393

394
// Stop and tear down an SVAsync handle: signal all workers to stop, join
// them, free remaining channels, then destroy synchronization primitives,
// hash tables and the handle itself. Sets *async to NULL.
static int32_t vnodeAsyncDestroy(SVAsync **async) {
  if ((*async) == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // set stop and broadcast
  (void)taosThreadMutexLock(&(*async)->mutex);
  (*async)->stop = true;
  (void)taosThreadCondBroadcast(&(*async)->hasTask);
  (void)taosThreadMutexUnlock(&(*async)->mutex);

  // join all workers
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    // read the state under the lock; the worker updates it when exiting
    (void)taosThreadMutexLock(&(*async)->mutex);
    EVWorkerState state = (*async)->workers[i].state;
    (void)taosThreadMutexUnlock(&(*async)->mutex);

    if (state == EVA_WORKER_STATE_UINIT) {
      continue;
    }

    (void)taosThreadJoin((*async)->workers[i].thread, NULL);
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
  }

  // close all channels
  for (SVAChannel *channel = (*async)->chList.next; channel != &(*async)->chList; channel = (*async)->chList.next) {
    // unlink from the channel list, drop from the table, then free
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;

    int32_t ret = vHashDrop((*async)->channelTable, channel);
    TAOS_UNUSED(ret);
    (*async)->numChannels--;
    taosMemoryFree(channel);
  }

  (void)taosThreadMutexDestroy(&(*async)->mutex);
  (void)taosThreadCondDestroy(&(*async)->hasTask);

  vHashDestroy(&(*async)->channelTable);
  vHashDestroy(&(*async)->taskTable);
  taosMemoryFree(*async);
  *async = NULL;

  return 0;
}
440

441
// Start at most one additional worker thread, reusing the first slot that
// is not currently ACTIVE (joining a STOPped thread first). Caller must
// hold async->mutex.
static void vnodeAsyncLaunchWorker(SVAsync *async) {
  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
#ifdef TD_COMPACT_OS
  (void)taosThreadAttrSetStackSize(&thAttr, STACK_SIZE_SMALL);
#endif
  for (int32_t i = 0; i < async->numWorkers; i++) {
    if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
      continue;
    } else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
      // reclaim a stopped thread before reusing its slot
      // NOTE(review): join result is ignored here — presumably benign, but verify
      int32_t ret = taosThreadJoin(async->workers[i].thread, NULL);
      async->workers[i].state = EVA_WORKER_STATE_UINIT;
    }

    int32_t ret = taosThreadCreate(&async->workers[i].thread, &thAttr, vnodeAsyncLoop, &async->workers[i]);
    if (ret) {
      vError("failed to create worker thread since %s", tstrerror(ret));
    } else {
      async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
      async->numLaunchWorkers++;
    }
    // at most one worker is launched per call
    break;
  }
#ifdef TD_ASTRA
  // NOTE(review): attr is only destroyed on TD_ASTRA builds — confirm other
  // platforms treat TdThreadAttr as trivially destructible
  (void)taosThreadAttrDestroy(&thAttr);
#endif
}
469

470
int32_t vnodeAsyncOpen() {
574,365✔
471
  int32_t code = 0;
574,365✔
472
  int32_t lino = 0;
574,365✔
473

474
  int32_t numOfThreads[] = {
574,365✔
475
      0,                        //
476
      tsNumOfCommitThreads,     // vnode-commit
477
      tsNumOfCommitThreads,     // vnode-merge
478
      tsNumOfCompactThreads,    // vnode-compact
479
      tsNumOfRetentionThreads,  // vnode-retention
480
      2,                        // vnode-scan
481
  };
482

483
  for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
3,446,190✔
484
    code = vnodeAsyncInit(&GVnodeAsyncs[i].async, GVnodeAsyncs[i].label);
2,871,825✔
485
    TSDB_CHECK_CODE(code, lino, _exit);
2,871,825✔
486

487
    code = vnodeAsyncSetWorkers(i, numOfThreads[i]);
2,871,825✔
488
    TSDB_CHECK_CODE(code, lino, _exit);
2,871,825✔
489
  }
490

491
_exit:
574,365✔
492
  return code;
574,365✔
493
}
494

495
void vnodeAsyncClose() {
574,365✔
496
  for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
3,446,190✔
497
    int32_t ret = vnodeAsyncDestroy(&GVnodeAsyncs[i].async);
2,871,825✔
498
  }
499
}
574,365✔
500

501
int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *), void *arg,
6,295,476✔
502
                   SVATaskID *taskID) {
503
  SVAChannelID channelID = {
6,295,476✔
504
      .async = async,
505
      .id = 0,
506
  };
507
  return vnodeAsyncC(&channelID, priority, execute, complete, arg, taskID);
6,295,476✔
508
}
509

510
// Submit a task on a channel (channelID->id == 0 means no channel).
// Allocates the task, registers it in the task table, and enqueues it:
//  - no channel, or channel idle: straight into the global queue (becoming
//    the channel's scheduled task if any), waking or launching a worker;
//  - channel's scheduled task is running or higher/equal effective
//    priority: park in the channel's own queue;
//  - otherwise: displace the channel's scheduled task (aging it back into
//    the channel queue) and schedule this one instead.
// On success, fills *taskID (if non-NULL). Returns 0 or an error code.
int32_t vnodeAsyncC(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
                    void *arg, SVATaskID *taskID) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
      channelID->id < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret;
  int64_t  id;
  SVAsync *async = GVnodeAsyncs[channelID->async].async;

  // create task object
  SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
  if (task == NULL) {
    return terrno;
  }

  task->priority = priority;
  task->priorScore = 0;
  task->execute = execute;
  task->cancel = cancel;
  task->arg = arg;
  task->state = EVA_TASK_STATE_WAITTING;
  task->numWait = 0;
  (void)taosThreadCondInit(&task->waitCond, NULL);

  // schedule task
  (void)taosThreadMutexLock(&async->mutex);

  if (channelID->id == 0) {
    task->channel = NULL;
  } else {
    // resolve the channel; reject if it no longer exists
    SVAChannel channel = {
        .channelId = channelID->id,
    };
    ret = vHashGet(async->channelTable, &channel, (void **)&task->channel);
    TAOS_UNUSED(ret);
    if (task->channel == NULL) {
      (void)taosThreadMutexUnlock(&async->mutex);
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  task->taskId = id = ++async->nextTaskId;

  // add task to hash table
  ret = vHashPut(async->taskTable, task);
  if (ret != 0) {
    (void)taosThreadMutexUnlock(&async->mutex);
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
    return ret;
  }

  async->numTasks++;

  // add task to queue
  if (task->channel == NULL || task->channel->scheduled == NULL) {
    // add task to async->queue
    if (task->channel) {
      task->channel->scheduled = task;
    }

    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;

    // signal worker or launch new worker
    if (async->numIdleWorkers > 0) {
      (void)taosThreadCondSignal(&(async->hasTask));
    } else if (async->numLaunchWorkers < async->numWorkers) {
      vnodeAsyncLaunchWorker(async);
    }
  } else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
             priority >= VATASK_PIORITY(task->channel->scheduled)) {
    // add task to task->channel->queue
    task->next = &task->channel->queue[priority];
    task->prev = task->channel->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  } else {
    // remove task->channel->scheduled from queue
    task->channel->scheduled->prev->next = task->channel->scheduled->next;
    task->channel->scheduled->next->prev = task->channel->scheduled->prev;

    // promote priority and add task->channel->scheduled to task->channel->queue
    task->channel->scheduled->priorScore++;
    int32_t newPriority = VATASK_PIORITY(task->channel->scheduled);
    task->channel->scheduled->next = &task->channel->queue[newPriority];
    task->channel->scheduled->prev = task->channel->queue[newPriority].prev;
    task->channel->scheduled->next->prev = task->channel->scheduled;
    task->channel->scheduled->prev->next = task->channel->scheduled;

    // add task to queue
    task->channel->scheduled = task;
    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  if (taskID != NULL) {
    taskID->async = channelID->async;
    taskID->id = id;
  }

  return 0;
}
623

624
// Block until the identified task completes. No-op for invalid ids or
// tasks that have already finished (absent from the task table). The last
// waiter to wake frees the task object (see vnodeAsyncTaskDone).
void vnodeAWait(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return;
  }

  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask *task = NULL;
  // lookup key: only taskId matters for the hash/compare
  SVATask  task2 = {
       .taskId = taskID->id,
  };

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->taskTable, &task2, (void **)&task);
  if (task) {
    task->numWait++;
    // woken by vnodeAsyncTaskDone when the task finishes
    (void)taosThreadCondWait(&task->waitCond, &async->mutex);
    task->numWait--;

    // last waiter cleans up the task object
    if (task->numWait == 0) {
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);
}
651

652
// Cancel a task that is still waiting in a queue. Returns 0 if the task
// was cancelled (its cancel callback, if any, runs after the mutex is
// released), TSDB_CODE_FAILED if it is already running, or
// TSDB_CODE_INVALID_PARA for a bad id. A task not found in the table
// (already finished) yields 0 without any callback.
int32_t vnodeACancel(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret = 0;
  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask *task = NULL;
  // lookup key: only taskId matters for the hash/compare
  SVATask  task2 = {
       .taskId = taskID->id,
  };
  // callback snapshot, invoked outside the lock
  void (*cancel)(void *) = NULL;
  void *arg = NULL;

  (void)taosThreadMutexLock(&async->mutex);

  ret = vHashGet(async->taskTable, &task2, (void **)&task);
  if (task) {
    if (task->state == EVA_TASK_STATE_WAITTING) {
      // still queued: unlink and finalize it now
      cancel = task->cancel;
      arg = task->arg;
      task->next->prev = task->prev;
      task->prev->next = task->next;
      vnodeAsyncTaskDone(async, task);
    } else {
      // already running: cannot cancel
      ret = TSDB_CODE_FAILED;
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  if (cancel) {
    cancel(arg);
  }

  return ret;
}
689

690
int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
2,871,825✔
691
  if (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
2,871,825✔
692
    return TSDB_CODE_INVALID_PARA;
×
693
  }
694
  int32_t  ret;
695
  SVAsync *async = GVnodeAsyncs[asyncID].async;
2,871,825✔
696
  (void)taosThreadMutexLock(&async->mutex);
2,871,825✔
697
  async->numWorkers = numWorkers;
2,871,825✔
698
  if (async->numIdleWorkers > 0) {
2,871,825✔
699
    (void)taosThreadCondBroadcast(&async->hasTask);
×
700
  }
701
  (void)taosThreadMutexUnlock(&async->mutex);
2,871,825✔
702

703
  return 0;
2,871,825✔
704
}
705

706
// Create a channel on the identified async handle: allocate it, assign a
// fresh channel id, and register it in the channel table and list. On
// success fills *channelID. Returns 0 or an error code.
int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
  if (channelID == NULL || asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync *async = GVnodeAsyncs[asyncID].async;

  // create channel object
  SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
  if (channel == NULL) {
    return terrno;
  }
  channel->state = EVA_CHANNEL_STATE_OPEN;
  // empty circular queues: each sentinel points at itself
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    channel->queue[i].next = &channel->queue[i];
    channel->queue[i].prev = &channel->queue[i];
  }
  channel->scheduled = NULL;

  // register channel
  (void)taosThreadMutexLock(&async->mutex);

  channel->channelId = channelID->id = ++async->nextChannelId;

  // add to hash table
  int32_t ret = vHashPut(async->channelTable, channel);
  if (ret != 0) {
    (void)taosThreadMutexUnlock(&async->mutex);
    taosMemoryFree(channel);
    return ret;
  }

  // add to list
  channel->next = &async->chList;
  channel->prev = async->chList.prev;
  channel->next->prev = channel;
  channel->prev->next = channel;

  async->numChannels++;

  (void)taosThreadMutexUnlock(&async->mutex);

  channelID->async = asyncID;
  return 0;
}
751

752
// Destroy a channel: unregister it, cancel every waiting task (collecting
// cancel callbacks to run after the mutex is released), then deal with the
// scheduled task — cancel it if still waiting, otherwise either wait for
// it (waitRunning) or mark the channel CLOSE so vnodeAsyncTaskDone frees
// it when the task finishes. Zeroes *channelID on return.
int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || channelID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync    *async = GVnodeAsyncs[channelID->async].async;
  SVAChannel *channel = NULL;
  // lookup key: only channelId matters for the hash/compare
  SVAChannel  channel2 = {
       .channelId = channelID->id,
  };
  SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->channelTable, &channel2, (void **)&channel);
  TAOS_UNUSED(ret);
  if (channel) {
    // unregister channel
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;
    ret = vHashDrop(async->channelTable, channel);
    async->numChannels--;

    // cancel all waiting tasks
    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (channel->queue[i].next != &channel->queue[i]) {
        SVATask *task = channel->queue[i].next;
        // unlink from the channel queue
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          };
        }
        vnodeAsyncTaskDone(async, task);
      }
    }

    // cancel or wait the scheduled task
    if (channel->scheduled == NULL || channel->scheduled->state == EVA_TASK_STATE_WAITTING) {
      if (channel->scheduled) {
        // still queued: unlink, collect its cancel callback, finalize
        channel->scheduled->prev->next = channel->scheduled->next;
        channel->scheduled->next->prev = channel->scheduled->prev;
        if (channel->scheduled->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = channel->scheduled->cancel,
                                             .arg = channel->scheduled->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          }
        }
        vnodeAsyncTaskDone(async, channel->scheduled);
      }
      taosMemoryFree(channel);
    } else {
      if (waitRunning) {
        // wait task
        SVATask *task = channel->scheduled;
        task->numWait++;
        (void)taosThreadCondWait(&task->waitCond, &async->mutex);
        task->numWait--;
        if (task->numWait == 0) {
          (void)taosThreadCondDestroy(&task->waitCond);
          taosMemoryFree(task);
        }

        taosMemoryFree(channel);
      } else {
        // defer the free: vnodeAsyncTaskDone releases the channel when the
        // running task completes
        channel->state = EVA_CHANNEL_STATE_CLOSE;
      }
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);
  // run collected cancel callbacks outside the mutex
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);

  channelID->async = 0;
  channelID->id = 0;
  return 0;
}
842

843
// Map a task type to its human-readable async label; "unknown" for
// unrecognized values.
const char *vnodeGetATaskName(EVATaskT taskType) {
  if (taskType == EVA_TASK_COMMIT) {
    return "vnode-commit";
  }
  if (taskType == EVA_TASK_MERGE) {
    return "vnode-merge";
  }
  if (taskType == EVA_TASK_COMPACT) {
    return "vnode-compact";
  }
  if (taskType == EVA_TASK_RETENTION) {
    return "vnode-retention";
  }
  return "unknown";
}
857

858
// Return true when the identified task still exists (i.e. is registered in
// the async handle's task table).
bool vnodeATaskValid(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return false;
  }

  SVAsync *pAsync = GVnodeAsyncs[taskID->async].async;
  SVATask  key = {
       .taskId = taskID->id,
  };
  SVATask *found = NULL;

  (void)taosThreadMutexLock(&pAsync->mutex);
  int32_t ret = vHashGet(pAsync->taskTable, &key, (void **)&found);
  (void)taosThreadMutexUnlock(&pAsync->mutex);

  return ret == 0 && found != NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc