• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamscanoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "streamexecutorInt.h"
23
#include "streamsession.h"
24
#include "systable.h"
25
#include "tname.h"
26

27
#include "tdatablock.h"
28
#include "tmsg.h"
29
#include "ttime.h"
30

31
#include "operator.h"
32
#include "query.h"
33
#include "querytask.h"
34
#include "tcompare.h"
35
#include "thash.h"
36
#include "ttypes.h"
37

38
#include "storageapi.h"
39
#include "wal.h"
40

41
// String identifiers of the stream data scan operator.
// NOTE(review): the first three are not referenced in this part of the file; their exact use should be
// confirmed at the call sites.
#define STREAM_DATA_SCAN_OP_NAME            "StreamDataScanOperator"
42
#define STREAM_DATA_SCAN_OP_STATE_NAME      "StreamDataScanFillHistoryState"
43
#define STREAM_DATA_SCAN_OP_CHECKPOINT_NAME "StreamDataScanOperator_Checkpoint"
44
// State-store key under which the recalculate snapshot id is saved, looked up and deleted.
#define STREAM_DATA_SCAN_OP_REC_ID_NAME     "StreamDataScanOperator_Recalculate_ID"
45

UNCOV
46
/*
 * Hand the newest key of `pBlock` to the ts-data state store and return whatever the store reports back.
 *
 * The store callback streamStateGetAndSetTsData() receives the block's table uid, the block's window end
 * key and, when the source has a primary-key column, the pk value and pk length taken from the block's
 * last row. pCurTs, ppPkVal and pWinCode are forwarded to the callback untouched.
 * NOTE(review): presumably they receive the previously recorded max ts / pk and a "found" code - confirm
 * against the state-store implementation.
 *
 * Returns the callback's result code; a failure is logged together with the failing line.
 */
static int32_t getMaxTsKeyInfo(SStreamScanInfo* pInfo, SSDataBlock* pBlock, TSKEY* pCurTs, void** ppPkVal,
                               int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pTailPk = NULL;
  int32_t tailPkLen = 0;

  if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
    // Only the last row is inspected: it supplies the pk that goes with the block's end key.
    SColumnInfoData* pPkCol = taosArrayGet(pBlock->pDataBlock, pInfo->basic.primaryPkIndex);
    pTailPk = colDataGetData(pPkCol, pBlock->info.rows - 1);
    tailPkLen = colDataGetRowLength(pPkCol, pBlock->info.rows - 1);
  }

  code = pInfo->stateStore.streamStateGetAndSetTsData(pInfo->basic.pTsDataState, pBlock->info.id.uid, pCurTs,
                                                      ppPkVal, pBlock->info.window.ekey, pTailPk, tailPkLen,
                                                      pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
68

UNCOV
69
int32_t copyRecDataToBuff(TSKEY calStart, TSKEY calEnd, uint64_t uid, uint64_t version, EStreamType mode,
×
70
                          const SColumnInfoData* pPkColDataInfo, int32_t rowId, SRecDataInfo* pValueBuff,
71
                          int32_t buffLen) {
UNCOV
72
  pValueBuff->calWin.skey = calStart;
×
UNCOV
73
  pValueBuff->calWin.ekey = calEnd;
×
UNCOV
74
  pValueBuff->tableUid = uid;
×
UNCOV
75
  pValueBuff->dataVersion = version;
×
UNCOV
76
  pValueBuff->mode = mode;
×
77

UNCOV
78
  int32_t pkLen = 0;
×
UNCOV
79
  if (pPkColDataInfo != NULL) {
×
80
    pkLen = colDataGetRowLength(pPkColDataInfo, rowId);
×
81
    memcpy(pValueBuff->pPkColData, colDataGetData(pPkColDataInfo, rowId), pkLen);
×
82
  }
UNCOV
83
  return pkLen + sizeof(SRecDataInfo);
×
84
}
85

UNCOV
86
int32_t saveRecalculateData(SStateStore* pStateStore, STableTsDataState* pTsDataState, SSDataBlock* pSrcBlock,
×
87
                            EStreamType mode) {
UNCOV
88
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
89
  int32_t lino = 0;
×
90

UNCOV
91
  if (pSrcBlock->info.rows == 0) {
×
92
    return TSDB_CODE_SUCCESS;
×
93
  }
UNCOV
94
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
UNCOV
95
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
96
  SColumnInfoData* pSrcCalStartTsCol =
UNCOV
97
      (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
×
98
  SColumnInfoData* pSrcCalEndTsCol =
UNCOV
99
      (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
×
UNCOV
100
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
×
UNCOV
101
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
UNCOV
102
  TSKEY*           srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
×
UNCOV
103
  TSKEY*           srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
×
UNCOV
104
  TSKEY*           srcCalStartTsCol = (TSKEY*)pSrcCalStartTsCol->pData;
×
UNCOV
105
  TSKEY*           srcCalEndTsCol = (TSKEY*)pSrcCalEndTsCol->pData;
×
UNCOV
106
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
×
UNCOV
107
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
×
UNCOV
108
  TSKEY            calStart = INT64_MIN;
×
UNCOV
109
  TSKEY            calEnd = INT64_MIN;
×
UNCOV
110
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
×
UNCOV
111
    SSessionKey key = {.win.skey = srcStartTsCol[i], .win.ekey = srcEndTsCol[i], .groupId = srcGp[i]};
×
UNCOV
112
    if (mode == STREAM_RETRIEVE) {
×
113
      calStart = srcCalStartTsCol[i];
×
114
      calEnd = srcCalEndTsCol[i];
×
115
    } else {
UNCOV
116
      calStart = srcStartTsCol[i];
×
UNCOV
117
      calEnd = srcEndTsCol[i];
×
118
    }
UNCOV
119
    int32_t len = copyRecDataToBuff(calStart, calEnd, srcUidData[i], pSrcBlock->info.version, mode, NULL, 0,
×
120
                                    pTsDataState->pRecValueBuff, pTsDataState->recValueLen);
UNCOV
121
    code = pStateStore->streamStateMergeAndSaveScanRange(pTsDataState, &key.win, key.groupId,
×
122
                                                         pTsDataState->pRecValueBuff, len);
UNCOV
123
    QUERY_CHECK_CODE(code, lino, _end);
×
124
  }
125

UNCOV
126
_end:
×
UNCOV
127
  if (code != TSDB_CODE_SUCCESS) {
×
128
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
129
  }
UNCOV
130
  return code;
×
131
}
132

UNCOV
133
/*
 * Start a new recalculate snapshot unless one is already pending.
 *
 * A snapshot is considered pending when the task state already holds an entry under
 * STREAM_DATA_SCAN_OP_REC_ID_NAME; in that case only a debug line is written. Otherwise the current
 * number of the ts-data state is stored under that key and the ts-data state number is advanced by one.
 * Results of the save/set callbacks are not inspected.
 */
void buildRecalculateDataSnapshort(SStreamScanInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  void*   pStored = NULL;
  int32_t storedLen = 0;
  int32_t lookup = pInfo->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
                                                        strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME), &pStored, &storedLen);
  // Only the existence of the entry matters; its payload is released right away.
  taosMemFreeClear(pStored);

  if (lookup != TSDB_CODE_SUCCESS) {
    int32_t snapshotId = pInfo->stateStore.streamStateGetNumber(pInfo->basic.pTsDataState->pState);
    pInfo->stateStore.streamStateSaveInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
                                          strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME), &snapshotId, sizeof(int32_t));
    pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pState, snapshotId + 1, pInfo->primaryTsIndex);
    qDebug("===stream===%s build recalculate snapshot id:%d", GET_TASKID(pTaskInfo), snapshotId);
  } else {
    qDebug("===stream===%s recalculate task is not completed yet, so no need to create a recalculate snapshot", GET_TASKID(pTaskInfo));
  }
}
150

UNCOV
151
/*
 * Read the recalculate snapshot id stored under STREAM_DATA_SCAN_OP_REC_ID_NAME.
 *
 * On success the first int32_t of the stored value is written to *pRecId and the buffer handed out by
 * the state store is released. When the lookup fails, an error is logged and the lookup's code is
 * returned; *pRecId is left untouched.
 */
static int32_t getRecalculateId(SStateStore* pStateStore, void* pState, int32_t* pRecId) {
  void*   pStored = NULL;
  int32_t storedLen = 0;
  int32_t lookup = pStateStore->streamStateGetInfo(pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
                                                   strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME), &pStored, &storedLen);
  if (lookup == TSDB_CODE_SUCCESS) {
    *(pRecId) = *(int32_t*)pStored;
    taosMemFreeClear(pStored);
    return TSDB_CODE_SUCCESS;
  }

  qError("Not receive recalculate start block, but received recalculate end block");
  return lookup;
}
164

UNCOV
165
/*
 * Drop the recalculate snapshot that was registered under STREAM_DATA_SCAN_OP_REC_ID_NAME.
 *
 * Steps:
 *   1. read the stored snapshot id (fails if no snapshot entry exists);
 *   2. delete the entry from the task state;
 *   3. temporarily switch the ts-data state to the snapshot's number, delete all of its sessions,
 *      then switch back to the number that was active before.
 *
 * Returns the code of the id lookup; the remaining state-store callbacks are invoked without
 * inspecting a result.
 */
static int32_t deleteRecalculateDataSnapshort(SStreamScanInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t prevRecId = 0;

  code = getRecalculateId(&pInfo->stateStore, pTaskInfo->streamInfo.pState, &prevRecId);
  QUERY_CHECK_CODE(code, lino, _end);

  pInfo->stateStore.streamStateDeleteInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
                                          strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME));

  // Remember the active number so it can be restored once the snapshot's sessions are gone.
  int32_t curID = pInfo->stateStore.streamStateGetNumber(pInfo->basic.pTsDataState->pState);
  pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pState, prevRecId, pInfo->primaryTsIndex);
  pInfo->stateStore.streamStateSessionDeleteAll(pInfo->basic.pTsDataState->pState);

  pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pState, curID, pInfo->primaryTsIndex);
  qDebug("===stream===%s delete recalculate snapshot id:%d", GET_TASKID(pTaskInfo), prevRecId);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
190

191
static int32_t readPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int64_t version, char* taskIdStr,
×
192
                                       SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
193
  int32_t      code = TSDB_CODE_SUCCESS;
×
194
  int32_t      lino = 0;
×
195
  SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, startTs, endTs, version);
×
196
  QUERY_CHECK_NULL(pPreRes, code, lino, _end, terrno);
×
197

198
  printDataBlock(pPreRes, "pre res", taskIdStr);
×
199
  code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows + pBlock->info.rows);
×
200
  QUERY_CHECK_CODE(code, lino, _end);
×
201

202
  SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
×
203
  SColumnInfoData* pPkCol = NULL;
×
204
  if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
×
205
    pPkCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->basic.primaryPkIndex);
×
206
  }
207
  for (int32_t i = 0; i < pPreRes->info.rows; i++) {
×
208
    uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
×
209
    code = appendPkToSpecialBlock(pBlock, (TSKEY*)pTsCol->pData, pPkCol, i, &uid, &groupId, NULL);
×
210
    QUERY_CHECK_CODE(code, lino, _end);
×
211
  }
212
  printDataBlock(pBlock, "new delete", taskIdStr);
×
213

214
_end:
×
215
  if (code != TSDB_CODE_SUCCESS) {
×
216
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
217
  }
218
  return code;
×
219
}
220

221
/*
 * Rebuild `pDestBlock` from the previous-version data of every range listed in `pSrcBlock`.
 *
 * `pDestBlock` is cleaned first. Then, for each row of `pSrcBlock`, the (uid, start ts, end ts) triple
 * is resolved against data version `pSrcBlock->info.version - 1` and the resulting rows are appended
 * to `pDestBlock`. An empty source block returns success without touching `pDestBlock`.
 *
 * Stops at, and returns, the first failing code.
 */
static int32_t readPrevVersionDataByBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, char* pTaskIdStr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  TSKEY*    pRangeStart = (TSKEY*)((SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX))->pData;
  TSKEY*    pRangeEnd = (TSKEY*)((SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX))->pData;
  uint64_t* pUids = (uint64_t*)((SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX))->pData;

  // Look one version back: the state right before the change described by pSrcBlock.
  int64_t prevVer = pSrcBlock->info.version - 1;

  blockDataCleanup(pDestBlock);
  for (int32_t row = 0; row < pSrcBlock->info.rows; ++row) {
    code = readPreVersionDataBlock(pUids[row], pRangeStart[row], pRangeEnd[row], prevVer, pTaskIdStr, pInfo,
                                   pDestBlock);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
249

UNCOV
250
/*
 * Produce the next output block when the operator's input is a list of already-built data blocks.
 *
 * Buffered blocks in pInfo->pBlockLists are consumed one by one, starting at pInfo->validBlockIndex,
 * until one of them yields an output:
 *   - normal / invalid / get-all / recalculate-data / recalculate-delete / create-child-table blocks
 *     are passed through unchanged;
 *   - delete-data blocks are filtered (or copied when there is no tq reader), re-grouped, rebuilt
 *     against the fill-history window and returned as STREAM_RECALCULATE_DELETE; empty results are
 *     skipped;
 *   - drop-child-table blocks are returned only if at least one partition name was deleted;
 *   - recalculate-start / recalculate-end blocks update the recalculate snapshot and are skipped.
 * When the list is exhausted the buffered blocks are cleared and *ppRes is set to NULL.
 *
 * On any failure the code is stored in pTaskInfo->code, *ppRes is set to NULL and the code is returned.
 *
 * NOTE(review): the STREAM_CHECKPOINT and default cases leave *ppRes as the caller passed it in;
 * confirm callers initialise it.
 */
static int32_t doStreamBlockScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;

  qDebug("===stream===%s doStreamBlockScan", GET_TASKID(pTaskInfo));
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  while (1) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      (*ppRes) = NULL;
      break;
    }

    int32_t current = pInfo->validBlockIndex++;
    qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, GET_TASKID(pTaskInfo));

    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno);

    SSDataBlock* pBlock = pPacked->pDataBlock;
    if (pBlock->info.parTbName[0]) {
      // The block carries a partition table name: remember it for its group id.
      code =
          pInfo->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    pBlock->info.dataLoad = 1;

    code = blockDataUpdateTsWindow(pBlock, 0);
    QUERY_CHECK_CODE(code, lino, _end);
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_INVALID:
      case STREAM_GET_ALL:
      case STREAM_RECALCULATE_DATA:
      case STREAM_RECALCULATE_DELETE:
      case STREAM_CREATE_CHILD_TABLE: {
        setStreamOperatorState(&pInfo->basic, pBlock->info.type);
        (*ppRes) = pBlock;
      } break;
      case STREAM_DELETE_DATA: {
        printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
        if (pInfo->tqReader) {
          code = filterDelBlockByUid(pInfo->pDeleteDataRes, pBlock, pInfo->tqReader, &pInfo->readerFn);
          QUERY_CHECK_CODE(code, lino, _end);
        } else {
          // its parent operator is final agg operator
          code = copyDataBlock(pInfo->pDeleteDataRes, pBlock);
          QUERY_CHECK_CODE(code, lino, _end);
          pInfo->pDeleteDataRes->info.type = STREAM_RECALCULATE_DATA;
        }

        code = setBlockGroupIdByUid(pInfo, pInfo->pDeleteDataRes);
        QUERY_CHECK_CODE(code, lino, _end);
        code = rebuildDeleteBlockData(pInfo->pDeleteDataRes, &pStreamInfo->fillHistoryWindow, GET_TASKID(pTaskInfo));
        QUERY_CHECK_CODE(code, lino, _end);
        if (pInfo->pDeleteDataRes->info.rows == 0) {
          continue;
        }
        printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete recv filtered",
                           GET_TASKID(pTaskInfo));
        if (pInfo->partitionSup.needCalc) {
          // Group ids depend on row data, so the delete ranges are expanded from the previous data version.
          SSDataBlock* pTmpBlock = NULL;
          code = createOneDataBlock(pInfo->pDeleteDataRes, true, &pTmpBlock);
          QUERY_CHECK_CODE(code, lino, _end);
          code = readPrevVersionDataByBlock(pInfo, pTmpBlock, pInfo->pDeleteDataRes, GET_TASKID(pTaskInfo));
          // Release the temporary copy before acting on the result so the error path does not leak it.
          blockDataDestroy(pTmpBlock);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        pInfo->pDeleteDataRes->info.type = STREAM_RECALCULATE_DELETE;
        (*ppRes) = pInfo->pDeleteDataRes;
      } break;
      case STREAM_DROP_CHILD_TABLE: {
        int32_t deleteNum = 0;
        code = deletePartName(&pInfo->stateStore, pTaskInfo->streamInfo.pState, pBlock, &deleteNum);
        QUERY_CHECK_CODE(code, lino, _end);
        if (deleteNum == 0) {
          continue;
        }
        (*ppRes) = pBlock;
      } break;
      case STREAM_CHECKPOINT: {
        qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");
      } break;
      case STREAM_RECALCULATE_START: {
        if (!isSemiOperator(&pInfo->basic)) {
          code = pInfo->stateStore.streamStateFlushReaminInfoToDisk(pInfo->basic.pTsDataState);
          QUERY_CHECK_CODE(code, lino, _end);
          buildRecalculateDataSnapshort(pInfo, pTaskInfo);
        }
        continue;
      } break;
      case STREAM_RECALCULATE_END: {
        if (isRecalculateOperator(&pInfo->basic)) {
          qError("stream recalculate error since recalculate operator receive STREAM_RECALCULATE_END");
          continue;
        }
        code = deleteRecalculateDataSnapshort(pInfo, pTaskInfo);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      } break;
      default:
        break;
    }
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
    break;
  }

_end:
  printDataBlock((*ppRes), getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    (*ppRes) = NULL;
  }
  return code;
}
372

373
#ifdef BUILD_NO_CALL
/*
 * For the first `num` rows of `pSrcBlock`, save a recalculate record to disk and append a matching row
 * to `pDestBlock`.
 *
 * Every row produces a STREAM_CLEAR record keyed by (ts, ts, group 0). When the partition supporter
 * needs per-row calculation, a second STREAM_DELETE_DATA record keyed by the row's computed group id
 * is saved as well and a second row is appended, which is why the destination capacity is doubled in
 * that case.
 *
 * Returns the first failing code of the capacity, save or append helpers.
 *
 * NOTE(review): the second append passes `gpId` (still 0) rather than key.groupId - confirm whether
 * that is intended.
 */
static int32_t buildAndSaveRecalculateData(SSDataBlock* pSrcBlock, TSKEY* pTsCol, SColumnInfoData* pPkColDataInfo, int32_t num,
                                    SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, SStateStore* pStateStore,
                                    STableTsDataState* pTsDataState, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t len = 0;

  // A failed reservation must stop here; the appends below rely on the capacity being available.
  code = blockDataEnsureCapacity(pDestBlock, pParSup->needCalc ? num * 2 : num);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t rowId = 0; rowId < num; rowId++) {
    len = copyRecDataToBuff(pTsCol[rowId], pTsCol[rowId], pSrcBlock->info.id.uid, pSrcBlock->info.version, STREAM_CLEAR,
                            NULL, 0, pTsDataState->pRecValueBuff, pTsDataState->recValueLen);
    SSessionKey key = {.win.skey = pTsCol[rowId], .win.ekey = pTsCol[rowId], .groupId = 0};
    code = pStateStore->streamState1SessionSaveToDisk(pTsDataState, &key, pTsDataState->pRecValueBuff, len);
    QUERY_CHECK_CODE(code, lino, _end);
    uint64_t gpId = 0;
    code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pParSup->needCalc) {
      key.groupId = calGroupIdByData(pParSup, pPartScalarSup, pSrcBlock, rowId);
      len = copyRecDataToBuff(pTsCol[rowId], pTsCol[rowId], pSrcBlock->info.id.uid, pSrcBlock->info.version,
                              STREAM_DELETE_DATA, NULL, 0, pTsDataState->pRecValueBuff,
                              pTsDataState->recValueLen);
      code = pStateStore->streamState1SessionSaveToDisk(pTsDataState, &key, pTsDataState->pRecValueBuff, len);
      QUERY_CHECK_CODE(code, lino, _end);

      code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
#endif
416

UNCOV
417
/*
 * Group id of row `rowId` of `pSrcBlock`: computed from the row's data when the partition supporter
 * requires calculation, otherwise the group id recorded on the block itself.
 */
static uint64_t getCurDataGroupId(SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, SSDataBlock* pSrcBlock, int32_t rowId) {
  return pParSup->needCalc ? calGroupIdByData(pParSup, pPartScalarSup, pSrcBlock, rowId)
                           : pSrcBlock->info.id.groupId;
}
424

425
static uint64_t getDataGroupIdByCol(SSteamOpBasicInfo* pBasic, SOperatorInfo* pTableScanOp,
×
426
                                    SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, uint64_t uid, TSKEY ts,
427
                                    int64_t maxVersion, void* pVal, bool* pRes) {
428
  SSDataBlock* pPreRes = readPreVersionData(pTableScanOp, uid, ts, ts, maxVersion);
×
429
  if (!pPreRes || pPreRes->info.rows == 0) {
×
430
    if (terrno != TSDB_CODE_SUCCESS) {
×
431
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
432
    }
433
    (*pRes) = false;
×
434
    return 0;
×
435
  }
436

437
  int32_t rowId = 0;
×
438
  if (hasSrcPrimaryKeyCol(pBasic)) {
×
439
    SColumnInfoData* pPkCol = taosArrayGet(pPreRes->pDataBlock, pBasic->primaryPkIndex);
×
440
    for (; rowId < pPreRes->info.rows; rowId++) {
×
441
      if (comparePrimaryKey(pPkCol, rowId, pVal)) {
×
442
        break;
×
443
      }
444
    }
445
  }
446
  if (rowId >= pPreRes->info.rows) {
×
447
    qInfo("===stream===read preversion data of primary key failed. ts:%" PRId64 ",version:%" PRId64, ts, maxVersion);
×
448
    (*pRes) = false;
×
449
    return 0;
×
450
  }
451
  (*pRes) = true;
×
452
  return calGroupIdByData(pParSup, pPartScalarSup, pPreRes, rowId);
×
453
}
454

UNCOV
455
static int32_t buildRecalculateData(SSteamOpBasicInfo* pBasic, SOperatorInfo* pTableScanOp,
×
456
                                    SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, SSDataBlock* pSrcBlock,
457
                                    TSKEY* pTsCol, SColumnInfoData* pPkColDataInfo, SSDataBlock* pDestBlock,
458
                                    int32_t num) {
UNCOV
459
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
460
  int32_t lino = 0;
×
UNCOV
461
  blockDataEnsureCapacity(pDestBlock, num * 2);
×
UNCOV
462
  for (int32_t rowId = 0; rowId < num; rowId++) {
×
UNCOV
463
    uint64_t gpId = getCurDataGroupId(pParSup, pPartScalarSup, pSrcBlock, rowId);
×
UNCOV
464
    code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
×
UNCOV
465
    QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
466
    if (pParSup->needCalc) {
×
467
      bool  res = false;
×
468
      void* pVal = NULL;
×
469
      if (hasSrcPrimaryKeyCol(pBasic)) {
×
470
        pVal = colDataGetData(pPkColDataInfo, rowId);
×
471
      }
472
      gpId = getDataGroupIdByCol(pBasic, pTableScanOp, pParSup, pPartScalarSup, pSrcBlock->info.id.uid, pTsCol[rowId],
×
473
                                 pSrcBlock->info.version - 1, pVal, &res);
×
474
      if (res == true) {
×
475
        code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
×
476
        QUERY_CHECK_CODE(code, lino, _end);
×
477
      }
478
    }
479
  }
480

UNCOV
481
_end:
×
UNCOV
482
  if (code != TSDB_CODE_SUCCESS) {
×
483
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
484
  }
UNCOV
485
  return code;
×
486
}
487

UNCOV
488
/*
 * Produce the next output block when the operator's input is a list of WAL submit messages.
 *
 * If a previous call left pInfo->scanMode at STREAM_SCAN_FROM_RES or STREAM_SCAN_FROM_UPDATERES, the
 * pending pRes / pUpdateRes block is returned immediately and the mode is advanced. Otherwise submit
 * messages are taken from pInfo->pBlockLists (starting at pInfo->validBlockIndex) and read block by
 * block through the tq reader. Each retrieved block is loaded into pInfo->pRes, filtered, and compared
 * with the key returned by getMaxTsKeyInfo(): leading rows that are not newer than that key are
 * turned into recalculate rows in pInfo->pUpdateRes and trimmed from pInfo->pRes.
 *
 * Output priority per retrieved block: pCreateTbRes, then pRes, then pUpdateRes; pInfo->scanMode
 * records which of the remaining blocks still has to be returned by the next call. When every submit
 * message has been consumed the buffered blocks are cleared and *ppRes is NULL.
 *
 * On failure the code is stored in pTaskInfo->code, *ppRes is set to NULL and the code is returned.
 */
static int32_t doStreamWALScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
  int32_t          totalBlocks = taosArrayGetSize(pInfo->pBlockLists);

  switch (pInfo->scanMode) {
    case STREAM_SCAN_FROM_RES: {
      if (pInfo->pUpdateRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      } else {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      }
      (*ppRes) = pInfo->pRes;
      goto _end;
    } break;
    case STREAM_SCAN_FROM_UPDATERES: {
      (*ppRes) = pInfo->pUpdateRes;
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      goto _end;
    } break;
    default:
      (*ppRes) = NULL;
      break;
  }

  while (1) {
    if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) {
      if (pInfo->validBlockIndex >= totalBlocks) {
        doClearBufferedBlocks(pInfo);

        qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, GET_TASKID(pTaskInfo));
        (*ppRes) = NULL;
        goto _end;
      }

      int32_t      current = pInfo->validBlockIndex++;
      SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
      QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno);

      qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, GET_TASKID(pTaskInfo));
      if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver, NULL) < 0) {
        qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current,
               totalBlocks, GET_TASKID(pTaskInfo));
        continue;
      }
    }

    blockDataCleanup(pInfo->pRes);

    while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, GET_TASKID(pTaskInfo))) {
      SSDataBlock* pRes = NULL;

      code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, GET_TASKID(pTaskInfo));
      // pRes may still be NULL if the retrieve failed, so never dereference it before checking.
      int64_t retrievedRows = (pRes != NULL) ? pRes->info.rows : 0;
      qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), retrievedRows,
             GET_TASKID(pTaskInfo));

      if (code != TSDB_CODE_SUCCESS || pRes == NULL || retrievedRows == 0) {
        qDebug("retrieve data failed, try next block in submit block, %s", GET_TASKID(pTaskInfo));
        continue;
      }

      code = setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        // The table is gone: treat the block as empty instead of failing the task.
        pInfo->pRes->info.rows = 0;
        code = TSDB_CODE_SUCCESS;
      }
      QUERY_CHECK_CODE(code, lino, _end);

      if (pInfo->pRes->info.rows == 0) {
        continue;
      }

      setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
      QUERY_CHECK_CODE(code, lino, _end);

      TSKEY   curTs = INT64_MIN;
      void*   pPkVal = NULL;
      int32_t winCode = TSDB_CODE_FAILED;
      code = getMaxTsKeyInfo(pInfo, pInfo->pRes, &curTs, &pPkVal, &winCode);
      QUERY_CHECK_CODE(code, lino, _end);
      if (pInfo->pUpdateRes->info.rows > 0) {
        blockDataCleanup(pInfo->pUpdateRes);
      }

      SColumnInfoData* pPkColDataInfo = NULL;
      if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
        pPkColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->basic.primaryPkIndex);
      }
      SColumnInfoData* pTsCol = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
      if (winCode == TSDB_CODE_SUCCESS && curTs >= pInfo->pRes->info.window.skey) {
        // Part (or all) of this block is not newer than the key reported by the state store.
        int32_t num = 0;
        if (curTs < pInfo->pRes->info.window.ekey) {
          num = getForwardStepsInBlock(pInfo->pRes->info.rows, binarySearchForKey, curTs, 0, TSDB_ORDER_ASC,
                                       (TSKEY*)pTsCol->pData);
          if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
            // Walk back until the row's pk is not greater than the stored pk.
            for (; num >= 0; num--) {
              void* pColPkData = colDataGetData(pPkColDataInfo, num);
              if (pInfo->comparePkColFn(pColPkData, pPkVal) <= 0) {
                break;
              }
            }
          }
        } else {
          num = pInfo->pRes->info.rows;
        }

        if (num > 0) {
          qInfo("%s stream scan op ignore disorder data. rows:%d, tableUid:%" PRId64 ", last max ts:%" PRId64
                ", block start key:%" PRId64 ", end key:%" PRId64,
                GET_TASKID(pTaskInfo), num, pInfo->pRes->info.id.uid, curTs, pInfo->pRes->info.window.skey,
                pInfo->pRes->info.window.ekey);
          code = buildRecalculateData(&pInfo->basic, pInfo->pTableScanOp, &pInfo->partitionSup, pInfo->pPartScalarSup,
                                      pInfo->pRes, (TSKEY*)pTsCol->pData, pPkColDataInfo, pInfo->pUpdateRes, num);
          QUERY_CHECK_CODE(code, lino, _end);
          code = blockDataTrimFirstRows(pInfo->pRes, num);
          QUERY_CHECK_CODE(code, lino, _end);
          code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }

      if (pInfo->pCreateTbRes->info.rows > 0) {
        if (pInfo->pRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
        } else if (pInfo->pUpdateRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        }
        qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s", pInfo->pCreateTbRes->info.rows,
               GET_TASKID(pTaskInfo));
        (*ppRes) = pInfo->pCreateTbRes;
        break;
      }

      if (pInfo->pRes->info.rows > 0) {
        if (pInfo->pUpdateRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
        }
        (*ppRes) = pInfo->pRes;
        break;
      }

      if (pInfo->pUpdateRes->info.rows > 0) {
        (*ppRes) = pInfo->pUpdateRes;
        break;
      }
    }

    if ((*ppRes) != NULL && (*ppRes)->info.rows > 0) {
      break;
    }
  }

_end:
  printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    (*ppRes) = NULL;
  }
  return code;
}
657

UNCOV
658
/*
 * Checkpoint hook for the stream data scan operator.
 *
 * When needSaveStreamOperatorInfo() reports that the operator state still has
 * to be saved, commit the ts-data state through the state store and mark the
 * save as complete. Otherwise do nothing.
 *
 * The previous version carried `code`/`lino` locals and an `_end:` error
 * branch, but nothing in the body could ever change `code`, so that branch
 * was unreachable and the label unused. They are removed here.
 */
void streamDataScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
  if (!needSaveStreamOperatorInfo(&pInfo->basic)) {
    return;
  }

  pInfo->stateStore.streamStateTsDataCommit(pInfo->basic.pTsDataState);
  saveStreamOperatorStateComplete(&pInfo->basic);
}
×
672

UNCOV
673
/*
 * Produce the next result block of the stream data scan operator.
 *
 * Three phases, evaluated in order:
 *   1. recoverStep is PREPARE1/PREPARE2: reconfigure the inner table scan with
 *      the fill-history version range and window, close its reader and reset
 *      it to OP_OPENED.
 *   2. recoverStep is SCAN1: drain the inner table scan, returning either the
 *      create-table block or the filtered recover block; when the scan is
 *      exhausted, mark the fill-history scan as finished and return NULL.
 *   3. Otherwise dispatch on pInfo->blockType (data block, WAL submit,
 *      checkpoint / recalculate control blocks).
 *
 * @param pOperator  the stream data scan operator
 * @param ppRes      out: block to hand downstream, or NULL when nothing is available
 * @return TSDB_CODE_SUCCESS or an error code; on error pTaskInfo->code is set
 *         and *ppRes is NULL.
 */
int32_t doStreamDataScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  const char*      id = GET_TASKID(pTaskInfo);
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SStreamScanInfo* pInfo = pOperator->info;
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;

  qDebug("stream data scan started, %s", id);

  if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    memcpy(&pTSInfo->base.cond, &pStreamInfo->tableCond, sizeof(SQueryTableDataCond));

    if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
      pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
      pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;

      pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
      qDebug("stream scan step1, verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 ", %s",
             pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
             pTSInfo->base.cond.twindows.ekey, id);
      pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1;
      pStreamInfo->recoverScanFinished = false;
    } else {
      pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
      pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
      pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
      qDebug("stream scan step2 (scan wal), verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
             pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
             pTSInfo->base.cond.twindows.ekey, id);
      pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
    }

    // Drop the old reader so the table scan re-opens with the new condition.
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);

    pTSInfo->base.dataReader = NULL;
    pInfo->pTableScanOp->status = OP_OPENED;

    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
  }

  if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
    if (isTaskKilled(pTaskInfo)) {
      qInfo("===stream===stream scan is killed. task id:%s, code %s", id, tstrerror(pTaskInfo->code));
      (*ppRes) = NULL;
      return code;
    }

    // A create-table block was returned on the previous call; now hand out the
    // recover block that was held back.
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      (*ppRes) = pInfo->pRecoverRes;
      return code;
    }

    while (1) {
      code = doTableScanNext(pInfo->pTableScanOp, &pInfo->pRecoverRes);
      QUERY_CHECK_CODE(code, lino, _end);
      if (pInfo->pRecoverRes == NULL) {
        break;
      }
      setStreamOperatorState(&pInfo->basic, pInfo->pRecoverRes->info.type);
      code = doFilter(pInfo->pRecoverRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      // Everything was filtered out: fetch the next block.
      if (pInfo->pRecoverRes->info.rows <= 0) {
        continue;
      }

      TSKEY   curTs = INT64_MIN;
      void*   pPkVal = NULL;
      int32_t winCode = TSDB_CODE_FAILED;
      code = getMaxTsKeyInfo(pInfo, pInfo->pRecoverRes, &curTs, &pPkVal, &winCode);
      QUERY_CHECK_CODE(code, lino, _end);

      code = calBlockTbName(pInfo, pInfo->pRecoverRes, 0);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
        printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover",
                           GET_TASKID(pTaskInfo));
        (*ppRes) = pInfo->pCreateTbRes;
        return code;
      }

      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
      printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover",
                         GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pRecoverRes;
      return code;
    }

    // Inner scan exhausted: the fill-history pass is done.
    pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
    pStreamInfo->recoverScanFinished = true;
    (*ppRes) = NULL;
    qDebug("===stream===%s fill history is finished.", GET_TASKID(pTaskInfo));
    return code;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);

  switch (pInfo->blockType) {
    case STREAM_INPUT__DATA_BLOCK: {
      // Propagate the scan's status instead of discarding it; previously a
      // failed scan still made this function report success.
      code = doStreamBlockScan(pOperator, ppRes);
      QUERY_CHECK_CODE(code, lino, _end);
    } break;
    case STREAM_INPUT__DATA_SUBMIT: {
      code = doStreamWALScan(pOperator, ppRes);
      QUERY_CHECK_CODE(code, lino, _end);
    } break;
    case STREAM_INPUT__CHECKPOINT:
    case STREAM_INPUT__RECALCULATE: {
      if (pInfo->validBlockIndex >= total) {
        doClearBufferedBlocks(pInfo);
        (*ppRes) = NULL;
        return code;
      }

      int32_t current = pInfo->validBlockIndex++;
      qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);

      SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
      QUERY_CHECK_NULL(pData, code, lino, _end, terrno);
      SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
      QUERY_CHECK_NULL(pBlock, code, lino, _end, terrno);
      printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));

      if (pBlock->info.type == STREAM_CHECKPOINT) {
        code = pInfo->stateStore.streamStateFlushReaminInfoToDisk(pInfo->basic.pTsDataState);
        QUERY_CHECK_CODE(code, lino, _end);
        streamDataScanOperatorSaveCheckpoint(pInfo);
        // NOTE(review): this assignment is overwritten by `(*ppRes) = pBlock`
        // below, so pCheckpointRes is never actually returned - confirm which
        // block downstream operators are supposed to receive.
        (*ppRes) = pInfo->pCheckpointRes;
      } else if (pBlock->info.type == STREAM_RECALCULATE_START) {
        if (!isSemiOperator(&pInfo->basic)) {
          code = pInfo->stateStore.streamStateFlushReaminInfoToDisk(pInfo->basic.pTsDataState);
          QUERY_CHECK_CODE(code, lino, _end);
          // NOTE(review): result (if any) of buildRecalculateDataSnapshort is
          // not checked, unlike deleteRecalculateDataSnapshort - verify.
          buildRecalculateDataSnapshort(pInfo, pTaskInfo);
        }
      } else if (pBlock->info.type == STREAM_RECALCULATE_END) {
        if (isRecalculateOperator(&pInfo->basic)) {
          qError("stream recalculate error since recalculate operator receive STREAM_RECALCULATE_END");
        } else {
          code = deleteRecalculateDataSnapshort(pInfo, pTaskInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
      // Control blocks are always forwarded as received.
      (*ppRes) = pBlock;
      return code;
    } break;
    default: {
      qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id);
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    } break;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    (*ppRes) = NULL;
  }
  return code;
}
842

UNCOV
843
/*
 * Release-state hook of the stream data scan operator: commits the operator's
 * ts-data state through its state store.
 */
void streamDataScanReleaseState(SOperatorInfo* pOperator) {
  SStreamScanInfo* pScanInfo = pOperator->info;

  pScanInfo->stateStore.streamStateTsDataCommit(pScanInfo->basic.pTsDataState);
}
×
847

UNCOV
848
/*
 * Reload-state hook of the stream data scan operator: asks the state store to
 * reload the ts-data state and logs an error if that fails. The failure is
 * only logged; the function has no way to report it to the caller.
 */
void streamDataScanReloadState(SOperatorInfo* pOperator) {
  SStreamScanInfo* pScanInfo = pOperator->info;
  int32_t          lino = 0;
  int32_t          code = pScanInfo->stateStore.streamStateReloadTsDataState(pScanInfo->basic.pTsDataState);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
860

861
/*
 * Resolve the group id for a row of table `uid`.
 *
 * Without a calculated partition the group id comes straight from the table
 * list of the inner table scan and *pRes is set to true. With a calculated
 * partition the lookup is delegated to getDataGroupIdByCol(), which also
 * decides the value of *pRes.
 */
static uint64_t getDataGroupId(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal,
                               bool* pRes) {
  if (!pInfo->partitionSup.needCalc) {
    *pRes = true;
    STableScanInfo* pScan = pInfo->pTableScanOp->info;
    return tableListGetTableGroupId(pScan->base.pTableListInfo, uid);
  }

  return getDataGroupIdByCol(&pInfo->basic, pInfo->pTableScanOp, &pInfo->partitionSup, pInfo->pPartScalarSup, uid, ts,
                             maxVersion, pVal, pRes);
}
872

873
/*
 * Turn every entry of pSrcRange into a saved scan range.
 *
 * Each entry of pSrcRange is an SArray whose elements are read as:
 *   [0],[1]  start/end of the session key window (used to look up pRecRangeMap)
 *   [2]      group id
 *   [3],[4]  start/end of the result window to merge and save
 * The matching SRecDataInfo* is fetched from pRecRangeMap and passed, together
 * with the result window, to streamStateMergeAndSaveScanRange().
 *
 * Ownership: every entry is destroyed and pSrcRange is emptied before
 * returning, on success AND on failure. Previously a failure left
 * already-destroyed entries inside pSrcRange (dangling pointers) and leaked
 * the unprocessed ones.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED when an entry or its map value
 *         is missing, or the error of the failing call.
 */
static int32_t generateSessionDataScanRange(SStreamScanInfo* pInfo, SSHashObj* pRecRangeMap, SArray* pSrcRange) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t size = taosArrayGetSize(pSrcRange);
  int32_t i = 0;  // index of the first entry that has NOT been destroyed yet

  for (i = 0; i < size; i++) {
    SArray* pRange = taosArrayGetP(pSrcRange, i);
    QUERY_CHECK_NULL(pRange, code, lino, _end, TSDB_CODE_FAILED);

    // Validate all five slots before dereferencing any of them.
    TSKEY*    pKeyStart = taosArrayGet(pRange, 0);
    QUERY_CHECK_NULL(pKeyStart, code, lino, _end, terrno);
    TSKEY*    pKeyEnd = taosArrayGet(pRange, 1);
    QUERY_CHECK_NULL(pKeyEnd, code, lino, _end, terrno);
    uint64_t* pGroupId = taosArrayGet(pRange, 2);
    QUERY_CHECK_NULL(pGroupId, code, lino, _end, terrno);
    TSKEY*    pResStart = taosArrayGet(pRange, 3);
    QUERY_CHECK_NULL(pResStart, code, lino, _end, terrno);
    TSKEY*    pResEnd = taosArrayGet(pRange, 4);
    QUERY_CHECK_NULL(pResEnd, code, lino, _end, terrno);

    uint64_t    groupId = *pGroupId;
    STimeWindow resWin = {0};
    resWin.skey = *pResStart;
    resWin.ekey = *pResEnd;

    SSessionKey key = {.groupId = groupId};
    key.win.skey = *pKeyStart;
    key.win.ekey = *pKeyEnd;
    void* pVal = tSimpleHashGet(pRecRangeMap, &key, sizeof(SSessionKey));
    QUERY_CHECK_NULL(pVal, code, lino, _end, TSDB_CODE_FAILED);
    // The map stores a pointer to the record, not the record itself.
    SRecDataInfo* pRecData = *(void**)pVal;

    code = pInfo->stateStore.streamStateMergeAndSaveScanRange(pInfo->basic.pTsDataState, &resWin, groupId, pRecData,
                                                              pInfo->basic.pTsDataState->recValueLen);
    QUERY_CHECK_CODE(code, lino, _end);
    taosArrayDestroy(pRange);
  }
  taosArrayClear(pSrcRange);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // Entries [0, i) are already destroyed; release [i, size) and drop every
    // pointer so the caller never sees freed or orphaned ranges.
    for (int32_t j = i; j < size; j++) {
      SArray* pLeft = taosArrayGetP(pSrcRange, j);
      if (pLeft != NULL) {
        taosArrayDestroy(pLeft);
      }
    }
    taosArrayClear(pSrcRange);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
905

906
/*
 * Walk all recalculation records stored in the ts-data state and convert them
 * into scan ranges for session-like windows.
 *
 * Records are collected into pInfo->pRecRangeMap (key: SSessionKey, value:
 * pointer to the record). Whenever the map holds more than 1024 entries, and
 * once more after the walk, the batch is resolved through
 * streamClientGetResultRange() and saved by generateSessionDataScanRange().
 *
 * Fixes over the previous version:
 *   - the state cursor is released on the error paths too (it used to leak
 *     whenever a QUERY_CHECK_CODE inside the loop fired);
 *   - a record that could not be inserted into the map is freed instead of
 *     being leaked.
 *
 * @param pTaskIdStr task id used in debug logs only
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SSessionKey      firstKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
  SStreamStateCur* pCur =
      pInfo->stateStore.streamStateSessionSeekKeyCurrentNext(pInfo->basic.pTsDataState->pStreamTaskState, &firstKey);
  while (1) {
    SSessionKey rangKey = {0};
    void*       pVal = NULL;
    int32_t     len = 0;
    int32_t     winRes = pInfo->stateStore.streamStateSessionGetKVByCur(pCur, &rangKey, &pVal, &len);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }
    qDebug("===stream===%s get range from disk. start ts:%" PRId64 ",end ts:%" PRId64 ", group id:%" PRIu64, pTaskIdStr,
           rangKey.win.skey, rangKey.win.ekey, rangKey.groupId);
    code = tSimpleHashPut(pInfo->pRecRangeMap, &rangKey, sizeof(SSessionKey), &pVal, POINTER_BYTES);
    if (code != TSDB_CODE_SUCCESS) {
      // The map did not take the pointer, so nobody else references pVal.
      taosMemFreeClear(pVal);
    }
    QUERY_CHECK_CODE(code, lino, _end);

    if (tSimpleHashGetSize(pInfo->pRecRangeMap) > 1024) {
      code = streamClientGetResultRange(&pInfo->recParam, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
      QUERY_CHECK_CODE(code, lino, _end);
      code = generateSessionDataScanRange(pInfo, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
      QUERY_CHECK_CODE(code, lino, _end);
      tSimpleHashClear(pInfo->pRecRangeMap);
    }
    pInfo->stateStore.streamStateCurNext(pInfo->basic.pTsDataState->pStreamTaskState, pCur);
  }
  pInfo->stateStore.streamStateFreeCur(pCur);
  pCur = NULL;  // already released; keeps the cleanup below from freeing twice

  if (tSimpleHashGetSize(pInfo->pRecRangeMap) > 0) {
    code = streamClientGetResultRange(&pInfo->recParam, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = generateSessionDataScanRange(pInfo, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
    QUERY_CHECK_CODE(code, lino, _end);
    tSimpleHashClear(pInfo->pRecRangeMap);
  }

_end:
  // Only non-NULL here when we jumped out of the loop on an error.
  if (pCur != NULL) {
    pInfo->stateStore.streamStateFreeCur(pCur);
    pCur = NULL;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
951

UNCOV
952
static int32_t generateIntervalDataScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr, SSessionKey* pSeKey,
×
953
                                             SRecDataInfo* pRecData, int32_t len) {
UNCOV
954
  int32_t        code = TSDB_CODE_SUCCESS;
×
UNCOV
955
  int32_t        lino = 0;
×
UNCOV
956
  int32_t        rowId = 0;
×
UNCOV
957
  SDataBlockInfo tmpInfo = {0};
×
UNCOV
958
  tmpInfo.rows = 1;
×
UNCOV
959
  STimeWindow win = getSlidingWindow(&pSeKey->win.skey, &pSeKey->win.ekey, &pSeKey->groupId, &pInfo->interval, &tmpInfo,
×
UNCOV
960
                                     &rowId, pInfo->partitionSup.needCalc);
×
UNCOV
961
  pInfo->stateStore.streamStateMergeAndSaveScanRange(pInfo->basic.pTsDataState, &win, pSeKey->groupId, pRecData, len);
×
962

UNCOV
963
_end:
×
UNCOV
964
  if (code != TSDB_CODE_SUCCESS) {
×
965
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
966
  }
UNCOV
967
  return code;
×
968
}
969

UNCOV
970
/*
 * Walk all recalculation records stored in the ts-data state and save one
 * interval scan range per record via generateIntervalDataScanRange().
 *
 * Each record value returned by streamStateSessionGetKVByCur() is freed here
 * after use. The value and the cursor are now released on the failure path as
 * well; previously both leaked when generateIntervalDataScanRange() failed.
 *
 * @param pTaskIdStr task id used in debug logs
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SSessionKey      firstKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
  SStreamStateCur* pCur =
      pInfo->stateStore.streamStateSessionSeekKeyCurrentNext(pInfo->basic.pTsDataState->pStreamTaskState, &firstKey);
  while (1) {
    SSessionKey rangKey = {0};
    void*       pVal = NULL;
    int32_t     len = 0;
    int32_t     winRes = pInfo->stateStore.streamStateSessionGetKVByCur(pCur, &rangKey, &pVal, &len);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }
    qDebug("===stream===%s get range from disk. start ts:%" PRId64 ",end ts:%" PRId64 ", group id:%" PRIu64,
           pTaskIdStr, rangKey.win.skey, rangKey.win.ekey, rangKey.groupId);
    code = generateIntervalDataScanRange(pInfo, pTaskIdStr, &rangKey, (SRecDataInfo*)pVal, len);
    // Release the record before checking the status so it cannot leak on error.
    taosMemFreeClear(pVal);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->stateStore.streamStateCurNext(pInfo->basic.pTsDataState->pStreamTaskState, pCur);
  }

_end:
  // Reached exactly once on both the normal and the error path.
  pInfo->stateStore.streamStateFreeCur(pCur);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1000

UNCOV
1001
/*
 * Build the recalculation scan ranges according to the parent window operator:
 *   - continue interval / semi interval / final interval -> interval ranges
 *   - continue session / semi session / final session / state / event -> session ranges
 *   - continue count and every other type -> nothing to do
 *
 * @param pTaskIdStr task id forwarded to the range generators for logging
 */
static int32_t generateDataScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_INTERVAL:
      code = generateIntervalScanRange(pInfo, pTaskIdStr);
      QUERY_CHECK_CODE(code, lino, _end);
      break;

    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SESSION:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_SESSION:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_SESSION:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_STATE:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_EVENT:
      code = generateSessionScanRange(pInfo, pTaskIdStr);
      QUERY_CHECK_CODE(code, lino, _end);
      break;

    // Count windows currently generate no ranges, same as unknown parents.
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_COUNT:
    default:
      break;
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1032

UNCOV
1033
/*
 * Fetch the next data block that belongs to scan range pRange.
 *
 * Blocks are pulled from pTableScanOp (scanAllTables) or pRecTableScanOp,
 * filtered with the scan operator's filter, and then matched against
 * pRange->pGroupIds:
 *   - with a calculated partition, the block is rebuilt row by row, keeping
 *     only rows whose computed group id is in pRange->pGroupIds;
 *   - otherwise the whole block is kept if its group id is in the set.
 * A matching block gets calWin = pRange->calWin and is returned in *ppRes.
 * When the table scan is exhausted its reader is closed and *ppRes is NULL.
 *
 * The temporary copy used for row filtering is now released on every exit
 * path (it leaked when colDataSetVal() failed), and column lookups are
 * NULL-checked before use.
 */
static int32_t doOneRangeScan(SStreamScanInfo* pInfo, SScanRange* pRange, SSDataBlock** ppRes) {
  qDebug("do stream recalculate scan.");

  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* tmpBlock = NULL;  // owned here; always NULL or live

  SOperatorInfo* pScanOp = NULL;
  if (pInfo->scanAllTables) {
    pScanOp = pInfo->pTableScanOp;
  } else {
    pScanOp = pInfo->pRecTableScanOp;
  }

  while (1) {
    SSDataBlock* pResult = NULL;
    code = doTableScanNext(pScanOp, &pResult);
    QUERY_CHECK_CODE(code, lino, _end);

    STableScanInfo* pTableScanInfo = pScanOp->info;
    if (pResult == NULL) {
      // Range exhausted: release the reader so the next range starts clean.
      pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
      pTableScanInfo->base.dataReader = NULL;
      (*ppRes) = NULL;
      goto _end;
    }

    code = doFilter(pResult, pScanOp->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);
    if (pResult->info.rows == 0) {
      continue;
    }

    printDataBlock(pResult, "tsdb", GET_TASKID(pScanOp->pTaskInfo));
    if (!pInfo->assignBlockUid) {
      pResult->info.id.groupId = 0;
    }

    if (pInfo->partitionSup.needCalc) {
      // Work on a copy so pResult can be rebuilt in place with matching rows only.
      code = createOneDataBlock(pResult, true, &tmpBlock);
      QUERY_CHECK_CODE(code, lino, _end);

      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        uint64_t dataGroupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i);
        if (tSimpleHashGet(pRange->pGroupIds, &dataGroupId, sizeof(uint64_t)) != NULL) {
          for (int32_t j = 0; j < pScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
            QUERY_CHECK_NULL(pDestCol, code, lino, _end, terrno);
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = NULL;
            if (!isNull) pSrcData = colDataGetData(pSrcCol, i);
            code = colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
            QUERY_CHECK_CODE(code, lino, _end);
          }
          pResult->info.rows++;
        }
      }

      blockDataDestroy(tmpBlock);
      tmpBlock = NULL;

      if (pResult->info.rows > 0) {
        pResult->info.calWin = pRange->calWin;
        (*ppRes) = pResult;
        goto _end;
      }
    } else {
      if (tSimpleHashGet(pRange->pGroupIds, &pResult->info.id.groupId, sizeof(uint64_t)) != NULL) {
        pResult->info.calWin = pRange->calWin;
        (*ppRes) = pResult;
        goto _end;
      }
    }
  }

_end:
  // Non-NULL only when an error interrupted the row-filtering loop.
  if (tmpBlock != NULL) {
    blockDataDestroy(tmpBlock);
    tmpBlock = NULL;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1114

1115
// static void exchangeTableListInfo(SStreamScanInfo* pInfo, SOperatorInfo* pScanOp) {
1116
//   STableScanInfo* pScanInfo = (STableScanInfo*)pScanOp->info;
1117
//   STableListInfo* pTemp = pScanInfo->base.pTableListInfo;
1118
//   pScanInfo->base.pTableListInfo = pInfo->pRecTableListInfo;
1119
//   pInfo->pRecTableListInfo = pTemp;
1120
// }
1121

UNCOV
1122
/*
 * Arm the table scan operator for scanning pRange.
 *
 * The scan operator (pTableScanOp when scanning all tables, pRecTableScanOp
 * otherwise) is reset to pRange->win and re-opened. Unless all tables are
 * scanned, an SOperatorParam carrying the table uids from pRange->pUIds is
 * built and installed as the operator's pOperatorGetParam.
 *
 * On failure everything allocated here is released and the operator's
 * parameter is left untouched.
 */
static int32_t prepareDataRangeScan(SStreamScanInfo* pInfo, SScanRange* pRange) {
  int32_t                  lino = 0;
  int32_t                  code = TSDB_CODE_SUCCESS;
  STableScanOperatorParam* pTableScanParam = NULL;
  SOperatorParam*          pOpParam = NULL;

  qDebug("prepare data range scan start:%" PRId64 ",end:%" PRId64 ",is all table:%d", pRange->win.skey,
         pRange->win.ekey, pInfo->scanAllTables);

  SOperatorInfo* pScanOp = pInfo->scanAllTables ? pInfo->pTableScanOp : pInfo->pRecTableScanOp;

  resetTableScanInfo(pScanOp->info, &pRange->win, -1);
  pScanOp->status = OP_OPENED;

  // A full scan needs no uid list.
  if (true == pInfo->scanAllTables) {
    goto _end;
  }

  pOpParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pOpParam, code, lino, _end, terrno);
  pOpParam->pChildren = NULL;
  pOpParam->opType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pOpParam->downstreamIdx = 0;

  pTableScanParam = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pTableScanParam, code, lino, _end, terrno);
  pOpParam->value = pTableScanParam;
  pTableScanParam->tableSeq = false;

  int32_t numOfUids = tSimpleHashGetSize(pRange->pUIds);
  pTableScanParam->pUidList = taosArrayInit(numOfUids, sizeof(uint64_t));
  QUERY_CHECK_NULL(pTableScanParam->pUidList, code, lino, _end, terrno);

  // The uid is the hash key; copy every key into the parameter's uid list.
  int32_t iter = 0;
  void*   pIte = NULL;
  for (pIte = tSimpleHashIterate(pRange->pUIds, pIte, &iter); pIte != NULL;
       pIte = tSimpleHashIterate(pRange->pUIds, pIte, &iter)) {
    void* pUidKey = tSimpleHashGetKey(pIte, NULL);
    QUERY_CHECK_NULL(pUidKey, code, lino, _end, terrno);
    void* pPushed = taosArrayPush(pTableScanParam->pUidList, pUidKey);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    qDebug("prepare data range add table uid:%" PRIu64, *(uint64_t*)pUidKey);
  }
  pScanOp->pOperatorGetParam = pOpParam;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pOpParam);
    if (NULL != pTableScanParam) {
      if (NULL != pTableScanParam->pUidList) {
        taosArrayDestroy(pTableScanParam->pUidList);
      }
      taosMemoryFree(pTableScanParam);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1182

UNCOV
1183
/*
 * Reset a scan range: release both id sets and set the window bounds to
 * INT64_MIN. The SScanRange object itself is not freed.
 */
static void destroyScanRange(SScanRange* pRange) {
  tSimpleHashCleanup(pRange->pUIds);
  tSimpleHashCleanup(pRange->pGroupIds);
  pRange->pUIds = NULL;
  pRange->pGroupIds = NULL;

  pRange->win.skey = INT64_MIN;
  pRange->win.ekey = INT64_MIN;
}
×
1191

UNCOV
1192
/*
 * Fill pRes with one row per group id of pRange.
 *
 * Every row carries the range window (win) and the calculation window
 * (calWin) together with the group id. After all rows are appended the block
 * type is set to STREAM_RECALCULATE_DATA; on failure the type is left as is.
 */
static int32_t buildRecBlockByRange(SScanRange* pRange, SSDataBlock* pRes) {
  int32_t lino = 0;
  int32_t numOfGroups = tSimpleHashGetSize(pRange->pGroupIds);
  int32_t code = blockDataEnsureCapacity(pRes, numOfGroups);
  QUERY_CHECK_CODE(code, lino, _end);

  int32_t iter = 0;
  void*   pIte = NULL;
  for (pIte = tSimpleHashIterate(pRange->pGroupIds, pIte, &iter); pIte != NULL;
       pIte = tSimpleHashIterate(pRange->pGroupIds, pIte, &iter)) {
    // The group id is the hash key itself.
    uint64_t* pGroupId = (uint64_t*)tSimpleHashGetKey(pIte, NULL);
    code = appendOneRowToSpecialBlockImpl(pRes, &pRange->win.skey, &pRange->win.ekey, &pRange->calWin.skey,
                                          &pRange->calWin.ekey, NULL, pGroupId, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pRes->info.type = STREAM_RECALCULATE_DATA;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1215

UNCOV
1216
/*
 * Produce the next block of the recalculation range scan.
 *
 * While there is a current range (pInfo->curRange), blocks are fetched with
 * doOneRangeScan(). When the current range is invalid, all saved ranges are
 * merged, the next one is popped and prepared, and a STREAM_RECALCULATE_DATA
 * block describing it (built into pInfo->pUpdateRes) is returned first.
 * If calBlockTbName() produced create-table rows, that block is returned
 * first and the data block is parked in pInfo->pRangeScanRes with
 * scanMode = STREAM_SCAN_FROM_RES.
 *
 * @param ppRes out: next block, or NULL when no range is left. It is now
 *              initialised to NULL on entry, because it is read at `_end`
 *              and the "no more ranges" and error paths never assigned it;
 *              correctness used to depend on the caller pre-clearing it.
 */
static int32_t doDataRangeScan(SStreamScanInfo* pInfo, SExecTaskInfo* pTaskInfo, SSDataBlock** ppRes) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pTsdbBlock = NULL;
  pInfo->pRecoverRes = NULL;
  (*ppRes) = NULL;
  while (1) {
    if (IS_INVALID_RANGE(pInfo->curRange)) {
      code = pInfo->stateStore.streamStateMergeAllScanRange(pInfo->basic.pTsDataState);
      QUERY_CHECK_CODE(code, lino, _end);

      code = pInfo->stateStore.streamStatePopScanRange(pInfo->basic.pTsDataState, &pInfo->curRange);
      QUERY_CHECK_CODE(code, lino, _end);
      // Still invalid after popping: nothing left to scan.
      if (IS_INVALID_RANGE(pInfo->curRange)) {
        break;
      }
      code = prepareDataRangeScan(pInfo, &pInfo->curRange);
      QUERY_CHECK_CODE(code, lino, _end);

      blockDataCleanup(pInfo->pUpdateRes);
      code = buildRecBlockByRange(&pInfo->curRange, pInfo->pUpdateRes);
      QUERY_CHECK_CODE(code, lino, _end);
      if (pInfo->pUpdateRes->info.rows > 0) {
        (*ppRes) = pInfo->pUpdateRes;
        break;
      }
    }

    code = doOneRangeScan(pInfo, &pInfo->curRange, &pTsdbBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pTsdbBlock != NULL) {
      pInfo->pRangeScanRes = pTsdbBlock;
      code = calBlockTbName(pInfo, pTsdbBlock, 0);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pInfo->pCreateTbRes->info.rows > 0) {
        (*ppRes) = pInfo->pCreateTbRes;
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
        break;
      }
      (*ppRes) = pTsdbBlock;
      break;
    } else {
      // Current range is exhausted; invalidate it so the next one is popped.
      destroyScanRange(&pInfo->curRange);
    }
  }

_end:
  printDataBlock((*ppRes), "stream tsdb scan", GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1270

UNCOV
1271
/*
 * Build the next recalculation block from the records stored in the ts-data
 * state.
 *
 * On the first call of a recalculation round (curRecId == -1) the recalculate
 * id is looked up, installed in the state and a cursor (pRecCur) is opened.
 * Records are then appended to pInfo->pUpdateRes, one row each, until the
 * cursor is exhausted, the block is full, or a record with a different mode
 * than the block type is met (that record is left for the next call).
 * When no row could be produced, *ppRes is NULL, the cursor is released and
 * curRecId is reset to -1.
 *
 * Each record value obtained from streamStateSessionGetKVByCur() is now freed
 * after use; previously every value read here was leaked.
 * NOTE(review): this relies on appendOneRowToSpecialBlockImpl() copying the
 * values it is given (including pPkColData) into the block - confirm.
 */
static int32_t buildStreamRecalculateBlock(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  if (pInfo->basic.pTsDataState->curRecId == -1) {
    int32_t recId = 0;
    code = getRecalculateId(&pInfo->stateStore, pInfo->basic.pTsDataState->pStreamTaskState, &recId);
    QUERY_CHECK_CODE(code, lino, _end);

    qDebug("===stream===%s do recalculate.recId:%d", GET_TASKID(pTaskInfo), recId);
    pInfo->basic.pTsDataState->curRecId = recId;
    pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pStreamTaskState, recId, pInfo->primaryTsIndex);

    SSessionKey      firstKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
    pInfo->basic.pTsDataState->pRecCur =
        pInfo->stateStore.streamStateSessionSeekKeyCurrentNext(pInfo->basic.pTsDataState->pStreamTaskState, &firstKey);
  }

  blockDataCleanup(pInfo->pUpdateRes);
  code = blockDataEnsureCapacity(pInfo->pUpdateRes, 1024);
  QUERY_CHECK_CODE(code, lino, _end);

  while (1) {
    SSessionKey rangKey = {0};
    void*       pVal = NULL;
    int32_t     len = 0;
    int32_t     winRes = pInfo->stateStore.streamStateSessionGetKVByCur(pInfo->basic.pTsDataState->pRecCur, &rangKey, &pVal, &len);
    if (winRes != TSDB_CODE_SUCCESS) {
      break;
    }
    SRecDataInfo* pRecData = (SRecDataInfo*)pVal;
    if (pInfo->pUpdateRes->info.rows == 0) {
      // The first record decides the type of the whole block.
      pInfo->pUpdateRes->info.type = pRecData->mode;
    } else if (pInfo->pUpdateRes->info.type != pRecData->mode || pInfo->pUpdateRes->info.rows ==  pInfo->pUpdateRes->info.capacity) {
      // Cursor is not advanced, so this record is read again on the next call;
      // only the copy fetched now has to be released.
      taosMemFreeClear(pVal);
      break;
    }

    code = appendOneRowToSpecialBlockImpl(pInfo->pUpdateRes, &rangKey.win.skey, &rangKey.win.ekey, &pRecData->calWin.skey,
                                          &pRecData->calWin.ekey, &pRecData->tableUid ,&rangKey.groupId, NULL, (void*)pRecData->pPkColData);
    // Release the record before the status check so it cannot leak on error.
    taosMemFreeClear(pVal);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->stateStore.streamStateCurNext(pInfo->basic.pTsDataState->pStreamTaskState, pInfo->basic.pTsDataState->pRecCur);
  }

  if (pInfo->pUpdateRes->info.rows > 0) {
    (*ppRes) = pInfo->pUpdateRes;
  } else {
    // Round finished: drop the cursor and allow a new round to start.
    (*ppRes) = NULL;
    pInfo->stateStore.streamStateFreeCur(pInfo->basic.pTsDataState->pRecCur);
    pInfo->basic.pTsDataState->pRecCur = NULL;
    pInfo->basic.pTsDataState->curRecId = -1;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
  }
  return code;
}
1332

UNCOV
1333
/**
 * Produce the next result block of a recalculation pass driven by the scan operator's own recorded ranges.
 *
 * A block left pending by an earlier call (selected through pInfo->scanMode) is returned first.
 * Otherwise, when no recalculation round is active (curRecId == -1), a recalculate id is fetched with
 * getRecalculateId(), the state number is switched to it and generateDataScanRange() is run; then
 * doDataRangeScan() supplies the next block. When doDataRangeScan() yields no block,
 * streamStateSessionDeleteAll() is invoked, curRecId is reset to -1 and recoverScanFinished is set.
 *
 * @param pOperator stream scan operator; pOperator->info is used as a SStreamScanInfo.
 * @param ppRes     out: the next block, or NULL when nothing is available.
 * @return TSDB_CODE_SUCCESS, or the failing code (also stored in pTaskInfo->code).
 */
static int32_t doStreamRecalculateDataScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  switch (pInfo->scanMode) {
    case STREAM_SCAN_FROM_RES: {
      // return the data block that was parked while another block was handed out
      (*ppRes) = pInfo->pRangeScanRes;
      pInfo->pRangeScanRes = NULL;
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      printDataBlock((*ppRes), "stream tsdb scan", GET_TASKID(pTaskInfo));
      goto _end;
    }
    case STREAM_SCAN_FROM_CREATE_TABLERES: {
      // Return the create-table block now; the next call picks up pRangeScanRes.
      // Previously this case fell through into `default`, which reset *ppRes to NULL
      // and made the assignment below dead.
      (*ppRes) = pInfo->pCreateTbRes;
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      goto _end;
    }
    default:
      (*ppRes) = NULL;
      break;
  }

  if (pInfo->basic.pTsDataState->curRecId == -1) {
    // no round in progress: start a new one
    int32_t recId = 0;
    code = getRecalculateId(&pInfo->stateStore, pInfo->basic.pTsDataState->pStreamTaskState, &recId);
    QUERY_CHECK_CODE(code, lino, _end);
    qDebug("===stream===%s do recalculate.recId:%d", GET_TASKID(pTaskInfo), recId);
    pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pStreamTaskState, recId, pInfo->primaryTsIndex);
    code = generateDataScanRange(pInfo, GET_TASKID(pTaskInfo));
    QUERY_CHECK_CODE(code, lino, _end);
    // only remember the id once the ranges were generated successfully
    pInfo->basic.pTsDataState->curRecId = recId;
  }

  code = doDataRangeScan(pInfo, pTaskInfo, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if ((*ppRes) == NULL) {
    // nothing left to scan: close the round
    pInfo->stateStore.streamStateSessionDeleteAll(pInfo->basic.pTsDataState->pState);
    pInfo->basic.pTsDataState->curRecId = -1;
    pTaskInfo->streamInfo.recoverScanFinished = true;
    qInfo("===stream===%s recalculate is finished.", GET_TASKID(pTaskInfo));
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
  }
  return code;
}
1384

UNCOV
1385
/**
 * Serve the next block while the operator input is a list of data blocks (pInfo->pBlockLists).
 *
 * In STREAM_SCAN_FROM_DATAREADER_RETRIEVE mode the function first returns a parked pRangeScanRes,
 * otherwise it continues the range scan described by pInfo->pUpdateRes. Each scanned block is tagged
 * STREAM_PULL_DATA; if calBlockTbName() left rows in pCreateTbRes, that block is returned first and the
 * scanned block is parked in pRangeScanRes. Once the range scan is exhausted, pUpdateRes is cleaned,
 * the mode returns to STREAM_SCAN_FROM_READERHANDLE and recoverScanFinished is set.
 *
 * After that, buffered input blocks are consumed one per call: a STREAM_RETRIEVE block is copied into
 * pUpdateRes and starts a range scan, any other block is returned as is. When the list is exhausted the
 * buffered blocks are cleared and NULL is returned.
 *
 * @param pOperator stream scan operator; pOperator->info is used as a SStreamScanInfo.
 * @param ppRes     out: the next block, or NULL when the input list is exhausted.
 * @return TSDB_CODE_SUCCESS, or the failing code (also stored in pTaskInfo->code).
 */
static int32_t doStreamRecalculateBlockScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;

  qDebug("===stream===%s doStreamRecalculateBlockScan", GET_TASKID(pTaskInfo));

  if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
    if (pInfo->pRangeScanRes != NULL) {
      // a scanned block was parked behind a create-table block; hand it out now
      (*ppRes) = pInfo->pRangeScanRes;
      pInfo->pRangeScanRes = NULL;
      goto _end;
    }

    SSDataBlock* pSDB = NULL;
    code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB);
    QUERY_CHECK_CODE(code, lino, _end);
    if (pSDB) {
      pSDB->info.type = STREAM_PULL_DATA;

      printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
      code = calBlockTbName(pInfo, pSDB, 0);
      QUERY_CHECK_CODE(code, lino, _end);

      if (pInfo->pCreateTbRes->info.rows > 0) {
        // emit the create-table block first and keep the data block for the next call
        printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update",
                           GET_TASKID(pTaskInfo));
        (*ppRes) = pInfo->pCreateTbRes;
        pInfo->pRangeScanRes = pSDB;
        goto _end;
      }

      (*ppRes) = pSDB;
      goto _end;
    }

    // range scan exhausted: go back to consuming buffered input blocks
    blockDataCleanup(pInfo->pUpdateRes);
    pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
    pStreamInfo->recoverScanFinished = true;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
  while (1) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      (*ppRes) = NULL;
      break;
    }
    int32_t current = pInfo->validBlockIndex++;
    qDebug("process %d/%d recalculate input data blocks, %s", current, (int32_t)total, GET_TASKID(pTaskInfo));

    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno);

    SSDataBlock* pBlock = pPacked->pDataBlock;
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    pBlock->info.dataLoad = 1;

    code = blockDataUpdateTsWindow(pBlock, 0);
    QUERY_CHECK_CODE(code, lino, _end);
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "rec recv", GET_TASKID(pTaskInfo));

    // every input block ends this call: either it starts a range scan or it is forwarded unchanged
    switch (pBlock->info.type) {
      case STREAM_RETRIEVE: {
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        code = copyDataBlock(pInfo->pUpdateRes, pBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        pInfo->updateResIndex = 0;
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
        (*ppRes) = pInfo->pUpdateRes;
        goto _end;
      }
      default: {
        (*ppRes) = pBlock;
        goto _end;
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
  }
  return code;
}
1472

UNCOV
1473
/**
 * Entry point of the recalculate scan operator: dispatch on the kind of input currently attached.
 *
 *  - STREAM_INPUT__DATA_BLOCK: delegate to doStreamRecalculateBlockScan().
 *  - STREAM_INPUT__CHECKPOINT: clear the buffered blocks and return no block.
 *  - anything else (including STREAM_INPUT__RECALCULATE): clear the buffered blocks, then build the
 *    recalculate block for a final operator, run the data scan for a single operator, or return no block.
 *
 * @param pOperator stream scan operator; pOperator->info is used as a SStreamScanInfo.
 * @param ppRes     out: the next block, or NULL.
 * @return TSDB_CODE_SUCCESS, or the failing code (also stored in pTaskInfo->code).
 */
int32_t doStreamRecalculateScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  char*            idStr = GET_TASKID(pTaskInfo);

  qDebug("stream recalculate scan started, %s", idStr);

  // sampled before any branch clears the buffered list; only used for logging
  size_t numOfBlocks = taosArrayGetSize(pInfo->pBlockLists);

  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    code = doStreamRecalculateBlockScan(pOperator, ppRes);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pInfo->blockType == STREAM_INPUT__CHECKPOINT) {
    doClearBufferedBlocks(pInfo);
    (*ppRes) = NULL;
    qDebug("===stream===%s process input data blocks,size:%d", idStr, (int32_t)numOfBlocks);
  } else {
    // STREAM_INPUT__RECALCULATE and every other input type
    doClearBufferedBlocks(pInfo);
    if (isFinalOperator(&pInfo->basic)) {
      code = buildStreamRecalculateBlock(pOperator, ppRes);
      QUERY_CHECK_CODE(code, lino, _end);
    } else if (isSingleOperator(&pInfo->basic)) {
      code = doStreamRecalculateDataScan(pOperator, ppRes);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      qDebug("===stream===%s return empty block", idStr);
      (*ppRes) = NULL;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
  }
  return code;
}
1516

UNCOV
1517
/**
 * Release the resources held by a recalculate parameter set.
 * Only the column-id hash map is cleaned up; the pointer is reset so the struct can be torn down again safely.
 *
 * @param pParam parameter set to clean; must not be NULL.
 */
static void destroyStreamRecalculateParam(SStreamRecParam* pParam) {
  tSimpleHashCleanup(pParam->pColIdMap);
  pParam->pColIdMap = NULL;
}
×
1521

UNCOV
1522
static void destroyStreamDataScanOperatorInfo(void* param) {
×
UNCOV
1523
  if (param == NULL) {
×
1524
    return;
×
1525
  }
1526

UNCOV
1527
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
×
UNCOV
1528
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
×
UNCOV
1529
    destroyOperator(pStreamScan->pTableScanOp);
×
1530
  }
1531

UNCOV
1532
  if (pStreamScan->pRecTableScanOp && pStreamScan->pRecTableScanOp->info) {
×
UNCOV
1533
    destroyOperator(pStreamScan->pRecTableScanOp);
×
1534
  }
1535

UNCOV
1536
  if (pStreamScan->tqReader != NULL && pStreamScan->readerFn.tqReaderClose != NULL) {
×
UNCOV
1537
    pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
×
1538
  }
UNCOV
1539
  if (pStreamScan->matchInfo.pList) {
×
UNCOV
1540
    taosArrayDestroy(pStreamScan->matchInfo.pList);
×
1541
  }
UNCOV
1542
  if (pStreamScan->pPseudoExpr) {
×
UNCOV
1543
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
×
UNCOV
1544
    taosMemoryFree(pStreamScan->pPseudoExpr);
×
1545
  }
1546

UNCOV
1547
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
×
UNCOV
1548
  cleanupExprSupp(&pStreamScan->tagCalSup);
×
1549

UNCOV
1550
  blockDataDestroy(pStreamScan->pRes);
×
UNCOV
1551
  blockDataDestroy(pStreamScan->pCreateTbRes);
×
UNCOV
1552
  taosArrayDestroy(pStreamScan->pBlockLists);
×
UNCOV
1553
  blockDataDestroy(pStreamScan->pCheckpointRes);
×
UNCOV
1554
  blockDataDestroy(pStreamScan->pDeleteDataRes);
×
UNCOV
1555
  blockDataDestroy(pStreamScan->pUpdateRes);
×
1556

UNCOV
1557
  if (pStreamScan->stateStore.streamStateDestroyTsDataState) {
×
UNCOV
1558
    pStreamScan->stateStore.streamStateDestroyTsDataState(pStreamScan->basic.pTsDataState);
×
UNCOV
1559
    pStreamScan->basic.pTsDataState = NULL;
×
1560
  }
1561

UNCOV
1562
  if (pStreamScan->stateStore.updateInfoDestroy != NULL && pStreamScan->pUpdateInfo != NULL) {
×
UNCOV
1563
    pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
×
1564
  }
UNCOV
1565
  tSimpleHashCleanup(pStreamScan->pRecRangeMap);
×
UNCOV
1566
  pStreamScan->pRecRangeMap = NULL;
×
1567

UNCOV
1568
  taosArrayDestroy(pStreamScan->pRecRangeRes);
×
UNCOV
1569
  pStreamScan->pRecRangeRes = NULL;
×
1570

UNCOV
1571
  destroyStreamRecalculateParam(&pStreamScan->recParam);
×
1572

UNCOV
1573
  taosMemoryFree(pStreamScan);
×
1574
}
1575

1576
#if 0
// Disabled manual test driver (compiled out). It runs a normal scan first, then injects a
// STREAM_RECALCULATE_START block and drives doStreamRecalculateScanNext() through the root operator.
static int32_t doStreamScanTest(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SSDataBlock*     pBlock = NULL;
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  // give the ts-data state its own copy of the task state
  pInfo->basic.pTsDataState->pStreamTaskState = (SStreamState*)taosMemoryCalloc(1, sizeof(SStreamState));
  QUERY_CHECK_NULL(pInfo->basic.pTsDataState->pStreamTaskState, code, lino, _end, terrno);
  *((SStreamState*)pInfo->basic.pTsDataState->pStreamTaskState) = *pTaskInfo->streamInfo.pState;

  doStreamDataScanNext(pOperator, ppRes);
  if ((*ppRes) != NULL) {
    return code;
  }

  // queue a synthetic "recalculate start" block
  {
    code = createDataBlock(&pBlock);
    QUERY_CHECK_CODE(code, lino, _end);
    pBlock->info.type = STREAM_RECALCULATE_START;

    SPackedData pack = {0};
    pack.pDataBlock = pBlock;
    void* pBuf = taosArrayPush(pInfo->pBlockLists, &pack);
    QUERY_CHECK_NULL(pBuf, code, lino, _end, terrno);
  }

  pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  doStreamDataScanNext(pOperator, ppRes);

  SOperatorInfo*                    pRoot = pTaskInfo->pRoot;
  SStreamIntervalSliceOperatorInfo* pIntervalInfo = pRoot->info;
  setRecalculateOperatorFlag(&pIntervalInfo->basic);

  code = doStreamRecalculateScanNext(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if ((*ppRes) == NULL) {
    // unsetRecalculateOperatorFlag(&pIntervalInfo->basic);
    // pBlock->info.type = STREAM_RECALCULATE_END;
    // SPackedData pack = {0};
    // pack.pDataBlock = pBlock;
    // void* pBuf = taosArrayPush(pInfo->pBlockLists, &pack);
    // QUERY_CHECK_NULL(pBuf, code, lino, _end, terrno);
    // doStreamDataScanNext(pOperator, ppRes);
    // pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  }

_end:
  blockDataDestroy(pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    (*ppRes) = NULL;
  }
  return code;
}
#endif
1632

UNCOV
1633
/**
 * Fill a recalculate parameter set from the table scan plan node and the adapter settings.
 *
 * Copies the super table name and the window/group column names from the plan node, records the
 * capacity of the SQL buffer, formats the REST endpoint URL and the authorization header from
 * tsAdapterFqdn / tsAdapterPort / tsAdapterToken, and creates the column-id hash map.
 * The function returns nothing: pParam->pColIdMap is whatever tSimpleHashInit() returned.
 *
 * @param pTableScanNode plan node providing the names.
 * @param pParam         parameter set to initialise.
 */
static void initStreamRecalculateParam(STableScanPhysiNode* pTableScanNode, SStreamRecParam* pParam) {
  // names taken over from the physical plan
  tstrncpy(pParam->pStbFullName, pTableScanNode->pStbFullName, TSDB_TABLE_FNAME_LEN);
  tstrncpy(pParam->pWstartName, pTableScanNode->pWstartName, TSDB_COL_NAME_LEN);
  tstrncpy(pParam->pWendName, pTableScanNode->pWendName, TSDB_COL_NAME_LEN);
  tstrncpy(pParam->pGroupIdName, pTableScanNode->pGroupIdName, TSDB_COL_NAME_LEN);
  tstrncpy(pParam->pIsWindowFilledName, pTableScanNode->pIsWindowFilledName, TSDB_COL_NAME_LEN);

  pParam->sqlCapcity = tListLen(pParam->pSql);

  // REST request pieces; truncation is not reported, the return values are deliberately ignored
  (void)tsnprintf(pParam->pUrl, tListLen(pParam->pUrl), "http://%s:%d/rest/sql", tsAdapterFqdn, tsAdapterPort);
  (void)tsnprintf(pParam->pAuth, tListLen(pParam->pAuth), "Authorization: Basic %s", tsAdapterToken);

  pParam->pColIdMap = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
}
×
1648

UNCOV
1649
int32_t createStreamDataScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
×
1650
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
1651
                                         SOperatorInfo** pOptrInfo) {
UNCOV
1652
  QRY_PARAM_CHECK(pOptrInfo);
×
1653

UNCOV
1654
  int32_t             code = TSDB_CODE_SUCCESS;
×
UNCOV
1655
  int32_t             lino = 0;
×
UNCOV
1656
  SArray*             pColIds = NULL;
×
UNCOV
1657
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
×
UNCOV
1658
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
×
UNCOV
1659
  SStreamScanInfo*    pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
×
UNCOV
1660
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
UNCOV
1661
  const char*         idstr = pTaskInfo->id.str;
×
UNCOV
1662
  SStorageAPI*        pAPI = &pTaskInfo->storageAPI;
×
1663

UNCOV
1664
  if (pInfo == NULL || pOperator == NULL) {
×
1665
    code = terrno;
×
1666
    goto _error;
×
1667
  }
1668

UNCOV
1669
  pInfo->pTagCond = pTagCond;
×
UNCOV
1670
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
×
1671

UNCOV
1672
  int32_t numOfCols = 0;
×
UNCOV
1673
  code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
×
UNCOV
1674
  if (code != TSDB_CODE_SUCCESS) {
×
1675
    goto _error;
×
1676
  }
1677

UNCOV
1678
  pInfo->basic.primaryPkIndex = -1;
×
UNCOV
1679
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
×
UNCOV
1680
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
×
UNCOV
1681
  QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
×
1682

UNCOV
1683
  SDataType pkType = {0};
×
UNCOV
1684
  for (int32_t i = 0; i < numOfOutput; ++i) {
×
UNCOV
1685
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
×
UNCOV
1686
    QUERY_CHECK_NULL(id, code, lino, _error, terrno);
×
1687

UNCOV
1688
    int16_t colId = id->colId;
×
UNCOV
1689
    void*   tmp = taosArrayPush(pColIds, &colId);
×
UNCOV
1690
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
×
1691

UNCOV
1692
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
UNCOV
1693
      pInfo->primaryTsIndex = id->dstSlotId;
×
1694
    }
UNCOV
1695
    if (id->isPk) {
×
1696
      pInfo->basic.primaryPkIndex = id->dstSlotId;
×
1697
      pkType = id->dataType;
×
1698
    }
1699
  }
1700

UNCOV
1701
  pInfo->pPartTbnameSup = NULL;
×
UNCOV
1702
  if (pTableScanNode->pSubtable != NULL) {
×
UNCOV
1703
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
×
UNCOV
1704
    QUERY_CHECK_NULL(pSubTableExpr, code, lino, _error, terrno);
×
1705

UNCOV
1706
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
×
UNCOV
1707
    code = createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
×
UNCOV
1708
    QUERY_CHECK_CODE(code, lino, _error);
×
1709

UNCOV
1710
    code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore);
×
UNCOV
1711
    QUERY_CHECK_CODE(code, lino, _error);
×
1712
  }
1713

UNCOV
1714
  if (pTableScanNode->pTags != NULL) {
×
1715
    int32_t    numOfTags;
UNCOV
1716
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
×
UNCOV
1717
    QUERY_CHECK_NULL(pTagExpr, code, lino, _error, terrno);
×
UNCOV
1718
    code = initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore);
×
UNCOV
1719
    QUERY_CHECK_CODE(code, lino, _error);
×
1720
  }
1721

UNCOV
1722
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
×
UNCOV
1723
  TSDB_CHECK_NULL(pInfo->pBlockLists, code, lino, _error, terrno);
×
1724

UNCOV
1725
  pInfo->pTableScanOp = NULL;
×
UNCOV
1726
  if (pHandle->vnode) {
×
UNCOV
1727
    code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pInfo->pTableScanOp);
×
UNCOV
1728
    QUERY_CHECK_CODE(code, lino, _error);
×
1729

UNCOV
1730
    STableScanInfo* pTSInfo = (STableScanInfo*)pInfo->pTableScanOp->info;
×
UNCOV
1731
    if (pHandle->version > 0) {
×
1732
      pTSInfo->base.cond.endVersion = pHandle->version;
×
1733
    }
1734

UNCOV
1735
    STableKeyInfo* pList = NULL;
×
UNCOV
1736
    int32_t        num = 0;
×
UNCOV
1737
    code = tableListGetGroupList(pTableListInfo, 0, &pList, &num);
×
UNCOV
1738
    QUERY_CHECK_CODE(code, lino, _error);
×
1739

UNCOV
1740
    if (pHandle->initTqReader) {
×
UNCOV
1741
      pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode);
×
UNCOV
1742
      QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, terrno);
×
1743
    } else {
1744
      pInfo->tqReader = pHandle->tqReader;
×
1745
      QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1746
    }
1747

UNCOV
1748
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
×
UNCOV
1749
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
×
UNCOV
1750
    QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno);
×
1751

UNCOV
1752
    code = blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
×
UNCOV
1753
    QUERY_CHECK_CODE(code, lino, _error);
×
1754

1755
    // set the extract column id to streamHandle
UNCOV
1756
    pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
×
1757

UNCOV
1758
    SArray* tableIdList = NULL;
×
UNCOV
1759
    code = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo, &tableIdList);
×
UNCOV
1760
    QUERY_CHECK_CODE(code, lino, _error);
×
UNCOV
1761
    pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr);
×
UNCOV
1762
    taosArrayDestroy(tableIdList);
×
UNCOV
1763
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
×
1764

UNCOV
1765
    STableListInfo* pRecTableListInfo = tableListCreate();
×
UNCOV
1766
    QUERY_CHECK_NULL(pRecTableListInfo, code, lino, _error, terrno);
×
UNCOV
1767
    code = createTableScanOperatorInfo(pTableScanNode, pHandle, pRecTableListInfo, pTaskInfo, &pInfo->pRecTableScanOp);
×
UNCOV
1768
    QUERY_CHECK_CODE(code, lino, _error);
×
1769
  } else {
UNCOV
1770
    taosArrayDestroy(pColIds);
×
UNCOV
1771
    tableListDestroy(pTableListInfo);
×
1772
  }
1773

1774
  // clear the local variable to avoid repeatly free
UNCOV
1775
  pColIds = NULL;
×
1776

1777
  // create the pseduo columns info
UNCOV
1778
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
×
UNCOV
1779
    code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr);
×
UNCOV
1780
    QUERY_CHECK_CODE(code, lino, _error);
×
1781
  }
1782

UNCOV
1783
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
×
UNCOV
1784
  QUERY_CHECK_CODE(code, lino, _error);
×
1785

UNCOV
1786
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
×
UNCOV
1787
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
×
1788

UNCOV
1789
  code = createSpecialDataBlock(STREAM_RECALCULATE_DELETE, &pInfo->pDeleteDataRes);
×
UNCOV
1790
  QUERY_CHECK_CODE(code, lino, _error);
×
1791

UNCOV
1792
  code = createSpecialDataBlock(STREAM_RECALCULATE_DATA, &pInfo->pUpdateRes);
×
UNCOV
1793
  QUERY_CHECK_CODE(code, lino, _error);
×
UNCOV
1794
  pInfo->pRecoverRes = NULL;
×
1795

UNCOV
1796
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
×
UNCOV
1797
  pInfo->twAggSup.maxTs = INT64_MIN;
×
UNCOV
1798
  pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
×
UNCOV
1799
  pInfo->readHandle = *pHandle;
×
UNCOV
1800
  pInfo->comparePkColFn = getKeyComparFunc(pkType.type, TSDB_ORDER_ASC);
×
UNCOV
1801
  pInfo->curRange = (SScanRange){0};
×
UNCOV
1802
  pInfo->scanAllTables = false;
×
UNCOV
1803
  pInfo->hasPart = false;
×
UNCOV
1804
  pInfo->assignBlockUid = groupbyTbname(pInfo->pGroupTags);
×
1805

UNCOV
1806
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
×
UNCOV
1807
  QUERY_CHECK_CODE(code, lino, _error);
×
1808

UNCOV
1809
  SStreamState* pTempState = (SStreamState*)taosMemoryCalloc(1, sizeof(SStreamState));
×
UNCOV
1810
  QUERY_CHECK_NULL(pTempState, code, lino, _error, terrno);
×
UNCOV
1811
  (*pTempState) = *pTaskInfo->streamInfo.pState;
×
UNCOV
1812
  pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
×
UNCOV
1813
  pInfo->stateStore.streamStateSetNumber(pTempState, 1, pInfo->primaryTsIndex);
×
1814
  
UNCOV
1815
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
UNCOV
1816
  pInfo->pRecRangeMap = tSimpleHashInit(32, hashFn);
×
UNCOV
1817
  taosArrayDestroy(pInfo->pRecRangeRes);
×
UNCOV
1818
  pInfo->pRecRangeRes = taosArrayInit(64, POINTER_BYTES);
×
UNCOV
1819
  initStreamRecalculateParam(pTableScanNode, &pInfo->recParam);
×
1820

UNCOV
1821
  void* pOtherState = pTaskInfo->streamInfo.pOtherState;
×
UNCOV
1822
  pAPI->stateStore.streamStateInitTsDataState(&pInfo->basic.pTsDataState, pkType.type, pkType.bytes, pTempState, pOtherState);
×
UNCOV
1823
  pAPI->stateStore.streamStateRecoverTsData(pInfo->basic.pTsDataState);
×
UNCOV
1824
  setSingleOperatorFlag(&pInfo->basic);
×
1825

UNCOV
1826
  pInfo->pStreamScanOp = pOperator;
×
1827

1828
  // for stream
UNCOV
1829
  if (pTaskInfo->streamInfo.pState) {
×
UNCOV
1830
    void*   buff = NULL;
×
UNCOV
1831
    int32_t len = 0;
×
UNCOV
1832
    int32_t res = pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_CHECKPOINT_NAME,
×
1833
                                                      strlen(STREAM_DATA_SCAN_OP_CHECKPOINT_NAME), &buff, &len);
1834
  }
1835

UNCOV
1836
  setOperatorInfo(pOperator, STREAM_DATA_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED,
×
1837
                  pInfo, pTaskInfo);
UNCOV
1838
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
×
1839

UNCOV
1840
  if (pHandle->fillHistory == STREAM_RECALCUL_OPERATOR) {
×
1841
    pOperator->fpSet =
UNCOV
1842
        createOperatorFpSet(optrDummyOpenFn, doStreamRecalculateScanNext, NULL, destroyStreamDataScanOperatorInfo,
×
1843
                            optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1844
  } else {
1845
    pOperator->fpSet =
UNCOV
1846
        createOperatorFpSet(optrDummyOpenFn, doStreamDataScanNext, NULL, destroyStreamDataScanOperatorInfo,
×
1847
                            optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1848
  }
1849
  // doStreamScanTest
1850
  // pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamScanTest, NULL, destroyStreamDataScanOperatorInfo,
1851
  //                                        optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
UNCOV
1852
  setOperatorStreamStateFn(pOperator, streamDataScanReleaseState, streamDataScanReloadState);
×
1853

UNCOV
1854
  *pOptrInfo = pOperator;
×
UNCOV
1855
  return code;
×
1856

1857
_error:
×
1858
  if (pColIds != NULL) {
×
1859
    taosArrayDestroy(pColIds);
×
1860
  }
1861

1862
  if (pInfo != NULL) {
×
1863
    STableScanInfo* p = (STableScanInfo*)pInfo->pTableScanOp->info;
×
1864
    if (p != NULL) {
×
1865
      p->base.pTableListInfo = NULL;
×
1866
    }
1867
    destroyStreamDataScanOperatorInfo(pInfo);
×
1868
  }
1869

1870
  if (pOperator != NULL) {
×
1871
    pOperator->info = NULL;
×
1872
    destroyOperator(pOperator);
×
1873
  }
1874
  pTaskInfo->code = code;
×
1875
  return code;
×
1876
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc