• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4672

14 Aug 2025 01:04PM UTC coverage: 59.715% (+2.2%) from 57.533%
#4672

push

travis-ci

web-flow
fix(query): fix order by column check of union operator (#32524)

137065 of 292055 branches covered (46.93%)

Branch coverage included in aggregate %.

1 of 13 new or added lines in 1 file covered. (7.69%)

20958 existing lines in 205 files now uncovered.

207155 of 284385 relevant lines covered (72.84%)

20501687.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.52
/source/dnode/vnode/src/vnd/vnodeBufPool.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17

18
/* ------------------------ STRUCTURES ------------------------ */
19
/*
 * Allocate and initialise one buffer pool for a vnode.
 *
 * pVnode  owning vnode; stored in the pool and used for log prefixes
 * id      identifier stored in the pool
 * size    byte size of the pool's built-in (anchor) data buffer
 * ppPool  receives the new pool on success; left untouched on failure
 *
 * Returns 0 on success, otherwise an error code (terrno is set to the same
 * value on the lock-related failure paths). Every resource acquired before
 * the failure, including the pool mutex, is released before returning.
 */
static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) {
  int32_t    code = 0;
  SVBufPool *pPool = taosMemoryCalloc(1, sizeof(SVBufPool));
  if (pPool == NULL) {
    vError("vgId:%d, failed to allocate buffer pool", TD_VID(pVnode));
    return terrno;
  }

  pPool->node.data = taosMemoryMalloc(size);
  if (NULL == pPool->node.data) {
    vError("vgId:%d, failed to allocate buffer pool data, size:%" PRId64, TD_VID(pVnode), size);
    taosMemoryFree(pPool);
    return terrno;
  }

  // query handle list: an empty circular list whose sentinel is qList itself
  (void)taosThreadMutexInit(&pPool->mutex, NULL);
  pPool->nQuery = 0;
  pPool->qList.pNext = &pPool->qList;
  pPool->qList.ppNext = &pPool->qList.pNext;

  // the embedded anchor node is the first (and initially only) node in the chain
  pPool->pVnode = pVnode;
  pPool->id = id;
  pPool->ptr = pPool->node.data;
  pPool->pTail = &pPool->node;
  pPool->node.prev = NULL;
  pPool->node.pnext = &pPool->pTail;
  pPool->node.size = size;

  // only RSMA vnodes get a spin lock; for all others lock stays NULL
  if (VND_IS_RSMA(pVnode)) {
    pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
    if (!pPool->lock) {
      code = terrno;
      goto _err;
    }
    if (taosThreadSpinInit(pPool->lock, 0) != 0) {
      // capture the error before any free() call gets a chance to clobber it
      // NOTE(review): whether taosThreadSpinInit reports through ERRNO or only
      // through its return value is not visible here - confirm.
      code = TAOS_SYSTEM_ERROR(ERRNO);
      taosMemoryFree((void *)pPool->lock);
      goto _err;
    }
  } else {
    pPool->lock = NULL;
  }

  *ppPool = pPool;
  return 0;

_err:
  // the mutex was initialised above, so it must be torn down as well
  (void)taosThreadMutexDestroy(&pPool->mutex);
  taosMemoryFree(pPool->node.data);
  taosMemoryFree(pPool);
  return terrno = code;
}
68

69
/*
 * Tear down a buffer pool: release its overflow nodes via vnodeBufPoolReset,
 * then the optional spin lock, the mutex, the anchor data buffer and finally
 * the pool structure itself. pPool must not be used after this call.
 */
static void vnodeBufPoolDestroy(SVBufPool *pPool) {
  // drop every overflow node first so that only the anchor buffer remains
  vnodeBufPoolReset(pPool);

  // the spin lock exists only when the pool was created with one
  if (pPool->lock != NULL) {
    (void)taosThreadSpinDestroy(pPool->lock);
    taosMemoryFree((void *)pPool->lock);
  }

  (void)taosThreadMutexDestroy(&pPool->mutex);

  taosMemoryFree(pPool->node.data);
  taosMemoryFree(pPool);
}
41,696✔
79

80
/*
 * Create the VNODE_BUFPOOL_SEGMENTS buffer pools of a vnode and push each of
 * them onto the vnode's free list. Every pool gets an equal share of
 * config.szBuf.
 *
 * Returns 0 on success. On failure the pools created so far are destroyed,
 * the free list is rolled back to the value it had on entry (so it never
 * points at a destroyed pool) and the error code from pool creation is
 * returned.
 */
int vnodeOpenBufPool(SVnode *pVnode) {
  int64_t    size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
  SVBufPool *pFreeOnEntry = pVnode->freeList;

  for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
    // create pool
    int32_t code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]);
    if (code != 0) {
      // report the code that is actually returned to the caller
      vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(code));
      vnodeCloseBufPool(pVnode);
      // the pools linked below have just been freed: unlink them again
      pVnode->freeList = pFreeOnEntry;
      return code;
    }

    // add to free list
    pVnode->aBufPool[i]->freeNext = pVnode->freeList;
    pVnode->freeList = pVnode->aBufPool[i];
  }

  vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
  return 0;
}
100

101
/*
 * Destroy every buffer pool still held in pVnode->aBufPool and clear its
 * slot. Slots that are already NULL are skipped, so the function can be
 * called on a partially opened vnode.
 */
void vnodeCloseBufPool(SVnode *pVnode) {
  for (int32_t idx = 0; idx < VNODE_BUFPOOL_SEGMENTS; idx++) {
    SVBufPool **ppSlot = &pVnode->aBufPool[idx];
    if (*ppSlot == NULL) {
      continue;
    }

    vnodeBufPoolDestroy(*ppSlot);
    *ppSlot = NULL;
  }

  vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
}
13,897✔
111

112
/*
 * Return a pool to its empty state: free every overflow node chained after
 * the embedded anchor node, zero the usage counter and rewind the allocation
 * cursor to the start of the anchor buffer.
 *
 * An error is logged (but the reset still proceeds) when queries are still
 * registered on the pool.
 */
void vnodeBufPoolReset(SVBufPool *pPool) {
  if (pPool->nQuery != 0) {
    vError("vgId:%d, buffer pool %p of id %d has %d queries, reset it may cause problem", TD_VID(pPool->pVnode), pPool,
           pPool->id, pPool->nQuery);
  }

  // pop nodes off the tail until only the anchor (the node without prev) is left
  SVBufPoolNode *pLast = pPool->pTail;
  while (pLast->prev != NULL) {
    SVBufPoolNode *pBefore = pLast->prev;

    pBefore->pnext = &pPool->pTail;
    pPool->pTail = pBefore;
    pPool->size = pPool->size - sizeof(*pLast) - pLast->size;
    taosMemoryFree(pLast);

    pLast = pPool->pTail;
  }

  pPool->ptr = pPool->node.data;
  pPool->size = 0;
}
74,775✔
128

129
/*
 * Allocate `size` bytes from the pool, aligning the returned address to 8
 * bytes when the allocation is served from the anchor buffer.
 *
 * If the anchor buffer has no room for padding + size, a dedicated overflow
 * node is allocated from the heap and appended to the pool's node chain.
 * When the pool has a spin lock, it is held for the duration of the call.
 *
 * Returns the allocated memory, or NULL when pPool is NULL (terrno is set to
 * TSDB_CODE_INVALID_PARA) or when the overflow node cannot be allocated.
 */
void *vnodeBufPoolMallocAligned(SVBufPool *pPool, int size) {
  SVBufPoolNode *pNode;
  void          *p = NULL;
  uintptr_t      addr;
  int            paddingLen;

  if (pPool == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  if (pPool->lock) (void)taosThreadSpinLock(pPool->lock);

  // Bytes needed to round the cursor up to the next multiple of 8. The math is
  // done in uintptr_t: `long` may be narrower than a pointer (LLP64 targets),
  // in which case the cast would truncate and the signed addition could overflow.
  addr = (uintptr_t)pPool->ptr;
  paddingLen = (int)(((addr + 7) & ~(uintptr_t)7) - addr);

  if (pPool->node.size >= pPool->ptr - pPool->node.data + size + paddingLen) {
    // allocate from the anchor node; the padding is charged to this allocation
    p = pPool->ptr + paddingLen;
    size += paddingLen;
    pPool->ptr = pPool->ptr + size;
    pPool->size += size;
  } else {
    // allocate a new node
    pNode = taosMemoryMalloc(sizeof(*pNode) + size);
    if (pNode == NULL) {
      if (pPool->lock) {
        (void)taosThreadSpinUnlock(pPool->lock);
      }
      return NULL;
    }
    // payload starts right behind the node header
    // NOTE(review): 8-byte alignment of this payload depends on
    // sizeof(SVBufPoolNode) being a multiple of 8 - confirm.
    pNode->data = (uint8_t *)&pNode[1];

    p = pNode->data;
    pNode->size = size;
    pNode->prev = pPool->pTail;
    pNode->pnext = &pPool->pTail;
    pPool->pTail->pnext = &pNode->prev;
    pPool->pTail = pNode;

    pPool->size = pPool->size + sizeof(*pNode) + size;
  }
  if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
  return p;
}
174

175
/*
 * Allocate `size` bytes from the pool without any alignment padding.
 *
 * The request is served from the anchor buffer when it still has room;
 * otherwise a dedicated overflow node is allocated from the heap and appended
 * to the pool's node chain. When the pool has a spin lock, it is held for the
 * duration of the call.
 *
 * Returns the allocated memory, or NULL when pPool is NULL (terrno is set to
 * TSDB_CODE_INVALID_PARA) or when the overflow node cannot be allocated.
 */
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
  void *pResult = NULL;

  if (pPool == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  if (pPool->lock) taosThreadSpinLock(pPool->lock);

  if (pPool->node.size < pPool->ptr - pPool->node.data + size) {
    // anchor buffer exhausted: give this request its own heap node
    SVBufPoolNode *pExtra = taosMemoryMalloc(sizeof(*pExtra) + size);
    if (pExtra == NULL) {
      if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
      return NULL;
    }

    // payload lives directly behind the node header
    pExtra->data = (uint8_t *)&pExtra[1];
    pResult = pExtra->data;
    pExtra->size = size;

    // append the node at the tail of the chain
    pExtra->prev = pPool->pTail;
    pExtra->pnext = &pPool->pTail;
    pPool->pTail->pnext = &pExtra->prev;
    pPool->pTail = pExtra;

    pPool->size = pPool->size + sizeof(*pExtra) + size;
  } else {
    // bump-allocate from the anchor buffer
    pResult = pPool->ptr;
    pPool->ptr = pPool->ptr + size;
    pPool->size += size;
  }

  if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
  return pResult;
}
211

212
/*
 * Intentionally a no-op: nothing is released by this call.
 *
 * A previous implementation (now disabled) unlinked and freed the overflow
 * node behind `p` whenever `p` lay outside the anchor buffer, and reduced
 * pPool->size accordingly.
 */
void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
  (void)pPool;
  (void)p;
}
180,676✔
225

226
/*
 * Atomically take one more reference on the pool.
 *
 * The counter is expected to be positive already; if the value observed
 * before the increment is zero or negative, an error is logged (the
 * increment has still taken place).
 */
void vnodeBufPoolRef(SVBufPool *pPool) {
  const int32_t nBefore = atomic_fetch_add_32(&pPool->nRef, 1);
  if (nBefore > 0) {
    return;
  }

  vError("vgId:%d, buffer pool %p of id %d is referenced by %d", TD_VID(pPool->pVnode), pPool, pPool->id, nBefore);
}
46,976✔
232

UNCOV
233
/*
 * Replace the pool's anchor buffer with a freshly allocated one of `size`
 * bytes.
 *
 * The old buffer's contents are NOT copied: the old buffer is freed and the
 * allocation cursor is rewound to the start of the new one, so the function
 * must only be used on a pool whose anchor data is no longer needed (the only
 * caller in this file resets the pool right afterwards).
 *
 * On allocation failure an error is logged and the pool is left unchanged.
 * A NULL pool is ignored.
 */
static void vnodeBufPoolResize(SVBufPool *pPool, int64_t size) {
  if (pPool == NULL) return;

  uint8_t *pDataNew = taosMemoryMalloc(size);
  if (pDataNew == NULL) {
    vError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pPool->pVnode), __func__, __FILE__, __LINE__,
           tstrerror(terrno));
    return;
  }

  // Apply change
  int64_t oldSize = pPool->node.size;
  taosMemoryFree(pPool->node.data);
  pPool->node.data = pDataNew;
  pPool->node.size = size;
  // the cursor pointed into the buffer that was just freed; never leave it dangling
  pPool->ptr = pDataNew;

  vInfo("vgId:%d, buffer pool %d resized from %" PRId64 " to %" PRId64, TD_VID(pPool->pVnode), pPool->id, oldSize,
        size);
}
253

254
/*
 * Recycle a pool: bring its anchor buffer to the currently configured
 * per-segment size if it differs, reset it, push it onto the head of the
 * owning vnode's free list and signal the vnode's poolNotEmpty condition.
 *
 * NOTE(review): no lock is taken here; presumably the caller already holds
 * the vnode mutex - confirm against callers.
 */
void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
  SVnode       *pOwner = pPool->pVnode;
  const int64_t segSize = pOwner->config.szBuf / VNODE_BUFPOOL_SEGMENTS;

  // the configured size may differ from the size this pool was created with
  const bool sizeChanged = (pPool->node.size != segSize);
  if (sizeChanged) {
    vnodeBufPoolResize(pPool, segSize);
  }

  // add to free list
  vDebug("vgId:%d, buffer pool %p of id %d is added to free list", TD_VID(pOwner), pPool, pPool->id);
  vnodeBufPoolReset(pPool);

  pPool->freeNext = pOwner->freeList;
  pOwner->freeList = pPool;

  (void)taosThreadCondSignal(&pOwner->poolNotEmpty);
}
33,078✔
269

270
/*
 * Drop one reference on the pool. When the count reaches zero (or below) the
 * pool is detached from the vnode's on-recycle slot or recycle queue and
 * handed to vnodeBufPoolAddToFreeList.
 *
 * pPool      pool to release; NULL is ignored
 * proactive  when true, the vnode mutex is taken for the duration of the
 *            call; when false the mutex is not touched here
 */
void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
  if (pPool == NULL) return;

  SVnode *pVnode = pPool->pVnode;

  if (proactive) {
    (void)taosThreadMutexLock(&pVnode->mutex);
  }

  const int32_t nLeft = atomic_sub_fetch_32(&pPool->nRef, 1);
  if (nLeft <= 0) {
    // last reference gone: remove from recycle queue or on-recycle position
    if (pVnode->onRecycle != pPool) {
      SVBufPool *pBefore = pPool->recyclePrev;
      SVBufPool *pAfter = pPool->recycleNext;

      // fix the forward link (or the queue head when there is no predecessor)
      if (pBefore) {
        pBefore->recycleNext = pAfter;
      } else {
        pVnode->recycleHead = pAfter;
      }

      // fix the backward link (or the queue tail when there is no successor)
      if (pAfter) {
        pAfter->recyclePrev = pBefore;
      } else {
        pVnode->recycleTail = pBefore;
      }

      pPool->recyclePrev = NULL;
      pPool->recycleNext = NULL;
    } else {
      pVnode->onRecycle = NULL;
    }

    vnodeBufPoolAddToFreeList(pPool);
  }

  if (proactive) {
    (void)taosThreadMutexUnlock(&pVnode->mutex);
  }
}
307

308
/*
 * Insert pQNode at the front of the pool's query list and bump the query
 * counter, all under the pool mutex.
 */
void vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
  (void)taosThreadMutexLock(&pPool->mutex);

  SQueryNode *pSentinel = &pPool->qList;
  SQueryNode *pOldFirst = pSentinel->pNext;

  // splice the new node between the sentinel and the previous first node
  pQNode->pNext = pOldFirst;
  pQNode->ppNext = &pSentinel->pNext;
  pOldFirst->ppNext = &pQNode->pNext;
  pSentinel->pNext = pQNode;

  pPool->nQuery++;

  (void)taosThreadMutexUnlock(&pPool->mutex);
}
6,617,927✔
319

320
/*
 * Unlink pQNode from the pool's query list and decrement the query counter.
 *
 * pPool      pool the query node is registered on
 * pQNode     node to remove; must currently be linked into pPool's list
 * proactive  when true, the pool mutex is taken for the duration of the
 *            call; when false the mutex is not touched here
 */
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
  if (proactive) {
    (void)taosThreadMutexLock(&pPool->mutex);
  }

  // make the successor's back-link and the predecessor's forward-link skip this node
  pQNode->pNext->ppNext = pQNode->ppNext;
  *pQNode->ppNext = pQNode->pNext;
  pPool->nQuery--;

  if (proactive) {
    (void)taosThreadMutexUnlock(&pPool->mutex);
  }
}
6,619,902✔
335

UNCOV
336
/*
 * Invoke the reseek callback of every query registered on the pool, under
 * the pool mutex.
 *
 * A callback result of 0 or TSDB_CODE_VND_QUERY_BUSY lets the walk continue;
 * any other result stops the walk and is returned. Returns 0 when every
 * callback was accepted.
 */
int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
  int32_t code = 0;
  SVnode *pVnode = pPool->pVnode;

  vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);

  (void)taosThreadMutexLock(&pPool->mutex);

  SQueryNode *pCur = pPool->qList.pNext;
  SQueryNode *pFollow = NULL;
  for (; pCur != &pPool->qList; pCur = pFollow) {
    // read the successor before the callback runs
    // (presumably because reseek may unlink the current node - confirm)
    pFollow = pCur->pNext;

    int32_t rc = pCur->reseek(pCur->pQHandle);
    if (rc != 0 && rc != TSDB_CODE_VND_QUERY_BUSY) {
      code = rc;
      break;
    }
  }

  (void)taosThreadMutexUnlock(&pPool->mutex);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc