• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4944

30 Jan 2026 06:19AM UTC coverage: 66.849% (+0.1%) from 66.718%
#4944

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1124 of 2018 new or added lines in 72 files covered. (55.7%)

13677 existing lines in 155 files now uncovered.

205211 of 306978 relevant lines covered (66.85%)

125657591.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.69
/source/dnode/mnode/impl/src/mndRetention.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndRetention.h"
16
#include "audit.h"
17
#include "mndCompact.h"
18
#include "mndCompactDetail.h"
19
#include "mndDb.h"
20
#include "mndDef.h"
21
#include "mndDnode.h"
22
#include "mndPrivilege.h"
23
#include "mndRetentionDetail.h"
24
#include "mndShow.h"
25
#include "mndTrans.h"
26
#include "mndUser.h"
27
#include "mndVgroup.h"
28
#include "tmisce.h"
29
#include "tmsgcb.h"
30

31
#define MND_RETENTION_VER_NUMBER 1
32

33
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq);
34
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq);
35
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
36
static void    mndCancelRetrieveRetention(SMnode *pMnode, void *pIter);
37

38
/**
39
 * @brief mndInitRetention
40
 *  init retention module.
41
 *  - trim is equivalent to retention
42
 * @param pMnode
43
 * @return
44
 */
45

46
int32_t mndInitRetention(SMnode *pMnode) {
415,885✔
47
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndRetrieveRetention);
415,885✔
48
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_RETENTION, mndCancelRetrieveRetention);
415,885✔
49
  mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_TIMER, mndProcessTrimDbTimer);
415,885✔
50
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRIM, mndProcessKillRetentionReq);  // trim is equivalent to retention
415,885✔
51
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_TRIM_PROGRESS_RSP, mndProcessQueryRetentionRsp);
415,885✔
52
  mndSetMsgHandle(pMnode, TDMT_MND_QUERY_TRIM_TIMER, mndProcessQueryRetentionTimer);
415,885✔
53
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_TRIM_RSP, mndTransProcessRsp);
415,885✔
54

55
  SSdbTable table = {
415,885✔
56
      .sdbType = SDB_RETENTION,
57
      .keyType = SDB_KEY_INT32,
58
      .encodeFp = (SdbEncodeFp)mndRetentionActionEncode,
59
      .decodeFp = (SdbDecodeFp)mndRetentionActionDecode,
60
      .insertFp = (SdbInsertFp)mndRetentionActionInsert,
61
      .updateFp = (SdbUpdateFp)mndRetentionActionUpdate,
62
      .deleteFp = (SdbDeleteFp)mndRetentionActionDelete,
63
  };
64

65
  return sdbSetTable(pMnode->pSdb, table);
415,885✔
66
}
67

68
void mndCleanupRetention(SMnode *pMnode) { mDebug("mnd retention cleanup"); }
415,825✔
69

70
void tFreeRetentionObj(SRetentionObj *pObj) { tFreeCompactObj((SCompactObj *)pObj); }
57,940✔
71

72
int32_t tSerializeSRetentionObj(void *buf, int32_t bufLen, const SRetentionObj *pObj) {
131,516✔
73
  return tSerializeSCompactObj(buf, bufLen, (const SCompactObj *)pObj);
131,516✔
74
}
75

76
int32_t tDeserializeSRetentionObj(void *buf, int32_t bufLen, SRetentionObj *pObj) {
57,940✔
77
  return tDeserializeSCompactObj(buf, bufLen, (SCompactObj *)pObj);
57,940✔
78
}
79

80
SSdbRaw *mndRetentionActionEncode(SRetentionObj *pObj) {
65,758✔
81
  int32_t code = 0;
65,758✔
82
  int32_t lino = 0;
65,758✔
83
  terrno = TSDB_CODE_SUCCESS;
65,758✔
84

85
  void    *buf = NULL;
65,758✔
86
  SSdbRaw *pRaw = NULL;
65,758✔
87

88
  int32_t tlen = tSerializeSRetentionObj(NULL, 0, pObj);
65,758✔
89
  if (tlen < 0) {
65,758✔
90
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
91
    goto OVER;
×
92
  }
93

94
  int32_t size = sizeof(int32_t) + tlen;
65,758✔
95
  pRaw = sdbAllocRaw(SDB_RETENTION, MND_RETENTION_VER_NUMBER, size);
65,758✔
96
  if (pRaw == NULL) {
65,758✔
97
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
98
    goto OVER;
×
99
  }
100

101
  buf = taosMemoryMalloc(tlen);
65,758✔
102
  if (buf == NULL) {
65,758✔
103
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
104
    goto OVER;
×
105
  }
106

107
  tlen = tSerializeSRetentionObj(buf, tlen, pObj);
65,758✔
108
  if (tlen < 0) {
65,758✔
109
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
110
    goto OVER;
×
111
  }
112

113
  int32_t dataPos = 0;
65,758✔
114
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
65,758✔
115
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
65,758✔
116
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
65,758✔
117

118
OVER:
65,758✔
119
  taosMemoryFreeClear(buf);
65,758✔
120
  if (terrno != TSDB_CODE_SUCCESS) {
65,758✔
121
    mError("retention:%" PRId32 ", failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
×
122
    sdbFreeRaw(pRaw);
×
123
    return NULL;
×
124
  }
125

126
  mTrace("retention:%" PRId32 ", encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
65,758✔
127
  return pRaw;
65,758✔
128
}
129

130
SSdbRow *mndRetentionActionDecode(SSdbRaw *pRaw) {
57,940✔
131
  int32_t        code = 0;
57,940✔
132
  int32_t        lino = 0;
57,940✔
133
  SSdbRow       *pRow = NULL;
57,940✔
134
  SRetentionObj *pObj = NULL;
57,940✔
135
  void          *buf = NULL;
57,940✔
136
  terrno = TSDB_CODE_SUCCESS;
57,940✔
137

138
  int8_t sver = 0;
57,940✔
139
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
57,940✔
140
    goto OVER;
×
141
  }
142

143
  if (sver != MND_RETENTION_VER_NUMBER) {
57,940✔
144
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
145
    mError("retention read invalid ver, data ver: %d, curr ver: %d", sver, MND_RETENTION_VER_NUMBER);
×
146
    goto OVER;
×
147
  }
148

149
  pRow = sdbAllocRow(sizeof(SRetentionObj));
57,940✔
150
  if (pRow == NULL) {
57,940✔
151
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
152
    goto OVER;
×
153
  }
154

155
  pObj = sdbGetRowObj(pRow);
57,940✔
156
  if (pObj == NULL) {
57,940✔
157
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
158
    goto OVER;
×
159
  }
160

161
  int32_t tlen;
57,940✔
162
  int32_t dataPos = 0;
57,940✔
163
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
57,940✔
164
  buf = taosMemoryMalloc(tlen + 1);
57,940✔
165
  if (buf == NULL) {
57,940✔
166
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
167
    goto OVER;
×
168
  }
169
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
57,940✔
170

171
  if ((terrno = tDeserializeSRetentionObj(buf, tlen, pObj)) < 0) {
57,940✔
172
    goto OVER;
×
173
  }
174

175
OVER:
57,940✔
176
  taosMemoryFreeClear(buf);
57,940✔
177
  if (terrno != TSDB_CODE_SUCCESS) {
57,940✔
178
    mError("retention:%" PRId32 ", failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
×
179
    taosMemoryFreeClear(pRow);
×
180
    return NULL;
×
181
  }
182

183
  mTrace("retention:%" PRId32 ", decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
57,940✔
184
  return pRow;
57,940✔
185
}
186

187
int32_t mndRetentionActionInsert(SSdb *pSdb, SRetentionObj *pObj) {
31,787✔
188
  mTrace("retention:%" PRId32 ", perform insert action", pObj->id);
31,787✔
189
  return 0;
31,787✔
190
}
191

192
int32_t mndRetentionActionDelete(SSdb *pSdb, SRetentionObj *pObj) {
57,940✔
193
  mTrace("retention:%" PRId32 ", perform delete action", pObj->id);
57,940✔
194
  tFreeRetentionObj(pObj);
57,940✔
195
  return 0;
57,940✔
196
}
197

198
int32_t mndRetentionActionUpdate(SSdb *pSdb, SRetentionObj *pOldObj, SRetentionObj *pNewObj) {
×
199
  mTrace("retention:%" PRId32 ", perform update action, old row:%p new row:%p", pOldObj->id, pOldObj, pNewObj);
×
200

201
  return 0;
×
202
}
203

204
SRetentionObj *mndAcquireRetention(SMnode *pMnode, int32_t id) {
159,439✔
205
  SSdb          *pSdb = pMnode->pSdb;
159,439✔
206
  SRetentionObj *pObj = sdbAcquire(pSdb, SDB_RETENTION, &id);
159,439✔
207
  if (pObj == NULL && (terrno != TSDB_CODE_SDB_OBJ_NOT_THERE && terrno != TSDB_CODE_SDB_OBJ_CREATING &&
159,439✔
208
                       terrno != TSDB_CODE_SDB_OBJ_DROPPING)) {
×
209
    terrno = TSDB_CODE_APP_ERROR;
×
210
    mError("retention:%" PRId32 ", failed to acquire retention since %s", id, terrstr());
×
211
  }
212
  return pObj;
159,439✔
213
}
214

215
void mndReleaseRetention(SMnode *pMnode, SRetentionObj *pObj) {
159,439✔
216
  SSdb *pSdb = pMnode->pSdb;
159,439✔
217
  sdbRelease(pSdb, pObj);
159,439✔
218
}
159,439✔
219

220
static int32_t mndRetentionGetDbInfo(SMnode *pMnode, int32_t id, char *dbname, int32_t len, int64_t *dbUid) {
66,643✔
221
  int32_t        code = 0;
66,643✔
222
  SRetentionObj *pObj = mndAcquireRetention(pMnode, id);
66,643✔
223
  if (pObj == NULL) {
66,643✔
224
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
225
    if (terrno != 0) code = terrno;
×
226
    TAOS_RETURN(code);
×
227
  }
228

229
  tstrncpy(dbname, pObj->dbname, len);
66,643✔
230
  if (dbUid) *dbUid = pObj->dbUid;
66,643✔
231
  mndReleaseRetention(pMnode, pObj);
66,643✔
232
  TAOS_RETURN(code);
66,643✔
233
}
234

235
int32_t mndAddRetentionToTrans(SMnode *pMnode, STrans *pTrans, SRetentionObj *pObj, SDbObj *pDb, STrimDbRsp *rsp) {
33,971✔
236
  int32_t code = 0;
33,971✔
237
  pObj->id = tGenIdPI32();
33,971✔
238

239
  tstrncpy(pObj->dbname, pDb->name, sizeof(pObj->dbname));
33,971✔
240
  pObj->dbUid = pDb->uid;
33,971✔
241

242
  pObj->startTime = taosGetTimestampMs();
33,971✔
243

244
  SSdbRaw *pVgRaw = mndRetentionActionEncode(pObj);
33,971✔
245
  if (pVgRaw == NULL) {
33,971✔
246
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
247
    if (terrno != 0) code = terrno;
×
248
    TAOS_RETURN(code);
×
249
  }
250
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
33,971✔
251
    sdbFreeRaw(pVgRaw);
×
252
    TAOS_RETURN(code);
×
253
  }
254

255
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
33,971✔
256
    sdbFreeRaw(pVgRaw);
×
257
    TAOS_RETURN(code);
×
258
  }
259

260
  rsp->id = pObj->id;
33,971✔
261

262
  return 0;
33,971✔
263
}
264

265
static int32_t mndRetrieveRetention(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
42,804✔
266
  SMnode        *pMnode = pReq->info.node;
42,804✔
267
  SSdb          *pSdb = pMnode->pSdb;
42,804✔
268
  int32_t        numOfRows = 0;
42,804✔
269
  SRetentionObj *pObj = NULL;
42,804✔
270
  char          *sep = NULL;
42,804✔
271
  SDbObj        *pDb = NULL;
42,804✔
272
  int32_t        code = 0, lino = 0;
42,804✔
273
  char           tmpBuf[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
42,804✔
274
  SUserObj      *pUser = NULL;
42,804✔
275
  SDbObj        *pIterDb = NULL;
42,804✔
276
  char           objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
42,804✔
277
  bool           showAll = false, showIter = false;
42,804✔
278
  int64_t        dbUid = 0;
42,804✔
279

280
  if ((pShow->db[0] != 0) && (sep = strchr(pShow->db, '.')) && (*(++sep) != 0)) {
42,804✔
281
    if (IS_SYS_DBNAME(sep)) {
42,804✔
282
      goto _OVER;
×
283
    } else if (!(pDb = mndAcquireDb(pMnode, pShow->db))) {
42,804✔
284
      return terrno;
×
285
    }
286
  }
287

288
  MND_SHOW_CHECK_OBJ_PRIVILEGE_ALL(RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), PRIV_SHOW_RETENTIONS, PRIV_OBJ_DB, 0,
42,804✔
289
                                   _OVER);
290

291
  while (numOfRows < rows) {
73,800✔
292
    pShow->pIter = sdbFetch(pSdb, SDB_RETENTION, pShow->pIter, (void **)&pObj);
73,800✔
293
    if (pShow->pIter == NULL) break;
73,800✔
294

295
    MND_SHOW_CHECK_DB_PRIVILEGE(pDb, pObj->dbname, pObj, RPC_MSG_TOKEN(pReq), MND_OPER_SHOW_RETENTIONS, _OVER);
30,996✔
296

297
    SColumnInfoData *pColInfo;
298
    int32_t          cols = 0;
30,996✔
299

300
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
30,996✔
301
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->id, false, pObj, pShow->pIter, _OVER);
30,996✔
302

303
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
30,996✔
304
    if (pDb != NULL && strcmp(pDb->name, pObj->dbname) != 0) {
30,996✔
UNCOV
305
      sdbRelease(pSdb, pObj);
×
306
      continue;
×
307
    }
308
    SName name = {0};
30,996✔
309
    if ((code = tNameFromString(&name, pObj->dbname, T_NAME_ACCT | T_NAME_DB)) != 0) {
30,996✔
UNCOV
310
      sdbRelease(pSdb, pObj);
×
311
      TAOS_CHECK_GOTO(code, &lino, _OVER);
×
312
    }
313
    (void)tNameGetDbName(&name, varDataVal(tmpBuf));
30,996✔
314
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
30,996✔
315
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);
30,996✔
316

317
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
30,996✔
318
    COL_DATA_SET_VAL_GOTO((const char *)&pObj->startTime, false, pObj, pShow->pIter, _OVER);
30,996✔
319

320
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
30,996✔
321
    tstrncpy(varDataVal(tmpBuf), pObj->triggerType == TSDB_TRIGGER_MANUAL ? "manual" : "auto",
30,996✔
322
             sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
323
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
30,996✔
324
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);
30,996✔
325

326
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
30,996✔
327
    char *optr = "trim";
30,996✔
328
    if (pObj->optrType == TSDB_OPTR_SSMIGRATE) {
30,996✔
UNCOV
329
      optr = "ssmigrate";
×
330
    } else if (pObj->optrType == TSDB_OPTR_ROLLUP) {
30,996✔
331
      optr = "rollup";
24,354✔
332
    }
333
    tstrncpy(varDataVal(tmpBuf), optr, sizeof(tmpBuf) - VARSTR_HEADER_SIZE);
30,996✔
334
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
30,996✔
335
    COL_DATA_SET_VAL_GOTO((const char *)tmpBuf, false, pObj, pShow->pIter, _OVER);
30,996✔
336

337
    sdbRelease(pSdb, pObj);
30,996✔
338
    ++numOfRows;
30,996✔
339
  }
340

341
_OVER:
42,804✔
342
  if (pUser) mndReleaseUser(pMnode, pUser);
42,804✔
343
  mndReleaseDb(pMnode, pDb);
42,804✔
344
  if (code != 0) {
42,804✔
UNCOV
345
    mError("failed to retrieve retention at line %d since %s", lino, tstrerror(code));
×
346
    TAOS_RETURN(code);
×
347
  }
348
  pShow->numOfRows += numOfRows;
42,804✔
349
  return numOfRows;
42,804✔
350
}
351

UNCOV
352
static void mndCancelRetrieveRetention(SMnode *pMnode, void *pIter) {
×
353
  SSdb *pSdb = pMnode->pSdb;
×
354
  sdbCancelFetchByType(pSdb, pIter, SDB_RETENTION);
×
355
}
×
356

UNCOV
357
static void *mndBuildKillRetentionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t id, int32_t dnodeId) {
×
358
  SVKillRetentionReq req = {0};
×
359
  req.taskId = id;
×
360
  req.vgId = pVgroup->vgId;
×
361
  req.dnodeId = dnodeId;
×
362
  terrno = 0;
×
363

UNCOV
364
  mInfo("vgId:%d, build kill retention req", pVgroup->vgId);
×
365
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
×
366
  if (contLen < 0) {
×
367
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
368
    return NULL;
×
369
  }
UNCOV
370
  contLen += sizeof(SMsgHead);
×
371

UNCOV
372
  void *pReq = taosMemoryMalloc(contLen);
×
373
  if (pReq == NULL) {
×
374
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
375
    return NULL;
×
376
  }
377

UNCOV
378
  SMsgHead *pHead = pReq;
×
379
  pHead->contLen = htonl(contLen);
×
380
  pHead->vgId = htonl(pVgroup->vgId);
×
381

UNCOV
382
  int32_t ret = 0;
×
383
  if ((ret = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
×
384
    taosMemoryFreeClear(pReq);
×
385
    terrno = ret;
×
386
    return NULL;
×
387
  }
UNCOV
388
  *pContLen = contLen;
×
389
  return pReq;
×
390
}
391

UNCOV
392
static int32_t mndAddKillRetentionAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t id, int32_t dnodeId) {
×
393
  int32_t      code = 0;
×
394
  STransAction action = {0};
×
395

UNCOV
396
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
×
397
  if (pDnode == NULL) {
×
398
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
399
    if (terrno != 0) code = terrno;
×
400
    TAOS_RETURN(code);
×
401
  }
UNCOV
402
  action.epSet = mndGetDnodeEpset(pDnode);
×
403
  mndReleaseDnode(pMnode, pDnode);
×
404

UNCOV
405
  int32_t contLen = 0;
×
406
  void   *pReq = mndBuildKillRetentionReq(pMnode, pVgroup, &contLen, id, dnodeId);
×
407
  if (pReq == NULL) {
×
408
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
409
    if (terrno != 0) code = terrno;
×
410
    TAOS_RETURN(code);
×
411
  }
412

UNCOV
413
  action.pCont = pReq;
×
414
  action.contLen = contLen;
×
415
  action.msgType = TDMT_VND_KILL_TRIM;
×
416

UNCOV
417
  mTrace("trans:%d, kill retention msg len:%d", pTrans->id, contLen);
×
418

UNCOV
419
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
420
    taosMemoryFree(pReq);
×
421
    TAOS_RETURN(code);
×
422
  }
423

UNCOV
424
  return 0;
×
425
}
426

UNCOV
427
static int32_t mndKillRetention(SMnode *pMnode, SRpcMsg *pReq, SRetentionObj *pObj) {
×
428
  int32_t code = 0;
×
429
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-retention");
×
430
  if (pTrans == NULL) {
×
431
    mError("retention:%" PRId32 ", failed to drop since %s", pObj->id, terrstr());
×
432
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
433
    if (terrno != 0) code = terrno;
×
434
    TAOS_RETURN(code);
×
435
  }
UNCOV
436
  mInfo("trans:%d, used to kill retention:%" PRId32, pTrans->id, pObj->id);
×
437

UNCOV
438
  mndTransSetDbName(pTrans, pObj->dbname, NULL);
×
439

UNCOV
440
  SSdbRaw *pCommitRaw = mndRetentionActionEncode(pObj);
×
441
  if (pCommitRaw == NULL) {
×
442
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
443
    if (terrno != 0) code = terrno;
×
444
    mndTransDrop(pTrans);
×
445
    TAOS_RETURN(code);
×
446
  }
UNCOV
447
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
448
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
449
    mndTransDrop(pTrans);
×
450
    TAOS_RETURN(code);
×
451
  }
UNCOV
452
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
×
453
    mndTransDrop(pTrans);
×
454
    TAOS_RETURN(code);
×
455
  }
456

UNCOV
457
  void *pIter = NULL;
×
458
  while (1) {
×
459
    SCompactDetailObj *pDetail = NULL;
×
460
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
×
461
    if (pIter == NULL) break;
×
462

UNCOV
463
    if (pDetail->id == pObj->id) {
×
464
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
×
465
      if (pVgroup == NULL) {
×
466
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
467
        sdbCancelFetch(pMnode->pSdb, pIter);
×
468
        sdbRelease(pMnode->pSdb, pDetail);
×
469
        mndTransDrop(pTrans);
×
470
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
471
        if (terrno != 0) code = terrno;
×
472
        TAOS_RETURN(code);
×
473
      }
474

UNCOV
475
      if ((code = mndAddKillRetentionAction(pMnode, pTrans, pVgroup, pObj->id, pDetail->dnodeId)) != 0) {
×
476
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
477
        sdbCancelFetch(pMnode->pSdb, pIter);
×
478
        sdbRelease(pMnode->pSdb, pDetail);
×
479
        mndTransDrop(pTrans);
×
480
        TAOS_RETURN(code);
×
481
      }
482

UNCOV
483
      mndReleaseVgroup(pMnode, pVgroup);
×
484
    }
485

UNCOV
486
    sdbRelease(pMnode->pSdb, pDetail);
×
487
  }
488

UNCOV
489
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
×
490
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
491
    mndTransDrop(pTrans);
×
492
    TAOS_RETURN(code);
×
493
  }
494

UNCOV
495
  mndTransDrop(pTrans);
×
496
  return 0;
×
497
}
498

UNCOV
499
int32_t mndProcessKillRetentionReq(SRpcMsg *pReq) {
×
500
  int32_t           code = 0;
×
501
  int32_t           lino = 0;
×
502
  SKillRetentionReq req = {0};  // reuse SKillCompactReq
×
503
  int64_t           tss = taosGetTimestampMs();
×
504

UNCOV
505
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &req)) != 0) {
×
506
    TAOS_RETURN(code);
×
507
  }
508

UNCOV
509
  mInfo("start to kill retention:%" PRId32, req.id);
×
510

UNCOV
511
  SMnode        *pMnode = pReq->info.node;
×
512
  SRetentionObj *pObj = mndAcquireRetention(pMnode, req.id);
×
513
  if (pObj == NULL) {
×
514
    code = TSDB_CODE_MND_INVALID_RETENTION_ID;
×
515
    tFreeSKillCompactReq(&req);
×
516
    TAOS_RETURN(code);
×
517
  }
518

UNCOV
519
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_TRIM_DB), &lino, _OVER);
×
520

UNCOV
521
  TAOS_CHECK_GOTO(mndKillRetention(pMnode, pReq, pObj), &lino, _OVER);
×
522

UNCOV
523
  code = TSDB_CODE_ACTION_IN_PROGRESS;
×
524

UNCOV
525
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
×
526
    char    obj[TSDB_INT32_ID_LEN] = {0};
×
527
    int32_t nBytes = snprintf(obj, sizeof(obj), "%d", pObj->id);
×
528
    if ((uint32_t)nBytes < sizeof(obj)) {
×
529
      int64_t tse = taosGetTimestampMs();
×
530
      double  duration = (double)(tse - tss);
×
531
      duration = duration / 1000;
×
532
      auditRecord(pReq, pMnode->clusterId, "killRetention", pObj->dbname, obj, req.sql, req.sqlLen, duration, 0);
×
533
    } else {
UNCOV
534
      mError("retention:%" PRId32 " failed to audit since %s", pObj->id, tstrerror(TSDB_CODE_OUT_OF_RANGE));
×
535
    }
536
  }
UNCOV
537
_OVER:
×
538
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
539
    mError("failed to kill retention %" PRId32 " since %s", req.id, terrstr());
×
540
  }
541

UNCOV
542
  tFreeSKillCompactReq((SKillCompactReq *)&req);
×
543
  mndReleaseRetention(pMnode, pObj);
×
544

UNCOV
545
  TAOS_RETURN(code);
×
546
}
547

548
// update progress
549
static int32_t mndUpdateRetentionProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t id, SQueryRetentionProgressRsp *rsp) {
114,114✔
550
  int32_t code = 0;
114,114✔
551

552
  void *pIter = NULL;
114,114✔
553
  while (1) {
141,588✔
554
    SRetentionDetailObj *pDetail = NULL;
255,702✔
555
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
255,702✔
556
    if (pIter == NULL) break;
255,702✔
557

558
    if (pDetail->id == id && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
255,175✔
559
      pDetail->newNumberFileset = rsp->numberFileset;
113,587✔
560
      pDetail->newFinished = rsp->finished;
113,587✔
561
      pDetail->progress = rsp->progress;
113,587✔
562
      pDetail->remainingTime = rsp->remainingTime;
113,587✔
563

564
      sdbCancelFetch(pMnode->pSdb, pIter);
113,587✔
565
      sdbRelease(pMnode->pSdb, pDetail);
113,587✔
566

567
      TAOS_RETURN(code);
113,587✔
568
    }
569

570
    sdbRelease(pMnode->pSdb, pDetail);
141,588✔
571
  }
572

573
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
527✔
574
}
575

576
int32_t mndProcessQueryRetentionRsp(SRpcMsg *pReq) {
115,096✔
577
  int32_t                    code = 0;
115,096✔
578
  SQueryRetentionProgressRsp req = {0};
115,096✔
579
  if (pReq->code != 0) {
115,096✔
580
    mError("received wrong retention response, req code is %s", tstrerror(pReq->code));
982✔
581
    TAOS_RETURN(pReq->code);
982✔
582
  }
583
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
114,114✔
584
  if (code != 0) {
114,114✔
UNCOV
585
    mError("failed to deserialize vnode-query-retention-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
586
           pReq->contLen);
UNCOV
587
    TAOS_RETURN(code);
×
588
  }
589

590
  mDebug("retention:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.id, req.vgId,
114,114✔
591
         req.dnodeId, req.numberFileset, req.finished);
592

593
  SMnode *pMnode = pReq->info.node;
114,114✔
594

595
  code = mndUpdateRetentionProgress(pMnode, pReq, req.id, &req);
114,114✔
596
  if (code != 0) {
114,114✔
597
    mError("retention:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.id,
527✔
598
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
599
    TAOS_RETURN(code);
527✔
600
  }
601

602
  TAOS_RETURN(code);
113,587✔
603
}
604

605
// timer
606
void mndRetentionSendProgressReq(SMnode *pMnode, SRetentionObj *pObj) {
66,643✔
607
  void *pIter = NULL;
66,643✔
608

609
  while (1) {
139,430✔
610
    SRetentionDetailObj *pDetail = NULL;
206,073✔
611
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
206,073✔
612
    if (pIter == NULL) break;
206,073✔
613

614
    if (pDetail->id == pObj->id) {
139,430✔
615
      SEpSet epSet = {0};
115,096✔
616

617
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
115,096✔
618
      if (pDnode == NULL) break;
115,096✔
619
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
115,096✔
UNCOV
620
        sdbRelease(pMnode->pSdb, pDetail);
×
621
        continue;
×
622
      }
623
      mndReleaseDnode(pMnode, pDnode);
115,096✔
624

625
      SQueryRetentionProgressReq req;
115,096✔
626
      req.id = pDetail->id;
115,096✔
627
      req.vgId = pDetail->vgId;
115,096✔
628
      req.dnodeId = pDetail->dnodeId;
115,096✔
629

630
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
115,096✔
631
      if (contLen < 0) {
115,096✔
UNCOV
632
        sdbRelease(pMnode->pSdb, pDetail);
×
633
        continue;
×
634
      }
635

636
      contLen += sizeof(SMsgHead);
115,096✔
637

638
      SMsgHead *pHead = rpcMallocCont(contLen);
115,096✔
639
      if (pHead == NULL) {
115,096✔
UNCOV
640
        sdbRelease(pMnode->pSdb, pDetail);
×
641
        continue;
×
642
      }
643

644
      pHead->contLen = htonl(contLen);
115,096✔
645
      pHead->vgId = htonl(pDetail->vgId);
115,096✔
646

647
      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
115,096✔
UNCOV
648
        sdbRelease(pMnode->pSdb, pDetail);
×
649
        continue;
×
650
      }
651

652
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_TRIM_PROGRESS, .contLen = contLen};
115,096✔
653

654
      rpcMsg.pCont = pHead;
115,096✔
655

656
      char    detail[1024] = {0};
115,096✔
657
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
230,192✔
658
                              TMSG_INFO(TDMT_VND_QUERY_TRIM_PROGRESS), epSet.numOfEps, epSet.inUse);
230,192✔
659
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
230,192✔
660
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
115,096✔
661
      }
662

663
      mDebug("retention:%d, send update progress msg to %s", pDetail->id, detail);
115,096✔
664

665
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
115,096✔
UNCOV
666
        sdbRelease(pMnode->pSdb, pDetail);
×
UNCOV
667
        continue;
×
668
      }
669
    }
670

671
    sdbRelease(pMnode->pSdb, pDetail);
139,430✔
672
  }
673
}
66,643✔
674

675
static int32_t mndSaveRetentionProgress(SMnode *pMnode, int32_t id) {
66,643✔
676
  int32_t code = 0;
66,643✔
677
  bool    needSave = false;
66,643✔
678
  void   *pIter = NULL;
66,643✔
679
  while (1) {
139,430✔
680
    SRetentionDetailObj *pDetail = NULL;
206,073✔
681
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
206,073✔
682
    if (pIter == NULL) break;
206,073✔
683

684
    if (pDetail->id == id) {
139,430✔
685
      mDebug(
115,096✔
686
          "retention:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
687
          "newNumberFileset:%d, newFinished:%d",
688
          pDetail->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
689
          pDetail->newNumberFileset, pDetail->newFinished);
690

691
      // these 2 number will jump back after dnode restart, so < is not used here
692
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
115,096✔
693
        needSave = true;
60,950✔
694
    }
695

696
    sdbRelease(pMnode->pSdb, pDetail);
139,430✔
697
  }
698

699
  char    dbname[TSDB_TABLE_FNAME_LEN] = {0};
66,643✔
700
  int64_t dbUid = 0;
66,643✔
701
  TAOS_CHECK_RETURN(mndRetentionGetDbInfo(pMnode, id, dbname, TSDB_TABLE_FNAME_LEN, &dbUid));
66,643✔
702

703
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
66,643✔
704
    needSave = true;
491✔
705
    mWarn("retention:%" PRId32 ", no db exist, set needSave:%s", id, dbname);
491✔
706
  }
707

708
  if (!needSave) {
66,643✔
709
    mDebug("retention:%" PRId32 ", no need to save", id);
38,961✔
710
    TAOS_RETURN(code);
38,961✔
711
  }
712

713
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-retention-progress");
27,682✔
714
  if (pTrans == NULL) {
27,682✔
UNCOV
715
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
716
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
717
    if (terrno != 0) code = terrno;
×
718
    TAOS_RETURN(code);
×
719
  }
720
  mInfo("retention:%d, trans:%d, used to update retention progress.", id, pTrans->id);
27,682✔
721

722
  mndTransSetDbName(pTrans, dbname, NULL);
27,682✔
723

724
  pIter = NULL;
27,682✔
725
  while (1) {
77,678✔
726
    SRetentionDetailObj *pDetail = NULL;
105,360✔
727
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
105,360✔
728
    if (pIter == NULL) break;
105,360✔
729

730
    if (pDetail->id == id) {
77,678✔
731
      mInfo(
67,944✔
732
          "retention:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
733
          "newNumberFileset:%d, newFinished:%d",
734
          pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
735
          pDetail->newNumberFileset, pDetail->newFinished);
736

737
      pDetail->numberFileset = pDetail->newNumberFileset;
67,944✔
738
      pDetail->finished = pDetail->newFinished;
67,944✔
739

740
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
67,944✔
741
      if (pCommitRaw == NULL) {
67,944✔
UNCOV
742
        sdbCancelFetch(pMnode->pSdb, pIter);
×
743
        sdbRelease(pMnode->pSdb, pDetail);
×
744
        mndTransDrop(pTrans);
×
745
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
746
        if (terrno != 0) code = terrno;
×
747
        TAOS_RETURN(code);
×
748
      }
749
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
67,944✔
UNCOV
750
        mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id, terrstr());
×
751
        sdbCancelFetch(pMnode->pSdb, pIter);
×
752
        sdbRelease(pMnode->pSdb, pDetail);
×
753
        mndTransDrop(pTrans);
×
754
        TAOS_RETURN(code);
×
755
      }
756
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
67,944✔
UNCOV
757
        sdbCancelFetch(pMnode->pSdb, pIter);
×
758
        sdbRelease(pMnode->pSdb, pDetail);
×
759
        mndTransDrop(pTrans);
×
760
        TAOS_RETURN(code);
×
761
      }
762
    }
763

764
    sdbRelease(pMnode->pSdb, pDetail);
77,678✔
765
  }
766

767
  bool allFinished = true;
27,682✔
768
  pIter = NULL;
27,682✔
769
  while (1) {
74,785✔
770
    SRetentionDetailObj *pDetail = NULL;
102,467✔
771
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
102,467✔
772
    if (pIter == NULL) break;
102,467✔
773

774
    if (pDetail->id == id) {
76,314✔
775
      mInfo("retention:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
66,608✔
776
            pDetail->id, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
777

778
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
66,608✔
779
        allFinished = false;
1,527✔
780
        sdbCancelFetch(pMnode->pSdb, pIter);
1,527✔
781
        sdbRelease(pMnode->pSdb, pDetail);
1,527✔
782
        break;
1,527✔
783
      }
784
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
65,081✔
785
        allFinished = false;
2✔
786
        sdbCancelFetch(pMnode->pSdb, pIter);
2✔
787
        sdbRelease(pMnode->pSdb, pDetail);
2✔
788
        break;
2✔
789
      }
790
    }
791

792
    sdbRelease(pMnode->pSdb, pDetail);
74,785✔
793
  }
794

795
  if (!mndDbIsExist(pMnode, dbname, dbUid)) {
27,682✔
796
    allFinished = true;
491✔
797
    mWarn("retention:%" PRId32 ", no db exist, set all finished:%s", id, dbname);
491✔
798
  }
799

800
  if (allFinished) {
27,682✔
801
    mInfo("retention:%d, all finished", id);
26,153✔
802
    pIter = NULL;
26,153✔
803
    while (1) {
71,969✔
804
      SRetentionDetailObj *pDetail = NULL;
98,122✔
805
      pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION_DETAIL, pIter, (void **)&pDetail);
98,122✔
806
      if (pIter == NULL) break;
98,122✔
807

808
      if (pDetail->id == id) {
71,969✔
809
        SSdbRaw *pCommitRaw = mndRetentionDetailActionEncode(pDetail);
63,385✔
810
        if (pCommitRaw == NULL) {
63,385✔
UNCOV
811
          mndTransDrop(pTrans);
×
812
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
813
          if (terrno != 0) code = terrno;
×
814
          TAOS_RETURN(code);
×
815
        }
816
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
63,385✔
UNCOV
817
          mError("retention:%d, trans:%d, failed to append commit log since %s", pDetail->id, pTrans->id,
×
818
                 tstrerror(code));
UNCOV
819
          sdbCancelFetch(pMnode->pSdb, pIter);
×
820
          sdbRelease(pMnode->pSdb, pDetail);
×
821
          mndTransDrop(pTrans);
×
822
          TAOS_RETURN(code);
×
823
        }
824
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
63,385✔
UNCOV
825
          sdbCancelFetch(pMnode->pSdb, pIter);
×
826
          sdbRelease(pMnode->pSdb, pDetail);
×
827
          mndTransDrop(pTrans);
×
828
          TAOS_RETURN(code);
×
829
        }
830
        mInfo("retention:%d, add drop compactdetail action", pDetail->compactDetailId);
63,385✔
831
      }
832

833
      sdbRelease(pMnode->pSdb, pDetail);
71,969✔
834
    }
835

836
    SRetentionObj *pObj = mndAcquireRetention(pMnode, id);
26,153✔
837
    if (pObj == NULL) {
26,153✔
UNCOV
838
      mndTransDrop(pTrans);
×
839
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
840
      if (terrno != 0) code = terrno;
×
841
      TAOS_RETURN(code);
×
842
    }
843
    SSdbRaw *pCommitRaw = mndRetentionActionEncode(pObj);
26,153✔
844
    mndReleaseRetention(pMnode, pObj);
26,153✔
845
    if (pCommitRaw == NULL) {
26,153✔
UNCOV
846
      mndTransDrop(pTrans);
×
847
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
848
      if (terrno != 0) code = terrno;
×
849
      TAOS_RETURN(code);
×
850
    }
851
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
26,153✔
UNCOV
852
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
×
853
      mndTransDrop(pTrans);
×
854
      TAOS_RETURN(code);
×
855
    }
856
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
26,153✔
UNCOV
857
      mError("retention:%d, trans:%d, failed to append commit log since %s", id, pTrans->id, tstrerror(code));
×
858
      mndTransDrop(pTrans);
×
859
      TAOS_RETURN(code);
×
860
    }
861
    mInfo("retention:%d, add drop compact action", pObj->id);
26,153✔
862
  }
863

864
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
27,682✔
UNCOV
865
    mError("retention:%d, trans:%d, failed to prepare since %s", id, pTrans->id, tstrerror(code));
×
UNCOV
866
    mndTransDrop(pTrans);
×
UNCOV
867
    TAOS_RETURN(code);
×
868
  }
869

870
  mndTransDrop(pTrans);
27,682✔
871
  return 0;
27,682✔
872
}
873

874
static void mndRetentionPullup(SMnode *pMnode) {
2,786,770✔
875
  int32_t code = 0;
2,786,770✔
876
  SSdb   *pSdb = pMnode->pSdb;
2,786,770✔
877
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_RETENTION), sizeof(int32_t));
2,786,770✔
878
  if (pArray == NULL) return;
2,786,770✔
879

880
  void *pIter = NULL;
2,786,770✔
881
  while (1) {
66,643✔
882
    SRetentionObj *pObj = NULL;
2,853,413✔
883
    pIter = sdbFetch(pMnode->pSdb, SDB_RETENTION, pIter, (void **)&pObj);
2,853,413✔
884
    if (pIter == NULL) break;
2,853,413✔
885
    if (taosArrayPush(pArray, &pObj->id) == NULL) {
133,286✔
UNCOV
886
      mError("failed to push retention id:%d into array, but continue pull up", pObj->id);
×
887
    }
888
    sdbRelease(pSdb, pObj);
66,643✔
889
  }
890

891
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
2,853,413✔
892
    int32_t *pId = taosArrayGet(pArray, i);
66,643✔
893
    mInfo("begin to pull up retention:%d", *pId);
66,643✔
894
    SRetentionObj *pObj = mndAcquireRetention(pMnode, *pId);
66,643✔
895
    if (pObj != NULL) {
66,643✔
896
      mInfo("retention:%d, begin to pull up", pObj->id);
66,643✔
897
      mndRetentionSendProgressReq(pMnode, pObj);
66,643✔
898
      if ((code = mndSaveRetentionProgress(pMnode, pObj->id)) != 0) {
66,643✔
UNCOV
899
        mError("retention:%d, failed to save retention progress since %s", pObj->id, tstrerror(code));
×
900
      }
901
      mndReleaseRetention(pMnode, pObj);
66,643✔
902
    }
903
  }
904
  taosArrayDestroy(pArray);
2,786,770✔
905
}
906

907
static int32_t mndTrimDbDispatchAudit(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow *tw) {
16,212✔
908
  int64_t tss = taosGetTimestampMs();
16,212✔
909
  if (!tsEnableAudit || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) {
16,212✔
910
    return 0;
16,212✔
911
  }
912

UNCOV
913
  if (tsAuditLevel < AUDIT_LEVEL_CLUSTER) {
×
914
    return 0;
×
915
  }
916

UNCOV
917
  SName   name = {0};
×
918
  int32_t sqlLen = 0;
×
919
  char    sql[256] = {0};
×
920
  char    skeyStr[40] = {0};
×
921
  char    ekeyStr[40] = {0};
×
922
  char   *pDbName = pDb->name;
×
923

UNCOV
924
  if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) == 0) {
×
925
    pDbName = name.dbname;
×
926
  }
927

UNCOV
928
  if (taosFormatUtcTime(skeyStr, sizeof(skeyStr), tw->skey, pDb->cfg.precision) == 0 &&
×
929
      taosFormatUtcTime(ekeyStr, sizeof(ekeyStr), tw->ekey, pDb->cfg.precision) == 0) {
×
930
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with '%s' end with '%s'", pDbName, skeyStr, ekeyStr);
×
931
  } else {
UNCOV
932
    sqlLen = tsnprintf(sql, sizeof(sql), "trim db %s start with %" PRIi64 " end with %" PRIi64, pDbName, tw->skey,
×
933
                       tw->ekey);
934
  }
935

UNCOV
936
  int64_t tse = taosGetTimestampMs();
×
937
  double  duration = (double)(tse - tss);
×
938
  duration = duration / 1000;
×
939
  auditRecord(NULL, pMnode->clusterId, "autoTrimDB", name.dbname, "", sql, sqlLen, duration, 0);
×
940

UNCOV
941
  return 0;
×
942
}
943

944
extern int32_t mndTrimDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, STimeWindow tw, SArray *vgroupIds,
945
                         ETsdbOpType type, ETriggerType triggerType);
946
static int32_t mndTrimDbDispatch(SRpcMsg *pReq) {
9,351✔
947
  int32_t    code = 0, lino = 0;
9,351✔
948
  SMnode    *pMnode = pReq->info.node;
9,351✔
949
  SSdb      *pSdb = pMnode->pSdb;
9,351✔
950
  int64_t    curSec = taosGetTimestampMs() / 1000;
9,351✔
951
  STrimDbReq trimReq = {
9,351✔
952
      .tw.skey = INT64_MIN, .tw.ekey = curSec, .optrType = TSDB_OPTR_NORMAL, .triggerType = TSDB_TRIGGER_AUTO};
953

954
  void   *pIter = NULL;
9,351✔
955
  SDbObj *pDb = NULL;
9,351✔
956
  while ((pIter = sdbFetch(pSdb, SDB_DB, pIter, (void **)&pDb))) {
25,563✔
957
    if (pDb->cfg.isMount) {
16,212✔
UNCOV
958
      sdbRelease(pSdb, pDb);
×
959
      continue;
×
960
    }
961

962
    (void)snprintf(trimReq.db, sizeof(trimReq.db), "%s", pDb->name);
16,212✔
963

964
    if ((code = mndTrimDb(pMnode, pReq, pDb, trimReq.tw, trimReq.vgroupIds, trimReq.optrType, trimReq.triggerType)) ==
16,212✔
965
        0) {
966
      mInfo("db:%s, start to auto trim, optr:%u, tw:%" PRId64 ",%" PRId64, trimReq.db, trimReq.optrType,
16,212✔
967
            trimReq.tw.skey, trimReq.tw.ekey);
968
    } else {
UNCOV
969
      mError("db:%s, failed to auto trim since %s", pDb->name, tstrerror(code));
×
970
      sdbRelease(pSdb, pDb);
×
971
      continue;
×
972
    }
973

974
    TAOS_UNUSED(mndTrimDbDispatchAudit(pMnode, pReq, pDb, &trimReq.tw));
16,212✔
975

976
    sdbRelease(pSdb, pDb);
16,212✔
977
  }
978
_exit:
9,351✔
979
  return code;
9,351✔
980
}
981

982
static int32_t mndProcessQueryRetentionTimer(SRpcMsg *pReq) {
2,786,770✔
983
  mTrace("start to process query trim timer");
2,786,770✔
984
  mndRetentionPullup(pReq->info.node);
2,786,770✔
985
  return 0;
2,786,770✔
986
}
987

988
static int32_t mndProcessTrimDbTimer(SRpcMsg *pReq) {
9,351✔
989
  mTrace("start to process trim db timer");
9,351✔
990
  return mndTrimDbDispatch(pReq);
9,351✔
991
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc