• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/meta/metaCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "meta.h"
16

17
#ifdef TD_ENTERPRISE
18
extern const char* tkLogStb[];
19
extern const char* tkAuditStb[];
20
extern const int   tkLogStbNum;
21
extern const int   tkAuditStbNum;
22
#endif
23

24
#define TAG_FILTER_RES_KEY_LEN  32
25
#define META_CACHE_BASE_BUCKET  1024
26
#define META_CACHE_STATS_BUCKET 16
27

28
// (uid , suid) : child table
29
// (uid,     0) : normal table
30
// (suid, suid) : super table
31
typedef struct SMetaCacheEntry SMetaCacheEntry;
32
struct SMetaCacheEntry {
33
  SMetaCacheEntry* next;
34
  SMetaInfo        info;
35
};
36

37
typedef struct SMetaStbStatsEntry {
38
  struct SMetaStbStatsEntry* next;
39
  SMetaStbStats              info;
40
} SMetaStbStatsEntry;
41

42
typedef struct STagFilterResEntry {
43
  SHashObj *set;    // the set of md5 digest, extracted from the serialized tag query condition
44
  uint32_t hitTimes;  // queried times for current super table
45
} STagFilterResEntry;
46

47
struct SMetaCache {
48
  // child, normal, super, table entry cache
49
  struct SEntryCache {
50
    int32_t           nEntry;
51
    int32_t           nBucket;
52
    SMetaCacheEntry** aBucket;
53
  } sEntryCache;
54

55
  // stable stats cache
56
  struct SStbStatsCache {
57
    int32_t              nEntry;
58
    int32_t              nBucket;
59
    SMetaStbStatsEntry** aBucket;
60
  } sStbStatsCache;
61

62
  // query cache
63
  struct STagFilterResCache {
64
    TdThreadMutex lock;
65
    uint32_t      accTimes;
66
    SHashObj*     pTableEntry;
67
    SLRUCache*    pUidResCache;
68
  } sTagFilterResCache;
69

70
  struct STbGroupResCache {
71
    TdThreadMutex lock;
72
    uint32_t      accTimes;
73
    SHashObj*     pTableEntry;
74
    SLRUCache*    pResCache;
75
  } STbGroupResCache;
76

77
  struct STbFilterCache {
78
    SHashObj* pStb;
79
    SHashObj* pStbName;
80
  } STbFilterCache;
81
};
82

UNCOV
83
// Free every node of the table entry cache and then the bucket array itself.
//
// Does nothing when pMeta has no cache. The SMetaCache object is not freed here.
// The bucket array may be NULL while nBucket is already non-zero: metaCacheOpen
// sets nBucket before allocating aBucket and calls metaCacheClose when that
// allocation fails, so the chains are only walked when the array exists.
static void entryCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  SMetaCacheEntry** aBucket = pCache->sEntryCache.aBucket;
  if (aBucket != NULL) {
    for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
      SMetaCacheEntry* pEntry = aBucket[iBucket];
      while (pEntry) {
        SMetaCacheEntry* pNext = pEntry->next;  // read the link before the node is freed
        taosMemoryFree(pEntry);
        pEntry = pNext;
      }
    }
  }

  taosMemoryFree(aBucket);
}
×
97

UNCOV
98
// Free every node of the super table stats cache and then the bucket array itself.
//
// Does nothing when pMeta has no cache. The SMetaCache object is not freed here.
// The bucket array may be NULL while nBucket is already non-zero: metaCacheOpen
// sets nBucket before allocating aBucket and calls metaCacheClose when that
// allocation fails, so the chains are only walked when the array exists.
static void statsCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  SMetaStbStatsEntry** aBucket = pCache->sStbStatsCache.aBucket;
  if (aBucket != NULL) {
    for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
      SMetaStbStatsEntry* pEntry = aBucket[iBucket];
      while (pEntry) {
        SMetaStbStatsEntry* pNext = pEntry->next;  // read the link before the node is freed
        taosMemoryFree(pEntry);
        pEntry = pNext;
      }
    }
  }

  taosMemoryFree(aBucket);
}
×
112

113
static void freeCacheEntryFp(void* param) {
×
114
  STagFilterResEntry** p = param;
×
115
  taosHashCleanup((*p)->set);
×
116
  taosMemoryFreeClear(*p);
×
117
}
×
118

UNCOV
119
// Allocate pMeta->pCache and set up every sub-cache: the table entry cache, the
// super table stats cache, the tag filter result cache, the table group result
// cache and the table filter cache.
//
// Returns 0 on success. On failure the error code taken from terrno is returned
// after the partially built cache has been torn down with metaCacheClose().
int32_t metaCacheOpen(SMeta* pMeta) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaCache* pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));

  pMeta->pCache = pCache;
  if (pCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // table entry cache
  pCache->sEntryCache.nEntry = 0;
  pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
  pCache->sEntryCache.aBucket =
      (SMetaCacheEntry**)taosMemoryCalloc(pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
  if (pCache->sEntryCache.aBucket == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // super table stats cache
  pCache->sStbStatsCache.nEntry = 0;
  pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
  pCache->sStbStatsCache.aBucket =
      (SMetaStbStatsEntry**)taosMemoryCalloc(pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
  if (pCache->sStbStatsCache.aBucket == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // tag filter result cache: LRU payload store plus the per-suid digest table
  pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->sTagFilterResCache.pUidResCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pCache->sTagFilterResCache.accTimes = 0;
  pCache->sTagFilterResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->sTagFilterResCache.pTableEntry == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  taosHashSetFreeFp(pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
  (void)taosThreadMutexInit(&pCache->sTagFilterResCache.lock, NULL);

  // table group result cache: same layout as the tag filter result cache
  pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pCache->STbGroupResCache.pResCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pCache->STbGroupResCache.accTimes = 0;
  pCache->STbGroupResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->STbGroupResCache.pTableEntry == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  taosHashSetFreeFp(pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
  (void)taosThreadMutexInit(&pCache->STbGroupResCache.lock, NULL);

  // table filter cache: one table keyed by uid, one keyed by name
  pCache->STbFilterCache.pStb = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pCache->STbFilterCache.pStb == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pCache->STbFilterCache.pStbName =
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pCache->STbFilterCache.pStbName == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

_exit:
  if (code) {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
    metaCacheClose(pMeta);
  } else {
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
  }
  return code;
}
197

UNCOV
198
// Tear down every sub-cache and free pMeta->pCache, leaving it NULL.
// Does nothing when the cache was never opened or is already closed.
void metaCacheClose(SMeta* pMeta) {
  SMetaCache* pCache = pMeta->pCache;
  if (pCache == NULL) {
    return;
  }

  entryCacheClose(pMeta);
  statsCacheClose(pMeta);

  // tag filter result cache
  taosHashClear(pCache->sTagFilterResCache.pTableEntry);
  taosLRUCacheCleanup(pCache->sTagFilterResCache.pUidResCache);
  (void)taosThreadMutexDestroy(&pCache->sTagFilterResCache.lock);
  taosHashCleanup(pCache->sTagFilterResCache.pTableEntry);

  // table group result cache
  taosHashClear(pCache->STbGroupResCache.pTableEntry);
  taosLRUCacheCleanup(pCache->STbGroupResCache.pResCache);
  (void)taosThreadMutexDestroy(&pCache->STbGroupResCache.lock);
  taosHashCleanup(pCache->STbGroupResCache.pTableEntry);

  // table filter cache
  taosHashCleanup(pCache->STbFilterCache.pStb);
  taosHashCleanup(pCache->STbFilterCache.pStbName);

  taosMemoryFree(pCache);
  pMeta->pCache = NULL;
}
×
220

UNCOV
221
// Resize the table entry cache: the bucket count is doubled when expand is
// non-zero and halved otherwise, and every node is relinked into the new array.
// If the new array cannot be allocated the cache is left untouched.
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
  int32_t nOld = pCache->sEntryCache.nBucket;
  int32_t nNew = expand ? nOld * 2 : nOld / 2;

  SMetaCacheEntry** aNew = (SMetaCacheEntry**)taosMemoryCalloc(nNew, sizeof(SMetaCacheEntry*));
  if (aNew == NULL) {
    return;
  }

  // move each node to the head of its chain in the new array
  for (int32_t i = 0; i < nOld; ++i) {
    SMetaCacheEntry* pNode = pCache->sEntryCache.aBucket[i];
    while (pNode != NULL) {
      SMetaCacheEntry*  pNext = pNode->next;
      SMetaCacheEntry** ppHead = &aNew[TABS(pNode->info.uid) % nNew];

      pNode->next = *ppHead;
      *ppHead = pNode;
      pNode = pNext;
    }
  }

  // swap in the new bucket array
  taosMemoryFree(pCache->sEntryCache.aBucket);
  pCache->sEntryCache.aBucket = aNew;
  pCache->sEntryCache.nBucket = nNew;
}
256

UNCOV
257
// Insert pInfo into the table entry cache, or refresh the entry that already
// exists for pInfo->uid. The original comment states that meta is write-locked
// when this function is called.
//
// Update: the cached suid must equal pInfo->suid, otherwise
// TSDB_CODE_INVALID_PARA is returned; version and skmVer are overwritten only
// when pInfo->version is newer than the cached one.
// Insert: the table is expanded first when it is full; terrno is returned when
// the new node cannot be allocated.
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
  SMetaCache* pCache = pMeta->pCache;

  // search the bucket chain for the uid
  SMetaCacheEntry* pHit = pCache->sEntryCache.aBucket[TABS(pInfo->uid) % pCache->sEntryCache.nBucket];
  while (pHit != NULL && pHit->info.uid != pInfo->uid) {
    pHit = pHit->next;
  }

  if (pHit != NULL) {  // update
    if (pHit->info.suid != pInfo->suid) {
      metaError("meta/cache: suid should be same as the one in cache.");
      return TSDB_CODE_INVALID_PARA;
    }
    if (pInfo->version > pHit->info.version) {
      pHit->info.version = pInfo->version;
      pHit->info.skmVer = pInfo->skmVer;
    }
    return 0;
  }

  // insert
  if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
    metaRehashCache(pCache, 1);
  }

  // computed after the possible rehash so it matches the current bucket count
  int32_t iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;

  SMetaCacheEntry* pNode = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(SMetaCacheEntry));
  if (pNode == NULL) {
    return terrno;
  }

  pNode->info = *pInfo;
  pNode->next = pCache->sEntryCache.aBucket[iBucket];
  pCache->sEntryCache.aBucket[iBucket] = pNode;
  pCache->sEntryCache.nEntry++;

  return 0;
}
301

UNCOV
302
// Remove the entry for uid from the table entry cache.
// Returns 0 when an entry was removed and TSDB_CODE_NOT_FOUND otherwise. The
// table is shrunk when it is larger than META_CACHE_BASE_BUCKET and fewer than
// a quarter of its buckets' worth of entries remain.
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;

  // walk the chain through the link that points at the current node
  SMetaCacheEntry** ppLink = &pCache->sEntryCache.aBucket[TABS(uid) % pCache->sEntryCache.nBucket];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaCacheEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sEntryCache.nEntry--;

  if (pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET &&
      pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4) {
    metaRehashCache(pCache, 0);
  }

  return 0;
}
328

UNCOV
329
// Look uid up in the table entry cache.
// Returns 0 on a hit, copying the cached info into *pInfo when pInfo is not
// NULL, and TSDB_CODE_NOT_FOUND on a miss.
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
  SMetaCache*      pCache = pMeta->pCache;
  SMetaCacheEntry* pNode = pCache->sEntryCache.aBucket[TABS(uid) % pCache->sEntryCache.nBucket];

  for (; pNode != NULL; pNode = pNode->next) {
    if (pNode->info.uid == uid) {
      if (pInfo != NULL) {
        *pInfo = pNode->info;
      }
      return 0;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
350

UNCOV
351
// Resize the super table stats cache: the bucket count is doubled when expand
// is non-zero and halved otherwise, and every node is relinked into the new array.
// Returns 0 on success, or terrno when the new array cannot be allocated (the
// cache is left untouched in that case).
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
  int32_t nOld = pCache->sStbStatsCache.nBucket;
  int32_t nNew = expand ? nOld * 2 : nOld / 2;

  SMetaStbStatsEntry** aNew = (SMetaStbStatsEntry**)taosMemoryCalloc(nNew, sizeof(SMetaStbStatsEntry*));
  if (aNew == NULL) {
    return terrno;
  }

  // move each node to the head of its chain in the new array
  for (int32_t i = 0; i < nOld; ++i) {
    SMetaStbStatsEntry* pNode = pCache->sStbStatsCache.aBucket[i];
    while (pNode != NULL) {
      SMetaStbStatsEntry*  pNext = pNode->next;
      SMetaStbStatsEntry** ppHead = &aNew[TABS(pNode->info.uid) % nNew];

      pNode->next = *ppHead;
      *ppHead = pNode;
      pNode = pNext;
    }
  }

  // swap in the new bucket array
  taosMemoryFree(pCache->sStbStatsCache.aBucket);
  pCache->sStbStatsCache.aBucket = aNew;
  pCache->sStbStatsCache.nBucket = nNew;

  return 0;
}
389

UNCOV
390
// Insert pInfo into the super table stats cache, or overwrite ctbNum and colNum
// of the entry that already exists for pInfo->uid. The original comment states
// that meta is write-locked when this function is called.
//
// On insert the table is expanded first when it is full (a failed expansion is
// ignored); terrno is returned when the new node cannot be allocated.
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;

  // search the bucket chain for the uid
  SMetaStbStatsEntry* pHit = pCache->sStbStatsCache.aBucket[TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket];
  while (pHit != NULL && pHit->info.uid != pInfo->uid) {
    pHit = pHit->next;
  }

  if (pHit != NULL) {  // update
    pHit->info.ctbNum = pInfo->ctbNum;
    pHit->info.colNum = pInfo->colNum;
    return 0;
  }

  // insert
  if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
    TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
  }

  // computed after the possible rehash so it matches the current bucket count
  int32_t iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;

  SMetaStbStatsEntry* pNode = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(SMetaStbStatsEntry));
  if (pNode == NULL) {
    return terrno;
  }

  pNode->info = *pInfo;
  pNode->next = pCache->sStbStatsCache.aBucket[iBucket];
  pCache->sStbStatsCache.aBucket[iBucket] = pNode;
  pCache->sStbStatsCache.nEntry++;

  return 0;
}
427

UNCOV
428
// Remove the entry for uid from the super table stats cache.
// Returns 0 when an entry was removed and TSDB_CODE_NOT_FOUND otherwise. The
// table is shrunk when it is larger than META_CACHE_STATS_BUCKET and fewer than
// a quarter of its buckets' worth of entries remain; a failed shrink is ignored.
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;

  // walk the chain through the link that points at the current node
  SMetaStbStatsEntry** ppLink = &pCache->sStbStatsCache.aBucket[TABS(uid) % pCache->sStbStatsCache.nBucket];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaStbStatsEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sStbStatsCache.nEntry--;

  if (pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET &&
      pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4) {
    TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
  }

  return 0;
}
454

UNCOV
455
// Look uid up in the super table stats cache.
// Returns TSDB_CODE_SUCCESS on a hit, copying the cached stats into *pInfo when
// pInfo is not NULL, and TSDB_CODE_NOT_FOUND on a miss.
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
  SMetaCache*         pCache = pMeta->pCache;
  SMetaStbStatsEntry* pNode = pCache->sStbStatsCache.aBucket[TABS(uid) % pCache->sStbStatsCache.nBucket];

  for (; pNode != NULL; pNode = pNode->next) {
    if (pNode->info.uid == uid) {
      if (pInfo != NULL) {
        *pInfo = pNode->info;
      }
      return TSDB_CODE_SUCCESS;
    }
  }

  return TSDB_CODE_NOT_FOUND;
}
476

477
// Copy keyLen bytes of key into the composed cache key, starting at its third
// 64-bit word (byte offset 16). No bounds check is done on keyLen.
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
  uint64_t* pDigestSlot = pBuf + 2;
  memcpy(pDigestSlot, key, keyLen);
}
×
480

481
// the format of key:
482
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
UNCOV
483
// Fill a composed LRU key: word 0 is the address of the per-suid hash table,
// word 1 is the suid, and the digest bytes follow from word 2 onward.
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
  *buf = (uint64_t)pHashMap;
  *(buf + 1) = suid;
  setMD5DigestInKey(buf, key, keyLen);
}
×
488

489
// Look up the cached uid list for a tag filter condition on super table suid.
//
// pVnode      the SVnode that owns the meta cache
// pKey/keyLen digest of the serialized tag filter condition
// pList1      array that receives the cached uids on a hit
// acquireRes  set to 1 only when the uids were appended to pList1, 0 otherwise
//
// Returns TSDB_CODE_SUCCESS on a hit and on a plain miss, TSDB_CODE_NOT_FOUND
// when the LRU holds the key but the per-suid bookkeeping entry is missing, or
// terrno when the uids cannot be appended to pList1.
//
// The cache lock is released and the LRU handle is given back on every path;
// previously the two error paths returned with the lock held and the handle
// still referenced.
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
                                  bool* acquireRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  *acquireRes = 0;

  // generate the composed key for LRU cache; zero-filled so that a digest
  // shorter than 16 bytes never leaves stack garbage in the key
  uint64_t key[4] = {0};
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  (void)taosThreadMutexLock(pLock);
  pMeta->pCache->sTagFilterResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle != NULL) {
    // do some book mark work after acquiring the filter result from cache
    STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
    if (NULL == pEntry) {
      metaError("meta/cache: pEntry should not be NULL.");
      code = TSDB_CODE_NOT_FOUND;
    } else {
      // payload layout as read here: int32_t element count followed by the elements
      const char* p = taosLRUCacheValue(pCache, pHandle);
      int32_t     size = *(int32_t*)p;

      // set the result into the buffer
      if (taosArrayAddBatch(pList1, p + sizeof(int32_t), size) == NULL) {
        code = terrno;
      } else {
        *acquireRes = 1;
        (*pEntry)->hitTimes += 1;

        uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
        if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
          metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
                   ((double)(*pEntry)->hitTimes) / acc);
        }
      }
    }

    // the handle obtained by the lookup is given back on success and on error
    (void)taosLRUCacheRelease(pCache, pHandle, false);
  }

  (void)taosThreadMutexUnlock(pLock);
  return code;
}
543

544
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
×
545
  (void)ud;
546
  if (value == NULL) {
×
547
    return;
×
548
  }
549

550
  const uint64_t* p = key;
×
551
  if (keyLen != sizeof(int64_t) * 4) {
×
552
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
553
    return;
×
554
  }
555

556
  SHashObj* pHashObj = (SHashObj*)p[0];
×
557

558
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
×
559

560
  if (pEntry != NULL && (*pEntry) != NULL) {
×
561
    int64_t st = taosGetTimestampUs();
×
562
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
×
563
    if (code == TSDB_CODE_SUCCESS) {
×
564
      double el = (taosGetTimestampUs() - st) / 1000.0;
×
565
      metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
×
566
               el);
567
    }
568
  }
569

570
  taosMemoryFree(value);
×
571
}
572

573
// Create the bookkeeping entry for super table suid: a new STagFilterResEntry
// whose digest set already contains pKey, stored in pTableEntry under suid.
// Returns TSDB_CODE_SUCCESS, or the failing step's error code after releasing
// whatever had been allocated.
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  STagFilterResEntry* pNew = taosMemoryMalloc(sizeof(STagFilterResEntry));
  TSDB_CHECK_NULL(pNew, code, lino, _end, terrno);

  pNew->hitTimes = 0;
  pNew->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pNew->set, code, lino, _end, terrno);

  // record the digest in the set
  code = taosHashPut(pNew->set, pKey, keyLen, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _end);

  // publish the entry; the table stores the pointer, not a copy of the struct
  code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &pNew, POINTER_BYTES);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pNew != NULL) {
    if (pNew->set != NULL) {
      taosHashCleanup(pNew->set);
    }
    taosMemoryFree(pNew);
  }
  return code;
}
599

600
// check both the payload size and selectivity ratio
601
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
602
                              int32_t payloadLen, double selectivityRatio) {
603
  int32_t code = 0;
×
604
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
605
  int32_t vgId = TD_VID(pMeta->pVnode);
×
606

607
  if (selectivityRatio > tsSelectivityRatio) {
×
608
    metaDebug("vgId:%d, suid:%" PRIu64
×
609
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
610
              vgId, suid, selectivityRatio, tsSelectivityRatio);
611
    taosMemoryFree(pPayload);
×
612
    return TSDB_CODE_SUCCESS;
×
613
  }
614

615
  if (payloadLen > tsTagFilterResCacheSize) {
×
616
    metaDebug("vgId:%d, suid:%" PRIu64
×
617
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
618
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
619
    taosMemoryFree(pPayload);
×
620
    return TSDB_CODE_SUCCESS;
×
621
  }
622

623
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
×
624
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
×
625
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
×
626

627
  uint64_t key[4] = {0};
×
628
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
×
629

630
  (void)taosThreadMutexLock(pLock);
×
631
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
632
  if (pEntry == NULL) {
×
633
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
634
    if (code != TSDB_CODE_SUCCESS) {
×
635
      goto _end;
×
636
    }
637
  } else {  // check if it exists or not
638
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
639
    if (code == TSDB_CODE_DUP_KEY) {
×
640
      // we have already found the existed items, no need to added to cache anymore.
641
      (void)taosThreadMutexUnlock(pLock);
×
642
      return TSDB_CODE_SUCCESS;
×
643
    }
644
    if (code != TSDB_CODE_SUCCESS) {
×
645
      goto _end;
×
646
    }
647
  }
648

649
  // add to cache.
650
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
×
651
                           TAOS_LRU_PRIORITY_LOW, NULL);
652
_end:
×
653
  (void)taosThreadMutexUnlock(pLock);
×
654
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
×
655
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
656

657
  return code;
×
658
}
659

660
// remove the LRU cache entries invalidated by a tag value update, or by the creation or dropping of child tables
UNCOV
661
// Drop every cached tag filter result of super table suid: each digest recorded
// for the suid is erased from the LRU cache, then the digest set is emptied and
// the hit counter reset. Always returns TSDB_CODE_SUCCESS.
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
  int32_t        vgId = TD_VID(pMeta->pVnode);
  SHashObj*      pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  // key prefix (table address + suid) with a zero digest; the digest words are
  // overwritten for every cached item below
  uint64_t lruKey[4] = {0};
  uint64_t zeroDigest[2] = {0};
  initCacheKey(lruKey, pEntryHashMap, suid, (char*)&zeroDigest[0], 16);

  (void)taosThreadMutexLock(pLock);

  STagFilterResEntry** ppEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (ppEntry == NULL || taosHashGetSize((*ppEntry)->set) == 0) {
    (void)taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*ppEntry)->hitTimes = 0;

  for (char* pDigest = taosHashIterate((*ppEntry)->set, NULL); pDigest != NULL;
       pDigest = taosHashIterate((*ppEntry)->set, pDigest)) {
    setMD5DigestInKey(lruKey, pDigest, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, lruKey, TAG_FILTER_RES_KEY_LEN);
  }
  taosHashClear((*ppEntry)->set);
  (void)taosThreadMutexUnlock(pLock);

  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}
692

693
// Look up the cached table group result for a key on super table suid.
//
// pVnode      the SVnode that owns the meta cache
// pKey/keyLen digest identifying the grouping request
// pList       receives a duplicate of the cached array on a hit; stays NULL on
//             a miss (and when the duplication itself yields NULL)
//
// Returns TSDB_CODE_SUCCESS on a hit and on a plain miss, or
// TSDB_CODE_NOT_FOUND when the LRU holds the key but the per-suid bookkeeping
// entry is missing.
//
// The cache lock is released and the LRU handle is given back on every path;
// previously the NOT_FOUND path returned with the lock held and the handle
// still referenced.
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
  int32_t code = TSDB_CODE_SUCCESS;
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  *pList = NULL;

  // generate the composed key for LRU cache; zero-filled so that a digest
  // shorter than 16 bytes never leaves stack garbage in the key
  uint64_t key[4] = {0};
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  (void)taosThreadMutexLock(pLock);
  pMeta->pCache->STbGroupResCache.accTimes += 1;

  LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle != NULL) {
    STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
    if (NULL == pEntry) {
      metaDebug("suid %" PRIu64 " not in tb group cache", suid);
      code = TSDB_CODE_NOT_FOUND;
    } else {
      // the caller gets its own copy; the cached array stays owned by the LRU
      *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);

      (*pEntry)->hitTimes += 1;

      uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
      if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
        metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
                 ((double)(*pEntry)->hitTimes) / acc);
      }
    }

    // the handle obtained by the lookup is given back on success and on error
    (void)taosLRUCacheRelease(pCache, pHandle, false);
  }

  (void)taosThreadMutexUnlock(pLock);
  return code;
}
737

738
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
×
739
  (void)ud;
740
  if (value == NULL) {
×
741
    return;
×
742
  }
743

744
  const uint64_t* p = key;
×
745
  if (keyLen != sizeof(int64_t) * 4) {
×
746
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
747
    return;
×
748
  }
749

750
  SHashObj* pHashObj = (SHashObj*)p[0];
×
751

752
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
×
753

754
  if (pEntry != NULL && (*pEntry) != NULL) {
×
755
    int64_t st = taosGetTimestampUs();
×
756
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
×
757
    if (code == TSDB_CODE_SUCCESS) {
×
758
      double el = (taosGetTimestampUs() - st) / 1000.0;
×
759
      metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
×
760
                taosHashGetSize((*pEntry)->set), el);
761
    }
762
  }
763

764
  taosArrayDestroy((SArray*)value);
×
765
}
766

767
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
768
                              int32_t payloadLen) {
769
  int32_t code = 0;
×
770
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
771
  int32_t vgId = TD_VID(pMeta->pVnode);
×
772

773
  if (payloadLen > tsTagFilterResCacheSize) {
×
774
    metaDebug("vgId:%d, suid:%" PRIu64
×
775
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
776
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
777
    taosArrayDestroy((SArray*)pPayload);
×
778
    return TSDB_CODE_SUCCESS;
×
779
  }
780

781
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
×
782
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
×
783
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
×
784

785
  uint64_t key[4] = {0};
×
786
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
×
787

788
  (void)taosThreadMutexLock(pLock);
×
789
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
790
  if (pEntry == NULL) {
×
791
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
792
    if (code != TSDB_CODE_SUCCESS) {
×
793
      goto _end;
×
794
    }
795
  } else {  // check if it exists or not
796
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
797
    if (code == TSDB_CODE_DUP_KEY) {
×
798
      // we have already found the existed items, no need to added to cache anymore.
799
      (void)taosThreadMutexUnlock(pLock);
×
800
      return TSDB_CODE_SUCCESS;
×
801
    }
802
    if (code != TSDB_CODE_SUCCESS) {
×
803
      goto _end;
×
804
    }
805
  }
806

807
  // add to cache.
808
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
×
809
                           TAOS_LRU_PRIORITY_LOW, NULL);
810
_end:
×
811
  (void)taosThreadMutexUnlock(pLock);
×
812
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
×
813
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
814

815
  return code;
×
816
}
817

818
// remove the LRU cache entries that have expired because child tables were created or dropped, or their tag values updated
UNCOV
819
/*
 * Erase all cached table-group results recorded for super table `suid`.
 *
 * While holding the table-group cache mutex:
 *   - look up the per-suid entry in the table-entry hash; if it is missing or
 *     its key set is empty, return immediately;
 *   - reset the entry's hit counter;
 *   - for every key stored in the entry's set, place it into the LRU key via
 *     setMD5DigestInKey() and erase that key from the LRU result cache;
 *   - clear the entry's key set.
 *
 * @param pMeta  meta handle owning the cache
 * @param suid   uid of the super table whose cached groups are dropped
 * @return always TSDB_CODE_SUCCESS
 */
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
  uint64_t  p[4] = {0};
  int32_t   vgId = TD_VID(pMeta->pVnode);
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;

  // NOTE(review): the zeroed 16-byte buffer is only a placeholder for the digest part of the
  // key; presumably initCacheKey() fills the suid-dependent part -- confirm against its definition.
  uint64_t dummy[2] = {0};
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);

  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
  (void)taosThreadMutexLock(pLock);

  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
    // nothing has been cached for this super table
    (void)taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*pEntry)->hitTimes = 0;

  // each item of the set is written into the key and the matching LRU entry is erased
  char* iter = taosHashIterate((*pEntry)->set, NULL);
  while (iter != NULL) {
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
    iter = taosHashIterate((*pEntry)->set, iter);
  }
  taosHashClear((*pEntry)->set);
  (void)taosThreadMutexUnlock(pLock);

  // suid is unsigned: use PRIu64, matching the other log calls for this cache
  metaDebug("vgId:%d suid:%" PRIu64 " cached related tb group cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}
850

UNCOV
851
/*
 * Report whether `key` is present in the table filter cache.
 *
 *   type 0: `key` is looked up in the pStb hash using sizeof(tb_uid_t) bytes.
 *   type 1: `key` is read as a NUL-terminated string and looked up in the
 *           pStbName hash using its strlen() as the key length.
 *   other : always false.
 */
bool metaTbInFilterCache(SMeta* pMeta, const void* key, int8_t type) {
  switch (type) {
    case 0:
      return taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t)) != NULL;
    case 1:
      return taosHashGet(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key)) != NULL;
    default:
      return false;
  }
}
862

863
/*
 * Insert `key` (with an empty value) into the table filter cache.
 *
 *   type 0: `key` goes into the pStb hash with a key length of sizeof(tb_uid_t).
 *   type 1: `key` is read as a NUL-terminated string and goes into the
 *           pStbName hash with its strlen() as the key length.
 *   other : nothing is inserted and 0 is returned.
 *
 * For types 0 and 1 the result of taosHashPut() is returned unchanged.
 */
int32_t metaPutTbToFilterCache(SMeta* pMeta, const void* key, int8_t type) {
  switch (type) {
    case 0:
      return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
    case 1:
      return taosHashPut(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key), NULL, 0);
    default:
      return 0;
  }
}
874

875
/*
 * Return the number of entries in the pStb filter hash when `type` is 0;
 * every other type yields 0.
 */
int32_t metaSizeOfTbFilterCache(SMeta* pMeta, int8_t type) {
  return (type == 0) ? taosHashGetSize(pMeta->pCache->STbFilterCache.pStb) : 0;
}
881

UNCOV
882
/*
 * Populate the name-based table filter cache (enterprise builds only).
 *
 * The vnode's configured dbname is searched for its first '.'; the text after
 * it is compared with "log" and "audit". On a match, every name in the
 * corresponding tkLogStb / tkAuditStb array is added to the filter cache as a
 * type-1 (name) key. The first failing insertion is propagated through
 * TAOS_CHECK_RETURN.
 *
 * Returns 0 when there is no '.', no match, or all insertions succeed.
 * Without TD_ENTERPRISE the function does nothing and returns 0.
 */
int32_t metaInitTbFilterCache(SMeta* pMeta) {
#ifdef TD_ENTERPRISE
  const char* pDot = strchr(pMeta->pVnode->config.dbname, '.');
  if (pDot == NULL) {
    return 0;
  }
  const char* dbName = pDot + 1;  // text following the first '.'

  const char** ppStbNames = NULL;
  int32_t      nStb = 0;
  if (strncmp(dbName, "log", TSDB_DB_NAME_LEN) == 0) {
    ppStbNames = tkLogStb;
    nStb = tkLogStbNum;
  } else if (strncmp(dbName, "audit", TSDB_DB_NAME_LEN) == 0) {
    ppStbNames = tkAuditStb;
    nStb = tkAuditStbNum;
  }

  if (nStb != 0 && ppStbNames != NULL) {
    for (int32_t idx = 0; idx < nStb; ++idx) {
      TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, ppStbNames[idx], 1));
    }
  }
#endif
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc