• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.13
/source/dnode/vnode/src/meta/metaQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "osMemory.h"
18
#include "tencode.h"
19

20
/**
 * Initialize a meta reader from an opaque vnode handle.
 *
 * pVnode is treated as an SVnode*; its pMeta and the given flags are forwarded to
 * metaReaderDoInit(), after which the store API table is attached to the reader.
 */
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI) {
  SVnode *pVnodeObj = (SVnode *)pVnode;

  metaReaderDoInit(pReader, pVnodeObj->pMeta, flags);
  // Must come after metaReaderDoInit(), which zeroes the whole reader.
  pReader->pAPI = pAPI;
}
10,291,577✔
25

26
/**
 * Reset *pReader to all-zero, bind it to pMeta and record flags.
 *
 * Takes the meta read lock unless pMeta is NULL or META_READER_NOLOCK is set in flags.
 */
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
  memset(pReader, 0, sizeof(*pReader));
  pReader->pMeta = pMeta;
  pReader->flags = flags;

  if (pMeta == NULL) {
    return;
  }
  if ((flags & META_READER_NOLOCK) == 0) {
    metaRLock(pMeta);
  }
}
14,172,285✔
34

35
/**
 * Drop the meta lock held by the reader, if any, without clearing the reader.
 *
 * After unlocking, META_READER_NOLOCK is set in the reader's flags so that a later
 * metaReaderClear() does not unlock a second time.
 */
void metaReaderReleaseLock(SMetaReader *pReader) {
  if (pReader->pMeta == NULL) {
    return;
  }
  if (pReader->flags & META_READER_NOLOCK) {
    return;
  }

  metaULock(pReader->pMeta);
  pReader->flags |= META_READER_NOLOCK;
}
1,831,382✔
41

42
/**
 * Release everything the reader holds: the meta lock (when the reader has a meta and
 * META_READER_NOLOCK is not set), the decoder state, and the tdb value buffer.
 */
void metaReaderClear(SMetaReader *pReader) {
  int holdsLock = (pReader->pMeta != NULL) && ((pReader->flags & META_READER_NOLOCK) == 0);
  if (holdsLock) {
    metaULock(pReader->pMeta);
  }

  tDecoderClear(&pReader->coder);

  tdbFree(pReader->pBuf);
  pReader->pBuf = NULL;  // avoid a dangling pointer if the reader is cleared again
}
14,855,214✔
50

51
/**
 * Load and decode the table entry stored in table.db under (version, uid).
 *
 * The raw value is fetched into pReader->pBuf / szBuf and decoded into pReader->me.
 *
 * @return 0 on success. If the tdb lookup returns a negative code, terrno is set and
 *         returned (TSDB_CODE_NOT_FOUND is mapped to TSDB_CODE_PAR_TABLE_NOT_EXIST).
 *         If decoding fails, the decoder is cleared and the decode error is returned.
 */
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
  SMeta   *pMeta = pReader->pMeta;
  STbDbKey key = {.version = version, .uid = uid};
  int32_t  code;

  // Step 1: fetch the encoded entry from table.db.
  code = tdbTbGet(pMeta->pTbDb, &key, sizeof(key), &pReader->pBuf, &pReader->szBuf);
  if (code < 0) {
    return terrno = (TSDB_CODE_NOT_FOUND == code ? TSDB_CODE_PAR_TABLE_NOT_EXIST : code);
  }

  // Step 2: decode it into the reader's entry.
  tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
  code = metaDecodeEntry(&pReader->coder, &pReader->me);
  if (code != 0) {
    tDecoderClear(&pReader->coder);
    return code;
  }

  return 0;
}
73

74
/**
 * Report whether uid has an entry in uid.idx.
 *
 * The lookup runs under the meta read lock; no value is fetched (NULL out-params).
 */
bool metaIsTableExist(void *pVnode, tb_uid_t uid) {
  SVnode *pVnodeObj = pVnode;
  bool    found;

  metaRLock(pVnodeObj->pMeta);
  found = !(tdbTbGet(pVnodeObj->pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0);
  metaULock(pVnodeObj->pMeta);

  return found;
}
86

87
/**
 * Resolve uid to its version through uid.idx, then load that entry version.
 *
 * @return result of metaGetTableEntryByVersion(); if uid is not in uid.idx, terrno is
 *         set to TSDB_CODE_PAR_TABLE_NOT_EXIST and that code is returned.
 */
int metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
  SMeta *pMeta = pReader->pMeta;

  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
    return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  // The index value begins with an SUidIdxVal; copy the version out before pBuf is
  // reused by the table.db lookup.
  const SUidIdxVal *pIdxVal = (const SUidIdxVal *)pReader->pBuf;
  int64_t           entryVersion = pIdxVal->version;

  return metaGetTableEntryByVersion(pReader, entryVersion, uid);
}
99

100
/**
 * Like metaReaderGetTableEntryByUid(), but obtains the entry version via metaGetInfo()
 * instead of reading uid.idx directly.
 *
 * @return result of metaGetTableEntryByVersion(); if metaGetInfo() fails, terrno is set
 *         and returned (TSDB_CODE_NOT_FOUND is mapped to TSDB_CODE_PAR_TABLE_NOT_EXIST).
 */
int metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid) {
  SMetaInfo info;
  int32_t   code = metaGetInfo(pReader->pMeta, uid, &info, pReader);

  if (code == TSDB_CODE_SUCCESS) {
    return metaGetTableEntryByVersion(pReader, info.version, uid);
  }

  return terrno = (TSDB_CODE_NOT_FOUND == code ? TSDB_CODE_PAR_TABLE_NOT_EXIST : code);
}
111

112
/**
 * Look a table up by name: name.idx yields the uid, which is then loaded by uid.
 *
 * The key is the name including its terminating NUL (strlen(name) + 1 bytes).
 *
 * @return result of metaReaderGetTableEntryByUid(); if the name is not in name.idx,
 *         terrno is set to TSDB_CODE_PAR_TABLE_NOT_EXIST and that code is returned.
 */
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
  SMeta *pMeta = pReader->pMeta;
  int    keyLen = strlen(name) + 1;

  if (tdbTbGet(pMeta->pNameIdx, name, keyLen, &pReader->pBuf, &pReader->szBuf) < 0) {
    return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  // Copy the uid out first: pBuf is reused by the following lookups.
  tb_uid_t tbUid = *(tb_uid_t *)pReader->pBuf;
  return metaReaderGetTableEntryByUid(pReader, tbUid);
}
124

125
/**
 * Return the uid stored in name.idx for the given table name, or 0 if the lookup does
 * not succeed. The lookup runs under the meta read lock.
 */
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
  tb_uid_t result = 0;
  void    *pVal = NULL;
  int      nVal = 0;

  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pVal, &nVal) == 0) {
    result = *(tb_uid_t *)pVal;
    tdbFree(pVal);
  }
  metaULock(pMeta);

  return result;
}
141

142
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
17,127✔
143
  int         code = 0;
17,127✔
144
  SMetaReader mr = {0};
17,127✔
145
  metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, META_READER_LOCK);
17,127✔
146
  code = metaReaderGetTableEntryByUid(&mr, uid);
17,126✔
147
  if (code < 0) {
17,105!
148
    metaReaderClear(&mr);
×
149
    return code;
×
150
  }
151

152
  STR_TO_VARSTR(tbName, mr.me.name);
17,105✔
153
  metaReaderClear(&mr);
17,105✔
154

155
  return 0;
17,129✔
156
}
157

158
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
×
159
  int         code = 0;
×
160
  SMetaReader mr = {0};
×
161
  metaReaderDoInit(&mr, (SMeta *)meta, META_READER_LOCK);
×
162
  code = metaReaderGetTableEntryByUid(&mr, uid);
×
163
  if (code < 0) {
×
164
    metaReaderClear(&mr);
×
165
    return code;
×
166
  }
167
  tstrncpy(tbName, mr.me.name, TSDB_TABLE_NAME_LEN);
×
168
  metaReaderClear(&mr);
×
169

170
  return 0;
×
171
}
172

173
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
417,828✔
174
  int         code = 0;
417,828✔
175
  SMetaReader mr = {0};
417,828✔
176
  metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, META_READER_LOCK);
417,828✔
177

178
  SMetaReader *pReader = &mr;
417,832✔
179

180
  // query name.idx
181
  if (tdbTbGet(((SMeta *)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
417,832✔
182
    metaReaderClear(&mr);
1,715✔
183
    return terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
1,715✔
184
  }
185

186
  *uid = *(tb_uid_t *)pReader->pBuf;
416,113✔
187

188
  metaReaderClear(&mr);
416,113✔
189

190
  return 0;
416,117✔
191
}
192

193
/**
 * Look a table up by name and report its type and owning super-table uid.
 *
 * On success *tbType receives the entry type and *suid receives:
 *   - the parent suid for a child table,
 *   - the table's own uid for a super table,
 *   - 0 for any other type.
 * On failure *tbType is left untouched and *suid is set to 0. The decoded entry is only
 * inspected when the lookup succeeded, so a failed or partial decode can never leak a
 * stale type or suid to the caller.
 *
 * @return 0 on success, otherwise the code from metaGetTableEntryByName().
 */
int metaGetTableTypeSuidByName(void *pVnode, char *tbName, ETableType *tbType, uint64_t *suid) {
  int         code = 0;
  SMetaReader mr = {0};
  metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, META_READER_LOCK);

  *suid = 0;

  code = metaGetTableEntryByName(&mr, tbName);
  if (code == 0) {
    *tbType = mr.me.type;
    if (TSDB_CHILD_TABLE == mr.me.type) {
      *suid = mr.me.ctbEntry.suid;
    } else if (TSDB_SUPER_TABLE == mr.me.type) {
      *suid = mr.me.uid;
    }
  }

  metaReaderClear(&mr);
  return code;
}
211

212
/**
 * Placeholder: not implemented yet. Always returns 0 and does not modify the reader.
 */
int metaReadNext(SMetaReader *pReader) {
  // TODO
  (void)pReader;
  return 0;
}
219

220
int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays) {
×
221
  int         code = -1;
×
222
  SMetaReader mr = {0};
×
223
  metaReaderDoInit(&mr, (SMeta *)meta, META_READER_LOCK);
×
224
  code = metaReaderGetTableEntryByUid(&mr, uid);
×
225
  if (code < 0) {
×
226
    goto _exit;
×
227
  }
228
  if (mr.me.type == TSDB_CHILD_TABLE) {
×
229
    *ttlDays = mr.me.ctbEntry.ttlDays;
×
230
  } else if (mr.me.type == TSDB_NORMAL_TABLE) {
×
231
    *ttlDays = mr.me.ntbEntry.ttlDays;
×
232
  } else {
233
    goto _exit;
×
234
  }
235

236
  code = 0;
×
237

238
_exit:
×
239
  metaReaderClear(&mr);
×
240
  return code;
×
241
}
242

243
#if 1  // ===================================================
244
SMTbCursor *metaOpenTbCursor(void *pVnode) {
963,514✔
245
  SMTbCursor *pTbCur = NULL;
963,514✔
246
  int32_t     code;
247

248
  pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
963,514!
249
  if (pTbCur == NULL) {
964,136!
250
    return NULL;
×
251
  }
252

253
  SVnode *pVnodeObj = pVnode;
964,136✔
254
  // tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
255
  pTbCur->pMeta = pVnodeObj->pMeta;
964,136✔
256
  pTbCur->paused = 1;
964,136✔
257
  code = metaResumeTbCursor(pTbCur, 1, 0);
964,136✔
258
  if (code) {
964,624✔
259
    terrno = code;
165✔
260
    taosMemoryFree(pTbCur);
×
261
    return NULL;
×
262
  }
263
  return pTbCur;
964,459✔
264
}
265

266
/**
 * Destroy a table cursor. NULL is accepted and ignored.
 *
 * The key/value buffers are always freed; the embedded reader and the tdb cursor are
 * released only when the cursor is not paused (pausing already released them).
 */
void metaCloseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur == NULL) {
    return;
  }

  tdbFree(pTbCur->pKey);
  tdbFree(pTbCur->pVal);

  if (!pTbCur->paused) {
    metaReaderClear(&pTbCur->mr);
    if (pTbCur->pDbc != NULL) {
      tdbTbcClose((TBC *)pTbCur->pDbc);
    }
  }

  taosMemoryFree(pTbCur);
}
1,931,252✔
279

280
/**
 * Pause an active table cursor: clear its reader (which also drops the lock the reader
 * holds), close the tdb cursor and mark the cursor paused. No-op if already paused.
 */
void metaPauseTbCursor(SMTbCursor *pTbCur) {
  if (pTbCur->paused) {
    return;
  }

  metaReaderClear(&pTbCur->mr);
  tdbTbcClose((TBC *)pTbCur->pDbc);
  pTbCur->paused = 1;
}
887,245✔
287
/**
 * Resume a paused table cursor. No-op (returns 0) if the cursor is not paused.
 *
 * Re-initializes the embedded reader (taking the meta read lock), opens a tdb cursor on
 * uid.idx and positions it:
 *   - first != 0: at the first entry;
 *   - first == 0: at the saved key (pKey/kLen). On an exact match the cursor advances
 *     one step only when move != 0; otherwise it steps back (c < 0) or forward (c > 0)
 *     according to the comparison result reported by tdbTbcMoveTo().
 *
 * On failure the cursor stays paused. Because a paused cursor is never closed by
 * metaPauseTbCursor()/metaCloseTbCursor(), a tdb cursor opened here is closed on the
 * failure path before returning, and the reader's lock is released.
 *
 * @return 0 on success, otherwise the tdb error code.
 */
int32_t metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first, int8_t move) {
  int32_t code = 0;
  int32_t lino;
  int8_t  locked = 0;
  int8_t  opened = 0;

  if (pTbCur->paused) {
    metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, META_READER_LOCK);
    locked = 1;

    code = tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
    TSDB_CHECK_CODE(code, lino, _exit);
    opened = 1;

    if (first) {
      code = tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      int c = 1;
      code = tdbTbcMoveTo(pTbCur->pDbc, pTbCur->pKey, pTbCur->kLen, &c);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (c == 0) {
        // Exact hit on the saved key: the step is best-effort, its result is ignored.
        if (move) tdbTbcMoveToNext(pTbCur->pDbc);
      } else if (c < 0) {
        code = tdbTbcMoveToPrev(pTbCur->pDbc);
        TSDB_CHECK_CODE(code, lino, _exit);
      } else {
        code = tdbTbcMoveToNext(pTbCur->pDbc);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    pTbCur->paused = 0;
  }

_exit:
  if (code != 0) {
    if (opened) {
      // The cursor remains paused, so nobody else will close this TBC.
      tdbTbcClose((TBC *)pTbCur->pDbc);
      pTbCur->pDbc = NULL;
    }
    if (locked) {
      metaReaderReleaseLock(&pTbCur->mr);
    }
  }
  return code;
}
326

327
/**
 * Advance the cursor to the next table whose type differs from jumpTableType and decode
 * that entry into pTbCur->mr.me.
 *
 * @return 0 when an entry was loaded; the negative tdbTbcNext() result when iteration
 *         cannot advance; or the non-zero code from metaGetTableEntryByVersion().
 */
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
  int rc;

  do {
    rc = tdbTbcNext((TBC *)pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
    if (rc < 0) {
      return rc;
    }

    // Release the previous entry's decoder state before decoding the next one.
    tDecoderClear(&pTbCur->mr.coder);

    rc = metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
    if (rc) {
      return rc;
    }
  } while (pTbCur->mr.me.type == jumpTableType);

  return 0;
}
352

353
/**
 * Step the cursor backwards to the previous table whose type differs from
 * jumpTableType and decode that entry into pTbCur->mr.me.
 *
 * @return 0 when an entry was loaded; -1 when tdbTbcPrev() reports a negative result;
 *         or the negative code from metaGetTableEntryByVersion().
 */
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
  int rc;

  do {
    rc = tdbTbcPrev((TBC *)pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
    if (rc < 0) {
      return -1;
    }

    // Release the previous entry's decoder state before decoding the next one.
    tDecoderClear(&pTbCur->mr.coder);

    rc = metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
    if (rc < 0) {
      return rc;
    }
  } while (pTbCur->mr.me.type == jumpTableType);

  return 0;
}
380

381
/**
 * Return a heap clone of the schema of table uid at schema version sver.
 *
 * Resolution:
 *   1. uid.idx gives the current entry version, table.db gives the entry.
 *   2. A child table is redirected to its super table (uid := suid) and step 1 repeats.
 *   3. For a super/normal table, if sver == -1 or sver equals the entry's current
 *      schemaRow.version, that schema row is cloned (and *extSchema is filled when
 *      extSchema != NULL).
 *   4. Otherwise *extSchema is filled from the current entry (when requested) and the
 *      historical schema is read from the skm db under (uid, sver).
 *
 * @param lock      non-zero to take/release the meta read lock around the whole lookup
 * @param extSchema optional out-param, see above
 * @return cloned schema wrapper owned by the caller, or NULL when any lookup or decode
 *         step fails (or when the clone itself returns NULL).
 *
 * NOTE(review): if step 4 fails after *extSchema was assigned, the value stays in
 * *extSchema while NULL is returned — callers should confirm who releases it.
 */
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, SExtSchema **extSchema) {
  void           *pData = NULL;
  int             nData = 0;
  int64_t         version;
  SSchemaWrapper  schema = {0};
  SSchemaWrapper *pSchema = NULL;  // stays NULL on every failure path
  SDecoder        dc = {0};

  if (lock) {
    metaRLock(pMeta);
  }

_query:
  if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
    goto _exit;
  }

  version = ((SUidIdxVal *)pData)[0].version;

  if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) != 0) {
    goto _exit;
  }

  SMetaEntry me = {0};
  tDecoderInit(&dc, pData, nData);
  int32_t code = metaDecodeEntry(&dc, &me);
  if (code) {
    tDecoderClear(&dc);
    goto _exit;
  }

  if (me.type == TSDB_SUPER_TABLE) {
    if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
      if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me);
      tDecoderClear(&dc);
      goto _exit;
    }
  } else if (me.type == TSDB_CHILD_TABLE) {
    // The schema lives on the super table: restart the lookup with its uid.
    uid = me.ctbEntry.suid;
    tDecoderClear(&dc);
    goto _query;
  } else {
    if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
      pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me);
      tDecoderClear(&dc);
      goto _exit;
    }
  }

  // Requested version is not the current one: fall back to the schema history.
  if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me);
  tDecoderClear(&dc);

  // query from skm db
  if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    goto _exit;
  }

  tDecoderInit(&dc, pData, nData);
  if (tDecodeSSchemaWrapperEx(&dc, &schema) != 0) {
    tDecoderClear(&dc);  // every other failure path clears the decoder; do so here too
    goto _exit;
  }
  pSchema = tCloneSSchemaWrapper(&schema);
  tDecoderClear(&dc);

_exit:
  if (lock) {
    metaULock(pMeta);
  }
  tdbFree(pData);
  return pSchema;
}
457

458
/**
 * Return the creation time (ctbEntry.btime) of child table uid.
 *
 * @param lock non-zero to take/release the meta read lock around the lookup
 * @return the btime for a child table; INT64_MAX when the table cannot be found or
 *         decoded, or when it is not a child table.
 */
int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock) {
  int64_t  createTime = INT64_MAX;
  void    *pData = NULL;
  int      nData = 0;
  SDecoder dc = {0};

  if (lock) {
    metaRLock(pMeta);
  }

  if (!(tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0)) {
    int64_t version = ((SUidIdxVal *)pData)[0].version;

    if (tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData) == 0) {
      SMetaEntry me = {0};

      tDecoderInit(&dc, pData, nData);
      int32_t code = metaDecodeEntry(&dc, &me);
      if (!code && me.type == TSDB_CHILD_TABLE) {
        createTime = me.ctbEntry.btime;
      }
      tDecoderClear(&dc);
    }
  }

  if (lock) {
    metaULock(pMeta);
  }
  tdbFree(pData);
  return createTime;
}
497

498
/**
 * Create a cursor over the child tables of super table uid and position it at the
 * first candidate entry.
 *
 * @param pVnode opaque SVnode*
 * @param lock   non-zero if the cursor should hold the meta read lock while active
 * @return the cursor, or NULL on allocation failure or when resuming fails.
 */
SMCtbCursor *metaOpenCtbCursor(void *pVnode, tb_uid_t uid, int lock) {
  SMCtbCursor *pCursor = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCursor));
  if (pCursor == NULL) {
    return NULL;
  }

  pCursor->pMeta = ((SVnode *)pVnode)->pMeta;
  pCursor->suid = uid;
  pCursor->lock = lock;
  pCursor->paused = 1;  // metaResumeCtbCursor() only acts on a paused cursor

  if (metaResumeCtbCursor(pCursor, 1) < 0) {
    // metaResumeCtbCursor() has already passed the cursor to metaCloseCtbCursor(),
    // which frees it; it must not be freed again here.
    return NULL;
  }
  return pCursor;
}
521

522
/**
 * Destroy a child-table cursor. NULL is accepted.
 *
 * For an active (non-paused) cursor the meta lock is dropped when the cursor was opened
 * with lock set, and the tdb cursor is closed if present. Key/value buffers and the
 * cursor object are then freed.
 */
void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
  if (pCtbCur != NULL) {
    if (!pCtbCur->paused) {
      if (pCtbCur->pMeta && pCtbCur->lock) {
        metaULock(pCtbCur->pMeta);
      }
      if (pCtbCur->pCur != NULL) {
        tdbTbcClose(pCtbCur->pCur);
      }
    }

    tdbFree(pCtbCur->pKey);
    tdbFree(pCtbCur->pVal);
  }

  taosMemoryFree(pCtbCur);
}
3,757,746✔
535

536
/**
 * Pause an active child-table cursor: close the tdb cursor, drop the meta lock when the
 * cursor was opened with lock set, and mark it paused. No-op if already paused.
 */
void metaPauseCtbCursor(SMCtbCursor *pCtbCur) {
  if (pCtbCur->paused) {
    return;
  }

  tdbTbcClose((TBC *)pCtbCur->pCur);
  if (pCtbCur->lock) {
    metaULock(pCtbCur->pMeta);
  }
  pCtbCur->paused = 1;
}
43,289✔
545

546
/**
 * Resume a paused child-table cursor. No-op (returns 0) if the cursor is not paused.
 *
 * Marks the cursor active, takes the meta read lock when pCtbCur->lock is set, opens a
 * tdb cursor on ctb.idx and positions it:
 *   - first != 0: seek to (suid, INT64_MIN); step forward once if the comparison
 *     result c is > 0;
 *   - first == 0: seek to the saved key; step back if c < 0, otherwise step forward.
 * Results of the positioning calls are not checked.
 *
 * @return 0 on success; -1 if the tdb cursor cannot be opened. WARNING: in that case
 *         the cursor has been handed to metaCloseCtbCursor() and is freed — the caller
 *         must not touch pCtbCur afterwards.
 */
int32_t metaResumeCtbCursor(SMCtbCursor *pCtbCur, int8_t first) {
  if (!pCtbCur->paused) {
    return 0;
  }

  pCtbCur->paused = 0;
  if (pCtbCur->lock) {
    metaRLock(pCtbCur->pMeta);
  }

  if (tdbTbcOpen(pCtbCur->pMeta->pCtbIdx, (TBC **)&pCtbCur->pCur, NULL) < 0) {
    metaCloseCtbCursor(pCtbCur);
    return -1;
  }

  int cmp = 0;
  if (first) {
    // Smallest possible key for this suid.
    SCtbIdxKey startKey;
    startKey.suid = pCtbCur->suid;
    startKey.uid = INT64_MIN;

    (void)tdbTbcMoveTo(pCtbCur->pCur, &startKey, sizeof(startKey), &cmp);
    if (cmp > 0) {
      (void)tdbTbcMoveToNext(pCtbCur->pCur);
    }
  } else {
    (void)tdbTbcMoveTo(pCtbCur->pCur, pCtbCur->pKey, pCtbCur->kLen, &cmp);
    if (cmp < 0) {
      (void)tdbTbcMoveToPrev(pCtbCur->pCur);
    } else {
      (void)tdbTbcMoveToNext(pCtbCur->pCur);
    }
  }

  return 0;
}
582

583
/**
 * Return the uid of the next child table under the cursor's super table.
 *
 * @return the child uid, or 0 when the underlying cursor cannot advance or the next key
 *         belongs to a larger suid than the one the cursor was opened for.
 */
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
  if (tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen) < 0) {
    return 0;
  }

  const SCtbIdxKey *pKey = pCtbCur->pKey;
  return (pKey->suid > pCtbCur->suid) ? 0 : pKey->uid;
}
599

600
struct SMStbCursor {
601
  SMeta   *pMeta;
602
  TBC     *pCur;
603
  tb_uid_t suid;
604
  void    *pKey;
605
  void    *pVal;
606
  int      kLen;
607
  int      vLen;
608
};
609

610
/**
 * Open a cursor over suid.idx positioned at (or just after) suid.
 *
 * The meta read lock is taken here and stays held until metaCloseStbCursor().
 *
 * @return the cursor, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when allocation or
 *         opening the tdb cursor fails (the lock is released in the latter case).
 */
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
  SMStbCursor *pCursor = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pCursor));
  if (pCursor == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pCursor->pMeta = pMeta;
  pCursor->suid = suid;
  metaRLock(pMeta);

  if (tdbTbcOpen(pMeta->pSuidIdx, &pCursor->pCur, NULL) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    metaULock(pMeta);
    taosMemoryFree(pCursor);
    return NULL;
  }

  // Seek to the suid; results of the positioning calls are not checked.
  int cmp = 0;
  (void)tdbTbcMoveTo(pCursor->pCur, &suid, sizeof(suid), &cmp);
  if (cmp > 0) {
    (void)tdbTbcMoveToNext(pCursor->pCur);
  }

  return pCursor;
}
641

642
/**
 * Destroy a super-table cursor. NULL is accepted and ignored.
 *
 * Releases the meta lock if the cursor has a meta, closes the tdb cursor and frees the
 * key/value buffers if the cursor has one, then frees the cursor object.
 */
void metaCloseStbCursor(SMStbCursor *pStbCur) {
  if (pStbCur == NULL) {
    return;
  }

  if (pStbCur->pMeta != NULL) {
    metaULock(pStbCur->pMeta);
  }

  if (pStbCur->pCur != NULL) {
    tdbTbcClose(pStbCur->pCur);
    tdbFree(pStbCur->pKey);
    tdbFree(pStbCur->pVal);
  }

  taosMemoryFree(pStbCur);
}
165,353✔
655

656
/**
 * Return the next super-table uid from the cursor, or 0 when it cannot advance.
 */
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
  int rc = tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen);

  return (rc < 0) ? 0 : *(tb_uid_t *)pStbCur->pKey;
}
665

666
/**
 * Build an STSchema for table uid at schema version sver.
 *
 * Fetches the schema wrapper with metaGetTableSchema(), converts it with
 * tBuildTSchema() and frees the intermediate wrapper.
 *
 * @return the built schema (may be NULL if tBuildTSchema() returns NULL), or NULL when
 *         the schema wrapper cannot be obtained.
 */
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
  SSchemaWrapper *pWrapper = metaGetTableSchema(pMeta, uid, sver, lock, NULL);
  if (pWrapper == NULL) {
    return NULL;
  }

  STSchema *pResult = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);

  // The wrapper was only needed for the conversion.
  taosMemoryFree(pWrapper->pSchema);
  taosMemoryFree(pWrapper);

  return pResult;
}
679

680
/**
 * Variant of metaGetTbTSchema() that treats a NULL schema as an error.
 *
 * @return TSDB_CODE_SUCCESS with *ppTSchema set, or the current terrno when the schema
 *         is NULL (*ppTSchema is NULL in that case).
 */
int32_t metaGetTbTSchemaNotNull(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema **ppTSchema) {
  STSchema *pBuilt = metaGetTbTSchema(pMeta, uid, sver, lock);

  *ppTSchema = pBuilt;
  return (pBuilt == NULL) ? terrno : TSDB_CODE_SUCCESS;
}
687

688
/**
 * Variant of metaGetTbTSchema() that tolerates a NULL schema.
 *
 * @return terrno only when the schema is NULL and terrno is TSDB_CODE_OUT_OF_MEMORY;
 *         TSDB_CODE_SUCCESS in every other case (with *ppTSchema possibly NULL).
 */
int32_t metaGetTbTSchemaMaybeNull(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema **ppTSchema) {
  *ppTSchema = metaGetTbTSchema(pMeta, uid, sver, lock);

  if (*ppTSchema != NULL) {
    return TSDB_CODE_SUCCESS;
  }
  if (terrno != TSDB_CODE_OUT_OF_MEMORY) {
    return TSDB_CODE_SUCCESS;
  }
  return terrno;
}
695

696
/**
 * Build an STSchema for a table from the skm (schema history) db.
 *
 * The schema is keyed by (suid ? suid : uid, sver). When sver <= 0 the version is
 * resolved first: from metaGetInfo() if it succeeds, otherwise by seeking the skm db to
 * (uid, INT32_MAX) and taking the entry found there after stepping back if needed.
 *
 * @param ppTSchema receives the built schema; written only when the skm record was
 *                  found and decoded (it is NULL if tBuildTSchema() fails).
 * @return 0 on success; TSDB_CODE_NOT_FOUND when no usable version/record exists;
 *         TSDB_CODE_FAILED when the seek unexpectedly hits (uid, INT32_MAX) exactly;
 *         TSDB_CODE_OUT_OF_MEMORY when tBuildTSchema() returns NULL; or the error from
 *         tdbTbcOpen() / tDecodeSSchemaWrapper().
 *
 * Fixes over the previous revision:
 *   - the on-stack SSchemaWrapper is zero-initialized, so the cleanup after a failed
 *     decode can no longer free an indeterminate pSchema pointer;
 *   - the result of tdbTbcGet() (and of the preceding step back) is checked before the
 *     returned key is dereferenced;
 *   - the skm cursor and lock are released in exactly one place.
 */
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
  int32_t code = 0;
  int32_t lino;

  void          *pData = NULL;
  int            nData = 0;
  SSkmDbKey      skmDbKey;
  SDecoder       dc = {0};
  SSchemaWrapper schema = {0};

  if (sver <= 0) {
    SMetaInfo info;
    if (metaGetInfo(pMeta, suid ? suid : uid, &info, NULL) == 0) {
      sver = info.skmVer;
    } else {
      TBC *pSkmDbC = NULL;
      int  c = 0;

      skmDbKey.uid = suid ? suid : uid;
      skmDbKey.sver = INT32_MAX;

      code = tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL);
      TSDB_CHECK_CODE(code, lino, _exit);
      metaRLock(pMeta);

      if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) < 0) {
        code = TSDB_CODE_NOT_FOUND;
      } else if (c == 0) {
        // An exact hit on sver == INT32_MAX is not expected.
        code = TSDB_CODE_FAILED;
        metaError("meta/query: incorrect c: %" PRId32 ".", c);
      } else if (c < 0 && tdbTbcMoveToPrev(pSkmDbC) < 0) {
        code = TSDB_CODE_NOT_FOUND;
      } else {
        const void *pKey = NULL;
        int32_t     nKey = 0;

        if (tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL) < 0 || pKey == NULL ||
            ((SSkmDbKey *)pKey)->uid != skmDbKey.uid) {
          code = TSDB_CODE_NOT_FOUND;
        } else {
          sver = ((SSkmDbKey *)pKey)->sver;
        }
      }

      metaULock(pMeta);
      tdbTbcClose(pSkmDbC);
      if (code != 0) {
        goto _exit;
      }
    }
  }

  if (!(sver > 0)) {
    code = TSDB_CODE_NOT_FOUND;
    goto _exit;
  }

  skmDbKey.uid = suid ? suid : uid;
  skmDbKey.sver = sver;
  metaRLock(pMeta);
  if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
    metaULock(pMeta);
    code = TSDB_CODE_NOT_FOUND;
    goto _exit;
  }
  metaULock(pMeta);

  // decode
  tDecoderInit(&dc, pData, nData);
  code = tDecodeSSchemaWrapper(&dc, &schema);
  tDecoderClear(&dc);
  tdbFree(pData);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(schema.pSchema);  // NULL unless the decoder allocated it
    goto _exit;
  }

  // convert
  STSchema *pTSchema = tBuildTSchema(schema.pSchema, schema.nCols, schema.version);
  if (pTSchema == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  *ppTSchema = pTSchema;
  taosMemoryFree(schema.pSchema);

_exit:
  return code;
}
796

797
// N.B. Called by statusReq per second
798
/**
 * Return the vnode's table count as tracked in its stats: numOfCTables + numOfNTables.
 * The counters are read directly from pMeta->pVnode->config.vndStats.
 */
int64_t metaGetTbNum(SMeta *pMeta) {
  SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;

  return pStats->numOfCTables + pStats->numOfNTables;
}
806

807
/**
 * Recompute the time-series count via vnodeGetTimeSeriesNum() and, if that succeeds
 * (returns 0), atomically store it into vndStats.numOfTimeSeries.
 */
void metaUpdTimeSeriesNum(SMeta *pMeta) {
  int64_t nTimeSeries = 0;

  if (vnodeGetTimeSeriesNum(pMeta->pVnode, &nTimeSeries) != 0) {
    return;
  }
  atomic_store_64(&pMeta->pVnode->config.vndStats.numOfTimeSeries, nTimeSeries);
}
165,337✔
813

814
/**
 * Return numOfTimeSeries + numOfNTimeSeries from the vnode stats.
 *
 * numOfTimeSeries is refreshed first through metaUpdTimeSeriesNum() when forceUpd is
 * set or when the stored value is <= 0.
 */
static FORCE_INLINE int64_t metaGetTimeSeriesNumImpl(SMeta *pMeta, bool forceUpd) {
  SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
  bool         needRefresh = forceUpd || pStats->numOfTimeSeries <= 0;

  if (needRefresh) {
    metaUpdTimeSeriesNum(pMeta);
  }

  return pStats->numOfTimeSeries + pStats->numOfNTimeSeries;
}
823

824
// type: 1 reported timeseries
825
/**
 * Return the vnode's time-series count (without forcing a refresh).
 *
 * When type == 1 the value is also atomically recorded in
 * vndStats.numOfReportedTimeSeries.
 */
int64_t metaGetTimeSeriesNum(SMeta *pMeta, int type) {
  int64_t total = metaGetTimeSeriesNumImpl(pMeta, false);

  if (type != 1) {
    return total;
  }

  atomic_store_64(&pMeta->pVnode->config.vndStats.numOfReportedTimeSeries, total);
  return total;
}
832

833
typedef struct {
834
  SMeta   *pMeta;
835
  TBC     *pCur;
836
  tb_uid_t uid;
837
  void    *pKey;
838
  void    *pVal;
839
  int      kLen;
840
  int      vLen;
841
} SMSmaCursor;
842

UNCOV
843
/**
 * Open a cursor over sma.idx positioned at the first SMA entry of table uid.
 *
 * The meta read lock is taken here and stays held until metaCloseSmaCursor().
 *
 * @return the cursor, or NULL when allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY)
 *         or the tdb cursor cannot be opened (the lock is released in that case).
 *
 * Fix: the comparison result `c` is now initialized. Previously it was read
 * uninitialized whenever tdbTbcMoveTo() failed before writing it; the sibling
 * metaOpenStbCursor() already initializes it to 0.
 */
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
  SMSmaCursor *pSmaCur = NULL;
  SSmaIdxKey   smaIdxKey;
  int          ret = 0;
  int          c = 0;

  pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
  if (pSmaCur == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pSmaCur->pMeta = pMeta;
  pSmaCur->uid = uid;
  metaRLock(pMeta);

  ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
  if (ret < 0) {
    metaULock(pMeta);
    taosMemoryFree(pSmaCur);
    return NULL;
  }

  // Seek to the smallest possible key for this uid; positioning is best-effort and
  // its results are intentionally not treated as errors.
  smaIdxKey.uid = uid;
  smaIdxKey.smaUid = INT64_MIN;
  ret = tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
  if (c > 0) {
    ret = tdbTbcMoveToNext(pSmaCur->pCur);
  }
  (void)ret;

  return pSmaCur;
}
876

877
// Release everything held by an SMA cursor: the meta read lock, the TDB cursor
// together with its key/value buffers, and finally the cursor struct itself.
// A NULL cursor is ignored.
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
  if (pSmaCur == NULL) {
    return;
  }

  if (pSmaCur->pMeta != NULL) {
    metaULock(pSmaCur->pMeta);
  }

  if (pSmaCur->pCur != NULL) {
    tdbTbcClose(pSmaCur->pCur);
    pSmaCur->pCur = NULL;

    // the buffers are only released when a TDB cursor was actually open
    tdbFree(pSmaCur->pKey);
    tdbFree(pSmaCur->pVal);
  }

  taosMemoryFree(pSmaCur);
}
×
891

UNCOV
892
// Advance the cursor by one entry and return the table uid stored in its key.
// Returns 0 when tdbTbcNext() fails or when the entry's uid is greater than the
// uid the cursor was opened for. The fetched key/value stay available in
// pSmaCur->pKey / pSmaCur->pVal.
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
  int rc = tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen);
  if (rc < 0) {
    return 0;
  }

  SSmaIdxKey *pIdxKey = pSmaCur->pKey;
  return (pIdxKey->uid > pSmaCur->uid) ? 0 : pIdxKey->uid;
}
908

UNCOV
909
// Collect the STSma descriptors of every SMA registered for table `uid`.
//
// The SMA ids come from metaGetSmaIdsByTable(); each id is resolved through a
// locked meta reader. Ids without a table entry are skipped with a warning.
// With `deepCopy` the expr / tagsFilter buffers are duplicated into freshly
// allocated memory; otherwise those fields are cleared (NULL / length 0).
//
// Returns a newly allocated wrapper whose `number` is the count of entries
// actually filled, or NULL when the table has no SMA ids, no entry could be
// resolved, or an allocation failed (terrno = TSDB_CODE_OUT_OF_MEMORY).
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
  STSmaWrapper *pSW = NULL;
  SArray       *pSmaIds = NULL;
  // Declared and zeroed up front: the error label is reachable before the
  // reader is initialized, so it must never see an indeterminate struct.
  SMetaReader mr = {0};
  bool        mrInited = false;
  int64_t     smaId;
  int         smaIdx = 0;
  STSma      *pTSma = NULL;

  if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
    return NULL;
  }

  pSW = taosMemoryCalloc(1, sizeof(*pSW));
  if (!pSW) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  pSW->number = taosArrayGetSize(pSmaIds);
  pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));

  if (!pSW->tSma) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
  mrInited = true;

  for (int i = 0; i < pSW->number; ++i) {
    smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
    if (metaReaderGetTableEntryByUid(&mr, smaId) < 0) {
      tDecoderClear(&mr.coder);
      metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
      continue;
    }
    // NOTE(review): the decoder is cleared before mr.me.smaEntry.tsma is read
    // below; confirm that the decoded entry does not live in decoder-owned memory.
    tDecoderClear(&mr.coder);

    const STSma *pSrc = mr.me.smaEntry.tsma;
    pTSma = pSW->tSma + smaIdx;
    memcpy(pTSma, pSrc, sizeof(STSma));

    // The struct copy duplicated pSrc's expr / tagsFilter pointers. Drop them
    // immediately so the wrapper only ever holds buffers allocated here, even
    // if one of the allocations below fails and we unwind through _err.
    pTSma->expr = NULL;
    pTSma->tagsFilter = NULL;

    if (deepCopy) {
      if (pTSma->exprLen > 0) {
        if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        memcpy((void *)pTSma->expr, pSrc->expr, pTSma->exprLen);
      }
      if (pTSma->tagsFilterLen > 0) {
        if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _err;
        }
        // copy only when there is a destination buffer to copy into
        memcpy((void *)pTSma->tagsFilter, pSrc->tagsFilter, pTSma->tagsFilterLen);
      }
    } else {
      pTSma->exprLen = 0;
      pTSma->tagsFilterLen = 0;
    }

    ++smaIdx;
  }

  if (smaIdx <= 0) goto _err;
  pSW->number = smaIdx;

  metaReaderClear(&mr);
  taosArrayDestroy(pSmaIds);
  return pSW;

_err:
  if (mrInited) {
    metaReaderClear(&mr);
  }
  taosArrayDestroy(pSmaIds);
  pSW = tFreeTSmaWrapper(pSW, deepCopy);
  return NULL;
}
983

UNCOV
984
// Look up the SMA entry `indexUid` through a locked meta reader and return a
// heap-allocated copy of its STSma struct (struct bytes only; pointer members
// are copied as-is, not duplicated).
//
// Returns NULL when the entry cannot be read (a warning is logged) or when the
// copy cannot be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY).
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
  STSma      *pCopy = NULL;
  SMetaReader reader = {0};

  metaReaderDoInit(&reader, pMeta, META_READER_LOCK);

  if (metaReaderGetTableEntryByUid(&reader, indexUid) < 0) {
    metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
  } else {
    pCopy = (STSma *)taosMemoryMalloc(sizeof(STSma));
    if (pCopy == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(pCopy, reader.me.smaEntry.tsma, sizeof(STSma));
    }
  }

  // single exit: the reader is cleared on every path
  metaReaderClear(&reader);
  return pCopy;
}
1005

UNCOV
1006
// Walk the SMA index for table `uid` and return the smaUid of every entry the
// cursor yields, as an array of tb_uid_t.
//
// The array is created lazily, so the result is NULL when the cursor yields
// nothing. NULL is also returned when the cursor cannot be opened or an
// allocation fails (terrno = TSDB_CODE_OUT_OF_MEMORY in the latter case).
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
  SArray      *pSmaUids = NULL;
  SMSmaCursor *pCursor = metaOpenSmaCursor(pMeta, uid);
  if (pCursor == NULL) {
    return NULL;
  }

  for (;;) {
    if (metaSmaCursorNext(pCursor) == 0) {
      break;
    }

    if (pSmaUids == NULL) {
      pSmaUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (pSmaUids == NULL) {
        goto _oom;
      }
    }

    // the cursor's current key carries the SMA uid for this entry
    SSmaIdxKey *pIdxKey = (SSmaIdxKey *)pCursor->pKey;
    if (taosArrayPush(pSmaUids, &pIdxKey->smaUid) == NULL) {
      goto _oom;
    }
  }

  metaCloseSmaCursor(pCursor);
  return pSmaUids;

_oom:
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  metaCloseSmaCursor(pCursor);
  if (pSmaUids != NULL) {
    taosArrayDestroy(pSmaUids);
  }
  return NULL;
}
1043

1044
// Walk the SMA index with a cursor opened for uid 0 and return the table uids
// it yields, skipping a uid that equals the one returned immediately before.
//
// The array is created lazily, so the result is NULL when nothing is yielded.
// NULL is also returned when the cursor cannot be opened or an allocation fails
// (terrno = TSDB_CODE_OUT_OF_MEMORY in the latter case).
SArray *metaGetSmaTbUids(SMeta *pMeta) {
  SArray      *pTbUids = NULL;
  tb_uid_t     prevUid = 0;
  SMSmaCursor *pCursor = metaOpenSmaCursor(pMeta, 0);
  if (pCursor == NULL) {
    return NULL;
  }

  for (;;) {
    tb_uid_t uid = metaSmaCursorNext(pCursor);
    if (uid == 0) {
      break;
    }

    if (uid == prevUid) {
      continue;  // consecutive duplicate
    }
    prevUid = uid;

    if (pTbUids == NULL) {
      pTbUids = taosArrayInit(16, sizeof(tb_uid_t));
      if (pTbUids == NULL) {
        goto _oom;
      }
    }

    if (taosArrayPush(pTbUids, &uid) == NULL) {
      goto _oom;
    }
  }

  metaCloseSmaCursor(pCursor);
  return pTbUids;

_oom:
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  metaCloseSmaCursor(pCursor);
  if (pTbUids != NULL) {
    taosArrayDestroy(pTbUids);
  }
  return NULL;
}
1086

1087
#endif
1088

1089
// Resolve a tag value from the tag blob `pTag`.
//   - JSON type: the tag blob itself is returned and `val` is not touched.
//   - otherwise: tTagGet() is asked to fill `val`; `val` is returned when it
//     reports a hit, NULL when it does not.
const void *metaGetTableTagVal(const void *pTag, int16_t type, STagVal *val) {
  STag *pTagBlob = (STag *)pTag;

  if (type == TSDB_DATA_TYPE_JSON) {
    return pTagBlob;
  }

  bool found = tTagGet(pTagBlob, val);
  return found ? val : NULL;
}
1102

1103
typedef struct {
1104
  SMeta   *pMeta;
1105
  TBC     *pCur;
1106
  tb_uid_t suid;
1107
  int16_t  cid;
1108
  int16_t  type;
1109
  void    *pKey;
1110
  void    *pVal;
1111
  int32_t  kLen;
1112
  int32_t  vLen;
1113
} SIdxCursor;
1114

1115
// Scan the btime index (pMeta->pBtimeIdx) starting at the key built from
// *(int64_t *)arg->val and append to `pUids` the uid of every entry for which
// arg->filterFunc returns 0.
//
// The scan walks backwards when arg->reverse is set, forwards otherwise. For
// "equal" filters, non-matching entries are counted and the scan stops once
// more than TRY_ERROR_LIMIT of them have been seen. The meta read lock is held
// for the whole scan.
//
// Returns 0, the tdbTbcOpen() result when it is non-zero, or terrno when the
// cursor allocation, the filter function, or the array push fails.
int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
  static const int8_t TRY_ERROR_LIMIT = 1;

  SMeta         *pMeta = ((SVnode *)pVnode)->pMeta;
  SMetaFltParam *pFlt = arg;
  int32_t        code = 0;
  int32_t        nMismatch = 0;
  int            seekCmp = 0;

  SIdxCursor *pIdxCur = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pIdxCur == NULL) {
    return terrno;
  }
  pIdxCur->pMeta = pMeta;
  pIdxCur->suid = pFlt->suid;
  pIdxCur->cid = pFlt->cid;
  pIdxCur->type = pFlt->type;

  metaRLock(pMeta);
  code = tdbTbcOpen(pMeta->pBtimeIdx, &pIdxCur->pCur, NULL);
  if (code != 0) {
    goto END;
  }

  // seek key: requested btime, with the uid bound chosen by scan direction
  SBtimeIdxKey seekKey = {.btime = *(int64_t *)(pFlt->val), .uid = pFlt->reverse ? INT64_MAX : 0};

  if (tdbTbcMoveTo(pIdxCur->pCur, &seekKey, sizeof(seekKey), &seekCmp) < 0) {
    goto END;
  }

  for (;;) {
    void   *pRawKey = NULL;
    int32_t rawKeyLen = -1;
    if (tdbTbcGet(pIdxCur->pCur, (const void **)&pRawKey, &rawKeyLen, NULL, NULL) < 0) {
      break;
    }
    if (nMismatch > TRY_ERROR_LIMIT) {
      break;
    }

    SBtimeIdxKey *pEntry = pRawKey;

    // the filter callback reports failure through terrno
    terrno = TSDB_CODE_SUCCESS;
    int32_t fltRet = (*pFlt->filterFunc)((void *)&pEntry->btime, (void *)&seekKey.btime, pFlt->type);
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
      break;
    }

    if (fltRet == 0) {
      if (taosArrayPush(pUids, &pEntry->uid) == NULL) {
        code = terrno;
        break;
      }
    } else if (pFlt->equal == true) {
      nMismatch++;
    }

    int32_t moved = pFlt->reverse ? tdbTbcMoveToPrev(pIdxCur->pCur) : tdbTbcMoveToNext(pIdxCur->pCur);
    if (moved < 0) {
      break;
    }
  }

END:
  if (pIdxCur->pMeta) metaULock(pIdxCur->pMeta);
  if (pIdxCur->pCur) tdbTbcClose(pIdxCur->pCur);
  taosMemoryFree(pIdxCur);
  return code;
}
1185

1186
// Scan the name index (pMeta->pNameIdx) starting at the NUL-terminated name in
// arg->val and append to `pUids` the uid stored as the value of every entry
// for which arg->filterFunc(entryKey, name, type) returns 0.
//
// The scan walks backwards when arg->reverse is set, forwards otherwise. For
// "equal" filters, non-matching entries are counted and the scan stops once
// more than TRY_ERROR_LIMIT of them have been seen. The meta read lock is held
// for the whole scan.
//
// Returns 0, the tdbTbcOpen() result when it is non-zero, or terrno when the
// cursor allocation, the filter function, or the array push fails.
int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
  SMeta         *pMeta = ((SVnode *)pVnode)->pMeta;
  SMetaFltParam *param = arg;
  int32_t        ret = 0;

  SIdxCursor *pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    return terrno;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  char *pName = param->val;

  metaRLock(pMeta);
  ret = tdbTbcOpen(pMeta->pNameIdx, &pCursor->pCur, NULL);
  if (ret != 0) {
    goto END;
  }

  int cmp = 0;
  if (tdbTbcMoveTo(pCursor->pCur, pName, strlen(pName) + 1, &cmp) < 0) {
    goto END;
  }
  int32_t valid = 0;
  int32_t count = 0;

  const int32_t TRY_ERROR_LIMIT = 1;
  do {
    void   *pEntryKey = NULL, *pEntryVal = NULL;
    int32_t nEntryKey = -1, nEntryVal = 0;
    // Pass the ADDRESS of pEntryKey so the cursor can hand the key back;
    // passing the (NULL) pointer value itself left the key unset.
    valid = tdbTbcGet(pCursor->pCur, (const void **)&pEntryKey, &nEntryKey, (const void **)&pEntryVal, &nEntryVal);
    if (valid < 0) break;

    if (count > TRY_ERROR_LIMIT) break;

    char *pTableKey = (char *)pEntryKey;
    // the filter callback reports failure through terrno
    terrno = TSDB_CODE_SUCCESS;
    cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
    if (terrno != TSDB_CODE_SUCCESS) {
      ret = terrno;
      goto END;
    }
    if (cmp == 0) {
      tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
      if (taosArrayPush(pUids, &tuid) == NULL) {
        ret = terrno;
        goto END;
      }
    } else {
      if (param->equal == true) {
        count++;
      }
    }
    valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
    if (valid < 0) {
      break;
    }
  } while (1);

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(pCursor);

  return ret;
}
1264
// Placeholder for a TTL-index filter: no index is scanned yet and `pUids` is
// left untouched.
//
// The function still allocates its cursor state and takes/releases the meta
// read lock, as before. Returns 0, or terrno when the cursor state cannot be
// allocated.
int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
  SMeta         *pMeta = ((SVnode *)pVnode)->pMeta;
  SMetaFltParam *param = arg;
  int32_t        ret = 0;

  (void)pUids;  // nothing is produced until the TTL scan is implemented

  SIdxCursor *pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (pCursor == NULL) {
    return terrno;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);
  // TODO: open a cursor on the TTL index and scan it (impl later)

  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  taosMemoryFree(pCursor);

  return ret;
}
1298
// Scan the tag index (pMeta->pTagIdx) of super table arg->suid for tag column
// arg->cid and append to `pUids` the uid of every child table whose indexed tag
// value makes arg->filterFunc return 0.
//
// Steps:
//   1. load and decode the super table entry (uid index -> table db);
//   2. require that some tag column has colId == arg->cid, the same type as
//      arg->type, and an index (IS_IDX_ON); otherwise TSDB_CODE_INVALID_PARA;
//   3. build the seek key from arg->val (NCHAR values are converted with
//      taosMbsToUcs4 first) and walk the index, backwards when arg->reverse
//      is set.
//
// The meta read lock is held for the whole call. Returns 0 or an error code;
// note that a negative result from tdbTbcGet / cursor movement is stored in the
// return value as well.
int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
  SMeta         *pMeta = ((SVnode *)pVnode)->pMeta;
  SMetaFltParam *param = arg;

  SMetaEntry oStbEntry = {0};
  int32_t    code = 0;
  char      *buf = NULL;
  void      *pData = NULL;
  int        nData = 0;

  SDecoder    dc = {0};
  STbDbKey    tbDbKey = {0};
  STagIdxKey *pKey = NULL;
  int32_t     nKey = 0;

  SIdxCursor *pCursor = NULL;
  pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
  if (!pCursor) {
    return terrno;
  }
  pCursor->pMeta = pMeta;
  pCursor->suid = param->suid;
  pCursor->cid = param->cid;
  pCursor->type = param->type;

  metaRLock(pMeta);

  TAOS_CHECK_GOTO(tdbTbGet(pMeta->pUidIdx, &param->suid, sizeof(tb_uid_t), &pData, &nData), NULL, END);

  tbDbKey.uid = param->suid;
  tbDbKey.version = ((SUidIdxVal *)pData)[0].version;

  TAOS_CHECK_GOTO(tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData), NULL, END);

  tDecoderInit(&dc, pData, nData);

  code = metaDecodeEntry(&dc, &oStbEntry);
  if (code) {
    // the decoder is cleared once, at END
    goto END;
  }

  if (oStbEntry.stbEntry.schemaTag.pSchema == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, NULL, END);
  }

  // The request is valid only if SOME tag column matches. Decide after the
  // whole schema has been inspected: bailing out on the first non-matching
  // column would reject every column except the first one.
  code = TSDB_CODE_INVALID_PARA;
  for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
    SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
    if (schema->colId == param->cid && param->type == schema->type && (IS_IDX_ON(schema))) {
      code = 0;
      break;
    }
  }
  TAOS_CHECK_GOTO(code, NULL, END);

  code = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
  if (code != 0) {
    TAOS_CHECK_GOTO(terrno, NULL, END);
  }

  int32_t maxSize = 0;
  int32_t nTagData = 0;
  void   *tagData = NULL;

  if (param->val == NULL) {
    metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
    goto END;
  } else {
    if (IS_VAR_DATA_TYPE(param->type)) {
      tagData = varDataVal(param->val);
      nTagData = varDataLen(param->val);

      if (param->type == TSDB_DATA_TYPE_NCHAR) {
        // convert the multi-byte input to UCS-4 before building the key
        maxSize = 4 * nTagData + 1;
        buf = taosMemoryCalloc(1, maxSize);
        if (buf == NULL) {
          TAOS_CHECK_GOTO(terrno, NULL, END);
        }

        if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize, NULL)) {
          TAOS_CHECK_GOTO(terrno, NULL, END);
        }

        tagData = buf;
        nTagData = maxSize;
      }
    } else {
      tagData = param->val;
      nTagData = tDataTypes[param->type].bytes;
    }
  }

  TAOS_CHECK_GOTO(metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
                                      param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey),
                  NULL, END);

  int cmp = 0;
  TAOS_CHECK_GOTO(tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp), 0, END);

  int     count = 0;
  int32_t valid = 0;
  bool    found = false;

  static const int8_t TRY_ERROR_LIMIT = 1;

  /// src:   [[suid, cid1, type1]....[suid, cid2, type2]....[suid, cid3, type3]...]
  /// target:                        [suid, cid2, type2]
  int diffCidCount = 0;
  do {
    void   *entryKey = NULL, *entryVal = NULL;
    int32_t nEntryKey, nEntryVal;

    valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
    if (valid < 0) {
      // no break here: entryKey stays NULL and the NULL check below ends the loop
      code = valid;
    }
    if (count > TRY_ERROR_LIMIT) {
      break;
    }

    STagIdxKey *p = entryKey;
    if (p == NULL) break;

    if (p->type != pCursor->type || p->suid != pCursor->suid || p->cid != pCursor->cid) {
      // entry belongs to another (suid, cid, type) group
      if (found == true) break;  // already past the target group
      if (diffCidCount > TRY_ERROR_LIMIT) break;
      diffCidCount++;
      count++;
      valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
      if (valid < 0) {
        code = valid;
        break;
      } else {
        continue;
      }
    }

    // the filter callback reports failure through terrno
    terrno = TSDB_CODE_SUCCESS;
    int32_t fltRet = (*param->filterFunc)(p->data, pKey->data, pKey->type);
    if (terrno != TSDB_CODE_SUCCESS) {
      TAOS_CHECK_GOTO(terrno, NULL, END);
      break;
    }
    if (fltRet == 0) {
      // match: the child table uid is stored right after the tag value
      tb_uid_t tuid = 0;
      if (IS_VAR_DATA_TYPE(pKey->type)) {
        tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
      } else {
        tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
      }
      if (taosArrayPush(pUids, &tuid) == NULL) {
        TAOS_CHECK_GOTO(terrno, NULL, END);
      }
      found = true;
    } else {
      if (param->equal == true) {
        if (count > TRY_ERROR_LIMIT) break;
        count++;
      }
    }
    valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
    if (valid < 0) {
      code = valid;
      break;
    }
  } while (1);

END:
  if (pCursor->pMeta) metaULock(pCursor->pMeta);
  if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
  if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
  taosMemoryFreeClear(oStbEntry.pExtSchemas);
  tDecoderClear(&dc);
  tdbFree(pData);

  taosMemoryFree(buf);
  taosMemoryFree(pKey);

  taosMemoryFree(pCursor);

  return code;
}
1482

1483
// Fetch the child-table index value stored under {suid, uid} from
// pMeta->pCtbIdx into *tag / *len and return the tdbTbGet() result.
// When `lock` is true the meta read lock is taken around the lookup; otherwise
// no locking is done here.
static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, void **tag, int32_t *len, bool lock) {
  SCtbIdxKey key = {.suid = suid, .uid = uid};

  if (lock) {
    metaRLock(pMeta);
  }

  int rc = tdbTbGet(pMeta->pCtbIdx, &key, sizeof(SCtbIdxKey), tag, len);

  if (lock) {
    metaULock(pMeta);
  }

  return rc;
}
1497

1498
// For every STUidTagInfo in `uidList`, look up the tags of child table
// (suid, p->uid) and store a heap copy in p->pTagVal.
//
// The meta read lock is re-acquired every LIMIT entries so a long list does not
// hold the lock for the whole call. Entries whose lookup fails are logged and
// skipped. A NULL `uidList` is treated as empty.
//
// Returns 0, or terrno when a tag copy cannot be allocated.
int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) {
  SMeta        *pMeta = ((SVnode *)pVnode)->pMeta;
  const int32_t LIMIT = 128;

  bool    isLock = false;
  int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
  for (int i = 0; i < sz; i++) {
    STUidTagInfo *p = taosArrayGet(uidList, i);

    if (i % LIMIT == 0) {
      // start of a new batch: give other lock users a chance in between
      if (isLock) metaULock(pMeta);

      metaRLock(pMeta);
      isLock = true;
    }

    void   *val = NULL;
    int32_t len = 0;
    if (metaGetTableTagByUid(pMeta, suid, p->uid, &val, &len, false) == 0) {
      p->pTagVal = taosMemoryMalloc(len);
      if (!p->pTagVal) {
        // capture the error first, then release the fetched buffer so it does
        // not leak on this path
        int32_t code = terrno;
        tdbFree(val);
        if (isLock) metaULock(pMeta);

        TAOS_RETURN(code);
      }
      memcpy(p->pTagVal, val, len);
      tdbFree(val);
    } else {
      metaError("vgId:%d, failed to get table tags, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
                p->uid);
    }
  }

  if (isLock) metaULock(pMeta);
  return 0;
}
1535

1536
// Load child-table tags of super table `suid` into `pUidTagInfo`.
//
//   - If the list is empty on entry, one STUidTagInfo (uid + heap copy of the
//     tag value) is appended for every child table the cursor yields.
//   - If the list already has entries, only those uids are served: each entry
//     whose pTagVal is still NULL receives a heap copy of its tag value. A hash
//     map uid -> list index is used so the cursor is walked only once.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the cursor cannot be
// opened, or the failing call's error code / terrno otherwise.
int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
  SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 1);
  if (!pCur) {
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  // If len > 0 means there already have uids, and we only want the
  // tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept
  // in the hash map, that may require a lot of memory
  SHashObj *pSepecifiedUidMap = NULL;
  size_t    numOfElems = taosArrayGetSize(pUidTagInfo);
  if (numOfElems > 0) {
    pSepecifiedUidMap =
        taosHashInit(numOfElems / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    if (pSepecifiedUidMap == NULL) {
      // NOTE(review): assumes taosHashInit() reports failure by returning NULL
      // (and normally sets terrno); fall back to OOM if terrno is unset.
      int32_t code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      metaCloseCtbCursor(pCur);
      return code;
    }
    for (int i = 0; i < numOfElems; i++) {
      STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, i);
      int32_t       code = taosHashPut(pSepecifiedUidMap, &pTagInfo->uid, sizeof(uint64_t), &i, sizeof(int32_t));
      if (code) {
        metaCloseCtbCursor(pCur);
        taosHashCleanup(pSepecifiedUidMap);
        return code;
      }
    }
  }

  if (numOfElems == 0) {  // all data needs to be added into the pUidTagInfo list
    while (1) {
      tb_uid_t uid = metaCtbCursorNext(pCur);
      if (uid == 0) {
        break;
      }

      STUidTagInfo info = {.uid = uid, .pTagVal = NULL};
      info.pTagVal = taosMemoryMalloc(pCur->vLen);
      if (!info.pTagVal) {
        metaCloseCtbCursor(pCur);
        taosHashCleanup(pSepecifiedUidMap);
        return terrno;
      }
      // the cursor's value buffer is reused on the next step, so keep a copy
      memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
      if (taosArrayPush(pUidTagInfo, &info) == NULL) {
        taosMemoryFreeClear(info.pTagVal);
        metaCloseCtbCursor(pCur);
        taosHashCleanup(pSepecifiedUidMap);
        return terrno;
      }
    }
  } else {  // only the specified tables need to be added
    while (1) {
      tb_uid_t uid = metaCtbCursorNext(pCur);
      if (uid == 0) {
        break;
      }

      int32_t *index = taosHashGet(pSepecifiedUidMap, &uid, sizeof(uint64_t));
      if (index == NULL) {
        continue;  // not one of the requested tables
      }

      STUidTagInfo *pTagInfo = taosArrayGet(pUidTagInfo, *index);
      if (pTagInfo->pTagVal == NULL) {
        pTagInfo->pTagVal = taosMemoryMalloc(pCur->vLen);
        if (!pTagInfo->pTagVal) {
          metaCloseCtbCursor(pCur);
          taosHashCleanup(pSepecifiedUidMap);
          return terrno;
        }
        memcpy(pTagInfo->pTagVal, pCur->pVal, pCur->vLen);
      }
    }
  }

  taosHashCleanup(pSepecifiedUidMap);
  metaCloseCtbCursor(pCur);
  return TSDB_CODE_SUCCESS;
}
1612

1613
// Forward declaration: metaGetInfo() below calls this and treats a return
// value of 0 as a cache hit that filled *pInfo.
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);
1614

1615
// Fill *pInfo (uid, suid, version, skmVer) for table `uid`.
//
// The meta cache is consulted first; on a miss the uid index in TDB is read
// and the result is written back to the cache under the write lock.
//
// Locking: when `pReader` is given and does not carry META_READER_NOLOCK, the
// lock is treated as already held through the reader, so none is taken here.
// In that case it is dropped around the cache upsert and re-acquired
// afterwards. Otherwise this function takes and releases the read lock itself.
//
// Returns 0 on a cache hit or after a successful TDB lookup, or the negative
// tdbTbGet() result when the uid is not found.
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo, SMetaReader *pReader) {
  int32_t code = 0;
  void   *pIdxVal = NULL;
  int     nIdxVal = 0;
  bool    heldByReader = (pReader != NULL) && !(pReader->flags & META_READER_NOLOCK);

  if (!heldByReader) metaRLock(pMeta);

  // search cache first, then TDB
  bool cacheHit = (metaCacheGet(pMeta, uid, pInfo) == 0);
  if (!cacheHit) {
    code = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pIdxVal, &nIdxVal);
  }

  // our own read lock is no longer needed on any of the three outcomes
  if (!heldByReader) metaULock(pMeta);

  if (cacheHit || code < 0) {
    goto _exit;
  }

  SUidIdxVal *pUidVal = (SUidIdxVal *)pIdxVal;
  pInfo->uid = uid;
  pInfo->suid = pUidVal->suid;
  pInfo->version = pUidVal->version;
  pInfo->skmVer = pUidVal->skmVer;

  // the read lock must not be held while the write lock is requested
  if (heldByReader) {
    metaULock(pReader->pMeta);
  }

  // upsert the cache
  metaWLock(pMeta);
  if (metaCacheUpsert(pMeta, pInfo) != 0) {
    metaError("vgId:%d, failed to upsert cache, uid:%" PRId64, TD_VID(pMeta->pVnode), uid);
  }
  metaULock(pMeta);

  if (heldByReader) {
    metaRLock(pReader->pMeta);
  }

_exit:
  tdbFree(pIdxVal);
  return code;
}
1667

1668
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t *numOfCols) {
304,709✔
1669
  int32_t code = 0;
304,709✔
1670

1671
  if (!numOfTables && !numOfCols) goto _exit;
304,709!
1672

1673
  SVnode *pVnodeObj = pVnode;
304,709✔
1674
  metaRLock(pVnodeObj->pMeta);
304,709✔
1675

1676
  // fast path: search cache
1677
  SMetaStbStats state = {0};
304,769✔
1678
  if (metaStatsCacheGet(pVnodeObj->pMeta, uid, &state) == TSDB_CODE_SUCCESS) {
304,769✔
1679
    metaULock(pVnodeObj->pMeta);
281,919✔
1680
    if (numOfTables) *numOfTables = state.ctbNum;
281,929✔
1681
    if (numOfCols) *numOfCols = state.colNum;
281,929✔
1682
    goto _exit;
281,929✔
1683
  }
1684

1685
  // slow path: search TDB
1686
  int64_t ctbNum = 0;
22,845✔
1687
  int32_t colNum = 0;
22,845✔
1688
  int64_t keep = 0;
22,845✔
1689
  code = vnodeGetCtbNum(pVnode, uid, &ctbNum);
22,845✔
1690
  if (TSDB_CODE_SUCCESS == code) {
22,842!
1691
    code = vnodeGetStbColumnNum(pVnode, uid, &colNum);
22,842✔
1692
  }
1693
  if (TSDB_CODE_SUCCESS == code) {
22,844✔
1694
    code = vnodeGetStbKeep(pVnode, uid, &keep);
22,842✔
1695
  }
1696
  metaULock(pVnodeObj->pMeta);
22,846✔
1697
  if (TSDB_CODE_SUCCESS != code) {
22,845!
UNCOV
1698
    goto _exit;
×
1699
  }
1700

1701
  if (numOfTables) *numOfTables = ctbNum;
22,845✔
1702
  if (numOfCols) *numOfCols = colNum;
22,845✔
1703

1704
  state.uid = uid;
22,845✔
1705
  state.ctbNum = ctbNum;
22,845✔
1706
  state.colNum = colNum;
22,845✔
1707
  state.keep = keep;
22,845✔
1708
  // upsert the cache
1709
  metaWLock(pVnodeObj->pMeta);
22,845✔
1710

1711
  int32_t ret = metaStatsCacheUpsert(pVnodeObj->pMeta, &state);
22,846✔
1712
  if (ret) {
22,843!
UNCOV
1713
    metaError("failed to upsert stats, uid:%" PRId64 ", ctbNum:%" PRId64 ", colNum:%d, keep:%" PRId64, uid, ctbNum,
×
1714
              colNum, keep);
1715
  }
1716

1717
  metaULock(pVnodeObj->pMeta);
22,843✔
1718

1719
_exit:
304,755✔
1720
  return code;
304,755✔
1721
}
1722

1723
// Apply deltas to the cached statistics of super table `uid`.
// Nothing happens when the table has no entry in the stats cache. ctbNum and
// colNum are adjusted by the given deltas; `keep` is overwritten only when
// deltaKeep is positive. A failed cache upsert is logged, not returned.
// No locking is done in this function.
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t deltaCtb, int32_t deltaCol, int64_t deltaKeep) {
  SMetaStbStats cached = {0};

  if (metaStatsCacheGet(pMeta, uid, &cached) != TSDB_CODE_SUCCESS) {
    return;
  }

  cached.ctbNum += deltaCtb;
  cached.colNum += deltaCol;
  if (deltaKeep > 0) {
    cached.keep = deltaKeep;
  }

  if (metaStatsCacheUpsert(pMeta, &cached) != 0) {
    metaError("vgId:%d, failed to update stats, uid:%" PRId64 ", ctbNum:%" PRId64 ", colNum:%d, keep:%" PRId64,
              TD_VID(pMeta->pVnode), uid, deltaCtb, deltaCol, deltaKeep > 0 ? deltaKeep : cached.keep);
  }
}
158,909✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc