• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4902

27 Dec 2025 02:36PM UTC coverage: 65.642% (-0.09%) from 65.734%
#4902

push

travis-ci

web-flow
fix: ci errors (#34079)

192706 of 293572 relevant lines covered (65.64%)

117199453.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.51
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndCompact.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tmisce.h"
25
#include "tmsgcb.h"
26

27
#define MND_COMPACT_VER_NUMBER 1
28
#define MND_COMPACT_ID_LEN     11
29

30
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
31

32
int32_t mndInitCompact(SMnode *pMnode) {
399,157✔
33
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
399,157✔
34
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
399,157✔
35
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
399,157✔
36
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
399,157✔
37
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);
399,157✔
38

39
  SSdbTable table = {
399,157✔
40
      .sdbType = SDB_COMPACT,
41
      .keyType = SDB_KEY_INT32,
42
      .encodeFp = (SdbEncodeFp)mndCompactActionEncode,
43
      .decodeFp = (SdbDecodeFp)mndCompactActionDecode,
44
      .insertFp = (SdbInsertFp)mndCompactActionInsert,
45
      .updateFp = (SdbUpdateFp)mndCompactActionUpdate,
46
      .deleteFp = (SdbDeleteFp)mndCompactActionDelete,
47
  };
48

49
  return sdbSetTable(pMnode->pSdb, table);
399,157✔
50
}
51

52
void mndCleanupCompact(SMnode *pMnode) { mDebug("mnd compact cleanup"); }
399,093✔
53

54
void tFreeCompactObj(SCompactObj *pCompact) {}
104,570✔
55

56
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
180,632✔
57
  SEncoder encoder = {0};
180,632✔
58
  int32_t  code = 0;
180,632✔
59
  int32_t  lino;
60
  int32_t  tlen;
61
  tEncoderInit(&encoder, buf, bufLen);
180,632✔
62

63
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
180,632✔
64
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
361,264✔
65
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
361,264✔
66
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
361,264✔
67
  TAOS_CHECK_EXIT(tEncodeU32v(&encoder, pObj->flags));
361,264✔
68
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->dbUid));
361,264✔
69

70
  tEndEncode(&encoder);
180,632✔
71

72
_exit:
180,632✔
73
  if (code) {
180,632✔
74
    tlen = code;
×
75
  } else {
76
    tlen = encoder.pos;
180,632✔
77
  }
78
  tEncoderClear(&encoder);
180,632✔
79
  return tlen;
180,632✔
80
}
81

82
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
104,570✔
83
  int32_t  code = 0;
104,570✔
84
  int32_t  lino;
85
  SDecoder decoder = {0};
104,570✔
86
  tDecoderInit(&decoder, buf, bufLen);
104,570✔
87

88
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
104,570✔
89
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId));
209,140✔
90
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
104,570✔
91
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
209,140✔
92

93
  if (!tDecodeIsEnd(&decoder)) {
104,570✔
94
    TAOS_CHECK_EXIT(tDecodeU32v(&decoder, &pObj->flags));
209,140✔
95
  } else {
96
    pObj->flags = 0;
×
97
  }
98
  if (!tDecodeIsEnd(&decoder)) {
104,570✔
99
    TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbUid));
209,140✔
100
  } else {
101
    pObj->dbUid = 0;
×
102
  }
103

104
  tEndDecode(&decoder);
104,570✔
105

106
_exit:
104,570✔
107
  tDecoderClear(&decoder);
104,570✔
108
  return code;
104,570✔
109
}
110

111
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
58,176✔
112
  int32_t code = 0;
58,176✔
113
  int32_t lino = 0;
58,176✔
114
  terrno = TSDB_CODE_SUCCESS;
58,176✔
115

116
  void    *buf = NULL;
58,176✔
117
  SSdbRaw *pRaw = NULL;
58,176✔
118

119
  int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact);
58,176✔
120
  if (tlen < 0) {
58,176✔
121
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
122
    goto OVER;
×
123
  }
124

125
  int32_t size = sizeof(int32_t) + tlen;
58,176✔
126
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size);
58,176✔
127
  if (pRaw == NULL) {
58,176✔
128
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
129
    goto OVER;
×
130
  }
131

132
  buf = taosMemoryMalloc(tlen);
58,176✔
133
  if (buf == NULL) {
58,176✔
134
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
135
    goto OVER;
×
136
  }
137

138
  tlen = tSerializeSCompactObj(buf, tlen, pCompact);
58,176✔
139
  if (tlen < 0) {
58,176✔
140
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
141
    goto OVER;
×
142
  }
143

144
  int32_t dataPos = 0;
58,176✔
145
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
58,176✔
146
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
58,176✔
147
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
58,176✔
148

149
OVER:
58,176✔
150
  taosMemoryFreeClear(buf);
58,176✔
151
  if (terrno != TSDB_CODE_SUCCESS) {
58,176✔
152
    mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
153
    sdbFreeRaw(pRaw);
×
154
    return NULL;
×
155
  }
156

157
  mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
58,176✔
158
  return pRaw;
58,176✔
159
}
160

161
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
76,164✔
162
  int32_t      code = 0;
76,164✔
163
  int32_t      lino = 0;
76,164✔
164
  SSdbRow     *pRow = NULL;
76,164✔
165
  SCompactObj *pCompact = NULL;
76,164✔
166
  void        *buf = NULL;
76,164✔
167
  terrno = TSDB_CODE_SUCCESS;
76,164✔
168

169
  int8_t sver = 0;
76,164✔
170
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
76,164✔
171
    goto OVER;
×
172
  }
173

174
  if (sver != MND_COMPACT_VER_NUMBER) {
76,164✔
175
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
176
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
×
177
    goto OVER;
×
178
  }
179

180
  pRow = sdbAllocRow(sizeof(SCompactObj));
76,164✔
181
  if (pRow == NULL) {
76,164✔
182
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
183
    goto OVER;
×
184
  }
185

186
  pCompact = sdbGetRowObj(pRow);
76,164✔
187
  if (pCompact == NULL) {
76,164✔
188
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
189
    goto OVER;
×
190
  }
191

192
  int32_t tlen;
76,164✔
193
  int32_t dataPos = 0;
76,164✔
194
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
76,164✔
195
  buf = taosMemoryMalloc(tlen + 1);
76,164✔
196
  if (buf == NULL) {
76,164✔
197
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
198
    goto OVER;
×
199
  }
200
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
76,164✔
201

202
  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
76,164✔
203
    goto OVER;
×
204
  }
205

206
OVER:
76,164✔
207
  taosMemoryFreeClear(buf);
76,164✔
208
  if (terrno != TSDB_CODE_SUCCESS) {
76,164✔
209
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
210
    taosMemoryFreeClear(pRow);
×
211
    return NULL;
×
212
  }
213

214
  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
76,164✔
215
  return pRow;
76,164✔
216
}
217

218
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
38,588✔
219
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
38,588✔
220
  return 0;
38,588✔
221
}
222

223
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
76,164✔
224
  mTrace("compact:%" PRId32 ", perform delete action", pCompact->compactId);
76,164✔
225
  tFreeCompactObj(pCompact);
76,164✔
226
  return 0;
76,164✔
227
}
228

229
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
1,462✔
230
  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
1,462✔
231
         pNewCompact);
232

233
  return 0;
1,462✔
234
}
235

236
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
135,162✔
237
  SSdb        *pSdb = pMnode->pSdb;
135,162✔
238
  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
135,162✔
239
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
135,162✔
240
    terrno = TSDB_CODE_SUCCESS;
×
241
  }
242
  return pCompact;
135,162✔
243
}
244

245
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
135,162✔
246
  SSdb *pSdb = pMnode->pSdb;
135,162✔
247
  sdbRelease(pSdb, pCompact);
135,162✔
248
  pCompact = NULL;
135,162✔
249
}
135,162✔
250

251
static int32_t mndCompactGetDbInfo(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len, int64_t *dbUid) {
54,274✔
252
  int32_t      code = 0;
54,274✔
253
  SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
54,274✔
254
  if (pCompact == NULL) {
54,274✔
255
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
256
    if (terrno != 0) code = terrno;
×
257
    TAOS_RETURN(code);
×
258
  }
259

260
  tstrncpy(dbname, pCompact->dbname, len);
54,274✔
261
  if (dbUid) *dbUid = pCompact->dbUid;
54,274✔
262
  mndReleaseCompact(pMnode, pCompact);
54,274✔
263
  TAOS_RETURN(code);
54,274✔
264
}
265

266
// compact db
267
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
29,088✔
268
  int32_t code = 0;
29,088✔
269
  pCompact->compactId = tGenIdPI32();
29,088✔
270

271
  tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));
29,088✔
272
  pCompact->dbUid = pDb->uid;
29,088✔
273

274
  pCompact->startTime = taosGetTimestampMs();
29,088✔
275

276
  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
29,088✔
277
  if (pVgRaw == NULL) {
29,088✔
278
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
279
    if (terrno != 0) code = terrno;
×
280
    TAOS_RETURN(code);
×
281
  }
282
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
29,088✔
283
    sdbFreeRaw(pVgRaw);
×
284
    TAOS_RETURN(code);
×
285
  }
286

287
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
29,088✔
288
    sdbFreeRaw(pVgRaw);
×
289
    TAOS_RETURN(code);
×
290
  }
291

292
  rsp->compactId = pCompact->compactId;
29,088✔
293

294
  return 0;
29,088✔
295
}
296

297
// retrieve compact
298
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
363,317✔
299
  SMnode      *pMnode = pReq->info.node;
363,317✔
300
  SSdb        *pSdb = pMnode->pSdb;
363,317✔
301
  int32_t      numOfRows = 0;
363,317✔
302
  SCompactObj *pCompact = NULL;
363,317✔
303
  char        *sep = NULL;
363,317✔
304
  SDbObj      *pDb = NULL;
363,317✔
305
  int32_t      code = 0;
363,317✔
306
  int32_t      lino = 0;
363,317✔
307

308
  if (strlen(pShow->db) > 0) {
363,317✔
309
    sep = strchr(pShow->db, '.');
×
310
    if (sep &&
×
311
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
312
      sep++;
×
313
    } else {
314
      pDb = mndAcquireDb(pMnode, pShow->db);
×
315
      if (pDb == NULL) return terrno;
×
316
    }
317
  }
318

319
  while (numOfRows < rows) {
703,901✔
320
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
703,901✔
321
    if (pShow->pIter == NULL) break;
703,901✔
322

323
    SColumnInfoData *pColInfo;
324
    SName            n;
325
    int32_t          cols = 0;
340,584✔
326

327
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
340,584✔
328

329
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
340,584✔
330
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
340,584✔
331
                        _OVER);
332

333
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
340,584✔
334
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
681,168✔
335
      SName name = {0};
340,584✔
336
      TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
340,584✔
337
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
340,584✔
338
    } else {
339
      tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
×
340
    }
341
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
340,584✔
342
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
340,584✔
343

344
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
340,584✔
345
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
340,584✔
346
                        _OVER);
347

348
    numOfRows++;
340,584✔
349
    sdbRelease(pSdb, pCompact);
340,584✔
350
  }
351

352
_OVER:
363,317✔
353
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
363,317✔
354
  pShow->numOfRows += numOfRows;
363,317✔
355
  mndReleaseDb(pMnode, pDb);
363,317✔
356
  return numOfRows;
363,317✔
357
}
358

359
// kill compact
360
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
1,462✔
361
                                    int32_t dnodeid) {
362
  SVKillCompactReq req = {0};
1,462✔
363
  req.compactId = compactId;
1,462✔
364
  req.vgId = pVgroup->vgId;
1,462✔
365
  req.dnodeId = dnodeid;
1,462✔
366
  terrno = 0;
1,462✔
367

368
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
1,462✔
369
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
1,462✔
370
  if (contLen < 0) {
1,462✔
371
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
372
    return NULL;
×
373
  }
374
  contLen += sizeof(SMsgHead);
1,462✔
375

376
  void *pReq = taosMemoryMalloc(contLen);
1,462✔
377
  if (pReq == NULL) {
1,462✔
378
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
379
    return NULL;
×
380
  }
381

382
  SMsgHead *pHead = pReq;
1,462✔
383
  pHead->contLen = htonl(contLen);
1,462✔
384
  pHead->vgId = htonl(pVgroup->vgId);
1,462✔
385

386
  mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);
1,462✔
387
  int32_t ret = 0;
1,462✔
388
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
1,462✔
389
    terrno = ret;
×
390
    taosMemoryFreeClear(pReq);
×
391
    return NULL;
×
392
  }
393
  *pContLen = contLen;
1,462✔
394
  return pReq;
1,462✔
395
}
396

397
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
1,462✔
398
                                       int32_t dnodeid) {
399
  int32_t      code = 0;
1,462✔
400
  STransAction action = {0};
1,462✔
401

402
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
1,462✔
403
  if (pDnode == NULL) {
1,462✔
404
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
405
    if (terrno != 0) code = terrno;
×
406
    TAOS_RETURN(code);
×
407
  }
408
  action.epSet = mndGetDnodeEpset(pDnode);
1,462✔
409
  mndReleaseDnode(pMnode, pDnode);
1,462✔
410

411
  int32_t contLen = 0;
1,462✔
412
  void   *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
1,462✔
413
  if (pReq == NULL) {
1,462✔
414
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
415
    if (terrno != 0) code = terrno;
×
416
    TAOS_RETURN(code);
×
417
  }
418

419
  action.pCont = pReq;
1,462✔
420
  action.contLen = contLen;
1,462✔
421
  action.msgType = TDMT_VND_KILL_COMPACT;
1,462✔
422

423
  mTrace("trans:%d, kill compact msg len:%d", pTrans->id, contLen);
1,462✔
424

425
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
1,462✔
426
    taosMemoryFree(pReq);
×
427
    TAOS_RETURN(code);
×
428
  }
429

430
  return 0;
1,462✔
431
}
432

433
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
1,462✔
434
  int32_t code = 0;
1,462✔
435
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
1,462✔
436
  if (pTrans == NULL) {
1,462✔
437
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
×
438
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
439
    if (terrno != 0) code = terrno;
×
440
    TAOS_RETURN(code);
×
441
  }
442
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
1,462✔
443

444
  mndTransSetDbName(pTrans, pCompact->dbname, NULL);
1,462✔
445

446
  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
1,462✔
447
  if (pCommitRaw == NULL) {
1,462✔
448
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
449
    if (terrno != 0) code = terrno;
×
450
    mndTransDrop(pTrans);
×
451
    TAOS_RETURN(code);
×
452
  }
453
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
1,462✔
454
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
455
    mndTransDrop(pTrans);
×
456
    TAOS_RETURN(code);
×
457
  }
458
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
1,462✔
459
    mndTransDrop(pTrans);
×
460
    TAOS_RETURN(code);
×
461
  }
462

463
  void *pIter = NULL;
1,462✔
464
  while (1) {
2,924✔
465
    SCompactDetailObj *pDetail = NULL;
4,386✔
466
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
4,386✔
467
    if (pIter == NULL) break;
4,386✔
468

469
    if (pDetail->compactId == pCompact->compactId) {
2,924✔
470
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
1,462✔
471
      if (pVgroup == NULL) {
1,462✔
472
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
473
        sdbCancelFetch(pMnode->pSdb, pIter);
×
474
        sdbRelease(pMnode->pSdb, pDetail);
×
475
        mndTransDrop(pTrans);
×
476
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
477
        if (terrno != 0) code = terrno;
×
478
        TAOS_RETURN(code);
×
479
      }
480

481
      if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
1,462✔
482
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
483
        sdbCancelFetch(pMnode->pSdb, pIter);
×
484
        sdbRelease(pMnode->pSdb, pDetail);
×
485
        mndTransDrop(pTrans);
×
486
        TAOS_RETURN(code);
×
487
      }
488

489
      mndReleaseVgroup(pMnode, pVgroup);
1,462✔
490

491
      /*
492
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
493
      if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
494
        mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
495
        mndTransDrop(pTrans);
496
        return -1;
497
      }
498
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
499
      */
500
    }
501

502
    sdbRelease(pMnode->pSdb, pDetail);
2,924✔
503
  }
504

505
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
1,462✔
506
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
507
    mndTransDrop(pTrans);
×
508
    TAOS_RETURN(code);
×
509
  }
510

511
  mndTransDrop(pTrans);
1,462✔
512
  return 0;
1,462✔
513
}
514

515
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
1,462✔
516
  int32_t         code = 0;
1,462✔
517
  int32_t         lino = 0;
1,462✔
518
  SKillCompactReq killCompactReq = {0};
1,462✔
519
  int64_t         tss = taosGetTimestampMs();
1,462✔
520

521
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
1,462✔
522
    TAOS_RETURN(code);
×
523
  }
524

525
  mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);
1,462✔
526

527
  SMnode      *pMnode = pReq->info.node;
1,462✔
528
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
1,462✔
529
  if (pCompact == NULL) {
1,462✔
530
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
×
531
    tFreeSKillCompactReq(&killCompactReq);
×
532
    TAOS_RETURN(code);
×
533
  }
534

535
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_COMPACT_DB), &lino, _OVER);
1,462✔
536

537
  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);
1,462✔
538

539
  code = TSDB_CODE_ACTION_IN_PROGRESS;
1,462✔
540

541
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
1,462✔
542
    char    obj[TSDB_INT32_ID_LEN] = {0};
1,462✔
543
    int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pCompact->compactId);
1,462✔
544
    if ((uint32_t)nBytes < sizeof(obj)) {
1,462✔
545
      int64_t tse = taosGetTimestampMs();
1,462✔
546
      double  duration = (double)(tse - tss);
1,462✔
547
      duration = duration / 1000;
1,462✔
548
      auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql,
1,462✔
549
                  killCompactReq.sqlLen, duration, 0);
550
    } else {
551
      mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
×
552
    }
553
  }
554

555
_OVER:
1,462✔
556
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,462✔
557
    mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
×
558
  }
559

560
  tFreeSKillCompactReq(&killCompactReq);
1,462✔
561
  mndReleaseCompact(pMnode, pCompact);
1,462✔
562

563
  TAOS_RETURN(code);
1,462✔
564
}
565

566
// update progress
567
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
101,073✔
568
                                        SQueryCompactProgressRsp *rsp) {
569
  int32_t code = 0;
101,073✔
570

571
  void *pIter = NULL;
101,073✔
572
  while (1) {
66,395✔
573
    SCompactDetailObj *pDetail = NULL;
167,468✔
574
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
167,468✔
575
    if (pIter == NULL) break;
167,468✔
576

577
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
166,245✔
578
      pDetail->newNumberFileset = rsp->numberFileset;
99,850✔
579
      pDetail->newFinished = rsp->finished;
99,850✔
580
      pDetail->progress = rsp->progress;
99,850✔
581
      pDetail->remainingTime = rsp->remainingTime;
99,850✔
582

583
      sdbCancelFetch(pMnode->pSdb, pIter);
99,850✔
584
      sdbRelease(pMnode->pSdb, pDetail);
99,850✔
585

586
      TAOS_RETURN(code);
99,850✔
587
    }
588

589
    sdbRelease(pMnode->pSdb, pDetail);
66,395✔
590
  }
591

592
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
1,223✔
593
}
594

595
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
109,093✔
596
  int32_t                  code = 0;
109,093✔
597
  SQueryCompactProgressRsp req = {0};
109,093✔
598
  if (pReq->code != 0) {
109,093✔
599
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
8,020✔
600
    TAOS_RETURN(pReq->code);
8,020✔
601
  }
602
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
101,073✔
603
  if (code != 0) {
101,073✔
604
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
605
           pReq->contLen);
606
    TAOS_RETURN(code);
×
607
  }
608

609
  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
101,073✔
610
         req.vgId, req.dnodeId, req.numberFileset, req.finished);
611

612
  SMnode *pMnode = pReq->info.node;
101,073✔
613

614
  code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
101,073✔
615
  if (code != 0) {
101,073✔
616
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
1,223✔
617
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
618
    TAOS_RETURN(code);
1,223✔
619
  }
620

621
  TAOS_RETURN(code);
99,850✔
622
}
623

624
// timer
625
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
54,274✔
626
  void *pIter = NULL;
54,274✔
627

628
  while (1) {
118,519✔
629
    SCompactDetailObj *pDetail = NULL;
172,793✔
630
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
172,793✔
631
    if (pIter == NULL) break;
172,793✔
632

633
    if (pDetail->compactId == pCompact->compactId) {
118,519✔
634
      SEpSet epSet = {0};
109,093✔
635

636
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
109,093✔
637
      if (pDnode == NULL) break;
109,093✔
638
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
109,093✔
639
        sdbRelease(pMnode->pSdb, pDetail);
×
640
        continue;
×
641
      }
642
      mndReleaseDnode(pMnode, pDnode);
109,093✔
643

644
      SQueryCompactProgressReq req;
109,093✔
645
      req.compactId = pDetail->compactId;
109,093✔
646
      req.vgId = pDetail->vgId;
109,093✔
647
      req.dnodeId = pDetail->dnodeId;
109,093✔
648

649
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
109,093✔
650
      if (contLen < 0) {
109,093✔
651
        sdbRelease(pMnode->pSdb, pDetail);
×
652
        continue;
×
653
      }
654

655
      contLen += sizeof(SMsgHead);
109,093✔
656

657
      SMsgHead *pHead = rpcMallocCont(contLen);
109,093✔
658
      if (pHead == NULL) {
109,093✔
659
        sdbRelease(pMnode->pSdb, pDetail);
×
660
        continue;
×
661
      }
662

663
      pHead->contLen = htonl(contLen);
109,093✔
664
      pHead->vgId = htonl(pDetail->vgId);
109,093✔
665

666
      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
109,093✔
667
        sdbRelease(pMnode->pSdb, pDetail);
×
668
        continue;
×
669
      }
670

671
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
109,093✔
672

673
      rpcMsg.pCont = pHead;
109,093✔
674

675
      char    detail[1024] = {0};
109,093✔
676
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
218,186✔
677
                              TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
218,186✔
678
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
218,186✔
679
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
109,093✔
680
      }
681

682
      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
109,093✔
683

684
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
109,093✔
685
        sdbRelease(pMnode->pSdb, pDetail);
×
686
        continue;
×
687
      }
688
    }
689

690
    sdbRelease(pMnode->pSdb, pDetail);
118,519✔
691
  }
692
}
54,274✔
693

694
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
54,274✔
695
  int32_t code = 0;
54,274✔
696
  bool    needSave = false;
54,274✔
697
  void   *pIter = NULL;
54,274✔
698
  while (1) {
118,519✔
699
    SCompactDetailObj *pDetail = NULL;
172,793✔
700
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
172,793✔
701
    if (pIter == NULL) break;
172,793✔
702

703
    if (pDetail->compactId == compactId) {
118,519✔
704
      mDebug(
109,093✔
705
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
706
          "newNumberFileset:%d, newFinished:%d",
707
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
708
          pDetail->newNumberFileset, pDetail->newFinished);
709

710
      // these 2 number will jump back after dnode restart, so < is not used here
711
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
109,093✔
712
        needSave = true;
52,557✔
713
    }
714

715
    sdbRelease(pMnode->pSdb, pDetail);
118,519✔
716
  }
717

718
  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
54,274✔
719
  int64_t dbUid = 0;
54,274✔
720
  TAOS_CHECK_RETURN(mndCompactGetDbInfo(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));
54,274✔
721

722
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
54,274✔
723
    needSave = true;
1,119✔
724
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
1,119✔
725
  }
726

727
  if (!needSave) {
54,274✔
728
    mDebug("compact:%" PRId32 ", no need to save", compactId);
25,955✔
729
    TAOS_RETURN(code);
25,955✔
730
  }
731

732
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
28,319✔
733
  if (pTrans == NULL) {
28,319✔
734
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
735
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
736
    if (terrno != 0) code = terrno;
×
737
    TAOS_RETURN(code);
×
738
  }
739
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
28,319✔
740

741
  mndTransSetDbName(pTrans, dbname, NULL);
28,319✔
742

743
  pIter = NULL;
28,319✔
744
  while (1) {
67,718✔
745
    SCompactDetailObj *pDetail = NULL;
96,037✔
746
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
96,037✔
747
    if (pIter == NULL) break;
96,037✔
748

749
    if (pDetail->compactId == compactId) {
67,718✔
750
      mInfo(
61,749✔
751
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
752
          "newNumberFileset:%d, newFinished:%d",
753
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
754
          pDetail->newNumberFileset, pDetail->newFinished);
755

756
      pDetail->numberFileset = pDetail->newNumberFileset;
61,749✔
757
      pDetail->finished = pDetail->newFinished;
61,749✔
758

759
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
61,749✔
760
      if (pCommitRaw == NULL) {
61,749✔
761
        sdbCancelFetch(pMnode->pSdb, pIter);
×
762
        sdbRelease(pMnode->pSdb, pDetail);
×
763
        mndTransDrop(pTrans);
×
764
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
765
        if (terrno != 0) code = terrno;
×
766
        TAOS_RETURN(code);
×
767
      }
768
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
61,749✔
769
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
×
770
        sdbCancelFetch(pMnode->pSdb, pIter);
×
771
        sdbRelease(pMnode->pSdb, pDetail);
×
772
        mndTransDrop(pTrans);
×
773
        TAOS_RETURN(code);
×
774
      }
775
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
61,749✔
776
        sdbCancelFetch(pMnode->pSdb, pIter);
×
777
        sdbRelease(pMnode->pSdb, pDetail);
×
778
        mndTransDrop(pTrans);
×
779
        TAOS_RETURN(code);
×
780
      }
781
    }
782

783
    sdbRelease(pMnode->pSdb, pDetail);
67,718✔
784
  }
785

786
  bool allFinished = true;
28,319✔
787
  pIter = NULL;
28,319✔
788
  while (1) {
52,486✔
789
    SCompactDetailObj *pDetail = NULL;
80,805✔
790
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
80,805✔
791
    if (pIter == NULL) break;
80,805✔
792

793
    if (pDetail->compactId == compactId) {
56,096✔
794
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
52,746✔
795
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
796

797
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
52,746✔
798
        allFinished = false;
1,655✔
799
        sdbCancelFetch(pMnode->pSdb, pIter);
1,655✔
800
        sdbRelease(pMnode->pSdb, pDetail);
1,655✔
801
        break;
1,655✔
802
      }
803
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
51,091✔
804
        allFinished = false;
1,955✔
805
        sdbCancelFetch(pMnode->pSdb, pIter);
1,955✔
806
        sdbRelease(pMnode->pSdb, pDetail);
1,955✔
807
        break;
1,955✔
808
      }
809
    }
810

811
    sdbRelease(pMnode->pSdb, pDetail);
52,486✔
812
  }
813

814
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
28,319✔
815
    allFinished = true;
1,119✔
816
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
1,119✔
817
  }
818

819
  if (allFinished) {
28,319✔
820
    mInfo("compact:%d, all finished", compactId);
25,152✔
821
    pIter = NULL;
25,152✔
822
    while (1) {
54,718✔
823
      SCompactDetailObj *pDetail = NULL;
79,870✔
824
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
79,870✔
825
      if (pIter == NULL) break;
79,870✔
826

827
      if (pDetail->compactId == compactId) {
54,718✔
828
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
53,240✔
829
        if (pCommitRaw == NULL) {
53,240✔
830
          mndTransDrop(pTrans);
×
831
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
832
          if (terrno != 0) code = terrno;
×
833
          TAOS_RETURN(code);
×
834
        }
835
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
53,240✔
836
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
×
837
                 terrstr());
838
          sdbCancelFetch(pMnode->pSdb, pIter);
×
839
          sdbRelease(pMnode->pSdb, pDetail);
×
840
          mndTransDrop(pTrans);
×
841
          TAOS_RETURN(code);
×
842
        }
843
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
53,240✔
844
          sdbCancelFetch(pMnode->pSdb, pIter);
×
845
          sdbRelease(pMnode->pSdb, pDetail);
×
846
          mndTransDrop(pTrans);
×
847
          TAOS_RETURN(code);
×
848
        }
849
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
53,240✔
850
      }
851

852
      sdbRelease(pMnode->pSdb, pDetail);
54,718✔
853
    }
854

855
    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
25,152✔
856
    if (pCompact == NULL) {
25,152✔
857
      mndTransDrop(pTrans);
×
858
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
859
      if (terrno != 0) code = terrno;
×
860
      TAOS_RETURN(code);
×
861
    }
862
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
25,152✔
863
    mndReleaseCompact(pMnode, pCompact);
25,152✔
864
    if (pCommitRaw == NULL) {
25,152✔
865
      mndTransDrop(pTrans);
×
866
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
867
      if (terrno != 0) code = terrno;
×
868
      TAOS_RETURN(code);
×
869
    }
870
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
25,152✔
871
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
872
      mndTransDrop(pTrans);
×
873
      TAOS_RETURN(code);
×
874
    }
875
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
25,152✔
876
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
877
      mndTransDrop(pTrans);
×
878
      TAOS_RETURN(code);
×
879
    }
880
    mInfo("compact:%d, add drop compact action", pCompact->compactId);
25,152✔
881
  }
882

883
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
28,319✔
884
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
×
885
    mndTransDrop(pTrans);
×
886
    TAOS_RETURN(code);
×
887
  }
888

889
  mndTransDrop(pTrans);
28,319✔
890
  return 0;
28,319✔
891
}
892

893
static void mndCompactPullup(SMnode *pMnode) {
2,148,148✔
894
  int32_t code = 0;
2,148,148✔
895
  SSdb   *pSdb = pMnode->pSdb;
2,148,148✔
896
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
2,148,148✔
897
  if (pArray == NULL) return;
2,148,148✔
898

899
  void *pIter = NULL;
2,148,148✔
900
  while (1) {
54,274✔
901
    SCompactObj *pCompact = NULL;
2,202,422✔
902
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
2,202,422✔
903
    if (pIter == NULL) break;
2,202,422✔
904
    if (taosArrayPush(pArray, &pCompact->compactId) == NULL) {
108,548✔
905
      mError("failed to push compact id:%d into array, but continue pull up", pCompact->compactId);
×
906
    }
907
    sdbRelease(pSdb, pCompact);
54,274✔
908
  }
909

910
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
2,202,422✔
911
    mInfo("begin to pull up");
54,274✔
912
    int32_t     *pCompactId = taosArrayGet(pArray, i);
54,274✔
913
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
54,274✔
914
    if (pCompact != NULL) {
54,274✔
915
      mInfo("compact:%d, begin to pull up", pCompact->compactId);
54,274✔
916
      mndCompactSendProgressReq(pMnode, pCompact);
54,274✔
917
      if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
54,274✔
918
        mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
×
919
      }
920
      mndReleaseCompact(pMnode, pCompact);
54,274✔
921
    }
922
  }
923
  taosArrayDestroy(pArray);
2,148,148✔
924
}
925
#ifdef TD_ENTERPRISE
926
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
×
927
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
×
928
    return 0;
×
929
  }
930
  int64_t tss = taosGetTimestampMs();
×
931

932
  SName   name = {0};
×
933
  int32_t sqlLen = 0;
×
934
  char    sql[256] = {0};
×
935
  char    skeyStr[40] = {0};
×
936
  char    ekeyStr[40] = {0};
×
937
  char   *pDbName = pDb->name;
×
938

939
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
×
940
    pDbName = name.dbname;
×
941
  }
942

943
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
×
944
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
×
945
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
×
946
  } else {
947
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
×
948
                       tw->ekey);
949
  }
950

951
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
×
952
    int64_t tse = taosGetTimestampMs();
×
953
    double  duration = (double)(tse - tss);
×
954
    duration = duration / 1000;
×
955
    auditRecord(NULL, pMnode->clusterId, "autoCompactDB", name.dbname, "", sql, sqlLen, duration, 0);
×
956
  }
957

958
  return 0;
×
959
}
960

961
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
962
                            bool metaOnly, ETsdbOpType type, ETriggerType triggerType);
963
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
2,148,148✔
964
  int32_t code = 0;
2,148,148✔
965
  SMnode *pMnode = pReq->info.node;
2,148,148✔
966
  SSdb   *pSdb = pMnode->pSdb;
2,148,148✔
967
  int64_t curMs = taosGetTimestampMs();
2,148,148✔
968
  int64_t curMin = curMs / 60000LL;
2,148,148✔
969

970
  void   *pIter = NULL;
2,148,148✔
971
  SDbObj *pDb = NULL;
2,148,148✔
972
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
6,179,232✔
973
    if (pDb->cfg.compactInterval <= 0) {
4,031,084✔
974
      mDebug("db:%p,%s, compact interval is %dm, skip", pDb, pDb->name, pDb->cfg.compactInterval);
4,027,457✔
975
      sdbRelease(pSdb, pDb);
4,027,457✔
976
      continue;
4,027,457✔
977
    }
978

979
    if (pDb->cfg.isMount) {
3,627✔
980
      sdbRelease(pSdb, pDb);
×
981
      continue;
×
982
    }
983

984
    // daysToKeep2 would be altered
985
    if (pDb->cfg.compactEndTime && (pDb->cfg.compactEndTime <= -pDb->cfg.daysToKeep2)) {
3,627✔
986
      mWarn("db:%p,%s, compact end time:%dm <= -keep2:%dm , skip", pDb, pDb->name, pDb->cfg.compactEndTime,
×
987
            -pDb->cfg.daysToKeep2);
988
      sdbRelease(pSdb, pDb);
×
989
      continue;
×
990
    }
991

992
    int64_t compactStartTime = pDb->cfg.compactStartTime ? pDb->cfg.compactStartTime : -pDb->cfg.daysToKeep2;
3,627✔
993
    int64_t compactEndTime = pDb->cfg.compactEndTime ? pDb->cfg.compactEndTime : -pDb->cfg.daysPerFile;
3,627✔
994

995
    if (compactStartTime >= compactEndTime) {
3,627✔
996
      mDebug("db:%p,%s, compact start time:%" PRIi64 "m >= end time:%" PRIi64 "m, skip", pDb, pDb->name,
×
997
             compactStartTime, compactEndTime);
998
      sdbRelease(pSdb, pDb);
×
999
      continue;
×
1000
    }
1001

1002
    int64_t remainder = ((curMin - (int64_t)pDb->cfg.compactTimeOffset * 60LL) % pDb->cfg.compactInterval);
3,627✔
1003
    if (remainder != 0) {
3,627✔
1004
      mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
3,627✔
1005
             "h, remainder:%" PRIi64 "m, skip",
1006
             pDb, pDb->name, curMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, remainder);
1007
      sdbRelease(pSdb, pDb);
3,627✔
1008
      continue;
3,627✔
1009
    }
1010

1011
    if ((pDb->compactStartTime / 60000LL) == curMin) {
×
1012
      mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
×
1013
             curMin, pDb->compactStartTime);
1014
      sdbRelease(pSdb, pDb);
×
1015
      continue;
×
1016
    }
1017

1018
    STimeWindow tw = {
×
1019
        .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
×
1020
        .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
×
1021

1022
    if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL, false, TSDB_OPTR_NORMAL, TSDB_TRIGGER_AUTO)) == 0) {
×
1023
      mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1024
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
1025
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1026
            pDb->cfg.compactTimeOffset);
1027
    } else {
1028
      mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
1029
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h, since %s",
1030
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1031
            pDb->cfg.compactTimeOffset, tstrerror(code));
1032
    }
1033

1034
    TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
×
1035

1036
    sdbRelease(pSdb, pDb);
×
1037
  }
1038
  return 0;
2,148,148✔
1039
}
1040
#endif
1041

1042
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
2,148,148✔
1043
#ifdef TD_ENTERPRISE
1044
  mTrace("start to process compact timer");
2,148,148✔
1045
  mndCompactPullup(pReq->info.node);
2,148,148✔
1046
  TAOS_UNUSED(mndCompactDispatch(pReq));
2,148,148✔
1047
#endif
1048
  return 0;
2,148,148✔
1049
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc