• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "audit.h"
16
#include "mndCompact.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tmisce.h"
25
#include "tmsgcb.h"
26

27
#define MND_COMPACT_VER_NUMBER 1
28
#define MND_COMPACT_ID_LEN     11
29

30
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
31

32
/*
 * Wire the compact module into the mnode: install the show-retrieve handler,
 * the message handlers for kill / progress-response / timer messages, and the
 * SDB_COMPACT table callbacks.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitCompact(SMnode *pMnode) {
  /* Describe the SDB_COMPACT table; fields not assigned below stay zero. */
  SSdbTable compactTable = {0};
  compactTable.sdbType = SDB_COMPACT;
  compactTable.keyType = SDB_KEY_INT32;
  compactTable.encodeFp = (SdbEncodeFp)mndCompactActionEncode;
  compactTable.decodeFp = (SdbDecodeFp)mndCompactActionDecode;
  compactTable.insertFp = (SdbInsertFp)mndCompactActionInsert;
  compactTable.updateFp = (SdbUpdateFp)mndCompactActionUpdate;
  compactTable.deleteFp = (SdbDeleteFp)mndCompactActionDelete;

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);

  return sdbSetTable(pMnode->pSdb, compactTable);
}
51

52
/* Module teardown hook; it only emits a debug log line. */
void mndCleanupCompact(SMnode *pMnode) {
  mDebug("mnd compact cleanup");
}
×
53

54
/* Release resources owned by a compact object; currently there is nothing to free. */
void tFreeCompactObj(SCompactObj *pCompact) {
  (void)pCompact;
}
×
55

56
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
×
57
  SEncoder encoder = {0};
×
58
  int32_t  code = 0;
×
59
  int32_t  lino;
60
  int32_t  tlen;
61
  tEncoderInit(&encoder, buf, bufLen);
×
62

63
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
64
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
×
65
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
×
66
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
×
67

68
  tEndEncode(&encoder);
×
69

70
_exit:
×
71
  if (code) {
×
72
    tlen = code;
×
73
  } else {
74
    tlen = encoder.pos;
×
75
  }
76
  tEncoderClear(&encoder);
×
77
  return tlen;
×
78
}
79

80
/*
 * Decode compactId, dbname and startTime from buf (bufLen bytes) into pObj,
 * in the same field order used by tSerializeSCompactObj().
 *
 * Returns 0 on success or the code of the first decode step that failed.
 */
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
  SDecoder dec = {0};
  int32_t  code = 0;
  int32_t  lino;

  tDecoderInit(&dec, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&dec));
  TAOS_CHECK_EXIT(tDecodeI32(&dec, &pObj->compactId));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&dec, pObj->dbname));
  TAOS_CHECK_EXIT(tDecodeI64(&dec, &pObj->startTime));

  tEndDecode(&dec);

_exit:
  /* The decoder is cleared on both the success and the failure path. */
  tDecoderClear(&dec);
  return code;
}
97

98
/*
 * Build an SDB raw record for a compact object.
 *
 * Raw layout written here: an int32 payload length followed by the payload
 * produced by tSerializeSCompactObj().
 *
 * Returns the new raw, or NULL with terrno set on failure.
 */
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int32_t  encLen = 0;
  int32_t  rawSize = 0;
  void    *pPayload = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  /* First pass without a buffer: its result is used to size the payload. */
  encLen = tSerializeSCompactObj(NULL, 0, pCompact);
  if (encLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  rawSize = sizeof(int32_t) + encLen;
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pPayload = taosMemoryMalloc(encLen);
  if (pPayload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  /* Second pass: serialize into the temporary payload buffer. */
  encLen = tSerializeSCompactObj(pPayload, encLen, pCompact);
  if (encLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, encLen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, pPayload, encLen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  /* The payload was only a staging buffer and is released on every path. */
  taosMemoryFreeClear(pPayload);

  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
    return pRaw;
  }

  mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
147

148
/*
 * Rebuild a compact row from an SDB raw record.
 *
 * The raw must carry soft version MND_COMPACT_VER_NUMBER and the layout
 * written by mndCompactActionEncode(): an int32 payload length followed by
 * the serialized SCompactObj.
 *
 * Returns the new row, or NULL with terrno set on failure.
 */
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
  int32_t      code = 0;
  int32_t      lino = 0;
  int32_t      tlen = 0;
  int32_t      dataPos = 0;
  SSdbRow     *pRow = NULL;
  SCompactObj *pCompact = NULL;
  void        *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    // Make sure the failure is visible below even if the callee left terrno untouched.
    if (terrno == TSDB_CODE_SUCCESS) terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto OVER;
  }

  if (sver != MND_COMPACT_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SCompactObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pCompact = sdbGetRowObj(pRow);
  if (pCompact == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  if (tlen < 0) {
    // The length comes from stored data; never size an allocation or a copy with a negative value.
    terrno = TSDB_CODE_OUT_OF_RANGE;
    goto OVER;
  }
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS || pCompact == NULL) {
    // pCompact is still NULL when the failure happened before the row object was obtained,
    // so it must not be dereferenced for the log line.
    int32_t failedId = (pCompact != NULL) ? pCompact->compactId : -1;
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", failedId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
  return pRow;
}
204

205
/*
 * SDB insert callback for compact rows. It only traces the event and
 * always returns 0.
 */
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
  const int32_t insertedId = pCompact->compactId;
  mTrace("compact:%" PRId32 ", perform insert action", insertedId);
  return 0;
}
209

210
/*
 * SDB delete callback for compact rows: traces the event and releases
 * anything the object owns via tFreeCompactObj(). Always returns 0.
 */
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
  // The log text previously said "insert" here (copy-paste), which made delete events look like inserts.
  mTrace("compact:%" PRId32 ", perform delete action", pCompact->compactId);
  tFreeCompactObj(pCompact);
  return 0;
}
215

216
/*
 * SDB update callback for compact rows. No field is copied from the new row
 * into the old one; the call is only traced. Always returns 0.
 */
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
  const int32_t updatedId = pOldCompact->compactId;

  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", updatedId, pOldCompact, pNewCompact);
  return 0;
}
222

223
/*
 * Look up a compact object by id and take a reference on it.
 *
 * Returns the object (release it with mndReleaseCompact()), or NULL. When the
 * object simply does not exist, terrno is reset to TSDB_CODE_SUCCESS so that
 * callers can tell "not found" apart from a real lookup error.
 */
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
  SSdb *pSdb = pMnode->pSdb;

  // The SDB_COMPACT table is registered with SDB_KEY_INT32 in mndInitCompact(), so the key
  // handed to sdbAcquire() must be a real int32_t. Passing the address of the int64_t
  // parameter only works on little-endian targets.
  int32_t key = (int32_t)compactId;

  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &key);
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_SUCCESS;
  }
  return pCompact;
}
231

232
/*
 * Drop the reference taken by mndAcquireCompact().
 *
 * The pointer is passed by value, so the caller's copy is not cleared here;
 * callers must not use it after this call.
 */
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
  sdbRelease(pMnode->pSdb, pCompact);
}
×
237

238
/*
 * Copy the database name of compact `compactId` into dbname (at most len
 * bytes, via tstrncpy).
 *
 * Returns 0 on success. If the compact cannot be acquired, returns terrno
 * when it is non-zero, otherwise TSDB_CODE_MND_RETURN_VALUE_NULL.
 */
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
  int32_t      code = 0;
  SCompactObj *pFound = mndAcquireCompact(pMnode, compactId);

  if (pFound != NULL) {
    tstrncpy(dbname, pFound->dbname, len);
    mndReleaseCompact(pMnode, pFound);
    TAOS_RETURN(code);
  }

  code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
251

252
// compact db
253
/*
 * Fill in a new compact object (generated id, db name, start time), encode it
 * and append it to the prepare log of pTrans with status SDB_STATUS_READY.
 * The generated id is reported back through rsp->compactId.
 *
 * Returns 0 on success or an error code; on failure nothing is left appended
 * to the transaction by this function.
 */
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
  int32_t code = 0;
  pCompact->compactId = tGenIdPI32();

  tstrncpy(pCompact->dbname, pDb->name, sizeof(pCompact->dbname));

  pCompact->startTime = taosGetTimestampMs();

  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Set the status before appending. Once the raw has been appended the transaction also
  // holds the pointer, so freeing it afterwards would leave a dangling entry in the
  // transaction (presumably freed again when the transaction is dropped). With this order
  // every sdbFreeRaw() below runs while the raw is still exclusively ours.
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  rsp->compactId = pCompact->compactId;

  return 0;
}
281

282
// retrieve compact
283
/*
 * Show-retrieve handler for the compact table: emits up to `rows` rows into
 * pBlock, one per compact object, with three columns in this order:
 * compact id, database name (as a var-length string) and start time.
 *
 * pShow->pIter carries the SDB iterator between calls and pShow->numOfRows is
 * advanced by the number of rows produced.
 *
 * Returns the number of rows written (also when a column write failed part
 * way through; the failure is logged), or terrno if pShow->db names a
 * database that cannot be acquired.
 */
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode      *pMnode = pReq->info.node;
  SSdb        *pSdb = pMnode->pSdb;
  int32_t      numOfRows = 0;
  SCompactObj *pCompact = NULL;
  char        *sep = NULL;
  SDbObj      *pDb = NULL;
  int32_t      code = 0;
  int32_t      lino = 0;

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      // System schema databases are not acquired; pDb stays NULL for them.
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};

    // Column 0: compact id.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
                        _OVER);

    // Column 1: database name, stripped to the bare db name unless it is a system database.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
      SName name = {0};
      // Use the same macro as the column writes so that pCompact is handed to its failure
      // cleanup too (presumably it releases the object); the plain TAOS_CHECK_GOTO used
      // here before jumped to _OVER without ever giving up the fetched object.
      RETRIEVE_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), pCompact, &lino, _OVER);
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);

    // Column 2: start time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
                        _OVER);

    numOfRows++;
    sdbRelease(pSdb, pCompact);
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  mndReleaseDb(pMnode, pDb);
  return numOfRows;
}
343

344
// kill compact
345
/*
 * Build the message body for a kill-compact request addressed to one vgroup:
 * an SMsgHead (contLen and vgId in network byte order) followed by the
 * serialized SVKillCompactReq.
 *
 * On success returns a buffer allocated with taosMemoryMalloc() and stores its
 * total length in *pContLen; the caller takes over the buffer.
 * On failure returns NULL with terrno set and leaves *pContLen untouched.
 */
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
                                    int32_t dnodeid) {
  SVKillCompactReq req = {0};
  req.compactId = compactId;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeid;
  terrno = 0;

  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
  // First pass without a buffer: its result is used as the size of the serialized body.
  int32_t bodyLen = tSerializeSVKillCompactReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);
  // Only bodyLen bytes are writable after the header; advertising contLen would let the
  // encoder believe it has sizeof(SMsgHead) more bytes than were allocated.
  int32_t ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req);
  if (ret < 0) {
    // The buffer is not handed to anyone on this path, so it has to be freed here.
    taosMemoryFree(pReq);
    terrno = ret;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
380

381
/*
 * Append one TDMT_VND_KILL_COMPACT redo action to pTrans, addressed to the
 * endpoint set of dnode `dnodeid` and carrying the request built by
 * mndBuildKillCompactReq() for pVgroup.
 *
 * Returns 0 on success. If the dnode or the request cannot be obtained,
 * returns terrno when set, otherwise TSDB_CODE_SDB_OBJ_NOT_THERE. If the
 * append fails, the request buffer is freed and the append's code returned.
 */
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
                                       int32_t dnodeid) {
  int32_t      code = 0;
  int32_t      msgLen = 0;
  STransAction killAction = {0};

  /* Resolve where the action has to be sent. */
  SDnodeObj *pTarget = mndAcquireDnode(pMnode, dnodeid);
  if (pTarget == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }
  killAction.epSet = mndGetDnodeEpset(pTarget);
  mndReleaseDnode(pMnode, pTarget);

  /* Build the payload. */
  void *pMsg = mndBuildKillCompactReq(pMnode, pVgroup, &msgLen, compactId, dnodeid);
  if (pMsg == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }

  killAction.msgType = TDMT_VND_KILL_COMPACT;
  killAction.contLen = msgLen;
  killAction.pCont = pMsg;

  mTrace("trans:%d, kill compact msg len:%d", pTrans->id, msgLen);

  code = mndTransAppendRedoAction(pTrans, &killAction);
  if (code != 0) {
    taosMemoryFree(pMsg);
    TAOS_RETURN(code);
  }

  return 0;
}
416

417
/*
 * Create and prepare the "kill-compact" transaction for pCompact.
 *
 * The transaction carries the compact object itself as a commit log (status
 * SDB_STATUS_READY) and one kill-compact redo action per compact detail that
 * belongs to this compact.
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 * The local transaction handle is dropped on every path.
 */
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
  if (pTrans == NULL) {
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);

  mndTransSetDbName(pTrans, pCompact->dbname, NULL);

  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  // Set the status first and append last: until the append succeeds the raw is still ours,
  // so it is freed here on failure (same convention as mndAddCompactToTran) instead of leaking.
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }

  void *pIter = NULL;
  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == pCompact->compactId) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        // Capture the reason before the cleanup calls get a chance to touch terrno.
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        goto _OVER;
      }

      code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId);
      // The vgroup reference is no longer needed whether or not the action was added;
      // it was previously leaked on the failure path.
      if (code != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
      }
      mndReleaseVgroup(pMnode, pVgroup);
      if (code != 0) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        goto _OVER;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto _OVER;
  }

  mndTransDrop(pTrans);
  return 0;

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
498

499
/*
 * Handler for TDMT_MND_KILL_COMPACT.
 *
 * Deserializes the request, looks up the compact, checks the caller's
 * MND_OPER_COMPACT_DB privilege and starts the kill transaction. When the
 * transaction was started, an audit record is written and
 * TSDB_CODE_ACTION_IN_PROGRESS is returned.
 *
 * Returns TSDB_CODE_MND_INVALID_COMPACT_ID if the compact cannot be acquired,
 * or the code of the step that failed.
 */
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
  int32_t         code = 0;
  int32_t         lino = 0;
  int32_t         written = 0;
  SMnode         *pMnode = pReq->info.node;
  SCompactObj    *pCompact = NULL;
  SKillCompactReq killReq = {0};
  char            idStr[TSDB_INT32_ID_LEN] = {0};

  code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killReq);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill compact:%" PRId32, killReq.compactId);

  pCompact = mndAcquireCompact(pMnode, killReq.compactId);
  if (pCompact == NULL) {
    tFreeSKillCompactReq(&killReq);
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);

  code = TSDB_CODE_ACTION_IN_PROGRESS;

  /* Audit only when the id fits into the buffer without truncation. */
  written = snprintf(idStr, sizeof(idStr), "%d", pCompact->compactId);
  if ((uint32_t)written >= sizeof(idStr)) {
    mError("compact:%" PRId32 " failed to audit since %s", pCompact->compactId, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  } else {
    auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, idStr, killReq.sql, killReq.sqlLen);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to kill compact %" PRId32 " since %s", killReq.compactId, terrstr());
  }

  tFreeSKillCompactReq(&killReq);
  mndReleaseCompact(pMnode, pCompact);

  TAOS_RETURN(code);
}
542

543
// update progress
544
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
×
545
                                        SQueryCompactProgressRsp *rsp) {
546
  int32_t code = 0;
×
547

548
  void *pIter = NULL;
×
549
  while (1) {
×
550
    SCompactDetailObj *pDetail = NULL;
×
551
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
552
    if (pIter == NULL) break;
×
553

554
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
×
555
      pDetail->newNumberFileset = rsp->numberFileset;
×
556
      pDetail->newFinished = rsp->finished;
×
557
      pDetail->progress = rsp->progress;
×
558
      pDetail->remainingTime = rsp->remainingTime;
×
559

560
      sdbCancelFetch(pMnode->pSdb, pIter);
×
561
      sdbRelease(pMnode->pSdb, pDetail);
×
562

563
      TAOS_RETURN(code);
×
564
    }
565

566
    sdbRelease(pMnode->pSdb, pDetail);
×
567
  }
568

569
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
570
}
571

572
/*
 * Handler for TDMT_VND_QUERY_COMPACT_PROGRESS_RSP: decodes the vnode's
 * progress report and records it on the matching compact detail.
 *
 * Returns the response's own code if it is non-zero, the deserialization
 * error, or the result of mndUpdateCompactProgress().
 */
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
  SQueryCompactProgressRsp progress = {0};
  int32_t                  code = 0;

  if (pReq->code != 0) {
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &progress);
  if (code != 0) {
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
         progress.compactId, progress.vgId, progress.dnodeId, progress.numberFileset, progress.finished);

  code = mndUpdateCompactProgress(pReq->info.node, pReq, progress.compactId, &progress);
  if (code != 0) {
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
           progress.compactId, progress.vgId, progress.dnodeId, progress.numberFileset, progress.finished);
  }

  TAOS_RETURN(code);
}
600

601
// timer
602
/*
 * Send a TDMT_VND_QUERY_COMPACT_PROGRESS request for every compact detail that
 * belongs to pCompact, each to the dnode recorded on that detail.
 *
 * Best effort: a detail whose request cannot be built or sent is skipped.
 * The scan stops at the first detail whose dnode cannot be acquired.
 */
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
  void *pIter = NULL;

  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->compactId == pCompact->compactId) {
      SEpSet epSet = {0};

      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
      if (pDnode == NULL) {
        // Leaving the scan early: the iterator and the fetched detail must be given back,
        // as the other early exits over SDB_COMPACT_DETAIL in this file do.
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
      // The dnode is only needed to fill the ep set; release it on success and failure alike.
      mndReleaseDnode(pMnode, pDnode);
      if (epCode != 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SQueryCompactProgressReq req = {0};
      req.compactId = pDetail->compactId;
      req.vgId = pDetail->vgId;
      req.dnodeId = pDetail->dnodeId;

      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
      if (contLen < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      contLen += sizeof(SMsgHead);

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);

      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
        // The buffer has not been handed to the RPC layer yet, so it is still ours to free.
        rpcFreeCont(pHead);
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};

      rpcMsg.pCont = pHead;

      char    detail[1024] = {0};
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                              TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
      }

      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);

      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
        // Best effort: record the failure and move on to the next detail.
        mError("compact:%d, failed to send update progress msg to %s", pDetail->compactId, detail);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }
}
×
670

671
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
×
672
  int32_t code = 0;
×
673
  bool    needSave = false;
×
674
  void   *pIter = NULL;
×
675
  while (1) {
×
676
    SCompactDetailObj *pDetail = NULL;
×
677
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
678
    if (pIter == NULL) break;
×
679

680
    if (pDetail->compactId == compactId) {
×
681
      mDebug(
×
682
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
683
          "newNumberFileset:%d, newFinished:%d",
684
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
685
          pDetail->newNumberFileset, pDetail->newFinished);
686

687
      // these 2 number will jump back after dnode restart, so < is not used here
688
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
×
689
        needSave = true;
×
690
    }
691

692
    sdbRelease(pMnode->pSdb, pDetail);
×
693
  }
694

695
  char dbname[TSDB_TABLE_FNAME_LEN] = {0};
×
696
  TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));
×
697

698
  if (!mndDbIsExist(pMnode, dbname)) {
×
699
    needSave = true;
×
700
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
×
701
  }
702

703
  if (!needSave) {
×
704
    mDebug("compact:%" PRId32 ", no need to save", compactId);
×
705
    TAOS_RETURN(code);
×
706
  }
707

708
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
×
709
  if (pTrans == NULL) {
×
710
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
711
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
712
    if (terrno != 0) code = terrno;
×
713
    TAOS_RETURN(code);
×
714
  }
715
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
×
716

717
  mndTransSetDbName(pTrans, dbname, NULL);
×
718

719
  pIter = NULL;
×
720
  while (1) {
×
721
    SCompactDetailObj *pDetail = NULL;
×
722
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
723
    if (pIter == NULL) break;
×
724

725
    if (pDetail->compactId == compactId) {
×
726
      mInfo(
×
727
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
728
          "newNumberFileset:%d, newFinished:%d",
729
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
730
          pDetail->newNumberFileset, pDetail->newFinished);
731

732
      pDetail->numberFileset = pDetail->newNumberFileset;
×
733
      pDetail->finished = pDetail->newFinished;
×
734

735
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
×
736
      if (pCommitRaw == NULL) {
×
737
        sdbCancelFetch(pMnode->pSdb, pIter);
×
738
        sdbRelease(pMnode->pSdb, pDetail);
×
739
        mndTransDrop(pTrans);
×
740
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
741
        if (terrno != 0) code = terrno;
×
742
        TAOS_RETURN(code);
×
743
      }
744
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
745
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
×
746
        sdbCancelFetch(pMnode->pSdb, pIter);
×
747
        sdbRelease(pMnode->pSdb, pDetail);
×
748
        mndTransDrop(pTrans);
×
749
        TAOS_RETURN(code);
×
750
      }
751
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
×
752
        sdbCancelFetch(pMnode->pSdb, pIter);
×
753
        sdbRelease(pMnode->pSdb, pDetail);
×
754
        mndTransDrop(pTrans);
×
755
        TAOS_RETURN(code);
×
756
      }
757
    }
758

759
    sdbRelease(pMnode->pSdb, pDetail);
×
760
  }
761

762
  bool allFinished = true;
×
763
  pIter = NULL;
×
764
  while (1) {
×
765
    SCompactDetailObj *pDetail = NULL;
×
766
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
767
    if (pIter == NULL) break;
×
768

769
    if (pDetail->compactId == compactId) {
×
770
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
×
771
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
772

773
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
×
774
        allFinished = false;
×
775
        sdbCancelFetch(pMnode->pSdb, pIter);
×
776
        sdbRelease(pMnode->pSdb, pDetail);
×
777
        break;
×
778
      }
779
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
×
780
        allFinished = false;
×
781
        sdbCancelFetch(pMnode->pSdb, pIter);
×
782
        sdbRelease(pMnode->pSdb, pDetail);
×
783
        break;
×
784
      }
785
    }
786

787
    sdbRelease(pMnode->pSdb, pDetail);
×
788
  }
789

790
  if (!mndDbIsExist(pMnode, dbname)) {
×
791
    allFinished = true;
×
792
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
×
793
  }
794

795
  if (allFinished) {
×
796
    mInfo("compact:%d, all finished", compactId);
×
797
    pIter = NULL;
×
798
    while (1) {
×
799
      SCompactDetailObj *pDetail = NULL;
×
800
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
801
      if (pIter == NULL) break;
×
802

803
      if (pDetail->compactId == compactId) {
×
804
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
×
805
        if (pCommitRaw == NULL) {
×
806
          mndTransDrop(pTrans);
×
807
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
808
          if (terrno != 0) code = terrno;
×
809
          TAOS_RETURN(code);
×
810
        }
811
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
812
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
×
813
                 terrstr());
814
          sdbCancelFetch(pMnode->pSdb, pIter);
×
815
          sdbRelease(pMnode->pSdb, pDetail);
×
816
          mndTransDrop(pTrans);
×
817
          TAOS_RETURN(code);
×
818
        }
819
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
×
820
          sdbCancelFetch(pMnode->pSdb, pIter);
×
821
          sdbRelease(pMnode->pSdb, pDetail);
×
822
          mndTransDrop(pTrans);
×
823
          TAOS_RETURN(code);
×
824
        }
825
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
×
826
      }
827

828
      sdbRelease(pMnode->pSdb, pDetail);
×
829
    }
830

831
    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
×
832
    if (pCompact == NULL) {
×
833
      mndTransDrop(pTrans);
×
834
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
835
      if (terrno != 0) code = terrno;
×
836
      TAOS_RETURN(code);
×
837
    }
838
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
×
839
    mndReleaseCompact(pMnode, pCompact);
×
840
    if (pCommitRaw == NULL) {
×
841
      mndTransDrop(pTrans);
×
842
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
843
      if (terrno != 0) code = terrno;
×
844
      TAOS_RETURN(code);
×
845
    }
846
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
847
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
848
      mndTransDrop(pTrans);
×
849
      TAOS_RETURN(code);
×
850
    }
851
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
×
852
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
853
      mndTransDrop(pTrans);
×
854
      TAOS_RETURN(code);
×
855
    }
856
    mInfo("compact:%d, add drop compact action", pCompact->compactId);
×
857
  }
858

859
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
×
860
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
×
861
    mndTransDrop(pTrans);
×
862
    TAOS_RETURN(code);
×
863
  }
864

865
  mndTransDrop(pTrans);
×
866
  return 0;
×
867
}
868

869
/*
 * Walk every compact object stored in sdb and, for each one, send a progress
 * request and then save the compact progress.
 *
 * The work is done in two passes: the compact ids are first copied into a
 * temporary array while iterating sdb, and only afterwards is each compact
 * re-acquired by id and processed, so no sdb iterator is held while the
 * per-compact work runs.
 *
 * Failures are logged and never propagated; the function returns nothing.
 */
static void mndCompactPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pIdList = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
  if (pIdList == NULL) {
    return;
  }

  // Pass 1: snapshot the ids of all compact objects.
  void *pCursor = NULL;
  for (;;) {
    SCompactObj *pObj = NULL;
    pCursor = sdbFetch(pMnode->pSdb, SDB_COMPACT, pCursor, (void **)&pObj);
    if (pCursor == NULL) {
      break;
    }

    // A failed push only drops this id from the current round; keep iterating.
    if (taosArrayPush(pIdList, &pObj->compactId) == NULL) {
      mError("failed to push compact id:%d into array, but continue pull up", pObj->compactId);
    }
    sdbRelease(pSdb, pObj);
  }

  // Pass 2: re-acquire each compact by id and refresh its progress.
  for (int32_t idx = 0; idx < taosArrayGetSize(pIdList); ++idx) {
    mInfo("begin to pull up");
    int32_t     *pId = taosArrayGet(pIdList, idx);
    SCompactObj *pObj = mndAcquireCompact(pMnode, *pId);
    if (pObj == NULL) {
      // The compact could not be acquired (e.g. it no longer exists); skip it.
      continue;
    }

    mInfo("compact:%d, begin to pull up", pObj->compactId);
    mndCompactSendProgressReq(pMnode, pObj);

    int32_t code = mndSaveCompactProgress(pMnode, pObj->compactId);
    if (code != 0) {
      mError("compact:%d, failed to save compact progress since %s", pObj->compactId, tstrerror(code));
    }
    mndReleaseCompact(pMnode, pObj);
  }

  taosArrayDestroy(pIdList);
}
901
#ifdef TD_ENTERPRISE
902
/*
 * Emit an "autoCompactDB" audit record describing an automatically dispatched
 * compact of `pDb` over the time window `tw`.
 *
 * Nothing is recorded unless auditing is enabled (tsEnableAudit) and both the
 * monitor FQDN and the monitor port are configured.
 *
 * The audit detail is a synthesized SQL-like string. The window bounds are
 * rendered as UTC time strings when both can be formatted with the database
 * precision; otherwise the raw integer keys are printed.
 *
 * @param pMnode  mnode whose clusterId is attached to the record.
 * @param pReq    unused here; kept for the existing call signature.
 * @param pDb     database being compacted (name and precision are read).
 * @param tw      compact time window, expressed in the database precision.
 * @return always 0.
 */
static int32_t mndCompactDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
    return 0;
  }

  SName   name = {0};
  int32_t sqlLen = 0;
  char    sql[256] = {0};
  char    skeyStr[40] = {0};
  char    ekeyStr[40] = {0};
  char   *pDbName = pDb->name;  // fallback: the stored db name as-is

  // Prefer the parsed database part of the name when parsing succeeds.
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
    pDbName = name.dbname;
  }

  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
  } else {
    // Time formatting failed for at least one bound: fall back to raw keys.
    sqlLen = tsnprintf(sql, sizeof(sql), "compact db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
                       tw->ekey);
  }

  // Use the same resolved name as the SQL text above. Passing name.dbname
  // directly would record an empty target whenever tNameFromString failed.
  auditRecord(NULL, pMnode->clusterId, "autoCompactDB", pDbName, "", sql, sqlLen);

  return 0;
}
929

930
extern int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
                            bool metaOnly);

/*
 * Decide whether an automatic compact is due for one database at the current
 * time and, if so, dispatch it and write the audit record.
 *
 * The caller owns `pDb` and releases it after this function returns.
 *
 * @param pMnode  mnode handle passed through to mndCompactDb / audit.
 * @param pReq    timer message, forwarded to the audit helper only.
 * @param pDb     database to examine.
 * @param nowMs   current timestamp in milliseconds.
 * @param nowMin  nowMs / 60000, i.e. the current time in whole minutes.
 */
static void mndCompactDispatchOneDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, int64_t nowMs, int64_t nowMin) {
  // A non-positive interval means auto compact is not scheduled for this db.
  if (pDb->cfg.compactInterval <= 0) {
    mDebug("db:%p,%s, compact interval is %dm, skip", pDb, pDb->name, pDb->cfg.compactInterval);
    return;
  }

  // daysToKeep2 would be altered
  if (pDb->cfg.compactEndTime && (pDb->cfg.compactEndTime <= -pDb->cfg.daysToKeep2)) {
    mWarn("db:%p,%s, compact end time:%dm <= -keep2:%dm , skip", pDb, pDb->name, pDb->cfg.compactEndTime,
          -pDb->cfg.daysToKeep2);
    return;
  }

  // Window bounds as minute offsets relative to now; a zero config value
  // selects the default (-daysToKeep2 for the start, -daysPerFile for the end).
  int64_t startOffsetMin = pDb->cfg.compactStartTime;
  if (startOffsetMin == 0) {
    startOffsetMin = -pDb->cfg.daysToKeep2;
  }
  int64_t endOffsetMin = pDb->cfg.compactEndTime;
  if (endOffsetMin == 0) {
    endOffsetMin = -pDb->cfg.daysPerFile;
  }

  if (startOffsetMin >= endOffsetMin) {
    mDebug("db:%p,%s, compact start time:%" PRIi64 "m >= end time:%" PRIi64 "m, skip", pDb, pDb->name,
           startOffsetMin, endOffsetMin);
    return;
  }

  // Only fire on minutes aligned to the interval, after shifting by the
  // configured offset (hours converted to minutes).
  int64_t shiftedMin = nowMin - (int64_t)pDb->cfg.compactTimeOffset * 60LL;
  int64_t phase = shiftedMin % pDb->cfg.compactInterval;
  if (phase != 0) {
    mDebug("db:%p,%s, current time:%" PRIi64 "m is not divisible by compact interval:%dm, offset:%" PRIi8
           "h, remainder:%" PRIi64 "m, skip",
           pDb, pDb->name, nowMin, pDb->cfg.compactInterval, pDb->cfg.compactTimeOffset, phase);
    return;
  }

  // Skip when the db's recorded compact start time falls in this same minute.
  if ((pDb->compactStartTime / 60000LL) == nowMin) {
    mDebug("db:%p:%s, compact has already been dispatched at %" PRIi64 "m(%" PRIi64 "ms), skip", pDb, pDb->name,
           nowMin, pDb->compactStartTime);
    return;
  }

  // Turn the minute offsets into absolute keys in the database's precision.
  STimeWindow window = {0};
  window.skey = convertTimePrecision(nowMs + startOffsetMin * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision);
  window.ekey = convertTimePrecision(nowMs + endOffsetMin * 60000LL, TSDB_TIME_PRECISION_MILLI, pDb->cfg.precision);

  int32_t code = mndCompactDb(pMnode, NULL, pDb, window, NULL, false);
  if (code != 0) {
    mWarn("db:%p,%s, failed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
          "m, end:%" PRIi64 "m, offset:%" PRIi8 "h, since %s",
          pDb, pDb->name, window.skey, window.ekey, pDb->cfg.compactInterval, startOffsetMin, endOffsetMin,
          pDb->cfg.compactTimeOffset, tstrerror(code));
  } else {
    mInfo("db:%p,%s, succeed to dispatch compact with range:[%" PRIi64 ",%" PRIi64 "], interval:%dm, start:%" PRIi64
          "m, end:%" PRIi64 "m, offset:%" PRIi8 "h",
          pDb, pDb->name, window.skey, window.ekey, pDb->cfg.compactInterval, startOffsetMin, endOffsetMin,
          pDb->cfg.compactTimeOffset);
  }

  // The audit record is written whether or not the dispatch succeeded.
  TAOS_UNUSED(mndCompactDispatchAudit(pMnode, pReq, pDb, &window));
}

/*
 * Iterate over every database in sdb and dispatch an automatic compact for
 * those whose schedule is due at the current time.
 *
 * Per-database failures are logged by the helper and do not stop the scan.
 *
 * @param pReq  timer message; pReq->info.node supplies the mnode.
 * @return always 0.
 */
static int32_t mndCompactDispatch(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;

  // Sample the clock once so every database is judged against the same time.
  int64_t nowMs = taosGetTimestampMs();
  int64_t nowMin = nowMs / 60000LL;

  void   *pCursor = NULL;
  SDbObj *pDb = NULL;
  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_DB, pCursor, (void **)&pDb);
    if (pCursor == NULL) {
      break;
    }

    mndCompactDispatchOneDb(pMnode, pReq, pDb, nowMs, nowMin);
    sdbRelease(pSdb, pDb);
  }

  return 0;
}
1004
#endif
1005

1006
/*
 * Handler for the compact timer message.
 *
 * When built with TD_ENTERPRISE it first pulls up the progress of existing
 * compacts and then runs the automatic compact dispatch; the dispatch result
 * is deliberately ignored. Without TD_ENTERPRISE it does nothing.
 *
 * @param pReq  timer message; pReq->info.node supplies the mnode.
 * @return always 0.
 */
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
#ifdef TD_ENTERPRISE
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process compact timer");
  mndCompactPullup(pMnode);
  TAOS_UNUSED(mndCompactDispatch(pReq));
#endif
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc