• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.86
/source/dnode/vnode/src/tsdb/tsdbReadUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdbReadUtil.h"
17
#include "tsdb.h"
18
#include "tsdbDataFileRW.h"
19
#include "tsdbFS2.h"
20
#include "tsdbMerge.h"
21
#include "tsdbUtil2.h"
22
#include "tsimplehash.h"
23

24
static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
25
                                            int32_t order);
26

27
/*
 * Allocate the bucketed storage that holds `numOfTables` STableBlockScanInfo
 * entries.
 *
 * Entries are grouped into buckets of pBuf->numPerBucket elements; when
 * numOfTables is not a multiple of the bucket size, one shorter tail bucket
 * holds the rest. Every bucket is zero-initialized (calloc) and its pointer is
 * appended to pBuf->pData, which is created here if it does not exist yet.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL buffer, a
 * non-positive bucket size or a negative table count, or terrno when an
 * allocation/push fails. pBuf->numOfTables is only updated on success.
 */
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  int32_t              numOfFullBuckets = 0;
  int32_t              tailSize = 0;
  int32_t              numOfBuckets = 0;
  STableBlockScanInfo* pBucket = NULL;
  const void*          pPushed = NULL;

  TSDB_CHECK_CONDITION(pBuf && pBuf->numPerBucket > 0, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_CONDITION(numOfTables >= 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  numOfFullBuckets = numOfTables / pBuf->numPerBucket;
  tailSize = numOfTables % pBuf->numPerBucket;
  numOfBuckets = numOfFullBuckets + ((tailSize > 0) ? 1 : 0);

  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(numOfFullBuckets + 1, POINTER_BYTES);
    TSDB_CHECK_NULL(pBuf->pData, code, lino, _end, terrno);
  }

  // full buckets come first; the optional last one only holds the remainder
  for (int32_t i = 0; i < numOfBuckets; ++i) {
    int32_t numOfSlots = (i < numOfFullBuckets) ? pBuf->numPerBucket : tailSize;

    pBucket = taosMemoryCalloc(numOfSlots, sizeof(STableBlockScanInfo));
    TSDB_CHECK_NULL(pBucket, code, lino, _end, terrno);

    pPushed = taosArrayPush(pBuf->pData, &pBucket);
    TSDB_CHECK_NULL(pPushed, code, lino, _end, terrno);

    // the bucket is now referenced by the array; do not free it at _end
    pBucket = NULL;
  }

  pBuf->numOfTables = numOfTables;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // only non-NULL when the bucket was allocated but could not be pushed
  if (pBucket != NULL) {
    taosMemoryFreeClear(pBucket);
  }
  return code;
}
75

76
/*
 * Three-way comparator for two table uids, usable with a qsort-style sort.
 * Both arguments point to uint64_t values; the result is -1, 0 or 1 when the
 * first value is respectively smaller than, equal to, or larger than the
 * second.
 */
int32_t uidComparFunc(const void* p1, const void* p2) {
  const uint64_t lhs = *(const uint64_t*)p1;
  const uint64_t rhs = *(const uint64_t*)p2;

  if (lhs < rhs) {
    return -1;
  }
  return (lhs > rhs) ? 1 : 0;
}
85

86
/*
 * Grow the scan-info buffer so that it can hold at least `numOfTables` entries.
 *
 * Nothing happens when the buffer is already large enough. Otherwise a
 * partially filled tail bucket (if any) is popped and released, and
 * pBuf->numOfTables is rolled back to the last full-bucket boundary. New
 * zero-initialized buckets are then appended: full ones first, followed by an
 * optional shorter tail bucket.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA on bad arguments or an
 * inconsistent buffer, or terrno when an allocation/push fails.
 */
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  int32_t              oldTail = 0;
  int32_t              numOfMissing = 0;
  int32_t              numOfFullBuckets = 0;
  int32_t              tailSize = 0;
  int32_t              numOfBuckets = 0;
  STableBlockScanInfo* pBucket = NULL;
  const void*          pElem = NULL;

  TSDB_CHECK_CONDITION(pBuf && pBuf->numPerBucket > 0 && pBuf->numOfTables >= 0, code, lino, _end,
                       TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_CONDITION(numOfTables >= 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  // already big enough
  if (numOfTables <= pBuf->numOfTables) {
    goto _end;
  }

  // drop the trailing, partially filled bucket so that new buckets start on a bucket boundary
  oldTail = pBuf->numOfTables % pBuf->numPerBucket;
  if (oldTail > 0) {
    STableBlockScanInfo* pStale = NULL;

    TSDB_CHECK_CONDITION(taosArrayGetSize(pBuf->pData) > 0, code, lino, _end, TSDB_CODE_INVALID_PARA);
    pElem = taosArrayPop(pBuf->pData);
    TSDB_CHECK_NULL(pElem, code, lino, _end, TSDB_CODE_INVALID_PARA);

    pStale = *(STableBlockScanInfo**)pElem;
    taosMemoryFreeClear(pStale);
    pBuf->numOfTables -= oldTail;
  }

  numOfMissing = numOfTables - pBuf->numOfTables;
  numOfFullBuckets = numOfMissing / pBuf->numPerBucket;
  tailSize = numOfMissing % pBuf->numPerBucket;
  numOfBuckets = numOfFullBuckets + ((tailSize > 0) ? 1 : 0);

  if (pBuf->pData == NULL) {
    pBuf->pData = taosArrayInit(numOfFullBuckets + 1, POINTER_BYTES);
    TSDB_CHECK_NULL(pBuf->pData, code, lino, _end, terrno);
  }

  for (int32_t i = 0; i < numOfBuckets; ++i) {
    int32_t numOfSlots = (i < numOfFullBuckets) ? pBuf->numPerBucket : tailSize;

    pBucket = taosMemoryCalloc(numOfSlots, sizeof(STableBlockScanInfo));
    TSDB_CHECK_NULL(pBucket, code, lino, _end, terrno);

    pElem = taosArrayPush(pBuf->pData, &pBucket);
    TSDB_CHECK_NULL(pElem, code, lino, _end, terrno);

    // the bucket is now referenced by the array; do not free it at _end
    pBucket = NULL;
  }

  pBuf->numOfTables = numOfTables;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // only non-NULL when the bucket was allocated but could not be pushed
  if (pBucket != NULL) {
    taosMemoryFreeClear(pBucket);
  }
  return code;
}
149

150
/*
 * Release the bucket array of a scan-info buffer and reset pBuf->pData to NULL.
 * A NULL buffer or a buffer without a bucket array is left untouched.
 */
void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
  if (pBuf == NULL || pBuf->pData == NULL) {
    return;
  }

  taosArrayDestroyP(pBuf->pData, NULL);
  pBuf->pData = NULL;
}
157

158
/*
 * Look up the address of the `index`-th STableBlockScanInfo slot in the buffer.
 *
 * The slot lives in bucket (index / numPerBucket) at offset
 * (index % numPerBucket). On success *pInfo points at that slot; *pInfo is
 * reset to NULL once the arguments have been validated, so it stays NULL when
 * the bucket lookup fails.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA on bad arguments or an
 * out-of-range index, or terrno when the bucket cannot be fetched.
 */
int32_t getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index, STableBlockScanInfo** pInfo) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  int32_t               bucketNo = 0;
  int32_t               slotNo = 0;
  STableBlockScanInfo** ppBucket = NULL;

  TSDB_CHECK_CONDITION(pBuf && pBuf->numPerBucket > 0, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_CONDITION(index >= 0 && index < pBuf->numOfTables, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);

  *pInfo = NULL;

  bucketNo = index / pBuf->numPerBucket;
  slotNo = index % pBuf->numPerBucket;

  ppBucket = taosArrayGet(pBuf->pData, bucketNo);
  TSDB_CHECK_NULL(ppBucket, code, lino, _end, terrno);

  *pInfo = (*ppBucket) + slotNo;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
182

183
/*
 * Fetch the scan info registered for table `uid` in `pTableMap`.
 *
 * On success *pInfo receives the stored STableBlockScanInfo pointer.
 * `id` is only used to tag the log line emitted when the uid is unknown.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when an argument is NULL
 * or the uid is not present in the map, or TSDB_CODE_INTERNAL_ERROR when the
 * map entry exists but holds a NULL pointer.
 */
int32_t getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, STableBlockScanInfo** pInfo, const char* id) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  STableBlockScanInfo** ppEntry = NULL;

  TSDB_CHECK_NULL(pInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(id, code, lino, _end, TSDB_CODE_INVALID_PARA);

  ppEntry = (STableBlockScanInfo**)tSimpleHashGet(pTableMap, &uid, sizeof(uid));
  if (ppEntry == NULL) {
    int32_t numOfEntries = tSimpleHashGetSize(pTableMap);
    tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, numOfEntries,
              id);
    TSDB_CHECK_NULL(ppEntry, code, lino, _end, TSDB_CODE_INVALID_PARA);
  }

  *pInfo = *ppEntry;
  TSDB_CHECK_NULL(*pInfo, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
207

208
/*
 * Initialize a row key with timestamp `ts` and, when the table has a primary
 * key (numOfPks > 0), an initial value for the first primary-key column.
 *
 * Numeric primary keys start at the smallest value of their type for
 * ascending scans and at the largest value for descending scans. Any other
 * type gets a zero-filled buffer of `len` bytes with nData set to 0; for a
 * descending scan on such a key numOfPKs is additionally overwritten with 2.
 * NOTE(review): the reason for the value 2 is not visible in this function --
 * confirm against the key comparison code.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL key or a
 * numeric type without a case below, or terrno when the buffer allocation
 * fails.
 */
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    numericPk = false;

  TSDB_CHECK_NULL(pKey, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pKey->ts = ts;
  pKey->numOfPKs = numOfPks;

  // no primary key column: the timestamp alone forms the key
  if (numOfPks <= 0) {
    goto _end;
  }

  pKey->pks[0].type = type;
  numericPk = IS_NUMERIC_TYPE(type);

  if (!numericPk) {
    pKey->pks[0].nData = 0;
    pKey->pks[0].pData = taosMemoryCalloc(1, len);
    TSDB_CHECK_NULL(pKey->pks[0].pData, code, lino, _end, terrno);

    if (!asc) {
      pKey->numOfPKs = 2;
    }
    goto _end;
  }

  if (asc) {
    // ascending scan: start from the minimum value of the key type
    switch (type) {
      case TSDB_DATA_TYPE_BIGINT: {
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, INT64_MIN);
        break;
      }
      case TSDB_DATA_TYPE_INT: {
        int32_t lowest = INT32_MIN;
        valueSetDatum(pKey->pks, type, &lowest, tDataTypes[type].bytes);
        break;
      }
      case TSDB_DATA_TYPE_SMALLINT: {
        int16_t lowest = INT16_MIN;
        valueSetDatum(pKey->pks, type, &lowest, tDataTypes[type].bytes);
        break;
      }
      case TSDB_DATA_TYPE_TINYINT: {
        int8_t lowest = INT8_MIN;
        valueSetDatum(pKey->pks, type, &lowest, tDataTypes[type].bytes);
        break;
      }
      case TSDB_DATA_TYPE_UTINYINT:
      case TSDB_DATA_TYPE_USMALLINT:
      case TSDB_DATA_TYPE_UINT:
      case TSDB_DATA_TYPE_UBIGINT: {
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, 0);
        break;
      }
      default:
        code = TSDB_CODE_INVALID_PARA;
        TSDB_CHECK_CODE(code, lino, _end);
    }
  } else {
    // descending scan: start from the maximum value of the key type
    switch (type) {
      case TSDB_DATA_TYPE_BIGINT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, INT64_MAX);
        break;
      case TSDB_DATA_TYPE_INT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, INT32_MAX);
        break;
      case TSDB_DATA_TYPE_SMALLINT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, INT16_MAX);
        break;
      case TSDB_DATA_TYPE_TINYINT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, INT8_MAX);
        break;
      case TSDB_DATA_TYPE_UBIGINT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, UINT64_MAX);
        break;
      case TSDB_DATA_TYPE_UINT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, UINT32_MAX);
        break;
      case TSDB_DATA_TYPE_USMALLINT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, UINT16_MAX);
        break;
      case TSDB_DATA_TYPE_UTINYINT:
        VALUE_SET_TRIVIAL_DATUM(pKey->pks, UINT8_MAX);
        break;
      default:
        code = TSDB_CODE_INVALID_PARA;
        TSDB_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
301

302
/*
 * Free the buffer owned by the first primary-key value of a row key.
 * Only keys that have at least one primary key whose type is a
 * variable-length data type own such a buffer; every other key (and NULL) is
 * ignored.
 */
void clearRowKey(SRowKey* pKey) {
  if (pKey != NULL && pKey->numOfPKs != 0 && (IS_VAR_DATA_TYPE(pKey->pks[0].type))) {
    taosMemoryFreeClear(pKey->pks[0].pData);
  }
}
308

309
/*
 * Initialize the four row keys of a table scan info from the reader's query
 * window and scan order.
 *
 *  - lastProcKey:            one tick before the window start (ascending) or
 *                            one tick after the window end (descending); the
 *                            boundary itself is used when stepping would
 *                            overflow int64.
 *  - sttKeyInfo.nextProcKey: the window start (ascending) or end (descending).
 *  - sttRange.skey/ekey:     INT64_MAX / INT64_MIN respectively.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, or
 * the first error reported by initRowKey.
 */
static int32_t initLastProcKey(STableBlockScanInfo* pScanInfo, const STsdbReader* pReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfPks = 0;
  int32_t pkType = 0;
  int32_t pkBytes = 0;
  bool    asc = false;
  int64_t boundaryTs = 0;
  int64_t lastProcTs = 0;

  TSDB_CHECK_NULL(pScanInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  numOfPks = pReader->suppInfo.numOfPks;
  pkType = pReader->suppInfo.pk.type;
  pkBytes = pReader->suppInfo.pk.bytes;
  asc = ASCENDING_TRAVERSE(pReader->info.order);

  if (asc) {
    boundaryTs = pReader->info.window.skey;
    lastProcTs = (boundaryTs > INT64_MIN) ? (boundaryTs - 1) : boundaryTs;
  } else {
    boundaryTs = pReader->info.window.ekey;
    lastProcTs = (boundaryTs < INT64_MAX) ? (boundaryTs + 1) : boundaryTs;
  }

  code = initRowKey(&pScanInfo->lastProcKey, lastProcTs, numOfPks, pkType, pkBytes, asc);
  TSDB_CHECK_CODE(code, lino, _end);

  code = initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, boundaryTs, numOfPks, pkType, pkBytes, asc);
  TSDB_CHECK_CODE(code, lino, _end);

  code = initRowKey(&pScanInfo->sttRange.skey, INT64_MAX, numOfPks, pkType, pkBytes, asc);
  TSDB_CHECK_CODE(code, lino, _end);

  code = initRowKey(&pScanInfo->sttRange.ekey, INT64_MIN, numOfPks, pkType, pkBytes, asc);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
359

360
int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap,
11,805,464✔
361
                               STsdbReader* pReader) {
362
  int32_t code = TSDB_CODE_SUCCESS;
11,805,464✔
363
  int32_t lino = 0;
11,805,464✔
364

365
  TSDB_CHECK_NULL(pScanInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
11,805,464!
366
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
11,805,464!
367

368
  pScanInfo->uid = uid;
11,805,464✔
369
  INIT_KEYRANGE(&pScanInfo->sttRange);
11,805,464✔
370
  INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
11,805,464✔
371

372
  pScanInfo->cleanSttBlocks = false;
11,805,464✔
373
  pScanInfo->sttBlockReturned = false;
11,805,464✔
374

375
  code = initLastProcKey(pScanInfo, pReader);
11,805,464✔
376
  TSDB_CHECK_CODE(code, lino, _end);
11,787,400!
377

378
  pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
11,787,400✔
379
  code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
11,787,400✔
380
  TSDB_CHECK_CODE(code, lino, _end);
11,816,148!
381

382
  tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pReader, pScanInfo->uid,
11,816,148✔
383
            pScanInfo->lastProcKey.ts, pReader->idStr);
384

385
_end:
11,734,057✔
386
  if (code != TSDB_CODE_SUCCESS) {
11,816,160!
387
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
388
  }
389
  return code;
11,816,829✔
390
}
391

392
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
393
int32_t createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
4,618,090✔
394
                                STableUidList* pUidList, int32_t numOfTables, SSHashObj** pHashObj) {
395
  int32_t    code = TSDB_CODE_SUCCESS;
4,618,090✔
396
  int32_t    lino = 0;
4,618,090✔
397
  SSHashObj* pTableMap = NULL;
4,618,090✔
398
  int64_t    st = 0;
4,618,090✔
399

400
  TSDB_CHECK_NULL(pUidList, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,618,090!
401
  TSDB_CHECK_NULL(pHashObj, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,618,090!
402

403
  *pHashObj = NULL;
4,618,090✔
404

405
  // allocate buffer in order to load data blocks from file
406
  // todo use simple hash instead, optimize the memory consumption
407
  pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
4,618,090✔
408
  TSDB_CHECK_NULL(pTableMap, code, lino, _end, terrno);
4,629,373!
409

410
  st = taosGetTimestampUs();
4,628,159✔
411
  code = initBlockScanInfoBuf(pBuf, numOfTables);
4,628,159✔
412
  if (code != TSDB_CODE_SUCCESS) {
4,624,561!
413
    tSimpleHashCleanup(pTableMap);
×
414
    TSDB_CHECK_CODE(code, lino, _end);
×
415
  }
416

417
  pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
4,624,561!
418
  if (pUidList->tableUidList == NULL) {
4,621,046!
419
    tSimpleHashCleanup(pTableMap);
×
420
    TSDB_CHECK_NULL(pUidList->tableUidList, code, lino, _end, terrno);
×
421
  }
422

423
  pUidList->currentIndex = 0;
4,621,046✔
424

425
  for (int32_t j = 0; j < numOfTables; ++j) {
16,417,623✔
426
    pUidList->tableUidList[j] = idList[j].uid;
11,789,151✔
427

428
    STableBlockScanInfo* pScanInfo = NULL;
11,789,151✔
429
    code = getPosInBlockInfoBuf(pBuf, j, &pScanInfo);
11,789,151✔
430
    if (code != TSDB_CODE_SUCCESS) {
11,783,837!
431
      lino = __LINE__;
×
432
      break;
×
433
    }
434

435
    code = initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader);
11,783,837✔
436
    if (code != TSDB_CODE_SUCCESS) {
11,796,577!
437
      lino = __LINE__;
×
438
      break;
×
439
    }
440
  }
441

442
  taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
4,628,472✔
443

444
  pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
4,625,635✔
445
  tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
4,625,635✔
446
            (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
447
            pTsdbReader->idStr);
448

449
  *pHashObj = pTableMap;
4,625,024✔
450

451
_end:
4,625,024✔
452
  if (code != TSDB_CODE_SUCCESS) {
4,625,024!
453
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
454
  }
455
  return code;
4,625,717✔
456
}
457

458
/*
 * Rewind every table scan info in `pTableMap` so that scanning can restart
 * from timestamp `ts`.
 *
 * For each non-NULL entry the two table-data iterators are destroyed and
 * marked as holding no value, the delete skyline is released, lastProcKey.ts
 * is set to `ts` and sttKeyInfo.nextProcKey.ts to `ts + step`.
 */
void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
  int32_t               iter = 0;
  STableBlockScanInfo** ppEntry = tSimpleHashIterate(pTableMap, NULL, &iter);

  for (; ppEntry != NULL; ppEntry = tSimpleHashIterate(pTableMap, ppEntry, &iter)) {
    STableBlockScanInfo* pScanInfo = *ppEntry;
    if (pScanInfo == NULL) {
      continue;
    }

    pScanInfo->iterInit = false;

    pScanInfo->iter.hasVal = false;
    if (pScanInfo->iter.iter != NULL) {
      pScanInfo->iter.iter = tsdbTbDataIterDestroy(pScanInfo->iter.iter);
    }

    pScanInfo->iiter.hasVal = false;
    if (pScanInfo->iiter.iter != NULL) {
      pScanInfo->iiter.iter = tsdbTbDataIterDestroy(pScanInfo->iiter.iter);
    }

    taosArrayDestroy(pScanInfo->delSkyline);
    pScanInfo->delSkyline = NULL;

    pScanInfo->lastProcKey.ts = ts;
    // todo check the nextProcKey info
    pScanInfo->sttKeyInfo.nextProcKey.ts = ts + step;
  }
}
36,582✔
487

488
/*
 * Release everything owned by one table scan info without freeing the struct
 * itself: both table-data iterators, the delete skyline, the block list, the
 * block index list, the in-memory and on-file delete data, and the buffers of
 * the four row keys. Iterator flags are cleared and the stt reader status is
 * reset to STT_FILE_READER_UNINIT. A NULL argument is ignored.
 */
void clearBlockScanInfo(STableBlockScanInfo* p) {
  if (p == NULL) {
    return;
  }

  p->iterInit = false;
  p->sttKeyInfo.status = STT_FILE_READER_UNINIT;

  p->iter.hasVal = false;
  p->iiter.hasVal = false;

  if (p->iter.iter != NULL) {
    p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
  }
  if (p->iiter.iter != NULL) {
    p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
  }

  taosArrayDestroy(p->delSkyline);
  p->delSkyline = NULL;

  taosArrayDestroy(p->pBlockList);
  p->pBlockList = NULL;

  taosArrayDestroy(p->pBlockIdxList);
  p->pBlockIdxList = NULL;

  taosArrayDestroy(p->pMemDelData);
  p->pMemDelData = NULL;

  taosArrayDestroy(p->pFileDelData);
  p->pFileDelData = NULL;

  clearRowKey(&p->lastProcKey);
  clearRowKey(&p->sttRange.skey);
  clearRowKey(&p->sttRange.ekey);
  clearRowKey(&p->sttKeyInfo.nextProcKey);
}
522

523
/*
 * Clear every scan info stored in *pTableMap, destroy the map itself and set
 * *pTableMap to NULL. Does nothing when the handle or the map is NULL.
 */
void destroyAllBlockScanInfo(SSHashObj** pTableMap) {
  int32_t               iter = 0;
  STableBlockScanInfo** ppEntry = NULL;

  if (pTableMap == NULL || *pTableMap == NULL) {
    return;
  }

  for (ppEntry = tSimpleHashIterate(*pTableMap, NULL, &iter); ppEntry != NULL;
       ppEntry = tSimpleHashIterate(*pTableMap, ppEntry, &iter)) {
    clearBlockScanInfo(*ppEntry);
  }

  tSimpleHashCleanup(*pTableMap);
  *pTableMap = NULL;
}
538

539
/*
 * Reset the per-file-set state of one table scan info before a new file set
 * is processed: the block list, block index list and file delete data are
 * emptied (the arrays themselves are kept), and the stt bookkeeping (flags,
 * row counter, key range, file-set window, reader status) is put back to its
 * initial state. A NULL argument is ignored.
 */
static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
  if (pScanInfo == NULL) {
    return;
  }

  // reset the index in last block when handling a new file
  taosArrayClear(pScanInfo->pBlockList);
  taosArrayClear(pScanInfo->pBlockIdxList);
  // del data from each file set
  taosArrayClear(pScanInfo->pFileDelData);

  pScanInfo->numOfRowsInStt = 0;
  pScanInfo->cleanSttBlocks = false;
  pScanInfo->sttBlockReturned = false;
  pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;

  INIT_KEYRANGE(&pScanInfo->sttRange);
  INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
}
554

555
/*
 * Apply doCleanupInfoForNextFileset to every table scan info stored in
 * `pTableMap`.
 */
void cleanupInfoForNextFileset(SSHashObj* pTableMap) {
  int32_t               iter = 0;
  STableBlockScanInfo** ppEntry = tSimpleHashIterate(pTableMap, NULL, &iter);

  for (; ppEntry != NULL; ppEntry = tSimpleHashIterate(pTableMap, ppEntry, &iter)) {
    doCleanupInfoForNextFileset(*ppEntry);
  }
}
5,092,420✔
563

564
// brin records iterator
565
/*
 * Set up a brin record iterator over the brin blocks in `pList`, read through
 * `pReader`. The cached block and record are zeroed and both cursors are
 * placed before the first element (-1). A NULL iterator is ignored.
 */
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) {
  if (pIter == NULL) {
    return;
  }

  pIter->pReader = pReader;
  pIter->pBrinBlockList = pList;

  pIter->blockIndex = -1;
  pIter->recordIndex = -1;

  (void)memset(&pIter->block, 0, sizeof(SBrinBlock));
  (void)memset(&pIter->record, 0, sizeof(SBrinRecord));
}
578

579
/*
 * Advance the iterator and return the next brin record.
 *
 * When the current brin block is exhausted (or none has been loaded yet) the
 * next block of the list is read from the data file first. *pRecord is NULL
 * with TSDB_CODE_SUCCESS once all blocks have been consumed; otherwise it
 * points at the record cached inside the iterator, which is overwritten by
 * the next call.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments or a
 * missing file reader, terrno when the block entry cannot be fetched, or the
 * error of the block read / record decode. Note that *pRecord is assigned
 * before the decode result is checked, so it is non-NULL when the decode
 * fails.
 */
int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    needNextBlock = false;

  TSDB_CHECK_NULL(pIter, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pRecord, code, lino, _end, TSDB_CODE_INVALID_PARA);

  *pRecord = NULL;

  needNextBlock = (pIter->blockIndex == -1) || ((pIter->recordIndex + 1) >= pIter->block.numOfRecords);
  if (needNextBlock) {
    pIter->blockIndex += 1;
    if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) {
      // no more brin blocks: report the end of iteration with *pRecord == NULL
      goto _end;
    }

    pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
    TSDB_CHECK_NULL(pIter->pCurrentBlk, code, lino, _end, terrno);

    tBrinBlockClear(&pIter->block);
    TSDB_CHECK_NULL(pIter->pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

    code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code));
      TSDB_CHECK_CODE(code, lino, _end);
    }

    // restart in front of the first record of the freshly loaded block
    pIter->recordIndex = -1;
  }

  pIter->recordIndex += 1;
  code = tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record);
  *pRecord = &pIter->record;
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
619

620
/*
 * Destroy the brin block cached inside the iterator. A NULL iterator is
 * ignored.
 */
void clearBrinBlockIter(SBrinRecordIter* pIter) {
  if (pIter == NULL) {
    return;
  }

  tBrinBlockDestroy(&pIter->block);
}
5,091,969✔
625

626
// initialize the file block access order
627
//  sort the file blocks according to the offset of each data block in the files
628
/*
 * Free all arrays owned by a block order supporter: the per-table block
 * counters, the per-table cursors and, for each of the first
 * pSup->numOfTables tables, the block wrapper array plus the array of those
 * pointers itself. The supporter struct is not freed. NULL is ignored.
 */
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
  if (pSup == NULL) {
    return;
  }

  taosMemoryFreeClear(pSup->numOfBlocksPerTable);
  taosMemoryFreeClear(pSup->indexPerTable);

  if (pSup->pDataBlockInfo == NULL) {
    return;
  }

  for (int32_t tableNo = 0; tableNo < pSup->numOfTables; ++tableNo) {
    taosMemoryFreeClear(pSup->pDataBlockInfo[tableNo]);
  }
  taosMemoryFreeClear(pSup->pDataBlockInfo);
}
645

646
/*
 * Allocate the zero-initialized per-table arrays of a block order supporter
 * for up to `numOfTables` tables and set pSup->numOfTables to 0.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL supporter or a
 * negative table count, or terrno when an allocation fails. Arrays allocated
 * before a failure are left in place and are not released by this function.
 */
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  TSDB_CHECK_NULL(pSup, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_CONDITION(numOfTables >= 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  // one block wrapper array pointer per table
  pSup->pDataBlockInfo = taosMemoryCalloc(numOfTables, POINTER_BYTES);
  TSDB_CHECK_NULL(pSup->pDataBlockInfo, code, lino, _end, terrno);

  // one cursor per table
  pSup->indexPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  TSDB_CHECK_NULL(pSup->indexPerTable, code, lino, _end, terrno);

  // one block counter per table
  pSup->numOfBlocksPerTable = taosMemoryCalloc(numOfTables, sizeof(int32_t));
  TSDB_CHECK_NULL(pSup->numOfBlocksPerTable, code, lino, _end, terrno);

  pSup->numOfTables = 0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
667

668
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
2,607,286✔
669
  int32_t                     leftIndex = 0;
2,607,286✔
670
  int32_t                     rightIndex = 0;
2,607,286✔
671
  int32_t                     leftTableBlockIndex = 0;
2,607,286✔
672
  int32_t                     rightTableBlockIndex = 0;
2,607,286✔
673
  const SBlockOrderSupporter* pSupporter = NULL;
2,607,286✔
674
  const SBlockOrderWrapper*   pLeftBlock = NULL;
2,607,286✔
675
  const SBlockOrderWrapper*   pRightBlock = NULL;
2,607,286✔
676

677
  leftIndex = *(const int32_t*)pLeft;
2,607,286✔
678
  rightIndex = *(const int32_t*)pRight;
2,607,286✔
679
  pSupporter = (const SBlockOrderSupporter*)param;
2,607,286✔
680

681
  leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
2,607,286✔
682
  rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
2,607,286✔
683

684
  if (leftTableBlockIndex >= pSupporter->numOfBlocksPerTable[leftIndex]) {
2,607,286✔
685
    /* left block is empty */
686
    return 1;
683,884✔
687
  } else if (rightTableBlockIndex >= pSupporter->numOfBlocksPerTable[rightIndex]) {
1,923,402✔
688
    /* right block is empty */
689
    return -1;
232,391✔
690
  }
691

692
  pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
1,691,011✔
693
  pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
1,691,011✔
694

695
  return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
1,691,011✔
696
}
697

698
/*
 * Copy the block description held in a brin record into a file data block
 * info.
 *
 * All scalar fields are copied as-is. When the record carries a primary key,
 * the first/last primary-key values are copied too: numeric keys by value,
 * other keys as newly allocated buffers consisting of a VARSTR header followed
 * by the key bytes. Those buffers are owned by *pBlockInfo afterwards.
 *
 * Both key buffers are allocated before either one is stored in *pBlockInfo,
 * and the first one is released if the second allocation fails. A failed call
 * therefore neither leaks memory nor leaves a half-initialized key pair
 * behind; firstPk/lastPk are untouched in that case.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments or
 * when the first and last key disagree on the number of primary keys, or
 * terrno when a buffer allocation fails.
 */
int32_t recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  const SRowKey* pFirstKey = NULL;
  const SRowKey* pLastKey = NULL;

  TSDB_CHECK_NULL(pBlockInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(record, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pBlockInfo->uid = record->uid;
  pBlockInfo->firstKey = record->firstKey.key.ts;
  pBlockInfo->lastKey = record->lastKey.key.ts;
  pBlockInfo->minVer = record->minVer;
  pBlockInfo->maxVer = record->maxVer;
  pBlockInfo->blockOffset = record->blockOffset;
  pBlockInfo->smaOffset = record->smaOffset;
  pBlockInfo->blockSize = record->blockSize;
  pBlockInfo->blockKeySize = record->blockKeySize;
  pBlockInfo->smaSize = record->smaSize;
  pBlockInfo->numRow = record->numRow;
  pBlockInfo->count = record->count;

  pFirstKey = &record->firstKey.key;
  pLastKey = &record->lastKey.key;
  TSDB_CHECK_CONDITION((pFirstKey->numOfPKs == pLastKey->numOfPKs), code, lino, _end, TSDB_CODE_INVALID_PARA);
  if (pFirstKey->numOfPKs > 0) {
    if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) {
      pBlockInfo->firstPk.val = VALUE_GET_TRIVIAL_DATUM(pFirstKey->pks);
      pBlockInfo->lastPk.val = VALUE_GET_TRIVIAL_DATUM(pLastKey->pks);
    } else {
      int32_t firstLen = pFirstKey->pks[0].nData;
      int32_t lastLen = pLastKey->pks[0].nData;

      char* pFirst = taosMemoryMalloc(firstLen + VARSTR_HEADER_SIZE);
      TSDB_CHECK_NULL(pFirst, code, lino, _end, terrno);

      char* pLast = taosMemoryMalloc(lastLen + VARSTR_HEADER_SIZE);
      if (pLast == NULL) {
        // capture the error before freeing, then release the first buffer so it is not leaked
        code = terrno;
        lino = __LINE__;
        taosMemoryFreeClear(pFirst);
        goto _end;
      }

      memcpy(varDataVal(pFirst), pFirstKey->pks[0].pData, firstLen);
      varDataSetLen(pFirst, firstLen);
      pBlockInfo->firstPk.pData = (uint8_t*)pFirst;

      memcpy(varDataVal(pLast), pLastKey->pks[0].pData, lastLen);
      varDataSetLen(pLast, lastLen);
      pBlockInfo->lastPk.pData = (uint8_t*)pLast;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
750

751
static void freePkItem(void* pItem) {
×
752
  SFileDataBlockInfo* p = pItem;
×
753
  if (p != NULL) {
×
754
    taosMemoryFreeClear(p->firstPk.pData);
×
755
    taosMemoryFreeClear(p->lastPk.pData);
×
756
  }
757
}
×
758

759
/*
 * Reset a block iterator to its "no blocks" state while keeping the block list array alive.
 *
 * @param pIter    iterator to reset; NULL is accepted and ignored
 * @param needFree when true, every list entry is passed to freePkItem() before being dropped
 */
void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree) {
  if (pIter != NULL) {
    pIter->numOfBlocks = 0;
    pIter->index = -1;

    if (!needFree) {
      taosArrayClear(pIter->blockList);
    } else {
      taosArrayClearEx(pIter->blockList, freePkItem);
    }
  }
}
773

774
/*
 * Tear down a block iterator: reset its cursor, destroy the block list array and leave
 * pIter->blockList set to NULL.
 *
 * @param pIter    iterator to clean up; NULL is accepted and ignored
 * @param needFree when true, every list entry is passed to freePkItem() during destruction
 */
void cleanupDataBlockIterator(SDataBlockIter* pIter, bool needFree) {
  if (pIter != NULL) {
    pIter->numOfBlocks = 0;
    pIter->index = -1;

    if (!needFree) {
      taosArrayDestroy(pIter->blockList);
    } else {
      taosArrayDestroyEx(pIter->blockList, freePkItem);
    }

    pIter->blockList = NULL;
  }
}
788

789
/*
 * Build the global block access order for one file set.
 *
 * The per-table block lists found in pTableList are moved into pBlockIter->blockList. With a
 * single table the list is appended as-is; with several tables the blocks are interleaved by
 * a multiway merge driven by fileDataBlockOrderCompar. Each table's pBlockIdxList receives the
 * global index of every block it contributed, and each table's pBlockList is destroyed once
 * its blocks have been transferred.
 *
 * @param pReader     reader providing the scan order and the log id string
 * @param pBlockIter  iterator to (re)initialise
 * @param numOfBlocks number of blocks the caller expects in total, must be >= 0
 * @param pTableList  array of STableBlockScanInfo pointers
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step
 */
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  bool                      ascending = false;
  int32_t                   tableNum = 0;
  int32_t                   blockCnt = 0;
  int64_t                   beginUs = 0;
  int64_t                   endUs = 0;
  SBlockOrderSupporter      sup = {0};
  SMultiwayMergeTreeInfo*   pTree = NULL;
  STableBlockScanInfo*      pScanInfo = NULL;
  const SFileDataBlockInfo* pBlock = NULL;
  const void*               pRes = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pBlockIter, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_CONDITION(numOfBlocks >= 0, code, lino, _end, TSDB_CODE_INVALID_PARA);

  ascending = ASCENDING_TRAVERSE(pReader->info.order);
  clearDataBlockIterator(pBlockIter, shouldFreePkBuf(&pReader->suppInfo));
  pBlockIter->numOfBlocks = numOfBlocks;

  // access data blocks according to the offset of each block in asc/desc order.
  tableNum = taosArrayGetSize(pTableList);

  beginUs = taosGetTimestampUs();
  code = initBlockOrderSupporter(&sup, tableNum);
  TSDB_CHECK_CODE(code, lino, _end);

  // collect one wrapper (uid, file offset, owning table) per block of every table
  for (int32_t t = 0; t < tableNum; ++t) {
    pScanInfo = taosArrayGetP(pTableList, t);

    size_t              nBlocks = taosArrayGetSize(pScanInfo->pBlockList);
    SBlockOrderWrapper* pWrappers = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * nBlocks);
    TSDB_CHECK_NULL(pWrappers, code, lino, _end, terrno);

    // hand the buffer to the supporter right away so that cleanupBlockOrderSupporter() sees it
    sup.pDataBlockInfo[sup.numOfTables] = pWrappers;
    sup.numOfBlocksPerTable[sup.numOfTables] = nBlocks;
    sup.numOfTables += 1;

    for (int32_t b = 0; b < nBlocks; ++b) {
      pBlock = taosArrayGet(pScanInfo->pBlockList, b);
      TSDB_CHECK_NULL(pBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);

      SBlockOrderWrapper wrapper = {.uid = pScanInfo->uid, .offset = pBlock->blockOffset, .pInfo = pScanInfo};
      sup.pDataBlockInfo[t][b] = wrapper;
      blockCnt += 1;
    }
  }

  TSDB_CHECK_CONDITION((numOfBlocks == blockCnt) || (sup.numOfTables == tableNum), code, lino, _end,
                       TSDB_CODE_INVALID_PARA);

  if (sup.numOfTables == 1) {
    // since there is only one table qualified, blocks are not sorted
    pScanInfo = taosArrayGetP(pTableList, 0);
    for (int32_t g = 0; g < numOfBlocks; ++g) {
      STableDataBlockIdx blockIdx = {.globalIndex = g};
      pRes = taosArrayPush(pScanInfo->pBlockIdxList, &blockIdx);
      TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);
    }

    pRes = taosArrayAddAll(pBlockIter->blockList, pScanInfo->pBlockList);
    TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);

    taosArrayDestroy(pScanInfo->pBlockList);
    pScanInfo->pBlockList = NULL;

    endUs = taosGetTimestampUs();
    tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
              pReader, numOfBlocks, (endUs - beginUs) / 1000.0, pReader->idStr);

    pBlockIter->index = ascending ? 0 : (numOfBlocks - 1);
  } else {
    tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, blockCnt,
              sup.numOfTables, pReader->idStr);

    code = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
    TSDB_CHECK_CODE(code, lino, _end);

    for (int32_t g = 0; g < blockCnt; ++g) {
      int32_t tablePos = tMergeTreeGetChosenIndex(pTree);
      int32_t blockPos = sup.indexPerTable[tablePos]++;
      pScanInfo = sup.pDataBlockInfo[tablePos][blockPos].pInfo;

      pBlock = taosArrayGet(pScanInfo->pBlockList, blockPos);
      TSDB_CHECK_NULL(pBlock, code, lino, _end, terrno);

      pRes = taosArrayPush(pBlockIter->blockList, pBlock);
      TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);

      STableDataBlockIdx blockIdx = {.globalIndex = g};
      pRes = taosArrayPush(pScanInfo->pBlockIdxList, &blockIdx);
      TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);

      // set data block index overflow, in order to disable the offset comparator
      if (sup.indexPerTable[tablePos] >= sup.numOfBlocksPerTable[tablePos]) {
        sup.indexPerTable[tablePos] = sup.numOfBlocksPerTable[tablePos] + 1;
      }

      code = tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
      TSDB_CHECK_CODE(code, lino, _end);
    }

    // all blocks have been copied into the iterator; drop the per-table lists
    for (int32_t t = 0; t < tableNum; ++t) {
      pScanInfo = taosArrayGetP(pTableList, t);
      taosArrayDestroy(pScanInfo->pBlockList);
      pScanInfo->pBlockList = NULL;
    }

    endUs = taosGetTimestampUs();
    tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
              (endUs - beginUs) / 1000.0, pReader->idStr);

    pBlockIter->index = ascending ? 0 : (numOfBlocks - 1);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  cleanupBlockOrderSupporter(&sup);
  if (pTree != NULL) {
    tMergeTreeDestroy(&pTree);
  }
  return code;
}
916

917
/*
 * Advance the iterator by one block in its traversal direction.
 *
 * @return true when the cursor moved; false when pBlockIter is NULL or the cursor already sits
 *         on the last block of the traversal (index >= numOfBlocks - 1 ascending, index <= 0
 *         descending)
 */
bool blockIteratorNext(SDataBlockIter* pBlockIter) {
  if (pBlockIter == NULL) {
    return false;
  }

  if (ASCENDING_TRAVERSE(pBlockIter->order)) {
    if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
      return false;
    }
    pBlockIter->index += 1;
  } else {
    if (pBlockIter->index <= 0) {
      return false;
    }
    pBlockIter->index -= 1;
  }

  return true;
}
931

932
typedef enum {
933
  BLK_CHECK_CONTINUE = 0x1,
934
  BLK_CHECK_QUIT = 0x2,
935
} ETombBlkCheckEnum;
936

937
static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
938
                                       const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i,
939
                                       int32_t* j);
940
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
120,475✔
941
                                ETombBlkCheckEnum* pRet) {
942
  int32_t              code = TSDB_CODE_SUCCESS;
120,475✔
943
  int32_t              lino = 0;
120,475✔
944
  STombRecord          record = {0};
120,475✔
945
  uint64_t             uid = 0;
120,475✔
946
  STableBlockScanInfo* pScanInfo = NULL;
120,475✔
947

948
  TSDB_CHECK_NULL(pBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
120,475!
949
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
120,475!
950
  TSDB_CHECK_NULL(pRet, code, lino, _end, TSDB_CODE_INVALID_PARA);
120,475!
951

952
  *pRet = BLK_CHECK_QUIT;
120,475✔
953
  uid = pReader->status.uidList.tableUidList[*j];
120,475✔
954
  code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
120,475✔
955
  TSDB_CHECK_CODE(code, lino, _end);
120,473!
956

957
  if (pScanInfo->pFileDelData == NULL) {
120,473✔
958
    pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
49,989✔
959
    TSDB_CHECK_NULL(pScanInfo->pFileDelData, code, lino, _end, terrno);
49,989!
960
  }
961

962
  for (int32_t k = 0; k < pBlock->numOfRecords; ++k) {
52,563,166✔
963
    code = tTombBlockGet(pBlock, k, &record);
52,536,951✔
964
    TSDB_CHECK_CODE(code, lino, _end);
52,530,424!
965

966
    if (record.suid < pReader->info.suid) {
52,530,424✔
967
      continue;
919,934✔
968
    }
969

970
    if (record.suid > pReader->info.suid) {
51,610,490✔
971
      *pRet = BLK_CHECK_QUIT;
3,798✔
972
      goto _end;
3,798✔
973
    }
974

975
    if (uid < record.uid) {
51,606,692✔
976
      while ((*j) < numOfTables && pReader->status.uidList.tableUidList[*j] < record.uid) {
230,912✔
977
        (*j) += 1;
116,350✔
978
      }
979

980
      if ((*j) >= numOfTables) {
114,562✔
981
        *pRet = BLK_CHECK_QUIT;
91,686✔
982
        goto _end;
91,686✔
983
      }
984

985
      uid = pReader->status.uidList.tableUidList[*j];
22,876✔
986
      code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr);
22,876✔
987
      TSDB_CHECK_CODE(code, lino, _end);
22,882!
988

989
      if (pScanInfo->pFileDelData == NULL) {
22,882✔
990
        pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
5,760✔
991
        TSDB_CHECK_NULL(pScanInfo->pFileDelData, code, lino, _end, terrno);
5,756!
992
      }
993
    }
994

995
    if (record.uid < uid) {
51,522,735✔
996
      continue;
51,119,938✔
997
    }
998

999
    TSDB_CHECK_CONDITION((record.suid == pReader->info.suid) && (uid == record.uid), code, lino, _end,
402,797!
1000
                         TSDB_CODE_INTERNAL_ERROR);
1001

1002
    if (record.version <= pReader->info.verRange.maxVer) {
402,797!
1003
      SDelData    delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
402,801✔
1004
      const void* px = taosArrayPush(pScanInfo->pFileDelData, &delData);
402,801✔
1005
      TSDB_CHECK_NULL(px, code, lino, _end, terrno);
402,823!
1006
    }
1007
  }
1008

1009
  *pRet = BLK_CHECK_CONTINUE;
26,215✔
1010

1011
_end:
121,699✔
1012
  if (code != TSDB_CODE_SUCCESS) {
121,699!
1013
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1014
  }
1015
  return code;
120,474✔
1016
}
1017

1018
// load tomb data API
1019
/*
 * Load the tomb (delete) records relevant to the reader from a list of tomb block
 * descriptors.
 *
 * Descriptors whose suid/uid range cannot match the reader's super table or uid list are
 * skipped (or end the scan) without reading the block; every other block is read, handed to
 * doCheckTombBlock() and destroyed again.
 *
 * @param pTombBlkArray tomb block descriptors
 * @param pReader       reader owning the uid list and the table map
 * @param pFileReader   data-file reader when isFile is true, stt-file reader otherwise
 * @param isFile        selects tsdbDataFileReadTombBlock() vs tsdbSttFileReadTombBlock()
 * @return TSDB_CODE_SUCCESS or the code of the first failing step
 */
static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader, void* pFileReader,
                                         bool isFile) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  const STableUidList* pList = NULL;
  int32_t              numOfTables = 0;

  TSDB_CHECK_NULL(pTombBlkArray, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pList = &pReader->status.uidList;
  numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);

  int32_t i = 0, j = 0;
  while (i < pTombBlkArray->size && j < numOfTables) {
    STombBlk* pTombBlk = &pTombBlkArray->data[i];

    // whole block belongs to smaller super tables
    if (pTombBlk->maxTbid.suid < pReader->info.suid) {
      i += 1;
      continue;
    }

    // this block starts beyond the queried super table, so stop scanning
    if (pTombBlk->minTbid.suid > pReader->info.suid) {
      break;
    }

    TSDB_CHECK_CONDITION(
        (pTombBlk->minTbid.suid <= pReader->info.suid) && (pTombBlk->maxTbid.suid >= pReader->info.suid), code, lino,
        _end, TSDB_CODE_INTERNAL_ERROR);

    // block ends before the first queried uid
    if (pTombBlk->maxTbid.suid == pReader->info.suid && pTombBlk->maxTbid.uid < pList->tableUidList[0]) {
      i += 1;
      continue;
    }

    // block starts after the last queried uid
    if (pTombBlk->minTbid.suid == pReader->info.suid && pTombBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) {
      break;
    }

    STombBlock block = {0};
    code = isFile ? tsdbDataFileReadTombBlock(pFileReader, pTombBlk, &block)
                  : tsdbSttFileReadTombBlock(pFileReader, pTombBlk, &block);
    if (code != TSDB_CODE_SUCCESS) {
      // Release whatever the reader may have attached to the block before it failed; the block
      // was zero-initialised above, so destroying it here mirrors the success path below.
      tTombBlockDestroy(&block);
      TSDB_CHECK_CODE(code, lino, _end);
    }

    ETombBlkCheckEnum ret = 0;
    code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret);

    // the block is destroyed before the result is examined so that no path leaks it
    tTombBlockDestroy(&block);
    TSDB_CHECK_CODE(code, lino, _end);
    if (ret == BLK_CHECK_QUIT) {
      break;
    }

    i += 1;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1080

1081
/*
 * Load the tomb data of the current file set's tomb file for all tables of the reader.
 * Returns TSDB_CODE_SUCCESS immediately when there is no current file set or it has no tomb
 * file.
 */
int32_t loadDataFileTombDataForAll(STsdbReader* pReader) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  const TTombBlkArray* pTombBlks = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  bool hasTombFile = (pReader->status.pCurrentFileset != NULL) &&
                     (pReader->status.pCurrentFileset->farr[TSDB_FTYPE_TOMB] != NULL);
  if (!hasTombFile) {
    return TSDB_CODE_SUCCESS;
  }

  code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pTombBlks);
  TSDB_CHECK_CODE(code, lino, _end);

  code = doLoadTombDataFromTombBlk(pTombBlks, pReader, pReader->pFileReader, true);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1104

1105
/*
 * Load the tomb data stored in one stt file for all tables of the reader.
 * pLoadInfo is accepted for interface compatibility and is not used by this function.
 */
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  const TTombBlkArray* pTombBlks = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  // fetch the tomb block descriptors of the stt file
  code = tsdbSttFileReadTombBlk(pSttFileReader, &pTombBlks);
  TSDB_CHECK_CODE(code, lino, _end);

  // isFile == false selects the stt-file block reader
  code = doLoadTombDataFromTombBlk(pTombBlks, pReader, pSttFileReader, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1124

1125
/*
 * Append the delete records of the mem and imem table data whose version is <= ver to
 * *ppMemDelData, creating the array on first use.
 *
 * @param ppMemDelData in/out: destination array of SDelData (allocated here when *ppMemDelData is NULL)
 * @param pMemTbData   mem table data, may be NULL; its list is walked under its read latch
 * @param piMemTbData  imem table data, may be NULL; its list is walked without taking a latch
 * @param ver          highest version to collect
 * @return TSDB_CODE_SUCCESS or the error of the failing allocation/push
 */
int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SArray*         pDst = NULL;
  const SDelData* pDel = NULL;
  const void*     pRes = NULL;

  TSDB_CHECK_NULL(ppMemDelData, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (*ppMemDelData == NULL) {
    *ppMemDelData = taosArrayInit(4, sizeof(SDelData));
    TSDB_CHECK_NULL(*ppMemDelData, code, lino, _end, terrno);
  }

  pDst = *ppMemDelData;

  if (pMemTbData != NULL) {
    taosRLockLatch(&pMemTbData->lock);
    for (pDel = pMemTbData->pHead; pDel != NULL; pDel = pDel->pNext) {
      if (pDel->version > ver) {
        continue;
      }

      pRes = taosArrayPush(pDst, pDel);
      if (pRes == NULL) {
        // the latch must be released before jumping to the error exit
        taosRUnLockLatch(&pMemTbData->lock);
        TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);
      }
    }
    taosRUnLockLatch(&pMemTbData->lock);
  }

  if (piMemTbData != NULL) {
    for (pDel = piMemTbData->pHead; pDel != NULL; pDel = pDel->pNext) {
      if (pDel->version > ver) {
        continue;
      }

      pRes = taosArrayPush(pDst, pDel);
      TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1174

1175
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
4,877✔
1176
                               TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
1177
                               int32_t numOfTables, int32_t* pNumOfRows) {
1178
  int32_t           code = TSDB_CODE_SUCCESS;
4,877✔
1179
  int32_t           lino = 0;
4,877✔
1180
  int32_t           num = 0;
4,877✔
1181
  int64_t           st = 0;
4,877✔
1182
  const SStatisBlk* p = NULL;
4,877✔
1183
  STbStatisBlock*   pStatisBlock = NULL;
4,877✔
1184

1185
  TSDB_CHECK_NULL(pSttFileReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,877!
1186
  TSDB_CHECK_NULL(pBlockLoadInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,877!
1187
  TSDB_CHECK_NULL(pStatisBlkArray, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,877!
1188
  TSDB_CHECK_NULL(pNumOfRows, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,877!
1189

1190
  *pNumOfRows = 0;
4,877✔
1191

1192
  if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
4,877!
1193
    return code;
×
1194
  }
1195

1196
  int32_t i = 0;
4,877✔
1197
  while ((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].maxTbid.suid < suid)) {
6,821✔
1198
    ++i;
1,944✔
1199
  }
1200

1201
  if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
4,877✔
1202
    goto _end;
1,944✔
1203
  }
1204

1205
  p = &pStatisBlkArray->data[i];
2,933✔
1206
  pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
2,933!
1207
  TSDB_CHECK_NULL(pStatisBlock, code, lino, _end, terrno);
2,938!
1208

1209
  code = tStatisBlockInit(pStatisBlock);
2,938✔
1210
  TSDB_CHECK_CODE(code, lino, _end);
2,936!
1211

1212
  st = taosGetTimestampUs();
2,939✔
1213
  code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
2,939✔
1214
  TSDB_CHECK_CODE(code, lino, _end);
2,941!
1215

1216
  double el = (taosGetTimestampUs() - st) / 1000.0;
2,941✔
1217
  pBlockLoadInfo->cost.loadStatisBlocks += 1;
2,941✔
1218
  pBlockLoadInfo->cost.statisElapsedTime += el;
2,941✔
1219

1220
  int32_t index = 0;
2,941✔
1221
  while (index < pStatisBlock->numOfRecords && ((int64_t*)pStatisBlock->suids.data)[index] < suid) {
5,857!
1222
    ++index;
2,916✔
1223
  }
1224

1225
  if (index >= pStatisBlock->numOfRecords) {
2,941!
1226
    *pNumOfRows = num;
×
1227
    goto _end;
×
1228
  }
1229

1230
  int32_t j = index;
2,941✔
1231
  int32_t uidIndex = 0;
2,941✔
1232
  while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
7,826✔
1233
    p = &pStatisBlkArray->data[i];
4,887✔
1234
    if (p->minTbid.suid > suid) {
4,887✔
1235
      *pNumOfRows = num;
1✔
1236
      goto _end;
1✔
1237
    }
1238

1239
    uint64_t uid = pUidList[uidIndex];
4,886✔
1240

1241
    if (((int64_t*)pStatisBlock->uids.data)[j] == uid) {
4,886✔
1242
      num += ((int64_t*)pStatisBlock->counts.data)[j];
2,943✔
1243
      uidIndex += 1;
2,943✔
1244
      j += 1;
2,943✔
1245
      code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
2,943✔
1246
      TSDB_CHECK_CODE(code, lino, _end);
2,942!
1247
    } else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) {
1,943!
1248
      j += 1;
×
1249
      code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
×
1250
      TSDB_CHECK_CODE(code, lino, _end);
×
1251
    } else {
1252
      uidIndex += 1;
1,943✔
1253
    }
1254
  }
1255

1256
  *pNumOfRows = num;
2,939✔
1257

1258
_end:
4,884✔
1259
  if (code != TSDB_CODE_SUCCESS) {
4,884!
1260
    tsdbError("%s with %p failed at line %d since %s", __func__, pSttFileReader, lino, tstrerror(code));
×
1261
  }
1262
  if (pStatisBlock) {
4,884✔
1263
    tStatisBlockDestroy(pStatisBlock);
2,940✔
1264
    taosMemoryFreeClear(pStatisBlock);
2,941!
1265
  }
1266
  return code;
4,885✔
1267
}
1268

1269
// load next stt statistics block
1270
static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
2,943✔
1271
                                       const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i,
1272
                                       int32_t* j) {
1273
  int32_t code = TSDB_CODE_SUCCESS;
2,943✔
1274
  int32_t lino = 0;
2,943✔
1275

1276
  if ((*j) >= numOfRows) {
2,943✔
1277
    (*i) += 1;
1,968✔
1278
    (*j) = 0;
1,968✔
1279
    if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
1,968!
1280
      code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
×
1281
      if (code != 0) {
×
1282
        tsdbError("%p failed to read statisBlock, code:%s", pSttFileReader, tstrerror(code));
×
1283
        TSDB_CHECK_CODE(code, lino, _end);
×
1284
      }
1285
    }
1286
  }
1287

1288
_end:
2,943✔
1289
  if (code != TSDB_CODE_SUCCESS) {
2,943!
1290
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1291
  }
1292
  return code;
2,943✔
1293
}
1294

1295
/*
 * Grow or shrink a list of SLDataIter pointers so that it holds exactly numOfFileObj entries.
 *
 * Missing entries are appended as zero-initialised iterators; surplus entries are popped from
 * the tail and destroyed.
 *
 * @param pLDIterList  array of SLDataIter pointers
 * @param numOfFileObj wanted number of entries
 * @return TSDB_CODE_SUCCESS, or the allocation/push error
 */
int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int32_t     size = 0;
  int32_t     inc = 0;
  SLDataIter* pIter = NULL;

  size = taosArrayGetSize(pLDIterList);

  if (size < numOfFileObj) {
    inc = numOfFileObj - size;
    for (int32_t k = 0; k < inc; ++k) {
      pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
      TSDB_CHECK_NULL(pIter, code, lino, _end, terrno);

      void* px = taosArrayPush(pLDIterList, &pIter);
      if (px == NULL) {
        // TSDB_CHECK_NULL jumps to _end, so the iterator that never made it into the list has
        // to be released before the check; the error is captured first so the free cannot
        // disturb it.
        int32_t pushErr = terrno;
        taosMemoryFreeClear(pIter);
        TSDB_CHECK_NULL(px, code, lino, _end, pushErr);
      }
    }
  } else if (size > numOfFileObj) {  // remove unused LDataIter
    inc = size - numOfFileObj;

    for (int32_t i = 0; i < inc; ++i) {
      // NOTE(review): elsewhere in this file pointer arrays are read with taosArrayGetP();
      // confirm that taosArrayPop() yields the stored SLDataIter* rather than the address of
      // the array slot before relying on this branch.
      pIter = taosArrayPop(pLDIterList);
      destroyLDataIter(pIter);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1330

1331
/*
 * Make pSttFileBlockIterArray mirror the stt layout of a file set: one iterator list per stt
 * level, each list sized to the number of file objects of that level.
 *
 * @param pSttFileBlockIterArray array of SArray pointers (one per level), extended as needed
 * @param pFileSet               file set whose lvlArr describes the stt levels
 * @return TSDB_CODE_SUCCESS or the code of the first failing step
 */
int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  levelCnt = 0;
  SSttLvl* pLevel = NULL;
  SArray*  pIterList = NULL;

  TSDB_CHECK_NULL(pFileSet, code, lino, _end, TSDB_CODE_INVALID_PARA);

  levelCnt = pFileSet->lvlArr->size;

  // add the list/iter placeholder
  while (taosArrayGetSize(pSttFileBlockIterArray) < levelCnt) {
    pIterList = taosArrayInit(4, POINTER_BYTES);
    TSDB_CHECK_NULL(pIterList, code, lino, _end, terrno);

    void* pRes = taosArrayPush(pSttFileBlockIterArray, &pIterList);
    if (pRes == NULL) {
      // the new list is not owned by the outer array yet, release it before bailing out
      taosArrayDestroy(pIterList);
      TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);
    }
  }

  // size every level's iterator list to its number of stt file objects
  for (int32_t lvl = 0; lvl < levelCnt; ++lvl) {
    pLevel = pFileSet->lvlArr->data[lvl];
    pIterList = taosArrayGetP(pSttFileBlockIterArray, lvl);
    code = doAdjustValidDataIters(pIterList, TARRAY2_SIZE(pLevel->fobjArr));
    TSDB_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1366

1367
/*
 * Count the rows stored in the stt files of a file set for the tables of pConf->pReader,
 * using the stt statistics blocks.
 *
 * Failures on an individual stt file (open, load-info creation, statistics load) are logged
 * and that file is skipped. The function returns the number of rows accumulated so far, not
 * an error code; a failure before the scan starts yields 0.
 *
 * @param pFileSet               file set to inspect
 * @param pSttFileBlockIterArray per-level iterator lists, adjusted to the file set on entry
 * @param pTsdb                  tsdb handle used to build the stt reader configuration
 * @param pConf                  merge tree configuration (schema, columns, suid, reader)
 * @param pstr                   id string appended to log messages
 */
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
                              const char* pstr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t rowTotal = 0;
  int32_t levelCnt = 0;

  TSDB_CHECK_NULL(pFileSet, code, lino, _end, TSDB_CODE_INVALID_PARA);

  // no data exists, go to end
  levelCnt = pFileSet->lvlArr->size;
  if (levelCnt == 0) {
    goto _end;
  }

  // add the list/iter placeholder
  code = adjustSttDataIters(pSttFileBlockIterArray, pFileSet);
  TSDB_CHECK_CODE(code, lino, _end);

  for (int32_t lvl = 0; lvl < levelCnt; ++lvl) {
    SSttLvl* pLevel = pFileSet->lvlArr->data[lvl];
    SArray*  pIterList = taosArrayGetP(pSttFileBlockIterArray, lvl);

    for (int32_t f = 0; f < taosArrayGetSize(pIterList); ++f) {  // open all last file
      SLDataIter* pIter = taosArrayGetP(pIterList, f);

      // open stt file reader if not opened yet
      // if failed to open this stt file, ignore the error and try next one
      if (pIter->pReader == NULL) {
        SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
        conf.file[0] = *pLevel->fobjArr->data[f]->f;

        const char* pName = pLevel->fobjArr->data[f]->fname;
        code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("open stt file reader error. file:%s, code %s, %s", pName, tstrerror(code), pstr);
          continue;
        }
      }

      if (pIter->pBlockLoadInfo == NULL) {
        code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pIter->pBlockLoadInfo);
        if (code != TSDB_CODE_SUCCESS) {
          tsdbError("failed to create block load info, code: out of memory, %s", pstr);
          continue;
        }
      }

      // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
      TStatisBlkArray* pStatisBlks = NULL;
      code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray**)&pStatisBlks);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
        continue;
      }

      // extract rows from each stt file one-by-one
      STsdbReader* pReader = pConf->pReader;
      int32_t      tableCnt = tSimpleHashGetSize(pReader->status.pTableMap);
      uint64_t*    pUids = pReader->status.uidList.tableUidList;
      int32_t      rowsInFile = 0;
      code = getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlks, pConf->suid, pUids, tableCnt,
                                    &rowsInFile);
      // whatever was counted before a failure is still added to the total
      rowTotal += rowsInFile;
      if (code) {
        tsdbError("%s failed to get rows in stt blocks, code:%s", pstr, tstrerror(code));
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return rowTotal;
}
1443

1444
/*
 * Closed-interval intersection test: true when pLeft is non-NULL and [pLeft->skey, pLeft->ekey]
 * shares at least one key with [minKey, maxKey].
 */
static bool overlapHelper(const STimeWindow* pLeft, TSKEY minKey, TSKEY maxKey) {
  if (pLeft == NULL) {
    return false;
  }

  // the window is disjoint only if it ends before minKey or starts after maxKey
  return !((pLeft->ekey < minKey) || (pLeft->skey > maxKey));
}
1447

1448
/*
 * Decide whether the time window p1 collides with anything else the query has to look at.
 *
 * Returns true when p1 is not fully contained in the query window, or when it intersects the
 * mem data, the imem data, the file-set window (only if the table has file blocks) or the
 * deletion skyline. Returns false for NULL windows, for a NULL pBlockScanInfo once the
 * containment test has passed, and when none of the checks hit.
 */
static bool overlapWithTimeWindow(STimeWindow* p1, STimeWindow* pQueryWindow, STableBlockScanInfo* pBlockScanInfo,
                                  int32_t order) {
  if (p1 == NULL || pQueryWindow == NULL) {
    return false;
  }

  // overlap with query window
  if ((p1->skey < pQueryWindow->skey) || (p1->ekey > pQueryWindow->ekey)) {
    return true;
  }

  if (pBlockScanInfo == NULL) {
    return false;
  }

  // overlap with mem data
  SIterInfo* pMem = &pBlockScanInfo->iter;
  if (pMem->hasVal) {
    STbData* pMemData = pMem->iter->pTbData;
    if (overlapHelper(p1, pMemData->minKey, pMemData->maxKey)) {
      return true;
    }
  }

  // overlap with imem data
  SIterInfo* pIMem = &pBlockScanInfo->iiter;
  if (pIMem->hasVal) {
    STbData* pIMemData = pIMem->iter->pTbData;
    if (overlapHelper(p1, pIMemData->minKey, pIMemData->maxKey)) {
      return true;
    }
  }

  // overlap with data file block
  STimeWindow* pFilesetWin = &pBlockScanInfo->filesetWindow;
  if ((taosArrayGetSize(pBlockScanInfo->pBlockIdxList) > 0) &&
      overlapHelper(p1, pFilesetWin->skey, pFilesetWin->ekey)) {
    return true;
  }

  // overlap with deletion skyline
  SBrinRecord record = {.firstKey = p1->skey, .lastKey = p1->ekey};
  if (overlapWithDelSkylineWithoutVer(pBlockScanInfo, &record, order)) {
    return true;
  }

  return false;
}
1501

1502
static int32_t sortUidComparFn(const void* p1, const void* p2) {
64,934✔
1503
  const SSttKeyRange* px1 = p1;
64,934✔
1504
  const SSttKeyRange* px2 = p2;
64,934✔
1505

1506
  int32_t ret = tRowKeyCompare(&px1->skey, &px2->skey);
64,934✔
1507
  return ret;
65,041✔
1508
}
1509

1510
// Decide whether the stt key ranges in pKeyRangeList can be treated as clean.
//
// Returns true only when all of the following hold:
//   - the list is not empty;
//   - after sorting by start key, no range touches or overlaps its successor
//     in timestamp (previous ekey.ts < next skey.ts);
//   - overlapWithTimeWindow() reports no overlap for any of the ranges.
// Any failed array lookup is treated as "not clean".
//
// NOTE: pKeyRangeList is sorted in place (by skey) as a side effect.
bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order) {
  int32_t numOfRanges = taosArrayGetSize(pKeyRangeList);
  if (numOfRanges == 0) {
    return false;
  }

  // order the ranges by start key so that only neighbours need to be compared
  taosArraySort(pKeyRangeList, sortUidComparFn);

  SSttKeyRange* pHead = taosArrayGet(pKeyRangeList, 0);
  if (pHead == NULL) {
    return false;
  }

  STimeWindow win = {.skey = pHead->skey.ts, .ekey = pHead->ekey.ts};
  if (overlapWithTimeWindow(&win, pQueryWindow, pScanInfo, order)) {
    return false;
  }

  for (int32_t idx = 1; idx < numOfRanges; ++idx) {
    SSttKeyRange* pPrev = taosArrayGet(pKeyRangeList, idx - 1);
    SSttKeyRange* pCurr = taosArrayGet(pKeyRangeList, idx);
    if (pPrev == NULL || pCurr == NULL) {
      return false;
    }

    // neighbouring ranges share at least one timestamp
    if (pPrev->ekey.ts >= pCurr->skey.ts) {
      return false;
    }

    win = (STimeWindow){.skey = pCurr->skey.ts, .ekey = pCurr->ekey.ts};
    if (overlapWithTimeWindow(&win, pQueryWindow, pScanInfo, order)) {
      return false;
    }
  }

  return true;
}
1553

1554
// Walk the delete skyline (pBlockScanInfo->delSkyline) from startIndex and
// report whether a skyline point affects the block described by pRecord.
//
// For every skyline point, in order:
//   - point before the block's first key: if its version is >= pRecord->minVer
//     and the following point is at or after the block's first key, the block
//     overlaps; a qualifying last point is expected to carry version 0,
//     otherwise an error is logged and the scan goes on;
//   - point after the block's last key: no overlap, stop scanning;
//   - point inside [firstKey, lastKey]: overlap when its version is
//     >= pRecord->minVer, otherwise keep scanning.
// A failed array lookup or an exhausted skyline yields false.
static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
                                    int32_t startIndex) {
  size_t total = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; (size_t)i < total; ++i) {
    TSDBKEY* pDel = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (pDel == NULL) {
      return false;
    }

    if (pDel->ts < pRecord->firstKey.key.ts) {  // skyline point precedes the block
      if (pDel->version < pRecord->minVer) {
        continue;
      }

      if ((size_t)i >= total - 1) {  // it must be the last point
        if (pDel->version != 0) {
          tsdbError("unexpected version:%" PRId64, pDel->version);
        }
        continue;
      }

      TSDBKEY* pFollowing = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pFollowing == NULL) {
        return false;
      }

      if (pFollowing->ts >= pRecord->firstKey.key.ts) {
        return true;
      }
      continue;
    }

    if (pDel->ts > pRecord->lastKey.key.ts) {  // skyline point is past the block
      return false;
    }

    // skyline point falls inside [firstKey, lastKey]
    if (pDel->version >= pRecord->minVer) {
      return true;
    }
  }

  return false;
}
1592

1593
// Version-agnostic variant of the skyline scan: walk pBlockScanInfo->delSkyline
// from startIndex and report whether a skyline point affects the key range
// [pRecord->firstKey, pRecord->lastKey], looking at timestamps only.
//
// For every skyline point, in order:
//   - point before the first key: overlap if the following point is at or
//     after the first key; otherwise keep scanning;
//   - point after the last key: no overlap, stop scanning;
//   - point inside the range: overlap.
// A failed array lookup or an exhausted skyline yields false.
static bool doCheckDatablockOverlapWithoutVersion(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord,
                                                  int32_t startIndex) {
  size_t total = taosArrayGetSize(pBlockScanInfo->delSkyline);

  for (int32_t i = startIndex; (size_t)i < total; ++i) {
    TSDBKEY* pDel = taosArrayGet(pBlockScanInfo->delSkyline, i);
    if (pDel == NULL) {
      return false;
    }

    if (pDel->ts < pRecord->firstKey.key.ts) {  // skyline point precedes the range
      if ((size_t)i >= total - 1) {
        continue;  // no following point to inspect
      }

      TSDBKEY* pFollowing = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
      if (pFollowing == NULL) {
        return false;
      }

      if (pFollowing->ts >= pRecord->firstKey.key.ts) {
        return true;
      }
      continue;
    }

    // here the point is at or after the first key: it is inside the range
    // exactly when it does not exceed the last key
    return pDel->ts <= pRecord->lastKey.key.ts;
  }

  return false;
}
1623

1624
// Check whether the block described by pRecord overlaps the delete skyline
// stored in pBlockScanInfo->delSkyline, taking versions into account.
//
// Returns false right away when the skyline is missing or empty, when its
// first/last point cannot be fetched, or when the block's timestamp range lies
// completely outside [first skyline ts, last skyline ts]. Otherwise the detailed
// check is delegated to doCheckDatablockOverlap():
//   - ascending traversal starts at pBlockScanInfo->fileDelIndex;
//   - any other order first backs up from fileDelIndex to a suitable starting
//     point (see the loop below) and scans from there.
// pBlockScanInfo->fileDelIndex itself is not modified here.
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
    return false;
  }

  TSDBKEY* pHead = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pTail = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pHead == NULL || pTail == NULL) {
    return false;
  }

  // quick reject: the timestamp ranges do not intersect at all
  bool tsIntersects = (pRecord->firstKey.key.ts <= pTail->ts) && (pRecord->lastKey.key.ts >= pHead->ts);
  if (!tsIntersects) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return doCheckDatablockOverlap(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
  }

  // Non-ascending traversal: step backwards while the skyline point is later
  // than the block's first key (never going below index 0).
  int32_t startIdx = pBlockScanInfo->fileDelIndex;
  for (;;) {
    TSDBKEY* pCur = taosArrayGet(pBlockScanInfo->delSkyline, startIdx);
    if (pCur == NULL) {
      return false;
    }

    if (pCur->ts > pRecord->firstKey.key.ts && startIdx > 0) {
      startIdx -= 1;
      continue;
    }

    // Stopped on a point that is not later than the first key (or on index 0).
    // Step back once more when the point sits exactly on the first key with a
    // version below the block's maxVer.
    if (pCur->ts == pRecord->firstKey.key.ts && pCur->version < pRecord->maxVer && startIdx > 0) {
      startIdx -= 1;
    }
    break;
  }

  return doCheckDatablockOverlap(pBlockScanInfo, pRecord, startIdx);
}
1664

1665
// Check whether the key range described by pRecord overlaps the delete skyline
// stored in pBlockScanInfo->delSkyline, comparing timestamps only (versions are
// not consulted).
//
// Returns false right away when the skyline is missing or empty, when its
// first/last point cannot be fetched, or when the record's timestamp range lies
// completely outside [first skyline ts, last skyline ts]. Otherwise the detailed
// check is delegated to doCheckDatablockOverlapWithoutVersion():
//   - ascending traversal starts at pBlockScanInfo->fileDelIndex;
//   - any other order first backs up from fileDelIndex to a suitable starting
//     point (see the loop below) and scans from there.
// pBlockScanInfo->fileDelIndex itself is not modified here.
bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order) {
  if (pBlockScanInfo->delSkyline == NULL || (taosArrayGetSize(pBlockScanInfo->delSkyline) == 0)) {
    return false;
  }

  TSDBKEY* pHead = taosArrayGet(pBlockScanInfo->delSkyline, 0);
  TSDBKEY* pTail = taosArrayGetLast(pBlockScanInfo->delSkyline);
  if (pHead == NULL || pTail == NULL) {
    return false;
  }

  // quick reject: the timestamp ranges do not intersect at all
  bool tsIntersects = (pRecord->firstKey.key.ts <= pTail->ts) && (pRecord->lastKey.key.ts >= pHead->ts);
  if (!tsIntersects) {
    return false;
  }

  if (ASCENDING_TRAVERSE(order)) {
    return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, pBlockScanInfo->fileDelIndex);
  }

  // Non-ascending traversal: step backwards while the skyline point is later
  // than the record's first key (never going below index 0).
  int32_t startIdx = pBlockScanInfo->fileDelIndex;
  for (;;) {
    TSDBKEY* pCur = taosArrayGet(pBlockScanInfo->delSkyline, startIdx);
    if (pCur == NULL) {
      return false;
    }

    if (pCur->ts > pRecord->firstKey.key.ts && startIdx > 0) {
      startIdx -= 1;
      continue;
    }

    // Stopped on a point that is not later than the first key (or on index 0).
    // Step back once more when the point sits exactly on the first key.
    if (pCur->ts == pRecord->firstKey.key.ts && startIdx > 0) {
      startIdx -= 1;
    }
    break;
  }

  return doCheckDatablockOverlapWithoutVersion(pBlockScanInfo, pRecord, startIdx);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc