• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4506

15 Jul 2025 12:33AM UTC coverage: 62.026% (-0.7%) from 62.706%
#4506

push

travis-ci

web-flow
docs: update stream docs (#31874)

155391 of 320094 branches covered (48.55%)

Branch coverage included in aggregate %.

240721 of 318525 relevant lines covered (75.57%)

6529048.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.27
/source/dnode/vnode/src/tsdb/tsdbRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23
extern int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool ssMigrate);
24

25
/*
 * Queue a TSDB_FOP_REMOVE operation for `fobj` on the retention op list.
 * Nothing is deleted here; the op is applied when the FS edit is committed.
 * Returns the result of appending to rtner->fopArr.
 */
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
  const STFile *file = fobj->f;
  STFileOp      removeOp = {
           .optype = TSDB_FOP_REMOVE,
           .fid = file->fid,
           .of = *file,
  };

  return TARRAY2_APPEND(&rtner->fopArr, removeOp);
}
34

35
/*
 * Copy `size` bytes, starting at offset 0 of `from`, into `to`.
 * At most `limitMB` MiB are sent per one-second window; limitMB == 0 disables throttling.
 *
 * Returns 0 on success. On a send failure it returns terrno. If a send makes no progress
 * before `size` bytes have been copied, it returns TSDB_CODE_FILE_CORRUPTED.
 */
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
  const int64_t interval = 1000;  // throttling window: 1s
  // Widen before multiplying. In uint32_t arithmetic, limitMB * 1024 * 1024 wraps for
  // limitMB >= 4096 (4096 -> 0), which would make every send a zero-byte send.
  const int64_t limit = limitMB ? (int64_t)limitMB * 1024 * 1024 : INT64_MAX;
  int64_t       offset = 0;
  int64_t       remain = size;

  while (remain > 0) {
    int64_t last = taosGetTimestampMs();
    int64_t n = taosFSendFile(to, from, &offset, TMIN(limit, remain));
    if (n < 0) {
      TAOS_CHECK_RETURN(terrno);
    }
    if (n == 0 && remain > 0) {
      // Nothing was copied (presumably the source ended early). Without this check
      // the loop would retry forever, sleeping up to 1s per pass.
      TAOS_CHECK_RETURN(TSDB_CODE_FILE_CORRUPTED);
    }

    remain -= n;

    if (remain > 0) {
      // Sleep out the rest of the window so throughput stays under `limit` bytes/s.
      int64_t elapsed = taosGetTimestampMs() - last;
      if (elapsed < interval) {
        taosMsleep(interval - elapsed);
      }
    }
  }

  return 0;
}
60

61
/*
 * Copy the last chunk of `from` into the last-chunk file named for `to`.
 *
 * The byte count is tsdbLogicToFileSize(to->size, szPage) minus the (to->lcn - 1) full
 * chunks that precede it. One chunk is tsdbPageSize * ssChunkSize bytes from the vnode config.
 * Both files are closed on every path; close failures are only logged.
 */
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t   code = 0;
  int32_t   lino = 0;
  STsdb    *tsdb = rtner->tsdb;
  TdFilePtr srcFd = NULL;
  TdFilePtr dstFd = NULL;
  char      srcName[TSDB_FILENAME_LEN];
  char      dstName[TSDB_FILENAME_LEN];

  tsdbTFileLastChunkName(tsdb, from->f, srcName);
  tsdbTFileLastChunkName(tsdb, to, dstName);

  if ((srcFd = taosOpenFile(srcName, TD_FILE_READ)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(tsdb->pVnode), dstName, from->f->size);

  if ((dstFd = taosOpenFile(dstName, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  {
    SVnodeCfg *cfg = &tsdb->pVnode->config;
    int64_t    chunkBytes = (int64_t)cfg->tsdbPageSize * cfg->ssChunkSize;
    int64_t    lastChunkBytes = tsdbLogicToFileSize(to->size, rtner->szPage) - chunkBytes * (to->lcn - 1);

    // A NULL offset means the copy proceeds from the freshly opened source's current position.
    if (taosFSendFile(dstFd, srcFd, NULL, lastChunkBytes) < 0) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&srcFd) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(tsdb->pVnode), srcName);
  }
  if (taosCloseFile(&dstFd) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(tsdb->pVnode), dstName);
  }
  return code;
}
104

105
/*
 * Copy the whole data file `from` into the file named for `to`.
 *
 * The copy uses tsdbCopyFileWithLimitedSpeed, throttled by tsRetentionSpeedLimitMB.
 * The byte count is tsdbLogicToFileSize(from->f->size, szPage).
 * Both files are closed on every path; close failures are logged (with the file name)
 * but do not change the return code.
 */
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t code = 0;
  int32_t lino = 0;

  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr fdFrom = NULL;
  TdFilePtr fdTo = NULL;

  tsdbTFileName(rtner->tsdb, to, fname);

  fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);

  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  // (A former TSDB_CHECK_CODE(code, ...) here was dead: code is always 0 at this point.)

  TAOS_CHECK_GOTO(tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
                                               tsRetentionSpeedLimitMB),
                  &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  // Log close failures like tsdbDoCopyFileLC does, so the offending file is identifiable.
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), from->fname);
  }
  if (taosCloseFile(&fdTo) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname);
  }
  return code;
}
145

146
/*
 * Migrate one file object to the disk `did`. The steps are:
 *   1. queue a REMOVE op for the current file;
 *   2. queue a CREATE op for an identical file (same type/fid/versions/mid/cid/size/lcn and
 *      stt level) placed on `did`;
 *   3. physically copy the data: the whole file when lcn < 1, otherwise only the last chunk.
 * Both ops are only queued here; they take effect when the FS edit is committed.
 */
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
  int32_t       code = 0;
  int32_t       lino = 0;
  const STFile *src = fobj->f;

  STFileOp removeOp = {
      .optype = TSDB_FOP_REMOVE,
      .fid = src->fid,
      .of = *src,
  };

  STFileOp createOp = {
      .optype = TSDB_FOP_CREATE,
      .fid = src->fid,
      .nf =
          {
              .type = src->type,
              .did = *did,
              .fid = src->fid,
              .minVer = src->minVer,
              .maxVer = src->maxVer,
              .mid = src->mid,
              .cid = src->cid,
              .size = src->size,
              .lcn = src->lcn,
              .stt[0] = {.level = src->stt[0].level},
          },
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, removeOp), &lino, _exit);
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, createOp), &lino, _exit);

  if (createOp.nf.lcn < 1) {
    TAOS_CHECK_GOTO(tsdbDoCopyFile(rtner, fobj, &createOp.nf), &lino, _exit);
  } else {
    TAOS_CHECK_GOTO(tsdbDoCopyFileLC(rtner, fobj, &createOp.nf), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
200

201
typedef struct {
202
  STsdb  *tsdb;
203
  int64_t now;
204
  TSKEY   lastCommit;
205
  int32_t nodeId; // node id of leader vnode in ss migration
206
  int32_t fid;
207
  bool    ssMigrate;
208
} SRtnArg;
209

210
/*
 * Commit the file operations accumulated in rtner->fopArr as one FS edit.
 * The edit type is TSDB_FEDIT_SSMIGRATE when `ssMigrate` is set, else TSDB_FEDIT_RETENTION.
 * If the op list is empty, nothing is done. The commit itself runs under tsdb->mutex.
 * On success the op array is destroyed.
 */
static int32_t tsdbDoRetentionEnd(SRTNer *rtner, bool ssMigrate) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = rtner->tsdb;
  EFEditT editType = TSDB_FEDIT_RETENTION;

  if (TARRAY2_SIZE(&rtner->fopArr) <= 0) {
    goto _exit;  // nothing queued
  }

  if (ssMigrate) {
    editType = TSDB_FEDIT_SSMIGRATE;
  }

  TAOS_CHECK_GOTO(tsdbFSEditBegin(tsdb->pFS, &rtner->fopArr, editType), &lino, _exit);

  (void)taosThreadMutexLock(&tsdb->mutex);
  code = tsdbFSEditCommit(tsdb->pFS);
  (void)taosThreadMutexUnlock(&tsdb->mutex);
  TSDB_CHECK_CODE(code, lino, _exit);

  TARRAY2_DESTROY(&rtner->fopArr, NULL);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(tsdb->pVnode), rtner->cid, __func__);
  }
  return code;
}
240

241
/*
 * Apply the retention decision for one file object:
 *   - fobj == NULL: ignored;
 *   - expLevel < 0: queue removal of the file;
 *   - expLevel > current disk level: walk down from expLevel toward the current level
 *     and migrate to the first level where a disk can be allocated. Allocation failures
 *     are traced and skipped;
 *   - otherwise: nothing to do.
 */
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, int32_t expLevel, STFileObj *fobj) {
  int32_t code = 0;
  int32_t lino = 0;

  if (fobj == NULL) return code;

  if (expLevel < 0) {
    code = tsdbDoRemoveFileObject(rtner, fobj);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  for (int32_t level = expLevel; level > fobj->f->did.level; --level) {
    SDiskID diskId = {0};

    code = tsdbAllocateDiskAtLevel(rtner->tsdb, level, tsdbFTypeLabel(fobj->f->type), &diskId);
    if (code != 0) {
      tsdbTrace("vgId:%d, cannot allocate disk for file %s, level:%d, reason:%s, skip!", TD_VID(rtner->tsdb->pVnode),
                fobj->fname, level, tstrerror(code));
      code = 0;
      continue;
    }

    tsdbInfo("vgId:%d start to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
             fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);

    code = tsdbDoMigrateFileObj(rtner, fobj, &diskId);
    TSDB_CHECK_CODE(code, lino, _exit);

    tsdbInfo("vgId:%d end to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
             fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);
    break;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
285

286
/*
 * Run local retention over the copied file set rtner->fset.
 * It computes the expected level for the set's fid (tsdbFidLevel with keepCfg and rtner->now).
 * That level is applied first to every data file slot, then to every stt file of every stt level.
 * Stops at the first error.
 */
static int32_t tsdbDoRetention(SRTNer *rtner) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fset = rtner->fset;
  int32_t    expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);

  // data files: one slot per file type (an empty slot is passed as NULL and ignored)
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
    TAOS_CHECK_GOTO(tsdbRemoveOrMoveFileObject(rtner, expLevel, fset->farr[ftype]), &lino, _exit);
  }

  // stt files, level by level
  SSttLvl   *sttLvl = NULL;
  STFileObj *sttObj = NULL;
  TARRAY2_FOREACH(fset->lvlArr, sttLvl) {
    TARRAY2_FOREACH(sttLvl->fobjArr, sttObj) {
      TAOS_CHECK_GOTO(tsdbRemoveOrMoveFileObject(rtner, expLevel, sttObj), &lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
315

316
/* Cancel callback for a queued retention task: its only job is to release the SRtnArg. */
static void tsdbRetentionCancel(void *arg) {
  taosMemoryFree(arg);
}
317

318
static int32_t tsdbRetention(void *arg) {
96✔
319
  int32_t code = 0;
96✔
320
  int32_t lino = 0;
96✔
321

322
  SRtnArg   *rtnArg = (SRtnArg *)arg;
96✔
323
  STsdb     *pTsdb = rtnArg->tsdb;
96✔
324
  SVnode    *pVnode = pTsdb->pVnode;
96✔
325
  STFileSet *fset = NULL;
96✔
326
  SRTNer     rtner = {
192✔
327
          .tsdb = pTsdb,
328
          .szPage = pVnode->config.tsdbPageSize,
96✔
329
          .now = rtnArg->now,
96✔
330
          .lastCommit = rtnArg->lastCommit,
96✔
331
          .cid = tsdbFSAllocEid(pTsdb->pFS),
96✔
332
          .nodeId = rtnArg->nodeId,
96✔
333
  };
334

335
  // begin task
336
  (void)taosThreadMutexLock(&pTsdb->mutex);
96✔
337

338
  // check if background task is disabled
339
  if (pTsdb->bgTaskDisabled) {
96!
340
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode));
×
341
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
342
    return 0;
×
343
  }
344

345
  // set flag and copy
346
  tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset);
96✔
347
  if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
96!
348
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
349
    TSDB_CHECK_CODE(code, lino, _exit);
×
350
  }
351

352
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
96✔
353

354
  // do retention
355
  if (rtner.fset) {
96!
356
    if (rtnArg->ssMigrate) {
96!
357
      TAOS_CHECK_GOTO(tsdbDoSsMigrate(&rtner), &lino, _exit);
×
358
    } else {
359
      TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
96!
360
    }
361

362
    TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner, rtnArg->ssMigrate), &lino, _exit);
96!
363
  }
364

365
_exit:
96✔
366
  if (rtner.fset) {
96!
367
    (void)taosThreadMutexLock(&pTsdb->mutex);
96✔
368
    tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION);
96✔
369
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
96✔
370
  }
371

372
  // clear resources
373
  tsdbTFileSetClear(&rtner.fset);
96✔
374
  TARRAY2_DESTROY(&rtner.fopArr, NULL);
96!
375
  taosMemoryFree(arg);
96!
376
  if (code) {
96!
377
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
378
  }
379
  return code;
96✔
380
}
381

382
/*
 * Schedule one async task (tsdbRetention) per file set in tsdb->pFS->fSetArr.
 * Each task gets its own heap-allocated SRtnArg.
 *
 * Both visible callers hold tsdb->mutex around this call (presumably because it walks
 * fSetArr — confirm before calling it unlocked).
 * In ss-migration mode, a file set is skipped when fset->lastMigrate / 1000 >= now; the
 * division suggests lastMigrate is in ms and `now` in s (TODO confirm). Each scheduled set
 * is also registered with the ss-migrate monitor.
 * The first scheduling failure stops the loop and is returned.
 */
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool ssMigrate, int32_t nodeId) {
  void tsdbSsMigrateMonitorAddFileSet(STsdb *tsdb, int32_t fid);

  int32_t code = 0;
  int32_t lino = 0;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    if (ssMigrate) {
      tsdbInfo("vgId:%d, background task is disabled, skip ss migration", TD_VID(tsdb->pVnode));
    } else {
      tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode));
    }
    return 0;
  }

  STFileSet *fset = NULL;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    // TODO: when migrating to S3, skip fset that should not be migrated
    if (ssMigrate && fset->lastMigrate / 1000 >= now) {
      tsdbInfo("vgId:%d, fid:%d, skip migration as start time < last migration time", TD_VID(tsdb->pVnode), fset->fid);
      continue;
    }

    SRtnArg *taskArg = taosMemoryMalloc(sizeof(*taskArg));
    if (taskArg == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }
    *taskArg = (SRtnArg){
        .tsdb = tsdb,
        .now = now,
        .lastCommit = fset->lastCommit,
        .nodeId = nodeId,
        .fid = fset->fid,
        .ssMigrate = ssMigrate,
    };

    if (ssMigrate) {
      tsdbSsMigrateMonitorAddFileSet(tsdb, fset->fid);
    }

    code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, taskArg,
                      &fset->retentionTask);
    if (code != 0) {
      taosMemoryFree(taskArg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
436

437
/*
 * Schedule local (non-ss) retention for every file set of `tsdb`.
 * tsdb->mutex is held while the tasks are scheduled. Returns the scheduling result.
 */
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
  const bool    ssMigrate = false;
  const int32_t noLeaderNode = 0;

  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t code = tsdbAsyncRetentionImpl(tsdb, now, ssMigrate, noLeaderNode);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
444

445
#ifdef USE_SHARED_STORAGE
446

447
/*
 * Start a shared-storage migration round identified by pReq->ssMigrateId.
 * If tsdbResetSsMigrateMonitor refuses (a migration is already in progress), the request
 * is logged and ignored (returns 0). Otherwise one ss-migration task per file set is
 * scheduled while tsdb->mutex is held, using pReq->timestamp and pReq->nodeId.
 */
int32_t tsdbAsyncSsMigrate(STsdb *tsdb, SSsMigrateVgroupReq *pReq) {
  bool tsdbResetSsMigrateMonitor(STsdb *tsdb, int32_t ssMigrateId);

  int32_t code = 0;
  bool    canStart = false;

  (void)taosThreadMutexLock(&tsdb->mutex);
  canStart = tsdbResetSsMigrateMonitor(tsdb, pReq->ssMigrateId);
  if (canStart) {
    code = tsdbAsyncRetentionImpl(tsdb, pReq->timestamp, true, pReq->nodeId);
  }
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  if (!canStart) {
    tsdbInfo("vgId:%d, skip ss migration as there's an in progress one", TD_VID(tsdb->pVnode));
    return 0;
  }

  if (code != 0) {
    tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
  }
  return code;
}
466

467
#endif
468

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc