• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4892

20 Dec 2025 01:15PM UTC coverage: 65.571% (+0.02%) from 65.549%
#4892

push

travis-ci

web-flow
feat: support taos_connect_with func (#33952)

* feat: support taos_connect_with

* refactor: enhance connection options and add tests for taos_set_option and taos_connect_with

* fix: handle NULL keys and values in taos_connect_with options

* fix: revert TAOSWS_GIT_TAG to default value "main"

* docs: add TLS configuration options for WebSocket connections in documentation

* docs: modify zh docs and add en docs

* chore: update taos.cfg

* docs: add examples

* docs: add error handling for connection failure in example code

2 of 82 new or added lines in 3 files covered. (2.44%)

527 existing lines in 120 files now uncovered.

182859 of 278870 relevant lines covered (65.57%)

104634355.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.65
/source/util/src/tworker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tworker.h"
18
#include "taoserror.h"
19
#include "tcompare.h"
20
#include "tgeosctx.h"
21
#include "tlog.h"
22
#include "ttrace.h"
23
#include "tcurl.h"
24

25
#define QUEUE_THRESHOLD (1000 * 1000)
26

27
typedef void *(*ThreadFp)(void *param);
28

29
/*
 * Prepare a queue worker pool: open its queue set, allocate pool->max zeroed
 * worker slots, create the pool mutex and stamp each slot with its index and
 * owning pool. No thread is started by this function.
 *
 * Returns 0 on success, the taosOpenQset() code if the queue set cannot be
 * opened, or terrno if the slot array cannot be allocated.
 */
int32_t tQWorkerInit(SQWorkerPool *pool) {
  int32_t rc = taosOpenQset(&pool->qset);
  if (rc != 0) {
    return rc;
  }

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SQueueWorker));
  if (!pool->workers) {
    // undo the queue set opened above before reporting the failure
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  SQueueWorker *slot = pool->workers;
  for (int32_t idx = 0; idx < pool->max; ++idx, ++slot) {
    slot->id = idx;
    slot->pool = pool;
  }

  uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
  return 0;
}
50

51
/*
 * Stop and release a queue worker pool.
 *
 * Pass 1 calls taosQsetThreadResume() once per slot whose thread handle is
 * still valid (presumably to wake readers blocked on the queue set - confirm
 * against tqueue). Pass 2 joins those threads and clears their handles.
 * Finally the slot array, the queue set and the pool mutex are released.
 */
void tQWorkerCleanup(SQWorkerPool *pool) {
  int32_t i = 0;
  while (i < pool->max) {
    if (taosCheckPthreadValid(pool->workers[i].thread)) {
      taosQsetThreadResume(pool->qset);
    }
    ++i;
  }

  i = 0;
  while (i < pool->max) {
    SQueueWorker *slot = &pool->workers[i];
    ++i;
    if (!taosCheckPthreadValid(slot->thread)) {
      continue;
    }
    uInfo("worker:%s:%d is stopping", pool->name, slot->id);
    (void)taosThreadJoin(slot->thread, NULL);
    taosThreadClear(&slot->thread);
    uInfo("worker:%s:%d is stopped", pool->name, slot->id);
  }

  taosMemoryFreeClear(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
5,660,240✔
75

76
/*
 * Thread body for one SQWorkerPool worker.
 *
 * Reads one item at a time from the pool's queue set and hands it to the
 * item callback recorded in qinfo.fp. The loop ends when
 * taosReadQitemFromQset() returns 0. A warning is logged when an item's
 * queueing time exceeds QUEUE_THRESHOLD microseconds.
 *
 * Always returns NULL.
 */
static void *tQWorkerThreadFp(SQueueWorker *worker) {
  SQWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};
  void         *msg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  for (;;) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      int64_t waitedUs = taosGetTimestampUs() - qinfo.timestamp;
      if (waitedUs > QUEUE_THRESHOLD) {
        // QUEUE_THRESHOLD is 1e6 us, so the division yields whole seconds
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name,
              waitedUs / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      FItem handler = (FItem)qinfo.fp;
      (*handler)(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  DestoryThreadLocalRegComp();

  return NULL;
}
118

119
/*
 * Create a queue served by `pool`, register it in the pool's queue set with
 * `ahandle`, and make sure worker threads exist to drain it.
 *
 * If the pool has fewer than pool->max threads, at least one new thread is
 * started, and more are started until pool->num reaches pool->min.
 *
 * Returns the new queue, or NULL with terrno set when the queue cannot be
 * opened, cannot be added to the queue set, or a thread cannot be created.
 * Threads started before a failure keep running.
 */
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = NULL;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      SQueueWorker *worker = pool->workers + pool->num;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker);
      // the attr object is only needed for the create call; release it on
      // both the success and the failure path
      (void)taosThreadAttrDestroy(&thAttr);

      if (ret != 0) {
        uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, pool->num);
        // reset the handle so tQWorkerCleanup() does not try to join a thread
        // that never started (assumes taosThreadClear marks it invalid, as it
        // is used after taosThreadJoin elsewhere in this file)
        taosThreadClear(&worker->thread);
        taosCloseQueue(queue);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        queue = NULL;
        break;
      }

      pool->num++;
      uInfo("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num);
    } while (pool->num < pool->min);
  }

  (void)taosThreadMutexUnlock(&pool->mutex);
  if (queue != NULL) {
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);
  }

  return queue;
}
166

167
/*
 * Log the release of `queue` under the pool's name, then close the queue.
 * The pool itself is only used for the log line.
 */
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
5,660,240✔
171

172
/*
 * Prepare an auto-sizing queue worker pool: open the queue set, create an
 * empty array of SQueueWorker pointers and initialise the pool mutex.
 *
 * Returns 0 on success. On failure returns the taosOpenQset() code (also
 * stored in terrno) or terrno from the array allocation.
 */
int32_t tAutoQWorkerInit(SAutoQWorkerPool *pool) {
  int32_t rc = taosOpenQset(&pool->qset);
  if (rc != 0) {
    return terrno = rc;
  }

  pool->workers = taosArrayInit(2, sizeof(SQueueWorker *));
  if (!pool->workers) {
    // release the queue set opened above
    taosCloseQset(pool->qset);
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  uInfo("worker:%s is initialized as auto", pool->name);
  return 0;
}
191

192
/*
 * Stop and release an auto-sizing queue worker pool.
 *
 * Calls taosQsetThreadResume() once per worker with a valid thread handle,
 * then joins each such thread and frees every worker record, and finally
 * destroys the worker array, the queue set and the pool mutex.
 */
void tAutoQWorkerCleanup(SAutoQWorkerPool *pool) {
  int32_t total = taosArrayGetSize(pool->workers);

  for (int32_t k = 0; k < total; ++k) {
    SQueueWorker *w = taosArrayGetP(pool->workers, k);
    if (!taosCheckPthreadValid(w->thread)) {
      continue;
    }
    taosQsetThreadResume(pool->qset);
  }

  for (int32_t k = 0; k < total; ++k) {
    SQueueWorker *w = taosArrayGetP(pool->workers, k);
    if (taosCheckPthreadValid(w->thread)) {
      uInfo("worker:%s:%d is stopping", pool->name, w->id);
      (void)taosThreadJoin(w->thread, NULL);
      taosThreadClear(&w->thread);
      uInfo("worker:%s:%d is stopped", pool->name, w->id);
    }
    // each record was heap-allocated individually, whether or not its thread ran
    taosMemoryFree(w);
  }

  taosArrayDestroy(pool->workers);
  taosCloseQset(pool->qset);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
×
218

219
/*
 * Thread body for one SAutoQWorkerPool worker.
 *
 * Reads one item at a time from the pool's queue set and passes it to the
 * callback in qinfo.fp, reporting the current size of the worker array as
 * threadNum. The loop ends when taosReadQitemFromQset() returns 0; the
 * thread then runs its thread-local teardown calls and returns NULL.
 */
static void *tAutoQWorkerThreadFp(SQueueWorker *worker) {
  SAutoQWorkerPool *pool = worker->pool;
  SQueueInfo        qinfo = {0};
  void             *msg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  for (;;) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      int64_t waitedUs = taosGetTimestampUs() - qinfo.timestamp;
      if (waitedUs > QUEUE_THRESHOLD) {
        // QUEUE_THRESHOLD is 1e6 us, so the division yields whole seconds
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name,
              waitedUs / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = taosArrayGetSize(pool->workers);
      FItem handler = (FItem)qinfo.fp;
      (*handler)(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);
  }

  DestoryThreadLocalRegComp();
  closeThreadNotificationConn();

  return NULL;
}
261

262
/*
 * Create a queue served by the auto-sizing pool and grow the pool to match.
 *
 * The queue is added to the pool's queue set with `ahandle`. The target
 * worker count is ceil(queue count * pool->ratio), raised to `minNum` if
 * smaller; worker threads are created until the pool has that many.
 *
 * Returns the new queue, or NULL with terrno set on any failure. Every exit
 * path releases pool->mutex. Workers launched before a failure stay in the
 * pool.
 */
STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem fp, int32_t minNum) {
  STaosQueue *queue = NULL;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->mutex);
  taosSetQueueFp(queue, fp, NULL);

  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  int32_t queueNum = taosGetQueueNumber(pool->qset);
  int32_t curWorkerNum = taosArrayGetSize(pool->workers);
  int32_t dstWorkerNum = ceilf(queueNum * pool->ratio);

  if (dstWorkerNum < minNum) {
    dstWorkerNum = minNum;
  }

  // spawn a thread to process queue
  while (curWorkerNum < dstWorkerNum) {
    SQueueWorker *worker = taosMemoryCalloc(1, sizeof(SQueueWorker));
    if (worker == NULL || taosArrayPush(pool->workers, &worker) == NULL) {
      uError("worker:%s:%d failed to create", pool->name, curWorkerNum);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      (void)taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
    worker->id = curWorkerNum;
    worker->pool = pool;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    int32_t ret = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tAutoQWorkerThreadFp, worker);
    // the attr object is only needed for the create call; release it on both paths
    (void)taosThreadAttrDestroy(&thAttr);

    if (ret != 0) {
      uError("worker:%s:%d failed to create thread, total:%d", pool->name, worker->id, curWorkerNum);
      // drop the record pushed above so cleanup never sees a worker without a thread
      (void)taosArrayPop(pool->workers);
      taosMemoryFree(worker);
      taosCloseQueue(queue);
      // the mutex must be released here too, otherwise the next caller deadlocks
      (void)taosThreadMutexUnlock(&pool->mutex);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    int32_t numOfThreads = taosArrayGetSize(pool->workers);
    uInfo("worker:%s:%d is launched, total:%d, expect:%d", pool->name, worker->id, numOfThreads, dstWorkerNum);

    curWorkerNum++;
  }

  (void)taosThreadMutexUnlock(&pool->mutex);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
330

331
/*
 * Log the release of `queue` under the pool's name, then close the queue.
 * The pool itself is only used for the log line.
 */
void tAutoQWorkerFreeQueue(SAutoQWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
×
335

336
/*
 * Prepare a write worker pool: reset the round-robin cursor, allocate
 * pool->max zeroed worker slots, create the pool mutex and give every slot
 * its index, owning pool and empty qall/qset pointers. Queue sets and
 * threads are not created here.
 *
 * Returns 0 on success or terrno if the slot array cannot be allocated.
 */
int32_t tWWorkerInit(SWWorkerPool *pool) {
  pool->nextId = 0;

  pool->workers = taosMemoryCalloc(pool->max, sizeof(SWWorker));
  if (!pool->workers) {
    return terrno;
  }

  (void)taosThreadMutexInit(&pool->mutex, NULL);

  SWWorker *slot = pool->workers;
  for (int32_t idx = 0; idx < pool->max; ++idx, ++slot) {
    slot->id = idx;
    slot->qall = NULL;
    slot->qset = NULL;
    slot->pool = pool;
  }

  uInfo("worker:%s is initialized, max:%d", pool->name, pool->max);
  return 0;
}
356

357
/*
 * Stop and release a write worker pool.
 *
 * Pass 1 calls taosQsetThreadResume() on the private queue set of every
 * worker that has both a valid thread handle and a queue set. Pass 2 joins
 * each valid thread and then frees that worker's qall and queue set. The
 * slot array and the pool mutex are released last.
 */
void tWWorkerCleanup(SWWorkerPool *pool) {
  for (int32_t k = 0; k < pool->max; ++k) {
    SWWorker *w = &pool->workers[k];
    if (taosCheckPthreadValid(w->thread) && w->qset) {
      taosQsetThreadResume(w->qset);
    }
  }

  for (int32_t k = 0; k < pool->max; ++k) {
    SWWorker *w = &pool->workers[k];
    if (!taosCheckPthreadValid(w->thread)) {
      continue;
    }
    uInfo("worker:%s:%d is stopping", pool->name, w->id);
    (void)taosThreadJoin(w->thread, NULL);
    taosThreadClear(&w->thread);
    // the thread has exited, so its batch buffer and queue set can go
    taosFreeQall(w->qall);
    taosCloseQset(w->qset);
    uInfo("worker:%s:%d is stopped", pool->name, w->id);
  }

  taosMemoryFreeClear(pool->workers);
  (void)taosThreadMutexDestroy(&pool->mutex);

  uInfo("worker:%s is closed", pool->name);
}
17,953,710✔
384

385
/*
 * Thread body for one SWWorkerPool worker.
 *
 * Each iteration drains the worker's own queue set into worker->qall and
 * passes the whole batch to the batch callback in qinfo.fp. The loop ends
 * when taosReadAllQitemsFromQset() returns 0. A warning is logged when the
 * batch's queueing time exceeds QUEUE_THRESHOLD microseconds.
 *
 * Always returns NULL.
 */
static void *tWWorkerThreadFp(SWWorker *worker) {
  SWWorkerPool *pool = worker->pool;
  SQueueInfo    qinfo = {0};

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  for (;;) {
    int32_t batch = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
    if (batch == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, worker->qset,
            worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      int64_t waitedUs = taosGetTimestampUs() - qinfo.timestamp;
      if (waitedUs > QUEUE_THRESHOLD) {
        // QUEUE_THRESHOLD is 1e6 us, so the division yields whole seconds
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name,
              waitedUs / QUEUE_THRESHOLD);
      }
    }

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      FItems handler = (FItems)qinfo.fp;
      (*handler)(&qinfo, worker->qall, batch);
    }
    taosUpdateItemSize(qinfo.queue, batch);
  }

  return NULL;
}
426

427
/*
 * Create a queue and bind it to the worker slot at pool->nextId.
 *
 * If that slot has no queue set yet, this call also opens the slot's queue
 * set, allocates its qall and starts its thread. Otherwise the queue is just
 * added to the slot's existing queue set. On success pool->nextId advances
 * round-robin and the call waits until the worker thread has published its
 * thread id, which is then recorded on the queue.
 *
 * Returns the queue, or NULL with terrno set on failure. On failure only the
 * resources created by this call are released: a queue set that a running
 * worker thread is already reading from is never closed here.
 */
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
  STaosQueue *queue = NULL;
  bool        newWorker = false;  // true when this call sets up the slot's qset/qall/thread
  int32_t     code = 0;

  (void)taosThreadMutexLock(&pool->mutex);
  SWWorker *worker = pool->workers + pool->nextId;

  code = taosOpenQueue(&queue);
  if (code) goto _OVER;

  taosSetQueueFp(queue, NULL, fp);
  if (worker->qset == NULL) {
    newWorker = true;

    code = taosOpenQset(&worker->qset);
    if (code) goto _OVER;

    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    code = taosAllocateQall(&worker->qall);
    if (code) goto _OVER;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
    code = taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tWWorkerThreadFp, worker);
    // the attr object is only needed for the create call; release it on both paths
    (void)taosThreadAttrDestroy(&thAttr);
    if (code) {
      // reset the handle so tWWorkerCleanup() does not try to join a thread
      // that never started (assumes taosThreadClear marks it invalid, as it
      // is used after taosThreadJoin elsewhere in this file)
      taosThreadClear(&worker->thread);
      goto _OVER;
    }

    uInfo("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max);
    pool->nextId = (pool->nextId + 1) % pool->max;

    pool->num++;
    if (pool->num > pool->max) pool->num = pool->max;
  } else {
    code = taosAddIntoQset(worker->qset, queue, ahandle);
    if (code) goto _OVER;
    pool->nextId = (pool->nextId + 1) % pool->max;
  }

_OVER:
  if (code) {
    // roll back while still holding the mutex: the slot fields are pool state
    if (queue != NULL) taosCloseQueue(queue);
    if (newWorker) {
      // only undo what this call created, and leave the slot reusable
      if (worker->qset != NULL) {
        taosCloseQset(worker->qset);
        worker->qset = NULL;
      }
      if (worker->qall != NULL) {
        taosFreeQall(worker->qall);
        worker->qall = NULL;
      }
    }
    (void)taosThreadMutexUnlock(&pool->mutex);
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexUnlock(&pool->mutex);

  // the worker thread stores its id in worker->pid when it starts; poll until it has
  while (worker->pid <= 0) taosMsleep(10);

  taosQueueSetThreadId(queue, worker->pid);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p thread:%08" PRId64, pool->name, queue, ahandle, worker->pid);
  return queue;
}
481

482
/*
 * Log the release of `queue` under the pool's name, then close the queue.
 * The pool itself is only used for the log line.
 */
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
  uInfo("worker:%s, queue:%p is freed", pool->name, queue);

  taosCloseQueue(queue);
}
21,437,644✔
486

487
/*
 * Build a single-queue worker from `pCfg`.
 *
 * Copies the pool type, name and stopNoWaitQueue flag into `pWorker`,
 * allocates and initialises a pool of the requested type (QWORKER_POOL or
 * QUERY_AUTO_QWORKER_POOL) and allocates the worker's one queue from it.
 *
 * Returns 0 on success, terrno or the pool-init code on failure, and
 * TSDB_CODE_INVALID_PARA for an unknown pool type. tSingleWorkerCleanup()
 * does nothing while pWorker->queue is NULL, so a pool that fails to
 * initialise has to be released here.
 */
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
  int32_t code;
  pWorker->poolType = pCfg->poolType;
  pWorker->name = pCfg->name;
  pWorker->stopNoWaitQueue = pCfg->stopNoWaitQueue;

  switch (pCfg->poolType) {
    case QWORKER_POOL: {
      SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pWorker->pool = pPool;
      if ((code = tQWorkerInit(pPool))) {
        // tQWorkerInit() releases whatever it acquired before failing, so the
        // struct itself is the only thing left to free
        pWorker->pool = NULL;
        taosMemoryFree(pPool);
        return (terrno = code);
      }

      pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (pWorker->queue == NULL) {
        // NOTE(review): the initialised pool is not torn down here because
        // worker threads may already be running; confirm who owns that cleanup
        return terrno;
      }
    } break;
    case QUERY_AUTO_QWORKER_POOL: {
      SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool));
      if (!pPool) {
        return terrno;
      }
      pPool->name = pCfg->name;
      pPool->min = pCfg->min;
      pPool->max = pCfg->max;
      pPool->stopNoWaitQueue = pCfg->stopNoWaitQueue;
      pWorker->pool = pPool;

      code = tQueryAutoQWorkerInit(pPool);
      // NOTE(review): pPool is left allocated on failure; what
      // tQueryAutoQWorkerInit() has set up at that point is not visible here
      if (code) return code;

      pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
      if (!pWorker->queue) {
        return terrno;
      }
    } break;
    default:
      return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
536

537
/*
 * Tear down a worker built by tSingleWorkerInit().
 *
 * Does nothing if the worker has no queue. Unless stopNoWaitQueue is set,
 * first polls every 10 ms until the queue is empty. Then stops the pool,
 * frees the queue and frees the pool struct, using the routines that match
 * the pool type. The pool and queue pointers are reset afterwards so a
 * repeated call is a no-op rather than a use-after-free.
 */
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
  if (pWorker->queue == NULL) return;
  if (!pWorker->stopNoWaitQueue) {
    while (!taosQueueEmpty(pWorker->queue)) {
      taosMsleep(10);
    }
  }

  switch (pWorker->poolType) {
    case QWORKER_POOL:
      tQWorkerCleanup(pWorker->pool);
      tQWorkerFreeQueue(pWorker->pool, pWorker->queue);
      taosMemoryFree(pWorker->pool);
      pWorker->pool = NULL;
      pWorker->queue = NULL;
      break;
    case QUERY_AUTO_QWORKER_POOL:
      tQueryAutoQWorkerCleanup(pWorker->pool);
      tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue);
      taosMemoryFree(pWorker->pool);
      pWorker->pool = NULL;
      pWorker->queue = NULL;
      break;
    default:
      // unknown pool type: nothing was released, so leave the pointers alone
      break;
  }
}
560

561
/*
 * Build a multi-item worker from `pCfg`: configure and initialise the
 * embedded write worker pool, then allocate the worker's queue from it.
 * pWorker->name is only set once both steps have succeeded.
 *
 * Returns 0 on success, the tWWorkerInit() code if pool setup fails, or
 * terrno if the queue cannot be allocated.
 */
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
  SWWorkerPool *writePool = &pWorker->pool;
  writePool->name = pCfg->name;
  writePool->max = pCfg->max;

  int32_t rc = tWWorkerInit(writePool);
  if (rc != 0) {
    return rc;
  }

  pWorker->queue = tWWorkerAllocQueue(writePool, pCfg->param, pCfg->fp);
  if (!pWorker->queue) {
    return terrno;
  }

  pWorker->name = pCfg->name;
  return 0;
}
577

578
/*
 * Tear down a worker built by tMultiWorkerInit().
 *
 * Does nothing if the worker has no queue. Otherwise polls every 10 ms until
 * the queue is empty, stops the embedded pool and frees the queue. The queue
 * pointer is reset afterwards so a repeated call is a no-op rather than a
 * use-after-free.
 */
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
  if (pWorker->queue == NULL) return;

  while (!taosQueueEmpty(pWorker->queue)) {
    taosMsleep(10);
  }

  tWWorkerCleanup(&pWorker->pool);
  tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
  pWorker->queue = NULL;
}
588

589
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool);
590
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p);
591
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
592
static void    tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool);
593
static bool    tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker);
594

595
// Two 32-bit counters share one int64_t: the upper 32 bits hold the "active"
// count and the lower 32 bits hold the "running" count. Arguments and
// expansions are fully parenthesized so any expression can be passed safely.
#define GET_ACTIVE_N(int64_val)  ((int32_t)((int64_val) >> 32))
#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))
597

598
/*
 * Atomically subtract `val` from the active count (upper 32 bits of *ptr).
 * Returns the active count extracted from the value atomic_fetch_sub_64()
 * hands back (presumably the value before the subtraction, as the name
 * suggests - confirm against the atomic wrapper).
 */
static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
  int64_t delta = val;
  delta <<= 32;  // move the operand into the active half
  int64_t fetched = atomic_fetch_sub_64(ptr, delta);
  return GET_ACTIVE_N(fetched);
}
604

605
/*
 * Atomically subtract `val` from the running count (lower 32 bits of *ptr)
 * and return the running count extracted from the fetched value.
 */
static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_sub_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
1,673,977,848✔
606

607
/*
 * Atomically add `val` to the active count (upper 32 bits of *ptr).
 * Returns the active count extracted from the value atomic_fetch_add_64()
 * hands back (presumably the value before the addition, as the name
 * suggests - confirm against the atomic wrapper).
 */
static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
  int64_t delta = val;
  delta <<= 32;  // move the operand into the active half
  int64_t fetched = atomic_fetch_add_64(ptr, delta);
  return GET_ACTIVE_N(fetched);
}
613

614
/*
 * Atomically add `val` to the running count (lower 32 bits of *ptr) and
 * return the running count extracted from the fetched value.
 */
static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) {
  int64_t fetched = atomic_fetch_add_64(ptr, val);
  return GET_RUNNING_N(fetched);
}
×
615

616
/*
 * Try to change only the active count from *expectedVal to newVal.
 *
 * The running half used on both sides of the compare-exchange is a plain
 * read of *ptr taken just before the exchange, so the swap also fails if the
 * running count changes in between.
 *
 * Returns true if the swap happened. Otherwise stores the active count that
 * was actually observed into *expectedVal and returns false.
 */
static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  int32_t runningNow = GET_RUNNING_N(*ptr);
  int64_t expected64 = ((int64_t)*expectedVal << 32) | runningNow;
  int64_t desired64 = ((int64_t)newVal << 32) | runningNow;

  int64_t seen64 = atomic_val_compare_exchange_64(ptr, expected64, desired64);
  if (seen64 != expected64) {
    *expectedVal = GET_ACTIVE_N(seen64);
    return false;
  }
  return true;
}
631

632
/*
 * Try to change only the running count from *expectedVal to newVal.
 *
 * The active half used on both sides of the compare-exchange is a plain read
 * of *ptr taken just before the exchange, so the swap also fails if the
 * active count changes in between.
 *
 * Returns true (as int64_t) if the swap happened. Otherwise stores the
 * running count that was actually observed into *expectedVal and returns
 * false.
 */
static int64_t atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
  int64_t activeBits = (int64_t)GET_ACTIVE_N(*ptr) << 32;
  int64_t expected64 = (int64_t)*expectedVal | activeBits;
  int64_t desired64 = (int64_t)newVal | activeBits;

  int64_t seen64 = atomic_val_compare_exchange_64(ptr, expected64, desired64);
  if (seen64 != expected64) {
    *expectedVal = GET_RUNNING_N(seen64);
    return false;
  }
  return true;
}
646

647
/*
 * Try to change both counters in one compare-exchange: active from
 * *expectedActive to newActive and running from *expectedRunning to
 * newRunning.
 *
 * Returns true (as int64_t) if the swap happened. Otherwise refreshes both
 * expected values from the word that was actually observed and returns
 * false, so the caller can retry.
 */
static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
                                                     int32_t *expectedRunning, int32_t newRunning) {
  int64_t expected64 = ((int64_t)*expectedActive << 32) | *expectedRunning;
  int64_t desired64 = ((int64_t)newActive << 32) | newRunning;

  int64_t seen64 = atomic_val_compare_exchange_64(ptr, expected64, desired64);
  if (seen64 != expected64) {
    *expectedActive = GET_ACTIVE_N(seen64);
    *expectedRunning = GET_RUNNING_N(seen64);
    return false;
  }
  return true;
}
663

664
/*
 * Thread body for one SQueryAutoQWorkerPool worker.
 *
 * Per iteration: read one item from the pool's queue set, pass through
 * tQueryAutoQWorkerWaitingCheck(), invoke the item callback with the pool's
 * callback table attached, then ask tQueryAutoQWorkerTryRecycleWorker()
 * whether to keep running.
 *
 * The loop ends when the read returns 0, when pool->exit is set, or when the
 * recycle call returns false. Always returns NULL.
 */
static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
  SQueryAutoQWorkerPool *pool = worker->pool;
  SQueueInfo             qinfo = {0};
  void                  *msg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pool->name, worker->id);
  }

  setThreadName(pool->name);
  worker->pid = taosGetSelfPthreadId();
  uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid);

  for (;;) {
    if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%08" PRId64, pool->name, worker->id, pool->qset,
            worker->pid);
      break;
    }

    if (pool->exit) {
      uInfo("worker:%s:%d exit, thread:%08" PRId64, pool->name, worker->id, worker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      int64_t waitedUs = taosGetTimestampUs() - qinfo.timestamp;
      if (waitedUs > QUEUE_THRESHOLD) {
        // QUEUE_THRESHOLD is 1e6 us, so the division yields whole seconds
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pool->name,
              waitedUs / QUEUE_THRESHOLD);
      }
    }

    tQueryAutoQWorkerWaitingCheck(pool);

    if (qinfo.fp != NULL) {
      qinfo.workerId = worker->id;
      qinfo.threadNum = pool->num;
      qinfo.workerCb = pool->pCb;
      FItem handler = (FItem)qinfo.fp;
      (*handler)(&qinfo, msg);
    }

    taosUpdateItemSize(qinfo.queue, 1);

    bool keepRunning = tQueryAutoQWorkerTryRecycleWorker(pool, worker);
    if (!keepRunning) {
      uDebug("worker:%s:%d exited", pool->name, worker->id);
      break;
    }
  }

  DestoryThreadLocalRegComp();
  closeThreadNotificationConn();

  return NULL;
}
719

720
static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
1,775,011,421✔
721
  SQueryAutoQWorkerPool *pPool = p;
1,775,011,421✔
722
  bool                   ret = false;
1,775,011,421✔
723
  int32_t                waiting = pPool->waitingAfterBlockN;
1,775,011,421✔
724
  while (waiting > 0) {
1,775,016,753✔
UNCOV
725
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
×
UNCOV
726
    if (waitingNew == waiting) {
×
UNCOV
727
      (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
×
UNCOV
728
      (void)taosThreadCondSignal(&pPool->waitingAfterBlockCond);
×
UNCOV
729
      (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
×
730
      ret = true;
76✔
731
      break;
76✔
732
    }
733
    waiting = waitingNew;
×
734
  }
735
  return ret;
1,775,016,167✔
736
}
737

738
static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void *p) {
1,775,018,215✔
739
  SQueryAutoQWorkerPool *pPool = p;
1,775,018,215✔
740
  bool                   ret = false;
1,775,018,215✔
741
  int32_t                waiting = pPool->waitingBeforeProcessMsgN;
1,775,018,215✔
742
  while (waiting > 0) {
1,775,021,226✔
743
    int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
186✔
744
    if (waitingNew == waiting) {
186✔
745
      (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
186✔
746
      (void)taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
186✔
747
      (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
186✔
748
      ret = true;
16✔
749
      break;
16✔
750
    }
751
    waiting = waitingNew;
×
752
  }
753
  return ret;
1,775,013,952✔
754
}
755

756
static bool tQueryAutoQWorkerTryDecActive(void *p, int32_t minActive) {
1,775,005,442✔
757
  SQueryAutoQWorkerPool *pPool = p;
1,775,005,442✔
758
  bool                   ret = false;
1,775,005,442✔
759
  int64_t                val64 = pPool->activeRunningN;
1,775,005,442✔
760
  int32_t                active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
1,775,015,371✔
761
  while (active > minActive) {
1,775,013,216✔
762
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
101,021,674✔
763
      return true;
101,018,172✔
764
  }
765
  return false;
1,673,991,542✔
766
}
767

768
/*
 * Gate executed by a worker before it processes a message.
 *
 * While the running count is below pPool->num the worker tries to bump the
 * running count and, on success, returns immediately. Otherwise it tries to
 * decrement the active count and, once that succeeds, parks on
 * waitingBeforeProcessMsgCond until signalled (the wait is skipped when
 * pPool->exit is already set).
 */
static void tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool *pPool) {
  bool mustPark = false;

  while (!mustPark) {
    int64_t packed = pPool->activeRunningN;
    int32_t nRunning = GET_RUNNING_N(packed);
    int32_t nActive = GET_ACTIVE_N(packed);

    while (nRunning < pPool->num) {
      if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &nActive, nActive, &nRunning, nRunning + 1)) {
        return;
      }
    }

    // no running slot available: give up our active slot, then go to sleep
    mustPark = atomicCompareExchangeActive(&pPool->activeRunningN, &nActive, nActive - 1);
  }

  // to wait for process
  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
  (void)atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
  if (!pPool->exit) {
    (void)taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock);
  }
  // recovered from waiting
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
}
789

790
/*
 * Called by a worker thread to decide what to do with itself.
 *
 * If another waiter could be signalled, or the active/running counters could be
 * decremented, the calling worker is taken out of pPool->workers and either
 *   - appended to pPool->exitedWorkers (its id is >= pPool->maxInUse), or
 *   - appended to pPool->backupWorkers and parked on backupCond until signalled,
 *     after which it is moved back to pPool->workers.
 * Otherwise only the running counter is decremented.
 *
 * Returns false when pPool->exit is observed or when the worker was moved to
 * the exited list; returns true in every other case.
 */
bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool *pPool, SQueryAutoQWorker *pWorker) {
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(pPool) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(pPool) ||
      tQueryAutoQWorkerTryDecActive(pPool, pPool->num)) {
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (pPool->exit) {
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      return false;
    }

    SListNode *pNode = listNode(pWorker);
    (void)tdListPopNode(pPool->workers, pNode);

    // reclaim some workers
    if (pWorker->id >= pPool->maxInUse) {
      while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) {
        SListNode *head = tdListPopHead(pPool->exitedWorkers);
        if (head == NULL) break;  // list unexpectedly empty; nothing left to reclaim

        // distinct name: must not shadow the pWorker parameter of this function
        SQueryAutoQWorker *pExited = (SQueryAutoQWorker *)head->data;
        if (pExited && taosCheckPthreadValid(pExited->thread)) {
          (void)taosThreadJoin(pExited->thread, NULL);
          taosThreadClear(&pExited->thread);
        }
        taosMemoryFree(head);
      }
      tdListAppendNode(pPool->exitedWorkers, pNode);
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      return false;
    }

    // put back to backup pool
    tdListAppendNode(pPool->backupWorkers, pNode);
    (void)taosThreadMutexUnlock(&pPool->poolLock);

    // start to wait at backup cond; backupNum is bumped while backupLock is held
    (void)taosThreadMutexLock(&pPool->backupLock);
    (void)atomic_fetch_add_32(&pPool->backupNum, 1);
    if (!pPool->exit) (void)taosThreadCondWait(&pPool->backupCond, &pPool->backupLock);
    (void)taosThreadMutexUnlock(&pPool->backupLock);

    // recovered from backup
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (pPool->exit) {
      // the node stays in backupWorkers in this case
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      return false;
    }
    (void)tdListPopNode(pPool->backupWorkers, pNode);
    tdListAppendNode(pPool->workers, pNode);
    (void)taosThreadMutexUnlock(&pPool->poolLock);

    return true;
  } else {
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
    return true;
  }
}
843

844
/*
 * Initialize a query auto-worker pool.
 *
 * Sets up the pool's mutexes and condition variables, opens its queue set,
 * creates the three worker lists (workers / backupWorkers / exitedWorkers),
 * derives maxInUse from pool->max, and allocates the callback block when the
 * caller has not supplied one in pool->pCb.
 *
 * Returns TSDB_CODE_SUCCESS, the code reported by taosOpenQset (also stored in
 * terrno), or terrno when a list or the callback block cannot be created.
 */
int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
  pool->exit = false;

  (void)taosThreadMutexInit(&pool->poolLock, NULL);
  (void)taosThreadMutexInit(&pool->backupLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
  (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);

  (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
  (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
  (void)taosThreadCondInit(&pool->backupCond, NULL);

  int32_t code = taosOpenQset(&pool->qset);
  if (code != 0) {
    terrno = code;
    return code;
  }

  pool->workers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->workers == NULL) {
    return terrno;
  }

  pool->backupWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->backupWorkers == NULL) {
    return terrno;
  }

  pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker));
  if (pool->exitedWorkers == NULL) {
    return terrno;
  }

  pool->maxInUse = pool->max * 2 + 2;

  // a caller-provided callback block is kept untouched
  if (pool->pCb != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pool->pCb = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPoolCB));
  if (pool->pCb == NULL) {
    return terrno;
  }
  pool->pCb->pPool = pool;
  pool->pCb->beforeBlocking = tQueryAutoQWorkerBeforeBlocking;
  pool->pCb->afterRecoverFromBlocking = tQueryAutoQWorkerRecoverFromBlocking;

  return TSDB_CODE_SUCCESS;
}
877

878
/*
 * Tear down a query auto-worker pool.
 *
 * Steps:
 *   1. Under poolLock, raise pPool->exit if stopNoWaitQueue is set and issue one
 *      taosQsetThreadResume per worker in the workers and backupWorkers lists.
 *   2. Broadcast on the three condition variables so parked workers wake up.
 *   3. Drain workers, backupWorkers and exitedWorkers: pop each node, join its
 *      thread if the handle is valid, free the node.
 *   4. Free the lists and the callback block, destroy mutexes/conds, close the qset.
 *
 * pCb and qset are reset to NULL after release so that a later
 * tQueryAutoQWorkerInit on the same struct (which allocates pCb only when it is
 * NULL) cannot pick up a dangling pointer.
 */
void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->stopNoWaitQueue) {
    pPool->exit = true;
  }
  int32_t size = 0;
  if (pPool->workers) {
    size = listNEles(pPool->workers);
  }
  if (pPool->backupWorkers) {
    size += listNEles(pPool->backupWorkers);
  }
  if (pPool->qset) {
    for (int32_t i = 0; i < size; ++i) {
      taosQsetThreadResume(pPool->qset);
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // wake everything that is parked on one of the pool's condition variables
  (void)taosThreadMutexLock(&pPool->backupLock);
  (void)taosThreadCondBroadcast(&pPool->backupCond);
  (void)taosThreadMutexUnlock(&pPool->backupLock);

  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
  (void)taosThreadCondBroadcast(&pPool->waitingAfterBlockCond);
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);

  (void)taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
  (void)taosThreadCondBroadcast(&pPool->waitingBeforeProcessMsgCond);
  (void)taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);

  // drain the active workers; the join happens outside poolLock
  while (pPool->workers) {
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (listNEles(pPool->workers) == 0) {
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      break;
    }
    SListNode *pNode = tdListPopHead(pPool->workers);
    uDebug("0free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));
    SQueryAutoQWorker *worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
    (void)taosThreadMutexUnlock(&pPool->poolLock);
    if (worker && taosCheckPthreadValid(worker->thread)) {
      (void)taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
    }
    uDebug("free worker node:%p, prev:%p, next:%p", pNode, TD_DLIST_NODE_PREV(pNode), TD_DLIST_NODE_NEXT(pNode));

    taosMemoryFree(pNode);
  }

  // drain the backup workers
  while (pPool->backupWorkers) {
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (listNEles(pPool->backupWorkers) == 0) {
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      break;
    }
    uDebug("backupworker head:%p, prev:%p, next:%p", TD_DLIST_HEAD(pPool->backupWorkers), TD_DLIST_NODE_PREV(TD_DLIST_HEAD(pPool->backupWorkers)), TD_DLIST_NODE_NEXT(TD_DLIST_HEAD(pPool->backupWorkers)));
    SListNode         *pNode = tdListPopHead(pPool->backupWorkers);
    SQueryAutoQWorker *worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
    (void)taosThreadMutexUnlock(&pPool->poolLock);

    if (worker && taosCheckPthreadValid(worker->thread)) {
      (void)taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
    }
    taosMemoryFree(pNode);
  }

  // drain workers that already left the pool
  while (pPool->exitedWorkers) {
    (void)taosThreadMutexLock(&pPool->poolLock);
    if (listNEles(pPool->exitedWorkers) == 0) {
      (void)taosThreadMutexUnlock(&pPool->poolLock);
      break;
    }

    SListNode         *pNode = tdListPopHead(pPool->exitedWorkers);
    SQueryAutoQWorker *worker = pNode ? (SQueryAutoQWorker *)pNode->data : NULL;
    (void)taosThreadMutexUnlock(&pPool->poolLock);

    if (worker && taosCheckPthreadValid(worker->thread)) {
      (void)taosThreadJoin(worker->thread, NULL);
      taosThreadClear(&worker->thread);
    }
    taosMemoryFree(pNode);
  }

  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->workers = tdListFree(pPool->workers);
  pPool->backupWorkers = tdListFree(pPool->backupWorkers);
  pPool->exitedWorkers = tdListFree(pPool->exitedWorkers);
  // free AND reset: tQueryAutoQWorkerInit only allocates pCb when it is NULL
  taosMemoryFreeClear(pPool->pCb);
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  (void)taosThreadMutexDestroy(&pPool->poolLock);
  (void)taosThreadMutexDestroy(&pPool->backupLock);
  (void)taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
  (void)taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);

  (void)taosThreadCondDestroy(&pPool->backupCond);
  (void)taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
  (void)taosThreadCondDestroy(&pPool->waitingBeforeProcessMsgCond);
  taosCloseQset(pPool->qset);
  pPool->qset = NULL;  // do not keep a handle to the closed queue set
}
3,798,783✔
983

984
/*
 * Allocate a queue, attach it to the pool's queue set and, while the pool has
 * fewer than pool->max workers, launch worker threads (at least one, and more
 * until pool->min is reached).
 *
 * @param pool    worker pool that owns the queue set
 * @param ahandle opaque handle passed to taosAddIntoQset together with the queue
 * @param fp      item callback installed on the queue
 * @return the new queue, or NULL on failure. terrno is set explicitly when
 *         opening the queue, adding it to the qset, or adding a worker node
 *         fails; on thread-creation failure terrno is left as
 *         taosThreadCreate reported it.
 */
STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahandle, FItem fp) {
  STaosQueue *queue = NULL;
  int32_t     code = taosOpenQueue(&queue);
  if (code) {
    terrno = code;
    return NULL;
  }

  (void)taosThreadMutexLock(&pool->poolLock);
  taosSetQueueFp(queue, fp, NULL);
  code = taosAddIntoQset(pool->qset, queue, ahandle);
  if (code) {
    taosCloseQueue(queue);
    queue = NULL;
    (void)taosThreadMutexUnlock(&pool->poolLock);
    // report the failure the same way the taosOpenQueue path does
    terrno = code;
    return NULL;
  }
  SQueryAutoQWorker  worker = {0};
  SQueryAutoQWorker *pWorker = NULL;

  // spawn a thread to process queue
  if (pool->num < pool->max) {
    do {
      worker.id = listNEles(pool->workers);
      worker.backupIdx = -1;
      worker.pool = pool;
      SListNode *pNode = tdListAdd(pool->workers, &worker);
      if (!pNode) {
        taosCloseQueue(queue);
        queue = NULL;
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }
      pWorker = (SQueryAutoQWorker *)pNode->data;

      TdThreadAttr thAttr;
      (void)taosThreadAttrInit(&thAttr);
      (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

      if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
        // release the attribute object on the failure path too, without
        // disturbing whatever error taosThreadCreate left in terrno
        int32_t savedErrno = terrno;
        (void)taosThreadAttrDestroy(&thAttr);
        taosCloseQueue(queue);
        queue = NULL;
        terrno = savedErrno;
        break;
      }

      (void)taosThreadAttrDestroy(&thAttr);
      pool->num++;
      (void)atomicFetchAddActive(&pool->activeRunningN, 1);
      uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
    } while (pool->num < pool->min);
  }

  (void)taosThreadMutexUnlock(&pool->poolLock);
  uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle);

  return queue;
}
1041

1042
/*
 * Release a queue previously obtained from tQueryAutoQWorkerAllocQueue.
 * The pool argument is not used; the queue is simply closed.
 */
void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool *pPool, STaosQueue *pQ) {
  taosCloseQueue(pQ);
}
9,503,484✔
1043

1044
/*
 * Make one more worker available to the pool.
 *
 * First tries to wake a worker parked in the backup pool (claiming one unit of
 * pool->backupNum via compare-exchange). If none is parked, a new worker node
 * is appended to pool->workers and a joinable thread is started for it.
 *
 * Returns TSDB_CODE_SUCCESS when a backup worker was signalled, a new thread
 * was started, or the pool is already exiting; otherwise the terrno value
 * observed after the failing tdListAdd / taosThreadCreate call.
 */
static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool *pool) {
  // try backup pool
  int32_t backup = pool->backupNum;
  while (backup > 0) {
    int32_t backupNew = atomic_val_compare_exchange_32(&pool->backupNum, backup, backup - 1);
    if (backupNew == backup) {
      // Signal while holding backupLock: the parked worker increments backupNum
      // under this lock and only then waits, so an unlocked signal could fire in
      // between and be lost. The other signal helpers in this file lock as well.
      (void)taosThreadMutexLock(&pool->backupLock);
      (void)taosThreadCondSignal(&pool->backupCond);
      (void)taosThreadMutexUnlock(&pool->backupLock);
      return TSDB_CODE_SUCCESS;
    }
    backup = backupNew;
  }

  // backup pool is empty, create new
  SQueryAutoQWorker *pWorker = NULL;
  SQueryAutoQWorker  worker = {0};
  worker.pool = pool;
  worker.backupIdx = -1;
  (void)taosThreadMutexLock(&pool->poolLock);
  if (pool->exit) {
    (void)taosThreadMutexUnlock(&pool->poolLock);
    return TSDB_CODE_SUCCESS;
  }
  worker.id = listNEles(pool->workers);
  SListNode *pNode = tdListAdd(pool->workers, &worker);
  if (!pNode) {
    (void)taosThreadMutexUnlock(&pool->poolLock);
    return terrno;
  }
  (void)taosThreadMutexUnlock(&pool->poolLock);
  pWorker = (SQueryAutoQWorker *)pNode->data;

  TdThreadAttr thAttr;
  (void)taosThreadAttrInit(&thAttr);
  (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tQueryAutoQWorkerThreadFp, pWorker) != 0) {
    // capture the error first, then release the attribute object (it was leaked before)
    int32_t code = terrno;
    (void)taosThreadAttrDestroy(&thAttr);
    uError("create queryAutoWorker thread failed, error:%s", tstrerror(code));
    return code;
  }
  (void)taosThreadAttrDestroy(&thAttr);

  return TSDB_CODE_SUCCESS;
}
1086

1087
static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
101,018,161✔
1088
  SQueryAutoQWorkerPool *pPool = p;
101,018,161✔
1089
  if (tQueryAutoQWorkerTrySignalWaitingAfterBlock(p) || tQueryAutoQWorkerTrySignalWaitingBeforeProcess(p) ||
202,036,519✔
1090
      tQueryAutoQWorkerTryDecActive(p, pPool->num)) {
101,017,722✔
1091
  } else {
1092
    int32_t code = tQueryAutoQWorkerAddWorker(pPool);
37,777,869✔
1093
    if (code != TSDB_CODE_SUCCESS) {
37,777,869✔
1094
      return code;
×
1095
    }
1096
    (void)atomicFetchSubRunning(&pPool->activeRunningN, 1);
37,777,869✔
1097
  }
1098

1099
  return TSDB_CODE_SUCCESS;
101,018,227✔
1100
}
1101

1102
static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
101,017,927✔
1103
  SQueryAutoQWorkerPool *pPool = p;
101,017,927✔
1104
  int64_t                val64 = pPool->activeRunningN;
101,017,927✔
1105
  int32_t                running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
101,018,358✔
1106
  while (running < pPool->num) {
101,021,805✔
1107
    if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
101,021,805✔
1108
      return TSDB_CODE_SUCCESS;
101,018,358✔
1109
    }
1110
  }
UNCOV
1111
  (void)taosThreadMutexLock(&pPool->waitingAfterBlockLock);
×
UNCOV
1112
  (void)atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);
×
UNCOV
1113
  if (!pPool->exit) (void)taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock);
×
UNCOV
1114
  (void)taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
×
UNCOV
1115
  if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT;
×
UNCOV
1116
  return TSDB_CODE_SUCCESS;
×
1117
}
1118

1119
/*
 * Initialize a dispatch worker pool: reset the worker count, allocate a
 * zero-filled array of pPool->max worker slots and create the pool mutex.
 *
 * Returns 0 on success, or terrno when the worker array cannot be allocated
 * (the mutex is not initialized in that case).
 */
int32_t tDispatchWorkerInit(SDispatchWorkerPool *pPool) {
  pPool->num = 0;

  pPool->pWorkers = taosMemCalloc(pPool->max, sizeof(SDispatchWorker));
  if (pPool->pWorkers == NULL) {
    return terrno;
  }

  (void)taosThreadMutexInit(&pPool->poolLock, NULL);
  return 0;
}
1127

1128
/*
 * Thread entry point of a dispatch worker.
 *
 * Blocks SIGPIPE, names the thread after the pool, then keeps reading items
 * from the worker's own queue set and hands each one to the queue's callback.
 * The loop ends when taosReadQitemFromQset returns 0. A warning is logged for
 * items whose queueing time exceeds QUEUE_THRESHOLD microseconds.
 *
 * Always returns NULL.
 */
static void *tDispatchWorkerThreadFp(SDispatchWorker *pWorker) {
  SDispatchWorkerPool *pPool = pWorker->pool;
  SQueueInfo           qinfo = {0};
  void                *msg = NULL;

  if (taosBlockSIGPIPE() < 0) {
    uError("worker:%s:%d failed to block SIGPIPE", pPool->name, pWorker->id);
  }

  setThreadName(pPool->name);
  pWorker->pid = taosGetSelfPthreadId();
  uInfo("worker:%s:%d is running, thread:%d", pPool->name, pWorker->id, pWorker->pid);

  for (;;) {
    int32_t numOfItems = taosReadQitemFromQset(pWorker->qset, (void **)&msg, &qinfo);
    if (numOfItems == 0) {
      uInfo("worker:%s:%d qset:%p, got no message and exiting, thread:%d", pPool->name, pWorker->id,
            pWorker->qset, pWorker->pid);
      break;
    }

    if (qinfo.timestamp != 0) {
      int64_t cost = taosGetTimestampUs() - qinfo.timestamp;
      if (cost > QUEUE_THRESHOLD) {
        uWarn("worker:%s,message has been queued for too long, cost: %" PRId64 "s", pPool->name, cost / QUEUE_THRESHOLD);
      }
    }

    // items without a callback are simply skipped
    if (qinfo.fp == NULL) {
      continue;
    }
    qinfo.workerId = pWorker->id;
    qinfo.threadNum = pPool->num;
    (*((FItem)qinfo.fp))(&qinfo, msg);
  }

  DestoryThreadLocalRegComp();
  closeThreadNotificationConn();
  return NULL;
}
1167

1168
/*
 * Bring the dispatch pool up to pPool->max workers. For every missing slot a
 * queue set and a queue are opened, the queue is registered in the set with
 * the given handle/callback, and a joinable worker thread is started.
 *
 * @param pPool      dispatch worker pool
 * @param ahandle    opaque handle attached to every queue
 * @param fp         item callback installed on every queue
 * @param dispatchFp function stored in the pool for choosing a worker per message
 * @return 0 on success, otherwise the first error code encountered. The slot
 *         that failed is already counted in pPool->num when the loop stops.
 */
int32_t tDispatchWorkerAllocQueue(SDispatchWorkerPool *pPool, void *ahandle, FItem fp, DispatchFp dispatchFp) {
  int32_t code = 0;
  SDispatchWorker* pWorker = NULL;
  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->dispatchFp = dispatchFp;
  for (int32_t i = pPool->num; i < pPool->max; ++i) {
    pWorker = pPool->pWorkers + i;
    pWorker->id = pPool->num;
    pWorker->pool = pPool;
    pPool->num++;
    code = taosOpenQset(&pWorker->qset);
    if (code != 0) break;
    code = taosOpenQueue(&pWorker->queue);
    if (code != 0) break;
    taosSetQueueFp(pWorker->queue, fp, ahandle);
    code = taosAddIntoQset(pWorker->qset, pWorker->queue, ahandle);
    if (code != 0) break;

    TdThreadAttr thAttr;
    (void)taosThreadAttrInit(&thAttr);
    (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

    if (taosThreadCreate(&pWorker->thread, &thAttr, (ThreadFp)tDispatchWorkerThreadFp, pWorker) != 0) {
      code = terrno;
      (void)taosThreadAttrDestroy(&thAttr);
      break;
    }
    (void)taosThreadAttrDestroy(&thAttr);
    uInfo("worker:%s:%d is launched, threadId:%" PRId64 ", total:%d", pPool->name, pWorker->id, taosGetPthreadId(pWorker->thread), pPool->num);
  }

  (void)taosThreadMutexUnlock(&pPool->poolLock);
  if (code == 0) {
    // pWorker stays NULL when the pool was already full (loop body never ran),
    // so it must not be dereferenced unconditionally here
    uInfo("worker:%s, queue:%p is allocated, ahandle:%p", pPool->name, pWorker ? (void *)pWorker->queue : NULL, ahandle);
  }
  return code;
}
1203

1204
/*
 * Close the queue and the queue set of every counted worker slot and reset the
 * corresponding pointers to NULL. Does nothing when the worker array is absent.
 *
 * poolLock is held for the duration of the walk and is released on every path,
 * including the "no worker array" case.
 */
static void tDispatchWorkerFreeQueue(SDispatchWorkerPool *pPool) {
  (void)taosThreadMutexLock(&pPool->poolLock);
  if (pPool->pWorkers) {
    for (int32_t i = 0; i < pPool->num; ++i) {
      SDispatchWorker *pWorker = pPool->pWorkers + i;
      if (pWorker->queue) {
        taosCloseQueue(pWorker->queue);
        pWorker->queue = NULL;
      }
      if (pWorker->qset) {
        taosCloseQset(pWorker->qset);
        pWorker->qset = NULL;
      }
    }
  }
  // previously an early return above skipped this unlock and left poolLock held
  (void)taosThreadMutexUnlock(&pPool->poolLock);
}
1220

1221
/*
 * Shut down a dispatch worker pool.
 *
 * Marks the pool as exiting and resumes every worker's queue set under
 * poolLock, then joins every worker thread with a valid handle, closes the
 * queues/queue sets, frees the worker array and destroys the pool mutex.
 */
void tDispatchWorkerCleanup(SDispatchWorkerPool *pPool) {
  // phase 1: flag exit and resume the workers' queue sets
  (void)taosThreadMutexLock(&pPool->poolLock);
  pPool->exit = true;
  if (pPool->pWorkers != NULL) {
    for (int32_t i = 0; i < pPool->num; ++i) {
      SDispatchWorker *pCur = &pPool->pWorkers[i];
      if (pCur->qset == NULL) {
        continue;
      }
      taosQsetThreadResume(pCur->qset);
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  // phase 2: wait for the threads (done without holding poolLock)
  if (pPool->pWorkers != NULL) {
    for (int32_t i = 0; i < pPool->num; ++i) {
      SDispatchWorker *pCur = &pPool->pWorkers[i];
      if (!taosCheckPthreadValid(pCur->thread)) {
        continue;
      }
      (void)taosThreadJoin(pCur->thread, NULL);
      taosThreadClear(&pCur->thread);
    }
  }

  // phase 3: release queues, the worker array and the mutex
  tDispatchWorkerFreeQueue(pPool);
  taosMemoryFreeClear(pPool->pWorkers);
  (void)taosThreadMutexDestroy(&pPool->poolLock);
}
1,315,904✔
1247

1248
/*
 * Hand a message to one worker of the dispatch pool.
 *
 * The pool's dispatchFp picks the worker index; the message is then written to
 * that worker's queue. Selection and enqueue both happen under poolLock.
 *
 * @param pPool dispatch worker pool
 * @param pMsg  message to enqueue
 * @return 0 on success; the error returned by dispatchFp or taosWriteQitem;
 *         TSDB_CODE_INTERNAL_ERROR when the chosen index is outside
 *         [0, pPool->num), the worker array is absent, or the chosen worker
 *         has no queue.
 */
int32_t tAddTaskIntoDispatchWorkerPool(SDispatchWorkerPool *pPool, void *pMsg) {
  int32_t          code = 0;
  int32_t          idx = 0;
  SDispatchWorker *pWorker = NULL;

  (void)taosThreadMutexLock(&pPool->poolLock);
  code = pPool->dispatchFp(pPool, pMsg, &idx);
  if (code == 0) {
    // never index the worker array with an unchecked value from the callback
    if (pPool->pWorkers == NULL || idx < 0 || idx >= pPool->num) {
      code = TSDB_CODE_INTERNAL_ERROR;
    } else {
      pWorker = pPool->pWorkers + idx;
      if (pWorker->queue) {
        code = taosWriteQitem(pWorker->queue, pMsg);
      } else {
        code = TSDB_CODE_INTERNAL_ERROR;
      }
    }
  }
  (void)taosThreadMutexUnlock(&pPool->poolLock);

  if (code != 0) {
    uError("worker:%s, failed to add task into dispatch worker pool, code:%d", pPool->name, code);
  } else {
    // pWorker is always set on the success path
    uDebug("msg %p dispatch to the %dth worker, threadId:%" PRId64, pMsg, idx, taosGetPthreadId(pWorker->thread));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc