• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.63
/source/dnode/vnode/src/tsdb/tsdbFSet2.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbFSet2.h"
17
#include "vnd.h"
18

19
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
8,063,636✔
20
  if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) {
8,063,636!
21
    return terrno;
×
22
  }
23
  lvl[0]->level = level;
8,066,080✔
24
  TARRAY2_INIT(lvl[0]->fobjArr);
8,066,080✔
25
  return 0;
8,066,080✔
26
}
27

28
static void tsdbSttLvlClearFObj(void *data) { TAOS_UNUSED(tsdbTFileObjUnref(*(STFileObj **)data)); }
8,103,276✔
29

30
void tsdbSttLvlClear(SSttLvl **lvl) {
8,063,341✔
31
  if (lvl[0] != NULL) {
8,063,341!
32
    TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlClearFObj);
16,156,063!
33
    taosMemoryFree(lvl[0]);
8,063,802!
34
    lvl[0] = NULL;
8,064,040✔
35
  }
36
}
8,063,604✔
37

38
static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl) {
386,621✔
39
  int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
386,621✔
40
  if (code) return code;
386,625!
41

42
  const STFileObj *fobj1;
43
  TARRAY2_FOREACH(lvl1->fobjArr, fobj1) {
787,301✔
44
    STFileObj *fobj;
45
    code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj);
400,662✔
46
    if (code) {
400,675✔
47
      tsdbSttLvlClear(lvl);
1✔
48
      return code;
×
49
    }
50

51
    code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
400,674✔
52
    if (code) {
400,676!
53
      tsdbSttLvlClear(lvl);
×
54
      taosMemoryFree(fobj);
×
55
      return code;
×
56
    }
57
  }
58
  return 0;
386,639✔
59
}
60

61
static int32_t tsdbSttLvlInitRef(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl) {
7,341,434✔
62
  int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
7,341,434✔
63
  if (code) return code;
7,343,442!
64

65
  STFileObj *fobj1;
66
  TARRAY2_FOREACH(lvl1->fobjArr, fobj1) {
14,705,975✔
67
    code = tsdbTFileObjRef(fobj1);
7,361,703✔
68
    if (code) {
7,366,235✔
69
      tsdbSttLvlClear(lvl);
1,776✔
70
      return code;
×
71
    }
72
    code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj1);
7,364,459✔
73
    if (code) {
7,362,533!
74
      if (tsdbTFileObjUnref(fobj1) != 0) {
×
75
        tsdbError("failed to unref file obj, fobj:%p", fobj1);
×
76
      }
77
      tsdbSttLvlClear(lvl);
×
78
      return code;
×
79
    }
80
  }
81
  return 0;
7,344,272✔
82
}
83

84
static int32_t tsdbSttLvlFilteredInitEx(STsdb *pTsdb, const SSttLvl *lvl1, int64_t ever, SSttLvl **lvl,
×
85
                                        TFileOpArray *fopArr) {
86
  int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
×
87
  if (code) return code;
×
88

89
  const STFileObj *fobj1;
90
  TARRAY2_FOREACH(lvl1->fobjArr, fobj1) {
×
91
    if (fobj1->f->maxVer <= ever) {
×
92
      STFileObj *fobj;
93
      code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj);
×
94
      if (code) {
×
95
        tsdbSttLvlClear(lvl);
×
96
        return code;
×
97
      }
98

99
      TAOS_CHECK_RETURN(TARRAY2_APPEND(lvl[0]->fobjArr, fobj));
×
100
    } else {
101
      STFileOp op = {
×
102
          .optype = TSDB_FOP_REMOVE,
103
          .fid = fobj1->f->fid,
×
104
          .of = fobj1->f[0],
105
      };
106
      TAOS_CHECK_RETURN(TARRAY2_APPEND(fopArr, op));
×
107
    }
108
  }
109
  return 0;
×
110
}
111

112
static void tsdbSttLvlRemoveFObj(void *data) {
10,957✔
113
  int32_t code = tsdbTFileObjRemove(*(STFileObj **)data);
10,957✔
114
  if (code) {
10,957!
115
    tsdbError("failed to remove file obj, code:%d, error:%s", code, tstrerror(code));
×
116
  }
117
}
10,957✔
118
static void tsdbSttLvlRemove(SSttLvl **lvl) {
5,512✔
119
  TARRAY2_DESTROY(lvl[0]->fobjArr, tsdbSttLvlRemoveFObj);
15,130!
120
  taosMemoryFree(lvl[0]);
5,512!
121
  lvl[0] = NULL;
5,512✔
122
}
5,512✔
123

124
static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *lvl2) {
25,649✔
125
  int32_t code = 0;
25,649✔
126

127
  if (lvl1->level != lvl2->level) {
25,649!
128
    return TSDB_CODE_INVALID_PARA;
×
129
  }
130

131
  int32_t i1 = 0, i2 = 0;
25,649✔
132
  while (i1 < TARRAY2_SIZE(lvl1->fobjArr) || i2 < TARRAY2_SIZE(lvl2->fobjArr)) {
62,097!
133
    STFileObj *fobj1 = i1 < TARRAY2_SIZE(lvl1->fobjArr) ? TARRAY2_GET(lvl1->fobjArr, i1) : NULL;
36,448!
134
    STFileObj *fobj2 = i2 < TARRAY2_SIZE(lvl2->fobjArr) ? TARRAY2_GET(lvl2->fobjArr, i2) : NULL;
36,448✔
135

136
    if (fobj1 && fobj2) {
36,448!
137
      if (fobj1->f->cid < fobj2->f->cid) {
31,012!
138
        // create a file obj
139
        code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
×
140
        if (code) return code;
×
141
        code = TARRAY2_INSERT_PTR(lvl2->fobjArr, i2, &fobj2);
×
142
        if (code) return code;
×
143
        i1++;
×
144
        i2++;
×
145
      } else if (fobj1->f->cid > fobj2->f->cid) {
31,012✔
146
        // remove a file obj
147
        TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj);
1,321!
148
      } else {
149
        if (tsdbIsSameTFile(fobj1->f, fobj2->f)) {
29,691✔
150
          if (tsdbIsTFileChanged(fobj1->f, fobj2->f)) {
29,673!
151
            fobj2->f[0] = fobj1->f[0];
×
152
          }
153
        } else {
154
          TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj);
18!
155
          code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
18✔
156
          if (code) return code;
18!
157
          code = TARRAY2_SORT_INSERT(lvl2->fobjArr, fobj2, tsdbTFileObjCmpr);
18✔
158
          if (code) return code;
18!
159
        }
160
        i1++;
29,691✔
161
        i2++;
29,691✔
162
      }
163
    } else if (fobj1) {
5,436!
164
      // create a file obj
165
      code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
5,436✔
166
      if (code) return code;
5,436!
167
      code = TARRAY2_INSERT_PTR(lvl2->fobjArr, i2, &fobj2);
5,436!
168
      if (code) return code;
5,436!
169
      i1++;
5,436✔
170
      i2++;
5,436✔
171
    } else {
172
      // remove a file obj
173
      TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj);
×
174
    }
175
  }
176
  return 0;
25,649✔
177
}
178

179
static int32_t tsdbSttLvlCmprFn(const SSttLvl **lvl1, const SSttLvl **lvl2) {
38,192✔
180
  if (lvl1[0]->level < lvl2[0]->level) return -1;
38,192✔
181
  if (lvl1[0]->level > lvl2[0]->level) return 1;
23,706✔
182
  return 0;
16,428✔
183
}
184

185
static int32_t tsdbSttLvlToJson(const SSttLvl *lvl, cJSON *json) {
359,881✔
186
  if (cJSON_AddNumberToObject(json, "level", lvl->level) == NULL) {
359,881!
187
    return TSDB_CODE_OUT_OF_MEMORY;
×
188
  }
189

190
  cJSON *ajson = cJSON_AddArrayToObject(json, "files");
359,877✔
191
  if (ajson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
359,877!
192
  const STFileObj *fobj;
193
  TARRAY2_FOREACH(lvl->fobjArr, fobj) {
729,252✔
194
    cJSON *item = cJSON_CreateObject();
369,372✔
195
    if (item == NULL) return TSDB_CODE_OUT_OF_MEMORY;
369,386!
196
    (void)cJSON_AddItemToArray(ajson, item);
369,386✔
197

198
    int32_t code = tsdbTFileToJson(fobj->f, item);
369,379✔
199
    if (code) return code;
369,375!
200
  }
201

202
  return 0;
359,880✔
203
}
204

205
static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl) {
1,544✔
206
  const cJSON *item1, *item2;
207
  int32_t      level;
208

209
  item1 = cJSON_GetObjectItem(json, "level");
1,544✔
210
  if (cJSON_IsNumber(item1)) {
1,552!
211
    level = item1->valuedouble;
1,552✔
212
  } else {
213
    return TSDB_CODE_FILE_CORRUPTED;
×
214
  }
215

216
  int32_t code = tsdbSttLvlInit(level, lvl);
1,552✔
217
  if (code) return code;
1,552!
218

219
  item1 = cJSON_GetObjectItem(json, "files");
1,552✔
220
  if (!cJSON_IsArray(item1)) {
1,554!
221
    tsdbSttLvlClear(lvl);
×
222
    return TSDB_CODE_FILE_CORRUPTED;
×
223
  }
224

225
  cJSON_ArrayForEach(item2, item1) {
3,207!
226
    STFile tf;
227
    code = tsdbJsonToTFile(item2, TSDB_FTYPE_STT, &tf);
1,654✔
228
    if (code) {
1,655!
229
      tsdbSttLvlClear(lvl);
×
230
      return code;
×
231
    }
232

233
    STFileObj *fobj;
234
    code = tsdbTFileObjInit(pTsdb, &tf, &fobj);
1,655✔
235
    if (code) {
1,655✔
236
      tsdbSttLvlClear(lvl);
1✔
237
      return code;
×
238
    }
239

240
    code = TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
1,654✔
241
    if (code) return code;
1,654!
242
  }
243
  TARRAY2_SORT(lvl[0]->fobjArr, tsdbTFileObjCmpr);
1,553✔
244
  return 0;
1,550✔
245
}
246

247
int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json) {
354,796✔
248
  int32_t code = 0;
354,796✔
249
  cJSON  *item1, *item2;
250

251
  // fid
252
  if (cJSON_AddNumberToObject(json, "fid", fset->fid) == NULL) {
354,796!
253
    return TSDB_CODE_OUT_OF_MEMORY;
×
254
  }
255

256
  for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
1,774,013✔
257
    if (fset->farr[ftype] == NULL) continue;
1,419,208✔
258

259
    code = tsdbTFileToJson(fset->farr[ftype]->f, json);
39,284✔
260
    if (code) return code;
39,284!
261
  }
262

263
  // each level
264
  item1 = cJSON_AddArrayToObject(json, "stt lvl");
354,805✔
265
  if (item1 == NULL) return TSDB_CODE_OUT_OF_MEMORY;
354,799!
266
  const SSttLvl *lvl;
267
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
714,679✔
268
    item2 = cJSON_CreateObject();
359,876✔
269
    if (!item2) return TSDB_CODE_OUT_OF_MEMORY;
359,888!
270
    (void)cJSON_AddItemToArray(item1, item2);
359,888✔
271

272
    code = tsdbSttLvlToJson(lvl, item2);
359,881✔
273
    if (code) return code;
359,880!
274
  }
275

276
  // about compact and commit
277
  if (cJSON_AddNumberToObject(json, "last compact", fset->lastCompact) == NULL) {
354,803!
278
    return TSDB_CODE_OUT_OF_MEMORY;
×
279
  }
280

281
  if (cJSON_AddNumberToObject(json, "last commit", fset->lastCommit) == NULL) {
354,797!
282
    return TSDB_CODE_OUT_OF_MEMORY;
×
283
  }
284

285
  return 0;
354,798✔
286
}
287

288
int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) {
1,580✔
289
  int32_t      code;
290
  const cJSON *item1, *item2;
291
  int32_t      fid;
292
  STFile       tf;
293

294
  // fid
295
  item1 = cJSON_GetObjectItem(json, "fid");
1,580✔
296
  if (cJSON_IsNumber(item1)) {
1,593!
297
    fid = item1->valuedouble;
1,591✔
298
  } else {
299
    return TSDB_CODE_FILE_CORRUPTED;
×
300
  }
301

302
  code = tsdbTFileSetInit(fid, fset);
1,591✔
303
  if (code) return code;
1,588!
304

305
  for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
7,923✔
306
    code = tsdbJsonToTFile(json, ftype, &tf);
6,333✔
307
    if (code == TSDB_CODE_NOT_FOUND) {
6,332✔
308
      continue;
5,647✔
309
    } else if (code) {
685!
310
      tsdbTFileSetClear(fset);
×
311
      return code;
×
312
    } else {
313
      code = tsdbTFileObjInit(pTsdb, &tf, &(*fset)->farr[ftype]);
685✔
314
      if (code) return code;
688!
315
    }
316
  }
317

318
  // each level
319
  item1 = cJSON_GetObjectItem(json, "stt lvl");
1,590✔
320
  if (cJSON_IsArray(item1)) {
1,593!
321
    cJSON_ArrayForEach(item2, item1) {
3,144!
322
      SSttLvl *lvl;
323
      code = tsdbJsonToSttLvl(pTsdb, item2, &lvl);
1,546✔
324
      if (code) {
1,549!
325
        tsdbTFileSetClear(fset);
×
326
        return code;
×
327
      }
328

329
      code = TARRAY2_APPEND((*fset)->lvlArr, lvl);
1,552✔
330
      if (code) return code;
1,552!
331
    }
332
    TARRAY2_SORT((*fset)->lvlArr, tsdbSttLvlCmprFn);
1,598✔
333
  } else {
334
    return TSDB_CODE_FILE_CORRUPTED;
×
335
  }
336
  // about compact and commit
337
  item1 = cJSON_GetObjectItem(json, "last compact");
1,598✔
338
  if (cJSON_IsNumber(item1)) {
1,593!
339
    (*fset)->lastCompact = item1->valuedouble;
1,593✔
340
  } else {
341
    (*fset)->lastCompact = 0;
×
342
  }
343

344
  item1 = cJSON_GetObjectItem(json, "last commit");
1,593✔
345
  if (cJSON_IsNumber(item1)) {
1,593!
346
    (*fset)->lastCommit = item1->valuedouble;
1,593✔
347
  } else {
348
    (*fset)->lastCommit = 0;
×
349
  }
350

351
  return 0;
1,593✔
352
}
353

354
// NOTE: the api does not remove file, only do memory operation
355
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
360,949✔
356
  int32_t code = 0;
360,949✔
357

358
  if (op->optype == TSDB_FOP_CREATE) {
360,949✔
359
    // create a new file
360
    STFileObj *fobj;
361
    code = tsdbTFileObjInit(pTsdb, &op->nf, &fobj);
348,391✔
362
    if (code) return code;
348,402!
363

364
    if (fobj->f->type == TSDB_FTYPE_STT) {
348,402✔
365
      SSttLvl *lvl = tsdbTFileSetGetSttLvl(fset, fobj->f->stt->level);
339,694✔
366
      if (!lvl) {
339,705✔
367
        code = tsdbSttLvlInit(fobj->f->stt->level, &lvl);
334,234✔
368
        if (code) return code;
334,232!
369

370
        code = TARRAY2_SORT_INSERT(fset->lvlArr, lvl, tsdbSttLvlCmprFn);
334,232✔
371
        if (code) return code;
334,229!
372
      }
373

374
      code = TARRAY2_SORT_INSERT(lvl->fobjArr, fobj, tsdbTFileObjCmpr);
339,700✔
375
      if (code) return code;
339,692!
376
    } else {
377
      fset->farr[fobj->f->type] = fobj;
8,708✔
378
    }
379
  } else if (op->optype == TSDB_FOP_REMOVE) {
12,558✔
380
    // delete a file
381
    if (op->of.type == TSDB_FTYPE_STT) {
11,952✔
382
      SSttLvl *lvl = tsdbTFileSetGetSttLvl(fset, op->of.stt->level);
10,957✔
383

384
      STFileObj  tfobj = {.f[0] = {.cid = op->of.cid}};
10,957✔
385
      STFileObj *tfobjp = &tfobj;
10,957✔
386
      int32_t    idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
10,957✔
387
      TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlClearFObj);
10,957!
388
    } else {
389
      code = tsdbTFileObjUnref(fset->farr[op->of.type]);
995✔
390
      if (code) return code;
995!
391
      fset->farr[op->of.type] = NULL;
995✔
392
    }
393
  } else {
394
    if (op->nf.type == TSDB_FTYPE_STT) {
606!
395
      SSttLvl *lvl = tsdbTFileSetGetSttLvl(fset, op->of.stt->level);
×
396

397
      STFileObj   tfobj = {.f[0] = {.cid = op->of.cid}}, *tfobjp = &tfobj;
×
398
      STFileObj **fobjPtr = TARRAY2_SEARCH(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
×
399
      if (fobjPtr) {
×
400
        tfobjp = *fobjPtr;
×
401
        tfobjp->f[0] = op->nf;
×
402
      } else {
403
        tsdbError("file not found, cid:%" PRId64, op->of.cid);
×
404
      }
405
    } else {
406
      fset->farr[op->nf.type]->f[0] = op->nf;
606✔
407
    }
408
  }
409

410
  return 0;
360,958✔
411
}
412

413
int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset2) {
24,874✔
414
  int32_t code = 0;
24,874✔
415

416
  if (fset1->fid != fset2->fid) {
24,874!
417
    return TSDB_CODE_INVALID_PARA;
×
418
  }
419

420
  for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
124,370✔
421
    if (!fset1->farr[ftype] && !fset2->farr[ftype]) continue;
99,496!
422

423
    STFileObj *fobj1 = fset1->farr[ftype];
35,806✔
424
    STFileObj *fobj2 = fset2->farr[ftype];
35,806✔
425

426
    if (fobj1 && fobj2) {
35,806!
427
      if (tsdbIsSameTFile(fobj1->f, fobj2->f)) {
31,572✔
428
        if (tsdbIsTFileChanged(fobj1->f, fobj2->f)) {
30,577✔
429
          fobj2->f[0] = fobj1->f[0];
606✔
430
        }
431
      } else {
432
        if (fobj1->f->cid != fobj2->f->cid) {
995✔
433
          code = tsdbTFileObjRemove(fobj2);
992✔
434
          if (code) return code;
992!
435
        } else {
436
          code = tsdbTFileObjRemoveUpdateLC(fobj2);
3✔
437
          if (code) return code;
3!
438
        }
439
        code = tsdbTFileObjInit(pTsdb, fobj1->f, &fset2->farr[ftype]);
995✔
440
        if (code) return code;
995!
441
      }
442
    } else if (fobj1) {
4,234!
443
      // create a new file
444
      code = tsdbTFileObjInit(pTsdb, fobj1->f, &fset2->farr[ftype]);
4,234✔
445
      if (code) return code;
4,234!
446
    } else {
447
      // remove the file
448
      code = tsdbTFileObjRemove(fobj2);
×
449
      if (code) return code;
×
450
      fset2->farr[ftype] = NULL;
×
451
    }
452
  }
453

454
  // stt part
455
  int32_t i1 = 0, i2 = 0;
24,874✔
456
  while (i1 < TARRAY2_SIZE(fset1->lvlArr) || i2 < TARRAY2_SIZE(fset2->lvlArr)) {
61,477✔
457
    SSttLvl *lvl1 = i1 < TARRAY2_SIZE(fset1->lvlArr) ? TARRAY2_GET(fset1->lvlArr, i1) : NULL;
36,603✔
458
    SSttLvl *lvl2 = i2 < TARRAY2_SIZE(fset2->lvlArr) ? TARRAY2_GET(fset2->lvlArr, i2) : NULL;
36,603✔
459

460
    if (lvl1 && lvl2) {
36,603✔
461
      if (lvl1->level < lvl2->level) {
33,828✔
462
        // add a new stt level
463
        code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2);
3,756✔
464
        if (code) return code;
3,756!
465
        code = TARRAY2_SORT_INSERT(fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
3,756✔
466
        if (code) return code;
3,756!
467
        i1++;
3,756✔
468
        i2++;
3,756✔
469
      } else if (lvl1->level > lvl2->level) {
30,072✔
470
        // remove the stt level
471
        TARRAY2_REMOVE(fset2->lvlArr, i2, tsdbSttLvlRemove);
4,423!
472
      } else {
473
        // apply edit on stt level
474
        code = tsdbSttLvlApplyEdit(pTsdb, lvl1, lvl2);
25,649✔
475
        if (code) return code;
25,649!
476
        i1++;
25,649✔
477
        i2++;
25,649✔
478
      }
479
    } else if (lvl1) {
2,775✔
480
      // add a new stt level
481
      code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2);
1,686✔
482
      if (code) return code;
1,686!
483
      code = TARRAY2_SORT_INSERT(fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
1,686✔
484
      if (code) return code;
1,686!
485
      i1++;
1,686✔
486
      i2++;
1,686✔
487
    } else {
488
      // remove the stt level
489
      TARRAY2_REMOVE(fset2->lvlArr, i2, tsdbSttLvlRemove);
1,089!
490
    }
491
  }
492

493
  fset2->lastCompact = fset1->lastCompact;
24,874✔
494
  fset2->lastCommit = fset1->lastCommit;
24,874✔
495

496
  return 0;
24,874✔
497
}
498

499
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
8,141,349✔
500
  fset[0] = taosMemoryCalloc(1, sizeof(STFileSet));
8,141,349!
501
  if (fset[0] == NULL) {
8,131,460!
502
    return terrno;
×
503
  }
504

505
  fset[0]->fid = fid;
8,131,460✔
506
  fset[0]->maxVerValid = VERSION_MAX;
8,131,460✔
507
  TARRAY2_INIT(fset[0]->lvlArr);
8,131,460✔
508

509
  // block commit variables
510
  (void)taosThreadCondInit(&fset[0]->canCommit, NULL);
8,131,460✔
511
  (*fset)->numWaitCommit = 0;
8,137,090✔
512
  (*fset)->blockCommit = false;
8,137,090✔
513

514
  for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) {
24,393,422✔
515
    struct STFileSetCond *cond = &(*fset)->conds[i];
16,256,861✔
516
    cond->running = false;
16,256,861✔
517
    cond->numWait = 0;
16,256,861✔
518
    (void)taosThreadCondInit(&cond->cond, NULL);
16,256,861✔
519
  }
520

521
  return 0;
8,136,561✔
522
}
523

524
int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
369,107✔
525
  int32_t code = tsdbTFileSetInit(fset1->fid, fset);
369,107✔
526
  if (code) return code;
369,113!
527

528
  for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
1,845,551✔
529
    if (fset1->farr[ftype] == NULL) continue;
1,476,437✔
530

531
    code = tsdbTFileObjInit(pTsdb, fset1->farr[ftype]->f, &fset[0]->farr[ftype]);
47,734✔
532
    if (code) {
47,735!
533
      tsdbTFileSetClear(fset);
×
534
      return code;
×
535
    }
536
  }
537

538
  const SSttLvl *lvl1;
539
  TARRAY2_FOREACH(fset1->lvlArr, lvl1) {
750,295✔
540
    SSttLvl *lvl;
541
    code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl);
381,178✔
542
    if (code) {
381,179✔
543
      tsdbTFileSetClear(fset);
1✔
544
      return code;
×
545
    }
546

547
    code = TARRAY2_APPEND(fset[0]->lvlArr, lvl);
381,178✔
548
    if (code) return code;
381,181!
549
  }
550

551
  (*fset)->lastCompact = fset1->lastCompact;
369,117✔
552
  (*fset)->lastCommit = fset1->lastCommit;
369,117✔
553

554
  return 0;
369,117✔
555
}
556

557
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
×
558
                                    TFileOpArray *fopArr) {
559
  int32_t code = tsdbTFileSetInit(fset1->fid, fset);
×
560
  if (code) return code;
×
561

562
  for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
×
563
    if (fset1->farr[ftype] == NULL) continue;
×
564
    STFileObj *fobj = fset1->farr[ftype];
×
565
    if (fobj->f->maxVer <= ever) {
×
566
      code = tsdbTFileObjInit(pTsdb, fobj->f, &fset[0]->farr[ftype]);
×
567
      if (code) {
×
568
        tsdbTFileSetClear(fset);
×
569
        return code;
×
570
      }
571
    } else {
572
      STFileOp op = {
×
573
          .optype = TSDB_FOP_REMOVE,
574
          .fid = fobj->f->fid,
×
575
          .of = fobj->f[0],
576
      };
577
      code = TARRAY2_APPEND(fopArr, op);
×
578
      if (code) return code;
×
579
    }
580
  }
581

582
  const SSttLvl *lvl1;
583
  TARRAY2_FOREACH(fset1->lvlArr, lvl1) {
×
584
    SSttLvl *lvl;
585
    code = tsdbSttLvlFilteredInitEx(pTsdb, lvl1, ever, &lvl, fopArr);
×
586
    if (code) {
×
587
      tsdbTFileSetClear(fset);
×
588
      return code;
×
589
    }
590

591
    code = TARRAY2_APPEND(fset[0]->lvlArr, lvl);
×
592
    if (code) return code;
×
593
  }
594

595
  return 0;
×
596
}
597

598
int32_t tsdbTFileSetRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever,
×
599
                                 STFileSetRange **fsr) {
600
  fsr[0] = taosMemoryCalloc(1, sizeof(*fsr[0]));
×
601
  if (fsr[0] == NULL) {
×
602
    return terrno;
×
603
  }
604
  fsr[0]->fid = fset1->fid;
×
605
  fsr[0]->sver = sver;
×
606
  fsr[0]->ever = ever;
×
607

608
  int32_t code = tsdbTFileSetInitRef(pTsdb, fset1, &fsr[0]->fset);
×
609
  if (code) {
×
610
    taosMemoryFree(fsr[0]);
×
611
    fsr[0] = NULL;
×
612
  }
613
  return code;
×
614
}
615

616
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
7,440,209✔
617
  int32_t code = tsdbTFileSetInit(fset1->fid, fset);
7,440,209✔
618
  if (code) return code;
7,434,055!
619

620
  for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
37,178,177✔
621
    if (fset1->farr[ftype] == NULL) continue;
29,739,581✔
622

623
    code = tsdbTFileObjRef(fset1->farr[ftype]);
784,067✔
624
    if (code) {
788,582!
625
      tsdbTFileSetClear(fset);
×
626
      return code;
×
627
    }
628
    fset[0]->farr[ftype] = fset1->farr[ftype];
788,608✔
629
  }
630

631
  const SSttLvl *lvl1;
632
  TARRAY2_FOREACH(fset1->lvlArr, lvl1) {
14,779,927✔
633
    SSttLvl *lvl;
634
    code = tsdbSttLvlInitRef(pTsdb, lvl1, &lvl);
7,339,577✔
635
    if (code) {
7,339,866!
636
      tsdbSttLvlClear(&lvl);
×
637
      tsdbTFileSetClear(fset);
×
638
      return code;
×
639
    }
640

641
    code = TARRAY2_APPEND(fset[0]->lvlArr, lvl);
7,344,036✔
642
    if (code) {
7,341,331!
643
      tsdbSttLvlClear(&lvl);
×
644
      tsdbTFileSetClear(fset);
×
645
      return code;
×
646
    }
647
  }
648

649
  (*fset)->lastCompact = fset1->lastCompact;
7,440,350✔
650
  (*fset)->lastCommit = fset1->lastCommit;
7,440,350✔
651

652
  return 0;
7,440,350✔
653
}
654

655
void tsdbTFileSetRangeClear(STFileSetRange **fsr) {
×
656
  if (!fsr[0]) return;
×
657

658
  tsdbTFileSetClear(&fsr[0]->fset);
×
659
  taosMemoryFree(fsr[0]);
×
660
  fsr[0] = NULL;
×
661
  return;
×
662
}
663

664
void tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr) {
300✔
665
  if (ppArr && ppArr[0]) {
300!
666
    TARRAY2_DESTROY(ppArr[0], tsdbTFileSetRangeClear);
100!
667
    taosMemoryFree(ppArr[0]);
100!
668
    ppArr[0] = NULL;
100✔
669
  }
670
}
300✔
671

672
void tsdbTFileSetClear(STFileSet **fset) {
8,595,507✔
673
  if (fset && *fset) {
8,595,507!
674
    for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
40,735,409✔
675
      if ((*fset)->farr[ftype] == NULL) continue;
32,587,427✔
676
      int32_t code = tsdbTFileObjUnref((*fset)->farr[ftype]);
849,810✔
677
      if (code) {
850,503!
678
        tsdbError("failed to unref file, fid:%d, ftype:%d", (*fset)->fid, ftype);
×
679
      }
680
      (*fset)->farr[ftype] = NULL;
850,482✔
681
    }
682

683
    TARRAY2_DESTROY((*fset)->lvlArr, tsdbSttLvlClear);
16,206,484!
684

685
    (void)taosThreadCondDestroy(&(*fset)->canCommit);
8,147,571✔
686
    for (int32_t i = 0; i < sizeof((*fset)->conds) / sizeof((*fset)->conds[0]); ++i) {
24,441,487✔
687
      (void)taosThreadCondDestroy(&(*fset)->conds[i].cond);
16,294,167✔
688
    }
689
    taosMemoryFreeClear(*fset);
8,147,320!
690
  }
691
}
8,595,164✔
692

UNCOV
693
void tsdbTFileSetRemove(STFileSet *fset) {
×
UNCOV
694
  if (fset == NULL) return;
×
695

UNCOV
696
  for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
×
UNCOV
697
    if (fset->farr[ftype] != NULL) {
×
698
      int32_t code = tsdbTFileObjRemove(fset->farr[ftype]);
×
699
      if (code) {
×
700
        tsdbError("failed to remove file, fid:%d, ftype:%d", fset->fid, ftype);
×
701
      }
702
      fset->farr[ftype] = NULL;
×
703
    }
704
  }
705

UNCOV
706
  TARRAY2_DESTROY(fset->lvlArr, tsdbSttLvlRemove);
×
707
}
708

709
SSttLvl *tsdbTFileSetGetSttLvl(STFileSet *fset, int32_t level) {
350,651✔
710
  SSttLvl   sttLvl = {.level = level};
350,651✔
711
  SSttLvl  *lvl = &sttLvl;
350,651✔
712
  SSttLvl **lvlPtr = TARRAY2_SEARCH(fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ);
350,651✔
713
  return lvlPtr ? lvlPtr[0] : NULL;
350,662✔
714
}
715

716
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2) {
11,086,039✔
717
  if (fset1[0]->fid < fset2[0]->fid) return -1;
11,086,039✔
718
  if (fset1[0]->fid > fset2[0]->fid) return 1;
11,074,050✔
719
  return 0;
45,664✔
720
}
721

722
int64_t tsdbTFileSetMaxCid(const STFileSet *fset) {
1,584✔
723
  int64_t maxCid = 0;
1,584✔
724
  for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
7,931✔
725
    if (fset->farr[ftype] == NULL) continue;
6,347✔
726
    maxCid = TMAX(maxCid, fset->farr[ftype]->f->cid);
685✔
727
  }
728
  const SSttLvl   *lvl;
729
  const STFileObj *fobj;
730
  TARRAY2_FOREACH(fset->lvlArr, lvl) {
3,129✔
731
    TARRAY2_FOREACH(lvl->fobjArr, fobj) { maxCid = TMAX(maxCid, fobj->f->cid); }
3,196✔
732
  }
733
  return maxCid;
1,584✔
734
}
735

736
bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
360,933✔
737
  for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
1,751,210✔
738
    if (fset->farr[ftype] != NULL) return false;
1,408,896✔
739
  }
740
  return TARRAY2_SIZE(fset->lvlArr) == 0;
342,314✔
741
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc