• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4961

09 Feb 2026 01:16AM UTC coverage: 66.798% (-0.08%) from 66.88%
#4961

push

travis-ci

web-flow
docs: add support for recording STMT to CSV files (#34276)

* docs: add support for recording STMT to CSV files

* docs: update version for STMT recording feature in CSV files

205534 of 307696 relevant lines covered (66.8%)

127069311.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.98
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
// Selects which boundary of a result row an interpolation helper operates on:
// the helpers in this file map START to SResultRow.startInterp / ctx.start and
// every other value to SResultRow.endInterp / ctx.end.
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
// Payload stored in the nodes of SResultRowInfo.openWindow (see the cast of
// SListNode.data in doInterpUnclosedTimeWindow): the buffer position of a
// result row together with the group id it was opened for.
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
56
    *pResult = NULL;
238✔
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
/**
 * Account for one more row in the window tracked by pRowSup: the window end
 * key and the previous timestamp both become ts, and the row count grows by one.
 * When hasContinuousNullRows() reports pending null rows, they are added to the
 * count as well and the null-row counter is reset.
 *
 * @param rowIndex  not used by this function
 */
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
  pRowSup->numOfRows += 1;

  if (!hasContinuousNullRows(pRowSup)) {
    return;
  }

  // null-state rows enclosed by rows of the same state belong to this window too
  pRowSup->numOfRows += pRowSup->numNullRows;
  resetNumNullRows(pRowSup);
}
2,147,483,647✔
79

80
/**
 * Begin tracking a new window in pRowSup: it starts at row rowIndex with start
 * key tsList[rowIndex], holds zero rows so far, and has no pending null rows.
 */
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
  const int64_t startTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->win.skey = startTs;
  pRowSup->numOfRows = 0;

  resetNumNullRows(pRowSup);
}
833,322,170✔
87

88
/**
 * Count how many rows, starting at index pos, can be consumed up to and
 * including the key ekey.
 *
 * searchFn is run on the sub-array pData[pos .. numOfRows) and its result is
 * taken as the base number of rows; every following row whose key equals ekey
 * is then added on top. The same procedure is used for both scan orders (the
 * order is only forwarded to searchFn).
 *
 * @param numOfRows  number of keys in pData
 * @param searchFn   search callback; a negative result means "nothing found"
 * @param ekey       inclusive end key of the window
 * @param pos        first row to consider
 * @param order      scan order, forwarded to searchFn
 * @param pData      key column
 * @return number of rows to step forward, 0 when searchFn finds nothing
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // Rows equal to ekey still belong to the window. The scan is bounded by
  // numOfRows so a block whose tail consists solely of ekey cannot be overrun.
  for (int32_t idx = end + pos; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
124

125
/**
 * Binary search over an array of TSKEY values.
 *
 * @param pValue  start of the TSKEY array (passed as char*)
 * @param num     number of keys; a value <= 0 yields -1
 * @param key     key to look for
 * @param order   TSDB_ORDER_DESC treats the array as descending, any other
 *                value treats it as ascending
 * @return for descending input the position of a key <= `key`, for ascending
 *         input the position of a key >= `key`; -1 when no such position
 *         exists inside the array
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  TSKEY*  keys = (TSKEY*)pValue;
  int32_t lo = 0;
  int32_t hi = num - 1;
  int32_t mid = -1;

  if (order == TSDB_ORDER_DESC) {
    // descending: look for the first position that is not larger than the key
    for (;;) {
      if (key >= keys[lo]) {
        return lo;
      }
      if (key == keys[hi]) {
        return hi;
      }

      if (key < keys[hi]) {
        hi += 1;
        return (hi >= num) ? -1 : hi;
      }

      mid = lo + ((hi - lo + 1) >> 1);
      if (key < keys[mid]) {
        lo = mid + 1;
      } else if (key > keys[mid]) {
        hi = mid - 1;
      } else {
        return mid;
      }
    }
  }

  // ascending: look for the first position that is not smaller than the key
  for (;;) {
    if (key <= keys[lo]) {
      return lo;
    }
    if (key == keys[hi]) {
      return hi;
    }

    if (key > keys[hi]) {
      hi += 1;
      return (hi >= num) ? -1 : hi;
    }

    mid = lo + ((hi - lo + 1) >> 1);
    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,147,483,647✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
2,147,483,647✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
198

199
  if (order == TSDB_ORDER_ASC) {
2,147,483,647✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
2,147,483,647✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
2,147,483,647✔
202
      if (item != NULL) {
2,147,483,647✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
15,675,475✔
207
      if (item != NULL) {
18,380,454✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
369,803,680✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
375,787,808✔
214
      if (item != NULL) {
375,467,483✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
133,296✔
219
      if (item != NULL) {
617,428✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
2,147,483,647✔
226
}
227

228
/**
 * Compute, for every expression that is an interval-interpolation function,
 * the value at windowKey by linear interpolation between (prevTs, previous
 * value) and (curTs, current value), and store it in the start or end point of
 * that function context depending on type.
 *
 * The previous value is read from pPrevValues when prevRowIndex is -1,
 * otherwise from row prevRowIndex of the function's input column. Elapsed
 * functions skip the interpolation call, so their stored value stays 0.
 * Expressions that are not interpolation functions get start.key = INT64_MIN.
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  // position inside pPrevValues of the next interpolated column; starts at 1
  int32_t prevValIdx = 1;

  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pCtx[k].param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double prevVal = 0, curVal = 0, interpVal = 0;
    if (prevRowIndex == -1) {
      SGroupKeys* pSaved = taosArrayGet(pPrevValues, prevValIdx);
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, pSaved->pData, typeGetTypeModFromColInfo(&pColInfo->info));
    } else {
      GET_TYPED_DATA(prevVal, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
                     typeGetTypeModFromColInfo(&pColInfo->info));
    }

    GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
                   typeGetTypeModFromColInfo(&pColInfo->info));

    SPoint left = (SPoint){.key = prevTs, .val = &prevVal};
    SPoint right = (SPoint){.key = curTs, .val = &curVal};
    SPoint target = (SPoint){.key = windowKey, .val = &interpVal};

    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
      taosGetLinearInterpolationVal(&target, TSDB_DATA_TYPE_DOUBLE, &left, &right, TSDB_DATA_TYPE_DOUBLE, 0);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = target.key;
      pCtx[k].start.val = interpVal;
    } else {
      pCtx[k].end.key = target.key;
      pCtx[k].end.val = interpVal;
    }

    prevValIdx += 1;
  }
}
9,910,594✔
299

300
/**
 * Mark the start point (type == RESULT_ROW_START_INTERP) or the end point
 * (any other type) of every function context as "no interpolation" by setting
 * its key to INT64_MIN.
 */
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool clearStart = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (clearStart) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
638,612✔
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,274,603✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,274,603✔
315

316
  TSKEY curTs = tsCols[pos];
5,274,603✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,274,603✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,274,603✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,274,603✔
324
  if (key == curTs) {
5,274,603✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
342,821✔
326
    return true;
342,821✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
4,931,782✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
5,874✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
4,925,908✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
4,925,908✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
4,931,782✔
339
}
340

341
/**
 * Prepare the end-boundary interpolation for the window `win`.
 *
 * @param endRowIndex   last row of the window inside the block
 * @param nextRowIndex  row following the window; must be >= 0 when used
 * @param blockEkey     last key of the block in scan direction
 * @param pRes          out: false when the window does not end inside this
 *                      block, true when the end boundary has been handled;
 *                      left untouched on error
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 *         interpolation is required but nextRowIndex is negative
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  const int32_t order = pInfo->binfo.inputTsOrder;
  const TSKEY   lastTsInWin = tsCols[endRowIndex];
  const TSKEY   boundary = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  // the window continues past this block: nothing to interpolate yet
  bool pastBlockAsc = (order == TSDB_ORDER_ASC) && (boundary > blockEkey);
  bool pastBlockDesc = (order == TSDB_ORDER_DESC) && (boundary < blockEkey);
  if (pastBlockAsc || pastBlockDesc) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = false;
    return TSDB_CODE_SUCCESS;
  }

  // a row sits exactly on the boundary: its value is used as is
  if (boundary == lastTsInWin) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = true;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, lastTsInWin, endRowIndex, tsCols[nextRowIndex],
                            nextRowIndex, boundary, RESULT_ROW_END_INTERP, pSup);
  *pRes = true;
  return TSDB_CODE_SUCCESS;
}
376

377
/**
 * Tell whether window pWin has to be calculated for the range
 * [calStart, calEnd].
 *
 * When interval equals sliding the answer is always true. Otherwise the window
 * is rejected if it lies entirely outside the range, or if the block is of
 * type STREAM_PULL_DATA and the window starts before calStart.
 */
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  if (pWin->ekey < calStart || pWin->skey > calEnd) {
    return false;
  }

  if (blockType == STREAM_PULL_DATA && pWin->skey < calStart) {
    return false;
  }

  return true;
}
385

386
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
387
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
2,147,483,647✔
388
}
389

390
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
391
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
392
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
393

394
  int32_t precision = pInterval->precision;
2,147,483,647✔
395
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
396

397
  // next time window is not in current block
398
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
399
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
400
    return -1;
7,619,435✔
401
  }
402

403
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
404
    return -1;
×
405
  }
406

407
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
408
  int32_t startPos = 0;
2,147,483,647✔
409

410
  // tumbling time window query, a special case of sliding time window query
411
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
412
    startPos = prevPosition + 1;
2,147,483,647✔
413
  } else {
414
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
72,541,543✔
415
      startPos = 0;
9,468,405✔
416
    } else {
417
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
64,883,532✔
418
    }
419
  }
420
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
421
    return -1;
2,147,483,647✔
422
  }
423

424
  /* interp query with fill should not skip time window */
425
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
426
  //    return startPos;
427
  //  }
428

429
  /*
430
   * This time window does not cover any data, try next time window,
431
   * this case may happen when the time window is too small
432
   */
433
  if (primaryKeys != NULL) {
2,147,483,647✔
434
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
435
      TSKEY next = primaryKeys[startPos];
1,514,302,142✔
436
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,515,160,504✔
437
        pNext->skey = taosTimeTruncate(next, pInterval);
1,833,629✔
438
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,544✔
439
      } else {
440
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,515,147,398✔
441
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,518,884,814✔
442
      }
443
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
639,003,166✔
444
      TSKEY next = primaryKeys[startPos];
369,799,130✔
445
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
370,410,537✔
446
        pNext->skey = taosTimeTruncate(next, pInterval);
216,937✔
447
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
448
      } else {
449
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
370,573,280✔
450
        pNext->ekey = pNext->skey + pInterval->interval - 1;
370,349,186✔
451
      }
452
    }
453
  }
454

455
  return startPos;
2,147,483,647✔
456
}
457

458
/**
 * Report whether the start boundary (type == RESULT_ROW_START_INTERP) or the
 * end boundary (any other type) of the result row is flagged as interpolated.
 */
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    return pResult->endInterp == true;
  }

  return pResult->startInterp == true;
}
465

466
/**
 * Flag the start boundary (type == RESULT_ROW_START_INTERP) or the end
 * boundary (any other type) of the result row as interpolated.
 */
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
10,534,011✔
473

474
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
475
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
476
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
477
  int32_t lino = 0;
2,147,483,647✔
478
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
479
    return code;
2,147,483,647✔
480
  }
481

482
  if (pBlock == NULL) {
1,549,749✔
483
    code = TSDB_CODE_INVALID_PARA;
×
484
    return code;
×
485
  }
486

487
  if (pBlock->pDataBlock == NULL) {
1,549,749✔
488
    return code;
×
489
  }
490

491
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,274,603✔
492

493
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,274,603✔
494
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,274,603✔
495
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,274,603✔
496
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,274,603✔
497
    if (interp) {
5,274,603✔
498
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,274,603✔
499
    }
500
  } else {
501
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
502
  }
503

504
  // point interpolation does not require the end key time window interpolation.
505
  // interpolation query does not generate the time window end interpolation
506
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,274,603✔
507
  if (!done) {
5,274,603✔
508
    int32_t endRowIndex = startPos + forwardRows - 1;
5,274,603✔
509
    int32_t nextRowIndex = endRowIndex + 1;
5,274,603✔
510

511
    // duplicated ts row does not involve in the interpolation of end value for current time window
512
    int32_t x = endRowIndex;
5,274,603✔
513
    while (x > 0) {
5,289,969✔
514
      if (tsCols[x] == tsCols[x - 1]) {
5,277,941✔
515
        x -= 1;
15,366✔
516
      } else {
517
        endRowIndex = x;
5,262,575✔
518
        break;
5,262,575✔
519
      }
520
    }
521

522
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,274,603✔
523
    bool  interp = false;
5,274,603✔
524
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,274,603✔
525
                                           win, &interp);
526
    QUERY_CHECK_CODE(code, lino, _end);
5,274,603✔
527
    if (interp) {
5,274,603✔
528
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,259,408✔
529
    }
530
  } else {
531
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
532
  }
533

534
_end:
5,274,603✔
535
  if (code != TSDB_CODE_SUCCESS) {
5,274,603✔
536
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
537
  }
538
  return code;
5,274,603✔
539
}
540

541
/**
 * For every entry k of pPrevKeys, copy the last non-null value of the column
 * described by pCols[k] from pBlock into that entry's buffer. Entries whose
 * column holds only nulls in this block keep their previous content.
 *
 * Variable-length values are copied with their total length (blob or var
 * length, depending on the key type), fixed-length values with pkey->bytes.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < numOfKeys; ++k) {
    SColumn*         pColumn = taosArrayGet(pCols, k);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pColumn->slotId);
    SGroupKeys*      pKey = taosArrayGet(pPrevKeys, k);

    // search backwards for the last row that holds a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColInfo, row)) {
      --row;
    }
    if (row < 0) {
      continue;
    }

    char*  pVal = colDataGetData(pColInfo, row);
    size_t len;
    if (IS_VAR_DATA_TYPE(pKey->type)) {
      if (IS_STR_DATA_BLOB(pKey->type)) {
        len = blobDataTLen(pVal);
      } else {
        len = varDataTLen(pVal);
      }
    } else {
      len = pKey->bytes;
    }
    memcpy(pKey->pData, pVal, len);
  }
}
573

574
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
16,923✔
575
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
576
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
16,923✔
577

578
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
16,923✔
579
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
16,923✔
580

581
  int32_t startPos = 0;
16,923✔
582
  int32_t numOfOutput = pSup->numOfExprs;
16,923✔
583

584
  SResultRow* pResult = NULL;
16,923✔
585

586
  while (1) {
×
587
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
16,923✔
588
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
16,923✔
589
    uint64_t            groupId = pOpenWin->groupId;
16,923✔
590
    SResultRowPosition* p1 = &pOpenWin->pos;
16,923✔
591
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
16,923✔
592
      break;
16,923✔
593
    }
594

595
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
596
    if (NULL == pr) {
×
597
      T_LONG_JMP(pTaskInfo->env, terrno);
×
598
    }
599

600
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
601
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
602
      T_LONG_JMP(pTaskInfo->env, terrno);
×
603
    }
604

605
    if (pr->closed) {
×
606
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
607
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
608
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
609
        T_LONG_JMP(pTaskInfo->env, terrno);
×
610
      }
611
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
612
      taosMemoryFree(pNode);
×
613
      continue;
×
614
    }
615

616
    STimeWindow w = pr->win;
×
617
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
618
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
619
    if (ret != TSDB_CODE_SUCCESS) {
×
620
      T_LONG_JMP(pTaskInfo->env, ret);
×
621
    }
622

623
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
624
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
625
      T_LONG_JMP(pTaskInfo->env, terrno);
×
626
    }
627

628
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
629
    if (!pTsKey) {
×
630
      pTaskInfo->code = terrno;
×
631
      T_LONG_JMP(pTaskInfo->env, terrno);
×
632
    }
633

634
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
635
    if (groupId == pBlock->info.id.groupId) {
×
636
      TSKEY curTs = pBlock->info.window.skey;
×
637
      if (tsCols != NULL) {
×
638
        curTs = tsCols[startPos];
×
639
      }
640
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
641
                                RESULT_ROW_END_INTERP, pSup);
642
    }
643

644
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
645
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
646

647
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
648
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
649
                                          pBlock->info.rows, numOfExprs);
×
650
    if (ret != TSDB_CODE_SUCCESS) {
×
651
      T_LONG_JMP(pTaskInfo->env, ret);
×
652
    }
653

654
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
655
      closeResultRow(pr);
×
656
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
657
      taosMemoryFree(pNode);
×
658
    } else {  // the remains are can not be closed yet.
659
      break;
×
660
    }
661
  }
662
}
16,923✔
663

664
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,743,461,521✔
665
  TSKEY*                    lTS = (TSKEY*)l;
1,743,461,521✔
666
  TSKEY*                    rTS = (TSKEY*)r;
1,743,461,521✔
667
  SIntervalAggOperatorInfo* pInfo = param;
1,743,461,521✔
668
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,743,461,521✔
669
}
670

671
/**
 * Check whether a result row for the window starting at win->skey in group
 * tableGroupId already exists in the operator's result-row hash table.
 */
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  // lookup key built from the window start key and the group id
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pFound = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pFound != NULL;
}
676

677
/**
678
 * @brief check if cur window should be filtered out by limit info
679
 * @retval true if should be filtered out
680
 * @retval false if not filtering out
681
 * @note If no limit info, we skip filtering.
682
 *       If input/output ts order mismatch, we skip filtering too.
683
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
684
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
685
 *       every tuple in every block.
686
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
687
 */
688
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
689
                                  SExecTaskInfo* pTaskInfo) {
690
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
691
  int32_t lino = 0;
2,147,483,647✔
692
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
693
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
655,446,678✔
694
      // if input/output ts order mismatch, no filter
695
  ) {
696
    return false;
2,147,483,647✔
697
  }
698

699
  if (pOperatorInfo->limit == 0) return true;
278,493,746✔
700

701
  if (pOperatorInfo->pBQ == NULL) {
278,553,606✔
702
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
334,407✔
703
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
337,602✔
704
  }
705

706
  bool shouldFilter = false;
278,196,476✔
707
  // if BQ has been full, compare it with top of BQ
708
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
278,196,476✔
709
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
77,765,175✔
710
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
77,755,590✔
711
  }
712
  if (shouldFilter) {
277,025,187✔
713
    return true;
894,352✔
714
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
276,130,835✔
715
    return false;
114,377,627✔
716
  }
717

718
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
719
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
163,708,759✔
720
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
162,635,594✔
721

722
  *((TSKEY*)node.data) = win->skey;
162,635,594✔
723

724
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
162,686,714✔
725
    taosMemoryFree(node.data);
×
726
    return true;
×
727
  }
728

729
_end:
163,393,164✔
730
  if (code != TSDB_CODE_SUCCESS) {
163,333,524✔
731
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
84,845✔
732
    pTaskInfo->code = code;
84,845✔
733
    T_LONG_JMP(pTaskInfo->env, code);
×
734
  }
735
  return false;
163,248,679✔
736
}
737

738
/**
 * @brief Count the consecutive rows, starting at startPos, whose timestamp lies inside [win->skey, win->ekey].
 * @param pDataBlockInfo block info; only its row count is read
 * @param pPrimaryColumn timestamp column of the block
 * @param win            time window, both ends inclusive
 * @param startPos       index of the first row to examine
 * @return number of rows from startPos up to (not including) the first row outside the window, or
 *         rows - startPos when no such row is found
 */
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
                                      int32_t startPos) {
  const int32_t total = pDataBlockInfo->rows;

  int32_t idx = startPos;
  while (idx < pDataBlockInfo->rows) {
    const TSKEY ts = pPrimaryColumn[idx];
    if (ts < win->skey || ts > win->ekey) {
      return idx - startPos;
    }
    ++idx;
  }
  return total - startPos;
}
750

751
/**
 * @brief Aggregate one input block into the interval windows it touches.
 *
 * The first window is derived from the start timestamp of the block. The function then advances window by window
 * until the block is exhausted or filterWindowWithLimit() rejects a window. For every accepted window the result
 * row is located with setTimeWindowOutputBuf(), the aggregate functions are applied to the rows that fall into the
 * window, and doCloseWindow() is called.
 *
 * @param pOperatorInfo  interval operator; its info is a SIntervalAggOperatorInfo
 * @param pResultRowInfo result row bookkeeping, including the open window list
 * @param pBlock         input block; for unsorted input info.rows is changed temporarily and restored
 * @param scanFlag       scan flag of the block, compared against MAIN_SCAN
 * @retval true  the group of this block exceeds the slimit (doOpenIntervalAgg stops reading input on true)
 * @retval false otherwise
 * @note Failures are reported with T_LONG_JMP() on pTaskInfo->env; the function does not return in that case.
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
  bool                      sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  SResultRow* pResult = NULL;
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];

  // a new group starts: enforce slimit and drop the bounded queue of the previous group
  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // never jump with a success code: a missing result row is an internal error
    // NOTE(review): assumes T_LONG_JMP wraps longjmp(), for which a value of 0 is turned into 1 by setjmp()
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
                                                          NULL, pInfo->binfo.inputTsOrder)
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }
  // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
  //   win.skey, win.ekey, startPos, forwardRows);
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  STimeWindow nextWin = win;
  int32_t     rows = pBlock->info.rows;

  while (startPos < pBlock->info.rows) {
    if (sorted) {
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
                                        pInfo->binfo.inputTsOrder);
      if (startPos < 0) {
        break;
      }
    } else {
      // search the next window only inside the rows of the current window: the block is presented to
      // getNextQualifiedWindow() as the sub-range [startPos, startPos + forwardRows) and restored afterwards
      pBlock->info.rows = forwardRows;
      int32_t newStartOff = forwardRows >= 1
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
                                : -1;
      pBlock->info.rows = rows;
      if (newStartOff >= 0) {
        startPos += newStartOff;
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
        // no window found in the sub-range: restart from the first row after it
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
      }
      if (startPos >= pBlock->info.rows) {
        break;
      }
    }

    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                    pInfo->binfo.inputTsOrder)
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
    if (forwardRows == 0) continue;

    // null data, failed to allocate more memory buffer
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      // same rule as for the first window: do not jump with a success code
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
#if 0
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
        (!ascScan && ekey >= pBlock->info.window.skey)) {
      // window start(end) key interpolation
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    } else if (pInfo->timeWindowInterpo) {
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
    }
#endif
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  // keep the last row of the block for interpolation against the next block
  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
895

896
/**
 * @brief Close a result row once its end border has been interpolated.
 *
 * Nothing happens unless time window interpolation is enabled and the row carries the RESULT_ROW_END_INTERP mark.
 * Otherwise the row is closed and the head node of the open window list is popped and freed.
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);
  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
2,147,483,647✔
904

905
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
16,923✔
906
                                       SExecTaskInfo* pTaskInfo) {
907
  int32_t         code = TSDB_CODE_SUCCESS;
16,923✔
908
  int32_t         lino = 0;
16,923✔
909
  SOpenWindowInfo openWin = {0};
16,923✔
910
  openWin.pos.pageId = pResult->pageId;
16,923✔
911
  openWin.pos.offset = pResult->offset;
16,923✔
912
  openWin.groupId = groupId;
16,923✔
913
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
16,923✔
914
  if (pn == NULL) {
16,923✔
915
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
16,923✔
916
    QUERY_CHECK_CODE(code, lino, _end);
16,923✔
917
    return openWin.pos;
16,923✔
918
  }
919

920
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
921
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
922
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
923
    QUERY_CHECK_CODE(code, lino, _end);
×
924
  }
925

926
_end:
×
927
  if (code != TSDB_CODE_SUCCESS) {
×
928
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
929
    pTaskInfo->code = code;
×
930
    T_LONG_JMP(pTaskInfo->env, code);
×
931
  }
932
  return openWin.pos;
×
933
}
934

935
/**
 * @brief Return the primary timestamp column of a block and make sure the block time window is set.
 *
 * @param pBlock    input block
 * @param pInfo     interval operator info; primaryTsIndex selects the timestamp column
 * @param pTaskInfo task info; receives the error code before T_LONG_JMP() on failure
 * @return pointer to the timestamp data, or NULL when the block carries no loaded data
 * @note When the first timestamp is non-zero and the block window is still {0, 0}, the window is recomputed with
 *       blockDataUpdateTsWindow(). A first timestamp of 0 only produces a warning.
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  TSKEY* tsCols = NULL;

  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
    if (!pColDataInfo) {
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    tsCols = (int64_t*)pColDataInfo->pData;
    // nothing to inspect without data: tsCols[0] and tsCols[rows - 1] below would be out of bounds
    if (tsCols == NULL || pBlock->info.rows <= 0) {
      return tsCols;
    }

    if (tsCols[0] == 0) {
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
            tsCols[pBlock->info.rows - 1]);
    }

    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        pTaskInfo->code = code;
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }
  }

  return tsCols;
}
963

964
/**
 * @brief Open the interval operator: consume all downstream blocks and aggregate them into time windows.
 *
 * Returns immediately when the operator is already opened. Otherwise every block is (optionally) run through the
 * scalar expressions, bound as aggregate input and handed to hashIntervalAgg(); reading stops when the downstream
 * is exhausted or hashIntervalAgg() returns true. Afterwards the grouped result info is initialised, the operator
 * is marked as opened and the open cost is recorded.
 *
 * @return TSDB_CODE_SUCCESS; on failure the code is stored in pTaskInfo->code and T_LONG_JMP() is taken
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;

    // scalar expressions are evaluated in place on the block before the aggregation
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  // timestamps are in microseconds; the cost is stored divided by 1000
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1018

1019
// start a new state window and record the start info
1020
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
2,147,483,647✔
1021
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1022
  pRowSup->groupId = groupId;
2,147,483,647✔
1023
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
2,147,483,647✔
1024
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
1,619,535,579✔
1025
    pRowSup->win.skey = tsList[rowIndex];
1,759,962,497✔
1026
    pRowSup->startRowIndex = rowIndex;
1,759,962,093✔
1027
    pRowSup->numOfRows = 0;  // does not include the current row yet
1,759,962,497✔
1028
  } else {
1029
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1030
      rowIndex - pRowSup->numNullRows : rowIndex;
1,073,824,746✔
1031
    pRowSup->win.skey = hasPrevWin ?
1,073,824,746✔
1032
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
1,073,824,746✔
1033
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
1,073,824,746✔
1034
  }
1035
  resetNumNullRows(pRowSup);
2,147,483,647✔
1036
}
2,147,483,647✔
1037

1038
// close a state window and record its end info
1039
// this functions is called when a new state row appears
1040
// @param rowIndex the index of the first row of next window
1041
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
2,147,483,647✔
1042
                                 int32_t rowIndex,
1043
                                 const EStateWinExtendOption* extendOption,
1044
                                 bool hasNextWin) {
1045
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1046
      pRowSup->win.ekey = hasNextWin?
545,955,866✔
1047
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
545,955,866✔
1048
      // continuous rows having null state col should be included in this window
1049
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
1,091,911,732✔
1050
        pRowSup->numNullRows : 0;
545,955,866✔
1051
      resetNumNullRows(pRowSup);
545,955,866✔
1052
  }
1053
}
2,147,483,647✔
1054

1055
// Account for one more row whose state column is null and remember its timestamp in prevTs.
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
  pRowSup->prevTs = nullRowTs;
  ++pRowSup->numNullRows;
}
804,658,598✔
1059

1060
/**
1061
  @brief Process the closed state window and do aggregation on the tuples
1062
  within the window. Partial results are stored in the output buffer. If window
1063
  has no valid rows, return success.
1064
*/
1065
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
2,147,483,647✔
1066
                                        SWindowRowsSup* pRowSup,
1067
                                        SExecTaskInfo* pTaskInfo,
1068
                                        SExprSupp* pSup,
1069
                                        int32_t numOfOutput) {
1070
  if (pRowSup->numOfRows == 0) {
2,147,483,647✔
1071
    // no valid rows in the window
1072
    return TSDB_CODE_SUCCESS;
15,559,420✔
1073
  }
1074
  int32_t     code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1075
  int32_t     lino = 0;
2,147,483,647✔
1076
  SResultRow* pResult = NULL;
2,147,483,647✔
1077
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
2,147,483,647✔
1078
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
2,147,483,647✔
1079
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1080
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1081

1082
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
2,147,483,647✔
1083
  pResult->nOrigRows += pRowSup->numOfRows;
2,147,483,647✔
1084
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
2,147,483,647✔
1085
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1086
    pRowSup->numOfRows, 0, numOfOutput);
1087
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1088

1089
_return:
2,147,483,647✔
1090
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
1091
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
434✔
1092
  }
1093
  return code;
2,147,483,647✔
1094
}
1095

1096
// process a data block for state window aggregation
1097
// scan from startIndex to endIndex
1098
// numPartialCalcRows returns the number of rows that have been
1099
// partially calculated within the block
1100
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
23,414,098✔
1101
                                 SStateWindowOperatorInfo* pInfo,
1102
                                 SSDataBlock* pBlock, int32_t* startIndex,
1103
                                 int32_t* endIndex,
1104
                                 int32_t* numPartialCalcRows) {
1105
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
23,414,098✔
1106
  SExprSupp*     pExprSup = &pOperator->exprSupp;
23,414,098✔
1107

1108
  SColumnInfoData* pStateColInfoData = 
1109
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
23,414,098✔
1110
  if (!pStateColInfoData) {
23,413,592✔
1111
    pTaskInfo->code = terrno;
×
1112
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1113
  }
1114
  uint64_t gid = pBlock->info.id.groupId;
23,413,592✔
1115
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
23,414,098✔
1116
  int32_t bytes = pStateColInfoData->info.bytes;
23,414,098✔
1117

1118
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
23,414,098✔
1119
                                               pInfo->tsSlotId);
23,414,098✔
1120
  if (NULL == pColInfoData) {
23,413,592✔
1121
    pTaskInfo->code = terrno;
×
1122
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1123
  }
1124
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
23,413,592✔
1125

1126
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
23,413,592✔
1127
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
23,413,592✔
1128
                                NULL;
1129
  EStateWinExtendOption  extendOption = pInfo->extendOption;
23,413,592✔
1130
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
23,413,592✔
1131

1132
  if (pRowSup->groupId != gid) {
23,414,098✔
1133
    /*
1134
      group changed, process the previous group's unclosed state window first
1135
    */
1136
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
10,235,870✔
1137
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
10,235,870✔
1138
                                            pExprSup, numOfOutput);
1139
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
10,235,870✔
1140
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
10,235,870✔
1141

1142
    /*
1143
      unhandled null rows should be ignored, since they belong to previous group
1144
    */
1145
    *numPartialCalcRows += pRowSup->numNullRows;
10,236,376✔
1146

1147
    /*
1148
      reset state window info for new group
1149
    */
1150
    pInfo->hasKey = false;
10,235,870✔
1151
    resetWindowRowsSup(pRowSup);
10,235,870✔
1152
  }
1153

1154
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
2,147,483,647✔
1155
    if (pBlock->info.scanFlag != PRE_SCAN) {
2,147,483,647✔
1156
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
2,147,483,647✔
1157
        pInfo->winSup.lastTs = tsList[j];
355,240,380✔
1158
      } else {
1159
        if (tsList[j] == pInfo->winSup.lastTs) {
2,147,483,647✔
1160
          // forbid duplicated ts rows
1161
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
14,546✔
1162
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
14,546✔
1163
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
14,546✔
1164
        } else {
1165
          pInfo->winSup.lastTs = tsList[j];
2,147,483,647✔
1166
        }
1167
      }
1168
    }
1169
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
2,147,483,647✔
1170
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
804,658,598✔
1171
      continue;
804,658,598✔
1172
    }
1173
    if (pStateColInfoData->pData == NULL) {
2,147,483,647✔
1174
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1175
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1176
    }
1177
    char* val = colDataGetData(pStateColInfoData, j);
2,147,483,647✔
1178

1179
    if (!pInfo->hasKey) {
2,147,483,647✔
1180
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
5,423,631✔
1181
      pInfo->hasKey = true;
5,423,125✔
1182
      doKeepNewStateWindowStartInfo(
5,423,125✔
1183
        pRowSup, tsList, j, gid, &extendOption, false);
1184
      doKeepTuple(pRowSup, tsList[j], j, gid);
5,423,631✔
1185
    } else if (!compareVal(val, &pInfo->stateKey)) {
2,147,483,647✔
1186
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
2,147,483,647✔
1187
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
2,147,483,647✔
1188
                                              pExprSup, numOfOutput);
1189
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1190
        T_LONG_JMP(pTaskInfo->env, code);
×
1191
      }
1192
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
2,147,483,647✔
1193

1194
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
2,147,483,647✔
1195
                                    &extendOption, true);
1196
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1197
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
2,147,483,647✔
1198
    } else {
1199
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1200
    }
1201
  }
1202

1203
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
23,397,732✔
1204
    /*
1205
      No valid state rows within the block and we don't care about
1206
      null rows before valid state window, mark them as processed and drop them
1207
    */
1208
    *numPartialCalcRows = pBlock->info.rows;
3,209,750✔
1209
    resetNumNullRows(pRowSup);
3,209,750✔
1210
    return;
3,209,750✔
1211
  }
1212
  if (pRowSup->numOfRows == 0 && 
20,189,802✔
1213
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
3,923,034✔
1214
    /*
1215
      If no valid state window or we don't know the belonging of
1216
      null rows in the end of the block, handle them with next block
1217
    */
1218
    return;
3,917,223✔
1219
  }
1220
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
16,272,579✔
1221
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
16,272,579✔
1222
                                          pExprSup, numOfOutput);
1223
  if (TSDB_CODE_SUCCESS != code) {
16,272,579✔
1224
    pTaskInfo->code = code;
434✔
1225
    T_LONG_JMP(pTaskInfo->env, code);
434✔
1226
  }
1227
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
16,272,145✔
1228
  // reset part of pRowSup after doing agg calculation
1229
  pRowSup->startRowIndex = 0;
16,272,145✔
1230
  pRowSup->numOfRows = 0;
16,272,145✔
1231
}
1232

1233
/**
 * @brief Open the state window operator: consume all downstream blocks and aggregate them into state windows.
 *
 * Rows that doStateWindowAggImpl() could not finish (numPartialCalcRows < rows of the block) are kept in
 * pUnfinishedBlock and merged with the next downstream block. pUnfinishedBlock is either a plain reference to the
 * current downstream block (isRef) or a copy created by createOneDataBlock() that is destroyed here.
 *
 * @return TSDB_CODE_SUCCESS; on failure the code is stored in pTaskInfo->code and T_LONG_JMP() is taken
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;

  SSDataBlock* pUnfinishedBlock = NULL;
  int32_t      startIndex = 0;
  int32_t      endIndex = 0;
  int32_t      numPartialCalcRows = 0;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      // input exhausted: drop the rows that are still pending
      if (pUnfinishedBlock != NULL) {
        blockDataDestroy(pUnfinishedBlock);
        pUnfinishedBlock = NULL;
        resetWindowRowsSup(&pInfo->winSup);
      }
      break;
    }
    
    // mark whether pUnfinishedBlock is a reference to pBlock
    bool isRef = false;
    startIndex = 0;
    if (pUnfinishedBlock != NULL) {
      // the pending rows come first, scanning resumes right after them
      startIndex = pUnfinishedBlock->info.rows;
      // merge unfinished block with current block
      code = blockDataMerge(pUnfinishedBlock, pBlock);
      // reset id to current block id
      pUnfinishedBlock->info.id = pBlock->info.id;
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pUnfinishedBlock = pBlock;
      isRef = true;
    }
    endIndex = pUnfinishedBlock->info.rows;

    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
    code = setInputDataBlock(
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that 
    // needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
        pInfo->scalarSup.numOfExprs, NULL,
        GET_STM_RTINFO(pOperator->pTaskInfo));
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, 
      &startIndex, &endIndex, &numPartialCalcRows);
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
      // save unfinished block for next round processing
      if (isRef) {
        // the downstream block is not owned here, keep a copy of it
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
      // check the trim result itself, otherwise a failure is overwritten in the next round
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      if (!isRef) {
        blockDataDestroy(pUnfinishedBlock);
      }
      pUnfinishedBlock = NULL;
    }
    numPartialCalcRows = 0;
  }

  // timestamps are in microseconds; the cost is stored divided by 1000
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(
    &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1335

1336
// Fetch the next result block of the state-window operator.
//
// The operator is opened through its _openFn callback, the result block is
// sized to the operator's result capacity, and result blocks are then built and
// filtered until one carries rows or no grouped results remain (in which case
// the operator is marked completed).
//
// *ppRes receives the result block, or NULL when that block has no rows.
// On failure the error is logged, stored in pTaskInfo->code and raised through
// T_LONG_JMP.
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*           pBasic = &pInfo->binfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, pBasic, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // nothing left to emit after this block: finish the operator
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // the filter may have removed every row; keep building until something survives
    if (pBasic->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBasic->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  *ppRes = (pBasic->pRes->info.rows != 0) ? pBasic->pRes : NULL;
  return code;
}
1381

1382
// Fetch the next result block of the interval aggregation operator.
//
// After opening the operator through its _openFn callback, result blocks are
// built and filtered until one carries rows or the grouped results are
// exhausted (the operator is then marked completed).
//
// *ppRes receives the result block, or NULL when it holds no rows.
// On failure the error is logged, stored in pTaskInfo->code and raised through
// T_LONG_JMP.
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  size_t                    rows = 0;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*              pResBlock = pInfo->binfo.pRes;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // no more grouped results: this is the last block the operator produces
    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // skip blocks that the filter emptied completely
    if (pResBlock->info.rows > 0) {
      break;
    }
  }

  rows = pResBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  *ppRes = (rows != 0) ? pResBlock : NULL;
  return code;
}
1425

1426
static void destroyStateWindowOperatorInfo(void* param) {
887,828✔
1427
  if (param == NULL) {
887,828✔
1428
    return;
×
1429
  }
1430
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
887,828✔
1431
  cleanupBasicInfo(&pInfo->binfo);
887,828✔
1432
  taosMemoryFreeClear(pInfo->stateKey.pData);
887,828✔
1433
  if (pInfo->pOperator) {
887,828✔
1434
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
887,828✔
1435
                      pInfo->cleanGroupResInfo);
887,828✔
1436
    pInfo->pOperator = NULL;
887,828✔
1437
  }
1438

1439
  cleanupExprSupp(&pInfo->scalarSup);
887,828✔
1440
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
887,828✔
1441
  cleanupAggSup(&pInfo->aggSup);
887,828✔
1442
  cleanupGroupResInfo(&pInfo->groupResInfo);
887,828✔
1443

1444
  taosMemoryFreeClear(param);
887,445✔
1445
}
1446

1447
static void freeItem(void* param) {
31,770✔
1448
  SGroupKeys* pKey = (SGroupKeys*)param;
31,770✔
1449
  taosMemoryFree(pKey->pData);
31,770✔
1450
}
31,770✔
1451

1452
void destroyIntervalOperatorInfo(void* param) {
4,703,229✔
1453
  if (param == NULL) {
4,703,229✔
1454
    return;
×
1455
  }
1456

1457
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
4,703,229✔
1458

1459
  cleanupBasicInfo(&pInfo->binfo);
4,703,229✔
1460
  if (pInfo->pOperator) {
4,704,103✔
1461
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
4,669,400✔
1462
                      pInfo->cleanGroupResInfo);
4,673,068✔
1463
    pInfo->pOperator = NULL;
4,662,988✔
1464
  }
1465

1466
  cleanupAggSup(&pInfo->aggSup);
4,696,534✔
1467
  cleanupExprSupp(&pInfo->scalarSupp);
4,697,619✔
1468

1469
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
4,705,578✔
1470

1471
  taosArrayDestroy(pInfo->pInterpCols);
4,702,446✔
1472
  pInfo->pInterpCols = NULL;
4,702,240✔
1473

1474
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
4,698,493✔
1475
  pInfo->pPrevValues = NULL;
4,697,206✔
1476

1477
  cleanupGroupResInfo(&pInfo->groupResInfo);
4,697,070✔
1478
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
4,696,129✔
1479
  destroyBoundedQueue(pInfo->pBQ);
4,703,880✔
1480
  taosMemoryFreeClear(param);
4,695,445✔
1481
}
1482

1483
// Create the interpolation bookkeeping arrays of an interval operator and seed
// them with the primary timestamp column.
//
// pInfo->pInterpCols receives one SColumn describing the timestamp column
// (slot pInfo->primaryTsIndex); pInfo->pPrevValues receives the matching
// SGroupKeys entry with a zeroed value buffer and isNull == true, meaning no
// previous value has been recorded yet.
//
// Returns TSDB_CODE_SUCCESS, or the terrno value of the failing allocation/push.
// On failure the arrays created so far stay attached to pInfo; the value buffer
// that could not be handed over to pPrevValues is released here.
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  void*      tmp = NULL;
  SColumn    c = {0};
  SGroupKeys key = {0};  // zero-initialised so no field is left indeterminate

  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);

  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);

  // ts column
  c.colId = 1;
  c.slotId = pInfo->primaryTsIndex;
  c.type = TSDB_DATA_TYPE_TIMESTAMP;
  c.bytes = sizeof(int64_t);
  tmp = taosArrayPush(pInfo->pInterpCols, &c);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  key.bytes = c.bytes;
  key.type = c.type;
  key.isNull = true;  // to denote no value is assigned yet
  key.pData = taosMemoryCalloc(1, c.bytes);
  QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

  tmp = taosArrayPush(pInfo->pPrevValues, &key);
  if (tmp == NULL) {
    // the array did not take the entry, so the buffer is still ours to free
    code = terrno;
    lino = __LINE__;
    taosMemoryFree(key.pData);
    goto _end;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1519

1520
// Decide whether any of the numOfCols function contexts in pCtx is an interval
// interpolation function and, if so, set up the interpolation bookkeeping in pInfo.
//
// When at least one such function exists, initWindowInterpPrevVal() creates the
// arrays and adds the timestamp column; then, for every interpolation function,
// the column of its first parameter is appended to pInfo->pInterpCols together
// with a zeroed SGroupKeys entry (isNull == false) in pInfo->pPrevValues.
//
// pRes: always written; true when interpolation is needed, even if set-up failed.
// Returns TSDB_CODE_SUCCESS or the error of the failing step.
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
                                      bool* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   tmp = NULL;

  // index of the first interpolation function; numOfCols when there is none
  int32_t first = 0;
  while (first < numOfCols && !fmIsIntervalInterpoFunc(pCtx[first].functionId)) {
    ++first;
  }
  bool needed = (first < numOfCols);

  if (needed) {
    code = initWindowInterpPrevVal(pInfo);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // entries before `first` were already found not to match, so start there
  for (int32_t i = first; i < numOfCols; ++i) {
    if (!fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      continue;
    }

    SExprInfo*   pExpr = pCtx[i].pExpr;
    SFunctParam* pParam = &pExpr->base.pParam[0];

    SColumn c = *pParam->pCol;
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    SGroupKeys key = {0};
    key.bytes = c.bytes;
    key.type = c.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, c.bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    tmp = taosArrayPush(pInfo->pPrevValues, &key);
    if (tmp == NULL) {
      // the array did not take the entry, so the buffer is still ours to free
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  *pRes = needed;
  return code;
}
1570

1571
// Return an interval operator to its not-yet-opened state so it can run again.
//
// Result rows, the time-window column, the aggregate/scalar expression supports,
// the open-window list, the interpolation arrays, the grouped result info and
// the bounded queue are all cleared or rebuilt from the operator's physical
// plan node (pOper->pPhyNode).
//
// Returns 0 on success, otherwise the code of the first step that failed.
// Later clean-up steps are still executed after a failure.
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo) {
  SExecTaskInfo*      pTaskInfo = pOper->pTaskInfo;
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pIntervalInfo->binfo);
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp,
                    &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup, pIntervalInfo->cleanGroupResInfo);

  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                       &pTaskInfo->storageAPI.functionStore);
  }
  if (code == 0) {
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL) {
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
  }

  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
    pIntervalInfo->curGroupId = UINT64_MAX;
  }

  pIntervalInfo->cleanGroupResInfo = false;
  pIntervalInfo->handledGroupNum = 0;
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;

  taosArrayDestroy(pIntervalInfo->pInterpCols);
  pIntervalInfo->pInterpCols = NULL;

  if (pIntervalInfo->pPrevValues != NULL) {
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
    pIntervalInfo->pPrevValues = NULL;

    // Rebuild the interpolation arrays, but never let a successful rebuild hide
    // an error reported by one of the earlier reset steps.
    // NOTE(review): initWindowInterpPrevVal() only re-adds the timestamp column;
    // the per-function columns that timeWindowinterpNeeded() appends at creation
    // time are not restored here - confirm whether that is intended.
    int32_t interpCode = initWindowInterpPrevVal(pIntervalInfo);
    if (code == 0) {
      code = interpCode;
    }
  }

  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
  destroyBoundedQueue(pIntervalInfo->pBQ);
  pIntervalInfo->pBQ = NULL;
  return code;
}
1619

1620
// Reset-state callback of the interval operator: forwards to resetInterval()
// with the operator's own SIntervalAggOperatorInfo.
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
  return resetInterval(pOper, (SIntervalAggOperatorInfo*)pOper->info);
}
1624

1625
// Build the hash-interval aggregation operator ("TimeIntervalAggOperator").
//
// downstream: the single input operator; attached to the new operator on success.
// pPhyNode:   physical plan node supplying the output block description, aggregate
//             functions, optional scalar expressions, filter conditions, interval
//             parameters and limit/slimit settings.
// pTaskInfo:  owning task; its code field is set to the returned code on failure.
// pOptrInfo:  receives the new operator on success.
//
// Returns TSDB_CODE_SUCCESS, or an error code after the partially built operator
// info, the operator and the downstream have been handed to the destroy routines.
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;
  pSup->hasWindow = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT: the operator has to produce limit + offset rows before it may stop
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
  }
  // SLIMIT: same computation for the slimit node; also resets the current group id
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // A failed list allocation must leave with a real error code; otherwise the
    // error path would tear everything down and still report success.
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  // NOTE(review): when appendDownstream fails, pOperator has already been given
  // pInfo by setOperatorInfo - confirm destroyOperatorAndDownstreams does not
  // release pInfo a second time after the explicit destroy below.
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1753

1754
// todo handle multiple timeline cases. assume no timeline interweaving
1755
// Split the rows of one input block into session windows and feed each window
// to the aggregate functions.
//
// A row stays in the current window when it belongs to the same group and its
// timestamp is within pInfo->gap of pRowSup->prevTs in either direction.
// Otherwise the rows collected so far are aggregated into the result row of the
// closed window and a new window starts at that row. The rows still pending at
// the end of the block are aggregated into the window ending at the block's
// last timestamp.
//
// Any failure is raised through T_LONG_JMP on pTaskInfo->env.
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;

  int64_t gap = pInfo->gap;

  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // An empty block carries no rows to aggregate, and the final flush below would
  // index tsList[-1]; leave before touching the timestamp column.
  if (pBlock->info.rows <= 0) {
    return;
  }

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      // first row of a new group, or the very first row seen: open a window here
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], j, gid);
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
      doKeepTuple(pRowSup, tsList[j], j, gid);
    } else {
      // start a new session window
      if (pRowSup->numOfRows > 0) {  // handle data that belongs to the previous session window
        SResultRow* pResult = NULL;

        // keep the time window for the closed time window.
        STimeWindow window = pRowSup->win;

        int32_t ret =
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
          T_LONG_JMP(pTaskInfo->env, ret);
        }

        // pRowSup->numOfRows rows, starting at pRowSup->startRowIndex, belong to the closed window
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
        ret =
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
        if (ret != TSDB_CODE_SUCCESS) {
          T_LONG_JMP(pTaskInfo->env, ret);
        }
      }

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], j, gid);
    }
  }

  // flush the rows of the window that is still open at the end of the block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
6,934,469✔
1836

1837
// Fetch the next result block of the session-window operator.
//
// On the first call (status is not yet OP_RES_TO_RETURN) every downstream block
// is consumed: optional scalar expressions are applied, the block is bound as
// aggregate input, its timestamp window is refreshed and its rows are aggregated
// by doSessionWindowAggImpl(). The grouped results are then prepared for output.
// On every call result blocks are built and filtered until one carries rows or
// no grouped results remain (the operator is then marked completed).
//
// *ppRes receives the result block, or NULL when it holds no rows.
// On failure the error is logged, stored in pTaskInfo->code and raised through
// T_LONG_JMP.
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SSessionAggOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*          pBasic = &pInfo->binfo;
  SExprSupp*               pAggSup = &pOperator->exprSupp;

  if (pOperator->status != OP_RES_TO_RETURN) {
    // aggregation phase: drain the downstream operator completely
    int64_t startUs = taosGetTimestampUs();
    int32_t tsOrder = pInfo->binfo.inputTsOrder;

    pInfo->cleanGroupResInfo = false;
    for (;;) {
      SSDataBlock* pInput = getNextBlockFromDownstream(pOperator, 0);
      if (pInput == NULL) {
        break;
      }

      pBasic->pRes->info.scanFlag = pInput->info.scanFlag;
      if (pInfo->scalarSupp.pExprInfo != NULL) {
        SExprSupp* pScalar = &pInfo->scalarSupp;
        code = projectApplyFunctions(pScalar->pExprInfo, pInput, pInput, pScalar->pCtx, pScalar->numOfExprs, NULL,
                                     GET_STM_RTINFO(pOperator->pTaskInfo));
        QUERY_CHECK_CODE(code, lino, _end);
      }

      // the pDataBlock are always the same one, no need to call this again
      code = setInputDataBlock(pAggSup, pInput, tsOrder, pInput->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      code = blockDataUpdateTsWindow(pInput, pInfo->tsSlotId);
      QUERY_CHECK_CODE(code, lino, _end);

      doSessionWindowAggImpl(pOperator, pInfo, pInput);
    }

    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;

    // from now on every call only returns already computed results
    pOperator->status = OP_RES_TO_RETURN;

    code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->cleanGroupResInfo = true;

    code = blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // output phase, shared by the first and all later calls
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // skip blocks that the filter emptied completely
    if (pBasic->pRes->info.rows > 0) {
      break;
    }
  }
  pOperator->resultInfo.totalRows += pBasic->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  *ppRes = (pBasic->pRes->info.rows != 0) ? pBasic->pRes : NULL;
  return code;
}
1937

1938
// Reset-state callback of the state-window operator: puts the operator back into
// the not-yet-opened state.
//
// Result rows, the time-window column and the aggregate/scalar expression
// supports are cleared or rebuilt from the physical plan node; the remembered
// state key is zeroed and marked as absent.
//
// Returns 0 on success, otherwise the code of the first rebuild step that failed.
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
  SStateWindowOperatorInfo* pStateInfo = pOper->info;
  SExecTaskInfo*            pTask = pOper->pTaskInfo;
  SStateWindowPhysiNode*    pNode = (SStateWindowPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pStateInfo->binfo);
  cleanupResultInfo(pStateInfo->pOperator->pTaskInfo, &pStateInfo->pOperator->exprSupp, &pStateInfo->groupResInfo,
                    &pStateInfo->aggSup, pStateInfo->cleanGroupResInfo);

  colDataDestroy(&pStateInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pStateInfo->twAggSup.timeWindowData, &pTask->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pStateInfo->aggSup, pTask, pNode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                       &pTask->storageAPI.functionStore);
    // the scalar expressions are only rebuilt when the aggregate support was reset successfully
    if (code == 0) {
      code = resetExprSupp(&pStateInfo->scalarSup, pTask, pNode->window.pExprs, NULL,
                           &pTask->storageAPI.functionStore);
    }
  }

  // the following steps run regardless of the outcome above
  pStateInfo->cleanGroupResInfo = false;
  pStateInfo->hasKey = false;
  pStateInfo->winSup.lastTs = INT64_MIN;
  cleanupGroupResInfo(&pStateInfo->groupResInfo);
  memset(pStateInfo->stateKey.pData, 0, pStateInfo->stateKey.bytes);
  return code;
}
1967

1968
// todo make this as an non-blocking operator
1969
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
887,828✔
1970
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1971
  QRY_PARAM_CHECK(pOptrInfo);
887,828✔
1972

1973
  int32_t                   code = TSDB_CODE_SUCCESS;
887,828✔
1974
  int32_t                   lino = 0;
887,828✔
1975
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
887,828✔
1976
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
887,828✔
1977
  if (pInfo == NULL || pOperator == NULL) {
887,424✔
1978
    code = terrno;
×
1979
    goto _error;
×
1980
  }
1981

1982
  pOperator->pPhyNode = pStateNode;
887,828✔
1983
  pOperator->exprSupp.hasWindowOrGroup = true;
887,828✔
1984
  pOperator->exprSupp.hasWindow = true;
887,424✔
1985
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
887,828✔
1986
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
887,828✔
1987

1988
  if (pStateNode->window.pExprs != NULL) {
887,828✔
1989
    int32_t    numOfScalarExpr = 0;
250,906✔
1990
    SExprInfo* pScalarExprInfo = NULL;
250,906✔
1991
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
250,906✔
1992
    QUERY_CHECK_CODE(code, lino, _error);
250,400✔
1993

1994
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
250,400✔
1995
    if (code != TSDB_CODE_SUCCESS) {
250,906✔
1996
      goto _error;
×
1997
    }
1998
  }
1999

2000
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
887,828✔
2001
  pInfo->stateKey.type = pInfo->stateCol.type;
886,918✔
2002
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
887,322✔
2003
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
887,322✔
2004
  if (pInfo->stateKey.pData == NULL) {
887,322✔
2005
    goto _error;
×
2006
  }
2007
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
887,322✔
2008
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
887,424✔
2009

2010
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
886,918✔
2011
                            pTaskInfo->pStreamRuntimeInfo);
887,322✔
2012
  if (code != TSDB_CODE_SUCCESS) {
887,322✔
2013
    goto _error;
×
2014
  }
2015

2016
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
887,322✔
2017

2018
  int32_t    num = 0;
887,322✔
2019
  SExprInfo* pExprInfo = NULL;
887,322✔
2020
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
887,828✔
2021
  QUERY_CHECK_CODE(code, lino, _error);
887,828✔
2022

2023
  initResultSizeInfo(&pOperator->resultInfo, 4096);
887,828✔
2024

2025
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
1,775,656✔
2026
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
887,828✔
2027
  if (code != TSDB_CODE_SUCCESS) {
886,661✔
2028
    goto _error;
×
2029
  }
2030

2031
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
886,661✔
2032
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
887,828✔
2033
  initBasicInfo(&pInfo->binfo, pResBlock);
887,828✔
2034
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
887,424✔
2035

2036
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
887,424✔
2037
  QUERY_CHECK_CODE(code, lino, _error);
887,828✔
2038

2039
  pInfo->tsSlotId = tsSlotId;
887,828✔
2040
  pInfo->pOperator = pOperator;
886,918✔
2041
  pInfo->cleanGroupResInfo = false;
887,322✔
2042
  pInfo->extendOption = pStateNode->extendOption;
887,322✔
2043
  pInfo->trueForInfo.trueForType = pStateNode->trueForType;
887,828✔
2044
  pInfo->trueForInfo.count = pStateNode->trueForCount;
887,322✔
2045
  pInfo->trueForInfo.duration = pStateNode->trueForDuration;
887,322✔
2046
  pInfo->winSup.lastTs = INT64_MIN;
887,041✔
2047

2048
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
886,942✔
2049
                  pTaskInfo);
2050
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
887,445✔
2051
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2052
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
886,939✔
2053

2054
  code = appendDownstream(pOperator, &downstream, 1);
886,939✔
2055
  if (code != TSDB_CODE_SUCCESS) {
887,828✔
2056
    goto _error;
×
2057
  }
2058

2059
  *pOptrInfo = pOperator;
887,828✔
2060
  return TSDB_CODE_SUCCESS;
887,828✔
2061

2062
_error:
×
2063
  if (pInfo != NULL) {
×
2064
    destroyStateWindowOperatorInfo(pInfo);
×
2065
  }
2066

2067
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2068
  pTaskInfo->code = code;
×
2069
  return code;
×
2070
}
2071

2072
void destroySWindowOperatorInfo(void* param) {
452,365✔
2073
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
452,365✔
2074
  if (pInfo == NULL) {
452,365✔
2075
    return;
×
2076
  }
2077

2078
  cleanupBasicInfo(&pInfo->binfo);
452,365✔
2079
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
452,365✔
2080
  if (pInfo->pOperator) {
452,748✔
2081
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
452,748✔
2082
                      pInfo->cleanGroupResInfo);
452,748✔
2083
    pInfo->pOperator = NULL;
452,365✔
2084
  }
2085

2086
  cleanupAggSup(&pInfo->aggSup);
451,982✔
2087
  cleanupExprSupp(&pInfo->scalarSupp);
452,365✔
2088

2089
  cleanupGroupResInfo(&pInfo->groupResInfo);
452,748✔
2090
  taosMemoryFreeClear(param);
451,982✔
2091
}
2092

2093
/*
 * Reset callback of the session-window operator.
 *
 * Marks the operator as not opened, discards the accumulated result rows, rebuilds the
 * time-window column and the aggregate/scalar expression supports from the physical
 * plan node, and clears the per-run window bookkeeping.
 *
 * Returns 0 on success, otherwise the code of the first re-initialisation step that failed.
 */
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
  SSessionAggOperatorInfo* pSession = pOper->info;
  SExecTaskInfo*           pTask = pOper->pTaskInfo;
  SSessionWinodwPhysiNode* pNode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pSession->binfo);
  // uses the current cleanGroupResInfo flag, so it has to run before the flag is cleared
  cleanupResultInfo(pSession->pOperator->pTaskInfo, &pSession->pOperator->exprSupp, &pSession->groupResInfo,
                    &pSession->aggSup, pSession->cleanGroupResInfo);

  colDataDestroy(&pSession->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pSession->twAggSup.timeWindowData, &pTask->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pSession->aggSup, pTask, pNode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                       &pTask->storageAPI.functionStore);
    if (code == 0) {
      code = resetExprSupp(&pSession->scalarSupp, pTask, pNode->window.pExprs, NULL,
                           &pTask->storageAPI.functionStore);
    }
  }

  pSession->cleanGroupResInfo = false;
  pSession->reptScan = false;
  // zero every window-tracking field, with prevTs starting at "no previous row"
  pSession->winSup = (SWindowRowsSup){.prevTs = INT64_MIN};

  cleanupGroupResInfo(&pSession->groupResInfo);
  return code;
}
2123

2124
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
452,748✔
2125
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2126
  QRY_PARAM_CHECK(pOptrInfo);
452,748✔
2127

2128
  int32_t                  code = TSDB_CODE_SUCCESS;
452,748✔
2129
  int32_t                  lino = 0;
452,748✔
2130
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
452,748✔
2131
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
452,748✔
2132
  if (pInfo == NULL || pOperator == NULL) {
452,286✔
2133
    code = terrno;
×
2134
    goto _error;
×
2135
  }
2136

2137
  pOperator->pPhyNode = pSessionNode;
452,286✔
2138
  pOperator->exprSupp.hasWindowOrGroup = true;
452,286✔
2139
  pOperator->exprSupp.hasWindow = true;
452,748✔
2140

2141
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
452,748✔
2142
  initResultSizeInfo(&pOperator->resultInfo, 4096);
452,748✔
2143

2144
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
452,748✔
2145
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
452,748✔
2146
  initBasicInfo(&pInfo->binfo, pResBlock);
452,748✔
2147

2148
  int32_t    numOfCols = 0;
452,748✔
2149
  SExprInfo* pExprInfo = NULL;
452,748✔
2150
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
452,748✔
2151
  QUERY_CHECK_CODE(code, lino, _error);
452,748✔
2152

2153
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
905,034✔
2154
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
452,748✔
2155
  QUERY_CHECK_CODE(code, lino, _error);
452,748✔
2156

2157
  pInfo->gap = pSessionNode->gap;
452,748✔
2158

2159
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
452,748✔
2160
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
452,748✔
2161
  QUERY_CHECK_CODE(code, lino, _error);
452,748✔
2162

2163
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
452,748✔
2164
  pInfo->binfo.pRes = pResBlock;
452,286✔
2165
  pInfo->winSup.prevTs = INT64_MIN;
452,286✔
2166
  pInfo->reptScan = false;
452,286✔
2167
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
452,748✔
2168
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
452,748✔
2169

2170
  if (pSessionNode->window.pExprs != NULL) {
452,748✔
2171
    int32_t    numOfScalar = 0;
462✔
2172
    SExprInfo* pScalarExprInfo = NULL;
462✔
2173
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
462✔
2174
    QUERY_CHECK_CODE(code, lino, _error);
462✔
2175

2176
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
462✔
2177
    QUERY_CHECK_CODE(code, lino, _error);
462✔
2178
  }
2179

2180
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
452,286✔
2181
                            pTaskInfo->pStreamRuntimeInfo);
452,286✔
2182
  QUERY_CHECK_CODE(code, lino, _error);
452,748✔
2183

2184
  pInfo->pOperator = pOperator;
452,748✔
2185
  pInfo->cleanGroupResInfo = false;
452,748✔
2186
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
452,286✔
2187
                  pInfo, pTaskInfo);
2188
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
452,748✔
2189
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2190
  pOperator->pTaskInfo = pTaskInfo;
452,286✔
2191
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
452,286✔
2192

2193
  code = appendDownstream(pOperator, &downstream, 1);
452,286✔
2194
  QUERY_CHECK_CODE(code, lino, _error);
452,748✔
2195

2196
  *pOptrInfo = pOperator;
452,748✔
2197
  return TSDB_CODE_SUCCESS;
452,748✔
2198

2199
_error:
×
2200
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2201
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2202
  pTaskInfo->code = code;
×
2203
  return code;
×
2204
}
2205

2206
void destroyMAIOperatorInfo(void* param) {
656,410✔
2207
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
656,410✔
2208
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
656,410✔
2209
  taosMemoryFreeClear(param);
656,410✔
2210
}
656,410✔
2211

2212
/*
 * Allocate a fresh result row from the aggregate supporter's result buffer and make it the
 * current position of pResultRowInfo. Returns the row, or NULL when the allocation failed
 * (in which case pResultRowInfo is left untouched).
 */
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    const SResultRowPosition pos = {.pageId = pRow->pageId, .offset = pRow->offset};
    pResultRowInfo->cur = pos;
  }
  return pRow;
}
2220

2221
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
887,076,554✔
2222
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2223
  if (*pResult == NULL) {
887,076,554✔
2224
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
570,100✔
2225
    if (*pResult == NULL) {
570,100✔
2226
      return terrno;
×
2227
    }
2228
  }
2229

2230
  // set time window for current result
2231
  (*pResult)->win = (*win);
887,078,469✔
2232
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
887,083,831✔
2233
}
2234

2235
/*
 * Aggregate one input block for the merge-aligned interval operator.
 *
 * Rows are grouped by their raw timestamp: every run of equal timestamps forms one window
 * starting at that timestamp. A single result row (miaInfo->pResultRow) is reused; each time
 * the timestamp changes the finished window is flushed to pResultBlock via finalizeResultRows
 * and the row is reset. The last window of the block stays open (miaInfo->curTs) so it can be
 * continued by the next block.
 *
 * Errors are reported by long-jumping to pTaskInfo->env.
 */
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // a window is still open from the previous block
  if (miaInfo->curTs != INT64_MIN) {
    if (ts != miaInfo->curTs) {
      // this block starts a new window: flush the open one first
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;

  STimeWindow currWin = win;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // timestamp changed: aggregate rows [startPos, currPos) into the current window and flush it
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
    miaInfo->curTs = tsCols[currPos];

    currWin.skey = miaInfo->curTs;
    currWin.ekey =
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

    startPos = currPos;
    // Stamp the reused row with the window that is being opened now (currWin). Passing the
    // block's first window here would leave a stale window on every row after the first.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // aggregate the trailing rows; their window stays open for the next block
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
3,472,633✔
2308

2309
/*
 * Called once the results of a group have been produced: tags the result block with the
 * group that just finished and puts the operator back into the "no open window, no group"
 * state (curTs == INT64_MIN, groupId == 0).
 */
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  const uint64_t finishedGroupId = pMiaInfo->groupId;

  pMiaInfo->groupId = 0;
  pMiaInfo->curTs = INT64_MIN;
  pRes->info.id.groupId = finishedGroupId;
}
2,252,169✔
2314

2315
/*
 * Pull blocks from the downstream operator and aggregate them into the result block until
 * one of the following happens:
 *   - the input is exhausted (the operator is marked completed),
 *   - a block of a different group arrives (it is kept as prefetchedBlock and the finished
 *     group's rows are returned, unless the filter removed all of them),
 *   - the result block reached the operator's capacity.
 *
 * Errors are reported by storing the code in pTaskInfo->code and long-jumping to
 * pTaskInfo->env.
 */
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that ended the previous group
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      // no group is being tracked yet: adopt the group of this block
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // A group switch with no open window is an internal inconsistency. Jump with the
        // code that was just recorded; terrno is unrelated here and may be stale or zero.
        if (pMiaInfo->curTs == INT64_MIN) {
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }
        // if there is an unclosed time window, close it first
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // same group as before: keep accumulating
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
3,132,772✔
2405

2406
/*
 * "Get next" callback of the merge-aligned interval operator.
 *
 * Clears the result block and refills it through doMergeAlignedIntervalAgg. When
 * mergeResultBlock is set, aggregation is repeated until the operator is done or the block
 * holds at least resultInfo.threshold rows; otherwise a single aggregation pass is made.
 * *ppRes is the result block, or NULL when it holds no rows or the operator is already done.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                               code = TSDB_CODE_SUCCESS;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pOut = iaInfo->binfo.pRes;
  blockDataCleanup(pOut);

  if (!iaInfo->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep filling the block until it is large enough or the input is exhausted
    while (pOperator->status != OP_EXEC_DONE && pOut->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t numOfRows = pOut->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;
  if (numOfRows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pOut;
  }
  return code;
}
2440

2441
/*
 * Reset callback of the merge-aligned interval operator.
 *
 * Puts the wrapper back into its initial state (no group, no open window, no prefetched
 * block, no cached result row) and then delegates to resetInterval for the nested interval
 * operator info. Returns the result of resetInterval.
 */
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;

  pInfo->groupId = 0;
  pInfo->curTs = INT64_MIN;  // INT64_MIN means "no window is open"
  pInfo->prefetchedBlock = NULL;
  pInfo->pResultRow = NULL;

  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
}
2456

2457
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
656,410✔
2458
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2459
  QRY_PARAM_CHECK(pOptrInfo);
656,410✔
2460

2461
  int32_t                               code = TSDB_CODE_SUCCESS;
656,410✔
2462
  int32_t                               lino = 0;
656,410✔
2463
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
656,410✔
2464
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
656,410✔
2465
  if (miaInfo == NULL || pOperator == NULL) {
656,410✔
2466
    code = terrno;
×
2467
    goto _error;
×
2468
  }
2469

2470
  pOperator->pPhyNode = pNode;
656,410✔
2471
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
656,410✔
2472
  if (miaInfo->intervalAggOperatorInfo == NULL) {
656,410✔
2473
    code = terrno;
×
2474
    goto _error;
×
2475
  }
2476

2477
  SInterval interval = {.interval = pNode->interval,
1,969,230✔
2478
                        .sliding = pNode->sliding,
656,410✔
2479
                        .intervalUnit = pNode->intervalUnit,
656,410✔
2480
                        .slidingUnit = pNode->slidingUnit,
656,410✔
2481
                        .offset = pNode->offset,
656,410✔
2482
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
656,410✔
2483
                        .timeRange = pNode->timeRange};
2484
  calcIntervalAutoOffset(&interval);
656,410✔
2485

2486
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
656,410✔
2487
  SExprSupp*                pSup = &pOperator->exprSupp;
656,410✔
2488
  pSup->hasWindowOrGroup = true;
656,410✔
2489
  pSup->hasWindow = true;
656,410✔
2490

2491
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
656,410✔
2492
                            pTaskInfo->pStreamRuntimeInfo);
656,410✔
2493
  QUERY_CHECK_CODE(code, lino, _error);
656,410✔
2494

2495
  miaInfo->curTs = INT64_MIN;
656,410✔
2496
  iaInfo->win = pTaskInfo->window;
656,410✔
2497
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
656,410✔
2498
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
656,410✔
2499
  iaInfo->interval = interval;
656,410✔
2500
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
656,410✔
2501
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
656,410✔
2502

2503
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
656,410✔
2504
  initResultSizeInfo(&pOperator->resultInfo, 512);
656,410✔
2505

2506
  int32_t    num = 0;
656,410✔
2507
  SExprInfo* pExprInfo = NULL;
656,410✔
2508
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
656,410✔
2509
  QUERY_CHECK_CODE(code, lino, _error);
656,027✔
2510

2511
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
1,312,054✔
2512
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
656,027✔
2513
  QUERY_CHECK_CODE(code, lino, _error);
656,410✔
2514

2515
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
656,410✔
2516
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
656,410✔
2517
  initBasicInfo(&iaInfo->binfo, pResBlock);
656,410✔
2518
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
656,410✔
2519
  QUERY_CHECK_CODE(code, lino, _error);
656,410✔
2520

2521
  iaInfo->timeWindowInterpo = false;
656,410✔
2522
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
656,410✔
2523
  QUERY_CHECK_CODE(code, lino, _error);
656,410✔
2524
  if (iaInfo->timeWindowInterpo) {
656,410✔
2525
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2526
  }
2527

2528
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
656,410✔
2529
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
656,410✔
2530
  QUERY_CHECK_CODE(code, lino, _error);
656,410✔
2531
  iaInfo->pOperator = pOperator;
656,410✔
2532
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
656,410✔
2533
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2534

2535
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
656,410✔
2536
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2537
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
656,410✔
2538

2539
  code = appendDownstream(pOperator, &downstream, 1);
656,410✔
2540
  QUERY_CHECK_CODE(code, lino, _error);
656,410✔
2541

2542
  *pOptrInfo = pOperator;
656,410✔
2543
  return TSDB_CODE_SUCCESS;
656,410✔
2544

2545
_error:
×
2546
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2547
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2548
  pTaskInfo->code = code;
×
2549
  return code;
×
2550
}
2551

2552
//=====================================================================================================================
2553
// merge interval operator
2554
// Operator state of the merge-interval operator.
typedef struct SMergeIntervalAggOperatorInfo {
2555
  SIntervalAggOperatorInfo intervalAggOperatorInfo;  // embedded by value; released via destroyIntervalOperatorInfo(&...)
2556
  SList*                   groupIntervals;  // list of SGroupTimeWindow entries; appended to and pruned by outputPrevIntervalResult
2557
  SListIter                groupIntervalsIter;  // presumably a cursor over groupIntervals; usage not visible here, verify
2558
  bool                     hasGroupId;  // NOTE(review): presumably tells whether groupId is valid; confirm against users
2559
  uint64_t                 groupId;
2560
  SSDataBlock*             prefetchedBlock;  // NOTE(review): looks like a block fetched ahead and kept for the next call; confirm
2561
  bool                     inputBlocksFinished;  // NOTE(review): presumably set once downstream is exhausted; confirm
2562
} SMergeIntervalAggOperatorInfo;
2563

2564
// One entry of SMergeIntervalAggOperatorInfo.groupIntervals: a time window together with
// the table group it belongs to (built in outputPrevIntervalResult).
typedef struct SGroupTimeWindow {
2565
  uint64_t    groupId;  // table group id the window was recorded for
2566
  STimeWindow window;   // the recorded time window
2567
} SGroupTimeWindow;
2568

2569
void destroyMergeIntervalOperatorInfo(void* param) {
×
2570
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2571
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2572
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2573

2574
  taosMemoryFreeClear(param);
×
2575
}
×
2576

2577
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2578
                                        STimeWindow* newWin) {
2579
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2580
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2581
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2582

2583
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2584
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2585
  if (code != TSDB_CODE_SUCCESS) {
×
2586
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2587
    return code;
×
2588
  }
2589

2590
  SListIter iter = {0};
×
2591
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2592
  SListNode* listNode = NULL;
×
2593
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2594
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2595
    if (prevGrpWin->groupId != tableGroupId) {
×
2596
      continue;
×
2597
    }
2598

2599
    STimeWindow* prevWin = &prevGrpWin->window;
×
2600
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2601
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2602
      taosMemoryFreeClear(tmp);
×
2603
    }
2604
  }
2605

2606
  return TSDB_CODE_SUCCESS;
×
2607
}
2608

2609
// Feeds one input block into the interval windows it covers, for the merge-interval operator.
//
// The first window is derived from the block's start timestamp; subsequent windows are obtained with
// getNextQualifiedWindow() until it reports no further window (negative start position). For every
// window the result row is set up, the aggregate functions are applied to the rows that fall into it,
// the window is closed and then registered through outputPrevIntervalResult().
//
// pOperatorInfo:  merge-interval operator owning the aggregation state.
// pResultRowInfo: result-row bookkeeping passed to the window helpers.
// pBlock:         input data block.
// scanFlag:       scan flag of the block; compared against MAIN_SCAN when setting up result rows.
// pResultBlock:   only forwarded to outputPrevIntervalResult().
//
// There is no return value: every failure leaves the function via T_LONG_JMP(pTaskInfo->env, code).
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->binfo.inputTsOrder);

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // never jump out with a success code: a missing result row without an error code is an internal error
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  // the boundary of the window in scan direction: its end when ascending, its start when descending
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 iaInfo->binfo.inputTsOrder);
  if (forwardRows <= 0) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // previous time windows have not been interpolated yet
  if (iaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        forwardRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // output previous interval results after this interval (&win) is closed
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      iaInfo->binfo.inputTsOrder);
    if (startPos < 0) {
      break;
    }

    // a NULL result row means the output buffer could not be set up for this window
    code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      // same rule as for the first window: do not jump out with a success code
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           iaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // output previous interval results after this interval (&nextWin) is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // remember the last row of this block for interpolation against the next block
  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}
×
2723

2724
// "Get next" callback of the merge-interval operator.
//
// While input remains, blocks are pulled from downstream 0 (or taken from prefetchedBlock) and passed
// to doMergeIntervalAggImpl(). The loop stops when the input is exhausted, when a block with a
// different group id arrives (that block is parked in prefetchedBlock for the next call), or when the
// result block has reached the operator's row threshold. Once the input is exhausted, each call takes
// the next entry of groupIntervals, if any, and uses its group id for the result block.
//
// pOperator: the merge-interval operator.
// ppRes:     receives the result block, or NULL when it holds no rows or the operator is already done.
//
// Returns TSDB_CODE_SUCCESS on the normal path. On failure the code is stored in pTaskInfo->code and
// control leaves through T_LONG_JMP(pTaskInfo->env, code).
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  size_t         rows = 0;  // set before any jump to _end so it is never read uninitialised
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block that ended the previous call and switch to its group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        // input exhausted: position the iterator used by the drain step below
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // a new group starts here; keep the block for the next call
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2808

2809
// Reset callback of the merge-interval operator: drops every recorded group window, clears the
// group / prefetch / end-of-input bookkeeping and then resets the embedded interval-aggregate state.
//
// Returns whatever resetInterval() returns.
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOper->info;

  tdListEmpty(pMergeInfo->groupIntervals);

  pMergeInfo->inputBlocksFinished = false;
  pMergeInfo->prefetchedBlock = NULL;
  pMergeInfo->groupId = 0;
  pMergeInfo->hasGroupId = false;

  return resetInterval(pOper, &pMergeInfo->intervalAggOperatorInfo);
}
2821

2822
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2823
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2824
  QRY_PARAM_CHECK(pOptrInfo);
×
2825

2826
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2827
  int32_t                        lino = 0;
×
2828
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2829
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2830
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2831
    code = terrno;
×
2832
    goto _error;
×
2833
  }
2834

2835
  pOperator->pPhyNode = pIntervalPhyNode;
×
2836
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2837
                        .sliding = pIntervalPhyNode->sliding,
×
2838
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2839
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2840
                        .offset = pIntervalPhyNode->offset,
×
2841
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2842
                        .timeRange = pIntervalPhyNode->timeRange};
2843
  calcIntervalAutoOffset(&interval);
×
2844

2845
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2846

2847
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2848
  pIntervalInfo->win = pTaskInfo->window;
×
2849
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2850
  pIntervalInfo->interval = interval;
×
2851
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2852
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2853
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2854

2855
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2856
  pExprSupp->hasWindowOrGroup = true;
×
2857
  pExprSupp->hasWindow = true;
×
2858

2859
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2860
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2861

2862
  int32_t    num = 0;
×
2863
  SExprInfo* pExprInfo = NULL;
×
2864
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2865
  QUERY_CHECK_CODE(code, lino, _error);
×
2866

2867
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2868
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2869
  if (code != TSDB_CODE_SUCCESS) {
×
2870
    goto _error;
×
2871
  }
2872

2873
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2874
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2875
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2876
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2877
  QUERY_CHECK_CODE(code, lino, _error);
×
2878

2879
  pIntervalInfo->timeWindowInterpo = false;
×
2880
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2881
  QUERY_CHECK_CODE(code, lino, _error);
×
2882
  if (pIntervalInfo->timeWindowInterpo) {
×
2883
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2884
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2885
      goto _error;
×
2886
    }
2887
  }
2888

2889
  pIntervalInfo->pOperator = pOperator;
×
2890
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2891
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2892
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2893
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2894
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2895
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2896

2897
  code = appendDownstream(pOperator, &downstream, 1);
×
2898
  if (code != TSDB_CODE_SUCCESS) {
×
2899
    goto _error;
×
2900
  }
2901

2902
  *pOptrInfo = pOperator;
×
2903
  return TSDB_CODE_SUCCESS;
×
2904
_error:
×
2905
  if (pMergeIntervalInfo != NULL) {
×
2906
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2907
  }
2908
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2909
  pTaskInfo->code = code;
×
2910
  return code;
×
2911
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc