• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4804

16 Oct 2025 10:33AM UTC coverage: 61.259% (+0.1%) from 61.147%
#4804

push

travis-ci

happyguoxy
Merge branch 'cover/3.0' of github.com:taosdata/TDengine into cover/3.0

156021 of 324369 branches covered (48.1%)

Branch coverage included in aggregate %.

79 of 100 new or added lines in 19 files covered. (79.0%)

3318 existing lines in 125 files now uncovered.

207798 of 269534 relevant lines covered (77.1%)

168909799.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.27
/source/dnode/vnode/src/vnd/vnodeBufPool.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17

18
/* ------------------------ STRUCTURES ------------------------ */
19
// Allocate and initialize one buffer pool for a vnode.
//
//   pVnode  vnode recorded as the pool's owner and used for log messages
//   id      identifier stored in the pool
//   size    byte size of the anchor buffer allocated up front
//   ppPool  receives the new pool on success; not written on failure
//
// Returns 0 on success, or terrno if either allocation fails.
static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) {
  // calloc: every field not assigned below starts out as zero
  SVBufPool *pNew = taosMemoryCalloc(1, sizeof(SVBufPool));
  if (NULL == pNew) {
    vError("vgId:%d, failed to allocate buffer pool", TD_VID(pVnode));
    return terrno;
  }

  pNew->node.data = taosMemoryMalloc(size);
  if (pNew->node.data == NULL) {
    vError("vgId:%d, failed to allocate buffer pool data, size:%" PRId64, TD_VID(pVnode), size);
    taosMemoryFree(pNew);
    return terrno;
  }

  // identity
  pNew->pVnode = pVnode;
  pNew->id = id;

  // anchor node: the only node in the chain, with pTail pointing at it
  pNew->node.size = size;
  pNew->node.prev = NULL;
  pNew->node.pnext = &pNew->pTail;
  pNew->pTail = &pNew->node;
  pNew->ptr = pNew->node.data;

  // query handle list: an empty circular list guarded by its own mutex
  (void)taosThreadMutexInit(&pNew->mutex, NULL);
  pNew->nQuery = 0;
  pNew->qList.pNext = &pNew->qList;
  pNew->qList.ppNext = &pNew->qList.pNext;

  *ppPool = pNew;
  return 0;
}
51

52
// Release everything owned by a pool: the overflow nodes, the query-list
// mutex, the anchor buffer and finally the pool structure itself.
static void vnodeBufPoolDestroy(SVBufPool *pPool) {
  // frees every node chained behind the anchor node
  vnodeBufPoolReset(pPool);

  (void)taosThreadMutexDestroy(&pPool->mutex);

  void *pAnchorData = pPool->node.data;
  taosMemoryFree(pAnchorData);
  taosMemoryFree(pPool);
}
19,587,054✔
58

59
// Create the VNODE_BUFPOOL_SEGMENTS buffer pools of a vnode and push each one
// onto the vnode's free list. Every pool gets an equal share of config.szBuf.
//
// Returns 0 on success. If a pool cannot be created, the pools created so far
// are destroyed, the free list is cleared, and the failing code is returned.
int vnodeOpenBufPool(SVnode *pVnode) {
  int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;

  for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
    // create pool
    int32_t code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]);
    if (code != 0) {
      // report the code that is actually returned to the caller
      vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(code));
      vnodeCloseBufPool(pVnode);
      // the pools linked below in earlier iterations were just freed; do not
      // leave the free list pointing at them
      pVnode->freeList = NULL;
      return code;
    }

    // add to free list
    pVnode->aBufPool[i]->freeNext = pVnode->freeList;
    pVnode->freeList = pVnode->aBufPool[i];
  }

  vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
  return 0;
}
79

80
// Destroy every buffer pool still present in pVnode->aBufPool and clear its
// slot. Empty (NULL) slots are skipped.
void vnodeCloseBufPool(SVnode *pVnode) {
  for (int32_t idx = 0; idx < VNODE_BUFPOOL_SEGMENTS; ++idx) {
    SVBufPool **ppSlot = &pVnode->aBufPool[idx];
    if (*ppSlot == NULL) {
      continue;
    }

    vnodeBufPoolDestroy(*ppSlot);
    *ppSlot = NULL;
  }

  vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
}
6,529,018✔
90

91
// Return a pool to its empty state: free every overflow node chained behind
// the anchor node, zero the size counter and rewind the allocation cursor to
// the start of the anchor buffer. An error is logged (but the reset still
// proceeds) when queries are still registered on the pool.
void vnodeBufPoolReset(SVBufPool *pPool) {
  if (pPool->nQuery != 0) {
    vError("vgId:%d, buffer pool %p of id %d has %d queries, reset it may cause problem", TD_VID(pPool->pVnode), pPool,
           pPool->id, pPool->nQuery);
  }

  // Pop nodes off the tail until only the anchor (the node without a prev) is left.
  while (pPool->pTail->prev != NULL) {
    SVBufPoolNode *pLast = pPool->pTail;

    pLast->prev->pnext = &pPool->pTail;
    pPool->pTail = pLast->prev;
    pPool->size = pPool->size - sizeof(*pLast) - pLast->size;
    taosMemoryFree(pLast);
  }

  pPool->ptr = pPool->node.data;
  pPool->size = 0;
}
29,499,238✔
107

108
// Allocate `size` bytes from the pool, aligning allocations served from the
// anchor buffer to an 8-byte boundary.
//
// If the anchor buffer still has room for the padding plus `size`, the block
// is carved out of it and the cursor advances. Otherwise a dedicated node is
// allocated with taosMemoryMalloc and appended to the pool's node chain; no
// explicit padding is applied on that path.
//
// Returns the block, or NULL when pPool is NULL (terrno is set to
// TSDB_CODE_INVALID_PARA) or when the node allocation fails.
void *vnodeBufPoolMallocAligned(SVBufPool *pPool, int size) {
  if (pPool == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  // Bytes needed to round the cursor up to a multiple of 8. The address is
  // handled as uintptr_t: a cast to long can truncate the pointer where long
  // is narrower than a pointer, and signed arithmetic on it may overflow.
  uintptr_t addr = (uintptr_t)pPool->ptr;
  int       paddingLen = (int)(((addr + 7) & ~(uintptr_t)7) - addr);

  if (pPool->node.size >= pPool->ptr - pPool->node.data + size + paddingLen) {
    // allocate from the anchor node
    void *p = pPool->ptr + paddingLen;
    size += paddingLen;  // the padding is charged to this allocation
    pPool->ptr = pPool->ptr + size;
    pPool->size += size;
    return p;
  }

  // allocate a new node; its payload starts right after the node header
  SVBufPoolNode *pNode = taosMemoryMalloc(sizeof(*pNode) + size);
  if (pNode == NULL) {
    return NULL;
  }
  pNode->data = (uint8_t *)&pNode[1];
  pNode->size = size;

  // append to the tail of the node chain
  pNode->prev = pPool->pTail;
  pNode->pnext = &pPool->pTail;
  pPool->pTail->pnext = &pNode->prev;
  pPool->pTail = pNode;

  pPool->size = pPool->size + sizeof(*pNode) + size;
  return pNode->data;
}
147

148
// Allocate `size` bytes from the pool without any alignment padding.
//
// The block comes from the anchor buffer when it still has room; otherwise a
// dedicated node is allocated and appended to the pool's node chain.
//
// Returns the block, or NULL when pPool is NULL (terrno is set to
// TSDB_CODE_INVALID_PARA) or when the node allocation fails.
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
  if (NULL == pPool) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  if (pPool->node.size < pPool->ptr - pPool->node.data + size) {
    // anchor buffer exhausted: allocate a new node, payload follows the header
    SVBufPoolNode *pExtra = taosMemoryMalloc(sizeof(*pExtra) + size);
    if (NULL == pExtra) {
      return NULL;
    }
    pExtra->data = (uint8_t *)&pExtra[1];
    pExtra->size = size;

    // append to the tail of the node chain
    pExtra->prev = pPool->pTail;
    pExtra->pnext = &pPool->pTail;
    pPool->pTail->pnext = &pExtra->prev;
    pPool->pTail = pExtra;

    pPool->size = pPool->size + sizeof(*pExtra) + size;
    return pExtra->data;
  }

  // allocate from the anchor node
  void *pChunk = pPool->ptr;
  pPool->ptr = pPool->ptr + size;
  pPool->size += size;
  return pChunk;
}
181

182
// Intentionally does nothing: individual blocks are not released here.
// A previously disabled implementation unlinked and freed the overflow node
// holding `p` whenever `p` lay outside the anchor buffer.
void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
  (void)pPool;
  (void)p;
}
156,315,578✔
195

196
// Atomically add one reference to the pool. An error is logged when the value
// returned by atomic_fetch_add_32 (presumably the count before the increment)
// is not positive.
void vnodeBufPoolRef(SVBufPool *pPool) {
  int32_t fetched = atomic_fetch_add_32(&pPool->nRef, 1);
  if (fetched > 0) {
    return;
  }

  vError("vgId:%d, buffer pool %p of id %d is referenced by %d", TD_VID(pPool->pVnode), pPool, pPool->id, fetched);
}
16,439,210✔
202

203
// Replace the pool's anchor buffer with a freshly allocated one of `size`
// bytes. The old contents are NOT copied. If the allocation fails, the error
// is logged and the pool is left unchanged.
//
// Only the anchor buffer and the cursor are touched: overflow nodes and the
// pool's size counter are left as they are, so the caller is expected to
// reset the pool (the visible caller, vnodeBufPoolAddToFreeList, does so).
static void vnodeBufPoolResize(SVBufPool *pPool, int64_t size) {
  if (pPool == NULL) return;

  uint8_t *pDataNew = taosMemoryMalloc(size);
  if (pDataNew == NULL) {
    vError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pPool->pVnode), __func__, __FILE__, __LINE__,
           tstrerror(terrno));
    return;
  }

  // Apply change
  int64_t oldSize = pPool->node.size;
  taosMemoryFree(pPool->node.data);
  pPool->node.data = pDataNew;
  pPool->node.size = size;
  // The cursor pointed into the buffer that was just freed; re-point it at
  // the new buffer so it never dangles.
  pPool->ptr = pDataNew;

  vInfo("vgId:%d, buffer pool %d resized from %" PRId64 " to %" PRId64, TD_VID(pPool->pVnode), pPool->id, oldSize,
        size);
}
223

224
// Put a pool back on its vnode's free list.
//
// If the anchor buffer size no longer matches the per-segment share of
// config.szBuf, the buffer is resized first. The pool is then reset, pushed
// onto the head of the free list, and poolNotEmpty is signalled.
// NOTE(review): no lock is taken here; presumably the caller holds
// pVnode->mutex (vnodeBufPoolUnRef locks it when `proactive` is true) — confirm.
void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
  SVnode *pOwner = pPool->pVnode;

  int64_t segSize = pOwner->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
  if (segSize != pPool->node.size) {
    vnodeBufPoolResize(pPool, segSize);
  }

  vDebug("vgId:%d, buffer pool %p of id %d is added to free list", TD_VID(pOwner), pPool, pPool->id);
  vnodeBufPoolReset(pPool);

  // push onto the head of the free list and signal poolNotEmpty
  pPool->freeNext = pOwner->freeList;
  pOwner->freeList = pPool;
  (void)taosThreadCondSignal(&pOwner->poolNotEmpty);
}
9,912,987✔
239

240
// Drop one reference from the pool. When the count does not stay above zero,
// the pool is detached from the vnode's recycle bookkeeping (either the
// onRecycle slot or the doubly linked recycle queue) and returned to the
// free list.
//
//   pPool      pool to release; NULL is ignored
//   proactive  when true, pVnode->mutex is locked around the whole operation;
//              when false no lock is taken here
void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
  if (NULL == pPool) return;

  SVnode *pOwner = pPool->pVnode;

  if (proactive) {
    (void)taosThreadMutexLock(&pOwner->mutex);
  }

  if (atomic_sub_fetch_32(&pPool->nRef, 1) <= 0) {
    // remove from recycle queue or on-recycle position
    if (pOwner->onRecycle == pPool) {
      pOwner->onRecycle = NULL;
    } else {
      // unlink from the predecessor side (or move the queue head)
      if (pPool->recyclePrev != NULL) {
        pPool->recyclePrev->recycleNext = pPool->recycleNext;
      } else {
        pOwner->recycleHead = pPool->recycleNext;
      }

      // unlink from the successor side (or move the queue tail)
      if (pPool->recycleNext != NULL) {
        pPool->recycleNext->recyclePrev = pPool->recyclePrev;
      } else {
        pOwner->recycleTail = pPool->recyclePrev;
      }

      pPool->recyclePrev = pPool->recycleNext = NULL;
    }

    vnodeBufPoolAddToFreeList(pPool);
  }

  if (proactive) {
    (void)taosThreadMutexUnlock(&pOwner->mutex);
  }
}
277

278
// Insert pQNode at the front of the pool's circular query list and bump the
// query counter, all under the pool mutex.
void vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
  (void)taosThreadMutexLock(&pPool->mutex);

  SQueryNode *pAnchor = &pPool->qList;
  SQueryNode *pOldFirst = pAnchor->pNext;

  // link the new node between the list anchor and the previous first node
  pQNode->pNext = pOldFirst;
  pQNode->ppNext = &pAnchor->pNext;
  pOldFirst->ppNext = &pQNode->pNext;
  pAnchor->pNext = pQNode;

  pPool->nQuery += 1;

  (void)taosThreadMutexUnlock(&pPool->mutex);
}
288,044,076✔
289

290
// Unlink pQNode from the pool's query list and decrement the query counter.
//
//   pPool      pool the query node is registered on
//   pQNode     node to remove; it must currently be linked into the list
//   proactive  when true, the pool mutex is taken around the unlink; when
//              false no lock is taken here
// NOTE(review): with proactive == false the caller presumably already holds
// pPool->mutex — confirm against the callers.
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
  if (proactive) {
    (void)taosThreadMutexLock(&pPool->mutex);
  }

  // splice the node out: the successor inherits this node's back-link, and
  // the slot that pointed at this node now points at the successor
  pQNode->pNext->ppNext = pQNode->ppNext;
  *pQNode->ppNext = pQNode->pNext;
  pPool->nQuery--;

  if (proactive) {
    (void)taosThreadMutexUnlock(&pPool->mutex);
  }
}
288,184,289✔
305

306
// Invoke the reseek callback of every query registered on the pool, holding
// the pool mutex for the whole walk.
//
// A callback result of 0 or TSDB_CODE_VND_QUERY_BUSY lets the walk continue.
// Any other result stops the walk immediately and is returned. Returns 0 when
// the whole list was visited.
int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
  int32_t result = 0;
  SVnode *pOwner = pPool->pVnode;

  vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pOwner), pPool, pPool->id);

  (void)taosThreadMutexLock(&pPool->mutex);

  for (SQueryNode *pCur = pPool->qList.pNext; pCur != &pPool->qList;) {
    // successor is read before the callback runs
    SQueryNode *pFollowing = pCur->pNext;

    int32_t rc = pCur->reseek(pCur->pQHandle);
    if (rc != 0 && rc != TSDB_CODE_VND_QUERY_BUSY) {
      result = rc;
      break;
    }

    pCur = pFollowing;
  }

  (void)taosThreadMutexUnlock(&pPool->mutex);
  return result;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc