• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.16
/source/dnode/vnode/src/tsdb/tsdbCacheRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "functionMgt.h"
17
#include "functionResInfo.h"
18
#include "taoserror.h"
19
#include "tarray.h"
20
#include "tcommon.h"
21
#include "tsdb.h"
22
#include "tsdbDataFileRW.h"
23
#include "tsdbReadUtil.h"
24

25
// Evaluates to true when every bit set in _t is also set in _type.
// Note: _t is expanded twice, so avoid passing an expression with side effects.
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
26

27
/*
 * Write an "empty" first/last result into one row of a result column.
 *
 * A zeroed scratch buffer of pCol->info.bytes bytes is built with a var-data
 * length header followed by an SFirstLastRes whose hasResult and isNull flags
 * are set and whose payload size is 0, then stored at `row` via colDataSetVal.
 *
 * @param pCol  destination column; must not be NULL
 * @param row   row index to write
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when pCol is NULL, terrno
 *         when the scratch allocation fails, or the code of colDataSetVal.
 */
static int32_t setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) {
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SFirstLastRes* pEmpty = NULL;
  char*          payload = NULL;

  TSDB_CHECK_NULL(pCol, code, lino, _end, TSDB_CODE_INVALID_PARA);

  payload = taosMemoryCalloc(1, pCol->info.bytes);
  TSDB_CHECK_NULL(payload, code, lino, _end, terrno);

  // The var-data header records the whole column width minus the header itself.
  varDataSetLen(payload, pCol->info.bytes - VARSTR_HEADER_SIZE);

  // The result descriptor sits right behind the var-data header.
  pEmpty = (SFirstLastRes*)((char*)payload + VARSTR_HEADER_SIZE);
  pEmpty->isNull = true;
  pEmpty->hasResult = true;
  pEmpty->bytes = 0;

  code = colDataSetVal(pCol, row, payload, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // The scratch buffer is released on every path.
  if (payload != NULL) {
    taosMemoryFreeClear(payload);
  }
  return code;
}
55

56
static int32_t saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, const int32_t slotId,
6,784✔
57
                                    SColumnInfoData* pColInfoData, int32_t numOfRows) {
58
  int32_t  code = TSDB_CODE_SUCCESS;
6,784✔
59
  int32_t  lino = 0;
6,784✔
60
  SColVal* pVal = NULL;
6,784✔
61

62
  TSDB_CHECK_NULL(pColVal, code, lino, _end, TSDB_CODE_INVALID_PARA);
6,784!
63
  TSDB_CHECK_NULL(pColInfoData, code, lino, _end, TSDB_CODE_INVALID_PARA);
6,784!
64

65
  pVal = &pColVal->colVal;
6,784✔
66

67
  // allNullRow = false;
68
  if (IS_VAR_DATA_TYPE(pColVal->colVal.value.type)) {
6,784!
69
    if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
586✔
70
      colDataSetNULL(pColInfoData, numOfRows);
288✔
71
    } else {
72
      TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
298!
73
      varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
298✔
74

75
      memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
298✔
76
      code = colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
298✔
77
      TSDB_CHECK_CODE(code, lino, _end);
298!
78
    }
79
  } else {
80
    code = colDataSetVal(pColInfoData, numOfRows, VALUE_GET_DATUM(&pVal->value, pColVal->colVal.value.type),
6,198!
81
                         !COL_VAL_IS_VALUE(pVal));
6,198!
82
    TSDB_CHECK_CODE(code, lino, _end);
6,197!
83
  }
84

85
_end:
6,197✔
86
  if (code != TSDB_CODE_SUCCESS) {
6,783!
UNCOV
87
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
88
  }
89
  return code;
6,783✔
90
}
91

92
/*
 * Append one result row, built from the cached column values in pRow, to pBlock.
 *
 * Two retrieve modes are handled, selected by pReader->type:
 *  - CACHESCAN_RETRIEVE_LAST: each requested column is written either as a raw
 *    value (when its entry in pReader->pFuncTypeList is
 *    FUNCTION_TYPE_CACHE_LAST_ROW) or as an SFirstLastRes staged in pRes[i].
 *    A second pass over every column of the block fills the primary timestamp
 *    column and, for single-column queries, the matching value column.
 *  - CACHESCAN_RETRIEVE_LAST_ROW: each requested column is written as a raw value.
 * Any other type is rejected with TSDB_CODE_INVALID_PARA.
 *
 * On success pBlock->info.rows is incremented by one.
 *
 * @param pRow        array of SLastCol, indexed like slotIds
 * @param pBlock      destination block; must not be NULL
 * @param pReader     cache reader; must not be NULL
 * @param slotIds     schema slot per requested column, -1 for "no source column";
 *                    required when pReader->numOfCols > 0
 * @param dstSlotIds  destination column index in pBlock per requested column;
 *                    required when pReader->numOfCols > 0
 * @param pRes        per-column staging buffers (var-data header + SFirstLastRes);
 *                    required when pReader->numOfCols > 0
 * @param idStr       identifier used in log messages
 * @return TSDB_CODE_SUCCESS or an error code.
 */
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          const int32_t* dstSlotIds, void** pRes, const char* idStr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfRows = 0;
  SArray* funcTypeBlockArray = NULL;

  TSDB_CHECK_NULL(pBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
  if (pReader->numOfCols > 0) {
    TSDB_CHECK_NULL(slotIds, code, lino, _end, TSDB_CODE_INVALID_PARA);
    TSDB_CHECK_NULL(dstSlotIds, code, lino, _end, TSDB_CODE_INVALID_PARA);
    TSDB_CHECK_NULL(pRes, code, lino, _end, TSDB_CODE_INVALID_PARA);
  }

  // The new row is written at the current end of the block.
  numOfRows = pBlock->info.rows;

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
    // Timestamps are signed; keep the sentinel comparison against TSKEY_MIN in the signed domain.
    int64_t        ts = TSKEY_MIN;
    SFirstLastRes* p = NULL;
    col_id_t       colId = -1;

    // Function type per destination column, consulted again in the second pass below.
    funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t));
    TSDB_CHECK_NULL(funcTypeBlockArray, code, lino, _end, terrno);

    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
      TSDB_CHECK_NULL(pColInfoData, code, lino, _end, TSDB_CODE_INVALID_PARA);

      int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
      if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) {
        // One lookup serves both the local funcType and the per-destination copy.
        void* pVal = taosArrayGet(pReader->pFuncTypeList, i);
        TSDB_CHECK_NULL(pVal, code, lino, _end, TSDB_CODE_INVALID_PARA);

        funcType = *(int32_t*)pVal;

        // NOTE(review): the insert position is the destination slot, not i; this presumably
        // relies on dstSlotIds being usable as insert positions in call order - confirm.
        void* px = taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], pVal);
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
      }

      if (slotIds[i] == -1) {
        // No source column: emit NULL in the representation the function type expects.
        if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
          colDataSetNULL(pColInfoData, numOfRows);
          continue;
        }

        code = setFirstLastResColToNull(pColInfoData, numOfRows);
        TSDB_CHECK_CODE(code, lino, _end);
        continue;
      }

      int32_t   slotId = slotIds[i];
      SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
      TSDB_CHECK_NULL(pColVal, code, lino, _end, TSDB_CODE_INVALID_PARA);

      colId = pColVal->colVal.cid;
      if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
        code = saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
        TSDB_CHECK_CODE(code, lino, _end);

        continue;
      }

      // The SFirstLastRes lives behind the var-data header of the staging buffer.
      p = (SFirstLastRes*)varDataVal(pRes[i]);

      p->ts = pColVal->rowKey.ts;
      ts = p->ts;
      p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
      // allNullRow = p->isNull & allNullRow;
      if (!p->isNull) {
        if (IS_VAR_DATA_TYPE(pColVal->colVal.value.type)) {
          varDataSetLen(p->buf, pColVal->colVal.value.nData);

          memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
          p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE;  // binary needs to plus the header size
        } else {
          memcpy(p->buf, VALUE_GET_DATUM(&pColVal->colVal.value, pColVal->colVal.value.type),
                 pReader->pSchema->columns[slotId].bytes);
          p->bytes = pReader->pSchema->columns[slotId].bytes;
        }
      }

      // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it
      p->hasResult = true;
      varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
      code = colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
      TSDB_CHECK_CODE(code, lino, _end);
    }

    // Second pass over every column of the block: fill the columns that were not
    // written as first/last results above.
    for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
      SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
      TSDB_CHECK_NULL(pCol, code, lino, _end, TSDB_CODE_INVALID_PARA);

      if (idx < funcTypeBlockArray->size) {
        void* pVal = taosArrayGet(funcTypeBlockArray, idx);
        TSDB_CHECK_NULL(pVal, code, lino, _end, TSDB_CODE_INVALID_PARA);

        int32_t funcType = *(int32_t*)pVal;
        if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
          // Already written as a raw value in the first pass.
          continue;
        }
      }

      if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
        // ts still equals the sentinel when no first/last result was produced above.
        if (ts == TSKEY_MIN) {
          colDataSetNULL(pCol, numOfRows);
        } else {
          code = colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
          TSDB_CHECK_CODE(code, lino, _end);
        }
        continue;
      } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && (pCol->info.colId == colId || colId == -1)) {
        if (p && !p->isNull) {
          code = colDataSetVal(pCol, numOfRows, p->buf, false);
          TSDB_CHECK_CODE(code, lino, _end);
        } else {
          colDataSetNULL(pCol, numOfRows);
        }
      }
    }

    // pBlock->info.rows += allNullRow ? 0 : 1;
    ++pBlock->info.rows;
  } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
      TSDB_CHECK_NULL(pColInfoData, code, lino, _end, TSDB_CODE_INVALID_PARA);

      int32_t slotId = slotIds[i];
      if (slotId == -1) {
        colDataSetNULL(pColInfoData, numOfRows);
        continue;
      }

      SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
      TSDB_CHECK_NULL(pColVal, code, lino, _end, TSDB_CODE_INVALID_PARA);

      code = saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
      TSDB_CHECK_CODE(code, lino, _end);
    }

    // pBlock->info.rows += allNullRow ? 0 : 1;
    ++pBlock->info.rows;
  } else {
    tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
    code = TSDB_CODE_INVALID_PARA;
    TSDB_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (funcTypeBlockArray != NULL) {
    taosArrayDestroy(funcTypeBlockArray);
  }
  return code;
}
252

253
/*
 * Load the table schema used by the cache reader into p->pSchema.
 *
 * With a non-zero suid the schema is fetched for that super table. With
 * suid == 0 the tables in p->pTableList are tried in order and the first one
 * that yields a schema wins; tables without a schema are logged and skipped.
 *
 * @param p      cache reader; must not be NULL
 * @param suid   super table uid, or 0 to probe the reader's table list
 * @param idstr  identifier used in log messages
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when p is NULL,
 *         TSDB_CODE_PAR_TABLE_NOT_EXIST when no schema could be found, or the
 *         error code returned by the meta lookup.
 */
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  TSDB_CHECK_NULL(p, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (suid == 0) {
    // No super table: probe each queried table until one still has a schema.
    const int32_t tableCnt = p->numOfTables;
    int32_t       idx = 0;

    while (idx < tableCnt) {
      uint64_t uid = p->pTableList[idx].uid;

      code = metaGetTbTSchemaMaybeNull(p->pVnode->pMeta, uid, -1, 1, &p->pSchema);
      TSDB_CHECK_CODE(code, lino, _end);
      if (p->pSchema != NULL) {
        break;
      }

      tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
      ++idx;
    }

    // all queried tables have been dropped already, return immediately.
    if (p->pSchema == NULL) {
      tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
      code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
      TSDB_CHECK_CODE(code, lino, _end);
    }
  } else {
    code = metaGetTbTSchemaNotNull(p->pVnode->pMeta, suid, -1, 1, &p->pSchema);
    if (code != TSDB_CODE_SUCCESS) {
      tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
      // A missing super table is reported to the caller as "table does not exist".
      if (TSDB_CODE_NOT_FOUND == code) {
        code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
      }
      TSDB_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
297

298
int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOfTables) {
280✔
299
  int32_t           code = TSDB_CODE_SUCCESS;
280✔
300
  int32_t           lino = 0;
280✔
301
  SCacheRowsReader* pReader = (SCacheRowsReader*)reader;
280✔
302

303
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
280!
304

305
  pReader->pTableList = pTableIdList;
280✔
306
  pReader->numOfTables = numOfTables;
280✔
307
  pReader->lastTs = INT64_MIN;
280✔
308
  destroySttBlockReader(pReader->pLDataIterArray, NULL);
280✔
309
  pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
281✔
310
  TSDB_CHECK_NULL(pReader->pLDataIterArray, code, lino, _end, terrno);
281!
311

312
_end:
281✔
313
  if (code != TSDB_CODE_SUCCESS) {
281!
UNCOV
314
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
315
  }
316
  return code;
281✔
317
}
318

319
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
4,340✔
320
                                SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr,
321
                                SArray* pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks) {
322
  int32_t           code = TSDB_CODE_SUCCESS;
4,340✔
323
  int32_t           lino = 0;
4,340✔
324
  SCacheRowsReader* p = NULL;
4,340✔
325

326
  TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,340!
327
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
4,340!
328

329
  *pReader = NULL;
4,340✔
330
  p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
4,340!
331
  TSDB_CHECK_NULL(p, code, lino, _end, terrno);
4,338!
332

333
  p->type = type;
4,338✔
334
  p->pVnode = pVnode;
4,338✔
335
  p->pTsdb = p->pVnode->pTsdb;
4,338✔
336
  p->info.verRange = (SVersionRange){.minVer = 0, .maxVer = INT64_MAX};
4,338✔
337
  p->info.suid = suid;
4,338✔
338
  p->numOfCols = numOfCols;
4,338✔
339
  p->pCidList = pCidList;
4,338✔
340
  p->pSlotIds = pSlotIds;
4,338✔
341
  p->pFuncTypeList = pFuncTypeList;
4,338✔
342

343
  p->rowKey.numOfPKs = numOfPks;
4,338✔
344
  if (numOfPks > 0) {
4,338✔
345
    TSDB_CHECK_NULL(pPkCol, code, lino, _end, TSDB_CODE_INVALID_PARA);
36!
346
    p->rowKey.pks[0].type = pPkCol->type;
36✔
347
    if (IS_VAR_DATA_TYPE(pPkCol->type)) {
36!
348
      p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
×
349
      if (p->rowKey.pks[0].pData == NULL) {
×
350
        taosMemoryFreeClear(p);
×
UNCOV
351
        code = terrno;
×
UNCOV
352
        TSDB_CHECK_CODE(code, lino, _end);
×
353
      }
354
    }
355

356
    p->pkColumn = *pPkCol;
36✔
357
  }
358

359
  if (numOfTables == 0) {
4,338!
360
    *pReader = p;
×
UNCOV
361
    p = NULL;
×
UNCOV
362
    goto _end;
×
363
  }
364

365
  p->pTableList = pTableIdList;
4,338✔
366
  p->numOfTables = numOfTables;
4,338✔
367

368
  code = setTableSchema(p, suid, idstr);
4,338✔
369
  TSDB_CHECK_CODE(code, lino, _end);
4,338!
370

371
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
4,338!
372
  TSDB_CHECK_NULL(p->transferBuf, code, lino, _end, terrno);
4,342!
373

374
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
101,321✔
375
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
96,969✔
376
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
11,901!
377
      TSDB_CHECK_NULL(p->transferBuf[i], code, lino, _end, terrno);
11,911!
378
    }
379
  }
380

381
  if (idstr != NULL) {
4,352✔
382
    p->idstr = taosStrdup(idstr);
4,346!
383
    TSDB_CHECK_NULL(p->idstr, code, lino, _end, terrno);
4,339!
384
  }
385
  code = taosThreadMutexInit(&p->readerMutex, NULL);
4,345✔
386
  TSDB_CHECK_CODE(code, lino, _end);
4,336!
387

388
  p->lastTs = INT64_MIN;
4,336✔
389

390
  *pReader = p;
4,336✔
391
  p = NULL;
4,336✔
392

393
_end:
4,336✔
394
  if (code != TSDB_CODE_SUCCESS) {
4,336!
UNCOV
395
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
396
    *pReader = NULL;
×
397
  }
398
  if (p != NULL) {
4,336!
UNCOV
399
    tsdbCacherowsReaderClose(p);
×
400
  }
401
  return code;
4,333✔
402
}
403

404
void tsdbCacherowsReaderClose(void* pReader) {
4,346✔
405
  SCacheRowsReader* p = pReader;
4,346✔
406
  if (p == NULL) {
4,346!
UNCOV
407
    return;
×
408
  }
409

410
  if (p->pSchema != NULL && p->transferBuf != NULL) {
4,346!
411
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
101,623✔
412
      taosMemoryFreeClear(p->transferBuf[i]);
97,278!
413
    }
414

415
    taosMemoryFree(p->transferBuf);
4,345!
416
    taosMemoryFree(p->pSchema);
4,346!
417
  }
418

419
  taosMemoryFree(p->pCurrSchema);
4,345!
420

421
  if (p->rowKey.numOfPKs > 0) {
4,348✔
422
    for (int32_t i = 0; i < p->rowKey.numOfPKs; i++) {
72✔
423
      if (IS_VAR_DATA_TYPE(p->rowKey.pks[i].type)) {
36!
UNCOV
424
        taosMemoryFree(p->rowKey.pks[i].pData);
×
425
      }
426
    }
427
  }
428

429
  if (p->pLDataIterArray) {
4,348✔
430
    destroySttBlockReader(p->pLDataIterArray, NULL);
126✔
431
    p->pLDataIterArray = NULL;
126✔
432
  }
433

434
  if (p->pFileReader) {
4,348!
UNCOV
435
    tsdbDataFileReaderClose(&p->pFileReader);
×
UNCOV
436
    p->pFileReader = NULL;
×
437
  }
438

439
  taosMemoryFree((void*)p->idstr);
4,348!
440
  (void)taosThreadMutexDestroy(&p->readerMutex);
4,348✔
441

442
  if (p->pTableMap) {
4,347✔
443
    void*   pe = NULL;
3,286✔
444
    int32_t iter = 0;
3,286✔
445
    while ((pe = tSimpleHashIterate(p->pTableMap, pe, &iter)) != NULL) {
8,183✔
446
      STableLoadInfo* pInfo = *(STableLoadInfo**)pe;
4,894✔
447
      taosArrayDestroy(pInfo->pTombData);
4,894✔
448
      pInfo->pTombData = NULL;
4,897✔
449
    }
450

451
    tSimpleHashCleanup(p->pTableMap);
3,285✔
452
  }
453
  if (p->uidList) {
4,347✔
454
    taosMemoryFree(p->uidList);
14!
455
  }
456

457
  taosMemoryFree(pReader);
4,347!
458
}
459

460
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
×
UNCOV
461
  int32_t           code = 0;
×
462
  SCacheRowsReader* pReader = pQHandle;
×
463

UNCOV
464
  code = taosThreadMutexTryLock(&pReader->readerMutex);
×
UNCOV
465
  if (code == 0) {
×
466
    // pause current reader's state if not paused, save ts & version for resuming
467
    // just wait for the big all tables' snapshot untaking for now
468

UNCOV
469
    code = TSDB_CODE_VND_QUERY_BUSY;
×
470
    (void)taosThreadMutexUnlock(&pReader->readerMutex);
×
471

472
    return code;
×
UNCOV
473
  } else if (code == EBUSY) {
×
474
    return TSDB_CODE_VND_QUERY_BUSY;
×
475
  } else {
UNCOV
476
    return -1;
×
477
  }
478
}
479

480
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, const int32_t* dstSlotIds,
6,407✔
481
                              SArray* pTableUidList, bool* pGotAll) {
482
  int32_t           code = TSDB_CODE_SUCCESS;
6,407✔
483
  int32_t           lino = 0;
6,407✔
484
  bool              hasRes = false;
6,407✔
485
  SArray*           pRow = NULL;
6,407✔
486
  void**            pRes = NULL;
6,407✔
487
  SCacheRowsReader* pr = NULL;
6,407✔
488
  int32_t           pkBufLen = 0;
6,407✔
489

490
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
6,407!
491
  TSDB_CHECK_NULL(pResBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
6,407!
492

493
  pr = pReader;
6,407✔
494

495
  pr->pReadSnap = NULL;
6,407✔
496
  pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
6,407✔
497
  TSDB_CHECK_NULL(pRow, code, lino, _end, terrno);
6,412!
498

499
  pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
6,412!
500
  TSDB_CHECK_NULL(pRes, code, lino, _end, terrno);
6,413!
501

502
  pkBufLen = (pr->rowKey.numOfPKs > 0) ? pr->pkColumn.bytes : 0;
6,413✔
503
  for (int32_t j = 0; j < pr->numOfCols; ++j) {
23,963✔
504
    int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;
17,548✔
505

506
    pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + pkBufLen + VARSTR_HEADER_SIZE);
17,548!
507
    TSDB_CHECK_NULL(pRes[j], code, lino, _end, terrno);
17,550!
508

509
    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
17,550✔
510
    p->ts = INT64_MIN;
17,550✔
511
  }
512

513
  (void)taosThreadMutexLock(&pr->readerMutex);
6,415✔
514
  code = tsdbTakeReadSnap2((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap, pr->idstr);
6,414✔
515
  TSDB_CHECK_CODE(code, lino, _end);
6,410!
516

517
  int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
6,410✔
518

519
  STableKeyInfo* pTableList = pr->pTableList;
6,410✔
520

521
  // retrieve the only one last row of all tables in the uid list.
522
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
6,410✔
523
    SArray* pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
1,830✔
524
    TSDB_CHECK_NULL(pLastCols, code, lino, _end, terrno);
1,827!
525

526
    for (int32_t i = 0; i < pr->numOfCols; ++i) {
4,856✔
527
      int32_t slotId = slotIds[i];
3,031✔
528
      if (slotId == -1) {
3,031✔
529
        SLastCol p = {.rowKey.ts = INT64_MIN, .colVal.value.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL};
253✔
530
        void*    px = taosArrayPush(pLastCols, &p);
253✔
531
        TSDB_CHECK_NULL(px, code, lino, _end, terrno);
253!
532
        continue;
253✔
533
      }
534
      struct STColumn* pCol = &pr->pSchema->columns[slotId];
2,778✔
535
      SLastCol         p = {.rowKey.ts = INT64_MIN, .colVal.value.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
2,778✔
536

537
      if (pr->rowKey.numOfPKs > 0) {
2,778!
538
        p.rowKey.numOfPKs = pr->rowKey.numOfPKs;
×
539
        for (int32_t j = 0; j < pr->rowKey.numOfPKs; j++) {
×
540
          p.rowKey.pks[j].type = pr->pkColumn.type;
×
541
          if (IS_VAR_DATA_TYPE(pr->pkColumn.type)) {
×
UNCOV
542
            p.rowKey.pks[j].pData = taosMemoryCalloc(1, pr->pkColumn.bytes);
×
UNCOV
543
            TSDB_CHECK_NULL(p.rowKey.pks[j].pData, code, lino, _end, terrno);
×
544
          }
545
        }
546
      }
547

548
      if (IS_VAR_DATA_TYPE(pCol->type) || pCol->type == TSDB_DATA_TYPE_DECIMAL) {
2,778!
549
        p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
476!
550
        TSDB_CHECK_NULL(p.colVal.value.pData, code, lino, _end, terrno);
479!
551
      }
552

553
      void* px = taosArrayPush(pLastCols, &p);
2,776✔
554
      TSDB_CHECK_NULL(px, code, lino, _end, terrno);
2,776!
555
    }
556

557
    int64_t st = taosGetTimestampUs();
1,831✔
558
    int64_t totalLastTs = INT64_MAX;
1,831✔
559
    for (int32_t i = 0; i < pr->numOfTables; ++i) {
6,932✔
560
      tb_uid_t uid = pTableList[i].uid;
5,096✔
561

562
      code = tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
5,096✔
563
      if (code == -1) {  // fix the invalid return code
5,100!
UNCOV
564
        code = 0;
×
565
      }
566
      TSDB_CHECK_CODE(code, lino, _end);
5,100!
567

568
      if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
5,100!
569
        taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
3,188✔
570
        continue;
3,188✔
571
      }
572

573
      {
574
        bool    hasNotNullRow = true;
1,912✔
575
        int64_t singleTableLastTs = INT64_MAX;
1,912✔
576
        for (int32_t k = 0; k < pr->numOfCols; ++k) {
5,439✔
577
          if (slotIds[k] == -1) continue;
3,527✔
578
          SLastCol* p = taosArrayGet(pLastCols, k);
3,439✔
579
          if (p == NULL) {
3,438!
UNCOV
580
            return TSDB_CODE_INVALID_PARA;
×
581
          }
582

583
          SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
3,438✔
584
          if (pColVal == NULL) {
3,438!
UNCOV
585
            return TSDB_CODE_INVALID_PARA;
×
586
          }
587

588
          if (tRowKeyCompare(&pColVal->rowKey, &p->rowKey) > 0) {
3,438✔
589
            if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
2,501✔
590
              if (!COL_VAL_IS_VALUE(&p->colVal)) {
169✔
591
                hasNotNullRow = false;
127✔
592
              }
593
              // For all of cols is null, the last null col of last table will be save
594
              if (i != pr->numOfTables - 1 || k != pr->numOfCols - 1 || hasRes) {
169!
595
                continue;
169✔
596
              }
597
            }
598

599
            hasRes = true;
2,332✔
600
            p->rowKey.ts = pColVal->rowKey.ts;
2,332✔
601
            for (int32_t j = 0; j < p->rowKey.numOfPKs; j++) {
2,332!
602
              if (IS_VAR_DATA_TYPE(p->rowKey.pks[j].type)) {
×
UNCOV
603
                memcpy(p->rowKey.pks[j].pData, pColVal->rowKey.pks[j].pData, pColVal->rowKey.pks[j].nData);
×
604
                p->rowKey.pks[j].nData = pColVal->rowKey.pks[j].nData;
×
605
              } else {
UNCOV
606
                valueCloneDatum(p->rowKey.pks + j, pColVal->rowKey.pks + j, p->rowKey.pks[j].type);
×
607
              }
608
            }
609

610
            if (k == 0) {
2,332✔
611
              if (TARRAY_SIZE(pTableUidList) == 0) {
1,258✔
612
                void* px = taosArrayPush(pTableUidList, &uid);
1,039✔
613
                TSDB_CHECK_NULL(px, code, lino, _end, terrno);
1,039!
614
              } else {
615
                taosArraySet(pTableUidList, 0, &uid);
218✔
616
              }
617
            }
618

619
            if (pColVal->rowKey.ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
2,332✔
620
              singleTableLastTs = pColVal->rowKey.ts;
745✔
621
            }
622

623
            if (p->colVal.value.type != pColVal->colVal.value.type) {
2,332!
624
              // check for type/cid mismatch
UNCOV
625
              tsdbError("last cache type mismatch, uid:%" PRIu64
×
626
                        ", schema-type:%d, slotId:%d, cache-type:%d, cache-col:%d",
627
                        uid, p->colVal.value.type, slotIds[k], pColVal->colVal.value.type, pColVal->colVal.cid);
628
              taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
×
UNCOV
629
              code = TSDB_CODE_INVALID_PARA;
×
UNCOV
630
              goto _end;
×
631
            }
632

633
            if (!IS_VAR_DATA_TYPE(pColVal->colVal.value.type) && pColVal->colVal.value.type != TSDB_DATA_TYPE_DECIMAL) {
2,332!
634
              p->colVal = pColVal->colVal;
1,834✔
635
            } else {
636
              if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
498✔
637
                memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
455✔
638
              }
639

640
              p->colVal.value.nData = pColVal->colVal.value.nData;
498✔
641
              p->colVal.value.type = pColVal->colVal.value.type;
498✔
642
              p->colVal.flag = pColVal->colVal.flag;
498✔
643
              p->colVal.cid = pColVal->colVal.cid;
498✔
644
            }
645
          }
646
        }
647

648
        if (hasNotNullRow) {
1,912✔
649
          if (INT64_MAX == totalLastTs || (INT64_MAX != singleTableLastTs && totalLastTs < singleTableLastTs)) {
1,902✔
650
            totalLastTs = singleTableLastTs;
1,403✔
651
          }
652
          double cost = (taosGetTimestampUs() - st) / 1000.0;
1,904✔
653
          if (cost > tsCacheLazyLoadThreshold) {
1,904✔
654
            // pr->lastTs = totalLastTs;
655
          }
656
        }
657
      }
658

659
      taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
1,914✔
660
    }
661

662
    if (hasRes) {
1,836✔
663
      code = saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
1,040✔
664
      TSDB_CHECK_CODE(code, lino, _end);
1,040!
665
    }
666

667
    taosArrayDestroyEx(pLastCols, tsdbCacheFreeSLastColItem);
1,836✔
668
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
4,580!
669
    int32_t i = pr->tableIndex;
4,580✔
670
    for (; i < pr->numOfTables; ++i) {
7,208✔
671
      tb_uid_t uid = pTableList[i].uid;
4,415✔
672

673
      if ((code = tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype)) != 0) {
4,415!
UNCOV
674
        if (code == -1) {  // fix the invalid return code
×
675
          code = 0;
×
676
        }
UNCOV
677
        TSDB_CHECK_CODE(code, lino, _end);
×
678
      }
679

680
      if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
4,417!
681
        taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
1,893✔
682
        continue;
1,892✔
683
      }
684

685
      code = saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
2,524✔
686
      TSDB_CHECK_CODE(code, lino, _end);
2,523!
687

688
      taosArrayClearEx(pRow, tsdbCacheFreeSLastColItem);
2,523✔
689

690
      void* px = taosArrayPush(pTableUidList, &uid);
2,525✔
691
      TSDB_CHECK_NULL(px, code, lino, _end, terrno);
2,525!
692

693
      ++pr->tableIndex;
2,525✔
694
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
2,525✔
695
        break;
1,789✔
696
      }
697
    }
698

699
    if (pGotAll && i == pr->numOfTables) {
4,582!
700
      *pGotAll = true;
2,796✔
701
    }
702
  } else {
UNCOV
703
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
704
    TSDB_CHECK_CODE(code, lino, _end);
×
705
  }
706

UNCOV
707
_end:
×
708
  tsdbUntakeReadSnap2((STsdbReader*)pr, pr->pReadSnap, true);
6,414✔
709
  pr->pReadSnap = NULL;
6,415✔
710

711
  if (pr->pCurFileSet) {
6,415✔
712
    pr->pCurFileSet = NULL;
12✔
713
  }
714

715
  (void)taosThreadMutexUnlock(&pr->readerMutex);
6,415✔
716

717
  if (pRes != NULL) {
6,417!
718
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
23,972✔
719
      taosMemoryFree(pRes[j]);
17,554!
720
    }
721
  }
722

723
  taosMemoryFree(pRes);
6,418!
724
  taosArrayDestroy(pRow);
6,414✔
725

726
  if (code != TSDB_CODE_SUCCESS) {
6,413!
UNCOV
727
    tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
728
  }
729
  return code;
6,413✔
730
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc