• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3646

12 Mar 2025 12:34PM UTC coverage: 28.375% (-27.8%) from 56.156%
#3646

push

travis-ci

web-flow
Merge pull request #30119 from taosdata/ciup30

ci: Update workflow to fix param issue of run_tdgpt_test

59085 of 286935 branches covered (20.59%)

Branch coverage included in aggregate %.

102775 of 283490 relevant lines covered (36.25%)

55149.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamintervalsliceoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "functionMgt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "storageapi.h"
20
#include "streamexecutorInt.h"
21
#include "tcommon.h"
22
#include "tcompare.h"
23
#include "tdatablock.h"
24
#include "ttime.h"
25

26
#define STREAM_INTERVAL_SLICE_OP_CHECKPOINT_NAME "StreamIntervalSliceOperator_Checkpoint"
27

28
typedef struct SInervalSlicePoint {
29
  SSessionKey      winKey;
30
  bool             *pFinished;
31
  SSliceRowData*   pLastRow;
32
  SRowBuffPos*     pResPos;
33
} SInervalSlicePoint;
34

35
typedef enum SIntervalSliceType {
36
  INTERVAL_SLICE_START = 1,
37
  INTERVAL_SLICE_END = 2,
38
} SIntervalSliceType;
39

40
void streamIntervalSliceReleaseState(SOperatorInfo* pOperator) {
×
41
}
×
42

43
void streamIntervalSliceReloadState(SOperatorInfo* pOperator) {
×
44
}
×
45

46
void destroyStreamIntervalSliceOperatorInfo(void* param) {
×
47
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)param;
×
48
  if (param == NULL) {
×
49
    return;
×
50
  }
51
  cleanupBasicInfo(&pInfo->binfo);
×
52
  if (pInfo->pOperator) {
×
53
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
×
54
                              &pInfo->groupResInfo);
55
    pInfo->pOperator = NULL;
×
56
  }
57

58
  destroyStreamBasicInfo(&pInfo->basic);
×
59
  clearGroupResInfo(&pInfo->groupResInfo);
×
60
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
×
61
  pInfo->pUpdated = NULL;
×
62

63
  if (pInfo->pUpdatedMap != NULL) {
×
64
    tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
×
65
    tSimpleHashCleanup(pInfo->pUpdatedMap);
×
66
    pInfo->pUpdatedMap = NULL;
×
67
  }
68
  destroyStreamAggSupporter(&pInfo->streamAggSup);
×
69

70
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
×
71
  cleanupExprSupp(&pInfo->scalarSup);
×
72

73
  tSimpleHashCleanup(pInfo->pDeletedMap);
×
74
  taosArrayDestroy(pInfo->pDelWins);
×
75
  blockDataDestroy(pInfo->pDelRes);
×
76

77
  blockDataDestroy(pInfo->pCheckpointRes);
×
78
  taosMemoryFreeClear(pInfo->pOffsetInfo);
×
79

80
  taosMemoryFreeClear(param);
×
81
}
82

83
static int32_t buildIntervalSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
84
  int32_t                           code = TSDB_CODE_SUCCESS;
×
85
  int32_t                           lino = 0;
×
86
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
×
87
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
×
88
  uint16_t                          opType = pOperator->operatorType;
×
89
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
×
90
  SStreamNotifyEventSupp*           pNotifySup = &pInfo->basic.notifyEventSup;
×
91
  STaskNotifyEventStat*             pNotifyEventStat = pTaskInfo->streamInfo.pNotifyEventStat;
×
92
  bool                              addNotifyEvent = false;
×
93
  addNotifyEvent = IS_NORMAL_INTERVAL_OP(pOperator) &&
×
94
                   BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
×
95
  doBuildDeleteResultImpl(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, pInfo->pDelWins, &pInfo->delIndex,
×
96
                          pInfo->pDelRes);
97
  if (pInfo->pDelRes->info.rows != 0) {
×
98
    // process the rest of the data
99
    printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
×
100
    if (addNotifyEvent) {
×
101
      code = addAggDeleteNotifyEvent(pInfo->pDelRes, pNotifySup, pNotifyEventStat);
×
102
      QUERY_CHECK_CODE(code, lino, _end);
×
103
    }
104
    (*ppRes) = pInfo->pDelRes;
×
105
    return code;
×
106
  }
107

108
  doBuildStreamIntervalResult(pOperator, pInfo->streamAggSup.pState, pInfo->binfo.pRes, &pInfo->groupResInfo,
×
109
                              addNotifyEvent ? pNotifySup->pSessionKeys : NULL);
110
  if (pInfo->binfo.pRes->info.rows != 0) {
×
111
    printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
×
112
    if (addNotifyEvent) {
×
113
      code = addAggResultNotifyEvent(pInfo->binfo.pRes, pNotifySup->pSessionKeys,
×
114
                                     pTaskInfo->streamInfo.notifyResultSchema, pNotifySup, pNotifyEventStat);
×
115
      QUERY_CHECK_CODE(code, lino, _end);
×
116
    }
117
    (*ppRes) = pInfo->binfo.pRes;
×
118
    goto _end;
×
119
  }
120

121
  code = buildNotifyEventBlock(pTaskInfo, pNotifySup, pNotifyEventStat);
×
122
  QUERY_CHECK_CODE(code, lino, _end);
×
123
  if (pNotifySup->pEventBlock && pNotifySup->pEventBlock->info.rows > 0) {
×
124
    printDataBlock(pNotifySup->pEventBlock, getStreamOpName(opType), GET_TASKID(pTaskInfo));
×
125
    (*ppRes) = pNotifySup->pEventBlock;
×
126
    return code;
×
127
  }
128

129
  code = removeOutdatedNotifyEvents(&pInfo->twAggSup, pNotifySup, pNotifyEventStat);
×
130
  QUERY_CHECK_CODE(code, lino, _end);
×
131

132
_end:
×
133
  if (code != TSDB_CODE_SUCCESS) {
×
134
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
135
  }
136
  return code;
×
137
}
138

139
// static void doStreamIntervalSliceSaveCheckpoint(SOperatorInfo* pOperator) {
140
// }
141

142
void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pPoint) {
×
143
  pPoint->winKey.groupId = groupId;
×
144
  pPoint->winKey.win = *pTWin;
×
145
  pPoint->pFinished = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
×
146
  pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool));
×
147
}
×
148

149
int32_t getIntervalSlicePrevStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, SWinKey* pCurKey,
×
150
                                     SInervalSlicePoint* pPrevPoint) {
151
  int32_t code = TSDB_CODE_SUCCESS;
×
152
  int32_t lino = 0;
×
153
  SWinKey prevKey = {.groupId = pCurKey->groupId};
×
154
  SET_WIN_KEY_INVALID(prevKey.ts);
×
155
  int32_t prevVLen = 0;
×
156
  int32_t prevWinCode = TSDB_CODE_SUCCESS;
×
157
  code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, pCurKey, &prevKey, (void**)&pPrevPoint->pResPos,
×
158
                                                &prevVLen, &prevWinCode);
159
  QUERY_CHECK_CODE(code, lino, _end);
×
160

161
  if (prevWinCode == TSDB_CODE_SUCCESS) {
×
162
    STimeWindow prevSTW = {.skey = prevKey.ts};
×
163
    prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
×
164
    initIntervalSlicePoint(pAggSup, &prevSTW, pCurKey->groupId, pPrevPoint);
×
165
    qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
×
166
           pPrevPoint->winKey.win.skey, pPrevPoint->winKey.groupId, prevWinCode);
167
  } else {
168
    SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
×
169
    SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
×
170
  }
171

172
_end:
×
173
  if (code != TSDB_CODE_SUCCESS) {
×
174
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
175
  }
176
  return code;
×
177
}
178

179
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId,
×
180
                                           SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) {
181
  int32_t code = TSDB_CODE_SUCCESS;
×
182
  int32_t lino = 0;
×
183
  SWinKey curKey = {.ts = pTWin->skey, .groupId = groupId};
×
184
  int32_t curVLen = 0;
×
185
  code = pAggSup->stateStore.streamStateAddIfNotExist(pAggSup->pState, &curKey, (void**)&pCurPoint->pResPos,
×
186
                                                      &curVLen, pWinCode);
187
  QUERY_CHECK_CODE(code, lino, _end);
×
188

189
  qDebug("===stream=== set stream twa cur point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
×
190
         curKey.ts, curKey.groupId, *pWinCode);
191

192
  initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint);
×
193

194
  if (needPrev) {
×
195
    code = getIntervalSlicePrevStateBuf(pAggSup, pInterval, &curKey, pPrevPoint);
×
196
    QUERY_CHECK_CODE(code, lino, _end);
×
197
  }
198

199
_end:
×
200
  if (code != TSDB_CODE_SUCCESS) {
×
201
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
202
  }
203
  return code;
×
204
}
205

206
void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock,
×
207
                                int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) {
208
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
209
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
×
210
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
×
211
      pCtx[k].start.key = INT64_MIN;
×
212
      continue;
×
213
    }
214

215
    SFunctParam*     pParam = &pCtx[k].param[0];
×
216
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId);
×
217

218
    double           prevVal = 0, curVal = 0, winVal = 0;
×
219
    SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo);
×
220
    GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData);
×
221
    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
×
222

223
    SPoint point1 = (SPoint){.key = pPrevWinVal->key, .val = &prevVal};
×
224
    SPoint point2 = (SPoint){.key = curTs, .val = &curVal};
×
225
    SPoint point = (SPoint){.key = winKey, .val = &winVal};
×
226

227
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
×
228
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
×
229
    }
230

231
    if (type == INTERVAL_SLICE_START) {
×
232
      pCtx[k].start.key = point.key;
×
233
      pCtx[k].start.val = winVal;
×
234
    } else {
235
      pCtx[k].end.key = point.key;
×
236
      pCtx[k].end.val = winVal;
×
237
    }
238
  }
239
}
×
240

241
void doSetElapsedEndKey(TSKEY winKey, SExprSupp* pSup) {
×
242
  SqlFunctionCtx* pCtx = pSup->pCtx;
×
243
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
×
244
    if (fmIsElapsedFunc(pCtx[k].functionId)) {
×
245
      pCtx[k].end.key = winKey;
×
246
      pCtx[k].end.val = 0;
×
247
    }
248
  }
249
}
×
250

251
static void resetIntervalSliceFunctionKey(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
×
252
  for (int32_t k = 0; k < numOfOutput; ++k) {
×
253
    pCtx[k].start.key = INT64_MIN;
×
254
    pCtx[k].end.key = INT64_MIN;
×
255
  }
256
}
×
257

258
int32_t setIntervalSliceOutputBuf(SInervalSlicePoint* pPoint, SqlFunctionCtx* pCtx, int32_t numOfOutput,
×
259
                                  int32_t* rowEntryInfoOffset) {
260
  int32_t     code = TSDB_CODE_SUCCESS;
×
261
  int32_t     lino = 0;
×
262
  SResultRow* res = pPoint->pResPos->pRowBuff;
×
263

264
  // set time window for current result
265
  res->win = pPoint->winKey.win;
×
266
  code = setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
×
267
  QUERY_CHECK_CODE(code, lino, _end);
×
268

269
_end:
×
270
  if (code != TSDB_CODE_SUCCESS) {
×
271
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
272
  }
273
  return code;
×
274
}
275

276
static void setInterpoWindowFinished(SInervalSlicePoint* pPoint) {
×
277
  (*pPoint->pFinished) = true;
×
278
}
×
279

280
static bool isInterpoWindowFinished(SInervalSlicePoint* pPoint) {
×
281
  return *pPoint->pFinished;
×
282
}
283

284
static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap,
×
285
                                            SSHashObj* pDeletedMap) {
286
  int32_t                           code = TSDB_CODE_SUCCESS;
×
287
  int32_t                           lino = 0;
×
288
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
×
289
  SResultRowInfo*                   pResultRowInfo = &(pInfo->binfo.resultRowInfo);
×
290
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
×
291
  SExprSupp*                        pSup = &pOperator->exprSupp;
×
292
  int32_t                           numOfOutput = pSup->numOfExprs;
×
293
  TSKEY*                            tsCols = NULL;
×
294
  int64_t                           groupId = pBlock->info.id.groupId;
×
295
  SResultRow*                       pResult = NULL;
×
296
  int32_t                           forwardRows = 0;
×
297

298
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
×
299
  tsCols = (int64_t*)pColDataInfo->pData;
×
300

301
  int32_t     startPos = 0;
×
302
  TSKEY       curTs = getStartTsKey(&pBlock->info.window, tsCols);
×
303
  SInervalSlicePoint curPoint = {0};
×
304
  SInervalSlicePoint prevPoint = {0};
×
305
  STimeWindow curWin =
306
      getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC);
×
307
  while (1) {
×
308
    int32_t winCode = TSDB_CODE_SUCCESS;
×
309
    if (curTs <= pInfo->endTs) {
×
310
      code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode);
×
311
      QUERY_CHECK_CODE(code, lino, _end);
×
312
    } else if (pInfo->hasInterpoFunc) {
×
313
      SWinKey curKey = {.ts = curWin.skey, .groupId = groupId};
×
314
      code = getIntervalSlicePrevStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curKey, &prevPoint);
×
315
      QUERY_CHECK_CODE(code, lino, _end);
×
316
    }
317

318
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && isInterpoWindowFinished(&prevPoint) == false) {
×
319
      code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
×
320
      QUERY_CHECK_CODE(code, lino, _end);
×
321

322
      resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
×
323
      doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp);
×
324
      doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo);
×
325
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
×
326
      code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
×
327
                                             0, pBlock->info.rows, numOfOutput);
×
328
      QUERY_CHECK_CODE(code, lino, _end);
×
329
      SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId};
×
330
      code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
×
331
      QUERY_CHECK_CODE(code, lino, _end);
×
332
      setInterpoWindowFinished(&prevPoint);
×
333
    } else if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey)) {
×
334
      releaseOutputBuf(pInfo->streamAggSup.pState, prevPoint.pResPos, &pInfo->streamAggSup.stateStore);
×
335
    }
336

337
    if (curTs > pInfo->endTs) {
×
338
      break;
×
339
    }
340

341
    code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
×
342
    QUERY_CHECK_CODE(code, lino, _end);
×
343

344
    if (winCode != TSDB_CODE_SUCCESS && IS_NORMAL_INTERVAL_OP(pOperator) &&
×
345
        BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN)) {
×
346
      SSessionKey key = {.win = curWin, .groupId = groupId};
×
347
      code = addIntervalAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &key, &pInfo->basic.notifyEventSup,
×
348
                                       pTaskInfo->streamInfo.pNotifyEventStat);
349
      QUERY_CHECK_CODE(code, lino, _end);
×
350
    }
351

352
    resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
×
353
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
×
354
      doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo);
×
355
    }
356
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
×
357
                                           TSDB_ORDER_ASC);
358
    int32_t prevEndPos = (forwardRows - 1) + startPos;
×
359
    if (pInfo->hasInterpoFunc) {
×
360
      int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
×
361
      TSKEY endRowTs = tsCols[endRowId];
×
362
      transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
×
363
    }
364
    SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId};
×
365
    if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
×
366
      code = tSimpleHashPut(pDeletedMap, &curKey, sizeof(SWinKey), NULL, 0);
×
367
      QUERY_CHECK_CODE(code, lino, _end);
×
368
    }
369

370
    code = saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap);
×
371
    QUERY_CHECK_CODE(code, lino, _end);
×
372

373
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
×
374
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
×
375
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
376
    QUERY_CHECK_CODE(code, lino, _end);
×
377

378
    if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
×
379
      setInterpoWindowFinished(&curPoint);
×
380
    }
381

382
    startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
×
383
    if (startPos < 0) {
×
384
      break;
×
385
    }
386
    curTs = tsCols[startPos];
×
387
  }
388

389
_end:
×
390
  if (code != TSDB_CODE_SUCCESS) {
×
391
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
392
  }
393
  return code;
×
394
}
395

396
static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
397
  int32_t                           code = TSDB_CODE_SUCCESS;
×
398
  int32_t                           lino = 0;
×
399
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
×
400
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
×
401
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
×
402

403
  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
×
404

405
  if (pOperator->status == OP_EXEC_DONE) {
×
406
    (*ppRes) = NULL;
×
407
    goto _end;
×
408
  }
409

410
  if (pOperator->status == OP_RES_TO_RETURN) {
×
411
    SSDataBlock* resBlock = NULL;
×
412
    code = buildIntervalSliceResult(pOperator, &resBlock);
×
413
    QUERY_CHECK_CODE(code, lino, _end);
×
414
    if (resBlock != NULL) {
×
415
      (*ppRes) = resBlock;
×
416
      return code;
×
417
    }
418

419
    if (pInfo->recvCkBlock) {
×
420
      pInfo->recvCkBlock = false;
×
421
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
422
      (*ppRes) = pInfo->pCheckpointRes;
×
423
      return code;
×
424
    } 
425

426
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
×
427
    setStreamOperatorCompleted(pOperator);
×
428
    (*ppRes) = NULL;
×
429
    return code;
×
430
  }
431

432
  SOperatorInfo* downstream = pOperator->pDownstream[0];
×
433
  int32_t numOfDatapack = 0;
×
434

435
  while (1) {
×
436
    SSDataBlock* pBlock = NULL;
×
437
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
×
438
    QUERY_CHECK_CODE(code, lino, _end);
×
439

440
    if (pBlock == NULL) {
×
441
      pOperator->status = OP_RES_TO_RETURN;
×
442
      break;
×
443
    }
444

445
    switch (pBlock->info.type) {
×
446
      case STREAM_NORMAL:
×
447
      case STREAM_INVALID: {
448
        SExprSupp* pExprSup = &pInfo->scalarSup;
×
449
        if (pExprSup->pExprInfo != NULL) {
×
450
          code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
×
451
          QUERY_CHECK_CODE(code, lino, _end);
×
452
        }
453
      } break;
×
454
      case STREAM_CHECKPOINT: {
×
455
        pInfo->recvCkBlock = true;
×
456
        pAggSup->stateStore.streamStateCommit(pAggSup->pState);
×
457
        code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
×
458
        QUERY_CHECK_CODE(code, lino, _end);
×
459
        continue;
×
460
      } break;
461
      case STREAM_CREATE_CHILD_TABLE: {
×
462
        (*ppRes) = pBlock;
×
463
        goto _end;
×
464
      } break;
465
      case STREAM_GET_RESULT: {
×
466
        pInfo->endTs = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
×
467
        if (pInfo->hasFill) {
×
468
          (*ppRes) = pBlock;
×
469
          goto _end;
×
470
        } else {
471
          continue;
×
472
        }
473
      }
474
      default:
×
475
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
476
        QUERY_CHECK_CODE(code, lino, _end);
×
477
    }
478

479
    code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
×
480
    QUERY_CHECK_CODE(code, lino, _end);
×
481
    code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, NULL);
×
482
    QUERY_CHECK_CODE(code, lino, _end);
×
483
    
484
  }
485

486
  if (!pInfo->destHasPrimaryKey) {
×
487
    removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
×
488
  }
489

490
  if (pInfo->destHasPrimaryKey) {
×
491
    code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
×
492
    QUERY_CHECK_CODE(code, lino, _end);
×
493
  }
494

495
  code = copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl);
×
496
  QUERY_CHECK_CODE(code, lino, _end);
×
497

498
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
499
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
×
500
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
×
501

502
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
×
503
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
×
504
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
×
505

506
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
×
507
  QUERY_CHECK_CODE(code, lino, _end);
×
508

509
  (*ppRes) = NULL;
×
510
  code = buildIntervalSliceResult(pOperator, ppRes);
×
511
  QUERY_CHECK_CODE(code, lino, _end);
×
512

513
  if ((*ppRes) == NULL) {
×
514
    if (pInfo->recvCkBlock) {
×
515
      pInfo->recvCkBlock = false;
×
516
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
517
      (*ppRes) = pInfo->pCheckpointRes;
×
518
      return code;
×
519
    } 
520
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
×
521
    setStreamOperatorCompleted(pOperator);
×
522
  }
523

524
_end:
×
525
  if (code != TSDB_CODE_SUCCESS) {
×
526
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
527
  }
528
  return code;
×
529
}
530

531
int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
×
532
                                    int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
533
                                    SInterval* pInterval, bool hasInterpoFunc) {
534
  SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
×
535
  int32_t        code = TSDB_CODE_SUCCESS;
×
536
  int32_t        lino = 0;
×
537
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
×
538
    SStreamPartitionOperatorInfo* pPartionInfo = downstream->info;
×
539
    pPartionInfo->tsColIndex = tsColIndex;
×
540
    pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex;
×
541
  }
542

543
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
×
544
    code =
545
        initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval, hasInterpoFunc);
×
546
    return code;
×
547
  }
548
  SStreamScanInfo* pScanInfo = downstream->info;
×
549
  pScanInfo->useGetResultRange = hasInterpoFunc;
×
550
  pScanInfo->igCheckUpdate = true;
×
551
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
×
552
  pScanInfo->pState = pAggSup->pState;
×
553
  if (!pScanInfo->pUpdateInfo) {
×
554
    code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
×
555
                                              pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
×
556
                                              &pScanInfo->pUpdateInfo);
×
557
    QUERY_CHECK_CODE(code, lino, _end);
×
558
  }
559
  pScanInfo->twAggSup = *pTwSup;
×
560
  pScanInfo->interval = *pInterval;
×
561
  pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
×
562
  if (!hasSrcPrimaryKeyCol(pBasic)) {
×
563
    pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
×
564
  }
565

566
_end:
×
567
  if (code != TSDB_CODE_SUCCESS) {
×
568
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
569
  }
570
  return code;
×
571
}
572

573
static bool windowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols) {
×
574
  bool  needed = false;
×
575
  for (int32_t i = 0; i < numOfCols; ++i) {
×
576
    SExprInfo* pExpr = pCtx[i].pExpr;
×
577
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
×
578
      needed = true;
×
579
      break;
×
580
    }
581
  }
582
  return needed;
×
583
}
584

585
int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
586
                                              SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
587
  int32_t                           code = TSDB_CODE_SUCCESS;
×
588
  int32_t                           lino = 0;
×
589
  SStreamIntervalSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalSliceOperatorInfo));
×
590
  QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno);
×
591

592
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
593
  QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno)
×
594

595
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
×
596
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
×
597

598
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
×
599
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
×
600
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _error, terrno);
×
601

602
  pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn);
×
603
  QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
×
604

605
  pInfo->delIndex = 0;
×
606
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
×
607
  QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
×
608

609
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
×
610
  QUERY_CHECK_CODE(code, lino, _error);
×
611

612
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
×
613
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
614
  initBasicInfo(&pInfo->binfo, pResBlock);
×
615

616
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
×
617
  QUERY_CHECK_CODE(code, lino, _error);
×
618
  pInfo->recvCkBlock = false;
×
619

620
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
×
621
  pOperator->pTaskInfo = pTaskInfo;
×
622
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
623
  SExprSupp* pExpSup = &pOperator->exprSupp;
×
624
  int32_t    numOfExprs = 0;
×
625
  SExprInfo* pExprInfo = NULL;
×
626
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfExprs);
×
627
  QUERY_CHECK_CODE(code, lino, _error);
×
628

629
  code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
×
630
  QUERY_CHECK_CODE(code, lino, _error);
×
631

632
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
×
633
                                .sliding = pIntervalPhyNode->sliding,
×
634
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
×
635
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
×
636
                                .offset = pIntervalPhyNode->offset,
×
637
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
638
                                .timeRange = pIntervalPhyNode->timeRange};
×
639
  calcIntervalAutoOffset(&pInfo->interval);
×
640

641
  pInfo->twAggSup =
×
642
      (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
×
643
                           .calTrigger = pIntervalPhyNode->window.triggerType,
×
644
                           .maxTs = INT64_MIN,
645
                           .minTs = INT64_MAX,
646
                           .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};
×
647
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
×
648
  QUERY_CHECK_CODE(code, lino, _error);
×
649

650
  if (pIntervalPhyNode->window.pExprs != NULL) {
×
651
    int32_t    numOfScalar = 0;
×
652
    SExprInfo* pScalarExprInfo = NULL;
×
653
    code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
654
    QUERY_CHECK_CODE(code, lino, _error);
×
655

656
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
657
    QUERY_CHECK_CODE(code, lino, _error);
×
658
  }
659

660
  SSDataBlock* pDownRes = NULL;
×
661
  SColumnInfo* pPkCol = NULL;
×
662
  code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
×
663
  QUERY_CHECK_CODE(code, lino, _error);
×
664

665
  code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes);
×
666
  QUERY_CHECK_CODE(code, lino, _error);
×
667

668
  int32_t keyBytes = sizeof(TSKEY);
×
669
  keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool);
×
670
  if (pPkCol) {
×
671
    keyBytes += pPkCol->bytes;
×
672
  }
673
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0,
×
674
                                &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo),
×
675
                                &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SEARCH, 1);
676

677
  pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
×
678
  pInfo->pOperator = pOperator;
×
679
  pInfo->hasFill = false;
×
680
  pInfo->hasInterpoFunc = windowinterpNeeded(pExpSup->pCtx, numOfExprs);
×
681

682
  setOperatorInfo(pOperator, "StreamIntervalSliceOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, true, OP_NOT_OPENED,
×
683
                  pInfo, pTaskInfo);
684
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalSliceNext, NULL, destroyStreamIntervalSliceOperatorInfo,
×
685
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
686
  setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
×
687

688
  code = initStreamBasicInfo(&pInfo->basic, pOperator);
×
689
  QUERY_CHECK_CODE(code, lino, _error);
×
690

691
  if (downstream) {
×
692
    code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
×
693
                                       &pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc);
×
694
    QUERY_CHECK_CODE(code, lino, _error);
×
695

696
    code = appendDownstream(pOperator, &downstream, 1);
×
697
    QUERY_CHECK_CODE(code, lino, _error);
×
698
  }
699

700
  (*ppOptInfo) = pOperator;
×
701
  return code;
×
702

703
_error:
×
704
  if (code != TSDB_CODE_SUCCESS) {
×
705
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
706
  }
707
  if (pInfo != NULL) {
×
708
    destroyStreamIntervalSliceOperatorInfo(pInfo);
×
709
  }
710
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
711
  pTaskInfo->code = code;
×
712
  (*ppOptInfo) = NULL;
×
713
  return code;
×
714
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc