• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5028

20 Apr 2026 09:07AM UTC coverage: 72.986% (-0.01%) from 72.996%
#5028

push

travis-ci

web-flow
perf: optimize compact progress query from O(m*n) to O(n+m) (#35115)

104 of 141 new or added lines in 4 files covered. (73.76%)

5170 existing lines in 133 files now uncovered.

273777 of 375111 relevant lines covered (72.99%)

130458474.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.52
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndCompact.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndUser.h"
24
#include "mndVgroup.h"
25
#include "tmisce.h"
26
#include "tmsgcb.h"
27

28
#define MND_COMPACT_VER_NUMBER 1
29
#define MND_COMPACT_ID_LEN     11
30

31
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
32

33
int32_t mndInitCompact(SMnode *pMnode) {
472,750✔
34
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
472,750✔
35
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
472,750✔
36
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
472,750✔
37
  mndSetMsgHandle(pMnode, TDMT_DND_QUERY_COMPACT_PROGRESS_RSP, mndProcessDnodeCompactProgressRsp);
472,750✔
38
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
472,750✔
39
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);
472,750✔
40

41
  SSdbTable table = {
472,750✔
42
      .sdbType = SDB_COMPACT,
43
      .keyType = SDB_KEY_INT32,
44
      .encodeFp = (SdbEncodeFp)mndCompactActionEncode,
45
      .decodeFp = (SdbDecodeFp)mndCompactActionDecode,
46
      .insertFp = (SdbInsertFp)mndCompactActionInsert,
47
      .updateFp = (SdbUpdateFp)mndCompactActionUpdate,
48
      .deleteFp = (SdbDeleteFp)mndCompactActionDelete,
49
  };
50

51
  return sdbSetTable(pMnode->pSdb, table);
472,750✔
52
}
53

54
void mndCleanupCompact(SMnode *pMnode) { mDebug("mnd compact cleanup"); }
472,688✔
55

56
void tFreeCompactObj(SCompactObj *pCompact) {}
139,328✔
57

58
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
250,028✔
59
  SEncoder encoder = {0};
250,028✔
60
  int32_t  code = 0;
250,028✔
61
  int32_t  lino;
62
  int32_t  tlen;
63
  tEncoderInit(&encoder, buf, bufLen);
250,028✔
64

65
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
250,028✔
66
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
500,056✔
67
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
500,056✔
68
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
500,056✔
69
  TAOS_CHECK_EXIT(tEncodeU32v(&encoder, pObj->flags));
500,056✔
70
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
500,056✔
71

72
  tEndEncode(&encoder);
250,028✔
73

74
_exit:
250,028✔
75
  if (code) {
250,028✔
76
    tlen = code;
×
77
  } else {
78
    tlen = encoder.pos;
250,028✔
79
  }
80
  tEncoderClear(&encoder);
250,028✔
81
  return tlen;
250,028✔
82
}
83

84
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
139,328✔
85
  int32_t  code = 0;
139,328✔
86
  int32_t  lino;
87
  SDecoder decoder = {0};
139,328✔
88
  tDecoderInit(&decoder, buf, bufLen);
139,328✔
89

90
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
139,328✔
91
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId));
278,656✔
92
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
139,328✔
93
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
278,656✔
94

95
  if (!tDecodeIsEnd(&decoder)) {
139,328✔
96
    TAOS_CHECK_EXIT(tDecodeU32v(&decoder, &pObj->flags));
278,656✔
97
  } else {
98
    pObj->flags = 0;
×
99
  }
100
  if (!tDecodeIsEnd(&decoder)) {
139,328✔
101
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
278,656✔
102
  } else {
103
    pObj->dbUid = 0;
×
104
  }
105

106
  tEndDecode(&decoder);
139,328✔
107

108
_exit:
139,328✔
109
  tDecoderClear(&decoder);
139,328✔
110
  return code;
139,328✔
111
}
112

113
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
66,303✔
114
  int32_t code = 0;
66,303✔
115
  int32_t lino = 0;
66,303✔
116
  terrno = TSDB_CODE_SUCCESS;
66,303✔
117

118
  void    *buf = NULL;
66,303✔
119
  SSdbRaw *pRaw = NULL;
66,303✔
120

121
  int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact);
66,303✔
122
  if (tlen < 0) {
66,303✔
123
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
124
    goto OVER;
×
125
  }
126

127
  int32_t size = sizeof(int32_t) + tlen;
66,303✔
128
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size);
66,303✔
129
  if (pRaw == NULL) {
66,303✔
130
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
131
    goto OVER;
×
132
  }
133

134
  buf = taosMemoryMalloc(tlen);
66,303✔
135
  if (buf == NULL) {
66,303✔
136
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
137
    goto OVER;
×
138
  }
139

140
  tlen = tSerializeSCompactObj(buf, tlen, pCompact);
66,303✔
141
  if (tlen < 0) {
66,303✔
142
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
143
    goto OVER;
×
144
  }
145

146
  int32_t dataPos = 0;
66,303✔
147
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
66,303✔
148
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
66,303✔
149
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
66,303✔
150

151
OVER:
66,303✔
152
  taosMemoryFreeClear(buf);
66,303✔
153
  if (terrno != TSDB_CODE_SUCCESS) {
66,303✔
154
    mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
155
    sdbFreeRaw(pRaw);
×
156
    return NULL;
×
157
  }
158

159
  mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
66,303✔
160
  return pRaw;
66,303✔
161
}
162

163
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
84,920✔
164
  int32_t      code = 0;
84,920✔
165
  int32_t      lino = 0;
84,920✔
166
  SSdbRow     *pRow = NULL;
84,920✔
167
  SCompactObj *pCompact = NULL;
84,920✔
168
  void        *buf = NULL;
84,920✔
169
  terrno = TSDB_CODE_SUCCESS;
84,920✔
170

171
  int8_t sver = 0;
84,920✔
172
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
84,920✔
173
    goto OVER;
×
174
  }
175

176
  if (sver != MND_COMPACT_VER_NUMBER) {
84,920✔
177
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
178
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
×
179
    goto OVER;
×
180
  }
181

182
  pRow = sdbAllocRow(sizeof(SCompactObj));
84,920✔
183
  if (pRow == NULL) {
84,920✔
184
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
185
    goto OVER;
×
186
  }
187

188
  pCompact = sdbGetRowObj(pRow);
84,920✔
189
  if (pCompact == NULL) {
84,920✔
190
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
191
    goto OVER;
×
192
  }
193

194
  int32_t tlen;
84,920✔
195
  int32_t dataPos = 0;
84,920✔
196
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
84,920✔
197
  buf = taosMemoryMalloc(tlen + 1);
84,920✔
198
  if (buf == NULL) {
84,920✔
199
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
200
    goto OVER;
×
201
  }
202
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
84,920✔
203

204
  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
84,920✔
205
    goto OVER;
×
206
  }
207

208
OVER:
84,920✔
209
  taosMemoryFreeClear(buf);
84,920✔
210
  if (terrno != TSDB_CODE_SUCCESS) {
84,920✔
211
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
212
    taosMemoryFreeClear(pRow);
×
213
    return NULL;
×
214
  }
215

216
  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
84,920✔
217
  return pRow;
84,920✔
218
}
219

220
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
43,196✔
221
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
43,196✔
222
  return 0;
43,196✔
223
}
224

225
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
84,920✔
226
  mTrace("compact:%" PRId32 ", perform delete action", pCompact->compactId);
84,920✔
227
  tFreeCompactObj(pCompact);
84,920✔
228
  return 0;
84,920✔
229
}
230

231
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
1,570✔
232
  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
1,570✔
233
         pNewCompact);
234

235
  return 0;
1,570✔
236
}
237

238
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
154,018✔
239
  SSdb        *pSdb = pMnode->pSdb;
154,018✔
240
  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
154,018✔
241
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
154,018✔
242
    terrno = TSDB_CODE_SUCCESS;
×
243
  }
244
  return pCompact;
154,018✔
245
}
246

247
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
154,018✔
248
  SSdb *pSdb = pMnode->pSdb;
154,018✔
249
  sdbRelease(pSdb, pCompact);
154,018✔
250
  pCompact = NULL;
154,018✔
251
}
154,018✔
252

253
static int32_t mndCompactGetDbInfo(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len, int64_t *dbUid) {
61,778✔
254
  int32_t      code = 0;
61,778✔
255
  SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
61,778✔
256
  if (pCompact == NULL) {
61,778✔
257
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
258
    if (terrno != 0) code = terrno;
×
259
    TAOS_RETURN(code);
×
260
  }
261

262
  tstrncpy(dbname, pCompact->dbname, len);
61,778✔
263
  if (dbUid) *dbUid = pCompact->dbUid;
61,778✔
264
  mndReleaseCompact(pMnode, pCompact);
61,778✔
265
  TAOS_RETURN(code);
61,778✔
266
}
267

268
// compact db
269
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
32,652✔
270
  int32_t code = 0;
32,652✔
271
  pCompact->compactId = tGenIdPI32();
32,652✔
272

273
  tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));
32,652✔
274
  pCompact->dbUid = pDb->uid;
32,652✔
275

276
  pCompact->startTime = taosGetTimestampMs();
32,652✔
277

278
  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
32,652✔
279
  if (pVgRaw == NULL) {
32,652✔
280
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
281
    if (terrno != 0) code = terrno;
×
282
    TAOS_RETURN(code);
×
283
  }
284
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
32,652✔
285
    sdbFreeRaw(pVgRaw);
×
286
    TAOS_RETURN(code);
×
287
  }
288

289
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
32,652✔
290
    sdbFreeRaw(pVgRaw);
×
291
    TAOS_RETURN(code);
×
292
  }
293

294
  rsp->compactId = pCompact->compactId;
32,652✔
295

296
  return 0;
32,652✔
297
}
298

299
// retrieve compact
300
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
373,936✔
301
  SMnode      *pMnode = pReq->info.node;
373,936✔
302
  SSdb        *pSdb = pMnode->pSdb;
373,936✔
303
  int32_t      numOfRows = 0;
373,936✔
304
  SCompactObj *pCompact = NULL;
373,936✔
305
  char        *sep = NULL;
373,936✔
306
  SDbObj      *pDb = NULL;
373,936✔
307
  int32_t      code = 0;
373,936✔
308
  int32_t      lino = 0;
373,936✔
309
  SUserObj    *pUser = NULL;
373,936✔
310
  SDbObj      *pIterDb = NULL;
373,936✔
311
  char         objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
373,936✔
312
  bool         showAll = false, showIter = false;
373,936✔
313
  int64_t      dbUid = 0;
373,936✔
314

315
  if (strlen(pShow->db) > 0) {
373,936✔
316
    sep = strchr(pShow->db, '.');
×
317
    if (sep &&
×
318
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
319
      sep++;
×
320
    } else {
321
      pDb = mndAcquireDb(pMnode, pShow->db);
×
322
      if (pDb == NULL) return terrno;
×
323
    }
324
  }
325

326
  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_COMPACTS, PRIV_OBJ_DB, 0, _OVER);
373,936✔
327

328
  while (numOfRows < rows) {
722,719✔
329
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
722,719✔
330
    if (pShow->pIter == NULL) break;
722,719✔
331

332
    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pCompact->dbname, pCompact, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_COMPACTS, _OVER);
348,783✔
333

334
    SColumnInfoData *pColInfo;
335
    SName            n;
336
    int32_t          cols = 0;
348,783✔
337

338
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
348,783✔
339

340
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
348,783✔
341
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
348,783✔
342
                        _OVER);
343

344
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
348,783✔
345
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
697,566✔
346
      SName name = {0};
348,783✔
347
      TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
348,783✔
348
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
348,783✔
349
    } else {
350
      tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
×
351
    }
352
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
348,783✔
353
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
348,783✔
354

355
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
348,783✔
356
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
348,783✔
357
                        _OVER);
358

359
    numOfRows++;
348,783✔
360
    sdbRelease(pSdb, pCompact);
348,783✔
361
  }
362

363
_OVER:
373,936✔
364
  if (pUser) mndReleaseUser(pMnode, pUser);
373,936✔
365
  mndReleaseDb(pMnode, pDb);
373,936✔
366
  if (code != 0) {
373,936✔
367
    mError("failed to retrieve compact at line %d since %s", lino, tstrerror(code));
×
368
    TAOS_RETURN(code);
×
369
  }
370
  pShow->numOfRows += numOfRows;
373,936✔
371
  return numOfRows;
373,936✔
372
}
373

374
// kill compact
375
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
1,570✔
376
                                    int32_t dnodeid) {
377
  SVKillCompactReq req = {0};
1,570✔
378
  req.compactId = compactId;
1,570✔
379
  req.vgId = pVgroup->vgId;
1,570✔
380
  req.dnodeId = dnodeid;
1,570✔
381
  terrno = 0;
1,570✔
382

383
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
1,570✔
384
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
1,570✔
385
  if (contLen < 0) {
1,570✔
386
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
387
    return NULL;
×
388
  }
389
  contLen += sizeof(SMsgHead);
1,570✔
390

391
  void *pReq = taosMemoryMalloc(contLen);
1,570✔
392
  if (pReq == NULL) {
1,570✔
393
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
394
    return NULL;
×
395
  }
396

397
  SMsgHead *pHead = pReq;
1,570✔
398
  pHead->contLen = htonl(contLen);
1,570✔
399
  pHead->vgId = htonl(pVgroup->vgId);
1,570✔
400

401
  mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);
1,570✔
402
  int32_t ret = 0;
1,570✔
403
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
1,570✔
404
    terrno = ret;
×
405
    taosMemoryFreeClear(pReq);
×
406
    return NULL;
×
407
  }
408
  *pContLen = contLen;
1,570✔
409
  return pReq;
1,570✔
410
}
411

412
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
1,570✔
413
                                       int32_t dnodeid) {
414
  int32_t      code = 0;
1,570✔
415
  STransAction action = {0};
1,570✔
416

417
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
1,570✔
418
  if (pDnode == NULL) {
1,570✔
419
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
420
    if (terrno != 0) code = terrno;
×
421
    TAOS_RETURN(code);
×
422
  }
423
  action.epSet = mndGetDnodeEpset(pDnode);
1,570✔
424
  mndReleaseDnode(pMnode, pDnode);
1,570✔
425

426
  int32_t contLen = 0;
1,570✔
427
  void   *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
1,570✔
428
  if (pReq == NULL) {
1,570✔
429
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
430
    if (terrno != 0) code = terrno;
×
431
    TAOS_RETURN(code);
×
432
  }
433

434
  action.pCont = pReq;
1,570✔
435
  action.contLen = contLen;
1,570✔
436
  action.msgType = TDMT_VND_KILL_COMPACT;
1,570✔
437

438
  mTrace("trans:%d, kill compact msg len:%d", pTrans->id, contLen);
1,570✔
439

440
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
1,570✔
441
    taosMemoryFree(pReq);
×
442
    TAOS_RETURN(code);
×
443
  }
444

445
  return 0;
1,570✔
446
}
447

448
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
1,570✔
449
  int32_t code = 0;
1,570✔
450
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
1,570✔
451
  if (pTrans == NULL) {
1,570✔
452
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
×
453
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
454
    if (terrno != 0) code = terrno;
×
455
    TAOS_RETURN(code);
×
456
  }
457
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
1,570✔
458

459
  mndTransSetDbName(pTrans, pCompact->dbname, NULL);
1,570✔
460

461
  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
1,570✔
462
  if (pCommitRaw == NULL) {
1,570✔
463
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
464
    if (terrno != 0) code = terrno;
×
465
    mndTransDrop(pTrans);
×
466
    TAOS_RETURN(code);
×
467
  }
468
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
1,570✔
469
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
470
    mndTransDrop(pTrans);
×
471
    TAOS_RETURN(code);
×
472
  }
473
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
1,570✔
474
    mndTransDrop(pTrans);
×
475
    TAOS_RETURN(code);
×
476
  }
477

478
  void *pIter = NULL;
1,570✔
479
  while (1) {
3,140✔
480
    SCompactDetailObj *pDetail = NULL;
4,710✔
481
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
4,710✔
482
    if (pIter == NULL) break;
4,710✔
483

484
    if (pDetail->compactId == pCompact->compactId) {
3,140✔
485
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
1,570✔
486
      if (pVgroup == NULL) {
1,570✔
487
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
488
        sdbCancelFetch(pMnode->pSdb, pIter);
×
489
        sdbRelease(pMnode->pSdb, pDetail);
×
490
        mndTransDrop(pTrans);
×
491
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
492
        if (terrno != 0) code = terrno;
×
493
        TAOS_RETURN(code);
×
494
      }
495

496
      if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
1,570✔
497
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
498
        sdbCancelFetch(pMnode->pSdb, pIter);
×
499
        sdbRelease(pMnode->pSdb, pDetail);
×
500
        mndTransDrop(pTrans);
×
501
        TAOS_RETURN(code);
×
502
      }
503

504
      mndReleaseVgroup(pMnode, pVgroup);
1,570✔
505

506
      /*
507
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
508
      if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
509
        mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
510
        mndTransDrop(pTrans);
511
        return -1;
512
      }
513
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
514
      */
515
    }
516

517
    sdbRelease(pMnode->pSdb, pDetail);
3,140✔
518
  }
519

520
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
1,570✔
521
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
522
    mndTransDrop(pTrans);
×
523
    TAOS_RETURN(code);
×
524
  }
525

526
  mndTransDrop(pTrans);
1,570✔
527
  return 0;
1,570✔
528
}
529

530
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
1,570✔
531
  int32_t         code = 0;
1,570✔
532
  int32_t         lino = 0;
1,570✔
533
  SKillCompactReq killCompactReq = {0};
1,570✔
534
  int64_t         tss = taosGetTimestampMs();
1,570✔
535

536
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
1,570✔
537
    TAOS_RETURN(code);
×
538
  }
539

540
  mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);
1,570✔
541

542
  SMnode      *pMnode = pReq->info.node;
1,570✔
543
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
1,570✔
544
  if (pCompact == NULL) {
1,570✔
545
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
×
546
    tFreeSKillCompactReq(&killCompactReq);
×
547
    TAOS_RETURN(code);
×
548
  }
549

550
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_COMPACT_DB), &lino, _OVER);
1,570✔
551

552
  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);
1,570✔
553

554
  code = TSDB_CODE_ACTION_IN_PROGRESS;
1,570✔
555

556
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
1,570✔
557
    char    obj[TSDB_INT32_ID_LEN] = {0};
1,570✔
558
    int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pCompact->compactId);
1,570✔
559
    if ((uint32_t)nBytes < sizeof(obj)) {
1,570✔
560
      int64_t tse = taosGetTimestampMs();
1,570✔
561
      double  duration = (double)(tse - tss);
1,570✔
562
      duration = duration / 1000;
1,570✔
563
      auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql,
1,570✔
564
                  killCompactReq.sqlLen, duration, 0);
565
    } else {
566
      mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
×
567
    }
568
  }
569

570
_OVER:
1,570✔
571
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,570✔
572
    mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
×
573
  }
574

575
  tFreeSKillCompactReq(&killCompactReq);
1,570✔
576
  mndReleaseCompact(pMnode, pCompact);
1,570✔
577

578
  TAOS_RETURN(code);
1,570✔
579
}
580

581
// update progress
UNCOV
582
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
×
583
                                        SQueryCompactProgressRsp *rsp) {
UNCOV
584
  int32_t code = 0;
×
585

UNCOV
586
  void *pIter = NULL;
×
UNCOV
587
  while (1) {
×
UNCOV
588
    SCompactDetailObj *pDetail = NULL;
×
UNCOV
589
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
UNCOV
590
    if (pIter == NULL) break;
×
591

UNCOV
592
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
×
UNCOV
593
      pDetail->newNumberFileset = rsp->numberFileset;
×
UNCOV
594
      pDetail->newFinished = rsp->finished;
×
UNCOV
595
      pDetail->progress = rsp->progress;
×
UNCOV
596
      pDetail->remainingTime = rsp->remainingTime;
×
597

UNCOV
598
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
599
      sdbRelease(pMnode->pSdb, pDetail);
×
600

UNCOV
601
      TAOS_RETURN(code);
×
602
    }
603

UNCOV
604
    sdbRelease(pMnode->pSdb, pDetail);
×
605
  }
606

UNCOV
607
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
608
}
609

UNCOV
610
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
×
UNCOV
611
  int32_t                  code = 0;
×
UNCOV
612
  SQueryCompactProgressRsp req = {0};
×
UNCOV
613
  if (pReq->code != 0) {
×
UNCOV
614
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
×
UNCOV
615
    TAOS_RETURN(pReq->code);
×
616
  }
UNCOV
617
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
×
UNCOV
618
  if (code != 0) {
×
619
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
620
           pReq->contLen);
621
    TAOS_RETURN(code);
×
622
  }
623

UNCOV
624
  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
×
625
         req.vgId, req.dnodeId, req.numberFileset, req.finished);
626

UNCOV
627
  SMnode *pMnode = pReq->info.node;
×
628

UNCOV
629
  code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
×
UNCOV
630
  if (code != 0) {
×
UNCOV
631
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
×
632
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
UNCOV
633
    TAOS_RETURN(code);
×
634
  }
635

UNCOV
636
  TAOS_RETURN(code);
×
637
}
638

639
// timer
640
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
61,778✔
641
  SSdb *pSdb = pMnode->pSdb;
61,778✔
642
  void *pIter = NULL;
61,778✔
643

644
  mDebug("compact:%d, broadcast progress query to all dnodes", pCompact->compactId);
61,778✔
645

646
  while (1) {
143,634✔
647
    SDnodeObj *pDnode = NULL;
205,412✔
648
    pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
205,412✔
649
    if (pIter == NULL) break;
205,412✔
650

651
    SDnodeQueryCompactProgressReq req = {.compactId = pCompact->compactId};
143,634✔
652

653
    int32_t contLen = tSerializeSDnodeQueryCompactProgressReq(NULL, 0, &req);
143,634✔
654
    if (contLen < 0) {
143,634✔
NEW
655
      sdbRelease(pSdb, pDnode);
×
NEW
656
      continue;
×
657
    }
658
    contLen += sizeof(SMsgHead);
143,634✔
659

660
    SMsgHead *pHead = rpcMallocCont(contLen);
143,634✔
661
    if (pHead == NULL) {
143,634✔
NEW
662
      sdbRelease(pSdb, pDnode);
×
NEW
663
      continue;
×
664
    }
665
    pHead->contLen = htonl(contLen);
143,634✔
666
    pHead->vgId    = htonl(0);
143,634✔
667

668
    if (tSerializeSDnodeQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead),
143,634✔
NEW
669
                                                contLen - sizeof(SMsgHead), &req) < 0) {
×
NEW
670
      rpcFreeCont(pHead);
×
NEW
671
      sdbRelease(pSdb, pDnode);
×
NEW
672
      continue;
×
673
    }
674

675
    SEpSet  epSet = mndGetDnodeEpset(pDnode);
143,634✔
676
    SRpcMsg rpcMsg = {.msgType = TDMT_DND_QUERY_COMPACT_PROGRESS, .pCont = pHead, .contLen = contLen};
143,634✔
677

678
    char    detail[256] = {0};
143,634✔
679
    int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
287,268✔
680
                            TMSG_INFO(TDMT_DND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
287,268✔
681
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
287,268✔
682
      len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
143,634✔
683
    }
684
    mDebug("compact:%d, send progress query to dnode:%d %s", pCompact->compactId, pDnode->id, detail);
143,634✔
685

686
    if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
143,634✔
687
      mError("compact:%d, failed to send progress query to dnode:%d", pCompact->compactId, pDnode->id);
2,070✔
688
    }
689

690
    sdbRelease(pSdb, pDnode);
143,634✔
691
  }
692
}
61,778✔
693

694
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
61,778✔
695
  int32_t code = 0;
61,778✔
696
  bool    needSave = false;
61,778✔
697
  void   *pIter = NULL;
61,778✔
698
  while (1) {
175,960✔
699
    SCompactDetailObj *pDetail = NULL;
237,738✔
700
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
237,738✔
701
    if (pIter == NULL) break;
237,738✔
702

703
    if (pDetail->compactId == compactId) {
175,960✔
704
      mDebug(
136,396✔
705
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
706
          "newNumberFileset:%d, newFinished:%d",
707
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
708
          pDetail->newNumberFileset, pDetail->newFinished);
709

710
      // these 2 number will jump back after dnode restart, so < is not used here
711
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
136,396✔
712
        needSave = true;
58,465✔
713
    }
714

715
    sdbRelease(pMnode->pSdb, pDetail);
175,960✔
716
  }
717

718
  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
61,778✔
719
  int64_t dbUid = 0;
61,778✔
720
  TAOS_CHECK_RETURN(mndCompactGetDbInfo(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));
61,778✔
721

722
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
61,778✔
723
    needSave = true;
2,436✔
724
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
2,436✔
725
  }
726

727
  if (!needSave) {
61,778✔
728
    mDebug("compact:%" PRId32 ", no need to save", compactId);
27,871✔
729
    TAOS_RETURN(code);
27,871✔
730
  }
731

732
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
33,907✔
733
  if (pTrans == NULL) {
33,907✔
734
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
735
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
736
    if (terrno != 0) code = terrno;
×
737
    TAOS_RETURN(code);
×
738
  }
739
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
33,907✔
740

741
  mndTransSetDbName(pTrans, dbname, NULL);
33,907✔
742

743
  pIter = NULL;
33,907✔
744
  while (1) {
98,868✔
745
    SCompactDetailObj *pDetail = NULL;
132,775✔
746
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
132,775✔
747
    if (pIter == NULL) break;
132,775✔
748

749
    if (pDetail->compactId == compactId) {
98,868✔
750
      mInfo(
81,829✔
751
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
752
          "newNumberFileset:%d, newFinished:%d",
753
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
754
          pDetail->newNumberFileset, pDetail->newFinished);
755

756
      pDetail->numberFileset = pDetail->newNumberFileset;
81,829✔
757
      pDetail->finished = pDetail->newFinished;
81,829✔
758

759
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
81,829✔
760
      if (pCommitRaw == NULL) {
81,829✔
761
        sdbCancelFetch(pMnode->pSdb, pIter);
×
762
        sdbRelease(pMnode->pSdb, pDetail);
×
763
        mndTransDrop(pTrans);
×
764
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
765
        if (terrno != 0) code = terrno;
×
766
        TAOS_RETURN(code);
×
767
      }
768
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
81,829✔
769
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
×
770
        sdbCancelFetch(pMnode->pSdb, pIter);
×
771
        sdbRelease(pMnode->pSdb, pDetail);
×
772
        mndTransDrop(pTrans);
×
773
        TAOS_RETURN(code);
×
774
      }
775
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
81,829✔
776
        sdbCancelFetch(pMnode->pSdb, pIter);
×
777
        sdbRelease(pMnode->pSdb, pDetail);
×
778
        mndTransDrop(pTrans);
×
779
        TAOS_RETURN(code);
×
780
      }
781
    }
782

783
    sdbRelease(pMnode->pSdb, pDetail);
98,868✔
784
  }
785

786
  bool allFinished = true;
33,907✔
787
  pIter = NULL;
33,907✔
788
  while (1) {
59,174✔
789
    SCompactDetailObj *pDetail = NULL;
93,081✔
790
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
93,081✔
791
    if (pIter == NULL) break;
93,081✔
792

793
    if (pDetail->compactId == compactId) {
65,893✔
794
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
60,791✔
795
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
796

797
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
60,791✔
798
        allFinished = false;
3,109✔
799
        sdbCancelFetch(pMnode->pSdb, pIter);
3,109✔
800
        sdbRelease(pMnode->pSdb, pDetail);
3,109✔
801
        break;
3,109✔
802
      }
803
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
57,682✔
804
        allFinished = false;
3,610✔
805
        sdbCancelFetch(pMnode->pSdb, pIter);
3,610✔
806
        sdbRelease(pMnode->pSdb, pDetail);
3,610✔
807
        break;
3,610✔
808
      }
809
    }
810

811
    sdbRelease(pMnode->pSdb, pDetail);
59,174✔
812
  }
813

814
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
33,907✔
815
    allFinished = true;
2,436✔
816
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
2,436✔
817
  }
818

819
  if (allFinished) {
33,907✔
820
    mInfo("compact:%d, all finished", compactId);
28,892✔
821
    pIter = NULL;
28,892✔
822
    while (1) {
71,974✔
823
      SCompactDetailObj *pDetail = NULL;
100,866✔
824
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
100,866✔
825
      if (pIter == NULL) break;
100,866✔
826

827
      if (pDetail->compactId == compactId) {
71,974✔
828
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
70,175✔
829
        if (pCommitRaw == NULL) {
70,175✔
830
          mndTransDrop(pTrans);
×
831
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
832
          if (terrno != 0) code = terrno;
×
833
          TAOS_RETURN(code);
×
834
        }
835
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
70,175✔
836
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
×
837
                 terrstr());
838
          sdbCancelFetch(pMnode->pSdb, pIter);
×
839
          sdbRelease(pMnode->pSdb, pDetail);
×
840
          mndTransDrop(pTrans);
×
841
          TAOS_RETURN(code);
×
842
        }
843
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
70,175✔
844
          sdbCancelFetch(pMnode->pSdb, pIter);
×
845
          sdbRelease(pMnode->pSdb, pDetail);
×
846
          mndTransDrop(pTrans);
×
847
          TAOS_RETURN(code);
×
848
        }
849
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
70,175✔
850
      }
851

852
      sdbRelease(pMnode->pSdb, pDetail);
71,974✔
853
    }
854

855
    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
28,892✔
856
    if (pCompact == NULL) {
28,892✔
857
      mndTransDrop(pTrans);
×
858
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
859
      if (terrno != 0) code = terrno;
×
860
      TAOS_RETURN(code);
×
861
    }
862
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
28,892✔
863
    mndReleaseCompact(pMnode, pCompact);
28,892✔
864
    if (pCommitRaw == NULL) {
28,892✔
865
      mndTransDrop(pTrans);
×
866
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
867
      if (terrno != 0) code = terrno;
×
868
      TAOS_RETURN(code);
×
869
    }
870
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
28,892✔
871
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
872
      mndTransDrop(pTrans);
×
873
      TAOS_RETURN(code);
×
874
    }
875
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
28,892✔
876
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
877
      mndTransDrop(pTrans);
×
878
      TAOS_RETURN(code);
×
879
    }
880
    mInfo("compact:%d, add drop compact action", pCompact->compactId);
28,892✔
881
  }
882

883
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
33,907✔
884
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
1,542✔
885
    mndTransDrop(pTrans);
1,542✔
886
    TAOS_RETURN(code);
1,542✔
887
  }
888

889
  mndTransDrop(pTrans);
32,365✔
890
  return 0;
32,365✔
891
}
892

893
static void mndCompactPullup(SMnode *pMnode) {
3,290,176✔
894
  int32_t code = 0;
3,290,176✔
895
  SSdb   *pSdb = pMnode->pSdb;
3,290,176✔
896
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
3,290,176✔
897
  if (pArray == NULL) return;
3,290,176✔
898

899
  void *pIter = NULL;
3,290,176✔
900
  while (1) {
61,778✔
901
    SCompactObj *pCompact = NULL;
3,351,954✔
902
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
3,351,954✔
903
    if (pIter == NULL) break;
3,351,954✔
904
    if (taosArrayPush(pArray, &pCompact->compactId) == NULL) {
123,556✔
905
      mError("failed to push compact id:%d into array, but continue pull up", pCompact->compactId);
×
906
    }
907
    sdbRelease(pSdb, pCompact);
61,778✔
908
  }
909

910
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
3,351,954✔
911
    mInfo("begin to pull up");
61,778✔
912
    int32_t     *pCompactId = taosArrayGet(pArray, i);
61,778✔
913
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
61,778✔
914
    if (pCompact != NULL) {
61,778✔
915
      mInfo("compact:%d, begin to pull up", pCompact->compactId);
61,778✔
916
      mndCompactSendProgressReq(pMnode, pCompact);
61,778✔
917
      if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
61,778✔
918
        mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
1,542✔
919
      }
920
      mndReleaseCompact(pMnode, pCompact);
61,778✔
921
    }
922
  }
923
  taosArrayDestroy(pArray);
3,290,176✔
924
}
925
#ifdef TD_ENTERPRISE
926
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
×
927
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
×
928
    return 0;
×
929
  }
930
  int64_t tss = taosGetTimestampMs();
×
931

932
  SName   name = {0};
×
933
  int32_t sqlLen = 0;
×
934
  char    sql[256] = {0};
×
935
  char    skeyStr[40] = {0};
×
936
  char    ekeyStr[40] = {0};
×
937
  char   *pDbName = pDb->name;
×
938

939
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
×
940
    pDbName = name.dbname;
×
941
  }
942

943
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
×
944
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
×
945
    sqlLen = snprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
×
946
  } else {
947
    sqlLen = snprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
×
948
                      tw->ekey);
949
  }
950

951
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
×
952
    int64_t tse = taosGetTimestampMs();
×
953
    double  duration = (double)(tse - tss);
×
954
    duration = duration / 1000;
×
955
    auditRecord(NULL, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen, duration, 0);
×
956
  }
957

958
  return 0;
×
959
}
960

961
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
962
                            bool metaOnly, ETsdbOpType type, ETriggerType triggerType);
963
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
3,290,176✔
964
  int32_t code = 0;
3,290,176✔
965
  SMnode *pMnode = pReq->info.node;
3,290,176✔
966
  SSdb   *pSdb = pMnode->pSdb;
3,290,176✔
967
  int64_t curMs = taosGetTimestampMs();
3,290,176✔
968
  int64_t curMin = curMs / 60000LL;
3,290,176✔
969

970
  void   *pIter = NULL;
3,290,176✔
971
  SDbObj *pDb = NULL;
3,290,176✔
972
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
9,364,378✔
973
    if (pDb->cfg.compactInterval <= 0) {
6,074,202✔
974
      mDebug("db:%p,%s, compact interval is %dm, skip", pDb, pDb->name, pDb->cfg.compactInterval);
6,070,277✔
975
      sdbRelease(pSdb, pDb);
6,070,277✔
976
      continue;
6,070,277✔
977
    }
978

979
    if (pDb->cfg.isMount) {
3,925✔
980
      sdbRelease(pSdb, pDb);
×
981
      continue;
×
982
    }
983

984
    // daysToKeep2 would be altered
985
    if (pDb->cfg.compactEndTime && (pDb->cfg.compactEndTime <= -pDb->cfg.daysToKeep2)) {
3,925✔
986
      mWarn("db:%p,%s, compact end time:%dm <= -keep2:%dm , skip", pDb, pDb->name, pDb->cfg.compactEndTime,
×
987
            -pDb->cfg.daysToKeep2);
988
      sdbRelease(pSdb, pDb);
×
989
      continue;
×
990
    }
991

992
    int64_t compactStartTime = pDb->cfg.compactStartTime ? pDb->cfg.compactStartTime : -pDb->cfg.daysToKeep2;
3,925✔
993
    int64_t compactEndTime = pDb->cfg.compactEndTime ? pDb->cfg.compactEndTime : -pDb->cfg.daysPerFile;
3,925✔
994

995
    if (compactStartTime >= compactEndTime) {
3,925✔
996
      mDebug("db:%p,%s, compact start time:%" PRIi64 "m >= end time:%" PRIi64 "m, skip", pDb, pDb->name,
×
997
             compactStartTime, compactEndTime);
998
      sdbRelease(pSdb, pDb);
×
999
      continue;
×
1000
    }
1001

1002
    int64_t remainder = ((curMin - (int64_t)pDb->cfg.compactTimeOffset * 60LL) % pDb->cfg.compactInterval);
3,925✔
1003
    if (remainder != 0) {
3,925✔
1004
      mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
3,925✔
1005
             "h, remainder:%" PRIi64 "m, skip",
1006
             pDb, pDb->name, curMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, remainder);
1007
      sdbRelease(pSdb, pDb);
3,925✔
1008
      continue;
3,925✔
1009
    }
1010

1011
    if ((pDb->compactStartTime / 60000LL) == curMin) {
×
1012
      mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
×
1013
             curMin, pDb->compactStartTime);
1014
      sdbRelease(pSdb, pDb);
×
1015
      continue;
×
1016
    }
1017

1018
    STimeWindow tw = {
×
1019
        .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
×
1020
        .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
×
1021

1022
    if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL, false, TSDB_OPTR_NORMAL, TSDB_TRIGGER_AUTO)) == 0) {
×
1023
      mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1024
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
1025
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1026
            pDb->cfg.compactTimeOffset);
1027
    } else {
1028
      mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1029
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h, since %s",
1030
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1031
            pDb->cfg.compactTimeOffset, tstrerror(code));
1032
    }
1033

1034
    TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
×
1035

1036
    sdbRelease(pSdb, pDb);
×
1037
  }
1038
  return 0;
3,290,176✔
1039
}
1040
#endif
1041

1042
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
3,290,176✔
1043
#ifdef TD_ENTERPRISE
1044
  mTrace("start to process compact timer");
3,290,176✔
1045
  mndCompactPullup(pReq->info.node);
3,290,176✔
1046
  TAOS_UNUSED(mndCompactDispatch(pReq));
3,290,176✔
1047
#endif
1048
  return 0;
3,290,176✔
1049
}
1050

1051
int32_t mndProcessDnodeCompactProgressRsp(SRpcMsg *pReq) {
141,564✔
1052
  int32_t                       code = 0;
141,564✔
1053
  SDnodeQueryCompactProgressRsp rsp = {0};
141,564✔
1054
  SHashObj                     *pProgressMap = NULL;
141,564✔
1055

1056
  if (pReq->code != 0) {
141,564✔
NEW
1057
    mError("received dnode compact progress rsp with error: %s", tstrerror(pReq->code));
×
NEW
1058
    code = pReq->code;
×
NEW
1059
    goto _exit;
×
1060
  }
1061

1062
  code = tDeserializeSDnodeQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &rsp);
141,564✔
1063
  if (code != 0) {
141,564✔
NEW
1064
    mError("failed to deserialize dnode-query-compact-progress-rsp, code:%s", tstrerror(code));
×
NEW
1065
    goto _exit;
×
1066
  }
1067

1068
  mDebug("compact progress rsp from dnode:%d, numOfVnodes:%d", rsp.dnodeId, rsp.numOfVnodes);
141,564✔
1069

1070
  SMnode *pMnode = pReq->info.node;
141,564✔
1071

1072
  // batch update all vnodes for this dnode in memory (no SDB write, safe in read thread)
1073
  // Optimized from O(m*n) to O(n+m):
1074
  //   step1 — build a (compactId,vgId)->rsp lookup map in O(m)
1075
  //   step2 — single SDB scan in O(n), O(1) lookup per matched entry
1076
  pProgressMap =
1077
      taosHashInit(rsp.numOfVnodes * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
141,564✔
1078
  if (pProgressMap == NULL) {
141,564✔
NEW
1079
    code = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
1080
    goto _exit;
×
1081
  }
1082

1083
  for (int32_t i = 0; i < rsp.numOfVnodes; i++) {
636,083✔
1084
    SQueryCompactProgressRsp *pVnodeRsp = &rsp.vnodeProgress[i];
494,519✔
1085
    mDebug("compact:%d, update progress from dnode:%d vgId:%d, "
494,519✔
1086
           "numberFileset:%d, finished:%d, progress:%d, remainingTime:%" PRId64,
1087
           pVnodeRsp->compactId, pVnodeRsp->dnodeId, pVnodeRsp->vgId,
1088
           pVnodeRsp->numberFileset, pVnodeRsp->finished, pVnodeRsp->progress, pVnodeRsp->remainingTime);
1089
    // pack (compactId, vgId) into a single int64 key
1090
    int64_t key = ((int64_t)(uint32_t)pVnodeRsp->compactId << 32) | (uint32_t)pVnodeRsp->vgId;
494,519✔
1091
    code = taosHashPut(pProgressMap, &key, sizeof(key), &pVnodeRsp, sizeof(SQueryCompactProgressRsp *));
494,519✔
1092
    if (code != 0) goto _exit;
494,519✔
1093
  }
1094

1095
  // single pass over SDB_COMPACT_DETAIL — O(n)
1096
  void *pIter = NULL;
141,564✔
1097
  while (1) {
429,303✔
1098
    SCompactDetailObj *pDetail = NULL;
570,867✔
1099
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
570,867✔
1100
    if (pIter == NULL) break;
570,867✔
1101

1102
    if (pDetail->dnodeId == rsp.dnodeId) {
429,303✔
1103
      int64_t                    key = ((int64_t)(uint32_t)pDetail->compactId << 32) | (uint32_t)pDetail->vgId;
168,455✔
1104
      SQueryCompactProgressRsp **ppVnodeRsp = taosHashGet(pProgressMap, &key, sizeof(key));
168,455✔
1105
      if (ppVnodeRsp != NULL) {
168,455✔
1106
        SQueryCompactProgressRsp *pVnodeRsp = *ppVnodeRsp;
111,215✔
1107
        pDetail->newNumberFileset = pVnodeRsp->numberFileset;
111,215✔
1108
        pDetail->newFinished      = pVnodeRsp->finished;
111,215✔
1109
        pDetail->progress         = pVnodeRsp->progress;
111,215✔
1110
        pDetail->remainingTime    = pVnodeRsp->remainingTime;
111,215✔
1111
      }
1112
    }
1113

1114
    sdbRelease(pMnode->pSdb, pDetail);
429,303✔
1115
  }
1116

1117
_exit:
141,564✔
1118
  taosHashCleanup(pProgressMap);
141,564✔
1119
  tFreeSDnodeQueryCompactProgressRsp(&rsp);
141,564✔
1120
  TAOS_RETURN(code);
141,564✔
1121
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc