• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4440

04 Jul 2025 02:10AM UTC coverage: 63.29% (-0.4%) from 63.643%
#4440

push

travis-ci

web-flow
fix:(stmt2) heap buffer overflow (#31607)

159782 of 321690 branches covered (49.67%)

Branch coverage included in aggregate %.

19 of 22 new or added lines in 3 files covered. (86.36%)

5735 existing lines in 195 files now uncovered.

246739 of 320626 relevant lines covered (76.96%)

6757056.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.17
/source/dnode/vnode/src/tsdb/tsdbFS2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "tsdbFS2.h"
18
#include "tsdbUpgrade.h"
19
#include "vnd.h"
20

21
#define BLOCK_COMMIT_FACTOR 3
22

23
typedef struct STFileHashEntry {
24
  struct STFileHashEntry *next;
25
  char                    fname[TSDB_FILENAME_LEN];
26
} STFileHashEntry;
27

28
typedef struct {
29
  int32_t           numFile;
30
  int32_t           numBucket;
31
  STFileHashEntry **buckets;
32
} STFileHash;
33

34
static const char *gCurrentFname[] = {
35
    [TSDB_FCURRENT] = "current.json",
36
    [TSDB_FCURRENT_C] = "current.c.json",
37
    [TSDB_FCURRENT_M] = "current.m.json",
38
};
39

40
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
15,968✔
41
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
15,968!
42
  if (fs[0] == NULL) {
16,091!
43
    return terrno;
×
44
  }
45

46
  fs[0]->tsdb = pTsdb;
16,091✔
47
  int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
16,091✔
48
  if (code) {
16,092✔
49
    taosMemoryFree(fs[0]);
3!
50
    return code;
×
51
  }
52

53
  fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
16,089✔
54
  fs[0]->neid = 0;
16,089✔
55
  TARRAY2_INIT(fs[0]->fSetArr);
16,089✔
56
  TARRAY2_INIT(fs[0]->fSetArrTmp);
16,089✔
57

58
  return 0;
16,089✔
59
}
60

61
static void destroy_fs(STFileSystem **fs) {
16,096✔
62
  if (fs[0] == NULL) return;
16,096!
63

64
  TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
16,096!
65
  TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
16,096!
66
  if (tsem_destroy(&fs[0]->canEdit) != 0) {
16,096!
67
    tsdbError("failed to destroy semaphore");
×
68
  }
69
  taosMemoryFree(fs[0]);
16,094!
70
  fs[0] = NULL;
16,096✔
71
}
72

73
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
92,572✔
74
  int32_t offset = 0;
92,572✔
75

76
  vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->diskPrimary, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
92,572✔
77
  offset = strlen(fname);
92,606✔
78
  snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, gCurrentFname[ftype]);
92,606✔
79
}
92,606✔
80

81
static int32_t save_json(const cJSON *json, const char *fname) {
26,018✔
82
  int32_t   code = 0;
26,018✔
83
  int32_t   lino;
84
  char     *data = NULL;
26,018✔
85
  TdFilePtr fp = NULL;
26,018✔
86

87
  data = cJSON_PrintUnformatted(json);
26,018✔
88
  if (data == NULL) {
26,019!
89
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
90
  }
91

92
  fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
26,019✔
93
  if (fp == NULL) {
26,023!
94
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
95
  }
96

97
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
26,023!
98
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
99
  }
100

101
  if (taosFsyncFile(fp) < 0) {
26,026!
102
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
103
  }
104

105
_exit:
26,026✔
106
  if (code) {
26,026!
107
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
108
  }
109
  taosMemoryFree(data);
26,026!
110
  taosCloseFileWithLog(&fp);
26,024!
111
  return code;
26,026✔
112
}
113

114
static int32_t load_json(const char *fname, cJSON **json) {
3,627✔
115
  int32_t code = 0;
3,627✔
116
  int32_t lino;
117
  char   *data = NULL;
3,627✔
118

119
  TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
3,627✔
120
  if (fp == NULL) {
3,629!
121
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
122
  }
123

124
  int64_t size;
125
  code = taosFStatFile(fp, &size, NULL);
3,629✔
126
  if (code != 0) {
3,635!
127
    TSDB_CHECK_CODE(code, lino, _exit);
×
128
  }
129

130
  data = taosMemoryMalloc(size + 1);
3,635!
131
  if (data == NULL) {
3,632!
132
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
133
  }
134

135
  if (taosReadFile(fp, data, size) < 0) {
3,632!
136
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
137
  }
138
  data[size] = '\0';
3,633✔
139

140
  json[0] = cJSON_Parse(data);
3,633✔
141
  if (json[0] == NULL) {
3,631!
142
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
143
  }
144

145
_exit:
3,631✔
146
  if (code) {
3,631!
147
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
148
    json[0] = NULL;
×
149
  }
150
  taosCloseFileWithLog(&fp);
3,631!
151
  taosMemoryFree(data);
3,635!
152
  return code;
3,635✔
153
}
154

155
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
26,022✔
156
  int32_t code = 0;
26,022✔
157
  int32_t lino = 0;
26,022✔
158

159
  cJSON *json = cJSON_CreateObject();
26,022✔
160
  if (json == NULL) {
26,019!
161
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
162
  }
163

164
  // fmtv
165
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
26,019!
166
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
167
  }
168

169
  // fset
170
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
26,014✔
171
  if (!ajson) {
26,020!
172
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
173
  }
174
  const STFileSet *fset;
175
  TARRAY2_FOREACH(arr, fset) {
381,189✔
176
    cJSON *item = cJSON_CreateObject();
355,163✔
177
    if (!item) {
355,180!
178
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
179
    }
180
    (void)cJSON_AddItemToArray(ajson, item);
355,180✔
181

182
    code = tsdbTFileSetToJson(fset, item);
355,181✔
183
    TSDB_CHECK_CODE(code, lino, _exit);
355,169!
184
  }
185

186
  code = save_json(json, fname);
26,026✔
187
  TSDB_CHECK_CODE(code, lino, _exit);
26,026!
188

189
_exit:
26,026✔
190
  if (code) {
26,026!
191
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
192
  }
193
  cJSON_Delete(json);
26,026✔
194
  return code;
26,026✔
195
}
196

197
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
3,628✔
198
  int32_t code = 0;
3,628✔
199
  int32_t lino = 0;
3,628✔
200

201
  TARRAY2_CLEAR(arr, tsdbTFileSetClear);
3,628!
202

203
  // load json
204
  cJSON *json = NULL;
3,628✔
205
  code = load_json(fname, &json);
3,628✔
206
  TSDB_CHECK_CODE(code, lino, _exit);
3,635!
207

208
  // parse json
209
  const cJSON *item1;
210

211
  /* fmtv */
212
  item1 = cJSON_GetObjectItem(json, "fmtv");
3,635✔
213
  if (cJSON_IsNumber(item1)) {
3,633!
214
    if (item1->valuedouble != 1) {
3,631!
215
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
216
    }
217
  } else {
218
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
219
  }
220

221
  /* fset */
222
  item1 = cJSON_GetObjectItem(json, "fset");
3,631✔
223
  if (cJSON_IsArray(item1)) {
3,635!
224
    const cJSON *item2;
225
    cJSON_ArrayForEach(item2, item1) {
6,083!
226
      STFileSet *fset;
227
      code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
2,448✔
228
      TSDB_CHECK_CODE(code, lino, _exit);
2,447!
229

230
      code = TARRAY2_APPEND(arr, fset);
2,447✔
231
      TSDB_CHECK_CODE(code, lino, _exit);
2,448!
232
    }
233
    TARRAY2_SORT(arr, tsdbTFileSetCmprFn);
3,635✔
234
  } else {
235
    code = TSDB_CODE_FILE_CORRUPTED;
×
236
    TSDB_CHECK_CODE(code, lino, _exit);
×
237
  }
238

239
_exit:
×
240
  if (code) {
3,635!
241
    tsdbError("%s failed at %sP%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
×
242
  }
243
  if (json) {
3,626!
244
    cJSON_Delete(json);
3,626✔
245
  }
246
  return code;
3,634✔
247
}
248

249
static int32_t apply_commit(STFileSystem *fs) {
13,565✔
250
  int32_t        code = 0;
13,565✔
251
  int32_t        lino;
252
  TFileSetArray *fsetArray1 = fs->fSetArr;
13,565✔
253
  TFileSetArray *fsetArray2 = fs->fSetArrTmp;
13,565✔
254
  int32_t        i1 = 0, i2 = 0;
13,565✔
255

256
  while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
368,207✔
257
    STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_GET(fsetArray1, i1) : NULL;
354,667✔
258
    STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_GET(fsetArray2, i2) : NULL;
354,667✔
259

260
    if (fset1 && fset2) {
354,667✔
261
      if (fset1->fid < fset2->fid) {
12,999!
262
        // delete fset1
263
        tsdbTFileSetRemove(fset1);
×
264
        i1++;
×
265
      } else if (fset1->fid > fset2->fid) {
12,999✔
266
        // create new file set with fid of fset2->fid
267
        code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
37✔
268
        TSDB_CHECK_CODE(code, lino, _exit);
37!
269
        code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
37✔
270
        TSDB_CHECK_CODE(code, lino, _exit);
37!
271
        i1++;
37✔
272
        i2++;
37✔
273
      } else {
274
        // edit
275
        code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
12,962✔
276
        TSDB_CHECK_CODE(code, lino, _exit);
12,962!
277
        i1++;
12,962✔
278
        i2++;
12,962✔
279
      }
280
    } else if (fset1) {
341,668✔
281
      // delete fset1
282
      tsdbTFileSetRemove(fset1);
7✔
283
      i1++;
7✔
284
    } else {
285
      // create new file set with fid of fset2->fid
286
      code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
341,661✔
287
      TSDB_CHECK_CODE(code, lino, _exit);
342,035!
288
      code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
341,636✔
289
      TSDB_CHECK_CODE(code, lino, _exit);
341,636!
290
      i1++;
341,636✔
291
      i2++;
341,636✔
292
    }
293
  }
294

295
_exit:
13,540✔
296
  if (code) {
13,540!
297
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
298
  }
299
  return code;
13,565✔
300
}
301

302
static int32_t commit_edit(STFileSystem *fs) {
13,565✔
303
  char current[TSDB_FILENAME_LEN];
304
  char current_t[TSDB_FILENAME_LEN];
305

306
  current_fname(fs->tsdb, current, TSDB_FCURRENT);
13,565✔
307
  if (fs->etype == TSDB_FEDIT_COMMIT) {
13,565✔
308
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
10,338✔
309
  } else {
310
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
3,227✔
311
  }
312

313
  int32_t code;
314
  int32_t lino;
315
  TSDB_CHECK_CODE(taosRenameFile(current_t, current), lino, _exit);
13,565!
316

317
  code = apply_commit(fs);
13,565✔
318
  TSDB_CHECK_CODE(code, lino, _exit);
13,565!
319

320
_exit:
13,565✔
321
  if (code) {
13,565!
322
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
×
323
              tstrerror(code));
324
  } else {
325
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
13,565!
326
  }
327
  return code;
13,565✔
328
}
329

330
// static int32_t
331
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);
332
static int32_t apply_abort(STFileSystem *fs) { return tsdbFSDoSanAndFix(fs); }
×
333

334
static int32_t abort_edit(STFileSystem *fs) {
×
335
  char fname[TSDB_FILENAME_LEN];
336

337
  if (fs->etype == TSDB_FEDIT_COMMIT) {
×
338
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
×
339
  } else {
340
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
×
341
  }
342

343
  int32_t code;
344
  int32_t lino;
345
  if ((code = taosRemoveFile(fname))) {
×
346
    code = TAOS_SYSTEM_ERROR(code);
×
347
    TSDB_CHECK_CODE(code, lino, _exit);
×
348
  }
349

350
  code = apply_abort(fs);
×
351
  TSDB_CHECK_CODE(code, lino, _exit);
×
352

353
_exit:
×
354
  if (code) {
×
355
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
×
356
              tstrerror(code));
357
  } else {
358
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
×
359
  }
360
  return code;
×
361
}
362

363
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
3,761✔
364
  int32_t code = 0;
3,761✔
365
  int32_t lino = 0;
3,761✔
366

367
  // check file existence
368
  if (!taosCheckExistFile(fobj->fname)) {
3,761!
369
    bool found = false;
×
370

371
    if (tsS3Enabled && fobj->f->lcn > 1) {
×
372
      char fname1[TSDB_FILENAME_LEN];
373
      tsdbTFileLastChunkName(fs->tsdb, fobj->f, fname1);
×
374
      if (!taosCheckExistFile(fname1)) {
×
375
        code = TSDB_CODE_FILE_CORRUPTED;
×
376
        tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fname1);
×
377
        return code;
×
378
      }
379

380
      found = true;
×
381
    }
382

383
    if (!found) {
×
384
      code = TSDB_CODE_FILE_CORRUPTED;
×
385
      tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname);
×
386
      return code;
×
387
    }
388
  }
389

390
  return 0;
3,764✔
391
}
392

393
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
394

395
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
7,392✔
396
  STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
7,392!
397
  if (entry == NULL) return terrno;
7,395!
398

399
  tstrncpy(entry->fname, fname, TSDB_FILENAME_LEN);
7,395✔
400

401
  uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
7,395✔
402

403
  entry->next = hash->buckets[idx];
7,396✔
404
  hash->buckets[idx] = entry;
7,396✔
405
  hash->numFile++;
7,396✔
406

407
  return 0;
7,396✔
408
}
409

410
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
3,630✔
411
  int32_t code = 0;
3,630✔
412
  int32_t lino;
413
  char    fname[TSDB_FILENAME_LEN];
414

415
  // init hash table
416
  hash->numFile = 0;
3,630✔
417
  hash->numBucket = 4096;
3,630✔
418
  hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
3,630!
419
  if (hash->buckets == NULL) {
3,635!
420
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
421
  }
422

423
  // vnode.json
424
  current_fname(fs->tsdb, fname, TSDB_FCURRENT);
3,635✔
425
  code = tsdbFSAddEntryToFileObjHash(hash, fname);
3,634✔
426
  TSDB_CHECK_CODE(code, lino, _exit);
3,635!
427

428
  // other
429
  STFileSet *fset = NULL;
3,635✔
430
  TARRAY2_FOREACH(fs->fSetArr, fset) {
6,084✔
431
    // data file
432
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
12,245✔
433
      if (fset->farr[i] != NULL) {
9,796✔
434
        code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
1,255✔
435
        TSDB_CHECK_CODE(code, lino, _exit);
1,255!
436

437
        if (TSDB_FTYPE_DATA == i && fset->farr[i]->f->lcn > 0) {
1,255!
438
          STFileObj *fobj = fset->farr[i];
×
439
          int32_t    lcn = fobj->f->lcn;
×
440
          char       lcn_name[TSDB_FILENAME_LEN];
441

442
          snprintf(lcn_name, TSDB_FQDN_LEN, "%s", fobj->fname);
×
443
          char *dot = strrchr(lcn_name, '.');
×
444
          if (dot) {
×
445
            snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lcn_name), "%d.data", lcn);
×
446

447
            code = tsdbFSAddEntryToFileObjHash(hash, lcn_name);
×
448
            TSDB_CHECK_CODE(code, lino, _exit);
×
449
          }
450
        }
451
      }
452
    }
453

454
    // stt file
455
    SSttLvl *lvl = NULL;
2,449✔
456
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
4,957✔
457
      STFileObj *fobj;
458
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
5,016✔
459
        code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
2,508✔
460
        TSDB_CHECK_CODE(code, lino, _exit);
2,508!
461
      }
462
    }
463
  }
464

465
_exit:
3,635✔
466
  if (code) {
3,635!
467
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
468
    tsdbFSDestroyFileObjHash(hash);
×
469
  }
470
  return code;
3,635✔
471
}
472

473
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
7,396✔
474
  uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
7,396✔
475

476
  STFileHashEntry *entry = hash->buckets[idx];
7,398✔
477
  while (entry) {
7,442!
478
    if (strcmp(entry->fname, fname) == 0) {
7,442✔
479
      return entry;
7,398✔
480
    }
481
    entry = entry->next;
44✔
482
  }
483

UNCOV
484
  return NULL;
×
485
}
486

487
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
3,635✔
488
  for (int32_t i = 0; i < hash->numBucket; i++) {
14,696,830✔
489
    STFileHashEntry *entry = hash->buckets[i];
14,693,195✔
490
    while (entry) {
14,700,593✔
491
      STFileHashEntry *next = entry->next;
7,398✔
492
      taosMemoryFree(entry);
7,398!
493
      entry = next;
7,398✔
494
    }
495
  }
496
  taosMemoryFree(hash->buckets);
3,635!
497
  memset(hash, 0, sizeof(*hash));
3,635✔
498
}
3,635✔
499

500
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
3,629✔
501
  int32_t code = 0;
3,629✔
502
  int32_t lino = 0;
3,629✔
503
  int32_t corrupt = false;
3,629✔
504

505
  {  // scan each file
506
    STFileSet *fset = NULL;
3,629✔
507
    TARRAY2_FOREACH(fs->fSetArr, fset) {
6,078✔
508
      // data file
509
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
12,232✔
510
        if (fset->farr[ftype] == NULL) continue;
9,785✔
511
        STFileObj *fobj = fset->farr[ftype];
1,253✔
512
        code = tsdbFSDoScanAndFixFile(fs, fobj);
1,253✔
513
        if (code) {
1,255!
514
          fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
515
          corrupt = true;
×
516
        }
517
      }
518

519
      // stt file
520
      SSttLvl *lvl;
521
      TARRAY2_FOREACH(fset->lvlArr, lvl) {
4,954✔
522
        STFileObj *fobj;
523
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
5,013✔
524
          code = tsdbFSDoScanAndFixFile(fs, fobj);
2,506✔
525
          if (code) {
2,508!
526
            fset->maxVerValid =
×
527
                (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
528
            corrupt = true;
×
529
          }
530
        }
531
      }
532
    }
533
  }
534

535
  if (corrupt) {
3,633!
536
    tsdbError("vgId:%d, not to clear dangling files due to fset incompleteness", TD_VID(fs->tsdb->pVnode));
×
537
    fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
×
538
    code = 0;
×
539
    goto _exit;
×
540
  }
541

542
  {  // clear unreferenced files
543
    STfsDir *dir = NULL;
3,633✔
544
    TAOS_CHECK_GOTO(tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path, &dir), &lino, _exit);
3,633!
545

546
    STFileHash fobjHash = {0};
3,630✔
547
    code = tsdbFSCreateFileObjHash(fs, &fobjHash);
3,630✔
548
    if (code) goto _close_dir;
3,635!
549

550
    for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
14,666✔
551
      if (taosIsDir(file->aname)) continue;
11,031✔
552

553
      if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
7,397!
554
        tsdbRemoveFile(file->aname);
×
555
      }
556
    }
557

558
    tsdbFSDestroyFileObjHash(&fobjHash);
3,632✔
559

560
  _close_dir:
3,635✔
561
    tfsClosedir(dir);
3,635✔
562
  }
563

564
_exit:
3,635✔
565
  if (code) {
3,635!
566
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
567
  }
568
  return code;
3,635✔
569
}
570

571
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
3,630✔
572
  fs->neid = 0;
3,630✔
573

574
  // get max commit id
575
  const STFileSet *fset;
576
  TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
6,069✔
577

578
  // scan and fix
579
  int32_t code = 0;
3,626✔
580
  int32_t lino = 0;
3,626✔
581

582
  code = tsdbFSDoSanAndFix(fs);
3,626✔
583
  TSDB_CHECK_CODE(code, lino, _exit);
3,635!
584

585
_exit:
3,635✔
586
  if (code) {
3,635!
587
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
588
  }
589
  return code;
3,635✔
590
}
591

592
static int32_t tsdbFSDupState(STFileSystem *fs) {
17,199✔
593
  int32_t code;
594

595
  const TFileSetArray *src = fs->fSetArr;
17,199✔
596
  TFileSetArray       *dst = fs->fSetArrTmp;
17,199✔
597

598
  TARRAY2_CLEAR(dst, tsdbTFileSetClear);
30,168✔
599

600
  const STFileSet *fset1;
601
  TARRAY2_FOREACH(src, fset1) {
32,615✔
602
    STFileSet *fset2;
603
    code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
15,419✔
604
    if (code) return code;
15,414!
605
    code = TARRAY2_APPEND(dst, fset2);
15,414✔
606
    if (code) return code;
15,416!
607
  }
608

609
  return 0;
17,196✔
610
}
611

612
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
16,083✔
613
  int32_t code = 0;
16,083✔
614
  int32_t lino = 0;
16,083✔
615
  STsdb  *pTsdb = fs->tsdb;
16,083✔
616

617
  char fCurrent[TSDB_FILENAME_LEN];
618
  char cCurrent[TSDB_FILENAME_LEN];
619
  char mCurrent[TSDB_FILENAME_LEN];
620

621
  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
16,083✔
622
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
16,076✔
623
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
16,087✔
624

625
  if (taosCheckExistFile(fCurrent)) {  // current.json exists
16,094✔
626
    code = load_fs(pTsdb, fCurrent, fs->fSetArr);
3,634✔
627
    TSDB_CHECK_CODE(code, lino, _exit);
3,628!
628

629
    if (taosCheckExistFile(cCurrent)) {
3,628!
630
      // current.c.json exists
631

632
      fs->etype = TSDB_FEDIT_COMMIT;
×
633
      if (rollback) {
×
634
        code = abort_edit(fs);
×
635
        TSDB_CHECK_CODE(code, lino, _exit);
×
636
      } else {
637
        code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
×
638
        TSDB_CHECK_CODE(code, lino, _exit);
×
639

640
        code = commit_edit(fs);
×
641
        TSDB_CHECK_CODE(code, lino, _exit);
×
642
      }
643
    } else if (taosCheckExistFile(mCurrent)) {
3,633!
644
      // current.m.json exists
645
      fs->etype = TSDB_FEDIT_MERGE;
×
646
      code = abort_edit(fs);
×
647
      TSDB_CHECK_CODE(code, lino, _exit);
×
648
    }
649

650
    code = tsdbFSDupState(fs);
3,635✔
651
    TSDB_CHECK_CODE(code, lino, _exit);
3,631!
652

653
    code = tsdbFSScanAndFix(fs);
3,631✔
654
    TSDB_CHECK_CODE(code, lino, _exit);
3,634!
655
  } else {
656
    code = save_fs(fs->fSetArr, fCurrent);
12,460✔
657
    TSDB_CHECK_CODE(code, lino, _exit);
12,460!
658
  }
659

660
_exit:
12,460✔
661
  if (code) {
16,094!
662
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
663
  } else {
664
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
16,094✔
665
  }
666
  return code;
16,096✔
667
}
668

669
static void close_file_system(STFileSystem *fs) {
16,096✔
670
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
360,751✔
671
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
360,764✔
672
}
16,091✔
673

674
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
×
675
  if (pSet1->fid < pSet2->fid) {
×
676
    return -1;
×
677
  } else if (pSet1->fid > pSet2->fid) {
×
678
    return 1;
×
679
  }
680
  return 0;
×
681
}
682

683
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
13,565✔
684
  int32_t code = 0;
13,565✔
685
  int32_t lino = 0;
13,565✔
686

687
  code = tsdbFSDupState(fs);
13,565✔
688
  if (code) return code;
13,564!
689

690
  TFileSetArray  *fsetArray = fs->fSetArrTmp;
13,564✔
691
  STFileSet      *fset = NULL;
13,564✔
692
  const STFileOp *op;
693
  int32_t         fid = INT32_MIN;
13,564✔
694
  TSKEY           now = taosGetTimestampMs();
13,565✔
695
  TARRAY2_FOREACH_PTR(opArray, op) {
380,246✔
696
    if (!fset || fset->fid != op->fid) {
366,677✔
697
      STFileSet tfset = {.fid = op->fid};
351,811✔
698
      fset = &tfset;
351,811✔
699
      STFileSet **fsetPtr = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
351,663✔
700
      fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
351,663✔
701

702
      if (!fset) {
351,663✔
703
        code = tsdbTFileSetInit(op->fid, &fset);
342,060✔
704
        TSDB_CHECK_CODE(code, lino, _exit);
341,940!
705

706
        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
341,909✔
707
        TSDB_CHECK_CODE(code, lino, _exit);
341,909!
708
      }
709
    }
710

711
    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
366,378✔
712
    TSDB_CHECK_CODE(code, lino, _exit);
366,681!
713

714
    if (fid != op->fid) {
366,681✔
715
      fid = op->fid;
351,810✔
716
      if (etype == TSDB_FEDIT_COMMIT) {
351,810✔
717
        fset->lastCommit = now;
348,584✔
718
      } else if (etype == TSDB_FEDIT_COMPACT) {
3,226✔
719
        fset->lastCompact = now;
57✔
720
      }
721
    }
722
  }
723

724
  // remove empty empty stt level and empty file set
725
  int32_t i = 0;
13,569✔
726
  while (i < TARRAY2_SIZE(fsetArray)) {
368,784✔
727
    fset = TARRAY2_GET(fsetArray, i);
355,214✔
728

729
    SSttLvl *lvl;
730
    int32_t  j = 0;
355,214✔
731
    while (j < TARRAY2_SIZE(fset->lvlArr)) {
720,628✔
732
      lvl = TARRAY2_GET(fset->lvlArr, j);
365,413✔
733

734
      if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
365,413✔
735
        TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
4,368!
736
      } else {
737
        j++;
361,045✔
738
      }
739
    }
740

741
    if (tsdbTFileSetIsEmpty(fset)) {
355,215✔
742
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
7!
743
    } else {
744
      i++;
355,208✔
745
    }
746
  }
747

748
_exit:
13,570✔
749
  if (code) {
13,570!
750
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
751
  }
752
  return code;
13,563✔
753
}
754

755
// return error code
756
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
16,022✔
757
  int32_t code;
758
  int32_t lino;
759

760
  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
16,022✔
761
  TSDB_CHECK_CODE(code, lino, _exit);
16,091!
762

763
  code = create_fs(pTsdb, fs);
16,091✔
764
  TSDB_CHECK_CODE(code, lino, _exit);
16,087!
765

766
  code = open_fs(fs[0], rollback);
16,087✔
767
  TSDB_CHECK_CODE(code, lino, _exit);
16,096!
768

769
_exit:
16,096✔
770
  if (code) {
16,096!
771
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
772
    destroy_fs(fs);
×
773
  } else {
774
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
16,096✔
775
  }
776
  return code;
16,096✔
777
}
778

779
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
780
extern void tsdbStopAllCompTask(STsdb *tsdb);
781

782
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
16,188✔
783
  STFileSystem *fs = pTsdb->pFS;
16,188✔
784
  SArray       *asyncTasks = taosArrayInit(0, sizeof(SVATaskID));
16,188✔
785
  if (asyncTasks == NULL) {
16,186!
786
    return terrno;
×
787
  }
788

789
  (void)taosThreadMutexLock(&pTsdb->mutex);
16,186✔
790

791
  // disable
792
  pTsdb->bgTaskDisabled = true;
16,188✔
793

794
  // collect channel
795
  STFileSet *fset;
796
  TARRAY2_FOREACH(fs->fSetArr, fset) {
360,873✔
797
    if (taosArrayPush(asyncTasks, &fset->mergeTask) == NULL       //
689,371!
798
        || taosArrayPush(asyncTasks, &fset->compactTask) == NULL  //
689,371!
799
        || taosArrayPush(asyncTasks, &fset->retentionTask) == NULL) {
689,370!
800
      taosArrayDestroy(asyncTasks);
×
801
      (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
802
      return terrno;
×
803
    }
804
    tsdbFSSetBlockCommit(fset, false);
344,684✔
805
  }
806

807
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
16,187✔
808

809
  // destroy all channels
810
  for (int32_t k = 0; k < 2; k++) {
48,563✔
811
    for (int32_t i = 0; i < taosArrayGetSize(asyncTasks); i++) {
2,100,551✔
812
      SVATaskID *task = taosArrayGet(asyncTasks, i);
2,068,166✔
813
      if (k == 0) {
2,068,173✔
814
        (void)vnodeACancel(task);
1,034,090✔
815
      } else {
816
        vnodeAWait(task);
1,034,083✔
817
      }
818
    }
819
  }
820
  taosArrayDestroy(asyncTasks);
16,188✔
821

822
#ifdef TD_ENTERPRISE
823
  tsdbStopAllCompTask(pTsdb);
16,187✔
824
#endif
825
  return 0;
16,188✔
826
}
827

828
void tsdbEnableBgTask(STsdb *pTsdb) {
92✔
829
  (void)taosThreadMutexLock(&pTsdb->mutex);
92✔
830
  pTsdb->bgTaskDisabled = false;
92✔
831
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
92✔
832
}
92✔
833

834
void tsdbCloseFS(STFileSystem **fs) {
16,095✔
835
  if (fs[0] == NULL) return;
16,095!
836

837
  int32_t code = tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
16,095✔
838
  if (code) {
16,096!
839
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID((*fs)->tsdb->pVnode), __func__, __LINE__,
×
840
              tstrerror(code));
841
  }
842
  close_file_system(fs[0]);
16,096✔
843
  destroy_fs(fs);
16,096✔
844
  return;
16,095✔
845
}
846

847
int64_t tsdbFSAllocEid(STFileSystem *fs) {
13,699✔
848
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
13,699✔
849
  int64_t cid = ++fs->neid;
13,699✔
850
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
13,699✔
851
  return cid;
13,699✔
852
}
853

854
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
310✔
855
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
310✔
856
  fs->neid = TMAX(fs->neid, cid);
310✔
857
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
310✔
858
}
310✔
859

860
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
13,565✔
861
  int32_t code = 0;
13,565✔
862
  int32_t lino;
863
  char    current_t[TSDB_FILENAME_LEN];
864

865
  if (etype == TSDB_FEDIT_COMMIT) {
13,565✔
866
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
10,338✔
867
  } else {
868
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
3,227✔
869
  }
870

871
  if (tsem_wait(&fs->canEdit) != 0) {
13,565!
872
    tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
×
873
  }
874
  fs->etype = etype;
13,565✔
875

876
  // edit
877
  code = edit_fs(fs, opArray, etype);
13,565✔
878
  TSDB_CHECK_CODE(code, lino, _exit);
13,563!
879

880
  // save fs
881
  code = save_fs(fs->fSetArrTmp, current_t);
13,563✔
882
  TSDB_CHECK_CODE(code, lino, _exit);
13,565!
883

884
_exit:
13,565✔
885
  if (code) {
13,565!
886
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
×
887
              tstrerror(code), etype);
888
  } else {
889
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
13,565!
890
  }
891
  return code;
13,565✔
892
}
893

894
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
693,005✔
895
  if (block) {
693,005!
896
    fset->blockCommit = true;
×
897
  } else {
898
    fset->blockCommit = false;
693,005✔
899
    if (fset->numWaitCommit > 0) {
693,005!
900
      (void)taosThreadCondSignal(&fset->canCommit);
×
901
    }
902
  }
903
}
693,005✔
904

905
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
467,491✔
906
  (void)taosThreadMutexLock(&tsdb->mutex);
467,491✔
907
  STFileSet *fset;
908
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
467,521✔
909
  bool blockCommit = false;
467,466✔
910
  if (fset) {
467,466✔
911
    blockCommit = fset->blockCommit;
6,425✔
912
  }
913
  if (fset) {
467,466✔
914
    METRICS_TIMING_BLOCK(tsdb->pVnode->writeMetrics.block_commit_time, METRIC_LEVEL_HIGH, {
6,425!
915
      while (fset->blockCommit) {
916
        fset->numWaitCommit++;
917
        (void)taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
918
        fset->numWaitCommit--;
919
      }
920
    });
921
  }
922
  if (blockCommit) {
467,466!
923
    METRICS_UPDATE(tsdb->pVnode->writeMetrics.blocked_commit_count, METRIC_LEVEL_HIGH, 1);
×
924
  }
925
  (void)taosThreadMutexUnlock(&tsdb->mutex);
467,466✔
926
  return;
467,483✔
927
}
928

929
// IMPORTANT: the caller must hold fs->tsdb->mutex
930
int32_t tsdbFSEditCommit(STFileSystem *fs) {
13,565✔
931
  int32_t code = 0;
13,565✔
932
  int32_t lino = 0;
13,565✔
933

934
  // commit
935
  code = commit_edit(fs);
13,565✔
936
  TSDB_CHECK_CODE(code, lino, _exit);
13,565!
937

938
  // schedule merge
939
  int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
13,565✔
940
  if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
13,565✔
941
    STFileSet *fset;
942
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
360,894✔
943
      if (TARRAY2_SIZE(fset->lvlArr) == 0) {
348,320✔
944
        tsdbFSSetBlockCommit(fset, false);
887✔
945
        continue;
887✔
946
      }
947

948
      SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
347,433✔
949
      if (lvl->level != 0) {
347,433✔
950
        tsdbFSSetBlockCommit(fset, false);
3,242✔
951
        continue;
3,242✔
952
      }
953

954
      // bool    skipMerge = false;
955
      int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
344,191✔
956
      if (numFile >= sttTrigger && (!vnodeATaskValid(&fset->mergeTask))) {
344,191✔
957
        SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
3,093!
958
        if (arg == NULL) {
3,093!
959
          code = terrno;
×
960
          TSDB_CHECK_CODE(code, lino, _exit);
×
961
        }
962

963
        arg->tsdb = fs->tsdb;
3,093✔
964
        arg->fid = fset->fid;
3,093✔
965

966
        code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
3,093✔
967
        TSDB_CHECK_CODE(code, lino, _exit);
3,093!
968
      }
969

970
      if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
344,191!
971
        tsdbFSSetBlockCommit(fset, true);
×
972
      } else {
973
        tsdbFSSetBlockCommit(fset, false);
344,191✔
974
      }
975
    }
976
  }
977

978
_exit:
13,565✔
979
  if (code) {
13,565!
980
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
×
981
  } else {
982
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
13,565!
983
  }
984
  if (tsem_post(&fs->canEdit) != 0) {
13,565!
985
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
986
  }
987
  return code;
13,565✔
988
}
989

990
int32_t tsdbFSEditAbort(STFileSystem *fs) {
×
991
  int32_t code = abort_edit(fs);
×
992
  if (tsem_post(&fs->canEdit) != 0) {
×
993
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
994
  }
995
  return code;
×
996
}
997

998
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
947,799✔
999
  STFileSet   tfset = {.fid = fid};
947,799✔
1000
  STFileSet  *pset = &tfset;
947,799✔
1001
  STFileSet **fsetPtr = TARRAY2_SEARCH(fs->fSetArr, &pset, tsdbTFileSetCmprFn, TD_EQ);
947,799✔
1002
  fset[0] = (fsetPtr == NULL) ? NULL : fsetPtr[0];
948,113✔
1003
}
948,113✔
1004

1005
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
80✔
1006
  int32_t    code = 0;
80✔
1007
  STFileSet *fset;
1008
  STFileSet *fset1;
1009

1010
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
80!
1011
  if (fsetArr[0] == NULL) return terrno;
80!
1012

1013
  TARRAY2_INIT(fsetArr[0]);
80✔
1014

1015
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
80✔
1016
  TARRAY2_FOREACH(fs->fSetArr, fset) {
80!
1017
    code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
×
1018
    if (code) break;
×
1019

1020
    code = TARRAY2_APPEND(fsetArr[0], fset1);
×
1021
    if (code) break;
×
1022
  }
1023
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
80✔
1024

1025
  if (code) {
80!
1026
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1027
    taosMemoryFree(fsetArr[0]);
×
1028
    fsetArr[0] = NULL;
×
1029
  }
1030
  return code;
80✔
1031
}
1032

1033
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
80✔
1034
  if (fsetArr[0]) {
80!
1035
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
80!
1036
    taosMemoryFree(fsetArr[0]);
80!
1037
    fsetArr[0] = NULL;
80✔
1038
  }
1039
}
80✔
1040

1041
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
94✔
1042
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
94✔
1043
  int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
94✔
1044
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
94✔
1045
  return code;
94✔
1046
}
1047

1048
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr) {
1,318,252✔
1049
  int32_t    code = 0;
1,318,252✔
1050
  STFileSet *fset, *fset1;
1051

1052
  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
1,318,252!
1053
  if (fsetArr[0] == NULL) return terrno;
1,318,859!
1054

1055
  TARRAY2_FOREACH(fs->fSetArr, fset) {
4,929,701✔
1056
    code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1);
3,608,990✔
1057
    if (code) break;
3,610,862!
1058

1059
    code = TARRAY2_APPEND(fsetArr[0], fset1);
3,610,862✔
1060
    if (code) {
3,610,842!
1061
      tsdbTFileSetClear(&fset1);
×
1062
      break;
×
1063
    }
1064
  }
1065

1066
  if (code) {
1,320,711!
1067
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1068
    taosMemoryFree(fsetArr[0]);
×
1069
    fsetArr[0] = NULL;
×
1070
  }
1071
  return code;
1,320,711✔
1072
}
1073

1074
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
1,318,506✔
1075
  if (fsetArr[0]) {
1,318,506!
1076
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
4,937,607!
1077
    taosMemoryFreeClear(fsetArr[0]);
1,318,766!
1078
    fsetArr[0] = NULL;
1,318,938✔
1079
  }
1080
}
1,318,738✔
1081

1082
static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
×
1083
  int32_t   capacity = TARRAY2_SIZE(pRanges) * 2;
×
1084
  SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
×
1085
  if (pHash == NULL) {
×
1086
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1087
    return NULL;
×
1088
  }
1089

1090
  for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
×
1091
    STFileSetRange *u = TARRAY2_GET(pRanges, i);
×
1092
    int32_t         fid = u->fid;
×
1093
    int32_t         code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
×
1094
    tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
×
1095
  }
1096
  return pHash;
×
1097
}
1098

1099
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
×
1100
                                       TFileOpArray *fopArr) {
1101
  int32_t    code = 0;
×
1102
  STFileSet *fset;
1103
  STFileSet *fset1;
1104
  SHashObj  *pHash = NULL;
×
1105

1106
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
×
1107
  if (fsetArr == NULL) return terrno;
×
1108
  TARRAY2_INIT(fsetArr[0]);
×
1109

1110
  if (pRanges) {
×
1111
    pHash = tsdbFSetRangeArrayToHash(pRanges);
×
1112
    if (pHash == NULL) {
×
1113
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1114
      goto _out;
×
1115
    }
1116
  }
1117

1118
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
×
1119
  TARRAY2_FOREACH(fs->fSetArr, fset) {
×
1120
    int64_t ever = VERSION_MAX;
×
1121
    if (pHash) {
×
1122
      int32_t         fid = fset->fid;
×
1123
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
×
1124
      if (u) {
×
1125
        ever = u->sver - 1;
×
1126
      }
1127
    }
1128

1129
    code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
×
1130
    if (code) break;
×
1131

1132
    code = TARRAY2_APPEND(fsetArr[0], fset1);
×
1133
    if (code) break;
×
1134
  }
1135
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
×
1136

1137
_out:
×
1138
  if (code) {
×
1139
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1140
    taosMemoryFree(fsetArr[0]);
×
1141
    fsetArr[0] = NULL;
×
1142
  }
1143
  if (pHash) {
×
1144
    taosHashCleanup(pHash);
×
1145
    pHash = NULL;
×
1146
  }
1147
  return code;
×
1148
}
1149

1150
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); }
×
1151

1152
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
×
1153
                                      TFileSetRangeArray **fsrArr) {
1154
  int32_t         code = 0;
×
1155
  STFileSet      *fset;
1156
  STFileSetRange *fsr1 = NULL;
×
1157
  SHashObj       *pHash = NULL;
×
1158

1159
  fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
×
1160
  if (fsrArr[0] == NULL) {
×
1161
    code = terrno;
×
1162
    goto _out;
×
1163
  }
1164

1165
  tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
×
1166
  if (pRanges) {
×
1167
    pHash = tsdbFSetRangeArrayToHash(pRanges);
×
1168
    if (pHash == NULL) {
×
1169
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1170
      goto _out;
×
1171
    }
1172
  }
1173

1174
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
×
1175
  TARRAY2_FOREACH(fs->fSetArr, fset) {
×
1176
    int64_t sver1 = sver;
×
1177
    int64_t ever1 = ever;
×
1178

1179
    if (pHash) {
×
1180
      int32_t         fid = fset->fid;
×
1181
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
×
1182
      if (u) {
×
1183
        sver1 = u->sver;
×
1184
        tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
×
1185
      }
1186
    }
1187

1188
    if (sver1 > ever1) {
×
1189
      tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
×
1190
      continue;
×
1191
    }
1192

1193
    tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
×
1194

1195
    code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
×
1196
    if (code) break;
×
1197

1198
    code = TARRAY2_APPEND(fsrArr[0], fsr1);
×
1199
    if (code) break;
×
1200

1201
    fsr1 = NULL;
×
1202
  }
1203
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
×
1204

1205
  if (code) {
×
1206
    tsdbTFileSetRangeClear(&fsr1);
×
1207
    TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
×
1208
    fsrArr[0] = NULL;
×
1209
  }
1210

1211
_out:
×
1212
  if (pHash) {
×
1213
    taosHashCleanup(pHash);
×
1214
    pHash = NULL;
×
1215
  }
1216
  return code;
×
1217
}
1218

1219
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); }
×
1220

1221
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
470,815✔
1222
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
1223
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
470,815✔
1224

1225
  tsdbFSGetFSet(tsdb->pFS, fid, fset);
470,815✔
1226
  if (*fset == NULL) {
470,876✔
1227
    return;
461,090✔
1228
  }
1229

1230
  struct STFileSetCond *cond = NULL;
9,786✔
1231
  if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
9,786✔
1232
    cond = &(*fset)->conds[0];
6,525✔
1233
  } else {
1234
    cond = &(*fset)->conds[1];
3,261✔
1235
  }
1236

1237
  while (1) {
1238
    if (cond->running) {
9,786!
1239
      cond->numWait++;
×
1240
      (void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
×
UNCOV
1241
      cond->numWait--;
×
1242
    } else {
1243
      cond->running = true;
9,786✔
1244
      break;
9,786✔
1245
    }
1246
  }
1247

1248
  tsdbTrace("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
9,786✔
1249
  return;
9,786✔
1250
}
1251

1252
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
9,786✔
1253
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
1254
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
9,786✔
1255

1256
  STFileSet *fset = NULL;
9,786✔
1257
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
9,786✔
1258
  if (fset == NULL) {
9,786!
1259
    return;
×
1260
  }
1261

1262
  struct STFileSetCond *cond = NULL;
9,786✔
1263
  if (sttTrigger == 1 || task == EVA_TASK_COMMIT) {
9,786✔
1264
    cond = &fset->conds[0];
6,525✔
1265
  } else {
1266
    cond = &fset->conds[1];
3,261✔
1267
  }
1268

1269
  cond->running = false;
9,786✔
1270
  if (cond->numWait > 0) {
9,786!
1271
    (void)taosThreadCondSignal(&cond->cond);
×
1272
  }
1273

1274
  tsdbTrace("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
9,786✔
1275
  return;
9,786✔
1276
}
1277

1278
struct SFileSetReader {
1279
  STsdb     *pTsdb;
1280
  STFileSet *pFileSet;
1281
  int32_t    fid;
1282
  int64_t    startTime;
1283
  int64_t    endTime;
1284
  int64_t    lastCompactTime;
1285
  int64_t    totalSize;
1286
};
1287

1288
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
1✔
1289
  if (pVnode == NULL || ppReader == NULL) {
1!
1290
    return TSDB_CODE_INVALID_PARA;
×
1291
  }
1292

1293
  STsdb *pTsdb = ((SVnode *)pVnode)->pTsdb;
1✔
1294

1295
  (*ppReader) = taosMemoryCalloc(1, sizeof(struct SFileSetReader));
1!
1296
  if (*ppReader == NULL) {
1!
1297
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, __LINE__,
×
1298
              tstrerror(terrno));
1299
    return terrno;
×
1300
  }
1301

1302
  (*ppReader)->pTsdb = pTsdb;
1✔
1303
  (*ppReader)->fid = INT32_MIN;
1✔
1304
  (*ppReader)->pFileSet = NULL;
1✔
1305

1306
  return TSDB_CODE_SUCCESS;
1✔
1307
}
1308

1309
extern bool tsdbShouldCompact(const STFileSet *pFileSet, int32_t vgId);
1310

1311
#ifndef TD_ENTERPRISE
1312
bool tsdbShouldCompact(const STFileSet *pFileSet, int32_t vgId) { return false; }
1313
#endif
1314

1315
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
2✔
1316
  STsdb  *pTsdb = pReader->pTsdb;
2✔
1317
  int32_t code = TSDB_CODE_SUCCESS;
2✔
1318

1319
  tsdbTFileSetClear(&pReader->pFileSet);
2✔
1320

1321
  STFileSet *fset = &(STFileSet){
2✔
1322
      .fid = pReader->fid,
2✔
1323
  };
1324

1325
  STFileSet **fsetPtr = TARRAY2_SEARCH(pReader->pTsdb->pFS->fSetArr, &fset, tsdbTFileSetCmprFn, TD_GT);
2✔
1326
  if (fsetPtr == NULL) {
2✔
1327
    pReader->fid = INT32_MAX;
1✔
1328
    return TSDB_CODE_NOT_FOUND;
1✔
1329
  }
1330

1331
  // ref file set
1332
  code = tsdbTFileSetInitRef(pReader->pTsdb, *fsetPtr, &pReader->pFileSet);
1✔
1333
  if (code) return code;
1!
1334

1335
  // get file set details
1336
  pReader->fid = pReader->pFileSet->fid;
1✔
1337
  tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
1✔
1338
  pReader->lastCompactTime = pReader->pFileSet->lastCompact;
1✔
1339
  pReader->totalSize = 0;
1✔
1340
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
5✔
1341
    STFileObj *fobj = pReader->pFileSet->farr[i];
4✔
1342
    if (fobj) {
4!
1343
      pReader->totalSize += fobj->f->size;
×
1344
    }
1345
  }
1346
  SSttLvl *lvl;
1347
  TARRAY2_FOREACH(pReader->pFileSet->lvlArr, lvl) {
2✔
1348
    STFileObj *fobj;
1349
    TARRAY2_FOREACH(lvl->fobjArr, fobj) { pReader->totalSize += fobj->f->size; }
2✔
1350
  }
1351

1352
  return code;
1✔
1353
}
1354

1355
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader) {
2✔
1356
  int32_t code = TSDB_CODE_SUCCESS;
2✔
1357
  (void)taosThreadMutexLock(&pReader->pTsdb->mutex);
2✔
1358
  code = tsdbFileSetReaderNextNoLock(pReader);
2✔
1359
  (void)taosThreadMutexUnlock(&pReader->pTsdb->mutex);
2✔
1360
  return code;
2✔
1361
}
1362

1363
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value) {
6✔
1364
  const char *fieldName;
1365

1366
  if (pReader->fid == INT32_MIN || pReader->fid == INT32_MAX) {
6!
1367
    return TSDB_CODE_INVALID_PARA;
×
1368
  }
1369

1370
  fieldName = "fileset_id";
6✔
1371
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
6✔
1372
    *(int32_t *)value = pReader->fid;
1✔
1373
    return TSDB_CODE_SUCCESS;
1✔
1374
  }
1375

1376
  fieldName = "start_time";
5✔
1377
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
5✔
1378
    *(int64_t *)value = pReader->startTime;
1✔
1379
    return TSDB_CODE_SUCCESS;
1✔
1380
  }
1381

1382
  fieldName = "end_time";
4✔
1383
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
4✔
1384
    *(int64_t *)value = pReader->endTime;
1✔
1385
    return TSDB_CODE_SUCCESS;
1✔
1386
  }
1387

1388
  fieldName = "total_size";
3✔
1389
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
3✔
1390
    *(int64_t *)value = pReader->totalSize;
1✔
1391
    return TSDB_CODE_SUCCESS;
1✔
1392
  }
1393

1394
  fieldName = "last_compact_time";
2✔
1395
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
2✔
1396
    *(int64_t *)value = pReader->lastCompactTime;
1✔
1397
    return TSDB_CODE_SUCCESS;
1✔
1398
  }
1399

1400
  fieldName = "should_compact";
1✔
1401
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1!
1402
    *(char *)value = tsdbShouldCompact(pReader->pFileSet, pReader->pTsdb->pVnode->config.vgId);
1✔
1403
    return TSDB_CODE_SUCCESS;
1✔
1404
  }
1405

1406
  fieldName = "details";
×
1407
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1408
    // TODO
1409
    return TSDB_CODE_SUCCESS;
×
1410
  }
1411

1412
  return TSDB_CODE_INVALID_PARA;
×
1413
}
1414

1415
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {
1✔
1416
  if (ppReader == NULL || *ppReader == NULL) {
1!
1417
    return;
×
1418
  }
1419

1420
  tsdbTFileSetClear(&(*ppReader)->pFileSet);
1✔
1421
  taosMemoryFree(*ppReader);
1!
1422

1423
  *ppReader = NULL;
1✔
1424
  return;
1✔
1425
}
1426

1427
static FORCE_INLINE void getLevelSize(const STFileObj *fObj, int64_t szArr[TFS_MAX_TIERS]) {
1428
  if (fObj == NULL) return;
256!
1429

1430
  int64_t sz = fObj->f->size;
196✔
1431
  // level == 0, primary storage
1432
  // level == 1, second storage,
1433
  // level == 2, third storage
1434
  int32_t level = fObj->f->did.level;
196✔
1435
  if (level >= 0 && level < TFS_MAX_TIERS) {
196!
1436
    szArr[level] += sz;
196✔
1437
  }
1438
}
1439

1440
static FORCE_INLINE int32_t tsdbGetFsSizeImpl(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
1441
  int32_t code = 0;
58✔
1442
  int64_t levelSize[TFS_MAX_TIERS] = {0};
58✔
1443
  int64_t s3Size = 0;
58✔
1444

1445
  const STFileSet *fset;
1446
  const SSttLvl   *stt = NULL;
58✔
1447
  const STFileObj *fObj = NULL;
58✔
1448

1449
  SVnodeCfg *pCfg = &tsdb->pVnode->config;
58✔
1450
  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
58✔
1451

1452
  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
148✔
1453
    for (int32_t t = TSDB_FTYPE_MIN; t < TSDB_FTYPE_MAX; ++t) {
450✔
1454
      getLevelSize(fset->farr[t], levelSize);
360✔
1455
    }
1456

1457
    TARRAY2_FOREACH(fset->lvlArr, stt) {
136✔
1458
      TARRAY2_FOREACH(stt->fobjArr, fObj) { getLevelSize(fObj, levelSize); }
92✔
1459
    }
1460

1461
    fObj = fset->farr[TSDB_FTYPE_DATA];
90✔
1462
    if (fObj) {
90✔
1463
      int32_t lcn = fObj->f->lcn;
50✔
1464
      if (lcn > 1) {
50!
1465
        s3Size += ((lcn - 1) * chunksize);
×
1466
      }
1467
    }
1468
  }
1469

1470
  pInfo->l1Size = levelSize[0];
58✔
1471
  pInfo->l2Size = levelSize[1];
58✔
1472
  pInfo->l3Size = levelSize[2];
58✔
1473
  pInfo->s3Size = s3Size;
58✔
1474
  return code;
58✔
1475
}
1476
int32_t tsdbGetFsSize(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
58✔
1477
  int32_t code = 0;
58✔
1478

1479
  (void)taosThreadMutexLock(&tsdb->mutex);
58✔
1480
  code = tsdbGetFsSizeImpl(tsdb, pInfo);
58✔
1481
  (void)taosThreadMutexUnlock(&tsdb->mutex);
58✔
1482
  return code;
58✔
1483
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc