• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.2
/source/dnode/vnode/src/meta/metaSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
// SMetaSnapReader ========================================
19
struct SMetaSnapReader {
20
  SMeta*  pMeta;
21
  int64_t sver;
22
  int64_t ever;
23
  TBC*    pTbc;
24
};
25

26
/*
 * Create a snapshot reader over pMeta->pTbDb and position its cursor at the
 * key {version = sver, uid = INT64_MIN}.
 *
 * On success *ppReader receives the new reader and 0 is returned.  On failure
 * the partially built reader is released through metaSnapReaderClose(),
 * *ppReader is set to NULL and the failing code is returned.
 */
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          cmp = 0;
  STbDbKey         startKey = {.version = sver, .uid = INT64_MIN};
  SMetaSnapReader* pNew = taosMemoryCalloc(1, sizeof(*pNew));

  // allocation failure: report whatever terrno holds
  if (pNew == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pNew->pMeta = pMeta;
  pNew->sver = sver;
  pNew->ever = ever;

  // open the cursor, then seek to the starting key
  code = tdbTbcOpen(pMeta->pTbDb, &pNew->pTbc, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tdbTbcMoveTo(pNew->pTbc, &startKey, sizeof(STbDbKey), &cmp);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppReader = pNew;
    return code;
  }

  metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  metaSnapReaderClose(&pNew);
  *ppReader = NULL;
  return code;
}
59

60
/*
 * Close the reader's cursor, free the reader and reset the caller's pointer
 * to NULL.  A NULL ppReader or a NULL *ppReader is a no-op.
 */
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
  if (ppReader == NULL || *ppReader == NULL) {
    return;
  }

  SMetaSnapReader* pReader = *ppReader;
  tdbTbcClose(pReader->pTbc);
  taosMemoryFree(pReader);
  *ppReader = NULL;
}
69✔
67

68
/*
 * Produce the next snapshot record from the reader's cursor.
 *
 * *ppData is set to NULL first.  Entries whose version is below pReader->sver
 * are skipped.  The first remaining entry is copied into a freshly allocated
 * buffer (SSnapDataHdr followed by the entry payload), stored in *ppData, and
 * the cursor is advanced.
 *
 * Returns 0 with *ppData == NULL when the cursor yields nothing, when the
 * current entry's version exceeds pReader->ever, or when the entry has no
 * payload (logged as an error but not reported through the return value).
 * Returns terrno when the buffer cannot be allocated.
 */
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
  int32_t     code = 0;
  const void* pCurKey = NULL;
  const void* pCurVal = NULL;
  int32_t     keyLen = 0;
  int32_t     valLen = 0;

  *ppData = NULL;

  while (tdbTbcGet(pReader->pTbc, &pCurKey, &keyLen, &pCurVal, &valLen) == 0) {
    STbDbKey key = *(STbDbKey*)pCurKey;

    // past the end of the requested version range: nothing more to read
    if (key.version > pReader->ever) {
      break;
    }

    // before the requested range: step over it and look at the next entry
    if (key.version < pReader->sver) {
      if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
        metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
      }
      continue;
    }

    if (pCurVal == NULL || valLen == 0) {
      metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", valLen);
      break;
    }

    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + valLen);
    if (*ppData == NULL) {
      code = terrno;
      break;
    }

    // header followed by a verbatim copy of the stored entry
    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
    pHdr->type = SNAP_DATA_META;
    pHdr->size = valLen;
    memcpy(pHdr->data, pCurVal, valLen);

    metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
              TD_VID(pReader->pMeta->pVnode), key.version, key.uid, valLen);

    // leave the cursor on the following entry for the next call
    if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
      metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
    }
    break;
  }

  if (code != 0) {
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
              tstrerror(code));
  }
  return code;
}
126

127
// SMetaSnapWriter ========================================
128
struct SMetaSnapWriter {
129
  SMeta*  pMeta;
130
  int64_t sver;
131
  int64_t ever;
132
};
133

134
/*
 * Create a snapshot writer for pMeta and call metaBegin() on it.
 *
 * On success *ppWriter receives the new writer and 0 is returned.  On failure
 * the writer is freed, *ppWriter is set to NULL and the failing code is
 * returned.
 */
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SMetaSnapWriter* pNew = taosMemoryCalloc(1, sizeof(*pNew));

  // allocation failure: report whatever terrno holds
  if (pNew == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  pNew->pMeta = pMeta;
  pNew->sver = sver;
  pNew->ever = ever;

  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppWriter = pNew;
    return code;
  }

  metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  taosMemoryFree(pNew);
  *ppWriter = NULL;
  return code;
}
162

163
/*
 * Finish a snapshot write.
 *
 * With rollback != 0 the pending changes are discarded via metaAbort();
 * otherwise they are committed with metaCommit() followed by
 * metaFinishCommit().  On success the writer is freed and *ppWriter is set to
 * NULL.
 *
 * NOTE(review): on failure the writer is neither freed nor is *ppWriter
 * cleared, so the caller still owns it -- confirm that callers release or
 * retry in that case.
 */
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  SMetaSnapWriter* pWriter = *ppWriter;
  int32_t          code = 0;

  if (rollback) {
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
    code = metaAbort(pWriter->pMeta);
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
             code);
  } else {
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
    if (code == 0) {
      code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
    }
  }

  if (code != 0) {
    metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
    return code;
  }

  taosMemoryFree(pWriter);
  *ppWriter = NULL;
  return code;
}
188

189
/*
 * Apply one snapshot record to the writer's meta instance.
 *
 * pData points at an SSnapDataHdr followed by an encoded meta entry; nData is
 * the total length including the header.  The entry is decoded and handed to
 * metaHandleEntry2().  Returns 0 on success or the failing code.
 *
 * NOTE(review): nData is not checked against sizeof(SSnapDataHdr) before the
 * subtraction below -- confirm callers never pass a shorter buffer.
 */
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry entry = {0};
  SDecoder   decoder = {0};

  // the payload starts right after the snapshot header
  tDecoderInit(&decoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));

  code = metaDecodeEntry(&decoder, &entry);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = metaHandleEntry2(pMeta, &entry);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
210

211
typedef struct STableInfoForChildTable {
212
  char*           tableName;
213
  SSchemaWrapper* schemaRow;
214
  SSchemaWrapper* tagRow;
215
} STableInfoForChildTable;
216

217
static void destroySTableInfoForChildTable(void* data) {
479✔
218
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
479✔
219
  taosMemoryFree(pData->tableName);
479!
220
  tDeleteSchemaWrapper(pData->schemaRow);
479!
221
  tDeleteSchemaWrapper(pData->tagRow);
479!
222
}
479✔
223

224
/*
 * Reopen ctx->pCur and seek it to {version = ctx->snapVersion, uid = INT64_MAX}.
 * When the seek reports a negative comparison result the cursor is stepped
 * back by one entry (a failure of that step is only traced).
 *
 * Returns 0 on success, or the open/seek failure passed through TAOS_GET_TERRNO.
 */
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
  int      cmp = 0;
  STbDbKey target = {.version = ctx->snapVersion, .uid = INT64_MAX};

  // always start from a fresh cursor
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (code == 0) {
    code = tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  }
  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }

  if (cmp < 0 && tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
    metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
  }
  return 0;
}
244

245
/*
 * Reopen ctx->pCur and seek it to the key {ver, uid}.
 *
 * Returns the comparison result reported by tdbTbcMoveTo() (callers in this
 * file treat 0 as "entry found"), or the open/seek failure passed through
 * TAOS_GET_TERRNO.
 */
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
  int      cmp = 0;
  STbDbKey target = {.version = ver, .uid = uid};

  // always start from a fresh cursor
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (code == 0) {
    code = tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  }
  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }
  return cmp;
}
259

260
/*
 * Reopen ctx->pCur and move it to the first entry of pMeta->pTbDb.
 * Returns 0 on success, or the open/move failure passed through TAOS_GET_TERRNO.
 */
static int32_t MoveToFirst(SSnapContext* ctx) {
  // always start from a fresh cursor
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (code == 0) {
    code = tdbTbcMoveToFirst((TBC*)ctx->pCur);
  }
  if (code != 0) {
    return TAOS_GET_TERRNO(code);
  }
  return 0;
}
272

273
/*
 * Cache the name, row schema and tag schema of super table `me` in suidInfo,
 * keyed by me->uid.  If the uid is already present nothing is done.
 *
 * Returns 0 on success.  On failure the partially built copy is released and
 * the error is returned through TAOS_GET_TERRNO.
 */
static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
  if (taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)) != NULL) {
    return 0;  // already cached
  }

  int32_t                 code = 0;
  STableInfoForChildTable entry = {0};

  entry.tableName = taosStrdup(me->name);
  if (entry.tableName == NULL) {
    code = terrno;
    goto END;
  }

  entry.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  if (entry.schemaRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  entry.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
  if (entry.tagRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  // the struct is stored by value, so its owned members now belong to the hash
  code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &entry, sizeof(STableInfoForChildTable));
  if (code == 0) {
    return 0;
  }

END:
  destroySTableInfoForChildTable(&entry);
  return TAOS_GET_TERRNO(code);
}
306

307
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
317✔
308
                         SSnapContext** ctxRet) {
309
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
317!
310
  if (ctx == NULL) {
321!
UNCOV
311
    return terrno;
×
312
  }
313
  *ctxRet = ctx;
321✔
314
  ctx->pMeta = pVnode->pMeta;
321✔
315
  ctx->snapVersion = snapVersion;
321✔
316
  ctx->suid = suid;
321✔
317
  ctx->subType = subType;
321✔
318
  ctx->queryMeta = withMeta;
321✔
319
  ctx->withMeta = withMeta;
321✔
320
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
321✔
321
  if (ctx->idVersion == NULL) {
321!
UNCOV
322
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
323
  }
324

325
  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
321✔
326
  if (ctx->suidInfo == NULL) {
321!
UNCOV
327
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
328
    ;
329
  }
330
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
321✔
331

332
  ctx->index = 0;
321✔
333
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
321✔
334
  if (ctx->idList == NULL) {
321!
UNCOV
335
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
336
    ;
337
  }
338
  void* pKey = NULL;
321✔
339
  void* pVal = NULL;
321✔
340
  int   vLen = 0, kLen = 0;
321✔
341

342
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
321✔
343
  int32_t code = MoveToFirst(ctx);
321✔
344
  if (code != 0) {
321!
UNCOV
345
    return code;
×
346
  }
347
  while (1) {
8,314✔
348
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
8,635✔
349
    if (ret < 0) break;
8,385✔
350
    STbDbKey* tmp = (STbDbKey*)pKey;
8,068✔
351
    if (tmp->version > ctx->snapVersion) break;
8,068✔
352

353
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
8,064✔
354
    if (idData) {
8,107✔
355
      continue;
624✔
356
    }
357

358
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
7,821✔
359
        0) {  // check if table exist for now, need optimize later
360
      continue;
325✔
361
    }
362

363
    SDecoder   dc = {0};
7,580✔
364
    SMetaEntry me = {0};
7,580✔
365
    tDecoderInit(&dc, pVal, vLen);
7,580✔
366
    ret = metaDecodeEntry(&dc, &me);
7,546✔
367
    if (ret < 0) {
7,487!
UNCOV
368
      tDecoderClear(&dc);
×
UNCOV
369
      return TAOS_GET_TERRNO(ret);
×
370
    }
371
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
7,487✔
372
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
806✔
373
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
802✔
374
        tDecoderClear(&dc);
13✔
375
        continue;
13✔
376
      }
377
    }
378

379
    if (taosArrayPush(ctx->idList, &tmp->uid) == NULL) {
14,957!
UNCOV
380
      tDecoderClear(&dc);
×
UNCOV
381
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
382
    }
383
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
7,483✔
384
    tDecoderClear(&dc);
7,484✔
385

386
    SIdInfo info = {0};
7,600✔
387
    if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
7,600!
UNCOV
388
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
389
    }
390
  }
391
  taosHashClear(ctx->idVersion);
321✔
392

393
  code = MoveToSnapShotVersion(ctx);
321✔
394
  if (code != 0) {
321!
UNCOV
395
    return code;
×
396
  }
397
  while (1) {
8,214✔
398
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
8,535✔
399
    if (ret < 0) break;
8,342✔
400

401
    STbDbKey* tmp = (STbDbKey*)pKey;
8,021✔
402
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
8,021✔
403
    if (idData) {
7,940✔
404
      continue;
511✔
405
    }
406
    SIdInfo info = {.version = tmp->version, .index = 0};
7,442✔
407
    ret = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
7,442✔
408
    if (ret != 0) {
7,731!
UNCOV
409
      return TAOS_GET_TERRNO(ret);
×
410
    }
411

412
    SDecoder   dc = {0};
7,731✔
413
    SMetaEntry me = {0};
7,731✔
414
    tDecoderInit(&dc, pVal, vLen);
7,731✔
415
    ret = metaDecodeEntry(&dc, &me);
7,658✔
416
    if (ret < 0) {
7,507!
UNCOV
417
      tDecoderClear(&dc);
×
UNCOV
418
      return TAOS_GET_TERRNO(ret);
×
419
    }
420

421
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
7,507✔
422
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
806✔
423
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
802✔
424
        tDecoderClear(&dc);
13✔
425
        continue;
13✔
426
      }
427
    }
428

429
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
7,494✔
430
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
7,085✔
431
      ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
476✔
432
      if (ret != 0) {
479!
UNCOV
433
        tDecoderClear(&dc);
×
UNCOV
434
        return ret;
×
435
      }
436
    }
437
    tDecoderClear(&dc);
7,497✔
438
  }
439

440
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
7,901✔
441
    int64_t* uid = taosArrayGet(ctx->idList, i);
7,576✔
442
    if (uid == NULL) {
7,576!
UNCOV
443
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
444
    }
445
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
7,576✔
446
    if (!idData) {
7,579!
UNCOV
447
      metaError("meta/snap: null idData");
×
UNCOV
448
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
449
    }
450

451
    idData->index = i;
7,579✔
452
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
7,579✔
453
              idData->index);
454
  }
455

456
  tdbFree(pKey);
319✔
457
  tdbFree(pVal);
321✔
458
  return TDB_CODE_SUCCESS;
321✔
459
}
460

461
/*
 * Tear down a snapshot context: close its cursor, destroy the uid list and
 * both hash tables, then free the context itself.  ctx must not be NULL.
 */
void destroySnapContext(SSnapContext* ctx) {
  TBC* pCursor = (TBC*)ctx->pCur;

  tdbTbcClose(pCursor);
  taosArrayDestroy(ctx->idList);
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);
  taosMemoryFree(ctx);
}
321✔
468

469
/*
 * Encode `req` as a one-element SVCreateTbBatchReq.
 *
 * On success *pBuf receives a newly allocated buffer of *contLen bytes: the
 * first sizeof(SMsgHead) bytes are left for the caller and the encoded
 * request follows.  *contLen is the total length including that head.
 * On an encoding failure after allocation the buffer is released and *pBuf is
 * cleared.
 *
 * Returns the result of the final encode on success, or an error code.
 */
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
  int32_t            bodyLen = 0;
  SVCreateTbBatchReq reqs = {0};

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  if (NULL == reqs.pArray) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  if (taosArrayPush(reqs.pArray, req) == NULL) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  reqs.nReqs = 1;

  // first pass only measures the encoded size
  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
  if (ret < 0) {
    ret = TAOS_GET_TERRNO(ret);
    goto end;
  }
  bodyLen = *contLen;
  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }

  // The encoder starts after the message head, so only bodyLen bytes are
  // writable from there; passing the full *contLen would overstate the
  // capacity by sizeof(SMsgHead).
  SEncoder coder = {0};
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  ret = tEncodeSVCreateTbBatchReq(&coder, &reqs);
  tEncoderClear(&coder);

  if (ret < 0) {
    taosMemoryFreeClear(*pBuf);
    ret = TAOS_GET_TERRNO(ret);
    goto end;
  }

end:
  taosArrayDestroy(reqs.pArray);
  return ret;
}
510

511
/*
 * Encode a create-super-table request.
 *
 * On success *pBuf receives a newly allocated buffer of *contLen bytes: the
 * first sizeof(SMsgHead) bytes are left for the caller and the encoded
 * request follows.  *contLen is the total length including that head.
 * On an encoding failure after allocation the buffer is released and *pBuf is
 * cleared.
 *
 * Returns 0 on success or an error code.
 */
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
  int32_t ret = 0;

  // first pass only measures the encoded size
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return TAOS_GET_TERRNO(ret);
  }

  const int32_t bodyLen = *contLen;
  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  // The encoder starts after the message head, so only bodyLen bytes are
  // writable from there; passing the full *contLen would overstate the
  // capacity by sizeof(SMsgHead).
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  ret = tEncodeSVCreateStbReq(&encoder, req);
  tEncoderClear(&encoder);
  if (ret < 0) {
    taosMemoryFreeClear(*pBuf);
    return TAOS_GET_TERRNO(ret);
  }
  return 0;
}
534

535
/*
 * Position the context's iteration index for a resume.
 *
 * uid == 0 restarts from the beginning of ctx->idList.  Otherwise the index
 * recorded for `uid` in ctx->idVersion is used.
 *
 * Returns 0 on success.  When `uid` is not present in ctx->idVersion a
 * non-zero code is returned and ctx->index is left unchanged.
 */
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
  if (uid == 0) {
    ctx->index = 0;
    return 0;
  }

  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
  if (idInfo == NULL) {
    // Returning bare terrno could report success (0) for an unknown uid; use
    // the same fallback the other null-idInfo checks in this file use.
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  ctx->index = idInfo->index;

  return 0;
}
550

551
/*
 * Record in ctx->hasPrimaryKey whether table `uid` has a primary-key column:
 * true when its schema can be fetched, has at least two columns, and the
 * second column carries the COL_IS_KEY flag; false otherwise.
 */
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
  SSchemaWrapper* pSchema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);
  bool            hasKey = false;

  if (pSchema != NULL && pSchema->nCols >= 2) {
    hasKey = (pSchema->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  tDeleteSchemaWrapper(pSchema);
  ctx->hasPrimaryKey = hasKey;
}
2,526✔
560

561
/* Return the flag last stored by taosXSetTablePrimaryKey(). */
bool taosXGetTablePrimaryKey(SSnapContext* ctx) {
  return ctx->hasPrimaryKey;
}
5,068✔
562

563
/*
 * Produce the create-table message for the next table in the snapshot.
 *
 * Starting at ctx->index, the next uid from ctx->idList whose recorded
 * {version, uid} entry can be located is decoded and turned into an encoded
 * request in *pBuf / *contLen:
 *   - super tables                -> *type = TDMT_VND_CREATE_STB
 *   - child tables, normal tables -> *type = TDMT_VND_CREATE_TABLE
 * *uid receives the uid being processed.
 *
 * When the list is exhausted, ctx->index is reset to 0, ctx->queryMeta is
 * cleared (switching the context to data mode) and 0 is returned without
 * touching the output parameters.
 *
 * Returns 0 on success or an error code.
 */
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
  int32_t ret = 0;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     vLen = 0, kLen = 0;

  // advance to the next uid whose entry can be located
  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
      ctx->queryMeta = 0;  // change to get data
      return 0;
    }

    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    if (uidTmp == NULL) {
      metaError("tmqsnap get meta null uid");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    if (!idInfo) {
      metaError("meta/snap: null idInfo");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }

    *uid = *uidTmp;
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
    if (ret == 0) {
      break;
    }
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }

  ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  if (ret < 0) {
    return TAOS_GET_TERRNO(ret);
  }
  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  ret = metaDecodeEntry(&dc, &me);
  if (ret < 0) {
    // the decoder is cleared exactly once, at END
    ret = TAOS_GET_TERRNO(ret);
    goto END;
  }
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);

  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
    // super table
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;
    req.colCmpr = me.colCmpr;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    // child table: needs the cached info of its super table
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
    if (!data) {
      metaError("meta/snap: null data");
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      goto END;
    }

    SVCreateTbReq req = {0};

    req.type = TSDB_CHILD_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
    req.ctb.tagNum = data->tagRow->nCols;
    req.ctb.stbName = data->tableName;

    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
    if (tagName == NULL) {
      metaError("meta/snap: init tag name failed.");
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      goto END;
    }
    STag* p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
      // JSON tag: only the first tag column's name is recorded, and only when the tag is non-empty
      if (p->nTag != 0) {
        SSchema* schema = &data->tagRow->pSchema[0];
        if (taosArrayPush(tagName, schema->name) == NULL) {
          ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
          taosArrayDestroy(tagName);
          goto END;
        }
      }
    } else {
      // regular tags: record the column name of every tag value, matched by column id
      SArray* pTagVals = NULL;
      ret = tTagToValArray((const STag*)p, &pTagVals);
      if (ret != 0) {
        metaError("meta/snap: tag to val array failed.");
        taosArrayDestroy(pTagVals);
        taosArrayDestroy(tagName);
        goto END;
      }
      int16_t nCols = taosArrayGetSize(pTagVals);
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
        for (int i = 0; pTagVal && i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
            if (taosArrayPush(tagName, schema->name) == NULL) {
              ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
              taosArrayDestroy(pTagVals);
              taosArrayDestroy(tagName);
              goto END;
            }
          }
        }
      }
      taosArrayDestroy(pTagVals);
    }
    req.ctb.pTag = me.ctbEntry.pTags;
    req.ctb.tagName = tagName;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
    taosArrayDestroy(tagName);
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
    // remaining entries under a database subscription are built as normal tables
    SVCreateTbReq req = {0};
    req.type = TSDB_NORMAL_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
    req.colCmpr = me.colCmpr;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
  } else {
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
    ret = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
  }

END:
  tDecoderClear(&dc);
  return ret;
}
711

712
/*
 * Fill `result` with the next child or normal table in the snapshot.
 *
 * Starting at ctx->index, uids from ctx->idList are visited in order.  Entries
 * that cannot be located, and entries that are neither a matching child table
 * nor (for database subscriptions) a normal table, are skipped.  For the first
 * accepted entry, result->suid / schema / uid / tbName are set; the schema is
 * a fresh clone.
 *
 * Returns 0 both when a table was produced and when the list is exhausted (in
 * which case `result` is left untouched), or an error code.
 */
int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) {
  void* pKey = NULL;
  void* pVal = NULL;
  int   vLen = 0;
  int   kLen = 0;

  for (;;) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get uid info end");
      return 0;
    }

    int64_t* pUid = taosArrayGet(ctx->idList, ctx->index);
    if (pUid == NULL) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    ctx->index++;

    SIdInfo* pIdInfo = (SIdInfo*)taosHashGet(ctx->idVersion, pUid, sizeof(tb_uid_t));
    if (pIdInfo == NULL) {
      metaError("meta/snap: null idInfo");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }

    // any non-zero result is treated as "entry not found" and skipped
    int32_t rc = MoveToPosition(ctx, pIdInfo->version, *pUid);
    if (rc != 0) {
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *pUid,
                pIdInfo->version);
      continue;
    }

    rc = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
    if (rc != 0) {
      return TAOS_GET_TERRNO(rc);
    }

    SDecoder   decoder = {0};
    SMetaEntry entry = {0};
    tDecoderInit(&decoder, pVal, vLen);
    rc = metaDecodeEntry(&decoder, &entry);
    if (rc != 0) {
      tDecoderClear(&decoder);
      return TAOS_GET_TERRNO(rc);
    }
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", entry.uid, entry.name, ctx->index - 1);

    // child table wanted: any child under a db subscription, or a child of the subscribed super table
    bool wantChild = (entry.type == TSDB_CHILD_TABLE) &&
                     (ctx->subType == TOPIC_SUB_TYPE__DB ||
                      (ctx->subType == TOPIC_SUB_TYPE__TABLE && entry.ctbEntry.suid == ctx->suid));

    if (wantChild) {
      STableInfoForChildTable* pStbInfo =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &entry.ctbEntry.suid, sizeof(tb_uid_t));
      if (pStbInfo == NULL) {
        tDecoderClear(&decoder);
        metaError("meta/snap: null data");
        return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      }
      result->suid = entry.ctbEntry.suid;
      result->schema = tCloneSSchemaWrapper(pStbInfo->schemaRow);
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && entry.type == TSDB_NORMAL_TABLE) {
      result->suid = 0;
      result->schema = tCloneSSchemaWrapper(&entry.ntbEntry.schemaRow);
    } else {
      metaDebug("tmqsnap get uid continue");
      tDecoderClear(&decoder);
      continue;
    }

    // copy what is needed out of the entry before the decoder is cleared
    result->uid = entry.uid;
    tstrncpy(result->tbName, entry.name, TSDB_TABLE_NAME_LEN);
    tDecoderClear(&decoder);

    if (result->schema == NULL) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    return 0;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc