• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5057

17 May 2026 01:15AM UTC coverage: 73.406% (+0.02%) from 73.384%
#5057

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281727 of 383795 relevant lines covered (73.41%)

136101761.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.69
/source/dnode/mnode/impl/src/mndScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndScan.h"
16
#include "audit.h"
17
#include "mndDb.h"
18
#include "mndDnode.h"
19
#include "mndPrivilege.h"
20
#include "mndScan.h"
21
#include "mndScanDetail.h"
22
#include "mndShow.h"
23
#include "mndTrans.h"
24
#include "mndUser.h"
25
#include "mndVgroup.h"
26
#include "tmisce.h"
27
#include "tmsgcb.h"
28

29
#define MND_SCAN_VER_NUMBER 1
30
#define MND_SCAN_ID_LEN     11
31

32
static int32_t  mndProcessScanTimer(SRpcMsg *pReq);
33
static SSdbRaw *mndScanActionEncode(SScanObj *pScan);
34
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw);
35
static int32_t  mndScanActionInsert(SSdb *pSdb, SScanObj *pScan);
36
static int32_t  mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan);
37
static int32_t  mndScanActionDelete(SSdb *pSdb, SScanObj *pScan);
38
static int32_t  mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
39
static int32_t  mndProcessKillScanReq(SRpcMsg *pReq);
40
static int32_t  mndProcessQueryScanRsp(SRpcMsg *pReq);
41

42
// Wires the scan module into the mnode: installs the show-retrieve handle and
// the message handlers, then registers the SDB_SCAN table callbacks.
// Returns whatever sdbSetTable() returns.
int32_t mndInitScan(SMnode *pMnode) {
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SCAN, mndRetrieveScan);

  mndSetMsgHandle(pMnode, TDMT_MND_KILL_SCAN, mndProcessKillScanReq);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_SCAN_PROGRESS_RSP, mndProcessQueryScanRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_SCAN_TIMER, mndProcessScanTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_SCAN_RSP, mndTransProcessRsp);

  // Every field that is not assigned below stays zero.
  SSdbTable scanTable = {0};
  scanTable.sdbType = SDB_SCAN;
  scanTable.keyType = SDB_KEY_INT32;
  scanTable.encodeFp = (SdbEncodeFp)mndScanActionEncode;
  scanTable.decodeFp = (SdbDecodeFp)mndScanActionDecode;
  scanTable.insertFp = (SdbInsertFp)mndScanActionInsert;
  scanTable.updateFp = (SdbUpdateFp)mndScanActionUpdate;
  scanTable.deleteFp = (SdbDeleteFp)mndScanActionDelete;

  return sdbSetTable(pMnode->pSdb, scanTable);
}
61

62
// Module teardown hook; it only emits a debug log line.
void mndCleanupScan(SMnode *pMnode) {
  mDebug("mnd scan cleanup");
}
63

64
// Currently a no-op: the function body releases nothing for a scan object.
void tFreeScanObj(SScanObj *pScan) {
  (void)pScan;
}
65

66
static int32_t tSerializeSScanObj(void *buf, int32_t bufLen, const SScanObj *pObj) {
3,008✔
67
  SEncoder encoder = {0};
3,008✔
68
  int32_t  code = 0;
3,008✔
69
  int32_t  lino;
70
  int32_t  tlen;
71
  tEncoderInit(&encoder, buf, bufLen);
3,008✔
72

73
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
3,008✔
74
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->scanId));
6,016✔
75
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
6,016✔
76
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
6,016✔
77
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
6,016✔
78

79
  tEndEncode(&encoder);
3,008✔
80

81
_exit:
3,008✔
82
  if (code) {
3,008✔
83
    tlen = code;
×
84
  } else {
85
    tlen = encoder.pos;
3,008✔
86
  }
87
  tEncoderClear(&encoder);
3,008✔
88
  return tlen;
3,008✔
89
}
90

91
// Decodes scanId, dbname and startTime from buf into pObj. dbUid is decoded only
// when the decoder has not reached its end; otherwise it is set to 0.
// Returns 0 on success or the failing step's error code.
int32_t tDeserializeSScanObj(void *buf, int32_t bufLen, SScanObj *pObj) {
  SDecoder dec = {0};
  int32_t  code = 0;  // written by TAOS_CHECK_EXIT
  int32_t  lino;      // referenced by TAOS_CHECK_EXIT

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->scanId));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->startTime));

  if (tDecodeIsEnd(&dec)) {
    // Payload ends before a dbUid field.
    pObj->dbUid = 0;
  } else {
    TAOS_CHECK_EXIT(tDecodeI64v(&dec, &pObj->dbUid));
  }

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
113

114
// Builds the SDB raw record for a scan object: a 32-bit payload length followed
// by the serialized object. Returns NULL (with terrno set) on failure.
static SSdbRaw *mndScanActionEncode(SScanObj *pScan) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int32_t  payloadLen = 0;
  int32_t  rawSize = 0;
  void    *payload = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  // The result of the NULL-buffer pass is used as the payload size below.
  payloadLen = tSerializeSScanObj(NULL, 0, pScan);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  rawSize = sizeof(int32_t) + payloadLen;
  pRaw = sdbAllocRaw(SDB_SCAN, MND_SCAN_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  payloadLen = tSerializeSScanObj(payload, payloadLen, pScan);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, payloadLen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  // The temporary payload buffer is released on every path.
  taosMemoryFreeClear(payload);

  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("scan:%" PRId32 ", encode to raw:%p, row:%p", pScan->scanId, pRaw, pScan);
    return pRaw;
  }

  mError("scan:%" PRId32 ", failed to encode to raw:%p since %s", pScan->scanId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
163

164
// Rebuilds a scan row from an SDB raw record: checks the soft version, allocates
// the row, reads the 32-bit payload length and payload, and deserializes it.
// Returns the new row, or NULL with terrno set on failure.
static SSdbRow *mndScanActionDecode(SSdbRaw *pRaw) {
  int32_t   code = 0;
  int32_t   lino = 0;
  SSdbRow  *pRow = NULL;
  SScanObj *pScan = NULL;
  void     *buf = NULL;
  int32_t   tlen = 0;
  int32_t   dataPos = 0;
  int8_t    sver = 0;

  terrno = TSDB_CODE_SUCCESS;

  code = sdbGetRawSoftVer(pRaw, &sver);
  if (code != 0) {
    // Make the failure visible at OVER even if the callee left terrno untouched.
    if (terrno == TSDB_CODE_SUCCESS) terrno = code;
    goto OVER;
  }

  if (sver != MND_SCAN_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("scan read invalid ver, data ver: %d, curr ver: %d", sver, MND_SCAN_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SScanObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pScan = sdbGetRowObj(pRow);
  if (pScan == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSScanObj(buf, tlen, pScan)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);

  // NOTE(review): if the SDB_GET_* macros report through `code` rather than
  // terrno, this keeps such a failure from being treated as success.
  if (terrno == TSDB_CODE_SUCCESS && code != 0) terrno = code;

  if (terrno != TSDB_CODE_SUCCESS) {
    // pScan is still NULL when the failure happened before the row was obtained.
    int32_t scanId = (pScan != NULL) ? pScan->scanId : 0;
    mError("scan:%" PRId32 ", failed to decode from raw:%p since %s", scanId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("scan:%" PRId32 ", decode from raw:%p, row:%p", pScan->scanId, pRaw, pScan);
  return pRow;
}
220

221
// SDB insert callback for scan rows: traces the event and reports success.
static int32_t mndScanActionInsert(SSdb *pSdb, SScanObj *pScan) {
  (void)pSdb;

  mTrace("scan:%" PRId32 ", perform insert action", pScan->scanId);

  return 0;
}
225

226
// SDB delete callback for scan rows: traces the event, calls tFreeScanObj() on
// the row object and reports success.
static int32_t mndScanActionDelete(SSdb *pSdb, SScanObj *pScan) {
  (void)pSdb;

  mTrace("scan:%" PRId32 ", perform delete action", pScan->scanId);

  tFreeScanObj(pScan);

  return 0;
}
231

232
// SDB update callback for scan rows: only traces the old and new row pointers;
// no field is copied from the new row into the old one.
static int32_t mndScanActionUpdate(SSdb *pSdb, SScanObj *pOldScan, SScanObj *pNewScan) {
  (void)pSdb;

  mTrace("scan:%" PRId32 ", perform update action, old row:%p new row:%p", pOldScan->scanId, pOldScan, pNewScan);

  return 0;
}
237

238
// Looks up a scan row by id. Returns the acquired row, or NULL. When the lookup
// fails with TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is reset to success so that
// "not found" is not reported as an error; other terrno values are left as-is.
static SScanObj *mndAcquireScan(SMnode *pMnode, int64_t scanId) {
  SSdb *pSdb = pMnode->pSdb;

  // The SDB_SCAN table is registered with SDB_KEY_INT32, so the key handed to
  // sdbAcquire() must be a 32-bit object. Passing the address of the 64-bit
  // parameter would expose the wrong half of the value on big-endian hosts.
  int32_t key = (int32_t)scanId;

  SScanObj *pScan = sdbAcquire(pSdb, SDB_SCAN, &key);
  if (pScan == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pScan;
}
246

247
// Hands a scan row obtained from mndAcquireScan() back to the SDB.
static void mndReleaseScan(SMnode *pMnode, SScanObj *pScan) {
  sdbRelease(pMnode->pSdb, pScan);
}
252

253
// Copies the database name (at most len bytes via tstrncpy) and, when dbUid is
// non-NULL, the database uid of the scan identified by scanId.
// Returns 0 on success. When the scan cannot be acquired, returns terrno if it
// is set, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
static int32_t mndScanGetDbInfo(SMnode *pMnode, int32_t scanId, char *dbname, int32_t len, int64_t *dbUid) {
  int32_t   code = 0;
  SScanObj *pScan = mndAcquireScan(pMnode, scanId);

  if (pScan == NULL) {
    if (terrno != 0) {
      code = terrno;
    } else {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    }
    TAOS_RETURN(code);
  }

  tstrncpy(dbname, pScan->dbname, len);
  if (dbUid != NULL) {
    *dbUid = pScan->dbUid;
  }

  mndReleaseScan(pMnode, pScan);
  TAOS_RETURN(code);
}
267

268
// scan db
269
// Fills in a new scan object (fresh id, db name/uid, start time), encodes it and
// appends it to the transaction's prepare log with status SDB_STATUS_READY.
// On success the generated scan id is written to rsp->scanId and 0 is returned;
// otherwise an error code is returned.
int32_t mndAddScanToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SDbObj *pDb, SScanDbRsp *rsp) {
  int32_t code = 0;
  pScan->scanId = tGenIdPI32();

  tstrncpy(pScan->dbname, pDb->name, sizeof(pScan->dbname));
  pScan->dbUid = pDb->uid;

  pScan->startTime = taosGetTimestampMs();

  SSdbRaw *pVgRaw = mndScanActionEncode(pScan);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
    // The append failed, so the raw is still ours to free.
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
    // The raw was appended to pTrans above and is not freed here, matching how
    // mndKillScan() treats a raw after a successful append. Freeing it here
    // would leave pTrans holding a dangling pointer.
    // NOTE(review): assumes the transaction frees its appended raws when it is
    // dropped -- confirm against mndTransDrop().
    TAOS_RETURN(code);
  }

  rsp->scanId = pScan->scanId;

  return 0;
}
298

299
// retrieve scan
300
// Show-retrieve handler for scans: iterates the SDB_SCAN table and writes up to
// `rows` rows (scan id, db name, start time) into pBlock.
// Returns the number of rows written, or an error code on failure.
static int32_t mndRetrieveScan(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SScanObj *pScan = NULL;
  char     *sep = NULL;
  SDbObj   *pDb = NULL;
  int32_t   code = 0;
  int32_t   lino = 0;
  // NOTE(review): several locals below are not referenced directly in this
  // function; presumably the MND_SHOW_CHECK_* macros use them -- verify before
  // removing any of them.
  SUserObj *pUser = NULL;
  SDbObj   *pIterDb = NULL;
  char      objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
  bool      showAll = false, showIter = false;
  int64_t   dbUid = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      // Any other database name must resolve to an existing database.
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_SCANS, PRIV_OBJ_DB, 0, _OVER);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_SCAN, pShow->pIter, (void **)&pScan);
    if (pShow->pIter == NULL) break;

    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pScan->dbname, pScan, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_SCANS, _OVER);

    SColumnInfoData *pColInfo;
    SName            n;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // Column: scan id.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->scanId, false), pScan, &lino, _OVER);

    // Column: database name (as a var-string in tmpBuf).
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pScan->dbname)) {
      SName name = {0};
      // Use the same check macro as the other failure paths in this loop so the
      // fetched row is passed along on failure instead of being left acquired.
      // NOTE(review): presumably RETRIEVE_CHECK_GOTO releases pScan -- confirm
      // against the macro definition.
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pScan->dbname, T_NAME_ACCT | T_NAME_DB), pScan, &lino, _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pScan->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pScan, &lino, _OVER);

    // Column: start time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pScan->startTime, false), pScan, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pScan);
  }

_OVER:
  if (pUser) mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
372

373
// kill scan
374
// Builds the kill-scan message for one vgroup: an SMsgHead (contLen and vgId in
// network byte order) followed by the serialized SVKillScanReq.
// On success returns the taosMemoryMalloc'ed buffer and stores its total length
// in *pContLen. On failure returns NULL with terrno set.
static void *mndBuildKillScanReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t scanId, int32_t dnodeid) {
  SVKillScanReq req = {0};
  req.scanId = scanId;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeid;
  terrno = 0;

  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);

  // The result of the NULL-buffer pass is used as the body size below.
  int32_t bodyLen = tSerializeSVKillScanReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  mTrace("vgId:%d, build scan vnode config req, contLen:%d", pVgroup->vgId, contLen);

  // The body starts after the header, so only bodyLen bytes are available to
  // the encoder. Passing contLen would overstate the capacity by
  // sizeof(SMsgHead).
  int32_t ret = tSerializeSVKillScanReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req);
  if (ret < 0) {
    taosMemoryFreeClear(pReq);
    terrno = ret;
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
409

410
// Appends one TDMT_VND_KILL_SCAN redo action, addressed to the given dnode's
// ep-set, to the transaction. Returns 0 on success or an error code.
static int32_t mndAddKillScanAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t scanId, int32_t dnodeid) {
  int32_t      code = 0;
  int32_t      contLen = 0;
  STransAction action = {0};

  // Resolve the target endpoint; the dnode reference is dropped right after.
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
  if (pDnode == NULL) {
    if (terrno != 0) {
      code = terrno;
    } else {
      code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    }
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  void *pReq = mndBuildKillScanReq(pMnode, pVgroup, &contLen, scanId, dnodeid);
  if (pReq == NULL) {
    if (terrno != 0) {
      code = terrno;
    } else {
      code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    }
    TAOS_RETURN(code);
  }

  action.msgType = TDMT_VND_KILL_SCAN;
  action.contLen = contLen;
  action.pCont = pReq;

  mTrace("trans:%d, kill scan msg len:%d", pTrans->id, contLen);

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The request buffer is freed here when the append fails.
    taosMemoryFree(pReq);
    TAOS_RETURN(code);
  }

  return 0;
}
444

445
// Creates and prepares a "kill-scan" transaction: the scan row is appended as a
// commit log, and one kill action is added for every scan-detail row that
// belongs to this scan. Returns 0 on success or an error code. The transaction
// object is dropped on every path.
static int32_t mndKillScan(SMnode *pMnode, SRpcMsg *pReq, SScanObj *pScan) {
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-scan");
  if (pTrans == NULL) {
    mError("scan:%" PRId32 ", failed to drop since %s", pScan->scanId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill scan:%" PRId32, pTrans->id, pScan->scanId);

  mndTransSetDbName(pTrans, pScan->dbname, NULL);

  SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    // The append failed, so the raw must be freed here, as mndAddScanToTran()
    // does on its append failure.
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == pScan->scanId) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }

      if ((code = mndAddKillScanAction(pMnode, pTrans, pVgroup, pScan->scanId, pDetail->dnodeId)) != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        // Release the vgroup acquired above; the success path releases it too.
        mndReleaseVgroup(pMnode, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }

      mndReleaseVgroup(pMnode, pVgroup);
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
516

517
// Handler for TDMT_MND_KILL_SCAN: decodes the request, looks up the scan, checks
// the MND_OPER_SCAN_DB privilege and starts the kill transaction.
// Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
// TSDB_CODE_MND_INVALID_SCAN_ID when the scan is not found, or the error code of
// the failing step.
static int32_t mndProcessKillScanReq(SRpcMsg *pReq) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SKillScanReq killScanReq = {0};

  code = tDeserializeSKillScanReq(pReq->pCont, pReq->contLen, &killScanReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill scan:%" PRId32, killScanReq.scanId);

  SMnode   *pMnode = pReq->info.node;
  SScanObj *pScan = mndAcquireScan(pMnode, killScanReq.scanId);
  if (pScan == NULL) {
    tFreeSKillScanReq(&killScanReq);
    code = TSDB_CODE_MND_INVALID_SCAN_ID;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SCAN_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillScan(pMnode, pReq, pScan), &lino, _OVER);

  // The kill transaction was started; this code is returned instead of 0.
  code = TSDB_CODE_ACTION_IN_PROGRESS;

#if 0
  char    obj[TSDB_INT32_ID_LEN] = {0};
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pScan->scanId);
  if ((uint32_t)nBytes < sizeof(obj)) {
    auditRecord(pReq, pMnode->clusterId, "killScan", pScan->dbname, obj, killScanReq.sql, killScanReq.sqlLen);
  } else {
    mError("scan:%" PRId32 " failed to audit since %s", pScan->scanId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }
#endif
_OVER:
  // TSDB_CODE_ACTION_IN_PROGRESS is the expected outcome and is not logged as a failure.
  if (code != TSDB_CODE_ACTION_IN_PROGRESS && code != 0) {
    mError("failed to kill scan %" PRId32 " since %s", killScanReq.scanId, terrstr());
  }

  tFreeSKillScanReq(&killScanReq);
  mndReleaseScan(pMnode, pScan);

  TAOS_RETURN(code);
}
561

562
// update progress
563
// Finds the scan-detail row matching (scanId, rsp->vgId, rsp->dnodeId) and copies
// the reported progress fields into it. Returns 0 when a row was updated,
// TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST when no row matched.
static int32_t mndUpdateScanProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t scanId, SQueryScanProgressRsp *rsp) {
  int32_t         code = 0;
  void           *pIter = NULL;
  SScanDetailObj *pDetail = NULL;

  for (;;) {
    pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    bool matches = pDetail->scanId == scanId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId;
    if (!matches) {
      sdbRelease(pMnode->pSdb, pDetail);
      continue;
    }

    pDetail->newNumberFileset = rsp->numberFileset;
    pDetail->newFinished = rsp->finished;
    pDetail->progress = rsp->progress;
    pDetail->remainingTime = rsp->remainingTime;

    // Leaving the loop early: cancel the iteration and release the row.
    sdbCancelFetch(pMnode->pSdb, pIter);
    sdbRelease(pMnode->pSdb, pDetail);

    TAOS_RETURN(code);
  }

  return TSDB_CODE_MND_SCAN_DETAIL_NOT_EXIST;
}
589

590
// Handler for TDMT_VND_QUERY_SCAN_PROGRESS_RSP: rejects responses that carry an
// error code, decodes the progress payload and stores it through
// mndUpdateScanProgress(). Returns 0 on success or the failing step's code.
static int32_t mndProcessQueryScanRsp(SRpcMsg *pReq) {
  SQueryScanProgressRsp req = {0};
  int32_t               code = 0;

  if (pReq->code != 0) {
    mError("received wrong scan response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  code = tDeserializeSQueryScanProgressRsp(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    mError("failed to deserialize vnode-query-scan-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("scan:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId, req.vgId,
         req.dnodeId, req.numberFileset, req.finished);

  SMnode *pMnode = pReq->info.node;

  code = mndUpdateScanProgress(pMnode, pReq, req.scanId, &req);
  if (code != 0) {
    mError("scan:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.scanId,
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
  }

  // Success and failure both return `code` through TAOS_RETURN.
  TAOS_RETURN(code);
}
618

619
// timer
620
// Sends one TDMT_VND_QUERY_SCAN_PROGRESS request for every scan-detail row that
// belongs to pScan. Failures on a single row are skipped (best effort); a
// missing dnode stops the iteration.
static void mndScanSendProgressReq(SMnode *pMnode, SScanObj *pScan) {
  void *pIter = NULL;

  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == pScan->scanId) {
      SEpSet epSet = {0};

      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
      if (pDnode == NULL) {
        // Leaving the loop early: cancel the iteration and release the fetched
        // row, as the other early exits over SDB_SCAN_DETAIL in this file do.
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
      // The dnode is only needed for its fqdn/port; release it on both outcomes.
      mndReleaseDnode(pMnode, pDnode);
      if (epCode != 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SQueryScanProgressReq req = {0};
      req.scanId = pDetail->scanId;
      req.vgId = pDetail->vgId;
      req.dnodeId = pDetail->dnodeId;

      int32_t contLen = tSerializeSQueryScanProgressReq(NULL, 0, &req);
      if (contLen < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      contLen += sizeof(SMsgHead);

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);

      if (tSerializeSQueryScanProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
        // The buffer was never handed to the rpc layer, so free it here.
        rpcFreeCont(pHead);
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_SCAN_PROGRESS, .contLen = contLen};

      rpcMsg.pCont = pHead;

      char    detail[1024] = {0};
      int32_t len = snprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                             TMSG_INFO(TDMT_VND_QUERY_SCAN_PROGRESS), epSet.numOfEps, epSet.inUse);
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        // snprintf returns the untruncated length; stop appending once the
        // buffer is full so `sizeof(detail) - len` cannot wrap around.
        if (len < 0 || (size_t)len >= sizeof(detail)) break;
        len += snprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
      }

      mDebug("scan:%d, send update progress msg to %s", pDetail->scanId, detail);

      // NOTE(review): pHead is not freed on send failure here; presumably
      // tmsgSendReq() disposes of the content itself -- confirm.
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }
}
688

689
/*
 * Save the progress reported for scan `scanId` and drop the scan once it is done.
 *
 * Steps:
 *   1. Walk SDB_SCAN_DETAIL and decide whether anything needs saving: a detail of
 *      this scan whose new* counters differ from the stored ones, or the database
 *      no longer existing.
 *   2. Create an "update-scan-progress" transaction and append a commit log for every
 *      detail of the scan, carrying the new counters (SDB_STATUS_READY).
 *   3. Unless some detail is still at -1/-1 or has numberFileset != finished, or if
 *      the database is gone, additionally append commit logs that mark every detail
 *      and the scan object itself as SDB_STATUS_DROPPED.
 *   4. Prepare the transaction. The local transaction handle is dropped on every
 *      exit path after its creation.
 *
 * Returns 0 when nothing had to be saved or the transaction was prepared, otherwise
 * the error code of the failing step.
 */
static int32_t mndSaveScanProgress(SMnode *pMnode, int32_t scanId) {
  int32_t code = 0;
  bool    needSave = false;
  void   *pIter = NULL;

  // Pass 1: detect whether any detail of this scan has unsaved progress.
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mDebug(
          "scan:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->scanId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      // these 2 number will jump back after dnode restart, so < is not used here
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished) {
        needSave = true;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
  int64_t dbUid = 0;
  TAOS_CHECK_RETURN(mndScanGetDbInfo(pMnode, scanId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));

  // A scan whose database disappeared must still be saved so that it can be dropped below.
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    needSave = true;
    mWarn("scan:%" PRId32 ", no db exist, set needSave:%s", scanId, dbname);
  }

  if (!needSave) {
    mDebug("scan:%" PRId32 ", no need to save", scanId);
    TAOS_RETURN(code);
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-scan-progress");
  if (pTrans == NULL) {
    // pTrans is NULL in this branch, so the log line must not read pTrans->id.
    mError("scan:%" PRId32 ", failed to create trans since %s", scanId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("scan:%d, trans:%d, used to update scan progress.", scanId, pTrans->id);

  mndTransSetDbName(pTrans, dbname, NULL);

  // Pass 2: copy the new counters into each detail and append it as a READY commit log.
  pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mInfo(
          "scan:%d, trans:%d, check scan progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      pDetail->numberFileset = pDetail->newNumberFileset;
      pDetail->finished = pDetail->newFinished;

      SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
      if (pCommitRaw == NULL) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
        mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
        // The raw was not appended, so it is still ours to free (same handling as mndSetScanDbCommitLogs).
        sdbFreeRaw(pCommitRaw);
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  // Pass 3: the scan is finished unless some detail is unreported (-1/-1) or partially done.
  bool allFinished = true;
  pIter = NULL;
  while (1) {
    SScanDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->scanId == scanId) {
      mInfo("scan:%d, trans:%d, check scan finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
            pDetail->scanId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);

      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  // A vanished database overrides the per-detail verdict: nothing is left to wait for.
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    allFinished = true;
    mWarn("scan:%" PRId32 ", no db exist, set all finished:%s", scanId, dbname);
  }

  if (allFinished) {
    mInfo("scan:%d, all finished", scanId);

    // Pass 4: append a DROPPED commit log for every detail of the scan.
    pIter = NULL;
    while (1) {
      SScanDetailObj *pDetail = NULL;
      pIter = sdbFetch(pMnode->pSdb, SDB_SCAN_DETAIL, pIter, (void **)&pDetail);
      if (pIter == NULL) break;

      if (pDetail->scanId == scanId) {
        SSdbRaw *pCommitRaw = mndScanDetailActionEncode(pDetail);
        if (pCommitRaw == NULL) {
          // Cancel the iteration and release the row like every other early exit in this loop.
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          if (terrno != 0) code = terrno;
          TAOS_RETURN(code);
        }
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
          mError("scan:%d, trans:%d, failed to append commit log since %s", pDetail->scanId, pTrans->id, terrstr());
          sdbFreeRaw(pCommitRaw);
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        mInfo("scan:%d, add drop scandetail action", pDetail->scanDetailId);
      }

      sdbRelease(pMnode->pSdb, pDetail);
    }

    // Finally append a DROPPED commit log for the scan object itself.
    SScanObj *pScan = mndAcquireScan(pMnode, scanId);
    if (pScan == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    SSdbRaw *pCommitRaw = mndScanActionEncode(pScan);
    mndReleaseScan(pMnode, pScan);  // pScan must not be touched after this point
    if (pCommitRaw == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
      mError("scan:%d, trans:%d, failed to append commit log since %s", scanId, pTrans->id, terrstr());
      sdbFreeRaw(pCommitRaw);
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
      mError("scan:%d, trans:%d, failed to set commit log status since %s", scanId, pTrans->id, terrstr());
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    // The scan was acquired by scanId, so log that id instead of reading the released object.
    mInfo("scan:%d, add drop scan action", scanId);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("scan:%d, trans:%d, failed to prepare since %s", scanId, pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
886

887
/*
 * Periodic driver for all scans known to the mnode.
 *
 * The ids of all SDB_SCAN rows are first collected into a temporary array, so that
 * the per-scan work below runs outside of the sdb iteration. For each collected id
 * that can still be acquired, a progress request is sent and the progress is saved.
 * Failures for one scan are logged and do not stop the others.
 */
static void mndScanPullup(SMnode *pMnode) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;

  SArray *pScanIds = taosArrayInit(sdbGetSize(pSdb, SDB_SCAN), sizeof(int32_t));
  if (pScanIds == NULL) {
    return;
  }

  // Snapshot the scan ids.
  void *pIter = NULL;
  for (;;) {
    SScanObj *pRow = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pRow);
    if (pIter == NULL) {
      break;
    }

    void *pPushed = taosArrayPush(pScanIds, &pRow->scanId);
    if (pPushed == NULL) {
      mError("failed to push scan id:%d into array, but continue pull up", pRow->scanId);
    }
    sdbRelease(pSdb, pRow);
  }

  // Process every scan that still exists.
  for (int32_t idx = 0; idx < taosArrayGetSize(pScanIds); ++idx) {
    mInfo("begin to pull up");
    int32_t  *pId = taosArrayGet(pScanIds, idx);
    SScanObj *pScan = mndAcquireScan(pMnode, *pId);
    if (pScan == NULL) {
      continue;
    }

    mInfo("scan:%d, begin to pull up", pScan->scanId);
    mndScanSendProgressReq(pMnode, pScan);
    code = mndSaveScanProgress(pMnode, pScan->scanId);
    if (code != 0) {
      mError("scan:%d, failed to save scan progress since %s", pScan->scanId, tstrerror(code));
    }
    mndReleaseScan(pMnode, pScan);
  }

  taosArrayDestroy(pScanIds);
}
919

920
/*
 * Serialize a SScanDbRsp into a freshly allocated buffer.
 *
 * pScanRsp     response to serialize
 * pRspLen      out: length of the serialized response
 * ppRsp        out: allocated buffer holding the serialized response
 * useRpcMalloc true: allocate with rpcMallocCont; false: allocate with taosMemoryMalloc
 *
 * Returns 0 on success. On failure an error code is returned and the out
 * parameters are left untouched.
 */
static int32_t mndBuildScanDbRsp(SScanDbRsp *pScanRsp, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
  int32_t code = 0;

  // First call with a NULL buffer only computes the required length.
  int32_t rspLen = tSerializeSScanDbRsp(NULL, 0, pScanRsp);
  if (rspLen < 0) {
    // A negative length must never reach the allocator, where it would turn into a huge size.
    code = TSDB_CODE_OUT_OF_MEMORY;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  void *pRsp = NULL;
  if (useRpcMalloc) {
    pRsp = rpcMallocCont(rspLen);
  } else {
    pRsp = taosMemoryMalloc(rspLen);
  }

  if (pRsp == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    TAOS_RETURN(code);
  }

  (void)tSerializeSScanDbRsp(pRsp, rspLen, pScanRsp);
  *pRspLen = rspLen;
  *ppRsp = pRsp;
  TAOS_RETURN(code);
}
940

941
/*
 * Append a commit log to pTrans that stores pDb with scanStartTime set to scanTs.
 *
 * The database object is copied locally so the row in sdb is not modified here;
 * only the encoded copy carries the new scan start time.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndSetScanDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs) {
  int32_t code = 0;
  SDbObj  dbObj = {0};
  memcpy(&dbObj, pDb, sizeof(SDbObj));
  dbObj.scanStartTime = scanTs;

  SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Set the status while the raw is still exclusively ours. Once it has been appended,
  // the transaction presumably owns it (the caller drops the transaction on failure),
  // so freeing it after a successful append could free it twice.
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
964

965
static void *mndBuildScanVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t scanTs,
681✔
966
                                  STimeWindow tw) {
967
  SScanVnodeReq scanReq = {0};
681✔
968
  scanReq.dbUid = pDb->uid;
681✔
969
  scanReq.scanStartTime = scanTs;
681✔
970
  scanReq.tw = tw;
681✔
971
  tstrncpy(scanReq.db, pDb->name, TSDB_DB_FNAME_LEN);
681✔
972

973
  mInfo("vgId:%d, build scan vnode config req", pVgroup->vgId);
681✔
974
  int32_t contLen = tSerializeSScanVnodeReq(NULL, 0, &scanReq);
681✔
975
  if (contLen < 0) {
681✔
976
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
977
    return NULL;
×
978
  }
979
  contLen += sizeof(SMsgHead);
681✔
980

981
  void *pReq = taosMemoryMalloc(contLen);
681✔
982
  if (pReq == NULL) {
681✔
983
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
984
    return NULL;
×
985
  }
986

987
  SMsgHead *pHead = pReq;
681✔
988
  pHead->contLen = htonl(contLen);
681✔
989
  pHead->vgId = htonl(pVgroup->vgId);
681✔
990

991
  if (tSerializeSScanVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &scanReq) < 0) {
681✔
992
    taosMemoryFree(pReq);
×
993
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
994
    return NULL;
×
995
  }
996
  *pContLen = contLen;
681✔
997
  return pReq;
681✔
998
}
999

1000
static int32_t mndBuildScanVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t scanTs,
681✔
1001
                                        STimeWindow tw) {
1002
  int32_t      code = 0;
681✔
1003
  STransAction action = {0};
681✔
1004
  action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
681✔
1005

1006
  int32_t contLen = 0;
681✔
1007
  void   *pReq = mndBuildScanVnodeReq(pMnode, pDb, pVgroup, &contLen, scanTs, tw);
681✔
1008
  if (pReq == NULL) {
681✔
1009
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1010
    if (terrno != 0) code = terrno;
×
1011
    TAOS_RETURN(code);
×
1012
  }
1013

1014
  action.pCont = pReq;
681✔
1015
  action.contLen = contLen;
681✔
1016
  action.msgType = TDMT_VND_SCAN;
681✔
1017

1018
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
681✔
1019
    taosMemoryFree(pReq);
×
1020
    TAOS_RETURN(code);
×
1021
  }
1022

1023
  TAOS_RETURN(code);
681✔
1024
}
1025

1026
extern int32_t mndAddScanDetailToTran(SMnode *pMnode, STrans *pTrans, SScanObj *pScan, SVgObj *pVgroup,
1027
                                      SVnodeGid *pVgid, int32_t index);
1028

1029
static int32_t mndSetScanDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t scanTs, STimeWindow tw,
589✔
1030
                                       SArray *vgroupIds, SScanDbRsp *pScanRsp) {
1031
  int32_t code = 0;
589✔
1032
  SSdb   *pSdb = pMnode->pSdb;
589✔
1033
  void   *pIter = NULL;
589✔
1034

1035
  SScanObj scan;
589✔
1036
  if ((code = mndAddScanToTran(pMnode, pTrans, &scan, pDb, pScanRsp)) != 0) {
589✔
1037
    TAOS_RETURN(code);
×
1038
  }
1039

1040
  int32_t j = 0;
589✔
1041
  int32_t numOfVgroups = taosArrayGetSize(vgroupIds);
589✔
1042
  if (numOfVgroups > 0) {
589✔
1043
    for (int32_t i = 0; i < numOfVgroups; i++) {
497✔
1044
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
355✔
1045
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
355✔
1046

1047
      if (pVgroup == NULL) {
355✔
1048
        mError("db:%s, vgroup:%" PRId64 " not exist", pDb->name, vgId);
×
1049
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
×
1050
      } else if (pVgroup->dbUid != pDb->uid) {
355✔
1051
        mError("db:%s, vgroup:%" PRId64 " not belong to db:%s", pDb->name, vgId, pDb->name);
142✔
1052
        sdbRelease(pSdb, pVgroup);
142✔
1053
        TAOS_RETURN(TSDB_CODE_MND_VGROUP_NOT_EXIST);
142✔
1054
      }
1055
      sdbRelease(pSdb, pVgroup);
213✔
1056
    }
1057

1058
    for (int32_t i = 0; i < numOfVgroups; i++) {
355✔
1059
      int64_t vgId = *(int64_t *)taosArrayGet(vgroupIds, i);
213✔
1060
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
213✔
1061

1062
      if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
213✔
1063
        sdbRelease(pSdb, pVgroup);
×
1064
        TAOS_RETURN(code);
×
1065
      }
1066

1067
      for (int32_t i = 0; i < pVgroup->replica; i++) {
426✔
1068
        SVnodeGid *gid = &pVgroup->vnodeGid[i];
213✔
1069
        if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
213✔
1070
          sdbRelease(pSdb, pVgroup);
×
1071
          TAOS_RETURN(code);
×
1072
        }
1073
        j++;
213✔
1074
      }
1075
      sdbRelease(pSdb, pVgroup);
213✔
1076
    }
1077
  } else {
1078
    while (1) {
610✔
1079
      SVgObj *pVgroup = NULL;
915✔
1080
      pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
915✔
1081
      if (pIter == NULL) break;
915✔
1082

1083
      if (pVgroup->dbUid == pDb->uid) {
610✔
1084
        if ((code = mndBuildScanVgroupAction(pMnode, pTrans, pDb, pVgroup, scanTs, tw)) != 0) {
468✔
1085
          sdbCancelFetch(pSdb, pIter);
×
1086
          sdbRelease(pSdb, pVgroup);
×
1087
          TAOS_RETURN(code);
×
1088
        }
1089

1090
        for (int32_t i = 0; i < pVgroup->replica; i++) {
936✔
1091
          SVnodeGid *gid = &pVgroup->vnodeGid[i];
468✔
1092
          if ((code = mndAddScanDetailToTran(pMnode, pTrans, &scan, pVgroup, gid, j)) != 0) {
468✔
1093
            sdbCancelFetch(pSdb, pIter);
×
1094
            sdbRelease(pSdb, pVgroup);
×
1095
            TAOS_RETURN(code);
×
1096
          }
1097
          j++;
468✔
1098
        }
1099
      }
1100

1101
      sdbRelease(pSdb, pVgroup);
610✔
1102
    }
1103
  }
1104

1105
  TAOS_RETURN(code);
447✔
1106
}
1107

1108
/*
 * Start a scan of database pDb inside a "scan-db" transaction.
 *
 * If a scan with the same database name already exists, no transaction is created:
 * when pReq is given, a "not accepted" response is attached to it, and
 * TSDB_CODE_MND_SCAN_ALREADY_EXIST is returned.
 *
 * Otherwise the transaction receives the database commit log, the scan redo actions
 * and, when pReq is given, an "accepted" response, and is then prepared.
 *
 * pReq       originating request, may be NULL
 * tw         time window passed on to the vnode requests
 * vgroupIds  optional list of vgroups to restrict the scan to
 *
 * Returns 0 when the transaction was prepared, otherwise an error code. The local
 * transaction handle is dropped on every path through _OVER.
 */
static int32_t mndScanDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SScanDbRsp scanRsp = {0};
  // Declared before the first possible `goto _OVER` so the cleanup never sees an
  // indeterminate pointer.
  STrans *pTrans = NULL;

  bool  isExist = false;
  void *pIter = NULL;
  while (1) {
    SScanObj *pScan = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_SCAN, pIter, (void **)&pScan);
    if (pIter == NULL) break;

    if (strcmp(pScan->dbname, pDb->name) == 0) {
      isExist = true;
    }
    sdbRelease(pMnode->pSdb, pScan);
  }
  if (isExist) {
    mInfo("scan db:%s already exist", pDb->name);

    if (pReq) {
      int32_t rspLen = 0;
      void   *pRsp = NULL;
      scanRsp.scanId = 0;
      scanRsp.bAccepted = false;
      code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, true);
      TSDB_CHECK_CODE(code, lino, _OVER);

      pReq->info.rsp = pRsp;
      pReq->info.rspLen = rspLen;
    }

    return TSDB_CODE_MND_SCAN_ALREADY_EXIST;
  }

  int64_t scanTs = taosGetTimestampMs();
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "scan-db");
  if (pTrans == NULL) {
    // Report the failure instead of falling through with code == 0.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  mInfo("trans:%d, used to scan db:%s", pTrans->id, pDb->name);
  mndTransSetDbName(pTrans, pDb->name, NULL);
  code = mndTransCheckConflict(pMnode, pTrans);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = mndSetScanDbCommitLogs(pMnode, pTrans, pDb, scanTs);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = mndSetScanDbRedoActions(pMnode, pTrans, pDb, scanTs, tw, vgroupIds, &scanRsp);
  TSDB_CHECK_CODE(code, lino, _OVER);

  if (pReq) {
    int32_t rspLen = 0;
    void   *pRsp = NULL;
    scanRsp.bAccepted = true;
    code = mndBuildScanDbRsp(&scanRsp, &rspLen, &pRsp, false);
    TSDB_CHECK_CODE(code, lino, _OVER);
    mndTransSetRpcRsp(pTrans, pRsp, rspLen);
  }

  // Keep the prepare result: a failed prepare must not be reported as success.
  code = mndTransPrepare(pMnode, pTrans);
  TSDB_CHECK_CODE(code, lino, _OVER);

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1174

1175
/*
 * Timer message handler: runs one scan pull-up round on the mnode carried in the
 * request. Always returns 0.
 */
static int32_t mndProcessScanTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process scan timer");
  mndScanPullup(pMnode);

  return 0;
}
1180

1181
/*
 * Handle a "scan database" request.
 *
 * The request is deserialized, the database is acquired, the MND_OPER_SCAN_DB
 * privilege is checked and the scan is started through mndScanDb.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when the scan transaction was started,
 * otherwise the error code of the failing step. The database reference and the
 * deserialized request are released on every path.
 */
int32_t mndProcessScanDbReq(SRpcMsg *pReq) {
  SMnode    *pMnode = pReq->info.node;
  int32_t    code = -1;
  SDbObj    *pDb = NULL;
  SScanDbReq scanReq = {0};

  if (tDeserializeSScanDbReq(pReq->pCont, pReq->contLen, &scanReq) != 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  mInfo("db:%s, start to scan", scanReq.db);

  pDb = mndAcquireDb(pMnode, scanReq.db);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_SCAN_DB, pDb), NULL, _OVER);

  code = mndScanDb(pMnode, pReq, pDb, scanReq.timeRange, scanReq.vgroupIds);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Log the code that is actually returned; terrno is not set on every path above.
    mError("db:%s, failed to process scan db req since %s", scanReq.db, tstrerror(code));
  }

  mndReleaseDb(pMnode, pDb);
  tFreeSScanDbReq(&scanReq);
  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc