• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.23
/source/util/src/tqueue.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tqueue.h"
18
#include "taoserror.h"
19
#include "tlog.h"
20
#include "tutil.h"
21

22
int64_t tsQueueMemoryAllowed = 0;
23
int64_t tsQueueMemoryUsed = 0;
24

25
struct STaosQueue {
26
  STaosQnode   *head;
27
  STaosQnode   *tail;
28
  STaosQueue   *next;     // for queue set
29
  STaosQset    *qset;     // for queue set
30
  void         *ahandle;  // for queue set
31
  FItem         itemFp;
32
  FItems        itemsFp;
33
  TdThreadMutex mutex;
34
  int64_t       memOfItems;
35
  int32_t       numOfItems;
36
  int64_t       threadId;
37
  int64_t       memLimit;
38
  int64_t       itemLimit;
39
};
40

41
struct STaosQset {
42
  STaosQueue   *head;
43
  STaosQueue   *current;
44
  TdThreadMutex mutex;
45
  tsem_t        sem;
46
  int32_t       numOfQueues;
47
  int32_t       numOfItems;
48
};
49

50
struct STaosQall {
51
  STaosQnode *current;
52
  STaosQnode *start;
53
  int32_t     numOfItems;
54
  int64_t     memOfItems;
55
  int32_t     unAccessedNumOfItems;
56
  int64_t     unAccessMemOfItems;
57
};
58

59
void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; }
28,464✔
60
void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; }
28,479✔
61

62
int32_t taosOpenQueue(STaosQueue **queue) {
7,392,393✔
63
  *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
7,392,393✔
64
  if (*queue == NULL) {
7,399,265!
UNCOV
65
    return terrno;
×
66
  }
67

68
  int32_t code = taosThreadMutexInit(&(*queue)->mutex, NULL);
7,399,265✔
69
  if (code) {
7,396,500!
70
    taosMemoryFreeClear(*queue);
×
71
    return (terrno = TAOS_SYSTEM_ERROR(code));
×
72
  }
73

74
  uDebug("queue:%p is opened", queue);
7,396,500✔
75
  return 0;
7,400,070✔
76
}
77

78
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
118,578✔
79
  if (queue == NULL) return;
118,578!
80
  queue->itemFp = itemFp;
118,578✔
81
  queue->itemsFp = itemsFp;
118,578✔
82
}
83

84
void taosCloseQueue(STaosQueue *queue) {
7,400,591✔
85
  if (queue == NULL) return;
7,400,591!
86
  STaosQnode *pTemp;
87
  STaosQset  *qset;
88

89
  (void)taosThreadMutexLock(&queue->mutex);
7,400,591✔
90
  STaosQnode *pNode = queue->head;
7,400,915✔
91
  queue->head = NULL;
7,400,915✔
92
  qset = queue->qset;
7,400,915✔
93
  (void)taosThreadMutexUnlock(&queue->mutex);
7,400,915✔
94

95
  if (queue->qset) {
7,400,861✔
96
    taosRemoveFromQset(qset, queue);
39,561✔
97
  }
98

99
  while (pNode) {
7,401,202✔
100
    pTemp = pNode;
599✔
101
    pNode = pNode->next;
599✔
102
    taosMemoryFree(pTemp);
599✔
103
  }
104

105
  (void)taosThreadMutexDestroy(&queue->mutex);
7,400,603✔
106
  taosMemoryFree(queue);
7,400,360✔
107

108
  uDebug("queue:%p is closed", queue);
7,400,827✔
109
}
110

111
bool taosQueueEmpty(STaosQueue *queue) {
22,400,731✔
112
  if (queue == NULL) return true;
22,400,731!
113

114
  bool empty = false;
22,400,731✔
115
  (void)taosThreadMutexLock(&queue->mutex);
22,400,731✔
116
  if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 /*&& queue->memOfItems == 0*/) {
22,439,885✔
117
    empty = true;
9,687,374✔
118
  }
119
  (void)taosThreadMutexUnlock(&queue->mutex);
22,439,885✔
120

121
  return empty;
22,445,687✔
122
}
123

124
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
65,548,993✔
125
  if (queue == NULL) return;
65,548,993!
126

127
  (void)taosThreadMutexLock(&queue->mutex);
65,548,993✔
128
  queue->numOfItems -= items;
65,571,110✔
129
  (void)taosThreadMutexUnlock(&queue->mutex);
65,571,110✔
130
}
131

132
int32_t taosQueueItemSize(STaosQueue *queue) {
27,138,640✔
133
  if (queue == NULL) return 0;
27,138,640!
134

135
  (void)taosThreadMutexLock(&queue->mutex);
27,138,640✔
136
  int32_t numOfItems = queue->numOfItems;
27,145,413✔
137
  (void)taosThreadMutexUnlock(&queue->mutex);
27,145,413✔
138

139
  uTrace("queue:%p, numOfItems:%d memOfItems:%" PRId64, queue, queue->numOfItems, queue->memOfItems);
27,150,467✔
140
  return numOfItems;
27,151,201✔
141
}
142

143
int64_t taosQueueMemorySize(STaosQueue *queue) {
1,522,120✔
144
  (void)taosThreadMutexLock(&queue->mutex);
1,522,120✔
145
  int64_t memOfItems = queue->memOfItems;
1,522,345✔
146
  (void)taosThreadMutexUnlock(&queue->mutex);
1,522,345✔
147
  return memOfItems;
1,522,380✔
148
}
149

150
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
86,144,689✔
151
  int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
86,144,689✔
152
  if (alloced > tsQueueMemoryAllowed) {
86,304,314✔
153
    if (itype == RPC_QITEM) {
18,814,705!
154
      uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
×
155
             tsQueueMemoryAllowed);
156
      (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
157
      return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
×
158
    }
159
  }
160

161
  *item = NULL;
86,304,314✔
162
  STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
86,304,314✔
163
  if (pNode == NULL) {
86,214,736!
UNCOV
164
    (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
×
UNCOV
165
    return terrno;
×
166
  }
167

168
  pNode->dataSize = dataSize;
86,214,736✔
169
  pNode->size = size;
86,214,736✔
170
  pNode->itype = itype;
86,214,736✔
171
  pNode->timestamp = taosGetTimestampUs();
86,210,083✔
172
  uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
86,210,083✔
173
  *item = pNode->item;
86,256,990✔
174
  return 0;
86,256,990✔
175
}
176

177
void taosFreeQitem(void *pItem) {
86,270,857✔
178
  if (pItem == NULL) return;
86,270,857!
179

180
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
86,270,857✔
181
  int64_t     alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
86,270,857✔
182
  uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
86,303,339✔
183

184
  taosMemoryFree(pNode);
86,303,340✔
185
}
186

187
int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
86,120,757✔
188
  int32_t     code = 0;
86,120,757✔
189
  STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
86,120,757✔
190
  pNode->timestamp = taosGetTimestampUs();
86,192,608✔
191
  pNode->next = NULL;
86,192,608✔
192

193
  (void)taosThreadMutexLock(&queue->mutex);
86,192,608✔
194
  if (queue->memLimit > 0 && (queue->memOfItems + pNode->size + pNode->dataSize) > queue->memLimit) {
86,250,900!
195
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
196
    uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
197
           queue->memLimit, tstrerror(code));
198

199
    (void)taosThreadMutexUnlock(&queue->mutex);
×
200
    return code;
×
201
  } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
86,250,900!
202
    code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
×
203
    uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
×
204
           queue->itemLimit, tstrerror(code));
205
    (void)taosThreadMutexUnlock(&queue->mutex);
×
206
    return code;
×
207
  }
208

209
  if (queue->tail) {
86,250,900✔
210
    queue->tail->next = pNode;
18,165,833✔
211
    queue->tail = pNode;
18,165,833✔
212
  } else {
213
    queue->head = pNode;
68,085,067✔
214
    queue->tail = pNode;
68,085,067✔
215
  }
216
  queue->numOfItems++;
86,250,900✔
217
  queue->memOfItems += (pNode->size + pNode->dataSize);
86,250,900✔
218
  if (queue->qset) {
86,250,900✔
219
    (void)atomic_add_fetch_32(&queue->qset->numOfItems, 1);
72,831,784✔
220
  }
221

222
  uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems);
86,279,779✔
223

224
  (void)taosThreadMutexUnlock(&queue->mutex);
86,279,779✔
225

226
  if (queue->qset) {
86,268,374✔
227
    if (tsem_post(&queue->qset->sem) != 0) {
72,860,785!
228
      uError("failed to post semaphore for queue set:%p", queue->qset);
×
229
    }
230
  }
231
  return code;
86,240,455✔
232
}
233

234
void taosReadQitem(STaosQueue *queue, void **ppItem) {
13,349,203✔
235
  STaosQnode *pNode = NULL;
13,349,203✔
236

237
  (void)taosThreadMutexLock(&queue->mutex);
13,349,203✔
238

239
  if (queue->head) {
13,352,267✔
240
    pNode = queue->head;
12,756,235✔
241
    *ppItem = pNode->item;
12,756,235✔
242
    queue->head = pNode->next;
12,756,235✔
243
    if (queue->head == NULL) {
12,756,235✔
244
      queue->tail = NULL;
5,148,755✔
245
    }
246
    queue->numOfItems--;
12,756,235✔
247
    queue->memOfItems -= (pNode->size + pNode->dataSize);
12,756,235✔
248
    if (queue->qset) {
12,756,235!
249
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
×
250
    }
251
    uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems,
12,756,235✔
252
           queue->memOfItems);
253
  }
254

255
  (void)taosThreadMutexUnlock(&queue->mutex);
13,352,267✔
256
}
13,355,011✔
257

258
int32_t taosAllocateQall(STaosQall **qall) {
189,537✔
259
  *qall = taosMemoryCalloc(1, sizeof(STaosQall));
189,537✔
260
  if (*qall == NULL) {
189,566!
261
    return terrno;
×
262
  }
263
  return 0;
189,568✔
264
}
265

266
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
189,083✔
267

268
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
509,082✔
269
  int32_t numOfItems = 0;
509,082✔
270
  bool    empty;
271

272
  (void)taosThreadMutexLock(&queue->mutex);
509,082✔
273

274
  empty = queue->head == NULL;
509,169✔
275
  if (!empty) {
509,169✔
276
    memset(qall, 0, sizeof(STaosQall));
172,519✔
277
    qall->current = queue->head;
172,519✔
278
    qall->start = queue->head;
172,519✔
279
    qall->numOfItems = queue->numOfItems;
172,519✔
280
    qall->memOfItems = queue->memOfItems;
172,519✔
281

282
    qall->unAccessedNumOfItems = queue->numOfItems;
172,519✔
283
    qall->unAccessMemOfItems = queue->memOfItems;
172,519✔
284

285
    numOfItems = qall->numOfItems;
172,519✔
286

287
    queue->head = NULL;
172,519✔
288
    queue->tail = NULL;
172,519✔
289
    queue->numOfItems = 0;
172,519✔
290
    queue->memOfItems = 0;
172,519✔
291
    uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
172,519✔
292
           queue->memOfItems);
293
    if (queue->qset) {
172,519!
294
      (void)atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
×
295
    }
296
  }
297

298
  (void)taosThreadMutexUnlock(&queue->mutex);
509,169✔
299

300
  // if source queue is empty, we set destination qall to empty too.
301
  if (empty) {
509,165✔
302
    qall->current = NULL;
336,683✔
303
    qall->start = NULL;
336,683✔
304
    qall->numOfItems = 0;
336,683✔
305
  }
306
  return numOfItems;
509,165✔
307
}
308

309
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
39,187,415✔
310
  STaosQnode *pNode;
311
  int32_t     num = 0;
39,187,415✔
312

313
  pNode = qall->current;
39,187,415✔
314
  if (pNode) qall->current = pNode->next;
39,187,415✔
315

316
  if (pNode) {
39,187,415✔
317
    *ppItem = pNode->item;
38,582,192✔
318
    num = 1;
38,582,192✔
319

320
    qall->unAccessedNumOfItems -= 1;
38,582,192✔
321
    qall->unAccessMemOfItems -= pNode->dataSize;
38,582,192✔
322

323
    uTrace("item:%p is fetched", *ppItem);
38,582,192✔
324
  } else {
325
    *ppItem = NULL;
605,223✔
326
  }
327

328
  return num;
39,196,953✔
329
}
330

331
int32_t taosOpenQset(STaosQset **qset) {
93,041✔
332
  *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
93,041✔
333
  if (*qset == NULL) {
93,041!
334
    return terrno;
×
335
  }
336

337
  (void)taosThreadMutexInit(&(*qset)->mutex, NULL);
93,041✔
338
  if (tsem_init(&(*qset)->sem, 0, 0) != 0) {
93,041!
339
    taosMemoryFree(*qset);
×
340
    return terrno;
×
341
  }
342

343
  uDebug("qset:%p is opened", qset);
93,041✔
344
  return 0;
93,041✔
345
}
346

347
void taosCloseQset(STaosQset *qset) {
93,041✔
348
  if (qset == NULL) return;
93,041!
349

350
  // remove all the queues from qset
351
  (void)taosThreadMutexLock(&qset->mutex);
93,041✔
352
  while (qset->head) {
172,061✔
353
    STaosQueue *queue = qset->head;
79,020✔
354
    qset->head = qset->head->next;
79,020✔
355

356
    queue->qset = NULL;
79,020✔
357
    queue->next = NULL;
79,020✔
358
  }
359
  (void)taosThreadMutexUnlock(&qset->mutex);
93,041✔
360

361
  (void)taosThreadMutexDestroy(&qset->mutex);
93,041✔
362
  if (tsem_destroy(&qset->sem) != 0) {
93,041!
363
    uError("failed to destroy semaphore for qset:%p", qset);
×
364
  }
365
  taosMemoryFree(qset);
93,041✔
366
  uDebug("qset:%p is closed", qset);
93,041✔
367
}
368

369
// tsem_post 'qset->sem', so that reader threads waiting for it
370
// resumes execution and return, should only be used to signal the
371
// thread to exit.
372
void taosQsetThreadResume(STaosQset *qset) {
762,480✔
373
  uDebug("qset:%p, it will exit", qset);
762,480✔
374
  if (tsem_post(&qset->sem) != 0) {
762,480!
375
    uError("failed to post semaphore for qset:%p", qset);
×
376
  }
377
}
762,478✔
378

379
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
118,579✔
380
  if (queue->qset) return TSDB_CODE_INVALID_PARA;
118,579!
381

382
  (void)taosThreadMutexLock(&qset->mutex);
118,579✔
383

384
  queue->next = qset->head;
118,580✔
385
  queue->ahandle = ahandle;
118,580✔
386
  qset->head = queue;
118,580✔
387
  qset->numOfQueues++;
118,580✔
388

389
  (void)taosThreadMutexLock(&queue->mutex);
118,580✔
390
  (void)atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
118,581✔
391
  queue->qset = qset;
118,581✔
392
  (void)taosThreadMutexUnlock(&queue->mutex);
118,581✔
393

394
  (void)taosThreadMutexUnlock(&qset->mutex);
118,581✔
395

396
  uTrace("queue:%p is added into qset:%p", queue, qset);
118,580✔
397
  return 0;
118,580✔
398
}
399

400
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
39,561✔
401
  STaosQueue *tqueue = NULL;
39,561✔
402

403
  (void)taosThreadMutexLock(&qset->mutex);
39,561✔
404

405
  if (qset->head) {
39,561!
406
    if (qset->head == queue) {
39,561✔
407
      qset->head = qset->head->next;
27,972✔
408
      tqueue = queue;
27,972✔
409
    } else {
410
      STaosQueue *prev = qset->head;
11,589✔
411
      tqueue = qset->head->next;
11,589✔
412
      while (tqueue) {
39,370!
413
        if (tqueue == queue) {
39,370✔
414
          prev->next = tqueue->next;
11,589✔
415
          break;
11,589✔
416
        } else {
417
          prev = tqueue;
27,781✔
418
          tqueue = tqueue->next;
27,781✔
419
        }
420
      }
421
    }
422

423
    if (tqueue) {
39,561✔
424
      if (qset->current == queue) qset->current = tqueue->next;
39,559✔
425
      qset->numOfQueues--;
39,559✔
426

427
      (void)taosThreadMutexLock(&queue->mutex);
39,559✔
428
      (void)atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
39,561✔
429
      queue->qset = NULL;
39,560✔
430
      queue->next = NULL;
39,560✔
431
      (void)taosThreadMutexUnlock(&queue->mutex);
39,560✔
432
    }
433
  }
434

435
  (void)taosThreadMutexUnlock(&qset->mutex);
39,561✔
436

437
  uDebug("queue:%p is removed from qset:%p", queue, qset);
39,559✔
438
}
39,559✔
439

440
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
35,600,984✔
441
  STaosQnode *pNode = NULL;
35,600,984✔
442
  int32_t     code = 0;
35,600,984✔
443

444
  if (tsem_wait(&qset->sem) != 0) {
35,600,984!
445
    uError("failed to wait semaphore for qset:%p", qset);
×
446
  }
447

448
  (void)taosThreadMutexLock(&qset->mutex);
35,564,846✔
449

450
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
174,907,457✔
451
    if (qset->current == NULL) qset->current = qset->head;
174,206,462✔
452
    STaosQueue *queue = qset->current;
174,206,462✔
453
    if (queue) qset->current = queue->next;
174,206,462✔
454
    if (queue == NULL) break;
174,206,462!
455
    if (queue->head == NULL) continue;
174,206,462✔
456

457
    (void)taosThreadMutexLock(&queue->mutex);
34,907,453✔
458

459
    if (queue->head) {
34,909,913!
460
      pNode = queue->head;
34,909,917✔
461
      *ppItem = pNode->item;
34,909,917✔
462
      qinfo->ahandle = queue->ahandle;
34,909,917✔
463
      qinfo->fp = queue->itemFp;
34,909,917✔
464
      qinfo->queue = queue;
34,909,917✔
465
      qinfo->timestamp = pNode->timestamp;
34,909,917✔
466

467
      queue->head = pNode->next;
34,909,917✔
468
      if (queue->head == NULL) queue->tail = NULL;
34,909,917✔
469
      // queue->numOfItems--;
470
      queue->memOfItems -= (pNode->size + pNode->dataSize);
34,909,917✔
471
      (void)atomic_sub_fetch_32(&qset->numOfItems, 1);
34,909,917✔
472
      code = 1;
34,909,999✔
473
      uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
34,909,999✔
474
             queue->memOfItems);
475
    }
476

477
    (void)taosThreadMutexUnlock(&queue->mutex);
34,909,995✔
478
    if (pNode) break;
34,910,105!
479
  }
480

481
  (void)taosThreadMutexUnlock(&qset->mutex);
35,611,111✔
482

483
  return code;
35,607,107✔
484
}
485

486
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
30,720,386✔
487
  STaosQueue *queue;
488
  int32_t     code = 0;
30,720,386✔
489

490
  if (tsem_wait(&qset->sem) != 0) {
30,720,386!
491
    uError("failed to wait semaphore for qset:%p", qset);
×
492
  }
493
  (void)taosThreadMutexLock(&qset->mutex);
30,712,542✔
494

495
  for (int32_t i = 0; i < qset->numOfQueues; ++i) {
75,293,441✔
496
    if (qset->current == NULL) qset->current = qset->head;
75,218,438✔
497
    queue = qset->current;
75,218,438✔
498
    if (queue) qset->current = queue->next;
75,218,438!
499
    if (queue == NULL) break;
75,218,438!
500
    if (queue->head == NULL) continue;
75,218,438✔
501

502
    (void)taosThreadMutexLock(&queue->mutex);
30,644,249✔
503

504
    if (queue->head) {
30,660,233!
505
      qall->current = queue->head;
30,660,474✔
506
      qall->start = queue->head;
30,660,474✔
507
      qall->numOfItems = queue->numOfItems;
30,660,474✔
508
      qall->memOfItems = queue->memOfItems;
30,660,474✔
509
      qall->unAccessedNumOfItems = queue->numOfItems;
30,660,474✔
510
      qall->unAccessMemOfItems = queue->memOfItems;
30,660,474✔
511

512
      code = qall->numOfItems;
30,660,474✔
513
      qinfo->ahandle = queue->ahandle;
30,660,474✔
514
      qinfo->fp = queue->itemsFp;
30,660,474✔
515
      qinfo->queue = queue;
30,660,474✔
516
      qinfo->timestamp = queue->head->timestamp;
30,660,474✔
517

518
      queue->head = NULL;
30,660,474✔
519
      queue->tail = NULL;
30,660,474✔
520
      // queue->numOfItems = 0;
521
      queue->memOfItems = 0;
30,660,474✔
522
      uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);
30,660,474✔
523

524
      (void)atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
30,660,481✔
525
      for (int32_t j = 1; j < qall->numOfItems; ++j) {
37,957,674✔
526
        if (tsem_wait(&qset->sem) != 0) {
7,297,430!
527
          uError("failed to wait semaphore for qset:%p", qset);
×
528
        }
529
      }
530
    }
531

532
    (void)taosThreadMutexUnlock(&queue->mutex);
30,660,003✔
533

534
    if (code != 0) break;
30,658,924!
535
  }
536

537
  (void)taosThreadMutexUnlock(&qset->mutex);
30,734,367✔
538
  return code;
30,720,279✔
539
}
540

541
int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; }
1,697,888✔
542
int64_t taosQallMemSize(STaosQall *qall) { return qall->memOfItems; }
×
543

544
int64_t taosQallUnAccessedItemSize(STaosQall *qall) { return qall->unAccessedNumOfItems; }
62,365✔
545
int64_t taosQallUnAccessedMemSize(STaosQall *qall) { return qall->unAccessMemOfItems; }
54,072✔
546

547
void    taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
×
548
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
13,187✔
549

550
void taosQueueSetThreadId(STaosQueue *pQueue, int64_t threadId) { pQueue->threadId = threadId; }
66,068✔
551

552
int64_t taosQueueGetThreadId(STaosQueue *pQueue) { return pQueue->threadId; }
131,867✔
553

554
#if 0
555

556
void taosResetQsetThread(STaosQset *qset, void *pItem) {
557
  if (pItem == NULL) return;
558
  STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
559

560
  (void)taosThreadMutexLock(&qset->mutex);
561
  for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
562
    tsem_post(&qset->sem);
563
  }
564
  (void)taosThreadMutexUnlock(&qset->mutex);
565
}
566

567
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc