• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5051

13 May 2026 12:00PM UTC coverage: 73.358% (-0.04%) from 73.398%
#5051

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

714 existing lines in 146 files now uncovered.

281543 of 383795 relevant lines covered (73.36%)

135448694.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.28
/source/util/src/tworker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tworker.h"
18
#include "taoserror.h"
19
#include "tcompare.h"
20
#include "tgeosctx.h"
21
#include "tlog.h"
22
#include "ttrace.h"
23
#include "tcurl.h"
24

25
#define QUEUE_THRESHOLD (1000 * 1000)
26

27
typedef void *(*ThreadFp)(void *param);
28

29
int32_t tQWorkerInit(SQWorkerPool *pool) {
6,631,401✔
30
  int32_t code = taosOpenQset(&pool->qset);
6,631,401✔
31
  if (code) return code;
6,631,401✔
32

33
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
6,631,401✔
34
  if (pool->workers == NULL) {
6,631,401✔
35
    taosCloseQset(pool->qset);
×
36
    return terrno;
×
37
  }
38

39
  (void)taosThreadMutexInit(&pool->mutex, NULL);
6,631,401✔
40

41
  for (int32_t i = 0; i < pool->max; ++i) {
44,026,567✔
42
    SQueueWorker *worker = pool->workers + i;
37,395,166✔
43
    worker->id = i;
37,395,166✔
44
    worker->pool = pool;
37,395,166✔
45
  }
46

47
  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
6,631,401✔
48
  return 0;
6,631,401✔
49
}
50

51
void tQWorkerCleanup(SQWorkerPool *pool) {
6,631,401✔
52
  for (int32_t i = 0; i < pool->max; ++i) {
44,026,567✔
53
    SQueueWorker *worker = pool->workers + i;
37,395,166✔
54
    if (taosCheckPthreadValid(worker->thread)) {
37,395,166✔
55
      taosQsetThreadResume(pool->qset);
37,395,166✔
56
    }
57
  }
58

59
  for (int32_t i = 0; i < pool->max; ++i) {
44,026,567✔
60
    SQueueWorker *worker = pool->workers + i;
37,395,166✔
61
    if (taosCheckPthreadValid(worker->thread)) {
37,395,166✔
62
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
37,395,166✔
63
      (void)taosThreadJoin(worker->thread, NULL);
37,395,166✔
64
      taosThreadClear(&worker->thread);
37,395,166✔
65
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
37,395,166✔
66
    }
67
  }
68

69
  taosMemoryFreeClear(pool->workers);
6,631,401✔
70
  taosCloseQset(pool->qset);
6,631,401✔
71
  (void)taosThreadMutexDestroy(&pool->mutex);
6,631,401✔
72

73
  uInfo("worker:%s is closed", pool->name);
6,631,401✔
74
}
6,631,401✔
75

76
static void *tQWorkerThreadFp(SQueueWorker *worker) {
37,382,893✔
77
  SQWorkerPool *pool = worker->pool;
37,382,893✔
78
  SQueueInfo    qinfo = {0};
37,394,938✔
79
  void         *msg = NULL;
37,394,662✔
80
  int32_t       code = 0;
37,394,662✔
81

82
  int32_t ret = taosBlockSIGPIPE();
37,394,662✔
83
  if (ret < 0) {
37,392,362✔
84
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
85
  }
86

87
  setThreadName(pool->name);
37,392,362✔
88
  if (pool->threadCategory >= 0) taosSetCpuAffinity((EThreadCategory)pool->threadCategory);
37,394,651✔
89
  worker->pid = taosGetSelfPthreadId();
37,387,266✔
90
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
37,383,863✔
91

92
  while (1) {
93
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
570,784,405✔
94
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
37,382,726✔
95
            worker->pid);
96
      break;
37,375,891✔
97
    }
98

99
    if (qinfo.timestamp != 0) {
533,390,906✔
100
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
533,391,204✔
101
      if (cost > QUEUE_THRESHOLD) {
533,391,204✔
102
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
1,071,405✔
103
      }
104
    }
105

106
    if (qinfo.fp != NULL) {
533,390,136✔
107
      qinfo.workerId = worker->id;
533,390,458✔
108
      qinfo.threadNum = pool->num;
533,391,261✔
109
      (*((FItem)qinfo.fp))(&qinfo, msg);
533,389,258✔
110
    }
111

112
    taosUpdateItemSize(qinfo.queue, 1);
533,384,766✔
113
  }
114

115
  DestoryThreadLocalRegComp();
37,375,891✔
116

117
  return NULL;
37,394,254✔
118
}
119

120
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
6,631,401✔
121
  int32_t     code;
122
  STaosQueue *queue;
6,610,982✔
123

124
  code = taosOpenQueue(&queue);
6,631,401✔
125
  if (code) {
6,631,401✔
126
    terrno = code;
×
127
    return NULL;
×
128
  }
129

130
  (void)taosThreadMutexLock(&pool->mutex);
6,631,401✔
131
  taosSetQueueFp(queue, fp, NULL);
6,631,401✔
132
  code = taosAddIntoQset(pool->qset, queue, ahandle);
6,631,401✔
133
  if (code) {
6,631,401✔
134
    taosCloseQueue(queue);
×
135
    (void)taosThreadMutexUnlock(&pool->mutex);
×
136
    terrno = code;
×
137
    return NULL;
×
138
  }
139

140
  // spawn a thread to process queue
141
  if (pool->num < pool->max) {
6,631,401✔
142
    do {
143
      SQueueWorker *worker = pool->workers + pool->num;
37,395,166✔
144

145
      TdThreadAttr thAttr;
37,300,002✔
146
      (void)taosThreadAttrInit(&thAttr);
37,395,166✔
147
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
37,395,166✔
148

149
      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
37,395,166✔
150
        taosCloseQueue(queue);
×
151
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
152
        queue = NULL;
×
153
        break;
×
154
      }
155

156
      (void)taosThreadAttrDestroy(&thAttr);
37,395,166✔
157
      pool->num++;
37,395,166✔
158
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
37,395,166✔
159
    } while (pool->num < pool->min);
37,395,166✔
160
  }
161

162
  (void)taosThreadMutexUnlock(&pool->mutex);
6,631,401✔
163
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
6,631,401✔
164

165
  return queue;
6,631,401✔
166
}
167

168
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
6,631,401✔
169
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
6,631,401✔
170
  taosCloseQueue(queue);
6,631,401✔
171
}
6,631,401✔
172

173
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
×
174
  int32_t code;
175

176
  code = taosOpenQset(&pool->qset);
×
177
  if (code) {
×
178
    return terrno = code;
×
179
  }
180

181
  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
×
182
  if (pool->workers == NULL) {
×
183
    taosCloseQset(pool->qset);
×
184
    return terrno;
×
185
  }
186

187
  (void)taosThreadMutexInit(&pool->mutex, NULL);
×
188

189
  uInfo("worker:%s is initialized as auto", pool->name);
×
190
  return 0;
×
191
}
192

193
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
×
194
  int32_t size = taosArrayGetSize(pool->workers);
×
195
  for (int32_t i = 0; i < size; ++i) {
×
196
    SQueueWorker *worker = taosArrayGetP(pool->workers, i);
×
197
    if (taosCheckPthreadValid(worker->thread)) {
×
198
      taosQsetThreadResume(pool->qset);
×
199
    }
200
  }
201

202
  for (int32_t i = 0; i < size; ++i) {
×
203
    SQueueWorker *worker = taosArrayGetP(pool->workers, i);
×
204
    if (taosCheckPthreadValid(worker->thread)) {
×
205
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
×
206
      (void)taosThreadJoin(worker->thread, NULL);
×
207
      taosThreadClear(&worker->thread);
×
208
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
×
209
    }
210
    taosMemoryFree(worker);
×
211
  }
212

213
  taosArrayDestroy(pool->workers);
×
214
  taosCloseQset(pool->qset);
×
215
  (void)taosThreadMutexDestroy(&pool->mutex);
×
216

217
  uInfo("worker:%s is closed", pool->name);
×
218
}
×
219

220
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
×
221
  SAutoQWorkerPool *pool = worker->pool;
×
222
  SQueueInfo        qinfo = {0};
×
223
  void             *msg = NULL;
×
224
  int32_t           code = 0;
×
225

226
  int32_t ret = taosBlockSIGPIPE();
×
227
  if (ret < 0) {
×
228
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
229
  }
230

231
  setThreadName(pool->name);
×
232
  if (pool->threadCategory >= 0) taosSetCpuAffinity((EThreadCategory)pool->threadCategory);
×
233
  worker->pid = taosGetSelfPthreadId();
×
234
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
×
235

236
  while (1) {
237
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
×
238
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
×
239
            worker->pid);
240
      break;
×
241
    }
242

243
    if (qinfo.timestamp != 0) {
×
244
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
×
245
      if (cost > QUEUE_THRESHOLD) {
×
246
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
×
247
      }
248
    }
249

250
    if (qinfo.fp != NULL) {
×
251
      qinfo.workerId = worker->id;
×
252
      qinfo.threadNum = taosArrayGetSize(pool->workers);
×
253
      (*((FItem)qinfo.fp))(&qinfo, msg);
×
254
    }
255

256
    taosUpdateItemSize(qinfo.queue, 1);
×
257
  }
258
  DestoryThreadLocalRegComp();
×
259
  closeThreadNotificationConn();
×
260

261
  return NULL;
×
262
}
263

264
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
×
265
  int32_t     code;
266
  STaosQueue *queue;
×
267

268
  code = taosOpenQueue(&queue);
×
269
  if (code) {
×
270
    terrno = code;
×
271
    return NULL;
×
272
  }
273

274
  (void)taosThreadMutexLock(&pool->mutex);
×
275
  taosSetQueueFp(queue, fp, NULL);
×
276

277
  code = taosAddIntoQset(pool->qset, queue, ahandle);
×
278
  if (code) {
×
279
    taosCloseQueue(queue);
×
280
    (void)taosThreadMutexUnlock(&pool->mutex);
×
281
    terrno = code;
×
282
    return NULL;
×
283
  }
284

285
  int32_t queueNum = taosGetQueueNumber(pool->qset);
×
286
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
×
287
  int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
×
288

289
  if (dstWorkerNum < minNum) {
×
290
    dstWorkerNum = minNum;
×
291
  }
292

293
  // spawn a thread to process queue
294
  while (curWorkerNum < dstWorkerNum) {
×
295
    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
×
296
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
×
297
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
×
298
      taosMemoryFree(worker);
×
299
      taosCloseQueue(queue);
×
300
      (void)taosThreadMutexUnlock(&pool->mutex);
×
301
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
302
      return NULL;
×
303
    }
304
    worker->id = curWorkerNum;
×
305
    worker->pool = pool;
×
306

307
    TdThreadAttr thAttr;
×
308
    (void)taosThreadAttrInit(&thAttr);
×
309
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
×
310

311
    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
×
312
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
×
313
      void *tmp = taosArrayPop(pool->workers);
×
314
      taosMemoryFree(worker);
×
315
      taosCloseQueue(queue);
×
316
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
317
      return NULL;
×
318
    }
319

320
    (void)taosThreadAttrDestroy(&thAttr);
×
321
    int32_t numOfThreads = taosArrayGetSize(pool->workers);
×
322
    uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);
×
323

324
    curWorkerNum++;
×
325
  }
326

327
  (void)taosThreadMutexUnlock(&pool->mutex);
×
328
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
×
329

330
  return queue;
×
331
}
332

333
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
×
334
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
×
335
  taosCloseQueue(queue);
×
336
}
×
337

338
int32_t tWWorkerInit(SWWorkerPool *pool) {
21,562,319✔
339
  pool->nextId = 0;
21,562,319✔
340
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
21,562,458✔
341
  if (pool->workers == NULL) {
21,562,590✔
342
    return terrno;
×
343
  }
344

345
  (void)taosThreadMutexInit(&pool->mutex, NULL);
21,561,556✔
346

347
  for (int32_t i = 0; i < pool->max; ++i) {
50,382,578✔
348
    SWWorker *worker = pool->workers + i;
28,821,288✔
349
    worker->id = i;
28,820,161✔
350
    worker->qall = NULL;
28,819,630✔
351
    worker->qset = NULL;
28,819,216✔
352
    worker->pool = pool;
28,818,758✔
353
  }
354

355
  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
21,561,335✔
356
  return 0;
21,564,662✔
357
}
358

359
void tWWorkerCleanup(SWWorkerPool *pool) {
21,564,662✔
360
  for (int32_t i = 0; i < pool->max; ++i) {
50,386,586✔
361
    SWWorker *worker = pool->workers + i;
28,821,986✔
362
    if (taosCheckPthreadValid(worker->thread)) {
28,821,962✔
363
      if (worker->qset) {
24,207,068✔
364
        taosQsetThreadResume(worker->qset);
24,207,068✔
365
      }
366
    }
367
  }
368

369
  for (int32_t i = 0; i < pool->max; ++i) {
50,385,952✔
370
    SWWorker *worker = pool->workers + i;
28,821,924✔
371
    if (taosCheckPthreadValid(worker->thread)) {
28,821,924✔
372
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
24,207,357✔
373
      (void)taosThreadJoin(worker->thread, NULL);
24,207,435✔
374
      taosThreadClear(&worker->thread);
24,207,567✔
375
      taosFreeQall(worker->qall);
24,205,941✔
376
      taosCloseQset(worker->qset);
24,207,528✔
377
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
24,207,567✔
378
    }
379
  }
380

381
  taosMemoryFreeClear(pool->workers);
21,564,662✔
382
  (void)taosThreadMutexDestroy(&pool->mutex);
21,563,850✔
383

384
  uInfo("worker:%s is closed", pool->name);
21,564,662✔
385
}
21,564,662✔
386

387
static void *tWWorkerThreadFp(SWWorker *worker) {
24,207,567✔
388
  SWWorkerPool *pool = worker->pool;
24,207,567✔
389
  SQueueInfo    qinfo = {0};
24,207,567✔
390
  void         *msg = NULL;
24,207,567✔
391
  int32_t       code = 0;
24,207,567✔
392
  int32_t       numOfMsgs = 0;
24,207,567✔
393

394
  int32_t ret = taosBlockSIGPIPE();
24,207,567✔
395
  if (ret < 0) {
24,200,811✔
396
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
397
  }
398

399
  setThreadName(pool->name);
24,200,811✔
400
  if (pool->threadCategory >= 0) taosSetCpuAffinity((EThreadCategory)pool->threadCategory);
24,207,567✔
401
  worker->pid = taosGetSelfPthreadId();
24,179,525✔
402
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
24,194,932✔
403

404
  while (1) {
405
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
2,147,483,647✔
406
    if (numOfMsgs == 0) {
2,147,483,647✔
407
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
24,202,990✔
408
            worker->pid);
409
      break;
24,207,498✔
410
    }
411

412
    if (qinfo.timestamp != 0) {
2,147,483,647✔
413
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
2,147,483,647✔
414
      if (cost > QUEUE_THRESHOLD) {
2,147,483,647✔
415
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
62,926✔
416
      }
417
    }
418

419
    if (qinfo.fp != NULL) {
2,147,483,647✔
420
      qinfo.workerId = worker->id;
2,147,483,647✔
421
      qinfo.threadNum = pool->num;
2,147,483,647✔
422
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
2,147,483,647✔
423
    }
424
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
2,147,483,647✔
425
  }
426

427
  return NULL;
24,207,498✔
428
}
429

430
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
25,885,086✔
431
  (void)taosThreadMutexLock(&pool->mutex);
25,885,086✔
432
  SWWorker   *worker = pool->workers + pool->nextId;
25,885,420✔
433
  int32_t     code = -1;
25,885,420✔
434
  STaosQueue *queue;
25,859,796✔
435

436
  code = taosOpenQueue(&queue);
25,885,279✔
437
  if (code) goto _OVER;
25,885,281✔
438

439
  taosSetQueueFp(queue, NULL, fp);
25,885,281✔
440
  if (worker->qset == NULL) {
25,884,823✔
441
    code = taosOpenQset(&worker->qset);
24,206,970✔
442
    if (code) goto _OVER;
24,206,601✔
443

444
    code = taosAddIntoQset(worker->qset, queue, ahandle);
24,206,601✔
445
    if (code) goto _OVER;
24,207,428✔
446
    code = taosAllocateQall(&worker->qall);
24,207,428✔
447
    if (code) goto _OVER;
24,205,738✔
448

449
    TdThreadAttr thAttr;
24,183,014✔
450
    (void)taosThreadAttrInit(&thAttr);
24,206,335✔
451
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
24,206,009✔
452
    code = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
24,206,937✔
453
    if ((code)) goto _OVER;
24,207,567✔
454

455
    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
24,207,567✔
456
    pool->nextId = (pool->nextId + 1) % pool->max;
24,207,567✔
457

458
    (void)taosThreadAttrDestroy(&thAttr);
24,207,567✔
459
    pool->num++;
24,207,567✔
460
    if (pool->num > pool->max) pool->num = pool->max;
24,207,567✔
461
  } else {
462
    code = taosAddIntoQset(worker->qset, queue, ahandle);
1,677,853✔
463
    if (code) goto _OVER;
1,677,853✔
464
    pool->nextId = (pool->nextId + 1) % pool->max;
1,677,853✔
465
  }
466

467
_OVER:
25,885,420✔
468
  (void)taosThreadMutexUnlock(&pool->mutex);
25,884,962✔
469

470
  if (code) {
25,885,420✔
471
    if (queue != NULL) taosCloseQueue(queue);
×
472
    if (worker->qset != NULL) taosCloseQset(worker->qset);
×
473
    if (worker->qall != NULL) taosFreeQall(worker->qall);
×
474
    terrno = code;
×
475
    return NULL;
×
476
  } else {
477
    while (worker->pid <= 0) taosMsleep(10);
53,869,316✔
478

479
    taosQueueSetThreadId(queue, worker->pid);
25,885,207✔
480
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
25,883,236✔
481
    return queue;
25,877,986✔
482
  }
483
}
484

485
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
25,885,420✔
486
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
25,885,420✔
487
  taosCloseQueue(queue);
25,885,966✔
488
}
25,884,901✔
489

490
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
7,826,819✔
491
  int32_t code;
492
  pWorker->poolType = pCfg->poolType;
7,826,819✔
493
  pWorker->name = pCfg->name;
7,826,819✔
494
  pWorker->stopNoWaitQueue = pCfg->stopNoWaitQueue;
7,826,819✔
495

496
  switch (pCfg->poolType) {
7,826,819✔
497
    case QWORKER_POOL: {
6,631,401✔
498
      SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool));
6,631,401✔
499
      if (!pPool) {
6,631,401✔
500
        return terrno;
×
501
      }
502
      pPool->name = pCfg->name;
6,631,401✔
503
      pPool->min = pCfg->min;
6,631,401✔
504
      pPool->max = pCfg->max;
6,631,401✔
505
      pPool->threadCategory = pCfg->threadCategory;
6,631,401✔
506
      pWorker->pool = pPool;
6,631,401✔
507
      if ((code = tQWorkerInit(pPool))) {
6,631,401✔
508
        return (terrno = code);
×
509
      }
510

511
      pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
6,631,401✔
512
      if (pWorker->queue == NULL) {
6,631,401✔
513
        return terrno;
×
514
      }
515
    } break;
6,631,401✔
516
    case QUERY_AUTO_QWORKER_POOL: {
1,195,418✔
517
      SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool));
1,195,418✔
518
      if (!pPool) {
1,195,418✔
519
        return terrno;
×
520
      }
521
      pPool->name = pCfg->name;
1,195,418✔
522
      pPool->min = pCfg->min;
1,195,418✔
523
      pPool->max = pCfg->max;
1,195,418✔
524
      pPool->stopNoWaitQueue = pCfg->stopNoWaitQueue;
1,195,418✔
525
      pPool->threadCategory = pCfg->threadCategory;
1,195,418✔
526
      pWorker->pool = pPool;
1,195,418✔
527

528
      code = tQueryAutoQWorkerInit(pPool);
1,195,418✔
529
      if (code) return code;
1,195,418✔
530

531
      pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
1,195,418✔
532
      if (!pWorker->queue) {
1,195,418✔
533
        return terrno;
×
534
      }
535
    } break;
1,195,418✔
536
    default:
×
537
      return TSDB_CODE_INVALID_PARA;
×
538
  }
539
  return 0;
7,826,819✔
540
}
541

542
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
7,826,819✔
543
  if (pWorker->queue == NULL) return;
7,826,819✔
544
  if (!pWorker->stopNoWaitQueue) {
7,826,819✔
545
    while (!taosQueueEmpty(pWorker->queue)) {
7,966,652✔
546
      taosMsleep(10);
232,818✔
547
    }
548
  }
549

550
  switch (pWorker->poolType) {
7,826,819✔
551
    case QWORKER_POOL:
6,631,401✔
552
      tQWorkerCleanup(pWorker->pool);
6,631,401✔
553
      tQWorkerFreeQueue(pWorker->pool, pWorker->queue);
6,631,401✔
554
      taosMemoryFree(pWorker->pool);
6,631,401✔
555
      break;
6,631,401✔
556
    case QUERY_AUTO_QWORKER_POOL:
1,195,418✔
557
      tQueryAutoQWorkerCleanup(pWorker->pool);
1,195,418✔
558
      tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue);
1,195,418✔
559
      taosMemoryFree(pWorker->pool);
1,195,418✔
560
      break;
1,195,418✔
561
    default:
×
562
      break;
×
563
  }
564
}
565

566
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
20,268,864✔
567
  SWWorkerPool *pPool = &pWorker->pool;
20,268,864✔
568
  pPool->name = pCfg->name;
20,269,559✔
569
  pPool->max = pCfg->max;
20,269,559✔
570
  pPool->threadCategory = pCfg->threadCategory;
20,269,693✔
571

572
  int32_t code = tWWorkerInit(pPool);
20,268,459✔
573
  if (code) return code;
20,270,388✔
574

575
  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
20,270,388✔
576
  if (pWorker->queue == NULL) {
20,270,388✔
577
    return terrno;
×
578
  }
579

580
  pWorker->name = pCfg->name;
20,270,388✔
581
  return 0;
20,270,388✔
582
}
583

584
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
20,269,128✔
585
  if (pWorker->queue == NULL) return;
20,269,128✔
586

587
  while (!taosQueueEmpty(pWorker->queue)) {
27,862,998✔
588
    taosMsleep(10);
7,592,145✔
589
  }
590

591
  tWWorkerCleanup(&pWorker->pool);
20,269,801✔
592
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
20,269,521✔
593
}
594

595
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
596
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
597
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
598
static void    tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
599
static bool    tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
600

601
#define GET_ACTIVE_N(int64_val)  (int32_t)((int64_val) >> 32)
602
#define GET_RUNNING_N(int64_val) (int32_t)(int64_val & 0xFFFFFFFF)
603

604
static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
×
605
  int64_t acutalSubVal = val;
×
606
  acutalSubVal <<= 32;
×
607
  int64_t newVal64 = atomic_fetch_sub_64(ptr, acutalSubVal);
×
608
  return GET_ACTIVE_N(newVal64);
×
609
}
610

611
static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_sub_64(ptr, val)); }
2,147,483,647✔
612

613
static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
221,956,862✔
614
  int64_t actualAddVal = val;
221,956,862✔
615
  actualAddVal <<= 32;
221,956,862✔
616
  int64_t newVal64 = atomic_fetch_add_64(ptr, actualAddVal);
221,956,862✔
617
  return GET_ACTIVE_N(newVal64);
221,956,862✔
618
}
619

620
static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_add_64(ptr, val)); }
×
621

622
static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
×
623
  int64_t oldVal64 = *expectedVal, newVal64 = newVal;
×
624
  int32_t running = GET_RUNNING_N(*ptr);
×
625
  oldVal64 <<= 32;
×
626
  newVal64 <<= 32;
×
627
  oldVal64 |= running;
×
628
  newVal64 |= running;
×
629
  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
×
630
  if (actualNewVal64 == oldVal64) {
×
631
    return true;
×
632
  } else {
633
    *expectedVal = GET_ACTIVE_N(actualNewVal64);
×
634
    return false;
×
635
  }
636
}
637

638
static int64_t atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
×
639
  int64_t oldVal64 = *expectedVal, newVal64 = newVal;
×
640
  int64_t activeShifted = GET_ACTIVE_N(*ptr);
×
641
  activeShifted <<= 32;
×
642
  oldVal64 |= activeShifted;
×
643
  newVal64 |= activeShifted;
×
644
  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
×
645
  if (actualNewVal64 == oldVal64) {
×
646
    return true;
×
647
  } else {
648
    *expectedVal = GET_RUNNING_N(actualNewVal64);
×
649
    return false;
×
650
  }
651
}
652

653
static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
2,147,483,647✔
654
                                                     int32_t *expectedRunning, int32_t newRunning) {
655
  int64_t oldVal64 = *expectedActive, newVal64 = newActive;
2,147,483,647✔
656
  oldVal64 <<= 32;
2,147,483,647✔
657
  oldVal64 |= *expectedRunning;
2,147,483,647✔
658
  newVal64 <<= 32;
2,147,483,647✔
659
  newVal64 |= newRunning;
2,147,483,647✔
660
  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
2,147,483,647✔
661
  if (actualNewVal64 == oldVal64) {
2,147,483,647✔
662
    return true;
2,147,483,647✔
663
  } else {
664
    *expectedActive = GET_ACTIVE_N(actualNewVal64);
1,185,912✔
665
    *expectedRunning = GET_RUNNING_N(actualNewVal64);
1,185,912✔
666
    return false;
1,185,912✔
667
  }
668
}
669

670
static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
222,984,251✔
671
  SQueryAutoQWorkerPool *pool = worker->pool;
222,984,251✔
672
  SQueueInfo             qinfo = {0};
222,990,483✔
673
  void                  *msg = NULL;
222,989,728✔
674
  int32_t                code = 0;
222,989,156✔
675

676
  int32_t ret = taosBlockSIGPIPE();
222,989,156✔
677
  if (ret < 0) {
222,975,014✔
678
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
679
  }
680

681
  setThreadName(pool->name);
222,975,014✔
682
  if (pool->threadCategory >= 0) taosSetCpuAffinity((EThreadCategory)pool->threadCategory);
222,992,451✔
683
  worker->pid = taosGetSelfPthreadId();
222,930,546✔
684
  uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
222,971,211✔
685

686
  while (1) {
687
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
2,147,483,647✔
688
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
221,831,741✔
689
            worker->pid);
690
      break;
221,909,898✔
691
    }
692

693
    if (pool->stopNoWaitQueue && pool->exit) {
2,147,483,647✔
694
      uInfo("worker:%s:%d exit, thread:%08" PRId64, pool->name, worker->id, worker->pid);
×
695
      break;
×
696
    }
697

698
    if (qinfo.timestamp != 0) {
2,147,483,647✔
699
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
2,147,483,647✔
700
      if (cost > QUEUE_THRESHOLD) {
2,147,483,647✔
701
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
3,605,002✔
702
      }
703
    }
704

705
    tQueryAutoQWorkerWaitingCheck(pool);
2,147,483,647✔
706

707
    if (qinfo.fp != NULL) {
2,147,483,647✔
708
      qinfo.workerId = worker->id;
2,147,483,647✔
709
      qinfo.threadNum = pool->num;
2,147,483,647✔
710
      qinfo.workerCb = pool->pCb;
2,147,483,647✔
711
      (*((FItem)qinfo.fp))(&qinfo, msg);
2,147,483,647✔
712
    }
713

714
    taosUpdateItemSize(qinfo.queue, 1);
2,147,483,647✔
715
    if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) {
2,147,483,647✔
716
      uDebug("worker:%s:%d exited", pool->name, worker->id);
1,039,211✔
717
      break;
1,039,395✔
718
    }
719
  }
720

721
  DestoryThreadLocalRegComp();
222,949,293✔
722
  closeThreadNotificationConn();
222,888,928✔
723

724
  return NULL;
222,911,529✔
725
}
726

727
static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
2,147,483,647✔
728
  SQueryAutoQWorkerPool *pPool = p;
2,147,483,647✔
729
  bool                   ret = false;
2,147,483,647✔
730
  int32_t                waiting = pPool->waitingAfterBlockN;
2,147,483,647✔
731
  while (waiting > 0) {
2,147,483,647✔
UNCOV
732
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
×
UNCOV
733
    if (waitingNew == waiting) {
×
UNCOV
734
      (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
×
UNCOV
735
      (void)taosThreadCondSignal(&pPool->waitingAfterBlockCond);
×
UNCOV
736
      (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
×
737
      ret = true;
11,575✔
738
      break;
11,575✔
739
    }
740
    waiting = waitingNew;
×
741
  }
742
  return ret;
2,147,483,647✔
743
}
744

745
static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
2,147,483,647✔
746
  SQueryAutoQWorkerPool *pPool = p;
2,147,483,647✔
747
  bool                   ret = false;
2,147,483,647✔
748
  int32_t                waiting = pPool->waitingBeforeProcessMsgN;
2,147,483,647✔
749
  while (waiting > 0) {
2,147,483,647✔
750
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
×
751
    if (waitingNew == waiting) {
×
752
      (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
×
753
      (void)taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
×
754
      (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
×
755
      ret = true;
15,907✔
756
      break;
15,907✔
757
    }
758
    waiting = waitingNew;
×
759
  }
760
  return ret;
2,147,483,647✔
761
}
762

763
static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
2,147,483,647✔
764
  SQueryAutoQWorkerPool *pPool = p;
2,147,483,647✔
765
  bool                   ret = false;
2,147,483,647✔
766
  int64_t                val64 = pPool->activeRunningN;
2,147,483,647✔
767
  int32_t                active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
2,147,483,647✔
768
  while (active > minActive) {
2,147,483,647✔
769
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
326,299,503✔
770
      return true;
326,275,208✔
771
  }
772
  return false;
2,147,483,647✔
773
}
774

775
static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
2,147,483,647✔
776
  while (1) {
17,485✔
777
    int64_t val64 = pPool->activeRunningN;
2,147,483,647✔
778
    int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
2,147,483,647✔
779
    while (running < pPool->num) {
2,147,483,647✔
780
      if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
2,147,483,647✔
781
        return;
2,147,483,647✔
782
      }
783
    }
784
    if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
1,180✔
785
      break;
×
786
    }
787
  }
788
  // to wait for process
789
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
×
790
  (void)atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
×
791
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
×
792
  // recovered from waiting
793
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
×
794
  return;
×
795
}
796

797
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
2,147,483,647✔
798
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
2,147,483,647✔
799
      tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
2,147,483,647✔
800
    (void)taosThreadMutexLock(&pPool->poolLock);
140,664,984✔
801
    if (pPool->exit) {
140,647,068✔
802
      (void)taosThreadMutexUnlock(&pPool->poolLock);
1,891✔
803
      return false;
1,891✔
804
    }
805

806
    SListNode *pNode = listNode(pWorker);
140,645,177✔
807
    SListNode *tNode = tdListPopNode(pPool->workers, pNode);
140,645,177✔
808
    // reclaim some workers
809
    if (pWorker->id >= pPool->maxInUse) {
140,645,177✔
810
      while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
×
811
        SListNode         *head = tdListPopHead(pPool->exitedWorkers);
×
812
        SQueryAutoQWorker *pWorker = (SQueryAutoQWorker *)head->data;
×
813
        if (pWorker && taosCheckPthreadValid(pWorker->thread)) {
×
814
          (void)taosThreadJoin(pWorker->thread, NULL);
×
815
          taosThreadClear(&pWorker->thread);
×
816
        }
817
        taosMemoryFree(head);
×
818
      }
819
      tdListAppendNode(pPool->exitedWorkers, pNode);
×
820
      (void)taosThreadMutexUnlock(&pPool->poolLock);
×
821
      return false;
×
822
    }
823

824
    // put back to backup pool
825
    tdListAppendNode(pPool->backupWorkers, pNode);
140,645,177✔
826
    (void)taosThreadMutexUnlock(&pPool->poolLock);
140,645,177✔
827

828
    // start to wait at backup cond
829
    (void)taosThreadMutexLock(&pPool->backupLock);
140,645,177✔
830
    (void)atomic_fetch_add_32(&pPool->backupNum, 1);
140,645,177✔
831
    if (!pPool->exit) (void)taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
140,645,177✔
832
    (void)taosThreadMutexUnlock(&pPool->backupLock);
140,645,177✔
833

834
    // recovered from backup
835
    (void)taosThreadMutexLock(&pPool->poolLock);
140,644,887✔
836
    if (pPool->exit) {
140,645,177✔
837
      (void)taosThreadMutexUnlock(&pPool->poolLock);
1,037,504✔
838
      return false;
1,037,504✔
839
    }
840
    SListNode *tNode1 = tdListPopNode(pPool->backupWorkers, pNode);
139,607,673✔
841
    tdListAppendNode(pPool->workers, pNode);
139,607,673✔
842
    (void)taosThreadMutexUnlock(&pPool->poolLock);
139,607,673✔
843

844
    return true;
139,607,673✔
845
  } else {
846
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
2,147,483,647✔
847
    return true;
2,147,483,647✔
848
  }
849
}
850

851
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
4,454,599✔
852
  int32_t code;
853

854
  pool->exit = false;
4,454,599✔
855

856
  (void)taosThreadMutexInit(&pool->poolLock, NULL);
4,454,599✔
857
  (void)taosThreadMutexInit(&pool->backupLock, NULL);
4,454,599✔
858
  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
4,454,599✔
859
  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
4,454,599✔
860

861
  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
4,454,599✔
862
  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
4,454,599✔
863
  (void)taosThreadCondInit(&pool->backupCond, NULL);
4,454,599✔
864

865
  code = taosOpenQset(&pool->qset);
4,454,599✔
866
  if (code) return terrno = code;
4,454,599✔
867
  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
4,454,599✔
868
  if (!pool->workers) return terrno;
4,454,599✔
869
  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
4,454,599✔
870
  if (!pool->backupWorkers) return terrno;
4,454,599✔
871
  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
4,454,599✔
872
  if (!pool->exitedWorkers) return terrno;
4,454,599✔
873
  pool->maxInUse = pool->max * 2 + 2;
4,454,599✔
874

875
  if (!pool->pCb) {
4,454,599✔
876
    pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
4,454,599✔
877
    if (!pool->pCb) return terrno;
4,454,599✔
878
    pool->pCb->pPool = pool;
4,454,599✔
879
    pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
4,454,599✔
880
    pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;
4,454,599✔
881
  }
882
  return TSDB_CODE_SUCCESS;
4,454,599✔
883
}
884

885
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
4,454,678✔
886
  (void)taosThreadMutexLock(&pPool->poolLock);
4,454,678✔
887
  pPool->exit = true;
4,454,678✔
888

889
  int32_t size = 0;
4,454,678✔
890
  if (pPool->workers) {
4,454,678✔
891
    size += listNEles(pPool->workers);
4,454,599✔
892
  }
893
  if (pPool->backupWorkers) {
4,454,678✔
894
    size += listNEles(pPool->backupWorkers);
4,454,599✔
895
  }
896
  if (pPool->qset) {
4,454,678✔
897
    for (int32_t i = 0; i < size; ++i) {
227,450,856✔
898
      taosQsetThreadResume(pPool->qset);
222,996,257✔
899
    }
900
  }
901
  (void)taosThreadMutexUnlock(&pPool->poolLock);
4,454,678✔
902

903
  (void)taosThreadMutexLock(&pPool->backupLock);
4,454,678✔
904
  (void)taosThreadCondBroadcast(&pPool->backupCond);
4,454,678✔
905
  (void)taosThreadMutexUnlock(&pPool->backupLock);
4,454,678✔
906

907
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
4,454,678✔
908
  (void)taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
4,454,678✔
909
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
4,454,678✔
910

911
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
4,454,678✔
912
  (void)taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
4,454,678✔
913
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
4,454,678✔
914

915
  int32_t            idx = 0;
4,454,678✔
916
  SQueryAutoQWorker *worker = NULL;
4,454,678✔
917
  while (pPool->workers) {
226,413,431✔
918
    (void)taosThreadMutexLock(&pPool->poolLock);
226,413,352✔
919
    if (listNEles(pPool->workers) <= 0) {
226,413,352✔
920
      (void)taosThreadMutexUnlock(&pPool->poolLock);
4,454,599✔
921
      break;
4,454,599✔
922
    }
923
    SListNode *pNode = tdListPopHead(pPool->workers);
221,958,753✔
924
    uDebug("0free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));
221,958,753✔
925
    worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
221,958,753✔
926
    (void)taosThreadMutexUnlock(&pPool->poolLock);
221,958,753✔
927
    if (worker && taosCheckPthreadValid(worker->thread)) {
221,958,753✔
928
      (void)taosThreadJoin(worker->thread, NULL);
221,958,753✔
929
      taosThreadClear(&worker->thread);
221,958,753✔
930
    }
931
    uDebug("free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));
221,958,753✔
932

933
    taosMemoryFree(pNode);
221,958,753✔
934
  }
935

936
  while (pPool->backupWorkers) {
5,492,182✔
937
    (void)taosThreadMutexLock(&pPool->poolLock);
5,492,103✔
938
    if (listNEles(pPool->backupWorkers) <= 0) {
5,492,103✔
939
      (void)taosThreadMutexUnlock(&pPool->poolLock);
4,454,599✔
940
      break;
4,454,599✔
941
    }
942
    uDebug("backupworker head:%p, prev:%p, next:%p", TD_DLIST_HEAD(pPool->backupWorkers), 
1,037,504✔
943
        TD_DLIST_HEAD(pPool->backupWorkers) ? TD_DLIST_NODE_PREV(TD_DLIST_HEAD(pPool->backupWorkers)) : NULL, 
944
        TD_DLIST_HEAD(pPool->backupWorkers) ? TD_DLIST_NODE_NEXT(TD_DLIST_HEAD(pPool->backupWorkers)) : NULL);
945
    SListNode *pNode = tdListPopHead(pPool->backupWorkers);
1,037,504✔
946
    worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
1,037,504✔
947
    (void)taosThreadMutexUnlock(&pPool->poolLock);
1,037,504✔
948

949
    if (worker && taosCheckPthreadValid(worker->thread)) {
1,037,504✔
950
      (void)taosThreadJoin(worker->thread, NULL);
1,037,504✔
951
      taosThreadClear(&worker->thread);
1,037,504✔
952
    }
953
    taosMemoryFree(pNode);
1,037,504✔
954
  }
955

956
  while (pPool->exitedWorkers) {
4,454,678✔
957
    (void)taosThreadMutexLock(&pPool->poolLock);
4,454,599✔
958
    if (listNEles(pPool->exitedWorkers) == 0) {
4,454,599✔
959
      (void)taosThreadMutexUnlock(&pPool->poolLock);
4,454,599✔
960
      break;
4,454,599✔
961
    }
962

963
    SListNode *pNode = tdListPopHead(pPool->exitedWorkers);
×
964
    worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
×
965
    (void)taosThreadMutexUnlock(&pPool->poolLock);
×
966

967
    if (worker && taosCheckPthreadValid(worker->thread)) {
×
968
      (void)taosThreadJoin(worker->thread, NULL);
×
969
      taosThreadClear(&worker->thread);
×
970
    }
971
    taosMemoryFree(pNode);
×
972
  }
973

974
  (void)taosThreadMutexLock(&pPool->poolLock);
4,454,678✔
975
  pPool->workers = tdListFree(pPool->workers);
4,454,678✔
976
  pPool->backupWorkers = tdListFree(pPool->backupWorkers);
4,454,678✔
977
  pPool->exitedWorkers = tdListFree(pPool->exitedWorkers);
4,454,678✔
978
  taosMemoryFree(pPool->pCb);
4,454,678✔
979
  (void)taosThreadMutexUnlock(&pPool->poolLock);
4,454,678✔
980

981
  (void)taosThreadMutexDestroy(&pPool->poolLock);
4,454,678✔
982
  (void)taosThreadMutexDestroy(&pPool->backupLock);
4,454,678✔
983
  (void)taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
4,454,678✔
984
  (void)taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
4,454,678✔
985

986
  (void)taosThreadCondDestroy(&pPool->backupCond);
4,454,678✔
987
  (void)taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
4,454,678✔
988
  (void)taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
4,454,678✔
989
  taosCloseQset(pPool->qset);
4,454,678✔
990
}
4,454,678✔
991

992
STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
13,096,115✔
993
  STaosQueue *queue;
13,063,982✔
994
  int32_t     code = taosOpenQueue(&queue);
13,096,115✔
995
  if (code) {
13,095,325✔
996
    terrno = code;
×
997
    return NULL;
×
998
  }
999

1000
  (void)taosThreadMutexLock(&pool->poolLock);
13,095,325✔
1001
  taosSetQueueFp(queue, fp, NULL);
13,096,115✔
1002
  code = taosAddIntoQset(pool->qset, queue, ahandle);
13,096,115✔
1003
  if (code) {
13,095,121✔
1004
    taosCloseQueue(queue);
×
1005
    queue = NULL;
×
1006
    (void)taosThreadMutexUnlock(&pool->poolLock);
×
1007
    return NULL;
×
1008
  }
1009
  SQueryAutoQWorker  worker = {0};
13,095,121✔
1010
  SQueryAutoQWorker *pWorker = NULL;
13,095,121✔
1011

1012
  // spawn a thread to process queue
1013
  if (pool->num < pool->max) {
13,095,121✔
1014
    do {
1015
      worker.id = listNEles(pool->workers);
221,956,862✔
1016
      worker.backupIdx = -1;
221,956,862✔
1017
      worker.pool = pool;
221,956,862✔
1018
      SListNode *pNode = tdListAdd(pool->workers, &worker);
221,956,862✔
1019
      if (!pNode) {
221,956,862✔
1020
        taosCloseQueue(queue);
×
1021
        queue = NULL;
×
1022
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1023
        break;
×
1024
      }
1025
      pWorker = (SQueryAutoQWorker *)pNode->data;
221,956,862✔
1026

1027
      TdThreadAttr thAttr;
220,455,378✔
1028
      (void)taosThreadAttrInit(&thAttr);
221,956,862✔
1029
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
221,956,862✔
1030

1031
      if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
221,956,862✔
1032
        taosCloseQueue(queue);
×
1033
        queue = NULL;
×
1034
        break;
×
1035
      }
1036

1037
      (void)taosThreadAttrDestroy(&thAttr);
221,956,862✔
1038
      pool->num++;
221,956,862✔
1039
      (void)atomicFetchAddActive(&pool->activeRunningN, 1);
221,956,862✔
1040
      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
221,956,862✔
1041
    } while (pool->num < pool->min);
221,956,862✔
1042
  }
1043

1044
  (void)taosThreadMutexUnlock(&pool->poolLock);
13,095,918✔
1045
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
13,096,115✔
1046

1047
  return queue;
13,096,115✔
1048
}
1049

1050
void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) { taosCloseQueue(pQ); }
11,330,612✔
1051

1052
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
140,644,658✔
1053
  // try backup pool
1054
  int32_t backup = pool->backupNum;
140,644,658✔
1055
  while (backup > 0) {
140,662,084✔
1056
    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
139,622,689✔
1057
    if (backupNew == backup) {
139,625,713✔
1058
      (void)taosThreadCondSignal(&pool->backupCond);
139,607,673✔
1059
      return TSDB_CODE_SUCCESS;
139,607,231✔
1060
    }
1061
    backup = backupNew;
18,040✔
1062
  }
1063
  // backup pool is empty, create new
1064
  SQueryAutoQWorker *pWorker = NULL;
1,039,395✔
1065
  SQueryAutoQWorker  worker = {0};
1,039,395✔
1066
  worker.pool = pool;
1,039,395✔
1067
  worker.backupIdx = -1;
1,039,395✔
1068
  (void)taosThreadMutexLock(&pool->poolLock);
1,039,395✔
1069
  if (pool->exit) {
1,039,395✔
1070
    (void)taosThreadMutexUnlock(&pool->poolLock);
×
1071
    return TSDB_CODE_SUCCESS;
×
1072
  }
1073
  worker.id = listNEles(pool->workers);
1,039,395✔
1074
  SListNode *pNode = tdListAdd(pool->workers, &worker);
1,039,395✔
1075
  if (!pNode) {
1,039,395✔
1076
    (void)taosThreadMutexUnlock(&pool->poolLock);
×
1077
    return terrno;
×
1078
  }
1079
  (void)taosThreadMutexUnlock(&pool->poolLock);
1,039,395✔
1080
  pWorker = (SQueryAutoQWorker *)pNode->data;
1,039,395✔
1081

1082
  TdThreadAttr thAttr;
1,037,985✔
1083
  (void)taosThreadAttrInit(&thAttr);
1,039,395✔
1084
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,039,395✔
1085

1086
  if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
1,039,395✔
1087
    uError("create queryAutoWorker thread failed, error:%s", tstrerror(terrno));
×
1088
    return terrno;
×
1089
  }
1090
  (void)taosThreadAttrDestroy(&thAttr);
1,039,395✔
1091

1092
  return TSDB_CODE_SUCCESS;
1,039,395✔
1093
}
1094

1095
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
326,272,210✔
1096
  SQueryAutoQWorkerPool *pPool = p;
326,272,210✔
1097
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) ||
652,540,452✔
1098
      tQueryAutoQWorkerTryDecActive(p, pPool->num)) {
326,268,181✔
1099
  } else {
1100
    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
140,640,102✔
1101
    if (code != TSDB_CODE_SUCCESS) {
140,645,236✔
1102
      return code;
×
1103
    }
1104
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
140,645,236✔
1105
  }
1106

1107
  return TSDB_CODE_SUCCESS;
326,274,152✔
1108
}
1109

1110
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
326,274,594✔
1111
  SQueryAutoQWorkerPool *pPool = p;
326,274,594✔
1112
  int64_t                val64 = pPool->activeRunningN;
326,274,594✔
1113
  int32_t                running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
326,274,594✔
1114
  while (running < pPool->num) {
326,284,785✔
1115
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
326,284,785✔
1116
      return TSDB_CODE_SUCCESS;
326,275,208✔
1117
    }
1118
  }
UNCOV
1119
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
×
UNCOV
1120
  (void)atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
×
UNCOV
1121
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
×
UNCOV
1122
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
×
UNCOV
1123
  if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
×
UNCOV
1124
  return TSDB_CODE_SUCCESS;
×
1125
}
1126

1127
int32_t tDispatchWorkerInit(SDispatchWorkerPool *pPool) {
1,391,305✔
1128
  int32_t code = 0;
1,391,305✔
1129
  pPool->num = 0;
1,391,305✔
1130
  pPool->pWorkers = taosMemCalloc(pPool->max, sizeof(SDispatchWorker));
1,391,305✔
1131
  if (!pPool->pWorkers) return terrno;
1,391,305✔
1132
  (void)taosThreadMutexInit(&pPool->poolLock, NULL);
1,391,305✔
1133
  return code;
1,391,305✔
1134
}
1135

1136
static void *tDispatchWorkerThreadFp(SDispatchWorker *pWorker) {
10,189,530✔
1137
  SDispatchWorkerPool *pPool = pWorker->pool;
10,189,530✔
1138
  SQueueInfo qinfo = {0};
10,192,303✔
1139
  int32_t code = 0;
10,191,992✔
1140
  void *msg = NULL;
10,191,992✔
1141

1142
  int32_t ret = taosBlockSIGPIPE();
10,191,992✔
1143
  if (ret < 0) {
10,191,435✔
1144
    uError("worker:%s:%d failed to block SIGPIPE", pPool->name, pWorker->id);
×
1145
  }
1146

1147
  setThreadName(pPool->name);
10,191,435✔
1148
  if (pPool->threadCategory >= 0) taosSetCpuAffinity((EThreadCategory)pPool->threadCategory);
10,191,510✔
1149
  pWorker->pid = taosGetSelfPthreadId();
10,191,305✔
1150
  uInfo("worker:%s:%d is running, thread:%d", pPool->name, pWorker->id, pWorker->pid);
10,191,721✔
1151

1152
  while (1) {
1153
    if (taosReadQitemFromQset(pWorker->qset, (void **)&msg, &qinfo) == 0) {
144,965,193✔
1154
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%d", pPool->name, pWorker->id,
10,175,678✔
1155
            pWorker->qset, pWorker->pid);
1156
      break;
10,192,379✔
1157
    }
1158

1159
    if (qinfo.timestamp != 0) {
134,778,455✔
1160
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
134,779,203✔
1161
      if (cost > QUEUE_THRESHOLD) {
134,779,203✔
1162
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pPool->name, cost / QUEUE_THRESHOLD);
184,038✔
1163
      }
1164
    }
1165

1166
    if (qinfo.fp != NULL) {
134,788,322✔
1167
      qinfo.workerId = pWorker->id;
134,777,069✔
1168
      qinfo.threadNum = pPool->num;
134,777,930✔
1169
      (*((FItem)qinfo.fp))(&qinfo, msg);
134,780,239✔
1170
    }
1171
  }
1172
  DestoryThreadLocalRegComp();
10,192,379✔
1173
  closeThreadNotificationConn();
10,192,379✔
1174
  return NULL;
10,185,449✔
1175
}
1176

1177
int32_t tDispatchWorkerAllocQueue(SDispatchWorkerPool *pPool, void *ahandle, FItem fp, DispatchFp dispatchFp) {
1,391,305✔
1178
  int32_t code = 0;
1,391,305✔
1179
  SDispatchWorker* pWorker = NULL;
1,391,305✔
1180
  (void)taosThreadMutexLock(&pPool->poolLock);
1,391,305✔
1181
  pPool->dispatchFp = dispatchFp;
1,391,305✔
1182
  for (int32_t i = pPool->num; i < pPool->max; ++i) {
11,583,684✔
1183
    pWorker = pPool->pWorkers + i;
10,192,379✔
1184
    pWorker->id = pPool->num;
10,192,379✔
1185
    pWorker->pool = pPool;
10,192,379✔
1186
    pPool->num++;
10,192,379✔
1187
    code = taosOpenQset(&pWorker->qset);
10,192,379✔
1188
    if (code != 0) break;
10,192,379✔
1189
    code = taosOpenQueue(&pWorker->queue);
10,192,379✔
1190
    if (code != 0) break;
10,192,379✔
1191
    taosSetQueueFp(pWorker->queue, fp, ahandle);
10,192,379✔
1192
    code = taosAddIntoQset(pWorker->qset, pWorker->queue, ahandle);
10,192,379✔
1193
    if (code != 0) break;
10,192,379✔
1194

1195
    TdThreadAttr thAttr;
10,177,697✔
1196
    (void)taosThreadAttrInit(&thAttr);
10,192,379✔
1197
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
10,192,379✔
1198

1199
    if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tDispatchWorkerThreadFp, pWorker) != 0) {
10,192,379✔
1200
      code = terrno;
×
1201
      (void)taosThreadAttrDestroy(&thAttr);
×
1202
      break;
×
1203
    }
1204
    (void)taosThreadAttrDestroy(&thAttr);
10,192,379✔
1205
    uInfo("worker:%s:%d is launched, threadId:%" PRId64 ", total:%d", pPool->name, pWorker->id, taosGetPthreadId(pWorker->thread), pPool->num);
10,192,379✔
1206
  }
1207

1208
  (void)taosThreadMutexUnlock(&pPool->poolLock);
1,391,305✔
1209
  if (code == 0) uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pPool->name, pWorker->queue, ahandle);
1,391,305✔
1210
  return code;
1,391,305✔
1211
}
1212

1213
static void tDispatchWorkerFreeQueue(SDispatchWorkerPool *pPool) {
1,391,305✔
1214
  (void)taosThreadMutexLock(&pPool->poolLock);
1,391,305✔
1215
  if (!pPool->pWorkers) return;
1,391,305✔
1216
  for (int32_t i = 0; i < pPool->num; ++i) {
11,583,684✔
1217
    SDispatchWorker *pWorker = pPool->pWorkers + i;
10,192,379✔
1218
    if (pWorker->queue) {
10,192,379✔
1219
      taosCloseQueue(pWorker->queue);
10,192,379✔
1220
      pWorker->queue = NULL;
10,192,379✔
1221
    }
1222
    if (pWorker->qset) {
10,192,379✔
1223
      taosCloseQset(pWorker->qset);
10,192,379✔
1224
      pWorker->qset = NULL;
10,192,379✔
1225
    }
1226
  }
1227
  (void)taosThreadMutexUnlock(&pPool->poolLock);
1,391,305✔
1228
}
1229

1230
void tDispatchWorkerCleanup(SDispatchWorkerPool *pPool) {
1,391,305✔
1231
  (void)taosThreadMutexLock(&pPool->poolLock);
1,391,305✔
1232
  pPool->exit = true;
1,391,305✔
1233
  if (pPool->pWorkers) {
1,391,305✔
1234
    for (int32_t i = 0; i < pPool->num; ++i) {
11,583,684✔
1235
      SDispatchWorker *pWorker = pPool->pWorkers + i;
10,192,379✔
1236
      if (pWorker->qset) {
10,192,379✔
1237
        taosQsetThreadResume(pWorker->qset);
10,192,379✔
1238
      }
1239
    }
1240
  }
1241
  (void)taosThreadMutexUnlock(&pPool->poolLock);
1,391,305✔
1242

1243
  if (pPool->pWorkers) {
1,391,305✔
1244
    for (int32_t i = 0; i < pPool->num; ++i) {
11,583,684✔
1245
      SDispatchWorker *pWorker = pPool->pWorkers + i;
10,192,379✔
1246
      if (taosCheckPthreadValid(pWorker->thread)) {
10,192,379✔
1247
        (void)taosThreadJoin(pWorker->thread, NULL);
10,192,379✔
1248
        taosThreadClear(&pWorker->thread);
10,192,379✔
1249
      }
1250
    }
1251
  }
1252
  tDispatchWorkerFreeQueue(pPool);
1,391,305✔
1253
  taosMemoryFreeClear(pPool->pWorkers);
1,391,305✔
1254
  (void)taosThreadMutexDestroy(&pPool->poolLock);
1,391,305✔
1255
}
1,391,305✔
1256

1257
int32_t tAddTaskIntoDispatchWorkerPool(SDispatchWorkerPool *pPool, void *pMsg) {
134,773,783✔
1258
  int32_t code = 0;
134,773,783✔
1259
  int32_t idx = 0;
134,773,783✔
1260
  SDispatchWorker *pWorker = NULL;
134,775,614✔
1261
  (void)taosThreadMutexLock(&pPool->poolLock);
134,775,614✔
1262
  code = pPool->dispatchFp(pPool, pMsg, &idx);
134,783,281✔
1263
  if (code == 0) {
134,783,281✔
1264
    pWorker = pPool->pWorkers + idx;
134,783,281✔
1265
    if (pWorker->queue) {
134,783,281✔
1266
      code = taosWriteQitem(pWorker->queue, pMsg);
134,783,281✔
1267
    } else {
1268
      code = TSDB_CODE_INTERNAL_ERROR;
×
1269
    }
1270
  }
1271
  (void)taosThreadMutexUnlock(&pPool->poolLock);
134,783,281✔
1272
  if (code != 0) {
134,783,281✔
1273
    uError("worker:%s, failed to add task into dispatch worker pool, code:%d", pPool->name, code);
×
1274
  } else {
1275
    uDebug("msg %p dispatch to the %dth worker, threadId:%" PRId64, pMsg, idx, taosGetPthreadId(pWorker->thread));
134,783,281✔
1276
  }
1277
  return code;
134,783,281✔
1278
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc