• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4864

26 Nov 2025 05:46AM UTC coverage: 64.548% (+0.009%) from 64.539%
#4864

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

769 of 945 new or added lines in 33 files covered. (81.38%)

3006 existing lines in 116 files now uncovered.

158227 of 245129 relevant lines covered (64.55%)

111826500.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.96
/source/dnode/vnode/src/tsdb/tsdbFS2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "tsdbFS2.h"
18
#include "tsdbUpgrade.h"
19
#include "vnd.h"
20

21
#define BLOCK_COMMIT_FACTOR 3
22

23
typedef struct STFileHashEntry {
24
  struct STFileHashEntry *next;
25
  char                    fname[TSDB_FILENAME_LEN];
26
} STFileHashEntry;
27

28
typedef struct {
29
  int32_t           numFile;
30
  int32_t           numBucket;
31
  STFileHashEntry **buckets;
32
} STFileHash;
33

34
static const char *gCurrentFname[] = {
35
    [TSDB_FCURRENT] = "current.json",
36
    [TSDB_FCURRENT_C] = "current.c.json",
37
    [TSDB_FCURRENT_M] = "current.m.json",
38
};
39

40
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
4,168,902✔
41
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
4,168,902✔
42
  if (fs[0] == NULL) {
4,174,777✔
43
    return terrno;
×
44
  }
45

46
  fs[0]->tsdb = pTsdb;
4,174,331✔
47
  int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
4,175,201✔
48
  if (code) {
4,173,107✔
49
    taosMemoryFree(fs[0]);
×
50
    return code;
×
51
  }
52

53
  fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
4,173,107✔
54
  fs[0]->neid = 0;
4,172,727✔
55
  TARRAY2_INIT(fs[0]->fSetArr);
4,173,822✔
56
  TARRAY2_INIT(fs[0]->fSetArrTmp);
4,173,060✔
57

58
  return 0;
4,171,695✔
59
}
60

61
static void destroy_fs(STFileSystem **fs) {
4,175,359✔
62
  if (fs[0] == NULL) return;
4,175,359✔
63

64
  TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
4,175,359✔
65
  TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
4,175,811✔
66
  if (tsem_destroy(&fs[0]->canEdit) != 0) {
4,176,099✔
67
    tsdbError("failed to destroy semaphore");
×
68
  }
69
  taosMemoryFree(fs[0]);
4,174,615✔
70
  fs[0] = NULL;
4,175,359✔
71
}
72

73
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
25,560,550✔
74
  int32_t offset = 0;
25,560,550✔
75

76
  vnodeGetPrimaryPath(pTsdb->pVnode, false, fname, TSDB_FILENAME_LEN);
25,560,550✔
77
  offset = strlen(fname);
25,558,623✔
78
  snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s%s", TD_DIRSEP, pTsdb->name, TD_DIRSEP,
25,559,799✔
79
           gCurrentFname[ftype]);
25,530,945✔
80
}
25,558,780✔
81

82
static int32_t save_json(const cJSON *json, const char *fname) {
6,959,416✔
83
  int32_t   code = 0;
6,959,416✔
84
  int32_t   lino;
85
  char     *data = NULL;
6,959,416✔
86
  TdFilePtr fp = NULL;
6,959,416✔
87

88
  data = cJSON_PrintUnformatted(json);
6,959,416✔
89
  if (data == NULL) {
6,960,501✔
90
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
91
  }
92

93
  fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
6,960,501✔
94
  if (fp == NULL) {
6,961,218✔
95
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
96
  }
97

98
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
6,961,218✔
99
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
100
  }
101

102
  if (taosFsyncFile(fp) < 0) {
6,960,622✔
103
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
104
  }
105

106
_exit:
6,960,529✔
107
  if (code) {
6,960,590✔
108
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
109
  }
110
  taosMemoryFree(data);
6,960,590✔
111
  taosCloseFileWithLog(&fp);
6,959,843✔
112
  return code;
6,959,315✔
113
}
114

115
static int32_t load_json(const char *fname, cJSON **json) {
1,170,588✔
116
  int32_t code = 0;
1,170,588✔
117
  int32_t lino;
118
  char   *data = NULL;
1,170,588✔
119

120
  TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
1,170,588✔
121
  if (fp == NULL) {
1,170,588✔
122
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
123
  }
124

125
  int64_t size;
1,169,637✔
126
  code = taosFStatFile(fp, &size, NULL);
1,170,588✔
127
  if (code != 0) {
1,170,558✔
128
    TSDB_CHECK_CODE(code, lino, _exit);
×
129
  }
130

131
  data = taosMemoryMalloc(size + 1);
1,170,558✔
132
  if (data == NULL) {
1,170,588✔
133
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
134
  }
135

136
  if (taosReadFile(fp, data, size) < 0) {
1,170,588✔
137
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
138
  }
139
  data[size] = '\0';
1,169,864✔
140

141
  json[0] = cJSON_Parse(data);
1,170,588✔
142
  if (json[0] == NULL) {
1,169,140✔
143
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
144
  }
145

146
_exit:
1,169,864✔
147
  if (code) {
1,169,864✔
148
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
149
    json[0] = NULL;
×
150
  }
151
  taosCloseFileWithLog(&fp);
1,169,864✔
152
  taosMemoryFree(data);
1,170,588✔
153
  return code;
1,170,588✔
154
}
155

156
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
6,958,972✔
157
  int32_t code = 0;
6,958,972✔
158
  int32_t lino = 0;
6,958,972✔
159

160
  cJSON *json = cJSON_CreateObject();
6,958,972✔
161
  if (json == NULL) {
6,959,087✔
162
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
163
  }
164

165
  // fmtv
166
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
6,959,087✔
167
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
168
  }
169

170
  // fset
171
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
6,958,362✔
172
  if (!ajson) {
6,957,460✔
173
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
174
  }
175
  const STFileSet *fset;
176
  TARRAY2_FOREACH(arr, fset) {
25,561,216✔
177
    cJSON *item = cJSON_CreateObject();
18,602,312✔
178
    if (!item) {
18,604,332✔
179
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
180
    }
181
    (void)cJSON_AddItemToArray(ajson, item);
18,604,332✔
182

183
    code = tsdbTFileSetToJson(fset, item);
18,603,553✔
184
    TSDB_CHECK_CODE(code, lino, _exit);
18,603,756✔
185
  }
186

187
  code = save_json(json, fname);
6,959,425✔
188
  TSDB_CHECK_CODE(code, lino, _exit);
6,958,536✔
189

190
_exit:
6,958,536✔
191
  if (code) {
6,958,536✔
192
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
193
  }
194
  cJSON_Delete(json);
6,958,536✔
195
  return code;
6,959,725✔
196
}
197

198
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
1,169,987✔
199
  int32_t code = 0;
1,169,987✔
200
  int32_t lino = 0;
1,169,987✔
201

202
  TARRAY2_CLEAR(arr, tsdbTFileSetClear);
1,169,987✔
203

204
  // load json
205
  cJSON *json = NULL;
1,170,588✔
206
  code = load_json(fname, &json);
1,170,588✔
207
  TSDB_CHECK_CODE(code, lino, _exit);
1,170,588✔
208

209
  // parse json
210
  const cJSON *item1;
211

212
  /* fmtv */
213
  item1 = cJSON_GetObjectItem(json, "fmtv");
1,170,588✔
214
  if (cJSON_IsNumber(item1)) {
1,169,864✔
215
    if (item1->valuedouble != 1) {
1,169,864✔
216
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
217
    }
218
  } else {
219
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
220
  }
221

222
  /* fset */
223
  item1 = cJSON_GetObjectItem(json, "fset");
1,169,864✔
224
  if (cJSON_IsArray(item1)) {
1,170,588✔
225
    const cJSON *item2;
226
    cJSON_ArrayForEach(item2, item1) {
1,909,895✔
227
      STFileSet *fset;
739,330✔
228
      code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
740,031✔
229
      TSDB_CHECK_CODE(code, lino, _exit);
740,007✔
230

231
      code = TARRAY2_APPEND(arr, fset);
740,007✔
232
      TSDB_CHECK_CODE(code, lino, _exit);
740,031✔
233
    }
234
    TARRAY2_SORT(arr, tsdbTFileSetCmprFn);
1,170,588✔
235
  } else {
236
    code = TSDB_CODE_FILE_CORRUPTED;
×
237
    TSDB_CHECK_CODE(code, lino, _exit);
×
238
  }
239

240
_exit:
1,168,913✔
241
  if (code) {
1,169,864✔
242
    tsdbError("%s failed at %s:%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
×
243
  }
244
  if (json) {
1,169,864✔
245
    cJSON_Delete(json);
1,169,864✔
246
  }
247
  return code;
1,169,864✔
248
}
249

250
static int32_t apply_commit(STFileSystem *fs) {
3,954,779✔
251
  int32_t        code = 0;
3,954,779✔
252
  int32_t        lino;
253
  TFileSetArray *fsetArray1 = fs->fSetArr;
3,954,779✔
254
  TFileSetArray *fsetArray2 = fs->fSetArrTmp;
3,955,503✔
255
  int32_t        i1 = 0, i2 = 0;
3,955,503✔
256

257
  while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
22,637,426✔
258
    STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_GET(fsetArray1, i1) : NULL;
18,682,789✔
259
    STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_GET(fsetArray2, i2) : NULL;
18,681,598✔
260

261
    if (fset1 && fset2) {
18,681,256✔
262
      if (fset1->fid < fset2->fid) {
5,033,412✔
263
        // delete fset1
264
        tsdbTFileSetRemove(fset1);
72,372✔
265
        i1++;
72,372✔
266
      } else if (fset1->fid > fset2->fid) {
4,961,040✔
267
        // create new file set with fid of fset2->fid
268
        code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
10,977✔
269
        TSDB_CHECK_CODE(code, lino, _exit);
10,977✔
270
        code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
10,977✔
271
        TSDB_CHECK_CODE(code, lino, _exit);
10,977✔
272
        i1++;
10,977✔
273
        i2++;
10,977✔
274
      } else {
275
        // edit
276
        code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
4,950,063✔
277
        TSDB_CHECK_CODE(code, lino, _exit);
4,950,063✔
278
        i1++;
4,950,063✔
279
        i2++;
4,950,063✔
280
      }
281
    } else if (fset1) {
13,647,844✔
282
      // delete fset1
283
      tsdbTFileSetRemove(fset1);
5,927✔
284
      i1++;
5,927✔
285
    } else {
286
      // create new file set with fid of fset2->fid
287
      code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
13,641,917✔
288
      TSDB_CHECK_CODE(code, lino, _exit);
13,642,259✔
289
      code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
13,642,641✔
290
      TSDB_CHECK_CODE(code, lino, _exit);
13,642,641✔
291
      i1++;
13,642,641✔
292
      i2++;
13,642,641✔
293
    }
294
  }
295

296
_exit:
3,954,537✔
297
  if (code) {
3,954,537✔
298
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
299
  }
300
  return code;
3,954,537✔
301
}
302

303
static int32_t commit_edit(STFileSystem *fs) {
3,955,479✔
304
  char current[TSDB_FILENAME_LEN];
3,952,431✔
305
  char current_t[TSDB_FILENAME_LEN];
3,952,455✔
306

307
  current_fname(fs->tsdb, current, TSDB_FCURRENT);
3,955,503✔
308
  if (fs->etype == TSDB_FEDIT_COMMIT) {
3,954,724✔
309
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
2,780,778✔
310
  } else {
311
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
1,174,725✔
312
  }
313

314
  int32_t code;
315
  int32_t lino;
316
  TSDB_CHECK_CODE(taosRenameFile(current_t, current), lino, _exit);
3,954,724✔
317

318
  code = apply_commit(fs);
3,954,779✔
319
  TSDB_CHECK_CODE(code, lino, _exit);
3,955,104✔
320

321
_exit:
3,955,104✔
322
  if (code) {
3,954,480✔
323
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
×
324
              tstrerror(code));
325
  } else {
326
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
3,954,480✔
327
  }
328
  return code;
3,955,446✔
329
}
330

331
// static int32_t
332
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);
333
static int32_t apply_abort(STFileSystem *fs) { return tsdbFSDoSanAndFix(fs); }
1,033✔
334

335
static int32_t abort_edit(STFileSystem *fs) {
1,033✔
336
  char fname[TSDB_FILENAME_LEN];
1,033✔
337

338
  if (fs->etype == TSDB_FEDIT_COMMIT) {
1,033✔
339
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
1,033✔
340
  } else {
341
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
×
342
  }
343

344
  int32_t code;
345
  int32_t lino;
346
  if ((code = taosRemoveFile(fname))) {
1,033✔
347
    code = TAOS_SYSTEM_ERROR(code);
×
348
    TSDB_CHECK_CODE(code, lino, _exit);
×
349
  }
350

351
  code = apply_abort(fs);
1,033✔
352
  TSDB_CHECK_CODE(code, lino, _exit);
1,033✔
353

354
_exit:
1,033✔
355
  if (code) {
1,033✔
356
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
×
357
              tstrerror(code));
358
  } else {
359
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
1,033✔
360
  }
361
  return code;
1,033✔
362
}
363

364
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
1,402,001✔
365
  int32_t code = 0;
1,402,001✔
366
  int32_t lino = 0;
1,402,001✔
367

368
  // check file existence
369
  if (!taosCheckExistFile(fobj->fname)) {
1,402,001✔
370
    bool found = false;
×
371

372
    if (tsSsEnabled && fobj->f->lcn > 1) {
×
373
      char fname1[TSDB_FILENAME_LEN];
×
374
      tsdbTFileLastChunkName(fs->tsdb, fobj->f, fname1);
×
375
      if (!taosCheckExistFile(fname1)) {
×
376
        code = TSDB_CODE_FILE_CORRUPTED;
×
377
        tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fname1);
×
378
        return code;
×
379
      }
380

381
      found = true;
×
382
    }
383

384
    if (!found) {
×
385
      code = TSDB_CODE_FILE_CORRUPTED;
×
386
      tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname);
×
387
      return code;
×
388
    }
389
  }
390

391
  return 0;
1,402,001✔
392
}
393

394
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
395

396
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
2,568,366✔
397
  STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
2,568,366✔
398
  if (entry == NULL) return terrno;
2,568,366✔
399

400
  tstrncpy(entry->fname, fname, TSDB_FILENAME_LEN);
2,568,366✔
401

402
  uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
2,568,366✔
403

404
  entry->next = hash->buckets[idx];
2,568,366✔
405
  hash->buckets[idx] = entry;
2,568,366✔
406
  hash->numFile++;
2,568,366✔
407

408
  return 0;
2,568,366✔
409
}
410

411
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
1,166,365✔
412
  int32_t code = 0;
1,166,365✔
413
  int32_t lino;
414
  char    fname[TSDB_FILENAME_LEN];
1,165,414✔
415

416
  // init hash table
417
  hash->numFile = 0;
1,166,365✔
418
  hash->numBucket = 4096;
1,166,365✔
419
  hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
1,166,365✔
420
  if (hash->buckets == NULL) {
1,166,365✔
421
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
422
  }
423

424
  // vnode.json
425
  current_fname(fs->tsdb, fname, TSDB_FCURRENT);
1,166,365✔
426
  code = tsdbFSAddEntryToFileObjHash(hash, fname);
1,166,365✔
427
  TSDB_CHECK_CODE(code, lino, _exit);
1,166,365✔
428

429
  // other
430
  STFileSet *fset = NULL;
1,166,365✔
431
  TARRAY2_FOREACH(fs->fSetArr, fset) {
1,904,077✔
432
    // data file
433
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
3,688,560✔
434
      if (fset->farr[i] != NULL) {
2,950,848✔
435
        code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
650,410✔
436
        TSDB_CHECK_CODE(code, lino, _exit);
650,410✔
437

438
        if (TSDB_FTYPE_DATA == i && fset->farr[i]->f->lcn > 0) {
650,410✔
439
          STFileObj *fobj = fset->farr[i];
×
440
          int32_t    lcn = fobj->f->lcn;
×
441
          char       lcn_name[TSDB_FILENAME_LEN];
×
442

443
          snprintf(lcn_name, TSDB_FQDN_LEN, "%s", fobj->fname);
×
444
          char *dot = strrchr(lcn_name, '.');
×
445
          if (dot) {
×
446
            snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lcn_name), "%d.data", lcn);
×
447

448
            code = tsdbFSAddEntryToFileObjHash(hash, lcn_name);
×
449
            TSDB_CHECK_CODE(code, lino, _exit);
×
450
          }
451
        }
452
      }
453
    }
454

455
    // stt file
456
    SSttLvl *lvl = NULL;
737,712✔
457
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
1,422,026✔
458
      STFileObj *fobj;
459
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
1,435,905✔
460
        code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
751,591✔
461
        TSDB_CHECK_CODE(code, lino, _exit);
751,591✔
462
      }
463
    }
464
  }
465

466
_exit:
1,166,365✔
467
  if (code) {
1,166,365✔
468
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
469
    tsdbFSDestroyFileObjHash(hash);
×
470
  }
471
  return code;
1,166,365✔
472
}
473

474
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
2,569,293✔
475
  uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
2,569,293✔
476

477
  STFileHashEntry *entry = hash->buckets[idx];
2,569,293✔
478
  while (entry) {
2,571,244✔
479
    if (strcmp(entry->fname, fname) == 0) {
2,570,317✔
480
      return entry;
2,568,366✔
481
    }
482
    entry = entry->next;
2,313✔
483
  }
484

485
  return NULL;
927✔
486
}
487

488
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
1,166,365✔
489
  for (int32_t i = 0; i < hash->numBucket; i++) {
2,147,483,647✔
490
    STFileHashEntry *entry = hash->buckets[i];
2,147,483,647✔
491
    while (entry) {
2,147,483,647✔
492
      STFileHashEntry *next = entry->next;
2,568,366✔
493
      taosMemoryFree(entry);
2,568,366✔
494
      entry = next;
2,568,366✔
495
    }
496
  }
497
  taosMemoryFree(hash->buckets);
1,166,365✔
498
  memset(hash, 0, sizeof(*hash));
1,166,365✔
499
}
1,166,365✔
500

501
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
1,170,897✔
502
  int32_t code = 0;
1,170,897✔
503
  int32_t lino = 0;
1,170,897✔
504
  int32_t corrupt = false;
1,170,897✔
505

506
  if (fs->tsdb->pVnode->mounted) goto _exit;
1,170,897✔
507

508
  {  // scan each file
509
    STFileSet *fset = NULL;
1,165,641✔
510
    TARRAY2_FOREACH(fs->fSetArr, fset) {
1,903,353✔
511
      // data file
512
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
3,688,440✔
513
        if (fset->farr[ftype] == NULL) continue;
2,950,776✔
514
        STFileObj *fobj = fset->farr[ftype];
650,410✔
515
        code = tsdbFSDoScanAndFixFile(fs, fobj);
650,410✔
516
        if (code) {
650,410✔
517
          fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
518
          corrupt = true;
×
519
        }
520
      }
521

522
      // stt file
523
      SSttLvl *lvl;
524
      TARRAY2_FOREACH(fset->lvlArr, lvl) {
1,421,978✔
525
        STFileObj *fobj;
526
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
1,435,905✔
527
          code = tsdbFSDoScanAndFixFile(fs, fobj);
751,591✔
528
          if (code) {
751,591✔
529
            fset->maxVerValid =
×
530
                (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
531
            corrupt = true;
×
532
          }
533
        }
534
      }
535
    }
536
  }
537

538
  {  // clear unreferenced files
539
    STfsDir *dir = NULL;
1,165,641✔
540
    TAOS_CHECK_GOTO(tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path, &dir), &lino, _exit);
1,165,641✔
541

542
    STFileHash fobjHash = {0};
1,166,365✔
543
    code = tsdbFSCreateFileObjHash(fs, &fobjHash);
1,166,365✔
544
    if (code) goto _close_dir;
1,166,365✔
545

546
    for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
4,902,023✔
547
      if (taosIsDir(file->aname)) continue;
3,735,658✔
548

549
      if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
2,569,293✔
550
        tsdbRemoveFile(file->aname);
927✔
551
      }
552
    }
553

554
    tsdbFSDestroyFileObjHash(&fobjHash);
1,166,365✔
555

556
  _close_dir:
1,166,365✔
557
    tfsClosedir(dir);
1,166,365✔
558
  }
559

560
_exit:
1,171,621✔
561
  if (corrupt) {
1,171,621✔
562
    tsdbError("vgId:%d, TSDB file system is corrupted", TD_VID(fs->tsdb->pVnode));
×
563
    fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
×
564
    code = 0;
×
565
  }
566

567
  if (code) {
1,171,621✔
568
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
569
  }
570
  return code;
1,171,621✔
571
}
572

573
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
1,169,864✔
574
  fs->neid = 0;
1,169,864✔
575

576
  // get max commit id
577
  const STFileSet *fset;
578
  TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
1,909,847✔
579

580
  // scan and fix
581
  int32_t code = 0;
1,169,864✔
582
  int32_t lino = 0;
1,169,864✔
583

584
  code = tsdbFSDoSanAndFix(fs);
1,169,864✔
585
  TSDB_CHECK_CODE(code, lino, _exit);
1,170,588✔
586

587
_exit:
1,170,588✔
588
  if (code) {
1,170,588✔
589
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
590
  }
591
  return code;
1,170,588✔
592
}
593

594
static int32_t tsdbFSDupState(STFileSystem *fs) {
5,123,553✔
595
  int32_t code;
596

597
  const TFileSetArray *src = fs->fSetArr;
5,123,553✔
598
  TFileSetArray       *dst = fs->fSetArrTmp;
5,122,820✔
599

600
  TARRAY2_CLEAR(dst, tsdbTFileSetClear);
10,089,868✔
601

602
  const STFileSet *fset1;
603
  TARRAY2_FOREACH(src, fset1) {
10,888,660✔
604
    STFileSet *fset2;
5,766,392✔
605
    code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
5,767,969✔
606
    if (code) return code;
5,768,702✔
607
    code = TARRAY2_APPEND(dst, fset2);
5,768,702✔
608
    if (code) return code;
5,768,076✔
609
  }
610

611
  return 0;
5,121,843✔
612
}
613

614
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
4,167,480✔
615
  int32_t code = 0;
4,167,480✔
616
  int32_t lino = 0;
4,167,480✔
617
  STsdb  *pTsdb = fs->tsdb;
4,167,480✔
618

619
  char fCurrent[TSDB_FILENAME_LEN];
4,170,133✔
620
  char cCurrent[TSDB_FILENAME_LEN];
4,170,133✔
621
  char mCurrent[TSDB_FILENAME_LEN];
4,170,238✔
622

623
  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
4,176,099✔
624
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
4,175,868✔
625
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
4,176,099✔
626

627
  if (taosCheckExistFile(fCurrent)) {  // current.json exists
4,175,817✔
628
    code = load_fs(pTsdb, fCurrent, fs->fSetArr);
1,170,588✔
629
    TSDB_CHECK_CODE(code, lino, _exit);
1,170,588✔
630

631
    if (taosCheckExistFile(cCurrent)) {
1,170,588✔
632
      // current.c.json exists
633

634
      fs->etype = TSDB_FEDIT_COMMIT;
724✔
635
      if (rollback) {
724✔
636
        code = abort_edit(fs);
724✔
637
        TSDB_CHECK_CODE(code, lino, _exit);
724✔
638
      } else {
639
        code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
×
640
        TSDB_CHECK_CODE(code, lino, _exit);
×
641

642
        code = commit_edit(fs);
×
643
        TSDB_CHECK_CODE(code, lino, _exit);
×
644
      }
645
    } else if (taosCheckExistFile(mCurrent)) {
1,169,864✔
646
      // current.m.json exists
647
      fs->etype = TSDB_FEDIT_MERGE;
×
648
      code = abort_edit(fs);
×
649
      TSDB_CHECK_CODE(code, lino, _exit);
×
650
    }
651

652
    code = tsdbFSDupState(fs);
1,170,588✔
653
    TSDB_CHECK_CODE(code, lino, _exit);
1,169,864✔
654

655
    code = tsdbFSScanAndFix(fs);
1,169,864✔
656
    TSDB_CHECK_CODE(code, lino, _exit);
1,170,588✔
657
  } else {
658
    code = save_fs(fs->fSetArr, fCurrent);
3,005,511✔
659
    TSDB_CHECK_CODE(code, lino, _exit);
3,003,572✔
660
  }
661

662
_exit:
4,173,209✔
663
  if (code) {
4,173,896✔
664
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
665
  } else {
666
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
4,173,896✔
667
  }
668
  return code;
4,175,354✔
669
}
670

671
static void close_file_system(STFileSystem *fs) {
4,175,359✔
672
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
18,568,478✔
673
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
18,544,853✔
674
}
4,175,359✔
675

676
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
×
677
  if (pSet1->fid < pSet2->fid) {
×
678
    return -1;
×
679
  } else if (pSet1->fid > pSet2->fid) {
×
680
    return 1;
×
681
  }
682
  return 0;
×
683
}
684

685
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
3,953,356✔
686
  int32_t code = 0;
3,953,356✔
687
  int32_t lino = 0;
3,953,356✔
688

689
  code = tsdbFSDupState(fs);
3,953,356✔
690
  if (code) return code;
3,952,603✔
691

692
  TFileSetArray  *fsetArray = fs->fSetArrTmp;
3,952,603✔
693
  STFileSet      *fset = NULL;
3,951,571✔
694
  const STFileOp *op;
695
  int32_t         fid = INT32_MIN;
3,953,083✔
696
  TSKEY           now = taosGetTimestampMs();
3,953,179✔
697
  TARRAY2_FOREACH_PTR(opArray, op) {
25,713,517✔
698
    if (!fset || fset->fid != op->fid) {
21,763,689✔
699
      STFileSet tfset = {.fid = op->fid};
16,908,540✔
700
      fset = &tfset;
16,909,615✔
701
      STFileSet **fsetPtr = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
16,908,451✔
702
      fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
16,908,451✔
703

704
      if (!fset) {
16,908,451✔
705
        code = tsdbTFileSetInit(op->fid, &fset);
13,651,307✔
706
        TSDB_CHECK_CODE(code, lino, _exit);
13,652,462✔
707

708
        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
13,653,960✔
709
        TSDB_CHECK_CODE(code, lino, _exit);
13,653,960✔
710
      }
711
    }
712

713
    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
21,764,333✔
714
    TSDB_CHECK_CODE(code, lino, _exit);
21,763,294✔
715

716
    if (fid != op->fid) {
21,763,294✔
717
      fid = op->fid;
16,905,857✔
718
      if (etype == TSDB_FEDIT_COMMIT) {
16,907,254✔
719
        fset->lastCommit = now;
15,732,529✔
720
      } else if (etype == TSDB_FEDIT_COMPACT) {
1,174,725✔
721
        fset->lastCompact = now;
118,583✔
722
      } else if (etype == TSDB_FEDIT_SSMIGRATE) {
1,056,142✔
723
        fset->lastMigrate = now;
×
724
      } else if (etype == TSDB_FEDIT_ROLLUP) {
1,056,142✔
725
        fset->lastRollupLevel = fs->rollupLevel;
35,834✔
726
        fset->lastRollup = now;
35,834✔
727
        fset->lastCompact = now;  // rollup implies compact
35,834✔
728
      }
729
    }
730
  }
731

732
  // remove empty empty stt level and empty file set
733
  int32_t i = 0;
3,950,696✔
734
  while (i < TARRAY2_SIZE(fsetArray)) {
22,625,549✔
735
    fset = TARRAY2_GET(fsetArray, i);
18,680,052✔
736

737
    SSttLvl *lvl;
738
    int32_t  j = 0;
18,679,998✔
739
    while (j < TARRAY2_SIZE(fset->lvlArr)) {
40,400,393✔
740
      lvl = TARRAY2_GET(fset->lvlArr, j);
21,721,805✔
741

742
      if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
21,719,258✔
743
        TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
1,415,498✔
744
      } else {
745
        j++;
20,304,897✔
746
      }
747
    }
748

749
    if (tsdbTFileSetIsEmpty(fset)) {
18,677,035✔
750
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
83,263✔
751
    } else {
752
      i++;
18,596,554✔
753
    }
754
  }
755

756
_exit:
3,949,173✔
757
  if (code) {
3,950,962✔
758
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
759
  }
760
  return code;
3,949,449✔
761
}
762

763
// return error code
764
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
4,167,387✔
765
  int32_t code;
766
  int32_t lino;
767

768
  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
4,167,387✔
769
  TSDB_CHECK_CODE(code, lino, _exit);
4,175,530✔
770

771
  code = create_fs(pTsdb, fs);
4,175,530✔
772
  TSDB_CHECK_CODE(code, lino, _exit);
4,172,218✔
773

774
  code = open_fs(fs[0], rollback);
4,172,218✔
775
  TSDB_CHECK_CODE(code, lino, _exit);
4,176,099✔
776

777
_exit:
4,176,099✔
778
  if (code) {
4,176,099✔
779
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
780
    destroy_fs(fs);
×
781
  } else {
782
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
4,176,099✔
783
  }
784
  return code;
4,176,099✔
785
}
786

787
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
788
extern void tsdbStopAllCompTask(STsdb *tsdb);
789
extern void tsdbStopAllRetentionTask(STsdb *tsdb);
790

791
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
4,195,868✔
792
  STFileSystem *fs = pTsdb->pFS;
4,195,868✔
793
  SArray       *asyncTasks = taosArrayInit(0, sizeof(SVATaskID));
4,195,868✔
794
  if (asyncTasks == NULL) {
4,197,363✔
795
    return terrno;
×
796
  }
797

798
  (void)taosThreadMutexLock(&pTsdb->mutex);
4,197,363✔
799

800
  // disable
801
  pTsdb->bgTaskDisabled = true;
4,197,387✔
802

803
  // collect channel
804
  STFileSet *fset;
805
  TARRAY2_FOREACH(fs->fSetArr, fset) {
18,592,907✔
806
    if (taosArrayPush(asyncTasks, &fset->mergeTask) == NULL       //
28,790,263✔
807
        || taosArrayPush(asyncTasks, &fset->compactTask) == NULL  //
28,789,624✔
808
        || taosArrayPush(asyncTasks, &fset->retentionTask) == NULL
28,789,624✔
809
        || taosArrayPush(asyncTasks, &fset->migrateTask) == NULL) {
28,789,624✔
UNCOV
810
      taosArrayDestroy(asyncTasks);
×
811
      (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
812
      return terrno;
×
813
    }
814
    tsdbFSSetBlockCommit(fset, false);
14,394,812✔
815
  }
816

817
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
4,197,061✔
818

819
  // destroy all channels
820
  for (int32_t k = 0; k < 2; k++) {
12,590,588✔
821
    for (int32_t i = 0; i < taosArrayGetSize(asyncTasks); i++) {
123,554,264✔
822
      SVATaskID *task = taosArrayGet(asyncTasks, i);
115,158,845✔
823
      if (k == 0) {
115,161,965✔
824
        (void)vnodeACancel(task);
57,580,640✔
825
      } else {
826
        vnodeAWait(task);
57,581,325✔
827
      }
828
    }
829
  }
830
  taosArrayDestroy(asyncTasks);
4,196,754✔
831

832
#ifdef TD_ENTERPRISE
833
  tsdbStopAllCompTask(pTsdb);
4,197,387✔
834
#endif
835
  tsdbStopAllRetentionTask(pTsdb);
4,196,663✔
836
  return 0;
4,197,387✔
837
}
838

839
void tsdbEnableBgTask(STsdb *pTsdb) {
21,288✔
840
  (void)taosThreadMutexLock(&pTsdb->mutex);
21,288✔
841
  pTsdb->bgTaskDisabled = false;
21,288✔
842
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
21,288✔
843
}
21,288✔
844

845
void tsdbCloseFS(STFileSystem **fs) {
4,175,359✔
846
  if (fs[0] == NULL) return;
4,175,359✔
847

848
  int32_t code = tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
4,175,359✔
849
  if (code) {
4,176,099✔
850
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID((*fs)->tsdb->pVnode), __func__, __LINE__,
×
851
              tstrerror(code));
852
  }
853
  close_file_system(fs[0]);
4,176,099✔
854
  destroy_fs(fs);
4,174,045✔
855
  return;
4,175,359✔
856
}
857

858
int64_t tsdbFSAllocEid(STFileSystem *fs) {
4,172,028✔
859
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
4,172,028✔
860
  int64_t cid = ++fs->neid;
4,172,687✔
861
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
4,172,687✔
862
  return cid;
4,172,687✔
863
}
864

865
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
140,862✔
866
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
140,862✔
867
  fs->neid = TMAX(fs->neid, cid);
140,862✔
868
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
140,862✔
869
}
140,862✔
870

871
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
3,954,300✔
872
  int32_t code = 0;
3,954,300✔
873
  int32_t lino;
874
  char    current_t[TSDB_FILENAME_LEN];
3,951,252✔
875

876
  if (etype == TSDB_FEDIT_COMMIT) {
3,955,033✔
877
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
2,780,308✔
878
  } else {
879
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
1,174,725✔
880
  }
881

882
  if (tsem_wait(&fs->canEdit) != 0) {
3,955,079✔
883
    tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
×
884
  }
885
  fs->etype = etype;
3,953,958✔
886

887
  // edit
888
  code = edit_fs(fs, opArray, etype);
3,955,470✔
889
  TSDB_CHECK_CODE(code, lino, _exit);
3,949,962✔
890

891
  // save fs
892
  code = save_fs(fs->fSetArrTmp, current_t);
3,949,962✔
893
  TSDB_CHECK_CODE(code, lino, _exit);
3,955,088✔
894

895
_exit:
3,955,088✔
896
  if (code) {
3,955,088✔
897
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
×
898
              tstrerror(code), etype);
899
  } else {
900
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
3,955,088✔
901
  }
902
  return code;
3,955,812✔
903
}
904

905
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
32,242,796✔
906
  if (block) {
32,242,796✔
907
    fset->blockCommit = true;
7,521✔
908
  } else {
909
    fset->blockCommit = false;
32,235,275✔
910
    if (fset->numWaitCommit > 0) {
32,235,275✔
911
      (void)taosThreadCondSignal(&fset->canCommit);
2,958✔
912
    }
913
  }
914
}
32,242,039✔
915

916
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
15,731,504✔
917
  (void)taosThreadMutexLock(&tsdb->mutex);
15,731,504✔
918
  STFileSet *fset;
15,726,772✔
919
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
15,730,002✔
920
  bool blockCommit = false;
15,724,033✔
921
  if (fset) {
15,724,033✔
922
    blockCommit = fset->blockCommit;
2,107,850✔
923
  }
924
  if (fset) {
15,724,657✔
925
    METRICS_TIMING_BLOCK(tsdb->pVnode->writeMetrics.block_commit_time, METRIC_LEVEL_HIGH, {
2,110,808✔
926
      while (fset->blockCommit) {
927
        fset->numWaitCommit++;
928
        (void)taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
929
        fset->numWaitCommit--;
930
      }
931
    });
932
  }
933
  if (blockCommit) {
15,721,082✔
934
    METRICS_UPDATE(tsdb->pVnode->writeMetrics.blocked_commit_count, METRIC_LEVEL_HIGH, 1);
2,958✔
935
  }
936
  (void)taosThreadMutexUnlock(&tsdb->mutex);
15,721,082✔
937
  return;
15,725,934✔
938
}
939

940
// IMPORTANT: the caller must hold fs->tsdb->mutex
941
int32_t tsdbFSEditCommit(STFileSystem *fs) {
3,955,503✔
942
  int32_t code = 0;
3,955,503✔
943
  int32_t lino = 0;
3,955,503✔
944

945
  // commit
946
  code = commit_edit(fs);
3,955,503✔
947
  TSDB_CHECK_CODE(code, lino, _exit);
3,955,503✔
948

949
  // schedule merge
950
  int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
3,955,503✔
951
  if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
3,955,503✔
952
    STFileSet *fset;
953
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
21,619,995✔
954
      if (TARRAY2_SIZE(fset->lvlArr) == 0) {
17,847,984✔
955
        tsdbFSSetBlockCommit(fset, false);
885,301✔
956
        continue;
885,301✔
957
      }
958

959
      SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
16,962,683✔
960
      if (lvl->level != 0) {
16,962,683✔
961
        tsdbFSSetBlockCommit(fset, false);
948,019✔
962
        continue;
948,019✔
963
      }
964

965
      // bool    skipMerge = false;
966
      int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
16,014,664✔
967
      if (numFile >= sttTrigger && (!vnodeATaskValid(&fset->mergeTask))) {
16,014,664✔
968
        SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
1,013,977✔
969
        if (arg == NULL) {
1,013,977✔
970
          code = terrno;
×
971
          TSDB_CHECK_CODE(code, lino, _exit);
×
972
        }
973

974
        arg->tsdb = fs->tsdb;
1,013,977✔
975
        arg->fid = fset->fid;
1,013,977✔
976

977
        code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
1,013,977✔
978
        TSDB_CHECK_CODE(code, lino, _exit);
1,013,977✔
979
      }
980

981
      if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
16,014,664✔
982
        tsdbFSSetBlockCommit(fset, true);
7,521✔
983
      } else {
984
        tsdbFSSetBlockCommit(fset, false);
16,007,143✔
985
      }
986
    }
987
  }
988

989
_exit:
3,955,503✔
990
  if (code) {
3,955,503✔
991
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
×
992
  } else {
993
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
3,955,503✔
994
  }
995
  if (tsem_post(&fs->canEdit) != 0) {
3,955,503✔
996
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
997
  }
998
  return code;
3,955,503✔
999
}
1000

1001
int32_t tsdbFSEditAbort(STFileSystem *fs) {
309✔
1002
  int32_t code = abort_edit(fs);
309✔
1003
  if (tsem_post(&fs->canEdit) != 0) {
309✔
1004
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
1005
  }
1006
  return code;
309✔
1007
}
1008

1009
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
36,267,042✔
1010
  STFileSet   tfset = {.fid = fid};
36,267,042✔
1011
  STFileSet  *pset = &tfset;
36,271,169✔
1012
  STFileSet **fsetPtr = TARRAY2_SEARCH(fs->fSetArr, &pset, tsdbTFileSetCmprFn, TD_EQ);
36,272,576✔
1013
  fset[0] = (fsetPtr == NULL) ? NULL : fsetPtr[0];
36,263,894✔
1014
}
36,266,525✔
1015

1016
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
15,855✔
1017
  int32_t    code = 0;
15,855✔
1018
  STFileSet *fset;
1019
  STFileSet *fset1;
15,855✔
1020

1021
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
15,855✔
1022
  if (fsetArr[0] == NULL) return terrno;
15,855✔
1023

1024
  TARRAY2_INIT(fsetArr[0]);
15,855✔
1025

1026
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
15,855✔
1027
  TARRAY2_FOREACH(fs->fSetArr, fset) {
15,855✔
1028
    code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
×
1029
    if (code) break;
×
1030

1031
    code = TARRAY2_APPEND(fsetArr[0], fset1);
×
1032
    if (code) break;
×
1033
  }
1034
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
15,855✔
1035

1036
  if (code) {
15,855✔
1037
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1038
    taosMemoryFree(fsetArr[0]);
×
1039
    fsetArr[0] = NULL;
×
1040
  }
1041
  return code;
15,855✔
1042
}
1043

1044
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
16,818✔
1045
  if (fsetArr[0]) {
16,818✔
1046
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
17,781✔
1047
    taosMemoryFree(fsetArr[0]);
16,818✔
1048
    fsetArr[0] = NULL;
16,818✔
1049
  }
1050
}
16,818✔
1051

1052
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
19,180✔
1053
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
19,180✔
1054
  int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
19,180✔
1055
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
19,180✔
1056
  return code;
19,180✔
1057
}
1058

1059
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr) {
127,807,930✔
1060
  int32_t    code = 0;
127,807,930✔
1061
  STFileSet *fset, *fset1;
127,774,199✔
1062

1063
  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
127,826,728✔
1064
  if (fsetArr[0] == NULL) return terrno;
127,822,596✔
1065

1066
  TARRAY2_FOREACH(fs->fSetArr, fset) {
329,884,582✔
1067
    code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1);
202,120,400✔
1068
    if (code) break;
202,110,113✔
1069

1070
    code = TARRAY2_APPEND(fsetArr[0], fset1);
202,110,113✔
1071
    if (code) {
202,107,449✔
1072
      tsdbTFileSetClear(&fset1);
×
1073
      break;
×
1074
    }
1075
  }
1076

1077
  if (code) {
127,814,925✔
1078
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1079
    taosMemoryFree(fsetArr[0]);
×
1080
    fsetArr[0] = NULL;
×
1081
  }
1082
  return code;
127,814,925✔
1083
}
1084

1085
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
127,849,014✔
1086
  if (fsetArr[0]) {
127,849,014✔
1087
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
329,985,434✔
1088
    taosMemoryFreeClear(fsetArr[0]);
127,853,423✔
1089
    fsetArr[0] = NULL;
127,852,327✔
1090
  }
1091
}
127,830,878✔
1092

1093
static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
1,926✔
1094
  int32_t   capacity = TARRAY2_SIZE(pRanges) * 2;
1,926✔
1095
  SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,926✔
1096
  if (pHash == NULL) {
1,926✔
1097
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1098
    return NULL;
×
1099
  }
1100

1101
  for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
3,852✔
1102
    STFileSetRange *u = TARRAY2_GET(pRanges, i);
1,926✔
1103
    int32_t         fid = u->fid;
1,926✔
1104
    int32_t         code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
1,926✔
1105
    tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
1,926✔
1106
  }
1107
  return pHash;
1,926✔
1108
}
1109

1110
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
963✔
1111
                                       TFileOpArray *fopArr) {
1112
  int32_t    code = 0;
963✔
1113
  STFileSet *fset;
1114
  STFileSet *fset1;
963✔
1115
  SHashObj  *pHash = NULL;
963✔
1116

1117
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
963✔
1118
  if (fsetArr == NULL) return terrno;
963✔
1119
  TARRAY2_INIT(fsetArr[0]);
963✔
1120

1121
  if (pRanges) {
963✔
1122
    pHash = tsdbFSetRangeArrayToHash(pRanges);
963✔
1123
    if (pHash == NULL) {
963✔
1124
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1125
      goto _out;
×
1126
    }
1127
  }
1128

1129
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
963✔
1130
  TARRAY2_FOREACH(fs->fSetArr, fset) {
1,926✔
1131
    int64_t ever = VERSION_MAX;
963✔
1132
    if (pHash) {
963✔
1133
      int32_t         fid = fset->fid;
963✔
1134
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
963✔
1135
      if (u) {
963✔
1136
        ever = u->sver - 1;
963✔
1137
      }
1138
    }
1139

1140
    code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
963✔
1141
    if (code) break;
963✔
1142

1143
    code = TARRAY2_APPEND(fsetArr[0], fset1);
963✔
1144
    if (code) break;
963✔
1145
  }
1146
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
963✔
1147

1148
_out:
963✔
1149
  if (code) {
963✔
1150
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1151
    taosMemoryFree(fsetArr[0]);
×
1152
    fsetArr[0] = NULL;
×
1153
  }
1154
  if (pHash) {
963✔
1155
    taosHashCleanup(pHash);
963✔
1156
    pHash = NULL;
963✔
1157
  }
1158
  return code;
963✔
1159
}
1160

1161
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); }
963✔
1162

1163
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
963✔
1164
                                      TFileSetRangeArray **fsrArr) {
1165
  int32_t         code = 0;
963✔
1166
  STFileSet      *fset;
1167
  STFileSetRange *fsr1 = NULL;
963✔
1168
  SHashObj       *pHash = NULL;
963✔
1169

1170
  fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
963✔
1171
  if (fsrArr[0] == NULL) {
963✔
1172
    code = terrno;
×
1173
    goto _out;
×
1174
  }
1175

1176
  tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
963✔
1177
  if (pRanges) {
963✔
1178
    pHash = tsdbFSetRangeArrayToHash(pRanges);
963✔
1179
    if (pHash == NULL) {
963✔
1180
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1181
      goto _out;
×
1182
    }
1183
  }
1184

1185
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
963✔
1186
  TARRAY2_FOREACH(fs->fSetArr, fset) {
1,926✔
1187
    int64_t sver1 = sver;
963✔
1188
    int64_t ever1 = ever;
963✔
1189

1190
    if (pHash) {
963✔
1191
      int32_t         fid = fset->fid;
963✔
1192
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
963✔
1193
      if (u) {
963✔
1194
        sver1 = u->sver;
963✔
1195
        tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
963✔
1196
      }
1197
    }
1198

1199
    if (sver1 > ever1) {
963✔
1200
      tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
×
1201
      continue;
×
1202
    }
1203

1204
    tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
963✔
1205

1206
    code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
963✔
1207
    if (code) break;
963✔
1208

1209
    code = TARRAY2_APPEND(fsrArr[0], fsr1);
963✔
1210
    if (code) break;
963✔
1211

1212
    fsr1 = NULL;
963✔
1213
  }
1214
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
963✔
1215

1216
  if (code) {
963✔
1217
    tsdbTFileSetRangeClear(&fsr1);
×
1218
    TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
×
1219
    fsrArr[0] = NULL;
×
1220
  }
1221

1222
_out:
963✔
1223
  if (pHash) {
963✔
1224
    taosHashCleanup(pHash);
963✔
1225
    pHash = NULL;
963✔
1226
  }
1227
  return code;
963✔
1228
}
1229

1230
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); }
963✔
1231

1232
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
17,085,173✔
1233
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
1234
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
17,085,173✔
1235

1236
  tsdbFSGetFSet(tsdb->pFS, fid, fset);
17,087,101✔
1237
  if (*fset == NULL) {
17,087,101✔
1238
    return;
13,629,799✔
1239
  }
1240

1241
  struct STFileSetCond *cond = NULL;
3,457,302✔
1242
  if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
3,457,302✔
1243
    cond = &(*fset)->conds[0];
2,199,722✔
1244
  } else {
1245
    cond = &(*fset)->conds[1];
1,257,580✔
1246
  }
1247

1248
  while (1) {
1249
    if (cond->running) {
3,465,431✔
1250
      cond->numWait++;
8,129✔
1251
      (void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
8,129✔
1252
      cond->numWait--;
8,129✔
1253
    } else {
1254
      cond->running = true;
3,457,302✔
1255
      break;
3,457,302✔
1256
    }
1257
  }
1258

1259
  tsdbTrace("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
3,457,302✔
1260
  return;
3,456,569✔
1261
}
1262

1263
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
3,457,302✔
1264
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
1265
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
3,457,302✔
1266

1267
  STFileSet *fset = NULL;
3,457,302✔
1268
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
3,457,302✔
1269
  if (fset == NULL) {
3,457,302✔
1270
    return;
×
1271
  }
1272

1273
  struct STFileSetCond *cond = NULL;
3,457,302✔
1274
  if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
3,457,302✔
1275
    cond = &fset->conds[0];
2,199,722✔
1276
  } else {
1277
    cond = &fset->conds[1];
1,257,580✔
1278
  }
1279

1280
  cond->running = false;
3,457,302✔
1281
  if (cond->numWait > 0) {
3,457,302✔
1282
    (void)taosThreadCondSignal(&cond->cond);
8,129✔
1283
  }
1284

1285
  tsdbTrace("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
3,457,302✔
1286
  return;
3,457,302✔
1287
}
1288

1289
struct SFileSetReader {
1290
  STsdb     *pTsdb;
1291
  STFileSet *pFileSet;
1292
  int32_t    fid;
1293
  int64_t    startTime;
1294
  int64_t    endTime;
1295
  int64_t    lastCompactTime;
1296
  int64_t    totalSize;
1297
};
1298

1299
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
342✔
1300
  if (pVnode == NULL || ppReader == NULL) {
342✔
1301
    return TSDB_CODE_INVALID_PARA;
×
1302
  }
1303

1304
  STsdb *pTsdb = ((SVnode *)pVnode)->pTsdb;
342✔
1305

1306
  (*ppReader) = taosMemoryCalloc(1, sizeof(struct SFileSetReader));
342✔
1307
  if (*ppReader == NULL) {
342✔
1308
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, __LINE__,
×
1309
              tstrerror(terrno));
1310
    return terrno;
×
1311
  }
1312

1313
  (*ppReader)->pTsdb = pTsdb;
342✔
1314
  (*ppReader)->fid = INT32_MIN;
342✔
1315
  (*ppReader)->pFileSet = NULL;
342✔
1316

1317
  return TSDB_CODE_SUCCESS;
342✔
1318
}
1319

1320
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
684✔
1321
  STsdb  *pTsdb = pReader->pTsdb;
684✔
1322
  int32_t code = TSDB_CODE_SUCCESS;
684✔
1323

1324
  tsdbTFileSetClear(&pReader->pFileSet);
684✔
1325

1326
  STFileSet *fset = &(STFileSet){
684✔
1327
      .fid = pReader->fid,
684✔
1328
  };
1329

1330
  STFileSet **fsetPtr = TARRAY2_SEARCH(pReader->pTsdb->pFS->fSetArr, &fset, tsdbTFileSetCmprFn, TD_GT);
684✔
1331
  if (fsetPtr == NULL) {
684✔
1332
    pReader->fid = INT32_MAX;
342✔
1333
    return TSDB_CODE_NOT_FOUND;
342✔
1334
  }
1335

1336
  // ref file set
1337
  code = tsdbTFileSetInitRef(pReader->pTsdb, *fsetPtr, &pReader->pFileSet);
342✔
1338
  if (code) return code;
342✔
1339

1340
  // get file set details
1341
  pReader->fid = pReader->pFileSet->fid;
342✔
1342
  tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
342✔
1343
  if (pTsdb->keepCfg.precision == TSDB_TIME_PRECISION_MICRO) {
342✔
NEW
1344
    pReader->startTime /= 1000;
×
NEW
1345
    pReader->endTime /= 1000;
×
1346
  } else if (pTsdb->keepCfg.precision == TSDB_TIME_PRECISION_NANO) {
342✔
NEW
1347
    pReader->startTime /= 1000000;
×
NEW
1348
    pReader->endTime /= 1000000;
×
1349
  }
1350
  pReader->lastCompactTime = pReader->pFileSet->lastCompact;
342✔
1351
  pReader->totalSize = 0;
342✔
1352
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
1,710✔
1353
    STFileObj *fobj = pReader->pFileSet->farr[i];
1,368✔
1354
    if (fobj) {
1,368✔
1355
      pReader->totalSize += fobj->f->size;
×
1356
    }
1357
  }
1358
  SSttLvl *lvl;
1359
  TARRAY2_FOREACH(pReader->pFileSet->lvlArr, lvl) {
684✔
1360
    STFileObj *fobj;
1361
    TARRAY2_FOREACH(lvl->fobjArr, fobj) { pReader->totalSize += fobj->f->size; }
684✔
1362
  }
1363

1364
  return code;
342✔
1365
}
1366

1367
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader) {
684✔
1368
  int32_t code = TSDB_CODE_SUCCESS;
684✔
1369
  (void)taosThreadMutexLock(&pReader->pTsdb->mutex);
684✔
1370
  code = tsdbFileSetReaderNextNoLock(pReader);
684✔
1371
  (void)taosThreadMutexUnlock(&pReader->pTsdb->mutex);
684✔
1372
  return code;
684✔
1373
}
1374

1375
extern bool tsdbShouldCompact(STFileSet *fset, int32_t vgId, int32_t expLevel, ETsdbOpType type);
1376
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value) {
2,052✔
1377
  const char *fieldName;
1378

1379
  if (pReader->fid == INT32_MIN || pReader->fid == INT32_MAX) {
2,052✔
UNCOV
1380
    return TSDB_CODE_INVALID_PARA;
×
1381
  }
1382

1383
  fieldName = "fileset_id";
2,052✔
1384
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
2,052✔
1385
    *(int32_t *)value = pReader->fid;
342✔
1386
    return TSDB_CODE_SUCCESS;
342✔
1387
  }
1388

1389
  fieldName = "start_time";
1,710✔
1390
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,710✔
1391
    *(int64_t *)value = pReader->startTime;
342✔
1392
    return TSDB_CODE_SUCCESS;
342✔
1393
  }
1394

1395
  fieldName = "end_time";
1,368✔
1396
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,368✔
1397
    *(int64_t *)value = pReader->endTime;
342✔
1398
    return TSDB_CODE_SUCCESS;
342✔
1399
  }
1400

1401
  fieldName = "total_size";
1,026✔
1402
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,026✔
1403
    *(int64_t *)value = pReader->totalSize;
342✔
1404
    return TSDB_CODE_SUCCESS;
342✔
1405
  }
1406

1407
  fieldName = "last_compact_time";
684✔
1408
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
684✔
1409
    *(int64_t *)value = pReader->lastCompactTime;
342✔
1410
    return TSDB_CODE_SUCCESS;
342✔
1411
  }
1412

1413
  fieldName = "should_compact";
342✔
1414
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
342✔
1415
    *(bool *)value = false;
342✔
1416
#ifdef TD_ENTERPRISE
1417
    *(bool *)value = tsdbShouldCompact(pReader->pFileSet, pReader->pTsdb->pVnode->config.vgId, 0, TSDB_OPTR_NORMAL);
342✔
1418
#endif
1419
    return TSDB_CODE_SUCCESS;
342✔
1420
  }
1421

UNCOV
1422
  fieldName = "details";
×
UNCOV
1423
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1424
    // TODO
UNCOV
1425
    return TSDB_CODE_SUCCESS;
×
1426
  }
1427

UNCOV
1428
  return TSDB_CODE_INVALID_PARA;
×
1429
}
1430

1431
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {
342✔
1432
  if (ppReader == NULL || *ppReader == NULL) {
342✔
UNCOV
1433
    return;
×
1434
  }
1435

1436
  tsdbTFileSetClear(&(*ppReader)->pFileSet);
342✔
1437
  taosMemoryFree(*ppReader);
342✔
1438

1439
  *ppReader = NULL;
342✔
1440
  return;
342✔
1441
}
1442

1443
static FORCE_INLINE void getLevelSize(const STFileObj *fObj, int64_t szArr[TFS_MAX_TIERS]) {
1444
  if (fObj == NULL) return;
12,748,716✔
1445

1446
  int64_t sz = fObj->f->size;
9,295,458✔
1447
  // level == 0, primary storage
1448
  // level == 1, second storage,
1449
  // level == 2, third storage
1450
  int32_t level = fObj->f->did.level;
9,295,458✔
1451
  if (level >= 0 && level < TFS_MAX_TIERS) {
9,295,458✔
1452
    szArr[level] += sz;
9,295,458✔
1453
  }
1454
}
1455

1456
static FORCE_INLINE int32_t tsdbGetFsSizeImpl(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
1457
  int32_t code = 0;
6,625,175✔
1458
  int64_t levelSize[TFS_MAX_TIERS] = {0};
6,625,175✔
1459
  int64_t ssSize = 0;
6,625,175✔
1460

1461
  const STFileSet *fset;
1462
  const SSttLvl   *stt = NULL;
6,625,175✔
1463
  const STFileObj *fObj = NULL;
6,625,175✔
1464

1465
  SVnodeCfg *pCfg = &tsdb->pVnode->config;
6,625,175✔
1466
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;
6,625,175✔
1467

1468
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
10,873,869✔
1469
    for (int32_t t = TSDB_FTYPE_MIN; t < TSDB_FTYPE_MAX; ++t) {
21,243,470✔
1470
      getLevelSize(fset->farr[t], levelSize);
16,994,776✔
1471
    }
1472

1473
    TARRAY2_FOREACH(fset->lvlArr, stt) {
6,606,187✔
1474
      TARRAY2_FOREACH(stt->fobjArr, fObj) { getLevelSize(fObj, levelSize); }
4,882,192✔
1475
    }
1476

1477
    fObj = fset->farr[TSDB_FTYPE_DATA];
4,248,694✔
1478
    if (fObj) {
4,248,694✔
1479
      int32_t lcn = fObj->f->lcn;
2,245,786✔
1480
      if (lcn > 1) {
2,245,786✔
UNCOV
1481
        ssSize += ((lcn - 1) * chunksize);
×
1482
      }
1483
    }
1484
  }
1485

1486
  pInfo->l1Size = levelSize[0];
6,625,175✔
1487
  pInfo->l2Size = levelSize[1];
6,625,175✔
1488
  pInfo->l3Size = levelSize[2];
6,625,175✔
1489
  pInfo->ssSize = ssSize;
6,625,175✔
1490
  return code;
6,625,175✔
1491
}
1492
int32_t tsdbGetFsSize(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
6,625,175✔
1493
  int32_t code = 0;
6,625,175✔
1494

1495
  (void)taosThreadMutexLock(&tsdb->mutex);
6,625,175✔
1496
  code = tsdbGetFsSizeImpl(tsdb, pInfo);
6,625,175✔
1497
  (void)taosThreadMutexUnlock(&tsdb->mutex);
6,625,175✔
1498
  return code;
6,625,175✔
1499
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc