• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.23
/source/util/src/tqueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tqueue.h"
18
#include "taoserror.h"
19
#include "tlog.h"
20
#include "tutil.h"
21

22
// Global budget that taosAllocateQitem compares tsQueueMemoryUsed against
// (presumably bytes; confirm against whoever assigns it).
int64_t tsQueueMemoryAllowed = 0;
// Running sum of (size + dataSize) over all live queue items. It is updated
// with atomic add/sub in taosAllocateQitem and taosFreeQitem.
int64_t tsQueueMemoryUsed = 0;
24

25
struct STaosQueue {
26
  STaosQnode   *head;
27
  STaosQnode   *tail;
28
  STaosQueue   *next;     // for queue set
29
  STaosQset    *qset;     // for queue set
30
  void         *ahandle;  // for queue set
31
  FItem         itemFp;
32
  FItems        itemsFp;
33
  TdThreadMutex mutex;
34
  int64_t       memOfItems;
35
  int32_t       numOfItems;
36
  int64_t       threadId;
37
  int64_t       memLimit;
38
  int64_t       itemLimit;
39
};
40

41
struct STaosQset {
42
  STaosQueue   *head;
43
  STaosQueue   *current;
44
  TdThreadMutex mutex;
45
  tsem_t        sem;
46
  int32_t       numOfQueues;
47
  int32_t       numOfItems;
48
};
49

50
struct STaosQall {
51
  STaosQnode *current;
52
  STaosQnode *start;
53
  int32_t     numOfItems;
54
  int64_t     memOfItems;
55
  int32_t     unAccessedNumOfItems;
56
  int64_t     unAccessMemOfItems;
57
};
58

59
// Sets the per-queue memory cap checked by taosWriteQitem; the check is only
// active when the cap is > 0. The store is not synchronized with the queue mutex.
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) {
  queue->memLimit = cap;
}
16,420✔
60
// Sets the per-queue item-count cap checked by taosWriteQitem; the check is
// only active when the cap is > 0. The store is not synchronized with the queue mutex.
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) {
  queue->itemLimit = size;
}
16,420✔
61

62
// Allocates a zero-initialized queue and initializes its mutex.
//
// queue: out parameter that receives the new queue.
// Returns 0 on success, terrno when the allocation fails, or the system error
// derived from the mutex-init result (also stored in terrno).
int32_t taosOpenQueue(STaosQueue **queue) {
  *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
  if (*queue == NULL) {
    return terrno;
  }

  int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL);
  if (code) {
    taosMemoryFreeClear(*queue);
    return (terrno = TAOS_SYSTEM_ERROR(code));
  }

  // Log the queue itself rather than the address of the caller's out-pointer,
  // so the value matches every other "queue:%p" message in this file.
  uDebug("queue:%p is opened", *queue);
  return 0;
}
77

78
// Stores the per-item and per-batch callbacks on the queue. The qset read
// functions later report one of them through SQueueInfo.fp. A NULL queue is ignored.
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
  if (queue != NULL) {
    queue->itemFp = itemFp;
    queue->itemsFp = itemsFp;
  }
}
83

84
// Detaches the queue from its qset (if any), frees every node still linked in
// it, destroys the mutex and frees the queue. A NULL queue is ignored.
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;

  // Take the pending list and the qset pointer in one critical section.
  (void)taosThreadMutexLock(&queue->mutex);
  STaosQnode *pNode = queue->head;
  queue->head = NULL;
  STaosQset *qset = queue->qset;
  (void)taosThreadMutexUnlock(&queue->mutex);

  // NOTE(review): the condition re-reads queue->qset without the lock while
  // the call uses the snapshot; kept as in the original, confirm the intent.
  if (queue->qset) {
    taosRemoveFromQset(qset, queue);
  }

  // NOTE(review): nodes are released with taosMemoryFree, not taosFreeQitem,
  // so tsQueueMemoryUsed is not reduced for items discarded here.
  while (pNode) {
    STaosQnode *pTemp = pNode;
    pNode = pNode->next;
    taosMemoryFree(pTemp);
  }

  (void)taosThreadMutexDestroy(&queue->mutex);

  // Log before freeing: a pointer value is indeterminate once its object has
  // been freed, so it must not be passed to the logger afterwards.
  uDebug("queue:%p is closed", queue);
  taosMemoryFree(queue);
}
110

111
// Reports whether the queue holds nothing: no head, no tail and a zero item
// count, all sampled under the queue mutex. A NULL queue counts as empty.
// (memOfItems is deliberately not part of the test.)
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) {
    return true;
  }

  (void)taosThreadMutexLock(&queue->mutex);
  bool empty = (queue->head == NULL) && (queue->tail == NULL) && (queue->numOfItems == 0);
  (void)taosThreadMutexUnlock(&queue->mutex);

  return empty;
}
123

124
// Subtracts `items` from the queue's item count under the queue mutex. The
// qset read functions leave numOfItems untouched, so this is how the count is
// brought down afterwards. A NULL queue is ignored.
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
  if (queue != NULL) {
    (void)taosThreadMutexLock(&queue->mutex);
    queue->numOfItems = queue->numOfItems - items;
    (void)taosThreadMutexUnlock(&queue->mutex);
  }
}
131

132
// Returns the queue's current item count, or 0 for a NULL queue.
int32_t taosQueueItemSize(STaosQueue *queue) {
  if (queue == NULL) return 0;

  // Snapshot both counters while holding the mutex. The trace below must not
  // read the live fields after unlocking, which would race with writers.
  (void)taosThreadMutexLock(&queue->mutex);
  int32_t numOfItems = queue->numOfItems;
  int64_t memOfItems = queue->memOfItems;
  (void)taosThreadMutexUnlock(&queue->mutex);

  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, numOfItems, memOfItems);
  return numOfItems;
}
142

143
// Returns the queue's memOfItems counter, read under the queue mutex.
// A NULL queue yields 0, matching taosQueueItemSize.
int64_t taosQueueMemorySize(STaosQueue *queue) {
  if (queue == NULL) return 0;

  (void)taosThreadMutexLock(&queue->mutex);
  int64_t memOfItems = queue->memOfItems;
  (void)taosThreadMutexUnlock(&queue->mutex);
  return memOfItems;
}
149

150
// Allocates a queue node with `size` bytes of item payload and charges
// (size + dataSize) to the global tsQueueMemoryUsed counter.
//
// size:     payload bytes allocated behind the node header.
// itype:    item kind; only RPC_QITEM allocations are rejected when the global
//           budget tsQueueMemoryAllowed is exceeded.
// dataSize: extra amount charged to the accounting; it is not allocated here.
// item:     out parameter; receives the payload pointer, or NULL on any failure.
// Returns 0 on success, TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE when an RPC item is
// over budget, or terrno when the allocation itself fails.
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
  const int64_t total = size + dataSize;

  // Clear the out parameter first so every failure path leaves it NULL.
  *item = NULL;

  int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, total);
  if (alloced > tsQueueMemoryAllowed) {
    if (itype == RPC_QITEM) {
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, total, alloced,
             tsQueueMemoryAllowed);
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, total);  // undo the charge
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
    }
  }

  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
  if (pNode == NULL) {
    (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, total);  // undo the charge
    return terrno;
  }

  pNode->dataSize = dataSize;
  pNode->size = size;
  pNode->itype = itype;
  pNode->timestamp = taosGetTimestampUs();
  uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
  *item = pNode->item;
  return 0;
}
176

177
void taosFreeQitem(void *pItem) {
27,407,328✔
178
  if (pItem == NULL) return;
27,407,328!
179

180
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
27,407,328✔
181
  int64_t     alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
27,407,328✔
182
  uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
27,423,429✔
183

184
  taosMemoryFree(pNode);
27,423,431✔
185
}
186

187
// Appends an item (allocated by taosAllocateQitem) to the tail of the queue.
//
// The item's timestamp is refreshed before it is linked. If the queue has a
// memory or item-count limit (> 0) that the new item would exceed, nothing is
// queued; this function does not free the item in that case.
// When the queue belongs to a qset, the set's item counter is incremented and
// its semaphore is posted once so a waiting reader wakes up.
//
// Returns 0 on success, TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when a limit is hit.
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
  int32_t     code = 0;
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
  pNode->timestamp = taosGetTimestampUs();
  pNode->next = NULL;

  (void)taosThreadMutexLock(&queue->mutex);
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    // The format ends at "%s": a trailing PRId64 would be pasted into the
    // literal and print a stray "ld"/"lld" after the reason text.
    uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s", pItem, queue,
           queue->memLimit, tstrerror(code));

    (void)taosThreadMutexUnlock(&queue->mutex);
    return code;
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s", pItem, queue,
           queue->itemLimit, tstrerror(code));
    (void)taosThreadMutexUnlock(&queue->mutex);
    return code;
  }

  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
    queue->tail = pNode;
  }
  queue->numOfItems++;
  queue->memOfItems += (pNode->size + pNode->dataSize);
  if (queue->qset) {
    (void)atomic_add_fetch_32(&queue->qset->numOfItems, 1);
  }

  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);

  (void)taosThreadMutexUnlock(&queue->mutex);

  // The semaphore is posted after the queue mutex has been released.
  if (queue->qset) {
    if (tsem_post(&queue->qset->sem) != 0) {
      uError("failed to post semaphore for queue set:%p", queue->qset);
    }
  }
  return code;
}
233

234
// Pops the oldest item from the queue into *ppItem and updates the queue's
// counters (and the owning qset's item counter, if any). When the queue is
// empty, *ppItem is left untouched.
void taosReadQitem(STaosQueue *queue, void **ppItem) {
  (void)taosThreadMutexLock(&queue->mutex);

  STaosQnode *pFirst = queue->head;
  if (pFirst != NULL) {
    *ppItem = pFirst->item;

    // Unlink the head; an emptied list has no tail either.
    queue->head = pFirst->next;
    if (queue->head == NULL) {
      queue->tail = NULL;
    }

    queue->numOfItems -= 1;
    queue->memOfItems -= (pFirst->size + pFirst->dataSize);
    if (queue->qset) {
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    }
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
           queue->memOfItems);
  }

  (void)taosThreadMutexUnlock(&queue->mutex);
}
5,433,036✔
257

258
// Allocates a zero-initialized STaosQall into *qall.
// Returns 0 on success, or terrno when the allocation fails (*qall is then NULL).
int32_t taosAllocateQall(STaosQall **qall) {
  STaosQall *pNew = taosMemoryCalloc(1, sizeof(STaosQall));
  *qall = pNew;
  return (pNew == NULL) ? terrno : 0;
}
265

266
// Frees the STaosQall container only; nodes it still references are not released here.
void taosFreeQall(STaosQall *qall) {
  taosMemoryFree(qall);
}
176,590✔
267

268
// Detaches every item currently in the queue and hands the whole list to
// `qall`, leaving the queue empty.
//
// Returns the number of items transferred (queue->numOfItems at that moment).
// When the queue is empty, `qall` is reset to an all-zero, empty batch and 0
// is returned.
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t numOfItems = 0;

  // Always start from a cleared batch so an empty read cannot leave memOfItems
  // or the un-accessed counters holding values from a previous batch.
  memset(qall, 0, sizeof(STaosQall));

  (void)taosThreadMutexLock(&queue->mutex);

  if (queue->head != NULL) {
    qall->current = queue->head;
    qall->start = queue->head;
    qall->numOfItems = queue->numOfItems;
    qall->memOfItems = queue->memOfItems;

    qall->unAccessedNumOfItems = queue->numOfItems;
    qall->unAccessMemOfItems = queue->memOfItems;

    numOfItems = qall->numOfItems;

    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    queue->memOfItems = 0;
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
           queue->memOfItems);
    if (queue->qset) {
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
    }
  }

  (void)taosThreadMutexUnlock(&queue->mutex);

  return numOfItems;
}
308

309
// Hands out the next item of a batch and advances the cursor.
// Returns 1 with *ppItem set, or 0 with *ppItem = NULL once the batch is exhausted.
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
  STaosQnode *pNode = qall->current;
  if (pNode == NULL) {
    *ppItem = NULL;
    return 0;
  }

  qall->current = pNode->next;
  *ppItem = pNode->item;

  qall->unAccessedNumOfItems -= 1;
  // NOTE(review): only dataSize is subtracted here, while the counter was
  // seeded from memOfItems (size + dataSize per node) - confirm this is intended.
  qall->unAccessMemOfItems -= pNode->dataSize;

  uTrace("item:%p is fetched", *ppItem);
  return 1;
}
330

331
// Allocates a zero-initialized queue set and initializes its mutex and semaphore.
//
// qset: out parameter that receives the new set.
// Returns 0 on success; terrno when the allocation fails; the system error
// derived from the mutex-init result (also stored in terrno); or the terrno
// observed right after a failed tsem_init.
int32_t taosOpenQset(STaosQset **qset) {
  *qset = taosMemoryCalloc(1, sizeof(STaosQset));
  if (*qset == NULL) {
    return terrno;
  }

  // Same mutex-init error handling as taosOpenQueue.
  int32_t code = taosThreadMutexInit(&(*qset)->mutex, NULL);
  if (code) {
    taosMemoryFreeClear(*qset);
    return (terrno = TAOS_SYSTEM_ERROR(code));
  }

  if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
    code = terrno;  // capture before the cleanup calls can disturb it
    (void)taosThreadMutexDestroy(&(*qset)->mutex);
    taosMemoryFreeClear(*qset);
    return code;
  }

  // Log the set itself, not the address of the caller's out-pointer.
  uDebug("qset:%p is opened", *qset);
  return 0;
}
346

347
// Unlinks every member queue from the set (the queues themselves are not
// closed), then destroys the set's mutex and semaphore and frees the set.
// A NULL qset is ignored.
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;

  // remove all the queues from qset
  (void)taosThreadMutexLock(&qset->mutex);
  while (qset->head) {
    STaosQueue *queue = qset->head;
    qset->head = qset->head->next;

    queue->qset = NULL;
    queue->next = NULL;
  }
  (void)taosThreadMutexUnlock(&qset->mutex);

  (void)taosThreadMutexDestroy(&qset->mutex);
  if (tsem_destroy(&qset->sem) != 0) {
    uError("failed to destroy semaphore for qset:%p", qset);
  }

  // Log before freeing: the pointer value must not be used once it is freed.
  uDebug("qset:%p is closed", qset);
  taosMemoryFree(qset);
}
368

369
// tsem_post 'qset->sem', so that reader threads waiting for it
370
// resumes execution and return, should only be used to signal the
371
// thread to exit.
372
void taosQsetThreadResume(STaosQset *qset) {
688,779✔
373
  uDebug("qset:%p, it will exit", qset);
688,779✔
374
  if (tsem_post(&qset->sem) != 0) {
688,779!
375
    uError("failed to post semaphore for qset:%p", qset);
×
376
  }
377
}
688,780✔
378

379
// Links `queue` at the front of the set's queue list and records `ahandle` on it.
// Items already waiting in the queue are added to the set's item counter.
// Locks are taken in the order qset mutex, then queue mutex.
//
// Returns 0 on success, or TSDB_CODE_INVALID_PARA if the queue already belongs to a set.
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
  if (queue->qset != NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  (void)taosThreadMutexLock(&qset->mutex);

  // Push onto the head of the member list.
  queue->ahandle = ahandle;
  queue->next = qset->head;
  qset->head = queue;
  qset->numOfQueues += 1;

  (void)taosThreadMutexLock(&queue->mutex);
  (void)atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  (void)taosThreadMutexUnlock(&queue->mutex);

  (void)taosThreadMutexUnlock(&qset->mutex);

  uTrace("queue:%p is added into qset:%p", queue, qset);
  return 0;
}
399

400
// Unlinks `queue` from the set's member list, if it is present. On removal the
// scan cursor is moved past the queue when it pointed at it, the queue's
// pending item count is subtracted from the set's counter, and the queue's
// qset/next links are cleared. Nothing changes when the queue is not a member.
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
  (void)taosThreadMutexLock(&qset->mutex);

  // Walk the list through the address of each link, so removing the head and
  // removing an interior element are the same operation.
  STaosQueue **ppLink = &qset->head;
  while (*ppLink != NULL && *ppLink != queue) {
    ppLink = &(*ppLink)->next;
  }

  if (*ppLink != NULL) {
    *ppLink = queue->next;

    if (qset->current == queue) qset->current = queue->next;
    qset->numOfQueues--;

    (void)taosThreadMutexLock(&queue->mutex);
    (void)atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
    queue->qset = NULL;
    queue->next = NULL;
    (void)taosThreadMutexUnlock(&queue->mutex);
  }

  (void)taosThreadMutexUnlock(&qset->mutex);

  uDebug("queue:%p is removed from qset:%p", queue, qset);
}
36,813✔
439

440
// Waits on the set's semaphore, then scans the member queues starting at the
// set's cursor and pops one item from the first non-empty queue.
//
// On success *ppItem receives the item and `qinfo` is filled with the queue's
// ahandle, its per-item callback, the queue itself and the item's timestamp.
// queue->numOfItems is deliberately left unchanged here (see taosUpdateItemSize).
//
// Returns 1 when an item was read, 0 otherwise (*ppItem and qinfo are then untouched).
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
  STaosQnode *pNode = NULL;
  int32_t     code = 0;

  if (tsem_wait(&qset->sem) != 0) {
    uError("failed to wait semaphore for qset:%p", qset);
  }

  (void)taosThreadMutexLock(&qset->mutex);

  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    // Wrap the cursor to the head, then advance it past the queue being visited.
    if (qset->current == NULL) qset->current = qset->head;
    STaosQueue *queue = qset->current;
    if (queue == NULL) break;
    qset->current = queue->next;

    // Unlocked peek to skip empty queues; the head is re-read under the lock below.
    if (queue->head == NULL) continue;

    (void)taosThreadMutexLock(&queue->mutex);

    pNode = queue->head;
    if (pNode != NULL) {
      *ppItem = pNode->item;
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemFp;
      qinfo->queue = queue;
      qinfo->timestamp = pNode->timestamp;

      queue->head = pNode->next;
      if (queue->head == NULL) queue->tail = NULL;
      // queue->numOfItems--;
      queue->memOfItems -= (pNode->size + pNode->dataSize);
      (void)atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
             queue->memOfItems);
    }

    (void)taosThreadMutexUnlock(&queue->mutex);
    if (pNode != NULL) break;
  }

  (void)taosThreadMutexUnlock(&qset->mutex);

  return code;
}
485

486
// Waits on the set's semaphore, then scans the member queues starting at the
// set's cursor and detaches the entire item list of the first non-empty queue
// into `qall`.
//
// `qinfo` is filled with the queue's ahandle, its batch callback, the queue
// itself and the timestamp of the first item. queue->numOfItems is
// deliberately left unchanged here (see taosUpdateItemSize). Because one
// semaphore wait has already been done, the function waits (numOfItems - 1)
// more times to match the count it reports.
//
// Returns the number of items reported in qall->numOfItems, or 0 if nothing was read.
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
  int32_t numOfItems = 0;

  if (tsem_wait(&qset->sem) != 0) {
    uError("failed to wait semaphore for qset:%p", qset);
  }
  (void)taosThreadMutexLock(&qset->mutex);

  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    // Wrap the cursor to the head, then advance it past the queue being visited.
    if (qset->current == NULL) qset->current = qset->head;
    STaosQueue *queue = qset->current;
    if (queue == NULL) break;
    qset->current = queue->next;

    // Unlocked peek to skip empty queues; the head is re-checked under the lock below.
    if (queue->head == NULL) continue;

    (void)taosThreadMutexLock(&queue->mutex);

    if (queue->head != NULL) {
      qall->current = queue->head;
      qall->start = queue->head;
      qall->numOfItems = queue->numOfItems;
      qall->memOfItems = queue->memOfItems;
      qall->unAccessedNumOfItems = queue->numOfItems;
      qall->unAccessMemOfItems = queue->memOfItems;

      numOfItems = qall->numOfItems;
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemsFp;
      qinfo->queue = queue;
      qinfo->timestamp = queue->head->timestamp;

      queue->head = NULL;
      queue->tail = NULL;
      // queue->numOfItems = 0;
      queue->memOfItems = 0;
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, numOfItems, queue, queue->memOfItems);

      (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
      for (int32_t pending = qall->numOfItems - 1; pending > 0; --pending) {
        if (tsem_wait(&qset->sem) != 0) {
          uError("failed to wait semaphore for qset:%p", qset);
        }
      }
    }

    (void)taosThreadMutexUnlock(&queue->mutex);

    if (numOfItems != 0) break;
  }

  (void)taosThreadMutexUnlock(&qset->mutex);
  return numOfItems;
}
540

541
// Returns the number of items recorded for the batch when it was taken.
int32_t taosQallItemSize(STaosQall *qall) {
  return qall->numOfItems;
}
446,945✔
542
// Returns the memory total recorded for the batch when it was taken.
int64_t taosQallMemSize(STaosQall *qall) {
  return qall->memOfItems;
}
×
543

544
// Returns how many items of the batch have not yet been fetched with
// taosGetQitem (the int32_t counter is widened to int64_t).
int64_t taosQallUnAccessedItemSize(STaosQall *qall) {
  return qall->unAccessedNumOfItems;
}
22,840✔
545
// Returns the batch's remaining-memory counter, which taosGetQitem lowers on each fetch.
int64_t taosQallUnAccessedMemSize(STaosQall *qall) {
  return qall->unAccessMemOfItems;
}
14,786✔
546

547
// Rewinds the batch so the next taosGetQitem returns its first item again.
// The un-accessed counters are restored to the batch totals as well;
// otherwise a second pass over the batch would drive them below zero.
void taosResetQitems(STaosQall *qall) {
  qall->current = qall->start;
  qall->unAccessedNumOfItems = qall->numOfItems;
  qall->unAccessMemOfItems = qall->memOfItems;
}
×
548
// Returns the number of queues currently in the set (read without taking the set mutex).
int32_t taosGetQueueNumber(STaosQset *qset) {
  return qset->numOfQueues;
}
12,271✔
549

550
// Records a thread id on the queue (plain store, no locking).
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) {
  pQueue->threadId = threadId;
}
61,396✔
551

552
// Returns the thread id last stored with taosQueueSetThreadId (plain load, no locking).
int64_t taosQueueGetThreadId(STaosQueue *pQueue) {
  return pQueue->threadId;
}
122,702✔
553

554
#if 0
555

556
// Compiled out by the enclosing #if 0. Posts the set's semaphore once per item
// counted on the queue that owns `pItem`; a NULL item is ignored.
// NOTE(review): relies on a `queue` member of STaosQnode - confirm it exists
// before re-enabling this code.
void taosResetQsetThread(STaosQset *qset, void *pItem) {
  if (pItem != NULL) {
    STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));

    (void)taosThreadMutexLock(&qset->mutex);
    for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
      tsem_post(&qset->sem);
    }
    (void)taosThreadMutexUnlock(&qset->mutex);
  }
}
566

567
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc