• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4913

06 Jan 2026 01:30AM UTC coverage: 64.884% (-0.004%) from 64.888%
#4913

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

571 existing lines in 128 files now uncovered.

195016 of 300563 relevant lines covered (64.88%)

117540852.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.05
/source/dnode/vnode/src/tsdb/tsdbRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23
extern int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, ETsdbOpType type);
24

25
// tsdbRetentionMonitor.c
26
extern int32_t tsdbAddRetentionMonitorTask(STsdb *tsdb, int32_t fid, SVATaskID *taskId, int64_t fileSize);
27
extern void    tsdbRemoveRetentionMonitorTask(STsdb *tsdb, SVATaskID *taskId);
28

29
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
×
30
  STFileOp op = {
×
31
      .optype = TSDB_FOP_REMOVE,
32
      .fid = fobj->f->fid,
×
33
      .of = fobj->f[0],
34
  };
35

36
  return TARRAY2_APPEND(&rtner->fopArr, op);
×
37
}
38

39
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
66,001✔
40
  int64_t interval = 1000;  // 1s
66,001✔
41
  int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX;
66,001✔
42
  int64_t offset = 0;
66,001✔
43
  int64_t remain = size;
66,001✔
44

45
  while (remain > 0) {
132,002✔
46
    int64_t n;
47
    int64_t last = taosGetTimestampMs();
66,001✔
48
    if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
66,001✔
49
      TAOS_CHECK_RETURN(terrno);
×
50
    }
51

52
    remain -= n;
66,001✔
53

54
    if (remain > 0) {
66,001✔
55
      int64_t elapsed = taosGetTimestampMs() - last;
×
56
      if (elapsed < interval) {
×
57
        taosMsleep(interval - elapsed);
×
58
      }
59
    }
60
  }
61

62
  return 0;
66,001✔
63
}
64

65
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
×
66
  int32_t   code = 0;
×
67
  int32_t   lino = 0;
×
68
  TdFilePtr fdFrom = NULL, fdTo = NULL;
×
69
  char      fname_from[TSDB_FILENAME_LEN];
×
70
  char      fname_to[TSDB_FILENAME_LEN];
×
71

72
  tsdbTFileLastChunkName(rtner->tsdb, from->f, fname_from);
×
73
  tsdbTFileLastChunkName(rtner->tsdb, to, fname_to);
×
74

75
  fdFrom = taosOpenFile(fname_from, TD_FILE_READ);
×
76
  if (fdFrom == NULL) {
×
77
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
78
  }
79

80
  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname_to, from->f->size);
×
81

82
  fdTo = taosOpenFile(fname_to, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
×
83
  if (fdTo == NULL) {
×
84
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
85
  }
86

87
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
×
88
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
89
  int64_t    lc_size = tsdbLogicToFileSize(to->size, rtner->szPage) - chunksize * (to->lcn - 1);
×
90

91
  if (taosFSendFile(fdTo, fdFrom, 0, lc_size) < 0) {
×
92
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
93
  }
94

95
_exit:
×
96
  if (code) {
×
97
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
98
              tstrerror(code));
99
  }
100
  if (taosCloseFile(&fdFrom) != 0) {
×
101
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_from);
×
102
  }
103
  if (taosCloseFile(&fdTo) != 0) {
×
104
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_to);
×
105
  }
106
  return code;
×
107
}
108

109
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
66,001✔
110
  int32_t code = 0;
66,001✔
111
  int32_t lino = 0;
66,001✔
112

113
  char      fname[TSDB_FILENAME_LEN];
66,001✔
114
  TdFilePtr fdFrom = NULL;
66,001✔
115
  TdFilePtr fdTo = NULL;
66,001✔
116

117
  tsdbTFileName(rtner->tsdb, to, fname);
66,001✔
118

119
  fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
66,001✔
120
  if (fdFrom == NULL) {
66,001✔
121
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
122
  }
123

124
  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);
66,001✔
125

126
  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
66,001✔
127
  if (fdTo == NULL) {
66,001✔
128
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
129
  }
130
  TSDB_CHECK_CODE(code, lino, _exit);
66,001✔
131

132
  TAOS_CHECK_GOTO(tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
132,002✔
133
                                               tsRetentionSpeedLimitMB),
134
                  &lino, _exit);
135

136
_exit:
66,001✔
137
  if (code) {
66,001✔
138
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
139
              tstrerror(code));
140
  }
141
  if (taosCloseFile(&fdFrom) != 0) {
66,001✔
142
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
143
  }
144
  if (taosCloseFile(&fdTo) != 0) {
66,001✔
145
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
146
  }
147
  return code;
66,001✔
148
}
149

150
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
66,001✔
151
  int32_t  code = 0;
66,001✔
152
  int32_t  lino = 0;
66,001✔
153
  STFileOp op = {0};
66,001✔
154
  int32_t  lcn = fobj->f->lcn;
66,001✔
155

156
  // remove old
157
  op = (STFileOp){
66,001✔
158
      .optype = TSDB_FOP_REMOVE,
159
      .fid = fobj->f->fid,
66,001✔
160
      .of = fobj->f[0],
66,001✔
161
  };
162

163
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
132,002✔
164

165
  // create new
166
  op = (STFileOp){
66,001✔
167
      .optype = TSDB_FOP_CREATE,
168
      .fid = fobj->f->fid,
66,001✔
169
      .nf =
170
          {
171
              .type = fobj->f->type,
66,001✔
172
              .did = did[0],
66,001✔
173
              .fid = fobj->f->fid,
66,001✔
174
              .minVer = fobj->f->minVer,
66,001✔
175
              .maxVer = fobj->f->maxVer,
66,001✔
176
              .mid = fobj->f->mid,
66,001✔
177
              .cid = fobj->f->cid,
66,001✔
178
              .size = fobj->f->size,
66,001✔
179
              .lcn = lcn,
180
              .stt[0] =
181
                  {
182
                      .level = fobj->f->stt[0].level,
66,001✔
183
                  },
184
          },
185
  };
186

187
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
132,002✔
188

189
  // do copy the file
190

191
  if (lcn < 1) {
66,001✔
192
    TAOS_CHECK_GOTO(tsdbDoCopyFile(rtner, fobj, &op.nf), &lino, _exit);
66,001✔
193
  } else {
194
    TAOS_CHECK_GOTO(tsdbDoCopyFileLC(rtner, fobj, &op.nf), &lino, _exit);
×
195
  }
196

197
_exit:
66,001✔
198
  if (code) {
66,001✔
199
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
200
              tstrerror(code));
201
  }
202
  return code;
66,001✔
203
}
204

205

206
static int32_t tsdbDoRetentionEnd(SRTNer *rtner, EFEditT etype) {
109,333✔
207
  int32_t code = 0;
109,333✔
208
  int32_t lino = 0;
109,333✔
209

210
  if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
109,333✔
211
    TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, etype), &lino, _exit);
32,361✔
212

213
    (void)taosThreadMutexLock(&rtner->tsdb->mutex);
32,361✔
214

215
    code = tsdbFSEditCommit(rtner->tsdb->pFS);
32,361✔
216
    if (code) {
32,361✔
217
      (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
×
218
      TSDB_CHECK_CODE(code, lino, _exit);
×
219
    }
220

221
    (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
32,361✔
222

223
    TARRAY2_DESTROY(&rtner->fopArr, NULL);
32,361✔
224
  }
225

226
_exit:
109,333✔
227
  if (code) {
109,333✔
228
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
229
              tstrerror(code));
230
  } else {
231
    tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
109,333✔
232
  }
233
  return code;
109,333✔
234
}
235

236
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, int32_t expLevel, STFileObj *fobj) {
385,733✔
237
  int32_t code = 0;
385,733✔
238
  int32_t lino = 0;
385,733✔
239

240
  if (fobj == NULL) {
385,733✔
241
    return code;
178,478✔
242
  }
243

244
  if (expLevel < 0) {
207,255✔
245
    // remove the file
246
    code = tsdbDoRemoveFileObject(rtner, fobj);
×
247
    TSDB_CHECK_CODE(code, lino, _exit);
×
248
  } else if (expLevel > fobj->f->did.level) {
207,255✔
249
    // Try to move the file to a new level
250
    for (; expLevel > fobj->f->did.level; expLevel--) {
66,001✔
251
      SDiskID diskId = {0};
66,001✔
252

253
      code = tsdbAllocateDiskAtLevel(rtner->tsdb, expLevel, tsdbFTypeLabel(fobj->f->type), &diskId);
66,001✔
254
      if (code) {
66,001✔
255
        tsdbTrace("vgId:%d, cannot allocate disk for file %s, level:%d, reason:%s, skip!", TD_VID(rtner->tsdb->pVnode),
×
256
                  fobj->fname, expLevel, tstrerror(code));
257
        code = 0;
×
258
        continue;
×
259
      } else {
260
        tsdbInfo("vgId:%d start to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
66,001✔
261
                 fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);
262

263
        code = tsdbDoMigrateFileObj(rtner, fobj, &diskId);
66,001✔
264
        TSDB_CHECK_CODE(code, lino, _exit);
66,001✔
265

266
        tsdbInfo("vgId:%d end to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
66,001✔
267
                 fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);
268
        break;
66,001✔
269
      }
270
    }
271
  }
272

273
_exit:
141,254✔
274
  if (code) {
207,255✔
275
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
276
              tstrerror(code));
277
  }
278
  return code;
207,255✔
279
}
280

281
static int32_t tsdbDoRetention(SRTNer *rtner) {
88,865✔
282
  int32_t    code = 0;
88,865✔
283
  int32_t    lino = 0;
88,865✔
284
  STFileObj *fobj = NULL;
88,865✔
285
  STFileSet *fset = rtner->fset;
88,865✔
286

287
  // handle data file sets
288
  int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->tw.ekey);
88,865✔
289
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
444,325✔
290
    code = tsdbRemoveOrMoveFileObject(rtner, expLevel, fset->farr[ftype]);
355,460✔
291
    TSDB_CHECK_CODE(code, lino, _exit);
355,460✔
292
  }
293

294
  // handle stt file
295
  SSttLvl *lvl;
296
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
119,138✔
297
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
60,546✔
298
      code = tsdbRemoveOrMoveFileObject(rtner, expLevel, fobj);
30,273✔
299
      TSDB_CHECK_CODE(code, lino, _exit);
30,273✔
300
    }
301
  }
302

303
_exit:
88,865✔
304
  if (code) {
88,865✔
305
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
306
              tstrerror(code));
307
  }
308
  return code;
88,865✔
309
}
310

UNCOV
311
void tsdbRetentionCancel(void *arg) { taosMemoryFree(arg); }
×
312

313
static bool tsdbFSetNeedRetention(STFileSet *fset, int32_t expLevel) {
4,386✔
314
  if (expLevel < 0) {
4,386✔
315
    return false;
×
316
  }
317

318
  STFileObj *fobj = NULL;
4,386✔
319
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
21,930✔
320
    fobj = fset->farr[ftype];
17,544✔
321
    if (fobj && (expLevel > fset->farr[ftype]->f->did.level)) {
17,544✔
322
      return true;
×
323
    }
324
  }
325

326
  // handle stt file
327
  SSttLvl *lvl = NULL;
4,386✔
328
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
5,848✔
329
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
5,848✔
330
      if (fobj && (expLevel > fobj->f->did.level)) {
4,386✔
331
        return true;
2,924✔
332
      }
333
    }
334
  }
335

336
  return false;
1,462✔
337
}
338
#ifdef TD_ENTERPRISE
339
static bool tsdbShouldRollup(STsdb *tsdb, SRTNer *rtner, SRtnArg *rtnArg) {
109,333✔
340
  SVnode    *pVnode = tsdb->pVnode;
109,333✔
341
  STFileSet *fset = rtner->fset;
109,333✔
342

343
  if (!VND_IS_RSMA(pVnode)) {
109,333✔
344
    return false;
87,403✔
345
  }
346

347
  int32_t expLevel = tsdbFidLevel(fset->fid, &tsdb->keepCfg, rtner->tw.ekey);
21,930✔
348
  if (expLevel <= 0) {
21,930✔
349
    return false;
×
350
  }
351

352
  if (rtnArg->optrType == TSDB_OPTR_ROLLUP) {
21,930✔
353
    return true;
17,544✔
354
  } else if (rtnArg->optrType == TSDB_OPTR_NORMAL) {
4,386✔
355
    return tsdbFSetNeedRetention(fset, expLevel);
4,386✔
356
  }
357
  return false;
×
358
}
359
#endif
360
int32_t tsdbRetention(void *arg) {
109,333✔
361
  int32_t code = 0;
109,333✔
362
  int32_t lino = 0;
109,333✔
363

364
  SRtnArg   *rtnArg = (SRtnArg *)arg;
109,333✔
365
  STsdb     *pTsdb = rtnArg->tsdb;
109,333✔
366
  SVnode    *pVnode = pTsdb->pVnode;
109,333✔
367
  STFileSet *fset = NULL;
109,333✔
368
  SRTNer     rtner = {
218,666✔
369
          .tsdb = pTsdb,
370
          .szPage = pVnode->config.tsdbPageSize,
109,333✔
371
          .tw = rtnArg->tw,
372
          .lastCommit = rtnArg->lastCommit,
109,333✔
373
          .cid = tsdbFSAllocEid(pTsdb->pFS),
109,333✔
374
          .nodeId = rtnArg->nodeId,
109,333✔
375
  };
376

377
  // begin task
378
  (void)taosThreadMutexLock(&pTsdb->mutex);
109,333✔
379

380
  // check if background task is disabled
381
  if (pTsdb->bgTaskDisabled) {
109,333✔
382
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode));
×
383
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
384
    return 0;
×
385
  }
386

387
  // set flag and copy
388
  tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset);
109,333✔
389
  if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
109,333✔
390
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
391
    TSDB_CHECK_CODE(code, lino, _exit);
×
392
  }
393

394
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
109,333✔
395

396
  // do retention
397
  if (rtner.fset) {
109,333✔
398
    EFEditT etype = TSDB_FEDIT_RETENTION;
109,333✔
399
    if (rtnArg->optrType == TSDB_OPTR_SSMIGRATE) {
109,333✔
400
      etype = TSDB_FEDIT_SSMIGRATE;
×
401
      TAOS_CHECK_GOTO(tsdbDoSsMigrate(&rtner), &lino, _exit);
×
402
#ifdef TD_ENTERPRISE
403
    } else if (tsdbShouldRollup(pTsdb, &rtner, rtnArg)) {
109,333✔
404
      etype = TSDB_FEDIT_ROLLUP;
20,468✔
405
      TAOS_CHECK_GOTO(tsdbDoRollup(&rtner), &lino, _exit);
20,468✔
406
#endif
407
    } else if (rtnArg->optrType == TSDB_OPTR_NORMAL) {
88,865✔
408
      TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
88,865✔
409
    } else {
410
      goto _exit;
×
411
    }
412

413
    TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner, etype), &lino, _exit);
109,333✔
414
  }
415

416
_exit:
109,333✔
417
  if (rtner.fset) {
109,333✔
418
    (void)taosThreadMutexLock(&pTsdb->mutex);
109,333✔
419
    tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION);
109,333✔
420
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
109,333✔
421
  }
422

423
  // clear resources
424
  tsdbTFileSetClear(&rtner.fset);
109,333✔
425
  TARRAY2_DESTROY(&rtner.fopArr, NULL);
109,333✔
426
  (void)tsdbRemoveRetentionMonitorTask(pTsdb, &rtnArg->taskid);
109,333✔
427
  taosMemoryFree(arg);
109,333✔
428
  if (code) {
109,333✔
429
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
430
  }
431
  return code;
109,333✔
432
}
433

434
static bool tsdbInRetentionTimeRange(STsdb *tsdb, int32_t fid, STimeWindow tw, int8_t optrType) {
110,795✔
435
  if (optrType == TSDB_OPTR_ROLLUP) {
110,795✔
436
    TSKEY  minKey, maxKey;
19,006✔
437
    int8_t precision = tsdb->keepCfg.precision;
19,006✔
438
    if (precision < TSDB_TIME_PRECISION_MILLI || precision > TSDB_TIME_PRECISION_NANO) {
19,006✔
439
      tsdbError("vgId:%d, failed to check retention time range since invalid precision %" PRIi8, TD_VID(tsdb->pVnode), precision);
×
440
      return false;
×
441
    }
442
    tsdbFidKeyRange(fid, tsdb->keepCfg.days, precision, &minKey, &maxKey);
19,006✔
443

444
    if ((tw.ekey != INT64_MAX) && ((double)tw.ekey * (double)tsSecTimes[precision] < (double)minKey)) {
19,006✔
445
      return false;
×
446
    }
447
    if ((tw.skey != INT64_MIN) && ((double)tw.skey * (double)tsSecTimes[precision] > (double)maxKey)) {
19,006✔
448
      return false;
1,462✔
449
    }
450
    return true;
17,544✔
451
  }
452
  return true;
91,789✔
453
}
454

455
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, STimeWindow tw, int8_t optrType, int8_t triggerType) {
39,910✔
456
  int32_t code = 0;
39,910✔
457
  int32_t lino = 0;
39,910✔
458

459
  // check if background task is disabled
460
  if (tsdb->bgTaskDisabled) {
39,910✔
461
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode));
×
462
    return 0;
×
463
  }
464

465
  STFileSet *fset;
466
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
150,705✔
467
    if (!tsdbInRetentionTimeRange(tsdb, fset->fid, tw, optrType)) {
110,795✔
468
      continue;
1,462✔
469
    }
470
    SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
109,333✔
471
    if (arg == NULL) {
109,333✔
472
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
473
    }
474

475
    arg->tsdb = tsdb;
109,333✔
476
    arg->tw = tw;
109,333✔
477
    arg->fid = fset->fid;
109,333✔
478
    arg->nodeId = 0;
109,333✔
479
    arg->optrType = optrType;
109,333✔
480
    arg->triggerType = triggerType;
109,333✔
481
    arg->lastCommit = fset->lastCommit;
109,333✔
482

483
    code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg,
109,333✔
484
                      &fset->retentionTask);
485
    if (code) {
109,333✔
486
      taosMemoryFree(arg);
×
487
      TSDB_CHECK_CODE(code, lino, _exit);
×
488
    } else {
489
      arg->taskid = fset->retentionTask;
109,333✔
490
      int64_t fileSize = tsdbTFileSetGetDataSize(fset);
109,333✔
491
      TAOS_UNUSED(tsdbAddRetentionMonitorTask(tsdb, fset->fid, &arg->taskid, fileSize));
109,333✔
492
    }
493
  }
494
_exit:
39,910✔
495
  if (code) {
39,910✔
496
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
497
  }
498
  return code;
39,910✔
499
}
500

501
int32_t tsdbAsyncRetention(STsdb *tsdb, STimeWindow tw, int8_t optrType, int8_t triggerType) {
39,910✔
502
  int32_t code = 0;
39,910✔
503
  (void)taosThreadMutexLock(&tsdb->mutex);
39,910✔
504
  code = tsdbAsyncRetentionImpl(tsdb, tw, optrType, triggerType);
39,910✔
505
  (void)taosThreadMutexUnlock(&tsdb->mutex);
39,910✔
506
  return code;
39,910✔
507
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc