• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3942

25 Apr 2025 11:21AM UTC coverage: 62.853% (+0.3%) from 62.507%
#3942

push

travis-ci

web-flow
docs: jdbc tmq supports database subscription. [TS-6222] (#30819)

* docs: jdbc tmq supports database subscription. [TS-6222]

* Update docs/zh/07-develop/07-tmq.md

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* Update 07-tmq.md

---------

Co-authored-by: haoranchen <haoran920c@163.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

156603 of 317531 branches covered (49.32%)

Branch coverage included in aggregate %.

241895 of 316485 relevant lines covered (76.43%)

6664240.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.8
/source/util/src/tworker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tworker.h"
18
#include "taoserror.h"
19
#include "tcompare.h"
20
#include "tgeosctx.h"
21
#include "tlog.h"
22

23
#define QUEUE_THRESHOLD (1000 * 1000)
24

25
typedef void *(*ThreadFp)(void *param);
26

27
int32_t tQWorkerInit(SQWorkerPool *pool) {
23,022✔
28
  int32_t code = taosOpenQset(&pool->qset);
23,022✔
29
  if (code) return code;
23,022!
30

31
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
23,022!
32
  if (pool->workers == NULL) {
23,022!
33
    taosCloseQset(pool->qset);
×
34
    return terrno;
×
35
  }
36

37
  (void)taosThreadMutexInit(&pool->mutex, NULL);
23,022✔
38

39
  for (int32_t i = 0; i < pool->max; ++i) {
150,837✔
40
    SQueueWorker *worker = pool->workers + i;
127,815✔
41
    worker->id = i;
127,815✔
42
    worker->pool = pool;
127,815✔
43
  }
44

45
  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
23,022!
46
  return 0;
23,022✔
47
}
48

49
void tQWorkerCleanup(SQWorkerPool *pool) {
23,022✔
50
  for (int32_t i = 0; i < pool->max; ++i) {
150,837✔
51
    SQueueWorker *worker = pool->workers + i;
127,815✔
52
    if (taosCheckPthreadValid(worker->thread)) {
127,815!
53
      taosQsetThreadResume(pool->qset);
127,815✔
54
    }
55
  }
56

57
  for (int32_t i = 0; i < pool->max; ++i) {
150,837✔
58
    SQueueWorker *worker = pool->workers + i;
127,815✔
59
    if (taosCheckPthreadValid(worker->thread)) {
127,815!
60
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
127,815!
61
      (void)taosThreadJoin(worker->thread, NULL);
127,815✔
62
      taosThreadClear(&worker->thread);
127,815✔
63
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
127,815!
64
    }
65
  }
66

67
  taosMemoryFreeClear(pool->workers);
23,022!
68
  taosCloseQset(pool->qset);
23,022✔
69
  (void)taosThreadMutexDestroy(&pool->mutex);
23,022✔
70

71
  uInfo("worker:%s is closed", pool->name);
23,022!
72
}
23,022✔
73

74
static void *tQWorkerThreadFp(SQueueWorker *worker) {
127,789✔
75
  SQWorkerPool *pool = worker->pool;
127,789✔
76
  SQueueInfo    qinfo = {0};
127,789✔
77
  void         *msg = NULL;
127,789✔
78
  int32_t       code = 0;
127,789✔
79

80
  int32_t ret = taosBlockSIGPIPE();
127,789✔
81
  if (ret < 0) {
127,794!
82
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
83
  }
84

85
  setThreadName(pool->name);
127,794✔
86
  worker->pid = taosGetSelfPthreadId();
127,789✔
87
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
127,769!
88

89
  while (1) {
90
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
2,485,593✔
91
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
127,700!
92
            worker->pid);
93
      break;
127,811✔
94
    }
95

96
    if (qinfo.timestamp != 0) {
2,357,113!
97
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
2,357,541✔
98
      if (cost > QUEUE_THRESHOLD) {
2,357,541✔
99
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
2,552!
100
      }
101
    }
102

103
    if (qinfo.fp != NULL) {
2,357,476✔
104
      qinfo.workerId = worker->id;
2,357,414✔
105
      qinfo.threadNum = pool->num;
2,357,414✔
106
      (*((FItem)qinfo.fp))(&qinfo, msg);
2,357,414✔
107
    }
108

109
    taosUpdateItemSize(qinfo.queue, 1);
2,357,833✔
110
  }
111

112
  DestoryThreadLocalRegComp();
127,811✔
113

114
  return NULL;
127,806✔
115
}
116

117
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
23,022✔
118
  int32_t     code;
119
  STaosQueue *queue;
120

121
  code = taosOpenQueue(&queue);
23,022✔
122
  if (code) {
23,022!
123
    terrno = code;
×
124
    return NULL;
×
125
  }
126

127
  (void)taosThreadMutexLock(&pool->mutex);
23,022✔
128
  taosSetQueueFp(queue, fp, NULL);
23,022✔
129
  code = taosAddIntoQset(pool->qset, queue, ahandle);
23,022✔
130
  if (code) {
23,022!
131
    taosCloseQueue(queue);
×
132
    (void)taosThreadMutexUnlock(&pool->mutex);
×
133
    terrno = code;
×
134
    return NULL;
×
135
  }
136

137
  // spawn a thread to process queue
138
  if (pool->num < pool->max) {
23,022!
139
    do {
140
      SQueueWorker *worker = pool->workers + pool->num;
127,815✔
141

142
      TdThreadAttr thAttr;
143
      (void)taosThreadAttrInit(&thAttr);
127,815✔
144
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
127,815✔
145

146
      if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
127,815!
147
        taosCloseQueue(queue);
×
148
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
149
        queue = NULL;
×
150
        break;
×
151
      }
152

153
      (void)taosThreadAttrDestroy(&thAttr);
127,815✔
154
      pool->num++;
127,815✔
155
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
127,815!
156
    } while (pool->num < pool->min);
127,815✔
157
  }
158

159
  (void)taosThreadMutexUnlock(&pool->mutex);
23,022✔
160
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
23,022!
161

162
  return queue;
23,022✔
163
}
164

165
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
23,022✔
166
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
23,022!
167
  taosCloseQueue(queue);
23,022✔
168
}
23,022✔
169

170
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
5,046✔
171
  int32_t code;
172

173
  code = taosOpenQset(&pool->qset);
5,046✔
174
  if (code) {
5,046!
175
    return terrno = code;
×
176
  }
177

178
  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
5,046✔
179
  if (pool->workers == NULL) {
5,046!
180
    taosCloseQset(pool->qset);
×
181
    return terrno;
×
182
  }
183

184
  (void)taosThreadMutexInit(&pool->mutex, NULL);
5,046✔
185

186
  uInfo("worker:%s is initialized as auto", pool->name);
5,046!
187
  return 0;
5,046✔
188
}
189

190
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
5,046✔
191
  int32_t size = taosArrayGetSize(pool->workers);
5,046✔
192
  for (int32_t i = 0; i < size; ++i) {
13,918✔
193
    SQueueWorker *worker = taosArrayGetP(pool->workers, i);
8,872✔
194
    if (taosCheckPthreadValid(worker->thread)) {
8,872!
195
      taosQsetThreadResume(pool->qset);
8,872✔
196
    }
197
  }
198

199
  for (int32_t i = 0; i < size; ++i) {
13,918✔
200
    SQueueWorker *worker = taosArrayGetP(pool->workers, i);
8,872✔
201
    if (taosCheckPthreadValid(worker->thread)) {
8,872!
202
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
8,872!
203
      (void)taosThreadJoin(worker->thread, NULL);
8,872✔
204
      taosThreadClear(&worker->thread);
8,872✔
205
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
8,872!
206
    }
207
    taosMemoryFree(worker);
8,872!
208
  }
209

210
  taosArrayDestroy(pool->workers);
5,046✔
211
  taosCloseQset(pool->qset);
5,046✔
212
  (void)taosThreadMutexDestroy(&pool->mutex);
5,046✔
213

214
  uInfo("worker:%s is closed", pool->name);
5,046!
215
}
5,046✔
216

217
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
8,872✔
218
  SAutoQWorkerPool *pool = worker->pool;
8,872✔
219
  SQueueInfo        qinfo = {0};
8,872✔
220
  void             *msg = NULL;
8,872✔
221
  int32_t           code = 0;
8,872✔
222

223
  int32_t ret = taosBlockSIGPIPE();
8,872✔
224
  if (ret < 0) {
8,872!
225
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
226
  }
227

228
  setThreadName(pool->name);
8,872✔
229
  worker->pid = taosGetSelfPthreadId();
8,872✔
230
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
8,870!
231

232
  while (1) {
233
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
110,403✔
234
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
8,868!
235
            worker->pid);
236
      break;
8,872✔
237
    }
238

239
    if (qinfo.timestamp != 0) {
101,530!
240
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
101,530✔
241
      if (cost > QUEUE_THRESHOLD) {
101,530✔
242
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
131!
243
      }
244
    }
245

246
    if (qinfo.fp != NULL) {
101,529!
247
      qinfo.workerId = worker->id;
101,530✔
248
      qinfo.threadNum = taosArrayGetSize(pool->workers);
101,530✔
249
      (*((FItem)qinfo.fp))(&qinfo, msg);
101,530✔
250
    }
251

252
    taosUpdateItemSize(qinfo.queue, 1);
101,530✔
253
  }
254
  DestoryThreadLocalRegComp();
8,872✔
255

256
  return NULL;
8,872✔
257
}
258

259
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
26,956✔
260
  int32_t     code;
261
  STaosQueue *queue;
262

263
  code = taosOpenQueue(&queue);
26,956✔
264
  if (code) {
26,956!
265
    terrno = code;
×
266
    return NULL;
×
267
  }
268

269
  (void)taosThreadMutexLock(&pool->mutex);
26,956✔
270
  taosSetQueueFp(queue, fp, NULL);
26,956✔
271

272
  code = taosAddIntoQset(pool->qset, queue, ahandle);
26,956✔
273
  if (code) {
26,956!
274
    taosCloseQueue(queue);
×
275
    (void)taosThreadMutexUnlock(&pool->mutex);
×
276
    terrno = code;
×
277
    return NULL;
×
278
  }
279

280
  int32_t queueNum = taosGetQueueNumber(pool->qset);
26,956✔
281
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
26,956✔
282
  int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
26,956✔
283

284
  if (dstWorkerNum < minNum) {
26,956✔
285
    dstWorkerNum = minNum;
7,393✔
286
  }
287

288
  // spawn a thread to process queue
289
  while (curWorkerNum < dstWorkerNum) {
35,828✔
290
    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
8,872!
291
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
17,744!
292
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
×
293
      taosMemoryFree(worker);
×
294
      taosCloseQueue(queue);
×
295
      (void)taosThreadMutexUnlock(&pool->mutex);
×
296
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
297
      return NULL;
×
298
    }
299
    worker->id = curWorkerNum;
8,872✔
300
    worker->pool = pool;
8,872✔
301

302
    TdThreadAttr thAttr;
303
    (void)taosThreadAttrInit(&thAttr);
8,872✔
304
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
8,872✔
305

306
    if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker) != 0) {
8,872!
307
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
×
308
      void *tmp = taosArrayPop(pool->workers);
×
309
      taosMemoryFree(worker);
×
310
      taosCloseQueue(queue);
×
311
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
312
      return NULL;
×
313
    }
314

315
    (void)taosThreadAttrDestroy(&thAttr);
8,872✔
316
    int32_t numOfThreads = taosArrayGetSize(pool->workers);
8,872✔
317
    uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);
8,872!
318

319
    curWorkerNum++;
8,872✔
320
  }
321

322
  (void)taosThreadMutexUnlock(&pool->mutex);
26,956✔
323
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
26,956!
324

325
  return queue;
26,956✔
326
}
327

328
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
26,956✔
329
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
26,956!
330
  taosCloseQueue(queue);
26,956✔
331
}
26,954✔
332

333
int32_t tWWorkerInit(SWWorkerPool *pool) {
61,545✔
334
  pool->nextId = 0;
61,545✔
335
  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
61,545!
336
  if (pool->workers == NULL) {
61,545!
337
    return terrno;
×
338
  }
339

340
  (void)taosThreadMutexInit(&pool->mutex, NULL);
61,545✔
341

342
  for (int32_t i = 0; i < pool->max; ++i) {
153,318✔
343
    SWWorker *worker = pool->workers + i;
91,773✔
344
    worker->id = i;
91,773✔
345
    worker->qall = NULL;
91,773✔
346
    worker->qset = NULL;
91,773✔
347
    worker->pool = pool;
91,773✔
348
  }
349

350
  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
61,545!
351
  return 0;
61,545✔
352
}
353

354
void tWWorkerCleanup(SWWorkerPool *pool) {
61,542✔
355
  for (int32_t i = 0; i < pool->max; ++i) {
153,309✔
356
    SWWorker *worker = pool->workers + i;
91,770✔
357
    if (taosCheckPthreadValid(worker->thread)) {
91,770✔
358
      if (worker->qset) {
72,526!
359
        taosQsetThreadResume(worker->qset);
72,526✔
360
      }
361
    }
362
  }
363

364
  for (int32_t i = 0; i < pool->max; ++i) {
153,312✔
365
    SWWorker *worker = pool->workers + i;
91,767✔
366
    if (taosCheckPthreadValid(worker->thread)) {
91,767✔
367
      uInfo("worker:%s:%d is stopping", pool->name, worker->id);
72,528!
368
      (void)taosThreadJoin(worker->thread, NULL);
72,533✔
369
      taosThreadClear(&worker->thread);
72,533✔
370
      taosFreeQall(worker->qall);
72,534✔
371
      taosCloseQset(worker->qset);
72,533✔
372
      uInfo("worker:%s:%d is stopped", pool->name, worker->id);
72,534!
373
    }
374
  }
375

376
  taosMemoryFreeClear(pool->workers);
61,545!
377
  (void)taosThreadMutexDestroy(&pool->mutex);
61,542✔
378

379
  uInfo("worker:%s is closed", pool->name);
61,541!
380
}
61,544✔
381

382
static void *tWWorkerThreadFp(SWWorker *worker) {
72,531✔
383
  SWWorkerPool *pool = worker->pool;
72,531✔
384
  SQueueInfo    qinfo = {0};
72,531✔
385
  void         *msg = NULL;
72,531✔
386
  int32_t       code = 0;
72,531✔
387
  int32_t       numOfMsgs = 0;
72,531✔
388

389
  int32_t ret = taosBlockSIGPIPE();
72,531✔
390
  if (ret < 0) {
72,522!
391
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
392
  }
393

394
  setThreadName(pool->name);
72,522✔
395
  worker->pid = taosGetSelfPthreadId();
72,528✔
396
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
72,524!
397

398
  while (1) {
399
    numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
8,136,647✔
400
    if (numOfMsgs == 0) {
8,134,681✔
401
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
72,152!
402
            worker->pid);
403
      break;
72,533✔
404
    }
405

406
    if (qinfo.timestamp != 0) {
8,062,529!
407
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
8,063,328✔
408
      if (cost > QUEUE_THRESHOLD) {
8,063,328✔
409
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
41!
410
      }
411
    }
412

413
    if (qinfo.fp != NULL) {
8,063,171!
414
      qinfo.workerId = worker->id;
8,063,428✔
415
      qinfo.threadNum = pool->num;
8,063,428✔
416
      (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
8,063,428✔
417
    }
418
    taosUpdateItemSize(qinfo.queue, numOfMsgs);
8,063,886✔
419
  }
420

421
  return NULL;
72,533✔
422
}
423

424
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
94,410✔
425
  (void)taosThreadMutexLock(&pool->mutex);
94,410✔
426
  SWWorker   *worker = pool->workers + pool->nextId;
94,410✔
427
  int32_t     code = -1;
94,410✔
428
  STaosQueue *queue;
429

430
  code = taosOpenQueue(&queue);
94,410✔
431
  if (code) goto _OVER;
94,409!
432

433
  taosSetQueueFp(queue, NULL, fp);
94,409✔
434
  if (worker->qset == NULL) {
94,409✔
435
    code = taosOpenQset(&worker->qset);
72,533✔
436
    if (code) goto _OVER;
72,534!
437

438
    code = taosAddIntoQset(worker->qset, queue, ahandle);
72,534✔
439
    if (code) goto _OVER;
72,533!
440
    code = taosAllocateQall(&worker->qall);
72,533✔
441
    if (code) goto _OVER;
72,534!
442

443
    TdThreadAttr thAttr;
444
    (void)taosThreadAttrInit(&thAttr);
72,534✔
445
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
72,534✔
446
    code = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
72,534✔
447
    if ((code)) goto _OVER;
72,534!
448

449
    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
72,534!
450
    pool->nextId = (pool->nextId + 1) % pool->max;
72,534✔
451

452
    (void)taosThreadAttrDestroy(&thAttr);
72,534✔
453
    pool->num++;
72,534✔
454
    if (pool->num > pool->max) pool->num = pool->max;
72,534✔
455
  } else {
456
    code = taosAddIntoQset(worker->qset, queue, ahandle);
21,876✔
457
    if (code) goto _OVER;
21,876!
458
    pool->nextId = (pool->nextId + 1) % pool->max;
21,876✔
459
  }
460

461
_OVER:
94,410✔
462
  (void)taosThreadMutexUnlock(&pool->mutex);
94,410✔
463

464
  if (code) {
94,410!
465
    if (queue != NULL) taosCloseQueue(queue);
×
466
    if (worker->qset != NULL) taosCloseQset(worker->qset);
×
467
    if (worker->qall != NULL) taosFreeQall(worker->qall);
×
468
    terrno = code;
×
469
    return NULL;
×
470
  } else {
471
    while (worker->pid <= 0) taosMsleep(10);
152,745✔
472

473
    taosQueueSetThreadId(queue, worker->pid);
94,405✔
474
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
94,404!
475
    return queue;
94,410✔
476
  }
477
}
478

479
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
94,397✔
480
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);
94,397!
481
  taosCloseQueue(queue);
94,406✔
482
}
94,402✔
483

484
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
25,683✔
485
  int32_t code;
486
  pWorker->poolType = pCfg->poolType;
25,683✔
487
  pWorker->name = pCfg->name;
25,683✔
488

489
  switch (pCfg->poolType) {
25,683!
490
    case QWORKER_POOL: {
23,022✔
491
      SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool));
23,022!
492
      if (!pPool) {
23,022!
493
        return terrno;
×
494
      }
495
      pPool->name = pCfg->name;
23,022✔
496
      pPool->min = pCfg->min;
23,022✔
497
      pPool->max = pCfg->max;
23,022✔
498
      pWorker->pool = pPool;
23,022✔
499
      if ((code = tQWorkerInit(pPool))) {
23,022!
500
        return (terrno = code);
×
501
      }
502

503
      pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
23,022✔
504
      if (pWorker->queue == NULL) {
23,022!
505
        return terrno;
×
506
      }
507
    } break;
23,022✔
508
    case QUERY_AUTO_QWORKER_POOL: {
2,661✔
509
      SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool));
2,661!
510
      if (!pPool) {
2,661!
511
        return terrno;
×
512
      }
513
      pPool->name = pCfg->name;
2,661✔
514
      pPool->min = pCfg->min;
2,661✔
515
      pPool->max = pCfg->max;
2,661✔
516
      pWorker->pool = pPool;
2,661✔
517

518
      code = tQueryAutoQWorkerInit(pPool);
2,661✔
519
      if (code) return code;
2,661!
520

521
      pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
2,661✔
522
      if (!pWorker->queue) {
2,661!
523
        return terrno;
×
524
      }
525
    } break;
2,661✔
526
    default:
×
527
      return TSDB_CODE_INVALID_PARA;
×
528
  }
529
  return 0;
25,683✔
530
}
531

532
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
25,683✔
533
  if (pWorker->queue == NULL) return;
25,683!
534
  while (!taosQueueEmpty(pWorker->queue)) {
26,863✔
535
    taosMsleep(10);
1,180✔
536
  }
537

538
  switch (pWorker->poolType) {
25,683!
539
    case QWORKER_POOL:
23,022✔
540
      tQWorkerCleanup(pWorker->pool);
23,022✔
541
      tQWorkerFreeQueue(pWorker->pool, pWorker->queue);
23,022✔
542
      taosMemoryFree(pWorker->pool);
23,022!
543
      break;
23,022✔
544
    case QUERY_AUTO_QWORKER_POOL:
2,661✔
545
      tQueryAutoQWorkerCleanup(pWorker->pool);
2,661✔
546
      tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue);
2,661✔
547
      taosMemoryFree(pWorker->pool);
2,661!
548
      break;
2,661✔
549
    default:
×
550
      break;
×
551
  }
552
}
553

554
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
53,976✔
555
  SWWorkerPool *pPool = &pWorker->pool;
53,976✔
556
  pPool->name = pCfg->name;
53,976✔
557
  pPool->max = pCfg->max;
53,976✔
558

559
  int32_t code = tWWorkerInit(pPool);
53,976✔
560
  if (code) return code;
53,976!
561

562
  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
53,976✔
563
  if (pWorker->queue == NULL) {
53,976!
564
    return terrno;
×
565
  }
566

567
  pWorker->name = pCfg->name;
53,976✔
568
  return 0;
53,976✔
569
}
570

571
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
53,975✔
572
  if (pWorker->queue == NULL) return;
53,975!
573

574
  while (!taosQueueEmpty(pWorker->queue)) {
58,442✔
575
    taosMsleep(10);
4,470✔
576
  }
577

578
  tWWorkerCleanup(&pWorker->pool);
53,975✔
579
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
53,976✔
580
}
581

582
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
583
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
584
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
585
static void    tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
586
static bool    tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
587

588
#define GET_ACTIVE_N(int64_val)  (int32_t)((int64_val) >> 32)
589
#define GET_RUNNING_N(int64_val) (int32_t)(int64_val & 0xFFFFFFFF)
590

591
static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
×
592
  int64_t acutalSubVal = val;
×
593
  acutalSubVal <<= 32;
×
594
  int64_t newVal64 = atomic_fetch_sub_64(ptr, acutalSubVal);
×
595
  return GET_ACTIVE_N(newVal64);
×
596
}
597

598
static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_sub_64(ptr, val)); }
19,846,008✔
599

600
static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
1,637,568✔
601
  int64_t actualAddVal = val;
1,637,568✔
602
  actualAddVal <<= 32;
1,637,568✔
603
  int64_t newVal64 = atomic_fetch_add_64(ptr, actualAddVal);
1,637,568✔
604
  return GET_ACTIVE_N(newVal64);
1,637,568✔
605
}
606

607
static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_add_64(ptr, val)); }
×
608

609
static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
10✔
610
  int64_t oldVal64 = *expectedVal, newVal64 = newVal;
10✔
611
  int32_t running = GET_RUNNING_N(*ptr);
10✔
612
  oldVal64 <<= 32;
10✔
613
  newVal64 <<= 32;
10✔
614
  oldVal64 |= running;
10✔
615
  newVal64 |= running;
10✔
616
  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
10✔
617
  if (actualNewVal64 == oldVal64) {
10!
618
    return true;
10✔
619
  } else {
620
    *expectedVal = GET_ACTIVE_N(actualNewVal64);
×
621
    return false;
×
622
  }
623
}
624

625
static int64_t atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
×
626
  int64_t oldVal64 = *expectedVal, newVal64 = newVal;
×
627
  int64_t activeShifted = GET_ACTIVE_N(*ptr);
×
628
  activeShifted <<= 32;
×
629
  oldVal64 |= activeShifted;
×
630
  newVal64 |= activeShifted;
×
631
  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
×
632
  if (actualNewVal64 == oldVal64) {
×
633
    return true;
×
634
  } else {
635
    *expectedVal = GET_RUNNING_N(actualNewVal64);
×
636
    return false;
×
637
  }
638
}
639

640
static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
22,361,316✔
641
                                                     int32_t *expectedRunning, int32_t newRunning) {
642
  int64_t oldVal64 = *expectedActive, newVal64 = newActive;
22,361,316✔
643
  oldVal64 <<= 32;
22,361,316✔
644
  oldVal64 |= *expectedRunning;
22,361,316✔
645
  newVal64 <<= 32;
22,361,316✔
646
  newVal64 |= newRunning;
22,361,316✔
647
  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
22,361,316✔
648
  if (actualNewVal64 == oldVal64) {
22,362,930✔
649
    return true;
22,325,652✔
650
  } else {
651
    *expectedActive = GET_ACTIVE_N(actualNewVal64);
37,278✔
652
    *expectedRunning = GET_RUNNING_N(actualNewVal64);
37,278✔
653
    return false;
37,278✔
654
  }
655
}
656

657
static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
1,638,892✔
658
  SQueryAutoQWorkerPool *pool = worker->pool;
1,638,892✔
659
  SQueueInfo             qinfo = {0};
1,638,892✔
660
  void                  *msg = NULL;
1,638,892✔
661
  int32_t                code = 0;
1,638,892✔
662

663
  int32_t ret = taosBlockSIGPIPE();
1,638,892✔
664
  if (ret < 0) {
1,639,128!
665
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
×
666
  }
667

668
  setThreadName(pool->name);
1,639,128✔
669
  worker->pid = taosGetSelfPthreadId();
1,638,361✔
670
  uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);
1,638,304✔
671

672
  while (1) {
673
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
21,483,614✔
674
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
1,635,094!
675
            worker->pid);
676
      break;
1,633,678✔
677
    }
678

679
    if (qinfo.timestamp != 0) {
19,845,147!
680
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
19,845,896✔
681
      if (cost > QUEUE_THRESHOLD) {
19,845,896!
682
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name, cost / QUEUE_THRESHOLD);
×
683
      }
684
    }
685

686
    tQueryAutoQWorkerWaitingCheck(pool);
19,843,875✔
687

688
    if (qinfo.fp != NULL) {
19,847,536!
689
      qinfo.workerId = worker->id;
19,847,559✔
690
      qinfo.threadNum = pool->num;
19,847,559✔
691
      qinfo.workerCb = pool->pCb;
19,847,559✔
692
      (*((FItem)qinfo.fp))(&qinfo, msg);
19,847,559✔
693
    }
694

695
    taosUpdateItemSize(qinfo.queue, 1);
19,844,453✔
696
    if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) {
19,847,426✔
697
      uDebug("worker:%s:%d exited", pool->name, worker->id);
1,810✔
698
      break;
1,810✔
699
    }
700
  }
701

702
  DestoryThreadLocalRegComp();
1,635,488✔
703

704
  return NULL;
1,634,298✔
705
}
706

707
static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
21,085,729✔
708
  SQueryAutoQWorkerPool *pPool = p;
21,085,729✔
709
  bool                   ret = false;
21,085,729✔
710
  int32_t                waiting = pPool->waitingAfterBlockN;
21,085,729✔
711
  while (waiting > 0) {
21,085,729✔
712
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
57✔
713
    if (waitingNew == waiting) {
57!
714
      (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
57✔
715
      (void)taosThreadCondSignal(&pPool->waitingAfterBlockCond);
57✔
716
      (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
57✔
717
      ret = true;
92✔
718
      break;
92✔
719
    }
720
    waiting = waitingNew;
×
721
  }
722
  return ret;
21,085,764✔
723
}
724

725
static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
21,085,599✔
726
  SQueryAutoQWorkerPool *pPool = p;
21,085,599✔
727
  bool                   ret = false;
21,085,599✔
728
  int32_t                waiting = pPool->waitingBeforeProcessMsgN;
21,085,599✔
729
  while (waiting > 0) {
21,085,599✔
730
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
10✔
731
    if (waitingNew == waiting) {
10!
732
      (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
10✔
733
      (void)taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
10✔
734
      (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
10✔
735
      ret = true;
×
736
      break;
×
737
    }
738
    waiting = waitingNew;
×
739
  }
740
  return ret;
21,085,575✔
741
}
742

743
static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
21,085,632✔
744
  SQueryAutoQWorkerPool *pPool = p;
21,085,632✔
745
  bool                   ret = false;
21,085,632✔
746
  int64_t                val64 = pPool->activeRunningN;
21,085,632✔
747
  int32_t                active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
21,085,632✔
748
  while (active > minActive) {
21,086,024✔
749
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
1,239,366✔
750
      return true;
1,239,031✔
751
  }
752
  return false;
19,846,658✔
753
}
754

755
static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
19,845,808✔
756
  while (1) {
175✔
757
    int64_t val64 = pPool->activeRunningN;
19,845,808✔
758
    int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
19,845,808✔
759
    while (running < pPool->num) {
19,881,986!
760
      if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
19,882,109✔
761
        return;
19,847,581✔
762
      }
763
    }
764
    if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
×
765
      break;
×
766
    }
767
  }
768
  // to wait for process
769
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
×
770
  (void)atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
10✔
771
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
10!
772
  // recovered from waiting
773
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
10✔
774
  return;
10✔
775
}
776

777
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
19,846,922✔
778
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
39,693,669!
779
      tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
19,846,524✔
780
    (void)taosThreadMutexLock(&pPool->poolLock);
670,756✔
781
    SListNode *pNode = listNode(pWorker);
670,669✔
782
    SListNode *tNode = tdListPopNode(pPool->workers, pNode);
670,669✔
783
    // reclaim some workers
784
    if (pWorker->id >= pPool->maxInUse) {
670,667!
785
      while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
×
786
        SListNode         *head = tdListPopHead(pPool->exitedWorkers);
×
787
        SQueryAutoQWorker *pWorker = (SQueryAutoQWorker *)head->data;
×
788
        if (pWorker && taosCheckPthreadValid(pWorker->thread)) {
×
789
          (void)taosThreadJoin(pWorker->thread, NULL);
×
790
          taosThreadClear(&pWorker->thread);
×
791
        }
792
        taosMemoryFree(head);
×
793
      }
794
      tdListAppendNode(pPool->exitedWorkers, pNode);
×
795
      (void)taosThreadMutexUnlock(&pPool->poolLock);
×
796
      return false;
×
797
    }
798

799
    // put back to backup pool
800
    tdListAppendNode(pPool->backupWorkers, pNode);
670,667✔
801
    (void)taosThreadMutexUnlock(&pPool->poolLock);
670,667✔
802

803
    // start to wait at backup cond
804
    (void)taosThreadMutexLock(&pPool->backupLock);
670,669✔
805
    (void)atomic_fetch_add_32(&pPool->backupNum, 1);
670,669✔
806
    if (!pPool->exit) (void)taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
670,669!
807
    (void)taosThreadMutexUnlock(&pPool->backupLock);
670,667✔
808

809
    // recovered from backup
810
    (void)taosThreadMutexLock(&pPool->poolLock);
670,664✔
811
    if (pPool->exit) {
670,669✔
812
      (void)taosThreadMutexUnlock(&pPool->poolLock);
1,811✔
813
      return false;
1,811✔
814
    }
815
    SListNode *tNode1 = tdListPopNode(pPool->backupWorkers, pNode);
668,858✔
816
    tdListAppendNode(pPool->workers, pNode);
668,858✔
817
    (void)taosThreadMutexUnlock(&pPool->poolLock);
668,858✔
818

819
    return true;
668,857✔
820
  } else {
821
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
19,175,811✔
822
    return true;
19,176,651✔
823
  }
824
}
825

826
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
22,394✔
827
  int32_t code;
828

829
  pool->exit = false;
22,394✔
830

831
  (void)taosThreadMutexInit(&pool->poolLock, NULL);
22,394✔
832
  (void)taosThreadMutexInit(&pool->backupLock, NULL);
22,394✔
833
  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
22,394✔
834
  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
22,394✔
835

836
  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
22,394✔
837
  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
22,394✔
838
  (void)taosThreadCondInit(&pool->backupCond, NULL);
22,394✔
839

840
  code = taosOpenQset(&pool->qset);
22,394✔
841
  if (code) return terrno = code;
22,394!
842
  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
22,394✔
843
  if (!pool->workers) return terrno;
22,394!
844
  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
22,394✔
845
  if (!pool->backupWorkers) return terrno;
22,394!
846
  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
22,394✔
847
  if (!pool->exitedWorkers) return terrno;
22,394!
848
  pool->maxInUse = pool->max * 2 + 2;
22,394✔
849

850
  if (!pool->pCb) {
22,394!
851
    pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
22,394!
852
    if (!pool->pCb) return terrno;
22,394!
853
    pool->pCb->pPool = pool;
22,394✔
854
    pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
22,394✔
855
    pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;
22,394✔
856
  }
857
  return TSDB_CODE_SUCCESS;
22,394✔
858
}
859

860
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
22,395✔
861
  (void)taosThreadMutexLock(&pPool->poolLock);
22,395✔
862
  pPool->exit = true;
22,395✔
863
  int32_t size = 0;
22,395✔
864
  if (pPool->workers) {
22,395✔
865
    size = listNEles(pPool->workers);
22,394✔
866
  }
867
  if (pPool->backupWorkers) {
22,395✔
868
    size += listNEles(pPool->backupWorkers);
22,394✔
869
  }
870
  if (pPool->qset) {
22,395✔
871
    for (int32_t i = 0; i < size; ++i) {
1,661,771✔
872
      taosQsetThreadResume(pPool->qset);
1,639,377✔
873
    }
874
  }
875
  (void)taosThreadMutexUnlock(&pPool->poolLock);
22,395✔
876

877
  (void)taosThreadMutexLock(&pPool->backupLock);
22,395✔
878
  (void)taosThreadCondBroadcast(&pPool->backupCond);
22,395✔
879
  (void)taosThreadMutexUnlock(&pPool->backupLock);
22,395✔
880

881
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
22,395✔
882
  (void)taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
22,395✔
883
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
22,395✔
884

885
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
22,395✔
886
  (void)taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
22,395✔
887
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
22,395✔
888

889
  int32_t            idx = 0;
22,395✔
890
  SQueryAutoQWorker *worker = NULL;
22,395✔
891
  while (pPool->workers) {
1,659,961✔
892
    (void)taosThreadMutexLock(&pPool->poolLock);
1,659,960✔
893
    if (listNEles(pPool->workers) == 0) {
1,659,960✔
894
      (void)taosThreadMutexUnlock(&pPool->poolLock);
22,394✔
895
      break;
22,394✔
896
    }
897
    SListNode *pNode = tdListPopHead(pPool->workers);
1,637,566✔
898
    worker = (SQueryAutoQWorker *)pNode->data;
1,637,566✔
899
    (void)taosThreadMutexUnlock(&pPool->poolLock);
1,637,566✔
900
    if (worker && taosCheckPthreadValid(worker->thread)) {
1,637,566!
901
      (void)taosThreadJoin(worker->thread, NULL);
1,637,566✔
902
      taosThreadClear(&worker->thread);
1,637,566✔
903
    }
904
    taosMemoryFree(pNode);
1,637,566!
905
  }
906

907
  while (pPool->backupWorkers && listNEles(pPool->backupWorkers) > 0) {
24,206✔
908
    SListNode *pNode = tdListPopHead(pPool->backupWorkers);
1,811✔
909
    worker = (SQueryAutoQWorker *)pNode->data;
1,811✔
910
    if (worker && taosCheckPthreadValid(worker->thread)) {
1,811!
911
      (void)taosThreadJoin(worker->thread, NULL);
1,811✔
912
      taosThreadClear(&worker->thread);
1,811✔
913
    }
914
    taosMemoryFree(pNode);
1,811!
915
  }
916

917
  while (pPool->exitedWorkers && listNEles(pPool->exitedWorkers) > 0) {
22,395!
918
    SListNode *pNode = tdListPopHead(pPool->exitedWorkers);
×
919
    worker = (SQueryAutoQWorker *)pNode->data;
×
920
    if (worker && taosCheckPthreadValid(worker->thread)) {
×
921
      (void)taosThreadJoin(worker->thread, NULL);
×
922
      taosThreadClear(&worker->thread);
×
923
    }
924
    taosMemoryFree(pNode);
×
925
  }
926

927
  pPool->workers = tdListFree(pPool->workers);
22,395✔
928
  pPool->backupWorkers = tdListFree(pPool->backupWorkers);
22,395✔
929
  pPool->exitedWorkers = tdListFree(pPool->exitedWorkers);
22,395✔
930
  taosMemoryFree(pPool->pCb);
22,395!
931

932
  (void)taosThreadMutexDestroy(&pPool->poolLock);
22,395✔
933
  (void)taosThreadMutexDestroy(&pPool->backupLock);
22,395✔
934
  (void)taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
22,395✔
935
  (void)taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
22,395✔
936

937
  (void)taosThreadCondDestroy(&pPool->backupCond);
22,395✔
938
  (void)taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
22,395✔
939
  (void)taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
22,395✔
940
  taosCloseQset(pPool->qset);
22,395✔
941
}
22,395✔
942

943
STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
33,349✔
944
  STaosQueue *queue;
945
  int32_t     code = taosOpenQueue(&queue);
33,349✔
946
  if (code) {
33,349!
947
    terrno = code;
×
948
    return NULL;
×
949
  }
950

951
  (void)taosThreadMutexLock(&pool->poolLock);
33,349✔
952
  taosSetQueueFp(queue, fp, NULL);
33,349✔
953
  code = taosAddIntoQset(pool->qset, queue, ahandle);
33,349✔
954
  if (code) {
33,349!
955
    taosCloseQueue(queue);
×
956
    queue = NULL;
×
957
    (void)taosThreadMutexUnlock(&pool->poolLock);
×
958
    return NULL;
×
959
  }
960
  SQueryAutoQWorker  worker = {0};
33,349✔
961
  SQueryAutoQWorker *pWorker = NULL;
33,349✔
962

963
  // spawn a thread to process queue
964
  if (pool->num < pool->max) {
33,349✔
965
    do {
966
      worker.id = listNEles(pool->workers);
1,637,568✔
967
      worker.backupIdx = -1;
1,637,568✔
968
      worker.pool = pool;
1,637,568✔
969
      SListNode *pNode = tdListAdd(pool->workers, &worker);
1,637,568✔
970
      if (!pNode) {
1,637,568!
971
        taosCloseQueue(queue);
×
972
        queue = NULL;
×
973
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
974
        break;
×
975
      }
976
      pWorker = (SQueryAutoQWorker *)pNode->data;
1,637,568✔
977

978
      TdThreadAttr thAttr;
979
      (void)taosThreadAttrInit(&thAttr);
1,637,568✔
980
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,637,568✔
981

982
      if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
1,637,568!
983
        taosCloseQueue(queue);
×
984
        queue = NULL;
×
985
        break;
×
986
      }
987

988
      (void)taosThreadAttrDestroy(&thAttr);
1,637,568✔
989
      pool->num++;
1,637,568✔
990
      (void)atomicFetchAddActive(&pool->activeRunningN, 1);
1,637,568✔
991
      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
1,637,568!
992
    } while (pool->num < pool->min);
1,637,568✔
993
  }
994

995
  (void)taosThreadMutexUnlock(&pool->poolLock);
33,349✔
996
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
33,349!
997

998
  return queue;
33,349✔
999
}
1000

1001
void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) { taosCloseQueue(pQ); }
16,139✔
1002

1003
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
670,613✔
1004
  // try backup pool
1005
  int32_t backup = pool->backupNum;
670,613✔
1006
  while (backup > 0) {
670,720✔
1007
    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
668,910✔
1008
    if (backupNew == backup) {
668,963✔
1009
      (void)taosThreadCondSignal(&pool->backupCond);
668,856✔
1010
      return TSDB_CODE_SUCCESS;
668,858✔
1011
    }
1012
    backup = backupNew;
107✔
1013
  }
1014
  // backup pool is empty, create new
1015
  SQueryAutoQWorker *pWorker = NULL;
1,810✔
1016
  SQueryAutoQWorker  worker = {0};
1,810✔
1017
  worker.pool = pool;
1,810✔
1018
  worker.backupIdx = -1;
1,810✔
1019
  (void)taosThreadMutexLock(&pool->poolLock);
1,810✔
1020
  worker.id = listNEles(pool->workers);
1,809✔
1021
  SListNode *pNode = tdListAdd(pool->workers, &worker);
1,809✔
1022
  if (!pNode) {
1,809!
1023
    (void)taosThreadMutexUnlock(&pool->poolLock);
×
1024
    return terrno;
×
1025
  }
1026
  (void)taosThreadMutexUnlock(&pool->poolLock);
1,809✔
1027
  pWorker = (SQueryAutoQWorker *)pNode->data;
1,809✔
1028

1029
  TdThreadAttr thAttr;
1030
  (void)taosThreadAttrInit(&thAttr);
1,809✔
1031
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
1,809✔
1032

1033
  if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
1,809!
1034
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1035
    return terrno;
×
1036
  }
1037
  (void)taosThreadAttrDestroy(&thAttr);
1,809✔
1038

1039
  return TSDB_CODE_SUCCESS;
1,809✔
1040
}
1041

1042
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
1,239,011✔
1043
  SQueryAutoQWorkerPool *pPool = p;
1,239,011✔
1044
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) ||
2,478,070!
1045
      tQueryAutoQWorkerTryDecActive(p, pPool->num)) {
1,239,029✔
1046
  } else {
1047
    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
670,655✔
1048
    if (code != TSDB_CODE_SUCCESS) {
670,667!
1049
      return code;
×
1050
    }
1051
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
670,667✔
1052
  }
1053

1054
  return TSDB_CODE_SUCCESS;
1,239,091✔
1055
}
1056

1057
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
1,239,046✔
1058
  SQueryAutoQWorkerPool *pPool = p;
1,239,046✔
1059
  int64_t                val64 = pPool->activeRunningN;
1,239,046✔
1060
  int32_t                running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
1,239,046✔
1061
  while (running < pPool->num) {
1,240,151✔
1062
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
1,240,088✔
1063
      return TSDB_CODE_SUCCESS;
1,239,040✔
1064
    }
1065
  }
1066
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
63✔
1067
  (void)atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
57✔
1068
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
57!
1069
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
57✔
1070
  if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
57!
1071
  return TSDB_CODE_SUCCESS;
57✔
1072
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc