• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4791

13 Oct 2025 06:50AM UTC coverage: 57.628% (-0.8%) from 58.476%
#4791

push

travis-ci

web-flow
Merge pull request #33213 from taosdata/fix/huoh/timemoe_model_directory

fix: fix tdgpt timemoe model directory

136628 of 303332 branches covered (45.04%)

Branch coverage included in aggregate %.

208121 of 294900 relevant lines covered (70.57%)

4250784.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.94
/source/dnode/mnode/impl/src/mndRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndRetention.h"
16
#include "audit.h"
17
#include "mndCompact.h"
18
#include "mndCompactDetail.h"
19
#include "mndDb.h"
20
#include "mndDef.h"
21
#include "mndDnode.h"
22
#include "mndPrivilege.h"
23
#include "mndRetentionDetail.h"
24
#include "mndShow.h"
25
#include "mndTrans.h"
26
#include "mndVgroup.h"
27
#include "tmisce.h"
28
#include "tmsgcb.h"
29

30
#define MND_RETENTION_VER_NUMBER 1
31

32
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq);
33
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq);
34
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
35
static void    mndCancelRetrieveRetention(SMnode *pMnode, void *pIter);
36

37
/**
38
 * @brief mndInitRetention
39
 *  init retention module.
40
 *  - trim is equivalent to retention
41
 * @param pMnode
42
 * @return
43
 */
44

45
int32_t mndInitRetention(SMnode *pMnode) {
  // Codec and life-cycle callbacks for rows of the SDB_RETENTION table (int32 key).
  SSdbTable retentionTable = {
      .sdbType = SDB_RETENTION,
      .keyType = SDB_KEY_INT32,
      .encodeFp = (SdbEncodeFp)mndRetentionActionEncode,
      .decodeFp = (SdbDecodeFp)mndRetentionActionDecode,
      .insertFp = (SdbInsertFp)mndRetentionActionInsert,
      .updateFp = (SdbUpdateFp)mndRetentionActionUpdate,
      .deleteFp = (SdbDeleteFp)mndRetentionActionDelete,
  };

  // SHOW support: row retrieval plus the iterator-cancel hook.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndRetrieveRetention);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndCancelRetrieveRetention);

  // Message handlers; the TRIM message family is reused for retention.
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRIM, mndProcessKillRetentionReq);  // trim is equivalent to retention
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_TRIM_PROGRESS_RSP, mndProcessQueryRetentionRsp);
  mndSetMsgHandle(pMnode, TDMT_MND_QUERY_TRIM_TIMER, mndProcessQueryRetentionTimer);
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_TRIM_RSP, mndTransProcessRsp);

  return sdbSetTable(pMnode->pSdb, retentionTable);
}
66

67
/**
 * @brief Tear down the retention module. Nothing is released here; only a debug line is logged.
 * @param pMnode unused
 */
void mndCleanupRetention(SMnode *pMnode) {
  (void)pMnode;
  mDebug("mnd retention cleanup");
}
1,332✔
68

69
/**
 * @brief Release resources owned by a retention object. Intentionally a no-op in this file.
 * @param pObj unused
 */
void tFreeRetentionObj(SRetentionObj *pObj) {
  (void)pObj;
}
×
70

71
int32_t tSerializeSRetentionObj(void *buf, int32_t bufLen, const SRetentionObj *pObj) {
×
72
  return tSerializeSCompactObj(buf, bufLen, (const SCompactObj *)pObj);
×
73
}
74

75
/**
 * @brief Deserialize a retention object by delegating to the compact-object deserializer.
 *
 * NOTE(review): the cast assumes SRetentionObj and SCompactObj share the same layout — confirm.
 *
 * @param buf    source buffer
 * @param bufLen number of bytes available in buf
 * @param pObj   object to fill
 * @return whatever tDeserializeSCompactObj returns
 */
int32_t tDeserializeSRetentionObj(void *buf, int32_t bufLen, SRetentionObj *pObj) {
  SCompactObj *pCompact = (SCompactObj *)pObj;
  return tDeserializeSCompactObj(buf, bufLen, pCompact);
}
78

79
/**
 * @brief Encode a retention object into an SDB raw record.
 *
 * Raw payload layout: an int32 length followed by that many bytes produced by
 * tSerializeSCompactObj.
 *
 * @param pObj object to encode
 * @return the new raw record, or NULL (with terrno set) on failure
 */
SSdbRaw *mndRetentionActionEncode(SRetentionObj *pObj) {
  // NOTE(review): code/lino are not referenced directly here; kept in case the SDB_SET_* macros use them — confirm.
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  encLen = 0;
  int32_t  rawSize = 0;
  int32_t  dataPos = 0;
  void    *pEncoded = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  // Dry run with a NULL buffer to learn the encoded size.
  encLen = tSerializeSCompactObj(NULL, 0, pObj);
  if (encLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Room for the int32 length prefix plus the payload.
  rawSize = sizeof(int32_t) + encLen;
  pRaw = sdbAllocRaw(SDB_RETENTION, MND_RETENTION_VER_NUMBER, rawSize);
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pEncoded = taosMemoryMalloc(encLen);
  if (pEncoded == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  // Real serialization into the scratch buffer.
  encLen = tSerializeSCompactObj(pEncoded, encLen, pObj);
  if (encLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, encLen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, pEncoded, encLen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  // The scratch buffer is always released; the raw is released only on failure.
  taosMemoryFreeClear(pEncoded);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("retention:%" PRId32 ", failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("retention:%" PRId32 ", encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRaw;
}
128

129
/**
 * @brief Decode an SDB raw record into a row holding an SRetentionObj.
 *
 * Expected payload: an int32 length followed by that many bytes understood by
 * tDeserializeSCompactObj.
 *
 * @param pRaw raw record to decode
 * @return the decoded row, or NULL on failure
 */
SSdbRow *mndRetentionActionDecode(SSdbRaw *pRaw) {
  // NOTE(review): code/lino are not referenced directly here; kept in case the SDB_GET_* macros use them — confirm.
  int32_t        code = 0;
  int32_t        lino = 0;
  SSdbRow       *pRow = NULL;
  SRetentionObj *pObj = NULL;
  void          *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto OVER;
  }

  if (sver != MND_RETENTION_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("retention read invalid ver, data ver: %d, curr ver: %d", sver, MND_RETENTION_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SRetentionObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  int32_t tlen = 0;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSCompactObj(buf, tlen, pObj)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  // pObj is still NULL for every failure that happens before sdbGetRowObj succeeds, so it
  // must not be dereferenced unconditionally. A missing object is also treated as a
  // failure even if terrno was left untouched by the failing call.
  if (terrno != TSDB_CODE_SUCCESS || pObj == NULL) {
    int32_t failedId = (pObj != NULL) ? pObj->id : -1;
    mError("retention:%" PRId32 ", failed to decode from raw:%p since %s", failedId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("retention:%" PRId32 ", decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}
185

186
/**
 * @brief SDB insert callback for retention rows; only traces the event.
 * @param pSdb unused
 * @param pObj row being inserted
 * @return always 0
 */
int32_t mndRetentionActionInsert(SSdb *pSdb, SRetentionObj *pObj) {
  (void)pSdb;
  const int32_t retentionId = pObj->id;
  mTrace("retention:%" PRId32 ", perform insert action", retentionId);
  return 0;
}
190

191
/**
 * @brief SDB delete callback for retention rows; traces the event and frees the
 *        object through tFreeCompactObj.
 * @param pSdb unused
 * @param pObj row being deleted
 * @return always 0
 */
int32_t mndRetentionActionDelete(SSdb *pSdb, SRetentionObj *pObj) {
  (void)pSdb;
  const int32_t retentionId = pObj->id;
  mTrace("retention:%" PRId32 ", perform delete action", retentionId);
  tFreeCompactObj(pObj);
  return 0;
}
196

197
/**
 * @brief SDB update callback for retention rows; only traces the event, no field is copied.
 * @param pSdb    unused
 * @param pOldObj row currently stored
 * @param pNewObj incoming row
 * @return always 0
 */
int32_t mndRetentionActionUpdate(SSdb *pSdb, SRetentionObj *pOldObj, SRetentionObj *pNewObj) {
  (void)pSdb;
  const int32_t retentionId = pOldObj->id;
  mTrace("retention:%" PRId32 ", perform update action, old row:%p new row:%p", retentionId, pOldObj, pNewObj);
  return 0;
}
202

203
/**
 * @brief Look up a retention object by id in the SDB.
 *
 * When the lookup fails with anything other than OBJ_NOT_THERE / OBJ_CREATING /
 * OBJ_DROPPING, terrno is replaced by TSDB_CODE_APP_ERROR and an error is logged.
 *
 * @param pMnode mnode owning the SDB
 * @param id     retention id
 * @return the object from sdbAcquire (callers in this file pair it with mndReleaseRetention), or NULL
 */
SRetentionObj *mndAcquireRetention(SMnode *pMnode, int32_t id) {
  SRetentionObj *pObj = sdbAcquire(pMnode->pSdb, SDB_RETENTION, &id);
  if (pObj != NULL) {
    return pObj;
  }

  const bool isKnownState = (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) || (terrno == TSDB_CODE_SDB_OBJ_CREATING) ||
                            (terrno == TSDB_CODE_SDB_OBJ_DROPPING);
  if (!isKnownState) {
    terrno = TSDB_CODE_APP_ERROR;
    mError("retention:%" PRId32 ", failed to acquire retention since %s", id, terrstr());
  }
  return NULL;
}
213

214
/**
 * @brief Hand a retention object back to the SDB via sdbRelease.
 * @param pMnode mnode owning the SDB
 * @param pObj   object to release
 */
void mndReleaseRetention(SMnode *pMnode, SRetentionObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
88✔
218

219
/**
 * @brief Copy the database name of retention `id` into `dbname`.
 *
 * @param pMnode mnode owning the SDB
 * @param id     retention id
 * @param dbname destination buffer
 * @param len    size passed to tstrncpy for the copy
 * @return 0 on success; otherwise terrno if it is set, else TSDB_CODE_MND_RETURN_VALUE_NULL
 */
int32_t mndRetentionGetDbName(SMnode *pMnode, int32_t id, char *dbname, int32_t len) {
  int32_t        code = 0;
  SRetentionObj *pRetention = mndAcquireRetention(pMnode, id);

  if (pRetention != NULL) {
    tstrncpy(dbname, pRetention->dbname, len);
    mndReleaseRetention(pMnode, pRetention);
    TAOS_RETURN(code);
  }

  // Lookup failed: prefer the specific error left in terrno.
  code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  TAOS_RETURN(code);
}
232

233
/**
 * @brief Initialise a new retention object and append it to a transaction's prepare log.
 *
 * Fills in id, dbname and startTime on pObj, encodes it, appends the raw to the
 * prepare log with status READY, and reports the generated id through rsp.
 *
 * @param pMnode unused here
 * @param pTrans transaction receiving the prepare log
 * @param pObj   retention object to initialise
 * @param pDb    database whose name is copied into pObj
 * @param rsp    receives the generated retention id
 * @return 0 on success, an error code otherwise
 */
int32_t mndAddRetentionToTrans(SMnode *pMnode, STrans *pTrans, SRetentionObj *pObj, SDbObj *pDb, STrimDbRsp *rsp) {
  int32_t code = 0;

  // Stamp the task: fresh id, owning database, start time.
  pObj->id = tGenIdPI32();
  tstrncpy(pObj->dbname, pDb->name, sizeof(pObj->dbname));
  pObj->startTime = taosGetTimestampMs();

  SSdbRaw *pRetentionRaw = mndRetentionActionEncode(pObj);
  if (pRetentionRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }

  code = mndTransAppendPrepareLog(pTrans, pRetentionRaw);
  if (code == 0) {
    code = sdbSetRawStatus(pRetentionRaw, SDB_STATUS_READY);
  }
  if (code != 0) {
    // NOTE(review): the raw is freed here even when the append already succeeded. Other
    // append sites in this file do not free the raw after a successful append — confirm
    // who owns it at that point.
    sdbFreeRaw(pRetentionRaw);
    TAOS_RETURN(code);
  }

  rsp->id = pObj->id;
  return 0;
}
261

262
/**
 * @brief SHOW retrieve handler: fill pBlock with up to `rows` retention rows.
 *
 * Columns written per row, in order: retention id, database name, start time,
 * trigger type ("manual"/"auto"), operation type ("trim"/"ssmigrate"/"rollup").
 *
 * @param pReq   request; pReq->info.node is the mnode
 * @param pShow  show context holding the iterator and the db filter string
 * @param pBlock destination data block
 * @param rows   maximum number of rows to produce
 * @return the number of rows written; terrno if the database named in pShow->db cannot be acquired
 */
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  // NOTE: pColInfo, numOfRows, code and pSdb keep their names because the
  // COL_DATA_SET_VAL_GOTO macro may reference them implicitly — confirm before renaming.
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  int32_t        numOfRows = 0;
  SRetentionObj *pObj = NULL;
  char          *sep = NULL;
  SDbObj        *pDb = NULL;
  int32_t        code = 0, lino = 0;
  char           tmpBuf[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};

  if (strlen(pShow->db) > 0) {
    sep = strchr(pShow->db, '.');
    if (sep &&
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
      sep++;
    } else {
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) return terrno;
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_RETENTION, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) break;

    SColumnInfoData *pColInfo;
    int32_t          cols = 0;

    // Column: retention id.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->id, false, pObj, pShow->pIter, _OVER);

    // Column: database name.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL || !IS_SYS_DBNAME(pObj->dbname)) {
      SName name = {0};
      code = tNameFromString(&name, pObj->dbname, T_NAME_ACCT | T_NAME_DB);
      if (code != 0) {
        lino = __LINE__;
        // The fetched row must be handed back before bailing out, otherwise it stays
        // referenced. pShow->pIter is left in place.
        // NOTE(review): presumably the iterator is cancelled later through the free-iter
        // handler registered for this table — confirm.
        sdbRelease(pSdb, pObj);
        goto _OVER;
      }
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    } else {
      tstrncpy(varDataVal(tmpBuf), pObj->dbname, sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
    }
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);

    // Column: start time.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->startTime, false, pObj, pShow->pIter, _OVER);

    // Column: trigger type.
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    tstrncpy(varDataVal(tmpBuf), pObj->triggerType == TSDB_TRIGGER_MANUAL ? "manual" : "auto",
             sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);

    // Column: operation type; anything that is neither ssmigrate nor rollup is shown as "trim".
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    char *optr = "trim";
    if (pObj->optrType == TSDB_OPTR_SSMIGRATE) {
      optr = "ssmigrate";
    } else if (pObj->optrType == TSDB_OPTR_ROLLUP) {
      optr = "rollup";
    }
    tstrncpy(varDataVal(tmpBuf), optr, sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);

    sdbRelease(pSdb, pObj);
    ++numOfRows;
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
  pShow->numOfRows += numOfRows;
  mndReleaseDb(pMnode, pDb);
  return numOfRows;
}
334

335
/**
 * @brief SHOW free-iter handler: cancel an in-flight SDB_RETENTION fetch.
 * @param pMnode mnode owning the SDB
 * @param pIter  iterator to cancel
 */
static void mndCancelRetrieveRetention(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_RETENTION);
}
×
339

340
/**
 * @brief Build the kill-retention message for one vgroup.
 *
 * The returned buffer is an SMsgHead (contLen and vgId in network byte order)
 * followed by the payload written by tSerializeSVKillCompactReq.
 *
 * @param pMnode   unused here
 * @param pVgroup  target vgroup
 * @param pContLen receives the total message length (head + payload) on success
 * @param id       retention task id
 * @param dnodeId  target dnode id
 * @return buffer from taosMemoryMalloc (freed by the caller in this file on append
 *         failure), or NULL with terrno set
 */
static void *mndBuildKillRetentionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t id, int32_t dnodeId) {
  SVKillRetentionReq req = {0};
  req.taskId = id;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeId;
  terrno = 0;

  mInfo("vgId:%d, build kill retention vnode config req", pVgroup->vgId);
  // Dry run with a NULL buffer to learn the payload size.
  int32_t bodyLen = tSerializeSVKillCompactReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  mTrace("vgId:%d, build compact vnode config req, contLen:%d", pVgroup->vgId, contLen);
  // Only bodyLen bytes remain after the head, so that is the capacity handed to the
  // serializer (passing contLen would overstate the buffer by sizeof(SMsgHead)).
  int32_t ret = 0;
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req)) < 0) {
    taosMemoryFreeClear(pReq);
    terrno = ret;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
375

376
/**
 * @brief Append one TDMT_VND_KILL_TRIM redo action, addressed to `dnodeId`, to a transaction.
 *
 * @param pMnode  mnode used to resolve the dnode endpoint
 * @param pTrans  transaction receiving the redo action
 * @param pVgroup vgroup the kill request targets
 * @param id      retention task id
 * @param dnodeId dnode that should receive the request
 * @return 0 on success, an error code otherwise
 */
static int32_t mndAddKillRetentionAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t id, int32_t dnodeId) {
  int32_t      code = 0;
  int32_t      reqLen = 0;
  void        *pKillReq = NULL;
  STransAction action = {0};

  // Resolve the endpoint set; the dnode reference is only needed for that.
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pDnode == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }
  action.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  pKillReq = mndBuildKillRetentionReq(pMnode, pVgroup, &reqLen, id, dnodeId);
  if (pKillReq == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(code);
  }

  action.pCont = pKillReq;
  action.contLen = reqLen;
  action.msgType = TDMT_VND_KILL_TRIM;

  mTrace("trans:%d, kill retention msg len:%d", pTrans->id, reqLen);

  code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    // The action was not appended, so the request buffer is freed here.
    taosMemoryFree(pKillReq);
    TAOS_RETURN(code);
  }

  return 0;
}
410

411
/**
 * @brief Create and prepare the "kill-retention" transaction for one retention task.
 *
 * The transaction carries the retention object as a READY commit log plus one
 * kill redo action for every SDB_RETENTION_DETAIL row whose id matches pObj->id.
 *
 * @param pMnode mnode
 * @param pReq   originating request, handed to mndTransCreate
 * @param pObj   retention task to kill
 * @return 0 on success, an error code otherwise
 */
static int32_t mndKillRetention(SMnode *pMnode, SRpcMsg *pReq, SRetentionObj *pObj) {
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-retention");
  if (pTrans == NULL) {
    mError("retention:%" PRId32 ", failed to drop since %s", pObj->id, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill retention:%" PRId32, pTrans->id, pObj->id);

  mndTransSetDbName(pTrans, pObj->dbname, NULL);

  SSdbRaw *pCommitRaw = mndRetentionActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  // One kill action per detail row that belongs to this retention task.
  void *pIter = NULL;
  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == pObj->id) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }

      if ((code = mndAddKillRetentionAction(pMnode, pTrans, pVgroup, pObj->id, pDetail->dnodeId)) != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        // The vgroup acquired above must be released on this exit path as well.
        mndReleaseVgroup(pMnode, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }

      mndReleaseVgroup(pMnode, pVgroup);
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
482

483
/**
 * @brief Handler for TDMT_MND_KILL_TRIM: kill a running retention task.
 *
 * Deserializes the request, looks up the task, checks MND_OPER_TRIM_DB
 * privilege, starts the kill transaction and writes an audit record.
 *
 * @param pReq incoming request
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the kill transaction is started,
 *         TSDB_CODE_MND_INVALID_TRIM_ID if the task cannot be acquired, or another error code
 */
int32_t mndProcessKillRetentionReq(SRpcMsg *pReq) {
  int32_t           code = 0;
  int32_t           lino = 0;
  SKillRetentionReq req = {0};  // reuse SKillCompactReq

  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &req)) != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill retention:%" PRId32, req.id);

  SMnode        *pMnode = pReq->info.node;
  SRetentionObj *pObj = mndAcquireRetention(pMnode, req.id);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_INVALID_TRIM_ID;
    tFreeSKillCompactReq((SKillCompactReq *)&req);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_TRIM_DB), &lino, _OVER);

  TAOS_CHECK_GOTO(mndKillRetention(pMnode, pReq, pObj), &lino, _OVER);

  code = TSDB_CODE_ACTION_IN_PROGRESS;

  // Audit the kill; the retention id is rendered as text for the audit record.
  char    obj[TSDB_INT32_ID_LEN] = {0};
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pObj->id);
  if ((uint32_t)nBytes < sizeof(obj)) {
    auditRecord(pReq, pMnode->clusterId, "killRetention", pObj->dbname, obj, req.sql, req.sqlLen);
  } else {
    mError("retention:%" PRId32 " failed to audit since %s", pObj->id, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  }
_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // Report the code that actually failed rather than whatever terrno currently holds.
    mError("failed to kill retention %" PRId32 " since %s", req.id, tstrerror(code));
  }

  tFreeSKillCompactReq((SKillCompactReq *)&req);
  mndReleaseRetention(pMnode, pObj);

  TAOS_RETURN(code);
}
525

526
// update progress
527
/**
 * @brief Copy a vnode progress report into the matching in-memory retention detail row.
 *
 * The row is matched on (id, rsp->vgId, rsp->dnodeId); only the first match is updated.
 *
 * @param pMnode mnode owning the SDB
 * @param pReq   unused here
 * @param id     retention task id
 * @param rsp    progress report from the vnode
 * @return 0 when a row was updated, TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST when none matched
 */
static int32_t mndUpdateRetentionProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t id, SQueryRetentionProgressRsp *rsp) {
  int32_t code = 0;
  void   *pCursor = NULL;

  for (;;) {
    SRetentionDetailObj *pItem = NULL;
    pCursor = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pCursor, (void **)&pItem);
    if (pCursor == NULL) {
      break;
    }

    const bool isTarget = (pItem->id == id) && (pItem->vgId == rsp->vgId) && (pItem->dnodeId == rsp->dnodeId);
    if (!isTarget) {
      sdbRelease(pMnode->pSdb, pItem);
      continue;
    }

    // Stage the reported values in the new* fields and refresh the progress figures.
    pItem->newNumberFileset = rsp->numberFileset;
    pItem->newFinished = rsp->finished;
    pItem->progress = rsp->progress;
    pItem->remainingTime = rsp->remainingTime;

    // Leaving the scan early: cancel the iterator before releasing the row.
    sdbCancelFetch(pMnode->pSdb, pCursor);
    sdbRelease(pMnode->pSdb, pItem);

    TAOS_RETURN(code);
  }

  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
}
553

554
/**
 * @brief Handler for TDMT_VND_QUERY_TRIM_PROGRESS_RSP: record a vnode's progress report.
 *
 * @param pReq response message from the vnode
 * @return pReq->code if the vnode reported an error, the deserialization error if
 *         decoding fails, otherwise the result of mndUpdateRetentionProgress
 */
int32_t mndProcessQueryRetentionRsp(SRpcMsg *pReq) {
  int32_t                    code = 0;
  SQueryRetentionProgressRsp rsp = {0};

  if (pReq->code != 0) {
    mError("received wrong retention response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &rsp);
  if (code != 0) {
    mError("failed to deserialize vnode-query-retention-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("retention:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", rsp.id, rsp.vgId,
         rsp.dnodeId, rsp.numberFileset, rsp.finished);

  SMnode *pMnode = pReq->info.node;

  code = mndUpdateRetentionProgress(pMnode, pReq, rsp.id, &rsp);
  if (code != 0) {
    mError("retention:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", rsp.id,
           rsp.vgId, rsp.dnodeId, rsp.numberFileset, rsp.finished);
  }

  // Success and failure both return through TAOS_RETURN with the update result.
  TAOS_RETURN(code);
}
582

583
// timer
584
/**
 * @brief Send a TDMT_VND_QUERY_TRIM_PROGRESS request for every detail row of a retention task.
 *
 * Scans SDB_RETENTION_DETAIL and, for each row whose id equals pObj->id, sends a
 * progress query to the dnode recorded in that row. A failure on one row skips
 * that row; a dnode that cannot be acquired stops the scan.
 *
 * @param pMnode mnode
 * @param pObj   retention task whose progress is queried
 */
void mndRetentionSendProgressReq(SMnode *pMnode, SRetentionObj *pObj) {
  void *pIter = NULL;

  while (1) {
    SRetentionDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == pObj->id) {
      SEpSet epSet = {0};

      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
      if (pDnode == NULL) {
        // Leaving the scan early: cancel the iterator and hand the row back first.
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
      // The dnode is only needed to fill the endpoint set; release it on every path.
      mndReleaseDnode(pMnode, pDnode);
      if (epCode != 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SQueryRetentionProgressReq req = {0};
      req.id = pDetail->id;
      req.vgId = pDetail->vgId;
      req.dnodeId = pDetail->dnodeId;

      // Dry run with a NULL buffer to learn the payload size.
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
      if (contLen < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      contLen += sizeof(SMsgHead);

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);

      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
        // The message is never sent, so the rpc buffer has to be returned here.
        rpcFreeCont(pHead);
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_TRIM_PROGRESS, .contLen = contLen};

      rpcMsg.pCont = pHead;

      // Human-readable endpoint list, used only for the debug log below.
      char    detail[1024] = {0};
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                              TMSG_INFO(TDMT_VND_QUERY_TRIM_PROGRESS), epSet.numOfEps, epSet.inUse);
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
      }

      mDebug("retention:%d, send update progress msg to %s", pDetail->id, detail);

      // NOTE(review): pHead is not freed when the send fails; presumably tmsgSendReq
      // disposes of the content itself in that case — confirm.
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
        sdbRelease(pMnode->pSdb, pDetail);
        continue;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }
}
35✔
652

653
/**
 * @brief Persist the staged progress of one retention task and drop it once it is done.
 *
 * Steps:
 *  1. Scan the task's detail rows; a save is needed when any staged value
 *     (newNumberFileset / newFinished) differs from the stored one, or when the
 *     task's database no longer exists.
 *  2. In an "update-retention-progress" transaction, copy the staged values into
 *     the stored fields and append every detail row as a READY commit log.
 *  3. If every detail row is finished (or the database is gone), additionally
 *     append DROPPED commit logs for all detail rows and for the retention object.
 *
 * @param pMnode mnode
 * @param id     retention task id
 * @return 0 on success or when nothing needs saving, an error code otherwise
 */
static int32_t mndSaveRetentionProgress(SMnode *pMnode, int32_t id) {
  int32_t code = 0;
  bool    needSave = false;
  void   *pIter = NULL;
  while (1) {
    SRetentionDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == id) {
      mDebug(
          "retention:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      // these 2 number will jump back after dnode restart, so < is not used here
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
        needSave = true;
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  char dbname[TSDB_TABLE_FNAME_LEN] = {0};
  TAOS_CHECK_RETURN(mndRetentionGetDbName(pMnode, id, dbname, TSDB_TABLE_FNAME_LEN));

  if (!mndDbIsExist(pMnode, dbname)) {
    needSave = true;
    mWarn("retention:%" PRId32 ", no db exist, set needSave:%s", id, dbname);
  }

  if (!needSave) {
    mDebug("retention:%" PRId32 ", no need to save", id);
    TAOS_RETURN(code);
  }

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-retention-progress");
  if (pTrans == NULL) {
    // pTrans is NULL here, so the log identifies the retention task instead of a trans id.
    mError("retention:%" PRId32 ", failed to create trans since %s", id, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("retention:%d, trans:%d, used to update retention progress.", id, pTrans->id);

  mndTransSetDbName(pTrans, dbname, NULL);

  // Promote the staged values and record every detail row of this task as READY.
  pIter = NULL;
  while (1) {
    SRetentionDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == id) {
      mInfo(
          "retention:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      pDetail->numberFileset = pDetail->newNumberFileset;
      pDetail->finished = pDetail->newFinished;

      // Rows come from SDB_RETENTION_DETAIL, so they are encoded with the retention-detail
      // encoder, the same one the drop loop below uses for these rows.
      // NOTE(review): this call previously used mndCompactDetailActionEncode — confirm
      // that was a copy-paste slip and not intentional.
      SSdbRaw *pCommitRaw = mndRetentionDetailActionEncode(pDetail);
      if (pCommitRaw == NULL) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        TAOS_RETURN(code);
      }
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
        mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id, terrstr());
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        mndTransDrop(pTrans);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  // The task is finished only when no detail row is still unreported (-1/-1) or
  // has numberFileset != finished.
  bool allFinished = true;
  pIter = NULL;
  while (1) {
    SRetentionDetailObj *pDetail = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == id) {
      mInfo("retention:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
            pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);

      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
        allFinished = false;
        sdbCancelFetch(pMnode->pSdb, pIter);
        sdbRelease(pMnode->pSdb, pDetail);
        break;
      }
    }

    sdbRelease(pMnode->pSdb, pDetail);
  }

  if (!mndDbIsExist(pMnode, dbname)) {
    allFinished = true;
    mWarn("retention:%" PRId32 ", no db exist, set all finished:%s", id, dbname);
  }

  if (allFinished) {
    mInfo("retention:%d, all finished", id);
    pIter = NULL;
    while (1) {
      SRetentionDetailObj *pDetail = NULL;
      pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
      if (pIter == NULL) break;

      if (pDetail->id == id) {
        SSdbRaw *pCommitRaw = mndRetentionDetailActionEncode(pDetail);
        if (pCommitRaw == NULL) {
          // Same cleanup as the other early exits in this loop: cancel the iterator
          // and hand the row back before dropping the transaction.
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          if (terrno != 0) code = terrno;
          TAOS_RETURN(code);
        }
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
          mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id,
                 tstrerror(code));
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
          sdbCancelFetch(pMnode->pSdb, pIter);
          sdbRelease(pMnode->pSdb, pDetail);
          mndTransDrop(pTrans);
          TAOS_RETURN(code);
        }
        mInfo("retention:%d, add drop compactdetail action", pDetail->compactDetailId);
      }

      sdbRelease(pMnode->pSdb, pDetail);
    }

    SRetentionObj *pObj = mndAcquireRetention(pMnode, id);
    if (pObj == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    SSdbRaw *pCommitRaw = mndRetentionActionEncode(pObj);
    mndReleaseRetention(pMnode, pObj);
    if (pCommitRaw == NULL) {
      mndTransDrop(pTrans);
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
      mndTransDrop(pTrans);
      TAOS_RETURN(code);
    }
    // pObj was released above; log the id it was acquired by instead of reading it again.
    mInfo("retention:%d, add drop compact action", id);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("retention:%d, trans:%d, failed to prepare since %s", id, pTrans->id, tstrerror(code));
    mndTransDrop(pTrans);
    TAOS_RETURN(code);
  }

  mndTransDrop(pTrans);
  return 0;
}
850

851
/**
 * @brief Pull up every retention object currently present in the sdb.
 *
 * The ids of all SDB_RETENTION rows are gathered into a temporary array first;
 * afterwards each id is re-acquired, a progress request is sent for it and its
 * progress is saved. A failure while saving progress is logged and does not
 * stop the remaining ids from being handled.
 *
 * @param pMnode mnode whose sdb is scanned
 */
static void mndRetentionPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pIds = taosArrayInit(sdbGetSize(pSdb, SDB_RETENTION), sizeof(int32_t));
  if (pIds == NULL) {
    return;
  }

  /* Pass 1: remember the id of every retention row. */
  for (void *pIter = NULL;;) {
    SRetentionObj *pFetched = NULL;
    pIter = sdbFetch(pSdb, SDB_RETENTION, pIter, (void **)&pFetched);
    if (pIter == NULL) {
      break;
    }
    if (taosArrayPush(pIds, &pFetched->id) == NULL) {
      /* best effort: an id that could not be stored is simply not pulled up this round */
      mError("failed to push retention id:%d into array, but continue pull up", pFetched->id);
    }
    sdbRelease(pSdb, pFetched);
  }

  /* Pass 2: handle each remembered id; ids that can no longer be acquired are skipped. */
  const int32_t numIds = (int32_t)taosArrayGetSize(pIds);
  for (int32_t idx = 0; idx < numIds; ++idx) {
    int32_t *pId = taosArrayGet(pIds, idx);
    mInfo("begin to pull up retention:%d", *pId);

    SRetentionObj *pRetention = mndAcquireRetention(pMnode, *pId);
    if (pRetention == NULL) {
      continue;
    }

    mInfo("retention:%d, begin to pull up", pRetention->id);
    mndRetentionSendProgressReq(pMnode, pRetention);

    int32_t code = mndSaveRetentionProgress(pMnode, pRetention->id);
    if (code != 0) {
      mError("retention:%d, failed to save retention progress since %s", pRetention->id, tstrerror(code));
    }
    mndReleaseRetention(pMnode, pRetention);
  }

  taosArrayDestroy(pIds);
}
883

884
/**
 * @brief Write an audit record for an automatically dispatched trim of one database.
 *
 * Nothing is recorded unless auditing is enabled and both a monitor fqdn and a
 * monitor port are configured.
 *
 * The audit text is "trim db <name> start with <skey> end with <ekey>". The
 * bounds are rendered as UTC time strings when both can be formatted, otherwise
 * as raw integers.
 *
 * @param pMnode mnode supplying the cluster id of the record
 * @param pReq   triggering message; not used here, the record is written with a NULL request
 * @param pDb    database being trimmed; supplies the name and the precision used for formatting
 * @param tw     time window of the trim
 * @return always 0
 */
static int32_t mndTrimDbDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
    return 0;
  }

  SName   name = {0};
  int32_t sqlLen = 0;
  char    sql[256] = {0};
  char    skeyStr[40] = {0};
  char    ekeyStr[40] = {0};
  char   *pDbName = pDb->name;

  /* Prefer the parsed database name; keep the full stored name if parsing fails. */
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
    pDbName = name.dbname;
  }

  /* NOTE(review): the visible caller fills tw->ekey from a seconds-based clock while the
   * value is formatted here with pDb->cfg.precision - confirm the units agree. */
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
  } else {
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
                       tw->ekey);
  }

  /* Use the same name for the audit target as for the sql text. Passing name.dbname here
   * would record an empty target whenever tNameFromString failed above. */
  auditRecord(NULL, pMnode->clusterId, "autoTrimDB", pDbName, "", sql, sqlLen);

  return 0;
}
911

912
extern int32_t mndTrimDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
913
                         ETsdbOpType type, ETriggerType triggerType);
914
static int32_t mndTrimDbDispatch(SRpcMsg *pReq) {
×
915
  int32_t    code = 0, lino = 0;
×
916
  SMnode    *pMnode = pReq->info.node;
×
917
  SSdb      *pSdb = pMnode->pSdb;
×
918
  int64_t    curSec = taosGetTimestampMs() / 1000;
×
919
  STrimDbReq trimReq = {
×
920
      .tw.skey = INT64_MIN, .tw.ekey = curSec, .optrType = TSDB_OPTR_NORMAL, .triggerType = TSDB_TRIGGER_AUTO};
921

922
  void   *pIter = NULL;
×
923
  SDbObj *pDb = NULL;
×
924
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
×
925
    if (pDb->cfg.isMount) {
×
926
      sdbRelease(pSdb, pDb);
×
927
      continue;
×
928
    }
929

930
    (void)snprintf(trimReq.db, sizeof(trimReq.db), "%s", pDb->name);
×
931

932
    if ((code = mndTrimDb(pMnode, pReq, pDb, trimReq.tw, trimReq.vgroupIds, trimReq.optrType, trimReq.triggerType)) ==
×
933
        0) {
934
      mInfo("db:%s, start to auto trim, optr:%u, tw:%" PRId64 ",%" PRId64, trimReq.db, trimReq.optrType,
×
935
            trimReq.tw.skey, trimReq.tw.ekey);
936
    } else {
937
      mError("db:%s, failed to auto trim since %s", pDb->name, tstrerror(code));
×
938
      sdbRelease(pSdb, pDb);
×
939
      continue;
×
940
    }
941

942
    TAOS_UNUSED(mndTrimDbDispatchAudit(pMnode, pReq, pDb, &trimReq.tw));
×
943

944
    sdbRelease(pSdb, pDb);
×
945
  }
946
_exit:
×
947
  return code;
×
948
}
949

950
/**
 * @brief Timer handler that pulls up all retention objects.
 *
 * @param pReq timer message; pReq->info.node is passed on as the mnode
 * @return always 0
 */
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process query trim timer");
  mndRetentionPullup(pMnode);

  return 0;
}
955

956
/**
 * @brief Timer handler that dispatches the automatic database trim.
 *
 * @param pReq timer message, forwarded unchanged to mndTrimDbDispatch
 * @return whatever mndTrimDbDispatch returns
 */
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
  mTrace("start to process trim db timer");

  int32_t code = mndTrimDbDispatch(pReq);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc