• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4878

11 Dec 2025 02:43AM UTC coverage: 64.569% (-0.02%) from 64.586%
#4878

push

travis-ci

guanshengliang
feat(TS-7270): internal dependence

307 of 617 new or added lines in 24 files covered. (49.76%)

3821 existing lines in 123 files now uncovered.

163630 of 253417 relevant lines covered (64.57%)

107598827.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.97
/source/dnode/mnode/impl/src/mndRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndRetention.h"
16
#include "audit.h"
17
#include "mndCompact.h"
18
#include "mndCompactDetail.h"
19
#include "mndDb.h"
20
#include "mndDef.h"
21
#include "mndDnode.h"
22
#include "mndPrivilege.h"
23
#include "mndRetentionDetail.h"
24
#include "mndShow.h"
25
#include "mndTrans.h"
26
#include "mndVgroup.h"
27
#include "tmisce.h"
28
#include "tmsgcb.h"
29

30
#define MND_RETENTION_VER_NUMBER 1
31

32
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq);
33
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq);
34
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
35
static void    mndCancelRetrieveRetention(SMnode *pMnode, void *pIter);
36

37
/**
38
 * @brief mndInitRetention
39
 *  init retention module.
40
 *  - trim is equivalent to retention
41
 * @param pMnode
42
 * @return
43
 */
44

45
int32_t mndInitRetention(SMnode *pMnode) {
491,616✔
46
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndRetrieveRetention);
491,616✔
47
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndCancelRetrieveRetention);
491,616✔
48
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
491,616✔
49
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRIM, mndProcessKillRetentionReq);  // trim is equivalent to retention
491,616✔
50
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_TRIM_PROGRESS_RSP, mndProcessQueryRetentionRsp);
491,616✔
51
  mndSetMsgHandle(pMnode, TDMT_MND_QUERY_TRIM_TIMER, mndProcessQueryRetentionTimer);
491,616✔
52
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_TRIM_RSP, mndTransProcessRsp);
491,616✔
53

54
  SSdbTable table = {
491,616✔
55
      .sdbType = SDB_RETENTION,
56
      .keyType = SDB_KEY_INT32,
57
      .encodeFp = (SdbEncodeFp)mndRetentionActionEncode,
58
      .decodeFp = (SdbDecodeFp)mndRetentionActionDecode,
59
      .insertFp = (SdbInsertFp)mndRetentionActionInsert,
60
      .updateFp = (SdbUpdateFp)mndRetentionActionUpdate,
61
      .deleteFp = (SdbDeleteFp)mndRetentionActionDelete,
62
  };
63

64
  return sdbSetTable(pMnode->pSdb, table);
491,616✔
65
}
66

67
/* Module teardown hook: nothing is freed here, the call is only logged. */
void mndCleanupRetention(SMnode *pMnode) {
  mDebug("mnd retention cleanup");
}
490,847✔
68

69
/* Free a retention object by handing it, cast to SCompactObj, to tFreeCompactObj(). */
void tFreeRetentionObj(SRetentionObj *pObj) {
  SCompactObj *pCompact = (SCompactObj *)pObj;
  tFreeCompactObj(pCompact);
}
55,340✔
70

71
int32_t tSerializeSRetentionObj(void *buf, int32_t bufLen, const SRetentionObj *pObj) {
122,834✔
72
  return tSerializeSCompactObj(buf, bufLen, (const SCompactObj *)pObj);
122,834✔
73
}
74

75
/*
 * Deserialize a retention object through the compact-object deserializer.
 * Arguments and return value are passed through to tDeserializeSCompactObj() unchanged.
 */
int32_t tDeserializeSRetentionObj(void *buf, int32_t bufLen, SRetentionObj *pObj) {
  SCompactObj *pCompact = (SCompactObj *)pObj;
  return tDeserializeSCompactObj(buf, bufLen, pCompact);
}
78

79
/**
 * Encode a retention object into a freshly allocated sdb raw record.
 *
 * The raw payload is written as an int32 length followed by the serialized
 * object bytes. terrno is reset on entry and is the failure indicator.
 *
 * @param pObj retention object to encode
 * @return the new raw record, or NULL (terrno set) on failure
 */
SSdbRaw *mndRetentionActionEncode(SRetentionObj *pObj) {
  int32_t  code = 0;
  int32_t  lino = 0;
  int32_t  dataPos = 0;
  int32_t  payloadLen = 0;
  void    *payload = NULL;
  SSdbRaw *pRaw = NULL;

  terrno = TSDB_CODE_SUCCESS;

  /* first pass with a NULL buffer only measures the serialized size */
  payloadLen = tSerializeSRetentionObj(NULL, 0, pObj);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pRaw = sdbAllocRaw(SDB_RETENTION, MND_RETENTION_VER_NUMBER, (int32_t)(sizeof(int32_t) + payloadLen));
  if (pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  /* second pass writes the bytes into the scratch buffer */
  payloadLen = tSerializeSRetentionObj(payload, payloadLen, pObj);
  if (payloadLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_SET_INT32(pRaw, dataPos, payloadLen, OVER);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, OVER);
  SDB_SET_DATALEN(pRaw, dataPos, OVER);

OVER:
  taosMemoryFreeClear(payload);
  if (terrno == TSDB_CODE_SUCCESS) {
    mTrace("retention:%" PRId32 ", encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("retention:%" PRId32 ", failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
128

129
/**
 * Decode an sdb raw record into a newly allocated retention row.
 *
 * The raw payload is read as an int32 length followed by that many bytes,
 * which are then deserialized into the row's SRetentionObj. terrno is reset
 * on entry and is the failure indicator.
 *
 * @param pRaw raw record to decode
 * @return the new row, or NULL (terrno set) on failure
 */
SSdbRow *mndRetentionActionDecode(SSdbRaw *pRaw) {
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        tlen = 0;
  int32_t        dataPos = 0;
  SSdbRow       *pRow = NULL;
  SRetentionObj *pObj = NULL;
  void          *buf = NULL;
  terrno = TSDB_CODE_SUCCESS;

  int8_t sver = 0;
  if ((code = sdbGetRawSoftVer(pRaw, &sver)) != 0) {
    // make sure the failure is visible at OVER even if the callee left terrno untouched
    if (terrno == TSDB_CODE_SUCCESS) terrno = code;
    goto OVER;
  }

  if (sver != MND_RETENTION_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("retention read invalid ver, data ver: %d, curr ver: %d", sver, MND_RETENTION_VER_NUMBER);
    goto OVER;
  }

  pRow = sdbAllocRow(sizeof(SRetentionObj));
  if (pRow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }

  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);

  if ((terrno = tDeserializeSRetentionObj(buf, tlen, pObj)) < 0) {
    goto OVER;
  }

OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    // pObj is still NULL for every failure that happens before the row object
    // is obtained, so it must not be dereferenced unconditionally here
    int32_t failedId = (pObj != NULL) ? pObj->id : 0;
    mError("retention:%" PRId32 ", failed to decode from raw:%p since %s", failedId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("retention:%" PRId32 ", decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}
185

186
/* sdb insert callback for SDB_RETENTION: only traces the id and reports success. */
int32_t mndRetentionActionInsert(SSdb *pSdb, SRetentionObj *pObj) {
  const int32_t retentionId = pObj->id;
  mTrace("retention:%" PRId32 ", perform insert action", retentionId);
  return 0;
}
190

191
/* sdb delete callback for SDB_RETENTION: traces the id, frees the object via tFreeRetentionObj(), returns 0. */
int32_t mndRetentionActionDelete(SSdb *pSdb, SRetentionObj *pObj) {
  const int32_t retentionId = pObj->id;
  mTrace("retention:%" PRId32 ", perform delete action", retentionId);

  tFreeRetentionObj(pObj);
  return 0;
}
196

197
/* sdb update callback for SDB_RETENTION: nothing is copied between rows, the call is only traced. */
int32_t mndRetentionActionUpdate(SSdb *pSdb, SRetentionObj *pOldObj, SRetentionObj *pNewObj) {
  const int32_t retentionId = pOldObj->id;
  mTrace("retention:%" PRId32 ", perform update action, old row:%p new row:%p", retentionId, pOldObj, pNewObj);
  return 0;
}
202

203
/**
 * Acquire the retention object with the given id from the sdb.
 *
 * When the lookup fails with anything other than "not there", "creating" or
 * "dropping", terrno is replaced by TSDB_CODE_APP_ERROR and an error is logged.
 *
 * @return the acquired object (release with mndReleaseRetention), or NULL
 */
SRetentionObj *mndAcquireRetention(SMnode *pMnode, int32_t id) {
  SRetentionObj *pRetention = sdbAcquire(pMnode->pSdb, SDB_RETENTION, &id);
  if (pRetention != NULL) {
    return pRetention;
  }

  bool expectedMiss = (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) || (terrno == TSDB_CODE_SDB_OBJ_CREATING) ||
                      (terrno == TSDB_CODE_SDB_OBJ_DROPPING);
  if (!expectedMiss) {
    terrno = TSDB_CODE_APP_ERROR;
    mError("retention:%" PRId32 ", failed to acquire retention since %s", id, terrstr());
  }
  return NULL;
}
213

214
/* Give a retention object obtained from mndAcquireRetention() back to the sdb. */
void mndReleaseRetention(SMnode *pMnode, SRetentionObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
133,705✔
218

219
/**
 * Copy the database name (and optionally the database uid) recorded in a
 * retention object.
 *
 * @param id     retention id to look up
 * @param dbname output buffer for the database name
 * @param len    size passed to tstrncpy() for dbname
 * @param dbUid  optional output for the database uid; may be NULL
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when terrno
 *         is 0) if the retention object cannot be acquired
 */
static int32_t mndRetentionGetDbInfo(SMnode *pMnode, int32_t id, char *dbname, int32_t len, int64_t *dbUid) {
  SRetentionObj *pRetention = mndAcquireRetention(pMnode, id);
  if (pRetention == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(errCode);
  }

  tstrncpy(dbname, pRetention->dbname, len);
  if (dbUid != NULL) {
    *dbUid = pRetention->dbUid;
  }

  mndReleaseRetention(pMnode, pRetention);

  int32_t okCode = 0;
  TAOS_RETURN(okCode);
}
233

234
/**
 * Fill in a new retention object and add it to a transaction's prepare log.
 *
 * The object gets a freshly generated id, the database name/uid of pDb and
 * the current timestamp as start time; the generated id is reported back
 * through rsp->id.
 *
 * @param pTrans transaction that receives the encoded object
 * @param pObj   retention object to initialise and encode
 * @param pDb    database the retention task belongs to
 * @param rsp    response whose id field is set on success
 * @return 0 on success, otherwise an error code
 */
int32_t mndAddRetentionToTrans(SMnode *pMnode, STrans *pTrans, SRetentionObj *pObj, SDbObj *pDb, STrimDbRsp *rsp) {
  int32_t code = 0;
  pObj->id = tGenIdPI32();

  tstrncpy(pObj->dbname, pDb->name, sizeof(pObj->dbname));
  pObj->dbUid = pDb->uid;

  pObj->startTime = taosGetTimestampMs();

  SSdbRaw *pVgRaw = mndRetentionActionEncode(pObj);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  // Mark the raw READY while this function still owns it. Once it has been
  // appended, the transaction holds the pointer and freeing it here on a
  // status failure would leave the transaction with a dangling raw.
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }

  rsp->id = pObj->id;

  return 0;
}
263

264
/**
 * Show-table retrieve callback for TSDB_MGMT_TABLE_RETENTION.
 *
 * Walks SDB_RETENTION starting from pShow->pIter and writes up to `rows` rows
 * into pBlock with the columns: id, db name, start time, trigger type
 * ("manual"/"auto") and operation type ("trim"/"ssmigrate"/"rollup").
 * When pShow->db names a database, only retentions of that database are
 * emitted; a system database yields no rows.
 *
 * @return number of rows written, or an error code on failure
 */
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SRetentionObj *pObj = NULL;
  SDbObj        *pDb = NULL;
  char          *sep = NULL;
  int32_t        numOfRows = 0;
  int32_t        code = 0;
  int32_t        lino = 0;
  char           tmpBuf[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};

  /* optional database filter: pShow->db is "<acct>.<db>" */
  if (pShow->db[0] != 0) {
    sep = strchr(pShow->db, '.');
    if (sep != NULL && *(++sep) != 0) {
      if (IS_SYS_DBNAME(sep)) {
        goto _OVER;
      }
      pDb = mndAcquireDb(pMnode, pShow->db);
      if (pDb == NULL) {
        return terrno;
      }
    }
  }

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_RETENTION, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) {
      break;
    }

    SColumnInfoData *pColInfo = NULL;
    int32_t          cols = 0;

    /* column: retention id */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->id, false, pObj, pShow->pIter, _OVER);

    /* column: database name (rows of other databases are skipped) */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    if (pDb != NULL && strcmp(pDb->name, pObj->dbname) != 0) {
      sdbRelease(pSdb, pObj);
      continue;
    }
    SName name = {0};
    code = tNameFromString(&name, pObj->dbname, T_NAME_ACCT | T_NAME_DB);
    if (code != 0) {
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pShow->pIter);
      TAOS_CHECK_GOTO(code, &lino, _OVER);
    }
    (void)tNameGetDbName(&name, varDataVal(tmpBuf));
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);

    /* column: start time */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->startTime, false, pObj, pShow->pIter, _OVER);

    /* column: trigger type */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    const char *trigger = "auto";
    if (pObj->triggerType == TSDB_TRIGGER_MANUAL) {
      trigger = "manual";
    }
    tstrncpy(varDataVal(tmpBuf), trigger, sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);

    /* column: operation type */
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    const char *optr = (pObj->optrType == TSDB_OPTR_SSMIGRATE) ? "ssmigrate"
                       : (pObj->optrType == TSDB_OPTR_ROLLUP)  ? "rollup"
                                                               : "trim";
    tstrncpy(varDataVal(tmpBuf), optr, sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);

    sdbRelease(pSdb, pObj);
    ++numOfRows;
  }

_OVER:
  mndReleaseDb(pMnode, pDb);
  if (code != 0) {
    mError("failed to retrieve retention at line %d since %s", lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
340

341
/* Show-table free-iterator callback: cancels an in-flight SDB_RETENTION fetch. */
static void mndCancelRetrieveRetention(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_RETENTION);
}
×
345

346
/**
 * Build the vnode "kill retention" request message for one vgroup.
 *
 * The returned buffer is an SMsgHead (contLen and vgId in network byte order)
 * followed by the serialized SVKillRetentionReq.
 *
 * @param pVgroup  vgroup the request is addressed to
 * @param pContLen receives the total message length (head + body)
 * @param id       retention task id
 * @param dnodeId  dnode id stored in the request
 * @return buffer allocated with taosMemoryMalloc(), or NULL with terrno set
 */
static void *mndBuildKillRetentionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t id, int32_t dnodeId) {
  SVKillRetentionReq req = {0};
  req.taskId = id;
  req.vgId = pVgroup->vgId;
  req.dnodeId = dnodeId;
  terrno = 0;

  mInfo("vgId:%d, build kill retention req", pVgroup->vgId);
  // first pass with a NULL buffer only measures the body size
  int32_t bodyLen = tSerializeSVKillCompactReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t contLen = bodyLen + (int32_t)sizeof(SMsgHead);

  void *pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgHead *pHead = pReq;
  pHead->contLen = htonl(contLen);
  pHead->vgId = htonl(pVgroup->vgId);

  // The body starts after the head, so only bodyLen bytes are available there;
  // passing contLen would overstate the writable space by sizeof(SMsgHead).
  int32_t ret = 0;
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), bodyLen, &req)) < 0) {
    taosMemoryFreeClear(pReq);
    terrno = ret;
    return NULL;
  }
  *pContLen = contLen;
  return pReq;
}
380

381
/**
 * Append a TDMT_VND_KILL_TRIM redo action for one vgroup to a transaction.
 *
 * The action is addressed to the ep set of dnode `dnodeId` and carries the
 * message built by mndBuildKillRetentionReq().
 *
 * @return 0 on success; otherwise terrno (or TSDB_CODE_SDB_OBJ_NOT_THERE when
 *         terrno is 0) if the dnode or the message is unavailable, or the
 *         error returned by mndTransAppendRedoAction()
 */
static int32_t mndAddKillRetentionAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t id, int32_t dnodeId) {
  STransAction action = {0};

  /* destination endpoint */
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
  if (pDnode == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(errCode);
  }
  action.epSet = mndGetDnodeEpset(pDnode);
  mndReleaseDnode(pMnode, pDnode);

  /* request payload */
  int32_t msgLen = 0;
  void   *pMsg = mndBuildKillRetentionReq(pMnode, pVgroup, &msgLen, id, dnodeId);
  if (pMsg == NULL) {
    int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_SDB_OBJ_NOT_THERE;
    TAOS_RETURN(errCode);
  }

  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = TDMT_VND_KILL_TRIM;

  mTrace("trans:%d, kill retention msg len:%d", pTrans->id, msgLen);

  int32_t code = mndTransAppendRedoAction(pTrans, &action);
  if (code != 0) {
    taosMemoryFree(pMsg);
    TAOS_RETURN(code);
  }

  return 0;
}
415

416
/**
 * Create and submit the "kill-retention" transaction for one retention task.
 *
 * The transaction carries the retention object as a READY commit log plus one
 * kill redo action for every SDB_RETENTION_DETAIL entry whose id matches the
 * task. All failure paths go through a single cleanup label so the
 * transaction is always dropped exactly once.
 *
 * @param pReq request that triggered the kill (passed to mndTransCreate)
 * @param pObj retention task to kill
 * @return 0 on success, otherwise an error code
 */
static int32_t mndKillRetention(SMnode *pMnode, SRpcMsg *pReq, SRetentionObj *pObj) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-retention");
  if (pTrans == NULL) {
    mError("retention:%" PRId32 ", failed to drop since %s", pObj->id, terrstr());
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  mInfo("trans:%d, used to kill retention:%" PRId32, pTrans->id, pObj->id);

  mndTransSetDbName(pTrans, pObj->dbname, NULL);

  SSdbRaw *pCommitRaw = mndRetentionActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
    // the append failed, so the raw is still owned here and must be freed
    sdbFreeRaw(pCommitRaw);
    goto _OVER;
  }
  // from here on the raw is referenced by the transaction; do not free it directly
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
    goto _OVER;
  }

  while (1) {
    SCompactDetailObj *pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == pObj->id) {
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
      if (pVgroup == NULL) {
        // capture the error before any cleanup call gets a chance to touch terrno
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pDetail);
        goto _OVER;
      }

      if ((code = mndAddKillRetentionAction(pMnode, pTrans, pVgroup, pObj->id, pDetail->dnodeId)) != 0) {
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
        // the vgroup was acquired above and must be released on this path too
        mndReleaseVgroup(pMnode, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pDetail);
        goto _OVER;
      }

      mndReleaseVgroup(pMnode, pVgroup);
    }

    sdbRelease(pSdb, pDetail);
  }

  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    goto _OVER;
  }

_OVER:
  mndTransDrop(pTrans);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
487

488
/**
 * Handler for TDMT_MND_KILL_TRIM: kill a running retention task.
 *
 * Deserializes the request, acquires the retention object, checks the
 * MND_OPER_TRIM_DB privilege, submits the kill transaction and records an
 * audit entry.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction was submitted,
 *         TSDB_CODE_MND_INVALID_RETENTION_ID when the id cannot be acquired,
 *         otherwise the failing step's error code
 */
int32_t mndProcessKillRetentionReq(SRpcMsg *pReq) {
  int32_t           code = 0;
  int32_t           lino = 0;
  SMnode           *pMnode = NULL;
  SRetentionObj    *pObj = NULL;
  SKillRetentionReq req = {0};  // reuse SKillCompactReq
  char              obj[TSDB_INT32_ID_LEN] = {0};

  code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &req);
  if (code != 0) {
    TAOS_RETURN(code);
  }

  mInfo("start to kill retention:%" PRId32, req.id);

  pMnode = pReq->info.node;
  pObj = mndAcquireRetention(pMnode, req.id);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_INVALID_RETENTION_ID;
    tFreeSKillCompactReq(&req);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_TRIM_DB), &lino, _OVER);
  TAOS_CHECK_GOTO(mndKillRetention(pMnode, pReq, pObj), &lino, _OVER);

  /* the transaction has been handed off; the reply is sent when it finishes */
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  /* audit record keyed by the decimal retention id */
  int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pObj->id);
  if ((uint32_t)nBytes >= sizeof(obj)) {
    mError("retention:%" PRId32 " failed to audit since %s", pObj->id, tstrerror(TSDB_CODE_OUT_OF_RANGE));
  } else {
    auditRecord(pReq, pMnode->clusterId, "killRetention", pObj->dbname, obj, req.sql, req.sqlLen);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("failed to kill retention %" PRId32 " since %s", req.id, terrstr());
  }

  tFreeSKillCompactReq((SKillCompactReq *)&req);
  mndReleaseRetention(pMnode, pObj);

  TAOS_RETURN(code);
}
530

531
// update progress
532
/**
 * Store a vnode's progress report in the matching retention detail object.
 *
 * Scans SDB_RETENTION_DETAIL for the entry with the given id whose vgId and
 * dnodeId equal those in rsp, then copies numberFileset/finished into the
 * "new" fields and updates progress and remainingTime.
 *
 * @return 0 when an entry was updated, TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST otherwise
 */
static int32_t mndUpdateRetentionProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t id, SQueryRetentionProgressRsp *rsp) {
  SSdb                *pSdb = pMnode->pSdb;
  void                *pIter = NULL;
  SRetentionDetailObj *pDetail = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail)) != NULL) {
    bool matched = (pDetail->id == id) && (pDetail->vgId == rsp->vgId) && (pDetail->dnodeId == rsp->dnodeId);
    if (!matched) {
      sdbRelease(pSdb, pDetail);
      continue;
    }

    pDetail->newNumberFileset = rsp->numberFileset;
    pDetail->newFinished = rsp->finished;
    pDetail->progress = rsp->progress;
    pDetail->remainingTime = rsp->remainingTime;

    /* stop the scan early: cancel the iterator, then drop our reference */
    sdbCancelFetch(pSdb, pIter);
    sdbRelease(pSdb, pDetail);

    int32_t okCode = 0;
    TAOS_RETURN(okCode);
  }

  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
}
558

559
/**
 * Handler for TDMT_VND_QUERY_TRIM_PROGRESS_RSP.
 *
 * Rejects responses carrying a non-zero code, deserializes the progress
 * payload and forwards it to mndUpdateRetentionProgress().
 *
 * @return 0 on success, otherwise the response code, the deserialization
 *         error, or the update error
 */
int32_t mndProcessQueryRetentionRsp(SRpcMsg *pReq) {
  SQueryRetentionProgressRsp rsp = {0};

  if (pReq->code != 0) {
    mError("received wrong retention response, req code is %s", tstrerror(pReq->code));
    TAOS_RETURN(pReq->code);
  }

  int32_t code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &rsp);
  if (code != 0) {
    mError("failed to deserialize vnode-query-retention-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
           pReq->contLen);
    TAOS_RETURN(code);
  }

  mDebug("retention:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", rsp.id, rsp.vgId,
         rsp.dnodeId, rsp.numberFileset, rsp.finished);

  SMnode *pMnode = pReq->info.node;

  code = mndUpdateRetentionProgress(pMnode, pReq, rsp.id, &rsp);
  if (code != 0) {
    mError("retention:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", rsp.id,
           rsp.vgId, rsp.dnodeId, rsp.numberFileset, rsp.finished);
  }

  TAOS_RETURN(code);
}
587

588
// timer
589
/**
 * Send a TDMT_VND_QUERY_TRIM_PROGRESS request for every detail entry of a
 * retention task.
 *
 * For each SDB_RETENTION_DETAIL entry whose id equals pObj->id, the request is
 * addressed to the fqdn/port of the entry's dnode. Failures for one entry are
 * skipped so the remaining entries are still queried; a missing dnode stops
 * the scan. Every acquired object and allocated buffer is released on each
 * path.
 *
 * @param pObj retention task whose detail entries are polled
 */
void mndRetentionSendProgressReq(SMnode *pMnode, SRetentionObj *pObj) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SRetentionDetailObj *pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == pObj->id) {
      SEpSet epSet = {0};

      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
      if (pDnode == NULL) {
        // leaving the loop early: the iterator and the fetched detail must be given back
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pDetail);
        break;
      }
      // release the dnode right after its endpoint was copied, on success and failure alike
      int32_t epCode = addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
      mndReleaseDnode(pMnode, pDnode);
      if (epCode != 0) {
        sdbRelease(pSdb, pDetail);
        continue;
      }

      SQueryRetentionProgressReq req = {0};
      req.id = pDetail->id;
      req.vgId = pDetail->vgId;
      req.dnodeId = pDetail->dnodeId;

      // first pass with a NULL buffer only measures the body size
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
      if (contLen < 0) {
        sdbRelease(pSdb, pDetail);
        continue;
      }

      contLen += sizeof(SMsgHead);

      SMsgHead *pHead = rpcMallocCont(contLen);
      if (pHead == NULL) {
        sdbRelease(pSdb, pDetail);
        continue;
      }

      pHead->contLen = htonl(contLen);
      pHead->vgId = htonl(pDetail->vgId);

      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
        // the message was never handed to the rpc layer, so free it here
        rpcFreeCont(pHead);
        sdbRelease(pSdb, pDetail);
        continue;
      }

      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_TRIM_PROGRESS, .contLen = contLen};

      rpcMsg.pCont = pHead;

      char    detail[1024] = {0};
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
                              TMSG_INFO(TDMT_VND_QUERY_TRIM_PROGRESS), epSet.numOfEps, epSet.inUse);
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
      }

      mDebug("retention:%d, send update progress msg to %s", pDetail->id, detail);

      // NOTE(review): pHead is not freed on a send failure, as in the original;
      // confirm that tmsgSendReq releases the message itself when it fails.
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
        sdbRelease(pSdb, pDetail);
        continue;
      }
    }

    sdbRelease(pSdb, pDetail);
  }
}
53,564✔
657

658
/**
 * Persist the latest progress of a retention task and drop it once finished.
 *
 * Steps:
 *  1. Scan the task's detail entries; a save is needed when numberFileset or
 *     finished differs from the newly reported value, or when the database
 *     no longer exists.
 *  2. In an "update-retention-progress" transaction, copy the new values into
 *     each detail entry and append it as a READY commit log.
 *  3. The task counts as finished unless some entry has both counters at -1,
 *     or has both counters set and unequal. A missing database forces
 *     "finished".
 *  4. When finished, append DROPPED commit logs for every detail entry and
 *     for the retention object itself.
 *  5. Prepare the transaction.
 *
 * Every failure after the transaction was created leaves through _OVER, which
 * cancels a pending fetch, releases a held detail entry and drops the
 * transaction.
 *
 * @param id retention task id
 * @return 0 on success (including "nothing to save"), otherwise an error code
 */
static int32_t mndSaveRetentionProgress(SMnode *pMnode, int32_t id) {
  int32_t              code = 0;
  bool                 needSave = false;
  bool                 allFinished = true;
  SSdb                *pSdb = pMnode->pSdb;
  void                *pIter = NULL;
  SRetentionDetailObj *pDetail = NULL;
  STrans              *pTrans = NULL;
  SSdbRaw             *pCommitRaw = NULL;
  char                 dbname[TSDB_TABLE_FNAME_LEN] = {0};
  int64_t              dbUid = 0;

  // step 1: is there anything new to persist?
  while (1) {
    pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == id) {
      mDebug(
          "retention:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      // these 2 number will jump back after dnode restart, so < is not used here
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
        needSave = true;
    }

    sdbRelease(pSdb, pDetail);
  }
  pDetail = NULL;

  TAOS_CHECK_RETURN(mndRetentionGetDbInfo(pMnode, id, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));

  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    needSave = true;
    mWarn("retention:%" PRId32 ", no db exist, set needSave:%s", id, dbname);
  }

  if (!needSave) {
    mDebug("retention:%" PRId32 ", no need to save", id);
    TAOS_RETURN(code);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-retention-progress");
  if (pTrans == NULL) {
    // there is no transaction to read an id from, so report the retention id instead
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("retention:%" PRId32 ", failed to create trans since %s", id, tstrerror(code));
    TAOS_RETURN(code);
  }
  mInfo("retention:%d, trans:%d, used to update retention progress.", id, pTrans->id);

  mndTransSetDbName(pTrans, dbname, NULL);

  // step 2: persist the new counters of every detail entry
  pIter = NULL;
  while (1) {
    pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == id) {
      mInfo(
          "retention:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
          "newNumberFileset:%d, newFinished:%d",
          pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
          pDetail->newNumberFileset, pDetail->newFinished);

      pDetail->numberFileset = pDetail->newNumberFileset;
      pDetail->finished = pDetail->newFinished;

      pCommitRaw = mndCompactDetailActionEncode(pDetail);
      if (pCommitRaw == NULL) {
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        if (terrno != 0) code = terrno;
        goto _OVER;
      }
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
        mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id, terrstr());
        // the append failed, so the raw is still owned here
        sdbFreeRaw(pCommitRaw);
        goto _OVER;
      }
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
        goto _OVER;
      }
    }

    sdbRelease(pSdb, pDetail);
  }
  pDetail = NULL;

  // step 3: decide whether every detail entry has finished
  pIter = NULL;
  while (1) {
    pDetail = NULL;
    pIter = sdbFetch(pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
    if (pIter == NULL) break;

    if (pDetail->id == id) {
      mInfo("retention:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
            pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);

      bool notReported = (pDetail->numberFileset == -1 && pDetail->finished == -1);
      bool inProgress = (pDetail->numberFileset != -1 && pDetail->finished != -1 &&
                         pDetail->numberFileset != pDetail->finished);
      if (notReported || inProgress) {
        allFinished = false;
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pDetail);
        pIter = NULL;
        pDetail = NULL;
        break;
      }
    }

    sdbRelease(pSdb, pDetail);
  }
  pDetail = NULL;

  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
    allFinished = true;
    mWarn("retention:%" PRId32 ", no db exist, set all finished:%s", id, dbname);
  }

  // step 4: drop the task and its detail entries once everything is done
  if (allFinished) {
    mInfo("retention:%d, all finished", id);
    pIter = NULL;
    while (1) {
      pDetail = NULL;
      pIter = sdbFetch(pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
      if (pIter == NULL) break;

      if (pDetail->id == id) {
        pCommitRaw = mndRetentionDetailActionEncode(pDetail);
        if (pCommitRaw == NULL) {
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
          if (terrno != 0) code = terrno;
          // _OVER cancels the fetch and releases pDetail
          goto _OVER;
        }
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
          mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id,
                 tstrerror(code));
          sdbFreeRaw(pCommitRaw);
          goto _OVER;
        }
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
          goto _OVER;
        }
        mInfo("retention:%d, add drop compactdetail action", pDetail->compactDetailId);
      }

      sdbRelease(pSdb, pDetail);
    }
    pDetail = NULL;

    SRetentionObj *pObj = mndAcquireRetention(pMnode, id);
    if (pObj == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    pCommitRaw = mndRetentionActionEncode(pObj);
    mndReleaseRetention(pMnode, pObj);
    pObj = NULL;  // released; only `id` is used from here on
    if (pCommitRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
      sdbFreeRaw(pCommitRaw);
      goto _OVER;
    }
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
      goto _OVER;
    }
    mInfo("retention:%d, add drop compact action", id);
  }

  // step 5
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
    mError("retention:%d, trans:%d, failed to prepare since %s", id, pTrans->id, tstrerror(code));
    goto _OVER;
  }

_OVER:
  // pIter/pDetail are non-NULL only when a scan loop was left through an error
  if (pIter != NULL) {
    sdbCancelFetch(pSdb, pIter);
  }
  if (pDetail != NULL) {
    sdbRelease(pSdb, pDetail);
  }
  mndTransDrop(pTrans);
  if (code != 0) {
    TAOS_RETURN(code);
  }
  return 0;
}
856

857
/**
 * @brief mndRetentionPullup
 *  Pull up every retention task currently stored in the sdb.
 *  - pass 1: walk SDB_RETENTION and remember the id of every object
 *  - pass 2: re-acquire each retention by id, send it a progress request
 *            and save its progress
 *  Failures are logged and do not stop the remaining retentions from being
 *  processed.
 *
 * @param pMnode mnode that owns the sdb being scanned
 */
static void mndRetentionPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  SArray *pIds = taosArrayInit(sdbGetSize(pSdb, SDB_RETENTION), sizeof(int32_t));
  if (pIds == NULL) {
    return;
  }

  // Pass 1: snapshot the ids; each fetched object is released right away.
  void *pCursor = NULL;
  for (;;) {
    SRetentionObj *pFetched = NULL;
    pCursor = sdbFetch(pSdb, SDB_RETENTION, pCursor, (void **)&pFetched);
    if (pCursor == NULL) {
      break;
    }

    if (taosArrayPush(pIds, &pFetched->id) == NULL) {
      mError("failed to push retention id:%d into array, but continue pull up", pFetched->id);
    }
    sdbRelease(pSdb, pFetched);
  }

  // Pass 2: work on each retention through a freshly acquired reference.
  for (int32_t idx = 0; idx < taosArrayGetSize(pIds); ++idx) {
    int32_t *pRetentionId = taosArrayGet(pIds, idx);
    mInfo("begin to pull up retention:%d", *pRetentionId);

    SRetentionObj *pRetention = mndAcquireRetention(pMnode, *pRetentionId);
    if (pRetention == NULL) {
      // Nothing could be acquired for this id; move on to the next one.
      continue;
    }

    mInfo("retention:%d, begin to pull up", pRetention->id);
    mndRetentionSendProgressReq(pMnode, pRetention);

    int32_t code = mndSaveRetentionProgress(pMnode, pRetention->id);
    if (code != 0) {
      mError("retention:%d, failed to save retention progress since %s", pRetention->id, tstrerror(code));
    }

    mndReleaseRetention(pMnode, pRetention);
  }

  taosArrayDestroy(pIds);
}
889

890
/**
 * @brief mndTrimDbDispatchAudit
 *  Write an "autoTrimDB" audit record for a trim that was dispatched on a
 *  database. Does nothing when auditing is disabled or when the monitor
 *  fqdn/port is not configured.
 *
 *  The recorded SQL text is "trim db <name> start with ... end with ...".
 *  Both keys are printed as UTC time strings when both can be formatted,
 *  otherwise as raw integers.
 *
 *  NOTE(review): the keys are formatted with pDb->cfg.precision, while the
 *  dispatcher in this file builds tw->ekey from seconds - confirm the units
 *  expected by taosFormatUtcTime.
 *
 * @param pMnode mnode, supplies the cluster id of the record
 * @param pReq   not used by this function
 * @param pDb    database that was trimmed
 * @param tw     time window that was passed to the trim
 * @return always 0
 */
static int32_t mndTrimDbDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
    return 0;
  }

  SName   name = {0};
  int32_t sqlLen = 0;
  char    sql[256] = {0};
  char    skeyStr[40] = {0};
  char    ekeyStr[40] = {0};
  char   *pDbName = pDb->name;

  // Prefer the parsed db name; keep the stored name when parsing fails.
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
    pDbName = name.dbname;
  }

  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
  } else {
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
                       tw->ekey);
  }

  // Use the same name as the SQL text: after a failed parse name.dbname is
  // not a reliable value, whereas pDbName still identifies the database.
  auditRecord(NULL, pMnode->clusterId, "autoTrimDB", pDbName, "", sql, sqlLen);

  return 0;
}
917

918
extern int32_t mndTrimDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
919
                         ETsdbOpType type, ETriggerType triggerType);
920
static int32_t mndTrimDbDispatch(SRpcMsg *pReq) {
119✔
921
  int32_t    code = 0, lino = 0;
119✔
922
  SMnode    *pMnode = pReq->info.node;
119✔
923
  SSdb      *pSdb = pMnode->pSdb;
119✔
924
  int64_t    curSec = taosGetTimestampMs() / 1000;
119✔
925
  STrimDbReq trimReq = {
119✔
926
      .tw.skey = INT64_MIN, .tw.ekey = curSec, .optrType = TSDB_OPTR_NORMAL, .triggerType = TSDB_TRIGGER_AUTO};
927

928
  void   *pIter = NULL;
119✔
929
  SDbObj *pDb = NULL;
119✔
930
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
119✔
931
    if (pDb->cfg.isMount) {
×
932
      sdbRelease(pSdb, pDb);
×
933
      continue;
×
934
    }
935

936
    (void)snprintf(trimReq.db, sizeof(trimReq.db), "%s", pDb->name);
×
937

938
    if ((code = mndTrimDb(pMnode, pReq, pDb, trimReq.tw, trimReq.vgroupIds, trimReq.optrType, trimReq.triggerType)) ==
×
939
        0) {
940
      mInfo("db:%s, start to auto trim, optr:%u, tw:%" PRId64 ",%" PRId64, trimReq.db, trimReq.optrType,
×
941
            trimReq.tw.skey, trimReq.tw.ekey);
942
    } else {
943
      mError("db:%s, failed to auto trim since %s", pDb->name, tstrerror(code));
×
944
      sdbRelease(pSdb, pDb);
×
945
      continue;
×
946
    }
947

948
    TAOS_UNUSED(mndTrimDbDispatchAudit(pMnode, pReq, pDb, &trimReq.tw));
×
949

950
    sdbRelease(pSdb, pDb);
×
951
  }
952
_exit:
119✔
953
  return code;
119✔
954
}
955

956
/**
 * @brief mndProcessQueryRetentionTimer
 *  Timer handler: pulls up all retention tasks of the mnode carried in the
 *  message.
 *
 * @param pReq timer message; pReq->info.node is the mnode
 * @return always 0
 */
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq) {
  SMnode *pMnode = pReq->info.node;

  mTrace("start to process query trim timer");
  mndRetentionPullup(pMnode);

  return 0;
}
961

962
/**
 * @brief mndProcessTrimDbTimer
 *  Timer handler: dispatches an automatic trim through mndTrimDbDispatch.
 *
 * @param pReq timer message, forwarded unchanged to the dispatcher
 * @return the code returned by mndTrimDbDispatch
 */
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
  mTrace("start to process trim db timer");

  int32_t code = mndTrimDbDispatch(pReq);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc