• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4908

30 Dec 2025 10:52AM UTC coverage: 65.386% (-0.2%) from 65.541%
#4908

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

1330 existing lines in 113 files now uncovered.

193461 of 295877 relevant lines covered (65.39%)

115765274.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.57
/source/util/src/tmempool.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "osMemPool.h"
18
#include "tmempoolInt.h"
19
#include "tlog.h"
20
#include "tutil.h"
21
#include "taos.h"
22
#include "tglobal.h"
23

24
static TdThreadOnce  gMPoolInit = PTHREAD_ONCE_INIT;
25
void* gMemPoolHandle = NULL;
26
threadlocal void* threadPoolSession = NULL;
27
threadlocal bool  threadPoolEnabled = true;
28

29
SMemPoolMgmt gMPMgmt = {0};
30
SMPStrategyFp gMPFps[] = {
31
  {NULL}, 
32
  {NULL,        mpDirectFullAlloc, mpDirectFullFree, mpDirectGetMemSize, mpDirectFullRealloc, NULL,               NULL,             mpDirectTrim},
33
  //{mpChunkInit, mpChunkAlloc,  mpChunkFree,  mpChunkGetMemSize,  mpChunkRealloc,  mpChunkInitSession, mpChunkUpdateCfg, NULL}
34
};
35

36

37
/*
 * Validate a memory pool configuration.
 *
 * Checks, in order: chunkSize within [MEMPOOL_MIN_CHUNK_SIZE, MEMPOOL_MAX_CHUNK_SIZE],
 * evicPolicy strictly between 0 and E_EVICT_MAX_VALUE, and threadNum positive.
 * The first failing check is logged and TSDB_CODE_INVALID_PARA is returned;
 * otherwise TSDB_CODE_SUCCESS.
 */
int32_t mpCheckCfg(SMemPoolCfg* cfg) {
  bool chunkSizeOk = (cfg->chunkSize >= MEMPOOL_MIN_CHUNK_SIZE) && (cfg->chunkSize <= MEMPOOL_MAX_CHUNK_SIZE);
  if (!chunkSizeOk) {
    uError("invalid memory pool chunkSize:%d", cfg->chunkSize);
    return TSDB_CODE_INVALID_PARA;
  }

  bool evicPolicyOk = (cfg->evicPolicy > 0) && (cfg->evicPolicy < E_EVICT_MAX_VALUE);
  if (!evicPolicyOk) {
    uError("invalid memory pool evicPolicy:%d", cfg->evicPolicy);
    return TSDB_CODE_INVALID_PARA;
  }

  if (!(cfg->threadNum > 0)) {
    uError("invalid memory pool threadNum:%d", cfg->threadNum);
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
55

56

57
/*
 * Release one cache group: its node array first, then the group itself.
 * A NULL group is ignored.
 */
void mpFreeCacheGroup(SMPCacheGroup* pGrp) {
  if (NULL != pGrp) {
    taosMemoryFree(pGrp->pNodes);
    taosMemoryFree(pGrp);
  }
}
65

66

67
/*
 * Allocate a new cache group of pInfo->groupNum nodes and publish it as the
 * head of pInfo's group list.
 *
 * pPool  - owning pool (not used by this function).
 * pInfo  - cache bookkeeping; pGrpHead and allocNum are updated.
 * pHead  - the head the caller observed. When non-NULL the new group is
 *          installed with a compare-and-swap against it; if another thread
 *          replaced the head first, the new group is discarded and success is
 *          returned so the caller can simply retry.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when an allocation fails.
 * On failure nothing allocated here is left behind: the group structure is
 * freed and, if it had already been stored as the very first head, the head
 * is reset to NULL so no group with a NULL node array stays reachable.
 */
int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead) {
  SMPCacheGroup* pGrp = NULL;
  bool           firstGroup = (NULL == atomic_load_ptr(&pInfo->pGrpHead));

  if (firstGroup) {
    pInfo->pGrpHead = taosMemoryCalloc(1, sizeof(*pInfo->pGrpHead));
    if (NULL == pInfo->pGrpHead) {
      uError("malloc pGrpHead failed, error:%s", tstrerror(terrno));
      MP_ERR_RET(terrno);
    }

    pGrp = pInfo->pGrpHead;
  } else {
    pGrp = (SMPCacheGroup*)taosMemoryCalloc(1, sizeof(SMPCacheGroup));
    if (NULL == pGrp) {
      uError("malloc SMPCacheGroup failed, error:%s", tstrerror(terrno));
      MP_ERR_RET(terrno);
    }
    pGrp->pNext = pHead;
  }

  pGrp->nodesNum = pInfo->groupNum;
  pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize);
  if (NULL == pGrp->pNodes) {
    uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize);

    // Undo the partial construction instead of leaking the group or leaving
    // a head whose node array is NULL.
    if (firstGroup) {
      pInfo->pGrpHead = NULL;
    }
    taosMemoryFree(pGrp);

    MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (pHead && atomic_val_compare_exchange_ptr(&pInfo->pGrpHead, pHead, pGrp) != pHead) {
    // Lost the race: someone else already installed a new head.
    mpFreeCacheGroup(pGrp);
    return TSDB_CODE_SUCCESS;
  }

  (void)atomic_add_fetch_64(&pInfo->allocNum, pGrp->nodesNum);

  return TSDB_CODE_SUCCESS;
}
102

103
/*
 * Free every cache group reachable from pInfo->pGrpHead.
 *
 * After the groups are released the list head and the idle list are reset to
 * NULL. mpPopIdleNode hands out nodes that live inside the groups' pNodes
 * arrays, so any node still linked on the idle list would otherwise point
 * into freed memory, and a second destroy call would free the groups twice.
 */
void mpDestroyCacheGroup(SMPCacheGroupInfo* pInfo) {
  SMPCacheGroup* pGrp = pInfo->pGrpHead;
  SMPCacheGroup* pNext = NULL;

  while (NULL != pGrp) {
    // Grab the successor before the current group is released.
    pNext = pGrp->pNext;

    mpFreeCacheGroup(pGrp);

    pGrp = pNext;
  }

  pInfo->pGrpHead = NULL;
  pInfo->pIdleList = NULL;
}
562,340✔
114

115

116
/*
 * Obtain one free node from the cache.
 *
 * Step 1: try to detach the head of pInfo->pIdleList with a compare-and-swap,
 *         retrying while other threads win the swap.
 * Step 2: if the idle list is empty, carve the next unused slot out of the
 *         current head group; when that group is exhausted, add a new group
 *         and try again.
 *
 * On success *ppRes receives the node and TSDB_CODE_SUCCESS is returned.
 * If adding a group fails, the error is returned and *ppRes is left untouched.
 *
 * NOTE(review): step 1 reads pIdle->pNext before the swap; whether a
 * concurrent pop/push of the same node can slip in between is not guarded
 * against here - confirm against the callers' usage.
 */
int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes) {
  SMPListNode* pPicked = NULL;

  for (;;) {
    SMPListNode* pIdle = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList);
    if (NULL == pIdle) {
      break;
    }

    if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pIdle, pIdle->pNext) == pIdle) {
      pIdle->pNext = NULL;
      pPicked = pIdle;
      break;
    }
  }

  while (NULL == pPicked) {
    SMPCacheGroup* pHead = atomic_load_ptr(&pInfo->pGrpHead);
    int32_t        slot = atomic_fetch_add_32(&pHead->idleOffset, 1);
    if (slot < pHead->nodesNum) {
      pPicked = (SMPListNode*)((char*)pHead->pNodes + slot * pInfo->nodeSize);
      break;
    }

    // Group is full: give the reserved slot back and grow the cache.
    (void)atomic_sub_fetch_32(&pHead->idleOffset, 1);

    MP_ERR_RET(mpAddCacheGroup(pPool, pInfo, pHead));
  }

  *ppRes = pPicked;

  return TSDB_CODE_SUCCESS;
}
153

154
/*
 * Return a node to the idle list.
 *
 * The node is linked in front of the currently observed list head and the
 * head is swapped with a compare-and-swap; the attempt is repeated until the
 * swap succeeds. pPool is not used.
 */
void mpPushIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPListNode* pNode) {
  SMPListNode* pTop = NULL;

  do {
    pTop = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList);
    pNode->pNext = pTop;
  } while (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pTop, pNode) != pTop);
}
302,348,091✔
169

170

171
/*
 * Let the active strategy react to a configuration change, then log the
 * resulting settings.
 *
 * The strategy's updateCfgFp callback is optional; when present, its error
 * (if any) is returned. Otherwise returns TSDB_CODE_SUCCESS.
 */
int32_t mpUpdateCfg(SMemPool* pPool) {
  SMPStrategyFp* pFps = &gMPFps[gMPMgmt.strategy];

  if (NULL != pFps->updateCfgFp) {
    MP_ERR_RET((*pFps->updateCfgFp)(pPool));
  }

  uDebug("memPool %s cfg updated, reserveSize:%" PRId64 ", jobQuota:%dMB, threadNum:%d", 
      pPool->name, pPool->cfg.reserveSize, *pPool->cfg.jobQuota, pPool->cfg.threadNum);

  return TSDB_CODE_SUCCESS;
}
181

182
/*
 * Hash callback for the file-id table: the key already is a 32-bit hash, so
 * it is returned as-is.
 *
 * fileId - pointer to the key bytes; no alignment is required.
 * len    - key length in bytes. At most sizeof(uint32_t) bytes are read; a
 *          shorter key is zero-extended instead of being read out of bounds.
 *
 * The bytes are copied one by one in memory order, so for a 4-byte key the
 * result equals the uint32_t stored at that address, without dereferencing a
 * possibly misaligned uint32_t pointer.
 */
uint32_t mpFileIdHashFp(const char* fileId, uint32_t len) {
  uint32_t       id = 0;
  unsigned char* pDst = (unsigned char*)&id;
  uint32_t       copyLen = (len < (uint32_t)sizeof(id)) ? len : (uint32_t)sizeof(id);

  for (uint32_t i = 0; i < copyLen; ++i) {
    pDst[i] = (unsigned char)fileId[i];
  }

  return id;
}
185

186
/*
 * Tear down the four position-statistics hash tables (file, remain, alloc,
 * free - in that order) and clear each pointer once its table is released.
 */
void mpDestroyPosStat(SMPStatPos* pStat) {
  SHashObj** tables[] = {&pStat->fileHash, &pStat->remainHash, &pStat->allocHash, &pStat->freeHash};

  for (int32_t i = 0; i < (int32_t)(sizeof(tables) / sizeof(tables[0])); ++i) {
    taosHashCleanup(*tables[i]);
    *tables[i] = NULL;
  }
}
×
196

197
/*
 * Create the hash tables used for per-position allocation statistics.
 *
 * Tables are created in the order remain, alloc, free, file. The first three
 * use the default binary hash function; the file table uses mpFileIdHashFp.
 * sessionStat is only used in log messages.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be
 * created (tables created before the failure are left in place).
 */
int32_t mpInitPosStat(SMPStatPos* pStat, bool sessionStat) {
  const int32_t initCapacity = 1024;

  pStat->remainHash = taosHashInit(initCapacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (!pStat->remainHash) {
    uError("memPool init posStat remainHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
    return terrno;
  }

  pStat->allocHash = taosHashInit(initCapacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (!pStat->allocHash) {
    uError("memPool init posStat allocHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
    return terrno;
  }

  pStat->freeHash = taosHashInit(initCapacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (!pStat->freeHash) {
    uError("memPool init posStat freeHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
    return terrno;
  }

  pStat->fileHash = taosHashInit(initCapacity, mpFileIdHashFp, false, HASH_ENTRY_LOCK);
  if (!pStat->fileHash) {
    uError("memPool init posStat fileHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
    return terrno;
  }

  uDebug("memPool stat initialized, sessionStat:%d", sessionStat);

  return TSDB_CODE_SUCCESS;
}
226

227
/*
 * Initialise a memory pool object.
 *
 * Steps: copy the configuration, duplicate the pool name, apply the
 * configuration through mpUpdateCfg, create the first session cache group,
 * run the strategy's optional initFp, and - when tsMemPoolFullFunc is set -
 * create the pool-level position statistics.
 *
 * Configuration validation through mpCheckCfg is currently disabled.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
  TAOS_MEMCPY(&pPool->cfg, cfg, sizeof(*cfg));

  pPool->name = taosStrdup(poolName);
  if (!pPool->name) {
    uError("calloc memory pool name %s failed", poolName);
    MP_ERR_RET(terrno);
  }

  MP_ERR_RET(mpUpdateCfg(pPool));

  // Session objects are handed out from a batch-allocated cache.
  pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE;
  pPool->sessionCache.nodeSize = sizeof(SMPSession);

  MP_ERR_RET(mpAddCacheGroup(pPool, &pPool->sessionCache, NULL));

  SMPStrategyFp* pFps = &gMPFps[gMPMgmt.strategy];
  if (NULL != pFps->initFp) {
    MP_ERR_RET((*pFps->initFp)(pPool, poolName, cfg));
  }

  if (tsMemPoolFullFunc) {
    MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false));
  }

  return TSDB_CODE_SUCCESS;
}
255

256
/*
 * Raise *pMaxAllocMemSize to newSize if newSize is larger.
 *
 * The stored maximum is only ever replaced through a compare-and-swap against
 * the value last observed; when the swap loses, the value is re-read and the
 * comparison repeated. Nothing is written when newSize is not larger.
 */
FORCE_INLINE void mpUpdateMaxAllocSize(int64_t* pMaxAllocMemSize, int64_t newSize) {
  int64_t observed = atomic_load_64(pMaxAllocMemSize);

  while (newSize > observed) {
    if (atomic_val_compare_exchange_64(pMaxAllocMemSize, observed, newSize) == observed) {
      return;
    }

    observed = atomic_load_64(pMaxAllocMemSize);
  }
}
×
270

271
/*
 * Account for an allocation in the pool/job/session counters.
 *
 * size    - added to the session's allocMemSize (session only).
 * addSize - when non-zero, added to the job's (if a session is given) and the
 *           pool's allocMemSize.
 *
 * With a session, the session and job high-water marks are refreshed
 * afterwards; the pool high-water mark is refreshed only when the
 * MP_CTRL_FLAG_LOG_MAXSIZE control flag is set.
 */
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize) {
  const bool hasSession = (NULL != pSession);

  if (0 != addSize) {
    if (hasSession) {
      (void)atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize);
    }
    (void)atomic_add_fetch_64(&pPool->allocMemSize, addSize);
  }

  if (hasSession) {
    int64_t sessionTotal = atomic_add_fetch_64(&pSession->allocMemSize, size);
    mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, sessionTotal);

    int64_t jobTotal = atomic_load_64(&pSession->pJob->job.allocMemSize);
    mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, jobTotal);
  }

  if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOG_MAXSIZE)) {
    int64_t poolTotal = atomic_load_64(&pPool->allocMemSize);
    mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, poolTotal);
  }
}
×
292

293
/*
 * Request a retire pass from the management thread.
 *
 * retireLowLevel selects which pending flag (lowLevelRetire or
 * midLevelRetire) is claimed. Only the caller that flips the flag from 0 to 1
 * stores the pool pointer in the message queue and posts the thread
 * semaphore; if the flag was already set nothing else happens.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from tsem2_post.
 */
int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
  bool claimed = retireLowLevel
                     ? (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.lowLevelRetire, 0, 1))
                     : (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.midLevelRetire, 0, 1));

  if (claimed) {
    atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool);
    MP_ERR_RET(tsem2_post(&gMPMgmt.threadSem));
  }

  return TSDB_CODE_SUCCESS;
}
310

311

312
/*
 * Reserve `size` bytes against the job quota and the system reserve.
 *
 * Without a session the bytes are simply added to the pool counter.
 * With a session the bytes are first added to the job counter, then two
 * limits are checked:
 *   1. the per-job quota (*cfg.jobQuota, in MB; ignored when <= 0);
 *   2. tsCurrentAvailMemorySize must exceed cfg.reserveSize + size.
 * When a limit is hit, the reachFp callback is invoked, a trim is scheduled,
 * the job counter is rolled back and the corresponding error is returned.
 * Otherwise the bytes are added to the pool counter as well.
 *
 * A pool-wide upper-limit check (cfg.upperLimitSize) existed here in
 * commented-out form and is not performed.
 */
int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) {
  int32_t code = TSDB_CODE_SUCCESS;
  SMPJob* pJob = NULL;

  if (NULL == pSession) {
    (void)atomic_add_fetch_64(&pPool->allocMemSize, size);
    return code;
  }

  pJob = pSession->pJob;
  int64_t jobAllocSize = atomic_add_fetch_64(&pJob->job.allocMemSize, size);
  int64_t quota = atomic_load_32(pPool->cfg.jobQuota);
  if (quota > 0 && jobAllocSize > (quota * 1048576L)) {
    code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD;
    uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, jobAllocSize, quota);
    goto _reject;
  }

  if (atomic_load_64(&tsCurrentAvailMemorySize) <= (pPool->cfg.reserveSize + size)) {
    code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
    uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", 
        pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize);
    goto _reject;
  }

  (void)atomic_add_fetch_64(&pPool->allocMemSize, size);

  return TSDB_CODE_SUCCESS;

_reject:

  // Notify, ask for a trim, then undo the job-level reservation made above.
  pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
  mpSchedTrim(NULL);
  (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
  MP_RET(code);
}
357

358
/*
 * Ask the active strategy for the size of the block at ptr and return
 * whatever its getSizeFp callback reports.
 */
int64_t mpGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
  SMPStrategyFp* pFps = &gMPFps[gMPMgmt.strategy];

  return (*pFps->getSizeFp)(pPool, pSession, ptr);
}
361

362
/*
 * Allocate through the active strategy's allocFp callback.
 * All arguments are forwarded unchanged and the callback's result code is
 * returned via MP_RET.
 */
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) {
  SMPStrategyFp* pFps = &gMPFps[gMPMgmt.strategy];

  MP_RET((*pFps->allocFp)(pPool, pSession, size, alignment, ppRes));
}
365

366
/*
 * Allocate *size bytes through mpMalloc (alignment 0) and zero them.
 *
 * ppRes always receives the outcome: the zeroed block on success, NULL when
 * the allocation fails. The failure path jumps to the common exit instead of
 * returning directly, so the caller's pointer is never left unset.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by mpMalloc.
 */
int32_t mpCalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, void** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  void *res = NULL;

  MP_ERR_JRET(mpMalloc(pPool, pSession, size, 0, &res));

  if (NULL != res) {
    TAOS_MEMSET(res, 0, *size);
  }

_return:

  *ppRes = res;

  return code;
}
382

383

384
/*
 * Release ptr through the active strategy's freeFp callback.
 *
 * A NULL ptr is not forwarded; in that case *origSize (when origSize is
 * provided) is set to 0.
 */
void mpFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
  if (NULL != ptr) {
    (*gMPFps[gMPMgmt.strategy].freeFp)(pPool, pSession, ptr, origSize);
    return;
  }

  if (NULL != origSize) {
    *origSize = 0;
  }
}
395

396
/*
 * Resize the block referenced by *pPtr.
 *
 *   - *pPtr == NULL : behaves like mpMalloc; *origSize is set to 0.
 *   - *size == 0    : behaves like mpFree; *pPtr is set to NULL and success
 *                     is returned.
 *   - otherwise     : *origSize is filled with the current block size and the
 *                     strategy's reallocFp callback does the work.
 */
int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
  const bool nothingAllocatedYet = (NULL == *pPtr);
  if (nothingAllocatedYet) {
    *origSize = 0;
    MP_RET(mpMalloc(pPool, pSession, size, 0, pPtr));
  }

  const bool releaseRequested = (0 == *size);
  if (releaseRequested) {
    mpFree(pPool, pSession, *pPtr, origSize);
    *pPtr = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *origSize = mpGetMemorySizeImpl(pPool, pSession, *pPtr);

  MP_RET((*gMPFps[gMPMgmt.strategy].reallocFp)(pPool, pSession, pPtr, size, origSize));
}
414

415
/*
 * Forward a trim request to the active strategy.
 * Strategies without a trimFp callback make this a no-op that returns
 * TSDB_CODE_SUCCESS.
 */
int32_t mpTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed) {
  if (NULL == gMPFps[gMPMgmt.strategy].trimFp) {
    return TSDB_CODE_SUCCESS;
  }

  MP_RET((*gMPFps[gMPMgmt.strategy].trimFp)(pPool, pSession, size, trimed));
}
424

425

426
/*
 * Log the detailed call/byte counters of one statistics object.
 *
 * Nothing is printed unless MP_CTRL_FLAG_PRINT_STAT is set in
 * pCtrl->funcFlags. Output is a header, the maximum used size, a "[times]:"
 * section and a "[bytes]:" section; which counters appear in each section
 * depends on gMPMgmt.strategy (direct or chunk; any other value prints only
 * the section titles).
 */
void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) {
  if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
    return;
  }

  const bool directMode = (E_MP_STRATEGY_DIRECT == gMPMgmt.strategy);
  const bool chunkMode = (E_MP_STRATEGY_CHUNK == gMPMgmt.strategy);

  uInfo("MemPool [%s] stat detail:", detailName);

  uInfo("Max Used Memory Size: %" PRId64, maxAllocSize);

  uInfo("[times]:");
  if (directMode) {
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->times.memMalloc));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->times.memCalloc));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Realloc", pDetail->times.memRealloc));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->times.memStrdup));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->times.memStrndup));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->times.memFree));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->times.memTrim));
  } else if (chunkMode) {
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->times.chunkMalloc));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->times.chunkRecycle));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->times.chunkReUse));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->times.chunkFree));
  }

  uInfo("[bytes]:");
  if (directMode) {
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->bytes.memMalloc));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->bytes.memCalloc));
    // Realloc additionally reports the original sizes.
    uInfo(MP_STAT_ORIG_FORMAT, MP_STAT_ORIG_VALUE("Realloc", pDetail->bytes.memRealloc));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->bytes.memStrdup));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->bytes.memStrndup));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->bytes.memFree));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->bytes.memTrim));
  } else if (chunkMode) {
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->bytes.chunkMalloc));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->bytes.chunkRecycle));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->bytes.chunkReUse));
    uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->bytes.chunkFree));
  }
}
477

478
/*
 * Add one outstanding allocation to the per-position summary in pHash.
 *
 * The entry keyed by pFileLine->fl is looked up; if it does not exist yet a
 * zeroed entry is inserted and the lookup is repeated (a duplicate-key result
 * from the insert is treated the same way). The entry's byte total grows by
 * pFileLine->size and its allocation count by one.
 *
 * Returns TSDB_CODE_SUCCESS, or the insert error after logging it.
 */
int32_t mpAddToRemainAllocHash(SHashObj* pHash, SMPFileLine* pFileLine) {
  SMPAllocStat  emptyStat = {0};
  SMPAllocStat* pEntry = NULL;

  for (;;) {
    pEntry = (SMPAllocStat*)taosHashGet(pHash, &pFileLine->fl, sizeof(pFileLine->fl));
    if (NULL != pEntry) {
      break;
    }

    int32_t code = taosHashPut(pHash, &pFileLine->fl, sizeof(pFileLine->fl), &emptyStat, sizeof(emptyStat));
    if (TSDB_CODE_SUCCESS != code && TSDB_CODE_DUP_KEY != code) {
      uError("taosHashPut to remain alloc hash failed, error:%s", tstrerror(code));
      return code;
    }
  }

  (void)atomic_add_fetch_64(&pEntry->allocBytes, pFileLine->size);
  (void)atomic_add_fetch_64(&pEntry->allocTimes, 1);

  return TSDB_CODE_SUCCESS;
}
505

506
/*
 * Log, per source position, how much memory is still outstanding.
 *
 * Every entry of pStat->remainHash (one per live pointer) is folded into a
 * temporary hash keyed by file/line through mpAddToRemainAllocHash. Each
 * summary entry is then printed together with the file name looked up in
 * pStat->fileHash. The byte and count figures come from the temporary
 * summary, i.e. they describe what is still outstanding at that position.
 *
 * Whenever an iteration is abandoned early, the iterator is cancelled before
 * leaving so the hash table is not left with a pending iteration.
 */
void mpPrintPosRemainStat(SMPStatPos* pStat) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t remainNum = taosHashGetSize(pStat->remainHash);
  if (remainNum <= 0) {
    uInfo("no alloc remaining memory");
    return;
  }

  SHashObj* pAllocHash = taosHashInit(remainNum / 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pAllocHash) {
    uError("taosHashInit pAllocHash failed, error:%s, remainNum:%d", tstrerror(terrno), remainNum);
    return;
  }

  void* pIter = taosHashIterate(pStat->remainHash, NULL);
  while (pIter) {
    code = mpAddToRemainAllocHash(pAllocHash, (SMPFileLine*)pIter);
    if (TSDB_CODE_SUCCESS != code) {
      taosHashCancelIterate(pStat->remainHash, pIter);
      MP_ERR_JRET(code);
    }

    pIter = taosHashIterate(pStat->remainHash, pIter);
  }

  pIter = taosHashIterate(pAllocHash, NULL);
  while (pIter) {
    SMPAllocStat*  pRemain = (SMPAllocStat*)pIter;
    SMPFileLineId* pId = (SMPFileLineId*)taosHashGetKey(pIter, NULL);
    char*          pFileName = (char*)taosHashGet(pStat->fileHash, &pId->fileId, sizeof(pId->fileId));
    if (NULL == pFileName) {
      uError("fail to get fileId %u in fileHash", pId->fileId);
      taosHashCancelIterate(pAllocHash, pIter);
      goto _return;
    }

    uInfo("REMAINING: %" PRId64 " bytes alloced by %s:%d in %" PRId64 " times", pRemain->allocBytes, pFileName, pId->line, pRemain->allocTimes);

    pIter = taosHashIterate(pAllocHash, pIter);
  }

_return:

  taosHashCleanup(pAllocHash);
}
551

552
/* Placeholder: per-position allocation statistics are not printed yet. */
void mpPrintPosAllocStat(SMPStatPos* pStat) {
  (void)pStat;
}
×
555

556
/* Placeholder: per-position free statistics are not printed yet. */
void mpPrintPosFreeStat(SMPStatPos* pStat) {
  (void)pStat;
}
×
559

560
/*
 * Log the position statistics of one pool or session: a header, the error
 * counter, then the remain/alloc/free reports in that order.
 * Does nothing unless MP_CTRL_FLAG_PRINT_STAT is set in pCtrl->funcFlags.
 */
void mpPrintPosStat(SMPCtrlInfo* pCtrl, SMPStatPos* pStat, char* detailName) {
  if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
    uInfo("MemPool [%s] Pos Stat:", detailName);
    uInfo("error times: %" PRId64, pStat->logErrTimes);

    mpPrintPosRemainStat(pStat);
    mpPrintPosAllocStat(pStat);
    mpPrintPosFreeStat(pStat);
  }
}
574

575
/* Placeholder: node statistics are not printed yet (TODO). */
void mpPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) {
  (void)pCtrl;
  (void)pHash;
  (void)detailName;
}
×
578

579
/*
 * Log the session lifecycle counters (init succeeded / init failed /
 * destroyed). Does nothing unless MP_CTRL_FLAG_PRINT_STAT is set in
 * pCtrl->funcFlags.
 */
void mpPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* detailName) {
  if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
    uInfo("MemPool [%s] session stat:", detailName);
    uInfo("init session succeed num: %" PRId64, pSessStat->initSucc);
    uInfo("init session failed num: %" PRId64, pSessStat->initFail);
    uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum);
  }
}
589

590

591

592
/*
 * Local helpers for mpLogDetailStat: when `flag` is set in pInput->procFlags,
 * bump the call counter by one and the byte counter by pInput->size for the
 * given member/outcome pair.
 */
#define MP_LDS_BUMP(flag, member, outcome)                                     \
  do {                                                                         \
    if (MP_GET_FLAG(pInput->procFlags, flag)) {                                \
      (void)atomic_add_fetch_64(&pDetail->times.member.outcome, 1);            \
      (void)atomic_add_fetch_64(&pDetail->bytes.member.outcome, pInput->size); \
    }                                                                          \
  } while (0)

/* Apply MP_LDS_BUMP for the exec, succ and fail outcomes, in that order. */
#define MP_LDS_ITEM(member)                                \
  do {                                                     \
    MP_LDS_BUMP(MP_STAT_PROC_FLAG_EXEC, member, exec);     \
    MP_LDS_BUMP(MP_STAT_PROC_FLAG_RES_SUCC, member, succ); \
    MP_LDS_BUMP(MP_STAT_PROC_FLAG_RES_FAIL, member, fail); \
  } while (0)

/*
 * Update the detailed counters in pDetail for one logged operation.
 *
 * item selects the counter family; pInput->procFlags selects which of the
 * exec / success / failure counters are touched. Malloc, calloc, free, strdup
 * and strndup count calls and bytes alike. Realloc additionally accumulates
 * pInput->origSize. Trim accumulates bytes only on success.
 *
 * The chunk items have no counters here and fall through to the default
 * branch, which logs the item as invalid.
 */
void mpLogDetailStat(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) {
  switch (item) {
    case E_MP_STAT_LOG_MEM_MALLOC:
      MP_LDS_ITEM(memMalloc);
      break;
    case E_MP_STAT_LOG_MEM_CALLOC:
      MP_LDS_ITEM(memCalloc);
      break;
    case E_MP_STAT_LOG_MEM_REALLOC: {
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
        (void)atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1);
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, pInput->size);
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.origExec, pInput->origSize);
      }
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
        (void)atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1);
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, pInput->size);
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.origSucc, pInput->origSize);
      }
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
        (void)atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1);
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, pInput->size);
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.origFail, pInput->origSize);
      }
      break;
    }
    case E_MP_STAT_LOG_MEM_FREE:
      MP_LDS_ITEM(memFree);
      break;
    case E_MP_STAT_LOG_MEM_STRDUP:
      MP_LDS_ITEM(memStrdup);
      break;
    case E_MP_STAT_LOG_MEM_STRNDUP:
      MP_LDS_ITEM(memStrndup);
      break;
    case E_MP_STAT_LOG_MEM_TRIM: {
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
        (void)atomic_add_fetch_64(&pDetail->times.memTrim.exec, 1);
      }
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
        (void)atomic_add_fetch_64(&pDetail->times.memTrim.succ, 1);
        (void)atomic_add_fetch_64(&pDetail->bytes.memTrim.succ, pInput->size);
      }
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
        (void)atomic_add_fetch_64(&pDetail->times.memTrim.fail, 1);
      }
      break;
    }
    case E_MP_STAT_LOG_CHUNK_MALLOC:
    case E_MP_STAT_LOG_CHUNK_RECYCLE:
    case E_MP_STAT_LOG_CHUNK_REUSE:
    case E_MP_STAT_LOG_CHUNK_FREE:
      // NOTE(review): no break in the original either - chunk items are
      // reported as invalid below; confirm this is intended.
    default:
      uError("Invalid stat item: %d", item);
      break;
  }
}

#undef MP_LDS_ITEM
#undef MP_LDS_BUMP
×
711

712
/*
 * Get-or-create lookup in a statistics hash.
 *
 * Looks up pKey; when absent, inserts pNew (newSize bytes) and looks again.
 * A duplicate-key result from the insert just triggers another lookup.
 *
 * On success *ppRes points at the entry stored in the hash and
 * TSDB_CODE_SUCCESS is returned. Any other insert error is returned as-is and
 * *ppRes is left untouched.
 */
int32_t mpGetAllocFreeStat(SHashObj* pHash, void* pKey, int32_t keyLen, void* pNew, int32_t newSize, void** ppRes) {
  for (;;) {
    void* pFound = taosHashGet(pHash, pKey, keyLen);
    if (NULL != pFound) {
      *ppRes = pFound;
      return TSDB_CODE_SUCCESS;
    }

    int32_t code = taosHashPut(pHash, pKey, keyLen, pNew, newSize);
    if (code && TSDB_CODE_DUP_KEY != code) {
      return code;
    }
  }
}
735

736
/*
 * Map a source file name to its 32-bit id and make sure the id -> name
 * mapping is present in pStat->fileHash.
 *
 * The id is the MurmurHash3_32 of the name (terminator excluded); the stored
 * value is the name including its terminator. An already existing entry
 * (duplicate key) counts as success. sessionStat is not used.
 *
 * On success *pId receives the id and TSDB_CODE_SUCCESS is returned; any
 * other insert error is returned and *pId is left untouched.
 */
int32_t mpGetPosStatFileId(SMPStatPos* pStat, char* fileName, uint32_t* pId, bool sessionStat) {
  size_t   nameLen = strlen(fileName);
  uint32_t fileId = MurmurHash3_32(fileName, nameLen);

  int32_t code = taosHashPut(pStat->fileHash, &fileId, sizeof(fileId), fileName, nameLen + 1);
  if (TSDB_CODE_SUCCESS != code && TSDB_CODE_DUP_KEY != code) {
    return code;
  }

  *pId = fileId;

  return TSDB_CODE_SUCCESS;
}
747

748
/*
 * Report a failed insert of pInput->pMem into pStat->remainHash.
 *
 * For a duplicate key the existing entry is looked up so the log can name the
 * file/line that originally registered the pointer. The reference taken with
 * taosHashAcquire is always handed back with taosHashRelease.
 * prefix is prepended to every message ("" or "realloc: ").
 */
static void mpLogRemainPutErr(SMPStatPos* pStat, SMPStatInput* pInput, int32_t code, bool sessionStat,
                              const char* prefix) {
  if (TSDB_CODE_DUP_KEY == code) {
    SMPFileLine* pFileLine = (SMPFileLine*)taosHashAcquire(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
    if (pFileLine) {
      bool  reported = false;
      char* pFileName = (char*)taosHashGet(pStat->fileHash, &pFileLine->fl.fileId, sizeof(pFileLine->fl.fileId));
      if (NULL == pFileName) {
        uError("%sfail to get fileId %u in fileHash", prefix, pFileLine->fl.fileId);
      } else {
        uError("%sadd pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d, origAllocAt %s:%d",
          prefix, pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat, pFileName, pFileLine->fl.line);
        reported = true;
      }

      // Drop the reference taken by taosHashAcquire before leaving.
      taosHashRelease(pStat->remainHash, pFileLine);

      if (reported) {
        return;
      }
    }
  }

  uError("%sadd pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d",
    prefix, pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
}

/*
 * Record per-call-site (file:line) allocation statistics for one successful
 * memory operation.
 *
 * Nothing is recorded unless pInput->procFlags carries
 * MP_STAT_PROC_FLAG_RES_SUCC.
 *
 *  - malloc/calloc/strdup/strndup: register pMem in remainHash and bump the
 *    alloc counters of the calling file:line.
 *  - realloc: when an original block with a positive size is given, remove it
 *    from remainHash and bump the free counters; then register the new pMem
 *    and bump the alloc counters.
 *  - free: remove pMem from remainHash (a miss is only logged at debug level)
 *    and bump the free counters.
 *  - trim and chunk items: no position statistics.
 *
 * Any failure increments pStat->logErrTimes; the function never reports an
 * error to its caller.
 */
void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput, bool sessionStat) {
  if (!MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
    return;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  switch (item) {
    case E_MP_STAT_LOG_MEM_MALLOC:
    case E_MP_STAT_LOG_MEM_CALLOC:
    case E_MP_STAT_LOG_MEM_STRDUP:
    case E_MP_STAT_LOG_MEM_STRNDUP: {
      SMPAllocStat allocStat = {0}, *pAlloc = NULL;
      SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
      code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
      if (TSDB_CODE_SUCCESS != code) {
        uError("add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d",
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
        MP_ERR_JRET(code);
      }

      code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
      if (TSDB_CODE_SUCCESS != code) {
        mpLogRemainPutErr(pStat, pInput, code, sessionStat, "");
        MP_ERR_JRET(code);
      }

      code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
      if (TSDB_CODE_SUCCESS != code) {
        uError("add pMem:%p %s:%d to allocHash failed, error:%s, sessionStat:%d",
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
        MP_ERR_JRET(code);
      }

      (void)atomic_add_fetch_64(&pAlloc->allocTimes, 1);
      (void)atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size);
      break;
    }
    case E_MP_STAT_LOG_MEM_REALLOC: {
      SMPAllocStat allocStat = {0}, *pAlloc = NULL;
      SMPFreeStat freeStat = {0}, *pFree = NULL;
      SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
      code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
      if (TSDB_CODE_SUCCESS != code) {
        uError("realloc: add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d",
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
        MP_ERR_JRET(code);
      }

      // The original block (if any) is accounted as freed at this call site.
      if (pInput->pOrigMem && pInput->origSize > 0) {
        code = taosHashRemove(pStat->remainHash, &pInput->pOrigMem, POINTER_BYTES);
        if (TSDB_CODE_SUCCESS != code) {
          uError("realloc: rm pOrigMem:%p %s:%d from remainHash failed, error:%s, sessionStat:%d",
            pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
          MP_ERR_JRET(code);
        }

        code = mpGetAllocFreeStat(pStat->freeHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
        if (TSDB_CODE_SUCCESS != code) {
          uError("realloc: add pOrigMem:%p %s:%d to freeHash failed, error:%s, sessionStat:%d",
            pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
          MP_ERR_JRET(code);
        }

        (void)atomic_add_fetch_64(&pFree->freeTimes, 1);
        (void)atomic_add_fetch_64(&pFree->freeBytes, pInput->origSize);
      }

      code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
      if (TSDB_CODE_SUCCESS != code) {
        mpLogRemainPutErr(pStat, pInput, code, sessionStat, "realloc: ");
        MP_ERR_JRET(code);
      }

      code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
      if (TSDB_CODE_SUCCESS != code) {
        uError("realloc: add pMem:%p %s:%d to allocHash failed, error:%s, sessionStat:%d",
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
        MP_ERR_JRET(code);
      }

      (void)atomic_add_fetch_64(&pAlloc->allocTimes, 1);
      (void)atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size);
      break;
    }
    case E_MP_STAT_LOG_MEM_FREE: {
      SMPFreeStat freeStat = {0}, *pFree = NULL;
      SMPFileLineId fl = {.line = pInput->line};
      code = mpGetPosStatFileId(pStat, pInput->file, &fl.fileId, sessionStat);
      if (TSDB_CODE_SUCCESS != code) {
        uError("free: add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d",
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
        MP_ERR_JRET(code);
      }

      // A miss here is tolerated: it is logged at debug level and the free is still counted.
      code = taosHashRemove(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
      if (TSDB_CODE_SUCCESS != code) {
        uDebug("free: rm pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d",
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
      }

      code = mpGetAllocFreeStat(pStat->freeHash, &fl, sizeof(fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
      if (TSDB_CODE_SUCCESS != code) {
        uError("free: add pMem:%p %s:%d to freeHash failed, error:%s, sessionStat:%d",
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
        MP_ERR_JRET(code);
      }

      (void)atomic_add_fetch_64(&pFree->freeTimes, 1);
      (void)atomic_add_fetch_64(&pFree->freeBytes, pInput->size);
      break;
    }
    case E_MP_STAT_LOG_MEM_TRIM:
      break;
    case E_MP_STAT_LOG_CHUNK_MALLOC:
    case E_MP_STAT_LOG_CHUNK_RECYCLE:
    case E_MP_STAT_LOG_CHUNK_REUSE:
    case E_MP_STAT_LOG_CHUNK_FREE: {
      break;
    }
    default:
      uError("Invalid stat item: %d", item);
      break;
  }

  return;

_return:

  (void)atomic_add_fetch_64(&pStat->logErrTimes, 1);
}
910

911

912
/*
 * Dispatch one memory-operation event to the enabled statistics sinks.
 *
 * For memory items the sinks are visited in this order: session detail,
 * pool detail, session position, pool position. Each is gated by the
 * matching MP_LOG_FLAG_* bit in the session's or the global ctrl.statFlags.
 * The position sinks are wrapped in taosSaveDisableMemPoolUsage /
 * taosRestoreEnableMemPoolUsage. Chunk items are ignored and any other item
 * is reported as invalid.
 */
void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) {
  bool enablePool = false, randErr = false;

  switch (item) {
    case E_MP_STAT_LOG_MEM_MALLOC:
    case E_MP_STAT_LOG_MEM_CALLOC:
    case E_MP_STAT_LOG_MEM_REALLOC:
    case E_MP_STAT_LOG_MEM_FREE:
    case E_MP_STAT_LOG_MEM_STRDUP:
    case E_MP_STAT_LOG_MEM_STRNDUP:
    case E_MP_STAT_LOG_MEM_TRIM:
      break;  // handled below
    case E_MP_STAT_LOG_CHUNK_MALLOC:
    case E_MP_STAT_LOG_CHUNK_RECYCLE:
    case E_MP_STAT_LOG_CHUNK_REUSE:
    case E_MP_STAT_LOG_CHUNK_FREE:
      return;
    default:
      uError("Invalid stat item: %d", item);
      return;
  }

  // Detail (counter) statistics: session first, then pool.
  if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
    mpLogDetailStat(&pSession->stat.statDetail, item, pInput);
  }
  if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
    mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
  }

  // Position (file:line) statistics: session first, then pool.
  if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
    taosSaveDisableMemPoolUsage(enablePool, randErr);
    mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
    taosRestoreEnableMemPoolUsage(enablePool, randErr);
  }
  if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
    taosSaveDisableMemPoolUsage(enablePool, randErr);
    mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
    taosRestoreEnableMemPoolUsage(enablePool, randErr);
  }
}
×
952

953
void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
×
954
  if (0 == tsMemPoolFullFunc) {
×
955
    return;
×
956
  }
957
  
958
  SMemPool* pPool = (SMemPool*)poolHandle;
×
959
  SMPSession* pSession = (SMPSession*)session;
×
960
  SMPCtrlInfo* pCtrl = NULL;
×
961
  SMPStatDetail* pDetail = NULL;
×
962

963
  if (NULL != session) {
×
964
    pCtrl = &pSession->ctrl;
×
965
    pDetail = &pSession->stat.statDetail;
×
966
    if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT)) {
×
967
      int64_t allocSize = MEMPOOL_GET_ALLOC_SIZE(pDetail);
×
968
      int64_t freeSize = MEMPOOL_GET_FREE_SIZE(pDetail);
×
969

970
      if (allocSize != freeSize) {
×
971
        uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, 
×
972
            detailName, pSession->pJob->job.jobId, allocSize, freeSize);
973

974
        taosMemPoolPrintStat(NULL, pSession, detailName);
×
975
      } else {
976
        uDebug("%s Session in JOB:0x%" PRIx64 " stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, 
×
977
            detailName, pSession->pJob->job.jobId, allocSize, freeSize);
978
      }
979
    }
980
  }
981

982
  if (NULL != poolHandle) {
×
983
    pCtrl = &gMPMgmt.ctrl;
×
984
    pDetail = &pPool->stat.statDetail;
×
985
    int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc);
×
986
    if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) {
×
987
      int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.memStrdup.succ + pDetail->bytes.memStrndup.succ;
×
988
      int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ;
×
989

990
      if (allocSize != freeSize) {
×
991
        uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
×
992

993
        taosMemPoolPrintStat(poolHandle, NULL, detailName);
×
994
      } else {
995
        uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
×
996
      }
997
    }
998
  }
999
}
1000

1001

1002
/*
 * Currently a no-op.
 *
 * The disabled code below would walk gMPMgmt.poolList under the read latch
 * and invoke each pool's cfgUpdateFp callback.
 */
void mpCheckUpateCfg(void) {
  // taosRLockLatch(&gMPMgmt.poolLock);
  // int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
  // for (int32_t i = 0; i < poolNum; ++i) {
  //   SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i);
  //   if (pPool->cfg.cb.cfgUpdateFp) {
  //     (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg);
  //   }
  // }
  // taosRUnLockLatch(&gMPMgmt.poolLock);
}
×
1015

1016
void mpUpdateSystemAvailableMemorySize() {
2,147,483,647✔
1017
  static int64_t errorTimes = 0;
1018
  int64_t sysAvailSize = 0;
2,147,483,647✔
1019
  
1020
  int32_t code = taosGetSysAvailMemory(&sysAvailSize);
2,147,483,647✔
1021
  if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1022
    errorTimes++;
×
1023
    if (0 == errorTimes % 1000) {
×
1024
      uError("get system available memory size failed, error: %s, errorTimes:%" PRId64, tstrerror(code), errorTimes);
×
1025
    }
1026
    
1027
    return;
×
1028
  }
1029

1030
  atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize);
2,147,483,647✔
1031

1032
  uTrace("system available memory size: %" PRId64, sysAvailSize);
2,147,483,647✔
1033
}
1034

1035
void mpSchedTrim(int64_t* loopTimes) {
7,995,541✔
1036
  static int64_t trimTimes = 0;
1037
  
1038
  atomic_store_8(&tsNeedTrim, 1);
7,995,541✔
1039
  if (loopTimes) {
7,995,541✔
1040
    *loopTimes = 0;
7,995,541✔
1041
  }
1042
  
1043
  uDebug("%" PRId64 "th memory trim scheduled", ++trimTimes);
7,995,541✔
1044
}
7,995,541✔
1045

1046
void* mpMgmtThreadFunc(void* param) {
582,216✔
1047
  int32_t timeout = 0;
582,216✔
1048
  int64_t retireSize = 0, loopTimes = 0;
582,216✔
1049
  SMemPool* pPool = (SMemPool*)atomic_load_ptr(&gMemPoolHandle);
582,216✔
1050
  
1051
  while (0 == atomic_load_8(&gMPMgmt.modExit)) {
2,147,483,647✔
1052
    mpUpdateSystemAvailableMemorySize();
2,147,483,647✔
1053

1054
    retireSize = pPool->cfg.reserveSize - atomic_load_64(&tsCurrentAvailMemorySize);
2,147,483,647✔
1055
    if (retireSize > 0) {
2,147,483,647✔
1056
      (*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
×
1057

1058
      mpSchedTrim(&loopTimes);
×
1059
    }
1060

1061
    if (0 == (++loopTimes) % 500) {
2,147,483,647✔
1062
      mpSchedTrim(&loopTimes);
7,995,541✔
1063
    }
1064

1065
    taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS);
2,147,483,647✔
1066
/*
1067
    timeout = tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs);
1068
    if (0 != timeout) {
1069
      mpUpdateSystemAvailableMemorySize();
1070
      continue;
1071
    }
1072

1073
    if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) {
1074
      (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
1075
    } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) {
1076
      (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
1077
    }
1078
*/    
1079
  }
1080

1081
  taosMemPoolModDestroy();
562,340✔
1082
  
1083
  return NULL;
562,340✔
1084
}
1085

1086
int32_t mpCreateMgmtThread() {
582,216✔
1087
  int32_t code = TSDB_CODE_SUCCESS;
582,216✔
1088
  TdThreadAttr thAttr;
581,809✔
1089
  MP_ERR_RET(taosThreadAttrInit(&thAttr));
582,216✔
1090
  MP_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
582,216✔
1091
  code = taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, mpMgmtThreadFunc, NULL);
582,216✔
1092
  if (code != 0) {
582,216✔
1093
    uError("failed to create memPool mgmt thread, error: 0x%x", code);
×
1094
    (void)taosThreadAttrDestroy(&thAttr);
×
1095
    MP_ERR_JRET(code);
×
1096
  }
1097

1098
_return:
582,216✔
1099

1100
  MP_ERR_RET(taosThreadAttrDestroy(&thAttr));
582,216✔
1101

1102
  return code;
582,216✔
1103
}
1104

1105
void mpModInit(void) {
582,216✔
1106
  int32_t code = TSDB_CODE_SUCCESS;
582,216✔
1107

1108
  gMPMgmt.modExit = 0;
582,216✔
1109

1110
  taosInitRWLatch(&gMPMgmt.poolLock);
582,216✔
1111
  
1112
  gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES);
582,216✔
1113
  if (NULL == gMPMgmt.poolList) {
582,216✔
1114
    MP_ERR_JRET(terrno);
×
1115
  }
1116

1117
  gMPMgmt.strategy = E_MP_STRATEGY_DIRECT;
582,216✔
1118

1119
  gMPMgmt.ctrl.statFlags = MP_STAT_FLAG_LOG_ALL & (~MP_LOG_FLAG_ALL_POS);
582,216✔
1120
  gMPMgmt.ctrl.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT | MP_CTRL_FLAG_LOG_MAXSIZE;
582,216✔
1121

1122
  //gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0);
1123
  //if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
1124
  //  uError("failed to init sem2, error: 0x%x", gMPMgmt.code);
1125
  //  return;
1126
  //}
1127

1128
  gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS;
582,216✔
1129

1130
_return:
582,216✔
1131

1132
  gMPMgmt.code = code;
582,216✔
1133
}
582,216✔
1134

1135

1136
void mpFreePool(void* p) {
562,340✔
1137
  SMemPool* pPool = *(void**)p;
562,340✔
1138
  taosMemoryFree(pPool);
562,340✔
1139
}
562,340✔
1140

1141

1142
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
×
1143
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1144
  SMPSession* pSession = (SMPSession*)session;
×
1145
  char detailName[128];
×
1146

1147
  if (NULL != pSession && MP_GET_FLAG(pSession->ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
×
1148
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session");
×
1149
    detailName[sizeof(detailName) - 1] = 0;
×
1150
    mpPrintStatDetail(&pSession->ctrl, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize);
×
1151

1152
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionPos");
×
1153
    detailName[sizeof(detailName) - 1] = 0;
×
1154
    mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName);
×
1155
  }
1156

1157
  if (NULL != pPool && MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
×
1158
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
×
1159
    detailName[sizeof(detailName) - 1] = 0;
×
1160
    mpPrintSessionStat(&gMPMgmt.ctrl, &pPool->stat.statSession, detailName);
×
1161
    mpPrintStatDetail(&gMPMgmt.ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
×
1162

1163
    //snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
1164
    //detailName[sizeof(detailName) - 1] = 0;
1165
    //mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName);
1166
    
1167
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos");
×
1168
    detailName[sizeof(detailName) - 1] = 0;
×
1169
    mpPrintPosStat(&gMPMgmt.ctrl, &pPool->stat.posStat, detailName);
×
1170
  }
1171
}
×
1172

1173

1174
/*
 * Create a memory pool named poolName configured by cfg and register it in
 * gMPMgmt.poolList.
 *
 * Module state is (re)initialized through mpModInit on every call. On
 * success *poolHandle receives the new pool and its slotId is its index in
 * the pool list. On any failure taosMemPoolClose is called on whatever was
 * created, *poolHandle is set to NULL and the error code is returned.
 */
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SMemPool* pPool = NULL;
  void*     pPushed = NULL;

  // Disabled one-time init: MP_ERR_JRET(taosThreadOnce(&gMPoolInit, mpModInit));
  mpModInit();

  if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
    uError("init memory pool failed, code: 0x%x", gMPMgmt.code);
    MP_ERR_JRET(gMPMgmt.code);
  }

  pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool));
  if (NULL == pPool) {
    uError("calloc memory pool failed, code: 0x%x", terrno);
    MP_ERR_JRET(terrno);
  }

  MP_ERR_JRET(mpInit(pPool, poolName, cfg));

  // Register the pool; its slot is the index it was appended at.
  taosWLockLatch(&gMPMgmt.poolLock);
  pPushed = taosArrayPush(gMPMgmt.poolList, &pPool);
  if (NULL != pPushed) {
    pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1;
  }
  taosWUnLockLatch(&gMPMgmt.poolLock);

  if (NULL == pPushed) {
    MP_ERR_JRET(terrno);
  }

  uInfo("mempool %s opened", poolName);

_return:

  if (TSDB_CODE_SUCCESS != code) {
    taosMemPoolClose(pPool);
    pPool = NULL;
  }

  *poolHandle = pPool;

  return code;
}
1218

1219
/*
 * Ask the pool to re-apply its configuration via mpUpdateCfg; the result is
 * ignored. pCfg is not used by this function.
 */
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
  (void)mpUpdateCfg((SMemPool*)poolHandle);
}
×
1224

1225
void taosMemPoolDestroySession(void* poolHandle, void* session) {
302,333,363✔
1226
  SMemPool* pPool = (SMemPool*)poolHandle;
302,333,363✔
1227
  SMPSession* pSession = (SMPSession*)session;
302,333,363✔
1228
  if (NULL == poolHandle || NULL == pSession) {
302,333,363✔
UNCOV
1229
    uWarn("null pointer of poolHandle %p or session %p", poolHandle, session);
×
1230
    return;
×
1231
  }
1232

1233
  if (tsMemPoolFullFunc) {
302,338,237✔
1234
    (void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
×
1235
    mpCheckStatDetail(pPool, pSession, "DestroySession");
×
1236
    mpDestroyPosStat(&pSession->stat.posStat);
×
1237
  }
1238
  
1239
  //taosMemFreeClear(pSession->sessionId);
1240

1241
  TAOS_MEMSET(pSession, 0, sizeof(*pSession));
302,331,347✔
1242

1243
  mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession);
302,331,347✔
1244
}
1245

1246
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char* sessionId) {
302,278,639✔
1247
  int32_t code = TSDB_CODE_SUCCESS;
302,278,639✔
1248
  SMemPool* pPool = (SMemPool*)poolHandle;
302,278,639✔
1249
  SMPSession* pSession = NULL;
302,278,639✔
1250

1251
  MP_ERR_JRET(mpPopIdleNode(pPool, &pPool->sessionCache, (void**)&pSession));
302,308,578✔
1252

1253
/*
1254
  pSession->sessionId = taosStrdup(sessionId);
1255
  if (NULL == pSession->sessionId) {
1256
    uError("strdup sessionId failed, error:%s", tstrerror(terrno));
1257
    MP_ERR_JRET(terrno);
1258
  }
1259
*/
1260

1261
  TAOS_MEMCPY(&pSession->ctrl, &gMPMgmt.ctrl, sizeof(pSession->ctrl));
302,332,358✔
1262

1263
  if (gMPFps[gMPMgmt.strategy].initSessionFp) {
302,310,739✔
1264
    MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession));
×
1265
  }
1266

1267
  if (tsMemPoolFullFunc) {
302,257,895✔
1268
    MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true));
×
1269
  }
1270
  
1271
  pSession->pJob = (SMPJob*)pJob;
302,257,895✔
1272

1273
_return:
302,305,925✔
1274

1275
  if (TSDB_CODE_SUCCESS != code) {
302,304,416✔
1276
    taosMemPoolDestroySession(poolHandle, pSession);
×
1277
    pSession = NULL;
×
1278
    (void)atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1);
×
1279
  } else {
1280
    (void)atomic_add_fetch_64(&pPool->stat.statSession.initSucc, 1);
302,304,416✔
1281
  }
1282

1283
  *ppSession = pSession;
302,237,745✔
1284

1285
  return code;
302,234,108✔
1286
}
1287

1288

1289
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
2,147,483,647✔
1290
  if (0 == tsMemPoolFullFunc) {
2,147,483,647✔
1291
    return mpDirectAlloc(poolHandle, ((SMPSession*)session)->pJob, size);
2,147,483,647✔
1292
  }
1293
  
1294
  int32_t code = TSDB_CODE_SUCCESS;
×
1295
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
×
1296
  
1297
  if (NULL == poolHandle || NULL == fileName || size < 0) {
×
1298
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size);
×
1299
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1300
  }
1301

1302
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1303
  SMPSession* pSession = (SMPSession*)session;
×
1304

1305
  code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
×
1306

1307
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
×
1308
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
×
1309

1310
_return:
×
1311

1312
  if (TSDB_CODE_SUCCESS != code) {
×
1313
    terrno = code;
×
1314
  }
1315
  
1316
  return input.pMem;
×
1317
}
1318

1319
void   *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
2,147,483,647✔
1320
  if (0 == tsMemPoolFullFunc) {
2,147,483,647✔
1321
    return mpDirectCalloc(poolHandle, ((SMPSession*)session)->pJob, num, size);
2,147,483,647✔
1322
  }
1323

1324
  int32_t code = TSDB_CODE_SUCCESS;
×
1325
  int64_t totalSize = num * size;
×
1326
  SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
×
1327
  
1328
  if (NULL == poolHandle || NULL == fileName || num < 0 || size < 0) {
×
1329
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64, 
×
1330
      __FUNCTION__, poolHandle, session, fileName, num, size);
1331
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1332
  }
1333

1334
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1335
  SMPSession* pSession = (SMPSession*)session;
×
1336

1337
  code = mpCalloc(pPool, pSession, &input.size, &input.pMem);
×
1338

1339
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
×
1340
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input);
×
1341

1342
_return:
×
1343

1344
  if (TSDB_CODE_SUCCESS != code) {
×
1345
    terrno = code;
×
1346
  }
1347

1348
  return input.pMem;
×
1349
}
1350

1351
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
2,147,483,647✔
1352
  if (0 == tsMemPoolFullFunc) {
2,147,483,647✔
1353
    return mpDirectRealloc(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
2,147,483,647✔
1354
  }
1355

1356
  int32_t code = TSDB_CODE_SUCCESS;
×
1357
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr};
×
1358
  
1359
  if (NULL == poolHandle || NULL == fileName || size < 0) {
×
1360
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, 
×
1361
      __FUNCTION__, poolHandle, session, fileName, size);
1362
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1363
  }
1364

1365
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1366
  SMPSession* pSession = (SMPSession*)session;
×
1367

1368
  code = mpRealloc(pPool, pSession, &input.pMem, &input.size, &input.origSize);
×
1369

1370
  if (NULL != input.pMem) {
×
1371
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
×
1372
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
×
1373
  } else if (0 == size){
×
1374
    input.pMem = input.pOrigMem;
×
1375
    input.pOrigMem = NULL;
×
1376
    input.size = input.origSize;
×
1377
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
×
1378
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
×
1379
    input.pMem = NULL;
×
1380
  } else {
1381
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_FAIL);
×
1382
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
×
1383

1384
    input.pMem = input.pOrigMem;
×
1385
    input.pOrigMem = NULL;
×
1386
    input.size = input.origSize;
×
1387
    input.procFlags = MP_STAT_PROC_FLAG_EXEC;
×
1388
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
×
1389
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
×
1390
    input.pMem = NULL;
×
1391
  }
1392

1393
_return:
×
1394

1395
  if (TSDB_CODE_SUCCESS != code) {
×
1396
    terrno = code;
×
1397
  }
1398

1399
  return input.pMem;
×
1400
}
1401

1402
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
1,598,949,950✔
1403
  if (0 == tsMemPoolFullFunc) {
1,598,949,950✔
1404
    return mpDirectStrdup(poolHandle, ((SMPSession*)session)->pJob, ptr);
1,599,253,066✔
1405
  }
1406

1407
  int32_t code = TSDB_CODE_SUCCESS;
×
1408
  int64_t size = (ptr ? strlen(ptr) : 0) + 1;
×
1409
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
×
1410
  
1411
  if (NULL == poolHandle || NULL == fileName || NULL == ptr) {
×
1412
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", 
×
1413
      __FUNCTION__, poolHandle, session, fileName, ptr);
1414
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1415
  }
1416

1417
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1418
  SMPSession* pSession = (SMPSession*)session;
×
1419

1420
  code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
×
1421
  if (NULL != input.pMem) {
×
1422
    TAOS_STRCPY(input.pMem, ptr);
×
1423
    *((char*)input.pMem + size - 1) = 0;
×
1424
  }
1425

1426
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
×
1427
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRDUP, &input);
×
1428

1429
_return:
×
1430

1431
  if (TSDB_CODE_SUCCESS != code) {
×
1432
    terrno = code;
×
1433
  }
1434

1435
  return input.pMem;
×
1436
}
1437

1438
char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) {
242,541,811✔
1439
  if (0 == tsMemPoolFullFunc) {
242,541,811✔
1440
    return mpDirectStrndup(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
242,577,947✔
1441
  }
1442

1443
  int32_t code = TSDB_CODE_SUCCESS;
×
1444
  int64_t origSize = ptr ? strlen(ptr) : 0;
×
1445
  size = TMIN(size, origSize) + 1;
×
1446
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
×
1447
  
1448
  if (NULL == poolHandle || NULL == fileName || NULL == ptr || size < 0) {
×
1449
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p, size:%" PRId64, 
×
1450
      __FUNCTION__, poolHandle, session, fileName, ptr, size);
1451
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1452
  }
1453

1454
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1455
  SMPSession* pSession = (SMPSession*)session;
×
1456

1457
  code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
×
1458
  if (NULL != input.pMem) {
×
1459
    TAOS_MEMCPY(input.pMem, ptr, size - 1);
×
1460
    *((char*)input.pMem + size - 1) = 0;
×
1461
  }
1462

1463
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
×
1464
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRNDUP, &input);
×
1465

1466
_return:
×
1467

1468
  if (TSDB_CODE_SUCCESS != code) {
×
1469
    terrno = code;
×
1470
  }
1471

1472
  return input.pMem;
×
1473
}
1474

1475

1476
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
2,147,483,647✔
1477
  if (0 == tsMemPoolFullFunc) {
2,147,483,647✔
1478
    mpDirectFree(poolHandle, ((SMPSession*)session)->pJob, ptr);
2,147,483,647✔
1479
    return;
2,147,483,647✔
1480
  }
1481

1482
  if (NULL == ptr) {
×
1483
    return;
×
1484
  }
1485
  
1486
  int32_t code = TSDB_CODE_SUCCESS;
×
1487
  if (NULL == poolHandle || NULL == fileName) {
×
1488
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p", 
×
1489
      __FUNCTION__, poolHandle, session, fileName);
1490
  }
1491

1492
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1493
  SMPSession* pSession = (SMPSession*)session;
×
1494
  SMPStatInput input = {.file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = ptr};
×
1495

1496
  mpFree(pPool, pSession, ptr, &input.size);
×
1497

1498
  MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
×
1499
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
×
1500
}
1501

1502
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
×
1503
  if (0 == tsMemPoolFullFunc) {
×
1504
    return taosMemSize(ptr);
×
1505
  }
1506

1507
  int64_t code = 0;
×
1508
  if (NULL == poolHandle || NULL == fileName) {
×
1509
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p", 
×
1510
      __FUNCTION__, poolHandle, session, fileName);
1511
    MP_ERR_RET(TSDB_CODE_INVALID_PARA);
×
1512
  }
1513

1514
  if (NULL == ptr) {
×
1515
    return 0;
×
1516
  }
1517

1518
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1519
  SMPSession* pSession = (SMPSession*)session;
×
1520
  
1521
  return mpGetMemorySizeImpl(pPool, pSession, ptr);
×
1522
}
1523

1524
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
2,147,483,647✔
1525
  if (0 == tsMemPoolFullFunc) {
2,147,483,647✔
1526
    return mpDirectAlignAlloc(poolHandle, ((SMPSession*)session)->pJob, alignment, size);
2,147,483,647✔
1527
  }
1528

UNCOV
1529
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1530
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
×
1531
  
1532
  if (NULL == poolHandle || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
×
1533
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64, 
×
1534
      __FUNCTION__, poolHandle, session, fileName, alignment, size);
1535
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1536
  }
1537

1538
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1539
  SMPSession* pSession = (SMPSession*)session;
×
1540

1541
  code = mpMalloc(pPool, pSession, &input.size, alignment, &input.pMem);
×
1542

1543
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
×
1544
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
×
1545

1546
_return:
×
1547

1548
  if (TSDB_CODE_SUCCESS != code) {
×
1549
    terrno = code;
×
1550
  }
1551

1552
  return input.pMem;
×
1553
}
1554

1555
void taosMemPoolClose(void* poolHandle) {
562,340✔
1556
  if (NULL == poolHandle) {
562,340✔
1557
    return;
×
1558
  }
1559
  
1560
  SMemPool* pPool = (SMemPool*)poolHandle;
562,340✔
1561

1562
  if (tsMemPoolFullFunc) {
562,340✔
1563
    mpCheckStatDetail(pPool, NULL, "PoolClose");
×
1564
    mpDestroyPosStat(&pPool->stat.posStat);
×
1565
  }
1566
  
1567
  taosMemoryFree(pPool->name);
562,340✔
1568
  mpDestroyCacheGroup(&pPool->sessionCache);
562,340✔
1569

1570
  atomic_store_8(&gMPMgmt.modExit, 1);
562,340✔
1571

1572
  (void)taosThreadJoin(gMPMgmt.poolMgmtThread, NULL);
562,340✔
1573
}
1574

1575

1576
void taosMemPoolModDestroy(void) {
562,340✔
1577
  gMemPoolHandle = NULL;
562,340✔
1578
  
1579
  taosArrayDestroyEx(gMPMgmt.poolList, mpFreePool);
562,340✔
1580
  gMPMgmt.poolList = NULL;
562,340✔
1581
}
562,340✔
1582

1583

1584
int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) {
×
1585
  if (0 == tsMemPoolFullFunc) {
×
1586
    return taosMemTrim(size, trimed);
×
1587
  }
1588

1589
  int32_t code = TSDB_CODE_SUCCESS;
×
1590
  
1591
  if (NULL == poolHandle || NULL == fileName || size < 0) {
×
1592
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%d", 
×
1593
      __FUNCTION__, poolHandle, session, fileName, size);
1594
    MP_ERR_RET(TSDB_CODE_INVALID_PARA);
×
1595
  }
1596

1597
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1598
  SMPSession* pSession = (SMPSession*)session;
×
1599
  SMPStatInput input = {.size = 0, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
×
1600

1601
  code = mpTrim(pPool, pSession, size, trimed);
×
1602

1603
  input.size = (trimed) ? (*trimed) : 0;
×
1604

1605
  MP_SET_FLAG(input.procFlags, ((0 == code) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
×
1606
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_TRIM, &input);
×
1607

1608
  return code;
×
1609
}
1610

1611
int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob) {
112,525,317✔
1612
  int32_t code = TSDB_CODE_SUCCESS;
112,525,317✔
1613
  *ppJob = taosMemoryCalloc(1, sizeof(SMPJob));
112,525,317✔
1614
  if (NULL == *ppJob) {
112,457,174✔
1615
    uError("calloc mp job failed, code: 0x%x", terrno);
×
1616
    MP_ERR_RET(terrno);
×
1617
  }
1618

1619
  SMPJob* pJob = (SMPJob*)*ppJob;
112,463,980✔
1620
  pJob->job.jobId = jobId;
112,487,127✔
1621
  pJob->job.clientId = cId;
112,517,914✔
1622

1623
  return code;
112,460,467✔
1624
}
1625

1626
int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock) {
×
1627
  if (NULL == poolHandle) {
×
1628
    return TSDB_CODE_INVALID_PARA;
×
1629
  }
1630

1631
  int32_t code = 0;
×
1632
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1633
  if (readLock) {
×
1634
    MP_TRY_LOCK(MP_READ, &pPool->cfgLock, code);
×
1635
  } else {
1636
    MP_TRY_LOCK(MP_WRITE, &pPool->cfgLock, code);
×
1637
  }
1638

1639
  return code;
×
1640
}
1641

1642
void taosMemPoolUnLockPool(void* poolHandle, bool readLock) {
×
1643
  if (NULL == poolHandle) {
×
1644
    return;
×
1645
  }
1646

1647
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1648
  if (readLock) {
×
1649
    MP_UNLOCK(MP_READ, &pPool->cfgLock);
×
1650
  } else {
1651
    MP_UNLOCK(MP_WRITE, &pPool->cfgLock);
×
1652
  }
1653
}
1654

1655
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) {
×
1656
  if (NULL == poolHandle) {
×
1657
    *usedSize = 0;
×
1658
    *needEnd = false;
×
1659
    return;
×
1660
  }
1661
  
1662
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1663

1664
  *needEnd = true;
×
1665
  *usedSize = atomic_load_64(&pPool->allocMemSize);
×
1666
}
1667

1668
void taosMemPoolGetUsedSizeEnd(void* poolHandle) {
×
1669
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1670
  if (NULL == pPool) {
×
1671
    return;
×
1672
  }
1673
  
1674
  MP_UNLOCK(MP_WRITE, &pPool->cfgLock);
×
1675
}
1676

1677
/*
 * Fetch statistics of a session. Each output pointer is optional, but at
 * least one of them must be supplied:
 *   ppStat       - receives the address of the session's stat detail
 *   allocSize    - receives an atomic load of the session's allocMemSize
 *   maxAllocSize - receives an atomic load of the session's maxAllocMemSize
 *
 * A NULL session, or a call with no output requested, is logged and handed
 * to MP_ERR_RET with TSDB_CODE_INVALID_PARA. Returns TSDB_CODE_SUCCESS
 * otherwise.
 */
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) {
  int32_t code = TSDB_CODE_SUCCESS;

  const bool nothingRequested = (NULL == ppStat) && (NULL == allocSize) && (NULL == maxAllocSize);
  if (NULL == session || nothingRequested) {
    uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p",
           __FUNCTION__, session, ppStat, allocSize, maxAllocSize);
    MP_ERR_RET(TSDB_CODE_INVALID_PARA);
  }

  SMPSession* pSession = (SMPSession*)session;

  if (NULL != ppStat) {
    *ppStat = &pSession->stat.statDetail;
  }

  if (NULL != allocSize) {
    *allocSize = atomic_load_64(&pSession->allocMemSize);
  }

  if (NULL != maxAllocSize) {
    *maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize);
  }

  return code;
}
1698

1699
/*
 * Public entry point that forwards to mpSchedTrim() with a NULL argument.
 */
void taosMemPoolSchedTrim(void) {
  mpSchedTrim(NULL);
}
×
1702

1703
/*
 * Create the global memory pool ("taosd") and start its management thread.
 *
 * failFp / reachFp are stored in the pool configuration as the reserve
 * callbacks.
 *
 * The pool is left disabled, and TSDB_CODE_SUCCESS is returned, when:
 *   - the build is not LINUX (the early return below is then unconditional),
 *   - tsQueryUseMemoryPool is off,
 *   - the total memory reported is <= 0,
 *   - querying available memory fails or reports less than
 *     MP_MIN_MEM_POOL_SIZE,
 *   - less than MP_MIN_FREE_SIZE_AFTER_RESERVE would remain after the reserve.
 *
 * A failure of taosGetTotalMemory(), taosMemPoolOpen() or
 * mpCreateMgmtThread() is returned to the caller as is.
 */
int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {
  int32_t code = TSDB_CODE_SUCCESS;

  // On LINUX the pool can be switched off by configuration; on every other
  // platform the guard is compiled out and the function always returns here.
#ifdef LINUX
  if (!tsQueryUseMemoryPool) {
#endif
    uInfo("memory pool disabled cause of configured disabled");
    return code;
#ifdef LINUX
  }
#endif

  // Total physical memory.
  code = taosGetTotalMemory(&tsTotalMemoryKB);
  if (TSDB_CODE_SUCCESS != code) {
    uInfo("fail to system total memory, error: %s", tstrerror(code));
    return code;
  }

  if (tsTotalMemoryKB <= 0) {
    uInfo("memory pool disabled since no enough system total memory, size: %" PRId64 "KB", tsTotalMemoryKB);
    return code;
  }

  uInfo("total memory size: %" PRId64 "KB", tsTotalMemoryKB);

  // No reserve configured: derive one from the total memory, with a floor.
  if (0 == tsMinReservedMemorySize) {
    tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100);
  }

  // Currently available memory.
  int64_t sysAvailSize = 0;
  code = taosGetSysAvailMemory(&sysAvailSize);
  if (code || sysAvailSize < MP_MIN_MEM_POOL_SIZE) {
    uInfo("memory pool disabled since no enough system available memory, size: %" PRId64, sysAvailSize);
    return TSDB_CODE_SUCCESS;
  }

  // tsMinReservedMemorySize is in MB (see the final log line); convert to bytes.
  int64_t freeSizeAfterRes = sysAvailSize - tsMinReservedMemorySize * 1048576UL;
  if (freeSizeAfterRes < MP_MIN_FREE_SIZE_AFTER_RESERVE) {
    uInfo("memory pool disabled since no enough system available memory after reservied, size: %" PRId64, freeSizeAfterRes);
    return code;
  }

  atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize);

  // Pool configuration.
  SMemPoolCfg cfg = {0};
  cfg.reserveSize = tsMinReservedMemorySize * 1048576UL;
  cfg.evicPolicy = E_EVICT_AUTO; //TODO
  cfg.chunkSize = 1048576;
  cfg.jobQuota = &tsSingleQueryMaxMemorySize;
  cfg.cb.failFp = failFp;
  cfg.cb.reachFp = reachFp;

  code = taosMemPoolOpen("taosd", &cfg, &gMemPoolHandle);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = mpCreateMgmtThread();
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  uInfo("memory pool initialized, reservedSize:%dMB, freeAfterReserved:%" PRId64 "MB, jobQuota:%dMB", tsMinReservedMemorySize, freeSizeAfterRes/1048576UL, tsSingleQueryMaxMemorySize);

  return code;
}
1772

1773

1774
void taosAutoMemoryFree(void *ptr) {
2,147,483,647✔
1775
  if (NULL != gMemPoolHandle && threadPoolEnabled && threadPoolSession) {
2,147,483,647✔
1776
    taosMemPoolFree(gMemPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__);
155,148,170✔
1777
  } else {
1778
    taosMemFree(ptr);
2,147,483,647✔
1779
  }
1780
}
2,147,483,647✔
1781

1782
/*
 * Change the pool's reserved size to `newReservedSizeMB` megabytes.
 *
 * The update is refused with TSDB_CODE_QRY_MEMORY_POOL_MEMORY_NOT_ENOUGH when
 * the available-memory query fails, when available memory is below
 * MP_MIN_MEM_POOL_SIZE, or when less than MP_MIN_FREE_SIZE_AFTER_RESERVE
 * would remain once the new reserve is subtracted. On success
 * pPool->cfg.reserveSize is rewritten under the write side of cfgLock.
 *
 * The caller is expected to pass a non-negative newReservedSizeMB.
 */
static int32_t mpUpdateReservedSize(SMemPool* pPool, int32_t newReservedSizeMB) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t sysAvailSize = 0;
  code = taosGetSysAvailMemory(&sysAvailSize);
  if (code || sysAvailSize < MP_MIN_MEM_POOL_SIZE) {
    uInfo("failed to update reserved size since no enough system available memory, size: %" PRId64, sysAvailSize);
    MP_ERR_RET(TSDB_CODE_QRY_MEMORY_POOL_MEMORY_NOT_ENOUGH);
  }

  // Convert MB to bytes in 64-bit arithmetic. Multiplying by an unsigned long
  // constant would wrap for reserves of 4096 MB and above on targets where
  // long is 32 bits wide.
  int64_t newReservedBytes = (int64_t)newReservedSizeMB * 1048576;

  int64_t freeSizeAfterRes = sysAvailSize - newReservedBytes;
  if (freeSizeAfterRes < MP_MIN_FREE_SIZE_AFTER_RESERVE) {
    uInfo("failed to update reserved size since no enough system available memory after reservied, size: %" PRId64,
          freeSizeAfterRes);
    MP_ERR_RET(TSDB_CODE_QRY_MEMORY_POOL_MEMORY_NOT_ENOUGH);
  }

  MP_LOCK(MP_WRITE, &pPool->cfgLock);
  pPool->cfg.reserveSize = newReservedBytes;
  MP_UNLOCK(MP_WRITE, &pPool->cfgLock);

  return code;
}
1804

1805
int32_t taosMemoryPoolCfgUpdateReservedSize(int32_t newReservedSizeMB) {
243✔
1806
  if (!threadPoolEnabled) {
243✔
1807
    return TSDB_CODE_SUCCESS;
×
1808
  }
1809
  int32_t code = TSDB_CODE_SUCCESS;
243✔
1810
  if (NULL == gMemPoolHandle) {
243✔
1811
    uError("failed to update reserved size since memory pool not initialized");
×
1812
    MP_ERR_RET(TSDB_CODE_QRY_MEMORY_POOL_NOT_INITIALIZED);
×
1813
  }
1814
  if (newReservedSizeMB < 0) {
243✔
1815
    uError("failed to update reserved size since newReservedSizeMB:%d is invalid", newReservedSizeMB);
×
1816
    MP_ERR_RET(TSDB_CODE_INVALID_PARA);
×
1817
  }
1818

1819
  return mpUpdateReservedSize(gMemPoolHandle, newReservedSizeMB);
243✔
1820
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc