• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4896

24 Dec 2025 07:36AM UTC coverage: 65.929% (+0.4%) from 65.513%
#4896

push

travis-ci

web-flow
enh: [TS-7591] Some code refactor and add more log. (#34022)

326 of 537 new or added lines in 4 files covered. (60.71%)

370 existing lines in 111 files now uncovered.

185828 of 281861 relevant lines covered (65.93%)

116309824.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.53
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
56
    *pResult = NULL;
98,198✔
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
1,061,641,786✔
68
  pRowSup->win.ekey = ts;
1,061,641,786✔
69
  pRowSup->prevTs = ts;
1,061,641,786✔
70
  pRowSup->groupId = groupId;
1,061,641,786✔
71
  pRowSup->numOfRows += 1;
1,061,641,786✔
72
  if (hasContinuousNullRows(pRowSup)) {
1,061,641,646✔
73
    // rows having null state col are wrapped by rows of same state
74
    // these rows can be counted into current window
75
    pRowSup->numOfRows += pRowSup->numNullRows;
5,409,991✔
76
    resetNumNullRows(pRowSup);
5,409,991✔
77
  }
78
}
1,061,641,506✔
79

80
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
374,771,841✔
81
  pRowSup->startRowIndex = rowIndex;
374,771,841✔
82
  pRowSup->numOfRows = 0;
374,771,841✔
83
  pRowSup->win.skey = tsList[rowIndex];
374,771,841✔
84
  pRowSup->groupId = groupId;
374,771,841✔
85
  resetNumNullRows(pRowSup);
374,771,841✔
86
}
374,771,841✔
87

88
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
89
                                            int32_t order, int64_t* pData) {
90
  int32_t forwardRows = 0;
1,308,905,645✔
91

92
  if (order == TSDB_ORDER_ASC) {
×
93
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
1,052,566,694✔
94
    if (end >= 0) {
1,050,959,558✔
95
      forwardRows = end;
1,049,392,694✔
96

97
      while (pData[end + pos] == ekey) {
1,055,438,661✔
98
        forwardRows += 1;
6,045,967✔
99
        ++pos;
6,045,967✔
100
      }
101
    }
102
  } else {
103
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
256,338,951✔
104
    if (end >= 0) {
256,473,735✔
105
      forwardRows = end;
256,595,831✔
106

107
      while (pData[end + pos] == ekey) {
506,337,847✔
108
        forwardRows += 1;
249,742,016✔
109
        ++pos;
249,742,016✔
110
      }
111
    }
112
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
113
    //    if (end >= 0) {
114
    //      forwardRows = pos - end;
115
    //
116
    //      if (pData[end] == ekey) {
117
    //        forwardRows += 1;
118
    //      }
119
    //    }
120
  }
121

122
  return forwardRows;
1,305,751,159✔
123
}
124

125
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
1,348,344,792✔
126
  int32_t midPos = -1;
1,348,344,792✔
127
  int32_t numOfRows;
128

129
  if (num <= 0) {
1,348,344,792✔
130
    return -1;
×
131
  }
132

133
  TSKEY*  keyList = (TSKEY*)pValue;
1,348,344,792✔
134
  int32_t firstPos = 0;
1,348,344,792✔
135
  int32_t lastPos = num - 1;
1,348,344,792✔
136

137
  if (order == TSDB_ORDER_DESC) {
1,348,344,792✔
138
    // find the first position which is smaller than the key
139
    while (1) {
140
      if (key >= keyList[firstPos]) return firstPos;
316,972,624✔
141
      if (key == keyList[lastPos]) return lastPos;
68,137,010✔
142

143
      if (key < keyList[lastPos]) {
67,580,955✔
144
        lastPos += 1;
7,063,329✔
145
        if (lastPos >= num) {
7,063,329✔
146
          return -1;
×
147
        } else {
148
          return lastPos;
7,063,329✔
149
        }
150
      }
151

152
      numOfRows = lastPos - firstPos + 1;
60,518,224✔
153
      midPos = (numOfRows >> 1) + firstPos;
60,518,224✔
154

155
      if (key < keyList[midPos]) {
60,518,224✔
156
        firstPos = midPos + 1;
1,268,373✔
157
      } else if (key > keyList[midPos]) {
59,249,851✔
158
        lastPos = midPos - 1;
58,626,421✔
159
      } else {
160
        break;
623,430✔
161
      }
162
    }
163

164
  } else {
165
    // find the first position which is bigger than the key
166
    while (1) {
167
      if (key <= keyList[firstPos]) return firstPos;
2,147,483,647✔
168
      if (key == keyList[lastPos]) return lastPos;
2,147,483,647✔
169

170
      if (key > keyList[lastPos]) {
2,147,483,647✔
171
        lastPos = lastPos + 1;
1,001,097,385✔
172
        if (lastPos >= num)
1,001,097,385✔
173
          return -1;
385,560✔
174
        else
175
          return lastPos;
1,000,711,825✔
176
      }
177

178
      numOfRows = lastPos - firstPos + 1;
2,147,483,647✔
179
      midPos = (numOfRows >> 1u) + firstPos;
2,147,483,647✔
180

181
      if (key < keyList[midPos]) {
2,147,483,647✔
182
        lastPos = midPos - 1;
2,147,483,647✔
183
      } else if (key > keyList[midPos]) {
209,903,518✔
184
        firstPos = midPos + 1;
200,518,516✔
185
      } else {
186
        break;
9,389,598✔
187
      }
188
    }
189
  }
190

191
  return midPos;
10,013,028✔
192
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
1,323,946,399✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
1,323,946,399✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
1,323,946,399✔
198

199
  if (order == TSDB_ORDER_ASC) {
1,323,946,399✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
1,072,381,172✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
1,053,322,573✔
202
      if (item != NULL) {
1,049,035,310✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
17,459,813✔
207
      if (item != NULL) {
19,715,371✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
251,565,227✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
256,638,622✔
214
      if (item != NULL) {
256,715,849✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
167,879✔
219
      if (item != NULL) {
355,269✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
1,326,921,896✔
226
}
227

228
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
11,670,139✔
229
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
230
  SqlFunctionCtx* pCtx = pSup->pCtx;
11,670,139✔
231

232
  int32_t index = 1;
11,670,139✔
233
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
35,013,363✔
234
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
23,343,224✔
235
      pCtx[k].start.key = INT64_MIN;
11,673,085✔
236
      continue;
11,673,085✔
237
    }
238

239
    SFunctParam*     pParam = &pCtx[k].param[0];
11,670,139✔
240
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
11,670,139✔
241

242
    double v1 = 0, v2 = 0, v = 0;
11,670,139✔
243
    if (prevRowIndex == -1) {
11,670,139✔
244
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
245
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
×
246
    } else {
247
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
11,670,139✔
248
                     typeGetTypeModFromColInfo(&pColInfo->info));
249
    }
250

251
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
11,670,139✔
252
                   typeGetTypeModFromColInfo(&pColInfo->info));
253

254
#if 0
255
    if (functionId == FUNCTION_INTERP) {
256
      if (type == RESULT_ROW_START_INTERP) {
257
        pCtx[k].start.key = prevTs;
258
        pCtx[k].start.val = v1;
259

260
        pCtx[k].end.key = curTs;
261
        pCtx[k].end.val = v2;
262

263
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
264
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
265
          if (prevRowIndex == -1) {
266
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
267
          } else {
268
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
269
          }
270

271
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
272
        }
273
      }
274
    } else if (functionId == FUNCTION_TWA) {
275
#endif
276

277
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
11,670,139✔
278
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
11,670,139✔
279
    SPoint point = (SPoint){.key = windowKey, .val = &v};
11,670,139✔
280

281
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
11,670,139✔
282
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
11,601,169✔
283
    }
284

285
    if (type == RESULT_ROW_START_INTERP) {
11,670,139✔
286
      pCtx[k].start.key = point.key;
5,803,525✔
287
      pCtx[k].start.val = v;
5,803,525✔
288
    } else {
289
      pCtx[k].end.key = point.key;
5,866,614✔
290
      pCtx[k].end.val = v;
5,866,614✔
291
    }
292

293
    index += 1;
11,670,139✔
294
  }
295
#if 0
296
  }
297
#endif
298
}
11,670,139✔
299

300
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
752,297✔
301
  if (type == RESULT_ROW_START_INTERP) {
752,297✔
302
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,188,394✔
303
      pCtx[k].start.key = INT64_MIN;
780,701✔
304
    }
305
  } else {
306
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,028,599✔
307
      pCtx[k].end.key = INT64_MIN;
683,995✔
308
    }
309
  }
310
}
752,297✔
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
6,211,218✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
6,211,218✔
315

316
  TSKEY curTs = tsCols[pos];
6,211,218✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
6,211,218✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
6,211,218✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
6,211,218✔
324
  if (key == curTs) {
6,211,218✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
398,651✔
326
    return true;
398,651✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
5,812,567✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
9,042✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
5,803,525✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
5,803,525✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
5,812,567✔
339
}
340

341
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
6,211,218✔
342
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
343
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
344
  int32_t code = TSDB_CODE_SUCCESS;
6,211,218✔
345
  int32_t lino = 0;
6,211,218✔
346
  int32_t order = pInfo->binfo.inputTsOrder;
6,211,218✔
347

348
  TSKEY actualEndKey = tsCols[endRowIndex];
6,211,218✔
349
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
6,211,218✔
350

351
  // not ended in current data block, do not invoke interpolation
352
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
6,211,218✔
353
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
13,714✔
354
    (*pRes) = false;
13,714✔
355
    return code;
13,714✔
356
  }
357

358
  // there is actual end point of current time window, no interpolation needs
359
  if (key == actualEndKey) {
6,197,504✔
360
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
330,890✔
361
    (*pRes) = true;
330,890✔
362
    return code;
330,890✔
363
  }
364

365
  if (nextRowIndex < 0) {
5,866,614✔
366
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
367
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
368
  }
369

370
  TSKEY nextKey = tsCols[nextRowIndex];
5,866,614✔
371
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
5,866,614✔
372
                            RESULT_ROW_END_INTERP, pSup);
373
  (*pRes) = true;
5,866,614✔
374
  return code;
5,866,614✔
375
}
376

377
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
2,147,483,647✔
378
  if (pInterval->interval != pInterval->sliding &&
2,147,483,647✔
379
      ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) {
48,417,772✔
380
    return false;
×
381
  }
382

383
  return true;
2,147,483,647✔
384
}
385

386
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
387
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
2,147,483,647✔
388
}
389

390
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
391
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
392
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
393

394
  int32_t precision = pInterval->precision;
2,147,483,647✔
395
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
396

397
  // next time window is not in current block
398
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
399
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
400
    return -1;
9,681,001✔
401
  }
402

403
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
404
    return -1;
×
405
  }
406

407
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
408
  int32_t startPos = 0;
2,147,483,647✔
409

410
  // tumbling time window query, a special case of sliding time window query
411
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
412
    startPos = prevPosition + 1;
2,147,483,647✔
413
  } else {
414
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
45,022,534✔
415
      startPos = 0;
6,734,143✔
416
    } else {
417
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
41,683,629✔
418
    }
419
  }
420
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
421
    return -1;
1,945,737,311✔
422
  }
423

424
  /* interp query with fill should not skip time window */
425
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
426
  //    return startPos;
427
  //  }
428

429
  /*
430
   * This time window does not cover any data, try next time window,
431
   * this case may happen when the time window is too small
432
   */
433
  if (primaryKeys != NULL) {
1,334,498,327✔
434
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
435
      TSKEY next = primaryKeys[startPos];
939,983,850✔
436
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
940,178,330✔
437
        pNext->skey = taosTimeTruncate(next, pInterval);
282,006✔
438
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,972✔
439
      } else {
440
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
940,392,820✔
441
        pNext->skey = pNext->ekey - pInterval->interval + 1;
940,873,366✔
442
      }
443
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
393,123,898✔
444
      TSKEY next = primaryKeys[startPos];
254,847,423✔
445
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
254,813,646✔
UNCOV
446
        pNext->skey = taosTimeTruncate(next, pInterval);
×
447
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
448
      } else {
449
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
254,979,007✔
450
        pNext->ekey = pNext->skey + pInterval->interval - 1;
254,962,510✔
451
      }
452
    }
453
  }
454

455
  return startPos;
1,331,392,928✔
456
}
457

458
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
18,633,654✔
459
  if (type == RESULT_ROW_START_INTERP) {
18,633,654✔
460
    return pResult->startInterp == true;
6,211,218✔
461
  } else {
462
    return pResult->endInterp == true;
12,422,436✔
463
  }
464
}
465

466
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
12,408,722✔
467
  if (type == RESULT_ROW_START_INTERP) {
12,408,722✔
468
    pResult->startInterp = true;
6,211,218✔
469
  } else {
470
    pResult->endInterp = true;
6,197,504✔
471
  }
472
}
12,408,722✔
473

474
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
475
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
476
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
477
  int32_t lino = 0;
2,147,483,647✔
478
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
479
    return code;
2,147,483,647✔
480
  }
481

482
  if (pBlock == NULL) {
5,638,971✔
483
    code = TSDB_CODE_INVALID_PARA;
×
484
    return code;
×
485
  }
486

487
  if (pBlock->pDataBlock == NULL) {
5,638,971✔
488
    return code;
×
489
  }
490

491
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
6,211,218✔
492

493
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
6,211,218✔
494
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
6,211,218✔
495
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
6,211,218✔
496
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
6,211,218✔
497
    if (interp) {
6,211,218✔
498
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
6,211,218✔
499
    }
500
  } else {
501
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
502
  }
503

504
  // point interpolation does not require the end key time window interpolation.
505
  // interpolation query does not generate the time window end interpolation
506
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
6,211,218✔
507
  if (!done) {
6,211,218✔
508
    int32_t endRowIndex = startPos + forwardRows - 1;
6,211,218✔
509
    int32_t nextRowIndex = endRowIndex + 1;
6,211,218✔
510

511
    // duplicated ts row does not involve in the interpolation of end value for current time window
512
    int32_t x = endRowIndex;
6,211,218✔
513
    while (x > 0) {
6,224,902✔
514
      if (tsCols[x] == tsCols[x - 1]) {
6,210,073✔
515
        x -= 1;
13,684✔
516
      } else {
517
        endRowIndex = x;
6,196,389✔
518
        break;
6,196,389✔
519
      }
520
    }
521

522
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
6,211,218✔
523
    bool  interp = false;
6,211,218✔
524
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
6,211,218✔
525
                                           win, &interp);
526
    QUERY_CHECK_CODE(code, lino, _end);
6,211,218✔
527
    if (interp) {
6,211,218✔
528
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
6,197,504✔
529
    }
530
  } else {
531
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
532
  }
533

534
_end:
6,211,218✔
535
  if (code != TSDB_CODE_SUCCESS) {
6,211,218✔
536
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
537
  }
538
  return code;
6,211,218✔
539
}
540

541
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
18,994✔
542
  if (pBlock->pDataBlock == NULL) {
18,994✔
543
    return;
×
544
  }
545

546
  size_t num = taosArrayGetSize(pPrevKeys);
18,994✔
547
  for (int32_t k = 0; k < num; ++k) {
56,982✔
548
    SColumn* pc = taosArrayGet(pCols, k);
37,988✔
549

550
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
37,988✔
551

552
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
37,988✔
553
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
37,988✔
554
      if (colDataIsNull_s(pColInfo, i)) {
75,976✔
555
        continue;
×
556
      }
557

558
      char* val = colDataGetData(pColInfo, i);
37,988✔
559
      if (IS_VAR_DATA_TYPE(pkey->type)) {
37,988✔
560
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
561
          memcpy(pkey->pData, val, blobDataTLen(val));
×
562
        } else {
563
          memcpy(pkey->pData, val, varDataTLen(val));
×
564
        }
565
      } else {
566
        memcpy(pkey->pData, val, pkey->bytes);
37,988✔
567
      }
568

569
      break;
37,988✔
570
    }
571
  }
572
}
573

574
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
18,994✔
575
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
576
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
18,994✔
577

578
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
18,994✔
579
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
18,994✔
580

581
  int32_t startPos = 0;
18,994✔
582
  int32_t numOfOutput = pSup->numOfExprs;
18,994✔
583

584
  SResultRow* pResult = NULL;
18,994✔
585

586
  while (1) {
×
587
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
18,994✔
588
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
18,994✔
589
    uint64_t            groupId = pOpenWin->groupId;
18,994✔
590
    SResultRowPosition* p1 = &pOpenWin->pos;
18,994✔
591
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
18,994✔
592
      break;
18,994✔
593
    }
594

595
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
596
    if (NULL == pr) {
×
597
      T_LONG_JMP(pTaskInfo->env, terrno);
×
598
    }
599

600
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
601
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
602
      T_LONG_JMP(pTaskInfo->env, terrno);
×
603
    }
604

605
    if (pr->closed) {
×
606
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
607
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
608
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
609
        T_LONG_JMP(pTaskInfo->env, terrno);
×
610
      }
611
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
612
      taosMemoryFree(pNode);
×
613
      continue;
×
614
    }
615

616
    STimeWindow w = pr->win;
×
617
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
618
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
619
    if (ret != TSDB_CODE_SUCCESS) {
×
620
      T_LONG_JMP(pTaskInfo->env, ret);
×
621
    }
622

623
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
624
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
625
      T_LONG_JMP(pTaskInfo->env, terrno);
×
626
    }
627

628
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
629
    if (!pTsKey) {
×
630
      pTaskInfo->code = terrno;
×
631
      T_LONG_JMP(pTaskInfo->env, terrno);
×
632
    }
633

634
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
635
    if (groupId == pBlock->info.id.groupId) {
×
636
      TSKEY curTs = pBlock->info.window.skey;
×
637
      if (tsCols != NULL) {
×
638
        curTs = tsCols[startPos];
×
639
      }
640
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
641
                                RESULT_ROW_END_INTERP, pSup);
642
    }
643

644
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
645
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
646

647
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
648
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
649
                                          pBlock->info.rows, numOfExprs);
×
650
    if (ret != TSDB_CODE_SUCCESS) {
×
651
      T_LONG_JMP(pTaskInfo->env, ret);
×
652
    }
653

654
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
655
      closeResultRow(pr);
×
656
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
657
      taosMemoryFree(pNode);
×
658
    } else {  // the remains are can not be closed yet.
659
      break;
×
660
    }
661
  }
662
}
18,994✔
663

664
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,310,860,697✔
665
  TSKEY*                    lTS = (TSKEY*)l;
1,310,860,697✔
666
  TSKEY*                    rTS = (TSKEY*)r;
1,310,860,697✔
667
  SIntervalAggOperatorInfo* pInfo = param;
1,310,860,697✔
668
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,310,860,697✔
669
}
670

671
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
187,271,304✔
672
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
187,271,304✔
673
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
187,448,936✔
674
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
187,531,928✔
675
}
676

677
/**
678
 * @brief check if cur window should be filtered out by limit info
679
 * @retval true if should be filtered out
680
 * @retval false if not filtering out
681
 * @note If no limit info, we skip filtering.
682
 *       If input/output ts order mismatch, we skip filtering too.
683
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
684
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
685
 *       every tuple in every block.
686
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
687
 */
688
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
689
                                  SExecTaskInfo* pTaskInfo) {
690
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
691
  int32_t lino = 0;
2,147,483,647✔
692
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
693
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
444,289,771✔
694
      // if input/output ts order mismatch, no filter
695
  ) {
696
    return false;
2,147,483,647✔
697
  }
698

699
  if (pOperatorInfo->limit == 0) return true;
188,402,822✔
700

701
  if (pOperatorInfo->pBQ == NULL) {
188,423,885✔
702
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
271,484✔
703
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
271,484✔
704
  }
705

706
  bool shouldFilter = false;
188,420,765✔
707
  // if BQ has been full, compare it with top of BQ
708
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
188,420,765✔
709
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
45,947,455✔
710
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
45,939,967✔
711
  }
712
  if (shouldFilter) {
188,216,093✔
713
    return true;
864,293✔
714
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
187,351,800✔
715
    return false;
67,372,457✔
716
  }
717

718
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
719
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
120,223,535✔
720
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
120,283,023✔
721

722
  *((TSKEY*)node.data) = win->skey;
120,283,023✔
723

724
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
120,275,119✔
725
    taosMemoryFree(node.data);
×
726
    return true;
×
727
  }
728

729
_end:
120,368,095✔
730
  if (code != TSDB_CODE_SUCCESS) {
120,335,023✔
731
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
10,192✔
732
    pTaskInfo->code = code;
10,192✔
733
    T_LONG_JMP(pTaskInfo->env, code);
×
734
  }
735
  return false;
120,324,831✔
736
}
737

738
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
1,972,239,194✔
739
                                      int32_t startPos) {
740
  int32_t rows = pDataBlockInfo->rows;
1,972,239,194✔
741
  for (int32_t i = startPos; i < pDataBlockInfo->rows; ++i) {
2,147,483,647✔
742
    if (pPrimaryColumn[i] >= win->skey && pPrimaryColumn[i] <= win->ekey) {
2,147,483,647✔
743
      continue;
2,147,483,647✔
744
    } else {
745
      return i - startPos;
1,935,649,058✔
746
    }
747
  }
748
  return rows - startPos;
37,357,687✔
749
}
750

751
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
47,445,102✔
752
                            int32_t scanFlag) {
753
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
47,445,102✔
754
  bool                      sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC;
47,444,697✔
755

756
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
47,443,947✔
757
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
47,445,867✔
758

759
  int32_t     startPos = 0;
47,446,659✔
760
  int32_t     numOfOutput = pSup->numOfExprs;
47,446,659✔
761
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
47,446,800✔
762
  uint64_t    tableGroupId = pBlock->info.id.groupId;
47,444,671✔
763
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
47,445,633✔
764
  SResultRow* pResult = NULL;
47,443,797✔
765
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];
47,443,858✔
766

767
  if (tableGroupId != pInfo->curGroupId) {
47,442,190✔
768
    pInfo->handledGroupNum += 1;
5,243,933✔
769
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
5,242,870✔
770
      return true;
11,737✔
771
    } else {
772
      pInfo->curGroupId = tableGroupId;
5,232,080✔
773
      destroyBoundedQueue(pInfo->pBQ);
5,231,529✔
774
      pInfo->pBQ = NULL;
5,231,124✔
775
    }
776
  }
777

778
  STimeWindow win =
47,431,138✔
779
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
47,431,449✔
780
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
47,431,678✔
781

782
  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
46,688,447✔
783
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
784
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
46,684,838✔
785
    T_LONG_JMP(pTaskInfo->env, ret);
396✔
786
  }
787

788
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
46,692,294✔
789
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
9,831,408✔
790
                                                          NULL, pInfo->binfo.inputTsOrder)
791
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);
46,692,294✔
792

793
  // prev time window not interpolation yet.
794
  if (pInfo->timeWindowInterpo) {
46,695,891✔
795
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
18,994✔
796
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
18,994✔
797

798
    // restore current time window
799
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
18,994✔
800
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
801
    if (ret != TSDB_CODE_SUCCESS) {
18,994✔
802
      T_LONG_JMP(pTaskInfo->env, ret);
×
803
    }
804

805
    // window start key interpolation
806
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
18,994✔
807
    if (ret != TSDB_CODE_SUCCESS) {
18,994✔
808
      T_LONG_JMP(pTaskInfo->env, ret);
×
809
    }
810
  }
811
  // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
812
  //   win.skey, win.ekey, startPos, forwardRows);
813
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
46,696,171✔
814
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
46,693,867✔
815
                                        pBlock->info.rows, numOfOutput);
46,694,751✔
816
  if (ret != TSDB_CODE_SUCCESS) {
46,689,370✔
817
    T_LONG_JMP(pTaskInfo->env, ret);
×
818
  }
819

820
  doCloseWindow(pResultRowInfo, pInfo, pResult);
46,689,370✔
821

822
  STimeWindow nextWin = win;
46,690,796✔
823
  int32_t rows = pBlock->info.rows;
46,692,429✔
824

825
  while (startPos < pBlock->info.rows) {
2,147,483,647✔
826
    if (sorted) {
2,147,483,647✔
827
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
1,316,067,207✔
828
                                        pInfo->binfo.inputTsOrder);
829
      if (startPos < 0) {
1,316,357,106✔
830
        break;
9,681,001✔
831
      }
832
    } else {
833
      pBlock->info.rows = forwardRows;
1,972,689,093✔
834
      int32_t newStartOff = forwardRows >= 1
1,973,204,557✔
835
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
1,973,632,062✔
836
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
837
                                : -1;
1,973,632,062✔
838
      pBlock->info.rows = rows;
1,973,204,557✔
839
      if (newStartOff >= 0) {
1,973,500,756✔
840
        startPos += newStartOff;
27,348,703✔
841
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
1,946,152,053✔
842
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
1,908,811,325✔
843
      }
844
      if (startPos >= pBlock->info.rows) {
1,973,286,916✔
845
        break;
36,864,062✔
846
      }
847
    }
848

849
    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
2,147,483,647✔
850
      break;
152,361✔
851
    }
852

853
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
2,147,483,647✔
854
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
1,305,364,140✔
855
                                                    pInfo->binfo.inputTsOrder)
856
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
2,147,483,647✔
857
    if (forwardRows == 0) continue;
2,147,483,647✔
858

859
    // null data, failed to allocate more memory buffer
860
    int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
2,147,483,647✔
861
                                          pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
862
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
2,147,483,647✔
863
      T_LONG_JMP(pTaskInfo->env, code);
7✔
864
    }
865

866
    // window start(end) key interpolation
867
    code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
2,147,483,647✔
868
    if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
869
      T_LONG_JMP(pTaskInfo->env, code);
×
870
    }
871
    // TODO: add to open window? how to close the open windows after input blocks exhausted?
872
#if 0
873
    if ((ascScan && ekey <= pBlock->info.window.ekey) ||
874
        (!ascScan && ekey >= pBlock->info.window.skey)) {
875
      // window start(end) key interpolation
876
      doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
877
    } else if (pInfo->timeWindowInterpo) {
878
      addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
879
    }
880
#endif
881
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
2,147,483,647✔
882
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2,147,483,647✔
883
                                          pBlock->info.rows, numOfOutput);
2,147,483,647✔
884
    if (ret != TSDB_CODE_SUCCESS) {
2,147,483,647✔
885
      T_LONG_JMP(pTaskInfo->env, ret);
×
886
    }
887
    doCloseWindow(pResultRowInfo, pInfo, pResult);
2,147,483,647✔
888
  }
889

890
  if (pInfo->timeWindowInterpo) {
49,074,441✔
891
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
18,994✔
892
  }
893
  return false;
46,696,328✔
894
}
895

896
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
2,147,483,647✔
897
  // current result is done in computing final results.
898
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
2,147,483,647✔
899
    closeResultRow(pResult);
6,197,504✔
900
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
6,197,504✔
901
    taosMemoryFree(pNode);
6,197,504✔
902
  }
903
}
2,147,483,647✔
904

905
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
18,994✔
906
                                       SExecTaskInfo* pTaskInfo) {
907
  int32_t         code = TSDB_CODE_SUCCESS;
18,994✔
908
  int32_t         lino = 0;
18,994✔
909
  SOpenWindowInfo openWin = {0};
18,994✔
910
  openWin.pos.pageId = pResult->pageId;
18,994✔
911
  openWin.pos.offset = pResult->offset;
18,994✔
912
  openWin.groupId = groupId;
18,994✔
913
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
18,994✔
914
  if (pn == NULL) {
18,994✔
915
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
18,994✔
916
    QUERY_CHECK_CODE(code, lino, _end);
18,994✔
917
    return openWin.pos;
18,994✔
918
  }
919

920
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
921
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
922
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
923
    QUERY_CHECK_CODE(code, lino, _end);
×
924
  }
925

926
_end:
×
927
  if (code != TSDB_CODE_SUCCESS) {
×
928
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
929
    pTaskInfo->code = code;
×
930
    T_LONG_JMP(pTaskInfo->env, code);
×
931
  }
932
  return openWin.pos;
×
933
}
934

935
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
49,346,275✔
936
  TSKEY* tsCols = NULL;
49,346,275✔
937

938
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
49,346,275✔
939
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
49,348,916✔
940
    if (!pColDataInfo) {
49,347,349✔
941
      pTaskInfo->code = terrno;
×
942
      T_LONG_JMP(pTaskInfo->env, terrno);
×
943
    }
944

945
    tsCols = (int64_t*)pColDataInfo->pData;
49,347,349✔
946
    if (tsCols[0] == 0) {
49,347,528✔
947
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
491✔
948
            tsCols[pBlock->info.rows - 1]);
949
    }
950

951
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
49,350,523✔
952
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
5,237,333✔
953
      if (code != TSDB_CODE_SUCCESS) {
5,242,284✔
954
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
955
        pTaskInfo->code = code;
×
956
        T_LONG_JMP(pTaskInfo->env, code);
×
957
      }
958
    }
959
  }
960

961
  return tsCols;
49,346,523✔
962
}
963

964
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
17,407,855✔
965
  if (OPTR_IS_OPENED(pOperator)) {
17,407,855✔
966
    return TSDB_CODE_SUCCESS;
7,080,163✔
967
  }
968

969
  int32_t        code = TSDB_CODE_SUCCESS;
10,327,890✔
970
  int32_t        lino = 0;
10,327,890✔
971
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
10,327,890✔
972
  SOperatorInfo* downstream = pOperator->pDownstream[0];
10,329,098✔
973

974
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
10,328,326✔
975
  SExprSupp*                pSup = &pOperator->exprSupp;
10,328,326✔
976

977
  int32_t scanFlag = MAIN_SCAN;
10,329,098✔
978
  int64_t st = taosGetTimestampUs();
10,330,910✔
979

980
  pInfo->cleanGroupResInfo = false;
10,330,910✔
981
  while (1) {
47,434,827✔
982
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
57,761,341✔
983
    if (pBlock == NULL) {
57,761,002✔
984
      break;
9,959,295✔
985
    }
986

987
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
47,801,707✔
988

989
    if (pInfo->scalarSupp.pExprInfo != NULL) {
47,806,381✔
990
      SExprSupp* pExprSup = &pInfo->scalarSupp;
4,999,549✔
991
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
4,999,409✔
992
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
4,999,549✔
993
      QUERY_CHECK_CODE(code, lino, _end);
4,998,805✔
994
    }
995

996
    // the pDataBlock are always the same one, no need to call this again
997
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
47,445,015✔
998
    QUERY_CHECK_CODE(code, lino, _end);
47,446,788✔
999
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
47,446,788✔
1000
  }
1001

1002
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
9,971,032✔
1003
  QUERY_CHECK_CODE(code, lino, _end);
9,972,234✔
1004
  pInfo->cleanGroupResInfo = true;
9,972,234✔
1005

1006
  OPTR_SET_OPENED(pOperator);
9,972,234✔
1007

1008
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
9,972,234✔
1009

1010
_end:
10,330,427✔
1011
  if (code != TSDB_CODE_SUCCESS) {
10,330,427✔
1012
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
359,280✔
1013
    pTaskInfo->code = code;
359,280✔
1014
    T_LONG_JMP(pTaskInfo->env, code);
359,280✔
1015
  }
1016
  return code;
9,971,147✔
1017
}
1018

1019
// start a new state window and record the start info
1020
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
5,061,188✔
1021
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1022
  pRowSup->groupId = groupId;
5,061,188✔
1023
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
5,061,188✔
1024
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
110,939✔
1025
    pRowSup->win.skey = tsList[rowIndex];
4,995,960✔
1026
    pRowSup->startRowIndex = rowIndex;
4,995,960✔
1027
    pRowSup->numOfRows = 0;  // does not include the current row yet
4,995,820✔
1028
  } else {
1029
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
130,736✔
1030
      rowIndex - pRowSup->numNullRows : rowIndex;
65,368✔
1031
    pRowSup->win.skey = hasPrevWin ?
65,368✔
1032
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
65,368✔
1033
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
65,368✔
1034
  }
1035
  resetNumNullRows(pRowSup);
5,061,328✔
1036
}
5,061,328✔
1037

1038
// close a state window and record its end info
1039
// this functions is called when a new state row appears
1040
// @param rowIndex the index of the first row of next window
1041
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
5,245,292✔
1042
                                 int32_t rowIndex,
1043
                                 const EStateWinExtendOption* extendOption,
1044
                                 bool hasNextWin) {
1045
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
5,245,292✔
1046
      pRowSup->win.ekey = hasNextWin?
53,331✔
1047
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
53,331✔
1048
      // continuous rows having null state col should be included in this window
1049
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
106,662✔
1050
        pRowSup->numNullRows : 0;
53,331✔
1051
      resetNumNullRows(pRowSup);
53,331✔
1052
  }
1053
}
5,245,292✔
1054

1055
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
25,626,977✔
1056
  pRowSup->numNullRows += 1;
25,626,977✔
1057
  pRowSup->prevTs = nullRowTs;
25,626,977✔
1058
}
25,626,977✔
1059

1060
// process a closed state window
1061
// do aggregation on the tuples within the window
1062
// partial aggregation results are stored in the output buffer
1063
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
5,245,292✔
1064
  SWindowRowsSup* pRowSup, SExecTaskInfo* pTaskInfo,
1065
  SExprSupp* pSup, int32_t numOfOutput) {
1066
  int32_t     code = 0;
5,245,292✔
1067
  int32_t     lino = 0;
5,245,292✔
1068
  SResultRow* pResult = NULL;
5,245,292✔
1069
  if (pRowSup->numOfRows == 0) {
5,245,292✔
1070
    // no valid rows within the window
1071
    return TSDB_CODE_SUCCESS;
178,896✔
1072
  }
1073
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
10,132,792✔
1074
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
5,066,396✔
1075
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1076
  QUERY_CHECK_CODE(code, lino, _return);
5,066,256✔
1077

1078
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
5,066,256✔
1079
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
5,066,256✔
1080
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1081
    pRowSup->numOfRows, 0, numOfOutput);
1082
  QUERY_CHECK_CODE(code, lino, _return);
5,066,396✔
1083

1084
_return:
5,066,396✔
1085
  if (code != TSDB_CODE_SUCCESS) {
5,066,396✔
1086
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
531✔
1087
  }
1088
  return code;
5,066,256✔
1089
}
1090

1091
// process a data block for state window aggregation
1092
// scan from startIndex to endIndex
1093
// numPartialCalcRows returns the number of rows that have been
1094
// partially calculated within the block
1095
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
1,194,920✔
1096
                                 SStateWindowOperatorInfo* pInfo,
1097
                                 SSDataBlock* pBlock, int32_t* startIndex,
1098
                                 int32_t* endIndex,
1099
                                 int32_t* numPartialCalcRows) {
1100
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,194,920✔
1101
  SExprSupp*     pExprSup = &pOperator->exprSupp;
1,194,920✔
1102

1103
  SColumnInfoData* pStateColInfoData = 
1104
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
1,194,920✔
1105
  if (!pStateColInfoData) {
1,194,920✔
1106
    pTaskInfo->code = terrno;
×
1107
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1108
  }
1109
  uint64_t gid = pBlock->info.id.groupId;
1,194,920✔
1110
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
1,194,920✔
1111
  int32_t bytes = pStateColInfoData->info.bytes;
1,194,920✔
1112

1113
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
1,194,920✔
1114
                                               pInfo->tsSlotId);
1,194,920✔
1115
  if (NULL == pColInfoData) {
1,194,920✔
1116
    pTaskInfo->code = terrno;
×
1117
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1118
  }
1119
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
1,194,920✔
1120

1121
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
1,194,920✔
1122
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
1,194,920✔
1123
                                NULL;
1124
  EStateWinExtendOption  extendOption = pInfo->extendOption;
1,194,920✔
1125
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
1,194,920✔
1126

1127
  if (pRowSup->groupId != gid) {
1,194,920✔
1128
    /*
1129
      group changed, process the previous group's unclosed state window first
1130
    */
1131
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
175,032✔
1132
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
175,032✔
1133
                                            pExprSup, numOfOutput);
1134
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
175,032✔
1135
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
175,032✔
1136

1137
    /*
1138
      unhandled null rows should be ignored, since they belong to previous group
1139
    */
1140
    *numPartialCalcRows += pRowSup->numNullRows;
175,032✔
1141

1142
    /*
1143
      reset state window info for new group
1144
    */
1145
    pInfo->hasKey = false;
175,032✔
1146
    resetWindowRowsSup(pRowSup);
175,032✔
1147
  }
1148

1149
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
107,026,618✔
1150
    if (pBlock->info.scanFlag != PRE_SCAN) {
105,847,268✔
1151
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
105,774,342✔
1152
        pInfo->winSup.lastTs = tsList[j];
6,670,958✔
1153
      } else {
1154
        if (tsList[j] == pInfo->winSup.lastTs) {
99,103,384✔
1155
          // forbid duplicated ts rows
1156
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
15,570✔
1157
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
15,570✔
1158
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
15,570✔
1159
        } else {
1160
          pInfo->winSup.lastTs = tsList[j];
99,087,814✔
1161
        }
1162
      }
1163
    }
1164
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
211,664,096✔
1165
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
25,626,977✔
1166
      continue;
25,626,977✔
1167
    }
1168
    if (pStateColInfoData->pData == NULL) {
80,205,141✔
1169
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1170
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1171
    }
1172
    char* val = colDataGetData(pStateColInfoData, j);
80,204,861✔
1173

1174
    if (!pInfo->hasKey) {
80,205,141✔
1175
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
1,140,757✔
1176
      pInfo->hasKey = true;
1,140,757✔
1177
      doKeepNewStateWindowStartInfo(
1,140,757✔
1178
        pRowSup, tsList, j, gid, &extendOption, false);
1179
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,140,757✔
1180
    } else if (!compareVal(val, &pInfo->stateKey)) {
79,064,384✔
1181
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
3,920,571✔
1182
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
3,920,571✔
1183
                                              pExprSup, numOfOutput);
1184
      if (TSDB_CODE_SUCCESS != code) {
3,920,431✔
1185
        T_LONG_JMP(pTaskInfo->env, code);
×
1186
      }
1187
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
3,920,431✔
1188

1189
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
3,920,431✔
1190
                                    &extendOption, true);
1191
      doKeepTuple(pRowSup, tsList[j], j, gid);
3,920,571✔
1192
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
3,920,431✔
1193
    } else {
1194
      doKeepTuple(pRowSup, tsList[j], j, gid);
75,143,953✔
1195
    }
1196
  }
1197

1198
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
1,179,350✔
1199
    /*
1200
      No valid state rows within the block and we don't care about
1201
      null rows before valid state window, mark them as processed and drop them
1202
    */
1203
    *numPartialCalcRows = pBlock->info.rows;
26,268✔
1204
    return;
26,268✔
1205
  }
1206
  if (pRowSup->numOfRows == 0 && 
1,153,082✔
1207
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
4,848✔
1208
    /*
1209
      If no valid state window or we don't know the belonging of
1210
      these null rows, return and handle them with next block
1211
    */
1212
    return;
3,393✔
1213
  }
1214
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
1,149,689✔
1215
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
1,149,689✔
1216
                                          pExprSup, numOfOutput);
1217
  if (TSDB_CODE_SUCCESS != code) {
1,149,689✔
1218
    pTaskInfo->code = code;
531✔
1219
    T_LONG_JMP(pTaskInfo->env, code);
531✔
1220
  }
1221
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
1,149,158✔
1222
  // reset part of pRowSup after doing agg calculation
1223
  pRowSup->startRowIndex = 0;
1,149,158✔
1224
  pRowSup->numOfRows = 0;
1,149,158✔
1225
}
1226

1227
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
2,077,254✔
1228
  if (OPTR_IS_OPENED(pOperator)) {
2,077,254✔
1229
    return TSDB_CODE_SUCCESS;
100,670✔
1230
  }
1231

1232
  int32_t                   code = TSDB_CODE_SUCCESS;
1,976,584✔
1233
  int32_t                   lino = 0;
1,976,584✔
1234
  SStateWindowOperatorInfo* pInfo = pOperator->info;
1,976,584✔
1235
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1,976,584✔
1236

1237
  SExprSupp* pSup = &pOperator->exprSupp;
1,976,584✔
1238
  int32_t    order = pInfo->binfo.inputTsOrder;
1,976,584✔
1239
  int64_t    st = taosGetTimestampUs();
1,976,584✔
1240

1241
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,976,584✔
1242
  pInfo->cleanGroupResInfo = false;
1,976,584✔
1243

1244
  SSDataBlock* pUnfinishedBlock = NULL;
1,976,584✔
1245
  int32_t      startIndex = 0;
1,976,584✔
1246
  int32_t      endIndex = 0;
1,976,584✔
1247
  int32_t      numPartialCalcRows = 0;
1,976,584✔
1248
  while (1) {
1,178,819✔
1249
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,155,403✔
1250
    if (pBlock == NULL) {
3,155,403✔
1251
      if (pUnfinishedBlock != NULL) {
1,657,795✔
1252
        blockDataDestroy(pUnfinishedBlock);
14,901✔
1253
        pUnfinishedBlock = NULL;
14,901✔
1254
        resetWindowRowsSup(&pInfo->winSup);
14,901✔
1255
      }
1256
      break;
1,657,795✔
1257
    }
1258
    
1259
    // mark whether pUnfinishedBlock is a reference to pBlock
1260
    bool isRef = false;
1,497,608✔
1261
    startIndex = 0;
1,497,608✔
1262
    if (pUnfinishedBlock != NULL) {
1,497,608✔
1263
      startIndex = pUnfinishedBlock->info.rows;
7,756✔
1264
      // merge unfinished block with current block
1265
      code = blockDataMerge(pUnfinishedBlock, pBlock);
7,756✔
1266
      // reset id to current block id
1267
      pUnfinishedBlock->info.id = pBlock->info.id;
7,756✔
1268
      QUERY_CHECK_CODE(code, lino, _end);
7,756✔
1269
    } else {
1270
      pUnfinishedBlock = pBlock;
1,489,852✔
1271
      isRef = true;
1,489,852✔
1272
    }
1273
    endIndex = pUnfinishedBlock->info.rows;
1,497,608✔
1274

1275
    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
1,497,608✔
1276
    code = setInputDataBlock(
1,497,608✔
1277
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
1,497,608✔
1278
    QUERY_CHECK_CODE(code, lino, _end);
1,497,608✔
1279

1280
    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
1,497,608✔
1281
    QUERY_CHECK_CODE(code, lino, _end);
1,497,608✔
1282

1283
    // there is an scalar expression that 
1284
    // needs to be calculated right before apply the group aggregation.
1285
    if (pInfo->scalarSup.pExprInfo != NULL) {
1,497,608✔
1286
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
1,317,468✔
1287
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
1288
        pInfo->scalarSup.numOfExprs, NULL,
1289
        GET_STM_RTINFO(pOperator->pTaskInfo));
1,317,468✔
1290
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
1,317,468✔
1291
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
302,688✔
1292
      }
1293
    }
1294

1295
    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, 
1,194,920✔
1296
      &startIndex, &endIndex, &numPartialCalcRows);
1297
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
1,178,819✔
1298
      // save unfinished block for next round processing
1299
      if (isRef) {
22,657✔
1300
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
19,264✔
1301
        QUERY_CHECK_CODE(code, lino, _end);
19,264✔
1302
      }
1303
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
22,657✔
1304
      QUERY_CHECK_NULL(pUnfinishedBlock, code, lino, _end, terrno);
22,657✔
1305
    } else {
1306
      if (!isRef) {
1,156,162✔
1307
        blockDataDestroy(pUnfinishedBlock);
4,363✔
1308
      }
1309
      pUnfinishedBlock = NULL;
1,156,162✔
1310
    }
1311
    numPartialCalcRows = 0;
1,178,819✔
1312
  }
1313

1314
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1,657,795✔
1315
  code = initGroupedResultInfo(
1,657,795✔
1316
    &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1317
  QUERY_CHECK_CODE(code, lino, _end);
1,657,795✔
1318
  pInfo->cleanGroupResInfo = true;
1,657,795✔
1319
  pOperator->status = OP_RES_TO_RETURN;
1,657,795✔
1320

1321
_end:
1,657,795✔
1322
  if (code != TSDB_CODE_SUCCESS) {
1,657,795✔
1323
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1324
    pTaskInfo->code = code;
×
1325
    T_LONG_JMP(pTaskInfo->env, code);
×
1326
  }
1327
  return code;
1,657,795✔
1328
}
1329

1330
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,077,146✔
1331
  if (pOperator->status == OP_EXEC_DONE) {
3,077,146✔
1332
    (*ppRes) = NULL;
999,892✔
1333
    return TSDB_CODE_SUCCESS;
999,892✔
1334
  }
1335

1336
  int32_t                   code = TSDB_CODE_SUCCESS;
2,077,254✔
1337
  int32_t                   lino = 0;
2,077,254✔
1338
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2,077,254✔
1339
  SStateWindowOperatorInfo* pInfo = pOperator->info;
2,077,254✔
1340
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
2,077,254✔
1341

1342
  code = pOperator->fpSet._openFn(pOperator);
2,077,254✔
1343
  QUERY_CHECK_CODE(code, lino, _end);
1,758,465✔
1344

1345
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
1,758,465✔
1346
  QUERY_CHECK_CODE(code, lino, _end);
1,758,465✔
1347

1348
  while (1) {
×
1349
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,758,465✔
1350
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,758,465✔
1351
    QUERY_CHECK_CODE(code, lino, _end);
1,758,465✔
1352

1353
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,758,465✔
1354
    if (!hasRemain) {
1,758,325✔
1355
      setOperatorCompleted(pOperator);
1,657,655✔
1356
      break;
1,657,795✔
1357
    }
1358

1359
    if (pBInfo->pRes->info.rows > 0) {
100,670✔
1360
      break;
100,670✔
1361
    }
1362
  }
1363

1364
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1,758,465✔
1365

1366
_end:
1,758,465✔
1367
  if (code != TSDB_CODE_SUCCESS) {
1,758,465✔
1368
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1369
    pTaskInfo->code = code;
×
1370
    T_LONG_JMP(pTaskInfo->env, code);
×
1371
  }
1372
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,758,465✔
1373
  return code;
1,758,465✔
1374
}
1375

1376
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
23,413,879✔
1377
  int32_t                   code = TSDB_CODE_SUCCESS;
23,413,879✔
1378
  int32_t                   lino = 0;
23,413,879✔
1379
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
23,413,879✔
1380
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
23,416,619✔
1381

1382
  if (pOperator->status == OP_EXEC_DONE) {
23,415,586✔
1383
    (*ppRes) = NULL;
6,006,054✔
1384
    return code;
6,006,658✔
1385
  }
1386

1387
  SSDataBlock* pBlock = pInfo->binfo.pRes;
17,407,552✔
1388
  code = pOperator->fpSet._openFn(pOperator);
17,408,149✔
1389
  QUERY_CHECK_CODE(code, lino, _end);
17,051,316✔
1390

1391
  while (1) {
2,130✔
1392
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
17,053,446✔
1393
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
17,055,402✔
1394
    QUERY_CHECK_CODE(code, lino, _end);
17,055,402✔
1395

1396
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
17,055,402✔
1397
    if (!hasRemain) {
17,055,402✔
1398
      setOperatorCompleted(pOperator);
9,962,216✔
1399
      break;
9,962,216✔
1400
    }
1401

1402
    if (pBlock->info.rows > 0) {
7,093,186✔
1403
      break;
7,091,056✔
1404
    }
1405
  }
1406

1407
  size_t rows = pBlock->info.rows;
17,053,272✔
1408
  pOperator->resultInfo.totalRows += rows;
17,053,272✔
1409

1410
_end:
17,053,272✔
1411
  if (code != TSDB_CODE_SUCCESS) {
17,053,272✔
1412
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1413
    pTaskInfo->code = code;
×
1414
    T_LONG_JMP(pTaskInfo->env, code);
×
1415
  }
1416
  (*ppRes) = (rows == 0) ? NULL : pBlock;
17,053,272✔
1417
  return code;
17,053,272✔
1418
}
1419

1420
static void destroyStateWindowOperatorInfo(void* param) {
1,938,974✔
1421
  if (param == NULL) {
1,938,974✔
1422
    return;
×
1423
  }
1424
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1,938,974✔
1425
  cleanupBasicInfo(&pInfo->binfo);
1,938,974✔
1426
  taosMemoryFreeClear(pInfo->stateKey.pData);
1,938,974✔
1427
  if (pInfo->pOperator) {
1,938,974✔
1428
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,938,974✔
1429
                      pInfo->cleanGroupResInfo);
1,938,974✔
1430
    pInfo->pOperator = NULL;
1,938,974✔
1431
  }
1432

1433
  cleanupExprSupp(&pInfo->scalarSup);
1,938,974✔
1434
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,938,974✔
1435
  cleanupAggSup(&pInfo->aggSup);
1,938,974✔
1436
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,938,974✔
1437

1438
  taosMemoryFreeClear(param);
1,938,974✔
1439
}
1440

1441
static void freeItem(void* param) {
35,544✔
1442
  SGroupKeys* pKey = (SGroupKeys*)param;
35,544✔
1443
  taosMemoryFree(pKey->pData);
35,544✔
1444
}
35,544✔
1445

1446
void destroyIntervalOperatorInfo(void* param) {
11,278,226✔
1447
  if (param == NULL) {
11,278,226✔
1448
    return;
×
1449
  }
1450

1451
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
11,278,226✔
1452

1453
  cleanupBasicInfo(&pInfo->binfo);
11,278,226✔
1454
  if (pInfo->pOperator) {
11,278,226✔
1455
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
10,950,106✔
1456
                      pInfo->cleanGroupResInfo);
10,950,106✔
1457
    pInfo->pOperator = NULL;
10,950,106✔
1458
  }
1459

1460
  cleanupAggSup(&pInfo->aggSup);
11,278,226✔
1461
  cleanupExprSupp(&pInfo->scalarSupp);
11,278,226✔
1462

1463
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
11,278,226✔
1464

1465
  taosArrayDestroy(pInfo->pInterpCols);
11,278,226✔
1466
  pInfo->pInterpCols = NULL;
11,278,226✔
1467

1468
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
11,278,226✔
1469
  pInfo->pPrevValues = NULL;
11,278,226✔
1470

1471
  cleanupGroupResInfo(&pInfo->groupResInfo);
11,278,226✔
1472
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
11,277,720✔
1473
  destroyBoundedQueue(pInfo->pBQ);
11,278,226✔
1474
  taosMemoryFreeClear(param);
11,278,226✔
1475
}
1476

1477
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
17,772✔
1478
  int32_t code = TSDB_CODE_SUCCESS;
17,772✔
1479
  int32_t lino = 0;
17,772✔
1480
  void*   tmp = NULL;
17,772✔
1481

1482
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
17,772✔
1483
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
17,772✔
1484

1485
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
17,772✔
1486
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
17,772✔
1487

1488
  {  // ts column
1489
    SColumn c = {0};
17,772✔
1490
    c.colId = 1;
17,772✔
1491
    c.slotId = pInfo->primaryTsIndex;
17,772✔
1492
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
17,772✔
1493
    c.bytes = sizeof(int64_t);
17,772✔
1494
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
17,772✔
1495
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,772✔
1496

1497
    SGroupKeys key;
17,772✔
1498
    key.bytes = c.bytes;
17,772✔
1499
    key.type = c.type;
17,772✔
1500
    key.isNull = true;  // to denote no value is assigned yet
17,772✔
1501
    key.pData = taosMemoryCalloc(1, c.bytes);
17,772✔
1502
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
17,772✔
1503

1504
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
17,772✔
1505
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,772✔
1506
  }
1507
_end:
17,772✔
1508
  if (code != TSDB_CODE_SUCCESS) {
17,772✔
1509
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1510
  }
1511
  return code;
17,772✔
1512
}
1513

1514
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
10,947,188✔
1515
                                      bool* pRes) {
1516
  // the primary timestamp column
1517
  bool    needed = false;
10,947,188✔
1518
  int32_t code = TSDB_CODE_SUCCESS;
10,947,188✔
1519
  int32_t lino = 0;
10,947,188✔
1520
  void*   tmp = NULL;
10,947,188✔
1521

1522
  for (int32_t i = 0; i < numOfCols; ++i) {
32,138,664✔
1523
    SExprInfo* pExpr = pCtx[i].pExpr;
21,209,789✔
1524
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
21,209,570✔
1525
      needed = true;
17,772✔
1526
      break;
17,772✔
1527
    }
1528
  }
1529

1530
  if (needed) {
10,946,647✔
1531
    code = initWindowInterpPrevVal(pInfo);
17,772✔
1532
    QUERY_CHECK_CODE(code, lino, _end);
17,772✔
1533
  }
1534

1535
  for (int32_t i = 0; i < numOfCols; ++i) {
32,155,036✔
1536
    SExprInfo* pExpr = pCtx[i].pExpr;
21,211,805✔
1537

1538
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
21,212,537✔
1539
      SFunctParam* pParam = &pExpr->base.pParam[0];
17,772✔
1540

1541
      SColumn c = *pParam->pCol;
17,772✔
1542
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
17,772✔
1543
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,772✔
1544

1545
      SGroupKeys key = {0};
17,772✔
1546
      key.bytes = c.bytes;
17,772✔
1547
      key.type = c.type;
17,772✔
1548
      key.isNull = false;
17,772✔
1549
      key.pData = taosMemoryCalloc(1, c.bytes);
17,772✔
1550
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
17,772✔
1551

1552
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
17,772✔
1553
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
17,772✔
1554
    }
1555
  }
1556

1557
_end:
10,943,231✔
1558
  if (code != TSDB_CODE_SUCCESS) {
10,945,519✔
1559
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1560
  }
1561
  *pRes = needed;
10,945,519✔
1562
  return code;
10,943,960✔
1563
}
1564

1565
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
2,658✔
1566
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
2,658✔
1567
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
2,658✔
1568
  pOper->status = OP_NOT_OPENED;
2,658✔
1569

1570
  resetBasicOperatorState(&pIntervalInfo->binfo);
2,658✔
1571
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
2,658✔
1572
    pIntervalInfo->cleanGroupResInfo);
2,658✔
1573

1574
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
2,658✔
1575
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2,658✔
1576
  if (code == 0) {
2,658✔
1577
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
5,316✔
1578
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,658✔
1579
                       &pTaskInfo->storageAPI.functionStore);
1580
  }
1581
  if (code == 0) {
2,658✔
1582
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
2,658✔
1583
                         &pTaskInfo->storageAPI.functionStore);
1584
  }
1585

1586
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
2,658✔
1587
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
1588
  }
1589

1590
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
2,658✔
1591
    pIntervalInfo->curGroupId = UINT64_MAX;
×
1592
  }
1593

1594
  pIntervalInfo->cleanGroupResInfo = false;
2,658✔
1595
  pIntervalInfo->handledGroupNum = 0;
2,658✔
1596
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
2,658✔
1597
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
2,658✔
1598

1599
  taosArrayDestroy(pIntervalInfo->pInterpCols);
2,658✔
1600
  pIntervalInfo->pInterpCols = NULL;
2,658✔
1601

1602
  if (pIntervalInfo->pPrevValues != NULL) {
2,658✔
1603
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
1604
    pIntervalInfo->pPrevValues = NULL;
×
1605
    code = initWindowInterpPrevVal(pIntervalInfo);
×
1606
  }
1607

1608
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
2,658✔
1609
  destroyBoundedQueue(pIntervalInfo->pBQ);
2,658✔
1610
  pIntervalInfo->pBQ = NULL;
2,658✔
1611
  return code;
2,658✔
1612
}
1613

1614
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
2,658✔
1615
  SIntervalAggOperatorInfo* pInfo = pOper->info;
2,658✔
1616
  return resetInterval(pOper, pInfo);
2,658✔
1617
}
1618

1619
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
10,700,815✔
1620
                                   SOperatorInfo** pOptrInfo) {
1621
  QRY_PARAM_CHECK(pOptrInfo);
10,700,815✔
1622

1623
  int32_t                   code = TSDB_CODE_SUCCESS;
10,702,314✔
1624
  int32_t                   lino = 0;
10,702,314✔
1625
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
10,702,314✔
1626
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
10,698,293✔
1627
  if (pInfo == NULL || pOperator == NULL) {
10,698,781✔
1628
    code = terrno;
×
1629
    lino = __LINE__;
×
1630
    goto _error;
×
1631
  }
1632

1633
  pOperator->pPhyNode = pPhyNode;
10,699,267✔
1634
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
10,700,362✔
1635
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
10,702,918✔
1636
  initBasicInfo(&pInfo->binfo, pResBlock);
10,702,918✔
1637

1638
  SExprSupp* pSup = &pOperator->exprSupp;
10,701,655✔
1639
  pSup->hasWindowOrGroup = true;
10,701,655✔
1640
  pSup->hasWindow = true;
10,700,911✔
1641

1642
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
10,700,475✔
1643

1644
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
10,702,051✔
1645
  initResultSizeInfo(&pOperator->resultInfo, 512);
10,702,051✔
1646
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
10,701,502✔
1647
  QUERY_CHECK_CODE(code, lino, _error);
10,702,314✔
1648

1649
  int32_t    num = 0;
10,702,314✔
1650
  SExprInfo* pExprInfo = NULL;
10,702,314✔
1651
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
10,702,314✔
1652
  QUERY_CHECK_CODE(code, lino, _error);
10,702,435✔
1653

1654
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
10,702,435✔
1655
                    &pTaskInfo->storageAPI.functionStore);
1656
  QUERY_CHECK_CODE(code, lino, _error);
10,702,435✔
1657

1658
  SInterval interval = {.interval = pPhyNode->interval,
32,104,024✔
1659
                        .sliding = pPhyNode->sliding,
10,700,938✔
1660
                        .intervalUnit = pPhyNode->intervalUnit,
10,701,087✔
1661
                        .slidingUnit = pPhyNode->slidingUnit,
10,701,087✔
1662
                        .offset = pPhyNode->offset,
10,701,087✔
1663
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
10,700,798✔
1664
                        .timeRange = pPhyNode->timeRange};
1665
  calcIntervalAutoOffset(&interval);
10,701,087✔
1666

1667
  STimeWindowAggSupp as = {
10,699,107✔
1668
      .maxTs = INT64_MIN,
1669
  };
1670

1671
  pInfo->win = pTaskInfo->window;
10,699,107✔
1672
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
10,699,879✔
1673
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
10,700,966✔
1674
  pInfo->interval = interval;
10,699,220✔
1675
  pInfo->twAggSup = as;
10,699,879✔
1676
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
10,700,919✔
1677
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
10,698,671✔
1678
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
922,368✔
1679
    pInfo->limited = true;
922,368✔
1680
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
921,877✔
1681
  }
1682
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
10,698,519✔
1683
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
63,378✔
1684
    pInfo->slimited = true;
63,378✔
1685
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
63,378✔
1686
    pInfo->curGroupId = UINT64_MAX;
63,378✔
1687
  }
1688

1689
  if (pPhyNode->window.pExprs != NULL) {
10,700,483✔
1690
    int32_t    numOfScalar = 0;
3,780,275✔
1691
    SExprInfo* pScalarExprInfo = NULL;
3,780,275✔
1692
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
3,779,671✔
1693
    QUERY_CHECK_CODE(code, lino, _error);
3,780,275✔
1694

1695
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
3,452,155✔
1696
    if (code != TSDB_CODE_SUCCESS) {
3,452,015✔
1697
      goto _error;
×
1698
    }
1699
  }
1700

1701
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
10,374,026✔
1702
                            pTaskInfo->pStreamRuntimeInfo);
10,373,088✔
1703
  if (code != TSDB_CODE_SUCCESS) {
10,370,757✔
1704
    goto _error;
×
1705
  }
1706

1707
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
10,370,757✔
1708
  QUERY_CHECK_CODE(code, lino, _error);
10,371,880✔
1709

1710
  pInfo->timeWindowInterpo = false;
10,371,880✔
1711
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
10,374,194✔
1712
  QUERY_CHECK_CODE(code, lino, _error);
10,370,672✔
1713
  if (pInfo->timeWindowInterpo) {
10,370,672✔
1714
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
17,772✔
1715
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
17,772✔
1716
      goto _error;
×
1717
    }
1718
  }
1719

1720
  pInfo->pOperator = pOperator;
10,371,155✔
1721
  pInfo->cleanGroupResInfo = false;
10,371,759✔
1722
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
10,368,325✔
1723
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
10,371,276✔
1724
                  pInfo, pTaskInfo);
1725

1726
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
10,371,598✔
1727
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1728
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
10,372,355✔
1729
  code = appendDownstream(pOperator, &downstream, 1);
10,372,948✔
1730
  if (code != TSDB_CODE_SUCCESS) {
10,371,315✔
1731
    goto _error;
×
1732
  }
1733

1734
  *pOptrInfo = pOperator;
10,371,315✔
1735
  return TSDB_CODE_SUCCESS;
10,369,169✔
1736

1737
_error:
328,120✔
1738
  if (pInfo != NULL) {
328,120✔
1739
    destroyIntervalOperatorInfo(pInfo);
328,120✔
1740
  }
1741

1742
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
328,120✔
1743
  pTaskInfo->code = code;
328,120✔
1744
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
328,120✔
1745
  return code;
328,120✔
1746
}
1747

1748
// todo handle multiple timeline cases. assume no timeline interweaving
1749
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
2,294,147✔
1750
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,294,147✔
1751
  SExprSupp*     pSup = &pOperator->exprSupp;
2,294,147✔
1752

1753
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
2,294,147✔
1754
  if (!pColInfoData) {
2,294,147✔
1755
    pTaskInfo->code = terrno;
×
1756
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1757
  }
1758

1759
  bool    masterScan = true;
2,294,147✔
1760
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
2,294,147✔
1761
  int64_t gid = pBlock->info.id.groupId;
2,294,147✔
1762

1763
  int64_t gap = pInfo->gap;
2,294,147✔
1764

1765
  if (!pInfo->reptScan) {
2,294,147✔
1766
    pInfo->reptScan = true;
1,442,932✔
1767
    pInfo->winSup.prevTs = INT64_MIN;
1,442,932✔
1768
  }
1769

1770
  SWindowRowsSup* pRowSup = &pInfo->winSup;
2,294,147✔
1771
  pRowSup->numOfRows = 0;
2,294,147✔
1772
  pRowSup->startRowIndex = 0;
2,294,147✔
1773

1774
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1775
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
2,294,147✔
1776
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
881,478,031✔
1777
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
879,183,884✔
1778
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
1,444,932✔
1779
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,444,932✔
1780
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
877,738,952✔
1781
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
271,620,039✔
1782
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1783
      doKeepTuple(pRowSup, tsList[j], j, gid);
606,191,464✔
1784
    } else {  // start a new session window
1785
      // start a new session window
1786
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
271,547,488✔
1787
        SResultRow* pResult = NULL;
271,265,539✔
1788

1789
        // keep the time window for the closed time window.
1790
        STimeWindow window = pRowSup->win;
271,265,539✔
1791

1792
        int32_t ret =
1793
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
271,265,539✔
1794
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1795
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
271,265,539✔
1796
          T_LONG_JMP(pTaskInfo->env, ret);
×
1797
        }
1798

1799
        // pInfo->numOfRows data belong to the current session window
1800
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
271,265,539✔
1801
        ret =
1802
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
271,265,539✔
1803
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
271,265,539✔
1804
        if (ret != TSDB_CODE_SUCCESS) {
271,265,539✔
1805
          T_LONG_JMP(pTaskInfo->env, ret);
×
1806
        }
1807
      }
1808

1809
      // here we start a new session window
1810
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
271,547,488✔
1811
      doKeepTuple(pRowSup, tsList[j], j, gid);
271,547,488✔
1812
    }
1813
  }
1814

1815
  SResultRow* pResult = NULL;
2,294,147✔
1816
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
2,294,147✔
1817
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
2,294,147✔
1818
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1819
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
2,294,147✔
1820
    T_LONG_JMP(pTaskInfo->env, ret);
×
1821
  }
1822

1823
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
2,294,147✔
1824
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
2,294,147✔
1825
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
2,294,147✔
1826
  if (ret != TSDB_CODE_SUCCESS) {
2,294,147✔
1827
    T_LONG_JMP(pTaskInfo->env, ret);
×
1828
  }
1829
}
2,294,147✔
1830

1831
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,392,593✔
1832
  if (pOperator->status == OP_EXEC_DONE) {
3,392,593✔
1833
    (*ppRes) = NULL;
1,086,164✔
1834
    return TSDB_CODE_SUCCESS;
1,086,164✔
1835
  }
1836

1837
  int32_t                  code = TSDB_CODE_SUCCESS;
2,306,429✔
1838
  int32_t                  lino = 0;
2,306,429✔
1839
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
2,306,429✔
1840
  SSessionAggOperatorInfo* pInfo = pOperator->info;
2,306,429✔
1841
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
2,306,429✔
1842
  SExprSupp*               pSup = &pOperator->exprSupp;
2,306,429✔
1843

1844
  if (pOperator->status == OP_RES_TO_RETURN) {
2,306,429✔
1845
    while (1) {
×
1846
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
2,000✔
1847
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
2,000✔
1848
      QUERY_CHECK_CODE(code, lino, _end);
2,000✔
1849

1850
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
2,000✔
1851
      if (!hasRemain) {
2,000✔
1852
        setOperatorCompleted(pOperator);
1,420✔
1853
        break;
1,420✔
1854
      }
1855

1856
      if (pBInfo->pRes->info.rows > 0) {
580✔
1857
        break;
580✔
1858
      }
1859
    }
1860
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
2,000✔
1861
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
2,000✔
1862
    return code;
2,000✔
1863
  }
1864

1865
  int64_t st = taosGetTimestampUs();
2,304,429✔
1866
  int32_t order = pInfo->binfo.inputTsOrder;
2,304,429✔
1867

1868
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2,304,429✔
1869

1870
  pInfo->cleanGroupResInfo = false;
2,304,429✔
1871
  while (1) {
2,294,147✔
1872
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
4,598,576✔
1873
    if (pBlock == NULL) {
4,598,576✔
1874
      break;
2,304,429✔
1875
    }
1876

1877
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
2,294,147✔
1878
    if (pInfo->scalarSupp.pExprInfo != NULL) {
2,294,147✔
1879
      SExprSupp* pExprSup = &pInfo->scalarSupp;
678✔
1880
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
678✔
1881
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
678✔
1882
      QUERY_CHECK_CODE(code, lino, _end);
678✔
1883
    }
1884
    // the pDataBlock are always the same one, no need to call this again
1885
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
2,294,147✔
1886
    QUERY_CHECK_CODE(code, lino, _end);
2,294,147✔
1887

1888
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
2,294,147✔
1889
    QUERY_CHECK_CODE(code, lino, _end);
2,294,147✔
1890

1891
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
2,294,147✔
1892
  }
1893

1894
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
2,304,429✔
1895

1896
  // restore the value
1897
  pOperator->status = OP_RES_TO_RETURN;
2,304,429✔
1898

1899
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
2,304,429✔
1900
  QUERY_CHECK_CODE(code, lino, _end);
2,304,429✔
1901
  pInfo->cleanGroupResInfo = true;
2,304,429✔
1902

1903
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
2,304,429✔
1904
  QUERY_CHECK_CODE(code, lino, _end);
2,304,429✔
1905
  while (1) {
×
1906
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
2,304,429✔
1907
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
2,304,429✔
1908
    QUERY_CHECK_CODE(code, lino, _end);
2,304,429✔
1909

1910
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
2,304,429✔
1911
    if (!hasRemain) {
2,304,429✔
1912
      setOperatorCompleted(pOperator);
2,291,201✔
1913
      break;
2,291,201✔
1914
    }
1915

1916
    if (pBInfo->pRes->info.rows > 0) {
13,228✔
1917
      break;
13,228✔
1918
    }
1919
  }
1920
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
2,304,429✔
1921

1922
_end:
2,304,429✔
1923
  if (code != TSDB_CODE_SUCCESS) {
2,304,429✔
1924
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1925
    pTaskInfo->code = code;
×
1926
    T_LONG_JMP(pTaskInfo->env, code);
×
1927
  }
1928
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
2,304,429✔
1929
  return code;
2,304,429✔
1930
}
1931

1932
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
44,946✔
1933
  SStateWindowOperatorInfo* pInfo = pOper->info;
44,946✔
1934
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
44,946✔
1935
  SStateWindowPhysiNode* pPhynode = (SStateWindowPhysiNode*)pOper->pPhyNode;
44,946✔
1936
  pOper->status = OP_NOT_OPENED;
44,946✔
1937

1938
  resetBasicOperatorState(&pInfo->binfo);
44,946✔
1939
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
44,946✔
1940
                    pInfo->cleanGroupResInfo);
44,946✔
1941

1942
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
44,946✔
1943
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
44,946✔
1944
  if (code == 0) {
44,946✔
1945
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
89,892✔
1946
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
44,946✔
1947
                       &pTaskInfo->storageAPI.functionStore);
1948
  }
1949
  if (code == 0) {
44,946✔
1950
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
44,946✔
1951
                         &pTaskInfo->storageAPI.functionStore);
1952
  }
1953

1954
  pInfo->cleanGroupResInfo = false;
44,946✔
1955
  pInfo->hasKey = false;
44,946✔
1956
  pInfo->winSup.lastTs = INT64_MIN;
44,946✔
1957
  cleanupGroupResInfo(&pInfo->groupResInfo);
44,946✔
1958
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
44,946✔
1959
  return code;
44,946✔
1960
}
1961

1962
// todo make this as an non-blocking operator
1963
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
1,938,974✔
1964
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1965
  QRY_PARAM_CHECK(pOptrInfo);
1,938,974✔
1966

1967
  int32_t                   code = TSDB_CODE_SUCCESS;
1,938,974✔
1968
  int32_t                   lino = 0;
1,938,974✔
1969
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
1,938,974✔
1970
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,938,974✔
1971
  if (pInfo == NULL || pOperator == NULL) {
1,938,974✔
1972
    code = terrno;
×
1973
    goto _error;
×
1974
  }
1975

1976
  pOperator->pPhyNode = pStateNode;
1,938,974✔
1977
  pOperator->exprSupp.hasWindowOrGroup = true;
1,938,974✔
1978
  pOperator->exprSupp.hasWindow = true;
1,938,974✔
1979
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
1,938,974✔
1980
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
1,938,974✔
1981

1982
  if (pStateNode->window.pExprs != NULL) {
1,938,974✔
1983
    int32_t    numOfScalarExpr = 0;
1,831,367✔
1984
    SExprInfo* pScalarExprInfo = NULL;
1,831,367✔
1985
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
1,831,367✔
1986
    QUERY_CHECK_CODE(code, lino, _error);
1,831,367✔
1987

1988
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
1,831,367✔
1989
    if (code != TSDB_CODE_SUCCESS) {
1,831,367✔
1990
      goto _error;
×
1991
    }
1992
  }
1993

1994
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1,938,974✔
1995
  pInfo->stateKey.type = pInfo->stateCol.type;
1,938,974✔
1996
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
1,938,974✔
1997
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
1,938,974✔
1998
  if (pInfo->stateKey.pData == NULL) {
1,938,974✔
1999
    goto _error;
×
2000
  }
2001
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
1,938,974✔
2002
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
1,938,974✔
2003

2004
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,938,974✔
2005
                            pTaskInfo->pStreamRuntimeInfo);
1,938,974✔
2006
  if (code != TSDB_CODE_SUCCESS) {
1,938,974✔
2007
    goto _error;
×
2008
  }
2009

2010
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,938,974✔
2011

2012
  int32_t    num = 0;
1,938,974✔
2013
  SExprInfo* pExprInfo = NULL;
1,938,974✔
2014
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
1,938,974✔
2015
  QUERY_CHECK_CODE(code, lino, _error);
1,938,974✔
2016

2017
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,938,974✔
2018

2019
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
3,877,948✔
2020
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,938,974✔
2021
  if (code != TSDB_CODE_SUCCESS) {
1,938,974✔
2022
    goto _error;
×
2023
  }
2024

2025
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
1,938,974✔
2026
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,938,974✔
2027
  initBasicInfo(&pInfo->binfo, pResBlock);
1,938,974✔
2028
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,938,974✔
2029

2030
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,938,974✔
2031
  QUERY_CHECK_CODE(code, lino, _error);
1,938,974✔
2032

2033
  pInfo->tsSlotId = tsSlotId;
1,938,974✔
2034
  pInfo->pOperator = pOperator;
1,938,974✔
2035
  pInfo->cleanGroupResInfo = false;
1,938,974✔
2036
  pInfo->extendOption = pStateNode->extendOption;
1,938,974✔
2037
  pInfo->trueForLimit = pStateNode->trueForLimit;
1,938,974✔
2038
  pInfo->winSup.lastTs = INT64_MIN;
1,938,974✔
2039

2040
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
1,938,974✔
2041
                  pTaskInfo);
2042
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
1,938,974✔
2043
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2044
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
1,938,974✔
2045

2046
  code = appendDownstream(pOperator, &downstream, 1);
1,938,974✔
2047
  if (code != TSDB_CODE_SUCCESS) {
1,938,974✔
2048
    goto _error;
×
2049
  }
2050

2051
  *pOptrInfo = pOperator;
1,938,974✔
2052
  return TSDB_CODE_SUCCESS;
1,938,974✔
2053

2054
_error:
×
2055
  if (pInfo != NULL) {
×
2056
    destroyStateWindowOperatorInfo(pInfo);
×
2057
  }
2058

2059
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2060
  pTaskInfo->code = code;
×
2061
  return code;
×
2062
}
2063

2064
void destroySWindowOperatorInfo(void* param) {
2,352,545✔
2065
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
2,352,545✔
2066
  if (pInfo == NULL) {
2,352,545✔
2067
    return;
×
2068
  }
2069

2070
  cleanupBasicInfo(&pInfo->binfo);
2,352,545✔
2071
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,352,545✔
2072
  if (pInfo->pOperator) {
2,352,545✔
2073
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,352,545✔
2074
                      pInfo->cleanGroupResInfo);
2,352,545✔
2075
    pInfo->pOperator = NULL;
2,352,545✔
2076
  }
2077

2078
  cleanupAggSup(&pInfo->aggSup);
2,352,545✔
2079
  cleanupExprSupp(&pInfo->scalarSupp);
2,352,545✔
2080

2081
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,352,545✔
2082
  taosMemoryFreeClear(param);
2,352,545✔
2083
}
2084

2085
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
2,658✔
2086
  SSessionAggOperatorInfo* pInfo = pOper->info;
2,658✔
2087
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
2,658✔
2088
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
2,658✔
2089
  pOper->status = OP_NOT_OPENED;
2,658✔
2090

2091
  resetBasicOperatorState(&pInfo->binfo);
2,658✔
2092
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,658✔
2093
                    pInfo->cleanGroupResInfo);
2,658✔
2094

2095
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,658✔
2096
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2,658✔
2097
  if (code == 0) {
2,658✔
2098
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
5,316✔
2099
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
2,658✔
2100
                       &pTaskInfo->storageAPI.functionStore);
2101
  }
2102
  if (code == 0) {
2,658✔
2103
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
2,658✔
2104
                         &pTaskInfo->storageAPI.functionStore);
2105
  }
2106

2107
  pInfo->cleanGroupResInfo = false;
2,658✔
2108
  pInfo->winSup = (SWindowRowsSup){0};
2,658✔
2109
  pInfo->winSup.prevTs = INT64_MIN;
2,658✔
2110
  pInfo->reptScan = false;
2,658✔
2111

2112
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,658✔
2113
  return code;
2,658✔
2114
}
2115

2116
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
2,352,545✔
2117
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2118
  QRY_PARAM_CHECK(pOptrInfo);
2,352,545✔
2119

2120
  int32_t                  code = TSDB_CODE_SUCCESS;
2,352,545✔
2121
  int32_t                  lino = 0;
2,352,545✔
2122
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
2,352,545✔
2123
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,352,545✔
2124
  if (pInfo == NULL || pOperator == NULL) {
2,352,545✔
2125
    code = terrno;
×
2126
    goto _error;
×
2127
  }
2128

2129
  pOperator->pPhyNode = pSessionNode;
2,352,545✔
2130
  pOperator->exprSupp.hasWindowOrGroup = true;
2,352,545✔
2131
  pOperator->exprSupp.hasWindow = true;
2,352,545✔
2132

2133
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,352,545✔
2134
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,352,545✔
2135

2136
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
2,352,545✔
2137
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,352,545✔
2138
  initBasicInfo(&pInfo->binfo, pResBlock);
2,352,545✔
2139

2140
  int32_t    numOfCols = 0;
2,352,545✔
2141
  SExprInfo* pExprInfo = NULL;
2,352,545✔
2142
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
2,352,545✔
2143
  QUERY_CHECK_CODE(code, lino, _error);
2,352,545✔
2144

2145
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
4,705,090✔
2146
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,352,545✔
2147
  QUERY_CHECK_CODE(code, lino, _error);
2,352,545✔
2148

2149
  pInfo->gap = pSessionNode->gap;
2,352,545✔
2150

2151
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2,352,545✔
2152
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2,352,545✔
2153
  QUERY_CHECK_CODE(code, lino, _error);
2,352,545✔
2154

2155
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
2,352,545✔
2156
  pInfo->binfo.pRes = pResBlock;
2,352,545✔
2157
  pInfo->winSup.prevTs = INT64_MIN;
2,352,545✔
2158
  pInfo->reptScan = false;
2,352,545✔
2159
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
2,352,545✔
2160
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
2,352,545✔
2161

2162
  if (pSessionNode->window.pExprs != NULL) {
2,352,545✔
2163
    int32_t    numOfScalar = 0;
226✔
2164
    SExprInfo* pScalarExprInfo = NULL;
226✔
2165
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
226✔
2166
    QUERY_CHECK_CODE(code, lino, _error);
226✔
2167

2168
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
226✔
2169
    QUERY_CHECK_CODE(code, lino, _error);
226✔
2170
  }
2171

2172
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,352,545✔
2173
                            pTaskInfo->pStreamRuntimeInfo);
2,352,545✔
2174
  QUERY_CHECK_CODE(code, lino, _error);
2,352,545✔
2175

2176
  pInfo->pOperator = pOperator;
2,352,545✔
2177
  pInfo->cleanGroupResInfo = false;
2,352,545✔
2178
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
2,352,545✔
2179
                  pInfo, pTaskInfo);
2180
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
2,352,545✔
2181
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2182
  pOperator->pTaskInfo = pTaskInfo;
2,352,545✔
2183
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
2,352,545✔
2184

2185
  code = appendDownstream(pOperator, &downstream, 1);
2,352,545✔
2186
  QUERY_CHECK_CODE(code, lino, _error);
2,352,545✔
2187

2188
  *pOptrInfo = pOperator;
2,352,545✔
2189
  return TSDB_CODE_SUCCESS;
2,352,545✔
2190

2191
_error:
×
2192
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2193
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2194
  pTaskInfo->code = code;
×
2195
  return code;
×
2196
}
2197

2198
void destroyMAIOperatorInfo(void* param) {
575,308✔
2199
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
575,308✔
2200
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
575,308✔
2201
  taosMemoryFreeClear(param);
575,308✔
2202
}
575,308✔
2203

2204
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
508,175✔
2205
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
508,175✔
2206
  if (NULL == pResult) {
508,175✔
2207
    return pResult;
×
2208
  }
2209
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
508,175✔
2210
  return pResult;
508,175✔
2211
}
2212

2213
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
416,088,551✔
2214
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2215
  if (*pResult == NULL) {
416,088,551✔
2216
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
508,175✔
2217
    if (*pResult == NULL) {
508,175✔
2218
      return terrno;
×
2219
    }
2220
  }
2221

2222
  // set time window for current result
2223
  (*pResult)->win = (*win);
416,088,551✔
2224
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
416,088,733✔
2225
}
2226

2227
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
1,902,350✔
2228
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2229
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
1,902,350✔
2230
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
1,902,350✔
2231

2232
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
1,902,350✔
2233
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
1,902,350✔
2234
  SInterval*     pInterval = &iaInfo->interval;
1,902,350✔
2235

2236
  int32_t  startPos = 0;
1,902,350✔
2237
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
1,902,350✔
2238

2239
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
1,902,350✔
2240

2241
  // there is an result exists
2242
  if (miaInfo->curTs != INT64_MIN) {
1,902,350✔
2243
    if (ts != miaInfo->curTs) {
667,094✔
2244
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
639,620✔
2245
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
639,620✔
2246
      miaInfo->curTs = ts;
639,620✔
2247
    }
2248
  } else {
2249
    miaInfo->curTs = ts;
1,235,256✔
2250
  }
2251

2252
  STimeWindow win = {0};
1,902,350✔
2253
  win.skey = miaInfo->curTs;
1,902,350✔
2254
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
1,902,350✔
2255

2256
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
1,902,350✔
2257
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
1,902,350✔
2258
    T_LONG_JMP(pTaskInfo->env, ret);
×
2259
  }
2260

2261
  int32_t currPos = startPos;
1,902,350✔
2262

2263
  STimeWindow currWin = win;
1,902,350✔
2264
  while (++currPos < pBlock->info.rows) {
936,510,141✔
2265
    if (tsCols[currPos] == miaInfo->curTs) {
934,603,787✔
2266
      continue;
520,420,498✔
2267
    }
2268

2269
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
414,187,475✔
2270
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
828,374,768✔
2271
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
414,187,293✔
2272
    if (ret != TSDB_CODE_SUCCESS) {
414,187,657✔
2273
      T_LONG_JMP(pTaskInfo->env, ret);
×
2274
    }
2275

2276
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
414,187,657✔
2277
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
414,187,111✔
2278
    miaInfo->curTs = tsCols[currPos];
414,187,475✔
2279

2280
    currWin.skey = miaInfo->curTs;
414,188,203✔
2281
    currWin.ekey =
414,186,201✔
2282
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
414,188,203✔
2283

2284
    startPos = currPos;
414,186,201✔
2285
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
414,186,201✔
2286
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
414,187,293✔
2287
      T_LONG_JMP(pTaskInfo->env, ret);
×
2288
    }
2289

2290
    miaInfo->curTs = currWin.skey;
414,187,293✔
2291
  }
2292

2293
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
1,902,350✔
2294
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
3,804,700✔
2295
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
1,902,350✔
2296
  if (ret != TSDB_CODE_SUCCESS) {
1,902,350✔
2297
    T_LONG_JMP(pTaskInfo->env, ret);
×
2298
  }
2299
}
1,902,350✔
2300

2301
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
1,229,487✔
2302
  pRes->info.id.groupId = pMiaInfo->groupId;
1,229,487✔
2303
  pMiaInfo->curTs = INT64_MIN;
1,229,487✔
2304
  pMiaInfo->groupId = 0;
1,229,487✔
2305
}
1,229,487✔
2306

2307
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
1,720,982✔
2308
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
1,720,982✔
2309
  int32_t                               code = TSDB_CODE_SUCCESS;
1,720,982✔
2310
  int32_t                               lino = 0;
1,720,982✔
2311
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
1,720,982✔
2312
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
1,720,982✔
2313

2314
  SExprSupp*      pSup = &pOperator->exprSupp;
1,720,982✔
2315
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
1,720,982✔
2316
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
1,720,982✔
2317
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
1,720,982✔
2318

2319
  while (1) {
1,473,156✔
2320
    SSDataBlock* pBlock = NULL;
3,194,138✔
2321
    if (pMiaInfo->prefetchedBlock == NULL) {
3,194,138✔
2322
      pBlock = getNextBlockFromDownstream(pOperator, 0);
2,467,057✔
2323
    } else {
2324
      pBlock = pMiaInfo->prefetchedBlock;
727,081✔
2325
      pMiaInfo->prefetchedBlock = NULL;
727,081✔
2326

2327
      pMiaInfo->groupId = pBlock->info.id.groupId;
727,081✔
2328
    }
2329

2330
    // no data exists, all query processing is done
2331
    if (pBlock == NULL) {
3,194,138✔
2332
      // close last unclosed time window
2333
      if (pMiaInfo->curTs != INT64_MIN) {
564,707✔
2334
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
502,406✔
2335
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
502,406✔
2336
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
502,406✔
2337
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
502,406✔
2338
        QUERY_CHECK_CODE(code, lino, _end);
502,406✔
2339
      }
2340

2341
      setOperatorCompleted(pOperator);
564,707✔
2342
      break;
564,707✔
2343
    }
2344

2345
    if (pMiaInfo->groupId == 0) {
2,629,431✔
2346
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
875,943✔
2347
        pMiaInfo->groupId = pBlock->info.id.groupId;
114,557✔
2348
        pRes->info.id.groupId = pMiaInfo->groupId;
114,557✔
2349
      }
2350
    } else {
2351
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
1,753,488✔
2352
        // if there are unclosed time window, close it firstly.
2353
        if (pMiaInfo->curTs == INT64_MIN) {
727,081✔
2354
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2355
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2356
        }
2357
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
727,081✔
2358
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
727,081✔
2359

2360
        pMiaInfo->prefetchedBlock = pBlock;
727,081✔
2361
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
727,081✔
2362
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
727,081✔
2363
        QUERY_CHECK_CODE(code, lino, _end);
727,081✔
2364
        if (pRes->info.rows == 0) {
727,081✔
2365
          // After filtering for last group, the result is empty, so we need to continue to process next group
2366
          continue;
4,828✔
2367
        } else {
2368
          break;
722,253✔
2369
        }
2370
      } else {
2371
        // continue
2372
        pRes->info.id.groupId = pMiaInfo->groupId;
1,026,407✔
2373
      }
2374
    }
2375

2376
    pRes->info.scanFlag = pBlock->info.scanFlag;
1,902,350✔
2377
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
1,902,350✔
2378
    QUERY_CHECK_CODE(code, lino, _end);
1,902,350✔
2379

2380
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
1,902,350✔
2381

2382
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,902,350✔
2383
    QUERY_CHECK_CODE(code, lino, _end);
1,902,350✔
2384

2385
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
1,902,350✔
2386
      break;
434,022✔
2387
    }
2388
  }
2389

2390
_end:
1,720,982✔
2391
  if (code != TSDB_CODE_SUCCESS) {
1,720,982✔
2392
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2393
    pTaskInfo->code = code;
×
2394
    T_LONG_JMP(pTaskInfo->env, code);
×
2395
  }
2396
}
1,720,982✔
2397

2398
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,011,584✔
2399
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
2,011,584✔
2400
  int32_t                               code = TSDB_CODE_SUCCESS;
2,011,584✔
2401
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
2,011,584✔
2402
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
2,011,584✔
2403
  if (pOperator->status == OP_EXEC_DONE) {
2,011,584✔
2404
    (*ppRes) = NULL;
508,701✔
2405
    return code;
508,701✔
2406
  }
2407

2408
  SSDataBlock* pRes = iaInfo->binfo.pRes;
1,502,883✔
2409
  blockDataCleanup(pRes);
1,502,883✔
2410

2411
  if (iaInfo->binfo.mergeResultBlock) {
1,502,883✔
2412
    while (1) {
2413
      if (pOperator->status == OP_EXEC_DONE) {
1,428,729✔
2414
        break;
254,200✔
2415
      }
2416

2417
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
1,174,529✔
2418
        break;
351,115✔
2419
      }
2420

2421
      doMergeAlignedIntervalAgg(pOperator);
823,414✔
2422
    }
2423
  } else {
2424
    doMergeAlignedIntervalAgg(pOperator);
897,568✔
2425
  }
2426

2427
  size_t rows = pRes->info.rows;
1,502,883✔
2428
  pOperator->resultInfo.totalRows += rows;
1,502,883✔
2429
  (*ppRes) = (rows == 0) ? NULL : pRes;
1,502,883✔
2430
  return code;
1,502,883✔
2431
}
2432

2433
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
×
2434
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
×
2435
  
2436
  uint64_t     groupId;  // current groupId
2437
  int64_t      curTs;    // current ts
2438
  SSDataBlock* prefetchedBlock;
2439
  SResultRow*  pResultRow;
2440

2441
  pInfo->groupId = 0;
×
2442
  pInfo->curTs = INT64_MIN;
×
2443
  pInfo->prefetchedBlock = NULL;
×
2444
  pInfo->pResultRow = NULL;
×
2445

2446
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
×
2447
}
2448

2449
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
575,308✔
2450
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2451
  QRY_PARAM_CHECK(pOptrInfo);
575,308✔
2452

2453
  int32_t                               code = TSDB_CODE_SUCCESS;
575,308✔
2454
  int32_t                               lino = 0;
575,308✔
2455
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
575,308✔
2456
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
575,308✔
2457
  if (miaInfo == NULL || pOperator == NULL) {
575,308✔
2458
    code = terrno;
×
2459
    goto _error;
×
2460
  }
2461

2462
  pOperator->pPhyNode = pNode;
575,308✔
2463
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
575,308✔
2464
  if (miaInfo->intervalAggOperatorInfo == NULL) {
575,308✔
2465
    code = terrno;
×
2466
    goto _error;
×
2467
  }
2468

2469
  SInterval interval = {.interval = pNode->interval,
1,725,924✔
2470
                        .sliding = pNode->sliding,
575,308✔
2471
                        .intervalUnit = pNode->intervalUnit,
575,308✔
2472
                        .slidingUnit = pNode->slidingUnit,
575,308✔
2473
                        .offset = pNode->offset,
575,308✔
2474
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
575,308✔
2475
                        .timeRange = pNode->timeRange};
2476
  calcIntervalAutoOffset(&interval);
575,308✔
2477

2478
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
575,308✔
2479
  SExprSupp*                pSup = &pOperator->exprSupp;
575,308✔
2480
  pSup->hasWindowOrGroup = true;
575,308✔
2481
  pSup->hasWindow = true;
575,308✔
2482

2483
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
575,308✔
2484
                            pTaskInfo->pStreamRuntimeInfo);
575,308✔
2485
  QUERY_CHECK_CODE(code, lino, _error);
575,308✔
2486

2487
  miaInfo->curTs = INT64_MIN;
575,308✔
2488
  iaInfo->win = pTaskInfo->window;
575,308✔
2489
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
575,308✔
2490
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
575,308✔
2491
  iaInfo->interval = interval;
575,308✔
2492
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
575,308✔
2493
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
575,308✔
2494

2495
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
575,308✔
2496
  initResultSizeInfo(&pOperator->resultInfo, 512);
575,308✔
2497

2498
  int32_t    num = 0;
575,308✔
2499
  SExprInfo* pExprInfo = NULL;
575,308✔
2500
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
575,308✔
2501
  QUERY_CHECK_CODE(code, lino, _error);
575,308✔
2502

2503
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
1,150,616✔
2504
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
575,308✔
2505
  QUERY_CHECK_CODE(code, lino, _error);
575,308✔
2506

2507
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
575,308✔
2508
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
575,308✔
2509
  initBasicInfo(&iaInfo->binfo, pResBlock);
575,308✔
2510
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
575,308✔
2511
  QUERY_CHECK_CODE(code, lino, _error);
575,308✔
2512

2513
  iaInfo->timeWindowInterpo = false;
575,308✔
2514
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
575,308✔
2515
  QUERY_CHECK_CODE(code, lino, _error);
575,308✔
2516
  if (iaInfo->timeWindowInterpo) {
575,308✔
2517
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2518
  }
2519

2520
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
575,308✔
2521
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
575,308✔
2522
  QUERY_CHECK_CODE(code, lino, _error);
575,308✔
2523
  iaInfo->pOperator = pOperator;
575,308✔
2524
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
575,308✔
2525
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2526

2527
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
575,308✔
2528
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2529
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
575,308✔
2530

2531
  code = appendDownstream(pOperator, &downstream, 1);
575,308✔
2532
  QUERY_CHECK_CODE(code, lino, _error);
575,308✔
2533

2534
  *pOptrInfo = pOperator;
575,308✔
2535
  return TSDB_CODE_SUCCESS;
575,308✔
2536

2537
_error:
×
2538
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2539
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2540
  pTaskInfo->code = code;
×
2541
  return code;
×
2542
}
2543

2544
//=====================================================================================================================
2545
// merge interval operator
2546
typedef struct SMergeIntervalAggOperatorInfo {
2547
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2548
  SList*                   groupIntervals;
2549
  SListIter                groupIntervalsIter;
2550
  bool                     hasGroupId;
2551
  uint64_t                 groupId;
2552
  SSDataBlock*             prefetchedBlock;
2553
  bool                     inputBlocksFinished;
2554
} SMergeIntervalAggOperatorInfo;
2555

2556
typedef struct SGroupTimeWindow {
2557
  uint64_t    groupId;
2558
  STimeWindow window;
2559
} SGroupTimeWindow;
2560

2561
void destroyMergeIntervalOperatorInfo(void* param) {
×
2562
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2563
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2564
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2565

2566
  taosMemoryFreeClear(param);
×
2567
}
×
2568

2569
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2570
                                        STimeWindow* newWin) {
2571
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2572
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2573
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2574

2575
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2576
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2577
  if (code != TSDB_CODE_SUCCESS) {
×
2578
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2579
    return code;
×
2580
  }
2581

2582
  SListIter iter = {0};
×
2583
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2584
  SListNode* listNode = NULL;
×
2585
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2586
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2587
    if (prevGrpWin->groupId != tableGroupId) {
×
2588
      continue;
×
2589
    }
2590

2591
    STimeWindow* prevWin = &prevGrpWin->window;
×
2592
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2593
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2594
      taosMemoryFreeClear(tmp);
×
2595
    }
2596
  }
2597

2598
  return TSDB_CODE_SUCCESS;
×
2599
}
2600

2601
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2602
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2603
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2604
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2605

2606
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2607
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2608

2609
  int32_t     startPos = 0;
×
2610
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2611
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2612
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2613
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2614
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2615
  SResultRow* pResult = NULL;
×
2616

2617
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2618
                                        iaInfo->binfo.inputTsOrder);
2619

2620
  int32_t ret =
2621
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2622
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2623
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2624
    T_LONG_JMP(pTaskInfo->env, ret);
×
2625
  }
2626

2627
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2628
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2629
                                                 iaInfo->binfo.inputTsOrder);
2630
  if (forwardRows <= 0) {
×
2631
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2632
  }
2633

2634
  // prev time window not interpolation yet.
2635
  if (iaInfo->timeWindowInterpo) {
×
2636
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2637
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2638

2639
    // restore current time window
2640
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2641
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2642
    if (ret != TSDB_CODE_SUCCESS) {
×
2643
      T_LONG_JMP(pTaskInfo->env, ret);
×
2644
    }
2645

2646
    // window start key interpolation
2647
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2648
    if (ret != TSDB_CODE_SUCCESS) {
×
2649
      T_LONG_JMP(pTaskInfo->env, ret);
×
2650
    }
2651
  }
2652

2653
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2654
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2655
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
2656
  if (ret != TSDB_CODE_SUCCESS) {
×
2657
    T_LONG_JMP(pTaskInfo->env, ret);
×
2658
  }
2659
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2660

2661
  // output previous interval results after this interval (&win) is closed
2662
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2663
  if (code != TSDB_CODE_SUCCESS) {
×
2664
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2665
    T_LONG_JMP(pTaskInfo->env, code);
×
2666
  }
2667

2668
  STimeWindow nextWin = win;
×
2669
  while (1) {
×
2670
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2671
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2672
                                      iaInfo->binfo.inputTsOrder);
2673
    if (startPos < 0) {
×
2674
      break;
×
2675
    }
2676

2677
    // null data, failed to allocate more memory buffer
2678
    code =
2679
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2680
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2681
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2682
      T_LONG_JMP(pTaskInfo->env, code);
×
2683
    }
2684

2685
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2686
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2687
                                           iaInfo->binfo.inputTsOrder);
2688

2689
    // window start(end) key interpolation
2690
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2691
    if (code != TSDB_CODE_SUCCESS) {
×
2692
      T_LONG_JMP(pTaskInfo->env, code);
×
2693
    }
2694

2695
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2696
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2697
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2698
    if (code != TSDB_CODE_SUCCESS) {
×
2699
      T_LONG_JMP(pTaskInfo->env, code);
×
2700
    }
2701
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2702

2703
    // output previous interval results after this interval (&nextWin) is closed
2704
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2705
    if (code != TSDB_CODE_SUCCESS) {
×
2706
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2707
      T_LONG_JMP(pTaskInfo->env, code);
×
2708
    }
2709
  }
2710

2711
  if (iaInfo->timeWindowInterpo) {
×
2712
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2713
  }
2714
}
×
2715

2716
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2717
  int32_t        code = TSDB_CODE_SUCCESS;
×
2718
  int32_t        lino = 0;
×
2719
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2720

2721
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
2722
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2723
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2724

2725
  if (pOperator->status == OP_EXEC_DONE) {
×
2726
    (*ppRes) = NULL;
×
2727
    return code;
×
2728
  }
2729

2730
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
2731
  blockDataCleanup(pRes);
×
2732
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2733
  QUERY_CHECK_CODE(code, lino, _end);
×
2734

2735
  if (!miaInfo->inputBlocksFinished) {
×
2736
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2737
    while (1) {
×
2738
      SSDataBlock* pBlock = NULL;
×
2739
      if (miaInfo->prefetchedBlock == NULL) {
×
2740
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2741
      } else {
2742
        pBlock = miaInfo->prefetchedBlock;
×
2743
        miaInfo->groupId = pBlock->info.id.groupId;
×
2744
        miaInfo->prefetchedBlock = NULL;
×
2745
      }
2746

2747
      if (pBlock == NULL) {
×
2748
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2749
        miaInfo->inputBlocksFinished = true;
×
2750
        break;
×
2751
      }
2752

2753
      if (!miaInfo->hasGroupId) {
×
2754
        miaInfo->hasGroupId = true;
×
2755
        miaInfo->groupId = pBlock->info.id.groupId;
×
2756
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2757
        miaInfo->prefetchedBlock = pBlock;
×
2758
        break;
×
2759
      }
2760

2761
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
2762
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2763
      QUERY_CHECK_CODE(code, lino, _end);
×
2764

2765
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2766

2767
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
2768
        break;
×
2769
      }
2770
    }
2771

2772
    pRes->info.id.groupId = miaInfo->groupId;
×
2773
  }
2774

2775
  if (miaInfo->inputBlocksFinished) {
×
2776
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2777

2778
    if (listNode != NULL) {
×
2779
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2780
      pRes->info.id.groupId = grpWin->groupId;
×
2781
    }
2782
  }
2783

2784
  if (pRes->info.rows == 0) {
×
2785
    setOperatorCompleted(pOperator);
×
2786
  }
2787

2788
  size_t rows = pRes->info.rows;
×
2789
  pOperator->resultInfo.totalRows += rows;
×
2790

2791
_end:
×
2792
  if (code != TSDB_CODE_SUCCESS) {
×
2793
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2794
    pTaskInfo->code = code;
×
2795
    T_LONG_JMP(pTaskInfo->env, code);
×
2796
  }
2797
  (*ppRes) = (rows == 0) ? NULL : pRes;
×
2798
  return code;
×
2799
}
2800

2801
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
2802
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
2803

2804
  pInfo->hasGroupId = false;
×
2805
  pInfo->groupId = 0;
×
2806
  pInfo->prefetchedBlock = NULL;
×
2807
  pInfo->inputBlocksFinished = false;
×
2808
  tdListEmpty(pInfo->groupIntervals);
×
2809
  
2810
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
2811
  return resetInterval(pOper, pIntervalInfo);
×
2812
}
2813

2814
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2815
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2816
  QRY_PARAM_CHECK(pOptrInfo);
×
2817

2818
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2819
  int32_t                        lino = 0;
×
2820
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2821
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2822
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2823
    code = terrno;
×
2824
    goto _error;
×
2825
  }
2826

2827
  pOperator->pPhyNode = pIntervalPhyNode;
×
2828
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2829
                        .sliding = pIntervalPhyNode->sliding,
×
2830
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2831
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2832
                        .offset = pIntervalPhyNode->offset,
×
2833
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2834
                        .timeRange = pIntervalPhyNode->timeRange};
2835
  calcIntervalAutoOffset(&interval);
×
2836

2837
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2838

2839
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2840
  pIntervalInfo->win = pTaskInfo->window;
×
2841
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2842
  pIntervalInfo->interval = interval;
×
2843
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2844
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2845
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2846

2847
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2848
  pExprSupp->hasWindowOrGroup = true;
×
2849
  pExprSupp->hasWindow = true;
×
2850

2851
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2852
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2853

2854
  int32_t    num = 0;
×
2855
  SExprInfo* pExprInfo = NULL;
×
2856
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2857
  QUERY_CHECK_CODE(code, lino, _error);
×
2858

2859
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2860
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2861
  if (code != TSDB_CODE_SUCCESS) {
×
2862
    goto _error;
×
2863
  }
2864

2865
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2866
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2867
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2868
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2869
  QUERY_CHECK_CODE(code, lino, _error);
×
2870

2871
  pIntervalInfo->timeWindowInterpo = false;
×
2872
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2873
  QUERY_CHECK_CODE(code, lino, _error);
×
2874
  if (pIntervalInfo->timeWindowInterpo) {
×
2875
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2876
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2877
      goto _error;
×
2878
    }
2879
  }
2880

2881
  pIntervalInfo->pOperator = pOperator;
×
2882
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2883
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2884
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2885
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2886
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2887
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2888

2889
  code = appendDownstream(pOperator, &downstream, 1);
×
2890
  if (code != TSDB_CODE_SUCCESS) {
×
2891
    goto _error;
×
2892
  }
2893

2894
  *pOptrInfo = pOperator;
×
2895
  return TSDB_CODE_SUCCESS;
×
2896
_error:
×
2897
  if (pMergeIntervalInfo != NULL) {
×
2898
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2899
  }
2900
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2901
  pTaskInfo->code = code;
×
2902
  return code;
×
2903
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc