• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.23
/source/util/src/tqueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tqueue.h"
18
#include "taoserror.h"
19
#include "tlog.h"
20
#include "tutil.h"
21

22
int64_t tsQueueMemoryAllowed = 0;
23
int64_t tsQueueMemoryUsed = 0;
24

25
struct STaosQueue {
26
  STaosQnode   *head;
27
  STaosQnode   *tail;
28
  STaosQueue   *next;     // for queue set
29
  STaosQset    *qset;     // for queue set
30
  void         *ahandle;  // for queue set
31
  FItem         itemFp;
32
  FItems        itemsFp;
33
  TdThreadMutex mutex;
34
  int64_t       memOfItems;
35
  int32_t       numOfItems;
36
  int64_t       threadId;
37
  int64_t       memLimit;
38
  int64_t       itemLimit;
39
};
40

41
struct STaosQset {
42
  STaosQueue   *head;
43
  STaosQueue   *current;
44
  TdThreadMutex mutex;
45
  tsem_t        sem;
46
  int32_t       numOfQueues;
47
  int32_t       numOfItems;
48
};
49

50
struct STaosQall {
51
  STaosQnode *current;
52
  STaosQnode *start;
53
  int32_t     numOfItems;
54
  int64_t     memOfItems;
55
  int32_t     unAccessedNumOfItems;
56
  int64_t     unAccessMemOfItems;
57
};
58

59
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
28,415✔
60
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
28,417✔
61

62
int32_t taosOpenQueue(STaosQueue **queue) {
7,529,734✔
63
  *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
7,529,734✔
64
  if (*queue == NULL) {
7,536,006!
UNCOV
65
    return terrno;
×
66
  }
67

68
  int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL);
7,536,006✔
69
  if (code) {
7,533,163!
70
    taosMemoryFreeClear(*queue);
×
71
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
72
  }
73

74
  uDebug("queue:%p is opened", queue);
7,533,163✔
75
  return 0;
7,536,840✔
76
}
77

78
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
121,165✔
79
  if (queue == NULL) return;
121,165!
80
  queue->itemFp = itemFp;
121,165✔
81
  queue->itemsFp = itemsFp;
121,165✔
82
}
83

84
void taosCloseQueue(STaosQueue *queue) {
7,536,823✔
85
  if (queue == NULL) return;
7,536,823!
86
  STaosQnode *pTemp;
87
  STaosQset  *qset;
88

89
  (void)taosThreadMutexLock(&queue->mutex);
7,536,823✔
90
  STaosQnode *pNode = queue->head;
7,537,102✔
91
  queue->head = NULL;
7,537,102✔
92
  qset = queue->qset;
7,537,102✔
93
  (void)taosThreadMutexUnlock(&queue->mutex);
7,537,102✔
94

95
  if (queue->qset) {
7,537,103✔
96
    taosRemoveFromQset(qset, queue);
40,676✔
97
  }
98

99
  while (pNode) {
7,537,521✔
100
    pTemp = pNode;
589✔
101
    pNode = pNode->next;
589✔
102
    taosMemoryFree(pTemp);
589✔
103
  }
104

105
  (void)taosThreadMutexDestroy(&queue->mutex);
7,536,932✔
106
  taosMemoryFree(queue);
7,536,764✔
107

108
  uDebug("queue:%p is closed", queue);
7,537,196✔
109
}
110

111
bool taosQueueEmpty(STaosQueue *queue) {
25,412,220✔
112
  if (queue == NULL) return true;
25,412,220!
113

114
  bool empty = false;
25,412,220✔
115
  (void)taosThreadMutexLock(&queue->mutex);
25,412,220✔
116
  if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
25,480,166✔
117
    empty = true;
9,976,606✔
118
  }
119
  (void)taosThreadMutexUnlock(&queue->mutex);
25,480,166✔
120

121
  return empty;
25,484,600✔
122
}
123

124
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
66,267,156✔
125
  if (queue == NULL) return;
66,267,156!
126

127
  (void)taosThreadMutexLock(&queue->mutex);
66,267,156✔
128
  queue->numOfItems -= items;
66,291,122✔
129
  (void)taosThreadMutexUnlock(&queue->mutex);
66,291,122✔
130
}
131

132
int32_t taosQueueItemSize(STaosQueue *queue) {
32,760,060✔
133
  if (queue == NULL) return 0;
32,760,060!
134

135
  (void)taosThreadMutexLock(&queue->mutex);
32,760,060✔
136
  int32_t numOfItems = queue->numOfItems;
32,779,461✔
137
  (void)taosThreadMutexUnlock(&queue->mutex);
32,779,461✔
138

139
  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems);
32,792,432✔
140
  return numOfItems;
32,793,633✔
141
}
142

143
int64_t taosQueueMemorySize(STaosQueue *queue) {
1,628,988✔
144
  (void)taosThreadMutexLock(&queue->mutex);
1,628,988✔
145
  int64_t memOfItems = queue->memOfItems;
1,629,152✔
146
  (void)taosThreadMutexUnlock(&queue->mutex);
1,629,152✔
147
  return memOfItems;
1,629,187✔
148
}
149

150
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
89,582,989✔
151
  int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
89,582,989✔
152
  if (alloced > tsQueueMemoryAllowed) {
89,737,088✔
153
    if (itype == RPC_QITEM) {
19,220,025!
154
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
×
155
             tsQueueMemoryAllowed);
156
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
157
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
×
158
    }
159
  }
160

161
  *item = NULL;
89,737,088✔
162
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
89,737,088✔
163
  if (pNode == NULL) {
89,647,115!
UNCOV
164
    (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
UNCOV
165
    return terrno;
×
166
  }
167

168
  pNode->dataSize = dataSize;
89,647,115✔
169
  pNode->size = size;
89,647,115✔
170
  pNode->itype = itype;
89,647,115✔
171
  pNode->timestamp = taosGetTimestampUs();
89,643,883✔
172
  uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
89,643,883✔
173
  *item = pNode->item;
89,672,134✔
174
  return 0;
89,672,134✔
175
}
176

177
void taosFreeQitem(void *pItem) {
89,672,944✔
178
  if (pItem == NULL) return;
89,672,944!
179

180
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
89,672,944✔
181
  int64_t     alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
89,672,944✔
182
  uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
89,727,780✔
183

184
  taosMemoryFree(pNode);
89,727,781✔
185
}
186

187
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
89,551,053✔
188
  int32_t     code = 0;
89,551,053✔
189
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
89,551,053✔
190
  pNode->timestamp = taosGetTimestampUs();
89,628,730✔
191
  pNode->next = NULL;
89,628,730✔
192

193
  (void)taosThreadMutexLock(&queue->mutex);
89,628,730✔
194
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
89,683,254!
195
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
196
    uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
197
           queue->memLimit, tstrerror(code));
198

199
    (void)taosThreadMutexUnlock(&queue->mutex);
×
200
    return code;
×
201
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
89,683,254!
202
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
203
    uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
204
           queue->itemLimit, tstrerror(code));
205
    (void)taosThreadMutexUnlock(&queue->mutex);
×
206
    return code;
×
207
  }
208

209
  if (queue->tail) {
89,683,254✔
210
    queue->tail->next = pNode;
21,124,688✔
211
    queue->tail = pNode;
21,124,688✔
212
  } else {
213
    queue->head = pNode;
68,558,566✔
214
    queue->tail = pNode;
68,558,566✔
215
  }
216
  queue->numOfItems++;
89,683,254✔
217
  queue->memOfItems += (pNode->size + pNode->dataSize);
89,683,254✔
218
  if (queue->qset) {
89,683,254✔
219
    (void)atomic_add_fetch_32(&queue->qset->numOfItems, 1);
73,424,988✔
220
  }
221

222
  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
89,713,810✔
223

224
  (void)taosThreadMutexUnlock(&queue->mutex);
89,713,816✔
225

226
  if (queue->qset) {
89,685,404✔
227
    if (tsem_post(&queue->qset->sem) != 0) {
73,446,147!
228
      uError("failed to post semaphore for queue set:%p", queue->qset);
×
229
    }
230
  }
231
  return code;
89,671,863✔
232
}
233

234
void taosReadQitem(STaosQueue *queue, void **ppItem) {
16,088,303✔
235
  STaosQnode *pNode = NULL;
16,088,303✔
236

237
  (void)taosThreadMutexLock(&queue->mutex);
16,088,303✔
238

239
  if (queue->head) {
16,092,560✔
240
    pNode = queue->head;
15,504,854✔
241
    *ppItem = pNode->item;
15,504,854✔
242
    queue->head = pNode->next;
15,504,854✔
243
    if (queue->head == NULL) {
15,504,854✔
244
      queue->tail = NULL;
5,037,274✔
245
    }
246
    queue->numOfItems--;
15,504,854✔
247
    queue->memOfItems -= (pNode->size + pNode->dataSize);
15,504,854✔
248
    if (queue->qset) {
15,504,854!
249
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
×
250
    }
251
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
15,504,854✔
252
           queue->memOfItems);
253
  }
254

255
  (void)taosThreadMutexUnlock(&queue->mutex);
16,092,560✔
256
}
16,099,300✔
257

258
int32_t taosAllocateQall(STaosQall **qall) {
198,351✔
259
  *qall = taosMemoryCalloc(1, sizeof(STaosQall));
198,351✔
260
  if (*qall == NULL) {
198,372!
UNCOV
261
    return terrno;
×
262
  }
263
  return 0;
198,373✔
264
}
265

266
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
197,890✔
267

268
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
518,077✔
269
  int32_t numOfItems = 0;
518,077✔
270
  bool    empty;
271

272
  (void)taosThreadMutexLock(&queue->mutex);
518,077✔
273

274
  empty = queue->head == NULL;
518,129✔
275
  if (!empty) {
518,129✔
276
    memset(qall, 0, sizeof(STaosQall));
175,161✔
277
    qall->current = queue->head;
175,161✔
278
    qall->start = queue->head;
175,161✔
279
    qall->numOfItems = queue->numOfItems;
175,161✔
280
    qall->memOfItems = queue->memOfItems;
175,161✔
281

282
    qall->unAccessedNumOfItems = queue->numOfItems;
175,161✔
283
    qall->unAccessMemOfItems = queue->memOfItems;
175,161✔
284

285
    numOfItems = qall->numOfItems;
175,161✔
286

287
    queue->head = NULL;
175,161✔
288
    queue->tail = NULL;
175,161✔
289
    queue->numOfItems = 0;
175,161✔
290
    queue->memOfItems = 0;
175,161✔
291
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
175,161✔
292
           queue->memOfItems);
293
    if (queue->qset) {
175,161!
294
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
×
295
    }
296
  }
297

298
  (void)taosThreadMutexUnlock(&queue->mutex);
518,129✔
299

300
  // if source queue is empty, we set destination qall to empty too.
301
  if (empty) {
518,132✔
302
    qall->current = NULL;
342,974✔
303
    qall->start = NULL;
342,974✔
304
    qall->numOfItems = 0;
342,974✔
305
  }
306
  return numOfItems;
518,132✔
307
}
308

309
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
39,164,534✔
310
  STaosQnode *pNode;
311
  int32_t     num = 0;
39,164,534✔
312

313
  pNode = qall->current;
39,164,534✔
314
  if (pNode) qall->current = pNode->next;
39,164,534✔
315

316
  if (pNode) {
39,164,534✔
317
    *ppItem = pNode->item;
38,563,825✔
318
    num = 1;
38,563,825✔
319

320
    qall->unAccessedNumOfItems -= 1;
38,563,825✔
321
    qall->unAccessMemOfItems -= pNode->dataSize;
38,563,825✔
322

323
    uTrace("item:%p is fetched", *ppItem);
38,563,825✔
324
  } else {
325
    *ppItem = NULL;
600,709✔
326
  }
327

328
  return num;
39,177,018✔
329
}
330

331
int32_t taosOpenQset(STaosQset **qset) {
94,570✔
332
  *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
94,570✔
333
  if (*qset == NULL) {
94,570!
334
    return terrno;
×
335
  }
336

337
  (void)taosThreadMutexInit(&(*qset)->mutex, NULL);
94,570✔
338
  if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
94,570!
339
    taosMemoryFree(*qset);
×
340
    return terrno;
×
341
  }
342

343
  uDebug("qset:%p is opened", qset);
94,570✔
344
  return 0;
94,570✔
345
}
346

347
void taosCloseQset(STaosQset *qset) {
94,570✔
348
  if (qset == NULL) return;
94,570!
349

350
  // remove all the queues from qset
351
  (void)taosThreadMutexLock(&qset->mutex);
94,570✔
352
  while (qset->head) {
175,061✔
353
    STaosQueue *queue = qset->head;
80,492✔
354
    qset->head = qset->head->next;
80,492✔
355

356
    queue->qset = NULL;
80,492✔
357
    queue->next = NULL;
80,492✔
358
  }
359
  (void)taosThreadMutexUnlock(&qset->mutex);
94,569✔
360

361
  (void)taosThreadMutexDestroy(&qset->mutex);
94,570✔
362
  if (tsem_destroy(&qset->sem) != 0) {
94,570!
363
    uError("failed to destroy semaphore for qset:%p", qset);
×
364
  }
365
  taosMemoryFree(qset);
94,569✔
366
  uDebug("qset:%p is closed", qset);
94,569✔
367
}
368

369
// tsem_post 'qset->sem', so that reader threads waiting for it
370
// resumes execution and return, should only be used to signal the
371
// thread to exit.
372
void taosQsetThreadResume(STaosQset *qset) {
763,580✔
373
  uDebug("qset:%p, it will exit", qset);
763,580✔
374
  if (tsem_post(&qset->sem) != 0) {
763,580!
375
    uError("failed to post semaphore for qset:%p", qset);
×
376
  }
377
}
763,586✔
378

379
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
121,169✔
380
  if (queue->qset) return TSDB_CODE_INVALID_PARA;
121,169!
381

382
  (void)taosThreadMutexLock(&qset->mutex);
121,169✔
383

384
  queue->next = qset->head;
121,169✔
385
  queue->ahandle = ahandle;
121,169✔
386
  qset->head = queue;
121,169✔
387
  qset->numOfQueues++;
121,169✔
388

389
  (void)taosThreadMutexLock(&queue->mutex);
121,169✔
390
  (void)atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
121,170✔
391
  queue->qset = qset;
121,170✔
392
  (void)taosThreadMutexUnlock(&queue->mutex);
121,170✔
393

394
  (void)taosThreadMutexUnlock(&qset->mutex);
121,170✔
395

396
  uTrace("queue:%p is added into qset:%p", queue, qset);
121,170✔
397
  return 0;
121,170✔
398
}
399

400
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
40,675✔
401
  STaosQueue *tqueue = NULL;
40,675✔
402

403
  (void)taosThreadMutexLock(&qset->mutex);
40,675✔
404

405
  if (qset->head) {
40,677!
406
    if (qset->head == queue) {
40,677✔
407
      qset->head = qset->head->next;
28,997✔
408
      tqueue = queue;
28,997✔
409
    } else {
410
      STaosQueue *prev = qset->head;
11,680✔
411
      tqueue = qset->head->next;
11,680✔
412
      while (tqueue) {
39,595!
413
        if (tqueue == queue) {
39,595✔
414
          prev->next = tqueue->next;
11,680✔
415
          break;
11,680✔
416
        } else {
417
          prev = tqueue;
27,915✔
418
          tqueue = tqueue->next;
27,915✔
419
        }
420
      }
421
    }
422

423
    if (tqueue) {
40,677✔
424
      if (qset->current == queue) qset->current = tqueue->next;
40,675✔
425
      qset->numOfQueues--;
40,675✔
426

427
      (void)taosThreadMutexLock(&queue->mutex);
40,675✔
428
      (void)atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
40,677✔
429
      queue->qset = NULL;
40,677✔
430
      queue->next = NULL;
40,677✔
431
      (void)taosThreadMutexUnlock(&queue->mutex);
40,677✔
432
    }
433
  }
434

435
  (void)taosThreadMutexUnlock(&qset->mutex);
40,679✔
436

437
  uDebug("queue:%p is removed from qset:%p", queue, qset);
40,677✔
438
}
40,677✔
439

440
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
36,274,129✔
441
  STaosQnode *pNode = NULL;
36,274,129✔
442
  int32_t     code = 0;
36,274,129✔
443

444
  if (tsem_wait(&qset->sem) != 0) {
36,274,129!
445
    uError("failed to wait semaphore for qset:%p", qset);
×
446
  }
447

448
  (void)taosThreadMutexLock(&qset->mutex);
36,237,648✔
449

450
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
190,186,557✔
451
    if (qset->current == NULL) qset->current = qset->head;
189,485,662✔
452
    STaosQueue *queue = qset->current;
189,485,662✔
453
    if (queue) qset->current = queue->next;
189,485,662✔
454
    if (queue == NULL) break;
189,485,662!
455
    if (queue->head == NULL) continue;
189,485,662✔
456

457
    (void)taosThreadMutexLock(&queue->mutex);
35,581,304✔
458

459
    if (queue->head) {
35,584,174!
460
      pNode = queue->head;
35,584,188✔
461
      *ppItem = pNode->item;
35,584,188✔
462
      qinfo->ahandle = queue->ahandle;
35,584,188✔
463
      qinfo->fp = queue->itemFp;
35,584,188✔
464
      qinfo->queue = queue;
35,584,188✔
465
      qinfo->timestamp = pNode->timestamp;
35,584,188✔
466

467
      queue->head = pNode->next;
35,584,188✔
468
      if (queue->head == NULL) queue->tail = NULL;
35,584,188✔
469
      // queue->numOfItems--;
470
      queue->memOfItems -= (pNode->size + pNode->dataSize);
35,584,188✔
471
      (void)atomic_sub_fetch_32(&qset->numOfItems, 1);
35,584,188✔
472
      code = 1;
35,584,194✔
473
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
35,584,194✔
474
             queue->memOfItems);
475
    }
476

477
    (void)taosThreadMutexUnlock(&queue->mutex);
35,584,180✔
478
    if (pNode) break;
35,583,990!
479
  }
480

481
  (void)taosThreadMutexUnlock(&qset->mutex);
36,284,958✔
482

483
  return code;
36,280,918✔
484
}
485

486
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
30,767,280✔
487
  STaosQueue *queue;
488
  int32_t     code = 0;
30,767,280✔
489

490
  if (tsem_wait(&qset->sem) != 0) {
30,767,280!
491
    uError("failed to wait semaphore for qset:%p", qset);
×
492
  }
493
  (void)taosThreadMutexLock(&qset->mutex);
30,758,364✔
494

495
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
82,692,728✔
496
    if (qset->current == NULL) qset->current = qset->head;
82,610,812✔
497
    queue = qset->current;
82,610,812✔
498
    if (queue) qset->current = queue->next;
82,610,812!
499
    if (queue == NULL) break;
82,610,812!
500
    if (queue->head == NULL) continue;
82,610,812✔
501

502
    (void)taosThreadMutexLock(&queue->mutex);
30,684,648✔
503

504
    if (queue->head) {
30,706,310!
505
      qall->current = queue->head;
30,706,568✔
506
      qall->start = queue->head;
30,706,568✔
507
      qall->numOfItems = queue->numOfItems;
30,706,568✔
508
      qall->memOfItems = queue->memOfItems;
30,706,568✔
509
      qall->unAccessedNumOfItems = queue->numOfItems;
30,706,568✔
510
      qall->unAccessMemOfItems = queue->memOfItems;
30,706,568✔
511

512
      code = qall->numOfItems;
30,706,568✔
513
      qinfo->ahandle = queue->ahandle;
30,706,568✔
514
      qinfo->fp = queue->itemsFp;
30,706,568✔
515
      qinfo->queue = queue;
30,706,568✔
516
      qinfo->timestamp = queue->head->timestamp;
30,706,568✔
517

518
      queue->head = NULL;
30,706,568✔
519
      queue->tail = NULL;
30,706,568✔
520
      // queue->numOfItems = 0;
521
      queue->memOfItems = 0;
30,706,568✔
522
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);
30,706,568✔
523

524
      (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
30,706,576✔
525
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
37,876,264✔
526
        if (tsem_wait(&qset->sem) != 0) {
7,170,622!
527
          uError("failed to wait semaphore for qset:%p", qset);
×
528
        }
529
      }
530
    }
531

532
    (void)taosThreadMutexUnlock(&queue->mutex);
30,705,384✔
533

534
    if (code != 0) break;
30,704,322!
535
  }
536

537
  (void)taosThreadMutexUnlock(&qset->mutex);
30,786,653✔
538
  return code;
30,767,130✔
539
}
540

541
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
1,798,955✔
542
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
×
543

544
int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; }
65,458✔
545
int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; }
54,065✔
546

547
void    taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
×
548
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
13,559✔
549

550
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) { pQueue->threadId = threadId; }
67,930✔
551

552
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
135,586✔
553

554
#if 0
555

556
void taosResetQsetThread(STaosQset *qset, void *pItem) {
557
  if (pItem == NULL) return;
558
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
559

560
  (void)taosThreadMutexLock(&qset->mutex);
561
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
562
    tsem_post(&qset->sem);
563
  }
564
  (void)taosThreadMutexUnlock(&qset->mutex);
565
}
566

567
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc