• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.04
/source/util/src/tqueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tqueue.h"
18
#include "taoserror.h"
19
#include "tlog.h"
20
#include "tutil.h"
21

22
int64_t tsQueueMemoryAllowed = 0;
23
int64_t tsQueueMemoryUsed = 0;
24

25
struct STaosQueue {
26
  STaosQnode   *head;
27
  STaosQnode   *tail;
28
  STaosQueue   *next;     // for queue set
29
  STaosQset    *qset;     // for queue set
30
  void         *ahandle;  // for queue set
31
  FItem         itemFp;
32
  FItems        itemsFp;
33
  TdThreadMutex mutex;
34
  int64_t       memOfItems;
35
  int32_t       numOfItems;
36
  int64_t       threadId;
37
  int64_t       memLimit;
38
  int64_t       itemLimit;
39
};
40

41
struct STaosQset {
42
  STaosQueue   *head;
43
  STaosQueue   *current;
44
  TdThreadMutex mutex;
45
  tsem_t        sem;
46
  int32_t       numOfQueues;
47
  int32_t       numOfItems;
48
};
49

50
struct STaosQall {
51
  STaosQnode *current;
52
  STaosQnode *start;
53
  int32_t     numOfItems;
54
  int64_t     memOfItems;
55
  int32_t     unAccessedNumOfItems;
56
  int64_t     unAccessMemOfItems;
57
};
58

59
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
28,042✔
60
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
28,042✔
61

62
int32_t taosOpenQueue(STaosQueue **queue) {
8,681,456✔
63
  *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
8,681,456✔
64
  if (*queue == NULL) {
8,686,749!
65
    return terrno;
×
66
  }
67

68
  int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL);
8,686,749✔
69
  if (code) {
8,685,846!
70
    taosMemoryFreeClear(*queue);
×
71
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
72
  }
73

74
  uDebug("queue:%p is opened", queue);
8,685,846✔
75
  return 0;
8,688,471✔
76
}
77

78
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
122,451✔
79
  if (queue == NULL) return;
122,451!
80
  queue->itemFp = itemFp;
122,451✔
81
  queue->itemsFp = itemsFp;
122,451✔
82
}
83

84
void taosCloseQueue(STaosQueue *queue) {
8,689,310✔
85
  if (queue == NULL) return;
8,689,310!
86
  STaosQnode *pTemp;
87
  STaosQset  *qset;
88

89
  (void)taosThreadMutexLock(&queue->mutex);
8,689,310✔
90
  STaosQnode *pNode = queue->head;
8,689,698✔
91
  queue->head = NULL;
8,689,698✔
92
  qset = queue->qset;
8,689,698✔
93
  (void)taosThreadMutexUnlock(&queue->mutex);
8,689,698✔
94

95
  if (queue->qset) {
8,689,700✔
96
    taosRemoveFromQset(qset, queue);
41,105✔
97
  }
98

99
  while (pNode) {
8,690,087✔
100
    pTemp = pNode;
538✔
101
    pNode = pNode->next;
538✔
102
    taosMemoryFree(pTemp);
538✔
103
  }
104

105
  (void)taosThreadMutexDestroy(&queue->mutex);
8,689,549✔
106
  taosMemoryFree(queue);
8,689,119✔
107

108
  uDebug("queue:%p is closed", queue);
8,689,882✔
109
}
110

111
bool taosQueueEmpty(STaosQueue *queue) {
29,460,763✔
112
  if (queue == NULL) return true;
29,460,763!
113

114
  bool empty = false;
29,460,763✔
115
  (void)taosThreadMutexLock(&queue->mutex);
29,460,763✔
116
  if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
29,485,906✔
117
    empty = true;
10,994,237✔
118
  }
119
  (void)taosThreadMutexUnlock(&queue->mutex);
29,485,906✔
120

121
  return empty;
29,491,178✔
122
}
123

124
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
66,803,490✔
125
  if (queue == NULL) return;
66,803,490!
126

127
  (void)taosThreadMutexLock(&queue->mutex);
66,803,490✔
128
  queue->numOfItems -= items;
66,832,641✔
129
  (void)taosThreadMutexUnlock(&queue->mutex);
66,832,641✔
130
}
131

132
int32_t taosQueueItemSize(STaosQueue *queue) {
38,028,320✔
133
  if (queue == NULL) return 0;
38,028,320!
134

135
  (void)taosThreadMutexLock(&queue->mutex);
38,028,320✔
136
  int32_t numOfItems = queue->numOfItems;
38,035,268✔
137
  (void)taosThreadMutexUnlock(&queue->mutex);
38,035,268✔
138

139
  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems);
38,041,847✔
140
  return numOfItems;
38,042,498✔
141
}
142

143
int64_t taosQueueMemorySize(STaosQueue *queue) {
1,081,931✔
144
  (void)taosThreadMutexLock(&queue->mutex);
1,081,931✔
145
  int64_t memOfItems = queue->memOfItems;
1,082,066✔
146
  (void)taosThreadMutexUnlock(&queue->mutex);
1,082,066✔
147
  return memOfItems;
1,082,057✔
148
}
149

150
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
96,430,339✔
151
  int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
96,430,339✔
152
  if (alloced > tsQueueMemoryAllowed) {
96,577,447✔
153
    if (itype == RPC_QITEM) {
17,880,775!
154
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
×
155
             tsQueueMemoryAllowed);
156
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
157
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
×
158
    }
159
  }
160

161
  *item = NULL;
96,577,447✔
162
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
96,577,447✔
163
  if (pNode == NULL) {
96,477,538!
164
    (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
165
    return terrno;
×
166
  }
167

168
  pNode->dataSize = dataSize;
96,477,538✔
169
  pNode->size = size;
96,477,538✔
170
  pNode->itype = itype;
96,477,538✔
171
  pNode->timestamp = taosGetTimestampUs();
96,483,069✔
172
  uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
96,483,069✔
173
  *item = pNode->item;
96,524,350✔
174
  return 0;
96,524,350✔
175
}
176

177
void taosFreeQitem(void *pItem) {
96,534,977✔
178
  if (pItem == NULL) return;
96,534,977!
179

180
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
96,534,977✔
181
  int64_t     alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
96,534,977✔
182
  uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
96,576,148✔
183

184
  taosMemoryFree(pNode);
96,576,147✔
185
}
186

187
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
96,406,216✔
188
  int32_t     code = 0;
96,406,216✔
189
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
96,406,216✔
190
  pNode->timestamp = taosGetTimestampUs();
96,465,780✔
191
  pNode->next = NULL;
96,465,780✔
192

193
  (void)taosThreadMutexLock(&queue->mutex);
96,465,780✔
194
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
96,530,327!
195
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
196
    uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
197
           queue->memLimit, tstrerror(code));
198

199
    (void)taosThreadMutexUnlock(&queue->mutex);
×
200
    return code;
×
201
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
96,530,327!
202
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
203
    uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
204
           queue->itemLimit, tstrerror(code));
205
    (void)taosThreadMutexUnlock(&queue->mutex);
×
206
    return code;
×
207
  }
208

209
  if (queue->tail) {
96,530,327✔
210
    queue->tail->next = pNode;
26,014,261✔
211
    queue->tail = pNode;
26,014,261✔
212
  } else {
213
    queue->head = pNode;
70,516,066✔
214
    queue->tail = pNode;
70,516,066✔
215
  }
216
  queue->numOfItems++;
96,530,327✔
217
  queue->memOfItems += (pNode->size + pNode->dataSize);
96,530,327✔
218
  if (queue->qset) {
96,530,327✔
219
    (void)atomic_add_fetch_32(&queue->qset->numOfItems, 1);
77,623,481✔
220
  }
221

222
  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
96,553,817✔
223

224
  (void)taosThreadMutexUnlock(&queue->mutex);
96,553,819✔
225

226
  if (queue->qset) {
96,548,596✔
227
    if (tsem_post(&queue->qset->sem) != 0) {
77,650,869!
228
      uError("failed to post semaphore for queue set:%p", queue->qset);
×
229
    }
230
  }
231
  return code;
96,517,830✔
232
}
233

234
void taosReadQitem(STaosQueue *queue, void **ppItem) {
19,040,828✔
235
  STaosQnode *pNode = NULL;
19,040,828✔
236

237
  (void)taosThreadMutexLock(&queue->mutex);
19,040,828✔
238

239
  if (queue->head) {
19,041,071✔
240
    pNode = queue->head;
18,446,500✔
241
    *ppItem = pNode->item;
18,446,500✔
242
    queue->head = pNode->next;
18,446,500✔
243
    if (queue->head == NULL) {
18,446,500✔
244
      queue->tail = NULL;
6,399,306✔
245
    }
246
    queue->numOfItems--;
18,446,500✔
247
    queue->memOfItems -= (pNode->size + pNode->dataSize);
18,446,500✔
248
    if (queue->qset) {
18,446,500!
249
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
×
250
    }
251
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
18,446,500✔
252
           queue->memOfItems);
253
  }
254

255
  (void)taosThreadMutexUnlock(&queue->mutex);
19,041,071✔
256
}
19,044,185✔
257

258
int32_t taosAllocateQall(STaosQall **qall) {
187,385✔
259
  *qall = taosMemoryCalloc(1, sizeof(STaosQall));
187,385✔
260
  if (*qall == NULL) {
187,412!
UNCOV
261
    return terrno;
×
262
  }
263
  return 0;
187,417✔
264
}
265

266
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
186,912✔
267

268
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
446,276✔
269
  int32_t numOfItems = 0;
446,276✔
270
  bool    empty;
271

272
  (void)taosThreadMutexLock(&queue->mutex);
446,276✔
273

274
  empty = queue->head == NULL;
446,357✔
275
  if (!empty) {
446,357✔
276
    memset(qall, 0, sizeof(STaosQall));
148,758✔
277
    qall->current = queue->head;
148,758✔
278
    qall->start = queue->head;
148,758✔
279
    qall->numOfItems = queue->numOfItems;
148,758✔
280
    qall->memOfItems = queue->memOfItems;
148,758✔
281

282
    qall->unAccessedNumOfItems = queue->numOfItems;
148,758✔
283
    qall->unAccessMemOfItems = queue->memOfItems;
148,758✔
284

285
    numOfItems = qall->numOfItems;
148,758✔
286

287
    queue->head = NULL;
148,758✔
288
    queue->tail = NULL;
148,758✔
289
    queue->numOfItems = 0;
148,758✔
290
    queue->memOfItems = 0;
148,758✔
291
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
148,758✔
292
           queue->memOfItems);
293
    if (queue->qset) {
148,759!
294
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
×
295
    }
296
  }
297

298
  (void)taosThreadMutexUnlock(&queue->mutex);
446,358✔
299

300
  // if source queue is empty, we set destination qall to empty too.
301
  if (empty) {
446,376✔
302
    qall->current = NULL;
297,609✔
303
    qall->start = NULL;
297,609✔
304
    qall->numOfItems = 0;
297,609✔
305
  }
306
  return numOfItems;
446,376✔
307
}
308

309
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
46,527,464✔
310
  STaosQnode *pNode;
311
  int32_t     num = 0;
46,527,464✔
312

313
  pNode = qall->current;
46,527,464✔
314
  if (pNode) qall->current = pNode->next;
46,527,464✔
315

316
  if (pNode) {
46,527,464✔
317
    *ppItem = pNode->item;
46,011,825✔
318
    num = 1;
46,011,825✔
319

320
    qall->unAccessedNumOfItems -= 1;
46,011,825✔
321
    qall->unAccessMemOfItems -= pNode->dataSize;
46,011,825✔
322

323
    uTrace("item:%p is fetched", *ppItem);
46,011,825✔
324
  } else {
325
    *ppItem = NULL;
515,639✔
326
  }
327

328
  return num;
46,533,435✔
329
}
330

331
int32_t taosOpenQset(STaosQset **qset) {
95,498✔
332
  *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
95,498✔
333
  if (*qset == NULL) {
95,498!
334
    return terrno;
×
335
  }
336

337
  (void)taosThreadMutexInit(&(*qset)->mutex, NULL);
95,498✔
338
  if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
95,497!
339
    taosMemoryFree(*qset);
×
340
    return terrno;
×
341
  }
342

343
  uDebug("qset:%p is opened", qset);
95,496✔
344
  return 0;
95,497✔
345
}
346

347
void taosCloseQset(STaosQset *qset) {
95,496✔
348
  if (qset == NULL) return;
95,496!
349

350
  // remove all the queues from qset
351
  (void)taosThreadMutexLock(&qset->mutex);
95,496✔
352
  while (qset->head) {
176,839✔
353
    STaosQueue *queue = qset->head;
81,343✔
354
    qset->head = qset->head->next;
81,343✔
355

356
    queue->qset = NULL;
81,343✔
357
    queue->next = NULL;
81,343✔
358
  }
359
  (void)taosThreadMutexUnlock(&qset->mutex);
95,496✔
360

361
  (void)taosThreadMutexDestroy(&qset->mutex);
95,497✔
362
  if (tsem_destroy(&qset->sem) != 0) {
95,497!
363
    uError("failed to destroy semaphore for qset:%p", qset);
×
364
  }
365
  taosMemoryFree(qset);
95,498✔
366
  uDebug("qset:%p is closed", qset);
95,498✔
367
}
368

369
// tsem_post 'qset->sem', so that reader threads waiting for it
370
// resumes execution and return, should only be used to signal the
371
// thread to exit.
372
void taosQsetThreadResume(STaosQset *qset) {
771,031✔
373
  uDebug("qset:%p, it will exit", qset);
771,031✔
374
  if (tsem_post(&qset->sem) != 0) {
771,031!
375
    uError("failed to post semaphore for qset:%p", qset);
×
376
  }
377
}
771,033✔
378

379
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
122,452✔
380
  if (queue->qset) return TSDB_CODE_INVALID_PARA;
122,452!
381

382
  (void)taosThreadMutexLock(&qset->mutex);
122,452✔
383

384
  queue->next = qset->head;
122,453✔
385
  queue->ahandle = ahandle;
122,453✔
386
  qset->head = queue;
122,453✔
387
  qset->numOfQueues++;
122,453✔
388

389
  (void)taosThreadMutexLock(&queue->mutex);
122,453✔
390
  (void)atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
122,451✔
391
  queue->qset = qset;
122,452✔
392
  (void)taosThreadMutexUnlock(&queue->mutex);
122,452✔
393

394
  (void)taosThreadMutexUnlock(&qset->mutex);
122,452✔
395

396
  uTrace("queue:%p is added into qset:%p", queue, qset);
122,451✔
397
  return 0;
122,452✔
398
}
399

400
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
41,102✔
401
  STaosQueue *tqueue = NULL;
41,102✔
402

403
  (void)taosThreadMutexLock(&qset->mutex);
41,102✔
404

405
  if (qset->head) {
41,106!
406
    if (qset->head == queue) {
41,106✔
407
      qset->head = qset->head->next;
29,131✔
408
      tqueue = queue;
29,131✔
409
    } else {
410
      STaosQueue *prev = qset->head;
11,975✔
411
      tqueue = qset->head->next;
11,975✔
412
      while (tqueue) {
41,377!
413
        if (tqueue == queue) {
41,377✔
414
          prev->next = tqueue->next;
11,975✔
415
          break;
11,975✔
416
        } else {
417
          prev = tqueue;
29,402✔
418
          tqueue = tqueue->next;
29,402✔
419
        }
420
      }
421
    }
422

423
    if (tqueue) {
41,106!
424
      if (qset->current == queue) qset->current = tqueue->next;
41,106✔
425
      qset->numOfQueues--;
41,106✔
426

427
      (void)taosThreadMutexLock(&queue->mutex);
41,106✔
428
      (void)atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
41,105✔
429
      queue->qset = NULL;
41,106✔
430
      queue->next = NULL;
41,106✔
431
      (void)taosThreadMutexUnlock(&queue->mutex);
41,106✔
432
    }
433
  }
434

435
  (void)taosThreadMutexUnlock(&qset->mutex);
41,105✔
436

437
  uDebug("queue:%p is removed from qset:%p", queue, qset);
41,105✔
438
}
41,105✔
439

440
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
32,761,415✔
441
  STaosQnode *pNode = NULL;
32,761,415✔
442
  int32_t     code = 0;
32,761,415✔
443

444
  if (tsem_wait(&qset->sem) != 0) {
32,761,415!
445
    uError("failed to wait semaphore for qset:%p", qset);
×
446
  }
447

448
  (void)taosThreadMutexLock(&qset->mutex);
32,736,106✔
449

450
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
185,164,845✔
451
    if (qset->current == NULL) qset->current = qset->head;
184,457,897✔
452
    STaosQueue *queue = qset->current;
184,457,897✔
453
    if (queue) qset->current = queue->next;
184,457,897✔
454
    if (queue == NULL) break;
184,457,897!
455
    if (queue->head == NULL) continue;
184,457,897✔
456

457
    (void)taosThreadMutexLock(&queue->mutex);
32,063,909✔
458

459
    if (queue->head) {
32,065,986!
460
      pNode = queue->head;
32,066,000✔
461
      *ppItem = pNode->item;
32,066,000✔
462
      qinfo->ahandle = queue->ahandle;
32,066,000✔
463
      qinfo->fp = queue->itemFp;
32,066,000✔
464
      qinfo->queue = queue;
32,066,000✔
465
      qinfo->timestamp = pNode->timestamp;
32,066,000✔
466

467
      queue->head = pNode->next;
32,066,000✔
468
      if (queue->head == NULL) queue->tail = NULL;
32,066,000✔
469
      // queue->numOfItems--;
470
      queue->memOfItems -= (pNode->size + pNode->dataSize);
32,066,000✔
471
      (void)atomic_sub_fetch_32(&qset->numOfItems, 1);
32,066,000✔
472
      code = 1;
32,066,051✔
473
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
32,066,051✔
474
             queue->memOfItems);
475
    }
476

477
    (void)taosThreadMutexUnlock(&queue->mutex);
32,066,040✔
478
    if (pNode) break;
32,066,017!
479
  }
480

481
  (void)taosThreadMutexUnlock(&qset->mutex);
32,773,027✔
482

483
  return code;
32,769,655✔
484
}
485

486
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
34,827,252✔
487
  STaosQueue *queue;
488
  int32_t     code = 0;
34,827,252✔
489

490
  if (tsem_wait(&qset->sem) != 0) {
34,827,252!
491
    uError("failed to wait semaphore for qset:%p", qset);
×
492
  }
493
  (void)taosThreadMutexLock(&qset->mutex);
34,819,193✔
494

495
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
83,806,046✔
496
    if (qset->current == NULL) qset->current = qset->head;
83,730,125✔
497
    queue = qset->current;
83,730,125✔
498
    if (queue) qset->current = queue->next;
83,730,125!
499
    if (queue == NULL) break;
83,730,125!
500
    if (queue->head == NULL) continue;
83,730,125✔
501

502
    (void)taosThreadMutexLock(&queue->mutex);
34,750,095✔
503

504
    if (queue->head) {
34,763,593!
505
      qall->current = queue->head;
34,764,058✔
506
      qall->start = queue->head;
34,764,058✔
507
      qall->numOfItems = queue->numOfItems;
34,764,058✔
508
      qall->memOfItems = queue->memOfItems;
34,764,058✔
509
      qall->unAccessedNumOfItems = queue->numOfItems;
34,764,058✔
510
      qall->unAccessMemOfItems = queue->memOfItems;
34,764,058✔
511

512
      code = qall->numOfItems;
34,764,058✔
513
      qinfo->ahandle = queue->ahandle;
34,764,058✔
514
      qinfo->fp = queue->itemsFp;
34,764,058✔
515
      qinfo->queue = queue;
34,764,058✔
516
      qinfo->timestamp = queue->head->timestamp;
34,764,058✔
517

518
      queue->head = NULL;
34,764,058✔
519
      queue->tail = NULL;
34,764,058✔
520
      // queue->numOfItems = 0;
521
      queue->memOfItems = 0;
34,764,058✔
522
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);
34,764,058✔
523

524
      (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
34,764,070✔
525
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
45,584,319✔
526
        if (tsem_wait(&qset->sem) != 0) {
10,821,052!
527
          uError("failed to wait semaphore for qset:%p", qset);
×
528
        }
529
      }
530
    }
531

532
    (void)taosThreadMutexUnlock(&queue->mutex);
34,762,802✔
533

534
    if (code != 0) break;
34,763,979!
535
  }
536

537
  (void)taosThreadMutexUnlock(&qset->mutex);
34,840,931✔
538
  return code;
34,827,602✔
539
}
540

541
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
1,233,032✔
542
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
×
543

544
int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; }
44,979✔
545
int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; }
55,957✔
546

547
void    taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
×
548
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
13,702✔
549

550
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) { pQueue->threadId = threadId; }
68,647✔
551

552
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
137,018✔
553

554
#if 0
555

556
void taosResetQsetThread(STaosQset *qset, void *pItem) {
557
  if (pItem == NULL) return;
558
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
559

560
  (void)taosThreadMutexLock(&qset->mutex);
561
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
562
    tsem_post(&qset->sem);
563
  }
564
  (void)taosThreadMutexUnlock(&qset->mutex);
565
}
566

567
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc