• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.03
/source/dnode/vnode/src/vnd/vnodeAsync.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17
#include "vnodeHash.h"
18

19
typedef struct SVAsync    SVAsync;
typedef struct SVATask    SVATask;
typedef struct SVAChannel SVAChannel;

#define VNODE_ASYNC_DEFAULT_WORKERS 4
#define VNODE_ASYNC_MAX_WORKERS     256

// priority

// Number of priority levels; assumes EVA_PRIORITY_LOW is the last value of
// the EVAPriority enum (declared outside this file) — TODO confirm.
#define EVA_PRIORITY_MAX (EVA_PRIORITY_LOW + 1)

// worker
typedef enum {
  EVA_WORKER_STATE_UINIT = 0,  // thread not created yet (or already joined)
  EVA_WORKER_STATE_ACTIVE,     // thread launched; scanning queues or running a task
  EVA_WORKER_STATE_IDLE,       // thread blocked on SVAsync::hasTask
  EVA_WORKER_STATE_STOP,       // thread exited its loop; join pending
} EVWorkerState;

typedef struct {
  SVAsync      *async;        // owning async handle
  int32_t       workerId;     // index of this worker in SVAsync::workers
  EVWorkerState state;
  TdThread      thread;
  SVATask      *runningTask;  // task currently executing on this worker, or NULL
} SVWorker;

// task
typedef enum {
  EVA_TASK_STATE_WAITTING = 0,  // queued; not yet picked up by a worker
  EVA_TASK_STATE_RUNNING,
} EVATaskState;

struct SVATask {
  int64_t     taskId;
  EVAPriority priority;    // priority requested at submission time
  int32_t     priorScore;  // aging score, bumped each time the task is skipped
  SVAChannel *channel;     // owning channel, or NULL for channel-less tasks
  int32_t (*execute)(void *);
  void (*cancel)(void *);  // optional; invoked when the task is cancelled
  void        *arg;
  EVATaskState state;

  // wait
  int32_t      numWait;   // number of threads blocked waiting for completion
  TdThreadCond waitCond;  // signalled/broadcast by vnodeAsyncTaskDone()

  // queue (intrusive doubly linked list links)
  struct SVATask *prev;
  struct SVATask *next;
};

// Cancel callback captured while holding the mutex; invoked after unlock so
// user callbacks never run under the lock.
typedef struct {
  void (*cancel)(void *);
  void *arg;
} SVATaskCancelInfo;

// Effective priority (queue index): every 4 points of priorScore promote the
// task one level toward index 0, which is scanned first.
#define VATASK_PIORITY(task_) ((task_)->priority - ((task_)->priorScore / 4))

// async channel
typedef enum {
  EVA_CHANNEL_STATE_OPEN = 0,
  EVA_CHANNEL_STATE_CLOSE,  // destroyed while its scheduled task was running;
                            // the channel is freed later by vnodeAsyncTaskDone()
} EVAChannelState;

struct SVAChannel {
  int64_t         channelId;
  EVAChannelState state;
  SVATask         queue[EVA_PRIORITY_MAX];  // per-priority circular lists (sentinel nodes)
  SVATask        *scheduled;                // this channel's one task in the global queue

  SVAChannel *prev;
  SVAChannel *next;
};

// async handle
struct SVAsync {
  const char *label;  // points just past the struct in the same allocation

  TdThreadMutex mutex;    // guards every mutable field below
  TdThreadCond  hasTask;  // signalled when a task is placed on the global queue
  bool          stop;

  // worker
  int32_t  numWorkers;        // configured worker budget
  int32_t  numLaunchWorkers;  // threads created and not yet stopped
  int32_t  numIdleWorkers;    // threads currently blocked on hasTask
  SVWorker workers[VNODE_ASYNC_MAX_WORKERS];

  // channel
  int64_t      nextChannelId;
  int32_t      numChannels;
  SVAChannel   chList;  // sentinel of the open-channel doubly linked list
  SVHashTable *channelTable;

  // task
  int64_t      nextTaskId;
  int32_t      numTasks;
  SVATask      queue[EVA_PRIORITY_MAX];  // global dispatch queues (sentinel nodes)
  SVHashTable *taskTable;
};

// Global handles: slot 1 = "vnode-commit", slot 2 = "vnode-merge"; slot 0 unused.
SVAsync *vnodeAsyncs[3];
#define MIN_ASYNC_ID 1
#define MAX_ASYNC_ID (sizeof(vnodeAsyncs) / sizeof(vnodeAsyncs[0]) - 1)
124

125
// Finalize a finished (or cancelled) task. Must be called with async->mutex held.
// If the task was its channel's scheduled task, picks the channel's next pending
// task (aging the ones passed over) and moves it onto the global dispatch queue.
// Then drops the task from the task table and either frees it (no waiters) or
// wakes the thread(s) blocked in vnodeAWait()/vnodeAChannelDestroy(), which
// free the task themselves.
static void vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
  int32_t ret;

  if (task->channel != NULL && task->channel->scheduled == task) {
    task->channel->scheduled = NULL;
    if (task->channel->state == EVA_CHANNEL_STATE_CLOSE) {
      // channel was destroyed with waitRunning == false while this task ran;
      // the last task is responsible for freeing the channel object
      taosMemoryFree(task->channel);
    } else {
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *nextTask = task->channel->queue[i].next;
        if (nextTask != &task->channel->queue[i]) {
          if (task->channel->scheduled == NULL) {
            // highest-priority pending task becomes the channel's scheduled task
            task->channel->scheduled = nextTask;
            nextTask->next->prev = nextTask->prev;
            nextTask->prev->next = nextTask->next;
          } else {
            // starvation avoidance: age the head task of each lower queue
            nextTask->priorScore++;
            int32_t newPriority = VATASK_PIORITY(nextTask);
            if (newPriority != i) {
              // remove from current priority queue
              nextTask->prev->next = nextTask->next;
              nextTask->next->prev = nextTask->prev;
              // add to new priority queue
              nextTask->next = &task->channel->queue[newPriority];
              nextTask->prev = task->channel->queue[newPriority].prev;
              nextTask->next->prev = nextTask;
              nextTask->prev->next = nextTask;
            }
          }
        }
      }

      if (task->channel->scheduled != NULL) {
        // publish the newly scheduled task on the global dispatch queue
        int32_t priority = VATASK_PIORITY(task->channel->scheduled);
        task->channel->scheduled->next = &async->queue[priority];
        task->channel->scheduled->prev = async->queue[priority].prev;
        task->channel->scheduled->next->prev = task->channel->scheduled;
        task->channel->scheduled->prev->next = task->channel->scheduled;
      }
    }
  }

  ret = vHashDrop(async->taskTable, task);
  TAOS_UNUSED(ret);
  async->numTasks--;

  if (task->numWait == 0) {
    // nobody is waiting: free the task here
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
  } else if (task->numWait == 1) {
    // single waiter: it destroys the cond and frees the task after waking
    (void)taosThreadCondSignal(&task->waitCond);
  } else {
    (void)taosThreadCondBroadcast(&task->waitCond);
  }
}
180

181
// Cancel every task still waiting on the global dispatch queues.
// Must be called with async->mutex held (during shutdown, async->stop == true).
// Cancel callbacks are only collected into cancelArray here; the caller must
// invoke them after releasing the mutex so user code never runs under the lock.
//
// Fix: the old loop condition hardcoded queue indices 0, 1 and 2, silently
// assuming EVA_PRIORITY_MAX == 3; it now checks all EVA_PRIORITY_MAX queues.
static void vnodeAsyncCancelAllTasks(SVAsync *async, SArray *cancelArray) {
  for (;;) {
    bool drained = true;

    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (async->queue[i].next != &async->queue[i]) {
        drained = false;
        SVATask *task = async->queue[i].next;
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel task into array");
          }
        }
        // may move a channel's next pending task back onto the global queues,
        // which is why we must re-sweep until everything stays empty
        vnodeAsyncTaskDone(async, task);
      }
    }

    if (drained) {
      break;
    }
  }
}
202

203
// Worker thread entry point: repeatedly takes the best task from the global
// queues and executes it outside the mutex. Exits when async->stop is set or
// when this worker's id is no longer within the configured worker budget
// (vnodeAsyncSetWorkers() may shrink it).
static void *vnodeAsyncLoop(void *arg) {
  SVWorker *worker = (SVWorker *)arg;
  SVAsync  *async = worker->async;
  SArray   *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    // NOTE(review): early exit leaves worker->state ACTIVE and
    // numLaunchWorkers undecremented — confirm this is acceptable on OOM
    return NULL;
  }

  setThreadName(async->label);

  for (;;) {
    (void)taosThreadMutexLock(&async->mutex);

    // finish last running task
    if (worker->runningTask != NULL) {
      vnodeAsyncTaskDone(async, worker->runningTask);
      worker->runningTask = NULL;
    }

    for (;;) {
      if (async->stop || worker->workerId >= async->numWorkers) {
        if (async->stop) {  // cancel all tasks
          vnodeAsyncCancelAllTasks(async, cancelArray);
        }
        worker->state = EVA_WORKER_STATE_STOP;
        async->numLaunchWorkers--;
        (void)taosThreadMutexUnlock(&async->mutex);
        goto _exit;
      }

      // scan from highest priority (index 0) down: take the first task found;
      // age the head task of every other non-empty queue so it can be promoted
      for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
        SVATask *task = async->queue[i].next;
        if (task != &async->queue[i]) {
          if (worker->runningTask == NULL) {
            worker->runningTask = task;
            task->prev->next = task->next;
            task->next->prev = task->prev;
          } else {  // promote priority
            task->priorScore++;
            int32_t priority = VATASK_PIORITY(task);
            if (priority != i) {
              // remove from current priority queue
              task->prev->next = task->next;
              task->next->prev = task->prev;
              // add to new priority queue
              task->next = &async->queue[priority];
              task->prev = async->queue[priority].prev;
              task->next->prev = task;
              task->prev->next = task;
            }
          }
        }
      }

      if (worker->runningTask == NULL) {
        // nothing to do: go idle until vnodeAsync()/SetWorkers() signals
        worker->state = EVA_WORKER_STATE_IDLE;
        async->numIdleWorkers++;
        (void)taosThreadCondWait(&async->hasTask, &async->mutex);
        async->numIdleWorkers--;
        worker->state = EVA_WORKER_STATE_ACTIVE;
      } else {
        worker->runningTask->state = EVA_TASK_STATE_RUNNING;
        break;
      }
    }

    (void)taosThreadMutexUnlock(&async->mutex);

    // do run the task (outside the mutex)
    int32_t code = worker->runningTask->execute(worker->runningTask->arg);
    TAOS_UNUSED(code);
  }

_exit:
  // invoke collected cancel callbacks now that the mutex is released
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);
  return NULL;
}
284

285
static uint32_t vnodeAsyncTaskHash(const void *obj) {
111,647✔
286
  SVATask *task = (SVATask *)obj;
111,647✔
287
  return MurmurHash3_32((const char *)(&task->taskId), sizeof(task->taskId));
111,647✔
288
}
289

290
static int32_t vnodeAsyncTaskCompare(const void *obj1, const void *obj2) {
43,921✔
291
  SVATask *task1 = (SVATask *)obj1;
43,921✔
292
  SVATask *task2 = (SVATask *)obj2;
43,921✔
293
  if (task1->taskId < task2->taskId) {
43,921✔
294
    return -1;
42✔
295
  } else if (task1->taskId > task2->taskId) {
43,879✔
296
    return 1;
36✔
297
  }
298
  return 0;
43,843✔
299
}
300

301
static uint32_t vnodeAsyncChannelHash(const void *obj) {
86,868✔
302
  SVAChannel *channel = (SVAChannel *)obj;
86,868✔
303
  return MurmurHash3_32((const char *)(&channel->channelId), sizeof(channel->channelId));
86,868✔
304
}
305

306
static int32_t vnodeAsyncChannelCompare(const void *obj1, const void *obj2) {
71,288✔
307
  SVAChannel *channel1 = (SVAChannel *)obj1;
71,288✔
308
  SVAChannel *channel2 = (SVAChannel *)obj2;
71,288✔
309
  if (channel1->channelId < channel2->channelId) {
71,288✔
310
    return -1;
49✔
311
  } else if (channel1->channelId > channel2->channelId) {
71,239✔
312
    return 1;
129✔
313
  }
314
  return 0;
71,110✔
315
}
316

317
// Allocate and initialize an SVAsync handle. The label string is copied into
// the same allocation, directly after the struct. Returns 0 on success, an
// error code otherwise (in which case *async is reset to NULL).
//
// Fix: the two vHashInit() failure paths duplicated the cleanup sequence; they
// now share a goto cleanup chain (CERT MEM12-C). Additionally, *async is set
// back to NULL on failure instead of leaving a dangling pointer in the caller.
static int32_t vnodeAsyncInit(SVAsync **async, const char *label) {
  int32_t ret;

  if (async == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (label == NULL) {
    label = "anonymous";
  }

  (*async) = (SVAsync *)taosMemoryCalloc(1, sizeof(SVAsync) + strlen(label) + 1);
  if ((*async) == NULL) {
    return terrno;
  }

  // label lives in the trailing bytes of the same allocation
  strcpy((char *)((*async) + 1), label);
  (*async)->label = (const char *)((*async) + 1);

  (void)taosThreadMutexInit(&(*async)->mutex, NULL);
  (void)taosThreadCondInit(&(*async)->hasTask, NULL);
  (*async)->stop = false;

  // worker
  (*async)->numWorkers = VNODE_ASYNC_DEFAULT_WORKERS;
  (*async)->numLaunchWorkers = 0;
  (*async)->numIdleWorkers = 0;
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    (*async)->workers[i].async = (*async);
    (*async)->workers[i].workerId = i;
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
    (*async)->workers[i].runningTask = NULL;
  }

  // channel: empty sentinel list + hash table
  (*async)->nextChannelId = 0;
  (*async)->numChannels = 0;
  (*async)->chList.prev = &(*async)->chList;
  (*async)->chList.next = &(*async)->chList;
  ret = vHashInit(&(*async)->channelTable, vnodeAsyncChannelHash, vnodeAsyncChannelCompare);
  if (ret != 0) {
    goto _exit_base;
  }

  // task: empty per-priority sentinel queues + hash table
  (*async)->nextTaskId = 0;
  (*async)->numTasks = 0;
  for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
    (*async)->queue[i].next = &(*async)->queue[i];
    (*async)->queue[i].prev = &(*async)->queue[i];
  }
  ret = vHashInit(&(*async)->taskTable, vnodeAsyncTaskHash, vnodeAsyncTaskCompare);
  if (ret != 0) {
    goto _exit_channel_table;
  }

  return 0;

_exit_channel_table:
  vHashDestroy(&(*async)->channelTable);
_exit_base:
  (void)taosThreadMutexDestroy(&(*async)->mutex);
  (void)taosThreadCondDestroy(&(*async)->hasTask);
  taosMemoryFree(*async);
  *async = NULL;  // don't leave the caller with a dangling pointer
  return ret;
}
382

383
// Stop and destroy an async handle: flags stop, wakes every idle worker,
// joins all launched worker threads, frees any channels still registered,
// then releases all resources and clears *async.
static int32_t vnodeAsyncDestroy(SVAsync **async) {
  if ((*async) == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // set stop and broadcast so idle workers re-check the stop flag
  (void)taosThreadMutexLock(&(*async)->mutex);
  (*async)->stop = true;
  (void)taosThreadCondBroadcast(&(*async)->hasTask);
  (void)taosThreadMutexUnlock(&(*async)->mutex);

  // join all workers (state is read under the mutex; UINIT means the slot
  // never had a thread created)
  for (int32_t i = 0; i < VNODE_ASYNC_MAX_WORKERS; i++) {
    (void)taosThreadMutexLock(&(*async)->mutex);
    EVWorkerState state = (*async)->workers[i].state;
    (void)taosThreadMutexUnlock(&(*async)->mutex);

    if (state == EVA_WORKER_STATE_UINIT) {
      continue;
    }

    (void)taosThreadJoin((*async)->workers[i].thread, NULL);
    (*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
  }

  // close all channels still linked in the open-channel list
  for (SVAChannel *channel = (*async)->chList.next; channel != &(*async)->chList; channel = (*async)->chList.next) {
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;

    int32_t ret = vHashDrop((*async)->channelTable, channel);
    TAOS_UNUSED(ret);
    (*async)->numChannels--;
    taosMemoryFree(channel);
  }

  (void)taosThreadMutexDestroy(&(*async)->mutex);
  (void)taosThreadCondDestroy(&(*async)->hasTask);

  vHashDestroy(&(*async)->channelTable);
  vHashDestroy(&(*async)->taskTable);
  taosMemoryFree(*async);
  *async = NULL;

  return 0;
}
429

430
// Launch (at most) one additional worker thread, reusing the first slot that
// is not currently active. Must be called with async->mutex held.
//
// Fix: the taosThreadJoin() result was captured in an `int32_t ret` that was
// never read and shadowed the later `ret` — a dead store and -Wunused-variable
// warning; it now uses the file's (void)-cast convention.
static void vnodeAsyncLaunchWorker(SVAsync *async) {
  for (int32_t i = 0; i < async->numWorkers; i++) {
    if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
      continue;
    } else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {
      // reap the finished thread before reusing its slot
      (void)taosThreadJoin(async->workers[i].thread, NULL);
      async->workers[i].state = EVA_WORKER_STATE_UINIT;
    }

    int32_t ret = taosThreadCreate(&async->workers[i].thread, NULL, vnodeAsyncLoop, &async->workers[i]);
    if (ret) {
      vError("failed to create worker thread since %s", tstrerror(ret));
    } else {
      async->workers[i].state = EVA_WORKER_STATE_ACTIVE;
      async->numLaunchWorkers++;
    }
    // start at most one worker per call, whether or not creation succeeded
    break;
  }
}
449

450
// Initialize the global async handles used by vnodes: slot 1 ("vnode-commit")
// and slot 2 ("vnode-merge"), each configured with numOfThreads workers.
// Returns 0 on success or the first failing step's error code.
int32_t vnodeAsyncOpen(int32_t numOfThreads) {
  int32_t code = 0;
  int32_t lino = 0;

  // vnode-commit
  code = vnodeAsyncInit(&vnodeAsyncs[1], "vnode-commit");
  TSDB_CHECK_CODE(code, lino, _exit);

  code = vnodeAsyncSetWorkers(1, numOfThreads);
  TSDB_CHECK_CODE(code, lino, _exit);

  // vnode-merge
  code = vnodeAsyncInit(&vnodeAsyncs[2], "vnode-merge");
  TSDB_CHECK_CODE(code, lino, _exit);

  code = vnodeAsyncSetWorkers(2, numOfThreads);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  // NOTE(review): if the second init fails, vnodeAsyncs[1] stays initialized
  // until vnodeAsyncClose() — confirm callers always pair open/close.
  return code;
}
471

472
void vnodeAsyncClose() {
2,393✔
473
  int32_t ret;
474
  ret = vnodeAsyncDestroy(&vnodeAsyncs[1]);
2,393✔
475
  ret = vnodeAsyncDestroy(&vnodeAsyncs[2]);
2,393✔
476
}
2,393✔
477

478
// Submit a task for asynchronous execution.
// channelID selects the async handle (channelID->async) and, when
// channelID->id != 0, a channel whose tasks are serialized: at most one task
// of a channel ("scheduled") sits on the global dispatch queue at a time.
// On success the new task's id is returned through taskID (when non-NULL) so
// callers can vnodeAWait()/vnodeACancel() it. Returns 0 or an error code.
int32_t vnodeAsync(SVAChannelID *channelID, EVAPriority priority, int32_t (*execute)(void *), void (*cancel)(void *),
                   void *arg, SVATaskID *taskID) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || execute == NULL ||
      channelID->id < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret;
  int64_t  id;
  SVAsync *async = vnodeAsyncs[channelID->async];

  // create task object
  SVATask *task = (SVATask *)taosMemoryCalloc(1, sizeof(SVATask));
  if (task == NULL) {
    return terrno;
  }

  task->priority = priority;
  task->priorScore = 0;
  task->execute = execute;
  task->cancel = cancel;
  task->arg = arg;
  task->state = EVA_TASK_STATE_WAITTING;
  task->numWait = 0;
  (void)taosThreadCondInit(&task->waitCond, NULL);

  // schedule task
  (void)taosThreadMutexLock(&async->mutex);

  if (channelID->id == 0) {
    // channel-less task: goes straight to the global queue
    task->channel = NULL;
  } else {
    // resolve the channel; it must still be registered
    SVAChannel channel = {
        .channelId = channelID->id,
    };
    ret = vHashGet(async->channelTable, &channel, (void **)&task->channel);
    TAOS_UNUSED(ret);
    if (task->channel == NULL) {
      (void)taosThreadMutexUnlock(&async->mutex);
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  task->taskId = id = ++async->nextTaskId;

  // add task to hash table
  ret = vHashPut(async->taskTable, task);
  if (ret != 0) {
    (void)taosThreadMutexUnlock(&async->mutex);
    (void)taosThreadCondDestroy(&task->waitCond);
    taosMemoryFree(task);
    return ret;
  }

  async->numTasks++;

  // add task to queue
  if (task->channel == NULL || task->channel->scheduled == NULL) {
    // channel has no task in flight (or no channel at all):
    // this task goes directly onto the global dispatch queue
    if (task->channel) {
      task->channel->scheduled = task;
    }

    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;

    // signal worker or launch new worker
    if (async->numIdleWorkers > 0) {
      (void)taosThreadCondSignal(&(async->hasTask));
    } else if (async->numLaunchWorkers < async->numWorkers) {
      vnodeAsyncLaunchWorker(async);
    }
  } else if (task->channel->scheduled->state == EVA_TASK_STATE_RUNNING ||
             priority >= VATASK_PIORITY(task->channel->scheduled)) {
    // scheduled task can't be displaced (already running, or at least as
    // urgent): park the new task in the channel's own queue
    task->next = &task->channel->queue[priority];
    task->prev = task->channel->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  } else {
    // the new task is more urgent than the channel's still-waiting scheduled
    // task: swap them. First remove task->channel->scheduled from the global queue
    task->channel->scheduled->prev->next = task->channel->scheduled->next;
    task->channel->scheduled->next->prev = task->channel->scheduled->prev;

    // promote priority and add task->channel->scheduled to task->channel->queue
    task->channel->scheduled->priorScore++;
    int32_t newPriority = VATASK_PIORITY(task->channel->scheduled);
    task->channel->scheduled->next = &task->channel->queue[newPriority];
    task->channel->scheduled->prev = task->channel->queue[newPriority].prev;
    task->channel->scheduled->next->prev = task->channel->scheduled;
    task->channel->scheduled->prev->next = task->channel->scheduled;

    // the new task becomes the channel's scheduled task on the global queue
    task->channel->scheduled = task;
    task->next = &async->queue[priority];
    task->prev = async->queue[priority].prev;
    task->next->prev = task;
    task->prev->next = task;
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  if (taskID != NULL) {
    taskID->async = channelID->async;
    taskID->id = id;
  }

  return 0;
}
591

592
// Block until the task identified by taskID has finished. A no-op for invalid
// ids or tasks that are no longer in the task table (already completed).
// The last waiter to wake is responsible for freeing the task object.
//
// Fix: the vHashGet() result is now explicitly discarded with TAOS_UNUSED(),
// consistent with every other vHashGet/vHashDrop call in this file (the
// bare assignment triggered an unused-variable warning).
void vnodeAWait(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return;
  }

  SVAsync *async = vnodeAsyncs[taskID->async];
  SVATask *task = NULL;
  SVATask  task2 = {
       .taskId = taskID->id,
  };

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->taskTable, &task2, (void **)&task);
  TAOS_UNUSED(ret);
  if (task) {
    task->numWait++;
    // NOTE(review): the condition is waited on exactly once, with no
    // predicate loop; correctness relies on waitCond being signalled only
    // from vnodeAsyncTaskDone(). A spurious wakeup would return early —
    // confirm the platform cond wrapper precludes this.
    (void)taosThreadCondWait(&task->waitCond, &async->mutex);
    task->numWait--;

    // last waiter frees the (already-done) task
    if (task->numWait == 0) {
      (void)taosThreadCondDestroy(&task->waitCond);
      taosMemoryFree(task);
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);
}
619

620
// Cancel a task that has not started running yet.
// Returns 0 on success (or if the task no longer exists), TSDB_CODE_FAILED if
// the task is already running (running tasks cannot be cancelled), and
// TSDB_CODE_INVALID_PARA for invalid ids. The cancel callback, if any, is
// invoked after the mutex is released.
int32_t vnodeACancel(SVATaskID *taskID) {
  if (taskID == NULL || taskID->async < MIN_ASYNC_ID || taskID->async > MAX_ASYNC_ID || taskID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t  ret = 0;
  SVAsync *async = vnodeAsyncs[taskID->async];
  SVATask *task = NULL;
  SVATask  task2 = {
       .taskId = taskID->id,
  };
  void (*cancel)(void *) = NULL;
  void *arg = NULL;

  (void)taosThreadMutexLock(&async->mutex);

  ret = vHashGet(async->taskTable, &task2, (void **)&task);
  if (task) {
    if (task->state == EVA_TASK_STATE_WAITTING) {
      // capture the callback, unlink from its queue, and finalize under lock
      cancel = task->cancel;
      arg = task->arg;
      task->next->prev = task->prev;
      task->prev->next = task->next;
      vnodeAsyncTaskDone(async, task);
    } else {
      ret = TSDB_CODE_FAILED;
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);

  // run user callback outside the mutex
  if (cancel) {
    cancel(arg);
  }

  return ret;
}
657

658
int32_t vnodeAsyncSetWorkers(int64_t asyncID, int32_t numWorkers) {
4,786✔
659
  if (asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID || numWorkers <= 0 || numWorkers > VNODE_ASYNC_MAX_WORKERS) {
4,786!
660
    return TSDB_CODE_INVALID_PARA;
×
661
  }
662
  int32_t  ret;
663
  SVAsync *async = vnodeAsyncs[asyncID];
4,786✔
664
  (void)taosThreadMutexLock(&async->mutex);
4,786✔
665
  async->numWorkers = numWorkers;
4,786✔
666
  if (async->numIdleWorkers > 0) {
4,786!
667
    (void)taosThreadCondBroadcast(&async->hasTask);
×
668
  }
669
  (void)taosThreadMutexUnlock(&async->mutex);
4,786✔
670

671
  return 0;
4,786✔
672
}
673

674
// Open a new channel on the async handle identified by asyncID. The channel's
// id is returned through channelID; tasks submitted with the same channel run
// one at a time, in scheduling order. Returns 0 or an error code.
int32_t vnodeAChannelInit(int64_t asyncID, SVAChannelID *channelID) {
  if (channelID == NULL || asyncID < MIN_ASYNC_ID || asyncID > MAX_ASYNC_ID) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync *async = vnodeAsyncs[asyncID];

  // allocate and initialize the channel object
  SVAChannel *channel = (SVAChannel *)taosMemoryMalloc(sizeof(SVAChannel));
  if (channel == NULL) {
    return terrno;
  }
  channel->state = EVA_CHANNEL_STATE_OPEN;
  channel->scheduled = NULL;
  for (int32_t prio = 0; prio < EVA_PRIORITY_MAX; prio++) {
    // each per-priority queue starts as an empty circular list (sentinel only)
    SVATask *sentinel = &channel->queue[prio];
    sentinel->next = sentinel;
    sentinel->prev = sentinel;
  }

  // register channel
  (void)taosThreadMutexLock(&async->mutex);

  channel->channelId = channelID->id = ++async->nextChannelId;

  // publish in the channel hash table
  int32_t ret = vHashPut(async->channelTable, channel);
  if (ret != 0) {
    (void)taosThreadMutexUnlock(&async->mutex);
    taosMemoryFree(channel);
    return ret;
  }

  // link at the tail of the open-channel list
  channel->next = &async->chList;
  channel->prev = async->chList.prev;
  channel->next->prev = channel;
  channel->prev->next = channel;

  async->numChannels++;

  (void)taosThreadMutexUnlock(&async->mutex);

  channelID->async = asyncID;
  return 0;
}
719

720
// Destroy a channel: unregister it, cancel all of its waiting tasks, and then
// either cancel/wait for its scheduled task or (waitRunning == false with a
// running task) mark the channel CLOSE so the running task frees it later in
// vnodeAsyncTaskDone(). Cancel callbacks run after the mutex is released.
// Resets channelID on return.
int32_t vnodeAChannelDestroy(SVAChannelID *channelID, bool waitRunning) {
  if (channelID == NULL || channelID->async < MIN_ASYNC_ID || channelID->async > MAX_ASYNC_ID || channelID->id <= 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  SVAsync    *async = vnodeAsyncs[channelID->async];
  SVAChannel *channel = NULL;
  SVAChannel  channel2 = {
       .channelId = channelID->id,
  };
  SArray *cancelArray = taosArrayInit(0, sizeof(SVATaskCancelInfo));
  if (cancelArray == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&async->mutex);

  int32_t ret = vHashGet(async->channelTable, &channel2, (void **)&channel);
  TAOS_UNUSED(ret);
  if (channel) {
    // unregister channel: unlink from the open-channel list and hash table
    channel->next->prev = channel->prev;
    channel->prev->next = channel->next;
    ret = vHashDrop(async->channelTable, channel);
    async->numChannels--;

    // cancel all waiting tasks still parked in the channel's own queues
    for (int32_t i = 0; i < EVA_PRIORITY_MAX; i++) {
      while (channel->queue[i].next != &channel->queue[i]) {
        SVATask *task = channel->queue[i].next;
        task->prev->next = task->next;
        task->next->prev = task->prev;
        if (task->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = task->cancel,
                                             .arg = task->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          };
        }
        vnodeAsyncTaskDone(async, task);
      }
    }

    // cancel or wait the scheduled task
    if (channel->scheduled == NULL || channel->scheduled->state == EVA_TASK_STATE_WAITTING) {
      // scheduled task (if any) hasn't started: unlink it from the global
      // queue, collect its cancel callback, and finalize it now
      if (channel->scheduled) {
        channel->scheduled->prev->next = channel->scheduled->next;
        channel->scheduled->next->prev = channel->scheduled->prev;
        if (channel->scheduled->cancel) {
          if (taosArrayPush(cancelArray, &(SVATaskCancelInfo){
                                             .cancel = channel->scheduled->cancel,
                                             .arg = channel->scheduled->arg,
                                         }) == NULL) {
            vError("failed to push cancel info");
          }
        }
        vnodeAsyncTaskDone(async, channel->scheduled);
      }
      taosMemoryFree(channel);
    } else {
      if (waitRunning) {
        // wait task: block until the running task completes; last waiter
        // frees the task object, then we free the channel
        SVATask *task = channel->scheduled;
        task->numWait++;
        (void)taosThreadCondWait(&task->waitCond, &async->mutex);
        task->numWait--;
        if (task->numWait == 0) {
          (void)taosThreadCondDestroy(&task->waitCond);
          taosMemoryFree(task);
        }

        taosMemoryFree(channel);
      } else {
        // don't wait: the running task frees the channel when it finishes
        // (see the EVA_CHANNEL_STATE_CLOSE branch in vnodeAsyncTaskDone)
        channel->state = EVA_CHANNEL_STATE_CLOSE;
      }
    }
  }

  (void)taosThreadMutexUnlock(&async->mutex);
  // run collected cancel callbacks outside the mutex
  for (int32_t i = 0; i < taosArrayGetSize(cancelArray); i++) {
    SVATaskCancelInfo *cancel = (SVATaskCancelInfo *)taosArrayGet(cancelArray, i);
    cancel->cancel(cancel->arg);
  }
  taosArrayDestroy(cancelArray);

  channelID->async = 0;
  channelID->id = 0;
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc