• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4840

09 Nov 2025 09:08AM UTC coverage: 67.809% (-2.6%) from 70.408%
#4840

push

travis-ci

web-flow
Merge a8d8f75bf into b63644c82

261 of 436 new or added lines in 5 files covered. (59.86%)

641 existing lines in 4 files now uncovered.

187985 of 277226 relevant lines covered (67.81%)

293024821.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.76
/source/dnode/vnode/src/meta/metaCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "meta.h"
16

17
#ifdef TD_ENTERPRISE
18
extern const char* tkLogStb[];
19
extern const char* tkAuditStb[];
20
extern const int   tkLogStbNum;
21
extern const int   tkAuditStbNum;
22
#endif
23

24
#define TAG_FILTER_RES_KEY_LEN  32
25
#define META_CACHE_BASE_BUCKET  1024
26
#define META_CACHE_STATS_BUCKET 16
27

28
// (uid , suid) : child table
29
// (uid,     0) : normal table
30
// (suid, suid) : super table
31
// Node of a singly linked bucket chain in the table entry cache
// (SMetaCache.sEntryCache). Entries are located by info.uid.
typedef struct SMetaCacheEntry SMetaCacheEntry;
32
struct SMetaCacheEntry {
33
  SMetaCacheEntry* next;  // next entry hashed to the same bucket, or NULL
34
  SMetaInfo        info;  // cached table info, copied by value on insert
35
};
36

37
// Node of a singly linked bucket chain in the super table stats cache
// (SMetaCache.sStbStatsCache). Entries are located by info.uid.
typedef struct SMetaStbStatsEntry {
38
  struct SMetaStbStatsEntry* next;  // next entry hashed to the same bucket, or NULL
39
  SMetaStbStats              info;  // cached stats, copied by value on insert
40
} SMetaStbStatsEntry;
41

42
// Per-super-table bookkeeping record. Pointers to these records are stored in
// the pTableEntry hash maps (see addNewEntry) and released by freeCacheEntryFp.
typedef struct STagFilterResEntry {
43
  SHashObj *set;    // the set of md5 digest, extracted from the serialized tag query condition
44
  uint32_t hitTimes;  // queried times for current super table
45
} STagFilterResEntry;
46

47
// Cached filter results for one tag-column combination of a super table.
// Both pColIds and set are owned by the entry and released by
// freeTagFilterEntryFp.
typedef struct STagCondFilterEntry {
48
  SArray*   pColIds;   // SArray<col_id_t>
49
  SHashObj* set;       // SHashObj<digest, SArray<uid>>
50
  uint32_t  hitTimes;  // queried times for current tag filter condition
51
} STagCondFilterEntry;
52

53
// Per-super-table container of tag-condition filter entries, stored by pointer
// in SMetaCache.sStableTagFilterResCache.pTableEntry and released by
// freeTagCondsFp.
typedef struct STagConds {
54
  SHashObj* set;       // SHashObj<tagColIdStr, STagCondFilterEntry>
55
  uint32_t  hitTimes;  // queried times for current super table
56
} STagConds;
57

58
// Aggregate of all in-memory caches owned by one SMeta instance.
// Allocated and initialised by metaCacheOpen, torn down by metaCacheClose.
struct SMetaCache {
59
  // child, normal, super, table entry cache
  // (chained hash table; bucket index is TABS(uid) % nBucket)
60
  struct SEntryCache {
61
    int32_t           nEntry;
62
    int32_t           nBucket;
63
    SMetaCacheEntry** aBucket;
64
  } sEntryCache;
65

66
  // stable stats cache
  // (chained hash table; bucket index is TABS(uid) % nBucket)
67
  struct SStbStatsCache {
68
    int32_t              nEntry;
69
    int32_t              nBucket;
70
    SMetaStbStatsEntry** aBucket;
71
  } sStbStatsCache;
72

73
  // query cache
  // lock guards accTimes, pTableEntry and pUidResCache in the lookup path
74
  struct STagFilterResCache {
75
    TdThreadMutex lock;
76
    uint32_t      accTimes;
77
    SHashObj*     pTableEntry;
78
    SLRUCache*    pUidResCache;
79
  } sTagFilterResCache;
80

81
  // cache table list for tag filter conditions
82
  // that match format "tag1 = v1 AND tag2 = v2 AND ..."
83
  struct SStableTagFilterResCache {
84
    TdThreadRwlock rwlock;
85
    uint32_t       accTimes;
86
    SHashObj*      pTableEntry;  // HashObj<suid, STagConds>
87
  } sStableTagFilterResCache;
88

76,932,655✔
89
  // table group result cache: an LRU cache plus a per-table entry map,
  // set up the same way as sTagFilterResCache in metaCacheOpen
  struct STbGroupResCache {
76,932,655✔
90
    TdThreadMutex lock;
91
    uint32_t      accTimes;
2,147,483,647✔
92
    SHashObj*     pTableEntry;
2,147,483,647✔
93
    SLRUCache*    pResCache;
2,147,483,647✔
94
  } STbGroupResCache;
866,343,769✔
95

866,343,751✔
96
  // filter cache: pStb is created with the BIGINT hash function and pStbName
  // with the VARCHAR hash function (see metaCacheOpen)
  struct STbFilterCache {
866,294,557✔
97
    SHashObj* pStb;
98
    SHashObj* pStbName;
99
  } STbFilterCache;
76,939,185✔
100

101
  // referenced-db cache; values are nested hash objects released by freeRefDbFp
  struct STbRefDbCache {
76,939,185✔
102
    TdThreadMutex lock;
103
    SHashObj*     pStbRefs; // key: suid, value: SHashObj<dbName, refTimes>
76,937,867✔
104
  } STbRefDbCache;
76,937,867✔
105
};
106

1,369,904,817✔
107
/*
 * Release the table entry cache: every chained entry of every bucket, then the
 * bucket array itself. The SMetaCache object is NOT freed here.
 *
 * Safe to call when pMeta->pCache is NULL or when the bucket array was never
 * allocated. The latter happens when metaCacheOpen fails half-way: nBucket may
 * already be non-zero while aBucket is still NULL, so the array must be checked
 * before it is walked.
 */
static void entryCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  SMetaCacheEntry** aBucket = pCache->sEntryCache.aBucket;
  if (aBucket != NULL) {
    for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
      SMetaCacheEntry* pEntry = aBucket[iBucket];
      while (pEntry != NULL) {
        SMetaCacheEntry* pNext = pEntry->next;
        taosMemoryFree(pEntry);
        pEntry = pNext;
      }
    }
    taosMemoryFree(aBucket);
  }

  // leave no dangling pointer or stale counters behind
  pCache->sEntryCache.aBucket = NULL;
  pCache->sEntryCache.nBucket = 0;
  pCache->sEntryCache.nEntry = 0;
}
11,034✔
121

11,034✔
122
/*
 * Release the super table stats cache: every chained entry of every bucket,
 * then the bucket array itself. The SMetaCache object is NOT freed here.
 *
 * Safe to call when pMeta->pCache is NULL or when the bucket array was never
 * allocated (metaCacheOpen may fail after nBucket is set but before aBucket
 * exists), so the array is checked before it is walked.
 */
static void statsCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  SMetaStbStatsEntry** aBucket = pCache->sStbStatsCache.aBucket;
  if (aBucket != NULL) {
    for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
      SMetaStbStatsEntry* pEntry = aBucket[iBucket];
      while (pEntry != NULL) {
        SMetaStbStatsEntry* pNext = pEntry->next;
        taosMemoryFree(pEntry);
        pEntry = pNext;
      }
    }
    taosMemoryFree(aBucket);
  }

  // leave no dangling pointer or stale counters behind
  pCache->sStbStatsCache.aBucket = NULL;
  pCache->sStbStatsCache.nBucket = 0;
  pCache->sStbStatsCache.nEntry = 0;
}
76,921,684✔
UNCOV
136

×
137
static void freeCacheEntryFp(void* param) {
138
  STagFilterResEntry** p = param;
139
  taosHashCleanup((*p)->set);
140
  taosMemoryFreeClear(*p);
76,923,768✔
141
}
76,924,227✔
142

76,948,215✔
143
static void freeTagFilterEntryFp(void* param) {
76,921,725✔
144
  STagCondFilterEntry** p = param;
76,940,514✔
NEW
145
  taosArrayDestroy((*p)->pColIds);
×
146
  taosHashCleanup((*p)->set);
147
  taosMemoryFreeClear(*p);
148
}
149

76,938,443✔
150
static void freeTagCondsFp(void* param) {
76,940,090✔
151
  STagConds** p = param;
76,942,287✔
152
  taosHashCleanup((*p)->set);
76,940,514✔
153
  taosMemoryFreeClear(*p);
76,936,514✔
NEW
154
}
×
155

156
static void freeRefDbFp(void* param) {
157
  SHashObj** p = param;
158
  taosHashCleanup(*p);
76,936,799✔
159
  *p = NULL;
76,940,514✔
UNCOV
160
}
×
161

162
/*
 * Allocate pMeta->pCache and initialise every sub-cache.
 *
 * Returns 0 on success. On failure the error is logged, metaCacheClose() is
 * called to undo whatever was set up, and the error code (taken from terrno)
 * is returned; pMeta->pCache is NULL afterwards.
 *
 * Ordering notes, both needed to keep the failure path safe:
 *  - all locks are initialised right after the SMetaCache allocation, because
 *    metaCacheClose() destroys them unconditionally;
 *  - nBucket of the two bucket caches is only published once the bucket array
 *    exists, so the close helpers never see a count without an array.
 */
int32_t metaCacheOpen(SMeta* pMeta) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaCache* pCache = NULL;

  pMeta->pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
  if (pMeta->pCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pCache = pMeta->pCache;

  // init every lock up front so that metaCacheClose never destroys an uninitialised one
  (void)taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);
  TAOS_UNUSED(taosThreadRwlockInit(&pCache->sStableTagFilterResCache.rwlock, NULL));
  (void)taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);
  (void)taosThreadMutexInit(&pCache->STbRefDbCache.lock, NULL);

  // open entry cache
  pCache->sEntryCache.nEntry = 0;
  pCache->sEntryCache.aBucket =
      (SMetaCacheEntry**)taosMemoryCalloc(META_CACHE_BASE_BUCKET, sizeof(SMetaCacheEntry*));
  if (pCache->sEntryCache.aBucket == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;

  // open stats cache
  pCache->sStbStatsCache.nEntry = 0;
  pCache->sStbStatsCache.aBucket =
      (SMetaStbStatsEntry**)taosMemoryCalloc(META_CACHE_STATS_BUCKET, sizeof(SMetaStbStatsEntry*));
  if (pCache->sStbStatsCache.aBucket == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;

  // open tag filter cache
  pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->sTagFilterResCache.pUidResCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pCache->sTagFilterResCache.accTimes = 0;
  pCache->sTagFilterResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->sTagFilterResCache.pTableEntry == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);

  // open stable tag filter cache
  pCache->sStableTagFilterResCache.accTimes = 0;
  pCache->sStableTagFilterResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->sStableTagFilterResCache.pTableEntry == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  taosHashSetFreeFp(pCache->sStableTagFilterResCache.pTableEntry, freeTagCondsFp);

  // open group res cache
  pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->STbGroupResCache.pResCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pCache->STbGroupResCache.accTimes = 0;
  pCache->STbGroupResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->STbGroupResCache.pTableEntry == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);

  // open filter cache
  pCache->STbFilterCache.pStb =
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pCache->STbFilterCache.pStb == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pCache->STbFilterCache.pStbName =
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->STbFilterCache.pStbName == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // open ref db cache
  pCache->STbRefDbCache.pStbRefs =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pCache->STbRefDbCache.pStbRefs == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  taosHashSetFreeFp(pCache->STbRefDbCache.pStbRefs, freeRefDbFp);

_exit:
  if (code) {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
    metaCacheClose(pMeta);
  } else {
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
  }
  return code;
}
649,821,184✔
268

269
/*
 * Tear down every sub-cache and free pMeta->pCache, leaving it NULL.
 * Does nothing when the cache was never opened (pMeta->pCache == NULL).
 *
 * For the LRU-backed caches the per-table entry map is cleared before the LRU
 * cache is cleaned up and only destroyed afterwards; this order is kept as in
 * the original code (presumably because the LRU deleter consults that map --
 * confirm before reordering).
 */
void metaCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  // bucket caches
  entryCacheClose(pMeta);
  statsCacheClose(pMeta);

  // tag filter result cache
  taosHashClear(pCache->sTagFilterResCache.pTableEntry);
  taosLRUCacheCleanup(pCache->sTagFilterResCache.pUidResCache);
  (void)taosThreadMutexDestroy(&pCache->sTagFilterResCache.lock);
  taosHashCleanup(pCache->sTagFilterResCache.pTableEntry);

  // stable tag filter result cache
  (void)taosThreadRwlockDestroy(&pCache->sStableTagFilterResCache.rwlock);
  taosHashCleanup(pCache->sStableTagFilterResCache.pTableEntry);

  // table group result cache
  taosHashClear(pCache->STbGroupResCache.pTableEntry);
  taosLRUCacheCleanup(pCache->STbGroupResCache.pResCache);
  (void)taosThreadMutexDestroy(&pCache->STbGroupResCache.lock);
  taosHashCleanup(pCache->STbGroupResCache.pTableEntry);

  // filter cache
  taosHashCleanup(pCache->STbFilterCache.pStb);
  taosHashCleanup(pCache->STbFilterCache.pStbName);

  // ref db cache
  taosHashClear(pCache->STbRefDbCache.pStbRefs);
  (void)taosThreadMutexDestroy(&pCache->STbRefDbCache.lock);
  taosHashCleanup(pCache->STbRefDbCache.pStbRefs);

  taosMemoryFree(pCache);
  pMeta->pCache = NULL;
}
298

299
/*
 * Resize the table entry cache's bucket array and redistribute all entries.
 *
 * expand != 0 doubles the bucket count, expand == 0 halves it. If the new
 * array cannot be allocated the cache is left untouched (best effort; no error
 * is reported to the caller).
 */
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
  const int32_t nOld = pCache->sEntryCache.nBucket;
  const int32_t nNew = expand ? nOld * 2 : nOld / 2;

  SMetaCacheEntry** aNew = (SMetaCacheEntry**)taosMemoryCalloc(nNew, sizeof(SMetaCacheEntry*));
  if (aNew == NULL) {
    return;
  }

  // move every node to the head of its new chain
  for (int32_t iOld = 0; iOld < nOld; iOld++) {
    SMetaCacheEntry* pNode = pCache->sEntryCache.aBucket[iOld];
    while (pNode != NULL) {
      SMetaCacheEntry*  pNext = pNode->next;
      SMetaCacheEntry** ppHead = &aNew[TABS(pNode->info.uid) % nNew];

      pNode->next = *ppHead;
      *ppHead = pNode;
      pNode = pNext;
    }
  }

  // swap in the new array
  taosMemoryFree(pCache->sEntryCache.aBucket);
  pCache->sEntryCache.aBucket = aNew;
  pCache->sEntryCache.nBucket = nNew;
}
334

27,383,615✔
335
/*
 * Insert or refresh the cached info of table pInfo->uid.
 * The meta is write-locked by the caller.
 *
 * Existing entry: suid must match (else TSDB_CODE_INVALID_PARA); version and
 * skmVer are overwritten only when pInfo carries a newer version.
 * New entry: the table is grown first when nEntry has reached nBucket, then a
 * copy of *pInfo is pushed at the head of its bucket chain.
 *
 * Returns 0 on success, terrno when the new entry cannot be allocated.
 */
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;

  // look for an existing entry
  SMetaCacheEntry* pHit = pCache->sEntryCache.aBucket[slot];
  while (pHit != NULL && pHit->info.uid != pInfo->uid) {
    pHit = pHit->next;
  }

  if (pHit != NULL) {  // update
    if (pInfo->suid != pHit->info.suid) {
      metaError("meta/cache: suid should be same as the one in cache.");
      return TSDB_CODE_INVALID_PARA;
    }
    if (pInfo->version > pHit->info.version) {
      pHit->info.version = pInfo->version;
      pHit->info.skmVer = pInfo->skmVer;
    }
    return 0;
  }

  // insert: grow first if the load factor reached 1
  if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
    metaRehashCache(pCache, 1);
    slot = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
  }

  SMetaCacheEntry* pFresh = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pFresh));
  if (pFresh == NULL) {
    return terrno;
  }

  pFresh->info = *pInfo;
  pFresh->next = pCache->sEntryCache.aBucket[slot];
  pCache->sEntryCache.aBucket[slot] = pFresh;
  pCache->sEntryCache.nEntry++;
  return 0;
}
379

380
/*
 * Remove the cached entry of table uid.
 *
 * After removal the table is halved when it holds fewer than nBucket/4 entries
 * and is larger than META_CACHE_BASE_BUCKET.
 *
 * Returns 0 when an entry was removed, TSDB_CODE_NOT_FOUND otherwise.
 */
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sEntryCache.nBucket;

  // walk the chain keeping the address of the link that points at the node
  SMetaCacheEntry** ppLink = &pCache->sEntryCache.aBucket[slot];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaCacheEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sEntryCache.nEntry--;

  const int32_t nBucket = pCache->sEntryCache.nBucket;
  if (pCache->sEntryCache.nEntry < nBucket / 4 && nBucket > META_CACHE_BASE_BUCKET) {
    metaRehashCache(pCache, 0);
  }
  return 0;
}
406

65,157,568✔
407
/*
 * Look up the cached info of table uid.
 *
 * pInfo may be NULL, in which case the call only tests for presence;
 * otherwise the cached info is copied into *pInfo on a hit.
 *
 * Returns 0 on a hit, TSDB_CODE_NOT_FOUND on a miss.
 */
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sEntryCache.nBucket;

  for (SMetaCacheEntry* pNode = pCache->sEntryCache.aBucket[slot]; pNode != NULL; pNode = pNode->next) {
    if (pNode->info.uid != uid) {
      continue;
    }
    if (pInfo != NULL) {
      *pInfo = pNode->info;
    }
    return 0;
  }

  return TSDB_CODE_NOT_FOUND;
}
795,988,115✔
428

832,906,643✔
429
/*
 * Resize the super table stats cache's bucket array and redistribute all
 * entries.
 *
 * expand != 0 doubles the bucket count, expand == 0 halves it.
 * Returns 0 on success; when the new array cannot be allocated the cache is
 * left untouched and terrno is returned.
 */
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
  const int32_t nOld = pCache->sStbStatsCache.nBucket;
  const int32_t nNew = expand ? nOld * 2 : nOld / 2;

  SMetaStbStatsEntry** aNew = (SMetaStbStatsEntry**)taosMemoryCalloc(nNew, sizeof(SMetaStbStatsEntry*));
  if (aNew == NULL) {
    return terrno;
  }

  // move every node to the head of its new chain
  for (int32_t iOld = 0; iOld < nOld; iOld++) {
    SMetaStbStatsEntry* pNode = pCache->sStbStatsCache.aBucket[iOld];
    while (pNode != NULL) {
      SMetaStbStatsEntry*  pNext = pNode->next;
      SMetaStbStatsEntry** ppHead = &aNew[TABS(pNode->info.uid) % nNew];

      pNode->next = *ppHead;
      *ppHead = pNode;
      pNode = pNext;
    }
  }

  // swap in the new array
  taosMemoryFree(pCache->sStbStatsCache.aBucket);
  pCache->sStbStatsCache.aBucket = aNew;
  pCache->sStbStatsCache.nBucket = nNew;
  return 0;
}
1,101,478✔
467

468
/*
 * Insert or refresh the cached stats of super table pInfo->uid.
 * The meta is write-locked by the caller.
 *
 * Existing entry: only ctbNum, colNum, flags and keep are overwritten.
 * New entry: the table is grown first when nEntry has reached nBucket (a
 * failed grow is ignored), then a copy of *pInfo is pushed at the head of its
 * bucket chain.
 *
 * Returns 0 on success, terrno when the new entry cannot be allocated.
 */
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;

  // look for an existing entry
  SMetaStbStatsEntry* pHit = pCache->sStbStatsCache.aBucket[slot];
  while (pHit != NULL && pHit->info.uid != pInfo->uid) {
    pHit = pHit->next;
  }

  if (pHit != NULL) {  // update
    pHit->info.ctbNum = pInfo->ctbNum;
    pHit->info.colNum = pInfo->colNum;
    pHit->info.flags = pInfo->flags;
    pHit->info.keep = pInfo->keep;
    return 0;
  }

  // insert: grow first if the load factor reached 1
  if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
    TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
    slot = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
  }

  SMetaStbStatsEntry* pFresh = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pFresh));
  if (pFresh == NULL) {
    return terrno;
  }

  pFresh->info = *pInfo;
  pFresh->next = pCache->sStbStatsCache.aBucket[slot];
  pCache->sStbStatsCache.aBucket[slot] = pFresh;
  pCache->sStbStatsCache.nEntry++;
  return 0;
}
507

508
/*
 * Remove the cached stats of super table uid.
 *
 * After removal the table is halved when it holds fewer than nBucket/4 entries
 * and is larger than META_CACHE_STATS_BUCKET (a failed shrink is ignored).
 *
 * Returns 0 when an entry was removed, TSDB_CODE_NOT_FOUND otherwise.
 */
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  // walk the chain keeping the address of the link that points at the node
  SMetaStbStatsEntry** ppLink = &pCache->sStbStatsCache.aBucket[slot];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaStbStatsEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sStbStatsCache.nEntry--;

  const int32_t nBucket = pCache->sStbStatsCache.nBucket;
  if (pCache->sStbStatsCache.nEntry < nBucket / 4 && nBucket > META_CACHE_STATS_BUCKET) {
    TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
  }
  return 0;
}
534

535
/*
 * Look up the cached stats of super table uid.
 *
 * pInfo may be NULL, in which case the call only tests for presence;
 * otherwise the cached stats are copied into *pInfo on a hit.
 *
 * Returns TSDB_CODE_SUCCESS on a hit, TSDB_CODE_NOT_FOUND on a miss.
 */
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     slot = TABS(uid) % pCache->sStbStatsCache.nBucket;

  for (SMetaStbStatsEntry* pNode = pCache->sStbStatsCache.aBucket[slot]; pNode != NULL; pNode = pNode->next) {
    if (pNode->info.uid != uid) {
      continue;
    }
    if (pInfo != NULL) {
      *pInfo = pNode->info;
    }
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_NOT_FOUND;
}
556

44,136✔
557
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
558
  memcpy(&pBuf[2], key, keyLen);
44,136✔
559
}
44,136✔
560

561
// the format of key:
562
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
44,136✔
UNCOV
563
/*
 * Fill the composed LRU cache key:
 *   buf[0]   address of the per-table hash map
 *   buf[1]   super table uid
 *   buf[2..] digest bytes copied from key
 */
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
  setMD5DigestInKey(buf, key, keyLen);
  buf[1] = suid;
  buf[0] = (uint64_t)pHashMap;
}
568

44,136✔
569
/*
 * Fetch a cached tag-filter result (list of table uids) for super table suid.
 *
 * pVnode      SVnode* owning the meta
 * pKey/keyLen digest of the serialized tag condition (16-byte MD5 expected)
 * pList1      receives the cached uids on a hit
 * acquireRes  set to 1 only when the cached list was copied into pList1,
 *             otherwise 0
 *
 * Returns TSDB_CODE_SUCCESS on a hit or a plain miss, TSDB_CODE_NOT_FOUND when
 * the LRU cache holds the key but the per-table entry is missing, or terrno
 * when copying into pList1 fails.
 *
 * Every exit path after the lock is taken releases the LRU handle (if one was
 * acquired) and unlocks the mutex.
 */
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
                                  bool* acquireRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  LRUHandle*           pHandle = NULL;
  STagFilterResEntry** pEntry = NULL;
  const char*          p = NULL;
  int32_t              size = 0;
  uint32_t             acc = 0;

  *acquireRes = 0;

  // generate the composed key for LRU cache; zeroed first so that a digest
  // shorter than the slot yields the same key bytes as metaUidFilterCachePut
  uint64_t key[4] = {0};
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  (void)taosThreadMutexLock(pLock);
  pMeta->pCache->sTagFilterResCache.accTimes += 1;

  pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    goto _unlock;  // plain miss
  }

  // do some book mark work after acquiring the filter result from cache
  pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaError("meta/cache: pEntry should not be NULL.");
    code = TSDB_CODE_NOT_FOUND;
    goto _release;
  }

  // payload layout: int32_t element count followed by the elements
  p = taosLRUCacheValue(pCache, pHandle);
  size = *(int32_t*)p;

  // set the result into the buffer
  if (taosArrayAddBatch(pList1, p + sizeof(int32_t), size) == NULL) {
    code = terrno;  // capture before any further call can overwrite it
    goto _release;
  }
  *acquireRes = 1;

  (*pEntry)->hitTimes += 1;

  acc = pMeta->pCache->sTagFilterResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
             ((double)(*pEntry)->hitTimes) / acc);
  }

_release:
  (void)taosLRUCacheRelease(pCache, pHandle, false);

_unlock:
  (void)taosThreadMutexUnlock(pLock);
  return code;
}
×
UNCOV
629

×
630
/*
 * Fetch a cached uid list for a "tag1 = v1 AND tag2 = v2 ..." style filter.
 *
 * pVnode                     SVnode* owning the meta
 * suid                       super table uid
 * pTagCondKey/tagCondKeyLen  key of the tag-column combination
 * pKey/keyLen                digest of the concrete condition values
 * pList1                     receives the cached uids on a hit
 * acquireRes                 set to 1 only when the cached list was delivered
 *                            into pList1, otherwise 0
 *
 * Returns TSDB_CODE_SUCCESS on a hit or a miss; the rdlock error when the lock
 * cannot be taken (nothing is unlocked in that case); terrno when copying into
 * pList1 fails; or the unlock error when everything else succeeded. An earlier
 * error is never overwritten by the unlock result.
 *
 * NOTE(review): accTimes/hitTimes are bumped while holding only the read lock,
 * so concurrent readers may race on these statistics -- confirm this is
 * acceptable or switch to atomics.
 */
int32_t metaStableTagFilterCacheGet(void* pVnode, tb_uid_t suid,
  const uint8_t* pTagCondKey, int32_t tagCondKeyLen,
  const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) {

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);
  *acquireRes = 0;

  SHashObj*       pTableMap = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;

  code = taosThreadRwlockRdlock(pRwlock);
  if (TSDB_CODE_SUCCESS != code) {
    // the lock is not held: report and leave without unlocking
    lino = __LINE__;
    metaError("vgId:%d, %s failed at %s:%d since %s",
      vgId, __func__, __FILE__, lino, tstrerror(code));
    return code;
  }
  pMeta->pCache->sStableTagFilterResCache.accTimes += 1;

  // suid -> tag-column combination -> condition digest -> uid array;
  // a miss at any level is not an error
  STagConds** pTagConds =
    (STagConds**)taosHashGet(pTableMap, &suid, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pTagConds, code, lino, _end, TSDB_CODE_SUCCESS);

  STagCondFilterEntry** pFilterEntry = (STagCondFilterEntry**)taosHashGet(
    (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
  TSDB_CHECK_NULL(pFilterEntry, code, lino, _end, TSDB_CODE_SUCCESS);

  SArray** pArray = (SArray**)taosHashGet((*pFilterEntry)->set, pKey, keyLen);
  TSDB_CHECK_NULL(pArray, code, lino, _end, TSDB_CODE_SUCCESS);

  // set the result into the buffer; an empty cached list is a valid hit and
  // needs no copy
  int32_t nUids = (int32_t)taosArrayGetSize(*pArray);
  if (nUids > 0 &&
      taosArrayAddBatch(pList1, TARRAY_GET_ELEM(*pArray, 0), nUids) == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _end;
  }
  *acquireRes = 1;

  // do some bookmark work after acquiring the filter result from cache
  (*pTagConds)->hitTimes += 1;
  (*pFilterEntry)->hitTimes += 1;
  // use this cache's own access counter (>= 1 here, so the division is safe)
  uint32_t acc = pMeta->pCache->sStableTagFilterResCache.accTimes;
  if ((*pTagConds)->hitTimes % 5000 == 1) {
    metaInfo(
      "vgId:%d, suid:%" PRIu64 ", table cache hit:%d, "
      "total meta acc:%d, rate:%.2f%%, tag condition hit:%d",
      vgId, suid, (*pTagConds)->hitTimes, acc,
      (100 * (double)(*pTagConds)->hitTimes / acc), (*pFilterEntry)->hitTimes);
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    metaError("vgId:%d, %s failed at %s:%d since %s",
      vgId, __func__, __FILE__, lino, tstrerror(code));
  }

  // unlock meta; keep an earlier error if there was one
  int32_t unlockCode = taosThreadRwlockUnlock(pRwlock);
  if (TSDB_CODE_SUCCESS != unlockCode) {
    lino = __LINE__;
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
      vgId, __func__, __FILE__, lino, tstrerror(unlockCode));
    if (TSDB_CODE_SUCCESS == code) {
      code = unlockCode;
    }
  }
  return code;
}
×
689

690
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
691
  (void)ud;
692
  if (value == NULL) {
693
    return;
33,102✔
694
  }
695

33,102✔
696
  const uint64_t* p = key;
33,102✔
697
  if (keyLen != sizeof(int64_t) * 4) {
33,102✔
698
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
699
    return;
700
  }
33,102✔
701

702
  SHashObj* pHashObj = (SHashObj*)p[0];
703

38,457,532✔
704
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
38,457,532✔
705

38,458,857✔
706
  if (pEntry != NULL && (*pEntry) != NULL) {
38,457,485✔
707
    int64_t st = taosGetTimestampUs();
38,458,857✔
708
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
38,458,857✔
709
    if (code == TSDB_CODE_SUCCESS) {
710
      double el = (taosGetTimestampUs() - st) / 1000.0;
711
      metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
778,416,744✔
712
               el);
778,416,744✔
713
    }
778,476,669✔
714
  }
778,504,079✔
715

716
  taosMemoryFree(value);
778,505,842✔
717
}
778,487,002✔
718

719
/*
 * Create the bookkeeping entry of super table suid, seed its digest set with
 * pKey (keyLen bytes, no value) and register the entry pointer in pTableEntry
 * under suid.
 *
 * Returns TSDB_CODE_SUCCESS, or the failing step's error code; on failure the
 * partially built entry is released and the failure is logged.
 */
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  STagFilterResEntry* pNew = taosMemoryMalloc(sizeof(STagFilterResEntry));
  TSDB_CHECK_NULL(pNew, code, lino, _end, terrno);

  pNew->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  pNew->hitTimes = 0;
  TSDB_CHECK_NULL(pNew->set, code, lino, _end, terrno);

  // remember the digest, then publish the entry under its suid
  code = taosHashPut(pNew->set, pKey, keyLen, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _end);

  code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &pNew, POINTER_BYTES);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pNew != NULL) {
    if (pNew->set != NULL) {
      taosHashCleanup(pNew->set);
    }
    taosMemoryFree(pNew);
  }
  return code;
}
×
UNCOV
745

×
746
// check both the payload size and selectivity ratio
747
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
UNCOV
748
                              int32_t payloadLen, double selectivityRatio) {
×
UNCOV
749
  int32_t code = 0;
×
UNCOV
750
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
751
  int32_t vgId = TD_VID(pMeta->pVnode);
UNCOV
752

×
UNCOV
753
  if (selectivityRatio > tsSelectivityRatio) {
×
754
    metaDebug("vgId:%d, suid:%" PRIu64
×
755
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
UNCOV
756
              vgId, suid, selectivityRatio, tsSelectivityRatio);
×
757
    taosMemoryFree(pPayload);
×
758
    return TSDB_CODE_SUCCESS;
UNCOV
759
  }
×
UNCOV
760

×
UNCOV
761
  if (payloadLen > tsTagFilterResCacheSize) {
×
762
    metaDebug("vgId:%d, suid:%" PRIu64
×
763
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
764
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
765
    taosMemoryFree(pPayload);
×
766
    return TSDB_CODE_SUCCESS;
×
UNCOV
767
  }
×
UNCOV
768

×
769
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
770
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
UNCOV
771
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
×
772

UNCOV
773
  uint64_t key[4] = {0};
×
774
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
UNCOV
775

×
UNCOV
776
  // void* tmp = NULL;
×
UNCOV
777
  // uint32_t len = 0;
×
778
  // (void)taosAscii2Hex((const char*)key, 32, &tmp, &len);
779
  // qDebug("metaUidFilterCachePut %p %"PRId64" key: %s", pTableEntry, suid, tmp);
780
  // taosMemoryFree(tmp);
UNCOV
781

×
782
  (void)taosThreadMutexLock(pLock);
783
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
UNCOV
784
  if (pEntry == NULL) {
×
UNCOV
785
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
786
    if (code != TSDB_CODE_SUCCESS) {
787
      goto _end;
UNCOV
788
    }
×
789
  } else {  // check if it exists or not
UNCOV
790
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
UNCOV
791
    if (code == TSDB_CODE_DUP_KEY) {
×
792
      // we have already found the existed items, no need to added to cache anymore.
793
      (void)taosThreadMutexUnlock(pLock);
794
      return TSDB_CODE_SUCCESS;
×
UNCOV
795
    }
×
UNCOV
796
    if (code != TSDB_CODE_SUCCESS) {
×
797
      goto _end;
×
798
    }
799
  }
UNCOV
800

×
801
  // add to cache.
UNCOV
802
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
×
803
                           TAOS_LRU_PRIORITY_LOW, NULL);
UNCOV
804
_end:
×
UNCOV
805
  (void)taosThreadMutexUnlock(pLock);
×
UNCOV
806
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
×
UNCOV
807
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
×
UNCOV
808

×
UNCOV
809
  return code;
×
810
}
811

812
static void freeSArrayPtr(void* pp) {
813
  SArray* pArray = *(SArray**)pp;
NEW
814
  taosArrayDestroy(pArray);
×
815
}
816

NEW
817
// Cache the uid list of a tag filter result for super table `suid`.
//
// Structures touched while holding sStableTagFilterResCache.rwlock for writing:
//   pTableEntry              : suid        -> STagConds*
//   STagConds.set            : pTagCondKey -> STagCondFilterEntry* (keeps the tag column ids)
//   STagCondFilterEntry.set  : pKey        -> SArray* (a copy of pUidList made here)
//
// pUidList is only duplicated; the caller keeps its own list.
// pTagColIds is duplicated when a new tag-condition entry is created and destroyed when the
// entry already exists (same as before this change).
// NOTE(review): on the new-entry path and on error paths pTagColIds is not released here -
// confirm that the caller releases it in those cases.
//
// Returns TSDB_CODE_SUCCESS when the list was cached or the same key is already cached,
// otherwise the first error that occurred.
int32_t metaStableTagFilterCachePut(
  void* pVnode, uint64_t suid, const void* pTagCondKey, int32_t tagCondKeyLen,
  const void* pKey, int32_t keyLen, SArray* pUidList, SArray* pTagColIds) {

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  SHashObj*       pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;

  STagConds**           pTagConds = NULL;
  STagCondFilterEntry** pFilterEntry = NULL;

  // Objects built here that no hash table owns yet. Each one is reset to NULL as soon as it
  // has been stored, so whatever is still set at _end must be released.
  STagConds*           pNewConds = NULL;
  STagCondFilterEntry* pNewFilter = NULL;
  SArray*              pPayload = NULL;

  code = taosThreadRwlockWrlock(pRwlock);
  if (TSDB_CODE_SUCCESS != code) {
    // the lock is not held: return directly instead of running the unlock at _end
    lino = __LINE__;
    metaError("vgId:%d, %s failed at %s:%d since %s",
      vgId, __func__, __FILE__, lino, tstrerror(code));
    return code;
  }

  pTagConds = (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
  if (pTagConds == NULL) {
    // add new (suid -> tag conds) entry
    pNewConds = (STagConds*)taosMemoryMalloc(sizeof(STagConds));
    TSDB_CHECK_NULL(pNewConds, code, lino, _end, terrno);

    pNewConds->hitTimes = 0;
    pNewConds->set = taosHashInit(
      1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
      false, HASH_NO_LOCK);
    // check the new hash before installing the free callback on it
    TSDB_CHECK_NULL(pNewConds->set, code, lino, _end, terrno);
    taosHashSetFreeFp(pNewConds->set, freeTagFilterEntryFp);

    code = taosHashPut(
      pTableEntry, &suid, sizeof(uint64_t), &pNewConds, POINTER_BYTES);
    TSDB_CHECK_CODE(code, lino, _end);
    pNewConds = NULL;  // stored in pTableEntry

    pTagConds = (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
    TSDB_CHECK_NULL(pTagConds, code, lino, _end, terrno);
  }

  pFilterEntry = (STagCondFilterEntry**)taosHashGet(
    (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
  if (pFilterEntry == NULL) {
    // add new (tag cond -> filter entry) entry
    pNewFilter = (STagCondFilterEntry*)taosMemoryMalloc(sizeof(STagCondFilterEntry));
    TSDB_CHECK_NULL(pNewFilter, code, lino, _end, terrno);

    pNewFilter->hitTimes = 0;
    pNewFilter->pColIds = NULL;  // never leave the field uninitialised in a stored entry
    pNewFilter->set = taosHashInit(
      1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
      false, HASH_NO_LOCK);
    TSDB_CHECK_NULL(pNewFilter->set, code, lino, _end, terrno);
    taosHashSetFreeFp(pNewFilter->set, freeSArrayPtr);

    code = taosHashPut(
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen, &pNewFilter, POINTER_BYTES);
    TSDB_CHECK_CODE(code, lino, _end);
    pNewFilter = NULL;  // stored in the tag-condition table

    pFilterEntry = (STagCondFilterEntry**)taosHashGet(
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
    TSDB_CHECK_NULL(pFilterEntry, code, lino, _end, terrno);
    (*pFilterEntry)->pColIds = taosArrayDup(pTagColIds, NULL);
  } else {
    // pColIds is already set, so we can destroy the new one
    taosArrayDestroy(pTagColIds);
  }

  // add to cache.
  pPayload = taosArrayDup(pUidList, NULL);
  TSDB_CHECK_NULL(pPayload, code, lino, _end, terrno);

  code = taosHashPut(
    (*pFilterEntry)->set, pKey, keyLen, &pPayload, POINTER_BYTES);
  if (code == TSDB_CODE_DUP_KEY) {
    // the same result is already cached: keep the existing list, drop our copy at _end
    code = TSDB_CODE_SUCCESS;
    goto _end;
  }
  TSDB_CHECK_CODE(code, lino, _end);
  pPayload = NULL;  // stored in the filter entry

_end:
  // release whatever was built but never stored
  if (pPayload != NULL) {
    taosArrayDestroy(pPayload);
  }
  if (pNewFilter != NULL) {
    if (pNewFilter->set != NULL) {
      taosHashCleanup(pNewFilter->set);
    }
    taosMemoryFree(pNewFilter);
  }
  if (pNewConds != NULL) {
    if (pNewConds->set != NULL) {
      taosHashCleanup(pNewConds->set);
    }
    taosMemoryFree(pNewConds);
  }

  if (TSDB_CODE_SUCCESS != code) {
    metaError("vgId:%d, %s failed at %s:%d since %s",
      vgId, __func__, __FILE__, lino, tstrerror(code));
  } else {
    metaDebug("vgId:%d, suid:%" PRIu64 " new tag filter cache added into cache,"
      " num stable:%d, tag conditions:%d",
      vgId, suid, (int32_t)taosHashGetSize(pTableEntry),
      pTagConds ? (int32_t)taosHashGetSize((*pTagConds)->set) : 0);
  }

  // unlock meta; an unlock failure must not hide an earlier error
  int32_t unlockCode = taosThreadRwlockUnlock(pRwlock);
  if (TSDB_CODE_SUCCESS != unlockCode) {
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
      vgId, __func__, __FILE__, lino, tstrerror(unlockCode));
    if (TSDB_CODE_SUCCESS == code) {
      code = unlockCode;
    }
  }

  return code;
}
×
908

909
// drop all the cache entries for a super table 
910
int32_t metaStableTagFilterCacheDropSTable(
1,444,570,570✔
911
  SMeta* pMeta, tb_uid_t suid) {
912
  if (pMeta == NULL) {
913
    return TSDB_CODE_INVALID_PARA;
242,004✔
914
  }
242,004✔
915
  int32_t   lino = 0;
94,754✔
916
  int32_t   code = TSDB_CODE_SUCCESS;
917
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
918
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
147,250✔
919

147,250✔
920
  code = taosThreadRwlockWrlock(pRwlock);
921
  TSDB_CHECK_CODE(code, lino, _end);
NEW
922
  code = taosHashRemove(pTableEntry, &suid, sizeof(tb_uid_t));
×
923
  TSDB_CHECK_CODE(code, lino, _end);
924

925
_end:
152,296✔
926
  if (TSDB_CODE_SUCCESS != code) {
152,296✔
927
    metaError("vgId:%d, %s failed at %s:%d since %s",
152,296✔
928
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
NEW
929
  } else {
×
930
    metaDebug(
931
      "vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache",
932
      TD_VID(pMeta->pVnode), suid);
38,470,955✔
933
  }
934
  code = taosThreadRwlockUnlock(pRwlock);
38,470,955✔
935
  if (TSDB_CODE_SUCCESS != code) {
38,470,955✔
936
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
38,470,955✔
937
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
938
  }
38,470,955✔
939
  return code;
38,467,719✔
940
}
7,750✔
941

7,750✔
942
static int32_t buildTagDataEntryKey(
38,459,969✔
NEW
943
  const SArray* pColIds, STag* pTag, T_MD5_CTX* pContext) {
×
NEW
944
  int32_t code = TSDB_CODE_SUCCESS;
×
945
  int32_t keyLen = 0;
946
  // get length first
38,467,719✔
947
  for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
155,000✔
948
    STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
147,250✔
949
    if (tTagGet(pTag, &pTagValue)) {
950
      keyLen += sizeof(col_id_t);
951
      if (IS_VAR_DATA_TYPE(pTagValue.type)) {
952
        keyLen += pTagValue.nData;
953
      } else {
38,467,719✔
954
        keyLen += tDataTypes[pTagValue.type].bytes;
955
      }
956
    } else {
1,090,199✔
957
      // tag value not found
1,090,199✔
958
      code = TSDB_CODE_NOT_FOUND;
959
      return code;
1,090,199✔
960
    }
881,604✔
961
  }
962

963
  char* pKey = taosMemoryCalloc(1, keyLen);
206,924✔
964
  if (NULL == pKey) {
206,924✔
965
    code = terrno;
205,274✔
966
    return code;
205,274✔
967
  }
205,274✔
968

969
  // build the key
205,274✔
970
  char* pStart = pKey;
205,274✔
971
  for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
972
    STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
973
    if (tTagGet(pTag, &pTagValue)) {
1,650✔
974
      // copy cid
975
      memcpy(pStart, &pTagValue.cid, sizeof(col_id_t));
976
      pStart += sizeof(col_id_t);
2,196,485✔
977
      // copy value
2,196,485✔
978
      if (IS_VAR_DATA_TYPE(pTagValue.type)) {
2,196,485✔
979
        int32_t varLen = pTagValue.nData;
2,196,485✔
980
        memcpy(pStart, pTagValue.pData, varLen);
2,196,485✔
981
        pStart += varLen;
982
      } else {
2,196,485✔
983
        int32_t typeLen = tDataTypes[pTagValue.type].bytes;
984
        memcpy(pStart, &pTagValue.i64, typeLen);
2,196,485✔
985
        pStart += typeLen;
2,196,485✔
986
      }
2,173,860✔
987
    } else {
988
      // tag value not found
989
      taosMemoryFree(pKey);
22,625✔
990
      code = TSDB_CODE_NOT_FOUND;
991
      return code;
22,625✔
992
    }
993
  }
15,387✔
994

2,196,485✔
995
  // update MD5
2,196,485✔
996
  tMD5Init(pContext);
997
  tMD5Update(pContext, (uint8_t*)pKey, (uint32_t)keyLen);
998
  tMD5Final(pContext);
14,517,303✔
999

14,517,303✔
1000
  return code;
14,517,303✔
1001
}
14,517,303✔
1002

14,531,698✔
1003
// remove the dropped table uid from all cache entries
14,543,713✔
1004
// pDroppedTable is the dropped child table meta entry
1005
int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
14,535,869✔
1006
  const SMetaEntry* pDroppedTable, ETagFilterCacheAction action) {
1007
  if (pMeta == NULL || pDroppedTable == NULL) {
14,538,219✔
1008
    return TSDB_CODE_INVALID_PARA;
14,534,950✔
1009
  }
14,350,842✔
1010
  int32_t   lino = 0;
50,087,134✔
1011
  int32_t   code = TSDB_CODE_SUCCESS;
35,725,893✔
1012
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
35,715,523✔
1013
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
35,715,523✔
1014

35,720,255✔
1015
  code = taosThreadRwlockWrlock(pRwlock);
35,721,118✔
1016
  TSDB_CHECK_CODE(code, lino, _end);
35,721,118✔
1017

35,686,661✔
1018
  tb_uid_t suid = pDroppedTable->ctbEntry.suid;;
35,686,661✔
1019
  STagConds** pTagConds =
35,712,240✔
1020
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
35,712,240✔
1021
  if (pTagConds != NULL) {
1022
    STagCondFilterEntry** ppFilterEntry = NULL;
1023
    while (ppFilterEntry = taosHashIterate((*pTagConds)->set, ppFilterEntry)) {
1024
      STagCondFilterEntry* pFilterEntry = *ppFilterEntry;
14,545,349✔
1025
      // rebuild the tagCondKey and check existence
14,540,544✔
NEW
1026
      SArray* pColIds = pFilterEntry->pColIds;
×
1027
      // rebuild the tagCondFilterKey
1028
      int32_t keyLen = 0;
14,540,544✔
1029
      char*   pKey = NULL;
14,540,549✔
1030
      T_MD5_CTX context = {0};
1031
      code = buildTagDataEntryKey(pColIds, (STag*)pDroppedTable->ctbEntry.pTags, &context);
1032
      if (code != TSDB_CODE_SUCCESS) {
184,108✔
1033
        metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition key for dropped table uid:%" PRIu64
184,108✔
1034
          " since %s",
184,108✔
1035
          TD_VID(pMeta->pVnode), suid, pDroppedTable->uid, tstrerror(code));
184,108✔
1036
        goto _end;
1037
      }
184,108✔
1038

184,108✔
1039
      SArray** pArray = (SArray**)taosHashGet(
1040
        pFilterEntry->set, context.digest, tListLen(context.digest));
184,108✔
1041
      if (pArray != NULL) {
184,108✔
1042
        // check and remove the dropped table uid from the array
1043
        // TODO(Tony Zhang): optimize this scan
184,108✔
1044
        if (action == STABLE_TAG_FILTER_CACHE_DROP_TABLE) {
1045
          for (int32_t i = 0; i < taosArrayGetSize(*pArray); i++) {
184,108✔
1046
            uint64_t uid = *(uint64_t*)taosArrayGet(*pArray, i);
184,108✔
NEW
1047
            if (uid == pDroppedTable->uid) {
×
1048
              taosArrayRemove(*pArray, i);
1049
              metaDebug("vgId:%d, suid:%" PRIu64 " removed dropped table uid:%" PRIu64
184,108✔
1050
                " from stable tag filter cache",
1051
                TD_VID(pMeta->pVnode), suid, pDroppedTable->uid);
1052
              break;
204,563✔
1053
            } 
204,563✔
1054
          }
204,563✔
1055
        } else {
204,563✔
1056
          // STABLE_TAG_FILTER_CACHE_ADD_TABLE
204,563✔
1057
          taosArrayPush(*pArray, &pDroppedTable->uid);
204,563✔
1058
        }
1059
      }
204,563✔
1060
    }
1061
  }
204,563✔
1062

204,563✔
1063
_end:
204,563✔
1064
  if (TSDB_CODE_SUCCESS != code) {
184,108✔
1065
    metaError("vgId:%d, %s failed at %s:%d since %s",
184,108✔
1066
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1067
  } else {
20,455✔
1068
    metaDebug(
1069
      "vgId:%d, suid:%" PRIu64 " dropped table uid:%" PRIu64
1070
      " removed from stable tag filter cache",
468,478✔
1071
      TD_VID(pMeta->pVnode),
263,915✔
1072
      pDroppedTable->ctbEntry.suid, pDroppedTable->uid);
263,915✔
1073
  }
263,915✔
1074
  code = taosThreadRwlockUnlock(pRwlock);
263,915✔
1075
  if (TSDB_CODE_SUCCESS != code) {
263,915✔
1076
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
1077
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1078
  }
1079
  return code;
204,563✔
1080
}
204,563✔
NEW
1081

×
1082
int32_t metaStableTagFilterCacheDropTag(SMeta* pMeta,
1083
  tb_uid_t suid, col_id_t cid) {
204,563✔
1084
  if (pMeta == NULL) {
1085
    return TSDB_CODE_INVALID_PARA;
204,563✔
1086
  }
1087
  int32_t   lino = 0;
1088
  int32_t   code = TSDB_CODE_SUCCESS;
1089
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
1090
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
1091

1092
  code = taosThreadRwlockWrlock(pRwlock);
1093
  TSDB_CHECK_CODE(code, lino, _end);
1094

1095
  STagConds** pTagConds =
1096
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
1097
  if (pTagConds != NULL) {
1098
    void* pIter = taosHashIterate((*pTagConds)->set, NULL);
1099
    while (pIter) {
1100
      STagCondFilterEntry* pFilterEntry = *(STagCondFilterEntry**)pIter;
1101
      bool found = false;
1102
      for (int32_t i = 0; i < taosArrayGetSize(pFilterEntry->pColIds); i++) {
1103
        col_id_t existCid = *(col_id_t*)taosArrayGet(pFilterEntry->pColIds, i);
1104
        if (existCid == cid) {
1105
          found = true;
1106
          break;
1107
        }
1108
      }
1109
      if (found) {
1110
        size_t keyLen = 0;
1111
        char  *key = (char *)taosHashGetKey(pIter, &keyLen);
1112
        code = taosHashRemove((*pTagConds)->set, key, keyLen);
1113
        TSDB_CHECK_CODE(code, lino, _end);
1114
      }
1115
      pIter = taosHashIterate((*pTagConds)->set, pIter);
1116
    }
1117
  }
1118
_end:
1119
  if (TSDB_CODE_SUCCESS != code) {
1120
    metaError("vgId:%d, %s failed at %s:%d since %s",
1121
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1122
  } else {
1123
    metaDebug(
1124
      "vgId:%d, suid:%" PRIu64 " dropped tag cid:%d "
1125
      "from stable tag filter cache",
1126
      TD_VID(pMeta->pVnode), suid, cid);
1127
  }
1128
  code = taosThreadRwlockUnlock(pRwlock);
1129
  if (TSDB_CODE_SUCCESS != code) {
1130
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
1131
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1132
  }
1133
  return code;
1134
}
1135

1136
// Reset the meta cache: close it and open it again while holding the meta write lock.
void metaCacheClear(SMeta* pMeta) {
  metaWLock(pMeta);

  // close the current cache, then open a new one; the result of the open is discarded
  metaCacheClose(pMeta);
  (void)metaCacheOpen(pMeta);

  metaULock(pMeta);
}
1142

1143
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
1144
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
1145
  uint64_t  p[4] = {0};
1146
  int32_t   vgId = TD_VID(pMeta->pVnode);
1147
  SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
1148

1149
  uint64_t dummy[2] = {0};
1150
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
1151

1152
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
1153
  (void)taosThreadMutexLock(pLock);
1154

1155
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
1156
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
1157
    (void)taosThreadMutexUnlock(pLock);
1158
    return TSDB_CODE_SUCCESS;
1159
  }
1160

1161
  (*pEntry)->hitTimes = 0;
1162

1163
  char *iter = taosHashIterate((*pEntry)->set, NULL);
1164
  while (iter != NULL) {
1165
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
1166
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
1167
    iter = taosHashIterate((*pEntry)->set, iter);
1168
  }
1169
  taosHashClear((*pEntry)->set);
1170
  (void)taosThreadMutexUnlock(pLock);
1171

1172
  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
1173
  return TSDB_CODE_SUCCESS;
1174
}
1175

1176
// Look up the cached table-group result of super table `suid` for the given key.
//
// *pList is set to NULL first. On a cache hit it receives a copy of the cached array made
// with taosArrayDup(); on a miss it stays NULL and TSDB_CODE_SUCCESS is returned.
// TSDB_CODE_NOT_FOUND is returned when the LRU cache has the key but the super table has no
// bookkeeping entry.
//
// The access counter is incremented on every call, the per-table hit counter on every hit.
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  // generate the composed key for LRU cache
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  *pList = NULL;
  uint64_t key[4] = {0};
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  (void)taosThreadMutexLock(pLock);
  pMeta->pCache->STbGroupResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    (void)taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
    // give back the handle and the mutex taken above before bailing out; returning with
    // the mutex held would block every later user of this cache
    (void)taosLRUCacheRelease(pCache, pHandle, false);
    (void)taosThreadMutexUnlock(pLock);
    return TSDB_CODE_NOT_FOUND;
  }

  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);

  (*pEntry)->hitTimes += 1;

  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
             ((double)(*pEntry)->hitTimes) / acc);
  }

  (void)taosLRUCacheRelease(pCache, pHandle, false);

  // unlock meta
  (void)taosThreadMutexUnlock(pLock);
  return TSDB_CODE_SUCCESS;
}
1220

1221
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
1222
  (void)ud;
1223
  if (value == NULL) {
1224
    return;
1225
  }
1226

1227
  const uint64_t* p = key;
1228
  if (keyLen != sizeof(int64_t) * 4) {
1229
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
1230
    return;
1231
  }
1232

1233
  SHashObj* pHashObj = (SHashObj*)p[0];
1234

1235
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
1236

1237
  if (pEntry != NULL && (*pEntry) != NULL) {
1238
    int64_t st = taosGetTimestampUs();
1239
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
1240
    if (code == TSDB_CODE_SUCCESS) {
1241
      double el = (taosGetTimestampUs() - st) / 1000.0;
1242
      metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
1243
                taosHashGetSize((*pEntry)->set), el);
1244
    }
1245
  }
1246

1247
  taosArrayDestroy((SArray*)value);
1248
}
1249

1250
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
1251
                              int32_t payloadLen) {
1252
  int32_t code = 0;
1253
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
1254
  int32_t vgId = TD_VID(pMeta->pVnode);
1255

1256
  if (payloadLen > tsTagFilterResCacheSize) {
1257
    metaDebug("vgId:%d, suid:%" PRIu64
1258
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
1259
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
1260
    taosArrayDestroy((SArray*)pPayload);
1261
    return TSDB_CODE_SUCCESS;
1262
  }
1263

1264
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
1265
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
1266
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
1267

1268
  uint64_t key[4] = {0};
1269
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
1270

1271
  (void)taosThreadMutexLock(pLock);
1272
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
1273
  if (pEntry == NULL) {
1274
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
1275
    if (code != TSDB_CODE_SUCCESS) {
1276
      goto _end;
1277
    }
1278
  } else {  // check if it exists or not
1279
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
1280
    if (code == TSDB_CODE_DUP_KEY) {
1281
      // we have already found the existed items, no need to added to cache anymore.
1282
      (void)taosThreadMutexUnlock(pLock);
1283
      return TSDB_CODE_SUCCESS;
1284
    }
1285
    if (code != TSDB_CODE_SUCCESS) {
1286
      goto _end;
1287
    }
1288
  }
1289

1290
  // add to cache.
1291
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
1292
                           TAOS_LRU_PRIORITY_LOW, NULL);
1293
_end:
1294
  (void)taosThreadMutexUnlock(pLock);
1295
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
1296
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
1297

1298
  return code;
1299
}
1300

1301
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
1302
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
1303
  uint64_t  p[4] = {0};
1304
  int32_t   vgId = TD_VID(pMeta->pVnode);
1305
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
1306

1307
  uint64_t dummy[2] = {0};
1308
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
1309

1310
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
1311
  (void)taosThreadMutexLock(pLock);
1312

1313
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
1314
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
1315
    (void)taosThreadMutexUnlock(pLock);
1316
    return TSDB_CODE_SUCCESS;
1317
  }
1318

1319
  (*pEntry)->hitTimes = 0;
1320

1321
  char *iter = taosHashIterate((*pEntry)->set, NULL);
1322
  while (iter != NULL) {
1323
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
1324
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
1325
    iter = taosHashIterate((*pEntry)->set, iter);
1326
  }
1327
  taosHashClear((*pEntry)->set);
1328
  (void)taosThreadMutexUnlock(pLock);
1329

1330
  metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
1331
  return TSDB_CODE_SUCCESS;
1332
}
1333

1334
// Tell whether `key` is present in the table filter cache.
//   type 0: key is looked up in pStb with a length of sizeof(tb_uid_t)
//   type 1: key is looked up in pStbName with a length of strlen(key)
// Any other type yields false.
bool metaTbInFilterCache(SMeta* pMeta, const void* key, int8_t type) {
  switch (type) {
    case 0:
      return taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t)) != NULL;
    case 1:
      return taosHashGet(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key)) != NULL;
    default:
      return false;
  }
}
1345

1346
// Add `key` to the table filter cache.
//   type 0: key is stored in pStb with a length of sizeof(tb_uid_t)
//   type 1: key is stored in pStbName with a length of strlen(key)
// Returns the result of taosHashPut(), or 0 for any other type (nothing is stored).
int32_t metaPutTbToFilterCache(SMeta* pMeta, const void* key, int8_t type) {
  switch (type) {
    case 0:
      return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
    case 1:
      return taosHashPut(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key), NULL, 0);
    default:
      return 0;
  }
}
1357

1358
// Number of entries in the pStb part of the table filter cache for type 0; 0 for every other
// type.
int32_t metaSizeOfTbFilterCache(SMeta* pMeta, int8_t type) {
  return (type == 0) ? taosHashGetSize(pMeta->pCache->STbFilterCache.pStb) : 0;
}
1364

1365
// Pre-populate the name part of the table filter cache.
//
// Only does something when TD_ENTERPRISE is defined: the part of the vnode's db name after
// the first '.' selects the table-name list to load ("log" -> tkLogStb, "audit" ->
// tkAuditStb); every name of the selected list is put into the cache with type 1.
//
// Returns 0, or leaves early through TAOS_CHECK_RETURN when a put fails.
int32_t metaInitTbFilterCache(SMeta* pMeta) {
#ifdef TD_ENTERPRISE
  const char* pDot = strchr(pMeta->pVnode->config.dbname, '.');
  if (pDot == NULL) {
    return 0;
  }
  const char* dbName = pDot + 1;

  int32_t      tbNum = 0;
  const char** pTbArr = NULL;
  if (strncmp(dbName, "log", TSDB_DB_NAME_LEN) == 0) {
    tbNum = tkLogStbNum;
    pTbArr = (const char**)&tkLogStb;
  } else if (strncmp(dbName, "audit", TSDB_DB_NAME_LEN) == 0) {
    tbNum = tkAuditStbNum;
    pTbArr = (const char**)&tkAuditStb;
  }

  if (tbNum && pTbArr) {
    for (int32_t i = 0; i < tbNum; ++i) {
      TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, pTbArr[i], 1));
    }
  }
#endif
  return 0;
}
1388

1389
// Return the `keep` value of the table identified by `uid`.
//
// The stats cache is consulted first. Otherwise the meta entry is fetched: a super table
// yields its stbEntry.keep, any other table type yields -1. -1 is also returned when the
// entry cannot be fetched.
int64_t metaGetStbKeep(SMeta* pMeta, int64_t uid) {
  SMetaStbStats cached = {0};
  if (metaStatsCacheGet(pMeta, uid, &cached) == TSDB_CODE_SUCCESS) {
    return cached.keep;
  }

  SMetaEntry* pEntry = NULL;
  if (metaFetchEntryByUid(pMeta, uid, &pEntry) != TSDB_CODE_SUCCESS) {
    return -1;
  }

  int64_t keep = -1;
  if (pEntry->type == TSDB_SUPER_TABLE) {
    keep = pEntry->stbEntry.keep;
  }
  metaFetchEntryFree(&pEntry);
  return keep;
}
1408

1409
// Remove the cached ref-db set of super table `suid`, if there is one, while holding the
// ref-db cache mutex. Returns TSDB_CODE_SUCCESS when nothing is cached, otherwise the result
// of taosHashRemove().
int32_t metaRefDbsCacheClear(SMeta* pMeta, uint64_t suid) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        vgId = TD_VID(pMeta->pVnode);
  SHashObj*      pStbRefs = pMeta->pCache->STbRefDbCache.pStbRefs;
  TdThreadMutex* pMutex = &pMeta->pCache->STbRefDbCache.lock;

  (void)taosThreadMutexLock(pMutex);

  SHashObj** ppRefs = taosHashGet(pStbRefs, &suid, sizeof(uint64_t));
  if (ppRefs != NULL) {
    code = taosHashRemove(pStbRefs, &suid, sizeof(uint64_t));

    metaDebug("vgId:%d suid:%" PRId64 " cached virtual stable ref db cleared", vgId, suid);
  }

  (void)taosThreadMutexUnlock(pMutex);
  return code;
}
1430

1431
// Append a copy of every db name cached for super table `suid` to pList.
//
// Each name is copied into a buffer obtained from taosMemoryMalloc() and the buffer pointer
// is pushed to pList. Nothing is appended when the super table has no cached set.
//
// Returns TSDB_CODE_SUCCESS, or terrno of the failing step. On failure the names pushed
// before the failing one stay in pList.
int32_t metaGetCachedRefDbs(void* pVnode, tb_uid_t suid, SArray* pList) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        line = 0;
  SMeta*         pMeta = ((SVnode*)pVnode)->pMeta;
  SHashObj*      pTableMap = pMeta->pCache->STbRefDbCache.pStbRefs;
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;

  void* iter = NULL;    // current position in the name set, NULL when not iterating
  char* dbName = NULL;  // copy that has not been handed to pList yet

  (void)taosThreadMutexLock(pLock);

  SHashObj** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (pEntry) {
    iter = taosHashIterate(*pEntry, NULL);
    while (iter != NULL) {
      size_t   dbNameLen = 0;
      char*    name = NULL;
      name = taosHashGetKey(iter, &dbNameLen);
      TSDB_CHECK_NULL(name, code, line, _return, terrno);
      dbName = taosMemoryMalloc(dbNameLen + 1);
      TSDB_CHECK_NULL(dbName, code, line, _return, terrno);
      tstrncpy(dbName, name, dbNameLen + 1);
      TSDB_CHECK_NULL(taosArrayPush(pList, &dbName), code, line, _return, terrno);
      dbName = NULL;  // the pointer now lives in pList
      iter = taosHashIterate(*pEntry, iter);
    }
  }

_return:
  if (code) {
    metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
    // a copy that never reached pList would otherwise leak
    taosMemoryFree(dbName);
    // the loop was left early: give the iterator back
    if (iter != NULL) {
      taosHashCancelIterate(*pEntry, iter);
    }
  }
  (void)taosThreadMutexUnlock(pLock);
  return code;
}
1464

1465
/*
 * Create an empty hash set for `suid` and register it in `pRefDbs`.
 *
 * A new SHashObj is created (binary keys, no internal locking) and its
 * pointer is stored in `pRefDbs` under the 8-byte `suid` key. On success the
 * new set is also returned through `pEntry`.
 *
 * @param pRefDbs  outer map from suid to SHashObj*
 * @param suid     key under which the new set is stored
 * @param pEntry   out: receives the new set; not written on failure
 * @return TSDB_CODE_SUCCESS, terrno if the set cannot be created, or the
 *         error code returned by taosHashPut.
 *
 * If the insert into `pRefDbs` fails, the new set is destroyed here so that
 * it does not leak.
 */
static int32_t addRefDbsCacheNewEntry(SHashObj* pRefDbs, uint64_t suid, SHashObj **pEntry) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SHashObj* p = NULL;

  p = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(p, code, lino, _end, terrno);

  code = taosHashPut(pRefDbs, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
  TSDB_CHECK_CODE(code, lino, _end);

  *pEntry = p;
  p = NULL;  // now referenced from pRefDbs, must not be destroyed below

_end:
  if (code != TSDB_CODE_SUCCESS) {
    metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // the set was never stored in pRefDbs, so nobody else can release it
    if (p != NULL) {
      taosHashCleanup(p);
    }
  }
  return code;
}
1484

1485
/*
 * Add the db names in `pList` to the set cached for super table `suid`.
 *
 * Runs under the STbRefDbCache mutex. The per-suid set is looked up in the
 * cache and created through addRefDbsCacheNewEntry when it does not exist
 * yet. Each name from `pList` is then inserted as a key with no value,
 * unless that key is already present.
 *
 * @param pVnode  vnode handle; it is cast to SVnode* to reach the meta cache
 * @param suid    uid used as the key in the ref-db cache
 * @param pList   array whose elements are read as char* db names
 * @return 0 on success, otherwise the first error code returned while
 *         creating the set or inserting a name.
 */
int32_t metaPutRefDbsToCache(void* pVnode, tb_uid_t suid, SArray* pList) {
  int32_t        code = 0;
  int32_t        line = 0;
  SMeta*         pMeta = ((SVnode*)pVnode)->pMeta;
  SHashObj*      pStbRefs = pMeta->pCache->STbRefDbCache.pStbRefs;
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
  SHashObj*      pDbSet = NULL;

  (void)taosThreadMutexLock(pLock);

  SHashObj** ppFound = taosHashGet(pStbRefs, &suid, sizeof(uint64_t));
  if (ppFound != NULL) {
    // reuse the set that is already cached for this suid
    pDbSet = *ppFound;
  } else {
    code = addRefDbsCacheNewEntry(pStbRefs, suid, &pDbSet);
    TSDB_CHECK_CODE(code, line, _return);
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(pList); ++idx) {
    char*  dbName = taosArrayGetP(pList, idx);
    size_t nameLen = strlen(dbName);

    // names that are already cached are left as they are
    if (taosHashGet(pDbSet, dbName, nameLen) != NULL) {
      continue;
    }

    code = taosHashPut(pDbSet, dbName, nameLen, NULL, 0);
    TSDB_CHECK_CODE(code, line, _return);
  }

_return:
  if (code) {
    metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  }
  (void)taosThreadMutexUnlock(pLock);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc