• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.58
/source/dnode/vnode/src/meta/metaTtl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "metaTtl.h"
17
#include "meta.h"
18

19
typedef struct {
20
  TTB   *pNewTtlIdx;
21
  SMeta *pMeta;
22
} SConvertData;
23

24
typedef struct {
25
  int32_t      ttlDropMaxCount;
26
  int32_t      count;
27
  STtlIdxKeyV1 expiredKey;
28
  SArray      *pTbUids;
29
} STtlExpiredCtx;
30

31
static void ttlMgrCleanup(STtlManger *pTtlMgr);
32

33
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
34

35
static void    ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
36
static int     ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
37
static int     ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
38
static int     ttlMgrFillCache(STtlManger *pTtlMgr);
39
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache);
40
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData);
41
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
42
                                         void *pExpiredInfo);
43

44
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr);
45

46
const char *ttlTbname = "ttl.idx";
47
const char *ttlV1Tbname = "ttlv1.idx";
48

49
int32_t ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) {
6,597✔
50
  int32_t code = TSDB_CODE_SUCCESS;
6,597✔
51
  int64_t startNs = taosGetTimestampNs();
6,623✔
52

53
  *ppTtlMgr = NULL;
6,623✔
54

55
  STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
6,623✔
56
  if (pTtlMgr == NULL) TAOS_RETURN(terrno);
6,624!
57

58
  char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1);
6,624✔
59
  if (logBuffer == NULL) {
6,624!
60
    tdbOsFree(pTtlMgr);
×
61
    TAOS_RETURN(terrno);
×
62
  }
63
  (void)strcpy(logBuffer, logPrefix);
6,624✔
64
  pTtlMgr->logPrefix = logBuffer;
6,624✔
65
  pTtlMgr->flushThreshold = flushThreshold;
6,624✔
66

67
  code = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
6,624✔
68
  if (TSDB_CODE_SUCCESS != code) {
6,624!
69
    metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(code));
×
70
    tdbOsFree(pTtlMgr);
×
71
    TAOS_RETURN(code);
×
72
  }
73

74
  pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
6,624✔
75
  pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
6,625✔
76

77
  if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
6,624!
78
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
×
79
    ttlMgrCleanup(pTtlMgr);
×
80
    TAOS_RETURN(code);
×
81
  }
82

83
  int64_t endNs = taosGetTimestampNs();
6,622✔
84
  metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
6,622!
85
           taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
86

87
  *ppTtlMgr = pTtlMgr;
6,623✔
88
  TAOS_RETURN(TSDB_CODE_SUCCESS);
6,623✔
89
}
90

91
void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
6,625✔
92

93
bool ttlMgrNeedUpgrade(TDB *pEnv) {
6,624✔
94
  bool needUpgrade = tdbTbExist(ttlTbname, pEnv);
6,624✔
95
  if (needUpgrade) {
6,624!
96
    metaInfo("find ttl idx in old version , will convert");
×
97
  }
98
  return needUpgrade;
6,624✔
99
}
100

101
int32_t ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
×
102
  SMeta  *meta = (SMeta *)pMeta;
×
103
  int32_t code = TSDB_CODE_SUCCESS;
×
104

105
  if (!tdbTbExist(ttlTbname, meta->pEnv)) TAOS_RETURN(TSDB_CODE_SUCCESS);
×
106

107
  metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
×
108

109
  int64_t startNs = taosGetTimestampNs();
×
110

111
  code = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
×
112
  if (TSDB_CODE_SUCCESS != code) {
×
113
    metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(code));
×
114
    goto _out;
×
115
  }
116

117
  if ((code = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta)) != TSDB_CODE_SUCCESS) {
×
118
    metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
×
119
    goto _out;
×
120
  }
121

122
  if ((code = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn)) != TSDB_CODE_SUCCESS) {
×
123
    metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(code));
×
124
    goto _out;
×
125
  }
126

127
  if ((code = ttlMgrFillCache(pTtlMgr)) != TSDB_CODE_SUCCESS) {
×
128
    metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(code));
×
129
    goto _out;
×
130
  }
131

132
  int64_t endNs = taosGetTimestampNs();
×
133
  metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
×
134
           taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
135

136
_out:
×
137
  tdbTbClose(pTtlMgr->pOldTtlIdx);
×
138
  pTtlMgr->pOldTtlIdx = NULL;
×
139

140
  TAOS_RETURN(code);
×
141
}
142

143
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
6,625✔
144
  taosMemoryFree(pTtlMgr->logPrefix);
6,625✔
145
  taosHashCleanup(pTtlMgr->pTtlCache);
6,625✔
146
  taosHashCleanup(pTtlMgr->pDirtyUids);
6,625✔
147
  tdbTbClose(pTtlMgr->pTtlIdx);
6,625✔
148
  taosMemoryFree(pTtlMgr);
6,625✔
149
}
6,625✔
150

151
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
423✔
152
  if (ttlDays <= 0) return;
423!
153

154
  pTtlKey->deleteTimeMs = changeTimeMs + ttlDays * tsTtlUnit * 1000;
423✔
155
  pTtlKey->uid = uid;
423✔
156
}
157

158
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
×
159
  STtlIdxKey *pTtlIdxKey1 = (STtlIdxKey *)pKey1;
×
160
  STtlIdxKey *pTtlIdxKey2 = (STtlIdxKey *)pKey2;
×
161

162
  if (pTtlIdxKey1->deleteTimeSec > pTtlIdxKey2->deleteTimeSec) {
×
163
    return 1;
×
164
  } else if (pTtlIdxKey1->deleteTimeSec < pTtlIdxKey2->deleteTimeSec) {
×
165
    return -1;
×
166
  }
167

168
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
×
169
    return 1;
×
170
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
×
171
    return -1;
×
172
  }
173

174
  return 0;
×
175
}
176

177
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
1,019✔
178
  STtlIdxKeyV1 *pTtlIdxKey1 = (STtlIdxKeyV1 *)pKey1;
1,019✔
179
  STtlIdxKeyV1 *pTtlIdxKey2 = (STtlIdxKeyV1 *)pKey2;
1,019✔
180

181
  if (pTtlIdxKey1->deleteTimeMs > pTtlIdxKey2->deleteTimeMs) {
1,019✔
182
    return 1;
945✔
183
  } else if (pTtlIdxKey1->deleteTimeMs < pTtlIdxKey2->deleteTimeMs) {
74✔
184
    return -1;
50✔
185
  }
186

187
  if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
24!
188
    return 1;
×
189
  } else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
24!
190
    return -1;
×
191
  }
192

193
  return 0;
24✔
194
}
195

196
static int ttlMgrFillCache(STtlManger *pTtlMgr) {
6,623✔
197
  return tdbTbTraversal(pTtlMgr->pTtlIdx, pTtlMgr->pTtlCache, ttlMgrFillCacheOneEntry);
6,623✔
198
}
199

200
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache) {
×
201
  SHashObj *pCache = (SHashObj *)pTtlCache;
×
202

203
  STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
×
204
  tb_uid_t      uid = ttlKey->uid;
×
205
  int64_t       ttlDays = *(int64_t *)pVal;
×
206
  int64_t       changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
×
207

208
  STtlCacheEntry data = {
×
209
      .ttlDays = ttlDays, .changeTimeMs = changeTimeMs, .ttlDaysDirty = ttlDays, .changeTimeMsDirty = changeTimeMs};
210

211
  return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
×
212
}
213

214
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData) {
×
215
  SConvertData *pData = (SConvertData *)pConvertData;
×
216

217
  STtlIdxKey *ttlKey = (STtlIdxKey *)pKey;
×
218
  tb_uid_t    uid = ttlKey->uid;
×
219
  int64_t     ttlDays = 0;
×
220

221
  int32_t code = TSDB_CODE_SUCCESS;
×
222
  if ((code = metaGetTableTtlByUid(pData->pMeta, uid, &ttlDays)) != TSDB_CODE_SUCCESS) {
×
223
    metaError("ttlMgr convert failed to get ttl since %s", tstrerror(code));
×
224
    goto _out;
×
225
  }
226

227
  STtlIdxKeyV1 ttlKeyV1 = {.deleteTimeMs = ttlKey->deleteTimeSec * 1000, .uid = uid};
×
228
  code = tdbTbUpsert(pData->pNewTtlIdx, &ttlKeyV1, sizeof(ttlKeyV1), &ttlDays, sizeof(ttlDays), pData->pMeta->txn);
×
229
  if (code != TSDB_CODE_SUCCESS) {
×
230
    metaError("ttlMgr convert failed to upsert since %s", tstrerror(code));
×
231
    goto _out;
×
232
  }
233

234
  code = TSDB_CODE_SUCCESS;
×
235

236
_out:
×
237
  TAOS_RETURN(code);
×
238
}
239

240
static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen,
134✔
241
                                         void *pExpiredCtx) {
242
  STtlExpiredCtx *pCtx = (STtlExpiredCtx *)pExpiredCtx;
134✔
243
  if (pCtx->count >= pCtx->ttlDropMaxCount) return -1;
134✔
244

245
  int c = ttlIdxKeyV1Cmpr(&pCtx->expiredKey, sizeof(pCtx->expiredKey), pKey, keyLen);
133✔
246
  if (c > 0) {
133✔
247
    if (NULL == taosArrayPush(pCtx->pTbUids, &((STtlIdxKeyV1 *)pKey)->uid)) {
190!
248
      metaError("ttlMgr find expired failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
249
      return -1;
×
250
    }
251
    pCtx->count++;
95✔
252
  }
253

254
  return c;
133✔
255
}
256

257
// static int32_t ttlMgrDumpOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pDumpCtx) {
258
//   STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
259
//   int64_t      *ttlDays = (int64_t *)pVal;
260

261
//   metaInfo("ttlMgr dump, ttl: %" PRId64 ", ctime: %" PRId64 ", uid: %" PRId64, *ttlDays, ttlKey->deleteTimeMs,
262
//            ttlKey->uid);
263

264
//   TAOS_RETURN(TSDB_CODE_SUCCESS);
265
// }
266

267
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
×
268
  SMeta *meta = pMeta;
×
269

270
  metaInfo("ttlMgr convert start.");
×
271

272
  SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
×
273

274
  int code = TSDB_CODE_SUCCESS;
×
275
  if ((code = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry)) != TSDB_CODE_SUCCESS) {
×
276
    metaError("failed to convert since %s", tstrerror(code));
×
277
  }
278

279
  metaInfo("ttlMgr convert end.");
×
280
  TAOS_RETURN(code);
×
281
}
282

283
int32_t ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
30,866✔
284
  if (updCtx->ttlDays == 0) return 0;
30,866✔
285

286
  STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays,
198✔
287
                               .changeTimeMs = updCtx->changeTimeMs,
198✔
288
                               .ttlDaysDirty = updCtx->ttlDays,
198✔
289
                               .changeTimeMsDirty = updCtx->changeTimeMs};
198✔
290
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
198✔
291

292
  int32_t code = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
198✔
293
  if (TSDB_CODE_SUCCESS != code) {
198!
294
    metaError("%s, ttlMgr insert failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
295
    goto _out;
×
296
  }
297

298
  code = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
198✔
299
  if (TSDB_CODE_SUCCESS != code) {
198!
300
    metaError("%s, ttlMgr insert failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
301
    goto _out;
×
302
  }
303

304
  if (ttlMgrNeedFlush(pTtlMgr)) {
198!
305
    int32_t ret = ttlMgrFlush(pTtlMgr, updCtx->pTxn);
×
306
    if (ret < 0) {
×
307
      metaError("%s, ttlMgr insert failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
308
    }
309
  }
310

311
  code = TSDB_CODE_SUCCESS;
198✔
312

313
_out:
198✔
314
  metaTrace("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
198!
315
            updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
316

317
  TAOS_RETURN(code);
198✔
318
}
319

320
int32_t ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
1,521✔
321
  if (delCtx->ttlDays == 0) return 0;
1,521✔
322

323
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DELETE};
68✔
324

325
  int32_t code = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
68✔
326
  if (TSDB_CODE_SUCCESS != code) {
68!
327
    metaError("%s, ttlMgr del failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
328
    goto _out;
×
329
  }
330

331
  if (ttlMgrNeedFlush(pTtlMgr)) {
68!
332
    int32_t ret = ttlMgrFlush(pTtlMgr, delCtx->pTxn);
×
333
    if (ret < 0) {
×
334
      metaError("%s, ttlMgr del failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
335
    }
336
  }
337

338
  code = TSDB_CODE_SUCCESS;
68✔
339

340
_out:
68✔
341
  metaTrace("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
68!
342
  TAOS_RETURN(code);
68✔
343
}
344

345
int32_t ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
1✔
346
  int32_t code = TSDB_CODE_SUCCESS;
1✔
347

348
  STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
1✔
349
  if (oldData == NULL) {
1!
350
    goto _out;
×
351
  }
352

353
  STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays,
1✔
354
                               .changeTimeMs = oldData->changeTimeMs,
1✔
355
                               .ttlDaysDirty = oldData->ttlDays,
1✔
356
                               .changeTimeMsDirty = pUpdCtimeCtx->changeTimeMs};
1✔
357
  STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
1✔
358

359
  code =
360
      taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
1✔
361
  if (TSDB_CODE_SUCCESS != code) {
1!
362
    metaError("%s, ttlMgr update ctime failed to update cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
363
    goto _out;
×
364
  }
365

366
  code = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
1✔
367
                     sizeof(dirtryEntry));
368
  if (TSDB_CODE_SUCCESS != code) {
1!
369
    metaError("%s, ttlMgr update ctime failed to update dirty uids since %s", pTtlMgr->logPrefix, tstrerror(code));
×
370
    goto _out;
×
371
  }
372

373
  if (ttlMgrNeedFlush(pTtlMgr)) {
1!
374
    int32_t ret = ttlMgrFlush(pTtlMgr, pUpdCtimeCtx->pTxn);
×
375
    if (ret < 0) {
×
376
      metaError("%s, ttlMgr update ctime failed to flush since %s", pTtlMgr->logPrefix, tstrerror(ret));
×
377
    }
378
  }
379

380
  code = TSDB_CODE_SUCCESS;
1✔
381

382
_out:
1✔
383
  metaTrace("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
1!
384
            pUpdCtimeCtx->changeTimeMs);
385
  TAOS_RETURN(code);
1✔
386
}
387

388
int32_t ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids, int32_t ttlDropMaxCount) {
8,585✔
389
  int32_t code = TSDB_CODE_SUCCESS;
8,585✔
390

391
  STtlIdxKeyV1   ttlKey = {.deleteTimeMs = timePointMs, .uid = INT64_MAX};
8,585✔
392
  STtlExpiredCtx expiredCtx = {
8,585✔
393
      .ttlDropMaxCount = ttlDropMaxCount, .count = 0, .expiredKey = ttlKey, .pTbUids = pTbUids};
394
  TAOS_CHECK_GOTO(tdbTbTraversal(pTtlMgr->pTtlIdx, &expiredCtx, ttlMgrFindExpiredOneEntry), NULL, _out);
8,585!
395

396
  size_t vIdx = 0;
8,602✔
397
  for (size_t i = 0; i < pTbUids->size; i++) {
8,697✔
398
    tb_uid_t *pUid = taosArrayGet(pTbUids, i);
95✔
399
    if (taosHashGet(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)) == NULL) {
95✔
400
      // not in dirty && expired in tdb => must be expired
401
      taosArraySet(pTbUids, vIdx, pUid);
74✔
402
      vIdx++;
74✔
403
    }
404
  }
405

406
  taosArrayPopTailBatch(pTbUids, pTbUids->size - vIdx);
8,602✔
407

408
_out:
8,576✔
409
  TAOS_RETURN(code);
8,576✔
410
}
411

412
static bool ttlMgrNeedFlush(STtlManger *pTtlMgr) {
267✔
413
  return pTtlMgr->flushThreshold > 0 && taosHashGetSize(pTtlMgr->pDirtyUids) > pTtlMgr->flushThreshold;
267!
414
}
415

416
int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
7,371✔
417
  int64_t startNs = taosGetTimestampNs();
7,371✔
418
  int64_t endNs = startNs;
7,371✔
419

420
  metaTrace("%s, ttl mgr flush start. num of dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
7,371✔
421

422
  int32_t code = TSDB_CODE_SUCCESS;
7,371✔
423

424
  void *pIter = NULL;
7,371✔
425
  while ((pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter)) != NULL) {
7,583✔
426
    STtlDirtyEntry *pEntry = (STtlDirtyEntry *)pIter;
212✔
427
    tb_uid_t       *pUid = taosHashGetKey(pIter, NULL);
212✔
428

429
    STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
212✔
430
    if (cacheEntry == NULL) {
212!
431
      metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
×
432
                pEntry->type);
433
      continue;
×
434
    }
435

436
    STtlIdxKeyV1 ttlKey;
437
    ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
212✔
438

439
    STtlIdxKeyV1 ttlKeyDirty;
440
    ttlMgrBuildKey(&ttlKeyDirty, cacheEntry->ttlDaysDirty, cacheEntry->changeTimeMsDirty, *pUid);
212✔
441

442
    if (pEntry->type == ENTRY_TYPE_UPSERT) {
212✔
443
      // delete old key & upsert new key
444
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);  // maybe first insert, ignore error
184✔
445
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
184!
446
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
×
447
        continue;
×
448
      }
449
      code = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
184✔
450
                         sizeof(cacheEntry->ttlDaysDirty), pTxn);
451
      if (TSDB_CODE_SUCCESS != code) {
184!
452
        metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(code));
×
453
        continue;
×
454
      }
455

456
      cacheEntry->ttlDays = cacheEntry->ttlDaysDirty;
184✔
457
      cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty;
184✔
458
    } else if (pEntry->type == ENTRY_TYPE_DELETE) {
28!
459
      code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
28✔
460
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) {
28!
461
        metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code));
×
462
        continue;
×
463
      }
464

465
      code = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
28✔
466
      if (TSDB_CODE_SUCCESS != code) {
28!
467
        metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(code));
×
468
        continue;
×
469
      }
470
    } else {
471
      metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
×
472
      continue;
×
473
    }
474

475
    metaDebug("isdel:%d", pEntry->type == ENTRY_TYPE_DELETE);
212!
476
    metaDebug("ttlkey:%" PRId64 ", uid:%" PRId64, ttlKey.deleteTimeMs, ttlKey.uid);
212!
477
    metaDebug("ttlkeyDirty:%" PRId64 ", uid:%" PRId64, ttlKeyDirty.deleteTimeMs, ttlKeyDirty.uid);
212!
478

479
    code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid));
212✔
480
    if (TSDB_CODE_SUCCESS != code) {
212!
UNCOV
481
      metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code));
×
482
      continue;
×
483
    }
484
  }
485

486
  int32_t count = taosHashGetSize(pTtlMgr->pDirtyUids);
7,371✔
487
  if (count != 0) {
7,371!
UNCOV
488
    taosHashClear(pTtlMgr->pDirtyUids);
×
489
    metaError("%s, ttlMgr flush failed, dirty uids not empty, count: %d", pTtlMgr->logPrefix, count);
×
490
    code = TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION;
×
491

492
    goto _out;
×
493
  }
494

495
  code = TSDB_CODE_SUCCESS;
7,371✔
496

497
_out:
7,371✔
498
  taosHashCancelIterate(pTtlMgr->pDirtyUids, pIter);
7,371✔
499

500
  endNs = taosGetTimestampNs();
7,371✔
501
  metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);
7,371✔
502

503
  TAOS_RETURN(code);
7,371✔
504
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc