• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4688

26 Aug 2025 02:05PM UTC coverage: 56.997% (-0.9%) from 57.894%
#4688

push

travis-ci

web-flow
fix: modify the prompt language of the taos-shell (#32758)

* fix: modify prompt language

* fix: add shell test case

* fix: modify comments

* fix: modify test case for TDengine TSDB

130660 of 292423 branches covered (44.68%)

Branch coverage included in aggregate %.

16 of 17 new or added lines in 2 files covered. (94.12%)

9459 existing lines in 157 files now uncovered.

198294 of 284715 relevant lines covered (69.65%)

4532552.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.25
/source/dnode/mnode/impl/src/mndSsMigrate.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndSsMigrate.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndTrans.h"
22
#include "mndVgroup.h"
23
#include "tmisce.h"
24
#include "tmsgcb.h"
25

26
#define MND_SSMIGRATE_VER_NUMBER       2
27

28
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq);
29
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq);
30
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg);
31
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg);
32
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pReq);
33
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pMsg);
34

35
/*
 * Wire the ssmigrate module into the mnode.
 *
 * Registers the show-retrieve callback, every message handler this module
 * serves, and finally the SDB table descriptor for SDB_SSMIGRATE rows.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitSsMigrate(SMnode *pMnode) {
  // SDB callbacks for rows of type SDB_SSMIGRATE; rows are keyed by a 32-bit id.
  SSdbTable ssMigrateTable = {0};
  ssMigrateTable.sdbType = SDB_SSMIGRATE;
  ssMigrateTable.keyType = SDB_KEY_INT32;
  ssMigrateTable.encodeFp = (SdbEncodeFp)mndSsMigrateActionEncode;
  ssMigrateTable.decodeFp = (SdbDecodeFp)mndSsMigrateActionDecode;
  ssMigrateTable.insertFp = (SdbInsertFp)mndSsMigrateActionInsert;
  ssMigrateTable.updateFp = (SdbUpdateFp)mndSsMigrateActionUpdate;
  ssMigrateTable.deleteFp = (SdbDeleteFp)mndSsMigrateActionDelete;

  // "show" support
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SSMIGRATE, mndRetrieveSsMigrate);

  // mnode-side requests and timers
  mndSetMsgHandle(pMnode, TDMT_MND_SSMIGRATE_DB_TIMER, mndProcessSsMigrateDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SSMIGRATE, mndProcessKillSsMigrateReq);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SSMIGRATE_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_SSMIGRATE_PROGRESS_TIMER, mndProcessUpdateSsMigrateProgressTimer);

  // responses coming back from vnodes
  mndSetMsgHandle(pMnode, TDMT_VND_LIST_SSMIGRATE_FILESETS_RSP, mndProcessSsMigrateListFileSetsRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_SSMIGRATE_FILESET_RSP, mndProcessSsMigrateFileSetRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SSMIGRATE_PROGRESS_RSP, mndProcessQuerySsMigrateProgressRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_FOLLOWER_SSMIGRATE_RSP, mndProcessFollowerSsMigrateRsp);

  return sdbSetTable(pMnode->pSdb, ssMigrateTable);
}
58

59
/* Module teardown hook: nothing is released here, it only emits a debug line. */
void mndCleanupSsMigrate(SMnode *pMnode) {
  (void)pMnode;
  mDebug("mnd ssmigrate cleanup");
}
1,915✔
60

UNCOV
61
/*
 * Destroy the two arrays held by an ssmigrate object (file-set ids and
 * vgroup ids). The object itself is not freed and its pointers are left as-is.
 */
void tFreeSsMigrateObj(SSsMigrateObj *pSsMigrate) {
  SArray *pFileSets = pSsMigrate->fileSets;
  SArray *pVgroups = pSsMigrate->vgroups;

  taosArrayDestroy(pVgroups);
  taosArrayDestroy(pFileSets);
}
×
65

UNCOV
66
int32_t tSerializeSSsMigrateObj(void *buf, int32_t bufLen, const SSsMigrateObj *pObj) {
×
67
  SEncoder encoder = {0};
×
UNCOV
68
  int32_t  code = 0;
×
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
×
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->id));
×
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->dbUid));
×
76
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
×
UNCOV
77
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
×
78
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->stateUpdateTime));
×
79
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgIdx));
×
80
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->vgState));
×
81
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->fsetIdx));
×
82
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.nodeId));
×
83
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.vgId));
×
UNCOV
84
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.fid));
×
85
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->currFset.state));
×
UNCOV
86
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->currFset.startTime));
×
87

88
  int32_t numVg = pObj->vgroups ? taosArrayGetSize(pObj->vgroups) : 0;
×
89
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numVg));
×
UNCOV
90
  for (int32_t i = 0; i < numVg; ++i) {
×
91
    int32_t *vgId = (int32_t *)taosArrayGet(pObj->vgroups, i);
×
UNCOV
92
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *vgId));
×
93
  }
94

UNCOV
95
  int32_t numFset = pObj->fileSets ? taosArrayGetSize(pObj->fileSets) : 0;
×
UNCOV
96
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, numFset));
×
97
  for (int32_t i = 0; i < numFset; ++i) {
×
98
    int32_t *fsetId = (int32_t *)taosArrayGet(pObj->fileSets, i);
×
UNCOV
99
    TAOS_CHECK_EXIT(tEncodeI32(&encoder, *fsetId));
×
100
  }
101
  tEndEncode(&encoder);
×
102

103
_exit:
×
104
  if (code) {
×
105
    tlen = code;
×
106
  } else {
107
    tlen = encoder.pos;
×
108
  }
109
  tEncoderClear(&encoder);
×
110
  return tlen;
×
111
}
112

113
/*
 * Decode an ssmigrate object from `buf` into `pObj`, reading the fields in
 * the order tSerializeSSsMigrateObj() writes them.
 *
 * Array handling: when pObj already owns a vgroups / fileSets array it is
 * cleared and refilled; otherwise a new array is created. On success the
 * arrays are stored in pObj. On failure, arrays that pObj already owned are
 * cleared (kept allocated) and arrays created here are destroyed.
 *
 * Returns TSDB_CODE_SUCCESS or the failing error code.
 */
int32_t tDeserializeSSsMigrateObj(void *buf, int32_t bufLen, SSsMigrateObj *pObj) {
  int32_t  code = TSDB_CODE_SUCCESS, lino;
  SArray  *vgroups = NULL;
  SArray  *fileSets = NULL;
  SDecoder decoder = {0};

  tDecoderInit(&decoder, buf, bufLen);
  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  // identity and timing
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->id));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->dbUid));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->stateUpdateTime));

  // progress cursor
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgIdx));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->vgState));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->fsetIdx));

  // file set currently being processed
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.nodeId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.vgId));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.fid));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->currFset.state));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->currFset.startTime));

  // vgroup ids: reuse the existing array when there is one
  int32_t vgCount = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgCount));
  if (pObj->vgroups != NULL) {
    vgroups = pObj->vgroups;
    taosArrayClear(vgroups);
  } else {
    vgroups = taosArrayInit(vgCount, sizeof(int32_t));
    if (vgroups == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t v = 0; v < vgCount; ++v) {
    int32_t vgId = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &vgId));
    if (taosArrayPush(vgroups, &vgId) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  // file set ids: same policy as above
  int32_t fsetCount = 0;
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fsetCount));
  if (pObj->fileSets != NULL) {
    fileSets = pObj->fileSets;
    taosArrayClear(fileSets);
  } else {
    fileSets = taosArrayInit(fsetCount, sizeof(int32_t));
    if (fileSets == NULL) {
      code = terrno;
      goto _exit;
    }
  }
  for (int32_t f = 0; f < fsetCount; ++f) {
    int32_t fid = 0;
    TAOS_CHECK_EXIT(tDecodeI32(&decoder, &fid));
    if (taosArrayPush(fileSets, &fid) == NULL) {
      TAOS_CHECK_EXIT(terrno);
    }
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);

  if (code == TSDB_CODE_SUCCESS) {
    pObj->vgroups = vgroups;
    pObj->fileSets = fileSets;
    return code;
  }

  // Failure: empty what the object already owned, drop what was created here.
  if (pObj->vgroups != NULL) {
    taosArrayClear(pObj->vgroups);
  } else {
    taosArrayDestroy(vgroups);
  }
  if (pObj->fileSets != NULL) {
    taosArrayClear(pObj->fileSets);
  } else {
    taosArrayDestroy(fileSets);
  }
  return code;
}
188

UNCOV
189
/*
 * SDB encode callback: turn an ssmigrate object into a raw SDB record.
 *
 * Raw layout: a 32-bit payload length followed by the payload produced by
 * tSerializeSSsMigrateObj().
 *
 * Returns the new raw record, or NULL with terrno set on failure.
 */
SSdbRaw *mndSsMigrateActionEncode(SSsMigrateObj *pSsMigrate) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void    *payload = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  // First pass with no buffer only measures the encoded size.
  int32_t payloadLen = tSerializeSSsMigrateObj(NULL, 0, pSsMigrate);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  int32_t rawSize = sizeof(int32_t) + payloadLen;
  pRaw = sdbAllocRaw(SDB_SSMIGRATE, MND_SSMIGRATE_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // Second pass writes the real bytes.
  payloadLen = tSerializeSSsMigrateObj(payload, payloadLen, pSsMigrate);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, _OVER);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, _OVER);
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

_OVER:
  taosMemoryFreeClear(payload);

  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("ssmigrate:%" PRId32 ", encode to raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
    return pRaw;
  }

  mError("ssmigrate:%" PRId32 ", failed to encode to raw:%p since %s", pSsMigrate->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
238

239
/*
 * SDB decode callback: rebuild an ssmigrate row from a raw SDB record.
 *
 * The raw record must carry soft version MND_SSMIGRATE_VER_NUMBER and the
 * layout written by mndSsMigrateActionEncode(): a 32-bit payload length
 * followed by the serialized object.
 *
 * Returns the new row, or NULL on failure (terrno holds the reason when the
 * failing call set one).
 */
SSdbRow *mndSsMigrateActionDecode(SSdbRaw *pRaw) {
  int32_t        code = 0;
  int32_t        lino = 0;
  SSdbRow       *pRow = NULL;
  SSsMigrateObj *pSsMigrate = NULL;
  void          *buf = NULL;
  bool           decoded = false;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto OVER;
  }

  if (sver != MND_SSMIGRATE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("ssmigrate read invalid ver, data ver: %d, curr ver: %d", sver, MND_SSMIGRATE_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SSsMigrateObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pSsMigrate = sdbGetRowObj(pRow);
  if (pSsMigrate == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen = 0;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSSsMigrateObj(buf, tlen, pSsMigrate)) < 0) {
    goto OVER;
  }
  decoded = true;

OVER:
  taosMemoryFreeClear(buf);
  // `decoded` covers failures that jumped here without setting terrno, so
  // they can no longer fall through to the success path.
  if (!decoded || terrno != TSDB_CODE_SUCCESS) {
    // Several failure paths arrive before the row object exists, so the id
    // may not be available; report -1 rather than dereferencing NULL.
    int32_t id = (pSsMigrate != NULL) ? pSsMigrate->id : -1;
    mError("ssmigrate:%" PRId32 ", failed to decode from raw:%p since %s", id, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("ssmigrate:%" PRId32 ", decode from raw:%p, row:%p", pSsMigrate->id, pRaw, pSsMigrate);
  return pRow;
}
295

UNCOV
296
/* SDB insert callback: no extra work is needed, the insertion is only traced. Always returns 0. */
int32_t mndSsMigrateActionInsert(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;

  mTrace("ssmigrate:%" PRId32 ", perform insert action", pSsMigrate->id);

  return 0;
}
300

301
/* SDB delete callback: traces the deletion and destroys the arrays held by the object. Always returns 0. */
int32_t mndSsMigrateActionDelete(SSdb *pSdb, SSsMigrateObj *pSsMigrate) {
  (void)pSdb;

  mTrace("ssmigrate:%" PRId32 ", perform delete action", pSsMigrate->id);

  // Release the vgroup / file-set arrays owned by the object.
  tFreeSsMigrateObj(pSsMigrate);

  return 0;
}
306

UNCOV
307
/*
 * SDB update callback: only traces the update. Nothing is copied from the
 * new row into the old one here. Always returns 0.
 */
int32_t mndSsMigrateActionUpdate(SSdb *pSdb, SSsMigrateObj *pOldSsMigrate, SSsMigrateObj *pNewSsMigrate) {
  (void)pSdb;

  mTrace("ssmigrate:%" PRId32 ", perform update action, old row:%p new row:%p", pOldSsMigrate->id, pOldSsMigrate,
         pNewSsMigrate);

  return 0;
}
313

314
/*
 * Look up an ssmigrate object by id and take a reference on it.
 *
 * A "not there" lookup is not treated as an error: terrno is reset to
 * success and NULL is returned. Release the result with
 * mndReleaseSsMigrate().
 */
SSsMigrateObj *mndAcquireSsMigrate(SMnode *pMnode, int64_t ssMigrateId) {
  SSdb *pSdb = pMnode->pSdb;

  // The ssmigrate table is registered with SDB_KEY_INT32, so the key handed
  // to sdbAcquire() must point at a 32-bit value. Pointing at the 64-bit
  // parameter only reads the right bytes on little-endian hosts.
  int32_t key = (int32_t)ssMigrateId;

  SSsMigrateObj *pSsMigrate = sdbAcquire(pSdb, SDB_SSMIGRATE, &key);
  if (pSsMigrate == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pSsMigrate;
}
322

323
/* Drop a reference previously taken with mndAcquireSsMigrate(). */
void mndReleaseSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  sdbRelease(pMnode->pSdb, pSsMigrate);
}
×
328

329
/*
 * Copy the database name of the ssmigrate object `ssMigrateId` into `dbname`
 * (at most `len` bytes, via tstrncpy).
 *
 * Returns 0 on success. When the object cannot be acquired, returns terrno if
 * it is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndSsMigrateGetDbName(SMnode *pMnode, int32_t ssMigrateId, char *dbname, int32_t len) {
  int32_t        code = 0;
  SSsMigrateObj *pObj = mndAcquireSsMigrate(pMnode, ssMigrateId);

  if (pObj != NULL) {
    tstrncpy(dbname, pObj->dbname, len);
    mndReleaseSsMigrate(pMnode, pObj);
    TAOS_RETURN(code);
  }

  code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
342

343
// ssmigrate db
UNCOV
344
/*
 * Initialise `pSsMigrate` for database `pDb` and append it to `pTrans` as a
 * commit-log record in READY state.
 *
 * The object gets a fresh id, the db uid/name, the current time as its state
 * update time, and a zeroed progress cursor. The ids of all vgroups of the
 * database (skipping vgroups with a non-zero mountVgId) are collected only
 * for encoding; the temporary array is destroyed before returning, so
 * pSsMigrate->vgroups is NULL on exit.
 *
 * Returns 0 on success or an error code.
 */
int32_t mndAddSsMigrateToTran(SMnode *pMnode, STrans *pTrans, SSsMigrateObj *pSsMigrate, SDbObj *pDb) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  pSsMigrate->dbUid = pDb->uid;
  pSsMigrate->id = tGenIdPI32();
  tstrncpy(pSsMigrate->dbname, pDb->name, sizeof(pSsMigrate->dbname));
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  pSsMigrate->vgIdx = 0;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->fsetIdx = 0;

  pSsMigrate->vgroups = taosArrayInit(8, sizeof(int32_t));
  if (pSsMigrate->vgroups == NULL) {
    TAOS_RETURN(terrno);
  }

  while (1) {
    SVgObj *pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->mountVgId || pVgroup->dbUid != pDb->uid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    int32_t vgId = pVgroup->vgId;
    sdbRelease(pSdb, pVgroup);
    if (taosArrayPush(pSsMigrate->vgroups, &vgId) == NULL) {
      code = terrno;
      taosArrayDestroy(pSsMigrate->vgroups);
      pSsMigrate->vgroups = NULL;
      TAOS_RETURN(code);
    }
  }

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  code = terrno;  // capture before the cleanup below can disturb it
  taosArrayDestroy(pSsMigrate->vgroups);
  pSsMigrate->vgroups = NULL;
  if (pRaw == NULL) {
    TAOS_RETURN(code);
  }

  // Set the status while this function still exclusively holds pRaw. Once the
  // record has been appended, the transaction holds the pointer as well, so
  // freeing it here after a late failure would leave the transaction with a
  // dangling record.
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_RETURN(code);
  }

  mInfo("trans:%d, ssmigrate:%d, db:%s, has been added", pTrans->id, pSsMigrate->id, pSsMigrate->dbname);
  return 0;
}
402

403
// retrieve ssmigrate
UNCOV
404
/*
 * Show-retrieve callback for the ssmigrate table.
 *
 * Fills up to `rows` rows of `pBlock` with three columns per ssmigrate
 * object: id, database name, and start time. Iteration state is kept in
 * pShow->pIter so that the scan can continue on the next call.
 *
 * When pShow->db is non-empty and is not one of the system schemas, the
 * database is acquired first and terrno is returned if that fails.
 *
 * Returns the number of rows written (also on a mid-scan error, which is
 * only logged).
 */
int32_t mndRetrieveSsMigrate(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        numOfRows = 0;
  SSsMigrateObj *pSsMigrate = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0;
  int32_t        lino = 0;

  if (strlen(pShow->db) > 0) {
    char *sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      // System schema: there is no user database to acquire.
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SSMIGRATE, pShow->pIter, (void **)&pSsMigrate);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // column: id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(
        colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->id, false), pSsMigrate, &lino, _OVER);

    // column: database name (account prefix stripped unless it is a system db)
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pSsMigrate->dbname)) {
      SName name = {0};
      code = tNameFromString(&name, pSsMigrate->dbname, T_NAME_ACCT | T_NAME_DB);
      if (code != 0) {
        // Drop the reference sdbFetch() took on this row before bailing out;
        // the success path releases it at the bottom of the loop.
        sdbRelease(pSdb, pSsMigrate);
        TAOS_CHECK_GOTO(code, &lino, _OVER);
      }
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pSsMigrate->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pSsMigrate, &lino, _OVER);

    // column: start time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pSsMigrate->startTime, false), pSsMigrate, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pSsMigrate);
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  mndReleaseDb(pMnode, pDb);
  return numOfRows;
}
464

465

466

467
/* Handler for TDMT_MND_KILL_SSMIGRATE: not supported yet, logs and returns TSDB_CODE_OPS_NOT_SUPPORT. */
int32_t mndProcessKillSsMigrateReq(SRpcMsg *pReq) {
  (void)pReq;

  mError("not implemented yet");

  return TSDB_CODE_OPS_NOT_SUPPORT;
}
471

472

473

474
int32_t mndSsMigrateDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb);
475

476
/*
 * Timer handler: walk every database in the SDB and call mndSsMigrateDb()
 * on it, logging the outcome per database. Failures of individual databases
 * do not stop the walk. Always returns 0.
 */
static int32_t mndProcessSsMigrateDbTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  void   *pIter = NULL;

  while (1) {
    SDbObj *pDb = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_DB, pIter, (void **)&pDb);
    if (pIter == NULL) {
      break;
    }

    int32_t code = mndSsMigrateDb(pMnode, NULL, pDb);

    // Log while the reference from sdbFetch() is still held; pDb must not be
    // touched once it has been released.
    if (code == TSDB_CODE_SUCCESS) {
      mInfo("ssmigrate db:%s, has been triggered by timer", pDb->name);
    } else {
      mError("failed to trigger ssmigrate db:%s, code:%d, %s", pDb->name, code, tstrerror(code));
    }

    sdbRelease(pMnode->pSdb, pDb);
  }

  TAOS_RETURN(0);
}
497

498

499
/*
 * Remove an ssmigrate object from the SDB by running a "drop-ssmigrate"
 * transaction whose commit log carries the object in DROPPED state.
 *
 * Returns 0 on success or the failing error code; the outcome is logged.
 */
static int32_t mndDropSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t code = 0, lino = 0;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "drop-ssmigrate");
  if (pTrans == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }

  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  // Until the append succeeds the raw record is still ours, so it has to be
  // freed here on failure (same handling as in mndAddSsMigrateToTran).
  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mInfo("ssmigrate:%d was dropped successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to drop at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
  return code;
}
530

531

532
/*
 * Persist the current in-memory state of an ssmigrate object by running an
 * "update-ssmigrate" transaction whose commit log carries the object in
 * READY state.
 *
 * Best effort: failures are logged and not reported to the caller.
 */
static void mndUpdateSsMigrate(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
  int32_t code = 0, lino = 0;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-ssmigrate");
  if (pTrans == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  mndTransSetDbName(pTrans, pSsMigrate->dbname, NULL);

  SSdbRaw *pRaw = mndSsMigrateActionEncode(pSsMigrate);
  if (pRaw == NULL) {
    TAOS_CHECK_GOTO(terrno, &lino, _exit);
  }
  if ((code = sdbSetRawStatus(pRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  // Until the append succeeds the raw record is still ours, so it has to be
  // freed here on failure (same handling as in mndAddSsMigrateToTran).
  code = mndTransAppendCommitlog(pTrans, pRaw);
  if (code != 0) {
    sdbFreeRaw(pRaw);
    TAOS_CHECK_GOTO(code, &lino, _exit);
  }

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _exit);

_exit:
  mndTransDrop(pTrans);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("ssmigrate:%d was updated successfully", pSsMigrate->id);
  } else {
    mError("ssmigrate:%d, failed to update at lino %d since %s", pSsMigrate->id, lino, tstrerror(code));
  }
}
×
561

562

563
/*
 * Ask the vnode of the current vgroup (vgroups[vgIdx]) to list the file sets
 * that should be migrated.
 *
 * On a successful send the object moves to WAITING_FSET_LIST; if sending
 * fails the vgroup is skipped (vgIdx++, state back to INIT). Either way the
 * state update time is refreshed and the object is persisted.
 *
 * Returns silently, without changing state, when the vgroup index is out of
 * range, the vgroup no longer exists, or the request cannot be built.
 */
static void mndSendSsMigrateListFileSetsReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  int32_t *pVgId = (int32_t *)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL) {
    mError("ssmigrate:%d, no vgroup at index %d", pSsMigrate->id, pSsMigrate->vgIdx);
    return;
  }
  int32_t vgId = *pVgId;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) return;
    if (pVgroup->vgId == vgId) {
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    // Every row handed out by sdbFetch() carries a reference; give back the
    // ones that are not the vgroup being looked for.
    sdbRelease(pSdb, pVgroup);
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  sdbRelease(pSdb, pVgroup);

  SListSsMigrateFileSetsReq req = {.ssMigrateId = pSsMigrate->id};
  int32_t   reqLen = tSerializeSListSsMigrateFileSetsReq(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, failed to size list filesets request since 0x%x", req.ssMigrateId, vgId, reqLen);
    return;
  }
  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  int32_t ret = 0;
  if ((ret = tSerializeSListSsMigrateFileSetsReq((char *)pHead + sizeof(SMsgHead), reqLen, &req)) < 0) {
    rpcFreeCont(pHead);  // the message was never handed to the RPC layer
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_LIST_SSMIGRATE_FILESETS, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, failed to send list filesets request to vnode since 0x%x", req.ssMigrateId, vgId, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, list filesets request was sent to vnode", req.ssMigrateId, vgId);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_WAITING_FSET_LIST;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
610

611

612
/*
 * Handle a vnode's reply to the list-filesets request.
 *
 * The reply is accepted only if it is for the vgroup currently being
 * processed and the object is in WAITING_FSET_LIST; anything else is
 * rejected with TSDB_CODE_INVALID_MSG. On acceptance the object's file-set
 * list is replaced by the received one. An empty list advances to the next
 * vgroup; otherwise the first file set becomes the current one and the state
 * moves to FSET_LIST_RECEIVED. The object is then persisted.
 *
 * Returns 0 on success or an error code.
 */
static int32_t mndProcessSsMigrateListFileSetsRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate list filesets response, req code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // Initialised before the first `goto _exit`: the cleanup code uses pMnode,
  // and a jump over its initialisation would leave it indeterminate.
  SMnode                   *pMnode = pMsg->info.node;
  SSsMigrateObj            *pSsMigrate = NULL;
  SListSsMigrateFileSetsRsp rsp = {0};
  code = tDeserializeSListSsMigrateFileSetsRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // A late reply can arrive after vgIdx has moved past the last vgroup, in
  // which case there is no current vgroup to compare against.
  int32_t *pVgId = (int32_t *)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  int32_t vgId = *pVgId;

  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_WAITING_FSET_LIST) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // Install the received file sets in the object. Swapping (rather than
  // assigning) hands the old list to `rsp`, so the single free at _exit
  // disposes of it.
  SArray* tmp = pSsMigrate->fileSets;
  pSsMigrate->fileSets = rsp.pFileSets;
  rsp.pFileSets = tmp;

  pSsMigrate->fsetIdx = 0;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  if (taosArrayGetSize(pSsMigrate->fileSets) == 0) {
    mInfo("ssmigrate:%d, vgId:%d, no filesets to migrate.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->vgIdx++;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, filesets received.", pSsMigrate->id, rsp.vgId);
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->currFset.vgId = vgId;
    pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, 0);
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  }

  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  tFreeSListSsMigrateFileSetsRsp(&rsp);
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
671

672

673
/*
 * Ask the vnode of the current vgroup (vgroups[vgIdx]) to migrate the
 * current file set (currFset.fid).
 *
 * On a successful send the object moves to FSET_STARTING and records the
 * request's start time; if sending fails the vgroup is skipped (vgIdx++,
 * state back to INIT). Either way the state update time is refreshed and the
 * object is persisted.
 *
 * Returns silently, without changing state, when the vgroup index is out of
 * range, the vgroup no longer exists, or the request cannot be built.
 */
static void mndSendSsMigrateFileSetReq(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  int32_t *pVgId = (int32_t *)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL) {
    mError("ssmigrate:%d, no vgroup at index %d", pSsMigrate->id, pSsMigrate->vgIdx);
    return;
  }
  int32_t vgId = *pVgId;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) return;
    if (pVgroup->vgId == vgId) {
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    // Every row handed out by sdbFetch() carries a reference; give back the
    // ones that are not the vgroup being looked for.
    sdbRelease(pSdb, pVgroup);
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  sdbRelease(pSdb, pVgroup);

  SSsMigrateFileSetReq req = { 0 };
  req.ssMigrateId = pSsMigrate->id;
  req.nodeId = 0;
  req.fid = pSsMigrate->currFset.fid;
  req.startTimeSec = taosGetTimestampSec();

  int32_t   reqLen = tSerializeSSsMigrateFileSetReq(NULL, 0, &req);
  if (reqLen < 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to size migrate fileset request since 0x%x", req.ssMigrateId, vgId,
           req.fid, reqLen);
    return;
  }
  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(vgId);
  int32_t ret = 0;
  if ((ret = tSerializeSSsMigrateFileSetReq((char *)pHead + sizeof(SMsgHead), reqLen, &req)) < 0) {
    rpcFreeCont(pHead);  // the message was never handed to the RPC layer
    return;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_VND_SSMIGRATE_FILESET, .pCont = pHead, .contLen = contLen};
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send migrate fileset request to vnode since 0x%x", req.ssMigrateId, vgId, req.fid, code);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
    pSsMigrate->vgIdx++;
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, migrate fileset request was sent to vnode", req.ssMigrateId, vgId, req.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTING;
    pSsMigrate->currFset.startTime = req.startTimeSec;
  }

  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
726

727

UNCOV
728
/*
 * Handle a vnode's reply to the migrate-fileset request.
 *
 * The reply is accepted only if it matches the current vgroup and file set,
 * the object is in FSET_STARTING, and the reported node id is positive;
 * anything else is rejected with TSDB_CODE_INVALID_MSG. On acceptance the
 * reported node id is recorded, the state moves to FSET_STARTED, and the
 * object is persisted.
 *
 * Returns 0 on success or an error code.
 */
static int32_t mndProcessSsMigrateFileSetRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong ssmigrate fileset response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // Initialised before the first `goto _exit`: the cleanup code uses pMnode,
  // and a jump over its initialisation would leave it indeterminate.
  SMnode              *pMnode = pMsg->info.node;
  SSsMigrateObj       *pSsMigrate = NULL;
  SSsMigrateFileSetRsp rsp = {0};
  code = tDeserializeSSsMigrateFileSetRsp(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // A late reply can arrive after vgIdx has moved past the last vgroup, in
  // which case there is no current vgroup to compare against.
  int32_t *pVgId = (int32_t *)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);
  if (pVgId == NULL || *pVgId != rsp.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  int32_t vgId = *pVgId;

  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTING) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId <= 0) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  mInfo("ssmigrate:%d, vgId:%d, fid:%d, leader node is %d", rsp.ssMigrateId, vgId, rsp.fid, rsp.nodeId);
  pSsMigrate->currFset.nodeId = rsp.nodeId;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_STARTED;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
774

775

776

UNCOV
777
/* Handler for TDMT_VND_FOLLOWER_SSMIGRATE_RSP: the reply carries nothing to act on, so it is accepted as-is. */
static int32_t mndProcessFollowerSsMigrateRsp(SRpcMsg *pReq) {
  (void)pReq;

  TAOS_RETURN(0);
}
780

781

UNCOV
782
/*
 * Send the current file set's progress record (ssmigrate id, node id,
 * vgroup id, file id, state) to the vgroup named by currFset.vgId as a
 * TDMT_VND_FOLLOWER_SSMIGRATE request.
 *
 * Fire and forget: the outcome is only logged and the object's state is not
 * modified. Returns silently when the vgroup no longer exists or the request
 * cannot be built.
 */
static void mndSendFollowerSsMigrateReq(SMnode* pMnode, SSsMigrateObj *pSsMigrate) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVgroup = NULL;
  void   *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
    if (pIter == NULL) return;

    if (pVgroup->vgId == pSsMigrate->currFset.vgId) {
      sdbCancelFetch(pSdb, pIter);
      break;
    }
    // Every row handed out by sdbFetch() carries a reference; give back the
    // ones that are not the vgroup being looked for.
    sdbRelease(pSdb, pVgroup);
  }

  SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
  sdbRelease(pSdb, pVgroup);

  SSsMigrateProgress req = {
    .ssMigrateId = pSsMigrate->id,
    .nodeId = pSsMigrate->currFset.nodeId,
    .vgId = pSsMigrate->currFset.vgId,
    .fid = pSsMigrate->currFset.fid,
    .state = pSsMigrate->currFset.state,
  };

  int32_t   reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
  if (reqLen < 0) {
    mError("vgId:%d, ssmigrate:%d, fid:%d, failed to size follower-ssmigrate request since 0x%x", req.vgId,
           req.ssMigrateId, req.fid, reqLen);
    return;
  }
  int32_t   contLen = reqLen + sizeof(SMsgHead);
  SMsgHead *pHead = rpcMallocCont(contLen);
  if (pHead == NULL) {
    return;
  }

  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(req.vgId);
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_FOLLOWER_SSMIGRATE, .pCont = pHead, .contLen = contLen};

  // The format strings name vgId first and the ssmigrate id second, so the
  // arguments are passed in that order.
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
  if (code != 0) {
    mError("vgId:%d, ssmigrate:%d, fid:%d, failed to send follower-ssmigrate request since 0x%x", req.vgId, req.ssMigrateId, req.fid, code);
  } else {
    mTrace("vgId:%d, ssmigrate:%d, fid:%d, follower-ssmigrate request sent", req.vgId, req.ssMigrateId, req.fid);
  }
}
827

828

829

UNCOV
830
// Handles a query-ssmigrate-progress response.
//
// The response is deserialized into an SSsMigrateProgress and matched against the ssmigrate
// object with the same id. It is accepted only while that object is in
// SSMIGRATE_VGSTATE_FSET_STARTED and the node id, vgroup id and file set id all equal the
// ones stored in currFset. On acceptance the reported state and the current time are stored,
// the object is passed to mndUpdateSsMigrate, and the state is forwarded through
// mndSendFollowerSsMigrateReq.
//
// Returns pMsg->code when the response itself carries an error, the deserialization error,
// TSDB_CODE_MND_RETURN_VALUE_NULL when the ssmigrate object cannot be acquired,
// TSDB_CODE_INVALID_MSG when the response does not match the current file set, 0 otherwise.
static int32_t mndProcessQuerySsMigrateProgressRsp(SRpcMsg *pMsg) {
  int32_t code = 0, lino = 0;

  if (pMsg->code != 0) {
    mError("received wrong query ssmigrate progress response, error code is %s", tstrerror(pMsg->code));
    TAOS_RETURN(pMsg->code);
  }

  // pMnode must be initialized before the first TAOS_CHECK_GOTO: every jump to _exit ends in
  // mndReleaseSsMigrate(pMnode, ...), which would otherwise read an indeterminate pointer.
  SMnode            *pMnode = pMsg->info.node;
  SSsMigrateObj     *pSsMigrate = NULL;
  SSsMigrateProgress rsp = {0};

  code = tDeserializeSSsMigrateProgress(pMsg->pCont, pMsg->contLen, &rsp);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  pSsMigrate = mndAcquireSsMigrate(pMnode, rsp.ssMigrateId);
  if (pSsMigrate == NULL) {
    TAOS_CHECK_GOTO(TSDB_CODE_MND_RETURN_VALUE_NULL, &lino, _exit);
  }

  // The checks are kept separate so that lino identifies which one rejected the response.
  if (pSsMigrate->vgState != SSMIGRATE_VGSTATE_FSET_STARTED) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.nodeId != pSsMigrate->currFset.nodeId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.vgId != pSsMigrate->currFset.vgId) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }
  if (rsp.fid != pSsMigrate->currFset.fid) {
    TAOS_CHECK_GOTO(TSDB_CODE_INVALID_MSG, &lino, _exit);
  }

  // An unchanged state is only traced; a state change is logged at info level.
  if (rsp.state == pSsMigrate->currFset.state) {
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  } else {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, state is %d", rsp.ssMigrateId, rsp.vgId, rsp.fid, rsp.state);
  }
  pSsMigrate->currFset.state = rsp.state;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);

  mndSendFollowerSsMigrateReq(pMnode, pSsMigrate);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    mError("%s:%d, error=%s", __func__, lino, tstrerror(code));
  }
  // Reached on every path, including the ones where pSsMigrate is still NULL.
  mndReleaseSsMigrate(pMnode, pSsMigrate);
  return code;
}
880

881

882

883
// when query migration progress, we need to send the msg to dnode instead of vgroup,
884
// because migration may take a long time, and leader may change during the migration process,
885
// while only the initial leader vnode can handle the migration progress query.
UNCOV
886
void mndSendQuerySsMigrateProgressReq(SMnode *pMnode, SSsMigrateObj *pSsMigrate) {
×
UNCOV
887
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
888
  SDnodeObj *pDnode = NULL;
×
UNCOV
889
  void *pIter = NULL;
×
890

891
  while (1) {
UNCOV
892
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
×
UNCOV
893
    if (pIter == NULL) return;
×
UNCOV
894
    if (pDnode->id == pSsMigrate->currFset.nodeId) {
×
UNCOV
895
      sdbCancelFetch(pSdb, pIter);
×
UNCOV
896
      break;
×
897
    }
898
  }
899

UNCOV
900
  SEpSet epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
901
  sdbRelease(pSdb, pDnode);
×
902

UNCOV
903
  SSsMigrateProgress req = {
×
UNCOV
904
    .ssMigrateId = pSsMigrate->id,
×
UNCOV
905
    .nodeId = pSsMigrate->currFset.nodeId,
×
UNCOV
906
    .vgId = pSsMigrate->currFset.vgId,
×
UNCOV
907
    .fid = pSsMigrate->currFset.fid,
×
908
    .state = SSMIGRATE_FILESET_STATE_IN_PROGRESS,
909
  };
910

UNCOV
911
  int32_t          reqLen = tSerializeSSsMigrateProgress(NULL, 0, &req);
×
UNCOV
912
  int32_t          contLen = reqLen + sizeof(SMsgHead);
×
UNCOV
913
  SMsgHead *pHead = rpcMallocCont(contLen);
×
UNCOV
914
  if (pHead == NULL) {
×
UNCOV
915
    return;
×
916
  }
917

UNCOV
918
  pHead->contLen = htonl(contLen);
×
UNCOV
919
  pHead->vgId = htonl(req.vgId);
×
UNCOV
920
  TAOS_UNUSED(tSerializeSSsMigrateProgress((char *)pHead + sizeof(SMsgHead), reqLen, &req));
×
UNCOV
921
  SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SSMIGRATE_PROGRESS, .pCont = pHead, .contLen = contLen};
×
922

UNCOV
923
  int32_t code = tmsgSendReq(&epSet, &rpcMsg);
×
UNCOV
924
  if (code != 0) {
×
UNCOV
925
    mError("ssmigrate:%d, vgId:%d, fid:%d, failed to send ssmigrate-query-progress request since 0x%x", req.ssMigrateId, req.vgId, req.fid, code)
×
926
  } else {
UNCOV
927
    mTrace("ssmigrate:%d, vgId:%d, fid:%d, ssmigrate-query-progress request sent", req.ssMigrateId, req.vgId, req.fid);
×
928
  }
929
}
930

931

932

UNCOV
933
// Moves pSsMigrate on to its next vgroup: increments vgIdx, puts vgState back to
// SSMIGRATE_VGSTATE_INIT, refreshes stateUpdateTime and passes the object to
// mndUpdateSsMigrate.
static void mndSsMigrateMoveToNextVgroup(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  pSsMigrate->vgIdx++;
  pSsMigrate->vgState = SSMIGRATE_VGSTATE_INIT;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}

// Advances one ssmigrate object by a single step; invoked once per object by the
// update-progress timer.
//
// The step taken depends first on the state of the current vgroup (vgState) and, once a file
// set has been started, on the state of the current file set (currFset.state). Each call
// performs at most one action (send a request, skip ahead after a timeout, or move to the
// next file set / vgroup) and then returns.
static void mndUpdateSsMigrateProgress(SMnode* pMnode, SSsMigrateObj* pSsMigrate) {
  int32_t numVg = taosArrayGetSize(pSsMigrate->vgroups);
  int32_t numFset = taosArrayGetSize(pSsMigrate->fileSets);

  // nothing left to do once every vgroup has been visited
  if (pSsMigrate->vgIdx >= numVg) {
    mInfo("ssmigrate:%d, all vgroups has been processed", pSsMigrate->id);
    TAOS_UNUSED(mndDropSsMigrate(pMnode, pSsMigrate));
    return;
  }

  // a vgroup in the init state first needs its list of file sets
  if (pSsMigrate->vgState == SSMIGRATE_VGSTATE_INIT) {
    mndSendSsMigrateListFileSetsReq(pMnode, pSsMigrate);
    return;
  }

  int32_t vgId = *(int32_t*)taosArrayGet(pSsMigrate->vgroups, pSsMigrate->vgIdx);

  switch (pSsMigrate->vgState) {
    case SSMIGRATE_VGSTATE_WAITING_FSET_LIST:
      // keep waiting for the file set list, but give up on the vgroup after 30 seconds
      if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
        mWarn("ssmigrate:%d, vgId:%d, haven't receive file set list in 30 seconds, skip", pSsMigrate->id, vgId);
        mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
      }
      return;

    case SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED:
      mndSendSsMigrateFileSetReq(pMnode, pSsMigrate);
      return;

    case SSMIGRATE_VGSTATE_FSET_STARTING:
      // on timeout the whole vgroup is skipped rather than just the file set, because a file
      // set that times out usually means the vgroup is not available
      if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime > 30) {
        mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive response in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
        mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
      }
      return;

    default:
      break;
  }

  // compaction takes a while: only rewind the migration state here and let the next tick
  // send the first migration request again
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_COMPACT) {
    mInfo("ssmigrate:%d, vgId:%d, fid:%d, compacting, will retry later", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
    pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
    pSsMigrate->currFset.nodeId = 0;
    pSsMigrate->stateUpdateTime = taosGetTimestampSec();
    pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
    mndUpdateSsMigrate(pMnode, pSsMigrate);
    return;
  }

  // still in progress: poll for the state until 30 seconds pass without an update, then
  // skip the vgroup
  if (pSsMigrate->currFset.state == SSMIGRATE_FILESET_STATE_IN_PROGRESS) {
    if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime <= 30) {
      mndSendQuerySsMigrateProgressReq(pMnode, pSsMigrate);
      return;
    }
    mWarn("ssmigrate:%d, vgId:%d, fid:%d, haven't receive state in 30 seconds, skip", pSsMigrate->id, vgId, pSsMigrate->currFset.fid);
    mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
    return;
  }

  // the leader is done with the file set; hold on for at least 30 seconds so the followers
  // have time to start on it, which keeps the tsdb code simpler
  if (taosGetTimestampSec() - pSsMigrate->stateUpdateTime < 30) {
    return;
  }

  // file set finished: continue with the next one, or with the next vgroup when this was
  // the last file set
  pSsMigrate->fsetIdx++;
  if (pSsMigrate->fsetIdx >= numFset) {
    mndSsMigrateMoveToNextVgroup(pMnode, pSsMigrate);
    return;
  }

  pSsMigrate->vgState = SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED;
  pSsMigrate->currFset.nodeId = 0;
  pSsMigrate->currFset.vgId = vgId;
  pSsMigrate->currFset.fid = *(int32_t*)taosArrayGet(pSsMigrate->fileSets, pSsMigrate->fsetIdx);
  pSsMigrate->currFset.state = SSMIGRATE_FILESET_STATE_IN_PROGRESS;
  pSsMigrate->stateUpdateTime = taosGetTimestampSec();
  mndUpdateSsMigrate(pMnode, pSsMigrate);
}
1028

1029

UNCOV
1030
// Timer handler: walks every SDB_SSMIGRATE row and lets mndUpdateSsMigrateProgress advance
// it by one step. Each fetched row is released before the next one is fetched.
// Always returns 0.
static int32_t mndProcessUpdateSsMigrateProgressTimer(SRpcMsg *pReq) {
  mTrace("start to process update ssmigrate progress timer");

  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pCursor = NULL;

  do {
    SSsMigrateObj *pObj = NULL;
    pCursor = sdbFetch(pSdb, SDB_SSMIGRATE, pCursor, (void **)&pObj);
    if (pCursor != NULL) {
      mndUpdateSsMigrateProgress(pMnode, pObj);
      sdbRelease(pSdb, pObj);
    }
  } while (pCursor != NULL);

  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc