• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3587

23 Jan 2025 02:42AM UTC coverage: 63.549% (-0.09%) from 63.643%
#3587

push

travis-ci

web-flow
Merge pull request #29637 from taosdata/docs/TS-5944

docs/TS-5944 Correct typos in the descriptions of maximum and minimum values for taosBenchmark

141306 of 285630 branches covered (49.47%)

Branch coverage included in aggregate %.

219951 of 282844 relevant lines covered (77.76%)

19107446.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.85
/source/util/src/tqueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "taoserror.h"
18
#include "tlog.h"
19
#include "tqueue.h"
20
#include "tutil.h"
21

22
// Global accounting for RPC_QITEM items: tsQueueMemoryAllowed is the limit
// compared in taosAllocateQitem(); tsQueueMemoryUsed is the running total that
// taosAllocateQitem()/taosFreeQitem() adjust atomically.
int64_t tsQueueMemoryAllowed = 0;
23
int64_t tsQueueMemoryUsed = 0;
24

25
// Same pair for APPLY_QITEM items: charged and limit-checked in
// taosAllocateQitem(), refunded in taosFreeQitem().
int64_t tsApplyMemoryAllowed = 0;
26
int64_t tsApplyMemoryUsed = 0;
27
// FIFO of STaosQnode items linked from head to tail.
//  - next/qset/ahandle are set while the queue is attached to a STaosQset
//    (see taosAddIntoQset/taosRemoveFromQset).
//  - itemFp/itemsFp are handed to readers through SQueueInfo.fp.
//  - mutex is held around the updates of head, tail, numOfItems and memOfItems.
//  - memOfItems is the sum of size + dataSize over the queued nodes.
//  - numOfItems is not decremented by the qset read functions (the decrement is
//    commented out there); taosUpdateItemSize() subtracts from it instead.
//  - memLimit/itemLimit are checked by taosWriteQitem(); a value <= 0 disables
//    the corresponding check.
struct STaosQueue {
28
  STaosQnode   *head;
29
  STaosQnode   *tail;
30
  STaosQueue   *next;     // for queue set
31
  STaosQset    *qset;     // for queue set
32
  void         *ahandle;  // for queue set
33
  FItem         itemFp;
34
  FItems        itemsFp;
35
  TdThreadMutex mutex;
36
  int64_t       memOfItems;
37
  int32_t       numOfItems;
38
  int64_t       threadId;
39
  int64_t       memLimit;
40
  int64_t       itemLimit;
41
};
42

43
// A group of queues drained through taosReadQitemFromQset() and
// taosReadAllQitemsFromQset().
//  - head is a singly linked list of member queues (via STaosQueue.next).
//  - current is the next queue the read functions will examine; when it is NULL
//    they restart from head.
//  - sem is posted once per item written to a member queue and by
//    taosQsetThreadResume(); the read functions wait on it.
//  - numOfItems is an atomically maintained item count across member queues.
struct STaosQset {
44
  STaosQueue   *head;
45
  STaosQueue   *current;
46
  TdThreadMutex mutex;
47
  tsem_t        sem;
48
  int32_t       numOfQueues;
49
  int32_t       numOfItems;
50
};
51

52
// Item list detached from a queue by taosReadAllQitems() or
// taosReadAllQitemsFromQset().
//  - start is the first node, current the cursor advanced by taosGetQitem().
//  - numOfItems/memOfItems are copies of the queue's counters at detach time.
//  - unAccessedNumOfItems/unAccessMemOfItems start as the same copies and are
//    decreased by taosGetQitem() (by 1 and by the node's dataSize respectively).
struct STaosQall {
53
  STaosQnode *current;
54
  STaosQnode *start;
55
  int32_t     numOfItems;
56
  int64_t     memOfItems;
57
  int32_t     unAccessedNumOfItems;
58
  int64_t     unAccessMemOfItems;
59
};
60

61
// Set the memory limit that taosWriteQitem() checks before enqueueing;
// a value <= 0 disables that check.
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) {
  queue->memLimit = cap;
}
29,838✔
62
// Set the item-count limit that taosWriteQitem() checks before enqueueing;
// a value <= 0 disables that check.
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) {
  queue->itemLimit = size;
}
29,836✔
63

64
/**
 * Allocate a zero-initialised queue and initialise its mutex.
 *
 * @param queue  out-parameter that receives the new queue.
 * @return 0 on success; terrno when the allocation fails; otherwise the
 *         system error derived from the mutex-init failure (also stored in
 *         terrno), in which case the allocation is released again.
 */
int32_t taosOpenQueue(STaosQueue **queue) {
  *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
  if (*queue == NULL) {
    return terrno;
  }

  int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL);
  if (code) {
    taosMemoryFreeClear(*queue);
    return (terrno = TAOS_SYSTEM_ERROR(code));
  }

  // Log the queue itself rather than the address of the caller's pointer
  // variable, so the value matches the one printed by taosCloseQueue().
  uDebug("queue:%p is opened", *queue);
  return 0;
}
79

80
// Install the per-item and per-batch callbacks of a queue; a NULL queue is ignored.
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
  if (queue != NULL) {
    queue->itemFp = itemFp;
    queue->itemsFp = itemsFp;
  }
}
85

86
/**
 * Detach a queue from its queue set (if any), free every node still queued,
 * and destroy the queue. A NULL queue is ignored.
 */
void taosCloseQueue(STaosQueue *queue) {
  if (queue == NULL) return;

  // Take the pending node list and a snapshot of the owning set under the lock.
  (void)taosThreadMutexLock(&queue->mutex);
  STaosQnode *pNode = queue->head;
  queue->head = NULL;
  STaosQset *qset = queue->qset;
  (void)taosThreadMutexUnlock(&queue->mutex);

  // Decide on the snapshot: re-reading queue->qset here without the lock could
  // disagree with the pointer that is actually passed to taosRemoveFromQset().
  if (qset) {
    taosRemoveFromQset(qset, queue);
  }

  // NOTE(review): leftover nodes are released with taosMemoryFree(), so the
  // global tsQueueMemoryUsed/tsApplyMemoryUsed counters are not refunded for
  // them - confirm this is intended.
  while (pNode) {
    STaosQnode *pTemp = pNode;
    pNode = pNode->next;
    taosMemoryFree(pTemp);
  }

  (void)taosThreadMutexDestroy(&queue->mutex);

  // Log before releasing the memory so the pointer is still valid when used.
  uDebug("queue:%p is closed", queue);
  taosMemoryFree(queue);
}
112

113
// Report whether the queue holds no items. A NULL queue counts as empty.
// The queue is empty only when head and tail are NULL and numOfItems is 0.
bool taosQueueEmpty(STaosQueue *queue) {
  if (queue == NULL) return true;

  (void)taosThreadMutexLock(&queue->mutex);
  bool noItems = (queue->head == NULL) && (queue->tail == NULL) && (queue->numOfItems == 0);
  (void)taosThreadMutexUnlock(&queue->mutex);

  return noItems;
}
125

126
// Subtract `items` from the queue's item counter under the queue mutex.
// A NULL queue is ignored.
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
  if (queue != NULL) {
    (void)taosThreadMutexLock(&queue->mutex);
    queue->numOfItems = queue->numOfItems - items;
    (void)taosThreadMutexUnlock(&queue->mutex);
  }
}
133

134
/**
 * Return the queue's current item counter.
 *
 * @param queue  queue to inspect; NULL yields 0.
 * @return the value of numOfItems read under the queue mutex.
 */
int32_t taosQueueItemSize(STaosQueue *queue) {
  if (queue == NULL) return 0;

  // Capture both counters while the lock is held so the trace below does not
  // read fields that another thread may be modifying.
  (void)taosThreadMutexLock(&queue->mutex);
  int32_t numOfItems = queue->numOfItems;
  int64_t memOfItems = queue->memOfItems;
  (void)taosThreadMutexUnlock(&queue->mutex);

  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, numOfItems, memOfItems);
  return numOfItems;
}
144

145
// Return the queue's memOfItems counter, read under the queue mutex.
int64_t taosQueueMemorySize(STaosQueue *queue) {
  int64_t usedMem;

  (void)taosThreadMutexLock(&queue->mutex);
  usedMem = queue->memOfItems;
  (void)taosThreadMutexUnlock(&queue->mutex);

  return usedMem;
}
151

152
/**
 * Allocate a queue item with `size` bytes of payload space.
 *
 * RPC_QITEM and APPLY_QITEM allocations are charged (size + dataSize) against
 * tsQueueMemoryUsed / tsApplyMemoryUsed respectively and are rejected when the
 * new total exceeds the matching *Allowed limit. Other item types are not
 * accounted. The charge is refunded by taosFreeQitem().
 *
 * @param size      payload size placed after the STaosQnode header.
 * @param itype     item type selecting which global budget is charged.
 * @param dataSize  additional size attributed to the item for accounting.
 * @param item      out-parameter receiving the payload pointer; set to NULL
 *                  when the node allocation fails, left untouched when the
 *                  budget check rejects the request.
 * @return 0 on success, TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE (also stored in
 *         terrno) when a budget is exceeded, or terrno when the allocation
 *         itself fails.
 */
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
  int64_t alloced = -1;

  // The charge must key on the item type, exactly like the refunds further
  // down and in taosFreeQitem(); the limit is compared only after charging.
  if (itype == RPC_QITEM) {
    alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
    if (alloced > tsQueueMemoryAllowed) {
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
             tsQueueMemoryAllowed);
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
    }
  } else if (itype == APPLY_QITEM) {
    alloced = atomic_add_fetch_64(&tsApplyMemoryUsed, size + dataSize);
    if (alloced > tsApplyMemoryAllowed) {
      uDebug("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
             tsApplyMemoryAllowed);
      (void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
    }
  }

  *item = NULL;
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
  if (pNode == NULL) {
    // Undo the charge taken above before reporting the allocation failure.
    if (itype == RPC_QITEM) {
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
    } else if (itype == APPLY_QITEM) {
      (void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
    }
    return terrno;
  }

  pNode->dataSize = dataSize;
  pNode->size = size;
  pNode->itype = itype;
  pNode->timestamp = taosGetTimestampUs();
  uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
  *item = pNode->item;
  return 0;
}
192

193
void taosFreeQitem(void *pItem) {
89,636,848✔
194
  if (pItem == NULL) return;
89,636,848!
195

196
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
89,636,848✔
197
  int64_t     alloced = -1;
89,636,848✔
198
  if (pNode->itype == RPC_QITEM) {
89,636,848✔
199
    alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
50,714,373✔
200
  } else if (pNode->itype == APPLY_QITEM) {
38,922,475✔
201
    alloced = atomic_sub_fetch_64(&tsApplyMemoryUsed, pNode->size + pNode->dataSize);
3,319,917✔
202
  }
203
  uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
89,646,494✔
204

205
  taosMemoryFree(pNode);
89,646,495!
206
}
207

208
/**
 * Append an item to the tail of a queue and, when the queue belongs to a
 * queue set, post that set's semaphore once.
 *
 * @param queue  destination queue.
 * @param pItem  payload pointer previously returned by taosAllocateQitem().
 * @return 0 on success, or TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the queue's
 *         memory limit or item limit would be exceeded (the item is then not
 *         enqueued and remains owned by the caller).
 */
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
  int32_t     code = 0;
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
  pNode->timestamp = taosGetTimestampUs();
  pNode->next = NULL;

  (void)taosThreadMutexLock(&queue->mutex);
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    // The format ends with %s only; a trailing PRId64 would print a stray
    // length modifier after the reason text.
    uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s", pItem, queue,
           queue->memLimit, tstrerror(code));

    (void)taosThreadMutexUnlock(&queue->mutex);
    return code;
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
    uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s", pItem, queue,
           queue->itemLimit, tstrerror(code));
    (void)taosThreadMutexUnlock(&queue->mutex);
    return code;
  }

  if (queue->tail) {
    queue->tail->next = pNode;
    queue->tail = pNode;
  } else {
    queue->head = pNode;
    queue->tail = pNode;
  }
  queue->numOfItems++;
  queue->memOfItems += (pNode->size + pNode->dataSize);

  // Snapshot the owning set while the queue lock is held so the post below
  // cannot dereference a pointer that was cleared in between.
  STaosQset *qset = queue->qset;
  if (qset) {
    (void)atomic_add_fetch_32(&qset->numOfItems, 1);
  }

  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);

  (void)taosThreadMutexUnlock(&queue->mutex);

  if (qset) {
    if (tsem_post(&qset->sem) != 0) {
      uError("failed to post semaphore for queue set:%p", qset);
    }
  }
  return code;
}
254

255
// Pop the head item of a queue into *ppItem. When the queue is empty *ppItem
// is left untouched. The queue counters (and the owning set's item counter,
// if the queue is attached to one) are reduced accordingly.
void taosReadQitem(STaosQueue *queue, void **ppItem) {
  (void)taosThreadMutexLock(&queue->mutex);

  STaosQnode *pFirst = queue->head;
  if (pFirst != NULL) {
    *ppItem = pFirst->item;

    // Unlink the node; an emptied queue must also drop its tail pointer.
    queue->head = pFirst->next;
    if (queue->head == NULL) {
      queue->tail = NULL;
    }

    queue->numOfItems--;
    queue->memOfItems -= (pFirst->size + pFirst->dataSize);
    if (queue->qset) {
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
    }
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
           queue->memOfItems);
  }

  (void)taosThreadMutexUnlock(&queue->mutex);
}
18,655,995✔
278

279
// Allocate a zero-initialised STaosQall into *qall.
// Returns 0 on success, or terrno when the allocation fails (*qall is NULL then).
int32_t taosAllocateQall(STaosQall **qall) {
  *qall = taosMemoryCalloc(1, sizeof(STaosQall));
  if (*qall != NULL) {
    return 0;
  }
  return terrno;
}
286

287
// Release a STaosQall; the nodes it still references are not freed here.
void taosFreeQall(STaosQall *qall) {
  taosMemoryFree(qall);
}
176,484!
288

289
// Move every item of `queue` into `qall` in one step and return how many were
// taken. When the queue is empty, qall's current/start/numOfItems are reset
// and 0 is returned; the other qall fields are left as they were.
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
  int32_t taken = 0;

  (void)taosThreadMutexLock(&queue->mutex);

  STaosQnode *pFirst = queue->head;
  bool        empty = (pFirst == NULL);
  if (!empty) {
    // Hand the whole list and the queue's counters over to qall.
    memset(qall, 0, sizeof(STaosQall));
    qall->start = pFirst;
    qall->current = pFirst;
    qall->numOfItems = queue->numOfItems;
    qall->memOfItems = queue->memOfItems;
    qall->unAccessedNumOfItems = queue->numOfItems;
    qall->unAccessMemOfItems = queue->memOfItems;

    taken = qall->numOfItems;

    // The queue itself is now empty.
    queue->head = NULL;
    queue->tail = NULL;
    queue->numOfItems = 0;
    queue->memOfItems = 0;
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, taken, queue, queue->numOfItems,
           queue->memOfItems);
    if (queue->qset) {
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
    }
  }

  (void)taosThreadMutexUnlock(&queue->mutex);

  // An empty source queue yields an empty destination qall as well.
  if (empty) {
    qall->current = NULL;
    qall->start = NULL;
    qall->numOfItems = 0;
  }
  return taken;
}
329

330
// Fetch the next item of a qall into *ppItem and advance the cursor.
// Returns 1 when an item was fetched, or 0 (with *ppItem set to NULL) when the
// cursor has reached the end.
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
  STaosQnode *pCur = qall->current;

  if (pCur == NULL) {
    *ppItem = NULL;
    return 0;
  }

  qall->current = pCur->next;
  *ppItem = pCur->item;

  // Only dataSize is subtracted from the un-accessed memory counter.
  qall->unAccessedNumOfItems -= 1;
  qall->unAccessMemOfItems -= pCur->dataSize;

  uTrace("item:%p is fetched", *ppItem);
  return 1;
}
351

352
/**
 * Allocate a zero-initialised queue set and initialise its mutex and semaphore.
 *
 * @param qset  out-parameter that receives the new queue set.
 * @return 0 on success; terrno when the allocation fails; the system error
 *         derived from a mutex-init failure (also stored in terrno); or the
 *         terrno value observed after a semaphore-init failure. On the two
 *         init failures everything acquired so far is released again.
 */
int32_t taosOpenQset(STaosQset **qset) {
  *qset = taosMemoryCalloc(1, sizeof(STaosQset));
  if (*qset == NULL) {
    return terrno;
  }

  // Handle a mutex-init failure the same way taosOpenQueue() does instead of
  // ignoring it.
  int32_t code = taosThreadMutexInit(&(*qset)->mutex, NULL);
  if (code) {
    taosMemoryFreeClear(*qset);
    return (terrno = TAOS_SYSTEM_ERROR(code));
  }

  if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
    // Remember the error before cleanup, then release the mutex as well so
    // nothing initialised above is leaked.
    code = terrno;
    (void)taosThreadMutexDestroy(&(*qset)->mutex);
    taosMemoryFreeClear(*qset);
    return code;
  }

  // Log the set itself rather than the address of the caller's pointer
  // variable, so the value matches the one printed by taosCloseQset().
  uDebug("qset:%p is opened", *qset);
  return 0;
}
367

368
// Detach every member queue from the set, then destroy the set's mutex and
// semaphore and free it. The member queues themselves are not closed.
// A NULL set is ignored.
void taosCloseQset(STaosQset *qset) {
  if (qset == NULL) return;

  (void)taosThreadMutexLock(&qset->mutex);
  STaosQueue *member;
  while ((member = qset->head) != NULL) {
    // Pop the queue off the list and clear its links to the set.
    qset->head = member->next;
    member->qset = NULL;
    member->next = NULL;
  }
  (void)taosThreadMutexUnlock(&qset->mutex);

  (void)taosThreadMutexDestroy(&qset->mutex);
  if (tsem_destroy(&qset->sem) != 0) {
    uError("failed to destroy semaphore for qset:%p", qset);
  }
  taosMemoryFree(qset);
  uDebug("qset:%p is closed", qset);
}
389

390
// tsem_post 'qset->sem', so that reader threads waiting for it
391
// resumes execution and return, should only be used to signal the
392
// thread to exit.
393
void taosQsetThreadResume(STaosQset *qset) {
611,458✔
394
  uDebug("qset:%p, it will exit", qset);
611,458✔
395
  if (tsem_post(&qset->sem) != 0) {
611,458!
396
    uError("failed to post semaphore for qset:%p", qset);
×
397
  }
398
}
611,457✔
399

400
// Attach a queue to a queue set by pushing it onto the front of the set's
// list. The queue's current item count is added to the set's item counter.
// Returns TSDB_CODE_INVALID_PARA when the queue already belongs to a set,
// 0 otherwise.
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
  if (queue->qset != NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  (void)taosThreadMutexLock(&qset->mutex);

  queue->next = qset->head;
  queue->ahandle = ahandle;
  qset->head = queue;
  qset->numOfQueues += 1;

  // The set lock is taken before the queue lock.
  (void)taosThreadMutexLock(&queue->mutex);
  (void)atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
  queue->qset = qset;
  (void)taosThreadMutexUnlock(&queue->mutex);

  (void)taosThreadMutexUnlock(&qset->mutex);

  uTrace("queue:%p is added into qset:%p", queue, qset);
  return 0;
}
420

421
// Detach a queue from a queue set. When the queue is found in the set's list
// it is unlinked, the set's read cursor is moved past it if necessary, and the
// queue's item count is subtracted from the set's item counter. A queue that
// is not a member leaves the set unchanged.
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
  (void)taosThreadMutexLock(&qset->mutex);

  // Walk the list through the link that points at each member so that the
  // head and the interior cases are unlinked the same way.
  STaosQueue **ppLink = &qset->head;
  while (*ppLink != NULL && *ppLink != queue) {
    ppLink = &(*ppLink)->next;
  }

  STaosQueue *found = *ppLink;
  if (found != NULL) {
    *ppLink = found->next;

    if (qset->current == queue) {
      qset->current = found->next;
    }
    qset->numOfQueues--;

    (void)taosThreadMutexLock(&queue->mutex);
    (void)atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
    queue->qset = NULL;
    queue->next = NULL;
    (void)taosThreadMutexUnlock(&queue->mutex);
  }

  (void)taosThreadMutexUnlock(&qset->mutex);

  uDebug("queue:%p is removed from qset:%p", queue, qset);
}
34,899✔
460

461
// Wait on the set's semaphore, then pop one item from the first non-empty
// member queue found, starting at the set's cursor and visiting each member at
// most once. On success *ppItem and *qinfo describe the item and its queue.
// Returns 1 when an item was read, 0 otherwise. The queue's numOfItems is
// intentionally left unchanged here.
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
  STaosQnode *pNode = NULL;
  int32_t     code = 0;

  if (tsem_wait(&qset->sem) != 0) {
    uError("failed to wait semaphore for qset:%p", qset);
  }

  (void)taosThreadMutexLock(&qset->mutex);

  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    // Restart from the head once the cursor has run off the end of the list.
    if (qset->current == NULL) {
      qset->current = qset->head;
    }

    STaosQueue *queue = qset->current;
    if (queue == NULL) break;
    qset->current = queue->next;

    // Cheap unlocked peek; the head is checked again under the queue lock.
    if (queue->head == NULL) continue;

    (void)taosThreadMutexLock(&queue->mutex);

    if (queue->head) {
      pNode = queue->head;
      *ppItem = pNode->item;

      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemFp;
      qinfo->queue = queue;
      qinfo->timestamp = pNode->timestamp;

      queue->head = pNode->next;
      if (queue->head == NULL) {
        queue->tail = NULL;
      }
      // queue->numOfItems--;
      queue->memOfItems -= (pNode->size + pNode->dataSize);
      (void)atomic_sub_fetch_32(&qset->numOfItems, 1);
      code = 1;
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
             queue->memOfItems);
    }

    (void)taosThreadMutexUnlock(&queue->mutex);
    if (pNode) break;
  }

  (void)taosThreadMutexUnlock(&qset->mutex);

  return code;
}
506

507
// Wait on the set's semaphore, then take ALL items of the first non-empty
// member queue found (starting at the set's cursor, visiting each member at
// most once) into `qall`, and describe the source queue in *qinfo.
// One extra semaphore wait is performed for every item beyond the first.
// Returns the number of items taken (the queue's numOfItems, which is
// intentionally not reset here), or 0 when no queue had items.
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
  int32_t numRead = 0;

  if (tsem_wait(&qset->sem) != 0) {
    uError("failed to wait semaphore for qset:%p", qset);
  }
  (void)taosThreadMutexLock(&qset->mutex);

  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
    // Restart from the head once the cursor has run off the end of the list.
    if (qset->current == NULL) {
      qset->current = qset->head;
    }

    STaosQueue *queue = qset->current;
    if (queue == NULL) break;
    qset->current = queue->next;

    // Cheap unlocked peek; the head is checked again under the queue lock.
    if (queue->head == NULL) continue;

    (void)taosThreadMutexLock(&queue->mutex);

    STaosQnode *pFirst = queue->head;
    if (pFirst != NULL) {
      qall->current = pFirst;
      qall->start = pFirst;
      qall->numOfItems = queue->numOfItems;
      qall->memOfItems = queue->memOfItems;
      qall->unAccessedNumOfItems = queue->numOfItems;
      qall->unAccessMemOfItems = queue->memOfItems;

      numRead = qall->numOfItems;
      qinfo->ahandle = queue->ahandle;
      qinfo->fp = queue->itemsFp;
      qinfo->queue = queue;
      qinfo->timestamp = pFirst->timestamp;

      queue->head = NULL;
      queue->tail = NULL;
      // queue->numOfItems = 0;
      queue->memOfItems = 0;
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, numRead, queue, queue->memOfItems);

      (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);

      // The wait at the top covered one item; wait once more for each
      // remaining item taken in this batch.
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
        if (tsem_wait(&qset->sem) != 0) {
          uError("failed to wait semaphore for qset:%p", qset);
        }
      }
    }

    (void)taosThreadMutexUnlock(&queue->mutex);

    if (numRead != 0) break;
  }

  (void)taosThreadMutexUnlock(&qset->mutex);
  return numRead;
}
561

562
// Number of items recorded in the qall when it was last filled.
int32_t taosQallItemSize(STaosQall *qall) {
  return qall->numOfItems;
}
1,563,049✔
563
// Memory counter recorded in the qall when it was last filled.
int64_t taosQallMemSize(STaosQall *qall) {
  return qall->memOfItems;
}
×
564

565
// Number of items in the qall not yet fetched through taosGetQitem().
int64_t taosQallUnAccessedItemSize(STaosQall *qall) {
  return qall->unAccessedNumOfItems;
}
54,466✔
566
// Remaining value of the qall's un-accessed memory counter.
int64_t taosQallUnAccessedMemSize(STaosQall *qall) {
  return qall->unAccessMemOfItems;
}
65,474✔
567

568
// Rewind the qall cursor to its first node. The un-accessed counters are not restored.
void taosResetQitems(STaosQall *qall) {
  qall->current = qall->start;
}
×
569
// Number of queues currently attached to the set (read without locking).
int32_t taosGetQueueNumber(STaosQset *qset) {
  return qset->numOfQueues;
}
11,635✔
570

571
// Record a thread id on the queue (stored without locking).
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) {
  pQueue->threadId = threadId;
}
58,319✔
572

573
// Return the thread id last stored with taosQueueSetThreadId().
int64_t taosQueueGetThreadId(STaosQueue *pQueue) {
  return pQueue->threadId;
}
116,349✔
574

575
#if 0
576

577
// Compiled out by the surrounding #if 0.
// Posts the set's semaphore once for every item counted in the queue that owns
// pItem's node (pNode->queue->numOfItems), while holding the set mutex.
// A NULL pItem is ignored.
void taosResetQsetThread(STaosQset *qset, void *pItem) {
578
  if (pItem == NULL) return;
579
  // The node header sits immediately in front of the payload pointer.
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
580

581
  (void)taosThreadMutexLock(&qset->mutex);
582
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
583
    tsem_post(&qset->sem);
584
  }
585
  (void)taosThreadMutexUnlock(&qset->mutex);
586
}
587

588
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc