• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3911

24 Apr 2025 11:36PM UTC coverage: 53.735% (-1.6%) from 55.295%
#3911

push

travis-ci

happyguoxy
Sync branches at 2025-04-25 07:35

170049 of 316459 relevant lines covered (53.73%)

1192430.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.6
/source/libs/executor/src/cachescanoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "os.h"
18
#include "tname.h"
19

20
#include "tdatablock.h"
21
#include "tmsg.h"
22

23
#include "executorInt.h"
24
#include "functionMgt.h"
25
#include "operator.h"
26
#include "querytask.h"
27
#include "tcompare.h"
28
#include "thash.h"
29
#include "ttypes.h"
30

31
#include "storageapi.h"
32

33
typedef struct SCacheRowsScanInfo {
34
  SSDataBlock*    pRes;
35
  SReadHandle     readHandle;
36
  void*           pLastrowReader;
37
  SColMatchInfo   matchInfo;
38
  int32_t*        pSlotIds;
39
  int32_t*        pDstSlotIds;
40
  SExprSupp       pseudoExprSup;
41
  int32_t         retrieveType;
42
  int32_t         currentGroupIndex;
43
  SSDataBlock*    pBufferedRes;
44
  SArray*         pUidList;
45
  SArray*         pCidList;
46
  int32_t         indexOfBufferedRes;
47
  STableListInfo* pTableList;
48
  SArray*         pFuncTypeList;
49
  int32_t         numOfPks;
50
  SColumnInfo     pkCol;
51
  bool            gotAll;
52
} SCacheRowsScanInfo;
53

54
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
55
static void    destroyCacheScanOperator(void* param);
56
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
57
                                      int32_t** pDstSlotIds);
58
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
59

60
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
61

62
/*
 * Write column ids into the column descriptors of pBlock.
 *
 * The i-th entry of pScan->pTargets corresponds to the i-th column of the
 * block: when that target is a column node, its colId is copied over.
 * Every block column beyond the target list whose slotId matches one of the
 * pseudo-column targets gets colId 0.
 *
 * Returns TSDB_CODE_SUCCESS, or the error set by QUERY_CHECK_NULL when a block
 * column cannot be fetched.
 */
static int32_t setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t slot = 0;
  SNode*  pCur = NULL;

  // Pass 1: walk the targets in lock step with the block columns.
  FOREACH(pCur, pScan->pTargets) {
    if (QUERY_NODE_COLUMN == nodeType(pCur)) {
      SColumnNode*     pColNode = (SColumnNode*)pCur;
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slot);
      QUERY_CHECK_NULL(pColData, code, lino, _end, terrno);
      pColData->info.colId = pColNode->colId;
    }
    slot += 1;
  }

  // Pass 2: the remaining block columns; only pseudo columns are touched.
  for (; slot < TARRAY_SIZE(pBlock->pDataBlock); ++slot) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slot);
    QUERY_CHECK_NULL(pColData, code, lino, _end, terrno);

    if (NULL == pScan->scan.pScanPseudoCols) {
      continue;
    }

    FOREACH(pCur, pScan->scan.pScanPseudoCols) {
      STargetNode* pPseudo = (STargetNode*)pCur;
      if (pPseudo->slotId == pColData->info.slotId) {
        pColData->info.colId = 0;
        break;
      }
    }
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
97

98
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
12✔
99
                                    STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
100
                                    SOperatorInfo** pOptrInfo) {
101
  QRY_PARAM_CHECK(pOptrInfo);
12✔
102

103
  int32_t             code = TSDB_CODE_SUCCESS;
12✔
104
  int32_t             lino = 0;
12✔
105
  int32_t             numOfCols = 0;
12✔
106
  SNodeList*          pScanCols = pScanNode->scan.pScanCols;
12✔
107
  SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
12✔
108
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
12✔
109

110
  if (pInfo == NULL || pOperator == NULL) {
12✔
111
    code = terrno;
×
112
    goto _error;
×
113
  }
114

115
  pInfo->pTableList = pTableListInfo;
12✔
116
  pInfo->readHandle = *readHandle;
12✔
117

118
  SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
12✔
119
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
12✔
120
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
12✔
121

122
  code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
12✔
123
  QUERY_CHECK_CODE(code, lino, _error);
12✔
124

125
  // todo: the pk information should comes from the physical plan
126
  // pk info may not in pScanCols, so extract primary key from pInfo->matchInfo may failed
127
  SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, 0);
12✔
128
  if (pSchemaInfo != NULL) {
12✔
129
    if (pSchemaInfo->sw->pSchema[1].flags & COL_IS_KEY) {  // is primary key
12✔
130
      SSchema* pColSchema = &pSchemaInfo->sw->pSchema[1];
×
131
      pInfo->numOfPks = 1;
×
132
      pInfo->pkCol.type = pColSchema->type;
×
133
      pInfo->pkCol.bytes = pColSchema->bytes;
×
134
      pInfo->pkCol.pk = 1;
×
135
    }
136
  } else {
137
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
×
138
      SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
×
139
      QUERY_CHECK_NULL(pItem, code, lino, _error, terrno);
×
140
      if (pItem->isPk) {
×
141
        pInfo->numOfPks += 1;
×
142
        pInfo->pkCol.type = pItem->dataType.type;    // only record one primary key
×
143
        pInfo->pkCol.bytes = pItem->dataType.bytes;  // only record one primary key
×
144
        pInfo->pkCol.pk = 1;
×
145
      }
146
    }
147
  }
148

149
  SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
12✔
150
  QUERY_CHECK_NULL(pCidList, code, lino, _error, terrno);
12✔
151

152
  pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
12✔
153
  QUERY_CHECK_NULL(pInfo->pFuncTypeList, code, lino, _error, terrno);
12✔
154

155
  void* tmp = taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
12✔
156
  if (!tmp && taosArrayGetSize(pScanNode->pFuncTypes) > 0) {
12✔
157
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
×
158
  }
159

160
  for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
80✔
161
    SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
68✔
162
    QUERY_CHECK_NULL(pColInfo, code, lino, _error, terrno);
68✔
163

164
    void*          tmp = taosArrayPush(pCidList, &pColInfo->colId);
68✔
165
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
68✔
166
    if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
68✔
167
      void* pFuncType = taosArrayGet(pInfo->pFuncTypeList, i);
×
168
      QUERY_CHECK_NULL(pFuncType, code, lino, _error, terrno);
×
169
      pColInfo->funcType = *(int32_t*)pFuncType;
×
170
    }
171
  }
172
  pInfo->pCidList = pCidList;
12✔
173

174
  code = removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
12✔
175
  QUERY_CHECK_CODE(code, lino, _error);
12✔
176

177
  code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
12✔
178
  QUERY_CHECK_CODE(code, lino, _error);
12✔
179

180
  int32_t totalTables = 0;
12✔
181
  code = tableListGetSize(pTableListInfo, &totalTables);
12✔
182
  QUERY_CHECK_CODE(code, lino, _error);
12✔
183

184
  int32_t capacity = 0;
12✔
185

186
  pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
12✔
187
  QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno);
12✔
188

189
  // partition by tbname
190
  if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
17✔
191
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
5✔
192

193
    STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
5✔
194
    if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
5✔
195

196
    uint64_t suid = tableListGetSuid(pTableListInfo);
5✔
197
    code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
5✔
198
                                                    taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds,
5✔
199
                                                    suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
5✔
200
                                                    pScanNode->pFuncTypes, &pInfo->pkCol, pInfo->numOfPks);
201
    QUERY_CHECK_CODE(code, lino, _error);
5✔
202

203
    capacity = TMIN(totalTables, 4096);
5✔
204

205
    pInfo->pBufferedRes = NULL;
5✔
206
    code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
5✔
207
    QUERY_CHECK_CODE(code, lino, _error);
5✔
208

209
    code = setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
5✔
210
    QUERY_CHECK_CODE(code, lino, _error);
5✔
211

212
    code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
5✔
213
    QUERY_CHECK_CODE(code, lino, _error);
5✔
214
  } else {  // by tags
215
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
7✔
216
    capacity = 1;  // only one row output
7✔
217
    code = setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
7✔
218
    QUERY_CHECK_CODE(code, lino, _error);
7✔
219
  }
220

221
  initResultSizeInfo(&pOperator->resultInfo, capacity);
12✔
222
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
12✔
223
  QUERY_CHECK_CODE(code, lino, _error);
12✔
224

225
  if (pScanNode->scan.pScanPseudoCols != NULL) {
12✔
226
    SExprSupp* p = &pInfo->pseudoExprSup;
×
227
    code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs);
×
228
    TSDB_CHECK_CODE(code, lino, _error);
×
229

230
    p->pCtx =
×
231
        createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
×
232
    QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno);
×
233
  }
234

235
  setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
12✔
236
                  pInfo, pTaskInfo);
237
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
12✔
238

239
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCacheNext, NULL, destroyCacheScanOperator, optrDefaultBufFn,
12✔
240
                                         NULL, optrDefaultGetNextExtFn, NULL);
241

242
  pOperator->cost.openCost = 0;
12✔
243

244
  *pOptrInfo = pOperator;
12✔
245
  return code;
12✔
246

247
_error:
×
248
  pTaskInfo->code = code;
×
249
  if (code != TSDB_CODE_SUCCESS) {
×
250
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
251
  }
252
  if (pInfo != NULL) {
×
253
    pInfo->pTableList = NULL;
×
254
    destroyCacheScanOperator(pInfo);
×
255
  }
256
  if (pOperator != NULL) {
×
257
    pOperator->info = NULL;
×
258
    destroyOperator(pOperator);
×
259
  }
260
  return code;
×
261
}
262

263
/*
 * "Get next" callback of the cache scan operator.
 *
 * ALL mode (CACHESCAN_RETRIEVE_TYPE_ALL set in retrieveType): rows are fetched in
 * batches into pBufferedRes and returned one row per call through pInfo->pRes.
 * Otherwise: one table group is read per call with a reader that is opened on first
 * use and re-targeted with reuseReader for the following groups; groups that
 * produce no rows are skipped.
 *
 * @param pOperator  the cache scan operator
 * @param ppRes      out: the result block, or NULL once the scan is finished
 * @return TSDB_CODE_SUCCESS. Failures are stored in pTaskInfo->code and raised with
 *         T_LONG_JMP rather than returned (NOTE(review): assumes T_LONG_JMP does not
 *         return - confirm against its definition).
 */
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SCacheRowsScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  STableListInfo*     pTableList = pInfo->pTableList;
  SStoreCacheReader*  pReaderFn = &pInfo->readHandle.api.cacheFn;
  SSDataBlock*        pBufRes = pInfo->pBufferedRes;

  uint64_t suid = tableListGetSuid(pTableList);
  int32_t  size = 0;
  code = tableListGetSize(pTableList, &size);
  QUERY_CHECK_CODE(code, lino, _end);

  // nothing to scan
  if (size == 0) {
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  blockDataCleanup(pInfo->pRes);

  // check if it is a group by tbname
  if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // The current batch is used up and the reader has more to give: fetch the next batch.
    if (pInfo->indexOfBufferedRes >= pBufRes->info.rows && !pInfo->gotAll) {
      blockDataCleanup(pBufRes);
      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, &pInfo->gotAll);
      QUERY_CHECK_CODE(code, lino, _end);

      // check for tag values
      int32_t resultRows = pBufRes->info.rows;

      // the results may be null, if last values are all null
      if (resultRows != 0 && resultRows != taosArrayGetSize(pInfo->pUidList)) {
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code));
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
      pInfo->indexOfBufferedRes = 0;
    }

    SSDataBlock* pRes = pInfo->pRes;

    if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
      // Copy row indexOfBufferedRes of the batch into row 0 of the output block.
      for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
        SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
        QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
        int32_t slotId = pCol->info.slotId;

        SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
        SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);

        if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
          colDataSetNULL(pDst, 0);
        } else {
          if (pSrc->pData) {
            char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
            code = colDataSetVal(pDst, 0, p, false);
            QUERY_CHECK_CODE(code, lino, _end);
          }
        }
      }

      // The uid list is indexed like the batch rows (sizes were checked to match above).
      void* pUid = taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
      QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);

      pRes->info.id.uid = *(tb_uid_t*)pUid;
      pRes->info.rows = 1;
      pRes->info.scanFlag = MAIN_SCAN;

      SExprSupp* pSup = &pInfo->pseudoExprSup;
      code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
                                    pTaskInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
      pInfo->indexOfBufferedRes += 1;
      (*ppRes) = pRes;
      return code;
    } else {
      setOperatorCompleted(pOperator);
      (*ppRes) = NULL;
      return code;
    }
  } else {
    size_t totalGroups = tableListGetOutputGroups(pTableList);

    while (pInfo->currentGroupIndex < totalGroups) {
      if (isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      STableKeyInfo* pList = NULL;
      int32_t        num = 0;

      code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
      QUERY_CHECK_CODE(code, lino, _end);

      if (NULL == pInfo->pLastrowReader) {
        int32_t tmpRes = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
                                               taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList,
                                               pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
                                               pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);

        if (tmpRes != TSDB_CODE_SUCCESS) {
          // Best effort: the group is skipped, but leave a trace instead of dropping it silently.
          qError("%s failed to open cache reader for group %d since %s, skip this group, %s", __func__,
                 pInfo->currentGroupIndex, tstrerror(tmpRes), pTaskInfo->id.str);
          pInfo->currentGroupIndex += 1;
          taosArrayClear(pInfo->pUidList);
          continue;
        }
      } else {
        code = pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pInfo->currentGroupIndex += 1;

      // check for tag values
      if (pInfo->pRes->info.rows > 0) {
        SExprSupp* pSup = &pInfo->pseudoExprSup;

        // The group id is taken from the first table of the group.
        STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0];
        pInfo->pRes->info.id.groupId = pKeyInfo->groupId;

        if (taosArrayGetSize(pInfo->pUidList) > 0) {
          void* pUid = taosArrayGet(pInfo->pUidList, 0);
          QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
          pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
          if (pInfo->pseudoExprSup.numOfExprs > 0) {
            code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                          pInfo->pRes->info.rows, pTaskInfo, NULL);
            QUERY_CHECK_CODE(code, lino, _end);
          }
        }

        (*ppRes) = pInfo->pRes;
        return code;
      }
      // no rows for this group: move on to the next one
    }

    // All groups are consumed. The reader may never have been opened (no group, or every
    // openReader attempt failed), so guard the close the same way destroyCacheScanOperator does.
    if (pInfo->pLastrowReader != NULL && pReaderFn->closeReader != NULL) {
      pReaderFn->closeReader(pInfo->pLastrowReader);
    }
    pInfo->pLastrowReader = NULL;
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
440

441
void destroyCacheScanOperator(void* param) {
12✔
442
  SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
12✔
443
  blockDataDestroy(pInfo->pRes);
12✔
444
  blockDataDestroy(pInfo->pBufferedRes);
12✔
445
  taosMemoryFreeClear(pInfo->pSlotIds);
12✔
446
  taosMemoryFreeClear(pInfo->pDstSlotIds);
12✔
447
  taosArrayDestroy(pInfo->pCidList);
12✔
448
  taosArrayDestroy(pInfo->pFuncTypeList);
12✔
449
  taosArrayDestroy(pInfo->pUidList);
12✔
450
  taosArrayDestroy(pInfo->matchInfo.pList);
12✔
451
  tableListDestroy(pInfo->pTableList);
12✔
452

453
  if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) {
12✔
454
    pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
5✔
455
    pInfo->pLastrowReader = NULL;
5✔
456
  }
457

458
  cleanupExprSupp(&pInfo->pseudoExprSup);
12✔
459
  taosMemoryFreeClear(param);
12✔
460
}
12✔
461

462
/*
 * Build the two per-column lookup arrays used by the cache reader.
 *
 * For the i-th item of pColMatchInfo:
 *   (*pSlotIds)[i]    = index of the column with the same colId in the last schema of
 *                       pTaskInfo->schemaInfos, or -1 when the schema has no such column
 *   (*pDstSlotIds)[i] = the item's dstSlotId
 *
 * @param pColMatchInfo  array of SColMatchItem
 * @param pTaskInfo      task whose schemaInfos supplies the table schema
 * @param pSlotIds       out: newly allocated array; the caller frees it
 * @param pDstSlotIds    out: newly allocated array; the caller frees it
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when no schema is
 *         available; otherwise terrno of the failing call. When a failure happens after
 *         allocation, both outputs are freed and set to NULL.
 */
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
                                      int32_t** pDstSlotIds) {
  // Validate the schema before allocating, so this failure has nothing to release.
  SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);
  if (pSchemaInfo == NULL || pSchemaInfo->sw == NULL) {
    qError("%s failed since no table schema is available, %s", __func__, pTaskInfo->id.str);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  SSchemaWrapper* pWrapper = pSchemaInfo->sw;

  size_t numOfCols = taosArrayGetSize(pColMatchInfo);

  *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pSlotIds == NULL) {
    return terrno;
  }

  *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pDstSlotIds == NULL) {
    int32_t code = terrno;  // capture before the free below can touch terrno
    taosMemoryFreeClear(*pSlotIds);
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
    if (!pColMatch) {
      int32_t code = terrno;
      taosMemoryFreeClear(*pSlotIds);
      taosMemoryFreeClear(*pDstSlotIds);
      return code;
    }

    // -1 marks a column that does not exist in the schema.
    int32_t schemaIdx = -1;
    for (int32_t j = 0; j < pWrapper->nCols; ++j) {
      if (pColMatch->colId == pWrapper->pSchema[j].colId) {
        schemaIdx = j;
        break;
      }
    }

    (*pSlotIds)[i] = schemaIdx;
    (*pDstSlotIds)[i] = pColMatch->dstSlotId;
  }

  return TSDB_CODE_SUCCESS;
}
507

508
/*
 * Drop timestamp columns that are not needed from the column match list.
 *
 * Nothing is done when pScanNode->ignoreNull is false. Otherwise
 * pColMatchInfo->pList is replaced by a new list that keeps an item when
 *   - its output slot is not of type TSDB_DATA_TYPE_TIMESTAMP, or
 *   - its funcType is FUNCTION_TYPE_CACHE_LAST_ROW.
 *
 * On failure the original list is left untouched and the partially built
 * replacement is released.
 *
 * @return TSDB_CODE_SUCCESS or the error recorded by the failing check
 */
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pKept = NULL;

  if (!pScanNode->ignoreNull) {  // no filtering is applied in this case
    return TSDB_CODE_SUCCESS;
  }

  size_t size = taosArrayGetSize(pColMatchInfo->pList);
  pKept = taosArrayInit(size, sizeof(SColMatchItem));
  QUERY_CHECK_NULL(pKept, code, lino, _end, terrno);

  for (int32_t i = 0; i < size; ++i) {
    SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    int32_t    slotId = pColInfo->dstSlotId;
    SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;

    SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
    QUERY_CHECK_NULL(pDesc, code, lino, _end, terrno);

    bool keep = (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) ||
                (FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType);
    if (keep) {
      void* tmp = taosArrayPush(pKept, pColInfo);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  // Success: swap the lists; ownership of pKept moves to pColMatchInfo.
  taosArrayDestroy(pColMatchInfo->pList);
  pColMatchInfo->pList = pKept;
  pKept = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // Release the replacement list that never got installed (NULL if its allocation failed).
    taosArrayDestroy(pKept);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc