• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4878

11 Dec 2025 02:43AM UTC coverage: 64.569% (-0.02%) from 64.586%
#4878

push

travis-ci

guanshengliang
feat(TS-7270): internal dependence

307 of 617 new or added lines in 24 files covered. (49.76%)

3821 existing lines in 123 files now uncovered.

163630 of 253417 relevant lines covered (64.57%)

107598827.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.78
/source/dnode/mnode/impl/src/mndScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndScan.h"
16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndScan.h"
21
#include "mndScanDetail.h"
22
#include "mndShow.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tmisce.h"
26
#include "tmsgcb.h"
27

28
#define MND_SCAN_VER_NUMBER 1
29
#define MND_SCAN_ID_LEN     11
30

31
static int32_t  mndProcessScanTimer(SRpcMsg *pReq);
32
static SSdbRaw *mndScanActionEncode(SScanObj *pScan);
33
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw);
34
static int32_t  mndScanActionInsert(SSdb *pSdb, SScanObj *pScan);
35
static int32_t  mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan);
36
static int32_t  mndScanActionDelete(SSdb *pSdb, SScanObj *pScan);
37
static int32_t  mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
38
static int32_t  mndProcessKillScanReq(SRpcMsg *pReq);
39
static int32_t  mndProcessQueryScanRsp(SRpcMsg *pReq);
40

41
int32_t mndInitScan(SMnode *pMnode) {
491,777✔
42
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SCAN, mndRetrieveScan);
491,777✔
43
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SCAN, mndProcessKillScanReq);
491,777✔
44
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SCAN_PROGRESS_RSP, mndProcessQueryScanRsp);
491,777✔
45
  mndSetMsgHandle(pMnode, TDMT_MND_SCAN_TIMER, mndProcessScanTimer);
491,777✔
46
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SCAN_RSP, mndTransProcessRsp);
491,777✔
47

48
  SSdbTable table = {
491,777✔
49
      .sdbType = SDB_SCAN,
50
      .keyType = SDB_KEY_INT32,
51
      .encodeFp = (SdbEncodeFp)mndScanActionEncode,
52
      .decodeFp = (SdbDecodeFp)mndScanActionDecode,
53
      .insertFp = (SdbInsertFp)mndScanActionInsert,
54
      .updateFp = (SdbUpdateFp)mndScanActionUpdate,
55
      .deleteFp = (SdbDeleteFp)mndScanActionDelete,
56
  };
57

58
  return sdbSetTable(pMnode->pSdb, table);
491,777✔
59
}
60

61
void mndCleanupScan(SMnode *pMnode) { mDebug("mnd scan cleanup"); }
491,007✔
62

63
void tFreeScanObj(SScanObj *pScan) {}
1,166✔
64

65
static int32_t tSerializeSScanObj(void *buf, int32_t bufLen, const SScanObj *pObj) {
2,796✔
66
  SEncoder encoder = {0};
2,796✔
67
  int32_t  code = 0;
2,796✔
68
  int32_t  lino;
69
  int32_t  tlen;
70
  tEncoderInit(&encoder, buf, bufLen);
2,796✔
71

72
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,796✔
73
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->scanId));
5,592✔
74
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
5,592✔
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
5,592✔
76
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
5,592✔
77

78
  tEndEncode(&encoder);
2,796✔
79

80
_exit:
2,796✔
81
  if (code) {
2,796✔
82
    tlen = code;
×
83
  } else {
84
    tlen = encoder.pos;
2,796✔
85
  }
86
  tEncoderClear(&encoder);
2,796✔
87
  return tlen;
2,796✔
88
}
89

90
int32_t tDeserializeSScanObj(void *buf, int32_t bufLen, SScanObj *pObj) {
1,166✔
91
  int32_t  code = 0;
1,166✔
92
  int32_t  lino;
93
  SDecoder decoder = {0};
1,166✔
94
  tDecoderInit(&decoder, buf, bufLen);
1,166✔
95

96
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
1,166✔
97
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->scanId));
2,332✔
98
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
1,166✔
99
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
2,332✔
100
  if (!tDecodeIsEnd(&decoder)) {
1,166✔
101
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
2,332✔
102
  } else {
103
    pObj->dbUid = 0;
×
104
  }
105

106
  tEndDecode(&decoder);
1,166✔
107

108
_exit:
1,166✔
109
  tDecoderClear(&decoder);
1,166✔
110
  return code;
1,166✔
111
}
112

113
static SSdbRaw *mndScanActionEncode(SScanObj *pScan) {
1,398✔
114
  int32_t code = 0;
1,398✔
115
  int32_t lino = 0;
1,398✔
116
  terrno = TSDB_CODE_SUCCESS;
1,398✔
117

118
  void    *buf = NULL;
1,398✔
119
  SSdbRaw *pRaw = NULL;
1,398✔
120

121
  int32_t tlen = tSerializeSScanObj(NULL, 0, pScan);
1,398✔
122
  if (tlen < 0) {
1,398✔
123
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
124
    goto OVER;
×
125
  }
126

127
  int32_t size = sizeof(int32_t) + tlen;
1,398✔
128
  pRaw = sdbAllocRaw(SDB_SCAN, MND_SCAN_VER_NUMBER, size);
1,398✔
129
  if (pRaw == NULL) {
1,398✔
130
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
131
    goto OVER;
×
132
  }
133

134
  buf = taosMemoryMalloc(tlen);
1,398✔
135
  if (buf == NULL) {
1,398✔
136
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
137
    goto OVER;
×
138
  }
139

140
  tlen = tSerializeSScanObj(buf, tlen, pScan);
1,398✔
141
  if (tlen < 0) {
1,398✔
142
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
143
    goto OVER;
×
144
  }
145

146
  int32_t dataPos = 0;
1,398✔
147
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
1,398✔
148
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
1,398✔
149
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
1,398✔
150

151
OVER:
1,398✔
152
  taosMemoryFreeClear(buf);
1,398✔
153
  if (terrno != TSDB_CODE_SUCCESS) {
1,398✔
154
    mError("scan:%" PRId32 ", failed to encode to raw:%p since %s", pScan->scanId, pRaw, terrstr());
×
155
    sdbFreeRaw(pRaw);
×
156
    return NULL;
×
157
  }
158

159
  mTrace("scan:%" PRId32 ", encode to raw:%p, row:%p", pScan->scanId, pRaw, pScan);
1,398✔
160
  return pRaw;
1,398✔
161
}
162

163
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw) {
1,166✔
164
  int32_t   code = 0;
1,166✔
165
  int32_t   lino = 0;
1,166✔
166
  SSdbRow  *pRow = NULL;
1,166✔
167
  SScanObj *pScan = NULL;
1,166✔
168
  void     *buf = NULL;
1,166✔
169
  terrno = TSDB_CODE_SUCCESS;
1,166✔
170

171
  int8_t sver = 0;
1,166✔
172
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
1,166✔
173
    goto OVER;
×
174
  }
175

176
  if (sver != MND_SCAN_VER_NUMBER) {
1,166✔
177
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
178
    mError("scan read invalid ver, data ver: %d, curr ver: %d", sver, MND_SCAN_VER_NUMBER);
×
179
    goto OVER;
×
180
  }
181

182
  pRow = sdbAllocRow(sizeof(SScanObj));
1,166✔
183
  if (pRow == NULL) {
1,166✔
184
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
185
    goto OVER;
×
186
  }
187

188
  pScan = sdbGetRowObj(pRow);
1,166✔
189
  if (pScan == NULL) {
1,166✔
190
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
191
    goto OVER;
×
192
  }
193

194
  int32_t tlen;
1,166✔
195
  int32_t dataPos = 0;
1,166✔
196
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
1,166✔
197
  buf = taosMemoryMalloc(tlen + 1);
1,166✔
198
  if (buf == NULL) {
1,166✔
199
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
200
    goto OVER;
×
201
  }
202
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
1,166✔
203

204
  if ((terrno = tDeserializeSScanObj(buf, tlen, pScan)) < 0) {
1,166✔
205
    goto OVER;
×
206
  }
207

208
OVER:
1,166✔
209
  taosMemoryFreeClear(buf);
1,166✔
210
  if (terrno != TSDB_CODE_SUCCESS) {
1,166✔
211
    mError("scan:%" PRId32 ", failed to decode from raw:%p since %s", pScan->scanId, pRaw, terrstr());
×
212
    taosMemoryFreeClear(pRow);
×
213
    return NULL;
×
214
  }
215

216
  mTrace("scan:%" PRId32 ", decode from raw:%p, row:%p", pScan->scanId, pRaw, pScan);
1,166✔
217
  return pRow;
1,166✔
218
}
219

220
static int32_t mndScanActionInsert(SSdb *pSdb, SScanObj *pScan) {
466✔
221
  mTrace("scan:%" PRId32 ", perform insert action", pScan->scanId);
466✔
222
  return 0;
466✔
223
}
224

225
static int32_t mndScanActionDelete(SSdb *pSdb, SScanObj *pScan) {
1,166✔
226
  mTrace("scan:%" PRId32 ", perform delete action", pScan->scanId);
1,166✔
227
  tFreeScanObj(pScan);
1,166✔
228
  return 0;
1,166✔
229
}
230

231
static int32_t mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan) {
234✔
232
  mTrace("scan:%" PRId32 ", perform update action, old row:%p new row:%p", pOldScan->scanId, pOldScan, pNewScan);
234✔
233

234
  return 0;
234✔
235
}
236

237
static SScanObj *mndAcquireScan(SMnode *pMnode, int64_t scanId) {
2,564✔
238
  SSdb     *pSdb = pMnode->pSdb;
2,564✔
239
  SScanObj *pScan = sdbAcquire(pSdb, SDB_SCAN, &scanId);
2,564✔
240
  if (pScan == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
2,564✔
241
    terrno = TSDB_CODE_SUCCESS;
×
242
  }
243
  return pScan;
2,564✔
244
}
245

246
static void mndReleaseScan(SMnode *pMnode, SScanObj *pScan) {
2,564✔
247
  SSdb *pSdb = pMnode->pSdb;
2,564✔
248
  sdbRelease(pSdb, pScan);
2,564✔
249
  pScan = NULL;
2,564✔
250
}
2,564✔
251

252
static int32_t mndScanGetDbInfo(SMnode *pMnode, int32_t scanId, char *dbname, int32_t len, int64_t *dbUid) {
932✔
253
  int32_t   code = 0;
932✔
254
  SScanObj *pScan = mndAcquireScan(pMnode, scanId);
932✔
255
  if (pScan == NULL) {
932✔
256
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
257
    if (terrno != 0) code = terrno;
×
258
    TAOS_RETURN(code);
×
259
  }
260

261
  tstrncpy(dbname, pScan->dbname, len);
932✔
262
  if (dbUid) *dbUid = pScan->dbUid;
932✔
263
  mndReleaseScan(pMnode, pScan);
932✔
264
  TAOS_RETURN(code);
932✔
265
}
266

267
// scan db
268
int32_t mndAddScanToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SDbObj *pDb, SScanDbRsp *rsp) {
698✔
269
  int32_t code = 0;
698✔
270
  pScan->scanId = tGenIdPI32();
698✔
271

272
  tstrncpy(pScan->dbname, pDb->name, sizeof(pScan->dbname));
698✔
273
  pScan->dbUid = pDb->uid;
698✔
274

275
  pScan->startTime = taosGetTimestampMs();
698✔
276

277
  SSdbRaw *pVgRaw = mndScanActionEncode(pScan);
698✔
278
  if (pVgRaw == NULL) {
698✔
279
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
280
    if (terrno != 0) code = terrno;
×
281
    TAOS_RETURN(code);
×
282
  }
283
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
698✔
284
    sdbFreeRaw(pVgRaw);
×
285
    TAOS_RETURN(code);
×
286
  }
287

288
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
698✔
289
    sdbFreeRaw(pVgRaw);
×
290
    TAOS_RETURN(code);
×
291
  }
292

293
  rsp->scanId = pScan->scanId;
698✔
294

295
  return 0;
698✔
296
}
297

298
// retrieve scan
299
static int32_t mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
4,989✔
300
  SMnode   *pMnode = pReq->info.node;
4,989✔
301
  SSdb     *pSdb = pMnode->pSdb;
4,989✔
302
  int32_t   numOfRows = 0;
4,989✔
303
  SScanObj *pScan = NULL;
4,989✔
304
  char     *sep = NULL;
4,989✔
305
  SDbObj   *pDb = NULL;
4,989✔
306
  int32_t   code = 0;
4,989✔
307
  int32_t   lino = 0;
4,989✔
308

309
  if (strlen(pShow->db) > 0) {
4,989✔
310
    sep = strchr(pShow->db, '.');
×
311
    if (sep &&
×
312
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
313
      sep++;
×
314
    } else {
315
      pDb = mndAcquireDb(pMnode, pShow->db);
×
316
      if (pDb == NULL) return terrno;
×
317
    }
318
  }
319

320
  while (numOfRows < rows) {
9,747✔
321
    pShow->pIter = sdbFetch(pSdb, SDB_SCAN, pShow->pIter, (void **)&pScan);
9,747✔
322
    if (pShow->pIter == NULL) break;
9,747✔
323

324
    SColumnInfoData *pColInfo;
325
    SName            n;
326
    int32_t          cols = 0;
4,758✔
327

328
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
4,758✔
329

330
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
4,758✔
331
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->scanId, false), pScan, &lino, _OVER);
4,758✔
332

333
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
4,758✔
334
    if (pDb != NULL || !IS_SYS_DBNAME(pScan->dbname)) {
9,516✔
335
      SName name = {0};
4,758✔
336
      TAOS_CHECK_GOTO(tNameFromString(&name, pScan->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
4,758✔
337
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
4,758✔
338
    } else {
339
      tstrncpy(varDataVal(tmpBuf), pScan->dbname, TSDB_SHOW_SQL_LEN);
×
340
    }
341
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
4,758✔
342
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pScan, &lino, _OVER);
4,758✔
343

344
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
4,758✔
345
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->startTime, false), pScan, &lino,
4,758✔
346
                        _OVER);
347

348
    numOfRows++;
4,758✔
349
    sdbRelease(pSdb, pScan);
4,758✔
350
  }
351

352
_OVER:
4,989✔
353
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
4,989✔
354
  pShow->numOfRows += numOfRows;
4,989✔
355
  mndReleaseDb(pMnode, pDb);
4,989✔
356
  return numOfRows;
4,989✔
357
}
358

359
// kill scan
360
static void *mndBuildKillScanReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t scanId, int32_t dnodeid) {
234✔
361
  SVKillScanReq req = {0};
234✔
362
  req.scanId = scanId;
234✔
363
  req.vgId = pVgroup->vgId;
234✔
364
  req.dnodeId = dnodeid;
234✔
365
  terrno = 0;
234✔
366

367
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
234✔
368
  int32_t contLen = tSerializeSVKillScanReq(NULL, 0, &req);
234✔
369
  if (contLen < 0) {
234✔
370
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
371
    return NULL;
×
372
  }
373
  contLen += sizeof(SMsgHead);
234✔
374

375
  void *pReq = taosMemoryMalloc(contLen);
234✔
376
  if (pReq == NULL) {
234✔
377
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
378
    return NULL;
×
379
  }
380

381
  SMsgHead *pHead = pReq;
234✔
382
  pHead->contLen = htonl(contLen);
234✔
383
  pHead->vgId = htonl(pVgroup->vgId);
234✔
384

385
  mTrace("vgId:%d, build scan vnode config req, contLen:%d", pVgroup->vgId, contLen);
234✔
386
  int32_t ret = 0;
234✔
387
  if ((ret = tSerializeSVKillScanReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
234✔
388
    taosMemoryFreeClear(pReq);
×
389
    terrno = ret;
×
390
    return NULL;
×
391
  }
392
  *pContLen = contLen;
234✔
393
  return pReq;
234✔
394
}
395

396
static int32_t mndAddKillScanAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t scanId, int32_t dnodeid) {
234✔
397
  int32_t      code = 0;
234✔
398
  STransAction action = {0};
234✔
399

400
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
234✔
401
  if (pDnode == NULL) {
234✔
402
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
403
    if (terrno != 0) code = terrno;
×
404
    TAOS_RETURN(code);
×
405
  }
406
  action.epSet = mndGetDnodeEpset(pDnode);
234✔
407
  mndReleaseDnode(pMnode, pDnode);
234✔
408

409
  int32_t contLen = 0;
234✔
410
  void   *pReq = mndBuildKillScanReq(pMnode, pVgroup, &contLen, scanId, dnodeid);
234✔
411
  if (pReq == NULL) {
234✔
412
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
413
    if (terrno != 0) code = terrno;
×
414
    TAOS_RETURN(code);
×
415
  }
416

417
  action.pCont = pReq;
234✔
418
  action.contLen = contLen;
234✔
419
  action.msgType = TDMT_VND_KILL_SCAN;
234✔
420

421
  mTrace("trans:%d, kill scan msg len:%d", pTrans->id, contLen);
234✔
422

423
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
234✔
424
    taosMemoryFree(pReq);
×
425
    TAOS_RETURN(code);
×
426
  }
427

428
  return 0;
234✔
429
}
430

431
static int32_t mndKillScan(SMnode *pMnode, SRpcMsg *pReq, SScanObj *pScan) {
234✔
432
  int32_t code = 0;
234✔
433
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-scan");
234✔
434
  if (pTrans == NULL) {
234✔
435
    mError("scan:%" PRId32 ", failed to drop since %s", pScan->scanId, terrstr());
×
436
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
437
    if (terrno != 0) code = terrno;
×
438
    TAOS_RETURN(code);
×
439
  }
440
  mInfo("trans:%d, used to kill scan:%" PRId32, pTrans->id, pScan->scanId);
234✔
441

442
  mndTransSetDbName(pTrans, pScan->dbname, NULL);
234✔
443

444
  SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
234✔
445
  if (pCommitRaw == NULL) {
234✔
446
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
447
    if (terrno != 0) code = terrno;
×
448
    mndTransDrop(pTrans);
×
449
    TAOS_RETURN(code);
×
450
  }
451
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
234✔
452
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
453
    mndTransDrop(pTrans);
×
454
    TAOS_RETURN(code);
×
455
  }
456
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
234✔
457
    mndTransDrop(pTrans);
×
458
    TAOS_RETURN(code);
×
459
  }
460

461
  void *pIter = NULL;
234✔
462
  while (1) {
468✔
463
    SScanDetailObj *pDetail = NULL;
702✔
464
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
702✔
465
    if (pIter == NULL) break;
702✔
466

467
    if (pDetail->scanId == pScan->scanId) {
468✔
468
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
234✔
469
      if (pVgroup == NULL) {
234✔
470
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
471
        sdbCancelFetch(pMnode->pSdb, pIter);
×
472
        sdbRelease(pMnode->pSdb, pDetail);
×
473
        mndTransDrop(pTrans);
×
474
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
475
        if (terrno != 0) code = terrno;
×
476
        TAOS_RETURN(code);
×
477
      }
478

479
      if ((code = mndAddKillScanAction(pMnode, pTrans, pVgroup, pScan->scanId, pDetail->dnodeId)) != 0) {
234✔
480
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
481
        sdbCancelFetch(pMnode->pSdb, pIter);
×
482
        sdbRelease(pMnode->pSdb, pDetail);
×
483
        mndTransDrop(pTrans);
×
484
        TAOS_RETURN(code);
×
485
      }
486

487
      mndReleaseVgroup(pMnode, pVgroup);
234✔
488
    }
489

490
    sdbRelease(pMnode->pSdb, pDetail);
468✔
491
  }
492

493
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
234✔
494
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
495
    mndTransDrop(pTrans);
×
496
    TAOS_RETURN(code);
×
497
  }
498

499
  mndTransDrop(pTrans);
234✔
500
  return 0;
234✔
501
}
502

503
static int32_t mndProcessKillScanReq(SRpcMsg *pReq) {
234✔
504
  int32_t      code = 0;
234✔
505
  int32_t      lino = 0;
234✔
506
  SKillScanReq killScanReq = {0};
234✔
507

508
  if ((code = tDeserializeSKillScanReq(pReq->pCont, pReq->contLen, &killScanReq)) != 0) {
234✔
509
    TAOS_RETURN(code);
×
510
  }
511

512
  mInfo("start to kill scan:%" PRId32, killScanReq.scanId);
234✔
513

514
  SMnode   *pMnode = pReq->info.node;
234✔
515
  SScanObj *pScan = mndAcquireScan(pMnode, killScanReq.scanId);
234✔
516
  if (pScan == NULL) {
234✔
517
    code = TSDB_CODE_MND_INVALID_SCAN_ID;
×
518
    tFreeSKillScanReq(&killScanReq);
×
519
    TAOS_RETURN(code);
×
520
  }
521

522
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SCAN_DB), &lino, _OVER);
234✔
523

524
  TAOS_CHECK_GOTO(mndKillScan(pMnode, pReq, pScan), &lino, _OVER);
234✔
525

526
  code = TSDB_CODE_ACTION_IN_PROGRESS;
234✔
527

528
#if 0
529
  char    obj[TSDB_INT32_ID_LEN] = {0};
530
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pScan->scanId);
531
  if ((uint32_t)nBytes < sizeof(obj)) {
532
    auditRecord(pReq, pMnode->clusterId, "killScan", pScan->dbname, obj, killScanReq.sql, killScanReq.sqlLen);
533
  } else {
534
    mError("scan:%" PRId32 " failed to audit since %s", pScan->scanId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
535
  }
536
#endif
537
_OVER:
234✔
538
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
234✔
539
    mError("failed to kill scan %" PRId32 " since %s", killScanReq.scanId, terrstr());
×
540
  }
541

542
  tFreeSKillScanReq(&killScanReq);
234✔
543
  mndReleaseScan(pMnode, pScan);
234✔
544

545
  TAOS_RETURN(code);
234✔
546
}
547

548
// update progress
549
static int32_t mndUpdateScanProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t scanId, SQueryScanProgressRsp *rsp) {
1,164✔
550
  int32_t code = 0;
1,164✔
551

552
  void *pIter = NULL;
1,164✔
553
  while (1) {
466✔
554
    SScanDetailObj *pDetail = NULL;
1,630✔
555
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
1,630✔
556
    if (pIter == NULL) break;
1,630✔
557

558
    if (pDetail->scanId == scanId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
1,630✔
559
      pDetail->newNumberFileset = rsp->numberFileset;
1,164✔
560
      pDetail->newFinished = rsp->finished;
1,164✔
561
      pDetail->progress = rsp->progress;
1,164✔
562
      pDetail->remainingTime = rsp->remainingTime;
1,164✔
563

564
      sdbCancelFetch(pMnode->pSdb, pIter);
1,164✔
565
      sdbRelease(pMnode->pSdb, pDetail);
1,164✔
566

567
      TAOS_RETURN(code);
1,164✔
568
    }
569

570
    sdbRelease(pMnode->pSdb, pDetail);
466✔
571
  }
572

573
  return TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST;
×
574
}
575

576
static int32_t mndProcessQueryScanRsp(SRpcMsg *pReq) {
1,164✔
577
  int32_t               code = 0;
1,164✔
578
  SQueryScanProgressRsp req = {0};
1,164✔
579
  if (pReq->code != 0) {
1,164✔
580
    mError("received wrong scan response, req code is %s", tstrerror(pReq->code));
×
581
    TAOS_RETURN(pReq->code);
×
582
  }
583
  code = tDeserializeSQueryScanProgressRsp(pReq->pCont, pReq->contLen, &req);
1,164✔
584
  if (code != 0) {
1,164✔
585
    mError("failed to deserialize vnode-query-scan-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
586
           pReq->contLen);
587
    TAOS_RETURN(code);
×
588
  }
589

590
  mDebug("scan:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId, req.vgId,
1,164✔
591
         req.dnodeId, req.numberFileset, req.finished);
592

593
  SMnode *pMnode = pReq->info.node;
1,164✔
594

595
  code = mndUpdateScanProgress(pMnode, pReq, req.scanId, &req);
1,164✔
596
  if (code != 0) {
1,164✔
597
    mError("scan:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId,
×
598
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
599
    TAOS_RETURN(code);
×
600
  }
601

602
  TAOS_RETURN(code);
1,164✔
603
}
604

605
// timer
606
static void mndScanSendProgressReq(SMnode *pMnode, SScanObj *pScan) {
932✔
607
  void *pIter = NULL;
932✔
608

609
  while (1) {
1,515✔
610
    SScanDetailObj *pDetail = NULL;
2,447✔
611
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
2,447✔
612
    if (pIter == NULL) break;
2,447✔
613

614
    if (pDetail->scanId == pScan->scanId) {
1,515✔
615
      SEpSet epSet = {0};
1,164✔
616

617
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
1,164✔
618
      if (pDnode == NULL) break;
1,164✔
619
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
1,164✔
620
        sdbRelease(pMnode->pSdb, pDetail);
×
621
        continue;
×
622
      }
623
      mndReleaseDnode(pMnode, pDnode);
1,164✔
624

625
      SQueryScanProgressReq req;
1,164✔
626
      req.scanId = pDetail->scanId;
1,164✔
627
      req.vgId = pDetail->vgId;
1,164✔
628
      req.dnodeId = pDetail->dnodeId;
1,164✔
629

630
      int32_t contLen = tSerializeSQueryScanProgressReq(NULL, 0, &req);
1,164✔
631
      if (contLen < 0) {
1,164✔
632
        sdbRelease(pMnode->pSdb, pDetail);
×
633
        continue;
×
634
      }
635

636
      contLen += sizeof(SMsgHead);
1,164✔
637

638
      SMsgHead *pHead = rpcMallocCont(contLen);
1,164✔
639
      if (pHead == NULL) {
1,164✔
640
        sdbRelease(pMnode->pSdb, pDetail);
×
641
        continue;
×
642
      }
643

644
      pHead->contLen = htonl(contLen);
1,164✔
645
      pHead->vgId = htonl(pDetail->vgId);
1,164✔
646

647
      if (tSerializeSQueryScanProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
1,164✔
648
        sdbRelease(pMnode->pSdb, pDetail);
×
649
        continue;
×
650
      }
651

652
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SCAN_PROGRESS, .contLen = contLen};
1,164✔
653

654
      rpcMsg.pCont = pHead;
1,164✔
655

656
      char    detail[1024] = {0};
1,164✔
657
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
2,328✔
658
                              TMSG_INFO(TDMT_VND_QUERY_SCAN_PROGRESS), epSet.numOfEps, epSet.inUse);
2,328✔
659
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
2,328✔
660
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
1,164✔
661
      }
662

663
      mDebug("scan:%d, send update progress msg to %s", pDetail->scanId, detail);
1,164✔
664

665
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
1,164✔
666
        sdbRelease(pMnode->pSdb, pDetail);
×
667
        continue;
×
668
      }
669
    }
670

671
    sdbRelease(pMnode->pSdb, pDetail);
1,515✔
672
  }
673
}
932✔
674

675
static int32_t mndSaveScanProgress(SMnode *pMnode, int32_t scanId) {
932✔
676
  int32_t code = 0;
932✔
677
  bool    needSave = false;
932✔
678
  void   *pIter = NULL;
932✔
679
  while (1) {
1,515✔
680
    SScanDetailObj *pDetail = NULL;
2,447✔
681
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
2,447✔
682
    if (pIter == NULL) break;
2,447✔
683

684
    if (pDetail->scanId == scanId) {
1,515✔
685
      mDebug(
1,164✔
686
          "scan:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
687
          "newNumberFileset:%d, newFinished:%d",
688
          pDetail->scanId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
689
          pDetail->newNumberFileset, pDetail->newFinished);
690

691
      // these 2 number will jump back after dnode restart, so < is not used here
692
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
1,164✔
693
        needSave = true;
582✔
694
    }
695

696
    sdbRelease(pMnode->pSdb, pDetail);
1,515✔
697
  }
698

699
  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
932✔
700
  int64_t dbUid = 0;
932✔
701
  TAOS_CHECK_RETURN(mndScanGetDbInfo(pMnode, scanId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));
932✔
702

703
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
932✔
704
    needSave = true;
×
705
    mWarn("scan:%" PRId32 ", no db exist, set needSave:%s", scanId, dbname);
×
706
  }
707

708
  if (!needSave) {
932✔
709
    mDebug("scan:%" PRId32 ", no need to save", scanId);
466✔
710
    TAOS_RETURN(code);
466✔
711
  }
712

713
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-scan-progress");
466✔
714
  if (pTrans == NULL) {
466✔
715
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
716
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
717
    if (terrno != 0) code = terrno;
×
718
    TAOS_RETURN(code);
×
719
  }
720
  mInfo("scan:%d, trans:%d, used to update scan progress.", scanId, pTrans->id);
466✔
721

722
  mndTransSetDbName(pTrans, dbname, NULL);
466✔
723

724
  pIter = NULL;
466✔
725
  while (1) {
699✔
726
    SScanDetailObj *pDetail = NULL;
1,165✔
727
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
1,165✔
728
    if (pIter == NULL) break;
1,165✔
729

730
    if (pDetail->scanId == scanId) {
699✔
731
      mInfo(
582✔
732
          "scan:%d, trans:%d, check scan progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
733
          "newNumberFileset:%d, newFinished:%d",
734
          pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
735
          pDetail->newNumberFileset, pDetail->newFinished);
736

737
      pDetail->numberFileset = pDetail->newNumberFileset;
582✔
738
      pDetail->finished = pDetail->newFinished;
582✔
739

740
      SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
582✔
741
      if (pCommitRaw == NULL) {
582✔
742
        sdbCancelFetch(pMnode->pSdb, pIter);
×
743
        sdbRelease(pMnode->pSdb, pDetail);
×
744
        mndTransDrop(pTrans);
×
745
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
746
        if (terrno != 0) code = terrno;
×
747
        TAOS_RETURN(code);
×
748
      }
749
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
582✔
750
        mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
×
751
        sdbCancelFetch(pMnode->pSdb, pIter);
×
752
        sdbRelease(pMnode->pSdb, pDetail);
×
753
        mndTransDrop(pTrans);
×
754
        TAOS_RETURN(code);
×
755
      }
756
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
582✔
757
        sdbCancelFetch(pMnode->pSdb, pIter);
×
758
        sdbRelease(pMnode->pSdb, pDetail);
×
759
        mndTransDrop(pTrans);
×
760
        TAOS_RETURN(code);
×
761
      }
762
    }
763

764
    sdbRelease(pMnode->pSdb, pDetail);
699✔
765
  }
766

767
  bool allFinished = true;
466✔
768
  pIter = NULL;
466✔
769
  while (1) {
699✔
770
    SScanDetailObj *pDetail = NULL;
1,165✔
771
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
1,165✔
772
    if (pIter == NULL) break;
1,165✔
773

774
    if (pDetail->scanId == scanId) {
699✔
775
      mInfo("scan:%d, trans:%d, check scan finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
582✔
776
            pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
777

778
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
582✔
UNCOV
779
        allFinished = false;
×
UNCOV
780
        sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
781
        sdbRelease(pMnode->pSdb, pDetail);
×
UNCOV
782
        break;
×
783
      }
784
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
582✔
785
        allFinished = false;
×
786
        sdbCancelFetch(pMnode->pSdb, pIter);
×
787
        sdbRelease(pMnode->pSdb, pDetail);
×
788
        break;
×
789
      }
790
    }
791

792
    sdbRelease(pMnode->pSdb, pDetail);
699✔
793
  }
794

795
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
466✔
796
    allFinished = true;
×
797
    mWarn("scan:%" PRId32 ", no db exist, set all finished:%s", scanId, dbname);
×
798
  }
799

800
  if (allFinished) {
466✔
801
    mInfo("scan:%d, all finished", scanId);
466✔
802
    pIter = NULL;
466✔
803
    while (1) {
699✔
804
      SScanDetailObj *pDetail = NULL;
1,165✔
805
      pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
1,165✔
806
      if (pIter == NULL) break;
1,165✔
807

808
      if (pDetail->scanId == scanId) {
699✔
809
        SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
582✔
810
        if (pCommitRaw == NULL) {
582✔
811
          mndTransDrop(pTrans);
×
812
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
813
          if (terrno != 0) code = terrno;
×
814
          TAOS_RETURN(code);
×
815
        }
816
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
582✔
817
          mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
×
818
          sdbCancelFetch(pMnode->pSdb, pIter);
×
819
          sdbRelease(pMnode->pSdb, pDetail);
×
820
          mndTransDrop(pTrans);
×
821
          TAOS_RETURN(code);
×
822
        }
823
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
582✔
824
          sdbCancelFetch(pMnode->pSdb, pIter);
×
825
          sdbRelease(pMnode->pSdb, pDetail);
×
826
          mndTransDrop(pTrans);
×
827
          TAOS_RETURN(code);
×
828
        }
829
        mInfo("scan:%d, add drop scandetail action", pDetail->scanDetailId);
582✔
830
      }
831

832
      sdbRelease(pMnode->pSdb, pDetail);
699✔
833
    }
834

835
    SScanObj *pScan = mndAcquireScan(pMnode, scanId);
466✔
836
    if (pScan == NULL) {
466✔
837
      mndTransDrop(pTrans);
×
838
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
839
      if (terrno != 0) code = terrno;
×
840
      TAOS_RETURN(code);
×
841
    }
842
    SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
466✔
843
    mndReleaseScan(pMnode, pScan);
466✔
844
    if (pCommitRaw == NULL) {
466✔
845
      mndTransDrop(pTrans);
×
846
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
847
      if (terrno != 0) code = terrno;
×
848
      TAOS_RETURN(code);
×
849
    }
850
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
466✔
851
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
×
852
      mndTransDrop(pTrans);
×
853
      TAOS_RETURN(code);
×
854
    }
855
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
466✔
856
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
×
857
      mndTransDrop(pTrans);
×
858
      TAOS_RETURN(code);
×
859
    }
860
    mInfo("scan:%d, add drop scan action", pScan->scanId);
466✔
861
  }
862

863
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
466✔
864
    mError("scan:%d, trans:%d, failed to prepare since %s", scanId, pTrans->id, terrstr());
×
865
    mndTransDrop(pTrans);
×
866
    TAOS_RETURN(code);
×
867
  }
868

869
  mndTransDrop(pTrans);
466✔
870
  return 0;
466✔
871
}
872

873
static void mndScanPullup(SMnode *pMnode) {
1,965,964✔
874
  int32_t code = 0;
1,965,964✔
875
  SSdb   *pSdb = pMnode->pSdb;
1,965,964✔
876
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_SCAN), sizeof(int32_t));
1,965,964✔
877
  if (pArray == NULL) return;
1,965,964✔
878

879
  void *pIter = NULL;
1,965,964✔
880
  while (1) {
932✔
881
    SScanObj *pScan = NULL;
1,966,896✔
882
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
1,966,896✔
883
    if (pIter == NULL) break;
1,966,896✔
884
    if (taosArrayPush(pArray, &pScan->scanId) == NULL) {
1,864✔
885
      mError("failed to push scan id:%d into array, but continue pull up", pScan->scanId);
×
886
    }
887
    sdbRelease(pSdb, pScan);
932✔
888
  }
889

890
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
1,966,896✔
891
    mInfo("begin to pull up");
932✔
892
    int32_t  *pScanId = taosArrayGet(pArray, i);
932✔
893
    SScanObj *pScan = mndAcquireScan(pMnode, *pScanId);
932✔
894
    if (pScan != NULL) {
932✔
895
      mInfo("scan:%d, begin to pull up", pScan->scanId);
932✔
896
      mndScanSendProgressReq(pMnode, pScan);
932✔
897
      if ((code = mndSaveScanProgress(pMnode, pScan->scanId)) != 0) {
932✔
898
        mError("scan:%d, failed to save scan progress since %s", pScan->scanId, tstrerror(code));
×
899
      }
900
      mndReleaseScan(pMnode, pScan);
932✔
901
    }
902
  }
903
  taosArrayDestroy(pArray);
1,965,964✔
904
}
905

906
static int32_t mndBuildScanDbRsp(SScanDbRsp *pScanRsp, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
466✔
907
  int32_t code = 0;
466✔
908
  int32_t rspLen = tSerializeSScanDbRsp(NULL, 0, pScanRsp);
466✔
909
  void   *pRsp = NULL;
466✔
910
  if (useRpcMalloc) {
466✔
911
    pRsp = rpcMallocCont(rspLen);
×
912
  } else {
913
    pRsp = taosMemoryMalloc(rspLen);
466✔
914
  }
915

916
  if (pRsp == NULL) {
466✔
917
    code = TSDB_CODE_OUT_OF_MEMORY;
×
918
    TAOS_RETURN(code);
×
919
  }
920

921
  (void)tSerializeSScanDbRsp(pRsp, rspLen, pScanRsp);
466✔
922
  *pRspLen = rspLen;
466✔
923
  *ppRsp = pRsp;
466✔
924
  TAOS_RETURN(code);
466✔
925
}
926

927
static int32_t mndSetScanDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs) {
698✔
928
  int32_t code = 0;
698✔
929
  SDbObj  dbObj = {0};
698✔
930
  memcpy(&dbObj, pDb, sizeof(SDbObj));
698✔
931
  dbObj.scanStartTime = scanTs;
698✔
932

933
  SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
698✔
934
  if (pCommitRaw == NULL) {
698✔
935
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
936
    if (terrno != 0) code = terrno;
×
937
    TAOS_RETURN(code);
×
938
  }
939
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
698✔
940
    sdbFreeRaw(pCommitRaw);
×
941
    TAOS_RETURN(code);
×
942
  }
943

944
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
698✔
945
    sdbFreeRaw(pCommitRaw);
×
946
    TAOS_RETURN(code);
×
947
  }
948
  TAOS_RETURN(code);
698✔
949
}
950

951
static void *mndBuildScanVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t scanTs,
582✔
952
                                  STimeWindow tw) {
953
  SScanVnodeReq scanReq = {0};
582✔
954
  scanReq.dbUid = pDb->uid;
582✔
955
  scanReq.scanStartTime = scanTs;
582✔
956
  scanReq.tw = tw;
582✔
957
  tstrncpy(scanReq.db, pDb->name, TSDB_DB_FNAME_LEN);
582✔
958

959
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
582✔
960
  int32_t contLen = tSerializeSScanVnodeReq(NULL, 0, &scanReq);
582✔
961
  if (contLen < 0) {
582✔
962
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
963
    return NULL;
×
964
  }
965
  contLen += sizeof(SMsgHead);
582✔
966

967
  void *pReq = taosMemoryMalloc(contLen);
582✔
968
  if (pReq == NULL) {
582✔
969
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
970
    return NULL;
×
971
  }
972

973
  SMsgHead *pHead = pReq;
582✔
974
  pHead->contLen = htonl(contLen);
582✔
975
  pHead->vgId = htonl(pVgroup->vgId);
582✔
976

977
  if (tSerializeSScanVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &scanReq) < 0) {
582✔
978
    taosMemoryFree(pReq);
×
979
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
980
    return NULL;
×
981
  }
982
  *pContLen = contLen;
582✔
983
  return pReq;
582✔
984
}
985

986
static int32_t mndBuildScanVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t scanTs,
582✔
987
                                        STimeWindow tw) {
988
  int32_t      code = 0;
582✔
989
  STransAction action = {0};
582✔
990
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
582✔
991

992
  int32_t contLen = 0;
582✔
993
  void   *pReq = mndBuildScanVnodeReq(pMnode, pDb, pVgroup, &contLen, scanTs, tw);
582✔
994
  if (pReq == NULL) {
582✔
995
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
996
    if (terrno != 0) code = terrno;
×
997
    TAOS_RETURN(code);
×
998
  }
999

1000
  action.pCont = pReq;
582✔
1001
  action.contLen = contLen;
582✔
1002
  action.msgType = TDMT_VND_SCAN;
582✔
1003

1004
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
582✔
1005
    taosMemoryFree(pReq);
×
1006
    TAOS_RETURN(code);
×
1007
  }
1008

1009
  TAOS_RETURN(code);
582✔
1010
}
1011

1012
extern int32_t mndAddScanDetailToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SVgObj *pVgroup,
1013
                                      SVnodeGid *pVgid, int32_t index);
1014

1015
static int32_t mndSetScanDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs, STimeWindow tw,
698✔
1016
                                       SArray *vgroupIds, SScanDbRsp *pScanRsp) {
1017
  int32_t code = 0;
698✔
1018
  SSdb   *pSdb = pMnode->pSdb;
698✔
1019
  void   *pIter = NULL;
698✔
1020

1021
  SScanObj scan;
698✔
1022
  if ((code = mndAddScanToTran(pMnode, pTrans, &scan, pDb, pScanRsp)) != 0) {
698✔
1023
    TAOS_RETURN(code);
×
1024
  }
1025

1026
  int32_t j = 0;
698✔
1027
  int32_t numOfVgroups = taosArrayGetSize(vgroupIds);
698✔
1028
  if (numOfVgroups > 0) {
698✔
1029
    for (int32_t i = 0; i < numOfVgroups; i++) {
1,044✔
1030
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
812✔
1031
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
812✔
1032

1033
      if (pVgroup == NULL) {
812✔
1034
        mError("db:%s, vgroup:%" PRId64 " not exist", pDb->name, vgId);
×
1035
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
×
1036
      } else if (pVgroup->dbUid != pDb->uid) {
812✔
1037
        mError("db:%s, vgroup:%" PRId64 " not belong to db:%s", pDb->name, vgId, pDb->name);
232✔
1038
        sdbRelease(pSdb, pVgroup);
232✔
1039
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
232✔
1040
      }
1041
      sdbRelease(pSdb, pVgroup);
580✔
1042
    }
1043

1044
    for (int32_t i = 0; i < numOfVgroups; i++) {
580✔
1045
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
348✔
1046
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
348✔
1047

1048
      if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
348✔
1049
        sdbRelease(pSdb, pVgroup);
×
1050
        TAOS_RETURN(code);
×
1051
      }
1052

1053
      for (int32_t i = 0; i < pVgroup->replica; i++) {
696✔
1054
        SVnodeGid *gid = &pVgroup->vnodeGid[i];
348✔
1055
        if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
348✔
1056
          sdbRelease(pSdb, pVgroup);
×
1057
          TAOS_RETURN(code);
×
1058
        }
1059
        j++;
348✔
1060
      }
1061
      sdbRelease(pSdb, pVgroup);
348✔
1062
    }
1063
  } else {
1064
    while (1) {
468✔
1065
      SVgObj *pVgroup = NULL;
702✔
1066
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
702✔
1067
      if (pIter == NULL) break;
702✔
1068

1069
      if (pVgroup->dbUid == pDb->uid) {
468✔
1070
        if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
234✔
1071
          sdbCancelFetch(pSdb, pIter);
×
1072
          sdbRelease(pSdb, pVgroup);
×
1073
          TAOS_RETURN(code);
×
1074
        }
1075

1076
        for (int32_t i = 0; i < pVgroup->replica; i++) {
468✔
1077
          SVnodeGid *gid = &pVgroup->vnodeGid[i];
234✔
1078
          if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
234✔
1079
            sdbCancelFetch(pSdb, pIter);
×
1080
            sdbRelease(pSdb, pVgroup);
×
1081
            TAOS_RETURN(code);
×
1082
          }
1083
          j++;
234✔
1084
        }
1085
      }
1086

1087
      sdbRelease(pSdb, pVgroup);
468✔
1088
    }
1089
  }
1090

1091
  TAOS_RETURN(code);
466✔
1092
}
1093

1094
static int32_t mndScanDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds) {
698✔
1095
  int32_t    code = 0;
698✔
1096
  int32_t    lino;
1097
  SScanDbRsp scanRsp = {0};
698✔
1098

1099
  bool  isExist = false;
698✔
1100
  void *pIter = NULL;
698✔
1101
  while (1) {
117✔
1102
    SScanObj *pScan = NULL;
815✔
1103
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
815✔
1104
    if (pIter == NULL) break;
815✔
1105

1106
    if (strcmp(pScan->dbname, pDb->name) == 0) {
117✔
1107
      isExist = true;
×
1108
    }
1109
    sdbRelease(pMnode->pSdb, pScan);
117✔
1110
  }
1111
  if (isExist) {
698✔
1112
    mInfo("scan db:%s already exist", pDb->name);
×
1113

1114
    if (pReq) {
×
1115
      int32_t rspLen = 0;
×
1116
      void   *pRsp = NULL;
×
1117
      scanRsp.scanId = 0;
×
1118
      scanRsp.bAccepted = false;
×
1119
      code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, true);
×
1120
      TSDB_CHECK_CODE(code, lino, _OVER);
×
1121

1122
      pReq->info.rsp = pRsp;
×
1123
      pReq->info.rspLen = rspLen;
×
1124
    }
1125

1126
    return TSDB_CODE_MND_SCAN_ALREADY_EXIST;
×
1127
  }
1128

1129
  int64_t scanTs = taosGetTimestampMs();
698✔
1130
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "scan-db");
698✔
1131
  if (pTrans == NULL) goto _OVER;
698✔
1132

1133
  mInfo("trans:%d, used to scan db:%s", pTrans->id, pDb->name);
698✔
1134
  mndTransSetDbName(pTrans, pDb->name, NULL);
698✔
1135
  code = mndTransCheckConflict(pMnode, pTrans);
698✔
1136
  TSDB_CHECK_CODE(code, lino, _OVER);
698✔
1137

1138
  code = mndSetScanDbCommitLogs(pMnode, pTrans, pDb, scanTs);
698✔
1139
  TSDB_CHECK_CODE(code, lino, _OVER);
698✔
1140

1141
  code = mndSetScanDbRedoActions(pMnode, pTrans, pDb, scanTs, tw, vgroupIds, &scanRsp);
698✔
1142
  TSDB_CHECK_CODE(code, lino, _OVER);
698✔
1143

1144
  if (pReq) {
466✔
1145
    int32_t rspLen = 0;
466✔
1146
    void   *pRsp = NULL;
466✔
1147
    scanRsp.bAccepted = true;
466✔
1148
    code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, false);
466✔
1149
    TSDB_CHECK_CODE(code, lino, _OVER);
466✔
1150
    mndTransSetRpcRsp(pTrans, pRsp, rspLen);
466✔
1151
  }
1152

1153
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
466✔
1154
  code = 0;
466✔
1155

1156
_OVER:
698✔
1157
  mndTransDrop(pTrans);
698✔
1158
  TAOS_RETURN(code);
698✔
1159
}
1160

1161
static int32_t mndProcessScanTimer(SRpcMsg *pReq) {
1,965,964✔
1162
  mTrace("start to process scan timer");
1,965,964✔
1163
  mndScanPullup(pReq->info.node);
1,965,964✔
1164
  return 0;
1,965,964✔
1165
}
1166

1167
int32_t mndProcessScanDbReq(SRpcMsg *pReq) {
698✔
1168
  SMnode    *pMnode = pReq->info.node;
698✔
1169
  int32_t    code = -1;
698✔
1170
  SDbObj    *pDb = NULL;
698✔
1171
  SScanDbReq scanReq = {0};
698✔
1172

1173
  if (tDeserializeSScanDbReq(pReq->pCont, pReq->contLen, &scanReq) != 0) {
698✔
1174
    code = TSDB_CODE_INVALID_MSG;
×
1175
    goto _OVER;
×
1176
  }
1177

1178
  mInfo("db:%s, start to scan", scanReq.db);
698✔
1179

1180
  pDb = mndAcquireDb(pMnode, scanReq.db);
698✔
1181
  if (pDb == NULL) {
698✔
1182
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1183
    if (terrno != 0) code = terrno;
×
1184
    goto _OVER;
×
1185
  }
1186

1187
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SCAN_DB, pDb), NULL, _OVER);
698✔
1188

1189
  code = mndScanDb(pMnode, pReq, pDb, scanReq.timeRange, scanReq.vgroupIds);
698✔
1190
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
698✔
1191

1192
_OVER:
698✔
1193
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
698✔
1194
    mError("db:%s, failed to process scan db req since %s", scanReq.db, terrstr());
232✔
1195
  }
1196

1197
  mndReleaseDb(pMnode, pDb);
698✔
1198
  tFreeSScanDbReq(&scanReq);
698✔
1199
  TAOS_RETURN(code);
698✔
1200
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc