• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4674

18 Aug 2025 07:58AM UTC coverage: 59.821% (+0.1%) from 59.715%
#4674

push

travis-ci

web-flow
test: update case desc (#32551)

136937 of 292075 branches covered (46.88%)

Branch coverage included in aggregate %.

207916 of 284395 relevant lines covered (73.11%)

4553289.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.53
/source/util/src/tworker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tworker.h"
18
#include "taoserror.h"
19
#include "tcompare.h"
20
#include "tgeosctx.h"
21
#include "tlog.h"
22
#include "ttrace.h"
23

24
#define QUEUE_THRESHOLD (1000 * 1000)
25

26
typedef void *(*ThreadFp)(void *param);
27

28
// Prepare a fixed-size queue worker pool: open its queue set, allocate
// pool->max zero-filled worker slots, initialise the pool mutex and stamp each
// slot with its index and owning pool. No thread is started here.
//
// Returns 0 on success, the taosOpenQset() error code if the queue set cannot
// be opened, or terrno if the worker array cannot be allocated (the queue set
// is closed again before returning in that case).
int32_t tQWorkerInit(SQWorkerPool *pool) {
  int32_t rc = taosOpenQset(&pool->qset);
  if (rc != 0) {
    return rc;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
  if (!pool->workers) {
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  int32_t idx = 0;
  while (idx < pool->max) {
    SQueueWorker *slot = pool->workers + idx;
    slot->pool = pool;
    slot->id = idx;
    ++idx;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}
49

50
// Tear down a pool prepared by tQWorkerInit(). The pool struct itself is not
// freed here.
//
// Pass 1 calls taosQsetThreadResume() once for every slot holding a valid
// thread; pass 2 joins and clears each of those threads. Afterwards the worker
// array is freed, the queue set is closed and the pool mutex is destroyed.
void tQWorkerCleanup(SQWorkerPool *pool) {
  int32_t idx;

  for (idx = 0; idx < pool->max; ++idx) {
    SQueueWorker *slot = pool->workers + idx;
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    taosQsetThreadResume(pool->qset);
  }

  for (idx = 0; idx < pool->max; ++idx) {
    SQueueWorker *slot = pool->workers + idx;
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    uInfo("worker:%s:%d is stopping", pool->name, slot->id);
    (void)taosThreadJoin(slot->thread, NULL);
    taosThreadClear(&slot->thread);
    uInfo("worker:%s:%d is stopped", pool->name, slot->id);
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
24,448✔
74

75
// Thread entry point for SQWorkerPool workers (passed to taosThreadCreate() by
// tQWorkerAllocQueue() after a cast to ThreadFp).
//
// Records the thread id in worker->pid, then loops: read one item from the
// pool's queue set, warn if it waited longer than QUEUE_THRESHOLD
// microseconds, invoke the queue's FItem callback (when set) and report one
// processed item via taosUpdateItemSize(). The loop ends when
// taosReadQitemFromQset() returns 0. Always returns NULL.
static void *tQWorkerThreadFp(SQueueWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;

  int32_t ret = taosBlockSIGPIPE();
  if (ret < 0) {
    // Not fatal: the worker keeps running without SIGPIPE blocked.
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      // cost is in microseconds; dividing by QUEUE_THRESHOLD (1e6) logs seconds.
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
      if (cost > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  DestoryThreadLocalRegComp();

  return NULL;
}
117

118
// Create a queue served by `pool`, register it in the pool's queue set with
// `ahandle`, and make sure worker threads exist for it.
//
// When pool->num < pool->max at least one new worker thread is started, and
// more are started until pool->num reaches pool->min. All of this happens
// under pool->mutex.
//
// Returns the new queue, or NULL with terrno set when the queue cannot be
// opened, cannot be added to the queue set, or a thread cannot be created
// (TSDB_CODE_OUT_OF_MEMORY in the last case).
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  int32_t     code;
  STaosQueue *queue;

  code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQueueWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      int32_t rc = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // The attribute object is only needed for the create call; release it on
      // both the success and the failure path.
      (void)taosThreadAttrDestroy(&thAttr);

      if (rc != 0) {
        // A failed create may leave the thread handle in an unspecified state;
        // reset it so tQWorkerCleanup() does not treat the slot as a started
        // thread. NOTE(review): assumes taosThreadClear() resets the handle to
        // the "invalid" value, as its use after taosThreadJoin() suggests.
        taosThreadClear(&worker->thread);
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  (void)taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
165

166
// Log the release of `queue` (tagged with the pool name) and close it.
// `pool` is only used for the log message.
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
24,448✔
170

171
// Prepare an auto-sized worker pool: open its queue set, create the (initially
// empty) array of SQueueWorker pointers and initialise the pool mutex.
//
// Returns 0 on success. On failure returns the taosOpenQset() code (also
// stored in terrno), or terrno when the array cannot be created (the queue set
// is closed again first).
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  int32_t rc = taosOpenQset(&pool->qset);
  if (rc != 0) {
    return terrno = rc;
  }

  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
  if (!pool->workers) {
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}
190

191
// Tear down a pool prepared by tAutoQWorkerInit(). The pool struct itself is
// not freed here.
//
// Pass 1 calls taosQsetThreadResume() once per worker holding a valid thread.
// Pass 2 joins and clears those threads and frees every worker entry, whether
// or not its thread was valid. Finally the worker array is destroyed, the
// queue set closed and the pool mutex destroyed.
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  const int32_t total = taosArrayGetSize(pool->workers);
  int32_t       idx;

  for (idx = 0; idx < total; ++idx) {
    SQueueWorker *entry = taosArrayGetP(pool->workers, idx);
    if (!taosCheckPthreadValid(entry->thread)) {
      continue;
    }
    taosQsetThreadResume(pool->qset);
  }

  for (idx = 0; idx < total; ++idx) {
    SQueueWorker *entry = taosArrayGetP(pool->workers, idx);
    if (taosCheckPthreadValid(entry->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, entry->id);
      (void)taosThreadJoin(entry->thread, NULL);
      taosThreadClear(&entry->thread);
      uInfo("worker:%s:%d is stopped", pool->name, entry->id);
    }
    taosMemoryFree(entry);
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
×
217

218
// Thread entry point for SAutoQWorkerPool workers (passed to
// taosThreadCreate() by tAutoQWorkerAllocQueue() after a cast to ThreadFp).
//
// Records the thread id in worker->pid, then loops: read one item from the
// pool's queue set, warn if it waited longer than QUEUE_THRESHOLD
// microseconds, invoke the queue's FItem callback (when set) and report one
// processed item via taosUpdateItemSize(). The loop ends when
// taosReadQitemFromQset() returns 0. Always returns NULL.
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
  SAutoQWorkerPool *pool = worker->pool;
  SQueueInfo        qinfo = {0};
  void             *msg = NULL;

  int32_t ret = taosBlockSIGPIPE();
  if (ret < 0) {
    // Not fatal: the worker keeps running without SIGPIPE blocked.
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      // cost is in microseconds; dividing by QUEUE_THRESHOLD (1e6) logs seconds.
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
      if (cost > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      // The auto pool has no counter field; the current array size is reported.
      qinfo.threadNum = taosArrayGetSize(pool->workers);
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }
  DestoryThreadLocalRegComp();

  return NULL;
}
259

260
// Create a queue served by `pool`, register it in the pool's queue set with
// `ahandle`, and grow the worker set to match the number of queues.
//
// The target worker count is ceilf(queueCount * pool->ratio), but never less
// than `minNum`; missing workers are allocated, appended to pool->workers and
// started. All of this happens under pool->mutex, which is released on every
// return path.
//
// Returns the new queue, or NULL with terrno set when the queue cannot be
// opened, cannot be added to the queue set, or a worker cannot be
// allocated/started (TSDB_CODE_OUT_OF_MEMORY in the latter cases). Workers
// started before a failure keep running.
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
  int32_t     code;
  STaosQueue *queue;

  code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);

  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);

  if (dstWorkerNum < minNum) {
    dstWorkerNum = minNum;
  }

  // spawn a thread to process queue
  while (curWorkerNum < dstWorkerNum) {
    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      (void)taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    worker->id = curWorkerNum;
    worker->pool = pool;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    int32_t rc = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker);
    // The attribute object is only needed for the create call; release it on
    // both the success and the failure path.
    (void)taosThreadAttrDestroy(&thAttr);

    if (rc != 0) {
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
      // Undo the push above so the array never holds a freed worker.
      (void)taosArrayPop(pool->workers);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      // The mutex must be released here too; returning with it held would
      // block every later allocation on this pool.
      (void)taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    int32_t numOfThreads = taosArrayGetSize(pool->workers);
    uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);

    curWorkerNum++;
  }

  (void)taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
328

329
// Log the release of `queue` (tagged with the pool name) and close it.
// `pool` is only used for the log message.
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
×
333

334
// Prepare a write-worker pool: reset the round-robin cursor (pool->nextId),
// allocate pool->max zero-filled worker slots, initialise the pool mutex and
// give each slot its index, its owning pool and NULL qset/qall pointers.
// No queue set or thread is created here.
//
// Returns 0 on success, or terrno when the worker array cannot be allocated.
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  if (!pool->workers) {
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  int32_t idx = 0;
  while (idx < pool->max) {
    SWWorker *slot = pool->workers + idx;
    slot->id = idx;
    slot->qall = NULL;
    slot->qset = NULL;
    slot->pool = pool;
    ++idx;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}
354

355
// Tear down a pool prepared by tWWorkerInit(). The pool struct itself is not
// freed here.
//
// Pass 1 calls taosQsetThreadResume() on the queue set of every slot that has
// both a valid thread and a queue set. Pass 2 joins and clears each valid
// thread and releases that slot's qall and qset. Afterwards the worker array
// is freed and the pool mutex destroyed.
void tWWorkerCleanup(SWWorkerPool *pool) {
  int32_t idx;

  for (idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = pool->workers + idx;
    if (taosCheckPthreadValid(slot->thread) && slot->qset) {
      taosQsetThreadResume(slot->qset);
    }
  }

  for (idx = 0; idx < pool->max; ++idx) {
    SWWorker *slot = pool->workers + idx;
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    uInfo("worker:%s:%d is stopping", pool->name, slot->id);
    (void)taosThreadJoin(slot->thread, NULL);
    taosThreadClear(&slot->thread);
    taosFreeQall(slot->qall);
    taosCloseQset(slot->qset);
    uInfo("worker:%s:%d is stopped", pool->name, slot->id);
  }

  taosMemoryFreeClear(pool->workers);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
57,898✔
382

383
// Thread entry point for SWWorkerPool workers (passed to taosThreadCreate() by
// tWWorkerAllocQueue() after a cast to ThreadFp).
//
// Records the thread id in worker->pid, then loops: drain all pending items of
// one queue from the worker's own queue set into worker->qall, warn if the
// batch waited longer than QUEUE_THRESHOLD microseconds, invoke the queue's
// FItems callback (when set) with the batch, and report the batch size via
// taosUpdateItemSize(). The loop ends when taosReadAllQitemsFromQset()
// returns 0. Always returns NULL.
static void *tWWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};
  int32_t       numOfMsgs = 0;

  int32_t ret = taosBlockSIGPIPE();
  if (ret < 0) {
    // Not fatal: the worker keeps running without SIGPIPE blocked.
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
    if (numOfMsgs == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
            worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      // cost is in microseconds; dividing by QUEUE_THRESHOLD (1e6) logs seconds.
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
      if (cost > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
    }
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
  }

  return NULL;
}
424

425
// Create a queue and attach it to the worker slot selected by pool->nextId
// (round-robin over pool->max slots).
//
// If the slot has no queue set yet, one is opened, the queue is added, a qall
// buffer is allocated and the slot's thread is started. Otherwise the queue is
// simply added to the slot's existing queue set. After a successful
// allocation the function waits (polling every 10 ms) until the worker thread
// has published its id in worker->pid and records that id on the queue.
//
// Returns the new queue, or NULL with terrno set to the failing call's code.
// On failure only resources created by THIS call are released: the queue set
// of an already-running worker is left untouched, and released qset/qall
// pointers are reset to NULL so the slot can be retried safely.
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  (void)taosThreadMutexLock(&pool->mutex);
  SWWorker   *worker = pool->workers + pool->nextId;
  int32_t     code = -1;
  STaosQueue *queue = NULL;  // stays NULL if taosOpenQueue() fails before assigning it
  bool        newWorker = (worker->qset == NULL);

  code = taosOpenQueue(&queue);
  if (code) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (newWorker) {
    code = taosOpenQset(&worker->qset);
    if (code) goto _OVER;

    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    code = taosAllocateQall(&worker->qall);
    if (code) goto _OVER;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    code = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // The attribute object is only needed for the create call; release it on
    // both the success and the failure path.
    (void)taosThreadAttrDestroy(&thAttr);
    if (code) {
      // A failed create may leave the handle unspecified; reset it so
      // tWWorkerCleanup() does not treat the slot as a started thread.
      // NOTE(review): assumes taosThreadClear() resets the handle to the
      // "invalid" value, as its use after taosThreadJoin() suggests.
      taosThreadClear(&worker->thread);
      goto _OVER;
    }

    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
    pool->nextId = (pool->nextId + 1) % pool->max;

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

_OVER:
  if (code) {
    // Roll back while still holding the mutex: worker->qset / worker->qall are
    // pool state that other allocators inspect under the same lock.
    if (queue != NULL) taosCloseQueue(queue);
    if (newWorker) {
      if (worker->qset != NULL) {
        taosCloseQset(worker->qset);
        worker->qset = NULL;
      }
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
  }
  (void)taosThreadMutexUnlock(&pool->mutex);

  if (code) {
    terrno = code;
    return NULL;
  }

  // The worker thread sets worker->pid as one of its first actions.
  while (worker->pid <= 0) taosMsleep(10);

  taosQueueSetThreadId(queue, worker->pid);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
  return queue;
}
479

480
// Log the release of `queue` (tagged with the pool name) and close it.
// `pool` is only used for the log message.
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
77,577✔
484

485
// Build a single-queue worker described by `pCfg`: allocate the pool type
// selected by pCfg->poolType, initialise it, and allocate one queue on it that
// dispatches to pCfg->fp with pCfg->param as handle.
//
// Returns 0 on success; terrno / the failing call's code on allocation or
// initialisation failure; TSDB_CODE_INVALID_PARA for an unknown pool type.
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  int32_t code;
  pWorker->poolType = pCfg->poolType;
  pWorker->name = pCfg->name;
  pWorker->stopNoWaitQueue = pCfg->stopNoWaitQueue;

  switch (pCfg->poolType) {
    case QWORKER_POOL: {
      SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pWorker->pool = pPool;
      if ((code = tQWorkerInit(pPool))) {
        // tQWorkerInit() releases whatever it acquired before failing, so only
        // the pool struct itself is left to free. Clear the back pointer so
        // nothing can reach the freed pool.
        pWorker->pool = NULL;
        taosMemoryFree(pPool);
        return (terrno = code);
      }

      pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (pWorker->queue == NULL) {
        // NOTE(review): the initialised pool is not torn down here and
        // tSingleWorkerCleanup() returns early when queue is NULL, so it stays
        // allocated. Worker threads may already be running at this point;
        // confirm a safe teardown order before releasing it.
        return terrno;
      }
    } break;
    case QUERY_AUTO_QWORKER_POOL: {
      SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pPool->stopNoWaitQueue = pCfg->stopNoWaitQueue;
      pWorker->pool = pPool;

      // NOTE(review): on the two failure returns below the pool struct is not
      // released; whether tQueryAutoQWorkerInit() cleans up after itself needs
      // to be confirmed before adding a free here.
      code = tQueryAutoQWorkerInit(pPool);
      if (code) return code;

      pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (!pWorker->queue) {
        return terrno;
      }
    } break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
534

535
// Shut down a worker built by tSingleWorkerInit(). Does nothing when no queue
// was allocated.
//
// Unless stopNoWaitQueue is set, first polls every 10 ms until the queue is
// empty. Then, depending on the pool type, cleans up the pool, frees the queue
// and frees the pool struct. Unknown pool types are ignored.
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (!pWorker->queue) {
    return;
  }

  if (!pWorker->stopNoWaitQueue) {
    for (; !taosQueueEmpty(pWorker->queue);) {
      taosMsleep(10);
    }
  }

  if (pWorker->poolType == QWORKER_POOL) {
    tQWorkerCleanup(pWorker->pool);
    tQWorkerFreeQueue(pWorker->pool, pWorker->queue);
    taosMemoryFree(pWorker->pool);
  } else if (pWorker->poolType == QUERY_AUTO_QWORKER_POOL) {
    tQueryAutoQWorkerCleanup(pWorker->pool);
    tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue);
    taosMemoryFree(pWorker->pool);
  }
}
558

559
// Build a multi-item worker described by `pCfg`: initialise the embedded
// SWWorkerPool and allocate one queue on it that dispatches batches to
// pCfg->fp with pCfg->param as handle.
//
// Returns 0 on success, the tWWorkerInit() code, or the terrno left by a
// failed tWWorkerAllocQueue().
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
  pPool->max = pCfg->max;

  int32_t code = tWWorkerInit(pPool);
  if (code) return code;

  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    // tMultiWorkerCleanup() returns early when queue is NULL, so what
    // tWWorkerInit() set up must be released here. Thread creation is the last
    // fallible step of the allocation, so a failure means no worker thread is
    // running and there is nothing to join.
    code = terrno;
    taosMemoryFreeClear(pPool->workers);
    (void)taosThreadMutexDestroy(&pPool->mutex);
    return (terrno = code);
  }

  pWorker->name = pCfg->name;
  return 0;
}
575

576
// Shut down a worker built by tMultiWorkerInit(). Does nothing when no queue
// was allocated. Polls every 10 ms until the queue is empty, then cleans up
// the embedded pool and frees the queue.
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (!pWorker->queue) {
    return;
  }

  for (; !taosQueueEmpty(pWorker->queue);) {
    taosMsleep(10);
  }

  SWWorkerPool *pPool = &pWorker->pool;
  tWWorkerCleanup(pPool);
  tWWorkerFreeQueue(pPool, pWorker->queue);
}
586

587
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
588
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
589
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
590
static void    tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
591
static bool    tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
592

593
// A 64-bit counter word packs two 32-bit counts: "active" in the upper half
// and "running" in the lower half. Arguments and whole expansions are
// parenthesised so the macros are safe with any expression.
#define GET_ACTIVE_N(int64_val)  ((int32_t)((int64_val) >> 32))
#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))
595

596
// Atomically subtract `val` from the "active" count (upper 32 bits) of *ptr.
// Returns the active count extracted from whatever atomic_fetch_sub_64()
// returns (presumably the value before the subtraction - confirm against the
// atomic wrapper).
static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
  int64_t shiftedDelta = val;
  shiftedDelta <<= 32;

  int64_t fetched = atomic_fetch_sub_64(ptr, shiftedDelta);
  return GET_ACTIVE_N(fetched);
}
602

603
// Atomically subtract `val` from the "running" count (lower 32 bits) of *ptr.
// Returns the running count extracted from whatever atomic_fetch_sub_64()
// returns (presumably the value before the subtraction - confirm).
static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_sub_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
2,483,023✔
604

605
// Atomically add `val` to the "active" count (upper 32 bits) of *ptr.
// Returns the active count extracted from whatever atomic_fetch_add_64()
// returns (presumably the value before the addition - confirm against the
// atomic wrapper).
static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
  int64_t shiftedDelta = val;
  shiftedDelta <<= 32;

  int64_t fetched = atomic_fetch_add_64(ptr, shiftedDelta);
  return GET_ACTIVE_N(fetched);
}
611

612
// Atomically add `val` to the "running" count (lower 32 bits) of *ptr.
// Returns the running count extracted from whatever atomic_fetch_add_64()
// returns (presumably the value before the addition - confirm).
static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_add_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
×
613

614
// Try to change only the "active" count (upper 32 bits) of *ptr from
// *expectedVal to newVal, keeping the "running" count that is read from *ptr
// at the start of the call.
//
// Returns true when the compare-exchange saw exactly the expected word. On
// failure returns false and stores the active count of the observed word in
// *expectedVal so the caller can retry.
static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  int64_t expected64 = *expectedVal;
  int64_t desired64 = newVal;
  int32_t runningNow = GET_RUNNING_N(*ptr);

  expected64 = (expected64 << 32) | runningNow;
  desired64 = (desired64 << 32) | runningNow;

  int64_t observed = atomic_val_compare_exchange_64(ptr, expected64, desired64);
  if (observed != expected64) {
    *expectedVal = GET_ACTIVE_N(observed);
    return false;
  }
  return true;
}
629

630
// Try to change only the "running" count (lower 32 bits) of *ptr from
// *expectedVal to newVal, keeping the "active" count that is read from *ptr
// at the start of the call.
//
// Returns true (as int64_t) when the compare-exchange saw exactly the expected
// word. On failure returns false and stores the running count of the observed
// word in *expectedVal so the caller can retry.
static int64_t atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  int64_t expected64 = *expectedVal;
  int64_t desired64 = newVal;

  int64_t activeBits = GET_ACTIVE_N(*ptr);
  activeBits <<= 32;

  expected64 |= activeBits;
  desired64 |= activeBits;

  int64_t observed = atomic_val_compare_exchange_64(ptr, expected64, desired64);
  if (observed != expected64) {
    *expectedVal = GET_RUNNING_N(observed);
    return false;
  }
  return true;
}
644

645
// Try to replace both counts packed in *ptr in one compare-exchange:
// (active, running) goes from (*expectedActive, *expectedRunning) to
// (newActive, newRunning).
//
// Returns true (as int64_t) when the exchange saw exactly the expected word.
// On failure returns false and stores the observed active/running counts in
// *expectedActive / *expectedRunning so the caller can retry.
static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
                                                     int32_t *expectedRunning, int32_t newRunning) {
  int64_t expected64 = *expectedActive;
  expected64 <<= 32;
  expected64 |= *expectedRunning;

  int64_t desired64 = newActive;
  desired64 <<= 32;
  desired64 |= newRunning;

  int64_t observed = atomic_val_compare_exchange_64(ptr, expected64, desired64);
  if (observed != expected64) {
    *expectedActive = GET_ACTIVE_N(observed);
    *expectedRunning = GET_RUNNING_N(observed);
    return false;
  }
  return true;
}
661

662
// Thread entry point for SQueryAutoQWorkerPool workers.
//
// Records the thread id in worker->pid, then loops:
//   1. read one item from the pool's queue set (exit when that returns 0);
//   2. exit if pool->exit is set;
//   3. warn if the item waited longer than QUEUE_THRESHOLD microseconds;
//   4. tQueryAutoQWorkerWaitingCheck() - may block until this worker is
//      allowed to run;
//   5. invoke the queue's FItem callback (when set), exposing pool->pCb via
//      qinfo.workerCb;
//   6. report one processed item and ask
//      tQueryAutoQWorkerTryRecycleWorker() whether to keep going; a false
//      result ends the loop.
// Always returns NULL.
static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
  SQueryAutoQWorkerPool *pool = worker->pool;
  SQueueInfo             qinfo = {0};
  void                  *msg = NULL;

  int32_t ret = taosBlockSIGPIPE();
  if (ret < 0) {
    // Not fatal: the worker keeps running without SIGPIPE blocked.
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  while (1) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (pool->exit) {
      uInfo("worker:%s:%d exit, thread:%08" PRId64, pool->name, worker->id, worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      // cost is in microseconds; dividing by QUEUE_THRESHOLD (1e6) logs seconds.
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
      if (cost > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
      }
    }

    tQueryAutoQWorkerWaitingCheck(pool);

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      qinfo.workerCb = pool->pCb;
      (*((FItem)qinfo.fp))(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
    if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) {
      uDebug("worker:%s:%d exited", pool->name, worker->id);
      break;
    }
  }

  DestoryThreadLocalRegComp();

  return NULL;
}
716

717
static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
3,401,636✔
718
  SQueryAutoQWorkerPool *pPool = p;
3,401,636✔
719
  bool                   ret = false;
3,401,636✔
720
  int32_t                waiting = pPool->waitingAfterBlockN;
3,401,636✔
721
  while (waiting > 0) {
3,401,636✔
722
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
12✔
723
    if (waitingNew == waiting) {
12!
724
      (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
12✔
725
      (void)taosThreadCondSignal(&pPool->waitingAfterBlockCond);
12✔
726
      (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
12✔
727
      ret = true;
7✔
728
      break;
7✔
729
    }
730
    waiting = waitingNew;
×
731
  }
732
  return ret;
3,401,631✔
733
}
734

735
static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
3,401,629✔
736
  SQueryAutoQWorkerPool *pPool = p;
3,401,629✔
737
  bool                   ret = false;
3,401,629✔
738
  int32_t                waiting = pPool->waitingBeforeProcessMsgN;
3,401,629✔
739
  while (waiting > 0) {
3,401,629✔
740
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
10✔
741
    if (waitingNew == waiting) {
10!
742
      (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
10✔
743
      (void)taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
10✔
744
      (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
10✔
745
      ret = true;
12✔
746
      break;
12✔
747
    }
748
    waiting = waitingNew;
×
749
  }
750
  return ret;
3,401,631✔
751
}
752

753
static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
3,401,616✔
754
  SQueryAutoQWorkerPool *pPool = p;
3,401,616✔
755
  bool                   ret = false;
3,401,616✔
756
  int64_t                val64 = pPool->activeRunningN;
3,401,616✔
757
  int32_t                active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
3,401,616✔
758
  while (active > minActive) {
3,401,655✔
759
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
918,608✔
760
      return true;
918,570✔
761
  }
762
  return false;
2,483,047✔
763
}
764

765
// Gate a worker before it processes a message.
//
// Fast path: while the running count is below pPool->num, try to increment it
// by one (active count unchanged) and return as soon as that succeeds.
// Otherwise try to decrement the active count by one; when that succeeds the
// worker registers itself in waitingBeforeProcessMsgN and, unless pPool->exit
// is set, waits once on waitingBeforeProcessMsgCond before returning.
static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
  for (;;) {
    int64_t snapshot = pPool->activeRunningN;
    int32_t running = GET_RUNNING_N(snapshot);
    int32_t active = GET_ACTIVE_N(snapshot);

    // A failed exchange refreshes `active` / `running`, so this loop either
    // claims a running slot or observes that none is left.
    while (running < pPool->num) {
      if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
        return;
      }
    }

    if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
      break;
    }
    // Lost the race on the active count: start over from a fresh snapshot.
  }

  // to wait for process
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
  (void)atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
  if (!pPool->exit) {
    (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
  }
  // recovered from waiting
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
}
786

787
// Decide what happens to pWorker after it finished a message.
//
// If a waiting worker could be signalled, or the active count could be lowered
// (active > pPool->num), this worker is surplus: it is moved to the backup list
// and parks on backupCond, or - when its id is >= pPool->maxInUse - it is moved to
// the exited list. Otherwise only the running counter is decremented.
//
// Returns true when the calling thread should keep working, false when it should
// stop (pool exiting, or the worker was put on the exited list).
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
  bool surplus = tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) ||
                 tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
                 tQueryAutoQWorkerTryDecActive(pPool, pPool->num);
  if (!surplus) {
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
    return true;
  }

  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->exit) {
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    return false;
  }

  SListNode *self = listNode(pWorker);
  (void)tdListPopNode(pPool->workers, self);

  // reclaim some workers
  if (pWorker->id >= pPool->maxInUse) {
    // join and free previously exited workers until the exited list is small enough
    while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
      SListNode         *oldest = tdListPopHead(pPool->exitedWorkers);
      SQueryAutoQWorker *exited = (SQueryAutoQWorker *)oldest->data;
      if (exited && taosCheckPthreadValid(exited->thread)) {
        (void)taosThreadJoin(exited->thread, NULL);
        taosThreadClear(&exited->thread);
      }
      taosMemoryFree(oldest);
    }
    tdListAppendNode(pPool->exitedWorkers, self);
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    return false;
  }

  // put back to backup pool
  tdListAppendNode(pPool->backupWorkers, self);
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // start to wait at backup cond
  (void)taosThreadMutexLock(&pPool->backupLock);
  (void)atomic_fetch_add_32(&pPool->backupNum, 1);
  if (!pPool->exit) {
    (void)taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
  }
  (void)taosThreadMutexUnlock(&pPool->backupLock);

  // recovered from backup
  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->exit) {
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    return false;
  }
  (void)tdListPopNode(pPool->backupWorkers, self);
  tdListAppendNode(pPool->workers, self);
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  return true;
}
840

841
// Initialise a query auto-worker pool: clears the exit flag, creates the four
// mutexes and three condition variables, opens the queue set, allocates the
// workers / backupWorkers / exitedWorkers lists, derives maxInUse from pool->max,
// and installs the default blocking callbacks when pool->pCb is not already set.
//
// Returns TSDB_CODE_SUCCESS, the taosOpenQset error (also stored in terrno), or
// terrno when one of the allocations fails.
// NOTE(review): resources created before a failing step are not released here;
// presumably the caller runs tQueryAutoQWorkerCleanup - confirm.
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
  pool->exit = false;

  (void)taosThreadMutexInit(&pool->poolLock, NULL);
  (void)taosThreadMutexInit(&pool->backupLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);

  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
  (void)taosThreadCondInit(&pool->backupCond, NULL);

  int32_t rc = taosOpenQset(&pool->qset);
  if (rc != 0) {
    terrno = rc;
    return rc;
  }

  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->workers == NULL) {
    return terrno;
  }

  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->backupWorkers == NULL) {
    return terrno;
  }

  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->exitedWorkers == NULL) {
    return terrno;
  }

  pool->maxInUse = pool->max * 2 + 2;

  if (pool->pCb == NULL) {
    pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
    if (pool->pCb == NULL) {
      return terrno;
    }
    pool->pCb->pPool = pool;
    pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
    pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;
  }

  return TSDB_CODE_SUCCESS;
}
874

875
// Shut down a query auto-worker pool.
//
// Steps, in order:
//   1. under poolLock: set the exit flag when stopNoWaitQueue is set, and resume
//      the queue set once per worker currently in the workers/backup lists;
//   2. broadcast on the backup / waiting-after-block / waiting-before-process
//      condition variables so parked threads wake up;
//   3. pop every node from the workers, backupWorkers and exitedWorkers lists,
//      join its thread when valid, and free the node;
//   4. free the three lists and the callback block, destroy the mutexes and
//      condition variables, and close the queue set.
//
// The callback pointer is cleared after it is freed (like the list pointers,
// which are reset from tdListFree's return value) so that a later
// tQueryAutoQWorkerInit on the same pool, which tests `!pool->pCb`, does not
// reuse freed memory.
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->stopNoWaitQueue) {
    pPool->exit = true;
  }
  int32_t size = 0;
  if (pPool->workers) {
    size = listNEles(pPool->workers);
  }
  if (pPool->backupWorkers) {
    size += listNEles(pPool->backupWorkers);
  }
  if (pPool->qset) {
    // one resume per known worker thread
    for (int32_t i = 0; i < size; ++i) {
      taosQsetThreadResume(pPool->qset);
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // wake everything that is parked on a condition variable
  (void)taosThreadMutexLock(&pPool->backupLock);
  (void)taosThreadCondBroadcast(&pPool->backupCond);
  (void)taosThreadMutexUnlock(&pPool->backupLock);

  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
  (void)taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);

  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
  (void)taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);

  SQueryAutoQWorker *worker = NULL;

  // drain the active workers; the join happens outside poolLock
  while (pPool->workers) {
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (listNEles(pPool->workers) == 0) {
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      break;
    }
    SListNode *pNode = tdListPopHead(pPool->workers);
    uDebug("0free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));
    worker = (SQueryAutoQWorker *)pNode->data;
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    if (worker && taosCheckPthreadValid(worker->thread)) {
      (void)taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
    }
    uDebug("free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));

    taosMemoryFree(pNode);
  }

  // drain the backup workers
  while (pPool->backupWorkers) {
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (listNEles(pPool->backupWorkers) == 0) {
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      break;
    }
    uDebug("backupworker head:%p, prev:%p, next:%p", TD_DLIST_HEAD(pPool->backupWorkers), TD_DLIST_NODE_PREV(TD_DLIST_HEAD(pPool->backupWorkers)), TD_DLIST_NODE_NEXT(TD_DLIST_HEAD(pPool->backupWorkers)));
    SListNode *pNode = tdListPopHead(pPool->backupWorkers);
    worker = (SQueryAutoQWorker *)pNode->data;
    (void)taosThreadMutexUnlock(&pPool->poolLock);

    if (worker && taosCheckPthreadValid(worker->thread)) {
      (void)taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
    }
    taosMemoryFree(pNode);
  }

  // drain the workers that already exited but were not joined yet
  while (pPool->exitedWorkers) {
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (listNEles(pPool->exitedWorkers) == 0) {
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      break;
    }

    SListNode *pNode = tdListPopHead(pPool->exitedWorkers);
    worker = (SQueryAutoQWorker *)pNode->data;
    (void)taosThreadMutexUnlock(&pPool->poolLock);

    if (worker && taosCheckPthreadValid(worker->thread)) {
      (void)taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
    }
    taosMemoryFree(pNode);
  }

  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->workers = tdListFree(pPool->workers);
  pPool->backupWorkers = tdListFree(pPool->backupWorkers);
  pPool->exitedWorkers = tdListFree(pPool->exitedWorkers);
  // free AND reset, so the pool never keeps a dangling callback pointer
  taosMemoryFreeClear(pPool->pCb);
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  (void)taosThreadMutexDestroy(&pPool->poolLock);
  (void)taosThreadMutexDestroy(&pPool->backupLock);
  (void)taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
  (void)taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);

  (void)taosThreadCondDestroy(&pPool->backupCond);
  (void)taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
  (void)taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
  taosCloseQset(pPool->qset);
}
8,235✔
980

981
// Create a queue with item handler `fp`, attach it to the pool's queue set with
// `ahandle`, and - while pool->num is below pool->max - launch worker threads
// until at least pool->min are running.
//
// Returns the new queue, or NULL on failure. terrno carries the error for the
// taosOpenQueue / taosAddIntoQset / list-allocation failures.
//
// Differences from the previous revision:
//   - terrno is now also set when taosAddIntoQset fails (it was only set for the
//     taosOpenQueue failure, leaving callers with NULL and a stale terrno);
//   - the thread attribute object is destroyed when taosThreadCreate fails.
STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->poolLock);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->poolLock);
    terrno = code;
    return NULL;
  }
  SQueryAutoQWorker  worker = {0};
  SQueryAutoQWorker *pWorker = NULL;

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      worker.id = listNEles(pool->workers);
      worker.backupIdx = -1;
      worker.pool = pool;
      SListNode *pNode = tdListAdd(pool->workers, &worker);
      if (!pNode) {
        taosCloseQueue(queue);
        queue = NULL;
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }
      pWorker = (SQueryAutoQWorker *)pNode->data;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
        // release the attribute object without disturbing the terrno value that
        // is current after the failed create
        int32_t createErr = terrno;
        (void)taosThreadAttrDestroy(&thAttr);
        terrno = createErr;
        taosCloseQueue(queue);
        queue = NULL;
        break;
      }

      (void)taosThreadAttrDestroy(&thAttr);
      pool->num++;
      (void)atomicFetchAddActive(&pool->activeRunningN, 1);
      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
    } while (pool->num < pool->min);
  }

  (void)taosThreadMutexUnlock(&pool->poolLock);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
1038

1039
// Release a queue obtained from tQueryAutoQWorkerAllocQueue by closing it.
// pPool is accepted for interface symmetry and is not used here.
void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) {
  taosCloseQueue(pQ);
}
15,520✔
1040

1041
// Bring one more worker into service.
//
// First tries to take a parked worker from the backup pool: backupNum is
// decremented with a compare-exchange and backupCond is signalled. When no
// backup worker is available, a new worker node is appended to pool->workers
// (under poolLock) and a joinable thread is started for it.
//
// Returns TSDB_CODE_SUCCESS on success (and also when the pool is already
// exiting), terrno when the list node cannot be allocated, or
// TSDB_CODE_OUT_OF_MEMORY when the thread cannot be created.
//
// The thread attribute object is now destroyed on the thread-creation failure
// path as well; previously it was only destroyed on success.
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
  // try backup pool
  int32_t backup = pool->backupNum;
  while (backup > 0) {
    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
    if (backupNew == backup) {
      (void)taosThreadCondSignal(&pool->backupCond);
      return TSDB_CODE_SUCCESS;
    }
    // lost the race: retry with the value that was actually stored
    backup = backupNew;
  }

  // backup pool is empty, create new
  SQueryAutoQWorker *pWorker = NULL;
  SQueryAutoQWorker  worker = {0};
  worker.pool = pool;
  worker.backupIdx = -1;
  (void)taosThreadMutexLock(&pool->poolLock);
  if (pool->exit) {
    (void)taosThreadMutexUnlock(&pool->poolLock);
    return TSDB_CODE_SUCCESS;
  }
  worker.id = listNEles(pool->workers);
  SListNode *pNode = tdListAdd(pool->workers, &worker);
  if (!pNode) {
    (void)taosThreadMutexUnlock(&pool->poolLock);
    return terrno;
  }
  (void)taosThreadMutexUnlock(&pool->poolLock);
  pWorker = (SQueryAutoQWorker *)pNode->data;

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
    (void)taosThreadAttrDestroy(&thAttr);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }
  (void)taosThreadAttrDestroy(&thAttr);

  return TSDB_CODE_SUCCESS;
}
1083

1084
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
918,581✔
1085
  SQueryAutoQWorkerPool *pPool = p;
918,581✔
1086
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) ||
1,837,153✔
1087
      tQueryAutoQWorkerTryDecActive(p, pPool->num)) {
918,569✔
1088
  } else {
1089
    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
458,059✔
1090
    if (code != TSDB_CODE_SUCCESS) {
458,063!
1091
      return code;
×
1092
    }
1093
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
458,063✔
1094
  }
1095

1096
  return TSDB_CODE_SUCCESS;
918,592✔
1097
}
1098

1099
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
918,580✔
1100
  SQueryAutoQWorkerPool *pPool = p;
918,580✔
1101
  int64_t                val64 = pPool->activeRunningN;
918,580✔
1102
  int32_t                running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
918,580✔
1103
  while (running < pPool->num) {
918,674✔
1104
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
918,673✔
1105
      return TSDB_CODE_SUCCESS;
918,580✔
1106
    }
1107
  }
1108
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
1✔
1109
  (void)atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
12✔
1110
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
12!
1111
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
12✔
1112
  if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
12!
1113
  return TSDB_CODE_SUCCESS;
12✔
1114
}
1115

1116
// Prepare a dispatch worker pool: resets the worker count, allocates a zeroed
// array of pPool->max worker slots and creates the pool mutex.
//
// Returns 0 on success, or terrno when the slot array cannot be allocated (the
// mutex is not created in that case).
int32_t tDispatchWorkerInit(SDispatchWorkerPool *pPool) {
  pPool->num = 0;

  pPool->pWorkers = taosMemCalloc(pPool->max, sizeof(SDispatchWorker));
  if (pPool->pWorkers == NULL) {
    return terrno;
  }

  (void)taosThreadMutexInit(&pPool->poolLock, NULL);
  return 0;
}
1124

1125
// Thread entry point of a dispatch worker.
//
// Blocks SIGPIPE, names the thread after the pool, then repeatedly reads one item
// from the worker's own queue set and passes it to the handler stored in the
// queue info. The loop ends when taosReadQitemFromQset returns 0. A warning is
// logged for items whose queueing time exceeded QUEUE_THRESHOLD microseconds.
//
// Always returns NULL.
static void *tDispatchWorkerThreadFp(SDispatchWorker *pWorker) {
  SDispatchWorkerPool *owner = pWorker->pool;
  SQueueInfo           info = {0};
  void                *item = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", owner->name, pWorker->id);
  }

  setThreadName(owner->name);
  pWorker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%d", owner->name, pWorker->id, pWorker->pid);

  for (;;) {
    int32_t nRead = taosReadQitemFromQset(pWorker->qset, (void **)&item, &info);
    if (nRead == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%d", owner->name, pWorker->id,
            pWorker->qset, pWorker->pid);
      break;
    }

    // report items that sat in the queue for too long
    if (info.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - info.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", owner->name, waited / QUEUE_THRESHOLD);
      }
    }

    if (info.fp == NULL) {
      continue;
    }
    info.workerId = pWorker->id;
    info.threadNum = owner->num;
    (*((FItem)info.fp))(&info, item);
  }

  DestoryThreadLocalRegComp();
  return NULL;
}
1163

1164
// Fill the remaining slots of a dispatch pool (from pPool->num up to pPool->max):
// each new worker gets its own queue set and queue (handler `fp`, handle
// `ahandle`) and a joinable thread running tDispatchWorkerThreadFp. The dispatch
// callback of the pool is set to `dispatchFp`.
//
// Returns 0 on success or the first error encountered; the loop stops at the
// first failure. pPool->num is incremented before a slot's resources are
// created, so a failed slot is still counted.
//
// Fix: when the pool is already full (or max is 0) the loop body never runs and
// pWorker stays NULL; the final log line used to dereference it unconditionally.
int32_t tDispatchWorkerAllocQueue(SDispatchWorkerPool *pPool, void *ahandle, FItem fp, DispatchFp dispatchFp) {
  int32_t          code = 0;
  SDispatchWorker *pWorker = NULL;
  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->dispatchFp = dispatchFp;
  for (int32_t i = pPool->num; i < pPool->max; ++i) {
    pWorker = pPool->pWorkers + i;
    pWorker->id = pPool->num;
    pWorker->pool = pPool;
    pPool->num++;
    code = taosOpenQset(&pWorker->qset);
    if (code != 0) break;
    code = taosOpenQueue(&pWorker->queue);
    if (code != 0) break;
    taosSetQueueFp(pWorker->queue, fp, ahandle);
    code = taosAddIntoQset(pWorker->qset, pWorker->queue, ahandle);
    if (code != 0) break;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tDispatchWorkerThreadFp, pWorker) != 0) {
      code = terrno;
      (void)taosThreadAttrDestroy(&thAttr);
      break;
    }
    (void)taosThreadAttrDestroy(&thAttr);
    uInfo("worker:%s:%d is launched, threadId:%" PRId64 ", total:%d", pPool->name, pWorker->id, taosGetPthreadId(pWorker->thread), pPool->num);
  }

  (void)taosThreadMutexUnlock(&pPool->poolLock);
  // pWorker is NULL when no slot was left to fill
  if (code == 0 && pWorker != NULL) {
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pPool->name, pWorker->queue, ahandle);
  }
  return code;
}
1199

1200
// Close the queue and queue set of every worker slot in use (indices below
// pPool->num) and reset the corresponding pointers, all under poolLock.
//
// Fix: the early return taken when pPool->pWorkers is NULL used to leave
// poolLock held; the lock is now released on every path.
static void tDispatchWorkerFreeQueue(SDispatchWorkerPool *pPool) {
  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->pWorkers) {
    for (int32_t i = 0; i < pPool->num; ++i) {
      SDispatchWorker *pWorker = pPool->pWorkers + i;
      if (pWorker->queue) {
        taosCloseQueue(pWorker->queue);
        pWorker->queue = NULL;
      }
      if (pWorker->qset) {
        taosCloseQset(pWorker->qset);
        pWorker->qset = NULL;
      }
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);
}
1216

1217
// Tear down a dispatch worker pool: set the exit flag and resume every worker's
// queue set (under poolLock), join all valid worker threads, close the per-worker
// queues and queue sets, free the worker array and destroy the pool mutex.
void tDispatchWorkerCleanup(SDispatchWorkerPool *pPool) {
  // phase 1: flag the shutdown and resume every worker's queue set
  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->exit = true;
  if (pPool->pWorkers != NULL) {
    for (int32_t slot = 0; slot < pPool->num; ++slot) {
      SDispatchWorker *w = &pPool->pWorkers[slot];
      if (w->qset != NULL) {
        taosQsetThreadResume(w->qset);
      }
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // phase 2: wait for the threads, outside the lock
  if (pPool->pWorkers != NULL) {
    for (int32_t slot = 0; slot < pPool->num; ++slot) {
      SDispatchWorker *w = &pPool->pWorkers[slot];
      if (!taosCheckPthreadValid(w->thread)) {
        continue;
      }
      (void)taosThreadJoin(w->thread, NULL);
      taosThreadClear(&w->thread);
    }
  }

  // phase 3: release queues, the slot array and the mutex
  tDispatchWorkerFreeQueue(pPool);
  taosMemoryFreeClear(pPool->pWorkers);
  (void)taosThreadMutexDestroy(&pPool->poolLock);
}
5,201✔
1243

1244
// Hand pMsg to one worker of the dispatch pool. The pool's dispatchFp callback
// picks the worker index; the message is then written to that worker's queue.
// Selection and enqueue both happen under poolLock.
//
// Returns 0 on success, the error from dispatchFp or taosWriteQitem, or
// TSDB_CODE_INTERNAL_ERROR when the chosen slot is out of range or has no queue.
//
// Fix: the index produced by the callback is now validated against
// [0, pPool->num) before it is used to address pPool->pWorkers.
int32_t tAddTaskIntoDispatchWorkerPool(SDispatchWorkerPool *pPool, void *pMsg) {
  int32_t          code = 0;
  int32_t          idx = 0;
  SDispatchWorker *pWorker = NULL;
  (void)taosThreadMutexLock(&pPool->poolLock);
  code = pPool->dispatchFp(pPool, pMsg, &idx);
  if (code == 0) {
    if (idx < 0 || idx >= pPool->num) {
      // the callback selected a slot that was never set up
      code = TSDB_CODE_INTERNAL_ERROR;
    } else {
      pWorker = pPool->pWorkers + idx;
      if (pWorker->queue) {
        code = taosWriteQitem(pWorker->queue, pMsg);
      } else {
        code = TSDB_CODE_INTERNAL_ERROR;
      }
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);
  if (code != 0) {
    uError("worker:%s, failed to add task into dispatch worker pool, code:%d", pPool->name, code);
  } else {
    uDebug("msg %p dispatch to the %dth worker, threadId:%" PRId64, pMsg, idx, taosGetPthreadId(pWorker->thread));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc