• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3526

10 Nov 2024 03:50AM UTC coverage: 60.225% (-0.6%) from 60.818%
#3526

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

117031 of 249004 branches covered (47.0%)

Branch coverage included in aggregate %.

130 of 169 new or added lines in 23 files covered. (76.92%)

4149 existing lines in 176 files now uncovered.

197577 of 273386 relevant lines covered (72.27%)

5840219.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.23
/source/util/src/tqueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tqueue.h"
18
#include "taoserror.h"
19
#include "tlog.h"
20
#include "tutil.h"
21

22
int64_t tsQueueMemoryAllowed = 0;
23
int64_t tsQueueMemoryUsed = 0;
24

25
struct STaosQueue {
26
  STaosQnode   *head;
27
  STaosQnode   *tail;
28
  STaosQueue   *next;     // for queue set
29
  STaosQset    *qset;     // for queue set
30
  void         *ahandle;  // for queue set
31
  FItem         itemFp;
32
  FItems        itemsFp;
33
  TdThreadMutex mutex;
34
  int64_t       memOfItems;
35
  int32_t       numOfItems;
36
  int64_t       threadId;
37
  int64_t       memLimit;
38
  int64_t       itemLimit;
39
};
40

41
struct STaosQset {
42
  STaosQueue   *head;
43
  STaosQueue   *current;
44
  TdThreadMutex mutex;
45
  tsem_t        sem;
46
  int32_t       numOfQueues;
47
  int32_t       numOfItems;
48
};
49

50
struct STaosQall {
51
  STaosQnode *current;
52
  STaosQnode *start;
53
  int32_t     numOfItems;
54
  int64_t     memOfItems;
55
  int32_t     unAccessedNumOfItems;
56
  int64_t     unAccessMemOfItems;
57
};
58

59
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
16,279✔
60
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
16,279✔
61

62
int32_t taosOpenQueue(STaosQueue **queue) {
2,228,410✔
63
  *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
2,228,410✔
64
  if (*queue == NULL) {
2,229,694!
65
    return terrno;
×
66
  }
67

68
  int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL);
2,229,694✔
69
  if (code) {
2,229,655!
70
    taosMemoryFreeClear(*queue);
×
71
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
72
  }
73

74
  uDebug("queue:%p is opened", queue);
2,229,655✔
75
  return 0;
2,229,176✔
76
}
77

78
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
110,843✔
79
  if (queue == NULL) return;
110,843!
80
  queue->itemFp = itemFp;
110,843✔
81
  queue->itemsFp = itemsFp;
110,843✔
82
}
83

84
void taosCloseQueue(STaosQueue *queue) {
2,226,964✔
85
  if (queue == NULL) return;
2,226,964!
86
  STaosQnode *pTemp;
87
  STaosQset  *qset;
88

89
  (void)taosThreadMutexLock(&queue->mutex);
2,226,964✔
90
  STaosQnode *pNode = queue->head;
2,227,073✔
91
  queue->head = NULL;
2,227,073✔
92
  qset = queue->qset;
2,227,073✔
93
  (void)taosThreadMutexUnlock(&queue->mutex);
2,227,073✔
94

95
  if (queue->qset) {
2,227,065✔
96
    taosRemoveFromQset(qset, queue);
36,799✔
97
  }
98

99
  while (pNode) {
2,227,680✔
100
    pTemp = pNode;
608✔
101
    pNode = pNode->next;
608✔
102
    taosMemoryFree(pTemp);
608✔
103
  }
104

105
  (void)taosThreadMutexDestroy(&queue->mutex);
2,227,072✔
106
  taosMemoryFree(queue);
2,226,965✔
107

108
  uDebug("queue:%p is closed", queue);
2,227,093✔
109
}
110

111
bool taosQueueEmpty(STaosQueue *queue) {
7,630,350✔
112
  if (queue == NULL) return true;
7,630,350!
113

114
  bool empty = false;
7,630,350✔
115
  (void)taosThreadMutexLock(&queue->mutex);
7,630,350✔
116
  if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
7,655,789!
117
    empty = true;
2,741,100✔
118
  }
119
  (void)taosThreadMutexUnlock(&queue->mutex);
7,655,789✔
120

121
  return empty;
7,657,247✔
122
}
123

124
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
23,107,255✔
125
  if (queue == NULL) return;
23,107,255!
126

127
  (void)taosThreadMutexLock(&queue->mutex);
23,107,255✔
128
  queue->numOfItems -= items;
23,110,032✔
129
  (void)taosThreadMutexUnlock(&queue->mutex);
23,110,032✔
130
}
131

132
int32_t taosQueueItemSize(STaosQueue *queue) {
10,319,153✔
133
  if (queue == NULL) return 0;
10,319,153!
134

135
  (void)taosThreadMutexLock(&queue->mutex);
10,319,153✔
136
  int32_t numOfItems = queue->numOfItems;
10,323,656✔
137
  (void)taosThreadMutexUnlock(&queue->mutex);
10,323,656✔
138

139
  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems);
10,327,071✔
140
  return numOfItems;
10,327,348✔
141
}
142

143
int64_t taosQueueMemorySize(STaosQueue *queue) {
343,339✔
144
  (void)taosThreadMutexLock(&queue->mutex);
343,339✔
145
  int64_t memOfItems = queue->memOfItems;
343,342✔
146
  (void)taosThreadMutexUnlock(&queue->mutex);
343,342✔
147
  return memOfItems;
343,343✔
148
}
149

150
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
28,672,484✔
151
  int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
28,672,484✔
152
  if (alloced > tsQueueMemoryAllowed) {
28,700,517✔
153
    if (itype == RPC_QITEM) {
10,656,033!
154
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
×
155
             tsQueueMemoryAllowed);
156
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
157
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
×
158
    }
159
  }
160

161
  *item = NULL;
28,700,517✔
162
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
28,700,517✔
163
  if (pNode == NULL) {
28,693,538!
164
    (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
165
    return terrno;
×
166
  }
167

168
  pNode->dataSize = dataSize;
28,693,538✔
169
  pNode->size = size;
28,693,538✔
170
  pNode->itype = itype;
28,693,538✔
171
  pNode->timestamp = taosGetTimestampUs();
28,691,624✔
172
  uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
28,691,624✔
173
  *item = pNode->item;
28,694,445✔
174
  return 0;
28,694,445✔
175
}
176

177
void taosFreeQitem(void *pItem) {
28,681,160✔
178
  if (pItem == NULL) return;
28,681,160!
179

180
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
28,681,160✔
181
  int64_t     alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
28,681,160✔
182
  uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
28,693,470✔
183

184
  taosMemoryFree(pNode);
28,693,469✔
185
}
186

187
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
28,670,036✔
188
  int32_t     code = 0;
28,670,036✔
189
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
28,670,036✔
190
  pNode->timestamp = taosGetTimestampUs();
28,677,451✔
191
  pNode->next = NULL;
28,677,451✔
192

193
  (void)taosThreadMutexLock(&queue->mutex);
28,677,451✔
194
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
28,684,718!
195
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
196
    uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
197
           queue->memLimit, tstrerror(code));
198

199
    (void)taosThreadMutexUnlock(&queue->mutex);
×
200
    return code;
×
201
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
28,684,718!
202
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
203
    uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
204
           queue->itemLimit, tstrerror(code));
205
    (void)taosThreadMutexUnlock(&queue->mutex);
×
206
    return code;
×
207
  }
208

209
  if (queue->tail) {
28,684,718✔
210
    queue->tail->next = pNode;
4,843,606✔
211
    queue->tail = pNode;
4,843,606✔
212
  } else {
213
    queue->head = pNode;
23,841,112✔
214
    queue->tail = pNode;
23,841,112✔
215
  }
216
  queue->numOfItems++;
28,684,718✔
217
  queue->memOfItems += (pNode->size + pNode->dataSize);
28,684,718✔
218
  if (queue->qset) {
28,684,718✔
219
    (void)atomic_add_fetch_32(&queue->qset->numOfItems, 1);
23,584,364✔
220
  }
221

222
  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
28,686,200✔
223

224
  (void)taosThreadMutexUnlock(&queue->mutex);
28,686,205✔
225

226
  if (queue->qset) {
28,685,374✔
227
    if (tsem_post(&queue->qset->sem) != 0) {
23,585,662!
228
      uError("failed to post semaphore for queue set:%p", queue->qset);
×
229
    }
230
  }
231
  return code;
28,684,063✔
232
}
233

234
void taosReadQitem(STaosQueue *queue, void **ppItem) {
5,468,024✔
235
  STaosQnode *pNode = NULL;
5,468,024✔
236

237
  (void)taosThreadMutexLock(&queue->mutex);
5,468,024✔
238

239
  if (queue->head) {
5,474,288✔
240
    pNode = queue->head;
4,916,371✔
241
    *ppItem = pNode->item;
4,916,371✔
242
    queue->head = pNode->next;
4,916,371✔
243
    if (queue->head == NULL) {
4,916,371✔
244
      queue->tail = NULL;
1,642,147✔
245
    }
246
    queue->numOfItems--;
4,916,371✔
247
    queue->memOfItems -= (pNode->size + pNode->dataSize);
4,916,371✔
248
    if (queue->qset) {
4,916,371!
249
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
×
250
    }
251
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
4,916,371✔
252
           queue->memOfItems);
253
  }
254

255
  (void)taosThreadMutexUnlock(&queue->mutex);
5,474,289✔
256
}
5,473,208✔
257

258
int32_t taosAllocateQall(STaosQall **qall) {
180,170✔
259
  *qall = taosMemoryCalloc(1, sizeof(STaosQall));
180,170✔
260
  if (*qall == NULL) {
180,178!
UNCOV
261
    return terrno;
×
262
  }
263
  return 0;
180,180✔
264
}
265

266
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
179,930✔
267

268
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
349,149✔
269
  int32_t numOfItems = 0;
349,149✔
270
  bool    empty;
271

272
  (void)taosThreadMutexLock(&queue->mutex);
349,149✔
273

274
  empty = queue->head == NULL;
349,159✔
275
  if (!empty) {
349,159✔
276
    memset(qall, 0, sizeof(STaosQall));
112,254✔
277
    qall->current = queue->head;
112,254✔
278
    qall->start = queue->head;
112,254✔
279
    qall->numOfItems = queue->numOfItems;
112,254✔
280
    qall->memOfItems = queue->memOfItems;
112,254✔
281

282
    qall->unAccessedNumOfItems = queue->numOfItems;
112,254✔
283
    qall->unAccessMemOfItems = queue->memOfItems;
112,254✔
284

285
    numOfItems = qall->numOfItems;
112,254✔
286

287
    queue->head = NULL;
112,254✔
288
    queue->tail = NULL;
112,254✔
289
    queue->numOfItems = 0;
112,254✔
290
    queue->memOfItems = 0;
112,254✔
291
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
112,254✔
292
           queue->memOfItems);
293
    if (queue->qset) {
112,254!
294
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
×
295
    }
296
  }
297

298
  (void)taosThreadMutexUnlock(&queue->mutex);
349,159✔
299

300
  // if source queue is empty, we set destination qall to empty too.
301
  if (empty) {
349,163✔
302
    qall->current = NULL;
236,909✔
303
    qall->start = NULL;
236,909✔
304
    qall->numOfItems = 0;
236,909✔
305
  }
306
  return numOfItems;
349,163✔
307
}
308

309
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
8,643,053✔
310
  STaosQnode *pNode;
311
  int32_t     num = 0;
8,643,053✔
312

313
  pNode = qall->current;
8,643,053✔
314
  if (pNode) qall->current = pNode->next;
8,643,053✔
315

316
  if (pNode) {
8,643,053✔
317
    *ppItem = pNode->item;
8,308,721✔
318
    num = 1;
8,308,721✔
319

320
    qall->unAccessedNumOfItems -= 1;
8,308,721✔
321
    qall->unAccessMemOfItems -= pNode->dataSize;
8,308,721✔
322

323
    uTrace("item:%p is fetched", *ppItem);
8,308,721✔
324
  } else {
325
    *ppItem = NULL;
334,332✔
326
  }
327

328
  return num;
8,645,339✔
329
}
330

331
int32_t taosOpenQset(STaosQset **qset) {
87,759✔
332
  *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
87,759✔
333
  if (*qset == NULL) {
87,759!
334
    return terrno;
×
335
  }
336

337
  (void)taosThreadMutexInit(&(*qset)->mutex, NULL);
87,759✔
338
  if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
87,759!
339
    taosMemoryFree(*qset);
×
340
    return terrno;
×
341
  }
342

343
  uDebug("qset:%p is opened", qset);
87,758✔
344
  return 0;
87,759✔
345
}
346

347
void taosCloseQset(STaosQset *qset) {
87,758✔
348
  if (qset == NULL) return;
87,758!
349

350
  // remove all the queues from qset
351
  (void)taosThreadMutexLock(&qset->mutex);
87,758✔
352
  while (qset->head) {
161,799✔
353
    STaosQueue *queue = qset->head;
74,040✔
354
    qset->head = qset->head->next;
74,040✔
355

356
    queue->qset = NULL;
74,040✔
357
    queue->next = NULL;
74,040✔
358
  }
359
  (void)taosThreadMutexUnlock(&qset->mutex);
87,759✔
360

361
  (void)taosThreadMutexDestroy(&qset->mutex);
87,758✔
362
  if (tsem_destroy(&qset->sem) != 0) {
87,757!
363
    uError("failed to destroy semaphore for qset:%p", qset);
×
364
  }
365
  taosMemoryFree(qset);
87,758✔
366
  uDebug("qset:%p is closed", qset);
87,757✔
367
}
368

369
// tsem_post 'qset->sem', so that reader threads waiting for it
370
// resumes execution and return, should only be used to signal the
371
// thread to exit.
372
void taosQsetThreadResume(STaosQset *qset) {
686,062✔
373
  uDebug("qset:%p, it will exit", qset);
686,062✔
374
  if (tsem_post(&qset->sem) != 0) {
686,062!
375
    uError("failed to post semaphore for qset:%p", qset);
×
376
  }
377
}
686,064✔
378

379
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
110,842✔
380
  if (queue->qset) return TSDB_CODE_INVALID_PARA;
110,842!
381

382
  (void)taosThreadMutexLock(&qset->mutex);
110,842✔
383

384
  queue->next = qset->head;
110,840✔
385
  queue->ahandle = ahandle;
110,840✔
386
  qset->head = queue;
110,840✔
387
  qset->numOfQueues++;
110,840✔
388

389
  (void)taosThreadMutexLock(&queue->mutex);
110,840✔
390
  (void)atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
110,841✔
391
  queue->qset = qset;
110,843✔
392
  (void)taosThreadMutexUnlock(&queue->mutex);
110,843✔
393

394
  (void)taosThreadMutexUnlock(&qset->mutex);
110,844✔
395

396
  uTrace("queue:%p is added into qset:%p", queue, qset);
110,843✔
397
  return 0;
110,842✔
398
}
399

400
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
36,801✔
401
  STaosQueue *tqueue = NULL;
36,801✔
402

403
  (void)taosThreadMutexLock(&qset->mutex);
36,801✔
404

405
  if (qset->head) {
36,802!
406
    if (qset->head == queue) {
36,803✔
407
      qset->head = qset->head->next;
26,444✔
408
      tqueue = queue;
26,444✔
409
    } else {
410
      STaosQueue *prev = qset->head;
10,359✔
411
      tqueue = qset->head->next;
10,359✔
412
      while (tqueue) {
29,328!
413
        if (tqueue == queue) {
29,328✔
414
          prev->next = tqueue->next;
10,359✔
415
          break;
10,359✔
416
        } else {
417
          prev = tqueue;
18,969✔
418
          tqueue = tqueue->next;
18,969✔
419
        }
420
      }
421
    }
422

423
    if (tqueue) {
36,803✔
424
      if (qset->current == queue) qset->current = tqueue->next;
36,802✔
425
      qset->numOfQueues--;
36,802✔
426

427
      (void)taosThreadMutexLock(&queue->mutex);
36,802✔
428
      (void)atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
36,801✔
429
      queue->qset = NULL;
36,803✔
430
      queue->next = NULL;
36,803✔
431
      (void)taosThreadMutexUnlock(&queue->mutex);
36,803✔
432
    }
433
  }
434

435
  (void)taosThreadMutexUnlock(&qset->mutex);
36,800✔
436

437
  uDebug("queue:%p is removed from qset:%p", queue, qset);
36,800✔
438
}
36,800✔
439

440
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
16,075,500✔
441
  STaosQnode *pNode = NULL;
16,075,500✔
442
  int32_t     code = 0;
16,075,500✔
443

444
  if (tsem_wait(&qset->sem) != 0) {
16,075,500!
445
    uError("failed to wait semaphore for qset:%p", qset);
×
446
  }
447

448
  (void)taosThreadMutexLock(&qset->mutex);
16,049,042✔
449

450
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
20,179,185✔
451
    if (qset->current == NULL) qset->current = qset->head;
19,553,020✔
452
    STaosQueue *queue = qset->current;
19,553,020✔
453
    if (queue) qset->current = queue->next;
19,553,020!
454
    if (queue == NULL) break;
19,553,020!
455
    if (queue->head == NULL) continue;
19,553,020✔
456

457
    (void)taosThreadMutexLock(&queue->mutex);
15,450,224✔
458

459
    if (queue->head) {
15,450,261!
460
      pNode = queue->head;
15,450,262✔
461
      *ppItem = pNode->item;
15,450,262✔
462
      qinfo->ahandle = queue->ahandle;
15,450,262✔
463
      qinfo->fp = queue->itemFp;
15,450,262✔
464
      qinfo->queue = queue;
15,450,262✔
465
      qinfo->timestamp = pNode->timestamp;
15,450,262✔
466

467
      queue->head = pNode->next;
15,450,262✔
468
      if (queue->head == NULL) queue->tail = NULL;
15,450,262✔
469
      // queue->numOfItems--;
470
      queue->memOfItems -= (pNode->size + pNode->dataSize);
15,450,262✔
471
      (void)atomic_sub_fetch_32(&qset->numOfItems, 1);
15,450,262✔
472
      code = 1;
15,450,261✔
473
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
15,450,261✔
474
             queue->memOfItems);
475
    }
476

477
    (void)taosThreadMutexUnlock(&queue->mutex);
15,450,261✔
478
    if (pNode) break;
15,450,252!
479
  }
480

481
  (void)taosThreadMutexUnlock(&qset->mutex);
16,076,420✔
482

483
  return code;
16,075,613✔
484
}
485

486
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
7,717,317✔
487
  STaosQueue *queue;
488
  int32_t     code = 0;
7,717,317✔
489

490
  if (tsem_wait(&qset->sem) != 0) {
7,717,317!
491
    uError("failed to wait semaphore for qset:%p", qset);
×
492
  }
493
  (void)taosThreadMutexLock(&qset->mutex);
7,715,670✔
494

495
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
7,908,885✔
496
    if (qset->current == NULL) qset->current = qset->head;
7,850,312✔
497
    queue = qset->current;
7,850,312✔
498
    if (queue) qset->current = queue->next;
7,850,312✔
499
    if (queue == NULL) break;
7,850,312!
500
    if (queue->head == NULL) continue;
7,850,312✔
501

502
    (void)taosThreadMutexLock(&queue->mutex);
7,659,077✔
503

504
    if (queue->head) {
7,659,424!
505
      qall->current = queue->head;
7,659,450✔
506
      qall->start = queue->head;
7,659,450✔
507
      qall->numOfItems = queue->numOfItems;
7,659,450✔
508
      qall->memOfItems = queue->memOfItems;
7,659,450✔
509
      qall->unAccessedNumOfItems = queue->numOfItems;
7,659,450✔
510
      qall->unAccessMemOfItems = queue->memOfItems;
7,659,450✔
511

512
      code = qall->numOfItems;
7,659,450✔
513
      qinfo->ahandle = queue->ahandle;
7,659,450✔
514
      qinfo->fp = queue->itemsFp;
7,659,450✔
515
      qinfo->queue = queue;
7,659,450✔
516
      qinfo->timestamp = queue->head->timestamp;
7,659,450✔
517

518
      queue->head = NULL;
7,659,450✔
519
      queue->tail = NULL;
7,659,450✔
520
      // queue->numOfItems = 0;
521
      queue->memOfItems = 0;
7,659,450✔
522
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);
7,659,450✔
523

524
      (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
7,659,461✔
525
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
8,135,999✔
526
        if (tsem_wait(&qset->sem) != 0) {
476,157!
527
          uError("failed to wait semaphore for qset:%p", qset);
×
528
        }
529
      }
530
    }
531

532
    (void)taosThreadMutexUnlock(&queue->mutex);
7,659,816✔
533

534
    if (code != 0) break;
7,659,613✔
535
  }
536

537
  (void)taosThreadMutexUnlock(&qset->mutex);
7,717,744✔
538
  return code;
7,717,463✔
539
}
540

541
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
431,661✔
542
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
×
543

544
int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; }
20,791✔
545
int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; }
14,753✔
546

547
void    taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
×
548
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
12,268✔
549

550
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) { pQueue->threadId = threadId; }
61,377✔
551

552
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
122,676✔
553

554
#if 0
555

556
void taosResetQsetThread(STaosQset *qset, void *pItem) {
557
  if (pItem == NULL) return;
558
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
559

560
  (void)taosThreadMutexLock(&qset->mutex);
561
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
562
    tsem_post(&qset->sem);
563
  }
564
  (void)taosThreadMutexUnlock(&qset->mutex);
565
}
566

567
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc