• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing. Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.92
/source/dnode/vnode/src/tsdb/tsdbFS2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFS2.h"
17
#include "cos.h"
18
#include "tsdbUpgrade.h"
19
#include "vnd.h"
20

21
#define BLOCK_COMMIT_FACTOR 3
22

23
typedef struct STFileHashEntry {
24
  struct STFileHashEntry *next;
25
  char                    fname[TSDB_FILENAME_LEN];
26
} STFileHashEntry;
27

28
typedef struct {
29
  int32_t           numFile;
30
  int32_t           numBucket;
31
  STFileHashEntry **buckets;
32
} STFileHash;
33

34
static const char *gCurrentFname[] = {
35
    [TSDB_FCURRENT] = "current.json",
36
    [TSDB_FCURRENT_C] = "current.c.json",
37
    [TSDB_FCURRENT_M] = "current.m.json",
38
};
39

40
// Allocate a zeroed STFileSystem, bind it to pTsdb and publish it through fs[0].
// The edit semaphore starts at 1, the state is TSDB_FS_STATE_NORMAL, the next
// edit id is 0 and both file-set arrays are initialized.
//
// Returns 0 on success. On failure returns terrno (allocation) or the
// tsem_init() code, and fs[0] is NULL in both cases.
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
  if (fs[0] == NULL) {
    return terrno;
  }

  fs[0]->tsdb = pTsdb;
  int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
  if (code) {
    taosMemoryFree(fs[0]);
    // tsdbOpenFS() runs destroy_fs() on failure; leaving the freed pointer in
    // place would make it dereference and free the same memory again.
    fs[0] = NULL;
    return code;
  }

  fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
  fs[0]->neid = 0;
  TARRAY2_INIT(fs[0]->fSetArr);
  TARRAY2_INIT(fs[0]->fSetArrTmp);

  return 0;
}
60

61
// Release the file system object referenced by fs[0] and reset fs[0] to NULL.
// Does nothing when fs[0] is already NULL. A semaphore-destroy failure is only logged.
static void destroy_fs(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) {
    return;
  }

  TARRAY2_DESTROY(pFS->fSetArr, NULL);
  TARRAY2_DESTROY(pFS->fSetArrTmp, NULL);

  int32_t rc = tsem_destroy(&pFS->canEdit);
  if (rc != 0) {
    tsdbError("failed to destroy semaphore");
  }

  taosMemoryFree(pFS);
  fs[0] = NULL;
}
72

73
// Build the full path of one of the manifest files into `fname`.
// `fname` must provide TSDB_FILENAME_LEN bytes; `ftype` selects the base name
// from gCurrentFname. The path is the vnode primary directory for pTsdb->path
// followed by TD_DIRSEP and the base name.
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
  vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->diskPrimary, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);

  // append the separator and base name right after the directory part
  int32_t dirLen = strlen(fname);
  snprintf(fname + dirLen, TSDB_FILENAME_LEN - dirLen - 1, "%s%s", TD_DIRSEP, gCurrentFname[ftype]);
}
78,778✔
80

81
static int32_t save_json(const cJSON *json, const char *fname) {
23,259✔
82
  int32_t   code = 0;
23,259✔
83
  int32_t   lino;
84
  char     *data = NULL;
23,259✔
85
  TdFilePtr fp = NULL;
23,259✔
86

87
  data = cJSON_PrintUnformatted(json);
23,259✔
88
  if (data == NULL) {
23,259!
89
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
90
  }
91

92
  fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
23,259✔
93
  if (fp == NULL) {
23,263!
94
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
95
  }
96

97
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
23,263!
98
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
99
  }
100

101
  if (taosFsyncFile(fp) < 0) {
23,266!
102
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
103
  }
104

105
_exit:
23,269✔
106
  if (code) {
23,269!
107
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
108
  }
109
  taosMemoryFree(data);
23,269!
110
  taosCloseFileWithLog(&fp);
23,268!
111
  return code;
23,269✔
112
}
113

114
// Read the whole file `fname` and parse it as JSON.
//
// On success returns 0 and stores the parsed tree in json[0]; the caller
// releases it with cJSON_Delete(). On failure json[0] is set to NULL and the
// return value is terrno from the failing I/O or allocation call, the
// taosFStatFile() code, or TSDB_CODE_FILE_CORRUPTED when parsing fails.
static int32_t load_json(const char *fname, cJSON **json) {
  int32_t code = 0;
  int32_t lino = 0;
  char   *data = NULL;
  int64_t size = 0;
  int64_t nread = 0;

  TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
  if (fp == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  code = taosFStatFile(fp, &size, NULL);
  if (code != 0) {
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // one extra byte for the NUL terminator cJSON_Parse() needs
  data = taosMemoryMalloc(size + 1);
  if (data == NULL) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  nread = taosReadFile(fp, data, size);
  if (nread < 0) {
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }
  // Terminate after the bytes actually read so that a short read never
  // exposes uninitialized buffer contents to the parser.
  data[nread] = '\0';

  json[0] = cJSON_Parse(data);
  if (json[0] == NULL) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    // lino is the line of the failing step recorded by TSDB_CHECK_CODE
    tsdbError("%s failed at %s:%d since %s", __func__, fname, lino, tstrerror(code));
    json[0] = NULL;
  }
  taosCloseFileWithLog(&fp);
  taosMemoryFree(data);
  return code;
}
154

155
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
23,262✔
156
  int32_t code = 0;
23,262✔
157
  int32_t lino = 0;
23,262✔
158

159
  cJSON *json = cJSON_CreateObject();
23,262✔
160
  if (json == NULL) {
23,264!
161
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
162
  }
163

164
  // fmtv
165
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
23,264!
166
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
167
  }
168

169
  // fset
170
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
23,260✔
171
  if (!ajson) {
23,260!
172
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
173
  }
174
  const STFileSet *fset;
175
  TARRAY2_FOREACH(arr, fset) {
378,057✔
176
    cJSON *item = cJSON_CreateObject();
354,796✔
177
    if (!item) {
354,813!
178
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
179
    }
180
    (void)cJSON_AddItemToArray(ajson, item);
354,813✔
181

182
    code = tsdbTFileSetToJson(fset, item);
354,798✔
183
    TSDB_CHECK_CODE(code, lino, _exit);
354,797!
184
  }
185

186
  code = save_json(json, fname);
23,261✔
187
  TSDB_CHECK_CODE(code, lino, _exit);
23,269!
188

189
_exit:
23,269✔
190
  if (code) {
23,269!
191
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
192
  }
193
  cJSON_Delete(json);
23,269✔
194
  return code;
23,269✔
195
}
196

197
// Load the manifest `fname` into `arr`.
//
// `arr` is cleared first. The file must be a JSON object with a numeric
// "fmtv" equal to 1 and an array "fset"; each array element is converted with
// tsdbJsonToTFileSet() and appended, then the array is sorted with
// tsdbTFileSetCmprFn.
//
// Returns 0 on success; TSDB_CODE_FILE_CORRUPTED for a malformed document, or
// the error code of the failing step. File sets appended before a failure
// remain in `arr`.
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
  int32_t code = 0;
  int32_t lino = 0;

  TARRAY2_CLEAR(arr, tsdbTFileSetClear);

  // load json
  cJSON *json = NULL;
  code = load_json(fname, &json);
  TSDB_CHECK_CODE(code, lino, _exit);

  // parse json
  const cJSON *item1;

  /* fmtv */
  item1 = cJSON_GetObjectItem(json, "fmtv");
  if (cJSON_IsNumber(item1)) {
    if (item1->valuedouble != 1) {
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
    }
  } else {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  /* fset */
  item1 = cJSON_GetObjectItem(json, "fset");
  if (cJSON_IsArray(item1)) {
    const cJSON *item2;
    cJSON_ArrayForEach(item2, item1) {
      STFileSet *fset;
      code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = TARRAY2_APPEND(arr, fset);
      if (code) {
        // the array did not take ownership, so release the file set here
        tsdbTFileSetClear(&fset);
      }
      TSDB_CHECK_CODE(code, lino, _exit);
    }
    TARRAY2_SORT(arr, tsdbTFileSetCmprFn);
  } else {
    code = TSDB_CODE_FILE_CORRUPTED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code) {
    tsdbError("%s failed at %s:%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
  }
  if (json) {
    cJSON_Delete(json);
  }
  return code;
}
248

249
// Bring fs->fSetArr in line with the edited state in fs->fSetArrTmp.
//
// Both arrays are walked in parallel by fid:
//   - a file set present only in fSetArr is passed to tsdbTFileSetRemove();
//   - a file set present only in fSetArrTmp is copied and sort-inserted into fSetArr;
//   - a file set present in both has the edit applied with tsdbTFileSetApplyEdit().
//
// Returns 0 on success or the code of the first failing step.
static int32_t apply_commit(STFileSystem *fs) {
  int32_t        code = 0;
  int32_t        lino;
  TFileSetArray *current = fs->fSetArr;
  TFileSetArray *edited = fs->fSetArrTmp;
  int32_t        i1 = 0;
  int32_t        i2 = 0;

  // sizes are re-read every iteration because inserts grow `current`
  while (i1 < TARRAY2_SIZE(current) || i2 < TARRAY2_SIZE(edited)) {
    STFileSet *fset1 = NULL;
    STFileSet *fset2 = NULL;

    if (i1 < TARRAY2_SIZE(current)) {
      fset1 = TARRAY2_GET(current, i1);
    }
    if (i2 < TARRAY2_SIZE(edited)) {
      fset2 = TARRAY2_GET(edited, i2);
    }

    // fset1 has no counterpart in the edited state: delete it
    bool dropOld = (fset2 == NULL) || (fset1 != NULL && fset1->fid < fset2->fid);
    if (dropOld) {
      tsdbTFileSetRemove(fset1);
      i1++;
      continue;
    }

    if (fset1 != NULL && fset1->fid == fset2->fid) {
      // same fid on both sides: edit in place
      code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      // fset2 is new: create a file set with fid of fset2->fid
      code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
      TSDB_CHECK_CODE(code, lino, _exit);
      code = TARRAY2_SORT_INSERT(current, fset1, tsdbTFileSetCmprFn);
      TSDB_CHECK_CODE(code, lino, _exit);
    }

    i1++;
    i2++;
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
301

302
// Make the pending edit durable and visible.
//
// Renames the temporary manifest (current.c.json for TSDB_FEDIT_COMMIT,
// current.m.json otherwise) over current.json, then applies the edit to the
// in-memory state with apply_commit().
//
// Returns 0 on success, otherwise the code of the failing rename or apply step.
static int32_t commit_edit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  char    current[TSDB_FILENAME_LEN];
  char    current_t[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, current, TSDB_FCURRENT);
  if (fs->etype == TSDB_FEDIT_COMMIT) {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
  } else {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
  }

  // Store the result in `code` before checking it: the _exit block reads
  // `code`, which previously stayed unassigned when the rename failed.
  code = taosRenameFile(current_t, current);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = apply_commit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  return code;
}
329

330
// static int32_t
331
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);
332
// Aborting needs no in-memory rollback; it only re-runs the scan-and-fix pass.
static int32_t apply_abort(STFileSystem *fs) {
  int32_t code = tsdbFSDoSanAndFix(fs);
  return code;
}
×
333

334
// Discard the pending edit: remove its temporary manifest (current.c.json for
// TSDB_FEDIT_COMMIT, current.m.json otherwise) and then run apply_abort().
//
// Returns 0 on success, otherwise the error of the failing step.
static int32_t abort_edit(STFileSystem *fs) {
  int32_t code;
  int32_t lino;
  char    fname[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, fname, (fs->etype == TSDB_FEDIT_COMMIT) ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);

  code = taosRemoveFile(fname);
  if (code) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = apply_abort(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  } else {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
362

363
// Check that the file behind `fobj` is present on disk.
//
// The file counts as present when fobj->fname exists, or, with S3 enabled and
// fobj->f->lcn > 1, when the name produced by tsdbTFileLastChunkName() exists.
//
// Returns 0 when present; otherwise logs the missing name and returns
// TSDB_CODE_FILE_CORRUPTED.
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
  if (taosCheckExistFile(fobj->fname)) {
    return 0;
  }

  const char *missing = fobj->fname;
  char        lastChunk[TSDB_FILENAME_LEN];

  if (tsS3Enabled && fobj->f->lcn > 1) {
    tsdbTFileLastChunkName(fs->tsdb, fobj->f, lastChunk);
    if (taosCheckExistFile(lastChunk)) {
      return 0;
    }
    missing = lastChunk;
  }

  tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, missing);
  return TSDB_CODE_FILE_CORRUPTED;
}
392

393
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
394

395
// Insert `fname` at the head of its bucket chain and bump hash->numFile.
// No duplicate check is made. Returns 0, or terrno when the node cannot be allocated.
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
  STFileHashEntry *node = taosMemoryMalloc(sizeof(*node));
  if (node == NULL) {
    return terrno;
  }
  tstrncpy(node->fname, fname, TSDB_FILENAME_LEN);

  uint32_t          slot = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
  STFileHashEntry **head = &hash->buckets[slot];

  node->next = *head;
  *head = node;
  hash->numFile += 1;

  return 0;
}
409

410
// Build a hash set holding the name of every file referenced by fs->fSetArr.
//
// The set contains current.json, each non-NULL entry of every file set's
// farr[], each stt file of every level, and, for a data file with lcn > 0,
// the name obtained by replacing the text after the last '.' with
// "<lcn>.data".
//
// Returns 0 on success. On failure the error is logged, `hash` is destroyed
// (left zeroed) and the error code is returned.
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  // init hash table
  hash->numFile = 0;
  hash->numBucket = 4096;
  hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
  if (hash->buckets == NULL) {
    code = terrno;
    // no bucket array exists, so the cleanup at _exit must not walk one
    hash->numBucket = 0;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // current.json
  current_fname(fs->tsdb, fname, TSDB_FCURRENT);
  code = tsdbFSAddEntryToFileObjHash(hash, fname);
  TSDB_CHECK_CODE(code, lino, _exit);

  // other
  STFileSet *fset = NULL;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    // data file
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
      if (fset->farr[i] != NULL) {
        code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
        TSDB_CHECK_CODE(code, lino, _exit);

        if (TSDB_FTYPE_DATA == i && fset->farr[i]->f->lcn > 0) {
          STFileObj *fobj = fset->farr[i];
          int32_t    lcn = fobj->f->lcn;
          char       lcn_name[TSDB_FILENAME_LEN];

          // bound both writes by the real size of lcn_name
          snprintf(lcn_name, sizeof(lcn_name), "%s", fobj->fname);
          char *dot = strrchr(lcn_name, '.');
          if (dot) {
            snprintf(dot + 1, sizeof(lcn_name) - (dot + 1 - lcn_name), "%d.data", lcn);

            code = tsdbFSAddEntryToFileObjHash(hash, lcn_name);
            TSDB_CHECK_CODE(code, lino, _exit);
          }
        }
      }
    }

    // stt file
    SSttLvl *lvl = NULL;
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      STFileObj *fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
    tsdbFSDestroyFileObjHash(hash);
  }
  return code;
}
472

473
// Look up `fname` in the hash set. Returns the matching entry, or NULL when absent.
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
  uint32_t slot = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;

  // walk the chain of the selected bucket
  for (STFileHashEntry *node = hash->buckets[slot]; node != NULL; node = node->next) {
    if (strcmp(node->fname, fname) == 0) {
      return node;
    }
  }

  return NULL;
}
486

487
// Free every entry and the bucket array, then zero `*hash`.
// Safe to call on a hash whose bucket array was never allocated.
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
  // tsdbFSCreateFileObjHash() calls this from its error path, including the
  // case where allocating the bucket array itself failed.
  if (hash->buckets != NULL) {
    for (int32_t i = 0; i < hash->numBucket; i++) {
      STFileHashEntry *entry = hash->buckets[i];
      while (entry) {
        STFileHashEntry *next = entry->next;
        taosMemoryFree(entry);
        entry = next;
      }
    }
    taosMemoryFree(hash->buckets);
  }
  memset(hash, 0, sizeof(*hash));
}
2,247✔
499

500
// Verify the files referenced by fs->fSetArr and remove unreferenced ones.
//
// Step 1 checks every data and stt file with tsdbFSDoScanAndFixFile(). For a
// missing file the owning set's maxVerValid is lowered and the file system is
// flagged; in that case fsstate becomes TSDB_FS_STATE_INCOMPLETE, nothing is
// deleted and 0 is returned.
// Step 2 (only when nothing is missing) lists fs->tsdb->path and removes every
// non-directory entry that is not in the referenced-file hash.
//
// Returns 0 on success, or the error of opening the directory or building the hash.
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t corrupt = false;

  // step 1: scan each file
  STFileSet *fset = NULL;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    // data file
    for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
      STFileObj *fobj = fset->farr[ftype];
      if (fobj == NULL) {
        continue;
      }

      code = tsdbFSDoScanAndFixFile(fs, fobj);
      if (code) {
        fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
        corrupt = true;
      }
    }

    // stt file
    SSttLvl *lvl;
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      STFileObj *sttObj;
      TARRAY2_FOREACH(lvl->fobjArr, sttObj) {
        code = tsdbFSDoScanAndFixFile(fs, sttObj);
        if (code) {
          fset->maxVerValid =
              (sttObj->f->minVer <= sttObj->f->maxVer) ? TMIN(fset->maxVerValid, sttObj->f->minVer - 1) : -1;
          corrupt = true;
        }
      }
    }
  }

  if (corrupt) {
    // keep everything on disk; a missing file is reported through fsstate, not as an error
    tsdbError("vgId:%d, not to clear dangling files due to fset incompleteness", TD_VID(fs->tsdb->pVnode));
    fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
    code = 0;
  } else {
    // step 2: clear unreferenced files
    STfsDir *dir = NULL;
    TAOS_CHECK_GOTO(tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path, &dir), &lino, _exit);

    STFileHash fobjHash = {0};
    code = tsdbFSCreateFileObjHash(fs, &fobjHash);
    if (code == 0) {
      const STfsFile *file = NULL;
      while ((file = tfsReaddir(dir)) != NULL) {
        if (taosIsDir(file->aname)) {
          continue;
        }
        if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
          tsdbRemoveFile(file->aname);
        }
      }

      tsdbFSDestroyFileObjHash(&fobjHash);
    }

    // the directory is closed whether or not the hash could be built
    tfsClosedir(dir);
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
570

571
// Recompute fs->neid as the largest commit id found in fs->fSetArr (0 when
// empty), then run tsdbFSDoSanAndFix(). Returns that call's result.
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
  int32_t          code = 0;
  int32_t          lino = 0;
  const STFileSet *fset;

  // get max commit id
  fs->neid = 0;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset));
  }

  // scan and fix
  code = tsdbFSDoSanAndFix(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
591

592
// Replace the contents of fs->fSetArrTmp with copies of every file set in
// fs->fSetArr.
//
// Returns 0 on success. On failure returns the error of the failing copy or
// append; fSetArrTmp then holds only the sets copied so far.
static int32_t tsdbFSDupState(STFileSystem *fs) {
  int32_t code;

  const TFileSetArray *src = fs->fSetArr;
  TFileSetArray       *dst = fs->fSetArrTmp;

  TARRAY2_CLEAR(dst, tsdbTFileSetClear);

  const STFileSet *fset1;
  TARRAY2_FOREACH(src, fset1) {
    STFileSet *fset2;
    code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
    if (code) return code;

    code = TARRAY2_APPEND(dst, fset2);
    if (code) {
      // dst did not take ownership of the copy, so release it here
      tsdbTFileSetClear(&fset2);
      return code;
    }
  }

  return 0;
}
611

612
// Load the on-disk state into `fs`, finishing or discarding any interrupted edit.
//
// Without current.json the in-memory state is simply saved to it. Otherwise
// current.json is loaded and:
//   - if current.c.json exists, the commit is aborted when `rollback` is set,
//     or loaded into fSetArrTmp and committed when it is not;
//   - else if current.m.json exists, that edit is aborted.
// Finally the state is duplicated into fSetArrTmp and scan-and-fix runs.
//
// Returns 0 on success or the code of the first failing step.
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = fs->tsdb;
  char    fCurrent[TSDB_FILENAME_LEN];
  char    cCurrent[TSDB_FILENAME_LEN];
  char    mCurrent[TSDB_FILENAME_LEN];

  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);

  if (!taosCheckExistFile(fCurrent)) {
    // no manifest yet: write one from the in-memory state
    code = save_fs(fs->fSetArr, fCurrent);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  // current.json exists
  code = load_fs(pTsdb, fCurrent, fs->fSetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosCheckExistFile(cCurrent)) {
    // current.c.json exists: an interrupted commit
    fs->etype = TSDB_FEDIT_COMMIT;
    if (rollback) {
      code = abort_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = commit_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else if (taosCheckExistFile(mCurrent)) {
    // current.m.json exists: an interrupted merge, always discarded
    fs->etype = TSDB_FEDIT_MERGE;
    code = abort_edit(fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFSDupState(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = tsdbFSScanAndFix(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
668

669
// Clear both file-set arrays, releasing each element with tsdbTFileSetClear.
static void close_file_system(STFileSystem *fs) {
  // committed state
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
  // scratch copy used while editing
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
}
11,939✔
673

674
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
×
675
  if (pSet1->fid < pSet2->fid) {
×
676
    return -1;
×
677
  } else if (pSet1->fid > pSet2->fid) {
×
678
    return 1;
×
679
  }
680
  return 0;
×
681
}
682

683
// Apply the operations in `opArray` to a fresh copy of the current state.
//
// fs->fSetArrTmp is first rebuilt from fs->fSetArr. Each operation is applied
// to the file set with the matching fid, which is created and sort-inserted
// when absent. The first operation seen for a run of equal fids stamps the set
// with lastCommit (TSDB_FEDIT_COMMIT) or lastCompact (TSDB_FEDIT_COMPACT).
// Afterwards empty stt levels and empty file sets are dropped.
//
// Returns 0 on success or the code of the first failing step.
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSDupState(fs);
  if (code) {
    return code;
  }

  TFileSetArray  *fsetArray = fs->fSetArrTmp;
  STFileSet      *fset = NULL;
  const STFileOp *op;
  int32_t         lastFid = INT32_MIN;
  TSKEY           now = taosGetTimestampMs();

  TARRAY2_FOREACH_PTR(opArray, op) {
    // switch to the file set this operation targets
    if (fset == NULL || fset->fid != op->fid) {
      STFileSet   key = {.fid = op->fid};
      STFileSet  *pKey = &key;
      STFileSet **found = TARRAY2_SEARCH(fsetArray, &pKey, tsdbTFileSetCmprFn, TD_EQ);

      fset = (found != NULL) ? *found : NULL;
      if (fset == NULL) {
        code = tsdbTFileSetInit(op->fid, &fset);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
    TSDB_CHECK_CODE(code, lino, _exit);

    // stamp the set once per run of operations on the same fid
    if (op->fid != lastFid) {
      lastFid = op->fid;
      if (etype == TSDB_FEDIT_COMMIT) {
        fset->lastCommit = now;
      } else if (etype == TSDB_FEDIT_COMPACT) {
        fset->lastCompact = now;
      }
    }
  }

  // remove empty stt levels and empty file sets
  for (int32_t i = 0; i < TARRAY2_SIZE(fsetArray);) {
    fset = TARRAY2_GET(fsetArray, i);

    for (int32_t j = 0; j < TARRAY2_SIZE(fset->lvlArr);) {
      SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j);

      // the index only advances when nothing was removed at it
      if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
        TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
      } else {
        j++;
      }
    }

    if (tsdbTFileSetIsEmpty(fset)) {
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
    } else {
      i++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
754

755
// return error code
756
// Open the TSDB file system for `pTsdb`.
//
// Runs the upgrade check, creates the file system object and opens it.
// `rollback` is forwarded to the upgrade check and to open_fs().
//
// Returns 0 on success with the object in fs[0]. On failure the error is
// logged, any created object is destroyed, fs[0] is NULL and the code of the
// failing step is returned.
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  // The error path below calls destroy_fs(fs); start from a known value so it
  // never acts on whatever the caller happened to leave in *fs.
  fs[0] = NULL;

  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = create_fs(pTsdb, fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = open_fs(fs[0], rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
    destroy_fs(fs);
  } else {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  }
  return code;
}
778

779
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
780
extern void tsdbStopAllCompTask(STsdb *tsdb);
781

782
// Disable background tasks for `pTsdb` and stop the ones already scheduled.
//
// Under pTsdb->mutex: sets bgTaskDisabled, collects the merge, compact and
// retention task ids of every file set and unblocks commit on each set.
// Outside the mutex: cancels every collected task, then waits for every one.
// With TD_ENTERPRISE, tsdbStopAllCompTask() is called last.
//
// Returns 0 on success, or terrno when the task list cannot be allocated or grown.
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
  STFileSystem *fs = pTsdb->pFS;

  SArray *asyncTasks = taosArrayInit(0, sizeof(SVATaskID));
  if (asyncTasks == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&pTsdb->mutex);

  // disable
  pTsdb->bgTaskDisabled = true;

  // collect the task ids of every file set
  STFileSet *fset;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    bool collected = taosArrayPush(asyncTasks, &fset->mergeTask) != NULL &&
                     taosArrayPush(asyncTasks, &fset->compactTask) != NULL &&
                     taosArrayPush(asyncTasks, &fset->retentionTask) != NULL;
    if (!collected) {
      taosArrayDestroy(asyncTasks);
      (void)taosThreadMutexUnlock(&pTsdb->mutex);
      return terrno;
    }
    tsdbFSSetBlockCommit(fset, false);
  }

  (void)taosThreadMutexUnlock(&pTsdb->mutex);

  // first cancel everything ...
  for (int32_t i = 0; i < taosArrayGetSize(asyncTasks); i++) {
    SVATaskID *task = taosArrayGet(asyncTasks, i);
    (void)vnodeACancel(task);
  }
  // ... then wait for each task
  for (int32_t i = 0; i < taosArrayGetSize(asyncTasks); i++) {
    SVATaskID *task = taosArrayGet(asyncTasks, i);
    vnodeAWait(task);
  }
  taosArrayDestroy(asyncTasks);

#ifdef TD_ENTERPRISE
  tsdbStopAllCompTask(pTsdb);
#endif
  return 0;
}
827

828
// Clear pTsdb->bgTaskDisabled while holding pTsdb->mutex.
void tsdbEnableBgTask(STsdb *pTsdb) {
  TdThreadMutex *lock = &pTsdb->mutex;

  (void)taosThreadMutexLock(lock);
  pTsdb->bgTaskDisabled = false;
  (void)taosThreadMutexUnlock(lock);
}
49✔
833

834
// Shut down and free the file system in fs[0]; fs[0] is NULL afterwards.
// Background tasks are disabled and cancelled first; a failure there is only
// logged and the close continues. Does nothing when fs[0] is already NULL.
void tsdbCloseFS(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) {
    return;
  }

  int32_t code = tsdbDisableAndCancelAllBgTask(pFS->tsdb);
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pFS->tsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  close_file_system(pFS);
  destroy_fs(fs);
}
846

847
// Increment fs->neid under fs->tsdb->mutex and return the new value.
int64_t tsdbFSAllocEid(STFileSystem *fs) {
  int64_t eid;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  fs->neid += 1;
  eid = fs->neid;
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return eid;
}
853

854
// Raise fs->neid to `cid` if `cid` is larger; done under fs->tsdb->mutex.
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  if (cid > fs->neid) {
    fs->neid = cid;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
}
174✔
859

860
// Start an edit of type `etype`.
//
// Waits on fs->canEdit (a wait failure is only logged), records the edit type,
// applies `opArray` to the scratch state with edit_fs() and saves that state
// to the temporary manifest: current.c.json for TSDB_FEDIT_COMMIT,
// current.m.json otherwise.
//
// Returns 0 on success or the code of the failing edit/save step.
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino;
  char    current_t[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, current_t, (etype == TSDB_FEDIT_COMMIT) ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);

  int32_t waitRc = tsem_wait(&fs->canEdit);
  if (waitRc != 0) {
    tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
  }
  fs->etype = etype;

  // build the edited state in fSetArrTmp
  code = edit_fs(fs, opArray, etype);
  TSDB_CHECK_CODE(code, lino, _exit);

  // persist it to the temporary manifest
  code = save_fs(fs->fSetArrTmp, current_t);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
              tstrerror(code), etype);
  }
  return code;
}
893

894
// Set fset->blockCommit to `block`. When the flag is cleared and at least one
// waiter is registered in fset->numWaitCommit, signal fset->canCommit.
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
  fset->blockCommit = block;

  if (!block && fset->numWaitCommit > 0) {
    (void)taosThreadCondSignal(&fset->canCommit);
  }
}
681,345✔
904

905
// Block the caller while the file set `fid` has its blockCommit flag set.
//
// Takes tsdb->mutex, looks the file set up, and waits on fset->canCommit
// (counting itself in fset->numWaitCommit) until blockCommit is cleared.
// Returns immediately when the file set does not exist.
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
  STFileSet *fset;

  (void)taosThreadMutexLock(&tsdb->mutex);

  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
  // re-check the flag after every wake-up
  while (fset != NULL && fset->blockCommit) {
    fset->numWaitCommit++;
    (void)taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
    fset->numWaitCommit--;
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
919

920
// IMPORTANT: the caller must hold fs->tsdb->mutex
921
int32_t tsdbFSEditCommit(STFileSystem *fs) {
13,575✔
922
  int32_t code = 0;
13,575✔
923
  int32_t lino = 0;
13,575✔
924

925
  // commit
926
  code = commit_edit(fs);
13,575✔
927
  TSDB_CHECK_CODE(code, lino, _exit);
13,575!
928

929
  // schedule merge
930
  int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
13,575✔
931
  if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
13,575✔
932
    STFileSet *fset;
933
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
362,886✔
934
      if (TARRAY2_SIZE(fset->lvlArr) == 0) {
349,852✔
935
        tsdbFSSetBlockCommit(fset, false);
2,902✔
936
        continue;
2,902✔
937
      }
938

939
      SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
346,950✔
940
      if (lvl->level != 0) {
346,950✔
941
        tsdbFSSetBlockCommit(fset, false);
5,859✔
942
        continue;
5,859✔
943
      }
944

945
      // bool    skipMerge = false;
946
      int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
341,091✔
947
      if (numFile >= sttTrigger && (!vnodeATaskValid(&fset->mergeTask))) {
341,091✔
948
        SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
4,230!
949
        if (arg == NULL) {
4,230!
950
          code = terrno;
×
951
          TSDB_CHECK_CODE(code, lino, _exit);
×
952
        }
953

954
        arg->tsdb = fs->tsdb;
4,230✔
955
        arg->fid = fset->fid;
4,230✔
956

957
        code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
4,230✔
958
        TSDB_CHECK_CODE(code, lino, _exit);
4,230!
959
      }
960

961
      if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
341,091!
962
        tsdbFSSetBlockCommit(fset, true);
×
963
      } else {
964
        tsdbFSSetBlockCommit(fset, false);
341,091✔
965
      }
966
    }
967
  }
968

969
_exit:
13,575✔
970
  if (code) {
13,575!
971
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
×
972
  } else {
973
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
13,575!
974
  }
975
  if (tsem_post(&fs->canEdit) != 0) {
13,575!
976
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
977
  }
978
  return code;
13,575✔
979
}
980

981
// Abort the pending edit through abort_edit() and post fs->canEdit.
// Returns the result of abort_edit(); a failed post is only logged.
int32_t tsdbFSEditAbort(STFileSystem *fs) {
  int32_t code;

  code = abort_edit(fs);

  if (tsem_post(&fs->canEdit) != 0) {
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
  }

  return code;
}
988

989
// Look up the file set with id `fid` in fs->fSetArr.
// Stores the matching file set in fset[0], or NULL when there is no match.
// No lock is taken here.
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
  // search key: a stack file set carrying only the wanted fid
  STFileSet   key = {.fid = fid};
  STFileSet  *pKey = &key;
  STFileSet **found = TARRAY2_SEARCH(fs->fSetArr, &pKey, tsdbTFileSetCmprFn, TD_EQ);

  if (found != NULL) {
    *fset = *found;
  } else {
    *fset = NULL;
  }
}
930,982✔
995

996
// Build a snapshot array holding a copy (tsdbTFileSetInitCopy) of every file
// set in fs->fSetArr. The file sets are walked while fs->tsdb->mutex is held.
//
// On success fsetArr[0] points to the new array and 0 is returned; release it
// with tsdbFSDestroyCopySnapshot(). On failure everything built so far is
// released, fsetArr[0] is set to NULL and the error code is returned.
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset;
  STFileSet *fset1 = NULL;

  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
  if (fsetArr[0] == NULL) return terrno;

  TARRAY2_INIT(fsetArr[0]);

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
    if (code) break;

    code = TARRAY2_APPEND(fsetArr[0], fset1);
    if (code) {
      // The copy never made it into the array, so the TARRAY2_DESTROY below
      // cannot release it; clear it here to avoid leaking it.
      tsdbTFileSetClear(&fset1);
      break;
    }
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  if (code) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}
1023

1024
// Release a snapshot array created by tsdbFSCreateCopySnapshot(): clear every
// element with tsdbTFileSetClear, free the array and reset fsetArr[0] to NULL.
// Does nothing when fsetArr[0] is already NULL.
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFree(fsetArr[0]);
  fsetArr[0] = NULL;
}
46✔
1031

1032
// Locked variant of tsdbFSCreateRefSnapshotWithoutLock(): the snapshot is
// built while fs->tsdb->mutex is held. Returns that function's result.
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t code;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return code;
}
1038

1039
// Build a snapshot array holding a reference (tsdbTFileSetInitRef) to every
// file set in fs->fSetArr. No lock is taken here; see tsdbFSCreateRefSnapshot()
// for the variant that takes fs->tsdb->mutex.
//
// On success fsetArr[0] points to the new array and 0 is returned. On failure
// everything built so far is released, fsetArr[0] is set to NULL and the error
// code is returned.
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *src;
  STFileSet *ref;

  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
  if (fsetArr[0] == NULL) {
    return terrno;
  }

  TARRAY2_FOREACH(fs->fSetArr, src) {
    code = tsdbTFileSetInitRef(fs->tsdb, src, &ref);
    if (code != 0) {
      break;
    }

    code = TARRAY2_APPEND(fsetArr[0], ref);
    if (code != 0) {
      // not stored in the array, so it has to be released separately
      tsdbTFileSetClear(&ref);
      break;
    }
  }

  if (code != 0) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}
1064

1065
// Release a snapshot array created by tsdbFSCreateRefSnapshot*(): clear every
// element with tsdbTFileSetClear, free the array and reset fsetArr[0] to NULL.
// Does nothing when fsetArr[0] is already NULL.
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFreeClear(fsetArr[0]);
  fsetArr[0] = NULL;
}
4,511,734✔
1072

1073
// Build a hash table keyed by fid (int32_t) whose values are copies of the
// STFileSetRange entries in `pRanges`.
//
// Returns the new table, which the caller releases with taosHashCleanup(), or
// NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the table cannot be
// created. A failed insert is logged and skipped, so that fid is then absent
// from the returned table.
static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
  int32_t   capacity = TARRAY2_SIZE(pRanges) * 2;
  SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
    STFileSetRange *u = TARRAY2_GET(pRanges, i);
    int32_t         fid = u->fid;
    int32_t         code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
    if (code) {
      // Report the failure instead of silently dropping the entry.
      tsdbError("%s failed to put fid:%d into range hash since %s", __func__, fid, tstrerror(code));
      continue;
    }
    tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
  }
  return pHash;
}
1089

1090
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
×
1091
                                       TFileOpArray *fopArr) {
1092
  int32_t    code = 0;
×
1093
  STFileSet *fset;
1094
  STFileSet *fset1;
1095
  SHashObj  *pHash = NULL;
×
1096

1097
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
×
1098
  if (fsetArr == NULL) return terrno;
×
1099
  TARRAY2_INIT(fsetArr[0]);
×
1100

1101
  if (pRanges) {
×
1102
    pHash = tsdbFSetRangeArrayToHash(pRanges);
×
1103
    if (pHash == NULL) {
×
1104
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1105
      goto _out;
×
1106
    }
1107
  }
1108

1109
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
×
1110
  TARRAY2_FOREACH(fs->fSetArr, fset) {
×
1111
    int64_t ever = VERSION_MAX;
×
1112
    if (pHash) {
×
1113
      int32_t         fid = fset->fid;
×
1114
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
×
1115
      if (u) {
×
1116
        ever = u->sver - 1;
×
1117
      }
1118
    }
1119

1120
    code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
×
1121
    if (code) break;
×
1122

1123
    code = TARRAY2_APPEND(fsetArr[0], fset1);
×
1124
    if (code) break;
×
1125
  }
1126
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
×
1127

1128
_out:
×
1129
  if (code) {
×
1130
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1131
    taosMemoryFree(fsetArr[0]);
×
1132
    fsetArr[0] = NULL;
×
1133
  }
1134
  if (pHash) {
×
1135
    taosHashCleanup(pHash);
×
1136
    pHash = NULL;
×
1137
  }
1138
  return code;
×
1139
}
1140

1141
// Release a snapshot array created by tsdbFSCreateCopyRangedSnapshot().
// Delegates to tsdbFSDestroyCopySnapshot().
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) {
  tsdbFSDestroyCopySnapshot(fsetArr);
}
×
1142

1143
// Build an array of file-set range references (tsdbTFileSetRangeInitRef), one
// per file set in fs->fSetArr that has a non-empty version range.
//
// The range of each file set is [sver, ever], except that the lower bound is
// replaced by the sver of the `pRanges` entry with the same fid when there is
// one. File sets whose lower bound exceeds `ever` are skipped. The file sets
// are walked while fs->tsdb->mutex is held.
//
// On success fsrArr[0] points to the new array and 0 is returned; release it
// with tsdbFSDestroyRefRangedSnapshot(). On any failure everything built so
// far, including the array header, is released, fsrArr[0] is set to NULL and
// the error code is returned.
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
                                      TFileSetRangeArray **fsrArr) {
  int32_t         code = 0;
  STFileSet      *fset;
  STFileSetRange *fsr1 = NULL;
  SHashObj       *pHash = NULL;

  fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
  if (fsrArr[0] == NULL) {
    code = terrno;
    goto _out;
  }

  tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
  if (pRanges) {
    pHash = tsdbFSetRangeArrayToHash(pRanges);
    if (pHash == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _out;
    }
  }

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    int64_t sver1 = sver;
    int64_t ever1 = ever;

    if (pHash) {
      int32_t         fid = fset->fid;
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
      if (u) {
        sver1 = u->sver;
        tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
      }
    }

    if (sver1 > ever1) {
      tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
      continue;
    }

    tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);

    code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
    if (code) break;

    code = TARRAY2_APPEND(fsrArr[0], fsr1);
    if (code) break;

    // ownership moved into the array
    fsr1 = NULL;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

_out:
  // Single cleanup point for every failure after the header was allocated, so
  // the header itself is freed and never handed back alongside an error code.
  if (code && fsrArr[0] != NULL) {
    if (fsr1 != NULL) {
      tsdbTFileSetRangeClear(&fsr1);
    }
    TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
    taosMemoryFree(fsrArr[0]);
    fsrArr[0] = NULL;
  }
  if (pHash) {
    taosHashCleanup(pHash);
    pHash = NULL;
  }
  return code;
}
1209

1210
// Release an array created by tsdbFSCreateRefRangedSnapshot().
// Delegates to tsdbTFileSetRangeArrayDestroy().
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) {
  tsdbTFileSetRangeArrayDestroy(fsrArr);
}
×
1211

1212
// Mark a task as running on file set `fid`, waiting first while another task
// holds the same slot.
//
// *fset receives the file set, or NULL when it does not exist, in which case
// nothing else is done. Slot conds[0] is used when sttTrigger == 1 or the task
// is EVA_TASK_COMMIT; every other task uses conds[1].
//
// No lock is taken here: the wait releases tsdb->mutex through
// taosThreadCondWait(), so the caller is expected to hold that mutex.
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

  tsdbFSGetFSet(tsdb->pFS, fid, fset);
  if (*fset == NULL) {
    return;
  }

  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &(*fset)->conds[slot];

  // wait until the slot is free, re-checking after every wake-up
  while (cond->running) {
    cond->numWait++;
    (void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
    cond->numWait--;
  }
  cond->running = true;

  tsdbInfo("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1242

1243
// Mark the task on file set `fid` as finished and wake one waiter, if any.
//
// The slot is picked exactly as in tsdbBeginTaskOnFileSet(): conds[0] when
// sttTrigger == 1 or the task is EVA_TASK_COMMIT, conds[1] otherwise.
// Does nothing when the file set does not exist. No lock is taken here.
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
  // Here, sttTrigger is protected by tsdb->mutex, so it is safe to read it without lock
  int16_t    sttTrigger = tsdb->pVnode->config.sttTrigger;
  STFileSet *fset = NULL;

  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
  if (fset == NULL) {
    return;
  }

  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &fset->conds[slot];

  cond->running = false;
  if (cond->numWait > 0) {
    (void)taosThreadCondSignal(&cond->cond);
  }

  tsdbInfo("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1268

1269
struct SFileSetReader {
1270
  STsdb     *pTsdb;
1271
  STFileSet *pFileSet;
1272
  int32_t    fid;
1273
  int64_t    startTime;
1274
  int64_t    endTime;
1275
  int64_t    lastCompactTime;
1276
  int64_t    totalSize;
1277
};
1278

1279
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
×
1280
  if (pVnode == NULL || ppReader == NULL) {
×
1281
    return TSDB_CODE_INVALID_PARA;
×
1282
  }
1283

1284
  STsdb *pTsdb = ((SVnode *)pVnode)->pTsdb;
×
1285

1286
  (*ppReader) = taosMemoryCalloc(1, sizeof(struct SFileSetReader));
×
1287
  if (*ppReader == NULL) {
×
1288
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, __LINE__,
×
1289
              tstrerror(terrno));
1290
    return terrno;
×
1291
  }
1292

1293
  (*ppReader)->pTsdb = pTsdb;
×
1294
  (*ppReader)->fid = INT32_MIN;
×
1295
  (*ppReader)->pFileSet = NULL;
×
1296

1297
  return TSDB_CODE_SUCCESS;
×
1298
}
1299

1300
// Reports whether a file set should be compacted. Builds that define
// TD_ENTERPRISE are expected to provide the definition elsewhere.
extern bool tsdbShouldCompact(const STFileSet *pFileSet);

#ifndef TD_ENTERPRISE
// Fallback used when TD_ENTERPRISE is not defined: always reports false.
bool tsdbShouldCompact(const STFileSet *pFileSet) {
  return false;
}
#endif
1305

1306
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
×
1307
  STsdb  *pTsdb = pReader->pTsdb;
×
1308
  int32_t code = TSDB_CODE_SUCCESS;
×
1309

1310
  tsdbTFileSetClear(&pReader->pFileSet);
×
1311

1312
  STFileSet *fset = &(STFileSet){
×
1313
      .fid = pReader->fid,
×
1314
  };
1315

1316
  STFileSet **fsetPtr = TARRAY2_SEARCH(pReader->pTsdb->pFS->fSetArr, &fset, tsdbTFileSetCmprFn, TD_GT);
×
1317
  if (fsetPtr == NULL) {
×
1318
    pReader->fid = INT32_MAX;
×
1319
    return TSDB_CODE_NOT_FOUND;
×
1320
  }
1321

1322
  // ref file set
1323
  code = tsdbTFileSetInitRef(pReader->pTsdb, *fsetPtr, &pReader->pFileSet);
×
1324
  if (code) return code;
×
1325

1326
  // get file set details
1327
  pReader->fid = pReader->pFileSet->fid;
×
1328
  tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
×
1329
  pReader->lastCompactTime = pReader->pFileSet->lastCompact;
×
1330
  pReader->totalSize = 0;
×
1331
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
×
1332
    STFileObj *fobj = pReader->pFileSet->farr[i];
×
1333
    if (fobj) {
×
1334
      pReader->totalSize += fobj->f->size;
×
1335
    }
1336
  }
1337
  SSttLvl *lvl;
1338
  TARRAY2_FOREACH(pReader->pFileSet->lvlArr, lvl) {
×
1339
    STFileObj *fobj;
1340
    TARRAY2_FOREACH(lvl->fobjArr, fobj) { pReader->totalSize += fobj->f->size; }
×
1341
  }
1342

1343
  return code;
×
1344
}
1345

1346
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader) {
×
1347
  int32_t code = TSDB_CODE_SUCCESS;
×
1348
  (void)taosThreadMutexLock(&pReader->pTsdb->mutex);
×
1349
  code = tsdbFileSetReaderNextNoLock(pReader);
×
1350
  (void)taosThreadMutexUnlock(&pReader->pTsdb->mutex);
×
1351
  return code;
×
1352
}
1353

1354
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value) {
×
1355
  const char *fieldName;
1356

1357
  if (pReader->fid == INT32_MIN || pReader->fid == INT32_MAX) {
×
1358
    return TSDB_CODE_INVALID_PARA;
×
1359
  }
1360

1361
  fieldName = "fileset_id";
×
1362
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1363
    *(int32_t *)value = pReader->fid;
×
1364
    return TSDB_CODE_SUCCESS;
×
1365
  }
1366

1367
  fieldName = "start_time";
×
1368
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1369
    *(int64_t *)value = pReader->startTime;
×
1370
    return TSDB_CODE_SUCCESS;
×
1371
  }
1372

1373
  fieldName = "end_time";
×
1374
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1375
    *(int64_t *)value = pReader->endTime;
×
1376
    return TSDB_CODE_SUCCESS;
×
1377
  }
1378

1379
  fieldName = "total_size";
×
1380
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1381
    *(int64_t *)value = pReader->totalSize;
×
1382
    return TSDB_CODE_SUCCESS;
×
1383
  }
1384

1385
  fieldName = "last_compact_time";
×
1386
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1387
    *(int64_t *)value = pReader->lastCompactTime;
×
1388
    return TSDB_CODE_SUCCESS;
×
1389
  }
1390

1391
  fieldName = "should_compact";
×
1392
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1393
    *(char *)value = tsdbShouldCompact(pReader->pFileSet);
×
1394
    return TSDB_CODE_SUCCESS;
×
1395
  }
1396

1397
  fieldName = "details";
×
1398
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1399
    // TODO
1400
    return TSDB_CODE_SUCCESS;
×
1401
  }
1402

1403
  return TSDB_CODE_INVALID_PARA;
×
1404
}
1405

1406
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {
×
1407
  if (ppReader == NULL || *ppReader == NULL) {
×
1408
    return;
×
1409
  }
1410

1411
  tsdbTFileSetClear(&(*ppReader)->pFileSet);
×
1412
  taosMemoryFree(*ppReader);
×
1413

1414
  *ppReader = NULL;
×
1415
  return;
×
1416
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc