• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4991

17 Mar 2026 07:57AM UTC coverage: 69.756% (+0.4%) from 69.348%
#4991

push

travis-ci

web-flow
merge: from main to 3.0 branch #34807

14 of 16 new or added lines in 5 files covered. (87.5%)

3928 existing lines in 138 files now uncovered.

192146 of 275455 relevant lines covered (69.76%)

137208686.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.27
/source/dnode/vnode/src/tsdb/tsdbMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23
extern int32_t tsdbAsyncCompact(STsdb* tsdb, const STimeWindow* tw, ETsdbOpType type, bool force);
24

25
// migrate monitor related functions
26
typedef struct SSsMigrateMonitor {
27
  TdThreadCond  stateChanged;
28
  int32_t ssMigrateId;
29
  int32_t fid;
30
  int32_t state;
31
  int64_t startTimeSec;
32
} SSsMigrateMonitor;
33

34

35
// Return the migration id currently recorded in the monitor of `tsdb`.
// NOTE: the value is read without holding tsdb->mutex.
static int32_t getSsMigrateId(STsdb* tsdb) {
  const SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  return monitor->ssMigrateId;
}
38

39

40
/*
 * Allocate a zero-initialized migration monitor and attach it to
 * tsdb->pSsMigrateMonitor.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the allocation fails, or
 * the error code returned by taosThreadCondInit. On failure nothing is
 * attached to `tsdb` and no memory is leaked.
 */
int32_t tsdbOpenSsMigrateMonitor(STsdb *tsdb) {
  // use the taosMemory* family so the allocation matches the taosMemoryFree()
  // performed in tsdbCloseSsMigrateMonitor
  SSsMigrateMonitor *pmm = (SSsMigrateMonitor *)taosMemoryCalloc(1, sizeof(SSsMigrateMonitor));
  if (pmm == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // a condition variable that failed to initialize must never be waited on or
  // broadcast, so report the failure instead of ignoring it
  int32_t code = taosThreadCondInit(&pmm->stateChanged, NULL);
  if (code != 0) {
    taosMemoryFree(pmm);
    return code;
  }

  tsdb->pSsMigrateMonitor = pmm;
  return 0;
}
49

50

51
/*
 * Destroy the migration monitor of `tsdb`, if any, and reset
 * tsdb->pSsMigrateMonitor to NULL. Safe to call when no monitor is attached.
 */
void tsdbCloseSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;
  if (monitor != NULL) {
    TAOS_UNUSED(taosThreadCondDestroy(&monitor->stateChanged));
    taosMemoryFree(monitor);
    tsdb->pSsMigrateMonitor = NULL;
  }
}
61

62

63
// Record `state` (an SSMIGRATE_FILESET_STATE_* value) in the migration monitor
// of `tsdb` under tsdb->mutex. No waiter is signalled.
static void setMigrationState(STsdb* tsdb, int32_t state) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  monitor->state = state;
  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
68

69

70
/*
 * Copy the current migration state into pProgress->state.
 *
 * The request is identified by pProgress->ssMigrateId and pProgress->fid; both
 * must match the monitor of `tsdb`, otherwise an error is logged, pProgress is
 * left unchanged and TSDB_CODE_INVALID_MSG is returned. The monitor is read
 * under tsdb->mutex. Returns 0 on success.
 */
int32_t tsdbQuerySsMigrateProgress(STsdb *tsdb, SSsMigrateProgress *pProgress) {
  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;
  int32_t            vgId = TD_VID(tsdb->pVnode);
  int32_t            code = TSDB_CODE_SUCCESS;

  (void)taosThreadMutexLock(&tsdb->mutex);

  bool sameMigration = (monitor->ssMigrateId == pProgress->ssMigrateId);
  bool sameFileSet = (monitor->fid == pProgress->fid);

  if (sameMigration && sameFileSet) {
    pProgress->state = monitor->state;
  } else if (!sameMigration) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in query progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
    code = TSDB_CODE_INVALID_MSG;
  } else {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in query progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
    code = TSDB_CODE_INVALID_MSG;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
93

94

95

96
/*
 * Apply a migration progress report received from another node.
 *
 * Reports produced by this vnode's own node (pProgress->nodeId equals
 * vnodeNodeId()) are ignored and 0 is returned. Otherwise, under tsdb->mutex,
 * the migration id and file set id are validated against the monitor; on a
 * mismatch an error is logged and TSDB_CODE_INVALID_MSG is returned. On a match
 * the state is stored and stateChanged is broadcast.
 */
int32_t tsdbUpdateSsMigrateProgress(STsdb* tsdb, SSsMigrateProgress* pProgress) {
  int32_t            vgId = TD_VID(tsdb->pVnode);
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;

  // the report originates from this vnode, its state is already known locally
  if (vnodeNodeId(tsdb->pVnode) == pProgress->nodeId) {
    tsdbDebug("vgId:%d, skip migration progress update since it was generated by this vnode", vgId);
    return 0;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  (void)taosThreadMutexLock(&tsdb->mutex);

  if (monitor->ssMigrateId != pProgress->ssMigrateId) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in update progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
    code = TSDB_CODE_INVALID_MSG;
  } else if (monitor->fid != pProgress->fid) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in update progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
    code = TSDB_CODE_INVALID_MSG;
  } else {
    // always broadcast, even when the state value is unchanged, so a thread
    // waiting on stateChanged is woken before its wait times out
    monitor->state = pProgress->state;
    (void)taosThreadCondBroadcast(&monitor->stateChanged);
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
128

129

130

131
// migrate file related functions
132
/*
 * Classify file set `fid` against the local retention window.
 *
 * nowSec (seconds) is converted to the tsdb precision (ms/us/ns), shifted back
 * by keepTimeOffset hours and then by ssKeepLocal minutes; the file set id of
 * the resulting key is the boundary.
 *
 * Returns 0 when fid is at or after the boundary, 1 when it is older
 * (presumably the shared-storage level).
 */
int32_t tsdbSsFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t ssKeepLocal, int64_t nowSec) {
  int64_t now = nowSec;

  switch (pKeepCfg->precision) {
    case TSDB_TIME_PRECISION_MILLI:
      now *= 1000LL;
      break;
    case TSDB_TIME_PRECISION_MICRO:
      now *= 1000000LL;
      break;
    case TSDB_TIME_PRECISION_NANO:
      now *= 1000000000LL;
      break;
    default:
      break;
  }

  now -= pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  TSKEY   boundaryKey = now - ssKeepLocal * tsTickPerMin[pKeepCfg->precision];
  int32_t boundaryFid = tsdbKeyFid(boundaryKey, pKeepCfg->days, pKeepCfg->precision);

  return (fid < boundaryFid) ? 1 : 0;
}
151

152

153
/*
 * Download "vnode<vid>/f<fid>/manifests.json" from the default shared storage
 * and decode it into a new file set returned through *ppFileSet.
 *
 * Returns:
 *   - TSDB_CODE_SUCCESS: *ppFileSet holds the decoded file set; the caller
 *     releases it with tsdbTFileSetClear().
 *   - the error from tssGetFileSizeOfDefault, e.g. TSDB_CODE_NOT_FOUND, which
 *     shouldMigrate treats as "this file set was never migrated".
 *   - TSDB_CODE_OUT_OF_MEMORY when the read buffer cannot be allocated.
 *   - the error from tssReadFileFromDefault or tsdbJsonToTFileSet.
 *   - TSDB_CODE_FILE_CORRUPTED when the manifest is not valid JSON, belongs to
 *     another fid, or lists no data file (*ppFileSet is cleared in the latter
 *     two cases).
 */
static int32_t downloadManifest(SVnode* pVnode, int32_t fid, STFileSet** ppFileSet) {
  int32_t code = 0, vid = TD_VID(pVnode);
  int64_t size = 0;
  char    path[64];

  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vid, fid);

  code = tssGetFileSizeOfDefault(path, &size);
  if (code == TSDB_CODE_NOT_FOUND) {
    // a missing manifest is the normal situation before the first migration of
    // a file set (see shouldMigrate), so keep it out of the error log
    tsdbDebug("vgId:%d, fid:%d, failed to get manifests size since %s", vid, fid, tstrerror(code));
    return code;
  } else if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to get manifests size since %s", vid, fid, tstrerror(code));
    return code;
  }

  // one extra byte for the NUL terminator required by cJSON_Parse
  char* buf = taosMemoryMalloc(size + 1);
  if (buf == NULL) {
    tsdbError("vgId:%d, fid:%d, failed to allocate buffer for manifest since %s", vid, fid,
              tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = tssReadFileFromDefault(path, 0, buf, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to read manifest from shared storage since %s", vid, fid, tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }
  buf[size] = 0;

  cJSON* json = cJSON_Parse(buf);
  taosMemoryFree(buf);
  if (json == NULL) {
    // 'code' is TSDB_CODE_SUCCESS here, so there is no error code to report
    tsdbError("vgId:%d, fid:%d, failed to parse manifest json", vid, fid);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  code = tsdbJsonToTFileSet(pVnode->pTsdb, json, ppFileSet);
  cJSON_Delete(json);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to parse manifest since %s", vid, fid, tstrerror(code));
    return code;
  }

  // sanity checks: the manifest must describe this file set and include a data file
  STFileSet* fset = *ppFileSet;
  if (fset->fid != fid) {
    tsdbError("vgId:%d, fid:%d, mismatch fid, manifest fid is %d", vid, fid, fset->fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (fset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, data file not found in manifest", vid, fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return code;
}
202

203

204
/*
 * Stamp every file of `fset` with migration id `mid`, serialize the file set
 * together with the format version ("fmtv"), "dnode" and "vnode" ids to JSON,
 * and upload it as "vnode<vnode>/f<fid>/manifests.json" to the default shared
 * storage.
 *
 * NOTE: the mid of the files in `fset` is updated in place, even if a later
 * step fails.
 *
 * Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when building or printing the
 * JSON document fails, or the error from tsdbTFileSetToJson /
 * tssUploadToDefault.
 */
static int32_t uploadManifest(int32_t dnode, int32_t vnode, STFileSet* fset, int32_t mid) {
  int32_t code = 0;
  char*   buf = NULL;
  char    path[64];

  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // update migration id for all files in the file set, skipping absent file
  // types instead of dereferencing NULL
  const int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_SMA, TSDB_FTYPE_DATA, TSDB_FTYPE_TOMB};
  for (size_t i = 0; i < sizeof(ftypes) / sizeof(ftypes[0]); ++i) {
    STFileObj* fobj = fset->farr[ftypes[i]];
    if (fobj != NULL) {
      fobj->f->mid = mid;
    }
  }

  SSttLvl* lvl = NULL;
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj* fobj = NULL;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      fobj->f->mid = mid;
    }
  }

  // report OOM explicitly: terrno is not guaranteed to be set by cJSON, and a
  // stale 0 would turn this failure into a silent "success"
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL || cJSON_AddNumberToObject(json, "dnode", dnode) == NULL ||
      cJSON_AddNumberToObject(json, "vnode", vnode) == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  code = tsdbTFileSetToJson(fset, json);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  buf = cJSON_PrintUnformatted(json);
  if (buf == NULL) {
    // passing NULL to strlen() below would be undefined behavior
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vnode, fset->fid);
  code = tssUploadToDefault(path, buf, strlen(buf));
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to upload manifest since %s", vnode, fset->fid, tstrerror(code));
  }

_exit:
  cJSON_Delete(json);
  if (buf != NULL) {
    taosMemoryFree(buf);
  }
  return code;
}
266

267

268
// upload local files to shared storage
269
//
270
// local file name is like:
271
//     [base]/vnode2/f1736/v2f1736ver16.head
272
// or
273
//     [base]/vnode2/f1736/v2f1736ver16.m334233.head
274
//
275
// remote file name is like:
276
//     vnode2/f1736/v2f1736ver16.m13552343.head
277
//
278
// NOTE: the migration id is always included in the remote file name, because
279
// the commit id may be different between the vnodes of the same vgroup,
280
// that's an interrupted migration may overwrite some of the remote files
281
// while leaving the others intact if we don't include the migration id in
282
// the remote file name.
283
static int32_t uploadFile(SRTNer* rtner, STFileObj* fobj) {
4,488✔
284
  if (fobj == NULL) {
4,488✔
285
    return TSDB_CODE_SUCCESS;
1,496✔
286
  }
287

288
  const char* ext = strrchr(fobj->fname, '.');
2,992✔
289
  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
2,992✔
290
  STFile* f = fobj->f;
2,992✔
291
  
292
  char path[TSDB_FILENAME_LEN];
2,992✔
293
  snprintf(path, sizeof(path), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d%s", vid, f->fid, vid, f->fid, f->cid, mid, ext);
2,992✔
294

295
  int code = tssUploadFileToDefault(path, fobj->fname, 0, -1);
2,992✔
296
  if (code != TSDB_CODE_SUCCESS) {
2,992✔
297
    tsdbError("vgId:%d, fid:%d, failed to upload file %s since %s", vid, f->fid, fobj->fname, tstrerror(code));
×
298
    return code;
×
299
  }
300

301
  return 0;
2,992✔
302
}
303

304

305

306
/*
 * Download "vnode<vid>/f<fid>/<name>" from the default shared storage into the
 * local path fobj->fname, where <name> is the file name part of fobj->fname.
 *
 * Returns 0 when `fobj` is NULL or the download succeeds, otherwise the error
 * from tssDownloadFileFromDefault.
 */
static int32_t downloadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  STFile* f = fobj->f;

  // strip the local directory; fall back to the whole name when there is no
  // separator instead of computing NULL + 1
  const char* sep = strrchr(fobj->fname, TD_DIRSEP_CHAR);
  const char* fname = (sep != NULL) ? sep + 1 : fobj->fname;

  char path[TSDB_FILENAME_LEN];
  snprintf(path, sizeof(path), "vnode%d/f%d/%s", vid, f->fid, fname);

  int32_t code = tssDownloadFileFromDefault(path, fobj->fname, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to download file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;
  }

  return 0;
}
325

326

327
static void tsdbRemoveSsGarbageFiles(int32_t vid, STFileSet* fset) {
1,496✔
328
  char prefix[TSDB_FILENAME_LEN];
1,496✔
329
  snprintf(prefix, sizeof(prefix), "vnode%d/f%d/", vid, fset->fid);
1,496✔
330

331
  SArray* paths = taosArrayInit(10, sizeof(char*));
1,496✔
332
  int32_t code = tssListFileOfDefault(prefix, paths);
1,496✔
333
  if (code != TSDB_CODE_SUCCESS) {
1,496✔
334
    tsdbError("vgId:%d, fid:%d, failed to list files in shared storage since %s", vid, fset->fid, tstrerror(code));
×
335
    taosArrayDestroy(paths);
×
336
    return;
×
337
  }
338

339
  for(int i = 0; i < taosArrayGetSize(paths); i++) {
8,976✔
340
      char* p = *(char**)taosArrayGet(paths, i);
7,480✔
341
      const char* ext = strrchr(p, '.');
7,480✔
342
      const char* rname = strrchr(p, '/') + 1;
7,480✔
343
      bool remove = true;
7,480✔
344
      int32_t vgId = 0, fid = 0, mid = 0, cn = 0;
7,480✔
345
      int64_t cid = 0;
7,480✔
346
  
347
      // NOTE: when compare file name, don't use strcmp(fobj->fname, rname) because 'fobj->fname' may not
348
      // contain the migration id. that's why we use sscanf to parse the file name.
349

350
      if (ext == NULL) {
7,480✔
351
        // no extension, remove
352
      } else if (taosStrcasecmp(ext, ".head") == 0) {
7,480✔
353
        STFileObj* fobj = fset->farr[TSDB_FTYPE_HEAD];
1,496✔
354
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.head", &vgId, &fid, &cid, &mid);
1,496✔
355
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
1,496✔
356
      } else if (taosStrcasecmp(ext, ".sma") == 0) {
5,984✔
357
        STFileObj* fobj = fset->farr[TSDB_FTYPE_SMA];
1,496✔
358
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.sma", &vgId, &fid, &cid, &mid);
1,496✔
359
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
1,496✔
360
      } else if (taosStrcasecmp(ext, ".tomb") == 0) {
4,488✔
361
        STFileObj* fobj = fset->farr[TSDB_FTYPE_TOMB];
×
362
        if (fobj) {
×
363
          int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.tomb", &vgId, &fid, &cid, &mid);
×
364
          remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
365
        }
366
      } else if (taosStrcasecmp(ext, ".stt") == 0) {
4,488✔
367
        SSttLvl* lvl = NULL;
×
368
        TARRAY2_FOREACH(fset->lvlArr, lvl) {
×
369
          STFileObj* fobj;
370
          TARRAY2_FOREACH(lvl->fobjArr, fobj) {
×
371
            int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.stt", &vgId, &fid, &cid, &mid);
×
372
            if (n == 4 && vgId == vid && fid == fset->fid && cid == fobj->f->cid && mid == fobj->f->mid) {
×
373
              remove = false;
×
374
              break;
×
375
            }
376
          }
377
          if (!remove) {
×
378
            break;
×
379
          }
380
        }
381
      } else if (taosStrcasecmp(ext, ".data") == 0) {
4,488✔
382
        STFileObj* fobj = fset->farr[TSDB_FTYPE_DATA];
2,992✔
383
        int n = sscanf(rname, "v%df%dver%" PRId64 ".%d.data", &vgId, &fid, &cid, &cn);
2,992✔
384
        if (n == 4) {
2,992✔
385
          if (vgId == vid && fid == fset->fid && cid == fobj->f->cid && cn >= 1 && cn < fobj->f->lcn) {
1,496✔
386
            remove = false; // not the last chunk, keep it
1,496✔
387
          }
388
        } else {
389
          n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.%d.data", &vgId, &fid, &cid, &mid, &cn);
1,496✔
390
          remove = (n != 5 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid || cn != fobj->f->lcn);
1,496✔
391
        }
392
      } else {
393
        remove = (taosStrcasecmp(rname, "manifests.json") != 0); // keep manifest, remove all other files
1,496✔
394
      }
395

396
      if (remove) {
7,480✔
397
        int32_t code = tssDeleteFileFromDefault(p);
×
398
        if (code != TSDB_CODE_SUCCESS) {
×
399
          tsdbError("vgId:%d, fid:%d, failed to remove garbage file %s from shared storage since %s", vid, fset->fid, p, tstrerror(code));
×
400
        } else {
401
          tsdbInfo("vgId:%d, fid:%d, garbage file %s is removed from shared storage", vid, fset->fid, p);
×
402
        }
403
      }
404

405
      taosMemoryFree(p);
7,480✔
406
  }
407

408
  taosArrayDestroy(paths);
1,496✔
409
}
410

411

412
// download the last chunk of a data file
413
// remote file name is like:
414
//      vnode2/f1736/v2f1736ver16.m13552343.4.data
415
static int32_t downloadDataFileLastChunk(SRTNer* rtner, STFileObj* fobj) {
×
416
  int32_t code = 0;
×
417
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
×
418
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
×
419
  STFile *f = fobj->f;
×
420

421
  char lpath[TSDB_FILENAME_LEN], rpath[TSDB_FILENAME_LEN];
×
422
  tsdbTFileLastChunkName(rtner->tsdb, f, lpath);
×
423
  char* fname = strrchr(lpath, TD_DIRSEP_CHAR) + 1;
×
424

425
  snprintf(rpath, sizeof(rpath), "vnode%d/f%d/%s", vid, f->fid, fname);
×
426

427
  code = tssDownloadFileFromDefault(rpath, lpath, 0, -1);
×
428
  if (code != TSDB_CODE_SUCCESS) {
×
429
    tsdbError("vgId:%d, fid:%d, failed to download data file %s since %s", vid, f->fid, rpath, tstrerror(code));
×
430
    return code;
×
431
  }
432

433
  return code;
×
434
}
435

436

437
// while other files all include the migration id in the remote file name, only the last
438
// chunk of a data file does the same. this is ok because:
439
// 1. without a compaction, data file is always uploaded chunk by chunk, only the last
440
//    chunk may be modified.
441
// 2. after a compaction, all of the data is downloaded to local, so overwriting remote
442
//    data chunks won't cause any problem. (this is not likely to happen because we will
443
//    cancel the migration in this case, refer comment in function shouldMigrate).
444
static int32_t uploadDataFile(SRTNer* rtner, STFileObj* fobj) {
1,496✔
445
  int32_t code = 0;
1,496✔
446
  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
1,496✔
447
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
1,496✔
448
  int64_t szFile = 0, szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
1,496✔
449
  STFile *f = fobj->f;
1,496✔
450
  STFileOp op = {.optype = TSDB_FOP_MODIFY, .fid = f->fid, .of = *f};
1,496✔
451

452
  char path[TSDB_FILENAME_LEN];
1,496✔
453
  if (f->lcn <= 1) {
1,496✔
454
    tstrncpy(path, fobj->fname, sizeof(path));
1,496✔
455
  } else {
456
    tsdbTFileLastChunkName(rtner->tsdb, f, path);
×
457
  }
458

459
  code = taosStatFile(path, &szFile, NULL, NULL);
1,496✔
460
  if (code != TSDB_CODE_SUCCESS) {
1,496✔
461
    tsdbError("vgId:%d, fid:%d failed to stat file %s since %s", vid, f->fid, path, tstrerror(code));
×
462
    return false;
×
463
  }
464

465
  if (f->lcn > 1) {
1,496✔
466
    szFile += szChunk * (f->lcn - 1); // add the size of migrated chunks
×
467
  }
468

469
  int totalChunks = szFile / szChunk;
1,496✔
470
  if (szFile % szChunk) {
1,496✔
471
    totalChunks++;
1,496✔
472
  }
473

474
  int lcn = f->lcn < 1 ? 1 : f->lcn;
1,496✔
475

476
  // upload chunks one by one, the first chunk may already been uploaded, but may be
477
  // modified thereafter, so we need to upload it again
478
  for (int i = lcn; i <= totalChunks; ++i) {
4,488✔
479
    int64_t offset = (int64_t)(i - lcn) * szChunk;
2,992✔
480
    int64_t size = szChunk;
2,992✔
481
    if (i == totalChunks && szFile % szChunk) {
2,992✔
482
        size = szFile % szChunk;
1,496✔
483
    }
484

485
    // only include the migration id in the last chunk filename
486
    char rpath[TSDB_FILENAME_LEN];
2,992✔
487
    if (i == totalChunks) {
2,992✔
488
        snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d.%d.data", vid, f->fid, vid, f->fid, f->cid,
1,496✔
489
                 mid, i);
490
    } else {
491
        snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, f->fid, vid, f->fid, f->cid, i);
1,496✔
492
    }
493

494
    code = tssUploadFileToDefault(rpath, path, offset, size);
2,992✔
495
    if (code != TSDB_CODE_SUCCESS) {
2,992✔
496
      tsdbError("vgId:%d, fid:%d, failed to migrate data file since %s", vid, f->fid, tstrerror(code));
×
UNCOV
497
      return code;
×
498
    }
499
  }
500

501
  f->lcn = totalChunks;
1,496✔
502
  op.nf = *f;
1,496✔
503
  code = TARRAY2_APPEND(&rtner->fopArr, op);
1,496✔
504
  if (code != TSDB_CODE_SUCCESS) {
1,496✔
505
    tsdbError("vgId:%d, fid:%d, failed to append file operation since %s", vid, f->fid, tstrerror(code));
×
UNCOV
506
    return code;
×
507
  }
508

509
  // manifest must be uploaded before copy last chunk, otherwise, failed to upload manifest
510
  // will result in a broken migration
511
  tsdbInfo("vgId:%d, fid:%d, data file migrated, begin generate & upload manifest", vid, f->fid);
1,496✔
512

513
  // manifest, this also commit the migration
514
  code = uploadManifest(vnodeNodeId(rtner->tsdb->pVnode), vid, rtner->fset, getSsMigrateId(rtner->tsdb));
1,496✔
515
  if (code != TSDB_CODE_SUCCESS) {
1,496✔
516
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
×
UNCOV
517
    return code;
×
518
  }
519

520
  tsdbInfo("vgId:%d, fid:%d, manifest uploaded, begin remove garbage files", vid, f->fid);
1,496✔
521
  tsdbRemoveSsGarbageFiles(vid, rtner->fset);
1,496✔
522

523
  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SUCCEEDED);
1,496✔
524
  tsdbInfo("vgId:%d, fid:%d, leader migration succeeded", vid, f->fid);
1,496✔
525

526
  // no new chunks generated, no need to copy the last chunk
527
  if (totalChunks == lcn) {
1,496✔
UNCOV
528
    return 0;
×
529
  }
530

531
  // copy the last chunk to the new file
532
  char newPath[TSDB_FILENAME_LEN];
1,496✔
533
  tsdbTFileLastChunkName(rtner->tsdb, &op.nf, newPath);
1,496✔
534

535
  int64_t offset = (int64_t)(totalChunks - lcn) * szChunk;
1,496✔
536
  int64_t size = szChunk;
1,496✔
537
  if (szFile % szChunk) {
1,496✔
538
    size = szFile % szChunk;
1,496✔
539
  }
540

541
  TdFilePtr fdFrom = taosOpenFile(path, TD_FILE_READ);
1,496✔
542
  if (fdFrom == NULL) {
1,496✔
543
    code = terrno;
×
544
    tsdbError("vgId:%d, fid:%d, failed to open source file %s since %s", vid, f->fid, path, tstrerror(code));
×
UNCOV
545
    return code;
×
546
  }
547

548
  TdFilePtr fdTo = taosOpenFile(newPath, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
1,496✔
549
  if (fdTo == NULL) {
1,496✔
550
    code = terrno;
×
551
    tsdbError("vgId:%d, fid:%d, failed to open target file %s since %s", vid, f->fid, newPath, tstrerror(code));
×
552
    TAOS_UNUSED(taosCloseFile(&fdFrom));
×
UNCOV
553
    return code;
×
554
  }
555

556
  int64_t n = taosFSendFile(fdTo, fdFrom, &offset, size);
1,496✔
557
  if (n < 0) {
1,496✔
558
    code = terrno;
×
UNCOV
559
    tsdbError("vgId:%d, fid:%d, failed to copy file %s to %s since %s", vid, f->fid, path, newPath, tstrerror(code));
×
560
  }
561
  TAOS_UNUSED(taosCloseFile(&fdFrom));
1,496✔
562
  TAOS_UNUSED(taosCloseFile(&fdTo));
1,496✔
563

564
  return code;
1,496✔
565
}
566

567

568
/*
 * Decide whether the file set rtner->fset should be migrated to shared storage.
 *
 * Returns true when the migration should go ahead; *pCode is then 0.
 * Returns false otherwise, after the migration state has been set to SKIPPED,
 * COMPACT or FAILED; *pCode then holds the error code (0 for a plain skip).
 *
 * Checks, in order:
 *   - the local data file exists and no commit happened after scheduling;
 *   - with ssCompact enabled and a negative lcn, an async compaction of the
 *     file set's key range is requested instead of migrating;
 *   - the data file is not too small (lcn < 1 and at most one chunk) and is not
 *     being actively written;
 *   - the remote manifest agrees with the local lcn and the local data file
 *     has a different maxVer (i.e. new data).
 */
static bool shouldMigrate(SRTNer *rtner, int32_t *pCode) {
  STsdb     *tsdb = rtner->tsdb;
  STFileSet *fset = rtner->fset;
  SVnodeCfg *cfg = &tsdb->pVnode->config;
  STFileObj *dataObj = fset->farr[TSDB_FTYPE_DATA];
  int32_t    vgId = TD_VID(tsdb->pVnode);

  *pCode = TSDB_CODE_SUCCESS;

  if (dataObj == NULL) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, local data file not exist", vgId, fset->fid);
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  if (fset->lastCommit != rtner->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, there are new commits after migration task is scheduled", vgId, fset->fid);
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  if (cfg->ssCompact && dataObj->f->lcn < 0) {
    int32_t     lcn = dataObj->f->lcn;
    STimeWindow win = {0};
    tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &win.skey, &win.ekey);
    *pCode = tsdbAsyncCompact(tsdb, &win, TSDB_OPTR_SSMIGRATE, false);
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, fileset need compact, lcn: %d", vgId, fset->fid, lcn);
    setMigrationState(tsdb, (*pCode != 0) ? SSMIGRATE_FILESET_STATE_FAILED : SSMIGRATE_FILESET_STATE_COMPACT);
    return false;  // compact in progress
  }

  // stat the local part of the data file: the whole file, or the last chunk file when lcn > 1
  char path[TSDB_FILENAME_LEN];
  if (dataObj->f->lcn > 1) {
    tsdbTFileLastChunkName(tsdb, dataObj->f, path);
  } else {
    tstrncpy(path, dataObj->fname, sizeof(path));
  }

  int64_t size = 0, mtime = 0;
  *pCode = taosStatFile(path, &size, &mtime, NULL);
  if (*pCode != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to stat file %s since %s", vgId, fset->fid, path, tstrerror(*pCode));
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // file may become smaller after a compaction, especially after a RSMA compaction
  int64_t chunkBytes = (int64_t)cfg->tsdbPageSize * cfg->ssChunkSize;
  if (dataObj->f->lcn < 1 && size <= chunkBytes) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is too small, size: %" PRId64 " bytes", vgId, dataObj->f->fid, size);
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  // a recent mtime means the file is being written and the migration should be
  // postponed; a recent mtime caused by an [ssCompact] compaction is tolerated.
  // this is not an accurate condition, but is simple and good enough.
  bool recentlyModified = mtime >= rtner->tw.ekey - tsSsUploadDelaySec;
  bool notFromCompact = mtime > fset->lastCompact / 1000 || !cfg->ssCompact;
  if (recentlyModified && notFromCompact) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is active writting, modified at %" PRId64, vgId, fset->fid, mtime);
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;  // still active writing, postpone migration
  }

  STFileSet *remote = NULL;
  *pCode = downloadManifest(tsdb->pVnode, fset->fid, &remote);

  if (*pCode == TSDB_CODE_NOT_FOUND) {
    // no manifest is only consistent with a file set that was never migrated
    if (dataObj->f->lcn >= 1) {
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest not found but local lcn >= 1", vgId, fset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_FAILED);
      return false;
    }

    // this is the first migration, we should do it.
    *pCode = TSDB_CODE_SUCCESS;
    return true;
  }

  if (*pCode != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to download manifest, code: %d", vgId, fset->fid, *pCode);
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // a remote manifest exists although the local file was never migrated. this is
  // either a compaction after the last migration (harmless) or a follower that
  // failed while the leader succeeded (continuing could corrupt other nodes).
  // the two cannot be told apart, so both are treated as a migration error and
  // the user has to recover, e.g. by removing the remote files.
  if (dataObj->f->lcn < 1) {
    tsdbTFileSetClear(&remote);
    tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest found but local lcn < 1", vgId, fset->fid);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  STFileObj *remoteData = remote->farr[TSDB_FTYPE_DATA];
  if (remoteData == NULL) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, cannot find data file information from remote manifest", vgId, fset->fid);
    tsdbTFileSetClear(&remote);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  if (remoteData->f->lcn != dataObj->f->lcn) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, remote and local data file information mismatch", vgId, fset->fid);
    tsdbTFileSetClear(&remote);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  bool hasNewData = (remoteData->f->maxVer != dataObj->f->maxVer);
  tsdbTFileSetClear(&remote);  // only the local file set information is used for the migration

  if (!hasNewData) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, no new data", vgId, fset->fid);
    setMigrationState(tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  tsdbInfo("vgId:%d, fid:%d, file set will be migrated", vgId, fset->fid);
  return true;
}
699

700

701
/*
 * Follower side of a shared-storage migration for one file set (rtner->fset).
 *
 * Waits on the migrate monitor (under tsdb->mutex) for the leader on node
 * rtner->nodeId to finish uploading. It then downloads the remote manifest and
 * the files it references: head, sma, tomb, stt files and the last chunk of the
 * data file. For each file it appends the matching STFileOp
 * (MODIFY/CREATE/REMOVE) to rtner->fopArr. The operations are only queued here.
 * NOTE(review): presumably the caller applies them — confirm.
 *
 * Returns 0 when the follower migration finished, or was skipped/cancelled
 * (new local commits, leader skipped/failed/timed out, migration id mismatch).
 * Otherwise returns an error code. A missing remote manifest, or a manifest
 * lacking head/sma/data entries, is reported as TSDB_CODE_FILE_CORRUPTED.
 */
static int32_t tsdbFollowerDoSsMigrate(SRTNer *rtner) {
  int32_t            code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet         *fset = rtner->fset;
  SSsMigrateMonitor *pmm = rtner->tsdb->pSsMigrateMonitor;
  STFileSet         *pRemoteFset = NULL;
  STFileObj         *remoteHead = NULL;
  STFileOp           op = {0};
  SSttLvl           *lvl = NULL;

  // though we make this check in the leader node, we should do this in the follower nodes too.
  // because there may be a leader change and the execution order of async tasks may result in
  // different commit time. if we don't do this, we may corrupt the follower data.
  if (rtner->lastCommit != fset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, follower migration cancelled, there are new commits after migration is scheduled", vid, fset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return 0;
  }

  tsdbInfo("vgId:%d, fid:%d, vnode is follower, waiting leader on node %d to upload.", vid, fset->fid, rtner->nodeId);

  (void)taosThreadMutexLock(&rtner->tsdb->mutex);

  // Each wait is bounded by 30 seconds; a timeout marks the migration as failed,
  // which also terminates the loop.
  while (pmm->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    struct timespec ts;
    if ((code = taosClockGetTime(CLOCK_REALTIME, &ts)) != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to get current time since %s", vid, fset->fid, tstrerror(code));
      TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
      return code;
    }
    ts.tv_sec += 30;  // TODO: make it configurable
    code = taosThreadCondTimedWait(&pmm->stateChanged, &rtner->tsdb->mutex, &ts);
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      tsdbError("vgId:%d, fid:%d, waiting leader migration timed out", vid, fset->fid);
      pmm->state = SSMIGRATE_FILESET_STATE_FAILED;
    }
  }

  if (pmm->state != SSMIGRATE_FILESET_STATE_SUCCEEDED) {
    TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
    tsdbInfo("vgId:%d, fid:%d, follower migration skipped because leader migration skipped or failed", vid, fset->fid);
    return 0;
  }

  TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));

  // NOTE: After the leader node finished processing the current file set and mnode sent the final
  // follower migrate request, mnode waits at least 30 seconds before triggering the processing of
  // the next file set. Because all file sets of a vgroup shares the same [pmm], from now on, the
  // follower should not access [pmm->state] anymore.
  // refer comments in mndUpdateSsMigrateProgress for more details.

  tsdbInfo("vgId:%d, fid:%d, follower migration started, begin downloading manifest...", vid, fset->fid);
  code = downloadManifest(rtner->tsdb->pVnode, fset->fid, &pRemoteFset);
  if (code != TSDB_CODE_SUCCESS) {
    // The leader reported success, so a missing manifest is treated as corruption.
    // Any other error is propagated instead of continuing without a manifest.
    if (code == TSDB_CODE_NOT_FOUND) {
      code = TSDB_CODE_FILE_CORRUPTED;
    }
    tsdbError("vgId:%d, fid:%d, follower migration failed, cannot download manifest since %s", vid, fset->fid,
              tstrerror(code));
    goto _exit;
  }

  remoteHead = pRemoteFset->farr[TSDB_FTYPE_HEAD];
  if (remoteHead == NULL) {
    tsdbError("vgId:%d, fid:%d, follower migration failed, no head file in remote manifest", vid, fset->fid);
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }

  // this often happens in the catch up process of a new node, it is ok to continue, but will
  // result in download same files more than once, which is a waste of time and bandwidth.
  if (remoteHead->f->mid != getSsMigrateId(rtner->tsdb)) {
    tsdbWarn("vgId:%d, fid:%d, follower migration cancelled, migration id mismatch", vid, fset->fid);
    code = 0;
    goto _exit;
  }

  // The MODIFY operations below copy these descriptors by value; a NULL entry
  // would otherwise be dereferenced.
  if (fset->farr[TSDB_FTYPE_HEAD] == NULL || fset->farr[TSDB_FTYPE_SMA] == NULL ||
      fset->farr[TSDB_FTYPE_DATA] == NULL || pRemoteFset->farr[TSDB_FTYPE_SMA] == NULL ||
      pRemoteFset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, follower migration failed, head/sma/data file missing in local or remote file set",
              vid, fset->fid);
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, manifest downloaded, begin downloading head file", vid, fset->fid);
  code = downloadFile(rtner, remoteHead);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_HEAD]->f, .nf = *remoteHead->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append head file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // sma file
  tsdbInfo("vgId:%d, fid:%d, head file downloaded, begin downloading sma file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_SMA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_SMA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append sma file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // tomb file: either side may lack one, so pick modify / remove / create accordingly
  tsdbInfo("vgId:%d, fid:%d, sma file downloaded, begin downloading tomb file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  if (fset->farr[TSDB_FTYPE_TOMB] != NULL && pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (fset->farr[TSDB_FTYPE_TOMB] != NULL) {
    // the remote tomb file is not found, but local tomb file exists, we should remove it
    op = (STFileOp){.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp){.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  }
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append tomb file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // stt files: drop every local stt file, then create every remote one
  tsdbInfo("vgId:%d, fid:%d, tomb file downloaded, begin downloading stt files", vid, fset->fid);
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj *fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      op = (STFileOp){.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file remove operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }
  TARRAY2_FOREACH(pRemoteFset->lvlArr, lvl) {
    STFileObj *fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = downloadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _exit;
      }
      op = (STFileOp){.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file create operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }

  // data file: only its last chunk is fetched
  tsdbInfo("vgId:%d, fid:%d, stt files downloaded, begin downloading data file", vid, fset->fid);
  code = downloadDataFileLastChunk(rtner, pRemoteFset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp){.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_DATA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_DATA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append data file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  tsdbInfo("vgId:%d, fid:%d, data file downloaded", vid, fset->fid);

_exit:
  // single cleanup point for the downloaded manifest (safe when it is still NULL)
  tsdbTFileSetClear(&pRemoteFset);
  return code;
}
866

867

868
/*
 * Leader side of a shared-storage migration for one file set (rtner->fset).
 *
 * After shouldMigrate() approves, uploads the head, sma and tomb files, every
 * stt file, and finally the data file (via uploadDataFile), in that order.
 * Any upload failure marks the migration state FAILED and returns the error.
 *
 * When shouldMigrate() declines, its code is returned unchanged. Its visible
 * decline paths record the FAILED/SKIPPED state themselves.
 * This function does not record the SUCCEEDED state.
 * NOTE(review): presumably that happens elsewhere once the upload is committed — confirm.
 */
static int32_t tsdbLeaderDoSsMigrate(SRTNer *rtner) {
  int32_t    code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet *fset = rtner->fset;
  SSttLvl   *lvl = NULL;

  tsdbInfo("vgId:%d, fid:%d, vnode is leader, migration started", vid, fset->fid);

  if (!shouldMigrate(rtner, &code)) {
    return code;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, begin migrate head file", vid, fset->fid);
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, head file migrated, begin migrate sma file", vid, fset->fid);

  // sma file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, sma file migrated, begin migrate tomb file", vid, fset->fid);

  // tomb file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, tomb file migrated, begin migrate stt files", vid, fset->fid);

  // stt files
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj *fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = uploadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _failed;
      }
    }
  }

  tsdbInfo("vgId:%d, fid:%d, stt files migrated, begin migrate data file", vid, fset->fid);

  // data file: uploaded last, after everything it depends on is in place
  code = uploadDataFile(rtner, fset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  return TSDB_CODE_SUCCESS;

_failed:
  // every upload failure is reported the same way
  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
  return code;
}
931

932

933
/*
 * Entry point of a shared-storage migration task for one file set.
 *
 * Dispatches to the leader path when this vnode's node id equals
 * rtner->nodeId, and to the follower path otherwise.
 */
int32_t tsdbDoSsMigrate(SRTNer *rtner) {
  // The leader is fixed when the task is scheduled. The real leader may have
  // changed since then; both paths tolerate that.
  bool actAsLeader = (vnodeNodeId(rtner->tsdb->pVnode) == rtner->nodeId);

  return actAsLeader ? tsdbLeaderDoSsMigrate(rtner) : tsdbFollowerDoSsMigrate(rtner);
}
941

942

943
#ifdef USE_SHARED_STORAGE
944

945
/*
 * Append to fidArr the ids of file sets that are candidates for shared-storage
 * migration. A file set qualifies when all of the following hold:
 *   - it is not expired (tsdbFidLevel >= 0);
 *   - it is outside the local-keep window (tsdbSsFidLevel >= 1 with ssKeepLocal);
 *   - it has a data file;
 *   - the local data file exceeds one chunk (tsdbPageSize * ssChunkSize bytes).
 *     The whole file is measured when lcn <= 1, otherwise its last chunk.
 * File sets that fail a check, or whose file cannot be stat'ed, are logged and
 * skipped. Those cases do not affect the return value.
 *
 * The file-set array is scanned with tsdb->mutex held.
 *
 * Returns 0, or the error from a failed taosArrayPush (the scan stops there).
 */
int32_t tsdbListSsMigrateFileSets(STsdb *tsdb, SArray* fidArr) {
  int32_t    vid = TD_VID(tsdb->pVnode), code = 0;
  int64_t    now = taosGetTimestampSec();
  SVnodeCfg *pCfg = &tsdb->pVnode->config;

  (void)taosThreadMutexLock(&tsdb->mutex);

  STFileSet *fset;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    if (tsdbFidLevel(fset->fid, &tsdb->keepCfg, now) < 0) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, file set is expired", vid, fset->fid);
      continue;
    }

    if (tsdbSsFidLevel(fset->fid, &tsdb->keepCfg, pCfg->ssKeepLocal, now) < 1) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, keep local file set", vid, fset->fid);
      continue;
    }

    STFileObj *fdata = fset->farr[TSDB_FTYPE_DATA];
    if (fdata == NULL) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, no data file", vid, fset->fid);
      continue;
    }

    char path[TSDB_FILENAME_LEN];
    if (fdata->f->lcn <= 1) {
      tstrncpy(path, fdata->fname, sizeof(path));
    } else {
      tsdbTFileLastChunkName(tsdb, fdata->f, path);
    }

    // Use a dedicated variable: a stat failure only skips this file set and must
    // neither shadow nor overwrite the function's return code.
    int64_t size = 0;
    int32_t statCode = taosStatFile(path, &size, NULL, NULL);
    if (statCode != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, migration skipped, failed to stat file since %s", vid, fset->fid, tstrerror(statCode));
      continue;
    }

    if (size <= (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is too small, size: %" PRId64 " bytes", vid, fset->fid, size);
      continue;
    }

    if (taosArrayPush(fidArr, &fset->fid) == NULL) {
      code = terrno;  // now reaches the caller instead of a shadowed local
      tsdbError("vgId:%d, failed to push file set id %d to array since %s", vid, fset->fid, tstrerror(code));
      break;
    }
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
  return code;
}
999

1000

1001
/*
 * Schedule the shared-storage migration of file set pReq->fid as a low-priority
 * retention task (tsdbRetention, TSDB_OPTR_SSMIGRATE) and point the vnode's
 * migrate monitor at it (id, fid, IN_PROGRESS, start time).
 *
 * tsdbAsyncSsMigrateFileSet calls this with tsdb->mutex held. The monitor and
 * the file-set array are accessed here without additional locking.
 *
 * Returns:
 *   TSDB_CODE_FAILED     background tasks are disabled, a previous migration is
 *                        still in progress, or the file set was already migrated
 *                        at or after pReq->startTimeSec;
 *   TSDB_CODE_NOT_FOUND  no file set has id pReq->fid;
 *   other                allocation failure (terrno), or a vnodeAsync failure.
 *                        In the vnodeAsync case the monitor state is set to FAILED.
 */
static int32_t tsdbAsyncMigrateFileSetImpl(STsdb *tsdb,  const SSsMigrateFileSetReq *pReq) {
  int32_t vid = TD_VID(tsdb->pVnode), code = 0;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    tsdbInfo("vgId:%d, ssMigrateId:%d, background task is disabled, skip", vid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  SSsMigrateMonitor* pmm = tsdb->pSsMigrateMonitor;
  if (pmm->fid != 0 && pmm->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, failed to monitor since previous migration is still in progress",
              vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  STFileSet *fset = NULL, *fset1;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset1) {
    if (fset1->fid == pReq->fid) {
      fset = fset1;
      break;
    }
  }

  if (fset == NULL) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, file set not found", vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_NOT_FOUND;
  }

  // lastMigrate is divided by 1000 before comparing with startTimeSec.
  // NOTE(review): presumably lastMigrate is in milliseconds — confirm.
  // The check skips when startTimeSec <= last migration time.
  if (fset->lastMigrate/1000 >= pReq->startTimeSec) {
    tsdbInfo("vgId:%d, fid:%d, ssMigrateId:%d, start time <= last migration time, skip", vid, fset->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
  if (arg == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, memory allocation failed", vid, pReq->fid, pReq->ssMigrateId);
    return code;
  }

  arg->tsdb = tsdb;
  arg->tw.skey = INT64_MIN;
  arg->tw.ekey = pReq->startTimeSec;
  arg->fid = fset->fid;
  arg->nodeId = pReq->nodeId;
  arg->optrType = TSDB_OPTR_SSMIGRATE;
  // followers compare this snapshot with fset->lastCommit to detect new commits
  arg->lastCommit = fset->lastCommit;

  pmm->ssMigrateId = pReq->ssMigrateId;
  pmm->fid = fset->fid;
  pmm->state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  pmm->startTimeSec = pReq->startTimeSec;

  code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, &fset->migrateTask);
  if (code) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, schedule async task failed", vid, pReq->fid, pReq->ssMigrateId);
    taosMemoryFree(arg);
    pmm->state = SSMIGRATE_FILESET_STATE_FAILED;
  }

  return code;
}
1064

1065

1066
/*
 * Public entry for scheduling the shared-storage migration of one file set.
 * Runs tsdbAsyncMigrateFileSetImpl with tsdb->mutex held and returns its result.
 */
int32_t tsdbAsyncSsMigrateFileSet(STsdb *tsdb, SSsMigrateFileSetReq *pReq) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t result = tsdbAsyncMigrateFileSetImpl(tsdb, pReq);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
1075

1076

1077
/*
 * Cancel the in-progress migration task identified by ssMigrateId, if any.
 *
 * Under tsdb->mutex, this checks that a monitor exists, that its id matches and
 * that it is IN_PROGRESS. It then calls vnodeACancel on the migrateTask of the
 * file set the monitor tracks. Otherwise it does nothing; an id mismatch is
 * logged.
 */
void tsdbStopSsMigrateTask(STsdb* tsdb, int32_t ssMigrateId) {
  (void)taosThreadMutexLock(&tsdb->mutex);

  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;
  if (monitor != NULL) {
    if (monitor->ssMigrateId != ssMigrateId) {
      tsdbInfo("vgId:%d, ssMigrateId:%d, migration task not found", TD_VID(tsdb->pVnode), ssMigrateId);
    } else if (monitor->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
      STFileSet *candidate;
      TARRAY2_FOREACH(tsdb->pFS->fSetArr, candidate) {
        if (candidate->fid == monitor->fid) {
          (void)vnodeACancel(&candidate->migrateTask);
          break;
        }
      }
    }
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
1106

1107
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc