• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.62
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndCompact.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tmisce.h"
25
#include "tmsgcb.h"
26

27
#define MND_COMPACT_VER_NUMBER 1
28
#define MND_COMPACT_ID_LEN     11
29

30
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
31

32
/*
 * Wires the compact module into the mnode: registers the SHOW retrieve
 * handler, the message handlers used by compact, and the SDB_COMPACT table
 * callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitCompact(SMnode *pMnode) {
  SSdbTable compactTable = {0};
  compactTable.sdbType = SDB_COMPACT;
  compactTable.keyType = SDB_KEY_INT32;
  compactTable.encodeFp = (SdbEncodeFp)mndCompactActionEncode;
  compactTable.decodeFp = (SdbDecodeFp)mndCompactActionDecode;
  compactTable.insertFp = (SdbInsertFp)mndCompactActionInsert;
  compactTable.updateFp = (SdbUpdateFp)mndCompactActionUpdate;
  compactTable.deleteFp = (SdbDeleteFp)mndCompactActionDelete;

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);

  return sdbSetTable(pMnode->pSdb, compactTable);
}
51

52
/* Module teardown hook: only emits a debug log line. */
void mndCleanupCompact(SMnode *pMnode) {
  mDebug("mnd compact cleanup");
}
1,517✔
53

54
/* Release hook for a compact object; the body is intentionally empty. */
void tFreeCompactObj(SCompactObj *pCompact) {
  (void)pCompact;
}
9✔
55

56
/*
 * Encodes compactId, dbname and startTime of pObj into buf (bufLen bytes).
 *
 * Returns the encoder position after encoding on success, or the error code
 * produced by the failing encode step. mndCompactActionEncode() calls this
 * with buf == NULL and bufLen == 0 to obtain the required length first.
 */
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
  int32_t  code = 0;
  int32_t  lino;
  int32_t  result;
  SEncoder enc = {0};

  tEncoderInit(&enc, buf, bufLen);

  TAOS_CHECK_EXIT(tStartEncode(&enc));
  TAOS_CHECK_EXIT(tEncodeI32(&enc, pObj->compactId));
  TAOS_CHECK_EXIT(tEncodeCStr(&enc, pObj->dbname));
  TAOS_CHECK_EXIT(tEncodeI64(&enc, pObj->startTime));

  tEndEncode(&enc);

_exit:
  /* Read the position before the encoder is cleared. */
  result = (code != 0) ? code : enc.pos;
  tEncoderClear(&enc);
  return result;
}
79

80
/*
 * Decodes compactId, dbname and startTime from buf (bufLen bytes) into pObj,
 * in the same order tSerializeSCompactObj() writes them.
 *
 * Returns 0 on success or the error code of the failing decode step.
 */
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
  SDecoder dec = {0};
  int32_t  lino;
  int32_t  code = 0;

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->compactId));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->startTime));

  tEndDecode(&dec);

_exit:
  tDecoderClear(&dec);
  return code;
}
97

98
/*
 * Serializes a compact object into a newly allocated SDB raw record
 * (layout: int32 payload length followed by the serialized payload).
 *
 * Returns the raw on success. On failure returns NULL with terrno set; a
 * serializer failure now propagates the serializer's own error code instead
 * of being reported as TSDB_CODE_OUT_OF_MEMORY.
 */
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
  /* NOTE(review): code/lino are unused here unless the SDB_SET_* macros
   * reference them; kept from the original for safety - confirm and remove. */
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_SUCCESS;

  void    *buf = NULL;
  SSdbRaw *pRaw = NULL;

  /* First pass with a NULL buffer only computes the payload length. */
  int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  int32_t size = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  tlen = tSerializeSCompactObj(buf, tlen, pCompact);
  if (tlen < 0) {
    terrno = tlen;
    goto OVER;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
  return pRaw;
}
147

148
/*
 * Decodes an SDB raw record into a newly allocated row holding a
 * SCompactObj.
 *
 * Returns the row on success, or NULL with terrno set on failure.
 *
 * Fixes relative to the previous version:
 *  - the error log no longer dereferences pCompact when the failure happened
 *    before the row was allocated (pCompact is still NULL then);
 *  - a negative payload length read from the raw is rejected before it is
 *    used for allocation and copying;
 *  - a failing soft-version read always leaves a non-success terrno, so the
 *    success path is never taken with a NULL row.
 */
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
  /* NOTE(review): lino is unused here unless the SDB_GET_* macros reference
   * it; kept from the original for safety - confirm and remove. */
  int32_t      code = 0;
  int32_t      lino = 0;
  SSdbRow     *pRow = NULL;
  SCompactObj *pCompact = NULL;
  void        *buf = NULL;
  int32_t      tlen = 0;
  int32_t      dataPos = 0;
  int8_t       sver = 0;
  terrno = TSDB_CODE_SUCCESS;

  if ((code = sdbGetRawSoftVer(pRaw, &sver)) != 0) {
    if (terrno == TSDB_CODE_SUCCESS) terrno = code;
    goto OVER;
  }

  if (sver != MND_COMPACT_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SCompactObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pCompact = sdbGetRowObj(pRow);
  if (pCompact == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  if (tlen < 0) {
    /* Corrupt length field: do not feed it to malloc or the binary copy. */
    terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
    goto OVER;
  }

  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    /* pCompact is NULL when the failure happened before row allocation. */
    int32_t compactId = (pCompact != NULL) ? pCompact->compactId : -1;
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", compactId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
  return pRow;
}
204

205
/* SDB insert hook for compact rows: traces the compact id, always returns 0. */
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
  const int32_t rc = 0;
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
  return rc;
}
209

210
/*
 * SDB delete hook for compact rows: traces the compact id, calls
 * tFreeCompactObj() on the object and returns 0.
 *
 * The trace text previously said "insert action" (copied from the insert
 * hook), which made delete events indistinguishable from inserts in the log.
 */
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
  mTrace("compact:%" PRId32 ", perform delete action", pCompact->compactId);
  tFreeCompactObj(pCompact);
  return 0;
}
215

216
/*
 * SDB update hook for compact rows: traces the old and new row pointers and
 * returns 0 without copying anything between the two objects.
 */
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
  const int32_t rc = 0;
  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
         pNewCompact);
  return rc;
}
222

223
/*
 * Acquires the compact object with the given id from the SDB.
 *
 * Returns the object (release it with mndReleaseCompact()) or NULL. A
 * "not there" miss clears terrno to success; other failures keep terrno.
 *
 * The SDB_COMPACT table is registered with SDB_KEY_INT32, so the key handed
 * to sdbAcquire() must point at an int32_t. The parameter is int64_t; passing
 * its address directly only works when the low 32 bits come first in memory.
 * The id is therefore narrowed into a local int32_t key.
 */
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      key = (int32_t)compactId;
  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &key);
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pCompact;
}
231

232
/*
 * Hands a compact object obtained from mndAcquireCompact() back to the SDB.
 * The caller's pointer is not modified.
 */
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
  sdbRelease(pMnode->pSdb, pCompact);
}
20✔
237

238
/*
 * Copies the database name of compact `compactId` into dbname (len bytes).
 *
 * Returns 0 on success. If the compact cannot be acquired, returns terrno
 * when it is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
  int32_t      code = 0;
  SCompactObj *pObj = mndAcquireCompact(pMnode, compactId);

  if (pObj != NULL) {
    tstrncpy(dbname, pObj->dbname, len);
    mndReleaseCompact(pMnode, pObj);
    TAOS_RETURN(code);
  }

  code = TSDB_CODE_MND_RETURN_VALUE_NULL;
  if (terrno != 0) {
    code = terrno;
  }
  TAOS_RETURN(code);
}
251

252
// compact db
253
/*
 * Fills in a new compact object (generated id, db name, start time in ms),
 * encodes it and appends it to the prepare log of pTrans with status READY.
 * On success the generated id is also written to rsp->compactId.
 *
 * Returns 0 on success or an error code.
 *
 * The raw status is now set BEFORE the raw is appended to the transaction.
 * Previously a failing sdbSetRawStatus() freed a raw that had already been
 * appended, leaving the transaction with a dangling pointer. With this
 * ordering every sdbFreeRaw() below runs while this function still owns the
 * raw (assumes a failed append leaves ownership with the caller, as the
 * original error path already did).
 */
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
  int32_t code = 0;
  pCompact->compactId = tGenIdPI32();

  tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));

  pCompact->startTime = taosGetTimestampMs();

  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  rsp->compactId = pCompact->compactId;

  return 0;
}
281

282
// retrieve compact
283
/*
 * SHOW handler for compacts: writes up to `rows` rows into pBlock, one per
 * compact object, with three columns (compact id, db name, start time).
 *
 * Returns the number of rows written. If pShow->db names a database that
 * cannot be acquired, returns terrno instead.
 *
 * Fixes relative to the previous version:
 *  - when tNameFromString() fails the fetched compact object is now released
 *    before leaving the loop (it was leaked);
 *  - the unused local `SName n` is removed.
 */
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode      *pMnode = pReq->info.node;
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SCompactObj *pCompact = NULL;
  char        *sep = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = 0;
  int32_t      lino = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    /* Column 1: compact id. */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
                        _OVER);

    /* Column 2: db name, with the account prefix stripped for non-system dbs. */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
      SName name = {0};
      code = tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB);
      if (code != TSDB_CODE_SUCCESS) {
        lino = __LINE__;
        /* NOTE(review): RETRIEVE_CHECK_GOTO is passed pCompact and presumably
         * cleans it up on failure; this path has to do so by hand. Confirm
         * whether the iterator must be cancelled here as well. */
        sdbRelease(pSdb, pCompact);
        goto _OVER;
      }
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);

    /* Column 3: start time. */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pCompact);
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  mndReleaseDb(pMnode, pDb);
  return numOfRows;
}
343

344
// kill compact
345
/*
 * Builds a kill-compact request for one vgroup: an SMsgHead followed by the
 * serialized SVKillCompactReq.
 *
 * On success returns the heap buffer (allocated with taosMemoryMalloc) and
 * stores the total message length, header included, in *pContLen. On failure
 * returns NULL with terrno set.
 *
 * Fixes relative to the previous version:
 *  - the buffer is freed when the second serialization pass fails;
 *  - the serializer is given the body capacity, not the total length that
 *    also counts the header;
 *  - *pContLen is the total length, matching pHead->contLen and the way
 *    mndCompactSendProgressReq() fills rpcMsg.contLen. It used to be
 *    overwritten with the body-only length returned by the serializer.
 */
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
                                    int32_t dnodeid) {
  SVKillCompactReq req = {0};
  req.compactId = compactId;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeid;

  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);

  /* First pass with a NULL buffer only computes the body length. */
  int32_t bodyLen = tSerializeSVKillCompactReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  int32_t ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req);
  if (ret < 0) {
    taosMemoryFree(pReq);
    terrno = ret;
    return NULL;
  }

  *pContLen = contLen;
  return pReq;
}
377

378
/*
 * Appends one TDMT_VND_KILL_COMPACT redo action to pTrans, addressed to the
 * ep set of dnode `dnodeid` and carrying the request built for pVgroup.
 *
 * Returns 0 on success. If the dnode cannot be acquired or the request cannot
 * be built, returns terrno when non-zero, else TSDB_CODE_SDB_OBJ_NOT_THERE.
 * If appending fails, the request buffer is freed and the append error code
 * is returned.
 */
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
                                       int32_t dnodeid) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  STransAction killAction = {0};

  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeid);
  if (pTarget == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) {
      code = terrno;
    }
    TAOS_RETURN(code);
  }
  killAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  void *pBody = mndBuildKillCompactReq(pMnode, pVgroup, &reqLen, compactId, dnodeid);
  if (pBody == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) {
      code = terrno;
    }
    TAOS_RETURN(code);
  }

  killAction.msgType = TDMT_VND_KILL_COMPACT;
  killAction.contLen = reqLen;
  killAction.pCont = pBody;

  code = mndTransAppendRedoAction(pTrans, &killAction);
  if (code != 0) {
    taosMemoryFree(pBody);
    TAOS_RETURN(code);
  }

  return 0;
}
411

412
/*
 * Creates and prepares a "kill-compact" transaction for pCompact: the compact
 * object is written to the commit log with status READY, and one kill redo
 * action is added for every compact detail that belongs to this compact.
 *
 * Returns 0 on success or an error code; the local transaction handle is
 * dropped on every path.
 *
 * Fixes relative to the previous version:
 *  - the acquired vgroup is released when adding the kill action fails;
 *  - the encoded raw is freed when appending it to the commit log fails
 *    (same handling as mndAddCompactToTran());
 *  - a block of commented-out code was removed.
 */
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
  if (pTrans == NULL) {
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);

  mndTransSetDbName(pTrans, pCompact->dbname, NULL);

  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    /* The append failed, so this function still owns the raw. */
    sdbFreeRaw(pCommitRaw);
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    /* The raw was appended above, so it is not freed here. */
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  void *pIter = NULL;
  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == pCompact->compactId) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }

      code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId);
      /* Release the vgroup on success and on failure alike. */
      mndReleaseVgroup(pMnode, pVgroup);
      if (code != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
493

494
/*
 * Handler for TDMT_MND_KILL_COMPACT: decodes the request, looks up the
 * compact, checks the MND_OPER_COMPACT_DB privilege, starts the kill
 * transaction and writes an audit record.
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
 * TSDB_CODE_MND_INVALID_COMPACT_ID when the compact does not exist, or the
 * error code of the failing step.
 *
 * The failure log now prints tstrerror(code). The checked calls report their
 * failure through `code`, so terrstr() could print a stale or unrelated
 * message.
 */
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SKillCompactReq killCompactReq = {0};

  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);

  SMnode      *pMnode = pReq->info.node;
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
  if (pCompact == NULL) {
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
    tFreeSKillCompactReq(&killCompactReq);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);

  code = TSDB_CODE_ACTION_IN_PROGRESS;

  /* The audit record carries the compact id as text; skip it if the id does
   * not fit into the buffer. */
  char    obj[TSDB_INT32_ID_LEN] = {0};
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pCompact->compactId);
  if ((uint32_t)nBytes < sizeof(obj)) {
    auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql,
                killCompactReq.sqlLen);
  } else {
    mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }
_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, tstrerror(code));
  }

  tFreeSKillCompactReq(&killCompactReq);
  mndReleaseCompact(pMnode, pCompact);

  TAOS_RETURN(code);
}
537

538
// update progress
539
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
32✔
540
                                        SQueryCompactProgressRsp *rsp) {
541
  int32_t code = 0;
32✔
542

543
  void *pIter = NULL;
32✔
544
  while (1) {
48✔
545
    SCompactDetailObj *pDetail = NULL;
80✔
546
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
80✔
547
    if (pIter == NULL) break;
80!
548

549
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
80!
550
      pDetail->newNumberFileset = rsp->numberFileset;
32✔
551
      pDetail->newFinished = rsp->finished;
32✔
552
      pDetail->progress = rsp->progress;
32✔
553
      pDetail->remainingTime = rsp->remainingTime;
32✔
554

555
      sdbCancelFetch(pMnode->pSdb, pIter);
32✔
556
      sdbRelease(pMnode->pSdb, pDetail);
32✔
557

558
      TAOS_RETURN(code);
32✔
559
    }
560

561
    sdbRelease(pMnode->pSdb, pDetail);
48✔
562
  }
563

564
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
565
}
566

567
/*
 * Handler for TDMT_VND_QUERY_COMPACT_PROGRESS_RSP: decodes the progress
 * response and stores it in the matching compact detail.
 *
 * Returns pReq->code when the response itself carries an error, the decode
 * error when deserialization fails, otherwise the result of
 * mndUpdateCompactProgress().
 */
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
  SQueryCompactProgressRsp rsp = {0};
  int32_t                  code = 0;

  if (pReq->code != 0) {
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &rsp);
  if (code != 0) {
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", rsp.compactId,
         rsp.vgId, rsp.dnodeId, rsp.numberFileset, rsp.finished);

  SMnode *pMnode = pReq->info.node;

  code = mndUpdateCompactProgress(pMnode, pReq, rsp.compactId, &rsp);
  if (code != 0) {
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", rsp.compactId,
           rsp.vgId, rsp.dnodeId, rsp.numberFileset, rsp.finished);
  }

  TAOS_RETURN(code);
}
595

596
// timer
597
/*
 * Sends one TDMT_VND_QUERY_COMPACT_PROGRESS request for every compact detail
 * that belongs to pCompact, addressed to the dnode recorded in the detail.
 * Failures for a single detail are skipped; nothing is returned.
 *
 * Fixes relative to the previous version:
 *  - when the dnode cannot be acquired, the iterator is cancelled and the
 *    detail released before the loop is left (both leaked);
 *  - the dnode is released when addEpIntoEpSet() fails (it leaked);
 *  - the rpc buffer is freed when the second serialization pass fails;
 *  - the request struct is zero-initialised before its fields are set.
 */
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == pCompact->compactId) {
      SEpSet epSet = {0};

      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
      if (pDnode == NULL) {
        /* Leaving the scan early: cancel the iterator, drop the reference. */
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pDetail);
        break;
      }
      int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
      mndReleaseDnode(pMnode, pDnode);
      if (epCode != 0) {
        sdbRelease(pSdb, pDetail);
        continue;
      }

      SQueryCompactProgressReq req = {0};
      req.compactId = pDetail->compactId;
      req.vgId = pDetail->vgId;
      req.dnodeId = pDetail->dnodeId;

      /* First pass with a NULL buffer only computes the body length. */
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
      if (contLen < 0) {
        sdbRelease(pSdb, pDetail);
        continue;
      }

      contLen += sizeof(SMsgHead);

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        sdbRelease(pSdb, pDetail);
        continue;
      }

      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);

      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
        rpcFreeCont(pHead);
        sdbRelease(pSdb, pDetail);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};

      rpcMsg.pCont = pHead;

      /* Human-readable description of the target ep set for the debug log. */
      char    detail[1024] = {0};
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                              TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
      }

      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);

      /* NOTE(review): pHead is not freed here when the send fails, same as
       * before; presumably tmsgSendReq() owns the buffer - confirm. */
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
        sdbRelease(pSdb, pDetail);
        continue;
      }
    }

    sdbRelease(pSdb, pDetail);
  }
}
8✔
665

666
/*
 * Persists the progress of compact `compactId` through an
 * "update-compact-progress" transaction.
 *
 * Steps:
 *  1. Scan the compact details; a save is needed when numberFileset/finished
 *     differ from their "new" counterparts, or when the database is gone.
 *  2. Copy the new values into each detail and append it to the commit log
 *     with status READY.
 *  3. Decide whether every detail is finished (a missing database counts as
 *     finished).
 *  4. If so, append every detail and the compact itself with status DROPPED.
 *  5. Prepare the transaction.
 *
 * Returns 0 on success (including "nothing to save") or an error code.
 *
 * Fixes relative to the previous version:
 *  - the create-failure log dereferenced pTrans, which is NULL on that path;
 *  - the drop loop now cancels the iterator and releases the detail when
 *    encoding fails;
 *  - the final log line read pCompact after it had been released; it now
 *    logs compactId (the id the object was acquired by);
 *  - an encoded raw is freed when appending it to the commit log fails
 *    (same handling as mndAddCompactToTran());
 *  - the log for a failed DROPPED status on the compact raw no longer says
 *    "failed to append commit log".
 */
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
  int32_t            code = 0;
  bool               needSave = false;
  bool               allFinished = true;
  SSdb              *pSdb = pMnode->pSdb;
  void              *pIter = NULL;
  SCompactDetailObj *pDetail = NULL;
  STrans            *pTrans = NULL;
  SSdbRaw           *pCommitRaw = NULL;
  char               dbname[TSDB_TABLE_FNAME_LEN] = {0};

  /* Step 1: is there anything new to persist? */
  while (1) {
    pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == compactId) {
      mDebug(
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      // these 2 number will jump back after dnode restart, so < is not used here
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
        needSave = true;
    }

    sdbRelease(pSdb, pDetail);
  }
  pDetail = NULL;

  TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));

  if (!mndDbIsExist(pMnode, dbname)) {
    needSave = true;
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
  }

  if (!needSave) {
    mDebug("compact:%" PRId32 ", no need to save", compactId);
    TAOS_RETURN(code);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
  if (pTrans == NULL) {
    mError("compact:%" PRId32 ", failed to create trans since %s", compactId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);

  mndTransSetDbName(pTrans, dbname, NULL);

  /* Step 2: copy the new values and append each detail as READY. */
  pIter = NULL;
  while (1) {
    pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == compactId) {
      mInfo(
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      pDetail->numberFileset = pDetail->newNumberFileset;
      pDetail->finished = pDetail->newFinished;

      pCommitRaw = mndCompactDetailActionEncode(pDetail);
      if (pCommitRaw == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _fail;
      }
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
        sdbFreeRaw(pCommitRaw); /* append failed: the raw is still ours */
        goto _fail;
      }
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
        goto _fail;
      }
    }

    sdbRelease(pSdb, pDetail);
  }
  pDetail = NULL;

  /* Step 3: are all details of this compact finished? */
  pIter = NULL;
  while (1) {
    pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == compactId) {
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);

      bool noReportYet = (pDetail->numberFileset == -1 && pDetail->finished == -1);
      bool inProgress = (pDetail->numberFileset != -1 && pDetail->finished != -1 &&
                         pDetail->numberFileset != pDetail->finished);
      if (noReportYet || inProgress) {
        allFinished = false;
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pDetail);
        break;
      }
    }

    sdbRelease(pSdb, pDetail);
  }
  pDetail = NULL;

  if (!mndDbIsExist(pMnode, dbname)) {
    allFinished = true;
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
  }

  /* Step 4: everything finished, so drop the details and the compact. */
  if (allFinished) {
    mInfo("compact:%d, all finished", compactId);
    pIter = NULL;
    while (1) {
      pDetail = NULL;
      pIter = sdbFetch(pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
      if (pIter == NULL) break;

      if (pDetail->compactId == compactId) {
        pCommitRaw = mndCompactDetailActionEncode(pDetail);
        if (pCommitRaw == NULL) {
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          if (terrno != 0) code = terrno;
          goto _fail;
        }
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
                 terrstr());
          sdbFreeRaw(pCommitRaw); /* append failed: the raw is still ours */
          goto _fail;
        }
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
          goto _fail;
        }
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
      }

      sdbRelease(pSdb, pDetail);
    }
    pDetail = NULL;

    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
    if (pCompact == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _fail;
    }
    pCommitRaw = mndCompactActionEncode(pCompact);
    mndReleaseCompact(pMnode, pCompact);
    if (pCommitRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _fail;
    }
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
      sdbFreeRaw(pCommitRaw); /* append failed: the raw is still ours */
      goto _fail;
    }
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
      mError("compact:%d, trans:%d, failed to set raw status since %s", compactId, pTrans->id, terrstr());
      goto _fail;
    }
    /* pCompact was released above; log the id it was acquired by. */
    mInfo("compact:%d, add drop compact action", compactId);
  }

  /* Step 5: hand the transaction over. */
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
    goto _fail;
  }

  mndTransDrop(pTrans);
  return 0;

_fail:
  /* pDetail is non-NULL only when a scan was interrupted mid-iteration. */
  if (pDetail != NULL) {
    sdbCancelFetch(pSdb, pIter);
    sdbRelease(pSdb, pDetail);
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
863

864
/*
 * Walk every compact object stored in the sdb and refresh its progress.
 *
 * The compact ids are first copied into a temporary array. Each id is then
 * re-acquired with mndAcquireCompact(), handed to mndCompactSendProgressReq()
 * and mndSaveCompactProgress(), and released again. A non-zero result from
 * mndSaveCompactProgress() is only logged; it does not stop the loop.
 * Nothing is done when the temporary array cannot be allocated.
 */
static void mndCompactPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pCompactIds = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
  if (pCompactIds == NULL) {
    return;
  }

  // Step 1: remember the id of every compact object currently in the sdb.
  void *pIter = NULL;
  for (;;) {
    SCompactObj *pFetched = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pFetched);
    if (pIter == NULL) {
      break;
    }

    if (taosArrayPush(pCompactIds, &pFetched->compactId) == NULL) {
      // A failed push only drops this id from the current round.
      mError("failed to push compact id:%d into array, but continue pull up", pFetched->compactId);
    }
    sdbRelease(pSdb, pFetched);
  }

  // Step 2: process each remembered id; ids that can no longer be acquired are skipped.
  for (int32_t idx = 0; idx < taosArrayGetSize(pCompactIds); ++idx) {
    mInfo("begin to pull up");
    int32_t     *pId = taosArrayGet(pCompactIds, idx);
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pId);
    if (pCompact == NULL) {
      continue;
    }

    mInfo("compact:%d, begin to pull up", pCompact->compactId);
    mndCompactSendProgressReq(pMnode, pCompact);

    int32_t code = mndSaveCompactProgress(pMnode, pCompact->compactId);
    if (code != 0) {
      mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
    }
    mndReleaseCompact(pMnode, pCompact);
  }

  taosArrayDestroy(pCompactIds);
}
896
#ifdef TD_ENTERPRISE
897
/*
 * Emit an "autoCompactDB" audit record for an automatically dispatched compact.
 *
 * pMnode: mnode whose clusterId is attached to the audit record.
 * pReq:   not used by this function.
 * pDb:    database being compacted; supplies the name and time precision.
 * tw:     compact time window, expressed in the database precision.
 *
 * Returns 0 in all cases. Nothing is recorded when auditing is disabled or
 * when the monitor fqdn/port is not configured.
 */
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
    return 0;
  }

  SName   name = {0};
  int32_t sqlLen = 0;
  char    sql[256] = {0};
  char    skeyStr[40] = {0};
  char    ekeyStr[40] = {0};
  char   *pDbName = pDb->name;

  // Prefer the parsed db name; keep the stored name when parsing fails.
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
    pDbName = name.dbname;
  }

  // Use formatted timestamps when both conversions return 0, otherwise fall back to the raw keys.
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
  } else {
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
                       tw->ekey);
  }

  // Pass pDbName rather than name.dbname: name.dbname is still empty when
  // tNameFromString failed, which would leave the audit record without a db
  // name and inconsistent with the SQL text built above.
  auditRecord(NULL, pMnode->clusterId, "autoCompactDB", pDbName, "", sql, sqlLen);

  return 0;
}
924

925
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds);
926
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
3,788✔
927
  int32_t code = 0;
3,788✔
928
  SMnode *pMnode = pReq->info.node;
3,788✔
929
  SSdb   *pSdb = pMnode->pSdb;
3,788✔
930
  int64_t curMs = taosGetTimestampMs();
3,788✔
931
  int64_t curMin = curMs / 60000LL;
3,788✔
932

933
  void   *pIter = NULL;
3,788✔
934
  SDbObj *pDb = NULL;
3,788✔
935
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
9,948✔
936
    if (pDb->cfg.compactInterval <= 0) {
6,160!
937
      mDebug("db:%p,%s, compact interval is %dm, skip", pDb, pDb->name, pDb->cfg.compactInterval);
6,160✔
938
      sdbRelease(pSdb, pDb);
6,160✔
939
      continue;
6,160✔
940
    }
941

942
    // daysToKeep2 would be altered
943
    if (pDb->cfg.compactEndTime && (pDb->cfg.compactEndTime <= -pDb->cfg.daysToKeep2)) {
×
944
      mWarn("db:%p,%s, compact end time:%dm <= -keep2:%dm , skip", pDb, pDb->name, pDb->cfg.compactEndTime,
×
945
            -pDb->cfg.daysToKeep2);
946
      sdbRelease(pSdb, pDb);
×
947
      continue;
×
948
    }
949

950
    int64_t compactStartTime = pDb->cfg.compactStartTime ? pDb->cfg.compactStartTime : -pDb->cfg.daysToKeep2;
×
951
    int64_t compactEndTime = pDb->cfg.compactEndTime ? pDb->cfg.compactEndTime : -pDb->cfg.daysPerFile;
×
952

953
    if (compactStartTime >= compactEndTime) {
×
954
      mDebug("db:%p,%s, compact start time:%" PRIi64 "m >= end time:%" PRIi64 "m, skip", pDb, pDb->name,
×
955
             compactStartTime, compactEndTime);
956
      sdbRelease(pSdb, pDb);
×
957
      continue;
×
958
    }
959

960
    int64_t remainder = ((curMin - (int64_t)pDb->cfg.compactTimeOffset * 60LL) % pDb->cfg.compactInterval);
×
961
    if (remainder != 0) {
×
962
      mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
×
963
             "h, remainder:%" PRIi64 "m, skip",
964
             pDb, pDb->name, curMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, remainder);
965
      sdbRelease(pSdb, pDb);
×
966
      continue;
×
967
    }
968

969
    if ((pDb->compactStartTime / 60000LL) == curMin) {
×
970
      mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
×
971
             curMin, pDb->compactStartTime);
972
      sdbRelease(pSdb, pDb);
×
973
      continue;
×
974
    }
975

976
    STimeWindow tw = {
×
977
        .skey = convertTimePrecision(curMs + compactStartTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision),
×
978
        .ekey = convertTimePrecision(curMs + compactEndTime * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision)};
×
979

980
    if ((code = mndCompactDb(pMnode, NULL, pDb, tw, NULL)) == 0) {
×
981
      mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
982
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
983
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
984
            pDb->cfg.compactTimeOffset);
985
    } else {
986
      mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
×
987
            "m, end:%" PRIi64 "m, offset:%" PRIi8 "h, since %s",
988
            pDb, pDb->name, tw.skey, tw.ekey, pDb->cfg.compactInterval, compactStartTime, compactEndTime,
989
            pDb->cfg.compactTimeOffset, tstrerror(code));
990
    }
991

992
    TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &tw));
×
993

994
    sdbRelease(pSdb, pDb);
×
995
  }
996
  return 0;
3,788✔
997
}
998
#endif
999

1000
/*
 * Compact timer message handler.
 *
 * When built with TD_ENTERPRISE it calls mndCompactPullup() on the mnode taken
 * from the request, then mndCompactDispatch(), whose result is discarded.
 * Without TD_ENTERPRISE it does nothing. Always returns 0.
 */
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process compact timer");
  mndCompactPullup(pMnode);
  TAOS_UNUSED(mndCompactDispatch(pReq));
#endif
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc