• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4791

13 Oct 2025 06:50AM UTC coverage: 57.628% (-0.8%) from 58.476%
#4791

push

travis-ci

web-flow
Merge pull request #33213 from taosdata/fix/huoh/timemoe_model_directory

fix: fix tdgpt timemoe model directory

136628 of 303332 branches covered (45.04%)

Branch coverage included in aggregate %.

208121 of 294900 relevant lines covered (70.57%)

4250784.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.12
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndCompact.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tmisce.h"
25
#include "tmsgcb.h"
26

27
#define MND_COMPACT_VER_NUMBER 1
28
#define MND_COMPACT_ID_LEN     11
29

30
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
31

32
int32_t mndInitCompact(SMnode *pMnode) {
1,331✔
33
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
1,331✔
34
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
1,331✔
35
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
1,331✔
36
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
1,331✔
37
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);
1,331✔
38

39
  SSdbTable table = {
1,331✔
40
      .sdbType = SDB_COMPACT,
41
      .keyType = SDB_KEY_INT32,
42
      .encodeFp = (SdbEncodeFp)mndCompactActionEncode,
43
      .decodeFp = (SdbDecodeFp)mndCompactActionDecode,
44
      .insertFp = (SdbInsertFp)mndCompactActionInsert,
45
      .updateFp = (SdbUpdateFp)mndCompactActionUpdate,
46
      .deleteFp = (SdbDeleteFp)mndCompactActionDelete,
47
  };
48

49
  return sdbSetTable(pMnode->pSdb, table);
1,331✔
50
}
51

52
void mndCleanupCompact(SMnode *pMnode) { mDebug("mnd compact cleanup"); }
1,331✔
53

54
void tFreeCompactObj(SCompactObj *pCompact) {}
161✔
55

56
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
282✔
57
  SEncoder encoder = {0};
282✔
58
  int32_t  code = 0;
282✔
59
  int32_t  lino;
60
  int32_t  tlen;
61
  tEncoderInit(&encoder, buf, bufLen);
282✔
62

63
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
282!
64
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
564!
65
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
564!
66
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
564!
67
  TAOS_CHECK_EXIT(tEncodeU32v(&encoder, pObj->flags));
564!
68

69
  tEndEncode(&encoder);
282✔
70

71
_exit:
282✔
72
  if (code) {
282!
73
    tlen = code;
×
74
  } else {
75
    tlen = encoder.pos;
282✔
76
  }
77
  tEncoderClear(&encoder);
282✔
78
  return tlen;
282✔
79
}
80

81
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
161✔
82
  int32_t  code = 0;
161✔
83
  int32_t  lino;
84
  SDecoder decoder = {0};
161✔
85
  tDecoderInit(&decoder, buf, bufLen);
161✔
86

87
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
161!
88
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId));
322!
89
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
161!
90
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
322!
91

92
  if (!tDecodeIsEnd(&decoder)) {
161!
93
    TAOS_CHECK_EXIT(tDecodeU32v(&decoder, &pObj->flags));
322!
94
  } else {
95
    pObj->flags = 0;
×
96
  }
97

98
  tEndDecode(&decoder);
161✔
99

100
_exit:
161✔
101
  tDecoderClear(&decoder);
161✔
102
  return code;
161✔
103
}
104

105
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
92✔
106
  int32_t code = 0;
92✔
107
  int32_t lino = 0;
92✔
108
  terrno = TSDB_CODE_SUCCESS;
92✔
109

110
  void    *buf = NULL;
92✔
111
  SSdbRaw *pRaw = NULL;
92✔
112

113
  int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact);
92✔
114
  if (tlen < 0) {
92!
115
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
116
    goto OVER;
×
117
  }
118

119
  int32_t size = sizeof(int32_t) + tlen;
92✔
120
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size);
92✔
121
  if (pRaw == NULL) {
92!
122
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
123
    goto OVER;
×
124
  }
125

126
  buf = taosMemoryMalloc(tlen);
92!
127
  if (buf == NULL) {
92!
128
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
129
    goto OVER;
×
130
  }
131

132
  tlen = tSerializeSCompactObj(buf, tlen, pCompact);
92✔
133
  if (tlen < 0) {
92!
134
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
135
    goto OVER;
×
136
  }
137

138
  int32_t dataPos = 0;
92✔
139
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
92!
140
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
92!
141
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
92!
142

143
OVER:
92✔
144
  taosMemoryFreeClear(buf);
92!
145
  if (terrno != TSDB_CODE_SUCCESS) {
92!
146
    mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
147
    sdbFreeRaw(pRaw);
×
148
    return NULL;
×
149
  }
150

151
  mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
92!
152
  return pRaw;
92✔
153
}
154

155
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
120✔
156
  int32_t      code = 0;
120✔
157
  int32_t      lino = 0;
120✔
158
  SSdbRow     *pRow = NULL;
120✔
159
  SCompactObj *pCompact = NULL;
120✔
160
  void        *buf = NULL;
120✔
161
  terrno = TSDB_CODE_SUCCESS;
120✔
162

163
  int8_t sver = 0;
120✔
164
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
120!
165
    goto OVER;
×
166
  }
167

168
  if (sver != MND_COMPACT_VER_NUMBER) {
120!
169
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
170
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
×
171
    goto OVER;
×
172
  }
173

174
  pRow = sdbAllocRow(sizeof(SCompactObj));
120✔
175
  if (pRow == NULL) {
120!
176
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
177
    goto OVER;
×
178
  }
179

180
  pCompact = sdbGetRowObj(pRow);
120✔
181
  if (pCompact == NULL) {
120!
182
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
183
    goto OVER;
×
184
  }
185

186
  int32_t tlen;
187
  int32_t dataPos = 0;
120✔
188
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
120!
189
  buf = taosMemoryMalloc(tlen + 1);
120!
190
  if (buf == NULL) {
120!
191
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
192
    goto OVER;
×
193
  }
194
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
120!
195

196
  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
120!
197
    goto OVER;
×
198
  }
199

200
OVER:
120✔
201
  taosMemoryFreeClear(buf);
120!
202
  if (terrno != TSDB_CODE_SUCCESS) {
120!
203
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
204
    taosMemoryFreeClear(pRow);
×
205
    return NULL;
×
206
  }
207

208
  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
120!
209
  return pRow;
120✔
210
}
211

212
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
62✔
213
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
62!
214
  return 0;
62✔
215
}
216

217
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
120✔
218
  mTrace("compact:%" PRId32 ", perform delete action", pCompact->compactId);
120!
219
  tFreeCompactObj(pCompact);
120✔
220
  return 0;
120✔
221
}
222

223
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
2✔
224
  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
2!
225
         pNewCompact);
226

227
  return 0;
2✔
228
}
229

230
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
194✔
231
  SSdb        *pSdb = pMnode->pSdb;
194✔
232
  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
194✔
233
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
194!
234
    terrno = TSDB_CODE_SUCCESS;
×
235
  }
236
  return pCompact;
194✔
237
}
238

239
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
194✔
240
  SSdb *pSdb = pMnode->pSdb;
194✔
241
  sdbRelease(pSdb, pCompact);
194✔
242
  pCompact = NULL;
194✔
243
}
194✔
244

245
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
77✔
246
  int32_t      code = 0;
77✔
247
  SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
77✔
248
  if (pCompact == NULL) {
77!
249
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
250
    if (terrno != 0) code = terrno;
×
251
    TAOS_RETURN(code);
×
252
  }
253

254
  tstrncpy(dbname, pCompact->dbname, len);
77✔
255
  mndReleaseCompact(pMnode, pCompact);
77✔
256
  TAOS_RETURN(code);
77✔
257
}
258

259
// compact db
260
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
46✔
261
  int32_t code = 0;
46✔
262
  pCompact->compactId = tGenIdPI32();
46✔
263

264
  tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));
46✔
265

266
  pCompact->startTime = taosGetTimestampMs();
46✔
267

268
  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
46✔
269
  if (pVgRaw == NULL) {
46!
270
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
271
    if (terrno != 0) code = terrno;
×
272
    TAOS_RETURN(code);
×
273
  }
274
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
46!
275
    sdbFreeRaw(pVgRaw);
×
276
    TAOS_RETURN(code);
×
277
  }
278

279
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
46!
280
    sdbFreeRaw(pVgRaw);
×
281
    TAOS_RETURN(code);
×
282
  }
283

284
  rsp->compactId = pCompact->compactId;
46✔
285

286
  return 0;
46✔
287
}
288

289
// retrieve compact
290
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
578✔
291
  SMnode      *pMnode = pReq->info.node;
578✔
292
  SSdb        *pSdb = pMnode->pSdb;
578✔
293
  int32_t      numOfRows = 0;
578✔
294
  SCompactObj *pCompact = NULL;
578✔
295
  char        *sep = NULL;
578✔
296
  SDbObj      *pDb = NULL;
578✔
297
  int32_t      code = 0;
578✔
298
  int32_t      lino = 0;
578✔
299

300
  if (strlen(pShow->db) > 0) {
578!
301
    sep = strchr(pShow->db, '.');
×
302
    if (sep &&
×
303
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
304
      sep++;
×
305
    } else {
306
      pDb = mndAcquireDb(pMnode, pShow->db);
×
307
      if (pDb == NULL) return terrno;
×
308
    }
309
  }
310

311
  while (numOfRows < rows) {
1,119!
312
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
1,119✔
313
    if (pShow->pIter == NULL) break;
1,119✔
314

315
    SColumnInfoData *pColInfo;
316
    SName            n;
317
    int32_t          cols = 0;
541✔
318

319
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
541✔
320

321
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
541✔
322
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
541!
323
                        _OVER);
324

325
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
541✔
326
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
1,082!
327
      SName name = {0};
541✔
328
      TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
541!
329
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
541✔
330
    } else {
331
      tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
×
332
    }
333
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
541✔
334
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
541!
335

336
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
541✔
337
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
541!
338
                        _OVER);
339

340
    numOfRows++;
541✔
341
    sdbRelease(pSdb, pCompact);
541✔
342
  }
343

344
_OVER:
×
345
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
578!
346
  pShow->numOfRows += numOfRows;
578✔
347
  mndReleaseDb(pMnode, pDb);
578✔
348
  return numOfRows;
578✔
349
}
350

351
// kill compact
352
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
2✔
353
                                    int32_t dnodeid) {
354
  SVKillCompactReq req = {0};
2✔
355
  req.compactId = compactId;
2✔
356
  req.vgId = pVgroup->vgId;
2✔
357
  req.dnodeId = dnodeid;
2✔
358
  terrno = 0;
2✔
359

360
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
2!
361
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
2✔
362
  if (contLen < 0) {
2!
363
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
364
    return NULL;
×
365
  }
366
  contLen += sizeof(SMsgHead);
2✔
367

368
  void *pReq = taosMemoryMalloc(contLen);
2!
369
  if (pReq == NULL) {
2!
370
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
371
    return NULL;
×
372
  }
373

374
  SMsgHead *pHead = pReq;
2✔
375
  pHead->contLen = htonl(contLen);
2✔
376
  pHead->vgId = htonl(pVgroup->vgId);
2✔
377

378
  mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);
2!
379
  int32_t ret = 0;
2✔
380
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
2!
381
    terrno = ret;
×
382
    taosMemoryFreeClear(pReq);
×
383
    return NULL;
×
384
  }
385
  *pContLen = contLen;
2✔
386
  return pReq;
2✔
387
}
388

389
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
2✔
390
                                       int32_t dnodeid) {
391
  int32_t      code = 0;
2✔
392
  STransAction action = {0};
2✔
393

394
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
2✔
395
  if (pDnode == NULL) {
2!
396
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
397
    if (terrno != 0) code = terrno;
×
398
    TAOS_RETURN(code);
×
399
  }
400
  action.epSet = mndGetDnodeEpset(pDnode);
2✔
401
  mndReleaseDnode(pMnode, pDnode);
2✔
402

403
  int32_t contLen = 0;
2✔
404
  void   *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
2✔
405
  if (pReq == NULL) {
2!
406
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
407
    if (terrno != 0) code = terrno;
×
408
    TAOS_RETURN(code);
×
409
  }
410

411
  action.pCont = pReq;
2✔
412
  action.contLen = contLen;
2✔
413
  action.msgType = TDMT_VND_KILL_COMPACT;
2✔
414

415
  mTrace("trans:%d, kill compact msg len:%d", pTrans->id, contLen);
2!
416

417
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
2!
418
    taosMemoryFree(pReq);
×
419
    TAOS_RETURN(code);
×
420
  }
421

422
  return 0;
2✔
423
}
424

425
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
2✔
426
  int32_t code = 0;
2✔
427
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
2✔
428
  if (pTrans == NULL) {
2!
429
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
×
430
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
431
    if (terrno != 0) code = terrno;
×
432
    TAOS_RETURN(code);
×
433
  }
434
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
2!
435

436
  mndTransSetDbName(pTrans, pCompact->dbname, NULL);
2✔
437

438
  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
2✔
439
  if (pCommitRaw == NULL) {
2!
440
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
441
    if (terrno != 0) code = terrno;
×
442
    mndTransDrop(pTrans);
×
443
    TAOS_RETURN(code);
×
444
  }
445
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
2!
446
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
447
    mndTransDrop(pTrans);
×
448
    TAOS_RETURN(code);
×
449
  }
450
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
2!
451
    mndTransDrop(pTrans);
×
452
    TAOS_RETURN(code);
×
453
  }
454

455
  void *pIter = NULL;
2✔
456
  while (1) {
4✔
457
    SCompactDetailObj *pDetail = NULL;
6✔
458
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
6✔
459
    if (pIter == NULL) break;
6✔
460

461
    if (pDetail->compactId == pCompact->compactId) {
4✔
462
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
2✔
463
      if (pVgroup == NULL) {
2!
464
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
465
        sdbCancelFetch(pMnode->pSdb, pIter);
×
466
        sdbRelease(pMnode->pSdb, pDetail);
×
467
        mndTransDrop(pTrans);
×
468
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
469
        if (terrno != 0) code = terrno;
×
470
        TAOS_RETURN(code);
×
471
      }
472

473
      if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
2!
474
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
475
        sdbCancelFetch(pMnode->pSdb, pIter);
×
476
        sdbRelease(pMnode->pSdb, pDetail);
×
477
        mndTransDrop(pTrans);
×
478
        TAOS_RETURN(code);
×
479
      }
480

481
      mndReleaseVgroup(pMnode, pVgroup);
2✔
482

483
      /*
484
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
485
      if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
486
        mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
487
        mndTransDrop(pTrans);
488
        return -1;
489
      }
490
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
491
      */
492
    }
493

494
    sdbRelease(pMnode->pSdb, pDetail);
4✔
495
  }
496

497
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
2!
498
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
499
    mndTransDrop(pTrans);
×
500
    TAOS_RETURN(code);
×
501
  }
502

503
  mndTransDrop(pTrans);
2✔
504
  return 0;
2✔
505
}
506

507
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
2✔
508
  int32_t         code = 0;
2✔
509
  int32_t         lino = 0;
2✔
510
  SKillCompactReq killCompactReq = {0};
2✔
511

512
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
2!
513
    TAOS_RETURN(code);
×
514
  }
515

516
  mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);
2!
517

518
  SMnode      *pMnode = pReq->info.node;
2✔
519
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
2✔
520
  if (pCompact == NULL) {
2!
521
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
×
522
    tFreeSKillCompactReq(&killCompactReq);
×
523
    TAOS_RETURN(code);
×
524
  }
525

526
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER);
2!
527

528
  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);
2!
529

530
  code = TSDB_CODE_ACTION_IN_PROGRESS;
2✔
531

532
  char    obj[TSDB_INT32_ID_LEN] = {0};
2✔
533
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pCompact->compactId);
2✔
534
  if ((uint32_t)nBytes < sizeof(obj)) {
2!
535
    auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql,
2✔
536
                killCompactReq.sqlLen);
537
  } else {
538
    mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
×
539
  }
540
_OVER:
×
541
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
2!
542
    mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
×
543
  }
544

545
  tFreeSKillCompactReq(&killCompactReq);
2✔
546
  mndReleaseCompact(pMnode, pCompact);
2✔
547

548
  TAOS_RETURN(code);
2✔
549
}
550

551
// update progress
552
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
158✔
553
                                        SQueryCompactProgressRsp *rsp) {
554
  int32_t code = 0;
158✔
555

556
  void *pIter = NULL;
158✔
557
  while (1) {
108✔
558
    SCompactDetailObj *pDetail = NULL;
266✔
559
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
266✔
560
    if (pIter == NULL) break;
266!
561

562
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
266!
563
      pDetail->newNumberFileset = rsp->numberFileset;
158✔
564
      pDetail->newFinished = rsp->finished;
158✔
565
      pDetail->progress = rsp->progress;
158✔
566
      pDetail->remainingTime = rsp->remainingTime;
158✔
567

568
      sdbCancelFetch(pMnode->pSdb, pIter);
158✔
569
      sdbRelease(pMnode->pSdb, pDetail);
158✔
570

571
      TAOS_RETURN(code);
158✔
572
    }
573

574
    sdbRelease(pMnode->pSdb, pDetail);
108✔
575
  }
576

577
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
578
}
579

580
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
158✔
581
  int32_t                  code = 0;
158✔
582
  SQueryCompactProgressRsp req = {0};
158✔
583
  if (pReq->code != 0) {
158!
584
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
×
585
    TAOS_RETURN(pReq->code);
×
586
  }
587
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
158✔
588
  if (code != 0) {
158!
589
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
590
           pReq->contLen);
591
    TAOS_RETURN(code);
×
592
  }
593

594
  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
158!
595
         req.vgId, req.dnodeId, req.numberFileset, req.finished);
596

597
  SMnode *pMnode = pReq->info.node;
158✔
598

599
  code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
158✔
600
  if (code != 0) {
158!
601
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
×
602
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
603
    TAOS_RETURN(code);
×
604
  }
605

606
  TAOS_RETURN(code);
158✔
607
}
608

609
// timer
610
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
77✔
611
  void *pIter = NULL;
77✔
612

613
  while (1) {
163✔
614
    SCompactDetailObj *pDetail = NULL;
240✔
615
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
240✔
616
    if (pIter == NULL) break;
240✔
617

618
    if (pDetail->compactId == pCompact->compactId) {
163✔
619
      SEpSet epSet = {0};
160✔
620

621
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
160✔
622
      if (pDnode == NULL) break;
160!
623
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
160!
624
        sdbRelease(pMnode->pSdb, pDetail);
×
625
        continue;
2✔
626
      }
627
      mndReleaseDnode(pMnode, pDnode);
160✔
628

629
      SQueryCompactProgressReq req;
630
      req.compactId = pDetail->compactId;
160✔
631
      req.vgId = pDetail->vgId;
160✔
632
      req.dnodeId = pDetail->dnodeId;
160✔
633

634
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
160✔
635
      if (contLen < 0) {
160!
636
        sdbRelease(pMnode->pSdb, pDetail);
×
637
        continue;
×
638
      }
639

640
      contLen += sizeof(SMsgHead);
160✔
641

642
      SMsgHead *pHead = rpcMallocCont(contLen);
160✔
643
      if (pHead == NULL) {
160!
644
        sdbRelease(pMnode->pSdb, pDetail);
×
645
        continue;
×
646
      }
647

648
      pHead->contLen = htonl(contLen);
160✔
649
      pHead->vgId = htonl(pDetail->vgId);
160✔
650

651
      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
160!
652
        sdbRelease(pMnode->pSdb, pDetail);
×
653
        continue;
×
654
      }
655

656
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
160✔
657

658
      rpcMsg.pCont = pHead;
160✔
659

660
      char    detail[1024] = {0};
160✔
661
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
320!
662
                              TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
320✔
663
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
320✔
664
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
160✔
665
      }
666

667
      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
160!
668

669
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
160✔
670
        sdbRelease(pMnode->pSdb, pDetail);
2✔
671
        continue;
2✔
672
      }
673
    }
674

675
    sdbRelease(pMnode->pSdb, pDetail);
161✔
676
  }
677
}
77✔
678

679
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
77✔
680
  int32_t code = 0;
77✔
681
  bool    needSave = false;
77✔
682
  void   *pIter = NULL;
77✔
683
  while (1) {
163✔
684
    SCompactDetailObj *pDetail = NULL;
240✔
685
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
240✔
686
    if (pIter == NULL) break;
240✔
687

688
    if (pDetail->compactId == compactId) {
163✔
689
      mDebug(
160!
690
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
691
          "newNumberFileset:%d, newFinished:%d",
692
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
693
          pDetail->newNumberFileset, pDetail->newFinished);
694

695
      // these 2 number will jump back after dnode restart, so < is not used here
696
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
160!
697
        needSave = true;
79✔
698
    }
699

700
    sdbRelease(pMnode->pSdb, pDetail);
163✔
701
  }
702

703
  char dbname[TSDB_TABLE_FNAME_LEN] = {0};
77✔
704
  TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));
77!
705

706
  if (!mndDbIsExist(pMnode, dbname)) {
77!
707
    needSave = true;
×
708
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
×
709
  }
710

711
  if (!needSave) {
77✔
712
    mDebug("compact:%" PRId32 ", no need to save", compactId);
39!
713
    TAOS_RETURN(code);
39✔
714
  }
715

716
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
38✔
717
  if (pTrans == NULL) {
38!
718
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
719
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
720
    if (terrno != 0) code = terrno;
×
721
    TAOS_RETURN(code);
×
722
  }
723
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
38!
724

725
  mndTransSetDbName(pTrans, dbname, NULL);
38✔
726

727
  pIter = NULL;
38✔
728
  while (1) {
80✔
729
    SCompactDetailObj *pDetail = NULL;
118✔
730
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
118✔
731
    if (pIter == NULL) break;
118✔
732

733
    if (pDetail->compactId == compactId) {
80✔
734
      mInfo(
79!
735
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
736
          "newNumberFileset:%d, newFinished:%d",
737
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
738
          pDetail->newNumberFileset, pDetail->newFinished);
739

740
      pDetail->numberFileset = pDetail->newNumberFileset;
79✔
741
      pDetail->finished = pDetail->newFinished;
79✔
742

743
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
79✔
744
      if (pCommitRaw == NULL) {
79!
745
        sdbCancelFetch(pMnode->pSdb, pIter);
×
746
        sdbRelease(pMnode->pSdb, pDetail);
×
747
        mndTransDrop(pTrans);
×
748
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
749
        if (terrno != 0) code = terrno;
×
750
        TAOS_RETURN(code);
×
751
      }
752
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
79!
753
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
×
754
        sdbCancelFetch(pMnode->pSdb, pIter);
×
755
        sdbRelease(pMnode->pSdb, pDetail);
×
756
        mndTransDrop(pTrans);
×
757
        TAOS_RETURN(code);
×
758
      }
759
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
79!
760
        sdbCancelFetch(pMnode->pSdb, pIter);
×
761
        sdbRelease(pMnode->pSdb, pDetail);
×
762
        mndTransDrop(pTrans);
×
763
        TAOS_RETURN(code);
×
764
      }
765
    }
766

767
    sdbRelease(pMnode->pSdb, pDetail);
80✔
768
  }
769

770
  bool allFinished = true;
38✔
771
  pIter = NULL;
38✔
772
  while (1) {
80✔
773
    SCompactDetailObj *pDetail = NULL;
118✔
774
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
118✔
775
    if (pIter == NULL) break;
118✔
776

777
    if (pDetail->compactId == compactId) {
80✔
778
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
79!
779
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
780

781
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
79!
782
        allFinished = false;
×
783
        sdbCancelFetch(pMnode->pSdb, pIter);
×
784
        sdbRelease(pMnode->pSdb, pDetail);
×
785
        break;
×
786
      }
787
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
79!
788
        allFinished = false;
×
789
        sdbCancelFetch(pMnode->pSdb, pIter);
×
790
        sdbRelease(pMnode->pSdb, pDetail);
×
791
        break;
×
792
      }
793
    }
794

795
    sdbRelease(pMnode->pSdb, pDetail);
80✔
796
  }
797

798
  if (!mndDbIsExist(pMnode, dbname)) {
38!
799
    allFinished = true;
×
800
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
×
801
  }
802

803
  if (allFinished) {
38!
804
    mInfo("compact:%d, all finished", compactId);
38!
805
    pIter = NULL;
38✔
806
    while (1) {
80✔
807
      SCompactDetailObj *pDetail = NULL;
118✔
808
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
118✔
809
      if (pIter == NULL) break;
118✔
810

811
      if (pDetail->compactId == compactId) {
80✔
812
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
79✔
813
        if (pCommitRaw == NULL) {
79!
814
          mndTransDrop(pTrans);
×
815
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
816
          if (terrno != 0) code = terrno;
×
817
          TAOS_RETURN(code);
×
818
        }
819
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
79!
820
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
×
821
                 terrstr());
822
          sdbCancelFetch(pMnode->pSdb, pIter);
×
823
          sdbRelease(pMnode->pSdb, pDetail);
×
824
          mndTransDrop(pTrans);
×
825
          TAOS_RETURN(code);
×
826
        }
827
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
79!
828
          sdbCancelFetch(pMnode->pSdb, pIter);
×
829
          sdbRelease(pMnode->pSdb, pDetail);
×
830
          mndTransDrop(pTrans);
×
831
          TAOS_RETURN(code);
×
832
        }
833
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
79!
834
      }
835

836
      sdbRelease(pMnode->pSdb, pDetail);
80✔
837
    }
838

839
    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
38✔
840
    if (pCompact == NULL) {
38!
841
      mndTransDrop(pTrans);
×
842
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
843
      if (terrno != 0) code = terrno;
×
844
      TAOS_RETURN(code);
×
845
    }
846
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
38✔
847
    mndReleaseCompact(pMnode, pCompact);
38✔
848
    if (pCommitRaw == NULL) {
38!
849
      mndTransDrop(pTrans);
×
850
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
851
      if (terrno != 0) code = terrno;
×
852
      TAOS_RETURN(code);
×
853
    }
854
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
38!
855
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
856
      mndTransDrop(pTrans);
×
857
      TAOS_RETURN(code);
×
858
    }
859
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
38!
860
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
861
      mndTransDrop(pTrans);
×
862
      TAOS_RETURN(code);
×
863
    }
864
    mInfo("compact:%d, add drop compact action", pCompact->compactId);
38!
865
  }
866

867
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
38!
868
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
×
869
    mndTransDrop(pTrans);
×
870
    TAOS_RETURN(code);
×
871
  }
872

873
  mndTransDrop(pTrans);
38✔
874
  return 0;
38✔
875
}
876

877
static void mndCompactPullup(SMnode *pMnode) {
3,554✔
878
  int32_t code = 0;
3,554✔
879
  SSdb   *pSdb = pMnode->pSdb;
3,554✔
880
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
3,554✔
881
  if (pArray == NULL) return;
3,554!
882

883
  void *pIter = NULL;
3,554✔
884
  while (1) {
77✔
885
    SCompactObj *pCompact = NULL;
3,631✔
886
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
3,631✔
887
    if (pIter == NULL) break;
3,631✔
888
    if (taosArrayPush(pArray, &pCompact->compactId) == NULL) {
154!
889
      mError("failed to push compact id:%d into array, but continue pull up", pCompact->compactId);
×
890
    }
891
    sdbRelease(pSdb, pCompact);
77✔
892
  }
893

894
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
3,631✔
895
    mInfo("begin to pull up");
77!
896
    int32_t     *pCompactId = taosArrayGet(pArray, i);
77✔
897
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
77✔
898
    if (pCompact != NULL) {
77!
899
      mInfo("compact:%d, begin to pull up", pCompact->compactId);
77!
900
      mndCompactSendProgressReq(pMnode, pCompact);
77✔
901
      if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
77!
902
        mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
×
903
      }
904
      mndReleaseCompact(pMnode, pCompact);
77✔
905
    }
906
  }
907
  taosArrayDestroy(pArray);
3,554✔
908
}
909
#ifdef TD_ENTERPRISE
910
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
×
911
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
×
912
    return 0;
×
913
  }
914

915
  SName   name = {0};
×
916
  int32_t sqlLen = 0;
×
917
  char    sql[256] = {0};
×
918
  char    skeyStr[40] = {0};
×
919
  char    ekeyStr[40] = {0};
×
920
  char   *pDbName = pDb->name;
×
921

922
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
×
923
    pDbName = name.dbname;
×
924
  }
925

926
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
×
927
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
×
928
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
×
929
  } else {
930
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
×
931
                       tw->ekey);
932
  }
933
  auditRecord(NULL, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen);
×
934

935
  return 0;
×
936
}
937

938
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
939
                            bool metaOnly, ETsdbOpType type, ETriggerType triggerType);
940
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
3,554✔
941
  int32_t code = 0;
3,554✔
942
  SMnode *pMnode = pReq->info.node;
3,554✔
943
  SSdb   *pSdb = pMnode->pSdb;
3,554✔
944
  int64_t curMs = taosGetTimestampMs();
3,554✔
945
  int64_t curMin = curMs / 60000LL;
3,554✔
946

947
  void   *pIter = NULL;
3,554✔
948
  SDbObj *pDb = NULL;
3,554✔
949
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
11,023✔
950
    if (pDb->cfg.compactInterval <= 0) {
7,469✔
951
      mDebug("db:%p,%s, compact interval is %dm, skip", pDb, pDb->name, pDb->cfg.compactInterval);
7,464✔
952
      sdbRelease(pSdb, pDb);
7,464✔
953
      continue;
7,469✔
954
    }
955

956
    if (pDb->cfg.isMount) {
5!
957
      sdbRelease(pSdb, pDb);
×
958
      continue;
×
959
    }
960

961
    // daysToKeep2 would be altered
962
    if (pDb->cfg.compactEndTime && (pDb->cfg.compactEndTime <= -pDb->cfg.daysToKeep2)) {
5!
963
      mWarn("db:%p,%s, compact end time:%dm <= -keep2:%dm , skip", pDb, pDb->name, pDb->cfg.compactEndTime,
×
964
            -pDb->cfg.daysToKeep2);
965
      sdbRelease(pSdb, pDb);
×
966
      continue;
×
967
    }
968

969
    int64_t compactStartTime = pDb->cfg.compactStartTime ? pDb->cfg.compactStartTime : -pDb->cfg.daysToKeep2;
5✔
970
    int64_t compactEndTime = pDb->cfg.compactEndTime ? pDb->cfg.compactEndTime : -pDb->cfg.daysPerFile;
5✔
971

972
    if (compactStartTime >= compactEndTime) {
5!
973
      mDebug("db:%p,%s, compact start time:%" PRIi64 "m >= end time:%" PRIi64 "m, skip", pDb, pDb->name,
×
974
             compactStartTime, compactEndTime);
975
      sdbRelease(pSdb, pDb);
×
976
      continue;
×
977
    }
978

979
    int64_t remainder = ((curMin - (int64_t)pDb->cfg.compactTimeOffset * 60LL) % pDb->cfg.compactInterval);
5✔
980
    if (remainder != 0) {
5!
981
      mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
5!
982
             "h, remainder:%" PRIi64 "m, skip",
983
             pDb, pDb->name, curMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, remainder);
984
      sdbRelease(pSdb, pDb);
5✔
985
      continue;
5✔
986
    }
987

988
    if ((pDb->compactStartTime / 60000LL) == curMin) {
×
989
      mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
×
990
             curMin, pDb->compactStartTime);
991
      sdbRelease(pSdb, pDb);
×
992
      continue;
×
993
    }
994

995
    STimeWindow tw = {
×
996
        .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
×
997
        .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
×
998

999
    if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL, false, TSDB_OPTR_NORMAL, TSDB_TRIGGER_AUTO)) == 0) {
×
1000
      mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1001
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
1002
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1003
            pDb->cfg.compactTimeOffset);
1004
    } else {
1005
      mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1006
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h, since %s",
1007
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1008
            pDb->cfg.compactTimeOffset, tstrerror(code));
1009
    }
1010

1011
    TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
×
1012

1013
    sdbRelease(pSdb, pDb);
×
1014
  }
1015
  return 0;
3,554✔
1016
}
1017
#endif
1018

1019
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
3,554✔
1020
#ifdef TD_ENTERPRISE
1021
  mTrace("start to process compact timer");
3,554✔
1022
  mndCompactPullup(pReq->info.node);
3,554✔
1023
  TAOS_UNUSED(mndCompactDispatch(pReq));
3,554✔
1024
#endif
1025
  return 0;
3,554✔
1026
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc