• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.99
/source/dnode/vnode/src/tsdb/tsdbMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23

24
extern int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool ssMigrate);
25

26

27
// migrate monitor related functions
28
// Per-tsdb bookkeeping for shared-storage migration. One instance is allocated by
// tsdbOpenSsMigrateMonitor() and stored in tsdb->pSsMigrateMonitor.
typedef struct SSsMigrateMonitor {
29
  TdThreadCond  stateChanged;  // broadcast by tsdbUpdateSsMigrateState(); waited on together with tsdb->mutex
30
  SVnodeSsMigrateState state;  // dnode/vgroup ids, migrate ids, start time and the per-file-set state array
31
} SSsMigrateMonitor;
32

33

34
// Return the vnode-side migration id currently stored in the tsdb's migrate monitor.
// The monitor pointer is dereferenced without a NULL check.
static int32_t getSsMigrateId(STsdb* tsdb) {
  const SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  return monitor->state.vnodeMigrateId;
}
37

38

39
// Allocate and initialize the migrate monitor of `tsdb`.
//
// On success the monitor is stored in tsdb->pSsMigrateMonitor and 0 is returned.
// If either allocation fails, nothing is stored and TSDB_CODE_OUT_OF_MEMORY is returned.
int32_t tsdbOpenSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor* monitor = (SSsMigrateMonitor*)taosMemCalloc(1, sizeof(SSsMigrateMonitor));
  if (monitor == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // array of per-file-set migration states, initial capacity 16
  monitor->state.pFileSetStates = taosArrayInit(16, sizeof(SFileSetSsMigrateState));
  if (monitor->state.pFileSetStates != NULL) {
    // the result of the condition variable initialization is deliberately ignored
    TAOS_UNUSED(taosThreadCondInit(&monitor->stateChanged, NULL));

    monitor->state.dnodeId = vnodeNodeId(tsdb->pVnode);
    monitor->state.vgId = TD_VID(tsdb->pVnode);

    tsdb->pSsMigrateMonitor = monitor;
    return 0;
  }

  taosMemoryFree(monitor);
  return TSDB_CODE_OUT_OF_MEMORY;
}
59

60

61
// Destroy the migrate monitor of `tsdb` (condition variable, state contents and the
// monitor itself) and reset tsdb->pSsMigrateMonitor to NULL. Does nothing when no
// monitor is attached.
void tsdbCloseSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  if (monitor != NULL) {
    TAOS_UNUSED(taosThreadCondDestroy(&monitor->stateChanged));
    tFreeSVnodeSsMigrateState(&monitor->state);
    taosMemoryFree(tsdb->pSsMigrateMonitor);
    tsdb->pSsMigrateMonitor = NULL;
  }
}
72

73
// Start tracking a new migration identified by `ssMigrateId`.
//
// Returns false, leaving the monitor untouched, if any tracked file set is still in
// progress. Otherwise clears the file set list, records the new vnode migrate id and
// the current time, resets the mnode migrate id to 0, and returns true.
// This function does not take tsdb->mutex itself.
bool tsdbResetSsMigrateMonitor(STsdb *tsdb, int32_t ssMigrateId) {
  SVnodeSsMigrateState* st = &tsdb->pSsMigrateMonitor->state;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(st->pFileSetStates)) {
    SFileSetSsMigrateState *fs = taosArrayGet(st->pFileSetStates, idx);
    if (fs->state == FILE_SET_MIGRATE_STATE_IN_PROGRESS) {
      return false;
    }
    idx++;
  }

  st->mnodeMigrateId = 0;
  st->vnodeMigrateId = ssMigrateId;
  st->startTimeSec = taosGetTimestampSec();
  taosArrayClear(st->pFileSetStates);
  return true;
}
87

88
// Append file set `fid` to the monitor's state list, marked as in progress.
// Returns 0 on success or terrno when the push fails.
int32_t tsdbSsMigrateMonitorAddFileSet(STsdb *tsdb, int32_t fid) {
  // no need to lock mutex here, since the caller should have already locked it
  SFileSetSsMigrateState entry = { .fid = fid, .state = FILE_SET_MIGRATE_STATE_IN_PROGRESS };
  void* pushed = taosArrayPush(tsdb->pSsMigrateMonitor->state.pFileSetStates, &entry);
  return (pushed == NULL) ? terrno : 0;
}
96

97
// Set the migration state of the first tracked file set whose id equals `fid`.
// The update is done under tsdb->mutex; if the mutex cannot be locked, or the file
// set is not tracked, nothing is changed.
void tsdbSsMigrateMonitorSetFileSetState(STsdb *tsdb, int32_t fid, int32_t state) {
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;

  if (taosThreadMutexLock(&tsdb->mutex) != TSDB_CODE_SUCCESS) {
    return;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(monitor->state.pFileSetStates)) {
    SFileSetSsMigrateState *entry = taosArrayGet(monitor->state.pFileSetStates, idx);
    if (entry->fid == fid) {
      entry->state = state;
      break;
    }
    idx++;
  }

  TAOS_UNUSED(taosThreadMutexUnlock(&tsdb->mutex));
}
114

115
// Serialize the current migration state of `tsdb` into a newly allocated response.
//
// ssMigrateId: stored into the state as the mnode migrate id before serializing.
// rspSize:     receives the serialized size.
// ppRsp:       receives the buffer obtained from rpcMallocCont().
//
// Returns 0 on success, the lock error if tsdb->mutex cannot be locked,
// TSDB_CODE_INVALID_MSG if the size computation fails, or TSDB_CODE_OUT_OF_MEMORY
// if the response buffer cannot be allocated.
int32_t tsdbQuerySsMigrateProgress(STsdb *tsdb, int32_t ssMigrateId, int32_t *rspSize, void** ppRsp) {
  SVnodeSsMigrateState *snapshot = &tsdb->pSsMigrateMonitor->state;

  int32_t rc = taosThreadMutexLock(&tsdb->mutex);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  snapshot->mnodeMigrateId = ssMigrateId;

  // first call has no output buffer; its result is used as the buffer size
  *rspSize = tSerializeSQuerySsMigrateProgressRsp(NULL, 0, snapshot);
  if (*rspSize < 0) {
    TAOS_UNUSED(taosThreadMutexUnlock(&tsdb->mutex));
    return TSDB_CODE_INVALID_MSG;
  }

  *ppRsp = rpcMallocCont(*rspSize);
  bool allocated = (*ppRsp != NULL);
  if (allocated) {
    TAOS_UNUSED(tSerializeSQuerySsMigrateProgressRsp(*ppRsp, *rspSize, snapshot));
  }
  TAOS_UNUSED(taosThreadMutexUnlock(&tsdb->mutex));

  if (!allocated) {
    tsdbError("rpcMallocCont %d failed", *rspSize);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;
}
144

145
// Merge a migration state received from another node into the local monitor.
//
// pState: the remote state. It is ignored when it carries this node's own dnode id.
//
// Only local file sets that are still in progress are touched:
//   - if the remote mnode/vnode migrate id differs from the local vnode migrate id,
//     the file set is marked as failed;
//   - if the remote state lists the same fid with a state other than in progress,
//     that state is copied;
//   - if the remote state does not list the fid at all, the file set is marked as failed.
// The stateChanged condition is always broadcast before the mutex is released.
//
// Returns 0 on success (including the skipped cases) or the lock error code.
int32_t tsdbUpdateSsMigrateState(STsdb* tsdb, SVnodeSsMigrateState* pState) {
  int32_t vid = TD_VID(tsdb->pVnode), code = 0;

  // the state was generated by this vnode, so no need to process it
  if (pState->dnodeId == vnodeNodeId(tsdb->pVnode)) {
    tsdbDebug("vgId:%d, skip migration state update since it was generated by this vnode", vid);
    return 0;
  }

  // check the monitor itself: the address of its 'state' member can never be NULL,
  // so testing that address would not detect a missing monitor
  SSsMigrateMonitor* pmm = tsdb->pSsMigrateMonitor;
  if (pmm == NULL) {
    tsdbDebug("vgId:%d, skip migration state update since local state not found", vid);
    return 0;
  }
  SVnodeSsMigrateState *pLocalState = &pmm->state;

  code = taosThreadMutexLock(&tsdb->mutex);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pLocalState->pFileSetStates); i++) {
    SFileSetSsMigrateState *pLocalFileSet = taosArrayGet(pLocalState->pFileSetStates, i);
    if (pLocalFileSet->state != FILE_SET_MIGRATE_STATE_IN_PROGRESS) {
      continue; // only update the in-progress file sets
    }

    // a wrong case
    if (pState->mnodeMigrateId != pLocalState->vnodeMigrateId) {
      pLocalFileSet->state = FILE_SET_MIGRATE_STATE_FAILED;
      tsdbError("vgId:%d, fid:%d, set migration state to failed since mnode migrate id mismatch", vid, pLocalFileSet->fid);
      continue;
    }

    // another wrong case
    if (pState->vnodeMigrateId != pLocalState->vnodeMigrateId) {
      pLocalFileSet->state = FILE_SET_MIGRATE_STATE_FAILED;
      tsdbError("vgId:%d, fid:%d, set migration state to failed since vnode migrate id mismatch", vid, pLocalFileSet->fid);
      continue;
    }

    bool found = false;
    for (int32_t j = 0; j < taosArrayGetSize(pState->pFileSetStates); j++) {
      SFileSetSsMigrateState *pRemoteFileSet = taosArrayGet(pState->pFileSetStates, j);
      if (pLocalFileSet->fid == pRemoteFileSet->fid) {
        found = true;
        if (pRemoteFileSet->state != FILE_SET_MIGRATE_STATE_IN_PROGRESS) {
          pLocalFileSet->state = pRemoteFileSet->state;
          tsdbDebug("vgId:%d, fid:%d, migration state was updated to %d", vid, pLocalFileSet->fid, pLocalFileSet->state);
        }
        break;
      }
    }

    // the leader vnode does not have this file set, so it will never be migrated; mark it
    // as failed to avoid waiting forever.
    if (!found) {
      pLocalFileSet->state = FILE_SET_MIGRATE_STATE_FAILED;
      tsdbDebug("vgId:%d, fid:%d, set migration state to failed since remote state not found", vid, pLocalFileSet->fid);
    }
  }

  // always broadcast the state change, so that the waiting threads won't time out
  TAOS_UNUSED(taosThreadCondBroadcast(&pmm->stateChanged));
  TAOS_UNUSED(taosThreadMutexUnlock(&tsdb->mutex));
  return 0;
}
212

213
// migrate file related functions
214
// Decide whether file set `fid` is old enough to live on shared storage.
//
// nowSec is converted to the precision of pKeepCfg, shifted back by keepTimeOffset
// (scaled by tsTickPerHour) and then by ssKeepLocal (scaled by tsTickPerMin). The file
// id of the resulting key is the oldest file set that stays local.
//
// Returns 0 when fid is at or after that boundary, 1 when it is before it.
int32_t tsdbSsFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t ssKeepLocal, int64_t nowSec) {
  switch (pKeepCfg->precision) {
    case TSDB_TIME_PRECISION_MILLI:
      nowSec = nowSec * 1000;
      break;
    case TSDB_TIME_PRECISION_MICRO:
      nowSec = nowSec * 1000000l;
      break;
    case TSDB_TIME_PRECISION_NANO:
      nowSec = nowSec * 1000000000l;
      break;
    default:
      break;
  }

  nowSec = nowSec - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  TSKEY   boundary = nowSec - ssKeepLocal * tsTickPerMin[pKeepCfg->precision];
  int32_t boundaryFid = tsdbKeyFid(boundary, pKeepCfg->days, pKeepCfg->precision);

  if (fid >= boundaryFid) {
    return 0;
  }
  return 1;
}
233

234

235
// Download and parse the manifest of file set `fid` from shared storage.
//
// The manifest is read from "vnode<vid>/f<fid>/manifests.json", parsed as JSON and
// converted into a file set stored in *ppFileSet.
//
// Returns TSDB_CODE_SUCCESS on success. On failure returns the error of the failing
// storage/convert call, TSDB_CODE_OUT_OF_MEMORY if the read buffer cannot be allocated,
// or TSDB_CODE_FILE_CORRUPTED if the content is not valid JSON, carries a different
// fid, or has no data file. In the latter two cases the parsed file set is cleared
// before returning.
static int32_t downloadManifest(SVnode* pVnode, int32_t fid, STFileSet** ppFileSet) {
  int32_t code = 0, vid = TD_VID(pVnode);

  char path[64];
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vid, fid);
  int64_t size = 0;
  code = tssGetFileSizeOfDefault(path, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to get manifests size since %s", vid, fid, tstrerror(code));
    return code;
  }

  // one extra byte for the terminating NUL required by cJSON_Parse
  char* buf = taosMemoryMalloc(size + 1);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    tsdbError("vgId:%d, fid:%d, failed to allocate manifest buffer since %s", vid, fid, tstrerror(code));
    return code;
  }

  code = tssReadFileFromDefault(path, 0, buf, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to read manifest from shared storage since %s", vid, fid, tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }
  buf[size] = 0;

  cJSON* json = cJSON_Parse(buf);
  taosMemoryFree(buf);
  if (json == NULL) {
    // 'code' is still success here, so report the code that is actually returned
    code = TSDB_CODE_FILE_CORRUPTED;
    tsdbError("vgId:%d, fid:%d, failed to parse manifest json since %s", vid, fid, tstrerror(code));
    return code;
  }

  code = tsdbJsonToTFileSet(pVnode->pTsdb, json, ppFileSet);
  cJSON_Delete(json);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to parse manifest since %s", vid, fid, tstrerror(code));
    return code;
  }

  STFileSet* fset = *ppFileSet;
  if (fset->fid != fid) {
    tsdbError("vgId:%d, fid:%d, mismatch fid, manifest fid is %d", vid, fid, fset->fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (fset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, data file not found in manifest", vid, fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return code;
}
284

285

286
// Build the manifest of `fset` and upload it to shared storage as
// "vnode<vnode>/f<fid>/manifests.json".
//
// dnode, vnode: written to the manifest as the "dnode" and "vnode" fields.
// fset:         file set to describe; the migration id of every file it references
//               (head, sma, data, tomb and all stt files) is set to `mid` first.
// mid:          migration id to stamp on the files.
//
// Returns TSDB_CODE_SUCCESS on success, otherwise the error of the failing step.
static int32_t uploadManifest(int32_t dnode, int32_t vnode, STFileSet* fset, int32_t mid) {
  int32_t code = 0;

  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // update migration id for all files in the file set; any of these entries may be
  // absent, so each one is guarded the same way the tomb file is
  const int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_SMA, TSDB_FTYPE_DATA, TSDB_FTYPE_TOMB};
  STFileObj* fobj = NULL;
  for (size_t i = 0; i < sizeof(ftypes) / sizeof(ftypes[0]); i++) {
    fobj = fset->farr[ftypes[i]];
    if (fobj != NULL) {
      fobj->f->mid = mid;
    }
  }

  SSttLvl* lvl;
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      fobj->f->mid = mid;
    }
  }

  // "fmtv" is written as the constant 1
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL ||
      cJSON_AddNumberToObject(json, "dnode", dnode) == NULL ||
      cJSON_AddNumberToObject(json, "vnode", vnode) == NULL) {
    code = terrno;
    cJSON_Delete(json);
    return code;
  }

  code = tsdbTFileSetToJson(fset, json);
  if (code != TSDB_CODE_SUCCESS) {
    cJSON_Delete(json);
    return code;
  }

  char* buf = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  if (buf == NULL) {
    // printing failed; strlen(NULL) below would be undefined behavior
    code = TSDB_CODE_OUT_OF_MEMORY;
    tsdbError("vgId:%d, fid:%d, failed to upload manifest since %s", vnode, fset->fid, tstrerror(code));
    return code;
  }

  char path[64];
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vnode, fset->fid);
  code = tssUploadToDefault(path, buf, strlen(buf));
  taosMemoryFree(buf);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to upload manifest since %s", vnode, fset->fid, tstrerror(code));
  }

  return code;
}
348

349

350
// upload local files to shared storage
351
//
352
// local file name is like:
353
//     [base]/vnode2/f1736/v2f1736ver16.head
354
// or
355
//     [base]/vnode2/f1736/v2f1736ver16.m334233.head
356
//
357
// remote file name is like:
358
//     vnode2/f1736/v2f1736ver16.m13552343.head
359
//
360
// NOTE: the migration id is always included in the remote file name, because
361
// the commit id may be different between the vnodes of the same vgroup,
362
// that is, an interrupted migration may overwrite some of the remote files
363
// while leaving the others intact if we don't include the migration id in
364
// the remote file name.
365
// Upload the whole local file behind `fobj` to shared storage.
//
// The remote name is rebuilt from the file's fid and cid plus the current migration id,
// and keeps the extension of the local file name. A NULL `fobj` is treated as
// "nothing to upload" and reported as success.
//
// Returns 0 on success or the error code of the upload.
static int32_t uploadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj != NULL) {
    const char* suffix = strrchr(fobj->fname, '.');
    int32_t     vid = TD_VID(rtner->tsdb->pVnode);
    int32_t     mid = getSsMigrateId(rtner->tsdb);
    STFile*     file = fobj->f;

    char remote[TSDB_FILENAME_LEN];
    snprintf(remote, sizeof(remote), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d%s", vid, file->fid, vid, file->fid, file->cid, mid, suffix);

    // offset 0 and size -1 are passed through to the upload call unchanged
    int rc = tssUploadFileToDefault(remote, fobj->fname, 0, -1);
    if (rc != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to upload file %s since %s", vid, file->fid, fobj->fname, tstrerror(rc));
      return rc;
    }
    return 0;
  }

  return TSDB_CODE_SUCCESS;
}
385

386

387

388
// Download the remote copy of `fobj` from shared storage into its local path.
//
// The remote path is "vnode<vid>/f<fid>/<base name of the local file>". A NULL `fobj`
// is treated as "nothing to download" and reported as success.
//
// Returns 0 on success or the error code of the download.
static int32_t downloadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj != NULL) {
    // base name: everything after the last directory separator
    const char* base = strrchr(fobj->fname, TD_DIRSEP_CHAR) + 1;
    int32_t     vid = TD_VID(rtner->tsdb->pVnode);
    STFile*     file = fobj->f;

    char remote[TSDB_FILENAME_LEN];
    snprintf(remote, sizeof(remote), "vnode%d/f%d/%s", vid, file->fid, base);

    int rc = tssDownloadFileFromDefault(remote, fobj->fname, 0, -1);
    if (rc != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to download file %s since %s", vid, file->fid, remote, tstrerror(rc));
      return rc;
    }
    return 0;
  }

  return TSDB_CODE_SUCCESS;
}
407

408

409
// Remove remote files under "vnode<vid>/f<fid>/" that are not referenced by `fset`.
//
// Every listed path is classified by its extension:
//   .head/.sma/.tomb  kept only if the name parses as v<vid>f<fid>ver<cid>.m<mid>.<ext>
//                     and matches the corresponding file of `fset`;
//   .stt              kept only if it matches one of the stt files of `fset`;
//   .data             chunks named v..ver<cid>.<cn>.data are kept when 1 <= cn < lcn;
//                     chunks named v..ver<cid>.m<mid>.<cn>.data are kept only when
//                     mid matches and cn == lcn;
//   other             only "manifests.json" is kept.
// A file whose counterpart is missing from `fset` is treated as garbage.
//
// Failures are logged only; this function returns nothing. Each listed path string is
// released with taosMemoryFree after it has been processed.
static void tsdbRemoveSsGarbageFiles(int32_t vid, STFileSet* fset) {
  char prefix[TSDB_FILENAME_LEN];
  snprintf(prefix, sizeof(prefix), "vnode%d/f%d/", vid, fset->fid);

  SArray* paths = taosArrayInit(10, sizeof(char*));
  if (paths == NULL) {
    tsdbError("vgId:%d, fid:%d, failed to list files in shared storage since %s", vid, fset->fid,
              tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return;
  }

  int32_t code = tssListFileOfDefault(prefix, paths);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to list files in shared storage since %s", vid, fset->fid, tstrerror(code));
    taosArrayDestroy(paths);
    return;
  }

  for (int i = 0; i < taosArrayGetSize(paths); i++) {
    char*       p = *(char**)taosArrayGet(paths, i);
    const char* ext = strrchr(p, '.');
    const char* slash = strrchr(p, '/');
    const char* rname = (slash != NULL) ? slash + 1 : p;  // fall back to the whole path if it has no '/'
    bool        remove = true;
    int32_t     vgId = 0, fid = 0, mid = 0, cn = 0;
    int64_t     cid = 0;

    // NOTE: when compare file name, don't use strcmp(fobj->fname, rname) because 'fobj->fname' may not
    // contain the migration id. that's why we use sscanf to parse the file name.

    if (ext == NULL) {
      // no extension, remove
    } else if (taosStrcasecmp(ext, ".head") == 0) {
      STFileObj* fobj = fset->farr[TSDB_FTYPE_HEAD];
      if (fobj) {
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.head", &vgId, &fid, &cid, &mid);
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
      }
    } else if (taosStrcasecmp(ext, ".sma") == 0) {
      STFileObj* fobj = fset->farr[TSDB_FTYPE_SMA];
      if (fobj) {
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.sma", &vgId, &fid, &cid, &mid);
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
      }
    } else if (taosStrcasecmp(ext, ".tomb") == 0) {
      STFileObj* fobj = fset->farr[TSDB_FTYPE_TOMB];
      if (fobj) {
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.tomb", &vgId, &fid, &cid, &mid);
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
      }
    } else if (taosStrcasecmp(ext, ".stt") == 0) {
      // the remote name does not depend on the candidate stt file, so parse it once
      int      n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.stt", &vgId, &fid, &cid, &mid);
      SSttLvl* lvl = NULL;
      TARRAY2_FOREACH(fset->lvlArr, lvl) {
        STFileObj* fobj;
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
          if (n == 4 && vgId == vid && fid == fset->fid && cid == fobj->f->cid && mid == fobj->f->mid) {
            remove = false;
            break;
          }
        }
        if (!remove) {
          break;
        }
      }
    } else if (taosStrcasecmp(ext, ".data") == 0) {
      STFileObj* fobj = fset->farr[TSDB_FTYPE_DATA];
      if (fobj) {
        int n = sscanf(rname, "v%df%dver%" PRId64 ".%d.data", &vgId, &fid, &cid, &cn);
        if (n == 4) {
          if (vgId == vid && fid == fset->fid && cid == fobj->f->cid && cn >= 1 && cn < fobj->f->lcn) {
            remove = false; // not the last chunk, keep it
          }
        } else {
          n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.%d.data", &vgId, &fid, &cid, &mid, &cn);
          remove = (n != 5 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid || cn != fobj->f->lcn);
        }
      }
    } else {
      remove = (taosStrcasecmp(rname, "manifests.json") != 0); // keep manifest, remove all other files
    }

    if (remove) {
      int32_t rc = tssDeleteFileFromDefault(p);
      if (rc != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to remove garbage file %s from shared storage since %s", vid, fset->fid, p, tstrerror(rc));
      } else {
        tsdbInfo("vgId:%d, fid:%d, garbage file %s is removed from shared storage", vid, fset->fid, p);
      }
    }

    taosMemoryFree(p);
  }

  taosArrayDestroy(paths);
}
492

493

494
// download the last chunk of a data file
495
// remote file name is like:
496
//      vnode2/f1736/v2f1736ver16.m13552343.4.data
497
// Download the last chunk of the data file `fobj` from shared storage.
//
// The local path comes from tsdbTFileLastChunkName(); the remote path is
// "vnode<vid>/f<fid>/<base name of that local path>".
//
// Returns TSDB_CODE_SUCCESS on success or the error code of the download.
static int32_t downloadDataFileLastChunk(SRTNer* rtner, STFileObj* fobj) {
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  STFile *f = fobj->f;

  char lpath[TSDB_FILENAME_LEN], rpath[TSDB_FILENAME_LEN];
  tsdbTFileLastChunkName(rtner->tsdb, f, lpath);

  // base name of the local chunk; use the whole path if it has no directory separator
  const char* sep = strrchr(lpath, TD_DIRSEP_CHAR);
  const char* fname = (sep != NULL) ? sep + 1 : lpath;

  // bounded formatting: the prefix plus the base name must not overrun rpath
  snprintf(rpath, sizeof(rpath), "vnode%d/f%d/%s", vid, f->fid, fname);

  int32_t code = tssDownloadFileFromDefault(rpath, lpath, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to download data file %s since %s", vid, f->fid, rpath, tstrerror(code));
  }

  return code;
}
517

518

519
// while other files all include the migration id in the remote file name, only the last
520
// chunk of a data file does the same. this is ok because:
521
// 1. without a compaction, data file is always uploaded chunk by chunk, only the last
522
//    chunk may be modified.
523
// 2. after a compaction, all of the data is downloaded to local, so overwriting remote
524
//    data chunks won't cause any problem. (this is not likely to happen because we will
525
//    cancel the migration in this case, refer to the comment in function shouldMigrate).
526
// Migrate the data file `fobj` of rtner->fset to shared storage, chunk by chunk.
//
// Steps, in order:
//   1. stat the local file that holds the not-yet-migrated tail (the data file itself
//      when lcn <= 1, otherwise the local last-chunk file) and compute the chunk count;
//   2. upload every chunk from the current last chunk number onward; only the final
//      chunk carries the migration id in its remote name;
//   3. record the file modification (new lcn) in rtner->fopArr;
//   4. upload the manifest, which commits the migration, then remove remote garbage
//      and mark the file set as succeeded in the migrate monitor;
//   5. if new chunks were produced, copy the tail of the local file into the new
//      local last-chunk file.
//
// Returns 0 on success, otherwise the error code of the failing step. When the
// manifest upload fails, the file set is marked as failed in the migrate monitor.
static int32_t uploadDataFile(SRTNer* rtner, STFileObj* fobj) {
  int32_t code = 0;
  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  int64_t szFile = 0, szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
  STFile *f = fobj->f;
  STFileOp op = {.optype = TSDB_FOP_MODIFY, .fid = f->fid, .of = *f};

  char path[TSDB_FILENAME_LEN];
  if (f->lcn <= 1) {
    snprintf(path, sizeof(path), "%s", fobj->fname);
  } else {
    tsdbTFileLastChunkName(rtner->tsdb, f, path);
  }

  code = taosStatFile(path, &szFile, NULL, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d failed to stat file %s since %s", vid, f->fid, path, tstrerror(code));
    // propagate the failure; returning 'false' here would be reported as success (0)
    return code;
  }

  if (f->lcn > 1) {
    szFile += szChunk * (f->lcn - 1); // add the size of migrated chunks
  }

  int totalChunks = szFile / szChunk;
  if (szFile % szChunk) {
    totalChunks++;
  }

  int lcn = f->lcn < 1 ? 1 : f->lcn;

  // upload chunks one by one, the first chunk may already been uploaded, but may be
  // modified thereafter, so we need to upload it again
  for (int i = lcn; i <= totalChunks; ++i) {
    int64_t offset = (int64_t)(i - lcn) * szChunk;
    int64_t size = szChunk;
    if (i == totalChunks && szFile % szChunk) {
      size = szFile % szChunk;
    }

    // only include the migration id in the last chunk filename
    char rpath[TSDB_FILENAME_LEN];
    if (i == totalChunks) {
      snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d.%d.data", vid, f->fid, vid, f->fid, f->cid, mid, i);
    } else {
      snprintf(rpath, sizeof(rpath), "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, f->fid, vid, f->fid, f->cid, i);
    }

    code = tssUploadFileToDefault(rpath, path, offset, size);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to migrate data file since %s", vid, f->fid, tstrerror(code));
      return code;
    }
  }

  f->lcn = totalChunks;
  op.nf = *f;
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append file operation since %s", vid, f->fid, tstrerror(code));
    return code;
  }

  // manifest must be uploaded before copy last chunk, otherwise, failed to upload manifest
  // will result in a broken migration
  tsdbInfo("vgId:%d, fid:%d, data file migrated, begin generate & upload manifest", vid, f->fid);

  // manifest, this also commit the migration; reuse the migration id that was used
  // for the remote chunk names above so that both always agree
  code = uploadManifest(vnodeNodeId(rtner->tsdb->pVnode), vid, rtner->fset, mid);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbSsMigrateMonitorSetFileSetState(rtner->tsdb, f->fid, FILE_SET_MIGRATE_STATE_FAILED);
    return code;
  }

  tsdbInfo("vgId:%d, fid:%d, manifest uploaded, begin remove garbage files", vid, f->fid);
  tsdbRemoveSsGarbageFiles(vid, rtner->fset);

  tsdbSsMigrateMonitorSetFileSetState(rtner->tsdb, f->fid, FILE_SET_MIGRATE_STATE_SUCCEEDED);
  tsdbInfo("vgId:%d, fid:%d, leader migration succeeded", vid, f->fid);

  // no new chunks generated, no need to copy the last chunk
  if (totalChunks == lcn) {
    return 0;
  }

  // copy the last chunk to the new file
  char newPath[TSDB_FILENAME_LEN];
  tsdbTFileLastChunkName(rtner->tsdb, &op.nf, newPath);

  int64_t offset = (int64_t)(totalChunks - lcn) * szChunk;
  int64_t size = szChunk;
  if (szFile % szChunk) {
    size = szFile % szChunk;
  }

  TdFilePtr fdFrom = taosOpenFile(path, TD_FILE_READ);
  if (fdFrom == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to open source file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;
  }

  TdFilePtr fdTo = taosOpenFile(newPath, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
  if (fdTo == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to open target file %s since %s", vid, f->fid, newPath, tstrerror(code));
    TAOS_UNUSED(taosCloseFile(&fdFrom));
    return code;
  }

  int64_t n = taosFSendFile(fdTo, fdFrom, &offset, size);
  if (n < 0) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, failed to copy file %s to %s since %s", vid, f->fid, path, newPath, tstrerror(code));
  }
  TAOS_UNUSED(taosCloseFile(&fdFrom));
  TAOS_UNUSED(taosCloseFile(&fdTo));

  return code;
}
647

648

649
// Decide whether the file set of `rtner` should be migrated to shared storage now.
//
// pCode receives TSDB_CODE_SUCCESS when the decision is a normal one (migrate or skip),
// or an error code when the migration is cancelled because of a failure or an
// inconsistency between the local file set and the remote manifest.
//
// Returns true only when the migration should proceed. The checks are, in order:
// the local data file exists, no commit happened since the task was scheduled, no
// compaction is needed (one is scheduled when ssCompact is set and lcn < 0), the file
// can be stat'ed, is larger than one chunk and was not modified within
// tsSsUploadDelaySec, the file set is neither expired nor still to be kept local, and
// finally the remote manifest (if any) is consistent with the local file and older
// than it.
static bool shouldMigrate(SRTNer *rtner, int32_t *pCode) {
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  STFileSet *pLocalFset = rtner->fset;
  STFileObj *flocal = pLocalFset->farr[TSDB_FTYPE_DATA];

  *pCode = 0;
  if (!flocal) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, local data file not exist", vid, pLocalFset->fid);
    return false;
  }

  if (rtner->lastCommit != pLocalFset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, there are new commits after migration task is scheduled", vid, pLocalFset->fid);
    return false;
  }

  if (pCfg->ssCompact && flocal->f->lcn < 0) {
    int32_t lcn = flocal->f->lcn;
    STimeWindow win = {0};
    tsdbFidKeyRange(pLocalFset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
    *pCode = tsdbAsyncCompact(rtner->tsdb, &win, true);
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, fileset need compact, lcn: %d", vid, pLocalFset->fid, lcn);
    return false; // compact in progress
  }

  // the file to inspect is the data file itself until it has been split into chunks,
  // afterwards it is the local last-chunk file
  char path[TSDB_FILENAME_LEN];
  if (flocal->f->lcn <= 1) {
    strcpy(path, flocal->fname);
  } else {
    tsdbTFileLastChunkName(rtner->tsdb, flocal->f, path);
  }

  int64_t mtime = 0, size = 0;
  *pCode = taosStatFile(path, &size, &mtime, NULL);
  if (*pCode != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to stat file %s since %s", vid, pLocalFset->fid, path, tstrerror(*pCode));
    return false;
  }

  if (size <= (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is too small, size: %" PRId64 " bytes", vid, pLocalFset->fid, size);
    return false; // file too small, no need to migrate
  }

  if (mtime >= rtner->now - tsSsUploadDelaySec) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is active writting, modified at %" PRId64, vid, pLocalFset->fid, mtime);
    return false; // still active writing, postpone migration
  }

  if (tsdbFidLevel(pLocalFset->fid, &rtner->tsdb->keepCfg, rtner->now) < 0) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, file set is expired", vid, pLocalFset->fid);
    return false; // file set expired
  }

  if (tsdbSsFidLevel(pLocalFset->fid, &rtner->tsdb->keepCfg, pCfg->ssKeepLocal, rtner->now) < 1) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, keep local file set", vid, pLocalFset->fid);
    return false; // keep on local storage
  }

  // download manifest from shared storage
  STFileSet *pRemoteFset = NULL;
  *pCode = downloadManifest(rtner->tsdb->pVnode, pLocalFset->fid, &pRemoteFset);
  if (*pCode == TSDB_CODE_SUCCESS) {
    // remote file exists but local file has not been migrated, there are two possibilities:
    // 1. there's a compact after the last migration, this is a normal case, we can discard
    //    the remote files and continue the migration;
    // 2. in the last migration, this node was a follower, the leader did its job successfully,
    //    but this node did not, continue the migration may overwrite remote file and result in
    //    data corruption on other nodes.
    // however, it is hard to distinguish them, so just treat both as a migration error. hope
    // the user could do something to recover, such as remove remote files.
    if (flocal->f->lcn < 1) {
      tsdbTFileSetClear(&pRemoteFset);
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest found but local lcn < 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      return false;
    }

  } else if (*pCode == TSDB_CODE_NOT_FOUND) {
    if (flocal->f->lcn >= 1) {
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest not found but local lcn >= 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      return false;
    }

    // this is the first migration, we should do it.
    *pCode = TSDB_CODE_SUCCESS;
    return true;

  } else {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to download manifest, code: %d", vid, pLocalFset->fid, *pCode);
    return false;
  }

  STFileObj *fremote = pRemoteFset->farr[TSDB_FTYPE_DATA];
  if (fremote == NULL) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, cannot find data file information from remote manifest", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    return false;
  }

  if (fremote->f->lcn != flocal->f->lcn) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, remote and local data file information mismatch", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    return false;
  }

  if (fremote->f->maxVer == flocal->f->maxVer) {
    tsdbTFileSetClear(&pRemoteFset);
    // a normal skip (pCode stays success), so log at info level like the other skip paths
    tsdbInfo("vgId:%d, fid:%d, migration skipped, no new data", vid, pLocalFset->fid);
    return false; // no new data
  }

  tsdbTFileSetClear(&pRemoteFset); // we use the local file set information for migration
  tsdbInfo("vgId:%d, fid:%d, file set will be migrated", vid, pLocalFset->fid);
  return true;
}
769

770

771
// Follower-side migration of one file set.
//
// The follower does not upload anything. It waits (under tsdb->mutex, on the monitor's
// stateChanged condition) until the leader's migration of this file set leaves the
// IN_PROGRESS state, and, if the leader succeeded, downloads the remote manifest and the
// files it describes and appends the matching file operations to rtner->fopArr.
//
// Returns 0 when the migration was performed or deliberately skipped (new commits since
// scheduling, no state entry for the file set, leader skipped/failed, migration id mismatch),
// otherwise the error code of the step that failed.
static int32_t tsdbFollowerDoSsMigrate(SRTNer *rtner) {
  int32_t code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet *fset = rtner->fset;
  SSsMigrateMonitor* pmm = rtner->tsdb->pSsMigrateMonitor;
  SFileSetSsMigrateState *pState = NULL;
  int32_t fsIdx = 0;
  int32_t fsState = 0;

  // though we make this check in the leader node, we should do this in the follower nodes too.
  // because there may be a leader change and the execution order of async tasks may result in
  // different commit time. if we don't do this, we may corrupt the follower data.
  if (rtner->lastCommit != fset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, follower migration cancelled, there are new commits after migration is scheduled", vid, fset->fid);
    return 0;
  }

  tsdbInfo("vgId:%d, fid:%d, vnode is follower, waiting leader on node %d to upload.", vid, fset->fid, rtner->nodeId);

  code = taosThreadMutexLock(&rtner->tsdb->mutex);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // locate the state entry of this file set; pState stays NULL when there is none, so that
  // we never fall back to the state of an unrelated file set.
  for (fsIdx = 0; fsIdx < taosArrayGetSize(pmm->state.pFileSetStates); fsIdx++) {
    SFileSetSsMigrateState *pCandidate = taosArrayGet(pmm->state.pFileSetStates, fsIdx);
    if (pCandidate != NULL && pCandidate->fid == fset->fid) {
      pState = pCandidate;
      break;
    }
  }

  if (pState == NULL) {
    TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
    tsdbError("vgId:%d, fid:%d, follower migration skipped, file set state not found", vid, fset->fid);
    return 0;
  }

  while (pState->state == FILE_SET_MIGRATE_STATE_IN_PROGRESS) {
    struct timespec ts;
    if ((code = taosClockGetTime(CLOCK_REALTIME, &ts)) != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to get current time since %s", vid, fset->fid, tstrerror(code));
      TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
      return code;
    }
    ts.tv_sec += 30; // TODO: make it configurable
    code = taosThreadCondTimedWait(&pmm->stateChanged, &rtner->tsdb->mutex, &ts);

    // the mutex was released while waiting, so fetch the entry again instead of reusing the
    // pointer obtained before the wait.
    pState = taosArrayGet(pmm->state.pFileSetStates, fsIdx);
    if (pState == NULL) {
      TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
      tsdbError("vgId:%d, fid:%d, follower migration skipped, file set state not found", vid, fset->fid);
      return 0;
    }
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      tsdbError("vgId:%d, fid:%d, waiting leader migration timed out", vid, fset->fid);
      pState->state = FILE_SET_MIGRATE_STATE_FAILED;
    }
  }

  // take a snapshot of the final state while the mutex is still held; pState points into the
  // shared array and must not be read after unlocking.
  fsState = pState->state;
  TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));

  if (fsState != FILE_SET_MIGRATE_STATE_SUCCEEDED) {
    tsdbInfo("vgId:%d, fid:%d, follower migration skipped because leader migration skipped or failed", vid, fset->fid);
    return 0;
  }

  tsdbInfo("vgId:%d, fid:%d, follower migration started, begin downloading manifest...", vid, fset->fid);
  STFileSet *pRemoteFset = NULL;
  code = downloadManifest(rtner->tsdb->pVnode, fset->fid, &pRemoteFset);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    if (code == TSDB_CODE_NOT_FOUND) {
      // the leader reported success, so a missing manifest is reported as corruption
      return TSDB_CODE_FILE_CORRUPTED;
    }
    tsdbError("vgId:%d, fid:%d, follower migration failed, failed to download manifest since %s", vid, fset->fid, tstrerror(code));
    return code;
  }

  // the head, sma and data entries of the remote manifest are dereferenced unconditionally
  // below, so reject a manifest that lacks any of them.
  if (pRemoteFset == NULL || pRemoteFset->farr[TSDB_FTYPE_HEAD] == NULL ||
      pRemoteFset->farr[TSDB_FTYPE_SMA] == NULL || pRemoteFset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, follower migration cancelled, remote manifest is incomplete", vid, fset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  // this often happens in the catch up process of a new node, it is ok to continue, but will
  // result in download same files more than once, which is a waste of time and bandwidth.
  if (pRemoteFset->farr[TSDB_FTYPE_HEAD]->f->mid != getSsMigrateId(rtner->tsdb)) {
    tsdbTFileSetClear(&pRemoteFset);
    tsdbWarn("vgId:%d, fid:%d, follower migration cancelled, migration id mismatch", vid, fset->fid);
    return 0;
  }

  tsdbInfo("vgId:%d, fid:%d, manifest downloaded, begin downloading head file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    return code;
  }
  STFileOp op = {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_HEAD]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_HEAD]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    tsdbError("vgId:%d, fid:%d, failed to append head file operation since %s", vid, fset->fid, tstrerror(code));
    return code;
  }

  tsdbInfo("vgId:%d, fid:%d, head file downloaded, begin downloading sma file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    return code;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_SMA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_SMA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    tsdbError("vgId:%d, fid:%d, failed to append sma file operation since %s", vid, fset->fid, tstrerror(code));
    return code;
  }

  tsdbInfo("vgId:%d, fid:%d, sma file downloaded, begin downloading tomb file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    return code;
  }
  // the tomb file may be present on either side only, so pick modify / remove / create
  if (fset->farr[TSDB_FTYPE_TOMB] != NULL && pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (fset->farr[TSDB_FTYPE_TOMB] != NULL) {
    // the remote tomb file is not found, but local tomb file exists, we should remove it
    op = (STFileOp) {.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp) {.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  }
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    tsdbError("vgId:%d, fid:%d, failed to append tomb file operation since %s", vid, fset->fid, tstrerror(code));
    return code;
  }

  tsdbInfo("vgId:%d, fid:%d, tomb file downloaded, begin downloading stt files", vid, fset->fid);
  SSttLvl* lvl;
  // every local stt file is removed ...
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      op = (STFileOp) {.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTFileSetClear(&pRemoteFset);
        tsdbError("vgId:%d, fid:%d, failed to append stt file remove operation since %s", vid, fset->fid, tstrerror(code));
        return code;
      }
    }
  }
  // ... and every stt file listed in the remote manifest is downloaded and created
  TARRAY2_FOREACH(pRemoteFset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = downloadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTFileSetClear(&pRemoteFset);
        return code;
      }
      op = (STFileOp) {.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbTFileSetClear(&pRemoteFset);
        tsdbError("vgId:%d, fid:%d, failed to append stt file create operation since %s", vid, fset->fid, tstrerror(code));
        return code;
      }
    }
  }

  tsdbInfo("vgId:%d, fid:%d, stt files downloaded, begin downloading data file", vid, fset->fid);
  code = downloadDataFileLastChunk(rtner, pRemoteFset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    return code;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_DATA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_DATA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbTFileSetClear(&pRemoteFset);
    tsdbError("vgId:%d, fid:%d, failed to append data file operation since %s", vid, fset->fid, tstrerror(code));
    return code;
  }

  tsdbInfo("vgId:%d, fid:%d, data file downloaded", vid, fset->fid);
  tsdbTFileSetClear(&pRemoteFset);
  return 0;
}
940

941

942
// Leader-side migration of one file set.
//
// Asks shouldMigrate() whether the file set needs migrating; if not, records SKIPPED (when
// the returned code is success) or FAILED in the migrate monitor and returns that code.
// Otherwise uploads the head, sma, tomb, stt and data files in that order. The first upload
// that fails marks the file set FAILED in the monitor and its code is returned. On success
// this function returns TSDB_CODE_SUCCESS without touching the monitor state.
static int32_t tsdbLeaderDoSsMigrate(SRTNer *rtner) {
  STFileSet *fset = rtner->fset;
  int32_t    vid = TD_VID(rtner->tsdb->pVnode);
  int32_t    code = 0;
  SSttLvl   *lvl;

  tsdbInfo("vgId:%d, fid:%d, vnode is leader, migration started", vid, fset->fid);

  if (!shouldMigrate(rtner, &code)) {
    int32_t state = FILE_SET_MIGRATE_STATE_FAILED;
    if (code == TSDB_CODE_SUCCESS) {
      state = FILE_SET_MIGRATE_STATE_SKIPPED;
    }
    tsdbSsMigrateMonitorSetFileSetState(rtner->tsdb, fset->fid, state);
    return code;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, begin migrate head file", vid, fset->fid);
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }
  tsdbInfo("vgId:%d, fid:%d, head file migrated, begin migrate sma file", vid, fset->fid);

  // sma file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }
  tsdbInfo("vgId:%d, fid:%d, sma file migrated, begin migrate tomb file", vid, fset->fid);

  // tomb file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }
  tsdbInfo("vgId:%d, fid:%d, tomb file migrated, begin migrate stt files", vid, fset->fid);

  // stt files, level by level
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj *sttFile;
    TARRAY2_FOREACH(lvl->fobjArr, sttFile) {
      code = uploadFile(rtner, sttFile);
      if (code != TSDB_CODE_SUCCESS) {
        goto _failed;
      }
    }
  }
  tsdbInfo("vgId:%d, fid:%d, stt files migrated, begin migrate data file", vid, fset->fid);

  // data file
  code = uploadDataFile(rtner, fset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  return TSDB_CODE_SUCCESS;

_failed:
  // common exit for every failed upload: publish the failure, then report the code
  tsdbSsMigrateMonitorSetFileSetState(rtner->tsdb, fset->fid, FILE_SET_MIGRATE_STATE_FAILED);
  return code;
}
1007

1008

1009
// Entry point of the migration task for one file set: runs the leader path when the node id
// recorded in the task equals this vnode's node id, and the follower path otherwise.
int32_t tsdbDoSsMigrate(SRTNer *rtner) {
  // note: leader is decided when the task is scheduled, the actual leader may change after that,
  // but this is ok.
  bool scheduledAsLeader = (rtner->nodeId == vnodeNodeId(rtner->tsdb->pVnode));

  return scheduledAsLeader ? tsdbLeaderDoSsMigrate(rtner) : tsdbFollowerDoSsMigrate(rtner);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc