• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3544

30 Nov 2024 03:06AM UTC coverage: 60.88% (+0.04%) from 60.842%
#3544

push

travis-ci

web-flow
Merge pull request #28988 from taosdata/main

merge: from main to 3.0 branch

120724 of 253479 branches covered (47.63%)

Branch coverage included in aggregate %.

407 of 489 new or added lines in 21 files covered. (83.23%)

1148 existing lines in 113 files now uncovered.

201919 of 276488 relevant lines covered (73.03%)

18898587.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.75
/source/libs/executor/src/streamintervalsliceoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "functionMgt.h"
17
#include "operator.h"
18
#include "querytask.h"
19
#include "storageapi.h"
20
#include "streamexecutorInt.h"
21
#include "tcommon.h"
22
#include "tcompare.h"
23
#include "tdatablock.h"
24
#include "ttime.h"
25

26
#define STREAM_INTERVAL_SLICE_OP_CHECKPOINT_NAME "StreamIntervalSliceOperator_Checkpoint"
27

28
typedef struct SInervalSlicePoint {
29
  SSessionKey      winKey;
30
  bool             *pFinished;
31
  SSliceRowData*   pLastRow;
32
  SRowBuffPos*     pResPos;
33
} SInervalSlicePoint;
34

35
typedef enum SIntervalSliceType {
36
  INTERVAL_SLICE_START = 1,
37
  INTERVAL_SLICE_END = 2,
38
} SIntervalSliceType;
39

40
void streamIntervalSliceReleaseState(SOperatorInfo* pOperator) {
×
41
}
×
42

43
void streamIntervalSliceReloadState(SOperatorInfo* pOperator) {
×
44
}
×
45

46
void destroyStreamIntervalSliceOperatorInfo(void* param) {
52✔
47
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)param;
52✔
48
  if (param == NULL) {
52!
49
    return;
×
50
  }
51
  cleanupBasicInfo(&pInfo->binfo);
52✔
52
  if (pInfo->pOperator) {
52!
53
    cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
52✔
54
                              &pInfo->groupResInfo);
55
    pInfo->pOperator = NULL;
52✔
56
  }
57

58
  clearGroupResInfo(&pInfo->groupResInfo);
52✔
59
  taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
52✔
60
  pInfo->pUpdated = NULL;
52✔
61

62
  if (pInfo->pUpdatedMap != NULL) {
52!
63
    tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
52✔
64
    tSimpleHashCleanup(pInfo->pUpdatedMap);
52✔
65
    pInfo->pUpdatedMap = NULL;
52✔
66
  }
67
  destroyStreamAggSupporter(&pInfo->streamAggSup);
52✔
68

69
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
52✔
70
  cleanupExprSupp(&pInfo->scalarSup);
52✔
71

72
  tSimpleHashCleanup(pInfo->pDeletedMap);
52✔
73
  taosArrayDestroy(pInfo->pDelWins);
52✔
74
  blockDataDestroy(pInfo->pDelRes);
52✔
75

76
  blockDataDestroy(pInfo->pCheckpointRes);
52✔
77
  taosMemoryFreeClear(pInfo->pOffsetInfo);
52!
78

79
  taosMemoryFreeClear(param);
52!
80
}
81

82
static int32_t buildIntervalSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,305✔
83
  int32_t                           code = TSDB_CODE_SUCCESS;
1,305✔
84
  int32_t                           lino = 0;
1,305✔
85
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
1,305✔
86
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
1,305✔
87
  uint16_t                          opType = pOperator->operatorType;
1,305✔
88
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
1,305✔
89

90
  doBuildDeleteResultImpl(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, pInfo->pDelWins, &pInfo->delIndex,
1,305✔
91
                          pInfo->pDelRes);
92
  if (pInfo->pDelRes->info.rows != 0) {
1,305!
93
    // process the rest of the data
94
    printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
×
95
    (*ppRes) = pInfo->pDelRes;
×
96
    return code;
×
97
  }
98

99
  doBuildStreamIntervalResult(pOperator, pInfo->streamAggSup.pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
1,305✔
100
  if (pInfo->binfo.pRes->info.rows != 0) {
1,305✔
101
    printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
152✔
102
    (*ppRes) = pInfo->binfo.pRes;
152✔
103
    goto _end;
152✔
104
  }
105

106
_end:
1,153✔
107
  if (code != TSDB_CODE_SUCCESS) {
1,305!
108
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
109
  }
110
  return code;
1,305✔
111
}
112

113
// static void doStreamIntervalSliceSaveCheckpoint(SOperatorInfo* pOperator) {
114
// }
115

116
void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pPoint) {
226✔
117
  pPoint->winKey.groupId = groupId;
226✔
118
  pPoint->winKey.win = *pTWin;
226✔
119
  pPoint->pFinished = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
226✔
120
  pPoint->pLastRow = POINTER_SHIFT(pPoint->pFinished, sizeof(bool));
226✔
121
}
226✔
122

123
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId,
152✔
124
                                           SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) {
125
  int32_t code = TSDB_CODE_SUCCESS;
152✔
126
  int32_t lino = 0;
152✔
127
  SWinKey curKey = {.ts = pTWin->skey, .groupId = groupId};
152✔
128
  int32_t curVLen = 0;
152✔
129
  code = pAggSup->stateStore.streamStateAddIfNotExist(pAggSup->pState, &curKey, (void**)&pCurPoint->pResPos,
152✔
130
                                                      &curVLen, pWinCode);
131
  QUERY_CHECK_CODE(code, lino, _end);
152!
132

133
  qDebug("===stream=== set stream twa cur point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
152!
134
         curKey.ts, curKey.groupId, *pWinCode);
135

136
  initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint);
152✔
137

138
  if (needPrev) {
152✔
139
    SWinKey prevKey = {.groupId = groupId};
97✔
140
    SET_WIN_KEY_INVALID(prevKey.ts);
97✔
141
    int32_t prevVLen = 0;
97✔
142
    int32_t prevWinCode = TSDB_CODE_SUCCESS;
97✔
143
    code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey, (void**)&pPrevPoint->pResPos,
97✔
144
                                                  &prevVLen, &prevWinCode);
145
    QUERY_CHECK_CODE(code, lino, _end);
97!
146

147
    if (prevWinCode == TSDB_CODE_SUCCESS) {
97✔
148
      STimeWindow prevSTW = {.skey = prevKey.ts};
74✔
149
      prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
74✔
150
      initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint);
74✔
151
      qDebug("===stream=== set stream twa prev point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d", pPrevPoint->winKey.win.skey,
74!
152
             pPrevPoint->winKey.groupId, prevWinCode);
153
    } else {
154
      SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
23✔
155
      SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
23✔
156
    }
157
  }
158

159
_end:
55✔
160
  if (code != TSDB_CODE_SUCCESS) {
152!
161
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
162
  }
163
  return code;
152✔
164
}
165

166
void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock,
148✔
167
                                int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type, int32_t* pOffsetInfo) {
168
  SqlFunctionCtx* pCtx = pSup->pCtx;
148✔
169
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
1,248✔
170
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
1,100✔
171
      pCtx[k].start.key = INT64_MIN;
784✔
172
      continue;
784✔
173
    }
174

175
    SFunctParam*     pParam = &pCtx[k].param[0];
316✔
176
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId);
316✔
177

178
    double           prevVal = 0, curVal = 0, winVal = 0;
316✔
179
    SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId, pOffsetInfo);
316✔
180
    GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData);
316!
181
    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
316!
182

183
    SPoint point1 = (SPoint){.key = pPrevWinVal->key, .val = &prevVal};
316✔
184
    SPoint point2 = (SPoint){.key = curTs, .val = &curVal};
316✔
185
    SPoint point = (SPoint){.key = winKey, .val = &winVal};
316✔
186

187
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
316✔
188
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
212✔
189
    }
190

191
    if (type == INTERVAL_SLICE_START) {
316✔
192
      pCtx[k].start.key = point.key;
158✔
193
      pCtx[k].start.val = winVal;
158✔
194
    } else {
195
      pCtx[k].end.key = point.key;
158✔
196
      pCtx[k].end.val = winVal;
158✔
197
    }
198
  }
199
}
148✔
200

201
void doSetElapsedEndKey(TSKEY winKey, SExprSupp* pSup) {
74✔
202
  SqlFunctionCtx* pCtx = pSup->pCtx;
74✔
203
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
624✔
204
    if (fmIsElapsedFunc(pCtx[k].functionId)) {
550✔
205
      pCtx[k].end.key = winKey;
52✔
206
      pCtx[k].end.val = 0;
52✔
207
    }
208
  }
209
}
74✔
210

211
static void resetIntervalSliceFunctionKey(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
226✔
212
  for (int32_t k = 0; k < numOfOutput; ++k) {
1,676✔
213
    pCtx[k].start.key = INT64_MIN;
1,450✔
214
    pCtx[k].end.key = INT64_MIN;
1,450✔
215
  }
216
}
226✔
217

218
int32_t setIntervalSliceOutputBuf(SInervalSlicePoint* pPoint, SqlFunctionCtx* pCtx, int32_t numOfOutput,
226✔
219
                                  int32_t* rowEntryInfoOffset) {
220
  int32_t     code = TSDB_CODE_SUCCESS;
226✔
221
  int32_t     lino = 0;
226✔
222
  SResultRow* res = pPoint->pResPos->pRowBuff;
226✔
223

224
  // set time window for current result
225
  res->win = pPoint->winKey.win;
226✔
226
  code = setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
226✔
227
  QUERY_CHECK_CODE(code, lino, _end);
226!
228

229
_end:
226✔
230
  if (code != TSDB_CODE_SUCCESS) {
226!
231
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
232
  }
233
  return code;
226✔
234
}
235

236
static void setInterpoWindowFinished(SInervalSlicePoint* pPoint) {
74✔
237
  (*pPoint->pFinished) = true;
74✔
238
}
74✔
239

240
static bool isInterpoWindowFinished(SInervalSlicePoint* pPoint) {
74✔
241
  return *pPoint->pFinished;
74✔
242
}
243

244
static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap,
152✔
245
                                            SSHashObj* pDeletedMap) {
246
  int32_t                           code = TSDB_CODE_SUCCESS;
152✔
247
  int32_t                           lino = 0;
152✔
248
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
152✔
249
  SResultRowInfo*                   pResultRowInfo = &(pInfo->binfo.resultRowInfo);
152✔
250
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
152✔
251
  SExprSupp*                        pSup = &pOperator->exprSupp;
152✔
252
  int32_t                           numOfOutput = pSup->numOfExprs;
152✔
253
  TSKEY*                            tsCols = NULL;
152✔
254
  int64_t                           groupId = pBlock->info.id.groupId;
152✔
255
  SResultRow*                       pResult = NULL;
152✔
256
  int32_t                           forwardRows = 0;
152✔
257

258
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
152✔
259
  tsCols = (int64_t*)pColDataInfo->pData;
152✔
260

261
  int32_t     startPos = 0;
152✔
262
  TSKEY       curTs = getStartTsKey(&pBlock->info.window, tsCols);
152✔
263
  SInervalSlicePoint curPoint = {0};
152✔
264
  SInervalSlicePoint prevPoint = {0};
152✔
265
  STimeWindow curWin =
266
      getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC);
152✔
267
  while (1) {
×
268
    if (curTs > pInfo->endTs) {
152!
269
      break;
152✔
270
    }
271

272
    int32_t winCode = TSDB_CODE_SUCCESS;
152✔
273
    code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode);
152✔
274
    QUERY_CHECK_CODE(code, lino, _end);
152!
275

276
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && isInterpoWindowFinished(&prevPoint) == false) {
152!
277
      code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
74✔
278
      QUERY_CHECK_CODE(code, lino, _end);
74!
279

280
      resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
74✔
281
      doSetElapsedEndKey(prevPoint.winKey.win.ekey, &pOperator->exprSupp);
74✔
282
      doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END, pInfo->pOffsetInfo);
74✔
283
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
74✔
284
      code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
74✔
285
                                             0, pBlock->info.rows, numOfOutput);
74✔
286
      QUERY_CHECK_CODE(code, lino, _end);
74!
287
      SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId};
74✔
288
      code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
74✔
289
      QUERY_CHECK_CODE(code, lino, _end);
74!
290
      setInterpoWindowFinished(&prevPoint);
74✔
291
    }
292

293
    code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
152✔
294
    QUERY_CHECK_CODE(code, lino, _end);
152!
295

296
    resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
152✔
297
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
152!
298
      doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START, pInfo->pOffsetInfo);
74✔
299
    }
300
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
152✔
301
                                           TSDB_ORDER_ASC);
302
    int32_t prevEndPos = (forwardRows - 1) + startPos;
152✔
303
    if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) {
152!
304
      int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
97✔
305
      TSKEY endRowTs = tsCols[endRowId];
97✔
306
      transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
97✔
307
    }
308
    SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId};
152✔
309
    if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
152!
310
      code = tSimpleHashPut(pDeletedMap, &curKey, sizeof(SWinKey), NULL, 0);
×
311
      QUERY_CHECK_CODE(code, lino, _end);
×
312
    }
313

314
    code = saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap);
152✔
315
    QUERY_CHECK_CODE(code, lino, _end);
152!
316

317
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
152✔
318
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
152✔
319
                                           forwardRows, pBlock->info.rows, numOfOutput);
152✔
320
    QUERY_CHECK_CODE(code, lino, _end);
152!
321

322
    if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
152!
323
      setInterpoWindowFinished(&curPoint);
×
324
    }
325

326
    startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
152✔
327
    if (startPos < 0) {
152!
328
      break;
152✔
329
    }
330
    curTs = tsCols[startPos];
×
331
  }
332

333
_end:
152✔
334
  if (code != TSDB_CODE_SUCCESS) {
152!
335
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
336
  }
337
  return code;
152✔
338
}
339

340
static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,084✔
341
  int32_t                           code = TSDB_CODE_SUCCESS;
2,084✔
342
  int32_t                           lino = 0;
2,084✔
343
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
2,084✔
344
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
2,084✔
345
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
2,084✔
346

347
  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
2,084!
348

349
  if (pOperator->status == OP_EXEC_DONE) {
2,084!
350
    (*ppRes) = NULL;
×
351
    goto _end;
×
352
  }
353

354
  if (pOperator->status == OP_RES_TO_RETURN) {
2,084✔
355
    SSDataBlock* resBlock = NULL;
168✔
356
    code = buildIntervalSliceResult(pOperator, &resBlock);
168✔
357
    QUERY_CHECK_CODE(code, lino, _end);
168!
358
    if (resBlock != NULL) {
168✔
359
      (*ppRes) = resBlock;
43✔
360
      return code;
168✔
361
    }
362

363
    if (pInfo->recvCkBlock) {
125!
NEW
364
      pInfo->recvCkBlock = false;
×
NEW
365
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
NEW
366
      (*ppRes) = pInfo->pCheckpointRes;
×
NEW
367
      return code;
×
368
    } 
369

370
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
125✔
371
    setStreamOperatorCompleted(pOperator);
125✔
372
    (*ppRes) = NULL;
125✔
373
    return code;
125✔
374
  }
375

376
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,916✔
377
  int32_t numOfDatapack = 0;
1,916✔
378

379
  while (1) {
516✔
380
    SSDataBlock* pBlock = NULL;
2,432✔
381
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
2,432✔
382
    QUERY_CHECK_CODE(code, lino, _end);
3,211!
383

384
    if (pBlock == NULL) {
2,432✔
385
      pOperator->status = OP_RES_TO_RETURN;
1,137✔
386
      break;
1,137✔
387
    }
388

389
    switch (pBlock->info.type) {
1,295!
390
      case STREAM_NORMAL:
152✔
391
      case STREAM_INVALID: {
392
        SExprSupp* pExprSup = &pInfo->scalarSup;
152✔
393
        if (pExprSup->pExprInfo != NULL) {
152!
394
          code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
×
395
          QUERY_CHECK_CODE(code, lino, _end);
×
396
        }
397
      } break;
152✔
398
      case STREAM_CHECKPOINT: {
16✔
399
        pInfo->recvCkBlock = true;
16✔
400
        pAggSup->stateStore.streamStateCommit(pAggSup->pState);
16✔
401
        code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
16✔
402
        QUERY_CHECK_CODE(code, lino, _end);
16!
403
        continue;
364✔
404
      } break;
405
      case STREAM_CREATE_CHILD_TABLE: {
6✔
406
        (*ppRes) = pBlock;
6✔
407
        goto _end;
6✔
408
      } break;
409
      case STREAM_GET_RESULT: {
1,121✔
410
        pInfo->endTs = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
1,121✔
411
        if (pInfo->hasFill) {
1,121✔
412
          (*ppRes) = pBlock;
773✔
413
          goto _end;
773✔
414
        } else {
415
          continue;
348✔
416
        }
417
      }
418
      default:
×
419
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
420
        QUERY_CHECK_CODE(code, lino, _end);
×
421
    }
422

423
    code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
152✔
424
    QUERY_CHECK_CODE(code, lino, _end);
152!
425
    code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, NULL);
152✔
426
    QUERY_CHECK_CODE(code, lino, _end);
152!
427
    
428
  }
429

430
  if (!pInfo->destHasPrimaryKey) {
1,137!
431
    removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
1,137✔
432
  }
433

434
  if (pInfo->destHasPrimaryKey) {
1,137!
435
    code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
×
436
    QUERY_CHECK_CODE(code, lino, _end);
×
437
  }
438

439
  code = copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl);
1,137✔
440
  QUERY_CHECK_CODE(code, lino, _end);
1,137!
441

442
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
1,137✔
443
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
1,137✔
444
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
1,137!
445

446
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
1,137✔
447
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
1,137✔
448
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
1,137!
449

450
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,137✔
451
  QUERY_CHECK_CODE(code, lino, _end);
1,137!
452

453
  (*ppRes) = NULL;
1,137✔
454
  code = buildIntervalSliceResult(pOperator, ppRes);
1,137✔
455
  QUERY_CHECK_CODE(code, lino, _end);
1,137!
456

457
  if ((*ppRes) == NULL) {
1,137✔
458
    if (pInfo->recvCkBlock) {
1,028✔
459
      pInfo->recvCkBlock = false;
16✔
460
      printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
16✔
461
      (*ppRes) = pInfo->pCheckpointRes;
16✔
462
      return code;
16✔
463
    } 
464
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
1,012✔
465
    setStreamOperatorCompleted(pOperator);
1,012✔
466
  }
467

468
_end:
109✔
469
  if (code != TSDB_CODE_SUCCESS) {
1,900!
470
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
471
  }
472
  return code;
1,900✔
473
}
474

475
int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
68✔
476
                                    int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
477
                                    SInterval* pInterval, bool hasInterpoFunc) {
478
  SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
68✔
479
  int32_t        code = TSDB_CODE_SUCCESS;
68✔
480
  int32_t        lino = 0;
68✔
481
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
68✔
482
    SStreamPartitionOperatorInfo* pPartionInfo = downstream->info;
16✔
483
    pPartionInfo->tsColIndex = tsColIndex;
16✔
484
    pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex;
16✔
485
  }
486

487
  if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
68✔
488
    code =
489
        initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval, hasInterpoFunc);
16✔
490
    return code;
16✔
491
  }
492
  SStreamScanInfo* pScanInfo = downstream->info;
52✔
493
  pScanInfo->useGetResultRange = hasInterpoFunc;
52✔
494
  pScanInfo->igCheckUpdate = true;
52✔
495
  pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
52✔
496
  pScanInfo->pState = pAggSup->pState;
52✔
497
  if (!pScanInfo->pUpdateInfo) {
52✔
498
    code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
36✔
499
                                              pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
36✔
500
                                              &pScanInfo->pUpdateInfo);
36✔
501
    QUERY_CHECK_CODE(code, lino, _end);
36!
502
  }
503
  pScanInfo->twAggSup = *pTwSup;
52✔
504
  pScanInfo->interval = *pInterval;
52✔
505
  pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
52✔
506
  if (!hasSrcPrimaryKeyCol(pBasic)) {
52!
507
    pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
52✔
508
  }
509

510
_end:
×
511
  if (code != TSDB_CODE_SUCCESS) {
52!
512
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
513
  }
514
  return code;
52✔
515
}
516

517
static bool windowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols) {
52✔
518
  bool  needed = false;
52✔
519
  for (int32_t i = 0; i < numOfCols; ++i) {
199✔
520
    SExprInfo* pExpr = pCtx[i].pExpr;
181✔
521
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
181✔
522
      needed = true;
34✔
523
      break;
34✔
524
    }
525
  }
526
  return needed;
52✔
527
}
528

529
int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
52✔
530
                                              SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
531
  int32_t                           code = TSDB_CODE_SUCCESS;
52✔
532
  int32_t                           lino = 0;
52✔
533
  SStreamIntervalSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalSliceOperatorInfo));
52✔
534
  QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno);
52!
535

536
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
52✔
537
  QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno)
52!
538

539
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
52✔
540
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
52!
541

542
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
52✔
543
  pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
52✔
544
  QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _error, terrno);
52!
545

546
  pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn);
52✔
547
  QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
52!
548

549
  pInfo->delIndex = 0;
52✔
550
  pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
52✔
551
  QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
52!
552

553
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
52✔
554
  QUERY_CHECK_CODE(code, lino, _error);
52!
555

556
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
52✔
557
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
52!
558
  initBasicInfo(&pInfo->binfo, pResBlock);
52✔
559

560
  code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
52✔
561
  QUERY_CHECK_CODE(code, lino, _error);
52!
562
  pInfo->recvCkBlock = false;
52✔
563

564
  SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
52✔
565
  pOperator->pTaskInfo = pTaskInfo;
52✔
566
  initResultSizeInfo(&pOperator->resultInfo, 4096);
52✔
567
  SExprSupp* pExpSup = &pOperator->exprSupp;
52✔
568
  int32_t    numOfExprs = 0;
52✔
569
  SExprInfo* pExprInfo = NULL;
52✔
570
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfExprs);
52✔
571
  QUERY_CHECK_CODE(code, lino, _error);
52!
572

573
  code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
52✔
574
  QUERY_CHECK_CODE(code, lino, _error);
52!
575

576
  pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
52✔
577
                                .sliding = pIntervalPhyNode->sliding,
52✔
578
                                .intervalUnit = pIntervalPhyNode->intervalUnit,
52✔
579
                                .slidingUnit = pIntervalPhyNode->slidingUnit,
52✔
580
                                .offset = pIntervalPhyNode->offset,
52✔
581
                                .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
52✔
582

583
  pInfo->twAggSup =
52✔
584
      (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
52✔
585
                           .calTrigger = pIntervalPhyNode->window.triggerType,
52✔
586
                           .maxTs = INT64_MIN,
587
                           .minTs = INT64_MAX,
588
                           .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};
52✔
589
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
52✔
590
  QUERY_CHECK_CODE(code, lino, _error);
52!
591

592
  if (pIntervalPhyNode->window.pExprs != NULL) {
52!
593
    int32_t    numOfScalar = 0;
×
594
    SExprInfo* pScalarExprInfo = NULL;
×
595
    code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
×
596
    QUERY_CHECK_CODE(code, lino, _error);
×
597

598
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
×
599
    QUERY_CHECK_CODE(code, lino, _error);
×
600
  }
601

602
  SSDataBlock* pDownRes = NULL;
52✔
603
  SColumnInfo* pPkCol = NULL;
52✔
604
  code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
52✔
605
  QUERY_CHECK_CODE(code, lino, _error);
52!
606

607
  code = initOffsetInfo(&pInfo->pOffsetInfo, pDownRes);
52✔
608
  QUERY_CHECK_CODE(code, lino, _error);
52!
609

610
  int32_t keyBytes = sizeof(TSKEY);
52✔
611
  keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock) + sizeof(bool);
52✔
612
  if (pPkCol) {
52✔
613
    keyBytes += pPkCol->bytes;
3✔
614
  }
615
  code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0,
52✔
616
                                &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo),
52✔
617
                                &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SEARCH, 1);
618

619
  pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
52✔
620
  pInfo->pOperator = pOperator;
52✔
621
  pInfo->hasFill = false;
52✔
622
  pInfo->hasInterpoFunc = windowinterpNeeded(pExpSup->pCtx, numOfExprs);
52✔
623

624
  setOperatorInfo(pOperator, "StreamIntervalSliceOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, true, OP_NOT_OPENED,
52✔
625
                  pInfo, pTaskInfo);
626
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalSliceNext, NULL, destroyStreamIntervalSliceOperatorInfo,
52✔
627
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
628
  setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
52✔
629

630
  initStreamBasicInfo(&pInfo->basic);
52✔
631
  if (downstream) {
52!
632
    code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
52✔
633
                                       &pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc);
52✔
634
    QUERY_CHECK_CODE(code, lino, _error);
52!
635

636
    code = appendDownstream(pOperator, &downstream, 1);
52✔
637
    QUERY_CHECK_CODE(code, lino, _error);
52!
638
  }
639

640
  (*ppOptInfo) = pOperator;
52✔
641
  return code;
52✔
642

643
_error:
×
644
  if (code != TSDB_CODE_SUCCESS) {
×
645
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
646
  }
647
  if (pInfo != NULL) {
×
648
    destroyStreamIntervalSliceOperatorInfo(pInfo);
×
649
  }
650
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
651
  pTaskInfo->code = code;
×
652
  (*ppOptInfo) = NULL;
×
653
  return code;
×
654
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc