• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4850

14 Nov 2025 08:06AM UTC coverage: 63.728% (-0.1%) from 63.829%
#4850

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

355 of 675 new or added lines in 18 files covered. (52.59%)

634 existing lines in 110 files now uncovered.

149066 of 233910 relevant lines covered (63.73%)

115676883.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.69
/source/dnode/vnode/src/tsdb/tsdbFS2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "tsdbFS2.h"
18
#include "tsdbUpgrade.h"
19
#include "vnd.h"
20

21
#define BLOCK_COMMIT_FACTOR 3
22

23
typedef struct STFileHashEntry {
24
  struct STFileHashEntry *next;
25
  char                    fname[TSDB_FILENAME_LEN];
26
} STFileHashEntry;
27

28
typedef struct {
29
  int32_t           numFile;
30
  int32_t           numBucket;
31
  STFileHashEntry **buckets;
32
} STFileHash;
33

34
static const char *gCurrentFname[] = {
35
    [TSDB_FCURRENT] = "current.json",
36
    [TSDB_FCURRENT_C] = "current.c.json",
37
    [TSDB_FCURRENT_M] = "current.m.json",
38
};
39

40
/*
 * Allocate a zeroed STFileSystem, bind it to pTsdb and initialise its
 * semaphore, state, edit id and the two file-set arrays.
 *
 * On success fs[0] points at the new object and 0 is returned.
 * On failure fs[0] is NULL and the error code is returned (terrno when the
 * allocation fails, otherwise the code from tsem_init).
 */
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
  if (fs[0] == NULL) {
    return terrno;
  }

  fs[0]->tsdb = pTsdb;
  int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
  if (code) {
    taosMemoryFree(fs[0]);
    // Never hand a freed pointer back: tsdbOpenFS() passes fs to destroy_fs()
    // on failure, which would otherwise touch and free this memory again.
    fs[0] = NULL;
    return code;
  }

  fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
  fs[0]->neid = 0;
  TARRAY2_INIT(fs[0]->fSetArr);
  TARRAY2_INIT(fs[0]->fSetArrTmp);

  return 0;
}
60

61
/*
 * Tear down the object created by create_fs() and reset fs[0] to NULL.
 * The arrays are destroyed with a NULL element callback, so the elements
 * themselves are not released here (see close_file_system()).
 * A NULL fs[0] is a no-op.
 */
static void destroy_fs(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) {
    return;
  }

  TARRAY2_DESTROY(pFS->fSetArr, NULL);
  TARRAY2_DESTROY(pFS->fSetArrTmp, NULL);

  // A semaphore teardown failure is only logged; destruction continues.
  if (tsem_destroy(&pFS->canEdit) != 0) {
    tsdbError("failed to destroy semaphore");
  }

  taosMemoryFree(pFS);
  fs[0] = NULL;
}
72

73
/*
 * Build the path of one of the manifest files into fname:
 *   <vnode primary path><sep><pTsdb->name><sep><gCurrentFname[ftype]>
 * fname must provide at least TSDB_FILENAME_LEN bytes.
 */
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
  vnodeGetPrimaryPath(pTsdb->pVnode, false, fname, TSDB_FILENAME_LEN);

  // Append after whatever the primary path occupies.
  int32_t used = strlen(fname);
  char   *tail = fname + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%s%s%s%s", TD_DIRSEP, pTsdb->name, TD_DIRSEP, gCurrentFname[ftype]);
}
81

82
static int32_t save_json(const cJSON *json, const char *fname) {
6,803,842✔
83
  int32_t   code = 0;
6,803,842✔
84
  int32_t   lino;
85
  char     *data = NULL;
6,803,842✔
86
  TdFilePtr fp = NULL;
6,803,842✔
87

88
  data = cJSON_PrintUnformatted(json);
6,803,842✔
89
  if (data == NULL) {
6,804,652✔
90
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
91
  }
92

93
  fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
6,804,652✔
94
  if (fp == NULL) {
6,806,002✔
95
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
96
  }
97

98
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
6,806,002✔
99
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
100
  }
101

102
  if (taosFsyncFile(fp) < 0) {
6,805,774✔
103
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
104
  }
105

106
_exit:
6,804,145✔
107
  if (code) {
6,802,697✔
108
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
109
  }
110
  taosMemoryFree(data);
6,804,878✔
111
  taosCloseFileWithLog(&fp);
6,803,124✔
112
  return code;
6,805,320✔
113
}
114

115
/*
 * Read the whole of fname into memory and parse it as JSON.
 *
 * On success json[0] receives the parsed tree (the caller releases it with
 * cJSON_Delete) and 0 is returned. On any failure json[0] is set to NULL and
 * an error code is returned: terrno / the taosFStatFile code for I/O and
 * allocation problems, TSDB_CODE_FILE_CORRUPTED when the text does not parse.
 */
static int32_t load_json(const char *fname, cJSON **json) {
  int32_t code = 0;
  int32_t lino = 0;
  char   *data = NULL;
  int64_t size = 0;

  TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
  if (fp == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  code = taosFStatFile(fp, &size, NULL);
  if (code != 0) {
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // One extra byte for the terminating NUL expected by cJSON_Parse().
  data = taosMemoryMalloc(size + 1);
  if (data == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  int64_t nread = taosReadFile(fp, data, size);
  if (nread < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  // Terminate at the number of bytes actually read so that a short read never
  // lets the parser walk over uninitialised heap memory.
  // NOTE(review): assumes taosReadFile returns the byte count read - confirm.
  data[(nread < size) ? nread : size] = '\0';

  json[0] = cJSON_Parse(data);
  if (json[0] == NULL) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    // lino holds the line of the failing step.
    tsdbError("%s failed at %s:%d since %s", __func__, fname, lino, tstrerror(code));
    json[0] = NULL;
  }
  taosCloseFileWithLog(&fp);
  taosMemoryFree(data);
  return code;
}
155

156
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
6,805,319✔
157
  int32_t code = 0;
6,805,319✔
158
  int32_t lino = 0;
6,805,319✔
159

160
  cJSON *json = cJSON_CreateObject();
6,805,319✔
161
  if (json == NULL) {
6,805,544✔
162
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
163
  }
164

165
  // fmtv
166
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
6,805,544✔
167
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
168
  }
169

170
  // fset
171
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
6,805,439✔
172
  if (!ajson) {
6,805,253✔
173
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
174
  }
175
  const STFileSet *fset;
176
  TARRAY2_FOREACH(arr, fset) {
24,814,476✔
177
    cJSON *item = cJSON_CreateObject();
18,007,403✔
178
    if (!item) {
18,008,793✔
179
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
180
    }
181
    (void)cJSON_AddItemToArray(ajson, item);
18,008,793✔
182

183
    code = tsdbTFileSetToJson(fset, item);
18,009,947✔
184
    TSDB_CHECK_CODE(code, lino, _exit);
18,009,223✔
185
  }
186

187
  code = save_json(json, fname);
6,804,140✔
188
  TSDB_CHECK_CODE(code, lino, _exit);
6,805,062✔
189

190
_exit:
6,805,062✔
191
  if (code) {
6,805,062✔
192
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
193
  }
194
  cJSON_Delete(json);
6,805,062✔
195
  return code;
6,806,308✔
196
}
197

198
/*
 * Replace the contents of arr with the file sets described by the manifest
 * file fname, sorted with tsdbTFileSetCmprFn.
 *
 * The manifest must carry a numeric "fmtv" equal to 1 and an array "fset";
 * anything else yields TSDB_CODE_FILE_CORRUPTED. Other failures return the
 * code of the failing helper. On failure arr may hold the file sets that were
 * loaded before the error.
 */
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
  int32_t      code = 0;
  int32_t      lino = 0;
  cJSON       *json = NULL;
  const cJSON *item1;
  const cJSON *item2;

  TARRAY2_CLEAR(arr, tsdbTFileSetClear);

  // load json
  code = load_json(fname, &json);
  TSDB_CHECK_CODE(code, lino, _exit);

  /* fmtv */
  item1 = cJSON_GetObjectItem(json, "fmtv");
  if (!cJSON_IsNumber(item1) || item1->valuedouble != 1) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  /* fset */
  item1 = cJSON_GetObjectItem(json, "fset");
  if (!cJSON_IsArray(item1)) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  cJSON_ArrayForEach(item2, item1) {
    STFileSet *fset = NULL;
    code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(arr, fset);
    if (code) {
      // The array did not take ownership, so release the file set here
      // instead of leaking it.
      // NOTE(review): assumes tsdbTFileSetClear takes STFileSet ** (it is used
      // as the element callback of this array) - confirm.
      tsdbTFileSetClear(&fset);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }
  TARRAY2_SORT(arr, tsdbTFileSetCmprFn);

_exit:
  if (code) {
    tsdbError("%s failed at %s:%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
  }
  if (json) {
    cJSON_Delete(json);
  }
  return code;
}
249

250
/*
 * Bring fs->fSetArr in line with fs->fSetArrTmp by walking both arrays in
 * ascending fid order:
 *   - fid only in fSetArr    -> tsdbTFileSetRemove() on that file set
 *   - fid only in fSetArrTmp -> copy the file set and sorted-insert it
 *   - fid in both            -> tsdbTFileSetApplyEdit(tmp -> current)
 *
 * Returns 0, or the code of the first helper that fails.
 */
static int32_t apply_commit(STFileSystem *fs) {
  int32_t        code = 0;
  int32_t        lino;
  TFileSetArray *fsetArray1 = fs->fSetArr;
  TFileSetArray *fsetArray2 = fs->fSetArrTmp;
  int32_t        i1 = 0;
  int32_t        i2 = 0;

  while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
    STFileSet *fset1 = NULL;
    STFileSet *fset2 = NULL;

    if (i1 < TARRAY2_SIZE(fsetArray1)) {
      fset1 = TARRAY2_GET(fsetArray1, i1);
    }
    if (i2 < TARRAY2_SIZE(fsetArray2)) {
      fset2 = TARRAY2_GET(fsetArray2, i2);
    }

    if (fset1 != NULL && (fset2 == NULL || fset1->fid < fset2->fid)) {
      // present only in the current state: delete fset1
      tsdbTFileSetRemove(fset1);
      i1++;
    } else if (fset2 != NULL && (fset1 == NULL || fset1->fid > fset2->fid)) {
      // present only in the new state: create a file set with fset2->fid.
      // The insert lands at index i1, so both cursors move on.
      code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
      TSDB_CHECK_CODE(code, lino, _exit);
      code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
      TSDB_CHECK_CODE(code, lino, _exit);
      i1++;
      i2++;
    } else {
      // same fid on both sides: edit in place
      code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
      TSDB_CHECK_CODE(code, lino, _exit);
      i1++;
      i2++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
302

303
/*
 * Make a staged edit durable: rename the staged manifest (current.c.json for
 * TSDB_FEDIT_COMMIT, current.m.json otherwise) over current.json, then apply
 * the staged state to the in-memory file sets via apply_commit().
 *
 * Returns 0 on success, otherwise the code from taosRenameFile() or
 * apply_commit().
 */
static int32_t commit_edit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  char    current[TSDB_FILENAME_LEN];
  char    current_t[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, current, TSDB_FCURRENT);
  if (fs->etype == TSDB_FEDIT_COMMIT) {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
  } else {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
  }

  // Store the rename result in code: jumping to _exit without assigning it
  // would make the exit path test (and return) an uninitialised value.
  code = taosRenameFile(current_t, current);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = apply_commit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  return code;
}
330

331
// static int32_t
332
/* Defined further down in this file. */
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);

/* In-memory half of an abort: delegates entirely to tsdbFSDoSanAndFix(). */
static int32_t apply_abort(STFileSystem *fs) {
  int32_t code = tsdbFSDoSanAndFix(fs);
  return code;
}
334

UNCOV
335
/*
 * Discard a staged edit: delete the staged manifest (current.c.json for
 * TSDB_FEDIT_COMMIT, current.m.json otherwise) and then run apply_abort().
 *
 * Returns 0 on success; a removal failure is passed through
 * TAOS_SYSTEM_ERROR(), any other failure is the code from apply_abort().
 */
static int32_t abort_edit(STFileSystem *fs) {
  int32_t code;
  int32_t lino;
  char    fname[TSDB_FILENAME_LEN];

  // Choose which staged manifest belongs to this edit type.
  EFCurrentT staged = (fs->etype == TSDB_FEDIT_COMMIT) ? TSDB_FCURRENT_C : TSDB_FCURRENT_M;
  current_fname(fs->tsdb, fname, staged);

  code = taosRemoveFile(fname);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = apply_abort(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  return code;
}
363

364
/*
 * Check that the file behind fobj is present on disk.
 *
 * Returns 0 when fobj->fname exists. When it does not, and tsSsEnabled is set
 * and fobj->f->lcn > 1, the name produced by tsdbTFileLastChunkName() is
 * checked instead and accepted if that file exists. In every other case the
 * missing file name is logged and TSDB_CODE_FILE_CORRUPTED is returned.
 */
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
  if (taosCheckExistFile(fobj->fname)) {
    return 0;
  }

  if (tsSsEnabled && fobj->f->lcn > 1) {
    // Fall back to the last-chunk file name.
    char lastChunk[TSDB_FILENAME_LEN];
    tsdbTFileLastChunkName(fs->tsdb, fobj->f, lastChunk);
    if (taosCheckExistFile(lastChunk)) {
      return 0;
    }

    tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, lastChunk);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname);
  return TSDB_CODE_FILE_CORRUPTED;
}
393

394
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
395

396
/*
 * Insert fname at the head of its bucket chain in hash.
 * No duplicate check is made. Returns 0, or terrno when the node cannot be
 * allocated.
 */
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
  STFileHashEntry *node = taosMemoryMalloc(sizeof(*node));
  if (node == NULL) {
    return terrno;
  }
  tstrncpy(node->fname, fname, TSDB_FILENAME_LEN);

  // The slot is derived from the full input name.
  uint32_t slot = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;

  node->next = hash->buckets[slot];
  hash->buckets[slot] = node;
  hash->numFile++;

  return 0;
}
410

411
/*
 * Fill hash with the name of every file the file system references:
 * current.json, each non-NULL entry of fset->farr[], the "<name>.<lcn>.data"
 * companion of a data file whose lcn is > 0, and every stt file of every
 * level.
 *
 * Returns 0 on success. On failure the hash is destroyed (left zeroed) and
 * the error code is returned, so the caller must not destroy it again.
 */
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  // init hash table
  hash->numFile = 0;
  hash->numBucket = 4096;
  hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
  if (hash->buckets == NULL) {
    // No table exists, so the cleanup at _exit must not iterate over buckets.
    hash->numBucket = 0;
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // current.json
  current_fname(fs->tsdb, fname, TSDB_FCURRENT);
  code = tsdbFSAddEntryToFileObjHash(hash, fname);
  TSDB_CHECK_CODE(code, lino, _exit);

  // other
  STFileSet *fset = NULL;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    // data file
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
      if (fset->farr[i] != NULL) {
        code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
        TSDB_CHECK_CODE(code, lino, _exit);

        if (TSDB_FTYPE_DATA == i && fset->farr[i]->f->lcn > 0) {
          STFileObj *fobj = fset->farr[i];
          int32_t    lcn = fobj->f->lcn;
          char       lcn_name[TSDB_FILENAME_LEN];

          // Bound every write by the real size of lcn_name, not by an
          // unrelated length constant.
          snprintf(lcn_name, sizeof(lcn_name), "%s", fobj->fname);
          char *dot = strrchr(lcn_name, '.');
          if (dot) {
            // Replace the extension with "<lcn>.data".
            size_t used = (size_t)(dot + 1 - lcn_name);
            snprintf(dot + 1, sizeof(lcn_name) - used, "%d.data", lcn);

            code = tsdbFSAddEntryToFileObjHash(hash, lcn_name);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        }
      }
    }

    // stt file
    SSttLvl *lvl = NULL;
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      STFileObj *fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
    tsdbFSDestroyFileObjHash(hash);
  }
  return code;
}
473

474
/*
 * Look fname up in hash.
 * Returns the entry whose stored name compares equal (strcmp), or NULL when
 * the bucket chain holds no such entry.
 */
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
  uint32_t slot = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;

  for (STFileHashEntry *node = hash->buckets[slot]; node != NULL; node = node->next) {
    if (strcmp(node->fname, fname) == 0) {
      return node;
    }
  }

  return NULL;
}
487

488
/*
 * Free every entry and the bucket table of hash, then zero the structure.
 * Safe to call on a hash whose bucket table was never allocated.
 */
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
  // tsdbFSCreateFileObjHash() calls this from its error path, including the
  // case where the bucket allocation itself failed: numBucket is then already
  // set while buckets is NULL, so the table must not be walked.
  if (hash->buckets != NULL) {
    for (int32_t i = 0; i < hash->numBucket; i++) {
      STFileHashEntry *entry = hash->buckets[i];
      while (entry) {
        STFileHashEntry *next = entry->next;
        taosMemoryFree(entry);
        entry = next;
      }
    }
  }
  taosMemoryFree(hash->buckets);
  memset(hash, 0, sizeof(*hash));
}
500

501
/*
 * Verify the files referenced by fs->fSetArr and delete files in the tsdb
 * directory that nothing references.
 *
 * Nothing is done when the vnode is flagged as mounted. If any referenced
 * file is missing, the owning file set's maxVerValid is lowered, fsstate
 * becomes TSDB_FS_STATE_INCOMPLETE, no file is deleted and 0 is returned.
 * Otherwise each non-directory entry of fs->tsdb->path that is absent from
 * the reference hash is removed with tsdbRemoveFile().
 *
 * Returns 0, or the code from tfsOpendir() / tsdbFSCreateFileObjHash().
 */
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t corrupt = false;

  if (fs->tsdb->pVnode->mounted) goto _exit;

  {  // scan each file
    STFileSet *fset = NULL;
    TARRAY2_FOREACH(fs->fSetArr, fset) {
      // data file
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
        STFileObj *fobj = fset->farr[ftype];
        if (fobj == NULL) continue;

        code = tsdbFSDoScanAndFixFile(fs, fobj);
        if (code) {
          // Versions from the missing file's minVer onwards are not valid.
          fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
          corrupt = true;
        }
      }

      // stt file
      SSttLvl *lvl;
      TARRAY2_FOREACH(fset->lvlArr, lvl) {
        STFileObj *sttObj;
        TARRAY2_FOREACH(lvl->fobjArr, sttObj) {
          code = tsdbFSDoScanAndFixFile(fs, sttObj);
          if (code) {
            fset->maxVerValid =
                (sttObj->f->minVer <= sttObj->f->maxVer) ? TMIN(fset->maxVerValid, sttObj->f->minVer - 1) : -1;
            corrupt = true;
          }
        }
      }
    }
  }

  if (corrupt) {
    // An incomplete file set is recorded, not treated as an error.
    tsdbError("vgId:%d, not to clear dangling files due to fset incompleteness", TD_VID(fs->tsdb->pVnode));
    fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
    code = 0;
    goto _exit;
  }

  {  // clear unreferenced files
    STfsDir *dir = NULL;
    TAOS_CHECK_GOTO(tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path, &dir), &lino, _exit);

    STFileHash fobjHash = {0};
    code = tsdbFSCreateFileObjHash(fs, &fobjHash);
    if (code) goto _close_dir;

    const STfsFile *file = NULL;
    while ((file = tfsReaddir(dir)) != NULL) {
      if (taosIsDir(file->aname)) continue;

      if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
        tsdbRemoveFile(file->aname);
      }
    }

    tsdbFSDestroyFileObjHash(&fobjHash);

  _close_dir:
    tfsClosedir(dir);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
573

574
/*
 * Set fs->neid to the largest value tsdbTFileSetMaxCid() reports over all
 * file sets (0 when there are none), then run tsdbFSDoSanAndFix().
 * Returns the code of tsdbFSDoSanAndFix().
 */
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
  int32_t          code = 0;
  int32_t          lino = 0;
  const STFileSet *fset;

  // get max commit id
  fs->neid = 0;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset));
  }

  // scan and fix
  code = tsdbFSDoSanAndFix(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
594

595
/*
 * Rebuild fs->fSetArrTmp as a copy of fs->fSetArr: the temporary array is
 * cleared and then receives one tsdbTFileSetInitCopy() of every file set.
 *
 * Returns 0, or the code of the first copy or append that fails; in that case
 * fSetArrTmp holds the copies made so far.
 */
static int32_t tsdbFSDupState(STFileSystem *fs) {
  int32_t code = 0;

  const TFileSetArray *src = fs->fSetArr;
  TFileSetArray       *dst = fs->fSetArrTmp;

  TARRAY2_CLEAR(dst, tsdbTFileSetClear);

  const STFileSet *fset1;
  TARRAY2_FOREACH(src, fset1) {
    STFileSet *fset2 = NULL;
    code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
    if (code) return code;

    code = TARRAY2_APPEND(dst, fset2);
    if (code) {
      // The copy never entered dst, so nothing else would release it.
      // NOTE(review): assumes tsdbTFileSetClear takes STFileSet ** (it is used
      // as the element callback of this array) - confirm.
      tsdbTFileSetClear(&fset2);
      return code;
    }
  }

  return 0;
}
614

615
/*
 * Load the file system state from disk, or create an initial manifest.
 *
 * No current.json: the (empty) fSetArr is saved as current.json.
 * current.json present: it is loaded into fSetArr, then
 *   - if current.c.json exists: the edit is aborted when rollback is set,
 *     otherwise it is loaded into fSetArrTmp and committed;
 *   - else if current.m.json exists: the edit is always aborted;
 * after which fSetArr is duplicated into fSetArrTmp and scanned/fixed.
 *
 * Returns 0, or the code of the first step that fails.
 */
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = fs->tsdb;
  char    fCurrent[TSDB_FILENAME_LEN];
  char    cCurrent[TSDB_FILENAME_LEN];
  char    mCurrent[TSDB_FILENAME_LEN];

  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);

  if (!taosCheckExistFile(fCurrent)) {
    // current.json does not exist: write one
    code = save_fs(fs->fSetArr, fCurrent);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  // current.json exists
  code = load_fs(pTsdb, fCurrent, fs->fSetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosCheckExistFile(cCurrent)) {
    // current.c.json exists
    fs->etype = TSDB_FEDIT_COMMIT;
    if (rollback) {
      code = abort_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = commit_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else if (taosCheckExistFile(mCurrent)) {
    // current.m.json exists
    fs->etype = TSDB_FEDIT_MERGE;
    code = abort_edit(fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFSDupState(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSScanAndFix(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
671

672
/* Empty both file-set arrays, passing every element to tsdbTFileSetClear(). */
static void close_file_system(STFileSystem *fs) {
  // current state
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
  // working copy used by edits
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
}
676

677
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
×
678
  if (pSet1->fid < pSet2->fid) {
×
679
    return -1;
×
680
  } else if (pSet1->fid > pSet2->fid) {
×
681
    return 1;
×
682
  }
683
  return 0;
×
684
}
685

686
/*
 * Apply the operations in opArray to the working copy fs->fSetArrTmp.
 *
 * The working copy is first refreshed from fs->fSetArr. Each operation is
 * applied to the file set with the matching fid, which is created and
 * sorted-inserted when absent. Whenever the fid differs from that of the
 * previous operation, the file set's timestamp for etype (lastCommit,
 * lastCompact, lastMigrate, or lastRollup + lastRollupLevel + lastCompact) is
 * updated. Finally, stt levels without files and file sets reported empty by
 * tsdbTFileSetIsEmpty() are removed.
 *
 * Returns 0, or the code of the first helper that fails.
 */
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSDupState(fs);
  if (code) return code;

  TFileSetArray  *fsetArray = fs->fSetArrTmp;
  STFileSet      *fset = NULL;
  const STFileOp *op;
  int32_t         lastFid = INT32_MIN;
  TSKEY           now = taosGetTimestampMs();

  TARRAY2_FOREACH_PTR(opArray, op) {
    if (fset == NULL || fset->fid != op->fid) {
      // Look the target file set up through a stack key carrying only the fid.
      STFileSet   key = {.fid = op->fid};
      STFileSet  *pKey = &key;
      STFileSet **hit = TARRAY2_SEARCH(fsetArray, &pKey, tsdbTFileSetCmprFn, TD_EQ);
      fset = (hit != NULL) ? *hit : NULL;

      if (fset == NULL) {
        code = tsdbTFileSetInit(op->fid, &fset);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (lastFid != op->fid) {
      lastFid = op->fid;
      switch (etype) {
        case TSDB_FEDIT_COMMIT:
          fset->lastCommit = now;
          break;
        case TSDB_FEDIT_COMPACT:
          fset->lastCompact = now;
          break;
        case TSDB_FEDIT_SSMIGRATE:
          fset->lastMigrate = now;
          break;
        case TSDB_FEDIT_ROLLUP:
          fset->lastRollupLevel = fs->rollupLevel;
          fset->lastRollup = now;
          fset->lastCompact = now;  // rollup implies compact
          break;
        default:
          break;
      }
    }
  }

  // remove empty stt levels and empty file sets
  for (int32_t i = 0; i < TARRAY2_SIZE(fsetArray);) {
    fset = TARRAY2_GET(fsetArray, i);

    for (int32_t j = 0; j < TARRAY2_SIZE(fset->lvlArr);) {
      SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j);

      if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
        // Removal shifts the next level into slot j; do not advance.
        TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
      } else {
        j++;
      }
    }

    if (tsdbTFileSetIsEmpty(fset)) {
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
    } else {
      i++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
763

764
// return error code
765
/*
 * Open the file system of pTsdb: run tsdbCheckAndUpgradeFileSystem(), create
 * the STFileSystem object and load its state via open_fs().
 *
 * rollback is forwarded to the upgrade check and to open_fs().
 * Returns 0 with fs[0] set on success. On failure the error code is returned
 * and whatever fs[0] refers to is released and reset to NULL.
 */
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = create_fs(pTsdb, fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = open_fs(fs[0], rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    // destroy_fs() drops the arrays without releasing their elements, so any
    // file sets open_fs() managed to load must be cleared first.
    if (fs[0] != NULL) {
      close_file_system(fs[0]);
    }
    destroy_fs(fs);
  } else {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
787

788
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
789
extern void tsdbStopAllCompTask(STsdb *tsdb);
790
extern void tsdbStopAllRetentionTask(STsdb *tsdb);
791

792
/*
 * Set pTsdb->bgTaskDisabled and stop the per-file-set async tasks.
 *
 * Under pTsdb->mutex the merge, compact, retention and migrate task ids of
 * every file set are collected and the file set's commit block is lifted.
 * Outside the lock every collected task is first cancelled, then every task
 * is waited for. Finally tsdbStopAllCompTask() (TD_ENTERPRISE builds only)
 * and tsdbStopAllRetentionTask() are called.
 *
 * Returns 0, or terrno when the task list cannot be allocated or grown.
 */
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
  STFileSystem *fs = pTsdb->pFS;

  SArray *taskIds = taosArrayInit(0, sizeof(SVATaskID));
  if (taskIds == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&pTsdb->mutex);

  // disable
  pTsdb->bgTaskDisabled = true;

  // collect the task ids of every file set
  STFileSet *fset;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    if (taosArrayPush(taskIds, &fset->mergeTask) == NULL       //
        || taosArrayPush(taskIds, &fset->compactTask) == NULL  //
        || taosArrayPush(taskIds, &fset->retentionTask) == NULL
        || taosArrayPush(taskIds, &fset->migrateTask) == NULL) {
      taosArrayDestroy(taskIds);
      (void)taosThreadMutexUnlock(&pTsdb->mutex);
      return terrno;
    }
    tsdbFSSetBlockCommit(fset, false);
  }

  (void)taosThreadMutexUnlock(&pTsdb->mutex);

  // first pass: request cancellation of every task
  for (int32_t i = 0; i < taosArrayGetSize(taskIds); i++) {
    SVATaskID *task = taosArrayGet(taskIds, i);
    (void)vnodeACancel(task);
  }

  // second pass: wait for every task
  for (int32_t i = 0; i < taosArrayGetSize(taskIds); i++) {
    SVATaskID *task = taosArrayGet(taskIds, i);
    vnodeAWait(task);
  }
  taosArrayDestroy(taskIds);

#ifdef TD_ENTERPRISE
  tsdbStopAllCompTask(pTsdb);
#endif
  tsdbStopAllRetentionTask(pTsdb);
  return 0;
}
839

840
/* Clear pTsdb->bgTaskDisabled while holding pTsdb->mutex. */
void tsdbEnableBgTask(STsdb *pTsdb) {
  (void)taosThreadMutexLock(&pTsdb->mutex);

  pTsdb->bgTaskDisabled = false;

  (void)taosThreadMutexUnlock(&pTsdb->mutex);
}
845

846
/*
 * Shut the file system down: stop background tasks, release all file sets,
 * destroy the object and reset fs[0] to NULL. A NULL fs[0] is a no-op.
 * A failure to stop the background tasks is logged and shutdown continues.
 */
void tsdbCloseFS(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) {
    return;
  }

  int32_t code = tsdbDisableAndCancelAllBgTask(pFS->tsdb);
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID((*fs)->tsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  close_file_system(pFS);
  destroy_fs(fs);
}
858

859
/* Increment fs->neid under the tsdb mutex and return the new value. */
int64_t tsdbFSAllocEid(STFileSystem *fs) {
  int64_t eid;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  fs->neid += 1;
  eid = fs->neid;
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return eid;
}
865

866
// Raise fs->neid to `cid` if `cid` is larger; never lowers it.
// The update is done under fs->tsdb->mutex.
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  if (cid > fs->neid) {
    fs->neid = cid;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
}
144,805✔
871

872
// Begin a file-system edit of kind `etype`.
//
// Waits on fs->canEdit, records the edit type, applies `opArray` through
// edit_fs(), and saves fs->fSetArrTmp to the temporary "current" file that
// matches the edit type (commit -> TSDB_FCURRENT_C, otherwise TSDB_FCURRENT_M).
//
// Returns 0 on success or the error code of the failing step. The semaphore
// is not posted here; tsdbFSEditCommit()/tsdbFSEditAbort() post it.
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino = 0;
  char    current_t[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, current_t, (etype == TSDB_FEDIT_COMMIT) ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);

  // A failed wait is logged, but the edit still proceeds.
  if (tsem_wait(&fs->canEdit) != 0) {
    tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
  }
  fs->etype = etype;

  // apply the operations to the temporary file set array
  code = edit_fs(fs, opArray, etype);
  TSDB_CHECK_CODE(code, lino, _exit);

  // persist the temporary state
  code = save_fs(fs->fSetArrTmp, current_t);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
              tstrerror(code), etype);
  }
  return code;
}
905

906
// Set the blockCommit flag of `fset`. When the flag is cleared and at least
// one waiter is registered in numWaitCommit, signal fset->canCommit.
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
  fset->blockCommit = block;

  if (!block && fset->numWaitCommit > 0) {
    (void)taosThreadCondSignal(&fset->canCommit);
  }
}
31,166,535✔
916

917
// Block the caller while the file set `fid` has blockCommit set.
//
// Takes tsdb->mutex, looks up the file set, and waits on fset->canCommit
// until blockCommit is cleared; the wait is wrapped in the block_commit_time
// timing metric. If blockCommit was set on entry, blocked_commit_count is
// bumped by one. Does nothing besides the lookup when the file set is absent.
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
  (void)taosThreadMutexLock(&tsdb->mutex);

  STFileSet *fset = NULL;
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);

  if (fset != NULL) {
    // Snapshot the flag before waiting: it is always false once the wait ends.
    bool wasBlocked = fset->blockCommit;

    METRICS_TIMING_BLOCK(tsdb->pVnode->writeMetrics.block_commit_time, METRIC_LEVEL_HIGH, {
      while (fset->blockCommit) {
        fset->numWaitCommit++;
        (void)taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
        fset->numWaitCommit--;
      }
    });

    if (wasBlocked) {
      METRICS_UPDATE(tsdb->pVnode->writeMetrics.blocked_commit_count, METRIC_LEVEL_HIGH, 1);
    }
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
940

941
// IMPORTANT: the caller must hold fs->tsdb->mutex
942
int32_t tsdbFSEditCommit(STFileSystem *fs) {
3,892,545✔
943
  int32_t code = 0;
3,892,545✔
944
  int32_t lino = 0;
3,892,545✔
945

946
  // commit
947
  code = commit_edit(fs);
3,892,545✔
948
  TSDB_CHECK_CODE(code, lino, _exit);
3,892,545✔
949

950
  // schedule merge
951
  int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
3,892,545✔
952
  if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
3,892,545✔
953
    STFileSet *fset;
954
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
21,057,964✔
955
      if (TARRAY2_SIZE(fset->lvlArr) == 0) {
17,337,876✔
956
        tsdbFSSetBlockCommit(fset, false);
676,644✔
957
        continue;
676,644✔
958
      }
959

960
      SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
16,661,232✔
961
      if (lvl->level != 0) {
16,661,232✔
962
        tsdbFSSetBlockCommit(fset, false);
1,072,162✔
963
        continue;
1,072,162✔
964
      }
965

966
      // bool    skipMerge = false;
967
      int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
15,589,070✔
968
      if (numFile >= sttTrigger && (!vnodeATaskValid(&fset->mergeTask))) {
15,589,070✔
969
        SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
1,004,001✔
970
        if (arg == NULL) {
1,004,001✔
971
          code = terrno;
×
972
          TSDB_CHECK_CODE(code, lino, _exit);
×
973
        }
974

975
        arg->tsdb = fs->tsdb;
1,004,001✔
976
        arg->fid = fset->fid;
1,004,001✔
977

978
        code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
1,004,001✔
979
        TSDB_CHECK_CODE(code, lino, _exit);
1,004,001✔
980
      }
981

982
      if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
15,589,070✔
983
        tsdbFSSetBlockCommit(fset, true);
2,601✔
984
      } else {
985
        tsdbFSSetBlockCommit(fset, false);
15,586,469✔
986
      }
987
    }
988
  }
989

990
_exit:
3,892,545✔
991
  if (code) {
3,892,545✔
992
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
×
993
  } else {
994
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
3,892,545✔
995
  }
996
  if (tsem_post(&fs->canEdit) != 0) {
3,892,545✔
997
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
998
  }
999
  return code;
3,892,545✔
1000
}
1001

UNCOV
1002
// Abort the pending edit via abort_edit() and post fs->canEdit.
// Returns the result of abort_edit(); a failed post is only logged.
int32_t tsdbFSEditAbort(STFileSystem *fs) {
  int32_t code = abort_edit(fs);

  int32_t posted = tsem_post(&fs->canEdit);
  if (posted != 0) {
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
  }

  return code;
}
1009

1010
// Look up the file set with id `fid` in fs->fSetArr.
// Stores the matching file set in fset[0], or NULL when none matches.
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
  // The search compares STFileSet pointers, so build a key that carries only the fid.
  STFileSet   key = {.fid = fid};
  STFileSet  *pKey = &key;
  STFileSet **found = TARRAY2_SEARCH(fs->fSetArr, &pKey, tsdbTFileSetCmprFn, TD_EQ);

  if (found != NULL) {
    fset[0] = found[0];
  } else {
    fset[0] = NULL;
  }
}
35,071,014✔
1016

1017
// Build a snapshot of the file system made of copies of every file set.
//
// On success fsetArr[0] points to a newly allocated array that the caller
// releases with tsdbFSDestroyCopySnapshot(). On failure every copy made so
// far is cleared, the array is freed, fsetArr[0] is set to NULL and the error
// code is returned.
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset;
  STFileSet *fset1 = NULL;

  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
  if (fsetArr[0] == NULL) return terrno;

  TARRAY2_INIT(fsetArr[0]);

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
    if (code) break;

    code = TARRAY2_APPEND(fsetArr[0], fset1);
    if (code) {
      // The copy never made it into the array, so the array destructor below
      // will not see it: release it here to avoid a leak.
      tsdbTFileSetClear(&fset1);
      break;
    }
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  if (code) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}
1044

1045
// Release a snapshot created by tsdbFSCreateCopySnapshot(): clear every file
// set, free the array and reset fsetArr[0] to NULL. No-op when fsetArr[0] is
// already NULL.
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFree(fsetArr[0]);
  fsetArr[0] = NULL;
}
16,674✔
1052

1053
// Locked wrapper around tsdbFSCreateRefSnapshotWithoutLock(): takes
// fs->tsdb->mutex for the duration of the call and returns its result.
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t code;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return code;
}
1059

1060
// Build a snapshot holding one reference (tsdbTFileSetInitRef) per file set
// in fs->fSetArr. This function takes no lock itself.
//
// On success fsetArr[0] is the new array. On failure everything built so far
// is released, fsetArr[0] is set to NULL and the error code is returned.
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset;

  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
  if (fsetArr[0] == NULL) {
    return terrno;
  }

  TARRAY2_FOREACH(fs->fSetArr, fset) {
    STFileSet *ref = NULL;

    code = tsdbTFileSetInitRef(fs->tsdb, fset, &ref);
    if (code != 0) {
      break;
    }

    code = TARRAY2_APPEND(fsetArr[0], ref);
    if (code != 0) {
      // not owned by the array yet, so release it explicitly
      tsdbTFileSetClear(&ref);
      break;
    }
  }

  if (code == 0) {
    return code;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFree(fsetArr[0]);
  fsetArr[0] = NULL;
  return code;
}
1085

1086
// Release a reference snapshot: clear every file set, free the array and
// leave fsetArr[0] as NULL. No-op when fsetArr[0] is already NULL.
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFreeClear(fsetArr[0]);
  fsetArr[0] = NULL;
}
125,822,813✔
1093

1094
// Index the ranges in `pRanges` by file set id.
//
// Returns a new hash (key: int32_t fid, value: a copy of the STFileSetRange)
// that the caller releases with taosHashCleanup(), or NULL with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when the hash cannot be created.
//
// A range that cannot be inserted is reported in the error log and skipped;
// the remaining ranges are still indexed.
static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
  int32_t   capacity = TARRAY2_SIZE(pRanges) * 2;
  SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
    STFileSetRange *u = TARRAY2_GET(pRanges, i);
    int32_t         fid = u->fid;
    int32_t         code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
    if (code != 0) {
      // Previously this result was dropped silently; surface it in the log.
      tsdbError("range diff hash put failed, fid:%d since %s", fid, tstrerror(code));
      continue;
    }
    tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
  }
  return pHash;
}
1110

1111
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
1,283✔
1112
                                       TFileOpArray *fopArr) {
1113
  int32_t    code = 0;
1,283✔
1114
  STFileSet *fset;
1115
  STFileSet *fset1;
1,283✔
1116
  SHashObj  *pHash = NULL;
1,283✔
1117

1118
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
1,283✔
1119
  if (fsetArr == NULL) return terrno;
1,283✔
1120
  TARRAY2_INIT(fsetArr[0]);
1,283✔
1121

1122
  if (pRanges) {
1,283✔
1123
    pHash = tsdbFSetRangeArrayToHash(pRanges);
1,283✔
1124
    if (pHash == NULL) {
1,283✔
1125
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1126
      goto _out;
×
1127
    }
1128
  }
1129

1130
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
1,283✔
1131
  TARRAY2_FOREACH(fs->fSetArr, fset) {
2,566✔
1132
    int64_t ever = VERSION_MAX;
1,283✔
1133
    if (pHash) {
1,283✔
1134
      int32_t         fid = fset->fid;
1,283✔
1135
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
1,283✔
1136
      if (u) {
1,283✔
1137
        ever = u->sver - 1;
1,283✔
1138
      }
1139
    }
1140

1141
    code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
1,283✔
1142
    if (code) break;
1,283✔
1143

1144
    code = TARRAY2_APPEND(fsetArr[0], fset1);
1,283✔
1145
    if (code) break;
1,283✔
1146
  }
1147
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
1,283✔
1148

1149
_out:
1,283✔
1150
  if (code) {
1,283✔
1151
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1152
    taosMemoryFree(fsetArr[0]);
×
1153
    fsetArr[0] = NULL;
×
1154
  }
1155
  if (pHash) {
1,283✔
1156
    taosHashCleanup(pHash);
1,283✔
1157
    pHash = NULL;
1,283✔
1158
  }
1159
  return code;
1,283✔
1160
}
1161

1162
// Release a snapshot created by tsdbFSCreateCopyRangedSnapshot(); delegates
// to tsdbFSDestroyCopySnapshot().
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) {
  tsdbFSDestroyCopySnapshot(fsetArr);
}
1,283✔
1163

1164
// Build a ranged reference snapshot of the file system.
//
// Each file set gets a STFileSetRange covering [sver, ever]; when `pRanges`
// holds an entry for the fid, that entry's sver replaces the start version.
// File sets whose resulting start version exceeds `ever` are skipped.
//
// On success fsrArr[0] is the new array (release with
// tsdbFSDestroyRefRangedSnapshot()). On failure all partial results and the
// array container are released, fsrArr[0] is set to NULL and the error code
// is returned.
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
                                      TFileSetRangeArray **fsrArr) {
  int32_t         code = 0;
  STFileSet      *fset;
  STFileSetRange *fsr1 = NULL;
  SHashObj       *pHash = NULL;

  fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
  if (fsrArr[0] == NULL) {
    code = terrno;
    goto _out;
  }

  tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
  if (pRanges) {
    pHash = tsdbFSetRangeArrayToHash(pRanges);
    if (pHash == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _out;
    }
  }

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    int64_t sver1 = sver;
    int64_t ever1 = ever;

    if (pHash) {
      int32_t         fid = fset->fid;
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
      if (u) {
        sver1 = u->sver;
        tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
      }
    }

    if (sver1 > ever1) {
      tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
      continue;
    }

    tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);

    code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
    if (code) break;

    code = TARRAY2_APPEND(fsrArr[0], fsr1);
    if (code) break;

    // ownership moved into the array
    fsr1 = NULL;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

_out:
  if (code && fsrArr[0] != NULL) {
    // Release the range that was not appended, the appended ranges, and the
    // array container itself (previously the container was leaked).
    if (fsr1 != NULL) {
      tsdbTFileSetRangeClear(&fsr1);
    }
    TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
    taosMemoryFree(fsrArr[0]);
    fsrArr[0] = NULL;
  }
  if (pHash) {
    taosHashCleanup(pHash);
    pHash = NULL;
  }
  return code;
}
1230

1231
// Release a snapshot created by tsdbFSCreateRefRangedSnapshot(); delegates to
// tsdbTFileSetRangeArrayDestroy().
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) {
  tsdbTFileSetRangeArrayDestroy(fsrArr);
}
1,283✔
1232

1233
// Acquire the per-file-set "running" slot for `task` on file set `fid`.
//
// Looks up the file set and stores it in *fset (NULL when it does not exist,
// in which case nothing else happens). Slot 0 is used when sttTrigger == 1 or
// the task is EVA_TASK_COMMIT, slot 1 otherwise. While the slot is busy the
// caller waits on its condition variable with tsdb->mutex.
// NOTE(review): the wait releases tsdb->mutex, so the caller is presumably
// required to hold it on entry — confirm against callers.
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

  tsdbFSGetFSet(tsdb->pFS, fid, fset);
  if (fset[0] == NULL) {
    return;
  }

  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &fset[0]->conds[slot];

  // Wait until no other task owns the slot, then claim it.
  while (cond->running) {
    cond->numWait++;
    (void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
    cond->numWait--;
  }
  cond->running = true;

  tsdbTrace("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1263

1264
// Release the "running" slot taken by tsdbBeginTaskOnFileSet() for `task` on
// file set `fid` and signal the slot's condition variable if anyone waits.
// Uses the same slot selection as the begin function (slot 0 when
// sttTrigger == 1 or the task is EVA_TASK_COMMIT, slot 1 otherwise).
// Does nothing when the file set no longer exists.
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

  STFileSet *fset = NULL;
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
  if (fset == NULL) {
    return;
  }

  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &fset->conds[slot];

  cond->running = false;
  if (cond->numWait > 0) {
    (void)taosThreadCondSignal(&cond->cond);
  }

  tsdbTrace("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1289

1290
struct SFileSetReader {
1291
  STsdb     *pTsdb;
1292
  STFileSet *pFileSet;
1293
  int32_t    fid;
1294
  int64_t    startTime;
1295
  int64_t    endTime;
1296
  int64_t    lastCompactTime;
1297
  int64_t    totalSize;
1298
};
1299

1300
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
331✔
1301
  if (pVnode == NULL || ppReader == NULL) {
331✔
1302
    return TSDB_CODE_INVALID_PARA;
×
1303
  }
1304

1305
  STsdb *pTsdb = ((SVnode *)pVnode)->pTsdb;
331✔
1306

1307
  (*ppReader) = taosMemoryCalloc(1, sizeof(struct SFileSetReader));
331✔
1308
  if (*ppReader == NULL) {
331✔
1309
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, __LINE__,
×
1310
              tstrerror(terrno));
1311
    return terrno;
×
1312
  }
1313

1314
  (*ppReader)->pTsdb = pTsdb;
331✔
1315
  (*ppReader)->fid = INT32_MIN;
331✔
1316
  (*ppReader)->pFileSet = NULL;
331✔
1317

1318
  return TSDB_CODE_SUCCESS;
331✔
1319
}
1320

1321
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
662✔
1322
  STsdb  *pTsdb = pReader->pTsdb;
662✔
1323
  int32_t code = TSDB_CODE_SUCCESS;
662✔
1324

1325
  tsdbTFileSetClear(&pReader->pFileSet);
662✔
1326

1327
  STFileSet *fset = &(STFileSet){
662✔
1328
      .fid = pReader->fid,
662✔
1329
  };
1330

1331
  STFileSet **fsetPtr = TARRAY2_SEARCH(pReader->pTsdb->pFS->fSetArr, &fset, tsdbTFileSetCmprFn, TD_GT);
662✔
1332
  if (fsetPtr == NULL) {
662✔
1333
    pReader->fid = INT32_MAX;
331✔
1334
    return TSDB_CODE_NOT_FOUND;
331✔
1335
  }
1336

1337
  // ref file set
1338
  code = tsdbTFileSetInitRef(pReader->pTsdb, *fsetPtr, &pReader->pFileSet);
331✔
1339
  if (code) return code;
331✔
1340

1341
  // get file set details
1342
  pReader->fid = pReader->pFileSet->fid;
331✔
1343
  tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
331✔
1344
  pReader->lastCompactTime = pReader->pFileSet->lastCompact;
331✔
1345
  pReader->totalSize = 0;
331✔
1346
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
1,655✔
1347
    STFileObj *fobj = pReader->pFileSet->farr[i];
1,324✔
1348
    if (fobj) {
1,324✔
1349
      pReader->totalSize += fobj->f->size;
×
1350
    }
1351
  }
1352
  SSttLvl *lvl;
1353
  TARRAY2_FOREACH(pReader->pFileSet->lvlArr, lvl) {
662✔
1354
    STFileObj *fobj;
1355
    TARRAY2_FOREACH(lvl->fobjArr, fobj) { pReader->totalSize += fobj->f->size; }
662✔
1356
  }
1357

1358
  return code;
331✔
1359
}
1360

1361
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader) {
662✔
1362
  int32_t code = TSDB_CODE_SUCCESS;
662✔
1363
  (void)taosThreadMutexLock(&pReader->pTsdb->mutex);
662✔
1364
  code = tsdbFileSetReaderNextNoLock(pReader);
662✔
1365
  (void)taosThreadMutexUnlock(&pReader->pTsdb->mutex);
662✔
1366
  return code;
662✔
1367
}
1368

1369
extern bool tsdbShouldCompact(STFileSet *fset, int32_t vgId, int32_t expLevel, ETsdbOpType type);
1370
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value) {
1,986✔
1371
  const char *fieldName;
1372

1373
  if (pReader->fid == INT32_MIN || pReader->fid == INT32_MAX) {
1,986✔
1374
    return TSDB_CODE_INVALID_PARA;
×
1375
  }
1376

1377
  fieldName = "fileset_id";
1,986✔
1378
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,986✔
1379
    *(int32_t *)value = pReader->fid;
331✔
1380
    return TSDB_CODE_SUCCESS;
331✔
1381
  }
1382

1383
  fieldName = "start_time";
1,655✔
1384
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,655✔
1385
    *(int64_t *)value = pReader->startTime;
331✔
1386
    return TSDB_CODE_SUCCESS;
331✔
1387
  }
1388

1389
  fieldName = "end_time";
1,324✔
1390
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,324✔
1391
    *(int64_t *)value = pReader->endTime;
331✔
1392
    return TSDB_CODE_SUCCESS;
331✔
1393
  }
1394

1395
  fieldName = "total_size";
993✔
1396
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
993✔
1397
    *(int64_t *)value = pReader->totalSize;
331✔
1398
    return TSDB_CODE_SUCCESS;
331✔
1399
  }
1400

1401
  fieldName = "last_compact_time";
662✔
1402
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
662✔
1403
    *(int64_t *)value = pReader->lastCompactTime;
331✔
1404
    return TSDB_CODE_SUCCESS;
331✔
1405
  }
1406

1407
  fieldName = "should_compact";
331✔
1408
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
331✔
1409
    *(bool *)value = false;
331✔
1410
#ifdef TD_ENTERPRISE
1411
    *(bool *)value = tsdbShouldCompact(pReader->pFileSet, pReader->pTsdb->pVnode->config.vgId, 0, TSDB_OPTR_NORMAL);
331✔
1412
#endif
1413
    return TSDB_CODE_SUCCESS;
331✔
1414
  }
1415

1416
  fieldName = "details";
×
1417
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1418
    // TODO
1419
    return TSDB_CODE_SUCCESS;
×
1420
  }
1421

1422
  return TSDB_CODE_INVALID_PARA;
×
1423
}
1424

1425
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {
331✔
1426
  if (ppReader == NULL || *ppReader == NULL) {
331✔
1427
    return;
×
1428
  }
1429

1430
  tsdbTFileSetClear(&(*ppReader)->pFileSet);
331✔
1431
  taosMemoryFree(*ppReader);
331✔
1432

1433
  *ppReader = NULL;
331✔
1434
  return;
331✔
1435
}
1436

1437
// Add the size of `fObj` to the slot of `szArr` selected by the file's
// did.level. A NULL file object or a level outside [0, TFS_MAX_TIERS) is
// ignored.
static FORCE_INLINE void getLevelSize(const STFileObj *fObj, int64_t szArr[TFS_MAX_TIERS]) {
  if (fObj == NULL) {
    return;
  }

  // level == 0, primary storage
  // level == 1, second storage,
  // level == 2, third storage
  int32_t tier = fObj->f->did.level;
  if (tier < 0 || tier >= TFS_MAX_TIERS) {
    return;
  }

  szArr[tier] += fObj->f->size;
}
1449

1450
// Compute per-tier storage usage of `tsdb` and store it in `pInfo`.
//
// l1Size/l2Size/l3Size receive the summed file sizes for did.level 0/1/2.
// ssSize receives, for every data file with lcn > 1, (lcn - 1) chunks of
// tsdbPageSize * ssChunkSize bytes. This function takes no lock itself and
// always returns 0.
static FORCE_INLINE int32_t tsdbGetFsSizeImpl(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
  int32_t code = 0;
  int64_t tierBytes[TFS_MAX_TIERS] = {0};
  int64_t ssBytes = 0;

  const STFileSet *fset;
  const SSttLvl   *lvl = NULL;
  const STFileObj *fobj = NULL;

  SVnodeCfg *pCfg = &tsdb->pVnode->config;
  int64_t    chunkBytes = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    // fixed-type files of the set
    for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
      getLevelSize(fset->farr[ftype], tierBytes);
    }

    // stt files of every level
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        getLevelSize(fobj, tierBytes);
      }
    }

    // chunks of the data file beyond the first one count as ss size
    fobj = fset->farr[TSDB_FTYPE_DATA];
    if (fobj != NULL) {
      int32_t lcn = fobj->f->lcn;
      if (lcn > 1) {
        ssBytes += ((lcn - 1) * chunkBytes);
      }
    }
  }

  pInfo->l1Size = tierBytes[0];
  pInfo->l2Size = tierBytes[1];
  pInfo->l3Size = tierBytes[2];
  pInfo->ssSize = ssBytes;
  return code;
}
1486
// Locked wrapper around tsdbGetFsSizeImpl(): fills `pInfo` with the storage
// usage of `tsdb` while holding tsdb->mutex and returns the helper's result.
int32_t tsdbGetFsSize(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t code = tsdbGetFsSizeImpl(tsdb, pInfo);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc