• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tsdb/tsdbRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23
extern int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool ssMigrate);
24

25
/*
 * Queue a TSDB_FOP_REMOVE operation for `fobj` on the retention task's
 * pending file-operation array (rtner->fopArr).
 *
 * Returns the result of TARRAY2_APPEND.
 */
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
  STFileOp removeOp = {0};

  removeOp.optype = TSDB_FOP_REMOVE;
  removeOp.fid = fobj->f->fid;
  removeOp.of = fobj->f[0];  // snapshot of the file descriptor being removed

  return TARRAY2_APPEND(&rtner->fopArr, removeOp);
}
34

35
/*
 * Copy `size` bytes from `from` to `to` using taosFSendFile, optionally
 * throttled.
 *
 * from     source file
 * to       destination file
 * size     number of bytes to copy
 * limitMB  maximum number of MiB sent per one-second window; 0 means no limit
 *
 * Returns 0 once `size` bytes have been sent. When taosFSendFile reports a
 * negative result, terrno is passed to TAOS_CHECK_RETURN.
 */
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
  int64_t interval = 1000;  // 1s
  // Widen before multiplying: `limitMB * 1024 * 1024` on a uint32_t wraps for
  // limitMB >= 4096 (exactly 4096 yields 0, which made every round send
  // nothing and the loop spin forever).
  int64_t limit = limitMB ? (int64_t)limitMB * 1024 * 1024 : INT64_MAX;
  int64_t offset = 0;
  int64_t remain = size;

  while (remain > 0) {
    int64_t n;
    int64_t last = taosGetTimestampMs();
    if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
      TAOS_CHECK_RETURN(terrno);
    }

    // NOTE(review): if taosFSendFile can return 0 while bytes remain (e.g. a
    // source shorter than `size`), this loop makes no progress — confirm and
    // decide which error code should be reported in that case.
    remain -= n;

    // Only pace ourselves when another round follows; pad the round up to a
    // full interval so at most `limit` bytes go out per interval.
    if (remain > 0) {
      int64_t elapsed = taosGetTimestampMs() - last;
      if (elapsed < interval) {
        taosMsleep(interval - elapsed);
      }
    }
  }

  return 0;
}
60

61
/*
 * Copy the last chunk of `from` to the last-chunk file described by `to`.
 *
 * Both paths are produced by tsdbTFileLastChunkName. The number of bytes sent
 * is the file size of `to` (tsdbLogicToFileSize) minus the bytes held by the
 * preceding (to->lcn - 1) chunks, where one chunk is
 * tsdbPageSize * ssChunkSize bytes.
 *
 * Returns 0 on success, otherwise the error code captured from terrno.
 * Both files are closed before returning; a failed close is only logged.
 */
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t    code = 0;
  int32_t    lino = 0;
  TdFilePtr  srcFile = NULL;
  TdFilePtr  dstFile = NULL;
  char       srcName[TSDB_FILENAME_LEN];
  char       dstName[TSDB_FILENAME_LEN];
  SVnodeCfg *vnodeCfg = &rtner->tsdb->pVnode->config;

  tsdbTFileLastChunkName(rtner->tsdb, from->f, srcName);
  tsdbTFileLastChunkName(rtner->tsdb, to, dstName);

  srcFile = taosOpenFile(srcName, TD_FILE_READ);
  if (NULL == srcFile) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), dstName, from->f->size);

  dstFile = taosOpenFile(dstName, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (NULL == dstFile) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // Bytes in the last chunk = total file bytes - bytes in all earlier chunks.
  int64_t bytesPerChunk = (int64_t)vnodeCfg->tsdbPageSize * vnodeCfg->ssChunkSize;
  int64_t totalBytes = tsdbLogicToFileSize(to->size, rtner->szPage);
  int64_t lastChunkBytes = totalBytes - bytesPerChunk * (to->lcn - 1);

  int64_t sent = taosFSendFile(dstFile, srcFile, 0, lastChunkBytes);
  if (sent < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&srcFile) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), srcName);
  }
  if (taosCloseFile(&dstFile) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), dstName);
  }
  return code;
}
104

105
/*
 * Copy the whole file `from` to the location described by `to`, throttled by
 * tsRetentionSpeedLimitMB.
 *
 * The destination path is built with tsdbTFileName; the number of bytes copied
 * is tsdbLogicToFileSize(from->f->size, rtner->szPage).
 *
 * Returns 0 on success, otherwise the error code from opening or copying.
 * Both files are closed before returning; a failed close is logged (with the
 * file name, matching tsdbDoCopyFileLC) but does not change the return value.
 */
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t code = 0;
  int32_t lino = 0;

  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr fdFrom = NULL;
  TdFilePtr fdTo = NULL;

  tsdbTFileName(rtner->tsdb, to, fname);

  fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);

  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  // (A stray TSDB_CHECK_CODE used to sit here; `code` is always 0 at this
  // point, so it could never fire and has been dropped.)

  TAOS_CHECK_GOTO(tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
                                               tsRetentionSpeedLimitMB),
                  &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  // Report close failures at error level and name the file, as the last-chunk
  // copy path does; a failed close of the destination is worth noticing.
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), from->fname);
  }
  if (taosCloseFile(&fdTo) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname);
  }
  return code;
}
145

146
/*
 * Migrate `fobj` to the disk identified by `did`.
 *
 * Two operations are appended to rtner->fopArr — a TSDB_FOP_REMOVE for the
 * current file and a TSDB_FOP_CREATE for its replacement on `did` — and then
 * the file contents are copied. Files with lcn >= 1 go through
 * tsdbDoCopyFileLC, all others through tsdbDoCopyFile.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t lastChunkNo = fobj->f->lcn;

  // 1) schedule removal of the file at its current location
  STFileOp removeOp = {
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, removeOp), &lino, _exit);

  // 2) schedule creation of the same file on the target disk
  STFileOp createOp = {
      .optype = TSDB_FOP_CREATE,
      .fid = fobj->f->fid,
      .nf =
          {
              .type = fobj->f->type,
              .did = did[0],
              .fid = fobj->f->fid,
              .minVer = fobj->f->minVer,
              .maxVer = fobj->f->maxVer,
              .mid = fobj->f->mid,
              .cid = fobj->f->cid,
              .size = fobj->f->size,
              .lcn = lastChunkNo,
              .stt[0] =
                  {
                      .level = fobj->f->stt[0].level,
                  },
          },
  };
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, createOp), &lino, _exit);

  // 3) copy the bytes; files with lcn >= 1 take the last-chunk copy path
  if (lastChunkNo >= 1) {
    TAOS_CHECK_GOTO(tsdbDoCopyFileLC(rtner, fobj, &createOp.nf), &lino, _exit);
  } else {
    TAOS_CHECK_GOTO(tsdbDoCopyFile(rtner, fobj, &createOp.nf), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
200

201
typedef struct {
202
  STsdb  *tsdb;
203
  int64_t now;
204
  TSKEY   lastCommit;
205
  int32_t nodeId; // node id of leader vnode in ss migration
206
  int32_t fid;
207
  bool    ssMigrate;
208
} SRtnArg;
209

210
/*
 * Apply the file operations collected in rtner->fopArr, if there are any.
 *
 * The edit is opened with tsdbFSEditBegin (type TSDB_FEDIT_SSMIGRATE when
 * `ssMigrate` is set, TSDB_FEDIT_RETENTION otherwise) and committed with
 * tsdbFSEditCommit while tsdb->mutex is held. On success the operation array
 * is destroyed.
 *
 * Returns 0 on success (including "nothing to do"), otherwise the error code.
 */
static int32_t tsdbDoRetentionEnd(SRTNer *rtner, bool ssMigrate) {
  int32_t code = 0;
  int32_t lino = 0;
  EFEditT editType;

  // Nothing was queued: skip straight to the common exit logging.
  if (TARRAY2_SIZE(&rtner->fopArr) <= 0) {
    goto _exit;
  }

  if (ssMigrate) {
    editType = TSDB_FEDIT_SSMIGRATE;
  } else {
    editType = TSDB_FEDIT_RETENTION;
  }
  TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, editType), &lino, _exit);

  // The commit runs under the tsdb mutex; the lock is dropped before the
  // result is examined so both outcomes share a single unlock.
  (void)taosThreadMutexLock(&rtner->tsdb->mutex);
  code = tsdbFSEditCommit(rtner->tsdb->pFS);
  (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
  TSDB_CHECK_CODE(code, lino, _exit);

  TARRAY2_DESTROY(&rtner->fopArr, NULL);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
  }
  return code;
}
240

241
/*
 * Decide what to do with one file object given the level it should live on.
 *
 *   fobj == NULL              nothing to do
 *   expLevel < 0              queue the file for removal
 *   expLevel > current level  try to migrate it, starting at expLevel and
 *                             stepping down one level each time no disk can be
 *                             allocated; the first level with a disk wins
 *   otherwise                 leave the file where it is
 *
 * A failed disk allocation is not an error: it is traced and the next lower
 * level is tried.
 *
 * Returns 0 on success, otherwise the error from the removal or migration.
 */
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, int32_t expLevel, STFileObj *fobj) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == fobj) {
    return code;
  }

  if (expLevel < 0) {
    // remove the file
    code = tsdbDoRemoveFileObject(rtner, fobj);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  // Try to move the file to a new level
  while (expLevel > fobj->f->did.level) {
    SDiskID target = {0};

    code = tsdbAllocateDiskAtLevel(rtner->tsdb, expLevel, tsdbFTypeLabel(fobj->f->type), &target);
    if (code) {
      tsdbTrace("vgId:%d, cannot allocate disk for file %s, level:%d, reason:%s, skip!", TD_VID(rtner->tsdb->pVnode),
                fobj->fname, expLevel, tstrerror(code));
      code = 0;
      expLevel--;
      continue;
    }

    tsdbInfo("vgId:%d start to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
             fobj->fname, fobj->f->did.level, target.level, fobj->f->size);

    code = tsdbDoMigrateFileObj(rtner, fobj, &target);
    TSDB_CHECK_CODE(code, lino, _exit);

    tsdbInfo("vgId:%d end to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
             fobj->fname, fobj->f->did.level, target.level, fobj->f->size);
    break;
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
285

286
/*
 * Run retention over the file set copy held in rtner->fset.
 *
 * The expected level for the set is computed once with tsdbFidLevel and then
 * applied, via tsdbRemoveOrMoveFileObject, to every slot of fset->farr and to
 * every file object in every entry of fset->lvlArr.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int32_t tsdbDoRetention(SRTNer *rtner) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fileSet = rtner->fset;
  SSttLvl   *sttLvl;
  STFileObj *sttFile = NULL;

  int32_t targetLevel = tsdbFidLevel(fileSet->fid, &rtner->tsdb->keepCfg, rtner->now);

  // handle data file sets
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    TAOS_CHECK_GOTO(tsdbRemoveOrMoveFileObject(rtner, targetLevel, fileSet->farr[ftype]), &lino, _exit);
  }

  // handle stt file
  TARRAY2_FOREACH(fileSet->lvlArr, sttLvl) {
    TARRAY2_FOREACH(sttLvl->fobjArr, sttFile) {
      TAOS_CHECK_GOTO(tsdbRemoveOrMoveFileObject(rtner, targetLevel, sttFile), &lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
315

316
/*
 * Cancel callback registered together with tsdbRetention: releases the task
 * argument without doing any work.
 */
static void tsdbRetentionCancel(void *arg) {
  taosMemoryFree(arg);
}
×
317

318
static int32_t tsdbRetention(void *arg) {
×
319
  int32_t code = 0;
×
320
  int32_t lino = 0;
×
321

322
  SRtnArg   *rtnArg = (SRtnArg *)arg;
×
323
  STsdb     *pTsdb = rtnArg->tsdb;
×
324
  SVnode    *pVnode = pTsdb->pVnode;
×
325
  STFileSet *fset = NULL;
×
326
  SRTNer     rtner = {
×
327
          .tsdb = pTsdb,
328
          .szPage = pVnode->config.tsdbPageSize,
×
329
          .now = rtnArg->now,
×
330
          .lastCommit = rtnArg->lastCommit,
×
331
          .cid = tsdbFSAllocEid(pTsdb->pFS),
×
332
          .nodeId = rtnArg->nodeId,
×
333
  };
334

335
  // begin task
336
  (void)taosThreadMutexLock(&pTsdb->mutex);
×
337

338
  // check if background task is disabled
339
  if (pTsdb->bgTaskDisabled) {
×
340
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode));
×
341
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
342
    return 0;
×
343
  }
344

345
  // set flag and copy
346
  tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset);
×
347
  if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
×
348
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
349
    TSDB_CHECK_CODE(code, lino, _exit);
×
350
  }
351

352
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
353

354
  // do retention
355
  if (rtner.fset) {
×
356
    if (rtnArg->ssMigrate) {
×
357
      TAOS_CHECK_GOTO(tsdbDoSsMigrate(&rtner), &lino, _exit);
×
358
    } else {
359
      TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
×
360
    }
361

362
    TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner, rtnArg->ssMigrate), &lino, _exit);
×
363
  }
364

365
_exit:
×
366
  if (rtner.fset) {
×
367
    (void)taosThreadMutexLock(&pTsdb->mutex);
×
368
    tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION);
×
369
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
370
  }
371

372
  // clear resources
373
  tsdbTFileSetClear(&rtner.fset);
×
374
  TARRAY2_DESTROY(&rtner.fopArr, NULL);
×
375
  taosMemoryFree(arg);
×
376
  if (code) {
×
377
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
378
  }
379
  return code;
×
380
}
381

382
/*
 * Schedule one asynchronous tsdbRetention task per file set of `tsdb`.
 *
 * tsdb       tsdb instance whose file sets are scanned
 * now        timestamp stored in each task argument
 * ssMigrate  true to schedule shared-storage migration instead of retention
 * nodeId     stored in each task argument
 *
 * Nothing is scheduled when background tasks are disabled. In ss-migrate mode
 * a file set is skipped when lastMigrate / 1000 >= now; every other file set
 * is first registered with tsdbSsMigrateMonitorAddFileSet. The task argument
 * is freed here if registration or vnodeAsync fails; otherwise ownership
 * passes to the task.
 *
 * Both public callers in this file invoke this with tsdb->mutex held.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool ssMigrate, int32_t nodeId) {
  int32_t tsdbSsMigrateMonitorAddFileSet(STsdb *tsdb, int32_t fid);

  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fset;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    if (!ssMigrate) {
      tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode));
    } else {
      tsdbInfo("vgId:%d, background task is disabled, skip ss migration", TD_VID(tsdb->pVnode));
    }
    return 0;
  }

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    // TODO: when migrating to S3, skip fset that should not be migrated

    if (ssMigrate && fset->lastMigrate / 1000 >= now) {
      tsdbInfo("vgId:%d, fid:%d, skip migration as start time < last migration time", TD_VID(tsdb->pVnode), fset->fid);
      continue;
    }

    SRtnArg *taskArg = taosMemoryMalloc(sizeof(*taskArg));
    if (NULL == taskArg) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    *taskArg = (SRtnArg){
        .tsdb = tsdb,
        .now = now,
        .lastCommit = fset->lastCommit,
        .nodeId = nodeId,
        .fid = fset->fid,
        .ssMigrate = ssMigrate,
    };

    // Register with the migration monitor first (ss-migrate only), then hand
    // the task to the async queue; either failure releases the argument.
    if (ssMigrate) {
      code = tsdbSsMigrateMonitorAddFileSet(tsdb, fset->fid);
    }
    if (0 == code) {
      code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, taskArg,
                        &fset->retentionTask);
    }
    if (code) {
      taosMemoryFree(taskArg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
440

441
/*
 * Public entry point: schedule retention tasks for every file set of `tsdb`.
 * The scheduling itself (tsdbAsyncRetentionImpl) runs with tsdb->mutex held.
 *
 * Returns the result of tsdbAsyncRetentionImpl.
 */
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t result = tsdbAsyncRetentionImpl(tsdb, now, false, 0);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
448

449
#ifdef USE_SHARED_STORAGE
450

451
/*
 * Public entry point: schedule shared-storage migration tasks for `tsdb`.
 *
 * With tsdb->mutex held, the migration monitor is reset for
 * pReq->ssMigrateId. If tsdbResetSsMigrateMonitor returns false the request is
 * skipped and 0 is returned; otherwise tasks are scheduled through
 * tsdbAsyncRetentionImpl using pReq->timestamp and pReq->nodeId.
 *
 * Returns 0 when skipped or on success, otherwise the scheduling error.
 */
int32_t tsdbAsyncSsMigrate(STsdb *tsdb, SSsMigrateVgroupReq *pReq) {
  bool tsdbResetSsMigrateMonitor(STsdb *tsdb, int32_t ssMigrateId);

  int32_t code = 0;

  (void)taosThreadMutexLock(&tsdb->mutex);
  bool monitorReset = tsdbResetSsMigrateMonitor(tsdb, pReq->ssMigrateId);
  if (monitorReset) {
    code = tsdbAsyncRetentionImpl(tsdb, pReq->timestamp, true, pReq->nodeId);
  }
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  if (!monitorReset) {
    tsdbInfo("vgId:%d, skip ss migration as there's an in progress one", TD_VID(tsdb->pVnode));
    return 0;
  }

  if (code) {
    tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
  }
  return code;
}
470

471
#endif
472

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc