• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4987

16 Mar 2026 12:26PM UTC coverage: 73.883% (+36.6%) from 37.305%
#4987

push

travis-ci

web-flow
feat: support secure delete option. (#34591)

209 of 391 new or added lines in 24 files covered. (53.45%)

3062 existing lines in 140 files now uncovered.

261133 of 353439 relevant lines covered (73.88%)

121262425.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.27
/source/dnode/vnode/src/tsdb/tsdbMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23
extern int32_t tsdbAsyncCompact(STsdb* tsdb, const STimeWindow* tw, ETsdbOpType type, bool force);
24

25
// migrate monitor related functions
26
typedef struct SSsMigrateMonitor {
27
  TdThreadCond  stateChanged;
28
  int32_t ssMigrateId;
29
  int32_t fid;
30
  int32_t state;
31
  int64_t startTimeSec;
32
} SSsMigrateMonitor;
33

34

35
/* Returns the migration id held by the tsdb's migrate monitor (read without tsdb->mutex). */
static int32_t getSsMigrateId(STsdb* tsdb) {
  const SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  return monitor->ssMigrateId;
}
38

39

40
/*
 * Allocate the migrate monitor for 'tsdb' and initialise its condition variable.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the allocation fails, or the
 * error reported by taosThreadCondInit. On failure tsdb->pSsMigrateMonitor is not
 * modified and nothing is leaked.
 */
int32_t tsdbOpenSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor* pmm = (SSsMigrateMonitor*)taosMemCalloc(1, sizeof(SSsMigrateMonitor));
  if (pmm == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // A monitor whose condition variable failed to initialise must not be published:
  // waiters would block on it and tsdbCloseSsMigrateMonitor would destroy it.
  int32_t code = taosThreadCondInit(&pmm->stateChanged, NULL);
  if (code != 0) {
    taosMemoryFree(pmm);
    return code;
  }

  tsdb->pSsMigrateMonitor = pmm;
  return 0;
}
49

50

51
/*
 * Tear down the monitor created by tsdbOpenSsMigrateMonitor: destroy its
 * condition variable, free it and clear tsdb->pSsMigrateMonitor.
 * Does nothing when no monitor is attached.
 */
void tsdbCloseSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  if (monitor != NULL) {
    TAOS_UNUSED(taosThreadCondDestroy(&monitor->stateChanged));
    taosMemoryFree(monitor);
    tsdb->pSsMigrateMonitor = NULL;
  }
}
61

62

63
/* Store 'state' in the migrate monitor while holding tsdb->mutex (no broadcast is issued). */
static void setMigrationState(STsdb* tsdb, int32_t state) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  monitor->state = state;
  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
68

69

70
/*
 * Copy the locally tracked migration state into pProgress->state.
 *
 * The request must name the migration id and the file set the monitor is
 * currently tracking; otherwise TSDB_CODE_INVALID_MSG is returned and
 * pProgress is left untouched. The comparison runs under tsdb->mutex.
 */
int32_t tsdbQuerySsMigrateProgress(STsdb *tsdb, SSsMigrateProgress *pProgress) {
  int32_t            vid = TD_VID(tsdb->pVnode);
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  int32_t            code = TSDB_CODE_INVALID_MSG;

  (void)taosThreadMutexLock(&tsdb->mutex);

  if (monitor->ssMigrateId != pProgress->ssMigrateId) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in query progress, actual %d",
              vid, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
  } else if (monitor->fid != pProgress->fid) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in query progress, actual %d",
              vid, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
  } else {
    pProgress->state = monitor->state;
    code = 0;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
93

94

95

96
/*
 * Apply a migration progress report received from another node.
 *
 * Reports that originated on this node are ignored (returns 0). Otherwise the
 * report must match the tracked migration id and file set, or
 * TSDB_CODE_INVALID_MSG is returned. On a match the state is stored and
 * 'stateChanged' is broadcast, all under tsdb->mutex.
 */
int32_t tsdbUpdateSsMigrateProgress(STsdb* tsdb, SSsMigrateProgress* pProgress) {
  int32_t vid = TD_VID(tsdb->pVnode);

  // the state was generated by this vnode, so no need to process it
  if (pProgress->nodeId == vnodeNodeId(tsdb->pVnode)) {
    tsdbDebug("vgId:%d, skip migration progress update since it was generated by this vnode", vid);
    return 0;
  }

  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  int32_t            code = TSDB_CODE_INVALID_MSG;

  (void)taosThreadMutexLock(&tsdb->mutex);

  if (monitor->ssMigrateId != pProgress->ssMigrateId) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in update progress, actual %d",
              vid, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
  } else if (monitor->fid != pProgress->fid) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in update progress, actual %d",
              vid, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
  } else {
    monitor->state = pProgress->state;
    // Broadcast unconditionally, even when the state value did not change, so a
    // waiting thread is woken instead of running into its timeout.
    (void)taosThreadCondBroadcast(&monitor->stateChanged);
    code = 0;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
128

129

130

131
// migrate file related functions
132
/*
 * Decide where file set 'fid' lives: returns 0 for local storage, 1 for shared storage.
 *
 * 'nowSec' (seconds) is converted to the database precision, moved back by
 * keepTimeOffset hours and then by 'ssKeepLocal' minutes. File sets at or after
 * the fid that contains that instant stay local.
 */
int32_t tsdbSsFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t ssKeepLocal, int64_t nowSec) {
  int64_t now = nowSec;

  switch (pKeepCfg->precision) {
    case TSDB_TIME_PRECISION_MILLI:
      now *= 1000;
      break;
    case TSDB_TIME_PRECISION_MICRO:
      now *= 1000000l;
      break;
    case TSDB_TIME_PRECISION_NANO:
      now *= 1000000000l;
      break;
    default:
      break;  // any other precision: keep the value as is
  }

  now -= pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  TSKEY   boundary = now - ssKeepLocal * tsTickPerMin[pKeepCfg->precision];
  int32_t oldestLocalFid = tsdbKeyFid(boundary, pKeepCfg->days, pKeepCfg->precision);

  if (fid < oldestLocalFid) {
    return 1;
  }
  return 0;
}
151

152

153
/*
 * Fetch "vnode<vid>/f<fid>/manifests.json" from the default shared storage and
 * parse it into a newly created file set stored in *ppFileSet.
 *
 * Returns:
 *   - TSDB_CODE_SUCCESS; *ppFileSet is then owned by the caller (release it with
 *     tsdbTFileSetClear);
 *   - the error of tssGetFileSizeOfDefault unchanged when the size query fails
 *     (shouldMigrate interprets TSDB_CODE_NOT_FOUND as "no manifest yet");
 *   - TSDB_CODE_OUT_OF_MEMORY, a storage/parse error, or TSDB_CODE_FILE_CORRUPTED
 *     when the manifest is unreadable, names a different fid, or has no data file.
 */
static int32_t downloadManifest(SVnode* pVnode, int32_t fid, STFileSet** ppFileSet) {
  int32_t code = 0, vid = TD_VID(pVnode);

  char path[64];
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vid, fid);

  int64_t size = 0;
  code = tssGetFileSizeOfDefault(path, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to get manifests size since %s", vid, fid, tstrerror(code));
    return code;
  }

  // one extra byte for the terminating NUL required by cJSON_Parse
  char* buf = taosMemoryMalloc(size + 1);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    tsdbError("vgId:%d, fid:%d, failed to allocate manifest buffer since %s", vid, fid, tstrerror(code));
    return code;
  }

  code = tssReadFileFromDefault(path, 0, buf, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to read manifest from shared storage since %s", vid, fid, tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }
  buf[size] = 0;

  cJSON* json = cJSON_Parse(buf);
  taosMemoryFree(buf);
  if (json == NULL) {
    // cJSON reports no TDengine error code and 'code' is still success here,
    // so do not print it.
    tsdbError("vgId:%d, fid:%d, failed to parse manifest json", vid, fid);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  code = tsdbJsonToTFileSet(pVnode->pTsdb, json, ppFileSet);
  cJSON_Delete(json);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to parse manifest since %s", vid, fid, tstrerror(code));
    return code;
  }

  STFileSet* fset = *ppFileSet;
  if (fset->fid != fid) {
    tsdbError("vgId:%d, fid:%d, mismatch fid, manifest fid is %d", vid, fid, fset->fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (fset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, data file not found in manifest", vid, fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return code;
}
202

203

204
static int32_t uploadManifest(int32_t dnode, int32_t vnode, STFileSet* fset, int32_t mid) {
1,500✔
205
  int32_t code = 0;
1,500✔
206

207
  cJSON* json = cJSON_CreateObject();
1,500✔
208
  if (json == NULL) {
1,500✔
209
    return TSDB_CODE_OUT_OF_MEMORY;
×
210
  }
211

212
  // update migration id for all files in the file set
213
  STFileObj* fobj = fset->farr[TSDB_FTYPE_HEAD];
1,500✔
214
  fobj->f->mid = mid;
1,500✔
215
  fobj = fset->farr[TSDB_FTYPE_SMA];
1,500✔
216
  fobj->f->mid = mid;
1,500✔
217
  fobj = fset->farr[TSDB_FTYPE_DATA];
1,500✔
218
  fobj->f->mid = mid;
1,500✔
219
  fobj = fset->farr[TSDB_FTYPE_TOMB];
1,500✔
220
  if (fobj != NULL) {
1,500✔
221
    fobj->f->mid = mid;
×
222
  }
223

224
  SSttLvl* lvl;
225
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
1,500✔
226
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
×
227
      fobj->f->mid = mid;
×
228
    }
229
  }
230
  
231
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
1,500✔
232
    code = terrno;
×
233
    cJSON_Delete(json);
×
234
    return code;
×
235
  }
236
  if (cJSON_AddNumberToObject(json, "dnode", dnode) == NULL) {
1,500✔
237
    code = terrno;
×
238
    cJSON_Delete(json);
×
239
    return code;
×
240
  }
241
  if (cJSON_AddNumberToObject(json, "vnode", vnode) == NULL) {
1,500✔
242
    code = terrno;
×
243
    cJSON_Delete(json);
×
244
    return code;
×
245
  }
246

247
  code = tsdbTFileSetToJson(fset, json);
1,500✔
248
  if (code != TSDB_CODE_SUCCESS) {
1,500✔
249
    cJSON_Delete(json);
×
250
    return code;
×
251
  }
252
  char* buf = cJSON_PrintUnformatted(json);
1,500✔
253
  cJSON_Delete(json);
1,500✔
254

255
  char path[64];
1,500✔
256
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vnode, fset->fid);
1,500✔
257
  code = tssUploadToDefault(path, buf, strlen(buf));
1,500✔
258
  taosMemoryFree(buf);
1,500✔
259
  if (code != TSDB_CODE_SUCCESS) {
1,500✔
260
    tsdbError("vgId:%d, fid:%d, failed to upload manifest since %s", vnode, fset->fid, tstrerror(code));
×
261
    return code;
×
262
  }
263

264
  return code;
1,500✔
265
}
266

267

268
// upload local files to shared storage
269
//
270
// local file name is like:
271
//     [base]/vnode2/f1736/v2f1736ver16.head
272
// or
273
//     [base]/vnode2/f1736/v2f1736ver16.m334233.head
274
//
275
// remote file name is like:
276
//     vnode2/f1736/v2f1736ver16.m13552343.head
277
//
278
// NOTE: the migration id is always included in the remote file name, because
279
// the commit id may be different between the vnodes of the same vgroup,
280
// that's an interrupted migration may overwrite some of the remote files
281
// while leaving the others intact if we don't include the migration id in
282
// the remote file name.
283
// Returns TSDB_CODE_SUCCESS (also when 'fobj' is NULL), TSDB_CODE_INVALID_PARA when
// the local file name has no extension, or the error of tssUploadFileToDefault.
static int32_t uploadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
  STFile* f = fobj->f;

  // The remote name reuses the local extension (".head", ".sma", ...). Without
  // one the name cannot be mapped, and passing NULL to "%s" would be undefined.
  const char* ext = strrchr(fobj->fname, '.');
  if (ext == NULL) {
    tsdbError("vgId:%d, fid:%d, failed to upload file %s since it has no extension", vid, f->fid, fobj->fname);
    return TSDB_CODE_INVALID_PARA;
  }

  char path[TSDB_FILENAME_LEN];
  snprintf(path, sizeof(path), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d%s", vid, f->fid, vid, f->fid, f->cid, mid, ext);

  int32_t code = tssUploadFileToDefault(path, fobj->fname, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to upload file %s since %s", vid, f->fid, fobj->fname, tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
303

304

305

306
/*
 * Download "vnode<vid>/f<fid>/<base name of fobj->fname>" from the default shared
 * storage into fobj->fname. A NULL 'fobj' is a no-op.
 *
 * Returns TSDB_CODE_SUCCESS or the error of tssDownloadFileFromDefault.
 */
static int32_t downloadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  STFile* f = fobj->f;

  // Remote objects are keyed by the bare file name. Fall back to the whole name
  // when there is no directory separator instead of computing NULL + 1.
  const char* sep = strrchr(fobj->fname, TD_DIRSEP_CHAR);
  const char* fname = (sep != NULL) ? sep + 1 : fobj->fname;

  char path[TSDB_FILENAME_LEN];
  snprintf(path, sizeof(path), "vnode%d/f%d/%s", vid, f->fid, fname);

  int32_t code = tssDownloadFileFromDefault(path, fobj->fname, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to download file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;
  }

  return 0;
}
325

326

327
static void tsdbRemoveSsGarbageFiles(int32_t vid, STFileSet* fset) {
1,500✔
328
  char prefix[TSDB_FILENAME_LEN];
1,500✔
329
  snprintf(prefix, sizeof(prefix), "vnode%d/f%d/", vid, fset->fid);
1,500✔
330

331
  SArray* paths = taosArrayInit(10, sizeof(char*));
1,500✔
332
  int32_t code = tssListFileOfDefault(prefix, paths);
1,500✔
333
  if (code != TSDB_CODE_SUCCESS) {
1,500✔
334
    tsdbError("vgId:%d, fid:%d, failed to list files in shared storage since %s", vid, fset->fid, tstrerror(code));
×
335
    taosArrayDestroy(paths);
×
336
    return;
×
337
  }
338

339
  for(int i = 0; i < taosArrayGetSize(paths); i++) {
9,000✔
340
      char* p = *(char**)taosArrayGet(paths, i);
7,500✔
341
      const char* ext = strrchr(p, '.');
7,500✔
342
      const char* rname = strrchr(p, '/') + 1;
7,500✔
343
      bool remove = true;
7,500✔
344
      int32_t vgId = 0, fid = 0, mid = 0, cn = 0;
7,500✔
345
      int64_t cid = 0;
7,500✔
346
  
347
      // NOTE: when compare file name, don't use strcmp(fobj->fname, rname) because 'fobj->fname' may not
348
      // contain the migration id. that's why we use sscanf to parse the file name.
349

350
      if (ext == NULL) {
7,500✔
351
        // no extension, remove
352
      } else if (taosStrcasecmp(ext, ".head") == 0) {
7,500✔
353
        STFileObj* fobj = fset->farr[TSDB_FTYPE_HEAD];
1,500✔
354
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.head", &vgId, &fid, &cid, &mid);
1,500✔
355
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
1,500✔
356
      } else if (taosStrcasecmp(ext, ".sma") == 0) {
6,000✔
357
        STFileObj* fobj = fset->farr[TSDB_FTYPE_SMA];
1,500✔
358
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.sma", &vgId, &fid, &cid, &mid);
1,500✔
359
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
1,500✔
360
      } else if (taosStrcasecmp(ext, ".tomb") == 0) {
4,500✔
361
        STFileObj* fobj = fset->farr[TSDB_FTYPE_TOMB];
×
362
        if (fobj) {
×
363
          int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.tomb", &vgId, &fid, &cid, &mid);
×
364
          remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
365
        }
366
      } else if (taosStrcasecmp(ext, ".stt") == 0) {
4,500✔
367
        SSttLvl* lvl = NULL;
×
368
        TARRAY2_FOREACH(fset->lvlArr, lvl) {
×
369
          STFileObj* fobj;
370
          TARRAY2_FOREACH(lvl->fobjArr, fobj) {
×
371
            int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.stt", &vgId, &fid, &cid, &mid);
×
372
            if (n == 4 && vgId == vid && fid == fset->fid && cid == fobj->f->cid && mid == fobj->f->mid) {
×
373
              remove = false;
×
374
              break;
×
375
            }
376
          }
377
          if (!remove) {
×
378
            break;
×
379
          }
380
        }
381
      } else if (taosStrcasecmp(ext, ".data") == 0) {
4,500✔
382
        STFileObj* fobj = fset->farr[TSDB_FTYPE_DATA];
3,000✔
383
        int n = sscanf(rname, "v%df%dver%" PRId64 ".%d.data", &vgId, &fid, &cid, &cn);
3,000✔
384
        if (n == 4) {
3,000✔
385
          if (vgId == vid && fid == fset->fid && cid == fobj->f->cid && cn >= 1 && cn < fobj->f->lcn) {
1,500✔
386
            remove = false; // not the last chunk, keep it
1,500✔
387
          }
388
        } else {
389
          n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.%d.data", &vgId, &fid, &cid, &mid, &cn);
1,500✔
390
          remove = (n != 5 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid || cn != fobj->f->lcn);
1,500✔
391
        }
392
      } else {
393
        remove = (taosStrcasecmp(rname, "manifests.json") != 0); // keep manifest, remove all other files
1,500✔
394
      }
395

396
      if (remove) {
7,500✔
397
        int32_t code = tssDeleteFileFromDefault(p);
×
398
        if (code != TSDB_CODE_SUCCESS) {
×
399
          tsdbError("vgId:%d, fid:%d, failed to remove garbage file %s from shared storage since %s", vid, fset->fid, p, tstrerror(code));
×
400
        } else {
401
          tsdbInfo("vgId:%d, fid:%d, garbage file %s is removed from shared storage", vid, fset->fid, p);
×
402
        }
403
      }
404

405
      taosMemoryFree(p);
7,500✔
406
  }
407

408
  taosArrayDestroy(paths);
1,500✔
409
}
410

411

412
// download the last chunk of a data file
413
// remote file name is like:
414
//      vnode2/f1736/v2f1736ver16.m13552343.4.data
415
static int32_t downloadDataFileLastChunk(SRTNer* rtner, STFileObj* fobj) {
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  STFile *f = fobj->f;

  // local target: the last-chunk path of 'f'; remote source: same base name under vnode<vid>/f<fid>/
  char lpath[TSDB_FILENAME_LEN], rpath[TSDB_FILENAME_LEN];
  tsdbTFileLastChunkName(rtner->tsdb, f, lpath);

  const char* sep = strrchr(lpath, TD_DIRSEP_CHAR);
  const char* fname = (sep != NULL) ? sep + 1 : lpath;

  snprintf(rpath, sizeof(rpath), "vnode%d/f%d/%s", vid, f->fid, fname);

  int32_t code = tssDownloadFileFromDefault(rpath, lpath, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to download data file %s since %s", vid, f->fid, rpath, tstrerror(code));
  }

  return code;
}
435

436

437
// while other files all include the migration id in the remote file name, only the last
438
// chunk of a data file does the same. this is ok because:
439
// 1. without a compaction, data file is always uploaded chunk by chunk, only the last
440
//    chunk may be modified.
441
// 2. after a compaction, all of the data is downloaded to local, so overwriting remote
442
//    data chunks won't cause any problem. (this is not likely to happen because we will
443
//    cancel the migration in this case, refer comment in function shouldMigrate).
444
// Upload the not-yet-migrated chunks of data file 'fobj', record the lcn change in
// rtner->fopArr, upload the manifest, remove remote garbage and, when new chunks
// were produced, copy the last chunk into the new last-chunk local file.
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t uploadDataFile(SRTNer* rtner, STFileObj* fobj) {
  int32_t    code = 0;
  int32_t    vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  int64_t    szFile = 0, szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
  STFile    *f = fobj->f;
  STFileOp   op = {.optype = TSDB_FOP_MODIFY, .fid = f->fid, .of = *f};

  // local source: the whole data file when nothing was migrated yet, else its last chunk
  char path[TSDB_FILENAME_LEN];
  if (f->lcn <= 1) {
    snprintf(path, sizeof(path), "%s", fobj->fname);
  } else {
    tsdbTFileLastChunkName(rtner->tsdb, f, path);
  }

  code = taosStatFile(path, &szFile, NULL, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d failed to stat file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;  // propagate the error: returning 0 here would report success
  }

  if (f->lcn > 1) {
    szFile += szChunk * (f->lcn - 1);  // add the size of migrated chunks
  }

  int32_t totalChunks = (int32_t)(szFile / szChunk);
  if (szFile % szChunk) {
    totalChunks++;
  }

  int32_t lcn = f->lcn < 1 ? 1 : f->lcn;

  // upload chunks one by one, the first chunk may already been uploaded, but may be
  // modified thereafter, so we need to upload it again
  for (int32_t i = lcn; i <= totalChunks; ++i) {
    int64_t offset = (int64_t)(i - lcn) * szChunk;
    int64_t size = szChunk;
    if (i == totalChunks && szFile % szChunk) {
      size = szFile % szChunk;
    }

    // only include the migration id in the last chunk filename
    char rpath[TSDB_FILENAME_LEN];
    if (i == totalChunks) {
      snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d.%d.data", vid, f->fid, vid, f->fid, f->cid, mid, i);
    } else {
      snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, f->fid, vid, f->fid, f->cid, i);
    }

    code = tssUploadFileToDefault(rpath, path, offset, size);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to migrate data file since %s", vid, f->fid, tstrerror(code));
      return code;
    }
  }

  f->lcn = totalChunks;
  op.nf = *f;
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append file operation since %s", vid, f->fid, tstrerror(code));
    return code;
  }

  // manifest must be uploaded before copy last chunk, otherwise, failed to upload manifest
  // will result in a broken migration
  tsdbInfo("vgId:%d, fid:%d, data file migrated, begin generate & upload manifest", vid, f->fid);

  // manifest, this also commit the migration. Stamp it with the same migration id
  // that was used in the uploaded chunk names above.
  code = uploadManifest(vnodeNodeId(rtner->tsdb->pVnode), vid, rtner->fset, mid);
  if (code != TSDB_CODE_SUCCESS) {
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return code;
  }

  tsdbInfo("vgId:%d, fid:%d, manifest uploaded, begin remove garbage files", vid, f->fid);
  tsdbRemoveSsGarbageFiles(vid, rtner->fset);

  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SUCCEEDED);
  tsdbInfo("vgId:%d, fid:%d, leader migration succeeded", vid, f->fid);

  // no new chunks generated, no need to copy the last chunk
  if (totalChunks == lcn) {
    return 0;
  }

  // copy the last chunk to the new file
  char newPath[TSDB_FILENAME_LEN];
  tsdbTFileLastChunkName(rtner->tsdb, &op.nf, newPath);

  int64_t offset = (int64_t)(totalChunks - lcn) * szChunk;
  int64_t size = szChunk;
  if (szFile % szChunk) {
    size = szFile % szChunk;
  }

  TdFilePtr fdFrom = taosOpenFile(path, TD_FILE_READ);
  if (fdFrom == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to open source file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;
  }

  TdFilePtr fdTo = taosOpenFile(newPath, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to open target file %s since %s", vid, f->fid, newPath, tstrerror(code));
    TAOS_UNUSED(taosCloseFile(&fdFrom));
    return code;
  }

  int64_t n = taosFSendFile(fdTo, fdFrom, &offset, size);
  if (n < 0) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to copy file %s to %s since %s", vid, f->fid, path, newPath, tstrerror(code));
  }
  TAOS_UNUSED(taosCloseFile(&fdFrom));
  TAOS_UNUSED(taosCloseFile(&fdTo));

  return code;
}
565

566

567
/*
 * Decide whether the file set in 'rtner' should be migrated now.
 *
 * Returns true only when the migration should proceed; *pCode is then 0.
 * When false is returned, the monitor state has been set to SKIPPED, COMPACT or
 * FAILED. *pCode is 0 for a plain skip/postpone or a successfully scheduled
 * compaction; otherwise it holds the error code.
 */
static bool shouldMigrate(SRTNer *rtner, int32_t *pCode) {
  int32_t    vid = TD_VID(rtner->tsdb->pVnode);
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  STFileSet *pLocalFset = rtner->fset;
  STFileObj *flocal = pLocalFset->farr[TSDB_FTYPE_DATA];

  *pCode = 0;
  if (!flocal) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, local data file not exist", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  if (rtner->lastCommit != pLocalFset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, there are new commits after migration task is scheduled", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  // with ssCompact enabled, a negative lcn makes this run schedule a compaction
  // of the file set instead of migrating it
  if (pCfg->ssCompact && flocal->f->lcn < 0) {
    int32_t     lcn = flocal->f->lcn;
    STimeWindow win = {0};
    tsdbFidKeyRange(pLocalFset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
    *pCode = tsdbAsyncCompact(rtner->tsdb, &win, TSDB_OPTR_SSMIGRATE, false);
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, fileset need compact, lcn: %d", vid, pLocalFset->fid, lcn);
    if (*pCode) {
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    } else {
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_COMPACT);
    }
    return false;  // compact in progress
  }

  char path[TSDB_FILENAME_LEN];
  if (flocal->f->lcn <= 1) {
    snprintf(path, sizeof(path), "%s", flocal->fname);
  } else {
    tsdbTFileLastChunkName(rtner->tsdb, flocal->f, path);
  }

  int64_t size = 0, mtime = 0;
  *pCode = taosStatFile(path, &size, &mtime, NULL);
  if (*pCode != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to stat file %s since %s", vid, pLocalFset->fid, path, tstrerror(*pCode));
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // file may become smaller after a compaction, especially after a RSMA compaction
  if (flocal->f->lcn < 1 && size <= (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is too small, size: %" PRId64 " bytes", vid, flocal->f->fid, size);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  // 'mtime >= rtner->tw.ekey - tsSsUploadDelaySec' means the file is being actively written,
  // and we should skip the migration. However, this may also be a result of the [ssCompact]
  // option, which should not be skipped, so we also check
  // 'mtime > pLocalFset->lastCompact / 1000 || !pCfg->ssCompact'. Note this is not an
  // accurate condition, but it is simple and good enough.
  if (mtime >= rtner->tw.ekey - tsSsUploadDelaySec && (mtime > pLocalFset->lastCompact / 1000 || !pCfg->ssCompact)) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is active writting, modified at %" PRId64, vid, pLocalFset->fid, mtime);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;  // still active writing, postpone migration
  }

  // download manifest from shared storage
  STFileSet *pRemoteFset = NULL;
  *pCode = downloadManifest(rtner->tsdb->pVnode, pLocalFset->fid, &pRemoteFset);
  if (*pCode == TSDB_CODE_SUCCESS) {
    // remote file exists but local file has not been migrated, there are two possibilities:
    // 1. there's a compact after the last migration, this is a normal case, we can discard
    //    the remote files and continue the migration;
    // 2. in the last migration, this node was a follower, the leader did its job successfully,
    //    but this node did not, continue the migration may overwrite remote file and result in
    //    data corruption on other nodes.
    // however, it is hard to distinguish them, so just treat both as a migration error. hope
    // the user could do something to recover, such as remove remote files.
    if (flocal->f->lcn < 1) {
      tsdbTFileSetClear(&pRemoteFset);
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest found but local lcn < 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
      return false;
    }

  } else if (*pCode == TSDB_CODE_NOT_FOUND) {
    if (flocal->f->lcn >= 1) {
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest not found but local lcn >= 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
      return false;
    }

    // this is the first migration, we should do it.
    *pCode = TSDB_CODE_SUCCESS;
    return true;

  } else {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to download manifest, code: %d", vid, pLocalFset->fid, *pCode);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // a remote manifest exists and local lcn >= 1: it must describe the same data file
  STFileObj *fremote = pRemoteFset->farr[TSDB_FTYPE_DATA];
  if (fremote == NULL) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, cannot find data file information from remote manifest", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  if (fremote->f->lcn != flocal->f->lcn) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, remote and local data file information mismatch", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  if (fremote->f->maxVer == flocal->f->maxVer) {
    tsdbTFileSetClear(&pRemoteFset);
    tsdbInfo("vgId:%d, fid:%d, migration skipped, no new data", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;  // no new data
  }

  tsdbTFileSetClear(&pRemoteFset);  // we use the local file set information for migration
  tsdbInfo("vgId:%d, fid:%d, file set will be migrated", vid, pLocalFset->fid);
  return true;
}
698

699

700
/*
 * Follower side of a shared-storage (SS) migration for the file set rtner->fset.
 *
 * 1. Skips the migration (state SKIPPED) when the file set has been committed to after
 *    the task was scheduled, i.e. rtner->lastCommit no longer matches fset->lastCommit.
 * 2. Under tsdb->mutex, waits on pmm->stateChanged while the migration state is
 *    IN_PROGRESS. Each wait is bounded by 30 seconds; a timeout marks the state FAILED.
 * 3. If the final state is SUCCEEDED, downloads the remote manifest and the head, sma,
 *    tomb and stt files it references, plus the data file via downloadDataFileLastChunk.
 *    For each file it appends the matching STFileOp to rtner->fopArr.
 *
 * Returns 0 when the migration is skipped/cancelled or all file operations were recorded.
 * Otherwise it returns the error code of the failing step. TSDB_CODE_FILE_CORRUPTED is
 * returned when the remote manifest is missing or lacks head/sma/data file information.
 */
static int32_t tsdbFollowerDoSsMigrate(SRTNer *rtner) {
  int32_t            code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet         *fset = rtner->fset;
  SSsMigrateMonitor *pmm = rtner->tsdb->pSsMigrateMonitor;
  STFileSet         *pRemoteFset = NULL;
  STFileOp           op;
  SSttLvl           *lvl;
  STFileObj         *fobj;

  // though we make this check in the leader node, we should do this in the follower nodes too.
  // because there may be a leader change and the execution order of async tasks may result in
  // different commit time. if we don't do this, we may corrupt the follower data.
  if (rtner->lastCommit != fset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, follower migration cancelled, there are new commits after migration is scheduled", vid, fset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return 0;
  }

  tsdbInfo("vgId:%d, fid:%d, vnode is follower, waiting leader on node %d to upload.", vid, fset->fid, rtner->nodeId);

  (void)taosThreadMutexLock(&rtner->tsdb->mutex);

  while (pmm->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    struct timespec ts;
    if ((code = taosClockGetTime(CLOCK_REALTIME, &ts)) != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to get current time since %s", vid, fset->fid, tstrerror(code));
      TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
      return code;
    }
    ts.tv_sec += 30;  // absolute deadline of this wait. TODO: make it configurable
    code = taosThreadCondTimedWait(&pmm->stateChanged, &rtner->tsdb->mutex, &ts);
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      // the mutex is held again once the wait returns, so the state is updated directly
      tsdbError("vgId:%d, fid:%d, waiting leader migration timed out", vid, fset->fid);
      pmm->state = SSMIGRATE_FILESET_STATE_FAILED;
    }
  }

  if (pmm->state != SSMIGRATE_FILESET_STATE_SUCCEEDED) {
    TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
    tsdbInfo("vgId:%d, fid:%d, follower migration skipped because leader migration skipped or failed", vid, fset->fid);
    return 0;
  }

  TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));

  // NOTE: After the leader node finished processing the current file set and mnode sent the final
  // follower migrate request, mnode waits at least 30 seconds before triggering the processing of
  // the next file set. Because all file sets of a vgroup shares the same [pmm], from now on, the
  // follower should not access [pmm->state] anymore.
  // refer comments in mndUpdateSsMigrateProgress for more details.

  tsdbInfo("vgId:%d, fid:%d, follower migration started, begin downloading manifest...", vid, fset->fid);
  code = downloadManifest(rtner->tsdb->pVnode, fset->fid, &pRemoteFset);
  if (code != TSDB_CODE_SUCCESS) {
    // Without a manifest there is nothing to apply, so every failure stops here.
    // A missing manifest after the leader reported success means the remote copy is broken.
    if (code == TSDB_CODE_NOT_FOUND) {
      code = TSDB_CODE_FILE_CORRUPTED;
    }
    tsdbError("vgId:%d, fid:%d, follower migration failed, failed to download manifest since %s", vid, fset->fid,
              tstrerror(code));
    goto _exit;
  }

  if (pRemoteFset->farr[TSDB_FTYPE_HEAD] == NULL) {
    tsdbError("vgId:%d, fid:%d, follower migration failed, cannot find head file information from remote manifest", vid,
              fset->fid);
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }

  // this often happens in the catch up process of a new node, it is ok to continue, but will
  // result in download same files more than once, which is a waste of time and bandwidth.
  if (pRemoteFset->farr[TSDB_FTYPE_HEAD]->f->mid != getSsMigrateId(rtner->tsdb)) {
    tsdbWarn("vgId:%d, fid:%d, follower migration cancelled, migration id mismatch", vid, fset->fid);
    code = 0;
    goto _exit;
  }

  // sma and data entries are dereferenced unconditionally below; validate them before downloading anything
  if (pRemoteFset->farr[TSDB_FTYPE_SMA] == NULL || pRemoteFset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, follower migration failed, cannot find sma/data file information from remote manifest",
              vid, fset->fid);
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, manifest downloaded, begin downloading head file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_HEAD]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_HEAD]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append head file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // sma file
  tsdbInfo("vgId:%d, fid:%d, head file downloaded, begin downloading sma file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_SMA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_SMA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append sma file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // tomb file: it may be absent locally, remotely, or both
  tsdbInfo("vgId:%d, fid:%d, sma file downloaded, begin downloading tomb file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  if (fset->farr[TSDB_FTYPE_TOMB] != NULL && pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (fset->farr[TSDB_FTYPE_TOMB] != NULL) {
    // the remote tomb file is not found, but local tomb file exists, we should remove it
    op = (STFileOp){.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp){.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  }
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append tomb file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // stt files: drop every local stt file, then create every remote one
  tsdbInfo("vgId:%d, fid:%d, tomb file downloaded, begin downloading stt files", vid, fset->fid);
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      op = (STFileOp){.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file remove operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }
  TARRAY2_FOREACH(pRemoteFset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = downloadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _exit;
      }
      op = (STFileOp){.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file create operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }

  // data file
  tsdbInfo("vgId:%d, fid:%d, stt files downloaded, begin downloading data file", vid, fset->fid);
  code = downloadDataFileLastChunk(rtner, pRemoteFset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_DATA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_DATA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append data file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  tsdbInfo("vgId:%d, fid:%d, data file downloaded", vid, fset->fid);
  code = 0;

_exit:
  // single release point for the downloaded manifest on every path past the wait
  tsdbTFileSetClear(&pRemoteFset);
  return code;
}
865

866

867
/*
 * Leader side of a shared-storage (SS) migration for the file set rtner->fset.
 *
 * Asks shouldMigrate() whether the file set needs to be migrated. If shouldMigrate()
 * declines, the function returns the code it reported. Otherwise it uploads the head,
 * sma and tomb files, every stt file of every level, and finally the data file
 * (via uploadDataFile).
 *
 * Returns TSDB_CODE_SUCCESS when every upload succeeded. On the first failing upload,
 * it sets the migration state to FAILED and returns that upload's error code.
 */
static int32_t tsdbLeaderDoSsMigrate(SRTNer *rtner) {
  int32_t    code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet *fset = rtner->fset;
  SSttLvl   *lvl;
  STFileObj *fobj;

  tsdbInfo("vgId:%d, fid:%d, vnode is leader, migration started", vid, fset->fid);

  if (!shouldMigrate(rtner, &code)) {
    return code;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, begin migrate head file", vid, fset->fid);
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _fail;
  }

  tsdbInfo("vgId:%d, fid:%d, head file migrated, begin migrate sma file", vid, fset->fid);

  // sma file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _fail;
  }

  tsdbInfo("vgId:%d, fid:%d, sma file migrated, begin migrate tomb file", vid, fset->fid);

  // tomb file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _fail;
  }

  tsdbInfo("vgId:%d, fid:%d, tomb file migrated, begin migrate stt files", vid, fset->fid);

  // stt files
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = uploadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _fail;
      }
    }
  }

  tsdbInfo("vgId:%d, fid:%d, stt files migrated, begin migrate data file", vid, fset->fid);

  // data file is uploaded last
  code = uploadDataFile(rtner, fset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _fail;
  }

  return TSDB_CODE_SUCCESS;

_fail:
  // every upload failure is reported the same way
  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
  return code;
}
930

931

932
/*
 * Runs one SS migration task.
 *
 * The node recorded in rtner->nodeId takes the leader path (tsdbLeaderDoSsMigrate).
 * Every other node takes the follower path (tsdbFollowerDoSsMigrate).
 * Returns whatever the selected path returns.
 */
int32_t tsdbDoSsMigrate(SRTNer *rtner) {
  // note: leader is decided when the task is scheduled, the actual leader may change after that,
  // but this is ok.
  bool actAsLeader = (vnodeNodeId(rtner->tsdb->pVnode) == rtner->nodeId);
  return actAsLeader ? tsdbLeaderDoSsMigrate(rtner) : tsdbFollowerDoSsMigrate(rtner);
}
940

941

942
#ifdef USE_SHARED_STORAGE
943

944
/*
 * Collects into fidArr the ids of the file sets that are candidates for SS migration.
 *
 * Runs under tsdb->mutex. A file set is skipped (logged, not an error) when:
 *  - it is expired (tsdbFidLevel < 0);
 *  - it must still be kept locally (tsdbSsFidLevel < 1, using the vnode's ssKeepLocal);
 *  - it has no data file;
 *  - its data file, or its last chunk when lcn > 1, cannot be stat'ed;
 *  - that file is no larger than tsdbPageSize * ssChunkSize bytes.
 *
 * Returns 0 on success. If appending to fidArr fails, it stops scanning and returns
 * that error (terrno).
 */
int32_t tsdbListSsMigrateFileSets(STsdb *tsdb, SArray* fidArr) {
  int32_t    vid = TD_VID(tsdb->pVnode), code = 0;
  int64_t    now = taosGetTimestampSec();
  SVnodeCfg *pCfg = &tsdb->pVnode->config;
  STFileSet *fset;

  (void)taosThreadMutexLock(&tsdb->mutex);

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    if (tsdbFidLevel(fset->fid, &tsdb->keepCfg, now) < 0) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, file set is expired", vid, fset->fid);
      continue;
    }

    if (tsdbSsFidLevel(fset->fid, &tsdb->keepCfg, pCfg->ssKeepLocal, now) < 1) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, keep local file set", vid, fset->fid);
      continue;
    }

    STFileObj *fdata = fset->farr[TSDB_FTYPE_DATA];
    if (fdata == NULL) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, no data file", vid, fset->fid);
      continue;
    }

    // with lcn <= 1 the data file itself is measured; otherwise its last chunk is
    // (presumably the part still held locally — TODO confirm)
    char path[TSDB_FILENAME_LEN];
    if (fdata->f->lcn <= 1) {
      strcpy(path, fdata->fname);
    } else {
      tsdbTFileLastChunkName(tsdb, fdata->f, path);
    }

    // Use a separate variable: the function-level `code` must carry only the
    // fidArr append failure, while a stat failure just skips this file set.
    int64_t size = 0;
    int32_t statCode = taosStatFile(path, &size, NULL, NULL);
    if (statCode != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, migration skipped, failed to stat file since %s", vid, fset->fid, tstrerror(statCode));
      continue;
    }

    if (size <= (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is too small, size: %" PRId64 " bytes", vid, fset->fid, size);
      continue;
    }

    if (taosArrayPush(fidArr, &fset->fid) == NULL) {
      code = terrno;
      tsdbError("vgId:%d, failed to push file set id %d to array since %s", vid, fset->fid, tstrerror(code));
      break;
    }
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
  return code;
}
998

999

1000
/*
 * Schedules the SS migration of file set pReq->fid as a low-priority retention task
 * (vnodeAsync on RETENTION_TASK_ASYNC with tsdbRetention / tsdbRetentionCancel).
 * It is called with tsdb->mutex held by tsdbAsyncSsMigrateFileSet.
 *
 * Returns:
 *  - TSDB_CODE_FAILED if background tasks are disabled, if the monitor still tracks an
 *    IN_PROGRESS migration, or if fset->lastMigrate / 1000 >= pReq->startTimeSec;
 *  - TSDB_CODE_NOT_FOUND if no file set has id pReq->fid;
 *  - terrno if the task argument cannot be allocated;
 *  - otherwise the result of vnodeAsync.
 *
 * Once the task argument is allocated, the monitor records the request and is set to
 * IN_PROGRESS. It is switched to FAILED (and the argument freed) if scheduling fails.
 */
static int32_t tsdbAsyncMigrateFileSetImpl(STsdb *tsdb,  const SSsMigrateFileSetReq *pReq) {
  int32_t            vid = TD_VID(tsdb->pVnode);
  SSsMigrateMonitor *monitor = NULL;
  STFileSet         *target = NULL;
  STFileSet         *cursor = NULL;
  SRtnArg           *taskArg = NULL;
  int32_t            rc = 0;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    tsdbInfo("vgId:%d, ssMigrateId:%d, background task is disabled, skip", vid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  // only one file set per vnode can be tracked by the monitor at a time
  monitor = tsdb->pSsMigrateMonitor;
  bool busy = (monitor->fid != 0) && (monitor->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS);
  if (busy) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, failed to monitor since previous migration is still in progress",
              vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, cursor) {
    if (cursor->fid != pReq->fid) {
      continue;
    }
    target = cursor;
    break;
  }

  if (target == NULL) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, file set not found", vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_NOT_FOUND;
  }

  // lastMigrate is divided by 1000 before being compared with the request start time in seconds
  if (target->lastMigrate / 1000 >= pReq->startTimeSec) {
    tsdbInfo("vgId:%d, fid:%d, ssMigrate:%d, start time < last migration time, skip", vid, target->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  taskArg = taosMemoryMalloc(sizeof(*taskArg));
  if (taskArg == NULL) {
    rc = terrno;
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, memory allocation failed", vid, pReq->fid, pReq->ssMigrateId);
    return rc;
  }

  taskArg->tsdb = tsdb;
  taskArg->tw.skey = INT64_MIN;
  taskArg->tw.ekey = pReq->startTimeSec;
  taskArg->fid = target->fid;
  taskArg->nodeId = pReq->nodeId;
  taskArg->optrType = TSDB_OPTR_SSMIGRATE;
  taskArg->lastCommit = target->lastCommit;

  monitor->ssMigrateId = pReq->ssMigrateId;
  monitor->fid = target->fid;
  monitor->state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  monitor->startTimeSec = pReq->startTimeSec;

  rc = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, taskArg, &target->migrateTask);
  if (rc != 0) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, schedule async task failed", vid, pReq->fid, pReq->ssMigrateId);
    taosMemoryFree(taskArg);
    monitor->state = SSMIGRATE_FILESET_STATE_FAILED;
  }

  return rc;
}
1063

1064

1065
/*
 * Thread-safe entry point for scheduling an SS migration of one file set.
 * It runs tsdbAsyncMigrateFileSetImpl while holding tsdb->mutex and returns its result.
 */
int32_t tsdbAsyncSsMigrateFileSet(STsdb *tsdb, SSsMigrateFileSetReq *pReq) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t ret = tsdbAsyncMigrateFileSetImpl(tsdb, pReq);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return ret;
}
1074

1075

1076
/*
 * Cancels the running SS migration task identified by ssMigrateId.
 *
 * Under tsdb->mutex, this is a no-op when there is no migration monitor, when the monitor
 * tracks a different ssMigrateId (logged), or when the tracked migration is not
 * IN_PROGRESS. Otherwise it calls vnodeACancel on the migrateTask of the file set whose
 * fid matches the monitor's fid.
 */
void tsdbStopSsMigrateTask(STsdb* tsdb, int32_t ssMigrateId) {
  SSsMigrateMonitor *monitor;
  STFileSet         *fset;

  (void)taosThreadMutexLock(&tsdb->mutex);

  monitor = tsdb->pSsMigrateMonitor;
  if (monitor == NULL) {
    goto _unlock;
  }

  if (monitor->ssMigrateId != ssMigrateId) {
    tsdbInfo("vgId:%d, ssMigrateId:%d, migration task not found", TD_VID(tsdb->pVnode), ssMigrateId);
    goto _unlock;
  }

  if (monitor->state != SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    goto _unlock;
  }

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    if (fset->fid != monitor->fid) {
      continue;
    }
    (void)vnodeACancel(&fset->migrateTask);
    break;
  }

_unlock:
  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
1105

1106
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc