• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4761

28 Sep 2025 10:49AM UTC coverage: 57.837% (-1.0%) from 58.866%
#4761

push

travis-ci

web-flow
merge: set version (#33122)

136913 of 302095 branches covered (45.32%)

Branch coverage included in aggregate %.

207750 of 293830 relevant lines covered (70.7%)

5673932.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.11
/source/dnode/mnode/impl/src/mndSsMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndSsMigrate.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tmisce.h"
24
#include "tmsgcb.h"
25

26
#define MND_SSMIGRATE_VER_NUMBER       2
27

28
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq);
29
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq);
30
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg);
31
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg);
32
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pReq);
33
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pMsg);
34

35
// Wires the ssmigrate module into the mnode: registers the show-retrieve handler,
// the ssmigrate message handlers, and the SDB_SSMIGRATE table description.
// Returns the result of sdbSetTable().
int32_t mndInitSsMigrate(SMnode *pMnode) {
  // Row callbacks for the int32-keyed SDB_SSMIGRATE table.
  SSdbTable ssMigrateTable = {
      .sdbType = SDB_SSMIGRATE,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndSsMigrateActionEncode,
      .decodeFp = (SdbDecodeFp)mndSsMigrateActionDecode,
      .insertFp = (SdbInsertFp)mndSsMigrateActionInsert,
      .updateFp = (SdbUpdateFp)mndSsMigrateActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSsMigrateActionDelete,
  };

  // Handler for retrieving rows of the ssmigrate management table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SSMIGRATE, mndRetrieveSsMigrate);

  // Message handlers, registered in the same order as before.
  mndSetMsgHandle(pMnode, TDMT_MND_SSMIGRATE_DB_TIMER, mndProcessSsMigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SSMIGRATE, mndProcessKillSsMigrateReq);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SSMIGRATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER, mndProcessUpdateSsMigrateProgressTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP, mndProcessSsMigrateListFileSetsRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SSMIGRATE_FILESET_RSP, mndProcessSsMigrateFileSetRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP, mndProcessQuerySsMigrateProgressRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_FOLLOWER_SSMIGRATE_RSP, mndProcessFollowerSsMigrateRsp);

  return sdbSetTable(pMnode->pSdb, ssMigrateTable);
}
58

59
// Cleanup hook of the ssmigrate module; it only writes a debug log line.
void mndCleanupSsMigrate(SMnode *pMnode) {
  mDebug("mnd ssmigrate cleanup");
}
1,944✔
60

61
// Destroys the vgroup-id and fileset-id arrays held by an ssmigrate object.
// The object itself is not freed. Both fields are reset to NULL afterwards so a
// second call, or a later tDeserializeSSsMigrateObj() on the same object (which
// reuses non-NULL arrays), never touches freed memory. A NULL object is ignored.
void tFreeSsMigrateObj(SSsMigrateObj *pSsMigrate) {
  if (pSsMigrate == NULL) {
    return;
  }

  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;

  taosArrayDestroy(pSsMigrate->fileSets);
  pSsMigrate->fileSets = NULL;
}
×
65

66
int32_t tSerializeSSsMigrateObj(void *buf, int32_t bufLen, const SSsMigrateObj *pObj) {
×
67
  SEncoder encoder = {0};
×
68
  int32_t  code = 0;
×
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
×
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->id));
×
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->dbUid));
×
76
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
×
77
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
×
78
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->stateUpdateTime));
×
79
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgIdx));
×
80
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgState));
×
81
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->fsetIdx));
×
82
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.nodeId));
×
83
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.vgId));
×
84
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.fid));
×
85
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.state));
×
86
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->currFset.startTime));
×
87

88
  int32_t numVg = pObj->vgroups ? taosArrayGetSize(pObj->vgroups) : 0;
×
89
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numVg));
×
90
  for (int32_t i = 0; i < numVg; ++i) {
×
91
    int32_t *vgId = (int32_t *)taosArrayGet(pObj->vgroups, i);
×
92
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *vgId));
×
93
  }
94

95
  int32_t numFset = pObj->fileSets ? taosArrayGetSize(pObj->fileSets) : 0;
×
96
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numFset));
×
97
  for (int32_t i = 0; i < numFset; ++i) {
×
98
    int32_t *fsetId = (int32_t *)taosArrayGet(pObj->fileSets, i);
×
99
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *fsetId));
×
100
  }
101
  tEndEncode(&encoder);
×
102

103
_exit:
×
104
  if (code) {
×
105
    tlen = code;
×
106
  } else {
107
    tlen = encoder.pos;
×
108
  }
109
  tEncoderClear(&encoder);
×
110
  return tlen;
×
111
}
112

113
// Decodes an ssmigrate object from buf (bufLen bytes) into pObj, reading the fields
// in the order tSerializeSSsMigrateObj() writes them.
// Arrays already attached to pObj are cleared and refilled; otherwise new arrays are
// created and attached only when the whole decode succeeds. On failure, arrays that
// were already attached are left empty and arrays created here are destroyed.
// Returns TSDB_CODE_SUCCESS or the failing call's error code.
int32_t tDeserializeSSsMigrateObj(void *buf, int32_t bufLen, SSsMigrateObj *pObj) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino;  // written by TAOS_CHECK_EXIT
  int32_t  vgCount = 0;
  int32_t  fsetCount = 0;
  SArray  *vgroups = NULL;
  SArray  *fileSets = NULL;
  SDecoder dec = {0};

  tDecoderInit(&dec, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&dec));

  // scalar header fields
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->id));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->startTime));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->stateUpdateTime));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->vgIdx));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->vgState));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->fsetIdx));

  // fileset currently being processed
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->currFset.nodeId));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->currFset.vgId));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->currFset.fid));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->currFset.state));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->currFset.startTime));

  // vgroup ids
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &vgCount));
  vgroups = pObj->vgroups;
  if (vgroups != NULL) {
    taosArrayClear(vgroups);
  } else {
    vgroups = taosArrayInit(vgCount, sizeof(int32_t));
    if (vgroups == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < vgCount; idx++) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&dec, &vgId));
    if (taosArrayPush(vgroups, &vgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // fileset ids
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &fsetCount));
  fileSets = pObj->fileSets;
  if (fileSets != NULL) {
    taosArrayClear(fileSets);
  } else {
    fileSets = taosArrayInit(fsetCount, sizeof(int32_t));
    if (fileSets == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < fsetCount; idx++) {
    int32_t fid = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&dec, &fid));
    if (taosArrayPush(fileSets, &fid) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);

  if (code == TSDB_CODE_SUCCESS) {
    // Publish both arrays only after a complete decode.
    pObj->vgroups = vgroups;
    pObj->fileSets = fileSets;
    return code;
  }

  // Failure: empty arrays the object already owned, destroy the ones created here.
  if (pObj->vgroups) {
    taosArrayClear(pObj->vgroups);
  } else {
    taosArrayDestroy(vgroups);
  }
  if (pObj->fileSets) {
    taosArrayClear(pObj->fileSets);
  } else {
    taosArrayDestroy(fileSets);
  }
  return code;
}
188

189
// Encodes an ssmigrate object into a newly allocated sdb raw record.
// Raw layout: an int32 payload length followed by the payload written by
// tSerializeSSsMigrateObj().
// Returns the raw record, or NULL with terrno set on failure. A serializer failure
// is reported with the serializer's own (negative) error code instead of being
// relabelled as out-of-memory.
SSdbRaw *mndSsMigrateActionEncode(SSsMigrateObj *pSsMigrate) {
  // NOTE(review): code/lino look unused here; they are kept because the SDB_SET_*
  // macros may reference them - confirm against the macro definitions.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  size = 0;
  int32_t  dataPos = 0;
  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  // First pass with a NULL buffer to obtain the payload length.
  int32_t tlen = tSerializeSSsMigrateObj(NULL, 0, pSsMigrate);
  if (tlen < 0) {
    terrno = tlen;  // the serializer returns its error code on failure
    goto OVER;
  }

  size = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_SSMIGRATE, MND_SSMIGRATE_VER_NUMBER, size);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Second pass writes the payload into the temporary buffer.
  tlen = tSerializeSSsMigrateObj(buf, tlen, pSsMigrate);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("ssmigrate:%" PRId32 ", failed to encode to raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", encode to raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRaw;
}
238

239
// Decodes an sdb raw record into a newly allocated row holding an SSsMigrateObj.
// The raw record must carry soft version MND_SSMIGRATE_VER_NUMBER; its data is an
// int32 payload length followed by the payload for tDeserializeSSsMigrateObj().
// Returns the row, or NULL with terrno set on failure.
//
// The error path can be reached before the object exists (soft-version lookup
// failure, version mismatch, row allocation failure), so it must not dereference
// pSsMigrate unconditionally; id 0 is logged in that case.
SSdbRow *mndSsMigrateActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): lino looks unused; kept in case the SDB_GET_* macros reference it.
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        tlen = 0;
  int32_t        dataPos = 0;
  int8_t         sver = 0;
  SSdbRow       *pRow = NULL;
  SSsMigrateObj *pSsMigrate = NULL;
  void          *buf = NULL;

  terrno = TSDB_CODE_SUCCESS;

  if ((code = sdbGetRawSoftVer(pRaw, &sver)) != 0) {
    // Make sure the failure is visible at OVER even if the callee left terrno unset.
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = code;
    }
    goto OVER;
  }

  if (sver != MND_SSMIGRATE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("ssmigrate read invalid ver, data ver: %d, curr ver: %d", sver, MND_SSMIGRATE_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SSsMigrateObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pSsMigrate = sdbGetRowObj(pRow);
  if (pSsMigrate == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSSsMigrateObj(buf, tlen, pSsMigrate)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    int32_t failedId = (pSsMigrate != NULL) ? pSsMigrate->id : 0;
    mError("ssmigrate:%" PRId32 ", failed to decode from raw:%p since %s", failedId, pRaw, terrstr());
    // NOTE(review): assumes the deserializer already released any arrays it created
    // on failure (it does when the object's array pointers were NULL on entry).
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", decode from raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRow;
}
295

296
// sdb insert callback for ssmigrate rows: traces the id and always returns 0.
int32_t mndSsMigrateActionInsert(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;  // not needed by this callback

  mTrace("ssmigrate:%" PRId32 ", perform insert action", pSsMigrate->id);

  return 0;
}
300

301
// sdb delete callback for ssmigrate rows: traces the id, releases the object's
// arrays through tFreeSsMigrateObj(), and always returns 0.
int32_t mndSsMigrateActionDelete(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;  // not needed by this callback

  mTrace("ssmigrate:%" PRId32 ", perform delete action", pSsMigrate->id);

  tFreeSsMigrateObj(pSsMigrate);

  return 0;
}
306

307
// sdb update callback for ssmigrate rows: traces the old/new row pointers and
// always returns 0. No fields are copied from the new row to the old one here.
int32_t mndSsMigrateActionUpdate(SSdb *pSdb, SSsMigrateObj *pOldSsMigrate, SSsMigrateObj *pNewSsMigrate) {
  (void)pSdb;  // not needed by this callback

  mTrace("ssmigrate:%" PRId32 ", perform update action, old row:%p new row:%p", pOldSsMigrate->id, pOldSsMigrate,
         pNewSsMigrate);

  return 0;
}
313

314
// Looks up an ssmigrate object by id in the sdb.
// Returns the acquired object (hand it back with mndReleaseSsMigrate()), or NULL.
// When the lookup fails with TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is reset to
// success so that "no such id" is not reported as an error.
//
// The SDB_SSMIGRATE table is registered with SDB_KEY_INT32, so the key handed to
// sdbAcquire() must point at a 32-bit value. Passing the address of the 64-bit
// parameter would read the wrong four bytes on big-endian hosts.
SSsMigrateObj *mndAcquireSsMigrate(SMnode *pMnode, int64_t ssMigrateId) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t key = (int32_t)ssMigrateId;

  SSsMigrateObj *pSsMigrate = sdbAcquire(pSdb, SDB_SSMIGRATE, &key);
  if (pSsMigrate == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pSsMigrate;
}
322

323
// Hands an ssmigrate object obtained from the sdb back via sdbRelease().
// The caller's pointer is passed by value and is therefore left unchanged.
void mndReleaseSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  sdbRelease(pMnode->pSdb, pSsMigrate);
}
×
328

329
// Copies the database name of ssmigrate task ssMigrateId into dbname (capacity len).
// Returns 0 on success. When the task cannot be acquired, returns terrno if it is
// non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
int32_t mndSsMigrateGetDbName(SMnode *pMnode, int32_t ssMigrateId, char *dbname, int32_t len) {
  SSsMigrateObj *pTask = mndAcquireSsMigrate(pMnode, ssMigrateId);

  if (pTask != NULL) {
    tstrncpy(dbname, pTask->dbname, len);
    mndReleaseSsMigrate(pMnode, pTask);
    TAOS_RETURN(0);
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
342

343
// ssmigrate db
344
// Initializes pSsMigrate for database pDb and appends it, encoded as a READY raw
// record, to the commit log of pTrans.
// Sets on pSsMigrate: dbUid, a fresh id, dbname, stateUpdateTime, vgIdx = 0,
// vgState = SSMIGRATE_VGSTATE_INIT, fsetIdx = 0. The vgroup-id list (all vgroups of
// pDb with mountVgId == 0) exists only while encoding; pSsMigrate->vgroups is NULL
// again on return.
// Returns 0 on success or an error code.
//
// The raw status is set BEFORE the record is appended to the transaction, matching
// mndDropSsMigrate()/mndUpdateSsMigrate(). The previous order freed the record on a
// status failure after it had already been appended to the transaction.
int32_t mndAddSsMigrateToTran(SMnode *pMnode, STrans *pTrans, SSsMigrateObj *pSsMigrate, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  pSsMigrate->dbUid = pDb->uid;
  pSsMigrate->id = tGenIdPI32();
  tstrncpy(pSsMigrate->dbname, pDb->name, sizeof(pSsMigrate->dbname));
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  pSsMigrate->vgIdx = 0;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->fsetIdx = 0;

  pSsMigrate->vgroups = taosArrayInit(8, sizeof(int32_t));
  if (pSsMigrate->vgroups == NULL) {
    TAOS_RETURN(terrno);
  }

  // Collect the ids of this database's vgroups.
  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->mountVgId || pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    int32_t vgId = pVgroup->vgId;
    sdbRelease(pSdb, pVgroup);
    if (taosArrayPush(pSsMigrate->vgroups, &vgId) == NULL) {
      code = terrno;
      // NOTE(review): the fetch iterator is abandoned mid-iteration here; check
      // whether sdbCancelFetch(pSdb, pIter) is required on this path.
      taosArrayDestroy(pSsMigrate->vgroups);
      pSsMigrate->vgroups = NULL;
      TAOS_RETURN(code);
    }
  }

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  code = terrno;  // capture before any other call can overwrite it
  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;
  if (pRaw == NULL) {
    TAOS_RETURN(code);
  }

  // Still the sole owner of pRaw here, so freeing on failure is safe.
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, ssmigrate:%d, db:%s, has been added", pTrans->id, pSsMigrate->id, pSsMigrate->dbname);
  return 0;
}
402

403
// retrieve ssmigrate
404
// Show-retrieve handler: fills pBlock with up to `rows` ssmigrate tasks, continuing
// from pShow->pIter.
// Columns in order: ssmigrate_id, db_name, start_time, number_vgroup, migrated_vgroup,
// vgroup_id, number_fileset, migrated_fileset, fileset_id. The last four are NULL once
// vgIdx has passed the last vgroup; number_fileset/migrated_fileset read 0 until the
// fileset list has been received.
// Returns the number of rows written; pShow->numOfRows is advanced by the same amount.
// NOTE(review): when pShow->db names a database that cannot be acquired, terrno is
// returned through the row-count return value - confirm the caller expects that.
//
// Every failure inside the loop goes through RETRIEVE_CHECK_GOTO with the current
// row. The tNameFromString() check used TAOS_CHECK_GOTO and so skipped whatever
// per-row cleanup that macro performs (presumably releasing the fetched row - confirm
// against its definition).
int32_t mndRetrieveSsMigrate(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        numOfRows = 0;
  SSsMigrateObj *pSsMigrate = NULL;
  char          *sep = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pShow->pIter, (void **)&pSsMigrate);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // ssmigrate_id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->id, false), pSsMigrate, &lino,
                        _OVER);

    // db_name
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pSsMigrate->dbname)) {
      SName name = {0};
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pSsMigrate->dbname, T_NAME_ACCT | T_NAME_DB), pSsMigrate, &lino,
                          _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pSsMigrate->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pSsMigrate, &lino, _OVER);

    // start_time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->startTime, false), pSsMigrate,
                        &lino, _OVER);

    // number_vgroup
    int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numVg, false), pSsMigrate, &lino, _OVER);

    // migrated_vgroup
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->vgIdx, false), pSsMigrate,
                        &lino, _OVER);

    if (pSsMigrate->vgIdx < numVg) {
      // vgroup_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      int32_t vgId = *(int32_t *)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vgId, false), pSsMigrate, &lino, _OVER);

      // number_fileset: reported as 0 until the fileset list has been received
      int32_t numFset = taosArrayGetSize(pSsMigrate->fileSets);
      if (pSsMigrate->vgState < SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
        numFset = 0;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numFset, false), pSsMigrate, &lino,
                          _OVER);

      // migrated_fileset: reported as 0 until the fileset list has been received
      int32_t fsetIdx = pSsMigrate->fsetIdx;
      if (pSsMigrate->vgState < SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
        fsetIdx = 0;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fsetIdx, false), pSsMigrate, &lino,
                          _OVER);

      // fileset_id: NULL when there is no fileset at the current index
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (fsetIdx < numFset) {
        int32_t fid = *(int32_t *)taosArrayGet(pSsMigrate->fileSets, fsetIdx);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fid, false), pSsMigrate, &lino, _OVER);
      } else {
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
      }
    } else {
      // vgroup_id, number_fileset, migrated_fileset, fileset_id: all NULL
      for (int32_t i = 0; i < 4; ++i) {
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
      }
    }

    numOfRows++;
    sdbRelease(pSdb, pSsMigrate);
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  mndReleaseDb(pMnode, pDb);
  return numOfRows;
}
522

523

524
// Defined outside this part of the file.
int32_t mndSsMigrateDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb);

// Timer handler: walks every database in the sdb and calls mndSsMigrateDb() on it
// with a NULL request. Per-database failures are only logged; the handler itself
// always returns 0.
//
// The database object is released only after its name has been logged; releasing
// it first and then reading pDb->name accesses an object this code no longer holds.
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;

  while (1) {
    SDbObj *pDb = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DB, pIter, (void **)&pDb);
    if (pIter == NULL) {
      break;
    }

    int32_t code = mndSsMigrateDb(pMnode, NULL, pDb);
    if (code == TSDB_CODE_SUCCESS) {
      mInfo("ssmigrate db:%s, has been triggered by timer", pDb->name);
    } else {
      mError("failed to trigger ssmigrate db:%s, code:%d, %s", pDb->name, code, tstrerror(code));
    }
    sdbRelease(pMnode->pSdb, pDb);
  }

  TAOS_RETURN(0);
}
547

548

549
// Removes an ssmigrate task from the sdb: creates a "drop-ssmigrate" transaction,
// appends the task encoded with status SDB_STATUS_DROPPED to its commit log, and
// prepares the transaction. The transaction handle is dropped before returning.
// Returns 0 on success or an error code; the outcome is logged either way.
//
// Two failure paths are handled explicitly:
//  - a NULL transaction or raw record with terrno == 0 no longer falls through into
//    code that dereferences the NULL pointer;
//  - the raw record is freed when appending it to the commit log fails, as
//    mndAddSsMigrateToTran() does.
static int32_t mndDropSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "drop-ssmigrate");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED)) != 0) {
    sdbFreeRaw(pRaw);
    lino = __LINE__;
    goto _exit;
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);  // not appended, so still owned here
    lino = __LINE__;
    goto _exit;
  }

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mInfo("ssmigrate:%d was dropped successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to drop at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
  return code;
}
580

581

582
// Persists the current in-memory state of an ssmigrate task: creates an
// "update-ssmigrate" transaction, appends the task encoded with status
// SDB_STATUS_READY to its commit log, and prepares the transaction. The transaction
// handle is dropped before returning. Failures are logged, not returned.
//
// Two failure paths are handled explicitly:
//  - a NULL transaction or raw record with terrno == 0 no longer falls through into
//    code that dereferences the NULL pointer;
//  - the raw record is freed when appending it to the commit log fails, as
//    mndAddSsMigrateToTran() does.
static void mndUpdateSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-ssmigrate");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    lino = __LINE__;
    goto _exit;
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);  // not appended, so still owned here
    lino = __LINE__;
    goto _exit;
  }

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("ssmigrate:%d was updated successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to update at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
}
×
611

612

613
// Kills an ssmigrate task: sends a best-effort TDMT_VND_KILL_SSMIGRATE request to
// the vgroup the task is currently working on, then drops the task from the sdb.
// Whether or not the request could be built or sent, the task is dropped; the
// return value is that of mndDropSsMigrate(). pReq is not used.
//
// Changes over the previous version:
//  - a current-vgroup index outside the vgroup list (taosArrayGet() returning NULL)
//    is no longer dereferenced;
//  - a negative serialized length is not used to size the message;
//  - the RPC buffer is released when filling it fails. It has not been handed to
//    tmsgSendReq() on that path; presumably the RPC layer frees it only once sent.
static int32_t mndKillSsMigrate(SMnode *pMnode, SRpcMsg *pReq, SSsMigrateObj *pSsMigrate) {
  (void)pReq;

  int32_t *pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }
  int32_t vgId = *pVgId;

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SVnodeKillSsMigrateReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t                reqLen = tSerializeSVnodeKillSsMigrateReq(NULL, 0, &req);
  if (reqLen < 0) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSVnodeKillSsMigrateReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    rpcFreeCont(pHead);
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_KILL_SSMIGRATE, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send kill ssmigrate request to vnode since 0x%x", req.ssMigrateId, vgId, code);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, kill ssmigrate request was sent to vnode", req.ssMigrateId, vgId);
  }

  return mndDropSsMigrate(pMnode, pSsMigrate);
}
648

649
// Handler for TDMT_MND_KILL_SSMIGRATE: decodes the request, looks up the ssmigrate
// task by id, and kills it through mndKillSsMigrate().
// Returns the decode error, TSDB_CODE_MND_INVALID_SSMIGRATE_ID when the task cannot
// be acquired, or the result of mndKillSsMigrate().
int32_t mndProcessKillSsMigrateReq(SRpcMsg *pReq) {
  SKillSsMigrateReq killReq = {0};

  int32_t code = tDeserializeSKillSsMigrateReq(pReq->pCont, pReq->contLen, &killReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill ssmigrate:%" PRId32, killReq.ssMigrateId);

  SMnode        *pMnode = pReq->info.node;
  SSsMigrateObj *pTarget = mndAcquireSsMigrate(pMnode, killReq.ssMigrateId);
  if (pTarget == NULL) {
    code = TSDB_CODE_MND_INVALID_SSMIGRATE_ID;
    tFreeSKillSsMigrateReq(&killReq);
    TAOS_RETURN(code);
  }

  // The privilege check is currently disabled:
  // TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SSMIGRATE_DB), &lino, _OVER);

  code = mndKillSsMigrate(pMnode, pReq, pTarget);

  // "action in progress" is not treated as a failure worth logging.
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to kill ssmigrate %" PRId32 " since %s", killReq.ssMigrateId, tstrerror(code));
  }

  tFreeSKillSsMigrateReq(&killReq);
  mndReleaseSsMigrate(pMnode, pTarget);

  TAOS_RETURN(code);
}
682

683

684
// Asks the task's current vgroup for its list of filesets
// (TDMT_VND_LIST_SSMIGRATE_FILESETS).
//  - vgroup missing: the task skips to the next vgroup (vgState = INIT, vgIdx++),
//    in memory only.
//  - send failed:    same skip, then the task is persisted.
//  - send succeeded: vgState = WAITING_FSET_LIST, then the task is persisted.
// If the request cannot be built, the function returns without touching the task.
//
// Changes over the previous version:
//  - a current-vgroup index outside the vgroup list (taosArrayGet() returning NULL)
//    is no longer dereferenced;
//  - a negative serialized length is not used to size the message;
//  - the RPC buffer is released when filling it fails. It has not been handed to
//    tmsgSendReq() on that path; presumably the RPC layer frees it only once sent.
static void mndSendSsMigrateListFileSetsReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t *pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL) {
    mError("ssmigrate:%d, vgIdx:%d is out of range in %s", pSsMigrate->id, pSsMigrate->vgIdx, __func__);
    return;
  }
  int32_t vgId = *pVgId;

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    return;
  }

  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SListSsMigrateFileSetsReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t                   reqLen = tSerializeSListSsMigrateFileSetsReq(NULL, 0, &req);
  if (reqLen < 0) {
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSListSsMigrateFileSetsReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_LIST_SSMIGRATE_FILESETS, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send list filesets request to vnode since 0x%x", req.ssMigrateId, vgId, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, list filesets request was sent to vnode", req.ssMigrateId, vgId);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_WAITING_FSET_LIST;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
726

727

728
// Handles a vnode's reply to the list-filesets request.
// The reply is accepted only if it names an existing task, comes from the vgroup
// the task is currently on, and the task is in SSMIGRATE_VGSTATE_WAITING_FSET_LIST;
// otherwise an error code is returned and the task is left unchanged.
// On acceptance the task takes over the reply's fileset list and either
//  - moves to the next vgroup (empty list: vgIdx++, vgState = INIT), or
//  - starts with the first fileset (currFset set up, vgState = FSET_LIST_RECEIVED),
// and is then persisted through mndUpdateSsMigrate().
//
// pMnode is initialized before the first `goto _exit`: the cleanup code uses it, and
// jumping over its initialization left it indeterminate when decoding the reply
// failed. A current-vgroup index outside the vgroup list is rejected as an invalid
// message instead of being dereferenced.
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate list filesets response, req code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  SMnode                   *pMnode = pMsg->info.node;
  SSsMigrateObj            *pSsMigrate = NULL;
  int32_t                  *pVgId = NULL;
  int32_t                   vgId = 0;
  SArray                   *tmp = NULL;
  SListSsMigrateFileSetsRsp rsp = {0};

  code = tDeserializeSListSsMigrateFileSetsRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  vgId = *pVgId;

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // The task takes over the new fileset list. Swapping instead of overwriting lets
  // tFreeSListSsMigrateFileSetsRsp() below dispose of the old list.
  tmp = pSsMigrate->fileSets;
  pSsMigrate->fileSets = rsp.pFileSets;
  rsp.pFileSets = tmp;

  pSsMigrate->fsetIdx = 0;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  if (taosArrayGetSize(pSsMigrate->fileSets) == 0) {
    mInfo("ssmigrate:%d, vgId:%d, no filesets to migrate.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->vgIdx++;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, filesets received.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->currFset.vgId = vgId;
    pSsMigrate->currFset.fid = *(int32_t *)taosArrayGet(pSsMigrate->fileSets, 0);
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  }

  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  tFreeSListSsMigrateFileSetsRsp(&rsp);
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
787

788

789
// Sends a TDMT_VND_SSMIGRATE_FILESET request for the current file set
// (pSsMigrate->currFset.fid) to the vgroup stored at pSsMigrate->vgroups[vgIdx].
//
// Outcome, as recorded on pSsMigrate:
//   - the vgroup cannot be acquired, or the send fails: the vgroup is skipped
//     (vgIdx++, vgState = SSMIGRATE_VGSTATE_INIT);
//   - the request was sent: vgState = SSMIGRATE_VGSTATE_FSET_STARTING and
//     currFset.startTime is set to the start time carried in the request;
//   - the request cannot be sized, allocated or serialized: nothing is changed,
//     so the object keeps the state the caller saw.
// In the first two cases stateUpdateTime is refreshed and the object is handed to
// mndUpdateSsMigrate().
//
// The caller must guarantee that vgIdx is a valid index into vgroups.
static void mndSendSsMigrateFileSetReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    // record the skip the same way the send-failure path below does
    pSsMigrate->stateUpdateTime = taosGetTimestampSec();
    mndUpdateSsMigrate(pMnode, pSsMigrate);
    return;
  }

  // the endpoint set is copied out, so the vgroup can be released right away
  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateFileSetReq req = { 0 };
  req.ssMigrateId = pSsMigrate->id;
  req.nodeId = 0;
  req.fid = pSsMigrate->currFset.fid;
  req.startTimeSec = taosGetTimestampSec();

  // first pass with a NULL buffer only computes the encoded size
  int32_t reqLen = tSerializeSSsMigrateFileSetReq(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to get the size of migrate fileset request", req.ssMigrateId, vgId,
           req.fid);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to allocate migrate fileset request", req.ssMigrateId, vgId,
           req.fid);
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSSsMigrateFileSetReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to serialize migrate fileset request", req.ssMigrateId, vgId,
           req.fid);
    // the message was never handed to the transport, so it is still ours to free
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SSMIGRATE_FILESET, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send migrate fileset request to vnode since 0x%x", req.ssMigrateId, vgId, req.fid, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, migrate fileset request was sent to vnode", req.ssMigrateId, vgId, req.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTING;
    pSsMigrate->currFset.startTime = req.startTimeSec;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
837

838

839
// Handles the vnode's response to a migrate-fileset request.
//
// The response is accepted only if it matches the request that is currently in
// flight: same vgroup, same file set id, the object is in
// SSMIGRATE_VGSTATE_FSET_STARTING, and the reported node id is positive.
// On acceptance the reported node id is stored in currFset.nodeId, the vgroup
// state becomes SSMIGRATE_VGSTATE_FSET_STARTED, stateUpdateTime is refreshed and
// the object is handed to mndUpdateSsMigrate().
//
// Returns pMsg->code when the RPC itself failed, the deserialization error,
// TSDB_CODE_MND_RETURN_VALUE_NULL when the migration object cannot be acquired,
// TSDB_CODE_INVALID_MSG when the response does not match, and 0 otherwise.
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate fileset response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // pMnode is read at _exit, so it must be initialised before the first
  // TAOS_CHECK_GOTO can jump there
  SMnode               *pMnode = pMsg->info.node;
  SSsMigrateObj        *pSsMigrate = NULL;
  int32_t              *pVgId = NULL;
  SSsMigrateFileSetRsp  rsp = {0};

  code = tDeserializeSSsMigrateFileSetRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // A late response may arrive after a timeout has already advanced vgIdx past
  // the last vgroup; taosArrayGet is expected to yield NULL for such an index
  // (NOTE(review): confirm against tarray), which is treated as a mismatch
  // instead of being dereferenced.
  pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTING) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId <= 0) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  mInfo("ssmigrate:%d, vgId:%d, fid:%d, leader node is %d", rsp.ssMigrateId, *pVgId, rsp.fid, rsp.nodeId);
  pSsMigrate->currFset.nodeId = rsp.nodeId;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTED;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
885

886

887

888
// Handler for the response to a follower-ssmigrate request. The response is not
// inspected; the handler only reports success.
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq) {
  const int32_t result = 0;
  TAOS_RETURN(result);
}
891

892

893
// Sends a TDMT_VND_FOLLOWER_SSMIGRATE request carrying the current file set's
// progress (migration id, node id, vgroup id, file set id, state) to the vgroup
// recorded in pSsMigrate->currFset.vgId.
//
// This is fire-and-forget: pSsMigrate is only read, and every failure (vgroup not
// found, sizing/allocation failure, send failure) ends the call without changing
// any state.
static void mndSendFollowerSsMigrateReq(SMnode* pMnode, SSsMigrateObj *pSsMigrate) {
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, pSsMigrate->currFset.vgId);
  if (pVgroup == NULL) {
    return;
  }

  // the endpoint set is copied out, so the vgroup can be released right away
  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateProgress req = {
    .ssMigrateId = pSsMigrate->id,
    .nodeId = pSsMigrate->currFset.nodeId,
    .vgId = pSsMigrate->currFset.vgId,
    .fid = pSsMigrate->currFset.fid,
    .state = pSsMigrate->currFset.state,
  };

  // first pass with a NULL buffer only computes the encoded size
  int32_t reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to get the size of follower-ssmigrate request", req.ssMigrateId,
           req.vgId, req.fid);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(req.vgId);
  // the buffer was sized by the dry run above, so the result is not re-checked
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_FOLLOWER_SSMIGRATE, .pCont = pHead, .contLen = contLen};

  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  // argument order follows the format string: vgroup id first, then migration id
  if (code != 0) {
    mError("vgId:%d, ssmigrate:%d, fid:%d, failed to send follower-ssmigrate request since 0x%x", req.vgId, req.ssMigrateId, req.fid, code);
  } else {
    mTrace("vgId:%d, ssmigrate:%d, fid:%d, follower-ssmigrate request sent", req.vgId, req.ssMigrateId, req.fid);
  }
}
929

930

931

932
// Handles the response to a migration-progress query.
//
// The response is accepted only if the object is in SSMIGRATE_VGSTATE_FSET_STARTED
// and the reported node id, vgroup id and file set id all equal the values stored
// in pSsMigrate->currFset. On acceptance the reported state is stored in
// currFset.state, stateUpdateTime is refreshed, the object is handed to
// mndUpdateSsMigrate(), and the new state is forwarded with
// mndSendFollowerSsMigrateReq().
//
// Returns pMsg->code when the RPC itself failed, the deserialization error,
// TSDB_CODE_MND_RETURN_VALUE_NULL when the migration object cannot be acquired,
// TSDB_CODE_INVALID_MSG when the response does not match, and 0 otherwise.
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong query ssmigrate progress response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // pMnode is read at _exit, so it must be initialised before the first
  // TAOS_CHECK_GOTO can jump there
  SMnode             *pMnode = pMsg->info.node;
  SSsMigrateObj      *pSsMigrate = NULL;
  SSsMigrateProgress  rsp = {0};

  code = tDeserializeSSsMigrateProgress(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTED) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId != pSsMigrate->currFset.nodeId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.vgId != pSsMigrate->currFset.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // an unchanged state is only traced; a state transition is logged at info level
  if (rsp.state == pSsMigrate->currFset.state) {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  }
  pSsMigrate->currFset.state = rsp.state;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

  mndSendFollowerSsMigrateReq(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
982

983

984

985
// when query migration progress, we need to send the msg to dnode instead of vgroup,
986
// because migration may take a long time, and leader may change during the migration process,
987
// while only the initial leader vnode can handle the migration progress query.
988
void mndSendQuerySsMigrateProgressReq(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
×
989
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pSsMigrate->currFset.nodeId);
×
990
  if (pDnode == NULL) {
×
991
    return;
×
992
  }
993

994
  SEpSet epSet = mndGetDnodeEpset(pDnode);
×
995
  mndReleaseDnode(pMnode, pDnode);
×
996

997
  SSsMigrateProgress req = {
×
998
    .ssMigrateId = pSsMigrate->id,
×
999
    .nodeId = pSsMigrate->currFset.nodeId,
×
1000
    .vgId = pSsMigrate->currFset.vgId,
×
1001
    .fid = pSsMigrate->currFset.fid,
×
1002
    .state = SSMIGRATE_FILESET_STATE_IN_PROGRESS,
1003
  };
1004

1005
  int32_t          reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
×
1006
  int32_t          contLen = reqLen + sizeof(SMsgHead);
×
1007
  SMsgHead *pHead = rpcMallocCont(contLen);
×
1008
  if (pHead == NULL) {
×
1009
    return;
×
1010
  }
1011

1012
  pHead->contLen = htonl(contLen);
×
1013
  pHead->vgId = htonl(req.vgId);
×
1014
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
×
1015
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS, .pCont = pHead, .contLen = contLen};
×
1016

1017
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
1018
  if (code != 0) {
×
1019
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send ssmigrate-query-progress request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code)
×
1020
  } else {
1021
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, ssmigrate-query-progress request sent", req.ssMigrateId, req.vgId, req.fid);
×
1022
  }
1023
}
1024

1025

1026

1027
// Gives up on (or finishes) the vgroup currently being processed: moves vgIdx to
// the next vgroup, resets the per-vgroup state to SSMIGRATE_VGSTATE_INIT, refreshes
// stateUpdateTime and hands the object to mndUpdateSsMigrate().
static void mndSsMigrateAdvanceVgroup(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  pSsMigrate->vgIdx++;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}

// Advances one migration object by a single step of its state machine. At most
// one action is taken per call, chosen from the vgroup state and, once the file
// set has been started, from the file set state:
//   - every vgroup processed                  -> drop the migration object
//   - SSMIGRATE_VGSTATE_INIT                  -> request the file set list
//   - SSMIGRATE_VGSTATE_WAITING_FSET_LIST     -> skip the vgroup after 30s of silence
//   - SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED    -> request migration of the current file set
//   - SSMIGRATE_VGSTATE_FSET_STARTING         -> skip the vgroup after 30s of silence
//   - file set SSMIGRATE_FILESET_STATE_COMPACT     -> rewind so the request is sent again
//   - file set SSMIGRATE_FILESET_STATE_IN_PROGRESS -> query progress, or skip the vgroup
//                                                     after 30s without a state update
//   - any other file set state                -> after 30s, move on to the next file
//                                                set, or to the next vgroup if none is left
static void mndUpdateSsMigrateProgress(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  const int32_t waitSec = 30;
  int32_t       vgCount = taosArrayGetSize(pSsMigrate->vgroups);
  int32_t       fsetCount = taosArrayGetSize(pSsMigrate->fileSets);

  if (pSsMigrate->vgIdx >= vgCount) {
    mInfo("ssmigrate:%d, all vgroups has been processed", pSsMigrate->id);
    TAOS_UNUSED(mndDropSsMigrate(pMnode, pSsMigrate));
    return;
  }

  // a vgroup in its initial state first has to report its file sets
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_INIT) {
    mndSendSsMigrateListFileSetsReq(pMnode, pSsMigrate);
    return;
  }

  int32_t curVgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);

  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime <= waitSec) {
      return;
    }
    mWarn("ssmigrate:%d, vgId:%d, haven't receive file set list in 30 seconds, skip", pSsMigrate->id, curVgId);
    mndSsMigrateAdvanceVgroup(pMnode, pSsMigrate);
    return;
  }

  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
    mndSendSsMigrateFileSetReq(pMnode, pSsMigrate);
    return;
  }

  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_FSET_STARTING) {
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime <= waitSec) {
      return;
    }
    // on timeout the whole vgroup is skipped rather than just this file set, because
    // a file set timing out often means the vgroup itself is not available
    mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive response in 30 seconds, skip", pSsMigrate->id, curVgId, pSsMigrate->currFset.fid);
    mndSsMigrateAdvanceVgroup(pMnode, pSsMigrate);
    return;
  }

  // compaction takes a while: only rewind the state here and let the next tick
  // send the first migration request again
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_COMPACT) {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, compacting, will retry later", pSsMigrate->id, curVgId, pSsMigrate->currFset.fid);
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
    pSsMigrate->stateUpdateTime = taosGetTimestampSec();
    mndUpdateSsMigrate(pMnode, pSsMigrate);
    return;
  }

  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime <= waitSec) {
      mndSendQuerySsMigrateProgressReq(pMnode, pSsMigrate);
      return;
    }
    mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive state in 30 seconds, skip", pSsMigrate->id, curVgId, pSsMigrate->currFset.fid);
    mndSsMigrateAdvanceVgroup(pMnode, pSsMigrate);
    return;
  }

  // the leader is done with this file set; hold back at least 30 seconds so the
  // followers have time to start processing it, which keeps the tsdb code simpler
  if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime < waitSec) {
    return;
  }

  // this file set is finished, continue with the next one
  pSsMigrate->fsetIdx++;
  if (pSsMigrate->fsetIdx >= fsetCount) {
    // no file set left in this vgroup
    mndSsMigrateAdvanceVgroup(pMnode, pSsMigrate);
    return;
  }

  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  pSsMigrate->currFset.nodeId = 0;
  pSsMigrate->currFset.vgId = curVgId;
  pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, pSsMigrate->fsetIdx);
  pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
1122

1123

1124
// Timer handler: walks every SDB_SSMIGRATE object in the sdb and advances each
// one by a single step via mndUpdateSsMigrateProgress(). Always returns 0.
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq) {
  mTrace("start to process update ssmigrate progress timer");

  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;

  void          *cursor = NULL;
  SSsMigrateObj *pObj = NULL;
  while ((cursor = sdbFetch(pSdb, SDB_SSMIGRATE, cursor, (void **)&pObj)) != NULL) {
    mndUpdateSsMigrateProgress(pMnode, pObj);
    // every fetched object is released before the next one is fetched
    sdbRelease(pSdb, pObj);
    pObj = NULL;
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc