• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4905

29 Dec 2025 02:08PM UTC coverage: 65.423% (-0.3%) from 65.734%
#4905

push

travis-ci

web-flow
enh: sign connect request (#34067)

23 of 29 new or added lines in 4 files covered. (79.31%)

11614 existing lines in 186 files now uncovered.

193476 of 295730 relevant lines covered (65.42%)

115752566.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.76
/source/dnode/mnode/impl/src/mndSsMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndSsMigrate.h"
16
#include "mndDb.h"
17
#include "mndDnode.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndTrans.h"
21
#include "mndUser.h"
22
#include "mndVgroup.h"
23
#include "tmisce.h"
24
#include "tmsgcb.h"
25

26
#define MND_SSMIGRATE_VER_NUMBER       2
27

28
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq);
29
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq);
30
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg);
31
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg);
32
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pReq);
33
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pMsg);
34

35
/*
 * Wire the ssmigrate module into the mnode: one show-retrieve handler, the
 * message handlers for timers / kill requests / vnode responses, and the SDB
 * table descriptor for SDB_SSMIGRATE rows (keyed by a 32-bit id).
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitSsMigrate(SMnode *pMnode) {
  // "show" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SSMIGRATE, mndRetrieveSsMigrate);

  // message dispatch
  mndSetMsgHandle(pMnode, TDMT_MND_SSMIGRATE_DB_TIMER, mndProcessSsMigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SSMIGRATE, mndProcessKillSsMigrateReq);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SSMIGRATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER, mndProcessUpdateSsMigrateProgressTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP, mndProcessSsMigrateListFileSetsRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SSMIGRATE_FILESET_RSP, mndProcessSsMigrateFileSetRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP, mndProcessQuerySsMigrateProgressRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_FOLLOWER_SSMIGRATE_RSP, mndProcessFollowerSsMigrateRsp);

  // SDB callbacks for ssmigrate rows
  SSdbTable ssMigrateTable = {
      .sdbType = SDB_SSMIGRATE,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndSsMigrateActionEncode,
      .decodeFp = (SdbDecodeFp)mndSsMigrateActionDecode,
      .insertFp = (SdbInsertFp)mndSsMigrateActionInsert,
      .updateFp = (SdbUpdateFp)mndSsMigrateActionUpdate,
      .deleteFp = (SdbDeleteFp)mndSsMigrateActionDelete,
  };

  return sdbSetTable(pMnode->pSdb, ssMigrateTable);
}
58

59
/* Cleanup hook of the ssmigrate module; it only emits a debug log line. */
void mndCleanupSsMigrate(SMnode *pMnode) {
  mDebug("mnd ssmigrate cleanup");
}
384,506✔
60

61
/*
 * Release the arrays owned by an ssmigrate object (the object itself is not
 * freed). The pointers are reset afterwards so that calling this twice, or
 * touching the object after the call, cannot hit freed memory.
 *
 * pSsMigrate may be NULL, in which case nothing happens.
 */
void tFreeSsMigrateObj(SSsMigrateObj *pSsMigrate) {
  if (pSsMigrate == NULL) {
    return;
  }

  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;

  taosArrayDestroy(pSsMigrate->fileSets);
  pSsMigrate->fileSets = NULL;
}
×
65

66
/*
 * Encode an ssmigrate object into buf (bufLen bytes).
 *
 * Field order: id, dbUid, dbname, startTime, stateUpdateTime, vgIdx, vgState,
 * fsetIdx, currFset{nodeId, vgId, fid, state, startTime}, then the vgroup id
 * list and the fileset id list, each prefixed by its element count. A NULL
 * array is written as an empty list.
 *
 * Returns the encoder position on success, or the failing error code.
 */
int32_t tSerializeSSsMigrateObj(void *buf, int32_t bufLen, const SSsMigrateObj *pObj) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  tlen = 0;
  SEncoder encoder = {0};

  tEncoderInit(&encoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartEncode(&encoder));

  // scalar header
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->id));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->dbUid));
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->stateUpdateTime));
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgIdx));
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgState));
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->fsetIdx));

  // fileset currently being migrated
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.nodeId));
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.vgId));
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.fid));
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.state));
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->currFset.startTime));

  // vgroup id list
  int32_t nVgroups = 0;
  if (pObj->vgroups) {
    nVgroups = taosArrayGetSize(pObj->vgroups);
  }
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, nVgroups));
  for (int32_t idx = 0; idx < nVgroups; ++idx) {
    int32_t *pVgId = (int32_t *)taosArrayGet(pObj->vgroups, idx);
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *pVgId));
  }

  // fileset id list
  int32_t nFileSets = 0;
  if (pObj->fileSets) {
    nFileSets = taosArrayGetSize(pObj->fileSets);
  }
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, nFileSets));
  for (int32_t idx = 0; idx < nFileSets; ++idx) {
    int32_t *pFid = (int32_t *)taosArrayGet(pObj->fileSets, idx);
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *pFid));
  }

  tEndEncode(&encoder);

_exit:
  // the position must be sampled before the encoder is cleared
  if (code == 0) {
    tlen = encoder.pos;
  } else {
    tlen = code;
  }
  tEncoderClear(&encoder);
  return tlen;
}
112

113
/*
 * Decode an ssmigrate object from buf (bufLen bytes), mirroring the field
 * order written by tSerializeSSsMigrateObj.
 *
 * Array handling: if pObj already owns a vgroups / fileSets array it is
 * cleared and refilled in place; otherwise a new array is created and only
 * attached to pObj when the whole decode succeeds. On failure, arrays that
 * pObj already owned are left attached but emptied, and arrays created here
 * are destroyed.
 *
 * Returns TSDB_CODE_SUCCESS or the failing error code.
 */
int32_t tDeserializeSSsMigrateObj(void *buf, int32_t bufLen, SSsMigrateObj *pObj) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino;
  SArray  *vgList = NULL;
  SArray  *fsetList = NULL;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // scalar header
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->id));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->stateUpdateTime));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgIdx));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgState));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->fsetIdx));

  // fileset currently being migrated
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.nodeId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.vgId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.fid));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.state));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->currFset.startTime));

  // vgroup id list
  int32_t nVgroups = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nVgroups));
  vgList = pObj->vgroups;
  if (vgList != NULL) {
    taosArrayClear(vgList);
  } else {
    vgList = taosArrayInit(nVgroups, sizeof(int32_t));
    if (vgList == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < nVgroups; ++idx) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgId));
    if (taosArrayPush(vgList, &vgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // fileset id list
  int32_t nFileSets = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &nFileSets));
  fsetList = pObj->fileSets;
  if (fsetList != NULL) {
    taosArrayClear(fsetList);
  } else {
    fsetList = taosArrayInit(nFileSets, sizeof(int32_t));
    if (fsetList == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t idx = 0; idx < nFileSets; ++idx) {
    int32_t fid = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fid));
    if (taosArrayPush(fsetList, &fid) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);

  if (code == TSDB_CODE_SUCCESS) {
    // hand both lists to the object
    pObj->vgroups = vgList;
    pObj->fileSets = fsetList;
    return code;
  }

  // failure: empty what the object already owned, drop what was created here
  if (pObj->vgroups) {
    taosArrayClear(pObj->vgroups);
  } else {
    taosArrayDestroy(vgList);
  }
  if (pObj->fileSets) {
    taosArrayClear(pObj->fileSets);
  } else {
    taosArrayDestroy(fsetList);
  }
  return code;
}
188

189
/*
 * SDB encode callback: turn an ssmigrate object into a raw SDB record.
 *
 * Raw layout: an int32 payload length followed by the payload produced by
 * tSerializeSSsMigrateObj. The object is serialized twice: once with a NULL
 * buffer to learn the size, once into a temporary buffer.
 *
 * Returns the new raw record, or NULL with terrno set on failure. When the
 * serializer fails, terrno carries the serializer's own error code (the
 * negative value it returned) instead of a blanket out-of-memory code.
 */
SSdbRaw *mndSsMigrateActionEncode(SSsMigrateObj *pSsMigrate) {
  // code/lino are not used directly here; they are kept because the SDB_SET_*
  // macros may reference them — NOTE(review): confirm against the macro definitions.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_SUCCESS;

  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;

  // first pass: size only
  int32_t tlen = tSerializeSSsMigrateObj(NULL, 0, pSsMigrate);
  if (tlen < 0) {
    terrno = tlen;  // tSerializeSSsMigrateObj returns the failing error code
    goto OVER;
  }

  int32_t size = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_SSMIGRATE, MND_SSMIGRATE_VER_NUMBER, size);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // second pass: real encoding
  tlen = tSerializeSSsMigrateObj(buf, tlen, pSsMigrate);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("ssmigrate:%" PRId32 ", failed to encode to raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", encode to raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRaw;
}
238

239
/*
 * SDB decode callback: rebuild an ssmigrate row from a raw SDB record.
 *
 * The raw record must carry soft version MND_SSMIGRATE_VER_NUMBER; its body
 * is an int32 payload length followed by the serialized object.
 *
 * Returns the decoded row, or NULL with terrno set on failure.
 */
SSdbRow *mndSsMigrateActionDecode(SSdbRaw *pRaw) {
  // lino is kept because the SDB_GET_* macros may reference it —
  // NOTE(review): confirm against the macro definitions.
  int32_t        code = 0;
  int32_t        lino = 0;
  SSdbRow       *pRow = NULL;
  SSsMigrateObj *pSsMigrate = NULL;
  void          *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if ((code = sdbGetRawSoftVer(pRaw, &sver)) != 0) {
    // make sure the failure is visible to the check at OVER even if the
    // callee did not publish it through terrno
    if (terrno == TSDB_CODE_SUCCESS) terrno = code;
    goto OVER;
  }

  if (sver != MND_SSMIGRATE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("ssmigrate read invalid ver, data ver: %d, curr ver: %d", sver, MND_SSMIGRATE_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SSsMigrateObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pSsMigrate = sdbGetRowObj(pRow);
  if (pSsMigrate == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSSsMigrateObj(buf, tlen, pSsMigrate)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // several failure paths reach here before the row object exists, so the
    // id can only be read when pSsMigrate was actually obtained
    int32_t id = (pSsMigrate != NULL) ? pSsMigrate->id : 0;
    mError("ssmigrate:%" PRId32 ", failed to decode from raw:%p since %s", id, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", decode from raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRow;
}
295

296
/* SDB insert callback: nothing to do besides tracing the id; always returns 0. */
int32_t mndSsMigrateActionInsert(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  const int32_t ok = 0;

  mTrace("ssmigrate:%" PRId32 ", perform insert action", pSsMigrate->id);
  return ok;
}
300

301
/*
 * SDB delete callback: trace the id, then release the arrays owned by the
 * object through tFreeSsMigrateObj. Always returns 0.
 */
int32_t mndSsMigrateActionDelete(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  const int32_t ok = 0;

  mTrace("ssmigrate:%" PRId32 ", perform delete action", pSsMigrate->id);
  tFreeSsMigrateObj(pSsMigrate);

  return ok;
}
306

307
/*
 * SDB update callback: only traces the old and new row pointers; no field is
 * copied from the new row into the old one here. Always returns 0.
 */
int32_t mndSsMigrateActionUpdate(SSdb *pSdb, SSsMigrateObj *pOldSsMigrate, SSsMigrateObj *pNewSsMigrate) {
  const int32_t ok = 0;

  mTrace("ssmigrate:%" PRId32 ", perform update action, old row:%p new row:%p", pOldSsMigrate->id, pOldSsMigrate,
         pNewSsMigrate);

  return ok;
}
313

314
/*
 * Look up an ssmigrate object by id and return it acquired, or NULL.
 *
 * The SDB table for SDB_SSMIGRATE is registered with SDB_KEY_INT32, so the
 * lookup key is narrowed to a real int32_t before its address is handed to
 * sdbAcquire; passing the address of the 64-bit parameter would only work on
 * little-endian machines.
 *
 * A "not there" result is not treated as an error: terrno is reset to
 * success in that case. Release the result with mndReleaseSsMigrate.
 */
SSsMigrateObj *mndAcquireSsMigrate(SMnode *pMnode, int64_t ssMigrateId) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t key = (int32_t)ssMigrateId;

  SSsMigrateObj *pSsMigrate = sdbAcquire(pSdb, SDB_SSMIGRATE, &key);
  if (pSsMigrate == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pSsMigrate;
}
322

323
/*
 * Give back an ssmigrate object obtained from mndAcquireSsMigrate.
 * The caller's own pointer is not modified.
 */
void mndReleaseSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  sdbRelease(pMnode->pSdb, pSsMigrate);
}
×
328

329
/*
 * Copy the database name of ssmigrate task ssMigrateId into dbname
 * (at most len bytes, via tstrncpy).
 *
 * Returns 0 on success. If the task cannot be acquired, returns terrno when
 * it is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndSsMigrateGetDbName(SMnode *pMnode, int32_t ssMigrateId, char *dbname, int32_t len) {
  int32_t        code = 0;
  SSsMigrateObj *pTask = mndAcquireSsMigrate(pMnode, ssMigrateId);

  if (pTask != NULL) {
    tstrncpy(dbname, pTask->dbname, len);
    mndReleaseSsMigrate(pMnode, pTask);
    TAOS_RETURN(code);
  }

  // not found: prefer the specific error left in terrno, if any
  code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
342

343
// ssmigrate db
344
/*
 * Initialise a new ssmigrate task for database pDb and append its raw record
 * (status READY) to the commit log of pTrans.
 *
 * Fills id, dbUid, dbname, stateUpdateTime and resets vgIdx / vgState /
 * fsetIdx on pSsMigrate. The list of vgroup ids belonging to the database
 * (vgroups with a mountVgId are skipped) is collected only for the encoded
 * record: pSsMigrate->vgroups is destroyed and reset to NULL before return.
 *
 * Returns 0 on success or an error code.
 */
int32_t mndAddSsMigrateToTran(SMnode *pMnode, STrans *pTrans, SSsMigrateObj *pSsMigrate, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  pSsMigrate->dbUid = pDb->uid;
  pSsMigrate->id = tGenIdPI32();
  tstrncpy(pSsMigrate->dbname, pDb->name, sizeof(pSsMigrate->dbname));
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  pSsMigrate->vgIdx = 0;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->fsetIdx = 0;

  pSsMigrate->vgroups = taosArrayInit(8, sizeof(int32_t));
  if (pSsMigrate->vgroups == NULL) {
    TAOS_RETURN(terrno);
  }

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->mountVgId || pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    // copy the id out, then drop the reference: pVgroup must not be touched
    // (or released again) after this point
    int32_t vgId = pVgroup->vgId;
    sdbRelease(pSdb, pVgroup);

    if (taosArrayPush(pSsMigrate->vgroups, &vgId) == NULL) {
      code = terrno;
      taosArrayDestroy(pSsMigrate->vgroups);
      pSsMigrate->vgroups = NULL;
      sdbCancelFetch(pSdb, pIter);
      TAOS_RETURN(code);
    }
  }

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  code = terrno;
  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;
  if (pRaw == NULL) {
    TAOS_RETURN(code);
  }

  // Set the status while the raw is still exclusively ours, so that a failure
  // here never frees a record the transaction already references.
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, ssmigrate:%d, db:%s, has been added", pTrans->id, pSsMigrate->id, pSsMigrate->dbname);
  return 0;
}
404

405
// retrieve ssmigrate
406
/*
 * Show-retrieve handler: fill pBlock with up to `rows` ssmigrate tasks.
 *
 * Columns, in order: ssmigrate_id, db_name, start_time, number_vgroup,
 * migrated_vgroup, vgroup_id, number_fileset, migrated_fileset, fileset_id.
 * The last four are NULL once every vgroup has been processed
 * (vgIdx >= number of vgroups); fileset counters read as 0 until the fileset
 * list for the current vgroup has been received.
 *
 * Returns the number of rows written, or an error code.
 */
int32_t mndRetrieveSsMigrate(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        numOfRows = 0;
  SSsMigrateObj *pSsMigrate = NULL;
  char          *sep = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;
  // The locals below are not referenced directly in this body; presumably the
  // MND_SHOW_CHECK_* privilege macros use them — verify before removing any.
  SUserObj      *pUser = NULL;
  SDbObj        *pIterDb = NULL;
  char           objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool           showAll = false, showIter = false;
  int64_t        dbUid = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), PRIV_SHOW_SSMIGRATES, PRIV_OBJ_DB, 0, _OVER);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pShow->pIter, (void **)&pSsMigrate);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pSsMigrate->dbname, pSsMigrate, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_SSMIGRATES, _OVER);

    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // ssmigrate_id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->id, false), pSsMigrate, &lino, _OVER);

    // db_name
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pSsMigrate->dbname)) {
      SName name = {0};
      // Same failure handling as every other step of this loop, so that the
      // fetched object is passed to the macro on this path too.
      // NOTE(review): assumes RETRIEVE_CHECK_GOTO releases the object it is given — confirm.
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pSsMigrate->dbname, T_NAME_ACCT | T_NAME_DB), pSsMigrate, &lino, _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pSsMigrate->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pSsMigrate, &lino, _OVER);

    // start_time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->startTime, false), pSsMigrate, &lino, _OVER);

    // number_vgroup
    int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numVg, false), pSsMigrate, &lino, _OVER);

    // migrated_vgroup
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->vgIdx, false), pSsMigrate, &lino, _OVER);

    if (pSsMigrate->vgIdx < numVg) {
      // vgroup_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&vgId, false), pSsMigrate, &lino, _OVER);

      // fileset counters are only meaningful once the list has been received
      int32_t numFset = 0;
      int32_t fsetIdx = 0;
      if (pSsMigrate->vgState >= SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
        numFset = taosArrayGetSize(pSsMigrate->fileSets);
        fsetIdx = pSsMigrate->fsetIdx;
      }

      // number_fileset
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&numFset, false), pSsMigrate, &lino, _OVER);

      // migrated_fileset
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fsetIdx, false), pSsMigrate, &lino, _OVER);

      // fileset_id
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      if (fsetIdx < numFset) {
        int32_t fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, fsetIdx);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&fid, false), pSsMigrate, &lino, _OVER);
      } else {
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
      }
    } else {
      // all vgroups done: vgroup_id, number_fileset, migrated_fileset and
      // fileset_id are reported as NULL
      for (int32_t i = 0; i < 4; ++i) {
        pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
        RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pSsMigrate, &lino, _OVER);
      }
    }

    numOfRows++;
    sdbRelease(pSdb, pSsMigrate);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
537

538

539
int32_t mndSsMigrateDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb);
540

541
/*
 * Timer handler: walk every database in the SDB and call mndSsMigrateDb on
 * it, logging the outcome per database. Individual failures do not stop the
 * walk; the handler always returns 0.
 */
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;

  while (1) {
    SDbObj *pDb = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DB, pIter, (void **)&pDb);
    if (pIter == NULL) {
      break;
    }

    int32_t code = mndSsMigrateDb(pMnode, NULL, pDb);
    if (code == TSDB_CODE_SUCCESS) {
      mInfo("ssmigrate db:%s, has been triggered by timer", pDb->name);
    } else {
      mError("failed to trigger ssmigrate db:%s, code:%d, %s", pDb->name, code, tstrerror(code));
    }

    // release only after the last read of pDb->name
    sdbRelease(pMnode->pSdb, pDb);
  }

  TAOS_RETURN(0);
}
562

563

564
/*
 * Remove an ssmigrate task: create a "drop-ssmigrate" transaction whose
 * commit log holds the task's raw record with status DROPPED, and prepare it.
 *
 * The raw record is freed here on every failure that happens before the
 * transaction has accepted it, including a failed append.
 *
 * Returns 0 on success or an error code.
 */
static int32_t mndDropSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t code = 0, lino = 0;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "drop-ssmigrate");
  if (pTrans == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    // the transaction did not take the record, so it is still ours to free
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mInfo("ssmigrate:%d was dropped successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to drop at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
  return code;
}
595

596

597
/*
 * Persist the current in-memory state of an ssmigrate task: create an
 * "update-ssmigrate" transaction whose commit log holds the task's raw
 * record with status READY, and prepare it.
 *
 * Failures are logged only; nothing is returned to the caller. The raw
 * record is freed here on every failure that happens before the transaction
 * has accepted it, including a failed append.
 */
static void mndUpdateSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t code = 0, lino = 0;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-ssmigrate");
  if (pTrans == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    // the transaction did not take the record, so it is still ours to free
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("ssmigrate:%d was updated successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to update at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
}
×
626

627

628
/*
 * Kill an ssmigrate task: best-effort notify the vnode group currently being
 * migrated (TDMT_VND_KILL_SSMIGRATE), then drop the task from the SDB.
 *
 * The notification is advisory: whatever goes wrong while building or
 * sending it, the task is still dropped. pReq is not used.
 *
 * Returns the result of mndDropSsMigrate.
 */
static int32_t mndKillSsMigrate(SMnode *pMnode, SRpcMsg *pReq, SSsMigrateObj *pSsMigrate) {
  // vgIdx can point past the last vgroup (the show handler guards for this),
  // in which case there is no vnode to notify.
  // NOTE(review): relies on taosArrayGet returning NULL for an out-of-range index — confirm.
  int32_t *pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }
  int32_t vgId = *pVgId;

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SVnodeKillSsMigrateReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t                reqLen = tSerializeSVnodeKillSsMigrateReq(NULL, 0, &req);
  if (reqLen < 0) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSVnodeKillSsMigrateReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // the message was never handed to the RPC layer, so free it here
    rpcFreeCont(pHead);
    return mndDropSsMigrate(pMnode, pSsMigrate);
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_KILL_SSMIGRATE, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send kill ssmigrate request to vnode since 0x%x", req.ssMigrateId, vgId, code);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, kill ssmigrate request was sent to vnode", req.ssMigrateId, vgId);
  }

  return mndDropSsMigrate(pMnode, pSsMigrate);
}
663

664
/*
 * Handler for TDMT_MND_KILL_SSMIGRATE: decode the request, look up the task
 * by id and kill it through mndKillSsMigrate.
 *
 * Returns the deserialization error, TSDB_CODE_MND_INVALID_SSMIGRATE_ID when
 * the task cannot be acquired, or the result of mndKillSsMigrate.
 */
int32_t mndProcessKillSsMigrateReq(SRpcMsg *pReq) {
  int32_t           code = 0;
  int32_t           lino = 0;
  SKillSsMigrateReq killReq = {0};

  code = tDeserializeSKillSsMigrateReq(pReq->pCont, pReq->contLen, &killReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill ssmigrate:%" PRId32, killReq.ssMigrateId);

  SMnode        *pMnode = pReq->info.node;
  SSsMigrateObj *pTask = mndAcquireSsMigrate(pMnode, killReq.ssMigrateId);
  if (pTask == NULL) {
    tFreeSKillSsMigrateReq(&killReq);
    code = TSDB_CODE_MND_INVALID_SSMIGRATE_ID;
    TAOS_RETURN(code);
  }

  // NOTE(review): the operation privilege check is currently disabled:
  //TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SSMIGRATE_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillSsMigrate(pMnode, pReq, pTask), &lino, _OVER);

_OVER:
  // an in-progress result is not reported as a failure
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to kill ssmigrate %" PRId32 " since %s", killReq.ssMigrateId, tstrerror(code));
  }

  tFreeSKillSsMigrateReq(&killReq);
  mndReleaseSsMigrate(pMnode, pTask);

  TAOS_RETURN(code);
}
697

698

699
/*
 * Ask the vnode group at pSsMigrate->vgIdx for its list of filesets
 * (TDMT_VND_LIST_SSMIGRATE_FILESETS).
 *
 * State transitions written to pSsMigrate:
 *  - vgroup no longer exists: vgState = INIT, vgIdx++ (returns without persisting);
 *  - request sent:            vgState = WAITING_FSET_LIST, persisted;
 *  - send failed:             vgState = INIT, vgIdx++, persisted.
 * If the request cannot be built (allocation or serialization failure) the
 * task is left unchanged.
 */
static void mndSendSsMigrateListFileSetsReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  // NOTE(review): relies on taosArrayGet returning NULL for an out-of-range index — confirm.
  int32_t *pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL) {
    mError("ssmigrate:%d, vgroup index %d is out of range in %s", pSsMigrate->id, pSsMigrate->vgIdx, __func__);
    return;
  }
  int32_t vgId = *pVgId;

  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    return;
  }

  SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SListSsMigrateFileSetsReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t                   reqLen = tSerializeSListSsMigrateFileSetsReq(NULL, 0, &req);
  if (reqLen < 0) {
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  if (tSerializeSListSsMigrateFileSetsReq((char *)pHead + sizeof(SMsgHead), reqLen, &req) < 0) {
    // the message was never handed to the RPC layer, so free it here
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_LIST_SSMIGRATE_FILESETS, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send list filesets request to vnode since 0x%x", req.ssMigrateId, vgId, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, list filesets request was sent to vnode", req.ssMigrateId, vgId);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_WAITING_FSET_LIST;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
741

742

743
/*
 * Handler for TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP.
 *
 * The response is accepted only if it comes from the vgroup the task is
 * currently working on and the task is in WAITING_FSET_LIST state. The task
 * then takes over the fileset list from the response:
 *  - empty list: move on to the next vgroup (vgIdx++, vgState = INIT);
 *  - otherwise:  start with the first fileset (currFset set up, state
 *                IN_PROGRESS, vgState = FSET_LIST_RECEIVED).
 * The new state is persisted through mndUpdateSsMigrate.
 *
 * Returns 0 on success, the response's own error code if it carries one, or
 * a decode / validation error.
 */
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate list filesets response, req code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // Initialised before the first goto: the cleanup at _exit reads pMnode.
  SMnode                   *pMnode = pMsg->info.node;
  SSsMigrateObj            *pSsMigrate = NULL;
  SListSsMigrateFileSetsRsp rsp = {0};

  code = tDeserializeSListSsMigrateFileSetsRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // NOTE(review): relies on taosArrayGet returning NULL for an out-of-range index — confirm.
  int32_t *pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  int32_t vgId = *pVgId;

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // The task takes over the new fileset list. Swapping (rather than
  // overwriting) hands the old list to rsp, so freeing rsp at _exit disposes
  // of it.
  SArray* tmp = pSsMigrate->fileSets;
  pSsMigrate->fileSets = rsp.pFileSets;
  rsp.pFileSets = tmp;

  pSsMigrate->fsetIdx = 0;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  if (taosArrayGetSize(pSsMigrate->fileSets) == 0) {
    mInfo("ssmigrate:%d, vgId:%d, no filesets to migrate.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->vgIdx++;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, filesets received.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->currFset.vgId = vgId;
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, 0);
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  }

  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  tFreeSListSsMigrateFileSetsRsp(&rsp);
  if (pSsMigrate != NULL) {
    mndReleaseSsMigrate(pMnode, pSsMigrate);
  }
  return code;
}
802

803

804
// Send a migrate-fileset request for the current file set (currFset.fid) to the
// vgroup at index pSsMigrate->vgIdx, and record the outcome in pSsMigrate.
//
//  - vgroup cannot be acquired: vgState is reset to INIT and vgIdx is advanced,
//    then the function returns immediately.
//  - the request cannot be built (size probe, allocation or serialization fails):
//    the function returns without touching pSsMigrate, so the caller sees the
//    same state on its next invocation.
//  - tmsgSendReq fails: vgState is reset to INIT and vgIdx is advanced.
//  - tmsgSendReq succeeds: vgState becomes FSET_STARTING and the start time of
//    the file set is recorded.
// In the last two cases stateUpdateTime is refreshed and mndUpdateSsMigrate is called.
static void mndSendSsMigrateFileSetReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
  if (pVgroup == NULL) {
    mError("ssmigrate:%d, vgId:%d, vgroup does not exist in %s", pSsMigrate->id, vgId, __func__);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
    return;
  }

  // only the endpoint set is needed, so the vgroup can be released right away
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateFileSetReq req = { 0 };
  req.ssMigrateId = pSsMigrate->id;
  req.nodeId = 0;
  req.fid = pSsMigrate->currFset.fid;
  req.startTimeSec = taosGetTimestampSec();

  // first pass with a NULL buffer only computes the encoded size
  int32_t   reqLen = tSerializeSSsMigrateFileSetReq(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to get size of migrate fileset request since 0x%x", req.ssMigrateId, vgId, req.fid, reqLen);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to allocate migrate fileset request", req.ssMigrateId, vgId, req.fid);
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  int32_t ret = tSerializeSSsMigrateFileSetReq((char *)pHead + sizeof(SMsgHead), reqLen, &req);
  if (ret < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to serialize migrate fileset request since 0x%x", req.ssMigrateId, vgId, req.fid, ret);
    // the buffer was never handed to the rpc layer, so it must be released here
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SSMIGRATE_FILESET, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send migrate fileset request to vnode since 0x%x", req.ssMigrateId, vgId, req.fid, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, migrate fileset request was sent to vnode", req.ssMigrateId, vgId, req.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTING;
    pSsMigrate->currFset.startTime = req.startTimeSec;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
852

853

854
// Handle the response to a migrate-fileset request.
//
// The response is accepted only if it matches what the migrate object is
// currently waiting for: same vgroup as vgroups[vgIdx], same file set id as
// currFset.fid, vgState == FSET_STARTING, and a positive nodeId. On acceptance
// the reported node id is stored in currFset.nodeId, vgState becomes
// FSET_STARTED, stateUpdateTime is refreshed and mndUpdateSsMigrate is called.
//
// Returns pMsg->code if the message itself carries an error, the
// deserialization error code, TSDB_CODE_MND_RETURN_VALUE_NULL if the migrate
// object cannot be acquired, TSDB_CODE_INVALID_MSG if the response does not
// match the current state, and 0 on success.
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate fileset response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // everything that is read at _exit must be initialized before the first goto
  SMnode               *pMnode = pMsg->info.node;
  SSsMigrateObj        *pSsMigrate = NULL;
  int32_t              *pVgId = NULL;
  SSsMigrateFileSetRsp  rsp = {0};

  code = tDeserializeSSsMigrateFileSetRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // vgIdx may already point past the last vgroup when a late response arrives,
  // so the lookup result has to be checked before it is dereferenced
  pVgId = taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTING) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId <= 0) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  mInfo("ssmigrate:%d, vgId:%d, fid:%d, leader node is %d", rsp.ssMigrateId, *pVgId, rsp.fid, rsp.nodeId);
  pSsMigrate->currFset.nodeId = rsp.nodeId;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTED;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
900

901

902

903
// Handler for the response to a follower-ssmigrate request. The response
// carries nothing this handler acts on: pReq is not inspected and success is
// always reported.
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq) {
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
906

907

UNCOV
908
// Send a follower-ssmigrate request carrying the current file set's progress
// (node id, vgroup id, file set id and state taken from pSsMigrate->currFset)
// to the vgroup currFset.vgId.
//
// This function does not modify pSsMigrate. Any failure (vgroup cannot be
// acquired, the request cannot be built, or the send fails) only results in
// an early return and/or an error log.
static void mndSendFollowerSsMigrateReq(SMnode* pMnode, SSsMigrateObj *pSsMigrate) {
  SVgObj *pVgroup = mndAcquireVgroup(pMnode, pSsMigrate->currFset.vgId);
  if (pVgroup == NULL) {
    return;
  }

  // only the endpoint set is needed, so the vgroup can be released right away
  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  mndReleaseVgroup(pMnode, pVgroup);

  SSsMigrateProgress req = {
    .ssMigrateId = pSsMigrate->id,
    .nodeId = pSsMigrate->currFset.nodeId,
    .vgId = pSsMigrate->currFset.vgId,
    .fid = pSsMigrate->currFset.fid,
    .state = pSsMigrate->currFset.state,
  };

  // first pass with a NULL buffer only computes the encoded size
  int32_t   reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to get size of follower-ssmigrate request since 0x%x", req.ssMigrateId, req.vgId, req.fid, reqLen);
    return;
  }

  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(req.vgId);
  int32_t ret = tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req);
  if (ret < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to serialize follower-ssmigrate request since 0x%x", req.ssMigrateId, req.vgId, req.fid, ret);
    // the buffer was never handed to the rpc layer, so it must be released here
    rpcFreeCont(pHead);
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_FOLLOWER_SSMIGRATE, .pCont = pHead, .contLen = contLen};

  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    // labels follow the argument order (ssMigrateId, vgId, fid)
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send follower-ssmigrate request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code);
  } else {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, follower-ssmigrate request sent", req.ssMigrateId, req.vgId, req.fid);
  }
}
944

945

946

947
// Handle the response to a query-ssmigrate-progress request.
//
// The response is accepted only if vgState == FSET_STARTED and its node id,
// vgroup id and file set id all equal the values stored in currFset. On
// acceptance the reported state is copied into currFset.state,
// stateUpdateTime is refreshed, mndUpdateSsMigrate is called, and the new
// progress is forwarded through mndSendFollowerSsMigrateReq.
//
// Returns pMsg->code if the message itself carries an error, the
// deserialization error code, TSDB_CODE_MND_RETURN_VALUE_NULL if the migrate
// object cannot be acquired, TSDB_CODE_INVALID_MSG if the response does not
// match the current state, and 0 on success.
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong query ssmigrate progress response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // everything that is read at _exit must be initialized before the first goto
  SMnode             *pMnode = pMsg->info.node;
  SSsMigrateObj      *pSsMigrate = NULL;
  SSsMigrateProgress  rsp = {0};

  code = tDeserializeSSsMigrateProgress(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTED) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId != pSsMigrate->currFset.nodeId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.vgId != pSsMigrate->currFset.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // an unchanged state is only traced, a state change is logged at info level
  if (rsp.state == pSsMigrate->currFset.state) {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  }
  pSsMigrate->currFset.state = rsp.state;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

  mndSendFollowerSsMigrateReq(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
997

998

999

1000
// when query migration progress, we need to send the msg to dnode instead of vgroup,
1001
// because migration may take a long time, and leader may change during the migration process,
1002
// while only the initial leader vnode can handle the migration progress query.
UNCOV
1003
void mndSendQuerySsMigrateProgressReq(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
×
UNCOV
1004
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pSsMigrate->currFset.nodeId);
×
1005
  if (pDnode == NULL) {
×
1006
    return;
×
1007
  }
1008

1009
  SEpSet epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
1010
  mndReleaseDnode(pMnode, pDnode);
×
1011

1012
  SSsMigrateProgress req = {
×
1013
    .ssMigrateId = pSsMigrate->id,
×
1014
    .nodeId = pSsMigrate->currFset.nodeId,
×
1015
    .vgId = pSsMigrate->currFset.vgId,
×
UNCOV
1016
    .fid = pSsMigrate->currFset.fid,
×
1017
    .state = SSMIGRATE_FILESET_STATE_IN_PROGRESS,
1018
  };
1019

UNCOV
1020
  int32_t          reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
×
1021
  int32_t          contLen = reqLen + sizeof(SMsgHead);
×
UNCOV
1022
  SMsgHead *pHead = rpcMallocCont(contLen);
×
UNCOV
1023
  if (pHead == NULL) {
×
UNCOV
1024
    return;
×
1025
  }
1026

1027
  pHead->contLen = htonl(contLen);
×
1028
  pHead->vgId = htonl(req.vgId);
×
1029
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
×
UNCOV
1030
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS, .pCont = pHead, .contLen = contLen};
×
1031

1032
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
1033
  if (code != 0) {
×
1034
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send ssmigrate-query-progress request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code)
×
1035
  } else {
UNCOV
1036
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, ssmigrate-query-progress request sent", req.ssMigrateId, req.vgId, req.fid);
×
1037
  }
1038
}
1039

1040

1041

UNCOV
1042
static void mndUpdateSsMigrateProgress(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
×
1043
  int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
×
UNCOV
1044
  int32_t numFset = taosArrayGetSize(pSsMigrate->fileSets);
×
1045

1046
  if (pSsMigrate->vgIdx >= numVg) {
×
1047
    mInfo("ssmigrate:%d, all vgroups has been processed", pSsMigrate->id);
×
1048
    TAOS_UNUSED(mndDropSsMigrate(pMnode, pSsMigrate));
×
1049
    return;
×
1050
  }
1051

1052
  // vgroup state is init, we need to get the list of its file sets
1053
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_INIT) {
×
UNCOV
1054
    mndSendSsMigrateListFileSetsReq(pMnode, pSsMigrate);
×
UNCOV
1055
    return;
×
1056
  }
1057

1058
  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
×
1059

UNCOV
1060
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
×
1061
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
×
UNCOV
1062
      mWarn("ssmigrate:%d, vgId:%d, haven't receive file set list in 30 seconds, skip", pSsMigrate->id, vgId);
×
UNCOV
1063
      pSsMigrate->vgIdx++;
×
1064
      pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
1065
      pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1066
      mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1067
    }
1068
    return;
×
1069
  }
1070

1071
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED) {
×
UNCOV
1072
    mndSendSsMigrateFileSetReq(pMnode, pSsMigrate);
×
UNCOV
1073
    return;
×
1074
  }
1075

1076
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_FSET_STARTING) {
×
1077
    // if timeout, we skip the current vgroup instead of the current file set, because timeout
1078
    // of a file set often means the vgroup is not available.
1079
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
×
1080
      mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive response in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
×
1081
      pSsMigrate->vgIdx++;
×
1082
      pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
1083
      pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
UNCOV
1084
      mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1085
    }
1086
    return;
×
1087
  }
1088

1089
  // compact need some time, so only reset migration state here and wait the next
1090
  // tick to send the first migration request again.
1091
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_COMPACT) {
×
1092
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, compacting, will retry later", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
×
UNCOV
1093
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
×
1094
    pSsMigrate->currFset.nodeId = 0;
×
UNCOV
1095
    pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1096
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
×
UNCOV
1097
    mndUpdateSsMigrate(pMnode, pSsMigrate);
×
UNCOV
1098
    return;
×
1099
  }
1100

UNCOV
1101
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
×
1102
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
×
1103
      mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive state in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
×
UNCOV
1104
      pSsMigrate->vgIdx++;
×
UNCOV
1105
      pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
UNCOV
1106
      pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1107
      mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1108
    } else {
1109
      mndSendQuerySsMigrateProgressReq(pMnode, pSsMigrate);
×
1110
    }
UNCOV
1111
    return;
×
1112
  }
1113

1114
  // wait at least 30 seconds after the leader node has processed the file set, this is to ensure
1115
  // that the follower nodes have enough time to start process the file set, and make the code of
1116
  // tsdb simpler.
UNCOV
1117
  if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime < 30) {
×
UNCOV
1118
    return;
×
1119
  }
1120

1121
  // this file set has been processed, move to the next file set
UNCOV
1122
  pSsMigrate->fsetIdx++;
×
UNCOV
1123
  if (pSsMigrate->fsetIdx >= numFset) {
×
1124
    pSsMigrate->vgIdx++;
×
1125
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
×
1126
  } else {
1127
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
×
1128
    pSsMigrate->currFset.nodeId = 0;
×
1129
    pSsMigrate->currFset.vgId = vgId;
×
1130
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, pSsMigrate->fsetIdx);
×
UNCOV
1131
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
×
1132
  }
1133

1134
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
×
1135
  mndUpdateSsMigrate(pMnode, pSsMigrate);
×
1136
}
1137

1138

1139
// Timer handler: walk every SDB_SSMIGRATE object in the sdb and advance each
// one by a single step via mndUpdateSsMigrateProgress. Always returns 0.
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq) {
  mTrace("start to process update ssmigrate progress timer");

  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pCursor = NULL;

  for (;;) {
    SSsMigrateObj *pObj = NULL;

    pCursor = sdbFetch(pMnode->pSdb, SDB_SSMIGRATE, pCursor, (void **)&pObj);
    if (pCursor == NULL) {
      // no more objects to visit
      break;
    }

    mndUpdateSsMigrateProgress(pMnode, pObj);
    // hand back the object obtained from sdbFetch
    sdbRelease(pSdb, pObj);
  }

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc