• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4308

14 Jun 2025 02:06PM UTC coverage: 62.454% (-0.3%) from 62.777%
#4308

push

travis-ci

web-flow
fix: taosdump windows pthread_mutex_unlock crash(3.0) (#31357)

* fix: windows pthread_mutex_unlock crash

* enh: sync from main fix taosdump crash windows

* fix: restore .github action branch to main

153985 of 315105 branches covered (48.87%)

Branch coverage included in aggregate %.

238120 of 312727 relevant lines covered (76.14%)

6462519.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.76
/source/libs/executor/src/cachescanoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "os.h"
18
#include "tname.h"
19

20
#include "tdatablock.h"
21
#include "tmsg.h"
22

23
#include "executorInt.h"
24
#include "functionMgt.h"
25
#include "operator.h"
26
#include "querytask.h"
27
#include "tcompare.h"
28
#include "thash.h"
29
#include "ttypes.h"
30

31
#include "storageapi.h"
32

33
typedef struct SCacheRowsScanInfo {
34
  SSDataBlock*    pRes;
35
  SReadHandle     readHandle;
36
  void*           pLastrowReader;
37
  SColMatchInfo   matchInfo;
38
  int32_t*        pSlotIds;
39
  int32_t*        pDstSlotIds;
40
  SExprSupp       pseudoExprSup;
41
  int32_t         retrieveType;
42
  int32_t         currentGroupIndex;
43
  SSDataBlock*    pBufferedRes;
44
  SArray*         pUidList;
45
  SArray*         pCidList;
46
  int32_t         indexOfBufferedRes;
47
  STableListInfo* pTableList;
48
  SArray*         pFuncTypeList;
49
  int32_t         numOfPks;
50
  SColumnInfo     pkCol;
51
  bool            gotAll;
52
} SCacheRowsScanInfo;
53

54
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
55
static void    destroyCacheScanOperator(void* param);
56
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
57
                                      int32_t** pDstSlotIds);
58
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
59

60
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
61

62
/*
 * Stamp column ids onto the columns of pBlock.
 *
 * Walks pScan->pTargets in order; the i-th target corresponds to the i-th column of the block, and
 * every target that is a plain column node copies its colId into that column. Any columns of the
 * block beyond the target list get colId 0 when their slot id matches one of scan.pScanPseudoCols.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a column cannot be fetched from the block.
 */
static int32_t setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t slot = 0;
  SNode*  pCur = NULL;

  // targets and block columns are walked in lock-step
  FOREACH(pCur, pScan->pTargets) {
    if (QUERY_NODE_COLUMN == nodeType(pCur)) {
      SColumnNode*     pColNode = (SColumnNode*)pCur;
      SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, slot);
      QUERY_CHECK_NULL(pDstCol, code, lino, _end, terrno);
      pDstCol->info.colId = pColNode->colId;
    }
    slot++;
  }

  // remaining columns: reset the col id of those backed by a pseudo column
  for (; slot < pBlock->pDataBlock->size; ++slot) {
    SColumnInfoData* pExtraCol = taosArrayGet(pBlock->pDataBlock, slot);
    QUERY_CHECK_NULL(pExtraCol, code, lino, _end, terrno);

    if (!pScan->scan.pScanPseudoCols) {
      continue;
    }

    FOREACH(pCur, pScan->scan.pScanPseudoCols) {
      STargetNode* pPseudo = (STargetNode*)pCur;
      if (pPseudo->slotId == pExtraCol->info.slotId) {
        pExtraCol->info.colId = 0;
        break;
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
97

98
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
6,110✔
99
                                    STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
100
                                    SOperatorInfo** pOptrInfo) {
101
  QRY_PARAM_CHECK(pOptrInfo);
6,110!
102

103
  int32_t             code = TSDB_CODE_SUCCESS;
6,110✔
104
  int32_t             lino = 0;
6,110✔
105
  int32_t             numOfCols = 0;
6,110✔
106
  SNodeList*          pScanCols = pScanNode->scan.pScanCols;
6,110✔
107
  SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
6,110!
108
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
6,121!
109

110
  if (pInfo == NULL || pOperator == NULL) {
6,122!
111
    code = terrno;
×
112
    goto _error;
×
113
  }
114

115
  pInfo->pTableList = pTableListInfo;
6,122✔
116
  pInfo->readHandle = *readHandle;
6,122✔
117

118
  SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
6,122✔
119
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
6,122✔
120
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
6,125!
121

122
  code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
6,125✔
123
  QUERY_CHECK_CODE(code, lino, _error);
6,120!
124

125
  // todo: the pk information should comes from the physical plan
126
  // pk info may not in pScanCols, so extract primary key from pInfo->matchInfo may failed
127
  SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, 0);
6,120✔
128
  if (pSchemaInfo != NULL) {
6,119!
129
    if (pSchemaInfo->sw->pSchema[1].flags & COL_IS_KEY) {  // is primary key
6,119✔
130
      SSchema* pColSchema = &pSchemaInfo->sw->pSchema[1];
2,050✔
131
      pInfo->numOfPks = 1;
2,050✔
132
      pInfo->pkCol.type = pColSchema->type;
2,050✔
133
      pInfo->pkCol.bytes = pColSchema->bytes;
2,050✔
134
      pInfo->pkCol.pk = 1;
2,050✔
135
    }
136
  } else {
137
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
×
138
      SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
×
139
      QUERY_CHECK_NULL(pItem, code, lino, _error, terrno);
×
140
      if (pItem->isPk) {
×
141
        pInfo->numOfPks += 1;
×
142
        pInfo->pkCol.type = pItem->dataType.type;    // only record one primary key
×
143
        pInfo->pkCol.bytes = pItem->dataType.bytes;  // only record one primary key
×
144
        pInfo->pkCol.pk = 1;
×
145
      }
146
    }
147
  }
148

149
  SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
6,119✔
150
  QUERY_CHECK_NULL(pCidList, code, lino, _error, terrno);
6,126!
151

152
  pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
6,126✔
153
  QUERY_CHECK_NULL(pInfo->pFuncTypeList, code, lino, _error, terrno);
6,128!
154

155
  void* tmp = taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
6,128✔
156
  if (!tmp && taosArrayGetSize(pScanNode->pFuncTypes) > 0) {
6,124!
157
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
×
158
  }
159

160
  for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
18,951✔
161
    SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
12,822✔
162
    QUERY_CHECK_NULL(pColInfo, code, lino, _error, terrno);
12,829!
163

164
    void*          tmp = taosArrayPush(pCidList, &pColInfo->colId);
12,829✔
165
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
12,832!
166
    if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
12,832✔
167
      void* pFuncType = taosArrayGet(pInfo->pFuncTypeList, i);
1,644✔
168
      QUERY_CHECK_NULL(pFuncType, code, lino, _error, terrno);
1,641✔
169
      pColInfo->funcType = *(int32_t*)pFuncType;
1,640✔
170
    }
171
  }
172
  pInfo->pCidList = pCidList;
6,129✔
173

174
  code = removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
6,129✔
175
  QUERY_CHECK_CODE(code, lino, _error);
6,117!
176

177
  code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
6,117✔
178
  QUERY_CHECK_CODE(code, lino, _error);
6,122!
179

180
  int32_t totalTables = 0;
6,122✔
181
  code = tableListGetSize(pTableListInfo, &totalTables);
6,122✔
182
  QUERY_CHECK_CODE(code, lino, _error);
6,121!
183

184
  int32_t capacity = 0;
6,121✔
185

186
  pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
6,121✔
187
  QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno);
6,124!
188

189
  // partition by tbname
190
  if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
9,661✔
191
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
3,537✔
192

193
    STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
3,537✔
194
    if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
3,540!
195

196
    uint64_t suid = tableListGetSuid(pTableListInfo);
3,540✔
197
    code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
3,540✔
198
                                                    taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds,
3,539✔
199
                                                    suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
3,539✔
200
                                                    pScanNode->pFuncTypes, &pInfo->pkCol, pInfo->numOfPks);
201
    QUERY_CHECK_CODE(code, lino, _error);
3,538!
202

203
    capacity = TMIN(totalTables, 4096);
3,538✔
204

205
    pInfo->pBufferedRes = NULL;
3,538✔
206
    code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
3,538✔
207
    QUERY_CHECK_CODE(code, lino, _error);
3,540!
208

209
    code = setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
3,540✔
210
    QUERY_CHECK_CODE(code, lino, _error);
3,539!
211

212
    code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
3,539✔
213
    QUERY_CHECK_CODE(code, lino, _error);
3,537!
214
  } else {  // by tags
215
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
2,584✔
216
    capacity = 1;  // only one row output
2,584✔
217
    code = setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
2,584✔
218
    QUERY_CHECK_CODE(code, lino, _error);
2,586!
219
  }
220

221
  initResultSizeInfo(&pOperator->resultInfo, capacity);
6,123✔
222
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
6,123✔
223
  QUERY_CHECK_CODE(code, lino, _error);
6,126!
224

225
  if (pScanNode->scan.pScanPseudoCols != NULL) {
6,126✔
226
    SExprSupp* p = &pInfo->pseudoExprSup;
1,062✔
227
    code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs);
1,062✔
228
    TSDB_CHECK_CODE(code, lino, _error);
1,063!
229

230
    p->pCtx =
1,063✔
231
        createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
1,063✔
232
    QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno);
1,063!
233
  }
234

235
  setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
6,127✔
236
                  pInfo, pTaskInfo);
237
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
6,126✔
238

239
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCacheNext, NULL, destroyCacheScanOperator, optrDefaultBufFn,
6,128✔
240
                                         NULL, optrDefaultGetNextExtFn, NULL);
241

242
  pOperator->cost.openCost = 0;
6,127✔
243

244
  *pOptrInfo = pOperator;
6,127✔
245
  return code;
6,127✔
246

247
_error:
×
248
  pTaskInfo->code = code;
×
249
  if (code != TSDB_CODE_SUCCESS) {
×
250
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
251
  }
252
  if (pInfo != NULL) {
×
253
    pInfo->pTableList = NULL;
×
254
    destroyCacheScanOperator(pInfo);
×
255
  }
256
  if (pOperator != NULL) {
×
257
    pOperator->info = NULL;
×
258
    destroyOperator(pOperator);
×
259
  }
260
  return code;
×
261
}
262

263
/*
 * Produce the next result block of the cache scan.
 *
 * Two modes, selected by pInfo->retrieveType:
 *  - CACHESCAN_RETRIEVE_TYPE_ALL: rows are fetched in bulk into pBufferedRes and handed out one
 *    row per call through pRes.
 *  - otherwise: one group is read per loop iteration and its block is returned as soon as it
 *    contains rows.
 *
 * @param pOperator  the cache scan operator
 * @param ppRes      out: the result block, or NULL when the scan is finished / has nothing to return
 * @return TSDB_CODE_SUCCESS on the normal paths. Failures do not return: the error is stored in
 *         pTaskInfo->code and control leaves through T_LONG_JMP.
 */
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SCacheRowsScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  STableListInfo*     pTableList = pInfo->pTableList;
  SStoreCacheReader*  pReaderFn = &pInfo->readHandle.api.cacheFn;
  SSDataBlock*        pBufRes = pInfo->pBufferedRes;

  uint64_t suid = tableListGetSuid(pTableList);
  int32_t  size = 0;
  code = tableListGetSize(pTableList, &size);
  QUERY_CHECK_CODE(code, lino, _end);

  // nothing to scan
  if (size == 0) {
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  blockDataCleanup(pInfo->pRes);

  // check if it is a group by tbname
  if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // refill the buffer once it is drained, unless the reader already reported everything
    if (pInfo->indexOfBufferedRes >= pBufRes->info.rows && !pInfo->gotAll) {
      blockDataCleanup(pBufRes);
      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, &pInfo->gotAll);
      QUERY_CHECK_CODE(code, lino, _end);

      // check for tag values
      int32_t resultRows = pBufRes->info.rows;

      // the results may be null, if last values are all null
      if (resultRows != 0 && resultRows != taosArrayGetSize(pInfo->pUidList)) {
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code));
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
      pInfo->indexOfBufferedRes = 0;
    }

    SSDataBlock* pRes = pInfo->pRes;

    if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
      // copy row indexOfBufferedRes of the buffer into row 0 of the result block
      for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
        SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
        QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
        int32_t slotId = pCol->info.slotId;

        SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
        SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);

        if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
          colDataSetNULL(pDst, 0);
        } else {
          if (pSrc->pData) {
            char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
            code = colDataSetVal(pDst, 0, p, false);
            QUERY_CHECK_CODE(code, lino, _end);
          }
        }
      }

      void* pUid = taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
      QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);

      pRes->info.id.uid = *(tb_uid_t*)pUid;
      pRes->info.rows = 1;
      pRes->info.scanFlag = MAIN_SCAN;

      SExprSupp* pSup = &pInfo->pseudoExprSup;
      code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
                                    pTaskInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
      pInfo->indexOfBufferedRes += 1;
      (*ppRes) = pRes;
      return code;
    } else {
      setOperatorCompleted(pOperator);
      (*ppRes) = NULL;
      return code;
    }
  } else {
    size_t totalGroups = tableListGetOutputGroups(pTableList);

    while (pInfo->currentGroupIndex < totalGroups) {
      if (isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      STableKeyInfo* pList = NULL;
      int32_t        num = 0;

      code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
      QUERY_CHECK_CODE(code, lino, _end);

      // the reader is opened for the first group and re-targeted for the following ones
      if (NULL == pInfo->pLastrowReader) {
        int32_t tmpRes = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
                                               taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList,
                                               pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
                                               pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);

        // a group whose reader cannot be opened is skipped rather than failing the query
        if (tmpRes != TSDB_CODE_SUCCESS) {
          pInfo->currentGroupIndex += 1;
          taosArrayClear(pInfo->pUidList);
          continue;
        }
      } else {
        code = pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pInfo->currentGroupIndex += 1;

      // check for tag values
      if (pInfo->pRes->info.rows > 0) {
        SExprSupp* pSup = &pInfo->pseudoExprSup;

        // the group id is taken from the first table of the group
        STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0];
        pInfo->pRes->info.id.groupId = pKeyInfo->groupId;

        if (taosArrayGetSize(pInfo->pUidList) > 0) {
          void* pUid = taosArrayGet(pInfo->pUidList, 0);
          QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
          pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
          if (pInfo->pseudoExprSup.numOfExprs > 0) {
            code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                          pInfo->pRes->info.rows, pTaskInfo, NULL);
            QUERY_CHECK_CODE(code, lino, _end);
          }
        }

        (*ppRes) = pInfo->pRes;
        return code;
      }
      // no rows for this group: move on to the next one
    }

    // all groups consumed; the reader may never have been opened if every openReader call failed
    if (pInfo->pLastrowReader != NULL) {
      pReaderFn->closeReader(pInfo->pLastrowReader);
      pInfo->pLastrowReader = NULL;
    }
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
440

441
void destroyCacheScanOperator(void* param) {
6,120✔
442
  SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
6,120✔
443
  blockDataDestroy(pInfo->pRes);
6,120✔
444
  blockDataDestroy(pInfo->pBufferedRes);
6,129✔
445
  taosMemoryFreeClear(pInfo->pSlotIds);
6,128!
446
  taosMemoryFreeClear(pInfo->pDstSlotIds);
6,129!
447
  taosArrayDestroy(pInfo->pCidList);
6,130✔
448
  taosArrayDestroy(pInfo->pFuncTypeList);
6,129✔
449
  taosArrayDestroy(pInfo->pUidList);
6,129✔
450
  taosArrayDestroy(pInfo->matchInfo.pList);
6,128✔
451
  tableListDestroy(pInfo->pTableList);
6,129✔
452

453
  if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) {
6,129!
454
    pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
3,540✔
455
    pInfo->pLastrowReader = NULL;
3,540✔
456
  }
457

458
  cleanupExprSupp(&pInfo->pseudoExprSup);
6,129✔
459
  taosMemoryFreeClear(param);
6,129!
460
}
6,129✔
461

462
/*
 * Build the two slot-id arrays used by the cache reader, one entry per matched column.
 *
 * (*pSlotIds)[i]    : index of the column inside the last schema of pTaskInfo->schemaInfos,
 *                     or -1 when the column id is not part of that schema.
 * (*pDstSlotIds)[i] : dstSlotId of the i-th match item.
 *
 * @param pColMatchInfo  array of SColMatchItem
 * @param pTaskInfo      task whose schemaInfos supplies the table schema
 * @param pSlotIds       out: newly allocated array, owned by the caller on success
 * @param pDstSlotIds    out: newly allocated array, owned by the caller on success
 * @return TSDB_CODE_SUCCESS, or an error code; on error both outputs are freed and set to NULL.
 */
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
                                      int32_t** pDstSlotIds) {
  int32_t         code = TSDB_CODE_SUCCESS;
  size_t          numOfCols = taosArrayGetSize(pColMatchInfo);
  SSchemaInfo*    pSchemaInfo = NULL;
  SSchemaWrapper* pWrapper = NULL;

  *pDstSlotIds = NULL;
  *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pSlotIds == NULL) {
    return terrno;
  }

  *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pDstSlotIds == NULL) {
    code = terrno;  // capture before the cleanup below can touch terrno
    goto _error;
  }

  pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);
  if (pSchemaInfo == NULL || pSchemaInfo->sw == NULL) {
    // without a schema there is nothing to map the column ids against
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    goto _error;
  }
  pWrapper = pSchemaInfo->sw;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
    if (!pColMatch) {
      code = terrno;
      goto _error;
    }

    // linear search of the schema for this column id; -1 marks "not found"
    int32_t schemaIdx = -1;
    for (int32_t j = 0; j < pWrapper->nCols; ++j) {
      if (pColMatch->colId == pWrapper->pSchema[j].colId) {
        schemaIdx = j;
        break;
      }
    }

    (*pSlotIds)[i] = schemaIdx;
    (*pDstSlotIds)[i] = pColMatch->dstSlotId;
  }

  return TSDB_CODE_SUCCESS;

_error:
  taosMemoryFreeClear(*pSlotIds);
  taosMemoryFreeClear(*pDstSlotIds);
  return code;
}
507

508
/*
 * Drop timestamp columns that are not needed from the column match list.
 *
 * Nothing is done when pScanNode->ignoreNull is false. Otherwise a new list is built that keeps
 * every item whose output slot is not of type TIMESTAMP, plus timestamp items whose funcType is
 * FUNCTION_TYPE_CACHE_LAST_ROW; the new list then replaces pColMatchInfo->pList.
 *
 * @param pScanNode      plan node supplying ignoreNull and the output slot descriptors
 * @param pColMatchInfo  in/out: match info whose pList is replaced on success
 * @return TSDB_CODE_SUCCESS, or an error code; on error pColMatchInfo->pList is left untouched
 *         and the partially built list is released.
 */
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (!pScanNode->ignoreNull) {  // every matched column is kept as is
    return TSDB_CODE_SUCCESS;
  }

  size_t     size = taosArrayGetSize(pColMatchInfo->pList);
  SNodeList* pSlots = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;
  SArray*    pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
  QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, terrno);

  for (int32_t i = 0; i < size; ++i) {
    SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pSlots, pColInfo->dstSlotId);
    QUERY_CHECK_NULL(pDesc, code, lino, _end, terrno);

    // timestamp slots survive only when they carry a cached last_row function
    if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP || FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType) {
      void* tmp = taosArrayPush(pMatchInfo, pColInfo);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  // success: swap in the filtered list and give up local ownership
  taosArrayDestroy(pColMatchInfo->pList);
  pColMatchInfo->pList = pMatchInfo;
  pMatchInfo = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // the filtered list was never installed; release it so it does not leak
    taosArrayDestroy(pMatchInfo);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc