• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4688

26 Aug 2025 02:05PM UTC coverage: 56.997% (-0.9%) from 57.894%
#4688

push

travis-ci

web-flow
fix: modify the prompt language of the taos-shell (#32758)

* fix: modify prompt language

* fix: add shell test case

* fix: modify comments

* fix: modify test case for TDengine TSDB

130660 of 292423 branches covered (44.68%)

Branch coverage included in aggregate %.

16 of 17 new or added lines in 2 files covered. (94.12%)

9459 existing lines in 157 files now uncovered.

198294 of 284715 relevant lines covered (69.65%)

4532552.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.41
/source/dnode/vnode/src/tsdb/tsdbMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tss.h"
17
#include "tsdb.h"
18
#include "tsdbFS2.h"
19
#include "tsdbFSet2.h"
20
#include "vnd.h"
21
#include "tsdbInt.h"
22

23

24
extern int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool ssMigrate);
25

26

27
// migrate monitor related functions
28
typedef struct SSsMigrateMonitor {
29
  TdThreadCond  stateChanged;
30
  int32_t ssMigrateId;
31
  int32_t fid;
32
  int32_t state;
33
  int64_t startTimeSec;
34
} SSsMigrateMonitor;
35

36

UNCOV
37
// Returns the migration id stored in the migrate monitor of `tsdb`.
// The value is read without taking tsdb->mutex.
static int32_t getSsMigrateId(STsdb* tsdb) {
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
  return monitor->ssMigrateId;
}
40

41

42
// Allocates a zero-filled migrate monitor, initializes its condition variable
// and attaches it to `tsdb`.
//
// Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY when the allocation fails
// (in which case `tsdb` is left untouched).
int32_t tsdbOpenSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor *monitor = (SSsMigrateMonitor *)taosMemCalloc(1, sizeof(SSsMigrateMonitor));
  if (monitor != NULL) {
    // the result of the condition variable initialization is deliberately discarded
    TAOS_UNUSED(taosThreadCondInit(&monitor->stateChanged, NULL));
    tsdb->pSsMigrateMonitor = monitor;
    return 0;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}
51

52

53
// Destroys the condition variable of the migrate monitor attached to `tsdb`,
// frees the monitor and resets the pointer to NULL. Does nothing when no
// monitor is attached.
void tsdbCloseSsMigrateMonitor(STsdb *tsdb) {
  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;

  if (monitor != NULL) {
    TAOS_UNUSED(taosThreadCondDestroy(&monitor->stateChanged));
    taosMemoryFree(tsdb->pSsMigrateMonitor);
    tsdb->pSsMigrateMonitor = NULL;
  }
}
63

64

UNCOV
65
// Stores `state` into the migrate monitor of `tsdb` while holding tsdb->mutex.
// The stateChanged condition variable is not signalled here.
static void setMigrationState(STsdb* tsdb, int32_t state) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  {
    SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;
    monitor->state = state;
  }
  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
×
70

71

UNCOV
72
// Copies the current migration state into pProgress->state.
//
// The migration id and file set id carried by `pProgress` must both match the
// values held by the migrate monitor; otherwise the mismatch is logged and
// TSDB_CODE_INVALID_MSG is returned with pProgress->state left unchanged.
// The comparison and the copy are done under tsdb->mutex.
//
// Returns 0 on success.
int32_t tsdbQuerySsMigrateProgress(STsdb *tsdb, SSsMigrateProgress *pProgress) {
  int32_t            vgId = TD_VID(tsdb->pVnode);
  int32_t            result = 0;
  SSsMigrateMonitor *monitor = tsdb->pSsMigrateMonitor;

  (void)taosThreadMutexLock(&tsdb->mutex);

  bool sameMigration = (monitor->ssMigrateId == pProgress->ssMigrateId);
  bool sameFileSet = (monitor->fid == pProgress->fid);

  if (sameMigration && sameFileSet) {
    pProgress->state = monitor->state;
  } else if (!sameMigration) {
    // the migration id is checked first, so it takes precedence in the log
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in query progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
    result = TSDB_CODE_INVALID_MSG;
  } else {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in query progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
    result = TSDB_CODE_INVALID_MSG;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
95

96

97

98
// Applies a migration progress message to the migrate monitor.
//
// Messages whose nodeId equals this vnode's node id are ignored (return 0),
// because the state they carry was produced locally. For any other message the
// migration id and file set id must match the monitor; a mismatch is logged and
// TSDB_CODE_INVALID_MSG is returned. On a match the state is stored and
// stateChanged is broadcast, all under tsdb->mutex.
//
// Returns 0 on success.
int32_t tsdbUpdateSsMigrateProgress(STsdb* tsdb, SSsMigrateProgress* pProgress) {
  int32_t            vgId = TD_VID(tsdb->pVnode);
  int32_t            result = 0;
  SSsMigrateMonitor* monitor = tsdb->pSsMigrateMonitor;

  if (pProgress->nodeId == vnodeNodeId(tsdb->pVnode)) {
    tsdbDebug("vgId:%d, skip migration progress update since it was generated by this vnode", vgId);
    return 0;
  }

  (void)taosThreadMutexLock(&tsdb->mutex);

  bool sameMigration = (monitor->ssMigrateId == pProgress->ssMigrateId);
  bool sameFileSet = (monitor->fid == pProgress->fid);

  if (sameMigration && sameFileSet) {
    // Always broadcast, even when the state value did not actually change, so
    // that a waiting thread is woken up instead of running into its timeout.
    monitor->state = pProgress->state;
    (void)taosThreadCondBroadcast(&monitor->stateChanged);
  } else if (!sameMigration) {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, migrate id mismatch in update progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->ssMigrateId);
    result = TSDB_CODE_INVALID_MSG;
  } else {
    tsdbError("vgId:%d, ssMigrateId:%d, fid:%d, file set id mismatch in update progress, actual %d",
              vgId, pProgress->ssMigrateId, pProgress->fid, monitor->fid);
    result = TSDB_CODE_INVALID_MSG;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
130

131

132

133
// migrate file related functions
134
// Decides on which side of the "keep local" boundary a file set lies.
//
// `nowSec` is scaled to the precision of `pKeepCfg` (x1000 for milli,
// x1000000 for micro, x1000000000 for nano), then reduced by
// keepTimeOffset * tsTickPerHour[precision] and by
// ssKeepLocal * tsTickPerMin[precision]. The resulting key is mapped to a
// file set id with tsdbKeyFid().
//
// Returns 0 when `fid` is greater than or equal to that boundary file set id,
// 1 otherwise.
int32_t tsdbSsFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int32_t ssKeepLocal, int64_t nowSec) {
  int64_t now = nowSec;

  switch (pKeepCfg->precision) {
    case TSDB_TIME_PRECISION_MILLI:
      now = now * 1000;
      break;
    case TSDB_TIME_PRECISION_MICRO:
      now = now * 1000000l;
      break;
    case TSDB_TIME_PRECISION_NANO:
      now = now * 1000000000l;
      break;
    default:
      break;  // any other precision value leaves the timestamp unscaled
  }

  now = now - pKeepCfg->keepTimeOffset * tsTickPerHour[pKeepCfg->precision];

  TSKEY   boundaryKey = now - ssKeepLocal * tsTickPerMin[pKeepCfg->precision];
  int32_t boundaryFid = tsdbKeyFid(boundaryKey, pKeepCfg->days, pKeepCfg->precision);

  return (fid < boundaryFid) ? 1 : 0;
}
153

154

155
// Downloads "vnode<vid>/f<fid>/manifests.json" from the default shared storage
// and converts it into a file set.
//
// pVnode     vnode whose id selects the remote directory
// fid        file set id; must equal the fid recorded inside the manifest
// ppFileSet  receives the parsed file set on success; it is cleared again when
//            the manifest turns out to be inconsistent
//
// Returns TSDB_CODE_SUCCESS on success. Errors from the storage layer and from
// tsdbJsonToTFileSet() are returned unchanged, TSDB_CODE_OUT_OF_MEMORY is
// returned when the read buffer cannot be allocated, and
// TSDB_CODE_FILE_CORRUPTED is returned when the content is not valid JSON,
// records a different fid, or contains no data file.
static int32_t downloadManifest(SVnode* pVnode, int32_t fid, STFileSet** ppFileSet) {
  int32_t code = 0, vid = TD_VID(pVnode);

  char path[64];
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vid, fid);

  int64_t size = 0;
  code = tssGetFileSizeOfDefault(path, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to get manifests size since %s", vid, fid, tstrerror(code));
    return code;
  }

  // one extra byte for the terminating NUL required by cJSON_Parse
  char* buf = taosMemoryMalloc(size + 1);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    tsdbError("vgId:%d, fid:%d, failed to allocate manifest buffer since %s", vid, fid, tstrerror(code));
    return code;
  }

  code = tssReadFileFromDefault(path, 0, buf, &size);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to read manifest from shared storage since %s", vid, fid, tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }
  buf[size] = 0;

  cJSON* json = cJSON_Parse(buf);
  taosMemoryFree(buf);
  if (json == NULL) {
    // set the code before logging, otherwise the message would report "success"
    code = TSDB_CODE_FILE_CORRUPTED;
    tsdbError("vgId:%d, fid:%d, failed to parse manifest json since %s", vid, fid, tstrerror(code));
    return code;
  }

  code = tsdbJsonToTFileSet(pVnode->pTsdb, json, ppFileSet);
  cJSON_Delete(json);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to parse manifest since %s", vid, fid, tstrerror(code));
    return code;
  }

  STFileSet* fset = *ppFileSet;
  if (fset->fid != fid) {
    tsdbError("vgId:%d, fid:%d, mismatch fid, manifest fid is %d", vid, fid, fset->fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if (fset->farr[TSDB_FTYPE_DATA] == NULL) {
    tsdbError("vgId:%d, fid:%d, data file not found in manifest", vid, fid);
    tsdbTFileSetClear(ppFileSet);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  return code;
}
204

205

UNCOV
206
// Builds the manifest of a file set and uploads it to
// "vnode<vnode>/f<fid>/manifests.json" on the default shared storage.
//
// Before serialization the migration id `mid` is written into every file
// object of `fset` (head, sma, data, tomb and all stt files), so `fset` is
// modified even when a later step fails.
//
// dnode  value stored under the "dnode" key of the manifest
// vnode  value stored under the "vnode" key; also selects the remote directory
// fset   file set to describe
// mid    migration id to stamp on the files
//
// Returns TSDB_CODE_SUCCESS on success, otherwise the error of the step that
// failed (TSDB_CODE_OUT_OF_MEMORY when a JSON node or the output text cannot
// be created and no more specific error is available in terrno).
static int32_t uploadManifest(int32_t dnode, int32_t vnode, STFileSet* fset, int32_t mid) {
  int32_t code = 0;

  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // update migration id for all files in the file set; any of the fixed
  // slots may be empty, so skip the ones that are not present
  const int32_t ftypes[] = {TSDB_FTYPE_HEAD, TSDB_FTYPE_SMA, TSDB_FTYPE_DATA, TSDB_FTYPE_TOMB};
  STFileObj*    fobj = NULL;
  for (size_t i = 0; i < sizeof(ftypes) / sizeof(ftypes[0]); ++i) {
    fobj = fset->farr[ftypes[i]];
    if (fobj != NULL) {
      fobj->f->mid = mid;
    }
  }

  SSttLvl* lvl;
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      fobj->f->mid = mid;
    }
  }

  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL ||
      cJSON_AddNumberToObject(json, "dnode", dnode) == NULL ||
      cJSON_AddNumberToObject(json, "vnode", vnode) == NULL) {
    // cJSON does not set terrno itself, so never report success from here
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    cJSON_Delete(json);
    return code;
  }

  code = tsdbTFileSetToJson(fset, json);
  if (code != TSDB_CODE_SUCCESS) {
    cJSON_Delete(json);
    return code;
  }

  char* buf = cJSON_PrintUnformatted(json);
  cJSON_Delete(json);
  if (buf == NULL) {
    // serialization failed; strlen(NULL) below would be undefined behavior
    tsdbError("vgId:%d, fid:%d, failed to serialize manifest", vnode, fset->fid);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  char path[64];
  snprintf(path, sizeof(path), "vnode%d/f%d/manifests.json", vnode, fset->fid);
  code = tssUploadToDefault(path, buf, strlen(buf));
  taosMemoryFree(buf);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to upload manifest since %s", vnode, fset->fid, tstrerror(code));
    return code;
  }

  return code;
}
268

269

270
// upload local files to shared storage
271
//
272
// local file name is like:
273
//     [base]/vnode2/f1736/v2f1736ver16.head
274
// or
275
//     [base]/vnode2/f1736/v2f1736ver16.m334233.head
276
//
277
// remote file name is like:
278
//     vnode2/f1736/v2f1736ver16.m13552343.head
279
//
280
// NOTE: the migration id is always included in the remote file name, because
281
// the commit id may be different between the vnodes of the same vgroup,
282
// that's an interrupted migration may overwrite some of the remote files
283
// while leaving the others intact if we don't include the migration id in
284
// the remote file name.
UNCOV
285
static int32_t uploadFile(SRTNer* rtner, STFileObj* fobj) {
×
286
  if (fobj == NULL) {
×
287
    return TSDB_CODE_SUCCESS;
×
288
  }
289

290
  const char* ext = strrchr(fobj->fname, '.');
×
291
  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
×
UNCOV
292
  STFile* f = fobj->f;
×
293
  
294
  char path[TSDB_FILENAME_LEN];
295
  snprintf(path, sizeof(path), "vnode%d/f%d/v%df%dver%" PRId64 ".m%d%s", vid, f->fid, vid, f->fid, f->cid, mid, ext);
×
296

297
  int code = tssUploadFileToDefault(path, fobj->fname, 0, -1);
×
298
  if (code != TSDB_CODE_SUCCESS) {
×
299
    tsdbError("vgId:%d, fid:%d, failed to upload file %s since %s", vid, f->fid, fobj->fname, tstrerror(code));
×
300
    return code;
×
301
  }
302

303
  return 0;
×
304
}
305

306

307

308
// Downloads the remote counterpart of `fobj` from shared storage into
// fobj->fname. The remote path is "vnode<vid>/f<fid>/<base name of fobj->fname>".
//
// A NULL `fobj` means there is nothing to download and is reported as success.
// Returns 0 on success, otherwise the error code of the download.
static int32_t downloadFile(SRTNer* rtner, STFileObj* fobj) {
  if (fobj == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // take the part after the last directory separator; when the name has no
  // separator at all, the whole name is already the base name
  const char* sep = strrchr(fobj->fname, TD_DIRSEP_CHAR);
  const char* fname = (sep != NULL) ? sep + 1 : fobj->fname;

  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  STFile* f = fobj->f;

  char path[TSDB_FILENAME_LEN];
  snprintf(path, sizeof(path), "vnode%d/f%d/%s", vid, f->fid, fname);
  int code = tssDownloadFileFromDefault(path, fobj->fname, 0, -1);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to download file %s since %s", vid, f->fid, path, tstrerror(code));
    return code;
  }

  return 0;
}
327

328

329
static void tsdbRemoveSsGarbageFiles(int32_t vid, STFileSet* fset) {
×
330
  char prefix[TSDB_FILENAME_LEN];
331
  snprintf(prefix, sizeof(prefix), "vnode%d/f%d/", vid, fset->fid);
×
332

UNCOV
333
  SArray* paths = taosArrayInit(10, sizeof(char*));
×
334
  int32_t code = tssListFileOfDefault(prefix, paths);
×
335
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
336
    tsdbError("vgId:%d, fid:%d, failed to list files in shared storage since %s", vid, fset->fid, tstrerror(code));
×
UNCOV
337
    taosArrayDestroy(paths);
×
338
    return;
×
339
  }
340

341
  for(int i = 0; i < taosArrayGetSize(paths); i++) {
×
342
      char* p = *(char**)taosArrayGet(paths, i);
×
343
      const char* ext = strrchr(p, '.');
×
UNCOV
344
      const char* rname = strrchr(p, '/') + 1;
×
UNCOV
345
      bool remove = true;
×
346
      int32_t vgId = 0, fid = 0, mid = 0, cn = 0;
×
UNCOV
347
      int64_t cid = 0;
×
348
  
349
      // NOTE: when compare file name, don't use strcmp(fobj->fname, rname) because 'fobj->fname' may not
350
      // contain the migration id. that's why we use sscanf to parse the file name.
351

UNCOV
352
      if (ext == NULL) {
×
353
        // no extension, remove
UNCOV
354
      } else if (taosStrcasecmp(ext, ".head") == 0) {
×
UNCOV
355
        STFileObj* fobj = fset->farr[TSDB_FTYPE_HEAD];
×
UNCOV
356
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.head", &vgId, &fid, &cid, &mid);
×
UNCOV
357
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
UNCOV
358
      } else if (taosStrcasecmp(ext, ".sma") == 0) {
×
UNCOV
359
        STFileObj* fobj = fset->farr[TSDB_FTYPE_SMA];
×
UNCOV
360
        int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.sma", &vgId, &fid, &cid, &mid);
×
UNCOV
361
        remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
UNCOV
362
      } else if (taosStrcasecmp(ext, ".tomb") == 0) {
×
UNCOV
363
        STFileObj* fobj = fset->farr[TSDB_FTYPE_TOMB];
×
UNCOV
364
        if (fobj) {
×
365
          int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.tomb", &vgId, &fid, &cid, &mid);
×
366
          remove = (n != 4 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid);
×
367
        }
UNCOV
368
      } else if (taosStrcasecmp(ext, ".stt") == 0) {
×
UNCOV
369
        SSttLvl* lvl = NULL;
×
370
        TARRAY2_FOREACH(fset->lvlArr, lvl) {
×
371
          STFileObj* fobj;
372
          TARRAY2_FOREACH(lvl->fobjArr, fobj) {
×
UNCOV
373
            int n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.stt", &vgId, &fid, &cid, &mid);
×
UNCOV
374
            if (n == 4 && vgId == vid && fid == fset->fid && cid == fobj->f->cid && mid == fobj->f->mid) {
×
375
              remove = false;
×
UNCOV
376
              break;
×
377
            }
378
          }
379
          if (!remove) {
×
380
            break;
×
381
          }
382
        }
383
      } else if (taosStrcasecmp(ext, ".data") == 0) {
×
UNCOV
384
        STFileObj* fobj = fset->farr[TSDB_FTYPE_DATA];
×
UNCOV
385
        int n = sscanf(rname, "v%df%dver%" PRId64 ".%d.data", &vgId, &fid, &cid, &cn);
×
UNCOV
386
        if (n == 4) {
×
UNCOV
387
          if (vgId == vid && fid == fset->fid && cid == fobj->f->cid && cn >= 1 && cn < fobj->f->lcn) {
×
388
            remove = false; // not the last chunk, keep it
×
389
          }
390
        } else {
UNCOV
391
          n = sscanf(rname, "v%df%dver%" PRId64 ".m%d.%d.data", &vgId, &fid, &cid, &mid, &cn);
×
UNCOV
392
          remove = (n != 5 || vgId != vid || fid != fset->fid || cid != fobj->f->cid || mid != fobj->f->mid || cn != fobj->f->lcn);
×
393
        }
394
      } else {
395
        remove = (taosStrcasecmp(rname, "manifests.json") != 0); // keep manifest, remove all other files
×
396
      }
397

398
      if (remove) {
×
399
        int32_t code = tssDeleteFileFromDefault(p);
×
400
        if (code != TSDB_CODE_SUCCESS) {
×
401
          tsdbError("vgId:%d, fid:%d, failed to remove garbage file %s from shared storage since %s", vid, fset->fid, p, tstrerror(code));
×
402
        } else {
UNCOV
403
          tsdbInfo("vgId:%d, fid:%d, garbage file %s is removed from shared storage", vid, fset->fid, p);
×
404
        }
405
      }
406

UNCOV
407
      taosMemoryFree(p);
×
408
  }
409

UNCOV
410
  taosArrayDestroy(paths);
×
411
}
412

413

414
// download the last chunk of a data file
415
// remote file name is like:
416
//      vnode2/f1736/v2f1736ver16.m13552343.4.data
417
static int32_t downloadDataFileLastChunk(SRTNer* rtner, STFileObj* fobj) {
×
418
  int32_t code = 0;
×
UNCOV
419
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
×
UNCOV
420
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
×
421
  STFile *f = fobj->f;
×
422

423
  char lpath[TSDB_FILENAME_LEN], rpath[TSDB_FILENAME_LEN];
424
  tsdbTFileLastChunkName(rtner->tsdb, f, lpath);
×
425
  char* fname = strrchr(lpath, TD_DIRSEP_CHAR) + 1;
×
426

427
  sprintf(rpath, "vnode%d/f%d/%s", vid, f->fid, fname);
×
428

UNCOV
429
  code = tssDownloadFileFromDefault(rpath, lpath, 0, -1);
×
UNCOV
430
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
431
    tsdbError("vgId:%d, fid:%d, failed to download data file %s since %s", vid, f->fid, rpath, tstrerror(code));
×
432
    return code;
×
433
  }
434

435
  return code;
×
436
}
437

438

439
// while other files all include the migration id in the remote file name, only the last
440
// chunk of a data file does the same. this is ok because:
441
// 1. without a compaction, data file is always uploaded chunk by chunk, only the last
442
//    chunk may be modified.
443
// 2. after a compaction, all of the data is downloaded to local, so overwriting remote
444
//    data chunks won't cause any problem. (this is not likely to happen because we will
445
//    cancel the migration in this case, refer comment in function shouldMigrate).
446
static int32_t uploadDataFile(SRTNer* rtner, STFileObj* fobj) {
×
UNCOV
447
  int32_t code = 0;
×
448
  int32_t vid = TD_VID(rtner->tsdb->pVnode), mid = getSsMigrateId(rtner->tsdb);
×
449
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
×
450
  int64_t szFile = 0, szChunk = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
×
UNCOV
451
  STFile *f = fobj->f;
×
452
  STFileOp op = {.optype = TSDB_FOP_MODIFY, .fid = f->fid, .of = *f};
×
453

454
  char path[TSDB_FILENAME_LEN];
455
  if (f->lcn <= 1) {
×
456
    strcpy(path, fobj->fname);
×
457
  } else {
UNCOV
458
    tsdbTFileLastChunkName(rtner->tsdb, f, path);
×
459
  }
460

UNCOV
461
  code = taosStatFile(path, &szFile, NULL, NULL);
×
UNCOV
462
  if (code != TSDB_CODE_SUCCESS) {
×
463
    tsdbError("vgId:%d, fid:%d failed to stat file %s since %s", vid, f->fid, path, tstrerror(code));
×
464
    return false;
×
465
  }
466

467
  if (f->lcn > 1) {
×
468
    szFile += szChunk * (f->lcn - 1); // add the size of migrated chunks
×
469
  }
470

471
  int totalChunks = szFile / szChunk;
×
472
  if (szFile % szChunk) {
×
UNCOV
473
    totalChunks++;
×
474
  }
475

UNCOV
476
  int lcn = f->lcn < 1 ? 1 : f->lcn;
×
477

478
  // upload chunks one by one, the first chunk may already been uploaded, but may be
479
  // modified thereafter, so we need to upload it again
480
  for (int i = lcn; i <= totalChunks; ++i) {
×
481
    int64_t offset = (int64_t)(i - lcn) * szChunk;
×
UNCOV
482
    int64_t size = szChunk;
×
483
    if (i == totalChunks && szFile % szChunk) {
×
UNCOV
484
        size = szFile % szChunk;
×
485
    }
486

487
    // only include the migration id in the last chunk filename
488
    char rpath[TSDB_FILENAME_LEN];
UNCOV
489
    if (i == totalChunks) {
×
490
      sprintf(rpath, "vnode%d/f%d/v%df%dver%" PRId64 ".m%d.%d.data", vid, f->fid, vid, f->fid, f->cid, mid, i);
×
491
    } else {
UNCOV
492
      sprintf(rpath, "vnode%d/f%d/v%df%dver%" PRId64 ".%d.data", vid, f->fid, vid, f->fid, f->cid, i);
×
493
    }
494

UNCOV
495
    code = tssUploadFileToDefault(rpath, path, offset, size);
×
UNCOV
496
    if (code != TSDB_CODE_SUCCESS) {
×
497
      tsdbError("vgId:%d, fid:%d, failed to migrate data file since %s", vid, f->fid, tstrerror(code));
×
498
      return code;
×
499
    }
500
  }
501

UNCOV
502
  f->lcn = totalChunks;
×
UNCOV
503
  op.nf = *f;
×
504
  code = TARRAY2_APPEND(&rtner->fopArr, op);
×
505
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
506
    tsdbError("vgId:%d, fid:%d, failed to append file operation since %s", vid, f->fid, tstrerror(code));
×
507
    return code;
×
508
  }
509

510
  // manifest must be uploaded before copy last chunk, otherwise, failed to upload manifest
511
  // will result in a broken migration
512
  tsdbInfo("vgId:%d, fid:%d, data file migrated, begin generate & upload manifest", vid, f->fid);
×
513

514
  // manifest, this also commit the migration
515
  code = uploadManifest(vnodeNodeId(rtner->tsdb->pVnode), vid, rtner->fset, getSsMigrateId(rtner->tsdb));
×
UNCOV
516
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
517
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
×
UNCOV
518
    return code;
×
519
  }
520

UNCOV
521
  tsdbInfo("vgId:%d, fid:%d, manifest uploaded, begin remove garbage files", vid, f->fid);
×
UNCOV
522
  tsdbRemoveSsGarbageFiles(vid, rtner->fset);
×
523

UNCOV
524
  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SUCCEEDED);
×
UNCOV
525
  tsdbInfo("vgId:%d, fid:%d, leader migration succeeded", vid, f->fid);
×
526

527
  // no new chunks generated, no need to copy the last chunk
528
  if (totalChunks == lcn) {
×
529
    return 0;
×
530
  }
531

532
  // copy the last chunk to the new file
533
  char newPath[TSDB_FILENAME_LEN];
UNCOV
534
  tsdbTFileLastChunkName(rtner->tsdb, &op.nf, newPath);
×
535

536
  int64_t offset = (int64_t)(totalChunks - lcn) * szChunk;
×
UNCOV
537
  int64_t size = szChunk;
×
538
  if (szFile % szChunk) {
×
UNCOV
539
    size = szFile % szChunk;
×
540
  }
541

542
  TdFilePtr fdFrom = taosOpenFile(path, TD_FILE_READ);
×
543
  if (fdFrom == NULL) {
×
544
    code = terrno;
×
UNCOV
545
    tsdbError("vgId:%d, fid:%d, failed to open source file %s since %s", vid, f->fid, path, tstrerror(code));
×
UNCOV
546
    return code;
×
547
  }
548

UNCOV
549
  TdFilePtr fdTo = taosOpenFile(newPath, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
×
UNCOV
550
  if (fdTo == NULL) {
×
551
    code = terrno;
×
552
    tsdbError("vgId:%d, fid:%d, failed to open target file %s since %s", vid, f->fid, newPath, tstrerror(code));
×
553
    TAOS_UNUSED(taosCloseFile(&fdFrom));
×
UNCOV
554
    return code;
×
555
  }
556

UNCOV
557
  int64_t n = taosFSendFile(fdTo, fdFrom, &offset, size);
×
UNCOV
558
  if (n < 0) {
×
UNCOV
559
    code = terrno;
×
560
    tsdbError("vgId:%d, fid:%d, failed to copy file %s to %s since %s", vid, f->fid, path, newPath, tstrerror(code));
×
561
  }
562
  TAOS_UNUSED(taosCloseFile(&fdFrom));
×
563
  TAOS_UNUSED(taosCloseFile(&fdTo));
×
564

UNCOV
565
  return code;
×
566
}
567

568

569
// Decides whether the file set held by `rtner` should be migrated now.
//
// Returns true when the migration should proceed; *pCode is then
// TSDB_CODE_SUCCESS. Returns false when the migration is skipped, deferred to
// a compaction, or failed; in that case the migration state is updated
// (SKIPPED, COMPACT or FAILED) and *pCode holds the error code for failures
// and 0 for plain skips.
static bool shouldMigrate(SRTNer *rtner, int32_t *pCode) {
  int32_t vid = TD_VID(rtner->tsdb->pVnode);
  SVnodeCfg *pCfg = &rtner->tsdb->pVnode->config;
  STFileSet *pLocalFset = rtner->fset;
  STFileObj *flocal = pLocalFset->farr[TSDB_FTYPE_DATA];

  *pCode = 0;
  if (!flocal) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, local data file not exist", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  if (rtner->lastCommit != pLocalFset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, there are new commits after migration task is scheduled", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false;
  }

  // with ssCompact enabled, a negative lcn triggers an asynchronous compaction
  // of the file set's time range instead of a migration
  if (pCfg->ssCompact && flocal->f->lcn < 0) {
    int32_t lcn = flocal->f->lcn;
    STimeWindow win = {0};
    tsdbFidKeyRange(pLocalFset->fid, rtner->tsdb->keepCfg.days, rtner->tsdb->keepCfg.precision, &win.skey, &win.ekey);
    *pCode = tsdbAsyncCompact(rtner->tsdb, &win, true);
    tsdbInfo("vgId:%d, fid:%d, migration cancelled, fileset need compact, lcn: %d", vid, pLocalFset->fid, lcn);
    if (*pCode) {
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    } else {
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_COMPACT);
    }
    return false; // compact in progress
  }

  // the local file whose modification time tells whether data is still being written
  char path[TSDB_FILENAME_LEN];
  if (flocal->f->lcn <= 1) {
    snprintf(path, sizeof(path), "%s", flocal->fname);
  } else {
    tsdbTFileLastChunkName(rtner->tsdb, flocal->f, path);
  }

  int64_t mtime = 0;
  *pCode = taosStatFile(path, NULL, &mtime, NULL);
  if (*pCode != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to stat file %s since %s", vid, pLocalFset->fid, path, tstrerror(*pCode));
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // 'mtime >= rtner->now - tsSsUploadDelaySec' means the file is active writing, and we should skip
  // the migration. However, this may also be a result of the [ssCompact] option, which should not
  // be skipped, so we also check 'mtime > pLocalFset->lastCompact / 1000 || !pCfg->ssCompact', note
  // this is not an accurate condition, but is simple and good enough.
  if (mtime >= rtner->now - tsSsUploadDelaySec && (mtime > pLocalFset->lastCompact / 1000 || !pCfg->ssCompact)) {
    tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is active writting, modified at %" PRId64, vid, pLocalFset->fid, mtime);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false; // still active writing, postpone migration
  }

  // download manifest from shared storage
  STFileSet *pRemoteFset = NULL;
  *pCode = downloadManifest(rtner->tsdb->pVnode, pLocalFset->fid, &pRemoteFset);
  if (*pCode == TSDB_CODE_SUCCESS) {
    // remote file exists but local file has not been migrated, there are two possibilities:
    // 1. there's a compact after the last migration, this is a normal case, we can discard
    //    the remote files and continue the migration;
    // 2. in the last migration, this node was a follower, the leader did its job successfully,
    //    but this node did not, continue the migration may overwrite remote file and result in
    //    data corruption on other nodes.
    // however, it is hard to distinguish them, so just treat both as a migration error. hope
    // the user could do something to recover, such as remove remote files.
    if (flocal->f->lcn < 1) {
      tsdbTFileSetClear(&pRemoteFset);
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest found but local lcn < 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
      return false;
    }

  } else if (*pCode == TSDB_CODE_NOT_FOUND) {
    if (flocal->f->lcn >= 1) {
      tsdbError("vgId:%d, fid:%d, migration cancelled, remote manifest not found but local lcn >= 1", vid, pLocalFset->fid);
      *pCode = TSDB_CODE_FILE_CORRUPTED;
      setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
      return false;
    }

    // this is the first migration, we should do it.
    *pCode = TSDB_CODE_SUCCESS;
    return true;

  } else {
    tsdbError("vgId:%d, fid:%d, migration cancelled, failed to download manifest, code: %d", vid, pLocalFset->fid, *pCode);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  // from here on a remote manifest exists and the local file was migrated before
  STFileObj *fremote = pRemoteFset->farr[TSDB_FTYPE_DATA];
  if (fremote == NULL) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, cannot find data file information from remote manifest", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  if (fremote->f->lcn != flocal->f->lcn) {
    tsdbError("vgId:%d, fid:%d, migration cancelled, remote and local data file information mismatch", vid, pLocalFset->fid);
    tsdbTFileSetClear(&pRemoteFset);
    *pCode = TSDB_CODE_FILE_CORRUPTED;
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
    return false;
  }

  if (fremote->f->maxVer == flocal->f->maxVer) {
    tsdbTFileSetClear(&pRemoteFset);
    // an up-to-date remote copy is a normal skip, not an error, so log it at
    // info level like the other skip paths
    tsdbInfo("vgId:%d, fid:%d, migration skipped, no new data", vid, pLocalFset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return false; // no new data
  }

  tsdbTFileSetClear(&pRemoteFset); // we use the local file set information for migration
  tsdbInfo("vgId:%d, fid:%d, file set will be migrated", vid, pLocalFset->fid);
  return true;
}
693

694

695
/*
 * Follower side of a shared-storage migration for rtner->fset.
 *
 * Steps:
 *   1. Skip (state SKIPPED, return 0) if the file set was committed again after
 *      the task was scheduled.
 *   2. Wait, under tsdb->mutex, until the migrate monitor leaves the IN_PROGRESS
 *      state; each wait is bounded to 30 seconds and a timeout marks the
 *      monitor FAILED.
 *   3. If the monitor did not end up SUCCEEDED, return 0 without doing anything.
 *   4. Download the remote manifest and the head/sma/tomb/stt files plus the
 *      last chunk of the data file, queueing the matching file operations in
 *      rtner->fopArr.
 *
 * Returns 0 on success or when the migration is skipped/cancelled, otherwise
 * the error code of the failing step. A missing manifest is reported as
 * TSDB_CODE_FILE_CORRUPTED.
 */
static int32_t tsdbFollowerDoSsMigrate(SRTNer *rtner) {
  int32_t            code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet         *fset = rtner->fset;
  SSsMigrateMonitor *pmm = rtner->tsdb->pSsMigrateMonitor;
  STFileSet         *pRemoteFset = NULL;
  STFileOp           op = {0};
  SSttLvl           *lvl;

  // though we make this check in the leader node, we should do this in the follower nodes too.
  // because there may be a leader change and the execution order of async tasks may result in
  // different commit time. if we don't do this, we may corrupt the follower data.
  if (rtner->lastCommit != fset->lastCommit) {
    tsdbInfo("vgId:%d, fid:%d, follower migration cancelled, there are new commits after migration is scheduled", vid, fset->fid);
    setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_SKIPPED);
    return 0;
  }

  tsdbInfo("vgId:%d, fid:%d, vnode is follower, waiting leader on node %d to upload.", vid, fset->fid, rtner->nodeId);

  (void)taosThreadMutexLock(&rtner->tsdb->mutex);

  while(pmm->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    struct timespec ts;
    if ((code = taosClockGetTime(CLOCK_REALTIME, &ts)) != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, failed to get current time since %s", vid, fset->fid, tstrerror(code));
      TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
      return code;
    }
    ts.tv_sec += 30; // TODO: make it configurable
    code = taosThreadCondTimedWait(&pmm->stateChanged, &rtner->tsdb->mutex, &ts);
    if (code == TSDB_CODE_TIMEOUT_ERROR) {
      tsdbError("vgId:%d, fid:%d, waiting leader migration timed out", vid, fset->fid);
      // written directly: the mutex guarding the monitor is already held here
      pmm->state = SSMIGRATE_FILESET_STATE_FAILED;
    }
  }

  if (pmm->state != SSMIGRATE_FILESET_STATE_SUCCEEDED) {
    TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));
    tsdbInfo("vgId:%d, fid:%d, follower migration skipped because leader migration skipped or failed", vid, fset->fid);
    return 0;
  }

  TAOS_UNUSED(taosThreadMutexUnlock(&rtner->tsdb->mutex));

  // NOTE: After the leader node finished processing the current file set and mnode sent the final
  // follower migrate request, mnode waits at least 30 seconds before triggering the processing of
  // the next file set. Because all file sets of a vgroup shares the same [pmm], from now on, the
  // follower should not access [pmm->state] anymore.
  // refer comments in mndUpdateSsMigrateProgress for more details.

  tsdbInfo("vgId:%d, fid:%d, follower migration started, begin downloading manifest...", vid, fset->fid);
  code = downloadManifest(rtner->tsdb->pVnode, fset->fid, &pRemoteFset);
  if (code == TSDB_CODE_NOT_FOUND) {
    // the leader reported success, so a missing manifest means the remote copy is broken
    code = TSDB_CODE_FILE_CORRUPTED;
    goto _exit;
  }
  if (code != TSDB_CODE_SUCCESS || pRemoteFset == NULL) {
    // any other download failure must stop here: pRemoteFset is dereferenced below.
    // NOTE(review): assumes downloadManifest leaves *pRemoteFset unusable on failure - confirm.
    if (code == TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_FILE_CORRUPTED;
    }
    tsdbError("vgId:%d, fid:%d, follower migration cancelled, failed to download manifest since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // this often happens in the catch up process of a new node, it is ok to continue, but will
  // result in download same files more than once, which is a waste of time and bandwidth.
  if (pRemoteFset->farr[TSDB_FTYPE_HEAD]->f->mid != getSsMigrateId(rtner->tsdb)) {
    tsdbWarn("vgId:%d, fid:%d, follower migration cancelled, migration id mismatch", vid, fset->fid);
    code = 0;
    goto _exit;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, manifest downloaded, begin downloading head file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_HEAD]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_HEAD]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append head file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // sma file
  tsdbInfo("vgId:%d, fid:%d, head file downloaded, begin downloading sma file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_SMA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_SMA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append sma file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // tomb file: may exist locally, remotely, on both sides, or on neither
  tsdbInfo("vgId:%d, fid:%d, sma file downloaded, begin downloading tomb file", vid, fset->fid);
  code = downloadFile(rtner, pRemoteFset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  if (fset->farr[TSDB_FTYPE_TOMB] != NULL && pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (fset->farr[TSDB_FTYPE_TOMB] != NULL) {
    // the remote tomb file is not found, but local tomb file exists, we should remove it
    op = (STFileOp) {.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  } else if (pRemoteFset->farr[TSDB_FTYPE_TOMB] != NULL) {
    op = (STFileOp) {.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *pRemoteFset->farr[TSDB_FTYPE_TOMB]->f};
    code = TARRAY2_APPEND(&rtner->fopArr, op);
  }
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append tomb file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  // stt files: drop every local stt file, then recreate from the remote set
  tsdbInfo("vgId:%d, fid:%d, tomb file downloaded, begin downloading stt files", vid, fset->fid);
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      op = (STFileOp) {.optype = TSDB_FOP_REMOVE, .fid = fset->fid, .of = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file remove operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }
  TARRAY2_FOREACH(pRemoteFset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = downloadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _exit;
      }
      op = (STFileOp) {.optype = TSDB_FOP_CREATE, .fid = fset->fid, .nf = *fobj->f};
      code = TARRAY2_APPEND(&rtner->fopArr, op);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("vgId:%d, fid:%d, failed to append stt file create operation since %s", vid, fset->fid, tstrerror(code));
        goto _exit;
      }
    }
  }

  // data file
  tsdbInfo("vgId:%d, fid:%d, stt files downloaded, begin downloading data file", vid, fset->fid);
  code = downloadDataFileLastChunk(rtner, pRemoteFset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }
  op = (STFileOp) {.optype = TSDB_FOP_MODIFY, .fid = fset->fid, .of = *fset->farr[TSDB_FTYPE_DATA]->f, .nf = *pRemoteFset->farr[TSDB_FTYPE_DATA]->f};
  code = TARRAY2_APPEND(&rtner->fopArr, op);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("vgId:%d, fid:%d, failed to append data file operation since %s", vid, fset->fid, tstrerror(code));
    goto _exit;
  }

  tsdbInfo("vgId:%d, fid:%d, data file downloaded", vid, fset->fid);

_exit:
  // single release point for the downloaded manifest on every path after the download
  tsdbTFileSetClear(&pRemoteFset);
  return code;
}
860

861

862
/*
 * Leader side of a shared-storage migration for rtner->fset.
 *
 * Asks shouldMigrate() whether the file set needs uploading; if not, returns
 * the code it produced. Otherwise uploads the head, sma, tomb, stt and data
 * files in that order. The first upload failure marks the migration state as
 * FAILED and returns that error code.
 *
 * Returns TSDB_CODE_SUCCESS once every file has been uploaded.
 */
static int32_t tsdbLeaderDoSsMigrate(SRTNer *rtner) {
  int32_t    code = 0, vid = TD_VID(rtner->tsdb->pVnode);
  STFileSet *fset = rtner->fset;

  tsdbInfo("vgId:%d, fid:%d, vnode is leader, migration started", vid, fset->fid);

  // no state update here: the reject paths of shouldMigrate visible in this
  // file record FAILED/SKIPPED themselves
  if (!shouldMigrate(rtner, &code)) {
    return code;
  }

  // head file
  tsdbInfo("vgId:%d, fid:%d, begin migrate head file", vid, fset->fid);
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_HEAD]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, head file migrated, begin migrate sma file", vid, fset->fid);

  // sma file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_SMA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, sma file migrated, begin migrate tomb file", vid, fset->fid);

  // tomb file
  code = uploadFile(rtner, fset->farr[TSDB_FTYPE_TOMB]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  tsdbInfo("vgId:%d, fid:%d, tomb file migrated, begin migrate stt files", vid, fset->fid);

  // stt files
  SSttLvl* lvl;
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
    STFileObj* fobj;
    TARRAY2_FOREACH(lvl->fobjArr, fobj) {
      code = uploadFile(rtner, fobj);
      if (code != TSDB_CODE_SUCCESS) {
        goto _failed;
      }
    }
  }

  tsdbInfo("vgId:%d, fid:%d, stt files migrated, begin migrate data file", vid, fset->fid);

  // data file
  code = uploadDataFile(rtner, fset->farr[TSDB_FTYPE_DATA]);
  if (code != TSDB_CODE_SUCCESS) {
    goto _failed;
  }

  return TSDB_CODE_SUCCESS;

_failed:
  // shared failure exit for every upload step
  setMigrationState(rtner->tsdb, SSMIGRATE_FILESET_STATE_FAILED);
  return code;
}
925

926

UNCOV
927
/*
 * Entry point of a shared-storage migration task: runs the leader procedure
 * when rtner->nodeId equals this vnode's node id, the follower procedure
 * otherwise, and returns that procedure's result.
 */
int32_t tsdbDoSsMigrate(SRTNer *rtner) {
  // note: leader is decided when the task is scheduled, the actual leader may change after that,
  // but this is ok.
  const bool scheduledAsLeader = (rtner->nodeId == vnodeNodeId(rtner->tsdb->pVnode));

  return scheduledAsLeader ? tsdbLeaderDoSsMigrate(rtner) : tsdbFollowerDoSsMigrate(rtner);
}
935

936

937
#ifdef USE_SHARED_STORAGE
938

UNCOV
939
/*
 * Collects into fidArr the ids of all file sets that are candidates for
 * shared-storage migration. The scan runs with tsdb->mutex held.
 *
 * A file set is skipped when:
 *   - tsdbFidLevel() reports a negative level (logged as expired),
 *   - tsdbSsFidLevel() reports a level below 1 (logged as keep local),
 *   - it has no data file,
 *   - the data file (or its last chunk when lcn > 1) cannot be stat'ed,
 *   - that file is not larger than tsdbPageSize * ssChunkSize bytes.
 *
 * Returns 0 on success, or the terrno of a failed taosArrayPush(); fidArr then
 * holds only the ids pushed before the failure.
 */
int32_t tsdbListSsMigrateFileSets(STsdb *tsdb, SArray* fidArr) {
  int32_t vid = TD_VID(tsdb->pVnode), code = 0;
  int64_t now = taosGetTimestampSec();
  SVnodeCfg *pCfg = &tsdb->pVnode->config;

  (void)taosThreadMutexLock(&tsdb->mutex);

  STFileSet *fset;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    if (tsdbFidLevel(fset->fid, &tsdb->keepCfg, now) < 0) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, file set is expired", vid, fset->fid);
      continue;
    }

    if (tsdbSsFidLevel(fset->fid, &tsdb->keepCfg, pCfg->ssKeepLocal, now) < 1) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, keep local file set", vid, fset->fid);
      continue;
    }

    STFileObj *fdata = fset->farr[TSDB_FTYPE_DATA];
    if (fdata == NULL) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, no data file", vid, fset->fid);
      continue;
    }

    // lcn <= 1: stat the data file itself; otherwise stat its last chunk
    char path[TSDB_FILENAME_LEN];
    if (fdata->f->lcn <= 1) {
      strcpy(path, fdata->fname);
    } else {
      tsdbTFileLastChunkName(tsdb, fdata->f, path);
    }

    // use a separate variable for the stat result: a stat failure only skips
    // this file set and must neither shadow nor overwrite the function result
    int64_t size = 0;
    int32_t statCode = taosStatFile(path, &size, NULL, NULL);
    if (statCode != TSDB_CODE_SUCCESS) {
      tsdbError("vgId:%d, fid:%d, migration skipped, failed to stat file since %s", vid, fset->fid, tstrerror(statCode));
      continue;
    }

    if (size <= (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize) {
      tsdbInfo("vgId:%d, fid:%d, migration skipped, data file is too small, size: %" PRId64 " bytes", vid, fset->fid, size);
      continue;
    }

    if (taosArrayPush(fidArr, &fset->fid) == NULL) {
      // this now updates the function-level code, so the failure reaches the caller
      code = terrno;
      tsdbError("vgId:%d, failed to push file set id %d to array since %s", vid, fset->fid, tstrerror(code));
      break;
    }
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
  return code;
}
993

994

UNCOV
995
/*
 * Schedules the asynchronous shared-storage migration of the file set named by
 * pReq->fid. In this file it is called with tsdb->mutex held
 * (see tsdbAsyncSsMigrateFileSet).
 *
 * Returns:
 *   - TSDB_CODE_FAILED    when background tasks are disabled, a previous
 *                         migration is still in progress, or the file set was
 *                         already migrated at or after pReq->startTimeSec;
 *   - TSDB_CODE_NOT_FOUND when no file set has the requested fid;
 *   - terrno              when the task argument cannot be allocated;
 *   - the vnodeAsync() result otherwise (0 on success).
 *
 * On the scheduling path the migrate monitor is set to IN_PROGRESS for this
 * request; if scheduling fails the monitor is set to FAILED and the argument
 * is freed.
 */
static int32_t tsdbAsyncMigrateFileSetImpl(STsdb *tsdb,  const SSsMigrateFileSetReq *pReq) {
  int32_t vid = TD_VID(tsdb->pVnode), code = 0;

  // check if background task is disabled
  if (tsdb->bgTaskDisabled) {
    tsdbInfo("vgId:%d, ssMigrateId:%d, background task is disabled, skip", vid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  SSsMigrateMonitor* pmm = tsdb->pSsMigrateMonitor;
  if (pmm->fid != 0 && pmm->state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, failed to monitor since previous migration is still in progress",
              vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  // must start as NULL: it is only assigned on a match, and the not-found
  // check below would otherwise read an indeterminate pointer
  STFileSet *fset = NULL, *fset1;
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset1) {
    if (fset1->fid == pReq->fid) {
      fset = fset1;
      break;
    }
  }

  if (fset == NULL) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, file set not found", vid, pReq->fid, pReq->ssMigrateId);
    return TSDB_CODE_NOT_FOUND;
  }

  // lastMigrate is divided by 1000 before being compared with the request's start time in seconds
  if (fset->lastMigrate/1000 >= pReq->startTimeSec) {
    tsdbInfo("vgId:%d, fid:%d, ssMigrate:%d, start time < last migration time, skip", vid, fset->fid, pReq->ssMigrateId);
    return TSDB_CODE_FAILED;
  }

  SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
  if (arg == NULL) {
    code = terrno;
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, memory allocation failed", vid, pReq->fid, pReq->ssMigrateId);
    return code;
  }

  arg->tsdb = tsdb;
  arg->now = pReq->startTimeSec;
  arg->fid = fset->fid;
  arg->nodeId = pReq->nodeId;
  arg->ssMigrate = true;
  arg->lastCommit = fset->lastCommit;

  // publish the new migration in the monitor before the task can start
  pmm->ssMigrateId = pReq->ssMigrateId;
  pmm->fid = fset->fid;
  pmm->state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  pmm->startTimeSec = pReq->startTimeSec;

  code = vnodeAsync(RETENTION_TASK_ASYNC, EVA_PRIORITY_LOW, tsdbRetention, tsdbRetentionCancel, arg,
                    &fset->retentionTask);
  if (code) {
    tsdbError("vgId:%d, fid:%d, ssMigrateId:%d, schedule async task failed", vid, pReq->fid, pReq->ssMigrateId);
    // the task was not scheduled, so arg is still owned here
    taosMemoryFree(arg);
    pmm->state = SSMIGRATE_FILESET_STATE_FAILED;
  }

  return code;
}
1058

1059

UNCOV
1060
/*
 * Public wrapper that runs tsdbAsyncMigrateFileSetImpl() with tsdb->mutex held
 * and returns its result unchanged.
 */
int32_t tsdbAsyncSsMigrateFileSet(STsdb *tsdb, SSsMigrateFileSetReq *pReq) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  const int32_t result = tsdbAsyncMigrateFileSetImpl(tsdb, pReq);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return result;
}
1069

1070
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc