• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.81
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndCompact.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tmisce.h"
25
#include "tmsgcb.h"
26

27
#define MND_COMPACT_VER_NUMBER 1
28
#define MND_COMPACT_ID_LEN     11
29

30
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
31

32
/**
 * Hook the compact module into the mnode.
 *
 * Registers the show-retrieve handler and the message handlers used by this
 * module, then installs the SDB table descriptor for compact objects.
 *
 * @param pMnode  mnode instance that receives the handlers and the table.
 * @return the result of sdbSetTable().
 */
int32_t mndInitCompact(SMnode *pMnode) {
  // Descriptor of the SDB table that stores compact objects (32-bit key).
  SSdbTable compactTable = {
      .sdbType = SDB_COMPACT,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndCompactActionEncode,
      .decodeFp = (SdbDecodeFp)mndCompactActionDecode,
      .insertFp = (SdbInsertFp)mndCompactActionInsert,
      .updateFp = (SdbUpdateFp)mndCompactActionUpdate,
      .deleteFp = (SdbDeleteFp)mndCompactActionDelete,
  };

  // "show compacts" style retrieval.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);

  // Message dispatch for kill requests, progress responses and the timer.
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);

  return sdbSetTable(pMnode->pSdb, compactTable);
}
51

52
// Teardown hook of the compact module; it only emits a debug log line.
void mndCleanupCompact(SMnode *pMnode) {
  mDebug("mnd compact cleanup");
}
1,929✔
53

54
// Release hook for a compact object. Intentionally a no-op: the body frees nothing.
void tFreeCompactObj(SCompactObj *pCompact) {
}
121✔
55

56
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
192✔
57
  SEncoder encoder = {0};
192✔
58
  int32_t  code = 0;
192✔
59
  int32_t  lino;
60
  int32_t  tlen;
61
  tEncoderInit(&encoder, buf, bufLen);
192✔
62

63
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
192!
64
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
384!
65
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
384!
66
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
384!
67

68
  tEndEncode(&encoder);
192✔
69

70
_exit:
192✔
71
  if (code) {
192!
72
    tlen = code;
×
73
  } else {
74
    tlen = encoder.pos;
192✔
75
  }
76
  tEncoderClear(&encoder);
192✔
77
  return tlen;
192✔
78
}
79

80
/**
 * Decode a compact object from buf into pObj.
 *
 * @param buf     encoded bytes.
 * @param bufLen  number of bytes available in buf.
 * @param pObj    destination; compactId, dbname and startTime are filled in.
 * @return 0 on success, otherwise the error code of the failing decode step.
 */
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
  SDecoder reader = {0};
  int32_t  code = 0;
  int32_t  lino = 0;

  tDecoderInit(&reader, buf, bufLen);

  // Fields are read back in the order the serializer wrote them.
  TAOS_CHECK_EXIT(tStartDecode(&reader));
  TAOS_CHECK_EXIT(tDecodeI32(&reader, &pObj->compactId));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&reader, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&reader, &pObj->startTime));

  tEndDecode(&reader);

_exit:
  tDecoderClear(&reader);
  return code;
}
97

98
/**
 * Encode a compact object into an SDB raw record.
 *
 * Record layout: a 32-bit payload length followed by the serialized object.
 *
 * @param pCompact  object to encode.
 * @return the raw record on success; NULL on failure with terrno set.
 */
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
  // code/lino are kept for the SDB_SET_* macros, which may reference them.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int32_t  encLen = 0;
  int32_t  rawSize = 0;
  void    *pBuf = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  // Pass 1: size only.
  encLen = tSerializeSCompactObj(NULL, 0, pCompact);
  if (encLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Room for the length prefix plus the payload.
  rawSize = sizeof(int32_t) + encLen;
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pBuf = taosMemoryMalloc(encLen);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Pass 2: serialize into the scratch buffer.
  encLen = tSerializeSCompactObj(pBuf, encLen, pCompact);
  if (encLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, encLen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, pBuf, encLen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(pBuf);

  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
    return pRaw;
  }

  mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
147

148
/**
 * Decode an SDB raw record into a freshly allocated row holding an SCompactObj.
 *
 * The record is expected to carry soft version MND_COMPACT_VER_NUMBER and the
 * layout written by mndCompactActionEncode(): a 32-bit payload length followed
 * by the serialized object.
 *
 * @param pRaw  raw record to decode.
 * @return the decoded row on success; NULL on failure with terrno set.
 */
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
  // code/lino are kept for the SDB_GET_* macros, which may reference them.
  int32_t      code = 0;
  int32_t      lino = 0;
  SSdbRow     *pRow = NULL;
  SCompactObj *pCompact = NULL;
  void        *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if ((code = sdbGetRawSoftVer(pRaw, &sver)) != 0) {
    // Make the failure visible at OVER even if the callee left terrno untouched;
    // otherwise the success path below would run with pCompact == NULL.
    if (terrno == TSDB_CODE_SUCCESS) terrno = code;
    goto OVER;
  }

  if (sver != MND_COMPACT_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SCompactObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pCompact = sdbGetRowObj(pRow);
  if (pCompact == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // pCompact is still NULL when decoding failed before the row object was
    // obtained, so it must not be dereferenced unconditionally here.
    int32_t failedId = (pCompact != NULL) ? pCompact->compactId : -1;
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", failedId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
  return pRow;
}
204

205
/**
 * SDB insert callback for compact objects. Only traces the event.
 *
 * @return always 0.
 */
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
  const int32_t insertedId = pCompact->compactId;

  mTrace("compact:%" PRId32 ", perform insert action", insertedId);
  return 0;
}
209

210
/**
 * SDB delete callback for compact objects. Traces the event and hands the
 * object to tFreeCompactObj().
 *
 * @return always 0.
 */
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
  const int32_t removedId = pCompact->compactId;

  mTrace("compact:%" PRId32 ", perform delete action", removedId);
  tFreeCompactObj(pCompact);
  return 0;
}
215

216
/**
 * SDB update callback for compact objects. Only traces the event; no field is
 * copied from the new row into the old one.
 *
 * @return always 0.
 */
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
  const int32_t updatedId = pOldCompact->compactId;

  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", updatedId, pOldCompact, pNewCompact);
  return 0;
}
222

223
/**
 * Look up a compact object by id and take a reference on it.
 *
 * A "not there" result is not treated as an error: terrno is reset to success
 * and NULL is returned. Release the object with mndReleaseCompact().
 *
 * @param pMnode     mnode whose SDB is searched.
 * @param compactId  id of the compact; only the low 32 bits are meaningful
 *                   because the table is registered with a 32-bit key.
 * @return the acquired object, or NULL when it is absent or the lookup failed.
 */
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
  SSdb *pSdb = pMnode->pSdb;

  // SDB_COMPACT is registered with SDB_KEY_INT32, so the key handed to the SDB
  // must be a real int32_t. Passing the address of the int64_t parameter only
  // works by accident on little-endian targets.
  int32_t key = (int32_t)compactId;

  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &key);
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pCompact;
}
231

232
/**
 * Drop a reference obtained from mndAcquireCompact().
 *
 * @param pMnode    mnode owning the SDB.
 * @param pCompact  object to release; passed straight to sdbRelease().
 */
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
  sdbRelease(pMnode->pSdb, pCompact);
}
204✔
237

238
/**
 * Copy the database name recorded on a compact object into dbname.
 *
 * @param pMnode     mnode owning the SDB.
 * @param compactId  id of the compact to look up.
 * @param dbname     destination buffer.
 * @param len        size passed to tstrncpy() for the copy.
 * @return 0 on success; when the compact cannot be acquired, terrno if it is
 *         non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
  int32_t      code = 0;
  SCompactObj *pFound = mndAcquireCompact(pMnode, compactId);

  if (pFound != NULL) {
    tstrncpy(dbname, pFound->dbname, len);
    mndReleaseCompact(pMnode, pFound);
    TAOS_RETURN(code);
  }

  // Lookup failed: prefer the specific error left in terrno, if any.
  if (terrno != 0) {
    code = terrno;
  } else {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
  }
  TAOS_RETURN(code);
}
251

252
// compact db
253
/**
 * Initialise a new compact object for pDb and add it to the prepare log of pTrans.
 *
 * Fills pCompact with a freshly generated id, the database name and the current
 * time in milliseconds, encodes it, marks the raw record READY and appends it
 * to the transaction. The generated id is reported through rsp.
 *
 * @param pMnode    mnode (not read by this function).
 * @param pTrans    transaction receiving the prepare log.
 * @param pCompact  object to initialise; modified in place.
 * @param pDb       database being compacted; its name is copied.
 * @param rsp       receives the generated compact id on success.
 * @return 0 on success, otherwise an error code.
 */
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
  int32_t code = 0;
  pCompact->compactId = tGenIdPI32();

  tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));

  pCompact->startTime = taosGetTimestampMs();

  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Set the status BEFORE appending. Once the raw has been appended, the
  // transaction holds a pointer to it; freeing it here after a later failure
  // (as the previous ordering did) would leave that pointer dangling.
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  // A failed append leaves the raw with us, so it is released here.
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  rsp->compactId = pCompact->compactId;

  return 0;
}
281

282
// retrieve compact
283
/**
 * Show-retrieve handler: emit up to `rows` compact objects into pBlock.
 *
 * Each row has three columns: compact id, database name (as a var-string) and
 * start time. Iteration state is kept in pShow->pIter across calls.
 *
 * @param pReq    request; pReq->info.node is the mnode.
 * @param pShow   show session (filter db name, iterator, running row count).
 * @param pBlock  output data block.
 * @param rows    maximum number of rows to produce in this call.
 * @return the number of rows written; terrno when the filter database cannot
 *         be acquired.
 */
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode      *pMnode = pReq->info.node;
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SCompactObj *pCompact = NULL;
  char        *sep = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = 0;
  int32_t      lino = 0;

  // A non-empty filter that is not one of the system schemas must name an existing database.
  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // Column 0: compact id.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
                        _OVER);

    // Column 1: database name, reduced to its db part unless it is a system database.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
      SName name = {0};
      // Uses RETRIEVE_CHECK_GOTO like every other check in this loop so that the
      // fetched pCompact is handed to the macro on failure; the previous
      // TAOS_CHECK_GOTO jumped out without being given the object.
      // NOTE(review): presumably the macro releases pCompact - confirm against its definition.
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), pCompact, &lino, _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);

    // Column 2: start time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pCompact);
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  mndReleaseDb(pMnode, pDb);
  return numOfRows;
}
343

344
// kill compact
345
/**
 * Build the vnode "kill compact" message: an SMsgHead followed by the
 * serialized SVKillCompactReq.
 *
 * @param pMnode     mnode (not read by this function).
 * @param pVgroup    target vgroup; its vgId goes into the header and the request.
 * @param pContLen   receives the total message length (header + body) on success.
 * @param compactId  compact to kill.
 * @param dnodeid    dnode id stored in the request.
 * @return a buffer obtained from taosMemoryMalloc() that the caller must free,
 *         or NULL on failure with terrno set.
 */
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
                                    int32_t dnodeid) {
  SVKillCompactReq req = {0};
  req.compactId = compactId;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeid;
  terrno = 0;

  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);

  // Size-only pass to learn the body length.
  int32_t bodyLen = tSerializeSVKillCompactReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The header fields are stored in network byte order.
  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);

  // Only bodyLen bytes are available after the header; passing the full
  // contLen would overstate the buffer by sizeof(SMsgHead).
  int32_t ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req);
  if (ret < 0) {
    // Do not leak the message buffer on a failed serialization.
    taosMemoryFree(pReq);
    terrno = ret;
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
380

381
/**
 * Append a redo action to pTrans that sends a kill-compact message for
 * pVgroup to the given dnode.
 *
 * @param pMnode     mnode used to resolve the dnode.
 * @param pTrans     transaction receiving the redo action.
 * @param pVgroup    vgroup whose compact is being killed.
 * @param compactId  compact to kill.
 * @param dnodeid    dnode hosting the compact task.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
                                       int32_t dnodeid) {
  int32_t      code = 0;
  int32_t      msgLen = 0;
  STransAction killAction = {0};

  // Resolve the endpoint of the target dnode.
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeid);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }
  killAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  // Build the message payload.
  void *pMsg = mndBuildKillCompactReq(pMnode, pVgroup, &msgLen, compactId, dnodeid);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }

  killAction.msgType = TDMT_VND_KILL_COMPACT;
  killAction.pCont = pMsg;
  killAction.contLen = msgLen;

  mTrace("trans:%d, kill compact msg len:%d", pTrans->id, msgLen);

  code = mndTransAppendRedoAction(pTrans, &killAction);
  if (code != 0) {
    // The payload is freed here when the append fails.
    taosMemoryFree(pMsg);
    TAOS_RETURN(code);
  }

  return 0;
}
416

417
/**
 * Create and prepare a transaction that kills a running compact.
 *
 * The transaction carries the compact object as a READY commit log plus one
 * kill redo action for every compact detail belonging to this compact.
 *
 * @param pMnode    mnode instance.
 * @param pReq      originating request, attached to the transaction.
 * @param pCompact  compact to kill.
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
  if (pTrans == NULL) {
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);

  mndTransSetDbName(pTrans, pCompact->dbname, NULL);

  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    // The raw was not attached to the transaction, so dropping the transaction
    // will not release it; free it here (same handling as mndAddCompactToTran).
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  // One kill action per compact detail of this compact.
  void *pIter = NULL;
  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == pCompact->compactId) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }

      if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        // The vgroup reference must be dropped on the failure path as well.
        mndReleaseVgroup(pMnode, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }

      mndReleaseVgroup(pMnode, pVgroup);
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
498

499
/**
 * Message handler for TDMT_MND_KILL_COMPACT.
 *
 * Decodes the request, looks up the compact, checks the caller's privilege,
 * starts the kill transaction and writes an audit record.
 *
 * @param pReq  incoming RPC message.
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the kill transaction was started,
 *         TSDB_CODE_MND_INVALID_COMPACT_ID when the compact is unknown,
 *         otherwise the error code of the failing step.
 */
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
  int32_t         code = 0;
  int32_t         lino = 0;
  int32_t         nBytes = 0;
  char            obj[TSDB_INT32_ID_LEN] = {0};
  SKillCompactReq killReq = {0};

  code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill compact:%" PRId32, killReq.compactId);

  SMnode      *pMnode = pReq->info.node;
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killReq.compactId);
  if (pCompact == NULL) {
    tFreeSKillCompactReq(&killReq);
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER);
  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);

  // The transaction is running; the reply is deferred.
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  // Audit with the compact id rendered as text; skip it if the id does not fit.
  nBytes = snprintf(obj, sizeof(obj), "%d", pCompact->compactId);
  if ((uint32_t)nBytes >= sizeof(obj)) {
    mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  } else {
    auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killReq.sql, killReq.sqlLen);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to kill compact %" PRId32 " since %s", killReq.compactId, terrstr());
  }

  tFreeSKillCompactReq(&killReq);
  mndReleaseCompact(pMnode, pCompact);

  TAOS_RETURN(code);
}
542

543
// update progress
544
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
181✔
545
                                        SQueryCompactProgressRsp *rsp) {
546
  int32_t code = 0;
181✔
547

548
  void *pIter = NULL;
181✔
549
  while (1) {
184✔
550
    SCompactDetailObj *pDetail = NULL;
365✔
551
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
365✔
552
    if (pIter == NULL) break;
365!
553

554
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
365✔
555
      pDetail->newNumberFileset = rsp->numberFileset;
181✔
556
      pDetail->newFinished = rsp->finished;
181✔
557
      pDetail->progress = rsp->progress;
181✔
558
      pDetail->remainingTime = rsp->remainingTime;
181✔
559

560
      sdbCancelFetch(pMnode->pSdb, pIter);
181✔
561
      sdbRelease(pMnode->pSdb, pDetail);
181✔
562

563
      TAOS_RETURN(code);
181✔
564
    }
565

566
    sdbRelease(pMnode->pSdb, pDetail);
184✔
567
  }
568

569
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
570
}
571

572
/**
 * Message handler for TDMT_VND_QUERY_COMPACT_PROGRESS_RSP.
 *
 * Rejects responses carrying an error code, decodes the payload and records
 * the reported progress on the matching compact detail.
 *
 * @param pReq  incoming response message.
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
  int32_t                  code = 0;
  SQueryCompactProgressRsp progress = {0};

  if (pReq->code != 0) {
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &progress);
  if (code != 0) {
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", progress.compactId,
         progress.vgId, progress.dnodeId, progress.numberFileset, progress.finished);

  SMnode *pMnode = pReq->info.node;

  code = mndUpdateCompactProgress(pMnode, pReq, progress.compactId, &progress);
  if (code != 0) {
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
           progress.compactId, progress.vgId, progress.dnodeId, progress.numberFileset, progress.finished);
  }

  TAOS_RETURN(code);
}
600

601
// timer
602
/**
 * Send a progress query to the dnode of every compact detail that belongs to
 * pCompact.
 *
 * Best effort: a failure for one detail is skipped and the scan continues,
 * except that an unknown dnode ends the scan.
 *
 * @param pMnode    mnode owning the SDB.
 * @param pCompact  compact whose details are queried.
 */
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
  void *pIter = NULL;

  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == pCompact->compactId) {
      SEpSet epSet = {0};

      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
      if (pDnode == NULL) {
        // Leaving the loop early: cancel the iterator and drop the detail
        // reference, as the other early exits over SDB_COMPACT_DETAIL do.
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      // Release the dnode on both outcomes; previously it was kept on failure.
      int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
      mndReleaseDnode(pMnode, pDnode);
      if (epCode != 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      // Zero-initialise so that no indeterminate field is ever serialized.
      SQueryCompactProgressReq req = {0};
      req.compactId = pDetail->compactId;
      req.vgId = pDetail->vgId;
      req.dnodeId = pDetail->dnodeId;

      // Size-only pass, then allocate header + body.
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
      if (contLen < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      contLen += sizeof(SMsgHead);

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      // The header fields are stored in network byte order.
      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);

      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
        // NOTE(review): pHead is not freed on this path; it presumably needs the
        // RPC-layer counterpart of rpcMallocCont() - confirm and release it here.
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};

      rpcMsg.pCont = pHead;

      // Human-readable endpoint list for the debug log only.
      char    detail[1024] = {0};
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                              TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
      }

      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);

      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }
}
81✔
670

671
/**
 * Persist the latest progress of a compact and, once every detail is finished
 * (or the database no longer exists), drop the compact and its details.
 *
 * Steps:
 *  1. Scan the details; a save is needed when any "new*" counter differs from
 *     the stored one, or when the database is gone.
 *  2. In a transaction, copy the new counters into each detail and append it
 *     as a READY commit log.
 *  3. Decide whether all details are finished.
 *  4. If so, append every detail and the compact itself as DROPPED commit logs.
 *  5. Prepare the transaction.
 *
 * @param pMnode     mnode owning the SDB.
 * @param compactId  compact to process.
 * @return 0 on success or when nothing needs saving, otherwise an error code.
 */
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
  int32_t code = 0;
  bool    needSave = false;
  void   *pIter = NULL;
  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == compactId) {
      mDebug(
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      // these 2 number will jump back after dnode restart, so < is not used here
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
        needSave = true;
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  char dbname[TSDB_TABLE_FNAME_LEN] = {0};
  TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));

  if (!mndDbIsExist(pMnode, dbname)) {
    needSave = true;
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
  }

  if (!needSave) {
    mDebug("compact:%" PRId32 ", no need to save", compactId);
    TAOS_RETURN(code);
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
  if (pTrans == NULL) {
    // pTrans is NULL here, so there is no transaction id to log; report the compact id instead.
    mError("compact:%" PRId32 ", failed to create trans since %s", compactId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);

  mndTransSetDbName(pTrans, dbname, NULL);

  // Step 2: fold the new counters into each detail and log it as READY.
  pIter = NULL;
  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == compactId) {
      mInfo(
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      pDetail->numberFileset = pDetail->newNumberFileset;
      pDetail->finished = pDetail->newFinished;

      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
      if (pCommitRaw == NULL) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
        // The raw was not attached to the transaction; free it here
        // (same handling as mndAddCompactToTran).
        sdbFreeRaw(pCommitRaw);
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  // Step 3: a compact is finished only when every detail has reported and its
  // finished count equals its fileset count.
  bool allFinished = true;
  pIter = NULL;
  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == compactId) {
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);

      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if (!mndDbIsExist(pMnode, dbname)) {
    allFinished = true;
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
  }

  // Step 4: drop every detail, then the compact itself.
  if (allFinished) {
    mInfo("compact:%d, all finished", compactId);
    pIter = NULL;
    while (1) {
      SCompactDetailObj *pDetail = NULL;
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
      if (pIter == NULL) break;

      if (pDetail->compactId == compactId) {
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
        if (pCommitRaw == NULL) {
          // Early exit from the scan: cancel the iterator and release the
          // detail, as every other failure path in this function does.
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          if (terrno != 0) code = terrno;
          TAOS_RETURN(code);
        }
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
                 terrstr());
          sdbFreeRaw(pCommitRaw);
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
      }

      sdbRelease(pMnode->pSdb, pDetail);
    }

    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
    if (pCompact == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
    mndReleaseCompact(pMnode, pCompact);
    if (pCommitRaw == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
      sdbFreeRaw(pCommitRaw);
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    // pCompact was released above and must not be read any more; compactId is
    // the key it was acquired with.
    mInfo("compact:%d, add drop compact action", compactId);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
868

869
/*
 * Pull up every compact task currently stored in the SDB.
 *
 * The ids of all SDB_COMPACT rows are collected into a temporary array first;
 * afterwards each task is re-acquired by id, a progress request is sent for it
 * and its progress is saved. Failures are logged and do not stop the loop.
 *
 * Returns silently when the temporary array cannot be allocated.
 */
static void mndCompactPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pCompactIds = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
  if (pCompactIds == NULL) {
    return;
  }

  // Step 1: snapshot the id of every compact row; each row is released right away.
  void        *pIter = NULL;
  SCompactObj *pRow = NULL;
  while ((pIter = sdbFetch(pSdb, SDB_COMPACT, pIter, (void **)&pRow)) != NULL) {
    if (taosArrayPush(pCompactIds, &pRow->compactId) == NULL) {
      mError("failed to push compact id:%d into array, but continue pull up", pRow->compactId);
    }
    sdbRelease(pSdb, pRow);
    pRow = NULL;
  }

  // Step 2: re-acquire each task by id and refresh its progress.
  const int32_t numOfIds = (int32_t)taosArrayGetSize(pCompactIds);
  for (int32_t idx = 0; idx < numOfIds; ++idx) {
    mInfo("begin to pull up");

    int32_t     *pId = taosArrayGet(pCompactIds, idx);
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pId);
    if (pCompact == NULL) {
      // The task could not be acquired any more; nothing to do for it.
      continue;
    }

    mInfo("compact:%d, begin to pull up", pCompact->compactId);
    mndCompactSendProgressReq(pMnode, pCompact);

    int32_t code = mndSaveCompactProgress(pMnode, pCompact->compactId);
    if (code != 0) {
      mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
    }

    mndReleaseCompact(pMnode, pCompact);
  }

  taosArrayDestroy(pCompactIds);
}
901
#ifdef TD_ENTERPRISE
902
/*
 * Emit an "autoCompactDB" audit record describing an automatically dispatched
 * compact of pDb over the time window tw.
 *
 * Does nothing (and returns 0) unless auditing is enabled and both the monitor
 * FQDN and the monitor port are configured.
 *
 * pMnode  mnode whose clusterId is written into the audit record
 * pReq    unused by this function
 * pDb     database being compacted; its name and time precision are read
 * tw      compact time window, expressed in the database's precision
 *
 * Always returns 0.
 */
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
    return 0;
  }

  SName   name = {0};
  int32_t sqlLen = 0;
  char    sql[256] = {0};
  char    skeyStr[40] = {0};
  char    ekeyStr[40] = {0};
  char   *pDbName = pDb->name;

  // Prefer the parsed db name; keep the raw pDb->name when parsing fails.
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
    pDbName = name.dbname;
  }

  // Use formatted UTC timestamps when both conversions return 0, otherwise
  // fall back to the raw integer keys.
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
  } else {
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
                       tw->ekey);
  }

  // Use pDbName (not name.dbname) as the audit target: when tNameFromString
  // fails, name is still zero-initialised and name.dbname would be an empty
  // string, while pDbName still holds the fallback pDb->name used in the SQL.
  auditRecord(NULL, pMnode->clusterId, "autoCompactDB", pDbName, "", sql, sqlLen);

  return 0;
}
929

930
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
931
                            bool metaOnly);
932
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
5,036✔
933
  int32_t code = 0;
5,036✔
934
  SMnode *pMnode = pReq->info.node;
5,036✔
935
  SSdb   *pSdb = pMnode->pSdb;
5,036✔
936
  int64_t curMs = taosGetTimestampMs();
5,036✔
937
  int64_t curMin = curMs / 60000LL;
5,036✔
938

939
  void   *pIter = NULL;
5,036✔
940
  SDbObj *pDb = NULL;
5,036✔
941
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
13,811✔
942
    if (pDb->cfg.compactInterval <= 0) {
8,775✔
943
      mDebug("db:%p,%s, compact interval is %dm, skip", pDb, pDb->name, pDb->cfg.compactInterval);
8,770✔
944
      sdbRelease(pSdb, pDb);
8,770✔
945
      continue;
8,775✔
946
    }
947

948
    if (pDb->cfg.isMount) {
5!
949
      sdbRelease(pSdb, pDb);
×
950
      continue;
×
951
    }
952

953
    // daysToKeep2 would be altered
954
    if (pDb->cfg.compactEndTime && (pDb->cfg.compactEndTime <= -pDb->cfg.daysToKeep2)) {
5!
955
      mWarn("db:%p,%s, compact end time:%dm <= -keep2:%dm , skip", pDb, pDb->name, pDb->cfg.compactEndTime,
×
956
            -pDb->cfg.daysToKeep2);
957
      sdbRelease(pSdb, pDb);
×
958
      continue;
×
959
    }
960

961
    int64_t compactStartTime = pDb->cfg.compactStartTime ? pDb->cfg.compactStartTime : -pDb->cfg.daysToKeep2;
5✔
962
    int64_t compactEndTime = pDb->cfg.compactEndTime ? pDb->cfg.compactEndTime : -pDb->cfg.daysPerFile;
5✔
963

964
    if (compactStartTime >= compactEndTime) {
5!
965
      mDebug("db:%p,%s, compact start time:%" PRIi64 "m >= end time:%" PRIi64 "m, skip", pDb, pDb->name,
×
966
             compactStartTime, compactEndTime);
967
      sdbRelease(pSdb, pDb);
×
968
      continue;
×
969
    }
970

971
    int64_t remainder = ((curMin - (int64_t)pDb->cfg.compactTimeOffset * 60LL) % pDb->cfg.compactInterval);
5✔
972
    if (remainder != 0) {
5!
973
      mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
5!
974
             "h, remainder:%" PRIi64 "m, skip",
975
             pDb, pDb->name, curMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, remainder);
976
      sdbRelease(pSdb, pDb);
5✔
977
      continue;
5✔
978
    }
979

980
    if ((pDb->compactStartTime / 60000LL) == curMin) {
×
981
      mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
×
982
             curMin, pDb->compactStartTime);
983
      sdbRelease(pSdb, pDb);
×
984
      continue;
×
985
    }
986

987
    STimeWindow tw = {
×
988
        .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
×
989
        .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
×
990

991
    if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL, false)) == 0) {
×
992
      mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
993
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
994
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
995
            pDb->cfg.compactTimeOffset);
996
    } else {
997
      mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
998
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h, since %s",
999
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
1000
            pDb->cfg.compactTimeOffset, tstrerror(code));
1001
    }
1002

1003
    TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
×
1004

1005
    sdbRelease(pSdb, pDb);
×
1006
  }
1007
  return 0;
5,036✔
1008
}
1009
#endif
1010

1011
/*
 * Handler for the compact timer message.
 *
 * When built with TD_ENTERPRISE it pulls up the existing compact tasks and
 * then runs the automatic compact dispatch; otherwise it does nothing.
 *
 * Always returns 0.
 */
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process compact timer");
  mndCompactPullup(pMnode);
  TAOS_UNUSED(mndCompactDispatch(pReq));
#endif
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc