• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.92
/source/dnode/vnode/src/tsdb/tsdbFS2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFS2.h"
17
#include "cos.h"
18
#include "tsdbUpgrade.h"
19
#include "vnd.h"
20

21
#define BLOCK_COMMIT_FACTOR 3
22

23
typedef struct STFileHashEntry {
24
  struct STFileHashEntry *next;
25
  char                    fname[TSDB_FILENAME_LEN];
26
} STFileHashEntry;
27

28
typedef struct {
29
  int32_t           numFile;
30
  int32_t           numBucket;
31
  STFileHashEntry **buckets;
32
} STFileHash;
33

34
static const char *gCurrentFname[] = {
35
    [TSDB_FCURRENT] = "current.json",
36
    [TSDB_FCURRENT_C] = "current.c.json",
37
    [TSDB_FCURRENT_M] = "current.m.json",
38
};
39

40
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
13,655✔
41
  fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
13,655✔
42
  if (fs[0] == NULL) {
13,735!
43
    return terrno;
×
44
  }
45

46
  fs[0]->tsdb = pTsdb;
13,735✔
47
  int32_t code = tsem_init(&fs[0]->canEdit, 0, 1);
13,735✔
48
  if (code) {
13,733!
49
    taosMemoryFree(fs[0]);
×
50
    return code;
×
51
  }
52

53
  fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
13,736✔
54
  fs[0]->neid = 0;
13,736✔
55
  TARRAY2_INIT(fs[0]->fSetArr);
13,736✔
56
  TARRAY2_INIT(fs[0]->fSetArrTmp);
13,736✔
57

58
  return 0;
13,736✔
59
}
60

61
static void destroy_fs(STFileSystem **fs) {
13,736✔
62
  if (fs[0] == NULL) return;
13,736!
63

64
  TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
13,736✔
65
  TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
13,736✔
66
  if (tsem_destroy(&fs[0]->canEdit) != 0) {
13,736!
67
    tsdbError("failed to destroy semaphore");
×
68
  }
69
  taosMemoryFree(fs[0]);
13,736✔
70
  fs[0] = NULL;
13,736✔
71
}
72

73
void current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
95,677✔
74
  int32_t offset = 0;
95,677✔
75

76
  vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->diskPrimary, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
95,677✔
77
  offset = strlen(fname);
95,710✔
78
  snprintf(fname + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, gCurrentFname[ftype]);
95,710✔
79
}
95,710✔
80

81
static int32_t save_json(const cJSON *json, const char *fname) {
28,427✔
82
  int32_t   code = 0;
28,427✔
83
  int32_t   lino;
84
  char     *data = NULL;
28,427✔
85
  TdFilePtr fp = NULL;
28,427✔
86

87
  data = cJSON_PrintUnformatted(json);
28,427✔
88
  if (data == NULL) {
28,429!
89
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
90
  }
91

92
  fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
28,429✔
93
  if (fp == NULL) {
28,429!
94
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
95
  }
96

97
  if (taosWriteFile(fp, data, strlen(data)) < 0) {
28,429!
98
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
99
  }
100

101
  if (taosFsyncFile(fp) < 0) {
28,429!
102
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
103
  }
104

105
_exit:
28,433✔
106
  if (code) {
28,433!
107
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
108
  }
109
  taosMemoryFree(data);
28,433✔
110
  taosCloseFileWithLog(&fp);
28,433!
111
  return code;
28,433✔
112
}
113

114
static int32_t load_json(const char *fname, cJSON **json) {
2,584✔
115
  int32_t code = 0;
2,584✔
116
  int32_t lino;
117
  char   *data = NULL;
2,584✔
118

119
  TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
2,584✔
120
  if (fp == NULL) {
2,597!
121
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
122
  }
123

124
  int64_t size;
125
  code = taosFStatFile(fp, &size, NULL);
2,597✔
126
  if (code != 0) {
2,603!
127
    TSDB_CHECK_CODE(code, lino, _exit);
×
128
  }
129

130
  data = taosMemoryMalloc(size + 1);
2,603✔
131
  if (data == NULL) {
2,599!
132
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
133
  }
134

135
  if (taosReadFile(fp, data, size) < 0) {
2,599!
136
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
137
  }
138
  data[size] = '\0';
2,600✔
139

140
  json[0] = cJSON_Parse(data);
2,600✔
141
  if (json[0] == NULL) {
2,600!
142
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
143
  }
144

145
_exit:
2,600✔
146
  if (code) {
2,600!
147
    tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
×
148
    json[0] = NULL;
×
149
  }
150
  taosCloseFileWithLog(&fp);
2,600!
151
  taosMemoryFree(data);
2,607✔
152
  return code;
2,607✔
153
}
154

155
int32_t save_fs(const TFileSetArray *arr, const char *fname) {
28,430✔
156
  int32_t code = 0;
28,430✔
157
  int32_t lino = 0;
28,430✔
158

159
  cJSON *json = cJSON_CreateObject();
28,430✔
160
  if (json == NULL) {
28,432!
161
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
162
  }
163

164
  // fmtv
165
  if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
28,432!
166
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
167
  }
168

169
  // fset
170
  cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
28,433✔
171
  if (!ajson) {
28,430!
172
    TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
173
  }
174
  const STFileSet *fset;
175
  TARRAY2_FOREACH(arr, fset) {
199,321✔
176
    cJSON *item = cJSON_CreateObject();
170,891✔
177
    if (!item) {
170,890!
178
      TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
×
179
    }
180
    (void)cJSON_AddItemToArray(ajson, item);
170,890✔
181

182
    code = tsdbTFileSetToJson(fset, item);
170,878✔
183
    TSDB_CHECK_CODE(code, lino, _exit);
170,891!
184
  }
185

186
  code = save_json(json, fname);
28,430✔
187
  TSDB_CHECK_CODE(code, lino, _exit);
28,433!
188

189
_exit:
28,433✔
190
  if (code) {
28,433!
191
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
192
  }
193
  cJSON_Delete(json);
28,433✔
194
  return code;
28,433✔
195
}
196

197
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
2,600✔
198
  int32_t code = 0;
2,600✔
199
  int32_t lino = 0;
2,600✔
200

201
  TARRAY2_CLEAR(arr, tsdbTFileSetClear);
2,600!
202

203
  // load json
204
  cJSON *json = NULL;
2,600✔
205
  code = load_json(fname, &json);
2,600✔
206
  TSDB_CHECK_CODE(code, lino, _exit);
2,607!
207

208
  // parse json
209
  const cJSON *item1;
210

211
  /* fmtv */
212
  item1 = cJSON_GetObjectItem(json, "fmtv");
2,607✔
213
  if (cJSON_IsNumber(item1)) {
2,606!
214
    if (item1->valuedouble != 1) {
2,597!
215
      TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
216
    }
217
  } else {
218
    TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
×
219
  }
220

221
  /* fset */
222
  item1 = cJSON_GetObjectItem(json, "fset");
2,597✔
223
  if (cJSON_IsArray(item1)) {
2,607!
224
    const cJSON *item2;
225
    cJSON_ArrayForEach(item2, item1) {
4,737!
226
      STFileSet *fset;
227
      code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
2,130✔
228
      TSDB_CHECK_CODE(code, lino, _exit);
2,130!
229

230
      code = TARRAY2_APPEND(arr, fset);
2,130✔
231
      TSDB_CHECK_CODE(code, lino, _exit);
2,130!
232
    }
233
    TARRAY2_SORT(arr, tsdbTFileSetCmprFn);
2,607✔
234
  } else {
235
    code = TSDB_CODE_FILE_CORRUPTED;
×
236
    TSDB_CHECK_CODE(code, lino, _exit);
×
237
  }
238

239
_exit:
×
240
  if (code) {
2,607!
241
    tsdbError("%s failed at %sP%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
×
242
  }
243
  if (json) {
2,601!
244
    cJSON_Delete(json);
2,601✔
245
  }
246
  return code;
2,605✔
247
}
248

249
static int32_t apply_commit(STFileSystem *fs) {
17,304✔
250
  int32_t        code = 0;
17,304✔
251
  int32_t        lino;
252
  TFileSetArray *fsetArray1 = fs->fSetArr;
17,304✔
253
  TFileSetArray *fsetArray2 = fs->fSetArrTmp;
17,304✔
254
  int32_t        i1 = 0, i2 = 0;
17,304✔
255

256
  while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
187,516✔
257
    STFileSet *fset1 = i1 < TARRAY2_SIZE(fsetArray1) ? TARRAY2_GET(fsetArray1, i1) : NULL;
170,248✔
258
    STFileSet *fset2 = i2 < TARRAY2_SIZE(fsetArray2) ? TARRAY2_GET(fsetArray2, i2) : NULL;
170,248✔
259

260
    if (fset1 && fset2) {
170,248✔
261
      if (fset1->fid < fset2->fid) {
29,353!
262
        // delete fset1
263
        tsdbTFileSetRemove(fset1);
×
264
        i1++;
×
265
      } else if (fset1->fid > fset2->fid) {
29,353✔
266
        // create new file set with fid of fset2->fid
267
        code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
28✔
268
        TSDB_CHECK_CODE(code, lino, _exit);
28!
269
        code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
28✔
270
        TSDB_CHECK_CODE(code, lino, _exit);
28!
271
        i1++;
28✔
272
        i2++;
28✔
273
      } else {
274
        // edit
275
        code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
29,325✔
276
        TSDB_CHECK_CODE(code, lino, _exit);
29,324!
277
        i1++;
29,324✔
278
        i2++;
29,324✔
279
      }
280
    } else if (fset1) {
140,895✔
281
      // delete fset1
282
      tsdbTFileSetRemove(fset1);
9✔
283
      i1++;
9✔
284
    } else {
285
      // create new file set with fid of fset2->fid
286
      code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
140,886✔
287
      TSDB_CHECK_CODE(code, lino, _exit);
141,380!
288
      code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
140,851✔
289
      TSDB_CHECK_CODE(code, lino, _exit);
140,851!
290
      i1++;
140,851✔
291
      i2++;
140,851✔
292
    }
293
  }
294

295
_exit:
17,268✔
296
  if (code) {
17,268!
297
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
298
  }
299
  return code;
17,304✔
300
}
301

302
static int32_t commit_edit(STFileSystem *fs) {
17,304✔
303
  char current[TSDB_FILENAME_LEN];
304
  char current_t[TSDB_FILENAME_LEN];
305

306
  current_fname(fs->tsdb, current, TSDB_FCURRENT);
17,304✔
307
  if (fs->etype == TSDB_FEDIT_COMMIT) {
17,304✔
308
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
11,774✔
309
  } else {
310
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
5,530✔
311
  }
312

313
  int32_t code;
314
  int32_t lino;
315
  TSDB_CHECK_CODE(taosRenameFile(current_t, current), lino, _exit);
17,304!
316

317
  code = apply_commit(fs);
17,304✔
318
  TSDB_CHECK_CODE(code, lino, _exit);
17,304!
319

320
_exit:
17,304✔
321
  if (code) {
17,304!
322
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
×
323
              tstrerror(code));
324
  } else {
325
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
17,304!
326
  }
327
  return code;
17,304✔
328
}
329

330
// static int32_t
331
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);
332
static int32_t apply_abort(STFileSystem *fs) { return tsdbFSDoSanAndFix(fs); }
×
333

334
static int32_t abort_edit(STFileSystem *fs) {
×
335
  char fname[TSDB_FILENAME_LEN];
336

337
  if (fs->etype == TSDB_FEDIT_COMMIT) {
×
338
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
×
339
  } else {
340
    current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
×
341
  }
342

343
  int32_t code;
344
  int32_t lino;
345
  if ((code = taosRemoveFile(fname))) {
×
346
    code = TAOS_SYSTEM_ERROR(code);
×
347
    TSDB_CHECK_CODE(code, lino, _exit);
×
348
  }
349

350
  code = apply_abort(fs);
×
351
  TSDB_CHECK_CODE(code, lino, _exit);
×
352

353
_exit:
×
354
  if (code) {
×
355
    tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
×
356
              tstrerror(code));
357
  } else {
358
    tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
×
359
  }
360
  return code;
×
361
}
362

363
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
3,889✔
364
  int32_t code = 0;
3,889✔
365
  int32_t lino = 0;
3,889✔
366

367
  // check file existence
368
  if (!taosCheckExistFile(fobj->fname)) {
3,889!
369
    bool found = false;
×
370

371
    if (tsS3Enabled && fobj->f->lcn > 1) {
×
372
      char fname1[TSDB_FILENAME_LEN];
373
      tsdbTFileLastChunkName(fs->tsdb, fobj->f, fname1);
×
374
      if (!taosCheckExistFile(fname1)) {
×
375
        code = TSDB_CODE_FILE_CORRUPTED;
×
376
        tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fname1);
×
377
        return code;
×
378
      }
379

380
      found = true;
×
381
    }
382

383
    if (!found) {
×
384
      code = TSDB_CODE_FILE_CORRUPTED;
×
385
      tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname);
×
386
      return code;
×
387
    }
388
  }
389

390
  return 0;
3,891✔
391
}
392

393
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
394

395
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
6,499✔
396
  STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
6,499✔
397
  if (entry == NULL) return terrno;
6,499!
398

399
  strncpy(entry->fname, fname, TSDB_FILENAME_LEN);
6,499✔
400

401
  uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
6,499✔
402

403
  entry->next = hash->buckets[idx];
6,499✔
404
  hash->buckets[idx] = entry;
6,499✔
405
  hash->numFile++;
6,499✔
406

407
  return 0;
6,499✔
408
}
409

410
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
2,598✔
411
  int32_t code = 0;
2,598✔
412
  int32_t lino;
413
  char    fname[TSDB_FILENAME_LEN];
414

415
  // init hash table
416
  hash->numFile = 0;
2,598✔
417
  hash->numBucket = 4096;
2,598✔
418
  hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
2,598✔
419
  if (hash->buckets == NULL) {
2,607!
420
    TSDB_CHECK_CODE(code = terrno, lino, _exit);
×
421
  }
422

423
  // vnode.json
424
  current_fname(fs->tsdb, fname, TSDB_FCURRENT);
2,607✔
425
  code = tsdbFSAddEntryToFileObjHash(hash, fname);
2,607✔
426
  TSDB_CHECK_CODE(code, lino, _exit);
2,607!
427

428
  // other
429
  STFileSet *fset = NULL;
2,607✔
430
  TARRAY2_FOREACH(fs->fSetArr, fset) {
4,737✔
431
    // data file
432
    for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
10,650✔
433
      if (fset->farr[i] != NULL) {
8,520✔
434
        code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
1,576✔
435
        TSDB_CHECK_CODE(code, lino, _exit);
1,576!
436

437
        if (TSDB_FTYPE_DATA == i && fset->farr[i]->f->lcn > 0) {
1,576!
438
          STFileObj *fobj = fset->farr[i];
×
439
          int32_t    lcn = fobj->f->lcn;
×
440
          char       lcn_name[TSDB_FILENAME_LEN];
441

442
          snprintf(lcn_name, TSDB_FQDN_LEN, "%s", fobj->fname);
×
443
          char *dot = strrchr(lcn_name, '.');
×
444
          if (dot) {
×
445
            snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lcn_name), "%d.data", lcn);
×
446

447
            code = tsdbFSAddEntryToFileObjHash(hash, lcn_name);
×
448
            TSDB_CHECK_CODE(code, lino, _exit);
×
449
          }
450
        }
451
      }
452
    }
453

454
    // stt file
455
    SSttLvl *lvl = NULL;
2,130✔
456
    TARRAY2_FOREACH(fset->lvlArr, lvl) {
4,332✔
457
      STFileObj *fobj;
458
      TARRAY2_FOREACH(lvl->fobjArr, fobj) {
4,518✔
459
        code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
2,316✔
460
        TSDB_CHECK_CODE(code, lino, _exit);
2,316!
461
      }
462
    }
463
  }
464

465
_exit:
2,607✔
466
  if (code) {
2,607!
467
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
468
    tsdbFSDestroyFileObjHash(hash);
×
469
  }
470
  return code;
2,606✔
471
}
472

473
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
6,489✔
474
  uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
6,489✔
475

476
  STFileHashEntry *entry = hash->buckets[idx];
6,499✔
477
  while (entry) {
6,499✔
478
    if (strcmp(entry->fname, fname) == 0) {
6,494!
479
      return entry;
6,494✔
480
    }
UNCOV
481
    entry = entry->next;
×
482
  }
483

484
  return NULL;
5✔
485
}
486

487
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
2,607✔
488
  for (int32_t i = 0; i < hash->numBucket; i++) {
10,577,872✔
489
    STFileHashEntry *entry = hash->buckets[i];
10,575,265✔
490
    while (entry) {
10,581,764✔
491
      STFileHashEntry *next = entry->next;
6,499✔
492
      taosMemoryFree(entry);
6,499✔
493
      entry = next;
6,499✔
494
    }
495
  }
496
  taosMemoryFree(hash->buckets);
2,607✔
497
  memset(hash, 0, sizeof(*hash));
2,607✔
498
}
2,607✔
499

500
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
2,593✔
501
  int32_t code = 0;
2,593✔
502
  int32_t lino = 0;
2,593✔
503
  int32_t corrupt = false;
2,593✔
504

505
  {  // scan each file
506
    STFileSet *fset = NULL;
2,593✔
507
    TARRAY2_FOREACH(fs->fSetArr, fset) {
4,721✔
508
      // data file
509
      for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
10,640✔
510
        if (fset->farr[ftype] == NULL) continue;
8,513✔
511
        STFileObj *fobj = fset->farr[ftype];
1,576✔
512
        code = tsdbFSDoScanAndFixFile(fs, fobj);
1,576✔
513
        if (code) {
1,576!
514
          fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
515
          corrupt = true;
×
516
        }
517
      }
518

519
      // stt file
520
      SSttLvl *lvl;
521
      TARRAY2_FOREACH(fset->lvlArr, lvl) {
4,327✔
522
        STFileObj *fobj;
523
        TARRAY2_FOREACH(lvl->fobjArr, fobj) {
4,513✔
524
          code = tsdbFSDoScanAndFixFile(fs, fobj);
2,313✔
525
          if (code) {
2,314!
526
            fset->maxVerValid =
×
527
                (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
×
528
            corrupt = true;
×
529
          }
530
        }
531
      }
532
    }
533
  }
534

535
  if (corrupt) {
2,594!
536
    tsdbError("vgId:%d, not to clear dangling files due to fset incompleteness", TD_VID(fs->tsdb->pVnode));
×
537
    fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
×
538
    code = 0;
×
539
    goto _exit;
×
540
  }
541

542
  {  // clear unreferenced files
543
    STfsDir *dir = NULL;
2,594✔
544
    TAOS_CHECK_GOTO(tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path, &dir), &lino, _exit);
2,594!
545

546
    STFileHash fobjHash = {0};
2,603✔
547
    code = tsdbFSCreateFileObjHash(fs, &fobjHash);
2,603✔
548
    if (code) goto _close_dir;
2,606!
549

550
    for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
11,709✔
551
      if (taosIsDir(file->aname)) continue;
9,095✔
552

553
      if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
6,494!
554
        tsdbRemoveFile(file->aname);
×
555
      }
556
    }
557

558
    tsdbFSDestroyFileObjHash(&fobjHash);
2,608✔
559

560
  _close_dir:
2,607✔
561
    tfsClosedir(dir);
2,607✔
562
  }
563

564
_exit:
2,606✔
565
  if (code) {
2,606!
566
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
567
  }
568
  return code;
2,606✔
569
}
570

571
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
2,595✔
572
  fs->neid = 0;
2,595✔
573

574
  // get max commit id
575
  const STFileSet *fset;
576
  TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
4,722✔
577

578
  // scan and fix
579
  int32_t code = 0;
2,597✔
580
  int32_t lino = 0;
2,597✔
581

582
  code = tsdbFSDoSanAndFix(fs);
2,597✔
583
  TSDB_CHECK_CODE(code, lino, _exit);
2,605!
584

585
_exit:
2,605✔
586
  if (code) {
2,605!
587
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
588
  }
589
  return code;
2,605✔
590
}
591

592
static int32_t tsdbFSDupState(STFileSystem *fs) {
19,905✔
593
  int32_t code;
594

595
  const TFileSetArray *src = fs->fSetArr;
19,905✔
596
  TFileSetArray       *dst = fs->fSetArrTmp;
19,905✔
597

598
  TARRAY2_CLEAR(dst, tsdbTFileSetClear);
49,235✔
599

600
  const STFileSet *fset1;
601
  TARRAY2_FOREACH(src, fset1) {
51,367✔
602
    STFileSet *fset2;
603
    code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
31,463✔
604
    if (code) return code;
31,462!
605
    code = TARRAY2_APPEND(dst, fset2);
31,462✔
606
    if (code) return code;
31,462!
607
  }
608

609
  return 0;
19,904✔
610
}
611

612
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
13,719✔
613
  int32_t code = 0;
13,719✔
614
  int32_t lino = 0;
13,719✔
615
  STsdb  *pTsdb = fs->tsdb;
13,719✔
616

617
  char fCurrent[TSDB_FILENAME_LEN];
618
  char cCurrent[TSDB_FILENAME_LEN];
619
  char mCurrent[TSDB_FILENAME_LEN];
620

621
  current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
13,719✔
622
  current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
13,711✔
623
  current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
13,728✔
624

625
  if (taosCheckExistFile(fCurrent)) {  // current.json exists
13,734✔
626
    code = load_fs(pTsdb, fCurrent, fs->fSetArr);
2,604✔
627
    TSDB_CHECK_CODE(code, lino, _exit);
2,601!
628

629
    if (taosCheckExistFile(cCurrent)) {
2,601!
630
      // current.c.json exists
631

632
      fs->etype = TSDB_FEDIT_COMMIT;
×
633
      if (rollback) {
×
634
        code = abort_edit(fs);
×
635
        TSDB_CHECK_CODE(code, lino, _exit);
×
636
      } else {
637
        code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
×
638
        TSDB_CHECK_CODE(code, lino, _exit);
×
639

640
        code = commit_edit(fs);
×
641
        TSDB_CHECK_CODE(code, lino, _exit);
×
642
      }
643
    } else if (taosCheckExistFile(mCurrent)) {
2,604!
644
      // current.m.json exists
645
      fs->etype = TSDB_FEDIT_MERGE;
×
646
      code = abort_edit(fs);
×
647
      TSDB_CHECK_CODE(code, lino, _exit);
×
648
    }
649

650
    code = tsdbFSDupState(fs);
2,603✔
651
    TSDB_CHECK_CODE(code, lino, _exit);
2,596!
652

653
    code = tsdbFSScanAndFix(fs);
2,596✔
654
    TSDB_CHECK_CODE(code, lino, _exit);
2,604!
655
  } else {
656
    code = save_fs(fs->fSetArr, fCurrent);
11,127✔
657
    TSDB_CHECK_CODE(code, lino, _exit);
11,129!
658
  }
659

660
_exit:
11,129✔
661
  if (code) {
13,733!
662
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
663
  } else {
664
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
13,733!
665
  }
666
  return code;
13,736✔
667
}
668

669
static void close_file_system(STFileSystem *fs) {
13,736✔
670
  TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
157,383✔
671
  TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
157,384✔
672
}
13,733✔
673

674
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
×
675
  if (pSet1->fid < pSet2->fid) {
×
676
    return -1;
×
677
  } else if (pSet1->fid > pSet2->fid) {
×
678
    return 1;
×
679
  }
680
  return 0;
×
681
}
682

683
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
17,303✔
684
  int32_t code = 0;
17,303✔
685
  int32_t lino = 0;
17,303✔
686

687
  code = tsdbFSDupState(fs);
17,303✔
688
  if (code) return code;
17,303!
689

690
  TFileSetArray  *fsetArray = fs->fSetArrTmp;
17,303✔
691
  STFileSet      *fset = NULL;
17,303✔
692
  const STFileOp *op;
693
  TARRAY2_FOREACH_PTR(opArray, op) {
199,807✔
694
    if (!fset || fset->fid != op->fid) {
182,503✔
695
      STFileSet tfset = {.fid = op->fid};
157,393✔
696
      fset = &tfset;
157,393✔
697
      STFileSet **fsetPtr = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
157,392✔
698
      fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
157,392✔
699

700
      if (!fset) {
157,392✔
701
        code = tsdbTFileSetInit(op->fid, &fset);
141,588✔
702
        TSDB_CHECK_CODE(code, lino, _exit);
141,588!
703

704
        code = TARRAY2_SORT_INSERT(fsetArray, fset, tsdbTFileSetCmprFn);
141,588✔
705
        TSDB_CHECK_CODE(code, lino, _exit);
141,588!
706
      }
707
    }
708

709
    code = tsdbTFileSetEdit(fs->tsdb, fset, op);
182,502✔
710
    TSDB_CHECK_CODE(code, lino, _exit);
182,504!
711
  }
712

713
  // remove empty empty stt level and empty file set
714
  int32_t i = 0;
17,304✔
715
  while (i < TARRAY2_SIZE(fsetArray)) {
188,235✔
716
    fset = TARRAY2_GET(fsetArray, i);
170,931✔
717

718
    SSttLvl *lvl;
719
    int32_t  j = 0;
170,931✔
720
    while (j < TARRAY2_SIZE(fset->lvlArr)) {
354,124✔
721
      lvl = TARRAY2_GET(fset->lvlArr, j);
183,193✔
722

723
      if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
183,193✔
724
        TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
6,438!
725
      } else {
726
        j++;
176,755✔
727
      }
728
    }
729

730
    if (tsdbTFileSetIsEmpty(fset)) {
170,931✔
731
      TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
10!
732
    } else {
733
      i++;
170,921✔
734
    }
735
  }
736

737
_exit:
17,304✔
738
  if (code) {
17,304!
739
    TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
×
740
  }
741
  return code;
17,304✔
742
}
743

744
// return error code
745
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
13,686✔
746
  int32_t code;
747
  int32_t lino;
748

749
  code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
13,686✔
750
  TSDB_CHECK_CODE(code, lino, _exit);
13,722!
751

752
  code = create_fs(pTsdb, fs);
13,722✔
753
  TSDB_CHECK_CODE(code, lino, _exit);
13,735!
754

755
  code = open_fs(fs[0], rollback);
13,735✔
756
  TSDB_CHECK_CODE(code, lino, _exit);
13,736!
757

758
_exit:
13,736✔
759
  if (code) {
13,736!
760
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
×
761
    destroy_fs(fs);
×
762
  } else {
763
    tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
13,736!
764
  }
765
  return code;
13,736✔
766
}
767

768
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block);
769
extern void tsdbStopAllCompTask(STsdb *tsdb);
770

771
int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
13,833✔
772
  STFileSystem *fs = pTsdb->pFS;
13,833✔
773
  SArray       *channelArray = taosArrayInit(0, sizeof(SVAChannelID));
13,833✔
774
  if (channelArray == NULL) {
13,833!
775
    return terrno;
×
776
  }
777

778
  (void)taosThreadMutexLock(&pTsdb->mutex);
13,833✔
779

780
  // disable
781
  pTsdb->bgTaskDisabled = true;
13,833✔
782

783
  // collect channel
784
  STFileSet *fset;
785
  TARRAY2_FOREACH(fs->fSetArr, fset) {
157,560✔
786
    if (fset->channelOpened) {
143,727✔
787
      if (taosArrayPush(channelArray, &fset->channel) == NULL) {
3,918!
788
        taosArrayDestroy(channelArray);
×
789
        (void)taosThreadMutexUnlock(&pTsdb->mutex);
×
790
        return terrno;
×
791
      }
792
      fset->channel = (SVAChannelID){0};
1,959✔
793
      fset->mergeScheduled = false;
1,959✔
794
      tsdbFSSetBlockCommit(fset, false);
1,959✔
795
      fset->channelOpened = false;
1,959✔
796
    }
797
  }
798

799
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
13,833✔
800

801
  // destroy all channels
802
  for (int32_t i = 0; i < taosArrayGetSize(channelArray); i++) {
15,792✔
803
    SVAChannelID *channel = taosArrayGet(channelArray, i);
1,959✔
804
    int32_t       code = vnodeAChannelDestroy(channel, true);
1,959✔
805
    if (code) {
1,959!
806
      tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
×
807
    }
808
  }
809
  taosArrayDestroy(channelArray);
13,833✔
810

811
#ifdef TD_ENTERPRISE
812
  tsdbStopAllCompTask(pTsdb);
13,833✔
813
#endif
814
  return 0;
13,833✔
815
}
816

817
void tsdbEnableBgTask(STsdb *pTsdb) {
97✔
818
  (void)taosThreadMutexLock(&pTsdb->mutex);
97✔
819
  pTsdb->bgTaskDisabled = false;
97✔
820
  (void)taosThreadMutexUnlock(&pTsdb->mutex);
97✔
821
}
97✔
822

823
void tsdbCloseFS(STFileSystem **fs) {
13,736✔
824
  if (fs[0] == NULL) return;
13,736!
825

826
  int32_t code = tsdbDisableAndCancelAllBgTask((*fs)->tsdb);
13,736✔
827
  if (code) {
13,736!
828
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID((*fs)->tsdb->pVnode), __func__, __LINE__,
×
829
              tstrerror(code));
830
  }
831
  close_file_system(fs[0]);
13,736✔
832
  destroy_fs(fs);
13,736✔
833
  return;
13,734✔
834
}
835

836
int64_t tsdbFSAllocEid(STFileSystem *fs) {
17,680✔
837
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
17,680✔
838
  int64_t cid = ++fs->neid;
17,680✔
839
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
17,680✔
840
  return cid;
17,680✔
841
}
842

843
void tsdbFSUpdateEid(STFileSystem *fs, int64_t cid) {
445✔
844
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
445✔
845
  fs->neid = TMAX(fs->neid, cid);
445✔
846
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
445✔
847
}
445✔
848

849
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype) {
17,303✔
850
  int32_t code = 0;
17,303✔
851
  int32_t lino;
852
  char    current_t[TSDB_FILENAME_LEN];
853

854
  if (etype == TSDB_FEDIT_COMMIT) {
17,303✔
855
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
11,773✔
856
  } else {
857
    current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
5,530✔
858
  }
859

860
  if (tsem_wait(&fs->canEdit) != 0) {
17,304!
861
    tsdbError("vgId:%d failed to wait semaphore", TD_VID(fs->tsdb->pVnode));
×
862
  }
863
  fs->etype = etype;
17,304✔
864

865
  // edit
866
  code = edit_fs(fs, opArray);
17,304✔
867
  TSDB_CHECK_CODE(code, lino, _exit);
17,304!
868

869
  // save fs
870
  code = save_fs(fs->fSetArrTmp, current_t);
17,304✔
871
  TSDB_CHECK_CODE(code, lino, _exit);
17,304!
872

873
_exit:
17,304✔
874
  if (code) {
17,304!
875
    tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
×
876
              tstrerror(code), etype);
877
  } else {
878
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
17,304!
879
  }
880
  return code;
17,304✔
881
}
882

883
static void tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
170,421✔
884
  if (block) {
170,421✔
885
    fset->blockCommit = true;
18✔
886
  } else {
887
    fset->blockCommit = false;
170,403✔
888
    if (fset->numWaitCommit > 0) {
170,403✔
889
      (void)taosThreadCondSignal(&fset->canCommit);
14✔
890
    }
891
  }
892
}
170,421✔
893

894
void tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
151,804✔
895
  (void)taosThreadMutexLock(&tsdb->mutex);
151,804✔
896
  STFileSet *fset;
897
  tsdbFSGetFSet(tsdb->pFS, fid, &fset);
151,848✔
898
  if (fset) {
151,735✔
899
    while (fset->blockCommit) {
10,330✔
900
      fset->numWaitCommit++;
14✔
901
      (void)taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
14✔
902
      fset->numWaitCommit--;
14✔
903
    }
904
  }
905
  (void)taosThreadMutexUnlock(&tsdb->mutex);
151,735✔
906
  return;
151,832✔
907
}
908

909
// IMPORTANT: the caller must hold fs->tsdb->mutex
910
int32_t tsdbFSEditCommit(STFileSystem *fs) {
17,304✔
911
  int32_t code = 0;
17,304✔
912
  int32_t lino = 0;
17,304✔
913

914
  // commit
915
  code = commit_edit(fs);
17,304✔
916
  TSDB_CHECK_CODE(code, lino, _exit);
17,304!
917

918
  // schedule merge
919
  int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
17,304✔
920
  if (sttTrigger > 1 && !fs->tsdb->bgTaskDisabled) {
17,304✔
921
    STFileSet *fset;
922
    TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
185,555✔
923
      if (TARRAY2_SIZE(fset->lvlArr) == 0) {
168,462✔
924
        tsdbFSSetBlockCommit(fset, false);
3,061✔
925
        continue;
3,061✔
926
      }
927

928
      SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
165,401✔
929
      if (lvl->level != 0) {
165,401✔
930
        tsdbFSSetBlockCommit(fset, false);
6,658✔
931
        continue;
6,658✔
932
      }
933

934
      // bool    skipMerge = false;
935
      int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
158,743✔
936
      if (numFile >= sttTrigger && (!fset->mergeScheduled)) {
158,743✔
937
        code = tsdbTFileSetOpenChannel(fset);
6,784✔
938
        TSDB_CHECK_CODE(code, lino, _exit);
6,784!
939

940
        SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
6,784✔
941
        if (arg == NULL) {
6,784!
942
          code = terrno;
×
943
          TSDB_CHECK_CODE(code, lino, _exit);
×
944
        }
945

946
        arg->tsdb = fs->tsdb;
6,784✔
947
        arg->fid = fset->fid;
6,784✔
948

949
        code = vnodeAsync(&fset->channel, EVA_PRIORITY_HIGH, tsdbMerge, taosMemoryFree, arg, NULL);
6,784✔
950
        TSDB_CHECK_CODE(code, lino, _exit);
6,784!
951
        fset->mergeScheduled = true;
6,784✔
952
      }
953

954
      if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
158,743✔
955
        tsdbFSSetBlockCommit(fset, true);
18✔
956
      } else {
957
        tsdbFSSetBlockCommit(fset, false);
158,725✔
958
      }
959
    }
960
  }
961

962
_exit:
17,304✔
963
  if (code) {
17,304!
964
    tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
×
965
  } else {
966
    tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
17,304!
967
  }
968
  if (tsem_post(&fs->canEdit) != 0) {
17,304!
969
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
970
  }
971
  return code;
17,304✔
972
}
973

974
int32_t tsdbFSEditAbort(STFileSystem *fs) {
×
975
  int32_t code = abort_edit(fs);
×
976
  if (tsem_post(&fs->canEdit) != 0) {
×
977
    tsdbError("vgId:%d failed to post semaphore", TD_VID(fs->tsdb->pVnode));
×
978
  }
979
  return code;
×
980
}
981

982
void tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
311,183✔
983
  STFileSet   tfset = {.fid = fid};
311,183✔
984
  STFileSet  *pset = &tfset;
311,183✔
985
  STFileSet **fsetPtr = TARRAY2_SEARCH(fs->fSetArr, &pset, tsdbTFileSetCmprFn, TD_EQ);
311,183✔
986
  fset[0] = (fsetPtr == NULL) ? NULL : fsetPtr[0];
311,175✔
987
}
311,175✔
988

989
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
89✔
990
  int32_t    code = 0;
89✔
991
  STFileSet *fset;
992
  STFileSet *fset1;
993

994
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
89✔
995
  if (fsetArr[0] == NULL) return terrno;
89!
996

997
  TARRAY2_INIT(fsetArr[0]);
89✔
998

999
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
89✔
1000
  TARRAY2_FOREACH(fs->fSetArr, fset) {
89!
1001
    code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
×
1002
    if (code) break;
×
1003

1004
    code = TARRAY2_APPEND(fsetArr[0], fset1);
×
1005
    if (code) break;
×
1006
  }
1007
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
89✔
1008

1009
  if (code) {
89!
1010
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1011
    taosMemoryFree(fsetArr[0]);
×
1012
    fsetArr[0] = NULL;
×
1013
  }
1014
  return code;
89✔
1015
}
1016

1017
void tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
89✔
1018
  if (fsetArr[0]) {
89!
1019
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
89!
1020
    taosMemoryFree(fsetArr[0]);
89✔
1021
    fsetArr[0] = NULL;
89✔
1022
  }
1023
}
89✔
1024

1025
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
99✔
1026
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
99✔
1027
  int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
99✔
1028
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
99✔
1029
  return code;
99✔
1030
}
1031

1032
int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr) {
4,927,990✔
1033
  int32_t    code = 0;
4,927,990✔
1034
  STFileSet *fset, *fset1;
1035

1036
  fsetArr[0] = taosMemoryCalloc(1, sizeof(*fsetArr[0]));
4,927,990✔
1037
  if (fsetArr[0] == NULL) return terrno;
4,930,242!
1038

1039
  TARRAY2_FOREACH(fs->fSetArr, fset) {
15,857,468✔
1040
    code = tsdbTFileSetInitRef(fs->tsdb, fset, &fset1);
10,925,315✔
1041
    if (code) break;
10,926,447!
1042

1043
    code = TARRAY2_APPEND(fsetArr[0], fset1);
10,926,447✔
1044
    if (code) {
10,926,873!
1045
      tsdbTFileSetClear(&fset1);
×
1046
      break;
×
1047
    }
1048
  }
1049

1050
  if (code) {
4,932,153!
1051
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1052
    taosMemoryFree(fsetArr[0]);
×
1053
    fsetArr[0] = NULL;
×
1054
  }
1055
  return code;
4,932,153✔
1056
}
1057

1058
void tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) {
4,931,110✔
1059
  if (fsetArr[0]) {
4,931,110!
1060
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
15,863,240✔
1061
    taosMemoryFreeClear(fsetArr[0]);
4,931,529✔
1062
    fsetArr[0] = NULL;
4,931,586✔
1063
  }
1064
}
4,931,417✔
1065

1066
static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) {
×
1067
  int32_t   capacity = TARRAY2_SIZE(pRanges) * 2;
×
1068
  SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
×
1069
  if (pHash == NULL) {
×
1070
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1071
    return NULL;
×
1072
  }
1073

1074
  for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) {
×
1075
    STFileSetRange *u = TARRAY2_GET(pRanges, i);
×
1076
    int32_t         fid = u->fid;
×
1077
    int32_t         code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u));
×
1078
    tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
×
1079
  }
1080
  return pHash;
×
1081
}
1082

1083
int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr,
×
1084
                                       TFileOpArray *fopArr) {
1085
  int32_t    code = 0;
×
1086
  STFileSet *fset;
1087
  STFileSet *fset1;
1088
  SHashObj  *pHash = NULL;
×
1089

1090
  fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
×
1091
  if (fsetArr == NULL) return terrno;
×
1092
  TARRAY2_INIT(fsetArr[0]);
×
1093

1094
  if (pRanges) {
×
1095
    pHash = tsdbFSetRangeArrayToHash(pRanges);
×
1096
    if (pHash == NULL) {
×
1097
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1098
      goto _out;
×
1099
    }
1100
  }
1101

1102
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
×
1103
  TARRAY2_FOREACH(fs->fSetArr, fset) {
×
1104
    int64_t ever = VERSION_MAX;
×
1105
    if (pHash) {
×
1106
      int32_t         fid = fset->fid;
×
1107
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
×
1108
      if (u) {
×
1109
        ever = u->sver - 1;
×
1110
      }
1111
    }
1112

1113
    code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
×
1114
    if (code) break;
×
1115

1116
    code = TARRAY2_APPEND(fsetArr[0], fset1);
×
1117
    if (code) break;
×
1118
  }
1119
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
×
1120

1121
_out:
×
1122
  if (code) {
×
1123
    TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
×
1124
    taosMemoryFree(fsetArr[0]);
×
1125
    fsetArr[0] = NULL;
×
1126
  }
1127
  if (pHash) {
×
1128
    taosHashCleanup(pHash);
×
1129
    pHash = NULL;
×
1130
  }
1131
  return code;
×
1132
}
1133

1134
void tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { tsdbFSDestroyCopySnapshot(fsetArr); }
×
1135

1136
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
×
1137
                                      TFileSetRangeArray **fsrArr) {
1138
  int32_t         code = 0;
×
1139
  STFileSet      *fset;
1140
  STFileSetRange *fsr1 = NULL;
×
1141
  SHashObj       *pHash = NULL;
×
1142

1143
  fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
×
1144
  if (fsrArr[0] == NULL) {
×
1145
    code = terrno;
×
1146
    goto _out;
×
1147
  }
1148

1149
  tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges)));
×
1150
  if (pRanges) {
×
1151
    pHash = tsdbFSetRangeArrayToHash(pRanges);
×
1152
    if (pHash == NULL) {
×
1153
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1154
      goto _out;
×
1155
    }
1156
  }
1157

1158
  (void)taosThreadMutexLock(&fs->tsdb->mutex);
×
1159
  TARRAY2_FOREACH(fs->fSetArr, fset) {
×
1160
    int64_t sver1 = sver;
×
1161
    int64_t ever1 = ever;
×
1162

1163
    if (pHash) {
×
1164
      int32_t         fid = fset->fid;
×
1165
      STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
×
1166
      if (u) {
×
1167
        sver1 = u->sver;
×
1168
        tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever);
×
1169
      }
1170
    }
1171

1172
    if (sver1 > ever1) {
×
1173
      tsdbDebug("skip fid:%d, sver:%" PRId64 ", ever:%" PRId64, fset->fid, sver1, ever1);
×
1174
      continue;
×
1175
    }
1176

1177
    tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
×
1178

1179
    code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);
×
1180
    if (code) break;
×
1181

1182
    code = TARRAY2_APPEND(fsrArr[0], fsr1);
×
1183
    if (code) break;
×
1184

1185
    fsr1 = NULL;
×
1186
  }
1187
  (void)taosThreadMutexUnlock(&fs->tsdb->mutex);
×
1188

1189
  if (code) {
×
1190
    tsdbTFileSetRangeClear(&fsr1);
×
1191
    TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear);
×
1192
    fsrArr[0] = NULL;
×
1193
  }
1194

1195
_out:
×
1196
  if (pHash) {
×
1197
    taosHashCleanup(pHash);
×
1198
    pHash = NULL;
×
1199
  }
1200
  return code;
×
1201
}
1202

1203
void tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { tsdbTFileSetRangeArrayDestroy(fsrArr); }
×
1204

1205
void tsdbBeginTaskOnFileSet(STsdb *tsdb, int32_t fid, STFileSet **fset) {
152,390✔
1206
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
152,390✔
1207

1208
  tsdbFSGetFSet(tsdb->pFS, fid, fset);
152,390✔
1209
  if (sttTrigger == 1 && (*fset)) {
152,394✔
1210
    for (;;) {
1211
      if ((*fset)->taskRunning) {
301!
1212
        (*fset)->numWaitTask++;
×
1213

1214
        (void)taosThreadCondWait(&(*fset)->beginTask, &tsdb->mutex);
×
1215

1216
        tsdbFSGetFSet(tsdb->pFS, fid, fset);
×
1217

1218
        (*fset)->numWaitTask--;
×
1219
      } else {
1220
        (*fset)->taskRunning = true;
301✔
1221
        break;
301✔
1222
      }
1223
    }
1224
    tsdbInfo("vgId:%d begin task on file set:%d", TD_VID(tsdb->pVnode), fid);
301!
1225
  }
1226
}
152,394✔
1227

1228
void tsdbFinishTaskOnFileSet(STsdb *tsdb, int32_t fid) {
10,849✔
1229
  int16_t sttTrigger = tsdb->pVnode->config.sttTrigger;
10,849✔
1230
  if (sttTrigger == 1) {
10,849✔
1231
    STFileSet *fset = NULL;
301✔
1232
    tsdbFSGetFSet(tsdb->pFS, fid, &fset);
301✔
1233
    if (fset != NULL && fset->taskRunning) {
301!
1234
      fset->taskRunning = false;
301✔
1235
      if (fset->numWaitTask > 0) {
301!
1236
        (void)taosThreadCondSignal(&fset->beginTask);
×
1237
      }
1238
      tsdbInfo("vgId:%d finish task on file set:%d", TD_VID(tsdb->pVnode), fid);
301!
1239
    }
1240
  }
1241
}
10,849✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc