• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3535

23 Nov 2024 02:07AM UTC coverage: 60.85% (+0.03%) from 60.825%
#3535

push

travis-ci

web-flow
Merge pull request #28893 from taosdata/doc/internal

refact: rename taos lib name

120252 of 252737 branches covered (47.58%)

Branch coverage included in aggregate %.

201187 of 275508 relevant lines covered (73.02%)

15886166.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.77
/source/dnode/vnode/src/meta/metaSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
// SMetaSnapReader ========================================
19
struct SMetaSnapReader {
20
  SMeta*  pMeta;
21
  int64_t sver;
22
  int64_t ever;
23
  TBC*    pTbc;
24
};
25

26
/**
 * Allocate a meta snapshot reader and position its cursor for the version
 * range [sver, ever].
 *
 * The cursor is opened on pMeta->pTbDb and moved to the key
 * {version = sver, uid = INT64_MIN}.
 *
 * @param pMeta    meta instance to read from
 * @param sver     start version stored in the reader
 * @param ever     end version stored in the reader
 * @param ppReader receives the new reader on success, NULL on failure
 * @return 0 on success, otherwise the failing step's error code
 */
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
  int32_t          code = 0;
  int32_t          lino;
  int32_t          cmp = 0;
  STbDbKey         startKey = {.version = sver, .uid = INT64_MIN};
  SMetaSnapReader* pReader = taosMemoryCalloc(1, sizeof(SMetaSnapReader));

  if (pReader == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pReader->pMeta = pMeta;
  pReader->sver = sver;
  pReader->ever = ever;

  // open a cursor on the table DB and seek to the first candidate key
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tdbTbcMoveTo(pReader->pTbc, &startKey, sizeof(STbDbKey), &cmp);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppReader = pReader;
  } else {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
    // Close tolerates a NULL reader, so this is safe even when allocation failed.
    metaSnapReaderClose(&pReader);
    *ppReader = NULL;
  }
  return code;
}
59

60
/**
 * Close the reader's cursor, free the reader and reset the caller's handle
 * to NULL. Does nothing when ppReader or *ppReader is NULL.
 */
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
  if (ppReader == NULL || *ppReader == NULL) {
    return;
  }

  SMetaSnapReader* pReader = *ppReader;
  tdbTbcClose(pReader->pTbc);
  taosMemoryFree(pReader);
  *ppReader = NULL;
}
104✔
67

68
/**
 * Read the next meta entry of the snapshot into a newly allocated buffer.
 *
 * On return *ppData is either NULL (cursor exhausted, key version beyond
 * pReader->ever, empty payload, or allocation failure) or a buffer holding an
 * SSnapDataHdr of type SNAP_DATA_META followed by the entry bytes. The buffer
 * is allocated with taosMemoryMalloc; the caller receives ownership.
 *
 * Entries older than pReader->sver, or whose uid metaGetInfo reports as
 * TSDB_CODE_NOT_FOUND, are skipped.
 *
 * @return 0, or terrno when the output buffer cannot be allocated
 */
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
  int32_t     code = 0;
  const void* pKey = NULL;
  const void* pData = NULL;
  int32_t     nKey = 0;
  int32_t     nData = 0;
  STbDbKey    key;
  SMetaInfo   info;

  *ppData = NULL;

  // A non-zero tdbTbcGet result ends the read with code == 0 and *ppData == NULL.
  while (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData) == 0) {
    key = ((STbDbKey*)pKey)[0];
    if (key.version > pReader->ever) {
      break;
    }

    bool skip = (key.version < pReader->sver);
    if (!skip) {
      skip = (metaGetInfo(pReader->pMeta, key.uid, &info, NULL) == TSDB_CODE_NOT_FOUND);
    }
    if (skip) {
      if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
        metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
      }
      continue;
    }

    if (pData == NULL || nData == 0) {
      // NOTE(review): this is logged as an error but still returns code 0 - confirm intended.
      metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
      break;
    }

    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
    if (*ppData == NULL) {
      code = terrno;
      break;
    }

    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
    pHdr->type = SNAP_DATA_META;
    pHdr->size = nData;
    memcpy(pHdr->data, pData, nData);

    metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
              TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);

    // advance so the next call starts on the following entry
    if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
      metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
    }
    break;
  }

  if (code != 0) {
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
              tstrerror(code));
  }
  return code;
}
128

129
// SMetaSnapWriter ========================================
130
struct SMetaSnapWriter {
131
  SMeta*  pMeta;
132
  int64_t sver;
133
  int64_t ever;
134
};
135

136
/**
 * Allocate a meta snapshot writer and begin a meta transaction via
 * metaBegin(pMeta, META_BEGIN_HEAP_NIL).
 *
 * @param pMeta    meta instance to write into
 * @param sver     start version stored in the writer
 * @param ever     end version stored in the writer
 * @param ppWriter receives the new writer on success, NULL on failure
 * @return 0 on success, otherwise the failing step's error code
 */
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
  int32_t          code = 0;
  int32_t          lino;
  SMetaSnapWriter* pWriter = taosMemoryCalloc(1, sizeof(SMetaSnapWriter));

  if (pWriter == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  pWriter->pMeta = pMeta;
  pWriter->sver = sver;
  pWriter->ever = ever;

  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
    *ppWriter = pWriter;
  } else {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
    taosMemoryFree(pWriter);
    *ppWriter = NULL;
  }
  return code;
}
164

165
/**
 * Finish a meta snapshot write session.
 *
 * With rollback set, the meta transaction is aborted via metaAbort; otherwise
 * it is committed with metaCommit followed by metaFinishCommit. On success the
 * writer is freed and *ppWriter is reset to NULL.
 *
 * A NULL ppWriter or NULL *ppWriter is treated as "nothing to close" and
 * returns 0, matching metaSnapReaderClose.
 *
 * @param ppWriter handle returned by metaSnapWriterOpen
 * @param rollback non-zero to abort instead of commit
 * @return 0 on success, otherwise the abort/commit error code
 */
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
  if (ppWriter == NULL || *ppWriter == NULL) {
    return 0;
  }

  int32_t          code = 0;
  SMetaSnapWriter* pWriter = *ppWriter;

  if (rollback) {
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
    code = metaAbort(pWriter->pMeta);
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
             code);
    if (code) goto _err;
  } else {
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
    if (code) goto _err;
    code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
    if (code) goto _err;
  }
  taosMemoryFree(pWriter);
  *ppWriter = NULL;

  return code;

_err:
  // NOTE(review): the writer is neither freed nor cleared on this path; unless the
  // caller closes again (e.g. with rollback) it is leaked - confirm the intended ownership.
  metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
  return code;
}
190

191
/**
 * Apply one snapshot data block to the meta store.
 *
 * The payload after the SSnapDataHdr is decoded with metaDecodeEntry and
 * handed to metaHandleEntry.
 *
 * @param pWriter writer returned by metaSnapWriterOpen
 * @param pData   block starting with an SSnapDataHdr
 * @param nData   total block length, header included
 *                (NOTE(review): not checked against sizeof(SSnapDataHdr) - callers
 *                presumably guarantee nData is at least the header size; confirm)
 * @return 0 on success, otherwise the decode/handle error code
 */
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SMeta*     pMeta = pWriter->pMeta;
  SMetaEntry metaEntry = {0};
  SDecoder   decoder = {0};

  // skip the snapshot header; only the body is an encoded meta entry
  tDecoderInit(&decoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));

  code = metaDecodeEntry(&decoder, &metaEntry);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = metaHandleEntry(pMeta, &metaEntry);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code != 0) {
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  tDecoderClear(&decoder);
  return code;
}
212

213
typedef struct STableInfoForChildTable {
214
  char*           tableName;
215
  SSchemaWrapper* schemaRow;
216
  SSchemaWrapper* tagRow;
217
} STableInfoForChildTable;
218

219
static void destroySTableInfoForChildTable(void* data) {
498✔
220
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
498✔
221
  taosMemoryFree(pData->tableName);
498✔
222
  tDeleteSchemaWrapper(pData->schemaRow);
498!
223
  tDeleteSchemaWrapper(pData->tagRow);
498!
224
}
498✔
225

226
// Reopen ctx->pCur on the table DB and seek to the key
// {version = ctx->snapVersion, uid = INT64_MAX}. When tdbTbcMoveTo reports a
// negative comparison result the cursor is stepped back by one entry.
// Returns 0 on success or an error code from open/seek.
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
  int      cmp = 0;
  STbDbKey target = {.version = ctx->snapVersion, .uid = INT64_MAX};

  tdbTbcClose((TBC*)ctx->pCur);

  int32_t rc = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }

  rc = tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }

  // a failed step back is only traced, never reported to the caller
  if (cmp < 0 && tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
    metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
  }
  return 0;
}
246

247
// Reopen ctx->pCur and seek to the key {ver, uid}.
// Returns an error code if open/seek fails; otherwise returns the comparison
// result written by tdbTbcMoveTo. Callers in this file treat 0 as "positioned
// on the requested entry" and any other value as "not found".
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
  int      cmp = 0;
  STbDbKey target = {.version = ver, .uid = uid};

  tdbTbcClose((TBC*)ctx->pCur);

  int32_t rc = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }

  rc = tdbTbcMoveTo((TBC*)ctx->pCur, &target, sizeof(target), &cmp);
  if (rc != 0) {
    return TAOS_GET_TERRNO(rc);
  }

  return cmp;
}
261

262
// Reopen ctx->pCur on the table DB and move it to the first entry.
// Returns 0 on success or an error code from open/seek.
static int32_t MoveToFirst(SSnapContext* ctx) {
  tdbTbcClose((TBC*)ctx->pCur);

  int32_t rc = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
  if (rc == 0) {
    rc = tdbTbcMoveToFirst((TBC*)ctx->pCur);
  }

  return (rc != 0) ? TAOS_GET_TERRNO(rc) : 0;
}
274

275
// Cache the name, row schema and tag schema of super table `me` in the
// `suidInfo` hash, keyed by me->uid. An existing entry for that uid is left
// untouched. On failure every partially built copy is released.
// Returns 0 on success, otherwise an error code.
static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
  if (taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t)) != NULL) {
    return 0;  // already cached
  }

  int32_t                 code = 0;
  STableInfoForChildTable entry = {0};

  entry.tableName = taosStrdup(me->name);
  if (entry.tableName == NULL) {
    code = terrno;
    goto END;
  }

  entry.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
  if (entry.schemaRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  entry.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
  if (entry.tagRow == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  // the hash stores a copy of `entry`, taking over the pointers it holds
  code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &entry, sizeof(STableInfoForChildTable));
  if (code == 0) {
    return 0;
  }

END:
  destroySTableInfoForChildTable(&entry);
  return TAOS_GET_TERRNO(code);
}
308

309
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
324✔
310
                         SSnapContext** ctxRet) {
311
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
324✔
312
  if (ctx == NULL) {
324!
313
    return terrno;
×
314
  }
315
  *ctxRet = ctx;
324✔
316
  ctx->pMeta = pVnode->pMeta;
324✔
317
  ctx->snapVersion = snapVersion;
324✔
318
  ctx->suid = suid;
324✔
319
  ctx->subType = subType;
324✔
320
  ctx->queryMeta = withMeta;
324✔
321
  ctx->withMeta = withMeta;
324✔
322
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
324✔
323
  if (ctx->idVersion == NULL) {
324!
324
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
325
  }
326

327
  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
324✔
328
  if (ctx->suidInfo == NULL) {
324!
329
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
330
    ;
331
  }
332
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
324✔
333

334
  ctx->index = 0;
323✔
335
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
323✔
336
  if (ctx->idList == NULL) {
324!
337
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
338
    ;
339
  }
340
  void* pKey = NULL;
324✔
341
  void* pVal = NULL;
324✔
342
  int   vLen = 0, kLen = 0;
324✔
343

344
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
324✔
345
  int32_t code = MoveToFirst(ctx);
324✔
346
  if (code != 0) {
323!
347
    return code;
×
348
  }
349
  while (1) {
8,110✔
350
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
8,433✔
351
    if (ret < 0) break;
8,311✔
352
    STbDbKey* tmp = (STbDbKey*)pKey;
7,991✔
353
    if (tmp->version > ctx->snapVersion) break;
7,991✔
354

355
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
7,987✔
356
    if (idData) {
7,928✔
357
      continue;
402✔
358
    }
359

360
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
7,642✔
361
        0) {  // check if table exist for now, need optimize later
362
      continue;
103✔
363
    }
364

365
    SDecoder   dc = {0};
7,709✔
366
    SMetaEntry me = {0};
7,709✔
367
    tDecoderInit(&dc, pVal, vLen);
7,709✔
368
    ret = metaDecodeEntry(&dc, &me);
7,692✔
369
    if (ret < 0) {
7,499!
370
      tDecoderClear(&dc);
×
371
      return TAOS_GET_TERRNO(ret);
×
372
    }
373
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
7,499✔
374
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
807✔
375
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
803✔
376
        tDecoderClear(&dc);
13✔
377
        continue;
13✔
378
      }
379
    }
380

381
    if (taosArrayPush(ctx->idList, &tmp->uid) == NULL) {
14,995!
382
      tDecoderClear(&dc);
×
383
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
384
    }
385
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
7,509✔
386
    tDecoderClear(&dc);
7,509✔
387

388
    SIdInfo info = {0};
7,685✔
389
    if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
7,685!
390
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
391
    }
392
  }
393
  taosHashClear(ctx->idVersion);
324✔
394

395
  code = MoveToSnapShotVersion(ctx);
324✔
396
  if (code != 0) {
324!
397
    return code;
×
398
  }
399
  while (1) {
8,059✔
400
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
8,383✔
401
    if (ret < 0) break;
8,209✔
402

403
    STbDbKey* tmp = (STbDbKey*)pKey;
7,885✔
404
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
7,885✔
405
    if (idData) {
7,776✔
406
      continue;
380✔
407
    }
408
    SIdInfo info = {.version = tmp->version, .index = 0};
7,409✔
409
    ret = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
7,409✔
410
    if (ret != 0) {
7,645!
411
      return TAOS_GET_TERRNO(ret);
×
412
    }
413

414
    SDecoder   dc = {0};
7,645✔
415
    SMetaEntry me = {0};
7,645✔
416
    tDecoderInit(&dc, pVal, vLen);
7,645✔
417
    ret = metaDecodeEntry(&dc, &me);
7,613✔
418
    if (ret < 0) {
7,342!
419
      tDecoderClear(&dc);
×
420
      return TAOS_GET_TERRNO(ret);
×
421
    }
422

423
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
7,342✔
424
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
807✔
425
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
803✔
426
        tDecoderClear(&dc);
13✔
427
        continue;
13✔
428
      }
429
    }
430

431
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
7,329✔
432
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
6,905✔
433
      ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
491✔
434
      if (ret != 0) {
498!
435
        tDecoderClear(&dc);
×
436
        return ret;
×
437
      }
438
    }
439
    tDecoderClear(&dc);
7,336✔
440
  }
441

442
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
8,070✔
443
    int64_t* uid = taosArrayGet(ctx->idList, i);
7,747✔
444
    if (uid == NULL) {
7,747!
445
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
446
    }
447
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
7,747✔
448
    if (!idData) {
7,746!
449
      metaError("meta/snap: null idData");
×
450
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
451
    }
452

453
    idData->index = i;
7,746✔
454
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
7,746✔
455
              idData->index);
456
  }
457

458
  tdbFree(pKey);
323✔
459
  tdbFree(pVal);
324✔
460
  return TDB_CODE_SUCCESS;
324✔
461
}
462

463
/**
 * Release everything owned by a snapshot context: the cursor, the uid list,
 * both hash tables and the context itself. A NULL ctx is ignored.
 */
void destroySnapContext(SSnapContext* ctx) {
  if (ctx == NULL) {
    return;
  }

  tdbTbcClose((TBC*)ctx->pCur);
  taosArrayDestroy(ctx->idList);
  taosHashCleanup(ctx->idVersion);
  taosHashCleanup(ctx->suidInfo);
  taosMemoryFree(ctx);
}
324✔
470

471
// Encode a single create-table request as a one-element SVCreateTbBatchReq.
//
// On success *pBuf is a taosMemoryMalloc'ed buffer of *contLen bytes: room for
// an SMsgHead (left unwritten here) followed by the encoded batch request. The
// caller receives ownership of the buffer. On an encode failure after
// allocation the buffer is freed and *pBuf cleared.
// Returns a negative/non-zero error code on failure.
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
  int32_t            ret = 0;
  SVCreateTbBatchReq reqs = {0};

  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
  if (NULL == reqs.pArray) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  if (taosArrayPush(reqs.pArray, req) == NULL) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  reqs.nReqs = 1;

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
  if (ret < 0) {
    ret = TAOS_GET_TERRNO(ret);
    goto end;
  }

  // Keep the payload size separately: the encoder writes after the header and
  // must not be told it has the header's bytes available as well.
  int32_t bodyLen = *contLen;

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    goto end;
  }
  SEncoder coder = {0};
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  ret = tEncodeSVCreateTbBatchReq(&coder, &reqs);
  tEncoderClear(&coder);

  if (ret < 0) {
    taosMemoryFreeClear(*pBuf);
    ret = TAOS_GET_TERRNO(ret);
    goto end;
  }

end:
  // the array only holds a shallow copy of *req, so destroying it does not touch req
  taosArrayDestroy(reqs.pArray);
  return ret;
}
512

513
// Encode a create-super-table request.
//
// On success *pBuf is a taosMemoryMalloc'ed buffer of *contLen bytes: room for
// an SMsgHead (left unwritten here) followed by the encoded request. The
// caller receives ownership of the buffer. On an encode failure after
// allocation the buffer is freed and *pBuf cleared.
// Returns 0 on success, otherwise an error code.
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
  int32_t ret = 0;
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
  if (ret < 0) {
    return TAOS_GET_TERRNO(ret);
  }

  // Keep the payload size separately: the encoder writes after the header and
  // must not be told it has the header's bytes available as well.
  int32_t bodyLen = *contLen;

  *contLen += sizeof(SMsgHead);
  *pBuf = taosMemoryMalloc(*contLen);
  if (NULL == *pBuf) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), bodyLen);
  ret = tEncodeSVCreateStbReq(&encoder, req);
  tEncoderClear(&encoder);
  if (ret < 0) {
    taosMemoryFreeClear(*pBuf);
    return TAOS_GET_TERRNO(ret);
  }
  return 0;
}
536

537
/**
 * Position the context's replay index.
 *
 * uid == 0 rewinds to the start of the id list; any other uid jumps to the
 * index recorded for it in ctx->idVersion.
 *
 * @return 0 on success; terrno when the uid is not in ctx->idVersion
 *         (NOTE(review): this relies on taosHashGet setting terrno on a miss;
 *         if it does not, a stale or zero value is returned - confirm)
 */
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
  if (uid != 0) {
    SIdInfo* pInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
    if (pInfo == NULL) {
      return terrno;
    }
    ctx->index = pInfo->index;
    return 0;
  }

  ctx->index = 0;
  return 0;
}
552

553
/**
 * Record in ctx->hasPrimaryKey whether table `uid` has a primary-key column:
 * true when its schema has at least two columns and the second one carries
 * the COL_IS_KEY flag, false otherwise (including when no schema is returned).
 */
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
  SSchemaWrapper* pSchema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL);

  bool hasKey = (pSchema != NULL) && (pSchema->nCols >= 2) && ((pSchema->pSchema[1].flags & COL_IS_KEY) != 0);

  tDeleteSchemaWrapper(pSchema);
  ctx->hasPrimaryKey = hasKey;
}
2,528✔
562

563
// Return the flag last stored by taosXSetTablePrimaryKey.
bool taosXGetTablePrimaryKey(SSnapContext* ctx) {
  return ctx->hasPrimaryKey;
}
5,062✔
564

565
/**
 * Produce the create-table message for the next table in the snapshot.
 *
 * Advances ctx->index through ctx->idList until an entry can be located at
 * its recorded version, decodes it and encodes the matching request:
 *  - super table           -> SVCreateStbReq,  *type = TDMT_VND_CREATE_STB
 *  - child / normal table  -> SVCreateTbBatchReq, *type = TDMT_VND_CREATE_TABLE
 *
 * When the list is exhausted, ctx->index is reset, ctx->queryMeta is cleared
 * and 0 is returned without touching pBuf/contLen/type.
 *
 * @param ctx     snapshot context built by buildSnapContext
 * @param pBuf    receives the encoded message buffer (caller owns it)
 * @param contLen receives the buffer length
 * @param type    receives the message type
 * @param uid     receives the uid of the table that was processed
 * @return 0 on success or end of list, otherwise an error code
 */
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
  int32_t ret = 0;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int     vLen = 0, kLen = 0;

  while (1) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get meta end");
      ctx->index = 0;
      ctx->queryMeta = 0;  // change to get data
      return 0;
    }

    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
    if (uidTmp == NULL) {
      metaError("tmqsnap get meta null uid");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    ctx->index++;
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
    if (!idInfo) {
      metaError("meta/snap: null idInfo");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }

    *uid = *uidTmp;
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
    if (ret == 0) {
      break;
    }
    // entry not found at its recorded version: skip to the next uid
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
  }

  ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
  if (ret < 0) {
    return TAOS_GET_TERRNO(ret);
  }
  SDecoder   dc = {0};
  SMetaEntry me = {0};
  tDecoderInit(&dc, pVal, vLen);
  ret = metaDecodeEntry(&dc, &me);
  if (ret < 0) {
    // END clears the decoder exactly once
    ret = TAOS_GET_TERRNO(ret);
    goto END;
  }
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);

  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
    // super table
    SVCreateStbReq req = {0};
    req.name = me.name;
    req.suid = me.uid;
    req.schemaRow = me.stbEntry.schemaRow;
    req.schemaTag = me.stbEntry.schemaTag;
    req.schemaRow.version = 1;
    req.schemaTag.version = 1;
    req.colCmpr = me.colCmpr;

    ret = buildSuperTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_STB;
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
    // child table: needs the cached super table name and tag schema
    STableInfoForChildTable* data =
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
    if (!data) {
      metaError("meta/snap: null data");
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      goto END;
    }

    SVCreateTbReq req = {0};

    req.type = TSDB_CHILD_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ctb.suid = me.ctbEntry.suid;
    req.ctb.tagNum = data->tagRow->nCols;
    req.ctb.stbName = data->tableName;

    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
    if (tagName == NULL) {
      metaError("meta/snap: init tag name failed.");
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      goto END;
    }
    STag* p = (STag*)me.ctbEntry.pTags;
    if (tTagIsJson(p)) {
      // a JSON tag set contributes only the first tag column's name
      if (p->nTag != 0) {
        SSchema* schema = &data->tagRow->pSchema[0];
        if (taosArrayPush(tagName, schema->name) == NULL) {
          ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
          taosArrayDestroy(tagName);
          goto END;
        }
      }
    } else {
      SArray* pTagVals = NULL;
      ret = tTagToValArray((const STag*)p, &pTagVals);
      if (ret != 0) {
        metaError("meta/snap: tag to val array failed.");
        taosArrayDestroy(pTagVals);
        taosArrayDestroy(tagName);
        goto END;
      }
      // map every tag value to the name of the schema column with the same id
      int16_t nCols = taosArrayGetSize(pTagVals);
      for (int j = 0; j < nCols; ++j) {
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
        for (int i = 0; pTagVal && i < data->tagRow->nCols; i++) {
          SSchema* schema = &data->tagRow->pSchema[i];
          if (schema->colId == pTagVal->cid) {
            if (taosArrayPush(tagName, schema->name) == NULL) {
              ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
              taosArrayDestroy(pTagVals);
              taosArrayDestroy(tagName);
              goto END;
            }
          }
        }
      }
      taosArrayDestroy(pTagVals);
    }
    req.ctb.pTag = me.ctbEntry.pTags;
    req.ctb.tagName = tagName;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
    taosArrayDestroy(tagName);
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
    // normal table
    SVCreateTbReq req = {0};
    req.type = TSDB_NORMAL_TABLE;
    req.name = me.name;
    req.uid = me.uid;
    req.commentLen = -1;
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
    req.colCmpr = me.colCmpr;
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
    *type = TDMT_VND_CREATE_TABLE;
  } else {
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
    ret = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
  }

END:
  tDecoderClear(&dc);
  return ret;
}
713

714
/**
 * Fetch uid, name, suid and a cloned row schema of the next data-bearing
 * table (child or normal table) in the snapshot.
 *
 * Entries that cannot be located at their recorded version, and entries that
 * are neither a matching child table nor (for DB subscriptions) a normal
 * table, are skipped. When ctx->idList is exhausted the function returns 0
 * without writing to `result`.
 *
 * @param ctx    snapshot context built by buildSnapContext
 * @param result filled on success; result->schema is a clone the caller owns
 * @return 0 on success or end of list, otherwise an error code
 */
int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) {
  void* pKey = NULL;
  void* pVal = NULL;
  int   vLen, kLen;

  for (;;) {
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
      metaDebug("tmqsnap get uid info end");
      return 0;
    }

    int64_t* pUid = taosArrayGet(ctx->idList, ctx->index);
    if (pUid == NULL) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    ctx->index++;

    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, pUid, sizeof(tb_uid_t));
    if (idInfo == NULL) {
      metaError("meta/snap: null idInfo");
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }

    int32_t rc = MoveToPosition(ctx, idInfo->version, *pUid);
    if (rc != 0) {
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *pUid,
                idInfo->version);
      continue;
    }

    rc = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
    if (rc != 0) {
      return TAOS_GET_TERRNO(rc);
    }

    SDecoder   dc = {0};
    SMetaEntry me = {0};
    tDecoderInit(&dc, pVal, vLen);
    rc = metaDecodeEntry(&dc, &me);
    if (rc != 0) {
      tDecoderClear(&dc);
      return TAOS_GET_TERRNO(rc);
    }
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);

    bool isDbSub = (ctx->subType == TOPIC_SUB_TYPE__DB);
    bool isWantedChild =
        (me.type == TSDB_CHILD_TABLE) &&
        (isDbSub || (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.ctbEntry.suid == ctx->suid));

    if (isWantedChild) {
      // child tables take the row schema from their cached super table
      STableInfoForChildTable* pStbInfo =
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
      if (pStbInfo == NULL) {
        tDecoderClear(&dc);
        metaError("meta/snap: null data");
        return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      }
      result->suid = me.ctbEntry.suid;
      result->schema = tCloneSSchemaWrapper(pStbInfo->schemaRow);
    } else if (isDbSub && me.type == TSDB_NORMAL_TABLE) {
      result->suid = 0;
      result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
    } else {
      metaDebug("tmqsnap get uid continue");
      tDecoderClear(&dc);
      continue;
    }

    // copy what is needed out of the entry before the decoder is cleared
    result->uid = me.uid;
    tstrncpy(result->tbName, me.name, TSDB_TABLE_NAME_LEN);
    tDecoderClear(&dc);

    if (result->schema == NULL) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    return 0;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc