• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.07
/source/util/src/tmempool.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "osMemPool.h"
18
#include "tmempoolInt.h"
19
#include "tlog.h"
20
#include "tutil.h"
21
#include "taos.h"
22
#include "tglobal.h"
23

24
static TdThreadOnce  gMPoolInit = PTHREAD_ONCE_INIT;
25
void* gMemPoolHandle = NULL;
26
threadlocal void* threadPoolSession = NULL;
27
threadlocal bool  threadPoolEnabled = true;
28

29
SMemPoolMgmt gMPMgmt = {0};
30
SMPStrategyFp gMPFps[] = {
31
  {NULL}, 
32
  {NULL,        mpDirectFullAlloc, mpDirectFullFree, mpDirectGetMemSize, mpDirectFullRealloc, NULL,               NULL,             mpDirectTrim},
33
  //{mpChunkInit, mpChunkAlloc,  mpChunkFree,  mpChunkGetMemSize,  mpChunkRealloc,  mpChunkInitSession, mpChunkUpdateCfg, NULL}
34
};
35

36

37
int32_t mpCheckCfg(SMemPoolCfg* cfg) {
×
38
  if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) {
×
39
    uError("invalid memory pool chunkSize:%d", cfg->chunkSize);
×
40
    return TSDB_CODE_INVALID_PARA;
×
41
  }
42

43
  if (cfg->evicPolicy <= 0 || cfg->evicPolicy >= E_EVICT_MAX_VALUE) {
×
44
    uError("invalid memory pool evicPolicy:%d", cfg->evicPolicy);
×
45
    return TSDB_CODE_INVALID_PARA;
×
46
  }
47

48
  if (cfg->threadNum <= 0) {
×
49
    uError("invalid memory pool threadNum:%d", cfg->threadNum);
×
50
    return TSDB_CODE_INVALID_PARA;
×
51
  }
52

53
  return TSDB_CODE_SUCCESS;
×
54
}
55

56

57
void mpFreeCacheGroup(SMPCacheGroup* pGrp) {
62✔
58
  if (NULL == pGrp) {
62!
59
    return;
×
60
  }
61

62
  taosMemoryFree(pGrp->pNodes);
62!
63
  taosMemoryFree(pGrp);
62!
64
}
65

66

67
int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead) {
133✔
68
  SMPCacheGroup* pGrp = NULL;
133✔
69
  if (NULL == pInfo->pGrpHead) {
133!
70
    pInfo->pGrpHead = taosMemoryCalloc(1, sizeof(*pInfo->pGrpHead));
133!
71
    if (NULL == pInfo->pGrpHead) {
133!
72
      uError("malloc pGrpHead failed, error:%s", tstrerror(terrno));
×
73
      MP_ERR_RET(terrno);
×
74
    }
75

76
    pGrp = pInfo->pGrpHead;
133✔
77
  } else {
78
    pGrp = (SMPCacheGroup*)taosMemoryCalloc(1, sizeof(SMPCacheGroup));
×
79
    if (NULL == pInfo->pGrpHead) {
×
80
      uError("malloc SMPCacheGroup failed, error:%s", tstrerror(terrno));
×
81
      MP_ERR_RET(terrno);
×
82
    }
83
    pGrp->pNext = pHead;
×
84
  }
85

86
  pGrp->nodesNum = pInfo->groupNum;
133✔
87
  pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize);
133!
88
  if (NULL == pGrp->pNodes) {
133!
89
    uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize);
×
90
    MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
91
  }
92

93
  if (pHead && atomic_val_compare_exchange_ptr(&pInfo->pGrpHead, pHead, pGrp) != pHead) {
133!
94
    mpFreeCacheGroup(pGrp);
×
95
    return TSDB_CODE_SUCCESS;
×
96
  }
97

98
  (void)atomic_add_fetch_64(&pInfo->allocNum, pGrp->nodesNum);
133✔
99

100
  return TSDB_CODE_SUCCESS;
133✔
101
}
102

103
void mpDestroyCacheGroup(SMPCacheGroupInfo* pInfo) {
62✔
104
  SMPCacheGroup* pGrp = pInfo->pGrpHead;
62✔
105
  SMPCacheGroup* pNext = NULL;
62✔
106
  while (NULL != pGrp) {
124✔
107
    pNext = pGrp->pNext;
62✔
108

109
    mpFreeCacheGroup(pGrp);
62✔
110

111
    pGrp = pNext;
62✔
112
  }
113
}
62✔
114

115

116
int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes) {
2,290✔
117
  SMPCacheGroup* pGrp = NULL;
2,290✔
118
  SMPListNode* pNode = NULL;
2,290✔
119
  
120
  while (true) {
121
    pNode = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList);
2,290✔
122
    if (NULL == pNode) {
2,290✔
123
      break;
138✔
124
    }
125

126
    if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pNode, pNode->pNext) != pNode) {
2,152!
127
      continue;
×
128
    }
129

130
    pNode->pNext = NULL;
2,152✔
131
    goto _return;
2,152✔
132
  }
133

134
  while (true) {
×
135
    pGrp = atomic_load_ptr(&pInfo->pGrpHead);
138✔
136
    int32_t offset = atomic_fetch_add_32(&pGrp->idleOffset, 1);
138✔
137
    if (offset < pGrp->nodesNum) {
138!
138
      pNode = (SMPListNode*)((char*)pGrp->pNodes + offset * pInfo->nodeSize);
138✔
139
      break;
138✔
140
    } else {
141
      (void)atomic_sub_fetch_32(&pGrp->idleOffset, 1);
×
142
    }
143
    
144
    MP_ERR_RET(mpAddCacheGroup(pPool, pInfo, pGrp));
×
145
  }
146

147
_return:
2,290✔
148

149
  *ppRes = pNode;
2,290✔
150

151
  return TSDB_CODE_SUCCESS;
2,290✔
152
}
153

154
void mpPushIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPListNode* pNode) {
2,289✔
155
  SMPCacheGroup* pGrp = NULL;
2,289✔
156
  SMPListNode* pOrig = NULL;
2,289✔
157
  
158
  while (true) {
159
    pOrig = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList);
2,289✔
160
    pNode->pNext = pOrig;
2,289✔
161
    
162
    if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pOrig, pNode) != pOrig) {
2,289!
163
      continue;
×
164
    }
165

166
    break;
2,289✔
167
  }
168
}
2,289✔
169

170

171
int32_t mpUpdateCfg(SMemPool* pPool) {
133✔
172
  if (gMPFps[gMPMgmt.strategy].updateCfgFp) {
133!
173
    MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool));
×
174
  }
175

176
  uDebug("memPool %s cfg updated, reserveSize:%" PRId64 ", jobQuota:%dMB, threadNum:%d", 
133✔
177
      pPool->name, pPool->cfg.reserveSize, *pPool->cfg.jobQuota, pPool->cfg.threadNum);
178

179
  return TSDB_CODE_SUCCESS;
133✔
180
}
181

182
uint32_t mpFileIdHashFp(const char* fileId, uint32_t len) {
×
183
  return *(uint32_t*)fileId;
×
184
}
185

186
void mpDestroyPosStat(SMPStatPos* pStat) {
×
187
  taosHashCleanup(pStat->fileHash);
×
188
  pStat->fileHash = NULL;
×
189
  taosHashCleanup(pStat->remainHash);
×
190
  pStat->remainHash = NULL;
×
191
  taosHashCleanup(pStat->allocHash);
×
192
  pStat->allocHash = NULL;
×
193
  taosHashCleanup(pStat->freeHash);
×
194
  pStat->freeHash = NULL;
×
195
}
×
196

197
int32_t mpInitPosStat(SMPStatPos* pStat, bool sessionStat) {
×
198
  pStat->remainHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
199
  if (NULL == pStat->remainHash) {
×
200
    uError("memPool init posStat remainHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
×
201
    return terrno;
×
202
  }
203

204
  pStat->allocHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
205
  if (NULL == pStat->allocHash) {
×
206
    uError("memPool init posStat allocHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
×
207
    return terrno;
×
208
  }
209

210
  pStat->freeHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
211
  if (NULL == pStat->freeHash) {
×
212
    uError("memPool init posStat freeHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
×
213
    return terrno;
×
214
  }
215

216
  pStat->fileHash = taosHashInit(1024, mpFileIdHashFp, false, HASH_ENTRY_LOCK);
×
217
  if (NULL == pStat->fileHash) {
×
218
    uError("memPool init posStat fileHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
×
219
    return terrno;
×
220
  }
221

222
  uDebug("memPool stat initialized, sessionStat:%d", sessionStat);
×
223

224
  return TSDB_CODE_SUCCESS;
×
225
}
226

227
int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
133✔
228
//  MP_ERR_RET(mpCheckCfg(cfg));
229
  
230
  TAOS_MEMCPY(&pPool->cfg, cfg, sizeof(*cfg));
133✔
231
  
232
  pPool->name = taosStrdup(poolName);
133!
233
  if (NULL == pPool->name) {
133!
234
    uError("calloc memory pool name %s failed", poolName);
×
235
    MP_ERR_RET(terrno);
×
236
  }
237

238
  MP_ERR_RET(mpUpdateCfg(pPool));
133!
239

240
  pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE;
133✔
241
  pPool->sessionCache.nodeSize = sizeof(SMPSession);
133✔
242

243
  MP_ERR_RET(mpAddCacheGroup(pPool, &pPool->sessionCache, NULL));
133!
244

245
  if (gMPFps[gMPMgmt.strategy].initFp) {
133!
246
    MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg));
×
247
  }
248

249
  if (tsMemPoolFullFunc) {
133!
250
    MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false));
×
251
  }
252
  
253
  return TSDB_CODE_SUCCESS;
133✔
254
}
255

256
FORCE_INLINE void mpUpdateMaxAllocSize(int64_t* pMaxAllocMemSize, int64_t newSize) {
257
  int64_t maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
18✔
258
  while (true) {
259
    if (newSize <= maxAllocMemSize) {
18!
260
      break;
1✔
261
    }
262
    
263
    if (maxAllocMemSize == atomic_val_compare_exchange_64(pMaxAllocMemSize, maxAllocMemSize, newSize)) {
17!
264
      break;
17✔
265
    }
266

267
    maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
×
268
  }
269
}
18✔
270

271
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize) {
6✔
272
  if (addSize) {
6!
273
    if (NULL != pSession) {
6!
274
      (void)atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize);
6✔
275
    }
276
    (void)atomic_add_fetch_64(&pPool->allocMemSize, addSize);
6✔
277
  }
278

279
  if (NULL != pSession) {
6!
280
    int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
6✔
281
    mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize);
6✔
282

283
    allocMemSize = atomic_load_64(&pSession->pJob->job.allocMemSize);
6✔
284
    mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize);
6✔
285
  }
286
  
287
  if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOG_MAXSIZE)) {
6!
288
    int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize);
6✔
289
    mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize);
6✔
290
  }
291
}
6✔
292

293
int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
×
294
  if (retireLowLevel) {
×
295
    if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.lowLevelRetire, 0, 1)) {
×
296
      atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool);
×
297
      MP_ERR_RET(tsem2_post(&gMPMgmt.threadSem));
×
298
    }
299
    
300
    return TSDB_CODE_SUCCESS;
×
301
  }
302

303
  if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.midLevelRetire, 0, 1)) {
×
304
    atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool);
×
305
    MP_ERR_RET(tsem2_post(&gMPMgmt.threadSem));
×
306
  }
307
  
308
  return TSDB_CODE_SUCCESS;
×
309
}
310

311

312
int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) {
6✔
313
  int32_t code = TSDB_CODE_SUCCESS;
6✔
314
  if (NULL == pSession) {
6!
315
    (void)atomic_add_fetch_64(&pPool->allocMemSize, size);
×
316
    return code;
×
317
  }
318
  
319
  SMPJob* pJob = pSession->pJob;  
6✔
320
  int64_t cAllocSize = atomic_add_fetch_64(&pJob->job.allocMemSize, size);
6✔
321
  int64_t quota = atomic_load_32(pPool->cfg.jobQuota);
6✔
322
  if (quota > 0 && cAllocSize > (quota * 1048576L)) {
6!
323
    code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD;
×
324
    uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota);
×
325
    pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
×
326
    mpSchedTrim(NULL);
×
327
    (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
×
328
    MP_RET(code);
×
329
  }
330

331
  if (atomic_load_64(&tsCurrentAvailMemorySize) <= (pPool->cfg.reserveSize + size)) {
6!
332
    code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
×
333
    uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", 
×
334
        pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize);
335
    pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
×
336
    mpSchedTrim(NULL);
×
337
    (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
×
338
    MP_RET(code);
×
339
  }
340

341
  (void)atomic_add_fetch_64(&pPool->allocMemSize, size);
6✔
342

343
/*
344
  int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
345
  if (pAllocSize >= atomic_load_32(pPool->cfg.upperLimitSize) * 1048576UL) {
346
    code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
347
    uWarn("%s pool allocSize %" PRId64 " reaches the upperLimit %" PRId64, pPool->name, pAllocSize, atomic_load_32(pPool->cfg.upperLimitSize) * 1048576UL);
348
    pPool->cfg.cb.retireJobFp(&pJob->job, code);
349
    (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
350
    (void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
351
    MP_RET(code);
352
  }
353
*/
354

355
  return TSDB_CODE_SUCCESS;
6✔
356
}
357

358
int64_t mpGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
2✔
359
  return (*gMPFps[gMPMgmt.strategy].getSizeFp)(pPool, pSession, ptr);
2✔
360
}
361

362
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) {
5✔
363
  MP_RET((*gMPFps[gMPMgmt.strategy].allocFp)(pPool, pSession, size, alignment, ppRes));
5!
364
}
365

366
int32_t mpCalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, void** ppRes) {
1✔
367
  int32_t code = TSDB_CODE_SUCCESS;
1✔
368
  void *res = NULL;
1✔
369

370
  MP_ERR_RET(mpMalloc(pPool, pSession, size, 0, &res));
1!
371

372
  if (NULL != res) {
1!
373
    TAOS_MEMSET(res, 0, *size);
1✔
374
  }
375

376
_return:
×
377

378
  *ppRes = res;
1✔
379

380
  return code;
1✔
381
}
382

383

384
void mpFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
1✔
385
  if (NULL == ptr) {
1!
386
    if (origSize) {
×
387
      *origSize = 0;
×
388
    }
389
    
390
    return;
×
391
  }
392

393
  (*gMPFps[gMPMgmt.strategy].freeFp)(pPool, pSession, ptr, origSize);
1✔
394
}
395

396
int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
1✔
397
  int32_t code = TSDB_CODE_SUCCESS;
1✔
398

399
  if (NULL == *pPtr) {
1!
400
    *origSize = 0;
×
401
    MP_RET(mpMalloc(pPool, pSession, size, 0, pPtr));
×
402
  }
403

404
  if (0 == *size) {
1!
405
    mpFree(pPool, pSession, *pPtr, origSize);
×
406
    *pPtr = NULL;
×
407
    return TSDB_CODE_SUCCESS;
×
408
  }
409

410
  *origSize = mpGetMemorySizeImpl(pPool, pSession, *pPtr);
1✔
411

412
  MP_RET((*gMPFps[gMPMgmt.strategy].reallocFp)(pPool, pSession, pPtr, size, origSize));
1!
413
}
414

415
int32_t mpTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed) {
2✔
416
  int32_t code = TSDB_CODE_SUCCESS;
2✔
417

418
  if (gMPFps[gMPMgmt.strategy].trimFp) {
2!
419
    MP_RET((*gMPFps[gMPMgmt.strategy].trimFp)(pPool, pSession, size, trimed));
2!
420
  }
421

422
  return code;
×
423
}
424

425

426
void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) {
×
427
  if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
×
428
    return;
×
429
  }
430

431
  uInfo("MemPool [%s] stat detail:", detailName);
×
432

433
  uInfo("Max Used Memory Size: %" PRId64, maxAllocSize);
×
434
  
435
  uInfo("[times]:");
×
436
  switch (gMPMgmt.strategy) {
×
437
    case E_MP_STRATEGY_DIRECT:
×
438
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->times.memMalloc));
×
439
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->times.memCalloc));
×
440
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Realloc", pDetail->times.memRealloc));
×
441
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->times.memStrdup));
×
442
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->times.memStrndup));
×
443
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->times.memFree));
×
444
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->times.memTrim));
×
445
      break;
×
446
    case E_MP_STRATEGY_CHUNK:
×
447
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->times.chunkMalloc));
×
448
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->times.chunkRecycle));
×
449
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->times.chunkReUse));
×
450
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->times.chunkFree));
×
451
      break;
×
452
    default:
×
453
      break;
×
454
  }
455
  
456
  uInfo("[bytes]:");
×
457
  switch (gMPMgmt.strategy) {
×
458
    case E_MP_STRATEGY_DIRECT:  
×
459
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->bytes.memMalloc));
×
460
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->bytes.memCalloc));
×
461
      uInfo(MP_STAT_ORIG_FORMAT, MP_STAT_ORIG_VALUE("Realloc", pDetail->bytes.memRealloc));
×
462
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->bytes.memStrdup));
×
463
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->bytes.memStrndup));
×
464
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->bytes.memFree));
×
465
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->bytes.memTrim));
×
466
      break;
×
467
  case E_MP_STRATEGY_CHUNK:
×
468
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->bytes.chunkMalloc));
×
469
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkRecycle", pDetail->bytes.chunkRecycle));
×
470
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkReUse", pDetail->bytes.chunkReUse));
×
471
      uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkFree", pDetail->bytes.chunkFree));
×
472
      break;
×
473
    default:
×
474
      break;
×
475
  }
476
}
477

478
int32_t mpAddToRemainAllocHash(SHashObj* pHash, SMPFileLine* pFileLine) {
×
479
  int32_t code = TSDB_CODE_SUCCESS;
×
480
  SMPAllocStat stat = {0}, *pStat = NULL;
×
481
  
482
  while (true) {
483
    pStat = (SMPAllocStat*)taosHashGet(pHash, &pFileLine->fl, sizeof(pFileLine->fl));
×
484
    if (NULL == pStat) {
×
485
      code = taosHashPut(pHash, &pFileLine->fl, sizeof(pFileLine->fl), &stat, sizeof(stat));
×
486
      if (TSDB_CODE_SUCCESS != code) {
×
487
        if (TSDB_CODE_DUP_KEY == code) {
×
488
          continue;
×
489
        }
490
        
491
        uError("taosHashPut to remain alloc hash failed, error:%s", tstrerror(code));
×
492
        return code;
×
493
      }
494

495
      continue;
×
496
    }
497

498
    (void)atomic_add_fetch_64(&pStat->allocBytes, pFileLine->size);
×
499
    (void)atomic_add_fetch_64(&pStat->allocTimes, 1);
×
500
    break;
×
501
  }
502

503
  return TSDB_CODE_SUCCESS;
×
504
}
505

506
void mpPrintPosRemainStat(SMPStatPos* pStat) {
×
507
  int32_t code = TSDB_CODE_SUCCESS;
×
508
  int32_t remainNum = taosHashGetSize(pStat->remainHash);
×
509
  if (remainNum <= 0) {
×
510
    uInfo("no alloc remaining memory");
×
511
    return;
×
512
  }
513
  
514
  SHashObj* pAllocHash = taosHashInit(remainNum / 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
×
515
  if (NULL == pAllocHash) {
×
516
    uError("taosHashInit pAllocHash failed, error:%s, remainNum:%d", tstrerror(terrno), remainNum);
×
517
    return;
×
518
  }
519

520
  SMPFileLine* pFileLine = NULL;
×
521
  void* pIter = taosHashIterate(pStat->remainHash, NULL);
×
522
  while (pIter) {
×
523
    pFileLine = (SMPFileLine*)pIter;
×
524

525
    MP_ERR_JRET(mpAddToRemainAllocHash(pAllocHash, pFileLine));
×
526

527
    pIter = taosHashIterate(pStat->remainHash, pIter);
×
528
  }
529

530
  SMPAllocStat* pAlloc = NULL;
×
531
  pIter = taosHashIterate(pAllocHash, NULL);
×
532
  while (pIter) {
×
533
    pAlloc = (SMPAllocStat*)pIter;
×
534
    SMPFileLineId* pId = (SMPFileLineId*)taosHashGetKey(pIter, NULL);
×
535
    SMPAllocStat* pAlloc = (SMPAllocStat*)taosHashGet(pStat->allocHash, pId, sizeof(*pId));
×
536
    char* pFileName = (char*)taosHashGet(pStat->fileHash, &pId->fileId, sizeof(pId->fileId));
×
537
    if (NULL == pAlloc || NULL == pFileName) {
×
538
      uError("fail to get pId in allocHash or fileHash, pAlloc:%p, pFileName:%p", pAlloc, pFileName);
×
539
      goto _return;
×
540
    }
541

542
    uInfo("REMAINING: %" PRId64 " bytes alloced by %s:%d in %" PRId64 " times", pAlloc->allocBytes, pFileName, pId->line, pAlloc->allocTimes);
×
543
    
544
    pIter = taosHashIterate(pAllocHash, pIter);
×
545
  }
546

547
_return:
×
548

549
  taosHashCleanup(pAllocHash);
×
550
}
551

552
void mpPrintPosAllocStat(SMPStatPos* pStat) {
×
553

554
}
×
555

556
void mpPrintPosFreeStat(SMPStatPos* pStat) {
×
557

558
}
×
559

560
void mpPrintPosStat(SMPCtrlInfo* pCtrl, SMPStatPos* pStat, char* detailName) {
×
561
  if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
×
562
    return;
×
563
  }
564

565
  uInfo("MemPool [%s] Pos Stat:", detailName);
×
566
  uInfo("error times: %" PRId64, pStat->logErrTimes);
×
567

568
  mpPrintPosRemainStat(pStat);
×
569

570
  mpPrintPosAllocStat(pStat);
×
571

572
  mpPrintPosFreeStat(pStat);
×
573
}
574

575
void mpPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) {
×
576
  //TODO
577
}
×
578

579
void mpPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* detailName) {
×
580
  if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
×
581
    return;
×
582
  }
583

584
  uInfo("MemPool [%s] session stat:", detailName);
×
585
  uInfo("init session succeed num: %" PRId64, pSessStat->initSucc);
×
586
  uInfo("init session failed num: %" PRId64, pSessStat->initFail);
×
587
  uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum);
×
588
}
589

590

591

592
void mpLogDetailStat(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) {
18✔
593
  switch (item) {
18!
594
    case E_MP_STAT_LOG_MEM_MALLOC: {
4✔
595
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
4!
596
        (void)atomic_add_fetch_64(&pDetail->times.memMalloc.exec, 1);
4✔
597
        (void)atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, pInput->size);
4✔
598
      }
599
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
4!
600
        (void)atomic_add_fetch_64(&pDetail->times.memMalloc.succ, 1);
4✔
601
        (void)atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, pInput->size);
4✔
602
      } 
603
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
4!
604
        (void)atomic_add_fetch_64(&pDetail->times.memMalloc.fail, 1);
×
605
        (void)atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, pInput->size);
×
606
      } 
607
      break;
4✔
608
    }
609
    case E_MP_STAT_LOG_MEM_CALLOC:{
2✔
610
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
2!
611
        (void)atomic_add_fetch_64(&pDetail->times.memCalloc.exec, 1);
2✔
612
        (void)atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, pInput->size);
2✔
613
      }
614
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
2!
615
        (void)atomic_add_fetch_64(&pDetail->times.memCalloc.succ, 1);
2✔
616
        (void)atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, pInput->size);
2✔
617
      } 
618
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
2!
619
        (void)atomic_add_fetch_64(&pDetail->times.memCalloc.fail, 1);
×
620
        (void)atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, pInput->size);
×
621
      } 
622
      break;
2✔
623
    }
624
    case E_MP_STAT_LOG_MEM_REALLOC:{
2✔
625
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
2!
626
        (void)atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1);
2✔
627
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, pInput->size);
2✔
628
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.origExec, pInput->origSize);
2✔
629
      }
630
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
2!
631
        (void)atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1);
2✔
632
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, pInput->size);
2✔
633
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.origSucc, pInput->origSize);
2✔
634
      } 
635
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
2!
636
        (void)atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1);
×
637
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, pInput->size);
×
638
        (void)atomic_add_fetch_64(&pDetail->bytes.memRealloc.origFail, pInput->origSize);
×
639
      } 
640
      break;
2✔
641
    }
642
    case E_MP_STAT_LOG_MEM_FREE:{
2✔
643
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
2!
644
        (void)atomic_add_fetch_64(&pDetail->times.memFree.exec, 1);
2✔
645
        (void)atomic_add_fetch_64(&pDetail->bytes.memFree.exec, pInput->size);
2✔
646
      }
647
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
2!
648
        (void)atomic_add_fetch_64(&pDetail->times.memFree.succ, 1);
2✔
649
        (void)atomic_add_fetch_64(&pDetail->bytes.memFree.succ, pInput->size);
2✔
650
      } 
651
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
2!
652
        (void)atomic_add_fetch_64(&pDetail->times.memFree.fail, 1);
×
653
        (void)atomic_add_fetch_64(&pDetail->bytes.memFree.fail, pInput->size);
×
654
      } 
655
      break;
2✔
656
    }
657
    case E_MP_STAT_LOG_MEM_STRDUP: {
2✔
658
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
2!
659
        (void)atomic_add_fetch_64(&pDetail->times.memStrdup.exec, 1);
2✔
660
        (void)atomic_add_fetch_64(&pDetail->bytes.memStrdup.exec, pInput->size);
2✔
661
      }
662
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
2!
663
        (void)atomic_add_fetch_64(&pDetail->times.memStrdup.succ, 1);
2✔
664
        (void)atomic_add_fetch_64(&pDetail->bytes.memStrdup.succ, pInput->size);
2✔
665
      } 
666
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
2!
667
        (void)atomic_add_fetch_64(&pDetail->times.memStrdup.fail, 1);
×
668
        (void)atomic_add_fetch_64(&pDetail->bytes.memStrdup.fail, pInput->size);
×
669
      } 
670
      break;
2✔
671
    }
672
    case E_MP_STAT_LOG_MEM_STRNDUP: {
2✔
673
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
2!
674
        (void)atomic_add_fetch_64(&pDetail->times.memStrndup.exec, 1);
2✔
675
        (void)atomic_add_fetch_64(&pDetail->bytes.memStrndup.exec, pInput->size);
2✔
676
      }
677
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
2!
678
        (void)atomic_add_fetch_64(&pDetail->times.memStrndup.succ, 1);
2✔
679
        (void)atomic_add_fetch_64(&pDetail->bytes.memStrndup.succ, pInput->size);
2✔
680
      } 
681
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
2!
682
        (void)atomic_add_fetch_64(&pDetail->times.memStrndup.fail, 1);
×
683
        (void)atomic_add_fetch_64(&pDetail->bytes.memStrndup.fail, pInput->size);
×
684
      } 
685
      break;
2✔
686
    }
687
    case E_MP_STAT_LOG_MEM_TRIM: {
4✔
688
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
4!
689
        (void)atomic_add_fetch_64(&pDetail->times.memTrim.exec, 1);
4✔
690
      }
691
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
4!
692
        (void)atomic_add_fetch_64(&pDetail->times.memTrim.succ, 1);
4✔
693
        (void)atomic_add_fetch_64(&pDetail->bytes.memTrim.succ, pInput->size);
4✔
694
      } 
695
      if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
4!
696
        (void)atomic_add_fetch_64(&pDetail->times.memTrim.fail, 1);
×
697
      } 
698
      break;
4✔
699
    }
700
    case E_MP_STAT_LOG_CHUNK_MALLOC:  
×
701
    case E_MP_STAT_LOG_CHUNK_RECYCLE:  
702
    case E_MP_STAT_LOG_CHUNK_REUSE:
703
    case E_MP_STAT_LOG_CHUNK_FREE: {
704

705
    }
706
    default:
707
      uError("Invalid stat item: %d", item);
×
708
      break;
×
709
  }
710
}
18✔
711

712
int32_t mpGetAllocFreeStat(SHashObj* pHash, void* pKey, int32_t keyLen, void* pNew, int32_t newSize, void** ppRes) {
×
713
  void* pStat = NULL;
×
714
  int32_t code = TSDB_CODE_SUCCESS;
×
715

716
  while (true) {
717
    pStat = taosHashGet(pHash, pKey, keyLen);
×
718
    if (NULL != pStat) {
×
719
      *ppRes = pStat;
×
720
      break;
×
721
    }
722

723
    code = taosHashPut(pHash, pKey, keyLen, pNew, newSize);
×
724
    if (code) {
×
725
      if (TSDB_CODE_DUP_KEY == code) {
×
726
        continue;
×
727
      }
728

729
      return code;
×
730
    }
731
  }
732

733
  return TSDB_CODE_SUCCESS;
×
734
}
735

736
int32_t mpGetPosStatFileId(SMPStatPos* pStat, char* fileName, uint32_t* pId, bool sessionStat) {
×
737
  uint32_t hashVal = MurmurHash3_32(fileName, strlen(fileName));
×
738
  int32_t code = taosHashPut(pStat->fileHash, &hashVal, sizeof(hashVal), fileName, strlen(fileName) + 1);
×
739
  if (code && TSDB_CODE_DUP_KEY != code) {
×
740
    return code;
×
741
  }
742

743
  *pId = hashVal;
×
744
  
745
  return TSDB_CODE_SUCCESS;
×
746
}
747

748
void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput, bool sessionStat) {
×
749
  if (!MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
×
750
    return;
×
751
  }
752

753
  int32_t code = TSDB_CODE_SUCCESS;
×
754
  
755
  switch (item) {
×
756
    case E_MP_STAT_LOG_MEM_MALLOC: 
×
757
    case E_MP_STAT_LOG_MEM_CALLOC: 
758
    case E_MP_STAT_LOG_MEM_STRDUP: 
759
    case E_MP_STAT_LOG_MEM_STRNDUP: {
760
      SMPAllocStat allocStat = {0}, *pAlloc = NULL;
×
761
      SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
×
762
      code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
×
763
      if (TSDB_CODE_SUCCESS != code) {
×
764
        uError("add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d", 
×
765
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
766
        MP_ERR_JRET(code);
×
767
      }
768
      code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
×
769
      if (TSDB_CODE_SUCCESS != code) {
×
770
        if (TSDB_CODE_DUP_KEY == code) {
×
771
          SMPFileLine* pFileLine = (SMPFileLine*)taosHashAcquire(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
×
772
          if (pFileLine) {
×
773
            char* pFileName = (char*)taosHashGet(pStat->fileHash, &pFileLine->fl.fileId, sizeof(pFileLine->fl.fileId));
×
774
            if (NULL == pFileName) {
×
775
              uError("fail to get fileId %u in fileHash", pFileLine->fl.fileId);
×
776
            } else {
777
              uError("add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d, origAllocAt %s:%d", 
×
778
                pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat, pFileName, pFileLine->fl.line);
779
              MP_ERR_JRET(code);
×
780
            }
781
          }
782
        }
783

784
        uError("add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d", 
×
785
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
786
        
787
        MP_ERR_JRET(code);
×
788
      }
789
      code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
×
790
      if (TSDB_CODE_SUCCESS != code) {
×
791
        uError("add pMem:%p %s:%d to allocHash failed, error:%s, sessionStat:%d", 
×
792
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
793
        MP_ERR_JRET(code);
×
794
      }
795
      
796
      (void)atomic_add_fetch_64(&pAlloc->allocTimes, 1);
×
797
      (void)atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size);
×
798
      break;
×
799
    }
800
    case E_MP_STAT_LOG_MEM_REALLOC: {
×
801
      SMPAllocStat allocStat = {0}, *pAlloc = NULL;
×
802
      SMPFreeStat freeStat = {0}, *pFree = NULL;
×
803
      SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
×
804
      code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
×
805
      if (TSDB_CODE_SUCCESS != code) {
×
806
        uError("realloc: add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d", 
×
807
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
808
        MP_ERR_JRET(code);
×
809
      }
810

811
      // ASSSRT((pInput->pOrigMem && pInput->origSize > 0) || (NULL == pInput->pOrigMem && pInput->origSize == 0));
812
      
813
      if (pInput->pOrigMem && pInput->origSize > 0) {
×
814
        code = taosHashRemove(pStat->remainHash, &pInput->pOrigMem, POINTER_BYTES);
×
815
        if (TSDB_CODE_SUCCESS != code) {
×
816
          uError("realloc: rm pOrigMem:%p %s:%d from remainHash failed, error:%s, sessionStat:%d", 
×
817
            pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
818
          MP_ERR_JRET(code);
×
819
        }
820
        code = mpGetAllocFreeStat(pStat->freeHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
×
821
        if (TSDB_CODE_SUCCESS != code) {
×
822
          uError("realloc: add pOrigMem:%p %s:%d to freeHash failed, error:%s, sessionStat:%d", 
×
823
            pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
824
          MP_ERR_JRET(code);
×
825
        }
826
        
827
        (void)atomic_add_fetch_64(&pFree->freeTimes, 1);
×
828
        (void)atomic_add_fetch_64(&pFree->freeBytes, pInput->origSize);
×
829
      }
830
      
831
      code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
×
832
      if (TSDB_CODE_SUCCESS != code) {
×
833
        if (TSDB_CODE_DUP_KEY == code) {
×
834
          SMPFileLine* pFileLine = (SMPFileLine*)taosHashAcquire(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
×
835
          if (pFileLine) {
×
836
            char* pFileName = (char*)taosHashGet(pStat->fileHash, &pFileLine->fl.fileId, sizeof(pFileLine->fl.fileId));
×
837
            if (NULL == pFileName) {
×
838
              uError("realloc: fail to get fileId %u in fileHash", pFileLine->fl.fileId);
×
839
            } else {
840
              uError("realloc: add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d, origAllocAt %s:%d", 
×
841
                pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat, pFileName, pFileLine->fl.line);
842
              MP_ERR_JRET(code);
×
843
            }
844
          }
845
        }
846

847
        uError("realloc: add pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d", 
×
848
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
849
        MP_ERR_JRET(code);
×
850
      }
851
      
852
      code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
×
853
      if (TSDB_CODE_SUCCESS != code) {
×
854
        uError("realloc: add pMem:%p %s:%d to allocHash failed, error:%s, sessionStat:%d", 
×
855
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
856
        MP_ERR_JRET(code);
×
857
      }
858

859
      (void)atomic_add_fetch_64(&pAlloc->allocTimes, 1);
×
860
      (void)atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size);
×
861
      break;
×
862
    }
863
    case E_MP_STAT_LOG_MEM_FREE: {
×
864
      SMPAllocStat allocStat = {0}, *pAlloc = NULL;
×
865
      SMPFreeStat freeStat = {0}, *pFree = NULL;
×
866
      SMPFileLineId fl = {.line = pInput->line};
×
867
      code = mpGetPosStatFileId(pStat, pInput->file, &fl.fileId, sessionStat);
×
868
      if (TSDB_CODE_SUCCESS != code) {
×
869
        uError("free: add pMem:%p %s:%d to fileHash failed, error:%s, sessionStat:%d", 
×
870
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
871
        MP_ERR_JRET(code);
×
872
      }
873

874
      code = taosHashRemove(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
×
875
      if (TSDB_CODE_SUCCESS != code) {
×
876
        uDebug("free: rm pMem:%p %s:%d to remainHash failed, error:%s, sessionStat:%d", 
×
877
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
878
      }
879

880
      code = mpGetAllocFreeStat(pStat->freeHash, &fl, sizeof(fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
×
881
      if (TSDB_CODE_SUCCESS != code) {
×
882
        uError("free: add pMem:%p %s:%d to freeHash failed, error:%s, sessionStat:%d", 
×
883
          pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
884
        MP_ERR_JRET(code);
×
885
      }
886
        
887
      (void)atomic_add_fetch_64(&pFree->freeTimes, 1);
×
888
      (void)atomic_add_fetch_64(&pFree->freeBytes, pInput->size);
×
889
      break;
×
890
    }
891
    case E_MP_STAT_LOG_MEM_TRIM:
×
892
      break;
×
893
    case E_MP_STAT_LOG_CHUNK_MALLOC:  
×
894
    case E_MP_STAT_LOG_CHUNK_RECYCLE:  
895
    case E_MP_STAT_LOG_CHUNK_REUSE:
896
    case E_MP_STAT_LOG_CHUNK_FREE: {
897
      break;
×
898
    }
899
    default:
×
900
      uError("Invalid stat item: %d", item);
×
901
      break;
×
902
  }
903

904
  return;
×
905

906
_return:
×
907
  
908
  (void)atomic_add_fetch_64(&pStat->logErrTimes, 1);
×
909
}
910

911

912
void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) {
9✔
913
  bool enablePool = false, randErr = false;
9✔
914
  
915
  switch (item) {
9!
916
    case E_MP_STAT_LOG_MEM_MALLOC:
9✔
917
    case E_MP_STAT_LOG_MEM_CALLOC:
918
    case E_MP_STAT_LOG_MEM_REALLOC:
919
    case E_MP_STAT_LOG_MEM_FREE:
920
    case E_MP_STAT_LOG_MEM_STRDUP: 
921
    case E_MP_STAT_LOG_MEM_STRNDUP: 
922
    case E_MP_STAT_LOG_MEM_TRIM: {
923
      if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
9!
924
        mpLogDetailStat(&pSession->stat.statDetail, item, pInput);
9✔
925
      }
926
      if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
9!
927
        mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
9✔
928
      }
929
      if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
9!
930
        taosSaveDisableMemPoolUsage(enablePool, randErr);
×
931
        mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
×
932
        taosRestoreEnableMemPoolUsage(enablePool, randErr);
×
933
      }
934
      if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
9!
935
        taosSaveDisableMemPoolUsage(enablePool, randErr);
×
936
        mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
×
937
        taosRestoreEnableMemPoolUsage(enablePool, randErr);
×
938
      }
939
      break;
9✔
940
    }
941
    case E_MP_STAT_LOG_CHUNK_MALLOC:  
×
942
    case E_MP_STAT_LOG_CHUNK_RECYCLE:  
943
    case E_MP_STAT_LOG_CHUNK_REUSE:
944
    case E_MP_STAT_LOG_CHUNK_FREE: {
945
      break;
×
946
    }
947
    default:
×
948
      uError("Invalid stat item: %d", item);
×
949
      break;
×
950
  }
951
}
9✔
952

953
void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
×
954
  if (0 == tsMemPoolFullFunc) {
×
955
    return;
×
956
  }
957
  
958
  SMemPool* pPool = (SMemPool*)poolHandle;
×
959
  SMPSession* pSession = (SMPSession*)session;
×
960
  SMPCtrlInfo* pCtrl = NULL;
×
961
  SMPStatDetail* pDetail = NULL;
×
962

963
  if (NULL != session) {
×
964
    pCtrl = &pSession->ctrl;
×
965
    pDetail = &pSession->stat.statDetail;
×
966
    if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT)) {
×
967
      int64_t allocSize = MEMPOOL_GET_ALLOC_SIZE(pDetail);
×
968
      int64_t freeSize = MEMPOOL_GET_FREE_SIZE(pDetail);
×
969

970
      if (allocSize != freeSize) {
×
971
        uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, 
×
972
            detailName, pSession->pJob->job.jobId, allocSize, freeSize);
973

974
        taosMemPoolPrintStat(NULL, pSession, detailName);
×
975
      } else {
976
        uDebug("%s Session in JOB:0x%" PRIx64 " stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, 
×
977
            detailName, pSession->pJob->job.jobId, allocSize, freeSize);
978
      }
979
    }
980
  }
981

982
  if (NULL != poolHandle) {
×
983
    pCtrl = &gMPMgmt.ctrl;
×
984
    pDetail = &pPool->stat.statDetail;
×
985
    int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc);
×
986
    if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) {
×
987
      int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.memStrdup.succ + pDetail->bytes.memStrndup.succ;
×
988
      int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ;
×
989

990
      if (allocSize != freeSize) {
×
991
        uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
×
992

993
        taosMemPoolPrintStat(poolHandle, NULL, detailName);
×
994
      } else {
995
        uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
×
996
      }
997
    }
998
  }
999
}
1000

1001

1002
void mpCheckUpateCfg(void) {
×
1003
/*
1004
  taosRLockLatch(&gMPMgmt.poolLock);
1005
  int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
1006
  for (int32_t i = 0; i < poolNum; ++i) {
1007
    SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i);
1008
    if (pPool->cfg.cb.cfgUpdateFp) {
1009
      (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg);
1010
    }
1011
  }
1012
  taosRUnLockLatch(&gMPMgmt.poolLock);
1013
*/  
1014
}
×
1015

1016
void mpUpdateSystemAvailableMemorySize() {
388,467✔
1017
  static int64_t errorTimes = 0;
1018
  int64_t sysAvailSize = 0;
388,467✔
1019
  
1020
  int32_t code = taosGetSysAvailMemory(&sysAvailSize);
388,467✔
1021
  if (TSDB_CODE_SUCCESS != code) {
388,464!
1022
    errorTimes++;
×
1023
    if (0 == errorTimes % 1000) {
×
1024
      uError("get system available memory size failed, error: %s, errorTimes:%" PRId64, tstrerror(code), errorTimes);
×
1025
    }
1026
    
1027
    return;
×
1028
  }
1029

1030
  atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize);
388,464✔
1031

1032
  uTrace("system available memory size: %" PRId64, sysAvailSize);
388,464✔
1033
}
1034

1035
void mpSchedTrim(int64_t* loopTimes) {
745✔
1036
  static int64_t trimTimes = 0;
1037
  
1038
  atomic_store_8(&tsNeedTrim, 1);
745✔
1039
  if (loopTimes) {
745!
1040
    *loopTimes = 0;
745✔
1041
  }
1042
  
1043
  uDebug("%" PRId64 "th memory trim scheduled", ++trimTimes);
745✔
1044
}
745✔
1045

1046
void* mpMgmtThreadFunc(void* param) {
132✔
1047
  int32_t timeout = 0;
132✔
1048
  int64_t retireSize = 0, loopTimes = 0;
132✔
1049
  SMemPool* pPool = (SMemPool*)atomic_load_ptr(&gMemPoolHandle);
132✔
1050
  
1051
  while (0 == atomic_load_8(&gMPMgmt.modExit)) {
388,529✔
1052
    mpUpdateSystemAvailableMemorySize();
388,467✔
1053

1054
    retireSize = pPool->cfg.reserveSize - atomic_load_64(&tsCurrentAvailMemorySize);
388,464✔
1055
    if (retireSize > 0) {
388,464!
1056
      (*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
×
1057

1058
      mpSchedTrim(&loopTimes);
×
1059
    }
1060

1061
    if (0 == (++loopTimes) % 500) {
388,464✔
1062
      mpSchedTrim(&loopTimes);
745✔
1063
    }
1064

1065
    taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS);
388,464✔
1066
/*
1067
    timeout = tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs);
1068
    if (0 != timeout) {
1069
      mpUpdateSystemAvailableMemorySize();
1070
      continue;
1071
    }
1072

1073
    if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) {
1074
      (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
1075
    } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) {
1076
      (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
1077
    }
1078
*/    
1079
  }
1080

1081
  taosMemPoolModDestroy();
62✔
1082
  
1083
  return NULL;
62✔
1084
}
1085

1086
int32_t mpCreateMgmtThread() {
133✔
1087
  int32_t code = TSDB_CODE_SUCCESS;
133✔
1088
  TdThreadAttr thAttr;
1089
  MP_ERR_RET(taosThreadAttrInit(&thAttr));
133!
1090
  MP_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
133!
1091
  code = taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, mpMgmtThreadFunc, NULL);
133✔
1092
  if (code != 0) {
133!
1093
    uError("failed to create memPool mgmt thread, error: 0x%x", code);
×
1094
    (void)taosThreadAttrDestroy(&thAttr);
×
1095
    MP_ERR_JRET(code);
×
1096
  }
1097

1098
_return:
133✔
1099

1100
  MP_ERR_RET(taosThreadAttrDestroy(&thAttr));
133!
1101

1102
  return code;
133✔
1103
}
1104

1105
void mpModInit(void) {
133✔
1106
  int32_t code = TSDB_CODE_SUCCESS;
133✔
1107

1108
  gMPMgmt.modExit = 0;
133✔
1109

1110
  taosInitRWLatch(&gMPMgmt.poolLock);
133✔
1111
  
1112
  gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES);
133✔
1113
  if (NULL == gMPMgmt.poolList) {
133!
1114
    MP_ERR_JRET(terrno);
×
1115
  }
1116

1117
  gMPMgmt.strategy = E_MP_STRATEGY_DIRECT;
133✔
1118

1119
  gMPMgmt.ctrl.statFlags = MP_STAT_FLAG_LOG_ALL & (~MP_LOG_FLAG_ALL_POS);
133✔
1120
  gMPMgmt.ctrl.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT | MP_CTRL_FLAG_LOG_MAXSIZE;
133✔
1121

1122
  //gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0);
1123
  //if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
1124
  //  uError("failed to init sem2, error: 0x%x", gMPMgmt.code);
1125
  //  return;
1126
  //}
1127

1128
  gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS;
133✔
1129

1130
_return:
133✔
1131

1132
  gMPMgmt.code = code;
133✔
1133
}
133✔
1134

1135

1136
void mpFreePool(void* p) {
62✔
1137
  SMemPool* pPool = *(void**)p;
62✔
1138
  taosMemoryFree(pPool);
62!
1139
}
62✔
1140

1141

1142
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
×
1143
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1144
  SMPSession* pSession = (SMPSession*)session;
×
1145
  char detailName[128];
1146

1147
  if (NULL != pSession && MP_GET_FLAG(pSession->ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
×
1148
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session");
×
1149
    detailName[sizeof(detailName) - 1] = 0;
×
1150
    mpPrintStatDetail(&pSession->ctrl, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize);
×
1151

1152
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionPos");
×
1153
    detailName[sizeof(detailName) - 1] = 0;
×
1154
    mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName);
×
1155
  }
1156

1157
  if (NULL != pPool && MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
×
1158
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
×
1159
    detailName[sizeof(detailName) - 1] = 0;
×
1160
    mpPrintSessionStat(&gMPMgmt.ctrl, &pPool->stat.statSession, detailName);
×
1161
    mpPrintStatDetail(&gMPMgmt.ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
×
1162

1163
    //snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
1164
    //detailName[sizeof(detailName) - 1] = 0;
1165
    //mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName);
1166
    
1167
    snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos");
×
1168
    detailName[sizeof(detailName) - 1] = 0;
×
1169
    mpPrintPosStat(&gMPMgmt.ctrl, &pPool->stat.posStat, detailName);
×
1170
  }
1171
}
×
1172

1173

1174
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
133✔
1175
  int32_t code = TSDB_CODE_SUCCESS;
133✔
1176
  SMemPool* pPool = NULL;
133✔
1177
  
1178
  //MP_ERR_JRET(taosThreadOnce(&gMPoolInit, mpModInit));
1179
  mpModInit();
133✔
1180
  
1181
  if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
133!
1182
    uError("init memory pool failed, code: 0x%x", gMPMgmt.code);
×
1183
    MP_ERR_JRET(gMPMgmt.code);
×
1184
  }
1185

1186
  pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool));
133!
1187
  if (NULL == pPool) {
133!
1188
    uError("calloc memory pool failed, code: 0x%x", terrno);
×
1189
    MP_ERR_JRET(terrno);
×
1190
  }
1191

1192
  MP_ERR_JRET(mpInit(pPool, poolName, cfg));
133!
1193

1194
  taosWLockLatch(&gMPMgmt.poolLock);
133✔
1195
  
1196
  if (NULL == taosArrayPush(gMPMgmt.poolList, &pPool)) {
266!
1197
    taosWUnLockLatch(&gMPMgmt.poolLock);
×
1198
    MP_ERR_JRET(terrno);
×
1199
  }
1200
  
1201
  pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1;
133✔
1202
  
1203
  taosWUnLockLatch(&gMPMgmt.poolLock);
133✔
1204

1205
  uInfo("mempool %s opened", poolName);
133!
1206

1207
_return:
×
1208

1209
  if (TSDB_CODE_SUCCESS != code) {
133!
1210
    taosMemPoolClose(pPool);
×
1211
    pPool = NULL;
×
1212
  }
1213

1214
  *poolHandle = pPool;
133✔
1215

1216
  return code;
133✔
1217
}
1218

1219
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
×
1220
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1221

1222
  (void)mpUpdateCfg(pPool);
×
1223
}
×
1224

1225
void taosMemPoolDestroySession(void* poolHandle, void* session) {
2,289✔
1226
  SMemPool* pPool = (SMemPool*)poolHandle;
2,289✔
1227
  SMPSession* pSession = (SMPSession*)session;
2,289✔
1228
  if (NULL == poolHandle || NULL == pSession) {
2,289!
1229
    uWarn("null pointer of poolHandle %p or session %p", poolHandle, session);
×
1230
    return;
×
1231
  }
1232

1233
  if (tsMemPoolFullFunc) {
2,289!
1234
    (void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
×
1235
    mpCheckStatDetail(pPool, pSession, "DestroySession");
×
1236
    mpDestroyPosStat(&pSession->stat.posStat);
×
1237
  }
1238
  
1239
  taosMemFreeClear(pSession->sessionId);
2,289!
1240

1241
  TAOS_MEMSET(pSession, 0, sizeof(*pSession));
2,289✔
1242

1243
  mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession);
2,289✔
1244
}
1245

1246
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char* sessionId) {
2,290✔
1247
  int32_t code = TSDB_CODE_SUCCESS;
2,290✔
1248
  SMemPool* pPool = (SMemPool*)poolHandle;
2,290✔
1249
  SMPSession* pSession = NULL;
2,290✔
1250

1251
  MP_ERR_JRET(mpPopIdleNode(pPool, &pPool->sessionCache, (void**)&pSession));
2,290!
1252

1253
  pSession->sessionId = taosStrdup(sessionId);
2,290!
1254
  if (NULL == pSession->sessionId) {
2,290!
1255
    uError("strdup sessionId failed, error:%s", tstrerror(terrno));
×
1256
    MP_ERR_JRET(terrno);
×
1257
  }
1258

1259
  TAOS_MEMCPY(&pSession->ctrl, &gMPMgmt.ctrl, sizeof(pSession->ctrl));
2,290✔
1260

1261
  if (gMPFps[gMPMgmt.strategy].initSessionFp) {
2,290!
1262
    MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession));
×
1263
  }
1264

1265
  if (tsMemPoolFullFunc) {
2,290!
1266
    MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true));
×
1267
  }
1268
  
1269
  pSession->pJob = (SMPJob*)pJob;
2,290✔
1270

1271
_return:
2,290✔
1272

1273
  if (TSDB_CODE_SUCCESS != code) {
2,290!
1274
    taosMemPoolDestroySession(poolHandle, pSession);
×
1275
    pSession = NULL;
×
1276
    (void)atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1);
×
1277
  } else {
1278
    (void)atomic_add_fetch_64(&pPool->stat.statSession.initSucc, 1);
2,290✔
1279
  }
1280

1281
  *ppSession = pSession;
2,290✔
1282

1283
  return code;
2,290✔
1284
}
1285

1286

1287
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
154,793✔
1288
  if (0 == tsMemPoolFullFunc) {
154,793!
1289
    return mpDirectAlloc(poolHandle, ((SMPSession*)session)->pJob, size);
154,805✔
1290
  }
1291
  
1292
  int32_t code = TSDB_CODE_SUCCESS;
1293
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
1294
  
1295
  if (NULL == poolHandle || NULL == fileName || size < 0) {
3!
1296
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size);
×
1297
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1298
  }
1299

1300
  SMemPool* pPool = (SMemPool*)poolHandle;
1301
  SMPSession* pSession = (SMPSession*)session;
1302

1303
  code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
1304

1305
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
1!
1306
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
1✔
1307

1308
_return:
1✔
1309

1310
  if (TSDB_CODE_SUCCESS != code) {
1!
1311
    terrno = code;
×
1312
  }
1313
  
1314
  return input.pMem;
1✔
1315
}
1316

1317
void   *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
18,777,417✔
1318
  if (0 == tsMemPoolFullFunc) {
18,777,417!
1319
    return mpDirectCalloc(poolHandle, ((SMPSession*)session)->pJob, num, size);
18,778,909✔
1320
  }
1321

1322
  int32_t code = TSDB_CODE_SUCCESS;
1323
  int64_t totalSize = num * size;
1324
  SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
1325
  
1326
  if (NULL == poolHandle || NULL == fileName || num < 0 || size < 0) {
4!
1327
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64, 
×
1328
      __FUNCTION__, poolHandle, session, fileName, num, size);
1329
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1330
  }
1331

1332
  SMemPool* pPool = (SMemPool*)poolHandle;
1333
  SMPSession* pSession = (SMPSession*)session;
1334

1335
  code = mpCalloc(pPool, pSession, &input.size, &input.pMem);
1336

1337
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
1!
1338
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input);
1✔
1339

1340
_return:
1✔
1341

1342
  if (TSDB_CODE_SUCCESS != code) {
1!
1343
    terrno = code;
×
1344
  }
1345

1346
  return input.pMem;
1✔
1347
}
1348

1349
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
1,830,990✔
1350
  if (0 == tsMemPoolFullFunc) {
1,830,990✔
1351
    return mpDirectRealloc(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
1,830,989✔
1352
  }
1353

1354
  int32_t code = TSDB_CODE_SUCCESS;
1✔
1355
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr};
1✔
1356
  
1357
  if (NULL == poolHandle || NULL == fileName || size < 0) {
1!
1358
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, 
×
1359
      __FUNCTION__, poolHandle, session, fileName, size);
1360
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1361
  }
1362

1363
  SMemPool* pPool = (SMemPool*)poolHandle;
1✔
1364
  SMPSession* pSession = (SMPSession*)session;
1✔
1365

1366
  code = mpRealloc(pPool, pSession, &input.pMem, &input.size, &input.origSize);
1✔
1367

1368
  if (NULL != input.pMem) {
1!
1369
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
1✔
1370
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
1✔
1371
  } else if (0 == size){
×
1372
    input.pMem = input.pOrigMem;
×
1373
    input.pOrigMem = NULL;
×
1374
    input.size = input.origSize;
×
1375
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
×
1376
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
×
1377
    input.pMem = NULL;
×
1378
  } else {
1379
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_FAIL);
×
1380
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
×
1381

1382
    input.pMem = input.pOrigMem;
×
1383
    input.pOrigMem = NULL;
×
1384
    input.size = input.origSize;
×
1385
    input.procFlags = MP_STAT_PROC_FLAG_EXEC;
×
1386
    MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
×
1387
    mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
×
1388
    input.pMem = NULL;
×
1389
  }
1390

1391
_return:
1✔
1392

1393
  if (TSDB_CODE_SUCCESS != code) {
1!
1394
    terrno = code;
×
1395
  }
1396

1397
  return input.pMem;
1✔
1398
}
1399

1400
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
9,276✔
1401
  if (0 == tsMemPoolFullFunc) {
9,276✔
1402
    return mpDirectStrdup(poolHandle, ((SMPSession*)session)->pJob, ptr);
9,275✔
1403
  }
1404

1405
  int32_t code = TSDB_CODE_SUCCESS;
1✔
1406
  int64_t size = (ptr ? strlen(ptr) : 0) + 1;
1!
1407
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
1✔
1408
  
1409
  if (NULL == poolHandle || NULL == fileName || NULL == ptr) {
1!
1410
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", 
×
1411
      __FUNCTION__, poolHandle, session, fileName, ptr);
1412
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1413
  }
1414

1415
  SMemPool* pPool = (SMemPool*)poolHandle;
1✔
1416
  SMPSession* pSession = (SMPSession*)session;
1✔
1417

1418
  code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
1✔
1419
  if (NULL != input.pMem) {
1!
1420
    TAOS_STRCPY(input.pMem, ptr);
1✔
1421
    *((char*)input.pMem + size - 1) = 0;
1✔
1422
  }
1423

1424
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
1!
1425
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRDUP, &input);
1✔
1426

1427
_return:
1✔
1428

1429
  if (TSDB_CODE_SUCCESS != code) {
1!
1430
    terrno = code;
×
1431
  }
1432

1433
  return input.pMem;
1✔
1434
}
1435

1436
char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) {
1,177✔
1437
  if (0 == tsMemPoolFullFunc) {
1,177✔
1438
    return mpDirectStrndup(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
1,176✔
1439
  }
1440

1441
  int32_t code = TSDB_CODE_SUCCESS;
1✔
1442
  int64_t origSize = ptr ? strlen(ptr) : 0;
1!
1443
  size = TMIN(size, origSize) + 1;
1✔
1444
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
1✔
1445
  
1446
  if (NULL == poolHandle || NULL == fileName || NULL == ptr || size < 0) {
1!
1447
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p, size:%" PRId64, 
×
1448
      __FUNCTION__, poolHandle, session, fileName, ptr, size);
1449
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1450
  }
1451

1452
  SMemPool* pPool = (SMemPool*)poolHandle;
1✔
1453
  SMPSession* pSession = (SMPSession*)session;
1✔
1454

1455
  code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
1✔
1456
  if (NULL != input.pMem) {
1!
1457
    TAOS_MEMCPY(input.pMem, ptr, size - 1);
1✔
1458
    *((char*)input.pMem + size - 1) = 0;
1✔
1459
  }
1460

1461
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
1!
1462
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRNDUP, &input);
1✔
1463

1464
_return:
1✔
1465

1466
  if (TSDB_CODE_SUCCESS != code) {
1!
1467
    terrno = code;
×
1468
  }
1469

1470
  return input.pMem;
1✔
1471
}
1472

1473

1474
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
20,636,515✔
1475
  if (0 == tsMemPoolFullFunc) {
20,636,515!
1476
    mpDirectFree(poolHandle, ((SMPSession*)session)->pJob, ptr);
20,640,359✔
1477
    return;
20,646,579✔
1478
  }
1479

1480
  if (NULL == ptr) {
×
1481
    return;
×
1482
  }
1483
  
1484
  int32_t code = TSDB_CODE_SUCCESS;
1485
  if (NULL == poolHandle || NULL == fileName) {
2!
1486
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p", 
×
1487
      __FUNCTION__, poolHandle, session, fileName);
1488
  }
1489

1490
  SMemPool* pPool = (SMemPool*)poolHandle;
1491
  SMPSession* pSession = (SMPSession*)session;
1492
  SMPStatInput input = {.file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = ptr};
1493

1494
  mpFree(pPool, pSession, ptr, &input.size);
1495

1496
  MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
1✔
1497
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
1✔
1498
}
1499

1500
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
2✔
1501
  if (0 == tsMemPoolFullFunc) {
2✔
1502
    return taosMemSize(ptr);
1✔
1503
  }
1504

1505
  int64_t code = 0;
1✔
1506
  if (NULL == poolHandle || NULL == fileName) {
1!
1507
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p", 
×
1508
      __FUNCTION__, poolHandle, session, fileName);
1509
    MP_ERR_RET(TSDB_CODE_INVALID_PARA);
×
1510
  }
1511

1512
  if (NULL == ptr) {
1!
1513
    return 0;
×
1514
  }
1515

1516
  SMemPool* pPool = (SMemPool*)poolHandle;
1✔
1517
  SMPSession* pSession = (SMPSession*)session;
1✔
1518
  
1519
  return mpGetMemorySizeImpl(pPool, pSession, ptr);
1✔
1520
}
1521

1522
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
38,181✔
1523
  if (0 == tsMemPoolFullFunc) {
38,181!
1524
    return mpDirectAlignAlloc(poolHandle, ((SMPSession*)session)->pJob, alignment, size);
38,181✔
1525
  }
1526

1527
  int32_t code = TSDB_CODE_SUCCESS;
×
1528
  SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
×
1529
  
1530
  if (NULL == poolHandle || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
×
1531
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64, 
×
1532
      __FUNCTION__, poolHandle, session, fileName, alignment, size);
1533
    MP_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1534
  }
1535

1536
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1537
  SMPSession* pSession = (SMPSession*)session;
×
1538

1539
  code = mpMalloc(pPool, pSession, &input.size, alignment, &input.pMem);
×
1540

1541
  MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
1!
1542
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
1✔
1543

1544
_return:
1✔
1545

1546
  if (TSDB_CODE_SUCCESS != code) {
1!
1547
    terrno = code;
×
1548
  }
1549

1550
  return input.pMem;
1✔
1551
}
1552

1553
void taosMemPoolClose(void* poolHandle) {
70✔
1554
  if (NULL == poolHandle) {
70✔
1555
    return;
8✔
1556
  }
1557
  
1558
  SMemPool* pPool = (SMemPool*)poolHandle;
62✔
1559

1560
  if (tsMemPoolFullFunc) {
62!
1561
    mpCheckStatDetail(pPool, NULL, "PoolClose");
×
1562
    mpDestroyPosStat(&pPool->stat.posStat);
×
1563
  }
1564
  
1565
  taosMemoryFree(pPool->name);
62!
1566
  mpDestroyCacheGroup(&pPool->sessionCache);
62✔
1567

1568
  atomic_store_8(&gMPMgmt.modExit, 1);
62✔
1569

1570
  (void)taosThreadJoin(gMPMgmt.poolMgmtThread, NULL);
62✔
1571
}
1572

1573

1574
void taosMemPoolModDestroy(void) {
62✔
1575
  gMemPoolHandle = NULL;
62✔
1576
  
1577
  taosArrayDestroyEx(gMPMgmt.poolList, mpFreePool);
62✔
1578
  gMPMgmt.poolList = NULL;
62✔
1579
}
62✔
1580

1581

1582
int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) {
4✔
1583
  if (0 == tsMemPoolFullFunc) {
4✔
1584
    return taosMemTrim(size, trimed);
2✔
1585
  }
1586

1587
  int32_t code = TSDB_CODE_SUCCESS;
2✔
1588
  
1589
  if (NULL == poolHandle || NULL == fileName || size < 0) {
2!
1590
    uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%d", 
×
1591
      __FUNCTION__, poolHandle, session, fileName, size);
1592
    MP_ERR_RET(TSDB_CODE_INVALID_PARA);
×
1593
  }
1594

1595
  SMemPool* pPool = (SMemPool*)poolHandle;
2✔
1596
  SMPSession* pSession = (SMPSession*)session;
2✔
1597
  SMPStatInput input = {.size = 0, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
2✔
1598

1599
  code = mpTrim(pPool, pSession, size, trimed);
2✔
1600

1601
  input.size = (trimed) ? (*trimed) : 0;
2✔
1602

1603
  MP_SET_FLAG(input.procFlags, ((0 == code) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
2!
1604
  mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_TRIM, &input);
2✔
1605

1606
  return code;
2✔
1607
}
1608

1609
int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob) {
1,239✔
1610
  int32_t code = TSDB_CODE_SUCCESS;
1,239✔
1611
  *ppJob = taosMemoryCalloc(1, sizeof(SMPJob));
1,239!
1612
  if (NULL == *ppJob) {
1,239!
1613
    uError("calloc mp job failed, code: 0x%x", terrno);
×
1614
    MP_ERR_RET(terrno);
×
1615
  }
1616

1617
  SMPJob* pJob = (SMPJob*)*ppJob;
1,239✔
1618
  pJob->job.jobId = jobId;
1,239✔
1619
  pJob->job.clientId = cId;
1,239✔
1620

1621
  return code;
1,239✔
1622
}
1623

1624
int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock) {
×
1625
  if (NULL == poolHandle) {
×
1626
    return TSDB_CODE_INVALID_PARA;
×
1627
  }
1628

1629
  int32_t code = 0;
×
1630
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1631
  if (readLock) {
×
1632
    MP_TRY_LOCK(MP_READ, &pPool->cfgLock, code);
×
1633
  } else {
1634
    MP_TRY_LOCK(MP_WRITE, &pPool->cfgLock, code);
×
1635
  }
1636

1637
  return code;
×
1638
}
1639

1640
void taosMemPoolUnLockPool(void* poolHandle, bool readLock) {
×
1641
  if (NULL == poolHandle) {
×
1642
    return;
×
1643
  }
1644

1645
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1646
  if (readLock) {
×
1647
    MP_UNLOCK(MP_READ, &pPool->cfgLock);
×
1648
  } else {
1649
    MP_UNLOCK(MP_WRITE, &pPool->cfgLock);
×
1650
  }
1651
}
1652

1653
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) {
×
1654
  if (NULL == poolHandle) {
×
1655
    *usedSize = 0;
×
1656
    *needEnd = false;
×
1657
    return;
×
1658
  }
1659
  
1660
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1661

1662
  *needEnd = true;
×
1663
  *usedSize = atomic_load_64(&pPool->allocMemSize);
×
1664
}
1665

1666
void taosMemPoolGetUsedSizeEnd(void* poolHandle) {
×
1667
  SMemPool* pPool = (SMemPool*)poolHandle;
×
1668
  if (NULL == pPool) {
×
1669
    return;
×
1670
  }
1671
  
1672
  MP_UNLOCK(MP_WRITE, &pPool->cfgLock);
×
1673
}
1674

1675
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) {
×
1676
  int32_t code = TSDB_CODE_SUCCESS;
×
1677
  if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) {
×
1678
    uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize);
×
1679
    MP_ERR_RET(TSDB_CODE_INVALID_PARA);
×
1680
  }
1681

1682
  SMPSession* pSession = (SMPSession*)session;
×
1683

1684
  if (ppStat) {
×
1685
    *ppStat = &pSession->stat.statDetail;
×
1686
  }
1687
  if (allocSize) {
×
1688
    *allocSize = atomic_load_64(&pSession->allocMemSize);
×
1689
  }
1690
  if (maxAllocSize) {
×
1691
    *maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize);
×
1692
  }
1693
  
1694
  return code;
×
1695
}
1696

1697
void taosMemPoolSchedTrim(void) {
×
1698
  mpSchedTrim(NULL);
×
1699
}
×
1700

1701
int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {
133✔
1702
  int32_t code = TSDB_CODE_SUCCESS;
133✔
1703
  
1704
#ifdef LINUX  
1705
  if (!tsQueryUseMemoryPool) {
133!
1706
#endif  
1707
    uInfo("memory pool disabled cause of configured disabled");
×
1708
    return code;
×
1709
#ifdef LINUX  
1710
  }
1711
#endif
1712

1713
  code = taosGetTotalMemory(&tsTotalMemoryKB);
133✔
1714
  if (TSDB_CODE_SUCCESS != code) {
133!
1715
    uInfo("fail to system total memory, error: %s", tstrerror(code));
×
1716
    return code;
×
1717
  }
1718

1719
  if (tsTotalMemoryKB <= 0) {
133!
1720
    uInfo("memory pool disabled since no enough system total memory, size: %" PRId64 "KB", tsTotalMemoryKB);
×
1721
    return code;
×
1722
  }
1723

1724
  uInfo("total memory size: %" PRId64 "KB", tsTotalMemoryKB);
133!
1725

1726
  if (0 == tsMinReservedMemorySize) {
133!
1727
    tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100);
133✔
1728
  }
1729
  
1730
  SMemPoolCfg cfg = {0};
133✔
1731
  int64_t sysAvailSize = 0;
133✔
1732
  
1733
  code = taosGetSysAvailMemory(&sysAvailSize);
133✔
1734
  if (code || sysAvailSize < MP_MIN_MEM_POOL_SIZE) {
133!
1735
    uInfo("memory pool disabled since no enough system available memory, size: %" PRId64, sysAvailSize);
×
1736
    code = TSDB_CODE_SUCCESS;
×
1737
    return code;
×
1738
  }
1739

1740
  cfg.reserveSize = tsMinReservedMemorySize * 1048576UL;
133✔
1741

1742
  int64_t freeSizeAfterRes = sysAvailSize - tsMinReservedMemorySize * 1048576UL;
133✔
1743
  if (freeSizeAfterRes < MP_MIN_FREE_SIZE_AFTER_RESERVE) {
133!
1744
    uInfo("memory pool disabled since no enough system available memory after reservied, size: %" PRId64, freeSizeAfterRes);
×
1745
    return code;
×
1746
  }
1747

1748
  atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize);
133✔
1749

1750
  cfg.evicPolicy = E_EVICT_AUTO; //TODO
133✔
1751
  cfg.chunkSize = 1048576;
133✔
1752
  cfg.jobQuota = &tsSingleQueryMaxMemorySize;
133✔
1753
  cfg.cb.failFp = failFp;
133✔
1754
  cfg.cb.reachFp  = reachFp;
133✔
1755
  
1756
  code = taosMemPoolOpen("taosd", &cfg, &gMemPoolHandle);
133✔
1757
  if (TSDB_CODE_SUCCESS != code) {
133!
1758
    return code;
×
1759
  }  
1760

1761
  code = mpCreateMgmtThread();
133✔
1762
  if (TSDB_CODE_SUCCESS != code) {
133!
1763
    return code;
×
1764
  }  
1765

1766
  uInfo("memory pool initialized, reservedSize:%dMB, freeAfterReserved:%" PRId64 "MB, jobQuota:%dMB", tsMinReservedMemorySize, freeSizeAfterRes/1048576UL, tsSingleQueryMaxMemorySize);
133!
1767

1768
  return code;
133✔
1769
}
1770

1771

1772
void taosAutoMemoryFree(void *ptr) {
208,149✔
1773
  if (NULL != gMemPoolHandle && threadPoolEnabled && threadPoolSession) {
208,149!
1774
    taosMemPoolFree(gMemPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__);
55✔
1775
  } else {
1776
    taosMemFree(ptr);
208,094✔
1777
  }
1778
}
208,153✔
1779

1780

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc