• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3580

14 Jan 2025 02:54AM UTC coverage: 63.556% (+0.6%) from 62.976%
#3580

push

travis-ci

web-flow
Merge pull request #29504 from taosdata/chore/codeowners

chore: add codeowners

140725 of 284535 branches covered (49.46%)

Branch coverage included in aggregate %.

219148 of 281694 relevant lines covered (77.8%)

19019641.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.19
/source/dnode/vnode/src/meta/metaSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
// SMetaSnapReader ========================================
19
struct SMetaSnapReader {
20
  SMeta*  pMeta;
21
  int64_t sver;
22
  int64_t ever;
23
  TBC*    pTbc;
24
};
25

26
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
73✔
27
  int32_t          code = 0;
73✔
28
  int32_t          lino;
29
  int32_t          c = 0;
73✔
30
  SMetaSnapReader* pReader = NULL;
73✔
31

32
  // alloc
33
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
73!
34
  if (pReader == NULL) {
73!
35
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
36
  }
37
  pReader->pMeta = pMeta;
73✔
38
  pReader->sver = sver;
73✔
39
  pReader->ever = ever;
73✔
40

41
  // impl
42
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
73✔
43
  TSDB_CHECK_CODE(code, lino, _exit);
73!
44

45
  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
73✔
46
  TSDB_CHECK_CODE(code, lino, _exit);
73!
47

48
_exit:
73✔
49
  if (code) {
73!
50
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
51
    metaSnapReaderClose(&pReader);
×
52
    *ppReader = NULL;
×
53
  } else {
54
    metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
73!
55
    *ppReader = pReader;
73✔
56
  }
57
  return code;
73✔
58
}
59

60
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
73✔
61
  if (ppReader && *ppReader) {
73!
62
    tdbTbcClose((*ppReader)->pTbc);
73✔
63
    taosMemoryFree(*ppReader);
73!
64
    *ppReader = NULL;
73✔
65
  }
66
}
73✔
67

68
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
79,747✔
69
  int32_t     code = 0;
79,747✔
70
  const void* pKey = NULL;
79,747✔
71
  const void* pData = NULL;
79,747✔
72
  int32_t     nKey = 0;
79,747✔
73
  int32_t     nData = 0;
79,747✔
74
  STbDbKey    key;
75

76
  *ppData = NULL;
79,747✔
77
  for (;;) {
×
78
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData)) {
79,747✔
79
      goto _exit;
73✔
80
    }
81

82
    key = ((STbDbKey*)pKey)[0];
79,674✔
83
    if (key.version > pReader->ever) {
79,674!
84
      goto _exit;
×
85
    }
86

87
    if (key.version < pReader->sver) {
79,674!
88
      if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
×
89
        metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
×
90
      }
91
      continue;
×
92
    }
93

94
    if (!pData || !nData) {
79,674!
95
      metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
×
96
      goto _exit;
×
97
    }
98

99
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
79,674!
100
    if (*ppData == NULL) {
79,674!
101
      code = terrno;
×
102
      goto _exit;
×
103
    }
104

105
    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
79,674✔
106
    pHdr->type = SNAP_DATA_META;
79,674✔
107
    pHdr->size = nData;
79,674✔
108
    memcpy(pHdr->data, pData, nData);
79,674✔
109

110
    metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
79,674!
111
              TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
112

113
    if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
79,674!
114
      metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
×
115
    }
116
    break;
79,674✔
117
  }
118

119
_exit:
79,747✔
120
  if (code) {
79,747!
121
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
×
122
              tstrerror(code));
123
  }
124
  return code;
79,747✔
125
}
126

127
// SMetaSnapWriter ========================================
128
struct SMetaSnapWriter {
129
  SMeta*  pMeta;
130
  int64_t sver;
131
  int64_t ever;
132
};
133

134
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
67✔
135
  int32_t          code = 0;
67✔
136
  int32_t          lino;
137
  SMetaSnapWriter* pWriter;
138

139
  // alloc
140
  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
67!
141
  if (pWriter == NULL) {
67!
142
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
143
  }
144
  pWriter->pMeta = pMeta;
67✔
145
  pWriter->sver = sver;
67✔
146
  pWriter->ever = ever;
67✔
147

148
  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
67✔
149
  TSDB_CHECK_CODE(code, lino, _exit);
67!
150

151
_exit:
67✔
152
  if (code) {
67!
153
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
154
    taosMemoryFree(pWriter);
×
155
    *ppWriter = NULL;
×
156
  } else {
157
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
67!
158
    *ppWriter = pWriter;
67✔
159
  }
160
  return code;
67✔
161
}
162

163
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
67✔
164
  int32_t          code = 0;
67✔
165
  SMetaSnapWriter* pWriter = *ppWriter;
67✔
166

167
  if (rollback) {
67!
168
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
×
169
    code = metaAbort(pWriter->pMeta);
×
170
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
×
171
             code);
172
    if (code) goto _err;
×
173
  } else {
174
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
67✔
175
    if (code) goto _err;
67!
176
    code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
67✔
177
    if (code) goto _err;
67!
178
  }
179
  taosMemoryFree(pWriter);
67!
180
  *ppWriter = NULL;
67✔
181

182
  return code;
67✔
183

184
_err:
×
185
  metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
×
186
  return code;
×
187
}
188

189
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
79,669✔
190
  int32_t    code = 0;
79,669✔
191
  int32_t    lino = 0;
79,669✔
192
  SMeta*     pMeta = pWriter->pMeta;
79,669✔
193
  SMetaEntry metaEntry = {0};
79,669✔
194
  SDecoder*  pDecoder = &(SDecoder){0};
79,669✔
195

196
  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
79,669✔
197
  code = metaDecodeEntry(pDecoder, &metaEntry);
79,669✔
198
  TSDB_CHECK_CODE(code, lino, _exit);
79,669!
199

200
  metaHandleSyncEntry(pMeta, &metaEntry);
79,669✔
201

202
_exit:
79,669✔
203
  if (code) {
79,669!
204
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
205
  }
206
  tDecoderClear(pDecoder);
79,669✔
207
  return code;
79,669✔
208
}
209

210
typedef struct STableInfoForChildTable {
211
  char*           tableName;
212
  SSchemaWrapper* schemaRow;
213
  SSchemaWrapper* tagRow;
214
} STableInfoForChildTable;
215

216
static void destroySTableInfoForChildTable(void* data) {
440✔
217
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
440✔
218
  taosMemoryFree(pData->tableName);
440!
219
  tDeleteSchemaWrapper(pData->schemaRow);
440!
220
  tDeleteSchemaWrapper(pData->tagRow);
440!
221
}
440✔
222

223
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
305✔
224
  int32_t code = 0;
305✔
225
  tdbTbcClose((TBC*)ctx->pCur);
305✔
226
  code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
305✔
227
  if (code != 0) {
305!
228
    return TAOS_GET_TERRNO(code);
×
229
  }
230
  STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX};
305✔
231
  int      c = 0;
305✔
232
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &key, sizeof(key), &c);
305✔
233
  if (code != 0) {
305!
234
    return TAOS_GET_TERRNO(code);
×
235
  }
236
  if (c < 0) {
305✔
237
    if (tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
3!
238
      metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
×
239
    }
240
  }
241
  return 0;
305✔
242
}
243

244
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
4,367✔
245
  tdbTbcClose((TBC*)ctx->pCur);
4,367✔
246
  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
4,375✔
247
  if (code != 0) {
4,365!
248
    return TAOS_GET_TERRNO(code);
×
249
  }
250
  STbDbKey key = {.version = ver, .uid = uid};
4,365✔
251
  int      c = 0;
4,365✔
252
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &key, sizeof(key), &c);
4,365✔
253
  if (code != 0) {
4,370!
254
    return TAOS_GET_TERRNO(code);
×
255
  }
256
  return c;
4,370✔
257
}
258

259
static int32_t MoveToFirst(SSnapContext* ctx) {
305✔
260
  tdbTbcClose((TBC*)ctx->pCur);
305✔
261
  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
305✔
262
  if (code != 0) {
305!
263
    return TAOS_GET_TERRNO(code);
×
264
  }
265
  code = tdbTbcMoveToFirst((TBC*)ctx->pCur);
305✔
266
  if (code != 0) {
305✔
267
    return TAOS_GET_TERRNO(code);
1!
268
  }
269
  return 0;
304✔
270
}
271

272
static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
440✔
273
  STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
440✔
274
  if (data) {
440!
275
    return 0;
×
276
  }
277
  int32_t                 code = 0;
440✔
278
  STableInfoForChildTable dataTmp = {0};
440✔
279
  dataTmp.tableName = taosStrdup(me->name);
440!
280
  if (dataTmp.tableName == NULL) {
440!
281
    code = terrno;
×
282
    goto END;
×
283
  }
284
  dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
440!
285
  if (dataTmp.schemaRow == NULL) {
440!
286
    code = TSDB_CODE_OUT_OF_MEMORY;
×
287
    goto END;
×
288
  }
289
  dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
440!
290
  if (dataTmp.tagRow == NULL) {
440!
291
    code = TSDB_CODE_OUT_OF_MEMORY;
×
292
    goto END;
×
293
  }
294
  code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
440✔
295
  if (code != 0) {
440!
296
    goto END;
×
297
  }
298
  return 0;
440✔
299

300
END:
×
301
  destroySTableInfoForChildTable(&dataTmp);
×
302
  return TAOS_GET_TERRNO(code);
×
303
  ;
304
}
305

306
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
304✔
307
                         SSnapContext** ctxRet) {
308
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
304!
309
  if (ctx == NULL) {
305!
310
    return terrno;
×
311
  }
312
  *ctxRet = ctx;
305✔
313
  ctx->pMeta = pVnode->pMeta;
305✔
314
  ctx->snapVersion = snapVersion;
305✔
315
  ctx->suid = suid;
305✔
316
  ctx->subType = subType;
305✔
317
  ctx->queryMeta = withMeta;
305✔
318
  ctx->withMeta = withMeta;
305✔
319
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
305✔
320
  if (ctx->idVersion == NULL) {
305!
321
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
322
  }
323

324
  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
305✔
325
  if (ctx->suidInfo == NULL) {
305!
326
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
327
  }
328
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
305✔
329

330
  ctx->index = 0;
305✔
331
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
305✔
332
  if (ctx->idList == NULL) {
305!
333
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
334
    ;
335
  }
336
  void* pKey = NULL;
305✔
337
  void* pVal = NULL;
305✔
338
  int   vLen = 0, kLen = 0;
305✔
339

340
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
305✔
341
  int32_t code = MoveToFirst(ctx);
305✔
342
  if (code != 0) {
304!
343
    return code;
×
344
  }
345
  while (1) {
7,216✔
346
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
7,520✔
347
    if (ret < 0) break;
7,413✔
348
    STbDbKey* tmp = (STbDbKey*)pKey;
7,112✔
349
    if (tmp->version > ctx->snapVersion) break;
7,112✔
350

351
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
7,108✔
352
    if (idData) {
7,094✔
353
      continue;
526✔
354
    }
355

356
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
6,806✔
357
        0) {  // check if table exist for now, need optimize later
358
      continue;
225✔
359
    }
360

361
    SDecoder   dc = {0};
6,694✔
362
    SMetaEntry me = {0};
6,694✔
363
    tDecoderInit(&dc, pVal, vLen);
6,694✔
364
    ret = metaDecodeEntry(&dc, &me);
6,692✔
365
    if (ret < 0) {
6,534!
366
      tDecoderClear(&dc);
×
367
      return TAOS_GET_TERRNO(ret);
×
368
    }
369
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
6,534✔
370
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
786✔
371
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
782✔
372
        tDecoderClear(&dc);
13✔
373
        continue;
13✔
374
      }
375
    }
376

377
    if (taosArrayPush(ctx->idList, &tmp->uid) == NULL) {
13,039!
378
      tDecoderClear(&dc);
×
379
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
380
    }
381
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
6,518✔
382
    tDecoderClear(&dc);
6,519✔
383

384
    SIdInfo info = {0};
6,688✔
385
    if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
6,688!
386
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
387
    }
388
  }
389
  taosHashClear(ctx->idVersion);
305✔
390

391
  code = MoveToSnapShotVersion(ctx);
305✔
392
  if (code != 0) {
305!
393
    return code;
×
394
  }
395
  while (1) {
7,182✔
396
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
7,487✔
397
    if (ret < 0) break;
7,377✔
398

399
    STbDbKey* tmp = (STbDbKey*)pKey;
7,072✔
400
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
7,072✔
401
    if (idData) {
7,058✔
402
      continue;
467✔
403
    }
404
    SIdInfo info = {.version = tmp->version, .index = 0};
6,604✔
405
    ret = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
6,604✔
406
    if (ret != 0) {
6,748!
407
      return TAOS_GET_TERRNO(ret);
×
408
    }
409

410
    SDecoder   dc = {0};
6,748✔
411
    SMetaEntry me = {0};
6,748✔
412
    tDecoderInit(&dc, pVal, vLen);
6,748✔
413
    ret = metaDecodeEntry(&dc, &me);
6,713✔
414
    if (ret < 0) {
6,555!
415
      tDecoderClear(&dc);
×
416
      return TAOS_GET_TERRNO(ret);
×
417
    }
418

419
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
6,555✔
420
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
783✔
421
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
779✔
422
        tDecoderClear(&dc);
13✔
423
        continue;
13✔
424
      }
425
    }
426

427
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
6,542✔
428
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
6,169✔
429
      ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
439✔
430
      if (ret != 0) {
440!
431
        tDecoderClear(&dc);
×
432
        return ret;
×
433
      }
434
    }
435
    tDecoderClear(&dc);
6,543✔
436
  }
437

438
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
7,017✔
439
    int64_t* uid = taosArrayGet(ctx->idList, i);
6,712✔
440
    if (uid == NULL) {
6,712!
441
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
442
    }
443
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
6,712✔
444
    if (!idData) {
6,712!
445
      metaError("meta/snap: null idData");
×
446
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
447
    }
448

449
    idData->index = i;
6,712✔
450
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
6,712✔
451
              idData->index);
452
  }
453

454
  tdbFree(pKey);
305✔
455
  tdbFree(pVal);
305✔
456
  return TDB_CODE_SUCCESS;
305✔
457
}
458

459
void destroySnapContext(SSnapContext* ctx) {
304✔
460
  tdbTbcClose((TBC*)ctx->pCur);
304✔
461
  taosArrayDestroy(ctx->idList);
305✔
462
  taosHashCleanup(ctx->idVersion);
305✔
463
  taosHashCleanup(ctx->suidInfo);
305✔
464
  taosMemoryFree(ctx);
305!
465
}
305✔
466

467
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
91✔
468
  int32_t            ret = 0;
91✔
469
  SVCreateTbBatchReq reqs = {0};
91✔
470

471
  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
91✔
472
  if (NULL == reqs.pArray) {
91!
473
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
474
    goto end;
×
475
  }
476
  if (taosArrayPush(reqs.pArray, req) == NULL) {
182!
477
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
478
    goto end;
×
479
  }
480
  reqs.nReqs = 1;
91✔
481

482
  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
91!
483
  if (ret < 0) {
91!
484
    ret = TAOS_GET_TERRNO(ret);
×
485
    goto end;
×
486
  }
487
  *contLen += sizeof(SMsgHead);
91✔
488
  *pBuf = taosMemoryMalloc(*contLen);
91!
489
  if (NULL == *pBuf) {
91!
490
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
491
    goto end;
×
492
  }
493
  SEncoder coder = {0};
91✔
494
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
91✔
495
  ret = tEncodeSVCreateTbBatchReq(&coder, &reqs);
91✔
496
  tEncoderClear(&coder);
91✔
497

498
  if (ret < 0) {
91!
499
    taosMemoryFreeClear(*pBuf);
×
500
    ret = TAOS_GET_TERRNO(ret);
×
501
    goto end;
×
502
  }
503

504
end:
91✔
505
  taosArrayDestroy(reqs.pArray);
91✔
506
  return ret;
91✔
507
}
508

509
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
40✔
510
  int32_t ret = 0;
40✔
511
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
40!
512
  if (ret < 0) {
40!
513
    return TAOS_GET_TERRNO(ret);
×
514
  }
515

516
  *contLen += sizeof(SMsgHead);
40✔
517
  *pBuf = taosMemoryMalloc(*contLen);
40!
518
  if (NULL == *pBuf) {
40!
519
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
520
  }
521

522
  SEncoder encoder = {0};
40✔
523
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
40✔
524
  ret = tEncodeSVCreateStbReq(&encoder, req);
40✔
525
  tEncoderClear(&encoder);
40✔
526
  if (ret < 0) {
40!
527
    taosMemoryFreeClear(*pBuf);
×
528
    return TAOS_GET_TERRNO(ret);
×
529
  }
530
  return 0;
40✔
531
}
532

533
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
2,124✔
534
  if (uid == 0) {
2,124✔
535
    ctx->index = 0;
47✔
536
    return 0;
47✔
537
  }
538

539
  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
2,077✔
540
  if (idInfo == NULL) {
2,081!
541
    return terrno;
×
542
  }
543

544
  ctx->index = idInfo->index;
2,081✔
545

546
  return 0;
2,081✔
547
}
548

549
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
2,112✔
550
  bool            ret = false;
2,112✔
551
  SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);
2,112✔
552
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
2,106!
553
    ret = true;
4✔
554
  }
555
  tDeleteSchemaWrapper(schema);
556
  ctx->hasPrimaryKey = ret;
2,109✔
557
}
2,109✔
558

559
bool taosXGetTablePrimaryKey(SSnapContext* ctx) { return ctx->hasPrimaryKey; }
4,211✔
560

561
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
149✔
562
  int32_t ret = 0;
149✔
563
  void*   pKey = NULL;
149✔
564
  void*   pVal = NULL;
149✔
565
  int     vLen = 0, kLen = 0;
149✔
566

567
  while (1) {
×
568
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
149✔
569
      metaDebug("tmqsnap get meta end");
18!
570
      ctx->index = 0;
18✔
571
      ctx->queryMeta = 0;  // change to get data
18✔
572
      return 0;
18✔
573
    }
574

575
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
131✔
576
    if (uidTmp == NULL) {
131!
577
      metaError("tmqsnap get meta null uid");
×
578
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
579
    }
580
    ctx->index++;
131✔
581
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
131✔
582
    if (!idInfo) {
131!
583
      metaError("meta/snap: null idInfo");
×
584
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
585
    }
586

587
    *uid = *uidTmp;
131✔
588
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
131✔
589
    if (ret == 0) {
131!
590
      break;
131✔
591
    }
592
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
×
593
  }
594

595
  ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
131✔
596
  if (ret < 0) {
131!
597
    return TAOS_GET_TERRNO(ret);
×
598
  }
599
  SDecoder   dc = {0};
131✔
600
  SMetaEntry me = {0};
131✔
601
  tDecoderInit(&dc, pVal, vLen);
131✔
602
  ret = metaDecodeEntry(&dc, &me);
131✔
603
  if (ret < 0) {
131!
604
    tDecoderClear(&dc);
×
605
    ret = TAOS_GET_TERRNO(ret);
×
606
    goto END;
×
607
  }
608
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);
131!
609

610
  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
131✔
611
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
93✔
612
    SVCreateStbReq req = {0};
40✔
613
    req.name = me.name;
40✔
614
    req.suid = me.uid;
40✔
615
    req.schemaRow = me.stbEntry.schemaRow;
40✔
616
    req.schemaTag = me.stbEntry.schemaTag;
40✔
617
    req.schemaRow.version = 1;
40✔
618
    req.schemaTag.version = 1;
40✔
619
    req.colCmpr = me.colCmpr;
40✔
620

621
    ret = buildSuperTableInfo(&req, pBuf, contLen);
40✔
622
    *type = TDMT_VND_CREATE_STB;
40✔
623
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
91✔
624
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
93!
625
    STableInfoForChildTable* data =
626
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
79✔
627
    if (!data) {
79!
628
      metaError("meta/snap: null data");
×
629
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
630
      goto END;
×
631
    }
632

633
    SVCreateTbReq req = {0};
79✔
634

635
    req.type = TSDB_CHILD_TABLE;
79✔
636
    req.name = me.name;
79✔
637
    req.uid = me.uid;
79✔
638
    req.commentLen = -1;
79✔
639
    req.ctb.suid = me.ctbEntry.suid;
79✔
640
    req.ctb.tagNum = data->tagRow->nCols;
79✔
641
    req.ctb.stbName = data->tableName;
79✔
642

643
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
79✔
644
    if (tagName == NULL) {
79!
645
      metaError("meta/snap: init tag name failed.");
×
646
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
647
      goto END;
×
648
    }
649
    STag* p = (STag*)me.ctbEntry.pTags;
79✔
650
    if (tTagIsJson(p)) {
79✔
651
      if (p->nTag != 0) {
12✔
652
        SSchema* schema = &data->tagRow->pSchema[0];
6✔
653
        if (taosArrayPush(tagName, schema->name) == NULL) {
12!
654
          ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
655
          taosArrayDestroy(tagName);
×
656
          goto END;
×
657
        }
658
      }
659
    } else {
660
      SArray* pTagVals = NULL;
67✔
661
      ret = tTagToValArray((const STag*)p, &pTagVals);
67✔
662
      if (ret != 0) {
67!
663
        metaError("meta/snap: tag to val array failed.");
×
664
        taosArrayDestroy(pTagVals);
×
665
        taosArrayDestroy(tagName);
×
666
        goto END;
×
667
      }
668
      int16_t nCols = taosArrayGetSize(pTagVals);
67✔
669
      for (int j = 0; j < nCols; ++j) {
234✔
670
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
167✔
671
        for (int i = 0; pTagVal && i < data->tagRow->nCols; i++) {
682!
672
          SSchema* schema = &data->tagRow->pSchema[i];
515✔
673
          if (schema->colId == pTagVal->cid) {
515✔
674
            if (taosArrayPush(tagName, schema->name) == NULL) {
334!
675
              ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
676
              taosArrayDestroy(pTagVals);
×
677
              taosArrayDestroy(tagName);
×
678
              goto END;
×
679
            }
680
          }
681
        }
682
      }
683
      taosArrayDestroy(pTagVals);
67✔
684
    }
685
    req.ctb.pTag = me.ctbEntry.pTags;
79✔
686
    req.ctb.tagName = tagName;
79✔
687
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
79✔
688
    *type = TDMT_VND_CREATE_TABLE;
79✔
689
    taosArrayDestroy(tagName);
79✔
690
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
12!
691
    SVCreateTbReq req = {0};
12✔
692
    req.type = TSDB_NORMAL_TABLE;
12✔
693
    req.name = me.name;
12✔
694
    req.uid = me.uid;
12✔
695
    req.commentLen = -1;
12✔
696
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
12✔
697
    req.colCmpr = me.colCmpr;
12✔
698
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
12✔
699
    *type = TDMT_VND_CREATE_TABLE;
12✔
700
  } else {
701
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
×
702
    ret = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
×
703
  }
704

705
END:
131✔
706
  tDecoderClear(&dc);
131✔
707
  return ret;
131✔
708
}
709

710
int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) {
4,222✔
711
  void* pKey = NULL;
4,222✔
712
  void* pVal = NULL;
4,222✔
713
  int   vLen, kLen;
714

715
  while (1) {
45✔
716
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
4,267✔
717
      metaDebug("tmqsnap get uid info end");
32✔
718
      return 0;
32✔
719
    }
720
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
4,232✔
721
    if (uidTmp == NULL) {
4,231!
722
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
723
    }
724
    ctx->index++;
4,231✔
725
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
4,231✔
726
    if (!idInfo) {
4,238!
727
      metaError("meta/snap: null idInfo");
×
728
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
729
    }
730

731
    int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
4,238✔
732
    if (ret != 0) {
4,239!
733
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp,
×
734
                idInfo->version);
735
      continue;
45✔
736
    }
737
    ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
4,239✔
738
    if (ret != 0) {
4,235!
739
      return TAOS_GET_TERRNO(ret);
×
740
    }
741
    SDecoder   dc = {0};
4,235✔
742
    SMetaEntry me = {0};
4,235✔
743
    tDecoderInit(&dc, pVal, vLen);
4,235✔
744
    ret = metaDecodeEntry(&dc, &me);
4,238✔
745
    if (ret != 0) {
4,226!
746
      tDecoderClear(&dc);
×
747
      return TAOS_GET_TERRNO(ret);
×
748
    }
749
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);
4,226✔
750

751
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
4,226!
752
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
4,038!
753
      STableInfoForChildTable* data =
754
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
191✔
755
      if (data == NULL) {
190!
756
        tDecoderClear(&dc);
×
757
        metaError("meta/snap: null data");
×
758
        return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
759
      }
760
      result->suid = me.ctbEntry.suid;
190✔
761
      result->schema = tCloneSSchemaWrapper(data->schemaRow);
380!
762
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
4,035!
763
      result->suid = 0;
3,997!
764
      result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
3,996✔
765
    } else {
766
      metaDebug("tmqsnap get uid continue");
38!
767
      tDecoderClear(&dc);
38✔
768
      continue;
45✔
769
    }
770
    result->uid = me.uid;
4,186✔
771
    tstrncpy(result->tbName, me.name, TSDB_TABLE_NAME_LEN);
4,186✔
772
    tDecoderClear(&dc);
4,186✔
773
    if (result->schema == NULL) {
4,197✔
774
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
4!
775
    }
776
    break;
4,193✔
777
  }
778
  return 0;
4,193✔
779
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc