• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4923

09 Jan 2026 08:13AM UTC coverage: 65.373% (+0.2%) from 65.161%
#4923

push

travis-ci

web-flow
merge: from main to 3.0 branch #34232

33 of 56 new or added lines in 8 files covered. (58.93%)

3042 existing lines in 131 files now uncovered.

198273 of 303297 relevant lines covered (65.37%)

118980690.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.22
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndCompact.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "tmisce.h"
26
#include "tmsgcb.h"
27

28
#define MND_COMPACT_VER_NUMBER 1
29
#define MND_COMPACT_ID_LEN     11
30

31
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
32

33
int32_t mndInitCompact(SMnode *pMnode) {
398,788✔
34
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
398,788✔
35
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
398,788✔
36
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
398,788✔
37
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
398,788✔
38
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);
398,788✔
39

40
  SSdbTable table = {
398,788✔
41
      .sdbType = SDB_COMPACT,
42
      .keyType = SDB_KEY_INT32,
43
      .encodeFp = (SdbEncodeFp)mndCompactActionEncode,
44
      .decodeFp = (SdbDecodeFp)mndCompactActionDecode,
45
      .insertFp = (SdbInsertFp)mndCompactActionInsert,
46
      .updateFp = (SdbUpdateFp)mndCompactActionUpdate,
47
      .deleteFp = (SdbDeleteFp)mndCompactActionDelete,
48
  };
49

50
  return sdbSetTable(pMnode->pSdb, table);
398,788✔
51
}
52

53
void mndCleanupCompact(SMnode *pMnode) { mDebug("mnd compact cleanup"); }
398,729✔
54

55
void tFreeCompactObj(SCompactObj *pCompact) {}
105,983✔
56

57
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
183,084✔
58
  SEncoder encoder = {0};
183,084✔
59
  int32_t  code = 0;
183,084✔
60
  int32_t  lino;
61
  int32_t  tlen;
62
  tEncoderInit(&encoder, buf, bufLen);
183,084✔
63

64
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
183,084✔
65
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
366,168✔
66
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
366,168✔
67
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
366,168✔
68
  TAOS_CHECK_EXIT(tEncodeU32v(&encoder, pObj->flags));
366,168✔
69
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
366,168✔
70

71
  tEndEncode(&encoder);
183,084✔
72

73
_exit:
183,084✔
74
  if (code) {
183,084✔
75
    tlen = code;
×
76
  } else {
77
    tlen = encoder.pos;
183,084✔
78
  }
79
  tEncoderClear(&encoder);
183,084✔
80
  return tlen;
183,084✔
81
}
82

83
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
105,983✔
84
  int32_t  code = 0;
105,983✔
85
  int32_t  lino;
86
  SDecoder decoder = {0};
105,983✔
87
  tDecoderInit(&decoder, buf, bufLen);
105,983✔
88

89
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
105,983✔
90
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId));
211,966✔
91
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
105,983✔
92
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
211,966✔
93

94
  if (!tDecodeIsEnd(&decoder)) {
105,983✔
95
    TAOS_CHECK_EXIT(tDecodeU32v(&decoder, &pObj->flags));
211,966✔
96
  } else {
97
    pObj->flags = 0;
×
98
  }
99
  if (!tDecodeIsEnd(&decoder)) {
105,983✔
100
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
211,966✔
101
  } else {
102
    pObj->dbUid = 0;
×
103
  }
104

105
  tEndDecode(&decoder);
105,983✔
106

107
_exit:
105,983✔
108
  tDecoderClear(&decoder);
105,983✔
109
  return code;
105,983✔
110
}
111

112
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
58,642✔
113
  int32_t code = 0;
58,642✔
114
  int32_t lino = 0;
58,642✔
115
  terrno = TSDB_CODE_SUCCESS;
58,642✔
116

117
  void    *buf = NULL;
58,642✔
118
  SSdbRaw *pRaw = NULL;
58,642✔
119

120
  int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact);
58,642✔
121
  if (tlen < 0) {
58,642✔
122
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
123
    goto OVER;
×
124
  }
125

126
  int32_t size = sizeof(int32_t) + tlen;
58,642✔
127
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size);
58,642✔
128
  if (pRaw == NULL) {
58,642✔
129
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
130
    goto OVER;
×
131
  }
132

133
  buf = taosMemoryMalloc(tlen);
58,642✔
134
  if (buf == NULL) {
58,642✔
135
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
136
    goto OVER;
×
137
  }
138

139
  tlen = tSerializeSCompactObj(buf, tlen, pCompact);
58,642✔
140
  if (tlen < 0) {
58,642✔
141
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
142
    goto OVER;
×
143
  }
144

145
  int32_t dataPos = 0;
58,642✔
146
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
58,642✔
147
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
58,642✔
148
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
58,642✔
149

150
OVER:
58,642✔
151
  taosMemoryFreeClear(buf);
58,642✔
152
  if (terrno != TSDB_CODE_SUCCESS) {
58,642✔
153
    mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
154
    sdbFreeRaw(pRaw);
×
155
    return NULL;
×
156
  }
157

158
  mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
58,642✔
159
  return pRaw;
58,642✔
160
}
161

162
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
76,840✔
163
  int32_t      code = 0;
76,840✔
164
  int32_t      lino = 0;
76,840✔
165
  SSdbRow     *pRow = NULL;
76,840✔
166
  SCompactObj *pCompact = NULL;
76,840✔
167
  void        *buf = NULL;
76,840✔
168
  terrno = TSDB_CODE_SUCCESS;
76,840✔
169

170
  int8_t sver = 0;
76,840✔
171
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
76,840✔
172
    goto OVER;
×
173
  }
174

175
  if (sver != MND_COMPACT_VER_NUMBER) {
76,840✔
176
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
177
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
×
178
    goto OVER;
×
179
  }
180

181
  pRow = sdbAllocRow(sizeof(SCompactObj));
76,840✔
182
  if (pRow == NULL) {
76,840✔
183
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
184
    goto OVER;
×
185
  }
186

187
  pCompact = sdbGetRowObj(pRow);
76,840✔
188
  if (pCompact == NULL) {
76,840✔
189
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
190
    goto OVER;
×
191
  }
192

193
  int32_t tlen;
76,840✔
194
  int32_t dataPos = 0;
76,840✔
195
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
76,840✔
196
  buf = taosMemoryMalloc(tlen + 1);
76,840✔
197
  if (buf == NULL) {
76,840✔
198
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
199
    goto OVER;
×
200
  }
201
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
76,840✔
202

203
  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
76,840✔
204
    goto OVER;
×
205
  }
206

207
OVER:
76,840✔
208
  taosMemoryFreeClear(buf);
76,840✔
209
  if (terrno != TSDB_CODE_SUCCESS) {
76,840✔
210
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
211
    taosMemoryFreeClear(pRow);
×
212
    return NULL;
×
213
  }
214

215
  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
76,840✔
216
  return pRow;
76,840✔
217
}
218

219
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
39,049✔
220
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
39,049✔
221
  return 0;
39,049✔
222
}
223

224
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
76,840✔
225
  mTrace("compact:%" PRId32 ", perform delete action", pCompact->compactId);
76,840✔
226
  tFreeCompactObj(pCompact);
76,840✔
227
  return 0;
76,840✔
228
}
229

230
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
1,378✔
231
  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
1,378✔
232
         pNewCompact);
233

234
  return 0;
1,378✔
235
}
236

237
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
134,195✔
238
  SSdb        *pSdb = pMnode->pSdb;
134,195✔
239
  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
134,195✔
240
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
134,195✔
241
    terrno = TSDB_CODE_SUCCESS;
×
242
  }
243
  return pCompact;
134,195✔
244
}
245

246
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
134,195✔
247
  SSdb *pSdb = pMnode->pSdb;
134,195✔
248
  sdbRelease(pSdb, pCompact);
134,195✔
249
  pCompact = NULL;
134,195✔
250
}
134,195✔
251

252
static int32_t mndCompactGetDbInfo(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len, int64_t *dbUid) {
53,755✔
253
  int32_t      code = 0;
53,755✔
254
  SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
53,755✔
255
  if (pCompact == NULL) {
53,755✔
256
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
257
    if (terrno != 0) code = terrno;
×
258
    TAOS_RETURN(code);
×
259
  }
260

261
  tstrncpy(dbname, pCompact->dbname, len);
53,755✔
262
  if (dbUid) *dbUid = pCompact->dbUid;
53,755✔
263
  mndReleaseCompact(pMnode, pCompact);
53,755✔
264
  TAOS_RETURN(code);
53,755✔
265
}
266

267
// compact db
268
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
29,321✔
269
  int32_t code = 0;
29,321✔
270
  pCompact->compactId = tGenIdPI32();
29,321✔
271

272
  tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));
29,321✔
273
  pCompact->dbUid = pDb->uid;
29,321✔
274

275
  pCompact->startTime = taosGetTimestampMs();
29,321✔
276

277
  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
29,321✔
278
  if (pVgRaw == NULL) {
29,321✔
279
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
280
    if (terrno != 0) code = terrno;
×
281
    TAOS_RETURN(code);
×
282
  }
283
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
29,321✔
284
    sdbFreeRaw(pVgRaw);
×
285
    TAOS_RETURN(code);
×
286
  }
287

288
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
29,321✔
289
    sdbFreeRaw(pVgRaw);
×
290
    TAOS_RETURN(code);
×
291
  }
292

293
  rsp->compactId = pCompact->compactId;
29,321✔
294

295
  return 0;
29,321✔
296
}
297

298
// retrieve compact
299
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
321,913✔
300
  SMnode      *pMnode = pReq->info.node;
321,913✔
301
  SSdb        *pSdb = pMnode->pSdb;
321,913✔
302
  int32_t      numOfRows = 0;
321,913✔
303
  SCompactObj *pCompact = NULL;
321,913✔
304
  char        *sep = NULL;
321,913✔
305
  SDbObj      *pDb = NULL;
321,913✔
306
  int32_t      code = 0;
321,913✔
307
  int32_t      lino = 0;
321,913✔
308
  SUserObj    *pUser = NULL;
321,913✔
309
  SDbObj      *pIterDb = NULL;
321,913✔
310
  char         objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
321,913✔
311
  bool         showAll = false, showIter = false;
321,913✔
312
  int64_t      dbUid = 0;
321,913✔
313

314
  if (strlen(pShow->db) > 0) {
321,913✔
315
    sep = strchr(pShow->db, '.');
×
316
    if (sep &&
×
317
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
318
      sep++;
×
319
    } else {
320
      pDb = mndAcquireDb(pMnode, pShow->db);
×
321
      if (pDb == NULL) return terrno;
×
322
    }
323
  }
324

325
  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), PRIV_SHOW_COMPACTS, PRIV_OBJ_DB, 0, _OVER);
321,913✔
326

327
  while (numOfRows < rows) {
621,067✔
328
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
621,067✔
329
    if (pShow->pIter == NULL) break;
621,067✔
330

331
    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pCompact->dbname, pCompact, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_COMPACTS, _OVER);
299,154✔
332

333
    SColumnInfoData *pColInfo;
334
    SName            n;
335
    int32_t          cols = 0;
299,154✔
336

337
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
299,154✔
338

339
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
299,154✔
340
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
299,154✔
341
                        _OVER);
342

343
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
299,154✔
344
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
598,308✔
345
      SName name = {0};
299,154✔
346
      TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
299,154✔
347
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
299,154✔
348
    } else {
349
      tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
×
350
    }
351
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
299,154✔
352
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
299,154✔
353

354
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
299,154✔
355
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
299,154✔
356
                        _OVER);
357

358
    numOfRows++;
299,154✔
359
    sdbRelease(pSdb, pCompact);
299,154✔
360
  }
361

362
_OVER:
321,913✔
363
  if (pUser) mndReleaseUser(pMnode, pUser);
321,913✔
364
  mndReleaseDb(pMnode, pDb);
321,913✔
365
  if (code != 0) {
321,913✔
366
    mError("failed to retrieve compact at line %d since %s", lino, tstrerror(code));
×
367
    TAOS_RETURN(code);
×
368
  }
369
  pShow->numOfRows += numOfRows;
321,913✔
370
  return numOfRows;
321,913✔
371
}
372

373
// kill compact
374
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
1,378✔
375
                                    int32_t dnodeid) {
376
  SVKillCompactReq req = {0};
1,378✔
377
  req.compactId = compactId;
1,378✔
378
  req.vgId = pVgroup->vgId;
1,378✔
379
  req.dnodeId = dnodeid;
1,378✔
380
  terrno = 0;
1,378✔
381

382
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
1,378✔
383
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
1,378✔
384
  if (contLen < 0) {
1,378✔
385
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
386
    return NULL;
×
387
  }
388
  contLen += sizeof(SMsgHead);
1,378✔
389

390
  void *pReq = taosMemoryMalloc(contLen);
1,378✔
391
  if (pReq == NULL) {
1,378✔
392
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
393
    return NULL;
×
394
  }
395

396
  SMsgHead *pHead = pReq;
1,378✔
397
  pHead->contLen = htonl(contLen);
1,378✔
398
  pHead->vgId = htonl(pVgroup->vgId);
1,378✔
399

400
  mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);
1,378✔
401
  int32_t ret = 0;
1,378✔
402
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
1,378✔
403
    terrno = ret;
×
404
    taosMemoryFreeClear(pReq);
×
405
    return NULL;
×
406
  }
407
  *pContLen = contLen;
1,378✔
408
  return pReq;
1,378✔
409
}
410

411
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
1,378✔
412
                                       int32_t dnodeid) {
413
  int32_t      code = 0;
1,378✔
414
  STransAction action = {0};
1,378✔
415

416
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
1,378✔
417
  if (pDnode == NULL) {
1,378✔
418
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
419
    if (terrno != 0) code = terrno;
×
420
    TAOS_RETURN(code);
×
421
  }
422
  action.epSet = mndGetDnodeEpset(pDnode);
1,378✔
423
  mndReleaseDnode(pMnode, pDnode);
1,378✔
424

425
  int32_t contLen = 0;
1,378✔
426
  void   *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
1,378✔
427
  if (pReq == NULL) {
1,378✔
428
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
429
    if (terrno != 0) code = terrno;
×
430
    TAOS_RETURN(code);
×
431
  }
432

433
  action.pCont = pReq;
1,378✔
434
  action.contLen = contLen;
1,378✔
435
  action.msgType = TDMT_VND_KILL_COMPACT;
1,378✔
436

437
  mTrace("trans:%d, kill compact msg len:%d", pTrans->id, contLen);
1,378✔
438

439
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
1,378✔
440
    taosMemoryFree(pReq);
×
441
    TAOS_RETURN(code);
×
442
  }
443

444
  return 0;
1,378✔
445
}
446

447
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
1,378✔
448
  int32_t code = 0;
1,378✔
449
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
1,378✔
450
  if (pTrans == NULL) {
1,378✔
451
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
×
452
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
453
    if (terrno != 0) code = terrno;
×
454
    TAOS_RETURN(code);
×
455
  }
456
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
1,378✔
457

458
  mndTransSetDbName(pTrans, pCompact->dbname, NULL);
1,378✔
459

460
  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
1,378✔
461
  if (pCommitRaw == NULL) {
1,378✔
462
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
463
    if (terrno != 0) code = terrno;
×
464
    mndTransDrop(pTrans);
×
465
    TAOS_RETURN(code);
×
466
  }
467
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
1,378✔
468
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
469
    mndTransDrop(pTrans);
×
470
    TAOS_RETURN(code);
×
471
  }
472
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
1,378✔
473
    mndTransDrop(pTrans);
×
474
    TAOS_RETURN(code);
×
475
  }
476

477
  void *pIter = NULL;
1,378✔
478
  while (1) {
2,756✔
479
    SCompactDetailObj *pDetail = NULL;
4,134✔
480
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
4,134✔
481
    if (pIter == NULL) break;
4,134✔
482

483
    if (pDetail->compactId == pCompact->compactId) {
2,756✔
484
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
1,378✔
485
      if (pVgroup == NULL) {
1,378✔
486
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
487
        sdbCancelFetch(pMnode->pSdb, pIter);
×
488
        sdbRelease(pMnode->pSdb, pDetail);
×
489
        mndTransDrop(pTrans);
×
490
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
491
        if (terrno != 0) code = terrno;
×
492
        TAOS_RETURN(code);
×
493
      }
494

495
      if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
1,378✔
496
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
497
        sdbCancelFetch(pMnode->pSdb, pIter);
×
498
        sdbRelease(pMnode->pSdb, pDetail);
×
499
        mndTransDrop(pTrans);
×
500
        TAOS_RETURN(code);
×
501
      }
502

503
      mndReleaseVgroup(pMnode, pVgroup);
1,378✔
504

505
      /*
506
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
507
      if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
508
        mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
509
        mndTransDrop(pTrans);
510
        return -1;
511
      }
512
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
513
      */
514
    }
515

516
    sdbRelease(pMnode->pSdb, pDetail);
2,756✔
517
  }
518

519
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
1,378✔
520
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
521
    mndTransDrop(pTrans);
×
522
    TAOS_RETURN(code);
×
523
  }
524

525
  mndTransDrop(pTrans);
1,378✔
526
  return 0;
1,378✔
527
}
528

529
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
1,378✔
530
  int32_t         code = 0;
1,378✔
531
  int32_t         lino = 0;
1,378✔
532
  SKillCompactReq killCompactReq = {0};
1,378✔
533
  int64_t         tss = taosGetTimestampMs();
1,378✔
534

535
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
1,378✔
536
    TAOS_RETURN(code);
×
537
  }
538

539
  mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);
1,378✔
540

541
  SMnode      *pMnode = pReq->info.node;
1,378✔
542
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
1,378✔
543
  if (pCompact == NULL) {
1,378✔
544
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
×
545
    tFreeSKillCompactReq(&killCompactReq);
×
546
    TAOS_RETURN(code);
×
547
  }
548

549
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_COMPACT_DB), &lino, _OVER);
1,378✔
550

551
  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);
1,378✔
552

553
  code = TSDB_CODE_ACTION_IN_PROGRESS;
1,378✔
554

555
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
1,378✔
556
    char    obj[TSDB_INT32_ID_LEN] = {0};
1,378✔
557
    int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pCompact->compactId);
1,378✔
558
    if ((uint32_t)nBytes < sizeof(obj)) {
1,378✔
559
      int64_t tse = taosGetTimestampMs();
1,378✔
560
      double  duration = (double)(tse - tss);
1,378✔
561
      duration = duration / 1000;
1,378✔
562
      auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql,
1,378✔
563
                  killCompactReq.sqlLen, duration, 0);
564
    } else {
565
      mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
×
566
    }
567
  }
568

569
_OVER:
1,378✔
570
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,378✔
571
    mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
×
572
  }
573

574
  tFreeSKillCompactReq(&killCompactReq);
1,378✔
575
  mndReleaseCompact(pMnode, pCompact);
1,378✔
576

577
  TAOS_RETURN(code);
1,378✔
578
}
579

580
// update progress
581
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
97,796✔
582
                                        SQueryCompactProgressRsp *rsp) {
583
  int32_t code = 0;
97,796✔
584

585
  void *pIter = NULL;
97,796✔
586
  while (1) {
66,638✔
587
    SCompactDetailObj *pDetail = NULL;
164,434✔
588
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
164,434✔
589
    if (pIter == NULL) break;
164,434✔
590

591
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
164,434✔
592
      pDetail->newNumberFileset = rsp->numberFileset;
97,796✔
593
      pDetail->newFinished = rsp->finished;
97,796✔
594
      pDetail->progress = rsp->progress;
97,796✔
595
      pDetail->remainingTime = rsp->remainingTime;
97,796✔
596

597
      sdbCancelFetch(pMnode->pSdb, pIter);
97,796✔
598
      sdbRelease(pMnode->pSdb, pDetail);
97,796✔
599

600
      TAOS_RETURN(code);
97,796✔
601
    }
602

603
    sdbRelease(pMnode->pSdb, pDetail);
66,638✔
604
  }
605

UNCOV
606
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
607
}
608

609
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
108,332✔
610
  int32_t                  code = 0;
108,332✔
611
  SQueryCompactProgressRsp req = {0};
108,332✔
612
  if (pReq->code != 0) {
108,332✔
613
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
10,536✔
614
    TAOS_RETURN(pReq->code);
10,536✔
615
  }
616
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
97,796✔
617
  if (code != 0) {
97,179✔
618
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
619
           pReq->contLen);
620
    TAOS_RETURN(code);
×
621
  }
622

623
  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
97,179✔
624
         req.vgId, req.dnodeId, req.numberFileset, req.finished);
625

626
  SMnode *pMnode = pReq->info.node;
97,179✔
627

628
  code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
97,796✔
629
  if (code != 0) {
97,796✔
UNCOV
630
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
×
631
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
UNCOV
632
    TAOS_RETURN(code);
×
633
  }
634

635
  TAOS_RETURN(code);
97,796✔
636
}
637

638
// timer
639
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
53,755✔
640
  void *pIter = NULL;
53,755✔
641

642
  while (1) {
124,493✔
643
    SCompactDetailObj *pDetail = NULL;
178,248✔
644
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
178,248✔
645
    if (pIter == NULL) break;
178,248✔
646

647
    if (pDetail->compactId == pCompact->compactId) {
124,493✔
648
      SEpSet epSet = {0};
108,332✔
649

650
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
108,332✔
651
      if (pDnode == NULL) break;
108,332✔
652
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
108,332✔
653
        sdbRelease(pMnode->pSdb, pDetail);
×
654
        continue;
×
655
      }
656
      mndReleaseDnode(pMnode, pDnode);
108,332✔
657

658
      SQueryCompactProgressReq req;
108,332✔
659
      req.compactId = pDetail->compactId;
108,332✔
660
      req.vgId = pDetail->vgId;
108,332✔
661
      req.dnodeId = pDetail->dnodeId;
108,332✔
662

663
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
108,332✔
664
      if (contLen < 0) {
108,332✔
665
        sdbRelease(pMnode->pSdb, pDetail);
×
666
        continue;
×
667
      }
668

669
      contLen += sizeof(SMsgHead);
108,332✔
670

671
      SMsgHead *pHead = rpcMallocCont(contLen);
108,332✔
672
      if (pHead == NULL) {
108,332✔
673
        sdbRelease(pMnode->pSdb, pDetail);
×
674
        continue;
×
675
      }
676

677
      pHead->contLen = htonl(contLen);
108,332✔
678
      pHead->vgId = htonl(pDetail->vgId);
108,332✔
679

680
      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
108,332✔
681
        sdbRelease(pMnode->pSdb, pDetail);
×
682
        continue;
×
683
      }
684

685
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
108,332✔
686

687
      rpcMsg.pCont = pHead;
108,332✔
688

689
      char    detail[1024] = {0};
108,332✔
690
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
216,664✔
691
                              TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
216,664✔
692
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
216,664✔
693
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
108,332✔
694
      }
695

696
      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
108,332✔
697

698
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
108,332✔
699
        sdbRelease(pMnode->pSdb, pDetail);
×
700
        continue;
×
701
      }
702
    }
703

704
    sdbRelease(pMnode->pSdb, pDetail);
124,493✔
705
  }
706
}
53,755✔
707

708
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
53,755✔
709
  int32_t code = 0;
53,755✔
710
  bool    needSave = false;
53,755✔
711
  void   *pIter = NULL;
53,755✔
712
  while (1) {
124,493✔
713
    SCompactDetailObj *pDetail = NULL;
178,248✔
714
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
178,248✔
715
    if (pIter == NULL) break;
178,248✔
716

717
    if (pDetail->compactId == compactId) {
124,493✔
718
      mDebug(
108,332✔
719
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
720
          "newNumberFileset:%d, newFinished:%d",
721
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
722
          pDetail->newNumberFileset, pDetail->newFinished);
723

724
      // these 2 number will jump back after dnode restart, so < is not used here
725
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
108,332✔
726
        needSave = true;
48,701✔
727
    }
728

729
    sdbRelease(pMnode->pSdb, pDetail);
124,493✔
730
  }
731

732
  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
53,755✔
733
  int64_t dbUid = 0;
53,755✔
734
  TAOS_CHECK_RETURN(mndCompactGetDbInfo(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));
53,755✔
735

736
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
53,755✔
737
    needSave = true;
1,330✔
738
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
1,330✔
739
  }
740

741
  if (!needSave) {
53,755✔
742
    mDebug("compact:%" PRId32 ", no need to save", compactId);
22,763✔
743
    TAOS_RETURN(code);
22,763✔
744
  }
745

746
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
30,992✔
747
  if (pTrans == NULL) {
30,992✔
748
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
749
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
750
    if (terrno != 0) code = terrno;
×
751
    TAOS_RETURN(code);
×
752
  }
753
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
30,992✔
754

755
  mndTransSetDbName(pTrans, dbname, NULL);
30,992✔
756

757
  pIter = NULL;
30,992✔
758
  while (1) {
76,330✔
759
    SCompactDetailObj *pDetail = NULL;
107,322✔
760
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
107,322✔
761
    if (pIter == NULL) break;
107,322✔
762

763
    if (pDetail->compactId == compactId) {
76,330✔
764
      mInfo(
67,230✔
765
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
766
          "newNumberFileset:%d, newFinished:%d",
767
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
768
          pDetail->newNumberFileset, pDetail->newFinished);
769

770
      pDetail->numberFileset = pDetail->newNumberFileset;
67,230✔
771
      pDetail->finished = pDetail->newFinished;
67,230✔
772

773
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
67,230✔
774
      if (pCommitRaw == NULL) {
67,230✔
775
        sdbCancelFetch(pMnode->pSdb, pIter);
×
776
        sdbRelease(pMnode->pSdb, pDetail);
×
777
        mndTransDrop(pTrans);
×
778
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
779
        if (terrno != 0) code = terrno;
×
780
        TAOS_RETURN(code);
×
781
      }
782
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
67,230✔
783
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
×
784
        sdbCancelFetch(pMnode->pSdb, pIter);
×
785
        sdbRelease(pMnode->pSdb, pDetail);
×
786
        mndTransDrop(pTrans);
×
787
        TAOS_RETURN(code);
×
788
      }
789
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
67,230✔
790
        sdbCancelFetch(pMnode->pSdb, pIter);
×
791
        sdbRelease(pMnode->pSdb, pDetail);
×
792
        mndTransDrop(pTrans);
×
793
        TAOS_RETURN(code);
×
794
      }
795
    }
796

797
    sdbRelease(pMnode->pSdb, pDetail);
76,330✔
798
  }
799

800
  bool allFinished = true;
30,992✔
801
  pIter = NULL;
30,992✔
802
  while (1) {
51,496✔
803
    SCompactDetailObj *pDetail = NULL;
82,488✔
804
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
82,488✔
805
    if (pIter == NULL) break;
82,488✔
806

807
    if (pDetail->compactId == compactId) {
58,511✔
808
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
53,990✔
809
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
810

811
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
53,990✔
812
        allFinished = false;
3,764✔
813
        sdbCancelFetch(pMnode->pSdb, pIter);
3,764✔
814
        sdbRelease(pMnode->pSdb, pDetail);
3,764✔
815
        break;
3,764✔
816
      }
817
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
50,226✔
818
        allFinished = false;
3,251✔
819
        sdbCancelFetch(pMnode->pSdb, pIter);
3,251✔
820
        sdbRelease(pMnode->pSdb, pDetail);
3,251✔
821
        break;
3,251✔
822
      }
823
    }
824

825
    sdbRelease(pMnode->pSdb, pDetail);
51,496✔
826
  }
827

828
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
30,992✔
829
    allFinished = true;
1,330✔
830
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
1,330✔
831
  }
832

833
  if (allFinished) {
30,992✔
834
    mInfo("compact:%d, all finished", compactId);
25,307✔
835
    pIter = NULL;
25,307✔
836
    while (1) {
57,130✔
837
      SCompactDetailObj *pDetail = NULL;
82,437✔
838
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
82,437✔
839
      if (pIter == NULL) break;
82,437✔
840

841
      if (pDetail->compactId == compactId) {
57,130✔
842
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
55,694✔
843
        if (pCommitRaw == NULL) {
55,694✔
844
          mndTransDrop(pTrans);
×
845
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
846
          if (terrno != 0) code = terrno;
×
847
          TAOS_RETURN(code);
×
848
        }
849
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
55,694✔
850
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
×
851
                 terrstr());
852
          sdbCancelFetch(pMnode->pSdb, pIter);
×
853
          sdbRelease(pMnode->pSdb, pDetail);
×
854
          mndTransDrop(pTrans);
×
855
          TAOS_RETURN(code);
×
856
        }
857
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
55,694✔
858
          sdbCancelFetch(pMnode->pSdb, pIter);
×
859
          sdbRelease(pMnode->pSdb, pDetail);
×
860
          mndTransDrop(pTrans);
×
861
          TAOS_RETURN(code);
×
862
        }
863
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
55,694✔
864
      }
865

866
      sdbRelease(pMnode->pSdb, pDetail);
57,130✔
867
    }
868

869
    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
25,307✔
870
    if (pCompact == NULL) {
25,307✔
871
      mndTransDrop(pTrans);
×
872
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
873
      if (terrno != 0) code = terrno;
×
874
      TAOS_RETURN(code);
×
875
    }
876
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
25,307✔
877
    mndReleaseCompact(pMnode, pCompact);
25,307✔
878
    if (pCommitRaw == NULL) {
25,307✔
879
      mndTransDrop(pTrans);
×
880
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
881
      if (terrno != 0) code = terrno;
×
882
      TAOS_RETURN(code);
×
883
    }
884
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
25,307✔
885
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
886
      mndTransDrop(pTrans);
×
887
      TAOS_RETURN(code);
×
888
    }
889
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
25,307✔
890
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
891
      mndTransDrop(pTrans);
×
892
      TAOS_RETURN(code);
×
893
    }
894
    mInfo("compact:%d, add drop compact action", pCompact->compactId);
25,307✔
895
  }
896

897
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
30,992✔
898
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
×
899
    mndTransDrop(pTrans);
×
900
    TAOS_RETURN(code);
×
901
  }
902

903
  mndTransDrop(pTrans);
30,992✔
904
  return 0;
30,992✔
905
}
906

907
static void mndCompactPullup(SMnode *pMnode) {
3,324,207✔
908
  int32_t code = 0;
3,324,207✔
909
  SSdb   *pSdb = pMnode->pSdb;
3,324,207✔
910
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
3,324,207✔
911
  if (pArray == NULL) return;
3,324,207✔
912

913
  void *pIter = NULL;
3,324,207✔
914
  while (1) {
53,755✔
915
    SCompactObj *pCompact = NULL;
3,377,962✔
916
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
3,377,962✔
917
    if (pIter == NULL) break;
3,377,962✔
918
    if (taosArrayPush(pArray, &pCompact->compactId) == NULL) {
107,510✔
919
      mError("failed to push compact id:%d into array, but continue pull up", pCompact->compactId);
×
920
    }
921
    sdbRelease(pSdb, pCompact);
53,755✔
922
  }
923

924
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
3,377,962✔
925
    mInfo("begin to pull up");
53,755✔
926
    int32_t     *pCompactId = taosArrayGet(pArray, i);
53,755✔
927
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
53,755✔
928
    if (pCompact != NULL) {
53,755✔
929
      mInfo("compact:%d, begin to pull up", pCompact->compactId);
53,755✔
930
      mndCompactSendProgressReq(pMnode, pCompact);
53,755✔
931
      if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
53,755✔
932
        mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
×
933
      }
934
      mndReleaseCompact(pMnode, pCompact);
53,755✔
935
    }
936
  }
937
  taosArrayDestroy(pArray);
3,324,207✔
938
}
939
#ifdef TD_ENTERPRISE
940
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
×
941
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
×
942
    return 0;
×
943
  }
944
  int64_t tss = taosGetTimestampMs();
×
945

946
  SName   name = {0};
×
947
  int32_t sqlLen = 0;
×
948
  char    sql[256] = {0};
×
949
  char    skeyStr[40] = {0};
×
950
  char    ekeyStr[40] = {0};
×
951
  char   *pDbName = pDb->name;
×
952

953
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
×
954
    pDbName = name.dbname;
×
955
  }
956

957
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
×
958
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
×
959
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
×
960
  } else {
961
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
×
962
                       tw->ekey);
963
  }
964

965
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
×
966
    int64_t tse = taosGetTimestampMs();
×
967
    double  duration = (double)(tse - tss);
×
968
    duration = duration / 1000;
×
969
    auditRecord(NULL, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen, duration, 0);
×
970
  }
971

972
  return 0;
×
973
}
974

975
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
976
                            bool metaOnly, ETsdbOpType type, ETriggerType triggerType);
977
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
3,324,207✔
978
  int32_t code = 0;
3,324,207✔
979
  SMnode *pMnode = pReq->info.node;
3,324,207✔
980
  SSdb   *pSdb = pMnode->pSdb;
3,324,207✔
981
  int64_t curMs = taosGetTimestampMs();
3,324,207✔
982
  int64_t curMin = curMs / 60000LL;
3,324,207✔
983

984
  void   *pIter = NULL;
3,324,207✔
985
  SDbObj *pDb = NULL;
3,324,207✔
986
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
9,225,285✔
987
    if (pDb->cfg.compactInterval <= 0) {
5,901,078✔
988
      mDebug("db:%p,%s, compact interval is %dm, skip", pDb, pDb->name, pDb->cfg.compactInterval);
5,897,615✔
989
      sdbRelease(pSdb, pDb);
5,897,615✔
990
      continue;
5,897,615✔
991
    }
992

993
    if (pDb->cfg.isMount) {
3,463✔
994
      sdbRelease(pSdb, pDb);
×
995
      continue;
×
996
    }
997

998
    // daysToKeep2 would be altered
999
    if (pDb->cfg.compactEndTime && (pDb->cfg.compactEndTime <= -pDb->cfg.daysToKeep2)) {
3,463✔
1000
      mWarn("db:%p,%s, compact end time:%dm <= -keep2:%dm , skip", pDb, pDb->name, pDb->cfg.compactEndTime,
×
1001
            -pDb->cfg.daysToKeep2);
1002
      sdbRelease(pSdb, pDb);
×
1003
      continue;
×
1004
    }
1005

1006
    int64_t compactStartTime = pDb->cfg.compactStartTime ? pDb->cfg.compactStartTime : -pDb->cfg.daysToKeep2;
3,463✔
1007
    int64_t compactEndTime = pDb->cfg.compactEndTime ? pDb->cfg.compactEndTime : -pDb->cfg.daysPerFile;
3,463✔
1008

1009
    if (compactStartTime >= compactEndTime) {
3,463✔
1010
      mDebug("db:%p,%s, compact start time:%" PRIi64 "m >= end time:%" PRIi64 "m, skip", pDb, pDb->name,
×
1011
             compactStartTime, compactEndTime);
1012
      sdbRelease(pSdb, pDb);
×
1013
      continue;
×
1014
    }
1015

1016
    int64_t remainder = ((curMin - (int64_t)pDb->cfg.compactTimeOffset * 60LL) % pDb->cfg.compactInterval);
3,463✔
1017
    if (remainder != 0) {
3,463✔
1018
      mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
3,463✔
1019
             "h, remainder:%" PRIi64 "m, skip",
1020
             pDb, pDb->name, curMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, remainder);
1021
      sdbRelease(pSdb, pDb);
3,463✔
1022
      continue;
3,463✔
1023
    }
1024

1025
    if ((pDb->compactStartTime / 60000LL) == curMin) {
×
1026
      mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
×
1027
             curMin, pDb->compactStartTime);
1028
      sdbRelease(pSdb, pDb);
×
1029
      continue;
×
1030
    }
1031

1032
    STimeWindow tw = {
×
1033
        .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
×
1034
        .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
×
1035

1036
    if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL, false, TSDB_OPTR_NORMAL, TSDB_TRIGGER_AUTO)) == 0) {
×
1037
      mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1038
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
1039
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1040
            pDb->cfg.compactTimeOffset);
1041
    } else {
1042
      mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1043
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h, since %s",
1044
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1045
            pDb->cfg.compactTimeOffset, tstrerror(code));
1046
    }
1047

1048
    TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
×
1049

1050
    sdbRelease(pSdb, pDb);
×
1051
  }
1052
  return 0;
3,324,207✔
1053
}
1054
#endif
1055

1056
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
3,324,207✔
1057
#ifdef TD_ENTERPRISE
1058
  mTrace("start to process compact timer");
3,324,207✔
1059
  mndCompactPullup(pReq->info.node);
3,324,207✔
1060
  TAOS_UNUSED(mndCompactDispatch(pReq));
3,324,207✔
1061
#endif
1062
  return 0;
3,324,207✔
1063
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc