• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.0
/source/dnode/vnode/src/meta/metaTtl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "metaTtl.h"
17
#include "meta.h"
18

19
typedef struct {
20
  TTB   *pNewTtlIdx;
21
  SMeta *pMeta;
22
} SConvertData;
23

24
typedef struct {
25
  int32_t      ttlDropMaxCount;
26
  int32_t      count;
27
  STtlIdxKeyV1 expiredKey;
28
  SArray      *pTbUids;
29
} STtlExpiredCtx;
30

31
static void ttlMgrCleanup(STtlManger *pTtlMgr);
32

33
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
34

35
static void    ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
36
static int     ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
37
static int     ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
38
static int     ttlMgrFillCache(STtlManger *pTtlMgr);
39
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache);
40
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData);
41
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
42
                                         void *pExpiredInfo);
43

44
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr);
45

46
const char *ttlTbname = "ttl.idx";
47
const char *ttlV1Tbname = "ttlv1.idx";
48

49
int32_t ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
553✔
50
  int32_t code = TSDB_CODE_SUCCESS;
553✔
51
  int64_t startNs = taosGetTimestampNs();
553✔
52
  int32_t pathLen = 0;
553✔
53

54
  *ppTtlMgr = NULL;
553✔
55

56
  STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
553!
57
  if (pTtlMgr == NULL) TAOS_RETURN(terrno);
553!
58

59
  pathLen = strlen(logPrefix) + 1;
553✔
60
  char *logBuffer = (char *)tdbOsCalloc(1, pathLen);
553!
61
  if (logBuffer == NULL) {
553!
62
    tdbOsFree(pTtlMgr);
×
63
    TAOS_RETURN(terrno);
×
64
  }
65
  tstrncpy(logBuffer, logPrefix, pathLen);
553✔
66
  pTtlMgr->logPrefix = logBuffer;
553✔
67
  pTtlMgr->flushThreshold = flushThreshold;
553✔
68

69
  code = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
553✔
70
  if (TSDB_CODE_SUCCESS != code) {
553!
71
    metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(code));
×
72
    tdbOsFree(pTtlMgr);
×
73
    TAOS_RETURN(code);
×
74
  }
75

76
  pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
553✔
77
  pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
553✔
78

79
  if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
553!
80
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
×
81
    ttlMgrCleanup(pTtlMgr);
×
82
    TAOS_RETURN(code);
×
83
  }
84

85
  int64_t endNs = taosGetTimestampNs();
553✔
86
  metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
553!
87
           taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
88

89
  *ppTtlMgr = pTtlMgr;
553✔
90
  TAOS_RETURN(TSDB_CODE_SUCCESS);
553✔
91
}
92

93
void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
553✔
94

95
bool ttlMgrNeedUpgrade(TDB *pEnv) {
553✔
96
  bool needUpgrade = tdbTbExist(ttlTbname, pEnv);
553✔
97
  if (needUpgrade) {
553!
98
    metaInfo("find ttl idx in old version , will convert");
×
99
  }
100
  return needUpgrade;
553✔
101
}
102

103
int32_t ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
×
104
  SMeta  *meta = (SMeta *)pMeta;
×
105
  int32_t code = TSDB_CODE_SUCCESS;
×
106

107
  if (!tdbTbExist(ttlTbname, meta->pEnv)) TAOS_RETURN(TSDB_CODE_SUCCESS);
×
108

109
  metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
×
110

111
  int64_t startNs = taosGetTimestampNs();
×
112

113
  code = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
×
114
  if (TSDB_CODE_SUCCESS != code) {
×
115
    metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(code));
×
116
    goto _out;
×
117
  }
118

119
  if ((code = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta)) != TSDB_CODE_SUCCESS) {
×
120
    metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
×
121
    goto _out;
×
122
  }
123

124
  if ((code = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn)) != TSDB_CODE_SUCCESS) {
×
125
    metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
×
126
    goto _out;
×
127
  }
128

129
  if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
×
130
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(code));
×
131
    goto _out;
×
132
  }
133

134
  int64_t endNs = taosGetTimestampNs();
×
135
  metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
×
136
           taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
137

138
_out:
×
139
  tdbTbClose(pTtlMgr->pOldTtlIdx);
×
140
  pTtlMgr->pOldTtlIdx = NULL;
×
141

142
  TAOS_RETURN(code);
×
143
}
144

145
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
553✔
146
  taosMemoryFree(pTtlMgr->logPrefix);
553!
147
  taosHashCleanup(pTtlMgr->pTtlCache);
553✔
148
  taosHashCleanup(pTtlMgr->pDirtyUids);
553✔
149
  tdbTbClose(pTtlMgr->pTtlIdx);
553✔
150
  taosMemoryFree(pTtlMgr);
553!
151
}
553✔
152

UNCOV
153
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
×
UNCOV
154
  if (ttlDays <= 0) return;
×
155

UNCOV
156
  pTtlKey->deleteTimeMs = changeTimeMs + ttlDays * tsTtlUnit * 1000;
×
UNCOV
157
  pTtlKey->uid = uid;
×
158
}
159

160
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
×
161
  STtlIdxKey *pTtlIdxKey1 = (STtlIdxKey *)pKey1;
×
162
  STtlIdxKey *pTtlIdxKey2 = (STtlIdxKey *)pKey2;
×
163

164
  if (pTtlIdxKey1->deleteTimeSec > pTtlIdxKey2->deleteTimeSec) {
×
165
    return 1;
×
166
  } else if (pTtlIdxKey1->deleteTimeSec < pTtlIdxKey2->deleteTimeSec) {
×
167
    return -1;
×
168
  }
169

170
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
×
171
    return 1;
×
172
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
×
173
    return -1;
×
174
  }
175

176
  return 0;
×
177
}
178

UNCOV
179
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
×
UNCOV
180
  STtlIdxKeyV1 *pTtlIdxKey1 = (STtlIdxKeyV1 *)pKey1;
×
UNCOV
181
  STtlIdxKeyV1 *pTtlIdxKey2 = (STtlIdxKeyV1 *)pKey2;
×
182

UNCOV
183
  if (pTtlIdxKey1->deleteTimeMs > pTtlIdxKey2->deleteTimeMs) {
×
UNCOV
184
    return 1;
×
UNCOV
185
  } else if (pTtlIdxKey1->deleteTimeMs < pTtlIdxKey2->deleteTimeMs) {
×
UNCOV
186
    return -1;
×
187
  }
188

UNCOV
189
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
×
190
    return 1;
×
UNCOV
191
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
×
192
    return -1;
×
193
  }
194

UNCOV
195
  return 0;
×
196
}
197

198
static int ttlMgrFillCache(STtlManger *pTtlMgr) {
553✔
199
  return tdbTbTraversal(pTtlMgr->pTtlIdx, pTtlMgr->pTtlCache, ttlMgrFillCacheOneEntry);
553✔
200
}
201

202
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache) {
×
203
  SHashObj *pCache = (SHashObj *)pTtlCache;
×
204

205
  STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
×
206
  tb_uid_t      uid = ttlKey->uid;
×
207
  int64_t       ttlDays = *(int64_t *)pVal;
×
208
  int64_t       changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
×
209

210
  STtlCacheEntry data = {
×
211
      .ttlDays = ttlDays, .changeTimeMs = changeTimeMs, .ttlDaysDirty = ttlDays, .changeTimeMsDirty = changeTimeMs};
212

213
  return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
×
214
}
215

216
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData) {
×
217
  SConvertData *pData = (SConvertData *)pConvertData;
×
218

219
  STtlIdxKey *ttlKey = (STtlIdxKey *)pKey;
×
220
  tb_uid_t    uid = ttlKey->uid;
×
221
  int64_t     ttlDays = 0;
×
222

223
  int32_t code = TSDB_CODE_SUCCESS;
×
224
  if ((code = metaGetTableTtlByUid(pData->pMeta, uid, &ttlDays)) != TSDB_CODE_SUCCESS) {
×
225
    metaError("ttlMgr convert failed to get ttl since %s", tstrerror(code));
×
226
    goto _out;
×
227
  }
228

229
  STtlIdxKeyV1 ttlKeyV1 = {.deleteTimeMs = ttlKey->deleteTimeSec * 1000, .uid = uid};
×
230
  code = tdbTbUpsert(pData->pNewTtlIdx, &ttlKeyV1, sizeof(ttlKeyV1), &ttlDays, sizeof(ttlDays), pData->pMeta->txn);
×
231
  if (code != TSDB_CODE_SUCCESS) {
×
232
    metaError("ttlMgr convert failed to upsert since %s", tstrerror(code));
×
233
    goto _out;
×
234
  }
235

236
  code = TSDB_CODE_SUCCESS;
×
237

238
_out:
×
239
  TAOS_RETURN(code);
×
240
}
241

UNCOV
242
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
×
243
                                         void *pExpiredCtx) {
UNCOV
244
  STtlExpiredCtx *pCtx = (STtlExpiredCtx *)pExpiredCtx;
×
UNCOV
245
  if (pCtx->count >= pCtx->ttlDropMaxCount) return -1;
×
246

UNCOV
247
  int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
×
UNCOV
248
  if (c > 0) {
×
UNCOV
249
    if (NULL == taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid)) {
×
250
      metaError("ttlMgr find expired failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
251
      return -1;
×
252
    }
UNCOV
253
    pCtx->count++;
×
254
  }
255

UNCOV
256
  return c;
×
257
}
258

259
// static int32_t ttlMgrDumpOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pDumpCtx) {
260
//   STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
261
//   int64_t      *ttlDays = (int64_t *)pVal;
262

263
//   metaInfo("ttlMgr dump, ttl: %" PRId64 ", ctime: %" PRId64 ", uid: %" PRId64, *ttlDays, ttlKey->deleteTimeMs,
264
//            ttlKey->uid);
265

266
//   TAOS_RETURN(TSDB_CODE_SUCCESS);
267
// }
268

269
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
×
270
  SMeta *meta = pMeta;
×
271

272
  metaInfo("ttlMgr convert start.");
×
273

274
  SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
×
275

276
  int code = TSDB_CODE_SUCCESS;
×
277
  if ((code = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry)) != TSDB_CODE_SUCCESS) {
×
278
    metaError("failed to convert since %s", tstrerror(code));
×
279
  }
280

281
  metaInfo("ttlMgr convert end.");
×
282
  TAOS_RETURN(code);
×
283
}
284

285
int32_t ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
566✔
286
  if (updCtx->ttlDays == 0) return 0;
566!
287

UNCOV
288
  STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
×
UNCOV
289
                               .changeTimeMs = updCtx->changeTimeMs,
×
UNCOV
290
                               .ttlDaysDirty = updCtx->ttlDays,
×
UNCOV
291
                               .changeTimeMsDirty = updCtx->changeTimeMs};
×
UNCOV
292
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
×
293

UNCOV
294
  int32_t code = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
×
UNCOV
295
  if (TSDB_CODE_SUCCESS != code) {
×
296
    metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
297
    goto _out;
×
298
  }
299

UNCOV
300
  code = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
×
UNCOV
301
  if (TSDB_CODE_SUCCESS != code) {
×
302
    metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
303
    goto _out;
×
304
  }
305

UNCOV
306
  if (ttlMgrNeedFlush(pTtlMgr)) {
×
307
    int32_t ret = ttlMgrFlush(pTtlMgr, updCtx->pTxn);
×
308
    if (ret < 0) {
×
309
      metaError("%s, ttlMgr insert failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
310
    }
311
  }
312

UNCOV
313
  code = TSDB_CODE_SUCCESS;
×
314

UNCOV
315
_out:
×
UNCOV
316
  metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
×
317
            updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
318

UNCOV
319
  TAOS_RETURN(code);
×
320
}
321

UNCOV
322
int32_t ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
×
UNCOV
323
  if (delCtx->ttlDays == 0) return 0;
×
324

UNCOV
325
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
×
326

UNCOV
327
  int32_t code = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
×
UNCOV
328
  if (TSDB_CODE_SUCCESS != code) {
×
329
    metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
330
    goto _out;
×
331
  }
332

UNCOV
333
  if (ttlMgrNeedFlush(pTtlMgr)) {
×
334
    int32_t ret = ttlMgrFlush(pTtlMgr, delCtx->pTxn);
×
335
    if (ret < 0) {
×
336
      metaError("%s, ttlMgr del failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
337
    }
338
  }
339

UNCOV
340
  code = TSDB_CODE_SUCCESS;
×
341

UNCOV
342
_out:
×
UNCOV
343
  metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
×
UNCOV
344
  TAOS_RETURN(code);
×
345
}
346

347
int32_t ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
×
348
  int32_t code = TSDB_CODE_SUCCESS;
×
349

350
  STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
×
351
  if (oldData == NULL) {
×
352
    goto _out;
×
353
  }
354

355
  STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays,
×
356
                               .changeTimeMs = oldData->changeTimeMs,
×
357
                               .ttlDaysDirty = oldData->ttlDays,
×
358
                               .changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
×
359
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
×
360

361
  code =
362
      taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
×
363
  if (TSDB_CODE_SUCCESS != code) {
×
364
    metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
365
    goto _out;
×
366
  }
367

368
  code = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
×
369
                     sizeof(dirtryEntry));
370
  if (TSDB_CODE_SUCCESS != code) {
×
371
    metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
372
    goto _out;
×
373
  }
374

375
  if (ttlMgrNeedFlush(pTtlMgr)) {
×
376
    int32_t ret = ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
×
377
    if (ret < 0) {
×
378
      metaError("%s, ttlMgr update ctime failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
379
    }
380
  }
381

382
  code = TSDB_CODE_SUCCESS;
×
383

384
_out:
×
385
  metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
×
386
            pUpdCtimeCtx->changeTimeMs);
387
  TAOS_RETURN(code);
×
388
}
389

390
int32_t ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
247✔
391
  int32_t code = TSDB_CODE_SUCCESS;
247✔
392

393
  STtlIdxKeyV1   ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
247✔
394
  STtlExpiredCtx expiredCtx = {
247✔
395
      .ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
396
  TAOS_CHECK_GOTO(tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry), NULL, _out);
247!
397

398
  size_t vIdx = 0;
248✔
399
  for (size_t i = 0; i < pTbUids->size; i++) {
248!
UNCOV
400
    tb_uid_t *pUid = taosArrayGet(pTbUids, i);
×
UNCOV
401
    if (taosHashGet(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)) == NULL) {
×
402
      // not in dirty && expired in tdb => must be expired
UNCOV
403
      taosArraySet(pTbUids, vIdx, pUid);
×
UNCOV
404
      vIdx++;
×
405
    }
406
  }
407

408
  taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
248✔
409

410
_out:
246✔
411
  TAOS_RETURN(code);
246✔
412
}
413

UNCOV
414
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
×
UNCOV
415
  return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
×
416
}
417

418
int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
13✔
419
  int64_t startNs = taosGetTimestampNs();
13✔
420
  int64_t endNs = startNs;
13✔
421

422
  metaTrace("%s, ttl mgr flush start. num of dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
13!
423

424
  int32_t code = TSDB_CODE_SUCCESS;
13✔
425

426
  void *pIter = NULL;
13✔
427
  while ((pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter)) != NULL) {
13!
UNCOV
428
    STtlDirtyEntry *pEntry = (STtlDirtyEntry *)pIter;
×
UNCOV
429
    tb_uid_t       *pUid = taosHashGetKey(pIter, NULL);
×
430

UNCOV
431
    STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
×
UNCOV
432
    if (cacheEntry == NULL) {
×
433
      metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
×
434
                pEntry->type);
435
      continue;
×
436
    }
437

438
    STtlIdxKeyV1 ttlKey;
UNCOV
439
    ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
×
440

441
    STtlIdxKeyV1 ttlKeyDirty;
UNCOV
442
    ttlMgrBuildKey(&ttlKeyDirty, cacheEntry->ttlDaysDirty, cacheEntry->changeTimeMsDirty, *pUid);
×
443

UNCOV
444
    if (pEntry->type == ENTRY_TYPE_UPSERT) {
×
445
      // delete old key & upsert new key
UNCOV
446
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);  // maybe first insert, ignore error
×
UNCOV
447
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
×
448
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
×
449
        continue;
×
450
      }
UNCOV
451
      code = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
×
452
                         sizeof(cacheEntry->ttlDaysDirty), pTxn);
UNCOV
453
      if (TSDB_CODE_SUCCESS != code) {
×
454
        metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(code));
×
455
        continue;
×
456
      }
457

UNCOV
458
      cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
×
UNCOV
459
      cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
×
UNCOV
460
    } else if (pEntry->type == ENTRY_TYPE_DELETE) {
×
UNCOV
461
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
×
UNCOV
462
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
×
463
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
×
464
        continue;
×
465
      }
466

UNCOV
467
      code = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
×
UNCOV
468
      if (TSDB_CODE_SUCCESS != code) {
×
469
        metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
470
        continue;
×
471
      }
472
    } else {
473
      metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
×
474
      continue;
×
475
    }
476

UNCOV
477
    metaDebug("isdel:%d", pEntry->type == ENTRY_TYPE_DELETE);
×
UNCOV
478
    metaDebug("ttlkey:%" PRId64 ", uid:%" PRId64, ttlKey.deleteTimeMs, ttlKey.uid);
×
UNCOV
479
    metaDebug("ttlkeyDirty:%" PRId64 ", uid:%" PRId64, ttlKeyDirty.deleteTimeMs, ttlKeyDirty.uid);
×
480

UNCOV
481
    code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid));
×
UNCOV
482
    if (TSDB_CODE_SUCCESS != code) {
×
483
      metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code));
×
484
      continue;
×
485
    }
486
  }
487

488
  int32_t count = taosHashGetSize(pTtlMgr->pDirtyUids);
13✔
489
  if (count != 0) {
13!
490
    taosHashClear(pTtlMgr->pDirtyUids);
×
491
    metaError("%s, ttlMgr flush failed, dirty uids not empty, count: %d", pTtlMgr->logPrefix, count);
×
492
    code = TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION;
×
493

494
    goto _out;
×
495
  }
496

497
  code = TSDB_CODE_SUCCESS;
13✔
498

499
_out:
13✔
500
  taosHashCancelIterate(pTtlMgr->pDirtyUids, pIter);
13✔
501

502
  endNs = taosGetTimestampNs();
13✔
503
  metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);
13!
504

505
  TAOS_RETURN(code);
13✔
506
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc