• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.43
/source/dnode/mnode/impl/src/mndSsMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndSsMigrate.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tmisce.h"
24
#include "tmsgcb.h"
25

26
#define MND_SSMIGRATE_VER_NUMBER 1
27
#define MND_SSMIGRATE_ID_LEN     11
28

29
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq);
30
static int32_t mndProcessQuerySsMigrateProgressTimer(SRpcMsg *pReq);
31
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pReq);
32
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq);
33

34
/*
 * Hook the ssmigrate module into the mnode: registers the show-retrieve
 * callback, the message handlers and the SDB table callbacks.
 *
 * Returns the result of sdbSetTable() for the SDB_SSMIGRATE table.
 */
int32_t mndInitSsMigrate(SMnode *pMnode) {
  // Table description for SDB_SSMIGRATE rows; the key type is a 32-bit integer.
  SSdbTable ssMigrateTable = {
      .sdbType = SDB_SSMIGRATE,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndSsMigrateActionEncode,
      .decodeFp = (SdbDecodeFp)mndSsMigrateActionDecode,
      .insertFp = (SdbInsertFp)mndSsMigrateActionInsert,
      .updateFp = (SdbUpdateFp)mndSsMigrateActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSsMigrateActionDelete,
  };

  // "show" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SSMIGRATE, mndRetrieveSsMigrate);

  // request, timer and response handlers
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SSMIGRATE, mndProcessKillSsMigrateReq);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP, mndProcessQuerySsMigrateProgressRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_SSMIGRATE_DB_TIMER, mndProcessSsMigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_QUERY_SSMIGRATE_PROGRESS_TIMER, mndProcessQuerySsMigrateProgressTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_FOLLOWER_SSMIGRATE_RSP, mndProcessFollowerSsMigrateRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SSMIGRATE_RSP, mndTransProcessRsp);

  return sdbSetTable(pMnode->pSdb, ssMigrateTable);
}
55

56
/* Module teardown hook; it only emits a debug log line. */
void mndCleanupSsMigrate(SMnode *pMnode) {
  mDebug("mnd ssmigrate cleanup");
}
57

58
/*
 * Release the resources owned by an ssmigrate object: destroys its vgroups
 * array. The object itself is not freed here.
 */
void tFreeSsMigrateObj(SSsMigrateObj *pSsMigrate) {
  SArray *pVgroups = pSsMigrate->vgroups;
  taosArrayDestroy(pVgroups);
}
61

62
int32_t tSerializeSSsMigrateObj(void *buf, int32_t bufLen, const SSsMigrateObj *pObj) {
×
63
  SEncoder encoder = {0};
×
64
  int32_t  code = 0;
×
65
  int32_t  lino;
66
  int32_t  tlen;
67
  tEncoderInit(&encoder, buf, bufLen);
×
68

69
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
70
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->id));
×
71
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->dbUid));
×
72
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
×
73
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
×
74
  int32_t numVnode = 0;
×
75
  if (pObj->vgroups) {
×
76
    numVnode = taosArrayGetSize(pObj->vgroups);
×
77
  }
78
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numVnode));
×
79
  for (int32_t i = 0; i < numVnode; ++i) {
×
80
    SVgroupSsMigrateDetail *pDetail = (SVgroupSsMigrateDetail *)taosArrayGet(pObj->vgroups, i);
×
81
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, pDetail->vgId));
×
82
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, pDetail->nodeId));
×
83
    TAOS_CHECK_EXIT(tEncodeBool(&encoder, pDetail->done));
×
84
  }
85
  tEndEncode(&encoder);
×
86

87
_exit:
×
88
  if (code) {
×
89
    tlen = code;
×
90
  } else {
91
    tlen = encoder.pos;
×
92
  }
93
  tEncoderClear(&encoder);
×
94
  return tlen;
×
95
}
96

97
/*
 * Decode an ssmigrate object from buf into pObj.
 *
 * Field order mirrors tSerializeSSsMigrateObj(): id, dbUid, dbname,
 * startTime, vgroup count, then (vgId, nodeId, done) for each vgroup.
 *
 * If pObj->vgroups already exists it is cleared and refilled; otherwise a new
 * array is created and attached to pObj only on success. On failure an
 * existing array is left empty and a newly created one is destroyed.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t tDeserializeSSsMigrateObj(void *buf, int32_t bufLen, SSsMigrateObj *pObj) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  numVnode = 0;
  // Initialised before the first possible `goto _exit`: the cleanup code below
  // reads this pointer, so it must never be indeterminate.
  SArray  *vgroups = pObj->vgroups;
  SDecoder decoder = {0};
  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->id));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));

  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &numVnode));
  if (vgroups) {
    taosArrayClear(vgroups);
  } else if ((vgroups = taosArrayInit(numVnode, sizeof(SVgroupSsMigrateDetail))) == NULL) {
    code = terrno;
    goto _exit;
  }

  for (int32_t i = 0; i < numVnode; ++i) {
    SVgroupSsMigrateDetail detail = {0};
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &detail.vgId));
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &detail.nodeId));
    TAOS_CHECK_EXIT(tDecodeBool(&decoder, &detail.done));
    if (taosArrayPush(vgroups, &detail) == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  if (code == TSDB_CODE_SUCCESS) {
    pObj->vgroups = vgroups;
  } else if (pObj->vgroups) {
    // The array belongs to the caller's object: drop partial contents only.
    taosArrayClear(pObj->vgroups);
  } else {
    // Array created here (or still NULL if the failure happened earlier).
    taosArrayDestroy(vgroups);
  }
  return code;
}
143

144
/*
 * SDB encode callback: turn an ssmigrate object into a raw SDB record.
 *
 * Raw layout: an int32 payload length followed by the payload produced by
 * tSerializeSSsMigrateObj().
 *
 * Returns the new raw record, or NULL with terrno set on failure.
 */
SSdbRaw *mndSsMigrateActionEncode(SSsMigrateObj *pSsMigrate) {
  // NOTE(review): code/lino are not read in this function body; they are
  // presumably referenced by the SDB_SET_* macros - confirm before removing.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int32_t  rawSize = 0;
  int32_t  payloadLen = 0;
  void    *pPayload = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  // First pass with no buffer yields the required payload length.
  payloadLen = tSerializeSSsMigrateObj(NULL, 0, pSsMigrate);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  rawSize = sizeof(int32_t) + payloadLen;
  pRaw = sdbAllocRaw(SDB_SSMIGRATE, MND_SSMIGRATE_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pPayload = taosMemoryMalloc(payloadLen);
  if (pPayload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Second pass writes the payload into the scratch buffer.
  payloadLen = tSerializeSSsMigrateObj(pPayload, payloadLen, pSsMigrate);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, payloadLen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, payloadLen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(pPayload);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("ssmigrate:%" PRId32 ", failed to encode to raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", encode to raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRaw;
}
193

194
/*
 * SDB decode callback: rebuild an ssmigrate row from a raw SDB record.
 *
 * Expects the layout written by mndSsMigrateActionEncode(): an int32 payload
 * length followed by the serialized object. Records whose soft version is not
 * MND_SSMIGRATE_VER_NUMBER are rejected.
 *
 * Returns the new row, or NULL on failure (terrno carries the reason).
 */
SSdbRow *mndSsMigrateActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not read in this function body; they are
  // presumably referenced by the SDB_GET_* macros - confirm before removing.
  int32_t        code = 0;
  int32_t        lino = 0;
  SSdbRow       *pRow = NULL;
  SSsMigrateObj *pSsMigrate = NULL;
  void          *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto OVER;
  }

  if (sver != MND_SSMIGRATE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("ssmigrate read invalid ver, data ver: %d, curr ver: %d", sver, MND_SSMIGRATE_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SSsMigrateObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pSsMigrate = sdbGetRowObj(pRow);
  if (pSsMigrate == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSSsMigrateObj(buf, tlen, pSsMigrate)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  // pSsMigrate is still NULL when a failure happened before the row object
  // was obtained, so it must not be dereferenced for the log message.
  if (terrno != TSDB_CODE_SUCCESS || pSsMigrate == NULL) {
    int32_t id = (pSsMigrate != NULL) ? pSsMigrate->id : 0;
    mError("ssmigrate:%" PRId32 ", failed to decode from raw:%p since %s", id, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", decode from raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRow;
}
250

251
/* SDB insert callback: only traces the event; always returns 0. */
int32_t mndSsMigrateActionInsert(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;  // not needed by this callback

  mTrace("ssmigrate:%" PRId32 ", perform insert action", pSsMigrate->id);
  return 0;
}
255

256
/*
 * SDB delete callback: traces the event and releases the resources owned by
 * the object through tFreeSsMigrateObj(). Always returns 0.
 */
int32_t mndSsMigrateActionDelete(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;  // not needed by this callback

  mTrace("ssmigrate:%" PRId32 ", perform delete action", pSsMigrate->id);

  tFreeSsMigrateObj(pSsMigrate);
  return 0;
}
261

262
/*
 * SDB update callback: only traces the old and new row pointers; no field is
 * copied from the new object to the old one here. Always returns 0.
 */
int32_t mndSsMigrateActionUpdate(SSdb *pSdb, SSsMigrateObj *pOldSsMigrate, SSsMigrateObj *pNewSsMigrate) {
  (void)pSdb;  // not needed by this callback

  mTrace("ssmigrate:%" PRId32 ", perform update action, old row:%p new row:%p", pOldSsMigrate->id, pOldSsMigrate,
         pNewSsMigrate);
  return 0;
}
268

269
/*
 * Look up an ssmigrate object by id and take a reference on it.
 *
 * Returns the object (to be handed back with mndReleaseSsMigrate()), or NULL
 * if it cannot be acquired. A "not there" lookup is not treated as an error:
 * terrno is reset to TSDB_CODE_SUCCESS in that case.
 */
SSsMigrateObj *mndAcquireSsMigrate(SMnode *pMnode, int64_t ssMigrateId) {
  SSdb *pSdb = pMnode->pSdb;

  // The SDB_SSMIGRATE table is registered with SDB_KEY_INT32, so the lookup
  // key must be a 32-bit object. Passing the address of the 64-bit parameter
  // would only work where its low 32 bits come first in memory.
  int32_t key = (int32_t)ssMigrateId;

  SSsMigrateObj *pSsMigrate = sdbAcquire(pSdb, SDB_SSMIGRATE, &key);
  if (pSsMigrate == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pSsMigrate;
}
277

278
/* Give back a reference obtained from mndAcquireSsMigrate(). */
void mndReleaseSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  sdbRelease(pMnode->pSdb, pSsMigrate);
}
283

284
/*
 * Copy the database name of the given ssmigrate task into dbname (capacity
 * len, copied with tstrncpy).
 *
 * Returns 0 on success. If the task cannot be acquired, returns terrno when
 * it is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndSsMigrateGetDbName(SMnode *pMnode, int32_t ssMigrateId, char *dbname, int32_t len) {
  SSsMigrateObj *pTask = mndAcquireSsMigrate(pMnode, ssMigrateId);

  if (pTask != NULL) {
    tstrncpy(dbname, pTask->dbname, len);
    mndReleaseSsMigrate(pMnode, pTask);
    TAOS_RETURN(0);
  }

  int32_t code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
297

298
// ssmigrate db
299
/*
 * Fill in a new ssmigrate object for pDb and append it to pTrans as a
 * prepare log.
 *
 * Sets dbUid, a freshly generated id and dbname on pSsMigrate, then collects
 * every vgroup of the database (skipping vgroups with a non-zero mountVgId)
 * into a temporary vgroups array. The array is only needed for encoding and
 * is destroyed again before returning, so pSsMigrate->vgroups is NULL on
 * return.
 *
 * Returns 0 on success, otherwise an error code.
 */
int32_t mndAddSsMigrateToTran(SMnode *pMnode, STrans *pTrans, SSsMigrateObj *pSsMigrate, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  pSsMigrate->dbUid = pDb->uid;
  pSsMigrate->id = tGenIdPI32();
  tstrncpy(pSsMigrate->dbname, pDb->name, sizeof(pSsMigrate->dbname));

  pSsMigrate->vgroups = taosArrayInit(8, sizeof(SVgroupSsMigrateDetail));
  if (pSsMigrate->vgroups == NULL) {
    TAOS_RETURN(terrno);
  }

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->mountVgId || pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    // nodeId is left zero here; mndTransProcessSsMigrateVgroupRsp() fills it in.
    SVgroupSsMigrateDetail detail = {.vgId = pVgroup->vgId, .done = false};
    sdbRelease(pSdb, pVgroup);
    if (taosArrayPush(pSsMigrate->vgroups, &detail) == NULL) {
      code = terrno;
      // Leaving the fetch loop early: the iterator must be cancelled.
      sdbCancelFetch(pSdb, pIter);
      taosArrayDestroy(pSsMigrate->vgroups);
      pSsMigrate->vgroups = NULL;
      TAOS_RETURN(code);
    }
  }

  SSdbRaw *pVgRaw = mndSsMigrateActionEncode(pSsMigrate);
  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;
  if (pVgRaw == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Set the status while this function still owns the raw. Once the raw has
  // been appended to the transaction it must not be freed here any more.
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, ssmigrate:%d, db:%s, has been added", pTrans->id, pSsMigrate->id, pSsMigrate->dbname);
  return 0;
}
354

355
// retrieve ssmigrate
356
/*
 * Show-retrieve callback for the ssmigrate table.
 *
 * Writes up to `rows` rows into pBlock, one per ssmigrate object, with three
 * columns in order: id, database name (as a var-string) and start time.
 * Iteration state is kept in pShow->pIter between calls.
 *
 * Returns the number of rows written. If pShow->db names a database that
 * cannot be acquired, terrno is returned instead.
 */
int32_t mndRetrieveSsMigrate(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        numOfRows = 0;
  SSsMigrateObj *pSsMigrate = NULL;
  char          *sep = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pShow->pIter, (void **)&pSsMigrate);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // column: id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(
        colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->id, false), pSsMigrate, &lino, _OVER);

    // column: database name
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pSsMigrate->dbname)) {
      SName name = {0};
      code = tNameFromString(&name, pSsMigrate->dbname, T_NAME_ACCT | T_NAME_DB);
      if (code != 0) {
        lino = __LINE__;
        // The row fetched above is still referenced; drop it before bailing out.
        sdbRelease(pSdb, pSsMigrate);
        goto _OVER;
      }
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pSsMigrate->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pSsMigrate, &lino, _OVER);

    // column: start time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->startTime, false), pSsMigrate, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pSsMigrate);
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  mndReleaseDb(pMnode, pDb);
  return numOfRows;
}
416

417

418

419
/*
 * Handler for TDMT_MND_KILL_SSMIGRATE. Killing a migration is not supported:
 * the request is logged and rejected with TSDB_CODE_OPS_NOT_SUPPORT.
 */
int32_t mndProcessKillSsMigrateReq(SRpcMsg *pReq) {
  (void)pReq;  // the request content is not inspected

  mError("not implemented yet");
  return TSDB_CODE_OPS_NOT_SUPPORT;
}
423

424

425

426
// update progress
427
/*
 * Handler for TDMT_VND_FOLLOWER_SSMIGRATE_RSP. The response is not inspected;
 * success is always reported.
 */
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq) {
  (void)pReq;
  TAOS_RETURN(0);
}
431

432

433
/*
 * Send a follower-ssmigrate request to the vgroup named by pReq->vgId.
 *
 * The vgroup is looked up by scanning SDB_VGROUP; if it is not found nothing
 * is sent. This is best effort: failures are logged (or silently dropped
 * when the message buffer cannot be allocated) and not reported to the caller.
 */
static void mndSendFollowerSsMigrateReq(SMnode *pMnode, SFollowerSsMigrateReq *pReq) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) return;

    if (pVgroup->vgId == pReq->vgId) {
      sdbCancelFetch(pSdb, pIter);
      break;
    }

    // Not the vgroup we are looking for: drop the reference taken by sdbFetch.
    sdbRelease(pSdb, pVgroup);
  }

  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  sdbRelease(pSdb, pVgroup);

  int32_t reqLen = tSerializeSFollowerSsMigrateReq(NULL, 0, pReq);
  if (reqLen < 0) {
    mError("vgId:%d, ssmigrate:%d, failed to serialize follower-ssmigrate request since 0x%x", pReq->vgId,
           pReq->mnodeMigrateId, reqLen);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pReq->vgId);
  int32_t ret = 0;
  if ((ret = tSerializeSFollowerSsMigrateReq((char *)pHead + sizeof(SMsgHead), reqLen, pReq)) < 0) {
    // The message was never handed to the RPC layer, so it is still ours to free.
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_FOLLOWER_SSMIGRATE, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("vgId:%d, ssmigrate:%d, failed to send follower-ssmigrate request to vnode since 0x%x", pReq->vgId, pReq->mnodeMigrateId, code);
  } else {
    mInfo("vgId:%d, ssmigrate:%d, send follower-ssmigrate request to vnode", pReq->vgId, pReq->mnodeMigrateId);
  }
}
473

474

475

476
/*
 * Apply a vnode's migrate-progress response to the matching ssmigrate task.
 *
 * The response is always forwarded to the vgroup through
 * mndSendFollowerSsMigrateReq(). If any file set in the response is still in
 * progress, nothing else happens. Otherwise the vgroup is marked done in the
 * task, and a transaction is submitted that either rewrites the task
 * ("update-ssmigrate", some vgroup still pending) or marks it dropped
 * ("drop-ssmigrate", every vgroup done).
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t mndUpdateSsMigrateProgress(SMnode *pMnode, SRpcMsg *pReq, SQuerySsMigrateProgressRsp *rsp) {
  int32_t code = 0;
  bool    inProgress = false;

  for (int32_t i = 0; i < taosArrayGetSize(rsp->pFileSetStates); i++) {
    SFileSetSsMigrateState *pState = taosArrayGet(rsp->pFileSetStates, i);
    if (pState != NULL && pState->state == FILE_SET_MIGRATE_STATE_IN_PROGRESS) {
      inProgress = true;
      break;  // one pending file set is enough
    }
  }

  mndSendFollowerSsMigrateReq(pMnode, rsp);

  if (inProgress) {
    mDebug("ssmigrate:%d, vgId:%d, some filesets are still in progress.", rsp->mnodeMigrateId, rsp->vgId);
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  SSsMigrateObj *pSsMigrate = mndAcquireSsMigrate(pMnode, rsp->mnodeMigrateId);
  if (pSsMigrate == NULL) {
    mDebug("ssmigrate:%d, failed to acquire ssmigrate in mndUpdateSsMigrateProgress since %s", rsp->mnodeMigrateId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pSsMigrate->vgroups); i++) {
    SVgroupSsMigrateDetail *pDetail = taosArrayGet(pSsMigrate->vgroups, i);
    if (pDetail->vgId == rsp->vgId) {
      pDetail->done = true;
    }
    // NOTE(review): this check is not restricted to the responding vgroup, so
    // an id mismatch marks every vgroup of the task as done - confirm intent.
    if (rsp->mnodeMigrateId != rsp->vnodeMigrateId) {
      pDetail->done = true;  // mark as done so that mnode will not request again
    }
    if (!pDetail->done) {
      inProgress = true;
    }
  }

  // Same transaction settings either way; only the operation name differs.
  const char *opername = inProgress ? "update-ssmigrate" : "drop-ssmigrate";
  STrans     *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, opername);
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("ssmigrate:%d, failed to create %s trans since %s", rsp->mnodeMigrateId, opername, terrstr());
    mndReleaseSsMigrate(pMnode, pSsMigrate);
    TAOS_RETURN(code);
  }

  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);
  mInfo("trans:%d, ssmigrate:%d, vgId:%d, %s-trans created", pTrans->id, rsp->mnodeMigrateId, rsp->vgId, pTrans->opername);

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  mndReleaseSsMigrate(pMnode, pSsMigrate);

  if (pRaw == NULL) {
    mndTransDrop(pTrans);
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = sdbSetRawStatus(pRaw, inProgress ? SDB_STATUS_READY : SDB_STATUS_DROPPED)) != 0) {
    // Not yet appended to the transaction: the raw is still owned here.
    sdbFreeRaw(pRaw);
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    mError("trans:%d, ssmigrate:%d, failed to append commit log since %s", pTrans->id, rsp->mnodeMigrateId, terrstr());
    // The append failed, so the raw is still owned here.
    sdbFreeRaw(pRaw);
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, ssmigrate:%d, failed to prepare since %s", pTrans->id, rsp->mnodeMigrateId, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
569

570

571

572
/*
 * Handler for TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP.
 *
 * Rejects responses that carry an error code, decodes the progress payload,
 * logs whether the vnode-side migrate id matches the mnode-side one, and
 * passes the result on to mndUpdateSsMigrateProgress().
 *
 * Returns the message's own error code, the decode error, or the result of
 * the progress update.
 */
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pMsg) {
  if (pMsg->code != 0) {
    mError("received wrong ssmigrate response, req code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  SQuerySsMigrateProgressRsp progress = {0};
  int32_t code = tDeserializeSQuerySsMigrateProgressRsp(pMsg->pCont, pMsg->contLen, &progress);
  if (code != 0) {
    // A partially decoded array may exist; release it before reporting.
    taosArrayDestroy(progress.pFileSetStates);
    mError("failed to deserialize vnode-query-ssmigrate-progress-rsp, ret:%d, pCont:%p, len:%d", code, pMsg->pCont,
           pMsg->contLen);
    TAOS_RETURN(code);
  }

  if (progress.mnodeMigrateId != progress.vnodeMigrateId) {
    mError("ssmigrate:%d, vgId:%d, migrate progress received, but vnode side migrate id is %d",
           progress.mnodeMigrateId,
           progress.vgId,
           progress.vnodeMigrateId);
  } else {
    mDebug("ssmigrate:%d, vgId:%d, migrate progress received", progress.mnodeMigrateId, progress.vgId);
  }

  code = mndUpdateSsMigrateProgress(pMsg->info.node, pMsg, &progress);
  taosArrayDestroy(progress.pFileSetStates);

  TAOS_RETURN(code);
}
603

604

605

606
/*
 * Ask dnodes for the migration progress of a task.
 *
 * For every dnode, the first vgroup of the task that is recorded on that
 * dnode and is not yet done gets one progress query; the remaining vgroups on
 * the same dnode are left for a later call. This is best effort: failures are
 * logged and do not stop the scan.
 */
void mndSendQuerySsMigrateProgressReq(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  SSdb                      *pSdb = pMnode->pSdb;
  void                      *pIter = NULL;
  SQuerySsMigrateProgressReq req = {.ssMigrateId = pSsMigrate->id};

  int32_t reqLen = tSerializeSQuerySsMigrateProgressReq(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, failed to serialize ssmigrate-query-progress request since 0x%x", pSsMigrate->id, reqLen);
    return;
  }
  int32_t contLen = reqLen + sizeof(SMsgHead);

  while (1) {
    SDnodeObj *pDnode = NULL;
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < taosArrayGetSize(pSsMigrate->vgroups); i++) {
      SVgroupSsMigrateDetail *pDetail = taosArrayGet(pSsMigrate->vgroups, i);
      if (pDetail->nodeId != pDnode->id || pDetail->done) {
        continue;
      }

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        continue;
      }
      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);
      if (tSerializeSQuerySsMigrateProgressReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
        // Never send a half-written payload; the buffer is still owned here.
        mError("ssmigrate:%d, vgId:%d, failed to serialize ssmigrate-query-progress request", pSsMigrate->id,
               pDetail->vgId);
        rpcFreeCont(pHead);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS, .pCont = pHead, .contLen = contLen};

      // we need to send the msg to dnode instead of vgroup, because migration may take a long time,
      // and leader may change during the migration process, while only the initial leader vnode
      // can handle the migration progress query.
      SEpSet  epSet = mndGetDnodeEpset(pDnode);
      int32_t code = tmsgSendReq(&epSet, &rpcMsg);
      if (code != 0) {
        mError("ssmigrate:%d, vgId:%d, failed to send ssmigrate-query-progress request since 0x%x",
               pSsMigrate->id,
               pDetail->vgId,
               code);
      } else {
        mInfo("ssmigrate:%d, vgId:%d, ssmigrate-query-progress request sent", pSsMigrate->id, pDetail->vgId);
      }

      // NOTE(review): only one query per dnode per call - confirm this is intended.
      break;
    }
    sdbRelease(pSdb, pDnode);
  }
}
654

655

656
int32_t mndSsMigrateDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb);
657

658
/*
 * Timer handler for TDMT_MND_SSMIGRATE_DB_TIMER: calls mndSsMigrateDb() for
 * every database in the SDB, with no originating request (NULL is passed).
 *
 * Failures for individual databases are logged and do not stop the scan; the
 * handler always returns 0.
 */
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;

  while (1) {
    SDbObj *pDb = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DB, pIter, (void **)&pDb);
    if (pIter == NULL) {
      break;
    }

    int32_t code = mndSsMigrateDb(pMnode, NULL, pDb);

    // Log while the reference is still held: pDb->name must not be read
    // after the object has been released.
    if (code == TSDB_CODE_SUCCESS) {
      mInfo("ssmigrate db:%s, has been triggered by timer", pDb->name);
    } else {
      mError("failed to trigger ssmigrate db:%s, code:%d, %s", pDb->name, code, tstrerror(code));
    }
    sdbRelease(pMnode->pSdb, pDb);
  }

  TAOS_RETURN(0);
}
679

680

681
/*
 * Timer handler for TDMT_MND_QUERY_SSMIGRATE_PROGRESS_TIMER: sends progress
 * queries for every ssmigrate task currently in the SDB. Always returns 0.
 */
static int32_t mndProcessQuerySsMigrateProgressTimer(SRpcMsg *pReq) {
  mTrace("start to process query ssmigrate progress timer");

  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pCursor = NULL;

  for (;;) {
    SSsMigrateObj *pTask = NULL;

    pCursor = sdbFetch(pSdb, SDB_SSMIGRATE, pCursor, (void **)&pTask);
    if (pCursor == NULL) {
      return 0;
    }

    mndSendQuerySsMigrateProgressReq(pMnode, pTask);
    sdbRelease(pSdb, pTask);
  }
}
701

702
/*
 * Handle a vgroup's response to an ssmigrate request.
 *
 * The response is always passed to mndTransProcessRsp() and that call's
 * result is what this function returns. In addition, when the payload decodes
 * correctly, the node id reported by the vnode is recorded in the task's
 * vgroup detail and persisted through an "update-ssmigrate-nodeid"
 * transaction. Failures in that extra step are logged only.
 */
int32_t mndTransProcessSsMigrateVgroupRsp(SRpcMsg *pRsp) {
  int32_t        ret = 0, code = 0;
  SMnode        *pMnode = pRsp->info.node;
  SSsMigrateObj *pSsMigrate = NULL;
  STrans        *pTrans = NULL;

  SSsMigrateVgroupRsp rsp = {0};
  code = tDeserializeSSsMigrateVgroupRsp(pRsp->pCont, pRsp->contLen, &rsp);
  // The transaction layer sees the response even if decoding failed.
  ret = mndTransProcessRsp(pRsp);
  if (code != 0) {
    mError("failed to deserialize ssmigrate-vgroup-rsp, ret:%d, len:%d", code, pRsp->contLen);
    return ret;
  }
  mInfo("mndTransProcessSsmigrateVgroupRsp, vgId:%d, ssmigrate:%d, nodeId:%d", rsp.vgId, rsp.ssMigrateId, rsp.nodeId);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    mError("ssmigrate:%d, failed to acquire ssmigrate in mndTransProcessSsMigrateVgroupRsp since %s", rsp.ssMigrateId, terrstr());
    return ret;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pSsMigrate->vgroups); i++) {
    SVgroupSsMigrateDetail *pDetail = taosArrayGet(pSsMigrate->vgroups, i);
    if (pDetail->vgId == rsp.vgId) {
      pDetail->nodeId = rsp.nodeId;
      break;
    }
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-ssmigrate-nodeid");
  if (pTrans == NULL) {
    mError("failed to create update-ssmigrate-nodeid trans since %s", terrstr());
    goto _exit;
  }

  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);
  mInfo("trans:%d, ssmigrate:%d, vgId:%d, %s-trans created", pTrans->id, rsp.ssMigrateId, rsp.vgId, pTrans->opername);

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    goto _exit;
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    // The append failed, so the raw is still owned here.
    sdbFreeRaw(pRaw);
    goto _exit;
  }

  // The raw now belongs to the transaction; it must not be freed here.
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    goto _exit;
  }

  // The task object is no longer needed once it has been encoded.
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  pSsMigrate = NULL;

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
  }

_exit:
  if (pTrans != NULL) {
    mndTransDrop(pTrans);
  }
  if (pSsMigrate != NULL) {
    mndReleaseSsMigrate(pMnode, pSsMigrate);
  }
  return ret;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc