• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4904

29 Dec 2025 12:28PM UTC coverage: 65.734% (+0.004%) from 65.73%
#4904

push

travis-ci

web-flow
Merge d019577a3 into 31126ab2b

15 of 16 new or added lines in 2 files covered. (93.75%)

268 existing lines in 3 files now uncovered.

192952 of 293535 relevant lines covered (65.73%)

118519910.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.03
/source/dnode/mnode/impl/src/mndRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndRetention.h"
16
#include "audit.h"
17
#include "mndCompact.h"
18
#include "mndCompactDetail.h"
19
#include "mndDb.h"
20
#include "mndDef.h"
21
#include "mndDnode.h"
22
#include "mndPrivilege.h"
23
#include "mndRetentionDetail.h"
24
#include "mndShow.h"
25
#include "mndTrans.h"
26
#include "mndUser.h"
27
#include "mndVgroup.h"
28
#include "tmisce.h"
29
#include "tmsgcb.h"
30

31
#define MND_RETENTION_VER_NUMBER 1
32

33
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq);
34
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq);
35
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
36
static void    mndCancelRetrieveRetention(SMnode *pMnode, void *pIter);
37

38
/**
39
 * @brief mndInitRetention
40
 *  init retention module.
41
 *  - trim is equivalent to retention
42
 * @param pMnode
43
 * @return
44
 */
45

395,339✔
46
int32_t mndInitRetention(SMnode *pMnode) {
395,339✔
47
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndRetrieveRetention);
395,339✔
48
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndCancelRetrieveRetention);
395,339✔
49
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
395,339✔
50
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRIM, mndProcessKillRetentionReq);  // trim is equivalent to retention
395,339✔
51
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_TRIM_PROGRESS_RSP, mndProcessQueryRetentionRsp);
395,339✔
52
  mndSetMsgHandle(pMnode, TDMT_MND_QUERY_TRIM_TIMER, mndProcessQueryRetentionTimer);
395,339✔
53
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_TRIM_RSP, mndTransProcessRsp);
54

395,339✔
55
  SSdbTable table = {
56
      .sdbType = SDB_RETENTION,
57
      .keyType = SDB_KEY_INT32,
58
      .encodeFp = (SdbEncodeFp)mndRetentionActionEncode,
59
      .decodeFp = (SdbDecodeFp)mndRetentionActionDecode,
60
      .insertFp = (SdbInsertFp)mndRetentionActionInsert,
61
      .updateFp = (SdbUpdateFp)mndRetentionActionUpdate,
62
      .deleteFp = (SdbDeleteFp)mndRetentionActionDelete,
63
  };
64

395,339✔
65
  return sdbSetTable(pMnode->pSdb, table);
66
}
67

395,276✔
68
void mndCleanupRetention(SMnode *pMnode) { mDebug("mnd retention cleanup"); }
69

27,519✔
70
void tFreeRetentionObj(SRetentionObj *pObj) { tFreeCompactObj((SCompactObj *)pObj); }
71

62,640✔
72
int32_t tSerializeSRetentionObj(void *buf, int32_t bufLen, const SRetentionObj *pObj) {
62,640✔
73
  return tSerializeSCompactObj(buf, bufLen, (const SCompactObj *)pObj);
74
}
75

27,519✔
76
int32_t tDeserializeSRetentionObj(void *buf, int32_t bufLen, SRetentionObj *pObj) {
27,519✔
77
  return tDeserializeSCompactObj(buf, bufLen, (SCompactObj *)pObj);
78
}
79

31,320✔
80
SSdbRaw *mndRetentionActionEncode(SRetentionObj *pObj) {
31,320✔
81
  int32_t code = 0;
31,320✔
82
  int32_t lino = 0;
31,320✔
83
  terrno = TSDB_CODE_SUCCESS;
84

31,320✔
85
  void    *buf = NULL;
31,320✔
86
  SSdbRaw *pRaw = NULL;
87

31,320✔
88
  int32_t tlen = tSerializeSRetentionObj(NULL, 0, pObj);
31,320✔
89
  if (tlen < 0) {
×
90
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
91
    goto OVER;
92
  }
93

31,320✔
94
  int32_t size = sizeof(int32_t) + tlen;
31,320✔
95
  pRaw = sdbAllocRaw(SDB_RETENTION, MND_RETENTION_VER_NUMBER, size);
31,320✔
96
  if (pRaw == NULL) {
×
97
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
98
    goto OVER;
99
  }
100

31,320✔
101
  buf = taosMemoryMalloc(tlen);
31,320✔
102
  if (buf == NULL) {
×
103
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
104
    goto OVER;
105
  }
106

31,320✔
107
  tlen = tSerializeSRetentionObj(buf, tlen, pObj);
31,320✔
108
  if (tlen < 0) {
×
109
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
110
    goto OVER;
111
  }
112

31,320✔
113
  int32_t dataPos = 0;
31,320✔
114
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
31,320✔
115
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
31,320✔
116
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
117

31,320✔
118
OVER:
31,320✔
119
  taosMemoryFreeClear(buf);
31,320✔
120
  if (terrno != TSDB_CODE_SUCCESS) {
×
121
    mError("retention:%" PRId32 ", failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
×
122
    sdbFreeRaw(pRaw);
×
123
    return NULL;
124
  }
125

31,320✔
126
  mTrace("retention:%" PRId32 ", encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
31,320✔
127
  return pRaw;
128
}
129

27,519✔
130
SSdbRow *mndRetentionActionDecode(SSdbRaw *pRaw) {
27,519✔
131
  int32_t        code = 0;
27,519✔
132
  int32_t        lino = 0;
27,519✔
133
  SSdbRow       *pRow = NULL;
27,519✔
134
  SRetentionObj *pObj = NULL;
27,519✔
135
  void          *buf = NULL;
27,519✔
136
  terrno = TSDB_CODE_SUCCESS;
137

27,519✔
138
  int8_t sver = 0;
27,519✔
139
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
×
140
    goto OVER;
141
  }
142

27,519✔
143
  if (sver != MND_RETENTION_VER_NUMBER) {
×
144
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
145
    mError("retention read invalid ver, data ver: %d, curr ver: %d", sver, MND_RETENTION_VER_NUMBER);
×
146
    goto OVER;
147
  }
148

27,519✔
149
  pRow = sdbAllocRow(sizeof(SRetentionObj));
27,519✔
150
  if (pRow == NULL) {
×
151
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
152
    goto OVER;
153
  }
154

27,519✔
155
  pObj = sdbGetRowObj(pRow);
27,519✔
156
  if (pObj == NULL) {
×
157
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
158
    goto OVER;
159
  }
160

27,519✔
161
  int32_t tlen;
27,519✔
162
  int32_t dataPos = 0;
27,519✔
163
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
27,519✔
164
  buf = taosMemoryMalloc(tlen + 1);
27,519✔
165
  if (buf == NULL) {
×
166
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
167
    goto OVER;
168
  }
27,519✔
169
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
170

27,519✔
171
  if ((terrno = tDeserializeSRetentionObj(buf, tlen, pObj)) < 0) {
×
172
    goto OVER;
173
  }
174

27,519✔
175
OVER:
27,519✔
176
  taosMemoryFreeClear(buf);
27,519✔
177
  if (terrno != TSDB_CODE_SUCCESS) {
×
178
    mError("retention:%" PRId32 ", failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
×
179
    taosMemoryFreeClear(pRow);
×
180
    return NULL;
181
  }
182

27,519✔
183
  mTrace("retention:%" PRId32 ", decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
27,519✔
184
  return pRow;
185
}
186

14,930✔
187
int32_t mndRetentionActionInsert(SSdb *pSdb, SRetentionObj *pObj) {
14,930✔
188
  mTrace("retention:%" PRId32 ", perform insert action", pObj->id);
14,930✔
189
  return 0;
190
}
191

27,519✔
192
int32_t mndRetentionActionDelete(SSdb *pSdb, SRetentionObj *pObj) {
27,519✔
193
  mTrace("retention:%" PRId32 ", perform delete action", pObj->id);
27,519✔
194
  tFreeRetentionObj(pObj);
27,519✔
195
  return 0;
196
}
197

×
198
int32_t mndRetentionActionUpdate(SSdb *pSdb, SRetentionObj *pOldObj, SRetentionObj *pNewObj) {
×
199
  mTrace("retention:%" PRId32 ", perform update action, old row:%p new row:%p", pOldObj->id, pOldObj, pNewObj);
200

×
201
  return 0;
202
}
203

65,979✔
204
SRetentionObj *mndAcquireRetention(SMnode *pMnode, int32_t id) {
65,979✔
205
  SSdb          *pSdb = pMnode->pSdb;
65,979✔
206
  SRetentionObj *pObj = sdbAcquire(pSdb, SDB_RETENTION, &id);
65,979✔
207
  if (pObj == NULL && (terrno != TSDB_CODE_SDB_OBJ_NOT_THERE && terrno != TSDB_CODE_SDB_OBJ_CREATING &&
×
208
                       terrno != TSDB_CODE_SDB_OBJ_DROPPING)) {
×
209
    terrno = TSDB_CODE_APP_ERROR;
×
210
    mError("retention:%" PRId32 ", failed to acquire retention since %s", id, terrstr());
211
  }
65,979✔
212
  return pObj;
213
}
214

65,979✔
215
void mndReleaseRetention(SMnode *pMnode, SRetentionObj *pObj) {
65,979✔
216
  SSdb *pSdb = pMnode->pSdb;
65,979✔
217
  sdbRelease(pSdb, pObj);
65,979✔
218
}
219

26,695✔
220
static int32_t mndRetentionGetDbInfo(SMnode *pMnode, int32_t id, char *dbname, int32_t len, int64_t *dbUid) {
26,695✔
221
  int32_t        code = 0;
26,695✔
222
  SRetentionObj *pObj = mndAcquireRetention(pMnode, id);
26,695✔
223
  if (pObj == NULL) {
×
224
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
225
    if (terrno != 0) code = terrno;
×
226
    TAOS_RETURN(code);
227
  }
228

26,695✔
229
  tstrncpy(dbname, pObj->dbname, len);
26,695✔
230
  if (dbUid) *dbUid = pObj->dbUid;
26,695✔
231
  mndReleaseRetention(pMnode, pObj);
26,695✔
232
  TAOS_RETURN(code);
233
}
234

17,063✔
235
int32_t mndAddRetentionToTrans(SMnode *pMnode, STrans *pTrans, SRetentionObj *pObj, SDbObj *pDb, STrimDbRsp *rsp) {
17,063✔
236
  int32_t code = 0;
17,063✔
237
  pObj->id = tGenIdPI32();
238

17,063✔
239
  tstrncpy(pObj->dbname, pDb->name, sizeof(pObj->dbname));
17,063✔
240
  pObj->dbUid = pDb->uid;
241

17,063✔
242
  pObj->startTime = taosGetTimestampMs();
243

17,063✔
244
  SSdbRaw *pVgRaw = mndRetentionActionEncode(pObj);
17,063✔
245
  if (pVgRaw == NULL) {
×
246
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
247
    if (terrno != 0) code = terrno;
×
248
    TAOS_RETURN(code);
249
  }
17,063✔
250
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
×
251
    sdbFreeRaw(pVgRaw);
×
252
    TAOS_RETURN(code);
253
  }
254

17,063✔
255
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
×
256
    sdbFreeRaw(pVgRaw);
×
257
    TAOS_RETURN(code);
258
  }
259

17,063✔
260
  rsp->id = pObj->id;
261

17,063✔
262
  return 0;
263
}
264

41,238✔
265
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
41,238✔
266
  SMnode        *pMnode = pReq->info.node;
41,238✔
267
  SSdb          *pSdb = pMnode->pSdb;
41,238✔
268
  int32_t        numOfRows = 0;
41,238✔
269
  SRetentionObj *pObj = NULL;
41,238✔
270
  char          *sep = NULL;
41,238✔
271
  SDbObj        *pDb = NULL;
41,238✔
272
  int32_t        code = 0, lino = 0;
41,238✔
273
  char           tmpBuf[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
274
  SUserObj      *pUser = NULL;
41,238✔
275
  SDbObj        *pIterDb = NULL;
41,238✔
276
  char           objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
×
277
  bool           showAll = false, showIter = false;
41,238✔
278
  int64_t        dbUid = 0;
×
279

280
  if ((pShow->db[0] != 0) && (sep = strchr(pShow->db, '.')) && (*(++sep) != 0)) {
281
    if (IS_SYS_DBNAME(sep)) {
282
      goto _OVER;
71,100✔
283
    } else if (!(pDb = mndAcquireDb(pMnode, pShow->db))) {
71,100✔
284
      return terrno;
71,100✔
285
    }
286
  }
287

29,862✔
288
  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), PRIV_SHOW_RETENTIONS, PRIV_OBJ_DB, 0, _OVER);
289

29,862✔
290
  while (numOfRows < rows) {
29,862✔
291
    pShow->pIter = sdbFetch(pSdb, SDB_RETENTION, pShow->pIter, (void **)&pObj);
292
    if (pShow->pIter == NULL) break;
29,862✔
293

29,862✔
294
    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pObj->dbname, pObj, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_RETENTIONS, _OVER);
×
295

×
296
    SColumnInfoData *pColInfo;
297
    int32_t          cols = 0;
29,862✔
298

29,862✔
299
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
300
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->id, false, pObj, pShow->pIter, _OVER);
×
301

×
302
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
303
    if (pDb != NULL && strcmp(pDb->name, pObj->dbname) != 0) {
29,862✔
304
      sdbRelease(pSdb, pObj);
29,862✔
305
      continue;
29,862✔
306
    }
307
    SName name = {0};
29,862✔
308
    if ((code = tNameFromString(&name, pObj->dbname, T_NAME_ACCT | T_NAME_DB)) != 0) {
29,862✔
309
      sdbRelease(pSdb, pObj);
310
      TAOS_CHECK_GOTO(code, &lino, _OVER);
29,862✔
311
    }
29,862✔
312
    (void)tNameGetDbName(&name, varDataVal(tmpBuf));
313
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
29,862✔
314
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);
29,862✔
315

316
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
29,862✔
317
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->startTime, false, pObj, pShow->pIter, _OVER);
29,862✔
318

29,862✔
UNCOV
319
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
320
    tstrncpy(varDataVal(tmpBuf), pObj->triggerType == TSDB_TRIGGER_MANUAL ? "manual" : "auto",
29,862✔
321
             sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
23,463✔
322
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
323
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);
29,862✔
324

29,862✔
325
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
29,862✔
326
    char *optr = "trim";
327
    if (pObj->optrType == TSDB_OPTR_SSMIGRATE) {
29,862✔
328
      optr = "ssmigrate";
29,862✔
329
    } else if (pObj->optrType == TSDB_OPTR_ROLLUP) {
330
      optr = "rollup";
331
    }
41,238✔
332
    tstrncpy(varDataVal(tmpBuf), optr, sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
41,238✔
333
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
41,238✔
334
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);
×
UNCOV
335

×
336
    sdbRelease(pSdb, pObj);
337
    ++numOfRows;
41,238✔
338
  }
41,238✔
339

340
_OVER:
341
  if (pUser) mndReleaseUser(pMnode, pUser);
×
342
  mndReleaseDb(pMnode, pDb);
×
343
  if (code != 0) {
×
UNCOV
344
    mError("failed to retrieve retention at line %d since %s", lino, tstrerror(code));
×
345
    TAOS_RETURN(code);
346
  }
×
347
  pShow->numOfRows += numOfRows;
×
348
  return numOfRows;
×
349
}
×
350

×
UNCOV
351
static void mndCancelRetrieveRetention(SMnode *pMnode, void *pIter) {
×
352
  SSdb *pSdb = pMnode->pSdb;
353
  sdbCancelFetchByType(pSdb, pIter, SDB_RETENTION);
×
354
}
×
355

×
356
static void *mndBuildKillRetentionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t id, int32_t dnodeId) {
×
UNCOV
357
  SVKillRetentionReq req = {0};
×
358
  req.taskId = id;
UNCOV
359
  req.vgId = pVgroup->vgId;
×
360
  req.dnodeId = dnodeId;
361
  terrno = 0;
×
362

×
363
  mInfo("vgId:%d, build kill retention req", pVgroup->vgId);
×
UNCOV
364
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
×
365
  if (contLen < 0) {
366
    terrno = TSDB_CODE_OUT_OF_MEMORY;
367
    return NULL;
×
368
  }
×
UNCOV
369
  contLen += sizeof(SMsgHead);
×
370

371
  void *pReq = taosMemoryMalloc(contLen);
×
372
  if (pReq == NULL) {
×
373
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
374
    return NULL;
×
UNCOV
375
  }
×
376

377
  SMsgHead *pHead = pReq;
×
UNCOV
378
  pHead->contLen = htonl(contLen);
×
379
  pHead->vgId = htonl(pVgroup->vgId);
380

381
  int32_t ret = 0;
×
382
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
×
UNCOV
383
    taosMemoryFreeClear(pReq);
×
384
    terrno = ret;
385
    return NULL;
×
386
  }
×
387
  *pContLen = contLen;
×
388
  return pReq;
×
UNCOV
389
}
×
390

391
static int32_t mndAddKillRetentionAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t id, int32_t dnodeId) {
×
UNCOV
392
  int32_t      code = 0;
×
393
  STransAction action = {0};
394

×
395
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
396
  if (pDnode == NULL) {
×
397
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
398
    if (terrno != 0) code = terrno;
×
UNCOV
399
    TAOS_RETURN(code);
×
400
  }
401
  action.epSet = mndGetDnodeEpset(pDnode);
402
  mndReleaseDnode(pMnode, pDnode);
×
403

×
UNCOV
404
  int32_t contLen = 0;
×
405
  void   *pReq = mndBuildKillRetentionReq(pMnode, pVgroup, &contLen, id, dnodeId);
UNCOV
406
  if (pReq == NULL) {
×
407
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
408
    if (terrno != 0) code = terrno;
×
409
    TAOS_RETURN(code);
×
UNCOV
410
  }
×
411

412
  action.pCont = pReq;
UNCOV
413
  action.contLen = contLen;
×
414
  action.msgType = TDMT_VND_KILL_TRIM;
415

416
  mTrace("trans:%d, kill retention msg len:%d", pTrans->id, contLen);
×
417

×
418
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
419
    taosMemoryFree(pReq);
×
420
    TAOS_RETURN(code);
×
421
  }
×
422

×
UNCOV
423
  return 0;
×
424
}
UNCOV
425

×
426
static int32_t mndKillRetention(SMnode *pMnode, SRpcMsg *pReq, SRetentionObj *pObj) {
UNCOV
427
  int32_t code = 0;
×
428
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-retention");
429
  if (pTrans == NULL) {
×
430
    mError("retention:%" PRId32 ", failed to drop since %s", pObj->id, terrstr());
×
431
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
432
    if (terrno != 0) code = terrno;
×
433
    TAOS_RETURN(code);
×
UNCOV
434
  }
×
435
  mInfo("trans:%d, used to kill retention:%" PRId32, pTrans->id, pObj->id);
436

×
437
  mndTransSetDbName(pTrans, pObj->dbname, NULL);
×
438

×
UNCOV
439
  SSdbRaw *pCommitRaw = mndRetentionActionEncode(pObj);
×
440
  if (pCommitRaw == NULL) {
441
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
442
    if (terrno != 0) code = terrno;
×
UNCOV
443
    mndTransDrop(pTrans);
×
444
    TAOS_RETURN(code);
445
  }
446
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
447
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
448
    mndTransDrop(pTrans);
×
449
    TAOS_RETURN(code);
×
UNCOV
450
  }
×
451
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
452
    mndTransDrop(pTrans);
×
453
    TAOS_RETURN(code);
×
454
  }
×
455

×
456
  void *pIter = NULL;
×
457
  while (1) {
×
458
    SCompactDetailObj *pDetail = NULL;
×
459
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
×
460
    if (pIter == NULL) break;
×
UNCOV
461

×
462
    if (pDetail->id == pObj->id) {
463
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
464
      if (pVgroup == NULL) {
×
465
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
466
        sdbCancelFetch(pMnode->pSdb, pIter);
×
467
        sdbRelease(pMnode->pSdb, pDetail);
×
468
        mndTransDrop(pTrans);
×
UNCOV
469
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
470
        if (terrno != 0) code = terrno;
471
        TAOS_RETURN(code);
UNCOV
472
      }
×
473

474
      if ((code = mndAddKillRetentionAction(pMnode, pTrans, pVgroup, pObj->id, pDetail->dnodeId)) != 0) {
UNCOV
475
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
476
        sdbCancelFetch(pMnode->pSdb, pIter);
477
        sdbRelease(pMnode->pSdb, pDetail);
478
        mndTransDrop(pTrans);
×
479
        TAOS_RETURN(code);
×
480
      }
×
UNCOV
481

×
482
      mndReleaseVgroup(pMnode, pVgroup);
483
    }
484

×
UNCOV
485
    sdbRelease(pMnode->pSdb, pDetail);
×
486
  }
487

488
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
×
489
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
490
    mndTransDrop(pTrans);
×
491
    TAOS_RETURN(code);
×
UNCOV
492
  }
×
493

494
  mndTransDrop(pTrans);
×
UNCOV
495
  return 0;
×
496
}
497

UNCOV
498
int32_t mndProcessKillRetentionReq(SRpcMsg *pReq) {
×
499
  int32_t           code = 0;
500
  int32_t           lino = 0;
×
501
  SKillRetentionReq req = {0};  // reuse SKillCompactReq
×
502
  int64_t           tss = taosGetTimestampMs();
×
503

×
504
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &req)) != 0) {
×
UNCOV
505
    TAOS_RETURN(code);
×
506
  }
507

UNCOV
508
  mInfo("start to kill retention:%" PRId32, req.id);
×
509

UNCOV
510
  SMnode        *pMnode = pReq->info.node;
×
511
  SRetentionObj *pObj = mndAcquireRetention(pMnode, req.id);
UNCOV
512
  if (pObj == NULL) {
×
513
    code = TSDB_CODE_MND_INVALID_RETENTION_ID;
514
    tFreeSKillCompactReq(&req);
×
515
    TAOS_RETURN(code);
×
516
  }
×
517

×
518
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_TRIM_DB), &lino, _OVER);
×
519

×
520
  TAOS_CHECK_GOTO(mndKillRetention(pMnode, pReq, pObj), &lino, _OVER);
×
UNCOV
521

×
522
  code = TSDB_CODE_ACTION_IN_PROGRESS;
UNCOV
523

×
524
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
525
    char    obj[TSDB_INT32_ID_LEN] = {0};
526
    int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pObj->id);
×
527
    if ((uint32_t)nBytes < sizeof(obj)) {
×
UNCOV
528
      int64_t tse = taosGetTimestampMs();
×
529
      double  duration = (double)(tse - tss);
530
      duration = duration / 1000;
531
      auditRecord(pReq, pMnode->clusterId, "killRetention", pObj->dbname, obj, req.sql, req.sqlLen, duration, 0);
×
UNCOV
532
    } else {
×
533
      mError("retention:%" PRId32 " failed to audit since %s", pObj->id, tstrerror(TSDB_CODE_OUT_OF_RANGE));
UNCOV
534
    }
×
535
  }
536
_OVER:
537
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
538
    mError("failed to kill retention %" PRId32 " since %s", req.id, terrstr());
63,610✔
539
  }
63,610✔
540

541
  tFreeSKillCompactReq((SKillCompactReq *)&req);
63,610✔
542
  mndReleaseRetention(pMnode, pObj);
81,118✔
543

144,728✔
544
  TAOS_RETURN(code);
144,728✔
545
}
144,728✔
546

547
// update progress
144,728✔
548
static int32_t mndUpdateRetentionProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t id, SQueryRetentionProgressRsp *rsp) {
63,610✔
549
  int32_t code = 0;
63,610✔
550

63,610✔
551
  void *pIter = NULL;
63,610✔
552
  while (1) {
553
    SRetentionDetailObj *pDetail = NULL;
63,610✔
554
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
63,610✔
555
    if (pIter == NULL) break;
556

63,610✔
557
    if (pDetail->id == id && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
558
      pDetail->newNumberFileset = rsp->numberFileset;
559
      pDetail->newFinished = rsp->finished;
81,118✔
560
      pDetail->progress = rsp->progress;
561
      pDetail->remainingTime = rsp->remainingTime;
UNCOV
562

×
563
      sdbCancelFetch(pMnode->pSdb, pIter);
564
      sdbRelease(pMnode->pSdb, pDetail);
565

63,610✔
566
      TAOS_RETURN(code);
63,610✔
567
    }
63,610✔
568

63,610✔
569
    sdbRelease(pMnode->pSdb, pDetail);
×
UNCOV
570
  }
×
571

572
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
63,610✔
573
}
63,610✔
UNCOV
574

×
575
int32_t mndProcessQueryRetentionRsp(SRpcMsg *pReq) {
UNCOV
576
  int32_t                    code = 0;
×
577
  SQueryRetentionProgressRsp req = {0};
578
  if (pReq->code != 0) {
579
    mError("received wrong retention response, req code is %s", tstrerror(pReq->code));
63,610✔
580
    TAOS_RETURN(pReq->code);
581
  }
582
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
63,610✔
583
  if (code != 0) {
584
    mError("failed to deserialize vnode-query-retention-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
63,610✔
585
           pReq->contLen);
63,610✔
UNCOV
586
    TAOS_RETURN(code);
×
587
  }
UNCOV
588

×
589
  mDebug("retention:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.id, req.vgId,
590
         req.dnodeId, req.numberFileset, req.finished);
591

63,610✔
592
  SMnode *pMnode = pReq->info.node;
593

594
  code = mndUpdateRetentionProgress(pMnode, pReq, req.id, &req);
595
  if (code != 0) {
26,695✔
596
    mError("retention:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.id,
26,695✔
597
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
598
    TAOS_RETURN(code);
63,610✔
599
  }
90,305✔
600

90,305✔
601
  TAOS_RETURN(code);
90,305✔
602
}
603

63,610✔
604
// timer
63,610✔
605
void mndRetentionSendProgressReq(SMnode *pMnode, SRetentionObj *pObj) {
606
  void *pIter = NULL;
63,610✔
607

63,610✔
608
  while (1) {
63,610✔
609
    SRetentionDetailObj *pDetail = NULL;
×
UNCOV
610
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
×
611
    if (pIter == NULL) break;
612

63,610✔
613
    if (pDetail->id == pObj->id) {
614
      SEpSet epSet = {0};
63,610✔
615

63,610✔
616
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
63,610✔
617
      if (pDnode == NULL) break;
63,610✔
618
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
619
        sdbRelease(pMnode->pSdb, pDetail);
63,610✔
620
        continue;
63,610✔
621
      }
×
UNCOV
622
      mndReleaseDnode(pMnode, pDnode);
×
623

624
      SQueryRetentionProgressReq req;
625
      req.id = pDetail->id;
63,610✔
626
      req.vgId = pDetail->vgId;
627
      req.dnodeId = pDetail->dnodeId;
63,610✔
628

63,610✔
629
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
×
UNCOV
630
      if (contLen < 0) {
×
631
        sdbRelease(pMnode->pSdb, pDetail);
632
        continue;
633
      }
63,610✔
634

63,610✔
635
      contLen += sizeof(SMsgHead);
636

63,610✔
637
      SMsgHead *pHead = rpcMallocCont(contLen);
×
UNCOV
638
      if (pHead == NULL) {
×
639
        sdbRelease(pMnode->pSdb, pDetail);
640
        continue;
641
      }
63,610✔
642

643
      pHead->contLen = htonl(contLen);
63,610✔
644
      pHead->vgId = htonl(pDetail->vgId);
645

63,610✔
646
      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
127,220✔
647
        sdbRelease(pMnode->pSdb, pDetail);
127,220✔
648
        continue;
127,220✔
649
      }
63,610✔
650

651
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_TRIM_PROGRESS, .contLen = contLen};
652

63,610✔
653
      rpcMsg.pCont = pHead;
654

63,610✔
655
      char    detail[1024] = {0};
×
UNCOV
656
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
×
657
                              TMSG_INFO(TDMT_VND_QUERY_TRIM_PROGRESS), epSet.numOfEps, epSet.inUse);
658
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
659
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
660
      }
63,610✔
661

662
      mDebug("retention:%d, send update progress msg to %s", pDetail->id, detail);
26,695✔
663

664
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
26,695✔
665
        sdbRelease(pMnode->pSdb, pDetail);
26,695✔
666
        continue;
26,695✔
667
      }
26,695✔
668
    }
63,610✔
669

90,305✔
670
    sdbRelease(pMnode->pSdb, pDetail);
90,305✔
671
  }
90,305✔
672
}
673

63,610✔
674
static int32_t mndSaveRetentionProgress(SMnode *pMnode, int32_t id) {
63,610✔
675
  int32_t code = 0;
676
  bool    needSave = false;
677
  void   *pIter = NULL;
678
  while (1) {
679
    SRetentionDetailObj *pDetail = NULL;
680
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
681
    if (pIter == NULL) break;
63,610✔
682

34,692✔
683
    if (pDetail->id == id) {
684
      mDebug(
685
          "retention:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
63,610✔
686
          "newNumberFileset:%d, newFinished:%d",
687
          pDetail->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
688
          pDetail->newNumberFileset, pDetail->newFinished);
26,695✔
689

26,695✔
690
      // these 2 number will jump back after dnode restart, so < is not used here
26,695✔
691
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
692
        needSave = true;
26,695✔
693
    }
×
UNCOV
694

×
695
    sdbRelease(pMnode->pSdb, pDetail);
696
  }
697

26,695✔
698
  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
11,096✔
699
  int64_t dbUid = 0;
11,096✔
700
  TAOS_CHECK_RETURN(mndRetentionGetDbInfo(pMnode, id, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));
701

702
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
15,599✔
703
    needSave = true;
15,599✔
704
    mWarn("retention:%" PRId32 ", no db exist, set needSave:%s", id, dbname);
×
705
  }
×
706

×
UNCOV
707
  if (!needSave) {
×
708
    mDebug("retention:%" PRId32 ", no need to save", id);
709
    TAOS_RETURN(code);
15,599✔
710
  }
711

15,599✔
712
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-retention-progress");
713
  if (pTrans == NULL) {
15,599✔
714
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
40,740✔
715
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
56,339✔
716
    if (terrno != 0) code = terrno;
56,339✔
717
    TAOS_RETURN(code);
56,339✔
718
  }
719
  mInfo("retention:%d, trans:%d, used to update retention progress.", id, pTrans->id);
40,740✔
720

40,740✔
721
  mndTransSetDbName(pTrans, dbname, NULL);
722

723
  pIter = NULL;
724
  while (1) {
725
    SRetentionDetailObj *pDetail = NULL;
726
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
40,740✔
727
    if (pIter == NULL) break;
40,740✔
728

729
    if (pDetail->id == id) {
40,740✔
730
      mInfo(
40,740✔
731
          "retention:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
×
732
          "newNumberFileset:%d, newFinished:%d",
×
733
          pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
×
734
          pDetail->newNumberFileset, pDetail->newFinished);
×
735

×
UNCOV
736
      pDetail->numberFileset = pDetail->newNumberFileset;
×
737
      pDetail->finished = pDetail->newFinished;
738

40,740✔
739
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
×
740
      if (pCommitRaw == NULL) {
×
741
        sdbCancelFetch(pMnode->pSdb, pIter);
×
742
        sdbRelease(pMnode->pSdb, pDetail);
×
UNCOV
743
        mndTransDrop(pTrans);
×
744
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
745
        if (terrno != 0) code = terrno;
40,740✔
746
        TAOS_RETURN(code);
×
747
      }
×
748
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
UNCOV
749
        mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id, terrstr());
×
750
        sdbCancelFetch(pMnode->pSdb, pIter);
751
        sdbRelease(pMnode->pSdb, pDetail);
752
        mndTransDrop(pTrans);
753
        TAOS_RETURN(code);
40,740✔
754
      }
755
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
756
        sdbCancelFetch(pMnode->pSdb, pIter);
15,599✔
757
        sdbRelease(pMnode->pSdb, pDetail);
15,599✔
758
        mndTransDrop(pTrans);
31,782✔
759
        TAOS_RETURN(code);
47,381✔
760
      }
47,381✔
761
    }
47,381✔
762

763
    sdbRelease(pMnode->pSdb, pDetail);
34,792✔
764
  }
34,792✔
765

766
  bool allFinished = true;
767
  pIter = NULL;
34,792✔
768
  while (1) {
2,299✔
769
    SRetentionDetailObj *pDetail = NULL;
2,299✔
770
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
2,299✔
771
    if (pIter == NULL) break;
2,299✔
772

773
    if (pDetail->id == id) {
32,493✔
774
      mInfo("retention:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
711✔
775
            pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
711✔
776

711✔
777
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
711✔
778
        allFinished = false;
779
        sdbCancelFetch(pMnode->pSdb, pIter);
780
        sdbRelease(pMnode->pSdb, pDetail);
781
        break;
31,782✔
782
      }
783
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
784
        allFinished = false;
15,599✔
785
        sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
786
        sdbRelease(pMnode->pSdb, pDetail);
×
787
        break;
788
      }
789
    }
15,599✔
790

12,589✔
791
    sdbRelease(pMnode->pSdb, pDetail);
12,589✔
792
  }
27,775✔
793

40,364✔
794
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
40,364✔
795
    allFinished = true;
40,364✔
796
    mWarn("retention:%" PRId32 ", no db exist, set all finished:%s", id, dbname);
797
  }
27,775✔
798

27,775✔
799
  if (allFinished) {
27,775✔
800
    mInfo("retention:%d, all finished", id);
×
801
    pIter = NULL;
×
802
    while (1) {
×
UNCOV
803
      SRetentionDetailObj *pDetail = NULL;
×
804
      pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
805
      if (pIter == NULL) break;
27,775✔
UNCOV
806

×
807
      if (pDetail->id == id) {
808
        SSdbRaw *pCommitRaw = mndRetentionDetailActionEncode(pDetail);
×
809
        if (pCommitRaw == NULL) {
×
810
          mndTransDrop(pTrans);
×
UNCOV
811
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
812
          if (terrno != 0) code = terrno;
813
          TAOS_RETURN(code);
27,775✔
814
        }
×
815
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
816
          mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id,
×
UNCOV
817
                 tstrerror(code));
×
818
          sdbCancelFetch(pMnode->pSdb, pIter);
819
          sdbRelease(pMnode->pSdb, pDetail);
27,775✔
820
          mndTransDrop(pTrans);
821
          TAOS_RETURN(code);
822
        }
27,775✔
823
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
824
          sdbCancelFetch(pMnode->pSdb, pIter);
825
          sdbRelease(pMnode->pSdb, pDetail);
12,589✔
826
          mndTransDrop(pTrans);
12,589✔
827
          TAOS_RETURN(code);
×
828
        }
×
829
        mInfo("retention:%d, add drop compactdetail action", pDetail->compactDetailId);
×
UNCOV
830
      }
×
831

832
      sdbRelease(pMnode->pSdb, pDetail);
12,589✔
833
    }
12,589✔
834

12,589✔
835
    SRetentionObj *pObj = mndAcquireRetention(pMnode, id);
×
836
    if (pObj == NULL) {
×
837
      mndTransDrop(pTrans);
×
UNCOV
838
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
839
      if (terrno != 0) code = terrno;
840
      TAOS_RETURN(code);
12,589✔
841
    }
×
842
    SSdbRaw *pCommitRaw = mndRetentionActionEncode(pObj);
×
UNCOV
843
    mndReleaseRetention(pMnode, pObj);
×
844
    if (pCommitRaw == NULL) {
845
      mndTransDrop(pTrans);
12,589✔
846
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
847
      if (terrno != 0) code = terrno;
×
UNCOV
848
      TAOS_RETURN(code);
×
849
    }
850
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
12,589✔
851
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
852
      mndTransDrop(pTrans);
853
      TAOS_RETURN(code);
15,599✔
854
    }
×
855
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
×
UNCOV
856
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
×
857
      mndTransDrop(pTrans);
858
      TAOS_RETURN(code);
859
    }
15,599✔
860
    mInfo("retention:%d, add drop compact action", pObj->id);
15,599✔
861
  }
862

863
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
2,513,854✔
864
    mError("retention:%d, trans:%d, failed to prepare since %s", id, pTrans->id, tstrerror(code));
2,513,854✔
865
    mndTransDrop(pTrans);
2,513,854✔
866
    TAOS_RETURN(code);
2,513,854✔
867
  }
2,513,854✔
868

869
  mndTransDrop(pTrans);
2,513,854✔
870
  return 0;
26,695✔
871
}
2,540,549✔
872

2,540,549✔
873
static void mndRetentionPullup(SMnode *pMnode) {
2,540,549✔
874
  int32_t code = 0;
53,390✔
UNCOV
875
  SSdb   *pSdb = pMnode->pSdb;
×
876
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_RETENTION), sizeof(int32_t));
877
  if (pArray == NULL) return;
26,695✔
878

879
  void *pIter = NULL;
880
  while (1) {
2,540,549✔
881
    SRetentionObj *pObj = NULL;
26,695✔
882
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION, pIter, (void **)&pObj);
26,695✔
883
    if (pIter == NULL) break;
26,695✔
884
    if (taosArrayPush(pArray, &pObj->id) == NULL) {
26,695✔
885
      mError("failed to push retention id:%d into array, but continue pull up", pObj->id);
26,695✔
886
    }
26,695✔
887
    sdbRelease(pSdb, pObj);
26,695✔
UNCOV
888
  }
×
889

890
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
26,695✔
891
    int32_t *pId = taosArrayGet(pArray, i);
892
    mInfo("begin to pull up retention:%d", *pId);
893
    SRetentionObj *pObj = mndAcquireRetention(pMnode, *pId);
2,513,854✔
894
    if (pObj != NULL) {
895
      mInfo("retention:%d, begin to pull up", pObj->id);
896
      mndRetentionSendProgressReq(pMnode, pObj);
×
897
      if ((code = mndSaveRetentionProgress(pMnode, pObj->id)) != 0) {
×
898
        mError("retention:%d, failed to save retention progress since %s", pObj->id, tstrerror(code));
×
UNCOV
899
      }
×
900
      mndReleaseRetention(pMnode, pObj);
901
    }
902
  }
×
UNCOV
903
  taosArrayDestroy(pArray);
×
904
}
905

906
static int32_t mndTrimDbDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
×
907
  int64_t tss = taosGetTimestampMs();
×
908
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
×
909
    return 0;
×
910
  }
×
UNCOV
911

×
912
  if (tsAuditLevel < AUDIT_LEVEL_CLUSTER) {
913
    return 0;
×
UNCOV
914
  }
×
915

916
  SName   name = {0};
917
  int32_t sqlLen = 0;
×
918
  char    sql[256] = {0};
×
UNCOV
919
  char    skeyStr[40] = {0};
×
920
  char    ekeyStr[40] = {0};
UNCOV
921
  char   *pDbName = pDb->name;
×
922

923
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
924
    pDbName = name.dbname;
925
  }
×
926

×
927
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
×
UNCOV
928
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
×
929
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
UNCOV
930
  } else {
×
931
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
932
                       tw->ekey);
933
  }
934

935
  int64_t tse = taosGetTimestampMs();
51✔
936
  double  duration = (double)(tse - tss);
51✔
937
  duration = duration / 1000;
51✔
938
  auditRecord(NULL, pMnode->clusterId, "autoTrimDB", name.dbname, "", sql, sqlLen, duration, 0);
51✔
939

51✔
940
  return 0;
51✔
941
}
942

943
extern int32_t mndTrimDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
51✔
944
                         ETsdbOpType type, ETriggerType triggerType);
51✔
945
static int32_t mndTrimDbDispatch(SRpcMsg *pReq) {
51✔
946
  int32_t    code = 0, lino = 0;
×
947
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
948
  SSdb      *pSdb = pMnode->pSdb;
×
949
  int64_t    curSec = taosGetTimestampMs() / 1000;
950
  STrimDbReq trimReq = {
UNCOV
951
      .tw.skey = INT64_MIN, .tw.ekey = curSec, .optrType = TSDB_OPTR_NORMAL, .triggerType = TSDB_TRIGGER_AUTO};
×
952

UNCOV
953
  void   *pIter = NULL;
×
954
  SDbObj *pDb = NULL;
UNCOV
955
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
×
956
    if (pDb->cfg.isMount) {
957
      sdbRelease(pSdb, pDb);
958
      continue;
×
959
    }
×
UNCOV
960

×
961
    (void)snprintf(trimReq.db, sizeof(trimReq.db), "%s", pDb->name);
962

UNCOV
963
    if ((code = mndTrimDb(pMnode, pReq, pDb, trimReq.tw, trimReq.vgroupIds, trimReq.optrType, trimReq.triggerType)) ==
×
964
        0) {
UNCOV
965
      mInfo("db:%s, start to auto trim, optr:%u, tw:%" PRId64 ",%" PRId64, trimReq.db, trimReq.optrType,
×
966
            trimReq.tw.skey, trimReq.tw.ekey);
967
    } else {
51✔
968
      mError("db:%s, failed to auto trim since %s", pDb->name, tstrerror(code));
51✔
969
      sdbRelease(pSdb, pDb);
970
      continue;
971
    }
2,513,854✔
972

2,513,854✔
973
    TAOS_UNUSED(mndTrimDbDispatchAudit(pMnode, pReq, pDb, &trimReq.tw));
2,513,854✔
974

2,513,854✔
975
    sdbRelease(pSdb, pDb);
976
  }
977
_exit:
51✔
978
  return code;
51✔
979
}
51✔
980

981
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq) {
982
  mTrace("start to process query trim timer");
983
  mndRetentionPullup(pReq->info.node);
984
  return 0;
985
}
986

987
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
988
  mTrace("start to process trim db timer");
989
  return mndTrimDbDispatch(pReq);
990
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc