• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4768

01 Oct 2025 04:06AM UTC coverage: 57.85% (-0.8%) from 58.606%
#4768

push

travis-ci

web-flow
Merge pull request #33171 from taosdata/merge/3.3.6tomain

merge: from 3.3.6 to main branch

137167 of 302743 branches covered (45.31%)

Branch coverage included in aggregate %.

15 of 20 new or added lines in 2 files covered. (75.0%)

12125 existing lines in 175 files now uncovered.

208282 of 294403 relevant lines covered (70.75%)

5618137.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.37
/source/dnode/vnode/src/tsdb/tsdbMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23

24
extern int32_t tsdbAsyncCompact(STsdb* tsdb, const STimeWindow* tw, ETsdbOpType type);
25

26

27
// migrate monitor related functions
28
typedef struct SSsMigrateMonitor {
29
  TdThreadCond  stateChanged;
30
  int32_t ssMigrateId;
31
  int32_t fid;
32
  int32_t state;
33
  int64_t startTimeSec;
34
} SSsMigrateMonitor;
35

36

37
// Return the migration id recorded in the tsdb's migrate monitor.
// The monitor pointer is dereferenced unconditionally and no lock is taken.
static int32_t getSsMigrateId(STsdb* tsdb) {
  const SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  return monitor->ssMigrateId;
}
40

41

42
// Allocate a zero-initialised migrate monitor, initialise its condition
// variable and attach it to 'tsdb'.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the allocation fails
// (in which case 'tsdb' is left untouched).
int32_t tsdbOpenSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor *monitor = (SSsMigrateMonitor *)taosMemCalloc(1, sizeof(SSsMigrateMonitor));
  if (monitor != NULL) {
    // the result of the condition-variable initialisation is deliberately ignored
    TAOS_UNUSED(taosThreadCondInit(&monitor->stateChanged, NULL));
    tsdb->pSsMigrateMonitor = monitor;
    return 0;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}
51

52

53
// Destroy the migrate monitor attached to 'tsdb', if any, and reset the
// pointer so that a second call is a no-op.
void tsdbCloseSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;

  if (monitor != NULL) {
    TAOS_UNUSED(taosThreadCondDestroy(&monitor->stateChanged));
    taosMemoryFree(monitor);
    tsdb->pSsMigrateMonitor = NULL;
  }
}
63

64

65
// Store 'state' in the migrate monitor while holding tsdb->mutex.
// No condition-variable signal is sent from here.
static void setMigrationState(STsdb* tsdb, int32_t state) {
  (void)taosThreadMutexLock(&tsdb->mutex);

  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  monitor->state = state;

  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
×
70

71

72
// Report the current migration state for the (ssMigrateId, fid) pair named in
// 'pProgress'.
//
// On success pProgress->state is filled in and 0 is returned. When the request
// names a different migration id or file set than the one being tracked, an
// error is logged, 'pProgress' is left unchanged and TSDB_CODE_INVALID_MSG is
// returned. The comparison and the read happen under tsdb->mutex.
int32_t tsdbQuerySsMigrateProgress(STsdb *tsdb, SSsMigrateProgress *pProgress) {
  int32_t            result = TSDB_CODE_INVALID_MSG;
  int32_t            vgId = TD_VID(tsdb->pVnode);
  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;

  (void)taosThreadMutexLock(&tsdb->mutex);

  const bool sameMigration = (monitor->ssMigrateId == pProgress->ssMigrateId);
  const bool sameFileSet = (monitor->fid == pProgress->fid);

  if (!sameMigration) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in query progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
  } else if (!sameFileSet) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in query progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
  } else {
    pProgress->state = monitor->state;
    result = 0;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
95

96

97

98
// Apply a migration progress message to the local monitor.
//
// Messages that originate from this very vnode are ignored (return 0). For
// messages from other nodes, the (ssMigrateId, fid) pair must match the one
// being tracked; otherwise an error is logged and TSDB_CODE_INVALID_MSG is
// returned. On a match the state is copied and every waiter on the monitor's
// condition variable is woken up.
int32_t tsdbUpdateSsMigrateProgress(STsdb* tsdb, SSsMigrateProgress* pProgress) {
  int32_t            vgId = TD_VID(tsdb->pVnode);
  int32_t            result = TSDB_CODE_INVALID_MSG;
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;

  // the state was generated by this vnode, so there is nothing to process
  if (pProgress->nodeId == vnodeNodeId(tsdb->pVnode)) {
    tsdbDebug("vgId:%d, skip migration progress update since it was generated by this vnode", vgId);
    return 0;
  }

  (void)taosThreadMutexLock(&tsdb->mutex);

  const bool sameMigration = (monitor->ssMigrateId == pProgress->ssMigrateId);
  const bool sameFileSet = (monitor->fid == pProgress->fid);

  if (!sameMigration) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in update progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
  } else if (!sameFileSet) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in update progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
  } else {
    // Always broadcast, even when the state value did not actually change:
    // the wake-up itself keeps the waiting thread from running into its timeout.
    monitor->state = pProgress->state;
    (void)taosThreadCondBroadcast(&monitor->stateChanged);
    result = 0;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
130

131

132

133
// migrate file related functions
134
// Classify a file set as local (0) or shared-storage (1).
//
// 'nowSec' is the current time in seconds. It is converted to the database
// precision, shifted back by the configured keep-time offset (in hours) and by
// 'ssKeepLocal' (in minutes); the file set id that contains the resulting
// timestamp is the oldest one kept locally. Any 'fid' older than that yields 1.
int32_t tsdbSsFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t ssKeepLocal, int64_t nowSec) {
  int64_t now = nowSec;

  // scale seconds to the configured precision; unknown precisions are left unscaled
  switch (pKeepCfg->precision) {
    case TSDB_TIME_PRECISION_MILLI:
      now = now * 1000;
      break;
    case TSDB_TIME_PRECISION_MICRO:
      now = now * 1000000l;
      break;
    case TSDB_TIME_PRECISION_NANO:
      now = now * 1000000000l;
      break;
    default:
      break;
  }

  now = now - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  TSKEY   boundary = now - ssKeepLocal * tsTickPerMin[pKeepCfg->precision];
  int32_t oldestLocalFid = tsdbKeyFid(boundary, pKeepCfg->days, pKeepCfg->precision);

  if (fid < oldestLocalFid) {
    return 1;
  }
  return 0;
}
153

154

155
// Download "vnode<vid>/f<fid>/manifests.json" from the default shared storage
// and convert it into a file set.
//
// pVnode     vnode whose id selects the remote directory; its tsdb is used for parsing
// fid        id of the file set whose manifest is wanted
// ppFileSet  receives the parsed file set on success
//
// Returns TSDB_CODE_SUCCESS on success. Errors from the storage layer and from
// tsdbJsonToTFileSet() are returned unchanged; TSDB_CODE_OUT_OF_MEMORY is
// returned when the read buffer cannot be allocated; TSDB_CODE_FILE_CORRUPTED
// is returned when the content is not valid JSON, names a different fid, or
// has no data file entry (in the latter two cases the file set is cleared
// before returning).
static int32_t downloadManifest(SVnode* pVnode, int32_t fid, STFileSet** ppFileSet) {
  int32_t code = 0, vid = TD_VID(pVnode);

  char path[64];
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vid, fid);
  int64_t size = 0;
  code = tssGetFileSizeOfDefault(path, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to get manifests size since %s", vid, fid, tstrerror(code));
    return code;
  }

  // one extra byte for the terminating NUL required by cJSON_Parse
  char* buf = taosMemoryMalloc(size + 1);
  if (buf == NULL) {
    tsdbError("vgId:%d, fid:%d, failed to allocate buffer for manifest since %s", vid, fid,
              tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  code = tssReadFileFromDefault(path, 0, buf, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to read manifest from shared storage since %s", vid, fid, tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }
  buf[size] = 0;

  cJSON* json = cJSON_Parse(buf);
  taosMemoryFree(buf);
  if (json == NULL) {
    // 'code' is still TSDB_CODE_SUCCESS here, so report the code that is actually returned
    code = TSDB_CODE_FILE_CORRUPTED;
    tsdbError("vgId:%d, fid:%d, failed to parse manifest json since %s", vid, fid, tstrerror(code));
    return code;
  }

  code = tsdbJsonToTFileSet(pVnode->pTsdb, json, ppFileSet);
  cJSON_Delete(json);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to parse manifest since %s", vid, fid, tstrerror(code));
    return code;
  }

  STFileSet* fset = *ppFileSet;
  if (fset->fid != fid) {
    tsdbError("vgId:%d, fid:%d, mismatch fid, manifest fid is %d", vid, fid, fset->fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (fset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, data file not found in manifest", vid, fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return code;
}
204

205

206
// Stamp every file of 'fset' with migration id 'mid', serialise the file set
// to JSON and upload it as "vnode<vnode>/f<fid>/manifests.json".
//
// dnode  written to the manifest as "dnode"
// vnode  written to the manifest as "vnode"; also selects the remote directory
// fset   file set to describe; the 'mid' of each of its files is modified
// mid    migration id to record
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_OUT_OF_MEMORY (or terrno,
// when it is set) if building or printing the JSON fails, or the error
// returned by tsdbTFileSetToJson() / tssUploadToDefault().
static int32_t uploadManifest(int32_t dnode, int32_t vnode, STFileSet* fset, int32_t mid) {
  int32_t code = 0;

  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // update migration id for all files in the file set; any of the fixed
  // slots may be absent, so each one is checked before it is touched
  const int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_SMA, TSDB_FTYPE_DATA, TSDB_FTYPE_TOMB};
  for (int32_t i = 0; i < (int32_t)(sizeof(ftypes) / sizeof(ftypes[0])); ++i) {
    STFileObj* slot = fset->farr[ftypes[i]];
    if (slot != NULL) {
      slot->f->mid = mid;
    }
  }

  SSttLvl*   lvl;
  STFileObj* fobj;
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      fobj->f->mid = mid;
    }
  }

  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL ||
      cJSON_AddNumberToObject(json, "dnode", dnode) == NULL ||
      cJSON_AddNumberToObject(json, "vnode", vnode) == NULL) {
    // terrno may not have been set by the failing call; never report success here
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    cJSON_Delete(json);
    return code;
  }

  code = tsdbTFileSetToJson(fset, json);
  if (code != TSDB_CODE_SUCCESS) {
    cJSON_Delete(json);
    return code;
  }

  char* buf = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  if (buf == NULL) {
    // printing failed, there is nothing to upload (and strlen(NULL) is undefined)
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    tsdbError("vgId:%d, fid:%d, failed to serialize manifest since %s", vnode, fset->fid, tstrerror(code));
    return code;
  }

  char path[64];
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vnode, fset->fid);
  code = tssUploadToDefault(path, buf, strlen(buf));
  taosMemoryFree(buf);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to upload manifest since %s", vnode, fset->fid, tstrerror(code));
  }

  return code;
}
268

269

270
// upload local files to shared storage
271
//
272
// local file name is like:
273
//     [base]/vnode2/f1736/v2f1736ver16.head
274
// or
275
//     [base]/vnode2/f1736/v2f1736ver16.m334233.head
276
//
277
// remote file name is like:
278
//     vnode2/f1736/v2f1736ver16.m13552343.head
279
//
280
// NOTE: the migration id is always included in the remote file name, because
281
// the commit id may be different between the vnodes of the same vgroup,
282
// so an interrupted migration may overwrite some of the remote files
283
// while leaving the others intact if we don't include the migration id in
284
// the remote file name.
285
// Upload one local file of the file set to the default shared storage.
//
// The remote name is rebuilt from the file's fid, commit id and the current
// migration id plus the local file's extension, so it always carries the
// migration id regardless of the local name.
//
// rtner  retention context; supplies the tsdb (vnode id and migration id)
// fobj   file to upload; NULL is accepted and treated as "nothing to do"
//
// Returns 0 on success (or when 'fobj' is NULL), otherwise the error returned
// by tssUploadFileToDefault().
static int32_t uploadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // strrchr() returns NULL when the name has no '.', and NULL must not reach "%s"
  const char* ext = strrchr(fobj->fname, '.');
  if (ext == NULL) {
    ext = "";
  }

  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
  STFile* f = fobj->f;

  char path[TSDB_FILENAME_LEN];
  snprintf(path, sizeof(path), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d%s", vid, f->fid, vid, f->fid, f->cid, mid, ext);

  int code = tssUploadFileToDefault(path, fobj->fname, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to upload file %s since %s", vid, f->fid, fobj->fname, tstrerror(code));
    return code;
  }

  return 0;
}
305

306

307

308
// Download one file of the file set from the default shared storage to its
// local path.
//
// The remote path is "vnode<vid>/f<fid>/<base name of the local file>".
//
// rtner  retention context; supplies the vnode id
// fobj   file to download; NULL is accepted and treated as "nothing to do"
//
// Returns 0 on success (or when 'fobj' is NULL), otherwise the error returned
// by tssDownloadFileFromDefault().
static int32_t downloadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // a name without a directory separator is already a base name; never do
  // arithmetic on the NULL that strrchr() returns in that case
  const char* sep = strrchr(fobj->fname, TD_DIRSEP_CHAR);
  const char* fname = (sep != NULL) ? sep + 1 : fobj->fname;
  int32_t     vid = TD_VID(rtner->tsdb->pVnode);
  STFile*     f = fobj->f;

  char path[TSDB_FILENAME_LEN];
  snprintf(path, sizeof(path), "vnode%d/f%d/%s", vid, f->fid, fname);
  int code = tssDownloadFileFromDefault(path, fobj->fname, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to download file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;
  }

  return 0;
}
327

328

329
static void tsdbRemoveSsGarbageFiles(int32_t vid, STFileSet* fset) {
×
330
  char prefix[TSDB_FILENAME_LEN];
331
  snprintf(prefix, sizeof(prefix), "vnode%d/f%d/", vid, fset->fid);
×
332

333
  SArray* paths = taosArrayInit(10, sizeof(char*));
×
334
  int32_t code = tssListFileOfDefault(prefix, paths);
×
335
  if (code != TSDB_CODE_SUCCESS) {
×
336
    tsdbError("vgId:%d, fid:%d, failed to list files in shared storage since %s", vid, fset->fid, tstrerror(code));
×
337
    taosArrayDestroy(paths);
×
338
    return;
×
339
  }
340

341
  for(int i = 0; i < taosArrayGetSize(paths); i++) {
×
342
      char* p = *(char**)taosArrayGet(paths, i);
×
343
      const char* ext = strrchr(p, '.');
×
344
      const char* rname = strrchr(p, '/') + 1;
×
345
      bool remove = true;
×
346
      int32_t vgId = 0, fid = 0, mid = 0, cn = 0;
×
347
      int64_t cid = 0;
×
348
  
349
      // NOTE: when compare file name, don't use strcmp(fobj->fname, rname) because 'fobj->fname' may not
350
      // contain the migration id. that's why we use sscanf to parse the file name.
351

352
      if (ext == NULL) {
×
353
        // no extension, remove
354
      } else if (taosStrcasecmp(ext, ".head") == 0) {
×
355
        STFileObj* fobj = fset->farr[TSDB_FTYPE_HEAD];
×
356
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.head", &vgId, &fid, &cid, &mid);
×
357
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
358
      } else if (taosStrcasecmp(ext, ".sma") == 0) {
×
359
        STFileObj* fobj = fset->farr[TSDB_FTYPE_SMA];
×
360
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.sma", &vgId, &fid, &cid, &mid);
×
361
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
362
      } else if (taosStrcasecmp(ext, ".tomb") == 0) {
×
363
        STFileObj* fobj = fset->farr[TSDB_FTYPE_TOMB];
×
364
        if (fobj) {
×
365
          int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.tomb", &vgId, &fid, &cid, &mid);
×
366
          remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
367
        }
368
      } else if (taosStrcasecmp(ext, ".stt") == 0) {
×
369
        SSttLvl* lvl = NULL;
×
370
        TARRAY2_FOREACH(fset->lvlArr, lvl) {
×
371
          STFileObj* fobj;
372
          TARRAY2_FOREACH(lvl->fobjArr, fobj) {
×
373
            int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.stt", &vgId, &fid, &cid, &mid);
×
374
            if (n == 4 && vgId == vid && fid == fset->fid && cid == fobj->f->cid && mid == fobj->f->mid) {
×
375
              remove = false;
×
376
              break;
×
377
            }
378
          }
379
          if (!remove) {
×
380
            break;
×
381
          }
382
        }
383
      } else if (taosStrcasecmp(ext, ".data") == 0) {
×
384
        STFileObj* fobj = fset->farr[TSDB_FTYPE_DATA];
×
385
        int n = sscanf(rname, "v%df%dver%" PRId64 ".%d.data", &vgId, &fid, &cid, &cn);
×
386
        if (n == 4) {
×
387
          if (vgId == vid && fid == fset->fid && cid == fobj->f->cid && cn >= 1 && cn < fobj->f->lcn) {
×
388
            remove = false; // not the last chunk, keep it
×
389
          }
390
        } else {
391
          n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.%d.data", &vgId, &fid, &cid, &mid, &cn);
×
392
          remove = (n != 5 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid || cn != fobj->f->lcn);
×
393
        }
394
      } else {
395
        remove = (taosStrcasecmp(rname, "manifests.json") != 0); // keep manifest, remove all other files
×
396
      }
397

398
      if (remove) {
×
399
        int32_t code = tssDeleteFileFromDefault(p);
×
400
        if (code != TSDB_CODE_SUCCESS) {
×
401
          tsdbError("vgId:%d, fid:%d, failed to remove garbage file %s from shared storage since %s", vid, fset->fid, p, tstrerror(code));
×
402
        } else {
403
          tsdbInfo("vgId:%d, fid:%d, garbage file %s is removed from shared storage", vid, fset->fid, p);
×
404
        }
405
      }
406

407
      taosMemoryFree(p);
×
408
  }
409

410
  taosArrayDestroy(paths);
×
411
}
412

413

414
// download the last chunk of a data file
415
// remote file name is like:
416
//      vnode2/f1736/v2f1736ver16.m13552343.4.data
417
// Download the last chunk of a data file from the default shared storage.
//
// The local target path comes from tsdbTFileLastChunkName(); the remote path
// is "vnode<vid>/f<fid>/<base name of that local path>".
//
// rtner  retention context; supplies the tsdb and the vnode id
// fobj   data file object; must not be NULL
//
// Returns TSDB_CODE_SUCCESS on success, otherwise the error returned by
// tssDownloadFileFromDefault().
static int32_t downloadDataFileLastChunk(SRTNer* rtner, STFileObj* fobj) {
  int32_t code = 0;
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  STFile *f = fobj->f;

  char lpath[TSDB_FILENAME_LEN], rpath[TSDB_FILENAME_LEN];
  tsdbTFileLastChunkName(rtner->tsdb, f, lpath);

  // fall back to the whole name when there is no directory separator
  const char* sep = strrchr(lpath, TD_DIRSEP_CHAR);
  const char* fname = (sep != NULL) ? sep + 1 : lpath;

  // bounded formatting: the prefix plus the base name must never overrun 'rpath'
  snprintf(rpath, sizeof(rpath), "vnode%d/f%d/%s", vid, f->fid, fname);

  code = tssDownloadFileFromDefault(rpath, lpath, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to download data file %s since %s", vid, f->fid, rpath, tstrerror(code));
    return code;
  }

  return code;
}
437

438

439
// while other files all include the migration id in the remote file name, only the last
440
// chunk of a data file does the same. this is ok because:
441
// 1. without a compaction, data file is always uploaded chunk by chunk, only the last
442
//    chunk may be modified.
443
// 2. after a compaction, all of the data is downloaded to local, so overwriting remote
444
//    data chunks won't cause any problem. (this is not likely to happen because we will
445
//    cancel the migration in this case, refer comment in function shouldMigrate).
446
// Upload the not-yet-migrated part of a data file chunk by chunk, commit the
// migration by uploading the manifest, remove remote garbage, and finally
// copy the new last chunk into its own local file.
//
// Steps:
//   1. stat the local file that holds the unmigrated tail (the data file
//      itself when lcn <= 1, otherwise the last-chunk file);
//   2. upload chunks lcn..totalChunks; only the last chunk's remote name
//      carries the migration id;
//   3. record the lcn change as a TSDB_FOP_MODIFY operation in rtner->fopArr;
//   4. upload the manifest (this commits the migration) and set the migration
//      state to FAILED or SUCCEEDED accordingly;
//   5. if new chunks were produced, copy the last chunk to the local file
//      named after the new lcn.
//
// rtner  retention context (tsdb, file set, pending file operations)
// fobj   data file object; must not be NULL; fobj->f->lcn is updated
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t uploadDataFile(SRTNer* rtner, STFileObj* fobj) {
  int32_t code = 0;
  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  int64_t szFile = 0, szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
  STFile *f = fobj->f;
  STFileOp op = {.optype = TSDB_FOP_MODIFY, .fid = f->fid, .of = *f};

  char path[TSDB_FILENAME_LEN];
  if (f->lcn <= 1) {
    snprintf(path, sizeof(path), "%s", fobj->fname);
  } else {
    tsdbTFileLastChunkName(rtner->tsdb, f, path);
  }

  code = taosStatFile(path, &szFile, NULL, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d failed to stat file %s since %s", vid, f->fid, path, tstrerror(code));
    // propagate the failure; returning 'false' here would be read as success (0)
    return code;
  }

  if (f->lcn > 1) {
    szFile += szChunk * (f->lcn - 1); // add the size of migrated chunks
  }

  int totalChunks = szFile / szChunk;
  if (szFile % szChunk) {
    totalChunks++;
  }

  int lcn = f->lcn < 1 ? 1 : f->lcn;

  // upload chunks one by one, the first chunk may already been uploaded, but may be
  // modified thereafter, so we need to upload it again
  for (int i = lcn; i <= totalChunks; ++i) {
    int64_t offset = (int64_t)(i - lcn) * szChunk;
    int64_t size = szChunk;
    if (i == totalChunks && szFile % szChunk) {
      size = szFile % szChunk;
    }

    // only include the migration id in the last chunk filename
    char rpath[TSDB_FILENAME_LEN];
    if (i == totalChunks) {
      snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d.%d.data", vid, f->fid, vid, f->fid, f->cid, mid, i);
    } else {
      snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, f->fid, vid, f->fid, f->cid, i);
    }

    code = tssUploadFileToDefault(rpath, path, offset, size);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to migrate data file since %s", vid, f->fid, tstrerror(code));
      return code;
    }
  }

  f->lcn = totalChunks;
  op.nf = *f;
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append file operation since %s", vid, f->fid, tstrerror(code));
    return code;
  }

  // manifest must be uploaded before copy last chunk, otherwise, failed to upload manifest
  // will result in a broken migration
  tsdbInfo("vgId:%d, fid:%d, data file migrated, begin generate & upload manifest", vid, f->fid);

  // manifest, this also commit the migration
  code = uploadManifest(vnodeNodeId(rtner->tsdb->pVnode), vid, rtner->fset, getSsMigrateId(rtner->tsdb));
  if (code != TSDB_CODE_SUCCESS) {
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return code;
  }

  tsdbInfo("vgId:%d, fid:%d, manifest uploaded, begin remove garbage files", vid, f->fid);
  tsdbRemoveSsGarbageFiles(vid, rtner->fset);

  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SUCCEEDED);
  tsdbInfo("vgId:%d, fid:%d, leader migration succeeded", vid, f->fid);

  // no new chunks generated, no need to copy the last chunk
  if (totalChunks == lcn) {
    return 0;
  }

  // copy the last chunk to the new file
  char newPath[TSDB_FILENAME_LEN];
  tsdbTFileLastChunkName(rtner->tsdb, &op.nf, newPath);

  int64_t offset = (int64_t)(totalChunks - lcn) * szChunk;
  int64_t size = szChunk;
  if (szFile % szChunk) {
    size = szFile % szChunk;
  }

  TdFilePtr fdFrom = taosOpenFile(path, TD_FILE_READ);
  if (fdFrom == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to open source file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;
  }

  TdFilePtr fdTo = taosOpenFile(newPath, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to open target file %s since %s", vid, f->fid, newPath, tstrerror(code));
    TAOS_UNUSED(taosCloseFile(&fdFrom));
    return code;
  }

  int64_t n = taosFSendFile(fdTo, fdFrom, &offset, size);
  if (n < 0) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to copy file %s to %s since %s", vid, f->fid, path, newPath, tstrerror(code));
  }
  TAOS_UNUSED(taosCloseFile(&fdFrom));
  TAOS_UNUSED(taosCloseFile(&fdTo));

  return code;
}
567

568

569
// Decide whether the leader should migrate rtner->fset to shared storage now.
//
// Returns true when the migration should proceed. Returns false when it must
// not; in that case the migration state has already been set (SKIPPED,
// COMPACT or FAILED) and *pCode holds 0 for a skip or the error code for a
// failure.
//
// The checks, in order:
//   1. the file set has a local data file;
//   2. no commit happened since the task was scheduled;
//   3. with ssCompact enabled and lcn < 0, an async compaction is started instead;
//   4. the data file is not being actively written;
//   5. the remote manifest, if any, is consistent with the local lcn and
//      records a different maxVer than the local data file.
static bool shouldMigrate(SRTNer *rtner, int32_t *pCode) {
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  STFileSet *pLocalFset = rtner->fset;
  STFileObj *flocal = pLocalFset->farr[TSDB_FTYPE_DATA];

  *pCode = 0;
  if (!flocal) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, local data file not exist", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  if (rtner->lastCommit != pLocalFset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, there are new commits after migration task is scheduled", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  if (pCfg->ssCompact && flocal->f->lcn < 0) {
    int32_t     lcn = flocal->f->lcn;
    STimeWindow win = {0};
    tsdbFidKeyRange(pLocalFset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
    ETsdbOpType type = VND_IS_RSMA((rtner->tsdb->pVnode)) ? TSDB_OPTR_ROLLUP : TSDB_OPTR_SSMIGRATE;
    *pCode = tsdbAsyncCompact(rtner->tsdb, &win, type);
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, fileset need compact, lcn: %d", vid, pLocalFset->fid, lcn);
    if (*pCode) {
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    } else {
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_COMPACT);
    }
    return false; // compact in progress
  }

  // the file holding the unmigrated tail: the data file itself, or its last chunk
  char path[TSDB_FILENAME_LEN];
  if (flocal->f->lcn <= 1) {
    snprintf(path, sizeof(path), "%s", flocal->fname);
  } else {
    tsdbTFileLastChunkName(rtner->tsdb, flocal->f, path);
  }

  int64_t mtime = 0;
  *pCode = taosStatFile(path, NULL, &mtime, NULL);
  if (*pCode != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to stat file %s since %s", vid, pLocalFset->fid, path, tstrerror(*pCode));
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // 'mtime >= rtner->tw.ekey - tsSsUploadDelaySec' means the file is active writing, and we should skip
  // the migration. However, this may also be a result of the [ssCompact] option, which should not
  // be skipped, so we also check 'mtime > pLocalFset->lastCompact / 1000 || !pCfg->ssCompact', note
  // this is not an accurate condition, but is simple and good enough.
  if (mtime >= rtner->tw.ekey - tsSsUploadDelaySec && (mtime > pLocalFset->lastCompact / 1000 || !pCfg->ssCompact)) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is active writting, modified at %" PRId64, vid, pLocalFset->fid, mtime);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false; // still active writing, postpone migration
  }

  // download manifest from shared storage
  STFileSet *pRemoteFset = NULL;
  *pCode = downloadManifest(rtner->tsdb->pVnode, pLocalFset->fid, &pRemoteFset);
  if (*pCode == TSDB_CODE_SUCCESS) {
    // remote file exists but local file has not been migrated, there are two possibilities:
    // 1. there's a compact after the last migration, this is a normal case, we can discard
    //    the remote files and continue the migration;
    // 2. in the last migration, this node was a follower, the leader did its job successfully,
    //    but this node did not, continue the migration may overwrite remote file and result in
    //    data corruption on other nodes.
    // however, it is hard to distinguish them, so just treat both as a migration error. hope
    // the user could do something to recover, such as remove remote files.
    if (flocal->f->lcn < 1) {
      tsdbTFileSetClear(&pRemoteFset);
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest found but local lcn < 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
      return false;
    }

  } else if (*pCode == TSDB_CODE_NOT_FOUND) {
    if (flocal->f->lcn >= 1) {
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest not found but local lcn >= 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
      return false;
    }

    // this is the first migration, we should do it.
    *pCode = TSDB_CODE_SUCCESS;
    return true;

  } else {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to download manifest, code: %d", vid, pLocalFset->fid, *pCode);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // from here on the remote file set is owned by this function and must be
  // cleared on every exit path
  STFileObj *fremote = pRemoteFset->farr[TSDB_FTYPE_DATA];
  if (fremote == NULL) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, cannot find data file information from remote manifest", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  if (fremote->f->lcn != flocal->f->lcn) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, remote and local data file information mismatch", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  if (fremote->f->maxVer == flocal->f->maxVer) {
    tsdbTFileSetClear(&pRemoteFset);
    // a skip is not an error: log at info level like the other skip paths above
    tsdbInfo("vgId:%d, fid:%d, migration skipped, no new data", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false; // no new data
  }

  tsdbTFileSetClear(&pRemoteFset); // we use the local file set information for migration
  tsdbInfo("vgId:%d, fid:%d, file set will be migrated", vid, pLocalFset->fid);
  return true;
}
694

695

696
/*
 * Follower side of the shared-storage migration of one file set.
 *
 * The follower waits (under tsdb->mutex) until the migration monitor leaves the
 * IN_PROGRESS state, then downloads the remote manifest and the files it lists,
 * appending one file operation per file to rtner->fopArr.
 *
 * Returns 0 when the migration was skipped or every file operation was queued,
 * TSDB_CODE_FILE_CORRUPTED when the remote manifest is not found, or the error
 * code of the step that failed.
 */
static int32_t tsdbFollowerDoSsMigrate(SRTNer *rtner) {
  int32_t code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet *fset = rtner->fset;
  SSsMigrateMonitor* pmm = rtner->tsdb->pSsMigrateMonitor;
  STFileSet *pRemoteFset = NULL;
  STFileOp op;
  SSttLvl* lvl;

  // though we make this check in the leader node, we should do this in the follower nodes too.
  // because there may be a leader change and the execution order of async tasks may result in
  // different commit time. if we don't do this, we may corrupt the follower data.
  if (rtner->lastCommit != fset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, follower migration cancelled, there are new commits after migration is scheduled", vid, fset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return 0;
  }

  tsdbInfo("vgId:%d, fid:%d, vnode is follower, waiting leader on node %d to upload.", vid, fset->fid, rtner->nodeId);

  (void)taosThreadMutexLock(&rtner->tsdb->mutex);

  while(pmm->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    struct timespec ts;
    if ((code = taosClockGetTime(CLOCK_REALTIME, &ts)) != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to get current time since %s", vid, fset->fid, tstrerror(code));
      TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
      return code;
    }
    ts.tv_sec += 30; // TODO: make it configurable
    code = taosThreadCondTimedWait(&pmm->stateChanged, &rtner->tsdb->mutex, &ts);
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      tsdbError("vgId:%d, fid:%d, waiting leader migration timed out", vid, fset->fid);
      // a timeout is recorded as a failure, which also ends the wait loop
      pmm->state = SSMIGRATE_FILESET_STATE_FAILED;
    }
  }

  // sample the final state while still holding the mutex, then release it
  bool leaderSucceeded = (pmm->state == SSMIGRATE_FILESET_STATE_SUCCEEDED);
  TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));

  if (!leaderSucceeded) {
    tsdbInfo("vgId:%d, fid:%d, follower migration skipped because leader migration skipped or failed", vid, fset->fid);
    return 0;
  }

  // NOTE: After the leader node finished processing the current file set and mnode sent the final
  // follower migrate request, mnode waits at least 30 seconds before triggering the processing of
  // the next file set. Because all file sets of a vgroup shares the same [pmm], from now on, the
  // follower should not access [pmm->state] anymore.
  // refer comments in mndUpdateSsMigrateProgress for more details.

  tsdbInfo("vgId:%d, fid:%d, follower migration started, begin downloading manifest...", vid, fset->fid);
  code = downloadManifest(rtner->tsdb->pVnode, fset->fid, &pRemoteFset);
  if (code == TSDB_CODE_NOT_FOUND) {
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }
  if (code != TSDB_CODE_SUCCESS) {
    // any other download failure must stop here: the remote file set is not
    // usable and must not be dereferenced below.
    tsdbError("vgId:%d, fid:%d, follower migration cancelled, failed to download manifest, code: %d", vid, fset->fid, code);
    goto _exit;
  }

  // this often happens in the catch up process of a new node, it is ok to continue, but will
  // result in download same files more than once, which is a waste of time and bandwidth.
  if (pRemoteFset->farr[TSDB_FTYPE_HEAD]->f->mid != getSsMigrateId(rtner->tsdb)) {
    tsdbWarn("vgId:%d, fid:%d, follower migration cancelled, migration id mismatch", vid, fset->fid);
    code = 0;
    goto _exit;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, manifest downloaded, begin downloading head file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_HEAD]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_HEAD]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append head file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // sma file
  tsdbInfo("vgId:%d, fid:%d, head file downloaded, begin downloading sma file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_SMA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_SMA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append sma file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // tomb file: it may exist on either side, both sides, or neither
  tsdbInfo("vgId:%d, fid:%d, sma file downloaded, begin downloading tomb file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  if (fset->farr[TSDB_FTYPE_TOMB] != NULL && pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (fset->farr[TSDB_FTYPE_TOMB] != NULL) {
    // the remote tomb file is not found, but local tomb file exists, we should remove it
    op = (STFileOp) {.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp) {.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  }
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append tomb file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // stt files: queue removal of every local stt file, then download and
  // queue creation of every remote one
  tsdbInfo("vgId:%d, fid:%d, tomb file downloaded, begin downloading stt files", vid, fset->fid);
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      op = (STFileOp) {.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file remove operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }
  TARRAY2_FOREACH(pRemoteFset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = downloadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _exit;
      }
      op = (STFileOp) {.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file create operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }

  // data file
  tsdbInfo("vgId:%d, fid:%d, stt files downloaded, begin downloading data file", vid, fset->fid);
  code = downloadDataFileLastChunk(rtner, pRemoteFset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_DATA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_DATA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append data file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  tsdbInfo("vgId:%d, fid:%d, data file downloaded", vid, fset->fid);

_exit:
  // single release point for the downloaded manifest on every path after the download
  tsdbTFileSetClear(&pRemoteFset);
  return code;
}
861

862

863
/*
 * Leader side of the shared-storage migration of one file set.
 *
 * After shouldMigrate() approves the file set, uploads the head, sma, tomb,
 * stt and data files in that order. Any upload failure marks the migration
 * state as FAILED and returns the failing code.
 *
 * Returns TSDB_CODE_SUCCESS when all uploads succeed, the code reported by
 * shouldMigrate() when the migration is not performed, or the upload error.
 */
static int32_t tsdbLeaderDoSsMigrate(SRTNer *rtner) {
  int32_t code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet *fset = rtner->fset;

  tsdbInfo("vgId:%d, fid:%d, vnode is leader, migration started", vid, fset->fid);

  // no state is set on this path here; the code to return is whatever
  // shouldMigrate() stored in [code]
  if (!shouldMigrate(rtner, &code)) {
    return code;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, begin migrate head file", vid, fset->fid);
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, head file migrated, begin migrate sma file", vid, fset->fid);

  // sma file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, sma file migrated, begin migrate tomb file", vid, fset->fid);

  // tomb file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, tomb file migrated, begin migrate stt files", vid, fset->fid);

  // stt files
  SSttLvl* lvl;
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = uploadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _failed;
      }
    }
  }

  tsdbInfo("vgId:%d, fid:%d, stt files migrated, begin migrate data file", vid, fset->fid);

  // data file
  code = uploadDataFile(rtner, fset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  return TSDB_CODE_SUCCESS;

_failed:
  // shared failure exit for every upload step
  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
  return code;
}
926

927

928
/*
 * Entry point of the migration of one file set: dispatches to the leader or
 * the follower routine and returns that routine's result.
 */
int32_t tsdbDoSsMigrate(SRTNer *rtner) {
  // note: leader is decided when the task is scheduled, the actual leader may change after that,
  // but this is ok.
  bool scheduledAsLeader = (rtner->nodeId == vnodeNodeId(rtner->tsdb->pVnode));

  return scheduledAsLeader ? tsdbLeaderDoSsMigrate(rtner) : tsdbFollowerDoSsMigrate(rtner);
}
936

937

938
#ifdef USE_SHARED_STORAGE
939

940
/*
 * Collects into [fidArr] the ids of the file sets that are candidates for
 * shared-storage migration.
 *
 * A file set is skipped when it is expired, when it is still to be kept
 * local, when it has no data file, when its data file (or last chunk) cannot
 * be stat'ed, or when that file is not larger than
 * tsdbPageSize * ssChunkSize bytes.
 *
 * The scan runs with tsdb->mutex held.
 *
 * Returns 0 on success, or the error (terrno) of a failed push into [fidArr];
 * in that case [fidArr] holds the ids collected before the failure.
 */
int32_t tsdbListSsMigrateFileSets(STsdb *tsdb, SArray* fidArr) {
  int32_t vid = TD_VID(tsdb->pVnode), code = 0;
  int64_t now = taosGetTimestampSec();
  SVnodeCfg *pCfg = &tsdb->pVnode->config;

  (void)taosThreadMutexLock(&tsdb->mutex);

  STFileSet *fset;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    if (tsdbFidLevel(fset->fid, &tsdb->keepCfg, now) < 0) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, file set is expired", vid, fset->fid);
      continue;
    }

    if (tsdbSsFidLevel(fset->fid, &tsdb->keepCfg, pCfg->ssKeepLocal, now) < 1) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, keep local file set", vid, fset->fid);
      continue;
    }

    STFileObj *fdata = fset->farr[TSDB_FTYPE_DATA];
    if (fdata == NULL) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, no data file", vid, fset->fid);
      continue;
    }

    // with lcn <= 1 the data file itself is checked; otherwise its last chunk
    char path[TSDB_FILENAME_LEN];
    if (fdata->f->lcn <= 1) {
      strcpy(path, fdata->fname);
    } else {
      tsdbTFileLastChunkName(tsdb, fdata->f, path);
    }

    // use a dedicated variable for the stat result: a stat failure only skips
    // this file set and must not shadow or overwrite the function result.
    int64_t size = 0;
    int32_t statCode = taosStatFile(path, &size, NULL, NULL);
    if (statCode != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, migration skipped, failed to stat file since %s", vid, fset->fid, tstrerror(statCode));
      continue;
    }

    if (size <= (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is too small, size: %" PRId64 " bytes", vid, fset->fid, size);
      continue;
    }

    if (taosArrayPush(fidArr, &fset->fid) == NULL) {
      // this now reaches the caller through the function-level [code]
      code = terrno;
      tsdbError("vgId:%d, failed to push file set id %d to array since %s", vid, fset->fid, tstrerror(code));
      break;
    }
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
  return code;
}
994

995

996
/*
 * Schedules the asynchronous migration task of the file set named by [pReq].
 * The only caller in this file, tsdbAsyncSsMigrateFileSet(), holds tsdb->mutex
 * around the call.
 *
 * The request is rejected with TSDB_CODE_FAILED when background tasks are
 * disabled, when the migration monitor is missing, when a previous migration
 * is still in progress, or when the file set was already migrated at or after
 * the requested start time; with TSDB_CODE_NOT_FOUND when the file set does
 * not exist. Otherwise the monitor is set to IN_PROGRESS for this request and
 * the task is queued; if queuing fails the monitor is set to FAILED and the
 * scheduling error is returned.
 */
static int32_t tsdbAsyncMigrateFileSetImpl(STsdb *tsdb,  const SSsMigrateFileSetReq *pReq) {
  int32_t vid = TD_VID(tsdb->pVnode), code = 0;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    tsdbInfo("vgId:%d, ssMigrateId:%d, background task is disabled, skip", vid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  // same guard as tsdbStopSsMigrateTask: the monitor may be absent
  SSsMigrateMonitor* pmm = tsdb->pSsMigrateMonitor;
  if (pmm == NULL) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, migration monitor is not available", vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  if (pmm->fid != 0 && pmm->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, failed to monitor since previous migration is still in progress",
              vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  // look up the requested file set
  STFileSet *fset = NULL, *fset1;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset1) {
    if (fset1->fid == pReq->fid) {
      fset = fset1;
      break;
    }
  }

  if (fset == NULL) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, file set not found", vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_NOT_FOUND;
  }

  // lastMigrate is divided by 1000 before being compared with the request's
  // start time in seconds
  if (fset->lastMigrate/1000 >= pReq->startTimeSec) {
    tsdbInfo("vgId:%d, fid:%d, ssMigrateId:%d, start time < last migration time, skip", vid, fset->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
  if (arg == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, memory allocation failed", vid, pReq->fid, pReq->ssMigrateId);
    return code;
  }

  arg->tsdb = tsdb;
  arg->tw.skey = INT64_MIN;
  arg->tw.ekey = pReq->startTimeSec;
  arg->fid = fset->fid;
  arg->nodeId = pReq->nodeId;
  arg->optrType = TSDB_OPTR_SSMIGRATE;
  arg->lastCommit = fset->lastCommit;

  // publish the new migration in the monitor before the task is queued
  pmm->ssMigrateId = pReq->ssMigrateId;
  pmm->fid = fset->fid;
  pmm->state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  pmm->startTimeSec = pReq->startTimeSec;

  code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg, &fset->migrateTask);
  if (code) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, schedule async task failed", vid, pReq->fid, pReq->ssMigrateId);
    // the task was not queued, so [arg] is released here
    taosMemoryFree(arg);
    pmm->state = SSMIGRATE_FILESET_STATE_FAILED;
  }

  return code;
}
1059

1060

1061
/*
 * Locked wrapper around tsdbAsyncMigrateFileSetImpl(): takes tsdb->mutex,
 * runs the implementation, releases the mutex and returns its result.
 */
int32_t tsdbAsyncSsMigrateFileSet(STsdb *tsdb, SSsMigrateFileSetReq *pReq) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t result = tsdbAsyncMigrateFileSetImpl(tsdb, pReq);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
1070

1071

1072
/*
 * Cancels the migration task identified by [ssMigrateId], if the monitor
 * currently tracks that id and its state is IN_PROGRESS. Does nothing when
 * there is no monitor, the id does not match, or no migration is in progress.
 * All checks and the cancellation run with tsdb->mutex held.
 */
void tsdbStopSsMigrateTask(STsdb* tsdb, int32_t ssMigrateId) {
  (void)taosThreadMutexLock(&tsdb->mutex);

  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;
  if (monitor != NULL) {
    if (monitor->ssMigrateId != ssMigrateId) {
      tsdbInfo("vgId:%d, ssMigrateId:%d, migration task not found", TD_VID(tsdb->pVnode), ssMigrateId);
    } else if (monitor->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
      // cancel the task of the file set the monitor is tracking
      STFileSet *candidate;
      TARRAY2_FOREACH(tsdb->pFS->fSetArr, candidate) {
        if (candidate->fid == monitor->fid) {
          (void)vnodeACancel(&candidate->migrateTask);
          break;
        }
      }
    }
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
1101

1102
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc