• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.42
/source/dnode/vnode/src/meta/metaCache.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "meta.h"
16

17
#ifdef TD_ENTERPRISE
18
extern const char* tkLogStb[];
19
extern const char* tkAuditStb[];
20
extern const int   tkLogStbNum;
21
extern const int   tkAuditStbNum;
22
#endif
23

24
#define TAG_FILTER_RES_KEY_LEN  32
25
#define META_CACHE_BASE_BUCKET  1024
26
#define META_CACHE_STATS_BUCKET 16
27

28
// (uid , suid) : child table
29
// (uid,     0) : normal table
30
// (suid, suid) : super table
31
typedef struct SMetaCacheEntry SMetaCacheEntry;
32
struct SMetaCacheEntry {
33
  SMetaCacheEntry* next;
34
  SMetaInfo        info;
35
};
36

37
typedef struct SMetaStbStatsEntry {
38
  struct SMetaStbStatsEntry* next;
39
  SMetaStbStats              info;
40
} SMetaStbStatsEntry;
41

42
typedef struct STagFilterResEntry {
43
  SHashObj *set;    // the set of md5 digest, extracted from the serialized tag query condition
44
  uint32_t hitTimes;  // queried times for current super table
45
} STagFilterResEntry;
46

47
struct SMetaCache {
48
  // child, normal, super, table entry cache
49
  struct SEntryCache {
50
    int32_t           nEntry;
51
    int32_t           nBucket;
52
    SMetaCacheEntry** aBucket;
53
  } sEntryCache;
54

55
  // stable stats cache
56
  struct SStbStatsCache {
57
    int32_t              nEntry;
58
    int32_t              nBucket;
59
    SMetaStbStatsEntry** aBucket;
60
  } sStbStatsCache;
61

62
  // query cache
63
  struct STagFilterResCache {
64
    TdThreadMutex lock;
65
    uint32_t      accTimes;
66
    SHashObj*     pTableEntry;
67
    SLRUCache*    pUidResCache;
68
  } sTagFilterResCache;
69

70
  struct STbGroupResCache {
71
    TdThreadMutex lock;
72
    uint32_t      accTimes;
73
    SHashObj*     pTableEntry;
74
    SLRUCache*    pResCache;
75
  } STbGroupResCache;
76

77
  struct STbFilterCache {
78
    SHashObj* pStb;
79
    SHashObj* pStbName;
80
  } STbFilterCache;
81
};
82

83
/*
 * Free every node of the table entry cache and then its bucket array.
 *
 * Does nothing when the cache object itself is missing. The bucket array may
 * also be NULL while nBucket is already non-zero (metaCacheOpen() sets the
 * bucket count before allocating the array and calls metaCacheClose() on
 * failure), so the chains are only walked when the array really exists.
 */
static void entryCacheClose(SMeta* pMeta) {
  if (pMeta->pCache == NULL) {
    return;
  }

  SMetaCacheEntry** aBucket = pMeta->pCache->sEntryCache.aBucket;
  if (aBucket != NULL) {
    int32_t nBucket = pMeta->pCache->sEntryCache.nBucket;
    for (int32_t iBucket = 0; iBucket < nBucket; iBucket++) {
      SMetaCacheEntry* pEntry = aBucket[iBucket];
      while (pEntry) {
        SMetaCacheEntry* tEntry = pEntry->next;  // save the link before freeing the node
        taosMemoryFree(pEntry);
        pEntry = tEntry;
      }
    }
    taosMemoryFree(aBucket);
  }

  // leave the sub-cache in an empty, consistent state
  pMeta->pCache->sEntryCache.aBucket = NULL;
  pMeta->pCache->sEntryCache.nBucket = 0;
  pMeta->pCache->sEntryCache.nEntry = 0;
}
13,702✔
97

98
/*
 * Free every node of the super table stats cache and then its bucket array.
 *
 * Does nothing when the cache object itself is missing. The bucket array may
 * be NULL while nBucket is already non-zero (metaCacheOpen() sets the bucket
 * count before allocating the array and calls metaCacheClose() on failure),
 * so the chains are only walked when the array really exists.
 */
static void statsCacheClose(SMeta* pMeta) {
  if (pMeta->pCache == NULL) {
    return;
  }

  SMetaStbStatsEntry** aBucket = pMeta->pCache->sStbStatsCache.aBucket;
  if (aBucket != NULL) {
    int32_t nBucket = pMeta->pCache->sStbStatsCache.nBucket;
    for (int32_t iBucket = 0; iBucket < nBucket; iBucket++) {
      SMetaStbStatsEntry* pEntry = aBucket[iBucket];
      while (pEntry) {
        SMetaStbStatsEntry* tEntry = pEntry->next;  // save the link before freeing the node
        taosMemoryFree(pEntry);
        pEntry = tEntry;
      }
    }
    taosMemoryFree(aBucket);
  }

  // leave the sub-cache in an empty, consistent state
  pMeta->pCache->sStbStatsCache.aBucket = NULL;
  pMeta->pCache->sStbStatsCache.nBucket = 0;
  pMeta->pCache->sStbStatsCache.nEntry = 0;
}
13,701✔
112

113
static void freeCacheEntryFp(void* param) {
×
114
  STagFilterResEntry** p = param;
×
115
  taosHashCleanup((*p)->set);
×
116
  taosMemoryFreeClear(*p);
×
117
}
×
118

119
/*
 * Allocate pMeta->pCache and set up all of its sub-caches: table entry cache,
 * super table stats cache, tag filter result cache, table group result cache
 * and the table filter cache.
 *
 * Returns 0 on success. On failure the error code taken from terrno is
 * returned after logging and calling metaCacheClose() on whatever was built.
 *
 * Bucket counts are published only after the matching bucket array has been
 * allocated, so the failure cleanup never sees a non-zero count paired with
 * a NULL array.
 */
int32_t metaCacheOpen(SMeta* pMeta) {
  int32_t code = 0;
  int32_t lino = 0;

  pMeta->pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
  if (pMeta->pCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // open entry cache
  pMeta->pCache->sEntryCache.nEntry = 0;
  pMeta->pCache->sEntryCache.aBucket =
      (SMetaCacheEntry**)taosMemoryCalloc(META_CACHE_BASE_BUCKET, sizeof(SMetaCacheEntry*));
  if (pMeta->pCache->sEntryCache.aBucket == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pMeta->pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;

  // open stats cache
  pMeta->pCache->sStbStatsCache.nEntry = 0;
  pMeta->pCache->sStbStatsCache.aBucket =
      (SMetaStbStatsEntry**)taosMemoryCalloc(META_CACHE_STATS_BUCKET, sizeof(SMetaStbStatsEntry*));
  if (pMeta->pCache->sStbStatsCache.aBucket == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pMeta->pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;

  // open tag filter result cache
  pMeta->pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pMeta->pCache->sTagFilterResCache.pUidResCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pMeta->pCache->sTagFilterResCache.accTimes = 0;
  pMeta->pCache->sTagFilterResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pMeta->pCache->sTagFilterResCache.pTableEntry == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  taosHashSetFreeFp(pMeta->pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
  (void)taosThreadMutexInit(&pMeta->pCache->sTagFilterResCache.lock, NULL);

  // open table group result cache
  pMeta->pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
  if (pMeta->pCache->STbGroupResCache.pResCache == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pMeta->pCache->STbGroupResCache.accTimes = 0;
  pMeta->pCache->STbGroupResCache.pTableEntry =
      taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pMeta->pCache->STbGroupResCache.pTableEntry == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  taosHashSetFreeFp(pMeta->pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
  (void)taosThreadMutexInit(&pMeta->pCache->STbGroupResCache.lock, NULL);

  // open table filter cache
  pMeta->pCache->STbFilterCache.pStb =
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pMeta->pCache->STbFilterCache.pStb == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pMeta->pCache->STbFilterCache.pStbName =
      taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pMeta->pCache->STbFilterCache.pStbName == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

_exit:
  if (code) {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
    metaCacheClose(pMeta);
  } else {
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
  }
  return code;
}
197

198
/*
 * Tear down every sub-cache held by pMeta->pCache, free the cache object and
 * reset pMeta->pCache to NULL. A missing cache object is a no-op.
 */
void metaCacheClose(SMeta* pMeta) {
  if (pMeta->pCache == NULL) {
    return;
  }

  // chained-hash caches
  entryCacheClose(pMeta);
  statsCacheClose(pMeta);

  // tag filter result cache
  taosHashClear(pMeta->pCache->sTagFilterResCache.pTableEntry);
  taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
  (void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
  taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);

  // table group result cache
  taosHashClear(pMeta->pCache->STbGroupResCache.pTableEntry);
  taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
  (void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
  taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);

  // table filter cache
  taosHashCleanup(pMeta->pCache->STbFilterCache.pStb);
  taosHashCleanup(pMeta->pCache->STbFilterCache.pStbName);

  taosMemoryFree(pMeta->pCache);
  pMeta->pCache = NULL;
}
13,701✔
220

221
/*
 * Resize the table entry cache's bucket array: doubled when `expand` is
 * non-zero, halved otherwise. Existing nodes are relinked into the new array;
 * no node is allocated or freed. If the new array cannot be allocated the
 * cache is left untouched.
 */
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
  const int32_t nOld = pCache->sEntryCache.nBucket;
  const int32_t nNew = expand ? nOld * 2 : nOld / 2;

  SMetaCacheEntry** aNew = (SMetaCacheEntry**)taosMemoryCalloc(nNew, sizeof(SMetaCacheEntry*));
  if (aNew == NULL) {
    return;
  }

  // move every node to the head of its chain in the new array
  for (int32_t i = 0; i < nOld; i++) {
    SMetaCacheEntry* pNext = NULL;
    for (SMetaCacheEntry* pCur = pCache->sEntryCache.aBucket[i]; pCur != NULL; pCur = pNext) {
      pNext = pCur->next;

      SMetaCacheEntry** ppHead = &aNew[TABS(pCur->info.uid) % nNew];
      pCur->next = *ppHead;
      *ppHead = pCur;
    }
  }

  // swap in the new array
  taosMemoryFree(pCache->sEntryCache.aBucket);
  pCache->sEntryCache.aBucket = aNew;
  pCache->sEntryCache.nBucket = nNew;
}
256

257
/*
 * Insert `pInfo` into the table entry cache, or refresh the cached
 * version/skmVer when an entry with the same uid exists and `pInfo` is newer.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when the cached suid differs
 * from pInfo->suid, or terrno when the new node cannot be allocated.
 * The original note says meta is write-locked by the caller.
 */
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
  SMetaCache* pCache = pMeta->pCache;

  // look for an existing entry with this uid
  int32_t          iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
  SMetaCacheEntry* pFound = pCache->sEntryCache.aBucket[iBucket];
  while (pFound != NULL && pFound->info.uid != pInfo->uid) {
    pFound = pFound->next;
  }

  if (pFound != NULL) {  // update in place
    if (pFound->info.suid != pInfo->suid) {
      metaError("meta/cache: suid should be same as the one in cache.");
      return TSDB_CODE_INVALID_PARA;
    }
    if (pInfo->version > pFound->info.version) {
      pFound->info.version = pInfo->version;
      pFound->info.skmVer = pInfo->skmVer;
    }
    return 0;
  }

  // insert: grow first when there are at least as many entries as buckets
  if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
    metaRehashCache(pCache, 1);
    iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
  }

  SMetaCacheEntry* pNode = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pNode));
  if (pNode == NULL) {
    return terrno;
  }

  // push at the head of the bucket chain
  pNode->info = *pInfo;
  pNode->next = pCache->sEntryCache.aBucket[iBucket];
  pCache->sEntryCache.aBucket[iBucket] = pNode;
  pCache->sEntryCache.nEntry++;

  return 0;
}
301

302
/*
 * Remove the entry for `uid` from the table entry cache.
 *
 * Returns 0 when an entry was removed, TSDB_CODE_NOT_FOUND otherwise.
 * After a removal the bucket array is halved when it is larger than
 * META_CACHE_BASE_BUCKET and fewer than a quarter of the buckets are in use.
 */
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;

  // walk the chain keeping the address of the link that points at the current node
  SMetaCacheEntry** ppLink = &pCache->sEntryCache.aBucket[TABS(uid) % pCache->sEntryCache.nBucket];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaCacheEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  // unlink and free
  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sEntryCache.nEntry--;

  if (pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET &&
      pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4) {
    metaRehashCache(pCache, 0);
  }

  return 0;
}
328

329
/*
 * Look up `uid` in the table entry cache.
 *
 * Returns 0 when found (the cached info is copied to *pInfo if pInfo is not
 * NULL), TSDB_CODE_NOT_FOUND otherwise.
 */
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     iBucket = TABS(uid) % pCache->sEntryCache.nBucket;

  for (SMetaCacheEntry* pCur = pCache->sEntryCache.aBucket[iBucket]; pCur != NULL; pCur = pCur->next) {
    if (pCur->info.uid != uid) {
      continue;
    }
    if (pInfo != NULL) {
      *pInfo = pCur->info;
    }
    return 0;
  }

  return TSDB_CODE_NOT_FOUND;
}
350

351
/*
 * Resize the super table stats cache's bucket array: doubled when `expand`
 * is non-zero, halved otherwise. Existing nodes are relinked into the new
 * array.
 *
 * Returns 0 on success, or terrno when the new array cannot be allocated
 * (the cache is then left untouched).
 */
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
  const int32_t nOld = pCache->sStbStatsCache.nBucket;
  const int32_t nNew = expand ? nOld * 2 : nOld / 2;

  SMetaStbStatsEntry** aNew = (SMetaStbStatsEntry**)taosMemoryCalloc(nNew, sizeof(SMetaStbStatsEntry*));
  if (aNew == NULL) {
    return terrno;
  }

  // move every node to the head of its chain in the new array
  for (int32_t i = 0; i < nOld; i++) {
    SMetaStbStatsEntry* pNext = NULL;
    for (SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[i]; pCur != NULL; pCur = pNext) {
      pNext = pCur->next;

      SMetaStbStatsEntry** ppHead = &aNew[TABS(pCur->info.uid) % nNew];
      pCur->next = *ppHead;
      *ppHead = pCur;
    }
  }

  // swap in the new array
  taosMemoryFree(pCache->sStbStatsCache.aBucket);
  pCache->sStbStatsCache.aBucket = aNew;
  pCache->sStbStatsCache.nBucket = nNew;

  return 0;
}
389

390
/*
 * Insert `pInfo` into the super table stats cache, or overwrite the cached
 * ctbNum when an entry with the same uid already exists.
 *
 * Returns 0 on success, or terrno when the new node cannot be allocated.
 * The original note says meta is write-locked by the caller.
 */
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;

  // look for an existing entry with this uid
  int32_t             iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
  SMetaStbStatsEntry* pFound = pCache->sStbStatsCache.aBucket[iBucket];
  while (pFound != NULL && pFound->info.uid != pInfo->uid) {
    pFound = pFound->next;
  }

  if (pFound != NULL) {  // update in place
    pFound->info.ctbNum = pInfo->ctbNum;
    return 0;
  }

  // insert: try to grow first when there are at least as many entries as buckets;
  // a failed grow is ignored and the insert proceeds with the current array
  if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
    TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
    iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
  }

  SMetaStbStatsEntry* pNode = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pNode));
  if (pNode == NULL) {
    return terrno;
  }

  // push at the head of the bucket chain
  pNode->info = *pInfo;
  pNode->next = pCache->sStbStatsCache.aBucket[iBucket];
  pCache->sStbStatsCache.aBucket[iBucket] = pNode;
  pCache->sStbStatsCache.nEntry++;

  return 0;
}
426

427
/*
 * Remove the entry for `uid` from the super table stats cache.
 *
 * Returns 0 when an entry was removed, TSDB_CODE_NOT_FOUND otherwise.
 * After a removal the bucket array is halved (best effort) when it is larger
 * than META_CACHE_STATS_BUCKET and fewer than a quarter of the buckets are
 * in use.
 */
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
  SMetaCache* pCache = pMeta->pCache;

  // walk the chain keeping the address of the link that points at the current node
  SMetaStbStatsEntry** ppLink = &pCache->sStbStatsCache.aBucket[TABS(uid) % pCache->sStbStatsCache.nBucket];
  for (; *ppLink != NULL; ppLink = &(*ppLink)->next) {
    if ((*ppLink)->info.uid == uid) {
      break;
    }
  }

  SMetaStbStatsEntry* pVictim = *ppLink;
  if (pVictim == NULL) {
    return TSDB_CODE_NOT_FOUND;
  }

  // unlink and free
  *ppLink = pVictim->next;
  taosMemoryFree(pVictim);
  pCache->sStbStatsCache.nEntry--;

  if (pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET &&
      pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4) {
    TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
  }

  return 0;
}
453

454
/*
 * Look up `uid` in the super table stats cache.
 *
 * Returns TSDB_CODE_SUCCESS when found (the cached stats are copied to
 * *pInfo if pInfo is not NULL), TSDB_CODE_NOT_FOUND otherwise.
 */
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
  SMetaCache* pCache = pMeta->pCache;
  int32_t     iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;

  for (SMetaStbStatsEntry* pCur = pCache->sStbStatsCache.aBucket[iBucket]; pCur != NULL; pCur = pCur->next) {
    if (pCur->info.uid != uid) {
      continue;
    }
    if (pInfo != NULL) {
      *pInfo = pCur->info;
    }
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_NOT_FOUND;
}
475

476
/*
 * Copy the digest bytes of `key` into the digest slot of a composed cache
 * key, i.e. the 16 bytes starting at pBuf[2].
 *
 * The slot holds exactly two uint64_t words, so at most that many bytes are
 * copied; a NULL key or a non-positive keyLen leaves the slot untouched
 * instead of handing memcpy an out-of-range length.
 */
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
  const int32_t digestLen = (int32_t)(2 * sizeof(uint64_t));

  if (key == NULL || keyLen <= 0) {
    return;
  }
  if (keyLen > digestLen) {
    keyLen = digestLen;  // never write past pBuf[3]
  }
  memcpy(&pBuf[2], key, (size_t)keyLen);
}
×
479

480
// the format of key:
481
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
482
/*
 * Build the 32-byte composed LRU key in `buf` (four uint64_t words):
 *   buf[0]      address of the per-suid hash table
 *   buf[1]      suid
 *   buf[2..3]   digest bytes copied from `key`
 *
 * The digest words are zeroed first so that a key shorter than 16 bytes never
 * leaves indeterminate bytes in the composed key (some callers pass an
 * uninitialized stack buffer).
 */
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
  buf[0] = (uint64_t)pHashMap;
  buf[1] = suid;
  buf[2] = 0;
  buf[3] = 0;
  setMD5DigestInKey(buf, key, keyLen);
}
416,952✔
487

488
/*
 * Look up a cached tag-filter uid list for (suid, digest) and append it to
 * pList1.
 *
 * pVnode      SVnode* whose meta cache is consulted
 * suid        super table uid
 * pKey/keyLen digest identifying the filter condition
 * pList1      array receiving the cached uids on a hit
 * acquireRes  set to 0 on a miss and to 1 once a cached payload was found
 *
 * Returns TSDB_CODE_SUCCESS on a hit or a plain miss, TSDB_CODE_NOT_FOUND
 * when the LRU holds the key but the per-suid entry is missing, or terrno
 * when appending to pList1 fails.
 *
 * Every exit path releases the LRU handle (when one was obtained) and
 * unlocks the cache mutex.
 */
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
                                  bool* acquireRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
  SHashObj*      pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  LRUHandle*           pHandle = NULL;
  STagFilterResEntry** pEntry = NULL;

  *acquireRes = 0;

  // generate the composed key for LRU cache
  uint64_t key[4] = {0};
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  (void)taosThreadMutexLock(pLock);
  pMeta->pCache->sTagFilterResCache.accTimes += 1;

  pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    goto _unlock;  // plain miss
  }

  // bookkeeping entry of this super table
  pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaError("meta/cache: pEntry should not be NULL.");
    code = TSDB_CODE_NOT_FOUND;
    goto _release;
  }

  *acquireRes = 1;

  {
    // payload layout as read here: int32_t element count followed by the elements
    const char* p = taosLRUCacheValue(pCache, pHandle);
    int32_t     size = *(int32_t*)p;

    // set the result into the buffer
    if (taosArrayAddBatch(pList1, p + sizeof(int32_t), size) == NULL) {
      code = terrno;
      goto _release;
    }
  }

  (*pEntry)->hitTimes += 1;

  {
    uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
    if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
      metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
               ((double)(*pEntry)->hitTimes) / acc);
    }
  }

_release:
  (void)taosLRUCacheRelease(pCache, pHandle, false);

_unlock:
  (void)taosThreadMutexUnlock(pLock);
  return code;
}
542

543
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
×
544
  (void)ud;
545
  if (value == NULL) {
×
546
    return;
×
547
  }
548

549
  const uint64_t* p = key;
×
550
  if (keyLen != sizeof(int64_t) * 4) {
×
551
    metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
552
    return;
×
553
  }
554

555
  SHashObj* pHashObj = (SHashObj*)p[0];
×
556

557
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
×
558

559
  if (pEntry != NULL && (*pEntry) != NULL) {
×
560
    int64_t st = taosGetTimestampUs();
×
561
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
×
562
    if (code == TSDB_CODE_SUCCESS) {
×
563
      double el = (taosGetTimestampUs() - st) / 1000.0;
×
564
      metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
×
565
               el);
566
    }
567
  }
568

569
  taosMemoryFree(value);
×
570
}
571

572
/*
 * Create the bookkeeping entry for `suid`: a fresh STagFilterResEntry whose
 * digest set already contains pKey, stored in pTableEntry under suid.
 *
 * Returns TSDB_CODE_SUCCESS, or the failing step's error code after logging
 * it and releasing whatever had been allocated.
 */
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  STagFilterResEntry* pNew = taosMemoryMalloc(sizeof(STagFilterResEntry));
  TSDB_CHECK_NULL(pNew, code, lino, _end, terrno);

  pNew->hitTimes = 0;
  pNew->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pNew->set, code, lino, _end, terrno);

  // remember the digest, then publish the entry under its suid
  code = taosHashPut(pNew->set, pKey, keyLen, NULL, 0);
  TSDB_CHECK_CODE(code, lino, _end);

  code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &pNew, POINTER_BYTES);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pNew != NULL) {
    if (pNew->set != NULL) {
      taosHashCleanup(pNew->set);
    }
    taosMemoryFree(pNew);
  }
  return code;
}
598

599
// check both the payload size and selectivity ratio
600
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
601
                              int32_t payloadLen, double selectivityRatio) {
602
  int32_t code = 0;
×
603
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
604
  int32_t vgId = TD_VID(pMeta->pVnode);
×
605

606
  if (selectivityRatio > tsSelectivityRatio) {
×
607
    metaDebug("vgId:%d, suid:%" PRIu64
×
608
              " failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
609
              vgId, suid, selectivityRatio, tsSelectivityRatio);
610
    taosMemoryFree(pPayload);
×
611
    return TSDB_CODE_SUCCESS;
×
612
  }
613

614
  if (payloadLen > tsTagFilterResCacheSize) {
×
615
    metaDebug("vgId:%d, suid:%" PRIu64
×
616
              " failed to add to uid list cache, due to payload length %d greater than threshold %d",
617
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
618
    taosMemoryFree(pPayload);
×
619
    return TSDB_CODE_SUCCESS;
×
620
  }
621

622
  SLRUCache*     pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
×
623
  SHashObj*      pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
×
624
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
×
625

626
  uint64_t key[4] = {0};
×
627
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
×
628

629
  (void)taosThreadMutexLock(pLock);
×
630
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
631
  if (pEntry == NULL) {
×
632
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
633
    if (code != TSDB_CODE_SUCCESS) {
×
634
      goto _end;
×
635
    }
636
  } else {  // check if it exists or not
637
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
638
    if (code == TSDB_CODE_DUP_KEY) {
×
639
      // we have already found the existed items, no need to added to cache anymore.
640
      (void)taosThreadMutexUnlock(pLock);
×
641
      return TSDB_CODE_SUCCESS;
×
642
    }
643
    if (code != TSDB_CODE_SUCCESS) {
×
644
      goto _end;
×
645
    }
646
  }
647

648
  // add to cache.
649
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
×
650
                           TAOS_LRU_PRIORITY_LOW, NULL);
651
_end:
×
652
  (void)taosThreadMutexUnlock(pLock);
×
653
  metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
×
654
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
655

656
  return code;
×
657
}
658

659
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
660
/*
 * Drop every cached tag-filter uid list that belongs to `suid`.
 *
 * Each digest recorded for the super table is erased from the LRU cache,
 * then the digest set is cleared and the hit counter reset. Always returns
 * TSDB_CODE_SUCCESS; a suid with no recorded digests is a no-op.
 */
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
  int32_t        vgId = TD_VID(pMeta->pVnode);
  SHashObj*      pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;

  // composed key with an all-zero digest; the digest part is filled in per item below
  uint64_t lruKey[4] = {0};
  uint64_t zeroDigest[2] = {0};
  initCacheKey(lruKey, pEntryHashMap, suid, (char*)&zeroDigest[0], 16);

  (void)taosThreadMutexLock(pLock);

  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
    (void)taosThreadMutexUnlock(pLock);
    return TSDB_CODE_SUCCESS;
  }

  (*pEntry)->hitTimes = 0;

  for (char* pIter = taosHashIterate((*pEntry)->set, NULL); pIter != NULL;
       pIter = taosHashIterate((*pEntry)->set, pIter)) {
    setMD5DigestInKey(lruKey, pIter, 2 * sizeof(uint64_t));
    taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, lruKey, TAG_FILTER_RES_KEY_LEN);
  }
  taosHashClear((*pEntry)->set);

  (void)taosThreadMutexUnlock(pLock);

  metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
  return TSDB_CODE_SUCCESS;
}
691

692
/*
 * Look up a cached table-group result for (suid, digest).
 *
 * pVnode      SVnode* whose meta cache is consulted
 * suid        super table uid
 * pKey/keyLen digest identifying the grouping condition
 * pList       set to NULL on a miss, or to a duplicate of the cached array
 *             (taosArrayDup result) on a hit
 *
 * Returns TSDB_CODE_SUCCESS on a hit or a plain miss, TSDB_CODE_NOT_FOUND
 * when the LRU holds the key but the per-suid entry is missing.
 *
 * Every exit path releases the LRU handle (when one was obtained) and
 * unlocks the cache mutex.
 */
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
  int32_t code = TSDB_CODE_SUCCESS;
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
  int32_t vgId = TD_VID(pMeta->pVnode);

  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
  SHashObj*      pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;

  LRUHandle*           pHandle = NULL;
  STagFilterResEntry** pEntry = NULL;

  *pList = NULL;

  // generate the composed key for LRU cache
  uint64_t key[4] = {0};
  initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);

  (void)taosThreadMutexLock(pLock);
  pMeta->pCache->STbGroupResCache.accTimes += 1;

  pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
  if (pHandle == NULL) {
    goto _unlock;  // plain miss
  }

  pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
  if (NULL == pEntry) {
    metaDebug("suid %" PRIu64 " not in tb group cache", suid);
    code = TSDB_CODE_NOT_FOUND;
    goto _release;
  }

  *pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);

  (*pEntry)->hitTimes += 1;

  {
    uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
    if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
      metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
               ((double)(*pEntry)->hitTimes) / acc);
    }
  }

_release:
  (void)taosLRUCacheRelease(pCache, pHandle, false);

_unlock:
  (void)taosThreadMutexUnlock(pLock);
  return code;
}
736

737
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
×
738
  (void)ud;
739
  if (value == NULL) {
×
740
    return;
×
741
  }
742

743
  const uint64_t* p = key;
×
744
  if (keyLen != sizeof(int64_t) * 4) {
×
745
    metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
×
746
    return;
×
747
  }
748

749
  SHashObj* pHashObj = (SHashObj*)p[0];
×
750

751
  STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
×
752

753
  if (pEntry != NULL && (*pEntry) != NULL) {
×
754
    int64_t st = taosGetTimestampUs();
×
755
    int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
×
756
    if (code == TSDB_CODE_SUCCESS) {
×
757
      double el = (taosGetTimestampUs() - st) / 1000.0;
×
758
      metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
×
759
                taosHashGetSize((*pEntry)->set), el);
760
    }
761
  }
762

763
  taosArrayDestroy((SArray*)value);
×
764
}
765

766
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
×
767
                              int32_t payloadLen) {
768
  int32_t code = 0;
×
769
  SMeta*  pMeta = ((SVnode*)pVnode)->pMeta;
×
770
  int32_t vgId = TD_VID(pMeta->pVnode);
×
771

772
  if (payloadLen > tsTagFilterResCacheSize) {
×
773
    metaDebug("vgId:%d, suid:%" PRIu64
×
774
              " ignore to add to tb group cache, due to payload length %d greater than threshold %d",
775
              vgId, suid, payloadLen, tsTagFilterResCacheSize);
776
    taosArrayDestroy((SArray*)pPayload);
×
777
    return TSDB_CODE_SUCCESS;
×
778
  }
779

780
  SLRUCache*     pCache = pMeta->pCache->STbGroupResCache.pResCache;
×
781
  SHashObj*      pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
×
782
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
×
783

784
  uint64_t key[4] = {0};
×
785
  initCacheKey(key, pTableEntry, suid, pKey, keyLen);
×
786

787
  (void)taosThreadMutexLock(pLock);
×
788
  STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
×
789
  if (pEntry == NULL) {
×
790
    code = addNewEntry(pTableEntry, pKey, keyLen, suid);
×
791
    if (code != TSDB_CODE_SUCCESS) {
×
792
      goto _end;
×
793
    }
794
  } else {  // check if it exists or not
795
    code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
×
796
    if (code == TSDB_CODE_DUP_KEY) {
×
797
      // we have already found the existed items, no need to added to cache anymore.
798
      (void)taosThreadMutexUnlock(pLock);
×
799
      return TSDB_CODE_SUCCESS;
×
800
    }
801
    if (code != TSDB_CODE_SUCCESS) {
×
802
      goto _end;
×
803
    }
804
  }
805

806
  // add to cache.
807
  (void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
×
808
                           TAOS_LRU_PRIORITY_LOW, NULL);
809
_end:
×
810
  (void)taosThreadMutexUnlock(pLock);
×
811
  metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
×
812
            (int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
813

814
  return code;
×
815
}
816

817
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
818
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
208,501✔
819
  uint64_t  p[4] = {0};
208,501✔
820
  int32_t   vgId = TD_VID(pMeta->pVnode);
208,501✔
821
  SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
208,501✔
822

823
  uint64_t dummy[2] = {0};
208,501✔
824
  initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
208,501✔
825

826
  TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
208,513✔
827
  (void)taosThreadMutexLock(pLock);
208,513✔
828

829
  STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
208,527✔
830
  if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
208,513!
831
    (void)taosThreadMutexUnlock(pLock);
208,513✔
832
    return TSDB_CODE_SUCCESS;
208,520✔
833
  }
834

835
  (*pEntry)->hitTimes = 0;
×
836

837
  char *iter = taosHashIterate((*pEntry)->set, NULL);
×
838
  while (iter != NULL) {
×
839
    setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
840
    taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
×
841
    iter = taosHashIterate((*pEntry)->set, iter);
×
842
  }
843
  taosHashClear((*pEntry)->set);
×
844
  (void)taosThreadMutexUnlock(pLock);
×
845

846
  metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
×
847
  return TSDB_CODE_SUCCESS;
×
848
}
849

850
// Tell whether `key` is present in the table filter cache.
//   type 0: `key` is looked up in STbFilterCache.pStb using sizeof(tb_uid_t) bytes
//   type 1: `key` is looked up in STbFilterCache.pStbName using strlen(key) bytes
// Any other `type` yields false.
bool metaTbInFilterCache(SMeta* pMeta, const void* key, int8_t type) {
  switch (type) {
    case 0:
      return taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t)) != NULL;
    case 1:
      return taosHashGet(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key)) != NULL;
    default:
      return false;
  }
}
861

862
// Insert `key` into the table filter cache, with no value attached to it.
//   type 0: stored in STbFilterCache.pStb with a key length of sizeof(tb_uid_t)
//   type 1: stored in STbFilterCache.pStbName with a key length of strlen(key)
// Returns whatever taosHashPut reports; any other `type` is ignored and 0 is returned.
int32_t metaPutTbToFilterCache(SMeta* pMeta, const void* key, int8_t type) {
  switch (type) {
    case 0:
      return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
    case 1:
      return taosHashPut(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key), NULL, 0);
    default:
      return 0;
  }
}
873

874
// Size of the table filter cache for the given `type`.
// Only type 0 (STbFilterCache.pStb) is reported; every other type returns 0.
int32_t metaSizeOfTbFilterCache(SMeta* pMeta, int8_t type) {
  if (type != 0) {
    return 0;
  }

  return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
}
880

881
// Pre-fill the by-name table filter cache (type 1) for the "log" and "audit" databases.
// Only compiled in when TD_ENTERPRISE is defined; otherwise this just returns 0.
//
// The database name is whatever follows the first '.' in the vnode's configured dbname.
// When it is "log" the names in tkLogStb are inserted, when it is "audit" those in tkAuditStb;
// any other database (or a dbname without '.') inserts nothing.
// A failing insert is handed to TAOS_CHECK_RETURN (presumably returns that code to the caller -- confirm
// against the macro definition); otherwise 0 is returned.
int32_t metaInitTbFilterCache(SMeta* pMeta) {
#ifdef TD_ENTERPRISE
  const char* pDot = strchr(pMeta->pVnode->config.dbname, '.');
  if (pDot == NULL) {
    return 0;
  }

  const char*  shortName = pDot + 1;  // skip the '.' itself
  const char** stbNames = NULL;
  int32_t      nStb = 0;

  if (strncmp(shortName, "log", TSDB_DB_NAME_LEN) == 0) {
    stbNames = tkLogStb;
    nStb = tkLogStbNum;
  } else if (strncmp(shortName, "audit", TSDB_DB_NAME_LEN) == 0) {
    stbNames = tkAuditStb;
    nStb = tkAuditStbNum;
  }

  // nStb stays 0 when no list was selected, so the loop body never touches a NULL stbNames
  for (int32_t idx = 0; idx < nStb; ++idx) {
    TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, stbNames[idx], 1));
  }
#endif
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc