• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4923

09 Jan 2026 08:13AM UTC coverage: 65.373% (+0.2%) from 65.161%
#4923

push

travis-ci

web-flow
merge: from main to 3.0 branch #34232

33 of 56 new or added lines in 8 files covered. (58.93%)

3042 existing lines in 131 files now uncovered.

198273 of 303297 relevant lines covered (65.37%)

118980690.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.9
/source/dnode/mnode/impl/src/mndScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndScan.h"
16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndScan.h"
21
#include "mndScanDetail.h"
22
#include "mndShow.h"
23
#include "mndTrans.h"
24
#include "mndUser.h"
25
#include "mndVgroup.h"
26
#include "tmisce.h"
27
#include "tmsgcb.h"
28

29
#define MND_SCAN_VER_NUMBER 1
30
#define MND_SCAN_ID_LEN     11
31

32
static int32_t  mndProcessScanTimer(SRpcMsg *pReq);
33
static SSdbRaw *mndScanActionEncode(SScanObj *pScan);
34
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndScanActionInsert(SSdb *pSdb, SScanObj *pScan);
36
static int32_t  mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan);
37
static int32_t  mndScanActionDelete(SSdb *pSdb, SScanObj *pScan);
38
static int32_t  mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
39
static int32_t  mndProcessKillScanReq(SRpcMsg *pReq);
40
static int32_t  mndProcessQueryScanRsp(SRpcMsg *pReq);
41

42
/**
 * Wires the scan module into the mnode: installs the show-retrieve handler for
 * TSDB_MGMT_TABLE_SCAN, the message handlers for the scan related message
 * types, and registers the SDB_SCAN table with its row callbacks.
 *
 * @param pMnode mnode instance to register with
 * @return whatever sdbSetTable() returns for the SDB_SCAN table
 */
int32_t mndInitScan(SMnode *pMnode) {
  // Rows are keyed by the 32-bit scan id.
  SSdbTable scanTable = {
      .sdbType = SDB_SCAN,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndScanActionEncode,
      .decodeFp = (SdbDecodeFp)mndScanActionDecode,
      .insertFp = (SdbInsertFp)mndScanActionInsert,
      .updateFp = (SdbUpdateFp)mndScanActionUpdate,
      .deleteFp = (SdbDeleteFp)mndScanActionDelete,
  };

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SCAN, mndRetrieveScan);

  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SCAN, mndProcessKillScanReq);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SCAN_PROGRESS_RSP, mndProcessQueryScanRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_SCAN_TIMER, mndProcessScanTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SCAN_RSP, mndTransProcessRsp);

  return sdbSetTable(pMnode->pSdb, scanTable);
}
61

62
/** Module teardown hook; currently it only emits a debug log line. */
void mndCleanupScan(SMnode *pMnode) {
  mDebug("mnd scan cleanup");
}
398,338✔
63

64
/** Release hook for a scan object; a no-op in the current implementation. */
void tFreeScanObj(SScanObj *pScan) {
  // intentionally empty
}
480✔
65

66
static int32_t tSerializeSScanObj(void *buf, int32_t bufLen, const SScanObj *pObj) {
1,152✔
67
  SEncoder encoder = {0};
1,152✔
68
  int32_t  code = 0;
1,152✔
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
1,152✔
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,152✔
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->scanId));
2,304✔
75
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
2,304✔
76
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
2,304✔
77
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
2,304✔
78

79
  tEndEncode(&encoder);
1,152✔
80

81
_exit:
1,152✔
82
  if (code) {
1,152✔
83
    tlen = code;
×
84
  } else {
85
    tlen = encoder.pos;
1,152✔
86
  }
87
  tEncoderClear(&encoder);
1,152✔
88
  return tlen;
1,152✔
89
}
90

91
/**
 * Decodes a scan object from buf in the order written by tSerializeSScanObj():
 * scanId, dbname, startTime and, when the buffer still has data, dbUid.
 *
 * @param buf    encoded bytes
 * @param bufLen number of bytes in buf
 * @param pObj   destination object
 * @return 0 on success, otherwise the code of the failing decode step
 */
int32_t tDeserializeSScanObj(void *buf, int32_t bufLen, SScanObj *pObj) {
  SDecoder dec = {0};
  int32_t  code = 0;
  int32_t  lino;

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->scanId));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->startTime));

  // dbUid is optional: a buffer that ends after startTime yields dbUid = 0.
  if (tDecodeIsEnd(&dec)) {
    pObj->dbUid = 0;
  } else {
    TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->dbUid));
  }

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
113

114
/**
 * Encodes a scan object into a newly allocated sdb raw record.
 *
 * Raw layout: an int32 payload length followed by the payload produced by
 * tSerializeSScanObj().
 *
 * @param pScan object to encode
 * @return the raw record on success (caller owns it); NULL on failure with
 *         terrno set to the reason
 */
static SSdbRaw *mndScanActionEncode(SScanObj *pScan) {
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_SUCCESS;

  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;

  // First pass with a NULL buffer only computes the payload size.
  int32_t tlen = tSerializeSScanObj(NULL, 0, pScan);
  if (tlen < 0) {
    // tSerializeSScanObj() returns the failing step's error code; report it
    // instead of mislabeling the failure as out-of-memory.
    terrno = tlen;
    goto OVER;
  }

  int32_t size = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_SCAN, MND_SCAN_VER_NUMBER, size);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  tlen = tSerializeSScanObj(buf, tlen, pScan);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("scan:%" PRId32 ", failed to encode to raw:%p since %s", pScan->scanId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("scan:%" PRId32 ", encode to raw:%p, row:%p", pScan->scanId, pRaw, pScan);
  return pRaw;
}
163

164
/**
 * Decodes an sdb raw record into a newly allocated row holding an SScanObj.
 *
 * @param pRaw raw record written by mndScanActionEncode()
 * @return the row on success; NULL on failure with terrno set
 */
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSdbRow  *pRow = NULL;
  SScanObj *pScan = NULL;
  void     *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  code = sdbGetRawSoftVer(pRaw, &sver);
  if (code != 0) {
    // Make sure the failure is visible at OVER even if the callee did not set
    // terrno itself; otherwise the success path would run with a NULL row.
    if (terrno == TSDB_CODE_SUCCESS) terrno = code;
    goto OVER;
  }

  if (sver != MND_SCAN_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("scan read invalid ver, data ver: %d, curr ver: %d", sver, MND_SCAN_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SScanObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pScan = sdbGetRowObj(pRow);
  if (pScan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen = 0;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSScanObj(buf, tlen, pScan)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // pScan is still NULL when the failure happened before the row was
    // allocated, so it must not be dereferenced here; -1 marks "id unknown".
    int32_t scanId = (pScan != NULL) ? pScan->scanId : -1;
    mError("scan:%" PRId32 ", failed to decode from raw:%p since %s", scanId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("scan:%" PRId32 ", decode from raw:%p, row:%p", pScan->scanId, pRaw, pScan);
  return pRow;
}
220

221
/** sdb insert callback for SDB_SCAN rows: traces the event and reports success. */
static int32_t mndScanActionInsert(SSdb *pSdb, SScanObj *pScan) {
  const int32_t ok = 0;
  mTrace("scan:%" PRId32 ", perform insert action", pScan->scanId);
  return ok;
}
225

226
/** sdb delete callback for SDB_SCAN rows: traces the event, then calls tFreeScanObj(). */
static int32_t mndScanActionDelete(SSdb *pSdb, SScanObj *pScan) {
  const int32_t ok = 0;
  mTrace("scan:%" PRId32 ", perform delete action", pScan->scanId);
  tFreeScanObj(pScan);
  return ok;
}
231

232
/**
 * sdb update callback for SDB_SCAN rows. Nothing is copied from the new row
 * into the old one; the call is only traced.
 */
static int32_t mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan) {
  const int32_t ok = 0;
  mTrace("scan:%" PRId32 ", perform update action, old row:%p new row:%p", pOldScan->scanId, pOldScan, pNewScan);
  return ok;
}
237

238
/**
 * Looks up a scan object by id and acquires a reference to it.
 *
 * A missing object is not treated as an error: terrno is reset to success when
 * the lookup fails with TSDB_CODE_SDB_OBJ_NOT_THERE.
 *
 * @param pMnode mnode instance
 * @param scanId id of the scan; only the low 32 bits are meaningful because
 *               the SDB_SCAN table is registered with SDB_KEY_INT32
 * @return the acquired object (release with mndReleaseScan()), or NULL
 */
static SScanObj *mndAcquireScan(SMnode *pMnode, int64_t scanId) {
  SSdb *pSdb = pMnode->pSdb;

  // The table key is a 32-bit integer, so hand sdbAcquire() a real int32_t.
  // Passing the address of the 64-bit parameter only reads the right bytes on
  // little-endian hosts.
  int32_t key = (int32_t)scanId;

  SScanObj *pScan = sdbAcquire(pSdb, SDB_SCAN, &key);
  if (pScan == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pScan;
}
246

247
/** Releases a scan object previously obtained from mndAcquireScan(). */
static void mndReleaseScan(SMnode *pMnode, SScanObj *pScan) {
  // The caller's pointer is untouched; only the sdb reference is given back.
  sdbRelease(pMnode->pSdb, pScan);
}
1,056✔
252

253
/**
 * Copies the database name and uid recorded on a scan object.
 *
 * @param pMnode mnode instance
 * @param scanId id of the scan to look up
 * @param dbname output buffer for the database name
 * @param len    capacity of dbname, passed to tstrncpy()
 * @param dbUid  optional output for the database uid (may be NULL)
 * @return 0 on success; when the scan cannot be acquired, terrno if it is set,
 *         otherwise TSDB_CODE_MND_RETURN_VALUE_NULL
 */
static int32_t mndScanGetDbInfo(SMnode *pMnode, int32_t scanId, char *dbname, int32_t len, int64_t *dbUid) {
  SScanObj *pScan = mndAcquireScan(pMnode, scanId);
  if (pScan == NULL) {
    int32_t errCode = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) errCode = terrno;
    TAOS_RETURN(errCode);
  }

  tstrncpy(dbname, pScan->dbname, len);
  if (dbUid != NULL) {
    *dbUid = pScan->dbUid;
  }

  mndReleaseScan(pMnode, pScan);

  int32_t okCode = 0;
  TAOS_RETURN(okCode);
}
267

268
// scan db
269
/**
 * Fills in a new scan object for pDb (fresh id, db name, db uid, start time),
 * encodes it and appends it to the transaction's prepare log in READY state.
 *
 * @param pMnode mnode instance
 * @param pTrans transaction receiving the prepare log
 * @param pScan  scan object to initialise; its fields are overwritten
 * @param pDb    database being scanned
 * @param rsp    response whose scanId is set on success
 * @return 0 on success, otherwise an error code
 */
int32_t mndAddScanToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SDbObj *pDb, SScanDbRsp *rsp) {
  int32_t code = 0;

  pScan->scanId = tGenIdPI32();
  tstrncpy(pScan->dbname, pDb->name, sizeof(pScan->dbname));
  pScan->dbUid = pDb->uid;
  pScan->startTime = taosGetTimestampMs();

  SSdbRaw *pVgRaw = mndScanActionEncode(pScan);
  if (pVgRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pVgRaw);
  if (code != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
  if (code != 0) {
    // NOTE(review): the raw was already appended to pTrans at this point;
    // confirm that freeing it here cannot race with the transaction's own
    // cleanup of its prepare log.
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  rsp->scanId = pScan->scanId;
  return 0;
}
298

299
// retrieve scan
300
/**
 * Show-retrieve handler for TSDB_MGMT_TABLE_SCAN. Emits one row per scan
 * object with three columns: scan id, database name, start time.
 *
 * @param pReq   originating request (supplies the mnode and the caller identity)
 * @param pShow  show context; pShow->db optionally narrows the listing and
 *               pShow->pIter carries the sdb iterator between calls
 * @param pBlock output data block
 * @param rows   maximum number of rows to emit in this call
 * @return the number of rows written, or an error code on failure
 */
static int32_t mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   code = 0;
  int32_t   lino = 0;
  int32_t   numOfRows = 0;
  SScanObj *pScan = NULL;
  SDbObj   *pDb = NULL;
  char     *sep = NULL;
  // The locals below are not used directly here; they are kept because the
  // privilege-check macros may reference them by name.
  SUserObj *pUser = NULL;
  SDbObj   *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  if (pShow->db[0] != '\0') {
    sep = strchr(pShow->db, '.');
    bool isSchemaDb = sep && ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) ||
                               (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))));
    if (isSchemaDb) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), PRIV_SHOW_SCANS, PRIV_OBJ_DB, 0, _OVER);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SCAN, pShow->pIter, (void **)&pScan);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pScan->dbname, pScan, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_SCANS, _OVER);

    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // column: scan id
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->scanId, false), pScan, &lino, _OVER);

    // column: database name (system db names are copied verbatim when no db filter is active)
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb == NULL && IS_SYS_DBNAME(pScan->dbname)) {
      tstrncpy(varDataVal(tmpBuf), pScan->dbname, TSDB_SHOW_SQL_LEN);
    } else {
      SName name = {0};
      // NOTE(review): unlike RETRIEVE_CHECK_GOTO this macro is not given pScan;
      // confirm the fetched row is released when tNameFromString() fails.
      TAOS_CHECK_GOTO(tNameFromString(&name, pScan->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pScan, &lino, _OVER);

    // column: start time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->startTime, false), pScan, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pScan);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
372

373
// kill scan
374
/**
 * Builds the vnode "kill scan" message: an SMsgHead followed by the serialized
 * SVKillScanReq.
 *
 * @param pMnode   mnode instance (not used by the body)
 * @param pVgroup  target vgroup; supplies vgId for the request and the header
 * @param pContLen output: total message length including the header
 * @param scanId   id of the scan to kill
 * @param dnodeid  dnode id recorded in the request
 * @return a buffer from taosMemoryMalloc() that the caller owns, or NULL with
 *         terrno set
 */
static void *mndBuildKillScanReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t scanId, int32_t dnodeid) {
  SVKillScanReq req = {0};
  req.scanId = scanId;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeid;
  terrno = 0;

  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);

  // Sizing pass: a NULL buffer only computes the encoded body length.
  int32_t bodyLen = tSerializeSVKillScanReq(NULL, 0, &req);
  if (bodyLen < 0) {
    // Report the serializer's own code, as done for the second pass below.
    terrno = bodyLen;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  mTrace("vgId:%d, build scan vnode config req, contLen:%d", pVgroup->vgId, contLen);

  // The body starts after the header, so only bodyLen bytes are available
  // there. Passing contLen would overstate the capacity by sizeof(SMsgHead).
  int32_t ret = tSerializeSVKillScanReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req);
  if (ret < 0) {
    taosMemoryFreeClear(pReq);
    terrno = ret;
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
409

410
/**
 * Appends one TDMT_VND_KILL_SCAN redo action, addressed to the given dnode,
 * to the transaction.
 *
 * @param pMnode  mnode instance
 * @param pTrans  transaction receiving the redo action
 * @param pVgroup vgroup the scan runs in
 * @param scanId  id of the scan to kill
 * @param dnodeid dnode that should receive the message
 * @return 0 on success, otherwise an error code
 */
static int32_t mndAddKillScanAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t scanId, int32_t dnodeid) {
  int32_t      code = 0;
  int32_t      contLen = 0;
  STransAction action = {0};

  // Resolve the endpoint set of the target dnode.
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  void *pReq = mndBuildKillScanReq(pMnode, pVgroup, &contLen, scanId, dnodeid);
  if (pReq == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }

  action.pCont = pReq;
  action.contLen = contLen;
  action.msgType = TDMT_VND_KILL_SCAN;

  mTrace("trans:%d, kill scan msg len:%d", pTrans->id, contLen);

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The message buffer is freed here when the append fails.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  return 0;
}
444

445
/**
 * Creates and prepares a "kill-scan" transaction: the scan row is appended to
 * the commit log in READY state, and one kill redo action is added for every
 * scan-detail row that belongs to this scan.
 *
 * @param pMnode mnode instance
 * @param pReq   originating request, passed to mndTransCreate()
 * @param pScan  scan to kill
 * @return 0 on success, otherwise an error code
 */
static int32_t mndKillScan(SMnode *pMnode, SRpcMsg *pReq, SScanObj *pScan) {
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-scan");
  if (pTrans == NULL) {
    mError("scan:%" PRId32 ", failed to drop since %s", pScan->scanId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill scan:%" PRId32, pTrans->id, pScan->scanId);

  mndTransSetDbName(pTrans, pScan->dbname, NULL);

  SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    // The raw was not appended, so it is freed here - the same handling
    // mndAddScanToTran() applies when its append fails.
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == pScan->scanId) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        // Capture the reason before any cleanup call can overwrite terrno.
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("trans:%d, failed to append redo action since %s", pTrans->id, tstrerror(code));
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }

      if ((code = mndAddKillScanAction(pMnode, pTrans, pVgroup, pScan->scanId, pDetail->dnodeId)) != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        // The vgroup reference must be returned on this path too.
        mndReleaseVgroup(pMnode, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }

      mndReleaseVgroup(pMnode, pVgroup);
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
516

517
/**
 * Handler for TDMT_MND_KILL_SCAN: decodes the request, looks up the scan,
 * checks the caller's MND_OPER_SCAN_DB privilege and starts the kill
 * transaction.
 *
 * @param pReq incoming rpc message
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
 *         TSDB_CODE_MND_INVALID_SCAN_ID when the scan does not exist, or
 *         another error code
 */
static int32_t mndProcessKillScanReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SKillScanReq killScanReq = {0};
  int32_t      code = 0;
  int32_t      lino = 0;

  code = tDeserializeSKillScanReq(pReq->pCont, pReq->contLen, &killScanReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill scan:%" PRId32, killScanReq.scanId);

  SScanObj *pScan = mndAcquireScan(pMnode, killScanReq.scanId);
  if (pScan == NULL) {
    code = TSDB_CODE_MND_INVALID_SCAN_ID;
    tFreeSKillScanReq(&killScanReq);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SCAN_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillScan(pMnode, pReq, pScan), &lino, _OVER);

  // The transaction is running; the final result is delivered later.
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  // Audit recording is compiled out.
#if 0
  char    obj[TSDB_INT32_ID_LEN] = {0};
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pScan->scanId);
  if ((uint32_t)nBytes < sizeof(obj)) {
    auditRecord(pReq, pMnode->clusterId, "killScan", pScan->dbname, obj, killScanReq.sql, killScanReq.sqlLen);
  } else {
    mError("scan:%" PRId32 " failed to audit since %s", pScan->scanId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }
#endif
_OVER:
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("failed to kill scan %" PRId32 " since %s", killScanReq.scanId, terrstr());
  }

  tFreeSKillScanReq(&killScanReq);
  mndReleaseScan(pMnode, pScan);

  TAOS_RETURN(code);
}
561

562
// update progress
563
/**
 * Stores a vnode's progress report on the matching in-memory scan-detail row
 * (matched on scanId, vgId and dnodeId). Only the "new*" fields, progress and
 * remainingTime are written here.
 *
 * @param pMnode mnode instance
 * @param pReq   originating message (not used by the body)
 * @param scanId scan the report belongs to
 * @param rsp    decoded progress report
 * @return 0 when a matching detail row was updated,
 *         TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST otherwise
 */
static int32_t mndUpdateScanProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t scanId, SQueryScanProgressRsp *rsp) {
  int32_t code = 0;
  void   *pIter = NULL;

  for (;;) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) {
      break;
    }

    bool isTarget = pDetail->scanId == scanId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId;
    if (!isTarget) {
      sdbRelease(pMnode->pSdb, pDetail);
      continue;
    }

    pDetail->newNumberFileset = rsp->numberFileset;
    pDetail->newFinished = rsp->finished;
    pDetail->progress = rsp->progress;
    pDetail->remainingTime = rsp->remainingTime;

    // Leaving the iteration early: close the iterator and drop the row reference.
    sdbCancelFetch(pMnode->pSdb, pIter);
    sdbRelease(pMnode->pSdb, pDetail);

    TAOS_RETURN(code);
  }

  return TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST;
}
589

590
/**
 * Handler for TDMT_VND_QUERY_SCAN_PROGRESS_RSP: decodes the progress report
 * and records it via mndUpdateScanProgress().
 *
 * @param pReq response message from a vnode
 * @return 0 on success; the response's own code, the decode error, or the
 *         update error otherwise
 */
static int32_t mndProcessQueryScanRsp(SRpcMsg *pReq) {
  SQueryScanProgressRsp req = {0};
  int32_t               code = 0;

  if (pReq->code != 0) {
    mError("received wrong scan response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  code = tDeserializeSQueryScanProgressRsp(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize vnode-query-scan-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("scan:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId, req.vgId,
         req.dnodeId, req.numberFileset, req.finished);

  SMnode *pMnode = pReq->info.node;

  code = mndUpdateScanProgress(pMnode, pReq, req.scanId, &req);
  if (code != 0) {
    mError("scan:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId,
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
  }

  // Success and failure both hand the update result back to the caller.
  TAOS_RETURN(code);
}
618

619
// timer
620
/**
 * Sends a TDMT_VND_QUERY_SCAN_PROGRESS request for every scan-detail row that
 * belongs to pScan. Failures for a single detail are skipped (best effort);
 * a missing dnode ends the whole pass.
 *
 * @param pMnode mnode instance
 * @param pScan  scan whose details should be polled
 */
static void mndScanSendProgressReq(SMnode *pMnode, SScanObj *pScan) {
  void *pIter = NULL;

  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == pScan->scanId) {
      SEpSet epSet = {0};

      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
      if (pDnode == NULL) {
        // Leaving the iteration early: close the iterator and give back the
        // detail row, as the other early exits over SDB_SCAN_DETAIL in this
        // file do.
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
      // The dnode is only needed to fill epSet; release it on both outcomes.
      mndReleaseDnode(pMnode, pDnode);
      if (epCode != 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      // Zero-initialised so no indeterminate field reaches the serializer.
      SQueryScanProgressReq req = {0};
      req.scanId = pDetail->scanId;
      req.vgId = pDetail->vgId;
      req.dnodeId = pDetail->dnodeId;

      int32_t contLen = tSerializeSQueryScanProgressReq(NULL, 0, &req);
      if (contLen < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      contLen += sizeof(SMsgHead);

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);

      if (tSerializeSQueryScanProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
        // NOTE(review): pHead is not freed on this path - confirm whether the
        // rpc content buffer needs an explicit free here.
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SCAN_PROGRESS, .contLen = contLen};

      rpcMsg.pCont = pHead;

      // Human-readable endpoint list for the debug log only.
      char    detail[1024] = {0};
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                              TMSG_INFO(TDMT_VND_QUERY_SCAN_PROGRESS), epSet.numOfEps, epSet.inUse);
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
      }

      mDebug("scan:%d, send update progress msg to %s", pDetail->scanId, detail);

      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }
}
384✔
688

689
/**
 * Persists the progress reported for a scan and, once every detail is done
 * (or the database no longer exists), drops the scan and its detail rows.
 *
 * Steps:
 *   1. Check whether any detail's reported counters differ from the saved ones.
 *   2. If nothing changed and the db still exists, return without a transaction.
 *   3. Copy the reported counters into each detail and append it to the
 *      commit log in READY state.
 *   4. Decide whether all details are finished.
 *   5. If so, append every detail and the scan itself in DROPPED state.
 *   6. Prepare the transaction.
 *
 * @param pMnode mnode instance
 * @param scanId id of the scan to process
 * @return 0 on success, otherwise an error code
 */
static int32_t mndSaveScanProgress(SMnode *pMnode, int32_t scanId) {
  int32_t code = 0;
  bool    needSave = false;
  void   *pIter = NULL;

  // Step 1: detect changed counters.
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mDebug(
          "scan:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->scanId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      // these 2 number will jump back after dnode restart, so < is not used here
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
        needSave = true;
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
  int64_t dbUid = 0;
  TAOS_CHECK_RETURN(mndScanGetDbInfo(pMnode, scanId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));

  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    needSave = true;
    mWarn("scan:%" PRId32 ", no db exist, set needSave:%s", scanId, dbname);
  }

  // Step 2: nothing to persist.
  if (!needSave) {
    mDebug("scan:%" PRId32 ", no need to save", scanId);
    TAOS_RETURN(code);
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-scan-progress");
  if (pTrans == NULL) {
    // pTrans is NULL here, so it must not be dereferenced for the log line;
    // identify the failure by the scan id instead.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("scan:%" PRId32 ", failed to create trans since %s", scanId, tstrerror(code));
    TAOS_RETURN(code);
  }
  mInfo("scan:%d, trans:%d, used to update scan progress.", scanId, pTrans->id);

  mndTransSetDbName(pTrans, dbname, NULL);

  // Step 3: promote the reported counters and log each detail as READY.
  pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mInfo(
          "scan:%d, trans:%d, check scan progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      pDetail->numberFileset = pDetail->newNumberFileset;
      pDetail->finished = pDetail->newFinished;

      SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
      if (pCommitRaw == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
        mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
        // Not appended, so the raw is freed here (same handling as the
        // append failure in mndAddScanToTran()).
        sdbFreeRaw(pCommitRaw);
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  // Step 4: a detail is unfinished while both counters are still -1, or while
  // both are set but differ.
  bool allFinished = true;
  pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mInfo("scan:%d, trans:%d, check scan finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
            pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);

      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    allFinished = true;
    mWarn("scan:%" PRId32 ", no db exist, set all finished:%s", scanId, dbname);
  }

  // Step 5: drop all detail rows and the scan row.
  if (allFinished) {
    mInfo("scan:%d, all finished", scanId);
    pIter = NULL;
    while (1) {
      SScanDetailObj *pDetail = NULL;
      pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
      if (pIter == NULL) break;

      if (pDetail->scanId == scanId) {
        SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
        if (pCommitRaw == NULL) {
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          if (terrno != 0) code = terrno;
          // Close the iterator and release the row on this early exit, as the
          // other error exits of this loop do.
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
          mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
          sdbFreeRaw(pCommitRaw);
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        mInfo("scan:%d, add drop scandetail action", pDetail->scanDetailId);
      }

      sdbRelease(pMnode->pSdb, pDetail);
    }

    SScanObj *pScan = mndAcquireScan(pMnode, scanId);
    if (pScan == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
    mndReleaseScan(pMnode, pScan);
    if (pCommitRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
      sdbFreeRaw(pCommitRaw);
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    // pScan was released above; log the id from the parameter instead of
    // reading through the released pointer.
    mInfo("scan:%d, add drop scan action", scanId);
  }

  // Step 6: hand the transaction over for execution.
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("scan:%d, trans:%d, failed to prepare since %s", scanId, pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
886

887
// Periodic pull-up of every scan object stored in the sdb.
//
// The ids of all SDB_SCAN rows are first copied into a temporary array; only
// after that iteration has finished is each scan re-acquired by id, sent a
// progress request and has its progress saved. Failures for an individual
// scan are logged and do not stop the remaining scans from being processed.
static void mndScanPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pScanIds = taosArrayInit(sdbGetSize(pSdb, SDB_SCAN), sizeof(int32_t));
  if (pScanIds == NULL) {
    return;
  }

  // Pass 1: snapshot the ids of all existing scans.
  void *pIter = NULL;
  for (;;) {
    SScanObj *pFetched = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pFetched);
    if (pIter == NULL) {
      break;
    }

    void *pSlot = taosArrayPush(pScanIds, &pFetched->scanId);
    if (pSlot == NULL) {
      // Best effort: this scan is skipped in the current round.
      mError("failed to push scan id:%d into array, but continue pull up", pFetched->scanId);
    }
    sdbRelease(pSdb, pFetched);
  }

  // Pass 2: pull up each scan that can still be acquired.
  for (int32_t idx = 0; idx < taosArrayGetSize(pScanIds); ++idx) {
    mInfo("begin to pull up");
    int32_t  *pId = taosArrayGet(pScanIds, idx);
    SScanObj *pScan = mndAcquireScan(pMnode, *pId);
    if (pScan == NULL) {
      continue;
    }

    mInfo("scan:%d, begin to pull up", pScan->scanId);
    mndScanSendProgressReq(pMnode, pScan);

    int32_t code = mndSaveScanProgress(pMnode, pScan->scanId);
    if (code != 0) {
      mError("scan:%d, failed to save scan progress since %s", pScan->scanId, tstrerror(code));
    }
    mndReleaseScan(pMnode, pScan);
  }

  taosArrayDestroy(pScanIds);
}
919

920
// Serialize a SScanDbRsp into a freshly allocated buffer.
//
// pScanRsp     - response to serialize.
// pRspLen      - out: number of bytes in the serialized response.
// ppRsp        - out: the allocated buffer; ownership passes to the caller.
// useRpcMalloc - true: allocate with rpcMallocCont(); false: allocate with
//                taosMemoryMalloc().
//
// Returns 0 on success. On failure a non-zero code is returned, nothing is
// leaked and *pRspLen / *ppRsp are left untouched.
static int32_t mndBuildScanDbRsp(SScanDbRsp *pScanRsp, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
  int32_t code = 0;

  // Sizing pass: a NULL buffer only computes the required length.
  int32_t rspLen = tSerializeSScanDbRsp(NULL, 0, pScanRsp);
  if (rspLen < 0) {
    // Do not hand a negative length to the allocator.
    code = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(code);
  }

  void *pRsp = NULL;
  if (useRpcMalloc) {
    pRsp = rpcMallocCont(rspLen);
  } else {
    pRsp = taosMemoryMalloc(rspLen);
  }

  if (pRsp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(code);
  }

  // Real pass: a failure here must not be reported as success with a
  // half-written buffer, so release the buffer with the matching deallocator.
  if (tSerializeSScanDbRsp(pRsp, rspLen, pScanRsp) < 0) {
    if (useRpcMalloc) {
      rpcFreeCont(pRsp);
    } else {
      taosMemoryFree(pRsp);
    }
    code = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(code);
  }

  *pRspLen = rspLen;
  *ppRsp = pRsp;
  TAOS_RETURN(code);
}
940

941
// Append a commit log to pTrans that stores a copy of pDb whose
// scanStartTime is set to scanTs, with raw status SDB_STATUS_READY.
//
// pMnode is not used by this function. pDb itself is not modified; the
// change is made on a local copy that is then encoded.
//
// Returns 0 on success, otherwise a non-zero error code.
static int32_t mndSetScanDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs) {
  int32_t code = 0;
  SDbObj  dbObj = {0};
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  dbObj.scanStartTime = scanTs;

  SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Set the status BEFORE appending, while this function is still the only
  // holder of the raw. Freeing the raw after a successful append (as the
  // previous ordering did on a status failure) would leave the transaction
  // holding a dangling pointer.
  // NOTE(review): presumably the transaction releases appended raws when it is
  // dropped - other call sites in this file do not free after a successful
  // append; confirm against mndTrans.
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // Append failed, so the raw was never handed over; release it here.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
964

965
static void *mndBuildScanVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t scanTs,
240✔
966
                                  STimeWindow tw) {
967
  SScanVnodeReq scanReq = {0};
240✔
968
  scanReq.dbUid = pDb->uid;
240✔
969
  scanReq.scanStartTime = scanTs;
240✔
970
  scanReq.tw = tw;
240✔
971
  tstrncpy(scanReq.db, pDb->name, TSDB_DB_FNAME_LEN);
240✔
972

973
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
240✔
974
  int32_t contLen = tSerializeSScanVnodeReq(NULL, 0, &scanReq);
240✔
975
  if (contLen < 0) {
240✔
976
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
977
    return NULL;
×
978
  }
979
  contLen += sizeof(SMsgHead);
240✔
980

981
  void *pReq = taosMemoryMalloc(contLen);
240✔
982
  if (pReq == NULL) {
240✔
983
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
984
    return NULL;
×
985
  }
986

987
  SMsgHead *pHead = pReq;
240✔
988
  pHead->contLen = htonl(contLen);
240✔
989
  pHead->vgId = htonl(pVgroup->vgId);
240✔
990

991
  if (tSerializeSScanVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &scanReq) < 0) {
240✔
992
    taosMemoryFree(pReq);
×
993
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
994
    return NULL;
×
995
  }
996
  *pContLen = contLen;
240✔
997
  return pReq;
240✔
998
}
999

1000
static int32_t mndBuildScanVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t scanTs,
240✔
1001
                                        STimeWindow tw) {
1002
  int32_t      code = 0;
240✔
1003
  STransAction action = {0};
240✔
1004
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
240✔
1005

1006
  int32_t contLen = 0;
240✔
1007
  void   *pReq = mndBuildScanVnodeReq(pMnode, pDb, pVgroup, &contLen, scanTs, tw);
240✔
1008
  if (pReq == NULL) {
240✔
1009
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1010
    if (terrno != 0) code = terrno;
×
1011
    TAOS_RETURN(code);
×
1012
  }
1013

1014
  action.pCont = pReq;
240✔
1015
  action.contLen = contLen;
240✔
1016
  action.msgType = TDMT_VND_SCAN;
240✔
1017

1018
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
240✔
1019
    taosMemoryFree(pReq);
×
1020
    TAOS_RETURN(code);
×
1021
  }
1022

1023
  TAOS_RETURN(code);
240✔
1024
}
1025

1026
extern int32_t mndAddScanDetailToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SVgObj *pVgroup,
1027
                                      SVnodeGid *pVgid, int32_t index);
1028

1029
static int32_t mndSetScanDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs, STimeWindow tw,
288✔
1030
                                       SArray *vgroupIds, SScanDbRsp *pScanRsp) {
1031
  int32_t code = 0;
288✔
1032
  SSdb   *pSdb = pMnode->pSdb;
288✔
1033
  void   *pIter = NULL;
288✔
1034

1035
  SScanObj scan;
288✔
1036
  if ((code = mndAddScanToTran(pMnode, pTrans, &scan, pDb, pScanRsp)) != 0) {
288✔
1037
    TAOS_RETURN(code);
×
1038
  }
1039

1040
  int32_t j = 0;
288✔
1041
  int32_t numOfVgroups = taosArrayGetSize(vgroupIds);
288✔
1042
  if (numOfVgroups > 0) {
288✔
1043
    for (int32_t i = 0; i < numOfVgroups; i++) {
336✔
1044
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
240✔
1045
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
240✔
1046

1047
      if (pVgroup == NULL) {
240✔
1048
        mError("db:%s, vgroup:%" PRId64 " not exist", pDb->name, vgId);
×
1049
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
×
1050
      } else if (pVgroup->dbUid != pDb->uid) {
240✔
1051
        mError("db:%s, vgroup:%" PRId64 " not belong to db:%s", pDb->name, vgId, pDb->name);
96✔
1052
        sdbRelease(pSdb, pVgroup);
96✔
1053
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
96✔
1054
      }
1055
      sdbRelease(pSdb, pVgroup);
144✔
1056
    }
1057

1058
    for (int32_t i = 0; i < numOfVgroups; i++) {
240✔
1059
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
144✔
1060
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
144✔
1061

1062
      if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
144✔
1063
        sdbRelease(pSdb, pVgroup);
×
1064
        TAOS_RETURN(code);
×
1065
      }
1066

1067
      for (int32_t i = 0; i < pVgroup->replica; i++) {
288✔
1068
        SVnodeGid *gid = &pVgroup->vnodeGid[i];
144✔
1069
        if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
144✔
1070
          sdbRelease(pSdb, pVgroup);
×
1071
          TAOS_RETURN(code);
×
1072
        }
1073
        j++;
144✔
1074
      }
1075
      sdbRelease(pSdb, pVgroup);
144✔
1076
    }
1077
  } else {
1078
    while (1) {
192✔
1079
      SVgObj *pVgroup = NULL;
288✔
1080
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
288✔
1081
      if (pIter == NULL) break;
288✔
1082

1083
      if (pVgroup->dbUid == pDb->uid) {
192✔
1084
        if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
96✔
1085
          sdbCancelFetch(pSdb, pIter);
×
1086
          sdbRelease(pSdb, pVgroup);
×
1087
          TAOS_RETURN(code);
×
1088
        }
1089

1090
        for (int32_t i = 0; i < pVgroup->replica; i++) {
192✔
1091
          SVnodeGid *gid = &pVgroup->vnodeGid[i];
96✔
1092
          if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
96✔
1093
            sdbCancelFetch(pSdb, pIter);
×
1094
            sdbRelease(pSdb, pVgroup);
×
1095
            TAOS_RETURN(code);
×
1096
          }
1097
          j++;
96✔
1098
        }
1099
      }
1100

1101
      sdbRelease(pSdb, pVgroup);
192✔
1102
    }
1103
  }
1104

1105
  TAOS_RETURN(code);
192✔
1106
}
1107

1108
// Start a scan of database pDb by creating and preparing a "scan-db"
// transaction.
//
// pReq      - originating request, may be NULL. When non-NULL a SScanDbRsp is
//             built: attached directly to pReq if a scan of this db already
//             exists (bAccepted = false), otherwise attached to the
//             transaction (bAccepted = true).
// tw        - time window forwarded to the vnode scan requests.
// vgroupIds - vgroup selection forwarded to mndSetScanDbRedoActions().
//
// Returns 0 when the transaction was prepared,
// TSDB_CODE_MND_SCAN_ALREADY_EXIST when a scan with the same db name is
// already stored, or the error code of the step that failed.
static int32_t mndScanDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SScanDbRsp scanRsp = {0};
  // Declared and initialized before the first possible `goto _OVER`, so the
  // cleanup label never sees an indeterminate pointer.
  STrans *pTrans = NULL;

  bool  isExist = false;
  void *pIter = NULL;
  while (1) {
    SScanObj *pScan = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
    if (pIter == NULL) break;

    if (strcmp(pScan->dbname, pDb->name) == 0) {
      isExist = true;
    }
    sdbRelease(pMnode->pSdb, pScan);
  }
  if (isExist) {
    mInfo("scan db:%s already exist", pDb->name);

    if (pReq) {
      int32_t rspLen = 0;
      void   *pRsp = NULL;
      scanRsp.scanId = 0;
      scanRsp.bAccepted = false;
      code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, true);
      TSDB_CHECK_CODE(code, lino, _OVER);

      pReq->info.rsp = pRsp;
      pReq->info.rspLen = rspLen;
    }

    return TSDB_CODE_MND_SCAN_ALREADY_EXIST;
  }

  int64_t scanTs = taosGetTimestampMs();
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "scan-db");
  if (pTrans == NULL) {
    // Report the failure instead of falling through with code == 0.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to scan db:%s", pTrans->id, pDb->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  code = mndTransCheckConflict(pMnode, pTrans);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = mndSetScanDbCommitLogs(pMnode, pTrans, pDb, scanTs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = mndSetScanDbRedoActions(pMnode, pTrans, pDb, scanTs, tw, vgroupIds, &scanRsp);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pReq) {
    int32_t rspLen = 0;
    void   *pRsp = NULL;
    scanRsp.bAccepted = true;
    code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, false);
    TSDB_CHECK_CODE(code, lino, _OVER);
    mndTransSetRpcRsp(pTrans, pRsp, rspLen);
  }

  // Keep the prepare result: a failure here must reach the caller.
  code = mndTransPrepare(pMnode, pTrans);
  if (code != 0) goto _OVER;

_OVER:
  // pTrans is NULL on the paths that fail before or during its creation; the
  // original code already passed NULL here on creation failure.
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1174

1175
// Scan timer message handler: runs one pull-up round on the mnode carried in
// the message info. Always returns 0.
static int32_t mndProcessScanTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process scan timer");
  mndScanPullup(pMnode);

  return 0;
}
1180

1181
// Handler for a scan-db request.
//
// Deserializes the request, acquires the target db, checks the
// MND_OPER_SCAN_DB privilege and then calls mndScanDb().
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when mndScanDb() succeeded,
// TSDB_CODE_INVALID_MSG when the request cannot be deserialized, or the error
// code of the step that failed.
int32_t mndProcessScanDbReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1;
  SDbObj    *pDb = NULL;
  SScanDbReq scanReq = {0};

  if (tDeserializeSScanDbReq(pReq->pCont, pReq->contLen, &scanReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("db:%s, start to scan", scanReq.db);

  pDb = mndAcquireDb(pMnode, scanReq.db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SCAN_DB, pDb), NULL, _OVER);

  code = mndScanDb(pMnode, pReq, pDb, scanReq.timeRange, scanReq.vgroupIds);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Log the code that is actually returned; several failure paths above set
    // only `code` and not terrno.
    mError("db:%s, failed to process scan db req since %s", scanReq.db, tstrerror(code));
  }

  // Reached with pDb == NULL on the early failure paths.
  mndReleaseDb(pMnode, pDb);
  tFreeSScanDbReq(&scanReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc