• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tsdb/tsdbRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "vnd.h"
20

21
typedef struct {
22
  STsdb  *tsdb;
23
  int32_t szPage;
24
  int64_t now;
25
  int64_t cid;
26

27
  STFileSet   *fset;
28
  TFileOpArray fopArr;
29
} SRTNer;
30

31
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
×
32
  STFileOp op = {
×
33
      .optype = TSDB_FOP_REMOVE,
34
      .fid = fobj->f->fid,
×
35
      .of = fobj->f[0],
36
  };
37

38
  return TARRAY2_APPEND(&rtner->fopArr, op);
×
39
}
40

UNCOV
41
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
×
UNCOV
42
  int64_t interval = 1000;  // 1s
×
UNCOV
43
  int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX;
×
UNCOV
44
  int64_t offset = 0;
×
UNCOV
45
  int64_t remain = size;
×
46

UNCOV
47
  while (remain > 0) {
×
48
    int64_t n;
UNCOV
49
    int64_t last = taosGetTimestampMs();
×
UNCOV
50
    if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
×
51
      TAOS_CHECK_RETURN(terrno);
×
52
    }
53

UNCOV
54
    remain -= n;
×
55

UNCOV
56
    if (remain > 0) {
×
57
      int64_t elapsed = taosGetTimestampMs() - last;
×
58
      if (elapsed < interval) {
×
59
        taosMsleep(interval - elapsed);
×
60
      }
61
    }
62
  }
63

UNCOV
64
  return 0;
×
65
}
66

67
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
×
68
  int32_t   code = 0;
×
69
  int32_t   lino = 0;
×
70
  TdFilePtr fdFrom = NULL, fdTo = NULL;
×
71
  char      fname_from[TSDB_FILENAME_LEN];
72
  char      fname_to[TSDB_FILENAME_LEN];
73

74
  tsdbTFileLastChunkName(rtner->tsdb, from->f, fname_from);
×
75
  tsdbTFileLastChunkName(rtner->tsdb, to, fname_to);
×
76

77
  fdFrom = taosOpenFile(fname_from, TD_FILE_READ);
×
78
  if (fdFrom == NULL) {
×
79
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
80
  }
81

82
  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname_to, from->f->size);
×
83

84
  fdTo = taosOpenFile(fname_to, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
×
85
  if (fdTo == NULL) {
×
86
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
87
  }
88

89
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
×
90
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
×
91
  int64_t    lc_size = tsdbLogicToFileSize(to->size, rtner->szPage) - chunksize * (to->lcn - 1);
×
92

93
  if (taosFSendFile(fdTo, fdFrom, 0, lc_size) < 0) {
×
94
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
95
  }
96

97
_exit:
×
98
  if (code) {
×
99
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
100
              tstrerror(code));
101
  }
102
  if (taosCloseFile(&fdFrom) != 0) {
×
103
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_from);
×
104
  }
105
  if (taosCloseFile(&fdTo) != 0) {
×
106
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_to);
×
107
  }
108
  return code;
×
109
}
110

UNCOV
111
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
×
UNCOV
112
  int32_t code = 0;
×
UNCOV
113
  int32_t lino = 0;
×
114

115
  char      fname[TSDB_FILENAME_LEN];
UNCOV
116
  TdFilePtr fdFrom = NULL;
×
UNCOV
117
  TdFilePtr fdTo = NULL;
×
118

UNCOV
119
  tsdbTFileName(rtner->tsdb, to, fname);
×
120

UNCOV
121
  fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
×
UNCOV
122
  if (fdFrom == NULL) {
×
123
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
124
  }
125

UNCOV
126
  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);
×
127

UNCOV
128
  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
×
UNCOV
129
  if (fdTo == NULL) {
×
130
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
131
  }
UNCOV
132
  TSDB_CHECK_CODE(code, lino, _exit);
×
133

UNCOV
134
  TAOS_CHECK_GOTO(tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
×
135
                                               tsRetentionSpeedLimitMB),
136
                  &lino, _exit);
137

UNCOV
138
_exit:
×
UNCOV
139
  if (code) {
×
140
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
141
              tstrerror(code));
142
  }
UNCOV
143
  if (taosCloseFile(&fdFrom) != 0) {
×
144
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
145
  }
UNCOV
146
  if (taosCloseFile(&fdTo) != 0) {
×
147
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
148
  }
UNCOV
149
  return code;
×
150
}
151

UNCOV
152
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
×
UNCOV
153
  int32_t  code = 0;
×
UNCOV
154
  int32_t  lino = 0;
×
UNCOV
155
  STFileOp op = {0};
×
UNCOV
156
  int32_t  lcn = fobj->f->lcn;
×
157

158
  // remove old
UNCOV
159
  op = (STFileOp){
×
160
      .optype = TSDB_FOP_REMOVE,
UNCOV
161
      .fid = fobj->f->fid,
×
UNCOV
162
      .of = fobj->f[0],
×
163
  };
164

UNCOV
165
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
×
166

167
  // create new
UNCOV
168
  op = (STFileOp){
×
169
      .optype = TSDB_FOP_CREATE,
UNCOV
170
      .fid = fobj->f->fid,
×
171
      .nf =
172
          {
UNCOV
173
              .type = fobj->f->type,
×
UNCOV
174
              .did = did[0],
×
UNCOV
175
              .fid = fobj->f->fid,
×
UNCOV
176
              .minVer = fobj->f->minVer,
×
UNCOV
177
              .maxVer = fobj->f->maxVer,
×
UNCOV
178
              .cid = fobj->f->cid,
×
UNCOV
179
              .size = fobj->f->size,
×
180
              .lcn = lcn,
181
              .stt[0] =
182
                  {
UNCOV
183
                      .level = fobj->f->stt[0].level,
×
184
                  },
185
          },
186
  };
187

UNCOV
188
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
×
189

190
  // do copy the file
191

UNCOV
192
  if (lcn < 1) {
×
UNCOV
193
    TAOS_CHECK_GOTO(tsdbDoCopyFile(rtner, fobj, &op.nf), &lino, _exit);
×
194
  } else {
195
    TAOS_CHECK_GOTO(tsdbDoCopyFileLC(rtner, fobj, &op.nf), &lino, _exit);
×
196
  }
197

198
_exit:
×
UNCOV
199
  if (code) {
×
200
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
201
              tstrerror(code));
202
  }
UNCOV
203
  return code;
×
204
}
205

206
typedef struct {
207
  STsdb  *tsdb;
208
  int64_t now;
209
  int32_t fid;
210
  bool    s3Migrate;
211
} SRtnArg;
212

UNCOV
213
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
×
UNCOV
214
  int32_t code = 0;
×
UNCOV
215
  int32_t lino = 0;
×
216

UNCOV
217
  if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
×
UNCOV
218
    TAOS_CHECK_GOTO(tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION), &lino, _exit);
×
219

UNCOV
220
    (void)taosThreadMutexLock(&rtner->tsdb->mutex);
×
221

UNCOV
222
    code = tsdbFSEditCommit(rtner->tsdb->pFS);
×
UNCOV
223
    if (code) {
×
224
      (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
×
225
      TSDB_CHECK_CODE(code, lino, _exit);
×
226
    }
227

UNCOV
228
    (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
×
229

UNCOV
230
    TARRAY2_DESTROY(&rtner->fopArr, NULL);
×
231
  }
232

UNCOV
233
_exit:
×
UNCOV
234
  if (code) {
×
235
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
236
              tstrerror(code));
237
  } else {
UNCOV
238
    tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
×
239
  }
UNCOV
240
  return code;
×
241
}
242

UNCOV
243
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, int32_t expLevel, STFileObj *fobj) {
×
UNCOV
244
  int32_t code = 0;
×
UNCOV
245
  int32_t lino = 0;
×
246

UNCOV
247
  if (fobj == NULL) {
×
UNCOV
248
    return code;
×
249
  }
250

UNCOV
251
  if (expLevel < 0) {
×
252
    // remove the file
253
    code = tsdbDoRemoveFileObject(rtner, fobj);
×
254
    TSDB_CHECK_CODE(code, lino, _exit);
×
UNCOV
255
  } else if (expLevel > fobj->f->did.level) {
×
256
    // Try to move the file to a new level
UNCOV
257
    for (; expLevel > fobj->f->did.level; expLevel--) {
×
UNCOV
258
      SDiskID diskId = {0};
×
259

UNCOV
260
      code = tsdbAllocateDiskAtLevel(rtner->tsdb, expLevel, tsdbFTypeLabel(fobj->f->type), &diskId);
×
UNCOV
261
      if (code) {
×
262
        tsdbTrace("vgId:%d, cannot allocate disk for file %s, level:%d, reason:%s, skip!", TD_VID(rtner->tsdb->pVnode),
×
263
                  fobj->fname, expLevel, tstrerror(code));
264
        code = 0;
×
265
        continue;
×
266
      } else {
UNCOV
267
        tsdbInfo("vgId:%d start to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
×
268
                 fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);
269

UNCOV
270
        code = tsdbDoMigrateFileObj(rtner, fobj, &diskId);
×
UNCOV
271
        TSDB_CHECK_CODE(code, lino, _exit);
×
272

UNCOV
273
        tsdbInfo("vgId:%d end to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
×
274
                 fobj->fname, fobj->f->did.level, diskId.level, fobj->f->size);
UNCOV
275
        break;
×
276
      }
277
    }
278
  }
279

UNCOV
280
_exit:
×
UNCOV
281
  if (code) {
×
282
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
283
              tstrerror(code));
284
  }
UNCOV
285
  return code;
×
286
}
287

UNCOV
288
static int32_t tsdbDoRetention(SRTNer *rtner) {
×
UNCOV
289
  int32_t    code = 0;
×
UNCOV
290
  int32_t    lino = 0;
×
UNCOV
291
  STFileObj *fobj = NULL;
×
UNCOV
292
  STFileSet *fset = rtner->fset;
×
293

294
  // handle data file sets
UNCOV
295
  int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
×
UNCOV
296
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
×
UNCOV
297
    code = tsdbRemoveOrMoveFileObject(rtner, expLevel, fset->farr[ftype]);
×
UNCOV
298
    TSDB_CHECK_CODE(code, lino, _exit);
×
299
  }
300

301
  // handle stt file
302
  SSttLvl *lvl;
UNCOV
303
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
×
UNCOV
304
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
×
UNCOV
305
      code = tsdbRemoveOrMoveFileObject(rtner, expLevel, fobj);
×
UNCOV
306
      TSDB_CHECK_CODE(code, lino, _exit);
×
307
    }
308
  }
309

UNCOV
310
_exit:
×
UNCOV
311
  if (code) {
×
312
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
313
              tstrerror(code));
314
  }
UNCOV
315
  return code;
×
316
}
317

318
static void tsdbRetentionCancel(void *arg) { taosMemoryFree(arg); }
×
319

320
static int32_t tsdbDoS3Migrate(SRTNer *rtner);
321

UNCOV
322
static int32_t tsdbRetention(void *arg) {
×
UNCOV
323
  int32_t code = 0;
×
UNCOV
324
  int32_t lino = 0;
×
325

UNCOV
326
  SRtnArg   *rtnArg = (SRtnArg *)arg;
×
UNCOV
327
  STsdb     *pTsdb = rtnArg->tsdb;
×
UNCOV
328
  SVnode    *pVnode = pTsdb->pVnode;
×
UNCOV
329
  STFileSet *fset = NULL;
×
UNCOV
330
  SRTNer     rtner = {
×
331
          .tsdb = pTsdb,
UNCOV
332
          .szPage = pVnode->config.tsdbPageSize,
×
UNCOV
333
          .now = rtnArg->now,
×
UNCOV
334
          .cid = tsdbFSAllocEid(pTsdb->pFS),
×
335
  };
336

337
  // begin task
UNCOV
338
  (void)taosThreadMutexLock(&pTsdb->mutex);
×
339

340
  // check if background task is disabled
UNCOV
341
  if (pTsdb->bgTaskDisabled) {
×
342
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode));
×
343
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
344
    return 0;
×
345
  }
346

347
  // set flag and copy
UNCOV
348
  tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset);
×
UNCOV
349
  if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
×
350
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
351
    TSDB_CHECK_CODE(code, lino, _exit);
×
352
  }
353

UNCOV
354
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
355

356
  // do retention
UNCOV
357
  if (rtner.fset) {
×
UNCOV
358
    if (rtnArg->s3Migrate) {
×
359
#ifdef USE_S3
UNCOV
360
      TAOS_CHECK_GOTO(tsdbDoS3Migrate(&rtner), &lino, _exit);
×
361
#endif
362
    } else {
UNCOV
363
      TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
×
364
    }
365

UNCOV
366
    TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner), &lino, _exit);
×
367
  }
368

UNCOV
369
_exit:
×
UNCOV
370
  if (rtner.fset) {
×
UNCOV
371
    (void)taosThreadMutexLock(&pTsdb->mutex);
×
UNCOV
372
    tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION);
×
UNCOV
373
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
374
  }
375

376
  // clear resources
UNCOV
377
  tsdbTFileSetClear(&rtner.fset);
×
UNCOV
378
  TARRAY2_DESTROY(&rtner.fopArr, NULL);
×
379
  taosMemoryFree(arg);
×
UNCOV
380
  if (code) {
×
UNCOV
381
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
382
  }
UNCOV
383
  return code;
×
384
}
385

UNCOV
386
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) {
×
UNCOV
387
  int32_t code = 0;
×
UNCOV
388
  int32_t lino = 0;
×
389

390
  // check if background task is disabled
391
  if (tsdb->bgTaskDisabled) {
×
UNCOV
392
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode));
×
UNCOV
393
    return 0;
×
394
  }
395

396
  STFileSet *fset;
UNCOV
397
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
×
398
    SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
×
UNCOV
399
    if (arg == NULL) {
×
UNCOV
400
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
401
    }
402

UNCOV
403
    arg->tsdb = tsdb;
×
UNCOV
404
    arg->now = now;
×
UNCOV
405
    arg->fid = fset->fid;
×
UNCOV
406
    arg->s3Migrate = s3Migrate;
×
407

UNCOV
408
    code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg,
×
409
                      &fset->retentionTask);
410
    if (code) {
×
UNCOV
411
      taosMemoryFree(arg);
×
UNCOV
412
      TSDB_CHECK_CODE(code, lino, _exit);
×
413
    }
414
  }
415

416
_exit:
×
UNCOV
417
  if (code) {
×
UNCOV
418
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
419
  }
UNCOV
420
  return code;
×
421
}
422

UNCOV
423
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
×
UNCOV
424
  int32_t code = 0;
×
UNCOV
425
  (void)taosThreadMutexLock(&tsdb->mutex);
×
UNCOV
426
  code = tsdbAsyncRetentionImpl(tsdb, now, false);
×
UNCOV
427
  (void)taosThreadMutexUnlock(&tsdb->mutex);
×
UNCOV
428
  return code;
×
429
}
430

431
#ifdef USE_S3
UNCOV
432
static int32_t tsdbS3FidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t s3KeepLocal, int64_t nowSec) {
×
433
  int32_t localFid;
434
  TSKEY   key;
435

436
  if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
×
437
    nowSec = nowSec * 1000;
×
438
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
×
UNCOV
439
    nowSec = nowSec * 1000000l;
×
UNCOV
440
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
×
441
    nowSec = nowSec * 1000000000l;
×
442
  }
443

444
  nowSec = nowSec - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];
×
445

446
  key = nowSec - s3KeepLocal * tsTickPerMin[pKeepCfg->precision];
×
447
  localFid = tsdbKeyFid(key, pKeepCfg->days, pKeepCfg->precision);
×
448

449
  if (fid >= localFid) {
×
UNCOV
450
    return 0;
×
451
  } else {
UNCOV
452
    return 1;
×
453
  }
454
}
455

456
static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
×
457
  int32_t   code = 0;
×
458
  int32_t   lino = 0;
×
UNCOV
459
  STFileOp  op = {0};
×
UNCOV
460
  TdFilePtr fdFrom = NULL, fdTo = NULL;
×
461
  int32_t   lcn = fobj->f->lcn + (size - 1) / chunksize;
×
462

463
  // remove old
464
  op = (STFileOp){
×
465
      .optype = TSDB_FOP_REMOVE,
UNCOV
466
      .fid = fobj->f->fid,
×
467
      .of = fobj->f[0],
×
468
  };
469

470
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
×
471

472
  // create new
UNCOV
473
  op = (STFileOp){
×
474
      .optype = TSDB_FOP_CREATE,
475
      .fid = fobj->f->fid,
×
476
      .nf =
477
          {
478
              .type = fobj->f->type,
×
479
              .did = fobj->f->did,
×
480
              .fid = fobj->f->fid,
×
481
              .minVer = fobj->f->minVer,
×
UNCOV
482
              .maxVer = fobj->f->maxVer,
×
UNCOV
483
              .cid = fobj->f->cid,
×
UNCOV
484
              .size = fobj->f->size,
×
485
              .lcn = lcn,
486
              .stt[0] =
487
                  {
UNCOV
488
                      .level = fobj->f->stt[0].level,
×
489
                  },
490
          },
491
  };
492

493
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
×
494

495
  char fname[TSDB_FILENAME_LEN];
496
  tsdbTFileName(rtner->tsdb, &op.nf, fname);
×
497
  char   *object_name = taosDirEntryBaseName(fname);
×
498
  char    object_name_prefix[TSDB_FILENAME_LEN];
499
  int32_t node_id = vnodeNodeId(rtner->tsdb->pVnode);
×
500
  snprintf(object_name_prefix, TSDB_FQDN_LEN, "%d/%s", node_id, object_name);
×
501

502
  char *dot = strrchr(object_name_prefix, '.');
×
UNCOV
503
  if (!dot) {
×
UNCOV
504
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
×
505
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
×
506
  }
507

508
  char *dot2 = strchr(object_name, '.');
×
UNCOV
509
  if (!dot) {
×
510
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
×
UNCOV
511
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
×
512
  }
513
  snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
×
514

515
  // do copy the file
UNCOV
516
  for (int32_t cn = fobj->f->lcn; cn < lcn; ++cn) {
×
517
    snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
×
UNCOV
518
    int64_t c_offset = chunksize * (cn - fobj->f->lcn);
×
519

UNCOV
520
    TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
×
521
  }
522

523
  // copy last chunk
524
  int64_t lc_offset = chunksize * (lcn - fobj->f->lcn);
×
UNCOV
525
  int64_t lc_size = size - lc_offset;
×
526

527
  snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);
×
528

UNCOV
529
  fdFrom = taosOpenFile(fname, TD_FILE_READ);
×
UNCOV
530
  if (fdFrom == NULL) {
×
531
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
532
  }
533

534
  tsdbInfo("vgId:%d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, lc_size);
×
535

536
  snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", lcn);
×
UNCOV
537
  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
×
UNCOV
538
  if (fdTo == NULL) {
×
539
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
540
  }
541

UNCOV
542
  int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
×
UNCOV
543
  if (n < 0) {
×
544
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
545
  }
546

UNCOV
547
_exit:
×
UNCOV
548
  if (code) {
×
549
    tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
550
              tstrerror(code));
551
  }
UNCOV
552
  if (taosCloseFile(&fdFrom) != 0) {
×
553
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
554
  }
555

556
  if (taosCloseFile(&fdTo) != 0) {
×
UNCOV
557
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
558
  }
559
  return code;
×
560
}
561

562
static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
×
563
  int32_t   code = 0;
×
564
  int32_t   lino = 0;
×
UNCOV
565
  STFileOp  op = {0};
×
UNCOV
566
  int32_t   lcn = (size - 1) / chunksize + 1;
×
567
  TdFilePtr fdFrom = NULL, fdTo = NULL;
×
568

569
  // remove old
570
  op = (STFileOp){
×
571
      .optype = TSDB_FOP_REMOVE,
UNCOV
572
      .fid = fobj->f->fid,
×
573
      .of = fobj->f[0],
×
574
  };
575

576
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
×
577

578
  // create new
UNCOV
579
  op = (STFileOp){
×
580
      .optype = TSDB_FOP_CREATE,
581
      .fid = fobj->f->fid,
×
582
      .nf =
583
          {
584
              .type = fobj->f->type,
×
585
              .did = fobj->f->did,
×
586
              .fid = fobj->f->fid,
×
587
              .minVer = fobj->f->minVer,
×
UNCOV
588
              .maxVer = fobj->f->maxVer,
×
UNCOV
589
              .cid = fobj->f->cid,
×
UNCOV
590
              .size = fobj->f->size,
×
591
              .lcn = lcn,
592
              .stt[0] =
593
                  {
UNCOV
594
                      .level = fobj->f->stt[0].level,
×
595
                  },
596
          },
597
  };
598

599
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);
×
600

601
  char fname[TSDB_FILENAME_LEN];
602
  tsdbTFileName(rtner->tsdb, &op.nf, fname);
×
603
  char   *object_name = taosDirEntryBaseName(fname);
×
604
  char    object_name_prefix[TSDB_FILENAME_LEN];
605
  int32_t node_id = vnodeNodeId(rtner->tsdb->pVnode);
×
606
  snprintf(object_name_prefix, TSDB_FQDN_LEN, "%d/%s", node_id, object_name);
×
607

608
  char *dot = strrchr(object_name_prefix, '.');
×
UNCOV
609
  if (!dot) {
×
UNCOV
610
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
×
UNCOV
611
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
×
612
  }
613

614
  // do copy the file
UNCOV
615
  for (int32_t cn = 1; cn < lcn; ++cn) {
×
616
    snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", cn);
×
UNCOV
617
    int64_t c_offset = chunksize * (cn - 1);
×
618

UNCOV
619
    TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
×
620
  }
621

622
  // copy last chunk
623
  int64_t lc_offset = (int64_t)(lcn - 1) * chunksize;
×
624
  int64_t lc_size = size - lc_offset;
×
625

626
  dot = strchr(object_name, '.');
×
UNCOV
627
  if (!dot) {
×
628
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
×
UNCOV
629
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
×
630
  }
631
  snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name), "%d.data", lcn);
×
632

UNCOV
633
  fdFrom = taosOpenFile(fobj->fname, TD_FILE_READ);
×
UNCOV
634
  if (fdFrom == NULL) {
×
635
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
636
  }
637

638
  tsdbInfo("vgId: %d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, fobj->f->size);
×
639

UNCOV
640
  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
×
UNCOV
641
  if (fdTo == NULL) {
×
642
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
643
  }
644

UNCOV
645
  int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
×
UNCOV
646
  if (n < 0) {
×
647
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
648
  }
649

UNCOV
650
_exit:
×
UNCOV
651
  if (code) {
×
652
    tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
653
              tstrerror(code));
654
  }
655
  if (taosCloseFile(&fdFrom) != 0) {
×
656
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
657
  }
658
  if (taosCloseFile(&fdTo) != 0) {
×
UNCOV
659
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
×
660
  }
661
  return code;
×
662
}
663

UNCOV
664
static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
×
665
  int32_t code = 0;
×
666
  int32_t lino = 0;
×
667

668
  STFileSet *fset = rtner->fset;
×
UNCOV
669
  STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
×
UNCOV
670
  if (!fobj) {
×
671
    return 0;
×
672
  }
673

UNCOV
674
  int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
×
UNCOV
675
  if (expLevel < 0) {  // expired
×
676
    return 0;
×
677
  }
678

679
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
×
680
  int32_t    s3KeepLocal = pCfg->s3KeepLocal;
×
UNCOV
681
  int32_t    s3ExpLevel = tsdbS3FidLevel(fset->fid, &rtner->tsdb->keepCfg, s3KeepLocal, rtner->now);
×
UNCOV
682
  if (s3ExpLevel < 1) {  // keep on local storage
×
683
    return 0;
×
684
  }
685

686
  int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
×
687
  int32_t lcn = fobj->f->lcn;
×
688

689
  if (/*lcn < 1 && */ taosCheckExistFile(fobj->fname)) {
×
690
    int64_t mtime = 0;
×
691
    int64_t size = 0;
×
UNCOV
692
    int32_t r = taosStatFile(fobj->fname, &size, &mtime, NULL);
×
UNCOV
693
    if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
×
694
      if (pCfg->s3Compact && lcn < 0) {
×
695
        extern int32_t tsdbAsyncCompact(STsdb * tsdb, const STimeWindow *tw, bool sync);
696

697
        STimeWindow win = {0};
×
698
        tsdbFidKeyRange(fset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
×
699

700
        tsdbInfo("vgId:%d, async compact begin lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
×
UNCOV
701
        code = tsdbAsyncCompact(rtner->tsdb, &win, pCfg->sttTrigger == 1);
×
UNCOV
702
        tsdbInfo("vgId:%d, async compact end lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
×
UNCOV
703
        goto _exit;
×
704
        return code;
705
      }
706

707
      TAOS_CHECK_GOTO(tsdbMigrateDataFileS3(rtner, fobj, size, chunksize), &lino, _exit);
×
708
    }
709
  } else {
UNCOV
710
    if (lcn <= 1) {
×
711
      TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _exit);
×
712
    }
713
    char fname1[TSDB_FILENAME_LEN];
714
    tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1);
×
715

716
    if (taosCheckExistFile(fname1)) {
×
717
      int64_t mtime = 0;
×
UNCOV
718
      int64_t size = 0;
×
719
      if (taosStatFile(fname1, &size, &mtime, NULL) != 0) {
×
720
        tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);
×
721
      }
UNCOV
722
      if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
×
723
        TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit);
×
724
      }
725
    } else {
UNCOV
726
      tsdbError("vgId:%d, file: %s not found, %s at line %d", TD_VID(rtner->tsdb->pVnode), fname1, __func__, lino);
×
UNCOV
727
      return code;
×
728
    }
729
  }
730

UNCOV
731
_exit:
×
UNCOV
732
  if (code) {
×
733
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
×
734
              tstrerror(code));
735
  }
736
  return code;
×
737
}
738

739
int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
×
740
  int32_t code = 0;
×
741

742
  int32_t expired = grantCheck(TSDB_GRANT_OBJECT_STORAGE);
×
743
  if (expired && tsS3Enabled) {
×
744
    tsdbWarn("s3 grant expired: %d", expired);
×
UNCOV
745
    tsS3Enabled = false;
×
UNCOV
746
  } else if (!expired && tsS3EnabledCfg) {
×
747
    tsS3Enabled = true;
×
748
  }
749

UNCOV
750
  if (!tsS3Enabled) {
×
751
    return 0;
×
752
  }
753

UNCOV
754
  (void)taosThreadMutexLock(&tsdb->mutex);
×
755
  code = tsdbAsyncRetentionImpl(tsdb, now, true);
×
756
  (void)taosThreadMutexUnlock(&tsdb->mutex);
×
757

758
  if (code) {
×
UNCOV
759
    tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
×
760
  }
UNCOV
761
  return code;
×
762
}
763

UNCOV
764
static int32_t tsdbGetS3SizeImpl(STsdb *tsdb, int64_t *size) {
×
UNCOV
765
  int32_t code = 0;
×
766

UNCOV
767
  SVnodeCfg *pCfg = &tsdb->pVnode->config;
×
UNCOV
768
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
×
769

770
  STFileSet *fset;
771
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
×
772
    STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
×
773
    if (fobj) {
×
UNCOV
774
      int32_t lcn = fobj->f->lcn;
×
UNCOV
775
      if (lcn > 1) {
×
UNCOV
776
        *size += ((lcn - 1) * chunksize);
×
777
      }
778
    }
779
  }
780

UNCOV
781
  return code;
×
782
}
783
#endif
784

UNCOV
785
int32_t tsdbGetS3Size(STsdb *tsdb, int64_t *size) {
×
UNCOV
786
  int32_t code = 0;
×
787
#ifdef USE_S3
UNCOV
788
  (void)taosThreadMutexLock(&tsdb->mutex);
×
UNCOV
789
  code = tsdbGetS3SizeImpl(tsdb, size);
×
UNCOV
790
  (void)taosThreadMutexUnlock(&tsdb->mutex);
×
791
#endif
UNCOV
792
  return code;
×
793
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc