• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/meta/metaSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17

18
// SMetaSnapReader ========================================
19
struct SMetaSnapReader {
20
  SMeta*  pMeta;
21
  int64_t sver;
22
  int64_t ever;
23
  TBC*    pTbc;
24
  int32_t iLoop;
25
};
26

UNCOV
27
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
×
UNCOV
28
  int32_t          code = 0;
×
29
  int32_t          lino;
UNCOV
30
  int32_t          c = 0;
×
UNCOV
31
  SMetaSnapReader* pReader = NULL;
×
32

33
  // alloc
UNCOV
34
  pReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
×
UNCOV
35
  if (pReader == NULL) {
×
36
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
37
  }
UNCOV
38
  pReader->pMeta = pMeta;
×
UNCOV
39
  pReader->sver = sver;
×
UNCOV
40
  pReader->ever = ever;
×
41

42
  // impl
UNCOV
43
  code = tdbTbcOpen(pMeta->pTbDb, &pReader->pTbc, NULL);
×
UNCOV
44
  TSDB_CHECK_CODE(code, lino, _exit);
×
45

UNCOV
46
  code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
×
UNCOV
47
  TSDB_CHECK_CODE(code, lino, _exit);
×
48

UNCOV
49
_exit:
×
UNCOV
50
  if (code) {
×
51
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
52
    metaSnapReaderClose(&pReader);
×
53
    *ppReader = NULL;
×
54
  } else {
UNCOV
55
    metaInfo("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
×
UNCOV
56
    *ppReader = pReader;
×
57
  }
UNCOV
58
  return code;
×
59
}
60

UNCOV
61
void metaSnapReaderClose(SMetaSnapReader** ppReader) {
×
UNCOV
62
  if (ppReader && *ppReader) {
×
UNCOV
63
    tdbTbcClose((*ppReader)->pTbc);
×
UNCOV
64
    taosMemoryFree(*ppReader);
×
UNCOV
65
    *ppReader = NULL;
×
66
  }
UNCOV
67
}
×
68

69
extern int metaDecodeEntryImpl(SDecoder* pCoder, SMetaEntry* pME, bool headerOnly);
70

UNCOV
71
static int32_t metaDecodeEntryHeader(void* data, int32_t size, SMetaEntry* entry) {
×
UNCOV
72
  SDecoder decoder = {0};
×
UNCOV
73
  tDecoderInit(&decoder, (uint8_t*)data, size);
×
74

UNCOV
75
  int32_t code = metaDecodeEntryImpl(&decoder, entry, true);
×
UNCOV
76
  if (code) {
×
77
    tDecoderClear(&decoder);
×
78
    return code;
×
79
  }
80

UNCOV
81
  tDecoderClear(&decoder);
×
UNCOV
82
  return 0;
×
83
}
84

UNCOV
85
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
×
UNCOV
86
  int32_t     code = 0;
×
UNCOV
87
  const void* pKey = NULL;
×
UNCOV
88
  const void* pData = NULL;
×
UNCOV
89
  int32_t     nKey = 0;
×
UNCOV
90
  int32_t     nData = 0;
×
91
  STbDbKey    key;
92
  int32_t     c;
93

UNCOV
94
  *ppData = NULL;
×
UNCOV
95
  while (pReader->iLoop < 2) {
×
UNCOV
96
    if (tdbTbcGet(pReader->pTbc, &pKey, &nKey, &pData, &nData) != 0 || ((STbDbKey*)pKey)->version > pReader->ever) {
×
UNCOV
97
      pReader->iLoop++;
×
98

99
      // Reopen the cursor to read from the beginning
UNCOV
100
      tdbTbcClose(pReader->pTbc);
×
UNCOV
101
      pReader->pTbc = NULL;
×
UNCOV
102
      code = tdbTbcOpen(pReader->pMeta->pTbDb, &pReader->pTbc, NULL);
×
UNCOV
103
      if (code) {
×
104
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
×
105
                  tstrerror(code));
106
        goto _exit;
×
107
      }
108

UNCOV
109
      code = tdbTbcMoveTo(pReader->pTbc, &(STbDbKey){.version = pReader->sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
×
UNCOV
110
      if (code) {
×
111
        metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
×
112
                  tstrerror(code));
113
        goto _exit;
×
114
      }
115

UNCOV
116
      continue;
×
117
    }
118

119
    // Decode meta entry
UNCOV
120
    SMetaEntry entry = {0};
×
UNCOV
121
    code = metaDecodeEntryHeader((void*)pData, nData, &entry);
×
UNCOV
122
    if (code) {
×
123
      metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pReader->pMeta->pVnode), __func__, __FILE__, __LINE__,
×
124
                tstrerror(code));
125
      goto _exit;
×
126
    }
127

UNCOV
128
    key = ((STbDbKey*)pKey)[0];
×
UNCOV
129
    if (key.version < pReader->sver                                       //
×
UNCOV
130
        || (pReader->iLoop == 0 && TABS(entry.type) != TSDB_SUPER_TABLE)  // First loop send super table entry
×
UNCOV
131
        || (pReader->iLoop == 1 && TABS(entry.type) == TSDB_SUPER_TABLE)  // Second loop send non-super table entry
×
132
    ) {
UNCOV
133
      if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
×
134
        metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
×
135
      }
UNCOV
136
      continue;
×
137
    }
138

UNCOV
139
    if (!pData || !nData) {
×
140
      metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
×
141
      goto _exit;
×
142
    }
143

UNCOV
144
    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
×
UNCOV
145
    if (*ppData == NULL) {
×
146
      code = terrno;
×
147
      goto _exit;
×
148
    }
149

UNCOV
150
    SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
×
UNCOV
151
    pHdr->type = SNAP_DATA_META;
×
UNCOV
152
    pHdr->size = nData;
×
UNCOV
153
    memcpy(pHdr->data, pData, nData);
×
154

UNCOV
155
    metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
×
156
              TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);
157

UNCOV
158
    if (tdbTbcMoveToNext(pReader->pTbc) != 0) {
×
159
      metaTrace("vgId:%d, vnode snapshot meta read data done", TD_VID(pReader->pMeta->pVnode));
×
160
    }
UNCOV
161
    break;
×
162
  }
163

UNCOV
164
_exit:
×
UNCOV
165
  if (code) {
×
166
    metaError("vgId:%d, vnode snapshot meta read data failed since %s", TD_VID(pReader->pMeta->pVnode),
×
167
              tstrerror(code));
168
  }
UNCOV
169
  return code;
×
170
}
171

172
// SMetaSnapWriter ========================================
173
struct SMetaSnapWriter {
174
  SMeta*  pMeta;
175
  int64_t sver;
176
  int64_t ever;
177
};
178

UNCOV
179
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter) {
×
UNCOV
180
  int32_t          code = 0;
×
181
  int32_t          lino;
182
  SMetaSnapWriter* pWriter;
183

184
  // alloc
UNCOV
185
  pWriter = (SMetaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
×
UNCOV
186
  if (pWriter == NULL) {
×
187
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
188
  }
UNCOV
189
  pWriter->pMeta = pMeta;
×
UNCOV
190
  pWriter->sver = sver;
×
UNCOV
191
  pWriter->ever = ever;
×
192

UNCOV
193
  code = metaBegin(pMeta, META_BEGIN_HEAP_NIL);
×
UNCOV
194
  TSDB_CHECK_CODE(code, lino, _exit);
×
195

UNCOV
196
_exit:
×
UNCOV
197
  if (code) {
×
198
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
199
    taosMemoryFree(pWriter);
×
200
    *ppWriter = NULL;
×
201
  } else {
UNCOV
202
    metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
×
UNCOV
203
    *ppWriter = pWriter;
×
204
  }
UNCOV
205
  return code;
×
206
}
207

UNCOV
208
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
×
UNCOV
209
  int32_t          code = 0;
×
UNCOV
210
  SMetaSnapWriter* pWriter = *ppWriter;
×
211

UNCOV
212
  if (rollback) {
×
213
    metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
×
214
    code = metaAbort(pWriter->pMeta);
×
215
    metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
×
216
             code);
217
    if (code) goto _err;
×
218
  } else {
UNCOV
219
    code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
×
UNCOV
220
    if (code) goto _err;
×
UNCOV
221
    code = metaFinishCommit(pWriter->pMeta, pWriter->pMeta->txn);
×
UNCOV
222
    if (code) goto _err;
×
223
  }
UNCOV
224
  taosMemoryFree(pWriter);
×
UNCOV
225
  *ppWriter = NULL;
×
226

UNCOV
227
  return code;
×
228

229
_err:
×
230
  metaError("vgId:%d, meta snapshot writer close failed since %s", TD_VID(pWriter->pMeta->pVnode), tstrerror(code));
×
231
  return code;
×
232
}
233

UNCOV
234
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
×
UNCOV
235
  int32_t    code = 0;
×
UNCOV
236
  int32_t    lino = 0;
×
UNCOV
237
  SMeta*     pMeta = pWriter->pMeta;
×
UNCOV
238
  SMetaEntry metaEntry = {0};
×
UNCOV
239
  SDecoder*  pDecoder = &(SDecoder){0};
×
240

UNCOV
241
  tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
×
UNCOV
242
  code = metaDecodeEntry(pDecoder, &metaEntry);
×
UNCOV
243
  TSDB_CHECK_CODE(code, lino, _exit);
×
244

UNCOV
245
  metaHandleSyncEntry(pMeta, &metaEntry);
×
246

UNCOV
247
_exit:
×
UNCOV
248
  if (code) {
×
249
    metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
250
  }
UNCOV
251
  tDecoderClear(pDecoder);
×
UNCOV
252
  return code;
×
253
}
254

255
typedef struct STableInfoForChildTable {
256
  char*           tableName;
257
  SSchemaWrapper* schemaRow;
258
  SSchemaWrapper* tagRow;
259
} STableInfoForChildTable;
260

UNCOV
261
static void destroySTableInfoForChildTable(void* data) {
×
UNCOV
262
  STableInfoForChildTable* pData = (STableInfoForChildTable*)data;
×
UNCOV
263
  taosMemoryFree(pData->tableName);
×
UNCOV
264
  tDeleteSchemaWrapper(pData->schemaRow);
×
UNCOV
265
  tDeleteSchemaWrapper(pData->tagRow);
×
UNCOV
266
}
×
267

UNCOV
268
static int32_t MoveToSnapShotVersion(SSnapContext* ctx) {
×
UNCOV
269
  int32_t code = 0;
×
UNCOV
270
  tdbTbcClose((TBC*)ctx->pCur);
×
UNCOV
271
  code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
×
UNCOV
272
  if (code != 0) {
×
273
    return TAOS_GET_TERRNO(code);
×
274
  }
UNCOV
275
  STbDbKey key = {.version = ctx->snapVersion, .uid = INT64_MAX};
×
UNCOV
276
  int      c = 0;
×
UNCOV
277
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &key, sizeof(key), &c);
×
UNCOV
278
  if (code != 0) {
×
279
    return TAOS_GET_TERRNO(code);
×
280
  }
UNCOV
281
  if (c < 0) {
×
UNCOV
282
    if (tdbTbcMoveToPrev((TBC*)ctx->pCur) != 0) {
×
283
      metaTrace("vgId:%d, vnode snapshot move to prev failed", TD_VID(ctx->pMeta->pVnode));
×
284
    }
285
  }
UNCOV
286
  return 0;
×
287
}
288

UNCOV
289
static int32_t MoveToPosition(SSnapContext* ctx, int64_t ver, int64_t uid) {
×
UNCOV
290
  tdbTbcClose((TBC*)ctx->pCur);
×
UNCOV
291
  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
×
UNCOV
292
  if (code != 0) {
×
293
    return TAOS_GET_TERRNO(code);
×
294
  }
UNCOV
295
  STbDbKey key = {.version = ver, .uid = uid};
×
UNCOV
296
  int      c = 0;
×
UNCOV
297
  code = tdbTbcMoveTo((TBC*)ctx->pCur, &key, sizeof(key), &c);
×
UNCOV
298
  if (code != 0) {
×
UNCOV
299
    return TAOS_GET_TERRNO(code);
×
300
  }
UNCOV
301
  return c;
×
302
}
303

UNCOV
304
static int32_t MoveToFirst(SSnapContext* ctx) {
×
UNCOV
305
  tdbTbcClose((TBC*)ctx->pCur);
×
UNCOV
306
  int32_t code = tdbTbcOpen(ctx->pMeta->pTbDb, (TBC**)&ctx->pCur, NULL);
×
UNCOV
307
  if (code != 0) {
×
308
    return TAOS_GET_TERRNO(code);
×
309
  }
UNCOV
310
  code = tdbTbcMoveToFirst((TBC*)ctx->pCur);
×
UNCOV
311
  if (code != 0) {
×
312
    return TAOS_GET_TERRNO(code);
×
313
  }
UNCOV
314
  return 0;
×
315
}
316

UNCOV
317
static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) {
×
UNCOV
318
  STableInfoForChildTable* data = (STableInfoForChildTable*)taosHashGet(suidInfo, &me->uid, sizeof(tb_uid_t));
×
UNCOV
319
  if (data) {
×
320
    return 0;
×
321
  }
UNCOV
322
  int32_t                 code = 0;
×
UNCOV
323
  STableInfoForChildTable dataTmp = {0};
×
UNCOV
324
  dataTmp.tableName = taosStrdup(me->name);
×
UNCOV
325
  if (dataTmp.tableName == NULL) {
×
326
    code = terrno;
×
327
    goto END;
×
328
  }
UNCOV
329
  dataTmp.schemaRow = tCloneSSchemaWrapper(&me->stbEntry.schemaRow);
×
UNCOV
330
  if (dataTmp.schemaRow == NULL) {
×
331
    code = TSDB_CODE_OUT_OF_MEMORY;
×
332
    goto END;
×
333
  }
UNCOV
334
  dataTmp.tagRow = tCloneSSchemaWrapper(&me->stbEntry.schemaTag);
×
UNCOV
335
  if (dataTmp.tagRow == NULL) {
×
336
    code = TSDB_CODE_OUT_OF_MEMORY;
×
337
    goto END;
×
338
  }
UNCOV
339
  code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
×
UNCOV
340
  if (code != 0) {
×
341
    goto END;
×
342
  }
UNCOV
343
  return 0;
×
344

345
END:
×
346
  destroySTableInfoForChildTable(&dataTmp);
×
347
  return TAOS_GET_TERRNO(code);
×
348
  ;
349
}
350

UNCOV
351
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
×
352
                         SSnapContext** ctxRet) {
UNCOV
353
  SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
×
UNCOV
354
  if (ctx == NULL) {
×
355
    return terrno;
×
356
  }
UNCOV
357
  *ctxRet = ctx;
×
UNCOV
358
  ctx->pMeta = pVnode->pMeta;
×
UNCOV
359
  ctx->snapVersion = snapVersion;
×
UNCOV
360
  ctx->suid = suid;
×
UNCOV
361
  ctx->subType = subType;
×
UNCOV
362
  ctx->queryMeta = withMeta;
×
UNCOV
363
  ctx->withMeta = withMeta;
×
UNCOV
364
  ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
UNCOV
365
  if (ctx->idVersion == NULL) {
×
366
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
367
  }
368

UNCOV
369
  ctx->suidInfo = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
UNCOV
370
  if (ctx->suidInfo == NULL) {
×
371
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
372
  }
UNCOV
373
  taosHashSetFreeFp(ctx->suidInfo, destroySTableInfoForChildTable);
×
374

UNCOV
375
  ctx->index = 0;
×
UNCOV
376
  ctx->idList = taosArrayInit(100, sizeof(int64_t));
×
UNCOV
377
  if (ctx->idList == NULL) {
×
378
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
379
    ;
380
  }
UNCOV
381
  void* pKey = NULL;
×
UNCOV
382
  void* pVal = NULL;
×
UNCOV
383
  int   vLen = 0, kLen = 0;
×
384

UNCOV
385
  metaDebug("tmqsnap init snapVersion:%" PRIi64, ctx->snapVersion);
×
UNCOV
386
  int32_t code = MoveToFirst(ctx);
×
UNCOV
387
  if (code != 0) {
×
388
    return code;
×
389
  }
UNCOV
390
  while (1) {
×
UNCOV
391
    int32_t ret = tdbTbcNext((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
×
UNCOV
392
    if (ret < 0) break;
×
UNCOV
393
    STbDbKey* tmp = (STbDbKey*)pKey;
×
UNCOV
394
    if (tmp->version > ctx->snapVersion) break;
×
395

UNCOV
396
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
×
UNCOV
397
    if (idData) {
×
UNCOV
398
      continue;
×
399
    }
400

UNCOV
401
    if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
×
402
        0) {  // check if table exist for now, need optimize later
UNCOV
403
      continue;
×
404
    }
405

UNCOV
406
    SDecoder   dc = {0};
×
UNCOV
407
    SMetaEntry me = {0};
×
UNCOV
408
    tDecoderInit(&dc, pVal, vLen);
×
UNCOV
409
    ret = metaDecodeEntry(&dc, &me);
×
UNCOV
410
    if (ret < 0) {
×
411
      tDecoderClear(&dc);
×
412
      return TAOS_GET_TERRNO(ret);
×
413
    }
UNCOV
414
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
×
UNCOV
415
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
×
UNCOV
416
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
×
UNCOV
417
        tDecoderClear(&dc);
×
UNCOV
418
        continue;
×
419
      }
420
    }
421

UNCOV
422
    if (taosArrayPush(ctx->idList, &tmp->uid) == NULL) {
×
423
      tDecoderClear(&dc);
×
424
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
425
    }
UNCOV
426
    metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
×
UNCOV
427
    tDecoderClear(&dc);
×
428

UNCOV
429
    SIdInfo info = {0};
×
UNCOV
430
    if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
×
431
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
432
    }
433
  }
UNCOV
434
  taosHashClear(ctx->idVersion);
×
435

UNCOV
436
  code = MoveToSnapShotVersion(ctx);
×
UNCOV
437
  if (code != 0) {
×
438
    return code;
×
439
  }
UNCOV
440
  while (1) {
×
UNCOV
441
    int32_t ret = tdbTbcPrev((TBC*)ctx->pCur, &pKey, &kLen, &pVal, &vLen);
×
UNCOV
442
    if (ret < 0) break;
×
443

UNCOV
444
    STbDbKey* tmp = (STbDbKey*)pKey;
×
UNCOV
445
    SIdInfo*  idData = (SIdInfo*)taosHashGet(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t));
×
UNCOV
446
    if (idData) {
×
UNCOV
447
      continue;
×
448
    }
UNCOV
449
    SIdInfo info = {.version = tmp->version, .index = 0};
×
UNCOV
450
    ret = taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo));
×
UNCOV
451
    if (ret != 0) {
×
452
      return TAOS_GET_TERRNO(ret);
×
453
    }
454

UNCOV
455
    SDecoder   dc = {0};
×
UNCOV
456
    SMetaEntry me = {0};
×
UNCOV
457
    tDecoderInit(&dc, pVal, vLen);
×
UNCOV
458
    ret = metaDecodeEntry(&dc, &me);
×
UNCOV
459
    if (ret < 0) {
×
460
      tDecoderClear(&dc);
×
461
      return TAOS_GET_TERRNO(ret);
×
462
    }
463

UNCOV
464
    if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
×
UNCOV
465
      if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
×
UNCOV
466
          (me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
×
UNCOV
467
        tDecoderClear(&dc);
×
UNCOV
468
        continue;
×
469
      }
470
    }
471

UNCOV
472
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
×
UNCOV
473
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
×
UNCOV
474
      ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
×
UNCOV
475
      if (ret != 0) {
×
476
        tDecoderClear(&dc);
×
477
        return ret;
×
478
      }
479
    }
UNCOV
480
    tDecoderClear(&dc);
×
481
  }
482

UNCOV
483
  for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
×
UNCOV
484
    int64_t* uid = taosArrayGet(ctx->idList, i);
×
UNCOV
485
    if (uid == NULL) {
×
486
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
487
    }
UNCOV
488
    SIdInfo* idData = (SIdInfo*)taosHashGet(ctx->idVersion, uid, sizeof(int64_t));
×
UNCOV
489
    if (!idData) {
×
490
      metaError("meta/snap: null idData");
×
491
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
492
    }
493

UNCOV
494
    idData->index = i;
×
UNCOV
495
    metaDebug("tmqsnap init idVersion uid:%" PRIi64 " version:%" PRIi64 " index:%d", *uid, idData->version,
×
496
              idData->index);
497
  }
498

UNCOV
499
  tdbFree(pKey);
×
UNCOV
500
  tdbFree(pVal);
×
UNCOV
501
  return TDB_CODE_SUCCESS;
×
502
}
503

UNCOV
504
void destroySnapContext(SSnapContext* ctx) {
×
UNCOV
505
  tdbTbcClose((TBC*)ctx->pCur);
×
UNCOV
506
  taosArrayDestroy(ctx->idList);
×
UNCOV
507
  taosHashCleanup(ctx->idVersion);
×
UNCOV
508
  taosHashCleanup(ctx->suidInfo);
×
UNCOV
509
  taosMemoryFree(ctx);
×
UNCOV
510
}
×
511

UNCOV
512
static int32_t buildNormalChildTableInfo(SVCreateTbReq* req, void** pBuf, int32_t* contLen) {
×
UNCOV
513
  int32_t            ret = 0;
×
UNCOV
514
  SVCreateTbBatchReq reqs = {0};
×
515

UNCOV
516
  reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
×
UNCOV
517
  if (NULL == reqs.pArray) {
×
518
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
519
    goto end;
×
520
  }
UNCOV
521
  if (taosArrayPush(reqs.pArray, req) == NULL) {
×
522
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
523
    goto end;
×
524
  }
UNCOV
525
  reqs.nReqs = 1;
×
526

UNCOV
527
  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
×
UNCOV
528
  if (ret < 0) {
×
529
    ret = TAOS_GET_TERRNO(ret);
×
530
    goto end;
×
531
  }
UNCOV
532
  *contLen += sizeof(SMsgHead);
×
UNCOV
533
  *pBuf = taosMemoryMalloc(*contLen);
×
UNCOV
534
  if (NULL == *pBuf) {
×
535
    ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
536
    goto end;
×
537
  }
UNCOV
538
  SEncoder coder = {0};
×
UNCOV
539
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
×
UNCOV
540
  ret = tEncodeSVCreateTbBatchReq(&coder, &reqs);
×
UNCOV
541
  tEncoderClear(&coder);
×
542

UNCOV
543
  if (ret < 0) {
×
544
    taosMemoryFreeClear(*pBuf);
×
545
    ret = TAOS_GET_TERRNO(ret);
×
546
    goto end;
×
547
  }
548

UNCOV
549
end:
×
UNCOV
550
  taosArrayDestroy(reqs.pArray);
×
UNCOV
551
  return ret;
×
552
}
553

UNCOV
554
static int32_t buildSuperTableInfo(SVCreateStbReq* req, void** pBuf, int32_t* contLen) {
×
UNCOV
555
  int32_t ret = 0;
×
UNCOV
556
  tEncodeSize(tEncodeSVCreateStbReq, req, *contLen, ret);
×
UNCOV
557
  if (ret < 0) {
×
558
    return TAOS_GET_TERRNO(ret);
×
559
  }
560

UNCOV
561
  *contLen += sizeof(SMsgHead);
×
UNCOV
562
  *pBuf = taosMemoryMalloc(*contLen);
×
UNCOV
563
  if (NULL == *pBuf) {
×
564
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
565
  }
566

UNCOV
567
  SEncoder encoder = {0};
×
UNCOV
568
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *contLen);
×
UNCOV
569
  ret = tEncodeSVCreateStbReq(&encoder, req);
×
UNCOV
570
  tEncoderClear(&encoder);
×
UNCOV
571
  if (ret < 0) {
×
572
    taosMemoryFreeClear(*pBuf);
×
573
    return TAOS_GET_TERRNO(ret);
×
574
  }
UNCOV
575
  return 0;
×
576
}
577

UNCOV
578
int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) {
×
UNCOV
579
  if (uid == 0) {
×
UNCOV
580
    ctx->index = 0;
×
UNCOV
581
    return 0;
×
582
  }
583

UNCOV
584
  SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, &uid, sizeof(tb_uid_t));
×
UNCOV
585
  if (idInfo == NULL) {
×
UNCOV
586
    return terrno;
×
587
  }
588

UNCOV
589
  ctx->index = idInfo->index;
×
590

UNCOV
591
  return 0;
×
592
}
593

UNCOV
594
void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) {
×
UNCOV
595
  bool            ret = false;
×
UNCOV
596
  SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1);
×
UNCOV
597
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
×
598
    ret = true;
×
599
  }
600
  tDeleteSchemaWrapper(schema);
UNCOV
601
  ctx->hasPrimaryKey = ret;
×
UNCOV
602
}
×
603

UNCOV
604
bool taosXGetTablePrimaryKey(SSnapContext* ctx) { return ctx->hasPrimaryKey; }
×
605

UNCOV
606
int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid) {
×
UNCOV
607
  int32_t ret = 0;
×
UNCOV
608
  void*   pKey = NULL;
×
UNCOV
609
  void*   pVal = NULL;
×
UNCOV
610
  int     vLen = 0, kLen = 0;
×
611

612
  while (1) {
×
UNCOV
613
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
×
UNCOV
614
      metaDebug("tmqsnap get meta end");
×
UNCOV
615
      ctx->index = 0;
×
UNCOV
616
      ctx->queryMeta = 0;  // change to get data
×
UNCOV
617
      return 0;
×
618
    }
619

UNCOV
620
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
×
UNCOV
621
    if (uidTmp == NULL) {
×
622
      metaError("tmqsnap get meta null uid");
×
623
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
624
    }
UNCOV
625
    ctx->index++;
×
UNCOV
626
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
×
UNCOV
627
    if (!idInfo) {
×
628
      metaError("meta/snap: null idInfo");
×
629
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
630
    }
631

UNCOV
632
    *uid = *uidTmp;
×
UNCOV
633
    ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
×
UNCOV
634
    if (ret == 0) {
×
UNCOV
635
      break;
×
636
    }
637
    metaDebug("tmqsnap get meta not exist uid:%" PRIi64 " version:%" PRIi64, *uid, idInfo->version);
×
638
  }
639

UNCOV
640
  ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
×
UNCOV
641
  if (ret < 0) {
×
642
    return TAOS_GET_TERRNO(ret);
×
643
  }
UNCOV
644
  SDecoder   dc = {0};
×
UNCOV
645
  SMetaEntry me = {0};
×
UNCOV
646
  tDecoderInit(&dc, pVal, vLen);
×
UNCOV
647
  ret = metaDecodeEntry(&dc, &me);
×
UNCOV
648
  if (ret < 0) {
×
649
    tDecoderClear(&dc);
×
650
    ret = TAOS_GET_TERRNO(ret);
×
651
    goto END;
×
652
  }
UNCOV
653
  metaDebug("tmqsnap get meta uid:%" PRIi64 " name:%s index:%d", *uid, me.name, ctx->index - 1);
×
654

UNCOV
655
  if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_SUPER_TABLE) ||
×
UNCOV
656
      (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
×
UNCOV
657
    SVCreateStbReq req = {0};
×
UNCOV
658
    req.name = me.name;
×
UNCOV
659
    req.suid = me.uid;
×
UNCOV
660
    req.schemaRow = me.stbEntry.schemaRow;
×
UNCOV
661
    req.schemaTag = me.stbEntry.schemaTag;
×
UNCOV
662
    req.schemaRow.version = 1;
×
UNCOV
663
    req.schemaTag.version = 1;
×
UNCOV
664
    req.colCmpr = me.colCmpr;
×
665

UNCOV
666
    ret = buildSuperTableInfo(&req, pBuf, contLen);
×
UNCOV
667
    *type = TDMT_VND_CREATE_STB;
×
UNCOV
668
  } else if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
×
UNCOV
669
             (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
×
670
    STableInfoForChildTable* data =
UNCOV
671
        (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
×
UNCOV
672
    if (!data) {
×
673
      metaError("meta/snap: null data");
×
674
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
675
      goto END;
×
676
    }
677

UNCOV
678
    SVCreateTbReq req = {0};
×
679

UNCOV
680
    req.type = TSDB_CHILD_TABLE;
×
UNCOV
681
    req.name = me.name;
×
UNCOV
682
    req.uid = me.uid;
×
UNCOV
683
    req.commentLen = -1;
×
UNCOV
684
    req.ctb.suid = me.ctbEntry.suid;
×
UNCOV
685
    req.ctb.tagNum = data->tagRow->nCols;
×
UNCOV
686
    req.ctb.stbName = data->tableName;
×
687

UNCOV
688
    SArray* tagName = taosArrayInit(req.ctb.tagNum, TSDB_COL_NAME_LEN);
×
UNCOV
689
    if (tagName == NULL) {
×
690
      metaError("meta/snap: init tag name failed.");
×
691
      ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
692
      goto END;
×
693
    }
UNCOV
694
    STag* p = (STag*)me.ctbEntry.pTags;
×
UNCOV
695
    if (tTagIsJson(p)) {
×
UNCOV
696
      if (p->nTag != 0) {
×
UNCOV
697
        SSchema* schema = &data->tagRow->pSchema[0];
×
UNCOV
698
        if (taosArrayPush(tagName, schema->name) == NULL) {
×
699
          ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
700
          taosArrayDestroy(tagName);
×
701
          goto END;
×
702
        }
703
      }
704
    } else {
UNCOV
705
      SArray* pTagVals = NULL;
×
UNCOV
706
      ret = tTagToValArray((const STag*)p, &pTagVals);
×
UNCOV
707
      if (ret != 0) {
×
708
        metaError("meta/snap: tag to val array failed.");
×
709
        taosArrayDestroy(pTagVals);
×
710
        taosArrayDestroy(tagName);
×
711
        goto END;
×
712
      }
UNCOV
713
      int16_t nCols = taosArrayGetSize(pTagVals);
×
UNCOV
714
      for (int j = 0; j < nCols; ++j) {
×
UNCOV
715
        STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
×
UNCOV
716
        for (int i = 0; pTagVal && i < data->tagRow->nCols; i++) {
×
UNCOV
717
          SSchema* schema = &data->tagRow->pSchema[i];
×
UNCOV
718
          if (schema->colId == pTagVal->cid) {
×
UNCOV
719
            if (taosArrayPush(tagName, schema->name) == NULL) {
×
720
              ret = TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
721
              taosArrayDestroy(pTagVals);
×
722
              taosArrayDestroy(tagName);
×
723
              goto END;
×
724
            }
725
          }
726
        }
727
      }
UNCOV
728
      taosArrayDestroy(pTagVals);
×
729
    }
UNCOV
730
    req.ctb.pTag = me.ctbEntry.pTags;
×
UNCOV
731
    req.ctb.tagName = tagName;
×
UNCOV
732
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
×
UNCOV
733
    *type = TDMT_VND_CREATE_TABLE;
×
UNCOV
734
    taosArrayDestroy(tagName);
×
UNCOV
735
  } else if (ctx->subType == TOPIC_SUB_TYPE__DB) {
×
UNCOV
736
    SVCreateTbReq req = {0};
×
UNCOV
737
    req.type = TSDB_NORMAL_TABLE;
×
UNCOV
738
    req.name = me.name;
×
UNCOV
739
    req.uid = me.uid;
×
UNCOV
740
    req.commentLen = -1;
×
UNCOV
741
    req.ntb.schemaRow = me.ntbEntry.schemaRow;
×
UNCOV
742
    req.colCmpr = me.colCmpr;
×
UNCOV
743
    ret = buildNormalChildTableInfo(&req, pBuf, contLen);
×
UNCOV
744
    *type = TDMT_VND_CREATE_TABLE;
×
745
  } else {
746
    metaError("meta/snap: invalid topic sub type: %" PRId8 " get meta from snap failed.", ctx->subType);
×
747
    ret = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
×
748
  }
749

UNCOV
750
END:
×
UNCOV
751
  tDecoderClear(&dc);
×
UNCOV
752
  return ret;
×
753
}
754

UNCOV
755
int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) {
×
UNCOV
756
  void* pKey = NULL;
×
UNCOV
757
  void* pVal = NULL;
×
758
  int   vLen, kLen;
759

UNCOV
760
  while (1) {
×
UNCOV
761
    if (ctx->index >= taosArrayGetSize(ctx->idList)) {
×
UNCOV
762
      metaDebug("tmqsnap get uid info end");
×
UNCOV
763
      return 0;
×
764
    }
UNCOV
765
    int64_t* uidTmp = taosArrayGet(ctx->idList, ctx->index);
×
UNCOV
766
    if (uidTmp == NULL) {
×
767
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
768
    }
UNCOV
769
    ctx->index++;
×
UNCOV
770
    SIdInfo* idInfo = (SIdInfo*)taosHashGet(ctx->idVersion, uidTmp, sizeof(tb_uid_t));
×
UNCOV
771
    if (!idInfo) {
×
772
      metaError("meta/snap: null idInfo");
×
773
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
774
    }
775

UNCOV
776
    int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
×
UNCOV
777
    if (ret != 0) {
×
778
      metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp,
×
779
                idInfo->version);
UNCOV
780
      continue;
×
781
    }
UNCOV
782
    ret = tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
×
UNCOV
783
    if (ret != 0) {
×
UNCOV
784
      return TAOS_GET_TERRNO(ret);
×
785
    }
UNCOV
786
    SDecoder   dc = {0};
×
UNCOV
787
    SMetaEntry me = {0};
×
UNCOV
788
    tDecoderInit(&dc, pVal, vLen);
×
UNCOV
789
    ret = metaDecodeEntry(&dc, &me);
×
UNCOV
790
    if (ret != 0) {
×
791
      tDecoderClear(&dc);
×
792
      return TAOS_GET_TERRNO(ret);
×
793
    }
UNCOV
794
    metaDebug("tmqsnap get uid info uid:%" PRIi64 " name:%s index:%d", me.uid, me.name, ctx->index - 1);
×
795

UNCOV
796
    if ((ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_CHILD_TABLE) ||
×
UNCOV
797
        (ctx->subType == TOPIC_SUB_TYPE__TABLE && me.type == TSDB_CHILD_TABLE && me.ctbEntry.suid == ctx->suid)) {
×
798
      STableInfoForChildTable* data =
UNCOV
799
          (STableInfoForChildTable*)taosHashGet(ctx->suidInfo, &me.ctbEntry.suid, sizeof(tb_uid_t));
×
UNCOV
800
      if (data == NULL) {
×
801
        tDecoderClear(&dc);
×
802
        metaError("meta/snap: null data");
×
803
        return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
804
      }
UNCOV
805
      result->suid = me.ctbEntry.suid;
×
UNCOV
806
      result->schema = tCloneSSchemaWrapper(data->schemaRow);
×
UNCOV
807
    } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) {
×
UNCOV
808
      result->suid = 0;
×
UNCOV
809
      result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
×
810
    } else {
UNCOV
811
      metaDebug("tmqsnap get uid continue");
×
UNCOV
812
      tDecoderClear(&dc);
×
UNCOV
813
      continue;
×
814
    }
UNCOV
815
    result->uid = me.uid;
×
UNCOV
816
    tstrncpy(result->tbName, me.name, TSDB_TABLE_NAME_LEN);
×
UNCOV
817
    tDecoderClear(&dc);
×
UNCOV
818
    if (result->schema == NULL) {
×
819
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
820
    }
UNCOV
821
    break;
×
822
  }
UNCOV
823
  return 0;
×
824
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc