• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4851

14 Nov 2025 08:06AM UTC coverage: 63.754% (+0.03%) from 63.728%
#4851

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

354 of 675 new or added lines in 18 files covered. (52.44%)

3145 existing lines in 113 files now uncovered.

149128 of 233910 relevant lines covered (63.75%)

117183401.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.0
/source/dnode/vnode/src/meta/metaCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "meta.h"
16

17
#ifdef TD_ENTERPRISE
18
extern const char* tkLogStb[];
19
extern const char* tkAuditStb[];
20
extern const int   tkLogStbNum;
21
extern const int   tkAuditStbNum;
22
#endif
23

24
#define TAG_FILTER_RES_KEY_LEN  32
25
#define META_CACHE_BASE_BUCKET  1024
26
#define META_CACHE_STATS_BUCKET 16
27

28
// (uid , suid) : child table
29
// (uid,     0) : normal table
30
// (suid, suid) : super table
31
typedef struct SMetaCacheEntry SMetaCacheEntry;
32
struct SMetaCacheEntry {
33
  SMetaCacheEntry* next;
34
  SMetaInfo        info;
35
};
36

37
typedef struct SMetaStbStatsEntry {
38
  struct SMetaStbStatsEntry* next;
39
  SMetaStbStats              info;
40
} SMetaStbStatsEntry;
41

42
typedef struct STagFilterResEntry {
43
  SHashObj *set;    // the set of md5 digest, extracted from the serialized tag query condition
44
  uint32_t hitTimes;  // queried times for current super table
45
} STagFilterResEntry;
46

47
typedef struct STagCondFilterEntry {
48
  SArray*   pColIds;   // SArray<col_id_t>
49
  SHashObj* set;       // SHashObj<digest, SArray<uid>>
50
  uint32_t  hitTimes;  // queried times for current tag filter condition
51
} STagCondFilterEntry;
52

53
typedef struct STagConds {
54
  SHashObj* set;       // SHashObj<tagColIdStr, STagCondFilterEntry>
55
  uint32_t  hitTimes;  // queried times for current super table
56
  uint32_t  numTagDataEntries; // total num of tag data entries in this stable
57
} STagConds;
58

59
struct SMetaCache {
60
  // child, normal, super, table entry cache
61
  struct SEntryCache {
62
    int32_t           nEntry;
63
    int32_t           nBucket;
64
    SMetaCacheEntry** aBucket;
65
  } sEntryCache;
66

67
  // stable stats cache
68
  struct SStbStatsCache {
69
    int32_t              nEntry;
70
    int32_t              nBucket;
71
    SMetaStbStatsEntry** aBucket;
72
  } sStbStatsCache;
73

74
  // query cache
75
  struct STagFilterResCache {
76
    TdThreadMutex lock;
77
    uint32_t      accTimes;
78
    SHashObj*     pTableEntry;
79
    SLRUCache*    pUidResCache;
80
  } sTagFilterResCache;
81

82
  // cache table list for tag filter conditions
83
  // that match format "tag1 = v1 AND tag2 = v2 AND ..."
84
  struct SStableTagFilterResCache {
85
    TdThreadRwlock rwlock;
86
    SHashObj*      pTableEntry;  // HashObj<suid, STagConds>
87
    // access times
88
    uint64_t       accTimes;
89
    // hit times
90
    uint64_t       hitTimes;
91
    // total num of tag data entries in all stables
92
    uint32_t       numTagDataEntries;
93
  } sStableTagFilterResCache;
94

95
  struct STbGroupResCache {
96
    TdThreadMutex lock;
97
    uint32_t      accTimes;
98
    SHashObj*     pTableEntry;
99
    SLRUCache*    pResCache;
100
  } STbGroupResCache;
101

102
  struct STbFilterCache {
103
    SHashObj* pStb;
104
    SHashObj* pStbName;
105
  } STbFilterCache;
106

107
  struct STbRefDbCache {
108
    TdThreadMutex lock;
109
    SHashObj*     pStbRefs; // key: suid, value: SHashObj<dbName, refTimes>
110
  } STbRefDbCache;
111
};
112

113
static void entryCacheClose(SMeta* pMeta) {
8,468,344✔
114
  if (pMeta->pCache) {
8,468,344✔
115
    // close entry cache
116
    for (int32_t iBucket = 0; iBucket < pMeta->pCache->sEntryCache.nBucket; iBucket++) {
2,147,483,647✔
117
      SMetaCacheEntry* pEntry = pMeta->pCache->sEntryCache.aBucket[iBucket];
2,147,483,647✔
118
      while (pEntry) {
2,147,483,647✔
119
        SMetaCacheEntry* tEntry = pEntry->next;
76,392,748✔
120
        taosMemoryFree(pEntry);
76,392,748✔
121
        pEntry = tEntry;
76,393,634✔
122
      }
123
    }
124
    taosMemoryFree(pMeta->pCache->sEntryCache.aBucket);
8,468,344✔
125
  }
126
}
8,468,344✔
127

128
static void statsCacheClose(SMeta* pMeta) {
8,468,344✔
129
  if (pMeta->pCache) {
8,468,344✔
130
    // close entry cache
131
    for (int32_t iBucket = 0; iBucket < pMeta->pCache->sStbStatsCache.nBucket; iBucket++) {
149,503,042✔
132
      SMetaStbStatsEntry* pEntry = pMeta->pCache->sStbStatsCache.aBucket[iBucket];
141,034,698✔
133
      while (pEntry) {
147,416,434✔
134
        SMetaStbStatsEntry* tEntry = pEntry->next;
6,381,736✔
135
        taosMemoryFree(pEntry);
6,381,736✔
136
        pEntry = tEntry;
6,381,736✔
137
      }
138
    }
139
    taosMemoryFree(pMeta->pCache->sStbStatsCache.aBucket);
8,468,344✔
140
  }
141
}
8,468,344✔
142

143
static void freeCacheEntryFp(void* param) {
1,281✔
144
  STagFilterResEntry** p = param;
1,281✔
145
  taosHashCleanup((*p)->set);
1,281✔
146
  taosMemoryFreeClear(*p);
1,281✔
147
}
1,281✔
148

NEW
149
static void freeTagFilterEntryFp(void* param) {
×
NEW
150
  STagCondFilterEntry** p = param;
×
NEW
151
  taosArrayDestroy((*p)->pColIds);
×
NEW
152
  taosHashCleanup((*p)->set);
×
NEW
153
  taosMemoryFreeClear(*p);
×
NEW
154
}
×
155

NEW
156
static void freeTagCondsFp(void* param) {
×
NEW
157
  STagConds** p = param;
×
NEW
158
  taosHashCleanup((*p)->set);
×
NEW
159
  taosMemoryFreeClear(*p);
×
NEW
160
}
×
161

162
static void freeRefDbFp(void* param) {
24,192✔
163
  SHashObj** p = param;
24,192✔
164
  taosHashCleanup(*p);
24,192✔
165
  *p = NULL;
24,192✔
166
}
24,192✔
167

168
int32_t metaCacheOpen(SMeta* pMeta) {
8,455,220✔
169
  int32_t code = 0;
8,455,220✔
170
  int32_t lino;
171

172
  pMeta->pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
8,455,220✔
173
  if (pMeta->pCache == NULL) {
8,460,761✔
UNCOV
174
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
175
  }
176

177
  // open entry cache
178
  pMeta->pCache->sEntryCache.nEntry = 0;
8,462,099✔
179
  pMeta->pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
8,464,945✔
180
  pMeta->pCache->sEntryCache.aBucket =
8,480,370✔
181
      (SMetaCacheEntry**)taosMemoryCalloc(pMeta->pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
8,464,829✔
182
  if (pMeta->pCache->sEntryCache.aBucket == NULL) {
8,468,344✔
UNCOV
183
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
184
  }
185

186
  // open stats cache
187
  pMeta->pCache->sStbStatsCache.nEntry = 0;
8,467,690✔
188
  pMeta->pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
8,467,690✔
189
  pMeta->pCache->sStbStatsCache.aBucket =
8,478,151✔
190
      (SMetaStbStatsEntry**)taosMemoryCalloc(pMeta->pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
8,467,572✔
191
  if (pMeta->pCache->sStbStatsCache.aBucket == NULL) {
8,468,241✔
192
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
193
  }
194

195
  // open tag filter cache
196
  pMeta->pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
8,466,641✔
197
  if (pMeta->pCache->sTagFilterResCache.pUidResCache == NULL) {
8,468,344✔
198
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
199
  }
200

201
  pMeta->pCache->sTagFilterResCache.accTimes = 0;
8,468,344✔
202
  pMeta->pCache->sTagFilterResCache.pTableEntry =
16,936,688✔
203
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
16,924,662✔
204
  if (pMeta->pCache->sTagFilterResCache.pTableEntry == NULL) {
8,468,344✔
UNCOV
205
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
206
  }
207

208
  taosHashSetFreeFp(pMeta->pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
8,468,344✔
209
  (void)taosThreadMutexInit(&pMeta->pCache->sTagFilterResCache.lock, NULL);
8,468,040✔
210

211
  // open stable tag filter cache
212
  pMeta->pCache->sStableTagFilterResCache.accTimes = 0;
8,468,344✔
213
  pMeta->pCache->sStableTagFilterResCache.numTagDataEntries = 0;
8,468,040✔
214
  pMeta->pCache->sStableTagFilterResCache.pTableEntry =
16,936,384✔
215
    taosHashInit(1024,
16,924,358✔
216
      taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
217
  if (pMeta->pCache->sStableTagFilterResCache.pTableEntry == NULL) {
8,468,344✔
NEW
218
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
219
  }
220
  taosHashSetFreeFp(
8,468,344✔
221
    pMeta->pCache->sStableTagFilterResCache.pTableEntry, freeTagCondsFp);
8,468,344✔
222

223
  TAOS_UNUSED(taosThreadRwlockInit(
8,468,344✔
224
    &pMeta->pCache->sStableTagFilterResCache.rwlock, NULL));
225

226
  // open group res cache
227
  pMeta->pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
8,468,344✔
228
  if (pMeta->pCache->STbGroupResCache.pResCache == NULL) {
8,468,344✔
UNCOV
229
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
230
  }
231

232
  pMeta->pCache->STbGroupResCache.accTimes = 0;
8,468,344✔
233
  pMeta->pCache->STbGroupResCache.pTableEntry =
16,935,916✔
234
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
16,924,662✔
235
  if (pMeta->pCache->STbGroupResCache.pTableEntry == NULL) {
8,468,344✔
236
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
237
  }
238

239
  taosHashSetFreeFp(pMeta->pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
8,468,344✔
240
  (void)taosThreadMutexInit(&pMeta->pCache->STbGroupResCache.lock, NULL);
8,468,344✔
241

242
  // open filter cache
243
  pMeta->pCache->STbFilterCache.pStb =
16,936,688✔
244
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
16,924,662✔
245
  if (pMeta->pCache->STbFilterCache.pStb == NULL) {
8,468,344✔
UNCOV
246
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
247
  }
248

249
  pMeta->pCache->STbFilterCache.pStbName =
16,936,092✔
250
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
16,921,926✔
251
  if (pMeta->pCache->STbFilterCache.pStbName == NULL) {
8,464,869✔
UNCOV
252
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
253
  }
254

255
  // open ref db cache
256
  pMeta->pCache->STbRefDbCache.pStbRefs =
16,936,092✔
257
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
16,923,470✔
258
  if (pMeta->pCache->STbRefDbCache.pStbRefs == NULL) {
8,468,344✔
UNCOV
259
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
260
  }
261

262
  taosHashSetFreeFp(pMeta->pCache->STbRefDbCache.pStbRefs, freeRefDbFp);
8,468,344✔
263
  (void)taosThreadMutexInit(&pMeta->pCache->STbRefDbCache.lock, NULL);
8,468,344✔
264

265

266
_exit:
8,468,344✔
267
  if (code) {
8,468,344✔
UNCOV
268
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
UNCOV
269
    metaCacheClose(pMeta);
×
270
  } else {
271
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
8,468,344✔
272
  }
273
  return code;
8,468,344✔
274
}
275

276
void metaCacheClose(SMeta* pMeta) {
8,468,344✔
277
  if (pMeta->pCache) {
8,468,344✔
278
    entryCacheClose(pMeta);
8,468,344✔
279
    statsCacheClose(pMeta);
8,468,344✔
280

281
    taosHashClear(pMeta->pCache->sTagFilterResCache.pTableEntry);
8,468,344✔
282
    taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
8,468,344✔
283
    (void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
8,468,044✔
284
    taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
8,468,344✔
285

286
    (void)taosThreadRwlockDestroy(&pMeta->pCache->sStableTagFilterResCache.rwlock);
8,468,344✔
287
    taosHashCleanup(pMeta->pCache->sStableTagFilterResCache.pTableEntry);
8,468,344✔
288

289
    taosHashClear(pMeta->pCache->STbGroupResCache.pTableEntry);
8,468,344✔
290
    taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
8,468,344✔
291
    (void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
8,468,344✔
292
    taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
8,468,344✔
293

294
    taosHashCleanup(pMeta->pCache->STbFilterCache.pStb);
8,468,344✔
295
    taosHashCleanup(pMeta->pCache->STbFilterCache.pStbName);
8,468,344✔
296

297
    taosHashClear(pMeta->pCache->STbRefDbCache.pStbRefs);
8,468,344✔
298
    (void)taosThreadMutexDestroy(&pMeta->pCache->STbRefDbCache.lock);
8,468,344✔
299
    taosHashCleanup(pMeta->pCache->STbRefDbCache.pStbRefs);
8,468,344✔
300

301
    taosMemoryFree(pMeta->pCache);
8,468,344✔
302
    pMeta->pCache = NULL;
8,468,344✔
303
  }
304
}
8,468,344✔
305

306
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
22,739✔
307
  int32_t code = 0;
22,739✔
308
  int32_t nBucket;
309

310
  if (expand) {
22,739✔
311
    nBucket = pCache->sEntryCache.nBucket * 2;
22,739✔
312
  } else {
UNCOV
313
    nBucket = pCache->sEntryCache.nBucket / 2;
×
314
  }
315

316
  SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
22,739✔
317
  if (aBucket == NULL) {
22,739✔
UNCOV
318
    return;
×
319
  }
320

321
  // rehash
322
  for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
68,009,171✔
323
    SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
67,986,432✔
324

325
    while (pEntry) {
135,972,864✔
326
      SMetaCacheEntry* pTEntry = pEntry->next;
67,986,432✔
327

328
      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
67,986,432✔
329
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
67,986,432✔
330

331
      pEntry = pTEntry;
67,986,432✔
332
    }
333
  }
334

335
  // final set
336
  taosMemoryFree(pCache->sEntryCache.aBucket);
22,739✔
337
  pCache->sEntryCache.nBucket = nBucket;
22,739✔
338
  pCache->sEntryCache.aBucket = aBucket;
22,739✔
339
  return;
22,739✔
340
}
341

342
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
99,552,394✔
343
  int32_t code = 0;
99,552,394✔
344

345
  // meta is wlocked for calling this func.
346

347
  // search
348
  SMetaCache*       pCache = pMeta->pCache;
99,552,394✔
349
  int32_t           iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
99,577,201✔
350
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
99,552,070✔
351
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
114,026,994✔
352
    ppEntry = &(*ppEntry)->next;
14,497,531✔
353
  }
354

355
  if (*ppEntry) {  // update
99,496,802✔
356
    if (pInfo->suid != (*ppEntry)->info.suid) {
20,226,639✔
UNCOV
357
      metaError("meta/cache: suid should be same as the one in cache.");
×
UNCOV
358
      return TSDB_CODE_INVALID_PARA;
×
359
    }
360
    if (pInfo->version > (*ppEntry)->info.version) {
20,184,119✔
361
      (*ppEntry)->info.version = pInfo->version;
20,200,434✔
362
      (*ppEntry)->info.skmVer = pInfo->skmVer;
20,177,406✔
363
    }
364
  } else {  // insert
365
    if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
79,292,571✔
366
      metaRehashCache(pCache, 1);
22,739✔
367

368
      iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
22,739✔
369
    }
370

371
    SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
79,345,536✔
372
    if (pEntryNew == NULL) {
79,338,440✔
UNCOV
373
      code = terrno;
×
374
      goto _exit;
×
375
    }
376

377
    pEntryNew->info = *pInfo;
79,338,440✔
378
    pEntryNew->next = pCache->sEntryCache.aBucket[iBucket];
79,333,803✔
379
    pCache->sEntryCache.aBucket[iBucket] = pEntryNew;
79,306,479✔
380
    pCache->sEntryCache.nEntry++;
79,315,444✔
381
  }
382

383
_exit:
99,531,756✔
384
  return code;
99,531,756✔
385
}
386

387
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
3,009,286✔
388
  int32_t code = 0;
3,009,286✔
389

390
  SMetaCache*       pCache = pMeta->pCache;
3,009,286✔
391
  int32_t           iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
3,010,127✔
392
  SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
3,010,602✔
393
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
3,019,881✔
394
    ppEntry = &(*ppEntry)->next;
11,872✔
395
  }
396

397
  SMetaCacheEntry* pEntry = *ppEntry;
3,007,785✔
398
  if (pEntry) {
3,009,292✔
399
    *ppEntry = pEntry->next;
2,975,058✔
400
    taosMemoryFree(pEntry);
2,975,897✔
401
    pCache->sEntryCache.nEntry--;
2,975,433✔
402
    if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
2,975,999✔
403
        pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
2,908,873✔
UNCOV
404
      metaRehashCache(pCache, 0);
×
405
    }
406
  } else {
407
    code = TSDB_CODE_NOT_FOUND;
34,234✔
408
  }
409

410
_exit:
3,009,999✔
411
  return code;
3,009,999✔
412
}
413

414
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
1,612,494,220✔
415
  int32_t code = 0;
1,612,494,220✔
416

417
  SMetaCache*      pCache = pMeta->pCache;
1,612,494,220✔
418
  int32_t          iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
1,612,641,790✔
419
  SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
1,612,637,790✔
420

421
  while (pEntry && pEntry->info.uid != uid) {
1,655,923,223✔
422
    pEntry = pEntry->next;
43,267,458✔
423
  }
424

425
  if (pEntry) {
1,612,641,785✔
426
    if (pInfo) {
1,524,562,275✔
427
      *pInfo = pEntry->info;
1,524,553,008✔
428
    }
429
  } else {
430
    code = TSDB_CODE_NOT_FOUND;
88,079,510✔
431
  }
432

433
  return code;
1,612,654,953✔
434
}
435

436
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
123,491✔
437
  int32_t code = 0;
123,491✔
438
  int32_t nBucket;
439

440
  if (expand) {
123,491✔
441
    nBucket = pCache->sStbStatsCache.nBucket * 2;
115,780✔
442
  } else {
443
    nBucket = pCache->sStbStatsCache.nBucket / 2;
7,711✔
444
  }
445

446
  SMetaStbStatsEntry** aBucket = (SMetaStbStatsEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaStbStatsEntry*));
123,491✔
447
  if (aBucket == NULL) {
123,491✔
UNCOV
448
    code = terrno;
×
UNCOV
449
    goto _exit;
×
450
  }
451

452
  // rehash
453
  for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
6,197,299✔
454
    SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
6,073,808✔
455

456
    while (pEntry) {
11,873,961✔
457
      SMetaStbStatsEntry* pTEntry = pEntry->next;
5,800,153✔
458

459
      pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
5,800,153✔
460
      aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
5,800,153✔
461

462
      pEntry = pTEntry;
5,800,153✔
463
    }
464
  }
465

466
  // final set
467
  taosMemoryFree(pCache->sStbStatsCache.aBucket);
123,491✔
468
  pCache->sStbStatsCache.nBucket = nBucket;
123,491✔
469
  pCache->sStbStatsCache.aBucket = aBucket;
123,491✔
470

471
_exit:
123,491✔
472
  return code;
123,491✔
473
}
474

475
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
67,045,503✔
476
  int32_t code = 0;
67,045,503✔
477

478
  // meta is wlocked for calling this func.
479

480
  // search
481
  SMetaCache*          pCache = pMeta->pCache;
67,045,503✔
482
  int32_t              iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
67,056,510✔
483
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
67,050,179✔
484
  while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
70,570,153✔
485
    ppEntry = &(*ppEntry)->next;
3,543,366✔
486
  }
487

488
  if (*ppEntry) {  // update
67,025,790✔
489
    (*ppEntry)->info.ctbNum = pInfo->ctbNum;
59,900,039✔
490
    (*ppEntry)->info.colNum = pInfo->colNum;
59,922,986✔
491
    (*ppEntry)->info.flags = pInfo->flags;
59,920,456✔
492
    (*ppEntry)->info.keep = pInfo->keep;
59,923,425✔
493
  } else {  // insert
494
    if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
7,127,969✔
495
      TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
115,780✔
496
      iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
115,780✔
497
    }
498

499
    SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
7,118,291✔
500
    if (pEntryNew == NULL) {
7,118,456✔
UNCOV
501
      code = terrno;
×
UNCOV
502
      goto _exit;
×
503
    }
504

505
    pEntryNew->info = *pInfo;
7,118,456✔
506
    pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
7,118,273✔
507
    pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
7,118,021✔
508
    pCache->sStbStatsCache.nEntry++;
7,118,014✔
509
  }
510

511
_exit:
67,013,926✔
512
  return code;
67,013,926✔
513
}
514

515
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
1,110,412✔
516
  int32_t code = 0;
1,110,412✔
517

518
  SMetaCache*          pCache = pMeta->pCache;
1,110,412✔
519
  int32_t              iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
1,111,196✔
520
  SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
1,111,009✔
521
  while (*ppEntry && (*ppEntry)->info.uid != uid) {
1,247,543✔
522
    ppEntry = &(*ppEntry)->next;
138,259✔
523
  }
524

525
  SMetaStbStatsEntry* pEntry = *ppEntry;
1,109,750✔
526
  if (pEntry) {
1,111,196✔
527
    *ppEntry = pEntry->next;
736,908✔
528
    taosMemoryFree(pEntry);
736,908✔
529
    pCache->sStbStatsCache.nEntry--;
736,908✔
530
    if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
736,908✔
531
        pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
518,151✔
532
      TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
7,711✔
533
    }
534
  } else {
535
    code = TSDB_CODE_NOT_FOUND;
374,288✔
536
  }
537

538
_exit:
1,111,009✔
539
  return code;
1,111,009✔
540
}
541

542
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
267,150,962✔
543
  int32_t code = TSDB_CODE_SUCCESS;
267,150,962✔
544

545
  SMetaCache*         pCache = pMeta->pCache;
267,150,962✔
546
  int32_t             iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
267,196,664✔
547
  SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
267,190,826✔
548

549
  while (pEntry && pEntry->info.uid != uid) {
301,906,384✔
550
    pEntry = pEntry->next;
34,691,623✔
551
  }
552

553
  if (pEntry) {
267,215,520✔
554
    if (pInfo) {
249,397,135✔
555
      *pInfo = pEntry->info;
249,394,654✔
556
    }
557
  } else {
558
    code = TSDB_CODE_NOT_FOUND;
17,818,385✔
559
  }
560

561
  return code;
267,219,284✔
562
}
563

564
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
565
  memcpy(&pBuf[2], key, keyLen);
139,498,403✔
566
}
139,461,409✔
567

568
// the format of key:
569
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
570
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
139,488,685✔
571
  buf[0] = (uint64_t)pHashMap;
139,488,685✔
572
  buf[1] = suid;
139,493,274✔
573
  setMD5DigestInKey(buf, key, keyLen);
574
}
139,497,549✔
575

576
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
8,113✔
577
                                  bool* acquireRes) {
578
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
8,113✔
579
  int32_t vgId = TD_VID(pMeta->pVnode);
8,113✔
580

581
  // generate the composed key for LRU cache
582
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
8,113✔
583
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
8,113✔
584
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
8,113✔
585

586
  *acquireRes = 0;
8,113✔
587
  uint64_t key[4];
8,113✔
588
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
8,113✔
589
  
590
  // void* tmp = NULL;
591
  // uint32_t len = 0;
592
  // (void)taosAscii2Hex((const char*)key, 32, &tmp, &len);
593
  // qDebug("metaGetCachedTableUidList %p %"PRId64" key: %s", pTableMap, suid, tmp);
594
  // taosMemoryFree(tmp);
595

596
  (void)taosThreadMutexLock(pLock);
8,113✔
597
  pMeta->pCache->sTagFilterResCache.accTimes += 1;
8,113✔
598

599
  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
8,113✔
600
  if (pHandle == NULL) {
8,113✔
601
    (void)taosThreadMutexUnlock(pLock);
2,989✔
602
    return TSDB_CODE_SUCCESS;
2,989✔
603
  }
604

605
  // do some book mark work after acquiring the filter result from cache
606
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
5,124✔
607
  if (NULL == pEntry) {
5,124✔
UNCOV
608
    metaError("meta/cache: pEntry should not be NULL.");
×
UNCOV
609
    return TSDB_CODE_NOT_FOUND;
×
610
  }
611

612
  *acquireRes = 1;
5,124✔
613

614
  const char* p = taosLRUCacheValue(pCache, pHandle);
5,124✔
615
  int32_t     size = *(int32_t*)p;
5,124✔
616

617
  // set the result into the buffer
618
  if (taosArrayAddBatch(pList1, p + sizeof(int32_t), size) == NULL) {
5,124✔
UNCOV
619
    return terrno;
×
620
  }
621

622
  (*pEntry)->hitTimes += 1;
5,124✔
623

624
  uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
5,124✔
625
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
5,124✔
UNCOV
626
    metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
×
627
             ((double)(*pEntry)->hitTimes) / acc);
628
  }
629

630
  bool ret = taosLRUCacheRelease(pCache, pHandle, false);
5,124✔
631

632
  // unlock meta
633
  (void)taosThreadMutexUnlock(pLock);
5,124✔
634
  return TSDB_CODE_SUCCESS;
5,124✔
635
}
636

NEW
637
int32_t metaStableTagFilterCacheGet(void* pVnode, tb_uid_t suid,
×
638
  const uint8_t* pTagCondKey, int32_t tagCondKeyLen,
639
  const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) {
640

NEW
641
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
642
  int32_t lino = 0;
×
NEW
643
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
NEW
644
  int32_t vgId = TD_VID(pMeta->pVnode);
×
NEW
645
  *acquireRes = 0;
×
646

647
  // generate the composed key for LRU cache
NEW
648
  SHashObj*       pTableMap = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
×
NEW
649
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
×
650

NEW
651
  code = taosThreadRwlockRdlock(pRwlock);
×
NEW
652
  TSDB_CHECK_CODE(code, lino, _end);
×
NEW
653
  pMeta->pCache->sStableTagFilterResCache.accTimes += 1;
×
654

655
  STagConds** pTagConds =
NEW
656
    (STagConds**)taosHashGet(pTableMap, &suid, sizeof(tb_uid_t));
×
NEW
657
  TSDB_CHECK_NULL(pTagConds, code, lino, _end, TSDB_CODE_SUCCESS);
×
658

NEW
659
  STagCondFilterEntry** pFilterEntry = (STagCondFilterEntry**)taosHashGet(
×
NEW
660
    (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
×
NEW
661
  TSDB_CHECK_NULL(pFilterEntry, code, lino, _end, TSDB_CODE_SUCCESS);
×
662

NEW
663
  SArray** pArray = (SArray**)taosHashGet((*pFilterEntry)->set, pKey, keyLen);
×
NEW
664
  TSDB_CHECK_NULL(pArray, code, lino, _end, TSDB_CODE_SUCCESS);
×
665

666
  // set the result into the buffer
NEW
667
  *acquireRes = 1;
×
NEW
668
  TAOS_UNUSED(taosArrayAddBatch(
×
669
    pList1, TARRAY_GET_ELEM(*pArray, 0), taosArrayGetSize(*pArray)));
670

671
  // do some bookmark work after acquiring the filter result from cache
NEW
672
  (*pTagConds)->hitTimes += 1;
×
NEW
673
  (*pFilterEntry)->hitTimes += 1;
×
NEW
674
  uint64_t hit = ++pMeta->pCache->sStableTagFilterResCache.hitTimes;
×
NEW
675
  uint64_t acc = pMeta->pCache->sStableTagFilterResCache.accTimes;
×
NEW
676
  if ((*pTagConds)->hitTimes % 1000 == 0 && (*pTagConds)->hitTimes > 0) {
×
NEW
677
    metaInfo(
×
678
      "vgId:%d, suid:%" PRIu64 
679
      ", current stable cache hit:%" PRIu32 ", this tag condition hit:%" PRIu32
680
      ", total cache hit:%" PRIu64 ", acc:%" PRIu64 ", hit rate:%.2f%%",
681
      vgId, suid, (*pTagConds)->hitTimes, (*pFilterEntry)->hitTimes, hit, acc,
682
      ((double)hit / acc * 100));
683
  }
684

NEW
685
_end:
×
NEW
686
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
687
    metaError("vgId:%d, %s failed at %s:%d since %s",
×
688
      vgId, __func__, __FILE__, lino, tstrerror(code));
689
  }
690
  // unlock meta
NEW
691
  code = taosThreadRwlockUnlock(pRwlock);
×
NEW
692
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
693
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
×
694
      vgId, __func__, __FILE__, lino, tstrerror(code));
695
  }
NEW
696
  return code;
×
697
}
698

699
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
2,989✔
700
  (void)ud;
701
  if (value == NULL) {
2,989✔
UNCOV
702
    return;
×
703
  }
704

705
  const uint64_t* p = key;
2,989✔
706
  if (keyLen != sizeof(int64_t) * 4) {
2,989✔
UNCOV
707
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
UNCOV
708
    return;
×
709
  }
710

711
  SHashObj* pHashObj = (SHashObj*)p[0];
2,989✔
712

713
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
2,989✔
714

715
  if (pEntry != NULL && (*pEntry) != NULL) {
2,989✔
716
    int64_t st = taosGetTimestampUs();
854✔
717
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
854✔
718
    if (code == TSDB_CODE_SUCCESS) {
854✔
719
      double el = (taosGetTimestampUs() - st) / 1000.0;
854✔
720
      metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
854✔
721
               el);
722
    }
723
  }
724

725
  taosMemoryFree(value);
2,989✔
726
}
727

728
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
1,281✔
729
  int32_t             code = TSDB_CODE_SUCCESS;
1,281✔
730
  int32_t             lino = 0;
1,281✔
731
  STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
1,281✔
732
  TSDB_CHECK_NULL(p, code, lino, _end, terrno);
1,281✔
733

734
  p->hitTimes = 0;
1,281✔
735
  p->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,281✔
736
  TSDB_CHECK_NULL(p->set, code, lino, _end, terrno);
1,281✔
737
  code = taosHashPut(p->set, pKey, keyLen, NULL, 0);
1,281✔
738
  TSDB_CHECK_CODE(code, lino, _end);
1,281✔
739
  code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
1,281✔
740
  TSDB_CHECK_CODE(code, lino, _end);
1,281✔
741

742
_end:
1,281✔
743
  if (code != TSDB_CODE_SUCCESS) {
1,281✔
744
    metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
745
    if (p != NULL) {
×
UNCOV
746
      if (p->set != NULL) {
×
UNCOV
747
        taosHashCleanup(p->set);
×
748
      }
UNCOV
749
      taosMemoryFree(p);
×
750
    }
751
  }
752
  return code;
1,281✔
753
}
754

755
// check both the payload size and selectivity ratio
756
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
2,989✔
757
                              int32_t payloadLen, double selectivityRatio) {
758
  int32_t code = 0;
2,989✔
759
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
2,989✔
760
  int32_t vgId = TD_VID(pMeta->pVnode);
2,989✔
761

762
  if (selectivityRatio > tsSelectivityRatio) {
2,989✔
763
    metaDebug("vgId:%d, suid:%" PRIu64
×
764
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
765
              vgId, suid, selectivityRatio, tsSelectivityRatio);
766
    taosMemoryFree(pPayload);
×
767
    return TSDB_CODE_SUCCESS;
×
768
  }
769

770
  if (payloadLen > tsTagFilterResCacheSize) {
2,989✔
771
    metaDebug("vgId:%d, suid:%" PRIu64
×
772
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
773
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
774
    taosMemoryFree(pPayload);
×
775
    return TSDB_CODE_SUCCESS;
×
776
  }
777

778
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
2,989✔
779
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
2,989✔
780
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
2,989✔
781

782
  uint64_t key[4] = {0};
2,989✔
783
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
2,989✔
784

785
  // void* tmp = NULL;
786
  // uint32_t len = 0;
787
  // (void)taosAscii2Hex((const char*)key, 32, &tmp, &len);
788
  // qDebug("metaUidFilterCachePut %p %"PRId64" key: %s", pTableEntry, suid, tmp);
789
  // taosMemoryFree(tmp);
790

791
  (void)taosThreadMutexLock(pLock);
2,989✔
792
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
2,989✔
793
  if (pEntry == NULL) {
2,989✔
794
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
1,281✔
795
    if (code != TSDB_CODE_SUCCESS) {
1,281✔
UNCOV
796
      goto _end;
×
797
    }
798
  } else {  // check if it exists or not
799
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
1,708✔
800
    if (code == TSDB_CODE_DUP_KEY) {
1,708✔
801
      // we have already found the existed items, no need to added to cache anymore.
UNCOV
802
      (void)taosThreadMutexUnlock(pLock);
×
803
      return TSDB_CODE_SUCCESS;
×
804
    }
805
    if (code != TSDB_CODE_SUCCESS) {
1,708✔
UNCOV
806
      goto _end;
×
807
    }
808
  }
809

810
  // add to cache.
811
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
2,989✔
812
                           TAOS_LRU_PRIORITY_LOW, NULL);
813
_end:
2,989✔
814
  (void)taosThreadMutexUnlock(pLock);
2,989✔
815
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
2,989✔
816
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
817

818
  return code;
2,989✔
819
}
820

NEW
821
static void freeSArrayPtr(void* pp) {
×
NEW
822
  SArray* pArray = *(SArray**)pp;
×
NEW
823
  taosArrayDestroy(pArray);
×
NEW
824
}
×
825

NEW
826
int32_t metaStableTagFilterCachePut(
×
827
  void* pVnode, uint64_t suid, const void* pTagCondKey, int32_t tagCondKeyLen,
828
  const void* pKey, int32_t keyLen, SArray* pUidList, SArray** pTagColIds) {
829

NEW
830
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
831
  int32_t lino = 0;
×
NEW
832
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
NEW
833
  int32_t vgId = TD_VID(pMeta->pVnode);
×
834

NEW
835
  SHashObj*       pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
×
NEW
836
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
×
837

NEW
838
  code = taosThreadRwlockWrlock(pRwlock);
×
NEW
839
  TSDB_CHECK_CODE(code, lino, _end);
×
840

841
  STagConds** pTagConds = 
NEW
842
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
NEW
843
  if (pTagConds == NULL) {
×
844
    // add new (suid -> tag conds) entry
NEW
845
    STagConds* pEntry = (STagConds*)taosMemoryMalloc(sizeof(STagConds));
×
NEW
846
    TSDB_CHECK_NULL(pEntry, code, lino, _end, terrno);
×
847

NEW
848
    pEntry->hitTimes = 0;
×
NEW
849
    pEntry->set = taosHashInit(
×
850
      1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
851
      false, HASH_NO_LOCK);
NEW
852
    taosHashSetFreeFp(pEntry->set, freeTagFilterEntryFp);
×
NEW
853
    TSDB_CHECK_NULL(pEntry->set, code, lino, _end, terrno);
×
854

NEW
855
    code = taosHashPut(
×
856
      pTableEntry, &suid, sizeof(uint64_t), &pEntry, POINTER_BYTES);
NEW
857
    TSDB_CHECK_CODE(code, lino, _end);
×
858

NEW
859
    pTagConds = (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
NEW
860
    TSDB_CHECK_NULL(pTagConds, code, lino, _end, TSDB_CODE_NOT_FOUND);
×
861
  }
862

863
  STagCondFilterEntry** pFilterEntry =
NEW
864
    (STagCondFilterEntry**)taosHashGet(
×
NEW
865
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
×
NEW
866
  if (pFilterEntry == NULL) {
×
867
    // add new (tag cond -> filter entry) entry
NEW
868
    STagCondFilterEntry* pEntry = 
×
NEW
869
      (STagCondFilterEntry*)taosMemoryMalloc(sizeof(STagCondFilterEntry));
×
NEW
870
    TSDB_CHECK_NULL(pEntry, code, lino, _end, terrno);
×
871

NEW
872
    pEntry->hitTimes = 0;
×
NEW
873
    pEntry->set = taosHashInit(
×
874
      1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
875
      false, HASH_NO_LOCK);
NEW
876
    TSDB_CHECK_NULL(pEntry->set, code, lino, _end, terrno);
×
NEW
877
    taosHashSetFreeFp(pEntry->set, freeSArrayPtr);
×
NEW
878
    pEntry->pColIds = *pTagColIds;
×
NEW
879
    *pTagColIds = NULL;
×
880

NEW
881
    code = taosHashPut(
×
NEW
882
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen, &pEntry, POINTER_BYTES);
×
NEW
883
    TSDB_CHECK_CODE(code, lino, _end);
×
884

NEW
885
    pFilterEntry = (STagCondFilterEntry**)taosHashGet(
×
NEW
886
      (*pTagConds)->set, pTagCondKey, tagCondKeyLen);
×
887
  } else {
888
    // pColIds is already set, so we can destroy the new one
NEW
889
    taosArrayDestroy(*pTagColIds);
×
NEW
890
    *pTagColIds = NULL;
×
891
  }
892

893
  // add to cache.
NEW
894
  SArray* pPayload = taosArrayDup(pUidList, NULL);
×
NEW
895
  code = taosHashPut(
×
NEW
896
    (*pFilterEntry)->set, pKey, keyLen, &pPayload, POINTER_BYTES);
×
NEW
897
  TSDB_CHECK_CODE(code, lino, _end);
×
NEW
898
  pMeta->pCache->sStableTagFilterResCache.numTagDataEntries += 1;
×
NEW
899
  (*pTagConds)->numTagDataEntries += 1;
×
900

NEW
901
_end:
×
NEW
902
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
903
    metaError("vgId:%d, %s failed at %s:%d since %s",
×
904
      vgId, __func__, __FILE__, lino, tstrerror(code));
905
  } else {
NEW
906
    metaInfo("vgId:%d, suid:%" PRIu64 " new tag data filter entry added, "
×
907
      "uid num:%d, current stable tag conditions num:%d, "
908
      "this tag condition data entries num:%d, "
909
      "cache stable num:%d, total tag data entries num:%" PRIu32 ", "
910
      "total tag data entries num:%" PRIu32,
911
      vgId, suid, (int32_t)taosArrayGetSize(pUidList), 
912
      pTagConds ? (int32_t)taosHashGetSize((*pTagConds)->set) : 0,
913
      pFilterEntry ? (int32_t)taosHashGetSize((*pFilterEntry)->set) : 0,
914
      (int32_t)taosHashGetSize(pTableEntry),
915
      pMeta->pCache->sStableTagFilterResCache.numTagDataEntries,
916
      (*pTagConds)->numTagDataEntries);
917
  }
918
  // unlock meta
NEW
919
  code = taosThreadRwlockUnlock(pRwlock);
×
NEW
920
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
921
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
×
922
      vgId, __func__, __FILE__, lino, tstrerror(code));
923
  }
924

NEW
925
  return code;
×
926
}
927

928
// drop all the cache entries for a super table 
NEW
929
int32_t metaStableTagFilterCacheDropSTable(
×
930
  SMeta* pMeta, tb_uid_t suid) {
NEW
931
  if (pMeta == NULL) {
×
NEW
932
    return TSDB_CODE_INVALID_PARA;
×
933
  }
NEW
934
  int32_t   lino = 0;
×
NEW
935
  int32_t   code = TSDB_CODE_SUCCESS;
×
NEW
936
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
×
NEW
937
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
×
938

NEW
939
  code = taosThreadRwlockWrlock(pRwlock);
×
NEW
940
  TSDB_CHECK_CODE(code, lino, _end);
×
NEW
941
  STagConds** pTagConds = taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
×
NEW
942
  if (pTagConds != NULL) {
×
NEW
943
    pMeta->pCache->sStableTagFilterResCache.
×
NEW
944
      numTagDataEntries -= (*pTagConds)->numTagDataEntries;
×
945
  }
NEW
946
  code = taosHashRemove(pTableEntry, &suid, sizeof(tb_uid_t));
×
NEW
947
  TSDB_CHECK_CODE(code, lino, _end);
×
948

NEW
949
_end:
×
NEW
950
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
951
    metaError("vgId:%d, %s failed at %s:%d since %s",
×
952
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
953
  } else {
NEW
954
    metaDebug(
×
955
      "vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache"
956
      "left stable num:%d, tag conditions num:%" PRIu32,
957
      TD_VID(pMeta->pVnode), suid, (int32_t)taosHashGetSize(pTableEntry),
958
      pMeta->pCache->sStableTagFilterResCache.numTagDataEntries);
959
  }
NEW
960
  code = taosThreadRwlockUnlock(pRwlock);
×
NEW
961
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
962
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
×
963
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
964
  }
NEW
965
  return code;
×
966
}
967

NEW
968
static int32_t getTagColSize(
×
969
  const SSchema* pTagSchemas, int32_t nTagCols, col_id_t cid) {
NEW
970
  for (int32_t i = 0; i < nTagCols; i++) {
×
NEW
971
    if (pTagSchemas[i].colId == cid) {
×
NEW
972
      return pTagSchemas[i].bytes;
×
973
    }
974
  }
NEW
975
  return 0;
×
976
}
977

978
// when encode nchar tag into tag data entry key, need to convert it to var type
979
static FORCE_INLINE int32_t ncharToVar(char *pData, int32_t nData, char **ppOut) {
NEW
980
  int32_t code = TSDB_CODE_SUCCESS;
×
981

NEW
982
  char *t = taosMemoryCalloc(1, nData + VARSTR_HEADER_SIZE);
×
NEW
983
  if (NULL == t) {
×
NEW
984
    return terrno;
×
985
  }
NEW
986
  int32_t len = taosUcs4ToMbs(
×
987
    (TdUcs4 *)pData, nData, varDataVal(t), NULL);
NEW
988
  if (len < 0) {
×
NEW
989
    taosMemoryFree(t);
×
NEW
990
    return TSDB_CODE_SCALAR_CONVERT_ERROR;
×
991
  }
NEW
992
  varDataSetLen(t, len);
×
993

NEW
994
  *ppOut = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
×
NEW
995
  memcpy(*ppOut, t, len + VARSTR_HEADER_SIZE);
×
996

NEW
997
_return:
×
NEW
998
  taosMemoryFree(t);
×
NEW
999
  return code;
×
1000
}
1001

NEW
1002
static int32_t buildTagDataEntryKey(const SArray* pColIds, const STag* pTag,
×
1003
  const SSchemaWrapper* pTagScheam, T_MD5_CTX* pContext) {
NEW
1004
  int32_t code = TSDB_CODE_SUCCESS;
×
NEW
1005
  int32_t lino = 0;
×
NEW
1006
  int32_t keyLen = 0;
×
NEW
1007
  char* pKey = NULL;
×
1008
  // get length first
NEW
1009
  for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
×
NEW
1010
    STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
×
NEW
1011
    if (tTagGet(pTag, &pTagValue)) {
×
NEW
1012
      keyLen += sizeof(col_id_t);
×
NEW
1013
      if (IS_VAR_DATA_TYPE(pTagValue.type)) {
×
NEW
1014
        int32_t varLen = getTagColSize(
×
NEW
1015
          pTagScheam->pSchema, pTagScheam->nCols, pTagValue.cid);
×
NEW
1016
        code = varLen > 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_NOT_FOUND;
×
NEW
1017
        QUERY_CHECK_CODE(code, lino, _end);
×
NEW
1018
        keyLen += varLen;
×
1019
      } else {
NEW
1020
        keyLen += tDataTypes[pTagValue.type].bytes;
×
1021
      }
1022
    } else {
1023
      // tag value not found
NEW
1024
      code = TSDB_CODE_NOT_FOUND;
×
NEW
1025
      QUERY_CHECK_CODE(code, lino, _end);
×
1026
    }
1027
  }
1028

NEW
1029
  pKey = taosMemoryCalloc(1, keyLen);
×
NEW
1030
  if (NULL == pKey) {
×
NEW
1031
    code = terrno;
×
NEW
1032
    return code;
×
1033
  }
1034

1035
  // build the key
NEW
1036
  char* pStart = pKey;
×
NEW
1037
  for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
×
NEW
1038
    STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
×
NEW
1039
    if (tTagGet(pTag, &pTagValue)) {
×
1040
      // copy cid
NEW
1041
      memcpy(pStart, &pTagValue.cid, sizeof(col_id_t));
×
NEW
1042
      pStart += sizeof(col_id_t);
×
1043
      // copy value
NEW
1044
      if (IS_VAR_DATA_TYPE(pTagValue.type) && pTagValue.pData != NULL) {
×
NEW
1045
        if (TSDB_DATA_TYPE_NCHAR == pTagValue.type) {
×
1046
          // need to convert nchar to var
NEW
1047
          char *pVar = NULL;
×
NEW
1048
          code = ncharToVar((char *)pTagValue.pData, pTagValue.nData, &pVar);
×
NEW
1049
          QUERY_CHECK_CODE(code, lino, _end);
×
NEW
1050
          memcpy(pStart, varDataVal(pVar), varDataLen(pVar));
×
NEW
1051
          pStart += varDataLen(pVar);
×
NEW
1052
          taosMemoryFree(pVar);
×
1053
        } else {
NEW
1054
          memcpy(pStart, pTagValue.pData, pTagValue.nData);
×
NEW
1055
          pStart += pTagValue.nData;
×
1056
        }
1057
      } else {
NEW
1058
        memcpy(pStart, &pTagValue.i64, tDataTypes[pTagValue.type].bytes);
×
NEW
1059
        pStart += tDataTypes[pTagValue.type].bytes;
×
1060
      }
1061
    } else {
1062
      // tag value not found
NEW
1063
      code = TSDB_CODE_NOT_FOUND;
×
NEW
1064
      QUERY_CHECK_CODE(code, lino, _end);
×
1065
    }
1066
  }
1067

1068
  // update MD5
NEW
1069
  tMD5Init(pContext);
×
NEW
1070
  tMD5Update(pContext, (uint8_t*)pKey, (uint32_t)keyLen);
×
NEW
1071
  tMD5Final(pContext);
×
1072

NEW
1073
_end:
×
NEW
1074
  taosMemFreeClear(pKey);
×
NEW
1075
  return code;
×
1076
}
1077

1078
// remove the dropped table uid from all cache entries
1079
// pDroppedTable is the dropped child table meta entry
1080
int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
77,559,223✔
1081
  const SMetaEntry* pChildTable, const SMetaEntry* pSuperTable,
1082
  ETagFilterCacheAction action) {
1083
  if (pMeta == NULL || pChildTable == NULL || pSuperTable == NULL) {
77,559,223✔
NEW
1084
    return TSDB_CODE_INVALID_PARA;
×
1085
  }
1086
  int32_t   lino = 0;
77,565,221✔
1087
  int32_t   code = TSDB_CODE_SUCCESS;
77,565,221✔
1088
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
77,565,221✔
1089
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
77,568,018✔
1090

1091
  code = taosThreadRwlockWrlock(pRwlock);
77,565,998✔
1092
  TSDB_CHECK_CODE(code, lino, _end);
77,558,641✔
1093

1094
  tb_uid_t suid = pChildTable->ctbEntry.suid;;
77,558,641✔
1095
  STagConds** pTagConds =
1096
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
77,561,913✔
1097
  if (pTagConds != NULL) {
77,569,320✔
NEW
1098
    STagCondFilterEntry** ppFilterEntry = NULL;
×
NEW
1099
    while ((ppFilterEntry = taosHashIterate((*pTagConds)->set, ppFilterEntry))) {
×
NEW
1100
      STagCondFilterEntry* pFilterEntry = *ppFilterEntry;
×
1101
      // rebuild the tagCondKey and check existence
NEW
1102
      SArray* pColIds = pFilterEntry->pColIds;
×
1103
      // rebuild the tagCondFilterKey
NEW
1104
      int32_t keyLen = 0;
×
NEW
1105
      char*   pKey = NULL;
×
NEW
1106
      T_MD5_CTX context = {0};
×
NEW
1107
      code = buildTagDataEntryKey(pColIds, (STag*)pChildTable->ctbEntry.pTags, 
×
1108
        &pSuperTable->stbEntry.schemaTag, &context);
NEW
1109
      if (code != TSDB_CODE_SUCCESS) {
×
NEW
1110
        metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition"
×
1111
          " key for dropped table uid:%" PRIu64 " since %s",
1112
          TD_VID(pMeta->pVnode), suid, pChildTable->uid, tstrerror(code));
NEW
1113
        goto _end;
×
1114
      }
1115

NEW
1116
      SArray** pArray = (SArray**)taosHashGet(
×
1117
        pFilterEntry->set, context.digest, tListLen(context.digest));
NEW
1118
      if (pArray != NULL) {
×
1119
        // check and remove the dropped table uid from the array
1120
        // TODO(Tony Zhang): optimize this scan
NEW
1121
        if (action == STABLE_TAG_FILTER_CACHE_DROP_TABLE) {
×
NEW
1122
          for (int32_t i = 0; i < taosArrayGetSize(*pArray); i++) {
×
NEW
1123
            uint64_t uid = *(uint64_t*)taosArrayGet(*pArray, i);
×
NEW
1124
            if (uid == pChildTable->uid) {
×
NEW
1125
              taosArrayRemove(*pArray, i);
×
NEW
1126
              metaDebug("vgId:%d, suid:%" PRIu64
×
1127
                " removed dropped table uid:%" PRIu64
1128
                " from stable tag filter cache",
1129
                TD_VID(pMeta->pVnode), suid, pChildTable->uid);
NEW
1130
              break;
×
1131
            } 
1132
          }
1133
        } else {
1134
          // STABLE_TAG_FILTER_CACHE_ADD_TABLE
NEW
1135
          void* _tmp = taosArrayPush(*pArray, &pChildTable->uid);
×
1136
        }
1137
      }
1138
    }
1139
  }
1140

1141
_end:
77,569,320✔
1142
  if (TSDB_CODE_SUCCESS != code) {
77,559,837✔
NEW
1143
    metaError("vgId:%d, %s failed at %s:%d since %s",
×
1144
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1145
  } else {
1146
    metaDebug(
77,559,837✔
1147
      "vgId:%d, suid:%" PRIu64 " update table uid:%" PRIu64
1148
      " in stable tag filter cache, action:%d",
1149
      TD_VID(pMeta->pVnode),
1150
      pChildTable->ctbEntry.suid, pChildTable->uid, action);
1151
  }
1152
  code = taosThreadRwlockUnlock(pRwlock);
77,560,356✔
1153
  if (TSDB_CODE_SUCCESS != code) {
77,555,902✔
NEW
1154
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
×
1155
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1156
  }
1157
  return code;
77,547,691✔
1158
}
1159

1160
int32_t metaStableTagFilterCacheDropTag(
304,935✔
1161
  SMeta* pMeta, tb_uid_t suid, col_id_t cid) {
1162
  if (pMeta == NULL) {
304,935✔
NEW
1163
    return TSDB_CODE_INVALID_PARA;
×
1164
  }
1165
  int32_t   lino = 0;
304,935✔
1166
  int32_t   code = TSDB_CODE_SUCCESS;
304,935✔
1167
  SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
304,935✔
1168
  TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
304,281✔
1169

1170
  code = taosThreadRwlockWrlock(pRwlock);
304,176✔
1171
  TSDB_CHECK_CODE(code, lino, _end);
304,830✔
1172

1173
  STagConds** pTagConds =
1174
    (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
304,830✔
1175
  if (pTagConds != NULL) {
303,627✔
NEW
1176
    void* pIter = taosHashIterate((*pTagConds)->set, NULL);
×
NEW
1177
    while (pIter) {
×
NEW
1178
      STagCondFilterEntry* pFilterEntry = *(STagCondFilterEntry**)pIter;
×
NEW
1179
      bool found = false;
×
NEW
1180
      for (int32_t i = 0; i < taosArrayGetSize(pFilterEntry->pColIds); i++) {
×
NEW
1181
        col_id_t existCid = *(col_id_t*)taosArrayGet(pFilterEntry->pColIds, i);
×
NEW
1182
        if (existCid == cid) {
×
NEW
1183
          found = true;
×
NEW
1184
          break;
×
1185
        }
1186
      }
NEW
1187
      if (found) {
×
NEW
1188
        uint32_t numEntries = taosHashGetSize(pFilterEntry->set);
×
NEW
1189
        size_t keyLen = 0;
×
NEW
1190
        char  *key = (char *)taosHashGetKey(pIter, &keyLen);
×
NEW
1191
        code = taosHashRemove((*pTagConds)->set, key, keyLen);
×
NEW
1192
        TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1193
        (*pTagConds)->numTagDataEntries -= numEntries;
×
NEW
1194
        pMeta->pCache->sStableTagFilterResCache.numTagDataEntries -= numEntries;
×
1195
      }
NEW
1196
      pIter = taosHashIterate((*pTagConds)->set, pIter);
×
1197
    }
1198
  }
1199
_end:
303,627✔
1200
  if (TSDB_CODE_SUCCESS != code) {
304,935✔
NEW
1201
    metaError("vgId:%d, %s failed at %s:%d since %s",
×
1202
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1203
  } else {
1204
    metaDebug(
304,935✔
1205
      "vgId:%d, suid:%" PRIu64 " dropped tag cid:%d "
1206
      "from stable tag filter cache",
1207
      TD_VID(pMeta->pVnode), suid, cid);
1208
  }
1209
  code = taosThreadRwlockUnlock(pRwlock);
304,935✔
1210
  if (TSDB_CODE_SUCCESS != code) {
304,935✔
NEW
1211
    metaError("vgId:%d, %s unlock failed at %s:%d since %s",
×
1212
      TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
1213
  }
1214
  return code;
304,935✔
1215
}
1216

1217
void metaCacheClear(SMeta* pMeta) {
4,232,795✔
1218
  metaWLock(pMeta);
4,232,795✔
1219
  metaCacheClose(pMeta);
4,232,795✔
1220
  (void)metaCacheOpen(pMeta);
4,232,795✔
1221
  metaULock(pMeta);
4,232,795✔
1222
}
4,232,795✔
1223

1224
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
1225
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
69,736,519✔
1226
  uint64_t  p[4] = {0};
69,736,519✔
1227
  int32_t   vgId = TD_VID(pMeta->pVnode);
69,742,073✔
1228
  SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
69,734,972✔
1229

1230
  uint64_t dummy[2] = {0};
69,736,760✔
1231
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
69,745,635✔
1232

1233
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
69,736,871✔
1234
  (void)taosThreadMutexLock(pLock);
69,740,040✔
1235

1236
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
69,747,554✔
1237
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
69,745,707✔
1238
    (void)taosThreadMutexUnlock(pLock);
69,744,853✔
1239
    return TSDB_CODE_SUCCESS;
69,750,289✔
1240
  }
1241

1242
  (*pEntry)->hitTimes = 0;
854✔
1243

1244
  char *iter = taosHashIterate((*pEntry)->set, NULL);
854✔
1245
  while (iter != NULL) {
1,708✔
1246
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
1247
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
854✔
1248
    iter = taosHashIterate((*pEntry)->set, iter);
854✔
1249
  }
1250
  taosHashClear((*pEntry)->set);
854✔
1251
  (void)taosThreadMutexUnlock(pLock);
854✔
1252

1253
  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
854✔
1254
  return TSDB_CODE_SUCCESS;
854✔
1255
}
1256

UNCOV
1257
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
×
1258
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
1259
  int32_t vgId = TD_VID(pMeta->pVnode);
×
1260

1261
  // generate the composed key for LRU cache
UNCOV
1262
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
×
1263
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
×
UNCOV
1264
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
×
1265

UNCOV
1266
  *pList = NULL;
×
UNCOV
1267
  uint64_t key[4];
×
UNCOV
1268
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
×
1269

UNCOV
1270
  (void)taosThreadMutexLock(pLock);
×
UNCOV
1271
  pMeta->pCache->STbGroupResCache.accTimes += 1;
×
1272

UNCOV
1273
  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
×
UNCOV
1274
  if (pHandle == NULL) {
×
UNCOV
1275
    (void)taosThreadMutexUnlock(pLock);
×
UNCOV
1276
    return TSDB_CODE_SUCCESS;
×
1277
  }
1278

UNCOV
1279
  STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
×
1280
  if (NULL == pEntry) {
×
1281
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
×
UNCOV
1282
    return TSDB_CODE_NOT_FOUND;
×
1283
  }
1284

1285
  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);
×
1286

UNCOV
1287
  (*pEntry)->hitTimes += 1;
×
1288

1289
  uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
×
UNCOV
1290
  if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
×
UNCOV
1291
    metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
×
1292
             ((double)(*pEntry)->hitTimes) / acc);
1293
  }
1294

UNCOV
1295
  bool ret = taosLRUCacheRelease(pCache, pHandle, false);
×
1296

1297
  // unlock meta
UNCOV
1298
  (void)taosThreadMutexUnlock(pLock);
×
UNCOV
1299
  return TSDB_CODE_SUCCESS;
×
1300
}
1301

UNCOV
1302
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
×
1303
  (void)ud;
UNCOV
1304
  if (value == NULL) {
×
UNCOV
1305
    return;
×
1306
  }
1307

UNCOV
1308
  const uint64_t* p = key;
×
UNCOV
1309
  if (keyLen != sizeof(int64_t) * 4) {
×
1310
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
UNCOV
1311
    return;
×
1312
  }
1313

UNCOV
1314
  SHashObj* pHashObj = (SHashObj*)p[0];
×
1315

1316
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
×
1317

UNCOV
1318
  if (pEntry != NULL && (*pEntry) != NULL) {
×
UNCOV
1319
    int64_t st = taosGetTimestampUs();
×
1320
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
×
UNCOV
1321
    if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
1322
      double el = (taosGetTimestampUs() - st) / 1000.0;
×
UNCOV
1323
      metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
×
1324
                taosHashGetSize((*pEntry)->set), el);
1325
    }
1326
  }
1327

UNCOV
1328
  taosArrayDestroy((SArray*)value);
×
1329
}
1330

UNCOV
1331
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
1332
                              int32_t payloadLen) {
UNCOV
1333
  int32_t code = 0;
×
UNCOV
1334
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
1335
  int32_t vgId = TD_VID(pMeta->pVnode);
×
1336

1337
  if (payloadLen > tsTagFilterResCacheSize) {
×
1338
    metaDebug("vgId:%d, suid:%" PRIu64
×
1339
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
1340
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
UNCOV
1341
    taosArrayDestroy((SArray*)pPayload);
×
UNCOV
1342
    return TSDB_CODE_SUCCESS;
×
1343
  }
1344

1345
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
×
1346
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
×
1347
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
×
1348

1349
  uint64_t key[4] = {0};
×
1350
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
×
1351

1352
  (void)taosThreadMutexLock(pLock);
×
1353
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
UNCOV
1354
  if (pEntry == NULL) {
×
UNCOV
1355
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
1356
    if (code != TSDB_CODE_SUCCESS) {
×
1357
      goto _end;
×
1358
    }
1359
  } else {  // check if it exists or not
1360
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
UNCOV
1361
    if (code == TSDB_CODE_DUP_KEY) {
×
1362
      // we have already found the existed items, no need to added to cache anymore.
1363
      (void)taosThreadMutexUnlock(pLock);
×
UNCOV
1364
      return TSDB_CODE_SUCCESS;
×
1365
    }
1366
    if (code != TSDB_CODE_SUCCESS) {
×
1367
      goto _end;
×
1368
    }
1369
  }
1370

1371
  // add to cache.
UNCOV
1372
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
×
1373
                           TAOS_LRU_PRIORITY_LOW, NULL);
1374
_end:
×
UNCOV
1375
  (void)taosThreadMutexUnlock(pLock);
×
UNCOV
1376
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
×
1377
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
1378

1379
  return code;
×
1380
}
1381

1382
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
1383
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
69,729,972✔
1384
  uint64_t  p[4] = {0};
69,729,972✔
1385
  int32_t   vgId = TD_VID(pMeta->pVnode);
69,737,282✔
1386
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
69,724,467✔
1387

1388
  uint64_t dummy[2] = {0};
69,730,847✔
1389
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
69,742,519✔
1390

1391
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
69,749,948✔
1392
  (void)taosThreadMutexLock(pLock);
69,741,549✔
1393

1394
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
69,746,582✔
1395
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
69,749,422✔
1396
    (void)taosThreadMutexUnlock(pLock);
69,749,422✔
1397
    return TSDB_CODE_SUCCESS;
69,753,534✔
1398
  }
1399

1400
  (*pEntry)->hitTimes = 0;
×
1401

UNCOV
1402
  char *iter = taosHashIterate((*pEntry)->set, NULL);
×
1403
  while (iter != NULL) {
×
1404
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
UNCOV
1405
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
×
UNCOV
1406
    iter = taosHashIterate((*pEntry)->set, iter);
×
1407
  }
1408
  taosHashClear((*pEntry)->set);
×
1409
  (void)taosThreadMutexUnlock(pLock);
×
1410

1411
  metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
×
1412
  return TSDB_CODE_SUCCESS;
×
1413
}
1414

1415
bool metaTbInFilterCache(SMeta* pMeta, const void* key, int8_t type) {
120,755,901✔
1416
  if (type == 0 && taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t))) {
120,755,901✔
1417
    return true;
1,250✔
1418
  }
1419

1420
  if (type == 1 && taosHashGet(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key))) {
120,754,651✔
UNCOV
1421
    return true;
×
1422
  }
1423

1424
  return false;
120,777,946✔
1425
}
1426

1427
int32_t metaPutTbToFilterCache(SMeta* pMeta, const void* key, int8_t type) {
8,000✔
1428
  if (type == 0) {
8,000✔
1429
    return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
3,250✔
1430
  }
1431

1432
  if (type == 1) {
4,750✔
1433
    return taosHashPut(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key), NULL, 0);
4,750✔
1434
  }
1435

UNCOV
1436
  return 0;
×
1437
}
1438

1439
int32_t metaSizeOfTbFilterCache(SMeta* pMeta, int8_t type) {
5,000✔
1440
  if (type == 0) {
5,000✔
1441
    return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
5,000✔
1442
  }
1443
  return 0;
×
1444
}
1445

1446
int32_t metaInitTbFilterCache(SMeta* pMeta) {
4,234,735✔
1447
#ifdef TD_ENTERPRISE
1448
  int32_t      tbNum = 0;
4,234,735✔
1449
  const char** pTbArr = NULL;
4,234,735✔
1450
  const char*  dbName = NULL;
4,234,735✔
1451

1452
  if (!(dbName = strchr(pMeta->pVnode->config.dbname, '.'))) return 0;
4,234,735✔
1453
  if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
4,234,120✔
1454
    tbNum = tkLogStbNum;
250✔
1455
    pTbArr = (const char**)&tkLogStb;
250✔
1456
  } else if (0 == strncmp(dbName, "audit", TSDB_DB_NAME_LEN)) {
4,233,870✔
1457
    tbNum = tkAuditStbNum;
×
1458
    pTbArr = (const char**)&tkAuditStb;
×
1459
  }
1460
  if (tbNum && pTbArr) {
4,234,120✔
1461
    for (int32_t i = 0; i < tbNum; ++i) {
5,000✔
1462
      TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, pTbArr[i], 1));
4,750✔
1463
    }
1464
  }
1465
#else
1466
#endif
1467
  return 0;
4,234,120✔
1468
}
1469

1470
int64_t metaGetStbKeep(SMeta* pMeta, int64_t uid) {
184,084✔
1471
  SMetaStbStats stats = {0};
184,084✔
1472

1473
  if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
184,084✔
1474
    return stats.keep;
160,582✔
1475
  }
1476

1477
  SMetaEntry* pEntry = NULL;
23,502✔
1478
  if (metaFetchEntryByUid(pMeta, uid, &pEntry) == TSDB_CODE_SUCCESS) {
23,502✔
1479
    int64_t keep = -1;
22,879✔
1480
    if (pEntry->type == TSDB_SUPER_TABLE) {
22,879✔
1481
      keep = pEntry->stbEntry.keep;
22,879✔
1482
    }
1483
    metaFetchEntryFree(&pEntry);
22,879✔
1484
    return keep;
22,879✔
1485
  }
1486
  
1487
  return -1;
623✔
1488
}
1489

1490
int32_t metaRefDbsCacheClear(SMeta* pMeta, uint64_t suid) {
310,974✔
1491
  int32_t        code = TSDB_CODE_SUCCESS;
310,974✔
1492
  int32_t        vgId = TD_VID(pMeta->pVnode);
310,974✔
1493
  SHashObj*      pEntryHashMap = pMeta->pCache->STbRefDbCache.pStbRefs;
310,974✔
1494
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
310,974✔
1495

1496
  (void)taosThreadMutexLock(pLock);
310,974✔
1497

1498
  SHashObj** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
310,974✔
1499
  if (pEntry == NULL) {
310,974✔
1500
    goto _return;
308,158✔
1501
  }
1502

1503
  code = taosHashRemove(pEntryHashMap, &suid, sizeof(uint64_t));
2,816✔
1504

1505
  metaDebug("vgId:%d suid:%" PRId64 " cached virtual stable ref db cleared", vgId, suid);
2,816✔
1506

1507
_return:
2,026✔
1508
  (void)taosThreadMutexUnlock(pLock);
310,974✔
1509
  return code;
310,974✔
1510
}
1511

1512
int32_t metaGetCachedRefDbs(void* pVnode, tb_uid_t suid, SArray* pList) {
2,150,979✔
1513
  int32_t        code = TSDB_CODE_SUCCESS;
2,150,979✔
1514
  int32_t        line = 0;
2,150,979✔
1515
  SMeta*         pMeta = ((SVnode*)pVnode)->pMeta;
2,150,979✔
1516
  SHashObj*      pTableMap = pMeta->pCache->STbRefDbCache.pStbRefs;
2,150,979✔
1517
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
2,152,782✔
1518

1519
  (void)taosThreadMutexLock(pLock);
2,152,186✔
1520

1521
  SHashObj** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
2,152,210✔
1522
  if (pEntry) {
2,152,762✔
1523
    void *iter = taosHashIterate(*pEntry, NULL);
2,128,570✔
1524
    while (iter != NULL) {
7,405,310✔
1525
      size_t   dbNameLen = 0;
5,276,148✔
1526
      char*    name = NULL;
5,276,148✔
1527
      char*    dbName = NULL;
5,276,148✔
1528
      name = taosHashGetKey(iter, &dbNameLen);
5,276,148✔
1529
      TSDB_CHECK_NULL(name, code, line, _return, terrno);
5,271,457✔
1530
      dbName = taosMemoryMalloc(dbNameLen + 1);
5,271,457✔
1531
      TSDB_CHECK_NULL(dbName, code, line, _return, terrno);
5,272,038✔
1532
      tstrncpy(dbName, name, dbNameLen + 1);
5,272,038✔
1533
      TSDB_CHECK_NULL(taosArrayPush(pList, &dbName), code, line, _return, terrno);
5,275,600✔
1534
      iter = taosHashIterate(*pEntry, iter);
5,275,600✔
1535
    }
1536
  }
1537

1538
_return:
2,153,354✔
1539
  if (code) {
2,152,135✔
UNCOV
1540
    metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1541
  }
1542
  (void)taosThreadMutexUnlock(pLock);
2,152,135✔
1543
  return code;
2,153,354✔
1544
}
1545

1546
static int32_t addRefDbsCacheNewEntry(SHashObj* pRefDbs, uint64_t suid, SHashObj **pEntry) {
24,192✔
1547
  int32_t      code = TSDB_CODE_SUCCESS;
24,192✔
1548
  int32_t      lino = 0;
24,192✔
1549
  SHashObj*    p = NULL;
24,192✔
1550

1551
  p = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
24,192✔
1552
  TSDB_CHECK_NULL(p, code, lino, _end, terrno);
24,192✔
1553

1554
  code = taosHashPut(pRefDbs, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
24,192✔
1555
  TSDB_CHECK_CODE(code, lino, _end);
24,192✔
1556

1557
  *pEntry = p;
24,192✔
1558

1559
_end:
24,192✔
1560
  if (code != TSDB_CODE_SUCCESS) {
24,192✔
1561
    metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1562
  }
1563
  return code;
24,192✔
1564
}
1565

1566
int32_t metaPutRefDbsToCache(void* pVnode, tb_uid_t suid, SArray* pList) {
27,362✔
1567
  int32_t        code = 0;
27,362✔
1568
  int32_t        line = 0;
27,362✔
1569
  SMeta*         pMeta = ((SVnode*)pVnode)->pMeta;
27,362✔
1570
  SHashObj*      pStbRefs = pMeta->pCache->STbRefDbCache.pStbRefs;
27,362✔
1571
  TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
27,362✔
1572

1573
  (void)taosThreadMutexLock(pLock);
27,362✔
1574

1575
  SHashObj*  pEntry = NULL;
27,362✔
1576
  SHashObj** find = taosHashGet(pStbRefs, &suid, sizeof(uint64_t));
27,362✔
1577
  if (find == NULL) {
27,362✔
1578
    code = addRefDbsCacheNewEntry(pStbRefs, suid, &pEntry);
24,192✔
1579
    TSDB_CHECK_CODE(code, line, _return);
24,192✔
1580
  } else {  // check if it exists or not
1581
    pEntry = *find;
3,170✔
1582
  }
1583

1584
  for (int32_t i = 0; i < taosArrayGetSize(pList); i++) {
63,333✔
1585
    char* dbName = taosArrayGetP(pList, i);
35,971✔
1586
    void* pItem = taosHashGet(pEntry, dbName, strlen(dbName));
35,971✔
1587
    if (pItem == NULL) {
35,971✔
1588
      code = taosHashPut(pEntry, dbName, strlen(dbName), NULL, 0);
35,971✔
1589
      TSDB_CHECK_CODE(code, line, _return);
35,971✔
1590
    }
1591
  }
1592

1593
_return:
27,362✔
1594
  if (code) {
27,362✔
UNCOV
1595
    metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1596
  }
1597
  (void)taosThreadMutexUnlock(pLock);
27,362✔
1598

1599
  return code;
27,362✔
1600
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc