• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4768

01 Oct 2025 04:06AM UTC coverage: 57.85% (-0.8%) from 58.606%
#4768

push

travis-ci

web-flow
Merge pull request #33171 from taosdata/merge/3.3.6tomain

merge: from 3.3.6 to main branch

137167 of 302743 branches covered (45.31%)

Branch coverage included in aggregate %.

15 of 20 new or added lines in 2 files covered. (75.0%)

12125 existing lines in 175 files now uncovered.

208282 of 294403 relevant lines covered (70.75%)

5618137.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.62
/source/dnode/vnode/src/meta/metaSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
// SMetaSnapReader ========================================
19
struct SMetaSnapReader {
20
  SMeta*  pMeta;
21
  int64_t sver;
22
  int64_t ever;
23
  TBC*    pTbc;
24
  int32_t iLoop;
25
};
26

27
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
86✔
28
  int32_t          code = 0;
86✔
29
  int32_t          lino;
30
  int32_t          c = 0;
86✔
31
  SMetaSnapReader* pReader = NULL;
86✔
32

33
  // alloc
34
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
86!
35
  if (pReader == NULL) {
86!
36
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
37
  }
38
  pReader->pMeta = pMeta;
86✔
39
  pReader->sver = sver;
86✔
40
  pReader->ever = ever;
86✔
41

42
  // impl
43
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
86✔
44
  TSDB_CHECK_CODE(code, lino, _exit);
86!
45

46
  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
86✔
47
  TSDB_CHECK_CODE(code, lino, _exit);
86!
48

49
_exit:
86✔
50
  if (code) {
86!
51
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
52
    metaSnapReaderClose(&pReader);
×
53
    *ppReader = NULL;
×
54
  } else {
55
    metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
86!
56
    *ppReader = pReader;
86✔
57
  }
58
  return code;
86✔
59
}
60

61
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
86✔
62
  if (ppReader && *ppReader) {
86!
63
    tdbTbcClose((*ppReader)->pTbc);
86✔
64
    taosMemoryFree(*ppReader);
86!
65
    *ppReader = NULL;
86✔
66
  }
67
}
86✔
68

69
extern int metaDecodeEntryImpl(SDecoder* pCoder, SMetaEntry* pME, bool headerOnly);
70

71
static int32_t metaDecodeEntryHeader(void* data, int32_t size, SMetaEntry* entry) {
1,144✔
72
  SDecoder decoder = {0};
1,144✔
73
  tDecoderInit(&decoder, (uint8_t*)data, size);
1,144✔
74

75
  int32_t code = metaDecodeEntryImpl(&decoder, entry, true);
1,144✔
76
  if (code) {
1,144!
77
    tDecoderClear(&decoder);
×
78
    return code;
×
79
  }
80

81
  tDecoderClear(&decoder);
1,144✔
82
  return 0;
1,144✔
83
}
84

85
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
655✔
86
  int32_t     code = 0;
655✔
87
  const void* pKey = NULL;
655✔
88
  const void* pData = NULL;
655✔
89
  int32_t     nKey = 0;
655✔
90
  int32_t     nData = 0;
655✔
91
  STbDbKey    key;
92
  int32_t     c;
93

94
  *ppData = NULL;
655✔
95
  while (pReader->iLoop < 2) {
1,402✔
96
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData) != 0 || ((STbDbKey*)pKey)->version > pReader->ever) {
1,316!
97
      pReader->iLoop++;
172✔
98

99
      // Reopen the cursor to read from the beginning
100
      tdbTbcClose(pReader->pTbc);
172✔
101
      pReader->pTbc = NULL;
172✔
102
      code = tdbTbcOpen(pReader->pMeta->pTbDb, &pReader->pTbc, NULL);
172✔
103
      if (code) {
172!
104
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
×
105
                  tstrerror(code));
106
        goto _exit;
×
107
      }
108

109
      code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = pReader->sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
172✔
110
      if (code) {
172!
111
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
×
112
                  tstrerror(code));
113
        goto _exit;
×
114
      }
115

116
      continue;
172✔
117
    }
118

119
    // Decode meta entry
120
    SMetaEntry entry = {0};
1,144✔
121
    code = metaDecodeEntryHeader((void*)pData, nData, &entry);
1,144✔
122
    if (code) {
1,144!
123
      metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
×
124
                tstrerror(code));
125
      goto _exit;
×
126
    }
127

128
    key = ((STbDbKey*)pKey)[0];
1,144✔
129
    if (key.version < pReader->sver                                       //
1,144✔
130
        || (pReader->iLoop == 0 && TABS(entry.type) != TSDB_SUPER_TABLE)  // First loop send super table entry
1,138✔
131
        || (pReader->iLoop == 1 && TABS(entry.type) == TSDB_SUPER_TABLE)  // Second loop send non-super table entry
643✔
132
    ) {
133
      if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
575!
134
        metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
×
135
      }
136
      continue;
575✔
137
    }
138

139
    if (!pData || !nData) {
569!
140
      metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
×
141
      goto _exit;
×
142
    }
143

144
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
569!
145
    if (*ppData == NULL) {
569!
146
      code = terrno;
×
147
      goto _exit;
×
148
    }
149

150
    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
569✔
151
    pHdr->type = SNAP_DATA_META;
569✔
152
    pHdr->size = nData;
569✔
153
    memcpy(pHdr->data, pData, nData);
569✔
154

155
    metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
569!
156
              TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
157

158
    if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
569!
159
      metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
×
160
    }
161
    break;
569✔
162
  }
163

164
_exit:
86✔
165
  if (code) {
655!
166
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
×
167
              tstrerror(code));
168
  }
169
  return code;
655✔
170
}
171

172
// SMetaSnapWriter ========================================
173
struct SMetaSnapWriter {
174
  SMeta*  pMeta;
175
  int64_t sver;
176
  int64_t ever;
177
};
178

179
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
74✔
180
  int32_t          code = 0;
74✔
181
  int32_t          lino;
182
  SMetaSnapWriter* pWriter;
183

184
  // alloc
185
  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
74!
186
  if (pWriter == NULL) {
74!
187
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
188
  }
189
  pWriter->pMeta = pMeta;
74✔
190
  pWriter->sver = sver;
74✔
191
  pWriter->ever = ever;
74✔
192

193
  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
74✔
194
  TSDB_CHECK_CODE(code, lino, _exit);
74!
195

196
_exit:
74✔
197
  if (code) {
74!
198
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
199
    taosMemoryFree(pWriter);
×
200
    *ppWriter = NULL;
×
201
  } else {
202
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
74!
203
    *ppWriter = pWriter;
74✔
204
  }
205
  return code;
74✔
206
}
207

208
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
74✔
209
  int32_t          code = 0;
74✔
210
  SMetaSnapWriter* pWriter = *ppWriter;
74✔
211

212
  if (rollback) {
74!
213
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
×
214
    code = metaAbort(pWriter->pMeta);
×
215
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
×
216
             code);
217
    if (code) goto _err;
×
218
  } else {
219
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
74✔
220
    if (code) goto _err;
74!
221
    code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
74✔
222
    if (code) goto _err;
74!
223
  }
224
  taosMemoryFree(pWriter);
74!
225
  *ppWriter = NULL;
74✔
226

227
  return code;
74✔
228

229
_err:
×
230
  metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
×
231
  return code;
×
232
}
233

234
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
569✔
235
  int32_t    code = 0;
569✔
236
  int32_t    lino = 0;
569✔
237
  SMeta*     pMeta = pWriter->pMeta;
569✔
238
  SMetaEntry metaEntry = {0};
569✔
239
  SDecoder*  pDecoder = &(SDecoder){0};
569✔
240

241
  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
569✔
242
  code = metaDecodeEntry(pDecoder, &metaEntry);
569✔
243
  TSDB_CHECK_CODE(code, lino, _exit);
569!
244

245
  metaHandleSyncEntry(pMeta, &metaEntry);
569✔
246

247
_exit:
569✔
248
  if (code) {
569!
249
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
250
  }
251
  tDecoderClear(pDecoder);
569✔
252
  return code;
569✔
253
}
254

255
typedef struct STableInfoForChildTable {
256
  char*           tableName;
257
  SSchemaWrapper* schemaRow;
258
  SSchemaWrapper* tagRow;
259
} STableInfoForChildTable;
260

261
static void destroySTableInfoForChildTable(void* data) {
396✔
262
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
396✔
263
  taosMemoryFree(pData->tableName);
396!
264
  tDeleteSchemaWrapper(pData->schemaRow);
396!
265
  tDeleteSchemaWrapper(pData->tagRow);
396!
266
}
396✔
267

268
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
263✔
269
  int32_t code = 0;
263✔
270
  tdbTbcClose((TBC*)ctx->pCur);
263✔
271
  code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
264✔
272
  if (code != 0) {
264!
273
    return TAOS_GET_TERRNO(code);
×
274
  }
275
  STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX};
264✔
276
  int      c = 0;
264✔
277
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &key, sizeof(key), &c);
264✔
278
  if (code != 0) {
264!
279
    return TAOS_GET_TERRNO(code);
×
280
  }
281
  if (c < 0) {
264✔
282
    if (tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
2!
283
      metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
×
284
    }
285
  }
286
  return 0;
264✔
287
}
288

289
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
4,378✔
290
  tdbTbcClose((TBC*)ctx->pCur);
4,378✔
291
  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
4,378✔
292
  if (code != 0) {
4,379!
293
    return TAOS_GET_TERRNO(code);
×
294
  }
295
  STbDbKey key = {.version = ver, .uid = uid};
4,379✔
296
  int      c = 0;
4,379✔
297
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &key, sizeof(key), &c);
4,379✔
298
  if (code != 0) {
4,378!
299
    return TAOS_GET_TERRNO(code);
×
300
  }
301
  return c;
4,378✔
302
}
303

304
static int32_t MoveToFirst(SSnapContext* ctx) {
262✔
305
  tdbTbcClose((TBC*)ctx->pCur);
262✔
306
  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
263✔
307
  if (code != 0) {
264!
308
    return TAOS_GET_TERRNO(code);
×
309
  }
310
  code = tdbTbcMoveToFirst((TBC*)ctx->pCur);
264✔
311
  if (code != 0) {
264!
312
    return TAOS_GET_TERRNO(code);
×
313
  }
314
  return 0;
264✔
315
}
316

317
static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
395✔
318
  STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
395✔
319
  if (data) {
395!
320
    return 0;
×
321
  }
322
  int32_t                 code = 0;
395✔
323
  STableInfoForChildTable dataTmp = {0};
395✔
324
  dataTmp.tableName = taosStrdup(me->name);
395!
325
  if (dataTmp.tableName == NULL) {
395!
326
    code = terrno;
×
327
    goto END;
×
328
  }
329
  dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
395!
330
  if (dataTmp.schemaRow == NULL) {
396!
331
    code = TSDB_CODE_OUT_OF_MEMORY;
×
332
    goto END;
×
333
  }
334
  dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
396!
335
  if (dataTmp.tagRow == NULL) {
396!
336
    code = TSDB_CODE_OUT_OF_MEMORY;
×
337
    goto END;
×
338
  }
339
  code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
396✔
340
  if (code != 0) {
396!
341
    goto END;
×
342
  }
343
  return 0;
396✔
344

345
END:
×
346
  destroySTableInfoForChildTable(&dataTmp);
×
347
  return TAOS_GET_TERRNO(code);
×
348
  ;
349
}
350

351
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
264✔
352
                         SSnapContext** ctxRet) {
353
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
264!
354
  if (ctx == NULL) {
264!
355
    return terrno;
×
356
  }
357
  *ctxRet = ctx;
264✔
358
  ctx->pMeta = pVnode->pMeta;
264✔
359
  ctx->snapVersion = snapVersion;
264✔
360
  ctx->suid = suid;
264✔
361
  ctx->subType = subType;
264✔
362
  ctx->queryMeta = withMeta;
264✔
363
  ctx->withMeta = withMeta;
264✔
364
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
264✔
365
  if (ctx->idVersion == NULL) {
264!
366
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
367
  }
368

369
  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
264✔
370
  if (ctx->suidInfo == NULL) {
264!
371
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
372
  }
373
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
264✔
374

375
  ctx->index = 0;
264✔
376
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
264✔
377
  if (ctx->idList == NULL) {
264!
378
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
379
    ;
380
  }
381
  void* pKey = NULL;
264✔
382
  void* pVal = NULL;
264✔
383
  int   vLen = 0, kLen = 0;
264✔
384

385
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
264✔
386
  int32_t code = MoveToFirst(ctx);
264✔
387
  if (code != 0) {
264!
388
    return code;
×
389
  }
390
  while (1) {
6,880✔
391
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
7,144✔
392
    if (ret < 0) break;
6,938✔
393
    STbDbKey* tmp = (STbDbKey*)pKey;
6,678✔
394
    if (tmp->version > ctx->snapVersion) break;
6,678✔
395

396
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
6,674✔
397
    if (idData) {
6,635✔
398
      continue;
519✔
399
    }
400

401
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
6,345✔
402
        0) {  // check if table exist for now, need optimize later
403
      continue;
223✔
404
    }
405

406
    SDecoder   dc = {0};
6,326✔
407
    SMetaEntry me = {0};
6,326✔
408
    tDecoderInit(&dc, pVal, vLen);
6,326✔
409
    ret = metaDecodeEntry(&dc, &me);
6,299✔
410
    if (ret < 0) {
6,209!
411
      tDecoderClear(&dc);
×
412
      return TAOS_GET_TERRNO(ret);
×
413
    }
414
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
6,209✔
415
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
784✔
416
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
783✔
417
        tDecoderClear(&dc);
4✔
418
        continue;
4✔
419
      }
420
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
5,425✔
421
      if (me.type == TSDB_VIRTUAL_NORMAL_TABLE ||
5,424!
422
          me.type == TSDB_VIRTUAL_CHILD_TABLE ||
5,428!
423
          TABLE_IS_VIRTUAL(me.flags)) {
5,432✔
UNCOV
424
        tDecoderClear(&dc);
×
425
        continue;
2✔
426
      }
427
    }
428

429
    if (taosArrayPush(ctx->idList, &tmp->uid) == NULL) {
12,423!
430
      tDecoderClear(&dc);
×
431
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
432
    }
433
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
6,211✔
434
    tDecoderClear(&dc);
6,212✔
435

436
    SIdInfo info = {0};
6,364✔
437
    if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
6,364!
438
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
439
    }
440
  }
441
  taosHashClear(ctx->idVersion);
264✔
442

443
  code = MoveToSnapShotVersion(ctx);
263✔
444
  if (code != 0) {
264!
445
    return code;
×
446
  }
447
  while (1) {
6,829✔
448
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
7,093✔
449
    if (ret < 0) break;
6,907✔
450

451
    STbDbKey* tmp = (STbDbKey*)pKey;
6,643✔
452
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
6,643✔
453
    if (idData) {
6,606✔
454
      continue;
451✔
455
    }
456
    SIdInfo info = {.version = tmp->version, .index = 0};
6,161✔
457
    ret = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
6,161✔
458
    if (ret != 0) {
6,376!
459
      return TAOS_GET_TERRNO(ret);
×
460
    }
461

462
    SDecoder   dc = {0};
6,376✔
463
    SMetaEntry me = {0};
6,376✔
464
    tDecoderInit(&dc, pVal, vLen);
6,376✔
465
    ret = metaDecodeEntry(&dc, &me);
6,323✔
466
    if (ret < 0) {
6,140!
467
      tDecoderClear(&dc);
×
468
      return TAOS_GET_TERRNO(ret);
×
469
    }
470

471
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
6,140✔
472
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
783✔
473
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
782✔
474
        tDecoderClear(&dc);
4✔
475
        continue;
4✔
476
      }
477
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
5,357!
478
      if (me.type == TSDB_VIRTUAL_NORMAL_TABLE ||
5,357!
479
          me.type == TSDB_VIRTUAL_CHILD_TABLE ||
5,362!
480
          TABLE_IS_VIRTUAL(me.flags)) {
5,363✔
UNCOV
481
        tDecoderClear(&dc);
×
482
        continue;
2✔
483
      }
484
    }
485

486
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
6,141✔
487
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
5,807✔
488
      ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
395✔
489
      if (ret != 0) {
396!
490
        tDecoderClear(&dc);
×
491
        return ret;
×
492
      }
493
    }
494
    tDecoderClear(&dc);
6,142✔
495
  }
496

497
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
6,663✔
498
    int64_t* uid = taosArrayGet(ctx->idList, i);
6,399✔
499
    if (uid == NULL) {
6,399!
500
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
501
    }
502
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
6,399✔
503
    if (!idData) {
6,399!
504
      metaError("meta/snap: null idData");
×
505
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
506
    }
507

508
    idData->index = i;
6,399✔
509
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
6,399✔
510
              idData->index);
511
  }
512

513
  tdbFree(pKey);
264✔
514
  tdbFree(pVal);
264✔
515
  return TDB_CODE_SUCCESS;
264✔
516
}
517

518
void destroySnapContext(SSnapContext* ctx) {
264✔
519
  tdbTbcClose((TBC*)ctx->pCur);
264✔
520
  taosArrayDestroy(ctx->idList);
264✔
521
  taosHashCleanup(ctx->idVersion);
264✔
522
  taosHashCleanup(ctx->suidInfo);
264✔
523
  taosMemoryFree(ctx);
264!
524
}
264✔
525

526
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
93✔
527
  int32_t            ret = 0;
93✔
528
  SVCreateTbBatchReq reqs = {0};
93✔
529

530
  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
93✔
531
  if (NULL == reqs.pArray) {
93!
532
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
533
    goto end;
×
534
  }
535
  if (taosArrayPush(reqs.pArray, req) == NULL) {
186!
536
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
537
    goto end;
×
538
  }
539
  reqs.nReqs = 1;
93✔
540

541
  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
93!
542
  if (ret < 0) {
93!
543
    ret = TAOS_GET_TERRNO(ret);
×
544
    goto end;
×
545
  }
546
  *contLen += sizeof(SMsgHead);
93✔
547
  *pBuf = taosMemoryMalloc(*contLen);
93!
548
  if (NULL == *pBuf) {
93!
549
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
550
    goto end;
×
551
  }
552
  SEncoder coder = {0};
93✔
553
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
93✔
554
  ret = tEncodeSVCreateTbBatchReq(&coder, &reqs);
93✔
555
  tEncoderClear(&coder);
93✔
556

557
  if (ret < 0) {
93!
558
    taosMemoryFreeClear(*pBuf);
×
559
    ret = TAOS_GET_TERRNO(ret);
×
560
    goto end;
×
561
  }
562

563
end:
93✔
564
  taosArrayDestroy(reqs.pArray);
93✔
565
  return ret;
93✔
566
}
567

568
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
40✔
569
  int32_t ret = 0;
40✔
570
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
40!
571
  if (ret < 0) {
40!
572
    return TAOS_GET_TERRNO(ret);
×
573
  }
574

575
  *contLen += sizeof(SMsgHead);
40✔
576
  *pBuf = taosMemoryMalloc(*contLen);
40!
577
  if (NULL == *pBuf) {
40!
578
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
579
  }
580

581
  SEncoder encoder = {0};
40✔
582
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
40✔
583
  ret = tEncodeSVCreateStbReq(&encoder, req);
40✔
584
  tEncoderClear(&encoder);
40✔
585
  if (ret < 0) {
40!
586
    taosMemoryFreeClear(*pBuf);
×
587
    return TAOS_GET_TERRNO(ret);
×
588
  }
589
  return 0;
40✔
590
}
591

592
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
2,131✔
593
  if (uid == 0) {
2,131✔
594
    ctx->index = 0;
42✔
595
    return 0;
42✔
596
  }
597

598
  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
2,089✔
599
  if (idInfo == NULL) {
2,088!
600
    return terrno;
×
601
  }
602

603
  ctx->index = idInfo->index;
2,088✔
604

605
  return 0;
2,088✔
606
}
607

608
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
2,113✔
609
  bool            ret = false;
2,113✔
610
  SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL, 0);
2,113✔
611
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
2,113!
612
    ret = true;
×
613
  }
614
  tDeleteSchemaWrapper(schema);
615
  ctx->hasPrimaryKey = ret;
2,113✔
616
}
2,113✔
617

618
bool taosXGetTablePrimaryKey(SSnapContext* ctx) { return ctx->hasPrimaryKey; }
4,240✔
619

620
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
149✔
621
  int32_t ret = 0;
149✔
622
  void*   pKey = NULL;
149✔
623
  void*   pVal = NULL;
149✔
624
  int     vLen = 0, kLen = 0;
149✔
625

626
  while (1) {
×
627
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
149✔
628
      metaDebug("tmqsnap get meta end");
16!
629
      ctx->index = 0;
16✔
630
      ctx->queryMeta = 0;  // change to get data
16✔
631
      return 0;
16✔
632
    }
633

634
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
133✔
635
    if (uidTmp == NULL) {
133!
636
      metaError("tmqsnap get meta null uid");
×
637
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
638
    }
639
    ctx->index++;
133✔
640
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
133✔
641
    if (!idInfo) {
133!
642
      metaError("meta/snap: null idInfo");
×
643
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
644
    }
645

646
    *uid = *uidTmp;
133✔
647
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
133✔
648
    if (ret == 0) {
133!
649
      break;
133✔
650
    }
651
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
×
652
  }
653

654
  ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
133✔
655
  if (ret < 0) {
133!
656
    return TAOS_GET_TERRNO(ret);
×
657
  }
658
  SDecoder   dc = {0};
133✔
659
  SMetaEntry me = {0};
133✔
660
  tDecoderInit(&dc, pVal, vLen);
133✔
661
  ret = metaDecodeEntry(&dc, &me);
133✔
662
  if (ret < 0) {
133!
663
    tDecoderClear(&dc);
×
664
    ret = TAOS_GET_TERRNO(ret);
×
665
    goto END;
×
666
  }
667
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);
133!
668

669
  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
133!
670
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
93!
671
    SVCreateStbReq req = {0};
40✔
672
    req.name = me.name;
40✔
673
    req.suid = me.uid;
40✔
674
    req.schemaRow = me.stbEntry.schemaRow;
40✔
675
    req.schemaTag = me.stbEntry.schemaTag;
40✔
676
    req.schemaRow.version = 1;
40✔
677
    req.schemaTag.version = 1;
40✔
678
    req.colCmpr = me.colCmpr;
40✔
679

680
    ret = buildSuperTableInfo(&req, pBuf, contLen);
40✔
681
    *type = TDMT_VND_CREATE_STB;
40✔
682
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
93!
683
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
93!
684
    STableInfoForChildTable* data =
685
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
83✔
686
    if (!data) {
83!
687
      metaError("meta/snap: null data");
×
688
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
689
      goto END;
×
690
    }
691

692
    SVCreateTbReq req = {0};
83✔
693

694
    req.type = TSDB_CHILD_TABLE;
83✔
695
    req.name = me.name;
83✔
696
    req.uid = me.uid;
83✔
697
    req.commentLen = -1;
83✔
698
    req.ctb.suid = me.ctbEntry.suid;
83✔
699
    req.ctb.tagNum = data->tagRow->nCols;
83✔
700
    req.ctb.stbName = data->tableName;
83✔
701

702
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
83✔
703
    if (tagName == NULL) {
83!
704
      metaError("meta/snap: init tag name failed.");
×
705
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
706
      goto END;
×
707
    }
708
    STag* p = (STag*)me.ctbEntry.pTags;
83✔
709
    if (tTagIsJson(p)) {
83✔
710
      if (p->nTag != 0) {
12✔
711
        SSchema* schema = &data->tagRow->pSchema[0];
6✔
712
        if (taosArrayPush(tagName, schema->name) == NULL) {
12!
713
          ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
714
          taosArrayDestroy(tagName);
×
715
          goto END;
×
716
        }
717
      }
718
    } else {
719
      SArray* pTagVals = NULL;
71✔
720
      ret = tTagToValArray((const STag*)p, &pTagVals);
71✔
721
      if (ret != 0) {
71!
722
        metaError("meta/snap: tag to val array failed.");
×
723
        taosArrayDestroy(pTagVals);
×
724
        taosArrayDestroy(tagName);
×
725
        goto END;
×
726
      }
727
      int16_t nCols = taosArrayGetSize(pTagVals);
71✔
728
      for (int j = 0; j < nCols; ++j) {
242✔
729
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
171✔
730
        for (int i = 0; pTagVal && i < data->tagRow->nCols; i++) {
690!
731
          SSchema* schema = &data->tagRow->pSchema[i];
519✔
732
          if (schema->colId == pTagVal->cid) {
519✔
733
            if (taosArrayPush(tagName, schema->name) == NULL) {
342!
734
              ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
735
              taosArrayDestroy(pTagVals);
×
736
              taosArrayDestroy(tagName);
×
737
              goto END;
×
738
            }
739
          }
740
        }
741
      }
742
      taosArrayDestroy(pTagVals);
71✔
743
    }
744
    req.ctb.pTag = me.ctbEntry.pTags;
83✔
745
    req.ctb.tagName = tagName;
83✔
746
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
83✔
747
    *type = TDMT_VND_CREATE_TABLE;
83✔
748
    taosArrayDestroy(tagName);
83✔
749
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
10!
750
    SVCreateTbReq req = {0};
10✔
751
    req.type = TSDB_NORMAL_TABLE;
10✔
752
    req.name = me.name;
10✔
753
    req.uid = me.uid;
10✔
754
    req.commentLen = -1;
10✔
755
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
10✔
756
    req.colCmpr = me.colCmpr;
10✔
757
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
10✔
758
    *type = TDMT_VND_CREATE_TABLE;
10✔
759
  } else {
760
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
×
761
    ret = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
×
762
  }
763

764
END:
133✔
765
  tDecoderClear(&dc);
133✔
766
  return ret;
133✔
767
}
768

769
int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) {
4,228✔
770
  void* pKey = NULL;
4,228✔
771
  void* pVal = NULL;
4,228✔
772
  int   vLen, kLen;
773

774
  while (1) {
44✔
775
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
4,272✔
776
      metaDebug("tmqsnap get uid info end");
29✔
777
      return 0;
29✔
778
    }
779
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
4,244✔
780
    if (uidTmp == NULL) {
4,244!
781
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
782
    }
783
    ctx->index++;
4,244✔
784
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
4,244✔
785
    if (!idInfo) {
4,245!
786
      metaError("meta/snap: null idInfo");
×
787
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
788
    }
789

790
    int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
4,245✔
791
    if (ret != 0) {
4,245!
792
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp,
×
793
                idInfo->version);
794
      continue;
44✔
795
    }
796
    ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
4,245✔
797
    if (ret != 0) {
4,245!
798
      return TAOS_GET_TERRNO(ret);
×
799
    }
800
    SDecoder   dc = {0};
4,245✔
801
    SMetaEntry me = {0};
4,245✔
802
    tDecoderInit(&dc, pVal, vLen);
4,245✔
803
    ret = metaDecodeEntry(&dc, &me);
4,245✔
804
    if (ret != 0) {
4,242!
805
      tDecoderClear(&dc);
×
806
      return TAOS_GET_TERRNO(ret);
×
807
    }
808
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);
4,242✔
809

810
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
4,242!
811
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
4,045!
812
      STableInfoForChildTable* data =
813
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
197✔
814
      if (data == NULL) {
197!
815
        tDecoderClear(&dc);
×
816
        metaError("meta/snap: null data");
×
817
        return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
818
      }
819
      result->suid = me.ctbEntry.suid;
197✔
820
      result->schema = tCloneSSchemaWrapper(data->schemaRow);
394!
821
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
4,045!
822
      result->suid = 0;
4,001!
823
      result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
4,005✔
824
    } else {
825
      metaDebug("tmqsnap get uid continue");
44✔
826
      tDecoderClear(&dc);
44✔
827
      continue;
44✔
828
    }
829
    result->uid = me.uid;
4,202✔
830
    tstrncpy(result->tbName, me.name, TSDB_TABLE_NAME_LEN);
4,202✔
831
    tDecoderClear(&dc);
4,202✔
832
    if (result->schema == NULL) {
4,200!
833
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
834
    }
835
    break;
4,200✔
836
  }
837
  return 0;
4,200✔
838
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc