• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3546

03 Dec 2024 10:02AM UTC coverage: 60.691% (-0.1%) from 60.839%
#3546

push

travis-ci

web-flow
Merge pull request #29015 from taosdata/fix/TS-5668

[TS-5668] fix(keeper): fix endpoint value too long for column/tag and eliminate warnings

120577 of 253823 branches covered (47.5%)

Branch coverage included in aggregate %.

201666 of 277134 relevant lines covered (72.77%)

18719900.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

28.33
/source/dnode/vnode/src/tsdb/tsdbRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcs.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "vnd.h"
20

21
typedef struct {
22
  STsdb  *tsdb;
23
  int32_t szPage;
24
  int64_t now;
25
  int64_t cid;
26

27
  STFileSet   *fset;
28
  TFileOpArray fopArr;
29
} SRTNer;
30

31
/*
 * Queue a TSDB_FOP_REMOVE operation for `fobj` on the task's operation array.
 * Returns whatever TARRAY2_APPEND returns.
 */
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
  STFileOp rmOp = {0};

  rmOp.optype = TSDB_FOP_REMOVE;
  rmOp.fid = fobj->f->fid;
  rmOp.of = fobj->f[0];

  return TARRAY2_APPEND(&rtner->fopArr, rmOp);
}
40

41
/*
 * Copy `size` bytes from `from` to `to` with taosFSendFile(), sending at most
 * `limitMB` MiB per one-second interval. A `limitMB` of 0 disables the cap.
 *
 * Returns 0 once all bytes were sent; a failed send is reported through
 * TAOS_CHECK_RETURN(terrno).
 */
static int32_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
  int64_t interval = 1000;  // 1s
  // Widen before multiplying: evaluated in uint32_t the product wraps for
  // limitMB >= 4096 (and is 0 at exactly 4096, which would send 0 bytes forever).
  int64_t limit = limitMB ? (int64_t)limitMB * 1024 * 1024 : INT64_MAX;
  int64_t offset = 0;
  int64_t remain = size;

  while (remain > 0) {
    int64_t n;
    int64_t last = taosGetTimestampMs();
    if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
      TAOS_CHECK_RETURN(terrno);
    }

    remain -= n;

    // Throttle only when another round is needed: sleep out the rest of the interval.
    if (remain > 0) {
      int64_t elapsed = taosGetTimestampMs() - last;
      if (elapsed < interval) {
        taosMsleep(interval - elapsed);
      }
    }
  }

  return 0;
}
66

67
/*
 * Copy the last-chunk file of `from` to the last-chunk file named by `to`.
 *
 * Both names come from tsdbTFileLastChunkName(). The number of bytes copied is
 * the physical size of `to` minus the (to->lcn - 1) full chunks that precede
 * the last chunk.
 *
 * Returns 0 on success, otherwise the error code picked up from `terrno`.
 * Both file handles are closed on every path.
 */
static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr fdFrom = NULL, fdTo = NULL;
  char      fname_from[TSDB_FILENAME_LEN];
  char      fname_to[TSDB_FILENAME_LEN];
  int64_t   offset = 0;  // start of the source file

  tsdbTFileLastChunkName(rtner->tsdb, from->f, fname_from);
  tsdbTFileLastChunkName(rtner->tsdb, to, fname_to);

  fdFrom = taosOpenFile(fname_from, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname_to, from->f->size);

  fdTo = taosOpenFile(fname_to, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
  int64_t    lc_size = tsdbLogicToFileSize(to->size, rtner->szPage) - chunksize * (to->lcn - 1);

  // Pass a real offset object, like every other taosFSendFile() caller in this
  // file, instead of a null pointer.
  if (taosFSendFile(fdTo, fdFrom, &offset, lc_size) < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_from);
  }
  if (taosCloseFile(&fdTo) != 0) {
    tsdbError("vgId:%d, failed to close file %s", TD_VID(rtner->tsdb->pVnode), fname_to);
  }
  return code;
}
110

111
/*
 * Copy the file behind `from` to the path that tsdbTFileName() derives for
 * `to`, throttled by tsRetentionSpeedLimitMB.
 *
 * Returns 0 on success or the first error code encountered. Both handles are
 * closed before returning.
 */
static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile *to) {
  int32_t   code = 0;
  int32_t   lino = 0;
  TdFilePtr srcFd = NULL;
  TdFilePtr dstFd = NULL;
  char      dstName[TSDB_FILENAME_LEN];

  tsdbTFileName(rtner->tsdb, to, dstName);

  if ((srcFd = taosOpenFile(from->fname, TD_FILE_READ)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open tofile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), dstName, from->f->size);

  if ((dstFd = taosOpenFile(dstName, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC)) == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  // The amount copied is the on-disk size derived from the logical size and page size.
  code = tsdbCopyFileWithLimitedSpeed(srcFd, dstFd, tsdbLogicToFileSize(from->f->size, rtner->szPage),
                                      tsRetentionSpeedLimitMB);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&srcFd) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  if (taosCloseFile(&dstFd) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  return code;
}
151

152
/*
 * Migrate one file object to disk `did`.
 *
 * Queues a REMOVE op for the current file and a CREATE op for its replacement
 * on `did` (same type, fid, versions, cid, size, lcn and stt level), then
 * copies the data: the whole file when lcn < 1, otherwise only the last chunk.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const SDiskID *did) {
  int32_t       code = 0;
  int32_t       lino = 0;
  const STFile *src = fobj->f;

  STFileOp rmOp = {
      .optype = TSDB_FOP_REMOVE,
      .fid = src->fid,
      .of = src[0],
  };

  STFileOp mkOp = {
      .optype = TSDB_FOP_CREATE,
      .fid = src->fid,
      .nf =
          {
              .type = src->type,
              .did = did[0],
              .fid = src->fid,
              .minVer = src->minVer,
              .maxVer = src->maxVer,
              .cid = src->cid,
              .size = src->size,
              .lcn = src->lcn,
              .stt[0] =
                  {
                      .level = src->stt[0].level,
                  },
          },
  };

  // Queue order matters to the caller's edit: remove the old entry, then create the new one.
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, rmOp), &lino, _exit);
  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, mkOp), &lino, _exit);

  // Copy the bytes to the new location.
  if (src->lcn < 1) {
    code = tsdbDoCopyFile(rtner, fobj, &mkOp.nf);
  } else {
    code = tsdbDoCopyFileLC(rtner, fobj, &mkOp.nf);
  }
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
205

206
typedef struct {
207
  STsdb  *tsdb;
208
  int64_t now;
209
  int32_t fid;
210
  bool    s3Migrate;
211
} SRtnArg;
212

213
/*
 * Apply the file operations queued in rtner->fopArr, if any.
 *
 * Begins a TSDB_FEDIT_RETENTION edit, commits it while holding tsdb->mutex,
 * and destroys the operation array after a successful commit. With an empty
 * array nothing is done.
 *
 * Returns 0 on success or the error from the edit begin/commit.
 */
static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
  int32_t code = 0;
  int32_t lino = 0;

  if (TARRAY2_SIZE(&rtner->fopArr) <= 0) {
    goto _exit;
  }

  code = tsdbFSEditBegin(rtner->tsdb->pFS, &rtner->fopArr, TSDB_FEDIT_RETENTION);
  TSDB_CHECK_CODE(code, lino, _exit);

  // The commit runs under the tsdb mutex; release it before inspecting the result.
  (void)taosThreadMutexLock(&rtner->tsdb->mutex);
  code = tsdbFSEditCommit(rtner->tsdb->pFS);
  (void)taosThreadMutexUnlock(&rtner->tsdb->mutex);
  TSDB_CHECK_CODE(code, lino, _exit);

  TARRAY2_DESTROY(&rtner->fopArr, NULL);

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbDebug("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtner->tsdb->pVnode), rtner->cid, __func__);
  }
  return code;
}
242

243
/*
 * Decide what to do with the task's file set from its level as reported by
 * tsdbFidLevel():
 *   - level < 0: queue removal of every file in the set;
 *   - level == 0: nothing to do;
 *   - level > 0: allocate a disk on that level and migrate files to it.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbDoRetention(SRTNer *rtner) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileObj *fobj = NULL;
  SSttLvl   *lvl = NULL;
  STFileSet *fset = rtner->fset;
  SDiskID    did;
  int32_t    expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);

  if (expLevel == 0) {
    return 0;
  }

  if (expLevel < 0) {
    // Drop the whole file set: first the per-type files, then every stt file.
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
      fobj = fset->farr[ftype];
      if (fobj != NULL) {
        TAOS_CHECK_GOTO(tsdbDoRemoveFileObject(rtner, fobj), &lino, _exit);
      }
    }

    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      TARRAY2_FOREACH(lvl->fobjArr, fobj) { TAOS_CHECK_GOTO(tsdbDoRemoveFileObject(rtner, fobj), &lino, _exit); }
    }

    goto _exit;
  }

  // expLevel > 0: pick a target disk on that level and make sure the directory exists there.
  TAOS_CHECK_GOTO(tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did), &lino, _exit);
  TAOS_CHECK_GOTO(tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did), &lino, _exit);

  // Per-type files: only files sitting on a lower level than the target are moved.
  for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ++ftype) {
    fobj = fset->farr[ftype];
    if (fobj == NULL || fobj->f->did.level >= did.level) {
      continue;
    }

    tsdbInfo("file:%s size: %" PRId64 " do migrate from %d to %d", fobj->fname, fobj->f->size, fobj->f->did.level,
             did.level);

    TAOS_CHECK_GOTO(tsdbDoMigrateFileObj(rtner, fobj, &did), &lino, _exit);
  }

  // stt files: every file whose level differs from the target is moved.
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      if (fobj->f->did.level != did.level) {
        TAOS_CHECK_GOTO(tsdbDoMigrateFileObj(rtner, fobj, &did), &lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d, %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
306

307
/* Cancel callback registered with vnodeAsync(): releases the task argument. */
static void tsdbRetentionCancel(void *arg) {
  taosMemoryFree(arg);
}
×
308

309
static int32_t tsdbDoS3Migrate(SRTNer *rtner);
310

311
static int32_t tsdbRetention(void *arg) {
415✔
312
  int32_t code = 0;
415✔
313
  int32_t lino = 0;
415✔
314

315
  SRtnArg   *rtnArg = (SRtnArg *)arg;
415✔
316
  STsdb     *pTsdb = rtnArg->tsdb;
415✔
317
  SVnode    *pVnode = pTsdb->pVnode;
415✔
318
  STFileSet *fset = NULL;
415✔
319
  SRTNer     rtner = {
831✔
320
          .tsdb = pTsdb,
321
          .szPage = pVnode->config.tsdbPageSize,
415✔
322
          .now = rtnArg->now,
415✔
323
          .cid = tsdbFSAllocEid(pTsdb->pFS),
415✔
324
  };
325

326
  // begin task
327
  (void)taosThreadMutexLock(&pTsdb->mutex);
416✔
328
  tsdbBeginTaskOnFileSet(pTsdb, rtnArg->fid, &fset);
416✔
329
  if (fset && (code = tsdbTFileSetInitCopy(pTsdb, fset, &rtner.fset))) {
415!
330
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
331
    TSDB_CHECK_CODE(code, lino, _exit);
×
332
  }
333
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
416✔
334

335
  // do retention
336
  if (rtner.fset) {
416!
337
    if (rtnArg->s3Migrate) {
416!
338
      TAOS_CHECK_GOTO(tsdbDoS3Migrate(&rtner), &lino, _exit);
×
339
    } else {
340
      TAOS_CHECK_GOTO(tsdbDoRetention(&rtner), &lino, _exit);
416!
341
    }
342

343
    TAOS_CHECK_GOTO(tsdbDoRetentionEnd(&rtner), &lino, _exit);
416!
344
  }
345

346
_exit:
415✔
347
  if (rtner.fset) {
415!
348
    (void)taosThreadMutexLock(&pTsdb->mutex);
415✔
349
    tsdbFinishTaskOnFileSet(pTsdb, rtnArg->fid);
416✔
350
    (void)taosThreadMutexUnlock(&pTsdb->mutex);
416✔
351
  }
352

353
  // clear resources
354
  tsdbTFileSetClear(&rtner.fset);
415✔
355
  TARRAY2_DESTROY(&rtner.fopArr, NULL);
416!
356
  taosMemoryFree(arg);
416✔
357
  if (code) {
415!
358
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
×
359
  }
360
  return code;
415✔
361
}
362

363
/*
 * Schedule one low-priority tsdbRetention task per file set of `tsdb`.
 *
 * Does nothing when background tasks are disabled. For each file set the
 * channel is opened, an SRtnArg is allocated and filled, and the task is
 * submitted through vnodeAsync(); if submission fails the argument is freed
 * here. `s3Migrate` is forwarded to the task.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t tsdbAsyncRetentionImpl(STsdb *tsdb, int64_t now, bool s3Migrate) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STFileSet *fset;

  if (tsdb->bgTaskDisabled) {
    return 0;
  }

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    TAOS_CHECK_GOTO(tsdbTFileSetOpenChannel(fset), &lino, _exit);

    SRtnArg *taskArg = taosMemoryMalloc(sizeof(*taskArg));
    if (taskArg == NULL) {
      TAOS_CHECK_GOTO(terrno, &lino, _exit);
    }

    taskArg->tsdb = tsdb;
    taskArg->now = now;
    taskArg->fid = fset->fid;
    taskArg->s3Migrate = s3Migrate;

    code = vnodeAsync(&fset->channel, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, taskArg, NULL);
    if (code) {
      // The task was not accepted, so the argument is still owned here.
      taosMemoryFree(taskArg);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
  }
  return code;
}
396

397
/*
 * Schedule retention tasks (not S3 migration) for all file sets of `tsdb`,
 * holding tsdb->mutex while scheduling. Returns the scheduling result.
 */
int32_t tsdbAsyncRetention(STsdb *tsdb, int64_t now) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t rc = tsdbAsyncRetentionImpl(tsdb, now, false);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return rc;
}
404

405
/*
 * Classify file id `fid` against the S3 keep-local window.
 *
 * `nowSec` is scaled to the configured time precision, then reduced by
 * keepTimeOffset (scaled by tsTickPerHour) and by `s3KeepLocal` (scaled by
 * tsTickPerMin). The file id of the resulting key is the boundary.
 *
 * Returns 0 when `fid` is at or after the boundary, 1 when it is before it.
 */
static int32_t tsdbS3FidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t s3KeepLocal, int64_t nowSec) {
  // Seconds -> database precision; unknown precisions leave the value unscaled.
  if (pKeepCfg->precision == TSDB_TIME_PRECISION_MILLI) {
    nowSec *= 1000;
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_MICRO) {
    nowSec *= 1000000l;
  } else if (pKeepCfg->precision == TSDB_TIME_PRECISION_NANO) {
    nowSec *= 1000000000l;
  }

  nowSec = nowSec - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  TSKEY   boundaryKey = nowSec - s3KeepLocal * tsTickPerMin[pKeepCfg->precision];
  int32_t boundaryFid = tsdbKeyFid(boundaryKey, pKeepCfg->days, pKeepCfg->precision);

  return (fid < boundaryFid) ? 1 : 0;
}
428

429
/*
 * Migrate a data file that already has chunks (lcn >= 1) further to S3.
 *
 * `size` is the size of the current last-chunk file and `chunksize` the chunk
 * size in bytes. Every full chunk of that file is uploaded with
 * tcsPutObjectFromFileOffset() under "<nodeId>/<base>.<cn>.data", and the
 * remainder is copied into a new local last-chunk file numbered `lcn`.
 * A REMOVE op for the old file and a CREATE op with the new lcn are queued
 * first.
 *
 * Returns 0 on success or the first error code encountered. Both file handles
 * are closed on every path.
 */
static int32_t tsdbMigrateDataFileLCS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
  int32_t   code = 0;
  int32_t   lino = 0;
  STFileOp  op = {0};
  TdFilePtr fdFrom = NULL, fdTo = NULL;
  int32_t   lcn = fobj->f->lcn + (size - 1) / chunksize;

  // remove old
  op = (STFileOp){
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  // create new
  op = (STFileOp){
      .optype = TSDB_FOP_CREATE,
      .fid = fobj->f->fid,
      .nf =
          {
              .type = fobj->f->type,
              .did = fobj->f->did,
              .fid = fobj->f->fid,
              .minVer = fobj->f->minVer,
              .maxVer = fobj->f->maxVer,
              .cid = fobj->f->cid,
              .size = fobj->f->size,
              .lcn = lcn,
              .stt[0] =
                  {
                      .level = fobj->f->stt[0].level,
                  },
          },
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  char fname[TSDB_FILENAME_LEN];
  tsdbTFileName(rtner->tsdb, &op.nf, fname);
  // object_name aliases the base-name part of fname: edits made through it are
  // what taosOpenFile(fname, ...) sees below.
  char   *object_name = taosDirEntryBaseName(fname);
  char    object_name_prefix[TSDB_FILENAME_LEN];
  int32_t node_id = vnodeNodeId(rtner->tsdb->pVnode);
  snprintf(object_name_prefix, sizeof(object_name_prefix), "%d/%s", node_id, object_name);

  char *dot = strrchr(object_name_prefix, '.');
  if (!dot) {
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, __LINE__);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }

  char *dot2 = strchr(object_name, '.');
  if (!dot2) {  // was testing `dot`, which let a NULL dot2 through
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, __LINE__);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }

  // Room left after each dot, measured against the buffer that actually holds it.
  size_t prefix_room = sizeof(object_name_prefix) - (size_t)(dot + 1 - object_name_prefix);
  size_t fname_room = sizeof(fname) - (size_t)(dot2 + 1 - fname);

  // fname now names the current (source) last-chunk file
  snprintf(dot2 + 1, fname_room, "%d.data", fobj->f->lcn);

  // do copy the file: upload each full chunk of the source under its own chunk number
  for (int32_t cn = fobj->f->lcn; cn < lcn; ++cn) {
    snprintf(dot + 1, prefix_room, "%d.data", cn);
    int64_t c_offset = chunksize * (cn - fobj->f->lcn);

    TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
  }

  // copy last chunk
  int64_t lc_offset = chunksize * (lcn - fobj->f->lcn);
  int64_t lc_size = size - lc_offset;

  // fname still names the source chunk file here (nothing rewrote it since above)
  fdFrom = taosOpenFile(fname, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId:%d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, lc_size);

  // switch fname to the new last-chunk file
  snprintf(dot2 + 1, fname_room, "%d.data", lcn);
  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
  if (n < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }

  if (taosCloseFile(&fdTo) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  return code;
}
534

535
/*
 * Migrate a local data file to S3 for the first time.
 *
 * `size` is the size of the data file and `chunksize` the chunk size in bytes.
 * The file is split into lcn = (size - 1) / chunksize + 1 chunks: chunks
 * 1..lcn-1 are uploaded with tcsPutObjectFromFileOffset() under
 * "<nodeId>/<base>.<cn>.data", and the remainder is copied into a local
 * last-chunk file numbered `lcn`. A REMOVE op for the old file and a CREATE
 * op with the new lcn are queued first.
 *
 * Returns 0 on success or the first error code encountered. Both file handles
 * are closed on every path.
 */
static int32_t tsdbMigrateDataFileS3(SRTNer *rtner, const STFileObj *fobj, int64_t size, int64_t chunksize) {
  int32_t   code = 0;
  int32_t   lino = 0;
  STFileOp  op = {0};
  int32_t   lcn = (size - 1) / chunksize + 1;
  TdFilePtr fdFrom = NULL, fdTo = NULL;

  // remove old
  op = (STFileOp){
      .optype = TSDB_FOP_REMOVE,
      .fid = fobj->f->fid,
      .of = fobj->f[0],
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  // create new
  op = (STFileOp){
      .optype = TSDB_FOP_CREATE,
      .fid = fobj->f->fid,
      .nf =
          {
              .type = fobj->f->type,
              .did = fobj->f->did,
              .fid = fobj->f->fid,
              .minVer = fobj->f->minVer,
              .maxVer = fobj->f->maxVer,
              .cid = fobj->f->cid,
              .size = fobj->f->size,
              .lcn = lcn,
              .stt[0] =
                  {
                      .level = fobj->f->stt[0].level,
                  },
          },
  };

  TAOS_CHECK_GOTO(TARRAY2_APPEND(&rtner->fopArr, op), &lino, _exit);

  char fname[TSDB_FILENAME_LEN];
  tsdbTFileName(rtner->tsdb, &op.nf, fname);
  // object_name aliases the base-name part of fname: the edit made through it
  // below is what taosOpenFile(fname, ...) sees.
  char   *object_name = taosDirEntryBaseName(fname);
  char    object_name_prefix[TSDB_FILENAME_LEN];
  int32_t node_id = vnodeNodeId(rtner->tsdb->pVnode);
  snprintf(object_name_prefix, sizeof(object_name_prefix), "%d/%s", node_id, object_name);

  char *dot = strrchr(object_name_prefix, '.');
  if (!dot) {
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, __LINE__);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }

  // Room left after the dot, measured against the buffer that holds it.
  size_t prefix_room = sizeof(object_name_prefix) - (size_t)(dot + 1 - object_name_prefix);

  // do copy the file: upload every full chunk straight from the original data file
  for (int32_t cn = 1; cn < lcn; ++cn) {
    snprintf(dot + 1, prefix_room, "%d.data", cn);
    int64_t c_offset = chunksize * (cn - 1);

    TAOS_CHECK_GOTO(tcsPutObjectFromFileOffset(fobj->fname, object_name_prefix, c_offset, chunksize), &lino, _exit);
  }

  // copy last chunk
  int64_t lc_offset = (int64_t)(lcn - 1) * chunksize;
  int64_t lc_size = size - lc_offset;

  char *dot2 = strchr(object_name, '.');
  if (!dot2) {
    tsdbError("vgId:%d, incorrect lcn: %d, %s at line %d", TD_VID(rtner->tsdb->pVnode), lcn, __func__, __LINE__);
    TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit);
  }
  // Bound the write by what is left of fname, not by an unrelated length constant.
  snprintf(dot2 + 1, sizeof(fname) - (size_t)(dot2 + 1 - fname), "%d.data", lcn);

  fdFrom = taosOpenFile(fobj->fname, TD_FILE_READ);
  if (fdFrom == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  tsdbInfo("vgId: %d, open lcfile: %s size: %" PRId64, TD_VID(rtner->tsdb->pVnode), fname, fobj->f->size);

  fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  int64_t n = taosFSendFile(fdTo, fdFrom, &lc_offset, lc_size);
  if (n < 0) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  if (taosCloseFile(&fdFrom) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  if (taosCloseFile(&fdTo) != 0) {
    tsdbTrace("vgId:%d, failed to close file", TD_VID(rtner->tsdb->pVnode));
  }
  return code;
}
636

637
/*
 * Run S3 migration for the data file of the task's file set.
 *
 * Nothing is done when the set has no data file, when tsdbFidLevel() reports
 * it as expired (< 0), or when tsdbS3FidLevel() says it stays local (< 1).
 *
 * If the plain data file exists locally and is larger than one chunk and
 * older than tsS3UploadDelaySec, it is either compacted first (s3Compact set
 * and lcn < 0) or migrated with tsdbMigrateDataFileS3(). Otherwise the
 * last-chunk file is looked up (lcn must be > 1) and, under the same
 * size/age condition, migrated with tsdbMigrateDataFileLCS3().
 *
 * Returns 0 on success or when there is nothing to do, otherwise the first
 * error code encountered.
 */
static int32_t tsdbDoS3Migrate(SRTNer *rtner) {
  int32_t code = 0;
  int32_t lino = 0;

  STFileSet *fset = rtner->fset;
  STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
  if (!fobj) {
    return 0;
  }

  int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
  if (expLevel < 0) {  // expired
    return 0;
  }

  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  int32_t    s3KeepLocal = pCfg->s3KeepLocal;
  int32_t    s3ExpLevel = tsdbS3FidLevel(fset->fid, &rtner->tsdb->keepCfg, s3KeepLocal, rtner->now);
  if (s3ExpLevel < 1) {  // keep on local storage
    return 0;
  }

  int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
  int32_t lcn = fobj->f->lcn;

  if (taosCheckExistFile(fobj->fname)) {
    int32_t mtime = 0;
    int64_t size = 0;
    // On a failed stat, size/mtime keep their zero defaults, so the size check
    // below skips the file; log it like the last-chunk branch does.
    if (taosStatFile(fobj->fname, &size, &mtime, NULL) != 0) {
      tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);
    }
    if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
      if (pCfg->s3Compact && lcn < 0) {
        extern int32_t tsdbAsyncCompact(STsdb * tsdb, const STimeWindow *tw, bool sync);

        STimeWindow win = {0};
        tsdbFidKeyRange(fset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);

        tsdbInfo("vgId:%d, async compact begin lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
        code = tsdbAsyncCompact(rtner->tsdb, &win, pCfg->sttTrigger == 1);
        tsdbInfo("vgId:%d, async compact end lcn: %d.", TD_VID(rtner->tsdb->pVnode), lcn);
        goto _exit;
      }

      TAOS_CHECK_GOTO(tsdbMigrateDataFileS3(rtner, fobj, size, chunksize), &lino, _exit);
    }
  } else {
    if (lcn <= 1) {
      TAOS_CHECK_GOTO(TSDB_CODE_INVALID_PARA, &lino, _exit);
    }
    char fname1[TSDB_FILENAME_LEN];
    tsdbTFileLastChunkName(rtner->tsdb, fobj->f, fname1);

    if (taosCheckExistFile(fname1)) {
      int32_t mtime = 0;
      int64_t size = 0;
      if (taosStatFile(fname1, &size, &mtime, NULL) != 0) {
        tsdbError("vgId:%d, %s failed at %s:%d ", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, __LINE__);
      }
      if (size > chunksize && mtime < rtner->now - tsS3UploadDelaySec) {
        TAOS_CHECK_GOTO(tsdbMigrateDataFileLCS3(rtner, fobj, size, chunksize), &lino, _exit);
      }
    } else {
      // A missing last-chunk file is logged but not treated as a failure (code stays 0).
      tsdbError("vgId:%d, file: %s not found, %s at line %d", TD_VID(rtner->tsdb->pVnode), fname1, __func__, __LINE__);
      return code;
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(rtner->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
711

712
/*
 * Schedule S3 migration tasks for all file sets of `tsdb`.
 *
 * First refreshes tsS3Enabled from the object-storage grant: an expired grant
 * switches it off (with a warning), a valid grant switches it on when
 * tsS3EnabledCfg is set. With S3 disabled the call returns 0 immediately.
 * Scheduling itself runs under tsdb->mutex.
 *
 * Returns 0 on success or the scheduling error.
 */
int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
  int32_t code = 0;
  int32_t expired = grantCheck(TSDB_GRANT_OBJECT_STORAGE);

  if (expired) {
    if (tsS3Enabled) {
      tsdbWarn("s3 grant expired: %d", expired);
      tsS3Enabled = false;
    }
  } else if (tsS3EnabledCfg) {
    tsS3Enabled = true;
  }

  if (!tsS3Enabled) {
    return 0;
  }

  (void)taosThreadMutexLock(&tsdb->mutex);
  code = tsdbAsyncRetentionImpl(tsdb, now, true);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  if (code != 0) {
    tsdbError("vgId:%d, %s failed, reason:%s", TD_VID(tsdb->pVnode), __func__, tstrerror(code));
  }
  return code;
}
736

737
/*
 * Add to `*size` the bytes accounted to chunks other than the last one:
 * (lcn - 1) * chunk size for every data file whose lcn is greater than 1.
 *
 * `*size` is accumulated into, not reset. Always returns 0.
 */
static int32_t tsdbGetS3SizeImpl(STsdb *tsdb, int64_t *size) {
  SVnodeCfg *cfg = &tsdb->pVnode->config;
  int64_t    chunkBytes = (int64_t)cfg->tsdbPageSize * cfg->s3ChunkSize;
  STFileSet *fset;

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    STFileObj *dataObj = fset->farr[TSDB_FTYPE_DATA];
    if (dataObj == NULL) {
      continue;
    }

    int32_t chunks = dataObj->f->lcn;
    if (chunks > 1) {
      *size += ((chunks - 1) * chunkBytes);
    }
  }

  return 0;
}
756
/*
 * Thread-safe wrapper around tsdbGetS3SizeImpl(): accumulates the S3 size of
 * `tsdb` into `*size` while holding tsdb->mutex and returns the result code.
 */
int32_t tsdbGetS3Size(STsdb *tsdb, int64_t *size) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t rc = tsdbGetS3SizeImpl(tsdb, size);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc