• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.79
/source/dnode/vnode/src/vnd/vnodeBufPool.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "vnd.h"
17

18
/* ------------------------ STRUCTURES ------------------------ */
19
static int32_t vnodeBufPoolCreate(SVnode *pVnode, int32_t id, int64_t size, SVBufPool **ppPool) {
41,692✔
20
  SVBufPool *pPool;
21

22
  pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
41,692✔
23
  if (pPool == NULL) {
41,700!
24
    return terrno;
×
25
  }
26
  memset(pPool, 0, sizeof(SVBufPool));
41,700✔
27

28
  // query handle list
29
  (void)taosThreadMutexInit(&pPool->mutex, NULL);
41,700✔
30
  pPool->nQuery = 0;
41,693✔
31
  pPool->qList.pNext = &pPool->qList;
41,693✔
32
  pPool->qList.ppNext = &pPool->qList.pNext;
41,693✔
33

34
  pPool->pVnode = pVnode;
41,693✔
35
  pPool->id = id;
41,693✔
36
  pPool->ptr = pPool->node.data;
41,693✔
37
  pPool->pTail = &pPool->node;
41,693✔
38
  pPool->node.prev = NULL;
41,693✔
39
  pPool->node.pnext = &pPool->pTail;
41,693✔
40
  pPool->node.size = size;
41,693✔
41

42
  if (VND_IS_RSMA(pVnode)) {
41,693✔
43
    pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock));
52✔
44
    if (!pPool->lock) {
51!
45
      taosMemoryFree(pPool);
×
46
      return terrno;
×
47
    }
48
    if (taosThreadSpinInit(pPool->lock, 0) != 0) {
51!
49
      taosMemoryFree((void *)pPool->lock);
×
50
      taosMemoryFree(pPool);
×
51
      return terrno = TAOS_SYSTEM_ERROR(errno);
×
52
    }
53
  } else {
54
    pPool->lock = NULL;
41,641✔
55
  }
56

57
  *ppPool = pPool;
41,694✔
58
  return 0;
41,694✔
59
}
60

61
static void vnodeBufPoolDestroy(SVBufPool *pPool) {
41,703✔
62
  vnodeBufPoolReset(pPool);
41,703✔
63
  if (pPool->lock) {
41,703✔
64
    (void)taosThreadSpinDestroy(pPool->lock);
51✔
65
    taosMemoryFree((void *)pPool->lock);
51✔
66
  }
67
  (void)taosThreadMutexDestroy(&pPool->mutex);
41,703✔
68
  taosMemoryFree(pPool);
41,703✔
69
}
41,703✔
70

71
int vnodeOpenBufPool(SVnode *pVnode) {
13,901✔
72
  int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
13,901✔
73

74
  for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
55,595✔
75
    // create pool
76
    int32_t code;
77
    if ((code = vnodeBufPoolCreate(pVnode, i, size, &pVnode->aBufPool[i]))) {
41,692!
78
      vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
×
79
      vnodeCloseBufPool(pVnode);
×
80
      return code;
×
81
    }
82

83
    // add to free list
84
    pVnode->aBufPool[i]->freeNext = pVnode->freeList;
41,694✔
85
    pVnode->freeList = pVnode->aBufPool[i];
41,694✔
86
  }
87

88
  vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
13,903✔
89
  return 0;
13,901✔
90
}
91

92
void vnodeCloseBufPool(SVnode *pVnode) {
13,901✔
93
  for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
55,604✔
94
    if (pVnode->aBufPool[i]) {
41,703!
95
      vnodeBufPoolDestroy(pVnode->aBufPool[i]);
41,703✔
96
      pVnode->aBufPool[i] = NULL;
41,703✔
97
    }
98
  }
99

100
  vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
13,901✔
101
}
13,901✔
102

103
void vnodeBufPoolReset(SVBufPool *pPool) {
73,544✔
104
  if (pPool->nQuery != 0) {
73,544!
105
    vError("vgId:%d, buffer pool %p of id %d has %d queries, reset it may cause problem", TD_VID(pPool->pVnode), pPool,
×
106
           pPool->id, pPool->nQuery);
107
  }
108

109
  for (SVBufPoolNode *pNode = pPool->pTail; pNode->prev; pNode = pPool->pTail) {
10,696,788✔
110
    pNode->prev->pnext = &pPool->pTail;
10,623,272✔
111
    pPool->pTail = pNode->prev;
10,623,272✔
112
    pPool->size = pPool->size - sizeof(*pNode) - pNode->size;
10,623,272✔
113
    taosMemoryFree(pNode);
10,623,272✔
114
  }
115

116
  pPool->size = 0;
73,516✔
117
  pPool->ptr = pPool->node.data;
73,516✔
118
}
73,516✔
119

120
void *vnodeBufPoolMallocAligned(SVBufPool *pPool, int size) {
818,144,103✔
121
  SVBufPoolNode *pNode;
122
  void          *p = NULL;
818,144,103✔
123
  uint8_t       *ptr = NULL;
818,144,103✔
124
  int            paddingLen = 0;
818,144,103✔
125

126
  if (pPool == NULL) {
818,144,103!
127
    terrno = TSDB_CODE_INVALID_PARA;
×
128
    return NULL;
×
129
  }
130

131
  if (pPool->lock) taosThreadSpinLock(pPool->lock);
818,144,103✔
132

133
  ptr = pPool->ptr;
818,172,641✔
134
  paddingLen = (((long)ptr + 7) & ~7) - (long)ptr;
818,172,641✔
135

136
  if (pPool->node.size >= pPool->ptr - pPool->node.data + size + paddingLen) {
818,172,641✔
137
    // allocate from the anchor node
138
    p = pPool->ptr + paddingLen;
807,548,982✔
139
    size += paddingLen;
807,548,982✔
140
    pPool->ptr = pPool->ptr + size;
807,548,982✔
141
    pPool->size += size;
807,548,982✔
142
  } else {
143
    // allocate a new node
144
    pNode = taosMemoryMalloc(sizeof(*pNode) + size);
10,623,659✔
145
    if (pNode == NULL) {
10,623,659!
146
      if (pPool->lock) {
×
147
        (void)taosThreadSpinUnlock(pPool->lock);
×
148
      }
149
      return NULL;
×
150
    }
151

152
    p = pNode->data;
10,623,659✔
153
    pNode->size = size;
10,623,659✔
154
    pNode->prev = pPool->pTail;
10,623,659✔
155
    pNode->pnext = &pPool->pTail;
10,623,659✔
156
    pPool->pTail->pnext = &pNode->prev;
10,623,659✔
157
    pPool->pTail = pNode;
10,623,659✔
158

159
    pPool->size = pPool->size + sizeof(*pNode) + size;
10,623,659✔
160
  }
161
  if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
818,172,641✔
162
  return p;
818,161,833✔
163
}
164

165
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
63,716✔
166
  SVBufPoolNode *pNode;
167
  void          *p = NULL;
63,716✔
168

169
  if (pPool == NULL) {
63,716!
170
    terrno = TSDB_CODE_INVALID_PARA;
×
171
    return NULL;
×
172
  }
173

174
  if (pPool->lock) taosThreadSpinLock(pPool->lock);
63,716✔
175
  if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
63,716!
176
    // allocate from the anchor node
177
    p = pPool->ptr;
63,716✔
178
    pPool->ptr = pPool->ptr + size;
63,716✔
179
    pPool->size += size;
63,716✔
180
  } else {
181
    // allocate a new node
182
    pNode = taosMemoryMalloc(sizeof(*pNode) + size);
×
183
    if (pNode == NULL) {
×
184
      if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
×
185
      return NULL;
×
186
    }
187

188
    p = pNode->data;
×
189
    pNode->size = size;
×
190
    pNode->prev = pPool->pTail;
×
191
    pNode->pnext = &pPool->pTail;
×
192
    pPool->pTail->pnext = &pNode->prev;
×
193
    pPool->pTail = pNode;
×
194

195
    pPool->size = pPool->size + sizeof(*pNode) + size;
×
196
  }
197
  if (pPool->lock) (void)taosThreadSpinUnlock(pPool->lock);
63,716✔
198
  return p;
63,715✔
199
}
200

201
void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
13,671✔
202
  // uint8_t       *ptr = (uint8_t *)p;
203
  // SVBufPoolNode *pNode;
204

205
  // if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) {
206
  //   pNode = &((SVBufPoolNode *)p)[-1];
207
  //   *pNode->pnext = pNode->prev;
208
  //   pNode->prev->pnext = pNode->pnext;
209

210
  //   pPool->size = pPool->size - sizeof(*pNode) - pNode->size;
211
  //   taosMemoryFree(pNode);
212
  // }
213
}
13,671✔
214

215
void vnodeBufPoolRef(SVBufPool *pPool) {
45,802✔
216
  int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1);
45,802✔
217
  if (nRef <= 0) {
45,803!
218
    vError("vgId:%d, buffer pool %p of id %d is referenced by %d", TD_VID(pPool->pVnode), pPool, pPool->id, nRef);
×
219
  }
220
}
45,803✔
221

222
void vnodeBufPoolAddToFreeList(SVBufPool *pPool) {
31,841✔
223
  SVnode *pVnode = pPool->pVnode;
31,841✔
224

225
  int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
31,841✔
226
  if (pPool->node.size != size) {
31,841!
227
    SVBufPool *pNewPool = NULL;
×
228
    if (vnodeBufPoolCreate(pVnode, pPool->id, size, &pNewPool) < 0) {
×
229
      vWarn("vgId:%d, failed to change buffer pool of id %d size from %" PRId64 " to %" PRId64 " since %s",
×
230
            TD_VID(pVnode), pPool->id, pPool->node.size, size, tstrerror(errno));
231
    } else {
232
      vInfo("vgId:%d, buffer pool of id %d size changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pPool->id,
×
233
            pPool->node.size, size);
234

235
      vnodeBufPoolDestroy(pPool);
×
236
      pPool = pNewPool;
×
237
      pVnode->aBufPool[pPool->id] = pPool;
×
238
    }
239
  }
240

241
  // add to free list
242
  vDebug("vgId:%d, buffer pool %p of id %d is added to free list", TD_VID(pVnode), pPool, pPool->id);
31,841✔
243
  vnodeBufPoolReset(pPool);
31,841✔
244
  pPool->freeNext = pVnode->freeList;
31,841✔
245
  pVnode->freeList = pPool;
31,841✔
246
  (void)taosThreadCondSignal(&pVnode->poolNotEmpty);
31,841✔
247
}
31,841✔
248

249
void vnodeBufPoolUnRef(SVBufPool *pPool, bool proactive) {
45,804✔
250
  if (pPool == NULL) return;
45,804!
251

252
  SVnode *pVnode = pPool->pVnode;
45,804✔
253

254
  if (proactive) {
45,804✔
255
    (void)taosThreadMutexLock(&pVnode->mutex);
45,801✔
256
  }
257

258
  if (atomic_sub_fetch_32(&pPool->nRef, 1) > 0) goto _exit;
45,805✔
259

260
  // remove from recycle queue or on-recycle position
261
  if (pVnode->onRecycle == pPool) {
1,597✔
262
    pVnode->onRecycle = NULL;
2✔
263
  } else {
264
    if (pPool->recyclePrev) {
1,595!
UNCOV
265
      pPool->recyclePrev->recycleNext = pPool->recycleNext;
×
266
    } else {
267
      pVnode->recycleHead = pPool->recycleNext;
1,595✔
268
    }
269

270
    if (pPool->recycleNext) {
1,595✔
271
      pPool->recycleNext->recyclePrev = pPool->recyclePrev;
10✔
272
    } else {
273
      pVnode->recycleTail = pPool->recyclePrev;
1,585✔
274
    }
275
    pPool->recyclePrev = pPool->recycleNext = NULL;
1,595✔
276
  }
277

278
  vnodeBufPoolAddToFreeList(pPool);
1,597✔
279

280
_exit:
45,804✔
281
  if (proactive) {
45,804✔
282
    (void)taosThreadMutexUnlock(&pVnode->mutex);
45,802✔
283
  }
284
  return;
45,803✔
285
}
286

287
void vnodeBufPoolRegisterQuery(SVBufPool *pPool, SQueryNode *pQNode) {
4,208,845✔
288
  (void)taosThreadMutexLock(&pPool->mutex);
4,208,845✔
289

290
  pQNode->pNext = pPool->qList.pNext;
4,213,907✔
291
  pQNode->ppNext = &pPool->qList.pNext;
4,213,907✔
292
  pPool->qList.pNext->ppNext = &pQNode->pNext;
4,213,907✔
293
  pPool->qList.pNext = pQNode;
4,213,907✔
294
  pPool->nQuery++;
4,213,907✔
295

296
  (void)taosThreadMutexUnlock(&pPool->mutex);
4,213,907✔
297
}
4,213,947✔
298

299
void vnodeBufPoolDeregisterQuery(SVBufPool *pPool, SQueryNode *pQNode, bool proactive) {
4,213,969✔
300
  int32_t code = 0;
4,213,969✔
301

302
  if (proactive) {
4,213,969!
303
    (void)taosThreadMutexLock(&pPool->mutex);
4,214,085✔
304
  }
305

306
  pQNode->pNext->ppNext = pQNode->ppNext;
4,214,083✔
307
  *pQNode->ppNext = pQNode->pNext;
4,214,083✔
308
  pPool->nQuery--;
4,214,083✔
309

310
  if (proactive) {
4,214,083✔
311
    (void)taosThreadMutexUnlock(&pPool->mutex);
4,213,845✔
312
  }
313
}
4,214,511✔
314

315
int32_t vnodeBufPoolRecycle(SVBufPool *pPool) {
2✔
316
  int32_t code = 0;
2✔
317

318
  SVnode *pVnode = pPool->pVnode;
2✔
319

320
  vDebug("vgId:%d, recycle buffer pool %p of id %d", TD_VID(pVnode), pPool, pPool->id);
2!
321

322
  (void)taosThreadMutexLock(&pPool->mutex);
2✔
323

324
  SQueryNode *pNode = pPool->qList.pNext;
2✔
325
  while (pNode != &pPool->qList) {
4✔
326
    SQueryNode *pTNode = pNode->pNext;
2✔
327

328
    int32_t rc = pNode->reseek(pNode->pQHandle);
2✔
329
    if (rc == 0 || rc == TSDB_CODE_VND_QUERY_BUSY) {
2!
330
      pNode = pTNode;
2✔
331
    } else {
332
      code = rc;
×
333
      goto _exit;
×
334
    }
335
  }
336

337
_exit:
2✔
338
  (void)taosThreadMutexUnlock(&pPool->mutex);
2✔
339
  return code;
2✔
340
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc