• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4905

29 Dec 2025 02:08PM UTC coverage: 65.423% (-0.3%) from 65.734%
#4905

push

travis-ci

web-flow
enh: sign connect request (#34067)

23 of 29 new or added lines in 4 files covered. (79.31%)

11614 existing lines in 186 files now uncovered.

193476 of 295730 relevant lines covered (65.42%)

115752566.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.9
/source/dnode/mnode/impl/src/mndScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndScan.h"
16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndScan.h"
21
#include "mndScanDetail.h"
22
#include "mndShow.h"
23
#include "mndTrans.h"
24
#include "mndUser.h"
25
#include "mndVgroup.h"
26
#include "tmisce.h"
27
#include "tmsgcb.h"
28

29
#define MND_SCAN_VER_NUMBER 1
30
#define MND_SCAN_ID_LEN     11
31

32
static int32_t  mndProcessScanTimer(SRpcMsg *pReq);
33
static SSdbRaw *mndScanActionEncode(SScanObj *pScan);
34
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndScanActionInsert(SSdb *pSdb, SScanObj *pScan);
36
static int32_t  mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan);
37
static int32_t  mndScanActionDelete(SSdb *pSdb, SScanObj *pScan);
38
static int32_t  mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
39
static int32_t  mndProcessKillScanReq(SRpcMsg *pReq);
40
static int32_t  mndProcessQueryScanRsp(SRpcMsg *pReq);
41

42
/*
 * Wires the scan module into the mnode: registers the SHOW retrieve handler,
 * the message handlers used by scan management, and the SDB table descriptor
 * for SDB_SCAN rows. Returns the result of sdbSetTable().
 */
int32_t mndInitScan(SMnode *pMnode) {
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SCAN, mndRetrieveScan);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SCAN, mndProcessKillScanReq);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SCAN_PROGRESS_RSP, mndProcessQueryScanRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_SCAN_TIMER, mndProcessScanTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SCAN_RSP, mndTransProcessRsp);

  /* Start from an all-zero descriptor and fill in only the scan-specific hooks. */
  SSdbTable scanTable = {0};
  scanTable.sdbType = SDB_SCAN;
  scanTable.keyType = SDB_KEY_INT32;
  scanTable.encodeFp = (SdbEncodeFp)mndScanActionEncode;
  scanTable.decodeFp = (SdbDecodeFp)mndScanActionDecode;
  scanTable.insertFp = (SdbInsertFp)mndScanActionInsert;
  scanTable.updateFp = (SdbUpdateFp)mndScanActionUpdate;
  scanTable.deleteFp = (SdbDeleteFp)mndScanActionDelete;

  return sdbSetTable(pMnode->pSdb, scanTable);
}
61

62
/* Module teardown hook; it only emits a debug log line. */
void mndCleanupScan(SMnode *pMnode) {
  mDebug("mnd scan cleanup");
}
385,229✔
63

64
/* Frees resources owned by a scan object. Intentionally a no-op: nothing is released here. */
void tFreeScanObj(SScanObj *pScan) {
  (void)pScan;
}
450✔
65

66
static int32_t tSerializeSScanObj(void *buf, int32_t bufLen, const SScanObj *pObj) {
1,080✔
67
  SEncoder encoder = {0};
1,080✔
68
  int32_t  code = 0;
1,080✔
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
1,080✔
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
1,080✔
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->scanId));
2,160✔
75
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
2,160✔
76
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
2,160✔
77
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
2,160✔
78

79
  tEndEncode(&encoder);
1,080✔
80

81
_exit:
1,080✔
82
  if (code) {
1,080✔
UNCOV
83
    tlen = code;
×
84
  } else {
85
    tlen = encoder.pos;
1,080✔
86
  }
87
  tEncoderClear(&encoder);
1,080✔
88
  return tlen;
1,080✔
89
}
90

91
/*
 * Decodes a scan object from buf into pObj.
 *
 * dbUid is a trailing field: when the buffer ends right after startTime,
 * dbUid is set to 0 instead of being decoded.
 * Returns 0 on success or the error code of the first failing decode step.
 */
int32_t tDeserializeSScanObj(void *buf, int32_t bufLen, SScanObj *pObj) {
  SDecoder decoder = {0};
  int32_t  code = 0;
  int32_t  lino;

  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->scanId));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));

  if (tDecodeIsEnd(&decoder)) {
    pObj->dbUid = 0;
  } else {
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
  }

  tEndDecode(&decoder);

_exit:
  tDecoderClear(&decoder);
  return code;
}
113

114
/*
 * Encodes a scan object into a newly allocated SDB raw record.
 *
 * Layout written into the raw: an int32 length followed by the serialized
 * scan object of that length.
 *
 * Returns the raw on success. On failure returns NULL with terrno set; any
 * partially built raw is freed here.
 */
static SSdbRaw *mndScanActionEncode(SScanObj *pScan) {
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_SUCCESS;

  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;

  /* First pass with a NULL buffer only computes the required length. */
  int32_t tlen = tSerializeSScanObj(NULL, 0, pScan);
  if (tlen < 0) {
    /* tSerializeSScanObj returns the failing error code; report it instead of a generic OOM. */
    terrno = tlen;
    goto OVER;
  }

  int32_t size = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_SCAN, MND_SCAN_VER_NUMBER, size);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  tlen = tSerializeSScanObj(buf, tlen, pScan);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(buf);
  /*
   * NOTE(review): the SDB_SET_* macros are defined elsewhere; if they report
   * failure through `code` without touching terrno, make sure the failure is
   * not mistaken for success below.
   */
  if (code != 0 && terrno == TSDB_CODE_SUCCESS) {
    terrno = code;
  }
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("scan:%" PRId32 ", failed to encode to raw:%p since %s", pScan->scanId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("scan:%" PRId32 ", encode to raw:%p, row:%p", pScan->scanId, pRaw, pScan);
  return pRaw;
}
163

164
/*
 * Decodes an SDB raw record into a newly allocated row holding an SScanObj.
 *
 * Expects the layout produced by mndScanActionEncode(): an int32 length
 * followed by the serialized scan object. Only raws whose soft version equals
 * MND_SCAN_VER_NUMBER are accepted.
 *
 * Returns the row on success. On failure returns NULL with terrno set and the
 * row (if any) freed.
 */
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSdbRow  *pRow = NULL;
  SScanObj *pScan = NULL;
  void     *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  /* Keep the return value so a failure is never treated as success at OVER. */
  code = sdbGetRawSoftVer(pRaw, &sver);
  if (code != 0) {
    goto OVER;
  }

  if (sver != MND_SCAN_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("scan read invalid ver, data ver: %d, curr ver: %d", sver, MND_SCAN_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SScanObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pScan = sdbGetRowObj(pRow);
  if (pScan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  /* NOTE(review): tlen comes from stored data and is not range-checked here — confirm SDB_GET_BINARY bounds it. */
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSScanObj(buf, tlen, pScan)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (code != 0 && terrno == TSDB_CODE_SUCCESS) {
    terrno = code;
  }
  if (terrno != TSDB_CODE_SUCCESS) {
    /* pScan is still NULL when the failure happened before the row was allocated. */
    int32_t scanId = (pScan != NULL) ? pScan->scanId : 0;
    mError("scan:%" PRId32 ", failed to decode from raw:%p since %s", scanId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("scan:%" PRId32 ", decode from raw:%p, row:%p", pScan->scanId, pRaw, pScan);
  return pRow;
}
220

221
/* SDB insert hook for scan rows: traces the insert and always returns 0. */
static int32_t mndScanActionInsert(SSdb *pSdb, SScanObj *pScan) {
  (void)pSdb;
  mTrace("scan:%" PRId32 ", perform insert action", pScan->scanId);
  return 0;
}
225

226
/* SDB delete hook for scan rows: traces the delete, runs tFreeScanObj(), and always returns 0. */
static int32_t mndScanActionDelete(SSdb *pSdb, SScanObj *pScan) {
  (void)pSdb;
  mTrace("scan:%" PRId32 ", perform delete action", pScan->scanId);
  tFreeScanObj(pScan);
  return 0;
}
231

232
/*
 * SDB update hook for scan rows: traces the update and always returns 0.
 * No fields are copied from the new row into the old one.
 */
static int32_t mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan) {
  (void)pSdb;
  mTrace("scan:%" PRId32 ", perform update action, old row:%p new row:%p", pOldScan->scanId, pOldScan, pNewScan);
  return 0;
}
237

238
/*
 * Looks up a scan row by id and returns it with a reference held, or NULL.
 * The caller must hand the result back through mndReleaseScan().
 *
 * A plain "not there" miss is not treated as an error: terrno is reset to
 * TSDB_CODE_SUCCESS in that case.
 */
static SScanObj *mndAcquireScan(SMnode *pMnode, int64_t scanId) {
  SSdb *pSdb = pMnode->pSdb;

  /*
   * The SDB_SCAN table is registered with SDB_KEY_INT32 in mndInitScan(), so
   * the key handed to sdbAcquire() must be a 32-bit object. Passing the
   * address of the 64-bit parameter only happens to work on little-endian
   * hosts.
   */
  int32_t key = (int32_t)scanId;

  SScanObj *pScan = sdbAcquire(pSdb, SDB_SCAN, &key);
  if (pScan == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pScan;
}
246

247
/*
 * Drops the reference taken by mndAcquireScan().
 *
 * The pointer is passed by value, so it cannot be cleared for the caller; the
 * former `pScan = NULL` was a dead store and has been removed. Callers must
 * not use pScan after this call.
 */
static void mndReleaseScan(SMnode *pMnode, SScanObj *pScan) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pScan);
}
900✔
252

253
/*
 * Copies the database name (and optionally the database uid) of a scan.
 *
 * dbname receives at most len bytes via tstrncpy(); dbUid may be NULL.
 * Returns 0 on success. When the scan cannot be acquired, returns terrno if
 * it is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
static int32_t mndScanGetDbInfo(SMnode *pMnode, int32_t scanId, char *dbname, int32_t len, int64_t *dbUid) {
  int32_t   code = 0;
  SScanObj *pScan = mndAcquireScan(pMnode, scanId);

  if (pScan == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  tstrncpy(dbname, pScan->dbname, len);
  if (dbUid != NULL) {
    *dbUid = pScan->dbUid;
  }

  mndReleaseScan(pMnode, pScan);
  TAOS_RETURN(code);
}
267

268
// scan db
269
/*
 * Initialises a new scan object for pDb and appends it to pTrans as a prepare
 * log in READY state.
 *
 * Fills pScan (fresh id, db name and uid, start time) and reports the new id
 * through rsp->scanId. Returns 0 on success or an error code.
 */
int32_t mndAddScanToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SDbObj *pDb, SScanDbRsp *rsp) {
  int32_t code = 0;
  pScan->scanId = tGenIdPI32();

  tstrncpy(pScan->dbname, pDb->name, sizeof(pScan->dbname));
  pScan->dbUid = pDb->uid;

  pScan->startTime = taosGetTimestampMs();

  SSdbRaw *pScanRaw = mndScanActionEncode(pScan);
  if (pScanRaw == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendPrepareLog(pTrans, pScanRaw)) != 0) {
    /* The append failed, so the raw is still ours to free. */
    sdbFreeRaw(pScanRaw);
    TAOS_RETURN(code);
  }

  /*
   * After a successful append the raw is not freed here, matching every other
   * append site in this file (mndKillScan, mndSaveScanProgress), which leave
   * cleanup to mndTransDrop(). Freeing it here as well would risk a double
   * free. NOTE(review): assumes the transaction owns appended raws — confirm
   * against mndTrans.
   */
  if ((code = sdbSetRawStatus(pScanRaw, SDB_STATUS_READY)) != 0) {
    TAOS_RETURN(code);
  }

  rsp->scanId = pScan->scanId;

  return 0;
}
298

299
// retrieve scan
300
/*
 * SHOW retrieve handler for scans: fills pBlock with up to `rows` rows, one
 * per scan object (scan id, database name, start time).
 *
 * Returns the number of rows produced, or an error code when a step fails.
 * Iteration state is kept in pShow->pIter between calls.
 *
 * NOTE(review): several locals (pUser, pIterDb, objFName, showAll, showIter,
 * dbUid) are not referenced directly in this body; they appear to be consumed
 * by the MND_SHOW_CHECK_* macros, so they are left in place.
 */
static int32_t mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SScanObj *pScan = NULL;
  char     *sep = NULL;
  SDbObj   *pDb = NULL;
  int32_t   code = 0;
  int32_t   lino = 0;
  SUserObj *pUser = NULL;
  SDbObj   *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), PRIV_SHOW_SCANS, PRIV_OBJ_DB, 0, _OVER);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SCAN, pShow->pIter, (void **)&pScan);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pScan->dbname, pScan, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_SCANS, _OVER);

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    /* Column 1: scan id. */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->scanId, false), pScan, &lino, _OVER);

    /* Column 2: database name, with the account prefix stripped for non-system databases. */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pScan->dbname)) {
      SName name = {0};
      code = tNameFromString(&name, pScan->dbname, T_NAME_ACCT | T_NAME_DB);
      if (code != 0) {
        lino = __LINE__;
        /*
         * Drop the reference taken by sdbFetch() before bailing out; the
         * other failure paths in this loop are handed pScan for the same
         * purpose. NOTE(review): pShow->pIter is left for the show framework
         * to cancel — confirm.
         */
        sdbRelease(pSdb, pScan);
        goto _OVER;
      }
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pScan->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pScan, &lino, _OVER);

    /* Column 3: start time. */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->startTime, false), pScan, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pScan);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
372

373
// kill scan
374
/*
 * Builds the kill-scan message for one vgroup: an SMsgHead followed by the
 * serialized SVKillScanReq.
 *
 * On success returns a buffer allocated with taosMemoryMalloc() and stores
 * its total length (header included) in *pContLen. On failure returns NULL
 * with terrno set.
 */
static void *mndBuildKillScanReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t scanId, int32_t dnodeid) {
  SVKillScanReq req = {0};
  req.scanId = scanId;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeid;
  terrno = 0;

  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
  /* The sizing pass returns the payload length only. */
  int32_t bodyLen = tSerializeSVKillScanReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  mTrace("vgId:%d, build scan vnode config req, contLen:%d", pVgroup->vgId, contLen);
  int32_t ret = 0;
  /*
   * Only bodyLen bytes remain after the header. Passing the full contLen here
   * overstated the writable space by sizeof(SMsgHead).
   */
  if ((ret = tSerializeSVKillScanReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req)) < 0) {
    taosMemoryFreeClear(pReq);
    terrno = ret;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
409

410
/*
 * Appends one TDMT_VND_KILL_SCAN redo action, addressed to the given dnode,
 * to pTrans.
 *
 * Returns 0 on success. When the dnode or the request cannot be obtained,
 * returns terrno if set, otherwise TSDB_CODE_SDB_OBJ_NOT_THERE. When the
 * append fails, the request buffer is freed and the append's code returned.
 */
static int32_t mndAddKillScanAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t scanId, int32_t dnodeid) {
  int32_t      code = 0;
  STransAction action = {0};

  /* Resolve the target endpoint; the dnode reference is only needed for that. */
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  int32_t msgLen = 0;
  void   *pMsg = mndBuildKillScanReq(pMnode, pVgroup, &msgLen, scanId, dnodeid);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_VND_KILL_SCAN;

  mTrace("trans:%d, kill scan msg len:%d", pTrans->id, msgLen);

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
    TAOS_RETURN(code);
  }

  return 0;
}
444

445
/*
 * Creates and prepares a "kill-scan" transaction for pScan.
 *
 * The transaction carries the scan row as a READY commit log plus one
 * kill-scan redo action for every scan detail that belongs to this scan.
 * Returns 0 when the transaction was prepared, otherwise an error code. The
 * local transaction handle is dropped on every path.
 */
static int32_t mndKillScan(SMnode *pMnode, SRpcMsg *pReq, SScanObj *pScan) {
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-scan");
  if (pTrans == NULL) {
    mError("scan:%" PRId32 ", failed to drop since %s", pScan->scanId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill scan:%" PRId32, pTrans->id, pScan->scanId);

  mndTransSetDbName(pTrans, pScan->dbname, NULL);

  SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == pScan->scanId) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }

      code = mndAddKillScanAction(pMnode, pTrans, pVgroup, pScan->scanId, pDetail->dnodeId);
      /* The vgroup reference is no longer needed, whether or not the append worked. */
      mndReleaseVgroup(pMnode, pVgroup);
      if (code != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
516

517
/*
 * Handler for TDMT_MND_KILL_SCAN.
 *
 * Deserializes the request, acquires the scan by id, checks the caller's
 * privilege and starts the kill-scan transaction. Returns
 * TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
 * TSDB_CODE_MND_INVALID_SCAN_ID when the scan cannot be acquired, or the
 * failing step's error code.
 */
static int32_t mndProcessKillScanReq(SRpcMsg *pReq) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SKillScanReq killScanReq = {0};

  if ((code = tDeserializeSKillScanReq(pReq->pCont, pReq->contLen, &killScanReq)) != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill scan:%" PRId32, killScanReq.scanId);

  SMnode   *pMnode = pReq->info.node;
  SScanObj *pScan = mndAcquireScan(pMnode, killScanReq.scanId);
  if (pScan == NULL) {
    code = TSDB_CODE_MND_INVALID_SCAN_ID;
    tFreeSKillScanReq(&killScanReq);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SCAN_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillScan(pMnode, pReq, pScan), &lino, _OVER);

  code = TSDB_CODE_ACTION_IN_PROGRESS;

#if 0
  char    obj[TSDB_INT32_ID_LEN] = {0};
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pScan->scanId);
  if ((uint32_t)nBytes < sizeof(obj)) {
    auditRecord(pReq, pMnode->clusterId, "killScan", pScan->dbname, obj, killScanReq.sql, killScanReq.sqlLen);
  } else {
    mError("scan:%" PRId32 " failed to audit since %s", pScan->scanId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }
#endif
_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    /* Report the code that actually failed; terrno is not guaranteed to match it. */
    mError("failed to kill scan %" PRId32 " since %s", killScanReq.scanId, tstrerror(code));
  }

  tFreeSKillScanReq(&killScanReq);
  mndReleaseScan(pMnode, pScan);

  TAOS_RETURN(code);
}
561

562
// update progress
563
/*
 * Applies a vnode progress response to the matching in-memory scan detail.
 *
 * The detail is matched on (scanId, vgId, dnodeId). On a match, the new
 * fileset count, new finished count, progress and remaining time are copied
 * from rsp and 0 is returned. Returns TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST
 * when no detail matches.
 */
static int32_t mndUpdateScanProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t scanId, SQueryScanProgressRsp *rsp) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  for (;;) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) {
      break;
    }

    bool matched = pDetail->scanId == scanId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId;
    if (!matched) {
      sdbRelease(pSdb, pDetail);
      continue;
    }

    pDetail->newNumberFileset = rsp->numberFileset;
    pDetail->newFinished = rsp->finished;
    pDetail->progress = rsp->progress;
    pDetail->remainingTime = rsp->remainingTime;

    /* Stopping early, so the iterator has to be cancelled explicitly. */
    sdbCancelFetch(pSdb, pIter);
    sdbRelease(pSdb, pDetail);

    TAOS_RETURN(code);
  }

  return TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST;
}
589

590
/*
 * Handler for TDMT_VND_QUERY_SCAN_PROGRESS_RSP.
 *
 * Rejects responses that carry a non-zero code, deserializes the progress
 * payload and forwards it to mndUpdateScanProgress(). Returns 0 on success
 * or the first error code met.
 */
static int32_t mndProcessQueryScanRsp(SRpcMsg *pReq) {
  SQueryScanProgressRsp req = {0};
  int32_t               code = 0;

  if (pReq->code != 0) {
    mError("received wrong scan response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  code = tDeserializeSQueryScanProgressRsp(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize vnode-query-scan-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("scan:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId, req.vgId,
         req.dnodeId, req.numberFileset, req.finished);

  SMnode *pMnode = pReq->info.node;
  code = mndUpdateScanProgress(pMnode, pReq, req.scanId, &req);
  if (code != 0) {
    mError("scan:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId,
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
  }

  TAOS_RETURN(code);
}
618

619
// timer
620
/*
 * Sends a TDMT_VND_QUERY_SCAN_PROGRESS request to the dnode of every scan
 * detail that belongs to pScan.
 *
 * Best effort: a detail whose request cannot be built or sent is skipped.
 * When a detail's dnode cannot be acquired, the walk stops.
 */
static void mndScanSendProgressReq(SMnode *pMnode, SScanObj *pScan) {
  void *pIter = NULL;

  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId != pScan->scanId) {
      sdbRelease(pMnode->pSdb, pDetail);
      continue;
    }

    SEpSet epSet = {0};

    SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
    if (pDnode == NULL) {
      /* Leaving the loop early: give back the iterator and the detail reference. */
      sdbCancelFetch(pMnode->pSdb, pIter);
      sdbRelease(pMnode->pSdb, pDetail);
      break;
    }
    int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
    /* The dnode is only needed for its endpoint; release it on success and failure alike. */
    mndReleaseDnode(pMnode, pDnode);
    if (epCode != 0) {
      sdbRelease(pMnode->pSdb, pDetail);
      continue;
    }

    /* Zero-initialise so the serializer never reads an unset field. */
    SQueryScanProgressReq req = {0};
    req.scanId = pDetail->scanId;
    req.vgId = pDetail->vgId;
    req.dnodeId = pDetail->dnodeId;

    int32_t contLen = tSerializeSQueryScanProgressReq(NULL, 0, &req);
    if (contLen < 0) {
      sdbRelease(pMnode->pSdb, pDetail);
      continue;
    }

    contLen += sizeof(SMsgHead);

    SMsgHead *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      sdbRelease(pMnode->pSdb, pDetail);
      continue;
    }

    pHead->contLen = htonl(contLen);
    pHead->vgId = htonl(pDetail->vgId);

    if (tSerializeSQueryScanProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
      /* The message is never handed to the transport, so free its buffer here. */
      rpcFreeCont(pHead);
      sdbRelease(pMnode->pSdb, pDetail);
      continue;
    }

    SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SCAN_PROGRESS, .contLen = contLen};

    rpcMsg.pCont = pHead;

    char    detail[1024] = {0};
    int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                            TMSG_INFO(TDMT_VND_QUERY_SCAN_PROGRESS), epSet.numOfEps, epSet.inUse);
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    mDebug("scan:%d, send update progress msg to %s", pDetail->scanId, detail);

    /*
     * A send failure is tolerated; the next detail is tried regardless.
     * NOTE(review): who owns rpcMsg.pCont after a failed tmsgSendReq() is not
     * visible here — confirm before freeing it on that path.
     */
    (void)tmsgSendReq(&epSet, &rpcMsg);

    sdbRelease(pMnode->pSdb, pDetail);
  }
}
315✔
688

689
/*
 * Persists the progress of one scan through an "update-scan-progress"
 * transaction, and drops the scan with all its details once every vgroup has
 * finished or the database no longer exists.
 *
 * Steps:
 *   1. Decide whether anything must be saved: a detail whose new
 *      fileset/finished counters differ from the stored ones, or a missing
 *      database.
 *   2. Copy the new counters into each detail and append it as a READY
 *      commit log.
 *   3. If all details are finished (or the database is gone), append every
 *      detail and the scan itself as DROPPED commit logs.
 *   4. Prepare the transaction.
 *
 * Returns 0 on success (including "nothing to save") or an error code. The
 * local transaction handle is dropped on every path.
 */
static int32_t mndSaveScanProgress(SMnode *pMnode, int32_t scanId) {
  int32_t code = 0;
  bool    needSave = false;
  void   *pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mDebug(
          "scan:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->scanId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      // these 2 number will jump back after dnode restart, so < is not used here
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
        needSave = true;
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
  int64_t dbUid = 0;
  TAOS_CHECK_RETURN(mndScanGetDbInfo(pMnode, scanId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));

  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    needSave = true;
    mWarn("scan:%" PRId32 ", no db exist, set needSave:%s", scanId, dbname);
  }

  if (!needSave) {
    mDebug("scan:%" PRId32 ", no need to save", scanId);
    TAOS_RETURN(code);
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-scan-progress");
  if (pTrans == NULL) {
    /* pTrans is NULL here, so the log must not read pTrans->id; identify the scan instead. */
    mError("scan:%" PRId32 ", failed to create trans since %s", scanId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("scan:%d, trans:%d, used to update scan progress.", scanId, pTrans->id);

  mndTransSetDbName(pTrans, dbname, NULL);

  /* Pass 1: fold the new counters into each detail and stage it as READY. */
  pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mInfo(
          "scan:%d, trans:%d, check scan progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      pDetail->numberFileset = pDetail->newNumberFileset;
      pDetail->finished = pDetail->newFinished;

      SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
      if (pCommitRaw == NULL) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
        mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  /* Pass 2: the scan counts as finished only if no detail is unreported or still behind. */
  bool allFinished = true;
  pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mInfo("scan:%d, trans:%d, check scan finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
            pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);

      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    allFinished = true;
    mWarn("scan:%" PRId32 ", no db exist, set all finished:%s", scanId, dbname);
  }

  if (allFinished) {
    mInfo("scan:%d, all finished", scanId);
    /* Pass 3: stage every detail of this scan as DROPPED. */
    pIter = NULL;
    while (1) {
      SScanDetailObj *pDetail = NULL;
      pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
      if (pIter == NULL) break;

      if (pDetail->scanId == scanId) {
        SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
        if (pCommitRaw == NULL) {
          /* Same cleanup as the other early exits: give back the iterator and the detail. */
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          if (terrno != 0) code = terrno;
          TAOS_RETURN(code);
        }
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
          mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        mInfo("scan:%d, add drop scandetail action", pDetail->scanDetailId);
      }

      sdbRelease(pMnode->pSdb, pDetail);
    }

    SScanObj *pScan = mndAcquireScan(pMnode, scanId);
    if (pScan == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
    mndReleaseScan(pMnode, pScan);
    if (pCommitRaw == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    /* pScan was released above; log the id it was acquired with rather than reading through it. */
    mInfo("scan:%d, add drop scan action", scanId);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("scan:%d, trans:%d, failed to prepare since %s", scanId, pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
886

887
// Periodic pull-up for scans: snapshot the ids of all SDB_SCAN rows, then for
// each id that can still be acquired, send a progress request and save the
// scan progress. Failures are logged and do not stop the sweep.
static void mndScanPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *scanIds = taosArrayInit(sdbGetSize(pSdb, SDB_SCAN), sizeof(int32_t));
  if (scanIds == NULL) {
    return;
  }

  // Phase 1: collect ids only, so no row stays referenced while it is processed below.
  void     *pCursor = NULL;
  SScanObj *pRow = NULL;
  while ((pCursor = sdbFetch(pMnode->pSdb, SDB_SCAN, pCursor, (void **)&pRow)) != NULL) {
    if (taosArrayPush(scanIds, &pRow->scanId) == NULL) {
      mError("failed to push scan id:%d into array, but continue pull up", pRow->scanId);
    }
    sdbRelease(pSdb, pRow);
    pRow = NULL;
  }

  // Phase 2: re-acquire each scan by id; ids that can no longer be acquired are skipped.
  for (int32_t idx = 0; idx < taosArrayGetSize(scanIds); ++idx) {
    mInfo("begin to pull up");
    int32_t  *pId = taosArrayGet(scanIds, idx);
    SScanObj *pScan = mndAcquireScan(pMnode, *pId);
    if (pScan == NULL) {
      continue;
    }

    mInfo("scan:%d, begin to pull up", pScan->scanId);
    mndScanSendProgressReq(pMnode, pScan);

    int32_t code = mndSaveScanProgress(pMnode, pScan->scanId);
    if (code != 0) {
      mError("scan:%d, failed to save scan progress since %s", pScan->scanId, tstrerror(code));
    }
    mndReleaseScan(pMnode, pScan);
  }

  taosArrayDestroy(scanIds);
}
919

920
// Serialize a scan-db response into a freshly allocated buffer.
//
// pScanRsp     response to serialize
// pRspLen      out: length of the serialized response
// ppRsp        out: allocated buffer; the caller takes ownership on success
// useRpcMalloc true  -> allocate with rpcMallocCont
//              false -> allocate with taosMemoryMalloc
//
// Returns 0 on success. On failure nothing is written to the out parameters
// and any buffer allocated here is released.
static int32_t mndBuildScanDbRsp(SScanDbRsp *pScanRsp, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
  int32_t code = 0;

  // First pass with a NULL buffer only computes the required size.
  int32_t rspLen = tSerializeSScanDbRsp(NULL, 0, pScanRsp);
  if (rspLen < 0) {
    // Never hand a negative size to the allocator; propagate the failure instead.
    code = rspLen;
    TAOS_RETURN(code);
  }

  void *pRsp = NULL;
  if (useRpcMalloc) {
    pRsp = rpcMallocCont(rspLen);
  } else {
    pRsp = taosMemoryMalloc(rspLen);
  }

  if (pRsp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(code);
  }

  // Second pass writes the payload; a failure here must not leak the buffer
  // or publish a half-written response.
  int32_t written = tSerializeSScanDbRsp(pRsp, rspLen, pScanRsp);
  if (written < 0) {
    if (useRpcMalloc) {
      rpcFreeCont(pRsp);
    } else {
      taosMemoryFree(pRsp);
    }
    code = written;
    TAOS_RETURN(code);
  }

  *pRspLen = rspLen;
  *ppRsp = pRsp;
  TAOS_RETURN(code);
}
940

941
// Append a commit log to pTrans that rewrites the db row with
// scanStartTime set to scanTs and status SDB_STATUS_READY.
//
// The db object is copied locally so the caller's pDb is not modified.
// Returns 0 on success, otherwise the failing step's error code.
static int32_t mndSetScanDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs) {
  int32_t code = 0;
  SDbObj  dbObj = {0};
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  dbObj.scanStartTime = scanTs;

  SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Set the status while this function is still the only holder of the raw.
  // The original set it after a successful append and freed the raw on
  // failure; NOTE(review): that assumes the transaction does not keep the
  // pointer, otherwise dropping the transaction would free it a second time.
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
964

965
static void *mndBuildScanVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t scanTs,
225✔
966
                                  STimeWindow tw) {
967
  SScanVnodeReq scanReq = {0};
225✔
968
  scanReq.dbUid = pDb->uid;
225✔
969
  scanReq.scanStartTime = scanTs;
225✔
970
  scanReq.tw = tw;
225✔
971
  tstrncpy(scanReq.db, pDb->name, TSDB_DB_FNAME_LEN);
225✔
972

973
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
225✔
974
  int32_t contLen = tSerializeSScanVnodeReq(NULL, 0, &scanReq);
225✔
975
  if (contLen < 0) {
225✔
UNCOV
976
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
977
    return NULL;
×
978
  }
979
  contLen += sizeof(SMsgHead);
225✔
980

981
  void *pReq = taosMemoryMalloc(contLen);
225✔
982
  if (pReq == NULL) {
225✔
UNCOV
983
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
984
    return NULL;
×
985
  }
986

987
  SMsgHead *pHead = pReq;
225✔
988
  pHead->contLen = htonl(contLen);
225✔
989
  pHead->vgId = htonl(pVgroup->vgId);
225✔
990

991
  if (tSerializeSScanVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &scanReq) < 0) {
225✔
UNCOV
992
    taosMemoryFree(pReq);
×
UNCOV
993
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
994
    return NULL;
×
995
  }
996
  *pContLen = contLen;
225✔
997
  return pReq;
225✔
998
}
999

1000
static int32_t mndBuildScanVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t scanTs,
225✔
1001
                                        STimeWindow tw) {
1002
  int32_t      code = 0;
225✔
1003
  STransAction action = {0};
225✔
1004
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
225✔
1005

1006
  int32_t contLen = 0;
225✔
1007
  void   *pReq = mndBuildScanVnodeReq(pMnode, pDb, pVgroup, &contLen, scanTs, tw);
225✔
1008
  if (pReq == NULL) {
225✔
UNCOV
1009
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
1010
    if (terrno != 0) code = terrno;
×
UNCOV
1011
    TAOS_RETURN(code);
×
1012
  }
1013

1014
  action.pCont = pReq;
225✔
1015
  action.contLen = contLen;
225✔
1016
  action.msgType = TDMT_VND_SCAN;
225✔
1017

1018
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
225✔
UNCOV
1019
    taosMemoryFree(pReq);
×
UNCOV
1020
    TAOS_RETURN(code);
×
1021
  }
1022

1023
  TAOS_RETURN(code);
225✔
1024
}
1025

1026
extern int32_t mndAddScanDetailToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SVgObj *pVgroup,
1027
                                      SVnodeGid *pVgid, int32_t index);
1028

1029
static int32_t mndSetScanDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs, STimeWindow tw,
270✔
1030
                                       SArray *vgroupIds, SScanDbRsp *pScanRsp) {
1031
  int32_t code = 0;
270✔
1032
  SSdb   *pSdb = pMnode->pSdb;
270✔
1033
  void   *pIter = NULL;
270✔
1034

1035
  SScanObj scan;
270✔
1036
  if ((code = mndAddScanToTran(pMnode, pTrans, &scan, pDb, pScanRsp)) != 0) {
270✔
UNCOV
1037
    TAOS_RETURN(code);
×
1038
  }
1039

1040
  int32_t j = 0;
270✔
1041
  int32_t numOfVgroups = taosArrayGetSize(vgroupIds);
270✔
1042
  if (numOfVgroups > 0) {
270✔
1043
    for (int32_t i = 0; i < numOfVgroups; i++) {
315✔
1044
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
225✔
1045
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
225✔
1046

1047
      if (pVgroup == NULL) {
225✔
UNCOV
1048
        mError("db:%s, vgroup:%" PRId64 " not exist", pDb->name, vgId);
×
1049
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
×
1050
      } else if (pVgroup->dbUid != pDb->uid) {
225✔
1051
        mError("db:%s, vgroup:%" PRId64 " not belong to db:%s", pDb->name, vgId, pDb->name);
90✔
1052
        sdbRelease(pSdb, pVgroup);
90✔
1053
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
90✔
1054
      }
1055
      sdbRelease(pSdb, pVgroup);
135✔
1056
    }
1057

1058
    for (int32_t i = 0; i < numOfVgroups; i++) {
225✔
1059
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
135✔
1060
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
135✔
1061

1062
      if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
135✔
UNCOV
1063
        sdbRelease(pSdb, pVgroup);
×
UNCOV
1064
        TAOS_RETURN(code);
×
1065
      }
1066

1067
      for (int32_t i = 0; i < pVgroup->replica; i++) {
270✔
1068
        SVnodeGid *gid = &pVgroup->vnodeGid[i];
135✔
1069
        if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
135✔
UNCOV
1070
          sdbRelease(pSdb, pVgroup);
×
1071
          TAOS_RETURN(code);
×
1072
        }
1073
        j++;
135✔
1074
      }
1075
      sdbRelease(pSdb, pVgroup);
135✔
1076
    }
1077
  } else {
1078
    while (1) {
180✔
1079
      SVgObj *pVgroup = NULL;
270✔
1080
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
270✔
1081
      if (pIter == NULL) break;
270✔
1082

1083
      if (pVgroup->dbUid == pDb->uid) {
180✔
1084
        if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
90✔
UNCOV
1085
          sdbCancelFetch(pSdb, pIter);
×
UNCOV
1086
          sdbRelease(pSdb, pVgroup);
×
UNCOV
1087
          TAOS_RETURN(code);
×
1088
        }
1089

1090
        for (int32_t i = 0; i < pVgroup->replica; i++) {
180✔
1091
          SVnodeGid *gid = &pVgroup->vnodeGid[i];
90✔
1092
          if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
90✔
UNCOV
1093
            sdbCancelFetch(pSdb, pIter);
×
UNCOV
1094
            sdbRelease(pSdb, pVgroup);
×
UNCOV
1095
            TAOS_RETURN(code);
×
1096
          }
1097
          j++;
90✔
1098
        }
1099
      }
1100

1101
      sdbRelease(pSdb, pVgroup);
180✔
1102
    }
1103
  }
1104

1105
  TAOS_RETURN(code);
180✔
1106
}
1107

1108
// Start a scan of pDb by building and preparing a "scan-db" transaction.
//
// pReq       originating request; may be NULL, in which case no response is built
// tw         time window forwarded to the vnode scan requests
// vgroupIds  optional list of vgroups to scan (see mndSetScanDbRedoActions)
//
// Returns 0 when the transaction was prepared,
// TSDB_CODE_MND_SCAN_ALREADY_EXIST when a scan for this db is already
// registered (a "not accepted" response is attached to pReq), or another
// error code on failure.
static int32_t mndScanDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SScanDbRsp scanRsp = {0};
  // Declared and NULL-initialized up front: the _OVER label is reachable from
  // the already-exists branch below, before any transaction is created.
  STrans    *pTrans = NULL;

  // Check whether a scan for this db is already registered.
  bool  isExist = false;
  void *pIter = NULL;
  while (1) {
    SScanObj *pScan = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
    if (pIter == NULL) break;

    if (strcmp(pScan->dbname, pDb->name) == 0) {
      isExist = true;
    }
    sdbRelease(pMnode->pSdb, pScan);
  }
  if (isExist) {
    mInfo("scan db:%s already exist", pDb->name);

    if (pReq) {
      int32_t rspLen = 0;
      void   *pRsp = NULL;
      scanRsp.scanId = 0;
      scanRsp.bAccepted = false;
      code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, true);
      TSDB_CHECK_CODE(code, lino, _OVER);

      pReq->info.rsp = pRsp;
      pReq->info.rspLen = rspLen;
    }

    return TSDB_CODE_MND_SCAN_ALREADY_EXIST;
  }

  int64_t scanTs = taosGetTimestampMs();
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "scan-db");
  if (pTrans == NULL) {
    // Report the failure; the original fell through with code == 0.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to scan db:%s", pTrans->id, pDb->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  code = mndTransCheckConflict(pMnode, pTrans);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = mndSetScanDbCommitLogs(pMnode, pTrans, pDb, scanTs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = mndSetScanDbRedoActions(pMnode, pTrans, pDb, scanTs, tw, vgroupIds, &scanRsp);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pReq) {
    int32_t rspLen = 0;
    void   *pRsp = NULL;
    scanRsp.bAccepted = true;
    code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, false);
    TSDB_CHECK_CODE(code, lino, _OVER);
    mndTransSetRpcRsp(pTrans, pRsp, rspLen);
  }

  // Keep the prepare result so a failure is returned to the caller instead of
  // being reported as success.
  code = mndTransPrepare(pMnode, pTrans);
  TSDB_CHECK_CODE(code, lino, _OVER);

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1174

1175
// Timer message handler: runs one scan pull-up sweep on the mnode carried in
// the request. Always returns 0.
static int32_t mndProcessScanTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process scan timer");
  mndScanPullup(pMnode);

  return 0;
}
1180

1181
// Handle a scan-db request: deserialize it, acquire the db, check the
// MND_OPER_SCAN_DB privilege, then start the scan.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when the scan was started, otherwise
// an error code. The db reference and the request are released on every path.
int32_t mndProcessScanDbReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1;
  SDbObj    *pDb = NULL;
  SScanDbReq scanReq = {0};

  if (tDeserializeSScanDbReq(pReq->pCont, pReq->contLen, &scanReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("db:%s, start to scan", scanReq.db);

  pDb = mndAcquireDb(pMnode, scanReq.db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SCAN_DB, pDb), NULL, _OVER);

  code = mndScanDb(pMnode, pReq, pDb, scanReq.timeRange, scanReq.vgroupIds);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Log the code actually being returned: not every failure path above sets
    // terrno (e.g. the invalid-message path), so terrstr() could be stale.
    mError("db:%s, failed to process scan db req since %s", scanReq.db, tstrerror(code));
  }

  mndReleaseDb(pMnode, pDb);
  tFreeSScanDbReq(&scanReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc