• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4808

16 Oct 2025 11:40AM UTC coverage: 57.938% (-0.6%) from 58.524%
#4808

push

travis-ci

web-flow
fix(tref): increase TSDB_REF_OBJECTS from 100 to 2000 for improved reference handling (#33281)

137662 of 303532 branches covered (45.35%)

Branch coverage included in aggregate %.

209234 of 295200 relevant lines covered (70.88%)

4035326.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.51
/source/dnode/mnode/impl/src/mndScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndScan.h"
16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndScan.h"
21
#include "mndScanDetail.h"
22
#include "mndShow.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tmisce.h"
26
#include "tmsgcb.h"
27

28
#define MND_SCAN_VER_NUMBER 1
29
#define MND_SCAN_ID_LEN     11
30

31
static int32_t  mndProcessScanTimer(SRpcMsg *pReq);
32
static SSdbRaw *mndScanActionEncode(SScanObj *pScan);
33
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw);
34
static int32_t  mndScanActionInsert(SSdb *pSdb, SScanObj *pScan);
35
static int32_t  mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan);
36
static int32_t  mndScanActionDelete(SSdb *pSdb, SScanObj *pScan);
37
static int32_t  mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
38
static int32_t  mndProcessKillScanReq(SRpcMsg *pReq);
39
static int32_t  mndProcessQueryScanRsp(SRpcMsg *pReq);
40

41
int32_t mndInitScan(SMnode *pMnode) {
1,338✔
42
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SCAN, mndRetrieveScan);
1,338✔
43
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SCAN, mndProcessKillScanReq);
1,338✔
44
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SCAN_PROGRESS_RSP, mndProcessQueryScanRsp);
1,338✔
45
  mndSetMsgHandle(pMnode, TDMT_MND_SCAN_TIMER, mndProcessScanTimer);
1,338✔
46
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SCAN_RSP, mndTransProcessRsp);
1,338✔
47

48
  SSdbTable table = {
1,338✔
49
      .sdbType = SDB_SCAN,
50
      .keyType = SDB_KEY_INT32,
51
      .encodeFp = (SdbEncodeFp)mndScanActionEncode,
52
      .decodeFp = (SdbDecodeFp)mndScanActionDecode,
53
      .insertFp = (SdbInsertFp)mndScanActionInsert,
54
      .updateFp = (SdbUpdateFp)mndScanActionUpdate,
55
      .deleteFp = (SdbDeleteFp)mndScanActionDelete,
56
  };
57

58
  return sdbSetTable(pMnode->pSdb, table);
1,338✔
59
}
60

61
void mndCleanupScan(SMnode *pMnode) { mDebug("mnd scan cleanup"); }
1,338✔
62

63
void tFreeScanObj(SScanObj *pScan) {}
10✔
64

65
static int32_t tSerializeSScanObj(void *buf, int32_t bufLen, const SScanObj *pObj) {
24✔
66
  SEncoder encoder = {0};
24✔
67
  int32_t  code = 0;
24✔
68
  int32_t  lino;
69
  int32_t  tlen;
70
  tEncoderInit(&encoder, buf, bufLen);
24✔
71

72
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
24!
73
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->scanId));
48!
74
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
48!
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
48!
76
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
48!
77

78
  tEndEncode(&encoder);
24✔
79

80
_exit:
24✔
81
  if (code) {
24!
82
    tlen = code;
×
83
  } else {
84
    tlen = encoder.pos;
24✔
85
  }
86
  tEncoderClear(&encoder);
24✔
87
  return tlen;
24✔
88
}
89

90
int32_t tDeserializeSScanObj(void *buf, int32_t bufLen, SScanObj *pObj) {
10✔
91
  int32_t  code = 0;
10✔
92
  int32_t  lino;
93
  SDecoder decoder = {0};
10✔
94
  tDecoderInit(&decoder, buf, bufLen);
10✔
95

96
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
10!
97
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->scanId));
20!
98
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
10!
99
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
20!
100
  if (!tDecodeIsEnd(&decoder)) {
10!
101
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
20!
102
  } else {
103
    pObj->dbUid = 0;
×
104
  }
105

106
  tEndDecode(&decoder);
10✔
107

108
_exit:
10✔
109
  tDecoderClear(&decoder);
10✔
110
  return code;
10✔
111
}
112

113
static SSdbRaw *mndScanActionEncode(SScanObj *pScan) {
12✔
114
  int32_t code = 0;
12✔
115
  int32_t lino = 0;
12✔
116
  terrno = TSDB_CODE_SUCCESS;
12✔
117

118
  void    *buf = NULL;
12✔
119
  SSdbRaw *pRaw = NULL;
12✔
120

121
  int32_t tlen = tSerializeSScanObj(NULL, 0, pScan);
12✔
122
  if (tlen < 0) {
12!
123
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
124
    goto OVER;
×
125
  }
126

127
  int32_t size = sizeof(int32_t) + tlen;
12✔
128
  pRaw = sdbAllocRaw(SDB_SCAN, MND_SCAN_VER_NUMBER, size);
12✔
129
  if (pRaw == NULL) {
12!
130
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
131
    goto OVER;
×
132
  }
133

134
  buf = taosMemoryMalloc(tlen);
12!
135
  if (buf == NULL) {
12!
136
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
137
    goto OVER;
×
138
  }
139

140
  tlen = tSerializeSScanObj(buf, tlen, pScan);
12✔
141
  if (tlen < 0) {
12!
142
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
143
    goto OVER;
×
144
  }
145

146
  int32_t dataPos = 0;
12✔
147
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
12!
148
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
12!
149
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
12!
150

151
OVER:
12✔
152
  taosMemoryFreeClear(buf);
12!
153
  if (terrno != TSDB_CODE_SUCCESS) {
12!
154
    mError("scan:%" PRId32 ", failed to encode to raw:%p since %s", pScan->scanId, pRaw, terrstr());
×
155
    sdbFreeRaw(pRaw);
×
156
    return NULL;
×
157
  }
158

159
  mTrace("scan:%" PRId32 ", encode to raw:%p, row:%p", pScan->scanId, pRaw, pScan);
12!
160
  return pRaw;
12✔
161
}
162

163
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw) {
10✔
164
  int32_t   code = 0;
10✔
165
  int32_t   lino = 0;
10✔
166
  SSdbRow  *pRow = NULL;
10✔
167
  SScanObj *pScan = NULL;
10✔
168
  void     *buf = NULL;
10✔
169
  terrno = TSDB_CODE_SUCCESS;
10✔
170

171
  int8_t sver = 0;
10✔
172
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
10!
173
    goto OVER;
×
174
  }
175

176
  if (sver != MND_SCAN_VER_NUMBER) {
10!
177
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
178
    mError("scan read invalid ver, data ver: %d, curr ver: %d", sver, MND_SCAN_VER_NUMBER);
×
179
    goto OVER;
×
180
  }
181

182
  pRow = sdbAllocRow(sizeof(SScanObj));
10✔
183
  if (pRow == NULL) {
10!
184
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
185
    goto OVER;
×
186
  }
187

188
  pScan = sdbGetRowObj(pRow);
10✔
189
  if (pScan == NULL) {
10!
190
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
191
    goto OVER;
×
192
  }
193

194
  int32_t tlen;
195
  int32_t dataPos = 0;
10✔
196
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
10!
197
  buf = taosMemoryMalloc(tlen + 1);
10!
198
  if (buf == NULL) {
10!
199
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
200
    goto OVER;
×
201
  }
202
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
10!
203

204
  if ((terrno = tDeserializeSScanObj(buf, tlen, pScan)) < 0) {
10!
205
    goto OVER;
×
206
  }
207

208
OVER:
10✔
209
  taosMemoryFreeClear(buf);
10!
210
  if (terrno != TSDB_CODE_SUCCESS) {
10!
211
    mError("scan:%" PRId32 ", failed to decode from raw:%p since %s", pScan->scanId, pRaw, terrstr());
×
212
    taosMemoryFreeClear(pRow);
×
213
    return NULL;
×
214
  }
215

216
  mTrace("scan:%" PRId32 ", decode from raw:%p, row:%p", pScan->scanId, pRaw, pScan);
10!
217
  return pRow;
10✔
218
}
219

220
static int32_t mndScanActionInsert(SSdb *pSdb, SScanObj *pScan) {
4✔
221
  mTrace("scan:%" PRId32 ", perform insert action", pScan->scanId);
4!
222
  return 0;
4✔
223
}
224

225
static int32_t mndScanActionDelete(SSdb *pSdb, SScanObj *pScan) {
10✔
226
  mTrace("scan:%" PRId32 ", perform delete action", pScan->scanId);
10!
227
  tFreeScanObj(pScan);
10✔
228
  return 0;
10✔
229
}
230

231
static int32_t mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan) {
2✔
232
  mTrace("scan:%" PRId32 ", perform update action, old row:%p new row:%p", pOldScan->scanId, pOldScan, pNewScan);
2!
233

234
  return 0;
2✔
235
}
236

237
static SScanObj *mndAcquireScan(SMnode *pMnode, int64_t scanId) {
22✔
238
  SSdb     *pSdb = pMnode->pSdb;
22✔
239
  SScanObj *pScan = sdbAcquire(pSdb, SDB_SCAN, &scanId);
22✔
240
  if (pScan == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
22!
241
    terrno = TSDB_CODE_SUCCESS;
×
242
  }
243
  return pScan;
22✔
244
}
245

246
static void mndReleaseScan(SMnode *pMnode, SScanObj *pScan) {
22✔
247
  SSdb *pSdb = pMnode->pSdb;
22✔
248
  sdbRelease(pSdb, pScan);
22✔
249
  pScan = NULL;
22✔
250
}
22✔
251

252
static int32_t mndScanGetDbInfo(SMnode *pMnode, int32_t scanId, char *dbname, int32_t len, int64_t *dbUid) {
8✔
253
  int32_t   code = 0;
8✔
254
  SScanObj *pScan = mndAcquireScan(pMnode, scanId);
8✔
255
  if (pScan == NULL) {
8!
256
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
257
    if (terrno != 0) code = terrno;
×
258
    TAOS_RETURN(code);
×
259
  }
260

261
  tstrncpy(dbname, pScan->dbname, len);
8✔
262
  if (dbUid) *dbUid = pScan->dbUid;
8!
263
  mndReleaseScan(pMnode, pScan);
8✔
264
  TAOS_RETURN(code);
8✔
265
}
266

267
// scan db
268
int32_t mndAddScanToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SDbObj *pDb, SScanDbRsp *rsp) {
6✔
269
  int32_t code = 0;
6✔
270
  pScan->scanId = tGenIdPI32();
6✔
271

272
  tstrncpy(pScan->dbname, pDb->name, sizeof(pScan->dbname));
6✔
273
  pScan->dbUid = pDb->uid;
6✔
274

275
  pScan->startTime = taosGetTimestampMs();
6✔
276

277
  SSdbRaw *pVgRaw = mndScanActionEncode(pScan);
6✔
278
  if (pVgRaw == NULL) {
6!
279
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
280
    if (terrno != 0) code = terrno;
×
281
    TAOS_RETURN(code);
×
282
  }
283
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
6!
284
    sdbFreeRaw(pVgRaw);
×
285
    TAOS_RETURN(code);
×
286
  }
287

288
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
6!
289
    sdbFreeRaw(pVgRaw);
×
290
    TAOS_RETURN(code);
×
291
  }
292

293
  rsp->scanId = pScan->scanId;
6✔
294

295
  return 0;
6✔
296
}
297

298
// retrieve scan
299
static int32_t mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
44✔
300
  SMnode   *pMnode = pReq->info.node;
44✔
301
  SSdb     *pSdb = pMnode->pSdb;
44✔
302
  int32_t   numOfRows = 0;
44✔
303
  SScanObj *pScan = NULL;
44✔
304
  char     *sep = NULL;
44✔
305
  SDbObj   *pDb = NULL;
44✔
306
  int32_t   code = 0;
44✔
307
  int32_t   lino = 0;
44✔
308

309
  if (strlen(pShow->db) > 0) {
44!
310
    sep = strchr(pShow->db, '.');
×
311
    if (sep &&
×
312
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
313
      sep++;
×
314
    } else {
315
      pDb = mndAcquireDb(pMnode, pShow->db);
×
316
      if (pDb == NULL) return terrno;
×
317
    }
318
  }
319

320
  while (numOfRows < rows) {
86!
321
    pShow->pIter = sdbFetch(pSdb, SDB_SCAN, pShow->pIter, (void **)&pScan);
86✔
322
    if (pShow->pIter == NULL) break;
86✔
323

324
    SColumnInfoData *pColInfo;
325
    SName            n;
326
    int32_t          cols = 0;
42✔
327

328
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
42✔
329

330
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
42✔
331
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->scanId, false), pScan, &lino, _OVER);
42!
332

333
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
42✔
334
    if (pDb != NULL || !IS_SYS_DBNAME(pScan->dbname)) {
84!
335
      SName name = {0};
42✔
336
      TAOS_CHECK_GOTO(tNameFromString(&name, pScan->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
42!
337
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
42✔
338
    } else {
339
      tstrncpy(varDataVal(tmpBuf), pScan->dbname, TSDB_SHOW_SQL_LEN);
×
340
    }
341
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
42✔
342
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pScan, &lino, _OVER);
42!
343

344
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
42✔
345
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->startTime, false), pScan, &lino,
42!
346
                        _OVER);
347

348
    numOfRows++;
42✔
349
    sdbRelease(pSdb, pScan);
42✔
350
  }
351

352
_OVER:
×
353
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
44!
354
  pShow->numOfRows += numOfRows;
44✔
355
  mndReleaseDb(pMnode, pDb);
44✔
356
  return numOfRows;
44✔
357
}
358

359
// kill scan
360
static void *mndBuildKillScanReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t scanId, int32_t dnodeid) {
2✔
361
  SVKillScanReq req = {0};
2✔
362
  req.scanId = scanId;
2✔
363
  req.vgId = pVgroup->vgId;
2✔
364
  req.dnodeId = dnodeid;
2✔
365
  terrno = 0;
2✔
366

367
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
2!
368
  int32_t contLen = tSerializeSVKillScanReq(NULL, 0, &req);
2✔
369
  if (contLen < 0) {
2!
370
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
371
    return NULL;
×
372
  }
373
  contLen += sizeof(SMsgHead);
2✔
374

375
  void *pReq = taosMemoryMalloc(contLen);
2!
376
  if (pReq == NULL) {
2!
377
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
378
    return NULL;
×
379
  }
380

381
  SMsgHead *pHead = pReq;
2✔
382
  pHead->contLen = htonl(contLen);
2✔
383
  pHead->vgId = htonl(pVgroup->vgId);
2✔
384

385
  mTrace("vgId:%d, build scan vnode config req, contLen:%d", pVgroup->vgId, contLen);
2!
386
  int32_t ret = 0;
2✔
387
  if ((ret = tSerializeSVKillScanReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
2!
388
    taosMemoryFreeClear(pReq);
×
389
    terrno = ret;
×
390
    return NULL;
×
391
  }
392
  *pContLen = contLen;
2✔
393
  return pReq;
2✔
394
}
395

396
static int32_t mndAddKillScanAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t scanId, int32_t dnodeid) {
2✔
397
  int32_t      code = 0;
2✔
398
  STransAction action = {0};
2✔
399

400
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
2✔
401
  if (pDnode == NULL) {
2!
402
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
403
    if (terrno != 0) code = terrno;
×
404
    TAOS_RETURN(code);
×
405
  }
406
  action.epSet = mndGetDnodeEpset(pDnode);
2✔
407
  mndReleaseDnode(pMnode, pDnode);
2✔
408

409
  int32_t contLen = 0;
2✔
410
  void   *pReq = mndBuildKillScanReq(pMnode, pVgroup, &contLen, scanId, dnodeid);
2✔
411
  if (pReq == NULL) {
2!
412
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
413
    if (terrno != 0) code = terrno;
×
414
    TAOS_RETURN(code);
×
415
  }
416

417
  action.pCont = pReq;
2✔
418
  action.contLen = contLen;
2✔
419
  action.msgType = TDMT_VND_KILL_SCAN;
2✔
420

421
  mTrace("trans:%d, kill scan msg len:%d", pTrans->id, contLen);
2!
422

423
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2!
424
    taosMemoryFree(pReq);
×
425
    TAOS_RETURN(code);
×
426
  }
427

428
  return 0;
2✔
429
}
430

431
static int32_t mndKillScan(SMnode *pMnode, SRpcMsg *pReq, SScanObj *pScan) {
2✔
432
  int32_t code = 0;
2✔
433
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-scan");
2✔
434
  if (pTrans == NULL) {
2!
435
    mError("scan:%" PRId32 ", failed to drop since %s", pScan->scanId, terrstr());
×
436
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
437
    if (terrno != 0) code = terrno;
×
438
    TAOS_RETURN(code);
×
439
  }
440
  mInfo("trans:%d, used to kill scan:%" PRId32, pTrans->id, pScan->scanId);
2!
441

442
  mndTransSetDbName(pTrans, pScan->dbname, NULL);
2✔
443

444
  SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
2✔
445
  if (pCommitRaw == NULL) {
2!
446
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
447
    if (terrno != 0) code = terrno;
×
448
    mndTransDrop(pTrans);
×
449
    TAOS_RETURN(code);
×
450
  }
451
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
2!
452
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
453
    mndTransDrop(pTrans);
×
454
    TAOS_RETURN(code);
×
455
  }
456
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
2!
457
    mndTransDrop(pTrans);
×
458
    TAOS_RETURN(code);
×
459
  }
460

461
  void *pIter = NULL;
2✔
462
  while (1) {
4✔
463
    SScanDetailObj *pDetail = NULL;
6✔
464
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
6✔
465
    if (pIter == NULL) break;
6✔
466

467
    if (pDetail->scanId == pScan->scanId) {
4✔
468
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
2✔
469
      if (pVgroup == NULL) {
2!
470
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
471
        sdbCancelFetch(pMnode->pSdb, pIter);
×
472
        sdbRelease(pMnode->pSdb, pDetail);
×
473
        mndTransDrop(pTrans);
×
474
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
475
        if (terrno != 0) code = terrno;
×
476
        TAOS_RETURN(code);
×
477
      }
478

479
      if ((code = mndAddKillScanAction(pMnode, pTrans, pVgroup, pScan->scanId, pDetail->dnodeId)) != 0) {
2!
480
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
481
        sdbCancelFetch(pMnode->pSdb, pIter);
×
482
        sdbRelease(pMnode->pSdb, pDetail);
×
483
        mndTransDrop(pTrans);
×
484
        TAOS_RETURN(code);
×
485
      }
486

487
      mndReleaseVgroup(pMnode, pVgroup);
2✔
488
    }
489

490
    sdbRelease(pMnode->pSdb, pDetail);
4✔
491
  }
492

493
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
2!
494
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
495
    mndTransDrop(pTrans);
×
496
    TAOS_RETURN(code);
×
497
  }
498

499
  mndTransDrop(pTrans);
2✔
500
  return 0;
2✔
501
}
502

503
static int32_t mndProcessKillScanReq(SRpcMsg *pReq) {
2✔
504
  int32_t      code = 0;
2✔
505
  int32_t      lino = 0;
2✔
506
  SKillScanReq killScanReq = {0};
2✔
507

508
  if ((code = tDeserializeSKillScanReq(pReq->pCont, pReq->contLen, &killScanReq)) != 0) {
2!
509
    TAOS_RETURN(code);
×
510
  }
511

512
  mInfo("start to kill scan:%" PRId32, killScanReq.scanId);
2!
513

514
  SMnode   *pMnode = pReq->info.node;
2✔
515
  SScanObj *pScan = mndAcquireScan(pMnode, killScanReq.scanId);
2✔
516
  if (pScan == NULL) {
2!
517
    code = TSDB_CODE_MND_INVALID_SCAN_ID;
×
518
    tFreeSKillScanReq(&killScanReq);
×
519
    TAOS_RETURN(code);
×
520
  }
521

522
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SCAN_DB), &lino, _OVER);
2!
523

524
  TAOS_CHECK_GOTO(mndKillScan(pMnode, pReq, pScan), &lino, _OVER);
2!
525

526
  code = TSDB_CODE_ACTION_IN_PROGRESS;
2✔
527

528
#if 0
529
  char    obj[TSDB_INT32_ID_LEN] = {0};
530
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pScan->scanId);
531
  if ((uint32_t)nBytes < sizeof(obj)) {
532
    auditRecord(pReq, pMnode->clusterId, "killScan", pScan->dbname, obj, killScanReq.sql, killScanReq.sqlLen);
533
  } else {
534
    mError("scan:%" PRId32 " failed to audit since %s", pScan->scanId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
535
  }
536
#endif
537
_OVER:
2✔
538
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2!
539
    mError("failed to kill scan %" PRId32 " since %s", killScanReq.scanId, terrstr());
×
540
  }
541

542
  tFreeSKillScanReq(&killScanReq);
2✔
543
  mndReleaseScan(pMnode, pScan);
2✔
544

545
  TAOS_RETURN(code);
2✔
546
}
547

548
// update progress
549
static int32_t mndUpdateScanProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t scanId, SQueryScanProgressRsp *rsp) {
10✔
550
  int32_t code = 0;
10✔
551

552
  void *pIter = NULL;
10✔
553
  while (1) {
4✔
554
    SScanDetailObj *pDetail = NULL;
14✔
555
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
14✔
556
    if (pIter == NULL) break;
14!
557

558
    if (pDetail->scanId == scanId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
14!
559
      pDetail->newNumberFileset = rsp->numberFileset;
10✔
560
      pDetail->newFinished = rsp->finished;
10✔
561
      pDetail->progress = rsp->progress;
10✔
562
      pDetail->remainingTime = rsp->remainingTime;
10✔
563

564
      sdbCancelFetch(pMnode->pSdb, pIter);
10✔
565
      sdbRelease(pMnode->pSdb, pDetail);
10✔
566

567
      TAOS_RETURN(code);
10✔
568
    }
569

570
    sdbRelease(pMnode->pSdb, pDetail);
4✔
571
  }
572

573
  return TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST;
×
574
}
575

576
static int32_t mndProcessQueryScanRsp(SRpcMsg *pReq) {
10✔
577
  int32_t               code = 0;
10✔
578
  SQueryScanProgressRsp req = {0};
10✔
579
  if (pReq->code != 0) {
10!
580
    mError("received wrong scan response, req code is %s", tstrerror(pReq->code));
×
581
    TAOS_RETURN(pReq->code);
×
582
  }
583
  code = tDeserializeSQueryScanProgressRsp(pReq->pCont, pReq->contLen, &req);
10✔
584
  if (code != 0) {
10!
585
    mError("failed to deserialize vnode-query-scan-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
586
           pReq->contLen);
587
    TAOS_RETURN(code);
×
588
  }
589

590
  mDebug("scan:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId, req.vgId,
10!
591
         req.dnodeId, req.numberFileset, req.finished);
592

593
  SMnode *pMnode = pReq->info.node;
10✔
594

595
  code = mndUpdateScanProgress(pMnode, pReq, req.scanId, &req);
10✔
596
  if (code != 0) {
10!
597
    mError("scan:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId,
×
598
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
599
    TAOS_RETURN(code);
×
600
  }
601

602
  TAOS_RETURN(code);
10✔
603
}
604

605
// timer
606
static void mndScanSendProgressReq(SMnode *pMnode, SScanObj *pScan) {
8✔
607
  void *pIter = NULL;
8✔
608

609
  while (1) {
13✔
610
    SScanDetailObj *pDetail = NULL;
21✔
611
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
21✔
612
    if (pIter == NULL) break;
21✔
613

614
    if (pDetail->scanId == pScan->scanId) {
13✔
615
      SEpSet epSet = {0};
10✔
616

617
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
10✔
618
      if (pDnode == NULL) break;
10!
619
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
10!
620
        sdbRelease(pMnode->pSdb, pDetail);
×
621
        continue;
×
622
      }
623
      mndReleaseDnode(pMnode, pDnode);
10✔
624

625
      SQueryScanProgressReq req;
626
      req.scanId = pDetail->scanId;
10✔
627
      req.vgId = pDetail->vgId;
10✔
628
      req.dnodeId = pDetail->dnodeId;
10✔
629

630
      int32_t contLen = tSerializeSQueryScanProgressReq(NULL, 0, &req);
10✔
631
      if (contLen < 0) {
10!
632
        sdbRelease(pMnode->pSdb, pDetail);
×
633
        continue;
×
634
      }
635

636
      contLen += sizeof(SMsgHead);
10✔
637

638
      SMsgHead *pHead = rpcMallocCont(contLen);
10✔
639
      if (pHead == NULL) {
10!
640
        sdbRelease(pMnode->pSdb, pDetail);
×
641
        continue;
×
642
      }
643

644
      pHead->contLen = htonl(contLen);
10✔
645
      pHead->vgId = htonl(pDetail->vgId);
10✔
646

647
      if (tSerializeSQueryScanProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
10!
648
        sdbRelease(pMnode->pSdb, pDetail);
×
649
        continue;
×
650
      }
651

652
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SCAN_PROGRESS, .contLen = contLen};
10✔
653

654
      rpcMsg.pCont = pHead;
10✔
655

656
      char    detail[1024] = {0};
10✔
657
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
20!
658
                              TMSG_INFO(TDMT_VND_QUERY_SCAN_PROGRESS), epSet.numOfEps, epSet.inUse);
20✔
659
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
20✔
660
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
10✔
661
      }
662

663
      mDebug("scan:%d, send update progress msg to %s", pDetail->scanId, detail);
10!
664

665
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
10!
666
        sdbRelease(pMnode->pSdb, pDetail);
×
667
        continue;
×
668
      }
669
    }
670

671
    sdbRelease(pMnode->pSdb, pDetail);
13✔
672
  }
673
}
8✔
674

675
static int32_t mndSaveScanProgress(SMnode *pMnode, int32_t scanId) {
8✔
676
  int32_t code = 0;
8✔
677
  bool    needSave = false;
8✔
678
  void   *pIter = NULL;
8✔
679
  while (1) {
13✔
680
    SScanDetailObj *pDetail = NULL;
21✔
681
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
21✔
682
    if (pIter == NULL) break;
21✔
683

684
    if (pDetail->scanId == scanId) {
13✔
685
      mDebug(
10!
686
          "scan:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
687
          "newNumberFileset:%d, newFinished:%d",
688
          pDetail->scanId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
689
          pDetail->newNumberFileset, pDetail->newFinished);
690

691
      // these 2 number will jump back after dnode restart, so < is not used here
692
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
10!
693
        needSave = true;
5✔
694
    }
695

696
    sdbRelease(pMnode->pSdb, pDetail);
13✔
697
  }
698

699
  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
8✔
700
  int64_t dbUid = 0;
8✔
701
  TAOS_CHECK_RETURN(mndScanGetDbInfo(pMnode, scanId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));
8!
702

703
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
8!
704
    needSave = true;
×
705
    mWarn("scan:%" PRId32 ", no db exist, set needSave:%s", scanId, dbname);
×
706
  }
707

708
  if (!needSave) {
8✔
709
    mDebug("scan:%" PRId32 ", no need to save", scanId);
4!
710
    TAOS_RETURN(code);
4✔
711
  }
712

713
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-scan-progress");
4✔
714
  if (pTrans == NULL) {
4!
715
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
716
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
717
    if (terrno != 0) code = terrno;
×
718
    TAOS_RETURN(code);
×
719
  }
720
  mInfo("scan:%d, trans:%d, used to update scan progress.", scanId, pTrans->id);
4!
721

722
  mndTransSetDbName(pTrans, dbname, NULL);
4✔
723

724
  pIter = NULL;
4✔
725
  while (1) {
6✔
726
    SScanDetailObj *pDetail = NULL;
10✔
727
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
10✔
728
    if (pIter == NULL) break;
10✔
729

730
    if (pDetail->scanId == scanId) {
6✔
731
      mInfo(
5!
732
          "scan:%d, trans:%d, check scan progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
733
          "newNumberFileset:%d, newFinished:%d",
734
          pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
735
          pDetail->newNumberFileset, pDetail->newFinished);
736

737
      pDetail->numberFileset = pDetail->newNumberFileset;
5✔
738
      pDetail->finished = pDetail->newFinished;
5✔
739

740
      SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
5✔
741
      if (pCommitRaw == NULL) {
5!
742
        sdbCancelFetch(pMnode->pSdb, pIter);
×
743
        sdbRelease(pMnode->pSdb, pDetail);
×
744
        mndTransDrop(pTrans);
×
745
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
746
        if (terrno != 0) code = terrno;
×
747
        TAOS_RETURN(code);
×
748
      }
749
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
5!
750
        mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
×
751
        sdbCancelFetch(pMnode->pSdb, pIter);
×
752
        sdbRelease(pMnode->pSdb, pDetail);
×
753
        mndTransDrop(pTrans);
×
754
        TAOS_RETURN(code);
×
755
      }
756
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
5!
757
        sdbCancelFetch(pMnode->pSdb, pIter);
×
758
        sdbRelease(pMnode->pSdb, pDetail);
×
759
        mndTransDrop(pTrans);
×
760
        TAOS_RETURN(code);
×
761
      }
762
    }
763

764
    sdbRelease(pMnode->pSdb, pDetail);
6✔
765
  }
766

767
  bool allFinished = true;
4✔
768
  pIter = NULL;
4✔
769
  while (1) {
6✔
770
    SScanDetailObj *pDetail = NULL;
10✔
771
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
10✔
772
    if (pIter == NULL) break;
10✔
773

774
    if (pDetail->scanId == scanId) {
6✔
775
      mInfo("scan:%d, trans:%d, check scan finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
5!
776
            pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
777

778
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
5!
779
        allFinished = false;
×
780
        sdbCancelFetch(pMnode->pSdb, pIter);
×
781
        sdbRelease(pMnode->pSdb, pDetail);
×
782
        break;
×
783
      }
784
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
5!
785
        allFinished = false;
×
786
        sdbCancelFetch(pMnode->pSdb, pIter);
×
787
        sdbRelease(pMnode->pSdb, pDetail);
×
788
        break;
×
789
      }
790
    }
791

792
    sdbRelease(pMnode->pSdb, pDetail);
6✔
793
  }
794

795
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
4!
796
    allFinished = true;
×
797
    mWarn("scan:%" PRId32 ", no db exist, set all finished:%s", scanId, dbname);
×
798
  }
799

800
  if (allFinished) {
4!
801
    mInfo("scan:%d, all finished", scanId);
4!
802
    pIter = NULL;
4✔
803
    while (1) {
6✔
804
      SScanDetailObj *pDetail = NULL;
10✔
805
      pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
10✔
806
      if (pIter == NULL) break;
10✔
807

808
      if (pDetail->scanId == scanId) {
6✔
809
        SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
5✔
810
        if (pCommitRaw == NULL) {
5!
811
          mndTransDrop(pTrans);
×
812
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
813
          if (terrno != 0) code = terrno;
×
814
          TAOS_RETURN(code);
×
815
        }
816
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
5!
817
          mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
×
818
          sdbCancelFetch(pMnode->pSdb, pIter);
×
819
          sdbRelease(pMnode->pSdb, pDetail);
×
820
          mndTransDrop(pTrans);
×
821
          TAOS_RETURN(code);
×
822
        }
823
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
5!
824
          sdbCancelFetch(pMnode->pSdb, pIter);
×
825
          sdbRelease(pMnode->pSdb, pDetail);
×
826
          mndTransDrop(pTrans);
×
827
          TAOS_RETURN(code);
×
828
        }
829
        mInfo("scan:%d, add drop scandetail action", pDetail->scanDetailId);
5!
830
      }
831

832
      sdbRelease(pMnode->pSdb, pDetail);
6✔
833
    }
834

835
    SScanObj *pScan = mndAcquireScan(pMnode, scanId);
4✔
836
    if (pScan == NULL) {
4!
837
      mndTransDrop(pTrans);
×
838
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
839
      if (terrno != 0) code = terrno;
×
840
      TAOS_RETURN(code);
×
841
    }
842
    SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
4✔
843
    mndReleaseScan(pMnode, pScan);
4✔
844
    if (pCommitRaw == NULL) {
4!
845
      mndTransDrop(pTrans);
×
846
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
847
      if (terrno != 0) code = terrno;
×
848
      TAOS_RETURN(code);
×
849
    }
850
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
4!
851
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
×
852
      mndTransDrop(pTrans);
×
853
      TAOS_RETURN(code);
×
854
    }
855
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
4!
856
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
×
857
      mndTransDrop(pTrans);
×
858
      TAOS_RETURN(code);
×
859
    }
860
    mInfo("scan:%d, add drop scan action", pScan->scanId);
4!
861
  }
862

863
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
4!
864
    mError("scan:%d, trans:%d, failed to prepare since %s", scanId, pTrans->id, terrstr());
×
865
    mndTransDrop(pTrans);
×
866
    TAOS_RETURN(code);
×
867
  }
868

869
  mndTransDrop(pTrans);
4✔
870
  return 0;
4✔
871
}
872

873
static void mndScanPullup(SMnode *pMnode) {
3,586✔
874
  int32_t code = 0;
3,586✔
875
  SSdb   *pSdb = pMnode->pSdb;
3,586✔
876
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_SCAN), sizeof(int32_t));
3,586✔
877
  if (pArray == NULL) return;
3,586!
878

879
  void *pIter = NULL;
3,586✔
880
  while (1) {
8✔
881
    SScanObj *pScan = NULL;
3,594✔
882
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
3,594✔
883
    if (pIter == NULL) break;
3,594✔
884
    if (taosArrayPush(pArray, &pScan->scanId) == NULL) {
16!
885
      mError("failed to push scan id:%d into array, but continue pull up", pScan->scanId);
×
886
    }
887
    sdbRelease(pSdb, pScan);
8✔
888
  }
889

890
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
3,594✔
891
    mInfo("begin to pull up");
8!
892
    int32_t  *pScanId = taosArrayGet(pArray, i);
8✔
893
    SScanObj *pScan = mndAcquireScan(pMnode, *pScanId);
8✔
894
    if (pScan != NULL) {
8!
895
      mInfo("scan:%d, begin to pull up", pScan->scanId);
8!
896
      mndScanSendProgressReq(pMnode, pScan);
8✔
897
      if ((code = mndSaveScanProgress(pMnode, pScan->scanId)) != 0) {
8!
898
        mError("scan:%d, failed to save scan progress since %s", pScan->scanId, tstrerror(code));
×
899
      }
900
      mndReleaseScan(pMnode, pScan);
8✔
901
    }
902
  }
903
  taosArrayDestroy(pArray);
3,586✔
904
}
905

906
static int32_t mndBuildScanDbRsp(SScanDbRsp *pScanRsp, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
4✔
907
  int32_t code = 0;
4✔
908
  int32_t rspLen = tSerializeSScanDbRsp(NULL, 0, pScanRsp);
4✔
909
  void   *pRsp = NULL;
4✔
910
  if (useRpcMalloc) {
4!
911
    pRsp = rpcMallocCont(rspLen);
×
912
  } else {
913
    pRsp = taosMemoryMalloc(rspLen);
4!
914
  }
915

916
  if (pRsp == NULL) {
4!
917
    code = TSDB_CODE_OUT_OF_MEMORY;
×
918
    TAOS_RETURN(code);
×
919
  }
920

921
  (void)tSerializeSScanDbRsp(pRsp, rspLen, pScanRsp);
4✔
922
  *pRspLen = rspLen;
4✔
923
  *ppRsp = pRsp;
4✔
924
  TAOS_RETURN(code);
4✔
925
}
926

927
static int32_t mndSetScanDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs) {
6✔
928
  int32_t code = 0;
6✔
929
  SDbObj  dbObj = {0};
6✔
930
  memcpy(&dbObj, pDb, sizeof(SDbObj));
6✔
931
  dbObj.scanStartTime = scanTs;
6✔
932

933
  SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
6✔
934
  if (pCommitRaw == NULL) {
6!
935
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
936
    if (terrno != 0) code = terrno;
×
937
    TAOS_RETURN(code);
×
938
  }
939
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
6!
940
    sdbFreeRaw(pCommitRaw);
×
941
    TAOS_RETURN(code);
×
942
  }
943

944
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
6!
945
    sdbFreeRaw(pCommitRaw);
×
946
    TAOS_RETURN(code);
×
947
  }
948
  TAOS_RETURN(code);
6✔
949
}
950

951
static void *mndBuildScanVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t scanTs,
5✔
952
                                  STimeWindow tw) {
953
  SScanVnodeReq scanReq = {0};
5✔
954
  scanReq.dbUid = pDb->uid;
5✔
955
  scanReq.scanStartTime = scanTs;
5✔
956
  scanReq.tw = tw;
5✔
957
  tstrncpy(scanReq.db, pDb->name, TSDB_DB_FNAME_LEN);
5✔
958

959
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
5!
960
  int32_t contLen = tSerializeSScanVnodeReq(NULL, 0, &scanReq);
5✔
961
  if (contLen < 0) {
5!
962
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
963
    return NULL;
×
964
  }
965
  contLen += sizeof(SMsgHead);
5✔
966

967
  void *pReq = taosMemoryMalloc(contLen);
5!
968
  if (pReq == NULL) {
5!
969
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
970
    return NULL;
×
971
  }
972

973
  SMsgHead *pHead = pReq;
5✔
974
  pHead->contLen = htonl(contLen);
5✔
975
  pHead->vgId = htonl(pVgroup->vgId);
5✔
976

977
  if (tSerializeSScanVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &scanReq) < 0) {
5!
978
    taosMemoryFree(pReq);
×
979
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
980
    return NULL;
×
981
  }
982
  *pContLen = contLen;
5✔
983
  return pReq;
5✔
984
}
985

986
static int32_t mndBuildScanVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t scanTs,
5✔
987
                                        STimeWindow tw) {
988
  int32_t      code = 0;
5✔
989
  STransAction action = {0};
5✔
990
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
5✔
991

992
  int32_t contLen = 0;
5✔
993
  void   *pReq = mndBuildScanVnodeReq(pMnode, pDb, pVgroup, &contLen, scanTs, tw);
5✔
994
  if (pReq == NULL) {
5!
995
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
996
    if (terrno != 0) code = terrno;
×
997
    TAOS_RETURN(code);
×
998
  }
999

1000
  action.pCont = pReq;
5✔
1001
  action.contLen = contLen;
5✔
1002
  action.msgType = TDMT_VND_SCAN;
5✔
1003

1004
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
5!
1005
    taosMemoryFree(pReq);
×
1006
    TAOS_RETURN(code);
×
1007
  }
1008

1009
  TAOS_RETURN(code);
5✔
1010
}
1011

1012
extern int32_t mndAddScanDetailToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SVgObj *pVgroup,
1013
                                      SVnodeGid *pVgid, int32_t index);
1014

1015
static int32_t mndSetScanDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs, STimeWindow tw,
6✔
1016
                                       SArray *vgroupIds, SScanDbRsp *pScanRsp) {
1017
  int32_t code = 0;
6✔
1018
  SSdb   *pSdb = pMnode->pSdb;
6✔
1019
  void   *pIter = NULL;
6✔
1020

1021
  SScanObj scan;
1022
  if ((code = mndAddScanToTran(pMnode, pTrans, &scan, pDb, pScanRsp)) != 0) {
6!
1023
    TAOS_RETURN(code);
×
1024
  }
1025

1026
  int32_t j = 0;
6✔
1027
  int32_t numOfVgroups = taosArrayGetSize(vgroupIds);
6✔
1028
  if (numOfVgroups > 0) {
6✔
1029
    for (int32_t i = 0; i < numOfVgroups; i++) {
9✔
1030
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
7✔
1031
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
7✔
1032

1033
      if (pVgroup == NULL) {
7!
1034
        mError("db:%s, vgroup:%" PRId64 " not exist", pDb->name, vgId);
×
1035
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
×
1036
      } else if (pVgroup->dbUid != pDb->uid) {
7✔
1037
        mError("db:%s, vgroup:%" PRId64 " not belong to db:%s", pDb->name, vgId, pDb->name);
2!
1038
        sdbRelease(pSdb, pVgroup);
2✔
1039
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
2✔
1040
      }
1041
      sdbRelease(pSdb, pVgroup);
5✔
1042
    }
1043

1044
    for (int32_t i = 0; i < numOfVgroups; i++) {
5✔
1045
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
3✔
1046
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
3✔
1047

1048
      if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
3!
1049
        sdbRelease(pSdb, pVgroup);
×
1050
        TAOS_RETURN(code);
×
1051
      }
1052

1053
      for (int32_t i = 0; i < pVgroup->replica; i++) {
6✔
1054
        SVnodeGid *gid = &pVgroup->vnodeGid[i];
3✔
1055
        if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
3!
1056
          sdbRelease(pSdb, pVgroup);
×
1057
          TAOS_RETURN(code);
×
1058
        }
1059
        j++;
3✔
1060
      }
1061
      sdbRelease(pSdb, pVgroup);
3✔
1062
    }
1063
  } else {
1064
    while (1) {
4✔
1065
      SVgObj *pVgroup = NULL;
6✔
1066
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
6✔
1067
      if (pIter == NULL) break;
6✔
1068

1069
      if (pVgroup->dbUid == pDb->uid) {
4✔
1070
        if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
2!
1071
          sdbCancelFetch(pSdb, pIter);
×
1072
          sdbRelease(pSdb, pVgroup);
×
1073
          TAOS_RETURN(code);
×
1074
        }
1075

1076
        for (int32_t i = 0; i < pVgroup->replica; i++) {
4✔
1077
          SVnodeGid *gid = &pVgroup->vnodeGid[i];
2✔
1078
          if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
2!
1079
            sdbCancelFetch(pSdb, pIter);
×
1080
            sdbRelease(pSdb, pVgroup);
×
1081
            TAOS_RETURN(code);
×
1082
          }
1083
          j++;
2✔
1084
        }
1085
      }
1086

1087
      sdbRelease(pSdb, pVgroup);
4✔
1088
    }
1089
  }
1090

1091
  TAOS_RETURN(code);
4✔
1092
}
1093

1094
static int32_t mndScanDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds) {
6✔
1095
  int32_t    code = 0;
6✔
1096
  int32_t    lino;
1097
  SScanDbRsp scanRsp = {0};
6✔
1098

1099
  bool  isExist = false;
6✔
1100
  void *pIter = NULL;
6✔
1101
  while (1) {
1✔
1102
    SScanObj *pScan = NULL;
7✔
1103
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
7✔
1104
    if (pIter == NULL) break;
7✔
1105

1106
    if (strcmp(pScan->dbname, pDb->name) == 0) {
1!
1107
      isExist = true;
×
1108
    }
1109
    sdbRelease(pMnode->pSdb, pScan);
1✔
1110
  }
1111
  if (isExist) {
6!
1112
    mInfo("scan db:%s already exist", pDb->name);
×
1113

1114
    if (pReq) {
×
1115
      int32_t rspLen = 0;
×
1116
      void   *pRsp = NULL;
×
1117
      scanRsp.scanId = 0;
×
1118
      scanRsp.bAccepted = false;
×
1119
      code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, true);
×
1120
      TSDB_CHECK_CODE(code, lino, _OVER);
×
1121

1122
      pReq->info.rsp = pRsp;
×
1123
      pReq->info.rspLen = rspLen;
×
1124
    }
1125

1126
    return TSDB_CODE_MND_SCAN_ALREADY_EXIST;
×
1127
  }
1128

1129
  int64_t scanTs = taosGetTimestampMs();
6✔
1130
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "scan-db");
6✔
1131
  if (pTrans == NULL) goto _OVER;
6!
1132

1133
  mInfo("trans:%d, used to scan db:%s", pTrans->id, pDb->name);
6!
1134
  mndTransSetDbName(pTrans, pDb->name, NULL);
6✔
1135
  code = mndTransCheckConflict(pMnode, pTrans);
6✔
1136
  TSDB_CHECK_CODE(code, lino, _OVER);
6!
1137

1138
  code = mndSetScanDbCommitLogs(pMnode, pTrans, pDb, scanTs);
6✔
1139
  TSDB_CHECK_CODE(code, lino, _OVER);
6!
1140

1141
  code = mndSetScanDbRedoActions(pMnode, pTrans, pDb, scanTs, tw, vgroupIds, &scanRsp);
6✔
1142
  TSDB_CHECK_CODE(code, lino, _OVER);
6✔
1143

1144
  if (pReq) {
4!
1145
    int32_t rspLen = 0;
4✔
1146
    void   *pRsp = NULL;
4✔
1147
    scanRsp.bAccepted = true;
4✔
1148
    code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, false);
4✔
1149
    TSDB_CHECK_CODE(code, lino, _OVER);
4!
1150
    mndTransSetRpcRsp(pTrans, pRsp, rspLen);
4✔
1151
  }
1152

1153
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
4!
1154
  code = 0;
4✔
1155

1156
_OVER:
6✔
1157
  mndTransDrop(pTrans);
6✔
1158
  TAOS_RETURN(code);
6✔
1159
}
1160

1161
static int32_t mndProcessScanTimer(SRpcMsg *pReq) {
3,586✔
1162
  mTrace("start to process scan timer");
3,586✔
1163
  mndScanPullup(pReq->info.node);
3,586✔
1164
  return 0;
3,586✔
1165
}
1166

1167
int32_t mndProcessScanDbReq(SRpcMsg *pReq) {
6✔
1168
  SMnode    *pMnode = pReq->info.node;
6✔
1169
  int32_t    code = -1;
6✔
1170
  SDbObj    *pDb = NULL;
6✔
1171
  SScanDbReq scanReq = {0};
6✔
1172

1173
  if (tDeserializeSScanDbReq(pReq->pCont, pReq->contLen, &scanReq) != 0) {
6!
1174
    code = TSDB_CODE_INVALID_MSG;
×
1175
    goto _OVER;
×
1176
  }
1177

1178
  mInfo("db:%s, start to scan", scanReq.db);
6!
1179

1180
  pDb = mndAcquireDb(pMnode, scanReq.db);
6✔
1181
  if (pDb == NULL) {
6!
1182
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1183
    if (terrno != 0) code = terrno;
×
1184
    goto _OVER;
×
1185
  }
1186

1187
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SCAN_DB, pDb), NULL, _OVER);
6!
1188

1189
  code = mndScanDb(pMnode, pReq, pDb, scanReq.timeRange, scanReq.vgroupIds);
6✔
1190
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
6✔
1191

1192
_OVER:
2✔
1193
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
6!
1194
    mError("db:%s, failed to process scan db req since %s", scanReq.db, terrstr());
2!
1195
  }
1196

1197
  mndReleaseDb(pMnode, pDb);
6✔
1198
  tFreeSScanDbReq(&scanReq);
6✔
1199
  TAOS_RETURN(code);
6✔
1200
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc