• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamscanoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "streamexecutorInt.h"
23
#include "streamsession.h"
24
#include "systable.h"
25
#include "tname.h"
26

27
#include "tdatablock.h"
28
#include "tmsg.h"
29
#include "ttime.h"
30

31
#include "operator.h"
32
#include "query.h"
33
#include "querytask.h"
34
#include "tcompare.h"
35
#include "thash.h"
36
#include "ttypes.h"
37

38
#include "storageapi.h"
39
#include "wal.h"
40

41
#define STREAM_DATA_SCAN_OP_NAME            "StreamDataScanOperator"
42
#define STREAM_DATA_SCAN_OP_STATE_NAME      "StreamDataScanFillHistoryState"
43
#define STREAM_DATA_SCAN_OP_CHECKPOINT_NAME "StreamDataScanOperator_Checkpoint"
44
#define STREAM_DATA_SCAN_OP_REC_ID_NAME     "StreamDataScanOperator_Recalculate_ID"
45

46
static int32_t getMaxTsKeyInfo(SStreamScanInfo* pInfo, SSDataBlock* pBlock, TSKEY* pCurTs, void** ppPkVal,
×
47
                               int32_t* pWinCode) {
48
  int32_t code = TSDB_CODE_SUCCESS;
×
49
  int32_t lino = 0;
×
50
  void*   pLastPkVal = NULL;
×
51
  int32_t lastPkLen = 0;
×
52
  if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
×
53
    SColumnInfoData* pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->basic.primaryPkIndex);
×
54
    pLastPkVal = colDataGetData(pPkColDataInfo, pBlock->info.rows - 1);
×
55
    lastPkLen = colDataGetRowLength(pPkColDataInfo, pBlock->info.rows - 1);
×
56
  }
57

58
  code = pInfo->stateStore.streamStateGetAndSetTsData(pInfo->basic.pTsDataState, pBlock->info.id.uid, pCurTs, ppPkVal,
×
59
                                                      pBlock->info.window.ekey, pLastPkVal, lastPkLen, pWinCode);
60
  QUERY_CHECK_CODE(code, lino, _end);
×
61

62
_end:
×
63
  if (code != TSDB_CODE_SUCCESS) {
×
64
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
65
  }
66
  return code;
×
67
}
68

69
int32_t copyRecDataToBuff(TSKEY calStart, TSKEY calEnd, uint64_t uid, uint64_t version, EStreamType mode,
×
70
                          const SColumnInfoData* pPkColDataInfo, int32_t rowId, SRecDataInfo* pValueBuff,
71
                          int32_t buffLen) {
72
  pValueBuff->calWin.skey = calStart;
×
73
  pValueBuff->calWin.ekey = calEnd;
×
74
  pValueBuff->tableUid = uid;
×
75
  pValueBuff->dataVersion = version;
×
76
  pValueBuff->mode = mode;
×
77

78
  int32_t pkLen = 0;
×
79
  if (pPkColDataInfo != NULL) {
×
80
    pkLen = colDataGetRowLength(pPkColDataInfo, rowId);
×
81
    memcpy(pValueBuff->pPkColData, colDataGetData(pPkColDataInfo, rowId), pkLen);
×
82
  }
83
  return pkLen + sizeof(SRecDataInfo);
×
84
}
85

86
int32_t saveRecalculateData(SStateStore* pStateStore, STableTsDataState* pTsDataState, SSDataBlock* pSrcBlock,
×
87
                            EStreamType mode) {
88
  int32_t code = TSDB_CODE_SUCCESS;
×
89
  int32_t lino = 0;
×
90

91
  if (pSrcBlock->info.rows == 0) {
×
92
    return TSDB_CODE_SUCCESS;
×
93
  }
94
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
95
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
96
  SColumnInfoData* pSrcCalStartTsCol =
97
      (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
×
98
  SColumnInfoData* pSrcCalEndTsCol =
99
      (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
×
100
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
×
101
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
102
  TSKEY*           srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
×
103
  TSKEY*           srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
×
104
  TSKEY*           srcCalStartTsCol = (TSKEY*)pSrcCalStartTsCol->pData;
×
105
  TSKEY*           srcCalEndTsCol = (TSKEY*)pSrcCalEndTsCol->pData;
×
106
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
×
107
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
×
108
  TSKEY            calStart = INT64_MIN;
×
109
  TSKEY            calEnd = INT64_MIN;
×
110
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
×
111
    SSessionKey key = {.win.skey = srcStartTsCol[i], .win.ekey = srcEndTsCol[i], .groupId = srcGp[i]};
×
112
    if (mode == STREAM_RETRIEVE) {
×
113
      calStart = srcCalStartTsCol[i];
×
114
      calEnd = srcCalEndTsCol[i];
×
115
    } else {
116
      calStart = srcStartTsCol[i];
×
117
      calEnd = srcEndTsCol[i];
×
118
    }
119
    int32_t len = copyRecDataToBuff(calStart, calEnd, srcUidData[i], pSrcBlock->info.version, mode, NULL, 0,
×
120
                                    pTsDataState->pRecValueBuff, pTsDataState->recValueLen);
121
    code = pStateStore->streamStateMergeAndSaveScanRange(pTsDataState, &key.win, key.groupId,
×
122
                                                         pTsDataState->pRecValueBuff, len);
123
    QUERY_CHECK_CODE(code, lino, _end);
×
124
  }
125

126
_end:
×
127
  if (code != TSDB_CODE_SUCCESS) {
×
128
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
129
  }
130
  return code;
×
131
}
132

133
void buildRecalculateDataSnapshort(SStreamScanInfo* pInfo, SExecTaskInfo* pTaskInfo) {
×
134
  void*   buff = NULL;
×
135
  int32_t len = 0;
×
136
  int32_t res = pInfo->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
×
137
                                                     strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME), &buff, &len);
138
  taosMemFreeClear(buff);
×
139
  if (res == TSDB_CODE_SUCCESS) {
×
140
    qDebug("===stream===%s recalculate task is not completed yet, so no need to create a recalculate snapshot", GET_TASKID(pTaskInfo));
×
141
    return;
×
142
  }
143

144
  int32_t recID = pInfo->stateStore.streamStateGetNumber(pInfo->basic.pTsDataState->pState);
×
145
  pInfo->stateStore.streamStateSaveInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
×
146
                                        strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME), &recID, sizeof(int32_t));
147
  pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pState, recID + 1, pInfo->primaryTsIndex);
×
148
  qDebug("===stream===%s build recalculate snapshot id:%d", GET_TASKID(pTaskInfo), recID);
×
149
}
150

151
static int32_t getRecalculateId(SStateStore* pStateStore, void* pState, int32_t* pRecId) {
×
152
  void*   buff = NULL;
×
153
  int32_t len = 0;
×
154
  int32_t res = pStateStore->streamStateGetInfo(pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
×
155
                                                strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME), &buff, &len);
156
  if (res != TSDB_CODE_SUCCESS) {
×
157
    qError("Not receive recalculate start block, but received recalculate end block");
×
158
    return res;
×
159
  }
160
  *(pRecId) = *(int32_t*)buff;
×
161
  taosMemFreeClear(buff);
×
162
  return TSDB_CODE_SUCCESS;
×
163
}
164

165
static int32_t deleteRecalculateDataSnapshort(SStreamScanInfo* pInfo, SExecTaskInfo* pTaskInfo) {
×
166
  int32_t code = TSDB_CODE_SUCCESS;
×
167
  int32_t lino = 0;
×
168
  void*   buff = NULL;
×
169
  int32_t len = 0;
×
170
  int32_t prevRecId = 0;
×
171
  code = getRecalculateId(&pInfo->stateStore, pTaskInfo->streamInfo.pState, &prevRecId);
×
172
  QUERY_CHECK_CODE(code, lino, _end);
×
173

174
  pInfo->stateStore.streamStateDeleteInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_REC_ID_NAME,
×
175
                                          strlen(STREAM_DATA_SCAN_OP_REC_ID_NAME));
176

177
  int32_t curID = pInfo->stateStore.streamStateGetNumber(pInfo->basic.pTsDataState->pState);
×
178
  pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pState, prevRecId, pInfo->primaryTsIndex);
×
179
  pInfo->stateStore.streamStateSessionDeleteAll(pInfo->basic.pTsDataState->pState);
×
180

181
  pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pState, curID, pInfo->primaryTsIndex);
×
182
  qDebug("===stream===%s delete recalculate snapshot id:%d", GET_TASKID(pTaskInfo), prevRecId);
×
183

184
_end:
×
185
  if (code != TSDB_CODE_SUCCESS) {
×
186
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
187
  }
188
  return code;
×
189
}
190

191
static int32_t readPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int64_t version, char* taskIdStr,
×
192
                                       SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
193
  int32_t      code = TSDB_CODE_SUCCESS;
×
194
  int32_t      lino = 0;
×
195
  SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, startTs, endTs, version);
×
196
  QUERY_CHECK_NULL(pPreRes, code, lino, _end, terrno);
×
197

198
  printDataBlock(pPreRes, "pre res", taskIdStr);
×
199
  code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows + pBlock->info.rows);
×
200
  QUERY_CHECK_CODE(code, lino, _end);
×
201

202
  SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
×
203
  SColumnInfoData* pPkCol = NULL;
×
204
  if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
×
205
    pPkCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->basic.primaryPkIndex);
×
206
  }
207
  for (int32_t i = 0; i < pPreRes->info.rows; i++) {
×
208
    uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
×
209
    code = appendPkToSpecialBlock(pBlock, (TSKEY*)pTsCol->pData, pPkCol, i, &uid, &groupId, NULL);
×
210
    QUERY_CHECK_CODE(code, lino, _end);
×
211
  }
212
  printDataBlock(pBlock, "new delete", taskIdStr);
×
213

214
_end:
×
215
  if (code != TSDB_CODE_SUCCESS) {
×
216
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
217
  }
218
  return code;
×
219
}
220

221
static int32_t readPrevVersionDataByBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, char* pTaskIdStr) {
×
222
  int32_t code = TSDB_CODE_SUCCESS;
×
223
  int32_t lino = 0;
×
224

225
  if (pSrcBlock->info.rows == 0) {
×
226
    return TSDB_CODE_SUCCESS;
×
227
  }
228
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
229
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
230
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
×
231

232
  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
×
233
  TSKEY*    srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
×
234
  TSKEY*    srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
×
235
  int64_t   ver = pSrcBlock->info.version - 1;
×
236
  blockDataCleanup(pDestBlock);
×
237
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
×
238
    code = readPreVersionDataBlock(srcUidData[i], srcStartTsCol[i], srcEndTsCol[i], ver, pTaskIdStr, pInfo,
×
239
                                   pDestBlock);
240
    QUERY_CHECK_CODE(code, lino, _end);
×
241
  }
242

243
_end:
×
244
  if (code != TSDB_CODE_SUCCESS) {
×
245
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
246
  }
247
  return code;
×
248
}
249

250
static int32_t doStreamBlockScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
251
  int32_t          code = TSDB_CODE_SUCCESS;
×
252
  int32_t          lino = 0;
×
253
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
×
254
  SStreamScanInfo* pInfo = pOperator->info;
×
255
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
×
256

257
  qDebug("===stream===%s doStreamBlockScan", GET_TASKID(pTaskInfo));
×
258
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
×
259
  while (1) {
×
260
    if (pInfo->validBlockIndex >= total) {
×
261
      doClearBufferedBlocks(pInfo);
×
262
      (*ppRes) = NULL;
×
263
      break;
×
264
    }
265

266
    int32_t current = pInfo->validBlockIndex++;
×
267
    qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, GET_TASKID(pTaskInfo));
×
268

269
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
×
270
    QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno);
×
271

272
    SSDataBlock* pBlock = pPacked->pDataBlock;
×
273
    if (pBlock->info.parTbName[0]) {
×
274
      code =
275
          pInfo->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
×
276
      QUERY_CHECK_CODE(code, lino, _end);
×
277
    }
278

279
    pBlock->info.calWin.skey = INT64_MIN;
×
280
    pBlock->info.calWin.ekey = INT64_MAX;
×
281
    pBlock->info.dataLoad = 1;
×
282

283
    code = blockDataUpdateTsWindow(pBlock, 0);
×
284
    QUERY_CHECK_CODE(code, lino, _end);
×
285
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
×
286
    switch (pBlock->info.type) {
×
287
      case STREAM_NORMAL:
×
288
      case STREAM_INVALID:
289
      case STREAM_GET_ALL: 
290
      case STREAM_RECALCULATE_DATA: 
291
      case STREAM_RECALCULATE_DELETE:
292
      case STREAM_CREATE_CHILD_TABLE: {
293
        setStreamOperatorState(&pInfo->basic, pBlock->info.type);
×
294
        (*ppRes) = pBlock;
×
295
      } break;
×
296
      case STREAM_DELETE_DATA: {
×
297
        printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
×
298
        if (pInfo->tqReader) {
×
299
          code = filterDelBlockByUid(pInfo->pDeleteDataRes, pBlock, pInfo->tqReader, &pInfo->readerFn);
×
300
          QUERY_CHECK_CODE(code, lino, _end);
×
301
        } else {
302
          // its parent operator is final agg operator
303
          code = copyDataBlock(pInfo->pDeleteDataRes, pBlock);
×
304
          QUERY_CHECK_CODE(code, lino, _end);
×
305
          pInfo->pDeleteDataRes->info.type = STREAM_RECALCULATE_DATA;
×
306
        }
307

308
        code = setBlockGroupIdByUid(pInfo, pInfo->pDeleteDataRes);
×
309
        QUERY_CHECK_CODE(code, lino, _end);
×
310
        code = rebuildDeleteBlockData(pInfo->pDeleteDataRes, &pStreamInfo->fillHistoryWindow, GET_TASKID(pTaskInfo));
×
311
        QUERY_CHECK_CODE(code, lino, _end);
×
312
        if (pInfo->pDeleteDataRes->info.rows == 0) {
×
313
          continue;
×
314
        }
315
        printSpecDataBlock(pInfo->pDeleteDataRes, getStreamOpName(pOperator->operatorType), "delete recv filtered",
×
316
                           GET_TASKID(pTaskInfo));
×
317
        if (pInfo->partitionSup.needCalc) {
×
318
          SSDataBlock* pTmpBlock = NULL;
×
319
          code = createOneDataBlock(pInfo->pDeleteDataRes, true, &pTmpBlock);
×
320
          QUERY_CHECK_CODE(code, lino, _end);
×
321
          readPrevVersionDataByBlock(pInfo, pTmpBlock, pInfo->pDeleteDataRes, GET_TASKID(pTaskInfo));
×
322
          blockDataDestroy(pTmpBlock);
×
323
        }
324
        pInfo->pDeleteDataRes->info.type = STREAM_RECALCULATE_DELETE;
×
325
        (*ppRes) = pInfo->pDeleteDataRes;
×
326
      } break;
×
327
      case STREAM_DROP_CHILD_TABLE: {
×
328
        int32_t deleteNum = 0;
×
329
        code = deletePartName(&pInfo->stateStore, pTaskInfo->streamInfo.pState, pBlock, &deleteNum);
×
330
        QUERY_CHECK_CODE(code, lino, _end);
×
331
        if (deleteNum == 0) {
×
332
          continue;
×
333
        }
334
        (*ppRes) = pBlock;
×
335
      } break;
×
336
      case STREAM_CHECKPOINT: {
×
337
        qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK");
×
338
      } break;
×
339
      case STREAM_RECALCULATE_START: {
×
340
        if (!isSemiOperator(&pInfo->basic)) {
×
341
          code = pInfo->stateStore.streamStateFlushReaminInfoToDisk(pInfo->basic.pTsDataState);
×
342
          QUERY_CHECK_CODE(code, lino, _end);
×
343
          buildRecalculateDataSnapshort(pInfo, pTaskInfo);
×
344
        }
345
        continue;
×
346
      } break;
347
      case STREAM_RECALCULATE_END: {
×
348
        if (isRecalculateOperator(&pInfo->basic)) {
×
349
          qError("stream recalculate error since recalculate operator receive STREAM_RECALCULATE_END");
×
350
          continue;
×
351
        }
352
        code = deleteRecalculateDataSnapshort(pInfo, pTaskInfo);
×
353
        QUERY_CHECK_CODE(code, lino, _end);
×
354
        continue;
×
355
      } break;
356
      default:
×
357
        break;
×
358
    }
359
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
×
360
    break;
×
361
  }
362

363
_end:
×
364
  printDataBlock((*ppRes), getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
365
  if (code != TSDB_CODE_SUCCESS) {
×
366
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
367
    pTaskInfo->code = code;
×
368
    (*ppRes) = NULL;
×
369
  }
370
  return code;
×
371
}
372

373
#ifdef BUILD_NO_CALL
374
static int32_t buildAndSaveRecalculateData(SSDataBlock* pSrcBlock, TSKEY* pTsCol, SColumnInfoData* pPkColDataInfo, int32_t num,
375
                                    SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, SStateStore* pStateStore,
376
                                    STableTsDataState* pTsDataState, SSDataBlock* pDestBlock) {
377
  int32_t code = TSDB_CODE_SUCCESS;
378
  int32_t lino = 0;
379
  int32_t len = 0;
380
  if (pParSup->needCalc) {
381
    blockDataEnsureCapacity(pDestBlock, num * 2);
382
  } else {
383
    blockDataEnsureCapacity(pDestBlock, num);
384
  }
385

386
  for (int32_t rowId = 0; rowId < num; rowId++) {
387
    len = copyRecDataToBuff(pTsCol[rowId], pTsCol[rowId], pSrcBlock->info.id.uid, pSrcBlock->info.version, STREAM_CLEAR,
388
                            NULL, 0, pTsDataState->pRecValueBuff, pTsDataState->recValueLen);
389
    SSessionKey key = {.win.skey = pTsCol[rowId], .win.ekey = pTsCol[rowId], .groupId = 0};
390
    code = pStateStore->streamState1SessionSaveToDisk(pTsDataState, &key, pTsDataState->pRecValueBuff, len);
391
    QUERY_CHECK_CODE(code, lino, _end);
392
    uint64_t gpId = 0;
393
    code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
394
    QUERY_CHECK_CODE(code, lino, _end);
395

396
    if (pParSup->needCalc) {
397
      key.groupId = calGroupIdByData(pParSup, pPartScalarSup, pSrcBlock, rowId);
398
      len = copyRecDataToBuff(pTsCol[rowId], pTsCol[rowId], pSrcBlock->info.id.uid, pSrcBlock->info.version,
399
                              STREAM_DELETE_DATA, NULL, 0, pTsDataState->pRecValueBuff,
400
                              pTsDataState->recValueLen);
401
      code = pStateStore->streamState1SessionSaveToDisk(pTsDataState, &key, pTsDataState->pRecValueBuff, len);
402
      QUERY_CHECK_CODE(code, lino, _end);
403

404
      code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
405
      QUERY_CHECK_CODE(code, lino, _end);
406
    }
407
  }
408

409
_end:
410
  if (code != TSDB_CODE_SUCCESS) {
411
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
412
  }
413
  return code;
414
}
415
#endif
416

417
static uint64_t getCurDataGroupId(SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, SSDataBlock* pSrcBlock, int32_t rowId) {
×
418
  if (pParSup->needCalc) {
×
419
    return calGroupIdByData(pParSup, pPartScalarSup, pSrcBlock, rowId);
×
420
  }
421

422
  return pSrcBlock->info.id.groupId;
×
423
}
424

425
static uint64_t getDataGroupIdByCol(SSteamOpBasicInfo* pBasic, SOperatorInfo* pTableScanOp,
×
426
                                    SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, uint64_t uid, TSKEY ts,
427
                                    int64_t maxVersion, void* pVal, bool* pRes) {
428
  SSDataBlock* pPreRes = readPreVersionData(pTableScanOp, uid, ts, ts, maxVersion);
×
429
  if (!pPreRes || pPreRes->info.rows == 0) {
×
430
    if (terrno != TSDB_CODE_SUCCESS) {
×
431
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
432
    }
433
    (*pRes) = false;
×
434
    return 0;
×
435
  }
436

437
  int32_t rowId = 0;
×
438
  if (hasSrcPrimaryKeyCol(pBasic)) {
×
439
    SColumnInfoData* pPkCol = taosArrayGet(pPreRes->pDataBlock, pBasic->primaryPkIndex);
×
440
    for (; rowId < pPreRes->info.rows; rowId++) {
×
441
      if (comparePrimaryKey(pPkCol, rowId, pVal)) {
×
442
        break;
×
443
      }
444
    }
445
  }
446
  if (rowId >= pPreRes->info.rows) {
×
447
    qInfo("===stream===read preversion data of primary key failed. ts:%" PRId64 ",version:%" PRId64, ts, maxVersion);
×
448
    (*pRes) = false;
×
449
    return 0;
×
450
  }
451
  (*pRes) = true;
×
452
  return calGroupIdByData(pParSup, pPartScalarSup, pPreRes, rowId);
×
453
}
454

455
static int32_t buildRecalculateData(SSteamOpBasicInfo* pBasic, SOperatorInfo* pTableScanOp,
×
456
                                    SPartitionBySupporter* pParSup, SExprSupp* pPartScalarSup, SSDataBlock* pSrcBlock,
457
                                    TSKEY* pTsCol, SColumnInfoData* pPkColDataInfo, SSDataBlock* pDestBlock,
458
                                    int32_t num) {
459
  int32_t code = TSDB_CODE_SUCCESS;
×
460
  int32_t lino = 0;
×
461
  blockDataEnsureCapacity(pDestBlock, num * 2);
×
462
  for (int32_t rowId = 0; rowId < num; rowId++) {
×
463
    uint64_t gpId = getCurDataGroupId(pParSup, pPartScalarSup, pSrcBlock, rowId);
×
464
    code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
×
465
    QUERY_CHECK_CODE(code, lino, _end);
×
466
    if (pParSup->needCalc) {
×
467
      bool  res = false;
×
468
      void* pVal = NULL;
×
469
      if (hasSrcPrimaryKeyCol(pBasic)) {
×
470
        pVal = colDataGetData(pPkColDataInfo, rowId);
×
471
      }
472
      gpId = getDataGroupIdByCol(pBasic, pTableScanOp, pParSup, pPartScalarSup, pSrcBlock->info.id.uid, pTsCol[rowId],
×
473
                                 pSrcBlock->info.version - 1, pVal, &res);
×
474
      if (res == true) {
×
475
        code = appendPkToSpecialBlock(pDestBlock, pTsCol, pPkColDataInfo, rowId, &pSrcBlock->info.id.uid, &gpId, NULL);
×
476
        QUERY_CHECK_CODE(code, lino, _end);
×
477
      }
478
    }
479
  }
480

481
_end:
×
482
  if (code != TSDB_CODE_SUCCESS) {
×
483
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
484
  }
485
  return code;
×
486
}
487

488
static int32_t doStreamWALScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
489
  int32_t          code = TSDB_CODE_SUCCESS;
×
490
  int32_t          lino = 0;
×
491
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
×
492
  SStreamScanInfo* pInfo = pOperator->info;
×
493
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
×
494
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
×
495
  int32_t          totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
×
496

497
  switch (pInfo->scanMode) {
×
498
    case STREAM_SCAN_FROM_RES: {
×
499
      if (pInfo->pUpdateRes->info.rows > 0) {
×
500
        pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
×
501
      } else {
502
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
×
503
      }
504
      (*ppRes) = pInfo->pRes;
×
505
      goto _end;
×
506
    } break;
507
    case STREAM_SCAN_FROM_UPDATERES: {
×
508
      (*ppRes) = pInfo->pUpdateRes;
×
509
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
×
510
      goto _end;
×
511
    } break;
512
    default:
×
513
      (*ppRes) = NULL;
×
514
      break;
×
515
  }
516

517
  while (1) {
518
    if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) {
×
519
      if (pInfo->validBlockIndex >= totalBlocks) {
×
520
        doClearBufferedBlocks(pInfo);
×
521

522
        qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, GET_TASKID(pTaskInfo));
×
523
        (*ppRes) = NULL;
×
524
        goto _end;
×
525
      }
526

527
      int32_t      current = pInfo->validBlockIndex++;
×
528
      SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
×
529
      QUERY_CHECK_NULL(pSubmit, code, lino, _end, terrno);
×
530

531
      qDebug("set %d/%d as the input submit block, %s", current + 1, totalBlocks, GET_TASKID(pTaskInfo));
×
532
      if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver, NULL) < 0) {
×
533
        qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current,
×
534
               totalBlocks, GET_TASKID(pTaskInfo));
535
        continue;
×
536
      }
537
    }
538

539
    blockDataCleanup(pInfo->pRes);
×
540

541
    while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, GET_TASKID(pTaskInfo))) {
×
542
      SSDataBlock* pRes = NULL;
×
543

544
      code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, GET_TASKID(pTaskInfo));
×
545
      qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows,
×
546
             GET_TASKID(pTaskInfo));
547

548
      if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
×
549
        qDebug("retrieve data failed, try next block in submit block, %s", GET_TASKID(pTaskInfo));
×
550
        continue;
×
551
      }
552

553
      code = setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
×
554
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
×
555
        pInfo->pRes->info.rows = 0;
×
556
        code = TSDB_CODE_SUCCESS;
×
557
      }
558
      QUERY_CHECK_CODE(code, lino, _end);
×
559

560
      if (pInfo->pRes->info.rows == 0) {
×
561
        continue;
×
562
      }
563

564
      setStreamOperatorState(&pInfo->basic, pInfo->pRes->info.type);
×
565
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
×
566
      QUERY_CHECK_CODE(code, lino, _end);
×
567

568
      code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
×
569
      QUERY_CHECK_CODE(code, lino, _end);
×
570

571
      TSKEY   curTs = INT64_MIN;
×
572
      void*   pPkVal = NULL;
×
573
      int32_t winCode = TSDB_CODE_FAILED;
×
574
      code = getMaxTsKeyInfo(pInfo, pInfo->pRes, &curTs, &pPkVal, &winCode);
×
575
      QUERY_CHECK_CODE(code, lino, _end);
×
576
      if (pInfo->pUpdateRes->info.rows > 0) {
×
577
        blockDataCleanup(pInfo->pUpdateRes);
×
578
      }
579

580
      SColumnInfoData* pPkColDataInfo = NULL;
×
581
      if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
×
582
        pPkColDataInfo = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->basic.primaryPkIndex);
×
583
      }
584
      SColumnInfoData* pTsCol = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsIndex);
×
585
      if (winCode == TSDB_CODE_SUCCESS && curTs >= pInfo->pRes->info.window.skey) {
×
586
        int32_t num = 0;
×
587
        if (curTs < pInfo->pRes->info.window.ekey) {
×
588
          num = getForwardStepsInBlock(pInfo->pRes->info.rows, binarySearchForKey, curTs, 0, TSDB_ORDER_ASC,
×
589
                                       (TSKEY*)pTsCol->pData);
×
590
          if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
×
591
            for (; num >= 0; num--) {
×
592
              void* pColPkData = colDataGetData(pPkColDataInfo, num);
×
593
              if (pInfo->comparePkColFn(pColPkData, pPkVal) <= 0) {
×
594
                break;
×
595
              }
596
            }
597
          }
598
        } else {
599
          num = pInfo->pRes->info.rows;
×
600
        }
601

602
        if (num > 0) {
×
603
          qInfo("%s stream scan op ignore disorder data. rows:%d, tableUid:%" PRId64 ", last max ts:%" PRId64
×
604
                ", block start key:%" PRId64 ", end key:%" PRId64,
605
                GET_TASKID(pTaskInfo), num, pInfo->pRes->info.id.uid, curTs, pInfo->pRes->info.window.skey,
606
                pInfo->pRes->info.window.ekey);
607
          code = buildRecalculateData(&pInfo->basic, pInfo->pTableScanOp, &pInfo->partitionSup, pInfo->pPartScalarSup,
×
608
                                      pInfo->pRes, (TSKEY*)pTsCol->pData, pPkColDataInfo, pInfo->pUpdateRes, num);
×
609
          QUERY_CHECK_CODE(code, lino, _end);
×
610
          code = blockDataTrimFirstRows(pInfo->pRes, num);
×
611
          QUERY_CHECK_CODE(code, lino, _end);
×
612
          code = blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
×
613
          QUERY_CHECK_CODE(code, lino, _end);
×
614
        }
615
      }
616

617
      if (pInfo->pCreateTbRes->info.rows > 0) {
×
618
        if (pInfo->pRes->info.rows > 0) {
×
619
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
×
620
        } else if (pInfo->pUpdateRes->info.rows > 0) {
×
621
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
×
622
        }
623
        qDebug("create table res exists, rows:%" PRId64 " return from stream scan, %s", pInfo->pCreateTbRes->info.rows,
×
624
               GET_TASKID(pTaskInfo));
625
        (*ppRes) = pInfo->pCreateTbRes;
×
626
        break;
×
627
      }
628

629
      if (pInfo->pRes->info.rows > 0) {
×
630
        if (pInfo->pUpdateRes->info.rows > 0) {
×
631
          pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
×
632
        }
633
        (*ppRes) = pInfo->pRes;
×
634
        break;
×
635
      }
636

637
      if (pInfo->pUpdateRes->info.rows > 0) {
×
638
        (*ppRes) = pInfo->pUpdateRes;
×
639
        break;
×
640
      }
641
    }
642

643
    if ((*ppRes) != NULL && (*ppRes)->info.rows > 0) {
×
644
      break;
×
645
    }
646
  }
647

648
_end:
×
649
  printDataBlock(*ppRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
650
  if (code != TSDB_CODE_SUCCESS) {
×
651
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
652
    pTaskInfo->code = code;
×
653
    (*ppRes) = NULL;
×
654
  }
655
  return code;
×
656
}
657

658
void streamDataScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
×
659
  int32_t code = TSDB_CODE_SUCCESS;
×
660
  int32_t lino = 0;
×
661

662
  if (needSaveStreamOperatorInfo(&pInfo->basic)) {
×
663
    pInfo->stateStore.streamStateTsDataCommit(pInfo->basic.pTsDataState);
×
664
    saveStreamOperatorStateComplete(&pInfo->basic);
×
665
  }
666

667
_end:
×
668
  if (code != TSDB_CODE_SUCCESS) {
×
669
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
670
  }
671
}
×
672

673
int32_t doStreamDataScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
674
  int32_t          code = TSDB_CODE_SUCCESS;
×
675
  int32_t          lino = 0;
×
676
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
×
677
  const char*      id = GET_TASKID(pTaskInfo);
×
678
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
×
679
  SStreamScanInfo* pInfo = pOperator->info;
×
680
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
×
681

682
  qDebug("stream data scan started, %s", id);
×
683

684
  if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
×
685
      pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
×
686
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
×
687
    memcpy(&pTSInfo->base.cond, &pStreamInfo->tableCond, sizeof(SQueryTableDataCond));
×
688

689
    if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
×
690
      pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
×
691
      pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
×
692

693
      pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
×
694
      qDebug("stream scan step1, verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 ", %s",
×
695
             pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
696
             pTSInfo->base.cond.twindows.ekey, id);
697
      pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1;
×
698
      pStreamInfo->recoverScanFinished = false;
×
699
    } else {
700
      pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
×
701
      pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
×
702
      pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
×
703
      qDebug("stream scan step2 (scan wal), verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
×
704
             pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
705
             pTSInfo->base.cond.twindows.ekey, id);
706
      pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
×
707
    }
708

709
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
×
710

711
    pTSInfo->base.dataReader = NULL;
×
712
    pInfo->pTableScanOp->status = OP_OPENED;
×
713

714
    pTSInfo->scanTimes = 0;
×
715
    pTSInfo->currentGroupId = -1;
×
716
  }
717

718
  if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
×
719
    if (isTaskKilled(pTaskInfo)) {
×
720
      qInfo("===stream===stream scan is killed. task id:%s, code %s", id, tstrerror(pTaskInfo->code));
×
721
      (*ppRes) = NULL;
×
722
      return code;
×
723
    }
724

725
    if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
×
726
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
×
727
      (*ppRes) = pInfo->pRecoverRes;
×
728
      return code;
×
729
    }
730

731
    while (1) {
×
732
      code = doTableScanNext(pInfo->pTableScanOp, &pInfo->pRecoverRes);
×
733
      QUERY_CHECK_CODE(code, lino, _end);
×
734
      if (pInfo->pRecoverRes == NULL) {
×
735
        break;
×
736
      }
737
      setStreamOperatorState(&pInfo->basic, pInfo->pRecoverRes->info.type);
×
738
      code = doFilter(pInfo->pRecoverRes, pOperator->exprSupp.pFilterInfo, NULL);
×
739
      QUERY_CHECK_CODE(code, lino, _end);
×
740

741
      if (pInfo->pRecoverRes->info.rows <= 0) {
×
742
        continue;
×
743
      }
744

745
      TSKEY   curTs = INT64_MIN;
×
746
      void*   pPkVal = NULL;
×
747
      int32_t winCode = TSDB_CODE_FAILED;
×
748
      code = getMaxTsKeyInfo(pInfo, pInfo->pRecoverRes, &curTs, &pPkVal, &winCode);
×
749
      QUERY_CHECK_CODE(code, lino, _end);
×
750

751
      code = calBlockTbName(pInfo, pInfo->pRecoverRes, 0);
×
752
      QUERY_CHECK_CODE(code, lino, _end);
×
753

754
      if (pInfo->pCreateTbRes->info.rows > 0) {
×
755
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
×
756
        printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover",
×
757
                           GET_TASKID(pTaskInfo));
×
758
        (*ppRes) = pInfo->pCreateTbRes;
×
759
        return code;
×
760
      }
761

762
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
×
763
      printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover",
×
764
                         GET_TASKID(pTaskInfo));
×
765
      (*ppRes) = pInfo->pRecoverRes;
×
766
      return code;
×
767
    }
768
    pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
×
769
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
×
770
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
×
771
    pTSInfo->base.dataReader = NULL;
×
772
    pTSInfo->base.cond.startVersion = -1;
×
773
    pTSInfo->base.cond.endVersion = -1;
×
774
    pStreamInfo->recoverScanFinished = true;
×
775
    (*ppRes) = NULL;
×
776
    qDebug("===stream===%s fill history is finished.", GET_TASKID(pTaskInfo));
×
777
    return code;
×
778
  }
779

780
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
×
781

782
  switch (pInfo->blockType) {
×
783
    case STREAM_INPUT__DATA_BLOCK: {
×
784
      doStreamBlockScan(pOperator, ppRes);
×
785
    } break;
×
786
    case STREAM_INPUT__DATA_SUBMIT: {
×
787
      doStreamWALScan(pOperator, ppRes);
×
788
    } break;
×
789
    case STREAM_INPUT__CHECKPOINT:
×
790
    case STREAM_INPUT__RECALCULATE: {
791
      if (pInfo->validBlockIndex >= total) {
×
792
        doClearBufferedBlocks(pInfo);
×
793
        (*ppRes) = NULL;
×
794
        return code;
×
795
      }
796

797
      int32_t current = pInfo->validBlockIndex++;
×
798
      qDebug("process %d/%d input data blocks, %s", current, (int32_t)total, id);
×
799

800
      SPackedData* pData = taosArrayGet(pInfo->pBlockLists, current);
×
801
      QUERY_CHECK_NULL(pData, code, lino, _end, terrno);
×
802
      SSDataBlock* pBlock = taosArrayGet(pData->pDataBlock, 0);
×
803
      QUERY_CHECK_NULL(pBlock, code, lino, _end, terrno);
×
804
      printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
×
805

806
      if (pBlock->info.type == STREAM_CHECKPOINT) {
×
807
        code = pInfo->stateStore.streamStateFlushReaminInfoToDisk(pInfo->basic.pTsDataState);
×
808
        QUERY_CHECK_CODE(code, lino, _end);
×
809
        streamDataScanOperatorSaveCheckpoint(pInfo);
×
810
        (*ppRes) = pInfo->pCheckpointRes;
×
811
      } else if (pBlock->info.type == STREAM_RECALCULATE_START) {
×
812
        if (!isSemiOperator(&pInfo->basic)) {
×
813
          code = pInfo->stateStore.streamStateFlushReaminInfoToDisk(pInfo->basic.pTsDataState);
×
814
          QUERY_CHECK_CODE(code, lino, _end);
×
815
          buildRecalculateDataSnapshort(pInfo, pTaskInfo);
×
816
        }
817
      } else if (pBlock->info.type == STREAM_RECALCULATE_END) {
×
818
        if (isRecalculateOperator(&pInfo->basic)) {
×
819
          qError("stream recalculate error since recalculate operator receive STREAM_RECALCULATE_END");
×
820
        } else {
821
          code = deleteRecalculateDataSnapshort(pInfo, pTaskInfo);
×
822
          QUERY_CHECK_CODE(code, lino, _end);
×
823
        }
824
      }
825
      (*ppRes) = pBlock;
×
826
      return code;
×
827
    } break;
828
    default: {
×
829
      qError("stream scan error, invalid block type %d, %s", pInfo->blockType, id);
×
830
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
831
    } break;
×
832
  }
833

834
_end:
×
835
  if (code != TSDB_CODE_SUCCESS) {
×
836
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
837
    pTaskInfo->code = code;
×
838
    (*ppRes) = NULL;
×
839
  }
840
  return code;
×
841
}
842

843
void streamDataScanReleaseState(SOperatorInfo* pOperator) {
×
844
  SStreamScanInfo* pInfo = pOperator->info;
×
845
  pInfo->stateStore.streamStateTsDataCommit(pInfo->basic.pTsDataState);
×
846
}
×
847

848
void streamDataScanReloadState(SOperatorInfo* pOperator) {
×
849
  int32_t          code = TSDB_CODE_SUCCESS;
×
850
  int32_t          lino = 0;
×
851
  SStreamScanInfo* pInfo = pOperator->info;
×
852
  code = pInfo->stateStore.streamStateReloadTsDataState(pInfo->basic.pTsDataState);
×
853
  QUERY_CHECK_CODE(code, lino, _end);
×
854

855
_end:
×
856
  if (code != TSDB_CODE_SUCCESS) {
×
857
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
858
  }
859
}
×
860

861
static uint64_t getDataGroupId(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal,
×
862
                               bool* pRes) {
863
  if (pInfo->partitionSup.needCalc) {
×
864
    return getDataGroupIdByCol(&pInfo->basic, pInfo->pTableScanOp, &pInfo->partitionSup, pInfo->pPartScalarSup, uid, ts,
×
865
                               maxVersion, pVal, pRes);
866
  }
867

868
  *pRes = true;
×
869
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
×
870
  return tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
×
871
}
872

873
static int32_t generateSessionDataScanRange(SStreamScanInfo* pInfo, SSHashObj* pRecRangeMap, SArray* pSrcRange) {
×
874
  int32_t code = TSDB_CODE_SUCCESS;
×
875
  int32_t lino = 0;
×
876

877
  int32_t size =  taosArrayGetSize(pSrcRange);
×
878
  for (int32_t i = 0; i < size; i++) {
×
879
    SArray* pRange = taosArrayGetP(pSrcRange, i);
×
880
    uint64_t      groupId = *(uint64_t*) taosArrayGet(pRange, 2);
×
881
    STimeWindow   resWin = {0};
×
882
    resWin.skey = *(TSKEY*) taosArrayGet(pRange, 3);
×
883
    resWin.ekey = *(TSKEY*) taosArrayGet(pRange, 4);
×
884

885
    SSessionKey key = {.groupId = groupId};
×
886
    key.win.skey = *(TSKEY*) taosArrayGet(pRange, 0);
×
887
    key.win.ekey = *(TSKEY*) taosArrayGet(pRange, 1);
×
888
    void* pVal = tSimpleHashGet(pRecRangeMap, &key, sizeof(SSessionKey));
×
889
    QUERY_CHECK_NULL(pVal, code, lino, _end, TSDB_CODE_FAILED);
×
890
    SRecDataInfo* pRecData = *(void**)pVal;
×
891

892
    code = pInfo->stateStore.streamStateMergeAndSaveScanRange(pInfo->basic.pTsDataState, &resWin, groupId, pRecData,
×
893
                                                              pInfo->basic.pTsDataState->recValueLen);
×
894
    QUERY_CHECK_CODE(code, lino, _end);
×
895
    taosArrayDestroy(pRange);
×
896
  }
897
  taosArrayClear(pSrcRange);
×
898

899
_end:
×
900
  if (code != TSDB_CODE_SUCCESS) {
×
901
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
902
  }
903
  return code;
×
904
}
905

906
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr) {
×
907
  int32_t code = TSDB_CODE_SUCCESS;
×
908
  int32_t lino = 0;
×
909

910
  SSessionKey      firstKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
×
911
  SStreamStateCur* pCur =
912
      pInfo->stateStore.streamStateSessionSeekKeyCurrentNext(pInfo->basic.pTsDataState->pStreamTaskState, &firstKey);
×
913
  while (1) {
×
914
    SSessionKey rangKey = {0};
×
915
    void*       pVal = NULL;
×
916
    int32_t     len = 0;
×
917
    int32_t     winRes = pInfo->stateStore.streamStateSessionGetKVByCur(pCur, &rangKey, &pVal, &len);
×
918
    if (winRes != TSDB_CODE_SUCCESS) {
×
919
      break;
×
920
    }
921
    qDebug("===stream===%s get range from disk. start ts:%" PRId64 ",end ts:%" PRId64 ", group id:%" PRIu64, pTaskIdStr,
×
922
           rangKey.win.skey, rangKey.win.ekey, rangKey.groupId);
923
    code = tSimpleHashPut(pInfo->pRecRangeMap, &rangKey, sizeof(SSessionKey), &pVal, POINTER_BYTES);
×
924
    QUERY_CHECK_CODE(code, lino, _end);
×
925

926
    if (tSimpleHashGetSize(pInfo->pRecRangeMap) > 1024) {
×
927
      code = streamClientGetResultRange(&pInfo->recParam, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
×
928
      QUERY_CHECK_CODE(code, lino, _end);
×
929
      code = generateSessionDataScanRange(pInfo, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
×
930
      QUERY_CHECK_CODE(code, lino, _end);
×
931
      tSimpleHashClear(pInfo->pRecRangeMap);
×
932
    }
933
    pInfo->stateStore.streamStateCurNext(pInfo->basic.pTsDataState->pStreamTaskState, pCur);
×
934
  }
935
  pInfo->stateStore.streamStateFreeCur(pCur);
×
936

937
  if (tSimpleHashGetSize(pInfo->pRecRangeMap) > 0) {
×
938
    code = streamClientGetResultRange(&pInfo->recParam, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
×
939
    QUERY_CHECK_CODE(code, lino, _end);
×
940
    code = generateSessionDataScanRange(pInfo, pInfo->pRecRangeMap, pInfo->pRecRangeRes);
×
941
    QUERY_CHECK_CODE(code, lino, _end);
×
942
    tSimpleHashClear(pInfo->pRecRangeMap);
×
943
  }
944

945
_end:
×
946
  if (code != TSDB_CODE_SUCCESS) {
×
947
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
948
  }
949
  return code;
×
950
}
951

952
static int32_t generateIntervalDataScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr, SSessionKey* pSeKey,
×
953
                                             SRecDataInfo* pRecData, int32_t len) {
954
  int32_t        code = TSDB_CODE_SUCCESS;
×
955
  int32_t        lino = 0;
×
956
  int32_t        rowId = 0;
×
957
  SDataBlockInfo tmpInfo = {0};
×
958
  tmpInfo.rows = 1;
×
959
  STimeWindow win = getSlidingWindow(&pSeKey->win.skey, &pSeKey->win.ekey, &pSeKey->groupId, &pInfo->interval, &tmpInfo,
×
960
                                     &rowId, pInfo->partitionSup.needCalc);
×
961
  pInfo->stateStore.streamStateMergeAndSaveScanRange(pInfo->basic.pTsDataState, &win, pSeKey->groupId, pRecData, len);
×
962

963
_end:
×
964
  if (code != TSDB_CODE_SUCCESS) {
×
965
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
966
  }
967
  return code;
×
968
}
969

970
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr) {
×
971
  int32_t code = TSDB_CODE_SUCCESS;
×
972
  int32_t lino = 0;
×
973

974
  SSessionKey      firstKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
×
975
  SStreamStateCur* pCur =
976
      pInfo->stateStore.streamStateSessionSeekKeyCurrentNext(pInfo->basic.pTsDataState->pStreamTaskState, &firstKey);
×
977
  while (1) {
×
978
    SSessionKey rangKey = {0};
×
979
    void*       pVal = NULL;
×
980
    int32_t     len = 0;
×
981
    int32_t     winRes = pInfo->stateStore.streamStateSessionGetKVByCur(pCur, &rangKey, &pVal, &len);
×
982
    if (winRes != TSDB_CODE_SUCCESS) {
×
983
      break;
×
984
    }
985
    qDebug("===stream===%s get range from disk. start ts:%" PRId64 ",end ts:%" PRId64 ", group id:%" PRIu64,
×
986
           pTaskIdStr, rangKey.win.skey, rangKey.win.ekey, rangKey.groupId);
987
    code = generateIntervalDataScanRange(pInfo, pTaskIdStr, &rangKey, (SRecDataInfo*)pVal, len);
×
988
    QUERY_CHECK_CODE(code, lino, _end);
×
989
    taosMemFreeClear(pVal);
×
990
    pInfo->stateStore.streamStateCurNext(pInfo->basic.pTsDataState->pStreamTaskState, pCur);
×
991
  }
992
  pInfo->stateStore.streamStateFreeCur(pCur);
×
993

994
_end:
×
995
  if (code != TSDB_CODE_SUCCESS) {
×
996
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
997
  }
998
  return code;
×
999
}
1000

1001
static int32_t generateDataScanRange(SStreamScanInfo* pInfo, char* pTaskIdStr) {
×
1002
  int32_t code = TSDB_CODE_SUCCESS;
×
1003
  int32_t lino = 0;
×
1004
  switch (pInfo->windowSup.parentType) {
×
1005
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL:
×
1006
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_INTERVAL:
1007
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_INTERVAL: {
1008
      code = generateIntervalScanRange(pInfo, pTaskIdStr);
×
1009
      QUERY_CHECK_CODE(code, lino, _end);
×
1010
    } break;
×
1011
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SESSION:
×
1012
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_SESSION:
1013
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_SESSION:
1014
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_STATE:
1015
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_EVENT: {
1016
      code = generateSessionScanRange(pInfo, pTaskIdStr);
×
1017
      QUERY_CHECK_CODE(code, lino, _end);
×
1018
    } break;
×
1019
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_COUNT: {
×
1020

1021
    } break;
×
1022
    default:
×
1023
      break;
×
1024
  }
1025

1026
_end:
×
1027
  if (code != TSDB_CODE_SUCCESS) {
×
1028
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1029
  }
1030
  return code;
×
1031
}
1032

1033
static int32_t doOneRangeScan(SStreamScanInfo* pInfo, SScanRange* pRange, SSDataBlock** ppRes) {
×
1034
  qDebug("do stream recalculate scan.");
×
1035

1036
  int32_t code = TSDB_CODE_SUCCESS;
×
1037
  int32_t lino = 0;
×
1038

1039
  SOperatorInfo* pScanOp = NULL;
×
1040
  if (pInfo->scanAllTables) {
×
1041
    pScanOp = pInfo->pTableScanOp;
×
1042
  } else {
1043
    pScanOp = pInfo->pRecTableScanOp;
×
1044
  }
1045

1046
  while (1) {
×
1047
    SSDataBlock* pResult = NULL;
×
1048
    code = doTableScanNext(pScanOp, &pResult);
×
1049
    QUERY_CHECK_CODE(code, lino, _end);
×
1050

1051
    STableScanInfo* pTableScanInfo = pScanOp->info;
×
1052
    if (pResult == NULL) {
×
1053
      pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
×
1054
      pTableScanInfo->base.dataReader = NULL;
×
1055
      (*ppRes) = NULL;
×
1056
      goto _end;
×
1057
    }
1058

1059
    code = doFilter(pResult, pScanOp->exprSupp.pFilterInfo, NULL);
×
1060
    QUERY_CHECK_CODE(code, lino, _end);
×
1061
    if (pResult->info.rows == 0) {
×
1062
      continue;
×
1063
    }
1064

1065
    printDataBlock(pResult, "tsdb", GET_TASKID(pScanOp->pTaskInfo));
×
1066
    if (!pInfo->assignBlockUid) {
×
1067
      pResult->info.id.groupId = 0;
×
1068
    }
1069

1070
    if (pInfo->partitionSup.needCalc) {
×
1071
      SSDataBlock* tmpBlock = NULL;
×
1072
      code = createOneDataBlock(pResult, true, &tmpBlock);
×
1073
      QUERY_CHECK_CODE(code, lino, _end);
×
1074

1075
      blockDataCleanup(pResult);
×
1076
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
×
1077
        uint64_t dataGroupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i);
×
1078
        if (tSimpleHashGet(pRange->pGroupIds, &dataGroupId, sizeof(uint64_t)) != NULL) {
×
1079
          for (int32_t j = 0; j < pScanOp->exprSupp.numOfExprs; j++) {
×
1080
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
×
1081
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
×
1082
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
×
1083
            char*            pSrcData = NULL;
×
1084
            if (!isNull) pSrcData = colDataGetData(pSrcCol, i);
×
1085
            code = colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
×
1086
            QUERY_CHECK_CODE(code, lino, _end);
×
1087
          }
1088
          pResult->info.rows++;
×
1089
        }
1090
      }
1091

1092
      blockDataDestroy(tmpBlock);
×
1093

1094
      if (pResult->info.rows > 0) {
×
1095
        pResult->info.calWin = pRange->calWin;
×
1096
        (*ppRes) = pResult;
×
1097
        goto _end;
×
1098
      }
1099
    } else {
1100
      if (tSimpleHashGet(pRange->pGroupIds, &pResult->info.id.groupId, sizeof(uint64_t)) != NULL) {
×
1101
        pResult->info.calWin = pRange->calWin;
×
1102
        (*ppRes) = pResult;
×
1103
        goto _end;
×
1104
      }
1105
    }
1106
  }
1107

1108
_end:
×
1109
  if (code != TSDB_CODE_SUCCESS) {
×
1110
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1111
  }
1112
  return code;
×
1113
}
1114

1115
// static void exchangeTableListInfo(SStreamScanInfo* pInfo, SOperatorInfo* pScanOp) {
1116
//   STableScanInfo* pScanInfo = (STableScanInfo*)pScanOp->info;
1117
//   STableListInfo* pTemp = pScanInfo->base.pTableListInfo;
1118
//   pScanInfo->base.pTableListInfo = pInfo->pRecTableListInfo;
1119
//   pInfo->pRecTableListInfo = pTemp;
1120
// }
1121

1122
static int32_t prepareDataRangeScan(SStreamScanInfo* pInfo, SScanRange* pRange) {
×
1123
  int32_t                  code = TSDB_CODE_SUCCESS;
×
1124
  int32_t                  lino = 0;
×
1125
  SOperatorParam*          pOpParam = NULL;
×
1126
  STableScanOperatorParam* pTableScanParam = NULL;
×
1127

1128
  qDebug("prepare data range scan start:%" PRId64 ",end:%" PRId64 ",is all table:%d", pRange->win.skey,
×
1129
         pRange->win.ekey, pInfo->scanAllTables);
1130
  SOperatorInfo* pScanOp = NULL;
×
1131
  if (pInfo->scanAllTables) {
×
1132
    pScanOp = pInfo->pTableScanOp;
×
1133
  } else {
1134
    pScanOp = pInfo->pRecTableScanOp;
×
1135
  }
1136

1137
  resetTableScanInfo(pScanOp->info, &pRange->win, -1);
×
1138
  pScanOp->status = OP_OPENED;
×
1139

1140
  if (pInfo->scanAllTables == true) {
×
1141
    goto _end;
×
1142
  }
1143

1144
  pOpParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
×
1145
  QUERY_CHECK_NULL(pOpParam, code, lino, _end, terrno);
×
1146
  pOpParam->downstreamIdx = 0;
×
1147
  pOpParam->opType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
×
1148
  pOpParam->pChildren = NULL;
×
1149

1150
  pTableScanParam = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
×
1151
  QUERY_CHECK_NULL(pTableScanParam, code, lino, _end, terrno);
×
1152
  pOpParam->value = pTableScanParam;
×
1153
  pTableScanParam->tableSeq = false;
×
1154
  int32_t size = tSimpleHashGetSize(pRange->pUIds);
×
1155
  pTableScanParam->pUidList = taosArrayInit(size, sizeof(uint64_t));
×
1156
  QUERY_CHECK_NULL(pTableScanParam->pUidList, code, lino, _end, terrno);
×
1157

1158
  void*   pIte = NULL;
×
1159
  int32_t iter = 0;
×
1160
  while ((pIte = tSimpleHashIterate(pRange->pUIds, pIte, &iter)) != NULL) {
×
1161
    void* pTempUid = tSimpleHashGetKey(pIte, NULL);
×
1162
    QUERY_CHECK_NULL(pTempUid, code, lino, _end, terrno);
×
1163
    void* pTemRes = taosArrayPush(pTableScanParam->pUidList, pTempUid);
×
1164
    QUERY_CHECK_NULL(pTemRes, code, lino, _end, terrno);
×
1165
    qDebug("prepare data range add table uid:%" PRIu64, *(uint64_t*)pTempUid);
×
1166
  }
1167
  pScanOp->pOperatorGetParam = pOpParam;
×
1168

1169
_end:
×
1170
  if (code != TSDB_CODE_SUCCESS) {
×
1171
    taosMemoryFree(pOpParam);
×
1172
    if (pTableScanParam != NULL) {
×
1173
      if (pTableScanParam->pUidList != NULL) {
×
1174
        taosArrayDestroy(pTableScanParam->pUidList);
×
1175
      }
1176
      taosMemoryFree(pTableScanParam);
×
1177
    }
1178
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1179
  }
1180
  return code;
×
1181
}
1182

1183
static void destroyScanRange(SScanRange* pRange) {
×
1184
  pRange->win.skey = INT64_MIN;
×
1185
  pRange->win.ekey = INT64_MIN;
×
1186
  tSimpleHashCleanup(pRange->pUIds);
×
1187
  pRange->pUIds = NULL;
×
1188
  tSimpleHashCleanup(pRange->pGroupIds);
×
1189
  pRange->pGroupIds = NULL;
×
1190
}
×
1191

1192
static int32_t buildRecBlockByRange(SScanRange* pRange, SSDataBlock* pRes) {
×
1193
  int32_t code = TSDB_CODE_SUCCESS;
×
1194
  int32_t lino = 0;
×
1195
  int32_t size = tSimpleHashGetSize(pRange->pGroupIds);
×
1196
  code = blockDataEnsureCapacity(pRes, size);
×
1197
  QUERY_CHECK_CODE(code, lino, _end);
×
1198

1199
  void*   pIte = NULL;
×
1200
  int32_t iter = 0;
×
1201
  while ((pIte = tSimpleHashIterate(pRange->pGroupIds, pIte, &iter)) != NULL) {
×
1202
    uint64_t* pGroupId = (uint64_t*)tSimpleHashGetKey(pIte, NULL);
×
1203
    code = appendOneRowToSpecialBlockImpl(pRes, &pRange->win.skey, &pRange->win.ekey, &pRange->calWin.skey,
×
1204
                                          &pRange->calWin.ekey, NULL, pGroupId, NULL, NULL);
1205
    QUERY_CHECK_CODE(code, lino, _end);
×
1206
  }
1207
  pRes->info.type = STREAM_RECALCULATE_DATA;
×
1208

1209
_end:
×
1210
  if (code != TSDB_CODE_SUCCESS) {
×
1211
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1212
  }
1213
  return code;
×
1214
}
1215

1216
static int32_t doDataRangeScan(SStreamScanInfo* pInfo, SExecTaskInfo* pTaskInfo, SSDataBlock** ppRes) {
×
1217
  int32_t      code = TSDB_CODE_SUCCESS;
×
1218
  int32_t      lino = 0;
×
1219
  SSDataBlock* pTsdbBlock = NULL;
×
1220
  pInfo->pRecoverRes = NULL;
×
1221
  while (1) {
1222
    if (IS_INVALID_RANGE(pInfo->curRange)) {
×
1223
      code = pInfo->stateStore.streamStateMergeAllScanRange(pInfo->basic.pTsDataState);
×
1224
      QUERY_CHECK_CODE(code, lino, _end);
×
1225

1226
      code = pInfo->stateStore.streamStatePopScanRange(pInfo->basic.pTsDataState, &pInfo->curRange);
×
1227
      QUERY_CHECK_CODE(code, lino, _end);
×
1228
      if (IS_INVALID_RANGE(pInfo->curRange)) {
×
1229
        break;
×
1230
      }
1231
      code = prepareDataRangeScan(pInfo, &pInfo->curRange);
×
1232
      QUERY_CHECK_CODE(code, lino, _end);
×
1233

1234
      blockDataCleanup(pInfo->pUpdateRes);
×
1235
      code = buildRecBlockByRange(&pInfo->curRange, pInfo->pUpdateRes);
×
1236
      QUERY_CHECK_CODE(code, lino, _end);
×
1237
      if (pInfo->pUpdateRes->info.rows > 0) {
×
1238
        (*ppRes) = pInfo->pUpdateRes;
×
1239
        break;
×
1240
      }
1241
    }
1242

1243
    code = doOneRangeScan(pInfo, &pInfo->curRange, &pTsdbBlock);
×
1244
    QUERY_CHECK_CODE(code, lino, _end);
×
1245

1246
    if (pTsdbBlock != NULL) {
×
1247
      pInfo->pRangeScanRes = pTsdbBlock;
×
1248
      code = calBlockTbName(pInfo, pTsdbBlock, 0);
×
1249
      QUERY_CHECK_CODE(code, lino, _end);
×
1250

1251
      if (pInfo->pCreateTbRes->info.rows > 0) {
×
1252
        (*ppRes) = pInfo->pCreateTbRes;
×
1253
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
×
1254
        break;
×
1255
      }
1256
      (*ppRes) = pTsdbBlock;
×
1257
      break;
×
1258
    } else {
1259
      destroyScanRange(&pInfo->curRange);
×
1260
    }
1261
  }
1262

1263
_end:
×
1264
  printDataBlock((*ppRes), "stream tsdb scan", GET_TASKID(pTaskInfo));
×
1265
  if (code != TSDB_CODE_SUCCESS) {
×
1266
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1267
  }
1268
  return code;
×
1269
}
1270

1271
static int32_t buildStreamRecalculateBlock(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
1272
  int32_t          code = TSDB_CODE_SUCCESS;
×
1273
  int32_t          lino = 0;
×
1274
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
×
1275
  SStreamScanInfo* pInfo = pOperator->info;
×
1276

1277
  if (pInfo->basic.pTsDataState->curRecId == -1) {
×
1278
    int32_t recId = 0;
×
1279
    code = getRecalculateId(&pInfo->stateStore, pInfo->basic.pTsDataState->pStreamTaskState, &recId);
×
1280
    QUERY_CHECK_CODE(code, lino, _end);
×
1281

1282
    qDebug("===stream===%s do recalculate.recId:%d", GET_TASKID(pTaskInfo), recId);
×
1283
    pInfo->basic.pTsDataState->curRecId = recId;
×
1284
    pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pStreamTaskState, recId, pInfo->primaryTsIndex);
×
1285

1286
    SSessionKey      firstKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
×
1287
    pInfo->basic.pTsDataState->pRecCur =
×
1288
        pInfo->stateStore.streamStateSessionSeekKeyCurrentNext(pInfo->basic.pTsDataState->pStreamTaskState, &firstKey);
×
1289
  }
1290

1291
  blockDataCleanup(pInfo->pUpdateRes);
×
1292
  code = blockDataEnsureCapacity(pInfo->pUpdateRes, 1024);
×
1293
  QUERY_CHECK_CODE(code, lino, _end);
×
1294

1295
  while (1) {
×
1296
    SSessionKey rangKey = {0};
×
1297
    void*       pVal = NULL;
×
1298
    int32_t     len = 0;
×
1299
    int32_t     winRes = pInfo->stateStore.streamStateSessionGetKVByCur(pInfo->basic.pTsDataState->pRecCur, &rangKey, &pVal, &len);
×
1300
    if (winRes != TSDB_CODE_SUCCESS) {
×
1301
      break;
×
1302
    }
1303
    SRecDataInfo* pRecData = (SRecDataInfo*)pVal;
×
1304
    if (pInfo->pUpdateRes->info.rows == 0) {
×
1305
      pInfo->pUpdateRes->info.type = pRecData->mode;
×
1306
    } else if (pInfo->pUpdateRes->info.type != pRecData->mode || pInfo->pUpdateRes->info.rows ==  pInfo->pUpdateRes->info.capacity) {
×
1307
      break;
1308
    }
1309

1310
    code = appendOneRowToSpecialBlockImpl(pInfo->pUpdateRes, &rangKey.win.skey, &rangKey.win.ekey, &pRecData->calWin.skey,
×
1311
                                          &pRecData->calWin.ekey, &pRecData->tableUid ,&rangKey.groupId, NULL, (void*)pRecData->pPkColData);
×
1312
    QUERY_CHECK_CODE(code, lino, _end);
×
1313
    pInfo->stateStore.streamStateCurNext(pInfo->basic.pTsDataState->pStreamTaskState, pInfo->basic.pTsDataState->pRecCur);
×
1314
  }
1315

1316
  if (pInfo->pUpdateRes->info.rows > 0) {
×
1317
    (*ppRes) = pInfo->pUpdateRes;
×
1318
  } else {
1319
    (*ppRes) = NULL;
×
1320
    pInfo->stateStore.streamStateFreeCur(pInfo->basic.pTsDataState->pRecCur);
×
1321
    pInfo->basic.pTsDataState->pRecCur = NULL;
×
1322
    pInfo->basic.pTsDataState->curRecId = -1;
×
1323
  }
1324

1325
_end:
×
1326
  if (code != TSDB_CODE_SUCCESS) {
×
1327
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1328
    pTaskInfo->code = code;
×
1329
  }
1330
  return code;
×
1331
}
1332

1333
static int32_t doStreamRecalculateDataScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
1334
  int32_t          code = TSDB_CODE_SUCCESS;
×
1335
  int32_t          lino = 0;
×
1336
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
×
1337
  SStreamScanInfo* pInfo = pOperator->info;
×
1338

1339
  switch (pInfo->scanMode) {
×
1340
    case STREAM_SCAN_FROM_RES: {
×
1341
      (*ppRes) = pInfo->pRangeScanRes;
×
1342
      pInfo->pRangeScanRes = NULL;
×
1343
      pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
×
1344
      printDataBlock((*ppRes), "stream tsdb scan", GET_TASKID(pTaskInfo));
×
1345
      goto _end;
×
1346
    } break;
1347
    case STREAM_SCAN_FROM_CREATE_TABLERES: {
×
1348
      (*ppRes) = pInfo->pCreateTbRes;
×
1349
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
×
1350
    }
1351
    default:
×
1352
      (*ppRes) = NULL;
×
1353
      break;
×
1354
  }
1355

1356
  if (pInfo->basic.pTsDataState->curRecId == -1) {
×
1357
    int32_t recId = 0;
×
1358
    code = getRecalculateId(&pInfo->stateStore, pInfo->basic.pTsDataState->pStreamTaskState, &recId);
×
1359
    QUERY_CHECK_CODE(code, lino, _end);
×
1360
    qDebug("===stream===%s do recalculate.recId:%d", GET_TASKID(pTaskInfo), recId);
×
1361
    pInfo->stateStore.streamStateSetNumber(pInfo->basic.pTsDataState->pStreamTaskState, recId, pInfo->primaryTsIndex);
×
1362
    code = generateDataScanRange(pInfo, GET_TASKID(pTaskInfo));
×
1363
    QUERY_CHECK_CODE(code, lino, _end);
×
1364
    pInfo->basic.pTsDataState->curRecId = recId;
×
1365
  }
1366

1367
  code = doDataRangeScan(pInfo, pTaskInfo, ppRes);
×
1368
  QUERY_CHECK_CODE(code, lino, _end);
×
1369

1370
  if ((*ppRes) == NULL) {
×
1371
    pInfo->stateStore.streamStateSessionDeleteAll(pInfo->basic.pTsDataState->pState);
×
1372
    pInfo->basic.pTsDataState->curRecId = -1;
×
1373
    pTaskInfo->streamInfo.recoverScanFinished = true;
×
1374
    qInfo("===stream===%s recalculate is finished.", GET_TASKID(pTaskInfo));
×
1375
  }
1376

1377
_end:
×
1378
  if (code != TSDB_CODE_SUCCESS) {
×
1379
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1380
    pTaskInfo->code = code;
×
1381
  }
1382
  return code;
×
1383
}
1384

1385
static int32_t doStreamRecalculateBlockScan(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
1386
  int32_t          code = TSDB_CODE_SUCCESS;
×
1387
  int32_t          lino = 0;
×
1388
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
×
1389
  SStreamScanInfo* pInfo = pOperator->info;
×
1390
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
×
1391

1392
  qDebug("===stream===%s doStreamRecalculateBlockScan", GET_TASKID(pTaskInfo));
×
1393

1394
  if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
×
1395
    if (pInfo->pRangeScanRes != NULL) {
×
1396
      (*ppRes) = pInfo->pRangeScanRes;
×
1397
      pInfo->pRangeScanRes = NULL;
×
1398
      goto _end;
×
1399
    }
1400
    SSDataBlock* pSDB = NULL;
×
1401
    code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB);
×
1402
    QUERY_CHECK_CODE(code, lino, _end);
×
1403
    if (pSDB) {
×
1404
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
×
1405
      pSDB->info.type = STREAM_PULL_DATA;
×
1406
      
1407
      printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
×
1408
      code = calBlockTbName(pInfo, pSDB, 0);
×
1409
      QUERY_CHECK_CODE(code, lino, _end);
×
1410

1411
      if (pInfo->pCreateTbRes->info.rows > 0) {
×
1412
        printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update",
×
1413
                           GET_TASKID(pTaskInfo));
×
1414
        (*ppRes) = pInfo->pCreateTbRes;
×
1415
        pInfo->pRangeScanRes = pSDB;
×
1416
        goto _end;
×
1417
      }
1418

1419
      (*ppRes) = pSDB;
×
1420
      goto _end;
×
1421
    }
1422
    blockDataCleanup(pInfo->pUpdateRes);
×
1423
    pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
×
1424
    pStreamInfo->recoverScanFinished = true;
×
1425
  }
1426

1427
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
×
1428
  while (1) {
1429
    if (pInfo->validBlockIndex >= total) {
×
1430
      doClearBufferedBlocks(pInfo);
×
1431
      (*ppRes) = NULL;
×
1432
      break;
×
1433
    }
1434
    int32_t current = pInfo->validBlockIndex++;
×
1435
    qDebug("process %d/%d recalculate input data blocks, %s", current, (int32_t)total, GET_TASKID(pTaskInfo));
×
1436

1437
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
×
1438
    QUERY_CHECK_NULL(pPacked, code, lino, _end, terrno);
×
1439

1440
    SSDataBlock* pBlock = pPacked->pDataBlock;
×
1441
    pBlock->info.calWin.skey = INT64_MIN;
×
1442
    pBlock->info.calWin.ekey = INT64_MAX;
×
1443
    pBlock->info.dataLoad = 1;
×
1444

1445
    code = blockDataUpdateTsWindow(pBlock, 0);
×
1446
    QUERY_CHECK_CODE(code, lino, _end);
×
1447
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "rec recv", GET_TASKID(pTaskInfo));
×
1448
    switch (pBlock->info.type) {
×
1449
      case STREAM_RETRIEVE: {
×
1450
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
×
1451
        code = copyDataBlock(pInfo->pUpdateRes, pBlock);
×
1452
        QUERY_CHECK_CODE(code, lino, _end);
×
1453
        pInfo->updateResIndex = 0;
×
1454
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
×
1455
        (*ppRes) = pInfo->pUpdateRes;
×
1456
        goto _end;
×
1457
      } break;
1458
      default: {
×
1459
        (*ppRes) = pBlock;
×
1460
        goto _end;
×
1461
      } break;
1462
    }
1463
  }
1464

1465
_end:
×
1466
  if (code != TSDB_CODE_SUCCESS) {
×
1467
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1468
    pTaskInfo->code = code;
×
1469
  }
1470
  return code;
×
1471
}
1472

1473
int32_t doStreamRecalculateScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
1474
  int32_t          code = TSDB_CODE_SUCCESS;
×
1475
  int32_t          lino = 0;
×
1476
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
×
1477
  char*            pTaskIdStr = GET_TASKID(pTaskInfo);
×
1478
  SStreamScanInfo* pInfo = pOperator->info;
×
1479

1480
  qDebug("stream recalculate scan started, %s", pTaskIdStr);
×
1481

1482
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
×
1483
  switch (pInfo->blockType) {
×
1484
    case STREAM_INPUT__DATA_BLOCK: {
×
1485
      code = doStreamRecalculateBlockScan(pOperator, ppRes);
×
1486
      QUERY_CHECK_CODE(code, lino, _end);
×
1487
    } break;
×
1488
    case STREAM_INPUT__CHECKPOINT: {
×
1489
      doClearBufferedBlocks(pInfo);
×
1490
      (*ppRes) = NULL;
×
1491
      qDebug("===stream===%s process input data blocks,size:%d", pTaskIdStr, (int32_t)total);
×
1492
    } break;
×
1493
    case STREAM_INPUT__RECALCULATE:
×
1494
    default: {
1495
      doClearBufferedBlocks(pInfo);
×
1496
      if(isFinalOperator(&pInfo->basic)) {
×
1497
        code = buildStreamRecalculateBlock(pOperator, ppRes);
×
1498
        QUERY_CHECK_CODE(code, lino, _end);
×
1499
      } else if (isSingleOperator(&pInfo->basic)) {
×
1500
        code = doStreamRecalculateDataScan(pOperator, ppRes);
×
1501
        QUERY_CHECK_CODE(code, lino, _end);
×
1502
      } else {
1503
        qDebug("===stream===%s return empty block", pTaskIdStr);
×
1504
        (*ppRes) = NULL;
×
1505
      }
1506
    } break;
×
1507
  }
1508

1509
_end:
×
1510
  if (code != TSDB_CODE_SUCCESS) {
×
1511
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1512
    pTaskInfo->code = code;
×
1513
  }
1514
  return code;
×
1515
}
1516

1517
static void destroyStreamRecalculateParam(SStreamRecParam* pParam) {
×
1518
  tSimpleHashCleanup(pParam->pColIdMap);
×
1519
  pParam->pColIdMap = NULL;
×
1520
}
×
1521

1522
static void destroyStreamDataScanOperatorInfo(void* param) {
×
1523
  if (param == NULL) {
×
1524
    return;
×
1525
  }
1526

1527
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
×
1528
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
×
1529
    destroyOperator(pStreamScan->pTableScanOp);
×
1530
  }
1531

1532
  if (pStreamScan->pRecTableScanOp && pStreamScan->pRecTableScanOp->info) {
×
1533
    destroyOperator(pStreamScan->pRecTableScanOp);
×
1534
  }
1535

1536
  if (pStreamScan->tqReader != NULL && pStreamScan->readerFn.tqReaderClose != NULL) {
×
1537
    pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
×
1538
  }
1539
  if (pStreamScan->matchInfo.pList) {
×
1540
    taosArrayDestroy(pStreamScan->matchInfo.pList);
×
1541
  }
1542
  if (pStreamScan->pPseudoExpr) {
×
1543
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
×
1544
    taosMemoryFree(pStreamScan->pPseudoExpr);
×
1545
  }
1546

1547
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
×
1548
  cleanupExprSupp(&pStreamScan->tagCalSup);
×
1549

1550
  blockDataDestroy(pStreamScan->pRes);
×
1551
  blockDataDestroy(pStreamScan->pCreateTbRes);
×
1552
  taosArrayDestroy(pStreamScan->pBlockLists);
×
1553
  blockDataDestroy(pStreamScan->pCheckpointRes);
×
1554
  blockDataDestroy(pStreamScan->pDeleteDataRes);
×
1555
  blockDataDestroy(pStreamScan->pUpdateRes);
×
1556

1557
  if (pStreamScan->stateStore.streamStateDestroyTsDataState) {
×
1558
    pStreamScan->stateStore.streamStateDestroyTsDataState(pStreamScan->basic.pTsDataState);
×
1559
    pStreamScan->basic.pTsDataState = NULL;
×
1560
  }
1561

1562
  if (pStreamScan->stateStore.updateInfoDestroy != NULL && pStreamScan->pUpdateInfo != NULL) {
×
1563
    pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
×
1564
  }
1565
  tSimpleHashCleanup(pStreamScan->pRecRangeMap);
×
1566
  pStreamScan->pRecRangeMap = NULL;
×
1567

1568
  taosArrayDestroy(pStreamScan->pRecRangeRes);
×
1569
  pStreamScan->pRecRangeRes = NULL;
×
1570

1571
  destroyStreamRecalculateParam(&pStreamScan->recParam);
×
1572

1573
  taosMemoryFree(pStreamScan);
×
1574
}
1575

1576
#if 0
1577
static int32_t doStreamScanTest(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1578
  int32_t          code = TSDB_CODE_SUCCESS;
1579
  int32_t          lino = 0;
1580
  SStreamScanInfo* pInfo = pOperator->info;
1581
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1582
  SSDataBlock* pBlock = NULL;
1583

1584
  pInfo->basic.pTsDataState->pStreamTaskState = (SStreamState*)taosMemoryCalloc(1, sizeof(SStreamState));
1585
  QUERY_CHECK_NULL(pInfo->basic.pTsDataState->pStreamTaskState, code, lino, _end, terrno);
1586
  *((SStreamState*)pInfo->basic.pTsDataState->pStreamTaskState) = *pTaskInfo->streamInfo.pState;
1587

1588
  doStreamDataScanNext(pOperator, ppRes);
1589
  if ((*ppRes) != NULL) {
1590
    return code;
1591
  }
1592

1593
  {
1594
    code = createDataBlock(&pBlock);
1595
    QUERY_CHECK_CODE(code, lino, _end);
1596
    pBlock->info.type = STREAM_RECALCULATE_START;
1597
    SPackedData pack = {0};
1598
    pack.pDataBlock = pBlock;
1599
    void* pBuf = taosArrayPush(pInfo->pBlockLists, &pack);
1600
    QUERY_CHECK_NULL(pBuf, code, lino, _end, terrno);
1601
  }
1602

1603
  pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
1604
  doStreamDataScanNext(pOperator, ppRes);
1605

1606
  SOperatorInfo* pRoot = pTaskInfo->pRoot;
1607
  SStreamIntervalSliceOperatorInfo* pIntervalInfo = pRoot->info;
1608
  setRecalculateOperatorFlag(&pIntervalInfo->basic);
1609
  code = doStreamRecalculateScanNext(pOperator, ppRes);
1610
  QUERY_CHECK_CODE(code, lino, _end);
1611

1612
  if ((*ppRes) == NULL) {
1613
    // unsetRecalculateOperatorFlag(&pIntervalInfo->basic);
1614
    // pBlock->info.type = STREAM_RECALCULATE_END;
1615
    // SPackedData pack = {0};
1616
    // pack.pDataBlock = pBlock;
1617
    // void* pBuf = taosArrayPush(pInfo->pBlockLists, &pack);
1618
    // QUERY_CHECK_NULL(pBuf, code, lino, _end, terrno);
1619
    // doStreamDataScanNext(pOperator, ppRes);
1620
    // pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
1621
  }
1622

1623
_end:
1624
  blockDataDestroy(pBlock);
1625
  if (code != TSDB_CODE_SUCCESS) {
1626
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1627
    (*ppRes) = NULL;
1628
  }
1629
  return code;
1630
}
1631
#endif
1632

1633
static void initStreamRecalculateParam(STableScanPhysiNode* pTableScanNode, SStreamRecParam* pParam) {
×
1634
  tstrncpy(pParam->pStbFullName, pTableScanNode->pStbFullName, TSDB_TABLE_FNAME_LEN);
×
1635
  tstrncpy(pParam->pWstartName, pTableScanNode->pWstartName, TSDB_COL_NAME_LEN);
×
1636
  tstrncpy(pParam->pWendName, pTableScanNode->pWendName, TSDB_COL_NAME_LEN);
×
1637
  tstrncpy(pParam->pGroupIdName, pTableScanNode->pGroupIdName, TSDB_COL_NAME_LEN);
×
1638
  tstrncpy(pParam->pIsWindowFilledName, pTableScanNode->pIsWindowFilledName, TSDB_COL_NAME_LEN);
×
1639

1640

1641
  pParam->sqlCapcity = tListLen(pParam->pSql);
×
1642
  (void)tsnprintf(pParam->pUrl, tListLen(pParam->pUrl), "http://%s:%d/rest/sql", tsAdapterFqdn, tsAdapterPort);
×
1643
  (void)tsnprintf(pParam->pAuth, tListLen(pParam->pAuth), "Authorization: Basic %s", tsAdapterToken);
×
1644

1645
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
1646
  pParam->pColIdMap = tSimpleHashInit(32, hashFn);
×
1647
}
×
1648

1649
int32_t createStreamDataScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
×
1650
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
1651
                                         SOperatorInfo** pOptrInfo) {
1652
  QRY_PARAM_CHECK(pOptrInfo);
×
1653

1654
  int32_t             code = TSDB_CODE_SUCCESS;
×
1655
  int32_t             lino = 0;
×
1656
  SArray*             pColIds = NULL;
×
1657
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
×
1658
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
×
1659
  SStreamScanInfo*    pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
×
1660
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
1661
  const char*         idstr = pTaskInfo->id.str;
×
1662
  SStorageAPI*        pAPI = &pTaskInfo->storageAPI;
×
1663

1664
  if (pInfo == NULL || pOperator == NULL) {
×
1665
    code = terrno;
×
1666
    goto _error;
×
1667
  }
1668

1669
  pInfo->pTagCond = pTagCond;
×
1670
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
×
1671

1672
  int32_t numOfCols = 0;
×
1673
  code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
×
1674
  if (code != TSDB_CODE_SUCCESS) {
×
1675
    goto _error;
×
1676
  }
1677

1678
  pInfo->basic.primaryPkIndex = -1;
×
1679
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
×
1680
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
×
1681
  QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
×
1682

1683
  SDataType pkType = {0};
×
1684
  for (int32_t i = 0; i < numOfOutput; ++i) {
×
1685
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
×
1686
    QUERY_CHECK_NULL(id, code, lino, _error, terrno);
×
1687

1688
    int16_t colId = id->colId;
×
1689
    void*   tmp = taosArrayPush(pColIds, &colId);
×
1690
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
×
1691

1692
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1693
      pInfo->primaryTsIndex = id->dstSlotId;
×
1694
    }
1695
    if (id->isPk) {
×
1696
      pInfo->basic.primaryPkIndex = id->dstSlotId;
×
1697
      pkType = id->dataType;
×
1698
    }
1699
  }
1700

1701
  pInfo->pPartTbnameSup = NULL;
×
1702
  if (pTableScanNode->pSubtable != NULL) {
×
1703
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
×
1704
    QUERY_CHECK_NULL(pSubTableExpr, code, lino, _error, terrno);
×
1705

1706
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
×
1707
    code = createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
×
1708
    QUERY_CHECK_CODE(code, lino, _error);
×
1709

1710
    code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore);
×
1711
    QUERY_CHECK_CODE(code, lino, _error);
×
1712
  }
1713

1714
  if (pTableScanNode->pTags != NULL) {
×
1715
    int32_t    numOfTags;
1716
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
×
1717
    QUERY_CHECK_NULL(pTagExpr, code, lino, _error, terrno);
×
1718
    code = initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore);
×
1719
    QUERY_CHECK_CODE(code, lino, _error);
×
1720
  }
1721

1722
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
×
1723
  TSDB_CHECK_NULL(pInfo->pBlockLists, code, lino, _error, terrno);
×
1724

1725
  pInfo->pTableScanOp = NULL;
×
1726
  if (pHandle->vnode) {
×
1727
    code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pInfo->pTableScanOp);
×
1728
    QUERY_CHECK_CODE(code, lino, _error);
×
1729

1730
    STableScanInfo* pTSInfo = (STableScanInfo*)pInfo->pTableScanOp->info;
×
1731
    if (pHandle->version > 0) {
×
1732
      pTSInfo->base.cond.endVersion = pHandle->version;
×
1733
    }
1734

1735
    STableKeyInfo* pList = NULL;
×
1736
    int32_t        num = 0;
×
1737
    code = tableListGetGroupList(pTableListInfo, 0, &pList, &num);
×
1738
    QUERY_CHECK_CODE(code, lino, _error);
×
1739

1740
    if (pHandle->initTqReader) {
×
1741
      pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode);
×
1742
      QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, terrno);
×
1743
    } else {
1744
      pInfo->tqReader = pHandle->tqReader;
×
1745
      QUERY_CHECK_NULL(pInfo->tqReader, code, lino, _error, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1746
    }
1747

1748
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
×
1749
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
×
1750
    QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno);
×
1751

1752
    code = blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
×
1753
    QUERY_CHECK_CODE(code, lino, _error);
×
1754

1755
    // set the extract column id to streamHandle
1756
    pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
×
1757

1758
    SArray* tableIdList = NULL;
×
1759
    code = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo, &tableIdList);
×
1760
    QUERY_CHECK_CODE(code, lino, _error);
×
1761
    pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr);
×
1762
    taosArrayDestroy(tableIdList);
×
1763
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
×
1764

1765
    STableListInfo* pRecTableListInfo = tableListCreate();
×
1766
    QUERY_CHECK_NULL(pRecTableListInfo, code, lino, _error, terrno);
×
1767
    code = createTableScanOperatorInfo(pTableScanNode, pHandle, pRecTableListInfo, pTaskInfo, &pInfo->pRecTableScanOp);
×
1768
    QUERY_CHECK_CODE(code, lino, _error);
×
1769
  } else {
1770
    taosArrayDestroy(pColIds);
×
1771
    tableListDestroy(pTableListInfo);
×
1772
  }
1773

1774
  // clear the local variable to avoid repeatly free
1775
  pColIds = NULL;
×
1776

1777
  // create the pseduo columns info
1778
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
×
1779
    code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr);
×
1780
    QUERY_CHECK_CODE(code, lino, _error);
×
1781
  }
1782

1783
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
×
1784
  QUERY_CHECK_CODE(code, lino, _error);
×
1785

1786
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
×
1787
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
×
1788

1789
  code = createSpecialDataBlock(STREAM_RECALCULATE_DELETE, &pInfo->pDeleteDataRes);
×
1790
  QUERY_CHECK_CODE(code, lino, _error);
×
1791

1792
  code = createSpecialDataBlock(STREAM_RECALCULATE_DATA, &pInfo->pUpdateRes);
×
1793
  QUERY_CHECK_CODE(code, lino, _error);
×
1794
  pInfo->pRecoverRes = NULL;
×
1795

1796
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
×
1797
  pInfo->twAggSup.maxTs = INT64_MIN;
×
1798
  pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
×
1799
  pInfo->readHandle = *pHandle;
×
1800
  pInfo->comparePkColFn = getKeyComparFunc(pkType.type, TSDB_ORDER_ASC);
×
1801
  pInfo->curRange = (SScanRange){0};
×
1802
  pInfo->scanAllTables = false;
×
1803
  pInfo->hasPart = false;
×
1804
  pInfo->assignBlockUid = groupbyTbname(pInfo->pGroupTags);
×
1805

1806
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
×
1807
  QUERY_CHECK_CODE(code, lino, _error);
×
1808

1809
  SStreamState* pTempState = (SStreamState*)taosMemoryCalloc(1, sizeof(SStreamState));
×
1810
  QUERY_CHECK_NULL(pTempState, code, lino, _error, terrno);
×
1811
  (*pTempState) = *pTaskInfo->streamInfo.pState;
×
1812
  pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
×
1813
  pInfo->stateStore.streamStateSetNumber(pTempState, 1, pInfo->primaryTsIndex);
×
1814
  
1815
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
1816
  pInfo->pRecRangeMap = tSimpleHashInit(32, hashFn);
×
1817
  taosArrayDestroy(pInfo->pRecRangeRes);
×
1818
  pInfo->pRecRangeRes = taosArrayInit(64, POINTER_BYTES);
×
1819
  initStreamRecalculateParam(pTableScanNode, &pInfo->recParam);
×
1820

1821
  void* pOtherState = pTaskInfo->streamInfo.pOtherState;
×
1822
  pAPI->stateStore.streamStateInitTsDataState(&pInfo->basic.pTsDataState, pkType.type, pkType.bytes, pTempState, pOtherState);
×
1823
  pAPI->stateStore.streamStateRecoverTsData(pInfo->basic.pTsDataState);
×
1824
  setSingleOperatorFlag(&pInfo->basic);
×
1825

1826
  pInfo->pStreamScanOp = pOperator;
×
1827

1828
  // for stream
1829
  if (pTaskInfo->streamInfo.pState) {
×
1830
    void*   buff = NULL;
×
1831
    int32_t len = 0;
×
1832
    int32_t res = pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_DATA_SCAN_OP_CHECKPOINT_NAME,
×
1833
                                                      strlen(STREAM_DATA_SCAN_OP_CHECKPOINT_NAME), &buff, &len);
1834
  }
1835

1836
  setOperatorInfo(pOperator, STREAM_DATA_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED,
×
1837
                  pInfo, pTaskInfo);
1838
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
×
1839

1840
  if (pHandle->fillHistory == STREAM_RECALCUL_OPERATOR) {
×
1841
    pOperator->fpSet =
1842
        createOperatorFpSet(optrDummyOpenFn, doStreamRecalculateScanNext, NULL, destroyStreamDataScanOperatorInfo,
×
1843
                            optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1844
  } else {
1845
    pOperator->fpSet =
1846
        createOperatorFpSet(optrDummyOpenFn, doStreamDataScanNext, NULL, destroyStreamDataScanOperatorInfo,
×
1847
                            optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1848
  }
1849
  // doStreamScanTest
1850
  // pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamScanTest, NULL, destroyStreamDataScanOperatorInfo,
1851
  //                                        optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1852
  setOperatorStreamStateFn(pOperator, streamDataScanReleaseState, streamDataScanReloadState);
×
1853

1854
  *pOptrInfo = pOperator;
×
1855
  return code;
×
1856

1857
_error:
×
1858
  if (pColIds != NULL) {
×
1859
    taosArrayDestroy(pColIds);
×
1860
  }
1861

1862
  if (pInfo != NULL) {
×
1863
    STableScanInfo* p = (STableScanInfo*)pInfo->pTableScanOp->info;
×
1864
    if (p != NULL) {
×
1865
      p->base.pTableListInfo = NULL;
×
1866
    }
1867
    destroyStreamDataScanOperatorInfo(pInfo);
×
1868
  }
1869

1870
  if (pOperator != NULL) {
×
1871
    pOperator->info = NULL;
×
1872
    destroyOperator(pOperator);
×
1873
  }
1874
  pTaskInfo->code = code;
×
1875
  return code;
×
1876
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc