• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.43
/source/dnode/vnode/src/tsdb/tsdbIter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbIter.h"
17

18
// STsdbIter ================
19
struct STsdbIter {
20
  EIterType type;
21
  bool      noMoreData;
22
  bool      filterByVersion;
23
  int64_t   range[2];
24
  union {
25
    SRowInfo    row[1];
26
    STombRecord record[1];
27
  };
28
  SRBTreeNode node[1];
29
  union {
30
    struct {
31
      SSttFileReader     *reader;
32
      const TSttBlkArray *sttBlkArray;
33
      int32_t             sttBlkArrayIdx;
34
      SBlockData          blockData[1];
35
      int32_t             blockDataIdx;
36
    } sttData[1];
37
    struct {
38
      SDataFileReader     *reader;
39
      const TBrinBlkArray *brinBlkArray;
40
      int32_t              brinBlkArrayIdx;
41
      SBrinBlock           brinBlock[1];
42
      int32_t              brinBlockIdx;
43
      SBlockData           blockData[1];
44
      int32_t              blockDataIdx;
45
    } dataData[1];
46
    struct {
47
      SMemTable  *memt;
48
      STsdbRowKey from[1];
49
      SRBTreeIter iter[1];
50
      STbData    *tbData;
51
      STbDataIter tbIter[1];
52
    } memtData[1];
53
    struct {
54
      SSttFileReader      *reader;
55
      const TTombBlkArray *tombBlkArray;
56
      int32_t              tombBlkArrayIdx;
57
      STombBlock           tombBlock[1];
58
      int32_t              tombBlockIdx;
59
    } sttTomb[1];
60
    struct {
61
      SDataFileReader     *reader;
62
      const TTombBlkArray *tombBlkArray;
63
      int32_t              tombBlkArrayIdx;
64
      STombBlock           tombBlock[1];
65
      int32_t              tombBlockIdx;
66
    } dataTomb[1];
67
    struct {
68
      SMemTable  *memt;
69
      SRBTreeIter rbtIter[1];
70
      STbData    *tbData;
71
      SDelData   *delData;
72
    } memtTomb[1];
73
  };
74
};
75

76
static int32_t tsdbSttIterNext(STsdbIter *iter, const TABLEID *tbid) {
585,838,978✔
77
  while (!iter->noMoreData) {
585,986,680!
78
    for (; iter->sttData->blockDataIdx < iter->sttData->blockData->nRow; iter->sttData->blockDataIdx++) {
586,435,306✔
79
      int64_t version = iter->sttData->blockData->aVersion[iter->sttData->blockDataIdx];
586,278,362✔
80

81
      if (iter->filterByVersion && (version < iter->range[0] || version > iter->range[1])) {
586,278,362!
82
        continue;
424,863✔
83
      }
84

85
      iter->row->suid = iter->sttData->blockData->suid;
586,278,362✔
86
      iter->row->uid = iter->sttData->blockData->uid ? iter->sttData->blockData->uid
1,172,556,724✔
87
                                                     : iter->sttData->blockData->aUid[iter->sttData->blockDataIdx];
586,278,362✔
88

89
      if (tbid && iter->row->suid == tbid->suid && iter->row->uid == tbid->uid) {
586,278,362!
90
        continue;
424,863✔
91
      }
92

93
      iter->row->row = tsdbRowFromBlockData(iter->sttData->blockData, iter->sttData->blockDataIdx);
585,853,499✔
94
      iter->sttData->blockDataIdx++;
585,853,499✔
95
      goto _exit;
585,853,499✔
96
    }
97

98
    if (iter->sttData->sttBlkArrayIdx >= TARRAY2_SIZE(iter->sttData->sttBlkArray)) {
156,944✔
99
      iter->noMoreData = true;
9,250✔
100
      break;
9,250✔
101
    }
102

103
    for (; iter->sttData->sttBlkArrayIdx < TARRAY2_SIZE(iter->sttData->sttBlkArray); iter->sttData->sttBlkArrayIdx++) {
147,794!
104
      const SSttBlk *sttBlk = TARRAY2_GET_PTR(iter->sttData->sttBlkArray, iter->sttData->sttBlkArrayIdx);
159,308✔
105

106
      if (iter->filterByVersion && (sttBlk->maxVer < iter->range[0] || sttBlk->minVer > iter->range[1])) {
159,308!
107
        continue;
×
108
      }
109

110
      if (tbid && tbid->suid == sttBlk->suid && tbid->uid == sttBlk->minUid && tbid->uid == sttBlk->maxUid) {
159,308!
111
        continue;
100✔
112
      }
113

114
      int32_t code = tsdbSttFileReadBlockData(iter->sttData->reader, sttBlk, iter->sttData->blockData);
159,208✔
115
      if (code) return code;
159,216!
116

117
      iter->sttData->blockDataIdx = 0;
159,216✔
118
      iter->sttData->sttBlkArrayIdx++;
159,216✔
119
      break;
159,216✔
120
    }
121
  }
122

123
_exit:
×
124
  return 0;
585,838,986✔
125
}
126

127
static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) {
209,812✔
128
  int32_t code;
129

130
  while (!iter->noMoreData) {
209,821!
131
    for (;;) {
132
      // SBlockData
133
      for (; iter->dataData->blockDataIdx < iter->dataData->blockData->nRow; iter->dataData->blockDataIdx++) {
209,937✔
134
        int64_t version = iter->dataData->blockData->aVersion[iter->dataData->blockDataIdx];
209,845✔
135
        if (iter->filterByVersion && (version < iter->range[0] || version > iter->range[1])) {
209,845!
136
          continue;
×
137
        }
138

139
        if (tbid && tbid->suid == iter->dataData->blockData->suid && tbid->uid == iter->dataData->blockData->uid) {
209,845!
140
          iter->dataData->blockDataIdx = iter->dataData->blockData->nRow;
12✔
141
          break;
12✔
142
        }
143

144
        iter->row->row = tsdbRowFromBlockData(iter->dataData->blockData, iter->dataData->blockDataIdx);
209,833✔
145
        iter->dataData->blockDataIdx++;
209,833✔
146
        goto _exit;
209,833✔
147
      }
148

149
      // SBrinBlock
150
      if (iter->dataData->brinBlockIdx >= iter->dataData->brinBlock->numOfRecords) {
104✔
151
        break;
18✔
152
      }
153

154
      for (; iter->dataData->brinBlockIdx < iter->dataData->brinBlock->numOfRecords; iter->dataData->brinBlockIdx++) {
104!
155
        SBrinRecord record[1];
156
        code = tBrinBlockGet(iter->dataData->brinBlock, iter->dataData->brinBlockIdx, record);
105✔
157
        if (code) return code;
105!
158

159
        if (iter->filterByVersion && (record->maxVer < iter->range[0] || record->minVer > iter->range[1])) {
105!
160
          continue;
18✔
161
        }
162

163
        if (tbid && tbid->suid == record->suid && tbid->uid == record->uid) {
105!
164
          continue;
18✔
165
        }
166

167
        iter->row->suid = record->suid;
87✔
168
        iter->row->uid = record->uid;
87✔
169

170
        code = tsdbDataFileReadBlockData(iter->dataData->reader, record, iter->dataData->blockData);
87✔
171
        if (code) return code;
87!
172

173
        iter->dataData->blockDataIdx = 0;
87✔
174
        iter->dataData->brinBlockIdx++;
87✔
175
        break;
87✔
176
      }
177
    }
178

179
    if (iter->dataData->brinBlkArrayIdx >= TARRAY2_SIZE(iter->dataData->brinBlkArray)) {
18✔
180
      iter->noMoreData = true;
9✔
181
      break;
9✔
182
    }
183

184
    for (; iter->dataData->brinBlkArrayIdx < TARRAY2_SIZE(iter->dataData->brinBlkArray);
9!
185
         iter->dataData->brinBlkArrayIdx++) {
×
186
      const SBrinBlk *brinBlk = TARRAY2_GET_PTR(iter->dataData->brinBlkArray, iter->dataData->brinBlkArrayIdx);
9✔
187

188
      if (iter->filterByVersion && (brinBlk->maxVer < iter->range[0] || brinBlk->minVer > iter->range[1])) {
9!
189
        continue;
×
190
      }
191

192
      if (tbid && tbid->uid == brinBlk->minTbid.uid && tbid->uid == brinBlk->maxTbid.uid) {
9!
193
        continue;
×
194
      }
195

196
      code = tsdbDataFileReadBrinBlock(iter->dataData->reader, brinBlk, iter->dataData->brinBlock);
9✔
197
      if (code) return code;
9!
198

199
      iter->dataData->brinBlockIdx = 0;
9✔
200
      iter->dataData->brinBlkArrayIdx++;
9✔
201
      break;
9✔
202
    }
203
  }
204

UNCOV
205
_exit:
×
206
  return 0;
209,812✔
207
}
208

209
static int32_t tsdbMemTableIterNext(STsdbIter *iter, const TABLEID *tbid) {
671,644,342✔
210
  SRBTreeNode *node;
211

212
  while (!iter->noMoreData) {
688,996,940!
213
    for (TSDBROW *row; iter->memtData->tbData && (row = tsdbTbDataIterGet(iter->memtData->tbIter));) {
1,377,551,460!
214
      if (tbid && tbid->suid == iter->memtData->tbData->suid && tbid->uid == iter->memtData->tbData->uid) {
687,534,422✔
215
        iter->memtData->tbData = NULL;
16,431,634✔
216
        break;
16,431,634✔
217
      }
218

219
      if (iter->filterByVersion) {
671,102,788!
220
        int64_t version = TSDBROW_VERSION(row);
×
221
        if (version < iter->range[0] || version > iter->range[1]) {
×
222
          continue;
×
223
        }
224
      }
225

226
      iter->row->row = row[0];
671,102,788✔
227

228
      bool r = tsdbTbDataIterNext(iter->memtData->tbIter);
671,102,788✔
229
      goto _exit;
670,607,113✔
230
    }
231

232
    for (;;) {
233
      node = tRBTreeIterNext(iter->memtData->iter);
17,920,956✔
234
      if (!node) {
17,808,249✔
235
        iter->noMoreData = true;
454,519✔
236
        goto _exit;
454,519✔
237
      }
238

239
      iter->memtData->tbData = TCONTAINER_OF(node, STbData, rbtn);
17,353,730✔
240
      if (tbid && tbid->suid == iter->memtData->tbData->suid && tbid->uid == iter->memtData->tbData->uid) {
17,353,730!
241
        continue;
×
242
      } else {
243
        iter->row->suid = iter->memtData->tbData->suid;
17,353,730✔
244
        iter->row->uid = iter->memtData->tbData->uid;
17,353,730✔
245
        tsdbTbDataIterOpen(iter->memtData->tbData, iter->memtData->from, 0, iter->memtData->tbIter);
17,353,730✔
246
        break;
17,352,598✔
247
      }
248
    }
249
  }
250

251
_exit:
×
252
  return 0;
671,034,828✔
253
}
254

255
static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
×
256
  while (!iter->noMoreData) {
×
257
    for (; iter->dataTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->dataTomb->tombBlock); iter->dataTomb->tombBlockIdx++) {
×
258
      int32_t code = tTombBlockGet(iter->dataTomb->tombBlock, iter->dataTomb->tombBlockIdx, iter->record);
×
259
      if (code) return code;
×
260

261
      if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) {
×
262
        continue;
×
263
      }
264

265
      if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) {
×
266
        continue;
×
267
      }
268
      iter->dataTomb->tombBlockIdx++;
×
269
      goto _exit;
×
270
    }
271

272
    if (iter->dataTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->dataTomb->tombBlkArray)) {
×
273
      iter->noMoreData = true;
×
274
      goto _exit;
×
275
    }
276

277
    for (; iter->dataTomb->tombBlkArrayIdx < TARRAY2_SIZE(iter->dataTomb->tombBlkArray);
×
278
         iter->dataTomb->tombBlkArrayIdx++) {
×
279
      const STombBlk *tombBlk = TARRAY2_GET_PTR(iter->dataTomb->tombBlkArray, iter->dataTomb->tombBlkArrayIdx);
×
280

281
      if (tbid && tbid->suid == tombBlk->minTbid.suid && tbid->uid == tombBlk->minTbid.uid &&
×
282
          tbid->suid == tombBlk->maxTbid.suid && tbid->uid == tombBlk->maxTbid.uid) {
×
283
        continue;
×
284
      }
285

286
      int32_t code = tsdbDataFileReadTombBlock(iter->dataTomb->reader, tombBlk, iter->dataTomb->tombBlock);
×
287
      if (code) return code;
×
288

289
      iter->dataTomb->tombBlockIdx = 0;
×
290
      iter->dataTomb->tombBlkArrayIdx++;
×
291
      break;
×
292
    }
293
  }
294

295
_exit:
×
296
  return 0;
×
297
}
298

299
static int32_t tsdbMemTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
184,589✔
300
  while (!iter->noMoreData) {
235,468!
301
    for (; iter->memtTomb->delData;) {
235,468✔
302
      if (tbid && tbid->uid == iter->memtTomb->tbData->uid) {
177,914!
303
        iter->memtTomb->delData = NULL;
×
304
        break;
×
305
      }
306

307
      if (iter->filterByVersion &&
177,914!
308
          (iter->memtTomb->delData->version < iter->range[0] || iter->memtTomb->delData->version > iter->range[1])) {
×
309
        continue;
×
310
      }
311

312
      iter->record->suid = iter->memtTomb->tbData->suid;
177,914✔
313
      iter->record->uid = iter->memtTomb->tbData->uid;
177,914✔
314
      iter->record->version = iter->memtTomb->delData->version;
177,914✔
315
      iter->record->skey = iter->memtTomb->delData->sKey;
177,914✔
316
      iter->record->ekey = iter->memtTomb->delData->eKey;
177,914✔
317

318
      iter->memtTomb->delData = iter->memtTomb->delData->pNext;
177,914✔
319
      goto _exit;
177,914✔
320
    }
321

322
    for (;;) {
×
323
      SRBTreeNode *node = tRBTreeIterNext(iter->memtTomb->rbtIter);
57,554✔
324
      if (node == NULL) {
57,554✔
325
        iter->noMoreData = true;
6,675✔
326
        goto _exit;
6,675✔
327
      }
328

329
      iter->memtTomb->tbData = TCONTAINER_OF(node, STbData, rbtn);
50,879✔
330
      if (tbid && tbid->uid == iter->memtTomb->tbData->uid) {
50,879!
331
        continue;
×
332
      } else {
333
        iter->memtTomb->delData = iter->memtTomb->tbData->pHead;
50,879✔
334
        break;
50,879✔
335
      }
336
    }
337
  }
338

339
_exit:
×
340
  return 0;
184,589✔
341
}
342

343
static int32_t tsdbSttIterOpen(STsdbIter *iter) {
10,935✔
344
  int32_t code;
345

346
  code = tsdbSttFileReadSttBlk(iter->sttData->reader, &iter->sttData->sttBlkArray);
10,935✔
347
  if (code) return code;
10,933!
348

349
  if (TARRAY2_SIZE(iter->sttData->sttBlkArray) == 0) {
10,933✔
350
    iter->noMoreData = true;
1,682✔
351
    return 0;
1,682✔
352
  }
353

354
  iter->sttData->sttBlkArrayIdx = 0;
9,251✔
355
  code = tBlockDataCreate(iter->sttData->blockData);
9,251✔
356
  if (code) return code;
9,247!
357
  iter->sttData->blockDataIdx = 0;
9,247✔
358

359
  return tsdbSttIterNext(iter, NULL);
9,247✔
360
}
361

362
static int32_t tsdbDataIterOpen(STsdbIter *iter) {
106✔
363
  int32_t code;
364

365
  // SBrinBlk
366
  code = tsdbDataFileReadBrinBlk(iter->dataData->reader, &iter->dataData->brinBlkArray);
106✔
367
  if (code) return code;
106!
368

369
  if (TARRAY2_SIZE(iter->dataData->brinBlkArray) == 0) {
106✔
370
    iter->noMoreData = true;
97✔
371
    return 0;
97✔
372
  }
373

374
  iter->dataData->brinBlkArrayIdx = 0;
9✔
375

376
  // SBrinBlock
377
  code = tBrinBlockInit(iter->dataData->brinBlock);
9✔
378
  if (code) return code;
9!
379
  iter->dataData->brinBlockIdx = 0;
9✔
380

381
  // SBlockData
382
  code = tBlockDataCreate(iter->dataData->blockData);
9✔
383
  if (code) return code;
9!
384
  iter->dataData->blockDataIdx = 0;
9✔
385

386
  return tsdbDataIterNext(iter, NULL);
9✔
387
}
388

389
static int32_t tsdbMemTableIterOpen(STsdbIter *iter) {
456,743✔
390
  if (iter->memtData->memt->nRow == 0) {
456,743✔
391
    iter->noMoreData = true;
2,307✔
392
    return 0;
2,307✔
393
  }
394

395
  iter->memtData->iter[0] = tRBTreeIterCreate(iter->memtData->memt->tbDataTree, 1);
454,436✔
396
  return tsdbMemTableIterNext(iter, NULL);
454,436✔
397
}
398

399
static int32_t tsdbSttIterClose(STsdbIter *iter) {
10,939✔
400
  tBlockDataDestroy(iter->sttData->blockData);
10,939✔
401
  return 0;
10,939✔
402
}
403

404
static int32_t tsdbDataTombIterOpen(STsdbIter *iter) {
106✔
405
  int32_t code;
406

407
  code = tsdbDataFileReadTombBlk(iter->dataTomb->reader, &iter->dataTomb->tombBlkArray);
106✔
408
  if (code) return code;
106!
409

410
  if (TARRAY2_SIZE(iter->dataTomb->tombBlkArray) == 0) {
106!
411
    iter->noMoreData = true;
106✔
412
    return 0;
106✔
413
  }
414
  iter->dataTomb->tombBlkArrayIdx = 0;
×
415

416
  tTombBlockInit(iter->dataTomb->tombBlock);
×
417
  iter->dataTomb->tombBlockIdx = 0;
×
418

419
  return tsdbDataTombIterNext(iter, NULL);
×
420
}
421

422
static int32_t tsdbMemTombIterOpen(STsdbIter *iter) {
456,823✔
423
  int32_t code;
424

425
  if (iter->memtTomb->memt->nDel == 0) {
456,823✔
426
    iter->noMoreData = true;
450,153✔
427
    return 0;
450,153✔
428
  }
429

430
  iter->memtTomb->rbtIter[0] = tRBTreeIterCreate(iter->memtTomb->memt->tbDataTree, 1);
6,670✔
431
  return tsdbMemTombIterNext(iter, NULL);
6,670✔
432
}
433

434
static int32_t tsdbDataIterClose(STsdbIter *iter) {
106✔
435
  tBrinBlockDestroy(iter->dataData->brinBlock);
106✔
436
  tBlockDataDestroy(iter->dataData->blockData);
106✔
437
  return 0;
106✔
438
}
439

440
static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; }
456,773✔
441

442
static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
57,997✔
443
  while (!iter->noMoreData) {
64,837!
444
    for (; iter->sttTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->sttTomb->tombBlock); iter->sttTomb->tombBlockIdx++) {
66,601✔
445
      int32_t code = tTombBlockGet(iter->sttTomb->tombBlock, iter->sttTomb->tombBlockIdx, iter->record);
52,922✔
446
      if (code) return code;
52,922!
447

448
      if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) {
52,922!
449
        continue;
×
450
      }
451

452
      if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) {
52,922!
453
        continue;
1,764✔
454
      }
455

456
      iter->sttTomb->tombBlockIdx++;
51,158✔
457
      goto _exit;
51,158✔
458
    }
459

460
    if (iter->sttTomb->tombBlkArrayIdx >= TARRAY2_SIZE(iter->sttTomb->tombBlkArray)) {
13,679✔
461
      iter->noMoreData = true;
6,841✔
462
      goto _exit;
6,841✔
463
    }
464

465
    for (; iter->sttTomb->tombBlkArrayIdx < TARRAY2_SIZE(iter->sttTomb->tombBlkArray);
6,838!
466
         iter->sttTomb->tombBlkArrayIdx++) {
×
467
      const STombBlk *tombBlk = TARRAY2_GET_PTR(iter->sttTomb->tombBlkArray, iter->sttTomb->tombBlkArrayIdx);
6,838✔
468

469
      if (iter->filterByVersion && (tombBlk->maxVer < iter->range[0] || tombBlk->minVer > iter->range[1])) {
6,838!
470
        continue;
×
471
      }
472

473
      if (tbid && tbid->suid == tombBlk->minTbid.suid && tbid->uid == tombBlk->minTbid.uid &&
6,838!
474
          tbid->suid == tombBlk->maxTbid.suid && tbid->uid == tombBlk->maxTbid.uid) {
×
475
        continue;
×
476
      }
477

478
      int32_t code = tsdbSttFileReadTombBlock(iter->sttTomb->reader, tombBlk, iter->sttTomb->tombBlock);
6,838✔
479
      if (code) return code;
6,840!
480

481
      iter->sttTomb->tombBlockIdx = 0;
6,840✔
482
      iter->sttTomb->tombBlkArrayIdx++;
6,840✔
483
      break;
6,840✔
484
    }
485
  }
486

487
_exit:
×
488
  return 0;
57,999✔
489
}
490

491
static int32_t tsdbSttTombIterOpen(STsdbIter *iter) {
10,933✔
492
  int32_t code;
493

494
  code = tsdbSttFileReadTombBlk(iter->sttTomb->reader, &iter->sttTomb->tombBlkArray);
10,933✔
495
  if (code) return code;
10,934!
496

497
  if (TARRAY2_SIZE(iter->sttTomb->tombBlkArray) == 0) {
10,934✔
498
    iter->noMoreData = true;
4,097✔
499
    return 0;
4,097✔
500
  }
501

502
  iter->sttTomb->tombBlkArrayIdx = 0;
6,837✔
503
  tTombBlockInit(iter->sttTomb->tombBlock);
6,837✔
504
  iter->sttTomb->tombBlockIdx = 0;
6,841✔
505

506
  return tsdbSttTombIterNext(iter, NULL);
6,841✔
507
}
508

509
int32_t tsdbIterOpen(const STsdbIterConfig *config, STsdbIter **iter) {
935,575✔
510
  int32_t code;
511

512
  iter[0] = taosMemoryCalloc(1, sizeof(*iter[0]));
935,575!
513
  if (iter[0] == NULL) {
935,688!
514
    return terrno;
×
515
  }
516

517
  iter[0]->type = config->type;
935,688✔
518
  iter[0]->noMoreData = false;
935,688✔
519
  iter[0]->filterByVersion = config->filterByVersion;
935,688✔
520
  if (iter[0]->filterByVersion) {
935,688!
521
    iter[0]->range[0] = config->verRange[0];
×
522
    iter[0]->range[1] = config->verRange[1];
×
523
  }
524

525
  switch (config->type) {
935,688!
526
    case TSDB_ITER_TYPE_STT:
10,936✔
527
      iter[0]->sttData->reader = config->sttReader;
10,936✔
528
      code = tsdbSttIterOpen(iter[0]);
10,936✔
529
      break;
10,936✔
530
    case TSDB_ITER_TYPE_DATA:
106✔
531
      iter[0]->dataData->reader = config->dataReader;
106✔
532
      code = tsdbDataIterOpen(iter[0]);
106✔
533
      break;
106✔
534
    case TSDB_ITER_TYPE_MEMT:
456,772✔
535
      iter[0]->memtData->memt = config->memt;
456,772✔
536
      iter[0]->memtData->from[0] = config->from[0];
456,772✔
537
      code = tsdbMemTableIterOpen(iter[0]);
456,772✔
538
      break;
456,785✔
539
    case TSDB_ITER_TYPE_STT_TOMB:
10,936✔
540
      iter[0]->sttTomb->reader = config->sttReader;
10,936✔
541
      code = tsdbSttTombIterOpen(iter[0]);
10,936✔
542
      break;
10,935✔
543
    case TSDB_ITER_TYPE_DATA_TOMB:
106✔
544
      iter[0]->dataTomb->reader = config->dataReader;
106✔
545
      code = tsdbDataTombIterOpen(iter[0]);
106✔
546
      break;
106✔
547
    case TSDB_ITER_TYPE_MEMT_TOMB:
456,842✔
548
      iter[0]->memtTomb->memt = config->memt;
456,842✔
549
      code = tsdbMemTombIterOpen(iter[0]);
456,842✔
550
      break;
456,841✔
UNCOV
551
    default:
×
UNCOV
552
      return TSDB_CODE_INVALID_PARA;
×
553
  }
554

555
  if (code) {
935,709!
556
    taosMemoryFree(iter[0]);
×
557
    iter[0] = NULL;
×
558
  }
559
  return code;
935,709✔
560
}
561

562
static int32_t tsdbSttTombIterClose(STsdbIter *iter) {
10,939✔
563
  tTombBlockDestroy(iter->sttTomb->tombBlock);
10,939✔
564
  return 0;
10,939✔
565
}
566

567
static int32_t tsdbDataTombIterClose(STsdbIter *iter) {
106✔
568
  tTombBlockDestroy(iter->dataTomb->tombBlock);
106✔
569
  return 0;
106✔
570
}
571

572
int32_t tsdbIterClose(STsdbIter **iter) {
935,654✔
573
  switch (iter[0]->type) {
935,654!
574
    case TSDB_ITER_TYPE_STT:
10,939✔
575
      tsdbSttIterClose(iter[0]);
10,939✔
576
      break;
10,939✔
577
    case TSDB_ITER_TYPE_DATA:
106✔
578
      tsdbDataIterClose(iter[0]);
106✔
579
      break;
106✔
580
    case TSDB_ITER_TYPE_MEMT:
456,835✔
581
      tsdbMemTableIterClose(iter[0]);
456,835✔
582
      break;
456,792✔
583
    case TSDB_ITER_TYPE_STT_TOMB:
10,939✔
584
      tsdbSttTombIterClose(iter[0]);
10,939✔
585
      break;
10,939✔
586
    case TSDB_ITER_TYPE_DATA_TOMB:
106✔
587
      tsdbDataTombIterClose(iter[0]);
106✔
588
      break;
106✔
589
    case TSDB_ITER_TYPE_MEMT_TOMB:
456,791✔
590
      break;
456,791✔
591
    default:
×
592
      return TSDB_CODE_INVALID_PARA;
×
593
  }
594
  taosMemoryFree(iter[0]);
935,673!
595
  iter[0] = NULL;
935,753✔
596
  return 0;
935,753✔
597
}
598

599
int32_t tsdbIterNext(STsdbIter *iter) {
1,239,971,138✔
600
  switch (iter->type) {
1,239,971,138!
601
    case TSDB_ITER_TYPE_STT:
585,846,421✔
602
      return tsdbSttIterNext(iter, NULL);
585,846,421✔
603
    case TSDB_ITER_TYPE_DATA:
209,813✔
604
      return tsdbDataIterNext(iter, NULL);
209,813✔
605
    case TSDB_ITER_TYPE_MEMT:
654,193,690✔
606
      return tsdbMemTableIterNext(iter, NULL);
654,193,690✔
607
    case TSDB_ITER_TYPE_STT_TOMB:
50,966✔
608
      return tsdbSttTombIterNext(iter, NULL);
50,966✔
609
    case TSDB_ITER_TYPE_DATA_TOMB:
×
610
      return tsdbDataTombIterNext(iter, NULL);
×
611
    case TSDB_ITER_TYPE_MEMT_TOMB:
177,907✔
612
      return tsdbMemTombIterNext(iter, NULL);
177,907✔
613
    default:
×
614
      return TSDB_CODE_INVALID_PARA;
×
615
  }
616
  return 0;
617
}
618

619
static int32_t tsdbIterSkipTableData(STsdbIter *iter, const TABLEID *tbid) {
17,166,836✔
620
  switch (iter->type) {
17,166,836!
621
    case TSDB_ITER_TYPE_STT:
703✔
622
      return tsdbSttIterNext(iter, tbid);
703✔
623
    case TSDB_ITER_TYPE_DATA:
12✔
624
      return tsdbDataIterNext(iter, tbid);
12✔
625
    case TSDB_ITER_TYPE_MEMT:
17,165,982✔
626
      return tsdbMemTableIterNext(iter, tbid);
17,165,982✔
627
    case TSDB_ITER_TYPE_STT_TOMB:
193✔
628
      return tsdbSttTombIterNext(iter, tbid);
193✔
629
    case TSDB_ITER_TYPE_DATA_TOMB:
×
630
      return tsdbDataTombIterNext(iter, tbid);
×
631
    case TSDB_ITER_TYPE_MEMT_TOMB:
4✔
632
      return tsdbMemTombIterNext(iter, tbid);
4✔
633
    default:
×
634
      return TSDB_CODE_INVALID_PARA;
×
635
  }
636
  return 0;
637
}
638

639
static int32_t tsdbIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
404,387,949✔
640
  STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node);
404,387,949✔
641
  STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node);
404,387,949✔
642
  return tRowInfoCmprFn(&iter1->row, &iter2->row);
404,387,949✔
643
}
644

645
static int32_t tsdbTombIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
56,032✔
646
  STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node);
56,032✔
647
  STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node);
56,032✔
648

649
  if (iter1->record->suid < iter2->record->suid) {
56,032✔
650
    return -1;
2,908✔
651
  } else if (iter1->record->suid > iter2->record->suid) {
53,124✔
652
    return 1;
2,117✔
653
  }
654

655
  if (iter1->record->uid < iter2->record->uid) {
51,007✔
656
    return -1;
21,155✔
657
  } else if (iter1->record->uid > iter2->record->uid) {
29,852✔
658
    return 1;
11,139✔
659
  }
660

661
  if (iter1->record->version < iter2->record->version) {
18,713✔
662
    return -1;
11,196✔
663
  } else if (iter1->record->version > iter2->record->version) {
7,517!
664
    return 1;
7,517✔
665
  }
666

667
  return 0;
×
668
}
669

670
// SIterMerger ================
671
struct SIterMerger {
672
  bool       isTomb;
673
  STsdbIter *iter;
674
  SRBTree    iterTree[1];
675
};
676

677
int32_t tsdbIterMergerOpen(const TTsdbIterArray *iterArray, SIterMerger **merger, bool isTomb) {
922,182✔
678
  STsdbIter   *iter;
679
  SRBTreeNode *node;
680

681
  merger[0] = taosMemoryCalloc(1, sizeof(*merger[0]));
922,182!
682
  if (merger[0] == NULL) {
922,241!
683
    return terrno;
×
684
  }
685

686
  merger[0]->isTomb = isTomb;
922,241✔
687
  if (isTomb) {
922,241✔
688
    tRBTreeCreate(merger[0]->iterTree, tsdbTombIterCmprFn);
461,145✔
689
  } else {
690
    tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn);
461,096✔
691
  }
692
  TARRAY2_FOREACH(iterArray, iter) {
1,857,925✔
693
    if (iter->noMoreData) continue;
935,737✔
694
    node = tRBTreePut(merger[0]->iterTree, iter->node);
477,290✔
695
    if (node == NULL) {
477,270✔
696
      taosMemoryFree(merger[0]);
56!
697
      return TSDB_CODE_INVALID_PARA;
×
698
    }
699
  }
700

701
  return tsdbIterMergerNext(merger[0]);
922,188✔
702
}
703

704
void tsdbIterMergerClose(SIterMerger **merger) {
922,229✔
705
  if (merger[0]) {
922,229!
706
    taosMemoryFree(merger[0]);
922,253!
707
    merger[0] = NULL;
922,287✔
708
  }
709
}
922,263✔
710

711
int32_t tsdbIterMergerNext(SIterMerger *merger) {
1,241,046,996✔
712
  int32_t      code;
713
  int32_t      c;
714
  SRBTreeNode *node;
715

716
  if (merger->iter) {
1,241,046,996✔
717
    code = tsdbIterNext(merger->iter);
1,240,177,092✔
718
    if (code) return code;
1,238,909,222!
719

720
    if (merger->iter->noMoreData) {
1,238,909,222✔
721
      merger->iter = NULL;
31,130✔
722
    } else if ((node = tRBTreeMin(merger->iterTree))) {
1,238,878,092✔
723
      c = merger->iterTree->cmprFn(merger->iter->node, node);
402,691,969✔
724
      if (c > 0) {
402,672,643✔
725
        node = tRBTreePut(merger->iterTree, merger->iter->node);
309,265✔
726
        merger->iter = NULL;
421,503✔
727
      }
728
    }
729
  }
730

731
  if (merger->iter == NULL && (node = tRBTreeDropMin(merger->iterTree))) {
1,239,872,038✔
732
    merger->iter = TCONTAINER_OF(node, STsdbIter, node);
898,827✔
733
  }
734

735
  return 0;
1,239,872,477✔
736
}
737

738
SRowInfo *tsdbIterMergerGetData(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; }
1,255,938,926✔
739

740
STombRecord *tsdbIterMergerGetTombRecord(SIterMerger *merger) { return merger->iter ? merger->iter->record : NULL; }
579,160✔
741

742
int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) {
17,166,750✔
743
  int32_t      code;
744
  int32_t      c;
745
  SRBTreeNode *node;
746

747
  while (merger->iter && tbid->suid == merger->iter->row->suid && tbid->uid == merger->iter->row->uid) {
34,332,564✔
748
    int32_t code = tsdbIterSkipTableData(merger->iter, tbid);
17,166,889✔
749
    if (code) return code;
17,165,535!
750

751
    if (merger->iter->noMoreData) {
17,165,535✔
752
      merger->iter = NULL;
446,165✔
753
    } else if ((node = tRBTreeMin(merger->iterTree))) {
16,719,370✔
754
      c = merger->iterTree->cmprFn(merger->iter->node, node);
452✔
755
      if (c > 0) {
452!
UNCOV
756
        node = tRBTreePut(merger->iterTree, merger->iter->node);
×
757
        merger->iter = NULL;
129✔
758
      }
759
    }
760

761
    if (!merger->iter && (node = tRBTreeDropMin(merger->iterTree))) {
17,165,816✔
762
      merger->iter = TCONTAINER_OF(node, STsdbIter, node);
525✔
763
    }
764
  }
765

766
  return 0;
17,165,675✔
767
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc