• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.9
/source/dnode/vnode/src/tsdb/tsdbFS2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "tsdbFS2.h"
18
#include "tsdbUpgrade.h"
19
#include "vnd.h"
20

21
#define BLOCK_COMMIT_FACTOR 3
22

23
typedef struct STFileHashEntry {
24
  struct STFileHashEntry *next;
25
  char                    fname[TSDB_FILENAME_LEN];
26
} STFileHashEntry;
27

28
typedef struct {
29
  int32_t           numFile;
30
  int32_t           numBucket;
31
  STFileHashEntry **buckets;
32
} STFileHash;
33

34
static const char *gCurrentFname[] = {
35
    [TSDB_FCURRENT] = "current.json",
36
    [TSDB_FCURRENT_C] = "current.c.json",
37
    [TSDB_FCURRENT_M] = "current.m.json",
38
};
39

40
/*
 * Allocate and initialise an empty file system object for `pTsdb`.
 *
 * On success fs[0] points to the new object, its state is
 * TSDB_FS_STATE_NORMAL, its edit id counter is 0 and both file-set arrays are
 * empty. On failure fs[0] is NULL, so a follow-up destroy_fs(fs) is a no-op.
 *
 * Returns 0 on success, terrno when the allocation fails, or the code
 * returned by tsem_init().
 */
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
  if (fs[0] == NULL) {
    return terrno;
  }

  fs[0]->tsdb = pTsdb;
  int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
  if (code) {
    taosMemoryFree(fs[0]);
    // Do not hand a dangling pointer back: the caller runs destroy_fs() on
    // failure, which would otherwise touch and free this memory again.
    fs[0] = NULL;
    return code;
  }

  fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
  fs[0]->neid = 0;
  TARRAY2_INIT(fs[0]->fSetArr);
  TARRAY2_INIT(fs[0]->fSetArrTmp);

  return 0;
}
60

61
/*
 * Release the file system object referenced by fs[0] and reset fs[0] to NULL.
 * Does nothing when fs[0] is already NULL. The arrays are destroyed without an
 * element callback, so the file sets they hold are not released here.
 */
static void destroy_fs(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) {
    return;
  }

  TARRAY2_DESTROY(pFS->fSetArr, NULL);
  TARRAY2_DESTROY(pFS->fSetArrTmp, NULL);

  int32_t semRet = tsem_destroy(&pFS->canEdit);
  if (semRet != 0) {
    tsdbError("failed to destroy semaphore");
  }

  taosMemoryFree(pFS);
  fs[0] = NULL;
}
72

73
/*
 * Build the path of a manifest file into `fname`:
 *   <primary path><sep><tsdb name><sep><manifest base name for ftype>
 *
 * `fname` is written with TSDB_FILENAME_LEN as its size bound, so the caller
 * must supply a buffer of at least that many bytes.
 */
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
  vnodeGetPrimaryPath(pTsdb->pVnode, false, fname, TSDB_FILENAME_LEN);

  // append the tsdb directory and the manifest name after the primary path
  int32_t used = strlen(fname);
  char   *tail = fname + used;
  snprintf(tail, TSDB_FILENAME_LEN - used - 1, "%s%s%s%s", TD_DIRSEP, pTsdb->name, TD_DIRSEP, gCurrentFname[ftype]);
}
79,898✔
81

82
static int32_t save_json(const cJSON *json, const char *fname) {
23,083✔
83
  int32_t   code = 0;
23,083✔
84
  int32_t   lino;
85
  char     *data = NULL;
23,083✔
86
  TdFilePtr fp = NULL;
23,083✔
87

88
  data = cJSON_PrintUnformatted(json);
23,083✔
89
  if (data == NULL) {
23,085!
90
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
91
  }
92

93
  fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
23,085✔
94
  if (fp == NULL) {
23,081!
95
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
96
  }
97

98
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
23,081!
99
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
100
  }
101

102
  if (taosFsyncFile(fp) < 0) {
23,087!
103
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
104
  }
105

106
_exit:
23,085✔
107
  if (code) {
23,085!
108
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
109
  }
110
  taosMemoryFree(data);
23,087!
111
  taosCloseFileWithLog(&fp);
23,087!
112
  return code;
23,087✔
113
}
114

115
/*
 * Read the whole file `fname` and parse it as JSON.
 *
 * On success json[0] holds the parsed tree, which the caller releases with
 * cJSON_Delete(). On failure json[0] is NULL.
 *
 * Returns 0 on success; terrno (or the stat error code) for file and memory
 * failures; TSDB_CODE_FILE_CORRUPTED when the content is not valid JSON.
 */
static int32_t load_json(const char *fname, cJSON **json) {
  int32_t   code = 0;
  int32_t   lino = 0;
  char     *data = NULL;
  int64_t   size = 0;
  int64_t   nread = 0;
  TdFilePtr fp = NULL;

  fp = taosOpenFile(fname, TD_FILE_READ);
  if (fp == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  code = taosFStatFile(fp, &size, NULL);
  if (code != 0) {
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // one extra byte for the terminating NUL
  data = taosMemoryMalloc(size + 1);
  if (data == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  nread = taosReadFile(fp, data, size);
  if (nread < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  // Terminate at the bytes actually read so that a short read never exposes
  // uninitialised buffer content to the parser.
  data[nread] = '\0';

  json[0] = cJSON_Parse(data);
  if (json[0] == NULL) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    // Log the line handed to TSDB_CHECK_CODE at the failing step; __LINE__
    // here would always name this log statement instead.
    tsdbError("%s failed at %s:%d since %s", __func__, fname, lino, tstrerror(code));
    json[0] = NULL;
  }
  taosCloseFileWithLog(&fp);
  taosMemoryFree(data);
  return code;
}
155

156
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
23,086✔
157
  int32_t code = 0;
23,086✔
158
  int32_t lino = 0;
23,086✔
159

160
  cJSON *json = cJSON_CreateObject();
23,086✔
161
  if (json == NULL) {
23,083!
162
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
163
  }
164

165
  // fmtv
166
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
23,083!
167
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
168
  }
169

170
  // fset
171
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
23,080✔
172
  if (!ajson) {
23,083!
173
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
174
  }
175
  const STFileSet *fset;
176
  TARRAY2_FOREACH(arr, fset) {
51,439✔
177
    cJSON *item = cJSON_CreateObject();
28,356✔
178
    if (!item) {
28,355!
179
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
180
    }
181
    (void)cJSON_AddItemToArray(ajson, item);
28,355✔
182

183
    code = tsdbTFileSetToJson(fset, item);
28,357✔
184
    TSDB_CHECK_CODE(code, lino, _exit);
28,356!
185
  }
186

187
  code = save_json(json, fname);
23,083✔
188
  TSDB_CHECK_CODE(code, lino, _exit);
23,087!
189

190
_exit:
23,087✔
191
  if (code) {
23,087!
192
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
193
  }
194
  cJSON_Delete(json);
23,087✔
195
  return code;
23,087✔
196
}
197

198
/*
 * Load the manifest `fname` into `arr`.
 *
 * `arr` is cleared first. The manifest must carry "fmtv" == 1 and an "fset"
 * array; every array element is converted with tsdbJsonToTFileSet() and the
 * result is sorted with tsdbTFileSetCmprFn.
 *
 * Returns 0 on success, TSDB_CODE_FILE_CORRUPTED when the manifest layout is
 * not as expected, or the code from the failing helper. On failure `arr` may
 * hold the file sets parsed so far.
 */
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
  int32_t code = 0;
  int32_t lino = 0;

  TARRAY2_CLEAR(arr, tsdbTFileSetClear);

  // load json
  cJSON *json = NULL;
  code = load_json(fname, &json);
  TSDB_CHECK_CODE(code, lino, _exit);

  // parse json
  const cJSON *item1;

  /* fmtv */
  item1 = cJSON_GetObjectItem(json, "fmtv");
  if (cJSON_IsNumber(item1)) {
    if (item1->valuedouble != 1) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }
  } else {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  /* fset */
  item1 = cJSON_GetObjectItem(json, "fset");
  if (cJSON_IsArray(item1)) {
    const cJSON *item2;
    cJSON_ArrayForEach(item2, item1) {
      STFileSet *fset = NULL;
      code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(arr, fset);
      if (code) {
        // the array did not take ownership, release the parsed file set
        tsdbTFileSetClear(&fset);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    TARRAY2_SORT(arr, tsdbTFileSetCmprFn);
  } else {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("%s failed at %s:%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
  }
  if (json) {
    cJSON_Delete(json);
  }
  return code;
}
249

250
/*
 * Bring the live file-set array (fs->fSetArr) in line with the edited copy
 * (fs->fSetArrTmp) by walking both arrays in fid order:
 *   - fid only in the live array  -> the file set is removed
 *   - fid only in the edited copy -> a copy of it is inserted into the live array
 *   - fid in both                 -> the edit is applied to the live file set
 *
 * Returns 0 on success or the code of the first failing step; changes made
 * before the failure are not rolled back.
 */
static int32_t apply_commit(STFileSystem *fs) {
  int32_t        code = 0;
  int32_t        lino = 0;
  TFileSetArray *fsetArray1 = fs->fSetArr;
  TFileSetArray *fsetArray2 = fs->fSetArrTmp;
  int32_t        i1 = 0, i2 = 0;

  while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
    STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_GET(fsetArray1, i1) : NULL;
    STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_GET(fsetArray2, i2) : NULL;

    if (fset1 && fset2 && fset1->fid == fset2->fid) {
      // edit
      code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
      TSDB_CHECK_CODE(code, lino, _exit);
      i1++;
      i2++;
    } else if (fset1 && (fset2 == NULL || fset1->fid < fset2->fid)) {
      // delete fset1; the index still advances past its slot
      tsdbTFileSetRemove(fset1);
      i1++;
    } else {
      // create new file set with fid of fset2->fid
      STFileSet *fsetNew = NULL;
      code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fsetNew);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_SORT_INSERT(fsetArray1, fsetNew, tsdbTFileSetCmprFn);
      if (code) {
        // the array did not take ownership, release the copy
        tsdbTFileSetClear(&fsetNew);
      }
      TSDB_CHECK_CODE(code, lino, _exit);

      // the copy now sits at i1 (ahead of the old element, if any): skip it
      i1++;
      i2++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
302

303
/*
 * Make the pending edit permanent: rename the temporary manifest
 * (current.c.json for a commit edit, current.m.json otherwise) over
 * current.json, then apply the edited state to the live file-set array.
 *
 * Returns 0 on success, the code from taosRenameFile() or the code from
 * apply_commit().
 */
static int32_t commit_edit(STFileSystem *fs) {
  char current[TSDB_FILENAME_LEN];
  char current_t[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, current, TSDB_FCURRENT);
  if (fs->etype == TSDB_FEDIT_COMMIT) {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
  } else {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
  }

  int32_t code = 0;
  int32_t lino = 0;

  // Keep the rename result in `code`: the _exit path inspects and returns it.
  code = taosRenameFile(current_t, current);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = apply_commit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  return code;
}
330

331
// static int32_t
332
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);

/* Aborting an edit only needs the scan-and-fix pass to run again. */
static int32_t apply_abort(STFileSystem *fs) {
  int32_t code = tsdbFSDoSanAndFix(fs);
  return code;
}
×
334

335
/*
 * Discard the pending edit: delete the temporary manifest (current.c.json for
 * a commit edit, current.m.json otherwise) and run apply_abort().
 *
 * Returns 0 on success or the code of the failing step.
 */
static int32_t abort_edit(STFileSystem *fs) {
  int32_t code;
  int32_t lino;
  char    fname[TSDB_FILENAME_LEN];

  // pick the temporary manifest that belongs to the edit type
  current_fname(fs->tsdb, fname, (fs->etype == TSDB_FEDIT_COMMIT) ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);

  code = taosRemoveFile(fname);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = apply_abort(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  } else {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
363

364
/*
 * Check that the file behind `fobj` is present on disk.
 *
 * When fobj->fname is missing, tsSsEnabled is set and the file's lcn is
 * greater than 1, the name produced by tsdbTFileLastChunkName() is accepted
 * in its place.
 *
 * Returns 0 when a file was found, TSDB_CODE_FILE_CORRUPTED otherwise (the
 * missing name is logged).
 */
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
  // check file existence
  if (taosCheckExistFile(fobj->fname)) {
    return 0;
  }

  if (tsSsEnabled && fobj->f->lcn > 1) {
    char lastChunk[TSDB_FILENAME_LEN];
    tsdbTFileLastChunkName(fs->tsdb, fobj->f, lastChunk);
    if (taosCheckExistFile(lastChunk)) {
      return 0;
    }

    tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, lastChunk);
    return TSDB_CODE_FILE_CORRUPTED;
  }

  tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname);
  return TSDB_CODE_FILE_CORRUPTED;
}
393

394
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
395

396
/*
 * Insert `fname` into `hash`. The new entry is pushed at the head of its
 * bucket chain; no check for an already present equal name is made.
 *
 * Returns 0 on success or terrno when the entry cannot be allocated.
 */
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
  STFileHashEntry *node = taosMemoryMalloc(sizeof(*node));
  if (node == NULL) {
    return terrno;
  }
  tstrncpy(node->fname, fname, TSDB_FILENAME_LEN);

  uint32_t bucket = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;

  // link in front of the existing chain
  node->next = hash->buckets[bucket];
  hash->buckets[bucket] = node;
  hash->numFile++;

  return 0;
}
410

411
/*
 * Fill `hash` with the name of every file the file system refers to:
 * current.json, every data-type file of every file set (plus, for a data file
 * with lcn > 0, the name with its extension replaced by "<lcn>.data"), and
 * every stt file.
 *
 * Returns 0 on success. On failure the hash is already destroyed and the
 * error code is returned.
 */
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  // init hash table
  hash->numFile = 0;
  hash->numBucket = 4096;
  hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
  if (hash->buckets == NULL) {
    // no bucket array exists, so the cleanup at _exit must not walk any bucket
    hash->numBucket = 0;
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // current.json
  current_fname(fs->tsdb, fname, TSDB_FCURRENT);
  code = tsdbFSAddEntryToFileObjHash(hash, fname);
  TSDB_CHECK_CODE(code, lino, _exit);

  // other
  STFileSet *fset = NULL;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    // data file
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
      if (fset->farr[i] != NULL) {
        code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
        TSDB_CHECK_CODE(code, lino, _exit);

        if (TSDB_FTYPE_DATA == i && fset->farr[i]->f->lcn > 0) {
          STFileObj *fobj = fset->farr[i];
          int32_t    lcn = fobj->f->lcn;
          char       lcn_name[TSDB_FILENAME_LEN];

          // bound every write by the real size of lcn_name
          snprintf(lcn_name, sizeof(lcn_name), "%s", fobj->fname);
          char *dot = strrchr(lcn_name, '.');
          if (dot) {
            // replace the extension with "<lcn>.data"
            snprintf(dot + 1, sizeof(lcn_name) - (size_t)(dot + 1 - lcn_name), "%d.data", lcn);

            code = tsdbFSAddEntryToFileObjHash(hash, lcn_name);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        }
      }
    }

    // stt file
    SSttLvl *lvl = NULL;
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      STFileObj *fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
    tsdbFSDestroyFileObjHash(hash);
  }
  return code;
}
473

474
/*
 * Look up `fname` in `hash`.
 * Returns the matching entry, or NULL when the name is not stored.
 */
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
  uint32_t bucket = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;

  // walk the bucket chain until an equal name turns up
  for (STFileHashEntry *node = hash->buckets[bucket]; node != NULL; node = node->next) {
    if (strcmp(node->fname, fname) == 0) {
      return node;
    }
  }

  return NULL;
}
487

488
/*
 * Free every entry and the bucket array of `hash`, then zero the structure.
 * Safe to call on a hash whose bucket array was never allocated.
 */
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
  // A failed bucket allocation leaves `buckets` NULL while numBucket may
  // already be set; only walk the chains when the array really exists.
  if (hash->buckets != NULL) {
    for (int32_t i = 0; i < hash->numBucket; i++) {
      STFileHashEntry *entry = hash->buckets[i];
      while (entry) {
        STFileHashEntry *next = entry->next;
        taosMemoryFree(entry);
        entry = next;
      }
    }
  }
  taosMemoryFree(hash->buckets);
  memset(hash, 0, sizeof(*hash));
}
2,648✔
500

501
/*
 * Verify the files of every file set and, when all of them are present,
 * delete files in the tsdb directory that no file set references.
 *
 * - Nothing is done for a mounted vnode.
 * - A missing file lowers the owning file set's maxVerValid and marks the
 *   file system TSDB_FS_STATE_INCOMPLETE; in that case no file is deleted and
 *   0 is returned.
 *
 * Returns 0 on success or the code from opening the directory / building the
 * name hash.
 */
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t hasMissing = false;

  if (fs->tsdb->pVnode->mounted) goto _exit;

  {  // scan each file
    STFileSet *fset = NULL;
    TARRAY2_FOREACH(fs->fSetArr, fset) {
      // data file
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
        STFileObj *fobj = fset->farr[ftype];
        if (fobj == NULL) continue;

        code = tsdbFSDoScanAndFixFile(fs, fobj);
        if (code) {
          if (fobj->f->minVer <= fobj->f->maxVer) {
            fset->maxVerValid = TMIN(fset->maxVerValid, fobj->f->minVer - 1);
          } else {
            fset->maxVerValid = -1;
          }
          hasMissing = true;
        }
      }

      // stt file
      SSttLvl *lvl;
      TARRAY2_FOREACH(fset->lvlArr, lvl) {
        STFileObj *fobj;
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
          code = tsdbFSDoScanAndFixFile(fs, fobj);
          if (code) {
            if (fobj->f->minVer <= fobj->f->maxVer) {
              fset->maxVerValid = TMIN(fset->maxVerValid, fobj->f->minVer - 1);
            } else {
              fset->maxVerValid = -1;
            }
            hasMissing = true;
          }
        }
      }
    }
  }

  if (hasMissing) {
    // an incomplete file system is reported through fsstate, not as an error
    tsdbError("vgId:%d, not to clear dangling files due to fset incompleteness", TD_VID(fs->tsdb->pVnode));
    fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
    code = 0;
    goto _exit;
  }

  {  // clear unreferenced files
    STfsDir *dir = NULL;
    TAOS_CHECK_GOTO(tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path, &dir), &lino, _exit);

    STFileHash referenced = {0};
    code = tsdbFSCreateFileObjHash(fs, &referenced);
    if (code) goto _close_dir;

    const STfsFile *file = NULL;
    while ((file = tfsReaddir(dir)) != NULL) {
      if (taosIsDir(file->aname)) continue;

      if (tsdbFSGetFileObjHashEntry(&referenced, file->aname) == NULL) {
        tsdbRemoveFile(file->aname);
      }
    }

    tsdbFSDestroyFileObjHash(&referenced);

  _close_dir:
    tfsClosedir(dir);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
573

574
/*
 * Recompute fs->neid as the largest commit id found in any file set (0 when
 * there is none), then run the scan-and-fix pass.
 *
 * Returns the code from tsdbFSDoSanAndFix().
 */
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
  int32_t          code = 0;
  int32_t          lino = 0;
  const STFileSet *fset;

  // get max commit id
  fs->neid = 0;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset));
  }

  // scan and fix
  code = tsdbFSDoSanAndFix(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
594

595
/*
 * Rebuild the working copy (fs->fSetArrTmp) as a copy of the live file-set
 * array (fs->fSetArr). The previous content of the working copy is cleared
 * first.
 *
 * Returns 0 on success or the code of the failing copy/append; in that case
 * the working copy holds the file sets copied so far.
 */
static int32_t tsdbFSDupState(STFileSystem *fs) {
  int32_t code;

  const TFileSetArray *src = fs->fSetArr;
  TFileSetArray       *dst = fs->fSetArrTmp;

  TARRAY2_CLEAR(dst, tsdbTFileSetClear);

  const STFileSet *fset1;
  TARRAY2_FOREACH(src, fset1) {
    STFileSet *fset2 = NULL;
    code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
    if (code) return code;

    code = TARRAY2_APPEND(dst, fset2);
    if (code) {
      // the array did not take ownership, release the copy
      tsdbTFileSetClear(&fset2);
      return code;
    }
  }

  return 0;
}
614

615
/*
 * Bring the file system to a consistent state at open time.
 *
 * - No current.json: an empty manifest is written and nothing else happens.
 * - current.json present: it is loaded; then
 *     * if current.c.json exists, the interrupted commit is aborted when
 *       `rollback` is set, otherwise it is loaded and committed;
 *     * else if current.m.json exists, that edit is always aborted;
 *   finally the working copy is rebuilt and the scan-and-fix pass runs.
 *
 * Returns 0 on success or the code of the failing step.
 */
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = fs->tsdb;

  char fCurrent[TSDB_FILENAME_LEN];
  char cCurrent[TSDB_FILENAME_LEN];
  char mCurrent[TSDB_FILENAME_LEN];

  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);

  if (!taosCheckExistFile(fCurrent)) {
    // fresh file system: persist the (empty) state
    code = save_fs(fs->fSetArr, fCurrent);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  // current.json exists
  code = load_fs(pTsdb, fCurrent, fs->fSetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosCheckExistFile(cCurrent)) {
    // current.c.json exists
    fs->etype = TSDB_FEDIT_COMMIT;
    if (rollback) {
      code = abort_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = commit_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else if (taosCheckExistFile(mCurrent)) {
    // current.m.json exists
    fs->etype = TSDB_FEDIT_MERGE;
    code = abort_edit(fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFSDupState(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSScanAndFix(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
671

672
/* Clear every file set held by the live array and by the working copy. */
static void close_file_system(STFileSystem *fs) {
  // live state first, then the edit scratch copy
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
}
12,409✔
676

677
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
×
678
  if (pSet1->fid < pSet2->fid) {
×
679
    return -1;
×
680
  } else if (pSet1->fid > pSet2->fid) {
×
681
    return 1;
×
682
  }
683
  return 0;
×
684
}
685

686
/*
 * Apply the operations in `opArray` to the working copy (fs->fSetArrTmp).
 *
 * The working copy is first rebuilt from the live state. Each operation is
 * applied to the file set with the same fid; a file set is created and
 * inserted when none exists yet. Whenever the fid differs from the previous
 * operation's, the file set's lastCommit / lastCompact / lastMigrate
 * timestamp is set to "now" for the matching `etype`. Finally empty stt
 * levels and empty file sets are dropped from the working copy.
 *
 * Returns 0 on success or the code of the failing step.
 */
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSDupState(fs);
  if (code) return code;

  TFileSetArray  *fsetArray = fs->fSetArrTmp;
  STFileSet      *fset = NULL;
  const STFileOp *op;
  int32_t         fid = INT32_MIN;
  TSKEY           now = taosGetTimestampMs();
  TARRAY2_FOREACH_PTR(opArray, op) {
    if (!fset || fset->fid != op->fid) {
      // look up the file set by fid, using a stack object as the search key
      STFileSet tfset = {.fid = op->fid};
      fset = &tfset;
      STFileSet **fsetPtr = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
      fset = (fsetPtr == NULL) ? NULL : *fsetPtr;

      if (!fset) {
        code = tsdbTFileSetInit(op->fid, &fset);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
        if (code) {
          // the array did not take ownership, release the new file set
          tsdbTFileSetClear(&fset);
        }
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (fid != op->fid) {
      fid = op->fid;
      if (etype == TSDB_FEDIT_COMMIT) {
        fset->lastCommit = now;
      } else if (etype == TSDB_FEDIT_COMPACT) {
        fset->lastCompact = now;
      } else if (etype == TSDB_FEDIT_SSMIGRATE) {
        fset->lastMigrate = now;
      }
    }
  }

  // remove empty stt levels and empty file sets
  int32_t i = 0;
  while (i < TARRAY2_SIZE(fsetArray)) {
    fset = TARRAY2_GET(fsetArray, i);

    SSttLvl *lvl;
    int32_t  j = 0;
    while (j < TARRAY2_SIZE(fset->lvlArr)) {
      lvl = TARRAY2_GET(fset->lvlArr, j);

      if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
        // removal shifts the next level into slot j, so j stays put
        TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
      } else {
        j++;
      }
    }

    if (tsdbTFileSetIsEmpty(fset)) {
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
    } else {
      i++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
759

760
// return error code
761
/*
 * Open the file system of `pTsdb`: run the upgrade check, create the file
 * system object and open it. On failure destroy_fs(fs) is called before the
 * error code is returned.
 *
 * Returns 0 on success or the code of the failing step.
 */
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
  int32_t code;
  int32_t lino;

  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = create_fs(pTsdb, fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = open_fs(fs[0], rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
    return code;
  }

  tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  destroy_fs(fs);
  return code;
}
783

784
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
785
extern void tsdbStopAllCompTask(STsdb *tsdb);
786

787
/*
 * Disable background tasks for `pTsdb` and stop the ones already scheduled.
 *
 * Under the tsdb mutex the bgTaskDisabled flag is set, the merge, compact and
 * retention task ids of every file set are collected and each file set's
 * commit block is lifted. After the mutex is released all collected tasks are
 * first cancelled and then waited for.
 *
 * Returns 0 on success or terrno when the task list cannot be built.
 */
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
  STFileSystem *fs = pTsdb->pFS;
  SArray       *asyncTasks = taosArrayInit(0, sizeof(SVATaskID));
  if (asyncTasks == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&pTsdb->mutex);

  // disable
  pTsdb->bgTaskDisabled = true;

  // collect channel
  STFileSet *fset;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    if (taosArrayPush(asyncTasks, &fset->mergeTask) == NULL       //
        || taosArrayPush(asyncTasks, &fset->compactTask) == NULL  //
        || taosArrayPush(asyncTasks, &fset->retentionTask) == NULL) {
      // capture the error before the cleanup calls get a chance to change it
      int32_t code = terrno;
      taosArrayDestroy(asyncTasks);
      (void)taosThreadMutexUnlock(&pTsdb->mutex);
      return code;
    }
    tsdbFSSetBlockCommit(fset, false);
  }

  (void)taosThreadMutexUnlock(&pTsdb->mutex);

  // destroy all channels: pass 0 cancels every task, pass 1 waits for each
  int32_t numTasks = taosArrayGetSize(asyncTasks);
  for (int32_t k = 0; k < 2; k++) {
    for (int32_t i = 0; i < numTasks; i++) {
      SVATaskID *task = taosArrayGet(asyncTasks, i);
      if (k == 0) {
        (void)vnodeACancel(task);
      } else {
        vnodeAWait(task);
      }
    }
  }
  taosArrayDestroy(asyncTasks);

#ifdef TD_ENTERPRISE
  tsdbStopAllCompTask(pTsdb);
#endif
  return 0;
}
832

833
/* Clear the bgTaskDisabled flag of `pTsdb` while holding the tsdb mutex. */
void tsdbEnableBgTask(STsdb *pTsdb) {
  TdThreadMutex *lock = &pTsdb->mutex;

  (void)taosThreadMutexLock(lock);
  pTsdb->bgTaskDisabled = false;
  (void)taosThreadMutexUnlock(lock);
}
96✔
838

839
/*
 * Shut the file system down: stop all background tasks, clear the file sets
 * and free the object. fs[0] is NULL afterwards. A failure to stop the tasks
 * is logged and the shutdown continues. No-op when fs[0] is NULL.
 */
void tsdbCloseFS(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) {
    return;
  }

  int32_t code = tsdbDisableAndCancelAllBgTask(pFS->tsdb);
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID((*fs)->tsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  close_file_system(pFS);
  destroy_fs(fs);
}
851

852
/*
 * Increment the edit id counter (fs->neid) under the tsdb mutex and return
 * the new value.
 */
int64_t tsdbFSAllocEid(STFileSystem *fs) {
  int64_t eid;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  fs->neid += 1;
  eid = fs->neid;
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return eid;
}
858

859
/*
 * Raise the edit id counter (fs->neid) to `cid` when `cid` is larger; the
 * counter never decreases. Done under the tsdb mutex.
 */
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  if (fs->neid < cid) {
    fs->neid = cid;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
}
487✔
864

865
/*
 * Start an edit of type `etype`: wait on the canEdit semaphore, record the
 * edit type, apply `opArray` to the working copy and write the result to the
 * temporary manifest (current.c.json for a commit edit, current.m.json
 * otherwise).
 *
 * A failed semaphore wait is only logged; the edit still proceeds.
 *
 * Returns 0 on success or the code from edit_fs() / save_fs().
 */
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino;
  char    current_t[TSDB_FILENAME_LEN];

  // name of the temporary manifest for this edit type
  current_fname(fs->tsdb, current_t, (etype == TSDB_FEDIT_COMMIT) ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);

  int32_t semRet = tsem_wait(&fs->canEdit);
  if (semRet != 0) {
    tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
  }
  fs->etype = etype;

  // edit
  code = edit_fs(fs, opArray, etype);
  TSDB_CHECK_CODE(code, lino, _exit);

  // save fs
  code = save_fs(fs->fSetArrTmp, current_t);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
              tstrerror(code), etype);
  }
  return code;
}
898

899
/*
 * Set or clear the commit block of `fset`. When the block is cleared and
 * numWaitCommit is positive, the canCommit condition is signalled.
 */
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
  if (block) {
    fset->blockCommit = true;
    return;
  }

  fset->blockCommit = false;
  if (fset->numWaitCommit > 0) {
    (void)taosThreadCondSignal(&fset->canCommit);
  }
}
41,918✔
909

910
/**
 * Wait until commits on file set `fid` are no longer blocked.
 *
 * Under tsdb->mutex: looks up the file set and, if it exists, waits on
 * fset->canCommit while fset->blockCommit is set (the wait is wrapped in the
 * block_commit_time timing metric). If the flag was set on entry, the
 * blocked_commit_count metric is updated by 1. A missing file set is a no-op.
 */
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
  STFileSet *fset;
  bool       wasBlocked = false;

  (void)taosThreadMutexLock(&tsdb->mutex);

  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
  if (fset != NULL) {
    // remember the state on entry, the loop below only returns once it is cleared
    wasBlocked = fset->blockCommit;

    METRICS_TIMING_BLOCK(tsdb->pVnode->writeMetrics.block_commit_time, METRIC_LEVEL_HIGH, {
      while (fset->blockCommit) {
        fset->numWaitCommit++;
        (void)taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
        fset->numWaitCommit--;
      }
    });
  }

  if (wasBlocked) {
    METRICS_UPDATE(tsdb->pVnode->writeMetrics.blocked_commit_count, METRIC_LEVEL_HIGH, 1);
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
933

934
// IMPORTANT: the caller must hold fs->tsdb->mutex
935
int32_t tsdbFSEditCommit(STFileSystem *fs) {
13,343✔
936
  int32_t code = 0;
13,343✔
937
  int32_t lino = 0;
13,343✔
938

939
  // commit
940
  code = commit_edit(fs);
13,343✔
941
  TSDB_CHECK_CODE(code, lino, _exit);
13,343!
942

943
  // schedule merge
944
  int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
13,343✔
945
  if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
13,343✔
946
    STFileSet *fset;
947
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
38,025✔
948
      if (TARRAY2_SIZE(fset->lvlArr) == 0) {
25,102✔
949
        tsdbFSSetBlockCommit(fset, false);
1,079✔
950
        continue;
1,079✔
951
      }
952

953
      SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
24,023✔
954
      if (lvl->level != 0) {
24,023✔
955
        tsdbFSSetBlockCommit(fset, false);
2,637✔
956
        continue;
2,637✔
957
      }
958

959
      // bool    skipMerge = false;
960
      int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
21,386✔
961
      if (numFile >= sttTrigger && (!vnodeATaskValid(&fset->mergeTask))) {
21,386✔
962
        SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
3,269!
963
        if (arg == NULL) {
3,268!
964
          code = terrno;
×
965
          TSDB_CHECK_CODE(code, lino, _exit);
×
966
        }
967

968
        arg->tsdb = fs->tsdb;
3,268✔
969
        arg->fid = fset->fid;
3,268✔
970

971
        code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
3,268✔
972
        TSDB_CHECK_CODE(code, lino, _exit);
3,268!
973
      }
974

975
      if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
21,385✔
976
        tsdbFSSetBlockCommit(fset, true);
66✔
977
      } else {
978
        tsdbFSSetBlockCommit(fset, false);
21,319✔
979
      }
980
    }
981
  }
982

983
_exit:
13,342✔
984
  if (code) {
13,342!
985
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
×
986
  } else {
987
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
13,342!
988
  }
989
  if (tsem_post(&fs->canEdit) != 0) {
13,343!
990
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
991
  }
992
  return code;
13,343✔
993
}
994

995
/**
 * Abort the current edit.
 *
 * Calls abort_edit() and then posts fs->canEdit regardless of its result.
 *
 * @return the code returned by abort_edit().
 */
int32_t tsdbFSEditAbort(STFileSystem *fs) {
  int32_t code;

  code = abort_edit(fs);

  if (tsem_post(&fs->canEdit) != 0) {
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
  }

  return code;
}
1002

1003
/**
 * Look up the file set with id `fid` in fs->fSetArr.
 *
 * @param fset  output; fset[0] receives the matching file set, or NULL when
 *              no file set with that id exists.
 */
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
  // the array stores STFileSet pointers, so the search key is a pointer too
  STFileSet   key = {.fid = fid};
  STFileSet  *pKey = &key;
  STFileSet **found = TARRAY2_SEARCH(fs->fSetArr, &pKey, tsdbTFileSetCmprFn, TD_EQ);

  if (found != NULL) {
    fset[0] = found[0];
  } else {
    fset[0] = NULL;
  }
}
56,566✔
1009

1010
/**
 * Create a snapshot holding a copy (tsdbTFileSetInitCopy) of every file set.
 *
 * The file set array is walked under fs->tsdb->mutex.
 *
 * @param fsetArr  output; fsetArr[0] is a newly allocated array on success
 *                 and NULL when the copy loop fails. Release it with
 *                 tsdbFSDestroyCopySnapshot().
 * @return 0 on success, terrno if the array cannot be allocated, otherwise
 *         the error of the failing copy/append.
 */
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset;
  STFileSet *fset1 = NULL;

  fsetArr[0] = taosMemoryMalloc(sizeof(*fsetArr[0]));
  if (fsetArr[0] == NULL) return terrno;

  TARRAY2_INIT(fsetArr[0]);

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
    if (code) break;

    code = TARRAY2_APPEND(fsetArr[0], fset1);
    if (code) {
      // the copy never made it into the array, so the array destructor below
      // would not release it
      tsdbTFileSetClear(&fset1);
      break;
    }
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  if (code) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}
1037

1038
/**
 * Release a snapshot created by tsdbFSCreateCopySnapshot().
 *
 * Clears every file set, frees the array and resets fsetArr[0] to NULL.
 * Does nothing when fsetArr[0] is already NULL.
 */
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFree(fsetArr[0]);
  fsetArr[0] = NULL;
}
87✔
1045

1046
/**
 * Locked wrapper around tsdbFSCreateRefSnapshotWithoutLock(): takes
 * fs->tsdb->mutex for the duration of the call.
 *
 * @return the code returned by tsdbFSCreateRefSnapshotWithoutLock().
 */
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t code;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return code;
}
1052

1053
/**
 * Create a snapshot that references (tsdbTFileSetInitRef) every file set.
 *
 * No lock is taken here; tsdbFSCreateRefSnapshot() is the variant that holds
 * fs->tsdb->mutex around this call.
 *
 * @param fsetArr  output; fsetArr[0] is a newly allocated array on success
 *                 and NULL when building the snapshot fails.
 * @return 0 on success, terrno if the array cannot be allocated, otherwise
 *         the error of the failing ref/append.
 */
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *src;
  STFileSet *ref;

  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
  if (fsetArr[0] == NULL) {
    return terrno;
  }

  TARRAY2_FOREACH(fs->fSetArr, src) {
    code = tsdbTFileSetInitRef(fs->tsdb, src, &ref);
    if (code != 0) {
      break;
    }

    code = TARRAY2_APPEND(fsetArr[0], ref);
    if (code != 0) {
      // not owned by the array yet, release it here
      tsdbTFileSetClear(&ref);
      break;
    }
  }

  if (code == 0) {
    return code;
  }

  // roll back whatever was collected so far
  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFree(fsetArr[0]);
  fsetArr[0] = NULL;
  return code;
}
1078

1079
/**
 * Release a snapshot created by tsdbFSCreateRefSnapshot[WithoutLock]().
 *
 * Clears every file set, frees the array and leaves fsetArr[0] as NULL.
 * Does nothing when fsetArr[0] is already NULL.
 */
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFreeClear(fsetArr[0]);
  fsetArr[0] = NULL;
}
1,129,906✔
1086

1087
/**
 * Build a hash table fid -> STFileSetRange from a range array.
 *
 * Each range is stored by value under its 4-byte fid. A range that cannot be
 * inserted is reported in the error log and skipped; lookups for that fid then
 * miss, and the callers in this file fall back to their default versions.
 *
 * @return the hash table (release with taosHashCleanup()), or NULL with
 *         terrno set to TSDB_CODE_OUT_OF_MEMORY when it cannot be created.
 */
static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
  int32_t   capacity = TARRAY2_SIZE(pRanges) * 2;
  SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
    STFileSetRange *u = TARRAY2_GET(pRanges, i);
    int32_t         fid = u->fid;
    int32_t         code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
    if (code != 0) {
      // do not drop the failure silently: the range of this fid will be missing
      tsdbError("failed to put range into hash, fid:%d since %s", fid, tstrerror(code));
      continue;
    }
    tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
  }
  return pHash;
}
1103

1104
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
3✔
1105
                                       TFileOpArray *fopArr) {
1106
  int32_t    code = 0;
3✔
1107
  STFileSet *fset;
1108
  STFileSet *fset1;
1109
  SHashObj  *pHash = NULL;
3✔
1110

1111
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
3!
1112
  if (fsetArr == NULL) return terrno;
3!
1113
  TARRAY2_INIT(fsetArr[0]);
3✔
1114

1115
  if (pRanges) {
3!
1116
    pHash = tsdbFSetRangeArrayToHash(pRanges);
3✔
1117
    if (pHash == NULL) {
3!
1118
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1119
      goto _out;
×
1120
    }
1121
  }
1122

1123
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
3✔
1124
  TARRAY2_FOREACH(fs->fSetArr, fset) {
6✔
1125
    int64_t ever = VERSION_MAX;
3✔
1126
    if (pHash) {
3!
1127
      int32_t         fid = fset->fid;
3✔
1128
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
3✔
1129
      if (u) {
3!
1130
        ever = u->sver - 1;
3✔
1131
      }
1132
    }
1133

1134
    code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
3✔
1135
    if (code) break;
3!
1136

1137
    code = TARRAY2_APPEND(fsetArr[0], fset1);
3!
1138
    if (code) break;
3!
1139
  }
1140
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
3✔
1141

1142
_out:
3✔
1143
  if (code) {
3!
1144
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1145
    taosMemoryFree(fsetArr[0]);
×
1146
    fsetArr[0] = NULL;
×
1147
  }
1148
  if (pHash) {
3!
1149
    taosHashCleanup(pHash);
3✔
1150
    pHash = NULL;
3✔
1151
  }
1152
  return code;
3✔
1153
}
1154

1155
/**
 * Release a snapshot created by tsdbFSCreateCopyRangedSnapshot().
 * Delegates to tsdbFSDestroyCopySnapshot().
 */
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) {
  tsdbFSDestroyCopySnapshot(fsetArr);
}
3✔
1156

1157
/**
 * Create a snapshot of version-ranged references (tsdbTFileSetRangeInitRef)
 * to the file sets.
 *
 * Each file set uses the range [sver, ever]; if `pRanges` has an entry for the
 * file set, that entry's sver replaces the start version. File sets whose
 * start version ends up greater than `ever` are skipped. The file set array is
 * walked under fs->tsdb->mutex.
 *
 * @param pRanges  optional per-file-set ranges; may be NULL.
 * @param fsrArr   output; fsrArr[0] is a newly allocated array on success and
 *                 NULL on every failure path. Release it with
 *                 tsdbFSDestroyRefRangedSnapshot().
 * @return 0 on success, terrno if the array cannot be allocated,
 *         TSDB_CODE_OUT_OF_MEMORY if the range hash cannot be built,
 *         otherwise the error of the failing ref/append.
 */
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
                                      TFileSetRangeArray **fsrArr) {
  int32_t         code = 0;
  STFileSet      *fset;
  STFileSetRange *fsr1 = NULL;
  SHashObj       *pHash = NULL;

  fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
  if (fsrArr[0] == NULL) {
    code = terrno;
    goto _out;
  }

  tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
  if (pRanges) {
    pHash = tsdbFSetRangeArrayToHash(pRanges);
    if (pHash == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _out;
    }
  }

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    int64_t sver1 = sver;
    int64_t ever1 = ever;

    if (pHash) {
      int32_t         fid = fset->fid;
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
      if (u) {
        sver1 = u->sver;
        tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
      }
    }

    if (sver1 > ever1) {
      tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
      continue;
    }

    tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);

    code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
    if (code) break;

    code = TARRAY2_APPEND(fsrArr[0], fsr1);
    if (code) break;

    // ownership moved into the array
    fsr1 = NULL;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

_out:
  // single failure cleanup: also reached when building the hash failed, and
  // the array container itself is freed, not only its elements
  if (code && fsrArr[0]) {
    if (fsr1 != NULL) {
      tsdbTFileSetRangeClear(&fsr1);
    }
    TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
    taosMemoryFree(fsrArr[0]);
    fsrArr[0] = NULL;
  }
  if (pHash) {
    taosHashCleanup(pHash);
    pHash = NULL;
  }
  return code;
}
1223

1224
/**
 * Release a snapshot created by tsdbFSCreateRefRangedSnapshot().
 * Delegates to tsdbTFileSetRangeArrayDestroy().
 */
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) {
  tsdbTFileSetRangeArrayDestroy(fsrArr);
}
3✔
1225

1226
/**
 * Claim the task slot of file set `fid` for `task`, waiting while the slot is
 * in use.
 *
 * Slot conds[0] is used for commit tasks and for every task when sttTrigger
 * is 1; all other tasks use conds[1]. Waiting releases tsdb->mutex through
 * taosThreadCondWait(), so the caller is expected to hold that mutex.
 *
 * @param fset  output; receives the file set, or NULL when `fid` does not
 *              exist (in which case nothing is claimed).
 */
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

  tsdbFSGetFSet(tsdb->pFS, fid, fset);
  if (*fset == NULL) {
    return;
  }

  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &(*fset)->conds[slot];

  // re-check after every wake-up until the slot is free
  while (cond->running) {
    cond->numWait++;
    (void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
    cond->numWait--;
  }
  cond->running = true;

  tsdbTrace("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1256

1257
/**
 * Release the task slot claimed by tsdbBeginTaskOnFileSet() for `task` on
 * file set `fid`.
 *
 * The slot is chosen with the same rule as in tsdbBeginTaskOnFileSet(). One
 * waiter is signaled if any is registered. A missing file set is a no-op.
 */
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

  STFileSet *fset = NULL;
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
  if (fset == NULL) {
    return;
  }

  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &fset->conds[slot];

  cond->running = false;
  if (cond->numWait > 0) {
    (void)taosThreadCondSignal(&cond->cond);
  }

  tsdbTrace("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1282

1283
struct SFileSetReader {
1284
  STsdb     *pTsdb;
1285
  STFileSet *pFileSet;
1286
  int32_t    fid;
1287
  int64_t    startTime;
1288
  int64_t    endTime;
1289
  int64_t    lastCompactTime;
1290
  int64_t    totalSize;
1291
};
1292

1293
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
×
1294
  if (pVnode == NULL || ppReader == NULL) {
×
1295
    return TSDB_CODE_INVALID_PARA;
×
1296
  }
1297

1298
  STsdb *pTsdb = ((SVnode *)pVnode)->pTsdb;
×
1299

1300
  (*ppReader) = taosMemoryCalloc(1, sizeof(struct SFileSetReader));
×
1301
  if (*ppReader == NULL) {
×
1302
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, __LINE__,
×
1303
              tstrerror(terrno));
1304
    return terrno;
×
1305
  }
1306

1307
  (*ppReader)->pTsdb = pTsdb;
×
1308
  (*ppReader)->fid = INT32_MIN;
×
1309
  (*ppReader)->pFileSet = NULL;
×
1310

1311
  return TSDB_CODE_SUCCESS;
×
1312
}
1313

1314
/* Reports whether a file set should be compacted; used for the "should_compact" entry field. */
extern bool tsdbShouldCompact(const STFileSet *pFileSet, int32_t vgId);

#ifndef TD_ENTERPRISE
/* Fallback used when TD_ENTERPRISE is not defined: never asks for compaction. */
bool tsdbShouldCompact(const STFileSet *pFileSet, int32_t vgId) {
  (void)pFileSet;
  (void)vgId;
  return false;
}
#endif
1319

1320
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
×
1321
  STsdb  *pTsdb = pReader->pTsdb;
×
1322
  int32_t code = TSDB_CODE_SUCCESS;
×
1323

1324
  tsdbTFileSetClear(&pReader->pFileSet);
×
1325

1326
  STFileSet *fset = &(STFileSet){
×
1327
      .fid = pReader->fid,
×
1328
  };
1329

1330
  STFileSet **fsetPtr = TARRAY2_SEARCH(pReader->pTsdb->pFS->fSetArr, &fset, tsdbTFileSetCmprFn, TD_GT);
×
1331
  if (fsetPtr == NULL) {
×
1332
    pReader->fid = INT32_MAX;
×
1333
    return TSDB_CODE_NOT_FOUND;
×
1334
  }
1335

1336
  // ref file set
1337
  code = tsdbTFileSetInitRef(pReader->pTsdb, *fsetPtr, &pReader->pFileSet);
×
1338
  if (code) return code;
×
1339

1340
  // get file set details
1341
  pReader->fid = pReader->pFileSet->fid;
×
1342
  tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
×
1343
  pReader->lastCompactTime = pReader->pFileSet->lastCompact;
×
1344
  pReader->totalSize = 0;
×
1345
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
×
1346
    STFileObj *fobj = pReader->pFileSet->farr[i];
×
1347
    if (fobj) {
×
1348
      pReader->totalSize += fobj->f->size;
×
1349
    }
1350
  }
1351
  SSttLvl *lvl;
1352
  TARRAY2_FOREACH(pReader->pFileSet->lvlArr, lvl) {
×
1353
    STFileObj *fobj;
1354
    TARRAY2_FOREACH(lvl->fobjArr, fobj) { pReader->totalSize += fobj->f->size; }
×
1355
  }
1356

1357
  return code;
×
1358
}
1359

1360
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader) {
×
1361
  int32_t code = TSDB_CODE_SUCCESS;
×
1362
  (void)taosThreadMutexLock(&pReader->pTsdb->mutex);
×
1363
  code = tsdbFileSetReaderNextNoLock(pReader);
×
1364
  (void)taosThreadMutexUnlock(&pReader->pTsdb->mutex);
×
1365
  return code;
×
1366
}
1367

1368
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value) {
×
1369
  const char *fieldName;
1370

1371
  if (pReader->fid == INT32_MIN || pReader->fid == INT32_MAX) {
×
1372
    return TSDB_CODE_INVALID_PARA;
×
1373
  }
1374

1375
  fieldName = "fileset_id";
×
1376
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1377
    *(int32_t *)value = pReader->fid;
×
1378
    return TSDB_CODE_SUCCESS;
×
1379
  }
1380

1381
  fieldName = "start_time";
×
1382
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1383
    *(int64_t *)value = pReader->startTime;
×
1384
    return TSDB_CODE_SUCCESS;
×
1385
  }
1386

1387
  fieldName = "end_time";
×
1388
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1389
    *(int64_t *)value = pReader->endTime;
×
1390
    return TSDB_CODE_SUCCESS;
×
1391
  }
1392

1393
  fieldName = "total_size";
×
1394
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1395
    *(int64_t *)value = pReader->totalSize;
×
1396
    return TSDB_CODE_SUCCESS;
×
1397
  }
1398

1399
  fieldName = "last_compact_time";
×
1400
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1401
    *(int64_t *)value = pReader->lastCompactTime;
×
1402
    return TSDB_CODE_SUCCESS;
×
1403
  }
1404

1405
  fieldName = "should_compact";
×
1406
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1407
    *(char *)value = tsdbShouldCompact(pReader->pFileSet, pReader->pTsdb->pVnode->config.vgId);
×
1408
    return TSDB_CODE_SUCCESS;
×
1409
  }
1410

1411
  fieldName = "details";
×
1412
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1413
    // TODO
1414
    return TSDB_CODE_SUCCESS;
×
1415
  }
1416

1417
  return TSDB_CODE_INVALID_PARA;
×
1418
}
1419

1420
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {
×
1421
  if (ppReader == NULL || *ppReader == NULL) {
×
1422
    return;
×
1423
  }
1424

1425
  tsdbTFileSetClear(&(*ppReader)->pFileSet);
×
1426
  taosMemoryFree(*ppReader);
×
1427

1428
  *ppReader = NULL;
×
1429
  return;
×
1430
}
1431

1432
static FORCE_INLINE void getLevelSize(const STFileObj *fObj, int64_t szArr[TFS_MAX_TIERS]) {
1433
  if (fObj == NULL) return;
40,436!
1434

1435
  int64_t sz = fObj->f->size;
26,252✔
1436
  // level == 0, primary storage
1437
  // level == 1, second storage,
1438
  // level == 2, third storage
1439
  int32_t level = fObj->f->did.level;
26,252✔
1440
  if (level >= 0 && level < TFS_MAX_TIERS) {
26,252!
1441
    szArr[level] += sz;
26,252✔
1442
  }
1443
}
1444

1445
/**
 * Compute the size statistics of all file sets. No lock is taken here.
 *
 * Per-level sizes are collected with getLevelSize() over the farr[] files and
 * the files of every stt level. In addition, for each data file with
 * lcn > 1, (lcn - 1) * tsdbPageSize * ssChunkSize is added to ssSize.
 *
 * @param pInfo  output; l1Size/l2Size/l3Size receive levels 0/1/2 and ssSize
 *               the accumulated chunk size.
 * @return always 0.
 */
static FORCE_INLINE int32_t tsdbGetFsSizeImpl(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
  int32_t code = 0;
  int64_t perLevel[TFS_MAX_TIERS] = {0};
  int64_t ssTotal = 0;

  const STFileSet *fset;
  const SSttLvl   *lvl = NULL;
  const STFileObj *fobj = NULL;

  SVnodeCfg *cfg = &tsdb->pVnode->config;
  int64_t    chunkBytes = (int64_t)cfg->tsdbPageSize * cfg->ssChunkSize;

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
      getLevelSize(fset->farr[ftype], perLevel);
    }

    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        getLevelSize(fobj, perLevel);
      }
    }

    fobj = fset->farr[TSDB_FTYPE_DATA];
    if (fobj == NULL) {
      continue;
    }

    int32_t lcn = fobj->f->lcn;
    if (lcn > 1) {
      ssTotal += ((lcn - 1) * chunkBytes);
    }
  }

  pInfo->l1Size = perLevel[0];
  pInfo->l2Size = perLevel[1];
  pInfo->l3Size = perLevel[2];
  pInfo->ssSize = ssTotal;
  return code;
}
1481
/**
 * Locked wrapper around tsdbGetFsSizeImpl(): holds tsdb->mutex while the size
 * statistics are collected into `pInfo`.
 *
 * @return the code returned by tsdbGetFsSizeImpl().
 */
int32_t tsdbGetFsSize(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
  int32_t code;

  (void)taosThreadMutexLock(&tsdb->mutex);
  code = tsdbGetFsSizeImpl(tsdb, pInfo);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc