• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamtimesliceoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "storageapi.h"
22
#include "streamexecutorInt.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "ttime.h"
29

30
#define STREAM_TIME_SLICE_OP_STATE_NAME      "StreamTimeSliceHistoryState"
31
#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint"
32

33
/*
 * Record a window key in the "updated windows" hash.
 *
 * The hash is used as a set here: the key is inserted with a NULL value of length 0.
 * Returns the code reported by tSimpleHashPut.
 */
int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) {
  int32_t code = tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0);
  return code;
}
36

37
/*
 * Persist the operator's history windows plus its max timestamp under
 * STREAM_TIME_SLICE_OP_STATE_NAME, commit the state, and then forward the release request to the
 * first downstream operator if it provides a release callback.
 *
 * Saved layout: [SWinKey * winNum][TSKEY maxTs].
 * If the staging buffer cannot be allocated the function logs the error and returns without
 * touching the downstream operator.
 */
void streamTimeSliceReleaseState(SOperatorInfo* pOperator) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*          pAggSup = &pInfo->streamAggSup;

  int32_t numOfWins = taosArrayGetSize(pInfo->historyWins);
  int32_t keysBytes = numOfWins * sizeof(SWinKey);
  int32_t totalBytes = keysBytes + sizeof(TSKEY);

  char* pPayload = taosMemoryCalloc(1, totalBytes);
  QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);

  // window keys first (if any), then the trailing max timestamp
  if (numOfWins > 0) {
    memcpy(pPayload, pInfo->historyWins->pData, keysBytes);
  }
  memcpy(pPayload + keysBytes, &pInfo->twAggSup.maxTs, sizeof(TSKEY));

  qDebug("===stream=== time slice operator relase state. save result count:%d", numOfWins);
  pAggSup->stateStore.streamStateSaveInfo(pAggSup->pState, STREAM_TIME_SLICE_OP_STATE_NAME,
                                          strlen(STREAM_TIME_SLICE_OP_STATE_NAME), pPayload, totalBytes);
  pAggSup->stateStore.streamStateCommit(pAggSup->pState);
  taosMemoryFreeClear(pPayload);

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.releaseStreamStateFn != NULL) {
    pChild->fpSet.releaseStreamStateFn(pChild);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
68

69
/*
 * Reload the state saved by the release-state path (stored under STREAM_TIME_SLICE_OP_STATE_NAME).
 *
 * Expected buffer layout: [SWinKey * num][TSKEY ts]. The trailing timestamp is merged into
 * twAggSup.maxTs and handed to streamStateReloadInfo; every saved window key is then re-registered
 * in pInfo->pUpdatedMap (created on demand). For fill types other than PREV/LINEAR the key that
 * follows the saved key in the fill state is registered instead, and keys without a successor are
 * skipped. Finally the request is forwarded to the first downstream operator.
 *
 * Errors are logged, not returned. The loaded buffer is released on every exit path.
 */
void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*          pAggSup = &pInfo->streamAggSup;
  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamFillSupporter*         pFillSup = pInfo->pFillSup;
  resetWinRange(&pAggSup->winRange);

  int32_t size = 0;
  void*   pBuf = NULL;
  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_TIME_SLICE_OP_STATE_NAME,
                                                strlen(STREAM_TIME_SLICE_OP_STATE_NAME), &pBuf, &size);
  QUERY_CHECK_CODE(code, lino, _end);

  // The buffer must at least hold the trailing timestamp; checking this first keeps the
  // (unsigned) size arithmetic below from wrapping on a short or missing buffer.
  QUERY_CHECK_CONDITION((pBuf != NULL && size >= (int32_t)sizeof(TSKEY)), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  int32_t num = (size - sizeof(TSKEY)) / sizeof(SWinKey);
  qDebug("===stream=== time slice operator reload state. get result count:%d", num);
  SWinKey* pKeyBuf = (SWinKey*)pBuf;
  QUERY_CHECK_CONDITION((size == num * sizeof(SWinKey) + sizeof(TSKEY)), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
  pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts);
  qDebug("===stream=== reload state. reload ts:%" PRId64, ts);

  if (!pInfo->pUpdatedMap && num > 0) {
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    pInfo->pUpdatedMap = tSimpleHashInit(64, hashFn);
    QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
  }

  int32_t tmpRes = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < num; i++) {
    SWinKey* pKey = pKeyBuf + i;
    SWinKey  resKey = {.groupId = pKey->groupId};
    if (pFillSup->type != TSDB_FILL_PREV && pFillSup->type != TSDB_FILL_LINEAR) {
      code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, pKey, &resKey, NULL, NULL, &tmpRes);
      QUERY_CHECK_CODE(code, lino, _end);

      // no successor for this key in the fill state: nothing to recompute
      if (tmpRes != TSDB_CODE_SUCCESS) {
        continue;
      }
    } else {
      resKey = *pKey;
    }
    qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", resKey.ts,
           resKey.groupId, i);
    code = saveTimeSliceWinResult(&resKey, pInfo->pUpdatedMap);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  // done with the loaded buffer; clear the pointer so the cleanup in _end is a no-op
  taosMemoryFreeClear(pBuf);

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->fpSet.reloadStreamStateFn) {
    downstream->fpSet.reloadStreamStateFn(downstream);
  }
  reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);

_end:
  // error paths jump here with pBuf still allocated; release it so it does not leak
  taosMemoryFreeClear(pBuf);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
×
133

134
/* Mark a result row as empty: no row value and the minimum possible key. */
static void resetFillWindow(SResultRowData* pRowData) {
  pRowData->pRowVal = NULL;
  pRowData->key = INT64_MIN;
}
×
138

139
/*
 * Reset the four cached rows of the fill supporter (cur, prev, next, nextNext) to the empty state.
 * pFillSup must not be NULL; it is dereferenced unconditionally.
 */
void resetTimeSlicePrevAndNextWindow(SStreamFillSupporter* pFillSup) {
  SResultRowData* cachedRows[] = {&pFillSup->cur, &pFillSup->prev, &pFillSup->next, &pFillSup->nextNext};
  for (size_t i = 0; i < sizeof(cachedRows) / sizeof(cachedRows[0]); ++i) {
    resetFillWindow(cachedRows[i]);
  }
}
×
145

146
void destroyStreamTimeSliceOperatorInfo(void* param) {
×
147
  SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param;
×
148
  if (pInfo->pOperator) {
×
149
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
×
150
                              &pInfo->groupResInfo);
151
    pInfo->pOperator = NULL;
×
152
  }
153
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
154

155
  resetTimeSlicePrevAndNextWindow(pInfo->pFillSup);
×
156
  destroyStreamFillSupporter(pInfo->pFillSup);
×
157
  destroyStreamFillInfo(pInfo->pFillInfo);
×
158
  blockDataDestroy(pInfo->pRes);
×
159
  blockDataDestroy(pInfo->pDelRes);
×
160
  blockDataDestroy(pInfo->pCheckpointRes);
×
161

162
  taosMemoryFreeClear(pInfo->leftRow.pRowVal);
×
163
  taosMemoryFreeClear(pInfo->valueRow.pRowVal);
×
164
  taosMemoryFreeClear(pInfo->rightRow.pRowVal);
×
165

166
  cleanupExprSupp(&pInfo->scalarSup);
×
167
  taosArrayDestroy(pInfo->historyPoints);
×
168

169
  taosArrayDestroy(pInfo->pUpdated);
×
170
  pInfo->pUpdated = NULL;
×
171

172
  tSimpleHashCleanup(pInfo->pUpdatedMap);
×
173
  pInfo->pUpdatedMap = NULL;
×
174

175
  taosArrayDestroy(pInfo->pDelWins);
×
176
  tSimpleHashCleanup(pInfo->pDeletedMap);
×
177
  clearGroupResArray(&pInfo->groupResInfo);
×
178

179
  taosArrayDestroy(pInfo->historyWins);
×
180

181
  taosArrayDestroy(pInfo->pCloseTs);
×
182
  destroyStreamAggSupporter(&pInfo->streamAggSup);
×
183

184
  destroyStreamBasicInfo(&pInfo->basic);
×
185

186
  taosMemoryFreeClear(param);
×
187
}
×
188

189
/*
 * Serialise the operator state, or compute its serialised size.
 *
 * Layout: [int32 row count][session key + result window info per row][twAggSup][uint32 checksum].
 *
 * buf        NULL to only compute the size; otherwise *buf is the write cursor handed to the
 *            encode helpers.
 * len        total buffer length; used only to determine the range covered by the checksum.
 * pOperator  operator whose info is encoded.
 * pLen       receives the number of bytes encoded (or required).
 *
 * Returns TSDB_CODE_FAILED when the operator has no info, TSDB_CODE_SUCCESS otherwise.
 */
int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, int32_t* pLen) {
  SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
  if (pInfo == NULL) {
    return TSDB_CODE_FAILED;
  }

  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
  // remember where the encoded data starts; the checksum is computed from here
  void*   pStart = (buf != NULL) ? *buf : NULL;
  int32_t total = 0;

  // 1.streamAggSup.pResultRows
  int32_t numOfRows = tSimpleHashGetSize(pAggSup->pResultRows);
  total += taosEncodeFixedI32(buf, numOfRows);

  size_t  keyLen = 0;
  int32_t iter = 0;
  void*   pRow = NULL;
  for (pRow = tSimpleHashIterate(pAggSup->pResultRows, pRow, &iter); pRow != NULL;
       pRow = tSimpleHashIterate(pAggSup->pResultRows, pRow, &iter)) {
    void* pKey = tSimpleHashGetKey(pRow, &keyLen);
    total += encodeSSessionKey(buf, pKey);
    total += encodeSResultWindowInfo(buf, pRow, pAggSup->resultRowSize);
  }

  // 2.twAggSup
  total += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

  // 3.checksum
  if (buf != NULL) {
    uint32_t cksum = taosCalcChecksum(0, pStart, len - sizeof(uint32_t));
    total += taosEncodeFixedU32(buf, cksum);
  } else {
    total += sizeof(uint32_t);
  }

  *pLen = total;
  return TSDB_CODE_SUCCESS;
}
225

226
/*
 * Restore operator state from a buffer in the format written by the encode counterpart:
 * [int32 row count][session key + result window info per row][twAggSup][uint32 checksum].
 *
 * The trailing checksum is verified before anything is decoded. Each decoded row is added to the
 * state store (if not already present) and to streamAggSup.pResultRows.
 *
 * buf        encoded state.
 * len        length of buf in bytes, including the trailing checksum.
 * pOperator  operator whose info receives the decoded state.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_FAILED for a missing operator info / missing or too short
 * buffer / checksum mismatch, or the error code of a failing state-store or hash call.
 */
int32_t doStreamTimeSliceDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  if (!pInfo) {
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;

  // The buffer must at least contain the checksum; otherwise the length arithmetic below would
  // produce a negative data length and the checksum read would be out of bounds.
  if (buf == NULL || len < (int32_t)sizeof(uint32_t)) {
    qError("stream time slice state is invalid, len:%d", len);
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 3.checksum
  int32_t dataLen = len - sizeof(uint32_t);
  void*   pCksum = POINTER_SHIFT(buf, dataLen);
  if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
    qError("stream time slice state is invalid");
    code = TSDB_CODE_FAILED;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 1.streamAggSup.pResultRows
  int32_t mapSize = 0;
  buf = taosDecodeFixedI32(buf, &mapSize);
  for (int32_t i = 0; i < mapSize; i++) {
    SResultWindowInfo winfo = {0};
    buf = decodeSSessionKey(buf, &winfo.sessionWin);
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
        pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
    code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
                          sizeof(SResultWindowInfo));
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // 2.twAggSup
  buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
272

273
/*
 * Compute the per-row buffer size of the fill supporter (timestamp + result row size for all
 * columns) and put the four cached rows (cur, prev, next, nextNext) into the empty state.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp* pExpSup) {
  pFillSup->rowSize = sizeof(TSKEY) + getResultRowSize(pExpSup->pCtx, pFillSup->numOfAllCols);

  pFillSup->cur.key = INT64_MIN;
  pFillSup->cur.pRowVal = NULL;

  pFillSup->prev.key = INT64_MIN;
  pFillSup->prev.pRowVal = NULL;

  pFillSup->next.key = INT64_MIN;
  pFillSup->next.pRowVal = NULL;

  pFillSup->nextNext.key = INT64_MIN;
  pFillSup->nextNext.pRowVal = NULL;

  return TSDB_CODE_SUCCESS;
}
286

287
/*
 * Build the table of byte offsets at which each column's cell starts inside a packed result row.
 *
 * Cell i occupies sizeof(SResultCellData) + <column bytes> bytes; a column that cannot be fetched
 * from the block is accounted with 1 data byte.
 *
 * ppOffset  receives the newly allocated offset array (one int32_t per column) on success; it is
 *           left untouched on failure.
 * pRes      block whose column list defines the layout.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  numOfCol = taosArrayGetSize(pRes->pDataBlock);
  int32_t  preLength = 0;
  int32_t* pOffsetInfo = taosMemoryCalloc(numOfCol, sizeof(int32_t));
  // Report the allocator's error. Passing `lino` (still 0 here) as the error code made an
  // allocation failure look like success while *ppOffset stayed unset.
  QUERY_CHECK_NULL(pOffsetInfo, code, lino, _end, terrno);

  for (int32_t i = 0; i < numOfCol; i++) {
    SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, i);
    pOffsetInfo[i] = preLength;
    int32_t bytes = 1;
    if (pColInfo != NULL) {
      bytes = pColInfo->info.bytes;
    }
    preLength += bytes + sizeof(SResultCellData);
  }

  (*ppOffset) = pOffsetInfo;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
313
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs,
×
314
                                    SSDataBlock* pInputRes, SColumnInfo* pPkCol, SStreamFillSupporter** ppResFillSup) {
315
  int32_t               code = TSDB_CODE_SUCCESS;
×
316
  int32_t               lino = 0;
×
317
  SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
×
318
  QUERY_CHECK_NULL(pFillSup, code, lino, _end, terrno);
×
319

320
  pFillSup->numOfFillCols = numOfExprs;
×
321
  int32_t numOfNotFillCols = 0;
×
322
  pFillSup->pAllColInfo = createFillColInfo(pExprSup->pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols, NULL,
×
323
                                            0, (const SNodeListNode*)(pPhyFillNode->pFillValues));
×
324
  QUERY_CHECK_NULL(pFillSup->pAllColInfo, code, lino, _end, terrno);
×
325

326
  pFillSup->type = convertFillType(pPhyFillNode->fillMode);
×
327
  pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
×
328
  pFillSup->interval.interval = pPhyFillNode->interval;
×
329
  pFillSup->interval.intervalUnit = pPhyFillNode->intervalUnit;
×
330
  pFillSup->interval.offset = 0;
×
331
  pFillSup->interval.offsetUnit = pPhyFillNode->intervalUnit;
×
332
  pFillSup->interval.precision = pPhyFillNode->precision;
×
333
  pFillSup->interval.sliding = pPhyFillNode->interval;
×
334
  pFillSup->interval.slidingUnit = pPhyFillNode->intervalUnit;
×
335
  pFillSup->pAPI = NULL;
×
336
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
337
  pFillSup->pResMap = tSimpleHashInit(16, hashFn);
×
338
  QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
×
339

340
  code = initTimeSliceResultBuf(pFillSup, pExprSup);
×
341
  QUERY_CHECK_CODE(code, lino, _end);
×
342

343
  pFillSup->hasDelete = false;
×
344
  if (pPkCol != NULL) {
×
345
    pFillSup->pkColBytes = pPkCol->bytes;
×
346
    pFillSup->comparePkColFn = getKeyComparFunc(pPkCol->type, TSDB_ORDER_ASC);
×
347
  } else {
348
    pFillSup->pkColBytes = 0;
×
349
    pFillSup->comparePkColFn = NULL;
×
350
  }
351

352
  code = initOffsetInfo(&pFillSup->pOffsetInfo, pInputRes);
×
353
  QUERY_CHECK_CODE(code, lino, _end);
×
354
  pFillSup->normalFill = false;
×
355

356
  (*ppResFillSup) = pFillSup;
×
357

358
_end:
×
359
  if (code != TSDB_CODE_SUCCESS) {
×
360
    destroyStreamFillSupporter(pFillSup);
×
361
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
362
  }
363
  return code;
×
364
}
365

366
/*
 * Save the operator's checkpoint state when the basic info says a save is needed.
 *
 * The state is encoded twice: first with a NULL buffer to learn the required size, then into a
 * freshly allocated buffer, which is stored under STREAM_TIME_SLICE_OP_CHECKPOINT_NAME. The
 * temporary buffer is always released; failures are logged, not returned.
 */
static void doStreamTimeSliceSaveCheckpoint(SOperatorInfo* pOperator) {
  SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
  if (!needSaveStreamOperatorInfo(&pInfo->basic)) {
    return;
  }

  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  void*                pEncoded = NULL;
  int32_t              encodedLen = 0;
  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;

  // pass 1: size only
  code = doStreamTimeSliceEncodeOpState(NULL, 0, pOperator, &encodedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  pEncoded = taosMemoryCalloc(1, encodedLen);
  QUERY_CHECK_NULL(pEncoded, code, lino, _end, terrno);

  // pass 2: real encoding; the encoder advances its own cursor, pEncoded keeps the start
  void* pCursor = pEncoded;
  code = doStreamTimeSliceEncodeOpState(&pCursor, encodedLen, pOperator, &encodedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  pAggSup->stateStore.streamStateSaveInfo(pAggSup->pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME,
                                          strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME), pEncoded, encodedLen);
  saveStreamOperatorStateComplete(&pInfo->basic);

_end:
  taosMemoryFreeClear(pEncoded);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
394

395
/*
 * Locate cell `index` inside a packed result row.
 *
 * pCellOffsetInfo[index] is the byte offset of the cell from the start of the row.
 * Returns NULL when the row itself is NULL.
 */
SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index, int32_t* pCellOffsetInfo) {
  if (pRowVal != NULL) {
    return POINTER_SHIFT(pRowVal, pCellOffsetInfo[index]);
  }
  return NULL;
}
401

402
/* True when the expression's function type is FUNCTION_TYPE_GROUP_KEY. */
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
  return pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_GROUP_KEY;
}
406

407
/* True when the expression's function type is FUNCTION_TYPE_GROUP_CONST_VALUE. */
static bool isSelectGroupConstValueFunc(SExprInfo* pExprInfo) {
  return pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_GROUP_CONST_VALUE;
}
411

412
/*
 * True when the first parameter of the fill column's expression is one of the window
 * pseudo columns: window start, window end, window duration, or "is window filled".
 */
static bool isWindowFunction(SFillColInfo* pCol) {
  if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
    return true;
  }
  if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_END) {
    return true;
  }
  if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_DURATION) {
    return true;
  }
  return pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_IS_WINDOW_FILLED;
}
418

419
static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, SResultRowData* pNonFillRow,
×
420
                               TSKEY ts, SSDataBlock* pBlock, bool* pRes, bool isFilled) {
421
  int32_t code = TSDB_CODE_SUCCESS;
×
422
  int32_t lino = 0;
×
423
  if (pBlock->info.rows >= pBlock->info.capacity) {
×
424
    (*pRes) = false;
×
425
    goto _end;
×
426
  }
427

428
  bool ckRes = true;
×
429
  code = checkResult(pFillSup, ts, pBlock->info.id.groupId, &ckRes);
×
430
  QUERY_CHECK_CODE(code, lino, _end);
×
431
  if (!ckRes) {
×
432
    (*pRes) = true;
×
433
    goto _end;
×
434
  }
435

436
  for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
×
437
    SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
×
438
    int32_t          dstSlotId = GET_DEST_SLOT_ID(pFillCol);
×
439
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
×
440

441
    if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
×
442
      code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false);
×
443
      QUERY_CHECK_CODE(code, lino, _end);
×
444
    } else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
×
445
      code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
×
446
      QUERY_CHECK_CODE(code, lino, _end);
×
447
    } else if (pFillSup->normalFill && isWindowFunction(pFillCol)) {
×
448
      SFillInfo tmpInfo = {
×
449
          .currentKey = ts,
450
          .order = TSDB_ORDER_ASC,
451
          .interval = pFillSup->interval,
452
          .isFilled = isFilled,
453
      };
454
      bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pDstCol, pBlock->info.rows);
×
455
      if (!filled) {
×
456
        qError("%s failed at line %d since fill errror", __func__, __LINE__);
×
457
      }
458
    } else {
459
      int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
×
460
      if (pFillSup->normalFill) {
×
461
        srcSlot = dstSlotId;
×
462
      }
463
      SResultCellData* pCell = NULL;
×
464
      if (IS_FILL_CONST_VALUE(pFillSup->type) &&
×
465
          (isGroupKeyFunc(pFillCol->pExpr) || isSelectGroupConstValueFunc(pFillCol->pExpr))) {
×
466
        pCell = getSliceResultCell(pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
×
467
      } else {
468
        pCell = getSliceResultCell(pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
×
469
      }
470
      code = setRowCell(pDstCol, pBlock->info.rows, pCell);
×
471
      QUERY_CHECK_CODE(code, lino, _end);
×
472
    }
473
  }
474

475
  pBlock->info.rows++;
×
476
  (*pRes) = true;
×
477

478
_end:
×
479
  if (code != TSDB_CODE_SUCCESS) {
×
480
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
481
  }
482
  return code;
×
483
}
484

485
/*
 * Emit filled rows for every remaining slice point of pFillInfo (non-linear fill types) until the
 * range is exhausted or pBlock reaches its capacity. After each emitted point the current
 * timestamp advances by one sliding step. Errors are logged and stop the loop.
 */
static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SInterval* pInterval = &pFillSup->interval;

  for (;;) {
    if (!hasRemainCalc(pFillInfo) || pBlock->info.rows >= pBlock->info.capacity) {
      break;
    }

    bool emitted = true;
    code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->pNonFillRow, pFillInfo->current, pBlock, &emitted,
                           true);
    QUERY_CHECK_CODE(code, lino, _end);

    pFillInfo->current =
        taosTimeAdd(pFillInfo->current, pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
505

506
/*
 * Emit linearly interpolated rows for every remaining slice point of pFillInfo until the range is
 * exhausted or pBlock reaches its capacity.
 *
 * Per point, checkResult decides whether a row is written at all. Per column:
 *   - _irowts gets the current timestamp, _isfilled gets true;
 *   - window pseudo columns (normal fill only) are produced by fillIfWindowPseudoColumn;
 *   - interp columns (or every column in normal fill) are interpolated between the start row
 *     (pFillInfo->pResRow) and the matching end point; var-length, bool, NULL or missing source
 *     cells produce NULL;
 *   - anything else is copied from the start row.
 * The current timestamp advances by one sliding step after each point. Errors are logged and
 * stop the loop.
 */
static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
    bool ckRes = true;
    code = checkResult(pFillSup, pFillInfo->current, pBlock->info.id.groupId, &ckRes);
    QUERY_CHECK_CODE(code, lino, _end);
    for (int32_t i = 0; i < pFillSup->numOfAllCols && ckRes; ++i) {
      SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
      int32_t          dstSlotId = GET_DEST_SLOT_ID(pFillCol);
      SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
      int16_t          type = pDstCol->info.type;
      int32_t          index = pBlock->info.rows;
      if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
        code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false);
        QUERY_CHECK_CODE(code, lino, _end);
      } else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
        bool isFilled = true;
        code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
        QUERY_CHECK_CODE(code, lino, _end);
      } else if (pFillSup->normalFill && isWindowFunction(pFillCol)) {
        SFillInfo tmpInfo = {
            .currentKey = pFillInfo->current,
            .order = TSDB_ORDER_ASC,
            .interval = pFillSup->interval,
            .isFilled = true,
        };
        bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pDstCol, pBlock->info.rows);
        if (!filled) {
          // report the real source line; `lino` is still 0 at this point
          qError("%s failed at line %d since fill errror", __func__, __LINE__);
        }
      } else if (isInterpFunc(pFillCol->pExpr) || pFillSup->normalFill) {
        int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
        if (pFillSup->normalFill) {
          srcSlot = dstSlotId;
        }
        SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
        // getSliceResultCell returns NULL for a row without values; treat that like a NULL cell
        // instead of dereferencing it.
        if (pCell == NULL || IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
          colDataSetNULL(pDstCol, index);
          continue;
        }
        SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, srcSlot);
        SPoint  start = {0};
        start.key = pFillInfo->pResRow->key;
        start.val = pCell->pData;

        SPoint cur = {0};
        cur.key = pFillInfo->current;
        cur.val = taosMemoryCalloc(1, pCell->bytes);
        QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);

        taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type, typeGetTypeModFromColInfo(&pDstCol->info));
        code = colDataSetVal(pDstCol, index, (const char*)cur.val, false);
        // release the temporary value before the error check so a failed write does not leak it
        destroySPoint(&cur);
        QUERY_CHECK_CODE(code, lino, _end);
      } else {
        int32_t          srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
        SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
        code = setRowCell(pDstCol, pBlock->info.rows, pCell);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                     pFillSup->interval.precision, NULL);
    // a row is only counted when checkResult allowed it to be written
    if (ckRes) {
      pBlock->info.rows++;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
582

583
/*
 * Set the fill range [start, end] and position the cursor (`current`) at its start.
 * pInterval is accepted for signature compatibility but not used.
 */
static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
  pFillInfo->end = end;
  pFillInfo->start = start;
  pFillInfo->current = start;
}
×
588

589
/*
 * Return pointTs, moved forward by one sliding step when the row timestamp is at or after it
 * (rowTs >= pointTs); otherwise pointTs is returned unchanged.
 */
TSKEY adustPrevTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
  if (rowTs < pointTs) {
    return pointTs;
  }
  return taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
}
595

596
/*
 * Return pointTs, moved backward by one sliding step when the row timestamp is at or before it
 * (rowTs <= pointTs); otherwise pointTs is returned unchanged.
 */
TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) {
  if (rowTs > pointTs) {
    return pointTs;
  }
  return taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision, NULL);
}
602

603
/*
 * Re-point the fill source row according to the fill type: PREV uses the supporter's `cur` row,
 * NEXT uses its `next` row. Any other fill type leaves *ppResRow untouched.
 */
static void adjustFillResRow(SResultRowData** ppResRow, SStreamFillSupporter* pFillSup) {
  switch (pFillSup->type) {
    case TSDB_FILL_PREV:
      *ppResRow = &pFillSup->cur;
      break;
    case TSDB_FILL_NEXT:
      *ppResRow = &pFillSup->next;
      break;
    default:
      break;
  }
}
×
610

611
/*
 * Produce the output rows for the range described by pFillInfo into pRes.
 *
 * - When no fill is needed but a position is set, only the current (real) point is written.
 * - Otherwise the real point is written where pFillInfo->pos says (START before the filled range,
 *   MID after the first range, END after everything), and the gaps are filled by fillLinearRange
 *   for TSDB_FILL_LINEAR or fillNormalRange for all other types.
 * - When the first range is finished and a second range is pending (hasNext), the range is
 *   re-armed and filled as well.
 * The position is reset to FILL_POS_INVALID once the real point has been written. Errors are
 * logged, not returned.
 */
void doStreamTimeSliceFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  bool       written = true;
  const bool isLinear = (pFillInfo->type == TSDB_FILL_LINEAR);

  // nothing to fill: just output the real point
  if (!pFillInfo->needFill && pFillInfo->pos != FILL_POS_INVALID) {
    code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &written, false);
    QUERY_CHECK_CODE(code, lino, _end);
    return;
  }

  if (pFillInfo->pos == FILL_POS_START) {
    code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &written, false);
    QUERY_CHECK_CODE(code, lino, _end);
    if (written) {
      pFillInfo->pos = FILL_POS_INVALID;
    }
  }

  // first range
  if (isLinear) {
    fillLinearRange(pFillSup, pFillInfo, pRes);
  } else {
    fillNormalRange(pFillSup, pFillInfo, pRes);
  }

  if (pFillInfo->pos == FILL_POS_MID) {
    code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &written, false);
    QUERY_CHECK_CODE(code, lino, _end);
    if (written) {
      pFillInfo->pos = FILL_POS_INVALID;
    }
  }

  // second range, only once the first one has been fully consumed
  if (isLinear) {
    if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
      pFillInfo->pLinearInfo->hasNext = false;
      taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
      pFillInfo->pResRow = &pFillSup->cur;
      TSKEY newStart = adustPrevTsKey(pFillSup->cur.key, pFillSup->cur.key, &pFillSup->interval);
      setFillKeyInfo(newStart, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
      fillLinearRange(pFillSup, pFillInfo, pRes);
    }
  } else {
    if (pFillInfo->current > pFillInfo->end && pFillInfo->hasNext) {
      pFillInfo->hasNext = false;
      TSKEY startTs = adustPrevTsKey(pFillInfo->current, pFillSup->cur.key, &pFillSup->interval);
      setFillKeyInfo(startTs, pFillSup->next.key, &pFillSup->interval, pFillInfo);
      adjustFillResRow(&pFillInfo->pResRow, pFillSup);
      fillNormalRange(pFillSup, pFillInfo, pRes);
    }
  }

  if (pFillInfo->pos == FILL_POS_END) {
    code = fillPointResult(pFillSup, &pFillSup->cur, pFillInfo->pNonFillRow, pFillSup->cur.key, pRes, &written, false);
    QUERY_CHECK_CODE(code, lino, _end);
    if (written) {
      pFillInfo->pos = FILL_POS_INVALID;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
679

680
/*
 * Find the first usable row at or after rowId, scanning forward.
 *
 * Returns -1 when rowId is past the last row. When ignoreNull is false rowId itself is returned;
 * otherwise the first row for which checkNullRow reports false, or -1 if there is none.
 */
static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId, bool ignoreNull) {
  if (rowId >= pBlock->info.rows) {
    return -1;
  }
  if (!ignoreNull) {
    return rowId;
  }

  int32_t row = rowId;
  while (row < pBlock->info.rows) {
    if (!checkNullRow(pExprSup, pBlock, row, ignoreNull)) {
      return row;
    }
    row++;
  }
  return -1;
}
696

697
/*
 * Scan backward from rowId and return the smallest row index that belongs to the last usable
 * timestamp group at or before rowId.
 *
 * Rows for which checkNullRow reports true are skipped. Once a usable row has been found, the
 * scan continues through usable rows with the same timestamp and stops at the first usable row
 * with a different timestamp.
 *
 * Returns the row index, or -1 when rowId is negative or no usable row exists.
 */
int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId,
                               bool ignoreNull) {
  // tsCols[rowId] is read before the loop guard, so reject a negative index up front
  if (rowId < 0) {
    return -1;
  }

  TSKEY   ts = tsCols[rowId];
  int32_t resRow = -1;
  for (; rowId >= 0; rowId--) {
    if (checkNullRow(pExprSup, pBlock, rowId, ignoreNull)) {
      continue;
    }

    if (ts != tsCols[rowId]) {
      if (resRow >= 0) {
        // a different timestamp after a hit: the group is complete
        break;
      } else {
        // no hit yet: start tracking this row's timestamp
        ts = tsCols[rowId];
      }
    }
    resRow = rowId;
  }
  return resRow;
}
717

718
/* Store pBuff, viewed as a slice row, into *ppRowData. */
static void setResultRowData(SSliceRowData** ppRowData, void* pBuff) {
  SSliceRowData* pRow = (SSliceRowData*)pBuff;
  *ppRowData = pRow;
}
×
719

720
/*
 * Bind the left/right row pointers of a slice point to its state buffer.
 *
 * For linear fill the buffer holds two rows: the left row at the start and the right row
 * (rowSize + pkColBytes) bytes later. For every other fill type both pointers refer to the single
 * row at the start of the buffer.
 */
void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) {
  if (pFillSup->type == TSDB_FILL_LINEAR) {
    setResultRowData(&pPoint->pLeftRow, pPoint->pResPos->pRowBuff);
    void* pSecondRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize + pFillSup->pkColBytes);
    setResultRowData(&pPoint->pRightRow, pSecondRow);
    return;
  }

  setResultRowData(&pPoint->pRightRow, pPoint->pResPos->pRowBuff);
  pPoint->pLeftRow = pPoint->pRightRow;
}
×
730

731
/*
 * Load the current slice point for (groupId, ts) from the fill state and derive the prev/cur/next fill windows used
 * by linear interpolation.
 *
 * - If the right row of the current point holds data it becomes the "cur" window. If, in addition, the left row
 *   holds no data, the previous point is looked up and (when found) becomes the "prev" window; the function then
 *   returns without looking at the next point.
 * - Otherwise, if the left row holds data it becomes the "prev" window. If the right row holds no data, the next
 *   point is looked up: when found it becomes the "next" window, when not found the "prev" window is reset.
 *
 * prevOriginKey/nextOriginKey keep the row keys before they are adjusted by adustPrevTsKey()/adustEndTsKey().
 * Returns TSDB_CODE_SUCCESS or the error reported by the state store / consistency checks.
 */
static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
                                            int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
                                            SSlicePoint* pNextPoint) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  int32_t        winCode = TSDB_CODE_SUCCESS;
  int32_t        valLen = 0;
  SSliceRowData* pSrcRow = NULL;
  void*          pState = pAggSup->pState;

  resetTimeSlicePrevAndNextWindow(pFillSup);
  pCurPoint->pResPos = NULL;
  pPrevPoint->pResPos = NULL;
  pNextPoint->pResPos = NULL;

  pCurPoint->key.ts = ts;
  pCurPoint->key.groupId = groupId;
  code = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &valLen,
                                                &winCode);
  QUERY_CHECK_CODE(code, lino, _end);

  // NOTE(review): the lookup result (winCode) is not inspected here; pResPos is assumed to be usable - confirm.
  setPointBuff(pCurPoint, pFillSup);

  if (HAS_ROW_DATA(pCurPoint->pRightRow)) {
    pFillSup->cur.key = pCurPoint->pRightRow->key;
    pFillSup->cur.pRowVal = (SResultCellData*)pCurPoint->pRightRow->pRowVal;

    if (HAS_NON_ROW_DATA(pCurPoint->pLeftRow)) {
      // left side of the current point is empty: the previous point supplies the "prev" window
      pPrevPoint->key.groupId = groupId;
      valLen = 0;
      code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key,
                                                        (void**)&pPrevPoint->pResPos, &valLen, &winCode);
      QUERY_CHECK_CODE(code, lino, _end);
      if (winCode != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end,
                            TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      setPointBuff(pPrevPoint, pFillSup);
      pSrcRow = HAS_ROW_DATA(pPrevPoint->pRightRow) ? pPrevPoint->pRightRow : pPrevPoint->pLeftRow;
      pFillSup->prev.key = pSrcRow->key;
      pFillSup->prev.pRowVal = (SResultCellData*)pSrcRow->pRowVal;
      pFillSup->prevOriginKey = pFillSup->prev.key;
      pFillSup->prev.key = adustPrevTsKey(pPrevPoint->key.ts, pFillSup->prev.key, &pFillSup->interval);
      goto _end;
    }
  }

  if (HAS_ROW_DATA(pCurPoint->pLeftRow)) {
    pFillSup->prev.key = pCurPoint->pLeftRow->key;
    pFillSup->prev.pRowVal = (SResultCellData*)pCurPoint->pLeftRow->pRowVal;
    pFillSup->prevOriginKey = pFillSup->prev.key;
    pFillSup->prev.key = adustPrevTsKey(pCurPoint->key.ts, pFillSup->prev.key, &pFillSup->interval);

    if (HAS_NON_ROW_DATA(pCurPoint->pRightRow)) {
      // right side of the current point is empty: the next point supplies the "next" window
      pNextPoint->key.groupId = groupId;
      valLen = 0;
      code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key,
                                                        (void**)&pNextPoint->pResPos, &valLen, &winCode);
      QUERY_CHECK_CODE(code, lino, _end);
      if (winCode != TSDB_CODE_SUCCESS) {
        resetFillWindow(&pFillSup->prev);
      } else {
        QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pNextPoint->key.ts), code, lino, _end,
                              TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        setPointBuff(pNextPoint, pFillSup);
        pSrcRow = HAS_ROW_DATA(pNextPoint->pLeftRow) ? pNextPoint->pLeftRow : pNextPoint->pRightRow;
        pFillSup->next.key = pSrcRow->key;
        pFillSup->next.pRowVal = (SResultCellData*)pSrcRow->pRowVal;
        pFillSup->nextOriginKey = pFillSup->next.key;
        pFillSup->next.key = adustEndTsKey(pNextPoint->key.ts, pFillSup->next.key, &pFillSup->interval);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
815

816
/*
 * Load the slice point for (groupId, ts) together with its neighbours and fill in the cur/prev/next windows of
 * pFillSup (non-linear fill types).
 *
 * - cur: taken from the right row of the current point when it exists in the state; otherwise only cur.key is set,
 *   to ts + 1. When the point exists and isFwc is true the function stops after setting cur.
 * - prev/next: taken from the neighbouring points when the state store finds them; their keys are adjusted with
 *   adustPrevTsKey()/adustEndTsKey().
 * - For TSDB_FILL_PREV with a next point present, the key of the point after next is recorded in nextNext.key
 *   when it exists.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the state store / consistency checks.
 */
static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
                                      int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
                                      SSlicePoint* pNextPoint, bool isFwc) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  int32_t        winCode = TSDB_CODE_SUCCESS;
  int32_t        valLen = 0;
  SSliceRowData* pSrcRow = NULL;
  void*          pState = pAggSup->pState;

  resetTimeSlicePrevAndNextWindow(pFillSup);
  pCurPoint->pResPos = NULL;
  pPrevPoint->pResPos = NULL;
  pNextPoint->pResPos = NULL;

  // current point
  pCurPoint->key.ts = ts;
  pCurPoint->key.groupId = groupId;
  code = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &valLen,
                                                &winCode);
  QUERY_CHECK_CODE(code, lino, _end);

  if (winCode != TSDB_CODE_SUCCESS) {
    pFillSup->cur.key = pCurPoint->key.ts + 1;
  } else {
    setPointBuff(pCurPoint, pFillSup);
    pFillSup->cur.key = pCurPoint->pRightRow->key;
    pFillSup->cur.pRowVal = (SResultCellData*)pCurPoint->pRightRow->pRowVal;
    if (isFwc) {
      qDebug("===stream=== only get current point state");
      goto _end;
    }
  }

  // previous point
  pPrevPoint->key.groupId = groupId;
  valLen = 0;
  code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key,
                                                    (void**)&pPrevPoint->pResPos, &valLen, &winCode);
  QUERY_CHECK_CODE(code, lino, _end);
  qDebug("===stream=== set stream interp resutl prev buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d",
         pPrevPoint->key.ts, pPrevPoint->key.groupId, winCode);

  if (winCode == TSDB_CODE_SUCCESS) {
    QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    setPointBuff(pPrevPoint, pFillSup);
    pSrcRow = HAS_ROW_DATA(pPrevPoint->pRightRow) ? pPrevPoint->pRightRow : pPrevPoint->pLeftRow;
    pFillSup->prev.key = pSrcRow->key;
    pFillSup->prev.pRowVal = (SResultCellData*)pSrcRow->pRowVal;
    pFillSup->prev.key = adustPrevTsKey(pPrevPoint->key.ts, pFillSup->prev.key, &pFillSup->interval);
  }

  // next point
  pNextPoint->key.groupId = groupId;
  valLen = 0;
  code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key,
                                                    (void**)&pNextPoint->pResPos, &valLen, &winCode);
  QUERY_CHECK_CODE(code, lino, _end);
  qDebug("===stream=== set stream interp resutl next buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d",
         pNextPoint->key.ts, pNextPoint->key.groupId, winCode);

  if (winCode != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  QUERY_CHECK_CONDITION(!IS_INVALID_WIN_KEY(pNextPoint->key.ts), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  setPointBuff(pNextPoint, pFillSup);
  pSrcRow = HAS_ROW_DATA(pNextPoint->pLeftRow) ? pNextPoint->pLeftRow : pNextPoint->pRightRow;
  pFillSup->next.key = pSrcRow->key;
  pFillSup->next.pRowVal = (SResultCellData*)pSrcRow->pRowVal;
  pFillSup->next.key = adustEndTsKey(pNextPoint->key.ts, pFillSup->next.key, &pFillSup->interval);

  if (pFillSup->type == TSDB_FILL_PREV) {
    // only the key of the point after "next" is needed, so no value buffer is requested
    int32_t     nextNextCode = TSDB_CODE_SUCCESS;
    SSlicePoint nextNextPoint = {.key.groupId = pNextPoint->key.groupId};
    code = pAggSup->stateStore.streamStateFillGetNext(pState, &pNextPoint->key, &nextNextPoint.key, NULL, NULL,
                                                      &nextNextCode);
    QUERY_CHECK_CODE(code, lino, _end);
    if (nextNextCode == TSDB_CODE_SUCCESS) {
      pFillSup->nextNext.key = nextNextPoint.key.ts;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
908

909
/*
 * Prepare the point of the time window following ts ("next") and the point stored before it ("cur").
 *
 * The next point's key is the start of the window returned by getNextTimeWindow() for ts; it is fetched from the
 * state store or created if missing, and *pWinCode receives the lookup result. When *pWinCode is not
 * TSDB_CODE_SUCCESS, both row keys of the next point are marked invalid. The cur point is then looked up as the
 * predecessor of the next point; its row pointers are bound only when that lookup succeeds.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the state store.
 */
static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
                                          int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pNextPoint,
                                          int32_t* pWinCode) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int32_t     prevCode = TSDB_CODE_SUCCESS;
  int32_t     valLen = 0;
  void*       pState = pAggSup->pState;
  STimeWindow win = {.skey = ts, .ekey = ts};

  pCurPoint->pResPos = NULL;
  pNextPoint->pResPos = NULL;

  // next point: start of the window after ts
  getNextTimeWindow(&pFillSup->interval, &win, TSDB_ORDER_ASC);
  pNextPoint->key.groupId = groupId;
  pNextPoint->key.ts = win.skey;

  code = pAggSup->stateStore.streamStateFillAddIfNotExist(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos,
                                                          &valLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  qDebug("===stream=== set stream interp next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d",
         pNextPoint->key.ts, pNextPoint->key.groupId, *pWinCode);

  setPointBuff(pNextPoint, pFillSup);

  if (*pWinCode != TSDB_CODE_SUCCESS) {
    // the lookup did not succeed: flag both row slots of the next point as holding no data
    if (pNextPoint->pLeftRow) {
      SET_WIN_KEY_INVALID(pNextPoint->pLeftRow->key);
    }
    if (pNextPoint->pRightRow) {
      SET_WIN_KEY_INVALID(pNextPoint->pRightRow->key);
    }
  }

  // cur point: predecessor of the next point
  SET_WIN_KEY_INVALID(pCurPoint->key.ts);
  pCurPoint->key.groupId = groupId;
  valLen = 0;
  code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pNextPoint->key, &pCurPoint->key,
                                                    (void**)&pCurPoint->pResPos, &valLen, &prevCode);
  QUERY_CHECK_CODE(code, lino, _end);

  qDebug("===stream=== set stream interp cur point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pCurPoint->key.ts,
         pCurPoint->key.groupId, prevCode);

  if (prevCode == TSDB_CODE_SUCCESS) {
    setPointBuff(pCurPoint, pFillSup);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
963

964
/*
 * Fetch (or create) the slice point for (groupId, ts) and prepare the point that follows it.
 *
 * *pWinCode receives the lookup result for the current point; when it is not TSDB_CODE_SUCCESS, both row keys of
 * the current point are marked invalid.
 *
 * The next point depends on the fill type:
 * - linear / prev: its key is ts plus one sliding step, and it is fetched or created; when that lookup does not
 *   succeed, both of its row keys are marked invalid.
 * - all other types: the successor stored in the state is looked up; its row pointers are bound only when found,
 *   otherwise its key.ts stays invalid.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the state store.
 */
static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
                                     int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pNextPoint,
                                     int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t nextCode = TSDB_CODE_SUCCESS;
  int32_t curLen = 0;
  int32_t nextLen = 0;
  void*   pState = pAggSup->pState;

  pCurPoint->pResPos = NULL;
  pNextPoint->pResPos = NULL;
  pCurPoint->key.ts = ts;
  pCurPoint->key.groupId = groupId;

  code = pAggSup->stateStore.streamStateFillAddIfNotExist(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos,
                                                          &curLen, pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  qDebug("===stream=== set stream interp buf.ts:%" PRId64 ", groupId:%" PRId64, pCurPoint->key.ts,
         pCurPoint->key.groupId);

  setPointBuff(pCurPoint, pFillSup);

  if (*pWinCode != TSDB_CODE_SUCCESS) {
    // the lookup did not succeed: flag both row slots of the current point as holding no data
    if (pCurPoint->pLeftRow) {
      SET_WIN_KEY_INVALID(pCurPoint->pLeftRow->key);
    }
    if (pCurPoint->pRightRow) {
      SET_WIN_KEY_INVALID(pCurPoint->pRightRow->key);
    }
  }

  pNextPoint->key.groupId = groupId;
  if (pFillSup->type == TSDB_FILL_LINEAR || pFillSup->type == TSDB_FILL_PREV) {
    // these fill types always work with the point exactly one sliding step later
    pNextPoint->key.ts = taosTimeAdd(pCurPoint->key.ts, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                     pFillSup->interval.precision, NULL);
    code = pAggSup->stateStore.streamStateFillAddIfNotExist(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos,
                                                            &nextLen, &nextCode);
    QUERY_CHECK_CODE(code, lino, _end);
    setPointBuff(pNextPoint, pFillSup);
    if (nextCode != TSDB_CODE_SUCCESS) {
      SET_WIN_KEY_INVALID(pNextPoint->pLeftRow->key);
      SET_WIN_KEY_INVALID(pNextPoint->pRightRow->key);
    }
  } else {
    SET_WIN_KEY_INVALID(pNextPoint->key.ts);
    code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key,
                                                      (void**)&pNextPoint->pResPos, &nextLen, &nextCode);
    QUERY_CHECK_CODE(code, lino, _end);
    if (nextCode == TSDB_CODE_SUCCESS) {
      setPointBuff(pNextPoint, pFillSup);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1024

1025
// partition key
1026
static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
×
1027
  for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
×
1028
    SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
×
1029
    if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) &&
×
1030
        !isIsfilledPseudoColumn(pFillCol->pExpr)) {
×
1031
      int32_t          srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
×
1032
      SResultCellData* pSrcCell = getSliceResultCell(pFillSup->cur.pRowVal, srcSlot, pFillSup->pOffsetInfo);
×
1033
      SResultCellData* pDestCell = getSliceResultCell(pFillInfo->pNonFillRow->pRowVal, srcSlot, pFillSup->pOffsetInfo);
×
1034
      pDestCell->isNull = pSrcCell->isNull;
×
1035
      if (!pDestCell->isNull) {
×
1036
        memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes);
×
1037
      }
1038
    }
1039
  }
1040
}
×
1041

1042
/*
 * Record the end row of a linear segment as interpolation end points.
 *
 * For every column that is an interp function (or for every column when normalFill is true), the entry of
 * pEndPoins at the column's source slot id receives the key of pEndRow and a copy of the cell's value bytes.
 */
static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol,
                                 int32_t* pOffsetInfo, bool normalFill) {
  for (int32_t col = 0; col < numOfCol; col++) {
    SFillColInfo* pColInfo = &pFillCol[col];
    if (!isInterpFunc(pColInfo->pExpr) && !normalFill) {
      continue;
    }

    int32_t          slot = pColInfo->pExpr->base.pParam[0].pCol->slotId;
    SResultCellData* pCell = getSliceResultCell(pEndRow->pRowVal, slot, pOffsetInfo);
    SPoint*          pEnd = taosArrayGet(pEndPoins, slot);

    pEnd->key = pEndRow->key;
    memcpy(pEnd->val, pCell->pData, pCell->bytes);
  }
}
×
1054

1055
/*
 * Decide how the single slice at ts is produced when the stream runs with the force-window-close trigger.
 *
 * needFill starts as true and pos as FILL_POS_INVALID. When ts equals the key of the current row, the row is
 * output as is (pos = FILL_POS_START, needFill = false). Otherwise:
 * - null / value fill: the fill range [ts, ts + 1) is configured;
 * - prev fill: the fill range is configured and the result row is the current row when ts lies after it, or the
 *   previous window when one exists; with neither, nothing is filled.
 * For null / value fill the non-fill columns are copied whenever a current row is available.
 * Any other fill type is reported as an error.
 */
static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
  qDebug("===stream=== set force window close rule.ts:%" PRId64 ",cur key:%" PRId64 ", has prev%d, has next:%d", ts,
         pFillSup->cur.key, hasPrevWindow(pFillSup), hasNextWindow(pFillSup));

  pFillInfo->pos = FILL_POS_INVALID;
  pFillInfo->needFill = true;

  switch (pFillInfo->type) {
    case TSDB_FILL_NULL:
    case TSDB_FILL_NULL_F:
    case TSDB_FILL_SET_VALUE:
    case TSDB_FILL_SET_VALUE_F:
      if (ts != pFillSup->cur.key) {
        setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
      } else {
        pFillInfo->pos = FILL_POS_START;
        pFillInfo->needFill = false;
      }
      if (pFillSup->cur.pRowVal != NULL) {
        copyNonFillValueInfo(pFillSup, pFillInfo);
      }
      break;

    case TSDB_FILL_PREV:
      if (ts == pFillSup->cur.key) {
        pFillInfo->pos = FILL_POS_START;
        pFillInfo->needFill = false;
        break;
      }
      if (ts > pFillSup->cur.key) {
        // the current row precedes ts and therefore is the value to carry forward
        setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
        pFillInfo->pResRow = &pFillSup->cur;
        break;
      }
      if (hasPrevWindow(pFillSup)) {
        setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
        pFillInfo->pResRow = &pFillSup->prev;
        break;
      }
      // nothing before ts to carry forward
      pFillInfo->needFill = false;
      break;

    default:
      qError("%s failed at line %d since invalid fill type", __func__, __LINE__);
      break;
  }
}
×
1097

1098
/*
 * Configure pFillInfo (fill range, position of the real row inside that range, result row, linear end points) for
 * the slice at ts, based on which of the prev/next windows of pFillSup exist.
 *
 * - For constant-value fill types the non-fill columns are copied from the current row first.
 * - With neither a prev nor a next window nothing is filled and the row is output at FILL_POS_START.
 * - Otherwise needFill is set and the fill type selects the range passed to setFillKeyInfo():
 *   prev window .. end of current slice (pos MID or END), or start of current slice .. next window (pos START).
 *   For linear fill without normalFill and with both neighbours the range is prev window .. next window.
 * - On exit, pos is forced to FILL_POS_INVALID whenever ts differs from the key of the current row.
 *
 * Consistency failures are logged; the function itself returns nothing.
 */
void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (IS_FILL_CONST_VALUE(pFillInfo->type)) {
    copyNonFillValueInfo(pFillSup, pFillInfo);
  }

  // Both predicates are sampled once; the windows are only reset further down, after the last decision.
  const bool hasNext = hasNextWindow(pFillSup);
  const bool hasPrev = hasPrevWindow(pFillSup);
  if (!(hasNext || hasPrev)) {
    pFillInfo->pos = FILL_POS_START;
    pFillInfo->needFill = false;
    goto _end;
  }

  TSKEY prevWKey = hasPrev ? pFillSup->prev.key : INT64_MIN;
  TSKEY nextWKey = hasNext ? pFillSup->next.key : INT64_MIN;
  TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
  TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);

  pFillInfo->pos = FILL_POS_INVALID;
  pFillInfo->needFill = true;

  switch (pFillInfo->type) {
    case TSDB_FILL_NULL:
    case TSDB_FILL_NULL_F:
    case TSDB_FILL_SET_VALUE:
    case TSDB_FILL_SET_VALUE_F:
      if (hasPrev && hasNext && pFillInfo->preRowKey == pFillInfo->prePointKey &&
          pFillInfo->nextRowKey != pFillInfo->nextPointKey) {
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_MID;
        pFillInfo->hasNext = true;
      } else if (hasPrev) {
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_END;
      } else {
        setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_START;
      }
      break;

    case TSDB_FILL_PREV:
      if (hasPrev && hasNext && pFillInfo->preRowKey != pFillInfo->prePointKey &&
          pFillInfo->nextRowKey == pFillInfo->nextPointKey) {
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_MID;
        pFillInfo->hasNext = true;
      } else if (hasNext) {
        setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_START;
        // the current row becomes the value carried forward into the following slices
        resetFillWindow(&pFillSup->prev);
        pFillSup->prev.key = ts;
        pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
      } else {
        QUERY_CHECK_CONDITION(hasPrev, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_END;
      }
      pFillInfo->pResRow = &pFillSup->prev;
      break;

    case TSDB_FILL_NEXT:
      if (hasPrev && hasNext && pFillInfo->preRowKey == pFillInfo->prePointKey &&
          pFillInfo->nextRowKey != pFillInfo->nextPointKey) {
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_MID;
        pFillInfo->hasNext = true;
        pFillInfo->pResRow = &pFillSup->cur;
      } else if (hasPrev) {
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_END;
        // the current row becomes the value used for the preceding slices
        resetFillWindow(&pFillSup->next);
        pFillSup->next.key = ts;
        pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
        pFillInfo->pResRow = &pFillSup->next;
      } else {
        setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_START;
        resetFillWindow(&pFillSup->prev);
        pFillInfo->pResRow = &pFillSup->next;
      }
      break;

    case TSDB_FILL_LINEAR:
      if (!hasPrev) {
        // only a next window: interpolate from the current row toward it
        QUERY_CHECK_CONDITION(hasNext, code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_START;
        SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
        pFillSup->next.key = pFillSup->nextOriginKey;
        copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
                             pFillSup->numOfAllCols, pFillSup->pOffsetInfo, pFillSup->normalFill);
        pFillInfo->pResRow = &pFillSup->cur;
        pFillInfo->pLinearInfo->hasNext = false;
      } else if (!hasNext) {
        // only a prev window: interpolate from it toward the current row
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_END;
        SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
        copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
                             pFillSup->numOfAllCols, pFillSup->pOffsetInfo, pFillSup->normalFill);
        pFillSup->prev.key = pFillSup->prevOriginKey;
        pFillInfo->pResRow = &pFillSup->prev;
        pFillInfo->pLinearInfo->hasNext = false;
      } else if (pFillSup->normalFill) {
        // two segments: prev -> cur, then cur -> next
        setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_MID;
        copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
                             pFillSup->numOfAllCols, pFillSup->pOffsetInfo, pFillSup->normalFill);
        pFillSup->next.key = pFillSup->nextOriginKey;
        pFillInfo->pResRow = &pFillSup->prev;
        copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
                             pFillSup->numOfAllCols, pFillSup->pOffsetInfo, pFillSup->normalFill);
        pFillInfo->pLinearInfo->nextEnd = nextWKey;
        pFillInfo->pLinearInfo->hasNext = true;
      } else {
        // a single segment spanning prev -> next
        setFillKeyInfo(prevWKey, nextWKey, &pFillSup->interval, pFillInfo);
        pFillInfo->pos = FILL_POS_INVALID;
        SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
        pFillSup->next.key = pFillSup->nextOriginKey;
        copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
                             pFillSup->numOfAllCols, pFillSup->pOffsetInfo, pFillSup->normalFill);
        pFillSup->prev.key = pFillSup->prevOriginKey;
        pFillInfo->pResRow = &pFillSup->prev;
        pFillInfo->pLinearInfo->hasNext = false;
      }
      break;

    default:
      qError("%s failed at line %d since invalid fill type", __func__, __LINE__);
      break;
  }

_end:
  // a real row can only be placed inside the range when the slice timestamp is the row's own key
  if (ts != pFillSup->cur.key) {
    pFillInfo->pos = FILL_POS_INVALID;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
1240

1241
/*
 * Compare the primary-key value located rowSize bytes after pLeft with pRight, using the comparator configured in
 * pFillSup. The result is whatever comparePkColFn returns.
 */
static int32_t comparePkVal(void* pLeft, void* pRight, SStreamFillSupporter* pFillSup) {
  return pFillSup->comparePkColFn(POINTER_SHIFT(pLeft, pFillSup->rowSize), pRight);
}
1245

1246
static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStreamFillSupporter* pFillSup, bool isLeft,
×
1247
                            int32_t fillType) {
1248
  if (IS_INVALID_WIN_KEY(pPoint->key.ts)) {
×
1249
    return false;
×
1250
  }
1251

1252
  switch (fillType) {
×
1253
    case TSDB_FILL_NULL:
×
1254
    case TSDB_FILL_NULL_F:
1255
    case TSDB_FILL_SET_VALUE:
1256
    case TSDB_FILL_SET_VALUE_F: {
1257
      if (!isLeft) {
×
1258
        if (HAS_NON_ROW_DATA(pPoint->pRightRow)) {
×
1259
          return true;
×
1260
        } else {
1261
          if (pPoint->key.ts == ts) {
×
1262
            if (pFillSup->comparePkColFn == NULL || comparePkVal(pPoint->pRightRow, pPkVal, pFillSup) >= 0) {
×
1263
              return true;
×
1264
            }
1265
          }
1266
        }
1267
      }
1268
    } break;
×
1269
    case TSDB_FILL_PREV: {
×
1270
      if (isLeft) {
×
1271
        if (HAS_NON_ROW_DATA(pPoint->pLeftRow)) {
×
1272
          return true;
×
1273
        } else {
1274
          if (pPoint->pLeftRow->key < ts) {
×
1275
            return true;
×
1276
          } else if (pPoint->pLeftRow->key == ts) {
×
1277
            if (pFillSup->comparePkColFn == NULL || comparePkVal(pPoint->pLeftRow, pPkVal, pFillSup) >= 0) {
×
1278
              return true;
×
1279
            }
1280
          }
1281
        }
1282
      }
1283

1284
      if (!isLeft && pPoint->key.ts == ts) {
×
1285
        if (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pFillSup->comparePkColFn == NULL ||
×
1286
            comparePkVal(pPoint->pLeftRow, pPkVal, pFillSup) >= 0) {
×
1287
          return true;
×
1288
        }
1289
      }
1290
    } break;
×
1291
    case TSDB_FILL_NEXT: {
×
1292
      if (!isLeft) {
×
1293
        if (HAS_NON_ROW_DATA(pPoint->pRightRow)) {
×
1294
          return true;
×
1295
        } else {
1296
          if (pPoint->pRightRow->key > ts) {
×
1297
            return true;
×
1298
          } else if (pPoint->pRightRow->key == ts) {
×
1299
            if (pFillSup->comparePkColFn == NULL || comparePkVal(pPoint->pRightRow, pPkVal, pFillSup) >= 0) {
×
1300
              return true;
×
1301
            }
1302
          }
1303
        }
1304
      }
1305
    } break;
×
1306
    case TSDB_FILL_LINEAR: {
×
1307
      if (isLeft) {
×
1308
        if (HAS_NON_ROW_DATA(pPoint->pLeftRow)) {
×
1309
          return true;
×
1310
        } else {
1311
          if (pPoint->pLeftRow->key < ts) {
×
1312
            return true;
×
1313
          } else if (pPoint->pLeftRow->key == ts) {
×
1314
            if (pFillSup->comparePkColFn == NULL || comparePkVal(pPoint->pLeftRow, pPkVal, pFillSup) >= 0) {
×
1315
              return true;
×
1316
            }
1317
          }
1318
        }
1319
      } else {
1320
        if (HAS_NON_ROW_DATA(pPoint->pRightRow)) {
×
1321
          return true;
×
1322
        } else {
1323
          if (pPoint->pRightRow->key > ts) {
×
1324
            return true;
×
1325
          } else if (pPoint->pRightRow->key == ts) {
×
1326
            if (pFillSup->comparePkColFn == NULL || comparePkVal(pPoint->pRightRow, pPkVal, pFillSup) >= 0) {
×
1327
              return true;
×
1328
            }
1329
          }
1330
        }
1331
      }
1332
    } break;
×
1333
    default:
×
1334
      qError("%s failed at line %d since invalid fill type", __func__, __LINE__);
×
1335
  }
1336
  return false;
×
1337
}
1338

1339
/*
 * Copy row rowId of pBlock into the slice row pRowVal.
 *
 * Every column of the block is written to the result cell with the same index: null columns only set the null
 * flag; other columns record type and byte width and copy the value (variable-length values are copied with their
 * full encoded length). The row key is set to ts. When pPkData is given, the primary-key value is stored rowSize
 * bytes after the start of pRowVal, sized according to pPkCol.
 */
void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
                                int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol, int32_t* pCellOffsetInfo) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, col);
    SResultCellData* pDst = getSliceResultCell((SResultCellData*)pRowVal->pRowVal, col, pCellOffsetInfo);

    if (colDataIsNull_s(pSrcCol, rowId)) {
      pDst->isNull = true;
      continue;
    }

    pDst->isNull = false;
    pDst->type = pSrcCol->info.type;
    pDst->bytes = pSrcCol->info.bytes;

    char*  pVal = colDataGetData(pSrcCol, rowId);
    size_t valLen = IS_VAR_DATA_TYPE(pDst->type) ? varDataTLen(pVal) : pDst->bytes;
    memcpy(pDst->pData, pVal, valLen);
  }

  pRowVal->key = ts;

  if (pPkData == NULL) {
    return;
  }

  // the primary-key value lives directly behind the row payload
  void*  pPkDst = POINTER_SHIFT(pRowVal, rowSize);
  size_t pkLen = IS_VAR_DATA_TYPE(pPkCol->info.type) ? varDataTLen(pPkData) : pPkCol->info.bytes;
  memcpy(pPkDst, pPkData, pkLen);
}
×
1369

1370
/*
 * Register the window key pKey as changed, according to the stream trigger mode.
 *
 * - at-once trigger: the key is added to pUpdatedMap and, when needDel is set, also to pDeletedMap.
 * - force-window-close trigger: the group id of the key is put into the state store.
 * - other triggers: nothing is recorded.
 * On success the maximum seen timestamp in pTwAggSup is raised to pKey->ts if needed.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing store operation (maxTs is left untouched in that case).
 */
static int32_t saveTimeSliceWinResultInfo(SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, SWinKey* pKey,
                                          SSHashObj* pUpdatedMap, bool needDel, SSHashObj* pDeletedMap) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  switch (pTwAggSup->calTrigger) {
    case STREAM_TRIGGER_AT_ONCE:
      code = saveTimeSliceWinResult(pKey, pUpdatedMap);
      QUERY_CHECK_CODE(code, lino, _end);
      if (needDel) {
        code = saveTimeSliceWinResult(pKey, pDeletedMap);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      break;

    case STREAM_TRIGGER_FORCE_WINDOW_CLOSE:
      code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, pKey->groupId, NULL, 0);
      QUERY_CHECK_CODE(code, lino, _end);
      break;

    default:
      break;
  }

  pTwAggSup->maxTs = TMAX(pTwAggSup->maxTs, pKey->ts);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1394

1395
// Feed one input data block into the time-slice state.
//
// Leading rows that are expired (when ignoreExpiredData is set and the trigger is not
// FORCE_WINDOW_CLOSE) or rejected by checkNullRow() are skipped. For every time window touched by
// the remaining rows, the first qualifying row is offered as the "right" value of the window's
// current point and the last qualifying row as the "left" value of the following point; whenever
// needAdjustValue() accepts a row it is copied into the state buffer and the point key is recorded
// through saveTimeSliceWinResultInfo().
//
// The function returns void: failures are only logged at _end. Output buffers obtained from the
// state store are tracked with curHeld/nextHeld so that an early exit through _end still releases
// them.
static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  int32_t                       winCode = TSDB_CODE_SUCCESS;
  SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)pOperator->info;
  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*          pAggSup = &pInfo->streamAggSup;
  SExprSupp*                    pExprSup = &pOperator->exprSupp;
  SColumnInfoData*              pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*                        tsCols = (int64_t*)pColDataInfo->pData;
  void*                         pPkVal = NULL;
  int32_t                       pkLen = 0;
  int64_t                       groupId = pBlock->info.id.groupId;
  SColumnInfoData*              pPkColDataInfo = NULL;
  SStreamFillSupporter*         pFillSup = pInfo->pFillSup;
  SSlicePoint                   curPoint = {0};
  SSlicePoint                   nextPoint = {0};
  bool                          curHeld = false;   // curPoint.pResPos fetched and not yet released
  bool                          nextHeld = false;  // nextPoint.pResPos fetched and not yet released
  if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
    pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->basic.primaryPkIndex);
  }

  pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
  if (pFillSup->winRange.ekey <= 0) {
    // a non-positive end key is replaced by an open upper bound
    pFillSup->winRange.ekey = INT64_MAX;
  }

  // locate the first row that is neither expired nor filtered out by the null check
  int32_t startPos = 0;
  for (; startPos < pBlock->info.rows; startPos++) {
    if (hasSrcPrimaryKeyCol(&pInfo->basic) && pInfo->ignoreExpiredData) {
      pPkVal = colDataGetData(pPkColDataInfo, startPos);
      pkLen = colDataGetRowLength(pPkColDataInfo, startPos);
    }

    if (pInfo->twAggSup.calTrigger != STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pInfo->ignoreExpiredData &&
        checkExpiredData(&pAggSup->stateStore, pAggSup->pUpdateInfo, &pInfo->twAggSup, pBlock->info.id.uid,
                         tsCols[startPos], pPkVal, pkLen)) {
      qDebug("===stream===ignore expired data, window end ts:%" PRId64 ", maxts - wartermak:%" PRId64, tsCols[startPos],
             pInfo->twAggSup.maxTs - pInfo->twAggSup.waterMark);
      continue;
    }

    if (checkNullRow(pExprSup, pBlock, startPos, pInfo->ignoreNull)) {
      continue;
    }
    break;
  }

  if (startPos >= pBlock->info.rows) {
    return;
  }

  SResultRowInfo dumyInfo = {0};
  dumyInfo.cur.pageId = -1;
  STimeWindow curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
  bool        left = false;
  bool        right = false;
  if (pFillSup->type != TSDB_FILL_PREV || curWin.skey == tsCols[startPos]) {
    code = getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode);
  } else {
    code = getPointInfoFromStateRight(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode);
  }
  QUERY_CHECK_CODE(code, lino, _end);
  curHeld = true;
  nextHeld = true;

  if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
    pPkVal = colDataGetData(pPkColDataInfo, startPos);
  }
  right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
  if (right) {
    transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal,
                               pPkColDataInfo, pFillSup->pOffsetInfo);
    bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
    code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
                                      pInfo->pDeletedMap);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
  curHeld = false;

  while (startPos < pBlock->info.rows) {
    // jump past the rows of the current window, then use its last qualifying row as the left value
    int32_t numOfWin = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
                                                TSDB_ORDER_ASC);
    startPos += numOfWin;
    int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull);
    QUERY_CHECK_CONDITION((leftRowId >= 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
      pPkVal = colDataGetData(pPkColDataInfo, leftRowId);
    }
    left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type);
    if (left) {
      transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal,
                                 pPkColDataInfo, pFillSup->pOffsetInfo);
      bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
      code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, needDel,
                                        pInfo->pDeletedMap);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
    nextHeld = false;

    // move on to the next qualifying row; a negative result ends the loop
    startPos = getQualifiedRowNumAsc(pExprSup, pBlock, startPos, pInfo->ignoreNull);
    if (startPos < 0) {
      break;
    }
    curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
    if (pFillSup->type != TSDB_FILL_PREV || curWin.skey == tsCols[startPos]) {
      code = getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode);
    } else {
      code = getPointInfoFromStateRight(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode);
    }
    QUERY_CHECK_CODE(code, lino, _end);
    curHeld = true;
    nextHeld = true;

    if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
      pPkVal = colDataGetData(pPkColDataInfo, startPos);
    }
    right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
    if (right) {
      transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal,
                                 pPkColDataInfo, pFillSup->pOffsetInfo);
      bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
      code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
                                        pInfo->pDeletedMap);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
    curHeld = false;
  }

_end:
  // Release whatever is still held; on the normal break-out path both flags are already false.
  if (curHeld) {
    releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
  }
  if (nextHeld) {
    releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
1526

1527
// Look at the entry following curIndex in pKeyArray (an array of SWinKey). If it exists and belongs
// to curGroupId, its timestamp is stored in *pNextKey; otherwise *pNextKey is set to INT64_MIN.
void getNextResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKEY* pNextKey) {
  TSKEY   found = INT64_MIN;
  int32_t following = curIndex + 1;

  if (following < taosArrayGetSize(pKeyArray)) {
    SWinKey* pFollowing = (SWinKey*)taosArrayGet(pKeyArray, following);
    if (pFollowing->groupId == curGroupId) {
      found = pFollowing->ts;
    }
  }

  *pNextKey = found;
}
1538

1539
// Look at the entry preceding curIndex in pKeyArray (an array of SWinKey). If it exists and belongs
// to curGroupId, its timestamp is stored in *pNextKey; otherwise *pNextKey is set to INT64_MIN.
// Note: despite its name, the output parameter receives the previous key here.
void getPrevResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKEY* pNextKey) {
  TSKEY   found = INT64_MIN;
  int32_t preceding = curIndex - 1;

  if (preceding >= 0) {
    SWinKey* pPreceding = (SWinKey*)taosArrayGet(pKeyArray, preceding);
    if (pPreceding->groupId == curGroupId) {
      found = pPreceding->ts;
    }
  }

  *pNextKey = found;
}
1550

1551
void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwSup,
×
1552
                                 SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock,
1553
                                 SGroupResInfo* pGroupResInfo, SExecTaskInfo* pTaskInfo) {
1554
  int32_t code = TSDB_CODE_SUCCESS;
×
1555
  int32_t lino = 0;
×
1556
  blockDataCleanup(pBlock);
×
1557
  if (!hasRemainResults(pGroupResInfo)) {
×
1558
    return;
×
1559
  }
1560

1561
  bool isFwc = (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE);
×
1562
  // clear the existed group id
1563
  pBlock->info.id.groupId = 0;
×
1564
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
×
1565
  for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
×
1566
    SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
×
1567
    qDebug("===stream=== build interp res. key:%" PRId64 ",groupId:%" PRIu64, pKey->ts, pKey->groupId);
×
1568
    if (pBlock->info.id.groupId == 0) {
×
1569
      pBlock->info.id.groupId = pKey->groupId;
×
1570
    } else if (pBlock->info.id.groupId != pKey->groupId) {
×
1571
      if (pBlock->info.rows > 0) {
×
1572
        break;
×
1573
      } else {
1574
        pBlock->info.id.groupId = pKey->groupId;
×
1575
      }
1576
    }
1577
    void*   tbname = NULL;
×
1578
    int32_t winCode = TSDB_CODE_SUCCESS;
×
1579
    code = pAggSup->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname,
×
1580
                                                     false, &winCode);
1581
    QUERY_CHECK_CODE(code, lino, _end);
×
1582
    if (winCode != TSDB_CODE_SUCCESS) {
×
1583
      pBlock->info.parTbName[0] = 0;
×
1584
    } else {
1585
      memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
×
1586
    }
1587
    qDebug("%s partName:%s, groupId:%"PRIu64, __FUNCTION__, (char*)tbname, pKey->groupId);
×
1588

1589
    pAggSup->stateStore.streamStateFreeVal(tbname);
×
1590

1591
    SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
×
1592
    SSlicePoint prevPoint = {0};
×
1593
    SSlicePoint nextPoint = {0};
×
1594
    if (pFillSup->type != TSDB_FILL_LINEAR) {
×
1595
      code =
1596
          getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, isFwc);
×
1597
    } else {
1598
      code =
1599
          getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
×
1600
    }
1601
    QUERY_CHECK_CODE(code, lino, _end);
×
1602

1603
    if (pFillSup->type != TSDB_FILL_LINEAR) {
×
1604
      getPrevResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->preRowKey);
×
1605
      if (hasPrevWindow(pFillSup)) {
×
1606
        pFillInfo->prePointKey = prevPoint.key.ts;
×
1607
      }
1608

1609
      getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->nextRowKey);
×
1610
      if (hasNextWindow(pFillSup)) {
×
1611
        pFillInfo->nextPointKey = nextPoint.key.ts;
×
1612
      }
1613
    }
1614

1615
    if (isFwc) {
×
1616
      setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts);
×
1617
    } else {
1618
      setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);
×
1619
    }
1620
    doStreamTimeSliceFillRange(pFillSup, pFillInfo, pBlock);
×
1621
    releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
×
1622
    releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore);
×
1623
    releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
×
1624
    if (pBlock->info.rows >= pBlock->info.capacity) {
×
1625
      pGroupResInfo->index++;
×
1626
      break;
×
1627
    }
1628
  }
1629

1630
_end:
×
1631
  if (code != TSDB_CODE_SUCCESS) {
×
1632
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1633
  }
1634
}
1635

1636
// Build the delete-result block for the windows queued in pWins.
//
// pBlock is cleaned first. When *index has already reached the size of pWins the queue is cleared,
// *index is reset to 0 and the block stays empty. Otherwise, for every queued key from *index on,
// the time range affected by that key is derived from the fill type and the neighbouring points
// recorded in pFillSup, and one row (start, end, uid 0, group id, partition table name if known)
// is appended to pBlock. Keys whose range has an INT64_MIN bound are skipped.
//
// *index always advances together with the loop position, for skipped keys too, so a later call
// never revisits an entry and the queue is cleared once everything was looked at.
//
// The function returns void: failures are only logged.
static void doBuildTimeSliceDeleteResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SArray* pWins,
                                         int32_t* index, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  blockDataCleanup(pBlock);
  int32_t size = taosArrayGetSize(pWins);
  if (*index == size) {
    *index = 0;
    taosArrayClear(pWins);
    goto _end;
  }
  code = blockDataEnsureCapacity(pBlock, size - *index);
  QUERY_CHECK_CODE(code, lino, _end);

  uint64_t uid = 0;
  for (int32_t i = *index; i < size; i++) {
    SWinKey*    pKey = taosArrayGet(pWins, i);
    SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
    SSlicePoint prevPoint = {0};
    SSlicePoint nextPoint = {0};
    STimeWindow tw = {0};
    if (pFillSup->type != TSDB_FILL_LINEAR) {
      code =
          getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, false);
    } else {
      code =
          getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
    }
    QUERY_CHECK_CODE(code, lino, _end);

    // range of output rows that depend on this key, by fill type
    if (pFillSup->type == TSDB_FILL_PREV && hasNextWindow(pFillSup)) {
      tw.skey = pFillSup->cur.key;
      tw.ekey = pFillSup->next.key;
    } else if (pFillSup->type == TSDB_FILL_NEXT && hasPrevWindow(pFillSup)) {
      tw.skey = pFillSup->prev.key;
      tw.ekey = pFillSup->cur.key;
    } else if (pFillSup->type == TSDB_FILL_LINEAR) {
      if (hasPrevWindow(pFillSup)) {
        tw.skey = pFillSup->prev.key;
      } else {
        tw.skey = pFillSup->cur.key;
      }
      if (hasNextWindow(pFillSup)) {
        tw.ekey = pFillSup->next.key;
      } else {
        tw.ekey = pFillSup->cur.key;
      }
    } else {
      tw.skey = pFillSup->cur.key;
      tw.ekey = pFillSup->cur.key;
    }

    // Only the key values copied above are needed from here on, so hand the buffers back before
    // the skip check: the skip path must not keep them held.
    releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
    releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore);
    releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);

    if (tw.skey == INT64_MIN || tw.ekey == INT64_MIN) {
      // nothing to delete for this key, but it still counts as consumed
      (*index)++;
      continue;
    }

    void*   tbname = NULL;
    int32_t winCode = TSDB_CODE_SUCCESS;

    code = pAggSup->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pKey->groupId, &tbname, false,
                                                     &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    if (winCode != TSDB_CODE_SUCCESS) {
      code = appendDataToSpecialBlock(pBlock, &tw.skey, &tw.ekey, &uid, &pKey->groupId, NULL);
    } else if (tbname == NULL) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    } else {
      char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
      STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
      code = appendDataToSpecialBlock(pBlock, &tw.skey, &tw.ekey, &uid, &pKey->groupId, parTbName);
    }
    // free the looked-up name before checking the result so that failure paths do not leak it
    pAggSup->stateStore.streamStateFreeVal(tbname);
    QUERY_CHECK_CODE(code, lino, _end);
    (*index)++;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
×
1722

1723
// Produce the next output block of the operator.
// Delete results take precedence: if the delete block ends up with rows it is returned. Otherwise
// the interpolation result block is built and returned when it has rows. *ppRes is set to NULL
// when neither block has any row. The chosen block is printed before being handed out.
// Both builders return void, so this function always returns TSDB_CODE_SUCCESS.
static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SStreamTimeSliceOperatorInfo* pSlice = pOperator->info;
  SExecTaskInfo*                pTask = pOperator->pTaskInfo;
  SStreamAggSupporter*          pSup = &pSlice->streamAggSup;
  SSDataBlock*                  pOut = NULL;

  doBuildTimeSliceDeleteResult(pSup, pSlice->pFillSup, pSlice->pDelWins, &pSlice->delIndex, pSlice->pDelRes, pTask);
  if (pSlice->pDelRes->info.rows != 0) {
    pOut = pSlice->pDelRes;
  } else {
    doBuildTimeSlicePointResult(pSup, &pSlice->twAggSup, pSlice->pFillSup, pSlice->pFillInfo, pSlice->pRes,
                                &pSlice->groupResInfo, pTask);
    if (pSlice->pRes->info.rows != 0) {
      pOut = pSlice->pRes;
    }
  }

  if (pOut != NULL) {
    printDataBlock(pOut, getStreamOpName(pOperator->operatorType), GET_TASKID(pTask));
  }
  (*ppRes) = pOut;
  return TSDB_CODE_SUCCESS;
}
1755

1756
// Walk pAllWins (an array of SWinKey) from its tail to its head and append to pMaxWins the last
// key of the array plus, going backwards, every key whose group id differs from the one appended
// before it. If the last key has group id 0 only that key is appended.
// NOTE(review): this yields one key per group only if equal group ids are adjacent - presumably the
// caller passes a sorted array; confirm.
// Returns TSDB_CODE_SUCCESS, or terrno when appending to pMaxWins fails.
int32_t getSliceMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int32_t  total = taosArrayGetSize(pAllWins);
  uint64_t lastGroup = 0;

  for (int32_t pos = total - 1; pos >= 0; --pos) {
    SWinKey* pCandidate = taosArrayGet(pAllWins, pos);
    bool     isTail = (pos == total - 1);
    if (!isTail && pCandidate->groupId == lastGroup) {
      continue;
    }

    void* pPushed = taosArrayPush(pMaxWins, pCandidate);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    lastGroup = pCandidate->groupId;

    if (isTail && lastGroup == 0) {
      // tail key carries group id 0: nothing else is collected
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1786

1787
// Apply a delete block to the time-slice state.
// Each row of pBlock supplies a group id, a start timestamp and an end timestamp. Starting from the
// key (start, group), the state is walked key by key through streamStateFillGetNext(); every visited
// key whose timestamp does not exceed the row's end timestamp is removed from pUpdatedMap and
// deleted from the state. The walk for a row stops at the first key beyond the end timestamp or
// when the state reports no further key.
// Returns TSDB_CODE_SUCCESS or the error reported by the state lookup.
static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t winCode = TSDB_CODE_SUCCESS;

  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        pGroups = (uint64_t*)pGroupCol->pData;
  TSKEY*           pRangeStarts = (TSKEY*)pStartCol->pData;
  TSKEY*           pRangeEnds = (TSKEY*)pEndCol->pData;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    uint64_t rowGroup = pGroups[row];
    TSKEY    rangeEnd = pRangeEnds[row];
    SWinKey  cursor = {.ts = pRangeStarts[row], .groupId = rowGroup};

    for (;;) {
      // fetch the successor before the current key is deleted
      SWinKey following = {.groupId = rowGroup};
      code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &cursor, &following, NULL, NULL, &winCode);
      QUERY_CHECK_CODE(code, lino, _end);
      if (cursor.ts > rangeEnd) {
        break;
      }

      int32_t removeRes = tSimpleHashRemove(pUpdatedMap, &cursor, sizeof(SWinKey));
      qTrace("%s delete stream interp result at line %d res: %s", __func__, __LINE__, tstrerror(removeRes));

      pAggSup->stateStore.streamStateDel(pAggSup->pState, &cursor);
      if (winCode != TSDB_CODE_SUCCESS) {
        break;
      }
      cursor = following;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1827

1828
// For every group id delivered by the group-state cursor, put the key (ts, groupId) into
// pUpdatedMap. Iteration ends when the cursor stops returning entries or when saving a key fails.
// The cursor is freed on every path. Returns TSDB_CODE_SUCCESS or the error from saving a key.
int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj* pUpdatedMap) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int64_t          groupId = 0;
  SStreamStateCur* pGroupCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);

  while (pAggSup->stateStore.streamStateGroupGetKVByCur(pGroupCur, &groupId, NULL, NULL) == TSDB_CODE_SUCCESS) {
    SWinKey resKey = {.ts = ts, .groupId = groupId};
    code = saveTimeSliceWinResult(&resKey, pUpdatedMap);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;
      break;
    }
    pAggSup->stateStore.streamStateGroupCurNext(pGroupCur);
  }

  pAggSup->stateStore.streamStateFreeCur(pGroupCur);
  pGroupCur = NULL;

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1855

1856
// Sort pTsArrray with the ascending timestamp comparator, then drop duplicates using the same
// comparator (no per-element free callback is supplied).
static void removeDuplicateTs(SArray* pTsArrray) {
  __compar_fn_t tsAscCmp = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, TSDB_ORDER_ASC);

  taosArraySort(pTsArrray, tsAscCmp);
  taosArrayRemoveDuplicate(pTsArrray, tsAscCmp, NULL);
}
×
1861

1862
// "Get next block" entry point of the stream time-slice operator.
//
//  - OP_EXEC_DONE: *ppRes is set to NULL.
//  - OP_RES_TO_RETURN: keep draining results - first a pending fill range, then delete/point result
//    blocks, then the saved checkpoint block; when nothing is left the operator is marked completed.
//  - otherwise: pull blocks from the downstream operator until it returns NULL. Delete blocks and
//    create-child-table blocks are forwarded immediately, checkpoint blocks are saved for later,
//    STREAM_GET_RESULT blocks only record their window start in pCloseTs, and normal blocks are fed
//    into doStreamTimeSliceImpl(). Afterwards the set of result keys is collected (from pCloseTs
//    when any close timestamps were recorded, otherwise from pUpdatedMap), sorted, and the first
//    result block is built.
//
// Returns TSDB_CODE_SUCCESS, or the first error encountered (also logged).
static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*          pAggSup = &pInfo->streamAggSup;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    goto _end;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    // a previous call may have stopped in the middle of a fill range
    if (hasRemainCalc(pInfo->pFillInfo) ||
        (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
      blockDataCleanup(pInfo->pRes);
      doStreamTimeSliceFillRange(pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes);
      if (pInfo->pRes->info.rows > 0) {
        printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
        (*ppRes) = pInfo->pRes;
        goto _end;
      }
    }

    SSDataBlock* resBlock = NULL;
    code = buildTimeSliceResult(pOperator, &resBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (resBlock != NULL) {
      (*ppRes) = resBlock;
      goto _end;
    }

    if (pInfo->recvCkBlock) {
      pInfo->recvCkBlock = false;
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pCheckpointRes;
      goto _end;
    }

    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, 1, INT64_MAX);
    }
    setStreamOperatorCompleted(pOperator);
    resetStreamFillSup(pInfo->pFillSup);
    (*ppRes) = NULL;
    goto _end;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      pOperator->status = OP_RES_TO_RETURN;
      qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
             pInfo->numOfDatapack);
      pInfo->numOfDatapack = 0;
      break;
    }
    pInfo->numOfDatapack++;
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);

    switch (pBlock->info.type) {
      case STREAM_DELETE_RESULT:
      case STREAM_DELETE_DATA: {
        code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap);
        QUERY_CHECK_CODE(code, lino, _end);
        code = copyDataBlock(pInfo->pDelRes, pBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
        (*ppRes) = pInfo->pDelRes;
        printDataBlock((*ppRes), getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
        goto _end;
      }
      case STREAM_NORMAL:
      case STREAM_INVALID: {
        SExprSupp* pExprSup = &pInfo->scalarSup;
        if (pExprSup->pExprInfo != NULL) {
          code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      } break;
      case STREAM_CHECKPOINT: {
        pInfo->recvCkBlock = true;
        pAggSup->stateStore.streamStateCommit(pAggSup->pState);
        doStreamTimeSliceSaveCheckpoint(pOperator);
        code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      case STREAM_CREATE_CHILD_TABLE: {
        (*ppRes) = pBlock;
        goto _end;
      }
      case STREAM_GET_RESULT: {
        void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey);
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
        continue;
      }
      default:
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        QUERY_CHECK_CODE(code, lino, _end);
    }

    doStreamTimeSliceImpl(pOperator, pBlock);
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
  }

  if (pInfo->destHasPrimaryKey) {
    code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (taosArrayGetSize(pInfo->pCloseTs) > 0) {
    removeDuplicateTs(pInfo->pCloseTs);
    int32_t size = taosArrayGetSize(pInfo->pCloseTs);
    qDebug("===stream===build stream result, ts count:%d", size);
    for (int32_t i = 0; i < size; i++) {
      TSKEY ts = *(TSKEY*)taosArrayGet(pInfo->pCloseTs, i);
      code = buildAllResultKey(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, ts, pInfo->pUpdated);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    // the array size is not a long on every target; print it through a fixed-width type
    qDebug("===stream===build stream result, res count:%" PRId64, (int64_t)taosArrayGetSize(pInfo->pUpdated));
    taosArrayClear(pInfo->pCloseTs);
    if (size > 1024) {
      // replace an array that held more than 1024 entries with a fresh one
      taosArrayDestroy(pInfo->pCloseTs);
      pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
      QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _end, terrno);
    }
  } else {
    void*   pIte = NULL;
    int32_t iter = 0;
    while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
      SWinKey* pKey = (SWinKey*)tSimpleHashGetKey(pIte, NULL);
      void*    tmp = taosArrayPush(pInfo->pUpdated, pKey);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }
  taosArraySort(pInfo->pUpdated, winKeyCmprImpl);

  if (pInfo->isHistoryOp) {
    code = getSliceMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // hand the sorted key list over to the group result info and start a fresh list
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->groupResInfo.freeItem = false;

  pInfo->pUpdated = taosArrayInit(16, sizeof(SWinKey));
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  tSimpleHashCleanup(pInfo->pUpdatedMap);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);

  code = buildTimeSliceResult(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!(*ppRes)) {
    if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
      pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, 1, INT64_MAX);
    }
    setStreamOperatorCompleted(pOperator);
    resetStreamFillSup(pInfo->pFillSup);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2038

2039
/*
 * Pre-populate the fill info's result row according to the fill type.
 *
 * - TSDB_FILL_SET_VALUE / TSDB_FILL_SET_VALUE_F: for every column whose
 *   expression is an interp function, the cell addressed by the expression's
 *   first-parameter slot is loaded from a fill value, converted to the cell's
 *   own type. Fill values are consumed in order (the n-th interp column reads
 *   pAllColInfo[n].fillVal). Cells that are neither floating-point nor
 *   integer are flagged as null.
 * - TSDB_FILL_NULL / TSDB_FILL_NULL_F: the destination cell of every column
 *   is flagged as null.
 * - Any other fill type: the row is left untouched.
 */
static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
  const bool fillWithValue = (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F);
  const bool fillWithNull = (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F);

  if (fillWithValue) {
    // index of the next fill value to consume; advances only on interp columns
    int32_t nextValue = 0;
    for (int32_t col = 0; col < pFillSup->numOfAllCols; ++col) {
      SFillColInfo* pColInfo = &pFillSup->pAllColInfo[col];
      if (isInterpFunc(pColInfo->pExpr)) {
        int32_t          inputSlot = pColInfo->pExpr->base.pParam[0].pCol->slotId;
        SResultCellData* pDst = getSliceResultCell(pFillInfo->pResRow->pRowVal, inputSlot, pFillSup->pOffsetInfo);
        SVariant*        pFillVal = &(pFillSup->pAllColInfo[nextValue].fillVal);

        if (pDst->type == TSDB_DATA_TYPE_FLOAT) {
          float asFloat = 0;
          GET_TYPED_DATA(asFloat, float, pFillVal->nType, &pFillVal->i, 0);
          SET_TYPED_DATA(pDst->pData, pDst->type, asFloat);
        } else if (IS_FLOAT_TYPE(pDst->type)) {
          // remaining floating-point types are converted through double
          double asDouble = 0;
          GET_TYPED_DATA(asDouble, double, pFillVal->nType, &pFillVal->i, 0);
          SET_TYPED_DATA(pDst->pData, pDst->type, asDouble);
        } else if (IS_INTEGER_TYPE(pDst->type)) {
          int64_t asInt = 0;
          GET_TYPED_DATA(asInt, int64_t, pFillVal->nType, &pFillVal->i, 0);
          SET_TYPED_DATA(pDst->pData, pDst->type, asInt);
        } else {
          // non-numeric cell: no value is written, only the null flag
          pDst->isNull = true;
        }
        nextValue++;
      }
    }
    return;
  }

  if (fillWithNull) {
    for (int32_t col = 0; col < pFillSup->numOfAllCols; ++col) {
      SFillColInfo*    pColInfo = &pFillSup->pAllColInfo[col];
      int32_t          dstSlot = GET_DEST_SLOT_ID(pColInfo);
      SResultCellData* pDst = getSliceResultCell(pFillInfo->pResRow->pRowVal, dstSlot, pFillSup->pOffsetInfo);
      pDst->isNull = true;
    }
  }
}
×
2077

2078
/*
 * Fetch the result block (and, when present, the primary-key column info) of
 * the operator directly below the time-slice operator.
 *
 * Only stream-scan and stream-partition operators are supported.
 *
 * @param downstream  operator to inspect
 * @param ppRes       out: the downstream operator's result block
 * @param ppPkCol     out: column info of the source primary-key column; only
 *                    written when the downstream reports a primary key, so the
 *                    caller must initialise it beforehand
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED for any other operator type
 */
int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol) {
  switch (downstream->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      SStreamScanInfo* pScan = (SStreamScanInfo*)downstream->info;
      *ppRes = pScan->pRes;
      if (hasSrcPrimaryKeyCol(&pScan->basic)) {
        SColumnInfoData* pPkData = taosArrayGet(pScan->pRes->pDataBlock, pScan->basic.primaryPkIndex);
        (*ppPkCol) = &pPkData->info;
      }
      return TSDB_CODE_SUCCESS;
    }
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
      SStreamPartitionOperatorInfo* pPart = (SStreamPartitionOperatorInfo*)downstream->info;
      *ppRes = pPart->binfo.pRes;
      if (hasSrcPrimaryKeyCol(&pPart->basic)) {
        SColumnInfoData* pPkData = taosArrayGet(pPart->binfo.pRes->pDataBlock, pPart->basic.primaryPkIndex);
        (*ppPkCol) = &pPkData->info;
      }
      return TSDB_CODE_SUCCESS;
    }
    default:
      break;
  }

  // unsupported downstream operator type
  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED));
  return TSDB_CODE_FAILED;
}
2099

2100
int32_t initTimeSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
×
2101
                                int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
2102
                                SStreamFillSupporter* pFillSup) {
2103
  SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
×
2104
  int32_t        code = TSDB_CODE_SUCCESS;
×
2105
  int32_t        lino = 0;
×
2106
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
×
2107
    SStreamPartitionOperatorInfo* pPartionInfo = downstream->info;
×
2108
    pPartionInfo->tsColIndex = tsColIndex;
×
2109
    pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex;
×
2110
  }
2111

2112
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
×
2113
    code = initTimeSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pFillSup);
×
2114
    return code;
×
2115
  }
2116
  SStreamScanInfo* pScanInfo = downstream->info;
×
2117
  pScanInfo->igCheckUpdate = true;
×
2118
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
×
2119
  pScanInfo->pState = pAggSup->pState;
×
2120
  if (!pScanInfo->pUpdateInfo) {
×
2121
    code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
×
2122
                                              pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
×
2123
                                              &pScanInfo->pUpdateInfo);
×
2124
    QUERY_CHECK_CODE(code, lino, _end);
×
2125
  }
2126
  pScanInfo->twAggSup = *pTwSup;
×
2127
  pScanInfo->pFillSup = pFillSup;
×
2128
  pScanInfo->interval = pFillSup->interval;
×
2129
  pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
×
2130
  if (!hasSrcPrimaryKeyCol(pBasic)) {
×
2131
    pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
×
2132
  }
2133

2134
_end:
×
2135
  if (code != TSDB_CODE_SUCCESS) {
×
2136
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
2137
  }
2138
  return code;
×
2139
}
2140

2141
int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
2142
                                          SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
2143
  int32_t                       code = TSDB_CODE_SUCCESS;
×
2144
  int32_t                       lino = 0;
×
2145
  SStreamTimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTimeSliceOperatorInfo));
×
2146
  QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno);
×
2147

2148
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2149
  QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno);
×
2150

2151
  SStreamInterpFuncPhysiNode* pInterpPhyNode = (SStreamInterpFuncPhysiNode*)pPhyNode;
×
2152
  pOperator->pTaskInfo = pTaskInfo;
×
2153
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2154
  SExprSupp* pExpSup = &pOperator->exprSupp;
×
2155
  int32_t    numOfExprs = 0;
×
2156
  SExprInfo* pExprInfo = NULL;
×
2157
  code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
×
2158
  QUERY_CHECK_CODE(code, lino, _error);
×
2159

2160
  code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
×
2161
  QUERY_CHECK_CODE(code, lino, _error);
×
2162

2163
  if (pInterpPhyNode->pExprs != NULL) {
×
2164
    int32_t    num = 0;
×
2165
    SExprInfo* pScalarExprInfo = NULL;
×
2166
    code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
×
2167
    QUERY_CHECK_CODE(code, lino, _error);
×
2168

2169
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
×
2170
    QUERY_CHECK_CODE(code, lino, _error);
×
2171
  }
2172

2173
  code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
×
2174
  QUERY_CHECK_CODE(code, lino, _error);
×
2175

2176
  pInfo->twAggSup = (STimeWindowAggSupp){
×
2177
      .waterMark = pInterpPhyNode->streamNodeOption.watermark,
×
2178
      .calTrigger = pInterpPhyNode->streamNodeOption.triggerType,
×
2179
      .maxTs = INT64_MIN,
2180
      .minTs = INT64_MAX,
2181
      .deleteMark = getDeleteMarkFromOption(&pInterpPhyNode->streamNodeOption),
×
2182
  };
2183

2184
  pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId;
×
2185

2186
  SSDataBlock* pDownRes = NULL;
×
2187
  SColumnInfo* pPkCol = NULL;
×
2188
  code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
×
2189
  QUERY_CHECK_CODE(code, lino, _error);
×
2190

2191
  pInfo->pFillSup = NULL;
×
2192
  code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, pDownRes, pPkCol, &pInfo->pFillSup);
×
2193
  QUERY_CHECK_CODE(code, lino, _error);
×
2194

2195
  int32_t ratio = 1;
×
2196
  if (pInfo->pFillSup->type == TSDB_FILL_LINEAR) {
×
2197
    ratio = 2;
×
2198
  }
2199

2200
  int32_t keyBytes = sizeof(TSKEY);
×
2201
  keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock);
×
2202
  if (pPkCol) {
×
2203
    keyBytes += pPkCol->bytes;
×
2204
  }
2205
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0,
×
2206
                                &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo),
×
2207
                                &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SORT, ratio);
2208
  QUERY_CHECK_CODE(code, lino, _error);
×
2209

2210
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
2211
  QUERY_CHECK_CODE(code, lino, _error);
×
2212

2213
  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
×
2214
  pInfo->delIndex = 0;
×
2215
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
×
2216
  QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
×
2217

2218
  pInfo->pDelRes = NULL;
×
2219
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
×
2220
  QUERY_CHECK_CODE(code, lino, _error);
×
2221

2222
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
2223
  pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn);
×
2224
  QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
×
2225

2226
  pInfo->ignoreExpiredData = pInterpPhyNode->streamNodeOption.igExpired;
×
2227
  pInfo->ignoreExpiredDataSaved = false;
×
2228
  pInfo->pUpdated = taosArrayInit(64, sizeof(SWinKey));
×
2229
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
×
2230
  pInfo->historyPoints = taosArrayInit(4, sizeof(SWinKey));
×
2231
  QUERY_CHECK_NULL(pInfo->historyPoints, code, lino, _error, terrno);
×
2232

2233
  pInfo->recvCkBlock = false;
×
2234
  pInfo->pCheckpointRes = NULL;
×
2235
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
×
2236
  QUERY_CHECK_CODE(code, lino, _error);
×
2237

2238
  pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey;
×
2239
  pInfo->numOfDatapack = 0;
×
2240

2241
  pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pDownRes);
×
2242
  copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo);
×
2243
  pInfo->ignoreNull = getIgoreNullRes(pExpSup);
×
2244

2245
  pInfo->historyWins = taosArrayInit(4, sizeof(SWinKey));
×
2246
  QUERY_CHECK_NULL(pInfo->historyWins, code, lino, _error, terrno);
×
2247

2248
  if (pHandle) {
×
2249
    pInfo->isHistoryOp = (pHandle->fillHistory == STREAM_HISTORY_OPERATOR);
×
2250
  }
2251

2252
  pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
×
2253
  QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno);
×
2254

2255
  pInfo->pOperator = pOperator;
×
2256

2257
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
×
2258
  setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
×
2259
                  true, OP_NOT_OPENED, pInfo, pTaskInfo);
2260
  // for stream
2261
  void*   buff = NULL;
×
2262
  int32_t len = 0;
×
2263
  int32_t res = pTaskInfo->storageAPI.stateStore.streamStateGetInfo(
×
2264
      pTaskInfo->streamInfo.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME, strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME),
2265
      &buff, &len);
2266
  if (res == TSDB_CODE_SUCCESS) {
×
2267
    code = doStreamTimeSliceDecodeOpState(buff, len, pOperator);
×
2268
    taosMemoryFree(buff);
×
2269
    QUERY_CHECK_CODE(code, lino, _error);
×
2270
  }
2271
  pOperator->fpSet =
2272
      createOperatorFpSet(optrDummyOpenFn, doStreamTimeSliceNext, NULL, destroyStreamTimeSliceOperatorInfo,
×
2273
                          optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2274
  setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState);
×
2275

2276
  code = initStreamBasicInfo(&pInfo->basic, pOperator);
×
2277
  QUERY_CHECK_CODE(code, lino, _error);
×
2278
  if (downstream) {
×
2279
    code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
×
2280
                                   &pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup);
×
2281
    QUERY_CHECK_CODE(code, lino, _error);
×
2282

2283
    code = appendDownstream(pOperator, &downstream, 1);
×
2284
    QUERY_CHECK_CODE(code, lino, _error);
×
2285
  }
2286
  (*ppOptInfo) = pOperator;
×
2287
  return code;
×
2288

2289
_error:
×
2290
  if (code != TSDB_CODE_SUCCESS) {
×
2291
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2292
  }
2293
  if (pInfo != NULL) {
×
2294
    destroyStreamTimeSliceOperatorInfo(pInfo);
×
2295
  }
2296
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2297
  pTaskInfo->code = code;
×
2298
  (*ppOptInfo) = NULL;
×
2299
  return code;
×
2300
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc