• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4847

11 Nov 2025 05:50AM UTC coverage: 62.651% (+0.3%) from 62.306%
#4847

push

travis-ci

web-flow
Merge e78cd6509 into 47a2ea7a0

542 of 650 new or added lines in 16 files covered. (83.38%)

1515 existing lines in 91 files now uncovered.

113826 of 181682 relevant lines covered (62.65%)

113230552.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.76
/source/dnode/vnode/src/meta/metaCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "meta.h"
16

17
#ifdef TD_ENTERPRISE
18
extern const char* tkLogStb[];
19
extern const char* tkAuditStb[];
20
extern const int   tkLogStbNum;
21
extern const int   tkAuditStbNum;
22
#endif
23

24
#define TAG_FILTER_RES_KEY_LEN  32
25
#define META_CACHE_BASE_BUCKET  1024
26
#define META_CACHE_STATS_BUCKET 16
27

28
// (uid , suid) : child table
29
// (uid,     0) : normal table
30
// (suid, suid) : super table
31
typedef struct SMetaCacheEntry SMetaCacheEntry;
32
struct SMetaCacheEntry {
33
  SMetaCacheEntry* next;
34
  SMetaInfo        info;
35
};
36

37
typedef struct SMetaStbStatsEntry {
38
  struct SMetaStbStatsEntry* next;
39
  SMetaStbStats              info;
40
} SMetaStbStatsEntry;
41

42
typedef struct STagFilterResEntry {
43
  SHashObj *set;    // the set of md5 digest, extracted from the serialized tag query condition
44
  uint32_t hitTimes;  // queried times for current super table
45
} STagFilterResEntry;
46

47
typedef struct STagCondFilterEntry {
48
  SArray*   pColIds;   // SArray<col_id_t>
49
  SHashObj* set;       // SHashObj<digest, SArray<uid>>
50
  uint32_t  hitTimes;  // queried times for current tag filter condition
51
} STagCondFilterEntry;
52

53
typedef struct STagConds {
54
  SHashObj* set;       // SHashObj<tagColIdStr, STagCondFilterEntry>
55
  uint32_t  hitTimes;  // queried times for current super table
56
  uint32_t  numTagDataEntries; // total num of tag data entries in this stable
57
} STagConds;
58

59
struct SMetaCache {
60
  // child, normal, super, table entry cache
61
  struct SEntryCache {
62
    int32_t           nEntry;
63
    int32_t           nBucket;
64
    SMetaCacheEntry** aBucket;
65
  } sEntryCache;
66

67
  // stable stats cache
68
  struct SStbStatsCache {
69
    int32_t              nEntry;
70
    int32_t              nBucket;
71
    SMetaStbStatsEntry** aBucket;
72
  } sStbStatsCache;
73

74
  // query cache
75
  struct STagFilterResCache {
76
    TdThreadMutex lock;
77
    uint32_t      accTimes;
78
    SHashObj*     pTableEntry;
79
    SLRUCache*    pUidResCache;
80
  } sTagFilterResCache;
81

82
  // cache table list for tag filter conditions
83
  // that match format "tag1 = v1 AND tag2 = v2 AND ..."
84
  struct SStableTagFilterResCache {
85
    TdThreadRwlock rwlock;
86
    SHashObj*      pTableEntry;  // HashObj<suid, STagConds>
87
    // access times
88
    uint64_t       accTimes;
8,015,149✔
89
    // hit times
8,015,149✔
90
    uint64_t       hitTimes;
91
    // total num of tag data entries in all stables
2,147,483,647✔
92
    uint32_t       numTagDataEntries;
2,147,483,647✔
93
  } sStableTagFilterResCache;
2,147,483,647✔
94

73,349,220✔
95
  struct STbGroupResCache {
73,349,220✔
96
    TdThreadMutex lock;
73,348,875✔
97
    uint32_t      accTimes;
98
    SHashObj*     pTableEntry;
99
    SLRUCache*    pResCache;
8,015,944✔
100
  } STbGroupResCache;
101

8,015,944✔
102
  struct STbFilterCache {
103
    SHashObj* pStb;
8,015,944✔
104
    SHashObj* pStbName;
8,015,944✔
105
  } STbFilterCache;
106

141,561,983✔
107
  struct STbRefDbCache {
133,547,617✔
108
    TdThreadMutex lock;
139,644,456✔
109
    SHashObj*     pStbRefs; // key: suid, value: SHashObj<dbName, refTimes>
6,098,417✔
110
  } STbRefDbCache;
6,098,417✔
111
};
6,095,607✔
112

113
static void entryCacheClose(SMeta* pMeta) {
114
  if (pMeta->pCache) {
8,014,978✔
115
    // close entry cache
116
    for (int32_t iBucket = 0; iBucket < pMeta->pCache->sEntryCache.nBucket; iBucket++) {
8,015,944✔
117
      SMetaCacheEntry* pEntry = pMeta->pCache->sEntryCache.aBucket[iBucket];
118
      while (pEntry) {
1,197✔
119
        SMetaCacheEntry* tEntry = pEntry->next;
1,197✔
120
        taosMemoryFree(pEntry);
1,197✔
121
        pEntry = tEntry;
1,197✔
122
      }
1,197✔
123
    }
124
    taosMemoryFree(pMeta->pCache->sEntryCache.aBucket);
23,500✔
125
  }
23,500✔
126
}
23,500✔
127

23,500✔
128
static void statsCacheClose(SMeta* pMeta) {
23,500✔
129
  if (pMeta->pCache) {
130
    // close entry cache
8,003,398✔
131
    for (int32_t iBucket = 0; iBucket < pMeta->pCache->sStbStatsCache.nBucket; iBucket++) {
8,003,398✔
132
      SMetaStbStatsEntry* pEntry = pMeta->pCache->sStbStatsCache.aBucket[iBucket];
133
      while (pEntry) {
134
        SMetaStbStatsEntry* tEntry = pEntry->next;
8,003,398✔
135
        taosMemoryFree(pEntry);
8,014,523✔
UNCOV
136
        pEntry = tEntry;
×
137
      }
138
    }
139
    taosMemoryFree(pMeta->pCache->sStbStatsCache.aBucket);
140
  }
8,013,772✔
141
}
8,015,944✔
142

8,027,236✔
143
static void freeCacheEntryFp(void* param) {
8,012,947✔
144
  STagFilterResEntry** p = param;
8,015,944✔
UNCOV
145
  taosHashCleanup((*p)->set);
×
146
  taosMemoryFreeClear(*p);
147
}
148

149
static void freeTagFilterEntryFp(void* param) {
8,015,944✔
150
  STagCondFilterEntry** p = param;
8,015,944✔
151
  taosArrayDestroy((*p)->pColIds);
8,027,197✔
152
  taosHashCleanup((*p)->set);
8,015,944✔
153
  taosMemoryFreeClear(*p);
8,015,944✔
NEW
154
}
×
155

156
static void freeTagCondsFp(void* param) {
157
  STagConds** p = param;
158
  taosHashCleanup((*p)->set);
8,015,905✔
159
  taosMemoryFreeClear(*p);
8,015,944✔
NEW
160
}
×
161

162
static void freeRefDbFp(void* param) {
163
  SHashObj** p = param;
8,015,944✔
164
  taosHashCleanup(*p);
16,031,888✔
165
  *p = NULL;
16,019,981✔
166
}
8,015,944✔
UNCOV
167

×
168
int32_t metaCacheOpen(SMeta* pMeta) {
169
  int32_t code = 0;
170
  int32_t lino;
8,015,944✔
171

8,015,663✔
172
  pMeta->pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
173
  if (pMeta->pCache == NULL) {
174
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,944✔
175
  }
8,015,944✔
UNCOV
176

×
177
  // open entry cache
178
  pMeta->pCache->sEntryCache.nEntry = 0;
179
  pMeta->pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
8,015,944✔
180
  pMeta->pCache->sEntryCache.aBucket =
16,031,888✔
181
      (SMetaCacheEntry**)taosMemoryCalloc(pMeta->pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
16,020,596✔
182
  if (pMeta->pCache->sEntryCache.aBucket == NULL) {
8,015,944✔
183
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
184
  }
185

186
  // open stats cache
8,015,944✔
187
  pMeta->pCache->sStbStatsCache.nEntry = 0;
8,015,944✔
188
  pMeta->pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
189
  pMeta->pCache->sStbStatsCache.aBucket =
190
      (SMetaStbStatsEntry**)taosMemoryCalloc(pMeta->pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
16,031,888✔
191
  if (pMeta->pCache->sStbStatsCache.aBucket == NULL) {
16,020,596✔
192
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,944✔
UNCOV
193
  }
×
194

195
  // open tag filter cache
196
  pMeta->pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
16,031,888✔
197
  if (pMeta->pCache->sTagFilterResCache.pUidResCache == NULL) {
16,020,596✔
198
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,944✔
UNCOV
199
  }
×
200

201
  pMeta->pCache->sTagFilterResCache.accTimes = 0;
202
  pMeta->pCache->sTagFilterResCache.pTableEntry =
203
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
16,031,821✔
204
  if (pMeta->pCache->sTagFilterResCache.pTableEntry == NULL) {
16,019,755✔
205
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,944✔
UNCOV
206
  }
×
207

208
  taosHashSetFreeFp(pMeta->pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
209
  (void)taosThreadMutexInit(&pMeta->pCache->sTagFilterResCache.lock, NULL);
8,015,944✔
210

8,015,944✔
211
  // open stable tag filter cache
212
  pMeta->pCache->sStableTagFilterResCache.accTimes = 0;
213
  pMeta->pCache->sStableTagFilterResCache.numTagDataEntries = 0;
8,015,944✔
214
  pMeta->pCache->sStableTagFilterResCache.pTableEntry =
8,015,944✔
NEW
215
    taosHashInit(1024,
×
NEW
216
      taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
×
217
  if (pMeta->pCache->sStableTagFilterResCache.pTableEntry == NULL) {
218
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,944✔
219
  }
220
  taosHashSetFreeFp(
8,015,944✔
221
    pMeta->pCache->sStableTagFilterResCache.pTableEntry, freeTagCondsFp);
222

223
  TAOS_UNUSED(taosThreadRwlockInit(
8,015,944✔
224
    &pMeta->pCache->sStableTagFilterResCache.rwlock, NULL));
8,015,944✔
225

8,015,944✔
226
  // open group res cache
8,015,944✔
227
  pMeta->pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
228
  if (pMeta->pCache->STbGroupResCache.pResCache == NULL) {
8,015,944✔
229
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,944✔
230
  }
8,015,659✔
231

8,015,944✔
232
  pMeta->pCache->STbGroupResCache.accTimes = 0;
233
  pMeta->pCache->STbGroupResCache.pTableEntry =
8,015,944✔
234
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
8,015,944✔
235
  if (pMeta->pCache->STbGroupResCache.pTableEntry == NULL) {
8,015,944✔
236
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,234✔
237
  }
238

8,015,944✔
239
  taosHashSetFreeFp(pMeta->pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
8,015,944✔
240
  (void)taosThreadMutexInit(&pMeta->pCache->STbGroupResCache.lock, NULL);
241

8,015,944✔
242
  // open filter cache
8,015,944✔
243
  pMeta->pCache->STbFilterCache.pStb =
8,015,944✔
244
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
245
  if (pMeta->pCache->STbFilterCache.pStb == NULL) {
8,015,944✔
246
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
8,015,944✔
247
  }
248

8,015,944✔
249
  pMeta->pCache->STbFilterCache.pStbName =
250
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
21,468✔
251
  if (pMeta->pCache->STbFilterCache.pStbName == NULL) {
21,468✔
252
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
253
  }
254

21,468✔
255
  // open ref db cache
21,468✔
256
  pMeta->pCache->STbRefDbCache.pStbRefs =
UNCOV
257
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
258
  if (pMeta->pCache->STbRefDbCache.pStbRefs == NULL) {
259
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
260
  }
21,468✔
261

21,468✔
UNCOV
262
  taosHashSetFreeFp(pMeta->pCache->STbRefDbCache.pStbRefs, freeRefDbFp);
×
263
  (void)taosThreadMutexInit(&pMeta->pCache->STbRefDbCache.lock, NULL);
264

265

266
_exit:
64,381,916✔
267
  if (code) {
64,360,448✔
268
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
269
    metaCacheClose(pMeta);
128,720,896✔
270
  } else {
64,360,448✔
271
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
272
  }
64,360,448✔
273
  return code;
64,360,448✔
274
}
275

64,360,448✔
276
void metaCacheClose(SMeta* pMeta) {
277
  if (pMeta->pCache) {
278
    entryCacheClose(pMeta);
279
    statsCacheClose(pMeta);
280

21,468✔
281
    taosHashClear(pMeta->pCache->sTagFilterResCache.pTableEntry);
21,468✔
282
    taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
21,468✔
283
    (void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
21,468✔
284
    taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
285

286
    (void)taosThreadRwlockDestroy(&pMeta->pCache->sStableTagFilterResCache.rwlock);
95,382,405✔
287
    taosHashCleanup(pMeta->pCache->sStableTagFilterResCache.pTableEntry);
95,382,405✔
288

289
    taosHashClear(pMeta->pCache->STbGroupResCache.pTableEntry);
290
    taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
291
    (void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
292
    taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
95,382,405✔
293

95,408,272✔
294
    taosHashCleanup(pMeta->pCache->STbFilterCache.pStb);
95,387,757✔
295
    taosHashCleanup(pMeta->pCache->STbFilterCache.pStbName);
108,771,540✔
296

13,373,702✔
297
    taosHashClear(pMeta->pCache->STbRefDbCache.pStbRefs);
298
    (void)taosThreadMutexDestroy(&pMeta->pCache->STbRefDbCache.lock);
299
    taosHashCleanup(pMeta->pCache->STbRefDbCache.pStbRefs);
95,359,938✔
300

19,493,497✔
UNCOV
301
    taosMemoryFree(pMeta->pCache);
×
UNCOV
302
    pMeta->pCache = NULL;
×
303
  }
304
}
19,455,806✔
305

19,497,228✔
306
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
19,452,944✔
307
  int32_t code = 0;
308
  int32_t nBucket;
309

75,903,315✔
310
  if (expand) {
21,468✔
311
    nBucket = pCache->sEntryCache.nBucket * 2;
312
  } else {
21,468✔
313
    nBucket = pCache->sEntryCache.nBucket / 2;
314
  }
315

75,908,015✔
316
  SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
75,888,317✔
UNCOV
317
  if (aBucket == NULL) {
×
318
    return;
×
319
  }
320

321
  // rehash
75,888,317✔
322
  for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
75,887,671✔
323
    SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
75,840,955✔
324

75,888,136✔
325
    while (pEntry) {
326
      SMetaCacheEntry* pTEntry = pEntry->next;
327

95,383,668✔
328
      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
95,383,668✔
329
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
330

331
      pEntry = pTEntry;
2,597,490✔
332
    }
2,597,490✔
333
  }
334

2,597,490✔
335
  // final set
2,597,490✔
336
  taosMemoryFree(pCache->sEntryCache.aBucket);
2,597,490✔
337
  pCache->sEntryCache.nBucket = nBucket;
2,614,143✔
338
  pCache->sEntryCache.aBucket = aBucket;
16,653✔
339
  return;
340
}
341

2,596,498✔
342
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
2,596,838✔
343
  int32_t code = 0;
2,572,135✔
344

2,572,787✔
345
  // meta is wlocked for calling this func.
2,572,787✔
346

2,572,135✔
347
  // search
2,513,109✔
UNCOV
348
  SMetaCache*       pCache = pMeta->pCache;
×
349
  int32_t           iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
350
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
351
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
24,703✔
352
    ppEntry = &(*ppEntry)->next;
353
  }
354

2,597,490✔
355
  if (*ppEntry) {  // update
2,597,490✔
356
    if (pInfo->suid != (*ppEntry)->info.suid) {
357
      metaError("meta/cache: suid should be same as the one in cache.");
358
      return TSDB_CODE_INVALID_PARA;
1,545,283,887✔
359
    }
1,545,283,887✔
360
    if (pInfo->version > (*ppEntry)->info.version) {
361
      (*ppEntry)->info.version = pInfo->version;
1,545,283,887✔
362
      (*ppEntry)->info.skmVer = pInfo->skmVer;
1,545,440,830✔
363
    }
1,545,447,785✔
364
  } else {  // insert
365
    if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
1,597,247,895✔
366
      metaRehashCache(pCache, 1);
51,792,063✔
367

368
      iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
369
    }
1,545,445,644✔
370

1,461,283,039✔
371
    SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
1,461,269,367✔
372
    if (pEntryNew == NULL) {
373
      code = terrno;
374
      goto _exit;
84,162,605✔
375
    }
376

377
    pEntryNew->info = *pInfo;
1,545,458,668✔
378
    pEntryNew->next = pCache->sEntryCache.aBucket[iBucket];
379
    pCache->sEntryCache.aBucket[iBucket] = pEntryNew;
380
    pCache->sEntryCache.nEntry++;
118,347✔
381
  }
118,347✔
382

383
_exit:
384
  return code;
118,347✔
385
}
110,888✔
386

387
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
7,459✔
388
  int32_t code = 0;
389

390
  SMetaCache*       pCache = pMeta->pCache;
118,347✔
391
  int32_t           iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
118,347✔
UNCOV
392
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
×
UNCOV
393
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
×
394
    ppEntry = &(*ppEntry)->next;
395
  }
396

397
  SMetaCacheEntry* pEntry = *ppEntry;
5,947,531✔
398
  if (pEntry) {
5,829,184✔
399
    *ppEntry = pEntry->next;
400
    taosMemoryFree(pEntry);
11,394,853✔
401
    pCache->sEntryCache.nEntry--;
5,565,669✔
402
    if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
403
        pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
5,565,669✔
404
      metaRehashCache(pCache, 0);
5,565,669✔
405
    }
406
  } else {
5,565,669✔
407
    code = TSDB_CODE_NOT_FOUND;
408
  }
409

410
_exit:
411
  return code;
118,347✔
412
}
118,347✔
413

118,347✔
414
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
415
  int32_t code = 0;
118,347✔
416

118,347✔
417
  SMetaCache*      pCache = pMeta->pCache;
418
  int32_t          iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
419
  SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
63,551,668✔
420

63,551,668✔
421
  while (pEntry && pEntry->info.uid != uid) {
422
    pEntry = pEntry->next;
423
  }
424

425
  if (pEntry) {
63,551,668✔
426
    if (pInfo) {
63,565,882✔
427
      *pInfo = pEntry->info;
63,567,004✔
428
    }
66,873,432✔
429
  } else {
3,321,578✔
430
    code = TSDB_CODE_NOT_FOUND;
431
  }
432

63,537,260✔
433
  return code;
56,808,917✔
434
}
56,814,441✔
435

56,814,816✔
436
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
56,807,628✔
437
  int32_t code = 0;
438
  int32_t nBucket;
6,739,072✔
439

110,888✔
440
  if (expand) {
110,888✔
441
    nBucket = pCache->sStbStatsCache.nBucket * 2;
442
  } else {
443
    nBucket = pCache->sStbStatsCache.nBucket / 2;
6,745,285✔
444
  }
6,745,352✔
UNCOV
445

×
UNCOV
446
  SMetaStbStatsEntry** aBucket = (SMetaStbStatsEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaStbStatsEntry*));
×
447
  if (aBucket == NULL) {
448
    code = terrno;
449
    goto _exit;
6,745,352✔
450
  }
6,745,285✔
451

6,745,228✔
452
  // rehash
6,745,352✔
453
  for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
454
    SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
455

63,548,483✔
456
    while (pEntry) {
63,548,483✔
457
      SMetaStbStatsEntry* pTEntry = pEntry->next;
458

459
      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
972,634✔
460
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
972,634✔
461

462
      pEntry = pTEntry;
972,634✔
463
    }
972,634✔
464
  }
972,634✔
465

1,109,078✔
466
  // final set
136,444✔
467
  taosMemoryFree(pCache->sStbStatsCache.aBucket);
468
  pCache->sStbStatsCache.nBucket = nBucket;
469
  pCache->sStbStatsCache.aBucket = aBucket;
972,634✔
470

972,634✔
471
_exit:
644,125✔
472
  return code;
644,125✔
473
}
644,125✔
474

644,125✔
475
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
406,295✔
476
  int32_t code = 0;
7,459✔
477

478
  // meta is wlocked for calling this func.
479

328,509✔
480
  // search
481
  SMetaCache*          pCache = pMeta->pCache;
482
  int32_t              iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
971,982✔
483
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
971,982✔
484
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
485
    ppEntry = &(*ppEntry)->next;
486
  }
237,431,851✔
487

237,431,851✔
488
  if (*ppEntry) {  // update
489
    (*ppEntry)->info.ctbNum = pInfo->ctbNum;
237,431,851✔
490
    (*ppEntry)->info.colNum = pInfo->colNum;
237,460,884✔
491
    (*ppEntry)->info.flags = pInfo->flags;
237,453,097✔
492
    (*ppEntry)->info.keep = pInfo->keep;
493
  } else {  // insert
268,072,781✔
494
    if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
30,605,561✔
495
      TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
496
      iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
497
    }
237,469,120✔
498

220,299,252✔
499
    SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
220,297,386✔
500
    if (pEntryNew == NULL) {
501
      code = terrno;
502
      goto _exit;
17,169,868✔
503
    }
504

505
    pEntryNew->info = *pInfo;
237,474,161✔
506
    pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
507
    pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
508
    pCache->sStbStatsCache.nEntry++;
509
  }
133,624,753✔
510

133,589,945✔
511
_exit:
512
  return code;
513
}
514

133,615,262✔
515
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
133,615,262✔
516
  int32_t code = 0;
133,619,535✔
517

518
  SMetaCache*          pCache = pMeta->pCache;
133,623,955✔
519
  int32_t              iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
520
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
8,379✔
521
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
522
    ppEntry = &(*ppEntry)->next;
8,379✔
523
  }
8,379✔
524

525
  SMetaStbStatsEntry* pEntry = *ppEntry;
526
  if (pEntry) {
8,379✔
527
    *ppEntry = pEntry->next;
8,379✔
528
    taosMemoryFree(pEntry);
8,379✔
529
    pCache->sStbStatsCache.nEntry--;
530
    if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
8,379✔
531
        pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
8,379✔
532
      TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
8,379✔
533
    }
534
  } else {
535
    code = TSDB_CODE_NOT_FOUND;
536
  }
537

538
_exit:
539
  return code;
540
}
8,379✔
541

8,379✔
542
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
543
  int32_t code = TSDB_CODE_SUCCESS;
8,379✔
544

8,379✔
545
  SMetaCache*         pCache = pMeta->pCache;
3,591✔
546
  int32_t             iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
3,591✔
547
  SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
548

549
  while (pEntry && pEntry->info.uid != uid) {
550
    pEntry = pEntry->next;
4,788✔
551
  }
4,788✔
UNCOV
552

×
UNCOV
553
  if (pEntry) {
×
554
    if (pInfo) {
555
      *pInfo = pEntry->info;
556
    }
4,788✔
557
  } else {
558
    code = TSDB_CODE_NOT_FOUND;
4,788✔
559
  }
4,788✔
560

561
  return code;
562
}
4,788✔
UNCOV
563

×
564
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
565
  memcpy(&pBuf[2], key, keyLen);
566
}
4,788✔
567

568
// the format of key:
4,788✔
569
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
4,788✔
UNCOV
570
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
×
571
  buf[0] = (uint64_t)pHashMap;
572
  buf[1] = suid;
573
  setMD5DigestInKey(buf, key, keyLen);
574
}
4,788✔
575

576
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
577
                                  bool* acquireRes) {
4,788✔
578
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
4,788✔
579
  int32_t vgId = TD_VID(pMeta->pVnode);
580

581
  // generate the composed key for LRU cache
3,591✔
582
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
583
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
3,591✔
UNCOV
584
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
×
585

586
  *acquireRes = 0;
587
  uint64_t key[4];
3,591✔
588
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
3,591✔
UNCOV
589
  
×
UNCOV
590
  // void* tmp = NULL;
×
591
  // uint32_t len = 0;
592
  // (void)taosAscii2Hex((const char*)key, 32, &tmp, &len);
593
  // qDebug("metaGetCachedTableUidList %p %"PRId64" key: %s", pTableMap, suid, tmp);
3,591✔
594
  // taosMemoryFree(tmp);
595

3,591✔
596
  (void)taosThreadMutexLock(pLock);
597
  pMeta->pCache->sTagFilterResCache.accTimes += 1;
3,591✔
598

798✔
599
  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
798✔
600
  if (pHandle == NULL) {
798✔
601
    (void)taosThreadMutexUnlock(pLock);
798✔
602
    return TSDB_CODE_SUCCESS;
798✔
603
  }
604

605
  // do some book mark work after acquiring the filter result from cache
606
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
607
  if (NULL == pEntry) {
3,591✔
608
    metaError("meta/cache: pEntry should not be NULL.");
609
    return TSDB_CODE_NOT_FOUND;
610
  }
1,197✔
611

1,197✔
612
  *acquireRes = 1;
1,197✔
613

1,197✔
614
  const char* p = taosLRUCacheValue(pCache, pHandle);
1,197✔
615
  int32_t     size = *(int32_t*)p;
616

1,197✔
617
  // set the result into the buffer
1,197✔
618
  if (taosArrayAddBatch(pList1, p + sizeof(int32_t), size) == NULL) {
1,197✔
619
    return terrno;
1,197✔
620
  }
1,197✔
621

1,197✔
622
  (*pEntry)->hitTimes += 1;
1,197✔
623

624
  uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
1,197✔
625
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
1,197✔
626
    metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
×
UNCOV
627
             ((double)(*pEntry)->hitTimes) / acc);
×
UNCOV
628
  }
×
UNCOV
629

×
630
  bool ret = taosLRUCacheRelease(pCache, pHandle, false);
UNCOV
631

×
632
  // unlock meta
633
  (void)taosThreadMutexUnlock(pLock);
634
  return TSDB_CODE_SUCCESS;
1,197✔
635
}
636

637
int32_t metaStableTagFilterCacheGet(void* pVnode, tb_uid_t suid,
638
  const uint8_t* pTagCondKey, int32_t tagCondKeyLen,
3,591✔
639
  const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) {
640

3,591✔
641
  int32_t code = TSDB_CODE_SUCCESS;
3,591✔
642
  int32_t lino = 0;
3,591✔
643
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
644
  int32_t vgId = TD_VID(pMeta->pVnode);
3,591✔
NEW
645
  *acquireRes = 0;
×
646

647
  // generate the composed key for LRU cache
NEW
648
  SHashObj*       pTableMap = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
×
NEW
649
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
×
650

651
  code = taosThreadRwlockRdlock(pRwlock);
652
  TSDB_CHECK_CODE(code, lino, _end);
3,591✔
NEW
653
  pMeta->pCache->sStableTagFilterResCache.accTimes += 1;
×
654

655
  STagConds** pTagConds =
NEW
656
    (STagConds**)taosHashGet(pTableMap, &suid, sizeof(tb_uid_t));
×
NEW
657
  TSDB_CHECK_NULL(pTagConds, code, lino, _end, TSDB_CODE_SUCCESS);
×
658

659
  STagCondFilterEntry** pFilterEntry = (STagCondFilterEntry**)taosHashGet(
660
    (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
3,591✔
661
  TSDB_CHECK_NULL(pFilterEntry, code, lino, _end, TSDB_CODE_SUCCESS);
3,591✔
662

3,591✔
663
  SArray** pArray = (SArray**)taosHashGet((*pFilterEntry)->set, pKey, keyLen);
664
  TSDB_CHECK_NULL(pArray, code, lino, _end, TSDB_CODE_SUCCESS);
3,591✔
665

3,591✔
666
  // set the result into the buffer
667
  *acquireRes = 1;
668
  TAOS_UNUSED(taosArrayAddBatch(
669
    pList1, TARRAY_GET_ELEM(*pArray, 0), taosArrayGetSize(*pArray)));
670

671
  // do some bookmark work after acquiring the filter result from cache
672
  (*pTagConds)->hitTimes += 1;
673
  (*pFilterEntry)->hitTimes += 1;
3,591✔
674
  uint64_t hit = ++pMeta->pCache->sStableTagFilterResCache.hitTimes;
3,591✔
675
  uint64_t acc = pMeta->pCache->sStableTagFilterResCache.accTimes;
3,591✔
676
  if ((*pTagConds)->hitTimes % 1000 == 0 && (*pTagConds)->hitTimes > 0) {
1,197✔
677
    metaInfo(
1,197✔
NEW
678
      "vgId:%d, suid:%" PRIu64 
×
679
      ", current stable cache hit:%" PRIu32 ", this tag condition hit:%" PRIu32
680
      ", total cache hit:%" PRIu64 ", acc:%" PRIu64 ", hit rate:%.2f%%",
681
      vgId, suid, (*pTagConds)->hitTimes, (*pFilterEntry)->hitTimes, hit, acc,
2,394✔
682
      ((double)hit / acc * 100));
2,394✔
683
  }
NEW
684

×
NEW
685
_end:
×
686
  if (TSDB_CODE_SUCCESS != code) {
687
    metaError("vgId:%d, %s failed at %s:%d since %s",
2,394✔
NEW
688
      vgId, __func__, __FILE__, lino, tstrerror(code));
×
689
  }
690
  // unlock meta
691
  code = taosThreadRwlockUnlock(pRwlock);
692
  if (TSDB_CODE_SUCCESS != code) {
693
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
3,591✔
694
      vgId, __func__, __FILE__, lino, tstrerror(code));
695
  }
3,591✔
696
  return code;
3,591✔
697
}
3,591✔
698

699
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
700
  (void)ud;
3,591✔
701
  if (value == NULL) {
702
    return;
703
  }
4,006,635✔
704

4,006,635✔
705
  const uint64_t* p = key;
4,006,635✔
706
  if (keyLen != sizeof(int64_t) * 4) {
4,006,635✔
707
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
4,006,635✔
708
    return;
4,006,635✔
709
  }
710

711
  SHashObj* pHashObj = (SHashObj*)p[0];
66,796,581✔
712

66,796,581✔
713
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
66,804,717✔
714

66,804,620✔
715
  if (pEntry != NULL && (*pEntry) != NULL) {
716
    int64_t st = taosGetTimestampUs();
66,808,113✔
717
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
66,807,435✔
718
    if (code == TSDB_CODE_SUCCESS) {
719
      double el = (taosGetTimestampUs() - st) / 1000.0;
66,803,692✔
720
      metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
66,807,851✔
721
               el);
722
    }
66,812,906✔
723
  }
66,808,432✔
724

66,807,634✔
725
  taosMemoryFree(value);
66,805,313✔
726
}
727

728
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
798✔
729
  int32_t             code = TSDB_CODE_SUCCESS;
730
  int32_t             lino = 0;
798✔
731
  STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
1,596✔
732
  TSDB_CHECK_NULL(p, code, lino, _end, terrno);
733

798✔
734
  p->hitTimes = 0;
798✔
735
  p->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
736
  TSDB_CHECK_NULL(p->set, code, lino, _end, terrno);
798✔
737
  code = taosHashPut(p->set, pKey, keyLen, NULL, 0);
798✔
738
  TSDB_CHECK_CODE(code, lino, _end);
739
  code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
798✔
740
  TSDB_CHECK_CODE(code, lino, _end);
798✔
741

742
_end:
UNCOV
743
  if (code != TSDB_CODE_SUCCESS) {
×
744
    metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
745
    if (p != NULL) {
×
746
      if (p->set != NULL) {
747
        taosHashCleanup(p->set);
UNCOV
748
      }
×
749
      taosMemoryFree(p);
×
UNCOV
750
    }
×
751
  }
UNCOV
752
  return code;
×
UNCOV
753
}
×
UNCOV
754

×
755
// check both the payload size and selectivity ratio
UNCOV
756
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
UNCOV
757
                              int32_t payloadLen, double selectivityRatio) {
×
758
  int32_t code = 0;
UNCOV
759
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
UNCOV
760
  int32_t vgId = TD_VID(pMeta->pVnode);
×
UNCOV
761

×
UNCOV
762
  if (selectivityRatio > tsSelectivityRatio) {
×
763
    metaDebug("vgId:%d, suid:%" PRIu64
764
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
UNCOV
765
              vgId, suid, selectivityRatio, tsSelectivityRatio);
×
766
    taosMemoryFree(pPayload);
×
767
    return TSDB_CODE_SUCCESS;
×
UNCOV
768
  }
×
769

770
  if (payloadLen > tsTagFilterResCacheSize) {
771
    metaDebug("vgId:%d, suid:%" PRIu64
×
772
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
UNCOV
773
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
×
774
    taosMemoryFree(pPayload);
775
    return TSDB_CODE_SUCCESS;
×
UNCOV
776
  }
×
UNCOV
777

×
778
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
779
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
780
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
UNCOV
781

×
782
  uint64_t key[4] = {0};
783
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
UNCOV
784

×
UNCOV
785
  // void* tmp = NULL;
×
786
  // uint32_t len = 0;
787
  // (void)taosAscii2Hex((const char*)key, 32, &tmp, &len);
UNCOV
788
  // qDebug("metaUidFilterCachePut %p %"PRId64" key: %s", pTableEntry, suid, tmp);
×
789
  // taosMemoryFree(tmp);
UNCOV
790

×
UNCOV
791
  (void)taosThreadMutexLock(pLock);
×
792
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
793
  if (pEntry == NULL) {
UNCOV
794
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
UNCOV
795
    if (code != TSDB_CODE_SUCCESS) {
×
796
      goto _end;
×
UNCOV
797
    }
×
798
  } else {  // check if it exists or not
799
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
UNCOV
800
    if (code == TSDB_CODE_DUP_KEY) {
×
801
      // we have already found the existed items, no need to added to cache anymore.
802
      (void)taosThreadMutexUnlock(pLock);
×
803
      return TSDB_CODE_SUCCESS;
UNCOV
804
    }
×
UNCOV
805
    if (code != TSDB_CODE_SUCCESS) {
×
806
      goto _end;
×
UNCOV
807
    }
×
UNCOV
808
  }
×
UNCOV
809

×
810
  // add to cache.
811
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
812
                           TAOS_LRU_PRIORITY_LOW, NULL);
813
_end:
UNCOV
814
  (void)taosThreadMutexUnlock(pLock);
×
815
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
816
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
UNCOV
817

×
818
  return code;
UNCOV
819
}
×
UNCOV
820

×
NEW
821
static void freeSArrayPtr(void* pp) {
×
822
  SArray* pArray = *(SArray**)pp;
NEW
823
  taosArrayDestroy(pArray);
×
NEW
824
}
×
825

826
int32_t metaStableTagFilterCachePut(
NEW
827
  void* pVnode, uint64_t suid, const void* pTagCondKey, int32_t tagCondKeyLen,
×
NEW
828
  const void* pKey, int32_t keyLen, SArray* pUidList, SArray** pTagColIds) {
×
829

830
  int32_t code = TSDB_CODE_SUCCESS;
NEW
831
  int32_t lino = 0;
×
NEW
832
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
NEW
833
  int32_t vgId = TD_VID(pMeta->pVnode);
×
834

NEW
835
  SHashObj*       pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
×
NEW
836
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
×
837

NEW
838
  code = taosThreadRwlockWrlock(pRwlock);
×
NEW
839
  TSDB_CHECK_CODE(code, lino, _end);
×
NEW
840

×
NEW
841
  STagConds** pTagConds = 
×
NEW
842
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
NEW
843
  if (pTagConds == NULL) {
×
844
    // add new (suid -> tag conds) entry
845
    STagConds* pEntry = (STagConds*)taosMemoryMalloc(sizeof(STagConds));
NEW
846
    TSDB_CHECK_NULL(pEntry, code, lino, _end, terrno);
×
NEW
847

×
848
    pEntry->hitTimes = 0;
NEW
849
    pEntry->set = taosHashInit(
×
NEW
850
      1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
×
851
      false, HASH_NO_LOCK);
NEW
852
    taosHashSetFreeFp(pEntry->set, freeTagFilterEntryFp);
×
NEW
853
    TSDB_CHECK_NULL(pEntry->set, code, lino, _end, terrno);
×
854

855
    code = taosHashPut(
856
      pTableEntry, &suid, sizeof(uint64_t), &pEntry, POINTER_BYTES);
857
    TSDB_CHECK_CODE(code, lino, _end);
NEW
858

×
859
    pTagConds = (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
NEW
860
    TSDB_CHECK_NULL(pTagConds, code, lino, _end, TSDB_CODE_NOT_FOUND);
×
NEW
861
  }
×
NEW
862

×
863
  STagCondFilterEntry** pFilterEntry =
864
    (STagCondFilterEntry**)taosHashGet(
NEW
865
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
×
866
  if (pFilterEntry == NULL) {
867
    // add new (tag cond -> filter entry) entry
868
    STagCondFilterEntry* pEntry = 
869
      (STagCondFilterEntry*)taosMemoryMalloc(sizeof(STagCondFilterEntry));
66,795,678✔
870
    TSDB_CHECK_NULL(pEntry, code, lino, _end, terrno);
66,795,678✔
871

66,801,495✔
872
    pEntry->hitTimes = 0;
66,803,107✔
873
    pEntry->set = taosHashInit(
874
      1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
66,801,732✔
875
      false, HASH_NO_LOCK);
66,805,167✔
876
    TSDB_CHECK_NULL(pEntry->set, code, lino, _end, terrno);
877
    taosHashSetFreeFp(pEntry->set, freeSArrayPtr);
66,805,634✔
878
    pEntry->pColIds = *pTagColIds;
66,802,712✔
879
    *pTagColIds = NULL;
880

66,802,375✔
881
    code = taosHashPut(
66,812,913✔
882
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen, &pEntry, POINTER_BYTES);
66,812,913✔
883
    TSDB_CHECK_CODE(code, lino, _end);
66,814,213✔
884

885
    pFilterEntry = (STagCondFilterEntry**)taosHashGet(
NEW
886
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
×
887
  } else {
NEW
888
    // pColIds is already set, so we can destroy the new one
×
NEW
889
    taosArrayDestroy(*pTagColIds);
×
890
    *pTagColIds = NULL;
NEW
891
  }
×
NEW
892

×
893
  // add to cache.
NEW
894
  SArray* pPayload = taosArrayDup(pUidList, NULL);
×
NEW
895
  code = taosHashPut(
×
896
    (*pFilterEntry)->set, pKey, keyLen, &pPayload, POINTER_BYTES);
NEW
897
  TSDB_CHECK_CODE(code, lino, _end);
×
NEW
898
  pMeta->pCache->sStableTagFilterResCache.numTagDataEntries += 1;
×
899
  (*pTagConds)->numTagDataEntries += 1;
900

901
_end:
114,922,576✔
902
  if (TSDB_CODE_SUCCESS != code) {
114,922,576✔
903
    metaError("vgId:%d, %s failed at %s:%d since %s",
1,080✔
904
      vgId, __func__, __FILE__, lino, tstrerror(code));
905
  } else {
906
    metaInfo("vgId:%d, suid:%" PRIu64 " new tag data filter entry added, "
114,921,496✔
NEW
907
      "uid num:%d, current stable tag conditions num:%d, "
×
908
      "this tag condition data entries num:%d, "
909
      "cache stable num:%d, total tag data entries num:%" PRIu32 ", "
910
      "total tag data entries num:%" PRIu32,
114,951,830✔
911
      vgId, suid, (int32_t)taosArrayGetSize(pUidList), 
912
      pTagConds ? (int32_t)taosHashGetSize((*pTagConds)->set) : 0,
913
      pFilterEntry ? (int32_t)taosHashGetSize((*pFilterEntry)->set) : 0,
6,480✔
914
      (int32_t)taosHashGetSize(pTableEntry),
6,480✔
915
      pMeta->pCache->sStableTagFilterResCache.numTagDataEntries,
2,376✔
916
      (*pTagConds)->numTagDataEntries);
917
  }
918
  // unlock meta
4,104✔
919
  code = taosThreadRwlockUnlock(pRwlock);
4,104✔
920
  if (TSDB_CODE_SUCCESS != code) {
921
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
NEW
922
      vgId, __func__, __FILE__, lino, tstrerror(code));
×
923
  }
924

925
  return code;
3,888✔
926
}
3,888✔
927

3,888✔
928
// drop all the cache entries for a super table 
NEW
929
int32_t metaStableTagFilterCacheDropSTable(
×
930
  SMeta* pMeta, tb_uid_t suid) {
931
  if (pMeta == NULL) {
932
    return TSDB_CODE_INVALID_PARA;
4,007,692✔
933
  }
934
  int32_t   lino = 0;
4,007,692✔
935
  int32_t   code = TSDB_CODE_SUCCESS;
4,007,692✔
936
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
4,007,692✔
937
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
938

4,007,692✔
939
  code = taosThreadRwlockWrlock(pRwlock);
4,008,039✔
940
  TSDB_CHECK_CODE(code, lino, _end);
216✔
941
  STagConds** pTagConds = taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
216✔
942
  if (pTagConds != NULL) {
4,007,823✔
NEW
943
    pMeta->pCache->sStableTagFilterResCache.
×
NEW
944
      numTagDataEntries -= (*pTagConds)->numTagDataEntries;
×
945
  }
946
  code = taosHashRemove(pTableEntry, &suid, sizeof(tb_uid_t));
4,008,039✔
947
  TSDB_CHECK_CODE(code, lino, _end);
4,320✔
948

4,104✔
949
_end:
950
  if (TSDB_CODE_SUCCESS != code) {
951
    metaError("vgId:%d, %s failed at %s:%d since %s",
952
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
953
  } else {
4,008,039✔
954
    metaDebug(
955
      "vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache",
956
      TD_VID(pMeta->pVnode), suid);
175,075✔
957
  }
175,075✔
958
  code = taosThreadRwlockUnlock(pRwlock);
959
  if (TSDB_CODE_SUCCESS != code) {
175,075✔
960
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
158,994✔
961
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
962
  }
963
  return code;
16,708✔
964
}
16,708✔
965

16,093✔
966
static int32_t buildTagDataEntryKey(
16,093✔
967
  const SArray* pColIds, STag* pTag, T_MD5_CTX* pContext) {
16,093✔
968
  int32_t code = TSDB_CODE_SUCCESS;
969
  int32_t keyLen = 0;
16,093✔
970
  // get length first
16,093✔
971
  for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
972
    STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
973
    if (tTagGet(pTag, &pTagValue)) {
615✔
974
      keyLen += sizeof(col_id_t);
975
      if (IS_VAR_DATA_TYPE(pTagValue.type)) {
976
        keyLen += pTagValue.nData;
300,971✔
977
      } else {
300,971✔
978
        keyLen += tDataTypes[pTagValue.type].bytes;
300,971✔
979
      }
300,971✔
980
    } else {
300,971✔
981
      // tag value not found
982
      code = TSDB_CODE_NOT_FOUND;
300,971✔
983
      return code;
984
    }
300,971✔
985
  }
300,971✔
986

298,208✔
987
  char* pKey = taosMemoryCalloc(1, keyLen);
988
  if (NULL == pKey) {
989
    code = terrno;
2,763✔
990
    return code;
991
  }
2,763✔
992

993
  // build the key
1,991✔
994
  char* pStart = pKey;
300,971✔
995
  for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
300,971✔
996
    STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
997
    if (tTagGet(pTag, &pTagValue)) {
998
      // copy cid
2,077,772✔
999
      memcpy(pStart, &pTagValue.cid, sizeof(col_id_t));
2,077,772✔
1000
      pStart += sizeof(col_id_t);
2,077,772✔
1001
      // copy value
2,077,772✔
1002
      if (IS_VAR_DATA_TYPE(pTagValue.type)) {
2,077,794✔
1003
        int32_t varLen = pTagValue.nData;
2,080,023✔
1004
        memcpy(pStart, pTagValue.pData, varLen);
1005
        pStart += varLen;
2,080,050✔
1006
      } else {
1007
        int32_t typeLen = tDataTypes[pTagValue.type].bytes;
2,080,621✔
1008
        memcpy(pStart, &pTagValue.i64, typeLen);
2,078,853✔
1009
        pStart += typeLen;
2,055,353✔
1010
      }
7,153,367✔
1011
    } else {
5,095,691✔
1012
      // tag value not found
5,093,390✔
1013
      taosMemoryFree(pKey);
5,093,390✔
1014
      code = TSDB_CODE_NOT_FOUND;
5,094,538✔
1015
      return code;
5,093,939✔
1016
    }
5,093,939✔
1017
  }
5,092,275✔
1018

5,092,275✔
1019
  // update MD5
5,095,131✔
1020
  tMD5Init(pContext);
5,095,131✔
1021
  tMD5Update(pContext, (uint8_t*)pKey, (uint32_t)keyLen);
1022
  tMD5Final(pContext);
1023

1024
  taosMemFreeClear(pKey);
2,081,176✔
1025
  return code;
2,080,582✔
NEW
1026
}
×
1027

1028
// remove the dropped table uid from all cache entries
2,080,582✔
1029
// pDroppedTable is the dropped child table meta entry
2,080,558✔
1030
int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
1031
  const SMetaEntry* pDroppedTable, ETagFilterCacheAction action) {
1032
  if (pMeta == NULL || pDroppedTable == NULL) {
23,500✔
1033
    return TSDB_CODE_INVALID_PARA;
23,500✔
1034
  }
23,500✔
1035
  int32_t   lino = 0;
23,500✔
1036
  int32_t   code = TSDB_CODE_SUCCESS;
1037
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
23,500✔
1038
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
23,500✔
1039

1040
  code = taosThreadRwlockWrlock(pRwlock);
23,500✔
1041
  TSDB_CHECK_CODE(code, lino, _end);
23,500✔
1042

1043
  tb_uid_t suid = pDroppedTable->ctbEntry.suid;;
23,500✔
1044
  STagConds** pTagConds =
1045
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
23,500✔
1046
  if (pTagConds != NULL) {
23,500✔
NEW
1047
    STagCondFilterEntry** ppFilterEntry = NULL;
×
1048
    while ((ppFilterEntry = taosHashIterate((*pTagConds)->set, ppFilterEntry))) {
1049
      STagCondFilterEntry* pFilterEntry = *ppFilterEntry;
23,500✔
1050
      // rebuild the tagCondKey and check existence
1051
      SArray* pColIds = pFilterEntry->pColIds;
1052
      // rebuild the tagCondFilterKey
26,595✔
1053
      int32_t keyLen = 0;
26,595✔
1054
      char*   pKey = NULL;
26,595✔
1055
      T_MD5_CTX context = {0};
26,595✔
1056
      code = buildTagDataEntryKey(pColIds, (STag*)pDroppedTable->ctbEntry.pTags, &context);
26,595✔
1057
      if (code != TSDB_CODE_SUCCESS) {
26,595✔
1058
        metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition key for dropped table uid:%" PRIu64
1059
          " since %s",
26,595✔
1060
          TD_VID(pMeta->pVnode), suid, pDroppedTable->uid, tstrerror(code));
1061
        goto _end;
26,595✔
1062
      }
26,595✔
1063

26,595✔
1064
      SArray** pArray = (SArray**)taosHashGet(
23,500✔
1065
        pFilterEntry->set, context.digest, tListLen(context.digest));
23,500✔
1066
      if (pArray != NULL) {
1067
        // check and remove the dropped table uid from the array
3,095✔
1068
        // TODO(Tony Zhang): optimize this scan
1069
        if (action == STABLE_TAG_FILTER_CACHE_DROP_TABLE) {
1070
          for (int32_t i = 0; i < taosArrayGetSize(*pArray); i++) {
61,433✔
1071
            uint64_t uid = *(uint64_t*)taosArrayGet(*pArray, i);
34,838✔
1072
            if (uid == pDroppedTable->uid) {
34,838✔
1073
              taosArrayRemove(*pArray, i);
34,838✔
1074
              metaDebug("vgId:%d, suid:%" PRIu64 " removed dropped table uid:%" PRIu64
34,838✔
1075
                " from stable tag filter cache",
34,838✔
1076
                TD_VID(pMeta->pVnode), suid, pDroppedTable->uid);
1077
              break;
1078
            } 
1079
          }
26,595✔
1080
        } else {
26,595✔
NEW
1081
          // STABLE_TAG_FILTER_CACHE_ADD_TABLE
×
1082
          void* _tmp = taosArrayPush(*pArray, &pDroppedTable->uid);
1083
        }
26,595✔
1084
      }
1085
    }
26,595✔
1086
  }
1087

1088
_end:
1089
  if (TSDB_CODE_SUCCESS != code) {
1090
    metaError("vgId:%d, %s failed at %s:%d since %s",
1091
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1092
  } else {
1093
    metaDebug(
1094
      "vgId:%d, suid:%" PRIu64 " update table uid:%" PRIu64
1095
      " in stable tag filter cache, action:%d",
1096
      TD_VID(pMeta->pVnode),
1097
      pDroppedTable->ctbEntry.suid, pDroppedTable->uid, action);
1098
  }
1099
  code = taosThreadRwlockUnlock(pRwlock);
1100
  if (TSDB_CODE_SUCCESS != code) {
1101
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
1102
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1103
  }
1104
  return code;
1105
}
1106

1107
int32_t metaStableTagFilterCacheDropTag(SMeta* pMeta,
1108
  tb_uid_t suid, col_id_t cid) {
1109
  if (pMeta == NULL) {
1110
    return TSDB_CODE_INVALID_PARA;
1111
  }
1112
  int32_t   lino = 0;
1113
  int32_t   code = TSDB_CODE_SUCCESS;
1114
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
1115
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
1116

1117
  code = taosThreadRwlockWrlock(pRwlock);
1118
  TSDB_CHECK_CODE(code, lino, _end);
1119

1120
  STagConds** pTagConds =
1121
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
1122
  if (pTagConds != NULL) {
1123
    void* pIter = taosHashIterate((*pTagConds)->set, NULL);
1124
    while (pIter) {
1125
      STagCondFilterEntry* pFilterEntry = *(STagCondFilterEntry**)pIter;
1126
      bool found = false;
1127
      for (int32_t i = 0; i < taosArrayGetSize(pFilterEntry->pColIds); i++) {
1128
        col_id_t existCid = *(col_id_t*)taosArrayGet(pFilterEntry->pColIds, i);
1129
        if (existCid == cid) {
1130
          found = true;
1131
          break;
1132
        }
1133
      }
1134
      if (found) {
1135
        uint32_t numEntries = taosHashGetSize(pFilterEntry->set);
1136
        size_t keyLen = 0;
1137
        char  *key = (char *)taosHashGetKey(pIter, &keyLen);
1138
        code = taosHashRemove((*pTagConds)->set, key, keyLen);
1139
        TSDB_CHECK_CODE(code, lino, _end);
1140
        (*pTagConds)->numTagDataEntries -= numEntries;
1141
        pMeta->pCache->sStableTagFilterResCache.numTagDataEntries -= numEntries;
1142
      }
1143
      pIter = taosHashIterate((*pTagConds)->set, pIter);
1144
    }
1145
  }
1146
_end:
1147
  if (TSDB_CODE_SUCCESS != code) {
1148
    metaError("vgId:%d, %s failed at %s:%d since %s",
1149
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1150
  } else {
1151
    metaDebug(
1152
      "vgId:%d, suid:%" PRIu64 " dropped tag cid:%d "
1153
      "from stable tag filter cache",
1154
      TD_VID(pMeta->pVnode), suid, cid);
1155
  }
1156
  code = taosThreadRwlockUnlock(pRwlock);
1157
  if (TSDB_CODE_SUCCESS != code) {
1158
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
1159
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1160
  }
1161
  return code;
1162
}
1163

1164
void metaCacheClear(SMeta* pMeta) {
1165
  metaWLock(pMeta);
1166
  metaCacheClose(pMeta);
1167
  (void)metaCacheOpen(pMeta);
1168
  metaULock(pMeta);
1169
}
1170

1171
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
1172
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
1173
  uint64_t  p[4] = {0};
1174
  int32_t   vgId = TD_VID(pMeta->pVnode);
1175
  SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
1176

1177
  uint64_t dummy[2] = {0};
1178
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
1179

1180
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
1181
  (void)taosThreadMutexLock(pLock);
1182

1183
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
1184
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
1185
    (void)taosThreadMutexUnlock(pLock);
1186
    return TSDB_CODE_SUCCESS;
1187
  }
1188

1189
  (*pEntry)->hitTimes = 0;
1190

1191
  char *iter = taosHashIterate((*pEntry)->set, NULL);
1192
  while (iter != NULL) {
1193
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
1194
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
1195
    iter = taosHashIterate((*pEntry)->set, iter);
1196
  }
1197
  taosHashClear((*pEntry)->set);
1198
  (void)taosThreadMutexUnlock(pLock);
1199

1200
  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
1201
  return TSDB_CODE_SUCCESS;
1202
}
1203

1204
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
1205
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
1206
  int32_t vgId = TD_VID(pMeta->pVnode);
1207

1208
  // generate the composed key for LRU cache
1209
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
1210
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
1211
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
1212

1213
  *pList = NULL;
1214
  uint64_t key[4];
1215
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
1216

1217
  (void)taosThreadMutexLock(pLock);
1218
  pMeta->pCache->STbGroupResCache.accTimes += 1;
1219

1220
  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
1221
  if (pHandle == NULL) {
1222
    (void)taosThreadMutexUnlock(pLock);
1223
    return TSDB_CODE_SUCCESS;
1224
  }
1225

1226
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
1227
  if (NULL == pEntry) {
1228
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
1229
    return TSDB_CODE_NOT_FOUND;
1230
  }
1231

1232
  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);
1233

1234
  (*pEntry)->hitTimes += 1;
1235

1236
  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
1237
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
1238
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
1239
             ((double)(*pEntry)->hitTimes) / acc);
1240
  }
1241

1242
  bool ret = taosLRUCacheRelease(pCache, pHandle, false);
1243

1244
  // unlock meta
1245
  (void)taosThreadMutexUnlock(pLock);
1246
  return TSDB_CODE_SUCCESS;
1247
}
1248

1249
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
1250
  (void)ud;
1251
  if (value == NULL) {
1252
    return;
1253
  }
1254

1255
  const uint64_t* p = key;
1256
  if (keyLen != sizeof(int64_t) * 4) {
1257
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
1258
    return;
1259
  }
1260

1261
  SHashObj* pHashObj = (SHashObj*)p[0];
1262

1263
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
1264

1265
  if (pEntry != NULL && (*pEntry) != NULL) {
1266
    int64_t st = taosGetTimestampUs();
1267
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
1268
    if (code == TSDB_CODE_SUCCESS) {
1269
      double el = (taosGetTimestampUs() - st) / 1000.0;
1270
      metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
1271
                taosHashGetSize((*pEntry)->set), el);
1272
    }
1273
  }
1274

1275
  taosArrayDestroy((SArray*)value);
1276
}
1277

1278
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
1279
                              int32_t payloadLen) {
1280
  int32_t code = 0;
1281
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
1282
  int32_t vgId = TD_VID(pMeta->pVnode);
1283

1284
  if (payloadLen > tsTagFilterResCacheSize) {
1285
    metaDebug("vgId:%d, suid:%" PRIu64
1286
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
1287
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
1288
    taosArrayDestroy((SArray*)pPayload);
1289
    return TSDB_CODE_SUCCESS;
1290
  }
1291

1292
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
1293
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
1294
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
1295

1296
  uint64_t key[4] = {0};
1297
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
1298

1299
  (void)taosThreadMutexLock(pLock);
1300
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
1301
  if (pEntry == NULL) {
1302
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
1303
    if (code != TSDB_CODE_SUCCESS) {
1304
      goto _end;
1305
    }
1306
  } else {  // check if it exists or not
1307
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
1308
    if (code == TSDB_CODE_DUP_KEY) {
1309
      // we have already found the existed items, no need to added to cache anymore.
1310
      (void)taosThreadMutexUnlock(pLock);
1311
      return TSDB_CODE_SUCCESS;
1312
    }
1313
    if (code != TSDB_CODE_SUCCESS) {
1314
      goto _end;
1315
    }
1316
  }
1317

1318
  // add to cache.
1319
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
1320
                           TAOS_LRU_PRIORITY_LOW, NULL);
1321
_end:
1322
  (void)taosThreadMutexUnlock(pLock);
1323
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
1324
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
1325

1326
  return code;
1327
}
1328

1329
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
1330
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
1331
  uint64_t  p[4] = {0};
1332
  int32_t   vgId = TD_VID(pMeta->pVnode);
1333
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
1334

1335
  uint64_t dummy[2] = {0};
1336
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
1337

1338
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
1339
  (void)taosThreadMutexLock(pLock);
1340

1341
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
1342
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
1343
    (void)taosThreadMutexUnlock(pLock);
1344
    return TSDB_CODE_SUCCESS;
1345
  }
1346

1347
  (*pEntry)->hitTimes = 0;
1348

1349
  char *iter = taosHashIterate((*pEntry)->set, NULL);
1350
  while (iter != NULL) {
1351
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
1352
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
1353
    iter = taosHashIterate((*pEntry)->set, iter);
1354
  }
1355
  taosHashClear((*pEntry)->set);
1356
  (void)taosThreadMutexUnlock(pLock);
1357

1358
  metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
1359
  return TSDB_CODE_SUCCESS;
1360
}
1361

1362
bool metaTbInFilterCache(SMeta* pMeta, const void* key, int8_t type) {
1363
  if (type == 0 && taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t))) {
1364
    return true;
1365
  }
1366

1367
  if (type == 1 && taosHashGet(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key))) {
1368
    return true;
1369
  }
1370

1371
  return false;
1372
}
1373

1374
int32_t metaPutTbToFilterCache(SMeta* pMeta, const void* key, int8_t type) {
1375
  if (type == 0) {
1376
    return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
1377
  }
1378

1379
  if (type == 1) {
1380
    return taosHashPut(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key), NULL, 0);
1381
  }
1382

1383
  return 0;
1384
}
1385

1386
int32_t metaSizeOfTbFilterCache(SMeta* pMeta, int8_t type) {
1387
  if (type == 0) {
1388
    return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
1389
  }
1390
  return 0;
1391
}
1392

1393
int32_t metaInitTbFilterCache(SMeta* pMeta) {
1394
#ifdef TD_ENTERPRISE
1395
  int32_t      tbNum = 0;
1396
  const char** pTbArr = NULL;
1397
  const char*  dbName = NULL;
1398

1399
  if (!(dbName = strchr(pMeta->pVnode->config.dbname, '.'))) return 0;
1400
  if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
1401
    tbNum = tkLogStbNum;
1402
    pTbArr = (const char**)&tkLogStb;
1403
  } else if (0 == strncmp(dbName, "audit", TSDB_DB_NAME_LEN)) {
1404
    tbNum = tkAuditStbNum;
1405
    pTbArr = (const char**)&tkAuditStb;
1406
  }
1407
  if (tbNum && pTbArr) {
1408
    for (int32_t i = 0; i < tbNum; ++i) {
1409
      TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, pTbArr[i], 1));
1410
    }
1411
  }
1412
#else
1413
#endif
1414
  return 0;
1415
}
1416

1417
int64_t metaGetStbKeep(SMeta* pMeta, int64_t uid) {
1418
  SMetaStbStats stats = {0};
1419

1420
  if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
1421
    return stats.keep;
1422
  }
1423

1424
  SMetaEntry* pEntry = NULL;
1425
  if (metaFetchEntryByUid(pMeta, uid, &pEntry) == TSDB_CODE_SUCCESS) {
1426
    int64_t keep = -1;
1427
    if (pEntry->type == TSDB_SUPER_TABLE) {
1428
      keep = pEntry->stbEntry.keep;
1429
    }
1430
    metaFetchEntryFree(&pEntry);
1431
    return keep;
1432
  }
1433
  
1434
  return -1;
1435
}
1436

1437
int32_t metaRefDbsCacheClear(SMeta* pMeta, uint64_t suid) {
1438
  int32_t        code = TSDB_CODE_SUCCESS;
1439
  int32_t        vgId = TD_VID(pMeta->pVnode);
1440
  SHashObj*      pEntryHashMap = pMeta->pCache->STbRefDbCache.pStbRefs;
1441
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
1442

1443
  (void)taosThreadMutexLock(pLock);
1444

1445
  SHashObj** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
1446
  if (pEntry == NULL) {
1447
    goto _return;
1448
  }
1449

1450
  code = taosHashRemove(pEntryHashMap, &suid, sizeof(uint64_t));
1451

1452
  metaDebug("vgId:%d suid:%" PRId64 " cached virtual stable ref db cleared", vgId, suid);
1453

1454
_return:
1455
  (void)taosThreadMutexUnlock(pLock);
1456
  return code;
1457
}
1458

1459
int32_t metaGetCachedRefDbs(void* pVnode, tb_uid_t suid, SArray* pList) {
1460
  int32_t        code = TSDB_CODE_SUCCESS;
1461
  int32_t        line = 0;
1462
  SMeta*         pMeta = ((SVnode*)pVnode)->pMeta;
1463
  SHashObj*      pTableMap = pMeta->pCache->STbRefDbCache.pStbRefs;
1464
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
1465

1466
  (void)taosThreadMutexLock(pLock);
1467

1468
  SHashObj** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
1469
  if (pEntry) {
1470
    void *iter = taosHashIterate(*pEntry, NULL);
1471
    while (iter != NULL) {
1472
      size_t   dbNameLen = 0;
1473
      char*    name = NULL;
1474
      char*    dbName = NULL;
1475
      name = taosHashGetKey(iter, &dbNameLen);
1476
      TSDB_CHECK_NULL(name, code, line, _return, terrno);
1477
      dbName = taosMemoryMalloc(dbNameLen + 1);
1478
      TSDB_CHECK_NULL(dbName, code, line, _return, terrno);
1479
      tstrncpy(dbName, name, dbNameLen + 1);
1480
      TSDB_CHECK_NULL(taosArrayPush(pList, &dbName), code, line, _return, terrno);
1481
      iter = taosHashIterate(*pEntry, iter);
1482
    }
1483
  }
1484

1485
_return:
1486
  if (code) {
1487
    metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
1488
  }
1489
  (void)taosThreadMutexUnlock(pLock);
1490
  return code;
1491
}
1492

1493
static int32_t addRefDbsCacheNewEntry(SHashObj* pRefDbs, uint64_t suid, SHashObj **pEntry) {
1494
  int32_t      code = TSDB_CODE_SUCCESS;
1495
  int32_t      lino = 0;
1496
  SHashObj*    p = NULL;
1497

1498
  p = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1499
  TSDB_CHECK_NULL(p, code, lino, _end, terrno);
1500

1501
  code = taosHashPut(pRefDbs, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
1502
  TSDB_CHECK_CODE(code, lino, _end);
1503

1504
  *pEntry = p;
1505

1506
_end:
1507
  if (code != TSDB_CODE_SUCCESS) {
1508
    metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1509
  }
1510
  return code;
1511
}
1512

1513
int32_t metaPutRefDbsToCache(void* pVnode, tb_uid_t suid, SArray* pList) {
1514
  int32_t        code = 0;
1515
  int32_t        line = 0;
1516
  SMeta*         pMeta = ((SVnode*)pVnode)->pMeta;
1517
  SHashObj*      pStbRefs = pMeta->pCache->STbRefDbCache.pStbRefs;
1518
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
1519

1520
  (void)taosThreadMutexLock(pLock);
1521

1522
  SHashObj*  pEntry = NULL;
1523
  SHashObj** find = taosHashGet(pStbRefs, &suid, sizeof(uint64_t));
1524
  if (find == NULL) {
1525
    code = addRefDbsCacheNewEntry(pStbRefs, suid, &pEntry);
1526
    TSDB_CHECK_CODE(code, line, _return);
1527
  } else {  // check if it exists or not
1528
    pEntry = *find;
1529
  }
1530

1531
  for (int32_t i = 0; i < taosArrayGetSize(pList); i++) {
1532
    char* dbName = taosArrayGetP(pList, i);
1533
    void* pItem = taosHashGet(pEntry, dbName, strlen(dbName));
1534
    if (pItem == NULL) {
1535
      code = taosHashPut(pEntry, dbName, strlen(dbName), NULL, 0);
1536
      TSDB_CHECK_CODE(code, line, _return);
1537
    }
1538
  }
1539

1540
_return:
1541
  if (code) {
1542
    metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
1543
  }
1544
  (void)taosThreadMutexUnlock(pLock);
1545

1546
  return code;
1547
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc