• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5079

17 May 2026 01:15AM UTC coverage: 73.395% (-0.05%) from 73.443%
#5079

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281685 of 383795 relevant lines covered (73.39%)

137851703.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.23
/source/dnode/vnode/src/tsdb/tsdbFS2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cos.h"
17
#include "tencrypt.h"
18
#include "tsdbFS2.h"
19
#include "tsdbUpgrade.h"
20
#include "vnd.h"
21

22
#define BLOCK_COMMIT_FACTOR 3
23

24
bool    tsdbShouldForceRepair(STFileSystem *fs);
25
int32_t tsdbForceRepair(STFileSystem *fs);
26

27
typedef struct STFileHashEntry {
28
  struct STFileHashEntry *next;
29
  char                    fname[TSDB_FILENAME_LEN];
30
} STFileHashEntry;
31

32
typedef struct {
33
  int32_t           numFile;
34
  int32_t           numBucket;
35
  STFileHashEntry **buckets;
36
} STFileHash;
37

38
static const char *gCurrentFname[] = {
39
    [TSDB_FCURRENT] = "current.json",
40
    [TSDB_FCURRENT_C] = "current.c.json",
41
    [TSDB_FCURRENT_M] = "current.m.json",
42
};
43

44
// Allocate and initialise an empty STFileSystem bound to `pTsdb`.
//
// On success fs[0] points at the new object and 0 is returned.
// On failure a non-zero code is returned and fs[0] is NULL, so the slot can be
// handed to destroy_fs() safely.
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
  if (fs[0] == NULL) {
    return terrno;
  }

  fs[0]->tsdb = pTsdb;
  int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
  if (code) {
    taosMemoryFree(fs[0]);
    // tsdbOpenFS() calls destroy_fs(fs) on failure; leaving the freed pointer
    // in the slot would make it touch and free the object a second time.
    fs[0] = NULL;
    return code;
  }

  fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
  fs[0]->neid = 0;
  TARRAY2_INIT(fs[0]->fSetArr);
  TARRAY2_INIT(fs[0]->fSetArrTmp);

  return 0;
}
64

65
// Release an STFileSystem created by create_fs() and reset the caller's slot
// to NULL. A NULL slot is accepted and ignored.
static void destroy_fs(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) {
    return;
  }

  TARRAY2_DESTROY(pFS->fSetArr, NULL);
  TARRAY2_DESTROY(pFS->fSetArrTmp, NULL);

  // a failure to destroy the semaphore is only logged; teardown continues
  int32_t semCode = tsem_destroy(&pFS->canEdit);
  if (semCode != 0) {
    tsdbError("failed to destroy semaphore");
  }

  taosMemoryFree(pFS);
  fs[0] = NULL;
}
76

77
// Build the path of one of the manifest files into `fname`.
//
// The result is "<primary path><sep><pTsdb->name><sep><gCurrentFname[ftype]>".
// `fname` is written as a buffer of TSDB_FILENAME_LEN bytes.
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
  vnodeGetPrimaryPath(pTsdb->pVnode, false, fname, TSDB_FILENAME_LEN);

  // append the tsdb directory and manifest name after the primary path
  int32_t     used = strlen(fname);
  const char *leaf = gCurrentFname[ftype];
  snprintf(fname + used, TSDB_FILENAME_LEN - used - 1, "%s%s%s%s", TD_DIRSEP, pTsdb->name, TD_DIRSEP, leaf);
}
30,037,823✔
85

86
static int32_t save_json(const cJSON *json, const char *fname) {
8,298,726✔
87
  int32_t   code = 0;
8,298,726✔
88
  int32_t   lino;
89
  char     *data = NULL;
8,298,726✔
90

91
  data = cJSON_PrintUnformatted(json);
8,298,726✔
92
  if (data == NULL) {
8,298,744✔
93
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
94
  }
95

96
  int32_t len = strlen(data);
8,298,744✔
97
  
98
  // Use encrypted write if tsCfgKey is enabled
99
  code = taosWriteCfgFile(fname, data, len);
8,298,744✔
100
  if (code != 0) {
8,302,427✔
101
    TSDB_CHECK_CODE(code, lino, _exit);
×
102
  }
103

104
_exit:
8,302,427✔
105
  if (code) {
8,302,427✔
106
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
107
  }
108
  taosMemoryFree(data);
8,307,241✔
109
  return code;
8,302,786✔
110
}
111

112
// Read `fname` through taosReadCfgFile() and parse its contents as JSON.
//
// On success json[0] holds the parsed tree, which the caller releases with
// cJSON_Delete(). On failure json[0] is set to NULL and a non-zero code is
// returned; TSDB_CODE_FILE_CORRUPTED means the content did not parse.
static int32_t load_json(const char *fname, cJSON **json) {
  int32_t code = 0;
  int32_t lino = 0;
  char   *data = NULL;
  int32_t dataLen = 0;

  // Use taosReadCfgFile for automatic decryption support (returns null-terminated string)
  code = taosReadCfgFile(fname, &data, &dataLen);
  TSDB_CHECK_CODE(code, lino, _exit);

  json[0] = cJSON_Parse(data);
  if (json[0] == NULL) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

_exit:
  if (code) {
    // report the line recorded by TSDB_CHECK_CODE, not the line of this log call
    tsdbError("%s failed at %s:%d since %s", __func__, fname, lino, tstrerror(code));
    json[0] = NULL;
  }
  taosMemoryFree(data);
  return code;
}
137

138
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
8,301,525✔
139
  int32_t code = 0;
8,301,525✔
140
  int32_t lino = 0;
8,301,525✔
141

142
  cJSON *json = cJSON_CreateObject();
8,301,525✔
143
  if (json == NULL) {
8,302,320✔
144
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
145
  }
146

147
  // fmtv
148
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
8,302,320✔
149
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
150
  }
151

152
  // fset
153
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
8,298,309✔
154
  if (!ajson) {
8,299,672✔
155
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
156
  }
157
  const STFileSet *fset;
158
  TARRAY2_FOREACH(arr, fset) {
31,320,851✔
159
    cJSON *item = cJSON_CreateObject();
23,016,563✔
160
    if (!item) {
23,021,002✔
161
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
162
    }
163
    (void)cJSON_AddItemToArray(ajson, item);
23,021,002✔
164

165
    code = tsdbTFileSetToJson(fset, item);
23,019,395✔
166
    TSDB_CHECK_CODE(code, lino, _exit);
23,021,179✔
167
  }
168

169
  code = save_json(json, fname);
8,300,445✔
170
  TSDB_CHECK_CODE(code, lino, _exit);
8,301,998✔
171

172
_exit:
8,301,998✔
173
  if (code) {
8,301,998✔
174
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
175
  }
176
  cJSON_Delete(json);
8,301,998✔
177
  return code;
8,302,288✔
178
}
179

180
// Load the manifest `fname` into `arr`.
//
// `arr` is cleared first. The file must contain "fmtv" == 1 and an "fset"
// array; anything else is reported as TSDB_CODE_FILE_CORRUPTED. On success
// `arr` holds the parsed file sets, sorted with tsdbTFileSetCmprFn.
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
  int32_t code = 0;
  int32_t lino = 0;

  TARRAY2_CLEAR(arr, tsdbTFileSetClear);

  // load json
  cJSON *json = NULL;
  code = load_json(fname, &json);
  TSDB_CHECK_CODE(code, lino, _exit);

  // parse json
  const cJSON *item1;

  /* fmtv */
  item1 = cJSON_GetObjectItem(json, "fmtv");
  if (!cJSON_IsNumber(item1) || item1->valuedouble != 1) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  /* fset */
  item1 = cJSON_GetObjectItem(json, "fset");
  if (!cJSON_IsArray(item1)) {
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
  }

  const cJSON *item2;
  cJSON_ArrayForEach(item2, item1) {
    STFileSet *fset = NULL;
    code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
    TSDB_CHECK_CODE(code, lino, _exit);

    code = TARRAY2_APPEND(arr, fset);
    if (code) {
      // the array did not take the element, so release it here instead of leaking it
      tsdbTFileSetClear(&fset);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  }
  TARRAY2_SORT(arr, tsdbTFileSetCmprFn);

_exit:
  if (code) {
    tsdbError("%s failed at %s:%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
  }
  if (json) {
    cJSON_Delete(json);
  }
  return code;
}
231

232
// Bring fs->fSetArr in line with fs->fSetArrTmp.
//
// Both arrays are walked in step by fid:
//   - a file set present only in fSetArr is passed to tsdbTFileSetRemove();
//   - a file set present only in fSetArrTmp is copied and inserted into fSetArr;
//   - a file set present in both is updated with tsdbTFileSetApplyEdit().
static int32_t apply_commit(STFileSystem *fs) {
  int32_t        code = 0;
  int32_t        lino = 0;
  TFileSetArray *cur = fs->fSetArr;
  TFileSetArray *next = fs->fSetArrTmp;
  int32_t        iCur = 0;
  int32_t        iNext = 0;

  for (;;) {
    bool hasCur = iCur < TARRAY2_SIZE(cur);
    bool hasNext = iNext < TARRAY2_SIZE(next);
    if (!hasCur && !hasNext) {
      break;
    }

    STFileSet *fsetCur = hasCur ? TARRAY2_GET(cur, iCur) : NULL;
    STFileSet *fsetNext = hasNext ? TARRAY2_GET(next, iNext) : NULL;

    if (fsetNext == NULL || (fsetCur != NULL && fsetCur->fid < fsetNext->fid)) {
      // fid exists only in the current state: delete it
      tsdbTFileSetRemove(fsetCur);
      iCur++;
    } else if (fsetCur == NULL || fsetCur->fid > fsetNext->fid) {
      // fid exists only in the new state: create a copy and insert it
      code = tsdbTFileSetInitCopy(fs->tsdb, fsetNext, &fsetCur);
      TSDB_CHECK_CODE(code, lino, _exit);
      code = TARRAY2_SORT_INSERT(cur, fsetCur, tsdbTFileSetCmprFn);
      TSDB_CHECK_CODE(code, lino, _exit);
      iCur++;
      iNext++;
    } else {
      // same fid on both sides: apply the edit in place
      code = tsdbTFileSetApplyEdit(fs->tsdb, fsetNext, fsetCur);
      TSDB_CHECK_CODE(code, lino, _exit);
      iCur++;
      iNext++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
284

285
// Make a pending edit durable: rename the temporary manifest (current.c.json
// for TSDB_FEDIT_COMMIT, current.m.json otherwise) over current.json, then
// apply the staged state to the in-memory file-set array with apply_commit().
//
// Returns 0 on success or the first failing step's error code.
static int32_t commit_edit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  char    current[TSDB_FILENAME_LEN];
  char    current_t[TSDB_FILENAME_LEN];

  current_fname(fs->tsdb, current, TSDB_FCURRENT);
  if (fs->etype == TSDB_FEDIT_COMMIT) {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
  } else {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
  }

  // Store the result in `code` before checking it: TSDB_CHECK_CODE only tests
  // its argument, so a failed rename would otherwise reach _exit with `code`
  // never assigned.
  code = taosRenameFile(current_t, current);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = apply_commit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  return code;
}
312

313
// forward declaration: tsdbFSDoSanAndFix() is defined later in this file
314
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);
315
// In-memory side of aborting an edit: delegates to tsdbFSDoSanAndFix().
static int32_t apply_abort(STFileSystem *fs) {
  int32_t code = tsdbFSDoSanAndFix(fs);
  return code;
}
118✔
316

317
// Abort a pending edit: delete the temporary manifest (current.c.json for
// TSDB_FEDIT_COMMIT, current.m.json otherwise) and then run apply_abort().
static int32_t abort_edit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  if (fs->etype != TSDB_FEDIT_COMMIT) {
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
  } else {
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
  }

  code = taosRemoveFile(fname);
  if (code != 0) {
    code = TAOS_SYSTEM_ERROR(code);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = apply_abort(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  } else {
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
              tstrerror(code));
  }
  return code;
}
345

346
// Check that the file behind `fobj` is present on disk.
//
// Returns 0 when fobj->fname exists. If it does not, and tsSsEnabled is set
// with fobj->f->lcn > 1, the name produced by tsdbTFileLastChunkName() is
// checked instead. When neither exists, the missing name is logged and
// TSDB_CODE_FILE_CORRUPTED is returned.
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
  if (taosCheckExistFile(fobj->fname)) {
    return 0;
  }

  const char *missing = fobj->fname;
  char        fname1[TSDB_FILENAME_LEN];

  if (tsSsEnabled && fobj->f->lcn > 1) {
    tsdbTFileLastChunkName(fs->tsdb, fobj->f, fname1);
    if (taosCheckExistFile(fname1)) {
      return 0;
    }
    missing = fname1;
  }

  tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, missing);
  return TSDB_CODE_FILE_CORRUPTED;
}
375

376
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
377

378
// Insert `fname` into `hash`. No duplicate check is made; the new entry is
// linked at the head of its bucket. Returns 0, or terrno if allocation fails.
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
  STFileHashEntry *node = taosMemoryMalloc(sizeof(STFileHashEntry));
  if (node == NULL) {
    return terrno;
  }
  tstrncpy(node->fname, fname, TSDB_FILENAME_LEN);

  // the bucket is chosen from the caller's string, not from the stored copy
  uint32_t          slot = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
  STFileHashEntry **head = &hash->buckets[slot];

  node->next = *head;
  *head = node;
  hash->numFile++;

  return 0;
}
392

393
// Build in `hash` the set of file names referenced by the file system:
// current.json, every non-NULL farr[] file of every file set (plus the
// "<base>.<lcn>.data" name for a data file whose lcn > 0), and every stt file.
//
// Returns 0 on success. On failure the error is logged, `hash` is destroyed
// and zeroed, and the error code is returned.
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
  int32_t code = 0;
  int32_t lino = 0;
  char    fname[TSDB_FILENAME_LEN];

  // init hash table
  hash->numFile = 0;
  hash->numBucket = 4096;
  hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
  if (hash->buckets == NULL) {
    // no bucket array exists, so the cleanup at _exit must not walk 4096 slots
    hash->numBucket = 0;
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
  }

  // current.json
  current_fname(fs->tsdb, fname, TSDB_FCURRENT);
  code = tsdbFSAddEntryToFileObjHash(hash, fname);
  TSDB_CHECK_CODE(code, lino, _exit);

  // other
  STFileSet *fset = NULL;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    // data file
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
      if (fset->farr[i] == NULL) {
        continue;
      }

      code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
      TSDB_CHECK_CODE(code, lino, _exit);

      if (TSDB_FTYPE_DATA == i && fset->farr[i]->f->lcn > 0) {
        STFileObj *fobj = fset->farr[i];
        int32_t    lcn = fobj->f->lcn;
        char       lcn_name[TSDB_FILENAME_LEN];

        // bound both writes by the real buffer size rather than an unrelated length constant
        snprintf(lcn_name, sizeof(lcn_name), "%s", fobj->fname);
        char *dot = strrchr(lcn_name, '.');
        if (dot) {
          // replace the extension with "<lcn>.data"
          snprintf(dot + 1, sizeof(lcn_name) - (dot + 1 - lcn_name), "%d.data", lcn);

          code = tsdbFSAddEntryToFileObjHash(hash, lcn_name);
          TSDB_CHECK_CODE(code, lino, _exit);
        }
      }
    }

    // stt file
    SSttLvl *lvl = NULL;
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      STFileObj *fobj;
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
        code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
    tsdbFSDestroyFileObjHash(hash);
  }
  return code;
}
455

456
// Look `fname` up in `hash`. Returns the matching entry, or NULL if absent.
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
  uint32_t slot = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;

  for (STFileHashEntry *node = hash->buckets[slot]; node != NULL; node = node->next) {
    if (strcmp(node->fname, fname) == 0) {
      return node;
    }
  }

  return NULL;
}
469

470
// Free every entry and the bucket array of `hash`, then zero the structure.
// Safe to call on a hash whose bucket array was never allocated.
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
  // tsdbFSCreateFileObjHash() sets numBucket before allocating, so a failed
  // allocation arrives here with numBucket > 0 and buckets == NULL.
  if (hash->buckets != NULL) {
    for (int32_t i = 0; i < hash->numBucket; i++) {
      STFileHashEntry *entry = hash->buckets[i];
      while (entry) {
        STFileHashEntry *next = entry->next;
        taosMemoryFree(entry);
        entry = next;
      }
    }
  }
  taosMemoryFree(hash->buckets);
  memset(hash, 0, sizeof(*hash));
}
1,277,341✔
482

483
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
1,279,238✔
484
  int32_t code = 0;
1,279,238✔
485
  int32_t lino = 0;
1,279,238✔
486
  int32_t corrupt = false;
1,279,345✔
487

488
  if (fs->tsdb->pVnode->mounted) goto _exit;
1,279,345✔
489

490
  {  // scan each file
491
    STFileSet *fset = NULL;
1,276,419✔
492
    TARRAY2_FOREACH(fs->fSetArr, fset) {
2,115,454✔
493
      // data file
494
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
4,189,819✔
495
        if (fset->farr[ftype] == NULL) continue;
3,351,118✔
496
        STFileObj *fobj = fset->farr[ftype];
644,145✔
497
        code = tsdbFSDoScanAndFixFile(fs, fobj);
644,265✔
498
        if (code) {
644,191✔
499
          fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
500
          corrupt = true;
×
501
        }
502
      }
503

504
      // stt file
505
      SSttLvl *lvl;
506
      TARRAY2_FOREACH(fset->lvlArr, lvl) {
1,653,078✔
507
        STFileObj *fobj;
508
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
1,640,106✔
509
          code = tsdbFSDoScanAndFixFile(fs, fobj);
826,511✔
510
          if (code) {
827,645✔
511
            fset->maxVerValid =
×
512
                (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
513
            corrupt = true;
×
514
          }
515
        }
516
      }
517
    }
518
  }
519

520
  {  // clear unreferenced files
521
    STfsDir *dir = NULL;
1,273,479✔
522
    TAOS_CHECK_GOTO(tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path, &dir), &lino, _exit);
1,274,790✔
523

524
    STFileHash fobjHash = {0};
1,277,341✔
525
    code = tsdbFSCreateFileObjHash(fs, &fobjHash);
1,277,341✔
526
    if (code) goto _close_dir;
1,277,341✔
527

528
    for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
5,304,466✔
529
      if (taosIsDir(file->aname)) continue;
4,026,465✔
530

531
      if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
2,749,784✔
532
        tsdbRemoveFile(file->aname);
354✔
533
      }
534
    }
535

536
    tsdbFSDestroyFileObjHash(&fobjHash);
1,277,341✔
537

538
  _close_dir:
1,277,341✔
539
    tfsClosedir(dir);
1,277,341✔
540
  }
541

542
_exit:
1,280,121✔
543
  if (corrupt) {
1,280,121✔
544
    tsdbError("vgId:%d, TSDB file system is corrupted", TD_VID(fs->tsdb->pVnode));
×
545
    fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
×
546
    code = 0;
×
547
  }
548

549
  if (code) {
1,280,121✔
550
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
551
  }
552
  return code;
1,280,121✔
553
}
554

555
// Recompute fs->neid as the largest tsdbTFileSetMaxCid() over all file sets
// (0 when there are none), then run tsdbFSDoSanAndFix().
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;

  // get max commit id
  fs->neid = 0;
  const STFileSet *fset;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset));
  }

  // scan and fix
  code = tsdbFSDoSanAndFix(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
575

576
// Replace the contents of fs->fSetArrTmp with copies of every file set in
// fs->fSetArr. Returns 0 on success, or the first error code; on error
// fSetArrTmp holds the copies made so far.
static int32_t tsdbFSDupState(STFileSystem *fs) {
  int32_t code = 0;

  const TFileSetArray *src = fs->fSetArr;
  TFileSetArray       *dst = fs->fSetArrTmp;

  TARRAY2_CLEAR(dst, tsdbTFileSetClear);

  const STFileSet *fset1;
  TARRAY2_FOREACH(src, fset1) {
    STFileSet *fset2 = NULL;
    code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
    if (code) return code;

    code = TARRAY2_APPEND(dst, fset2);
    if (code) {
      // the array did not take the copy, so release it here instead of leaking it
      tsdbTFileSetClear(&fset2);
      return code;
    }
  }

  return 0;
}
595

596
// Load the file system state from disk, or create it if none exists.
//
// Without a current.json, the (empty) state is written out and nothing else
// is done. Otherwise current.json is loaded and a leftover edit is resolved:
//   - current.c.json: aborted when `rollback` is set, else loaded and committed;
//   - current.m.json: always aborted.
// Then the state is duplicated into fSetArrTmp and either tsdbForceRepair()
// or tsdbFSScanAndFix() runs, as chosen by tsdbShouldForceRepair().
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;
  STsdb  *pTsdb = fs->tsdb;

  char fCurrent[TSDB_FILENAME_LEN];
  char cCurrent[TSDB_FILENAME_LEN];
  char mCurrent[TSDB_FILENAME_LEN];

  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);

  if (!taosCheckExistFile(fCurrent)) {
    // no manifest yet: persist the current in-memory state
    code = save_fs(fs->fSetArr, fCurrent);
    TSDB_CHECK_CODE(code, lino, _exit);
    goto _exit;
  }

  // current.json exists
  code = load_fs(pTsdb, fCurrent, fs->fSetArr);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (taosCheckExistFile(cCurrent)) {
    // current.c.json exists
    fs->etype = TSDB_FEDIT_COMMIT;
    if (rollback) {
      code = abort_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    } else {
      code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
      TSDB_CHECK_CODE(code, lino, _exit);

      code = commit_edit(fs);
      TSDB_CHECK_CODE(code, lino, _exit);
    }
  } else if (taosCheckExistFile(mCurrent)) {
    // current.m.json exists
    fs->etype = TSDB_FEDIT_MERGE;
    code = abort_edit(fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFSDupState(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (tsdbShouldForceRepair(fs)) {
    code = tsdbForceRepair(fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  } else {
    code = tsdbFSScanAndFix(fs);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
  } else {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  }
  return code;
}
657

658
// Empty both file-set arrays of `fs`, clearing each element with
// tsdbTFileSetClear. The arrays themselves are released later by destroy_fs().
static void close_file_system(STFileSystem *fs) {
  // committed state
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);

  // staging copy used while editing
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
}
4,905,974✔
662

663
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
×
664
  if (pSet1->fid < pSet2->fid) {
×
665
    return -1;
×
666
  } else if (pSet1->fid > pSet2->fid) {
×
667
    return 1;
×
668
  }
669
  return 0;
×
670
}
671

672
// Stage an edit in fs->fSetArrTmp.
//
// The staging array is first reset to a copy of the committed state. Each
// operation in `opArray` is then applied to the file set with the matching
// fid, which is created if missing. The first operation seen for each run of
// equal fids also stamps that file set with the time field for `etype`.
// Finally, stt levels without files and file sets that became empty are
// removed from the staging array.
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbFSDupState(fs);
  if (code) return code;

  TFileSetArray  *fsetArray = fs->fSetArrTmp;
  STFileSet      *fset = NULL;
  const STFileOp *op;
  int32_t         fid = INT32_MIN;
  TSKEY           now = taosGetTimestampMs();

  TARRAY2_FOREACH_PTR(opArray, op) {
    // switch to (or create) the target file set when the fid changes
    if (fset == NULL || fset->fid != op->fid) {
      STFileSet   key = {.fid = op->fid};
      STFileSet  *pKey = &key;
      STFileSet **hit = TARRAY2_SEARCH(fsetArray, &pKey, tsdbTFileSetCmprFn, TD_EQ);
      fset = (hit != NULL) ? *hit : NULL;

      if (fset == NULL) {
        code = tsdbTFileSetInit(op->fid, &fset);
        TSDB_CHECK_CODE(code, lino, _exit);

        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
        TSDB_CHECK_CODE(code, lino, _exit);
      }
    }

    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
    TSDB_CHECK_CODE(code, lino, _exit);

    if (fid != op->fid) {
      fid = op->fid;
      switch (etype) {
        case TSDB_FEDIT_COMMIT:
          fset->lastCommit = now;
          break;
        case TSDB_FEDIT_COMPACT:
          fset->lastCompact = now;
          break;
        case TSDB_FEDIT_SSMIGRATE:
          fset->lastMigrate = now;
          break;
        case TSDB_FEDIT_ROLLUP:
          fset->lastRollupLevel = fs->rollupLevel;
          fset->lastRollup = now;
          fset->lastCompact = now;  // rollup implies compact
          break;
        default:
          break;
      }
    }
  }

  // remove empty stt levels and empty file sets
  for (int32_t i = 0; i < TARRAY2_SIZE(fsetArray);) {
    fset = TARRAY2_GET(fsetArray, i);

    for (int32_t j = 0; j < TARRAY2_SIZE(fset->lvlArr);) {
      SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j);
      if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
        // removal moves the next level into slot j, so j is not advanced
        TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
      } else {
        j++;
      }
    }

    if (tsdbTFileSetIsEmpty(fset)) {
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
    } else {
      i++;
    }
  }

_exit:
  if (code) {
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
  }
  return code;
}
749

750
// return error code
751
// Open the TSDB file system of `pTsdb`.
//
// Runs tsdbCheckAndUpgradeFileSystem(), then create_fs() and open_fs().
// On success fs[0] is the opened file system and 0 is returned. On failure
// the error is logged, destroy_fs(fs) is called and the code is returned.
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
  int32_t code = 0;
  int32_t lino = 0;

  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = create_fs(pTsdb, fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  code = open_fs(fs[0], rollback);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code == 0) {
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
    return code;
  }

  tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
  destroy_fs(fs);
  return code;
}
773

774
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
775
extern void tsdbStopAllCompTask(STsdb *tsdb);
776
extern void tsdbStopAllRetentionTask(STsdb *tsdb);
777

778
// Disable background tasks for `pTsdb` and stop those already scheduled.
//
// Under pTsdb->mutex: sets bgTaskDisabled, collects the merge, compact,
// retention and migrate task ids of every file set, and calls
// tsdbFSSetBlockCommit(fset, false) on each. After unlocking, every collected
// task is cancelled, then every task is waited for, and finally the
// compaction (TD_ENTERPRISE builds only) and retention stop routines run.
//
// Returns 0 on success, or terrno if the task list cannot be built.
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
  STFileSystem *fs = pTsdb->pFS;
  SArray       *tasks = taosArrayInit(0, sizeof(SVATaskID));
  if (tasks == NULL) {
    return terrno;
  }

  (void)taosThreadMutexLock(&pTsdb->mutex);

  // disable
  pTsdb->bgTaskDisabled = true;

  // collect channel
  STFileSet *fset;
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    bool pushed = taosArrayPush(tasks, &fset->mergeTask) != NULL         //
                  && taosArrayPush(tasks, &fset->compactTask) != NULL    //
                  && taosArrayPush(tasks, &fset->retentionTask) != NULL  //
                  && taosArrayPush(tasks, &fset->migrateTask) != NULL;
    if (!pushed) {
      taosArrayDestroy(tasks);
      (void)taosThreadMutexUnlock(&pTsdb->mutex);
      return terrno;
    }
    tsdbFSSetBlockCommit(fset, false);
  }

  (void)taosThreadMutexUnlock(&pTsdb->mutex);

  // destroy all channels: first pass cancels every task ...
  for (int32_t i = 0; i < taosArrayGetSize(tasks); i++) {
    SVATaskID *task = taosArrayGet(tasks, i);
    (void)vnodeACancel(task);
  }
  // ... second pass waits for every task
  for (int32_t i = 0; i < taosArrayGetSize(tasks); i++) {
    SVATaskID *task = taosArrayGet(tasks, i);
    vnodeAWait(task);
  }
  taosArrayDestroy(tasks);

#ifdef TD_ENTERPRISE
  tsdbStopAllCompTask(pTsdb);
#endif
  tsdbStopAllRetentionTask(pTsdb);
  return 0;
}
825

826
// Clear the bgTaskDisabled flag of `pTsdb` under pTsdb->mutex.
void tsdbEnableBgTask(STsdb *pTsdb) {
  (void)taosThreadMutexLock(&pTsdb->mutex);

  pTsdb->bgTaskDisabled = false;

  (void)taosThreadMutexUnlock(&pTsdb->mutex);
}
831

832
// Close and destroy the file system referenced by `*fs`.
//
// Does nothing when `*fs` is NULL. A failure to stop background tasks is only
// logged; closing and destruction still proceed.
void tsdbCloseFS(STFileSystem **fs) {
  STFileSystem *pFS = fs[0];
  if (pFS == NULL) return;

  int32_t code = tsdbDisableAndCancelAllBgTask(pFS->tsdb);
  if (code != 0) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pFS->tsdb->pVnode), __func__, __LINE__,
              tstrerror(code));
  }

  close_file_system(pFS);
  destroy_fs(fs);
}
844

845
// Increment fs->neid under fs->tsdb->mutex and return the new value.
int64_t tsdbFSAllocEid(STFileSystem *fs) {
  int64_t eid;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  fs->neid += 1;
  eid = fs->neid;
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return eid;
}
851

852
// Raise fs->neid to `cid` if `cid` is larger; never lowers it.
// Performed under fs->tsdb->mutex.
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  if (fs->neid < cid) {
    fs->neid = cid;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
}
857

858
// Begin a file-system edit of kind `etype`.
//
// Waits on the fs->canEdit semaphore, records `etype` in fs->etype, applies
// `opArray` through edit_fs() and saves fs->fSetArrTmp to the "current" file
// selected by `etype` (TSDB_FCURRENT_C for commits, TSDB_FCURRENT_M otherwise).
//
// Returns 0 on success or the first failing step's error code. This function
// never posts fs->canEdit itself; tsdbFSEditCommit() and tsdbFSEditAbort() do.
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
  int32_t code = 0;
  int32_t lino = 0;  // initialized like in tsdbFSEditCommit so the error log never reads garbage
  char    current_t[TSDB_FILENAME_LEN];

  if (etype == TSDB_FEDIT_COMMIT) {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
  } else {
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
  }

  // a failed wait is logged only; the edit still proceeds
  if (tsem_wait(&fs->canEdit) != 0) {
    tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
  }
  fs->etype = etype;

  // edit
  code = edit_fs(fs, opArray, etype);
  TSDB_CHECK_CODE(code, lino, _exit);

  // save fs
  code = save_fs(fs->fSetArrTmp, current_t);
  TSDB_CHECK_CODE(code, lino, _exit);

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
              tstrerror(code), etype);
  } else {
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
  }
  return code;
}
891

892
// Set fset->blockCommit to `block`. When unblocking and at least one waiter is
// registered in fset->numWaitCommit, signal fset->canCommit.
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
  if (block) {
    fset->blockCommit = true;
    return;
  }

  fset->blockCommit = false;
  if (fset->numWaitCommit > 0) {
    (void)taosThreadCondSignal(&fset->canCommit);
  }
}
902

903
// Wait, under tsdb->mutex, until file set `fid` is no longer marked
// blockCommit. Does not wait when the file set does not exist.
//
// The wait is timed into writeMetrics.block_commit_time; if the file set was
// blocked on entry, writeMetrics.blocked_commit_count is bumped by one.
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
  (void)taosThreadMutexLock(&tsdb->mutex);

  STFileSet *fset;
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);

  // sampled before waiting: records whether this call had to block at all
  bool wasBlocked = false;
  if (fset != NULL) {
    wasBlocked = fset->blockCommit;

    METRICS_TIMING_BLOCK(tsdb->pVnode->writeMetrics.block_commit_time, METRIC_LEVEL_HIGH, {
      while (fset->blockCommit) {
        fset->numWaitCommit++;
        (void)taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
        fset->numWaitCommit--;
      }
    });
  }

  if (wasBlocked) {
    METRICS_UPDATE(tsdb->pVnode->writeMetrics.blocked_commit_count, METRIC_LEVEL_HIGH, 1);
  }

  (void)taosThreadMutexUnlock(&tsdb->mutex);
}
926

927
// IMPORTANT: the caller must hold fs->tsdb->mutex
928
// Commit the pending edit and, unless this is a force-repair edit, schedule
// merge tasks and update each file set's commit-blocking state.
//
// For every file set (walked in reverse) whose first stt level is level 0:
//   - a merge task is scheduled when the level holds at least sttTrigger files
//     and no valid merge task exists;
//   - commits are blocked when the level holds at least
//     sttTrigger * BLOCK_COMMIT_FACTOR files, unblocked otherwise.
// File sets without a level-0 stt level are always unblocked.
//
// fs->canEdit is posted on every path, success or failure.
int32_t tsdbFSEditCommit(STFileSystem *fs) {
  int32_t code = 0;
  int32_t lino = 0;

  code = commit_edit(fs);
  TSDB_CHECK_CODE(code, lino, _exit);

  // no merge scheduling while repairing
  if (fs->etype == TSDB_FEDIT_FORCE_REPAIR) {
    goto _exit;
  }

  int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
  if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
    STFileSet *fset;
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
      SSttLvl *lvl = NULL;
      bool     hasLevel0 = false;
      if (TARRAY2_SIZE(fset->lvlArr) != 0) {
        lvl = TARRAY2_FIRST(fset->lvlArr);
        hasLevel0 = (lvl->level == 0);
      }
      if (!hasLevel0) {
        tsdbFSSetBlockCommit(fset, false);
        continue;
      }

      int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
      bool    needMerge = (numFile >= sttTrigger) && !vnodeATaskValid(&fset->mergeTask);
      if (needMerge) {
        SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
        if (arg == NULL) {
          code = terrno;
          TSDB_CHECK_CODE(code, lino, _exit);
        }

        arg->tsdb = fs->tsdb;
        arg->fid = fset->fid;

        code = vnodeAsync(MERGE_TASK_ASYNC, EVA_PRIORITY_HIGH, tsdbMerge, taosAutoMemoryFree, arg, &fset->mergeTask);
        TSDB_CHECK_CODE(code, lino, _exit);
      }

      tsdbFSSetBlockCommit(fset, numFile >= sttTrigger * BLOCK_COMMIT_FACTOR);
    }
  }

_exit:
  if (code) {
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
  } else {
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
  }
  if (tsem_post(&fs->canEdit) != 0) {
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
  }
  return code;
}
992

993
// Abort the pending edit via abort_edit() and post fs->canEdit.
// Returns abort_edit()'s result; a failed post is only logged.
int32_t tsdbFSEditAbort(STFileSystem *fs) {
  int32_t code = abort_edit(fs);

  int32_t postRet = tsem_post(&fs->canEdit);
  if (postRet != 0) {
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
  }

  return code;
}
1000

1001
// Look up the file set with id `fid` in fs->fSetArr.
// Stores the match in `*fset`, or NULL when no file set has that id.
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
  // search key: a stack file set that carries only the wanted fid
  STFileSet  key = {.fid = fid};
  STFileSet *pKey = &key;

  STFileSet **found = TARRAY2_SEARCH(fs->fSetArr, &pKey, tsdbTFileSetCmprFn, TD_EQ);
  if (found == NULL) {
    *fset = NULL;
  } else {
    *fset = *found;
  }
}
1007

1008
// Build a snapshot array holding a copy (tsdbTFileSetInitCopy) of every file
// set in fs->fSetArr. The file-set list is walked under fs->tsdb->mutex.
//
// On success `fsetArr[0]` owns the new array; release it with
// tsdbFSDestroyCopySnapshot(). On failure everything built so far is released,
// `fsetArr[0]` is set to NULL and the error code is returned.
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t    code = 0;
  STFileSet *fset;
  STFileSet *fset1 = NULL;

  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
  if (fsetArr[0] == NULL) return terrno;

  TARRAY2_INIT(fsetArr[0]);

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
    if (code) break;

    code = TARRAY2_APPEND(fsetArr[0], fset1);
    if (code) {
      // the copy never made it into the array, so the array cleanup below
      // would not see it; release it here to avoid a leak
      tsdbTFileSetClear(&fset1);
      break;
    }
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  if (code) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}
1035

1036
// Release a copy snapshot: clear every file set, free the array and set
// `fsetArr[0]` to NULL. Does nothing when `fsetArr[0]` is already NULL.
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFree(fsetArr[0]);
  fsetArr[0] = NULL;
}
1043

1044
// Locking wrapper: runs tsdbFSCreateRefSnapshotWithoutLock() while holding
// fs->tsdb->mutex and returns its result.
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
  int32_t code;

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  return code;
}
1050

1051
// Build a snapshot array holding a reference (tsdbTFileSetInitRef) to every
// file set in fs->fSetArr. Takes no lock itself; tsdbFSCreateRefSnapshot()
// is the variant that holds fs->tsdb->mutex around this call.
//
// On failure everything built so far is released, `fsetArr[0]` is set to NULL
// and the error code is returned.
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr) {
  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
  if (fsetArr[0] == NULL) return terrno;

  int32_t    code = 0;
  STFileSet *src;
  STFileSet *ref;

  TARRAY2_FOREACH(fs->fSetArr, src) {
    code = tsdbTFileSetInitRef(fs->tsdb, src, &ref);
    if (code != 0) break;

    code = TARRAY2_APPEND(fsetArr[0], ref);
    if (code != 0) {
      // not stored in the array, so drop this reference explicitly
      tsdbTFileSetClear(&ref);
      break;
    }
  }

  if (code != 0) {
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
    taosMemoryFree(fsetArr[0]);
    fsetArr[0] = NULL;
  }
  return code;
}
1076

1077
// Release a reference snapshot: clear every file set, free the array and set
// `fsetArr[0]` to NULL. Does nothing when `fsetArr[0]` is already NULL.
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
  if (fsetArr[0] == NULL) {
    return;
  }

  TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
  taosMemoryFreeClear(fsetArr[0]);
  fsetArr[0] = NULL;
}
1084

1085
// Build a hash keyed by fid (int32_t) whose values are copies of the
// STFileSetRange entries of `pRanges`.
//
// Returns the hash, which the caller releases with taosHashCleanup(), or NULL
// on failure. If the hash cannot be created, terrno is set to
// TSDB_CODE_OUT_OF_MEMORY. If an insert fails, the partly filled hash is
// released and NULL is returned, so callers never work with an incomplete map.
static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
  int32_t   capacity = TARRAY2_SIZE(pRanges) * 2;
  SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (pHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
    STFileSetRange *u = TARRAY2_GET(pRanges, i);
    int32_t         fid = u->fid;
    int32_t         code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
    if (code != 0) {
      // a missing entry would make the snapshot fall back to default versions
      // for this fid without any error, so fail the whole conversion instead
      tsdbError("range diff hash put failed, fid:%d, code:0x%x", fid, code);
      taosHashCleanup(pHash);
      return NULL;
    }
    tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
  }
  return pHash;
}
1101

1102
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
398✔
1103
                                       TFileOpArray *fopArr) {
1104
  int32_t    code = 0;
398✔
1105
  STFileSet *fset;
1106
  STFileSet *fset1;
398✔
1107
  SHashObj  *pHash = NULL;
398✔
1108

1109
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
398✔
1110
  if (fsetArr == NULL) return terrno;
398✔
1111
  TARRAY2_INIT(fsetArr[0]);
398✔
1112

1113
  if (pRanges) {
398✔
1114
    pHash = tsdbFSetRangeArrayToHash(pRanges);
398✔
1115
    if (pHash == NULL) {
398✔
1116
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1117
      goto _out;
×
1118
    }
1119
  }
1120

1121
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
398✔
1122
  TARRAY2_FOREACH(fs->fSetArr, fset) {
796✔
1123
    int64_t ever = VERSION_MAX;
398✔
1124
    if (pHash) {
398✔
1125
      int32_t         fid = fset->fid;
398✔
1126
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
398✔
1127
      if (u) {
398✔
1128
        ever = u->sver - 1;
398✔
1129
      }
1130
    }
1131

1132
    code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
398✔
1133
    if (code) break;
398✔
1134

1135
    code = TARRAY2_APPEND(fsetArr[0], fset1);
398✔
1136
    if (code) break;
398✔
1137
  }
1138
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
398✔
1139

1140
_out:
398✔
1141
  if (code) {
398✔
1142
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1143
    taosMemoryFree(fsetArr[0]);
×
1144
    fsetArr[0] = NULL;
×
1145
  }
1146
  if (pHash) {
398✔
1147
    taosHashCleanup(pHash);
398✔
1148
    pHash = NULL;
398✔
1149
  }
1150
  return code;
398✔
1151
}
1152

1153
// Release a snapshot created by tsdbFSCreateCopyRangedSnapshot(); forwards to
// tsdbFSDestroyCopySnapshot().
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) {
  tsdbFSDestroyCopySnapshot(fsetArr);
}
1154

1155
// Build an array of file-set ranges (tsdbTFileSetRangeInitRef) covering
// versions [sver, ever] for the file sets in fs->fSetArr.
//
// When `pRanges` has an entry for a file set's fid, that entry's `sver`
// replaces the start version for that file set. File sets whose effective
// start version exceeds `ever` are skipped. `pRanges` may be NULL. The
// file-set list is walked under fs->tsdb->mutex.
//
// On success `fsrArr[0]` owns the new array; release it with
// tsdbFSDestroyRefRangedSnapshot(). On every failure path the array and its
// contents are released and `fsrArr[0]` is set to NULL.
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
                                      TFileSetRangeArray **fsrArr) {
  int32_t         code = 0;
  STFileSet      *fset;
  STFileSetRange *fsr1 = NULL;
  SHashObj       *pHash = NULL;

  fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
  if (fsrArr[0] == NULL) {
    code = terrno;
    goto _out;
  }

  tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
  if (pRanges) {
    pHash = tsdbFSetRangeArrayToHash(pRanges);
    if (pHash == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _out;
    }
  }

  (void)taosThreadMutexLock(&fs->tsdb->mutex);
  TARRAY2_FOREACH(fs->fSetArr, fset) {
    int64_t sver1 = sver;
    int64_t ever1 = ever;

    if (pHash) {
      int32_t         fid = fset->fid;
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
      if (u) {
        sver1 = u->sver;
        tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
      }
    }

    if (sver1 > ever1) {
      tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
      continue;
    }

    tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);

    code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
    if (code) break;

    code = TARRAY2_APPEND(fsrArr[0], fsr1);
    if (code) break;

    // ownership moved into the array
    fsr1 = NULL;
  }
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);

  if (code) {
    // release a range that was created but never stored in the array
    tsdbTFileSetRangeClear(&fsr1);
  }

_out:
  if (code && fsrArr[0] != NULL) {
    // free the array header as well as its elements, on every error path,
    // so no half-built array is leaked or handed back with an error code
    TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
    taosMemoryFree(fsrArr[0]);
    fsrArr[0] = NULL;
  }
  if (pHash) {
    taosHashCleanup(pHash);
    pHash = NULL;
  }
  return code;
}
1221

1222
// Release a snapshot created by tsdbFSCreateRefRangedSnapshot(); forwards to
// tsdbTFileSetRangeArrayDestroy().
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) {
  tsdbTFileSetRangeArrayDestroy(fsrArr);
}
1223

1224
// Claim the per-file-set execution slot for `task` on file set `fid`.
//
// `*fset` receives the file set, or NULL when it does not exist, in which case
// nothing is claimed. Slot 0 (conds[0]) is used when sttTrigger == 1 or the
// task is EVA_TASK_COMMIT; every other task uses slot 1 (conds[1]).
// If the slot is busy the call waits on the slot's condition variable together
// with tsdb->mutex, so it is expected to run with that mutex held.
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task, STFileSet **fset) {
  // sttTrigger is protected by tsdb->mutex, so reading it here needs no extra lock
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

  tsdbFSGetFSet(tsdb->pFS, fid, fset);
  if (*fset == NULL) {
    return;
  }

  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &(*fset)->conds[slot];

  // wait until the slot is free, then take it
  while (cond->running) {
    cond->numWait++;
    (void)taosThreadCondWait(&cond->cond, &tsdb->mutex);
    cond->numWait--;
  }
  cond->running = true;

  tsdbTrace("vgId:%d begin %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1254

1255
// Release the execution slot taken by tsdbBeginTaskOnFileSet() for `task` on
// file set `fid`, and signal the slot's condition variable if anyone waits.
// Does nothing when the file set no longer exists.
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid, EVATaskT task) {
  // sttTrigger is protected by tsdb->mutex, so reading it here needs no extra lock
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;

  STFileSet *fset = NULL;
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
  if (fset == NULL) {
    return;
  }

  // same slot selection as tsdbBeginTaskOnFileSet
  int32_t               slot = (sttTrigger == 1 || task == EVA_TASK_COMMIT) ? 0 : 1;
  struct STFileSetCond *cond = &fset->conds[slot];

  cond->running = false;
  if (cond->numWait > 0) {
    (void)taosThreadCondSignal(&cond->cond);
  }

  tsdbTrace("vgId:%d finish %s task on file set:%d", TD_VID(tsdb->pVnode), vnodeGetATaskName(task), fid);
}
1280

1281
struct SFileSetReader {
1282
  STsdb     *pTsdb;
1283
  STFileSet *pFileSet;
1284
  int32_t    fid;
1285
  int64_t    startTime;
1286
  int64_t    endTime;
1287
  int64_t    lastCompactTime;
1288
  int64_t    totalSize;
1289
};
1290

1291
int32_t tsdbFileSetReaderOpen(void *pVnode, struct SFileSetReader **ppReader) {
210✔
1292
  if (pVnode == NULL || ppReader == NULL) {
210✔
1293
    return TSDB_CODE_INVALID_PARA;
×
1294
  }
1295

1296
  STsdb *pTsdb = ((SVnode *)pVnode)->pTsdb;
210✔
1297

1298
  (*ppReader) = taosMemoryCalloc(1, sizeof(struct SFileSetReader));
210✔
1299
  if (*ppReader == NULL) {
210✔
1300
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, __LINE__,
×
1301
              tstrerror(terrno));
1302
    return terrno;
×
1303
  }
1304

1305
  (*ppReader)->pTsdb = pTsdb;
210✔
1306
  (*ppReader)->fid = INT32_MIN;
210✔
1307
  (*ppReader)->pFileSet = NULL;
210✔
1308

1309
  return TSDB_CODE_SUCCESS;
210✔
1310
}
1311

1312
static int32_t tsdbFileSetReaderNextNoLock(struct SFileSetReader *pReader) {
420✔
1313
  STsdb  *pTsdb = pReader->pTsdb;
420✔
1314
  int32_t code = TSDB_CODE_SUCCESS;
420✔
1315

1316
  tsdbTFileSetClear(&pReader->pFileSet);
420✔
1317

1318
  STFileSet *fset = &(STFileSet){
420✔
1319
      .fid = pReader->fid,
420✔
1320
  };
1321

1322
  STFileSet **fsetPtr = TARRAY2_SEARCH(pReader->pTsdb->pFS->fSetArr, &fset, tsdbTFileSetCmprFn, TD_GT);
420✔
1323
  if (fsetPtr == NULL) {
420✔
1324
    pReader->fid = INT32_MAX;
210✔
1325
    return TSDB_CODE_NOT_FOUND;
210✔
1326
  }
1327

1328
  // ref file set
1329
  code = tsdbTFileSetInitRef(pReader->pTsdb, *fsetPtr, &pReader->pFileSet);
210✔
1330
  if (code) return code;
210✔
1331

1332
  // get file set details
1333
  pReader->fid = pReader->pFileSet->fid;
210✔
1334
  tsdbFidKeyRange(pReader->fid, pTsdb->keepCfg.days, pTsdb->keepCfg.precision, &pReader->startTime, &pReader->endTime);
210✔
1335
  if (pTsdb->keepCfg.precision == TSDB_TIME_PRECISION_MICRO) {
210✔
1336
    pReader->startTime /= 1000;
×
1337
    pReader->endTime /= 1000;
×
1338
  } else if (pTsdb->keepCfg.precision == TSDB_TIME_PRECISION_NANO) {
210✔
1339
    pReader->startTime /= 1000000;
×
1340
    pReader->endTime /= 1000000;
×
1341
  }
1342
  pReader->lastCompactTime = pReader->pFileSet->lastCompact;
210✔
1343
  pReader->totalSize = 0;
210✔
1344
  for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
1,050✔
1345
    STFileObj *fobj = pReader->pFileSet->farr[i];
840✔
1346
    if (fobj) {
840✔
1347
      pReader->totalSize += fobj->f->size;
×
1348
    }
1349
  }
1350
  SSttLvl *lvl;
1351
  TARRAY2_FOREACH(pReader->pFileSet->lvlArr, lvl) {
420✔
1352
    STFileObj *fobj;
1353
    TARRAY2_FOREACH(lvl->fobjArr, fobj) { pReader->totalSize += fobj->f->size; }
420✔
1354
  }
1355

1356
  return code;
210✔
1357
}
1358

1359
int32_t tsdbFileSetReaderNext(struct SFileSetReader *pReader) {
420✔
1360
  int32_t code = TSDB_CODE_SUCCESS;
420✔
1361
  (void)taosThreadMutexLock(&pReader->pTsdb->mutex);
420✔
1362
  code = tsdbFileSetReaderNextNoLock(pReader);
420✔
1363
  (void)taosThreadMutexUnlock(&pReader->pTsdb->mutex);
420✔
1364
  return code;
420✔
1365
}
1366

1367
extern bool tsdbShouldCompact(STFileSet *fset, int32_t vgId, int32_t expLevel, ETsdbOpType type);
1368
int32_t tsdbFileSetGetEntryField(struct SFileSetReader *pReader, const char *field, void *value) {
1,260✔
1369
  const char *fieldName;
1370

1371
  if (pReader->fid == INT32_MIN || pReader->fid == INT32_MAX) {
1,260✔
1372
    return TSDB_CODE_INVALID_PARA;
×
1373
  }
1374

1375
  fieldName = "fileset_id";
1,260✔
1376
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,260✔
1377
    *(int32_t *)value = pReader->fid;
210✔
1378
    return TSDB_CODE_SUCCESS;
210✔
1379
  }
1380

1381
  fieldName = "start_time";
1,050✔
1382
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
1,050✔
1383
    *(int64_t *)value = pReader->startTime;
210✔
1384
    return TSDB_CODE_SUCCESS;
210✔
1385
  }
1386

1387
  fieldName = "end_time";
840✔
1388
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
840✔
1389
    *(int64_t *)value = pReader->endTime;
210✔
1390
    return TSDB_CODE_SUCCESS;
210✔
1391
  }
1392

1393
  fieldName = "total_size";
630✔
1394
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
630✔
1395
    *(int64_t *)value = pReader->totalSize;
210✔
1396
    return TSDB_CODE_SUCCESS;
210✔
1397
  }
1398

1399
  fieldName = "last_compact_time";
420✔
1400
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
420✔
1401
    *(int64_t *)value = pReader->lastCompactTime;
210✔
1402
    return TSDB_CODE_SUCCESS;
210✔
1403
  }
1404

1405
  fieldName = "should_compact";
210✔
1406
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
210✔
1407
    *(bool *)value = false;
210✔
1408
#ifdef TD_ENTERPRISE
1409
    *(bool *)value = tsdbShouldCompact(pReader->pFileSet, pReader->pTsdb->pVnode->config.vgId, 0, TSDB_OPTR_NORMAL);
210✔
1410
#endif
1411
    return TSDB_CODE_SUCCESS;
210✔
1412
  }
1413

1414
  fieldName = "details";
×
1415
  if (strncmp(field, fieldName, strlen(fieldName) + 1) == 0) {
×
1416
    // TODO
1417
    return TSDB_CODE_SUCCESS;
×
1418
  }
1419

1420
  return TSDB_CODE_INVALID_PARA;
×
1421
}
1422

1423
void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {
210✔
1424
  if (ppReader == NULL || *ppReader == NULL) {
210✔
1425
    return;
×
1426
  }
1427

1428
  tsdbTFileSetClear(&(*ppReader)->pFileSet);
210✔
1429
  taosMemoryFree(*ppReader);
210✔
1430

1431
  *ppReader = NULL;
210✔
1432
  return;
210✔
1433
}
1434

1435
static FORCE_INLINE void getLevelSize(const STFileObj *fObj, int64_t szArr[TFS_MAX_TIERS]) {
1436
  if (fObj == NULL) return;
16,499,209✔
1437

1438
  int64_t sz = fObj->f->size;
11,420,731✔
1439
  // level == 0, primary storage
1440
  // level == 1, second storage,
1441
  // level == 2, third storage
1442
  int32_t level = fObj->f->did.level;
11,420,731✔
1443
  if (level >= 0 && level < TFS_MAX_TIERS) {
11,420,731✔
1444
    szArr[level] += sz;
11,420,731✔
1445
  }
1446
}
1447

1448
// Accumulate per-storage-level file sizes over every file set of `tsdb` and
// store them in `pInfo` (l1Size/l2Size/l3Size from levels 0/1/2).
//
// pInfo->ssSize adds (lcn - 1) * tsdbPageSize * ssChunkSize for each data file
// whose lcn is greater than 1. Takes no lock itself; tsdbGetFsSize() calls it
// with tsdb->mutex held. Always returns 0.
static FORCE_INLINE int32_t tsdbGetFsSizeImpl(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
  int64_t tierBytes[TFS_MAX_TIERS] = {0};
  int64_t ssBytes = 0;

  SVnodeCfg *pCfg = &tsdb->pVnode->config;
  int64_t    chunkBytes = (int64_t)pCfg->tsdbPageSize * pCfg->ssChunkSize;

  const STFileSet *fset;
  const SSttLvl   *lvl = NULL;
  const STFileObj *fobj = NULL;

  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
    // fixed-type files of the file set
    for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
      getLevelSize(fset->farr[ftype], tierBytes);
    }

    // stt files of every level
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
      TARRAY2_FOREACH(lvl->fobjArr, fobj) { getLevelSize(fobj, tierBytes); }
    }

    fobj = fset->farr[TSDB_FTYPE_DATA];
    if (fobj != NULL) {
      int32_t lcn = fobj->f->lcn;
      if (lcn > 1) {
        ssBytes += ((lcn - 1) * chunkBytes);
      }
    }
  }

  pInfo->l1Size = tierBytes[0];
  pInfo->l2Size = tierBytes[1];
  pInfo->l3Size = tierBytes[2];
  pInfo->ssSize = ssBytes;
  return 0;
}
1484
// Locking wrapper: runs tsdbGetFsSizeImpl() while holding tsdb->mutex and
// returns its result.
int32_t tsdbGetFsSize(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
  (void)taosThreadMutexLock(&tsdb->mutex);
  int32_t code = tsdbGetFsSizeImpl(tsdb, pInfo);
  (void)taosThreadMutexUnlock(&tsdb->mutex);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc