• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/vnd/vnodeAsync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17
#include "vnodeHash.h"
18

19
typedef struct SVAsync    SVAsync;
typedef struct SVATask    SVATask;
typedef struct SVAChannel SVAChannel;

// Default and maximum number of worker threads per async executor.
#define VNODE_ASYNC_DEFAULT_WORKERS 4
#define VNODE_ASYNC_MAX_WORKERS     256

// priority

// Number of priority levels; EVA_PRIORITY_LOW is the last declared level.
#define EVA_PRIORITY_MAX (EVA_PRIORITY_LOW + 1)

// worker
// Life-cycle states of a worker thread slot.
typedef enum {
  EVA_WORKER_STATE_UINIT = 0,  // thread not created yet (or already joined)
  EVA_WORKER_STATE_ACTIVE,     // thread running / scanning for work
  EVA_WORKER_STATE_IDLE,       // thread blocked waiting on the hasTask condition
  EVA_WORKER_STATE_STOP,       // thread exited, waiting to be joined
} EVWorkerState;
37

38
// One worker thread slot of an SVAsync executor.
typedef struct {
  SVAsync      *async;        // owning executor
  int32_t       workerId;     // index into SVAsync::workers
  EVWorkerState state;
  TdThread      thread;       // valid only when state != EVA_WORKER_STATE_UINIT
  SVATask      *runningTask;  // task currently being executed, or NULL
} SVWorker;

// task
// States of a submitted task.
typedef enum {
  EVA_TASK_STATE_WAITTING = 0,  // queued, not yet picked up by a worker
  EVA_TASK_STATE_RUNNING,       // being executed by a worker
} EVATaskState;
51

52
// A unit of asynchronous work. Task nodes double as intrusive doubly-linked
// list nodes (prev/next); queue heads are sentinel SVATask objects.
struct SVATask {
  int64_t     taskId;      // unique, monotonically increasing id
  EVAPriority priority;    // base priority assigned at submission
  int32_t     priorScore;  // aging score; raises effective priority over time
  SVAChannel *channel;     // owning channel, or NULL for channel-less tasks
  int32_t (*execute)(void *);  // task body
  void (*cancel)(void *);      // optional callback invoked if cancelled before running
  void        *arg;            // argument passed to execute/cancel
  EVATaskState state;

  // wait
  int32_t      numWait;   // number of threads blocked waiting for this task
  TdThreadCond waitCond;  // signalled/broadcast when the task completes

  // queue
  struct SVATask *prev;
  struct SVATask *next;
};

// Cancel callback captured while holding the executor mutex; invoked after unlock.
typedef struct {
  void (*cancel)(void *);
  void *arg;
} SVATaskCancelInfo;

// Effective priority of a task: base priority promoted by aging (priorScore).
#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4))
77

78
// async channel
typedef enum {
  EVA_CHANNEL_STATE_OPEN = 0,
  EVA_CHANNEL_STATE_CLOSE,  // destroy requested; freed once its scheduled task finishes
} EVAChannelState;

// A channel serializes its tasks: at most one of them ("scheduled") sits in
// the executor's global queues or runs at any time; the rest wait in the
// channel's own per-priority queues.
struct SVAChannel {
  int64_t         channelId;
  EVAChannelState state;
  SVATask         queue[EVA_PRIORITY_MAX];  // per-priority sentinel queues
  SVATask        *scheduled;                // channel task handed to the workers, or NULL

  SVAChannel *prev;
  SVAChannel *next;
};
93

94
// async handle
// One executor: a pool of worker threads draining per-priority task queues.
struct SVAsync {
  const char *label;  // thread name; stored in the trailing bytes of this allocation

  TdThreadMutex mutex;    // protects all mutable fields below
  TdThreadCond  hasTask;  // signalled when a task is queued / state changes
  bool          stop;

  // worker
  int32_t  numWorkers;        // configured maximum number of workers
  int32_t  numLaunchWorkers;  // workers currently launched
  int32_t  numIdleWorkers;    // workers blocked on hasTask
  SVWorker workers[VNODE_ASYNC_MAX_WORKERS];

  // channel
  int64_t      nextChannelId;
  int32_t      numChannels;
  SVAChannel   chList;  // sentinel head of the registered-channel list
  SVHashTable *channelTable;

  // task
  int64_t      nextTaskId;
  int32_t      numTasks;
  SVATask      queue[EVA_PRIORITY_MAX];  // global per-priority sentinel queues
  SVHashTable *taskTable;
};
120

121
// Global executors, indexed by async id (1-based; slot 0 unused).
struct {
  const char *label;
  SVAsync    *async;
} GVnodeAsyncs[] = {
    [0] = {NULL, NULL},
    [1] = {"vnode-commit", NULL},
    [2] = {"vnode-merge", NULL},
    [3] = {"vnode-compact", NULL},
    [4] = {"vnode-retention", NULL},
};

// Valid async ids are [MIN_ASYNC_ID, MAX_ASYNC_ID].
#define MIN_ASYNC_ID 1
#define MAX_ASYNC_ID (sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]) - 1)
134

UNCOV
135
// Finish bookkeeping for a completed (or cancelled) task.
// Caller must hold async->mutex.
//
// If the task was its channel's scheduled task, the channel's next pending
// task (if any) is promoted into the global queues; a channel that was
// closed with waitRunning == false is freed here. The task is then removed
// from the task table and either freed (no waiters) or its waiters are
// woken — the last waiter frees it (see vnodeAWait()).
static void vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
  int32_t ret;

  if (task->channel != NULL && task->channel->scheduled == task) {
    task->channel->scheduled = NULL;
    if (task->channel->state == EVA_CHANNEL_STATE_CLOSE) {
      // channel destroy was deferred until its running task finished
      taosMemoryFree(task->channel);
    } else {
      // Scan the channel queues from index 0 upward (index 0 is the highest
      // effective priority since EVA_PRIORITY_LOW is the last level). The
      // first non-empty queue's head becomes the new scheduled task; the
      // head of each later non-empty queue ages (priorScore++) and may move
      // to a higher-priority queue.
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *nextTask = task->channel->queue[i].next;
        if (nextTask != &task->channel->queue[i]) {
          if (task->channel->scheduled == NULL) {
            // first candidate: unlink it from the channel queue
            task->channel->scheduled = nextTask;
            nextTask->next->prev = nextTask->prev;
            nextTask->prev->next = nextTask->next;
          } else {
            nextTask->priorScore++;
            int32_t newPriority = VATASK_PIORITY(nextTask);
            if (newPriority != i) {
              // remove from current priority queue
              nextTask->prev->next = nextTask->next;
              nextTask->next->prev = nextTask->prev;
              // add to new priority queue
              nextTask->next = &task->channel->queue[newPriority];
              nextTask->prev = task->channel->queue[newPriority].prev;
              nextTask->next->prev = nextTask;
              nextTask->prev->next = nextTask;
            }
          }
        }
      }

      // hand the newly scheduled channel task to the global queues
      if (task->channel->scheduled != NULL) {
        int32_t priority = VATASK_PIORITY(task->channel->scheduled);
        task->channel->scheduled->next = &async->queue[priority];
        task->channel->scheduled->prev = async->queue[priority].prev;
        task->channel->scheduled->next->prev = task->channel->scheduled;
        task->channel->scheduled->prev->next = task->channel->scheduled;
      }
    }
  }

  ret = vHashDrop(async->taskTable, task);
  TAOS_UNUSED(ret);
  async->numTasks--;

  if (task->numWait == 0) {
    // nobody is waiting: release the task object here
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
  } else if (task->numWait == 1) {
    (void)taosThreadCondSignal(&task->waitCond);
  } else {
    (void)taosThreadCondBroadcast(&task->waitCond);
  }
}
×
190

UNCOV
191
// Drain every pending task from the global queues, collecting their cancel
// callbacks into cancelArray (invoked later, outside the mutex).
// Caller must hold async->mutex.
//
// vnodeAsyncTaskDone() may move a channel's next task into the global
// queues while we drain, so keep sweeping until a full pass finds every
// queue empty. (The original emptiness check hardcoded queues 0, 1 and 2,
// silently assuming EVA_PRIORITY_MAX == 3; this sweep works for any
// EVA_PRIORITY_MAX.)
static void vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
  bool allEmpty;
  do {
    allEmpty = true;
    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (async->queue[i].next != &async->queue[i]) {
        allEmpty = false;
        SVATask *task = async->queue[i].next;
        // unlink the task from its queue
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel task into array");
          }
        }
        vnodeAsyncTaskDone(async, task);
      }
    }
  } while (!allEmpty);
}
×
212

UNCOV
213
// Worker thread entry point.
// Repeatedly takes the first task found while scanning the global queues
// from index 0 upward and executes it; during the scan, the head task of
// each later non-empty queue ages (priorScore++) and may be promoted to a
// higher-priority queue. The thread exits when the executor is stopped or
// when its slot index exceeds the configured worker count (shrink).
static void *vnodeAsyncLoop(void *arg) {
  SVWorker *worker = (SVWorker *)arg;
  SVAsync  *async = worker->async;
  // cancel callbacks are collected under the mutex, invoked after unlock
  SArray   *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    return NULL;
  }

  setThreadName(async->label);

  for (;;) {
    (void)taosThreadMutexLock(&async->mutex);

    // finish last running task
    if (worker->runningTask != NULL) {
      vnodeAsyncTaskDone(async, worker->runningTask);
      worker->runningTask = NULL;
    }

    for (;;) {
      if (async->stop || worker->workerId >= async->numWorkers) {
        if (async->stop) {  // cancel all tasks
          vnodeAsyncCancelAllTasks(async, cancelArray);
        }
        worker->state = EVA_WORKER_STATE_STOP;
        async->numLaunchWorkers--;
        (void)taosThreadMutexUnlock(&async->mutex);
        goto _exit;
      }

      // pick a task; age the heads of the remaining queues
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *task = async->queue[i].next;
        if (task != &async->queue[i]) {
          if (worker->runningTask == NULL) {
            // claim this task and unlink it from its queue
            worker->runningTask = task;
            task->prev->next = task->next;
            task->next->prev = task->prev;
          } else {  // promote priority
            task->priorScore++;
            int32_t priority = VATASK_PIORITY(task);
            if (priority != i) {
              // remove from current priority queue
              task->prev->next = task->next;
              task->next->prev = task->prev;
              // add to new priority queue
              task->next = &async->queue[priority];
              task->prev = async->queue[priority].prev;
              task->next->prev = task;
              task->prev->next = task;
            }
          }
        }
      }

      if (worker->runningTask == NULL) {
        // nothing to do: park until a task arrives, then rescan
        worker->state = EVA_WORKER_STATE_IDLE;
        async->numIdleWorkers++;
        (void)taosThreadCondWait(&async->hasTask, &async->mutex);
        async->numIdleWorkers--;
        worker->state = EVA_WORKER_STATE_ACTIVE;
      } else {
        worker->runningTask->state = EVA_TASK_STATE_RUNNING;
        break;
      }
    }

    (void)taosThreadMutexUnlock(&async->mutex);

    // do run the task
    int32_t code = worker->runningTask->execute(worker->runningTask->arg);
    TAOS_UNUSED(code);
  }

_exit:
  // invoke collected cancel callbacks outside the lock
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);
  return NULL;
}
294

UNCOV
295
static uint32_t vnodeAsyncTaskHash(const void *obj) {
×
UNCOV
296
  SVATask *task = (SVATask *)obj;
×
UNCOV
297
  return MurmurHash3_32((const char *)(&task->taskId), sizeof(task->taskId));
×
298
}
299

UNCOV
300
static int32_t vnodeAsyncTaskCompare(const void *obj1, const void *obj2) {
×
UNCOV
301
  SVATask *task1 = (SVATask *)obj1;
×
UNCOV
302
  SVATask *task2 = (SVATask *)obj2;
×
UNCOV
303
  if (task1->taskId < task2->taskId) {
×
UNCOV
304
    return -1;
×
UNCOV
305
  } else if (task1->taskId > task2->taskId) {
×
UNCOV
306
    return 1;
×
307
  }
UNCOV
308
  return 0;
×
309
}
310

311
static uint32_t vnodeAsyncChannelHash(const void *obj) {
×
312
  SVAChannel *channel = (SVAChannel *)obj;
×
313
  return MurmurHash3_32((const char *)(&channel->channelId), sizeof(channel->channelId));
×
314
}
315

316
static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) {
×
317
  SVAChannel *channel1 = (SVAChannel *)obj1;
×
318
  SVAChannel *channel2 = (SVAChannel *)obj2;
×
319
  if (channel1->channelId < channel2->channelId) {
×
320
    return -1;
×
321
  } else if (channel1->channelId > channel2->channelId) {
×
322
    return 1;
×
323
  }
324
  return 0;
×
325
}
326

UNCOV
327
// Create an executor with the given label (NULL -> "anonymous").
// Allocates the SVAsync and a copy of the label in one block, then
// initializes the mutex/condition, worker slots, channel list/table and
// task queues/table. Returns 0 on success or an error code; on failure no
// resources are leaked.
static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
  int32_t ret;

  if (async == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (label == NULL) {
    label = "anonymous";
  }

  // label is stored in the trailing bytes of the same allocation
  (*async) = (SVAsync *)taosMemoryCalloc(1, sizeof(SVAsync) + strlen(label) + 1);
  if ((*async) == NULL) {
    return terrno;
  }

  memcpy((char *)((*async) + 1), label, strlen(label) + 1);
  (*async)->label = (const char *)((*async) + 1);

  (void)taosThreadMutexInit(&(*async)->mutex, NULL);
  (void)taosThreadCondInit(&(*async)->hasTask, NULL);
  (*async)->stop = false;

  // worker
  (*async)->numWorkers = VNODE_ASYNC_DEFAULT_WORKERS;
  (*async)->numLaunchWorkers = 0;
  (*async)->numIdleWorkers = 0;
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    (*async)->workers[i].async = (*async);
    (*async)->workers[i].workerId = i;
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
    (*async)->workers[i].runningTask = NULL;
  }

  // channel
  (*async)->nextChannelId = 0;
  (*async)->numChannels = 0;
  // empty sentinel list
  (*async)->chList.prev = &(*async)->chList;
  (*async)->chList.next = &(*async)->chList;
  ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare);
  if (ret != 0) {
    (void)taosThreadMutexDestroy(&(*async)->mutex);
    (void)taosThreadCondDestroy(&(*async)->hasTask);
    taosMemoryFree(*async);
    return ret;
  }

  // task
  (*async)->nextTaskId = 0;
  (*async)->numTasks = 0;
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    // each queue head is its own sentinel node
    (*async)->queue[i].next = &(*async)->queue[i];
    (*async)->queue[i].prev = &(*async)->queue[i];
  }
  ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
  if (ret != 0) {
    vHashDestroy(&(*async)->channelTable);
    (void)taosThreadMutexDestroy(&(*async)->mutex);
    (void)taosThreadCondDestroy(&(*async)->hasTask);
    taosMemoryFree(*async);
    return ret;
  }

  return 0;
}
392

UNCOV
393
// Stop and tear down an executor: set the stop flag and wake every worker
// (workers cancel remaining tasks as they exit), join all launched worker
// threads, free channels that are still registered, then release all
// resources and clear *async.
static int32_t vnodeAsyncDestroy(SVAsync **async) {
  if ((*async) == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // set stop and broadcast
  (void)taosThreadMutexLock(&(*async)->mutex);
  (*async)->stop = true;
  (void)taosThreadCondBroadcast(&(*async)->hasTask);
  (void)taosThreadMutexUnlock(&(*async)->mutex);

  // join all workers
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    // read the state under the lock; join outside it
    (void)taosThreadMutexLock(&(*async)->mutex);
    EVWorkerState state = (*async)->workers[i].state;
    (void)taosThreadMutexUnlock(&(*async)->mutex);

    if (state == EVA_WORKER_STATE_UINIT) {
      // thread was never created (or already joined)
      continue;
    }

    (void)taosThreadJoin((*async)->workers[i].thread, NULL);
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
  }

  // close all channels
  for (SVAChannel *channel = (*async)->chList.next; channel != &(*async)->chList; channel = (*async)->chList.next) {
    // unlink from the list, drop from the table, free
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;

    int32_t ret = vHashDrop((*async)->channelTable, channel);
    TAOS_UNUSED(ret);
    (*async)->numChannels--;
    taosMemoryFree(channel);
  }

  (void)taosThreadMutexDestroy(&(*async)->mutex);
  (void)taosThreadCondDestroy(&(*async)->hasTask);

  vHashDestroy(&(*async)->channelTable);
  vHashDestroy(&(*async)->taskTable);
  taosMemoryFree(*async);
  *async = NULL;

  return 0;
}
439

UNCOV
440
// Launch at most one additional worker thread for the executor.
// Caller must hold async->mutex. Picks the first worker slot that is not
// currently active; a stopped worker is joined first so its slot can be
// reused. Thread-creation failure is logged and otherwise ignored.
static void vnodeAsyncLaunchWorker(SVAsync *async) {
  for (int32_t i = 0; i < async->numWorkers; i++) {
    if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
      continue;
    } else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
      // reclaim the exited thread before reusing the slot
      int32_t ret = taosThreadJoin(async->workers[i].thread, NULL);
      TAOS_UNUSED(ret);  // join result intentionally ignored (fix: was an unused variable)
      async->workers[i].state = EVA_WORKER_STATE_UINIT;
    }

    int32_t ret = taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]);
    if (ret) {
      vError("failed to create worker thread since %s", tstrerror(ret));
    } else {
      async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
      async->numLaunchWorkers++;
    }
    break;  // only one launch attempt per call
  }
}
×
459

UNCOV
460
// Initialize every global executor in GVnodeAsyncs and configure its worker
// count from the corresponding global thread-count setting.
int32_t vnodeAsyncOpen() {
  int32_t code = 0;
  int32_t lino = 0;

  // worker counts, indexed to match GVnodeAsyncs slots
  int32_t numOfThreads[] = {
      0,                        //
      tsNumOfCommitThreads,     // vnode-commit
      tsNumOfCommitThreads,     // vnode-merge — shares the commit setting (NOTE(review): confirm intended)
      tsNumOfCompactThreads,    // vnode-compact
      tsNumOfRetentionThreads,  // vnode-retention
  };

  for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
    code = vnodeAsyncInit(&GVnodeAsyncs[i].async, GVnodeAsyncs[i].label);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = vnodeAsyncSetWorkers(i, numOfThreads[i]);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  return code;
}
483

UNCOV
484
void vnodeAsyncClose() {
×
UNCOV
485
  for (int32_t i = 1; i < sizeof(GVnodeAsyncs) / sizeof(GVnodeAsyncs[0]); i++) {
×
UNCOV
486
    int32_t ret = vnodeAsyncDestroy(&GVnodeAsyncs[i].async);
×
487
  }
UNCOV
488
}
×
489

UNCOV
490
int32_t vnodeAsync(int64_t async, EVAPriority priority, int32_t (*execute)(void *), void (*complete)(void *), void *arg,
×
491
                   SVATaskID *taskID) {
UNCOV
492
  SVAChannelID channelID = {
×
493
      .async = async,
494
      .id = 0,
495
  };
UNCOV
496
  return vnodeAsyncC(&channelID, priority, execute, complete, arg, taskID);
×
497
}
498

UNCOV
499
// Schedule a task on the executor/channel identified by channelID.
// channelID->id == 0 means no channel: the task goes straight to the global
// queues. For channel tasks, at most one task per channel ("scheduled") is
// in the global queues at a time; the rest wait in the channel's own
// queues. `cancel` (optional) is invoked instead of `execute` if the task
// is cancelled before running. On success, *taskID (if non-NULL) receives
// the id usable with vnodeAWait()/vnodeACancel().
int32_t vnodeAsyncC(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
                    void *arg, SVATaskID *taskID) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
      channelID->id < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret;
  int64_t  id;
  SVAsync *async = GVnodeAsyncs[channelID->async].async;

  // create task object
  SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
  if (task == NULL) {
    return terrno;
  }

  task->priority = priority;
  task->priorScore = 0;
  task->execute = execute;
  task->cancel = cancel;
  task->arg = arg;
  task->state = EVA_TASK_STATE_WAITTING;
  task->numWait = 0;
  (void)taosThreadCondInit(&task->waitCond, NULL);

  // schedule task
  (void)taosThreadMutexLock(&async->mutex);

  if (channelID->id == 0) {
    task->channel = NULL;
  } else {
    // resolve the channel; it must still be registered
    SVAChannel channel = {
        .channelId = channelID->id,
    };
    ret = vHashGet(async->channelTable, &channel, (void **)&task->channel);
    TAOS_UNUSED(ret);
    if (task->channel == NULL) {
      (void)taosThreadMutexUnlock(&async->mutex);
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  task->taskId = id = ++async->nextTaskId;

  // add task to hash table
  ret = vHashPut(async->taskTable, task);
  if (ret != 0) {
    (void)taosThreadMutexUnlock(&async->mutex);
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
    return ret;
  }

  async->numTasks++;

  // add task to queue
  if (task->channel == NULL || task->channel->scheduled == NULL) {
    // no channel, or the channel has no scheduled task: this task goes to
    // the global queues (and becomes the channel's scheduled task)
    // add task to async->queue
    if (task->channel) {
      task->channel->scheduled = task;
    }

    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;

    // signal worker or launch new worker
    if (async->numIdleWorkers > 0) {
      (void)taosThreadCondSignal(&(async->hasTask));
    } else if (async->numLaunchWorkers < async->numWorkers) {
      vnodeAsyncLaunchWorker(async);
    }
  } else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
             priority >= VATASK_PIORITY(task->channel->scheduled)) {
    // the scheduled task keeps its slot; the new task waits in the channel
    // add task to task->channel->queue
    task->next = &task->channel->queue[priority];
    task->prev = task->channel->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  } else {
    // the new task outranks the (still waiting) scheduled task: swap them
    // remove task->channel->scheduled from queue
    task->channel->scheduled->prev->next = task->channel->scheduled->next;
    task->channel->scheduled->next->prev = task->channel->scheduled->prev;

    // promote priority and add task->channel->scheduled to task->channel->queue
    task->channel->scheduled->priorScore++;
    int32_t newPriority = VATASK_PIORITY(task->channel->scheduled);
    task->channel->scheduled->next = &task->channel->queue[newPriority];
    task->channel->scheduled->prev = task->channel->queue[newPriority].prev;
    task->channel->scheduled->next->prev = task->channel->scheduled;
    task->channel->scheduled->prev->next = task->channel->scheduled;

    // add task to queue
    task->channel->scheduled = task;
    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  if (taskID != NULL) {
    taskID->async = channelID->async;
    taskID->id = id;
  }

  return 0;
}
612

UNCOV
613
// Block until the task identified by taskID has finished (or return
// immediately if it is unknown/invalid). The last waiter to wake frees the
// task object (completion leaves it alive while numWait > 0, see
// vnodeAsyncTaskDone()).
//
// Fix: the original issued a single taosThreadCondWait(). POSIX condition
// waits may wake spuriously; on a spurious wakeup the original returned
// early and — if it was the only waiter — freed a still-queued task
// (use-after-free). We now wait in a loop whose predicate is "the task is
// still in the task table": vnodeAsyncTaskDone() drops it from the table
// before signalling, and task ids are never reused.
void vnodeAWait(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return;
  }

  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask *task = NULL;
  SVATask  task2 = {
       .taskId = taskID->id,
  };

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->taskTable, &task2, (void **)&task);
  if (task) {
    task->numWait++;
    // wait until the task has been removed from the task table (= finished
    // or cancelled); guards against spurious wakeups
    SVATask *found = task;
    do {
      (void)taosThreadCondWait(&task->waitCond, &async->mutex);
      found = NULL;
      ret = vHashGet(async->taskTable, &task2, (void **)&found);
    } while (found == task);
    task->numWait--;

    // last waiter releases the task object
    if (task->numWait == 0) {
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
    }
  }
  TAOS_UNUSED(ret);

  (void)taosThreadMutexUnlock(&async->mutex);
}
640

UNCOV
641
// Cancel a task that has not started running. On success the task is
// unlinked from its queue, finished via vnodeAsyncTaskDone(), and its
// cancel callback (if any) is invoked after the mutex is released.
// Returns TSDB_CODE_FAILED if the task is already running (it cannot be
// interrupted); otherwise propagates the lookup result / 0.
int32_t vnodeACancel(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret = 0;
  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask *task = NULL;
  SVATask  task2 = {
       .taskId = taskID->id,
  };
  void (*cancel)(void *) = NULL;
  void *arg = NULL;

  (void)taosThreadMutexLock(&async->mutex);

  ret = vHashGet(async->taskTable, &task2, (void **)&task);
  if (task) {
    if (task->state == EVA_TASK_STATE_WAITTING) {
      // capture the callback; it is invoked after the mutex is released
      cancel = task->cancel;
      arg = task->arg;
      // unlink from whichever queue the task is on
      task->next->prev = task->prev;
      task->prev->next = task->next;
      vnodeAsyncTaskDone(async, task);
    } else {
      // already running: cannot cancel
      ret = TSDB_CODE_FAILED;
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  if (cancel) {
    cancel(arg);
  }

  return ret;
}
678

UNCOV
679
// Set the worker-thread limit for executor asyncID (1..VNODE_ASYNC_MAX_WORKERS).
// Idle workers are woken so that excess workers (workerId >= numWorkers)
// can notice the new limit and exit (see vnodeAsyncLoop()).
// Fix: removed an unused local `ret` declaration.
int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
  if (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync *async = GVnodeAsyncs[asyncID].async;
  (void)taosThreadMutexLock(&async->mutex);
  async->numWorkers = numWorkers;
  if (async->numIdleWorkers > 0) {
    (void)taosThreadCondBroadcast(&async->hasTask);
  }
  (void)taosThreadMutexUnlock(&async->mutex);

  return 0;
}
694

695
// Create a new channel on executor asyncID and return its id in *channelID.
// A channel serializes its tasks: at most one of them is handed to the
// workers at any time.
int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
  if (channelID == NULL || asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync *async = GVnodeAsyncs[asyncID].async;

  // create channel object
  SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
  if (channel == NULL) {
    return terrno;
  }
  channel->state = EVA_CHANNEL_STATE_OPEN;
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    // sentinel-initialize the per-priority queues
    channel->queue[i].next = &channel->queue[i];
    channel->queue[i].prev = &channel->queue[i];
  }
  channel->scheduled = NULL;

  // register channel
  (void)taosThreadMutexLock(&async->mutex);

  channel->channelId = channelID->id = ++async->nextChannelId;

  // add to hash table
  int32_t ret = vHashPut(async->channelTable, channel);
  if (ret != 0) {
    // NOTE(review): on this failure path channelID->id was already assigned
    // while channelID->async was not — callers must treat *channelID as
    // invalid whenever an error is returned
    (void)taosThreadMutexUnlock(&async->mutex);
    taosMemoryFree(channel);
    return ret;
  }

  // add to list
  channel->next = &async->chList;
  channel->prev = async->chList.prev;
  channel->next->prev = channel;
  channel->prev->next = channel;

  async->numChannels++;

  (void)taosThreadMutexUnlock(&async->mutex);

  channelID->async = asyncID;
  return 0;
}
740

741
// Destroy a channel: all of its waiting tasks are cancelled (cancel
// callbacks are collected under the mutex and run after unlock). If a
// channel task is currently running, either wait for it (waitRunning) or
// mark the channel CLOSE so vnodeAsyncTaskDone() frees it when the task
// completes. *channelID is cleared on return.
int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || channelID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync    *async = GVnodeAsyncs[channelID->async].async;
  SVAChannel *channel = NULL;
  SVAChannel  channel2 = {
       .channelId = channelID->id,
  };
  SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->channelTable, &channel2, (void **)&channel);
  TAOS_UNUSED(ret);
  if (channel) {
    // unregister channel
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;
    ret = vHashDrop(async->channelTable, channel);
    async->numChannels--;

    // cancel all waiting tasks
    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (channel->queue[i].next != &channel->queue[i]) {
        SVATask *task = channel->queue[i].next;
        // unlink and finish the task; its cancel callback runs after unlock
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          };
        }
        vnodeAsyncTaskDone(async, task);
      }
    }

    // cancel or wait the scheduled task
    if (channel->scheduled == NULL || channel->scheduled->state == EVA_TASK_STATE_WAITTING) {
      if (channel->scheduled) {
        // still waiting in the global queue: unlink and cancel it too
        channel->scheduled->prev->next = channel->scheduled->next;
        channel->scheduled->next->prev = channel->scheduled->prev;
        if (channel->scheduled->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = channel->scheduled->cancel,
                                             .arg = channel->scheduled->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          }
        }
        vnodeAsyncTaskDone(async, channel->scheduled);
      }
      taosMemoryFree(channel);
    } else {
      if (waitRunning) {
        // wait task
        // NOTE(review): single cond-wait — a spurious wakeup would fall
        // through before the task finishes; confirm whether a predicate
        // loop is needed here as well
        SVATask *task = channel->scheduled;
        task->numWait++;
        (void)taosThreadCondWait(&task->waitCond, &async->mutex);
        task->numWait--;
        if (task->numWait == 0) {
          (void)taosThreadCondDestroy(&task->waitCond);
          taosMemoryFree(task);
        }

        taosMemoryFree(channel);
      } else {
        // defer the free to vnodeAsyncTaskDone() of the running task
        channel->state = EVA_CHANNEL_STATE_CLOSE;
      }
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);
  // invoke the collected cancel callbacks outside the lock
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);

  channelID->async = 0;
  channelID->id = 0;
  return 0;
}
831

UNCOV
832
// Map a task type to the human-readable name of its executor.
const char *vnodeGetATaskName(EVATaskT taskType) {
  if (taskType == EVA_TASK_COMMIT) {
    return "vnode-commit";
  }
  if (taskType == EVA_TASK_MERGE) {
    return "vnode-merge";
  }
  if (taskType == EVA_TASK_COMPACT) {
    return "vnode-compact";
  }
  if (taskType == EVA_TASK_RETENTION) {
    return "vnode-retention";
  }
  return "unknown";
}
846

UNCOV
847
// Return true if the task identified by taskID is still known to its
// executor (queued or running, i.e. still present in the task table).
bool vnodeATaskValid(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return false;
  }

  SVAsync *async = GVnodeAsyncs[taskID->async].async;
  SVATask  key = {
       .taskId = taskID->id,
  };
  SVATask *found = NULL;

  // look the task up under the executor mutex
  (void)taosThreadMutexLock(&async->mutex);
  int32_t ret = vHashGet(async->taskTable, &key, (void **)&found);
  (void)taosThreadMutexUnlock(&async->mutex);

  return (ret == 0) && (found != NULL);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc