• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.41
/source/util/src/tworker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tworker.h"
18
#include "taoserror.h"
19
#include "tcompare.h"
20
#include "tgeosctx.h"
21
#include "tlog.h"
22

23
#define QUEUE_THRESHOLD (1000 * 1000)
24

25
typedef void *(*ThreadFp)(void *param);
26

27
// Prepare a fixed-capacity queue worker pool: open the shared queue set,
// allocate pool->max worker slots and initialize the pool mutex. No thread is
// created here. Returns 0 on success, the taosOpenQset() error code, or terrno
// when the slot array cannot be allocated.
int32_t tQWorkerInit(SQWorkerPool *pool) {
  int32_t rc = taosOpenQset(&pool->qset);
  if (rc != 0) {
    return rc;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
  if (NULL == pool->workers) {
    // the queue set is the only resource acquired so far
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  // give every slot its index and a back-pointer to the owning pool
  int32_t slot = 0;
  while (slot < pool->max) {
    pool->workers[slot].id = slot;
    pool->workers[slot].pool = pool;
    ++slot;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}
48

49
// Tear down a queue worker pool. First issues one queue-set resume per live
// worker thread, then joins every live thread, and finally releases the slot
// array, the queue set and the pool mutex.
void tQWorkerCleanup(SQWorkerPool *pool) {
  SQueueWorker *slot = NULL;

  // pass 1: one resume call for each thread that was actually started
  for (int32_t n = 0; n < pool->max; n++) {
    slot = &pool->workers[n];
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    taosQsetThreadResume(pool->qset);
  }

  // pass 2: wait for each started thread to finish
  for (int32_t n = 0; n < pool->max; n++) {
    slot = &pool->workers[n];
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    uInfo("worker:%s:%d is stopping", pool->name, slot->id);
    (void)taosThreadJoin(slot->thread, NULL);
    taosThreadClear(&slot->thread);
    uInfo("worker:%s:%d is stopped", pool->name, slot->id);
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
19,563✔
73

74
// Thread body for SQWorkerPool workers. Reads one item at a time from the
// pool's queue set and hands it to the queue's FItem callback until
// taosReadQitemFromQset() returns 0, then releases the thread-local regex
// state and exits. Always returns NULL.
static void *tQWorkerThreadFp(SQueueWorker *worker) {
  SQWorkerPool *owner = worker->pool;
  SQueueInfo    info = {0};
  void         *item = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", owner->name, worker->id);
  }

  setThreadName(owner->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, owner->name, worker->id, worker->pid);

  for (;;) {
    if (taosReadQitemFromQset(owner->qset, (void **)&item, &info) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, owner->name, worker->id, owner->qset,
            worker->pid);
      break;
    }

    // warn when the item sat in the queue longer than QUEUE_THRESHOLD microseconds
    if (info.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - info.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", owner->name,
              waited / QUEUE_THRESHOLD);
      }
    }

    if (info.fp != NULL) {
      info.workerId = worker->id;
      info.threadNum = owner->num;
      (*((FItem)info.fp))(&info, item);
    }

    taosUpdateItemSize(info.queue, 1);
  }

  DestoryThreadLocalRegComp();

  return NULL;
}
116

117
// Create a queue served by `pool`, register it in the pool's queue set with
// `ahandle`, and make sure worker threads exist for it.
//
// While the pool is below `max`, at least one new worker is launched per call
// and launching continues until `min` workers are running.
//
// Returns the new queue, or NULL with terrno set when the queue cannot be
// opened, cannot be added to the queue set, or a worker thread cannot be
// created (reported as TSDB_CODE_OUT_OF_MEMORY).
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = NULL;

  int32_t code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQueueWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // the attribute is only needed for the create call; release it on both
      // the success and the failure path
      (void)taosThreadAttrDestroy(&thAttr);
      if (ret != 0) {
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
      // the `max` bound keeps a misconfigured min > max from indexing past
      // the end of pool->workers
    } while (pool->num < pool->min && pool->num < pool->max);
  }

  (void)taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
164

165
// Log the release and close `queue`. `pool` is only used for the log line.
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
19,563✔
169

170
// Prepare an auto-sized queue worker pool: open its queue set, create the
// (initially empty) array of worker pointers and initialize the pool mutex.
// Returns 0 on success; on failure returns the error code (also stored in
// terrno for the queue-set failure) or terrno for the array failure.
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  int32_t rc = taosOpenQset(&pool->qset);
  if (rc != 0) {
    terrno = rc;
    return terrno;
  }

  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
  if (NULL == pool->workers) {
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}
189

190
// Tear down an auto-sized pool: issue one queue-set resume per live worker
// thread, join each live thread and free its worker record, then destroy the
// worker array, the queue set and the pool mutex.
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  int32_t       total = taosArrayGetSize(pool->workers);
  SQueueWorker *entry = NULL;

  // pass 1: one resume call for each thread that was started
  for (int32_t n = 0; n < total; n++) {
    entry = taosArrayGetP(pool->workers, n);
    if (!taosCheckPthreadValid(entry->thread)) {
      continue;
    }
    taosQsetThreadResume(pool->qset);
  }

  // pass 2: join, then free the heap-allocated worker record in every case
  for (int32_t n = 0; n < total; n++) {
    entry = taosArrayGetP(pool->workers, n);
    if (taosCheckPthreadValid(entry->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, entry->id);
      (void)taosThreadJoin(entry->thread, NULL);
      taosThreadClear(&entry->thread);
      uInfo("worker:%s:%d is stopped", pool->name, entry->id);
    }
    taosMemoryFree(entry);
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
2,371✔
216

217
// Thread body for SAutoQWorkerPool workers. Same loop as the fixed-size pool
// worker, except that the thread count reported to the callback is the
// current size of the pool's worker array. Always returns NULL.
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
  SAutoQWorkerPool *owner = worker->pool;
  SQueueInfo        info = {0};
  void             *item = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", owner->name, worker->id);
  }

  setThreadName(owner->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, owner->name, worker->id, worker->pid);

  for (;;) {
    if (taosReadQitemFromQset(owner->qset, (void **)&item, &info) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, owner->name, worker->id, owner->qset,
            worker->pid);
      break;
    }

    // warn when the item sat in the queue longer than QUEUE_THRESHOLD microseconds
    if (info.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - info.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", owner->name,
              waited / QUEUE_THRESHOLD);
      }
    }

    if (info.fp != NULL) {
      info.workerId = worker->id;
      info.threadNum = taosArrayGetSize(owner->workers);
      (*((FItem)info.fp))(&info, item);
    }

    taosUpdateItemSize(info.queue, 1);
  }

  DestoryThreadLocalRegComp();

  return NULL;
}
258

259
// Create a queue served by the auto-sized `pool`, register it in the pool's
// queue set with `ahandle`, and grow the worker set so that it holds
// ceil(queueCount * pool->ratio) threads (never fewer than 2).
//
// Returns the new queue, or NULL with terrno set. Every failure path closes
// the queue and releases pool->mutex before returning.
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = NULL;

  int32_t code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);

  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) goto _ERR;

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);
  if (dstWorkerNum < 2) dstWorkerNum = 2;

  // spawn a thread to process queue
  while (curWorkerNum < dstWorkerNum) {
    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
      taosMemoryFree(worker);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _ERR;
    }
    worker->id = curWorkerNum;
    worker->pool = pool;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker);
    // the attribute is only needed for the create call; release it on both
    // the success and the failure path
    (void)taosThreadAttrDestroy(&thAttr);
    if (ret != 0) {
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
      // drop the record that was just pushed so cleanup never sees it
      (void)taosArrayPop(pool->workers);
      taosMemoryFree(worker);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _ERR;
    }

    int32_t numOfThreads = taosArrayGetSize(pool->workers);
    uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);

    curWorkerNum++;
  }

  (void)taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;

_ERR:
  // single exit for failures after the mutex was taken, so the lock can never
  // be left held
  taosCloseQueue(queue);
  (void)taosThreadMutexUnlock(&pool->mutex);
  terrno = code;
  return NULL;
}
324

325
// Log the release and close `queue`. `pool` is only used for the log line.
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
13,187✔
329

330
// Prepare a write worker pool: reset the round-robin cursor, allocate
// pool->max worker slots and initialize the pool mutex. Queue sets, item
// batches and threads are created later, per worker, by tWWorkerAllocQueue().
// Returns 0 on success or terrno when the slot array cannot be allocated.
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  if (NULL == pool->workers) {
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  int32_t slot = 0;
  while (slot < pool->max) {
    SWWorker *w = &pool->workers[slot];
    w->id = slot;
    w->qall = NULL;
    w->qset = NULL;
    w->pool = pool;
    ++slot;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}
350

351
// Tear down a write worker pool: resume the queue set of every live worker
// that has one, join each live thread and release its item batch and queue
// set, then free the slot array and destroy the pool mutex.
void tWWorkerCleanup(SWWorkerPool *pool) {
  SWWorker *slot = NULL;

  // pass 1: resume the queue set of each started worker
  for (int32_t n = 0; n < pool->max; n++) {
    slot = &pool->workers[n];
    if (taosCheckPthreadValid(slot->thread) && slot->qset) {
      taosQsetThreadResume(slot->qset);
    }
  }

  // pass 2: join each started worker and drop its per-worker resources
  for (int32_t n = 0; n < pool->max; n++) {
    slot = &pool->workers[n];
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    uInfo("worker:%s:%d is stopping", pool->name, slot->id);
    (void)taosThreadJoin(slot->thread, NULL);
    taosThreadClear(&slot->thread);
    taosFreeQall(slot->qall);
    taosCloseQset(slot->qset);
    uInfo("worker:%s:%d is stopped", pool->name, slot->id);
  }

  taosMemoryFreeClear(pool->workers);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
55,258✔
378

379
// Thread body for write workers. Each iteration drains all pending items of
// the worker's own queue set into worker->qall and passes the whole batch to
// the queue's FItems callback. The loop ends when a read returns 0 items.
// Always returns NULL.
static void *tWWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *owner = worker->pool;
  SQueueInfo    info = {0};
  int32_t       batch = 0;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", owner->name, worker->id);
  }

  setThreadName(owner->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, owner->name, worker->id, worker->pid);

  for (;;) {
    batch = taosReadAllQitemsFromQset(worker->qset, worker->qall, &info);
    if (batch == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, owner->name, worker->id,
            worker->qset, worker->pid);
      break;
    }

    // warn when the batch sat in the queue longer than QUEUE_THRESHOLD microseconds
    if (info.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - info.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", owner->name,
              waited / QUEUE_THRESHOLD);
      }
    }

    if (info.fp != NULL) {
      info.workerId = worker->id;
      info.threadNum = owner->num;
      (*((FItems)info.fp))(&info, worker->qall, batch);
    }
    taosUpdateItemSize(info.queue, batch);
  }

  return NULL;
}
420

421
// Create a queue and attach it to the next write worker in round-robin order.
//
// If that worker has no queue set yet, this call also creates the worker's
// queue set and item batch and starts its thread. Otherwise the queue is
// simply added to the worker's existing queue set.
//
// On success the function waits until the worker thread has published its
// thread id, records it on the queue and returns the queue. On failure it
// returns NULL with terrno set, and releases only what this call created: a
// queue set that already belongs to a running worker is never touched.
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  STaosQueue *queue = NULL;
  bool        createdWorkerState = false;  // true once this call starts building qset/qall
  int32_t     code = -1;

  (void)taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;

  code = taosOpenQueue(&queue);
  if (code) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (worker->qset == NULL) {
    createdWorkerState = true;
    code = taosOpenQset(&worker->qset);
    if (code) goto _OVER;

    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    code = taosAllocateQall(&worker->qall);
    if (code) goto _OVER;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    code = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // the attribute is only needed for the create call; release it on both
    // the success and the failure path
    (void)taosThreadAttrDestroy(&thAttr);
    if ((code)) goto _OVER;

    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
    pool->nextId = (pool->nextId + 1) % pool->max;

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

_OVER:
  if (code) {
    // roll back while still holding the mutex so no other caller can observe
    // a half-built worker
    if (queue != NULL) taosCloseQueue(queue);
    if (createdWorkerState) {
      if (worker->qset != NULL) {
        taosCloseQset(worker->qset);
        worker->qset = NULL;  // let a later call rebuild this worker from scratch
      }
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
  }
  (void)taosThreadMutexUnlock(&pool->mutex);

  if (code) {
    terrno = code;
    return NULL;
  }

  // the worker thread stores its id into worker->pid when it starts running
  while (worker->pid <= 0) taosMsleep(10);

  taosQueueSetThreadId(queue, worker->pid);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
  return queue;
}
475

476
// Log the release and close `queue`. `pool` is only used for the log line.
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
66,069✔
480

481
// Build a single-queue worker from `pCfg`: allocate a pool of the configured
// type, initialize it and allocate its one queue.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for an unknown pool type, or
// the failing step's error code. On failure nothing stays allocated:
// pWorker->pool and pWorker->queue are left NULL, which matters because
// tSingleWorkerCleanup() does nothing when the queue is NULL.
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  int32_t code;
  pWorker->poolType = pCfg->poolType;
  pWorker->name = pCfg->name;

  switch (pCfg->poolType) {
    case QWORKER_POOL: {
      SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pWorker->pool = pPool;
      if ((code = tQWorkerInit(pPool))) {
        // tQWorkerInit releases what it acquired before failing, so only the
        // pool struct itself is left to free
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return (terrno = code);
      }

      pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (pWorker->queue == NULL) {
        // save terrno first: the teardown below may overwrite it
        code = terrno;
        tQWorkerCleanup(pPool);
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return (terrno = code);
      }
    } break;
    case QUERY_AUTO_QWORKER_POOL: {
      SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pWorker->pool = pPool;

      code = tQueryAutoQWorkerInit(pPool);
      if (code) {
        // NOTE(review): only the struct is freed here; confirm that
        // tQueryAutoQWorkerInit releases its own partial state on failure
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return code;
      }

      pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (!pWorker->queue) {
        // save terrno first: the teardown below may overwrite it
        code = terrno;
        // NOTE(review): same teardown tSingleWorkerCleanup() uses; assumes it
        // tolerates a pool whose queue allocation failed - confirm
        tQueryAutoQWorkerCleanup(pPool);
        taosMemoryFree(pPool);
        pWorker->pool = NULL;
        return (terrno = code);
      }
    } break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
528

529
// Shut down a single-queue worker. Does nothing when no queue was allocated.
// Otherwise polls every 10 ms until the queue is empty, then stops the pool,
// closes the queue and frees the pool struct according to the pool type.
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (NULL == pWorker->queue) {
    return;
  }

  // let the workers drain whatever is still queued
  for (; !taosQueueEmpty(pWorker->queue);) {
    taosMsleep(10);
  }

  if (pWorker->poolType == QWORKER_POOL) {
    tQWorkerCleanup(pWorker->pool);
    tQWorkerFreeQueue(pWorker->pool, pWorker->queue);
    taosMemoryFree(pWorker->pool);
  } else if (pWorker->poolType == QUERY_AUTO_QWORKER_POOL) {
    tQueryAutoQWorkerCleanup(pWorker->pool);
    tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue);
    taosMemoryFree(pWorker->pool);
  }
  // any other pool type: nothing was created, nothing to release
}
550

551
// Build a multi-item (batch) worker from `pCfg` on top of the write worker
// pool embedded in `pWorker`, and allocate its one queue.
//
// Returns 0 on success or the failing step's error code. When the queue
// cannot be allocated the already-initialized pool is torn down again, so the
// slot array and mutex created by tWWorkerInit() are not leaked.
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *pPool = &pWorker->pool;
  pPool->name = pCfg->name;
  pPool->max = pCfg->max;

  int32_t code = tWWorkerInit(pPool);
  if (code) return code;

  pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
  if (pWorker->queue == NULL) {
    // save terrno first: the teardown below may overwrite it
    code = terrno;
    tWWorkerCleanup(pPool);
    return (terrno = code);
  }

  pWorker->name = pCfg->name;
  return 0;
}
567

568
// Shut down a multi-item worker. Does nothing when no queue was allocated.
// Otherwise polls every 10 ms until the queue is empty, then stops the
// embedded pool and closes the queue.
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (NULL == pWorker->queue) {
    return;
  }

  // let the worker drain whatever is still queued
  for (; !taosQueueEmpty(pWorker->queue);) {
    taosMsleep(10);
  }

  SWWorkerPool *embedded = &pWorker->pool;
  tWWorkerCleanup(embedded);
  tWWorkerFreeQueue(embedded, pWorker->queue);
}
578

579
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
580
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
581
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
582
static void    tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
583
static bool    tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
584

585
// A pool's activeRunningN packs two 32-bit counters into one int64_t so both
// can be updated by a single 64-bit atomic: the "active" count lives in the
// upper 32 bits and the "running" count in the lower 32 bits.
// Both macro arguments are fully parenthesized so any expression can be passed.
#define GET_ACTIVE_N(int64_val)  ((int32_t)((int64_val) >> 32))
#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))
587

588
// Atomically subtract `val` from the active counter (upper 32 bits of *ptr)
// and return the active counter extracted from the value that
// atomic_fetch_sub_64() returns (presumably the value before the
// subtraction - confirm against the atomic wrapper).
static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
  int64_t delta = (int64_t)val << 32;
  int64_t fetched = atomic_fetch_sub_64(ptr, delta);
  return GET_ACTIVE_N(fetched);
}
594

595
// Atomically subtract `val` from the running counter (lower 32 bits of *ptr)
// and return the running counter extracted from the fetched value.
static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_sub_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
26,076,383✔
596

597
// Atomically add `val` to the active counter (upper 32 bits of *ptr) and
// return the active counter extracted from the value that
// atomic_fetch_add_64() returns (presumably the value before the addition -
// confirm against the atomic wrapper).
static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
  int64_t delta = (int64_t)val << 32;
  int64_t fetched = atomic_fetch_add_64(ptr, delta);
  return GET_ACTIVE_N(fetched);
}
603

604
// Atomically add `val` to the running counter (lower 32 bits of *ptr) and
// return the running counter extracted from the fetched value.
static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_add_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
×
605

606
// Try to change only the active counter (upper 32 bits of *ptr) from
// *expectedVal to newVal, keeping the running counter that is read from *ptr
// at entry.
//
// Returns true when the swap happened. Otherwise stores the active counter
// that was actually observed into *expectedVal and returns false. The swap
// also fails if the running counter changed since it was read here.
//
// Each half is converted through uint32_t before packing, so a negative
// counter cannot sign-extend into the other half and no negative value is
// left-shifted.
static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  uint32_t running = (uint32_t)GET_RUNNING_N(*ptr);
  int64_t  oldVal64 = (int64_t)(((uint64_t)(uint32_t)(*expectedVal) << 32) | running);
  int64_t  newVal64 = (int64_t)(((uint64_t)(uint32_t)newVal << 32) | running);

  int64_t observed = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
  if (observed == oldVal64) {
    return true;
  }
  *expectedVal = GET_ACTIVE_N(observed);
  return false;
}
621

622
// Try to change only the running counter (lower 32 bits of *ptr) from
// *expectedVal to newVal, keeping the active counter that is read from *ptr
// at entry.
//
// Returns true (1) when the swap happened. Otherwise stores the running
// counter that was actually observed into *expectedVal and returns false (0).
// The return type is kept as int64_t for existing callers.
//
// The running values are converted through uint32_t before packing, so a
// negative counter cannot sign-extend into the active half.
static int64_t atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  uint64_t activeShifted = (uint64_t)(uint32_t)GET_ACTIVE_N(*ptr) << 32;
  int64_t  oldVal64 = (int64_t)(activeShifted | (uint32_t)(*expectedVal));
  int64_t  newVal64 = (int64_t)(activeShifted | (uint32_t)newVal);

  int64_t observed = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
  if (observed == oldVal64) {
    return true;
  }
  *expectedVal = GET_RUNNING_N(observed);
  return false;
}
636

637
// Try to change both counters packed in *ptr in one step: active from
// *expectedActive to newActive and running from *expectedRunning to
// newRunning.
//
// Returns true (1) when the swap happened. Otherwise stores the counters that
// were actually observed into *expectedActive / *expectedRunning and returns
// false (0), so the caller can retry with fresh values. The return type is
// kept as int64_t for existing callers.
//
// Each half is converted through uint32_t before packing, so a negative
// counter cannot sign-extend into the other half and no negative value is
// left-shifted.
static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
                                                     int32_t *expectedRunning, int32_t newRunning) {
  int64_t oldVal64 = (int64_t)(((uint64_t)(uint32_t)(*expectedActive) << 32) | (uint32_t)(*expectedRunning));
  int64_t newVal64 = (int64_t)(((uint64_t)(uint32_t)newActive << 32) | (uint32_t)newRunning);

  int64_t observed = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
  if (observed == oldVal64) {
    return true;
  }
  *expectedActive = GET_ACTIVE_N(observed);
  *expectedRunning = GET_RUNNING_N(observed);
  return false;
}
653

654
// Thread body for SQueryAutoQWorkerPool workers. For each item read from the
// pool's queue set it first passes tQueryAutoQWorkerWaitingCheck(), then runs
// the queue's FItem callback (with the pool's worker callbacks attached), and
// finally asks tQueryAutoQWorkerTryRecycleWorker() whether to keep going.
// The loop ends when a read returns 0 or the recycle step returns false.
// Always returns NULL.
static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
  SQueryAutoQWorkerPool *owner = worker->pool;
  SQueueInfo             info = {0};
  void                  *item = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", owner->name, worker->id);
  }

  setThreadName(owner->name);
  worker->pid = taosGetSelfPthreadId();
  uDebug("worker:%s:%d is running, thread:%08" PRId64, owner->name, worker->id, worker->pid);

  for (;;) {
    if (taosReadQitemFromQset(owner->qset, (void **)&item, &info) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, owner->name, worker->id, owner->qset,
            worker->pid);
      break;
    }

    // warn when the item sat in the queue longer than QUEUE_THRESHOLD microseconds
    if (info.timestamp != 0) {
      int64_t waited = taosGetTimestampUs() - info.timestamp;
      if (waited > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", owner->name,
              waited / QUEUE_THRESHOLD);
      }
    }

    tQueryAutoQWorkerWaitingCheck(owner);

    if (info.fp != NULL) {
      info.workerId = worker->id;
      info.threadNum = owner->num;
      info.workerCb = owner->pCb;
      (*((FItem)info.fp))(&info, item);
    }

    taosUpdateItemSize(info.queue, 1);

    bool keepRunning = tQueryAutoQWorkerTryRecycleWorker(owner, worker);
    if (!keepRunning) {
      uDebug("worker:%s:%d exited", owner->name, worker->id);
      break;
    }
  }

  DestoryThreadLocalRegComp();

  return NULL;
}
703

704
static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
30,419,515✔
705
  SQueryAutoQWorkerPool *pPool = p;
30,419,515✔
706
  bool                   ret = false;
30,419,515✔
707
  int32_t                waiting = pPool->waitingAfterBlockN;
30,419,515✔
708
  while (waiting > 0) {
30,419,515✔
709
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
59✔
710
    if (waitingNew == waiting) {
59!
711
      (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
59✔
712
      (void)taosThreadCondSignal(&pPool->waitingAfterBlockCond);
59✔
713
      (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
59✔
UNCOV
714
      ret = true;
×
UNCOV
715
      break;
×
716
    }
717
    waiting = waitingNew;
×
718
  }
719
  return ret;
30,419,113✔
720
}
721

722
static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
30,419,626✔
723
  SQueryAutoQWorkerPool *pPool = p;
30,419,626✔
724
  bool                   ret = false;
30,419,626✔
725
  int32_t                waiting = pPool->waitingBeforeProcessMsgN;
30,419,626✔
726
  while (waiting > 0) {
30,419,626✔
727
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
12✔
728
    if (waitingNew == waiting) {
12!
729
      (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
12✔
730
      (void)taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
12✔
731
      (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
12✔
UNCOV
732
      ret = true;
×
UNCOV
733
      break;
×
734
    }
735
    waiting = waitingNew;
×
736
  }
737
  return ret;
30,419,361✔
738
}
739

740
static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
30,419,601✔
741
  SQueryAutoQWorkerPool *pPool = p;
30,419,601✔
742
  bool                   ret = false;
30,419,601✔
743
  int64_t                val64 = pPool->activeRunningN;
30,419,601✔
744
  int32_t                active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
30,419,601✔
745
  while (active > minActive) {
30,421,741✔
746
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
4,342,577✔
747
      return true;
4,340,678✔
748
  }
749
  return false;
26,079,164✔
750
}
751

752
// Gate a worker before it processes an item.
//
// Fast path: while the running counter is below pPool->num, try to increment
// it with a compare-and-swap and return as soon as that succeeds.
// Slow path: when running has reached pPool->num, decrement the active
// counter instead, count this thread in waitingBeforeProcessMsgN and wait on
// waitingBeforeProcessMsgCond (skipped when the pool is exiting).
static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
  for (;;) {
    int64_t snapshot = pPool->activeRunningN;
    int32_t running = GET_RUNNING_N(snapshot);
    int32_t active = GET_ACTIVE_N(snapshot);

    // a failed swap refreshes running/active, so the bound is re-checked
    while (running < pPool->num) {
      if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
        return;
      }
    }

    // no running slot left: give up the active slot, or start over if the
    // packed counters changed underneath us
    if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
      break;
    }
  }

  // to wait for process
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
  (void)atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
  if (!pPool->exit) {
    (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
  }
  // recovered from waiting
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
}
773

774
// Decide what happens to a worker after it has finished an item.
//
// If no waiter could be signalled and the active count could not be lowered,
// the worker just gives up its running slot and keeps going (returns true).
// Otherwise the worker is taken off the workers list and either
//   - moved to exitedWorkers when its id is >= maxInUse (returns false), or
//   - parked on backupWorkers and blocked on backupCond; once woken it is put
//     back on the workers list (returns true), unless the pool is exiting
//     (returns false).
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
  if (!tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) && !tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) &&
      !tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
    return true;
  }

  (void)taosThreadMutexLock(&pPool->poolLock);
  SListNode *pSelf = listNode(pWorker);
  (void)tdListPopNode(pPool->workers, pSelf);

  if (pWorker->id >= pPool->maxInUse) {
    // reclaim some workers: join and release previously exited ones before queueing this one
    while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
      SListNode         *pHead = tdListPopHead(pPool->exitedWorkers);
      SQueryAutoQWorker *pExited = (SQueryAutoQWorker *)pHead->data;
      if (pExited && taosCheckPthreadValid(pExited->thread)) {
        (void)taosThreadJoin(pExited->thread, NULL);
        taosThreadClear(&pExited->thread);
      }
      taosMemoryFree(pHead);
    }
    tdListAppendNode(pPool->exitedWorkers, pSelf);
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    return false;
  }

  // park this worker in the backup pool
  tdListAppendNode(pPool->backupWorkers, pSelf);
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // block on the backup condition until somebody needs a worker
  (void)taosThreadMutexLock(&pPool->backupLock);
  (void)atomic_fetch_add_32(&pPool->backupNum, 1);
  if (!pPool->exit) {
    (void)taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
  }
  (void)taosThreadMutexUnlock(&pPool->backupLock);

  // back from backup: rejoin the workers list unless the pool is shutting down
  (void)taosThreadMutexLock(&pPool->poolLock);
  bool stillAlive = !pPool->exit;
  if (stillAlive) {
    (void)tdListPopNode(pPool->backupWorkers, pSelf);
    tdListAppendNode(pPool->workers, pSelf);
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  return stillAlive;
}
822

823
// Initialise an auto-scaling query worker pool.
//
// Sets up the pool's mutexes and condition variables, opens its queue set,
// creates the workers / backupWorkers / exitedWorkers lists, derives maxInUse
// from pool->max, and installs a default callback block when pool->pCb is not
// already set.
//
// Returns TSDB_CODE_SUCCESS on success. On failure returns the error from
// taosOpenQset (also stored in terrno) or the current terrno when an
// allocation fails.
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
  (void)taosThreadMutexInit(&pool->poolLock, NULL);
  (void)taosThreadMutexInit(&pool->backupLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);

  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
  (void)taosThreadCondInit(&pool->backupCond, NULL);

  int32_t code = taosOpenQset(&pool->qset);
  if (code != 0) {
    terrno = code;
    return code;
  }

  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->workers == NULL) {
    return terrno;
  }

  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->backupWorkers == NULL) {
    return terrno;
  }

  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->exitedWorkers == NULL) {
    return terrno;
  }

  pool->maxInUse = pool->max * 2 + 2;

  if (pool->pCb != NULL) {
    // caller supplied its own callback block; keep it
    return TSDB_CODE_SUCCESS;
  }

  pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
  if (pool->pCb == NULL) {
    return terrno;
  }
  pool->pCb->pPool = pool;
  pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
  pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;

  return TSDB_CODE_SUCCESS;
}
854

855
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
8,940✔
856
  (void)taosThreadMutexLock(&pPool->poolLock);
8,940✔
857
  pPool->exit = true;
8,940✔
858
  int32_t size = 0;
8,940✔
859
  if (pPool->workers) {
8,940!
860
    size = listNEles(pPool->workers);
8,940✔
861
  }
862
  if (pPool->backupWorkers) {
8,940!
863
    size += listNEles(pPool->backupWorkers);
8,940✔
864
  }
865
  if (pPool->qset) {
8,940!
866
    for (int32_t i = 0; i < size; ++i) {
585,611✔
867
      taosQsetThreadResume(pPool->qset);
576,671✔
868
    }
869
  }
870
  (void)taosThreadMutexUnlock(&pPool->poolLock);
8,940✔
871

872
  (void)taosThreadMutexLock(&pPool->backupLock);
8,940✔
873
  (void)taosThreadCondBroadcast(&pPool->backupCond);
8,940✔
874
  (void)taosThreadMutexUnlock(&pPool->backupLock);
8,940✔
875

876
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
8,940✔
877
  (void)taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
8,940✔
878
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
8,940✔
879

880
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
8,940✔
881
  (void)taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
8,940✔
882
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
8,940✔
883

884
  int32_t            idx = 0;
8,940✔
885
  SQueryAutoQWorker *worker = NULL;
8,940✔
886
  while (pPool->workers) {
583,819!
887
    (void)taosThreadMutexLock(&pPool->poolLock);
583,819✔
888
    if (listNEles(pPool->workers) == 0) {
583,819✔
889
      (void)taosThreadMutexUnlock(&pPool->poolLock);
8,940✔
890
      break;
8,940✔
891
    }
892
    SListNode *pNode = tdListPopHead(pPool->workers);
574,879✔
893
    worker = (SQueryAutoQWorker *)pNode->data;
574,879✔
894
    (void)taosThreadMutexUnlock(&pPool->poolLock);
574,879✔
895
    if (worker && taosCheckPthreadValid(worker->thread)) {
574,879!
896
      (void)taosThreadJoin(worker->thread, NULL);
574,879✔
897
      taosThreadClear(&worker->thread);
574,879✔
898
    }
899
    taosMemoryFree(pNode);
574,879✔
900
  }
901

902
  while (pPool->backupWorkers && listNEles(pPool->backupWorkers) > 0) {
10,732!
903
    SListNode *pNode = tdListPopHead(pPool->backupWorkers);
1,792✔
904
    worker = (SQueryAutoQWorker *)pNode->data;
1,792✔
905
    if (worker && taosCheckPthreadValid(worker->thread)) {
1,792!
906
      (void)taosThreadJoin(worker->thread, NULL);
1,792✔
907
      taosThreadClear(&worker->thread);
1,792✔
908
    }
909
    taosMemoryFree(pNode);
1,792✔
910
  }
911

912
  while (pPool->exitedWorkers && listNEles(pPool->exitedWorkers) > 0) {
8,950!
913
    SListNode *pNode = tdListPopHead(pPool->exitedWorkers);
10✔
914
    worker = (SQueryAutoQWorker *)pNode->data;
10✔
915
    if (worker && taosCheckPthreadValid(worker->thread)) {
10!
916
      (void)taosThreadJoin(worker->thread, NULL);
10✔
917
      taosThreadClear(&worker->thread);
10✔
918
    }
919
    taosMemoryFree(pNode);
10✔
920
  }
921

922
  pPool->workers = tdListFree(pPool->workers);
8,940✔
923
  pPool->backupWorkers = tdListFree(pPool->backupWorkers);
8,940✔
924
  pPool->exitedWorkers = tdListFree(pPool->exitedWorkers);
8,940✔
925
  taosMemoryFree(pPool->pCb);
8,940✔
926

927
  (void)taosThreadMutexDestroy(&pPool->poolLock);
8,940✔
928
  (void)taosThreadMutexDestroy(&pPool->backupLock);
8,940✔
929
  (void)taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
8,940✔
930
  (void)taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
8,940✔
931

932
  (void)taosThreadCondDestroy(&pPool->backupCond);
8,940✔
933
  (void)taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
8,940✔
934
  (void)taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
8,940✔
935
  taosCloseQset(pPool->qset);
8,940✔
936
}
8,940✔
937

938
// Allocate a queue served by the pool and make sure worker threads exist.
//
// Opens a new queue, attaches fp as its item handler, and adds it to the
// pool's queue set together with ahandle. While the pool has fewer than
// pool->max workers, threads are spawned until pool->num reaches pool->min
// (at least one thread is spawned per call in that case).
//
// Returns the new queue, or NULL with terrno set when the queue cannot be
// opened or registered, or when a worker cannot be created.
STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->poolLock);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    queue = NULL;
    (void)taosThreadMutexUnlock(&pool->poolLock);
    // report the failure reason the same way as the taosOpenQueue branch above
    terrno = code;
    return NULL;
  }
  SQueryAutoQWorker  worker = {0};
  SQueryAutoQWorker *pWorker = NULL;

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      worker.id = listNEles(pool->workers);
      worker.backupIdx = -1;
      worker.pool = pool;
      SListNode *pNode = tdListAdd(pool->workers, &worker);
      if (!pNode) {
        taosCloseQueue(queue);
        queue = NULL;
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }
      pWorker = (SQueryAutoQWorker *)pNode->data;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
        // release the attribute object on the failure path as well
        (void)taosThreadAttrDestroy(&thAttr);
        // the worker never started: drop its node so it is neither counted nor joined later
        (void)tdListPopNode(pool->workers, pNode);
        taosMemoryFree(pNode);
        pWorker = NULL;
        taosCloseQueue(queue);
        queue = NULL;
        // NOTE(review): mirrors the error code used by tQueryAutoQWorkerAddWorker for the same failure
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }

      (void)taosThreadAttrDestroy(&thAttr);
      pool->num++;
      (void)atomicFetchAddActive(&pool->activeRunningN, 1);
      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
    } while (pool->num < pool->min);
  }

  (void)taosThreadMutexUnlock(&pool->poolLock);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
995

996
// Release a queue previously handed out for this pool by closing it.
// The pool argument is not used by this function.
void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) {
  (void)pPool;
  taosCloseQueue(pQ);
}
15,693✔
997

998
// Bring one more worker into service.
//
// First tries to wake a worker parked in the backup pool by claiming one unit
// of backupNum and signalling backupCond. If no backup worker is available, a
// new worker node is appended to pool->workers and a joinable thread is
// started for it.
//
// Returns TSDB_CODE_SUCCESS on success, otherwise an error code that is also
// left in terrno.
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
  // try backup pool
  int32_t backup = pool->backupNum;
  while (backup > 0) {
    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
    if (backupNew == backup) {
      // Signal while holding backupLock: the parked worker increments
      // backupNum under this lock before it starts waiting, so signalling
      // without the lock could fire in that window and be lost.
      (void)taosThreadMutexLock(&pool->backupLock);
      (void)taosThreadCondSignal(&pool->backupCond);
      (void)taosThreadMutexUnlock(&pool->backupLock);
      return TSDB_CODE_SUCCESS;
    }
    backup = backupNew;
  }
  // backup pool is empty, create new
  SQueryAutoQWorker *pWorker = NULL;
  SQueryAutoQWorker  worker = {0};
  worker.pool = pool;
  worker.backupIdx = -1;
  (void)taosThreadMutexLock(&pool->poolLock);
  worker.id = listNEles(pool->workers);
  SListNode *pNode = tdListAdd(pool->workers, &worker);
  if (!pNode) {
    (void)taosThreadMutexUnlock(&pool->poolLock);
    return terrno;
  }
  (void)taosThreadMutexUnlock(&pool->poolLock);
  pWorker = (SQueryAutoQWorker *)pNode->data;

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
    // release the attribute object on the failure path as well
    (void)taosThreadAttrDestroy(&thAttr);
    // the worker never started: take its node back out of the list
    (void)taosThreadMutexLock(&pool->poolLock);
    (void)tdListPopNode(pool->workers, pNode);
    (void)taosThreadMutexUnlock(&pool->poolLock);
    taosMemoryFree(pNode);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }
  (void)taosThreadAttrDestroy(&thAttr);

  return TSDB_CODE_SUCCESS;
}
1036

1037
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
4,340,465✔
1038
  SQueryAutoQWorkerPool *pPool = p;
4,340,465✔
1039
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) ||
8,681,062!
1040
      tQueryAutoQWorkerTryDecActive(p, pPool->num)) {
4,340,480✔
1041
  } else {
1042
    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
2,115,106✔
1043
    if (code != TSDB_CODE_SUCCESS) {
2,115,166!
1044
      return code;
×
1045
    }
1046
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
2,115,166✔
1047
  }
1048

1049
  return TSDB_CODE_SUCCESS;
4,340,737✔
1050
}
1051

1052
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
4,340,628✔
1053
  SQueryAutoQWorkerPool *pPool = p;
4,340,628✔
1054
  int64_t                val64 = pPool->activeRunningN;
4,340,628✔
1055
  int32_t                running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
4,340,628✔
1056
  while (running < pPool->num) {
4,345,333✔
1057
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
4,345,320✔
1058
      return TSDB_CODE_SUCCESS;
4,340,688✔
1059
    }
1060
  }
1061
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
13✔
1062
  (void)atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
59✔
1063
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
59!
1064
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
59✔
1065
  if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
59!
1066
  return TSDB_CODE_SUCCESS;
59✔
1067
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc