• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.02
/source/libs/executor/src/streamintervalsliceoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "functionMgt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "storageapi.h"
20
#include "streamexecutorInt.h"
21
#include "tcommon.h"
22
#include "tcompare.h"
23
#include "tdatablock.h"
24
#include "ttime.h"
25

26
#define STREAM_INTERVAL_SLICE_OP_CHECKPOINT_NAME "StreamIntervalSliceOperator_Checkpoint"
27

28
typedef struct SInervalSlicePoint {
29
  SSessionKey      winKey;
30
  bool             *pFinished;
31
  SSliceRowData*   pLastRow;
32
  SRowBuffPos*     pResPos;
33
} SInervalSlicePoint;
34

35
/* Selects which window edge an interpolated point is written to. */
typedef enum SIntervalSliceType {
  INTERVAL_SLICE_START = 1,  // result goes to the function ctx `start` point
  INTERVAL_SLICE_END = 2,    // result goes to the function ctx `end` point
} SIntervalSliceType;
39

40
/* State-release hook registered for this operator; it currently does nothing. */
void streamIntervalSliceReleaseState(SOperatorInfo* pOperator) {
  // no state to release
}
×
42

43
/* State-reload hook registered for this operator; it currently does nothing. */
void streamIntervalSliceReloadState(SOperatorInfo* pOperator) {
  // no state to reload
}
×
45

46
void destroyStreamIntervalSliceOperatorInfo(void* param) {
70✔
47
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)param;
70✔
48
  if (param == NULL) {
70!
49
    return;
×
50
  }
51
  cleanupBasicInfo(&pInfo->binfo);
70✔
52
  if (pInfo->pOperator) {
70!
53
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
70✔
54
                              &pInfo->groupResInfo);
55
    pInfo->pOperator = NULL;
70✔
56
  }
57

58
  destroyStreamBasicInfo(&pInfo->basic);
70✔
59
  clearGroupResInfo(&pInfo->groupResInfo);
70✔
60
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
70✔
61
  pInfo->pUpdated = NULL;
70✔
62

63
  if (pInfo->pUpdatedMap != NULL) {
70!
64
    tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
70✔
65
    tSimpleHashCleanup(pInfo->pUpdatedMap);
70✔
66
    pInfo->pUpdatedMap = NULL;
70✔
67
  }
68
  destroyStreamAggSupporter(&pInfo->streamAggSup);
70✔
69

70
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
70✔
71
  cleanupExprSupp(&pInfo->scalarSup);
70✔
72

73
  tSimpleHashCleanup(pInfo->pDeletedMap);
70✔
74
  taosArrayDestroy(pInfo->pDelWins);
70✔
75
  blockDataDestroy(pInfo->pDelRes);
70✔
76

77
  blockDataDestroy(pInfo->pCheckpointRes);
70✔
78
  taosMemoryFreeClear(pInfo->pOffsetInfo);
70!
79

80
  taosMemoryFreeClear(param);
70!
81
}
82

83
/*
 * Produces the next output block of the operator, if any.
 *
 * Delete results are built first; when the delete block has rows it is
 * returned. Otherwise the interval result block is built and returned when it
 * has rows. When neither has rows, *ppRes is left untouched.
 *
 * @param pOperator  the interval-slice operator
 * @param ppRes      out: set to the block that has rows
 * @return always TSDB_CODE_SUCCESS (no step in this function reports an error)
 */
static int32_t buildIntervalSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  uint16_t                          opType = pOperator->operatorType;
  SSDataBlock*                      pDelBlock = pInfo->pDelRes;
  SSDataBlock*                      pResBlock = pInfo->binfo.pRes;

  doBuildDeleteResultImpl(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, pInfo->pDelWins,
                          &pInfo->delIndex, pDelBlock);
  if (pDelBlock->info.rows != 0) {
    // delete results go out before any regular result
    printDataBlock(pDelBlock, getStreamOpName(opType), GET_TASKID(pTaskInfo));
    (*ppRes) = pDelBlock;
    return TSDB_CODE_SUCCESS;
  }

  doBuildStreamIntervalResult(pOperator, pInfo->streamAggSup.pState, pResBlock, &pInfo->groupResInfo);
  if (pResBlock->info.rows != 0) {
    printDataBlock(pResBlock, getStreamOpName(opType), GET_TASKID(pTaskInfo));
    (*ppRes) = pResBlock;
  }

  return TSDB_CODE_SUCCESS;
}
113

114
// static void doStreamIntervalSliceSaveCheckpoint(SOperatorInfo* pOperator) {
115
// }
116

117
/*
 * Fills a slice point for the window `pTWin` of group `groupId`.
 *
 * pPoint->pResPos must already be set: the finished flag and the last-row
 * data are addressed inside pResPos->pRowBuff, at offset
 * (resultRowSize - stateKeySize) and directly after the flag respectively.
 */
void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pPoint) {
  // locate the trailing flag / last-row area of the row buffer
  pPoint->pFinished = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
  pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool));

  // record the identity of the window
  pPoint->winKey.win = *pTWin;
  pPoint->winKey.groupId = groupId;
}
273✔
123

124
int32_t getIntervalSlicePrevStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, SWinKey* pCurKey,
107✔
125
                                     SInervalSlicePoint* pPrevPoint) {
126
  int32_t code = TSDB_CODE_SUCCESS;
107✔
127
  int32_t lino = 0;
107✔
128
  SWinKey prevKey = {.groupId = pCurKey->groupId};
107✔
129
  SET_WIN_KEY_INVALID(prevKey.ts);
107✔
130
  int32_t prevVLen = 0;
107✔
131
  int32_t prevWinCode = TSDB_CODE_SUCCESS;
107✔
132
  code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, pCurKey, &prevKey, (void**)&pPrevPoint->pResPos,
107✔
133
                                                &prevVLen, &prevWinCode);
134
  QUERY_CHECK_CODE(code, lino, _end);
107!
135

136
  if (prevWinCode == TSDB_CODE_SUCCESS) {
107✔
137
    STimeWindow prevSTW = {.skey = prevKey.ts};
79✔
138
    prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
79✔
139
    initIntervalSlicePoint(pAggSup, &prevSTW, pCurKey->groupId, pPrevPoint);
79✔
140
    qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
79!
141
           pPrevPoint->winKey.win.skey, pPrevPoint->winKey.groupId, prevWinCode);
142
  } else {
143
    SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
28✔
144
    SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
28✔
145
  }
146

147
_end:
107✔
148
  if (code != TSDB_CODE_SUCCESS) {
107!
149
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
150
  }
151
  return code;
107✔
152
}
153

154
/*
 * Fetches (creating it when absent) the state buffer of window `pTWin` for
 * `groupId` and initialises pCurPoint for it. When `needPrev` is set, the
 * preceding window is looked up as well and stored in pPrevPoint.
 *
 * @param pWinCode  out: result code filled by streamStateAddIfNotExist
 * @return TSDB_CODE_SUCCESS, or the first error reported by a callee
 */
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId,
                                           SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t valueLen = 0;
  SWinKey key = {.ts = pTWin->skey, .groupId = groupId};

  code = pAggSup->stateStore.streamStateAddIfNotExist(pAggSup->pState, &key, (void**)&pCurPoint->pResPos, &valueLen,
                                                      pWinCode);
  QUERY_CHECK_CODE(code, lino, _end);

  qDebug("===stream=== set stream twa cur point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
         key.ts, key.groupId, *pWinCode);

  initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint);

  if (!needPrev) {
    goto _end;
  }
  code = getIntervalSlicePrevStateBuf(pAggSup, pInterval, &key, pPrevPoint);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
180

181
/*
 * Computes, for every interval-interpolation function in pSup, the value at
 * the window boundary `winKey` and stores it in the function context.
 *
 * The boundary point is derived from two samples: the saved row of the
 * previous window (pPrevWinVal) and row `curRowIndex` of pDataBlock at time
 * `curTs`. Elapsed functions skip the linear interpolation, so their boundary
 * value stays 0. Functions that are not interpolation functions only get
 * start.key reset to INT64_MIN.
 *
 * @param type  INTERVAL_SLICE_START writes ctx.start, anything else ctx.end
 */
void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock,
                                int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];
    if (!fmIsIntervalInterpoFunc(pFunc->functionId)) {
      pFunc->start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pFunc->param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId);
    SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo);

    // both samples are converted to double before interpolating
    double prevVal = 0;
    double curVal = 0;
    double winVal = 0;
    GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData);
    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));

    SPoint prevSample = {.key = pPrevWinVal->key, .val = &prevVal};
    SPoint curSample = {.key = curTs, .val = &curVal};
    SPoint boundary = {.key = winKey, .val = &winVal};

    if (!fmIsElapsedFunc(pFunc->functionId)) {
      taosGetLinearInterpolationVal(&boundary, TSDB_DATA_TYPE_DOUBLE, &prevSample, &curSample, TSDB_DATA_TYPE_DOUBLE);
    }

    if (type == INTERVAL_SLICE_START) {
      pFunc->start.key = boundary.key;
      pFunc->start.val = winVal;
    } else {
      pFunc->end.key = boundary.key;
      pFunc->end.val = winVal;
    }
  }
}
158✔
215

216
/*
 * For every elapsed function in pSup, sets the ctx end point to
 * (key = winKey, val = 0). Other functions are left untouched.
 */
void doSetElapsedEndKey(TSKEY winKey, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;
  int32_t         numOfFuncs = pSup->numOfExprs;

  for (int32_t idx = 0; idx < numOfFuncs; ++idx) {
    if (!fmIsElapsedFunc(pCtx[idx].functionId)) {
      continue;
    }
    pCtx[idx].end.key = winKey;
    pCtx[idx].end.val = 0;
  }
}
79✔
225

226
/* Resets the start and end keys of the first `numOfOutput` function contexts to INT64_MIN. */
static void resetIntervalSliceFunctionKey(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pFunc = &pCtx[idx];
    pFunc->start.key = INT64_MIN;
    pFunc->end.key = INT64_MIN;
  }
}
273✔
232

233
/*
 * Makes the row buffer of `pPoint` the output target of the function
 * contexts: stamps the window on the result row, then hands the row to
 * setResultRowInitCtx.
 *
 * @return TSDB_CODE_SUCCESS, or the error from setResultRowInitCtx
 */
int32_t setIntervalSliceOutputBuf(SInervalSlicePoint* pPoint, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                                  int32_t* rowEntryInfoOffset) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SResultRow* pRow = pPoint->pResPos->pRowBuff;

  // the result row carries the time window it belongs to
  pRow->win = pPoint->winKey.win;

  code = setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
250

251
/* Sets the finished flag stored in the row buffer of `pPoint`. */
static void setInterpoWindowFinished(SInervalSlicePoint* pPoint) {
  bool* pFlag = pPoint->pFinished;
  *pFlag = true;
}
79✔
254

255
/* Reads the finished flag stored in the row buffer of `pPoint`. */
static bool isInterpoWindowFinished(SInervalSlicePoint* pPoint) {
  bool finished = *pPoint->pFinished;
  return finished;
}
258

259
/*
 * Aggregates one input block into interval-slice windows.
 *
 * Starting from the first timestamp of the block, each iteration:
 *   1. obtains the state buffer of the current window (and, when interpolation
 *      functions are present, of the previous window);
 *   2. if the previous window exists and is not yet finished, closes it by
 *      interpolating its end boundary, applying the functions and saving it;
 *      otherwise releases the previous buffer;
 *   3. applies the functions to the rows of the current window and saves it;
 *   4. advances to the next window that has rows in the block.
 * The loop stops when the block is exhausted or the current timestamp is past
 * pInfo->endTs.
 *
 * @param pOperator    the interval-slice operator
 * @param pBlock       input data block (its group id is used for all windows)
 * @param pUpdatedMap  not read here; windows are saved into pInfo->pUpdatedMap
 * @param pDeletedMap  receives the key of already-existing windows when
 *                     pInfo->destHasPrimaryKey is set
 * @return TSDB_CODE_SUCCESS, or the first error reported by a callee
 */
static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap,
                                            SSHashObj* pDeletedMap) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
  SResultRowInfo*                   pResultRowInfo = &(pInfo->binfo.resultRowInfo);
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pSup = &pOperator->exprSupp;
  int32_t                           numOfOutput = pSup->numOfExprs;
  TSKEY*                            tsCols = NULL;
  int64_t                           groupId = pBlock->info.id.groupId;
  int32_t                           forwardRows = 0;
  int32_t                           startPos = 0;
  TSKEY                             curTs = 0;
  STimeWindow                       curWin = {0};
  SInervalSlicePoint                curPoint = {0};
  SInervalSlicePoint                prevPoint = {0};

  // The timestamp column must exist before its data pointer is read.
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  QUERY_CHECK_NULL(pColDataInfo, code, lino, _end, terrno);
  tsCols = (int64_t*)pColDataInfo->pData;

  curTs = getStartTsKey(&pBlock->info.window, tsCols);
  curWin = getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC);

  while (1) {
    int32_t winCode = TSDB_CODE_SUCCESS;
    if (curTs <= pInfo->endTs) {
      code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId,
                                         &curPoint, &prevPoint, &winCode);
      QUERY_CHECK_CODE(code, lino, _end);
    } else if (pInfo->hasInterpoFunc) {
      // past endTs: only the previous window is still needed, to close it
      SWinKey curKey = {.ts = curWin.skey, .groupId = groupId};
      code = getIntervalSlicePrevStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curKey, &prevPoint);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // NOTE(review): prevPoint starts zero-initialised and is only refreshed when
    // hasInterpoFunc is set, so without interpolation functions the validity
    // test below sees skey == 0 and pResPos == NULL. Confirm that
    // IS_VALID_WIN_KEY / releaseOutputBuf are meant to handle that case.
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && isInterpoWindowFinished(&prevPoint) == false) {
      code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
      QUERY_CHECK_CODE(code, lino, _end);

      resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
      doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp);
      doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos,
                                 &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo);
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
      // zero forward rows: only the interpolated end point contributes here
      code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
                                             0, pBlock->info.rows, numOfOutput);
      QUERY_CHECK_CODE(code, lino, _end);
      SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId};
      code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
      QUERY_CHECK_CODE(code, lino, _end);
      setInterpoWindowFinished(&prevPoint);
    } else if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey)) {
      releaseOutputBuf(pInfo->streamAggSup.pState, prevPoint.pResPos, &pInfo->streamAggSup.stateStore);
    }

    if (curTs > pInfo->endTs) {
      break;
    }

    code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
    QUERY_CHECK_CODE(code, lino, _end);

    resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
      // the first row is not on the window start: interpolate the start boundary
      doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos,
                                 &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo);
    }
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
                                           TSDB_ORDER_ASC);
    int32_t prevEndPos = (forwardRows - 1) + startPos;
    if (pInfo->hasInterpoFunc) {
      // remember the last qualified row of this window for later interpolation
      int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
      TSKEY   endRowTs = tsCols[endRowId];
      transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
    }
    SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId};
    if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
      code = tSimpleHashPut(pDeletedMap, &curKey, sizeof(SWinKey), NULL, 0);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap);
    QUERY_CHECK_CODE(code, lino, _end);

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    QUERY_CHECK_CODE(code, lino, _end);

    if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
      // a row sits exactly on the window end, no end interpolation is needed
      setInterpoWindowFinished(&curPoint);
    }

    startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
    if (startPos < 0) {
      break;
    }
    curTs = tsCols[startPos];
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
362

363
/*
 * getNext callback of the stream interval-slice operator.
 *
 * - OP_EXEC_DONE: yields NULL.
 * - OP_RES_TO_RETURN: keeps returning pending result blocks, then a pending
 *   checkpoint block, and finally marks the operator completed.
 * - otherwise: drains the downstream operator, aggregating data blocks and
 *   handling control blocks, then builds the first result block.
 *
 * @param pOperator  this operator
 * @param ppRes      out: next block to emit, or NULL when there is none
 * @return TSDB_CODE_SUCCESS, or the first error reported by a callee
 */
static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;

  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    goto _end;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    SSDataBlock* resBlock = NULL;
    code = buildIntervalSliceResult(pOperator, &resBlock);
    QUERY_CHECK_CODE(code, lino, _end);
    if (resBlock != NULL) {
      (*ppRes) = resBlock;
      return code;
    }

    // results exhausted: forward the checkpoint block received earlier, if any
    if (pInfo->recvCkBlock) {
      pInfo->recvCkBlock = false;
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pCheckpointRes;
      return code;
    }

    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
    setStreamOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    return code;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  int32_t        numOfDatapack = 0;

  while (1) {
    SSDataBlock* pBlock = NULL;
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      pOperator->status = OP_RES_TO_RETURN;
      break;
    }

    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_INVALID: {
        // data block: evaluate scalar expressions in place before aggregating
        SExprSupp* pExprSup = &pInfo->scalarSup;
        if (pExprSup->pExprInfo != NULL) {
          code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      } break;
      case STREAM_CHECKPOINT: {
        // commit state now, emit the checkpoint block after the results
        pInfo->recvCkBlock = true;
        pAggSup->stateStore.streamStateCommit(pAggSup->pState);
        code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      case STREAM_CREATE_CHILD_TABLE: {
        // passed through unchanged
        (*ppRes) = pBlock;
        goto _end;
      }
      case STREAM_GET_RESULT: {
        pInfo->endTs = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
        if (pInfo->hasFill) {
          (*ppRes) = pBlock;
          goto _end;
        } else {
          continue;
        }
      }
      default:
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        QUERY_CHECK_CODE(code, lino, _end);
    }

    code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _end);
    // The deleted-key map must be the operator's own map: the aggregation
    // inserts into it when destHasPrimaryKey is set, and copyIntervalDeleteKey
    // below reads pInfo->pDeletedMap.
    code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, pInfo->pDeletedMap);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (!pInfo->destHasPrimaryKey) {
    removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
  }

  if (pInfo->destHasPrimaryKey) {
    code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl);
  QUERY_CHECK_CODE(code, lino, _end);

  // a fresh map and list are installed for the next round
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);

  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);

  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  (*ppRes) = NULL;
  code = buildIntervalSliceResult(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if ((*ppRes) == NULL) {
    if (pInfo->recvCkBlock) {
      pInfo->recvCkBlock = false;
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pCheckpointRes;
      return code;
    }
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
    setStreamOperatorCompleted(pOperator);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
497

498
int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
86✔
499
                                    int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
500
                                    SInterval* pInterval, bool hasInterpoFunc) {
501
  SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
86✔
502
  int32_t        code = TSDB_CODE_SUCCESS;
86✔
503
  int32_t        lino = 0;
86✔
504
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
86✔
505
    SStreamPartitionOperatorInfo* pPartionInfo = downstream->info;
16✔
506
    pPartionInfo->tsColIndex = tsColIndex;
16✔
507
    pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex;
16✔
508
  }
509

510
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
86✔
511
    code =
512
        initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval, hasInterpoFunc);
16✔
513
    return code;
16✔
514
  }
515
  SStreamScanInfo* pScanInfo = downstream->info;
70✔
516
  pScanInfo->useGetResultRange = hasInterpoFunc;
70✔
517
  pScanInfo->igCheckUpdate = true;
70✔
518
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
70✔
519
  pScanInfo->pState = pAggSup->pState;
70✔
520
  if (!pScanInfo->pUpdateInfo) {
70✔
521
    code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
52✔
522
                                              pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
52✔
523
                                              &pScanInfo->pUpdateInfo);
52✔
524
    QUERY_CHECK_CODE(code, lino, _end);
52!
525
  }
526
  pScanInfo->twAggSup = *pTwSup;
70✔
527
  pScanInfo->interval = *pInterval;
70✔
528
  pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
70✔
529
  if (!hasSrcPrimaryKeyCol(pBasic)) {
70!
530
    pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
70✔
531
  }
532

533
_end:
×
534
  if (code != TSDB_CODE_SUCCESS) {
70!
535
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
536
  }
537
  return code;
70✔
538
}
539

540
/* Returns true when at least one of the first `numOfCols` contexts is an interval-interpolation function. */
static bool windowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols) {
  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    if (fmIsIntervalInterpoFunc(pCtx[idx].functionId)) {
      return true;
    }
  }
  return false;
}
551

552
int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
69✔
553
                                              SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
554
  int32_t                           code = TSDB_CODE_SUCCESS;
69✔
555
  int32_t                           lino = 0;
69✔
556
  SStreamIntervalSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalSliceOperatorInfo));
69!
557
  QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno);
70!
558

559
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
70!
560
  QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno)
70!
561

562
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
70✔
563
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
70!
564

565
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
70✔
566
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
69✔
567
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _error, terrno);
70!
568

569
  pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn);
70✔
570
  QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
69!
571

572
  pInfo->delIndex = 0;
69✔
573
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
69✔
574
  QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
69!
575

576
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
69✔
577
  QUERY_CHECK_CODE(code, lino, _error);
70!
578

579
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
70✔
580
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
70!
581
  initBasicInfo(&pInfo->binfo, pResBlock);
70✔
582

583
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
70✔
584
  QUERY_CHECK_CODE(code, lino, _error);
70!
585
  pInfo->recvCkBlock = false;
70✔
586

587
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
70✔
588
  pOperator->pTaskInfo = pTaskInfo;
70✔
589
  initResultSizeInfo(&pOperator->resultInfo, 4096);
70✔
590
  SExprSupp* pExpSup = &pOperator->exprSupp;
70✔
591
  int32_t    numOfExprs = 0;
70✔
592
  SExprInfo* pExprInfo = NULL;
70✔
593
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfExprs);
70✔
594
  QUERY_CHECK_CODE(code, lino, _error);
70!
595

596
  code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
70✔
597
  QUERY_CHECK_CODE(code, lino, _error);
70!
598

599
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
70✔
600
                                .sliding = pIntervalPhyNode->sliding,
70✔
601
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
70✔
602
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
70✔
603
                                .offset = pIntervalPhyNode->offset,
70✔
604
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
70✔
605
                                .timeRange = pIntervalPhyNode->timeRange};
70✔
606
  calcIntervalAutoOffset(&pInfo->interval);
70✔
607

608
  pInfo->twAggSup =
70✔
609
      (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
70✔
610
                           .calTrigger = pIntervalPhyNode->window.triggerType,
70✔
611
                           .maxTs = INT64_MIN,
612
                           .minTs = INT64_MAX,
613
                           .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};
70✔
614
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
70✔
615
  QUERY_CHECK_CODE(code, lino, _error);
70!
616

617
  if (pIntervalPhyNode->window.pExprs != NULL) {
70!
618
    int32_t    numOfScalar = 0;
×
619
    SExprInfo* pScalarExprInfo = NULL;
×
620
    code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
621
    QUERY_CHECK_CODE(code, lino, _error);
×
622

623
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
624
    QUERY_CHECK_CODE(code, lino, _error);
×
625
  }
626

627
  SSDataBlock* pDownRes = NULL;
70✔
628
  SColumnInfo* pPkCol = NULL;
70✔
629
  code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
70✔
630
  QUERY_CHECK_CODE(code, lino, _error);
70!
631

632
  code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes);
70✔
633
  QUERY_CHECK_CODE(code, lino, _error);
70!
634

635
  int32_t keyBytes = sizeof(TSKEY);
70✔
636
  keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool);
70✔
637
  if (pPkCol) {
70✔
638
    keyBytes += pPkCol->bytes;
3✔
639
  }
640
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0,
70✔
641
                                &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo),
70✔
642
                                &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SEARCH, 1);
643

644
  pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
70✔
645
  pInfo->pOperator = pOperator;
70✔
646
  pInfo->hasFill = false;
70✔
647
  pInfo->hasInterpoFunc = windowinterpNeeded(pExpSup->pCtx, numOfExprs);
70✔
648

649
  setOperatorInfo(pOperator, "StreamIntervalSliceOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, true, OP_NOT_OPENED,
70✔
650
                  pInfo, pTaskInfo);
651
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalSliceNext, NULL, destroyStreamIntervalSliceOperatorInfo,
70✔
652
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
653
  setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
70✔
654

655
  code = initStreamBasicInfo(&pInfo->basic);
70✔
656
  QUERY_CHECK_CODE(code, lino, _error);
70!
657
  if (downstream) {
70!
658
    code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
70✔
659
                                       &pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc);
70✔
660
    QUERY_CHECK_CODE(code, lino, _error);
70!
661

662
    code = appendDownstream(pOperator, &downstream, 1);
70✔
663
    QUERY_CHECK_CODE(code, lino, _error);
70!
664
  }
665

666
  (*ppOptInfo) = pOperator;
70✔
667
  return code;
70✔
668

669
_error:
×
670
  if (code != TSDB_CODE_SUCCESS) {
×
671
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
672
  }
673
  if (pInfo != NULL) {
×
674
    destroyStreamIntervalSliceOperatorInfo(pInfo);
×
675
  }
676
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
677
  pTaskInfo->code = code;
×
678
  (*ppOptInfo) = NULL;
×
679
  return code;
×
680
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc