• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4828

29 Oct 2025 11:10AM UTC coverage: 61.071% (-0.3%) from 61.383%
#4828

push

travis-ci

web-flow
Merge pull request #33419 from taosdata/3.0

3.0

155924 of 324872 branches covered (48.0%)

Branch coverage included in aggregate %.

207404 of 270051 relevant lines covered (76.8%)

243478715.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.75
/source/dnode/vnode/src/meta/metaSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
// SMetaSnapReader ========================================
19
struct SMetaSnapReader {
20
  SMeta*  pMeta;
21
  int64_t sver;
22
  int64_t ever;
23
  TBC*    pTbc;
24
  int32_t iLoop;
25
};
26

27
/**
 * Create a meta snapshot reader and position its cursor at the first key
 * that is >= {version = sver, uid = INT64_MIN}.
 *
 * @param pMeta     meta store to read from
 * @param sver      lowest version to include
 * @param ever      highest version to include
 * @param ppReader  receives the new reader on success, NULL on failure
 * @return 0 on success, otherwise the error code of the failing step
 */
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  cmp = 0;  // comparison result reported by tdbTbcMoveTo; not needed here
  STbDbKey startKey = {.version = sver, .uid = INT64_MIN};

  SMetaSnapReader* reader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*reader));
  if (reader == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  reader->pMeta = pMeta;
  reader->sver = sver;
  reader->ever = ever;

  // open a cursor on the table DB and seek to the start of the version range
  code = tdbTbcOpen(pMeta->pTbDb, &reader->pTbc, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tdbTbcMoveTo(reader->pTbc, &startKey, sizeof(STbDbKey), &cmp);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppReader = reader;
    return code;
  }

  metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  metaSnapReaderClose(&reader);  // tolerates a NULL reader
  *ppReader = NULL;
  return code;
}
60

61
/**
 * Close the reader's cursor, free the reader and reset the caller's handle.
 * Does nothing when ppReader or *ppReader is NULL.
 */
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
  if (ppReader == NULL || *ppReader == NULL) {
    return;
  }

  SMetaSnapReader* reader = *ppReader;
  tdbTbcClose(reader->pTbc);
  taosMemoryFree(reader);
  *ppReader = NULL;
}
88,453✔
68

69
extern int metaDecodeEntryImpl(SDecoder* pCoder, SMetaEntry* pME, bool headerOnly);

/**
 * Decode only the header part of a serialized meta entry.
 *
 * @param data   serialized entry
 * @param size   length of data in bytes
 * @param entry  receives the decoded header fields
 * @return 0 on success, otherwise the code returned by metaDecodeEntryImpl
 */
static int32_t metaDecodeEntryHeader(void* data, int32_t size, SMetaEntry* entry) {
  SDecoder hdrDecoder = {0};

  tDecoderInit(&hdrDecoder, (uint8_t*)data, size);
  int32_t rc = metaDecodeEntryImpl(&hdrDecoder, entry, true);

  // the decoder is released on both the success and the failure path
  tDecoderClear(&hdrDecoder);
  return rc;
}
84

85
/**
 * Produce the next snapshot block from the reader.
 *
 * The table DB is scanned twice over the version range [sver, ever]:
 * pass 0 (iLoop == 0) emits super-table entries only, pass 1 emits all other
 * entries. Each call returns at most one entry, wrapped in a SSnapDataHdr
 * of type SNAP_DATA_META.
 *
 * @param pReader  reader created by metaSnapReaderOpen
 * @param ppData   receives a buffer allocated with taosMemoryMalloc holding
 *                 header + payload; it is not freed here, so the caller is
 *                 presumably responsible for releasing it. Set to NULL when
 *                 both passes are exhausted or on failure.
 * @return 0 on success (including "no more data"), otherwise an error code.
 *         A stored value that is empty is reported as
 *         TSDB_CODE_INVALID_DATA_FMT instead of being mistaken for the end
 *         of the snapshot.
 */
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
  int32_t     code = 0;
  const void* pKey = NULL;
  const void* pData = NULL;
  int32_t     nKey = 0;
  int32_t     nData = 0;
  STbDbKey    key;
  int32_t     c = 0;

  *ppData = NULL;
  while (pReader->iLoop < 2) {
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData) != 0 || ((STbDbKey*)pKey)->version > pReader->ever) {
      // End of the current pass: advance the pass counter and rewind.
      pReader->iLoop++;

      // Reopen the cursor to read from the beginning
      tdbTbcClose(pReader->pTbc);
      pReader->pTbc = NULL;
      code = tdbTbcOpen(pReader->pMeta->pTbDb, &pReader->pTbc, NULL);
      if (code) {
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }

      code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = pReader->sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
      if (code) {
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
                  tstrerror(code));
        goto _exit;
      }

      continue;
    }

    // Validate the value before it is handed to the decoder. Returning 0
    // with *ppData == NULL would look like a normal end of snapshot, so an
    // empty value must surface as an error.
    if (!pData || !nData) {
      metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
      code = TSDB_CODE_INVALID_DATA_FMT;
      goto _exit;
    }

    // Decode meta entry
    SMetaEntry entry = {0};
    code = metaDecodeEntryHeader((void*)pData, nData, &entry);
    if (code) {
      metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
                tstrerror(code));
      goto _exit;
    }

    key = ((STbDbKey*)pKey)[0];
    if (key.version < pReader->sver                                       //
        || (pReader->iLoop == 0 && TABS(entry.type) != TSDB_SUPER_TABLE)  // First loop send super table entry
        || (pReader->iLoop == 1 && TABS(entry.type) == TSDB_SUPER_TABLE)  // Second loop send non-super table entry
    ) {
      // Not part of this pass: step over it. A failed step is only traced;
      // the next tdbTbcGet decides whether the pass is over.
      if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
        metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
      }
      continue;
    }

    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
    if (*ppData == NULL) {
      code = terrno;
      goto _exit;
    }

    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
    pHdr->type = SNAP_DATA_META;
    pHdr->size = nData;
    memcpy(pHdr->data, pData, nData);

    metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
              TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);

    // Advance past the entry just emitted so the next call starts after it.
    if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
      metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
    }
    break;
  }

_exit:
  if (code) {
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
              tstrerror(code));
  }
  return code;
}
171

172
// SMetaSnapWriter ========================================
173
struct SMetaSnapWriter {
174
  SMeta*  pMeta;
175
  int64_t sver;
176
  int64_t ever;
177
};
178

179
/**
 * Create a meta snapshot writer and begin a meta transaction
 * (metaBegin with META_BEGIN_HEAP_NIL).
 *
 * @param pMeta     meta store to write into
 * @param sver      start version, stored in the writer
 * @param ever      end version, stored in the writer
 * @param ppWriter  receives the new writer on success, NULL on failure
 * @return 0 on success, otherwise the error code of the failing step
 */
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t code = 0;
  int32_t lino = 0;

  SMetaSnapWriter* writer = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*writer));
  if (writer == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  writer->pMeta = pMeta;
  writer->sver = sver;
  writer->ever = ever;

  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppWriter = writer;
    return code;
  }

  metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  taosMemoryFree(writer);
  *ppWriter = NULL;
  return code;
}
207

208
/**
 * Finish a meta snapshot write session and destroy the writer.
 *
 * With rollback set, the meta transaction is aborted (metaAbort); otherwise
 * it is committed (metaCommit followed by metaFinishCommit).
 *
 * The writer is freed and *ppWriter is reset to NULL on every path. It used
 * to be left allocated when commit or abort failed, which leaked it for any
 * caller that gives up after a failed close. A NULL handle is accepted as a
 * no-op so that closing twice is harmless.
 *
 * @param ppWriter  handle created by metaSnapWriterOpen
 * @param rollback  non-zero to abort instead of commit
 * @return 0 on success, otherwise the code of the failing abort/commit step
 */
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  int32_t code = 0;

  if (ppWriter == NULL || *ppWriter == NULL) {
    return 0;
  }
  SMetaSnapWriter* pWriter = *ppWriter;

  if (rollback) {
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
    code = metaAbort(pWriter->pMeta);
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
             code);
  } else {
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
    if (code == 0) {
      code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
    }
  }

  if (code) {
    // log while the writer is still valid
    metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
  }

  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;
}
233

234
/**
 * Apply one snapshot block to the meta store.
 *
 * The payload that follows the SSnapDataHdr is decoded as a meta entry and
 * handed to metaHandleSyncEntry.
 *
 * @param pWriter  writer created by metaSnapWriterOpen
 * @param pData    block starting with a SSnapDataHdr
 * @param nData    total block length in bytes, header included
 *                 NOTE(review): nData is not checked against
 *                 sizeof(SSnapDataHdr) here — confirm callers guarantee it.
 * @return 0 on success, otherwise the decode error code
 */
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMeta*     meta = pWriter->pMeta;
  SMetaEntry entry = {0};
  SDecoder   decoder = {0};

  // skip the snapshot header; only the payload is a serialized entry
  tDecoderInit(&decoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));

  code = metaDecodeEntry(&decoder, &entry);
  TSDB_CHECK_CODE(code, lino, _exit);

  metaHandleSyncEntry(meta, &entry);

_exit:
  if (code) {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(meta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
254

255
typedef struct STableInfoForChildTable {
256
  char*           tableName;
257
  SSchemaWrapper* schemaRow;
258
  SSchemaWrapper* tagRow;
259
  SExtSchema*     pExtSchemas;
260
} STableInfoForChildTable;
261

262
static void destroySTableInfoForChildTable(void* data) {
427,233✔
263
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
427,233✔
264
  taosMemoryFree(pData->tableName);
427,233!
265
  tDeleteSchemaWrapper(pData->schemaRow);
426,355!
266
  tDeleteSchemaWrapper(pData->tagRow);
427,233!
267
  taosMemoryFreeClear(pData->pExtSchemas);
426,355!
268
}
427,233✔
269

270
/**
 * Reopen the context cursor and seek to {snapVersion, INT64_MAX}. When the
 * seek reports a negative comparison result the cursor is stepped back one
 * entry; a failure of that step is only traced.
 *
 * @return 0 on success, otherwise TAOS_GET_TERRNO of the failing tdb call
 */
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t rc = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }

  int      cmp = 0;
  STbDbKey seekKey = {.version = ctx->snapVersion, .uid = INT64_MAX};
  rc = tdbTbcMoveTo((TBC*)ctx->pCur, &seekKey, sizeof(seekKey), &cmp);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }

  if (cmp < 0 && tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
    metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
  }
  return 0;
}
290

291
/**
 * Reopen the context cursor and seek to the key {ver, uid}.
 *
 * @return the comparison result reported by tdbTbcMoveTo when the seek
 *         itself succeeds (callers in this file treat 0 as "found"), or
 *         TAOS_GET_TERRNO of the failing tdb call. The two kinds of value
 *         share one return channel.
 */
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t rc = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }

  int      cmp = 0;
  STbDbKey target = {.version = ver, .uid = uid};
  rc = tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }
  return cmp;
}
305

306
/**
 * Reopen the context cursor and place it on the first entry of the table DB.
 *
 * @return 0 on success, otherwise TAOS_GET_TERRNO of the failing tdb call
 */
static int32_t MoveToFirst(SSnapContext* ctx) {
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t rc = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (rc == 0) {
    rc = tdbTbcMoveToFirst((TBC*)ctx->pCur);
  }

  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }
  return 0;
}
318

319
/**
 * Cache name, column schema, tag schema and extended schemas of a super
 * table in suidInfo, keyed by the table uid. Nothing is done when the uid
 * is already cached.
 *
 * @param me        decoded super-table entry
 * @param suidInfo  hash from uid to STableInfoForChildTable
 * @return 0 on success, otherwise TAOS_GET_TERRNO of the failure code; any
 *         partially built copy is released before returning
 */
static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
  if (taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)) != NULL) {
    return 0;  // already cached
  }

  int32_t                 code = 0;
  STableInfoForChildTable cached = {0};

  cached.tableName = taosStrdup(me->name);
  if (cached.tableName == NULL) {
    code = terrno;
    goto END;
  }

  cached.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  if (cached.schemaRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  cached.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
  if (cached.tagRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  if (me->pExtSchemas != NULL) {
    // one SExtSchema per column of the row schema
    cached.pExtSchemas = taosMemoryMalloc(sizeof(SExtSchema) * me->stbEntry.schemaRow.nCols);
    if (cached.pExtSchemas == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    memcpy(cached.pExtSchemas, me->pExtSchemas, sizeof(SExtSchema) * me->stbEntry.schemaRow.nCols);
  }

  // the hash stores a copy of the struct; on success it owns the members
  code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &cached, sizeof(STableInfoForChildTable));
  if (code == 0) {
    return 0;
  }

END:
  destroySTableInfoForChildTable(&cached);
  return TAOS_GET_TERRNO(code);
}
361

362
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
290,031✔
363
                         SSnapContext** ctxRet) {
364
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
290,031!
365
  if (ctx == NULL) {
290,878!
366
    return terrno;
×
367
  }
368
  *ctxRet = ctx;
290,878✔
369
  ctx->pMeta = pVnode->pMeta;
290,878✔
370
  ctx->snapVersion = snapVersion;
291,767✔
371
  ctx->suid = suid;
291,767✔
372
  ctx->subType = subType;
290,031✔
373
  ctx->queryMeta = withMeta;
290,920✔
374
  ctx->withMeta = withMeta;
291,767✔
375
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
291,767✔
376
  if (ctx->idVersion == NULL) {
291,767!
377
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
378
  }
379

380
  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
291,767✔
381
  if (ctx->suidInfo == NULL) {
291,767!
382
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
383
  }
384
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
291,767✔
385

386
  ctx->index = 0;
291,767✔
387
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
291,767✔
388
  if (ctx->idList == NULL) {
291,767!
389
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
390
    ;
391
  }
392
  void* pKey = NULL;
291,767✔
393
  void* pVal = NULL;
291,767✔
394
  int   vLen = 0, kLen = 0;
291,767✔
395

396
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
291,767✔
397
  int32_t code = MoveToFirst(ctx);
291,767✔
398
  if (code != 0) {
291,767!
399
    return code;
×
400
  }
401
  while (1) {
23,553,617✔
402
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
23,845,384✔
403
    if (ret < 0) break;
23,758,939✔
404
    STbDbKey* tmp = (STbDbKey*)pKey;
23,470,610✔
405
    if (tmp->version > ctx->snapVersion) break;
23,470,610✔
406

407
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
23,512,485✔
408
    if (idData) {
23,540,148✔
409
      continue;
247,132✔
410
    }
411

412
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
23,293,016✔
413
        0) {  // check if table exist for now, need optimize later
414
      continue;
270,250✔
415
    }
416

417
    SDecoder   dc = {0};
22,983,300✔
418
    SMetaEntry me = {0};
23,003,253✔
419
    tDecoderInit(&dc, pVal, vLen);
22,984,193✔
420
    ret = metaDecodeEntry(&dc, &me);
22,907,623✔
421
    if (ret < 0) {
22,825,916!
422
      tDecoderClear(&dc);
×
423
      return TAOS_GET_TERRNO(ret);
×
424
    }
425
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
22,825,916✔
426
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
668,305✔
427
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
667,483✔
428
        tDecoderClear(&dc);
3,288✔
429
        continue;
3,288✔
430
      }
431
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
22,283,410✔
432
      if (me.type == TSDB_VIRTUAL_NORMAL_TABLE ||
22,302,621✔
433
          me.type == TSDB_VIRTUAL_CHILD_TABLE ||
22,160,781!
434
          TABLE_IS_VIRTUAL(me.flags)) {
22,208,940✔
435
        tDecoderClear(&dc);
94,503✔
436
        continue;
1,644✔
437
      }
438
    }
439

440
    if (taosArrayPush(ctx->idList, &tmp->uid) == NULL) {
45,672,587!
441
      tDecoderClear(&dc);
×
442
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
443
    }
444
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
22,825,890✔
445
    tDecoderClear(&dc);
22,946,362✔
446

447
    SIdInfo info = {0};
22,928,555✔
448
    if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
22,982,443!
449
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
450
    }
451
  }
452
  taosHashClear(ctx->idVersion);
291,767✔
453

454
  code = MoveToSnapShotVersion(ctx);
291,767✔
455
  if (code != 0) {
291,767!
456
    return code;
×
457
  }
458
  while (1) {
22,974,359✔
459
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
23,266,126✔
460
    if (ret < 0) break;
23,328,563✔
461

462
    STbDbKey* tmp = (STbDbKey*)pKey;
23,036,796✔
463
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
23,036,796✔
464
    if (idData) {
23,116,027✔
465
      continue;
428,700✔
466
    }
467
    SIdInfo info = {.version = tmp->version, .index = 0};
22,687,327✔
468
    ret = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
22,698,945✔
469
    if (ret != 0) {
22,674,221!
470
      return TAOS_GET_TERRNO(ret);
×
471
    }
472

473
    SDecoder   dc = {0};
22,674,221✔
474
    SMetaEntry me = {0};
22,573,299✔
475
    tDecoderInit(&dc, pVal, vLen);
22,624,874✔
476
    ret = metaDecodeEntry(&dc, &me);
22,989,296✔
477
    if (ret < 0) {
22,268,795!
478
      tDecoderClear(&dc);
×
479
      return TAOS_GET_TERRNO(ret);
×
480
    }
481

482
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
22,268,795✔
483
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
668,305✔
484
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
667,483✔
485
        tDecoderClear(&dc);
3,288✔
486
        continue;
3,288✔
487
      }
488
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
22,016,130!
489
      if (me.type == TSDB_VIRTUAL_NORMAL_TABLE ||
22,176,418!
490
          me.type == TSDB_VIRTUAL_CHILD_TABLE ||
22,213,543!
491
          TABLE_IS_VIRTUAL(me.flags)) {
22,226,780✔
492
        tDecoderClear(&dc);
×
493
        continue;
1,644✔
494
      }
495
    }
496

497
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
22,799,298✔
498
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
22,403,022✔
499
      ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
334,527✔
500
      if (ret != 0) {
427,233!
501
        tDecoderClear(&dc);
×
502
        return ret;
×
503
      }
504
    }
505
    tDecoderClear(&dc);
22,910,639✔
506
  }
507

508
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
23,356,665✔
509
    int64_t* uid = taosArrayGet(ctx->idList, i);
23,063,248✔
510
    if (uid == NULL) {
23,064,898!
511
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
512
    }
513
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
23,064,898✔
514
    if (!idData) {
23,061,598!
515
      metaError("meta/snap: null idData");
×
516
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
517
    }
518

519
    idData->index = i;
23,061,598✔
520
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
23,061,598✔
521
              idData->index);
522
  }
523

524
  tdbFree(pKey);
291,767✔
525
  tdbFree(pVal);
291,767✔
526
  return TDB_CODE_SUCCESS;
291,767✔
527
}
528

529
/**
 * Release a snapshot context: closes its cursor, destroys the uid list and
 * both hash tables, then frees the context itself. ctx is dereferenced
 * unconditionally, so it must not be NULL.
 */
void destroySnapContext(SSnapContext* ctx) {
  TBC* cursor = (TBC*)ctx->pCur;
  tdbTbcClose(cursor);

  taosArrayDestroy(ctx->idList);
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);

  taosMemoryFree(ctx);
}
291,767✔
536

537
/**
 * Serialize a single create-table request as a one-element
 * SVCreateTbBatchReq, leaving room for a SMsgHead in front of it.
 *
 * @param req      request to serialize (shallow-copied into a temporary array)
 * @param pBuf     receives a buffer from taosMemoryMalloc of *contLen bytes;
 *                 the first sizeof(SMsgHead) bytes are left unwritten here.
 *                 Freed and reset to NULL when encoding fails.
 * @param contLen  receives sizeof(SMsgHead) + encoded size
 * @return 0 on success, otherwise an error code
 */
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
  int32_t            bodyLen = 0;
  SVCreateTbBatchReq reqs = {0};

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  if (NULL == reqs.pArray) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  if (taosArrayPush(reqs.pArray, req) == NULL) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  reqs.nReqs = 1;

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
  if (ret < 0) {
    ret = TAOS_GET_TERRNO(ret);
    goto end;
  }
  bodyLen = *contLen;  // encoded size without the message head
  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  SEncoder coder = {0};
  // The encoder starts after the head, so only bodyLen bytes are writable;
  // passing *contLen would overstate the capacity by sizeof(SMsgHead).
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  ret = tEncodeSVCreateTbBatchReq(&coder, &reqs);
  tEncoderClear(&coder);

  if (ret < 0) {
    taosMemoryFreeClear(*pBuf);
    ret = TAOS_GET_TERRNO(ret);
    goto end;
  }

end:
  // only the temporary array is destroyed; the request contents stay with the caller
  taosArrayDestroy(reqs.pArray);
  return ret;
}
578

579
/**
 * Serialize a create-super-table request, leaving room for a SMsgHead in
 * front of it.
 *
 * @param req      request to serialize
 * @param pBuf     receives a buffer from taosMemoryMalloc of *contLen bytes;
 *                 the first sizeof(SMsgHead) bytes are left unwritten here.
 *                 Freed and reset to NULL when encoding fails.
 * @param contLen  receives sizeof(SMsgHead) + encoded size
 * @return 0 on success, otherwise an error code
 */
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return TAOS_GET_TERRNO(ret);
  }

  int32_t bodyLen = *contLen;  // encoded size without the message head
  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  SEncoder encoder = {0};
  // The encoder starts after the head, so only bodyLen bytes are writable;
  // passing *contLen would overstate the capacity by sizeof(SMsgHead).
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  ret = tEncodeSVCreateStbReq(&encoder, req);
  tEncoderClear(&encoder);
  if (ret < 0) {
    taosMemoryFreeClear(*pBuf);
    return TAOS_GET_TERRNO(ret);
  }
  return 0;
}
602

603
/**
 * Position the context's iteration index.
 *
 * uid == 0 rewinds to the start of the uid list; any other uid moves to the
 * list position recorded for it in ctx->idVersion.
 *
 * @return 0 on success; terrno when the uid is not in ctx->idVersion.
 *         NOTE(review): whether the hash lookup sets terrno on a miss is not
 *         visible here — confirm this cannot return 0 for an unknown uid.
 */
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
  if (uid == 0) {
    ctx->index = 0;
    return 0;
  }

  SIdInfo* found = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
  if (found != NULL) {
    ctx->index = found->index;
    return 0;
  }
  return terrno;
}
618

619
/**
 * Record in ctx->hasPrimaryKey whether table `uid` has a primary-key
 * column: the schema must be available, have at least two columns, and its
 * second column must carry COL_IS_KEY. The schema fetched for the check is
 * released before returning.
 */
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
  SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL, 0);

  bool hasKey = false;
  if (schema != NULL && schema->nCols >= 2) {
    hasKey = (schema->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  tDeleteSchemaWrapper(schema);
  ctx->hasPrimaryKey = hasKey;
}
2,183,918✔
628

629
/** Return the primary-key flag last stored by taosXSetTablePrimaryKey. */
bool taosXGetTablePrimaryKey(SSnapContext* ctx) {
  return ctx->hasPrimaryKey;
}
4,226,411!
630

631
/**
 * Produce the next table-creation message of the snapshot.
 *
 * Starting at ctx->index, the uid list is walked until an entry is found at
 * its recorded version. The entry is decoded and turned into either a
 * create-super-table request (TDMT_VND_CREATE_STB) or a one-element
 * create-table batch (TDMT_VND_CREATE_TABLE) for child and normal tables.
 *
 * When the list is exhausted, ctx->index is reset, ctx->queryMeta is
 * cleared and 0 is returned without touching the output parameters.
 *
 * @param ctx      snapshot context
 * @param pBuf     receives the serialized request (see the build* helpers)
 * @param contLen  receives its length
 * @param type     receives the message type
 * @param uid      receives the uid of the table that was visited
 * @return 0 on success, otherwise an error code
 */
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
  int32_t ret = 0;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     kLen = 0;
  int     vLen = 0;

  // Find the next uid whose recorded (version, uid) key exists.
  for (;;) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
      ctx->queryMeta = 0;  // change to get data
      return 0;
    }

    int64_t* pUid = taosArrayGet(ctx->idList, ctx->index);
    if (pUid == NULL) {
      metaError("tmqsnap get meta null uid");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    ctx->index++;

    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, pUid, sizeof(tb_uid_t));
    if (idInfo == NULL) {
      metaError("meta/snap: null idInfo");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }

    *uid = *pUid;
    ret = MoveToPosition(ctx, idInfo->version, *pUid);
    if (ret == 0) {
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }

  ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  if (ret < 0) {
    return TAOS_GET_TERRNO(ret);
  }

  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  ret = metaDecodeEntry(&dc, &me);
  if (ret < 0) {
    tDecoderClear(&dc);
    ret = TAOS_GET_TERRNO(ret);
    goto END;
  }
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);

  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
    // ---- super table ----
    SVCreateStbReq stbReq = {0};
    stbReq.name = me.name;
    stbReq.suid = me.uid;
    stbReq.schemaRow = me.stbEntry.schemaRow;
    stbReq.schemaRow.version = 1;
    stbReq.schemaTag = me.stbEntry.schemaTag;
    stbReq.schemaTag.version = 1;
    stbReq.colCmpr = me.colCmpr;
    stbReq.pExtSchemas = me.pExtSchemas;

    ret = buildSuperTableInfo(&stbReq, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    // ---- child table: needs the cached info of its super table ----
    STableInfoForChildTable* stbInfo =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
    if (stbInfo == NULL) {
      metaError("meta/snap: null data");
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      goto END;
    }

    SVCreateTbReq ctbReq = {0};
    ctbReq.type = TSDB_CHILD_TABLE;
    ctbReq.name = me.name;
    ctbReq.uid = me.uid;
    ctbReq.commentLen = -1;
    ctbReq.ctb.suid = me.ctbEntry.suid;
    ctbReq.ctb.tagNum = stbInfo->tagRow->nCols;
    ctbReq.ctb.stbName = stbInfo->tableName;

    SArray* tagNames = taosArrayInit(ctbReq.ctb.tagNum, TSDB_COL_NAME_LEN);
    if (tagNames == NULL) {
      metaError("meta/snap: init tag name failed.");
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      goto END;
    }

    STag* pTag = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(pTag)) {
      // a non-empty JSON tag contributes the name of the first tag column
      if (pTag->nTag != 0) {
        SSchema* jsonCol = &stbInfo->tagRow->pSchema[0];
        if (taosArrayPush(tagNames, jsonCol->name) == NULL) {
          ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
          taosArrayDestroy(tagNames);
          goto END;
        }
      }
    } else {
      SArray* tagVals = NULL;
      ret = tTagToValArray((const STag*)pTag, &tagVals);
      if (ret != 0) {
        metaError("meta/snap: tag to val array failed.");
        taosArrayDestroy(tagVals);
        taosArrayDestroy(tagNames);
        goto END;
      }

      // for every tag value, push the name of each tag column with the same column id
      int16_t nVals = taosArrayGetSize(tagVals);
      for (int valIdx = 0; valIdx < nVals; ++valIdx) {
        STagVal* tagVal = (STagVal*)taosArrayGet(tagVals, valIdx);
        for (int colIdx = 0; tagVal && colIdx < stbInfo->tagRow->nCols; colIdx++) {
          SSchema* tagCol = &stbInfo->tagRow->pSchema[colIdx];
          if (tagCol->colId != tagVal->cid) {
            continue;
          }
          if (taosArrayPush(tagNames, tagCol->name) == NULL) {
            ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
            taosArrayDestroy(tagVals);
            taosArrayDestroy(tagNames);
            goto END;
          }
        }
      }
      taosArrayDestroy(tagVals);
    }

    ctbReq.ctb.pTag = me.ctbEntry.pTags;
    ctbReq.ctb.tagName = tagNames;
    ret = buildNormalChildTableInfo(&ctbReq, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
    taosArrayDestroy(tagNames);
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
    // ---- normal table ----
    SVCreateTbReq ntbReq = {0};
    ntbReq.type = TSDB_NORMAL_TABLE;
    ntbReq.name = me.name;
    ntbReq.uid = me.uid;
    ntbReq.commentLen = -1;
    ntbReq.ntb.schemaRow = me.ntbEntry.schemaRow;
    ntbReq.colCmpr = me.colCmpr;
    ntbReq.pExtSchemas = me.pExtSchemas;

    ret = buildNormalChildTableInfo(&ntbReq, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
  } else {
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
    ret = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
  }

END:
  tDecoderClear(&dc);
  return ret;
}
781

782
/**
 * Fetch uid, name, suid and schema of the next child or normal table in the
 * snapshot.
 *
 * Starting at ctx->index, the uid list is walked. Uids whose recorded key
 * is missing, and entries that are neither an accepted child table nor (for
 * DB subscriptions) a normal table, are skipped. For a child table the
 * schema comes from the cached super-table info; for a normal table it is
 * cloned from the entry itself.
 *
 * @param ctx     snapshot context
 * @param result  filled on success. schema and pExtSchemas are fresh
 *                allocations made here; pExtSchemas is only assigned when
 *                the source has extended schemas. When the list is
 *                exhausted, result is left untouched and 0 is returned.
 *                NOTE(review): on an allocation failure, fields already
 *                assigned are left in result — confirm the caller releases
 *                them.
 * @return 0 on success or end of list, otherwise an error code
 */
int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) {
  void* pKey = NULL;
  void* pVal = NULL;
  int   vLen, kLen;

  for (;;) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get uid info end");
      return 0;
    }

    int64_t* pUid = taosArrayGet(ctx->idList, ctx->index);
    if (pUid == NULL) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    ctx->index++;

    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, pUid, sizeof(tb_uid_t));
    if (idInfo == NULL) {
      metaError("meta/snap: null idInfo");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }

    int32_t rc = MoveToPosition(ctx, idInfo->version, *pUid);
    if (rc != 0) {
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *pUid,
                idInfo->version);
      continue;
    }

    rc = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
    if (rc != 0) {
      return TAOS_GET_TERRNO(rc);
    }

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    rc = metaDecodeEntry(&dc, &me);
    if (rc != 0) {
      tDecoderClear(&dc);
      return TAOS_GET_TERRNO(rc);
    }
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);

    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
      // child table: schema comes from the cached super-table info
      STableInfoForChildTable* stbInfo =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
      if (stbInfo == NULL) {
        tDecoderClear(&dc);
        metaError("meta/snap: null data");
        return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      }

      result->suid = me.ctbEntry.suid;
      result->schema = tCloneSSchemaWrapper(stbInfo->schemaRow);
      if (stbInfo->pExtSchemas != NULL) {
        result->pExtSchemas = taosMemoryMalloc(sizeof(SExtSchema) * stbInfo->schemaRow->nCols);
        if (result->pExtSchemas == NULL) {
          tDecoderClear(&dc);
          metaError("result->pExtSchemas malloc failed");
          return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
        }
        memcpy(result->pExtSchemas, stbInfo->pExtSchemas, sizeof(SExtSchema) * stbInfo->schemaRow->nCols);
      }
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
      // normal table: schema is part of the entry itself
      result->suid = 0;
      result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
      if (me.pExtSchemas != NULL) {
        result->pExtSchemas = taosMemoryMalloc(sizeof(SExtSchema) * me.ntbEntry.schemaRow.nCols);
        if (result->pExtSchemas == NULL) {
          tDecoderClear(&dc);
          metaError("result->pExtSchemas malloc failed");
          return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
        }
        memcpy(result->pExtSchemas, me.pExtSchemas, sizeof(SExtSchema) * me.ntbEntry.schemaRow.nCols);
      }
    } else {
      // any other entry carries no data of interest here
      metaDebug("tmqsnap get uid continue");
      tDecoderClear(&dc);
      continue;
    }

    // copy uid and name while the decoder is still alive, then release it
    result->uid = me.uid;
    tstrncpy(result->tbName, me.name, TSDB_TABLE_NAME_LEN);
    tDecoderClear(&dc);

    // the schema clone is validated last
    if (result->schema == NULL) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    return 0;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc