• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3843

08 Apr 2025 10:23AM UTC coverage: 63.077% (+0.4%) from 62.696%
#3843

push

travis-ci

web-flow
fix: clear cache when meta abort (#30674)

155571 of 315083 branches covered (49.37%)

Branch coverage included in aggregate %.

241876 of 315013 relevant lines covered (76.78%)

19243431.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.51
/source/libs/executor/src/streamintervalsliceoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "functionMgt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "storageapi.h"
20
#include "streamexecutorInt.h"
21
#include "streaminterval.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "ttime.h"
26

27
#define STREAM_INTERVAL_SLICE_OP_CHECKPOINT_NAME "StreamIntervalSliceOperator_Checkpoint"
28

29
void streamIntervalSliceReleaseState(SOperatorInfo* pOperator) {}
×
30

31
void streamIntervalSliceReloadState(SOperatorInfo* pOperator) {}
×
32

33
void destroyStreamIntervalSliceOperatorInfo(void* param) {
161✔
34
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)param;
161✔
35
  if (param == NULL) {
161!
36
    return;
×
37
  }
38
  cleanupBasicInfo(&pInfo->binfo);
161✔
39
  if (pInfo->pOperator) {
161!
40
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
161✔
41
                              &pInfo->groupResInfo);
42
    pInfo->pOperator = NULL;
161✔
43
  }
44

45
  destroyStreamBasicInfo(&pInfo->basic);
161✔
46
  clearGroupResInfo(&pInfo->groupResInfo);
161✔
47
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
161✔
48
  pInfo->pUpdated = NULL;
161✔
49

50
  if (pInfo->pUpdatedMap != NULL) {
161!
51
    tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
161✔
52
    tSimpleHashCleanup(pInfo->pUpdatedMap);
161✔
53
    pInfo->pUpdatedMap = NULL;
161✔
54
  }
55
  destroyStreamAggSupporter(&pInfo->streamAggSup);
161✔
56

57
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
161✔
58
  cleanupExprSupp(&pInfo->scalarSup);
161✔
59

60
  tSimpleHashCleanup(pInfo->pDeletedMap);
161✔
61
  taosArrayDestroy(pInfo->pDelWins);
161✔
62
  blockDataDestroy(pInfo->pDelRes);
161✔
63

64
  blockDataDestroy(pInfo->pCheckpointRes);
161✔
65
  taosMemoryFreeClear(pInfo->pOffsetInfo);
161!
66
  destroyNonBlockAggSupptor(&pInfo->nbSup);
161✔
67

68
  taosMemoryFreeClear(param);
161!
69
}
70

71
int32_t buildIntervalSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,046✔
72
  int32_t                           code = TSDB_CODE_SUCCESS;
3,046✔
73
  int32_t                           lino = 0;
3,046✔
74
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
3,046✔
75
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
3,046✔
76
  uint16_t                          opType = pOperator->operatorType;
3,046✔
77
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
3,046✔
78
  SStreamNotifyEventSupp*           pNotifySup = &pInfo->basic.notifyEventSup;
3,046✔
79
  STaskNotifyEventStat*             pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat;
3,046✔
80
  bool                              addNotifyEvent = false;
3,046✔
81
  addNotifyEvent = IS_NORMAL_INTERVAL_OP(pOperator) &&
3,045!
82
                   BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
×
83
  (*ppRes) = NULL;
3,046✔
84

85
  if (isFinalOperator(&pInfo->basic)) {
3,046✔
86
    doBuildPullDataBlock(pInfo->nbSup.pPullWins, &pInfo->nbSup.pullIndex, pInfo->nbSup.pPullDataRes);
140✔
87
    if (pInfo->nbSup.pPullDataRes->info.rows != 0) {
140✔
88
      printDataBlock(pInfo->nbSup.pPullDataRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
2✔
89
      (*ppRes) = pInfo->nbSup.pPullDataRes;
2✔
90
      return code;
2✔
91
    }
92
  }
93

94
  if (pOperator->status == OP_RES_TO_RETURN) {
3,045✔
95
    doBuildDeleteResultImpl(&pInfo->streamAggSup.stateStore, pTaskInfo->streamInfo.pState, pInfo->pDelWins,
2,520✔
96
                            &pInfo->delIndex, pInfo->pDelRes);
97
    if (pInfo->pDelRes->info.rows != 0) {
2,520✔
98
      printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
4✔
99
      if (addNotifyEvent) {
4!
100
        code = addAggDeleteNotifyEvent(pInfo->pDelRes, pNotifySup, pNotifyEventStat);
×
101
        QUERY_CHECK_CODE(code, lino, _end);
×
102
      }
103
      (*ppRes) = pInfo->pDelRes;
4✔
104
      return code;
4✔
105
    }
106
  }
107

108
  doBuildStreamIntervalResult(pOperator, pInfo->streamAggSup.pState, pInfo->binfo.pRes, &pInfo->groupResInfo,
3,041!
109
                              addNotifyEvent ? pNotifySup->pSessionKeys : NULL);
110
  if (pInfo->binfo.pRes->info.rows != 0) {
3,041✔
111
    printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
351✔
112
    if (addNotifyEvent) {
351!
113
      code = addAggResultNotifyEvent(pInfo->binfo.pRes, pNotifySup->pSessionKeys,
×
114
                                     pTaskInfo->streamInfo.notifyResultSchema, pNotifySup, pNotifyEventStat);
×
115
      QUERY_CHECK_CODE(code, lino, _end);
×
116
    }
117
    (*ppRes) = pInfo->binfo.pRes;
351✔
118
    goto _end;
351✔
119
  }
120

121
  code = buildNotifyEventBlock(pTaskInfo, pNotifySup, pNotifyEventStat);
2,690✔
122
  QUERY_CHECK_CODE(code, lino, _end);
2,689!
123
  if (pNotifySup->pEventBlock && pNotifySup->pEventBlock->info.rows > 0) {
2,689!
124
    printDataBlock(pNotifySup->pEventBlock, getStreamOpName(opType), GET_TASKID(pTaskInfo));
×
125
    (*ppRes) = pNotifySup->pEventBlock;
×
126
    return code;
×
127
  }
128

129
  code = removeOutdatedNotifyEvents(&pInfo->twAggSup, pNotifySup, pNotifyEventStat);
2,689✔
130
  QUERY_CHECK_CODE(code, lino, _end);
2,686!
131

132
_end:
2,686✔
133
  if (code != TSDB_CODE_SUCCESS) {
3,037!
134
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
135
  }
136
  return code;
3,038✔
137
}
138

139
// static void doStreamIntervalSliceSaveCheckpoint(SOperatorInfo* pOperator) {
140
// }
141

142
void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId,
778✔
143
                            SInervalSlicePoint* pPoint) {
144
  pPoint->winKey.groupId = groupId;
778✔
145
  pPoint->winKey.win = *pTWin;
778✔
146
  pPoint->pFinished = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
778✔
147
  pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool));
778✔
148
}
778✔
149

150
int32_t getIntervalSlicePrevStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, SWinKey* pCurKey,
152✔
151
                                     SInervalSlicePoint* pPrevPoint) {
152
  int32_t code = TSDB_CODE_SUCCESS;
152✔
153
  int32_t lino = 0;
152✔
154
  SWinKey prevKey = {.groupId = pCurKey->groupId};
152✔
155
  SET_WIN_KEY_INVALID(prevKey.ts);
152✔
156
  int32_t prevVLen = 0;
152✔
157
  int32_t prevWinCode = TSDB_CODE_SUCCESS;
152✔
158
  code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, pCurKey, &prevKey, (void**)&pPrevPoint->pResPos,
152✔
159
                                                &prevVLen, &prevWinCode);
160
  QUERY_CHECK_CODE(code, lino, _end);
152!
161

162
  if (prevWinCode == TSDB_CODE_SUCCESS) {
152✔
163
    STimeWindow prevSTW = {.skey = prevKey.ts};
109✔
164
    prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
109✔
165
    initIntervalSlicePoint(pAggSup, &prevSTW, pCurKey->groupId, pPrevPoint);
109✔
166
    qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
109!
167
           pPrevPoint->winKey.win.skey, pPrevPoint->winKey.groupId, prevWinCode);
168
  } else {
169
    SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
43✔
170
    SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
43✔
171
  }
172

173
_end:
152✔
174
  if (code != TSDB_CODE_SUCCESS) {
152!
175
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
176
  }
177
  return code;
152✔
178
}
179

180
int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev,
669✔
181
                                    STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pCurPoint,
182
                                    SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) {
183
  int32_t code = TSDB_CODE_SUCCESS;
669✔
184
  int32_t lino = 0;
669✔
185
  SWinKey curKey = {.ts = pTWin->skey, .groupId = groupId};
669✔
186
  int32_t curVLen = 0;
669✔
187
  code = pAggSup->stateStore.streamStateAddIfNotExist(pAggSup->pState, &curKey, (void**)&pCurPoint->pResPos, &curVLen,
669✔
188
                                                      pWinCode);
189
  QUERY_CHECK_CODE(code, lino, _end);
669!
190

191
  qDebug("===stream=== set stream twa cur point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d", curKey.ts,
669!
192
         curKey.groupId, *pWinCode);
193

194
  initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint);
669✔
195

196
  if (needPrev) {
669✔
197
    code = getIntervalSlicePrevStateBuf(pAggSup, pInterval, &curKey, pPrevPoint);
103✔
198
    QUERY_CHECK_CODE(code, lino, _end);
103!
199
  }
200

201
_end:
669✔
202
  if (code != TSDB_CODE_SUCCESS) {
669!
203
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
204
  }
205
  return code;
669✔
206
}
207

208
void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock,
152✔
209
                                int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) {
210
  SqlFunctionCtx* pCtx = pSup->pCtx;
152✔
211
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
1,256✔
212
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
1,104✔
213
      pCtx[k].start.key = INT64_MIN;
784✔
214
      continue;
784✔
215
    }
216

217
    SFunctParam*     pParam = &pCtx[k].param[0];
320✔
218
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId);
320✔
219

220
    double           prevVal = 0, curVal = 0, winVal = 0;
320✔
221
    SResultCellData* pCell =
222
        getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo);
320✔
223
    GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData, typeGetTypeModFromColInfo(&pColInfo->info));
320!
224
    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex), typeGetTypeModFromColInfo(&pColInfo->info));
320!
225

226
    SPoint point1 = (SPoint){.key = pPrevWinVal->key, .val = &prevVal};
320✔
227
    SPoint point2 = (SPoint){.key = curTs, .val = &curVal};
320✔
228
    SPoint point = (SPoint){.key = winKey, .val = &winVal};
320✔
229

230
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
320✔
231
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
216✔
232
    }
233

234
    if (type == INTERVAL_SLICE_START) {
320✔
235
      pCtx[k].start.key = point.key;
160✔
236
      pCtx[k].start.val = winVal;
160✔
237
    } else {
238
      pCtx[k].end.key = point.key;
160✔
239
      pCtx[k].end.val = winVal;
160✔
240
    }
241
  }
242
}
152✔
243

244
void doSetElapsedEndKey(TSKEY winKey, SExprSupp* pSup) {
76✔
245
  SqlFunctionCtx* pCtx = pSup->pCtx;
76✔
246
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
628✔
247
    if (fmIsElapsedFunc(pCtx[k].functionId)) {
552✔
248
      pCtx[k].end.key = winKey;
52✔
249
      pCtx[k].end.val = 0;
52✔
250
    }
251
  }
252
}
76✔
253

254
void resetIntervalSliceFunctionKey(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
489✔
255
  for (int32_t k = 0; k < numOfOutput; ++k) {
5,020✔
256
    pCtx[k].start.key = INT64_MIN;
4,531✔
257
    pCtx[k].end.key = INT64_MIN;
4,531✔
258
  }
259
}
489✔
260

261
static int32_t checkAndRecoverPointBuff(SStreamAggSupporter* pAggSup, SInervalSlicePoint* pPoint) {
745✔
262
  int32_t code = TSDB_CODE_SUCCESS;
745✔
263
  if (pPoint->pResPos->pRowBuff == NULL) {
745!
264
    void* pVal = NULL;
×
265
    // recover curPoint.pResPos->pRowBuff
266
    code = pAggSup->stateStore.streamStateGetByPos(pAggSup->pState, pPoint->pResPos, &pVal);
×
267
    pPoint->pFinished = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
×
268
    pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool));
×
269
  }
270
  return code;
745✔
271
}
272

273
int32_t setIntervalSliceOutputBuf(SStreamAggSupporter* pAggSup, SInervalSlicePoint* pPoint, SqlFunctionCtx* pCtx,
745✔
274
                                  int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
275
  int32_t code = TSDB_CODE_SUCCESS;
745✔
276
  int32_t lino = 0;
745✔
277

278
  checkAndRecoverPointBuff(pAggSup, pPoint);
745✔
279
  SResultRow* res = pPoint->pResPos->pRowBuff;
745✔
280

281
  // set time window for current result
282
  res->win = pPoint->winKey.win;
745✔
283
  code = setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
745✔
284
  QUERY_CHECK_CODE(code, lino, _end);
745!
285

286
_end:
745✔
287
  if (code != TSDB_CODE_SUCCESS) {
745!
288
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
289
  }
290
  return code;
745✔
291
}
292

293
void setInterpoWindowFinished(SInervalSlicePoint* pPoint) { (*pPoint->pFinished) = true; }
76✔
294

295
bool isInterpoWindowFinished(SInervalSlicePoint* pPoint) { return *pPoint->pFinished; }
76✔
296

297
static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap,
217✔
298
                                            SSHashObj* pDeletedMap) {
299
  int32_t                           code = TSDB_CODE_SUCCESS;
217✔
300
  int32_t                           lino = 0;
217✔
301
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
217✔
302
  SResultRowInfo*                   pResultRowInfo = &(pInfo->binfo.resultRowInfo);
217✔
303
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
217✔
304
  SExprSupp*                        pSup = &pOperator->exprSupp;
217✔
305
  int32_t                           numOfOutput = pSup->numOfExprs;
217✔
306
  TSKEY*                            tsCols = NULL;
217✔
307
  int64_t                           groupId = pBlock->info.id.groupId;
217✔
308
  SResultRow*                       pResult = NULL;
217✔
309
  int32_t                           forwardRows = 0;
217✔
310

311
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
217✔
312
  tsCols = (int64_t*)pColDataInfo->pData;
217✔
313

314
  int32_t            startPos = 0;
217✔
315
  TSKEY              curTs = getStartTsKey(&pBlock->info.window, tsCols);
217✔
316
  SInervalSlicePoint curPoint = {0};
217✔
317
  SInervalSlicePoint prevPoint = {0};
217✔
318
  STimeWindow        curWin = getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC);
217✔
319
  while (1) {
48✔
320
    int32_t winCode = TSDB_CODE_SUCCESS;
265✔
321
    if (curTs <= pInfo->endTs) {
265✔
322
      code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin,
262✔
323
                                         groupId, &curPoint, &prevPoint, &winCode);
324
      QUERY_CHECK_CODE(code, lino, _end);
262!
325
    } else if (pInfo->hasInterpoFunc) {
3!
326
      SWinKey curKey = {.ts = curWin.skey, .groupId = groupId};
3✔
327
      code = getIntervalSlicePrevStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curKey, &prevPoint);
3✔
328
      QUERY_CHECK_CODE(code, lino, _end);
3!
329
    }
330

331
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) &&
265✔
332
        isInterpoWindowFinished(&prevPoint) == false) {
152!
333
      code = setIntervalSliceOutputBuf(&pInfo->streamAggSup, &prevPoint, pSup->pCtx, numOfOutput,
76✔
334
                                       pSup->rowEntryInfoOffset);
335
      QUERY_CHECK_CODE(code, lino, _end);
76!
336

337
      resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
76✔
338
      doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp);
76✔
339
      doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos,
76✔
340
                                 &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo);
341
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
76✔
342
      code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
76✔
343
                                             pBlock->info.rows, numOfOutput);
76✔
344
      QUERY_CHECK_CODE(code, lino, _end);
76!
345
      SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId};
76✔
346
      code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
76✔
347
      QUERY_CHECK_CODE(code, lino, _end);
76!
348
      setInterpoWindowFinished(&prevPoint);
76✔
349
    } else if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey)) {
189✔
350
      releaseOutputBuf(pInfo->streamAggSup.pState, prevPoint.pResPos, &pInfo->streamAggSup.stateStore);
159✔
351
    }
352

353
    if (curTs > pInfo->endTs) {
265✔
354
      break;
217✔
355
    }
356

357
    code =
358
        setIntervalSliceOutputBuf(&pInfo->streamAggSup, &curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
262✔
359
    QUERY_CHECK_CODE(code, lino, _end);
262!
360

361
    if (winCode != TSDB_CODE_SUCCESS && IS_NORMAL_INTERVAL_OP(pOperator) &&
262!
362
        BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN)) {
×
363
      SSessionKey key = {.win = curWin, .groupId = groupId};
×
364
      code = addIntervalAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &key, &pInfo->basic.notifyEventSup,
×
365
                                       pTaskInfo->streamInfo.pNotifyEventStat);
366
      QUERY_CHECK_CODE(code, lino, _end);
×
367
    }
368

369
    resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
262✔
370
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
262!
371
      doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos,
76✔
372
                                 &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo);
373
    }
374
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
262✔
375
                                           TSDB_ORDER_ASC);
376
    int32_t prevEndPos = (forwardRows - 1) + startPos;
262✔
377
    if (pInfo->hasInterpoFunc) {
262✔
378
      int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
103✔
379
      TSKEY   endRowTs = tsCols[endRowId];
103✔
380
      transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
103✔
381
    }
382
    SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId};
262✔
383
    if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
262!
384
      code = tSimpleHashPut(pDeletedMap, &curKey, sizeof(SWinKey), NULL, 0);
×
385
      QUERY_CHECK_CODE(code, lino, _end);
×
386
    }
387

388
    code = saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap);
262✔
389
    QUERY_CHECK_CODE(code, lino, _end);
262!
390

391
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
262✔
392
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
262✔
393
                                           forwardRows, pBlock->info.rows, numOfOutput);
262✔
394
    QUERY_CHECK_CODE(code, lino, _end);
262!
395

396
    if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
262!
397
      setInterpoWindowFinished(&curPoint);
×
398
    }
399

400
    startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
262✔
401
    if (startPos < 0) {
262✔
402
      break;
214✔
403
    }
404
    curTs = tsCols[startPos];
48✔
405
  }
406

407
_end:
217✔
408
  if (code != TSDB_CODE_SUCCESS) {
217!
409
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
410
  }
411
  return code;
217✔
412
}
413

414
static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,129✔
415
  int32_t                           code = TSDB_CODE_SUCCESS;
3,129✔
416
  int32_t                           lino = 0;
3,129✔
417
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
3,129✔
418
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
3,129✔
419
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
3,129✔
420

421
  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
3,129!
422

423
  if (pOperator->status == OP_EXEC_DONE) {
3,129!
424
    (*ppRes) = NULL;
×
425
    goto _end;
×
426
  }
427

428
  if (pOperator->status == OP_RES_TO_RETURN) {
3,129✔
429
    SSDataBlock* resBlock = NULL;
220✔
430
    code = buildIntervalSliceResult(pOperator, &resBlock);
220✔
431
    QUERY_CHECK_CODE(code, lino, _end);
220!
432
    if (resBlock != NULL) {
220✔
433
      (*ppRes) = resBlock;
45✔
434
      return code;
220✔
435
    }
436

437
    if (pInfo->recvCkBlock) {
175!
438
      pInfo->recvCkBlock = false;
×
439
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
440
      (*ppRes) = pInfo->pCheckpointRes;
×
441
      return code;
×
442
    }
443

444
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep);
175✔
445
    setStreamOperatorCompleted(pOperator);
175✔
446
    (*ppRes) = NULL;
175✔
447
    return code;
175✔
448
  }
449

450
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2,909✔
451
  int32_t        numOfDatapack = 0;
2,909✔
452

453
  while (1) {
1,128✔
454
    SSDataBlock* pBlock = NULL;
4,037✔
455
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
4,037✔
456
    QUERY_CHECK_CODE(code, lino, _end);
5,039!
457

458
    if (pBlock == NULL) {
4,037✔
459
      pOperator->status = OP_RES_TO_RETURN;
1,907✔
460
      break;
1,907✔
461
    }
462

463
    switch (pBlock->info.type) {
2,130!
464
      case STREAM_NORMAL:
217✔
465
      case STREAM_INVALID: {
466
        SExprSupp* pExprSup = &pInfo->scalarSup;
217✔
467
        if (pExprSup->pExprInfo != NULL) {
217!
468
          code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
×
469
          QUERY_CHECK_CODE(code, lino, _end);
×
470
        }
471
      } break;
217✔
472
      case STREAM_CHECKPOINT: {
6✔
473
        pInfo->recvCkBlock = true;
6✔
474
        pAggSup->stateStore.streamStateCommit(pAggSup->pState);
6✔
475
        code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
6✔
476
        QUERY_CHECK_CODE(code, lino, _end);
6!
477
        continue;
911✔
478
      } break;
479
      case STREAM_CREATE_CHILD_TABLE: {
6✔
480
        (*ppRes) = pBlock;
6✔
481
        goto _end;
6✔
482
      } break;
483
      case STREAM_GET_RESULT: {
1,901✔
484
        pInfo->endTs = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
1,901✔
485
        if (pInfo->hasFill) {
1,901✔
486
          (*ppRes) = pBlock;
996✔
487
          goto _end;
996✔
488
        } else {
489
          continue;
905✔
490
        }
491
      }
492
      default:
×
493
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
494
        QUERY_CHECK_CODE(code, lino, _end);
×
495
    }
496

497
    code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
217✔
498
    QUERY_CHECK_CODE(code, lino, _end);
217!
499
    code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, NULL);
217✔
500
    QUERY_CHECK_CODE(code, lino, _end);
217!
501
  }
502

503
  if (!pInfo->destHasPrimaryKey) {
1,907!
504
    removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
1,907✔
505
  }
506

507
  if (pInfo->destHasPrimaryKey) {
1,907!
508
    code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
×
509
    QUERY_CHECK_CODE(code, lino, _end);
×
510
  }
511

512
  code = copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl);
1,907✔
513
  QUERY_CHECK_CODE(code, lino, _end);
1,907!
514

515
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
1,907✔
516
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
1,907✔
517
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
1,906!
518

519
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
1,906✔
520
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
1,907✔
521
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
1,907!
522

523
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,907✔
524
  QUERY_CHECK_CODE(code, lino, _end);
1,907!
525

526
  code = buildIntervalSliceResult(pOperator, ppRes);
1,907✔
527
  QUERY_CHECK_CODE(code, lino, _end);
1,904!
528

529
  if ((*ppRes) == NULL) {
1,904✔
530
    if (pInfo->recvCkBlock) {
1,736✔
531
      pInfo->recvCkBlock = false;
6✔
532
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
6✔
533
      (*ppRes) = pInfo->pCheckpointRes;
6✔
534
      return code;
6✔
535
    }
536
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep);
1,730✔
537
    setStreamOperatorCompleted(pOperator);
1,732✔
538
  }
539

540
_end:
168✔
541
  if (code != TSDB_CODE_SUCCESS) {
2,903!
542
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
543
  }
544
  return code;
2,903✔
545
}
546

547
int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
189✔
548
                                    int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
549
                                    SInterval* pInterval, bool hasInterpoFunc, int64_t recalculateInterval) {
550
  SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
189✔
551
  int32_t        code = TSDB_CODE_SUCCESS;
189✔
552
  int32_t        lino = 0;
189✔
553
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
189✔
554
    SStreamPartitionOperatorInfo* pPartionInfo = downstream->info;
28✔
555
    pPartionInfo->tsColIndex = tsColIndex;
28✔
556
    pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex;
28✔
557
  }
558

559
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
189✔
560
    code = initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval,
28✔
561
                                       hasInterpoFunc, recalculateInterval);
562
    return code;
28✔
563
  }
564
  SStreamScanInfo* pScanInfo = downstream->info;
161✔
565
  pScanInfo->useGetResultRange = hasInterpoFunc;
161✔
566
  pScanInfo->igCheckUpdate = true;
161✔
567
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
161✔
568
  pScanInfo->pState = pAggSup->pState;
161✔
569
  if (!pScanInfo->pUpdateInfo && pTwSup->calTrigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
161✔
570
    code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
61✔
571
                                              pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
61✔
572
                                              &pScanInfo->pUpdateInfo);
61✔
573
    QUERY_CHECK_CODE(code, lino, _end);
61!
574
  }
575
  pScanInfo->twAggSup = *pTwSup;
161✔
576
  pScanInfo->interval = *pInterval;
161✔
577
  pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
161✔
578
  if (!hasSrcPrimaryKeyCol(pBasic)) {
161✔
579
    pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
157✔
580
  }
581
  pBasic->pTsDataState = pScanInfo->basic.pTsDataState;
161✔
582

583
  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_INTERVAL) {
161✔
584
    pScanInfo->scanAllTables = true;
36✔
585
  }
586
  pScanInfo->recalculateInterval = recalculateInterval;
161✔
587

588
_end:
161✔
589
  if (code != TSDB_CODE_SUCCESS) {
161!
590
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
591
  }
592
  return code;
161✔
593
}
594

595
static bool windowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols) {
161✔
596
  bool needed = false;
161✔
597
  for (int32_t i = 0; i < numOfCols; ++i) {
1,135✔
598
    SExprInfo* pExpr = pCtx[i].pExpr;
1,012✔
599
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
1,012✔
600
      needed = true;
38✔
601
      break;
38✔
602
    }
603
  }
604
  return needed;
161✔
605
}
606

607
int32_t initNonBlockAggSupptor(SNonBlockAggSupporter* pNbSup, SInterval* pInterval, SOperatorInfo* downstream) {
1,636✔
608
  int32_t code = TSDB_CODE_SUCCESS;
1,636✔
609
  int32_t lino = 0;
1,636✔
610
  if (pInterval != NULL) {
1,636✔
611
    pNbSup->numOfKeep = ceil(((double)pInterval->interval) / pInterval->sliding);
161✔
612
  } else {
613
    pNbSup->numOfKeep = 1;
1,475✔
614
  }
615
  pNbSup->tsOfKeep = INT64_MAX;
1,636✔
616
  pNbSup->pullIndex = 0;
1,636✔
617
  pNbSup->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
1,636✔
618
  QUERY_CHECK_NULL(pNbSup->pPullWins, code, lino, _end, terrno);
1,637!
619

620
  code = createSpecialDataBlock(STREAM_RETRIEVE, &pNbSup->pPullDataRes);
1,637✔
621
  QUERY_CHECK_CODE(code, lino, _end);
1,637!
622

623
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
1,637✔
624
  pNbSup->pPullDataMap = tSimpleHashInit(64, hashFn);
1,637✔
625
  pNbSup->numOfChild = 0;
1,636✔
626

627
  while (downstream != NULL && downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
1,636!
628
    downstream = downstream->pDownstream[0];
×
629
  }
630

631
  if (downstream != NULL) {
1,636!
632
    SStreamScanInfo* pInfo = (SStreamScanInfo*)downstream->info;
×
633
    pNbSup->recParam = pInfo->recParam;
×
634
  } else {
635
    pNbSup->recParam = (SStreamRecParam){0};
1,636✔
636
  }
637

638
_end:
1,636✔
639
  if (code != TSDB_CODE_SUCCESS) {
1,636!
640
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
641
  }
642
  return code;
1,636✔
643
}
644

645
void destroyNonBlockAggSupptor(SNonBlockAggSupporter* pNbSup) {
2,088✔
646
  blockDataDestroy(pNbSup->pPullDataRes);
2,088✔
647
  pNbSup->pPullDataRes = NULL;
2,086✔
648
  tSimpleHashCleanup(pNbSup->pHistoryGroup);
2,086✔
649
  pNbSup->pHistoryGroup = NULL;
2,086✔
650
  taosArrayDestroy(pNbSup->pPullWins);
2,086✔
651
  pNbSup->pPullWins = NULL;
2,088✔
652
  tSimpleHashCleanup(pNbSup->pPullDataMap);
2,088✔
653
  pNbSup->pPullDataMap = NULL;
2,088✔
654
}
2,088✔
655

656
int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
161✔
657
                                              SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
658
  int32_t                           code = TSDB_CODE_SUCCESS;
161✔
659
  int32_t                           lino = 0;
161✔
660
  SStreamIntervalSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalSliceOperatorInfo));
161!
661
  QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno);
161!
662

663
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
161!
664
  QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno)
161!
665

666
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
161✔
667
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
161!
668

669
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
161✔
670
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
161✔
671
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _error, terrno);
161!
672

673
  pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn);
161✔
674
  QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
161!
675

676
  pInfo->delIndex = 0;
161✔
677
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
161✔
678
  QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
161!
679

680
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
161✔
681
  QUERY_CHECK_CODE(code, lino, _error);
161!
682

683
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
161✔
684
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
161!
685
  initBasicInfo(&pInfo->binfo, pResBlock);
161✔
686

687
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
161✔
688
  QUERY_CHECK_CODE(code, lino, _error);
161!
689
  pInfo->recvCkBlock = false;
161✔
690

691
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
161✔
692
  pOperator->pTaskInfo = pTaskInfo;
161✔
693
  initResultSizeInfo(&pOperator->resultInfo, 4096);
161✔
694
  SExprSupp* pExpSup = &pOperator->exprSupp;
161✔
695
  int32_t    numOfExprs = 0;
161✔
696
  SExprInfo* pExprInfo = NULL;
161✔
697
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfExprs);
161✔
698
  QUERY_CHECK_CODE(code, lino, _error);
161!
699

700
  code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
161✔
701
  QUERY_CHECK_CODE(code, lino, _error);
161!
702

703
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
161✔
704
                                .sliding = pIntervalPhyNode->sliding,
161✔
705
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
161✔
706
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
161✔
707
                                .offset = pIntervalPhyNode->offset,
161✔
708
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
161✔
709
                                .timeRange = pIntervalPhyNode->timeRange};
161✔
710
  calcIntervalAutoOffset(&pInfo->interval);
161✔
711

712
  pInfo->twAggSup =
161✔
713
      (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
161✔
714
                           .calTrigger = pIntervalPhyNode->window.triggerType,
161✔
715
                           .maxTs = INT64_MIN,
716
                           .minTs = INT64_MAX,
717
                           .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};
161✔
718
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
161✔
719
  QUERY_CHECK_CODE(code, lino, _error);
161!
720
  pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
161✔
721

722
  if (pIntervalPhyNode->window.pExprs != NULL) {
161!
723
    int32_t    numOfScalar = 0;
×
724
    SExprInfo* pScalarExprInfo = NULL;
×
725
    code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
726
    QUERY_CHECK_CODE(code, lino, _error);
×
727

728
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
729
    QUERY_CHECK_CODE(code, lino, _error);
×
730
  }
731

732
  SSDataBlock* pDownRes = NULL;
161✔
733
  SColumnInfo* pPkCol = NULL;
161✔
734
  code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
161✔
735
  QUERY_CHECK_CODE(code, lino, _error);
161!
736

737
  code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes);
161✔
738
  QUERY_CHECK_CODE(code, lino, _error);
161!
739

740
  int32_t keyBytes = sizeof(TSKEY);
161✔
741
  keyBytes +=
161✔
742
      blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool);
161✔
743
  if (pPkCol) {
161✔
744
    keyBytes += pPkCol->bytes;
7✔
745
  }
746
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0,
161✔
747
                                &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo),
161✔
748
                                &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SEARCH, 1);
749

750
  pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
161✔
751
  pInfo->pOperator = pOperator;
161✔
752
  pInfo->hasFill = false;
161✔
753
  pInfo->hasInterpoFunc = windowinterpNeeded(pExpSup->pCtx, numOfExprs);
161✔
754
  initNonBlockAggSupptor(&pInfo->nbSup, &pInfo->interval, NULL);
161✔
755

756
  setOperatorInfo(pOperator, "StreamIntervalSliceOperator", nodeType(pPhyNode), true, OP_NOT_OPENED, pInfo, pTaskInfo);
161✔
757
  code = initStreamBasicInfo(&pInfo->basic, pOperator);
161✔
758
  QUERY_CHECK_CODE(code, lino, _error);
161!
759

760
  if (pIntervalPhyNode->window.triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
161✔
761
    qDebug("create continuous interval operator. op type:%d, task type:%d, task id:%s", nodeType(pPhyNode),
82!
762
           pHandle->fillHistory, GET_TASKID(pTaskInfo));
763
    if (pHandle->fillHistory == STREAM_HISTORY_OPERATOR) {
82✔
764
      setFillHistoryOperatorFlag(&pInfo->basic);
10✔
765
    } else if (pHandle->fillHistory == STREAM_RECALCUL_OPERATOR) {
72✔
766
      setRecalculateOperatorFlag(&pInfo->basic);
36✔
767
    }
768
    pInfo->nbSup.pWindowAggFn = doStreamIntervalNonblockAggImpl;
82✔
769
    if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL) {
82✔
770
      setSingleOperatorFlag(&pInfo->basic);
28✔
771
    }
772
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalNonblockAggNext, NULL,
82✔
773
                                           destroyStreamIntervalSliceOperatorInfo, optrDefaultBufFn, NULL,
774
                                           optrDefaultGetNextExtFn, NULL);
775
    setOperatorStreamStateFn(pOperator, streamIntervalNonblockReleaseState, streamIntervalNonblockReloadState);
82✔
776
  } else {
777
    pOperator->fpSet =
778
        createOperatorFpSet(optrDummyOpenFn, doStreamIntervalSliceNext, NULL, destroyStreamIntervalSliceOperatorInfo,
79✔
779
                            optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
780
    setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
79✔
781
  }
782

783
  if (downstream) {
161!
784
    code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
161✔
785
                                       &pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc,
161✔
786
                                       pIntervalPhyNode->window.recalculateInterval);
787
    QUERY_CHECK_CODE(code, lino, _error);
161!
788

789
    code = appendDownstream(pOperator, &downstream, 1);
161✔
790
    QUERY_CHECK_CODE(code, lino, _error);
161!
791
  }
792

793
  (*ppOptInfo) = pOperator;
161✔
794
  return code;
161✔
795

796
_error:
×
797
  if (code != TSDB_CODE_SUCCESS) {
×
798
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
799
  }
800
  if (pInfo != NULL) {
×
801
    destroyStreamIntervalSliceOperatorInfo(pInfo);
×
802
  }
803
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
804
  pTaskInfo->code = code;
×
805
  (*ppOptInfo) = NULL;
×
806
  return code;
×
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc