• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/streamintervalnonblockoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "streamexecutorInt.h"
22
#include "streaminterval.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
#define STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME "StreamIntervalNonblockHistoryState"
33

34
// static int32_t buildWinResultKey(SRowBuffPos* pPos, SSHashObj* pUpdatedMap, SSHashObj* pResultCache, bool savePos) {
35
//   int32_t      code = TSDB_CODE_SUCCESS;
36
//   int32_t      lino = 0;
37
//   SWinKey*     pKey = pPos->pKey;
38
//   SRowBuffPos* pPrevResPos = tSimpleHashGet(pResultCache, &pKey->groupId, sizeof(uint64_t));
39
//   if (pPrevResPos != NULL) {
40
//     SWinKey* pPrevResKey = (SWinKey*)pPrevResPos->pKey;
41
//     if (savePos) {
42
//       code = tSimpleHashPut(pUpdatedMap, pPrevResKey, sizeof(SWinKey), &pPrevResPos, POINTER_BYTES);
43
//     } else {
44
//       code = tSimpleHashPut(pUpdatedMap, pPrevResKey, sizeof(SWinKey), NULL, 0);
45
//     }
46
//     QUERY_CHECK_CODE(code, lino, _end);
47
//   }
48
//   code = tSimpleHashPut(pResultCache, &pKey->groupId, sizeof(uint64_t), &pPos, POINTER_BYTES);
49
//   QUERY_CHECK_CODE(code, lino, _end);
50

51
// _end:
52
//   if (code != TSDB_CODE_SUCCESS) {
53
//     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
54
//   }
55
//   return code;
56
// }
57

58
void releaseFlusedPos(void* pRes) {
×
59
  SRowBuffPos* pPos = *(SRowBuffPos**)pRes;
×
60
  if (pPos != NULL && pPos->needFree) {
×
61
    pPos->beUsed = false;
×
62
  }
63
}
×
64

65
/*
 * Report the "keep" parameters used when clearing expired state.
 *
 * For a recalculate operator (isRecOp == true) nothing is kept: 0 / INT64_MIN.
 * Otherwise the values configured in pNbSup (numOfKeep / tsOfKeep) are returned.
 * pNbSup is only dereferenced in the non-recalculate case.
 */
void getStateKeepInfo(SNonBlockAggSupporter* pNbSup, bool isRecOp, int32_t* pNumRes, TSKEY* pTsRes) {
  *pNumRes = isRecOp ? 0 : pNbSup->numOfKeep;
  *pTsRes = isRecOp ? INT64_MIN : pNbSup->tsOfKeep;
}
×
74

75
/*
 * Release-state hook of the non-blocking interval operator.
 *
 * In order: clears expired state using the configured keep parameters, commits
 * the state, stores twAggSup.maxTs under STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME,
 * then forwards the release request to the first downstream operator if it
 * provides a release callback.
 */
void streamIntervalNonblockReleaseState(SOperatorInfo* pOperator) {
  SStreamIntervalSliceOperatorInfo* pSliceInfo = pOperator->info;
  SStreamAggSupporter*              pSup = &pSliceInfo->streamAggSup;

  pSup->stateStore.streamStateClearExpiredState(pSup->pState, pSliceInfo->nbSup.numOfKeep,
                                                pSliceInfo->nbSup.tsOfKeep);
  pSup->stateStore.streamStateCommit(pSup->pState);

  // Persist the max timestamp so that the reload hook can restore it.
  pSup->stateStore.streamStateSaveInfo(pSup->pState, STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME,
                                       strlen(STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME),
                                       &pSliceInfo->twAggSup.maxTs, (int32_t)sizeof(TSKEY));

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.releaseStreamStateFn) {
    pChild->fpSet.releaseStreamStateFn(pChild);
  }

  qDebug("%s===stream===streamIntervalNonblockReleaseState:%" PRId64, GET_TASKID(pOperator->pTaskInfo),
         pSliceInfo->twAggSup.maxTs);
}
×
92

93
/*
 * Reload-state hook of the non-blocking interval operator.
 *
 * Reads the timestamp stored under STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME,
 * raises twAggSup.maxTs to at least that value, asks the state store to reload
 * with it, and forwards the reload request to the first downstream operator if
 * it provides a reload callback. If the stored info cannot be read, the failure
 * is logged and nothing else happens.
 */
void streamIntervalNonblockReloadState(SOperatorInfo* pOperator) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pSliceInfo = pOperator->info;
  SStreamAggSupporter*              pSup = &pSliceInfo->streamAggSup;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  void*                             pSaved = NULL;
  int32_t                           savedLen = 0;

  code = pSup->stateStore.streamStateGetInfo(pSup->pState, STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME,
                                             strlen(STREAM_INTERVAL_NONBLOCK_OP_STATE_NAME), &pSaved, &savedLen);
  QUERY_CHECK_CODE(code, lino, _end);

  // NOTE(review): pSaved is dereferenced without a NULL/length check; this relies on
  // streamStateGetInfo returning a TSKEY-sized buffer on success — confirm.
  TSKEY savedTs = *(TSKEY*)pSaved;
  taosMemoryFreeClear(pSaved);

  pSliceInfo->twAggSup.maxTs = TMAX(pSliceInfo->twAggSup.maxTs, savedTs);
  pSup->stateStore.streamStateReloadInfo(pSup->pState, savedTs);

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  if (pChild->fpSet.reloadStreamStateFn) {
    pChild->fpSet.reloadStreamStateFn(pChild);
  }
  qDebug("%s===stream===streamIntervalNonblockReloadState:%" PRId64, GET_TASKID(pOperator->pTaskInfo), savedTs);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
×
121

122
int32_t saveRecWindowToDisc(SSessionKey* pWinKey, uint64_t uid, EStreamType mode, STableTsDataState* pTsDataState,
×
123
                            SStreamAggSupporter* pAggSup) {
124
  int32_t len = copyRecDataToBuff(pWinKey->win.skey, pWinKey->win.ekey, uid, -1, mode, NULL, 0,
×
125
                                  pTsDataState->pRecValueBuff, pTsDataState->recValueLen);
126
  return pAggSup->stateStore.streamStateSessionSaveToDisk(pTsDataState, pWinKey, pTsDataState->pRecValueBuff, len);
×
127
}
128

129
static int32_t checkAndSaveWinStateToDisc(int32_t startIndex, SArray* pUpdated, uint64_t uid,
×
130
                                          STableTsDataState* pTsDataState, SStreamAggSupporter* pAggSup,
131
                                          SInterval* pInterval) {
132
  int32_t code = TSDB_CODE_SUCCESS;
×
133
  int32_t lino = 0;
×
134
  int32_t mode = 0;
×
135
  int32_t size = taosArrayGetSize(pUpdated);
×
136
  for (int32_t i = startIndex; i < size; i++) {
×
137
    SRowBuffPos* pWinPos = taosArrayGetP(pUpdated, i);
×
138
    SWinKey*     pKey = pWinPos->pKey;
×
139
    int32_t      winRes = pAggSup->stateStore.streamStateGetRecFlag(pAggSup->pState, pKey, sizeof(SWinKey), &mode);
×
140
    if (winRes == TSDB_CODE_SUCCESS) {
×
141
      SSessionKey winKey = {
×
142
          .win.skey = pKey->ts, .win.ekey = taosTimeGetIntervalEnd(pKey->ts, pInterval), .groupId = pKey->groupId};
×
143
      code = saveRecWindowToDisc(&winKey, uid, mode, pTsDataState, pAggSup);
×
144
      QUERY_CHECK_CODE(code, lino, _end);
×
145
    }
146
  }
147

148
_end:
×
149
  if (code != TSDB_CODE_SUCCESS) {
×
150
    qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
×
151
  }
152
  return code;
×
153
}
154

155
/*
 * Aggregate one input block into interval windows without blocking.
 *
 * The block is walked window by window in ascending timestamp order. For each
 * window:
 *   1. the current (and possibly previous) slice state buffer is fetched;
 *   2. if the current window was not found in state (winCode != success) and a
 *      valid previous window exists, the previous window is finished for
 *      interpolation when needed and pushed to pInfo->pUpdated (numOfKeep == 1),
 *      or all previous windows are collected through streamStateGetAllPrev;
 *      windows carrying a recalculate flag are saved to disk;
 *   3. the aggregate functions are applied to the rows that fall in the window.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code; failures are logged.
 */
int32_t doStreamIntervalNonblockAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  SInterval*                        pInterval = &pInfo->interval;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pExprSup = &pOperator->exprSupp;
  int32_t                           numExprs = pExprSup->numOfExprs;
  int64_t                           groupId = pBlock->info.id.groupId;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           pTsList = (int64_t*)pTsCol->pData;

  int32_t            rowIdx = 0;
  int32_t            rowsInWin = 0;
  TSKEY              rowTs = getStartTsKey(&pBlock->info.window, pTsList);
  SInervalSlicePoint curSlice = {0};
  SInervalSlicePoint prevSlice = {0};
  STimeWindow        activeWin =
      getActiveTimeWindow(NULL, &(pInfo->binfo.resultRowInfo), rowTs, pInterval, TSDB_ORDER_ASC);

  for (;;) {
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = getIntervalSliceCurStateBuf(pAggSup, pInterval, pInfo->hasInterpoFunc, &activeWin, groupId, &curSlice,
                                       &prevSlice, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    // winCode is not modified below, so this flag stays valid for the whole iteration.
    bool winMissing = (winCode != TSDB_CODE_SUCCESS);

    if (winMissing && !pInfo->hasInterpoFunc && pInfo->nbSup.numOfKeep == 1) {
      SWinKey lookupKey = {.ts = curSlice.winKey.win.skey, .groupId = groupId};
      code = getIntervalSlicePrevStateBuf(pAggSup, pInterval, &lookupKey, &prevSlice);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (IS_VALID_WIN_KEY(prevSlice.winKey.win.skey) && winMissing) {
      // Finish the interpolation of the previous window, if it is still open.
      if (pInfo->hasInterpoFunc && !isInterpoWindowFinished(&prevSlice)) {
        code = setIntervalSliceOutputBuf(pAggSup, &prevSlice, pExprSup->pCtx, numExprs,
                                         pExprSup->rowEntryInfoOffset);
        QUERY_CHECK_CODE(code, lino, _end);

        resetIntervalSliceFunctionKey(pExprSup->pCtx, numExprs);
        doSetElapsedEndKey(prevSlice.winKey.win.ekey, pExprSup);
        doStreamSliceInterpolation(prevSlice.pLastRow, prevSlice.winKey.win.ekey, rowTs, pBlock, rowIdx, pExprSup,
                                   INTERVAL_SLICE_END, pInfo->pOffsetInfo);
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevSlice.winKey.win, 1);
        code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, rowIdx,
                                               0, pBlock->info.rows, numExprs);
        QUERY_CHECK_CODE(code, lino, _end);
        setInterpoWindowFinished(&prevSlice);
      }

      if (pInfo->nbSup.numOfKeep != 1) {
        // Collect every earlier window; the search key is (rowTs - interval) + 1.
        SWinKey prevKey = {.groupId = groupId};
        prevKey.ts =
            taosTimeAdd(rowTs, -pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) + 1;

        int32_t firstNew = taosArrayGetSize(pInfo->pUpdated);
        code = pAggSup->stateStore.streamStateGetAllPrev(pAggSup->pState, &prevKey, pInfo->pUpdated,
                                                         pInfo->nbSup.numOfKeep);
        QUERY_CHECK_CODE(code, lino, _end);

        if (!isRecalculateOperator(&pInfo->basic)) {
          code = checkAndSaveWinStateToDisc(firstNew, pInfo->pUpdated, 0, pInfo->basic.pTsDataState, pAggSup,
                                            pInterval);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      } else {
        // Only the single previous window is emitted.
        void* pPushed = taosArrayPush(pInfo->pUpdated, &prevSlice.pResPos);
        QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);

        int32_t  recMode = 0;
        SWinKey* pPrevKey = prevSlice.pResPos->pKey;
        int32_t  flagRes =
            pAggSup->stateStore.streamStateGetRecFlag(pAggSup->pState, pPrevKey, sizeof(SWinKey), &recMode);
        if (flagRes == TSDB_CODE_SUCCESS) {
          code = saveRecWindowToDisc(&prevSlice.winKey, pBlock->info.id.uid, recMode, pInfo->basic.pTsDataState,
                                     pAggSup);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
    }

    code = setIntervalSliceOutputBuf(pAggSup, &curSlice, pExprSup->pCtx, numExprs, pExprSup->rowEntryInfoOffset);
    QUERY_CHECK_CODE(code, lino, _end);

    resetIntervalSliceFunctionKey(pExprSup->pCtx, numExprs);
    if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevSlice.winKey.win.skey) && curSlice.winKey.win.skey != rowTs) {
      doStreamSliceInterpolation(prevSlice.pLastRow, curSlice.winKey.win.skey, rowTs, pBlock, rowIdx, pExprSup,
                                 INTERVAL_SLICE_START, pInfo->pOffsetInfo);
    }

    rowsInWin = getNumOfRowsInTimeWindow(&pBlock->info, pTsList, rowIdx, activeWin.ekey, binarySearchForKey, NULL,
                                         TSDB_ORDER_ASC);
    int32_t lastRowInWin = rowIdx + (rowsInWin - 1);

    if (pInfo->hasInterpoFunc) {
      // Remember the last qualified row of this window for later interpolation.
      int32_t lastQualified = getQualifiedRowNumDesc(pExprSup, pBlock, pTsList, lastRowInWin, false);
      TSKEY   lastQualifiedTs = pTsList[lastQualified];
      transBlockToSliceResultRow(pBlock, lastQualified, lastQualifiedTs, curSlice.pLastRow, 0, NULL, NULL,
                                 pInfo->pOffsetInfo);
    }

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curSlice.winKey.win, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, rowIdx,
                                           rowsInWin, pBlock->info.rows, numExprs);
    QUERY_CHECK_CODE(code, lino, _end);
    curSlice.pResPos->beUpdated = true;

    if (curSlice.pLastRow->key == curSlice.winKey.win.ekey) {
      setInterpoWindowFinished(&curSlice);
    }
    releaseOutputBuf(pAggSup->pState, curSlice.pResPos, &pAggSup->stateStore);

    rowIdx = getNextQualifiedWindow(pInterval, &activeWin, &pBlock->info, pTsList, lastRowInWin, TSDB_ORDER_ASC);
    if (rowIdx < 0) {
      break;
    }
    rowTs = pTsList[rowIdx];
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
275

276
/*
 * Pull remaining history results from the state cursor pAggSup->pCur into
 * pUpdated until the array holds `capacity` entries or the cursor is exhausted.
 *
 * When streamStateNLastStateGetKVByCur reports TSDB_CODE_FAILED the cursor is
 * freed and pAggSup->pCur is reset to NULL. A NULL cursor on entry is a no-op.
 * As written, the function always returns TSDB_CODE_SUCCESS.
 */
int32_t getHistoryRemainResultInfo(SStreamAggSupporter* pAggSup, int32_t numOfState, SArray* pUpdated,
                                   int32_t capacity) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pAggSup->pCur == NULL) {
    return code;
  }

  int32_t room = capacity - taosArrayGetSize(pUpdated);
  int32_t round = 0;
  while (round < room) {
    int32_t fetchRes = pAggSup->stateStore.streamStateNLastStateGetKVByCur(pAggSup->pCur, numOfState, pUpdated);
    if (fetchRes == TSDB_CODE_FAILED) {
      pAggSup->stateStore.streamStateFreeCur(pAggSup->pCur);
      pAggSup->pCur = NULL;
      break;
    }

    pAggSup->stateStore.streamStateLastStateCurNext(pAggSup->pCur);
    // The bound is recomputed every round because one fetch may add several entries.
    room = capacity - taosArrayGetSize(pUpdated);
    round++;
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }
  return code;
}
304

305
/*
 * Build one result block from the remaining history state.
 *
 * Fills pInfo->pUpdated via getHistoryRemainResultInfo; if anything was
 * collected, the positions are sorted (and de-duplicated when numOfKeep > 1),
 * handed to the group result info, a fresh pUpdated array is allocated, and the
 * result block pInfo->binfo.pRes is built. No notify-event session keys are
 * passed: the flag controlling that is hard-wired to false here.
 */
int32_t buildIntervalHistoryResult(SOperatorInfo* pOperator) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pSliceInfo = pOperator->info;
  SStreamAggSupporter*              pSup = &pSliceInfo->streamAggSup;
  SStreamNotifyEventSupp*           pNotify = &pSliceInfo->basic.notifyEventSup;
  bool                              withNotify = false;

  code = getHistoryRemainResultInfo(pSup, pSliceInfo->nbSup.numOfKeep, pSliceInfo->pUpdated,
                                    pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (taosArrayGetSize(pSliceInfo->pUpdated) > 0) {
    taosArraySort(pSliceInfo->pUpdated, winPosCmprImpl);
    if (pSliceInfo->nbSup.numOfKeep > 1) {
      taosArrayRemoveDuplicate(pSliceInfo->pUpdated, winPosCmprImpl, releaseFlusedPos);
    }

    // The group result info receives the collected array; pUpdated is re-created right after.
    initMultiResInfoFromArrayList(&pSliceInfo->groupResInfo, pSliceInfo->pUpdated);
    pSliceInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
    QUERY_CHECK_NULL(pSliceInfo->pUpdated, code, lino, _end, terrno);

    doBuildStreamIntervalResult(pOperator, pSliceInfo->streamAggSup.pState, pSliceInfo->binfo.pRes,
                                &pSliceInfo->groupResInfo, withNotify ? pNotify->pSessionKeys : NULL);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
  }
  return code;
}
333

334
/*
 * Drop from pDelWins every window key that also appears in pUpdatedWins.
 *
 * pDelWins is first sorted and de-duplicated with winKeyCmprImpl; each updated
 * position's key is then looked up with binarySearchCom and removed from
 * pDelWins on an exact match. pUpdatedWins itself is not modified.
 */
static void removeDataDeleteResults(SArray* pUpdatedWins, SArray* pDelWins) {
  int32_t numUpdated = taosArrayGetSize(pUpdatedWins);
  int32_t numDeleted = taosArrayGetSize(pDelWins);
  if (numDeleted == 0 || numUpdated == 0) {
    return;
  }

  taosArraySort(pDelWins, winKeyCmprImpl);
  taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
  numDeleted = taosArrayGetSize(pDelWins);

  for (int32_t idx = 0; idx < numUpdated; ++idx) {
    SRowBuffPos* pRowPos = (SRowBuffPos*)taosArrayGetP(pUpdatedWins, idx);
    SWinKey*     pUpdKey = (SWinKey*)pRowPos->pKey;

    int32_t hit = binarySearchCom(pDelWins, numDeleted, pUpdKey, TSDB_ORDER_DESC, compareWinKey);
    if (hit < 0 || compareWinKey(pUpdKey, pDelWins, hit) != 0) {
      continue;
    }

    taosArrayRemove(pDelWins, hit);
    numDeleted = taosArrayGetSize(pDelWins);
  }
}
354

355
static int32_t doTransformRecalculateWindows(SExecTaskInfo* pTaskInfo, SInterval* pInterval, SSDataBlock* pBlock,
×
356
                                             SArray* pUpWins) {
357
  int32_t          code = TSDB_CODE_SUCCESS;
×
358
  int32_t          lino = 0;
×
359
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
360
  TSKEY*           startTsCols = (TSKEY*)pStartTsCol->pData;
×
361
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
362
  TSKEY*           endTsCols = (TSKEY*)pEndTsCol->pData;
×
363
  SColumnInfoData* pCalStTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
×
364
  TSKEY*           calStTsCols = (TSKEY*)pCalStTsCol->pData;
×
365
  SColumnInfoData* pCalEnTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
×
366
  TSKEY*           calEnTsCols = (TSKEY*)pCalEnTsCol->pData;
×
367
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
368
  uint64_t*        pGpDatas = (uint64_t*)pGpCol->pData;
×
369
  for (int32_t i = 0; i < pBlock->info.rows; i++) {
×
370
    STimeWindow win = {.skey = startTsCols[i], .ekey = endTsCols[i]};
×
371
    do {
372
      if (!inCalSlidingWindow(pInterval, &win, calStTsCols[i], calEnTsCols[i], pBlock->info.type)) {
×
373
        getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
×
374
        continue;
×
375
      }
376

377
      uint64_t winGpId = pGpDatas[i];
×
378
      SWinKey  winRes = {.ts = win.skey, .groupId = winGpId};
×
379
      void*    pTmp = taosArrayPush(pUpWins, &winRes);
×
380
      QUERY_CHECK_NULL(pTmp, code, lino, _end, terrno);
×
381
      getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
×
382
    } while (win.ekey <= endTsCols[i]);
×
383
  }
384
_end:
×
385
  if (code != TSDB_CODE_SUCCESS) {
×
386
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
387
  }
388
  return code;
×
389
}
390

391
/*
 * Produce the "remaining" outputs of the operator once regular results are done.
 *
 * In order of priority:
 *   1. history + single operator: emit a history result block if it has rows;
 *   2. a pending checkpoint block (recvCkBlock) is emitted once;
 *   3. otherwise expired state is cleared, per-round counters are reset, the
 *      operator is marked completed and *ppRes is set to NULL. A final
 *      recalculate operator with an empty pull-data map additionally clears all
 *      state and flags the recover scan as finished.
 *
 * On error *ppRes is left untouched and the error code is returned.
 */
static int32_t buildOtherResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pSliceInfo = pOperator->info;
  SStreamAggSupporter*              pSup = &pSliceInfo->streamAggSup;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;

  if (isHistoryOperator(&pSliceInfo->basic) && isSingleOperator(&pSliceInfo->basic)) {
    code = buildIntervalHistoryResult(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    SSDataBlock* pHistRes = pSliceInfo->binfo.pRes;
    if (pHistRes->info.rows != 0) {
      printDataBlock(pHistRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      *ppRes = pHistRes;
      return code;
    }
  }

  if (pSliceInfo->recvCkBlock) {
    pSliceInfo->recvCkBlock = false;
    printDataBlock(pSliceInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
    *ppRes = pSliceInfo->pCheckpointRes;
    return code;
  }

  // INT64_MAX is the reset value assigned to minTs further down.
  if (pSliceInfo->twAggSup.minTs != INT64_MAX) {
    pSliceInfo->nbSup.tsOfKeep = pSliceInfo->twAggSup.minTs;
  }

  // Skipped only for a final operator that is also a recalculate or history operator.
  if (!isFinalOperator(&pSliceInfo->basic) ||
      (!isRecalculateOperator(&pSliceInfo->basic) && !isHistoryOperator(&pSliceInfo->basic))) {
    int32_t keepNum = 0;
    TSKEY   keepTs = INT64_MAX;
    getStateKeepInfo(&pSliceInfo->nbSup, isRecalculateOperator(&pSliceInfo->basic), &keepNum, &keepTs);
    pSup->stateStore.streamStateClearExpiredState(pSup->pState, keepNum, keepTs);
  }

  pSliceInfo->twAggSup.minTs = INT64_MAX;
  pSliceInfo->basic.numOfRecv = 0;
  setStreamOperatorCompleted(pOperator);

  if (isFinalOperator(&pSliceInfo->basic) && isRecalculateOperator(&pSliceInfo->basic) &&
      tSimpleHashGetSize(pSliceInfo->nbSup.pPullDataMap) == 0) {
    qInfo("===stream===%s recalculate is finished.", GET_TASKID(pTaskInfo));
    pSup->stateStore.streamStateClearExpiredState(pSup->pState, 0, INT64_MAX);
    pTaskInfo->streamInfo.recoverScanFinished = true;
  }
  *ppRes = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
441

442
/*
 * Move every value stored in *ppWinUpdated into pUpdated, sort pUpdated with
 * `compar`, and then empty the hash map.
 *
 * A map holding fewer than 4096 entries is cleared in place. A larger one is
 * destroyed and replaced by a fresh map with 1024 initial slots.
 *
 * @param ppWinUpdated  in/out: map to drain; may point to a new map on return.
 *                      If re-creating the map fails, *ppWinUpdated is NULL and
 *                      an error is returned (the old map has already been freed).
 * @param pUpdated      array receiving the map values.
 * @param compar        comparison function used to sort pUpdated.
 * @return TSDB_CODE_SUCCESS, or terrno when appending to pUpdated or allocating
 *         the replacement map fails.
 */
int32_t copyNewResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIte = NULL;
  int32_t iter = 0;

  while ((pIte = tSimpleHashIterate(*ppWinUpdated, pIte, &iter)) != NULL) {
    void* tmp = taosArrayPush(pUpdated, pIte);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }
  taosArraySort(pUpdated, compar);

  // The size must be sampled before the map is cleared.
  bool oversized = !(tSimpleHashGetSize(*ppWinUpdated) < 4096);
  tSimpleHashClear(*ppWinUpdated);
  if (oversized) {
    tSimpleHashCleanup(*ppWinUpdated);
    _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
    (*ppWinUpdated) = tSimpleHashInit(1024, hashFn);
    // Report the allocation failure instead of returning success with a NULL map.
    QUERY_CHECK_NULL(*ppWinUpdated, code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
467

468
static int32_t closeNonblockIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SInterval* pInterval,
×
469
                                           SArray* pUpdated, SExecTaskInfo* pTaskInfo) {
470
  int32_t code = TSDB_CODE_SUCCESS;
×
471
  int32_t lino = 0;
×
472
  void*   pIte = NULL;
×
473
  int32_t iter = 0;
×
474
  while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
×
475
    void*    key = tSimpleHashGetKey(pIte, NULL);
×
476
    SWinKey* pWinKey = (SWinKey*)key;
×
477

478
    STimeWindow win = {
×
479
        .skey = pWinKey->ts,
×
480
        .ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1,
×
481
    };
482

483
    if (isCloseWindow(&win, pTwSup)) {
×
484
      void* pTemp = taosArrayPush(pUpdated, pIte);
×
485
      QUERY_CHECK_NULL(pTemp, code, lino, _end, terrno);
×
486

487
      int32_t tmpRes = tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
×
488
      qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
×
489
    }
490
  }
491

492
_end:
×
493
  if (code != TSDB_CODE_SUCCESS) {
×
494
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
495
  }
496
  return code;
×
497
}
498

499
/*
 * Handle a recalculate request block.
 *
 * For every row, each interval window between the row's start and end
 * timestamps that passes inCalSlidingWindow is examined:
 *   - if streamStateCheck accepts the window key, the recalculate flag is set
 *     on it, and the window is also saved to disk when this is a final operator
 *     and the window is closed, or this is a single operator and the window is
 *     not the last one;
 *   - otherwise the window is saved to disk directly.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from saveRecWindowToDisc.
 */
static int32_t doProcessRecalculateReq(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  SInterval*                        pInterval = &pInfo->interval;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;

  TSKEY*    pStartTs = (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX))->pData;
  TSKEY*    pEndTs = (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX))->pData;
  TSKEY*    pCalStartTs =
      (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX))->pData;
  TSKEY*    pCalEndTs =
      (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX))->pData;
  uint64_t* pGroupIds = (uint64_t*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX))->pData;
  uint64_t* pUids = (uint64_t*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX))->pData;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    SResultRowInfo scratchInfo = {0};
    scratchInfo.cur.pageId = -1;

    STimeWindow win = getActiveTimeWindow(NULL, &scratchInfo, pStartTs[row], pInterval, TSDB_ORDER_ASC);

    do {
      if (inCalSlidingWindow(pInterval, &win, pCalStartTs[row], pCalEndTs[row], pBlock->info.type)) {
        SWinKey winKey = {.ts = win.skey, .groupId = pGroupIds[row]};
        bool    lastWin = false;
        bool    saveToDisc = true;

        if (pAggSup->stateStore.streamStateCheck(pAggSup->pState, &winKey, isFinalOperator(&pInfo->basic),
                                                 &lastWin)) {
          qDebug("===stream===%s set recalculate flag ts:%" PRId64 ",group id:%" PRIu64, GET_TASKID(pTaskInfo), winKey.ts,
                 winKey.groupId);
          pAggSup->stateStore.streamStateSetRecFlag(pAggSup->pState, &winKey, sizeof(SWinKey), pBlock->info.type);

          // A window accepted by streamStateCheck is persisted only in these two cases.
          saveToDisc = (isFinalOperator(&pInfo->basic) && isCloseWindow(&win, &pInfo->twAggSup)) ||
                       (isSingleOperator(&pInfo->basic) && lastWin == false);
        }

        if (saveToDisc) {
          SSessionKey sessKey = {.win = win, .groupId = winKey.groupId};
          code = saveRecWindowToDisc(&sessKey, pUids[row], pBlock->info.type, pInfo->basic.pTsDataState, pAggSup);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
      // Advance whether or not the window was processed.
      getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
    } while (win.ekey <= pEndTs[row]);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
559

560
/**
 * Register a pull request in pPullMap with every child marked as still pending.
 *
 * A new SArray holding the child indexes 0..numOfChild-1 is created and its pointer is
 * stored in the map under pPullKey. On any failure the array is released here, so the
 * caller never has to clean up a half-registered request.
 *
 * NOTE(review): if pPullKey can already be present in pPullMap, the previously stored
 * array pointer may be overwritten by tSimpleHashPut - confirm against the hash semantics.
 *
 * @param pPullMap   map from SPullWindowInfo to the SArray* of pending child ids
 * @param pPullKey   key describing the window that is being pulled
 * @param numOfChild number of child ids to record
 * @param pTaskInfo  task, used only to tag the error log
 * @return TSDB_CODE_SUCCESS, or the error reported by the array/hash helpers
 */
static int32_t addDataPullWindowInfo(SSHashObj* pPullMap, SPullWindowInfo* pPullKey, int32_t numOfChild, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* childIds = taosArrayInit(numOfChild, sizeof(int32_t));
  QUERY_CHECK_NULL(childIds, code, lino, _end, terrno);

  for (int32_t i = 0; i < numOfChild; i++) {
    void* pTemp = taosArrayPush(childIds, &i);
    QUERY_CHECK_NULL(pTemp, code, lino, _end, terrno);
  }

  code = tSimpleHashPut(pPullMap, pPullKey, sizeof(SPullWindowInfo), &childIds, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _end);

  // the map now references the array; make sure the cleanup below leaves it alone
  childIds = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", GET_TASKID(pTaskInfo), __func__, lino, tstrerror(code));
  }
  // still non-NULL only when the array was never handed over to the map
  if (childIds != NULL) {
    taosArrayDestroy(childIds);
  }
  return code;
}
577

578
int32_t buildRetriveRequest(SExecTaskInfo* pTaskInfo, SStreamAggSupporter* pAggSup, STableTsDataState* pTsDataState,
×
579
                            SNonBlockAggSupporter* pNbSup) {
580
  int32_t code = TSDB_CODE_SUCCESS;
×
581
  int32_t lino = 0;
×
582
  code = pAggSup->stateStore.streamStateMergeAllScanRange(pTsDataState);
×
583
  QUERY_CHECK_CODE(code, lino, _end);
×
584

585
  while (1) {
×
586
    SScanRange range = {0};
×
587
    code = pAggSup->stateStore.streamStatePopScanRange(pTsDataState, &range);
×
588
    QUERY_CHECK_CODE(code, lino, _end);
×
589
    if (IS_INVALID_RANGE(range)) {
×
590
      break;
×
591
    }
592
    void*   pIte = NULL;
×
593
    int32_t iter = 0;
×
594
    while ((pIte = tSimpleHashIterate(range.pGroupIds, pIte, &iter)) != NULL) {
×
595
      uint64_t        groupId = *(uint64_t*)tSimpleHashGetKey(pIte, NULL);
×
596
      SPullWindowInfo pullReq = {.window = range.win, .groupId = groupId, .calWin = range.calWin};
×
597
      code = addDataPullWindowInfo(pNbSup->pPullDataMap, &pullReq, pNbSup->numOfChild, pTaskInfo);
×
598
      QUERY_CHECK_CODE(code, lino, _end);
×
599
      void*           pTemp = taosArrayPush(pNbSup->pPullWins, &pullReq);
×
600
      QUERY_CHECK_NULL(pTemp, code, lino, _end, terrno);
×
601
    }
602
  }
603

604
_end:
×
605
  if (code != TSDB_CODE_SUCCESS) {
×
606
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
607
    pTaskInfo->code = code;
×
608
  }
609
  return code;
×
610
}
611

612
/**
 * Handle a "pull over" block: for each row, mark the sending child as finished for the
 * pull request the row describes.
 *
 * Each row is turned into an SPullWindowInfo key (window, group id, calculation window)
 * and looked up in pPullMap. The child index obtained from getChildIndex(pBlock) is
 * removed from the pending-child array stored under that key; when the array becomes
 * empty the entry is removed from the map and the array is destroyed. Rows whose key or
 * child id is not found are logged and skipped.
 *
 * @param pBlock    block carrying start/end ts, calculate start/end ts and group id columns
 * @param pPullMap  map from SPullWindowInfo to the SArray* of pending child ids
 * @param pTaskInfo task, used to tag log lines
 * @return TSDB_CODE_SUCCESS, or terrno when one of the expected columns is missing
 */
int32_t processDataPullOver(SSDataBlock* pBlock, SSHashObj* pPullMap, SExecTaskInfo* pTaskInfo) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SColumnInfoData* pStartCol = NULL;
  SColumnInfoData* pEndCol = NULL;
  SColumnInfoData* pCalStartCol = NULL;
  SColumnInfoData* pCalEndCol = NULL;
  SColumnInfoData* pGroupCol = NULL;

  // every column is dereferenced below, so reject a block that lacks one of them
  pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  QUERY_CHECK_NULL(pStartCol, code, lino, _end, terrno);
  pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  QUERY_CHECK_NULL(pEndCol, code, lino, _end, terrno);
  pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  QUERY_CHECK_NULL(pCalStartCol, code, lino, _end, terrno);
  pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  QUERY_CHECK_NULL(pCalEndCol, code, lino, _end, terrno);
  pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  QUERY_CHECK_NULL(pGroupCol, code, lino, _end, terrno);

  TSKEY*    pTsStartData = (TSKEY*)pStartCol->pData;
  TSKEY*    pTsEndData = (TSKEY*)pEndCol->pData;
  TSKEY*    pCalTsStartData = (TSKEY*)pCalStartCol->pData;
  TSKEY*    pCalTsEndData = (TSKEY*)pCalEndCol->pData;
  uint64_t* pGroupIdData = (uint64_t*)pGroupCol->pData;
  int32_t   chId = getChildIndex(pBlock);

  for (int32_t i = 0; i < pBlock->info.rows; i++) {
    SPullWindowInfo pull = {.window.skey = pTsStartData[i],
                            .window.ekey = pTsEndData[i],
                            .groupId = pGroupIdData[i],
                            .calWin.skey = pCalTsStartData[i],
                            .calWin.ekey = pCalTsEndData[i]};
    void*           pChIds = tSimpleHashGet(pPullMap, &pull, sizeof(SPullWindowInfo));
    if (pChIds == NULL) {
      qInfo("===stream===%s did not find retrive window. ts:%" PRId64 ",groupId:%" PRIu64 ",child id %d",
            GET_TASKID(pTaskInfo), pull.window.skey, pull.groupId, chId);
      continue;
    }

    SArray* chArray = *(SArray**)pChIds;
    int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
    if (index == -1) {
      qInfo("===stream===%s did not find child id. retrive window ts:%" PRId64 ",groupId:%" PRIu64 ",child id %d",
            GET_TASKID(pTaskInfo), pull.window.skey, pull.groupId, chId);
      continue;
    }

    qDebug("===stream===%s retrive window %" PRId64 " delete child id %d", GET_TASKID(pTaskInfo), pull.window.skey,
           chId);
    taosArrayRemove(chArray, index);
    if (taosArrayGetSize(chArray) > 0) {
      // other children are still pending for this window
      continue;
    }

    // pull data is over: detach the entry first so the map never references freed memory
    int32_t tmpRes = tSimpleHashRemove(pPullMap, &pull, sizeof(SPullWindowInfo));
    if (tmpRes != TSDB_CODE_SUCCESS) {
      // best effort, as before: keep going, but leave the (now empty) array with the map entry
      qError("===stream===%s failed to remove retrive window since %s. ts:%" PRId64 ",groupId:%" PRIu64,
             GET_TASKID(pTaskInfo), tstrerror(tmpRes), pull.window.skey, pull.groupId);
      continue;
    }
    taosArrayDestroy(chArray);
    qDebug("===stream===%s retrive pull data over. ts:%" PRId64 ",groupId:%" PRIu64 , GET_TASKID(pTaskInfo), pull.window.skey,
           pull.groupId);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
663

664
/**
 * "Get next" entry point of the non-blocking stream interval operator.
 *
 * Flow, in order:
 *  1. return NULL at once when the operator is already done;
 *  2. hand out any result that buildIntervalSliceResult can still produce;
 *  3. when the status is OP_RES_TO_RETURN, delegate to buildOtherResult;
 *  4. otherwise drain the downstream operator block by block, dispatching on the block
 *     type, until the input ends, a block has to be forwarded, a scan block has rows,
 *     or some windows were updated;
 *  5. post-process pInfo->pUpdated (merge, close, sort, de-duplicate, strip deletes),
 *     hand it to the group result info and build the output.
 *
 * @param pOperator this operator
 * @param ppRes     receives the produced block, or NULL when there is nothing to return
 * @return TSDB_CODE_SUCCESS or an error code; errors reaching _end are also stored in
 *         pTaskInfo->code
 */
int32_t doStreamIntervalNonblockAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pSup = &pOperator->exprSupp;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;

  qDebug("stask:%s  %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  // results left over from a previous call go out first
  code = buildIntervalSliceResult(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);
  if ((*ppRes) != NULL) {
    return code;
  }

  if (isHistoryOperator(&pInfo->basic) && !isFinalOperator(&pInfo->basic)) {
    pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState, pInfo->nbSup.numOfKeep, pInfo->nbSup.tsOfKeep);
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildOtherResult(pOperator, ppRes);
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  for (;;) {
    if (isTaskKilled(pTaskInfo)) {
      qInfo("===stream===%s task is killed, code %s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
      (*ppRes) = NULL;
      return code;
    }

    SSDataBlock* pBlock = NULL;
    code = downstream->fpSet.getNextFn(downstream, &pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pBlock == NULL) {
      // input exhausted for this round
      qDebug("===stream===%s return data:%s. rev rows:%d", GET_TASKID(pTaskInfo),
             getStreamOpName(pOperator->operatorType), pInfo->basic.numOfRecv);
      if (isFinalOperator(&pInfo->basic)) {
        if (isRecalculateOperator(&pInfo->basic)) {
          code = buildRetriveRequest(pTaskInfo, pAggSup, pInfo->basic.pTsDataState, &pInfo->nbSup);
        } else {
          code = pAggSup->stateStore.streamStateFlushReaminInfoToDisk(pInfo->basic.pTsDataState);
        }
        QUERY_CHECK_CODE(code, lino, _end);
      }
      pOperator->status = OP_RES_TO_RETURN;
      break;
    }

    pInfo->basic.numOfRecv += pBlock->info.rows;

    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);

    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_INVALID:
      case STREAM_PULL_DATA: {
        // data blocks: run the scalar expressions in place, then aggregate below
        SExprSupp* pExprSup = &pInfo->scalarSup;
        if (pExprSup->pExprInfo != NULL) {
          code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case STREAM_CHECKPOINT: {
        pInfo->recvCkBlock = true;
        pAggSup->stateStore.streamStateCommit(pAggSup->pState);
        code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      case STREAM_CREATE_CHILD_TABLE:
      case STREAM_DROP_CHILD_TABLE: {
        // forwarded untouched
        printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
        (*ppRes) = pBlock;
        return code;
      }
      case STREAM_RECALCULATE_DATA:
      case STREAM_RECALCULATE_DELETE: {
        if (isRecalculateOperator(&pInfo->basic) && !isSemiOperator(&pInfo->basic)) {
          code = doTransformRecalculateWindows(pTaskInfo, &pInfo->interval, pBlock, pInfo->pDelWins);
          QUERY_CHECK_CODE(code, lino, _end);
          if (isFinalOperator(&pInfo->basic)) {
            saveRecalculateData(&pAggSup->stateStore, pInfo->basic.pTsDataState, pBlock, pBlock->info.type);
          }
          continue;
        }

        if (isSemiOperator(&pInfo->basic)) {
          // a semi operator passes the request on to its consumer
          (*ppRes) = pBlock;
          return code;
        }

        code = doProcessRecalculateReq(pOperator, pBlock);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      case STREAM_PULL_OVER: {
        code = processDataPullOver(pBlock, pInfo->nbSup.pPullDataMap, pTaskInfo);
        QUERY_CHECK_CODE(code, lino, _end);
        continue;
      }
      default: {
        qDebug("===stream===%s ignore recv block. type:%d", GET_TASKID(pTaskInfo), pBlock->info.type);
        continue;
      }
    }

    if (pBlock->info.type == STREAM_NORMAL && pBlock->info.version != 0) {
      // set input version
      pTaskInfo->version = pBlock->info.version;
    }

    code = setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    QUERY_CHECK_CODE(code, lino, _end);

    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
    code = pInfo->nbSup.pWindowAggFn(pOperator, pBlock);
    if (code == TSDB_CODE_STREAM_INTERNAL_ERROR) {
      // this code is not treated as a failure here: switch to returning results instead
      pOperator->status = OP_RES_TO_RETURN;
      code = TSDB_CODE_SUCCESS;
    }
    QUERY_CHECK_CODE(code, lino, _end);

    if (pAggSup->pScanBlock->info.rows > 0) {
      (*ppRes) = pAggSup->pScanBlock;
      printDataBlock(pAggSup->pScanBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      return code;
    }

    if (taosArrayGetSize(pInfo->pUpdated) > 0) {
      break;
    }
  }

  if (pOperator->status == OP_RES_TO_RETURN &&
      (isHistoryOperator(&pInfo->basic) || isRecalculateOperator(&pInfo->basic) || isSemiOperator(&pInfo->basic))) {
    code = copyNewResult(&pAggSup->pResultRows, pInfo->pUpdated, winPosCmprImpl);
    QUERY_CHECK_CODE(code, lino, _end);

    if (isSingleOperator(&pInfo->basic)) {
      if (pAggSup->pCur == NULL) {
        pAggSup->pCur = pAggSup->stateStore.streamStateGetLastStateCur(pAggSup->pState);
      }
      code =
          getHistoryRemainResultInfo(pAggSup, pInfo->nbSup.numOfKeep, pInfo->pUpdated, pOperator->resultInfo.capacity);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  if (pOperator->status == OP_RES_TO_RETURN && pInfo->destHasPrimaryKey && isFinalOperator(&pInfo->basic)) {
    code = closeNonblockIntervalWindow(pAggSup->pResultRows, &pInfo->twAggSup, &pInfo->interval, pInfo->pUpdated,
                                       pTaskInfo);
    QUERY_CHECK_CODE(code, lino, _end);
    if (!isHistoryOperator(&pInfo->basic) && !isRecalculateOperator(&pInfo->basic)) {
      code = checkAndSaveWinStateToDisc(0, pInfo->pUpdated, 0, pInfo->basic.pTsDataState, &pInfo->streamAggSup, &pInfo->interval);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  taosArraySort(pInfo->pUpdated, winPosCmprImpl);
  if (pInfo->nbSup.numOfKeep > 1) {
    taosArrayRemoveDuplicate(pInfo->pUpdated, winPosCmprImpl, releaseFlusedPos);
  }
  if (!isSemiOperator(&pInfo->basic) && !pInfo->destHasPrimaryKey) {
    removeDataDeleteResults(pInfo->pUpdated, pInfo->pDelWins);
  }

  // the current list goes to groupResInfo; a fresh list collects the next round of updates
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);

  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  code = buildIntervalSliceResult(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);
  if ((*ppRes) != NULL) {
    return code;
  }

  return buildOtherResult(pOperator, ppRes);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
  }
  return code;
}
864

865
/**
 * Window aggregation callback used by the semi (partial) interval operator.
 *
 * Starting from the window that contains the first timestamp of pBlock, the rows of the
 * block are consumed window by window: the state buffer of the window is fetched, a
 * window that was not found in the state (winCode != success) is recorded in
 * pAggSup->pResultRows, the output buffer is bound, and the aggregate functions are
 * applied to the rows falling into the window. The walk stops when
 * getNextQualifiedWindow reports no further window (negative position).
 *
 * For history operators, once pResultRows holds more than ten times the result
 * capacity, its content is moved on via copyNewResult.
 *
 * @param pOperator this operator
 * @param pBlock    input data block
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t doStreamSemiIntervalNonblockAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pSup = &pOperator->exprSupp;
  int32_t                           numOfOutput = pSup->numOfExprs;
  int64_t                           groupId = pBlock->info.id.groupId;

  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  TSKEY*           tsCols = (int64_t*)pTsCol->pData;

  SInervalSlicePoint curPoint = {0};
  SInervalSlicePoint prevPoint = {0};
  TSKEY              firstTs = getStartTsKey(&pBlock->info.window, tsCols);
  STimeWindow        curWin =
      getActiveTimeWindow(NULL, &(pInfo->binfo.resultRowInfo), firstTs, &pInfo->interval, TSDB_ORDER_ASC);
  int32_t startPos = 0;

  do {
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = getIntervalSliceCurStateBuf(pAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint,
                                       &prevPoint, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    if (winCode != TSDB_CODE_SUCCESS) {
      SWinKey key = {.ts = curPoint.winKey.win.skey, .groupId = groupId};
      code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SWinKey), &curPoint.pResPos, POINTER_BYTES);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = setIntervalSliceOutputBuf(pAggSup, &curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
    QUERY_CHECK_CODE(code, lino, _end);

    // rows [startPos, startPos + forwardRows) belong to the current window
    int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey,
                                                   NULL, TSDB_ORDER_ASC);
    int32_t prevEndPos = (forwardRows - 1) + startPos;

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    QUERY_CHECK_CODE(code, lino, _end);
    curPoint.pResPos->beUpdated = true;

    if (curPoint.pLastRow->key == curPoint.winKey.win.ekey) {
      setInterpoWindowFinished(&curPoint);
    }

    startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
  } while (startPos >= 0);

  if (isHistoryOperator(&pInfo->basic) &&
      tSimpleHashGetSize(pAggSup->pResultRows) > pOperator->resultInfo.capacity * 10) {
    code = copyNewResult(&pAggSup->pResultRows, pInfo->pUpdated, winPosCmprImpl);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
936

937
/**
 * Propagate pBasic->operatorFlag down the first-child chain of downstream.
 *
 * Walking from downstream towards the leaves (always through pDownstream[0]):
 *  - a stream partition operator gets the flag copied into its own basic info;
 *  - the stream scan operator gets the flag copied into its basic info and ends the walk;
 *  - any other operator is only traversed.
 *
 * The flag is written through SStreamScanInfo only for an operator whose type really is
 * QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; casting the info of any other operator to
 * SStreamScanInfo would write into an unrelated structure.
 *
 * @param downstream first operator of the chain to update
 * @param pBasic     source of the operator flag
 */
void adjustDownstreamBasicInfo(SOperatorInfo* downstream, struct SSteamOpBasicInfo* pBasic) {
  SOperatorInfo* pOp = downstream;

  while (pOp != NULL) {
    if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pScanInfo = pOp->info;
      pScanInfo->basic.operatorFlag = pBasic->operatorFlag;
      return;
    }

    if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
      SStreamPartitionOperatorInfo* pPartionInfo = pOp->info;
      pPartionInfo->basic.operatorFlag = pBasic->operatorFlag;
    }

    // a non-scan operator without a child: nothing further to update
    if (pOp->pDownstream == NULL) {
      return;
    }
    pOp = pOp->pDownstream[0];
  }
}
×
949

950
int32_t createSemiIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
951
                                            SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
952
  int32_t code = TSDB_CODE_SUCCESS;
×
953
  int32_t lino = 0;
×
954
  code = createStreamIntervalSliceOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, ppOptInfo);
×
955
  QUERY_CHECK_CODE(code, lino, _end);
×
956

957
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)(*ppOptInfo)->info;
×
958
  pInfo->nbSup.numOfKeep = 0;
×
959
  pInfo->nbSup.pWindowAggFn = doStreamSemiIntervalNonblockAggImpl;
×
960
  setSemiOperatorFlag(&pInfo->basic);
×
961
  adjustDownstreamBasicInfo(downstream, &pInfo->basic);
×
962

963
_end:
×
964
  if (code != TSDB_CODE_SUCCESS) {
×
965
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
966
  }
967
  return code;
×
968
}
969

970
/**
 * Tell whether a window older than the keep boundary is missing from the stream state.
 *
 * Windows starting at or after nbSup.tsOfKeep are never reported. For an older window
 * the state store is asked via streamStateCheck, and the window is reported when that
 * check is negative.
 *
 * @param pInfo   operator info (keep boundary, state store, operator flags)
 * @param pWin    window to test; only skey is used
 * @param groupId group the window belongs to
 * @return true when the window starts before tsOfKeep and streamStateCheck fails
 */
bool isDataDeletedStreamWindow(SStreamIntervalSliceOperatorInfo* pInfo, STimeWindow* pWin, uint64_t groupId) {
  if (pWin->skey >= pInfo->nbSup.tsOfKeep) {
    return false;
  }

  SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
  SWinKey              winKey = {.ts = pWin->skey, .groupId = groupId};
  if (pAggSup->stateStore.streamStateCheck(pAggSup->pState, &winKey, isFinalOperator(&pInfo->basic), NULL)) {
    return false;
  }
  return true;
}
978

979
/**
 * Window aggregation callback used by the final interval operator.
 *
 * Steps:
 *  1. reset pAggSup->pScanBlock and make sure it can hold one row per input row;
 *  2. skip leading windows that must not be aggregated: for non-history operators a
 *     window reported by isDataDeletedStreamWindow gets a row appended to pScanBlock
 *     instead; for recalculate operators a window outside the sliding range is skipped;
 *  3. for each remaining window, fetch its state buffer, register it in pResultRows
 *     (recording the key in pDelWins first when the destination has a primary key, the
 *     window already existed in the state and it is not yet in pResultRows), and apply
 *     the aggregate functions to one row per window (forwardRows is fixed at 1);
 *  4. close finished windows or move results on, depending on the operator flags, and
 *     persist the window state for plain (non-history, non-recalculate) operators.
 *
 * @param pOperator this operator
 * @param pBlock    input data block
 * @return TSDB_CODE_SUCCESS or the first error encountered, including a failure to
 *         obtain the ts column or to grow pScanBlock
 */
static int32_t doStreamFinalntervalNonblockAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                           code = TSDB_CODE_SUCCESS;
  int32_t                           lino = 0;
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
  SStreamAggSupporter*              pAggSup = &pInfo->streamAggSup;
  SExecTaskInfo*                    pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*                        pSup = &pOperator->exprSupp;
  int32_t                           numOfOutput = pSup->numOfExprs;
  int32_t                           forwardRows = 1;
  TSKEY*                            tsCols = NULL;
  TSKEY                             ts = 0;
  int32_t                           startPos = 0;
  uint64_t                          groupId = pBlock->info.id.groupId;
  SInervalSlicePoint                curPoint = {0};
  SInervalSlicePoint                prevPoint = {0};
  STimeWindow                       curWin = {0};

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  QUERY_CHECK_NULL(pColDataInfo, code, lino, _end, terrno);
  tsCols = (int64_t*)pColDataInfo->pData;

  if (pAggSup->pScanBlock->info.rows > 0) {
    blockDataCleanup(pAggSup->pScanBlock);
  }
  pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.ekey);

  // rows are appended to pScanBlock below, so a failed resize must stop processing here
  code = blockDataEnsureCapacity(pAggSup->pScanBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  ts = getStartTsKey(&pBlock->info.window, tsCols);
  curWin = getFinalTimeWindow(ts, &pInfo->interval);

  // skip leading windows that are not aggregated by this call
  while (startPos >= 0) {
    if (!isHistoryOperator(&pInfo->basic) && isDataDeletedStreamWindow(pInfo, &curWin, groupId)) {
      uint64_t uid = 0;
      code = appendOneRowToSpecialBlockImpl(pAggSup->pScanBlock, &curWin.skey, &curWin.ekey, &curWin.skey, &curWin.skey,
                                            &uid, &groupId, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      startPos = getNextQualifiedFinalWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, startPos);
    } else if (isRecalculateOperator(&pInfo->basic) && !inSlidingWindow(&pInfo->interval, &curWin, &pBlock->info)) {
      startPos = getNextQualifiedFinalWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, startPos);
    } else {
      break;
    }
  }

  while (startPos >= 0) {
    int32_t winCode = TSDB_CODE_SUCCESS;
    code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId,
                                       &curPoint, &prevPoint, &winCode);
    QUERY_CHECK_CODE(code, lino, _end);

    SWinKey key = {.ts = curPoint.winKey.win.skey, .groupId = groupId};
    if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
      if (tSimpleHashGet(pAggSup->pResultRows, &key, sizeof(SWinKey)) == NULL) {
        void* pTmp = taosArrayPush(pInfo->pDelWins, &key);
        QUERY_CHECK_NULL(pTmp, code, lino, _end, terrno);
      }
    }

    curPoint.pResPos->beUpdated = true;
    code = tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SWinKey), &curPoint.pResPos, POINTER_BYTES);
    QUERY_CHECK_CODE(code, lino, _end);

    code =
        setIntervalSliceOutputBuf(&pInfo->streamAggSup, &curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
    QUERY_CHECK_CODE(code, lino, _end);
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    QUERY_CHECK_CODE(code, lino, _end);

    int32_t prevEndPos = startPos;
    startPos = getNextQualifiedFinalWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos);
  }

  if (!pInfo->destHasPrimaryKey && !isHistoryOperator(&pInfo->basic)) {
    code = closeNonblockIntervalWindow(pAggSup->pResultRows, &pInfo->twAggSup, &pInfo->interval, pInfo->pUpdated,
                                       pTaskInfo);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if ((isHistoryOperator(&pInfo->basic) || isRecalculateOperator(&pInfo->basic)) &&
             tSimpleHashGetSize(pAggSup->pResultRows) > pOperator->resultInfo.capacity * 10) {
    code = copyNewResult(&pAggSup->pResultRows, pInfo->pUpdated, winPosCmprImpl);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (!isHistoryOperator(&pInfo->basic) && !isRecalculateOperator(&pInfo->basic)) {
    code = checkAndSaveWinStateToDisc(0, pInfo->pUpdated, 0, pInfo->basic.pTsDataState, &pInfo->streamAggSup, &pInfo->interval);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
1071

1072
int32_t createFinalIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
×
1073
                                             SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
1074
  int32_t code = TSDB_CODE_SUCCESS;
×
1075
  int32_t lino = 0;
×
1076
  code = createStreamIntervalSliceOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, ppOptInfo);
×
1077
  QUERY_CHECK_CODE(code, lino, _end);
×
1078

1079
  SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)(*ppOptInfo)->info;
×
1080
  pInfo->nbSup.pWindowAggFn = doStreamFinalntervalNonblockAggImpl;
×
1081
  pInfo->nbSup.numOfChild = pHandle->numOfVgroups;
×
1082
  pInfo->streamAggSup.pScanBlock->info.type = STREAM_RETRIEVE;
×
1083
  pInfo->nbSup.tsOfKeep = INT64_MIN;
×
1084
  pInfo->twAggSup.waterMark = 0;
×
1085
  setFinalOperatorFlag(&pInfo->basic);
×
1086
  adjustDownstreamBasicInfo(downstream, &pInfo->basic);
×
1087

1088
_end:
×
1089
  if (code != TSDB_CODE_SUCCESS) {
×
1090
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1091
  }
1092
  return code;
×
1093
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc