• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3563

21 Dec 2024 05:37AM UTC coverage: 61.045% (+34.4%) from 26.655%
#3563

push

travis-ci

web-flow
Merge pull request #29256 from taosdata/merge/mainto3.0

merge: from main t 3.0

135730 of 287838 branches covered (47.15%)

Branch coverage included in aggregate %.

9 of 28 new or added lines in 5 files covered. (32.14%)

784 existing lines in 21 files now uncovered.

213302 of 283921 relevant lines covered (75.13%)

9176355.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "tsdbDataFileRW.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSetRW.h"
20
#include "tsdbIter.h"
21
#include "tsdbSttFileRW.h"
22

23
// Prototype only; no definition appears in the visible part of this file.
// NOTE(review): no call site is visible in this section either -- confirm it is still needed.
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
24

25
// STsdbSnapReader ========================================
26
struct STsdbSnapReader {
27
  STsdb*  tsdb;
28
  int64_t sver;
29
  int64_t ever;
30
  int8_t  type;
31

32
  SBuffer  buffers[10];
33
  SSkmInfo skmTb[1];
34

35
  TFileSetRangeArray* fsrArr;
36

37
  // context
38
  struct {
39
    int32_t         fsrArrIdx;
40
    STFileSetRange* fsr;
41
    bool            isDataDone;
42
    bool            isTombDone;
43
  } ctx[1];
44

45
  // reader
46
  SDataFileReader*    dataReader;
47
  TSttFileReaderArray sttReaderArr[1];
48

49
  // iter
50
  TTsdbIterArray dataIterArr[1];
51
  SIterMerger*   dataIterMerger;
52
  TTsdbIterArray tombIterArr[1];
53
  SIterMerger*   tombIterMerger;
54

55
  // data
56
  SBlockData blockData[1];
57
  STombBlock tombBlock[1];
58
};
59

60
// Close every file reader opened for the current range: the stt file readers
// first, then the data file reader.
static void tsdbSnapReadFileSetCloseReader(STsdbSnapReader* reader) {
  TARRAY2_CLEAR(reader->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader->dataReader);
}
65

66
// Open the file readers for the current range (reader->ctx->fsr):
//   - one data file reader, only if the file set has at least one non-NULL farr entry;
//   - one stt file reader per file object of every stt level.
// Returns 0 on success. On failure every reader opened so far is closed again and
// the error code is returned.
static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
  int32_t         code = 0;
  int32_t         lino = 0;
  STsdb*          tsdb = reader->tsdb;
  STFileSetRange* fsr = reader->ctx->fsr;

  // data file reader
  SDataFileReaderConfig dataConfig = {
      .tsdb = tsdb,
      .szPage = tsdb->pVnode->config.tsdbPageSize,
      .buffers = reader->buffers,
  };
  bool hasDataFile = false;
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
    if (fsr->fset->farr[ftype] == NULL) {
      continue;
    }
    hasDataFile = true;
    dataConfig.files[ftype].exist = true;
    dataConfig.files[ftype].file = fsr->fset->farr[ftype]->f[0];
  }

  if (hasDataFile) {
    code = tsdbDataFileReaderOpen(NULL, &dataConfig, &reader->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // stt file readers
  SSttLvl* lvl;
  TARRAY2_FOREACH(fsr->fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      SSttFileReader*      sttReader;
      SSttFileReaderConfig sttConfig = {
          .tsdb = tsdb,
          .szPage = tsdb->pVnode->config.tsdbPageSize,
          .file = fobj->f[0],
          .buffers = reader->buffers,
      };

      code = tsdbSttFileReaderOpen(fobj->fname, &sttConfig, &sttReader);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(reader->sttReaderArr, sttReader);
      if (code) {
        // the reader never made it into the array, so it has to be closed here
        tsdbSttFileReaderClose(&sttReader);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbSnapReadFileSetCloseReader(reader);
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), code, lino);
  }
  return code;
}
120

121
// Build the iterators and mergers for the current range.
//
// For the data file reader (if one is open) and for every stt file reader, one
// data iterator and one tombstone iterator are opened, all filtered by the version
// range of reader->ctx->fsr. Data iterators are collected in dataIterArr, tombstone
// iterators in tombIterArr, and a merger is opened over each array.
//
// Returns 0 on success, otherwise the first error code. An iterator that was opened
// but could not be appended to its array is closed immediately, because nothing else
// would reference it afterwards. Iterators already stored in the arrays are left for
// tsdbSnapReadFileSetCloseIter() / tsdbSnapReaderClose().
static int32_t tsdbSnapReadFileSetOpenIter(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  STsdbIter*      iter;
  STsdbIterConfig config = {
      .filterByVersion = true,
      .verRange[0] = reader->ctx->fsr->sver,
      .verRange[1] = reader->ctx->fsr->ever,
  };

  // data file
  if (reader->dataReader) {
    // data
    config.type = TSDB_ITER_TYPE_DATA;
    config.dataReader = reader->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
    config.dataReader = reader->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // stt files
  SSttFileReader* sttReader;
  TARRAY2_FOREACH(reader->sttReaderArr, sttReader) {
    // data
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(reader->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // mergers
  code = tsdbIterMergerOpen(reader->dataIterArr, &reader->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(reader->tombIterArr, &reader->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
192

193
// Tear down what tsdbSnapReadFileSetOpenIter() built: both mergers first, then
// every iterator held in the data and tombstone iterator arrays.
static void tsdbSnapReadFileSetCloseIter(STsdbSnapReader* reader) {
  tsdbIterMergerClose(&reader->dataIterMerger);
  tsdbIterMergerClose(&reader->tombIterMerger);

  TARRAY2_CLEAR(reader->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(reader->tombIterArr, tsdbIterClose);
}
200

201
// Advance to the next entry of reader->fsrArr, if there is one, and open its file
// readers and iterators. When the array is exhausted nothing is changed and 0 is
// returned, so the caller can detect the end by reader->ctx->fsr still being NULL.
static int32_t tsdbSnapReadRangeBegin(STsdbSnapReader* reader) {
  int32_t code = 0;
  int32_t lino = 0;

  if (reader->ctx->fsrArrIdx >= TARRAY2_SIZE(reader->fsrArr)) {
    return code;  // no more ranges
  }

  reader->ctx->fsr = TARRAY2_GET(reader->fsrArr, reader->ctx->fsrArrIdx++);
  reader->ctx->isDataDone = false;
  reader->ctx->isTombDone = false;

  code = tsdbSnapReadFileSetOpenReader(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapReadFileSetOpenIter(reader);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
223

224
// Finish the current range: close its iterators, then its file readers, and mark
// that no range is open any more. Always returns 0.
static int32_t tsdbSnapReadRangeEnd(STsdbSnapReader* reader) {
  tsdbSnapReadFileSetCloseIter(reader);
  tsdbSnapReadFileSetCloseReader(reader);

  reader->ctx->fsr = NULL;

  return 0;
}
230

231
// Turn reader->blockData into a snapshot payload.
//
// The block is run through tBlockDataCompress() with NO_COMPRESSION as the default
// algorithm. The contents of reader->buffers[0..3] are then copied, back to back,
// behind an SSnapDataHdr into a buffer obtained from taosMemoryMalloc().
//
// reader: snapshot reader whose blockData is to be emitted
// data:   receives the allocated payload (NULL if the allocation failed)
// Returns 0 on success, otherwise an error code.
static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  SColCompressInfo cmprInfo = {.pColCmpr = NULL, .defaultCmprAlg = NO_COMPRESSION};
  // NOTE(review): buffers + 4 is presumably scratch space for the compressor -- confirm.
  code = tBlockDataCompress(reader->blockData, (void*)&cmprInfo, reader->buffers, reader->buffers + 4);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t size = 0;
  for (int i = 0; i < 4; i++) {
    size += reader->buffers[i].size;
  }

  *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
  if (*data == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
  uint8_t*      pBuf = pHdr->data;

  pHdr->type = reader->type;
  pHdr->size = size;
  for (int i = 0; i < 4; i++) {
    memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
    pBuf += reader->buffers[i].size;
  }

_exit:
  if (code) {
    // NOTE(review): most reader-side functions in this file pass (code, lino) here;
    // this call passes (lino, code). Confirm against the TSDB_ERROR_LOG definition.
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), lino, code);
  }
  return code;
}
267

268
// Sum of nData over all column data entries of the block.
static int64_t tBlockDataSize(SBlockData* pBlockData) {
  int64_t total = 0;
  int32_t iCol = 0;

  while (iCol < pBlockData->nColData) {
    SColData* col = tBlockDataGetColDataByIdx(pBlockData, iCol);
    total += col->nData;
    iCol++;
  }

  return total;
}
276

277
// Collect the next batch of rows from the data merger into reader->blockData and,
// if at least one row was collected, emit it through tsdbSnapCmprData().
//
// A batch ends when the merger is exhausted, when the next row no longer satisfies
// TABLE_SAME_SCHEMA against the block, or when (checked every 16 rows) the column
// data reaches TSDB_SNAP_DATA_PAYLOAD_SIZE. Rows of tables for which metaGetInfo()
// fails are skipped as a whole.
//
// data: left untouched when no row was collected.
// Returns 0 on success, otherwise an error code.
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaInfo   info;
  SBlockData* block = reader->blockData;
  TABLEID     curTb = {0};  // table of the most recently inspected row
  SRowInfo*   row;

  tBlockDataReset(block);

  while ((row = tsdbIterMergerGetData(reader->dataIterMerger)) != NULL) {
    // skip dropped table
    if (row->uid != curTb.uid) {
      curTb.suid = row->suid;
      curTb.uid = row->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, curTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->dataIterMerger, &curTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    // first row of the batch: load the schema and initialize the block
    if (block->suid == 0 && block->uid == 0) {
      code = tsdbUpdateSkmTb(reader->tsdb, (TABLEID*)row, reader->skmTb);
      TSDB_CHECK_CODE(code, lino, _exit);

      TABLEID blockTb = {
          .suid = row->suid,
          .uid = row->suid ? 0 : row->uid,
      };
      code = tBlockDataInit(block, &blockTb, reader->skmTb->pTSchema, NULL, 0);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    if (!TABLE_SAME_SCHEMA(block->suid, block->uid, row->suid, row->uid)) {
      break;
    }

    code = tBlockDataAppendRow(block, &row->row, NULL, row->uid);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    // the size check is only done every 16 rows
    if (block->nRow % 16 == 0 && tBlockDataSize(block) >= TSDB_SNAP_DATA_PAYLOAD_SIZE) {
      break;
    }
  }

  if (block->nRow > 0) {
    code = tsdbSnapCmprData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
338

UNCOV
339
// Pack reader->tombBlock into a snapshot payload of type SNAP_DATA_DEL: an
// SSnapDataHdr followed by the contents of every buffer of the tomb block, in
// order. The payload comes from taosMemoryMalloc() and is stored in data[0].
// Returns 0 on success, otherwise an error code.
static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  // total size of all tomb block buffers
  int64_t payloadSize = 0;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    payloadSize += reader->tombBlock->buffers[i].size;
  }

  *data = taosMemoryMalloc(payloadSize + sizeof(SSnapDataHdr));
  if (*data == NULL) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  SSnapDataHdr* hdr = (SSnapDataHdr*)(*data);
  hdr->type = SNAP_DATA_DEL;
  hdr->size = payloadSize;

  uint8_t* cursor = hdr->data;
  for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
    memcpy(cursor, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size);
    cursor += reader->tombBlock->buffers[i].size;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
370

UNCOV
371
// Collect the next batch of tombstone records from the tomb merger into
// reader->tombBlock and, if the block is non-empty, emit it through
// tsdbSnapCmprTombData().
//
// A batch ends when the merger is exhausted or TOMB_BLOCK_SIZE reaches 81920.
// Records of tables for which metaGetInfo() fails are skipped as a whole.
//
// data: left untouched when no record was collected.
// Returns 0 on success, otherwise an error code.
static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SMetaInfo    info;
  TABLEID      curTb = {0};  // table of the most recently inspected record
  STombRecord* record;

  tTombBlockClear(reader->tombBlock);

  while ((record = tsdbIterMergerGetTombRecord(reader->tombIterMerger)) != NULL) {
    if (record->uid != curTb.uid) {
      curTb.suid = record->suid;
      curTb.uid = record->uid;
      if (metaGetInfo(reader->tsdb->pVnode->pMeta, curTb.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(reader->tombIterMerger, &curTb);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;
      }
    }

    code = tTombBlockPut(reader->tombBlock, record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(reader->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
      break;
    }
  }

  if (TOMB_BLOCK_SIZE(reader->tombBlock) > 0) {
    code = tsdbSnapCmprTombData(reader, data);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  }
  return code;
}
412

UNCOV
413
int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
×
414
                           STsdbSnapReader** reader) {
UNCOV
415
  int32_t code = 0;
×
UNCOV
416
  int32_t lino = 0;
×
417

UNCOV
418
  reader[0] = (STsdbSnapReader*)taosMemoryCalloc(1, sizeof(*reader[0]));
×
UNCOV
419
  if (reader[0] == NULL) return terrno;
×
420

UNCOV
421
  reader[0]->tsdb = tsdb;
×
UNCOV
422
  reader[0]->sver = sver;
×
UNCOV
423
  reader[0]->ever = ever;
×
UNCOV
424
  reader[0]->type = type;
×
425

UNCOV
426
  code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr);
×
UNCOV
427
  TSDB_CHECK_CODE(code, lino, _exit);
×
428

UNCOV
429
_exit:
×
UNCOV
430
  if (code) {
×
UNCOV
431
    tsdbError("vgId:%d %s failed at %s:%d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
×
432
              __func__, __FILE__, lino, tstrerror(code), sver, ever, type);
UNCOV
433
    tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr);
×
UNCOV
434
    taosMemoryFree(reader[0]);
×
UNCOV
435
    reader[0] = NULL;
×
436
  } else {
UNCOV
437
    tsdbInfo("vgId:%d, tsdb snapshot incremental reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d",
×
438
             TD_VID(tsdb->pVnode), sver, ever, type);
439
  }
UNCOV
440
  return code;
×
441
}
442

UNCOV
443
// Destroy a snapshot reader and everything it still holds, then set *reader to NULL.
// Does nothing when *reader is already NULL.
void tsdbSnapReaderClose(STsdbSnapReader** reader) {
  if (reader[0] == NULL) {
    return;
  }

  // staged payload data
  tTombBlockDestroy(reader[0]->tombBlock);
  tBlockDataDestroy(reader[0]->blockData);

  // mergers and iterators, then the file readers they were opened on
  tsdbIterMergerClose(&reader[0]->dataIterMerger);
  tsdbIterMergerClose(&reader[0]->tombIterMerger);
  TARRAY2_DESTROY(reader[0]->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&reader[0]->dataReader);

  // ranged file set snapshot and cached table schema
  tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
  tDestroyTSchema(reader[0]->skmTb->pTSchema);

  for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
    tBufferDestroy(reader[0]->buffers + i);
  }

  taosMemoryFree(reader[0]);
  reader[0] = NULL;
}
474

UNCOV
475
// Produce the next snapshot payload.
//
// data[0] is first set to NULL. Ranges are then processed in order: for each one,
// time-series payloads are returned until that data is exhausted, then tombstone
// payloads, then the range is closed and the next one opened. The function returns
// as soon as one payload has been produced.
//
// Returns 0 with data[0] != NULL when a payload was produced, 0 with data[0] == NULL
// when all ranges are exhausted, or an error code.
int32_t tsdbSnapRead(STsdbSnapReader* reader, uint8_t** data) {
  int32_t code = 0;
  int32_t lino = 0;

  *data = NULL;

  while (true) {
    // open the next range if none is active; stop when there is none left
    if (reader->ctx->fsr == NULL) {
      code = tsdbSnapReadRangeBegin(reader);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (reader->ctx->fsr == NULL) {
        break;
      }
    }

    if (!reader->ctx->isDataDone) {
      code = tsdbSnapReadTimeSeriesData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (*data != NULL) {
        goto _exit;
      }
      reader->ctx->isDataDone = true;
    }

    if (!reader->ctx->isTombDone) {
      code = tsdbSnapReadTombData(reader, data);
      TSDB_CHECK_CODE(code, lino, _exit);
      if (*data != NULL) {
        goto _exit;
      }
      reader->ctx->isTombDone = true;
    }

    // both kinds of data are exhausted for this range
    code = tsdbSnapReadRangeEnd(reader);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
  }
  return code;
}
523

524
// STsdbSnapWriter ========================================
525
struct STsdbSnapWriter {
526
  STsdb*  tsdb;
527
  int64_t sver;
528
  int64_t ever;
529
  int32_t minutes;
530
  int8_t  precision;
531
  int32_t minRow;
532
  int32_t maxRow;
533
  int8_t  cmprAlg;
534
  int64_t commitID;
535
  int32_t szPage;
536
  int64_t compactVersion;
537
  int64_t now;
538
  SBuffer buffers[10];
539

540
  TFileSetArray* fsetArr;
541
  TFileOpArray   fopArr[1];
542

543
  struct {
544
    bool       fsetWriteBegin;
545
    int32_t    fid;
546
    STFileSet* fset;
547
    SDiskID    did;
548
    bool       hasData;  // if have time series data
549
    bool       hasTomb;  // if have tomb data
550

551
    // reader
552
    SDataFileReader*    dataReader;
553
    TSttFileReaderArray sttReaderArr[1];
554

555
    // iter/merger
556
    TTsdbIterArray dataIterArr[1];
557
    SIterMerger*   dataIterMerger;
558
    TTsdbIterArray tombIterArr[1];
559
    SIterMerger*   tombIterMerger;
560

561
    // writer
562
    SFSetWriter* fsetWriter;
563
  } ctx[1];
564
};
565

566
// APIs
UNCOV
567
// Write one incoming row, keeping the output ordered with respect to existing data.
//
// Every existing row (from the data merger) that compares <= `row` under
// tRowInfoCmprFn() is written first; existing rows of tables for which metaGetInfo()
// fails are skipped as a whole. Then `row` itself is written, unless its suid is
// INT64_MAX: such a row is only a sentinel used to flush all remaining existing rows.
//
// Returns 0 on success, otherwise an error code.
static int32_t tsdbSnapWriteTimeSeriesRow(STsdbSnapWriter* writer, SRowInfo* row) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TABLEID   tbid = {0};
  SMetaInfo info;

  while (writer->ctx->hasData) {
    SRowInfo* existing = tsdbIterMergerGetData(writer->ctx->dataIterMerger);
    if (existing == NULL) {
      // existing data is exhausted
      writer->ctx->hasData = false;
      break;
    }

    if (existing->uid != tbid.uid) {
      tbid.suid = existing->suid;
      tbid.uid = existing->uid;
      if (metaGetInfo(writer->tsdb->pVnode->pMeta, tbid.uid, &info, NULL) != 0) {
        code = tsdbIterMergerSkipTableData(writer->ctx->dataIterMerger, &tbid);
        TSDB_CHECK_CODE(code, lino, _exit);
        continue;  // fetch the first row after the skipped table
      }
    }

    if (tRowInfoCmprFn(existing, row) > 0) {
      break;  // the incoming row goes before this existing row
    }

    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, existing);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->dataIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (row->suid != INT64_MAX) {
    code = tsdbFSetWriteRow(writer->ctx->fsetWriter, row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
620

UNCOV
621
// Open readers over the existing file set (writer->ctx->fset), if there is one.
//
// For every stt file of every level an stt file reader is opened and stored in
// writer->ctx->sttReaderArr, and a TSDB_FOP_REMOVE operation for that file is
// appended to writer->fopArr. The data file reader part is compiled out (#if 0).
//
// Returns 0 on success, otherwise the first error code. An stt reader that was
// opened but could not be appended to the array is closed immediately, because
// nothing else would reference it afterwards.
static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (writer->ctx->fset) {
#if 0
    // open data reader
    SDataFileReaderConfig dataFileReaderConfig = {
        .tsdb = writer->tsdb,
        .buffers = writer->buffers,
        .szPage = writer->szPage,
    };

    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (writer->ctx->fset->farr[ftype] == NULL) {
        continue;
      }

      dataFileReaderConfig.files[ftype].exist = true;
      dataFileReaderConfig.files[ftype].file = writer->ctx->fset->farr[ftype]->f[0];

      STFileOp fileOp = {
          .optype = TSDB_FOP_REMOVE,
          .fid = writer->ctx->fset->fid,
          .of = writer->ctx->fset->farr[ftype]->f[0],
      };

      code = TARRAY2_APPEND(writer->fopArr, fileOp);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    code = tsdbDataFileReaderOpen(NULL, &dataFileReaderConfig, &writer->ctx->dataReader);
    TSDB_CHECK_CODE(code, lino, _exit);
#endif

    // open stt reader array
    SSttLvl* lvl;
    TARRAY2_FOREACH(writer->ctx->fset->lvlArr, lvl) {
      STFileObj* fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        SSttFileReader*      reader;
        SSttFileReaderConfig sttFileReaderConfig = {
            .tsdb = writer->tsdb,
            .szPage = writer->szPage,
            .buffers = writer->buffers,
            .file = fobj->f[0],
        };

        code = tsdbSttFileReaderOpen(fobj->fname, &sttFileReaderConfig, &reader);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_APPEND(writer->ctx->sttReaderArr, reader);
        if (code) {
          tsdbSttFileReaderClose(&reader);
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        // schedule removal of the stt file that is being read here
        STFileOp fileOp = {
            .optype = TSDB_FOP_REMOVE,
            .fid = fobj->f->fid,
            .of = fobj->f[0],
        };

        code = TARRAY2_APPEND(writer->fopArr, fileOp);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
693

UNCOV
694
// Close the readers opened on the existing file set: every stt file reader, then
// the data file reader. Always returns 0.
static int32_t tsdbSnapWriteFileSetCloseReader(STsdbSnapWriter* writer) {
  TARRAY2_CLEAR(writer->ctx->sttReaderArr, tsdbSttFileReaderClose);

  tsdbDataFileReaderClose(&writer->ctx->dataReader);

  return 0;
}
699

UNCOV
700
// Build the iterators and mergers over the existing file set.
//
// For the data file reader (if one is open) and for every stt file reader, one data
// iterator and one tombstone iterator are opened. Data iterators are collected in
// writer->ctx->dataIterArr, tombstone iterators in writer->ctx->tombIterArr, and a
// merger is opened over each array.
//
// Returns 0 on success, otherwise the first error code. An iterator that was opened
// but could not be appended to its array is closed immediately, because nothing else
// would reference it afterwards.
static int32_t tsdbSnapWriteFileSetOpenIter(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // data file iterators
  if (writer->ctx->dataReader) {
    STsdbIter*      iter;
    STsdbIterConfig config = {0};

    // data
    config.type = TSDB_ITER_TYPE_DATA;
    config.dataReader = writer->ctx->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_DATA_TOMB;
    config.dataReader = writer->ctx->dataReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // stt file iterators
  SSttFileReader* sttFileReader;
  TARRAY2_FOREACH(writer->ctx->sttReaderArr, sttFileReader) {
    STsdbIter*      iter;
    STsdbIterConfig config = {0};

    // data
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttFileReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->dataIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttFileReader;

    code = tsdbIterOpen(&config, &iter);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(writer->ctx->tombIterArr, iter);
    if (code) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // open mergers
  code = tsdbIterMergerOpen(writer->ctx->dataIterArr, &writer->ctx->dataIterMerger, false);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbIterMergerOpen(writer->ctx->tombIterArr, &writer->ctx->tombIterMerger, true);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
770

UNCOV
771
// Tear down what tsdbSnapWriteFileSetOpenIter() built: both mergers first, then
// every iterator in the data and tombstone iterator arrays. Always returns 0.
static int32_t tsdbSnapWriteFileSetCloseIter(STsdbSnapWriter* writer) {
  tsdbIterMergerClose(&writer->ctx->dataIterMerger);
  tsdbIterMergerClose(&writer->ctx->tombIterMerger);

  TARRAY2_CLEAR(writer->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_CLEAR(writer->ctx->tombIterArr, tsdbIterClose);

  return 0;
}
778

UNCOV
779
// Open the file set writer for the current fid. Its configuration is taken from the
// snapshot writer's settings; when an existing file set is present, each of its
// non-NULL farr entries is registered in the configuration as an existing file.
// Returns 0 on success, otherwise an error code.
static int32_t tsdbSnapWriteFileSetOpenWriter(STsdbSnapWriter* writer) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet* fset = writer->ctx->fset;

  SFSetWriterConfig config = {
      .tsdb = writer->tsdb,
      .toSttOnly = false,
      .compactVersion = writer->compactVersion,
      .minRow = writer->minRow,
      .maxRow = writer->maxRow,
      .szPage = writer->szPage,
      .cmprAlg = writer->cmprAlg,
      .fid = writer->ctx->fid,
      .cid = writer->commitID,
      .did = writer->ctx->did,
      .level = 0,
  };

  // merge stt files to either data or a new stt file
  if (fset != NULL) {
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (fset->farr[ftype] == NULL) {
        continue;
      }
      config.files[ftype].exist = true;
      config.files[ftype].file = fset->farr[ftype]->f[0];
    }
  }

  code = tsdbFSetWriterOpen(&config, &writer->ctx->fsetWriter);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
815

UNCOV
816
// Close the file set writer, passing writer->fopArr so that the resulting file
// operations are collected there. Returns whatever tsdbFSetWriterClose() returns.
static int32_t tsdbSnapWriteFileSetCloseWriter(STsdbSnapWriter* writer) {
  int32_t code = tsdbFSetWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
  return code;
}
819

UNCOV
820
// Start writing the file set identified by `fid`.
//
// Looks up an existing file set with that fid in writer->fsetArr, allocates a disk
// for the level computed by tsdbFidLevel(), tries to create the tsdb directory on
// it (a failure there is only logged), then opens the readers, iterators and the
// file set writer. writer->ctx->fsetWriteBegin is set only when all of that worked.
//
// Returns 0 on success, TSDB_CODE_NO_AVAIL_DISK when no disk can be allocated, or
// the error code of the step that failed.
static int32_t tsdbSnapWriteFileSetBegin(STsdbSnapWriter* writer, int32_t fid) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb*  tsdb = writer->tsdb;

  // search key: a file set carrying only the fid
  STFileSet  key = {.fid = fid};
  STFileSet* keyPtr = &key;

  writer->ctx->fid = fid;
  STFileSet** found = TARRAY2_SEARCH(writer->fsetArr, &keyPtr, tsdbTFileSetCmprFn, TD_EQ);
  if (found != NULL) {
    writer->ctx->fset = *found;
  } else {
    writer->ctx->fset = NULL;
  }

  int32_t level = tsdbFidLevel(fid, &tsdb->keepCfg, taosGetTimestampSec());
  if (tfsAllocDisk(tsdb->pVnode->pTfs, level, &writer->ctx->did)) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    TSDB_CHECK_CODE(code, lino, _exit);
  }
  if (tfsMkdirRecurAt(tsdb->pVnode->pTfs, tsdb->path, writer->ctx->did) != 0) {
    tsdbError("vgId:%d failed to create directory %s", TD_VID(tsdb->pVnode), tsdb->path);
  }

  writer->ctx->hasData = true;
  writer->ctx->hasTomb = true;

  code = tsdbSnapWriteFileSetOpenReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetOpenWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = true;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(tsdb->pVnode), __func__, fid);
  }
  return code;
}
861

UNCOV
862
// Write one incoming tombstone record, keeping the output ordered with respect to
// existing records.
//
// Every existing record (from the tomb merger) that compares <= `record` under
// tTombRecordCompare() is written first. Then `record` itself is written, unless its
// suid is INT64_MAX: such a record is only a sentinel used to flush all remaining
// existing records.
//
// Returns 0 on success, otherwise an error code.
static int32_t tsdbSnapWriteTombRecord(STsdbSnapWriter* writer, const STombRecord* record) {
  int32_t code = 0;
  int32_t lino = 0;

  while (writer->ctx->hasTomb) {
    STombRecord* existing = tsdbIterMergerGetTombRecord(writer->ctx->tombIterMerger);
    if (existing == NULL) {
      // existing tombstones are exhausted
      writer->ctx->hasTomb = false;
      break;
    }

    if (tTombRecordCompare(existing, record) > 0) {
      break;  // the incoming record goes before this existing record
    }

    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, existing);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbIterMergerNext(writer->ctx->tombIterMerger);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (record->suid != INT64_MAX) {
    code = tsdbFSetWriteTombRecord(writer->ctx->fsetWriter, record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
898

UNCOV
899
// Finish the file set that is currently being written.
//
// Does nothing (returns 0) when writer->ctx->fsetWriteBegin is not set.
// Otherwise a row and a tomb record carrying suid/uid == INT64_MAX are pushed
// through the time-series and tombstone write paths to end them, then the
// file set writer, iterators and readers are closed, in that order, and
// fsetWriteBegin is cleared.
//
// Returns 0 on success, otherwise the first non-zero code from a callee.
static int32_t tsdbSnapWriteFileSetEnd(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->fsetWriteBegin) {
    return 0;
  }

  SRowInfo    endRow = {.suid = INT64_MAX, .uid = INT64_MAX};
  STombRecord endTomb = {.suid = INT64_MAX, .uid = INT64_MAX};

  // end timeseries data write
  code = tsdbSnapWriteTimeSeriesRow(writer, &endRow);
  TSDB_CHECK_CODE(code, lino, _exit);

  // end tombstone data write
  code = tsdbSnapWriteTombRecord(writer, &endTomb);
  TSDB_CHECK_CODE(code, lino, _exit);

  // close write, then the iterators, then the readers
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s succeeded, fid:%d", TD_VID(writer->tsdb->pVnode), __func__, writer->ctx->fid);
  }
  return code;
}
943

UNCOV
944
// Abandon the file set that is currently being written.
//
// Does nothing (returns 0) when writer->ctx->fsetWriteBegin is not set.
// Unlike tsdbSnapWriteFileSetEnd, no terminating row or tomb record is
// written: the file set writer, iterators and readers are simply closed, in
// that order, and fsetWriteBegin is cleared.
//
// Returns 0 on success, otherwise the first non-zero code from a callee; in
// that case the remaining close steps are skipped and fsetWriteBegin is left
// set.
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
  int32_t code = 0;
  int32_t lino = 0;

  if (!writer->ctx->fsetWriteBegin) {
    return 0;
  }

  // close write
  code = tsdbSnapWriteFileSetCloseWriter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // close iterators
  code = tsdbSnapWriteFileSetCloseIter(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  // close readers
  code = tsdbSnapWriteFileSetCloseReader(writer);
  TSDB_CHECK_CODE(code, lino, _exit);

  writer->ctx->fsetWriteBegin = false;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  return code;
}
968

UNCOV
969
// Apply one time-series snapshot payload to the writer.
//
// hdr->data / hdr->size are wrapped in a buffer and decompressed into a local
// block. The file id is derived from the block's first timestamp; if no file
// set write is in progress, or the id differs from writer->ctx->fid, the
// current file set is ended and a new one begun for that id. Each row of the
// block is then written through tsdbSnapWriteTimeSeriesRow.
//
// The local block is destroyed on every path. Returns 0 on success, otherwise
// the first non-zero code from a callee.
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SBlockData blockData[1] = {0};

  SBuffer       payload = {.capacity = hdr->size, .data = hdr->data, .size = hdr->size};
  SBufferReader reader = BUFFER_READER_INITIALIZER(0, &payload);

  code = tBlockDataDecompress(&reader, blockData, &writer->buffers[0]);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
  if (!(writer->ctx->fsetWriteBegin && fid == writer->ctx->fid)) {
    // switch file sets: finish the current one (no-op if none) and open `fid`
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t iRow = 0; iRow < blockData->nRow; ++iRow) {
    SRowInfo rowInfo = {
        .suid = blockData->suid,
        .row = tsdbRowFromBlockData(blockData, iRow),
    };

    // a non-zero block-level uid applies to every row; otherwise use the per-row uid
    if (blockData->uid) {
      rowInfo.uid = blockData->uid;
    } else {
      rowInfo.uid = blockData->aUid[iRow];
    }

    code = tsdbSnapWriteTimeSeriesRow(writer, &rowInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(writer->tsdb->pVnode), __func__,
              blockData->suid, blockData->uid, blockData->nRow);
  }
  tBlockDataDestroy(blockData);
  return code;
}
1015

UNCOV
1016
// Rebuild a tomb block from a snapshot payload.
//
// The block is cleared first. hdr->size is divided evenly into
// TOMB_RECORD_ELEM_NUM consecutive slices; slice i is appended to
// tombBlock->buffers[i], and numOfRecords is set to the slice size divided by
// sizeof(int64_t).
//
// Returns 0 on success, otherwise the first non-zero code from tBufferPut.
static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombBlock) {
  int32_t code = 0;
  int32_t lino = 0;

  tTombBlockClear(tombBlock);

  // bytes occupied by each of the TOMB_RECORD_ELEM_NUM slices
  int64_t sliceBytes = hdr->size;
  sliceBytes /= TOMB_RECORD_ELEM_NUM;
  tombBlock->numOfRecords = sliceBytes / sizeof(int64_t);

  uint8_t* cursor = hdr->data;
  int32_t  iElem = 0;
  while (iElem < TOMB_RECORD_ELEM_NUM) {
    code = tBufferPut(&tombBlock->buffers[iElem], cursor, sliceBytes);
    TSDB_CHECK_CODE(code, lino, _exit);

    cursor += sliceBytes;
    ++iElem;
  }

_exit:
  return code;
}
1037

UNCOV
1038
// Apply one tombstone snapshot payload to the writer.
//
// The payload is unpacked into a local tomb block. The file id is derived
// from the first record's skey; if no file set write is in progress, or the
// id differs from writer->ctx->fid, the current file set is ended and a new
// one begun for that id. If the context still has time-series data pending
// (ctx->hasData), a row with suid/uid == INT64_MAX is written first to end the
// time-series write. Every record of the block is then written through
// tsdbSnapWriteTombRecord.
//
// The local tomb block is released on every path, including errors.
// Returns 0 on success, otherwise the first non-zero code from a callee.
static int32_t tsdbSnapWriteTombData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  STombRecord record;
  STombBlock  tombBlock[1] = {0};

  code = tsdbSnapWriteDecmprTombBlock(hdr, tombBlock);
  TSDB_CHECK_CODE(code, lino, _exit);

  // the first record decides which file set this payload belongs to
  code = tTombBlockGet(tombBlock, 0, &record);
  TSDB_CHECK_CODE(code, lino, _exit);

  int32_t fid = tsdbKeyFid(record.skey, writer->minutes, writer->precision);
  if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteFileSetBegin(writer, fid);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (writer->ctx->hasData) {
    // end the time-series write before tombstones are written
    SRowInfo row = {
        .suid = INT64_MAX,
        .uid = INT64_MAX,
    };

    code = tsdbSnapWriteTimeSeriesRow(writer, &row);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  for (int32_t i = 0; i < TOMB_BLOCK_SIZE(tombBlock); ++i) {
    code = tTombBlockGet(tombBlock, i, &record);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = tsdbSnapWriteTombRecord(writer, &record);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  }
  // Release after the label so that error paths do not leak the buffers
  // filled by tsdbSnapWriteDecmprTombBlock; tombBlock is zero-initialized, so
  // this is reached with a well-defined block even on the earliest failure.
  tTombBlockDestroy(tombBlock);
  return code;
}
1085

UNCOV
1086
// Allocate and initialize a snapshot writer for `pTsdb`.
//
// *writer receives a zero-initialized writer populated from the tsdb keep
// config and the vnode tsdb config, with a freshly allocated commit id and the
// current timestamp. A copy of the ranged file-set snapshot described by
// `pRanges` (used as a TFileSetRangeArray*) is then created into the writer's
// fsetArr / fopArr.
//
// Returns terrno if the allocation fails (with *writer set to NULL), 0 on
// success, otherwise the code from tsdbFSCreateCopyRangedSnapshot.
// NOTE(review): when the snapshot copy fails, *writer stays allocated;
// presumably the caller releases it via tsdbSnapWriterClose; confirm.
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** writer) {
  int32_t code = 0;
  int32_t lino = 0;

  // start to write
  STsdbSnapWriter* w = taosMemoryCalloc(1, sizeof(*w));
  *writer = w;
  if (w == NULL) {
    return terrno;
  }

  w->tsdb = pTsdb;
  w->sver = sver;
  w->ever = ever;
  w->minutes = pTsdb->keepCfg.days;
  w->precision = pTsdb->keepCfg.precision;
  w->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
  w->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
  w->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
  w->commitID = tsdbFSAllocEid(pTsdb->pFS);
  w->szPage = pTsdb->pVnode->config.tsdbPageSize;
  w->compactVersion = INT64_MAX;
  w->now = taosGetTimestampMs();

  code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &w->fsetArr, w->fopArr);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, sver, ever);
  }
  return code;
}
1119

UNCOV
1120
// First phase of closing a snapshot writer.
//
// With rollback == false the in-progress file set is ended; with rollback ==
// true it is aborted. In both cases a file system edit is then begun with the
// writer's fopArr and TSDB_FEDIT_COMMIT.
//
// Returns 0 on success, otherwise the first non-zero code from a callee.
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  if (rollback) {
    code = tsdbSnapWriteFileSetAbort(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbSnapWriteFileSetEnd(writer);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // identical for both paths
  code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
  }
  return code;
}
1146

UNCOV
1147
// Second phase of closing a snapshot writer: finish the file system edit and
// release the writer.
//
// Returns 0 immediately when *writer is NULL. With rollback set, the pending
// edit is aborted. Otherwise it is committed while holding the tsdb mutex and,
// on success, the file system state is set to TSDB_FS_STATE_NORMAL before the
// mutex is released.
//
// If the abort/commit fails, its code is returned and the writer is NOT torn
// down (*writer is left untouched). On success all iter mergers, iterators,
// readers, the fop array, the copied file-set snapshot and the scratch buffers
// are released, the writer is freed and *writer is set to NULL.
int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
  STsdbSnapWriter* w = *writer;
  if (w == NULL) {
    return 0;
  }

  int32_t code = 0;
  int32_t lino = 0;

  // kept separately because it is still needed for logging after `w` is freed
  STsdb* tsdb = w->tsdb;

  if (!rollback) {
    (void)taosThreadMutexLock(&w->tsdb->mutex);

    code = tsdbFSEditCommit(w->tsdb->pFS);
    if (code == 0) {
      w->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
    }

    (void)taosThreadMutexUnlock(&w->tsdb->mutex);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbFSEditAbort(w->tsdb->pFS);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // per-file-set context
  tsdbIterMergerClose(&w->ctx->tombIterMerger);
  tsdbIterMergerClose(&w->ctx->dataIterMerger);
  TARRAY2_DESTROY(w->ctx->tombIterArr, tsdbIterClose);
  TARRAY2_DESTROY(w->ctx->dataIterArr, tsdbIterClose);
  TARRAY2_DESTROY(w->ctx->sttReaderArr, tsdbSttFileReaderClose);
  tsdbDataFileReaderClose(&w->ctx->dataReader);

  // writer-level state
  TARRAY2_DESTROY(w->fopArr, NULL);
  tsdbFSDestroyCopyRangedSnapshot(&w->fsetArr);

  for (int32_t iBuf = 0; iBuf < ARRAY_SIZE(w->buffers); ++iBuf) {
    tBufferDestroy(&w->buffers[iBuf]);
  }

  taosMemoryFree(w);
  *writer = NULL;

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
  } else {
    tsdbInfo("vgId:%d %s done, rollback:%d", TD_VID(tsdb->pVnode), __func__, rollback);
  }
  return code;
}
1197

UNCOV
1198
// Dispatch one snapshot payload to the matching write path.
//
// SNAP_DATA_TSDB payloads go to tsdbSnapWriteTimeSeriesData and SNAP_DATA_DEL
// payloads to tsdbSnapWriteTombData; any other hdr->type yields
// TSDB_CODE_INVALID_PARA.
//
// Returns 0 on success, otherwise the failing code.
int32_t tsdbSnapWrite(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
  int32_t code = 0;
  int32_t lino = 0;

  switch (hdr->type) {
    case SNAP_DATA_TSDB:
      code = tsdbSnapWriteTimeSeriesData(writer, hdr);
      TSDB_CHECK_CODE(code, lino, _exit);
      break;

    case SNAP_DATA_DEL:
      code = tsdbSnapWriteTombData(writer, hdr);
      TSDB_CHECK_CODE(code, lino, _exit);
      break;

    default:
      code = TSDB_CODE_INVALID_PARA;
      TSDB_CHECK_CODE(code, lino, _exit);
      break;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
              TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
  } else {
    tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
              hdr->type, hdr->index, hdr->size);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc