• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.89
/source/libs/executor/src/cachescanoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "function.h"
17
#include "os.h"
18
#include "tname.h"
19

20
#include "tdatablock.h"
21
#include "tmsg.h"
22

23
#include "executorInt.h"
24
#include "functionMgt.h"
25
#include "operator.h"
26
#include "querytask.h"
27
#include "tcompare.h"
28
#include "thash.h"
29
#include "ttypes.h"
30

31
#include "storageapi.h"
32

33
// Private state of the cached last/last_row scan operator: created in createCacherowsScanOperator,
// consumed by doScanCacheNext and released by destroyCacheScanOperator.
typedef struct SCacheRowsScanInfo {
34
  SSDataBlock*    pRes;  // block handed back to the caller by doScanCacheNext
35
  SReadHandle     readHandle;  // copy of the caller's read handle; supplies the vnode and the cacheFn reader API
36
  void*           pLastrowReader;  // reader obtained through cacheFn.openReader, released with cacheFn.closeReader
37
  SColMatchInfo   matchInfo;  // filled by extractColMatchInfo; its list is filtered by removeRedundantTsCol
38
  int32_t*        pSlotIds;  // per matched column: index of that column in the table schema, -1 when absent
39
  int32_t*        pDstSlotIds;  // per matched column: its dstSlotId
40
  SExprSupp       pseudoExprSup;  // expressions built from scan.pScanPseudoCols, passed to addTagPseudoColumnData
41
  int32_t         retrieveType;  // CACHESCAN_RETRIEVE_TYPE_ALL or _SINGLE, OR-ed with SCAN_ROW_TYPE(ignoreNull)
42
  int32_t         currentGroupIndex;  // next group to read when retrieveType is CACHESCAN_RETRIEVE_TYPE_SINGLE
43
  SSDataBlock*    pBufferedRes;  // only created for CACHESCAN_RETRIEVE_TYPE_ALL: rows fetched by retrieveRows
44
  SArray*         pUidList;  // table uids filled in by retrieveRows
45
  SArray*         pCidList;  // column ids (int16_t) of the matched columns
46
  int32_t         indexOfBufferedRes;  // index of the next pBufferedRes row to emit
47
  STableListInfo* pTableList;  // table list of this scan; destroyed in destroyCacheScanOperator
48
  SArray*         pFuncTypeList;  // copy (int32_t items) of pScanNode->pFuncTypes
49
  int32_t         numOfPks;  // number of primary key columns detected at creation time
50
  SColumnInfo     pkCol;  // type/bytes of the primary key column (only one is recorded)
51
  bool            gotAll;  // output flag of retrieveRows; doScanCacheNext stops refilling pBufferedRes once it is set
52
} SCacheRowsScanInfo;
53

54
// Forward declarations of the file-local helpers defined further down in this file.
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
55
static void    destroyCacheScanOperator(void* param);
56
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
57
                                      int32_t** pDstSlotIds);
58
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
59

60
// Map a flag (callers pass pScanNode->ignoreNull) to the row flavour of the retrieve type:
// non-zero selects CACHESCAN_RETRIEVE_LAST, zero selects CACHESCAN_RETRIEVE_LAST_ROW.
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
61

62
/**
 * Stamp column ids onto the column descriptors of a cache-read block.
 *
 * The leading block columns are paired positionally with pScan->pTargets: each
 * target that is a QUERY_NODE_COLUMN node copies its colId to the block column
 * at the same position; any other target kind only advances the position.
 * Every remaining block column whose slotId equals the slotId of an entry in
 * pScan->scan.pScanPseudoCols gets colId 0.
 *
 * @param pBlock block whose column descriptors are updated in place
 * @param pScan  last-row scan plan node supplying the targets and pseudo columns
 * @return TSDB_CODE_SUCCESS, or the code taken from terrno when a block column
 *         cannot be fetched
 */
static int32_t setColIdForCacheReadBlock(SSDataBlock* pBlock, SLastRowScanPhysiNode* pScan) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t pos = 0;
  SNode*  pCur = NULL;

  // pass 1: walk the targets, position by position
  FOREACH(pCur, pScan->pTargets) {
    if (QUERY_NODE_COLUMN == nodeType(pCur)) {
      SColumnInfoData* pBlockCol = taosArrayGet(pBlock->pDataBlock, pos);
      QUERY_CHECK_NULL(pBlockCol, code, lino, _end, terrno);
      pBlockCol->info.colId = ((SColumnNode*)pCur)->colId;
    }
    pos += 1;
  }

  // pass 2: the columns after the targets; those matching a pseudo column slot are reset to colId 0
  while (pos < pBlock->pDataBlock->size) {
    SColumnInfoData* pBlockCol = taosArrayGet(pBlock->pDataBlock, pos);
    QUERY_CHECK_NULL(pBlockCol, code, lino, _end, terrno);
    pos += 1;

    if (NULL == pScan->scan.pScanPseudoCols) {
      continue;
    }

    FOREACH(pCur, pScan->scan.pScanPseudoCols) {
      if (((STargetNode*)pCur)->slotId == pBlockCol->info.slotId) {
        pBlockCol->info.colId = 0;
        break;
      }
    }
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
97

98
int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
16,823✔
99
                                    STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
100
                                    SOperatorInfo** pOptrInfo) {
101
  QRY_PARAM_CHECK(pOptrInfo);
16,823!
102

103
  int32_t             code = TSDB_CODE_SUCCESS;
16,823✔
104
  int32_t             lino = 0;
16,823✔
105
  int32_t             numOfCols = 0;
16,823✔
106
  SNodeList*          pScanCols = pScanNode->scan.pScanCols;
16,823✔
107
  SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
16,823✔
108
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
16,837✔
109

110
  if (pInfo == NULL || pOperator == NULL) {
16,836!
111
    code = terrno;
×
112
    goto _error;
×
113
  }
114

115
  pInfo->pTableList = pTableListInfo;
16,838✔
116
  pInfo->readHandle = *readHandle;
16,838✔
117

118
  SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
16,838✔
119
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
16,838✔
120
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
16,846!
121

122
  code = extractColMatchInfo(pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
16,846✔
123
  QUERY_CHECK_CODE(code, lino, _error);
16,838!
124

125
  // todo: the pk information should comes from the physical plan
126
  // pk info may not in pScanCols, so extract primary key from pInfo->matchInfo may failed
127
  SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, 0);
16,838✔
128
  if (pSchemaInfo != NULL) {
16,833!
129
    if (pSchemaInfo->sw->pSchema[1].flags & COL_IS_KEY) {  // is primary key
16,833✔
130
      SSchema* pColSchema = &pSchemaInfo->sw->pSchema[1];
8,815✔
131
      pInfo->numOfPks = 1;
8,815✔
132
      pInfo->pkCol.type = pColSchema->type;
8,815✔
133
      pInfo->pkCol.bytes = pColSchema->bytes;
8,815✔
134
      pInfo->pkCol.pk = 1;
8,815✔
135
    }
136
  } else {
137
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
×
138
      SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
×
139
      QUERY_CHECK_NULL(pItem, code, lino, _error, terrno);
×
140
      if (pItem->isPk) {
×
141
        pInfo->numOfPks += 1;
×
142
        pInfo->pkCol.type = pItem->dataType.type;    // only record one primary key
×
143
        pInfo->pkCol.bytes = pItem->dataType.bytes;  // only record one primary key
×
144
        pInfo->pkCol.pk = 1;
×
145
      }
146
    }
147
  }
148

149
  SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
16,833✔
150
  QUERY_CHECK_NULL(pCidList, code, lino, _error, terrno);
16,828!
151

152
  pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
16,828✔
153
  QUERY_CHECK_NULL(pInfo->pFuncTypeList, code, lino, _error, terrno);
16,844!
154

155
  void* tmp = taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
16,844✔
156
  if (!tmp && taosArrayGetSize(pScanNode->pFuncTypes) > 0) {
16,841!
157
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
×
158
  }
159

160
  for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
43,341✔
161
    SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
26,500✔
162
    QUERY_CHECK_NULL(pColInfo, code, lino, _error, terrno);
26,499!
163

164
    void*          tmp = taosArrayPush(pCidList, &pColInfo->colId);
26,499✔
165
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
26,510!
166
    if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
26,510✔
167
      void* pFuncType = taosArrayGet(pInfo->pFuncTypeList, i);
1,643✔
168
      QUERY_CHECK_NULL(pFuncType, code, lino, _error, terrno);
1,641!
169
      pColInfo->funcType = *(int32_t*)pFuncType;
1,641✔
170
    }
171
  }
172
  pInfo->pCidList = pCidList;
16,841✔
173

174
  code = removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
16,841✔
175
  QUERY_CHECK_CODE(code, lino, _error);
16,839!
176

177
  code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
16,839✔
178
  QUERY_CHECK_CODE(code, lino, _error);
16,836!
179

180
  int32_t totalTables = 0;
16,836✔
181
  code = tableListGetSize(pTableListInfo, &totalTables);
16,836✔
182
  QUERY_CHECK_CODE(code, lino, _error);
16,823!
183

184
  int32_t capacity = 0;
16,823✔
185

186
  pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
16,823✔
187
  QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno);
16,844!
188

189
  // partition by tbname
190
  if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
25,497✔
191
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
8,646✔
192

193
    STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
8,646✔
194
    if (totalTables) QUERY_CHECK_NULL(pList, code, lino, _error, terrno);
8,651!
195

196
    uint64_t suid = tableListGetSuid(pTableListInfo);
8,651✔
197
    code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
8,648✔
198
                                                    taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds,
8,651✔
199
                                                    suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
8,651✔
200
                                                    pScanNode->pFuncTypes, &pInfo->pkCol, pInfo->numOfPks);
201
    QUERY_CHECK_CODE(code, lino, _error);
8,649!
202

203
    capacity = TMIN(totalTables, 4096);
8,649✔
204

205
    pInfo->pBufferedRes = NULL;
8,649✔
206
    code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
8,649✔
207
    QUERY_CHECK_CODE(code, lino, _error);
8,652!
208

209
    code = setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
8,652✔
210
    QUERY_CHECK_CODE(code, lino, _error);
8,647!
211

212
    code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
8,647✔
213
    QUERY_CHECK_CODE(code, lino, _error);
8,653!
214
  } else {  // by tags
215
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
8,185✔
216
    capacity = 1;  // only one row output
8,185✔
217
    code = setColIdForCacheReadBlock(pInfo->pRes, pScanNode);
8,185✔
218
    QUERY_CHECK_CODE(code, lino, _error);
8,184!
219
  }
220

221
  initResultSizeInfo(&pOperator->resultInfo, capacity);
16,837✔
222
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
16,836✔
223
  QUERY_CHECK_CODE(code, lino, _error);
16,842!
224

225
  if (pScanNode->scan.pScanPseudoCols != NULL) {
16,842✔
226
    SExprSupp* p = &pInfo->pseudoExprSup;
4,396✔
227
    code = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->pExprInfo, &p->numOfExprs);
4,396✔
228
    TSDB_CHECK_CODE(code, lino, _error);
4,395!
229

230
    p->pCtx =
4,396✔
231
        createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
4,395✔
232
    QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno);
4,396!
233
  }
234

235
  setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
16,842✔
236
                  pInfo, pTaskInfo);
237
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
16,833✔
238

239
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doScanCacheNext, NULL, destroyCacheScanOperator, optrDefaultBufFn,
16,831✔
240
                                         NULL, optrDefaultGetNextExtFn, NULL);
241

242
  pOperator->cost.openCost = 0;
16,834✔
243

244
  *pOptrInfo = pOperator;
16,834✔
245
  return code;
16,834✔
246

247
_error:
×
248
  pTaskInfo->code = code;
×
249
  if (code != TSDB_CODE_SUCCESS) {
×
250
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
251
  }
252
  if (pInfo != NULL) {
×
253
    pInfo->pTableList = NULL;
×
254
    destroyCacheScanOperator(pInfo);
×
255
  }
256
  if (pOperator != NULL) {
×
257
    pOperator->info = NULL;
×
258
    destroyOperator(pOperator);
×
259
  }
260
  return code;
×
261
}
262

263
/**
 * Produce the next result block of the cached row scan.
 *
 * CACHESCAN_RETRIEVE_TYPE_ALL: rows are fetched in bulk into pInfo->pBufferedRes through
 * retrieveRows and then emitted one row per call, with the tag/pseudo columns and the group id
 * of the owning table filled in.
 * Otherwise: one group is read per loop iteration (the reader is opened on first use and reused
 * afterwards); the first group that yields rows is returned. When the groups are exhausted the
 * reader is closed and the operator is marked completed.
 *
 * @param pOperator the cache scan operator
 * @param ppRes     [out] the result block, or NULL when there is nothing (more) to return
 * @return TSDB_CODE_SUCCESS on the normal paths. A failing step stores its code in
 *         pTaskInfo->code and leaves through T_LONG_JMP(pTaskInfo->env, ...).
 */
static int32_t doScanCacheNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SCacheRowsScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  STableListInfo*     pTableList = pInfo->pTableList;
  SStoreCacheReader*  pReaderFn = &pInfo->readHandle.api.cacheFn;
  SSDataBlock*        pBufRes = pInfo->pBufferedRes;

  uint64_t suid = tableListGetSuid(pTableList);
  int32_t  size = 0;
  code = tableListGetSize(pTableList, &size);
  QUERY_CHECK_CODE(code, lino, _end);

  if (size == 0) {
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  blockDataCleanup(pInfo->pRes);

  // check if it is a group by tbname
  if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // refill the buffer once all buffered rows were emitted, unless the reader reported it is done
    if (pInfo->indexOfBufferedRes >= pBufRes->info.rows && !pInfo->gotAll) {
      blockDataCleanup(pBufRes);
      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, &pInfo->gotAll);
      QUERY_CHECK_CODE(code, lino, _end);

      // check for tag values
      int32_t resultRows = pBufRes->info.rows;

      // the results may be null, if last values are all null
      // otherwise every buffered row must come with exactly one uid
      if (resultRows != 0 && resultRows != taosArrayGetSize(pInfo->pUidList)) {
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code));
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
      pInfo->indexOfBufferedRes = 0;
    }

    SSDataBlock* pRes = pInfo->pRes;

    if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
      // copy the current buffered row into row 0 of the output block, column by column
      for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
        SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
        QUERY_CHECK_NULL(pCol, code, lino, _end, terrno);
        int32_t slotId = pCol->info.slotId;

        SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
        SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);

        if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
          colDataSetNULL(pDst, 0);
        } else if (pSrc->pData) {
          char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
          code = colDataSetVal(pDst, 0, p, false);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }

      void* pUid = taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
      QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);

      pRes->info.id.uid = *(tb_uid_t*)pUid;
      pRes->info.rows = 1;
      pRes->info.scanFlag = MAIN_SCAN;

      SExprSupp* pSup = &pInfo->pseudoExprSup;
      code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
                                    pTaskInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pRes->info.id.groupId = tableListGetTableGroupId(pTableList, pRes->info.id.uid);
      pInfo->indexOfBufferedRes += 1;
      (*ppRes) = pRes;
      return code;
    } else {
      setOperatorCompleted(pOperator);
      (*ppRes) = NULL;
      return code;
    }
  } else {
    size_t totalGroups = tableListGetOutputGroups(pTableList);

    while (pInfo->currentGroupIndex < totalGroups) {
      if (isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      STableKeyInfo* pList = NULL;
      int32_t        num = 0;

      code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
      QUERY_CHECK_CODE(code, lino, _end);

      if (NULL == pInfo->pLastrowReader) {
        int32_t tmpRes = pReaderFn->openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
                                               taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList,
                                               pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str,
                                               pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);

        if (tmpRes != TSDB_CODE_SUCCESS) {
          // best effort: the group is skipped, but the failure is at least recorded in the log
          qError("%s failed to open cache reader for group %d since %s, skip this group", pTaskInfo->id.str,
                 pInfo->currentGroupIndex, tstrerror(tmpRes));
          pInfo->currentGroupIndex += 1;
          taosArrayClear(pInfo->pUidList);
          continue;
        }
      } else {
        code = pReaderFn->reuseReader(pInfo->pLastrowReader, pList, num);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      taosArrayClear(pInfo->pUidList);

      code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
                                     pInfo->pUidList, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      pInfo->currentGroupIndex += 1;

      // check for tag values
      if (pInfo->pRes->info.rows > 0) {
        if (pInfo->pseudoExprSup.numOfExprs > 0) {
          SExprSupp* pSup = &pInfo->pseudoExprSup;

          // the group id is taken from the first table of the group list
          STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pList)[0];
          pInfo->pRes->info.id.groupId = pKeyInfo->groupId;

          if (taosArrayGetSize(pInfo->pUidList) > 0) {
            void* pUid = taosArrayGet(pInfo->pUidList, 0);
            QUERY_CHECK_NULL(pUid, code, lino, _end, terrno);
            pInfo->pRes->info.id.uid = *(tb_uid_t*)pUid;
            code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                          pInfo->pRes->info.rows, pTaskInfo, NULL);
            QUERY_CHECK_CODE(code, lino, _end);
          }
        }

        (*ppRes) = pInfo->pRes;
        return code;
      }
    }

    // all groups consumed; the reader may never have been opened (e.g. every open attempt failed),
    // so guard the close the same way destroyCacheScanOperator does
    if (pInfo->pLastrowReader != NULL) {
      pReaderFn->closeReader(pInfo->pLastrowReader);
      pInfo->pLastrowReader = NULL;
    }
    setOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
440

441
void destroyCacheScanOperator(void* param) {
16,845✔
442
  SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
16,845✔
443
  blockDataDestroy(pInfo->pRes);
16,845✔
444
  blockDataDestroy(pInfo->pBufferedRes);
16,846✔
445
  taosMemoryFreeClear(pInfo->pSlotIds);
16,846!
446
  taosMemoryFreeClear(pInfo->pDstSlotIds);
16,847!
447
  taosArrayDestroy(pInfo->pCidList);
16,848✔
448
  taosArrayDestroy(pInfo->pFuncTypeList);
16,847✔
449
  taosArrayDestroy(pInfo->pUidList);
16,848✔
450
  taosArrayDestroy(pInfo->matchInfo.pList);
16,849✔
451
  tableListDestroy(pInfo->pTableList);
16,849✔
452

453
  if (pInfo->pLastrowReader != NULL && pInfo->readHandle.api.cacheFn.closeReader != NULL) {
16,849!
454
    pInfo->readHandle.api.cacheFn.closeReader(pInfo->pLastrowReader);
8,655✔
455
    pInfo->pLastrowReader = NULL;
8,655✔
456
  }
457

458
  cleanupExprSupp(&pInfo->pseudoExprSup);
16,849✔
459
  taosMemoryFreeClear(param);
16,848!
460
}
16,849✔
461

462
/**
 * Build the two slot id arrays used by the cache reader.
 *
 * For the i-th item of pColMatchInfo:
 *   (*pSlotIds)[i]    is the index of the column with the same colId in the schema of the last
 *                     entry of pTaskInfo->schemaInfos, or -1 when the schema has no such column;
 *   (*pDstSlotIds)[i] is the item's dstSlotId.
 *
 * @param pColMatchInfo array of SColMatchItem
 * @param pTaskInfo     task providing schemaInfos
 * @param pSlotIds      [out] newly allocated array, one entry per item; the caller frees it
 * @param pDstSlotIds   [out] newly allocated array, one entry per item; the caller frees it
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when no schema is available;
 *         otherwise the terrno value of the failing allocation/array access. On failure no
 *         array allocated by this call is left behind.
 */
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
                               int32_t** pDstSlotIds) {
  // validate the schema before allocating anything: it is dereferenced for every column below
  SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);
  if (pSchemaInfo == NULL || pSchemaInfo->sw == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }
  SSchemaWrapper* pWrapper = pSchemaInfo->sw;

  size_t numOfCols = taosArrayGetSize(pColMatchInfo);

  *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pSlotIds == NULL) {
    return terrno;
  }

  *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
  if (*pDstSlotIds == NULL) {
    int32_t code = terrno;  // read the error before any further library call
    taosMemoryFreeClear(*pSlotIds);
    return code;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
    if (pColMatch == NULL) {
      int32_t code = terrno;
      taosMemoryFreeClear(*pSlotIds);
      taosMemoryFreeClear(*pDstSlotIds);
      return code;
    }

    // -1 marks a column id that is not present in the schema
    int32_t srcSlot = -1;
    for (int32_t j = 0; j < pWrapper->nCols; ++j) {
      if (pColMatch->colId == pWrapper->pSchema[j].colId) {
        srcSlot = j;
        break;
      }
    }

    (*pSlotIds)[i] = srcSlot;
    (*pDstSlotIds)[i] = pColMatch->dstSlotId;
  }

  return TSDB_CODE_SUCCESS;
}
507

508
/**
 * Drop timestamp columns that are not needed from the column match list.
 *
 * Nothing is changed when pScanNode->ignoreNull is unset (SCAN_ROW_TYPE maps that case to
 * CACHESCAN_RETRIEVE_LAST_ROW). Otherwise a new list is built that keeps an item when its output
 * slot is not of type TSDB_DATA_TYPE_TIMESTAMP, or when the item's funcType is
 * FUNCTION_TYPE_CACHE_LAST_ROW; the new list then replaces pColMatchInfo->pList.
 *
 * @param pScanNode     scan plan node (ignoreNull flag and output slot descriptions)
 * @param pColMatchInfo match info whose pList is replaced on success and left untouched on failure
 * @return TSDB_CODE_SUCCESS or the code taken from terrno by the failing step
 */
int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (!pScanNode->ignoreNull) {
    return TSDB_CODE_SUCCESS;
  }

  size_t  size = taosArrayGetSize(pColMatchInfo->pList);
  SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
  QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, terrno);

  for (int32_t i = 0; i < size; ++i) {
    SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    int32_t    slotId = pColInfo->dstSlotId;
    SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;

    SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
    QUERY_CHECK_NULL(pDesc, code, lino, _end, terrno);

    // a timestamp column survives only when it is read through the cache last_row function
    bool keep = (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) ||
                (FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType);
    if (keep) {
      void* tmp = taosArrayPush(pMatchInfo, pColInfo);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

  taosArrayDestroy(pColMatchInfo->pList);
  pColMatchInfo->pList = pMatchInfo;
  pMatchInfo = NULL;  // now owned by pColMatchInfo

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // the half-built list was never handed over: release it instead of leaking it
    taosArrayDestroy(pMatchInfo);
  }
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc