• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4925

12 Jan 2026 09:34AM UTC coverage: 66.107% (+0.8%) from 65.354%
#4925

push

travis-ci

web-flow
merge: from main to 3.0 branch #34248

103 of 129 new or added lines in 9 files covered. (79.84%)

891 existing lines in 139 files now uncovered.

200488 of 303278 relevant lines covered (66.11%)

129810096.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.27
/source/util/src/tqueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "taoserror.h"
18
#include "tlog.h"
19
#include "tqueue.h"
20
#include "tutil.h"
21

22
// Process-wide byte budget and running total for RPC_QITEM items.
// taosAllocateQitem() adds to the "used" counter and rejects the request when
// it would exceed the "allowed" value; taosFreeQitem() subtracts again.
int64_t tsQueueMemoryAllowed = 0;
int64_t tsQueueMemoryUsed = 0;

// Same accounting pair, but for APPLY_QITEM items.
int64_t tsApplyMemoryAllowed = 0;
int64_t tsApplyMemoryUsed = 0;
27
// A mutex-protected singly linked FIFO of STaosQnode items. A queue may
// optionally be a member of one STaosQset.
struct STaosQueue {
  STaosQnode   *head;     // first item to be read; NULL when the list is empty
  STaosQnode   *tail;     // last item written; NULL when the list is empty
  STaosQueue   *next;     // for queue set
  STaosQset    *qset;     // for queue set
  void         *ahandle;  // for queue set
  FItem         itemFp;   // handed back in SQueueInfo.fp by taosReadQitemFromQset
  FItems        itemsFp;  // handed back in SQueueInfo.fp by taosReadAllQitemsFromQset
  int32_t       numOfItems;  // item count; the qset read paths leave it untouched (see taosUpdateItemSize)
  int64_t       memOfItems;  // sum of (size + dataSize) over the linked items
  int64_t       threadId;    // opaque value stored/returned by taosQueueSetThreadId/GetThreadId
  int64_t       memLimit;    // taosWriteQitem rejects writes beyond this many bytes; <= 0 disables the check
  int64_t       itemLimit;   // taosWriteQitem rejects writes beyond this many items; <= 0 disables the check
  TdThreadMutex mutex;       // guards the list pointers and the counters above
};
42

43
// A set of queues that readers consume from in turn, woken through one semaphore.
struct STaosQset {
  STaosQueue   *head;     // member queues, linked through STaosQueue.next
  STaosQueue   *current;  // next queue the read functions will inspect; NULL restarts from head
  TdThreadMutex mutex;    // guards the member list and `current`
  tsem_t        sem;      // posted by taosWriteQitem per item and by taosQsetThreadResume
  int32_t       numOfQueues;  // number of member queues
  int32_t       numOfItems;   // item count across members, updated with atomic add/sub
};
51

52
// A batch of items detached from a queue in one step, plus an iteration cursor.
struct STaosQall {
  STaosQnode *current;  // next node taosGetQitem will return
  STaosQnode *start;    // first node of the batch; taosResetQitems rewinds `current` to it
  int32_t     numOfItems;  // queue->numOfItems at the time the batch was taken
  int64_t     memOfItems;  // queue->memOfItems at the time the batch was taken
  int32_t     unAccessedNumOfItems;  // starts at numOfItems, decremented by taosGetQitem
  int64_t     unAccessMemOfItems;    // starts at memOfItems, reduced by each fetched node's dataSize
};
60

61
// Set the byte limit that taosWriteQitem enforces for this queue.
// A value <= 0 disables the check. The field is written without taking the queue mutex.
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) {
  queue->memLimit = cap;
}
×
62
// Set the item-count limit that taosWriteQitem enforces for this queue.
// A value <= 0 disables the check. The field is written without taking the queue mutex.
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) {
  queue->itemLimit = size;
}
×
63

64
/**
 * Allocate a zeroed queue and initialise its mutex.
 *
 * @param queue  out-parameter that receives the new queue.
 * @return 0 on success; terrno when the allocation fails; the mapped system
 *         error (also stored in terrno) when the mutex cannot be initialised.
 */
int32_t taosOpenQueue(STaosQueue **queue) {
  *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
  if (*queue == NULL) {
    return terrno;
  }

  int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL);
  if (code) {
    taosMemoryFreeClear(*queue);
    return (terrno = TAOS_SYSTEM_ERROR(code));
  }

  // Log the queue itself rather than the address of the caller's out-parameter,
  // so the value matches what the other queue log lines print.
  uDebug("queue:%p is opened", *queue);
  return 0;
}
79

80
// Store the per-item and per-batch callbacks on the queue.
// They are handed back through SQueueInfo.fp by the qset read functions.
// A NULL queue is ignored.
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
  if (queue != NULL) {
    queue->itemsFp = itemsFp;
    queue->itemFp = itemFp;
  }
}
85

86
/**
 * Detach the queue from its queue set (if any), free every node still linked
 * in it, then destroy the mutex and free the queue. A NULL queue is ignored.
 */
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;

  // Take the node list and the qset pointer in one locked step.
  (void)taosThreadMutexLock(&queue->mutex);
  STaosQnode *pNode = queue->head;
  queue->head = NULL;
  STaosQset *qset = queue->qset;
  (void)taosThreadMutexUnlock(&queue->mutex);

  // Use the snapshot taken under the lock; re-reading queue->qset here could
  // disagree with it and pass NULL to taosRemoveFromQset.
  if (qset) {
    taosRemoveFromQset(qset, queue);
  }

  // NOTE(review): nodes are released with taosMemoryFree, so the global
  // tsQueueMemoryUsed / tsApplyMemoryUsed counters are not reduced for items
  // that were still queued — confirm whether that is intended.
  while (pNode) {
    STaosQnode *pTemp = pNode;
    pNode = pNode->next;
    taosMemoryFree(pTemp);
  }

  // Log before the memory is released so a freed pointer is never used.
  uDebug("queue:%p is closed", queue);

  (void)taosThreadMutexDestroy(&queue->mutex);
  taosMemoryFree(queue);
}
112

113
// Report whether the queue holds nothing: no head, no tail and a zero item
// count, all checked under the queue mutex. A NULL queue counts as empty.
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) {
    return true;
  }

  (void)taosThreadMutexLock(&queue->mutex);
  bool isEmpty = (queue->head == NULL) && (queue->tail == NULL) && (queue->numOfItems == 0);
  (void)taosThreadMutexUnlock(&queue->mutex);

  return isEmpty;
}
125

126
// Subtract `items` from the queue's item count under the queue mutex.
// The qset read functions leave numOfItems untouched, so this is the
// separate step that brings the counter down. A NULL queue is ignored.
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
  if (queue != NULL) {
    (void)taosThreadMutexLock(&queue->mutex);
    queue->numOfItems = queue->numOfItems - items;
    (void)taosThreadMutexUnlock(&queue->mutex);
  }
}
133

134
/**
 * Return the queue's current item count.
 *
 * @param queue  queue to inspect; NULL yields 0.
 * @return the value of numOfItems read under the queue mutex.
 */
int32_t taosQueueItemSize(STaosQueue *queue) {
  if (queue == NULL) return 0;

  // Snapshot both counters while holding the lock so the trace line below
  // reports the same values that are returned, without an unlocked read.
  (void)taosThreadMutexLock(&queue->mutex);
  int32_t numOfItems = queue->numOfItems;
  int64_t memOfItems = queue->memOfItems;
  (void)taosThreadMutexUnlock(&queue->mutex);

  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, numOfItems, memOfItems);
  return numOfItems;
}
144

145
/**
 * Return the number of bytes accounted to the items currently in the queue.
 *
 * @param queue  queue to inspect; NULL yields 0, matching taosQueueItemSize.
 * @return the value of memOfItems read under the queue mutex.
 */
int64_t taosQueueMemorySize(STaosQueue *queue) {
  if (queue == NULL) return 0;

  (void)taosThreadMutexLock(&queue->mutex);
  int64_t memOfItems = queue->memOfItems;
  (void)taosThreadMutexUnlock(&queue->mutex);
  return memOfItems;
}
151

152
/**
 * Allocate a queue node with `size` bytes of item storage and return a
 * pointer to that storage.
 *
 * For RPC_QITEM and APPLY_QITEM the request (size + dataSize) is first charged
 * against the matching global counter; if that would exceed the allowed total
 * the charge is rolled back and the call fails. Other item types are not
 * accounted.
 *
 * @param size      bytes of item storage placed directly after the node header.
 * @param itype     item type, selects which global budget is charged.
 * @param dataSize  additional bytes to account for this item (recorded on the node).
 * @param item      out-parameter; receives the item pointer, or NULL on any failure.
 * @return 0 on success, TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE when a budget is
 *         exceeded, or terrno when the allocation itself fails.
 */
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
  const int64_t total = (int64_t)size + dataSize;
  int64_t       alloced = -1;

  // Clear the out-parameter up front so every failure path leaves it NULL.
  *item = NULL;

  if (itype == RPC_QITEM) {
    alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, total);
    if (alloced > tsQueueMemoryAllowed) {
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, total, alloced,
             tsQueueMemoryAllowed);
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, total);
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
    }
  } else if (itype == APPLY_QITEM) {
    alloced = atomic_add_fetch_64(&tsApplyMemoryUsed, total);
    if (alloced > tsApplyMemoryAllowed) {
      uDebug("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, total, alloced,
             tsApplyMemoryAllowed);
      (void)atomic_sub_fetch_64(&tsApplyMemoryUsed, total);
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
    }
  }

  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
  if (pNode == NULL) {
    // Undo the budget charge made above before reporting the failure.
    if (itype == RPC_QITEM) {
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, total);
    } else if (itype == APPLY_QITEM) {
      (void)atomic_sub_fetch_64(&tsApplyMemoryUsed, total);
    }
    return terrno;
  }

  pNode->dataSize = dataSize;
  pNode->size = size;
  pNode->itype = itype;
  pNode->timestamp = taosGetTimestampUs();
  uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
  *item = pNode->item;
  return 0;
}
192

193
void taosFreeQitem(void *pItem) {
2,147,483,647✔
194
  if (pItem == NULL) return;
2,147,483,647✔
195

196
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
2,147,483,647✔
197
  int64_t     alloced = -1;
2,147,483,647✔
198
  if (pNode->itype == RPC_QITEM) {
2,147,483,647✔
199
    alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
2,147,483,647✔
200
  } else if (pNode->itype == APPLY_QITEM) {
2,147,483,647✔
201
    alloced = atomic_sub_fetch_64(&tsApplyMemoryUsed, pNode->size + pNode->dataSize);
166,732,792✔
202
  }
203
  uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
2,147,483,647✔
204

205
  taosMemoryFree(pNode);
2,147,483,647✔
206
}
207

208
/**
 * Append an item (from taosAllocateQitem) to the tail of the queue.
 *
 * The node's timestamp is refreshed to the time of the write. If the queue
 * has a positive memLimit or itemLimit and the write would exceed it, nothing
 * is queued and an error is returned; the item is not freed here. When the
 * queue belongs to a queue set, the set's item count is incremented and its
 * semaphore is posted once.
 *
 * @param queue  destination queue.
 * @param pItem  item pointer previously returned by taosAllocateQitem.
 * @return 0 on success, TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when a limit is hit.
 */
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
  int32_t     code = 0;
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
  pNode->timestamp = taosGetTimestampUs();
  pNode->next = NULL;

  (void)taosThreadMutexLock(&queue->mutex);
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    // The format previously ended in `"%s" PRId64`, which appended a stray
    // "ld"/"lld" to the reason text.
    uError("item:%p, failed to put into queue:%p, queue mem limit:%" PRId64 ", reason:%s", pItem, queue,
           queue->memLimit, tstrerror(code));

    (void)taosThreadMutexUnlock(&queue->mutex);
    return code;
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    uError("item:%p, failed to put into queue:%p, queue size limit:%" PRId64 ", reason:%s", pItem, queue,
           queue->itemLimit, tstrerror(code));
    (void)taosThreadMutexUnlock(&queue->mutex);
    return code;
  }

  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
    queue->tail = pNode;
  }
  queue->numOfItems++;
  queue->memOfItems += (pNode->size + pNode->dataSize);
  if (queue->qset) {
    (void)atomic_add_fetch_32(&queue->qset->numOfItems, 1);
  }

  uTrace("item:%p, is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);

  (void)taosThreadMutexUnlock(&queue->mutex);

  // Wake one reader of the queue set; done after releasing the queue mutex.
  if (queue->qset) {
    if (tsem_post(&queue->qset->sem) != 0) {
      uError("failed to post semaphore for queue set:%p", queue->qset);
    } else {
      uDebug("sem_post Qset %p, sem:%p", queue->qset, &queue->qset->sem);
    }
  } else {
    uDebug("empty qset");
  }
  return code;
}
258

259
// Pop the head item of the queue into *ppItem, updating the queue's counters
// (and the owning qset's item count, if any) under the queue mutex.
// When the queue is empty *ppItem is left untouched.
void taosReadQitem(STaosQueue *queue, void **ppItem) {
  (void)taosThreadMutexLock(&queue->mutex);

  STaosQnode *pFirst = queue->head;
  if (pFirst != NULL) {
    *ppItem = pFirst->item;

    queue->head = pFirst->next;
    if (queue->head == NULL) {
      // that was the last node
      queue->tail = NULL;
    }

    queue->numOfItems -= 1;
    queue->memOfItems -= (pFirst->size + pFirst->dataSize);
    if (queue->qset != NULL) {
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    }

    uTrace("item:%p, is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
           queue->memOfItems);
  }

  (void)taosThreadMutexUnlock(&queue->mutex);
}
1,061,653,643✔
282

283
// Allocate a zeroed STaosQall into *qall.
// Returns 0 on success, or terrno when the allocation fails (*qall is NULL then).
int32_t taosAllocateQall(STaosQall **qall) {
  STaosQall *pNew = taosMemoryCalloc(1, sizeof(STaosQall));
  *qall = pNew;
  return (pNew != NULL) ? 0 : terrno;
}
290

291
// Free a STaosQall container. The nodes it points at are not freed here.
void taosFreeQall(STaosQall *qall) {
  taosMemoryFree(qall);
}
17,473,753✔
292

293
/**
 * Detach every item currently in the queue and hand the whole list to `qall`.
 *
 * On return the queue is empty (head/tail NULL, counters zero) and `qall`
 * describes the detached list. If the queue was already empty, `qall` is
 * reset to an all-zero state.
 *
 * @param queue  source queue.
 * @param qall   destination batch; fully overwritten.
 * @return number of items moved (0 when the queue was empty).
 */
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t numOfItems = 0;

  // Start from a clean slate so no field keeps a value from a previous batch,
  // including memOfItems and the unaccessed counters on the empty path.
  memset(qall, 0, sizeof(STaosQall));

  (void)taosThreadMutexLock(&queue->mutex);

  if (queue->head != NULL) {
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->memOfItems = queue->memOfItems;

    qall->unAccessedNumOfItems = queue->numOfItems;
    qall->unAccessMemOfItems = queue->memOfItems;

    numOfItems = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    queue->memOfItems = 0;
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
           queue->memOfItems);
    if (queue->qset) {
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
    }
  }

  (void)taosThreadMutexUnlock(&queue->mutex);

  return numOfItems;
}
333

334
// Fetch the next item of a batch and advance the cursor.
// Returns 1 and stores the item in *ppItem, or returns 0 and stores NULL when
// the batch is exhausted. No locking is done here.
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
  STaosQnode *pCur = qall->current;

  if (pCur == NULL) {
    *ppItem = NULL;
    return 0;
  }

  qall->current = pCur->next;
  *ppItem = pCur->item;

  qall->unAccessedNumOfItems -= 1;
  // NOTE(review): only dataSize is subtracted here, while the starting value
  // (queue->memOfItems) was accumulated as size + dataSize — confirm intended.
  qall->unAccessMemOfItems -= pCur->dataSize;

  uTrace("item:%p, is fetched", *ppItem);
  return 1;
}
355

356
/**
 * Allocate a zeroed queue set and initialise its mutex and semaphore.
 *
 * @param qset  out-parameter that receives the new queue set; cleared again
 *              on failure so the caller never holds a freed pointer.
 * @return 0 on success; terrno when the allocation or semaphore init fails;
 *         the mapped system error (also stored in terrno) when the mutex
 *         cannot be initialised.
 */
int32_t taosOpenQset(STaosQset **qset) {
  *qset = taosMemoryCalloc(1, sizeof(STaosQset));
  if (*qset == NULL) {
    return terrno;
  }

  // Check the mutex result the same way taosOpenQueue does.
  int32_t code = taosThreadMutexInit(&(*qset)->mutex, NULL);
  if (code) {
    taosMemoryFreeClear(*qset);
    return (terrno = TAOS_SYSTEM_ERROR(code));
  }

  if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
    // Capture the error first, then undo the mutex init and the allocation.
    int32_t err = terrno;
    (void)taosThreadMutexDestroy(&(*qset)->mutex);
    taosMemoryFreeClear(*qset);
    return err;
  }

  // Log the queue set itself rather than the address of the out-parameter.
  uDebug("qset:%p, is opened", *qset);
  return 0;
}
371

372
// Tear down a queue set: unlink every member queue (clearing its qset/next
// pointers), destroy the mutex and semaphore, and free the set.
// The member queues themselves are not closed. NULL is ignored.
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) {
    return;
  }

  // detach all member queues from the set
  (void)taosThreadMutexLock(&qset->mutex);
  for (STaosQueue *pMember = qset->head; pMember != NULL; pMember = qset->head) {
    qset->head = pMember->next;

    pMember->qset = NULL;
    pMember->next = NULL;
  }
  (void)taosThreadMutexUnlock(&qset->mutex);

  (void)taosThreadMutexDestroy(&qset->mutex);

  int32_t semRet = tsem_destroy(&qset->sem);
  if (semRet != 0) {
    uError("failed to destroy semaphore for qset:%p", qset);
  }

  taosMemoryFree(qset);
  uDebug("qset:%p, is closed", qset);
}
393

394
// tsem_post 'qset->sem', so that reader threads waiting for it
395
// resumes execution and return, should only be used to signal the
396
// thread to exit.
397
void taosQsetThreadResume(STaosQset *qset) {
206,499,555✔
398
  uDebug("qset:%p, it will exit", qset);
206,499,555✔
399
  if (tsem_post(&qset->sem) != 0) {
206,499,555✔
400
    uError("failed to post semaphore for qset:%p", qset);
×
401
  }
402
}
206,499,585✔
403

404
// Make `queue` a member of `qset`, pushing it at the front of the member list
// and recording `ahandle` on it. The queue's current item count is added to
// the set's total. Returns TSDB_CODE_INVALID_PARA if the queue already
// belongs to a set, 0 otherwise.
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
  if (queue->qset != NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  (void)taosThreadMutexLock(&qset->mutex);

  // link at the head of the member list
  queue->ahandle = ahandle;
  queue->next = qset->head;
  qset->head = queue;
  qset->numOfQueues += 1;

  // fold the queue's pending items into the set while holding the queue lock
  (void)taosThreadMutexLock(&queue->mutex);
  (void)atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  (void)taosThreadMutexUnlock(&queue->mutex);

  (void)taosThreadMutexUnlock(&qset->mutex);

  uTrace("queue:%p, is added into qset:%p", queue, qset);
  return 0;
}
424

425
// Unlink `queue` from `qset` if it is a member: fix up the member list and the
// round-robin cursor, subtract the queue's item count from the set's total and
// clear the queue's qset/next pointers. Does nothing to the list when the
// queue is not found.
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
  (void)taosThreadMutexLock(&qset->mutex);

  // Walk the list through the address of each link, so that removing the head
  // and removing an inner member are the same operation.
  STaosQueue **ppLink = &qset->head;
  while (*ppLink != NULL && *ppLink != queue) {
    ppLink = &(*ppLink)->next;
  }

  if (*ppLink != NULL) {
    *ppLink = queue->next;

    if (qset->current == queue) {
      qset->current = queue->next;
    }
    qset->numOfQueues--;

    (void)taosThreadMutexLock(&queue->mutex);
    (void)atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
    queue->qset = NULL;
    queue->next = NULL;
    (void)taosThreadMutexUnlock(&queue->mutex);
  }

  (void)taosThreadMutexUnlock(&qset->mutex);

  uDebug("queue:%p, is removed from qset:%p", queue, qset);
}
18,315,331✔
464

465
/**
 * Wait on the queue set's semaphore, then take one item from the next
 * non-empty member queue, visiting members in turn starting at qset->current.
 *
 * On success *ppItem receives the item and `qinfo` is filled with the owning
 * queue, its ahandle, its per-item callback (itemFp) and the node timestamp.
 * When no item is found (e.g. the semaphore was posted by
 * taosQsetThreadResume) neither *ppItem nor `qinfo` is written.
 *
 * @return 1 if an item was read, 0 otherwise.
 */
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
  STaosQnode *pNode = NULL;
  int32_t     code = 0;

  uDebug("start to waitfromQset %p, sem:%p, idx:%d", qset, &qset->sem, qinfo->workerId);
  // A failed wait is only logged; the scan below still runs.
  if (tsem_wait(&qset->sem) != 0) {
    uError("failed to wait semaphore for qset:%p", qset);
  }
  uDebug("end waitfromQset %p, sem:%p, idx:%d", qset, &qset->sem, qinfo->workerId);

  (void)taosThreadMutexLock(&qset->mutex);

  // Visit at most numOfQueues members, wrapping from the end of the list to head.
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
    STaosQueue *queue = qset->current;
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
    // Unlocked peek to skip empty queues; re-checked under the queue mutex below.
    if (queue->head == NULL) continue;

    (void)taosThreadMutexLock(&queue->mutex);

    if (queue->head) {
      pNode = queue->head;
      *ppItem = pNode->item;
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemFp;
      qinfo->queue = queue;
      qinfo->timestamp = pNode->timestamp;

      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      // numOfItems is deliberately left unchanged here (the trace below prints
      // numOfItems - 1); presumably the caller lowers it later via
      // taosUpdateItemSize — verify against callers.
      // queue->numOfItems--;
      queue->memOfItems -= (pNode->size + pNode->dataSize);
      (void)atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
      uTrace("item:%p, is read out from queue:%p, timeInQueue:%" PRId64 "us , items:%d mem:%" PRId64, 
          *ppItem, queue, taosGetTimestampUs() - pNode->timestamp, queue->numOfItems - 1, queue->memOfItems);
    }

    (void)taosThreadMutexUnlock(&queue->mutex);
    if (pNode) break;
  }

  (void)taosThreadMutexUnlock(&qset->mutex);

  return code;
}
512

513
/**
 * Wait on the queue set's semaphore, then detach the entire item list of the
 * next non-empty member queue into `qall`, visiting members in turn starting
 * at qset->current.
 *
 * On success `qall` describes the detached list and `qinfo` is filled with
 * the owning queue, its ahandle, its batch callback (itemsFp) and the
 * timestamp of the first node. When no item is found neither is written.
 *
 * @return the queue's numOfItems at the time of the read, or 0 if nothing was read.
 */
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
  STaosQueue *queue;
  int32_t     code = 0;

  // A failed wait is only logged; the scan below still runs.
  if (tsem_wait(&qset->sem) != 0) {
    uError("failed to wait semaphore for qset:%p", qset);
  }
  (void)taosThreadMutexLock(&qset->mutex);

  // Visit at most numOfQueues members, wrapping from the end of the list to head.
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    if (qset->current == NULL) qset->current = qset->head;
    queue = qset->current;
    if (queue) qset->current = queue->next;
    if (queue == NULL) break;
    // Unlocked peek to skip empty queues; re-checked under the queue mutex below.
    if (queue->head == NULL) continue;

    (void)taosThreadMutexLock(&queue->mutex);

    if (queue->head) {
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->memOfItems = queue->memOfItems;
      qall->unAccessedNumOfItems = queue->numOfItems;
      qall->unAccessMemOfItems = queue->memOfItems;

      code = qall->numOfItems;
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemsFp;
      qinfo->queue = queue;
      qinfo->timestamp = queue->head->timestamp;

      queue->head = NULL;
      queue->tail = NULL;
      // numOfItems is deliberately left unchanged here; presumably the caller
      // lowers it later via taosUpdateItemSize — verify against callers.
      // queue->numOfItems = 0;
      queue->memOfItems = 0;
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);

      (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);

      STaosQnode *pNode = qall->start->next;
      // skip the first node, tsem_wait has been called once at the beginning of this function
      // queue->numOfItems is not entirely reliable and may be larger than the actual value due to concurrency issues.
      // One extra tsem_wait per remaining node is issued while both the qset
      // and the queue mutex are still held.
      while (pNode) {
        if (tsem_wait(&qset->sem) != 0) {
          uError("failed to wait semaphore for qset:%p", qset);
        }
        pNode = pNode->next;
      }
    }

    (void)taosThreadMutexUnlock(&queue->mutex);

    if (code != 0) break;
  }

  (void)taosThreadMutexUnlock(&qset->mutex);
  return code;
}
572

573
// Number of items recorded for the batch when it was taken from its queue.
int32_t taosQallItemSize(STaosQall *qall) {
  return qall->numOfItems;
}
×
574
// Bytes recorded for the batch when it was taken from its queue.
int64_t taosQallMemSize(STaosQall *qall) {
  return qall->memOfItems;
}
×
575

576
// Number of items in the batch not yet returned by taosGetQitem.
// The int32_t field is widened to int64_t on return.
int64_t taosQallUnAccessedItemSize(STaosQall *qall) {
  return qall->unAccessedNumOfItems;
}
×
577
// Current value of the batch's unaccessed-memory counter, which taosGetQitem
// reduces for every item it hands out.
int64_t taosQallUnAccessedMemSize(STaosQall *qall) {
  return qall->unAccessMemOfItems;
}
×
578

579
// Rewind the batch cursor to its first node. The unaccessed counters are not reset.
void taosResetQitems(STaosQall *qall) {
  qall->current = qall->start;
}
×
580
// Number of member queues in the set, read without taking the qset mutex.
int32_t taosGetQueueNumber(STaosQset *qset) {
  return qset->numOfQueues;
}
×
581

582
// Record a thread id on the queue. The value is stored as-is, without locking.
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) {
  pQueue->threadId = threadId;
}
18,684,599✔
583

584
// Return the thread id previously stored with taosQueueSetThreadId (0 for a fresh queue).
int64_t taosQueueGetThreadId(STaosQueue *pQueue) {
  return pQueue->threadId;
}
36,593,060✔
585

586
#if 0
587

588
void taosResetQsetThread(STaosQset *qset, void *pItem) {
589
  if (pItem == NULL) return;
590
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
591

592
  (void)taosThreadMutexLock(&qset->mutex);
593
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
594
    tsem_post(&qset->sem);
595
  }
596
  (void)taosThreadMutexUnlock(&qset->mutex);
597
}
598

599
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc