• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4943

30 Jan 2026 06:19AM UTC coverage: 66.718% (-0.07%) from 66.788%
#4943

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1122 of 2018 new or added lines in 72 files covered. (55.6%)

823 existing lines in 156 files now uncovered.

204811 of 306978 relevant lines covered (66.72%)

123993567.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.76
/source/dnode/mnode/impl/src/mndSsMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndSsMigrate.h"
16
#include "mndDb.h"
17
#include "mndDnode.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22
#include "mndVgroup.h"
23
#include "tmisce.h"
24
#include "tmsgcb.h"
25

26
#define MND_SSMIGRATE_VER_NUMBER       2
27

28
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq);
29
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq);
30
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg);
31
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg);
32
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pReq);
33
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pMsg);
34

35
int32_t mndInitSsMigrate(SMnode *pMnode) {
402,156✔
36
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SSMIGRATE, mndRetrieveSsMigrate);
402,156✔
37
  mndSetMsgHandle(pMnode, TDMT_MND_SSMIGRATE_DB_TIMER, mndProcessSsMigrateDbTimer);
402,156✔
38
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SSMIGRATE, mndProcessKillSsMigrateReq);
402,156✔
39
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SSMIGRATE_RSP, mndTransProcessRsp);
402,156✔
40
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER, mndProcessUpdateSsMigrateProgressTimer);
402,156✔
41
  mndSetMsgHandle(pMnode, TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP, mndProcessSsMigrateListFileSetsRsp);
402,156✔
42
  mndSetMsgHandle(pMnode, TDMT_VND_SSMIGRATE_FILESET_RSP, mndProcessSsMigrateFileSetRsp);
402,156✔
43
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP, mndProcessQuerySsMigrateProgressRsp);
402,156✔
44
  mndSetMsgHandle(pMnode, TDMT_VND_FOLLOWER_SSMIGRATE_RSP, mndProcessFollowerSsMigrateRsp);
402,156✔
45

46
  SSdbTable table = {
402,156✔
47
      .sdbType = SDB_SSMIGRATE,
48
      .keyType = SDB_KEY_INT32,
49
      .encodeFp = (SdbEncodeFp)mndSsMigrateActionEncode,
50
      .decodeFp = (SdbDecodeFp)mndSsMigrateActionDecode,
51
      .insertFp = (SdbInsertFp)mndSsMigrateActionInsert,
52
      .updateFp = (SdbUpdateFp)mndSsMigrateActionUpdate,
53
      .deleteFp = (SdbDeleteFp)mndSsMigrateActionDelete,
54
  };
55

56
  return sdbSetTable(pMnode->pSdb, table);
402,156✔
57
}
58

59
void mndCleanupSsMigrate(SMnode *pMnode) { mDebug("mnd ssmigrate cleanup"); }
402,098✔
60

61
void tFreeSsMigrateObj(SSsMigrateObj *pSsMigrate) {
×
62
  taosArrayDestroy(pSsMigrate->vgroups);
×
63
  taosArrayDestroy(pSsMigrate->fileSets);
×
64
}
×
65

66
int32_t tSerializeSSsMigrateObj(void *buf, int32_t bufLen, const SSsMigrateObj *pObj) {
×
67
  SEncoder encoder = {0};
×
68
  int32_t  code = 0;
×
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
×
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->id));
×
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->dbUid));
×
76
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
×
77
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
×
78
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->stateUpdateTime));
×
79
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgIdx));
×
80
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgState));
×
81
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->fsetIdx));
×
82
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.nodeId));
×
83
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.vgId));
×
84
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.fid));
×
85
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.state));
×
86
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->currFset.startTime));
×
87

88
  int32_t numVg = pObj->vgroups ? taosArrayGetSize(pObj->vgroups) : 0;
×
89
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numVg));
×
90
  for (int32_t i = 0; i < numVg; ++i) {
×
91
    int32_t *vgId = (int32_t *)taosArrayGet(pObj->vgroups, i);
×
92
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *vgId));
×
93
  }
94

95
  int32_t numFset = pObj->fileSets ? taosArrayGetSize(pObj->fileSets) : 0;
×
96
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numFset));
×
97
  for (int32_t i = 0; i < numFset; ++i) {
×
98
    int32_t *fsetId = (int32_t *)taosArrayGet(pObj->fileSets, i);
×
99
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *fsetId));
×
100
  }
101
  tEndEncode(&encoder);
×
102

103
_exit:
×
104
  if (code) {
×
105
    tlen = code;
×
106
  } else {
107
    tlen = encoder.pos;
×
108
  }
109
  tEncoderClear(&encoder);
×
110
  return tlen;
×
111
}
112

113
int32_t tDeserializeSSsMigrateObj(void *buf, int32_t bufLen, SSsMigrateObj *pObj) {
×
114
  int32_t  code = TSDB_CODE_SUCCESS, lino;
×
115
  SArray *vgroups = NULL, *fileSets = NULL;
×
116
  SDecoder decoder = {0};
×
117
  tDecoderInit(&decoder, buf, bufLen);
×
118

119
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
120
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->id));
×
121
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->dbUid));
×
122
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
×
123
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
×
124
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->stateUpdateTime));
×
125
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgIdx));
×
126
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgState));
×
127
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->fsetIdx));
×
128
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.nodeId));
×
129
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.vgId));
×
130
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.fid));
×
131
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.state));
×
132
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->currFset.startTime));
×
133

134
  int32_t numVg = 0;
×
135
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numVg));
×
136
  vgroups = pObj->vgroups;
×
137
  if (vgroups) {
×
138
    taosArrayClear(vgroups);
×
139
  } else if ((vgroups = taosArrayInit(numVg, sizeof(int32_t))) == NULL) {
×
140
    code = terrno;
×
141
    goto _exit;
×
142
  }
143
  for (int32_t i = 0; i < numVg; ++i) {
×
144
    int32_t vgId = 0;
×
145
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgId));
×
146
    if(taosArrayPush(vgroups, &vgId) == NULL) {
×
147
      TAOS_CHECK_EXIT(terrno);
×
148
    }
149
  }
150

151
  int32_t numFset = 0;
×
152
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numFset));
×
153
  fileSets = pObj->fileSets;
×
154
  if (fileSets) {
×
155
    taosArrayClear(fileSets);
×
156
  } else if ((fileSets = taosArrayInit(numFset, sizeof(int32_t))) == NULL) {
×
157
    code = terrno;
×
158
    goto _exit;
×
159
  }
160
  for (int32_t i = 0; i < numFset; ++i) {
×
161
    int32_t fsetId = 0;
×
162
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fsetId));
×
163
    if(taosArrayPush(fileSets, &fsetId) == NULL) {
×
164
      TAOS_CHECK_EXIT(terrno);
×
165
    }
166
  }
167

168
  tEndDecode(&decoder);
×
169

170
_exit:
×
171
  tDecoderClear(&decoder);
×
172
  if (code == TSDB_CODE_SUCCESS) {
×
173
    pObj->vgroups = vgroups;
×
174
  } else if (pObj->vgroups) {
×
175
    taosArrayClear(pObj->vgroups);
×
176
  } else {
177
    taosArrayDestroy(vgroups);
×
178
  }
179
  if (code == TSDB_CODE_SUCCESS) {
×
180
    pObj->fileSets = fileSets;
×
181
  } else if (pObj->fileSets) {
×
182
    taosArrayClear(pObj->fileSets);
×
183
  } else {
184
    taosArrayDestroy(fileSets);
×
185
  }
186
  return code;
×
187
}
188

189
SSdbRaw *mndSsMigrateActionEncode(SSsMigrateObj *pSsMigrate) {
×
190
  int32_t code = 0;
×
191
  int32_t lino = 0;
×
192
  terrno = TSDB_CODE_SUCCESS;
×
193

194
  void    *buf = NULL;
×
195
  SSdbRaw *pRaw = NULL;
×
196

197
  int32_t tlen = tSerializeSSsMigrateObj(NULL, 0, pSsMigrate);
×
198
  if (tlen < 0) {
×
199
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
200
    goto OVER;
×
201
  }
202

203
  int32_t size = sizeof(int32_t) + tlen;
×
204
  pRaw = sdbAllocRaw(SDB_SSMIGRATE, MND_SSMIGRATE_VER_NUMBER, size);
×
205
  if (pRaw == NULL) {
×
206
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
207
    goto OVER;
×
208
  }
209

210
  buf = taosMemoryMalloc(tlen);
×
211
  if (buf == NULL) {
×
212
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
213
    goto OVER;
×
214
  }
215

216
  tlen = tSerializeSSsMigrateObj(buf, tlen, pSsMigrate);
×
217
  if (tlen < 0) {
×
218
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
219
    goto OVER;
×
220
  }
221

222
  int32_t dataPos = 0;
×
223
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
×
224
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
×
225
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
×
226

227
OVER:
×
228
  taosMemoryFreeClear(buf);
×
229
  if (terrno != TSDB_CODE_SUCCESS) {
×
230
    mError("ssmigrate:%" PRId32 ", failed to encode to raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
×
231
    sdbFreeRaw(pRaw);
×
232
    return NULL;
×
233
  }
234

235
  mTrace("ssmigrate:%" PRId32 ", encode to raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
×
236
  return pRaw;
×
237
}
238

239
SSdbRow *mndSsMigrateActionDecode(SSdbRaw *pRaw) {
×
240
  int32_t      code = 0;
×
241
  int32_t      lino = 0;
×
242
  SSdbRow     *pRow = NULL;
×
243
  SSsMigrateObj *pSsMigrate = NULL;
×
244
  void        *buf = NULL;
×
245
  terrno = TSDB_CODE_SUCCESS;
×
246

247
  int8_t sver = 0;
×
248
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
×
249
    goto OVER;
×
250
  }
251

252
  if (sver != MND_SSMIGRATE_VER_NUMBER) {
×
253
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
254
    mError("ssmigrate read invalid ver, data ver: %d, curr ver: %d", sver, MND_SSMIGRATE_VER_NUMBER);
×
255
    goto OVER;
×
256
  }
257

258
  pRow = sdbAllocRow(sizeof(SSsMigrateObj));
×
259
  if (pRow == NULL) {
×
260
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
261
    goto OVER;
×
262
  }
263

264
  pSsMigrate = sdbGetRowObj(pRow);
×
265
  if (pSsMigrate == NULL) {
×
266
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
267
    goto OVER;
×
268
  }
269

270
  int32_t tlen;
×
271
  int32_t dataPos = 0;
×
272
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
×
273
  buf = taosMemoryMalloc(tlen + 1);
×
274
  if (buf == NULL) {
×
275
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
276
    goto OVER;
×
277
  }
278
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
×
279

280
  if ((terrno = tDeserializeSSsMigrateObj(buf, tlen, pSsMigrate)) < 0) {
×
281
    goto OVER;
×
282
  }
283

284
OVER:
×
285
  taosMemoryFreeClear(buf);
×
286
  if (terrno != TSDB_CODE_SUCCESS) {
×
287
    mError("ssmigrate:%" PRId32 ", failed to decode from raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
×
288
    taosMemoryFreeClear(pRow);
×
289
    return NULL;
×
290
  }
291

292
  mTrace("ssmigrate:%" PRId32 ", decode from raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
×
293
  return pRow;
×
294
}
295

296
int32_t mndSsMigrateActionInsert(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
×
297
  mTrace("ssmigrate:%" PRId32 ", perform insert action", pSsMigrate->id);
×
298
  return 0;
×
299
}
300

301
int32_t mndSsMigrateActionDelete(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
×
302
  mTrace("ssmigrate:%" PRId32 ", perform delete action", pSsMigrate->id);
×
303
  tFreeSsMigrateObj(pSsMigrate);
×
304
  return 0;
×
305
}
306

307
int32_t mndSsMigrateActionUpdate(SSdb *pSdb, SSsMigrateObj *pOldSsMigrate, SSsMigrateObj *pNewSsMigrate) {
×
308
  mTrace("ssmigrate:%" PRId32 ", perform update action, old row:%p new row:%p", pOldSsMigrate->id, pOldSsMigrate,
×
309
         pNewSsMigrate);
310

311
  return 0;
×
312
}
313

314
SSsMigrateObj *mndAcquireSsMigrate(SMnode *pMnode, int64_t ssMigrateId) {
×
315
  SSdb        *pSdb = pMnode->pSdb;
×
316
  SSsMigrateObj *pSsMigrate = sdbAcquire(pSdb, SDB_SSMIGRATE, &ssMigrateId);
×
317
  if (pSsMigrate == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
318
    terrno = TSDB_CODE_SUCCESS;
×
319
  }
320
  return pSsMigrate;
×
321
}
322

323
void mndReleaseSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
×
324
  SSdb *pSdb = pMnode->pSdb;
×
325
  sdbRelease(pSdb, pSsMigrate);
×
326
  pSsMigrate = NULL;
×
327
}
×
328

329
int32_t mndSsMigrateGetDbName(SMnode *pMnode, int32_t ssMigrateId, char *dbname, int32_t len) {
×
330
  int32_t      code = 0;
×
331
  SSsMigrateObj *pSsMigrate = mndAcquireSsMigrate(pMnode, ssMigrateId);
×
332
  if (pSsMigrate == NULL) {
×
333
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
334
    if (terrno != 0) code = terrno;
×
335
    TAOS_RETURN(code);
×
336
  }
337

338
  tstrncpy(dbname, pSsMigrate->dbname, len);
×
339
  mndReleaseSsMigrate(pMnode, pSsMigrate);
×
340
  TAOS_RETURN(code);
×
341
}
342

343
// ssmigrate db
344
int32_t mndAddSsMigrateToTran(SMnode *pMnode, STrans *pTrans, SSsMigrateObj *pSsMigrate, SDbObj *pDb) {
×
345
  int32_t code = 0;
×
346
  SSdb   *pSdb = pMnode->pSdb;
×
347
  void   *pIter = NULL;
×
348

349
  pSsMigrate->dbUid = pDb->uid;
×
350
  pSsMigrate->id = tGenIdPI32();
×
351
  tstrncpy(pSsMigrate->dbname, pDb->name, sizeof(pSsMigrate->dbname));
×
352
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
353
  pSsMigrate->vgIdx = 0;
×
354
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
355
  pSsMigrate->fsetIdx = 0;
×
356

357
  pSsMigrate->vgroups = taosArrayInit(8, sizeof(int32_t));
×
358
  if (pSsMigrate->vgroups == NULL) {
×
359
    TAOS_RETURN(terrno);
×
360
  }
361

362
  while (1) {
×
363
    SVgObj *pVgroup = NULL;
×
364
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
×
365
    if (pIter == NULL) break;
×
366

367
    if (pVgroup->mountVgId || pVgroup->dbUid != pDb->uid) {
×
368
      sdbRelease(pSdb, pVgroup);
×
369
      continue;
×
370
    }
371

372
    int32_t vgId = pVgroup->vgId;
×
373
    sdbRelease(pSdb, pVgroup);
×
374
    if (taosArrayPush(pSsMigrate->vgroups, &vgId) == NULL) {
×
375
      code = terrno;
×
376
      taosArrayDestroy(pSsMigrate->vgroups);
×
377
      pSsMigrate->vgroups = NULL;
×
378
      sdbCancelFetch(pSdb, pIter);
×
379
      sdbRelease(pSdb, pVgroup);
×
380
      TAOS_RETURN(code);
×
381
    }
382
  }
383

384
  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
×
385
  code = terrno;
×
386
  taosArrayDestroy(pSsMigrate->vgroups);
×
387
  pSsMigrate->vgroups = NULL;
×
388
  if (pRaw == NULL) {
×
389
    TAOS_RETURN(code);
×
390
  }
391
  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
×
392
    sdbFreeRaw(pRaw);
×
393
    TAOS_RETURN(code);
×
394
  }
395

396
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
×
397
    sdbFreeRaw(pRaw);
×
398
    TAOS_RETURN(code);
×
399
  }
400

401
  mInfo("trans:%d, ssmigrate:%d, db:%s, has been added", pTrans->id, pSsMigrate->id, pSsMigrate->dbname);
×
402
  return 0;
×
403
}
404

405
// retrieve ssmigrate
406
int32_t mndRetrieveSsMigrate(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
407
  SMnode        *pMnode = pReq->info.node;
×
408
  SSdb          *pSdb = pMnode->pSdb;
×
409
  int32_t        numOfRows = 0;
×
410
  SSsMigrateObj *pSsMigrate = NULL;
×
411
  char          *sep = NULL;
×
412
  SDbObj        *pDb = NULL;
×
413
  int32_t        code = 0;
×
414
  int32_t        lino = 0;
×
415
  SUserObj      *pUser = NULL;
×
416
  SDbObj        *pIterDb = NULL;
×
417
  char           objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
×
418
  bool           showAll = false, showIter = false;
×
419
  int64_t        dbUid = 0;
×
420

421
  if (strlen(pShow->db) > 0) {
×
422
    sep = strchr(pShow->db, '.');
×
423
    if (sep &&
×
424
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
425
      sep++;
×
426
    } else {
427
      pDb = mndAcquireDb(pMnode, pShow->db);
×
428
      if (pDb == NULL) return terrno;
×
429
    }
430
  }
431

NEW
432
  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_SSMIGRATES, PRIV_OBJ_DB, 0,
×
433
                                   _OVER);
434

435
  while (numOfRows < rows) {
×
436
    pShow->pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pShow->pIter, (void **)&pSsMigrate);
×
437
    if (pShow->pIter == NULL) break;
×
438

439
    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pSsMigrate->dbname, pSsMigrate, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_SSMIGRATES, _OVER);
×
440

441
    SColumnInfoData *pColInfo;
442
    SName            n;
443
    int32_t          cols = 0;
×
444

445
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
×
446

447
    // ssmigrate_id
448
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
449
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->id, false), pSsMigrate, &lino, _OVER);
×
450

451
    // db_name
452
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
453
    if (pDb != NULL || !IS_SYS_DBNAME(pSsMigrate->dbname)) {
×
454
      SName name = {0};
×
455
      TAOS_CHECK_GOTO(tNameFromString(&name, pSsMigrate->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
×
456
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
×
457
    } else {
458
      tstrncpy(varDataVal(tmpBuf), pSsMigrate->dbname, TSDB_SHOW_SQL_LEN);
×
459
    }
460
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
×
461
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pSsMigrate, &lino, _OVER);
×
462

463
    // start_time
464
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
465
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->startTime, false), pSsMigrate, &lino, _OVER);
×
466

467
    // number_vgroup
468
    int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
×
469
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
470
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numVg, false), pSsMigrate, &lino, _OVER);
×
471

472
    // migrated_vgroup
473
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
474
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->vgIdx, false), pSsMigrate, &lino, _OVER);
×
475

476
    if (pSsMigrate->vgIdx < numVg) {
×
477
      // vgroup_id
478
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
479
      int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
480
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vgId, false), pSsMigrate, &lino, _OVER);
×
481
      
482
      // number_fileset
483
      int32_t numFset = taosArrayGetSize(pSsMigrate->fileSets);
×
484
      if (pSsMigrate->vgState < SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
×
485
        numFset = 0;
×
486
      }
487
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
488
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numFset, false), pSsMigrate, &lino, _OVER);
×
489

490
      // migrated_fileset
491
      int32_t fsetIdx = pSsMigrate->fsetIdx;
×
492
      if (pSsMigrate->vgState < SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
×
493
        fsetIdx = 0;
×
494
      }
495
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
496
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fsetIdx, false), pSsMigrate, &lino, _OVER);
×
497

498
      // fileset_id
499
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
500
      if (fsetIdx < numFset) {
×
501
        int32_t fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, fsetIdx);
×
502
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fid, false), pSsMigrate, &lino, _OVER);
×
503
      } else {
504
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
×
505
      }
506
    } else {
507
      // vgroup_id
508
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
509
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
×
510

511
      // number_fileset
512
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
513
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
×
514

515
      // migrated_fileset
516
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
517
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
×
518

519
      // fileset_id
520
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
521
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
×
522
    }
523

524
    numOfRows++;
×
525
    sdbRelease(pSdb, pSsMigrate);
×
526
  }
527

528
_OVER:
×
529
  if (pUser) mndReleaseUser(pMnode, pUser);
×
530
  mndReleaseDb(pMnode, pDb);
×
531
  if (code != 0) {
×
532
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
×
533
    TAOS_RETURN(code);
×
534
  }
535
  pShow->numOfRows += numOfRows;
×
536
  return numOfRows;
×
537
}
538

539

540
int32_t mndSsMigrateDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb);
541

542
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq) {
×
543
  SMnode *pMnode = pReq->info.node;
×
544
  void *pIter = NULL;
×
545

546
  while (1) {
×
547
    SDbObj *pDb = NULL;
×
548
    pIter = sdbFetch(pMnode->pSdb, SDB_DB, pIter, (void **)&pDb);
×
549
    if (pIter == NULL) {
×
550
      break;
×
551
    }
552
    int32_t code = mndSsMigrateDb(pMnode, NULL, pDb);
×
553
    sdbRelease(pMnode->pSdb, pDb);
×
554
    if (code == TSDB_CODE_SUCCESS) {
×
555
      mInfo("ssmigrate db:%s, has been triggered by timer", pDb->name);
×
556
    } else {
557
      mError("failed to trigger ssmigrate db:%s, code:%d, %s", pDb->name, code, tstrerror(code));
×
558
    }
559
  }
560

561
  TAOS_RETURN(0);
×
562
}
563

564

565
static int32_t mndDropSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
×
566
  int32_t code = 0, lino = 0;
×
567

568
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "drop-ssmigrate");
×
569
  if (pTrans == NULL) {
×
570
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
571
  }
572

573
  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);
×
574

575
  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
×
576
  if (pRaw == NULL) {
×
577
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
578
  }
579
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED)) != 0) {
×
580
    sdbFreeRaw(pRaw);
×
581
    TAOS_CHECK_GOTO(code, &lino, _exit);
×
582
  }
583

584
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _exit);
×
585
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);
×
586

587
_exit:
×
588
  mndTransDrop(pTrans);
×
589
  if (code == TSDB_CODE_SUCCESS) {
×
590
    mInfo("ssmigrate:%d was dropped successfully", pSsMigrate->id);
×
591
  } else {
592
    mError("ssmigrate:%d, failed to drop at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
×
593
  }
594
  return code;
×
595
}
596

597

598
static void mndUpdateSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
×
599
  int32_t code = 0, lino = 0;
×
600

601
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-ssmigrate");
×
602
  if (pTrans == NULL) {
×
603
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
604
  }
605
  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);
×
606

607
  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
×
608
  if (pRaw == NULL) {
×
609
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
×
610
  }
611
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
×
612
    sdbFreeRaw(pRaw);
×
613
    TAOS_CHECK_GOTO(code, &lino, _exit);
×
614
  }
615

616
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pRaw), &lino, _exit);
×
617
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);
×
618

619
_exit:
×
620
  mndTransDrop(pTrans);
×
621
  if (code == TSDB_CODE_SUCCESS) {
×
622
    mTrace("ssmigrate:%d was updated successfully", pSsMigrate->id);
×
623
  } else {
624
    mError("ssmigrate:%d, failed to update at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
×
625
  }
626
}
×
627

628

629
static int32_t mndKillSsMigrate(SMnode *pMnode, SRpcMsg *pReq, SSsMigrateObj *pSsMigrate) {
×
630
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
631
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
×
632
  if (pVgroup == NULL) {
×
633
    return mndDropSsMigrate(pMnode, pSsMigrate);
×
634
  }
635

636
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
637
  mndReleaseVgroup(pMnode, pVgroup);
×
638

639
  SVnodeKillSsMigrateReq req = {.ssMigrateId = pSsMigrate->id};
×
640
  int32_t   reqLen = tSerializeSVnodeKillSsMigrateReq(NULL, 0, &req);
×
641
  int32_t   contLen = reqLen + sizeof(SMsgHead);
×
642
  SMsgHead *pHead = rpcMallocCont(contLen);
×
643
  if (pHead == NULL) {
×
644
    return mndDropSsMigrate(pMnode, pSsMigrate);
×
645
  }
646

647
  pHead->contLen = htonl(contLen);
×
648
  pHead->vgId = htonl(vgId);
×
649
  int32_t ret = 0;
×
650
  if ((ret = tSerializeSVnodeKillSsMigrateReq((char *)pHead + sizeof(SMsgHead), reqLen, &req)) < 0) {
×
651
    return mndDropSsMigrate(pMnode, pSsMigrate);
×
652
  }
653

654
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_KILL_SSMIGRATE, .pCont = pHead, .contLen = contLen};
×
655
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
656
  if (code != 0) {
×
657
    mError("ssmigrate:%d, vgId:%d, failed to send kill ssmigrate request to vnode since 0x%x", req.ssMigrateId, vgId, code);
×
658
  } else {
659
    mInfo("ssmigrate:%d, vgId:%d, kill ssmigrate request was sent to vnode", req.ssMigrateId, vgId);
×
660
  }
661

662
  return mndDropSsMigrate(pMnode, pSsMigrate);
×
663
}
664

665
int32_t mndProcessKillSsMigrateReq(SRpcMsg *pReq) {
×
666
  int32_t         code = 0;
×
667
  int32_t         lino = 0;
×
668
  SKillSsMigrateReq killReq = {0};
×
669

670
  if ((code = tDeserializeSKillSsMigrateReq(pReq->pCont, pReq->contLen, &killReq)) != 0) {
×
671
    TAOS_RETURN(code);
×
672
  }
673

674
  mInfo("start to kill ssmigrate:%" PRId32, killReq.ssMigrateId);
×
675

676
  SMnode      *pMnode = pReq->info.node;
×
677
  SSsMigrateObj *pSsMigrate = mndAcquireSsMigrate(pMnode, killReq.ssMigrateId);
×
678
  if (pSsMigrate == NULL) {
×
679
    code = TSDB_CODE_MND_INVALID_SSMIGRATE_ID;
×
680
    tFreeSKillSsMigrateReq(&killReq);
×
681
    TAOS_RETURN(code);
×
682
  }
683

684
  //TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SSMIGRATE_DB), &lino, _OVER);
685

686
  TAOS_CHECK_GOTO(mndKillSsMigrate(pMnode, pReq, pSsMigrate), &lino, _OVER);
×
687

688
_OVER:
×
689
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
690
    mError("failed to kill ssmigrate %" PRId32 " since %s", killReq.ssMigrateId, tstrerror(code));
×
691
  }
692

693
  tFreeSKillSsMigrateReq(&killReq);
×
694
  mndReleaseSsMigrate(pMnode, pSsMigrate);
×
695

696
  TAOS_RETURN(code);
×
697
}
698

699

700
static void mndSendSsMigrateListFileSetsReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
×
701
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
702
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
×
703
  if (pVgroup == NULL) {
×
704
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
×
705
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
706
    pSsMigrate->vgIdx++;
×
707
    return;
×
708
  }
709

710
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
711
  mndReleaseVgroup(pMnode, pVgroup);
×
712

713
  SListSsMigrateFileSetsReq req = {.ssMigrateId = pSsMigrate->id};
×
714
  int32_t   reqLen = tSerializeSListSsMigrateFileSetsReq(NULL, 0, &req);
×
715
  int32_t   contLen = reqLen + sizeof(SMsgHead);
×
716
  SMsgHead *pHead = rpcMallocCont(contLen);
×
717
  if (pHead == NULL) {
×
718
    return;
×
719
  }
720

721
  pHead->contLen = htonl(contLen);
×
722
  pHead->vgId = htonl(vgId);
×
723
  int32_t ret = 0;
×
724
  if ((ret = tSerializeSListSsMigrateFileSetsReq((char *)pHead + sizeof(SMsgHead), reqLen, &req)) < 0) {
×
725
    return;
×
726
  }
727

728
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_LIST_SSMIGRATE_FILESETS, .pCont = pHead, .contLen = contLen};
×
729
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
730
  if (code != 0) {
×
731
    mError("ssmigrate:%d, vgId:%d, failed to send list filesets request to vnode since 0x%x", req.ssMigrateId, vgId, code);
×
732
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
733
    pSsMigrate->vgIdx++;
×
734
  } else {
735
    mInfo("ssmigrate:%d, vgId:%d, list filesets request was sent to vnode", req.ssMigrateId, vgId);
×
736
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_WAITING_FSET_LIST;
×
737
  }
738

739
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
740
  mndUpdateSsMigrate(pMnode, pSsMigrate);
×
741
}
742

743

744
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg) {
×
745
  int32_t code = 0, lino = 0;
×
746

747
  if (pMsg->code != 0) {
×
748
    mError("received wrong ssmigrate list filesets response, req code is %s", tstrerror(pMsg->code));
×
749
    TAOS_RETURN(pMsg->code);
×
750
  }
751

752
  SSsMigrateObj *pSsMigrate = NULL;
×
753
  SListSsMigrateFileSetsRsp rsp = {0};
×
754
  code = tDeserializeSListSsMigrateFileSetsRsp(pMsg->pCont, pMsg->contLen, &rsp);
×
755
  TAOS_CHECK_GOTO(code, &lino, _exit);
×
756

757
  SMnode *pMnode = pMsg->info.node;
×
758
  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
×
759
  if (pSsMigrate == NULL) {
×
760
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
×
761
  }
762

763
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
764
  if (vgId != rsp.vgId) {
×
765
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
766
  }
767

768
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
×
769
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
770
  }
771

772
  // we need to use the new filesets to update the SSsMigrateObj,
773
  // swap is only to make the it is easier to free both of them.
774
  SArray* tmp = pSsMigrate->fileSets;
×
775
  pSsMigrate->fileSets = rsp.pFileSets;
×
776
  rsp.pFileSets = tmp;
×
777

778
  pSsMigrate->fsetIdx = 0;
×
779
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
780
  if (taosArrayGetSize(pSsMigrate->fileSets) == 0) {
×
781
    mInfo("ssmigrate:%d, vgId:%d, no filesets to migrate.", pSsMigrate->id, rsp.vgId);
×
782
    pSsMigrate->vgIdx++;
×
783
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
784
  } else {
785
    mInfo("ssmigrate:%d, vgId:%d, filesets received.", pSsMigrate->id, rsp.vgId);
×
786
    pSsMigrate->currFset.nodeId = 0;
×
787
    pSsMigrate->currFset.vgId = vgId;
×
788
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, 0);
×
789
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
×
790
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
×
791
  }
792

793
  mndUpdateSsMigrate(pMnode, pSsMigrate);
×
794
  
795
_exit:
×
796
  if (code != TSDB_CODE_SUCCESS) {
×
797
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
×
798
  }
799
  tFreeSListSsMigrateFileSetsRsp(&rsp);
×
800
  mndReleaseSsMigrate(pMnode, pSsMigrate);
×
801
  return code;
×
802
}
803

804

805
static void mndSendSsMigrateFileSetReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
×
806
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
807
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
×
808
  if (pVgroup == NULL) {
×
809
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
×
810
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
811
    pSsMigrate->vgIdx++;
×
812
    return;
×
813
  }
814

815
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
816
  mndReleaseVgroup(pMnode, pVgroup);
×
817

818
  SSsMigrateFileSetReq req = { 0 };
×
819
  req.ssMigrateId = pSsMigrate->id;
×
820
  req.nodeId = 0;
×
821
  req.fid = pSsMigrate->currFset.fid;
×
822
  req.startTimeSec = taosGetTimestampSec();
×
823

824
  int32_t   reqLen = tSerializeSSsMigrateFileSetReq(NULL, 0, &req);
×
825
  int32_t   contLen = reqLen + sizeof(SMsgHead);
×
826
  SMsgHead *pHead = rpcMallocCont(contLen);
×
827
  if (pHead == NULL) {
×
828
    return;
×
829
  }
830

831
  pHead->contLen = htonl(contLen);
×
832
  pHead->vgId = htonl(vgId);
×
833
  int32_t ret = 0;
×
834
  if ((ret = tSerializeSSsMigrateFileSetReq((char *)pHead + sizeof(SMsgHead), reqLen, &req)) < 0) {
×
835
    return;
×
836
  }
837

838
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SSMIGRATE_FILESET, .pCont = pHead, .contLen = contLen};
×
839
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
840
  if (code != 0) {
×
841
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send migrate fileset request to vnode since 0x%x", req.ssMigrateId, vgId, req.fid, code);
×
842
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
843
    pSsMigrate->vgIdx++;
×
844
  } else {
845
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, migrate fileset request was sent to vnode", req.ssMigrateId, vgId, req.fid);
×
846
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTING;
×
847
    pSsMigrate->currFset.startTime = req.startTimeSec;
×
848
  }
849

850
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
851
  mndUpdateSsMigrate(pMnode, pSsMigrate);
×
852
}
853

854

855
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg) {
×
856
  int32_t code = 0, lino = 0;
×
857

858
  if (pMsg->code != 0) {
×
859
    mError("received wrong ssmigrate fileset response, error code is %s", tstrerror(pMsg->code));
×
860
    TAOS_RETURN(pMsg->code);
×
861
  }
862

863
  SSsMigrateObj *pSsMigrate = NULL;
×
864
  SSsMigrateFileSetRsp rsp = {0};
×
865
  code = tDeserializeSSsMigrateFileSetRsp(pMsg->pCont, pMsg->contLen, &rsp);
×
866
  TAOS_CHECK_GOTO(code, &lino, _exit);
×
867

868
  SMnode *pMnode = pMsg->info.node;
×
869
  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
×
870
  if (pSsMigrate == NULL) {
×
871
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
×
872
  }
873

874
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
875
  if (vgId != rsp.vgId) {
×
876
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
877
  }
878
  if (rsp.fid != pSsMigrate->currFset.fid) {
×
879
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
880
  }
881
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTING) {
×
882
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
883
  }
884
  if (rsp.nodeId <= 0) {
×
885
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
886
  }
887

888
  mInfo("ssmigrate:%d, vgId:%d, fid:%d, leader node is %d", rsp.ssMigrateId, vgId, rsp.fid, rsp.nodeId);
×
889
  pSsMigrate->currFset.nodeId = rsp.nodeId;
×
890
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTED;
×
891
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
892
  mndUpdateSsMigrate(pMnode, pSsMigrate);
×
893
  
894
_exit:
×
895
  if (code != TSDB_CODE_SUCCESS) {
×
896
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
×
897
  }
898
  mndReleaseSsMigrate(pMnode, pSsMigrate);
×
899
  return code;
×
900
}
901

902

903

904
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq) {
×
905
  TAOS_RETURN(0);
×
906
}
907

908

909
static void mndSendFollowerSsMigrateReq(SMnode* pMnode, SSsMigrateObj *pSsMigrate) {
×
910
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, pSsMigrate->currFset.vgId);
×
911
  if (pVgroup == NULL) {
×
912
    return;
×
913
  }
914

915
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
916
  mndReleaseVgroup(pMnode, pVgroup);
×
917

918
  SSsMigrateProgress req = {
×
919
    .ssMigrateId = pSsMigrate->id,
×
920
    .nodeId = pSsMigrate->currFset.nodeId,
×
921
    .vgId = pSsMigrate->currFset.vgId,
×
922
    .fid = pSsMigrate->currFset.fid,
×
923
    .state = pSsMigrate->currFset.state,
×
924
  };
925

926
  int32_t          reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
×
927
  int32_t          contLen = reqLen + sizeof(SMsgHead);
×
928
  SMsgHead *pHead = rpcMallocCont(contLen);
×
929
  if (pHead == NULL) {
×
930
    return;
×
931
  }
932

933
  pHead->contLen = htonl(contLen);
×
934
  pHead->vgId = htonl(req.vgId);
×
935
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
×
936
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_FOLLOWER_SSMIGRATE, .pCont = pHead, .contLen = contLen};
×
937

938
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
939
  if (code != 0) {
×
940
    mError("vgId:%d, ssmigrate:%d, fid:%d, failed to send follower-ssmigrate request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code);
×
941
  } else {
942
    mTrace("vgId:%d, ssmigrate:%d, fid:%d, follower-ssmigrate request sent", req.ssMigrateId, req.vgId, req.fid);
×
943
  }
944
}
945

946

947

948
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pMsg) {
×
949
  int32_t code = 0, lino = 0;
×
950

951
  if (pMsg->code != 0) {
×
952
    mError("received wrong query ssmigrate progress response, error code is %s", tstrerror(pMsg->code));
×
953
    TAOS_RETURN(pMsg->code);
×
954
  }
955

956
  SSsMigrateObj *pSsMigrate = NULL;
×
957
  SSsMigrateProgress rsp = {0};
×
958
  code = tDeserializeSSsMigrateProgress(pMsg->pCont, pMsg->contLen, &rsp);
×
959
  TAOS_CHECK_GOTO(code, &lino, _exit);
×
960

961
  SMnode *pMnode = pMsg->info.node;
×
962
  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
×
963
  if (pSsMigrate == NULL) {
×
964
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
×
965
  }
966

967
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTED) {
×
968
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
969
  }
970
  if (rsp.nodeId != pSsMigrate->currFset.nodeId) {
×
971
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
972
  }
973
  if (rsp.vgId != pSsMigrate->currFset.vgId) {
×
974
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
975
  }
976
  if (rsp.fid != pSsMigrate->currFset.fid) {
×
977
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
×
978
  }
979

980
  if (rsp.state == pSsMigrate->currFset.state) {
×
981
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
×
982
  } else {
983
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
×
984
  }
985
  pSsMigrate->currFset.state = rsp.state;
×
986
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
987
  mndUpdateSsMigrate(pMnode, pSsMigrate);
×
988

989
  mndSendFollowerSsMigrateReq(pMnode, pSsMigrate);
×
990

991
_exit:
×
992
  if (code != TSDB_CODE_SUCCESS) {
×
993
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
×
994
  }
995
  mndReleaseSsMigrate(pMnode, pSsMigrate);
×
996
  return code;
×
997
}
998

999

1000

1001
// when query migration progress, we need to send the msg to dnode instead of vgroup,
1002
// because migration may take a long time, and leader may change during the migration process,
1003
// while only the initial leader vnode can handle the migration progress query.
1004
void mndSendQuerySsMigrateProgressReq(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
×
1005
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pSsMigrate->currFset.nodeId);
×
1006
  if (pDnode == NULL) {
×
1007
    return;
×
1008
  }
1009

1010
  SEpSet epSet = mndGetDnodeEpset(pDnode);
×
1011
  mndReleaseDnode(pMnode, pDnode);
×
1012

1013
  SSsMigrateProgress req = {
×
1014
    .ssMigrateId = pSsMigrate->id,
×
1015
    .nodeId = pSsMigrate->currFset.nodeId,
×
1016
    .vgId = pSsMigrate->currFset.vgId,
×
1017
    .fid = pSsMigrate->currFset.fid,
×
1018
    .state = SSMIGRATE_FILESET_STATE_IN_PROGRESS,
1019
  };
1020

1021
  int32_t          reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
×
1022
  int32_t          contLen = reqLen + sizeof(SMsgHead);
×
1023
  SMsgHead *pHead = rpcMallocCont(contLen);
×
1024
  if (pHead == NULL) {
×
1025
    return;
×
1026
  }
1027

1028
  pHead->contLen = htonl(contLen);
×
1029
  pHead->vgId = htonl(req.vgId);
×
1030
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
×
1031
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS, .pCont = pHead, .contLen = contLen};
×
1032

1033
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
1034
  if (code != 0) {
×
1035
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send ssmigrate-query-progress request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code)
×
1036
  } else {
1037
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, ssmigrate-query-progress request sent", req.ssMigrateId, req.vgId, req.fid);
×
1038
  }
1039
}
1040

1041

1042

1043
static void mndUpdateSsMigrateProgress(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
×
1044
  int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
×
1045
  int32_t numFset = taosArrayGetSize(pSsMigrate->fileSets);
×
1046

1047
  if (pSsMigrate->vgIdx >= numVg) {
×
1048
    mInfo("ssmigrate:%d, all vgroups has been processed", pSsMigrate->id);
×
1049
    TAOS_UNUSED(mndDropSsMigrate(pMnode, pSsMigrate));
×
1050
    return;
×
1051
  }
1052

1053
  // vgroup state is init, we need to get the list of its file sets
1054
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_INIT) {
×
1055
    mndSendSsMigrateListFileSetsReq(pMnode, pSsMigrate);
×
1056
    return;
×
1057
  }
1058

1059
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
1060

1061
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
×
1062
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
×
1063
      mWarn("ssmigrate:%d, vgId:%d, haven't receive file set list in 30 seconds, skip", pSsMigrate->id, vgId);
×
1064
      pSsMigrate->vgIdx++;
×
1065
      pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
1066
      pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1067
      mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1068
    }
1069
    return;
×
1070
  }
1071

1072
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
×
1073
    mndSendSsMigrateFileSetReq(pMnode, pSsMigrate);
×
1074
    return;
×
1075
  }
1076

1077
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_FSET_STARTING) {
×
1078
    // if timeout, we skip the current vgroup instead of the current file set, because timeout
1079
    // of a file set often means the vgroup is not available.
1080
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
×
1081
      mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive response in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
×
1082
      pSsMigrate->vgIdx++;
×
1083
      pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
1084
      pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1085
      mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1086
    }
1087
    return;
×
1088
  }
1089

1090
  // compact need some time, so only reset migration state here and wait the next
1091
  // tick to send the first migration request again.
1092
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_COMPACT) {
×
1093
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, compacting, will retry later", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
×
1094
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
×
1095
    pSsMigrate->currFset.nodeId = 0;
×
1096
    pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1097
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
×
1098
    mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1099
    return;
×
1100
  }
1101

1102
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
×
1103
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
×
1104
      mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive state in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
×
1105
      pSsMigrate->vgIdx++;
×
1106
      pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
1107
      pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1108
      mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1109
    } else {
1110
      mndSendQuerySsMigrateProgressReq(pMnode, pSsMigrate);
×
1111
    }
1112
    return;
×
1113
  }
1114

1115
  // wait at least 30 seconds after the leader node has processed the file set, this is to ensure
1116
  // that the follower nodes have enough time to start process the file set, and make the code of
1117
  // tsdb simpler.
1118
  if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime < 30) {
×
1119
    return;
×
1120
  }
1121

1122
  // this file set has been processed, move to the next file set
1123
  pSsMigrate->fsetIdx++;
×
1124
  if (pSsMigrate->fsetIdx >= numFset) {
×
1125
    pSsMigrate->vgIdx++;
×
1126
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
1127
  } else {
1128
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
×
1129
    pSsMigrate->currFset.nodeId = 0;
×
1130
    pSsMigrate->currFset.vgId = vgId;
×
1131
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, pSsMigrate->fsetIdx);
×
1132
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
×
1133
  }
1134

1135
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1136
  mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1137
}
1138

1139

1140
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq) {
×
1141
  mTrace("start to process update ssmigrate progress timer");
×
1142

1143
  int32_t code = 0;
×
1144
  SMnode* pMnode = pReq->info.node;
×
1145
  SSdb   *pSdb = pMnode->pSdb;
×
1146
  void *pIter = NULL;
×
1147

1148
  while (1) {
×
1149
    SSsMigrateObj *pSsMigrate = NULL;
×
1150
    pIter = sdbFetch(pMnode->pSdb, SDB_SSMIGRATE, pIter, (void **)&pSsMigrate);
×
1151
    if (pIter == NULL) {
×
1152
      break;
×
1153
    }
1154
    mndUpdateSsMigrateProgress(pMnode, pSsMigrate);
×
1155
    sdbRelease(pSdb, pSsMigrate);
×
1156
  }
1157

1158
  return 0;
×
1159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc