• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4931

16 Jan 2026 02:32AM UTC coverage: 66.749% (+0.03%) from 66.716%
#4931

push

travis-ci

web-flow
enh: interp supports using non-null prev/next values to fill (#34236)

281 of 327 new or added lines in 11 files covered. (85.93%)

1890 existing lines in 121 files now uncovered.

203303 of 304580 relevant lines covered (66.75%)

129941648.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.97
/source/dnode/vnode/src/meta/metaSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
// SMetaSnapReader ========================================
19
struct SMetaSnapReader {
20
  SMeta*  pMeta;
21
  int64_t sver;
22
  int64_t ever;
23
  TBC*    pTbc;
24
  int32_t iLoop;
25
};
26

27
/**
 * Allocate a snapshot reader for `pMeta`, open a cursor on its table DB and
 * seek that cursor to the key {version = sver, uid = INT64_MIN}.
 *
 * @param pMeta    meta store to read
 * @param sver     start version, stored in the reader
 * @param ever     end version, stored in the reader
 * @param ppReader out: the new reader on success, NULL on failure
 * @return 0 on success; otherwise the code of the step that failed. On failure
 *         the partially built reader is closed before returning.
 */
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          cmp = 0;  // output of tdbTbcMoveTo; not consulted here
  SMetaSnapReader* pReader = NULL;
  STbDbKey         startKey = {.version = sver, .uid = INT64_MIN};

  // Allocate a zeroed reader and record the requested version window.
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
  if (pReader == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pReader->pMeta = pMeta;
  pReader->sver = sver;
  pReader->ever = ever;

  // Open the cursor and seek to the start key.
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tdbTbcMoveTo(pReader->pTbc, &startKey, sizeof(STbDbKey), &cmp);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppReader = pReader;
  } else {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
    metaSnapReaderClose(&pReader);  // tolerates a NULL reader
    *ppReader = NULL;
  }
  return code;
}
60

61
/**
 * Close the reader's cursor, free the reader and reset the caller's pointer.
 * Does nothing when `ppReader` or `*ppReader` is NULL.
 */
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
  if (ppReader == NULL || *ppReader == NULL) {
    return;
  }

  SMetaSnapReader* pReader = *ppReader;
  tdbTbcClose(pReader->pTbc);
  taosMemoryFree(pReader);
  *ppReader = NULL;
}
27,518✔
68

69
extern int metaDecodeEntryImpl(SDecoder* pCoder, SMetaEntry* pME, bool headerOnly);
70

71
/**
 * Decode a serialized meta entry in header-only mode (metaDecodeEntryImpl is
 * called with headerOnly = true).
 *
 * @param data  encoded entry bytes
 * @param size  number of bytes in `data`
 * @param entry out: decoded entry
 * @return 0 on success, otherwise the code reported by metaDecodeEntryImpl
 */
static int32_t metaDecodeEntryHeader(void* data, int32_t size, SMetaEntry* entry) {
  SDecoder decoder = {0};

  tDecoderInit(&decoder, (uint8_t*)data, size);
  int32_t code = metaDecodeEntryImpl(&decoder, entry, true);
  // The decoder is released on both the success and the failure path.
  tDecoderClear(&decoder);

  return code;
}
84

85
/**
 * Produce the next meta entry of the snapshot as an SSnapDataHdr-prefixed
 * buffer.
 *
 * The table DB is scanned twice over the same key range: pass 0 emits only
 * super-table entries, pass 1 emits everything else. A pass ends when the
 * cursor has no current record or the key version exceeds `ever`; the cursor
 * is then reopened and moved back to {sver, INT64_MIN}.
 *
 * @param pReader reader created by metaSnapReaderOpen
 * @param ppData  out: buffer allocated with taosMemoryMalloc holding one
 *                entry, or NULL when both passes are finished (presumably
 *                released by the caller -- confirm against call sites)
 * @return 0 on success (including end of data), otherwise an error code
 */
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
  int32_t     code = 0;
  const void* pKeyBuf = NULL;
  const void* pValBuf = NULL;
  int32_t     keyLen = 0;
  int32_t     valLen = 0;
  STbDbKey    key;
  int32_t     cmp;

  *ppData = NULL;
  while (pReader->iLoop < 2) {
    bool passDone = tdbTbcGet(pReader->pTbc, &pKeyBuf, &keyLen, &pValBuf, &valLen) != 0 ||
                    ((STbDbKey*)pKeyBuf)->version > pReader->ever;
    if (passDone) {
      pReader->iLoop++;

      // Start the next pass from the beginning of the version window.
      tdbTbcClose(pReader->pTbc);
      pReader->pTbc = NULL;
      code = tdbTbcOpen(pReader->pMeta->pTbDb, &pReader->pTbc, NULL);
      if (code) {
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }

      STbDbKey startKey = {.version = pReader->sver, .uid = INT64_MIN};
      code = tdbTbcMoveTo(pReader->pTbc, &startKey, sizeof(STbDbKey), &cmp);
      if (code) {
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }

      continue;
    }

    // Only the header is decoded; it is enough to learn the entry type.
    SMetaEntry entry = {0};
    code = metaDecodeEntryHeader((void*)pValBuf, valLen, &entry);
    if (code) {
      metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
                tstrerror(code));
      goto _exit;
    }

    key = *(STbDbKey*)pKeyBuf;

    bool tooOld = key.version < pReader->sver;
    bool notForPass0 = (pReader->iLoop == 0 && TABS(entry.type) != TSDB_SUPER_TABLE);  // pass 0: super tables only
    bool notForPass1 = (pReader->iLoop == 1 && TABS(entry.type) == TSDB_SUPER_TABLE);  // pass 1: non-super tables
    if (tooOld || notForPass0 || notForPass1) {
      if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
        metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
      }
      continue;
    }

    // NOTE(review): this path logs an error but leaves `code` at 0, so the
    // caller sees "no more data" instead of a failure -- confirm the intent.
    if (!pValBuf || !valLen) {
      metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", valLen);
      goto _exit;
    }

    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + valLen);
    if (*ppData == NULL) {
      code = terrno;
      goto _exit;
    }

    // Wrap the raw entry bytes in a snapshot data header.
    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
    pHdr->type = SNAP_DATA_META;
    pHdr->size = valLen;
    memcpy(pHdr->data, pValBuf, valLen);

    metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
              TD_VID(pReader->pMeta->pVnode), key.version, key.uid, valLen);

    // Advance so the next call resumes after the entry just emitted.
    if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
      metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
    }
    break;
  }

_exit:
  if (code) {
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
              tstrerror(code));
  }
  return code;
}
171

172
// SMetaSnapWriter ========================================
173
struct SMetaSnapWriter {
174
  SMeta*  pMeta;
175
  int64_t sver;
176
  int64_t ever;
177
};
178

179
/**
 * Allocate a snapshot writer for `pMeta` and call
 * metaBegin(pMeta, META_BEGIN_HEAP_NIL) on the meta store.
 *
 * @param pMeta    meta store that will receive the snapshot entries
 * @param sver     start version, stored in the writer
 * @param ever     end version, stored in the writer
 * @param ppWriter out: the new writer on success, NULL on failure
 * @return 0 on success, otherwise the code of the step that failed
 */
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMetaSnapWriter* pWriter = NULL;

  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
  if (pWriter == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pWriter->pMeta = pMeta;
  pWriter->sver = sver;
  pWriter->ever = ever;

  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppWriter = pWriter;
  } else {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
    taosMemoryFree(pWriter);
    *ppWriter = NULL;
  }
  return code;
}
207

208
/**
 * Finish a snapshot write and release the writer.
 *
 * With `rollback` set the pending meta changes are aborted (metaAbort);
 * otherwise they are committed (metaCommit followed by metaFinishCommit).
 *
 * The writer is always freed and `*ppWriter` is always reset to NULL, on the
 * failure paths as well. SMetaSnapWriter is private to this file, so this
 * function is the only place that can release it; returning early on error
 * used to leak the allocation.
 *
 * @param ppWriter in/out: writer to close; NULL or already-closed is a no-op
 * @param rollback non-zero to abort, zero to commit
 * @return 0 on success, otherwise the code of the abort/commit step that failed
 */
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  int32_t code = 0;

  if (ppWriter == NULL || *ppWriter == NULL) {
    return code;
  }

  SMetaSnapWriter* pWriter = *ppWriter;
  SMeta*           pMeta = pWriter->pMeta;

  if (rollback) {
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pMeta->pVnode));
    code = metaAbort(pMeta);
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pMeta->pVnode), code);
  } else {
    code = metaCommit(pMeta, pMeta->txn);
    if (code == 0) {
      // pMeta->txn is re-read on purpose, exactly as the two-step commit did before.
      code = metaFinishCommit(pMeta, pMeta->txn);
    }
  }

  if (code) {
    metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
  }

  taosMemoryFree(pWriter);
  *ppWriter = NULL;

  return code;
}
233

234
/**
 * Apply one snapshot data block to the writer's meta store.
 *
 * The payload after the SSnapDataHdr is decoded as a meta entry and passed to
 * metaHandleSyncEntry.
 *
 * @param pWriter writer created by metaSnapWriterOpen
 * @param pData   block starting with an SSnapDataHdr
 * @param nData   total size of the block in bytes, header included
 * @return 0 on success, otherwise the decode error code
 */
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry entry = {0};
  SDecoder   decoder = {0};

  // NOTE(review): nData is not checked against sizeof(SSnapDataHdr); a shorter
  // block would make the subtraction wrap around -- confirm callers guarantee it.
  tDecoderInit(&decoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
  code = metaDecodeEntry(&decoder, &entry);
  TSDB_CHECK_CODE(code, lino, _exit);

  // NOTE(review): no result of metaHandleSyncEntry is examined here -- confirm
  // whether it can report failure.
  metaHandleSyncEntry(pMeta, &entry);

_exit:
  if (code) {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
254

255
typedef struct STableInfoForChildTable {
256
  char*           tableName;
257
  SSchemaWrapper* schemaRow;
258
  SSchemaWrapper* tagRow;
259
  SExtSchema*     pExtSchemas;
260
} STableInfoForChildTable;
261

262
static void destroySTableInfoForChildTable(void* data) {
165,835✔
263
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
165,835✔
264
  taosMemoryFree(pData->tableName);
165,835✔
265
  tDeleteSchemaWrapper(pData->schemaRow);
165,516✔
266
  tDeleteSchemaWrapper(pData->tagRow);
165,835✔
267
  taosMemoryFreeClear(pData->pExtSchemas);
165,835✔
268
}
165,835✔
269

270
/**
 * Reopen the context cursor and seek it to {snapVersion, INT64_MAX}. When
 * tdbTbcMoveTo reports a negative comparison result the cursor is stepped
 * back by one record.
 *
 * @return 0 on success, otherwise the open/seek error (via TAOS_GET_TERRNO)
 */
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }

  int      cmp = 0;
  STbDbKey target = {.version = ctx->snapVersion, .uid = INT64_MAX};
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }

  // A failed step back is only traced, not reported to the caller.
  if (cmp < 0 && tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
    metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
  }
  return 0;
}
290

291
/**
 * Reopen the context cursor and seek it to the key {ver, uid}.
 *
 * @return on success, the comparison result written by tdbTbcMoveTo (callers
 *         in this file treat 0 as "found"); when opening or seeking fails, the
 *         error code via TAOS_GET_TERRNO. Both kinds of value share the same
 *         return channel.
 */
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }

  int      cmp = 0;
  STbDbKey target = {.version = ver, .uid = uid};
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }

  return cmp;
}
305

306
/**
 * Reopen the context cursor and move it to the first record of the table DB.
 *
 * @return 0 on success, otherwise the open/move error (via TAOS_GET_TERRNO)
 */
static int32_t MoveToFirst(SSnapContext* ctx) {
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (code == 0) {
    code = tdbTbcMoveToFirst((TBC*)ctx->pCur);
  }

  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }
  return 0;
}
318

319
/**
 * Cache the name, column schema, tag schema and ext schemas of a super table
 * entry in `suidInfo`, keyed by the entry uid. An existing record for the same
 * uid is left untouched.
 *
 * @param me       decoded super table entry
 * @param suidInfo hash from super table uid to STableInfoForChildTable
 * @return 0 on success or when already cached; otherwise an error code (the
 *         partially built record is released)
 */
static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
  if (taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)) != NULL) {
    return 0;
  }

  int32_t                 code = 0;
  STableInfoForChildTable info = {0};

  info.tableName = taosStrdup(me->name);
  if (info.tableName == NULL) {
    code = terrno;
    goto END;
  }

  info.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  if (info.schemaRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  info.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
  if (info.tagRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  if (me->pExtSchemas != NULL) {
    // One SExtSchema per column of the row schema.
    size_t extBytes = sizeof(SExtSchema) * me->stbEntry.schemaRow.nCols;
    info.pExtSchemas = taosMemoryMalloc(extBytes);
    if (info.pExtSchemas == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    memcpy(info.pExtSchemas, me->pExtSchemas, extBytes);
  }

  // The struct is stored by value, so the hash keeps the owned pointers.
  code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &info, sizeof(STableInfoForChildTable));
  if (code == 0) {
    return 0;
  }

END:
  destroySTableInfoForChildTable(&info);
  return TAOS_GET_TERRNO(code);
}
360

361
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
114,131✔
362
                         SSnapContext** ctxRet) {
363
  int32_t code = 0;
114,131✔
364
  int32_t lino = 0;
114,131✔
365
  SDecoder   dc = {0};
114,131✔
366
  void* pKey = NULL;
114,131✔
367
  void* pVal = NULL;
114,131✔
368
  int   vLen = 0, kLen = 0;
114,131✔
369

370
  metaRLock(pVnode->pMeta);
114,131✔
371
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
114,131✔
372
  TSDB_CHECK_NULL(ctx, code, lino, END, terrno);
113,810✔
373
  *ctxRet = ctx;
113,810✔
374
  ctx->pMeta = pVnode->pMeta;
113,810✔
375
  ctx->snapVersion = snapVersion;
114,131✔
376
  ctx->suid = suid;
114,131✔
377
  ctx->subType = subType;
114,131✔
378
  ctx->queryMeta = withMeta;
114,131✔
379
  ctx->withMeta = withMeta;
114,131✔
380
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
114,131✔
381
  TSDB_CHECK_NULL(ctx->idVersion, code, lino, END, terrno);
114,131✔
382
  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
114,131✔
383
  TSDB_CHECK_NULL(ctx->suidInfo, code, lino, END, terrno);
114,131✔
384
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
114,131✔
385

386
  ctx->index = 0;
114,131✔
387
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
114,131✔
388
  TSDB_CHECK_NULL(ctx->idList, code, lino, END, terrno);
114,131✔
389

390
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
114,131✔
391
  code = MoveToFirst(ctx);
114,131✔
392
  TSDB_CHECK_CODE(code, lino, END);
114,131✔
393
  while (1) {
8,709,658✔
394
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
8,823,789✔
395
    if (ret < 0) break;
8,815,635✔
396
    STbDbKey* tmp = (STbDbKey*)pKey;
8,705,886✔
397
    if (tmp->version > ctx->snapVersion) break;
8,705,886✔
398

399
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
8,712,520✔
400
    if (idData) {
8,708,724✔
401
      continue;
84,925✔
402
    }
403

404
    // check if table exist for now, need optimize later
405
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) < 0) {
8,623,799✔
406
      continue;
86,525✔
407
    }
408

409
    SMetaEntry me = {0};
8,532,269✔
410
    tDecoderInit(&dc, pVal, vLen);
8,530,688✔
411
    code = metaDecodeEntry(&dc, &me);
8,543,277✔
412
    TSDB_CHECK_CODE(code, lino, END);
8,519,784✔
413
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
8,519,784✔
414
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
187,582✔
415
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
187,329✔
416
        tDecoderClear(&dc);
1,012✔
417
        continue;
1,012✔
418
      }
419
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
8,338,041✔
420
      if (me.type == TSDB_VIRTUAL_NORMAL_TABLE ||
8,341,959✔
421
          me.type == TSDB_VIRTUAL_CHILD_TABLE ||
8,331,421✔
422
          TABLE_IS_VIRTUAL(me.flags)) {
8,341,204✔
423
        tDecoderClear(&dc);
899✔
424
        continue;
×
425
      }
426
    }
427

428
    TSDB_CHECK_NULL(taosArrayPush(ctx->idList, &tmp->uid), code, lino, END, terrno);
17,046,895✔
429
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
8,520,648✔
430
    tDecoderClear(&dc);
8,525,902✔
431

432
    SIdInfo info = {0};
8,520,806✔
433
    code = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
8,530,310✔
434
    TSDB_CHECK_CODE(code, lino, END);
8,537,812✔
435
  }
436
  taosHashClear(ctx->idVersion);
114,131✔
437

438
  code = MoveToSnapShotVersion(ctx);
114,131✔
439
  TSDB_CHECK_CODE(code, lino, END);
114,131✔
440

441
  while (1) {
8,715,136✔
442
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
8,829,267✔
443
    if (ret < 0) break;
8,826,487✔
444

445
    STbDbKey* tmp = (STbDbKey*)pKey;
8,712,356✔
446
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
8,712,356✔
447
    if (idData) {
8,708,539✔
448
      continue;
141,976✔
449
    }
450
    SIdInfo info = {.version = tmp->version, .index = 0};
8,566,563✔
451
    code = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
8,566,851✔
452
    TSDB_CHECK_CODE(code, lino, END);
8,573,301✔
453

454
    SMetaEntry me = {0};
8,573,301✔
455
    tDecoderInit(&dc, pVal, vLen);
8,572,988✔
456
    code = metaDecodeEntry(&dc, &me);
8,573,873✔
457
    TSDB_CHECK_CODE(code, lino, END);
8,572,644✔
458

459
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
8,572,644✔
460
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
187,582✔
461
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
187,329✔
462
        tDecoderClear(&dc);
1,012✔
463
        continue;
1,012✔
464
      }
465
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
8,384,747✔
466
      if (me.type == TSDB_VIRTUAL_NORMAL_TABLE ||
8,384,747✔
467
          me.type == TSDB_VIRTUAL_CHILD_TABLE ||
8,384,747✔
468
          TABLE_IS_VIRTUAL(me.flags)) {
8,384,747✔
469
        tDecoderClear(&dc);
×
470
        continue;
×
471
      }
472
    }
473

474
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
8,571,317✔
475
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
8,425,896✔
476
      code = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
165,259✔
477
      TSDB_CHECK_CODE(code, lino, END);
165,835✔
478
    }
479
    tDecoderClear(&dc);
8,572,504✔
480

481
  }
482

483
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
8,660,854✔
484
    int64_t* uid = taosArrayGet(ctx->idList, i);
8,547,035✔
485
    TSDB_CHECK_NULL(uid, code, lino, END, terrno);
8,547,035✔
486
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
8,547,035✔
487
    TSDB_CHECK_NULL(idData, code, lino, END, terrno);
8,547,348✔
488

489
    idData->index = i;
8,547,348✔
490
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version, idData->index);
8,547,348✔
491
  }
492

493
END:
111,539✔
494
  tdbFree(pKey);
114,131✔
495
  tdbFree(pVal);
114,131✔
496
  tDecoderClear(&dc);
114,131✔
497

498
  if (ctx != NULL) {
114,131✔
499
    tdbTbcClose((TBC*)ctx->pCur);
114,131✔
500
    ctx->pCur = NULL;
114,131✔
501
  }
502
  metaULock(pVnode->pMeta);
114,131✔
503

504
  if(code != 0) {
114,131✔
505
    destroySnapContext(ctx);
×
506
    *ctxRet = NULL;
×
507
    metaError("tmqsnap build snap context failed line:%d since %s", lino, tstrerror(code));
×
508
  }
509
  return code;
114,131✔
510
}
511

512
/**
 * Free a snapshot context together with its uid list and both hash tables.
 * Passing NULL is allowed and does nothing.
 */
void destroySnapContext(SSnapContext* ctx) {
  if (ctx != NULL) {
    taosArrayDestroy(ctx->idList);
    taosHashCleanup(ctx->idVersion);
    taosHashCleanup(ctx->suidInfo);
    taosMemoryFree(ctx);
  }
}
521

522
/**
 * Serialize a single create-table request as a one-element SVCreateTbBatchReq,
 * leaving room for an SMsgHead in front of the encoded bytes.
 *
 * @param req     request to encode (copied by value into a temporary array)
 * @param pBuf    out: buffer allocated with taosMemoryMalloc; cleared again if
 *                encoding fails
 * @param contLen out: encoded size plus sizeof(SMsgHead)
 * @return the result of the encoder on success (not negative), otherwise an
 *         error code
 */
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
  SVCreateTbBatchReq reqs = {0};

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  if (reqs.pArray == NULL || taosArrayPush(reqs.pArray, req) == NULL) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  reqs.nReqs = 1;

  // First pass computes the encoded size only.
  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
  if (ret < 0) {
    ret = TAOS_GET_TERRNO(ret);
    goto end;
  }

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (*pBuf == NULL) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }

  // NOTE(review): the encoder is told it has *contLen bytes although it starts
  // sizeof(SMsgHead) bytes into the buffer -- confirm this slack is intended.
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
  ret = tEncodeSVCreateTbBatchReq(&enc, &reqs);
  tEncoderClear(&enc);
  if (ret < 0) {
    taosMemoryFreeClear(*pBuf);
    ret = TAOS_GET_TERRNO(ret);
  }

end:
  taosArrayDestroy(reqs.pArray);
  return ret;
}
563

564
/**
 * Serialize a create-super-table request, leaving room for an SMsgHead in
 * front of the encoded bytes.
 *
 * @param req     request to encode
 * @param pBuf    out: buffer allocated with taosMemoryMalloc; cleared again if
 *                encoding fails
 * @param contLen out: encoded size plus sizeof(SMsgHead)
 * @return 0 on success, otherwise an error code
 */
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
  int32_t ret = 0;

  // First pass computes the encoded size only.
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return TAOS_GET_TERRNO(ret);
  }

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (*pBuf == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  // NOTE(review): the encoder is told it has *contLen bytes although it starts
  // sizeof(SMsgHead) bytes into the buffer -- confirm this slack is intended.
  SEncoder enc = {0};
  tEncoderInit(&enc, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
  ret = tEncodeSVCreateStbReq(&enc, req);
  tEncoderClear(&enc);
  if (ret >= 0) {
    return 0;
  }

  taosMemoryFreeClear(*pBuf);
  return TAOS_GET_TERRNO(ret);
}
587

588
/**
 * Position the context's read index.
 *
 * A uid of 0 rewinds to the start of the uid list; any other uid moves the
 * index to the position recorded for that uid in ctx->idVersion.
 *
 * @return 0 on success; `terrno` when the uid has no idVersion record
 *         (NOTE(review): whether the lookup sets terrno is not visible here)
 */
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
  if (uid != 0) {
    SIdInfo* pInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
    if (pInfo == NULL) {
      return terrno;
    }
    ctx->index = pInfo->index;
  } else {
    ctx->index = 0;
  }

  return 0;
}
603

604
/**
 * Record in ctx->hasPrimaryKey whether table `uid` has the COL_IS_KEY flag set
 * on its second column. The flag is false when the schema cannot be loaded or
 * has fewer than two columns.
 */
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
  SSchemaWrapper* pSchema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL, 0);

  bool hasKey = false;
  if (pSchema != NULL && pSchema->nCols >= 2) {
    hasKey = (pSchema->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  tDeleteSchemaWrapper(pSchema);
  ctx->hasPrimaryKey = hasKey;
}
722,659✔
613

614
/** Return the flag last stored by taosXSetTablePrimaryKey. */
bool taosXGetTablePrimaryKey(SSnapContext* ctx) {
  return ctx->hasPrimaryKey;
}
1,434,334✔
615

616
/**
 * Fetch the next table from the snapshot uid list and serialize it as a
 * create-table / create-super-table request.
 *
 * Runs under the meta read lock. Uids whose record cannot be located at the
 * remembered version are skipped. When the list is exhausted the index is
 * rewound, ctx->queryMeta is cleared and 0 is returned without touching
 * `*pBuf`.
 *
 * @param ctx     snapshot context built by buildSnapContext
 * @param pBuf    out: encoded request (see buildSuperTableInfo /
 *                buildNormalChildTableInfo)
 * @param contLen out: size of `*pBuf`
 * @param type    out: TDMT_VND_CREATE_STB or TDMT_VND_CREATE_TABLE
 * @param uid     out: uid of the table that was looked up
 * @return 0 on success or end of list, otherwise an error code
 */
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
  int32_t  ret = 0;
  int32_t  lino = 0;
  void*    pKey = NULL;
  void*    pVal = NULL;
  int      vLen = 0, kLen = 0;
  SDecoder dc = {0};
  SArray*  tagName = NULL;
  SArray*  pTagVals = NULL;

  metaRLock(ctx->pMeta);
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
      ctx->queryMeta = 0;  // change to get data
      goto END;
    }

    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    TSDB_CHECK_NULL(uidTmp, ret, lino, END, terrno);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    TSDB_CHECK_NULL(idInfo, ret, lino, END, terrno);

    *uid = *uidTmp;
    // The seek result is kept in its own variable. Storing it in `ret` let a
    // "not found" value from the last uid escape through the end-of-list
    // branch above and be returned as if it were an error code.
    int32_t pos = MoveToPosition(ctx, idInfo->version, *uidTmp);
    if (pos == 0) {
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }

  ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  TSDB_CHECK_CONDITION(ret >= 0, ret, lino, END, TAOS_GET_TERRNO(ret));
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  ret = metaDecodeEntry(&dc, &me);
  TSDB_CHECK_CONDITION(ret >= 0, ret, lino, END, TAOS_GET_TERRNO(ret));
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);

  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
    // Super table: the request borrows the decoded entry's fields.
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;
    req.colCmpr = me.colCmpr;
    req.pExtSchemas = me.pExtSchemas;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    // Child table: super table name and tag schema come from the suidInfo cache.
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
    TSDB_CHECK_NULL(data, ret, lino, END, terrno);

    SVCreateTbReq req = {0};

    req.type = TSDB_CHILD_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
    req.ctb.tagNum = data->tagRow->nCols;
    req.ctb.stbName = data->tableName;

    tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
    TSDB_CHECK_NULL(tagName, ret, lino, END, terrno);
    STag* p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
      // A JSON tag contributes only the name of the first tag column.
      if (p->nTag != 0) {
        SSchema* schema = &data->tagRow->pSchema[0];
        TSDB_CHECK_NULL(taosArrayPush(tagName, schema->name), ret, lino, END, terrno);
      }
    } else {
      ret = tTagToValArray((const STag*)p, &pTagVals);
      TSDB_CHECK_CODE(ret, lino, END);
      int16_t nCols = taosArrayGetSize(pTagVals);
      // For every stored tag value, collect the names of the tag columns with
      // the same column id.
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
        for (int i = 0; pTagVal && i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
            TSDB_CHECK_NULL(taosArrayPush(tagName, schema->name), ret, lino, END, terrno);
          }
        }
      }
    }
    req.ctb.pTag = me.ctbEntry.pTags;
    req.ctb.tagName = tagName;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
    SVCreateTbReq req = {0};
    req.type = TSDB_NORMAL_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
    req.colCmpr = me.colCmpr;
    req.pExtSchemas = me.pExtSchemas;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
  } else {
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
    ret = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
  }

END:
  tdbTbcClose((TBC*)ctx->pCur);
  ctx->pCur = NULL;
  taosArrayDestroy(pTagVals);
  taosArrayDestroy(tagName);
  tDecoderClear(&dc);
  metaULock(ctx->pMeta);

  if (ret != 0) {
    metaError("tmqsnap get table info from snapshot failed line:%d since %s", lino, tstrerror(ret));
  }
  return ret;
}
742

743
/**
 * Fetch the next child or normal table from the snapshot uid list and fill
 * `result` with its uid, name, super table uid and a cloned column schema
 * (plus a copy of the ext schemas when present).
 *
 * Runs under the meta read lock. Uids that cannot be located at their
 * remembered version, and entries of any other kind, are skipped. When the
 * list is exhausted 0 is returned and `result` is left untouched.
 *
 * On failure nothing allocated by this call is left behind in `result`: the
 * schema clone is checked immediately, and it is released again if the ext
 * schema allocation fails.
 *
 * @param ctx    snapshot context built by buildSnapContext
 * @param result out: table info; schema and pExtSchemas are owned copies
 * @return 0 on success or end of list, otherwise an error code
 */
int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) {
  void*    pKey = NULL;
  void*    pVal = NULL;
  int      vLen = 0;
  int      kLen = 0;
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder dc = {0};

  metaRLock(ctx->pMeta);
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get uid info end");
      goto END;
    }
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    TSDB_CHECK_NULL(uidTmp, code, lino, END, terrno);
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    TSDB_CHECK_NULL(idInfo, code, lino, END, terrno);

    if (MoveToPosition(ctx, idInfo->version, *uidTmp) != 0) {
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp,
                idInfo->version);
      continue;
    }
    code = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
    TSDB_CHECK_CODE(code, lino, END);
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    code = metaDecodeEntry(&dc, &me);
    TSDB_CHECK_CODE(code, lino, END);
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);

    // Pick where the schema and ext schemas are copied from.
    SSchemaWrapper* pSrcSchema = NULL;
    const void*     pSrcExt = NULL;
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
      // Child table: the schema lives in the cached super table info.
      STableInfoForChildTable* data =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
      TSDB_CHECK_NULL(data, code, lino, END, terrno);
      result->suid = me.ctbEntry.suid;
      pSrcSchema = data->schemaRow;
      pSrcExt = data->pExtSchemas;
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
      // Normal table: the schema is part of the entry itself.
      result->suid = 0;
      pSrcSchema = &me.ntbEntry.schemaRow;
      pSrcExt = me.pExtSchemas;
    } else {
      metaDebug("tmqsnap get uid continue");
      tDecoderClear(&dc);
      continue;
    }

    result->schema = tCloneSSchemaWrapper(pSrcSchema);
    TSDB_CHECK_NULL(result->schema, code, lino, END, TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY));

    if (pSrcExt != NULL) {
      size_t extBytes = sizeof(SExtSchema) * pSrcSchema->nCols;
      result->pExtSchemas = taosMemoryMalloc(extBytes);
      if (result->pExtSchemas == NULL) {
        code = terrno;
        lino = __LINE__;
        // Do not hand back a half-built result.
        tDeleteSchemaWrapper(result->schema);
        result->schema = NULL;
        goto END;
      }
      memcpy(result->pExtSchemas, pSrcExt, extBytes);
    }

    result->uid = me.uid;
    tstrncpy(result->tbName, me.name, TSDB_TABLE_NAME_LEN);
    break;
  }

END:
  tDecoderClear(&dc);
  tdbTbcClose((TBC*)ctx->pCur);
  ctx->pCur = NULL;
  metaULock(ctx->pMeta);

  if (code != 0) {
    metaError("tmqsnap get meta table info from snapshot failed line:%d since %s", lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc