• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3842

07 Apr 2025 11:21AM UTC coverage: 62.696% (-0.3%) from 63.027%
#3842

push

travis-ci

web-flow
merge: from main to 3.0 branch (#30679)

154855 of 315075 branches covered (49.15%)

Branch coverage included in aggregate %.

6 of 8 new or added lines in 5 files covered. (75.0%)

2309 existing lines in 130 files now uncovered.

240176 of 314995 relevant lines covered (76.25%)

19119980.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.37
/source/dnode/vnode/src/tsdb/tsdbRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "vnd.h"
20

21
extern int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool s3Migrate);
22

23
typedef struct {
24
  STsdb  *tsdb;
25
  int32_t szPage;
26
  int64_t now;
27
  int64_t cid;
28

29
  STFileSet   *fset;
30
  TFileOpArray fopArr;
31
} SRTNer;
32

33
// Queue a TSDB_FOP_REMOVE operation for `fobj` on the retention op array.
// Returns whatever TARRAY2_APPEND returns.
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
  const STFile *file = fobj->f;

  STFileOp removeOp = {
      .optype = TSDB_FOP_REMOVE,
      .fid = file->fid,
      .of = *file,
  };

  return TARRAY2_APPEND(&rtner->fopArr, removeOp);
}
42

43
/*
 * Copy `size` bytes from `from` to `to` with taosFSendFile(), sending at most
 * `limitMB` MiB per one-second interval. A `limitMB` of 0 means no throttling.
 *
 * Returns 0 once `size` bytes have been sent. When a send fails, terrno is
 * passed to TAOS_CHECK_RETURN.
 */
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
  int64_t interval = 1000;  // 1s
  // Widen before multiplying: in 32-bit unsigned arithmetic the product wraps
  // for limitMB >= 4096 (and becomes 0 at exactly 4096, which would make every
  // send request zero bytes and the loop never finish).
  int64_t limit = limitMB ? (int64_t)limitMB * 1024 * 1024 : INT64_MAX;
  int64_t offset = 0;
  int64_t remain = size;

  while (remain > 0) {
    int64_t n;
    int64_t last = taosGetTimestampMs();
    if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
      TAOS_CHECK_RETURN(terrno);
    }

    remain -= n;

    // Only sleep when another slice is still pending, so the final slice
    // does not pay for a full interval.
    if (remain > 0) {
      int64_t elapsed = taosGetTimestampMs() - last;
      if (elapsed < interval) {
        taosMsleep(interval - elapsed);
      }
    }
  }

  return 0;
}
68

69
/*
 * Copy the last-chunk file of `from` into the last-chunk file named by `to`.
 *
 * Both names come from tsdbTFileLastChunkName(). The number of bytes sent is
 * the file size derived from `to->size` minus the (to->lcn - 1) full chunks
 * that precede the last one.
 *
 * Returns 0 on success, otherwise the terrno value of the failing call.
 */
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr srcFd = NULL;
  TdFilePtr dstFd = NULL;
  char      srcName[TSDB_FILENAME_LEN];
  char      dstName[TSDB_FILENAME_LEN];

  tsdbTFileLastChunkName(rtner->tsdb, from->f, srcName);
  tsdbTFileLastChunkName(rtner->tsdb, to, dstName);

  if ((srcFd = taosOpenFile(srcName, TD_FILE_READ)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), dstName, from->f->size);

  if ((dstFd = taosOpenFile(dstName, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  SVnodeCfg *cfg = &rtner->tsdb->pVnode->config;
  int64_t    chunkBytes = (int64_t)cfg->tsdbPageSize * cfg->s3ChunkSize;
  int64_t    lastChunkBytes = tsdbLogicToFileSize(to->size, rtner->szPage) - chunkBytes * (to->lcn - 1);

  if (taosFSendFile(dstFd, srcFd, 0, lastChunkBytes) < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  // Both handles are closed on every path; a close failure is only logged.
  if (taosCloseFile(&srcFd) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), srcName);
  }
  if (taosCloseFile(&dstFd) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), dstName);
  }
  return code;
}
112

113
/*
 * Copy the file behind `from` to the path that tsdbTFileName() derives for
 * `to`, throttled by tsRetentionSpeedLimitMB.
 *
 * Returns 0 on success, otherwise the code of the failing open/copy step.
 */
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t code = 0;
  int32_t lino = 0;

  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr fdFrom = NULL;
  TdFilePtr fdTo = NULL;

  tsdbTFileName(rtner->tsdb, to, fname);

  fdFrom = taosOpenFile(from->fname, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, from->f->size);

  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  // (A TSDB_CHECK_CODE here was removed: `code` is always 0 at this point.)

  TAOS_CHECK_GOTO(tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
                                               tsRetentionSpeedLimitMB),
                  &lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  // Handles are closed on every path; close failures are only traced.
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  if (taosCloseFile(&fdTo) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  return code;
}
153

154
/*
 * Migrate one file object to the disk `did`.
 *
 * Queues a REMOVE op for the existing file and a CREATE op for its copy on
 * the new disk, then physically copies the data: the whole file when the
 * file has no chunk number (lcn < 1), otherwise only its last chunk.
 *
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
  int32_t       code = 0;
  int32_t       lino = 0;
  const STFile *src = fobj->f;
  int32_t       lcn = src->lcn;

  // remove old
  STFileOp removeOp = {
      .optype = TSDB_FOP_REMOVE,
      .fid = src->fid,
      .of = *src,
  };
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, removeOp), &lino, _exit);

  // create new: same identity as the source, placed on the target disk
  STFileOp createOp = {
      .optype = TSDB_FOP_CREATE,
      .fid = src->fid,
      .nf =
          {
              .type = src->type,
              .did = *did,
              .fid = src->fid,
              .minVer = src->minVer,
              .maxVer = src->maxVer,
              .cid = src->cid,
              .size = src->size,
              .lcn = lcn,
              .stt[0] =
                  {
                      .level = src->stt[0].level,
                  },
          },
  };
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, createOp), &lino, _exit);

  // do copy the file
  if (lcn >= 1) {
    TAOS_CHECK_GOTO(tsdbDoCopyFileLC(rtner, fobj, &createOp.nf), &lino, _exit);
  } else {
    TAOS_CHECK_GOTO(tsdbDoCopyFile(rtner, fobj, &createOp.nf), &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
207

208
typedef struct {
209
  STsdb  *tsdb;
210
  int64_t now;
211
  int32_t fid;
212
  bool    s3Migrate;
213
} SRtnArg;
214

215
/*
 * Apply the file operations queued in rtner->fopArr, if any.
 *
 * The edit is started with tsdbFSEditBegin() and committed while holding the
 * tsdb mutex; the op array is destroyed after a successful commit.
 *
 * Returns 0 on success (including "nothing queued") or the failing step's code.
 */
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *tsdb = rtner->tsdb;

  if (TARRAY2_SIZE(&rtner->fopArr) > 0) {
    TAOS_CHECK_GOTO(tsdbFSEditBegin(tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION), &lino, _exit);

    // The commit runs under the mutex; the lock is dropped before the result is inspected.
    (void)taosThreadMutexLock(&tsdb->mutex);
    code = tsdbFSEditCommit(tsdb->pFS);
    (void)taosThreadMutexUnlock(&tsdb->mutex);
    TSDB_CHECK_CODE(code, lino, _exit);

    TARRAY2_DESTROY(&rtner->fopArr, NULL);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
  }
  return code;
}
244

245
/*
 * Decide what to do with one file object given the expected storage level.
 *
 *   - fobj == NULL:               nothing to do.
 *   - expLevel < 0:               queue the file for removal.
 *   - expLevel > current level:   try to migrate it, starting at expLevel and
 *                                 stepping down one level each time disk
 *                                 allocation fails; stops after the first
 *                                 successful migration.
 *   - otherwise:                  leave the file where it is.
 *
 * Returns 0 on success or the code of the failing remove/migrate step.
 * A failed disk allocation is not an error; that level is skipped.
 */
static int32_t tsdbRemoveOrMoveFileObject(SRTNer *rtner, int32_t expLevel, STFileObj *fobj) {
  int32_t code = 0;
  int32_t lino = 0;

  if (fobj == NULL) {
    return 0;
  }

  if (expLevel < 0) {
    // remove the file
    code = tsdbDoRemoveFileObject(rtner, fobj);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    // Try to move the file to a new level
    while (expLevel > fobj->f->did.level) {
      SDiskID target = {0};

      code = tsdbAllocateDiskAtLevel(rtner->tsdb, expLevel, tsdbFTypeLabel(fobj->f->type), &target);
      if (code) {
        tsdbTrace("vgId:%d, cannot allocate disk for file %s, level:%d, reason:%s, skip!", TD_VID(rtner->tsdb->pVnode),
                  fobj->fname, expLevel, tstrerror(code));
        code = 0;
        expLevel--;
        continue;
      }

      tsdbInfo("vgId:%d start to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
               fobj->fname, fobj->f->did.level, target.level, fobj->f->size);

      code = tsdbDoMigrateFileObj(rtner, fobj, &target);
      TSDB_CHECK_CODE(code, lino, _exit);

      tsdbInfo("vgId:%d end to migrate file %s from level %d to %d, size:%" PRId64, TD_VID(rtner->tsdb->pVnode),
               fobj->fname, fobj->f->did.level, target.level, fobj->f->size);
      break;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
289

290
/*
 * Run local retention on rtner->fset: compute the expected level of the file
 * set from tsdbFidLevel() and apply tsdbRemoveOrMoveFileObject() to every
 * file-type slot and to every stt file of every stt level.
 *
 * Returns 0 on success or the first non-zero code from a per-file step.
 */
static int32_t tsdbDoRetention(SRTNer *rtner) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fset = rtner->fset;
  int32_t    expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);

  // handle data file sets
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    TAOS_CHECK_GOTO(tsdbRemoveOrMoveFileObject(rtner, expLevel, fset->farr[ftype]), &lino, _exit);
  }

  // handle stt file
  SSttLvl   *lvl;
  STFileObj *sttObj = NULL;
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, sttObj) {
      TAOS_CHECK_GOTO(tsdbRemoveOrMoveFileObject(rtner, expLevel, sttObj), &lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
319

320
// Cancel callback registered with vnodeAsync(): releases the task argument.
static void tsdbRetentionCancel(void *arg) {
  taosMemoryFree(arg);
}
×
321

322
static int32_t tsdbDoS3Migrate(SRTNer *rtner);
323

324
static int32_t tsdbRetention(void *arg) {
434✔
325
  int32_t code = 0;
434✔
326
  int32_t lino = 0;
434✔
327

328
  SRtnArg   *rtnArg = (SRtnArg *)arg;
434✔
329
  STsdb     *pTsdb = rtnArg->tsdb;
434✔
330
  SVnode    *pVnode = pTsdb->pVnode;
434✔
331
  STFileSet *fset = NULL;
434✔
332
  SRTNer     rtner = {
868✔
333
          .tsdb = pTsdb,
334
          .szPage = pVnode->config.tsdbPageSize,
434✔
335
          .now = rtnArg->now,
434✔
336
          .cid = tsdbFSAllocEid(pTsdb->pFS),
434✔
337
  };
338

339
  // begin task
340
  (void)taosThreadMutexLock(&pTsdb->mutex);
434✔
341

342
  // check if background task is disabled
343
  if (pTsdb->bgTaskDisabled) {
434!
344
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(pTsdb->pVnode));
×
345
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
346
    return 0;
×
347
  }
348

349
  // set flag and copy
350
  tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION, &fset);
434✔
351
  if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
434!
352
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
353
    TSDB_CHECK_CODE(code, lino, _exit);
×
354
  }
355

356
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
434✔
357

358
  // do retention
359
  if (rtner.fset) {
434!
360
    if (rtnArg->s3Migrate) {
434!
361
#ifdef USE_S3
362
      TAOS_CHECK_GOTO(tsdbDoS3Migrate(&rtner), &lino, _exit);
×
363
#endif
364
    } else {
365
      TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
434!
366
    }
367

368
    TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner), &lino, _exit);
434!
369
  }
370

371
_exit:
434✔
372
  if (rtner.fset) {
434!
373
    (void)taosThreadMutexLock(&pTsdb->mutex);
434✔
374
    tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid, EVA_TASK_RETENTION);
434✔
375
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
434✔
376
  }
377

378
  // clear resources
379
  tsdbTFileSetClear(&rtner.fset);
434✔
380
  TARRAY2_DESTROY(&rtner.fopArr, NULL);
434!
381
  taosMemoryFree(arg);
434!
382
  if (code) {
434!
383
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
384
  }
385
  return code;
434✔
386
}
387

388
/*
 * Schedule one low-priority async retention task per file set of `tsdb`.
 *
 * Each task receives its own heap-allocated SRtnArg; if scheduling fails the
 * argument is freed here and scheduling stops. Does nothing when background
 * tasks are disabled. Both visible callers hold tsdb->mutex around this call.
 *
 * Returns 0 on success, terrno on allocation failure, or vnodeAsync()'s code.
 */
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) {
  int32_t code = 0;
  int32_t lino = 0;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    tsdbInfo("vgId:%d, background task is disabled, skip retention", TD_VID(tsdb->pVnode));
    return 0;
  }

  STFileSet *fset;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    SRtnArg *taskArg = taosMemoryMalloc(sizeof(*taskArg));
    if (taskArg == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    taskArg->tsdb = tsdb;
    taskArg->now = now;
    taskArg->fid = fset->fid;
    taskArg->s3Migrate = s3Migrate;

    code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, taskArg,
                      &fset->retentionTask);
    if (code) {
      // The task was not accepted, so the argument is released here.
      taosMemoryFree(taskArg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
424

425
// Schedule local (non-S3) retention tasks for every file set of `tsdb`.
// Takes tsdb->mutex around the scheduling and returns its result.
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t code = tsdbAsyncRetentionImpl(tsdb, now, false);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
432

433
#ifdef USE_S3
434
/*
 * Tell whether file set `fid` should still be kept on local storage.
 *
 * `nowSec` is scaled to the database precision, shifted back by the keep-time
 * offset (hours) and by `s3KeepLocal` (minutes); the resulting key is mapped
 * to a file id with tsdbKeyFid().
 *
 * Returns 0 when `fid` is at or after that file id, 1 when it is older.
 */
static int32_t tsdbS3FidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t s3KeepLocal, int64_t nowSec) {
  // Convert seconds to ticks of the configured precision.
  if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
    nowSec = nowSec * 1000;
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
    nowSec = nowSec * 1000000l;
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
    nowSec = nowSec * 1000000000l;
  }

  nowSec = nowSec - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  TSKEY   cutoffKey = nowSec - s3KeepLocal * tsTickPerMin[pKeepCfg->precision];
  int32_t cutoffFid = tsdbKeyFid(cutoffKey, pKeepCfg->days, pKeepCfg->precision);

  return (fid >= cutoffFid) ? 0 : 1;
}
457

458
/*
 * Upload the grown last-chunk file of a data file to S3.
 *
 * `size` is the current size of the local last-chunk file and `chunksize` the
 * size of one chunk. The new last chunk number is the old one plus the number
 * of additional full chunks contained in `size`. Queues a REMOVE op for the old file
 * descriptor and a CREATE op carrying the new lcn, uploads every full chunk
 * with tcsPutObjectFromFileOffset(), then copies the remaining tail into a
 * new local last-chunk file.
 *
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
  int32_t   code = 0;
  int32_t   lino = 0;
  STFileOp  op = {0};
  TdFilePtr fdFrom = NULL, fdTo = NULL;
  int32_t   lcn = fobj->f->lcn + (size - 1) / chunksize;

  // remove old
  op = (STFileOp){
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  // create new
  op = (STFileOp){
      .optype = TSDB_FOP_CREATE,
      .fid = fobj->f->fid,
      .nf =
          {
              .type = fobj->f->type,
              .did = fobj->f->did,
              .fid = fobj->f->fid,
              .minVer = fobj->f->minVer,
              .maxVer = fobj->f->maxVer,
              .cid = fobj->f->cid,
              .size = fobj->f->size,
              .lcn = lcn,
              .stt[0] =
                  {
                      .level = fobj->f->stt[0].level,
                  },
          },
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  char fname[TSDB_FILENAME_LEN];
  tsdbTFileName(rtner->tsdb, &op.nf, fname);
  char   *object_name = taosDirEntryBaseName(fname);
  char    object_name_prefix[TSDB_FILENAME_LEN];
  int32_t node_id = vnodeNodeId(rtner->tsdb->pVnode);
  // Bound writes by the real buffer size rather than an unrelated constant.
  snprintf(object_name_prefix, sizeof(object_name_prefix), "%d/%s", node_id, object_name);

  char *dot = strrchr(object_name_prefix, '.');
  if (!dot) {
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }

  char *dot2 = strchr(object_name, '.');
  // Must test dot2 (not dot): dot2 is dereferenced just below.
  if (!dot2) {
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }
  // Rewrite the local name so `fname` refers to the existing last-chunk file.
  // NOTE(review): dot2 appears to point into `fname` (the rewritten name is
  // opened via `fname` below); the TSDB_FQDN_LEN-based bound is kept from the
  // original - confirm it cannot exceed the space left in `fname`.
  snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);

  // do copy the file
  for (int32_t cn = fobj->f->lcn; cn < lcn; ++cn) {
    snprintf(dot + 1, sizeof(object_name_prefix) - (size_t)(dot + 1 - object_name_prefix), "%d.data", cn);
    int64_t c_offset = chunksize * (cn - fobj->f->lcn);

    TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
  }

  // copy last chunk
  int64_t lc_offset = chunksize * (lcn - fobj->f->lcn);
  int64_t lc_size = size - lc_offset;

  snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", fobj->f->lcn);

  fdFrom = taosOpenFile(fname, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId:%d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, lc_size);

  // Switch `fname` to the new last-chunk name before creating the target.
  snprintf(dot2 + 1, TSDB_FQDN_LEN - (dot2 + 1 - object_name), "%d.data", lcn);
  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
  if (n < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }

  if (taosCloseFile(&fdTo) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  return code;
}
563

564
/*
 * Upload a whole local data file to S3 in chunks.
 *
 * `size` is the local file size and `chunksize` the size of one chunk; the
 * file is split into lcn = ceil(size / chunksize) chunks. Queues a REMOVE op
 * for the old descriptor and a CREATE op carrying the new lcn, uploads chunks
 * 1..lcn-1 with tcsPutObjectFromFileOffset(), then copies the remaining tail
 * into a local last-chunk file.
 *
 * Returns 0 on success or the code of the first failing step.
 */
static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
  int32_t   code = 0;
  int32_t   lino = 0;
  STFileOp  op = {0};
  int32_t   lcn = (size - 1) / chunksize + 1;
  TdFilePtr fdFrom = NULL, fdTo = NULL;

  // remove old
  op = (STFileOp){
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  // create new
  op = (STFileOp){
      .optype = TSDB_FOP_CREATE,
      .fid = fobj->f->fid,
      .nf =
          {
              .type = fobj->f->type,
              .did = fobj->f->did,
              .fid = fobj->f->fid,
              .minVer = fobj->f->minVer,
              .maxVer = fobj->f->maxVer,
              .cid = fobj->f->cid,
              .size = fobj->f->size,
              .lcn = lcn,
              .stt[0] =
                  {
                      .level = fobj->f->stt[0].level,
                  },
          },
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  char fname[TSDB_FILENAME_LEN];
  tsdbTFileName(rtner->tsdb, &op.nf, fname);
  char   *object_name = taosDirEntryBaseName(fname);
  char    object_name_prefix[TSDB_FILENAME_LEN];
  int32_t node_id = vnodeNodeId(rtner->tsdb->pVnode);
  // Bound writes by the real buffer size rather than an unrelated constant.
  snprintf(object_name_prefix, sizeof(object_name_prefix), "%d/%s", node_id, object_name);

  char *dot = strrchr(object_name_prefix, '.');
  if (!dot) {
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }

  // do copy the file
  for (int32_t cn = 1; cn < lcn; ++cn) {
    snprintf(dot + 1, sizeof(object_name_prefix) - (size_t)(dot + 1 - object_name_prefix), "%d.data", cn);
    int64_t c_offset = chunksize * (cn - 1);

    TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
  }

  // copy last chunk
  int64_t lc_offset = (int64_t)(lcn - 1) * chunksize;
  int64_t lc_size = size - lc_offset;

  // From here on `dot` refers to the local file name, not the object key.
  dot = strchr(object_name, '.');
  if (!dot) {
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, lino);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }
  // NOTE(review): the rewritten name is opened via `fname` below, so `dot`
  // appears to point into `fname`; the TSDB_FQDN_LEN-based bound is kept from
  // the original - confirm it cannot exceed the space left in `fname`.
  snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name), "%d.data", lcn);

  fdFrom = taosOpenFile(fobj->fname, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, fobj->f->size);

  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
  if (n < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  if (taosCloseFile(&fdTo) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  return code;
}
665

666
/*
 * Run S3 migration for the data file of rtner->fset.
 *
 * Nothing is done when the file set has no data file, when it is expired
 * (tsdbFidLevel() < 0), or when tsdbS3FidLevel() says it should stay local.
 *
 * If the plain data file exists locally and is larger than one chunk and
 * older than tsS3UploadDelaySec, it is either handed to tsdbAsyncCompact()
 * (when s3Compact is set and lcn < 0) or uploaded with
 * tsdbMigrateDataFileS3(). Otherwise the last-chunk file is examined and,
 * under the same size/age condition, uploaded with tsdbMigrateDataFileLCS3().
 *
 * Returns 0 on success or when nothing had to be done, otherwise the code of
 * the failing step.
 */
static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileSet *fset = rtner->fset;
  STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
  if (!fobj) {
    return 0;
  }

  int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
  if (expLevel < 0) {  // expired
    return 0;
  }

  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  int32_t    s3KeepLocal = pCfg->s3KeepLocal;
  int32_t    s3ExpLevel = tsdbS3FidLevel(fset->fid, &rtner->tsdb->keepCfg, s3KeepLocal, rtner->now);
  if (s3ExpLevel < 1) {  // keep on local storage
    return 0;
  }

  int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
  int32_t lcn = fobj->f->lcn;

  if (/*lcn < 1 && */ taosCheckExistFile(fobj->fname)) {
    int64_t mtime = 0;
    int64_t size = 0;
    // A failed stat is logged like in the last-chunk branch below; size and
    // mtime keep their zero initial values in that case.
    if (taosStatFile(fobj->fname, &size, &mtime, NULL) != 0) {
      tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);
    }
    if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
      if (pCfg->s3Compact && lcn < 0) {
        STimeWindow win = {0};
        tsdbFidKeyRange(fset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);

        tsdbInfo("vgId:%d, async compact begin lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
        code = tsdbAsyncCompact(rtner->tsdb, &win, true);
        tsdbInfo("vgId:%d, async compact end lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
        // Record the failing line (if any) before leaving; nothing is uploaded on this path.
        TSDB_CHECK_CODE(code, lino, _exit);
        goto _exit;
      }

      TAOS_CHECK_GOTO(tsdbMigrateDataFileS3(rtner, fobj, size, chunksize), &lino, _exit);
    }
  } else {
    if (lcn <= 1) {
      TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _exit);
    }
    char fname1[TSDB_FILENAME_LEN];
    tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1);

    if (taosCheckExistFile(fname1)) {
      int64_t mtime = 0;
      int64_t size = 0;
      if (taosStatFile(fname1, &size, &mtime, NULL) != 0) {
        tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);
      }
      if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
        TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit);
      }
    } else {
      // A missing last-chunk file is logged but not reported as an error (code is still 0).
      tsdbError("vgId:%d, file: %s not found, %s at line %d", TD_VID(rtner->tsdb->pVnode), fname1, __func__, lino);
      return code;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
738

739
/*
 * Schedule S3 migration tasks for every file set of `tsdb`.
 *
 * First refreshes tsS3Enabled from the object-storage grant: it is switched
 * off when the grant check reports expiry, and switched on when the grant is
 * valid and tsS3EnabledCfg is set. Returns 0 without scheduling anything when
 * S3 ends up disabled; otherwise returns the scheduling result.
 */
int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
  int32_t expired = grantCheck(TSDB_GRANT_OBJECT_STORAGE);

  if (expired) {
    if (tsS3Enabled) {
      tsdbWarn("s3 grant expired: %d", expired);
      tsS3Enabled = false;
    }
  } else if (tsS3EnabledCfg) {
    tsS3Enabled = true;
  }

  if (!tsS3Enabled) {
    return 0;
  }

  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t code = tsdbAsyncRetentionImpl(tsdb, now, true);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  if (code) {
    tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
  }
  return code;
}
763

764
#endif
765

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc