• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3565

25 Dec 2024 05:34AM UTC coverage: 51.098% (-11.1%) from 62.21%
#3565

push

travis-ci

web-flow
Merge pull request #29316 from taosdata/enh/3.0/TD-33266

enh(ut):Add wal & config UT.

111558 of 284773 branches covered (39.17%)

Branch coverage included in aggregate %.

1 of 2 new or added lines in 2 files covered. (50.0%)

39015 existing lines in 102 files now uncovered.

177882 of 281666 relevant lines covered (63.15%)

15090998.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tsdb/tsdbMerge.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbMerge.h"
17

18
#define TSDB_MAX_LEVEL 2  // highest stt level index; levels are numbered from 0, so this allows three levels (0..2)
19

20
typedef struct {
21
  STsdb     *tsdb;
22
  int32_t    fid;
23
  STFileSet *fset;
24

25
  int32_t sttTrigger;
26
  int32_t maxRow;
27
  int32_t minRow;
28
  int32_t szPage;
29
  int8_t  cmprAlg;
30
  int64_t compactVersion;
31
  int64_t cid;
32

33
  // context
34
  struct {
35
    bool       opened;
36
    int64_t    now;
37
    STFileSet *fset;
38
    bool       toData;
39
    int32_t    level;
40
    TABLEID    tbid[1];
41
  } ctx[1];
42

43
  TFileOpArray fopArr[1];
44

45
  // reader
46
  TSttFileReaderArray sttReaderArr[1];
47
  // iter
48
  TTsdbIterArray dataIterArr[1];
49
  SIterMerger   *dataIterMerger;
50
  TTsdbIterArray tombIterArr[1];
51
  SIterMerger   *tombIterMerger;
52
  // writer
53
  SFSetWriter *writer;
54
} SMerger;
55

UNCOV
56
/*
 * Initialise a merger before use: record the current time, copy the row,
 * page and compression settings out of the vnode config, obtain an id from
 * tsdbFSAllocEid(), and flag the context as opened.
 *
 * Always returns 0.
 */
static int32_t tsdbMergerOpen(SMerger *merger) {
  STsdb  *tsdb = merger->tsdb;
  SVnode *vnode = tsdb->pVnode;

  merger->ctx->now = taosGetTimestampSec();

  // settings taken from the vnode configuration
  merger->maxRow = vnode->config.tsdbCfg.maxRows;
  merger->minRow = vnode->config.tsdbCfg.minRows;
  merger->szPage = vnode->config.tsdbPageSize;
  merger->cmprAlg = vnode->config.tsdbCfg.compression;

  merger->compactVersion = INT64_MAX;
  merger->cid = tsdbFSAllocEid(tsdb->pFS);

  merger->ctx->opened = true;
  return 0;
}
67

UNCOV
68
/*
 * Release the arrays owned by the merger (tomb iterators, data iterators,
 * stt readers and file operations). No per-element callback is supplied, so
 * the elements themselves are expected to have been closed already (see
 * tsdbMergeFileSetEnd()).
 *
 * Always returns 0.
 */
static int32_t tsdbMergerClose(SMerger *merger) {
  TARRAY2_DESTROY(merger->tombIterArr, NULL);
  TARRAY2_DESTROY(merger->dataIterArr, NULL);
  TARRAY2_DESTROY(merger->sttReaderArr, NULL);
  TARRAY2_DESTROY(merger->fopArr, NULL);
  return 0;
}
79

UNCOV
80
/*
 * Clear the stt reader array, handing every element to
 * tsdbSttFileReaderClose().
 */
static void tsdbMergeFileSetEndCloseReader(SMerger *merger) {
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);
}
×
83

UNCOV
84
/*
 * Decide which stt files of merger->ctx->fset take part in this merge, queue
 * a TSDB_FOP_REMOVE operation for each of them in merger->fopArr, and open a
 * reader for each in merger->sttReaderArr. Also sets merger->ctx->level (the
 * level handed to the writer) and merger->ctx->toData.
 *
 * Two modes:
 *   - if a non-empty level above TSDB_MAX_LEVEL exists, every stt file of
 *     every level is merged;
 *   - otherwise the target level is derived from the per-level file counts
 *     and merger->sttTrigger, and only files below that level are merged.
 *
 * Returns 0 on success. On failure the error is logged, all readers opened so
 * far are closed, and the error code is returned.
 */
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SSttLvl *lvl;

  // Walk the level array backwards and stop at the first level that is either
  // within the limit or non-empty; only the latter case flags an overflow.
  bool hasLevelLargerThanMax = false;
  TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) {
    if (lvl->level <= TSDB_MAX_LEVEL) {
      break;
    } else if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
      hasLevelLargerThanMax = true;
      break;
    }
  }

  if (hasLevelLargerThanMax) {
    // merge all stt files
    merger->ctx->toData = true;
    merger->ctx->level = TSDB_MAX_LEVEL;

    TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
      int32_t numMergeFile = TARRAY2_SIZE(lvl->fobjArr);

      for (int32_t i = 0; i < numMergeFile; ++i) {
        STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);

        STFileOp op = {
            .optype = TSDB_FOP_REMOVE,
            .fid = merger->ctx->fset->fid,
            .of = fobj->f[0],
        };
        TAOS_CHECK_GOTO(TARRAY2_APPEND(merger->fopArr, op), &lino, _exit);

        SSttFileReader      *reader;
        SSttFileReaderConfig config = {
            .tsdb = merger->tsdb,
            .szPage = merger->szPage,
            .file[0] = fobj->f[0],
        };

        TAOS_CHECK_GOTO(tsdbSttFileReaderOpen(fobj->fname, &config, &reader), &lino, _exit);

        // The reader is not yet owned by the array; close it here if the
        // append fails, otherwise the cleanup at _exit would never see it.
        if ((code = TARRAY2_APPEND(merger->sttReaderArr, reader))) {
          tsdbSttFileReaderClose(&reader);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }
  } else {
    // do regular merge
    merger->ctx->toData = true;
    merger->ctx->level = 0;

    // Find the highest level that can be merged to: starting at level 0, the
    // files present at the current level plus the carry from below are divided
    // by sttTrigger; a non-zero quotient carries into the next level.
    for (int32_t i = 0, numCarry = 0;;) {
      int32_t numFile = numCarry;
      if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) &&
          merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) {
        numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr);
        i++;
      }

      numCarry = numFile / merger->sttTrigger;
      if (numCarry == 0) {
        break;
      } else {
        merger->ctx->level++;
      }
    }

    // When the target level is within the limit, look at the last non-empty
    // level: if it is at or above the target level, toData is turned off.
    if (merger->ctx->level <= TSDB_MAX_LEVEL) {
      TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) {
        if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
          continue;
        }

        if (lvl->level >= merger->ctx->level) {
          merger->ctx->toData = false;
        }
        break;
      }
    }

    // Number of level-0 files to merge: sttTrigger^level, minus what the files
    // already sitting at levels 1..level-1 account for (each file at level l
    // counts as sttTrigger^l).
    int32_t numFile = pow(merger->sttTrigger, merger->ctx->level);
    TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
      if (lvl->level == 0) continue;
      if (lvl->level >= merger->ctx->level) break;

      numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level);
    }

    // get file system operations
    TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
      if (lvl->level >= merger->ctx->level) {
        break;
      }

      // level 0 contributes only the computed number of files; every other
      // level below the target contributes all of its files
      int32_t numMergeFile;
      if (lvl->level == 0) {
        numMergeFile = numFile;
      } else {
        numMergeFile = TARRAY2_SIZE(lvl->fobjArr);
      }

      for (int32_t i = 0; i < numMergeFile; ++i) {
        STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);

        STFileOp op = {
            .optype = TSDB_FOP_REMOVE,
            .fid = merger->ctx->fset->fid,
            .of = fobj->f[0],
        };
        TAOS_CHECK_GOTO(TARRAY2_APPEND(merger->fopArr, op), &lino, _exit);

        SSttFileReader      *reader;
        SSttFileReaderConfig config = {
            .tsdb = merger->tsdb,
            .szPage = merger->szPage,
            .file[0] = fobj->f[0],
        };

        TAOS_CHECK_GOTO(tsdbSttFileReaderOpen(fobj->fname, &config, &reader), &lino, _exit);

        // close the reader ourselves if it could not be stored in the array
        if ((code = TARRAY2_APPEND(merger->sttReaderArr, reader))) {
          tsdbSttFileReaderClose(&reader);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }

    // the level handed to the writer never exceeds the maximum
    if (merger->ctx->level > TSDB_MAX_LEVEL) {
      merger->ctx->level = TSDB_MAX_LEVEL;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    tsdbMergeFileSetEndCloseReader(merger);
  }
  return code;
}
225

UNCOV
226
/*
 * For every open stt reader, create one data iterator and one tomb iterator,
 * store them in merger->dataIterArr / merger->tombIterArr, and then open the
 * two iterator mergers on top of those arrays.
 *
 * Returns 0 on success; on failure the error is logged and the error code is
 * returned. An iterator that was opened but could not be stored in its array
 * is closed before returning.
 */
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  SSttFileReader *sttReader;
  TARRAY2_FOREACH(merger->sttReaderArr, sttReader) {
    STsdbIter      *iter;
    STsdbIterConfig config = {0};

    // data iter
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttReader;

    TAOS_CHECK_GOTO(tsdbIterOpen(&config, &iter), &lino, _exit);
    // the array does not own the iterator until the append succeeds
    if ((code = TARRAY2_APPEND(merger->dataIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb iter
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttReader;

    TAOS_CHECK_GOTO(tsdbIterOpen(&config, &iter), &lino, _exit);
    if ((code = TARRAY2_APPEND(merger->tombIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  TAOS_CHECK_GOTO(tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false), &lino, _exit);

  TAOS_CHECK_GOTO(tsdbIterMergerOpen(merger->tombIterArr, &merger->tombIterMerger, true), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", vid, __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
262

UNCOV
263
/*
 * Open the file-set writer that receives the merged output.
 *
 * A disk is allocated for the storage tier returned by tsdbFidLevel(), the
 * tsdb directory is created on it, and the writer is configured from the
 * merger settings. When merger->ctx->toData is set the writer is not limited
 * to stt output and is told which files of the file set already exist.
 *
 * Returns 0 on success; on failure the error is logged and the code returned.
 */
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STsdb     *tsdb = merger->tsdb;
  STFileSet *fset = merger->ctx->fset;
  int32_t    vid = TD_VID(tsdb->pVnode);
  SDiskID    did;

  // pick a disk for this file set and make sure the tsdb directory exists there
  int32_t level = tsdbFidLevel(fset->fid, &tsdb->keepCfg, merger->ctx->now);
  TAOS_CHECK_GOTO(tfsAllocDisk(tsdb->pVnode->pTfs, level, &did), &lino, _exit);
  TAOS_CHECK_GOTO(tfsMkdirRecurAt(tsdb->pVnode->pTfs, tsdb->path, did), &lino, _exit);

  SFSetWriterConfig config = {
      .tsdb = tsdb,
      .toSttOnly = !merger->ctx->toData,
      .compactVersion = merger->compactVersion,
      .minRow = merger->minRow,
      .maxRow = merger->maxRow,
      .szPage = merger->szPage,
      .cmprAlg = merger->cmprAlg,
      .fid = fset->fid,
      .cid = merger->cid,
      .did = did,
      .level = merger->ctx->level,
  };

  if (!config.toSttOnly) {
    // describe the files already present in the file set
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (fset->farr[ftype] == NULL) {
        config.files[ftype].exist = false;
        continue;
      }

      config.files[ftype].exist = true;
      config.files[ftype].file = fset->farr[ftype]->f[0];
    }
  }

  TAOS_CHECK_GOTO(tsdbFSetWriterOpen(&config, &merger->writer), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", vid, __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
311

UNCOV
312
/*
 * Set up everything needed to merge merger->ctx->fset: reset the pending
 * file operations and the current table id, then open the readers, the
 * iterators and the writer, in that order.
 *
 * Returns 0 on success; the first failing step is logged and its code returned.
 */
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  // start from a clean slate
  TARRAY2_CLEAR(merger->fopArr, NULL);
  merger->ctx->tbid->suid = 0;
  merger->ctx->tbid->uid = 0;

  // readers
  code = tsdbMergeFileSetBeginOpenReader(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // iterators
  code = tsdbMergeFileSetBeginOpenIter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  // writer
  code = tsdbMergeFileSetBeginOpenWriter(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
337

UNCOV
338
/*
 * Close the file-set writer, passing merger->fopArr so the writer can record
 * its file operations there. Returns whatever tsdbFSetWriterClose() returns.
 */
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  int32_t code = tsdbFSetWriterClose(&merger->writer, 0, merger->fopArr);
  return code;
}
341

UNCOV
342
/*
 * Tear down the iteration layer: for tombstones first and then for data,
 * close the iterator merger and clear the matching iterator array, handing
 * each iterator to tsdbIterClose().
 *
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
  // tombstone side
  tsdbIterMergerClose(&merger->tombIterMerger);
  TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose);

  // data side
  tsdbIterMergerClose(&merger->dataIterMerger);
  TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose);

  return 0;
}
349

UNCOV
350
/*
 * Finish merging one file set: close the writer, the iterators and the
 * readers, then apply the accumulated file operations to the file system
 * (edit begin, followed by commit under the tsdb mutex).
 *
 * Returns 0 on success. On failure the error is logged once and the code of
 * the first failing step is returned.
 */
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  TAOS_CHECK_GOTO(tsdbMergeFileSetEndCloseWriter(merger), &lino, _exit);

  TAOS_CHECK_GOTO(tsdbMergeFileSetEndCloseIter(merger), &lino, _exit);

  tsdbMergeFileSetEndCloseReader(merger);

  // edit file system
  TAOS_CHECK_GOTO(tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE), &lino, _exit);

  // the commit runs with the tsdb mutex held; release it before acting on the result
  (void)taosThreadMutexLock(&merger->tsdb->mutex);
  code = tsdbFSEditCommit(merger->tsdb->pFS);
  (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
379

UNCOV
380
/*
 * Merge one file set: open readers/iterators/writer, stream every merged row
 * and then every merged tombstone record into the writer, and finally close
 * everything and commit the file-system edit.
 *
 * Whenever the table id changes, the table is looked up with metaGetInfo();
 * if the lookup fails, all remaining entries of that table are skipped
 * instead of being written.
 *
 * Returns 0 on success; on failure the error is logged and the code returned.
 */
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
  TABLEID  *tbid = merger->ctx->tbid;

  merger->ctx->fset = fset;
  TAOS_CHECK_GOTO(tsdbMergeFileSetBegin(merger), &lino, _exit);

  // pass 1: row data
  tbid->suid = 0;
  tbid->uid = 0;
  for (SRowInfo *row; (row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL;) {
    bool tableChanged = (row->uid != tbid->uid);
    if (tableChanged) {
      tbid->uid = row->uid;
      tbid->suid = row->suid;

      if (metaGetInfo(merger->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
        // table lookup failed: drop the rest of this table's rows
        TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(merger->dataIterMerger, tbid), &lino, _exit);
        continue;
      }
    }

    TAOS_CHECK_GOTO(tsdbFSetWriteRow(merger->writer, row), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbIterMergerNext(merger->dataIterMerger), &lino, _exit);
  }

  // pass 2: tombstone records
  tbid->suid = 0;
  tbid->uid = 0;
  STombRecord *record;
  while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger)) != NULL) {
    bool tableChanged = (record->uid != tbid->uid);
    if (tableChanged) {
      tbid->uid = record->uid;
      tbid->suid = record->suid;

      if (metaGetInfo(merger->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
        // table lookup failed: drop the rest of this table's tombstones
        TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(merger->tombIterMerger, tbid), &lino, _exit);
        continue;
      }
    }

    TAOS_CHECK_GOTO(tsdbFSetWriteTombRecord(merger->writer, record), &lino, _exit);
    TAOS_CHECK_GOTO(tsdbIterMergerNext(merger->tombIterMerger), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbMergeFileSetEnd(merger), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  return code;
}
437

UNCOV
438
/*
 * Run a merge over merger->fset if one is due. Nothing is done (and 0 is
 * returned) when the file set has no stt levels, when its first level is not
 * level 0, or when that level holds fewer than sttTrigger files. Otherwise
 * the merger is opened, the file set is merged, and the merger is closed.
 *
 * Returns 0 on success; on failure the error is logged and the code returned.
 */
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  if (TARRAY2_SIZE(merger->fset->lvlArr) == 0) {
    return 0;
  }

  // a merge is only due once level 0 has accumulated sttTrigger files
  SSttLvl *firstLvl = TARRAY2_FIRST(merger->fset->lvlArr);
  bool     mergeDue = (firstLvl->level == 0) && (TARRAY2_SIZE(firstLvl->fobjArr) >= merger->sttTrigger);
  if (!mergeDue) {
    return 0;
  }

  code = tsdbMergerOpen(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergeFileSet(merger, merger->fset);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbMergerClose(merger);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
  }
  return code;
}
462

UNCOV
463
/*
 * Under the tsdb mutex, register a merge task on file set merger->fid and
 * store a copy of that file set in merger->fset.
 *
 * merger->fset is left untouched (and 0 is returned) when background tasks
 * are disabled or when tsdbBeginTaskOnFileSet() yields no file set. After a
 * successful copy the source file set's mergeScheduled flag is cleared.
 *
 * Returns 0, or the error code of tsdbTFileSetInitCopy().
 */
static int32_t tsdbMergeGetFSet(SMerger *merger) {
  int32_t    code = 0;
  STsdb     *tsdb = merger->tsdb;
  STFileSet *fset;

  (void)taosThreadMutexLock(&tsdb->mutex);

  if (tsdb->bgTaskDisabled) {
    goto _exit;
  }

  tsdbBeginTaskOnFileSet(tsdb, merger->fid, EVA_TASK_MERGE, &fset);
  if (fset == NULL) {
    goto _exit;
  }

  code = tsdbTFileSetInitCopy(tsdb, fset, &merger->fset);
  if (code) {
    goto _exit;
  }

  fset->mergeScheduled = false;

_exit:
  // single unlock point for every path above
  (void)taosThreadMutexUnlock(&tsdb->mutex);
  return code;
}
491

UNCOV
492
int32_t tsdbMerge(void *arg) {
×
UNCOV
493
  int32_t    code = 0;
×
UNCOV
494
  int32_t    lino = 0;
×
UNCOV
495
  SMergeArg *mergeArg = (SMergeArg *)arg;
×
UNCOV
496
  STsdb     *tsdb = mergeArg->tsdb;
×
497

UNCOV
498
  SMerger merger[1] = {{
×
499
      .tsdb = tsdb,
UNCOV
500
      .fid = mergeArg->fid,
×
UNCOV
501
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
×
502
  }};
503

UNCOV
504
  if (merger->sttTrigger <= 1) {
×
505
    return 0;
×
506
  }
507

508
  // copy snapshot
UNCOV
509
  code = tsdbMergeGetFSet(merger);
×
UNCOV
510
  TSDB_CHECK_CODE(code, lino, _exit);
×
511

UNCOV
512
  if (merger->fset == NULL) {
×
513
    return 0;
×
514
  }
515

516
  // do merge
UNCOV
517
  tsdbInfo("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
×
UNCOV
518
  code = tsdbDoMerge(merger);
×
UNCOV
519
  tsdbInfo("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid);
×
UNCOV
520
  TSDB_CHECK_CODE(code, lino, _exit);
×
521

UNCOV
522
_exit:
×
UNCOV
523
  if (merger->fset) {
×
UNCOV
524
    (void)taosThreadMutexLock(&tsdb->mutex);
×
UNCOV
525
    tsdbFinishTaskOnFileSet(tsdb, mergeArg->fid, EVA_TASK_MERGE);
×
UNCOV
526
    (void)taosThreadMutexUnlock(&tsdb->mutex);
×
527
  }
528

UNCOV
529
  if (code) {
×
530
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
531
    tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code);
×
532
    taosMsleep(100);
×
533
    exit(EXIT_FAILURE);
×
534
  }
535

UNCOV
536
  tsdbTFileSetClear(&merger->fset);
×
UNCOV
537
  taosMemoryFree(arg);
×
UNCOV
538
  return code;
×
539
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc