• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4887

16 Dec 2025 08:27AM UTC coverage: 65.289% (-0.003%) from 65.292%
#4887

push

travis-ci

web-flow
feat[TS-7233]: audit (#33850)

377 of 536 new or added lines in 28 files covered. (70.34%)

1025 existing lines in 111 files now uncovered.

178977 of 274129 relevant lines covered (65.29%)

102580217.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.58
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
// Selects which border of a result row's time window an interpolation step / "interpolated" flag refers to.
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,  // the start border (tracked by SResultRow::startInterp)
34
  RESULT_ROW_END_INTERP = 2,    // the end border (tracked by SResultRow::endInterp)
35
} SResultTsInterpType;
36

37
// Payload of a node in SResultRowInfo::openWindow: identifies one result row and the group it belongs to.
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;      // page id / offset of the result row in the result buffer
39
  uint64_t           groupId;  // group id the window was opened for
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
UNCOV
56
    *pResult = NULL;
×
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
/**
 * @brief Account one more row (timestamp `ts`) to the window currently tracked by `pRowSup`.
 *
 * The window end key and the previous timestamp are moved to `ts`, the group id is recorded and the row counter
 * is incremented. If hasContinuousNullRows() then reports pending null rows, they are added to the row counter
 * as well and the null-row counter is reset.
 *
 * @param rowIndex not used by this function.
 */
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
  ++pRowSup->numOfRows;

  if (!hasContinuousNullRows(pRowSup)) {
    return;
  }

  // rows with a null state column that are enclosed by rows of the same state
  // belong to the current window too
  pRowSup->numOfRows += pRowSup->numNullRows;
  resetNumNullRows(pRowSup);
}
89,394,772✔
79

80
/**
 * @brief Start tracking a new window in `pRowSup` that begins at row `rowIndex`.
 *
 * Records the start row index, the start key (tsList[rowIndex]) and the group id, clears the row counter and
 * resets the null-row counter.
 */
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
  const int64_t startKey = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = startKey;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;

  resetNumNullRows(pRowSup);
}
1,450,236✔
87

88
/**
 * @brief Count how many rows, starting at `pos`, can be consumed before passing `ekey`.
 *
 * `searchFn` is run on the sub-array pData[pos .. numOfRows) and its (relative) result is the base count.
 * Every row at or after that position whose timestamp equals `ekey` is counted in addition.
 *
 * @param numOfRows number of entries in `pData`
 * @param searchFn  search routine; called as searchFn(&pData[pos], numOfRows - pos, ekey, order)
 * @param ekey      end key of the window
 * @param pos       first row to consider
 * @param order     scan order; only forwarded to `searchFn` (both orders are handled identically here)
 * @param pData     timestamp column
 * @return number of rows to move forward; 0 when `searchFn` returns a negative value.
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // include rows equal to ekey; stop at the end of the block instead of reading past it
  for (int32_t idx = pos + end; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
124

125
/**
 * @brief Binary search for `key` in an array of `num` timestamps.
 *
 * @param pValue array of TSKEY values (passed as char*); expected to be sorted according to `order`
 * @param num    number of entries
 * @param key    timestamp to look for
 * @param order  TSDB_ORDER_DESC selects the descending variant, any other value the ascending one
 * @return an index into the array, or -1 if `num <= 0` or the located position would be past the last entry.
 *         Ascending: the position of an entry equal to `key`, otherwise of the first entry bigger than it.
 *         Descending: the position of an entry equal to `key`, otherwise of the first entry smaller than it.
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  const TSKEY* keys = (const TSKEY*)pValue;
  int32_t      lo = 0;
  int32_t      hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    for (;;) {
      if (key >= keys[lo]) return lo;
      if (key == keys[hi]) return hi;

      if (key < keys[hi]) {
        // key is below the remaining range: the slot right after it, if there is one
        return (hi + 1 >= num) ? -1 : hi + 1;
      }

      int32_t mid = lo + ((hi - lo + 1) >> 1);
      if (key < keys[mid]) {
        lo = mid + 1;
      } else if (key > keys[mid]) {
        hi = mid - 1;
      } else {
        return mid;
      }
    }
  }

  for (;;) {
    if (key <= keys[lo]) return lo;
    if (key == keys[hi]) return hi;

    if (key > keys[hi]) {
      // key is above the remaining range: the slot right after it, if there is one
      return (hi + 1 >= num) ? -1 : hi + 1;
    }

    int32_t mid = lo + ((hi - lo + 1) >> 1);
    if (key < keys[mid]) {
      hi = mid - 1;
    } else if (key > keys[mid]) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,147,483,647✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
2,147,483,647✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
198

199
  if (order == TSDB_ORDER_ASC) {
2,147,483,647✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
2,147,483,647✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
2,147,483,647✔
202
      if (item != NULL) {
2,147,483,647✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
37,445,907✔
207
      if (item != NULL) {
37,814,087✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
460,143,791✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
462,822,311✔
214
      if (item != NULL) {
462,851,668✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
596,046✔
219
      if (item != NULL) {
1,212,037✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
2,147,483,647✔
226
}
227

228
/**
 * @brief Compute the interpolated border value at `windowKey` for every interval-interpolation function.
 *
 * For each expression whose function passes fmIsIntervalInterpoFunc(), the value of its first parameter column
 * is read at the previous point (prevTs) and the current point (curTs), linearly interpolated at `windowKey`
 * (skipped for functions passing fmIsElapsedFunc(), which leaves the value at 0), and stored in the context's
 * `start` or `end` point depending on `type`. For all other expressions start.key is set to INT64_MIN.
 *
 * @param pPrevValues  saved values of the previous block (SGroupKeys); used when prevRowIndex == -1. Entries are
 *                     consumed starting at index 1, one per interpolation function.
 * @param pDataBlock   column array of the current block
 * @param prevRowIndex row of the previous point in the current block, or -1 to take it from `pPrevValues`
 * @param curRowIndex  row of the current point in the current block
 * @param type         RESULT_ROW_START_INTERP to fill `start`; any other value fills `end`
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t index = 1;
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
      pCtx[k].start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pParam = &pCtx[k].param[0];
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);

    double v1 = 0, v2 = 0, v = 0;
    if (prevRowIndex == -1) {
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
    } else {
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
                     typeGetTypeModFromColInfo(&pColInfo->info));
    }

    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
                   typeGetTypeModFromColInfo(&pColInfo->info));

    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
    SPoint point = (SPoint){.key = windowKey, .val = &v};

    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pCtx[k].start.key = point.key;
      pCtx[k].start.val = v;
    } else {
      pCtx[k].end.key = point.key;
      pCtx[k].end.val = v;
    }

    index += 1;
  }
}
11,135,979✔
299

300
/**
 * @brief Mark one window border as "not interpolated" for the first `numOfOutput` function contexts.
 *
 * Sets start.key (type == RESULT_ROW_START_INTERP) or end.key (any other type) of each context to INT64_MIN.
 */
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool isStartBorder = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (isStartBorder) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
719,055✔
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,927,517✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,927,517✔
315

316
  TSKEY curTs = tsCols[pos];
5,927,517✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,927,517✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,927,517✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,927,517✔
324
  if (key == curTs) {
5,927,517✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
389,956✔
326
    return true;
389,956✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
5,537,561✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
3,645✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
5,533,916✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
5,533,916✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
5,537,561✔
339
}
340

341
/**
 * @brief Prepare the end-border interpolation of window `win`.
 *
 * The border key is win->ekey for ascending input and win->skey otherwise.
 *  - border beyond the block (`blockEkey`): end keys are set to INT64_MIN, *pRes = false;
 *  - border equals the timestamp of row `endRowIndex`: end keys are set to INT64_MIN, *pRes = true;
 *  - otherwise the value is interpolated between rows `endRowIndex` and `nextRowIndex`, *pRes = true.
 *
 * @param[out] pRes whether the end border has been settled in this block; not written on error.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if `nextRowIndex` is negative when an
 *         interpolation is required.
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  int32_t order = pInfo->binfo.inputTsOrder;
  TSKEY   lastRowTs = tsCols[endRowIndex];
  TSKEY   borderKey = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;

  // window does not end in the current data block
  bool beyondBlock = (borderKey > blockEkey && (order == TSDB_ORDER_ASC)) ||
                     (borderKey < blockEkey && (order == TSDB_ORDER_DESC));

  // either not ended yet, or a row sits exactly on the border: no interpolation in both cases
  if (beyondBlock || borderKey == lastRowTs) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    (*pRes) = !beyondBlock;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, lastRowTs, endRowIndex, tsCols[nextRowIndex],
                            nextRowIndex, borderKey, RESULT_ROW_END_INTERP, pSup);
  (*pRes) = true;
  return TSDB_CODE_SUCCESS;
}
376

377
/**
 * @brief Check whether window `pWin` has to be calculated for the range [calStart, calEnd].
 *
 * When interval == sliding the answer is always true. Otherwise the window is rejected if it does not overlap
 * the range, or if the block is of type STREAM_PULL_DATA and the window starts before `calStart`.
 */
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  if (pWin->ekey < calStart || pWin->skey > calEnd) {
    return false;
  }

  if (blockType == STREAM_PULL_DATA && pWin->skey < calStart) {
    return false;
  }

  return true;
}
385

386
/**
 * @brief inCalSlidingWindow() applied to the calculation window (calWin) and block type of `pBlockInfo`.
 */
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
  TSKEY calStart = pBlockInfo->calWin.skey;
  TSKEY calEnd = pBlockInfo->calWin.ekey;

  return inCalSlidingWindow(pInterval, pWin, calStart, calEnd, pBlockInfo->type);
}
389

390
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
391
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
392
  bool ascQuery = (order == TSDB_ORDER_ASC);
2,147,483,647✔
393

394
  int32_t precision = pInterval->precision;
2,147,483,647✔
395
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
396

397
  // next time window is not in current block
398
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
399
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
400
    return -1;
33,984,124✔
401
  }
402

403
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
404
    return -1;
×
405
  }
406

407
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
408
  int32_t startPos = 0;
2,147,483,647✔
409

410
  // tumbling time window query, a special case of sliding time window query
411
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
412
    startPos = prevPosition + 1;
2,147,483,647✔
413
  } else {
414
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
629,936✔
415
      startPos = 0;
999,732✔
416
    } else {
417
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
871,409✔
418
    }
419
  }
420

421
  /* interp query with fill should not skip time window */
422
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
423
  //    return startPos;
424
  //  }
425

426
  /*
427
   * This time window does not cover any data, try next time window,
428
   * this case may happen when the time window is too small
429
   */
430
  if (primaryKeys != NULL) {
2,147,483,647✔
431
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
432
      TSKEY next = primaryKeys[startPos];
1,666,390,514✔
433
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,667,448,469✔
434
        pNext->skey = taosTimeTruncate(next, pInterval);
189,715✔
435
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,928✔
436
      } else {
437
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,667,641,327✔
438
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,668,348,867✔
439
      }
440
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
1,335,228,714✔
441
      TSKEY next = primaryKeys[startPos];
434,450,366✔
442
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
434,855,201✔
443
        pNext->skey = taosTimeTruncate(next, pInterval);
298,263✔
444
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
445
      } else {
446
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
435,071,714✔
447
        pNext->ekey = pNext->skey + pInterval->interval - 1;
435,071,990✔
448
      }
449
    }
450
  }
451

452
  return startPos;
2,147,483,647✔
453
}
454

455
/**
 * @brief Tell whether the given border of `pResult` is already flagged as interpolated.
 *
 * @param type RESULT_ROW_START_INTERP checks startInterp; any other value checks endInterp.
 */
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
462

463
/**
 * @brief Flag the given border of `pResult` as interpolated.
 *
 * @param type RESULT_ROW_START_INTERP sets startInterp; any other value sets endInterp.
 */
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
11,836,238✔
470

471
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
472
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
473
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
474
  int32_t lino = 0;
2,147,483,647✔
475
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
476
    return code;
2,147,483,647✔
477
  }
478

479
  if (pBlock == NULL) {
4,893,200✔
480
    code = TSDB_CODE_INVALID_PARA;
×
481
    return code;
×
482
  }
483

484
  if (pBlock->pDataBlock == NULL) {
4,893,200✔
485
    return code;
×
486
  }
487

488
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,927,517✔
489

490
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,927,517✔
491
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,927,517✔
492
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,927,517✔
493
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,927,517✔
494
    if (interp) {
5,927,517✔
495
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,927,517✔
496
    }
497
  } else {
498
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
499
  }
500

501
  // point interpolation does not require the end key time window interpolation.
502
  // interpolation query does not generate the time window end interpolation
503
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,927,517✔
504
  if (!done) {
5,927,517✔
505
    int32_t endRowIndex = startPos + forwardRows - 1;
5,927,517✔
506
    int32_t nextRowIndex = endRowIndex + 1;
5,927,517✔
507

508
    // duplicated ts row does not involve in the interpolation of end value for current time window
509
    int32_t x = endRowIndex;
5,927,517✔
510
    while (x > 0) {
5,941,138✔
511
      if (tsCols[x] == tsCols[x - 1]) {
5,927,509✔
512
        x -= 1;
13,621✔
513
      } else {
514
        endRowIndex = x;
5,913,888✔
515
        break;
5,913,888✔
516
      }
517
    }
518

519
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,927,517✔
520
    bool  interp = false;
5,927,517✔
521
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,927,517✔
522
                                           win, &interp);
523
    QUERY_CHECK_CODE(code, lino, _end);
5,927,517✔
524
    if (interp) {
5,927,517✔
525
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,908,721✔
526
    }
527
  } else {
528
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
529
  }
530

531
_end:
5,927,517✔
532
  if (code != TSDB_CODE_SUCCESS) {
5,927,517✔
533
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
534
  }
535
  return code;
5,927,517✔
536
}
537

538
/**
 * @brief Remember, per tracked column, the last non-null value of `pBlock`.
 *
 * For entry k of `pPrevKeys`, the column is taken from slot pCols[k].slotId of the block; its last non-null
 * value is copied into pPrevKeys[k].pData (total variable-length size for var/blob types, pkey->bytes
 * otherwise). Entries whose column is null in every row are left untouched.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t k = 0; k < numOfKeys; ++k) {
    SColumn*         pCol = taosArrayGet(pCols, k);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pKey = taosArrayGet(pPrevKeys, k);

    // walk backwards to the last row that holds a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColData, row)) {
      --row;
    }
    if (row < 0) {
      continue;
    }

    char* val = colDataGetData(pColData, row);
    if (!IS_VAR_DATA_TYPE(pKey->type)) {
      memcpy(pKey->pData, val, pKey->bytes);
    } else if (IS_STR_DATA_BLOB(pKey->type)) {
      memcpy(pKey->pData, val, blobDataTLen(val));
    } else {
      memcpy(pKey->pData, val, varDataTLen(val));
    }
  }
}
570

571
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
18,796✔
572
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
573
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
18,796✔
574

575
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
18,796✔
576
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
18,796✔
577

578
  int32_t startPos = 0;
18,796✔
579
  int32_t numOfOutput = pSup->numOfExprs;
18,796✔
580

581
  SResultRow* pResult = NULL;
18,796✔
582

583
  while (1) {
×
584
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
18,796✔
585
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
18,796✔
586
    uint64_t            groupId = pOpenWin->groupId;
18,796✔
587
    SResultRowPosition* p1 = &pOpenWin->pos;
18,796✔
588
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
18,796✔
589
      break;
18,796✔
590
    }
591

592
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
593
    if (NULL == pr) {
×
594
      T_LONG_JMP(pTaskInfo->env, terrno);
×
595
    }
596

597
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
598
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
599
      T_LONG_JMP(pTaskInfo->env, terrno);
×
600
    }
601

602
    if (pr->closed) {
×
603
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
604
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
605
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
606
        T_LONG_JMP(pTaskInfo->env, terrno);
×
607
      }
608
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
609
      taosMemoryFree(pNode);
×
610
      continue;
×
611
    }
612

613
    STimeWindow w = pr->win;
×
614
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
615
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
616
    if (ret != TSDB_CODE_SUCCESS) {
×
617
      T_LONG_JMP(pTaskInfo->env, ret);
×
618
    }
619

620
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
621
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
622
      T_LONG_JMP(pTaskInfo->env, terrno);
×
623
    }
624

625
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
626
    if (!pTsKey) {
×
627
      pTaskInfo->code = terrno;
×
628
      T_LONG_JMP(pTaskInfo->env, terrno);
×
629
    }
630

631
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
632
    if (groupId == pBlock->info.id.groupId) {
×
633
      TSKEY curTs = pBlock->info.window.skey;
×
634
      if (tsCols != NULL) {
×
635
        curTs = tsCols[startPos];
×
636
      }
637
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
638
                                RESULT_ROW_END_INTERP, pSup);
639
    }
640

641
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
642
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
643

644
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
645
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
646
                                          pBlock->info.rows, numOfExprs);
×
647
    if (ret != TSDB_CODE_SUCCESS) {
×
648
      T_LONG_JMP(pTaskInfo->env, ret);
×
649
    }
650

651
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
652
      closeResultRow(pr);
×
653
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
654
      taosMemoryFree(pNode);
×
655
    } else {  // the remains are can not be closed yet.
656
      break;
×
657
    }
658
  }
659
}
18,796✔
660

661
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,249,540,138✔
662
  TSKEY*                    lTS = (TSKEY*)l;
1,249,540,138✔
663
  TSKEY*                    rTS = (TSKEY*)r;
1,249,540,138✔
664
  SIntervalAggOperatorInfo* pInfo = param;
1,249,540,138✔
665
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,249,540,138✔
666
}
667

668
/**
 * @brief Tell whether a result row already exists for window `win` of group `tableGroupId`.
 *
 * Builds the result-window key from the window start key and the group id and looks it up in the operator's
 * result-row hash table.
 */
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char resKey[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(resKey, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pHit = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, resKey, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pHit != NULL;
}
673

674
/**
675
 * @brief check if cur window should be filtered out by limit info
676
 * @retval true if should be filtered out
677
 * @retval false if not filtering out
678
 * @note If no limit info, we skip filtering.
679
 *       If input/output ts order mismatch, we skip filtering too.
680
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
681
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
682
 *       every tuple in every block.
683
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
684
 */
685
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
686
                                  SExecTaskInfo* pTaskInfo) {
687
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
688
  int32_t lino = 0;
2,147,483,647✔
689
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
690
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
373,857,498✔
691
      // if input/output ts order mismatch, no filter
692
  ) {
693
    return false;
2,147,483,647✔
694
  }
695

696
  if (pOperatorInfo->limit == 0) return true;
183,467,085✔
697

698
  if (pOperatorInfo->pBQ == NULL) {
183,339,086✔
699
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
728,195✔
700
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
727,998✔
701
  }
702

703
  bool shouldFilter = false;
183,458,271✔
704
  // if BQ has been full, compare it with top of BQ
705
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
183,458,271✔
706
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
45,595,634✔
707
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
45,590,315✔
708
  }
709
  if (shouldFilter) {
183,212,691✔
710
    return true;
2,908,193✔
711
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
180,304,498✔
712
    return false;
63,726,836✔
713
  }
714

715
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
716
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
116,850,340✔
717
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
116,589,788✔
718

719
  *((TSKEY*)node.data) = win->skey;
116,589,788✔
720

721
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
116,602,593✔
722
    taosMemoryFree(node.data);
×
723
    return true;
×
724
  }
725

726
_end:
116,712,804✔
727
  if (code != TSDB_CODE_SUCCESS) {
116,707,288✔
728
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
67,374✔
729
    pTaskInfo->code = code;
67,374✔
730
    T_LONG_JMP(pTaskInfo->env, code);
×
731
  }
732
  return false;
116,640,229✔
733
}
734

735
/**
 * @brief aggregate the rows of one input block into the interval windows they fall in
 * @param pOperatorInfo the interval operator
 * @param pResultRowInfo result row bookkeeping of the operator (holds the open window list)
 * @param pBlock the input data block
 * @param scanFlag scan flag of the block; compared with MAIN_SCAN when the output buffer is set up
 * @retval true if the block belongs to a new group and the slimit on groups has already been exceeded
 * @retval false otherwise, including the case where the first window is filtered out by the limit
 * @note All failures leave through T_LONG_JMP(pTaskInfo->env, code) and never return to the caller.
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      // a new group starts: the per-group bounded queue used by the limit filter is rebuilt lazily
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // Never jump with a success code: assuming T_LONG_JMP forwards the value to longjmp(), a value of 0 would
    // surface as 1 at the setjmp() site, which is not a meaningful error code.
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 pInfo->binfo.inputTsOrder);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      // pResult is used right below, so a missing result row is rejected here just like for the first call
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  // walk through the remaining windows covered by this block
  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      pInfo->binfo.inputTsOrder);
    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    // null data, failed to allocate more memory buffer
    ret = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           pInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    // TODO: add to open window? how to close the open windows after input blocks exhausted?

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  if (pInfo->timeWindowInterpo) {
    // keep the last row of this block for the interpolation of the next block
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
854

855
/**
 * @brief close the result row and drop the head of the open window list once the end of the window is interpolated
 * @param pResultRowInfo holds the open window list
 * @param pInfo interval operator info; nothing happens unless timeWindowInterpo is set
 * @param pResult the result row that has just been computed
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // current result is done in computing final results.
  closeResultRow(pResult);

  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
2,147,483,647✔
863

864
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
18,796✔
865
                                       SExecTaskInfo* pTaskInfo) {
866
  int32_t         code = TSDB_CODE_SUCCESS;
18,796✔
867
  int32_t         lino = 0;
18,796✔
868
  SOpenWindowInfo openWin = {0};
18,796✔
869
  openWin.pos.pageId = pResult->pageId;
18,796✔
870
  openWin.pos.offset = pResult->offset;
18,796✔
871
  openWin.groupId = groupId;
18,796✔
872
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
18,796✔
873
  if (pn == NULL) {
18,796✔
874
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
18,796✔
875
    QUERY_CHECK_CODE(code, lino, _end);
18,796✔
876
    return openWin.pos;
18,796✔
877
  }
878

879
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
880
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
881
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
882
    QUERY_CHECK_CODE(code, lino, _end);
×
883
  }
884

885
_end:
×
886
  if (code != TSDB_CODE_SUCCESS) {
×
887
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
888
    pTaskInfo->code = code;
×
889
    T_LONG_JMP(pTaskInfo->env, code);
×
890
  }
891
  return openWin.pos;
×
892
}
893

894
/**
 * @brief fetch the primary timestamp column data of a block and refresh the block ts window if it is unset
 * @param pBlock the input data block
 * @param pInfo interval operator info; primaryTsIndex selects the timestamp column
 * @param pTaskInfo task info; on failure its code is set and the function long-jumps to its env
 * @return the timestamp array of the block, or NULL when the block has no column array or its data is not loaded
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (pTsColInfo == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pTsColInfo->pData;

  if (tsCols[0] == 0) {
    // a zero start timestamp is only reported; the block window is left as it is
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
          tsCols[pBlock->info.rows - 1]);
  } else if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
    // the block carries data but no ts window yet: compute it from the timestamp column
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
922

923
/**
 * @brief open the interval operator: consume the downstream blocks and aggregate them into time windows
 * @param pOperator the interval operator
 * @return TSDB_CODE_SUCCESS; any failure sets pTaskInfo->code and long-jumps to pTaskInfo->env instead of returning
 * @note Returns immediately when the operator has already been opened.
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;
  int32_t                   scanFlag = MAIN_SCAN;
  int64_t                   st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;

  for (;;) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // the result block inherits the scan flag of the block being processed
    scanFlag = pBlock->info.scanFlag;
    pInfo->binfo.pRes->info.scanFlag = scanFlag;

    // scalar expressions, if any, are evaluated in place before the aggregation
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // a true result means no more input is needed
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) {
      break;
    }
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
977

978
// start a new state window and record the start info
979
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
5,341,884✔
980
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
981
  pRowSup->groupId = groupId;
5,341,884✔
982
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
5,341,884✔
983
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
91,814✔
984
    pRowSup->win.skey = tsList[rowIndex];
5,295,254✔
985
    pRowSup->startRowIndex = rowIndex;
5,295,254✔
986
    pRowSup->numOfRows = 0;  // does not include the current row yet
5,295,254✔
987
  } else {
988
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
93,260✔
989
      rowIndex - pRowSup->numNullRows : rowIndex;
46,630✔
990
    pRowSup->win.skey = hasPrevWin ?
46,630✔
991
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
46,630✔
992
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
46,630✔
993
  }
994
  resetNumNullRows(pRowSup);
5,341,884✔
995
}
5,341,884✔
996

997
// close a state window and record its end info
998
// this functions is called when a new state row appears
999
// @param rowIndex the index of the first row of next window
1000
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
5,520,734✔
1001
                                 int32_t rowIndex,
1002
                                 const EStateWinExtendOption* extendOption,
1003
                                 bool hasNextWin) {
1004
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
5,520,734✔
1005
      pRowSup->win.ekey = hasNextWin?
52,864✔
1006
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
52,864✔
1007
      // continuous rows having null state col should be included in this window
1008
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
105,728✔
1009
        pRowSup->numNullRows : 0;
52,864✔
1010
      resetNumNullRows(pRowSup);
52,864✔
1011
  }
1012
}
5,520,734✔
1013

1014
// record one more row whose state column is null: remember its timestamp and bump the null-row counter
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
  pRowSup->prevTs = nullRowTs;
  ++pRowSup->numNullRows;
}
25,278,209✔
1018

1019
// process a closed state window
1020
// do aggregation on the tuples within the window
1021
// partial aggregation results are stored in the output buffer
1022
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
5,520,734✔
1023
  SWindowRowsSup* pRowSup, SExecTaskInfo* pTaskInfo,
1024
  SExprSupp* pSup, int32_t numOfOutput) {
1025
  int32_t     code = 0;
5,520,734✔
1026
  int32_t     lino = 0;
5,520,734✔
1027
  SResultRow* pResult = NULL;
5,520,734✔
1028
  if (pRowSup->numOfRows == 0) {
5,520,734✔
1029
    // no valid rows within the window
1030
    return TSDB_CODE_SUCCESS;
171,695✔
1031
  }
1032
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
10,698,078✔
1033
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
5,349,039✔
1034
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1035
  QUERY_CHECK_CODE(code, lino, _return);
5,349,039✔
1036

1037
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
5,349,039✔
1038
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
5,349,039✔
1039
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1040
    pRowSup->numOfRows, 0, numOfOutput);
1041
  QUERY_CHECK_CODE(code, lino, _return);
5,349,039✔
1042

1043
_return:
5,349,039✔
1044
  if (code != TSDB_CODE_SUCCESS) {
5,349,039✔
1045
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
520✔
1046
  }
1047
  return code;
5,349,039✔
1048
}
1049

1050
// process a data block for state window aggregation
1051
// scan from startIndex to endIndex
1052
// numPartialCalcRows returns the number of rows that have been
1053
// partially calculated within the block
1054
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
600,587✔
1055
                                 SStateWindowOperatorInfo* pInfo,
1056
                                 SSDataBlock* pBlock, int32_t* startIndex,
1057
                                 int32_t* endIndex,
1058
                                 int32_t* numPartialCalcRows) {
1059
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
600,587✔
1060
  SExprSupp*     pExprSup = &pOperator->exprSupp;
600,587✔
1061

1062
  SColumnInfoData* pStateColInfoData = 
1063
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
600,587✔
1064
  if (!pStateColInfoData) {
600,587✔
1065
    pTaskInfo->code = terrno;
×
1066
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1067
  }
1068
  uint64_t gid = pBlock->info.id.groupId;
600,587✔
1069
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
600,587✔
1070
  int32_t bytes = pStateColInfoData->info.bytes;
600,587✔
1071

1072
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
600,587✔
1073
                                               pInfo->tsSlotId);
600,587✔
1074
  if (NULL == pColInfoData) {
600,587✔
1075
    pTaskInfo->code = terrno;
×
1076
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1077
  }
1078
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
600,587✔
1079

1080
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
600,587✔
1081
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
600,587✔
1082
                                NULL;
1083
  EStateWinExtendOption  extendOption = pInfo->extendOption;
600,587✔
1084
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
600,587✔
1085

1086
  if (pRowSup->groupId != gid) {
600,587✔
1087
    /*
1088
      group changed, process the previous group's unclosed state window first
1089
    */
1090
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
167,871✔
1091
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
167,871✔
1092
                                            pExprSup, numOfOutput);
1093
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
167,871✔
1094
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
167,871✔
1095

1096
    /*
1097
      unhandled null rows should be ignored, since they belong to previous group
1098
    */
1099
    *numPartialCalcRows += pRowSup->numNullRows;
167,871✔
1100

1101
    /*
1102
      reset state window info for new group
1103
    */
1104
    pInfo->hasKey = false;
167,871✔
1105
    resetWindowRowsSup(pRowSup);
167,871✔
1106
  }
1107

1108
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
105,082,629✔
1109
    if (pBlock->info.scanFlag != PRE_SCAN) {
104,495,319✔
1110
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
104,423,779✔
1111
        pInfo->winSup.lastTs = tsList[j];
5,964,636✔
1112
      } else {
1113
        if (tsList[j] == pInfo->winSup.lastTs) {
98,459,143✔
1114
          // forbid duplicated ts rows
1115
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
13,277✔
1116
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
13,277✔
1117
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
13,277✔
1118
        } else {
1119
          pInfo->winSup.lastTs = tsList[j];
98,446,000✔
1120
        }
1121
      }
1122
    }
1123
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
208,964,084✔
1124
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
25,278,209✔
1125
      continue;
25,278,209✔
1126
    }
1127
    if (pStateColInfoData->pData == NULL) {
79,203,833✔
1128
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1129
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1130
    }
1131
    char* val = colDataGetData(pStateColInfoData, j);
79,203,833✔
1132

1133
    if (!pInfo->hasKey) {
79,203,833✔
1134
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
552,693✔
1135
      pInfo->hasKey = true;
552,693✔
1136
      doKeepNewStateWindowStartInfo(
552,693✔
1137
        pRowSup, tsList, j, gid, &extendOption, false);
1138
      doKeepTuple(pRowSup, tsList[j], j, gid);
552,693✔
1139
    } else if (!compareVal(val, &pInfo->stateKey)) {
78,650,872✔
1140
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
4,789,191✔
1141
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
4,789,191✔
1142
                                              pExprSup, numOfOutput);
1143
      if (TSDB_CODE_SUCCESS != code) {
4,789,191✔
1144
        T_LONG_JMP(pTaskInfo->env, code);
×
1145
      }
1146
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
4,789,191✔
1147

1148
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
4,789,191✔
1149
                                    &extendOption, true);
1150
      doKeepTuple(pRowSup, tsList[j], j, gid);
4,789,191✔
1151
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
4,789,191✔
1152
    } else {
1153
      doKeepTuple(pRowSup, tsList[j], j, gid);
73,861,949✔
1154
    }
1155
  }
1156

1157
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
587,310✔
1158
    /*
1159
      No valid state rows within the block and we don't care about
1160
      null rows before valid state window, mark them as processed and drop them
1161
    */
1162
    *numPartialCalcRows = pBlock->info.rows;
20,280✔
1163
    return;
20,280✔
1164
  }
1165
  if (pRowSup->numOfRows == 0 && 
567,030✔
1166
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
4,798✔
1167
    /*
1168
      If no valid state window or we don't know the belonging of
1169
      these null rows, return and handle them with next block
1170
    */
1171
    return;
3,358✔
1172
  }
1173
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
563,672✔
1174
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
563,672✔
1175
                                          pExprSup, numOfOutput);
1176
  if (TSDB_CODE_SUCCESS != code) {
563,672✔
1177
    pTaskInfo->code = code;
520✔
1178
    T_LONG_JMP(pTaskInfo->env, code);
520✔
1179
  }
1180
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
563,152✔
1181
  // reset part of pRowSup after doing agg calculation
1182
  pRowSup->startRowIndex = 0;
563,152✔
1183
  pRowSup->numOfRows = 0;
563,152✔
1184
}
1185

1186
/**
 * @brief open the state window operator: consume the downstream blocks and aggregate them into state windows
 * @param pOperator the state window operator
 * @return TSDB_CODE_SUCCESS; any failure sets pTaskInfo->code and long-jumps to pTaskInfo->env instead of returning
 * @note Rows of a block that could not be fully processed are kept in a private copy (the "unfinished block")
 *       and merged with the next downstream block before it is processed.
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;

  SSDataBlock* pUnfinishedBlock = NULL;
  // true when pUnfinishedBlock is a private copy that this function must destroy,
  // false when it is NULL or merely refers to the block handed out by the downstream operator
  bool    ownsUnfinishedBlock = false;
  int32_t startIndex = 0;
  int32_t endIndex = 0;
  int32_t numPartialCalcRows = 0;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      // input exhausted: whatever is still pending at this point is always a private copy
      if (pUnfinishedBlock != NULL) {
        blockDataDestroy(pUnfinishedBlock);
        pUnfinishedBlock = NULL;
        ownsUnfinishedBlock = false;
        resetWindowRowsSup(&pInfo->winSup);
      }
      break;
    }

    startIndex = 0;
    if (pUnfinishedBlock != NULL) {
      startIndex = pUnfinishedBlock->info.rows;
      // merge unfinished block with current block
      code = blockDataMerge(pUnfinishedBlock, pBlock);
      // reset id to current block id
      pUnfinishedBlock->info.id = pBlock->info.id;
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      pUnfinishedBlock = pBlock;
      ownsUnfinishedBlock = false;
    }
    endIndex = pUnfinishedBlock->info.rows;

    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that
    // needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      // keep the result in the local code so that pTaskInfo->code is only written on failure (at _end)
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pUnfinishedBlock, pUnfinishedBlock,
                                   pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL,
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, &startIndex, &endIndex, &numPartialCalcRows);
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
      // save unfinished block for next round processing
      if (!ownsUnfinishedBlock) {
        // the downstream block must not be kept across calls, so take a private copy of it
        SSDataBlock* pCopy = NULL;
        code = createOneDataBlock(pBlock, true, &pCopy);
        QUERY_CHECK_CODE(code, lino, _end);
        pUnfinishedBlock = pCopy;
        ownsUnfinishedBlock = true;
      }
      // drop the rows that have already been aggregated; the result code must be checked here
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      if (ownsUnfinishedBlock) {
        blockDataDestroy(pUnfinishedBlock);
      }
      pUnfinishedBlock = NULL;
      ownsUnfinishedBlock = false;
    }
    numPartialCalcRows = 0;
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // the private copy is only reachable from here, release it before leaving through the long jump
    if (ownsUnfinishedBlock) {
      blockDataDestroy(pUnfinishedBlock);
    }
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1288

1289
/**
 * @brief produce the next result block of the state window operator
 * @param pOperator the state window operator
 * @param ppRes receives the result block, or NULL when the operator is done or the block holds no rows
 * @return TSDB_CODE_SUCCESS; any failure sets pTaskInfo->code and long-jumps to pTaskInfo->env instead of returning
 */
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building result blocks until one survives the filter or no results remain
  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hasRemainResults(&pInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBInfo->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pBInfo->pRes->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pBInfo->pRes;
  }
  return code;
}
1334

1335
// Produces the next result block of the interval aggregate operator.
//
// pOperator: the interval operator; its info is a SIntervalAggOperatorInfo.
// ppRes:     receives the result block, or NULL when that block holds no rows or the
//            operator has already reached OP_EXEC_DONE.
//
// Returns TSDB_CODE_SUCCESS. On failure the error is logged, stored in pTaskInfo->code and
// T_LONG_JMP is invoked with pTaskInfo->env.
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  // Declared up front and zeroed: the error paths jump to _end, and the value is read after
  // that label, so it must never be skipped over by a goto.
  size_t                    rows = 0;
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pBlock = pInfo->binfo.pRes;
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // keep building blocks until one survives the filter or the results are exhausted
  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
    if (!hasRemain) {
      setOperatorCompleted(pOperator);
      break;
    }

    if (pBlock->info.rows > 0) {
      break;
    }
  }

  rows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pBlock;
  return code;
}
1378

1379
static void destroyStateWindowOperatorInfo(void* param) {
484,446✔
1380
  if (param == NULL) {
484,446✔
1381
    return;
×
1382
  }
1383
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
484,446✔
1384
  cleanupBasicInfo(&pInfo->binfo);
484,446✔
1385
  taosMemoryFreeClear(pInfo->stateKey.pData);
484,446✔
1386
  if (pInfo->pOperator) {
484,446✔
1387
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
484,446✔
1388
                      pInfo->cleanGroupResInfo);
484,446✔
1389
    pInfo->pOperator = NULL;
484,446✔
1390
  }
1391

1392
  cleanupExprSupp(&pInfo->scalarSup);
484,446✔
1393
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
484,446✔
1394
  cleanupAggSup(&pInfo->aggSup);
484,446✔
1395
  cleanupGroupResInfo(&pInfo->groupResInfo);
484,446✔
1396

1397
  taosMemoryFreeClear(param);
484,446✔
1398
}
1399

1400
static void freeItem(void* param) {
35,260✔
1401
  SGroupKeys* pKey = (SGroupKeys*)param;
35,260✔
1402
  taosMemoryFree(pKey->pData);
35,260✔
1403
}
35,260✔
1404

1405
void destroyIntervalOperatorInfo(void* param) {
2,582,081✔
1406
  if (param == NULL) {
2,582,081✔
1407
    return;
×
1408
  }
1409

1410
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
2,582,081✔
1411

1412
  cleanupBasicInfo(&pInfo->binfo);
2,582,081✔
1413
  if (pInfo->pOperator) {
2,582,801✔
1414
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,582,801✔
1415
                      pInfo->cleanGroupResInfo);
2,582,801✔
1416
    pInfo->pOperator = NULL;
2,582,801✔
1417
  }
1418

1419
  cleanupAggSup(&pInfo->aggSup);
2,582,801✔
1420
  cleanupExprSupp(&pInfo->scalarSupp);
2,582,801✔
1421

1422
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
2,582,801✔
1423

1424
  taosArrayDestroy(pInfo->pInterpCols);
2,582,801✔
1425
  pInfo->pInterpCols = NULL;
2,582,801✔
1426

1427
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
2,582,801✔
1428
  pInfo->pPrevValues = NULL;
2,582,801✔
1429

1430
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,582,801✔
1431
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,582,801✔
1432
  destroyBoundedQueue(pInfo->pBQ);
2,582,320✔
1433
  taosMemoryFreeClear(param);
2,582,320✔
1434
}
1435

1436
// Creates pInfo->pInterpCols and pInfo->pPrevValues and registers the primary timestamp
// column as their first entry.
//
// The timestamp entry in pPrevValues starts with isNull = true, meaning no value has been
// recorded yet.
//
// Returns TSDB_CODE_SUCCESS, or the terrno of the failing allocation/push. On failure the
// arrays created so far stay attached to pInfo; this function does not release them.
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  void*      tmp = NULL;
  SColumn    c = {0};
  SGroupKeys key = {0};

  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);

  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);

  // ts column
  c.colId = 1;
  c.slotId = pInfo->primaryTsIndex;
  c.type = TSDB_DATA_TYPE_TIMESTAMP;
  c.bytes = sizeof(int64_t);
  tmp = taosArrayPush(pInfo->pInterpCols, &c);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  key.bytes = c.bytes;
  key.type = c.type;
  key.isNull = true;  // to denote no value is assigned yet
  key.pData = taosMemoryCalloc(1, c.bytes);
  QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

  tmp = taosArrayPush(pInfo->pPrevValues, &key);
  if (tmp == NULL) {
    // the array did not take the element, so the buffer is still ours to release
    code = terrno;
    lino = __LINE__;
    taosMemoryFree(key.pData);
    goto _end;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1472

1473
// Decides whether any function in pCtx needs interval interpolation and, if so, prepares the
// interpolation bookkeeping of pInfo.
//
// pCtx/numOfCols: function contexts to inspect.
// pInfo:          when interpolation is needed, pInterpCols/pPrevValues are created (timestamp
//                 column first) and one entry is appended per interpolation function, taken
//                 from the column of that function's first parameter.
// pRes:           always set to whether interpolation is needed, including on failure.
//
// Returns TSDB_CODE_SUCCESS or the error of the failing allocation/push.
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
                                      bool* pRes) {
  bool    needed = false;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   tmp = NULL;

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      needed = true;
      break;
    }
  }

  // nothing to register when no interpolation function is present
  if (!needed) {
    goto _end;
  }

  // the primary timestamp column
  code = initWindowInterpPrevVal(pInfo);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (!fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      continue;
    }

    SExprInfo*   pExpr = pCtx[i].pExpr;
    SFunctParam* pParam = &pExpr->base.pParam[0];

    SColumn c = *pParam->pCol;
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    SGroupKeys key = {0};
    key.bytes = c.bytes;
    key.type = c.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, c.bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    tmp = taosArrayPush(pInfo->pPrevValues, &key);
    if (tmp == NULL) {
      // the array did not take the element, so the buffer is still ours to release
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  *pRes = needed;
  return code;
}
1523

1524
// Returns an interval operator to its not-opened state so that it can be executed again.
//
// pOper:         the operator being reset; its pPhyNode is read as a SIntervalPhysiNode.
// pIntervalInfo: the operator's interval state.
//
// Returns 0 on success, otherwise the code of the FIRST step that failed. The remaining
// state is still reset after a failure.
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo) {
  SExecTaskInfo*      pTaskInfo = pOper->pTaskInfo;
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pIntervalInfo->binfo);
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp,
                    &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup, pIntervalInfo->cleanGroupResInfo);

  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                       &pTaskInfo->storageAPI.functionStore);
  }
  if (code == 0) {
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL) {
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
  }

  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
    pIntervalInfo->curGroupId = UINT64_MAX;
  }

  pIntervalInfo->cleanGroupResInfo = false;
  pIntervalInfo->handledGroupNum = 0;
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;

  taosArrayDestroy(pIntervalInfo->pInterpCols);
  pIntervalInfo->pInterpCols = NULL;

  if (pIntervalInfo->pPrevValues != NULL) {
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
    pIntervalInfo->pPrevValues = NULL;
    // NOTE(review): this re-registers only the timestamp column; the per-function entries that
    // timeWindowinterpNeeded() appends at creation time are not re-added here - confirm intended.
    int32_t interpCode = initWindowInterpPrevVal(pIntervalInfo);
    if (code == 0) {
      // keep an earlier failure instead of overwriting it with this result
      code = interpCode;
    }
  }

  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
  destroyBoundedQueue(pIntervalInfo->pBQ);
  pIntervalInfo->pBQ = NULL;
  return code;
}
1572

1573
// Reset callback registered for the interval operator: forwards to resetInterval() with the
// operator's own info. Returns whatever resetInterval() returns.
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
  return resetInterval(pOper, (SIntervalAggOperatorInfo*)pOper->info);
}
1577

1578
// Creates the hash interval aggregate operator ("TimeIntervalAggOperator").
//
// downstream: the single child operator; destroyed together with the new operator on failure.
// pPhyNode:   physical plan node describing the interval, functions, scalar expressions,
//             conditions and limits.
// pTaskInfo:  owning task; pTaskInfo->code is set to the error code on failure.
// pOptrInfo:  receives the created operator on success.
//
// Returns TSDB_CODE_SUCCESS, or an error code after releasing everything created so far.
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;
  pSup->hasWindow = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT: the stored limit is limit + offset
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
  }
  // SLIMIT: same computation, plus the current group id starts at UINT64_MAX
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
      // never leave _error with a success code: the caller would get no operator and no error
      code = terrno;
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
      lino = __LINE__;
      goto _error;
    }
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1706

1707
// todo handle multiple timeline cases. assume no timeline interweaving
1708
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
125,352✔
1709
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
125,352✔
1710
  SExprSupp*     pSup = &pOperator->exprSupp;
125,352✔
1711

1712
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
125,352✔
1713
  if (!pColInfoData) {
125,352✔
1714
    pTaskInfo->code = terrno;
×
1715
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1716
  }
1717

1718
  bool    masterScan = true;
125,352✔
1719
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
125,352✔
1720
  int64_t gid = pBlock->info.id.groupId;
125,352✔
1721

1722
  int64_t gap = pInfo->gap;
125,352✔
1723

1724
  if (!pInfo->reptScan) {
125,352✔
1725
    pInfo->reptScan = true;
117,442✔
1726
    pInfo->winSup.prevTs = INT64_MIN;
117,442✔
1727
  }
1728

1729
  SWindowRowsSup* pRowSup = &pInfo->winSup;
125,352✔
1730
  pRowSup->numOfRows = 0;
125,352✔
1731
  pRowSup->startRowIndex = 0;
125,352✔
1732

1733
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1734
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
125,352✔
1735
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
10,245,784✔
1736
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
10,120,432✔
1737
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
119,403✔
1738
      doKeepTuple(pRowSup, tsList[j], j, gid);
119,403✔
1739
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
10,001,029✔
1740
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
1,291,992✔
1741
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1742
      doKeepTuple(pRowSup, tsList[j], j, gid);
8,735,887✔
1743
    } else {  // start a new session window
1744
      // start a new session window
1745
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
1,265,142✔
1746
        SResultRow* pResult = NULL;
1,259,666✔
1747

1748
        // keep the time window for the closed time window.
1749
        STimeWindow window = pRowSup->win;
1,259,666✔
1750

1751
        int32_t ret =
1752
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
1,259,666✔
1753
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1754
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
1,259,666✔
1755
          T_LONG_JMP(pTaskInfo->env, ret);
×
1756
        }
1757

1758
        // pInfo->numOfRows data belong to the current session window
1759
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
1,259,666✔
1760
        ret =
1761
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
1,259,666✔
1762
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1,259,666✔
1763
        if (ret != TSDB_CODE_SUCCESS) {
1,259,666✔
1764
          T_LONG_JMP(pTaskInfo->env, ret);
×
1765
        }
1766
      }
1767

1768
      // here we start a new session window
1769
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
1,265,142✔
1770
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,265,142✔
1771
    }
1772
  }
1773

1774
  SResultRow* pResult = NULL;
125,352✔
1775
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
125,352✔
1776
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
125,352✔
1777
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1778
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
125,352✔
1779
    T_LONG_JMP(pTaskInfo->env, ret);
×
1780
  }
1781

1782
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
125,352✔
1783
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
125,352✔
1784
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
125,352✔
1785
  if (ret != TSDB_CODE_SUCCESS) {
125,352✔
1786
    T_LONG_JMP(pTaskInfo->env, ret);
×
1787
  }
1788
}
125,352✔
1789

1790
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
228,488✔
1791
  if (pOperator->status == OP_EXEC_DONE) {
228,488✔
1792
    (*ppRes) = NULL;
102,938✔
1793
    return TSDB_CODE_SUCCESS;
102,938✔
1794
  }
1795

1796
  int32_t                  code = TSDB_CODE_SUCCESS;
125,550✔
1797
  int32_t                  lino = 0;
125,550✔
1798
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
125,550✔
1799
  SSessionAggOperatorInfo* pInfo = pOperator->info;
125,550✔
1800
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
125,550✔
1801
  SExprSupp*               pSup = &pOperator->exprSupp;
125,550✔
1802

1803
  if (pOperator->status == OP_RES_TO_RETURN) {
125,550✔
1804
    while (1) {
×
1805
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,961✔
1806
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,961✔
1807
      QUERY_CHECK_CODE(code, lino, _end);
1,961✔
1808

1809
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,961✔
1810
      if (!hasRemain) {
1,961✔
1811
        setOperatorCompleted(pOperator);
1,392✔
1812
        break;
1,392✔
1813
      }
1814

1815
      if (pBInfo->pRes->info.rows > 0) {
569✔
1816
        break;
569✔
1817
      }
1818
    }
1819
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
1,961✔
1820
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,961✔
1821
    return code;
1,961✔
1822
  }
1823

1824
  int64_t st = taosGetTimestampUs();
123,589✔
1825
  int32_t order = pInfo->binfo.inputTsOrder;
123,589✔
1826

1827
  SOperatorInfo* downstream = pOperator->pDownstream[0];
123,589✔
1828

1829
  pInfo->cleanGroupResInfo = false;
123,589✔
1830
  while (1) {
125,352✔
1831
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
248,941✔
1832
    if (pBlock == NULL) {
248,941✔
1833
      break;
123,589✔
1834
    }
1835

1836
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
125,352✔
1837
    if (pInfo->scalarSupp.pExprInfo != NULL) {
125,352✔
1838
      SExprSupp* pExprSup = &pInfo->scalarSupp;
672✔
1839
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
672✔
1840
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
672✔
1841
      QUERY_CHECK_CODE(code, lino, _end);
672✔
1842
    }
1843
    // the pDataBlock are always the same one, no need to call this again
1844
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
125,352✔
1845
    QUERY_CHECK_CODE(code, lino, _end);
125,352✔
1846

1847
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
125,352✔
1848
    QUERY_CHECK_CODE(code, lino, _end);
125,352✔
1849

1850
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
125,352✔
1851
  }
1852

1853
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
123,589✔
1854

1855
  // restore the value
1856
  pOperator->status = OP_RES_TO_RETURN;
123,589✔
1857

1858
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
123,589✔
1859
  QUERY_CHECK_CODE(code, lino, _end);
123,589✔
1860
  pInfo->cleanGroupResInfo = true;
123,589✔
1861

1862
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
123,589✔
1863
  QUERY_CHECK_CODE(code, lino, _end);
123,589✔
1864
  while (1) {
×
1865
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
123,589✔
1866
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
123,589✔
1867
    QUERY_CHECK_CODE(code, lino, _end);
123,589✔
1868

1869
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
123,589✔
1870
    if (!hasRemain) {
123,589✔
1871
      setOperatorCompleted(pOperator);
122,197✔
1872
      break;
122,197✔
1873
    }
1874

1875
    if (pBInfo->pRes->info.rows > 0) {
1,392✔
1876
      break;
1,392✔
1877
    }
1878
  }
1879
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
123,589✔
1880

1881
_end:
123,589✔
1882
  if (code != TSDB_CODE_SUCCESS) {
123,589✔
1883
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1884
    pTaskInfo->code = code;
×
1885
    T_LONG_JMP(pTaskInfo->env, code);
×
1886
  }
1887
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
123,589✔
1888
  return code;
123,589✔
1889
}
1890

1891
// Reset callback of the state-window operator: returns it to the not-opened state so that it
// can be executed again.
//
// Returns 0 on success, otherwise the code of the first of initExecTimeWindowInfo(),
// resetAggSup() and resetExprSupp() that failed. The remaining fields are reset either way.
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
  SStateWindowOperatorInfo* pStateInfo = pOper->info;
  SExecTaskInfo*            pTask = pOper->pTaskInfo;
  SStateWindowPhysiNode*    pNode = (SStateWindowPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pStateInfo->binfo);
  cleanupResultInfo(pStateInfo->pOperator->pTaskInfo, &pStateInfo->pOperator->exprSupp, &pStateInfo->groupResInfo,
                    &pStateInfo->aggSup, pStateInfo->cleanGroupResInfo);

  // rebuild the time-window column, then the aggregate and scalar supports; stop at the first failure
  colDataDestroy(&pStateInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pStateInfo->twAggSup.timeWindowData, &pTask->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pStateInfo->aggSup, pTask, pNode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                       &pTask->storageAPI.functionStore);
    if (code == 0) {
      code = resetExprSupp(&pStateInfo->scalarSup, pTask, pNode->window.pExprs, NULL,
                           &pTask->storageAPI.functionStore);
    }
  }

  // forget the previous state key and window position
  pStateInfo->cleanGroupResInfo = false;
  pStateInfo->hasKey = false;
  pStateInfo->winSup.lastTs = INT64_MIN;
  cleanupGroupResInfo(&pStateInfo->groupResInfo);
  memset(pStateInfo->stateKey.pData, 0, pStateInfo->stateKey.bytes);

  return code;
}
1920

1921
// todo make this as an non-blocking operator
1922
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
484,446✔
1923
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1924
  QRY_PARAM_CHECK(pOptrInfo);
484,446✔
1925

1926
  int32_t                   code = TSDB_CODE_SUCCESS;
484,446✔
1927
  int32_t                   lino = 0;
484,446✔
1928
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
484,446✔
1929
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
484,446✔
1930
  if (pInfo == NULL || pOperator == NULL) {
484,446✔
1931
    code = terrno;
×
1932
    goto _error;
×
1933
  }
1934

1935
  pOperator->pPhyNode = pStateNode;
484,446✔
1936
  pOperator->exprSupp.hasWindowOrGroup = true;
484,446✔
1937
  pOperator->exprSupp.hasWindow = true;
484,446✔
1938
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
484,446✔
1939
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
484,446✔
1940

1941
  if (pStateNode->window.pExprs != NULL) {
484,446✔
1942
    int32_t    numOfScalarExpr = 0;
402,611✔
1943
    SExprInfo* pScalarExprInfo = NULL;
402,611✔
1944
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
402,611✔
1945
    QUERY_CHECK_CODE(code, lino, _error);
402,611✔
1946

1947
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
402,611✔
1948
    if (code != TSDB_CODE_SUCCESS) {
402,611✔
1949
      goto _error;
×
1950
    }
1951
  }
1952

1953
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
484,446✔
1954
  pInfo->stateKey.type = pInfo->stateCol.type;
484,446✔
1955
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
484,446✔
1956
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
484,446✔
1957
  if (pInfo->stateKey.pData == NULL) {
484,446✔
1958
    goto _error;
×
1959
  }
1960
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
484,446✔
1961
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
484,446✔
1962

1963
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
484,446✔
1964
                            pTaskInfo->pStreamRuntimeInfo);
484,446✔
1965
  if (code != TSDB_CODE_SUCCESS) {
484,446✔
1966
    goto _error;
×
1967
  }
1968

1969
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
484,446✔
1970

1971
  int32_t    num = 0;
484,446✔
1972
  SExprInfo* pExprInfo = NULL;
484,446✔
1973
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
484,446✔
1974
  QUERY_CHECK_CODE(code, lino, _error);
484,446✔
1975

1976
  initResultSizeInfo(&pOperator->resultInfo, 4096);
484,446✔
1977

1978
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
968,892✔
1979
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
484,446✔
1980
  if (code != TSDB_CODE_SUCCESS) {
484,446✔
1981
    goto _error;
×
1982
  }
1983

1984
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
484,446✔
1985
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
484,446✔
1986
  initBasicInfo(&pInfo->binfo, pResBlock);
484,446✔
1987
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
484,446✔
1988

1989
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
484,446✔
1990
  QUERY_CHECK_CODE(code, lino, _error);
484,446✔
1991

1992
  pInfo->tsSlotId = tsSlotId;
484,446✔
1993
  pInfo->pOperator = pOperator;
484,446✔
1994
  pInfo->cleanGroupResInfo = false;
484,446✔
1995
  pInfo->extendOption = pStateNode->extendOption;
484,446✔
1996
  pInfo->trueForLimit = pStateNode->trueForLimit;
484,446✔
1997
  pInfo->winSup.lastTs = INT64_MIN;
484,446✔
1998

1999
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
484,446✔
2000
                  pTaskInfo);
2001
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
484,446✔
2002
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2003
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
484,446✔
2004

2005
  code = appendDownstream(pOperator, &downstream, 1);
484,446✔
2006
  if (code != TSDB_CODE_SUCCESS) {
484,446✔
2007
    goto _error;
×
2008
  }
2009

2010
  *pOptrInfo = pOperator;
484,446✔
2011
  return TSDB_CODE_SUCCESS;
484,446✔
2012

2013
_error:
×
2014
  if (pInfo != NULL) {
×
2015
    destroyStateWindowOperatorInfo(pInfo);
×
2016
  }
2017

2018
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2019
  pTaskInfo->code = code;
×
2020
  return code;
×
2021
}
2022

2023
void destroySWindowOperatorInfo(void* param) {
123,589✔
2024
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
123,589✔
2025
  if (pInfo == NULL) {
123,589✔
2026
    return;
×
2027
  }
2028

2029
  cleanupBasicInfo(&pInfo->binfo);
123,589✔
2030
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
123,589✔
2031
  if (pInfo->pOperator) {
123,589✔
2032
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
123,589✔
2033
                      pInfo->cleanGroupResInfo);
123,589✔
2034
    pInfo->pOperator = NULL;
123,589✔
2035
  }
2036

2037
  cleanupAggSup(&pInfo->aggSup);
123,589✔
2038
  cleanupExprSupp(&pInfo->scalarSupp);
123,589✔
2039

2040
  cleanupGroupResInfo(&pInfo->groupResInfo);
123,589✔
2041
  taosMemoryFreeClear(param);
123,589✔
2042
}
2043

2044
// Bring a session-window aggregate operator back to its "not opened" state so it can be executed again.
// Returns the first error reported while rebuilding the time-window column, the aggregate support or the
// scalar expression support; the plain state fields are reset even when one of those steps fails.
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
  SSessionAggOperatorInfo* pSess = pOper->info;
  SExecTaskInfo*           pTask = pOper->pTaskInfo;
  SSessionWinodwPhysiNode* pNode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;

  // drop the results produced by the previous run
  resetBasicOperatorState(&pSess->binfo);
  cleanupResultInfo(pSess->pOperator->pTaskInfo, &pSess->pOperator->exprSupp, &pSess->groupResInfo, &pSess->aggSup,
                    pSess->cleanGroupResInfo);

  // rebuild the time-window column, then the aggregate and scalar supports; stop at the first failure
  colDataDestroy(&pSess->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pSess->twAggSup.timeWindowData, &pTask->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pSess->aggSup, pTask, pNode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                       &pTask->storageAPI.functionStore);
    if (code == 0) {
      code = resetExprSupp(&pSess->scalarSupp, pTask, pNode->window.pExprs, NULL, &pTask->storageAPI.functionStore);
    }
  }

  // same initial values as the ones assigned when the operator is created
  pSess->cleanGroupResInfo = false;
  pSess->reptScan = false;
  pSess->winSup = (SWindowRowsSup){0};
  pSess->winSup.prevTs = INT64_MIN;

  cleanupGroupResInfo(&pSess->groupResInfo);
  return code;
}
2074

2075
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
123,589✔
2076
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2077
  QRY_PARAM_CHECK(pOptrInfo);
123,589✔
2078

2079
  int32_t                  code = TSDB_CODE_SUCCESS;
123,589✔
2080
  int32_t                  lino = 0;
123,589✔
2081
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
123,589✔
2082
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
123,589✔
2083
  if (pInfo == NULL || pOperator == NULL) {
123,589✔
2084
    code = terrno;
×
2085
    goto _error;
×
2086
  }
2087

2088
  pOperator->pPhyNode = pSessionNode;
123,589✔
2089
  pOperator->exprSupp.hasWindowOrGroup = true;
123,589✔
2090
  pOperator->exprSupp.hasWindow = true;
123,589✔
2091

2092
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
123,589✔
2093
  initResultSizeInfo(&pOperator->resultInfo, 4096);
123,589✔
2094

2095
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
123,589✔
2096
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
123,589✔
2097
  initBasicInfo(&pInfo->binfo, pResBlock);
123,589✔
2098

2099
  int32_t    numOfCols = 0;
123,589✔
2100
  SExprInfo* pExprInfo = NULL;
123,589✔
2101
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
123,589✔
2102
  QUERY_CHECK_CODE(code, lino, _error);
123,589✔
2103

2104
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
247,178✔
2105
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
123,589✔
2106
  QUERY_CHECK_CODE(code, lino, _error);
123,589✔
2107

2108
  pInfo->gap = pSessionNode->gap;
123,589✔
2109

2110
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
123,589✔
2111
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
123,589✔
2112
  QUERY_CHECK_CODE(code, lino, _error);
123,589✔
2113

2114
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
123,589✔
2115
  pInfo->binfo.pRes = pResBlock;
123,589✔
2116
  pInfo->winSup.prevTs = INT64_MIN;
123,589✔
2117
  pInfo->reptScan = false;
123,589✔
2118
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
123,589✔
2119
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
123,589✔
2120

2121
  if (pSessionNode->window.pExprs != NULL) {
123,589✔
2122
    int32_t    numOfScalar = 0;
224✔
2123
    SExprInfo* pScalarExprInfo = NULL;
224✔
2124
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
224✔
2125
    QUERY_CHECK_CODE(code, lino, _error);
224✔
2126

2127
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
224✔
2128
    QUERY_CHECK_CODE(code, lino, _error);
224✔
2129
  }
2130

2131
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
123,589✔
2132
                            pTaskInfo->pStreamRuntimeInfo);
123,589✔
2133
  QUERY_CHECK_CODE(code, lino, _error);
123,589✔
2134

2135
  pInfo->pOperator = pOperator;
123,589✔
2136
  pInfo->cleanGroupResInfo = false;
123,589✔
2137
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
123,589✔
2138
                  pInfo, pTaskInfo);
2139
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
123,589✔
2140
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2141
  pOperator->pTaskInfo = pTaskInfo;
123,589✔
2142
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
123,589✔
2143

2144
  code = appendDownstream(pOperator, &downstream, 1);
123,589✔
2145
  QUERY_CHECK_CODE(code, lino, _error);
123,589✔
2146

2147
  *pOptrInfo = pOperator;
123,589✔
2148
  return TSDB_CODE_SUCCESS;
123,589✔
2149

2150
_error:
×
2151
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2152
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2153
  pTaskInfo->code = code;
×
2154
  return code;
×
2155
}
2156

2157
void destroyMAIOperatorInfo(void* param) {
492,075✔
2158
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
492,075✔
2159
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
492,075✔
2160
  taosMemoryFreeClear(param);
492,075✔
2161
}
492,075✔
2162

2163
// Allocate one result row from the aggregate supporter's result buffer and record its position as the
// current row of `pResultRowInfo`.
// Returns the new row, or NULL when getNewResultRow() fails (in which case `cur` is left untouched).
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    SResultRowPosition pos = {.pageId = pRow->pageId, .offset = pRow->offset};
    pResultRowInfo->cur = pos;
  }

  return pRow;
}
2171

2172
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
386,569,879✔
2173
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2174
  if (*pResult == NULL) {
386,569,879✔
2175
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
435,376✔
2176
    if (*pResult == NULL) {
435,376✔
2177
      return terrno;
×
2178
    }
2179
  }
2180

2181
  // set time window for current result
2182
  (*pResult)->win = (*win);
386,570,245✔
2183
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
386,569,696✔
2184
}
2185

2186
// Aggregate one input block for the merge-aligned interval operator.
//
// Rows are grouped by identical timestamp: a window starts at a row's timestamp (miaInfo->curTs) and ends one
// tick before skey + interval. Whenever the timestamp changes, the rows collected so far are fed to the
// aggregate functions, the single shared result row is finalized into `pResultBlock`, reset, and re-bound to
// the next window. The window that is still open when the block ends is NOT finalized here; it stays pending
// in miaInfo->curTs / miaInfo->pResultRow so that a following block can continue it.
//
// Errors are reported by long-jumping through pTaskInfo->env.
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  if (miaInfo->curTs != INT64_MIN) {
    // a window is pending from a previous block: close it unless this block continues the same timestamp
    if (ts != miaInfo->curTs) {
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow currWin = {0};
  currWin.skey = miaInfo->curTs;
  currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // timestamp changed: rows [startPos, currPos) belong to the window that just ended
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));

    // open the next window at the new timestamp
    miaInfo->curTs = tsCols[currPos];
    currWin.skey = miaInfo->curTs;
    currWin.ekey =
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
    startPos = currPos;

    // bind the result row to the window that was just opened (not to the first window of the block)
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // feed the trailing rows to the window that is still open; it is finalized later by the caller
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
1,764,789✔
2259

2260
// Called once the results of a group have been produced: tag the result block with the id of the group that
// just finished and clear the per-group state (no pending timestamp, no current group).
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // publish the finished group's id before the state below forgets it
  pRes->info.id.groupId = pMiaInfo->groupId;

  pMiaInfo->groupId = 0;
  pMiaInfo->curTs = INT64_MIN;
}
1,122,711✔
2265

2266
// Pull blocks from the downstream operator and aggregate them into the operator's result block.
//
// The loop stops when
//   - the downstream is exhausted (the pending window is flushed and the operator is marked completed),
//   - a block of a different group arrives (the pending window of the finished group is flushed, the block is
//     parked in prefetchedBlock for the next call, and the loop ends unless filtering left no rows), or
//   - the result block has reached the operator's capacity.
//
// Any failure is logged, stored in pTaskInfo->code and reported by long-jumping through pTaskInfo->env.
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that opened a new group during the previous round
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close the last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // a new group starts: the previous group must still have an unclosed time window to flush
        if (pMiaInfo->curTs == INT64_MIN) {
          // jump with the error that was just recorded rather than with an unrelated terrno value
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering for last group, the result is empty, so we need to continue to process next group
          continue;
        } else {
          break;
        }
      } else {
        // same group as before: keep accumulating
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
1,586,423✔
2356

2357
// "get next" callback of the merge-aligned interval operator.
//
// Stores the next result block in *ppRes, or NULL when the operator is already done or produced no rows.
// When result-block merging is enabled, aggregation rounds are repeated until the block holds at least
// `threshold` rows or the operator completes; otherwise exactly one round is executed.
// Always returns TSDB_CODE_SUCCESS; failures inside the aggregation are reported via long jump.
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
  SSDataBlock*                          pRes = pIaInfo->binfo.pRes;

  blockDataCleanup(pRes);

  if (!pIaInfo->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    while (pOperator->status != OP_EXEC_DONE && pRes->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pRes;
  }
  return code;
}
2391

2392
// Reset the merge-aligned interval operator so it can be executed again: forget the current group, the
// pending timestamp, the prefetched block and the shared result row, then reset the nested interval state.
// Returns the result of resetInterval().
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;

  pInfo->groupId = 0;
  pInfo->curTs = INT64_MIN;  // INT64_MIN marks "no open window"
  pInfo->prefetchedBlock = NULL;
  pInfo->pResultRow = NULL;

  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
}
2407

2408
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
492,075✔
2409
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2410
  QRY_PARAM_CHECK(pOptrInfo);
492,075✔
2411

2412
  int32_t                               code = TSDB_CODE_SUCCESS;
492,075✔
2413
  int32_t                               lino = 0;
492,075✔
2414
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
492,075✔
2415
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
492,075✔
2416
  if (miaInfo == NULL || pOperator == NULL) {
492,075✔
2417
    code = terrno;
×
2418
    goto _error;
×
2419
  }
2420

2421
  pOperator->pPhyNode = pNode;
492,075✔
2422
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
492,075✔
2423
  if (miaInfo->intervalAggOperatorInfo == NULL) {
492,075✔
2424
    code = terrno;
×
2425
    goto _error;
×
2426
  }
2427

2428
  SInterval interval = {.interval = pNode->interval,
1,476,225✔
2429
                        .sliding = pNode->sliding,
492,075✔
2430
                        .intervalUnit = pNode->intervalUnit,
492,075✔
2431
                        .slidingUnit = pNode->slidingUnit,
492,075✔
2432
                        .offset = pNode->offset,
492,075✔
2433
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
492,075✔
2434
                        .timeRange = pNode->timeRange};
2435
  calcIntervalAutoOffset(&interval);
492,075✔
2436

2437
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
492,075✔
2438
  SExprSupp*                pSup = &pOperator->exprSupp;
492,075✔
2439
  pSup->hasWindowOrGroup = true;
492,075✔
2440
  pSup->hasWindow = true;
492,075✔
2441

2442
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
492,075✔
2443
                            pTaskInfo->pStreamRuntimeInfo);
492,075✔
2444
  QUERY_CHECK_CODE(code, lino, _error);
492,075✔
2445

2446
  miaInfo->curTs = INT64_MIN;
492,075✔
2447
  iaInfo->win = pTaskInfo->window;
492,075✔
2448
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
492,075✔
2449
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
492,075✔
2450
  iaInfo->interval = interval;
492,075✔
2451
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
492,075✔
2452
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
492,075✔
2453

2454
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
492,075✔
2455
  initResultSizeInfo(&pOperator->resultInfo, 512);
492,075✔
2456

2457
  int32_t    num = 0;
492,075✔
2458
  SExprInfo* pExprInfo = NULL;
492,075✔
2459
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
492,075✔
2460
  QUERY_CHECK_CODE(code, lino, _error);
492,075✔
2461

2462
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
984,150✔
2463
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
492,075✔
2464
  QUERY_CHECK_CODE(code, lino, _error);
492,075✔
2465

2466
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
492,075✔
2467
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
492,075✔
2468
  initBasicInfo(&iaInfo->binfo, pResBlock);
492,075✔
2469
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
492,075✔
2470
  QUERY_CHECK_CODE(code, lino, _error);
492,075✔
2471

2472
  iaInfo->timeWindowInterpo = false;
492,075✔
2473
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
492,075✔
2474
  QUERY_CHECK_CODE(code, lino, _error);
492,075✔
2475
  if (iaInfo->timeWindowInterpo) {
492,075✔
2476
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2477
  }
2478

2479
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
492,075✔
2480
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
492,075✔
2481
  QUERY_CHECK_CODE(code, lino, _error);
492,075✔
2482
  iaInfo->pOperator = pOperator;
492,075✔
2483
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
492,075✔
2484
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2485

2486
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
492,075✔
2487
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2488
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
492,075✔
2489

2490
  code = appendDownstream(pOperator, &downstream, 1);
492,075✔
2491
  QUERY_CHECK_CODE(code, lino, _error);
492,075✔
2492

2493
  *pOptrInfo = pOperator;
492,075✔
2494
  return TSDB_CODE_SUCCESS;
492,075✔
2495

2496
_error:
×
2497
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2498
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2499
  pTaskInfo->code = code;
×
2500
  return code;
×
2501
}
2502

2503
//=====================================================================================================================
2504
// merge interval operator
2505
typedef struct SMergeIntervalAggOperatorInfo {
2506
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2507
  SList*                   groupIntervals;
2508
  SListIter                groupIntervalsIter;
2509
  bool                     hasGroupId;
2510
  uint64_t                 groupId;
2511
  SSDataBlock*             prefetchedBlock;
2512
  bool                     inputBlocksFinished;
2513
} SMergeIntervalAggOperatorInfo;
2514

2515
typedef struct SGroupTimeWindow {
2516
  uint64_t    groupId;
2517
  STimeWindow window;
2518
} SGroupTimeWindow;
2519

2520
void destroyMergeIntervalOperatorInfo(void* param) {
×
2521
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2522
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2523
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2524

2525
  taosMemoryFreeClear(param);
×
2526
}
×
2527

2528
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2529
                                        STimeWindow* newWin) {
2530
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2531
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2532
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2533

2534
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2535
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2536
  if (code != TSDB_CODE_SUCCESS) {
×
2537
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2538
    return code;
×
2539
  }
2540

2541
  SListIter iter = {0};
×
2542
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2543
  SListNode* listNode = NULL;
×
2544
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2545
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2546
    if (prevGrpWin->groupId != tableGroupId) {
×
2547
      continue;
×
2548
    }
2549

2550
    STimeWindow* prevWin = &prevGrpWin->window;
×
2551
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2552
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2553
      taosMemoryFreeClear(tmp);
×
2554
    }
2555
  }
2556

2557
  return TSDB_CODE_SUCCESS;
×
2558
}
2559

2560
// Aggregate one input block for the merge interval operator.
//
// The block is walked window by window: the window containing the block's first timestamp is processed
// first, then getNextQualifiedWindow() is asked repeatedly for the next window that has rows in the block.
// For every window the result row is looked up or created, the matching rows are fed to the aggregate
// functions, the window is closed and recorded through outputPrevIntervalResult(). When interpolation is
// enabled, window borders are interpolated and the last row of the block is saved for the next block.
//
// Errors are reported by long-jumping through pTaskInfo->env.
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      pIaInfo = &pMergeInfo->intervalAggOperatorInfo;
  SExecTaskInfo*                 pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*                     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     rowPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pIaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pIaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pRow = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;

  // ---- first window: the one that contains the first timestamp of the block ----
  STimeWindow win = getActiveTimeWindow(pIaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &pIaInfo->interval,
                                        pIaInfo->binfo.inputTsOrder);

  code = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pRow, tableGroupId, pExprSup->pCtx,
                                numOfOutput, pExprSup->rowEntryInfoOffset, &pIaInfo->aggSup, pTaskInfo);
  if (code != TSDB_CODE_SUCCESS || pRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  TSKEY   boundaryKey = ascScan ? win.ekey : win.skey;
  int32_t numRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, rowPos, boundaryKey, binarySearchForKey, NULL,
                                             pIaInfo->binfo.inputTsOrder);
  if (numRows <= 0) {
    // the first window must contain at least the first row of the block
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // prev time window not interpolation yet.
  if (pIaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pRow, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    code = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pRow, tableGroupId, pExprSup->pCtx,
                                  numOfOutput, pExprSup->rowEntryInfoOffset, &pIaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // window start key interpolation
    code = doWindowBorderInterpolation(pIaInfo, pBlock, pRow, &win, rowPos, numRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  updateTimeWindowInfo(&pIaInfo->twAggSup.timeWindowData, &win, 1);
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pIaInfo->twAggSup.timeWindowData, rowPos,
                                         numRows, pBlock->info.rows, numOfOutput);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }
  doCloseWindow(pResultRowInfo, pIaInfo, pRow);

  // output previous interval results after this interval (&win) is closed
  code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // ---- remaining windows covered by this block ----
  STimeWindow nextWin = win;
  for (;;) {
    int32_t prevEndPos = numRows - 1 + rowPos;
    rowPos = getNextQualifiedWindow(&pIaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                    pIaInfo->binfo.inputTsOrder);
    if (rowPos < 0) {
      break;  // no further window has rows in this block
    }

    // null data, failed to allocate more memory buffer
    code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pRow, tableGroupId,
                                  pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &pIaInfo->aggSup,
                                  pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    boundaryKey = ascScan ? nextWin.ekey : nextWin.skey;
    numRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, rowPos, boundaryKey, binarySearchForKey, NULL,
                                       pIaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(pIaInfo, pBlock, pRow, &nextWin, rowPos, numRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&pIaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pIaInfo->twAggSup.timeWindowData, rowPos,
                                           numRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, pIaInfo, pRow);

    // output previous interval results after this interval (&nextWin) is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // keep the last row around so the next block can interpolate against it
  if (pIaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pIaInfo->pPrevValues, pBlock, pIaInfo->pInterpCols);
  }
}
×
2674

2675
/**
 * Produce the next result block of the merge-interval aggregate operator.
 *
 * While the input is not finished, blocks are taken either from miaInfo->prefetchedBlock (if one
 * was stashed by a previous call) or from downstream 0, and each block is passed to
 * doMergeIntervalAggImpl(). The fetch loop stops when:
 *   - the downstream returns NULL (input is marked finished and the group-interval iterator is
 *     initialized),
 *   - a block with a different group id arrives (it is stashed in miaInfo->prefetchedBlock), or
 *   - the result block holds at least pOperator->resultInfo.threshold rows.
 *
 * @param pOperator  operator whose `info` is a SMergeIntervalAggOperatorInfo
 * @param ppRes      out: the operator's result block, or NULL when it contains no rows or the
 *                   operator status is already OP_EXEC_DONE
 * @return TSDB_CODE_SUCCESS on the normal path. On failure the error is logged, stored in
 *         pTaskInfo->code and handed to T_LONG_JMP.
 */
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  // Declared before any `goto _end` so that the error path never reads an indeterminate value.
  size_t         rows = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block that ended the previous call; it starts a new group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        // input exhausted: prepare to walk the per-group interval list
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // group switch: keep the block for the next call and return what has been produced so far
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    // each call advances the iterator by one node and stamps that node's group id on the result
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2759

2760
/**
 * Reset-state callback of the merge-interval operator.
 *
 * Empties the per-group interval list, clears the group/prefetch/input-finished bookkeeping of
 * the SMergeIntervalAggOperatorInfo, and then delegates to resetInterval() for the embedded
 * interval operator info.
 *
 * @param pOper  operator whose `info` is a SMergeIntervalAggOperatorInfo
 * @return the result of resetInterval()
 */
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOper->info;

  tdListEmpty(pMergeInfo->groupIntervals);

  pMergeInfo->inputBlocksFinished = false;
  pMergeInfo->prefetchedBlock = NULL;
  pMergeInfo->groupId = 0;
  pMergeInfo->hasGroupId = false;

  return resetInterval(pOper, &pMergeInfo->intervalAggOperatorInfo);
}
2772

2773
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2774
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2775
  QRY_PARAM_CHECK(pOptrInfo);
×
2776

2777
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2778
  int32_t                        lino = 0;
×
2779
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2780
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2781
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2782
    code = terrno;
×
2783
    goto _error;
×
2784
  }
2785

2786
  pOperator->pPhyNode = pIntervalPhyNode;
×
2787
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2788
                        .sliding = pIntervalPhyNode->sliding,
×
2789
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2790
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2791
                        .offset = pIntervalPhyNode->offset,
×
2792
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2793
                        .timeRange = pIntervalPhyNode->timeRange};
2794
  calcIntervalAutoOffset(&interval);
×
2795

2796
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2797

2798
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2799
  pIntervalInfo->win = pTaskInfo->window;
×
2800
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2801
  pIntervalInfo->interval = interval;
×
2802
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2803
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2804
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2805

2806
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2807
  pExprSupp->hasWindowOrGroup = true;
×
2808
  pExprSupp->hasWindow = true;
×
2809

2810
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2811
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2812

2813
  int32_t    num = 0;
×
2814
  SExprInfo* pExprInfo = NULL;
×
2815
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2816
  QUERY_CHECK_CODE(code, lino, _error);
×
2817

2818
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2819
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2820
  if (code != TSDB_CODE_SUCCESS) {
×
2821
    goto _error;
×
2822
  }
2823

2824
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2825
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2826
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2827
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2828
  QUERY_CHECK_CODE(code, lino, _error);
×
2829

2830
  pIntervalInfo->timeWindowInterpo = false;
×
2831
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2832
  QUERY_CHECK_CODE(code, lino, _error);
×
2833
  if (pIntervalInfo->timeWindowInterpo) {
×
2834
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2835
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2836
      goto _error;
×
2837
    }
2838
  }
2839

2840
  pIntervalInfo->pOperator = pOperator;
×
2841
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2842
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2843
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2844
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2845
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2846
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2847

2848
  code = appendDownstream(pOperator, &downstream, 1);
×
2849
  if (code != TSDB_CODE_SUCCESS) {
×
2850
    goto _error;
×
2851
  }
2852

2853
  *pOptrInfo = pOperator;
×
2854
  return TSDB_CODE_SUCCESS;
×
2855
_error:
×
2856
  if (pMergeIntervalInfo != NULL) {
×
2857
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2858
  }
2859
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2860
  pTaskInfo->code = code;
×
2861
  return code;
×
2862
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc