• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.41
/source/libs/executor/src/cachescanoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "os.h"
18
#include "tname.h"
19

20
#include "tdatablock.h"
21
#include "tmsg.h"
22

23
#include "executorInt.h"
24
#include "functionMgt.h"
25
#include "operator.h"
26
#include "querytask.h"
27
#include "tcompare.h"
28
#include "thash.h"
29
#include "ttypes.h"
30

31
#include "storageapi.h"
32

33
typedef struct SCacheRowsScanInfo {
34
  SSDataBlock*    pRes;
35
  SReadHandle     readHandle;
36
  void*           pLastrowReader;
37
  SColMatchInfo   matchInfo;
38
  int32_t*        pSlotIds;
39
  int32_t*        pDstSlotIds;
40
  SExprSupp       pseudoExprSup;
41
  int32_t         retrieveType;
42
  int32_t         currentGroupIndex;
43
  SSDataBlock*    pBufferedRes;
44
  SArray*         pUidList;
45
  SArray*         pCidList;
46
  int32_t         indexOfBufferedRes;
47
  STableListInfo* pTableList;
48
  SArray*         pFuncTypeList;
49
  int32_t         numOfPks;
50
  SColumnInfo     pkCol;
51
  bool            gotAll;
52
} SCacheRowsScanInfo;
53

54
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
55
static void    destroyCacheScanOperator(void* param);
56
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
57
                                      int32_t** pDstSlotIds);
58
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
59

60
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
61

62
/**
 * Assign column ids to the columns of a cache-read block.
 *
 * The i-th entry of pScan->pTargets is paired with the i-th column of pBlock: when the target
 * is a column node, its colId is copied onto the block column. Every remaining block column
 * whose slotId equals the slotId of an entry in pScan->scan.pScanPseudoCols gets colId 0.
 *
 * @param pBlock block whose column infos are updated in place
 * @param pScan  last-row scan physical node providing pTargets and scan.pScanPseudoCols
 * @return TSDB_CODE_SUCCESS, or the terrno value captured when a block column cannot be fetched
 */
static int32_t setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t slot = 0;
  SNode*  pNode;

  // leading columns: one per target, only column targets carry a column id
  FOREACH(pNode, pScan->pTargets) {
    if (QUERY_NODE_COLUMN == nodeType(pNode)) {
      SColumnInfoData* pTargetCol = taosArrayGet(pBlock->pDataBlock, slot);
      QUERY_CHECK_NULL(pTargetCol, code, lino, _end, terrno);
      pTargetCol->info.colId = ((SColumnNode*)pNode)->colId;
    }
    slot++;
  }

  // trailing columns: reset the column id of those that back a pseudo column
  for (; slot < pBlock->pDataBlock->size; ++slot) {
    SColumnInfoData* pExtraCol = taosArrayGet(pBlock->pDataBlock, slot);
    QUERY_CHECK_NULL(pExtraCol, code, lino, _end, terrno);

    if (!pScan->scan.pScanPseudoCols) {
      continue;
    }

    FOREACH(pNode, pScan->scan.pScanPseudoCols) {
      if (pExtraCol->info.slotId == ((STargetNode*)pNode)->slotId) {
        pExtraCol->info.colId = 0;
        break;
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
97

98
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
2,551✔
99
                                    STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
100
                                    SOperatorInfo** pOptrInfo) {
101
  QRY_PARAM_CHECK(pOptrInfo);
2,551!
102

103
  int32_t             code = TSDB_CODE_SUCCESS;
2,551✔
104
  int32_t             lino = 0;
2,551✔
105
  int32_t             numOfCols = 0;
2,551✔
106
  SNodeList*          pScanCols = pScanNode->scan.pScanCols;
2,551✔
107
  SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
2,551!
108
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,562!
109

110
  if (pInfo == NULL || pOperator == NULL) {
2,562!
111
    code = terrno;
×
112
    goto _error;
×
113
  }
114

115
  pInfo->pTableList = pTableListInfo;
2,563✔
116
  pInfo->readHandle = *readHandle;
2,563✔
117

118
  SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
2,563✔
119
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2,563✔
120
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
2,562!
121

122
  code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2,562✔
123
  QUERY_CHECK_CODE(code, lino, _error);
2,557!
124

125
  // todo: the pk information should comes from the physical plan
126
  // pk info may not in pScanCols, so extract primary key from pInfo->matchInfo may failed
127
  SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, 0);
2,557✔
128
  if (pSchemaInfo != NULL) {
2,559!
129
    if (pSchemaInfo->sw->pSchema[1].flags & COL_IS_KEY) {  // is primary key
2,559✔
130
      SSchema* pColSchema = &pSchemaInfo->sw->pSchema[1];
72✔
131
      pInfo->numOfPks = 1;
72✔
132
      pInfo->pkCol.type = pColSchema->type;
72✔
133
      pInfo->pkCol.bytes = pColSchema->bytes;
72✔
134
      pInfo->pkCol.pk = 1;
72✔
135
    }
136
  } else {
137
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
×
138
      SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
×
139
      QUERY_CHECK_NULL(pItem, code, lino, _error, terrno);
×
140
      if (pItem->isPk) {
×
141
        pInfo->numOfPks += 1;
×
142
        pInfo->pkCol.type = pItem->dataType.type;    // only record one primary key
×
143
        pInfo->pkCol.bytes = pItem->dataType.bytes;  // only record one primary key
×
144
        pInfo->pkCol.pk = 1;
×
145
      }
146
    }
147
  }
148

149
  SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
2,559✔
150
  QUERY_CHECK_NULL(pCidList, code, lino, _error, terrno);
2,561!
151

152
  pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
2,561✔
153
  QUERY_CHECK_NULL(pInfo->pFuncTypeList, code, lino, _error, terrno);
2,563!
154

155
  void* tmp = taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
2,563✔
156
  if (!tmp && taosArrayGetSize(pScanNode->pFuncTypes) > 0) {
2,559!
157
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
×
158
  }
159

160
  for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
8,197✔
161
    SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
5,635✔
162
    QUERY_CHECK_NULL(pColInfo, code, lino, _error, terrno);
5,636!
163

164
    void*          tmp = taosArrayPush(pCidList, &pColInfo->colId);
5,636✔
165
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
5,637!
166
    if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
5,637!
167
      void* pFuncType = taosArrayGet(pInfo->pFuncTypeList, i);
855✔
168
      QUERY_CHECK_NULL(pFuncType, code, lino, _error, terrno);
856!
169
      pColInfo->funcType = *(int32_t*)pFuncType;
856✔
170
    }
171
  }
172
  pInfo->pCidList = pCidList;
2,562✔
173

174
  code = removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
2,562✔
175
  QUERY_CHECK_CODE(code, lino, _error);
2,562!
176

177
  code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
2,562✔
178
  QUERY_CHECK_CODE(code, lino, _error);
2,560!
179

180
  int32_t totalTables = 0;
2,560✔
181
  code = tableListGetSize(pTableListInfo, &totalTables);
2,560✔
182
  QUERY_CHECK_CODE(code, lino, _error);
2,560!
183

184
  int32_t capacity = 0;
2,560✔
185

186
  pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
2,560✔
187
  QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno);
2,562!
188

189
  // partition by tbname
190
  if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
4,240✔
191
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
1,676✔
192

193
    STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
1,676✔
194
    if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
1,679!
195

196
    uint64_t suid = tableListGetSuid(pTableListInfo);
1,679✔
197
    code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
1,679✔
198
                                                    taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds,
1,679✔
199
                                                    suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
1,679✔
200
                                                    pScanNode->pFuncTypes, &pInfo->pkCol, pInfo->numOfPks);
201
    QUERY_CHECK_CODE(code, lino, _error);
1,676!
202

203
    capacity = TMIN(totalTables, 4096);
1,676✔
204

205
    pInfo->pBufferedRes = NULL;
1,676✔
206
    code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
1,676✔
207
    QUERY_CHECK_CODE(code, lino, _error);
1,679!
208

209
    code = setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
1,679✔
210
    QUERY_CHECK_CODE(code, lino, _error);
1,679!
211

212
    code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
1,679✔
213
    QUERY_CHECK_CODE(code, lino, _error);
1,678!
214
  } else {  // by tags
215
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
883✔
216
    capacity = 1;  // only one row output
883✔
217
    code = setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
883✔
218
    QUERY_CHECK_CODE(code, lino, _error);
882!
219
  }
220

221
  initResultSizeInfo(&pOperator->resultInfo, capacity);
2,560✔
222
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
2,561✔
223
  QUERY_CHECK_CODE(code, lino, _error);
2,562!
224

225
  if (pScanNode->scan.pScanPseudoCols != NULL) {
2,562✔
226
    SExprSupp* p = &pInfo->pseudoExprSup;
1,043✔
227
    code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs);
1,043✔
228
    TSDB_CHECK_CODE(code, lino, _error);
1,041!
229

230
    p->pCtx =
1,041✔
231
        createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
1,041✔
232
    QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno);
1,041!
233
  }
234

235
  setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
2,560✔
236
                  pInfo, pTaskInfo);
237
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
2,560✔
238

239
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCacheNext, NULL, destroyCacheScanOperator, optrDefaultBufFn,
2,561✔
240
                                         NULL, optrDefaultGetNextExtFn, NULL);
241

242
  pOperator->cost.openCost = 0;
2,560✔
243

244
  *pOptrInfo = pOperator;
2,560✔
245
  return code;
2,560✔
246

247
_error:
×
248
  pTaskInfo->code = code;
×
249
  if (code != TSDB_CODE_SUCCESS) {
×
250
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
251
  }
252
  if (pInfo != NULL) {
×
253
    pInfo->pTableList = NULL;
×
254
    destroyCacheScanOperator(pInfo);
×
255
  }
256
  if (pOperator != NULL) {
×
257
    pOperator->info = NULL;
×
258
    destroyOperator(pOperator);
×
259
  }
260
  return code;
×
261
}
262

263
/**
 * Produce the next result block of the cache scan operator.
 *
 * CACHESCAN_RETRIEVE_TYPE_ALL: rows are fetched in bulk into pBufferedRes with retrieveRows and
 * handed out one row per call, together with the uid/group id of the table and its pseudo columns.
 * Otherwise (by tags): one group is read per loop iteration, opening the reader on first use and
 * reusing it afterwards; the first group that yields rows is returned.
 *
 * @param pOperator the cache scan operator
 * @param ppRes     [out] the result block, or NULL when the operator has no more data
 * @return TSDB_CODE_SUCCESS; on failure the error is stored in pTaskInfo->code and the function
 *         leaves through T_LONG_JMP instead of returning
 */
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SCacheRowsScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  STableListInfo*     pTableList = pInfo->pTableList;
  SStoreCacheReader*  pReaderFn = &pInfo->readHandle.api.cacheFn;
  SSDataBlock*        pBufRes = pInfo->pBufferedRes;

  uint64_t suid = tableListGetSuid(pTableList);
  int32_t  size = 0;
  code = tableListGetSize(pTableList, &size);
  QUERY_CHECK_CODE(code, lino, _end);

  // nothing to scan
  if (size == 0) {
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  blockDataCleanup(pInfo->pRes);

  // check if it is a group by tbname
  if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // the buffered rows are used up and the reader has not reported completion: fetch more
    if (pInfo->indexOfBufferedRes >= pBufRes->info.rows && !pInfo->gotAll) {
      blockDataCleanup(pBufRes);
      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, &pInfo->gotAll);
      QUERY_CHECK_CODE(code, lino, _end);

      // check for tag values
      int32_t resultRows = pBufRes->info.rows;

      // the results may be null, if last values are all null
      if (resultRows != 0 && resultRows != taosArrayGetSize(pInfo->pUidList)) {
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code));
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
      pInfo->indexOfBufferedRes = 0;
    }

    SSDataBlock* pRes = pInfo->pRes;

    if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
      // copy the current buffered row into row 0 of the result block, column by column
      for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
        SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
        QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
        int32_t slotId = pCol->info.slotId;

        SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
        SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);

        if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
          colDataSetNULL(pDst, 0);
        } else {
          if (pSrc->pData) {
            char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
            code = colDataSetVal(pDst, 0, p, false);
            QUERY_CHECK_CODE(code, lino, _end);
          }
        }
      }

      void* pUid = taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
      QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);

      pRes->info.id.uid = *(tb_uid_t*)pUid;
      pRes->info.rows = 1;
      pRes->info.scanFlag = MAIN_SCAN;

      SExprSupp* pSup = &pInfo->pseudoExprSup;
      code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
                                    pTaskInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
      pInfo->indexOfBufferedRes += 1;
      (*ppRes) = pRes;
      return code;
    } else {
      setOperatorCompleted(pOperator);
      (*ppRes) = NULL;
      return code;
    }
  } else {
    size_t totalGroups = tableListGetOutputGroups(pTableList);

    while (pInfo->currentGroupIndex < totalGroups) {
      if (isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      STableKeyInfo* pList = NULL;
      int32_t        num = 0;

      code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
      QUERY_CHECK_CODE(code, lino, _end);

      if (NULL == pInfo->pLastrowReader) {
        int32_t tmpRes = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
                                               taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList,
                                               pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
                                               pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);

        if (tmpRes != TSDB_CODE_SUCCESS) {
          // best effort: a group whose reader cannot be opened is skipped, but leave a trace of it
          qError("%s failed to open cache reader for group %d since %s, skip this group", pTaskInfo->id.str,
                 pInfo->currentGroupIndex, tstrerror(tmpRes));
          pInfo->currentGroupIndex += 1;
          taosArrayClear(pInfo->pUidList);
          continue;
        }
      } else {
        code = pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pInfo->currentGroupIndex += 1;

      // check for tag values
      if (pInfo->pRes->info.rows > 0) {
        if (pInfo->pseudoExprSup.numOfExprs > 0) {
          SExprSupp* pSup = &pInfo->pseudoExprSup;

          STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0];
          pInfo->pRes->info.id.groupId = pKeyInfo->groupId;

          if (taosArrayGetSize(pInfo->pUidList) > 0) {
            void* pUid = taosArrayGet(pInfo->pUidList, 0);
            QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
            pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
            code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                          pInfo->pRes->info.rows, pTaskInfo, NULL);
            QUERY_CHECK_CODE(code, lino, _end);
          }
        }

        (*ppRes) = pInfo->pRes;
        return code;
      }
      // no rows for this group: move on to the next one
    }

    // The reader is still NULL when every openReader call above failed; guard the close the same
    // way destroyCacheScanOperator does.
    if (pInfo->pLastrowReader != NULL) {
      pReaderFn->closeReader(pInfo->pLastrowReader);
      pInfo->pLastrowReader = NULL;
    }
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
440

441
void destroyCacheScanOperator(void* param) {
2,554✔
442
  SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
2,554✔
443
  blockDataDestroy(pInfo->pRes);
2,554✔
444
  blockDataDestroy(pInfo->pBufferedRes);
2,559✔
445
  taosMemoryFreeClear(pInfo->pSlotIds);
2,562!
446
  taosMemoryFreeClear(pInfo->pDstSlotIds);
2,562!
447
  taosArrayDestroy(pInfo->pCidList);
2,562✔
448
  taosArrayDestroy(pInfo->pFuncTypeList);
2,563✔
449
  taosArrayDestroy(pInfo->pUidList);
2,563✔
450
  taosArrayDestroy(pInfo->matchInfo.pList);
2,563✔
451
  tableListDestroy(pInfo->pTableList);
2,563✔
452

453
  if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) {
2,562!
454
    pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
1,679✔
455
    pInfo->pLastrowReader = NULL;
1,679✔
456
  }
457

458
  cleanupExprSupp(&pInfo->pseudoExprSup);
2,562✔
459
  taosMemoryFreeClear(param);
2,562!
460
}
2,563✔
461

462
/**
 * Build the two slot-id arrays handed to the cache reader.
 *
 * For the i-th item of pColMatchInfo, (*pSlotIds)[i] is the index of the column with the same
 * colId in the last schema of pTaskInfo->schemaInfos, or -1 when the schema has no such column;
 * (*pDstSlotIds)[i] is the dstSlotId of the item.
 *
 * @param pColMatchInfo array of SColMatchItem
 * @param pTaskInfo     task whose schemaInfos provides the schema to search
 * @param pSlotIds      [out] newly allocated array; the caller releases it
 * @param pDstSlotIds   [out] newly allocated array; the caller releases it
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when no schema is available,
 *         or the terrno value of a failed allocation / array access. When the function fails after
 *         both arrays were allocated, they are left in the out parameters for the caller to free.
 */
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
                                      int32_t** pDstSlotIds) {
  size_t numOfCols = taosArrayGetSize(pColMatchInfo);

  // The schema list can be empty (createCacherowsScanOperator handles that case as well), so make
  // sure there is a schema before allocating or dereferencing anything.
  SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);
  if (pSchemaInfo == NULL || pSchemaInfo->sw == NULL) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  SSchemaWrapper* pWrapper = pSchemaInfo->sw;

  *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pSlotIds == NULL) {
    return terrno;
  }

  *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pDstSlotIds == NULL) {
    int32_t code = terrno;  // capture before the free below gets a chance to touch it
    taosMemoryFreeClear(*pSlotIds);
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
    if (!pColMatch) {
      return terrno;
    }

    // -1 marks a column that does not exist in the schema
    (*pSlotIds)[i] = -1;
    (*pDstSlotIds)[i] = pColMatch->dstSlotId;

    for (int32_t j = 0; j < pWrapper->nCols; ++j) {
      if (pColMatch->colId == pWrapper->pSchema[j].colId) {
        (*pSlotIds)[i] = j;
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
507

508
/**
 * Drop timestamp columns from the match list that are not requested by a cache last_row function.
 *
 * Does nothing when pScanNode->ignoreNull is false (SCAN_ROW_TYPE maps that case to
 * CACHESCAN_RETRIEVE_LAST_ROW). Otherwise a match item is kept when the output slot it targets is
 * not of type TSDB_DATA_TYPE_TIMESTAMP, or when its funcType is FUNCTION_TYPE_CACHE_LAST_ROW.
 * On success pColMatchInfo->pList is replaced by the filtered list and the old list is destroyed;
 * on failure pColMatchInfo->pList is left untouched.
 *
 * @param pScanNode     last-row scan physical node (ignoreNull flag and output slot descriptors)
 * @param pColMatchInfo match info whose list is filtered in place
 * @return TSDB_CODE_SUCCESS or the terrno value captured at the failing step
 */
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pMatchInfo = NULL;

  if (!pScanNode->ignoreNull) {
    return TSDB_CODE_SUCCESS;
  }

  size_t size = taosArrayGetSize(pColMatchInfo->pList);
  pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
  QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, terrno);

  for (int32_t i = 0; i < size; ++i) {
    SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    int32_t    slotId = pColInfo->dstSlotId;
    SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;

    SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
    QUERY_CHECK_NULL(pDesc, code, lino, _end, terrno);

    bool keep = (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) ||
                (FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType);
    if (keep) {
      void* tmp = taosArrayPush(pMatchInfo, pColInfo);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  taosArrayDestroy(pColMatchInfo->pList);
  pColMatchInfo->pList = pMatchInfo;
  pMatchInfo = NULL;  // ownership moved to pColMatchInfo

_end:
  // still non-NULL only when the loop was left early: release the partially built list
  taosArrayDestroy(pMatchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc