• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4999

22 Mar 2026 10:21AM UTC coverage: 72.31% (-0.03%) from 72.335%
#4999

push

travis-ci

web-flow
feat(subq/some): some/any/exists for stream subq (#34860)

50 of 68 new or added lines in 1 file covered. (73.53%)

701 existing lines in 132 files now uncovered.

253472 of 350536 relevant lines covered (72.31%)

131775367.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.35
/source/dnode/mnode/impl/src/mndSsMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndSsMigrate.h"
16
#include "mndDb.h"
17
#include "mndDnode.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22
#include "mndVgroup.h"
23
#include "tmisce.h"
24
#include "tmsgcb.h"
25

26
#define MND_SSMIGRATE_VER_NUMBER       2
27

28
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq);
29
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq);
30
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg);
31
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg);
32
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pReq);
33
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pMsg);
34

35
/*
 * Wires the ssmigrate module into the mnode.
 *
 * Registers the SHOW retrieve handler for TSDB_MGMT_TABLE_SSMIGRATE, every
 * message handler used by this file, and finally the SDB table callbacks.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitSsMigrate(SMnode *pMnode) {
  // SDB callbacks for SSsMigrateObj rows; rows are keyed by a 32-bit integer.
  SSdbTable ssMigrateTable = {
      .sdbType = SDB_SSMIGRATE,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndSsMigrateActionEncode,
      .decodeFp = (SdbDecodeFp)mndSsMigrateActionDecode,
      .insertFp = (SdbInsertFp)mndSsMigrateActionInsert,
      .updateFp = (SdbUpdateFp)mndSsMigrateActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSsMigrateActionDelete,
  };

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SSMIGRATE, mndRetrieveSsMigrate);

  mndSetMsgHandle(pMnode, TDMT_MND_SSMIGRATE_DB_TIMER, mndProcessSsMigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SSMIGRATE, mndProcessKillSsMigrateReq);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SSMIGRATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER, mndProcessUpdateSsMigrateProgressTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP, mndProcessSsMigrateListFileSetsRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SSMIGRATE_FILESET_RSP, mndProcessSsMigrateFileSetRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP, mndProcessQuerySsMigrateProgressRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_FOLLOWER_SSMIGRATE_RSP, mndProcessFollowerSsMigrateRsp);

  return sdbSetTable(pMnode->pSdb, ssMigrateTable);
}
58

59
// Module teardown hook: nothing is released here, it only writes a debug log line.
void mndCleanupSsMigrate(SMnode *pMnode) {
  mDebug("mnd ssmigrate cleanup");
}
469,036✔
60

61
/*
 * Releases the two arrays owned by an SSsMigrateObj (vgroups and fileSets).
 * The object itself is not freed.
 *
 * A NULL object is ignored, and the array pointers are reset after being
 * destroyed so that no dangling pointer is left behind and a second call on
 * the same object does not destroy the arrays twice.
 * NOTE(review): relies on taosArrayDestroy() accepting NULL, as its other
 * uses in this file already assume.
 */
void tFreeSsMigrateObj(SSsMigrateObj *pSsMigrate) {
  if (pSsMigrate == NULL) {
    return;
  }

  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;

  taosArrayDestroy(pSsMigrate->fileSets);
  pSsMigrate->fileSets = NULL;
}
32,046✔
65

66
int32_t tSerializeSSsMigrateObj(void *buf, int32_t bufLen, const SSsMigrateObj *pObj) {
65,618✔
67
  SEncoder encoder = {0};
65,618✔
68
  int32_t  code = 0;
65,618✔
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
65,618✔
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
65,618✔
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->id));
131,236✔
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->dbUid));
131,236✔
76
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
131,236✔
77
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
131,236✔
78
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->stateUpdateTime));
131,236✔
79
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgIdx));
131,236✔
80
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgState));
131,236✔
81
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->fsetIdx));
131,236✔
82
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.nodeId));
131,236✔
83
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.vgId));
131,236✔
84
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.fid));
131,236✔
85
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.state));
131,236✔
86
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->currFset.startTime));
131,236✔
87

88
  int32_t numVg = pObj->vgroups ? taosArrayGetSize(pObj->vgroups) : 0;
65,618✔
89
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numVg));
65,618✔
90
  for (int32_t i = 0; i < numVg; ++i) {
196,854✔
91
    int32_t *vgId = (int32_t *)taosArrayGet(pObj->vgroups, i);
131,236✔
92
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *vgId));
262,472✔
93
  }
94

95
  int32_t numFset = pObj->fileSets ? taosArrayGetSize(pObj->fileSets) : 0;
65,618✔
96
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numFset));
65,618✔
97
  for (int32_t i = 0; i < numFset; ++i) {
112,924✔
98
    int32_t *fsetId = (int32_t *)taosArrayGet(pObj->fileSets, i);
47,306✔
99
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *fsetId));
94,612✔
100
  }
101
  tEndEncode(&encoder);
65,618✔
102

103
_exit:
65,618✔
104
  if (code) {
65,618✔
105
    tlen = code;
×
106
  } else {
107
    tlen = encoder.pos;
65,618✔
108
  }
109
  tEncoderClear(&encoder);
65,618✔
110
  return tlen;
65,618✔
111
}
112

113
/*
 * Decodes an SSsMigrateObj from buf (bufLen bytes), reading the fields in the
 * same order tSerializeSSsMigrateObj() writes them.
 *
 * Array handling: when pObj already owns a vgroups / fileSets array it is
 * cleared and refilled in place; otherwise a new array is created. On failure
 * an array that pObj already owned is left empty, and an array created by this
 * call is destroyed, so pObj never ends up pointing at a freed array.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the step that failed.
 */
int32_t tDeserializeSSsMigrateObj(void *buf, int32_t bufLen, SSsMigrateObj *pObj) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino;
  SArray  *vgroups = NULL;
  SArray  *fileSets = NULL;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // scalar header fields
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->id));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->stateUpdateTime));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgIdx));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgState));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->fsetIdx));

  // the fileset currently being processed
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.nodeId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.vgId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.fid));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.state));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->currFset.startTime));

  // vgroup id list: reuse the caller's array when there is one
  int32_t vgCount = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgCount));
  vgroups = pObj->vgroups;
  if (vgroups != NULL) {
    taosArrayClear(vgroups);
  } else {
    vgroups = taosArrayInit(vgCount, sizeof(int32_t));
    if (vgroups == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < vgCount; ++idx) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgId));
    if (taosArrayPush(vgroups, &vgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // fileset id list: same strategy as the vgroup list
  int32_t fsetCount = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fsetCount));
  fileSets = pObj->fileSets;
  if (fileSets != NULL) {
    taosArrayClear(fileSets);
  } else {
    fileSets = taosArrayInit(fsetCount, sizeof(int32_t));
    if (fileSets == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < fsetCount; ++idx) {
    int32_t fsetId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fsetId));
    if (taosArrayPush(fileSets, &fsetId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);

  if (code == TSDB_CODE_SUCCESS) {
    pObj->vgroups = vgroups;
    pObj->fileSets = fileSets;
    return code;
  }

  // failure: empty the arrays pObj owns, destroy the ones created above
  if (pObj->vgroups != NULL) {
    taosArrayClear(pObj->vgroups);
  } else {
    taosArrayDestroy(vgroups);
  }
  if (pObj->fileSets != NULL) {
    taosArrayClear(pObj->fileSets);
  } else {
    taosArrayDestroy(fileSets);
  }
  return code;
}
188

189
/*
 * SDB encode callback: turns an SSsMigrateObj into an SSdbRaw.
 *
 * Raw payload layout: an int32 length followed by that many bytes produced by
 * tSerializeSSsMigrateObj().
 *
 * Returns the new raw on success. On failure returns NULL with terrno set;
 * a serialization failure now reports the code returned by the serializer
 * instead of always claiming TSDB_CODE_OUT_OF_MEMORY.
 */
SSdbRaw *mndSsMigrateActionEncode(SSsMigrateObj *pSsMigrate) {
  // NOTE(review): code/lino are not used directly here; they are kept because
  // the SDB_SET_* macros may reference them - confirm against sdb.h.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_SUCCESS;

  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;

  // First pass with a NULL buffer: the result is only used to size the buffers.
  int32_t tlen = tSerializeSSsMigrateObj(NULL, 0, pSsMigrate);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  int32_t size = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_SSMIGRATE, MND_SSMIGRATE_VER_NUMBER, size);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Second pass: the actual encoding into the temporary buffer.
  tlen = tSerializeSSsMigrateObj(buf, tlen, pSsMigrate);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("ssmigrate:%" PRId32 ", failed to encode to raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", encode to raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRaw;
}
238

239
/*
 * SDB decode callback: rebuilds an SSsMigrateObj row from an SSdbRaw.
 *
 * The raw must carry soft version MND_SSMIGRATE_VER_NUMBER; its payload is an
 * int32 length followed by the bytes understood by tDeserializeSSsMigrateObj().
 *
 * Returns the new row on success, or NULL with terrno set on failure.
 *
 * The failure path can be reached before the row exists (unreadable or
 * unexpected version, row allocation failure), so it must not dereference
 * pSsMigrate unconditionally.
 */
SSdbRow *mndSsMigrateActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): lino is not used directly here; it is kept because the
  // SDB_GET_* macros may reference it - confirm against sdb.h.
  int32_t        code = 0;
  int32_t        lino = 0;
  SSdbRow       *pRow = NULL;
  SSsMigrateObj *pSsMigrate = NULL;
  void          *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if ((code = sdbGetRawSoftVer(pRaw, &sver)) != 0) {
    // make sure the failure is visible through terrno, which drives the exit path
    terrno = code;
    goto OVER;
  }

  if (sver != MND_SSMIGRATE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("ssmigrate read invalid ver, data ver: %d, curr ver: %d", sver, MND_SSMIGRATE_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SSsMigrateObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pSsMigrate = sdbGetRowObj(pRow);
  if (pSsMigrate == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSSsMigrateObj(buf, tlen, pSsMigrate)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS || pSsMigrate == NULL) {
    // pSsMigrate is still NULL when the failure happened before the row was set up
    int32_t failedId = (pSsMigrate != NULL) ? pSsMigrate->id : 0;
    mError("ssmigrate:%" PRId32 ", failed to decode from raw:%p since %s", failedId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", decode from raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRow;
}
295

296
// SDB insert callback: no extra work is needed for a new row, so it only
// traces the id of the inserted object and reports success.
int32_t mndSsMigrateActionInsert(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;  // not needed by this callback
  mTrace("ssmigrate:%" PRId32 ", perform insert action", pSsMigrate->id);
  return 0;
}
300

301
// SDB delete callback: traces the id, releases the arrays owned by the row
// object through tFreeSsMigrateObj(), and reports success.
int32_t mndSsMigrateActionDelete(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;  // not needed by this callback
  mTrace("ssmigrate:%" PRId32 ", perform delete action", pSsMigrate->id);
  tFreeSsMigrateObj(pSsMigrate);
  return 0;
}
306

307
// SDB update callback: only traces the old and new rows and reports success.
// Nothing is copied from the new row into the old one here.
int32_t mndSsMigrateActionUpdate(SSdb *pSdb, SSsMigrateObj *pOldSsMigrate, SSsMigrateObj *pNewSsMigrate) {
  (void)pSdb;  // not needed by this callback
  mTrace("ssmigrate:%" PRId32 ", perform update action, old row:%p new row:%p", pOldSsMigrate->id, pOldSsMigrate,
         pNewSsMigrate);
  return 0;
}
313

314
/*
 * Looks up an ssmigrate object by id and returns it acquired; release it with
 * mndReleaseSsMigrate(). Returns NULL when no object was acquired.
 *
 * A plain "object not there" result is not treated as an error: terrno is
 * reset to TSDB_CODE_SUCCESS in that case, any other terrno is left untouched.
 *
 * The SDB_SSMIGRATE table is registered with SDB_KEY_INT32, so the key handed
 * to sdbAcquire() must point at a real int32_t. Passing the address of the
 * int64_t parameter only works on little-endian hosts, hence the local copy.
 */
SSsMigrateObj *mndAcquireSsMigrate(SMnode *pMnode, int64_t ssMigrateId) {
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        key = (int32_t)ssMigrateId;
  SSsMigrateObj *pSsMigrate = sdbAcquire(pSdb, SDB_SSMIGRATE, &key);
  if (pSsMigrate == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pSsMigrate;
}
322

323
// Releases an ssmigrate object previously obtained from mndAcquireSsMigrate().
// The caller's pointer is not modified.
void mndReleaseSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  sdbRelease(pMnode->pSdb, pSsMigrate);
}
15,260✔
328

329
/*
 * Copies the database name of the ssmigrate object with the given id into
 * dbname (len bytes available, copied with tstrncpy).
 *
 * Returns 0 on success. When the object cannot be acquired, returns terrno if
 * it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndSsMigrateGetDbName(SMnode *pMnode, int32_t ssMigrateId, char *dbname, int32_t len) {
  SSsMigrateObj *pObj = mndAcquireSsMigrate(pMnode, ssMigrateId);

  if (pObj != NULL) {
    tstrncpy(dbname, pObj->dbname, len);
    mndReleaseSsMigrate(pMnode, pObj);
    TAOS_RETURN(0);
  }

  int32_t code = TSDB_CODE_MND_RETURN_VALUE_NULL;
  if (terrno != 0) {
    code = terrno;
  }
  TAOS_RETURN(code);
}
342

343
// ssmigrate db
344
/*
 * Initializes pSsMigrate for database pDb and appends it to pTrans as a
 * commit log in SDB_STATUS_READY.
 *
 * The object gets a freshly generated id, the db uid/name, the current time as
 * stateUpdateTime and zeroed progress indexes. Its vgroup list is filled with
 * the ids of all vgroups whose dbUid matches pDb and whose mountVgId is zero.
 * The list only lives for the duration of the encode: pSsMigrate->vgroups is
 * destroyed and reset to NULL before returning, on every path.
 *
 * Returns 0 on success or an error code.
 *
 * Fixes over the previous version:
 *  - the vgroup was released a second time when taosArrayPush() failed;
 *  - the raw status is now set before the raw is appended to the transaction
 *    (same order as mndDropSsMigrate/mndUpdateSsMigrate), so the raw is never
 *    freed here after the transaction already holds a pointer to it.
 */
int32_t mndAddSsMigrateToTran(SMnode *pMnode, STrans *pTrans, SSsMigrateObj *pSsMigrate, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  pSsMigrate->dbUid = pDb->uid;
  pSsMigrate->id = tGenIdPI32();
  tstrncpy(pSsMigrate->dbname, pDb->name, sizeof(pSsMigrate->dbname));
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  pSsMigrate->vgIdx = 0;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->fsetIdx = 0;

  pSsMigrate->vgroups = taosArrayInit(8, sizeof(int32_t));
  if (pSsMigrate->vgroups == NULL) {
    TAOS_RETURN(terrno);
  }

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    // skip mounted vgroups and vgroups of other databases
    if (pVgroup->mountVgId || pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    // only the id is needed, so the vgroup can be released right away
    int32_t vgId = pVgroup->vgId;
    sdbRelease(pSdb, pVgroup);
    if (taosArrayPush(pSsMigrate->vgroups, &vgId) == NULL) {
      code = terrno;
      taosArrayDestroy(pSsMigrate->vgroups);
      pSsMigrate->vgroups = NULL;
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
  }

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  code = terrno;  // capture before the cleanup below can touch terrno
  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;
  if (pRaw == NULL) {
    TAOS_RETURN(code);
  }

  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, ssmigrate:%d, db:%s, has been added", pTrans->id, pSsMigrate->id, pSsMigrate->dbname);
  return 0;
}
404

405
// retrieve ssmigrate
406
/*
 * SHOW retrieve handler for ssmigrate objects: fills pBlock with up to `rows`
 * rows, continuing from the iterator kept in pShow->pIter.
 *
 * Columns written per row, in order: ssmigrate_id, db_name, start_time,
 * number_vgroup, migrated_vgroup, vgroup_id, number_fileset, migrated_fileset,
 * fileset_id. The last four are NULL once every vgroup has been processed
 * (vgIdx >= number of vgroups); fileset_id is also NULL when there is no
 * current fileset.
 *
 * Returns the number of rows written, or an error code on failure.
 *
 * NOTE(review): several locals (pUser, pIterDb, objFName, showAll, showIter,
 * dbUid, n) are not referenced directly below; they are presumably consumed by
 * the MND_SHOW_CHECK_* macros, so they are kept as is.
 */
int32_t mndRetrieveSsMigrate(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        numOfRows = 0;
  SSsMigrateObj *pSsMigrate = NULL;
  char          *sep = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;
  SUserObj      *pUser = NULL;
  SDbObj        *pIterDb = NULL;
  char           objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool           showAll = false, showIter = false;
  int64_t        dbUid = 0;

  // A database filter is only resolved for non-system databases.
  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_SSMIGRATES, PRIV_OBJ_DB, 0,
                                   _OVER);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pShow->pIter, (void **)&pSsMigrate);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pSsMigrate->dbname, pSsMigrate, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_SSMIGRATES, _OVER);

    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // ssmigrate_id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->id, false), pSsMigrate, &lino, _OVER);

    // db_name
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pSsMigrate->dbname)) {
      SName name = {0};
      // Use the same failure macro as every other step of this loop so the
      // fetched object gets the same treatment on this error path.
      // NOTE(review): presumably RETRIEVE_CHECK_GOTO releases pSsMigrate before
      // jumping, which TAOS_CHECK_GOTO did not - confirm against its definition.
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pSsMigrate->dbname, T_NAME_ACCT | T_NAME_DB), pSsMigrate, &lino, _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pSsMigrate->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pSsMigrate, &lino, _OVER);

    // start_time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->startTime, false), pSsMigrate, &lino, _OVER);

    // number_vgroup
    int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numVg, false), pSsMigrate, &lino, _OVER);

    // migrated_vgroup
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->vgIdx, false), pSsMigrate, &lino, _OVER);

    if (pSsMigrate->vgIdx < numVg) {
      // vgroup_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vgId, false), pSsMigrate, &lino, _OVER);

      // number_fileset: reported as 0 until the fileset list of the current
      // vgroup has been received
      int32_t numFset = taosArrayGetSize(pSsMigrate->fileSets);
      if (pSsMigrate->vgState < SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
        numFset = 0;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numFset, false), pSsMigrate, &lino, _OVER);

      // migrated_fileset: same rule as number_fileset
      int32_t fsetIdx = pSsMigrate->fsetIdx;
      if (pSsMigrate->vgState < SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
        fsetIdx = 0;
      }
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fsetIdx, false), pSsMigrate, &lino, _OVER);

      // fileset_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (fsetIdx < numFset) {
        int32_t fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, fsetIdx);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fid, false), pSsMigrate, &lino, _OVER);
      } else {
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
      }
    } else {
      // vgroup_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);

      // number_fileset
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);

      // migrated_fileset
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);

      // fileset_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
    }

    numOfRows++;
    sdbRelease(pSdb, pSsMigrate);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
538

539

540
int32_t mndSsMigrateDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb);
541

542
/*
 * Timer handler: walks every database in the SDB and calls mndSsMigrateDb()
 * on it with a NULL request. A failure for one database is logged and does not
 * stop the iteration. Always returns 0.
 *
 * The database object is released only after its name has been logged; the
 * previous version read pDb->name after sdbRelease().
 */
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;

  while (1) {
    SDbObj *pDb = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DB, pIter, (void **)&pDb);
    if (pIter == NULL) {
      break;
    }

    int32_t code = mndSsMigrateDb(pMnode, NULL, pDb);
    if (code == TSDB_CODE_SUCCESS) {
      mInfo("ssmigrate db:%s, has been triggered by timer", pDb->name);
    } else {
      mError("failed to trigger ssmigrate db:%s, code:%d, %s", pDb->name, code, tstrerror(code));
    }
    sdbRelease(pMnode->pSdb, pDb);
  }

  TAOS_RETURN(0);
}
563

564

565
/*
 * Removes an ssmigrate object from the SDB through a "drop-ssmigrate"
 * transaction: the object is encoded, marked SDB_STATUS_DROPPED, appended as a
 * commit log and the transaction is prepared.
 *
 * Returns 0 on success or an error code; the outcome is logged either way.
 *
 * Fixes over the previous version:
 *  - the raw is freed when it could not be appended to the transaction
 *    (same handling as in mndAddSsMigrateToTran);
 *  - a NULL transaction or raw always leaves through the error path, even if
 *    terrno happens to be 0, instead of falling through to a NULL dereference.
 */
static int32_t mndDropSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "drop-ssmigrate");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }

  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    // the raw was not appended to the transaction, so it must be freed here
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mInfo("ssmigrate:%d was dropped successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to drop at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
  return code;
}
596

597

598
/*
 * Persists the current in-memory state of an ssmigrate object through an
 * "update-ssmigrate" transaction: the object is encoded, marked
 * SDB_STATUS_READY, appended as a commit log and the transaction is prepared.
 *
 * Nothing is returned; success is traced and failure is logged as an error.
 *
 * Fixes over the previous version:
 *  - the raw is freed when it could not be appended to the transaction
 *    (same handling as in mndAddSsMigrateToTran);
 *  - a NULL transaction or raw always leaves through the error path, even if
 *    terrno happens to be 0, instead of falling through to a NULL dereference.
 */
static void mndUpdateSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0, lino = 0;
  SSdbRaw *pRaw = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-ssmigrate");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    lino = __LINE__;
    goto _exit;
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    // the raw was not appended to the transaction, so it must be freed here
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("ssmigrate:%d was updated successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to update at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
}
27,468✔
627

628

629
/*
 * Kills an ssmigrate task: tries to send TDMT_VND_KILL_SSMIGRATE to the vgroup
 * currently being processed, then drops the ssmigrate object.
 *
 * Notifying the vnode is best effort. Whatever goes wrong on the way (all
 * vgroups already processed, vgroup gone, allocation or serialization failure,
 * send failure), the object is still dropped and the result of
 * mndDropSsMigrate() is returned.
 *
 * Fixes over the previous version: a negative size from the sizing pass is no
 * longer used to compute the message length, and the message buffer is freed
 * when serialization into it fails.
 * NOTE(review): pReq is not used here.
 */
static int32_t mndKillSsMigrate(SMnode *pMnode, SRpcMsg *pReq, SSsMigrateObj *pSsMigrate) {
  if (pSsMigrate->vgIdx >= taosArrayGetSize(pSsMigrate->vgroups)) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SVnodeKillSsMigrateReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t   reqLen = tSerializeSVnodeKillSsMigrateReq(NULL, 0, &req);
  if (reqLen < 0) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  int32_t ret = 0;
  if ((ret = tSerializeSVnodeKillSsMigrateReq((char *)pHead + sizeof(SMsgHead), reqLen, &req)) < 0) {
    // the message is never sent on this path, so the buffer is still ours
    rpcFreeCont(pHead);
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_KILL_SSMIGRATE, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send kill ssmigrate request to vnode since 0x%x", req.ssMigrateId, vgId, code);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, kill ssmigrate request was sent to vnode", req.ssMigrateId, vgId);
  }

  return mndDropSsMigrate(pMnode, pSsMigrate);
}
668

669
/*
 * Handler for TDMT_MND_KILL_SSMIGRATE.
 *
 * Decodes the request, looks up the ssmigrate object it names and hands it to
 * mndKillSsMigrate(). Returns the deserialization error,
 * TSDB_CODE_MND_INVALID_SSMIGRATE_ID when the object cannot be acquired, or
 * the result of mndKillSsMigrate().
 */
int32_t mndProcessKillSsMigrateReq(SRpcMsg *pReq) {
  int32_t           code = 0;
  int32_t           lino = 0;
  SKillSsMigrateReq killReq = {0};

  code = tDeserializeSKillSsMigrateReq(pReq->pCont, pReq->contLen, &killReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill ssmigrate:%" PRId32, killReq.ssMigrateId);

  SMnode        *pMnode = pReq->info.node;
  SSsMigrateObj *pTarget = mndAcquireSsMigrate(pMnode, killReq.ssMigrateId);
  if (pTarget == NULL) {
    code = TSDB_CODE_MND_INVALID_SSMIGRATE_ID;
    tFreeSKillSsMigrateReq(&killReq);
    TAOS_RETURN(code);
  }

  // The operator privilege check is currently disabled:
  //TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SSMIGRATE_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillSsMigrate(pMnode, pReq, pTarget), &lino, _OVER);

_OVER:
  // an in-progress action is not a failure worth logging
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to kill ssmigrate %" PRId32 " since %s", killReq.ssMigrateId, tstrerror(code));
  }

  tFreeSKillSsMigrateReq(&killReq);
  mndReleaseSsMigrate(pMnode, pTarget);

  TAOS_RETURN(code);
}
702

703

704
/*
 * Sends TDMT_VND_LIST_SSMIGRATE_FILESETS to the vgroup at pSsMigrate->vgIdx.
 *
 * Outcomes:
 *  - vgroup no longer exists: the object moves on to the next vgroup
 *    (vgState = INIT, vgIdx++) and the function returns without persisting;
 *  - the message cannot be built (sizing, allocation or serialization
 *    failure): nothing is changed;
 *  - send failed: the object moves on to the next vgroup;
 *  - send succeeded: vgState becomes SSMIGRATE_VGSTATE_WAITING_FSET_LIST.
 * In the last two cases stateUpdateTime is refreshed and the object is
 * persisted through mndUpdateSsMigrate().
 *
 * Fixes over the previous version: a negative size from the sizing pass is no
 * longer used to compute the message length, and the message buffer is freed
 * when serialization into it fails.
 * NOTE(review): the caller is expected to guarantee vgIdx is a valid index
 * into vgroups; it is not checked here.
 */
static void mndSendSsMigrateListFileSetsReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    return;
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SListSsMigrateFileSetsReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t   reqLen = tSerializeSListSsMigrateFileSetsReq(NULL, 0, &req);
  if (reqLen < 0) {
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  int32_t ret = 0;
  if ((ret = tSerializeSListSsMigrateFileSetsReq((char *)pHead + sizeof(SMsgHead), reqLen, &req)) < 0) {
    // the message is never sent on this path, so the buffer is still ours
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_LIST_SSMIGRATE_FILESETS, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send list filesets request to vnode since 0x%x", req.ssMigrateId, vgId, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, list filesets request was sent to vnode", req.ssMigrateId, vgId);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_WAITING_FSET_LIST;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
746

747

748
// Handle the vnode's reply to a list-filesets request.
//
// The reply is accepted only if it matches the vgroup currently being processed
// and the migrate object is in SSMIGRATE_VGSTATE_WAITING_FSET_LIST. An empty
// list moves on to the next vgroup; otherwise the first file set becomes the
// current one and the state changes to SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED.
//
// Returns 0 on success, pMsg->code if the reply itself carries an error, or the
// error code of the failed check.
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate list filesets response, req code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // pMnode is used by the cleanup code at _exit, so it must be set before the
  // first check that can jump there.
  SMnode                   *pMnode = pMsg->info.node;
  SSsMigrateObj            *pSsMigrate = NULL;
  SListSsMigrateFileSetsRsp rsp = {0};
  code = tDeserializeSListSsMigrateFileSetsRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // vgIdx may already point past the last vgroup (e.g. the vgroup was skipped
  // before this reply arrived), so the lookup result has to be checked.
  int32_t *pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  int32_t vgId = *pVgId;

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // The new file set list replaces the one stored in the SSsMigrateObj. The two
  // lists are swapped so that the old one is released together with 'rsp' below.
  SArray* tmp = pSsMigrate->fileSets;
  pSsMigrate->fileSets = rsp.pFileSets;
  rsp.pFileSets = tmp;

  pSsMigrate->fsetIdx = 0;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  if (taosArrayGetSize(pSsMigrate->fileSets) == 0) {
    mInfo("ssmigrate:%d, vgId:%d, no filesets to migrate.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->vgIdx++;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, filesets received.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->currFset.vgId = vgId;
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, 0);
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  }

  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  tFreeSListSsMigrateFileSetsRsp(&rsp);
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
807

808

809
// Send a TDMT_VND_SSMIGRATE_FILESET request for the current file set
// (pSsMigrate->currFset.fid) to the vgroup selected by pSsMigrate->vgIdx.
//
// Outcome, as far as the migrate object is concerned:
//   - request sent:       vgState becomes SSMIGRATE_VGSTATE_FSET_STARTING and
//                         currFset.startTime records the request start time
//   - send failed:        the vgroup is skipped (vgIdx++, vgState reset to INIT)
//   - vgroup not found:   the vgroup is skipped, but only in memory
//   - alloc/serialize err: nothing is changed, so the caller can try again later
static void mndSendSsMigrateFileSetReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    // NOTE(review): this path does not call mndUpdateSsMigrate, unlike the
    // send-failure path below - confirm that this is intentional.
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    return;
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateFileSetReq req = { 0 };
  req.ssMigrateId = pSsMigrate->id;
  req.nodeId = 0;
  req.fid = pSsMigrate->currFset.fid;
  req.startTimeSec = taosGetTimestampSec();

  // first pass only computes the encoded size
  int32_t   reqLen = tSerializeSSsMigrateFileSetReq(NULL, 0, &req);
  if (reqLen < 0) {
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSSsMigrateFileSetReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // the buffer has not been passed to tmsgSendReq yet, so release it here
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SSMIGRATE_FILESET, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send migrate fileset request to vnode since 0x%x", req.ssMigrateId, vgId, req.fid, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, migrate fileset request was sent to vnode", req.ssMigrateId, vgId, req.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTING;
    pSsMigrate->currFset.startTime = req.startTimeSec;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
857

858

859
// Handle the vnode's reply to a migrate-fileset request.
//
// The reply is accepted only if it refers to the vgroup and file set currently
// being processed, the migrate object is in SSMIGRATE_VGSTATE_FSET_STARTING and
// the reported node id is positive. On success the reported node id is stored
// in currFset.nodeId and the state changes to SSMIGRATE_VGSTATE_FSET_STARTED.
//
// Returns 0 on success, pMsg->code if the reply itself carries an error, or the
// error code of the failed check.
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate fileset response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // pMnode is used by the cleanup code at _exit, so it must be set before the
  // first check that can jump there.
  SMnode              *pMnode = pMsg->info.node;
  SSsMigrateObj       *pSsMigrate = NULL;
  SSsMigrateFileSetRsp rsp = {0};
  code = tDeserializeSSsMigrateFileSetRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // vgIdx may already point past the last vgroup (e.g. the vgroup was skipped
  // before this reply arrived), so the lookup result has to be checked.
  int32_t *pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  int32_t vgId = *pVgId;

  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTING) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId <= 0) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  mInfo("ssmigrate:%d, vgId:%d, fid:%d, leader node is %d", rsp.ssMigrateId, vgId, rsp.fid, rsp.nodeId);
  pSsMigrate->currFset.nodeId = rsp.nodeId;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTED;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
905

906

907

908
// Response handler for the follower-ssmigrate request. The reply is not
// inspected; the handler only reports success.
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq) {
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
911

912

913
// Send a TDMT_VND_FOLLOWER_SSMIGRATE request carrying the current file set's
// progress (node id, vgroup id, file set id and state) to the vgroup recorded
// in pSsMigrate->currFset.vgId.
//
// This is best effort: every failure is either silent or only logged, and the
// migrate object itself is never modified here.
static void mndSendFollowerSsMigrateReq(SMnode* pMnode, SSsMigrateObj *pSsMigrate) {
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, pSsMigrate->currFset.vgId);
  if (pVgroup == NULL) {
    return;
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateProgress req = {
    .ssMigrateId = pSsMigrate->id,
    .nodeId = pSsMigrate->currFset.nodeId,
    .vgId = pSsMigrate->currFset.vgId,
    .fid = pSsMigrate->currFset.fid,
    .state = pSsMigrate->currFset.state,
  };

  // first pass only computes the encoded size
  int32_t   reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
  if (reqLen < 0) {
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(req.vgId);
  if (tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // do not send a message whose body was not written; the buffer has not been
    // passed to tmsgSendReq yet, so release it here
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_FOLLOWER_SSMIGRATE, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  // argument order follows the format string: vgId first, then the ssmigrate id
  if (code != 0) {
    mError("vgId:%d, ssmigrate:%d, fid:%d, failed to send follower-ssmigrate request since 0x%x", req.vgId, req.ssMigrateId, req.fid, code);
  } else {
    mTrace("vgId:%d, ssmigrate:%d, fid:%d, follower-ssmigrate request sent", req.vgId, req.ssMigrateId, req.fid);
  }
}
949

950

951

952
// Handle the reply to a migration progress query.
//
// The reply is accepted only if the migrate object is in
// SSMIGRATE_VGSTATE_FSET_STARTED and the node id, vgroup id and file set id all
// match the current file set. The reported state is then stored in
// currFset.state and forwarded with mndSendFollowerSsMigrateReq.
//
// Returns 0 on success, pMsg->code if the reply itself carries an error, or the
// error code of the failed check.
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong query ssmigrate progress response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // pMnode is used by the cleanup code at _exit, so it must be set before the
  // first check that can jump there.
  SMnode            *pMnode = pMsg->info.node;
  SSsMigrateObj     *pSsMigrate = NULL;
  SSsMigrateProgress rsp = {0};
  code = tDeserializeSSsMigrateProgress(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTED) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId != pSsMigrate->currFset.nodeId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.vgId != pSsMigrate->currFset.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // an unchanged state is only traced, a state change is logged at info level
  if (rsp.state == pSsMigrate->currFset.state) {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  }
  pSsMigrate->currFset.state = rsp.state;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

  mndSendFollowerSsMigrateReq(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
1002

1003

1004

1005
// when query migration progress, we need to send the msg to dnode instead of vgroup,
1006
// because migration may take a long time, and leader may change during the migration process,
1007
// while only the initial leader vnode can handle the migration progress query.
1008
void mndSendQuerySsMigrateProgressReq(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
6,867✔
1009
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pSsMigrate->currFset.nodeId);
6,867✔
1010
  if (pDnode == NULL) {
6,867✔
1011
    return;
×
1012
  }
1013

1014
  SEpSet epSet = mndGetDnodeEpset(pDnode);
6,867✔
1015
  mndReleaseDnode(pMnode, pDnode);
6,867✔
1016

1017
  SSsMigrateProgress req = {
6,867✔
1018
    .ssMigrateId = pSsMigrate->id,
6,867✔
1019
    .nodeId = pSsMigrate->currFset.nodeId,
6,867✔
1020
    .vgId = pSsMigrate->currFset.vgId,
6,867✔
1021
    .fid = pSsMigrate->currFset.fid,
6,867✔
1022
    .state = SSMIGRATE_FILESET_STATE_IN_PROGRESS,
1023
  };
1024

1025
  int32_t          reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
6,867✔
1026
  int32_t          contLen = reqLen + sizeof(SMsgHead);
6,867✔
1027
  SMsgHead *pHead = rpcMallocCont(contLen);
6,867✔
1028
  if (pHead == NULL) {
6,867✔
1029
    return;
×
1030
  }
1031

1032
  pHead->contLen = htonl(contLen);
6,867✔
1033
  pHead->vgId = htonl(req.vgId);
6,867✔
1034
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
6,867✔
1035
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS, .pCont = pHead, .contLen = contLen};
6,867✔
1036

1037
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
6,867✔
1038
  if (code != 0) {
6,867✔
1039
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send ssmigrate-query-progress request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code)
×
1040
  } else {
1041
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, ssmigrate-query-progress request sent", req.ssMigrateId, req.vgId, req.fid);
6,867✔
1042
  }
1043
}
1044

1045

1046

1047
// Give up on the vgroup currently being processed: advance to the next vgroup,
// reset the vgroup state to SSMIGRATE_VGSTATE_INIT, stamp the change time and
// pass the object to mndUpdateSsMigrate.
static void mndSsMigrateMoveToNextVgroup(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  pSsMigrate->vgIdx++;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}

// Advance one migrate object by a single step. Depending on the vgroup state
// and the state of the current file set this either sends the next request,
// gives up on a vgroup that did not answer in time, moves to the next file set
// or vgroup, or drops the migrate object once every vgroup has been processed.
static void mndUpdateSsMigrateProgress(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  // both the "no answer" timeout and the settle time after the leader finished
  enum { SSMIGRATE_STEP_WAIT_SEC = 30 };

  int32_t vgCount = taosArrayGetSize(pSsMigrate->vgroups);
  int32_t fsetCount = taosArrayGetSize(pSsMigrate->fileSets);

  if (pSsMigrate->vgIdx >= vgCount) {
    mInfo("ssmigrate:%d, all vgroups has been processed", pSsMigrate->id);
    TAOS_UNUSED(mndDropSsMigrate(pMnode, pSsMigrate));
    return;
  }

  // a vgroup in the init state first needs the list of its file sets
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_INIT) {
    mndSendSsMigrateListFileSetsReq(pMnode, pSsMigrate);
    return;
  }

  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);

  switch (pSsMigrate->vgState) {
    case SSMIGRATE_VGSTATE_WAITING_FSET_LIST:
      if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > SSMIGRATE_STEP_WAIT_SEC) {
        mWarn("ssmigrate:%d, vgId:%d, haven't receive file set list in 30 seconds, skip", pSsMigrate->id, vgId);
        mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
      }
      return;

    case SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED:
      mndSendSsMigrateFileSetReq(pMnode, pSsMigrate);
      return;

    case SSMIGRATE_VGSTATE_FSET_STARTING:
      // on timeout the whole vgroup is skipped rather than just this file set,
      // because a file set timing out often means the vgroup is not available.
      if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > SSMIGRATE_STEP_WAIT_SEC) {
        mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive response in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
        mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
      }
      return;

    default:
      break;
  }

  // compaction needs some time, so only the migration state is reset here; the
  // first migration request is sent again on the next tick.
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_COMPACT) {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, compacting, will retry later", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->stateUpdateTime = taosGetTimestampSec();
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    mndUpdateSsMigrate(pMnode, pSsMigrate);
    return;
  }

  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > SSMIGRATE_STEP_WAIT_SEC) {
      mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive state in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
      mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
    } else {
      mndSendQuerySsMigrateProgressReq(pMnode, pSsMigrate);
    }
    return;
  }

  // wait at least 30 seconds after the leader node has processed the file set,
  // so that the follower nodes have enough time to start processing it; this
  // also keeps the tsdb code simpler.
  if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime < SSMIGRATE_STEP_WAIT_SEC) {
    return;
  }

  // this file set is done, continue with the next one
  pSsMigrate->fsetIdx++;
  if (pSsMigrate->fsetIdx >= fsetCount) {
    // it was the last file set of this vgroup
    mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
    return;
  }

  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  pSsMigrate->currFset.nodeId = 0;
  pSsMigrate->currFset.vgId = vgId;
  pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, pSsMigrate->fsetIdx);
  pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
1142

1143

1144
// Timer handler: walk every SDB_SSMIGRATE object stored in the mnode's sdb and
// advance each of them by one step with mndUpdateSsMigrateProgress.
// Always returns 0.
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq) {
  mTrace("start to process update ssmigrate progress timer");

  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;

  void *pIter = NULL;
  for (;;) {
    SSsMigrateObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      // no more objects to visit
      break;
    }

    mndUpdateSsMigrateProgress(pMnode, pObj);
    sdbRelease(pSdb, pObj);
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc