• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.76
/source/dnode/mnode/impl/src/mndSsMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndSsMigrate.h"
16
#include "mndDb.h"
17
#include "mndDnode.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22
#include "mndVgroup.h"
23
#include "tmisce.h"
24
#include "tmsgcb.h"
25

26
#define MND_SSMIGRATE_VER_NUMBER       2
27

28
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq);
29
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq);
30
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg);
31
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg);
32
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pReq);
33
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pMsg);
34

35
// Registers the ssmigrate module with the mnode: the SHOW retrieve handler, all
// ssmigrate-related message handlers, and the SDB table descriptor for
// SDB_SSMIGRATE rows. Returns the result of sdbSetTable().
int32_t mndInitSsMigrate(SMnode *pMnode) {
  // Rows of this table are keyed by a 32-bit id.
  SSdbTable ssMigrateTable = {
      .sdbType = SDB_SSMIGRATE,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndSsMigrateActionEncode,
      .decodeFp = (SdbDecodeFp)mndSsMigrateActionDecode,
      .insertFp = (SdbInsertFp)mndSsMigrateActionInsert,
      .updateFp = (SdbUpdateFp)mndSsMigrateActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSsMigrateActionDelete,
  };

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SSMIGRATE, mndRetrieveSsMigrate);

  // Requests and timers handled by the mnode itself.
  mndSetMsgHandle(pMnode, TDMT_MND_SSMIGRATE_DB_TIMER, mndProcessSsMigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SSMIGRATE, mndProcessKillSsMigrateReq);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SSMIGRATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER, mndProcessUpdateSsMigrateProgressTimer);

  // Responses coming back from vnodes.
  mndSetMsgHandle(pMnode, TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP, mndProcessSsMigrateListFileSetsRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SSMIGRATE_FILESET_RSP, mndProcessSsMigrateFileSetRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP, mndProcessQuerySsMigrateProgressRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_FOLLOWER_SSMIGRATE_RSP, mndProcessFollowerSsMigrateRsp);

  return sdbSetTable(pMnode->pSdb, ssMigrateTable);
}
58

59
// Module teardown hook; it only emits a debug log line.
void mndCleanupSsMigrate(SMnode *pMnode) {
  mDebug("mnd ssmigrate cleanup");
}
16✔
60

61
// Releases the arrays owned by an SSsMigrateObj (vgroups and fileSets). The
// object itself is not freed. The pointers are reset to NULL so that calling
// this twice, or decoding into the object afterwards, never sees a dangling
// array. A NULL object is ignored.
void tFreeSsMigrateObj(SSsMigrateObj *pSsMigrate) {
  if (pSsMigrate == NULL) {
    return;
  }

  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;

  taosArrayDestroy(pSsMigrate->fileSets);
  pSsMigrate->fileSets = NULL;
}
×
65

66
int32_t tSerializeSSsMigrateObj(void *buf, int32_t bufLen, const SSsMigrateObj *pObj) {
×
67
  SEncoder encoder = {0};
×
68
  int32_t  code = 0;
×
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
×
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->id));
×
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->dbUid));
×
76
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
×
77
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
×
78
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->stateUpdateTime));
×
79
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgIdx));
×
80
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgState));
×
81
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->fsetIdx));
×
82
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.nodeId));
×
83
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.vgId));
×
84
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.fid));
×
85
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.state));
×
86
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->currFset.startTime));
×
87

88
  int32_t numVg = pObj->vgroups ? taosArrayGetSize(pObj->vgroups) : 0;
×
89
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numVg));
×
90
  for (int32_t i = 0; i < numVg; ++i) {
×
91
    int32_t *vgId = (int32_t *)taosArrayGet(pObj->vgroups, i);
×
92
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *vgId));
×
93
  }
94

95
  int32_t numFset = pObj->fileSets ? taosArrayGetSize(pObj->fileSets) : 0;
×
96
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numFset));
×
97
  for (int32_t i = 0; i < numFset; ++i) {
×
98
    int32_t *fsetId = (int32_t *)taosArrayGet(pObj->fileSets, i);
×
99
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *fsetId));
×
100
  }
101
  tEndEncode(&encoder);
×
102

103
_exit:
×
104
  if (code) {
×
105
    tlen = code;
×
106
  } else {
107
    tlen = encoder.pos;
×
108
  }
109
  tEncoderClear(&encoder);
×
110
  return tlen;
×
111
}
112

113
// Decodes an SSsMigrateObj from buf (bufLen bytes), mirroring the field order
// written by tSerializeSSsMigrateObj().
//
// Array handling: if pObj already owns a vgroups / fileSets array, it is cleared
// and refilled in place; otherwise a new array is created. On success both
// arrays are stored in pObj. On failure pre-existing arrays are left attached
// but emptied, and arrays created here are destroyed.
//
// Returns TSDB_CODE_SUCCESS or the code of the first failing step.
int32_t tDeserializeSSsMigrateObj(void *buf, int32_t bufLen, SSsMigrateObj *pObj) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  SArray  *vgList = NULL;
  SArray  *fsetList = NULL;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // Scalar header fields.
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->id));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->stateUpdateTime));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgIdx));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgState));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->fsetIdx));

  // State of the file set currently being migrated.
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.nodeId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.vgId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.fid));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.state));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->currFset.startTime));

  // Vgroup id list: reuse the caller's array when there is one.
  int32_t vgCount = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgCount));
  vgList = pObj->vgroups;
  if (vgList != NULL) {
    taosArrayClear(vgList);
  } else {
    vgList = taosArrayInit(vgCount, sizeof(int32_t));
    if (vgList == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < vgCount; ++idx) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgId));
    if (taosArrayPush(vgList, &vgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // File set id list: same reuse-or-create policy.
  int32_t fsetCount = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fsetCount));
  fsetList = pObj->fileSets;
  if (fsetList != NULL) {
    taosArrayClear(fsetList);
  } else {
    fsetList = taosArrayInit(fsetCount, sizeof(int32_t));
    if (fsetList == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < fsetCount; ++idx) {
    int32_t fsetId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fsetId));
    if (taosArrayPush(fsetList, &fsetId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);

  if (code == TSDB_CODE_SUCCESS) {
    pObj->vgroups = vgList;
    pObj->fileSets = fsetList;
    return code;
  }

  // Failure: empty what the caller owned, destroy what was created here.
  if (pObj->vgroups) {
    taosArrayClear(pObj->vgroups);
  } else {
    taosArrayDestroy(vgList);
  }
  if (pObj->fileSets) {
    taosArrayClear(pObj->fileSets);
  } else {
    taosArrayDestroy(fsetList);
  }
  return code;
}
188

189
// SDB encode callback: turns an SSsMigrateObj into an SSdbRaw record.
//
// The raw payload is a 32-bit length followed by the bytes produced by
// tSerializeSSsMigrateObj(). Returns the new raw record, or NULL with terrno
// set on failure. Any failure of sizing, allocation or serialisation is
// reported as TSDB_CODE_OUT_OF_MEMORY.
SSdbRaw *mndSsMigrateActionEncode(SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *pPayload = NULL;
  SSdbRaw *pRaw = NULL;
  int32_t  dataPos = 0;

  terrno = TSDB_CODE_SUCCESS;

  // First pass with no buffer: the return value is used as the payload size.
  int32_t payloadLen = tSerializeSSsMigrateObj(NULL, 0, pSsMigrate);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // Room for the length prefix plus the payload itself.
  int32_t rawSize = sizeof(int32_t) + payloadLen;
  pRaw = sdbAllocRaw(SDB_SSMIGRATE, MND_SSMIGRATE_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // Second pass: serialise for real into the scratch buffer.
  payloadLen = tSerializeSSsMigrateObj(pPayload, payloadLen, pSsMigrate);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, payloadLen, _OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, payloadLen, _OVER);
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

_OVER:
  taosMemoryFreeClear(pPayload);

  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("ssmigrate:%" PRId32 ", encode to raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
    return pRaw;
  }

  mError("ssmigrate:%" PRId32 ", failed to encode to raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
238

239
// SDB decode callback: rebuilds an SSsMigrateObj row from an SSdbRaw record.
//
// The raw record must carry soft version MND_SSMIGRATE_VER_NUMBER; its payload
// is a 32-bit length followed by the serialised object. Returns the new row,
// or NULL with terrno set on failure.
//
// Several failure paths (version read, version mismatch, row allocation) are
// taken before pSsMigrate is assigned, so the exit path must not dereference
// it unconditionally.
SSdbRow *mndSsMigrateActionDecode(SSdbRaw *pRaw) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SSdbRow       *pRow = NULL;
  SSsMigrateObj *pSsMigrate = NULL;
  void          *buf = NULL;
  int32_t        tlen = 0;
  int32_t        dataPos = 0;
  int8_t         sver = 0;

  terrno = TSDB_CODE_SUCCESS;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto OVER;
  }

  if (sver != MND_SSMIGRATE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("ssmigrate read invalid ver, data ver: %d, curr ver: %d", sver, MND_SSMIGRATE_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SSsMigrateObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pSsMigrate = sdbGetRowObj(pRow);
  if (pSsMigrate == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Payload: length prefix, then the serialised object.
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSSsMigrateObj(buf, tlen, pSsMigrate)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);

  // A missing object is treated as failure even if terrno was left untouched,
  // so that neither log line below can dereference NULL.
  if (terrno != TSDB_CODE_SUCCESS || pSsMigrate == NULL) {
    int32_t id = (pSsMigrate != NULL) ? pSsMigrate->id : 0;
    mError("ssmigrate:%" PRId32 ", failed to decode from raw:%p since %s", id, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", decode from raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRow;
}
295

296
// SDB insert callback. Nothing has to be done when a row is inserted apart
// from tracing it; always returns 0.
int32_t mndSsMigrateActionInsert(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  mTrace("ssmigrate:%" PRId32 ", perform insert action",
         pSsMigrate->id);

  return 0;
}
300

301
// SDB delete callback: traces the deletion and releases the arrays owned by
// the row object via tFreeSsMigrateObj(). Always returns 0.
int32_t mndSsMigrateActionDelete(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  mTrace("ssmigrate:%" PRId32 ", perform delete action",
         pSsMigrate->id);

  tFreeSsMigrateObj(pSsMigrate);

  return 0;
}
306

307
// SDB update callback. It only traces the old and new rows; no field is copied
// from pNewSsMigrate into pOldSsMigrate here. Always returns 0.
int32_t mndSsMigrateActionUpdate(SSdb *pSdb, SSsMigrateObj *pOldSsMigrate, SSsMigrateObj *pNewSsMigrate) {
  mTrace("ssmigrate:%" PRId32 ", perform update action, old row:%p new row:%p",
         pOldSsMigrate->id,
         pOldSsMigrate,
         pNewSsMigrate);

  return 0;
}
313

314
// Looks up an ssmigrate object by id and returns a referenced pointer that the
// caller must hand back with mndReleaseSsMigrate(). Returns NULL when no such
// object exists; in that specific case terrno is reset to success, so "not
// found" is not reported as an error.
//
// The SDB_SSMIGRATE table is registered with SDB_KEY_INT32, so the lookup key
// must be a 32-bit object. Passing the address of the 64-bit parameter would
// only work on little-endian hosts; the id is narrowed into a local first.
SSsMigrateObj *mndAcquireSsMigrate(SMnode *pMnode, int64_t ssMigrateId) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t key = (int32_t)ssMigrateId;

  SSsMigrateObj *pSsMigrate = sdbAcquire(pSdb, SDB_SSMIGRATE, &key);
  if (pSsMigrate == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pSsMigrate;
}
322

323
// Gives back a reference obtained from mndAcquireSsMigrate() by forwarding it
// to sdbRelease().
void mndReleaseSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  sdbRelease(pMnode->pSdb, pSsMigrate);
}
×
328

329
// Copies the database name of the ssmigrate task `ssMigrateId` into `dbname`
// (capacity `len`, copied with tstrncpy).
//
// Returns 0 on success. If the task cannot be acquired, returns terrno when it
// is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
int32_t mndSsMigrateGetDbName(SMnode *pMnode, int32_t ssMigrateId, char *dbname, int32_t len) {
  SSsMigrateObj *pObj = mndAcquireSsMigrate(pMnode, ssMigrateId);

  if (pObj != NULL) {
    tstrncpy(dbname, pObj->dbname, len);
    mndReleaseSsMigrate(pMnode, pObj);
    TAOS_RETURN(0);
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
342

343
// ssmigrate db
344
// Initialises a new ssmigrate task for database pDb and appends it to pTrans
// as a commit log entry.
//
// Fills in pSsMigrate (db uid and name, a freshly generated id, state reset to
// SSMIGRATE_VGSTATE_INIT with vgIdx and fsetIdx at 0), collects the ids of all
// vgroups that belong to pDb and are not mount vgroups, encodes the object and
// appends the raw record in SDB_STATUS_READY state.
//
// The temporary vgroup array is destroyed before returning on every path, so
// pSsMigrate->vgroups is NULL afterwards. Returns 0 on success or an error
// code.
int32_t mndAddSsMigrateToTran(SMnode *pMnode, STrans *pTrans, SSsMigrateObj *pSsMigrate, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  pSsMigrate->dbUid = pDb->uid;
  pSsMigrate->id = tGenIdPI32();
  tstrncpy(pSsMigrate->dbname, pDb->name, sizeof(pSsMigrate->dbname));
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  pSsMigrate->vgIdx = 0;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->fsetIdx = 0;

  pSsMigrate->vgroups = taosArrayInit(8, sizeof(int32_t));
  if (pSsMigrate->vgroups == NULL) {
    TAOS_RETURN(terrno);
  }

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // Read everything needed from the vgroup, then release it exactly once.
    bool    skip = pVgroup->mountVgId || pVgroup->dbUid != pDb->uid;
    int32_t vgId = pVgroup->vgId;
    sdbRelease(pSdb, pVgroup);
    if (skip) {
      continue;
    }

    if (taosArrayPush(pSsMigrate->vgroups, &vgId) == NULL) {
      code = terrno;
      taosArrayDestroy(pSsMigrate->vgroups);
      pSsMigrate->vgroups = NULL;
      // The vgroup was already released above; only the iterator is still open.
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
  }

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  code = terrno;
  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;
  if (pRaw == NULL) {
    TAOS_RETURN(code);
  }

  // Set the status while the raw record is still exclusively ours. Once it has
  // been appended it must not be freed here any more (presumably the
  // transaction then owns it - confirm against mndTransAppendCommitlog).
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, ssmigrate:%d, db:%s, has been added", pTrans->id, pSsMigrate->id, pSsMigrate->dbname);
  return 0;
}
404

405
// retrieve ssmigrate
406
// SHOW handler for ssmigrate tasks: appends up to `rows` rows to pBlock, one
// per SDB_SSMIGRATE object, continuing from pShow->pIter.
//
// Columns, in order: ssmigrate_id, db_name, start_time, number_vgroup,
// migrated_vgroup, vgroup_id, number_fileset, migrated_fileset, fileset_id.
// The last four are NULL once vgIdx has moved past the last vgroup.
// number_fileset and migrated_fileset read as 0 until the file set list of the
// current vgroup has been received.
//
// Returns the number of rows written, or an error code on failure. If
// pShow->db names a database that is neither of the two system schemas and it
// cannot be acquired, terrno is returned immediately.
int32_t mndRetrieveSsMigrate(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        numOfRows = 0;
  SSsMigrateObj *pSsMigrate = NULL;
  char          *sep = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;
  SUserObj      *pUser = NULL;
  // NOTE(review): pIterDb, objFName, showAll, showIter and dbUid are not
  // referenced directly in this function; presumably the MND_SHOW_CHECK_*
  // privilege macros use them by name - confirm before removing.
  SDbObj        *pIterDb = NULL;
  char           objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool           showAll = false, showIter = false;
  int64_t        dbUid = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_SSMIGRATES, PRIV_OBJ_DB, 0,
                                   _OVER);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pShow->pIter, (void **)&pSsMigrate);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pSsMigrate->dbname, pSsMigrate, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_SSMIGRATES, _OVER);

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // ssmigrate_id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->id, false), pSsMigrate, &lino, _OVER);

    // db_name
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pSsMigrate->dbname)) {
      SName name = {0};
      // Go through RETRIEVE_CHECK_GOTO, which is handed pSsMigrate like every
      // other fallible call in this loop, instead of a plain TAOS_CHECK_GOTO
      // that would jump to _OVER while the fetched row is still held.
      // NOTE(review): the macro body is not visible here - confirm it releases
      // the row.
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pSsMigrate->dbname, T_NAME_ACCT | T_NAME_DB), pSsMigrate, &lino, _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pSsMigrate->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pSsMigrate, &lino, _OVER);

    // start_time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->startTime, false), pSsMigrate, &lino, _OVER);

    // number_vgroup
    int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numVg, false), pSsMigrate, &lino, _OVER);

    // migrated_vgroup
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->vgIdx, false), pSsMigrate, &lino, _OVER);

    if (pSsMigrate->vgIdx < numVg) {
      // The file set list only describes the current vgroup once it has been received.
      bool fsetListKnown = pSsMigrate->vgState >= SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;

      // vgroup_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      int32_t vgId = *(int32_t *)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vgId, false), pSsMigrate, &lino, _OVER);

      // number_fileset
      int32_t numFset = fsetListKnown ? (int32_t)taosArrayGetSize(pSsMigrate->fileSets) : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numFset, false), pSsMigrate, &lino, _OVER);

      // migrated_fileset
      int32_t fsetIdx = fsetListKnown ? pSsMigrate->fsetIdx : 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fsetIdx, false), pSsMigrate, &lino, _OVER);

      // fileset_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (fsetIdx < numFset) {
        int32_t fid = *(int32_t *)taosArrayGet(pSsMigrate->fileSets, fsetIdx);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fid, false), pSsMigrate, &lino, _OVER);
      } else {
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
      }
    } else {
      // Past the last vgroup: vgroup_id, number_fileset, migrated_fileset and
      // fileset_id are all NULL.
      for (int32_t i = 0; i < 4; ++i) {
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
      }
    }

    numOfRows++;
    sdbRelease(pSdb, pSsMigrate);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
538

539

540
int32_t mndSsMigrateDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb);
541

542
// Timer handler: walks every database in the SDB and asks mndSsMigrateDb() to
// start a migration for it. Failures are logged per database and do not stop
// the walk. Always returns 0.
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;

  while (1) {
    SDbObj *pDb = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DB, pIter, (void **)&pDb);
    if (pIter == NULL) {
      break;
    }

    int32_t code = mndSsMigrateDb(pMnode, NULL, pDb);
    if (code == TSDB_CODE_SUCCESS) {
      mInfo("ssmigrate db:%s, has been triggered by timer", pDb->name);
    } else {
      mError("failed to trigger ssmigrate db:%s, code:%d, %s", pDb->name, code, tstrerror(code));
    }

    // Release only after the last use of pDb; the log lines above read pDb->name.
    sdbRelease(pMnode->pSdb, pDb);
  }

  TAOS_RETURN(0);
}
563

564

565
// Removes an ssmigrate task from the SDB by running a "drop-ssmigrate"
// transaction whose commit log holds the encoded object in SDB_STATUS_DROPPED
// state. Returns 0 on success or an error code; the outcome is logged.
static int32_t mndDropSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "drop-ssmigrate");
  if (pTrans == NULL) {
    // Never fall through with a NULL transaction, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  // If the append fails, the raw record is freed here, as mndAddSsMigrateToTran() does.
  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mInfo("ssmigrate:%d was dropped successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to drop at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
  return code;
}
596

597

598
// Persists the current in-memory state of an ssmigrate task by running an
// "update-ssmigrate" transaction whose commit log holds the encoded object in
// SDB_STATUS_READY state. Failures are only logged; nothing is returned.
static void mndUpdateSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-ssmigrate");
  if (pTrans == NULL) {
    // Never fall through with a NULL transaction, even if terrno was left at 0.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  // If the append fails, the raw record is freed here, as mndAddSsMigrateToTran() does.
  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("ssmigrate:%d was updated successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to update at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
}
×
627

628

629
// Kills an ssmigrate task: sends a TDMT_VND_KILL_SSMIGRATE request to the
// vgroup currently being migrated, then drops the task from the SDB.
//
// Notifying the vnode is best effort. If there is no current vgroup, the
// vgroup no longer exists, or the request cannot be built or sent, the task is
// still dropped. Returns the result of mndDropSsMigrate().
static int32_t mndKillSsMigrate(SMnode *pMnode, SRpcMsg *pReq, SSsMigrateObj *pSsMigrate) {
  if (pSsMigrate->vgIdx >= taosArrayGetSize(pSsMigrate->vgroups)) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  int32_t vgId = *(int32_t *)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SVnodeKillSsMigrateReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t                reqLen = tSerializeSVnodeKillSsMigrateReq(NULL, 0, &req);
  if (reqLen < 0) {
    // A negative length is an error code; do not size a buffer with it.
    mError("ssmigrate:%d, vgId:%d, failed to measure kill ssmigrate request since 0x%x", req.ssMigrateId, vgId,
           reqLen);
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSVnodeKillSsMigrateReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // The message is never sent, so its buffer has to be released here.
    rpcFreeCont(pHead);
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_KILL_SSMIGRATE, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send kill ssmigrate request to vnode since 0x%x", req.ssMigrateId, vgId, code);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, kill ssmigrate request was sent to vnode", req.ssMigrateId, vgId);
  }

  return mndDropSsMigrate(pMnode, pSsMigrate);
}
668

669
// Handles TDMT_MND_KILL_SSMIGRATE: decodes the request, looks up the task and
// kills it via mndKillSsMigrate().
//
// Returns the deserialisation error if the request cannot be decoded,
// TSDB_CODE_MND_INVALID_SSMIGRATE_ID if the task cannot be acquired, otherwise
// the result of mndKillSsMigrate().
int32_t mndProcessKillSsMigrateReq(SRpcMsg *pReq) {
  SKillSsMigrateReq killReq = {0};

  int32_t code = tDeserializeSKillSsMigrateReq(pReq->pCont, pReq->contLen, &killReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill ssmigrate:%" PRId32, killReq.ssMigrateId);

  SMnode        *pMnode = pReq->info.node;
  SSsMigrateObj *pTask = mndAcquireSsMigrate(pMnode, killReq.ssMigrateId);
  if (pTask == NULL) {
    tFreeSKillSsMigrateReq(&killReq);
    TAOS_RETURN(TSDB_CODE_MND_INVALID_SSMIGRATE_ID);
  }

  // NOTE: no operation-privilege check is performed on this path.
  code = mndKillSsMigrate(pMnode, pReq, pTask);

  // An in-progress action is not a failure worth logging.
  bool failed = (code != 0) && (code != TSDB_CODE_ACTION_IN_PROGRESS);
  if (failed) {
    mError("failed to kill ssmigrate %" PRId32 " since %s", killReq.ssMigrateId, tstrerror(code));
  }

  tFreeSKillSsMigrateReq(&killReq);
  mndReleaseSsMigrate(pMnode, pTask);

  TAOS_RETURN(code);
}
702

703

704
// Asks the vgroup at pSsMigrate->vgIdx for the list of file sets to migrate by
// sending TDMT_VND_LIST_SSMIGRATE_FILESETS.
//
// - Vgroup missing: the task moves on to the next vgroup in memory only (no
//   SDB update on this path).
// - Request cannot be built: returns without changing the task.
// - Send failed: moves on to the next vgroup and persists the task.
// - Send succeeded: state becomes SSMIGRATE_VGSTATE_WAITING_FSET_LIST and the
//   task is persisted.
//
// The caller must ensure vgIdx is a valid index into pSsMigrate->vgroups.
static void mndSendSsMigrateListFileSetsReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    return;
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SListSsMigrateFileSetsReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t   reqLen = tSerializeSListSsMigrateFileSetsReq(NULL, 0, &req);
  if (reqLen < 0) {
    // A negative length is an error code; do not size a buffer with it.
    mError("ssmigrate:%d, vgId:%d, failed to measure list filesets request since 0x%x", req.ssMigrateId, vgId, reqLen);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSListSsMigrateFileSetsReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // The message is never sent, so its buffer has to be released here.
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_LIST_SSMIGRATE_FILESETS, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send list filesets request to vnode since 0x%x", req.ssMigrateId, vgId, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, list filesets request was sent to vnode", req.ssMigrateId, vgId);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_WAITING_FSET_LIST;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
746

747

748
// Handles TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP: installs the file set list
// reported by a vnode into the matching ssmigrate task and persists the task.
//
// The response is rejected with TSDB_CODE_INVALID_MSG unless it comes from the
// vgroup the task is currently working on and the task is in state
// SSMIGRATE_VGSTATE_WAITING_FSET_LIST. An empty list advances the task to the
// next vgroup; otherwise the first file set becomes the current one.
//
// Returns 0 on success, pMsg->code if the response itself carries an error, or
// the code of the failing step.
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate list filesets response, req code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // Everything read at _exit must be initialised before the first goto:
  // jumping past a declaration leaves the variable indeterminate.
  SMnode                   *pMnode = pMsg->info.node;
  SSsMigrateObj            *pSsMigrate = NULL;
  SListSsMigrateFileSetsRsp rsp = {0};
  int32_t                  *pVgId = NULL;

  code = tDeserializeSListSsMigrateFileSetsRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // A task with no current vgroup (index out of range) cannot be waiting for
  // this response; reject it like any other mismatching response.
  pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // The task takes over the new file set list. Swapping instead of assigning
  // hands the old list to rsp, so it is freed together with the response below.
  SArray* tmp = pSsMigrate->fileSets;
  pSsMigrate->fileSets = rsp.pFileSets;
  rsp.pFileSets = tmp;

  pSsMigrate->fsetIdx = 0;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  if (taosArrayGetSize(pSsMigrate->fileSets) == 0) {
    mInfo("ssmigrate:%d, vgId:%d, no filesets to migrate.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->vgIdx++;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, filesets received.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->currFset.vgId = *pVgId;
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, 0);
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  }

  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  tFreeSListSsMigrateFileSetsRsp(&rsp);
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
807

808

809
// Send a TDMT_VND_SSMIGRATE_FILESET request for the current file set (currFset.fid) to the
// vgroup at vgroups[vgIdx].
//
// Outcomes:
//   - vgroup cannot be acquired: the vgroup is skipped (state back to INIT, vgIdx advanced) and
//     the function returns without calling mndUpdateSsMigrate().
//   - the request cannot be sized, allocated or serialized: nothing is changed, so the vgroup
//     state stays as it was when this function was entered.
//   - send fails: the vgroup is skipped; send succeeds: the vgroup state becomes FSET_STARTING
//     and the request start time is recorded. In both cases stateUpdateTime is refreshed and
//     the object is passed to mndUpdateSsMigrate().
static void mndSendSsMigrateFileSetReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    return;
  }

  // only the endpoint set is needed, so the vgroup can be released right away
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateFileSetReq req = { 0 };
  req.ssMigrateId = pSsMigrate->id;
  req.nodeId = 0;
  req.fid = pSsMigrate->currFset.fid;
  req.startTimeSec = taosGetTimestampSec();

  // first pass with a NULL buffer only computes the encoded length
  int32_t   reqLen = tSerializeSSsMigrateFileSetReq(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to get the length of migrate fileset request", req.ssMigrateId, vgId, req.fid);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSSsMigrateFileSetReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // the buffer has not been handed to the RPC layer yet, so it must be freed here
    rpcFreeCont(pHead);
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to serialize migrate fileset request", req.ssMigrateId, vgId, req.fid);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SSMIGRATE_FILESET, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send migrate fileset request to vnode since 0x%x", req.ssMigrateId, vgId, req.fid, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, migrate fileset request was sent to vnode", req.ssMigrateId, vgId, req.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTING;
    pSsMigrate->currFset.startTime = req.startTimeSec;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
857

858

859
// Handle the vnode's response to a "migrate file set" request.
//
// The response is accepted only when it matches what the migration object is currently waiting
// for: same vgroup as vgroups[vgIdx], same file set id as currFset.fid, vgroup state
// FSET_STARTING, and a positive node id. On acceptance the reported node id is stored in
// currFset.nodeId, the vgroup state becomes FSET_STARTED and the object is passed to
// mndUpdateSsMigrate().
//
// Returns pMsg->code if the response carries an error, the deserialization error if decoding
// fails, TSDB_CODE_MND_RETURN_VALUE_NULL if the migration object cannot be acquired,
// TSDB_CODE_INVALID_MSG if the response does not match the current state, and 0 otherwise.
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate fileset response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // Everything used after the _exit label is initialized before the first goto, otherwise the
  // cleanup code would read indeterminate values when deserialization fails.
  SMnode              *pMnode = pMsg->info.node;
  SSsMigrateObj       *pSsMigrate = NULL;
  int32_t             *pVgId = NULL;
  SSsMigrateFileSetRsp rsp = {0};

  code = tDeserializeSSsMigrateFileSetRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // vgIdx may already point past the last vgroup (e.g. the vgroup was skipped on timeout and
  // this response arrived late), so the array lookup must be checked before it is dereferenced.
  pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTING) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId <= 0) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  mInfo("ssmigrate:%d, vgId:%d, fid:%d, leader node is %d", rsp.ssMigrateId, *pVgId, rsp.fid, rsp.nodeId);
  pSsMigrate->currFset.nodeId = rsp.nodeId;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTED;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  if (pSsMigrate != NULL) {
    mndReleaseSsMigrate(pMnode, pSsMigrate);
  }
  return code;
}
905

906

907

908
// Handler for the follower-ssmigrate response: the message content is not inspected and the
// handler always reports success.
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq) {
  int32_t code = 0;
  TAOS_RETURN(code);
}
911

912

913
// Send a TDMT_VND_FOLLOWER_SSMIGRATE request describing the current file set (migration id,
// node id, vgroup id, file set id and state taken from pSsMigrate->currFset) to the endpoint
// set of the vgroup currFset.vgId.
//
// The function is best-effort: if the vgroup cannot be acquired or the message cannot be
// built, it returns without sending; a send failure is only logged. pSsMigrate is not modified.
static void mndSendFollowerSsMigrateReq(SMnode* pMnode, SSsMigrateObj *pSsMigrate) {
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, pSsMigrate->currFset.vgId);
  if (pVgroup == NULL) {
    return;
  }

  // only the endpoint set is needed, so the vgroup can be released right away
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateProgress req = {
    .ssMigrateId = pSsMigrate->id,
    .nodeId = pSsMigrate->currFset.nodeId,
    .vgId = pSsMigrate->currFset.vgId,
    .fid = pSsMigrate->currFset.fid,
    .state = pSsMigrate->currFset.state,
  };

  // first pass with a NULL buffer only computes the encoded length
  int32_t   reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to get the length of follower-ssmigrate request", req.ssMigrateId, req.vgId, req.fid);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(req.vgId);
  if (tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // the buffer has not been handed to the RPC layer yet, so it must be freed here
    rpcFreeCont(pHead);
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to serialize follower-ssmigrate request", req.ssMigrateId, req.vgId, req.fid);
    return;
  }
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_FOLLOWER_SSMIGRATE, .pCont = pHead, .contLen = contLen};

  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  // the labels in the format strings follow the argument order (migration id first, then vgroup id)
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send follower-ssmigrate request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code);
  } else {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, follower-ssmigrate request sent", req.ssMigrateId, req.vgId, req.fid);
  }
}
949

950

951

952
// Handle the response to a "query ssmigrate progress" request.
//
// The response is accepted only when the vgroup state is FSET_STARTED and its node id, vgroup
// id and file set id all match pSsMigrate->currFset. On acceptance the reported state is copied
// into currFset.state, stateUpdateTime is refreshed, the object is passed to
// mndUpdateSsMigrate(), and a follower-ssmigrate request is sent for the same file set.
//
// Returns pMsg->code if the response carries an error, the deserialization error if decoding
// fails, TSDB_CODE_MND_RETURN_VALUE_NULL if the migration object cannot be acquired,
// TSDB_CODE_INVALID_MSG if the response does not match the current state, and 0 otherwise.
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong query ssmigrate progress response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // Everything used after the _exit label is initialized before the first goto, otherwise the
  // cleanup code would read an indeterminate pMnode when deserialization fails.
  SMnode            *pMnode = pMsg->info.node;
  SSsMigrateObj     *pSsMigrate = NULL;
  SSsMigrateProgress rsp = {0};

  code = tDeserializeSSsMigrateProgress(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTED) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId != pSsMigrate->currFset.nodeId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.vgId != pSsMigrate->currFset.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // an unchanged state is only traced, a state change is logged at info level
  if (rsp.state == pSsMigrate->currFset.state) {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  }
  pSsMigrate->currFset.state = rsp.state;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

  mndSendFollowerSsMigrateReq(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  if (pSsMigrate != NULL) {
    mndReleaseSsMigrate(pMnode, pSsMigrate);
  }
  return code;
}
1002

1003

1004

1005
// when query migration progress, we need to send the msg to dnode instead of vgroup,
1006
// because migration may take a long time, and leader may change during the migration process,
1007
// while only the initial leader vnode can handle the migration progress query.
1008
// Build a TDMT_VND_QUERY_SSMIGRATE_PROGRESS request for the current file set and send it to the
// endpoint set of the dnode currFset.nodeId. The request always carries the state
// SSMIGRATE_FILESET_STATE_IN_PROGRESS.
//
// The function is best-effort: if the dnode cannot be acquired or the message cannot be built,
// it returns without sending; a send failure is only logged. pSsMigrate is not modified.
void mndSendQuerySsMigrateProgressReq(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pSsMigrate->currFset.nodeId);
  if (pDnode == NULL) {
    return;
  }

  // only the endpoint set is needed, so the dnode can be released right away
  SEpSet epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  SSsMigrateProgress req = {
    .ssMigrateId = pSsMigrate->id,
    .nodeId = pSsMigrate->currFset.nodeId,
    .vgId = pSsMigrate->currFset.vgId,
    .fid = pSsMigrate->currFset.fid,
    .state = SSMIGRATE_FILESET_STATE_IN_PROGRESS,
  };

  // first pass with a NULL buffer only computes the encoded length
  int32_t   reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to get the length of ssmigrate-query-progress request", req.ssMigrateId, req.vgId, req.fid);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(req.vgId);
  if (tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // the buffer has not been handed to the RPC layer yet, so it must be freed here
    rpcFreeCont(pHead);
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to serialize ssmigrate-query-progress request", req.ssMigrateId, req.vgId, req.fid);
    return;
  }
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS, .pCont = pHead, .contLen = contLen};

  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send ssmigrate-query-progress request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code);
  } else {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, ssmigrate-query-progress request sent", req.ssMigrateId, req.vgId, req.fid);
  }
}
1044

1045

1046

1047
// Give up on the vgroup at vgroups[vgIdx]: advance to the next vgroup, reset the vgroup state
// to INIT, refresh stateUpdateTime and pass the object to mndUpdateSsMigrate().
static void mndSkipCurrentSsMigrateVgroup(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  pSsMigrate->vgIdx++;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}

// Advance one migration object by at most one step, based on its vgroup state and the state of
// its current file set:
//   - all vgroups done            -> drop the migration object
//   - INIT                        -> request the file set list of the current vgroup
//   - WAITING_FSET_LIST           -> skip the vgroup if nothing arrived for more than 30 seconds
//   - FSET_LIST_RECEIVED          -> send the migrate request for the current file set
//   - FSET_STARTING               -> skip the vgroup if no response arrived for more than 30 seconds
//   - any other vgroup state      -> act on currFset.state (compacting / in progress / finished)
static void mndUpdateSsMigrateProgress(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t vgCount = taosArrayGetSize(pSsMigrate->vgroups);
  int32_t fsetCount = taosArrayGetSize(pSsMigrate->fileSets);

  if (pSsMigrate->vgIdx >= vgCount) {
    mInfo("ssmigrate:%d, all vgroups has been processed", pSsMigrate->id);
    TAOS_UNUSED(mndDropSsMigrate(pMnode, pSsMigrate));
    return;
  }

  // a vgroup in the init state has no file set list yet, so ask for it first
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_INIT) {
    mndSendSsMigrateListFileSetsReq(pMnode, pSsMigrate);
    return;
  }

  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);

  switch (pSsMigrate->vgState) {
    case SSMIGRATE_VGSTATE_WAITING_FSET_LIST:
      if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
        mWarn("ssmigrate:%d, vgId:%d, haven't receive file set list in 30 seconds, skip", pSsMigrate->id, vgId);
        mndSkipCurrentSsMigrateVgroup(pMnode, pSsMigrate);
      }
      return;

    case SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED:
      mndSendSsMigrateFileSetReq(pMnode, pSsMigrate);
      return;

    case SSMIGRATE_VGSTATE_FSET_STARTING:
      // on timeout the whole vgroup is skipped rather than just this file set, because a file
      // set that times out often means the vgroup itself is not available.
      if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
        mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive response in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
        mndSkipCurrentSsMigrateVgroup(pMnode, pSsMigrate);
      }
      return;

    default:
      break;
  }

  // compaction takes a while: only rewind the state here, the migrate request for this file
  // set is sent again on a later tick.
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_COMPACT) {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, compacting, will retry later", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->stateUpdateTime = taosGetTimestampSec();
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    mndUpdateSsMigrate(pMnode, pSsMigrate);
    return;
  }

  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
      mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive state in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
      mndSkipCurrentSsMigrateVgroup(pMnode, pSsMigrate);
    } else {
      mndSendQuerySsMigrateProgressReq(pMnode, pSsMigrate);
    }
    return;
  }

  // the leader has finished this file set; hold for at least 30 seconds before moving on so
  // that the followers have time to start processing it (this keeps the tsdb code simpler).
  if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime < 30) {
    return;
  }

  // the current file set is done: continue with the next one, or with the next vgroup when
  // this was the last file set of the list.
  pSsMigrate->fsetIdx++;
  if (pSsMigrate->fsetIdx < fsetCount) {
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->currFset.vgId = vgId;
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, pSsMigrate->fsetIdx);
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  } else {
    pSsMigrate->vgIdx++;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
1142

1143

1144
// Timer handler: walk every SDB_SSMIGRATE object in the sdb and let
// mndUpdateSsMigrateProgress() advance it by one step. Always returns 0.
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq) {
  mTrace("start to process update ssmigrate progress timer");

  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pCursor = NULL;

  for (;;) {
    SSsMigrateObj *pObj = NULL;
    pCursor = sdbFetch(pSdb, SDB_SSMIGRATE, pCursor, (void **)&pObj);
    if (pCursor == NULL) {
      // no more objects to visit
      break;
    }

    mndUpdateSsMigrateProgress(pMnode, pObj);
    sdbRelease(pSdb, pObj);
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc