• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4791

13 Oct 2025 06:50AM UTC coverage: 57.628% (-0.8%) from 58.476%
#4791

push

travis-ci

web-flow
Merge pull request #33213 from taosdata/fix/huoh/timemoe_model_directory

fix: fix tdgpt timemoe model directory

136628 of 303332 branches covered (45.04%)

Branch coverage included in aggregate %.

208121 of 294900 relevant lines covered (70.57%)

4250784.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.43
/source/dnode/mnode/impl/src/mndScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndScan.h"
16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndScan.h"
21
#include "mndScanDetail.h"
22
#include "mndShow.h"
23
#include "mndTrans.h"
24
#include "mndVgroup.h"
25
#include "tmisce.h"
26
#include "tmsgcb.h"
27

28
#define MND_SCAN_VER_NUMBER 1
29
#define MND_SCAN_ID_LEN     11
30

31
static int32_t  mndProcessScanTimer(SRpcMsg *pReq);
32
static SSdbRaw *mndScanActionEncode(SScanObj *pScan);
33
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw);
34
static int32_t  mndScanActionInsert(SSdb *pSdb, SScanObj *pScan);
35
static int32_t  mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan);
36
static int32_t  mndScanActionDelete(SSdb *pSdb, SScanObj *pScan);
37
static int32_t  mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
38
static int32_t  mndProcessKillScanReq(SRpcMsg *pReq);
39
static int32_t  mndProcessQueryScanRsp(SRpcMsg *pReq);
40

41
int32_t mndInitScan(SMnode *pMnode) {
1,332✔
42
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SCAN, mndRetrieveScan);
1,332✔
43
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SCAN, mndProcessKillScanReq);
1,332✔
44
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SCAN_PROGRESS_RSP, mndProcessQueryScanRsp);
1,332✔
45
  mndSetMsgHandle(pMnode, TDMT_MND_SCAN_TIMER, mndProcessScanTimer);
1,332✔
46
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SCAN_RSP, mndTransProcessRsp);
1,332✔
47

48
  SSdbTable table = {
1,332✔
49
      .sdbType = SDB_SCAN,
50
      .keyType = SDB_KEY_INT32,
51
      .encodeFp = (SdbEncodeFp)mndScanActionEncode,
52
      .decodeFp = (SdbDecodeFp)mndScanActionDecode,
53
      .insertFp = (SdbInsertFp)mndScanActionInsert,
54
      .updateFp = (SdbUpdateFp)mndScanActionUpdate,
55
      .deleteFp = (SdbDeleteFp)mndScanActionDelete,
56
  };
57

58
  return sdbSetTable(pMnode->pSdb, table);
1,332✔
59
}
60

61
void mndCleanupScan(SMnode *pMnode) { mDebug("mnd scan cleanup"); }
1,332✔
62

63
void tFreeScanObj(SScanObj *pScan) {}
10✔
64

65
static int32_t tSerializeSScanObj(void *buf, int32_t bufLen, const SScanObj *pObj) {
24✔
66
  SEncoder encoder = {0};
24✔
67
  int32_t  code = 0;
24✔
68
  int32_t  lino;
69
  int32_t  tlen;
70
  tEncoderInit(&encoder, buf, bufLen);
24✔
71

72
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
24!
73
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->scanId));
48!
74
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
48!
75
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
48!
76

77
  tEndEncode(&encoder);
24✔
78

79
_exit:
24✔
80
  if (code) {
24!
81
    tlen = code;
×
82
  } else {
83
    tlen = encoder.pos;
24✔
84
  }
85
  tEncoderClear(&encoder);
24✔
86
  return tlen;
24✔
87
}
88

89
int32_t tDeserializeSScanObj(void *buf, int32_t bufLen, SScanObj *pObj) {
10✔
90
  int32_t  code = 0;
10✔
91
  int32_t  lino;
92
  SDecoder decoder = {0};
10✔
93
  tDecoderInit(&decoder, buf, bufLen);
10✔
94

95
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
10!
96
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->scanId));
20!
97
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
10!
98
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
20!
99

100
  tEndDecode(&decoder);
10✔
101

102
_exit:
10✔
103
  tDecoderClear(&decoder);
10✔
104
  return code;
10✔
105
}
106

107
static SSdbRaw *mndScanActionEncode(SScanObj *pScan) {
12✔
108
  int32_t code = 0;
12✔
109
  int32_t lino = 0;
12✔
110
  terrno = TSDB_CODE_SUCCESS;
12✔
111

112
  void    *buf = NULL;
12✔
113
  SSdbRaw *pRaw = NULL;
12✔
114

115
  int32_t tlen = tSerializeSScanObj(NULL, 0, pScan);
12✔
116
  if (tlen < 0) {
12!
117
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
118
    goto OVER;
×
119
  }
120

121
  int32_t size = sizeof(int32_t) + tlen;
12✔
122
  pRaw = sdbAllocRaw(SDB_SCAN, MND_SCAN_VER_NUMBER, size);
12✔
123
  if (pRaw == NULL) {
12!
124
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
125
    goto OVER;
×
126
  }
127

128
  buf = taosMemoryMalloc(tlen);
12!
129
  if (buf == NULL) {
12!
130
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
131
    goto OVER;
×
132
  }
133

134
  tlen = tSerializeSScanObj(buf, tlen, pScan);
12✔
135
  if (tlen < 0) {
12!
136
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
137
    goto OVER;
×
138
  }
139

140
  int32_t dataPos = 0;
12✔
141
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
12!
142
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
12!
143
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
12!
144

145
OVER:
12✔
146
  taosMemoryFreeClear(buf);
12!
147
  if (terrno != TSDB_CODE_SUCCESS) {
12!
148
    mError("scan:%" PRId32 ", failed to encode to raw:%p since %s", pScan->scanId, pRaw, terrstr());
×
149
    sdbFreeRaw(pRaw);
×
150
    return NULL;
×
151
  }
152

153
  mTrace("scan:%" PRId32 ", encode to raw:%p, row:%p", pScan->scanId, pRaw, pScan);
12!
154
  return pRaw;
12✔
155
}
156

157
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw) {
10✔
158
  int32_t   code = 0;
10✔
159
  int32_t   lino = 0;
10✔
160
  SSdbRow  *pRow = NULL;
10✔
161
  SScanObj *pScan = NULL;
10✔
162
  void     *buf = NULL;
10✔
163
  terrno = TSDB_CODE_SUCCESS;
10✔
164

165
  int8_t sver = 0;
10✔
166
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
10!
167
    goto OVER;
×
168
  }
169

170
  if (sver != MND_SCAN_VER_NUMBER) {
10!
171
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
172
    mError("scan read invalid ver, data ver: %d, curr ver: %d", sver, MND_SCAN_VER_NUMBER);
×
173
    goto OVER;
×
174
  }
175

176
  pRow = sdbAllocRow(sizeof(SScanObj));
10✔
177
  if (pRow == NULL) {
10!
178
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
179
    goto OVER;
×
180
  }
181

182
  pScan = sdbGetRowObj(pRow);
10✔
183
  if (pScan == NULL) {
10!
184
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
185
    goto OVER;
×
186
  }
187

188
  int32_t tlen;
189
  int32_t dataPos = 0;
10✔
190
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
10!
191
  buf = taosMemoryMalloc(tlen + 1);
10!
192
  if (buf == NULL) {
10!
193
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
194
    goto OVER;
×
195
  }
196
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
10!
197

198
  if ((terrno = tDeserializeSScanObj(buf, tlen, pScan)) < 0) {
10!
199
    goto OVER;
×
200
  }
201

202
OVER:
10✔
203
  taosMemoryFreeClear(buf);
10!
204
  if (terrno != TSDB_CODE_SUCCESS) {
10!
205
    mError("scan:%" PRId32 ", failed to decode from raw:%p since %s", pScan->scanId, pRaw, terrstr());
×
206
    taosMemoryFreeClear(pRow);
×
207
    return NULL;
×
208
  }
209

210
  mTrace("scan:%" PRId32 ", decode from raw:%p, row:%p", pScan->scanId, pRaw, pScan);
10!
211
  return pRow;
10✔
212
}
213

214
static int32_t mndScanActionInsert(SSdb *pSdb, SScanObj *pScan) {
4✔
215
  mTrace("scan:%" PRId32 ", perform insert action", pScan->scanId);
4!
216
  return 0;
4✔
217
}
218

219
static int32_t mndScanActionDelete(SSdb *pSdb, SScanObj *pScan) {
10✔
220
  mTrace("scan:%" PRId32 ", perform delete action", pScan->scanId);
10!
221
  tFreeScanObj(pScan);
10✔
222
  return 0;
10✔
223
}
224

225
static int32_t mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan) {
2✔
226
  mTrace("scan:%" PRId32 ", perform update action, old row:%p new row:%p", pOldScan->scanId, pOldScan, pNewScan);
2!
227

228
  return 0;
2✔
229
}
230

231
static SScanObj *mndAcquireScan(SMnode *pMnode, int64_t scanId) {
22✔
232
  SSdb     *pSdb = pMnode->pSdb;
22✔
233
  SScanObj *pScan = sdbAcquire(pSdb, SDB_SCAN, &scanId);
22✔
234
  if (pScan == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
22!
235
    terrno = TSDB_CODE_SUCCESS;
×
236
  }
237
  return pScan;
22✔
238
}
239

240
static void mndReleaseScan(SMnode *pMnode, SScanObj *pScan) {
22✔
241
  SSdb *pSdb = pMnode->pSdb;
22✔
242
  sdbRelease(pSdb, pScan);
22✔
243
  pScan = NULL;
22✔
244
}
22✔
245

246
static int32_t mndScanGetDbName(SMnode *pMnode, int32_t scanId, char *dbname, int32_t len) {
8✔
247
  int32_t   code = 0;
8✔
248
  SScanObj *pScan = mndAcquireScan(pMnode, scanId);
8✔
249
  if (pScan == NULL) {
8!
250
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
251
    if (terrno != 0) code = terrno;
×
252
    TAOS_RETURN(code);
×
253
  }
254

255
  tstrncpy(dbname, pScan->dbname, len);
8✔
256
  mndReleaseScan(pMnode, pScan);
8✔
257
  TAOS_RETURN(code);
8✔
258
}
259

260
// scan db
261
int32_t mndAddScanToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SDbObj *pDb, SScanDbRsp *rsp) {
6✔
262
  int32_t code = 0;
6✔
263
  pScan->scanId = tGenIdPI32();
6✔
264

265
  tstrncpy(pScan->dbname, pDb->name, sizeof(pScan->dbname));
6✔
266

267
  pScan->startTime = taosGetTimestampMs();
6✔
268

269
  SSdbRaw *pVgRaw = mndScanActionEncode(pScan);
6✔
270
  if (pVgRaw == NULL) {
6!
271
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
272
    if (terrno != 0) code = terrno;
×
273
    TAOS_RETURN(code);
×
274
  }
275
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
6!
276
    sdbFreeRaw(pVgRaw);
×
277
    TAOS_RETURN(code);
×
278
  }
279

280
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
6!
281
    sdbFreeRaw(pVgRaw);
×
282
    TAOS_RETURN(code);
×
283
  }
284

285
  rsp->scanId = pScan->scanId;
6✔
286

287
  return 0;
6✔
288
}
289

290
// retrieve scan
291
static int32_t mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
44✔
292
  SMnode   *pMnode = pReq->info.node;
44✔
293
  SSdb     *pSdb = pMnode->pSdb;
44✔
294
  int32_t   numOfRows = 0;
44✔
295
  SScanObj *pScan = NULL;
44✔
296
  char     *sep = NULL;
44✔
297
  SDbObj   *pDb = NULL;
44✔
298
  int32_t   code = 0;
44✔
299
  int32_t   lino = 0;
44✔
300

301
  if (strlen(pShow->db) > 0) {
44!
302
    sep = strchr(pShow->db, '.');
×
303
    if (sep &&
×
304
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
305
      sep++;
×
306
    } else {
307
      pDb = mndAcquireDb(pMnode, pShow->db);
×
308
      if (pDb == NULL) return terrno;
×
309
    }
310
  }
311

312
  while (numOfRows < rows) {
86!
313
    pShow->pIter = sdbFetch(pSdb, SDB_SCAN, pShow->pIter, (void **)&pScan);
86✔
314
    if (pShow->pIter == NULL) break;
86✔
315

316
    SColumnInfoData *pColInfo;
317
    SName            n;
318
    int32_t          cols = 0;
42✔
319

320
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
42✔
321

322
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
42✔
323
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->scanId, false), pScan, &lino, _OVER);
42!
324

325
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
42✔
326
    if (pDb != NULL || !IS_SYS_DBNAME(pScan->dbname)) {
84!
327
      SName name = {0};
42✔
328
      TAOS_CHECK_GOTO(tNameFromString(&name, pScan->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
42!
329
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
42✔
330
    } else {
331
      tstrncpy(varDataVal(tmpBuf), pScan->dbname, TSDB_SHOW_SQL_LEN);
×
332
    }
333
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
42✔
334
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pScan, &lino, _OVER);
42!
335

336
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
42✔
337
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->startTime, false), pScan, &lino,
42!
338
                        _OVER);
339

340
    numOfRows++;
42✔
341
    sdbRelease(pSdb, pScan);
42✔
342
  }
343

344
_OVER:
×
345
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
44!
346
  pShow->numOfRows += numOfRows;
44✔
347
  mndReleaseDb(pMnode, pDb);
44✔
348
  return numOfRows;
44✔
349
}
350

351
// kill scan
352
static void *mndBuildKillScanReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t scanId, int32_t dnodeid) {
2✔
353
  SVKillScanReq req = {0};
2✔
354
  req.scanId = scanId;
2✔
355
  req.vgId = pVgroup->vgId;
2✔
356
  req.dnodeId = dnodeid;
2✔
357
  terrno = 0;
2✔
358

359
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
2!
360
  int32_t contLen = tSerializeSVKillScanReq(NULL, 0, &req);
2✔
361
  if (contLen < 0) {
2!
362
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
363
    return NULL;
×
364
  }
365
  contLen += sizeof(SMsgHead);
2✔
366

367
  void *pReq = taosMemoryMalloc(contLen);
2!
368
  if (pReq == NULL) {
2!
369
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
370
    return NULL;
×
371
  }
372

373
  SMsgHead *pHead = pReq;
2✔
374
  pHead->contLen = htonl(contLen);
2✔
375
  pHead->vgId = htonl(pVgroup->vgId);
2✔
376

377
  mTrace("vgId:%d, build scan vnode config req, contLen:%d", pVgroup->vgId, contLen);
2!
378
  int32_t ret = 0;
2✔
379
  if ((ret = tSerializeSVKillScanReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
2!
380
    taosMemoryFreeClear(pReq);
×
381
    terrno = ret;
×
382
    return NULL;
×
383
  }
384
  *pContLen = contLen;
2✔
385
  return pReq;
2✔
386
}
387

388
static int32_t mndAddKillScanAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t scanId, int32_t dnodeid) {
2✔
389
  int32_t      code = 0;
2✔
390
  STransAction action = {0};
2✔
391

392
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
2✔
393
  if (pDnode == NULL) {
2!
394
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
395
    if (terrno != 0) code = terrno;
×
396
    TAOS_RETURN(code);
×
397
  }
398
  action.epSet = mndGetDnodeEpset(pDnode);
2✔
399
  mndReleaseDnode(pMnode, pDnode);
2✔
400

401
  int32_t contLen = 0;
2✔
402
  void   *pReq = mndBuildKillScanReq(pMnode, pVgroup, &contLen, scanId, dnodeid);
2✔
403
  if (pReq == NULL) {
2!
404
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
405
    if (terrno != 0) code = terrno;
×
406
    TAOS_RETURN(code);
×
407
  }
408

409
  action.pCont = pReq;
2✔
410
  action.contLen = contLen;
2✔
411
  action.msgType = TDMT_VND_KILL_SCAN;
2✔
412

413
  mTrace("trans:%d, kill scan msg len:%d", pTrans->id, contLen);
2!
414

415
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2!
416
    taosMemoryFree(pReq);
×
417
    TAOS_RETURN(code);
×
418
  }
419

420
  return 0;
2✔
421
}
422

423
static int32_t mndKillScan(SMnode *pMnode, SRpcMsg *pReq, SScanObj *pScan) {
2✔
424
  int32_t code = 0;
2✔
425
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-scan");
2✔
426
  if (pTrans == NULL) {
2!
427
    mError("scan:%" PRId32 ", failed to drop since %s", pScan->scanId, terrstr());
×
428
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
429
    if (terrno != 0) code = terrno;
×
430
    TAOS_RETURN(code);
×
431
  }
432
  mInfo("trans:%d, used to kill scan:%" PRId32, pTrans->id, pScan->scanId);
2!
433

434
  mndTransSetDbName(pTrans, pScan->dbname, NULL);
2✔
435

436
  SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
2✔
437
  if (pCommitRaw == NULL) {
2!
438
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
439
    if (terrno != 0) code = terrno;
×
440
    mndTransDrop(pTrans);
×
441
    TAOS_RETURN(code);
×
442
  }
443
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
2!
444
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
445
    mndTransDrop(pTrans);
×
446
    TAOS_RETURN(code);
×
447
  }
448
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
2!
449
    mndTransDrop(pTrans);
×
450
    TAOS_RETURN(code);
×
451
  }
452

453
  void *pIter = NULL;
2✔
454
  while (1) {
4✔
455
    SScanDetailObj *pDetail = NULL;
6✔
456
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
6✔
457
    if (pIter == NULL) break;
6✔
458

459
    if (pDetail->scanId == pScan->scanId) {
4✔
460
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
2✔
461
      if (pVgroup == NULL) {
2!
462
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
463
        sdbCancelFetch(pMnode->pSdb, pIter);
×
464
        sdbRelease(pMnode->pSdb, pDetail);
×
465
        mndTransDrop(pTrans);
×
466
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
467
        if (terrno != 0) code = terrno;
×
468
        TAOS_RETURN(code);
×
469
      }
470

471
      if ((code = mndAddKillScanAction(pMnode, pTrans, pVgroup, pScan->scanId, pDetail->dnodeId)) != 0) {
2!
472
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
473
        sdbCancelFetch(pMnode->pSdb, pIter);
×
474
        sdbRelease(pMnode->pSdb, pDetail);
×
475
        mndTransDrop(pTrans);
×
476
        TAOS_RETURN(code);
×
477
      }
478

479
      mndReleaseVgroup(pMnode, pVgroup);
2✔
480
    }
481

482
    sdbRelease(pMnode->pSdb, pDetail);
4✔
483
  }
484

485
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
2!
486
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
487
    mndTransDrop(pTrans);
×
488
    TAOS_RETURN(code);
×
489
  }
490

491
  mndTransDrop(pTrans);
2✔
492
  return 0;
2✔
493
}
494

495
static int32_t mndProcessKillScanReq(SRpcMsg *pReq) {
2✔
496
  int32_t      code = 0;
2✔
497
  int32_t      lino = 0;
2✔
498
  SKillScanReq killScanReq = {0};
2✔
499

500
  if ((code = tDeserializeSKillScanReq(pReq->pCont, pReq->contLen, &killScanReq)) != 0) {
2!
501
    TAOS_RETURN(code);
×
502
  }
503

504
  mInfo("start to kill scan:%" PRId32, killScanReq.scanId);
2!
505

506
  SMnode   *pMnode = pReq->info.node;
2✔
507
  SScanObj *pScan = mndAcquireScan(pMnode, killScanReq.scanId);
2✔
508
  if (pScan == NULL) {
2!
509
    code = TSDB_CODE_MND_INVALID_SCAN_ID;
×
510
    tFreeSKillScanReq(&killScanReq);
×
511
    TAOS_RETURN(code);
×
512
  }
513

514
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SCAN_DB), &lino, _OVER);
2!
515

516
  TAOS_CHECK_GOTO(mndKillScan(pMnode, pReq, pScan), &lino, _OVER);
2!
517

518
  code = TSDB_CODE_ACTION_IN_PROGRESS;
2✔
519

520
#if 0
521
  char    obj[TSDB_INT32_ID_LEN] = {0};
522
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pScan->scanId);
523
  if ((uint32_t)nBytes < sizeof(obj)) {
524
    auditRecord(pReq, pMnode->clusterId, "killScan", pScan->dbname, obj, killScanReq.sql, killScanReq.sqlLen);
525
  } else {
526
    mError("scan:%" PRId32 " failed to audit since %s", pScan->scanId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
527
  }
528
#endif
529
_OVER:
2✔
530
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2!
531
    mError("failed to kill scan %" PRId32 " since %s", killScanReq.scanId, terrstr());
×
532
  }
533

534
  tFreeSKillScanReq(&killScanReq);
2✔
535
  mndReleaseScan(pMnode, pScan);
2✔
536

537
  TAOS_RETURN(code);
2✔
538
}
539

540
// update progress
541
static int32_t mndUpdateScanProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t scanId, SQueryScanProgressRsp *rsp) {
10✔
542
  int32_t code = 0;
10✔
543

544
  void *pIter = NULL;
10✔
545
  while (1) {
4✔
546
    SScanDetailObj *pDetail = NULL;
14✔
547
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
14✔
548
    if (pIter == NULL) break;
14!
549

550
    if (pDetail->scanId == scanId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
14!
551
      pDetail->newNumberFileset = rsp->numberFileset;
10✔
552
      pDetail->newFinished = rsp->finished;
10✔
553
      pDetail->progress = rsp->progress;
10✔
554
      pDetail->remainingTime = rsp->remainingTime;
10✔
555

556
      sdbCancelFetch(pMnode->pSdb, pIter);
10✔
557
      sdbRelease(pMnode->pSdb, pDetail);
10✔
558

559
      TAOS_RETURN(code);
10✔
560
    }
561

562
    sdbRelease(pMnode->pSdb, pDetail);
4✔
563
  }
564

565
  return TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST;
×
566
}
567

568
static int32_t mndProcessQueryScanRsp(SRpcMsg *pReq) {
10✔
569
  int32_t               code = 0;
10✔
570
  SQueryScanProgressRsp req = {0};
10✔
571
  if (pReq->code != 0) {
10!
572
    mError("received wrong scan response, req code is %s", tstrerror(pReq->code));
×
573
    TAOS_RETURN(pReq->code);
×
574
  }
575
  code = tDeserializeSQueryScanProgressRsp(pReq->pCont, pReq->contLen, &req);
10✔
576
  if (code != 0) {
10!
577
    mError("failed to deserialize vnode-query-scan-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
578
           pReq->contLen);
579
    TAOS_RETURN(code);
×
580
  }
581

582
  mDebug("scan:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId, req.vgId,
10!
583
         req.dnodeId, req.numberFileset, req.finished);
584

585
  SMnode *pMnode = pReq->info.node;
10✔
586

587
  code = mndUpdateScanProgress(pMnode, pReq, req.scanId, &req);
10✔
588
  if (code != 0) {
10!
589
    mError("scan:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId,
×
590
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
591
    TAOS_RETURN(code);
×
592
  }
593

594
  TAOS_RETURN(code);
10✔
595
}
596

597
// timer
598
static void mndScanSendProgressReq(SMnode *pMnode, SScanObj *pScan) {
8✔
599
  void *pIter = NULL;
8✔
600

601
  while (1) {
13✔
602
    SScanDetailObj *pDetail = NULL;
21✔
603
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
21✔
604
    if (pIter == NULL) break;
21✔
605

606
    if (pDetail->scanId == pScan->scanId) {
13✔
607
      SEpSet epSet = {0};
10✔
608

609
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
10✔
610
      if (pDnode == NULL) break;
10!
611
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
10!
612
        sdbRelease(pMnode->pSdb, pDetail);
×
613
        continue;
×
614
      }
615
      mndReleaseDnode(pMnode, pDnode);
10✔
616

617
      SQueryScanProgressReq req;
618
      req.scanId = pDetail->scanId;
10✔
619
      req.vgId = pDetail->vgId;
10✔
620
      req.dnodeId = pDetail->dnodeId;
10✔
621

622
      int32_t contLen = tSerializeSQueryScanProgressReq(NULL, 0, &req);
10✔
623
      if (contLen < 0) {
10!
624
        sdbRelease(pMnode->pSdb, pDetail);
×
625
        continue;
×
626
      }
627

628
      contLen += sizeof(SMsgHead);
10✔
629

630
      SMsgHead *pHead = rpcMallocCont(contLen);
10✔
631
      if (pHead == NULL) {
10!
632
        sdbRelease(pMnode->pSdb, pDetail);
×
633
        continue;
×
634
      }
635

636
      pHead->contLen = htonl(contLen);
10✔
637
      pHead->vgId = htonl(pDetail->vgId);
10✔
638

639
      if (tSerializeSQueryScanProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
10!
640
        sdbRelease(pMnode->pSdb, pDetail);
×
641
        continue;
×
642
      }
643

644
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SCAN_PROGRESS, .contLen = contLen};
10✔
645

646
      rpcMsg.pCont = pHead;
10✔
647

648
      char    detail[1024] = {0};
10✔
649
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
20!
650
                              TMSG_INFO(TDMT_VND_QUERY_SCAN_PROGRESS), epSet.numOfEps, epSet.inUse);
20✔
651
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
20✔
652
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
10✔
653
      }
654

655
      mDebug("scan:%d, send update progress msg to %s", pDetail->scanId, detail);
10!
656

657
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
10!
658
        sdbRelease(pMnode->pSdb, pDetail);
×
659
        continue;
×
660
      }
661
    }
662

663
    sdbRelease(pMnode->pSdb, pDetail);
13✔
664
  }
665
}
8✔
666

667
static int32_t mndSaveScanProgress(SMnode *pMnode, int32_t scanId) {
8✔
668
  int32_t code = 0;
8✔
669
  bool    needSave = false;
8✔
670
  void   *pIter = NULL;
8✔
671
  while (1) {
13✔
672
    SScanDetailObj *pDetail = NULL;
21✔
673
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
21✔
674
    if (pIter == NULL) break;
21✔
675

676
    if (pDetail->scanId == scanId) {
13✔
677
      mDebug(
10!
678
          "scan:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
679
          "newNumberFileset:%d, newFinished:%d",
680
          pDetail->scanId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
681
          pDetail->newNumberFileset, pDetail->newFinished);
682

683
      // these 2 number will jump back after dnode restart, so < is not used here
684
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
10!
685
        needSave = true;
5✔
686
    }
687

688
    sdbRelease(pMnode->pSdb, pDetail);
13✔
689
  }
690

691
  char dbname[TSDB_TABLE_FNAME_LEN] = {0};
8✔
692
  TAOS_CHECK_RETURN(mndScanGetDbName(pMnode, scanId, dbname, TSDB_TABLE_FNAME_LEN));
8!
693

694
  if (!mndDbIsExist(pMnode, dbname)) {
8!
695
    needSave = true;
×
696
    mWarn("scan:%" PRId32 ", no db exist, set needSave:%s", scanId, dbname);
×
697
  }
698

699
  if (!needSave) {
8✔
700
    mDebug("scan:%" PRId32 ", no need to save", scanId);
4!
701
    TAOS_RETURN(code);
4✔
702
  }
703

704
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-scan-progress");
4✔
705
  if (pTrans == NULL) {
4!
706
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
707
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
708
    if (terrno != 0) code = terrno;
×
709
    TAOS_RETURN(code);
×
710
  }
711
  mInfo("scan:%d, trans:%d, used to update scan progress.", scanId, pTrans->id);
4!
712

713
  mndTransSetDbName(pTrans, dbname, NULL);
4✔
714

715
  pIter = NULL;
4✔
716
  while (1) {
6✔
717
    SScanDetailObj *pDetail = NULL;
10✔
718
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
10✔
719
    if (pIter == NULL) break;
10✔
720

721
    if (pDetail->scanId == scanId) {
6✔
722
      mInfo(
5!
723
          "scan:%d, trans:%d, check scan progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
724
          "newNumberFileset:%d, newFinished:%d",
725
          pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
726
          pDetail->newNumberFileset, pDetail->newFinished);
727

728
      pDetail->numberFileset = pDetail->newNumberFileset;
5✔
729
      pDetail->finished = pDetail->newFinished;
5✔
730

731
      SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
5✔
732
      if (pCommitRaw == NULL) {
5!
733
        sdbCancelFetch(pMnode->pSdb, pIter);
×
734
        sdbRelease(pMnode->pSdb, pDetail);
×
735
        mndTransDrop(pTrans);
×
736
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
737
        if (terrno != 0) code = terrno;
×
738
        TAOS_RETURN(code);
×
739
      }
740
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
5!
741
        mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
×
742
        sdbCancelFetch(pMnode->pSdb, pIter);
×
743
        sdbRelease(pMnode->pSdb, pDetail);
×
744
        mndTransDrop(pTrans);
×
745
        TAOS_RETURN(code);
×
746
      }
747
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
5!
748
        sdbCancelFetch(pMnode->pSdb, pIter);
×
749
        sdbRelease(pMnode->pSdb, pDetail);
×
750
        mndTransDrop(pTrans);
×
751
        TAOS_RETURN(code);
×
752
      }
753
    }
754

755
    sdbRelease(pMnode->pSdb, pDetail);
6✔
756
  }
757

758
  bool allFinished = true;
4✔
759
  pIter = NULL;
4✔
760
  while (1) {
6✔
761
    SScanDetailObj *pDetail = NULL;
10✔
762
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
10✔
763
    if (pIter == NULL) break;
10✔
764

765
    if (pDetail->scanId == scanId) {
6✔
766
      mInfo("scan:%d, trans:%d, check scan finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
5!
767
            pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
768

769
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
5!
770
        allFinished = false;
×
771
        sdbCancelFetch(pMnode->pSdb, pIter);
×
772
        sdbRelease(pMnode->pSdb, pDetail);
×
773
        break;
×
774
      }
775
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
5!
776
        allFinished = false;
×
777
        sdbCancelFetch(pMnode->pSdb, pIter);
×
778
        sdbRelease(pMnode->pSdb, pDetail);
×
779
        break;
×
780
      }
781
    }
782

783
    sdbRelease(pMnode->pSdb, pDetail);
6✔
784
  }
785

786
  if (!mndDbIsExist(pMnode, dbname)) {
4!
787
    allFinished = true;
×
788
    mWarn("scan:%" PRId32 ", no db exist, set all finished:%s", scanId, dbname);
×
789
  }
790

791
  if (allFinished) {
4!
792
    mInfo("scan:%d, all finished", scanId);
4!
793
    pIter = NULL;
4✔
794
    while (1) {
6✔
795
      SScanDetailObj *pDetail = NULL;
10✔
796
      pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
10✔
797
      if (pIter == NULL) break;
10✔
798

799
      if (pDetail->scanId == scanId) {
6✔
800
        SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
5✔
801
        if (pCommitRaw == NULL) {
5!
802
          mndTransDrop(pTrans);
×
803
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
804
          if (terrno != 0) code = terrno;
×
805
          TAOS_RETURN(code);
×
806
        }
807
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
5!
808
          mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
×
809
          sdbCancelFetch(pMnode->pSdb, pIter);
×
810
          sdbRelease(pMnode->pSdb, pDetail);
×
811
          mndTransDrop(pTrans);
×
812
          TAOS_RETURN(code);
×
813
        }
814
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
5!
815
          sdbCancelFetch(pMnode->pSdb, pIter);
×
816
          sdbRelease(pMnode->pSdb, pDetail);
×
817
          mndTransDrop(pTrans);
×
818
          TAOS_RETURN(code);
×
819
        }
820
        mInfo("scan:%d, add drop scandetail action", pDetail->scanDetailId);
5!
821
      }
822

823
      sdbRelease(pMnode->pSdb, pDetail);
6✔
824
    }
825

826
    SScanObj *pScan = mndAcquireScan(pMnode, scanId);
4✔
827
    if (pScan == NULL) {
4!
828
      mndTransDrop(pTrans);
×
829
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
830
      if (terrno != 0) code = terrno;
×
831
      TAOS_RETURN(code);
×
832
    }
833
    SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
4✔
834
    mndReleaseScan(pMnode, pScan);
4✔
835
    if (pCommitRaw == NULL) {
4!
836
      mndTransDrop(pTrans);
×
837
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
838
      if (terrno != 0) code = terrno;
×
839
      TAOS_RETURN(code);
×
840
    }
841
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
4!
842
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
×
843
      mndTransDrop(pTrans);
×
844
      TAOS_RETURN(code);
×
845
    }
846
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
4!
847
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
×
848
      mndTransDrop(pTrans);
×
849
      TAOS_RETURN(code);
×
850
    }
851
    mInfo("scan:%d, add drop scan action", pScan->scanId);
4!
852
  }
853

854
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
4!
855
    mError("scan:%d, trans:%d, failed to prepare since %s", scanId, pTrans->id, terrstr());
×
856
    mndTransDrop(pTrans);
×
857
    TAOS_RETURN(code);
×
858
  }
859

860
  mndTransDrop(pTrans);
4✔
861
  return 0;
4✔
862
}
863

864
static void mndScanPullup(SMnode *pMnode) {
3,556✔
865
  int32_t code = 0;
3,556✔
866
  SSdb   *pSdb = pMnode->pSdb;
3,556✔
867
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_SCAN), sizeof(int32_t));
3,556✔
868
  if (pArray == NULL) return;
3,556!
869

870
  void *pIter = NULL;
3,556✔
871
  while (1) {
8✔
872
    SScanObj *pScan = NULL;
3,564✔
873
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
3,564✔
874
    if (pIter == NULL) break;
3,564✔
875
    if (taosArrayPush(pArray, &pScan->scanId) == NULL) {
16!
876
      mError("failed to push scan id:%d into array, but continue pull up", pScan->scanId);
×
877
    }
878
    sdbRelease(pSdb, pScan);
8✔
879
  }
880

881
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
3,564✔
882
    mInfo("begin to pull up");
8!
883
    int32_t  *pScanId = taosArrayGet(pArray, i);
8✔
884
    SScanObj *pScan = mndAcquireScan(pMnode, *pScanId);
8✔
885
    if (pScan != NULL) {
8!
886
      mInfo("scan:%d, begin to pull up", pScan->scanId);
8!
887
      mndScanSendProgressReq(pMnode, pScan);
8✔
888
      if ((code = mndSaveScanProgress(pMnode, pScan->scanId)) != 0) {
8!
889
        mError("scan:%d, failed to save scan progress since %s", pScan->scanId, tstrerror(code));
×
890
      }
891
      mndReleaseScan(pMnode, pScan);
8✔
892
    }
893
  }
894
  taosArrayDestroy(pArray);
3,556✔
895
}
896

897
static int32_t mndBuildScanDbRsp(SScanDbRsp *pScanRsp, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
4✔
898
  int32_t code = 0;
4✔
899
  int32_t rspLen = tSerializeSScanDbRsp(NULL, 0, pScanRsp);
4✔
900
  void   *pRsp = NULL;
4✔
901
  if (useRpcMalloc) {
4!
902
    pRsp = rpcMallocCont(rspLen);
×
903
  } else {
904
    pRsp = taosMemoryMalloc(rspLen);
4!
905
  }
906

907
  if (pRsp == NULL) {
4!
908
    code = TSDB_CODE_OUT_OF_MEMORY;
×
909
    TAOS_RETURN(code);
×
910
  }
911

912
  (void)tSerializeSScanDbRsp(pRsp, rspLen, pScanRsp);
4✔
913
  *pRspLen = rspLen;
4✔
914
  *ppRsp = pRsp;
4✔
915
  TAOS_RETURN(code);
4✔
916
}
917

918
static int32_t mndSetScanDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs) {
6✔
919
  int32_t code = 0;
6✔
920
  SDbObj  dbObj = {0};
6✔
921
  memcpy(&dbObj, pDb, sizeof(SDbObj));
6✔
922
  dbObj.scanStartTime = scanTs;
6✔
923

924
  SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
6✔
925
  if (pCommitRaw == NULL) {
6!
926
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
927
    if (terrno != 0) code = terrno;
×
928
    TAOS_RETURN(code);
×
929
  }
930
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
6!
931
    sdbFreeRaw(pCommitRaw);
×
932
    TAOS_RETURN(code);
×
933
  }
934

935
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
6!
936
    sdbFreeRaw(pCommitRaw);
×
937
    TAOS_RETURN(code);
×
938
  }
939
  TAOS_RETURN(code);
6✔
940
}
941

942
static void *mndBuildScanVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t scanTs,
5✔
943
                                  STimeWindow tw) {
944
  SScanVnodeReq scanReq = {0};
5✔
945
  scanReq.dbUid = pDb->uid;
5✔
946
  scanReq.scanStartTime = scanTs;
5✔
947
  scanReq.tw = tw;
5✔
948
  tstrncpy(scanReq.db, pDb->name, TSDB_DB_FNAME_LEN);
5✔
949

950
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
5!
951
  int32_t contLen = tSerializeSScanVnodeReq(NULL, 0, &scanReq);
5✔
952
  if (contLen < 0) {
5!
953
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
954
    return NULL;
×
955
  }
956
  contLen += sizeof(SMsgHead);
5✔
957

958
  void *pReq = taosMemoryMalloc(contLen);
5!
959
  if (pReq == NULL) {
5!
960
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
961
    return NULL;
×
962
  }
963

964
  SMsgHead *pHead = pReq;
5✔
965
  pHead->contLen = htonl(contLen);
5✔
966
  pHead->vgId = htonl(pVgroup->vgId);
5✔
967

968
  if (tSerializeSScanVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &scanReq) < 0) {
5!
969
    taosMemoryFree(pReq);
×
970
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
971
    return NULL;
×
972
  }
973
  *pContLen = contLen;
5✔
974
  return pReq;
5✔
975
}
976

977
static int32_t mndBuildScanVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t scanTs,
5✔
978
                                        STimeWindow tw) {
979
  int32_t      code = 0;
5✔
980
  STransAction action = {0};
5✔
981
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
5✔
982

983
  int32_t contLen = 0;
5✔
984
  void   *pReq = mndBuildScanVnodeReq(pMnode, pDb, pVgroup, &contLen, scanTs, tw);
5✔
985
  if (pReq == NULL) {
5!
986
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
987
    if (terrno != 0) code = terrno;
×
988
    TAOS_RETURN(code);
×
989
  }
990

991
  action.pCont = pReq;
5✔
992
  action.contLen = contLen;
5✔
993
  action.msgType = TDMT_VND_SCAN;
5✔
994

995
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
5!
996
    taosMemoryFree(pReq);
×
997
    TAOS_RETURN(code);
×
998
  }
999

1000
  TAOS_RETURN(code);
5✔
1001
}
1002

1003
extern int32_t mndAddScanDetailToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SVgObj *pVgroup,
1004
                                      SVnodeGid *pVgid, int32_t index);
1005

1006
static int32_t mndSetScanDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs, STimeWindow tw,
6✔
1007
                                       SArray *vgroupIds, SScanDbRsp *pScanRsp) {
1008
  int32_t code = 0;
6✔
1009
  SSdb   *pSdb = pMnode->pSdb;
6✔
1010
  void   *pIter = NULL;
6✔
1011

1012
  SScanObj scan;
1013
  if ((code = mndAddScanToTran(pMnode, pTrans, &scan, pDb, pScanRsp)) != 0) {
6!
1014
    TAOS_RETURN(code);
×
1015
  }
1016

1017
  int32_t j = 0;
6✔
1018
  int32_t numOfVgroups = taosArrayGetSize(vgroupIds);
6✔
1019
  if (numOfVgroups > 0) {
6✔
1020
    for (int32_t i = 0; i < numOfVgroups; i++) {
9✔
1021
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
7✔
1022
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
7✔
1023

1024
      if (pVgroup == NULL) {
7!
1025
        mError("db:%s, vgroup:%" PRId64 " not exist", pDb->name, vgId);
×
1026
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
×
1027
      } else if (pVgroup->dbUid != pDb->uid) {
7✔
1028
        mError("db:%s, vgroup:%" PRId64 " not belong to db:%s", pDb->name, vgId, pDb->name);
2!
1029
        sdbRelease(pSdb, pVgroup);
2✔
1030
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
2✔
1031
      }
1032
      sdbRelease(pSdb, pVgroup);
5✔
1033
    }
1034

1035
    for (int32_t i = 0; i < numOfVgroups; i++) {
5✔
1036
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
3✔
1037
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
3✔
1038

1039
      if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
3!
1040
        sdbRelease(pSdb, pVgroup);
×
1041
        TAOS_RETURN(code);
×
1042
      }
1043

1044
      for (int32_t i = 0; i < pVgroup->replica; i++) {
6✔
1045
        SVnodeGid *gid = &pVgroup->vnodeGid[i];
3✔
1046
        if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
3!
1047
          sdbRelease(pSdb, pVgroup);
×
1048
          TAOS_RETURN(code);
×
1049
        }
1050
        j++;
3✔
1051
      }
1052
      sdbRelease(pSdb, pVgroup);
3✔
1053
    }
1054
  } else {
1055
    while (1) {
4✔
1056
      SVgObj *pVgroup = NULL;
6✔
1057
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
6✔
1058
      if (pIter == NULL) break;
6✔
1059

1060
      if (pVgroup->dbUid == pDb->uid) {
4✔
1061
        if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
2!
1062
          sdbCancelFetch(pSdb, pIter);
×
1063
          sdbRelease(pSdb, pVgroup);
×
1064
          TAOS_RETURN(code);
×
1065
        }
1066

1067
        for (int32_t i = 0; i < pVgroup->replica; i++) {
4✔
1068
          SVnodeGid *gid = &pVgroup->vnodeGid[i];
2✔
1069
          if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
2!
1070
            sdbCancelFetch(pSdb, pIter);
×
1071
            sdbRelease(pSdb, pVgroup);
×
1072
            TAOS_RETURN(code);
×
1073
          }
1074
          j++;
2✔
1075
        }
1076
      }
1077

1078
      sdbRelease(pSdb, pVgroup);
4✔
1079
    }
1080
  }
1081

1082
  TAOS_RETURN(code);
4✔
1083
}
1084

1085
static int32_t mndScanDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds) {
6✔
1086
  int32_t    code = 0;
6✔
1087
  int32_t    lino;
1088
  SScanDbRsp scanRsp = {0};
6✔
1089

1090
  bool  isExist = false;
6✔
1091
  void *pIter = NULL;
6✔
1092
  while (1) {
1✔
1093
    SScanObj *pScan = NULL;
7✔
1094
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
7✔
1095
    if (pIter == NULL) break;
7✔
1096

1097
    if (strcmp(pScan->dbname, pDb->name) == 0) {
1!
1098
      isExist = true;
×
1099
    }
1100
    sdbRelease(pMnode->pSdb, pScan);
1✔
1101
  }
1102
  if (isExist) {
6!
1103
    mInfo("scan db:%s already exist", pDb->name);
×
1104

1105
    if (pReq) {
×
1106
      int32_t rspLen = 0;
×
1107
      void   *pRsp = NULL;
×
1108
      scanRsp.scanId = 0;
×
1109
      scanRsp.bAccepted = false;
×
1110
      code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, true);
×
1111
      TSDB_CHECK_CODE(code, lino, _OVER);
×
1112

1113
      pReq->info.rsp = pRsp;
×
1114
      pReq->info.rspLen = rspLen;
×
1115
    }
1116

1117
    return TSDB_CODE_MND_SCAN_ALREADY_EXIST;
×
1118
  }
1119

1120
  int64_t scanTs = taosGetTimestampMs();
6✔
1121
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "scan-db");
6✔
1122
  if (pTrans == NULL) goto _OVER;
6!
1123

1124
  mInfo("trans:%d, used to scan db:%s", pTrans->id, pDb->name);
6!
1125
  mndTransSetDbName(pTrans, pDb->name, NULL);
6✔
1126
  code = mndTransCheckConflict(pMnode, pTrans);
6✔
1127
  TSDB_CHECK_CODE(code, lino, _OVER);
6!
1128

1129
  code = mndSetScanDbCommitLogs(pMnode, pTrans, pDb, scanTs);
6✔
1130
  TSDB_CHECK_CODE(code, lino, _OVER);
6!
1131

1132
  code = mndSetScanDbRedoActions(pMnode, pTrans, pDb, scanTs, tw, vgroupIds, &scanRsp);
6✔
1133
  TSDB_CHECK_CODE(code, lino, _OVER);
6✔
1134

1135
  if (pReq) {
4!
1136
    int32_t rspLen = 0;
4✔
1137
    void   *pRsp = NULL;
4✔
1138
    scanRsp.bAccepted = true;
4✔
1139
    code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, false);
4✔
1140
    TSDB_CHECK_CODE(code, lino, _OVER);
4!
1141
    mndTransSetRpcRsp(pTrans, pRsp, rspLen);
4✔
1142
  }
1143

1144
  if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
4!
1145
  code = 0;
4✔
1146

1147
_OVER:
6✔
1148
  mndTransDrop(pTrans);
6✔
1149
  TAOS_RETURN(code);
6✔
1150
}
1151

1152
static int32_t mndProcessScanTimer(SRpcMsg *pReq) {
3,556✔
1153
  mTrace("start to process scan timer");
3,556✔
1154
  mndScanPullup(pReq->info.node);
3,556✔
1155
  return 0;
3,556✔
1156
}
1157

1158
int32_t mndProcessScanDbReq(SRpcMsg *pReq) {
6✔
1159
  SMnode    *pMnode = pReq->info.node;
6✔
1160
  int32_t    code = -1;
6✔
1161
  SDbObj    *pDb = NULL;
6✔
1162
  SScanDbReq scanReq = {0};
6✔
1163

1164
  if (tDeserializeSScanDbReq(pReq->pCont, pReq->contLen, &scanReq) != 0) {
6!
1165
    code = TSDB_CODE_INVALID_MSG;
×
1166
    goto _OVER;
×
1167
  }
1168

1169
  mInfo("db:%s, start to scan", scanReq.db);
6!
1170

1171
  pDb = mndAcquireDb(pMnode, scanReq.db);
6✔
1172
  if (pDb == NULL) {
6!
1173
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1174
    if (terrno != 0) code = terrno;
×
1175
    goto _OVER;
×
1176
  }
1177

1178
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SCAN_DB, pDb), NULL, _OVER);
6!
1179

1180
  code = mndScanDb(pMnode, pReq, pDb, scanReq.timeRange, scanReq.vgroupIds);
6✔
1181
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
6✔
1182

1183
_OVER:
2✔
1184
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
6!
1185
    mError("db:%s, failed to process scan db req since %s", scanReq.db, terrstr());
2!
1186
  }
1187

1188
  mndReleaseDb(pMnode, pDb);
6✔
1189
  tFreeSScanDbReq(&scanReq);
6✔
1190
  TAOS_RETURN(code);
6✔
1191
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc