• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4696

29 Aug 2025 06:36AM UTC coverage: 58.25% (+0.2%) from 58.041%
#4696

push

travis-ci

web-flow
fix(gpt): fix race-condition in preparing tmp files (#32800)

133424 of 291873 branches covered (45.71%)

Branch coverage included in aggregate %.

5 of 34 new or added lines in 6 files covered. (14.71%)

444 existing lines in 69 files now uncovered.

201767 of 283561 relevant lines covered (71.15%)

17907122.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.66
/source/util/src/tworker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tworker.h"
18
#include "taoserror.h"
19
#include "tcompare.h"
20
#include "tgeosctx.h"
21
#include "tlog.h"
22
#include "ttrace.h"
23

24
// Queue-latency warning threshold. It is compared against a taosGetTimestampUs() delta in the worker
// loops and is also the divisor used when that delay is logged with an "s" suffix.
#define QUEUE_THRESHOLD (1000 * 1000)
25

26
// Generic thread entry signature; the typed worker entry points in this file are cast to it
// when they are handed to taosThreadCreate().
typedef void *(*ThreadFp)(void *param);
27

28
// Prepare a queue worker pool: open its qset, allocate pool->max worker slots and
// initialise the pool mutex. No thread is started here.
// Returns 0 on success, the taosOpenQset() error code, or terrno when the slot allocation fails.
int32_t tQWorkerInit(SQWorkerPool *pool) {
  int32_t code = taosOpenQset(&pool->qset);
  if (code != 0) {
    return code;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
  if (NULL == pool->workers) {
    // undo the qset before reporting the allocation failure
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  // every slot gets its index and a back-pointer to the owning pool
  SQueueWorker *slot = pool->workers;
  for (int32_t idx = 0; idx < pool->max; ++idx, ++slot) {
    slot->id = idx;
    slot->pool = pool;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}
49

50
// Stop and release a queue worker pool: issue one qset resume per live thread, join every
// live thread, then free the worker slots, close the qset and destroy the mutex.
void tQWorkerCleanup(SQWorkerPool *pool) {
  int32_t idx;

  // pass 1: one resume call on the shared qset for each thread that is still valid
  for (idx = 0; idx < pool->max; ++idx) {
    if (taosCheckPthreadValid(pool->workers[idx].thread)) {
      taosQsetThreadResume(pool->qset);
    }
  }

  // pass 2: join the threads and clear their handles
  for (idx = 0; idx < pool->max; ++idx) {
    SQueueWorker *w = &pool->workers[idx];
    if (!taosCheckPthreadValid(w->thread)) {
      continue;
    }

    uInfo("worker:%s:%d is stopping", pool->name, w->id);
    (void)taosThreadJoin(w->thread, NULL);
    taosThreadClear(&w->thread);
    uInfo("worker:%s:%d is stopped", pool->name, w->id);
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
21,319✔
74

75
// Thread body of a queue worker: read one item at a time from the pool's qset and hand it to
// the callback stored in the queue info. The loop ends when taosReadQitemFromQset() returns 0.
// Always returns NULL.
static void *tQWorkerThreadFp(SQueueWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) != 0) {
    // warn when the item waited in the queue longer than the threshold
    if (qinfo.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - qinfo.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, waited / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
        worker->pid);

  DestoryThreadLocalRegComp();

  return NULL;
}
117

118
// Create a queue, attach it to the pool's qset and, while the pool is below its limits,
// launch worker threads for it.
//   pool    - pool initialised by tQWorkerInit()
//   ahandle - opaque handle passed to taosAddIntoQset() together with the queue
//   fp      - per-item callback installed on the queue
// Returns the new queue, or NULL with terrno set on failure.
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = NULL;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQueueWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      int32_t rc = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // the attribute object is needed only for the create call; release it on every path
      (void)taosThreadAttrDestroy(&thAttr);

      if (rc != 0) {
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  (void)taosThreadMutexUnlock(&pool->mutex);

  // report the allocation only when a queue is actually handed back
  if (queue != NULL) {
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
  }

  return queue;
}
165

166
// Log the release and close a queue previously obtained for this pool.
// The pool is used only for its name in the log line.
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  STaosQueue *closing = queue;

  uInfo("worker:%s, queue:%p is freed", pool->name, closing);
  taosCloseQueue(closing);
}
21,319✔
170

171
// Prepare an auto-sized worker pool: open the qset, create the (initially empty) array of
// worker pointers and initialise the mutex.
// Returns 0 on success; on failure returns the error code (terrno is set to it when the qset
// cannot be opened, and is returned as-is when the array cannot be created).
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  int32_t code = taosOpenQset(&pool->qset);
  if (code != 0) {
    terrno = code;
    return code;
  }

  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
  if (NULL == pool->workers) {
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}
190

191
// Stop and release an auto-sized worker pool: issue one qset resume per live thread, join each
// live thread, free every worker object, then destroy the array, the qset and the mutex.
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  int32_t total = taosArrayGetSize(pool->workers);
  int32_t idx;

  for (idx = 0; idx < total; ++idx) {
    SQueueWorker *w = taosArrayGetP(pool->workers, idx);
    if (taosCheckPthreadValid(w->thread)) {
      taosQsetThreadResume(pool->qset);
    }
  }

  for (idx = 0; idx < total; ++idx) {
    SQueueWorker *w = taosArrayGetP(pool->workers, idx);
    if (taosCheckPthreadValid(w->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, w->id);
      (void)taosThreadJoin(w->thread, NULL);
      taosThreadClear(&w->thread);
      uInfo("worker:%s:%d is stopped", pool->name, w->id);
    }
    // worker objects are individually heap-allocated, so each one is freed here
    taosMemoryFree(w);
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
×
217

218
// Thread body of an auto-pool worker: read one item at a time from the pool's qset and hand it
// to the callback stored in the queue info. The loop ends when taosReadQitemFromQset() returns 0.
// Always returns NULL.
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
  SAutoQWorkerPool *pool = worker->pool;
  SQueueInfo        qinfo = {0};
  void             *msg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) != 0) {
    // warn when the item waited in the queue longer than the threshold
    if (qinfo.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - qinfo.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, waited / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      // the thread count reported to the callback is the current size of the worker array
      qinfo.threadNum = taosArrayGetSize(pool->workers);
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
        worker->pid);

  DestoryThreadLocalRegComp();

  return NULL;
}
259

260
// Create a queue, attach it to the pool's qset and grow the pool until it has
// max(ceil(queueCount * pool->ratio), minNum) workers.
//   pool    - pool initialised by tAutoQWorkerInit()
//   ahandle - opaque handle passed to taosAddIntoQset() together with the queue
//   fp      - per-item callback installed on the queue
//   minNum  - lower bound for the number of workers
// Returns the new queue, or NULL with terrno set on failure. The pool mutex is released on
// every return path. Workers launched before a failure are left running.
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
  STaosQueue *queue = NULL;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);

  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);

  if (dstWorkerNum < minNum) {
    dstWorkerNum = minNum;
  }

  // spawn a thread to process queue
  while (curWorkerNum < dstWorkerNum) {
    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      (void)taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    worker->id = curWorkerNum;
    worker->pool = pool;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    int32_t rc = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker);
    // the attribute object is needed only for the create call; release it on every path
    (void)taosThreadAttrDestroy(&thAttr);

    if (rc != 0) {
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
      // drop the entry that was just pushed for this worker
      (void)taosArrayPop(pool->workers);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      // the lock must not stay held, otherwise the next caller blocks forever
      (void)taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    int32_t numOfThreads = taosArrayGetSize(pool->workers);
    uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);

    curWorkerNum++;
  }

  (void)taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
328

329
// Log the release and close a queue previously obtained for this auto pool.
// The pool is used only for its name in the log line.
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  STaosQueue *closing = queue;

  uInfo("worker:%s, queue:%p is freed", pool->name, closing);
  taosCloseQueue(closing);
}
×
333

334
// Prepare a write worker pool: allocate pool->max worker slots, initialise the mutex and reset
// each slot (no qset, no qall). Threads are not started here.
// Returns 0 on success, or terrno when the slot allocation fails.
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  if (NULL == pool->workers) {
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  SWWorker *slot = pool->workers;
  for (int32_t idx = 0; idx < pool->max; ++idx, ++slot) {
    slot->id = idx;
    slot->qall = NULL;
    slot->qset = NULL;
    slot->pool = pool;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}
354

355
// Stop and release a write worker pool: resume the qset of each live worker, join the live
// threads and free their qall/qset, then free the slots and destroy the mutex.
void tWWorkerCleanup(SWWorkerPool *pool) {
  int32_t idx;

  // pass 1: resume the private qset of every worker that has a valid thread
  for (idx = 0; idx < pool->max; ++idx) {
    SWWorker *w = &pool->workers[idx];
    if (taosCheckPthreadValid(w->thread) && w->qset) {
      taosQsetThreadResume(w->qset);
    }
  }

  // pass 2: join, then release the per-worker resources
  for (idx = 0; idx < pool->max; ++idx) {
    SWWorker *w = &pool->workers[idx];
    if (!taosCheckPthreadValid(w->thread)) {
      continue;
    }

    uInfo("worker:%s:%d is stopping", pool->name, w->id);
    (void)taosThreadJoin(w->thread, NULL);
    taosThreadClear(&w->thread);
    taosFreeQall(w->qall);
    taosCloseQset(w->qset);
    uInfo("worker:%s:%d is stopped", pool->name, w->id);
  }

  taosMemoryFreeClear(pool->workers);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
58,876✔
382

383
// Thread body of a write worker: drain all pending items of the worker's own qset into its qall
// in one call and pass the whole batch to the batch callback. The loop ends when
// taosReadAllQitemsFromQset() reports 0 items. Always returns NULL.
static void *tWWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};
  int32_t       batch = 0;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  for (;;) {
    batch = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
    if (batch == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
            worker->pid);
      break;
    }

    // warn when the batch waited in the queue longer than the threshold
    if (qinfo.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - qinfo.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, waited / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, batch);
    }

    taosUpdateItemSize(qinfo.queue, batch);
  }

  return NULL;
}
424

425
// Create a queue and attach it to the worker slot selected by pool->nextId. If that slot has no
// qset yet, its qset, qall and thread are created first; otherwise the queue is added to the
// slot's existing qset.
//   pool    - pool initialised by tWWorkerInit()
//   ahandle - opaque handle passed to taosAddIntoQset() together with the queue
//   fp      - batch callback installed on the queue
// Returns the new queue, or NULL with terrno set on failure.
//
// On failure only resources created by THIS call are released: the qset/qall of a slot that
// already has a running thread are left untouched, and pointers that are released are reset to
// NULL so a later call sets the slot up again instead of using freed memory.
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  (void)taosThreadMutexLock(&pool->mutex);
  SWWorker   *worker = pool->workers + pool->nextId;
  STaosQueue *queue = NULL;
  // true when this call is responsible for creating the slot's qset/qall/thread
  bool        freshWorker = (worker->qset == NULL);
  int32_t     code = taosOpenQueue(&queue);
  if (code) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (freshWorker) {
    code = taosOpenQset(&worker->qset);
    if (code) goto _OVER;

    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    code = taosAllocateQall(&worker->qall);
    if (code) goto _OVER;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    code = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // the attribute object is needed only for the create call; release it on every path
    (void)taosThreadAttrDestroy(&thAttr);
    if ((code)) goto _OVER;

    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
    pool->nextId = (pool->nextId + 1) % pool->max;

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

_OVER:
  if (code) {
    // roll back while still holding the lock so no other caller sees a half-built slot
    if (queue != NULL) taosCloseQueue(queue);
    if (freshWorker) {
      if (worker->qset != NULL) {
        taosCloseQset(worker->qset);
        worker->qset = NULL;
      }
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
  }

  (void)taosThreadMutexUnlock(&pool->mutex);

  if (code) {
    terrno = code;
    return NULL;
  }

  // wait until the worker thread has published its thread id
  while (worker->pid <= 0) taosMsleep(10);

  taosQueueSetThreadId(queue, worker->pid);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
  return queue;
}
479

480
// Log the release and close a queue previously obtained for this write pool.
// The pool is used only for its name in the log line.
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  STaosQueue *closing = queue;

  uInfo("worker:%s, queue:%p is freed", pool->name, closing);
  taosCloseQueue(closing);
}
80,129✔
484

485
// Build a single-queue worker on top of either a plain queue pool (QWORKER_POOL) or a query-auto
// pool (QUERY_AUTO_QWORKER_POOL), as selected by pCfg->poolType.
//   pWorker - worker to fill in; poolType, name and stopNoWaitQueue are copied from pCfg
//   pCfg    - configuration (name, min/max threads, callback and its parameter)
// Returns 0 on success, TSDB_CODE_INVALID_PARA for an unknown pool type, or an error code.
//
// On failure the pool allocated here is released and pWorker->pool is reset to NULL:
// tSingleWorkerCleanup() returns early when pWorker->queue is NULL, so it would never free it.
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  int32_t code = 0;
  pWorker->poolType = pCfg->poolType;
  pWorker->name = pCfg->name;
  pWorker->stopNoWaitQueue = pCfg->stopNoWaitQueue;

  switch (pCfg->poolType) {
    case QWORKER_POOL: {
      SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pWorker->pool = pPool;
      if ((code = tQWorkerInit(pPool))) {
        // tQWorkerInit() undoes its own partial work; only the pool struct is left to free
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return (terrno = code);
      }

      pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (pWorker->queue == NULL) {
        code = terrno;  // save before the cleanup calls can overwrite it
        tQWorkerCleanup(pPool);
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return (terrno = code);
      }
    } break;
    case QUERY_AUTO_QWORKER_POOL: {
      SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pPool->stopNoWaitQueue = pCfg->stopNoWaitQueue;
      pWorker->pool = pPool;

      code = tQueryAutoQWorkerInit(pPool);
      if (code) {
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return code;
      }

      pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (!pWorker->queue) {
        code = terrno;  // save before the cleanup calls can overwrite it
        // NOTE(review): assumes tQueryAutoQWorkerCleanup() tolerates a pool whose queue
        // allocation failed - confirm against its implementation.
        tQueryAutoQWorkerCleanup(pPool);
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return (terrno = code);
      }
    } break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
534

535
// Tear down a worker built by tSingleWorkerInit(). Does nothing when pWorker->queue is NULL.
// Unless stopNoWaitQueue is set, polls every 10 ms until the queue is empty, then stops the
// pool, closes the queue and frees the pool. For the two known pool types the pool and queue
// pointers are reset to NULL afterwards, so a repeated call is a no-op rather than a double free.
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (pWorker->queue == NULL) return;
  if (!pWorker->stopNoWaitQueue) {
    while (!taosQueueEmpty(pWorker->queue)) {
      taosMsleep(10);
    }
  }

  switch (pWorker->poolType) {
    case QWORKER_POOL:
      tQWorkerCleanup(pWorker->pool);
      tQWorkerFreeQueue(pWorker->pool, pWorker->queue);
      taosMemoryFree(pWorker->pool);
      pWorker->pool = NULL;
      pWorker->queue = NULL;
      break;
    case QUERY_AUTO_QWORKER_POOL:
      tQueryAutoQWorkerCleanup(pWorker->pool);
      tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue);
      taosMemoryFree(pWorker->pool);
      pWorker->pool = NULL;
      pWorker->queue = NULL;
      break;
    default:
      // unknown pool type: nothing was released, so the pointers are left as they are
      break;
  }
}
558

559
// Build a multi-item worker: initialise the embedded write pool from pCfg and allocate its queue.
//   pWorker - worker to fill in (pool is embedded; queue and name are set on success)
//   pCfg    - configuration (name, max threads, batch callback and its parameter)
// Returns 0 on success, the tWWorkerInit() error, or terrno when the queue cannot be allocated.
//
// When the queue allocation fails, the pool set up by tWWorkerInit() is torn down here:
// tMultiWorkerCleanup() returns early when pWorker->queue is NULL, so it would never release it.
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
  pPool->max = pCfg->max;

  int32_t code = tWWorkerInit(pPool);
  if (code) return code;

  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    code = terrno;  // save before the cleanup calls can overwrite it
    tWWorkerCleanup(pPool);
    return (terrno = code);
  }

  pWorker->name = pCfg->name;
  return 0;
}
575

576
// Tear down a worker built by tMultiWorkerInit(). Does nothing when pWorker->queue is NULL.
// Polls every 10 ms until the queue is empty, stops the embedded pool and closes the queue.
// The queue pointer is reset to NULL afterwards, so a repeated call is a no-op.
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}
586

587
// Forward declarations for the query-auto worker pool helpers (this prototype and the ones that follow).
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
588
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
589
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
590
static void    tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
591
static bool    tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
592

593
// The pool's 64-bit activeRunningN word packs two 32-bit counters:
// the "active" count in the upper half and the "running" count in the lower half.
// The argument and the whole expansion are parenthesised so any expression can be passed safely.
#define GET_ACTIVE_N(int64_val)  ((int32_t)((int64_val) >> 32))
#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))
595

596
// Atomically subtract `val` from the active (upper 32-bit) half of *ptr.
// Returns the active count taken from the value atomic_fetch_sub_64() hands back.
static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
  int64_t delta = val;
  delta <<= 32;  // move the amount into the active half
  return GET_ACTIVE_N(atomic_fetch_sub_64(ptr, delta));
}
602

603
// Atomically subtract `val` from *ptr (the running count lives in the lower 32-bit half).
// Returns the running count taken from the value atomic_fetch_sub_64() hands back.
static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_sub_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
17,478,799✔
604

605
// Atomically add `val` to the active (upper 32-bit) half of *ptr.
// Returns the active count taken from the value atomic_fetch_add_64() hands back.
static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
  int64_t delta = val;
  delta <<= 32;  // move the amount into the active half
  return GET_ACTIVE_N(atomic_fetch_add_64(ptr, delta));
}
611

612
// Atomically add `val` to *ptr (the running count lives in the lower 32-bit half).
// Returns the running count taken from the value atomic_fetch_add_64() hands back.
static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_add_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
×
613

614
// Try to change only the active (upper) half of *ptr from *expectedVal to newVal, keeping the
// running half at the value read from *ptr at the start of the call.
// Returns true when the swap happened. Otherwise stores the active count observed by the
// compare-exchange in *expectedVal and returns false (this also happens when only the running
// half changed in between).
//
// The halves are packed through uint32_t so that a negative 32-bit value cannot sign-extend
// over the other half, and so that no negative value is left-shifted.
static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  uint64_t runningBits = (uint32_t)GET_RUNNING_N(*ptr);
  int64_t  oldVal64 = (int64_t)(((uint64_t)(uint32_t)*expectedVal << 32) | runningBits);
  int64_t  newVal64 = (int64_t)(((uint64_t)(uint32_t)newVal << 32) | runningBits);

  int64_t actualVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
  if (actualVal64 == oldVal64) {
    return true;
  } else {
    *expectedVal = GET_ACTIVE_N(actualVal64);
    return false;
  }
}
629

630
// Try to change only the running (lower) half of *ptr from *expectedVal to newVal, keeping the
// active half at the value read from *ptr at the start of the call.
// Returns non-zero (true) when the swap happened. Otherwise stores the running count observed
// by the compare-exchange in *expectedVal and returns 0 (false).
//
// The halves are packed through uint32_t so that a negative 32-bit value cannot sign-extend
// over the other half, and so that no negative value is left-shifted.
static int64_t atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  uint64_t activeBits = (uint64_t)(uint32_t)GET_ACTIVE_N(*ptr) << 32;
  int64_t  oldVal64 = (int64_t)(activeBits | (uint32_t)*expectedVal);
  int64_t  newVal64 = (int64_t)(activeBits | (uint32_t)newVal);

  int64_t actualVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
  if (actualVal64 == oldVal64) {
    return true;
  } else {
    *expectedVal = GET_RUNNING_N(actualVal64);
    return false;
  }
}
644

645
// Try to change both halves of *ptr at once: (active, running) from
// (*expectedActive, *expectedRunning) to (newActive, newRunning).
// Returns non-zero (true) when the swap happened. Otherwise stores the observed active and
// running counts in *expectedActive / *expectedRunning and returns 0 (false), so callers can
// retry with fresh values.
//
// The halves are packed through uint32_t so the packed word always round-trips with
// GET_ACTIVE_N/GET_RUNNING_N: a negative running value can no longer sign-extend over the
// active half (which would make the comparison fail on every retry), and no negative value is
// left-shifted.
static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
                                                     int32_t *expectedRunning, int32_t newRunning) {
  int64_t oldVal64 = (int64_t)(((uint64_t)(uint32_t)*expectedActive << 32) | (uint32_t)*expectedRunning);
  int64_t newVal64 = (int64_t)(((uint64_t)(uint32_t)newActive << 32) | (uint32_t)newRunning);

  int64_t actualVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
  if (actualVal64 == oldVal64) {
    return true;
  } else {
    *expectedActive = GET_ACTIVE_N(actualVal64);
    *expectedRunning = GET_RUNNING_N(actualVal64);
    return false;
  }
}
661

662
// Thread body of a query-auto pool worker. Each iteration reads one item from the pool's qset,
// passes tQueryAutoQWorkerWaitingCheck(), runs the item callback, and then asks
// tQueryAutoQWorkerTryRecycleWorker() whether this thread should keep going.
// The loop ends when no item is returned, when pool->exit is set, or when the recycle call
// returns false. Always returns NULL.
static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
  SQueryAutoQWorkerPool *pool = worker->pool;
  SQueueInfo             qinfo = {0};
  void                  *msg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  for (;;) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    // the pool is shutting down: leave without processing the item that was just read
    if (pool->exit) {
      uInfo("worker:%s:%d exit, thread:%08" PRId64, pool->name, worker->id, worker->pid);
      break;
    }

    // warn when the item waited in the queue longer than the threshold
    if (qinfo.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - qinfo.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, waited / QUEUE_THRESHOLD);
      }
    }

    tQueryAutoQWorkerWaitingCheck(pool);

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      qinfo.workerCb = pool->pCb;
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);

    bool keepRunning = tQueryAutoQWorkerTryRecycleWorker(pool, worker);
    if (!keepRunning) {
      uDebug("worker:%s:%d exited", pool->name, worker->id);
      break;
    }
  }

  DestoryThreadLocalRegComp();

  return NULL;
}
716

717
static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
22,790,184✔
718
  SQueryAutoQWorkerPool *pPool = p;
22,790,184✔
719
  bool                   ret = false;
22,790,184✔
720
  int32_t                waiting = pPool->waitingAfterBlockN;
22,790,184✔
721
  while (waiting > 0) {
22,790,184✔
722
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
7✔
723
    if (waitingNew == waiting) {
7!
724
      (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
7✔
725
      (void)taosThreadCondSignal(&pPool->waitingAfterBlockCond);
7✔
726
      (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
7✔
UNCOV
727
      ret = true;
×
UNCOV
728
      break;
×
729
    }
730
    waiting = waitingNew;
×
731
  }
732
  return ret;
22,789,968✔
733
}
734

735
static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
22,790,577✔
736
  SQueryAutoQWorkerPool *pPool = p;
22,790,577✔
737
  bool                   ret = false;
22,790,577✔
738
  int32_t                waiting = pPool->waitingBeforeProcessMsgN;
22,790,577✔
739
  while (waiting > 0) {
22,790,577✔
740
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
9✔
741
    if (waitingNew == waiting) {
9!
742
      (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
9✔
743
      (void)taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
9✔
744
      (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
9✔
UNCOV
745
      ret = true;
×
UNCOV
746
      break;
×
747
    }
748
    waiting = waitingNew;
×
749
  }
750
  return ret;
22,790,425✔
751
}
752

753
static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
22,790,735✔
754
  SQueryAutoQWorkerPool *pPool = p;
22,790,735✔
755
  bool                   ret = false;
22,790,735✔
756
  int64_t                val64 = pPool->activeRunningN;
22,790,735✔
757
  int32_t                active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
22,790,735✔
758
  while (active > minActive) {
22,794,508✔
759
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
5,314,360✔
760
      return true;
5,310,697✔
761
  }
762
  return false;
17,480,148✔
763
}
764

765
// Gate called by a worker before it processes an item.
// While the running count is below pPool->num, try to claim a running slot (running + 1) and
// return as soon as that succeeds. Otherwise try to give up one active slot (active - 1); once
// that succeeds, bump waitingBeforeProcessMsgN and, unless the pool is exiting, wait once on
// waitingBeforeProcessMsgCond before returning.
static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
  for (;;) {
    int64_t snapshot = pPool->activeRunningN;
    int32_t running = GET_RUNNING_N(snapshot);
    int32_t active = GET_ACTIVE_N(snapshot);

    // a failed exchange refreshes active/running, so the inner condition sees current values
    while (running < pPool->num) {
      if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
        return;
      }
    }

    if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
      break;
    }
  }

  // to wait for process
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
  (void)atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
  if (!pPool->exit) {
    (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
  }
  // recovered from waiting
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
}
786

787
/*
 * Decide what happens to pWorker after it finished a unit of work.
 *
 * If one of the three "try" helpers succeeds (a waiter was signalled, or the
 * active count could be decremented), the worker is taken off pPool->workers:
 *   - when its id is >= pPool->maxInUse it is moved to exitedWorkers and false
 *     is returned; surplus entries already on exitedWorkers are joined and
 *     freed first;
 *   - otherwise it is parked on backupWorkers and blocks on backupCond; after
 *     wake-up it moves itself back to pPool->workers and true is returned.
 * If none of the helpers succeeds, only the running counter is decremented and
 * true is returned.
 *
 * Returns false whenever pPool->exit is observed under poolLock.
 *
 * NOTE(review): the wait on backupCond is not guarded by a predicate loop, so a
 * spurious wake-up would be treated like a real one - confirm this is intended.
 */
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
  if (!(tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
        tQueryAutoQWorkerTryDecActive(pPool, pPool->num))) {
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
    return true;
  }

  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->exit) {
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    return false;
  }

  SListNode *pNode = listNode(pWorker);
  (void)tdListPopNode(pPool->workers, pNode);

  // reclaim some workers
  if (pWorker->id >= pPool->maxInUse) {
    while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
      SListNode *pExitedNode = tdListPopHead(pPool->exitedWorkers);
      if (pExitedNode == NULL) {
        // nothing left to reclaim; never dereference a missing node
        break;
      }
      // distinct name: must not shadow the pWorker parameter used after the loop
      SQueryAutoQWorker *pExitedWorker = (SQueryAutoQWorker *)pExitedNode->data;
      if (pExitedWorker && taosCheckPthreadValid(pExitedWorker->thread)) {
        (void)taosThreadJoin(pExitedWorker->thread, NULL);
        taosThreadClear(&pExitedWorker->thread);
      }
      taosMemoryFree(pExitedNode);
    }
    tdListAppendNode(pPool->exitedWorkers, pNode);
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    return false;
  }

  // put back to backup pool
  tdListAppendNode(pPool->backupWorkers, pNode);
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // start to wait at backup cond
  (void)taosThreadMutexLock(&pPool->backupLock);
  (void)atomic_fetch_add_32(&pPool->backupNum, 1);
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
  (void)taosThreadMutexUnlock(&pPool->backupLock);

  // recovered from backup
  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->exit) {
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    return false;
  }
  (void)tdListPopNode(pPool->backupWorkers, pNode);
  tdListAppendNode(pPool->workers, pNode);
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  return true;
}
840

841
/*
 * Initialise an auto-scaling query worker pool: clears the exit flag, creates
 * the four mutexes and three condition variables, opens the queue set, creates
 * the workers/backupWorkers/exitedWorkers lists, derives maxInUse from max, and
 * allocates the blocking callbacks unless pool->pCb is already set.
 *
 * Returns TSDB_CODE_SUCCESS, the taosOpenQset error (also stored in terrno), or
 * terrno when a list or the callback block cannot be allocated. Nothing is
 * rolled back on failure.
 */
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
  pool->exit = false;

  // locks
  (void)taosThreadMutexInit(&pool->poolLock, NULL);
  (void)taosThreadMutexInit(&pool->backupLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);

  // condition variables
  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
  (void)taosThreadCondInit(&pool->backupCond, NULL);

  int32_t code = taosOpenQset(&pool->qset);
  if (code != 0) {
    terrno = code;
    return code;
  }

  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->workers == NULL) {
    return terrno;
  }

  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->backupWorkers == NULL) {
    return terrno;
  }

  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->exitedWorkers == NULL) {
    return terrno;
  }

  pool->maxInUse = pool->max * 2 + 2;

  if (pool->pCb == NULL) {
    pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
    if (pool->pCb == NULL) {
      return terrno;
    }
    pool->pCb->pPool = pool;
    pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
    pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;
  }

  return TSDB_CODE_SUCCESS;
}
874

875
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
7,887✔
876
  (void)taosThreadMutexLock(&pPool->poolLock);
7,887✔
877
  if (pPool->stopNoWaitQueue) {
7,887✔
878
    pPool->exit = true;
211✔
879
  }
880
  int32_t size = 0;
7,887✔
881
  if (pPool->workers) {
7,887✔
882
    size = listNEles(pPool->workers);
7,886✔
883
  }
884
  if (pPool->backupWorkers) {
7,887✔
885
    size += listNEles(pPool->backupWorkers);
7,886✔
886
  }
887
  if (pPool->qset) {
7,887✔
888
    for (int32_t i = 0; i < size; ++i) {
479,595✔
889
      taosQsetThreadResume(pPool->qset);
471,709✔
890
    }
891
  }
892
  (void)taosThreadMutexUnlock(&pPool->poolLock);
7,887✔
893

894
  (void)taosThreadMutexLock(&pPool->backupLock);
7,887✔
895
  (void)taosThreadCondBroadcast(&pPool->backupCond);
7,887✔
896
  (void)taosThreadMutexUnlock(&pPool->backupLock);
7,887✔
897

898
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
7,887✔
899
  (void)taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
7,887✔
900
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
7,887✔
901

902
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
7,887✔
903
  (void)taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
7,887✔
904
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
7,887✔
905

906
  int32_t            idx = 0;
7,887✔
907
  SQueryAutoQWorker *worker = NULL;
7,887✔
908
  while (pPool->workers) {
479,454✔
909
    (void)taosThreadMutexLock(&pPool->poolLock);
479,453✔
910
    if (listNEles(pPool->workers) == 0) {
479,453✔
911
      (void)taosThreadMutexUnlock(&pPool->poolLock);
7,886✔
912
      break;
7,886✔
913
    }
914
    SListNode *pNode = tdListPopHead(pPool->workers);
471,567✔
915
    uDebug("0free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));
471,567✔
916
    worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
471,567!
917
    (void)taosThreadMutexUnlock(&pPool->poolLock);
471,567✔
918
    if (worker && taosCheckPthreadValid(worker->thread)) {
471,567!
919
      (void)taosThreadJoin(worker->thread, NULL);
471,567✔
920
      taosThreadClear(&worker->thread);
471,567✔
921
    }
922
    uDebug("free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));
471,567✔
923

924
    taosMemoryFree(pNode);
471,567!
925
  }
926

927
  while (pPool->backupWorkers) {
8,029✔
928
    (void)taosThreadMutexLock(&pPool->poolLock);
8,028✔
929
    if (listNEles(pPool->backupWorkers) == 0) {
8,028✔
930
      (void)taosThreadMutexUnlock(&pPool->poolLock);
7,886✔
931
      break;
7,886✔
932
    }
933
    uDebug("backupworker head:%p, prev:%p, next:%p", TD_DLIST_HEAD(pPool->backupWorkers), TD_DLIST_NODE_PREV(TD_DLIST_HEAD(pPool->backupWorkers)), TD_DLIST_NODE_NEXT(TD_DLIST_HEAD(pPool->backupWorkers)));
142!
934
    SListNode *pNode = tdListPopHead(pPool->backupWorkers);
142✔
935
    worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
142!
936
    (void)taosThreadMutexUnlock(&pPool->poolLock);
142✔
937

938
    if (worker && taosCheckPthreadValid(worker->thread)) {
142!
939
      (void)taosThreadJoin(worker->thread, NULL);
142✔
940
      taosThreadClear(&worker->thread);
142✔
941
    }
942
    taosMemoryFree(pNode);
142!
943
  }
944

945
  while (pPool->exitedWorkers) {
7,887✔
946
    (void)taosThreadMutexLock(&pPool->poolLock);
7,886✔
947
    if (listNEles(pPool->exitedWorkers) == 0) {
7,886!
948
      (void)taosThreadMutexUnlock(&pPool->poolLock);
7,886✔
949
      break;
7,886✔
950
    }
951

952
    SListNode *pNode = tdListPopHead(pPool->exitedWorkers);
×
953
    worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
×
954
    (void)taosThreadMutexUnlock(&pPool->poolLock);
×
955

956
    if (worker && taosCheckPthreadValid(worker->thread)) {
×
957
      (void)taosThreadJoin(worker->thread, NULL);
×
958
      taosThreadClear(&worker->thread);
×
959
    }
960
    taosMemoryFree(pNode);
×
961
  }
962

963
  (void)taosThreadMutexLock(&pPool->poolLock);
7,887✔
964
  pPool->workers = tdListFree(pPool->workers);
7,887✔
965
  pPool->backupWorkers = tdListFree(pPool->backupWorkers);
7,887✔
966
  pPool->exitedWorkers = tdListFree(pPool->exitedWorkers);
7,887✔
967
  taosMemoryFree(pPool->pCb);
7,887!
968
  (void)taosThreadMutexUnlock(&pPool->poolLock);
7,887✔
969

970
  (void)taosThreadMutexDestroy(&pPool->poolLock);
7,887✔
971
  (void)taosThreadMutexDestroy(&pPool->backupLock);
7,887✔
972
  (void)taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
7,887✔
973
  (void)taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
7,887✔
974

975
  (void)taosThreadCondDestroy(&pPool->backupCond);
7,887✔
976
  (void)taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
7,887✔
977
  (void)taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
7,887✔
978
  taosCloseQset(pPool->qset);
7,887✔
979
}
7,887✔
980

981
/*
 * Open a new queue, register it (with ahandle) in the pool's queue set and, if
 * the pool holds fewer than pool->max workers, launch worker threads until
 * pool->num reaches pool->min (at least one thread is launched in that case).
 *
 * pool    - the worker pool; poolLock is held while the pool is modified
 * ahandle - opaque handle stored with the queue in the queue set
 * fp      - item callback installed on the queue
 *
 * Returns the queue, or NULL on failure. terrno is set explicitly when opening
 * the queue, adding it to the queue set, or adding the worker node fails; for a
 * thread-creation failure terrno is left as taosThreadCreate left it.
 */
STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = NULL;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->poolLock);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    queue = NULL;
    (void)taosThreadMutexUnlock(&pool->poolLock);
    // report the reason, matching the taosOpenQueue failure path above
    terrno = code;
    return NULL;
  }
  SQueryAutoQWorker  worker = {0};
  SQueryAutoQWorker *pWorker = NULL;

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      worker.id = listNEles(pool->workers);
      worker.backupIdx = -1;
      worker.pool = pool;
      SListNode *pNode = tdListAdd(pool->workers, &worker);
      if (!pNode) {
        taosCloseQueue(queue);
        queue = NULL;
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }
      pWorker = (SQueryAutoQWorker *)pNode->data;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
        // release the attribute object on the failure path too, keeping terrno as the create call left it
        int32_t createErr = terrno;
        (void)taosThreadAttrDestroy(&thAttr);
        terrno = createErr;
        taosCloseQueue(queue);
        queue = NULL;
        break;
      }

      (void)taosThreadAttrDestroy(&thAttr);
      pool->num++;
      (void)atomicFetchAddActive(&pool->activeRunningN, 1);
      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
    } while (pool->num < pool->min);
  }

  (void)taosThreadMutexUnlock(&pool->poolLock);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
1038

1039
/* Close pQ via taosCloseQueue; pPool is not consulted. */
void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) {
  (void)pPool;
  taosCloseQueue(pQ);
}
15,698✔
1040

1041
/*
 * Make one more worker available to the pool.
 *
 * First tries to claim a parked worker: backupNum is decremented with a
 * compare-exchange and, on success, backupCond is signalled. Only when no
 * backup worker is counted is a new worker node appended to pool->workers and a
 * joinable thread started for it.
 *
 * Returns TSDB_CODE_SUCCESS (also when the pool is already exiting), terrno if
 * the node cannot be added, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno)
 * if the thread cannot be created.
 */
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
  // try backup pool
  int32_t backup = pool->backupNum;
  while (backup > 0) {
    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
    if (backupNew == backup) {
      // Signal under backupLock: the parking worker increments backupNum while holding this lock and
      // only releases it inside the cond wait, so taking the lock here guarantees it is already waiting
      // and the wake-up cannot be lost.
      (void)taosThreadMutexLock(&pool->backupLock);
      (void)taosThreadCondSignal(&pool->backupCond);
      (void)taosThreadMutexUnlock(&pool->backupLock);
      return TSDB_CODE_SUCCESS;
    }
    backup = backupNew;
  }

  // backup pool is empty, create new
  SQueryAutoQWorker *pWorker = NULL;
  SQueryAutoQWorker  worker = {0};
  worker.pool = pool;
  worker.backupIdx = -1;
  (void)taosThreadMutexLock(&pool->poolLock);
  if (pool->exit) {
    (void)taosThreadMutexUnlock(&pool->poolLock);
    return TSDB_CODE_SUCCESS;
  }
  worker.id = listNEles(pool->workers);
  SListNode *pNode = tdListAdd(pool->workers, &worker);
  if (!pNode) {
    (void)taosThreadMutexUnlock(&pool->poolLock);
    return terrno;
  }
  (void)taosThreadMutexUnlock(&pool->poolLock);
  pWorker = (SQueryAutoQWorker *)pNode->data;

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
    // the attribute object must be released on the failure path as well
    (void)taosThreadAttrDestroy(&thAttr);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }
  (void)taosThreadAttrDestroy(&thAttr);

  return TSDB_CODE_SUCCESS;
}
1083

1084
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
5,308,525✔
1085
  SQueryAutoQWorkerPool *pPool = p;
5,308,525✔
1086
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) ||
10,618,945!
1087
      tQueryAutoQWorkerTryDecActive(p, pPool->num)) {
5,309,816✔
1088
  } else {
1089
    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
2,172,192✔
1090
    if (code != TSDB_CODE_SUCCESS) {
2,172,237!
1091
      return code;
×
1092
    }
1093
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
2,172,237✔
1094
  }
1095

1096
  return TSDB_CODE_SUCCESS;
5,310,693✔
1097
}
1098

1099
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
5,310,555✔
1100
  SQueryAutoQWorkerPool *pPool = p;
5,310,555✔
1101
  int64_t                val64 = pPool->activeRunningN;
5,310,555✔
1102
  int32_t                running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
5,310,555✔
1103
  while (running < pPool->num) {
5,313,804!
1104
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
5,314,054✔
1105
      return TSDB_CODE_SUCCESS;
5,310,701✔
1106
    }
1107
  }
1108
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
×
1109
  (void)atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
7✔
1110
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
7!
1111
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
7✔
1112
  if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
7!
1113
  return TSDB_CODE_SUCCESS;
7✔
1114
}
1115

1116
/*
 * Prepare a dispatch worker pool: resets num to 0, allocates a zeroed array of
 * pPool->max SDispatchWorker entries and initialises poolLock.
 *
 * Returns 0 on success, or terrno when the array cannot be allocated (the mutex
 * is not initialised in that case).
 */
int32_t tDispatchWorkerInit(SDispatchWorkerPool *pPool) {
  pPool->num = 0;

  pPool->pWorkers = taosMemCalloc(pPool->max, sizeof(SDispatchWorker));
  if (pPool->pWorkers == NULL) {
    return terrno;
  }

  (void)taosThreadMutexInit(&pPool->poolLock, NULL);
  return 0;
}
1124

1125
/*
 * Thread entry point of a dispatch worker.
 *
 * Blocks SIGPIPE, names the thread after the pool, records its thread id, then
 * loops reading items from the worker's own queue set. A read returning 0 ends
 * the loop. For each item, a warning is logged when it sat in the queue longer
 * than QUEUE_THRESHOLD, and the item's callback (if any) is invoked with the
 * worker id and the pool's current thread count filled in.
 *
 * Always returns NULL.
 */
static void *tDispatchWorkerThreadFp(SDispatchWorker *pWorker) {
  SDispatchWorkerPool *pPool = pWorker->pool;
  SQueueInfo           info = {0};
  void                *pMsg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pPool->name, pWorker->id);
  }

  setThreadName(pPool->name);
  pWorker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%d", pPool->name, pWorker->id, pWorker->pid);

  for (;;) {
    if (taosReadQitemFromQset(pWorker->qset, (void **)&pMsg, &info) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%d", pPool->name, pWorker->id,
            pWorker->qset, pWorker->pid);
      break;
    }

    // a zero timestamp means there is no queueing time to check
    if (info.timestamp != 0) {
      int64_t queuedUs = taosGetTimestampUs() - info.timestamp;
      if (queuedUs > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pPool->name, queuedUs / QUEUE_THRESHOLD);
      }
    }

    if (info.fp != NULL) {
      FItem handler = (FItem)info.fp;
      info.workerId = pWorker->id;
      info.threadNum = pPool->num;
      handler(&info, pMsg);
    }
  }

  DestoryThreadLocalRegComp();
  return NULL;
}
1163

1164
/*
 * Fill the dispatch pool up to pPool->max workers. Each new worker gets its own
 * queue set and queue (with fp/ahandle installed) and a joinable thread running
 * tDispatchWorkerThreadFp. dispatchFp is stored on the pool.
 *
 * pPool->num is incremented before a worker's resources are created, so a
 * worker whose setup failed is still counted and may have NULL queue/qset.
 *
 * Returns 0 on success, otherwise the first error encountered; workers created
 * before the failure are left in place.
 */
int32_t tDispatchWorkerAllocQueue(SDispatchWorkerPool *pPool, void *ahandle, FItem fp, DispatchFp dispatchFp) {
  int32_t code = 0;
  SDispatchWorker* pWorker = NULL;
  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->dispatchFp = dispatchFp;
  for (int32_t i = pPool->num; i < pPool->max; ++i) {
    pWorker = pPool->pWorkers + i;
    pWorker->id = pPool->num;
    pWorker->pool = pPool;
    pPool->num++;
    code = taosOpenQset(&pWorker->qset);
    if (code != 0) break;
    code = taosOpenQueue(&pWorker->queue);
    if (code != 0) break;
    taosSetQueueFp(pWorker->queue, fp, ahandle);
    code = taosAddIntoQset(pWorker->qset, pWorker->queue, ahandle);
    if (code != 0) break;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tDispatchWorkerThreadFp, pWorker) != 0) {
      code = terrno;
      (void)taosThreadAttrDestroy(&thAttr);
      break;
    }
    (void)taosThreadAttrDestroy(&thAttr);
    uInfo("worker:%s:%d is launched, threadId:%" PRId64 ", total:%d", pPool->name, pWorker->id, taosGetPthreadId(pWorker->thread), pPool->num);
  }

  (void)taosThreadMutexUnlock(&pPool->poolLock);
  // pWorker stays NULL when the pool was already full (num >= max) and the loop never ran
  if (code == 0 && pWorker != NULL) {
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pPool->name, pWorker->queue, ahandle);
  }
  return code;
}
1199

1200
/*
 * Close the queue and the queue set of every counted worker in the pool and
 * reset both pointers to NULL. Runs under poolLock; a pool without a worker
 * array is left untouched.
 */
static void tDispatchWorkerFreeQueue(SDispatchWorkerPool *pPool) {
  (void)taosThreadMutexLock(&pPool->poolLock);
  // no early return here: the lock must be released on every path
  if (pPool->pWorkers) {
    for (int32_t i = 0; i < pPool->num; ++i) {
      SDispatchWorker *pWorker = pPool->pWorkers + i;
      if (pWorker->queue) {
        taosCloseQueue(pWorker->queue);
        pWorker->queue = NULL;
      }
      if (pWorker->qset) {
        taosCloseQset(pWorker->qset);
        pWorker->qset = NULL;
      }
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);
}
1216

1217
/*
 * Shut down a dispatch worker pool: sets exit and resumes every worker's queue
 * set under poolLock, joins every valid worker thread outside the lock, closes
 * the queues and queue sets, frees the worker array and destroys poolLock.
 */
void tDispatchWorkerCleanup(SDispatchWorkerPool *pPool) {
  // phase 1: flag shutdown and resume each worker's queue set
  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->exit = true;
  if (pPool->pWorkers != NULL) {
    for (int32_t w = 0; w < pPool->num; ++w) {
      SDispatchWorker *pCur = &pPool->pWorkers[w];
      if (pCur->qset == NULL) {
        continue;
      }
      taosQsetThreadResume(pCur->qset);
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // phase 2: wait for the threads without holding the pool lock
  if (pPool->pWorkers != NULL) {
    for (int32_t w = 0; w < pPool->num; ++w) {
      SDispatchWorker *pCur = &pPool->pWorkers[w];
      if (!taosCheckPthreadValid(pCur->thread)) {
        continue;
      }
      (void)taosThreadJoin(pCur->thread, NULL);
      taosThreadClear(&pCur->thread);
    }
  }

  // phase 3: release queues, the worker array and the lock itself
  tDispatchWorkerFreeQueue(pPool);
  taosMemoryFreeClear(pPool->pWorkers);
  (void)taosThreadMutexDestroy(&pPool->poolLock);
}
4,569✔
1243

1244
/*
 * Route pMsg to one worker of the dispatch pool.
 *
 * Under poolLock, pPool->dispatchFp picks a worker index for the message and
 * the message is written to that worker's queue.
 *
 * Returns 0 on success, the error from dispatchFp or taosWriteQitem, or
 * TSDB_CODE_INTERNAL_ERROR when the pool has no dispatch function or worker
 * array, the chosen index is outside [0, pPool->num), or the chosen worker has
 * no queue.
 */
int32_t tAddTaskIntoDispatchWorkerPool(SDispatchWorkerPool *pPool, void *pMsg) {
  int32_t code = 0;
  int32_t idx = 0;
  SDispatchWorker *pWorker = NULL;
  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->dispatchFp == NULL || pPool->pWorkers == NULL) {
    // pool has no queue allocated yet, or was already cleaned up
    code = TSDB_CODE_INTERNAL_ERROR;
  } else {
    code = pPool->dispatchFp(pPool, pMsg, &idx);
  }
  if (code == 0) {
    if (idx < 0 || idx >= pPool->num) {
      // never index past the workers that were actually set up
      code = TSDB_CODE_INTERNAL_ERROR;
    } else {
      pWorker = pPool->pWorkers + idx;
      if (pWorker->queue) {
        code = taosWriteQitem(pWorker->queue, pMsg);
      } else {
        code = TSDB_CODE_INTERNAL_ERROR;
      }
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);
  if (code != 0) {
    uError("worker:%s, failed to add task into dispatch worker pool, code:%d", pPool->name, code);
  } else {
    // code == 0 implies pWorker was assigned above
    uDebug("msg %p dispatch to the %dth worker, threadId:%" PRId64, pMsg, idx, taosGetPthreadId(pWorker->thread));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc