• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.41
/source/dnode/vnode/src/meta/metaCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "meta.h"
16

17
#ifdef TD_ENTERPRISE
18
extern const char* tkLogStb[];
19
extern const char* tkAuditStb[];
20
extern const int   tkLogStbNum;
21
extern const int   tkAuditStbNum;
22
#endif
23

24
#define TAG_FILTER_RES_KEY_LEN  32
25
#define META_CACHE_BASE_BUCKET  1024
26
#define META_CACHE_STATS_BUCKET 16
27

28
// (uid , suid) : child table
29
// (uid,     0) : normal table
30
// (suid, suid) : super table
31
typedef struct SMetaCacheEntry SMetaCacheEntry;
32
struct SMetaCacheEntry {
33
  SMetaCacheEntry* next;
34
  SMetaInfo        info;
35
};
36

37
typedef struct SMetaStbStatsEntry {
38
  struct SMetaStbStatsEntry* next;
39
  SMetaStbStats              info;
40
} SMetaStbStatsEntry;
41

42
typedef struct STagFilterResEntry {
43
  SHashObj *set;    // the set of md5 digest, extracted from the serialized tag query condition
44
  uint32_t hitTimes;  // queried times for current super table
45
} STagFilterResEntry;
46

47
struct SMetaCache {
48
  // child, normal, super, table entry cache
49
  struct SEntryCache {
50
    int32_t           nEntry;
51
    int32_t           nBucket;
52
    SMetaCacheEntry** aBucket;
53
  } sEntryCache;
54

55
  // stable stats cache
56
  struct SStbStatsCache {
57
    int32_t              nEntry;
58
    int32_t              nBucket;
59
    SMetaStbStatsEntry** aBucket;
60
  } sStbStatsCache;
61

62
  // query cache
63
  struct STagFilterResCache {
64
    TdThreadMutex lock;
65
    uint32_t      accTimes;
66
    SHashObj*     pTableEntry;
67
    SLRUCache*    pUidResCache;
68
  } sTagFilterResCache;
69

70
  struct STbGroupResCache {
71
    TdThreadMutex lock;
72
    uint32_t      accTimes;
73
    SHashObj*     pTableEntry;
74
    SLRUCache*    pResCache;
75
  } STbGroupResCache;
76

77
  struct STbFilterCache {
78
    SHashObj* pStb;
79
    SHashObj* pStbName;
80
  } STbFilterCache;
81
};
82

83
static void entryCacheClose(SMeta* pMeta) {
21✔
84
  if (pMeta->pCache) {
21!
85
    // close entry cache
86
    for (int32_t iBucket = 0; iBucket < pMeta->pCache->sEntryCache.nBucket; iBucket++) {
21,525✔
87
      SMetaCacheEntry* pEntry = pMeta->pCache->sEntryCache.aBucket[iBucket];
21,504✔
88
      while (pEntry) {
21,520✔
89
        SMetaCacheEntry* tEntry = pEntry->next;
16✔
90
        taosMemoryFree(pEntry);
16!
91
        pEntry = tEntry;
16✔
92
      }
93
    }
94
    taosMemoryFree(pMeta->pCache->sEntryCache.aBucket);
21!
95
  }
96
}
21✔
97

98
static void statsCacheClose(SMeta* pMeta) {
21✔
99
  if (pMeta->pCache) {
21!
100
    // close entry cache
101
    for (int32_t iBucket = 0; iBucket < pMeta->pCache->sStbStatsCache.nBucket; iBucket++) {
357✔
102
      SMetaStbStatsEntry* pEntry = pMeta->pCache->sStbStatsCache.aBucket[iBucket];
336✔
103
      while (pEntry) {
345✔
104
        SMetaStbStatsEntry* tEntry = pEntry->next;
9✔
105
        taosMemoryFree(pEntry);
9!
106
        pEntry = tEntry;
9✔
107
      }
108
    }
109
    taosMemoryFree(pMeta->pCache->sStbStatsCache.aBucket);
21!
110
  }
111
}
21✔
112

113
static void freeCacheEntryFp(void* param) {
×
114
  STagFilterResEntry** p = param;
×
115
  taosHashCleanup((*p)->set);
×
116
  taosMemoryFreeClear(*p);
×
117
}
×
118

119
int32_t metaCacheOpen(SMeta* pMeta) {
21✔
120
  int32_t code = 0;
21✔
121
  int32_t lino;
122

123
  pMeta->pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
21!
124
  if (pMeta->pCache == NULL) {
21!
125
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
126
  }
127

128
  // open entry cache
129
  pMeta->pCache->sEntryCache.nEntry = 0;
21✔
130
  pMeta->pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
21✔
131
  pMeta->pCache->sEntryCache.aBucket =
42✔
132
      (SMetaCacheEntry**)taosMemoryCalloc(pMeta->pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
21!
133
  if (pMeta->pCache->sEntryCache.aBucket == NULL) {
21!
134
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
135
  }
136

137
  // open stats cache
138
  pMeta->pCache->sStbStatsCache.nEntry = 0;
21✔
139
  pMeta->pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
21✔
140
  pMeta->pCache->sStbStatsCache.aBucket =
42✔
141
      (SMetaStbStatsEntry**)taosMemoryCalloc(pMeta->pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
21!
142
  if (pMeta->pCache->sStbStatsCache.aBucket == NULL) {
21!
143
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
144
  }
145

146
  pMeta->pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
21✔
147
  if (pMeta->pCache->sTagFilterResCache.pUidResCache == NULL) {
21!
148
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
149
  }
150

151
  pMeta->pCache->sTagFilterResCache.accTimes = 0;
21✔
152
  pMeta->pCache->sTagFilterResCache.pTableEntry =
42✔
153
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
21✔
154
  if (pMeta->pCache->sTagFilterResCache.pTableEntry == NULL) {
21!
155
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
156
  }
157

158
  taosHashSetFreeFp(pMeta->pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
21✔
159
  (void)taosThreadMutexInit(&pMeta->pCache->sTagFilterResCache.lock, NULL);
21✔
160

161
  pMeta->pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
21✔
162
  if (pMeta->pCache->STbGroupResCache.pResCache == NULL) {
21!
163
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
164
  }
165

166
  pMeta->pCache->STbGroupResCache.accTimes = 0;
21✔
167
  pMeta->pCache->STbGroupResCache.pTableEntry =
42✔
168
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
21✔
169
  if (pMeta->pCache->STbGroupResCache.pTableEntry == NULL) {
21!
170
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
171
  }
172

173
  taosHashSetFreeFp(pMeta->pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
21✔
174
  (void)taosThreadMutexInit(&pMeta->pCache->STbGroupResCache.lock, NULL);
21✔
175

176
  pMeta->pCache->STbFilterCache.pStb =
42✔
177
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
21✔
178
  if (pMeta->pCache->STbFilterCache.pStb == NULL) {
21!
179
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
180
  }
181

182
  pMeta->pCache->STbFilterCache.pStbName =
42✔
183
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
21✔
184
  if (pMeta->pCache->STbFilterCache.pStbName == NULL) {
21!
185
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
186
  }
187

188
_exit:
21✔
189
  if (code) {
21!
190
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
191
    metaCacheClose(pMeta);
×
192
  } else {
193
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
21✔
194
  }
195
  return code;
21✔
196
}
197

198
void metaCacheClose(SMeta* pMeta) {
21✔
199
  if (pMeta->pCache) {
21!
200
    entryCacheClose(pMeta);
21✔
201
    statsCacheClose(pMeta);
21✔
202

203
    taosHashClear(pMeta->pCache->sTagFilterResCache.pTableEntry);
21✔
204
    taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
21✔
205
    (void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
21✔
206
    taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
21✔
207

208
    taosHashClear(pMeta->pCache->STbGroupResCache.pTableEntry);
21✔
209
    taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
21✔
210
    (void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
21✔
211
    taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
21✔
212

213
    taosHashCleanup(pMeta->pCache->STbFilterCache.pStb);
21✔
214
    taosHashCleanup(pMeta->pCache->STbFilterCache.pStbName);
21✔
215

216
    taosMemoryFree(pMeta->pCache);
21!
217
    pMeta->pCache = NULL;
21✔
218
  }
219
}
21✔
220

221
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
×
222
  int32_t code = 0;
×
223
  int32_t nBucket;
224

225
  if (expand) {
×
226
    nBucket = pCache->sEntryCache.nBucket * 2;
×
227
  } else {
228
    nBucket = pCache->sEntryCache.nBucket / 2;
×
229
  }
230

231
  SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
×
232
  if (aBucket == NULL) {
×
233
    return;
×
234
  }
235

236
  // rehash
237
  for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
×
238
    SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
×
239

240
    while (pEntry) {
×
241
      SMetaCacheEntry* pTEntry = pEntry->next;
×
242

243
      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
×
244
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
×
245

246
      pEntry = pTEntry;
×
247
    }
248
  }
249

250
  // final set
251
  taosMemoryFree(pCache->sEntryCache.aBucket);
×
252
  pCache->sEntryCache.nBucket = nBucket;
×
253
  pCache->sEntryCache.aBucket = aBucket;
×
254
  return;
×
255
}
256

257
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
36✔
258
  int32_t code = 0;
36✔
259

260
  // meta is wlocked for calling this func.
261

262
  // search
263
  SMetaCache*       pCache = pMeta->pCache;
36✔
264
  int32_t           iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
36✔
265
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
36✔
266
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
36!
267
    ppEntry = &(*ppEntry)->next;
×
268
  }
269

270
  if (*ppEntry) {  // update
36✔
271
    if (pInfo->suid != (*ppEntry)->info.suid) {
16!
272
      metaError("meta/cache: suid should be same as the one in cache.");
×
273
      return TSDB_CODE_INVALID_PARA;
×
274
    }
275
    if (pInfo->version > (*ppEntry)->info.version) {
16!
276
      (*ppEntry)->info.version = pInfo->version;
16✔
277
      (*ppEntry)->info.skmVer = pInfo->skmVer;
16✔
278
    }
279
  } else {  // insert
280
    if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
20!
281
      metaRehashCache(pCache, 1);
×
282

283
      iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
×
284
    }
285

286
    SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
20!
287
    if (pEntryNew == NULL) {
20!
288
      code = terrno;
×
289
      goto _exit;
×
290
    }
291

292
    pEntryNew->info = *pInfo;
20✔
293
    pEntryNew->next = pCache->sEntryCache.aBucket[iBucket];
20✔
294
    pCache->sEntryCache.aBucket[iBucket] = pEntryNew;
20✔
295
    pCache->sEntryCache.nEntry++;
20✔
296
  }
297

298
_exit:
36✔
299
  return code;
36✔
300
}
301

302
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
4✔
303
  int32_t code = 0;
4✔
304

305
  SMetaCache*       pCache = pMeta->pCache;
4✔
306
  int32_t           iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
4✔
307
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
4✔
308
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
4!
309
    ppEntry = &(*ppEntry)->next;
×
310
  }
311

312
  SMetaCacheEntry* pEntry = *ppEntry;
4✔
313
  if (pEntry) {
4!
314
    *ppEntry = pEntry->next;
4✔
315
    taosMemoryFree(pEntry);
4!
316
    pCache->sEntryCache.nEntry--;
4✔
317
    if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
4!
318
        pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
4!
319
      metaRehashCache(pCache, 0);
×
320
    }
321
  } else {
322
    code = TSDB_CODE_NOT_FOUND;
×
323
  }
324

325
_exit:
4✔
326
  return code;
4✔
327
}
328

329
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
156✔
330
  int32_t code = 0;
156✔
331

332
  SMetaCache*      pCache = pMeta->pCache;
156✔
333
  int32_t          iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
156✔
334
  SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
156✔
335

336
  while (pEntry && pEntry->info.uid != uid) {
156!
337
    pEntry = pEntry->next;
×
338
  }
339

340
  if (pEntry) {
156✔
341
    if (pInfo) {
117!
342
      *pInfo = pEntry->info;
117✔
343
    }
344
  } else {
345
    code = TSDB_CODE_NOT_FOUND;
39✔
346
  }
347

348
  return code;
156✔
349
}
350

351
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
×
352
  int32_t code = 0;
×
353
  int32_t nBucket;
354

355
  if (expand) {
×
356
    nBucket = pCache->sStbStatsCache.nBucket * 2;
×
357
  } else {
358
    nBucket = pCache->sStbStatsCache.nBucket / 2;
×
359
  }
360

361
  SMetaStbStatsEntry** aBucket = (SMetaStbStatsEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaStbStatsEntry*));
×
362
  if (aBucket == NULL) {
×
363
    code = terrno;
×
364
    goto _exit;
×
365
  }
366

367
  // rehash
368
  for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
×
369
    SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
×
370

371
    while (pEntry) {
×
372
      SMetaStbStatsEntry* pTEntry = pEntry->next;
×
373

374
      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
×
375
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
×
376

377
      pEntry = pTEntry;
×
378
    }
379
  }
380

381
  // final set
382
  taosMemoryFree(pCache->sStbStatsCache.aBucket);
×
383
  pCache->sStbStatsCache.nBucket = nBucket;
×
384
  pCache->sStbStatsCache.aBucket = aBucket;
×
385

386
_exit:
×
387
  return code;
×
388
}
389

390
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
13✔
391
  int32_t code = 0;
13✔
392

393
  // meta is wlocked for calling this func.
394

395
  // search
396
  SMetaCache*          pCache = pMeta->pCache;
13✔
397
  int32_t              iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
13✔
398
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
13✔
399
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
13!
400
    ppEntry = &(*ppEntry)->next;
×
401
  }
402

403
  if (*ppEntry) {  // update
13✔
404
    (*ppEntry)->info.ctbNum = pInfo->ctbNum;
4✔
405
    (*ppEntry)->info.colNum = pInfo->colNum;
4✔
406
    (*ppEntry)->info.keep = pInfo->keep;
4✔
407
  } else {  // insert
408
    if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
9!
409
      TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
×
410
      iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
×
411
    }
412

413
    SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
9!
414
    if (pEntryNew == NULL) {
9!
415
      code = terrno;
×
416
      goto _exit;
×
417
    }
418

419
    pEntryNew->info = *pInfo;
9✔
420
    pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
9✔
421
    pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
9✔
422
    pCache->sStbStatsCache.nEntry++;
9✔
423
  }
424

425
_exit:
13✔
426
  return code;
13✔
427
}
428

429
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
4✔
430
  int32_t code = 0;
4✔
431

432
  SMetaCache*          pCache = pMeta->pCache;
4✔
433
  int32_t              iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
4✔
434
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
4✔
435
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
4!
436
    ppEntry = &(*ppEntry)->next;
×
437
  }
438

439
  SMetaStbStatsEntry* pEntry = *ppEntry;
4✔
440
  if (pEntry) {
4!
441
    *ppEntry = pEntry->next;
×
442
    taosMemoryFree(pEntry);
×
443
    pCache->sStbStatsCache.nEntry--;
×
444
    if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
×
445
        pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
×
446
      TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
×
447
    }
448
  } else {
449
    code = TSDB_CODE_NOT_FOUND;
4✔
450
  }
451

452
_exit:
4✔
453
  return code;
4✔
454
}
455

456
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
33✔
457
  int32_t code = TSDB_CODE_SUCCESS;
33✔
458

459
  SMetaCache*         pCache = pMeta->pCache;
33✔
460
  int32_t             iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
33✔
461
  SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
33✔
462

463
  while (pEntry && pEntry->info.uid != uid) {
33!
464
    pEntry = pEntry->next;
×
465
  }
466

467
  if (pEntry) {
33✔
468
    if (pInfo) {
11!
469
      *pInfo = pEntry->info;
11✔
470
    }
471
  } else {
472
    code = TSDB_CODE_NOT_FOUND;
22✔
473
  }
474

475
  return code;
33✔
476
}
477

478
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
479
  memcpy(&pBuf[2], key, keyLen);
2✔
480
}
×
481

482
// the format of key:
483
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
484
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
2✔
485
  buf[0] = (uint64_t)pHashMap;
2✔
486
  buf[1] = suid;
2✔
487
  setMD5DigestInKey(buf, key, keyLen);
488
}
2✔
489

490
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
×
491
                                  bool* acquireRes) {
492
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
493
  int32_t vgId = TD_VID(pMeta->pVnode);
×
494

495
  // generate the composed key for LRU cache
496
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
×
497
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
×
498
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
×
499

500
  *acquireRes = 0;
×
501
  uint64_t key[4];
502
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
×
503

504
  (void)taosThreadMutexLock(pLock);
×
505
  pMeta->pCache->sTagFilterResCache.accTimes += 1;
×
506

507
  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
×
508
  if (pHandle == NULL) {
×
509
    (void)taosThreadMutexUnlock(pLock);
×
510
    return TSDB_CODE_SUCCESS;
×
511
  }
512

513
  // do some book mark work after acquiring the filter result from cache
514
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
×
515
  if (NULL == pEntry) {
×
516
    metaError("meta/cache: pEntry should not be NULL.");
×
517
    return TSDB_CODE_NOT_FOUND;
×
518
  }
519

520
  *acquireRes = 1;
×
521

522
  const char* p = taosLRUCacheValue(pCache, pHandle);
×
523
  int32_t     size = *(int32_t*)p;
×
524

525
  // set the result into the buffer
526
  if (taosArrayAddBatch(pList1, p + sizeof(int32_t), size) == NULL) {
×
527
    return terrno;
×
528
  }
529

530
  (*pEntry)->hitTimes += 1;
×
531

532
  uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
×
533
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
×
534
    metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
×
535
             ((double)(*pEntry)->hitTimes) / acc);
536
  }
537

538
  bool ret = taosLRUCacheRelease(pCache, pHandle, false);
×
539

540
  // unlock meta
541
  (void)taosThreadMutexUnlock(pLock);
×
542
  return TSDB_CODE_SUCCESS;
×
543
}
544

545
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
×
546
  (void)ud;
547
  if (value == NULL) {
×
548
    return;
×
549
  }
550

551
  const uint64_t* p = key;
×
552
  if (keyLen != sizeof(int64_t) * 4) {
×
553
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
554
    return;
×
555
  }
556

557
  SHashObj* pHashObj = (SHashObj*)p[0];
×
558

559
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
×
560

561
  if (pEntry != NULL && (*pEntry) != NULL) {
×
562
    int64_t st = taosGetTimestampUs();
×
563
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
×
564
    if (code == TSDB_CODE_SUCCESS) {
×
565
      double el = (taosGetTimestampUs() - st) / 1000.0;
×
566
      metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
×
567
               el);
568
    }
569
  }
570

571
  taosMemoryFree(value);
×
572
}
573

574
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
×
575
  int32_t             code = TSDB_CODE_SUCCESS;
×
576
  int32_t             lino = 0;
×
577
  STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
×
578
  TSDB_CHECK_NULL(p, code, lino, _end, terrno);
×
579

580
  p->hitTimes = 0;
×
581
  p->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
×
582
  TSDB_CHECK_NULL(p->set, code, lino, _end, terrno);
×
583
  code = taosHashPut(p->set, pKey, keyLen, NULL, 0);
×
584
  TSDB_CHECK_CODE(code, lino, _end);
×
585
  code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
×
586
  TSDB_CHECK_CODE(code, lino, _end);
×
587

588
_end:
×
589
  if (code != TSDB_CODE_SUCCESS) {
×
590
    metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
591
    if (p != NULL) {
×
592
      if (p->set != NULL) {
×
593
        taosHashCleanup(p->set);
×
594
      }
595
      taosMemoryFree(p);
×
596
    }
597
  }
598
  return code;
×
599
}
600

601
// check both the payload size and selectivity ratio
602
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
603
                              int32_t payloadLen, double selectivityRatio) {
604
  int32_t code = 0;
×
605
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
606
  int32_t vgId = TD_VID(pMeta->pVnode);
×
607

608
  if (selectivityRatio > tsSelectivityRatio) {
×
609
    metaDebug("vgId:%d, suid:%" PRIu64
×
610
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
611
              vgId, suid, selectivityRatio, tsSelectivityRatio);
612
    taosMemoryFree(pPayload);
×
613
    return TSDB_CODE_SUCCESS;
×
614
  }
615

616
  if (payloadLen > tsTagFilterResCacheSize) {
×
617
    metaDebug("vgId:%d, suid:%" PRIu64
×
618
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
619
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
620
    taosMemoryFree(pPayload);
×
621
    return TSDB_CODE_SUCCESS;
×
622
  }
623

624
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
×
625
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
×
626
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
×
627

628
  uint64_t key[4] = {0};
×
629
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
×
630

631
  (void)taosThreadMutexLock(pLock);
×
632
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
633
  if (pEntry == NULL) {
×
634
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
635
    if (code != TSDB_CODE_SUCCESS) {
×
636
      goto _end;
×
637
    }
638
  } else {  // check if it exists or not
639
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
640
    if (code == TSDB_CODE_DUP_KEY) {
×
641
      // we have already found the existed items, no need to added to cache anymore.
642
      (void)taosThreadMutexUnlock(pLock);
×
643
      return TSDB_CODE_SUCCESS;
×
644
    }
645
    if (code != TSDB_CODE_SUCCESS) {
×
646
      goto _end;
×
647
    }
648
  }
649

650
  // add to cache.
651
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
×
652
                           TAOS_LRU_PRIORITY_LOW, NULL);
653
_end:
×
654
  (void)taosThreadMutexUnlock(pLock);
×
655
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
×
656
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
657

658
  return code;
×
659
}
660

661
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
662
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
1✔
663
  uint64_t  p[4] = {0};
1✔
664
  int32_t   vgId = TD_VID(pMeta->pVnode);
1✔
665
  SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
1✔
666

667
  uint64_t dummy[2] = {0};
1✔
668
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
1✔
669

670
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
1✔
671
  (void)taosThreadMutexLock(pLock);
1✔
672

673
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
1✔
674
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
1!
675
    (void)taosThreadMutexUnlock(pLock);
1✔
676
    return TSDB_CODE_SUCCESS;
1✔
677
  }
678

679
  (*pEntry)->hitTimes = 0;
×
680

681
  char *iter = taosHashIterate((*pEntry)->set, NULL);
×
682
  while (iter != NULL) {
×
683
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
684
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
×
685
    iter = taosHashIterate((*pEntry)->set, iter);
×
686
  }
687
  taosHashClear((*pEntry)->set);
×
688
  (void)taosThreadMutexUnlock(pLock);
×
689

690
  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
×
691
  return TSDB_CODE_SUCCESS;
×
692
}
693

694
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
×
695
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
696
  int32_t vgId = TD_VID(pMeta->pVnode);
×
697

698
  // generate the composed key for LRU cache
699
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
×
700
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
×
701
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
×
702

703
  *pList = NULL;
×
704
  uint64_t key[4];
705
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
×
706

707
  (void)taosThreadMutexLock(pLock);
×
708
  pMeta->pCache->STbGroupResCache.accTimes += 1;
×
709

710
  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
×
711
  if (pHandle == NULL) {
×
712
    (void)taosThreadMutexUnlock(pLock);
×
713
    return TSDB_CODE_SUCCESS;
×
714
  }
715

716
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
×
717
  if (NULL == pEntry) {
×
718
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
×
719
    return TSDB_CODE_NOT_FOUND;
×
720
  }
721

722
  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);
×
723

724
  (*pEntry)->hitTimes += 1;
×
725

726
  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
×
727
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
×
728
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
×
729
             ((double)(*pEntry)->hitTimes) / acc);
730
  }
731

732
  bool ret = taosLRUCacheRelease(pCache, pHandle, false);
×
733

734
  // unlock meta
735
  (void)taosThreadMutexUnlock(pLock);
×
736
  return TSDB_CODE_SUCCESS;
×
737
}
738

739
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
×
740
  (void)ud;
741
  if (value == NULL) {
×
742
    return;
×
743
  }
744

745
  const uint64_t* p = key;
×
746
  if (keyLen != sizeof(int64_t) * 4) {
×
747
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
748
    return;
×
749
  }
750

751
  SHashObj* pHashObj = (SHashObj*)p[0];
×
752

753
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
×
754

755
  if (pEntry != NULL && (*pEntry) != NULL) {
×
756
    int64_t st = taosGetTimestampUs();
×
757
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
×
758
    if (code == TSDB_CODE_SUCCESS) {
×
759
      double el = (taosGetTimestampUs() - st) / 1000.0;
×
760
      metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
×
761
                taosHashGetSize((*pEntry)->set), el);
762
    }
763
  }
764

765
  taosArrayDestroy((SArray*)value);
×
766
}
767

768
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
769
                              int32_t payloadLen) {
770
  int32_t code = 0;
×
771
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
772
  int32_t vgId = TD_VID(pMeta->pVnode);
×
773

774
  if (payloadLen > tsTagFilterResCacheSize) {
×
775
    metaDebug("vgId:%d, suid:%" PRIu64
×
776
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
777
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
778
    taosArrayDestroy((SArray*)pPayload);
×
779
    return TSDB_CODE_SUCCESS;
×
780
  }
781

782
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
×
783
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
×
784
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
×
785

786
  uint64_t key[4] = {0};
×
787
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
×
788

789
  (void)taosThreadMutexLock(pLock);
×
790
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
791
  if (pEntry == NULL) {
×
792
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
793
    if (code != TSDB_CODE_SUCCESS) {
×
794
      goto _end;
×
795
    }
796
  } else {  // check if it exists or not
797
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
798
    if (code == TSDB_CODE_DUP_KEY) {
×
799
      // we have already found the existed items, no need to added to cache anymore.
800
      (void)taosThreadMutexUnlock(pLock);
×
801
      return TSDB_CODE_SUCCESS;
×
802
    }
803
    if (code != TSDB_CODE_SUCCESS) {
×
804
      goto _end;
×
805
    }
806
  }
807

808
  // add to cache.
809
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
×
810
                           TAOS_LRU_PRIORITY_LOW, NULL);
811
_end:
×
812
  (void)taosThreadMutexUnlock(pLock);
×
813
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
×
814
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
815

816
  return code;
×
817
}
818

819
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
820
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
1✔
821
  uint64_t  p[4] = {0};
1✔
822
  int32_t   vgId = TD_VID(pMeta->pVnode);
1✔
823
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
1✔
824

825
  uint64_t dummy[2] = {0};
1✔
826
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
1✔
827

828
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
1✔
829
  (void)taosThreadMutexLock(pLock);
1✔
830

831
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
1✔
832
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
1!
833
    (void)taosThreadMutexUnlock(pLock);
1✔
834
    return TSDB_CODE_SUCCESS;
1✔
835
  }
836

837
  (*pEntry)->hitTimes = 0;
×
838

839
  char *iter = taosHashIterate((*pEntry)->set, NULL);
×
840
  while (iter != NULL) {
×
841
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
842
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
×
843
    iter = taosHashIterate((*pEntry)->set, iter);
×
844
  }
845
  taosHashClear((*pEntry)->set);
×
846
  (void)taosThreadMutexUnlock(pLock);
×
847

848
  metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
×
849
  return TSDB_CODE_SUCCESS;
×
850
}
851

852
bool metaTbInFilterCache(SMeta* pMeta, const void* key, int8_t type) {
8✔
853
  if (type == 0 && taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t))) {
8!
854
    return true;
×
855
  }
856

857
  if (type == 1 && taosHashGet(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key))) {
8!
858
    return true;
×
859
  }
860

861
  return false;
8✔
862
}
863

864
int32_t metaPutTbToFilterCache(SMeta* pMeta, const void* key, int8_t type) {
×
865
  if (type == 0) {
×
866
    return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
×
867
  }
868

869
  if (type == 1) {
×
870
    return taosHashPut(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key), NULL, 0);
×
871
  }
872

873
  return 0;
×
874
}
875

876
int32_t metaSizeOfTbFilterCache(SMeta* pMeta, int8_t type) {
×
877
  if (type == 0) {
×
878
    return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
×
879
  }
880
  return 0;
×
881
}
882

883
int32_t metaInitTbFilterCache(SMeta* pMeta) {
21✔
884
#ifdef TD_ENTERPRISE
885
  int32_t      tbNum = 0;
21✔
886
  const char** pTbArr = NULL;
21✔
887
  const char*  dbName = NULL;
21✔
888

889
  if (!(dbName = strchr(pMeta->pVnode->config.dbname, '.'))) return 0;
21!
890
  if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
21!
891
    tbNum = tkLogStbNum;
×
892
    pTbArr = (const char**)&tkLogStb;
×
893
  } else if (0 == strncmp(dbName, "audit", TSDB_DB_NAME_LEN)) {
21!
894
    tbNum = tkAuditStbNum;
×
895
    pTbArr = (const char**)&tkAuditStb;
×
896
  }
897
  if (tbNum && pTbArr) {
21!
898
    for (int32_t i = 0; i < tbNum; ++i) {
×
899
      TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, pTbArr[i], 1));
×
900
    }
901
  }
902
#else
903
#endif
904
  return 0;
21✔
905
}
906

907
int64_t metaGetStbKeep(SMeta* pMeta, int64_t uid) {
×
908
  SMetaStbStats stats = {0};
×
909

910
  if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
×
911
    return stats.keep;
×
912
  }
913

914
  SMetaEntry* pEntry = NULL;
×
915
  if (metaFetchEntryByUid(pMeta, uid, &pEntry) == TSDB_CODE_SUCCESS) {
×
916
    int64_t keep = -1;
×
917
    if (pEntry->type == TSDB_SUPER_TABLE) {
×
918
      keep = pEntry->stbEntry.keep;
×
919
    }
920
    metaFetchEntryFree(&pEntry);
×
921
    return keep;
×
922
  }
923
  
924
  return -1;
×
925
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc