• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4860

22 Nov 2025 07:23AM UTC coverage: 64.335% (+0.06%) from 64.272%
#4860

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

0 of 79 new or added lines in 2 files covered. (0.0%)

559 existing lines in 108 files now uncovered.

154614 of 240326 relevant lines covered (64.34%)

112619890.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.04
/source/dnode/vnode/src/tsdb/tsdbMerge.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbMerge.h"
17

18
#define TSDB_MAX_LEVEL 2  // highest stt level index; levels 0..2 are allowed, i.e. three levels
19

20
typedef struct {
21
  STsdb     *tsdb;
22
  int32_t    fid;
23
  STFileSet *fset;
24

25
  int32_t sttTrigger;
26
  int32_t maxRow;
27
  int32_t minRow;
28
  int32_t szPage;
29
  int8_t  cmprAlg;
30
  int64_t compactVersion;
31
  int64_t cid;
32

33
  // context
34
  struct {
35
    bool       opened;
36
    int64_t    now;
37
    STFileSet *fset;
38
    bool       toData;
39
    int32_t    level;
40
    TABLEID    tbid[1];
41
  } ctx[1];
42

43
  TFileOpArray fopArr[1];
44

45
  // reader
46
  TSttFileReaderArray sttReaderArr[1];
47
  // iter
48
  TTsdbIterArray dataIterArr[1];
49
  SIterMerger   *dataIterMerger;
50
  TTsdbIterArray tombIterArr[1];
51
  SIterMerger   *tombIterMerger;
52
  // writer
53
  SFSetWriter *writer;
54
} SMerger;
55

56
/*
 * Initialise the merger from the vnode configuration and mark it opened.
 * Always returns 0.
 */
static int32_t tsdbMergerOpen(SMerger *merger) {
  SVnode *vnode = merger->tsdb->pVnode;

  merger->ctx->now = taosGetTimestampSec();

  // copy the tsdb tuning knobs into the merger
  merger->maxRow = vnode->config.tsdbCfg.maxRows;
  merger->minRow = vnode->config.tsdbCfg.minRows;
  merger->szPage = vnode->config.tsdbPageSize;
  merger->cmprAlg = vnode->config.tsdbCfg.compression;
  merger->compactVersion = INT64_MAX;

  // allocate the id used by the files written in this run
  merger->cid = tsdbFSAllocEid(merger->tsdb->pFS);

  merger->ctx->opened = true;
  return 0;
}
67

68
/*
 * Release the arrays owned by the merger.
 *
 * The arrays are destroyed with a NULL element callback, so the elements
 * themselves are not closed here; the tsdbMergeFileSetEndClose*() helpers
 * clear them with the proper close callbacks.
 * Always returns 0.
 */
static int32_t tsdbMergerClose(SMerger *merger) {
  TARRAY2_DESTROY(merger->tombIterArr, NULL);
  TARRAY2_DESTROY(merger->dataIterArr, NULL);
  TARRAY2_DESTROY(merger->sttReaderArr, NULL);
  TARRAY2_DESTROY(merger->fopArr, NULL);
  return 0;
}
79

80
/*
 * Close every stt file reader held by the merger and empty the reader array.
 */
static void tsdbMergeFileSetEndCloseReader(SMerger *merger) {
  // tsdbSttFileReaderClose is applied to each element while clearing
  TARRAY2_CLEAR(merger->sttReaderArr, tsdbSttFileReaderClose);
}
957,455✔
83

84
static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) {
957,455✔
85
  int32_t  code = 0;
957,455✔
86
  int32_t  lino = 0;
957,455✔
87
  SSttLvl *lvl;
88

89
  bool hasLevelLargerThanMax = false;
957,455✔
90
  TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) {
957,455✔
91
    if (lvl->level <= TSDB_MAX_LEVEL) {
957,455✔
92
      break;
957,455✔
93
    } else if (TARRAY2_SIZE(lvl->fobjArr) > 0) {
×
94
      hasLevelLargerThanMax = true;
×
95
      break;
×
96
    }
97
  }
98

99
  if (hasLevelLargerThanMax) {
957,455✔
100
    // merge all stt files
101
    merger->ctx->toData = true;
×
102
    merger->ctx->level = TSDB_MAX_LEVEL;
×
103

104
    TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
×
105
      int32_t numMergeFile = TARRAY2_SIZE(lvl->fobjArr);
×
106

107
      for (int32_t i = 0; i < numMergeFile; ++i) {
×
108
        STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);
×
109

110
        STFileOp op = {
×
111
            .optype = TSDB_FOP_REMOVE,
112
            .fid = merger->ctx->fset->fid,
×
113
            .of = fobj->f[0],
114
        };
115
        TAOS_CHECK_GOTO(TARRAY2_APPEND(merger->fopArr, op), &lino, _exit);
×
116

117
        SSttFileReader      *reader;
×
118
        SSttFileReaderConfig config = {
×
119
            .tsdb = merger->tsdb,
×
120
            .szPage = merger->szPage,
×
121
            .file[0] = fobj->f[0],
122
        };
123

124
        TAOS_CHECK_GOTO(tsdbSttFileReaderOpen(fobj->fname, &config, &reader), &lino, _exit);
×
125

126
        TAOS_CHECK_GOTO(TARRAY2_APPEND(merger->sttReaderArr, reader), &lino, _exit);
×
127
      }
128
    }
129
  } else {
130
    // do regular merge
131
    merger->ctx->toData = true;
957,455✔
132
    merger->ctx->level = 0;
957,455✔
133

134
    // find the highest level that can be merged to
135
    for (int32_t i = 0, numCarry = 0;;) {
957,455✔
136
      int32_t numFile = numCarry;
2,658,305✔
137
      if (i < TARRAY2_SIZE(merger->ctx->fset->lvlArr) &&
2,658,305✔
138
          merger->ctx->level == TARRAY2_GET(merger->ctx->fset->lvlArr, i)->level) {
2,076,880✔
139
        numFile += TARRAY2_SIZE(TARRAY2_GET(merger->ctx->fset->lvlArr, i)->fobjArr);
1,698,540✔
140
        i++;
1,698,540✔
141
      }
142

143
      numCarry = numFile / merger->sttTrigger;
2,658,305✔
144
      if (numCarry == 0) {
2,658,305✔
145
        break;
957,455✔
146
      } else {
147
        merger->ctx->level++;
1,700,850✔
148
      }
149
    }
150

151
    if (merger->ctx->level <= TSDB_MAX_LEVEL) {
957,455✔
152
      TARRAY2_FOREACH_REVERSE(merger->ctx->fset->lvlArr, lvl) {
603,337✔
153
        if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
603,337✔
154
          continue;
×
155
        }
156

157
        if (lvl->level >= merger->ctx->level) {
603,337✔
158
          merger->ctx->toData = false;
378,340✔
159
        }
160
        break;
603,337✔
161
      }
162
    }
163

164
    // get number of level-0 files to merge
165
    int32_t numFile = pow(merger->sttTrigger, merger->ctx->level);
957,455✔
166
    TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
2,655,995✔
167
      if (lvl->level == 0) continue;
2,076,880✔
168
      if (lvl->level >= merger->ctx->level) break;
1,119,425✔
169

170
      numFile = numFile - TARRAY2_SIZE(lvl->fobjArr) * pow(merger->sttTrigger, lvl->level);
741,085✔
171
    }
172

173
    // get file system operations
174
    TARRAY2_FOREACH(merger->ctx->fset->lvlArr, lvl) {
2,655,995✔
175
      if (lvl->level >= merger->ctx->level) {
2,076,880✔
176
        break;
378,340✔
177
      }
178

179
      int32_t numMergeFile;
180
      if (lvl->level == 0) {
1,698,540✔
181
        numMergeFile = numFile;
957,455✔
182
      } else {
183
        numMergeFile = TARRAY2_SIZE(lvl->fobjArr);
741,085✔
184
      }
185

186
      for (int32_t i = 0; i < numMergeFile; ++i) {
4,362,275✔
187
        STFileObj *fobj = TARRAY2_GET(lvl->fobjArr, i);
2,663,735✔
188

189
        STFileOp op = {
5,326,550✔
190
            .optype = TSDB_FOP_REMOVE,
191
            .fid = merger->ctx->fset->fid,
2,663,735✔
192
            .of = fobj->f[0],
193
        };
194
        TAOS_CHECK_GOTO(TARRAY2_APPEND(merger->fopArr, op), &lino, _exit);
5,327,470✔
195

196
        SSttFileReader      *reader;
2,662,815✔
197
        SSttFileReaderConfig config = {
5,326,550✔
198
            .tsdb = merger->tsdb,
2,663,735✔
199
            .szPage = merger->szPage,
2,663,735✔
200
            .file[0] = fobj->f[0],
201
        };
202

203
        TAOS_CHECK_GOTO(tsdbSttFileReaderOpen(fobj->fname, &config, &reader), &lino, _exit);
2,663,735✔
204

205
        if ((code = TARRAY2_APPEND(merger->sttReaderArr, reader))) {
5,326,834✔
206
          tsdbSttFileReaderClose(&reader);
×
207
          TSDB_CHECK_CODE(code, lino, _exit);
×
208
        }
209
      }
210
    }
211

212
    if (merger->ctx->level > TSDB_MAX_LEVEL) {
957,455✔
213
      merger->ctx->level = TSDB_MAX_LEVEL;
354,118✔
214
    }
215
  }
216

217
_exit:
956,819✔
218
  if (code) {
957,455✔
219
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
×
220
              tstrerror(code));
221
    tsdbMergeFileSetEndCloseReader(merger);
×
222
  }
223
  return code;
957,455✔
224
}
225

226
/*
 * Open one data iterator and one tomb iterator for every stt reader in
 * merger->sttReaderArr, then open the two iterator mergers on top of them.
 *
 * An iterator that cannot be appended to its array is closed immediately so
 * that it is not leaked. Returns 0 on success, otherwise the failing code.
 */
static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t vid = TD_VID(merger->tsdb->pVnode);

  SSttFileReader *sttReader;
  TARRAY2_FOREACH(merger->sttReaderArr, sttReader) {
    STsdbIter      *iter;
    STsdbIterConfig config = {0};

    // data iter
    config.type = TSDB_ITER_TYPE_STT;
    config.sttReader = sttReader;

    TAOS_CHECK_GOTO(tsdbIterOpen(&config, &iter), &lino, _exit);
    if ((code = TARRAY2_APPEND(merger->dataIterArr, iter))) {
      // NOTE(review): assumes tsdbIterClose takes STsdbIter **, as its use
      // as a TARRAY2_CLEAR callback suggests -- confirm against tsdbIter.h
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    // tomb iter
    config.type = TSDB_ITER_TYPE_STT_TOMB;
    config.sttReader = sttReader;

    TAOS_CHECK_GOTO(tsdbIterOpen(&config, &iter), &lino, _exit);
    if ((code = TARRAY2_APPEND(merger->tombIterArr, iter))) {
      tsdbIterClose(&iter);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

  // last argument: false for the data merger, true for the tomb merger
  TAOS_CHECK_GOTO(tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false), &lino, _exit);

  TAOS_CHECK_GOTO(tsdbIterMergerOpen(merger->tombIterArr, &merger->tombIterMerger, true), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", vid, __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
262

263
/*
 * Build the file-set writer configuration from the merger state and open
 * merger->writer. Returns 0 on success, otherwise the writer-open error.
 */
static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
  int32_t    code = 0;
  int32_t    lino = 0;
  int32_t    vid = TD_VID(merger->tsdb->pVnode);
  STFileSet *fset = merger->ctx->fset;

  SFSetWriterConfig config = {
      .tsdb = merger->tsdb,
      .toSttOnly = !merger->ctx->toData,  // stt-only unless the merge goes to the data file
      .compactVersion = merger->compactVersion,
      .minRow = merger->minRow,
      .maxRow = merger->maxRow,
      .szPage = merger->szPage,
      .cmprAlg = merger->cmprAlg,
      .fid = fset->fid,
      .cid = merger->cid,
      .expLevel = tsdbFidLevel(fset->fid, &merger->tsdb->keepCfg, merger->ctx->now),
      .level = merger->ctx->level,
  };

  if (merger->ctx->toData) {
    // tell the writer which files of the set already exist
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      if (fset->farr[ftype] == NULL) {
        config.files[ftype].exist = false;
        continue;
      }

      config.files[ftype].exist = true;
      config.files[ftype].file = fset->farr[ftype]->f[0];
    }
  }

  TAOS_CHECK_GOTO(tsdbFSetWriterOpen(&config, &merger->writer), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", vid, __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
303

304
/*
 * Prepare a file-set merge: reset the per-run state, then open the readers,
 * the iterators and the writer, in that order.
 * Returns 0 on success, otherwise the code of the first step that failed.
 */
static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
  int32_t  code = 0;
  int32_t  lino = 0;
  TABLEID *tbid = merger->ctx->tbid;

  // start from an empty operation list and no current table
  TARRAY2_CLEAR(merger->fopArr, NULL);
  tbid->suid = 0;
  tbid->uid = 0;

  // readers -> iterators -> writer
  TAOS_CHECK_GOTO(tsdbMergeFileSetBeginOpenReader(merger), &lino, _exit);
  TAOS_CHECK_GOTO(tsdbMergeFileSetBeginOpenIter(merger), &lino, _exit);
  TAOS_CHECK_GOTO(tsdbMergeFileSetBeginOpenWriter(merger), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
329

330
/*
 * Close the file-set writer, passing merger->fopArr along, and return the
 * writer's result code.
 */
static int32_t tsdbMergeFileSetEndCloseWriter(SMerger *merger) {
  int32_t code = tsdbFSetWriterClose(&merger->writer, 0, merger->fopArr);
  return code;
}
333

334
/*
 * Close both iterator mergers and all iterators below them.
 * Each merger is closed before the iterators it was opened on.
 * Always returns 0.
 */
static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) {
  // tomb side
  tsdbIterMergerClose(&merger->tombIterMerger);
  TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose);

  // data side
  tsdbIterMergerClose(&merger->dataIterMerger);
  TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose);

  return 0;
}
341

342
/*
 * Finish a file-set merge: close the writer, iterators and readers, then
 * apply the collected file operations as a file-system edit.
 * Returns 0 on success, otherwise the code of the step that failed.
 */
static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = merger->tsdb;

  TAOS_CHECK_GOTO(tsdbMergeFileSetEndCloseWriter(merger), &lino, _exit);

  TAOS_CHECK_GOTO(tsdbMergeFileSetEndCloseIter(merger), &lino, _exit);

  tsdbMergeFileSetEndCloseReader(merger);

  // edit file system
  TAOS_CHECK_GOTO(tsdbFSEditBegin(tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE), &lino, _exit);

  // the commit itself runs under the tsdb mutex
  (void)taosThreadMutexLock(&tsdb->mutex);
  code = tsdbFSEditCommit(tsdb->pFS);
  (void)taosThreadMutexUnlock(&tsdb->mutex);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
    TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code);
  }
  return code;
}
371

372
/*
 * Merge the selected stt files of one file set.
 *
 * Rows and then tomb records are pulled from the iterator mergers and handed
 * to the writer. Whenever the table uid changes, the table is looked up with
 * metaGetInfo(); if the lookup fails, the rest of that table's entries are
 * skipped instead of written.
 * Returns 0 on success, otherwise the first error code encountered.
 */
static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SMetaInfo info;
  TABLEID  *tbid = merger->ctx->tbid;

  merger->ctx->fset = fset;
  TAOS_CHECK_GOTO(tsdbMergeFileSetBegin(merger), &lino, _exit);

  // data
  tbid->suid = 0;
  tbid->uid = 0;
  for (;;) {
    SRowInfo *row = tsdbIterMergerGetData(merger->dataIterMerger);
    if (row == NULL) {
      break;
    }

    if (row->uid != tbid->uid) {
      // first row of another table
      tbid->uid = row->uid;
      tbid->suid = row->suid;

      if (metaGetInfo(merger->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) {
        TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(merger->dataIterMerger, tbid), &lino, _exit);
        continue;
      }
    }

    TAOS_CHECK_GOTO(tsdbFSetWriteRow(merger->writer, row), &lino, _exit);

    TAOS_CHECK_GOTO(tsdbIterMergerNext(merger->dataIterMerger), &lino, _exit);
  }

  // tomb
  tbid->suid = 0;
  tbid->uid = 0;
  for (;;) {
    STombRecord *record = tsdbIterMergerGetTombRecord(merger->tombIterMerger);
    if (record == NULL) {
      break;
    }

    if (record->uid != tbid->uid) {
      // first tomb record of another table
      tbid->uid = record->uid;
      tbid->suid = record->suid;

      if (metaGetInfo(merger->tsdb->pVnode->pMeta, record->uid, &info, NULL) != 0) {
        TAOS_CHECK_GOTO(tsdbIterMergerSkipTableData(merger->tombIterMerger, tbid), &lino, _exit);
        continue;
      }
    }

    TAOS_CHECK_GOTO(tsdbFSetWriteTombRecord(merger->writer, record), &lino, _exit);

    TAOS_CHECK_GOTO(tsdbIterMergerNext(merger->tombIterMerger), &lino, _exit);
  }

  TAOS_CHECK_GOTO(tsdbMergeFileSetEnd(merger), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(merger->tsdb->pVnode), __func__, fset->fid);
  }
  return code;
}
429

430
/*
 * Run the merge on merger->fset if it qualifies.
 *
 * Nothing is done (and 0 is returned) when the file set has no stt levels,
 * when its first level is not level 0, or when that level holds fewer than
 * sttTrigger files. Otherwise the merger is opened, the file set is merged
 * and the merger is closed again.
 */
static int32_t tsdbDoMerge(SMerger *merger) {
  int32_t code = 0;
  int32_t lino = 0;

  if (TARRAY2_SIZE(merger->fset->lvlArr) == 0) {
    return 0;
  }

  SSttLvl *first = TARRAY2_FIRST(merger->fset->lvlArr);
  bool     triggered = (first->level == 0) && !(TARRAY2_SIZE(first->fobjArr) < merger->sttTrigger);
  if (!triggered) {
    return 0;
  }

  TAOS_CHECK_GOTO(tsdbMergerOpen(merger), &lino, _exit);
  TAOS_CHECK_GOTO(tsdbMergeFileSet(merger, merger->fset), &lino, _exit);
  TAOS_CHECK_GOTO(tsdbMergerClose(merger), &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(merger->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vgId:%d %s done", TD_VID(merger->tsdb->pVnode), __func__);
  }
  return code;
}
454

455
/*
 * Under the tsdb mutex, register a merge task on file set merger->fid and
 * copy that file set into merger->fset.
 *
 * merger->fset is left untouched (and 0 is returned) when background tasks
 * are disabled or when tsdbBeginTaskOnFileSet() yields no file set.
 * Returns the copy error code if tsdbTFileSetInitCopy() fails.
 */
static int32_t tsdbMergeGetFSet(SMerger *merger) {
  int32_t    code = 0;
  STFileSet *fset = NULL;
  STsdb     *tsdb = merger->tsdb;

  (void)taosThreadMutexLock(&merger->tsdb->mutex);

  if (!tsdb->bgTaskDisabled) {
    tsdbBeginTaskOnFileSet(tsdb, merger->fid, EVA_TASK_MERGE, &fset);
    if (fset != NULL) {
      code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset);
    }
  }

  (void)taosThreadMutexUnlock(&merger->tsdb->mutex);
  return code;
}
482

483
int32_t tsdbMerge(void *arg) {
958,268✔
484
  int32_t    code = 0;
958,268✔
485
  int32_t    lino = 0;
958,268✔
486
  SMergeArg *mergeArg = (SMergeArg *)arg;
958,268✔
487
  STsdb     *tsdb = mergeArg->tsdb;
958,268✔
488

489
  SMerger merger[1] = {{
958,268✔
490
      .tsdb = tsdb,
491
      .fid = mergeArg->fid,
958,268✔
492
      .sttTrigger = tsdb->pVnode->config.sttTrigger,
958,268✔
493
  }};
494

495
  if (merger->sttTrigger <= 1) {
958,268✔
496
    return 0;
×
497
  }
498

499
  // copy snapshot
500
  code = tsdbMergeGetFSet(merger);
958,268✔
501
  TSDB_CHECK_CODE(code, lino, _exit);
958,268✔
502

503
  if (merger->fset == NULL) {
958,268✔
UNCOV
504
    return 0;
×
505
  }
506

507
  // do merge
508
  tsdbInfo("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
958,268✔
509
  METRICS_TIMING_BLOCK(tsdb->pVnode->writeMetrics.merge_time, METRIC_LEVEL_HIGH, { code = tsdbDoMerge(merger); });
958,268✔
510
  METRICS_UPDATE(tsdb->pVnode->writeMetrics.merge_count, METRIC_LEVEL_HIGH, 1);
958,268✔
511
  tsdbInfo("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid);
958,268✔
512
  TSDB_CHECK_CODE(code, lino, _exit);
958,268✔
513

514
_exit:
958,268✔
515
  if (merger->fset) {
958,268✔
516
    (void)taosThreadMutexLock(&tsdb->mutex);
958,268✔
517
    tsdbFinishTaskOnFileSet(tsdb, mergeArg->fid, EVA_TASK_MERGE);
958,268✔
518
    (void)taosThreadMutexUnlock(&tsdb->mutex);
958,268✔
519
  }
520

521
  if (code) {
958,268✔
522
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
523
    tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code);
×
524
    taosMsleep(100);
×
525
    exit(EXIT_FAILURE);
×
526
  }
527

528
  tsdbTFileSetClear(&merger->fset);
958,268✔
529
  taosMemoryFree(arg);
958,268✔
530
  return code;
958,268✔
531
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc