• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4932

19 Jan 2026 12:29PM UTC coverage: 66.646% (-0.1%) from 66.749%
#4932

push

travis-ci

web-flow
chore: upgrade taospy (#34272)

202981 of 304565 relevant lines covered (66.65%)

126831443.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/source/dnode/vnode/src/meta/metaTtl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "metaTtl.h"
17
#include "meta.h"
18
#include "tdbInt.h"
19

20
typedef struct {
21
  TTB   *pNewTtlIdx;
22
  SMeta *pMeta;
23
} SConvertData;
24

25
typedef struct {
26
  int32_t      ttlDropMaxCount;
27
  int32_t      count;
28
  STtlIdxKeyV1 expiredKey;
29
  SArray      *pTbUids;
30
} STtlExpiredCtx;
31

32
static void ttlMgrCleanup(STtlManger *pTtlMgr);
33

34
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
35

36
static void    ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
37
static int     ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
38
static int     ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
39
static int     ttlMgrFillCache(STtlManger *pTtlMgr);
40
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache);
41
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData);
42
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
43
                                         void *pExpiredInfo);
44

45
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr);
46

47
const char *ttlTbname = "ttl.idx";
48
const char *ttlV1Tbname = "ttlv1.idx";
49

50
int32_t ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
3,753,182✔
51
  int32_t code = TSDB_CODE_SUCCESS;
3,753,182✔
52
  int64_t startNs = taosGetTimestampNs();
3,770,178✔
53
  int32_t pathLen = 0;
3,770,178✔
54

55
  *ppTtlMgr = NULL;
3,770,178✔
56

57
  STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
3,770,689✔
58
  if (pTtlMgr == NULL) TAOS_RETURN(terrno);
3,764,739✔
59

60
  pathLen = strlen(logPrefix) + 1;
3,764,739✔
61
  char *logBuffer = (char *)tdbOsCalloc(1, pathLen);
3,764,739✔
62
  if (logBuffer == NULL) {
3,767,697✔
63
    tdbOsFree(pTtlMgr);
×
64
    TAOS_RETURN(terrno);
×
65
  }
66
  tstrncpy(logBuffer, logPrefix, pathLen);
3,767,697✔
67
  pTtlMgr->logPrefix = logBuffer;
3,767,645✔
68
  pTtlMgr->flushThreshold = flushThreshold;
3,768,089✔
69

70
  code = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
3,767,361✔
71
  if (TSDB_CODE_SUCCESS != code) {
3,770,042✔
72
    metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(code));
×
73
    tdbOsFree(pTtlMgr);
×
74
    TAOS_RETURN(code);
×
75
  }
76

77
  pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
3,770,042✔
78
  pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
3,770,689✔
79

80
  if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
3,770,689✔
81
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
×
82
    ttlMgrCleanup(pTtlMgr);
×
83
    TAOS_RETURN(code);
×
84
  }
85

86
  int64_t endNs = taosGetTimestampNs();
3,770,689✔
87
  metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
3,770,689✔
88
           taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
89

90
  *ppTtlMgr = pTtlMgr;
3,770,936✔
91
  TAOS_RETURN(TSDB_CODE_SUCCESS);
3,770,689✔
92
}
93

94
void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
3,770,689✔
95

96
bool ttlMgrNeedUpgrade(TDB *pEnv) {
3,765,427✔
97
  bool needUpgrade = tdbTbExist(ttlTbname, pEnv);
3,765,427✔
98
  if (needUpgrade) {
3,765,056✔
99
    metaInfo("find ttl idx in old version , will convert");
×
100
  }
101
  return needUpgrade;
3,765,056✔
102
}
103

104
int32_t ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
×
105
  SMeta  *meta = (SMeta *)pMeta;
×
106
  int32_t code = TSDB_CODE_SUCCESS;
×
107

108
  if (!tdbTbExist(ttlTbname, meta->pEnv)) TAOS_RETURN(TSDB_CODE_SUCCESS);
×
109

110
  metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
×
111

112
  int64_t startNs = taosGetTimestampNs();
×
113

114
  code = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
×
115
  if (TSDB_CODE_SUCCESS != code) {
×
116
    metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(code));
×
117
    goto _out;
×
118
  }
119

120
  if ((code = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta)) != TSDB_CODE_SUCCESS) {
×
121
    metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
×
122
    goto _out;
×
123
  }
124

125
  if ((code = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn)) != TSDB_CODE_SUCCESS) {
×
126
    metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
×
127
    goto _out;
×
128
  }
129

130
  if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
×
131
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(code));
×
132
    goto _out;
×
133
  }
134

135
  int64_t endNs = taosGetTimestampNs();
×
136
  metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
×
137
           taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
138

139
_out:
×
140
  tdbTbClose(pTtlMgr->pOldTtlIdx);
×
141
  pTtlMgr->pOldTtlIdx = NULL;
×
142

143
  TAOS_RETURN(code);
×
144
}
145

146
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
3,770,689✔
147
  taosMemoryFree(pTtlMgr->logPrefix);
3,770,689✔
148
  taosHashCleanup(pTtlMgr->pTtlCache);
3,770,689✔
149
  taosHashCleanup(pTtlMgr->pDirtyUids);
3,770,689✔
150
  tdbTbClose(pTtlMgr->pTtlIdx);
3,770,689✔
151
  taosMemoryFree(pTtlMgr);
3,770,689✔
152
}
3,770,689✔
153

154
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
237,082✔
155
  if (ttlDays <= 0) return;
237,082✔
156

157
  pTtlKey->deleteTimeMs = changeTimeMs + ttlDays * tsTtlUnit * 1000;
237,082✔
158
  pTtlKey->uid = uid;
237,082✔
159
}
160

161
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
×
162
  STtlIdxKey *pTtlIdxKey1 = (STtlIdxKey *)pKey1;
×
163
  STtlIdxKey *pTtlIdxKey2 = (STtlIdxKey *)pKey2;
×
164

165
  if (pTtlIdxKey1->deleteTimeSec > pTtlIdxKey2->deleteTimeSec) {
×
166
    return 1;
×
167
  } else if (pTtlIdxKey1->deleteTimeSec < pTtlIdxKey2->deleteTimeSec) {
×
168
    return -1;
×
169
  }
170

171
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
×
172
    return 1;
×
173
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
×
174
    return -1;
×
175
  }
176

177
  return 0;
×
178
}
179

180
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
631,051✔
181
  STtlIdxKeyV1 *pTtlIdxKey1 = (STtlIdxKeyV1 *)pKey1;
631,051✔
182
  STtlIdxKeyV1 *pTtlIdxKey2 = (STtlIdxKeyV1 *)pKey2;
631,051✔
183

184
  if (pTtlIdxKey1->deleteTimeMs > pTtlIdxKey2->deleteTimeMs) {
631,051✔
185
    return 1;
218,411✔
186
  } else if (pTtlIdxKey1->deleteTimeMs < pTtlIdxKey2->deleteTimeMs) {
412,640✔
187
    return -1;
18,290✔
188
  }
189

190
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
394,350✔
191
    return 1;
379,221✔
192
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
15,129✔
193
    return -1;
×
194
  }
195

196
  return 0;
15,129✔
197
}
198

199
static int ttlMgrFillCache(STtlManger *pTtlMgr) {
3,768,938✔
200
  return tdbTbTraversal(pTtlMgr->pTtlIdx, pTtlMgr->pTtlCache, ttlMgrFillCacheOneEntry);
3,768,938✔
201
}
202

203
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache) {
69✔
204
  SHashObj *pCache = (SHashObj *)pTtlCache;
69✔
205

206
  STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
69✔
207
  tb_uid_t      uid = ttlKey->uid;
69✔
208
  int64_t       ttlDays = *(int64_t *)pVal;
69✔
209
  int64_t       changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
69✔
210

211
  STtlCacheEntry data = {
69✔
212
      .ttlDays = ttlDays, .changeTimeMs = changeTimeMs, .ttlDaysDirty = ttlDays, .changeTimeMsDirty = changeTimeMs};
213

214
  return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
69✔
215
}
216

217
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData) {
×
218
  SConvertData *pData = (SConvertData *)pConvertData;
×
219

220
  STtlIdxKey *ttlKey = (STtlIdxKey *)pKey;
×
221
  tb_uid_t    uid = ttlKey->uid;
×
222
  int64_t     ttlDays = 0;
×
223

224
  int32_t code = TSDB_CODE_SUCCESS;
×
225
  if ((code = metaGetTableTtlByUid(pData->pMeta, uid, &ttlDays)) != TSDB_CODE_SUCCESS) {
×
226
    metaError("ttlMgr convert failed to get ttl since %s", tstrerror(code));
×
227
    goto _out;
×
228
  }
229

230
  STtlIdxKeyV1 ttlKeyV1 = {.deleteTimeMs = ttlKey->deleteTimeSec * 1000, .uid = uid};
×
231
  code = tdbTbUpsert(pData->pNewTtlIdx, &ttlKeyV1, sizeof(ttlKeyV1), &ttlDays, sizeof(ttlDays), pData->pMeta->txn);
×
232
  if (code != TSDB_CODE_SUCCESS) {
×
233
    metaError("ttlMgr convert failed to upsert since %s", tstrerror(code));
×
234
    goto _out;
×
235
  }
236

237
  code = TSDB_CODE_SUCCESS;
×
238

239
_out:
×
240
  TAOS_RETURN(code);
×
241
}
242

243
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
91,985✔
244
                                         void *pExpiredCtx) {
245
  STtlExpiredCtx *pCtx = (STtlExpiredCtx *)pExpiredCtx;
91,985✔
246
  if (pCtx->count >= pCtx->ttlDropMaxCount) return -1;
91,985✔
247

248
  int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
91,330✔
249
  if (c > 0) {
90,705✔
250
    if (NULL == taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid)) {
149,295✔
251
      metaError("ttlMgr find expired failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
252
      return -1;
×
253
    }
254
    pCtx->count++;
74,960✔
255
  }
256

257
  return c;
90,705✔
258
}
259

260
// static int32_t ttlMgrDumpOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pDumpCtx) {
261
//   STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
262
//   int64_t      *ttlDays = (int64_t *)pVal;
263

264
//   metaInfo("ttlMgr dump, ttl: %" PRId64 ", ctime: %" PRId64 ", uid: %" PRId64, *ttlDays, ttlKey->deleteTimeMs,
265
//            ttlKey->uid);
266

267
//   TAOS_RETURN(TSDB_CODE_SUCCESS);
268
// }
269

270
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
×
271
  SMeta *meta = pMeta;
×
272

273
  metaInfo("ttlMgr convert start.");
×
274

275
  SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
×
276

277
  int code = TSDB_CODE_SUCCESS;
×
278
  if ((code = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry)) != TSDB_CODE_SUCCESS) {
×
279
    metaError("failed to convert since %s", tstrerror(code));
×
280
  }
281

282
  metaInfo("ttlMgr convert end.");
×
283
  TAOS_RETURN(code);
×
284
}
285

286
int32_t ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
62,831,114✔
287
  if (updCtx->ttlDays == 0) return 0;
62,831,114✔
288

289
  STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
110,877✔
290
                               .changeTimeMs = updCtx->changeTimeMs,
110,877✔
291
                               .ttlDaysDirty = updCtx->ttlDays,
110,877✔
292
                               .changeTimeMsDirty = updCtx->changeTimeMs};
110,877✔
293
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
110,877✔
294

295
  int32_t code = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
110,877✔
296
  if (TSDB_CODE_SUCCESS != code) {
110,877✔
297
    metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
298
    goto _out;
×
299
  }
300

301
  code = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
110,877✔
302
  if (TSDB_CODE_SUCCESS != code) {
110,877✔
303
    metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
304
    goto _out;
×
305
  }
306

307
  if (ttlMgrNeedFlush(pTtlMgr)) {
110,877✔
308
    int32_t ret = ttlMgrFlush(pTtlMgr, updCtx->pTxn);
×
309
    if (ret < 0) {
×
310
      metaError("%s, ttlMgr insert failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
311
    }
312
  }
313

314
  code = TSDB_CODE_SUCCESS;
110,877✔
315

316
_out:
110,877✔
317
  metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
110,877✔
318
            updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
319

320
  TAOS_RETURN(code);
110,877✔
321
}
322

323
int32_t ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
2,080,706✔
324
  if (delCtx->ttlDays == 0) return 0;
2,080,706✔
325

326
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
39,639✔
327

328
  int32_t code = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
39,639✔
329
  if (TSDB_CODE_SUCCESS != code) {
39,639✔
330
    metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
331
    goto _out;
×
332
  }
333

334
  if (ttlMgrNeedFlush(pTtlMgr)) {
39,639✔
335
    int32_t ret = ttlMgrFlush(pTtlMgr, delCtx->pTxn);
×
336
    if (ret < 0) {
×
337
      metaError("%s, ttlMgr del failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
338
    }
339
  }
340

341
  code = TSDB_CODE_SUCCESS;
39,639✔
342

343
_out:
39,639✔
344
  metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
39,639✔
345
  TAOS_RETURN(code);
39,639✔
346
}
347

348
int32_t ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
20,194,402✔
349
  int32_t code = TSDB_CODE_SUCCESS;
20,194,402✔
350

351
  STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
20,194,402✔
352
  if (oldData == NULL) {
20,194,402✔
353
    goto _out;
20,192,482✔
354
  }
355

356
  STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays,
1,920✔
357
                               .changeTimeMs = oldData->changeTimeMs,
1,920✔
358
                               .ttlDaysDirty = oldData->ttlDays,
1,920✔
359
                               .changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
1,920✔
360
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
1,920✔
361

362
  code =
363
      taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
1,920✔
364
  if (TSDB_CODE_SUCCESS != code) {
1,920✔
365
    metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
366
    goto _out;
×
367
  }
368

369
  code = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
1,920✔
370
                     sizeof(dirtryEntry));
371
  if (TSDB_CODE_SUCCESS != code) {
1,920✔
372
    metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
373
    goto _out;
×
374
  }
375

376
  if (ttlMgrNeedFlush(pTtlMgr)) {
1,920✔
377
    int32_t ret = ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
×
378
    if (ret < 0) {
×
379
      metaError("%s, ttlMgr update ctime failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
380
    }
381
  }
382

383
  code = TSDB_CODE_SUCCESS;
1,920✔
384

385
_out:
20,194,402✔
386
  metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
20,194,402✔
387
            pUpdCtimeCtx->changeTimeMs);
388
  TAOS_RETURN(code);
20,194,402✔
389
}
390

391
int32_t ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
14,110,666✔
392
  int32_t code = TSDB_CODE_SUCCESS;
14,110,666✔
393

394
  STtlIdxKeyV1   ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
14,110,666✔
395
  STtlExpiredCtx expiredCtx = {
14,110,666✔
396
      .ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
397
  TAOS_CHECK_GOTO(tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry), NULL, _out);
14,111,241✔
398

399
  size_t vIdx = 0;
14,108,185✔
400
  for (size_t i = 0; i < pTbUids->size; i++) {
14,183,145✔
401
    tb_uid_t *pUid = taosArrayGet(pTbUids, i);
74,960✔
402
    if (taosHashGet(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)) == NULL) {
74,960✔
403
      // not in dirty && expired in tdb => must be expired
404
      taosArraySet(pTbUids, vIdx, pUid);
46,430✔
405
      vIdx++;
45,805✔
406
    }
407
  }
408

409
  taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
14,107,465✔
410

411
_out:
14,102,523✔
412
  TAOS_RETURN(code);
14,103,961✔
413
}
414

415
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
152,436✔
416
  return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
152,436✔
417
}
418

419
int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
4,730,348✔
420
  int64_t startNs = taosGetTimestampNs();
4,744,465✔
421
  int64_t endNs = startNs;
4,744,465✔
422

423
  metaTrace("%s, ttl mgr flush start. num of dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
4,744,465✔
424

425
  int32_t code = TSDB_CODE_SUCCESS;
4,742,400✔
426

427
  void *pIter = NULL;
4,742,400✔
428
  while ((pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter)) != NULL) {
4,860,941✔
429
    STtlDirtyEntry *pEntry = (STtlDirtyEntry *)pIter;
118,541✔
430
    tb_uid_t       *pUid = taosHashGetKey(pIter, NULL);
118,541✔
431

432
    STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
118,541✔
433
    if (cacheEntry == NULL) {
118,541✔
434
      metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
×
435
                pEntry->type);
436
      continue;
×
437
    }
438

439
    STtlIdxKeyV1 ttlKey;
118,421✔
440
    ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
118,541✔
441

442
    STtlIdxKeyV1 ttlKeyDirty;
118,421✔
443
    ttlMgrBuildKey(&ttlKeyDirty, cacheEntry->ttlDaysDirty, cacheEntry->changeTimeMsDirty, *pUid);
118,541✔
444

445
    if (pEntry->type == ENTRY_TYPE_UPSERT) {
118,541✔
446
      // delete old key & upsert new key
447
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);  // maybe first insert, ignore error
104,052✔
448
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
104,052✔
449
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
×
450
        continue;
×
451
      }
452
      code = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
104,052✔
453
                         sizeof(cacheEntry->ttlDaysDirty), pTxn);
454
      if (TSDB_CODE_SUCCESS != code) {
104,052✔
455
        metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(code));
×
456
        continue;
×
457
      }
458

459
      cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
104,052✔
460
      cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
104,052✔
461
    } else if (pEntry->type == ENTRY_TYPE_DELETE) {
14,489✔
462
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
14,489✔
463
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
14,489✔
464
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
×
465
        continue;
×
466
      }
467

468
      code = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
14,489✔
469
      if (TSDB_CODE_SUCCESS != code) {
14,489✔
470
        metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
471
        continue;
×
472
      }
473
    } else {
474
      metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
×
475
      continue;
×
476
    }
477

478
    metaDebug("isdel:%d", pEntry->type == ENTRY_TYPE_DELETE);
118,541✔
479
    metaDebug("ttlkey:%" PRId64 ", uid:%" PRId64, ttlKey.deleteTimeMs, ttlKey.uid);
118,541✔
480
    metaDebug("ttlkeyDirty:%" PRId64 ", uid:%" PRId64, ttlKeyDirty.deleteTimeMs, ttlKeyDirty.uid);
118,541✔
481

482
    code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid));
118,541✔
483
    if (TSDB_CODE_SUCCESS != code) {
118,541✔
484
      metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code));
×
485
      continue;
×
486
    }
487
  }
488

489
  int32_t count = taosHashGetSize(pTtlMgr->pDirtyUids);
4,743,271✔
490
  if (count != 0) {
4,741,613✔
491
    taosHashClear(pTtlMgr->pDirtyUids);
×
492
    metaError("%s, ttlMgr flush failed, dirty uids not empty, count: %d", pTtlMgr->logPrefix, count);
×
493
    code = TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION;
×
494

495
    goto _out;
×
496
  }
497

498
  code = TSDB_CODE_SUCCESS;
4,741,613✔
499

500
_out:
4,741,613✔
501
  taosHashCancelIterate(pTtlMgr->pDirtyUids, pIter);
4,741,613✔
502

503
  endNs = taosGetTimestampNs();
4,743,378✔
504
  metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);
4,743,378✔
505

506
  TAOS_RETURN(code);
4,743,378✔
507
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc