• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3819

01 Apr 2025 09:27AM UTC coverage: 34.076% (+0.01%) from 34.065%
#3819

push

travis-ci

happyguoxy
test:alter gcda dir

148544 of 599532 branches covered (24.78%)

Branch coverage included in aggregate %.

222541 of 489451 relevant lines covered (45.47%)

763329.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.94
/source/dnode/vnode/src/meta/metaTtl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "metaTtl.h"
17
#include "meta.h"
18

19
typedef struct {
20
  TTB   *pNewTtlIdx;
21
  SMeta *pMeta;
22
} SConvertData;
23

24
typedef struct {
25
  int32_t      ttlDropMaxCount;
26
  int32_t      count;
27
  STtlIdxKeyV1 expiredKey;
28
  SArray      *pTbUids;
29
} STtlExpiredCtx;
30

31
static void ttlMgrCleanup(STtlManger *pTtlMgr);
32

33
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
34

35
static void    ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
36
static int     ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
37
static int     ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
38
static int     ttlMgrFillCache(STtlManger *pTtlMgr);
39
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache);
40
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData);
41
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
42
                                         void *pExpiredInfo);
43

44
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr);
45

46
// Name of the old-format ttl index; ttlMgrUpgrade() converts it and then drops it.
const char *ttlTbname = "ttl.idx";
// Name of the current (V1 key) ttl index opened by ttlMgrOpen().
const char *ttlV1Tbname = "ttlv1.idx";
48

49
/**
 * Create a TTL manager, open the V1 ttl index and load it into the in-memory cache.
 *
 * @param ppTtlMgr        [out] receives the new manager on success; set to NULL on entry
 *                        and left NULL on every failure path.
 * @param pEnv            tdb environment the index table is opened in.
 * @param rollback        forwarded unchanged to tdbTbOpen().
 * @param logPrefix       NUL-terminated prefix; a private copy is stored in the manager.
 * @param flushThreshold  stored in the manager; consulted by ttlMgrNeedFlush().
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed.
 *
 * Every failure path releases everything allocated so far.
 */
int32_t ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t startNs = taosGetTimestampNs();
  int32_t pathLen = 0;

  *ppTtlMgr = NULL;

  STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
  if (pTtlMgr == NULL) TAOS_RETURN(terrno);

  pathLen = strlen(logPrefix) + 1;
  char *logBuffer = (char *)tdbOsCalloc(1, pathLen);
  if (logBuffer == NULL) {
    // Capture the allocation error before calling anything else.
    code = terrno;
    tdbOsFree(pTtlMgr);
    TAOS_RETURN(code);
  }
  tstrncpy(logBuffer, logPrefix, pathLen);
  pTtlMgr->logPrefix = logBuffer;
  pTtlMgr->flushThreshold = flushThreshold;

  code = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
  if (TSDB_CODE_SUCCESS != code) {
    metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(code));
    // The prefix copy is owned by the manager and must be released with it
    // (previously only the manager itself was freed, leaking the copy).
    tdbOsFree(logBuffer);
    tdbOsFree(pTtlMgr);
    TAOS_RETURN(code);
  }

  pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pTtlMgr->pTtlCache == NULL || pTtlMgr->pDirtyUids == NULL) {
    // NOTE(review): assumes taosHashInit() reports failure by returning NULL and that
    // taosHashCleanup(NULL) is a no-op, so ttlMgrCleanup() is safe here — confirm.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    metaError("%s, failed to init ttl hash since %s", pTtlMgr->logPrefix, tstrerror(code));
    ttlMgrCleanup(pTtlMgr);
    TAOS_RETURN(code);
  }

  if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
    // Report the code that is actually returned to the caller.
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(code));
    ttlMgrCleanup(pTtlMgr);
    TAOS_RETURN(code);
  }

  int64_t endNs = taosGetTimestampNs();
  metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
           taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);

  *ppTtlMgr = pTtlMgr;
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
92

93
/* Public teardown entry point: releases the manager via ttlMgrCleanup(). */
void ttlMgrClose(STtlManger *pTtlMgr) {
  ttlMgrCleanup(pTtlMgr);
}
277✔
94

95
/*
 * Report whether the old-format ttl index (ttlTbname) exists in pEnv.
 * Logs an info message when it does.
 */
bool ttlMgrNeedUpgrade(TDB *pEnv) {
  if (!tdbTbExist(ttlTbname, pEnv)) {
    return false;
  }

  metaInfo("find ttl idx in old version , will convert");
  return true;
}
102

103
/*
 * Convert the old-format ttl index into the V1 index, drop the old index and
 * reload the cache. Returns TSDB_CODE_SUCCESS immediately when no old index exists.
 *
 * pMeta is an SMeta*; its pEnv and txn members are used.
 * The old index handle is closed and reset on every path that reaches _out.
 */
int32_t ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
  SMeta  *pMetaObj = (SMeta *)pMeta;
  int32_t code = TSDB_CODE_SUCCESS;

  if (!tdbTbExist(ttlTbname, pMetaObj->pEnv)) {
    TAOS_RETURN(TSDB_CODE_SUCCESS);
  }

  metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);

  int64_t beginNs = taosGetTimestampNs();

  // Step 1: open the old index.
  code = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMetaObj->pEnv, &pTtlMgr->pOldTtlIdx, 0);
  if (code != TSDB_CODE_SUCCESS) {
    metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(code));
    goto _out;
  }

  // Step 2: copy every old entry into the V1 index.
  code = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
    goto _out;
  }

  // Step 3: remove the old index.
  code = tdbTbDropByName(ttlTbname, pMetaObj->pEnv, pMetaObj->txn);
  if (code != TSDB_CODE_SUCCESS) {
    metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
    goto _out;
  }

  // Step 4: reload the in-memory cache from the V1 index.
  code = ttlMgrFillCache(pTtlMgr);
  if (code != TSDB_CODE_SUCCESS) {
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(code));
    goto _out;
  }

  int64_t finishNs = taosGetTimestampNs();
  metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
           taosHashGetSize(pTtlMgr->pTtlCache), finishNs - beginNs);

_out:
  tdbTbClose(pTtlMgr->pOldTtlIdx);
  pTtlMgr->pOldTtlIdx = NULL;

  TAOS_RETURN(code);
}
144

145
/*
 * Release everything the manager holds, then the manager itself:
 * the log prefix copy, both hash tables and the ttl index handle.
 * pTtlMgr must not be used after this call.
 */
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
  // owned string
  taosMemoryFree(pTtlMgr->logPrefix);

  // in-memory state
  taosHashCleanup(pTtlMgr->pTtlCache);
  taosHashCleanup(pTtlMgr->pDirtyUids);

  // on-disk index handle
  tdbTbClose(pTtlMgr->pTtlIdx);

  // finally the manager object
  taosMemoryFree(pTtlMgr);
}
277✔
152

153
/*
 * Fill *pTtlKey with the index key for table `uid`:
 *   deleteTimeMs = changeTimeMs + ttlDays * tsTtlUnit * 1000
 *
 * When ttlDays <= 0 the key is left completely untouched, so callers must not
 * rely on *pTtlKey being written in that case.
 */
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
  if (ttlDays > 0) {
    pTtlKey->deleteTimeMs = changeTimeMs + ttlDays * tsTtlUnit * 1000;
    pTtlKey->uid = uid;
  }
}
159

160
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
×
161
  STtlIdxKey *pTtlIdxKey1 = (STtlIdxKey *)pKey1;
×
162
  STtlIdxKey *pTtlIdxKey2 = (STtlIdxKey *)pKey2;
×
163

164
  if (pTtlIdxKey1->deleteTimeSec > pTtlIdxKey2->deleteTimeSec) {
×
165
    return 1;
×
166
  } else if (pTtlIdxKey1->deleteTimeSec < pTtlIdxKey2->deleteTimeSec) {
×
167
    return -1;
×
168
  }
169

170
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
×
171
    return 1;
×
172
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
×
173
    return -1;
×
174
  }
175

176
  return 0;
×
177
}
178

179
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
×
180
  STtlIdxKeyV1 *pTtlIdxKey1 = (STtlIdxKeyV1 *)pKey1;
×
181
  STtlIdxKeyV1 *pTtlIdxKey2 = (STtlIdxKeyV1 *)pKey2;
×
182

183
  if (pTtlIdxKey1->deleteTimeMs > pTtlIdxKey2->deleteTimeMs) {
×
184
    return 1;
×
185
  } else if (pTtlIdxKey1->deleteTimeMs < pTtlIdxKey2->deleteTimeMs) {
×
186
    return -1;
×
187
  }
188

189
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
×
190
    return 1;
×
191
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
×
192
    return -1;
×
193
  }
194

195
  return 0;
×
196
}
197

198
/*
 * Traverse the ttl index and feed every entry to ttlMgrFillCacheOneEntry(),
 * with the manager's cache hash as callback context.
 * Returns whatever tdbTbTraversal() returns.
 */
static int ttlMgrFillCache(STtlManger *pTtlMgr) {
  int rc = tdbTbTraversal(pTtlMgr->pTtlIdx, pTtlMgr->pTtlCache, ttlMgrFillCacheOneEntry);
  return rc;
}
201

202
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache) {
×
203
  SHashObj *pCache = (SHashObj *)pTtlCache;
×
204

205
  STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
×
206
  tb_uid_t      uid = ttlKey->uid;
×
207
  int64_t       ttlDays = *(int64_t *)pVal;
×
208
  int64_t       changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
×
209

210
  STtlCacheEntry data = {
×
211
      .ttlDays = ttlDays, .changeTimeMs = changeTimeMs, .ttlDaysDirty = ttlDays, .changeTimeMsDirty = changeTimeMs};
212

213
  return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
×
214
}
215

216
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData) {
×
217
  SConvertData *pData = (SConvertData *)pConvertData;
×
218

219
  STtlIdxKey *ttlKey = (STtlIdxKey *)pKey;
×
220
  tb_uid_t    uid = ttlKey->uid;
×
221
  int64_t     ttlDays = 0;
×
222

223
  int32_t code = TSDB_CODE_SUCCESS;
×
224
  if ((code = metaGetTableTtlByUid(pData->pMeta, uid, &ttlDays)) != TSDB_CODE_SUCCESS) {
×
225
    metaError("ttlMgr convert failed to get ttl since %s", tstrerror(code));
×
226
    goto _out;
×
227
  }
228

229
  STtlIdxKeyV1 ttlKeyV1 = {.deleteTimeMs = ttlKey->deleteTimeSec * 1000, .uid = uid};
×
230
  code = tdbTbUpsert(pData->pNewTtlIdx, &ttlKeyV1, sizeof(ttlKeyV1), &ttlDays, sizeof(ttlDays), pData->pMeta->txn);
×
231
  if (code != TSDB_CODE_SUCCESS) {
×
232
    metaError("ttlMgr convert failed to upsert since %s", tstrerror(code));
×
233
    goto _out;
×
234
  }
235

236
  code = TSDB_CODE_SUCCESS;
×
237

238
_out:
×
239
  TAOS_RETURN(code);
×
240
}
241

242
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
×
243
                                         void *pExpiredCtx) {
244
  STtlExpiredCtx *pCtx = (STtlExpiredCtx *)pExpiredCtx;
×
245
  if (pCtx->count >= pCtx->ttlDropMaxCount) return -1;
×
246

247
  int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
×
248
  if (c > 0) {
×
249
    if (NULL == taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid)) {
×
250
      metaError("ttlMgr find expired failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
251
      return -1;
×
252
    }
253
    pCtx->count++;
×
254
  }
255

256
  return c;
×
257
}
258

259
// static int32_t ttlMgrDumpOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pDumpCtx) {
260
//   STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
261
//   int64_t      *ttlDays = (int64_t *)pVal;
262

263
//   metaInfo("ttlMgr dump, ttl: %" PRId64 ", ctime: %" PRId64 ", uid: %" PRId64, *ttlDays, ttlKey->deleteTimeMs,
264
//            ttlKey->uid);
265

266
//   TAOS_RETURN(TSDB_CODE_SUCCESS);
267
// }
268

269
/*
 * Copy every entry of the old ttl index into the new one by traversing
 * pOldTtlIdx with ttlMgrConvertOneEntry(). pMeta is an SMeta*.
 * Returns the traversal result; a failure is logged but the "convert end"
 * message is emitted either way.
 */
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
  SConvertData ctx = {.pNewTtlIdx = pNewTtlIdx, .pMeta = (SMeta *)pMeta};

  metaInfo("ttlMgr convert start.");

  int code = tdbTbTraversal(pOldTtlIdx, &ctx, ttlMgrConvertOneEntry);
  if (code != TSDB_CODE_SUCCESS) {
    metaError("failed to convert since %s", tstrerror(code));
  }

  metaInfo("ttlMgr convert end.");
  TAOS_RETURN(code);
}
284

285
/*
 * Record a ttl for table updCtx->uid.
 *
 * A ttl of 0 is ignored (returns 0 without touching any state). Otherwise the
 * cache entry is written with identical clean/dirty values and the uid is marked
 * as a pending UPSERT. When ttlMgrNeedFlush() says so, the dirty set is flushed
 * with updCtx->pTxn; a flush failure is only logged and does not change the
 * return value. Returns the failing taosHashPut() code, or TSDB_CODE_SUCCESS.
 */
int32_t ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
  if (updCtx->ttlDays == 0) return 0;

  STtlCacheEntry newEntry = {
      .ttlDays = updCtx->ttlDays,
      .changeTimeMs = updCtx->changeTimeMs,
      .ttlDaysDirty = updCtx->ttlDays,
      .changeTimeMsDirty = updCtx->changeTimeMs,
  };
  STtlDirtyEntry dirtyMark = {.type = ENTRY_TYPE_UPSERT};

  int32_t code = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &newEntry, sizeof(newEntry));
  if (code != TSDB_CODE_SUCCESS) {
    metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
  } else {
    code = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtyMark, sizeof(dirtyMark));
    if (code != TSDB_CODE_SUCCESS) {
      metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
    } else if (ttlMgrNeedFlush(pTtlMgr)) {
      // best effort: a failed flush is reported but not propagated
      int32_t flushCode = ttlMgrFlush(pTtlMgr, updCtx->pTxn);
      if (flushCode < 0) {
        metaError("%s, ttlMgr insert failed to flush since %s", pTtlMgr->logPrefix, tstrerror(flushCode));
      }
    }
  }

  metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
            updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);

  TAOS_RETURN(code);
}
321

322
/*
 * Schedule removal of the ttl entry of table delCtx->uid.
 *
 * A ttl of 0 is ignored (returns 0). Otherwise the uid is marked as a pending
 * DELETE; the cache itself is only touched when the dirty set is flushed.
 * When ttlMgrNeedFlush() says so, the dirty set is flushed with delCtx->pTxn;
 * a flush failure is only logged. Returns the failing taosHashPut() code, or
 * TSDB_CODE_SUCCESS.
 */
int32_t ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
  if (delCtx->ttlDays == 0) return 0;

  STtlDirtyEntry dirtyMark = {.type = ENTRY_TYPE_DELETE};

  int32_t code = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtyMark, sizeof(dirtyMark));
  if (code != TSDB_CODE_SUCCESS) {
    metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
  } else if (ttlMgrNeedFlush(pTtlMgr)) {
    // best effort: a failed flush is reported but not propagated
    int32_t flushCode = ttlMgrFlush(pTtlMgr, delCtx->pTxn);
    if (flushCode < 0) {
      metaError("%s, ttlMgr del failed to flush since %s", pTtlMgr->logPrefix, tstrerror(flushCode));
    }
  }

  metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
  TAOS_RETURN(code);
}
346

347
/*
 * Record a new change time for table pUpdCtimeCtx->uid.
 *
 * Does nothing (and returns TSDB_CODE_SUCCESS) when the uid is not in the cache.
 * Otherwise the cache entry is rewritten keeping the clean ttlDays/changeTimeMs,
 * with the new change time stored in the dirty half, and the uid is marked as a
 * pending UPSERT. When ttlMgrNeedFlush() says so, the dirty set is flushed with
 * pUpdCtimeCtx->pTxn; a flush failure is only logged. Returns the failing
 * taosHashPut() code, or TSDB_CODE_SUCCESS.
 */
int32_t ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
  int32_t code = TSDB_CODE_SUCCESS;

  const STtlCacheEntry *pCached = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
  if (pCached != NULL) {
    STtlCacheEntry updated = {
        .ttlDays = pCached->ttlDays,
        .changeTimeMs = pCached->changeTimeMs,
        .ttlDaysDirty = pCached->ttlDays,
        .changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs,
    };
    STtlDirtyEntry dirtyMark = {.type = ENTRY_TYPE_UPSERT};

    code = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &updated, sizeof(updated));
    if (code != TSDB_CODE_SUCCESS) {
      metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
    } else {
      code = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtyMark,
                         sizeof(dirtyMark));
      if (code != TSDB_CODE_SUCCESS) {
        metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
      } else if (ttlMgrNeedFlush(pTtlMgr)) {
        // best effort: a failed flush is reported but not propagated
        int32_t flushCode = ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
        if (flushCode < 0) {
          metaError("%s, ttlMgr update ctime failed to flush since %s", pTtlMgr->logPrefix, tstrerror(flushCode));
        }
      }
    }
  }

  metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
            pUpdCtimeCtx->changeTimeMs);
  TAOS_RETURN(code);
}
389

390
/*
 * Collect into pTbUids the uids whose index key sorts before
 * (timePointMs, INT64_MAX), gathering at most ttlDropMaxCount of them.
 *
 * After the traversal, every collected uid that still has a pending entry in the
 * dirty set is filtered out: the array is compacted in place and the unused tail
 * is popped. Returns the traversal error code, or TSDB_CODE_SUCCESS.
 */
int32_t ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
  int32_t code = TSDB_CODE_SUCCESS;

  STtlExpiredCtx ctx = {
      .ttlDropMaxCount = ttlDropMaxCount,
      .count = 0,
      .expiredKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX},
      .pTbUids = pTbUids,
  };
  TAOS_CHECK_GOTO(tdbTbTraversal(pTtlMgr->pTtlIdx, &ctx, ttlMgrFindExpiredOneEntry), NULL, _out);

  size_t kept = 0;
  size_t idx = 0;
  while (idx < pTbUids->size) {
    tb_uid_t *pCandidate = taosArrayGet(pTbUids, idx);
    if (taosHashGet(pTtlMgr->pDirtyUids, pCandidate, sizeof(tb_uid_t)) == NULL) {
      // not in dirty && expired in tdb => must be expired
      taosArraySet(pTbUids, kept, pCandidate);
      kept++;
    }
    idx++;
  }

  // drop the slots left over after compaction
  taosArrayPopTailBatch(pTbUids, pTbUids->size - kept);

_out:
  TAOS_RETURN(code);
}
413

414
/*
 * A flush is due when a positive flushThreshold is configured and the number of
 * dirty uids exceeds it. A threshold <= 0 disables threshold-triggered flushing.
 */
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
  if (pTtlMgr->flushThreshold <= 0) {
    return false;
  }
  return taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
}
417

418
/**
 * Apply every pending (dirty) ttl change to the on-disk ttl index.
 *
 * For each uid in the dirty set:
 *   - UPSERT: delete the key built from the clean cache values (a NOT_FOUND result
 *     is tolerated), upsert the key built from the dirty values, then promote the
 *     dirty values to clean.
 *   - DELETE: delete the key built from the clean values and drop the cache entry.
 * A successfully processed uid is removed from the dirty set. An entry that fails
 * at any step is logged and skipped, so it stays in the dirty set.
 *
 * @param pTtlMgr  ttl manager whose dirty set is flushed.
 * @param pTxn     transaction passed to every tdb write.
 * @return TSDB_CODE_SUCCESS when the dirty set ended up empty; otherwise the set is
 *         cleared and TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION is returned.
 */
int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
  int64_t startNs = taosGetTimestampNs();
  int64_t endNs = startNs;

  metaTrace("%s, ttl mgr flush start. num of dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));

  int32_t code = TSDB_CODE_SUCCESS;

  void *pIter = NULL;
  while ((pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter)) != NULL) {
    STtlDirtyEntry *pEntry = (STtlDirtyEntry *)pIter;
    tb_uid_t       *pUid = taosHashGetKey(pIter, NULL);

    STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
    if (cacheEntry == NULL) {
      metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
                pEntry->type);
      continue;
    }

    // ttlMgrBuildKey() writes nothing when ttlDays <= 0, so both keys are
    // zero-initialised: the tdb calls and debug logs below must never read
    // indeterminate stack bytes.
    STtlIdxKeyV1 ttlKey = {0};
    ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);

    STtlIdxKeyV1 ttlKeyDirty = {0};
    ttlMgrBuildKey(&ttlKeyDirty, cacheEntry->ttlDaysDirty, cacheEntry->changeTimeMsDirty, *pUid);

    if (pEntry->type == ENTRY_TYPE_UPSERT) {
      // delete old key & upsert new key
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);  // maybe first insert, ignore error
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
        continue;
      }
      code = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
                         sizeof(cacheEntry->ttlDaysDirty), pTxn);
      if (TSDB_CODE_SUCCESS != code) {
        metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(code));
        continue;
      }

      // the dirty values are now persisted: promote them to the clean half
      cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
      cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
    } else if (pEntry->type == ENTRY_TYPE_DELETE) {
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
        continue;
      }

      code = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
      if (TSDB_CODE_SUCCESS != code) {
        metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(code));
        continue;
      }
    } else {
      metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
      continue;
    }

    metaDebug("isdel:%d", pEntry->type == ENTRY_TYPE_DELETE);
    metaDebug("ttlkey:%" PRId64 ", uid:%" PRId64, ttlKey.deleteTimeMs, ttlKey.uid);
    metaDebug("ttlkeyDirty:%" PRId64 ", uid:%" PRId64, ttlKeyDirty.deleteTimeMs, ttlKeyDirty.uid);

    // processed successfully: the uid is no longer dirty
    code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid));
    if (TSDB_CODE_SUCCESS != code) {
      metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code));
      continue;
    }
  }

  // Anything still in the dirty set was skipped above; discard it and report
  // the flush as incomplete.
  int32_t count = taosHashGetSize(pTtlMgr->pDirtyUids);
  if (count != 0) {
    taosHashClear(pTtlMgr->pDirtyUids);
    metaError("%s, ttlMgr flush failed, dirty uids not empty, count: %d", pTtlMgr->logPrefix, count);
    code = TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION;

    goto _out;
  }

  code = TSDB_CODE_SUCCESS;

_out:
  taosHashCancelIterate(pTtlMgr->pDirtyUids, pIter);

  endNs = taosGetTimestampNs();
  metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);

  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc