• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamintervalnonblockoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "streamexecutorInt.h"
22
#include "streaminterval.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
#define STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME "StreamIntervalNonblockHistoryState"
33

34
// static int32_t buildWinResultKey(SRowBuffPos* pPos, SSHashObj* pUpdatedMap, SSHashObj* pResultCache, bool savePos) {
35
//   int32_t      code = TSDB_CODE_SUCCESS;
36
//   int32_t      lino = 0;
37
//   SWinKey*     pKey = pPos->pKey;
38
//   SRowBuffPos* pPrevResPos = tSimpleHashGet(pResultCache, &pKey->groupId, sizeof(uint64_t));
39
//   if (pPrevResPos != NULL) {
40
//     SWinKey* pPrevResKey = (SWinKey*)pPrevResPos->pKey;
41
//     if (savePos) {
42
//       code = tSimpleHashPut(pUpdatedMap, pPrevResKey, sizeof(SWinKey), &pPrevResPos, POINTER_BYTES);
43
//     } else {
44
//       code = tSimpleHashPut(pUpdatedMap, pPrevResKey, sizeof(SWinKey), NULL, 0);
45
//     }
46
//     QUERY_CHECK_CODE(code, lino, _end);
47
//   }
48
//   code = tSimpleHashPut(pResultCache, &pKey->groupId, sizeof(uint64_t), &pPos, POINTER_BYTES);
49
//   QUERY_CHECK_CODE(code, lino, _end);
50

51
// _end:
52
//   if (code != TSDB_CODE_SUCCESS) {
53
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
54
//   }
55
//   return code;
56
// }
57

UNCOV
58
void releaseFlusedPos(void* pRes) {
×
UNCOV
59
  SRowBuffPos* pPos = *(SRowBuffPos**)pRes;
×
UNCOV
60
  if (pPos != NULL && pPos->needFree) {
×
61
    pPos->beUsed = false;
×
62
  }
UNCOV
63
}
×
64

UNCOV
65
// Reports the keep-count / keep-timestamp pair through pNumRes and pTsRes.
// For a recalculate operator (isRecOp) the pair is (0, INT64_MIN) and pNbSup is
// not read; otherwise the values are copied from pNbSup.
void getStateKeepInfo(SNonBlockAggSupporter* pNbSup, bool isRecOp, int32_t* pNumRes, TSKEY* pTsRes) {
  if (!isRecOp) {
    *pNumRes = pNbSup->numOfKeep;
    *pTsRes = pNbSup->tsOfKeep;
    return;
  }

  *pNumRes = 0;
  *pTsRes = INT64_MIN;
}
×
74

UNCOV
75
// Release hook for the non-blocking interval operator:
//   1. clears expired state using the operator's numOfKeep / tsOfKeep,
//   2. commits the state,
//   3. saves twAggSup.maxTs under STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME,
//   4. forwards the release to the first downstream operator if it has a hook.
void streamIntervalNonblockReleaseState(SOperatorInfo* pOperator) {
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;

  pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep);
  pAggSup->stateStore.streamStateCommit(pAggSup->pState);

  // The saved payload is exactly one TSKEY: the largest timestamp seen so far.
  int32_t payloadLen = sizeof(TSKEY);
  pAggSup->stateStore.streamStateSaveInfo(pAggSup->pState, STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME,
                                          strlen(STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME), &pInfo->twAggSup.maxTs,
                                          payloadLen);

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.releaseStreamStateFn) {
    pChild->fpSet.releaseStreamStateFn(pChild);
  }

  qDebug("%s===stream===streamIntervalNonblockReleaseState:%" PRId64, GET_TASKID(pOperator->pTaskInfo),
         pInfo->twAggSup.maxTs);
}
×
92

UNCOV
93
// Reload hook for the non-blocking interval operator. Reads back the TSKEY
// stored under STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME, raises twAggSup.maxTs to
// it if larger, passes it to streamStateReloadInfo, and forwards the reload to
// the first downstream operator if it has a hook. A failed read is only logged.
void streamIntervalNonblockReloadState(SOperatorInfo* pOperator) {
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  void*                             pSaved = NULL;
  int32_t                           savedLen = 0;
  int32_t                           lino = 0;
  int32_t                           code = TSDB_CODE_SUCCESS;

  code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME,
                                                strlen(STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME), &pSaved, &savedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  // Copy the value out before releasing the buffer that holds it.
  TSKEY savedTs = *(TSKEY*)pSaved;
  taosMemoryFreeClear(pSaved);

  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, savedTs);
  pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, savedTs);

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.reloadStreamStateFn) {
    pChild->fpSet.reloadStreamStateFn(pChild);
  }
  qDebug("%s===stream===streamIntervalNonblockReloadState:%" PRId64, GET_TASKID(pOperator->pTaskInfo), savedTs);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
×
121

UNCOV
122
int32_t saveRecWindowToDisc(SSessionKey* pWinKey, uint64_t uid, EStreamType mode, STableTsDataState* pTsDataState,
×
123
                            SStreamAggSupporter* pAggSup) {
UNCOV
124
  int32_t len = copyRecDataToBuff(pWinKey->win.skey, pWinKey->win.ekey, uid, -1, mode, NULL, 0,
×
125
                                  pTsDataState->pRecValueBuff, pTsDataState->recValueLen);
UNCOV
126
  return pAggSup->stateStore.streamStateSessionSaveToDisk(pTsDataState, pWinKey, pTsDataState->pRecValueBuff, len);
×
127
}
128

UNCOV
129
static int32_t checkAndSaveWinStateToDisc(int32_t startIndex, SArray* pUpdated, uint64_t uid,
×
130
                                          STableTsDataState* pTsDataState, SStreamAggSupporter* pAggSup,
131
                                          SInterval* pInterval) {
UNCOV
132
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
133
  int32_t lino = 0;
×
UNCOV
134
  int32_t mode = 0;
×
UNCOV
135
  int32_t size = taosArrayGetSize(pUpdated);
×
UNCOV
136
  for (int32_t i = startIndex; i < size; i++) {
×
UNCOV
137
    SRowBuffPos* pWinPos = taosArrayGetP(pUpdated, i);
×
UNCOV
138
    SWinKey*     pKey = pWinPos->pKey;
×
UNCOV
139
    int32_t      winRes = pAggSup->stateStore.streamStateGetRecFlag(pAggSup->pState, pKey, sizeof(SWinKey), &mode);
×
UNCOV
140
    if (winRes == TSDB_CODE_SUCCESS) {
×
UNCOV
141
      SSessionKey winKey = {
×
UNCOV
142
          .win.skey = pKey->ts, .win.ekey = taosTimeGetIntervalEnd(pKey->ts, pInterval), .groupId = pKey->groupId};
×
UNCOV
143
      code = saveRecWindowToDisc(&winKey, uid, mode, pTsDataState, pAggSup);
×
UNCOV
144
      QUERY_CHECK_CODE(code, lino, _end);
×
145
    }
146
  }
147

UNCOV
148
_end:
×
UNCOV
149
  if (code != TSDB_CODE_SUCCESS) {
×
150
    qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
151
  }
UNCOV
152
  return code;
×
153
}
154

UNCOV
155
// Aggregates one input block into interval windows, window by window in
// ascending timestamp order.
//
// For every window touched by the block:
//   - the current (and possibly previous) window state buffers are fetched;
//   - when the current window was not found in state (winCode != success) and a
//     valid previous window exists, the previous window is finished (optional
//     end-side interpolation) and queued in pInfo->pUpdated, saving its
//     recalculate record to disk where a flag is set;
//   - the rows of the block that fall in the current window are applied to the
//     aggregate functions.
// Returns TSDB_CODE_SUCCESS or the first error, which is also logged.
int32_t doStreamIntervalNonblockAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  SInterval*                        pInterval = &pInfo->interval;
  SExprSupp*                        pExprSup = &pOperator->exprSupp;
  int32_t                           numOfExprs = pExprSup->numOfExprs;
  int64_t                           gid = pBlock->info.id.groupId;
  int32_t                           numRows = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           tsList = (int64_t*)pTsCol->pData;

  int32_t            rowIdx = 0;
  TSKEY              rowTs = getStartTsKey(&pBlock->info.window, tsList);
  SInervalSlicePoint curPoint = {0};
  SInervalSlicePoint prevPoint = {0};
  STimeWindow        activeWin =
      getActiveTimeWindow(NULL, &(pInfo->binfo.resultRowInfo), rowTs, pInterval, TSDB_ORDER_ASC);

  for (;;) {
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = getIntervalSliceCurStateBuf(pAggSup, pInterval, pInfo->hasInterpoFunc, &activeWin, gid, &curPoint,
                                       &prevPoint, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    // Without interpolation and with a single kept window, the previous window
    // is looked up explicitly.
    if (winCode != TSDB_CODE_SUCCESS && pInfo->hasInterpoFunc == false && pInfo->nbSup.numOfKeep == 1) {
      SWinKey lookupKey = {.ts = curPoint.winKey.win.skey, .groupId = gid};
      code = getIntervalSlicePrevStateBuf(pAggSup, pInterval, &lookupKey, &prevPoint);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && winCode != TSDB_CODE_SUCCESS) {
      // Close the end side of the previous window by interpolation, once.
      if (pInfo->hasInterpoFunc && isInterpoWindowFinished(&prevPoint) == false) {
        code = setIntervalSliceOutputBuf(pAggSup, &prevPoint, pExprSup->pCtx, numOfExprs,
                                         pExprSup->rowEntryInfoOffset);
        QUERY_CHECK_CODE(code, lino, _end);

        resetIntervalSliceFunctionKey(pExprSup->pCtx, numOfExprs);
        doSetElapsedEndKey(prevPoint.winKey.win.ekey, pExprSup);
        doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, rowTs, pBlock, rowIdx, pExprSup,
                                   INTERVAL_SLICE_END, pInfo->pOffsetInfo);
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
        code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, rowIdx, 0,
                                               pBlock->info.rows, numOfExprs);
        QUERY_CHECK_CODE(code, lino, _end);
        setInterpoWindowFinished(&prevPoint);
      }

      if (pInfo->nbSup.numOfKeep == 1) {
        // Single kept window: queue exactly the previous window.
        void* pPushed = taosArrayPush(pInfo->pUpdated, &prevPoint.pResPos);
        QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);

        int32_t  recMode = 0;
        SWinKey* pPrevKey = prevPoint.pResPos->pKey;
        int32_t  flagRes =
            pAggSup->stateStore.streamStateGetRecFlag(pAggSup->pState, pPrevKey, sizeof(SWinKey), &recMode);
        if (flagRes == TSDB_CODE_SUCCESS) {
          code = saveRecWindowToDisc(&prevPoint.winKey, pBlock->info.id.uid, recMode, pInfo->basic.pTsDataState,
                                     pAggSup);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      } else {
        // Several kept windows: collect all previous windows relative to a key
        // placed one interval before the current row timestamp, plus one.
        SWinKey boundKey = {.groupId = gid};
        boundKey.ts =
            taosTimeAdd(rowTs, -pInfo->interval.interval, pInfo->interval.intervalUnit, pInfo->interval.precision,
                        NULL) +
            1;
        int32_t firstNew = taosArrayGetSize(pInfo->pUpdated);
        code = pAggSup->stateStore.streamStateGetAllPrev(pAggSup->pState, &boundKey, pInfo->pUpdated,
                                                         pInfo->nbSup.numOfKeep);
        QUERY_CHECK_CODE(code, lino, _end);
        if (!isRecalculateOperator(&pInfo->basic)) {
          code = checkAndSaveWinStateToDisc(firstNew, pInfo->pUpdated, 0, pInfo->basic.pTsDataState, pAggSup,
                                            pInterval);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
    }

    code = setIntervalSliceOutputBuf(pAggSup, &curPoint, pExprSup->pCtx, numOfExprs, pExprSup->rowEntryInfoOffset);
    QUERY_CHECK_CODE(code, lino, _end);

    resetIntervalSliceFunctionKey(pExprSup->pCtx, numOfExprs);
    // Start-side interpolation, only when the first row is not on the window start.
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != rowTs) {
      doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, rowTs, pBlock, rowIdx, pExprSup,
                                 INTERVAL_SLICE_START, pInfo->pOffsetInfo);
    }

    numRows = getNumOfRowsInTimeWindow(&pBlock->info, tsList, rowIdx, activeWin.ekey, binarySearchForKey, NULL,
                                       TSDB_ORDER_ASC);
    int32_t lastRowIdx = (numRows - 1) + rowIdx;
    if (pInfo->hasInterpoFunc) {
      // Remember the last qualified row of this window in curPoint.pLastRow.
      int32_t endRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsList, lastRowIdx, false);
      TSKEY   endRowTs = tsList[endRowId];
      transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL, pInfo->pOffsetInfo);
    }

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, rowIdx, numRows,
                                           pBlock->info.rows, numOfExprs);
    QUERY_CHECK_CODE(code, lino, _end);
    curPoint.pResPos->beUpdated = true;

    if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
      setInterpoWindowFinished(&curPoint);
    }
    releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);

    // Advance to the next window that has rows in this block; negative means done.
    rowIdx = getNextQualifiedWindow(pInterval, &activeWin, &pBlock->info, tsList, lastRowIdx, TSDB_ORDER_ASC);
    if (rowIdx < 0) {
      break;
    }
    rowTs = tsList[rowIdx];
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
275

UNCOV
276
// Pulls further results from the state cursor pAggSup->pCur into pUpdated while
// there is room below capacity. The cursor is freed and reset to NULL as soon
// as streamStateNLastStateGetKVByCur reports TSDB_CODE_FAILED. Does nothing when
// there is no cursor. As written, the returned code is always TSDB_CODE_SUCCESS.
int32_t getHistoryRemainResultInfo(SStreamAggSupporter* pAggSup, int32_t numOfState, SArray* pUpdated,
                                   int32_t capacity) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pAggSup->pCur != NULL) {
    int32_t room = capacity - taosArrayGetSize(pUpdated);
    int32_t round = 0;
    while (round < room) {
      int32_t fetchRes = pAggSup->stateStore.streamStateNLastStateGetKVByCur(pAggSup->pCur, numOfState, pUpdated);
      if (fetchRes == TSDB_CODE_FAILED) {
        pAggSup->stateStore.streamStateFreeCur(pAggSup->pCur);
        pAggSup->pCur = NULL;
        break;
      }

      pAggSup->stateStore.streamStateLastStateCurNext(pAggSup->pCur);
      // The remaining room is re-evaluated after every fetch.
      room = capacity - taosArrayGetSize(pUpdated);
      round++;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }
  return code;
}
304

UNCOV
305
// Builds one batch of history results into pInfo->binfo.pRes.
// Remaining history windows are first collected into pInfo->pUpdated; if any
// exist they are sorted (and de-duplicated when more than one window is kept),
// handed to groupResInfo, replaced by a fresh pUpdated array, and rendered via
// doBuildStreamIntervalResult. Notify events are never attached here because
// addNotifyEvent stays false.
int32_t buildIntervalHistoryResult(SOperatorInfo* pOperator) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  bool                              addNotifyEvent = false;
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  SStreamNotifyEventSupp*           pNotifySup = &pInfo->basic.notifyEventSup;

  code = getHistoryRemainResultInfo(pAggSup, pInfo->nbSup.numOfKeep, pInfo->pUpdated, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (taosArrayGetSize(pInfo->pUpdated) > 0) {
    taosArraySort(pInfo->pUpdated, winPosCmprImpl);
    if (pInfo->nbSup.numOfKeep > 1) {
      taosArrayRemoveDuplicate(pInfo->pUpdated, winPosCmprImpl, releaseFlusedPos);
    }

    // groupResInfo now refers to the old array; start a new one for later updates.
    initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
    pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
    QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);

    doBuildStreamIntervalResult(pOperator, pAggSup->pState, pInfo->binfo.pRes, &pInfo->groupResInfo,
                                addNotifyEvent ? pNotifySup->pSessionKeys : NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }
  return code;
}
333

UNCOV
334
// Drops from pDelWins every window key that also appears in pUpdatedWins.
// pDelWins is sorted and de-duplicated first so it can be binary-searched;
// pUpdatedWins itself is not modified. No-op when either array is empty.
static void removeDataDeleteResults(SArray* pUpdatedWins, SArray* pDelWins) {
  int32_t numUpdated = taosArrayGetSize(pUpdatedWins);
  int32_t numDel = taosArrayGetSize(pDelWins);
  if (numDel == 0 || numUpdated == 0) {
    return;
  }

  taosArraySort(pDelWins, winKeyCmprImpl);
  taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
  numDel = taosArrayGetSize(pDelWins);

  for (int32_t idx = 0; idx < numUpdated; idx++) {
    SRowBuffPos* pRowPos = (SRowBuffPos*)taosArrayGetP(pUpdatedWins, idx);
    SWinKey*     pUpdKey = (SWinKey*)pRowPos->pKey;

    int32_t hit = binarySearchCom(pDelWins, numDel, pUpdKey, TSDB_ORDER_DESC, compareWinKey);
    if (hit < 0 || 0 != compareWinKey(pUpdKey, pDelWins, hit)) {
      // The search position does not hold an equal key.
      continue;
    }

    taosArrayRemove(pDelWins, hit);
    numDel = taosArrayGetSize(pDelWins);
  }
}
354

UNCOV
355
static int32_t doTransformRecalculateWindows(SExecTaskInfo* pTaskInfo, SInterval* pInterval, SSDataBlock* pBlock,
×
356
                                             SArray* pUpWins) {
UNCOV
357
  int32_t          code = TSDB_CODE_SUCCESS;
×
UNCOV
358
  int32_t          lino = 0;
×
UNCOV
359
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
UNCOV
360
  TSKEY*           startTsCols = (TSKEY*)pStartTsCol->pData;
×
UNCOV
361
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
UNCOV
362
  TSKEY*           endTsCols = (TSKEY*)pEndTsCol->pData;
×
UNCOV
363
  SColumnInfoData* pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
×
UNCOV
364
  TSKEY*           calStTsCols = (TSKEY*)pCalStTsCol->pData;
×
UNCOV
365
  SColumnInfoData* pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
×
UNCOV
366
  TSKEY*           calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
×
UNCOV
367
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
UNCOV
368
  uint64_t*        pGpDatas = (uint64_t*)pGpCol->pData;
×
UNCOV
369
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
×
UNCOV
370
    STimeWindow win = {.skey = startTsCols[i], .ekey = endTsCols[i]};
×
371
    do {
UNCOV
372
      if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i], pBlock->info.type)) {
×
373
        getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
×
374
        continue;
×
375
      }
376

UNCOV
377
      uint64_t winGpId = pGpDatas[i];
×
UNCOV
378
      SWinKey  winRes = {.ts = win.skey, .groupId = winGpId};
×
UNCOV
379
      void*    pTmp = taosArrayPush(pUpWins, &winRes);
×
UNCOV
380
      QUERY_CHECK_NULL(pTmp, code, lino, _end, terrno);
×
UNCOV
381
      getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
×
UNCOV
382
    } while (win.ekey <= endTsCols[i]);
×
383
  }
UNCOV
384
_end:
×
UNCOV
385
  if (code != TSDB_CODE_SUCCESS) {
×
386
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
387
  }
UNCOV
388
  return code;
×
389
}
390

UNCOV
391
// Produces the operator's "non-regular" output once normal results are drained.
// In order:
//   1. for a single history operator, a batch of history results (returned if
//      it contains rows);
//   2. a pending checkpoint block (returned once, flag cleared);
//   3. otherwise housekeeping: refresh tsOfKeep from minTs, clear expired state
//      unless this is a final recalculate/history operator, reset counters, mark
//      the operator completed, and flag recalculation as finished for a final
//      recalculate operator with no pending pull data. *ppRes is then NULL.
// On a failure from step 1 the error is logged and *ppRes is left untouched.
static int32_t buildOtherResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;

  if (isHistoryOperator(&pInfo->basic) && isSingleOperator(&pInfo->basic)) {
    code = buildIntervalHistoryResult(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    SSDataBlock* pHistRes = pInfo->binfo.pRes;
    if (pHistRes->info.rows != 0) {
      printDataBlock(pHistRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pHistRes;
      return code;
    }
  }

  if (pInfo->recvCkBlock) {
    pInfo->recvCkBlock = false;
    printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
    (*ppRes) = pInfo->pCheckpointRes;
    return code;
  }

  if (pInfo->twAggSup.minTs != INT64_MAX) {
    pInfo->nbSup.tsOfKeep = pInfo->twAggSup.minTs;
  }

  // Skip the cleanup only for a final operator that is recalculating or replaying history.
  if (!isFinalOperator(&pInfo->basic) ||
      (!isRecalculateOperator(&pInfo->basic) && !isHistoryOperator(&pInfo->basic))) {
    int32_t keepNum = 0;
    TSKEY   keepTs = INT64_MAX;
    getStateKeepInfo(&pInfo->nbSup, isRecalculateOperator(&pInfo->basic), &keepNum, &keepTs);
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, keepNum, keepTs);
  }

  pInfo->twAggSup.minTs = INT64_MAX;
  pInfo->basic.numOfRecv = 0;
  setStreamOperatorCompleted(pOperator);

  if (isFinalOperator(&pInfo->basic) && isRecalculateOperator(&pInfo->basic) &&
      tSimpleHashGetSize(pInfo->nbSup.pPullDataMap) == 0) {
    qInfo("===stream===%s recalculate is finished.", GET_TASKID(pTaskInfo));
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, 0, INT64_MAX);
    pTaskInfo->streamInfo.recoverScanFinished = true;
  }
  (*ppRes) = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
441

UNCOV
442
// Moves every value stored in *ppWinUpdated into pUpdated, sorts pUpdated with
// compar, and empties the map.
//
// A map holding fewer than 4096 entries is cleared in place. A larger map is
// destroyed and replaced by a fresh one, so *ppWinUpdated may change.
//
// Returns TSDB_CODE_SUCCESS, or the terrno-derived error when appending to
// pUpdated fails or the replacement map cannot be allocated. In the latter case
// *ppWinUpdated is NULL on return (the old map has already been destroyed)
// instead of silently reporting success with a NULL map.
int32_t copyNewResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(*ppWinUpdated, pIte, &iter)) != NULL) {
    void* tmp = taosArrayPush(pUpdated, pIte);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }
  taosArraySort(pUpdated, compar);
  if (tSimpleHashGetSize(*ppWinUpdated) < 4096) {
    tSimpleHashClear(*ppWinUpdated);
  } else {
    // Large map: rebuild it at the initial size rather than keeping it around.
    tSimpleHashClear(*ppWinUpdated);
    tSimpleHashCleanup(*ppWinUpdated);
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    (*ppWinUpdated) = tSimpleHashInit(1024, hashFn);
    // The old map is gone; a failed allocation must be reported to the caller.
    QUERY_CHECK_NULL((*ppWinUpdated), code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
467

UNCOV
468
static int32_t closeNonblockIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
×
469
                                           SArray* pUpdated, SExecTaskInfo* pTaskInfo) {
UNCOV
470
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
471
  int32_t lino = 0;
×
UNCOV
472
  void*   pIte = NULL;
×
UNCOV
473
  int32_t iter = 0;
×
UNCOV
474
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
×
UNCOV
475
    void*    key = tSimpleHashGetKey(pIte, NULL);
×
UNCOV
476
    SWinKey* pWinKey = (SWinKey*)key;
×
477

UNCOV
478
    STimeWindow win = {
×
UNCOV
479
        .skey = pWinKey->ts,
×
UNCOV
480
        .ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1,
×
481
    };
482

UNCOV
483
    if (isCloseWindow(&win, pTwSup)) {
×
UNCOV
484
      void* pTemp = taosArrayPush(pUpdated, pIte);
×
UNCOV
485
      QUERY_CHECK_NULL(pTemp, code, lino, _end, terrno);
×
486

UNCOV
487
      int32_t tmpRes = tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
×
UNCOV
488
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
489
    }
490
  }
491

UNCOV
492
_end:
×
UNCOV
493
  if (code != TSDB_CODE_SUCCESS) {
×
494
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
495
  }
UNCOV
496
  return code;
×
497
}
498

UNCOV
499
// Handles a recalculate request block. Each row names a [start, end] range, a
// calculate range, a group id and a uid. Every interval window of the row that
// inCalSlidingWindow accepts is processed as follows:
//   - if streamStateCheck finds the window, its recalculate flag is set to the
//     block type, and the window is additionally saved to disk only when this is
//     a final operator and the window is closed, or a single operator and the
//     window is not the last one;
//   - if the window is not found, it is always saved to disk.
// Returns the first save error (logged) or TSDB_CODE_SUCCESS.
static int32_t doProcessRecalculateReq(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SInterval*                        pInterval = &pInfo->interval;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startList = (TSKEY*)pCol->pData;
  pCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY* endList = (TSKEY*)pCol->pData;
  pCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY* calStartList = (TSKEY*)pCol->pData;
  pCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY* calEndList = (TSKEY*)pCol->pData;
  pCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t* gidList = (uint64_t*)pCol->pData;
  pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t* uidList = (uint64_t*)pCol->pData;

  for (int32_t row = 0; row < pBlock->info.rows; row++) {
    // A throwaway row-info with pageId -1 is passed to getActiveTimeWindow.
    SResultRowInfo scratchInfo = {0};
    scratchInfo.cur.pageId = -1;

    STimeWindow win = getActiveTimeWindow(NULL, &scratchInfo, startList[row], pInterval, TSDB_ORDER_ASC);

    do {
      if (inCalSlidingWindow(pInterval, &win, calStartList[row], calEndList[row], pBlock->info.type)) {
        SWinKey key = {.ts = win.skey, .groupId = gidList[row]};
        bool    isLastWin = false;
        bool    needSave = true;  // windows not found by streamStateCheck are always saved

        if (pAggSup->stateStore.streamStateCheck(pAggSup->pState, &key, isFinalOperator(&pInfo->basic), &isLastWin)) {
          qDebug("===stream===%s set recalculate flag ts:%" PRId64 ",group id:%" PRIu64, GET_TASKID(pTaskInfo), key.ts,
                 key.groupId);
          pAggSup->stateStore.streamStateSetRecFlag(pAggSup->pState, &key, sizeof(SWinKey), pBlock->info.type);
          needSave = (isFinalOperator(&pInfo->basic) && isCloseWindow(&win, &pInfo->twAggSup)) ||
                     (isSingleOperator(&pInfo->basic) && isLastWin == false);
        }

        if (needSave) {
          SSessionKey winKey = {.win = win, .groupId = key.groupId};
          code = saveRecWindowToDisc(&winKey, uidList[row], pBlock->info.type, pInfo->basic.pTsDataState, pAggSup);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
      // Accepted or not, always move on to the next window.
      getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
    } while (win.ekey <= endList[row]);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
559

UNCOV
560
/**
 * Register one pull request in pPullMap.
 *
 * The map key is the raw bytes of *pPullKey; the stored value is a pointer to a newly created
 * SArray<int32_t> that holds the child indexes 0 .. numOfChild-1.
 *
 * @param pPullMap    hash map receiving the (pull window -> child id array) entry
 * @param pPullKey    pull window descriptor used as the key
 * @param numOfChild  number of child indexes to put into the array
 * @param pTaskInfo   task handle, only used for the log prefix
 * @return TSDB_CODE_SUCCESS, or the error reported by the array / hash helpers
 */
static int32_t addDataPullWindowInfo(SSHashObj* pPullMap, SPullWindowInfo* pPullKey, int32_t numOfChild, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* childIds = taosArrayInit(numOfChild, sizeof(int32_t));
  QUERY_CHECK_NULL(childIds, code, lino, _end, terrno);

  for (int32_t i = 0; i < numOfChild; i++) {
    void* pTemp = taosArrayPush(childIds, &i);
    QUERY_CHECK_NULL(pTemp, code, lino, _end, terrno);
  }

  // The map stores the array pointer itself, so on success the map entry references childIds.
  code = tSimpleHashPut(pPullMap, pPullKey, sizeof(SPullWindowInfo), &childIds, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // Every failure path is reached before the pointer was stored in the map, so nothing else
    // references the array; release it here instead of leaking it.
    if (childIds != NULL) {
      taosArrayDestroy(childIds);
    }
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
  }
  return code;
}
577

UNCOV
578
int32_t buildRetriveRequest(SExecTaskInfo* pTaskInfo, SStreamAggSupporter* pAggSup, STableTsDataState* pTsDataState,
×
579
                            SNonBlockAggSupporter* pNbSup) {
UNCOV
580
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
581
  int32_t lino = 0;
×
UNCOV
582
  code = pAggSup->stateStore.streamStateMergeAllScanRange(pTsDataState);
×
UNCOV
583
  QUERY_CHECK_CODE(code, lino, _end);
×
584

UNCOV
585
  while (1) {
×
UNCOV
586
    SScanRange range = {0};
×
UNCOV
587
    code = pAggSup->stateStore.streamStatePopScanRange(pTsDataState, &range);
×
UNCOV
588
    QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
589
    if (IS_INVALID_RANGE(range)) {
×
UNCOV
590
      break;
×
591
    }
UNCOV
592
    void*   pIte = NULL;
×
UNCOV
593
    int32_t iter = 0;
×
UNCOV
594
    while ((pIte = tSimpleHashIterate(range.pGroupIds, pIte, &iter)) != NULL) {
×
UNCOV
595
      uint64_t        groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
×
UNCOV
596
      SPullWindowInfo pullReq = {.window = range.win, .groupId = groupId, .calWin = range.calWin};
×
UNCOV
597
      code = addDataPullWindowInfo(pNbSup->pPullDataMap, &pullReq, pNbSup->numOfChild, pTaskInfo);
×
UNCOV
598
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
599
      void*           pTemp = taosArrayPush(pNbSup->pPullWins, &pullReq);
×
UNCOV
600
      QUERY_CHECK_NULL(pTemp, code, lino, _end, terrno);
×
601
    }
602
  }
603

UNCOV
604
_end:
×
UNCOV
605
  if (code != TSDB_CODE_SUCCESS) {
×
606
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
607
    pTaskInfo->code = code;
×
608
  }
UNCOV
609
  return code;
×
610
}
611

UNCOV
612
/**
 * Handle a "pull over" block: for each row, mark the sending child as finished for the pull
 * window described by that row.
 *
 * Each row yields an SPullWindowInfo (window, group id, calculation window) that is looked up
 * in pPullMap. The child index obtained from getChildIndex(pBlock) is removed from the child
 * array stored under that key; when the array becomes empty the entry is removed from the map
 * and the array is destroyed. Rows whose window or child id is not found are logged and skipped.
 *
 * @param pBlock     block carrying the start/end/calc-start/calc-end/group-id columns
 * @param pPullMap   map of outstanding pull windows -> array of child indexes
 * @param pTaskInfo  task handle, only used for the log prefix
 * @return TSDB_CODE_SUCCESS, or terrno when one of the expected columns cannot be fetched
 */
int32_t processDataPullOver(SSDataBlock* pBlock, SSHashObj* pPullMap, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  // Fail cleanly instead of dereferencing a missing column.
  QUERY_CHECK_NULL(pStartCol, code, lino, _end, terrno);
  QUERY_CHECK_NULL(pEndCol, code, lino, _end, terrno);
  QUERY_CHECK_NULL(pCalStartCol, code, lino, _end, terrno);
  QUERY_CHECK_NULL(pCalEndCol, code, lino, _end, terrno);
  QUERY_CHECK_NULL(pGroupCol, code, lino, _end, terrno);

  TSKEY*    pTsStartData = (TSKEY*)pStartCol->pData;
  TSKEY*    pTsEndData = (TSKEY*)pEndCol->pData;
  TSKEY*    pCalTsStartData = (TSKEY*)pCalStartCol->pData;
  TSKEY*    pCalTsEndData = (TSKEY*)pCalEndCol->pData;
  uint64_t* pGroupIdData = (uint64_t*)pGroupCol->pData;
  int32_t   chId = getChildIndex(pBlock);

  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    // The struct bytes are the map key, so it is built exactly as when the request was registered.
    SPullWindowInfo pull = {.window.skey = pTsStartData[i],
                            .window.ekey = pTsEndData[i],
                            .groupId = pGroupIdData[i],
                            .calWin.skey = pCalTsStartData[i],
                            .calWin.ekey = pCalTsEndData[i]};
    void*           pChIds = tSimpleHashGet(pPullMap, &pull, sizeof(SPullWindowInfo));
    if (pChIds == NULL) {
      qInfo("===stream===%s did not find retrive window. ts:%" PRId64 ",groupId:%" PRIu64 ",child id %d",
            GET_TASKID(pTaskInfo), pull.window.skey, pull.groupId, chId);
      continue;
    }

    SArray* chArray = *(SArray**)pChIds;
    int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
    if (index == -1) {
      qInfo("===stream===%s did not find child id. retrive window ts:%" PRId64 ",groupId:%" PRIu64 ",child id %d",
            GET_TASKID(pTaskInfo), pull.window.skey, pull.groupId, chId);
      continue;
    }

    qDebug("===stream===%s retrive window %" PRId64 " delete child id %d", GET_TASKID(pTaskInfo), pull.window.skey,
           chId);
    taosArrayRemove(chArray, index);

    if (taosArrayGetSize(chArray) == 0) {
      // pull data is over: drop the map entry first, then release the array it pointed to, so
      // the map never holds a pointer to a destroyed array.
      int32_t tmpRes = tSimpleHashRemove(pPullMap, &pull, sizeof(SPullWindowInfo));
      if (tmpRes != TSDB_CODE_SUCCESS) {
        // Best effort, as before: the failure does not abort processing, but it is no longer silent.
        qError("===stream===%s failed to remove retrive window. ts:%" PRId64 ",groupId:%" PRIu64 " since %s",
               GET_TASKID(pTaskInfo), pull.window.skey, pull.groupId, tstrerror(tmpRes));
      } else {
        taosArrayDestroy(chArray);
      }
      qDebug("===stream===%s retrive pull data over. ts:%" PRId64 ",groupId:%" PRIu64 , GET_TASKID(pTaskInfo), pull.window.skey,
             pull.groupId);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
663

UNCOV
664
/**
 * "Get next" entry point of the non-blocking stream interval operator.
 *
 * Flow, as implemented below:
 *   1. If the operator is already done, return NULL.
 *   2. Return any result that buildIntervalSliceResult() can still produce.
 *   3. If the operator is in OP_RES_TO_RETURN state, hand over to buildOtherResult().
 *   4. Otherwise read blocks from the first downstream operator and dispatch on the block type
 *      until the downstream is exhausted, a block must be forwarded, the scan block has rows,
 *      or pUpdated contains entries.
 *   5. Post-process pUpdated (copy / close / sort / de-duplicate / strip deleted windows),
 *      hand it to groupResInfo, allocate a new pUpdated and build the output.
 *
 * @param pOperator  this operator
 * @param ppRes      receives the produced block, or NULL
 * @return TSDB_CODE_SUCCESS or an error code; errors routed through _end are also stored in
 *         pTaskInfo->code.
 */
int32_t doStreamIntervalNonblockAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pSlice = pOperator->info;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pExprs = &pOperator->exprSupp;
  SStreamAggSupporter*              pStreamSup = &pSlice->streamAggSup;

  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  code = buildIntervalSliceResult(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);
  if ((*ppRes) != NULL) {
    return code;
  }

  if (isHistoryOperator(&pSlice->basic) && !isFinalOperator(&pSlice->basic)) {
    pStreamSup->stateStore.streamStateClearExpiredState(pStreamSup->pState, pSlice->nbSup.numOfKeep,
                                                        pSlice->nbSup.tsOfKeep);
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildOtherResult(pOperator, ppRes);
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];

  for (;;) {
    if (isTaskKilled(pTaskInfo)) {
      qInfo("===stream===%s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
      (*ppRes) = NULL;
      return code;
    }

    SSDataBlock* pInput = NULL;
    code = pChild->fpSet.getNextFn(pChild, &pInput);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pInput == NULL) {
      // downstream has nothing more for this round
      qDebug("===stream===%s return data:%s. rev rows:%d", GET_TASKID(pTaskInfo),
             getStreamOpName(pOperator->operatorType), pSlice->basic.numOfRecv);
      if (isFinalOperator(&pSlice->basic)) {
        if (isRecalculateOperator(&pSlice->basic)) {
          code = buildRetriveRequest(pTaskInfo, pStreamSup, pSlice->basic.pTsDataState, &pSlice->nbSup);
          QUERY_CHECK_CODE(code, lino, _end);
        } else {
          code = pStreamSup->stateStore.streamStateFlushReaminInfoToDisk(pSlice->basic.pTsDataState);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
      pOperator->status = OP_RES_TO_RETURN;
      break;
    }

    pSlice->basic.numOfRecv += pInput->info.rows;

    printSpecDataBlock(pInput, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    setStreamOperatorState(&pSlice->basic, pInput->info.type);

    switch (pInput->info.type) {
      case STREAM_NORMAL:
      case STREAM_INVALID:
      case STREAM_PULL_DATA: {
        // data blocks: apply the scalar expressions (if any), then fall out of the switch to aggregate
        SExprSupp* pScalar = &pSlice->scalarSup;
        if (pScalar->pExprInfo != NULL) {
          code = projectApplyFunctions(pScalar->pExprInfo, pInput, pInput, pScalar->pCtx, pScalar->numOfExprs, NULL);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case STREAM_CHECKPOINT: {
        pSlice->recvCkBlock = true;
        pStreamSup->stateStore.streamStateCommit(pStreamSup->pState);
        code = copyDataBlock(pSlice->pCheckpointRes, pInput);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      case STREAM_CREATE_CHILD_TABLE:
      case STREAM_DROP_CHILD_TABLE: {
        // forwarded unchanged
        printDataBlock(pInput, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
        (*ppRes) = pInput;
        return code;
      }
      case STREAM_RECALCULATE_DATA:
      case STREAM_RECALCULATE_DELETE: {
        if (isRecalculateOperator(&pSlice->basic) && !isSemiOperator(&pSlice->basic)) {
          code = doTransformRecalculateWindows(pTaskInfo, &pSlice->interval, pInput, pSlice->pDelWins);
          QUERY_CHECK_CODE(code, lino, _end);
          if (isFinalOperator(&pSlice->basic)) {
            saveRecalculateData(&pStreamSup->stateStore, pSlice->basic.pTsDataState, pInput, pInput->info.type);
          }
          continue;
        }

        if (isSemiOperator(&pSlice->basic)) {
          (*ppRes) = pInput;
          return code;
        }

        code = doProcessRecalculateReq(pOperator, pInput);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      case STREAM_PULL_OVER: {
        code = processDataPullOver(pInput, pSlice->nbSup.pPullDataMap, pTaskInfo);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      default: {
        qDebug("===stream===%s ignore recv block. type:%d", GET_TASKID(pTaskInfo), pInput->info.type);
        continue;
      }
    }

    if (pInput->info.type == STREAM_NORMAL && pInput->info.version != 0) {
      // set input version
      pTaskInfo->version = pInput->info.version;
    }

    code = setInputDataBlock(pExprs, pInput, TSDB_ORDER_ASC, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _end);

    pSlice->twAggSup.maxTs = TMAX(pSlice->twAggSup.maxTs, pInput->info.window.ekey);
    code = pSlice->nbSup.pWindowAggFn(pOperator, pInput);
    if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) {
      // this particular code is not propagated: the operator switches to the return state instead
      pOperator->status = OP_RES_TO_RETURN;
      code = TSDB_CODE_SUCCESS;
    }
    QUERY_CHECK_CODE(code, lino, _end);

    if (pStreamSup->pScanBlock->info.rows > 0) {
      (*ppRes) = pStreamSup->pScanBlock;
      printDataBlock(pStreamSup->pScanBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      return code;
    }

    if (taosArrayGetSize(pSlice->pUpdated) > 0) {
      break;
    }
  }

  if (pOperator->status == OP_RES_TO_RETURN &&
      (isHistoryOperator(&pSlice->basic) || isRecalculateOperator(&pSlice->basic) || isSemiOperator(&pSlice->basic))) {
    code = copyNewResult(&pStreamSup->pResultRows, pSlice->pUpdated, winPosCmprImpl);
    QUERY_CHECK_CODE(code, lino, _end);

    if (isSingleOperator(&pSlice->basic)) {
      if (pStreamSup->pCur == NULL) {
        pStreamSup->pCur = pStreamSup->stateStore.streamStateGetLastStateCur(pStreamSup->pState);
      }
      code = getHistoryRemainResultInfo(pStreamSup, pSlice->nbSup.numOfKeep, pSlice->pUpdated,
                                        pOperator->resultInfo.capacity);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pOperator->status == OP_RES_TO_RETURN && pSlice->destHasPrimaryKey && isFinalOperator(&pSlice->basic)) {
    code = closeNonblockIntervalWindow(pStreamSup->pResultRows, &pSlice->twAggSup, &pSlice->interval, pSlice->pUpdated,
                                       pTaskInfo);
    QUERY_CHECK_CODE(code, lino, _end);
    if (!isHistoryOperator(&pSlice->basic) && !isRecalculateOperator(&pSlice->basic)) {
      code = checkAndSaveWinStateToDisc(0, pSlice->pUpdated, 0, pSlice->basic.pTsDataState, pStreamSup,
                                        &pSlice->interval);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  taosArraySort(pSlice->pUpdated, winPosCmprImpl);
  if (pSlice->nbSup.numOfKeep > 1) {
    taosArrayRemoveDuplicate(pSlice->pUpdated, winPosCmprImpl, releaseFlusedPos);
  }
  if (!isSemiOperator(&pSlice->basic) && !pSlice->destHasPrimaryKey) {
    removeDataDeleteResults(pSlice->pUpdated, pSlice->pDelWins);
  }

  // groupResInfo receives the current pUpdated array; a new one is allocated for the next round
  initMultiResInfoFromArrayList(&pSlice->groupResInfo, pSlice->pUpdated);
  pSlice->pUpdated = taosArrayInit(1024, POINTER_BYTES);
  QUERY_CHECK_NULL(pSlice->pUpdated, code, lino, _end, terrno);

  code = blockDataEnsureCapacity(pSlice->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  code = buildIntervalSliceResult(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);
  if ((*ppRes) != NULL) {
    return code;
  }

  return buildOtherResult(pOperator, ppRes);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
  }
  return code;
}
864

UNCOV
865
/**
 * Window aggregation callback installed for the semi interval operator.
 *
 * Walks the rows of pBlock window by window (ascending order). For each window it fetches the
 * slice state buffer, records the result position in pResultRows when getIntervalSliceCurStateBuf
 * reports a non-success winCode, binds the output buffers and applies the aggregate functions to
 * the rows that fall into the window. For history operators, once pResultRows grows beyond
 * ten times the result capacity, the results are copied into pUpdated.
 *
 * @param pOperator  the interval operator
 * @param pBlock     input data block
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t doStreamSemiIntervalNonblockAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pSlice = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
  SStreamAggSupporter*              pStreamSup = &pSlice->streamAggSup;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pExprs = &pOperator->exprSupp;
  int32_t                           numOfExprs = pExprs->numOfExprs;
  int64_t                           groupId = pBlock->info.id.groupId;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSlice->primaryTsIndex);
  TSKEY*           tsCols = (int64_t*)pTsCol->pData;

  SInervalSlicePoint curPoint = {0};
  SInervalSlicePoint prevPoint = {0};
  TSKEY              firstTs = getStartTsKey(&pBlock->info.window, tsCols);
  STimeWindow        curWin =
      getActiveTimeWindow(NULL, &(pSlice->binfo.resultRowInfo), firstTs, &pSlice->interval, TSDB_ORDER_ASC);

  int32_t startPos = 0;
  do {
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = getIntervalSliceCurStateBuf(pStreamSup, &pSlice->interval, pSlice->hasInterpoFunc, &curWin, groupId,
                                       &curPoint, &prevPoint, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    if (winCode != TSDB_CODE_SUCCESS) {
      SWinKey key = {.ts = curPoint.winKey.win.skey, .groupId = groupId};
      code = tSimpleHashPut(pStreamSup->pResultRows, &key, sizeof(SWinKey), &curPoint.pResPos, POINTER_BYTES);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = setIntervalSliceOutputBuf(pStreamSup, &curPoint, pExprs->pCtx, numOfExprs, pExprs->rowEntryInfoOffset);
    QUERY_CHECK_CODE(code, lino, _end);

    // rows of this block, starting at startPos, that belong to the current window
    int32_t rowsInWin = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey,
                                                 NULL, TSDB_ORDER_ASC);
    int32_t lastPosInWin = startPos + (rowsInWin - 1);

    updateTimeWindowInfo(&pSlice->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprs->pCtx, &pSlice->twAggSup.timeWindowData, startPos,
                                           rowsInWin, pBlock->info.rows, numOfExprs);
    QUERY_CHECK_CODE(code, lino, _end);
    curPoint.pResPos->beUpdated = true;

    if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
      setInterpoWindowFinished(&curPoint);
    }

    // a negative position means no further window qualifies within this block
    startPos = getNextQualifiedWindow(&pSlice->interval, &curWin, &pBlock->info, tsCols, lastPosInWin, TSDB_ORDER_ASC);
  } while (startPos >= 0);

  if (isHistoryOperator(&pSlice->basic) &&
      tSimpleHashGetSize(pStreamSup->pResultRows) > pOperator->resultInfo.capacity * 10) {
    code = copyNewResult(&pStreamSup->pResultRows, pSlice->pUpdated, winPosCmprImpl);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
936

UNCOV
937
/**
 * Propagate pBasic->operatorFlag down the first-downstream chain.
 *
 * Starting at `downstream`, the chain is followed through pDownstream[0] until a stream scan
 * operator is reached. The flag is written into every stream partition operator on the way and
 * into the stream scan operator at the end. Operators of any other type are only traversed.
 *
 * The info pointer is cast to SStreamScanInfo only for an operator whose type is
 * QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; returning after the recursive call prevents the write
 * through a mismatched struct type for the operators above the scan.
 *
 * @param downstream  first operator of the chain to adjust
 * @param pBasic      source of the operator flag
 */
void adjustDownstreamBasicInfo(SOperatorInfo* downstream, struct SSteamOpBasicInfo* pBasic) {
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = downstream->info;
    pScanInfo->basic.operatorFlag = pBasic->operatorFlag;
    return;
  }

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
    SStreamPartitionOperatorInfo* pPartionInfo = downstream->info;
    pPartionInfo->basic.operatorFlag = pBasic->operatorFlag;
  }

  // not a scan operator: keep descending, and do not touch this operator's info as scan info
  adjustDownstreamBasicInfo(downstream->pDownstream[0], pBasic);
}
×
949

UNCOV
950
int32_t createSemiIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
951
                                            SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
UNCOV
952
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
953
  int32_t lino = 0;
×
UNCOV
954
  code = createStreamIntervalSliceOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, ppOptInfo);
×
UNCOV
955
  QUERY_CHECK_CODE(code, lino, _end);
×
956

UNCOV
957
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)(*ppOptInfo)->info;
×
UNCOV
958
  pInfo->nbSup.numOfKeep = 0;
×
UNCOV
959
  pInfo->nbSup.pWindowAggFn = doStreamSemiIntervalNonblockAggImpl;
×
UNCOV
960
  setSemiOperatorFlag(&pInfo->basic);
×
UNCOV
961
  adjustDownstreamBasicInfo(downstream, &pInfo->basic);
×
962

UNCOV
963
_end:
×
UNCOV
964
  if (code != TSDB_CODE_SUCCESS) {
×
965
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
966
  }
UNCOV
967
  return code;
×
968
}
969

UNCOV
970
/**
 * Report whether a window's state is no longer present in the state store.
 *
 * Only windows that start before nbSup.tsOfKeep are looked up; for them the result is true when
 * streamStateCheck() reports false for the (window start, group id) key. Windows starting at or
 * after tsOfKeep always yield false.
 */
bool isDataDeletedStreamWindow(SStreamIntervalSliceOperatorInfo* pInfo, STimeWindow* pWin, uint64_t groupId) {
  if (pWin->skey >= pInfo->nbSup.tsOfKeep) {
    return false;
  }

  SStreamAggSupporter* pStreamSup = &pInfo->streamAggSup;
  SWinKey              winKey = {.ts = pWin->skey, .groupId = groupId};
  if (pStreamSup->stateStore.streamStateCheck(pStreamSup->pState, &winKey, isFinalOperator(&pInfo->basic), NULL)) {
    return false;
  }
  return true;
}
978

UNCOV
979
/**
 * Window aggregation callback installed for the final interval operator.
 *
 * Steps, as implemented below:
 *   1. Reset pScanBlock, update twAggSup.minTs and make sure pScanBlock can hold one row per
 *      input row.
 *   2. Skip leading windows: for a non-history operator, a window for which
 *      isDataDeletedStreamWindow() is true gets a row appended to pScanBlock; for a recalculate
 *      operator, a window outside the sliding window is skipped.
 *   3. For every remaining window: fetch the state buffer, record the key in pDelWins when the
 *      destination has a primary key and the window is not yet in pResultRows, put the result
 *      position into pResultRows and apply the aggregate functions to one row (forwardRows == 1).
 *   4. Close windows / copy results / save window state depending on the operator flags.
 *
 * @param pOperator  the interval operator
 * @param pBlock     input data block
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t doStreamFinalntervalNonblockAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pSup = &pOperator->exprSupp;
  int32_t                           numOfOutput = pSup->numOfExprs;
  int32_t                           forwardRows = 1;
  TSKEY*                            tsCols = NULL;
  int32_t                           startPos = 0;
  uint64_t                          groupId = pBlock->info.id.groupId;
  SInervalSlicePoint                curPoint = {0};
  SInervalSlicePoint                prevPoint = {0};
  TSKEY                             ts = 0;
  STimeWindow                       curWin = {0};

  // Fail cleanly instead of dereferencing a missing timestamp column.
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  QUERY_CHECK_NULL(pColDataInfo, code, lino, _end, terrno);
  tsCols = (int64_t*)pColDataInfo->pData;

  if (pAggSup->pScanBlock->info.rows > 0) {
    blockDataCleanup(pAggSup->pScanBlock);
  }
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.ekey);

  // Rows are appended to pScanBlock below, so a failed capacity request must stop processing.
  code = blockDataEnsureCapacity(pAggSup->pScanBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  ts = getStartTsKey(&pBlock->info.window, tsCols);
  curWin = getFinalTimeWindow(ts, &pInfo->interval);

  // Skip the leading windows that must not be aggregated.
  while (startPos >= 0) {
    if (!isHistoryOperator(&pInfo->basic) && isDataDeletedStreamWindow(pInfo, &curWin, groupId)) {
      uint64_t uid = 0;
      code = appendOneRowToSpecialBlockImpl(pAggSup->pScanBlock, &curWin.skey, &curWin.ekey, &curWin.skey, &curWin.skey,
                                            &uid, &groupId, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      startPos = getNextQualifiedFinalWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, startPos);
    } else if (isRecalculateOperator(&pInfo->basic) && !inSlidingWindow(&pInfo->interval, &curWin, &pBlock->info)) {
      startPos = getNextQualifiedFinalWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, startPos);
    } else {
      break;
    }
  }

  while (startPos >= 0) {
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId,
                                       &curPoint, &prevPoint, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    SWinKey key = {.ts = curPoint.winKey.win.skey, .groupId = groupId};
    if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
      // state buffer was found, but the window is not tracked in pResultRows yet
      if (tSimpleHashGet(pAggSup->pResultRows, &key, sizeof(SWinKey)) == NULL) {
        void* pTmp = taosArrayPush(pInfo->pDelWins, &key);
        QUERY_CHECK_NULL(pTmp, code, lino, _end, terrno);
      }
    }

    curPoint.pResPos->beUpdated = true;
    code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SWinKey), &curPoint.pResPos, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);

    code =
        setIntervalSliceOutputBuf(&pInfo->streamAggSup, &curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
    QUERY_CHECK_CODE(code, lino, _end);
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    QUERY_CHECK_CODE(code, lino, _end);

    int32_t prevEndPos = startPos;
    startPos = getNextQualifiedFinalWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos);
  }

  if (!pInfo->destHasPrimaryKey && !isHistoryOperator(&pInfo->basic)) {
    code = closeNonblockIntervalWindow(pAggSup->pResultRows, &pInfo->twAggSup, &pInfo->interval, pInfo->pUpdated,
                                       pTaskInfo);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if ((isHistoryOperator(&pInfo->basic) || isRecalculateOperator(&pInfo->basic)) &&
             tSimpleHashGetSize(pAggSup->pResultRows) > pOperator->resultInfo.capacity * 10) {
    code = copyNewResult(&pAggSup->pResultRows, pInfo->pUpdated, winPosCmprImpl);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (!isHistoryOperator(&pInfo->basic) && !isRecalculateOperator(&pInfo->basic)) {
    code = checkAndSaveWinStateToDisc(0, pInfo->pUpdated, 0, pInfo->basic.pTsDataState, &pInfo->streamAggSup, &pInfo->interval);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
1071

UNCOV
1072
int32_t createFinalIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
1073
                                             SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
UNCOV
1074
  int32_t code = TSDB_CODE_SUCCESS;
×
UNCOV
1075
  int32_t lino = 0;
×
UNCOV
1076
  code = createStreamIntervalSliceOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, ppOptInfo);
×
UNCOV
1077
  QUERY_CHECK_CODE(code, lino, _end);
×
1078

UNCOV
1079
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)(*ppOptInfo)->info;
×
UNCOV
1080
  pInfo->nbSup.pWindowAggFn = doStreamFinalntervalNonblockAggImpl;
×
UNCOV
1081
  pInfo->nbSup.numOfChild = pHandle->numOfVgroups;
×
UNCOV
1082
  pInfo->streamAggSup.pScanBlock->info.type = STREAM_RETRIEVE;
×
UNCOV
1083
  pInfo->nbSup.tsOfKeep = INT64_MIN;
×
UNCOV
1084
  pInfo->twAggSup.waterMark = 0;
×
UNCOV
1085
  setFinalOperatorFlag(&pInfo->basic);
×
UNCOV
1086
  adjustDownstreamBasicInfo(downstream, &pInfo->basic);
×
1087

UNCOV
1088
_end:
×
UNCOV
1089
  if (code != TSDB_CODE_SUCCESS) {
×
1090
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1091
  }
UNCOV
1092
  return code;
×
1093
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc