• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4688

26 Aug 2025 02:05PM UTC coverage: 56.997% (-0.9%) from 57.894%
#4688

push

travis-ci

web-flow
fix: modify the prompt language of the taos-shell (#32758)

* fix: modify prompt language

* fix: add shell test case

* fix: modify comments

* fix: modify test case for TDengine TSDB

130660 of 292423 branches covered (44.68%)

Branch coverage included in aggregate %.

16 of 17 new or added lines in 2 files covered. (94.12%)

9459 existing lines in 157 files now uncovered.

198294 of 284715 relevant lines covered (69.65%)

4532552.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.73
/source/dnode/vnode/src/tsdb/tsdbRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23
extern int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool ssMigrate);
24

25
// Queue a TSDB_FOP_REMOVE operation for `fobj` on the retention job's
// pending file-operation array (rtner->fopArr).
// Returns whatever TARRAY2_APPEND yields.
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
  STFileOp removeOp = {0};

  removeOp.optype = TSDB_FOP_REMOVE;
  removeOp.fid = fobj->f->fid;
  removeOp.of = fobj->f[0];  // snapshot of the file descriptor being removed

  return TARRAY2_APPEND(&rtner->fopArr, removeOp);
}
34

35
// Copy `size` bytes from `from` to `to` using taosFSendFile, sending at most
// `limitMB` MiB per one-second interval. limitMB == 0 disables the limit.
//
// Returns 0 once `size` bytes have been sent. If taosFSendFile reports a
// negative result, the function returns through TAOS_CHECK_RETURN(terrno).
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
  int64_t interval = 1000;  // 1s
  // Widen before multiplying: in uint32_t arithmetic limitMB * 1024 * 1024
  // wraps for limitMB >= 4096 (4096 yields a limit of 0, so no bytes would be
  // requested per round and `remain` would never shrink).
  int64_t limit = limitMB ? (int64_t)limitMB * 1024 * 1024 : INT64_MAX;
  int64_t offset = 0;
  int64_t remain = size;

  while (remain > 0) {
    int64_t n;
    int64_t last = taosGetTimestampMs();
    if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
      TAOS_CHECK_RETURN(terrno);
    }

    // NOTE(review): if taosFSendFile can return 0 while remain > 0 (e.g. the
    // source is shorter than `size`), this loop does not terminate — confirm.
    remain -= n;

    // Only throttle when another round is needed; pad the round to `interval`.
    if (remain > 0) {
      int64_t elapsed = taosGetTimestampMs() - last;
      if (elapsed < interval) {
        taosMsleep(interval - elapsed);
      }
    }
  }

  return 0;
}
60

61
// Copy the last-chunk file of `from` to the last-chunk file name derived from
// `to`. Both names come from tsdbTFileLastChunkName. The number of bytes sent
// is the file size of `to` minus the (to->lcn - 1) chunks that precede it.
// Returns 0 on success or the error code recorded at the failing step.
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr srcFd = NULL;
  TdFilePtr dstFd = NULL;
  char      srcName[TSDB_FILENAME_LEN];
  char      dstName[TSDB_FILENAME_LEN];

  tsdbTFileLastChunkName(rtner->tsdb, from->f, srcName);
  tsdbTFileLastChunkName(rtner->tsdb, to, dstName);

  if ((srcFd = taosOpenFile(srcName, TD_FILE_READ)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), dstName, from->f->size);

  if ((dstFd = taosOpenFile(dstName, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // Size of one chunk in bytes, then the bytes held by the chunks before the last one.
  const SVnodeCfg *cfg = &rtner->tsdb->pVnode->config;
  int64_t          chunkBytes = (int64_t)cfg->tsdbPageSize * cfg->ssChunkSize;
  int64_t          precedingBytes = chunkBytes * (to->lcn - 1);
  int64_t          lastChunkBytes = tsdbLogicToFileSize(to->size, rtner->szPage) - precedingBytes;

  if (taosFSendFile(dstFd, srcFd, NULL, lastChunkBytes) < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  // Both handles are closed on every path, including after a failed open.
  if (taosCloseFile(&srcFd) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), srcName);
  }
  if (taosCloseFile(&dstFd) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), dstName);
  }
  return code;
}
104

105
// Copy the file behind `from` to the path that tsdbTFileName derives from
// `to`, throttled by tsRetentionSpeedLimitMB through
// tsdbCopyFileWithLimitedSpeed.
// Returns 0 on success or the error code recorded at the failing step.
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr srcFd = NULL;
  TdFilePtr dstFd = NULL;
  char      dstName[TSDB_FILENAME_LEN];

  tsdbTFileName(rtner->tsdb, to, dstName);

  if ((srcFd = taosOpenFile(from->fname, TD_FILE_READ)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), dstName, from->f->size);

  if ((dstFd = taosOpenFile(dstName, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

  // The number of bytes to move is the on-disk size for the logical size of `from`.
  int64_t fileBytes = tsdbLogicToFileSize(from->f->size, rtner->szPage);
  code = tsdbCopyFileWithLimitedSpeed(srcFd, dstFd, fileBytes, tsRetentionSpeedLimitMB);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  // Close failures are only traced; they do not change the returned code.
  if (taosCloseFile(&srcFd) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  if (taosCloseFile(&dstFd) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  return code;
}
145

146
// Migrate `fobj` to the disk identified by `did`:
//   1. queue a TSDB_FOP_REMOVE for the current file,
//   2. queue a TSDB_FOP_CREATE for a file with the same identity on `did`,
//   3. copy the data (last-chunk copy when lcn >= 1, whole-file copy otherwise).
// Returns 0 on success or the first error code encountered.
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
  int32_t       code = 0;
  int32_t       lino = 0;
  const STFile *src = fobj->f;
  const int32_t lcn = src->lcn;

  // remove old
  STFileOp removeOp = {
      .optype = TSDB_FOP_REMOVE,
      .fid = src->fid,
      .of = src[0],
  };
  code = TARRAY2_APPEND(&rtner->fopArr, removeOp);
  TSDB_CHECK_CODE(code, lino, _exit);

  // create new: same type/fid/versions/ids/size/lcn/stt level, different disk
  STFileOp createOp = {
      .optype = TSDB_FOP_CREATE,
      .fid = src->fid,
      .nf =
          {
              .type = src->type,
              .did = *did,
              .fid = src->fid,
              .minVer = src->minVer,
              .maxVer = src->maxVer,
              .mid = src->mid,
              .cid = src->cid,
              .size = src->size,
              .lcn = lcn,
              .stt[0] =
                  {
                      .level = src->stt[0].level,
                  },
          },
  };
  code = TARRAY2_APPEND(&rtner->fopArr, createOp);
  TSDB_CHECK_CODE(code, lino, _exit);

  // do copy the file
  if (lcn >= 1) {
    code = tsdbDoCopyFileLC(rtner, fobj, &createOp.nf);
  } else {
    code = tsdbDoCopyFile(rtner, fobj, &createOp.nf);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
200

201

202
// Apply the file operations accumulated in rtner->fopArr, if there are any:
// begin a file-system edit (SSMIGRATE or RETENTION flavour depending on
// `ssMigrate`), commit it while holding the tsdb mutex, then destroy the
// operation array. Does nothing but log when the array is empty.
// Returns 0 on success or the error code of the failing step.
static int32_t tsdbDoRetentionEnd(SRTNer *rtner, bool ssMigrate) {
  int32_t code = 0;
  int32_t lino = 0;
  EFEditT etype = TSDB_FEDIT_RETENTION;

  if (TARRAY2_SIZE(&rtner->fopArr) <= 0) {
    goto _exit;  // nothing queued
  }

  if (ssMigrate) {
    etype = TSDB_FEDIT_SSMIGRATE;
  }

  code = tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, etype);
  TSDB_CHECK_CODE(code, lino, _exit);

  // The commit itself runs under the tsdb mutex; release it before checking
  // the result so the lock is dropped on both the success and failure path.
  (void)taosThreadMutexLock(&rtner->tsdb->mutex);
  code = tsdbFSEditCommit(rtner->tsdb->pFS);
  (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
  TSDB_CHECK_CODE(code, lino, _exit);

  TARRAY2_DESTROY(&rtner->fopArr, NULL);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
  }
  return code;
}
232

233
// Decide what to do with one file object given its expected storage level:
//   - fobj == NULL : nothing to do, return 0;
//   - expLevel < 0 : queue the file for removal;
//   - expLevel > current level : try to migrate it, starting at expLevel and
//     stepping down one level each time no disk can be allocated; the first
//     level with an available disk wins;
//   - otherwise : leave the file where it is.
// Disk-allocation failures are traced and skipped, never returned.
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, int32_t expLevel, STFileObj *fobj) {
  int32_t code = 0;
  int32_t lino = 0;

  if (fobj == NULL) {
    return code;
  }

  if (expLevel < 0) {
    // remove the file
    code = tsdbDoRemoveFileObject(rtner, fobj);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  // Try to move the file to a new level
  while (expLevel > fobj->f->did.level) {
    SDiskID diskId = {0};

    code = tsdbAllocateDiskAtLevel(rtner->tsdb, expLevel, tsdbFTypeLabel(fobj->f->type), &diskId);
    if (code == 0) {
      tsdbInfo("vgId:%d start to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
               fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);

      code = tsdbDoMigrateFileObj(rtner, fobj, &diskId);
      TSDB_CHECK_CODE(code, lino, _exit);

      tsdbInfo("vgId:%d end to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
               fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);
      break;
    }

    // No disk at this level: swallow the error and retry one level lower.
    tsdbTrace("vgId:%d, cannot allocate disk for file %s, level:%d, reason:%s, skip!", TD_VID(rtner->tsdb->pVnode),
              fobj->fname, expLevel, tstrerror(code));
    code = 0;
    expLevel--;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
277

278
// Run retention over every file of rtner->fset: compute the expected level
// for the file set from its fid, the keep configuration and rtner->now, then
// hand each data file and each stt file to tsdbRemoveOrMoveFileObject.
// Stops at, and returns, the first non-zero code.
static int32_t tsdbDoRetention(SRTNer *rtner) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fileSet = rtner->fset;
  int32_t    expLevel = tsdbFidLevel(fileSet->fid, &rtner->tsdb->keepCfg, rtner->now);

  // handle data file sets (one slot per file type; empty slots are NULL-safe downstream)
  for (int32_t ft = 0; ft < TSDB_FTYPE_MAX; ft++) {
    code = tsdbRemoveOrMoveFileObject(rtner, expLevel, fileSet->farr[ft]);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // handle stt file
  SSttLvl   *sttLvl;
  STFileObj *sttFile = NULL;
  TARRAY2_FOREACH(fileSet->lvlArr, sttLvl) {
    TARRAY2_FOREACH(sttLvl->fobjArr, sttFile) {
      code = tsdbRemoveOrMoveFileObject(rtner, expLevel, sttFile);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
307

UNCOV
308
// Cancel callback registered alongside tsdbRetention: the task never ran, so
// the only work is releasing the argument allocated for it.
void tsdbRetentionCancel(void *arg) {
  taosMemoryFree(arg);
}
×
309

310
int32_t tsdbRetention(void *arg) {
280✔
311
  int32_t code = 0;
280✔
312
  int32_t lino = 0;
280✔
313

314
  SRtnArg   *rtnArg = (SRtnArg *)arg;
280✔
315
  STsdb     *pTsdb = rtnArg->tsdb;
280✔
316
  SVnode    *pVnode = pTsdb->pVnode;
280✔
317
  STFileSet *fset = NULL;
280✔
318
  SRTNer     rtner = {
560✔
319
          .tsdb = pTsdb,
320
          .szPage = pVnode->config.tsdbPageSize,
280✔
321
          .now = rtnArg->now,
280✔
322
          .lastCommit = rtnArg->lastCommit,
280✔
323
          .cid = tsdbFSAllocEid(pTsdb->pFS),
280✔
324
          .nodeId = rtnArg->nodeId,
280✔
325
  };
326

327
  // begin task
328
  (void)taosThreadMutexLock(&pTsdb->mutex);
280✔
329

330
  // check if background task is disabled
331
  if (pTsdb->bgTaskDisabled) {
280!
UNCOV
332
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode));
×
UNCOV
333
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
UNCOV
334
    return 0;
×
335
  }
336

337
  // set flag and copy
338
  tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset);
280✔
339
  if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
280!
340
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
341
    TSDB_CHECK_CODE(code, lino, _exit);
×
342
  }
343

344
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
280✔
345

346
  // do retention
347
  if (rtner.fset) {
280!
348
    if (rtnArg->ssMigrate) {
280!
349
      TAOS_CHECK_GOTO(tsdbDoSsMigrate(&rtner), &lino, _exit);
×
350
    } else {
351
      TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
280!
352
    }
353

354
    TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner, rtnArg->ssMigrate), &lino, _exit);
280!
355
  }
356

357
_exit:
280✔
358
  if (rtner.fset) {
280!
359
    (void)taosThreadMutexLock(&pTsdb->mutex);
280✔
360
    tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION);
280✔
361
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
280✔
362
  }
363

364
  // clear resources
365
  tsdbTFileSetClear(&rtner.fset);
280✔
366
  TARRAY2_DESTROY(&rtner.fopArr, NULL);
280!
367
  taosMemoryFree(arg);
280!
368
  if (code) {
280!
UNCOV
369
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
370
  }
371
  return code;
280✔
372
}
373

374
// Schedule one low-priority retention task per file set currently listed in
// tsdb->pFS->fSetArr. Each task gets its own heap-allocated SRtnArg, which is
// released by tsdbRetention when the task runs or by tsdbRetentionCancel when
// it is cancelled; if scheduling itself fails the argument is freed here.
// tsdbAsyncRetention calls this with tsdb->mutex held.
// Returns 0 on success (or when background tasks are disabled), otherwise the
// first allocation/scheduling error; file sets after the failure are skipped.
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now) {
  // Local prototype carried over unchanged; it is not referenced in this function.
  int32_t tsdbSsMigrateMonitorAddFileSet(STsdb *tsdb, int32_t fid);

  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fileSet;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode));
    return 0;
  }

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fileSet) {
    SRtnArg *taskArg = taosMemoryMalloc(sizeof(*taskArg));
    if (!taskArg) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    // Plain (non-ss-migrate) retention for this file set.
    taskArg->tsdb = tsdb;
    taskArg->fid = fileSet->fid;
    taskArg->lastCommit = fileSet->lastCommit;
    taskArg->now = now;
    taskArg->nodeId = 0;
    taskArg->ssMigrate = false;

    code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, taskArg,
                      &fileSet->retentionTask);
    if (code != 0) {
      // The task was not accepted, so neither callback will free the argument.
      taosMemoryFree(taskArg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
414

415
// Public entry point: schedule retention tasks for every file set of `tsdb`,
// using `now` as the reference time. Holds tsdb->mutex for the duration of
// the scheduling pass and returns the result of tsdbAsyncRetentionImpl.
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  const int32_t rc = tsdbAsyncRetentionImpl(tsdb, now);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc