• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4914

06 Jan 2026 01:30AM UTC coverage: 64.876% (-0.008%) from 64.884%
#4914

push

travis-ci

web-flow
merge: from main to 3.0 branch #34167

180 of 319 new or added lines in 14 files covered. (56.43%)

3475 existing lines in 124 files now uncovered.

194993 of 300563 relevant lines covered (64.88%)

116239151.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.53
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
// Selects which border of a result row's time window an interpolation step refers to.
typedef enum SResultTsInterpType {
  RESULT_ROW_START_INTERP = 1,  // start border; tracked by SResultRow::startInterp
  RESULT_ROW_END_INTERP = 2,    // end border; tracked by SResultRow::endInterp
} SResultTsInterpType;
36

37
// Payload stored in the nodes of SResultRowInfo::openWindow: identifies one result row (by its position in the
// result buffer) together with the group it belongs to.
typedef struct SOpenWindowInfo {
  SResultRowPosition pos;      // pageId/offset of the result row
  uint64_t           groupId;  // group id the result row was created for
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47

48
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
49
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
50
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
51
                                      SExecTaskInfo* pTaskInfo) {
52
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
53
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
54

55
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
56
    *pResult = NULL;
1✔
57
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
58
    return pTaskInfo->code;
×
59
  }
60

61
  // set time window for current result
62
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
63
  *pResult = pResultRow;
2,147,483,647✔
64
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
65
}
66

67
/**
 * @brief Append one row with timestamp `ts` to the window currently tracked by pRowSup.
 *
 * The window end key and the previous timestamp both become `ts`, the group id is refreshed and the row counter
 * grows by one. If hasContinuousNullRows() reports pending null rows, they are added to the counter as well and
 * the pending count is reset.
 *
 * @note rowIndex is part of the signature but is not used here.
 */
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
  pRowSup->numOfRows++;

  if (!hasContinuousNullRows(pRowSup)) {
    return;
  }

  // null-state rows enclosed by rows of the same state are counted into the current window
  pRowSup->numOfRows += pRowSup->numNullRows;
  resetNumNullRows(pRowSup);
}
89,596,630✔
79

80
/**
 * @brief Start tracking a new window in pRowSup that begins at row `rowIndex`.
 *
 * The window start key is taken from tsList[rowIndex], the row counter restarts from zero and the pending
 * null-row count is reset.
 */
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
  const int64_t startTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = startTs;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;
  resetNumNullRows(pRowSup);
}
5,371,537✔
87

88
/**
 * @brief Count how many rows, starting at `pos`, can be consumed for a window whose boundary key is `ekey`.
 *
 * searchFn is run on the sub-array that begins at pData[pos]; its result is therefore an offset relative to
 * `pos`. Rows at and after that offset whose timestamp equals `ekey` are counted as well.
 *
 * @param numOfRows total number of rows in pData
 * @param order     scan order, forwarded unchanged to searchFn
 * @return number of rows to step forward; 0 when searchFn reports no match (negative result)
 */
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
                                            int32_t order, int64_t* pData) {
  // the ascending and descending cases perform exactly the same steps; only searchFn interprets `order`
  int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
  if (end < 0) {
    return 0;
  }

  int32_t forwardRows = end;

  // include the rows that sit exactly on the boundary key, never reading past the last row of the block
  for (int32_t idx = pos + end; idx < numOfRows && pData[idx] == ekey; ++idx) {
    forwardRows += 1;
  }

  return forwardRows;
}
124

125
/**
 * @brief Binary search over an array of `num` TSKEY values stored at pValue.
 *
 * For TSDB_ORDER_DESC the array is probed as a descending sequence and the result is the first position whose
 * value is not larger than `key`; for any other order it is probed as an ascending sequence and the result is
 * the first position whose value is not smaller than `key`.
 *
 * @return the matching position, or -1 when `num` is not positive or no such position exists
 */
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
  if (num <= 0) {
    return -1;
  }

  const TSKEY* keys = (const TSKEY*)pValue;
  int32_t      lo = 0;
  int32_t      hi = num - 1;

  if (order == TSDB_ORDER_DESC) {
    for (;;) {
      if (keys[lo] <= key) {
        return lo;
      }
      if (keys[hi] == key) {
        return hi;
      }
      if (keys[hi] > key) {
        // everything in [lo, hi] is larger than key: the answer is the slot right after hi, if there is one
        int32_t next = hi + 1;
        return (next < num) ? next : -1;
      }

      int32_t mid = lo + ((hi - lo + 1) >> 1);
      if (keys[mid] > key) {
        lo = mid + 1;
      } else if (keys[mid] < key) {
        hi = mid - 1;
      } else {
        return mid;
      }
    }
  }

  for (;;) {
    if (keys[lo] >= key) {
      return lo;
    }
    if (keys[hi] == key) {
      return hi;
    }
    if (keys[hi] < key) {
      // everything in [lo, hi] is smaller than key: the answer is the slot right after hi, if there is one
      int32_t next = hi + 1;
      return (next < num) ? next : -1;
    }

    int32_t mid = lo + ((hi - lo + 1) >> 1u);
    if (keys[mid] > key) {
      hi = mid - 1;
    } else if (keys[mid] < key) {
      lo = mid + 1;
    } else {
      return mid;
    }
  }
}
193

194
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,042,833,435✔
195
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
196
  int32_t num = -1;
2,042,833,435✔
197
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,042,833,435✔
198

199
  if (order == TSDB_ORDER_ASC) {
2,042,833,435✔
200
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
1,702,558,901✔
201
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
1,691,542,019✔
202
      if (item != NULL) {
1,687,579,622✔
203
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
204
      }
205
    } else {
206
      num = pDataBlockInfo->rows - startPos;
9,931,306✔
207
      if (item != NULL) {
11,814,756✔
208
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
209
      }
210
    }
211
  } else {  // desc
212
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
340,274,534✔
213
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
347,810,068✔
214
      if (item != NULL) {
347,805,935✔
215
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
216
      }
217
    } else {
218
      num = pDataBlockInfo->rows - startPos;
510,837✔
219
      if (item != NULL) {
1,388,789✔
220
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
221
      }
222
    }
223
  }
224

225
  return num;
2,044,143,033✔
226
}
227

228
/**
 * @brief Compute the interpolated value at `windowKey` for every function that needs window-border interpolation
 *        and store it in the start or end point of the function context.
 *
 * For each expression in pSup:
 *  - functions for which fmIsIntervalInterpoFunc() is false only get start.key = INT64_MIN;
 *  - otherwise the value of the function's first parameter column is read at the previous point and at the
 *    current row, converted to double, and linearly interpolated at windowKey (the interpolation call is skipped
 *    for functions matched by fmIsElapsedFunc(), leaving the value at 0).
 *
 * @param pPrevValues   saved values; used only when prevRowIndex == -1. Slots starting at 1 are consumed one
 *                      per interpolating function, in expression order.
 * @param prevRowIndex  row of the previous point inside pDataBlock, or -1 to take it from pPrevValues
 * @param type          RESULT_ROW_START_INTERP writes ctx.start, anything else writes ctx.end
 */
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
  SqlFunctionCtx* pCtxList = pSup->pCtx;
  int32_t         prevValIdx = 1;

  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
    SqlFunctionCtx* pFn = &pCtxList[i];

    if (!fmIsIntervalInterpoFunc(pFn->functionId)) {
      pFn->start.key = INT64_MIN;
      continue;
    }

    SFunctParam*     pFirstParam = &pFn->param[0];
    SColumnInfoData* pCol = taosArrayGet(pDataBlock, pFirstParam->pCol->slotId);

    double prevVal = 0;
    double curVal = 0;
    double interpVal = 0;

    if (prevRowIndex != -1) {
      GET_TYPED_DATA(prevVal, double, pCol->info.type, colDataGetData(pCol, prevRowIndex),
                     typeGetTypeModFromColInfo(&pCol->info));
    } else {
      SGroupKeys* pSaved = taosArrayGet(pPrevValues, prevValIdx);
      GET_TYPED_DATA(prevVal, double, pCol->info.type, pSaved->pData, typeGetTypeModFromColInfo(&pCol->info));
    }

    GET_TYPED_DATA(curVal, double, pCol->info.type, colDataGetData(pCol, curRowIndex),
                   typeGetTypeModFromColInfo(&pCol->info));

    SPoint from = (SPoint){.key = prevTs, .val = &prevVal};
    SPoint to = (SPoint){.key = curTs, .val = &curVal};
    SPoint target = (SPoint){.key = windowKey, .val = &interpVal};

    if (!fmIsElapsedFunc(pFn->functionId)) {
      taosGetLinearInterpolationVal(&target, TSDB_DATA_TYPE_DOUBLE, &from, &to, TSDB_DATA_TYPE_DOUBLE, 0);
    }

    if (type == RESULT_ROW_START_INTERP) {
      pFn->start.key = target.key;
      pFn->start.val = interpVal;
    } else {
      pFn->end.key = target.key;
      pFn->end.val = interpVal;
    }

    prevValIdx += 1;
  }
}
9,720,480✔
299

300
/**
 * @brief Mark one border of every function context as "not interpolated" by setting its key to INT64_MIN.
 *
 * @param type RESULT_ROW_START_INTERP resets start.key; any other value resets end.key
 */
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
  const bool startSide = (type == RESULT_ROW_START_INTERP);

  for (int32_t i = 0; i < numOfOutput; ++i) {
    if (startSide) {
      pCtx[i].start.key = INT64_MIN;
    } else {
      pCtx[i].end.key = INT64_MIN;
    }
  }
}
626,972✔
311

312
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,173,726✔
313
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
314
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,173,726✔
315

316
  TSKEY curTs = tsCols[pos];
5,173,726✔
317

318
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,173,726✔
319
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,173,726✔
320

321
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
322
  // start exactly from this point, no need to do interpolation
323
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,173,726✔
324
  if (key == curTs) {
5,173,726✔
325
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
331,673✔
326
    return true;
331,673✔
327
  }
328

329
  // it is the first time window, no need to do interpolation
330
  if (pTsKey->isNull && pos == 0) {
4,842,053✔
331
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
8,120✔
332
  } else {
333
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
4,833,933✔
334
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
4,833,933✔
335
                              RESULT_ROW_START_INTERP, pSup);
336
  }
337

338
  return true;
4,842,053✔
339
}
340

341
/**
 * @brief Prepare the end-border interpolation of window `win`.
 *
 * The border is win->ekey for ascending input and win->skey otherwise.
 *
 * @param endRowIndex   last row of the block that belongs to the window
 * @param nextRowIndex  row that follows endRowIndex; must not be negative when interpolation is needed
 * @param blockEkey     last key of the block in scan direction
 * @param pRes          out: false when the window continues beyond this block (end keys are reset and nothing
 *                      is interpolated), true when the end border has been handled. Not written on error.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when nextRowIndex is negative
 */
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
  const int32_t tsOrder = pInfo->binfo.inputTsOrder;
  const bool    asc = (tsOrder == TSDB_ORDER_ASC);
  const bool    desc = (tsOrder == TSDB_ORDER_DESC);

  const TSKEY lastRowTs = tsCols[endRowIndex];
  const TSKEY borderKey = asc ? win->ekey : win->skey;

  // the window does not end inside this block: interpolation has to wait for the next block
  if ((asc && borderKey > blockEkey) || (desc && borderKey < blockEkey)) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = false;
    return TSDB_CODE_SUCCESS;
  }

  // a row sits exactly on the border: its value is the border value
  if (borderKey == lastRowTs) {
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
    *pRes = true;
    return TSDB_CODE_SUCCESS;
  }

  if (nextRowIndex < 0) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  const TSKEY nextRowTs = tsCols[nextRowIndex];
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, lastRowTs, endRowIndex, nextRowTs, nextRowIndex, borderKey,
                            RESULT_ROW_END_INTERP, pSup);
  *pRes = true;
  return TSDB_CODE_SUCCESS;
}
376

377
/**
 * @brief Tell whether window pWin has to be calculated given the range [calStart, calEnd].
 *
 * When interval == sliding every window qualifies. Otherwise a window is rejected if it lies completely outside
 * the range, or - for STREAM_PULL_DATA blocks only - if it starts before calStart.
 */
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd,
                        EStreamType blockType) {
  if (pInterval->interval == pInterval->sliding) {
    return true;
  }

  if (pWin->ekey < calStart || pWin->skey > calEnd) {
    return false;
  }

  if (blockType == STREAM_PULL_DATA && pWin->skey < calStart) {
    return false;
  }

  return true;
}
385

386
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
387
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey, pBlockInfo->type);
2,147,483,647✔
388
}
389

390
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
391
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
392
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
393

394
  int32_t precision = pInterval->precision;
2,147,483,647✔
395
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
396

397
  // next time window is not in current block
398
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
399
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
400
    return -1;
9,391,669✔
401
  }
402

403
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
404
    return -1;
×
405
  }
406

407
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
408
  int32_t startPos = 0;
2,147,483,647✔
409

410
  // tumbling time window query, a special case of sliding time window query
411
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
412
    startPos = prevPosition + 1;
2,147,483,647✔
413
  } else {
414
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
79,099,895✔
415
      startPos = 0;
7,376,059✔
416
    } else {
417
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
72,886,483✔
418
    }
419
  }
420
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
421
    return -1;
2,147,483,647✔
422
  }
423

424
  /* interp query with fill should not skip time window */
425
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
426
  //    return startPos;
427
  //  }
428

429
  /*
430
   * This time window does not cover any data, try next time window,
431
   * this case may happen when the time window is too small
432
   */
433
  if (primaryKeys != NULL) {
2,108,780,270✔
434
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
435
      TSKEY next = primaryKeys[startPos];
1,458,043,593✔
436
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,458,391,864✔
437
        pNext->skey = taosTimeTruncate(next, pInterval);
489,084✔
438
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,480✔
439
      } else {
440
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,458,877,992✔
441
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,459,068,037✔
442
      }
443
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
652,701,501✔
444
      TSKEY next = primaryKeys[startPos];
346,850,014✔
445
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
346,989,493✔
446
        pNext->skey = taosTimeTruncate(next, pInterval);
584,366✔
447
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
448
      } else {
449
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
346,441,250✔
450
        pNext->ekey = pNext->skey + pInterval->interval - 1;
347,004,772✔
451
      }
452
    }
453
  }
454

455
  return startPos;
2,106,517,440✔
456
}
457

458
/**
 * @brief Report whether the given border of the result row has already been handled for interpolation.
 *
 * @param type RESULT_ROW_START_INTERP checks startInterp; any other value checks endInterp
 */
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
  return (type == RESULT_ROW_START_INTERP) ? (pResult->startInterp == true) : (pResult->endInterp == true);
}
465

466
/**
 * @brief Flag the given border of the result row as handled for interpolation.
 *
 * @param type RESULT_ROW_START_INTERP sets startInterp; any other value sets endInterp
 */
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
  if (type != RESULT_ROW_START_INTERP) {
    pResult->endInterp = true;
    return;
  }

  pResult->startInterp = true;
}
10,335,387✔
473

474
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
475
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
476
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
477
  int32_t lino = 0;
2,147,483,647✔
478
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
479
    return code;
2,147,483,647✔
480
  }
481

482
  if (pBlock == NULL) {
3,332,566✔
483
    code = TSDB_CODE_INVALID_PARA;
×
484
    return code;
×
485
  }
486

487
  if (pBlock->pDataBlock == NULL) {
3,332,566✔
488
    return code;
×
489
  }
490

491
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,173,726✔
492

493
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,173,726✔
494
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,173,726✔
495
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,173,726✔
496
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,173,726✔
497
    if (interp) {
5,173,726✔
498
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,173,726✔
499
    }
500
  } else {
501
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
×
502
  }
503

504
  // point interpolation does not require the end key time window interpolation.
505
  // interpolation query does not generate the time window end interpolation
506
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,173,726✔
507
  if (!done) {
5,173,726✔
508
    int32_t endRowIndex = startPos + forwardRows - 1;
5,173,726✔
509
    int32_t nextRowIndex = endRowIndex + 1;
5,173,726✔
510

511
    // duplicated ts row does not involve in the interpolation of end value for current time window
512
    int32_t x = endRowIndex;
5,173,726✔
513
    while (x > 0) {
5,188,406✔
514
      if (tsCols[x] == tsCols[x - 1]) {
5,176,947✔
515
        x -= 1;
14,680✔
516
      } else {
517
        endRowIndex = x;
5,162,267✔
518
        break;
5,162,267✔
519
      }
520
    }
521

522
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,173,726✔
523
    bool  interp = false;
5,173,726✔
524
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,173,726✔
525
                                           win, &interp);
526
    QUERY_CHECK_CODE(code, lino, _end);
5,173,726✔
527
    if (interp) {
5,173,726✔
528
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,161,661✔
529
    }
530
  } else {
531
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
532
  }
533

534
_end:
5,173,726✔
535
  if (code != TSDB_CODE_SUCCESS) {
5,173,726✔
536
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
537
  }
538
  return code;
5,173,726✔
539
}
540

541
/**
 * @brief Remember, for every tracked column, the last non-null value found in pBlock.
 *
 * Entry k of pPrevKeys receives the value of the column described by entry k of pCols. Columns that contain
 * only null values in this block keep their previously saved value. Nothing is done when the block has no
 * column array.
 */
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
  if (pBlock->pDataBlock == NULL) {
    return;
  }

  size_t numOfKeys = taosArrayGetSize(pPrevKeys);
  for (int32_t idx = 0; idx < numOfKeys; ++idx) {
    SColumn*         pCol = taosArrayGet(pCols, idx);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    SGroupKeys*      pSaved = taosArrayGet(pPrevKeys, idx);

    // walk backwards to the last row that holds a value
    int32_t row = pBlock->info.rows - 1;
    while (row >= 0 && colDataIsNull_s(pColData, row)) {
      --row;
    }
    if (row < 0) {
      continue;
    }

    char* val = colDataGetData(pColData, row);
    if (!IS_VAR_DATA_TYPE(pSaved->type)) {
      memcpy(pSaved->pData, val, pSaved->bytes);
    } else if (IS_STR_DATA_BLOB(pSaved->type)) {
      memcpy(pSaved->pData, val, blobDataTLen(val));
    } else {
      memcpy(pSaved->pData, val, varDataTLen(val));
    }
  }
}
573

574
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
16,165✔
575
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
576
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
16,165✔
577

578
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
16,165✔
579
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
16,165✔
580

581
  int32_t startPos = 0;
16,165✔
582
  int32_t numOfOutput = pSup->numOfExprs;
16,165✔
583

584
  SResultRow* pResult = NULL;
16,165✔
585

586
  while (1) {
×
587
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
16,165✔
588
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
16,165✔
589
    uint64_t            groupId = pOpenWin->groupId;
16,165✔
590
    SResultRowPosition* p1 = &pOpenWin->pos;
16,165✔
591
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
16,165✔
592
      break;
16,165✔
593
    }
594

595
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
596
    if (NULL == pr) {
×
597
      T_LONG_JMP(pTaskInfo->env, terrno);
×
598
    }
599

600
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
601
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
602
      T_LONG_JMP(pTaskInfo->env, terrno);
×
603
    }
604

605
    if (pr->closed) {
×
606
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
607
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
608
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
609
        T_LONG_JMP(pTaskInfo->env, terrno);
×
610
      }
611
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
612
      taosMemoryFree(pNode);
×
613
      continue;
×
614
    }
615

616
    STimeWindow w = pr->win;
×
617
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
618
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
619
    if (ret != TSDB_CODE_SUCCESS) {
×
620
      T_LONG_JMP(pTaskInfo->env, ret);
×
621
    }
622

623
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
624
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
625
      T_LONG_JMP(pTaskInfo->env, terrno);
×
626
    }
627

628
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
629
    if (!pTsKey) {
×
630
      pTaskInfo->code = terrno;
×
631
      T_LONG_JMP(pTaskInfo->env, terrno);
×
632
    }
633

634
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
635
    if (groupId == pBlock->info.id.groupId) {
×
636
      TSKEY curTs = pBlock->info.window.skey;
×
637
      if (tsCols != NULL) {
×
638
        curTs = tsCols[startPos];
×
639
      }
640
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
641
                                RESULT_ROW_END_INTERP, pSup);
642
    }
643

644
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
645
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
646

647
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
648
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
649
                                          pBlock->info.rows, numOfExprs);
×
650
    if (ret != TSDB_CODE_SUCCESS) {
×
651
      T_LONG_JMP(pTaskInfo->env, ret);
×
652
    }
653

654
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
655
      closeResultRow(pr);
×
656
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
657
      taosMemoryFree(pNode);
×
658
    } else {  // the remains are can not be closed yet.
659
      break;
×
660
    }
661
  }
662
}
16,165✔
663

664
static bool tsKeyCompFn(void* l, void* r, void* param) {
1,626,767,744✔
665
  TSKEY*                    lTS = (TSKEY*)l;
1,626,767,744✔
666
  TSKEY*                    rTS = (TSKEY*)r;
1,626,767,744✔
667
  SIntervalAggOperatorInfo* pInfo = param;
1,626,767,744✔
668
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
1,626,767,744✔
669
}
670

671
/**
 * @brief Check whether a result row already exists for the window starting at win->skey in group tableGroupId.
 *
 * The lookup key is built from the window start key and the group id with SET_RES_WINDOW_KEY and searched in
 * pInfo->aggSup.pResultRowHashTable.
 */
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
  char lookupKey[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
  SET_RES_WINDOW_KEY(lookupKey, (char*)&win->skey, sizeof(TSKEY), tableGroupId);

  void* pFound = tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, lookupKey, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
  return pFound != NULL;
}
676

677
/**
678
 * @brief check if cur window should be filtered out by limit info
679
 * @retval true if should be filtered out
680
 * @retval false if not filtering out
681
 * @note If no limit info, we skip filtering.
682
 *       If input/output ts order mismatch, we skip filtering too.
683
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
684
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
685
 *       every tuple in every block.
686
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
687
 */
688
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
689
                                  SExecTaskInfo* pTaskInfo) {
690
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
691
  int32_t lino = 0;
2,147,483,647✔
692
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
693
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
644,736,983✔
694
      // if input/output ts order mismatch, no filter
695
  ) {
696
    return false;
2,147,483,647✔
697
  }
698

699
  if (pOperatorInfo->limit == 0) return true;
260,478,261✔
700

701
  if (pOperatorInfo->pBQ == NULL) {
260,451,945✔
702
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
320,053✔
703
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
320,387✔
704
  }
705

706
  bool shouldFilter = false;
260,378,465✔
707
  // if BQ has been full, compare it with top of BQ
708
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
260,378,465✔
709
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
73,203,550✔
710
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
73,203,550✔
711
  }
712
  if (shouldFilter) {
259,448,609✔
713
    return true;
837,514✔
714
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
258,611,095✔
715
    return false;
107,647,605✔
716
  }
717

718
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
719
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
152,115,122✔
720
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
152,014,922✔
721

722
  *((TSKEY*)node.data) = win->skey;
152,014,922✔
723

724
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
151,958,142✔
725
    taosMemoryFree(node.data);
×
726
    return true;
×
727
  }
728

729
_end:
152,145,850✔
730
  if (code != TSDB_CODE_SUCCESS) {
152,066,024✔
731
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
127,588✔
732
    pTaskInfo->code = code;
127,588✔
733
    T_LONG_JMP(pTaskInfo->env, code);
×
734
  }
735
  return false;
151,938,436✔
736
}
737

738
/**
 * Count the consecutive rows, starting at startPos, whose timestamp lies inside
 * the closed range [win->skey, win->ekey].
 *
 * @param pDataBlockInfo block info supplying the row count
 * @param pPrimaryColumn timestamp column of the block
 * @param win            time window to test against
 * @param startPos       index of the first row to examine
 * @return number of rows before the first one outside the window, or
 *         (rows - startPos) when no examined row falls outside it
 */
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
                                      int32_t startPos) {
  const int32_t totalRows = pDataBlockInfo->rows;
  int32_t       pos = startPos;

  while (pos < pDataBlockInfo->rows) {
    const TSKEY ts = pPrimaryColumn[pos];
    if (ts < win->skey || ts > win->ekey) {
      return pos - startPos;
    }
    ++pos;
  }
  return totalRows - startPos;
}
750

751
/**
 * Aggregate the rows of one input block into interval-window result rows.
 *
 * The first window is derived from the block's start timestamp; the remaining
 * rows are then walked window by window. Windows rejected by
 * filterWindowWithLimit() stop the processing of the block.
 *
 * @param pOperatorInfo  the interval operator
 * @param pResultRowInfo result row bookkeeping of the operator
 * @param pBlock         input data block
 * @param scanFlag       scan flag of the block (compared against MAIN_SCAN)
 * @return true only when a new group is seen and the slimit on the number of
 *         handled groups is exceeded; false otherwise
 * @note Errors are reported by long-jumping to pTaskInfo->env.
 */
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                            int32_t scanFlag) {
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
  bool                      sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  SResultRow* pResult = NULL;
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];

  // a new group starts: enforce slimit and drop the bounded queue of the previous group
  if (tableGroupId != pInfo->curGroupId) {
    pInfo->handledGroupNum += 1;
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
      return true;
    } else {
      pInfo->curGroupId = tableGroupId;
      destroyBoundedQueue(pInfo->pBQ);
      pInfo->pBQ = NULL;
    }
  }

  STimeWindow win =
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;

  int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // never long-jump with a success code: longjmp() would silently turn 0 into 1
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
                                                          NULL, pInfo->binfo.inputTsOrder)
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);

  // prev time window not interpolation yet.
  if (pInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                        pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  doCloseWindow(pResultRowInfo, pInfo, pResult);

  STimeWindow nextWin = win;
  int32_t     rows = pBlock->info.rows;

  while (startPos < pBlock->info.rows) {
    if (sorted) {
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
                                        pInfo->binfo.inputTsOrder);
      if (startPos < 0) {
        break;
      }
    } else {
      // temporarily shrink the block to the rows of the current window while looking for the
      // next window, then restore the real row count
      pBlock->info.rows = forwardRows;
      int32_t newStartOff = forwardRows >= 1
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
                                : -1;
      pBlock->info.rows = rows;
      if (newStartOff >= 0) {
        startPos += newStartOff;
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
      }
      if (startPos >= pBlock->info.rows) {
        break;
      }
    }

    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
      break;
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                    pInfo->binfo.inputTsOrder)
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
    if (forwardRows == 0) continue;

    // null data, failed to allocate more memory buffer
    ret = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
      // never long-jump with a success code: longjmp() would silently turn 0 into 1
      if (ret == TSDB_CODE_SUCCESS) {
        ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start(end) key interpolation
    ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    // TODO: add to open window? how to close the open windows after input blocks exhausted?

    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
                                          pBlock->info.rows, numOfOutput);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
    doCloseWindow(pResultRowInfo, pInfo, pResult);
  }

  // keep the last row of this block for the interpolation of the next block
  if (pInfo->timeWindowInterpo) {
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
  }
  return false;
}
895

896
/**
 * Close the result row once its end boundary has been interpolated.
 *
 * Only acts when time-window interpolation is enabled for the operator and the
 * row is flagged RESULT_ROW_END_INTERP; in that case the row is closed and the
 * head node of the open-window list is popped and freed.
 */
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
  if (!pInfo->timeWindowInterpo) {
    return;
  }
  if (!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
    return;
  }

  // the computation of the final result of this window is done
  closeResultRow(pResult);
  SListNode* pHead = tdListPopHead(pResultRowInfo->openWindow);
  taosMemoryFree(pHead);
}
2,147,483,647✔
904

905
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
16,165✔
906
                                       SExecTaskInfo* pTaskInfo) {
907
  int32_t         code = TSDB_CODE_SUCCESS;
16,165✔
908
  int32_t         lino = 0;
16,165✔
909
  SOpenWindowInfo openWin = {0};
16,165✔
910
  openWin.pos.pageId = pResult->pageId;
16,165✔
911
  openWin.pos.offset = pResult->offset;
16,165✔
912
  openWin.groupId = groupId;
16,165✔
913
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
16,165✔
914
  if (pn == NULL) {
16,165✔
915
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
16,165✔
916
    QUERY_CHECK_CODE(code, lino, _end);
16,165✔
917
    return openWin.pos;
16,165✔
918
  }
919

920
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
921
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
922
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
923
    QUERY_CHECK_CODE(code, lino, _end);
×
924
  }
925

926
_end:
×
927
  if (code != TSDB_CODE_SUCCESS) {
×
928
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
929
    pTaskInfo->code = code;
×
930
    T_LONG_JMP(pTaskInfo->env, code);
×
931
  }
932
  return openWin.pos;
×
933
}
934

935
/**
 * Fetch the primary timestamp column of a block.
 *
 * When the first timestamp is non-zero but the block's time window is still
 * (0, 0), the window is refreshed through blockDataUpdateTsWindow(). A first
 * timestamp of 0 only produces a warning.
 *
 * @return the timestamp array, or NULL when the block has no column array or
 *         its data is not loaded
 * @note A missing column or a failed window update is stored in
 *       pTaskInfo->code and reported by long-jumping to pTaskInfo->env.
 */
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return NULL;
  }

  SColumnInfoData* pTsColumn = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  if (pTsColumn == NULL) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  TSKEY* tsCols = (int64_t*)pTsColumn->pData;
  if (tsCols[0] == 0) {
    qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
          tsCols[pBlock->info.rows - 1]);
  } else if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
    // the block window has not been filled in yet
    int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  return tsCols;
}
963

964
/**
 * Open the interval operator: pull every block from downstream, apply the
 * optional scalar expressions, aggregate the block into time windows and
 * finally initialise the grouped result info used to emit results.
 *
 * @param pOperator the interval operator
 * @return TSDB_CODE_SUCCESS (also returned immediately when already opened)
 * @note Any failure is stored in pTaskInfo->code and reported by long-jumping
 *       to pTaskInfo->env, so an error code is never returned to the caller.
 */
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SIntervalAggOperatorInfo* pInfo = pOperator->info;
  SExprSupp*                pSup = &pOperator->exprSupp;

  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;

    // scalar expressions are evaluated in place before the aggregation
    if (pInfo->scalarSupp.pExprInfo != NULL) {
      SExprSupp* pExprSup = &pInfo->scalarSupp;
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // the pDataBlock are always the same one, no need to call this again
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    // hashIntervalAgg() returns true once the slimit on groups is exceeded
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
  }

  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;

  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1018

1019
// start a new state window and record the start info
1020
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
7,936,005✔
1021
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1022
  pRowSup->groupId = groupId;
7,936,005✔
1023
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
7,935,516✔
1024
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
83,784✔
1025
    pRowSup->win.skey = tsList[rowIndex];
7,886,813✔
1026
    pRowSup->startRowIndex = rowIndex;
7,886,324✔
1027
    pRowSup->numOfRows = 0;  // does not include the current row yet
7,886,813✔
1028
  } else {
1029
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
98,384✔
1030
      rowIndex - pRowSup->numNullRows : rowIndex;
49,192✔
1031
    pRowSup->win.skey = hasPrevWin ?
49,192✔
1032
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
49,192✔
1033
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
49,192✔
1034
  }
1035
  resetNumNullRows(pRowSup);
7,936,005✔
1036
}
7,936,005✔
1037

1038
// close a state window and record its end info
1039
// this functions is called when a new state row appears
1040
// @param rowIndex the index of the first row of next window
1041
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
8,046,015✔
1042
                                 int32_t rowIndex,
1043
                                 const EStateWinExtendOption* extendOption,
1044
                                 bool hasNextWin) {
1045
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
8,046,015✔
1046
      pRowSup->win.ekey = hasNextWin?
40,432✔
1047
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
40,432✔
1048
      // continuous rows having null state col should be included in this window
1049
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
80,864✔
1050
        pRowSup->numNullRows : 0;
40,432✔
1051
      resetNumNullRows(pRowSup);
40,432✔
1052
  }
1053
}
8,046,015✔
1054

1055
// Account for one more row whose state column is null: remember its timestamp
// as the latest seen one and bump the pending null-row counter.
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
  pRowSup->prevTs = nullRowTs;
  pRowSup->numNullRows++;
}
19,178,564✔
1059

1060
// process a closed state window
1061
// do aggregation on the tuples within the window
1062
// partial aggregation results are stored in the output buffer
1063
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
8,046,504✔
1064
  SWindowRowsSup* pRowSup, SExecTaskInfo* pTaskInfo,
1065
  SExprSupp* pSup, int32_t numOfOutput) {
1066
  int32_t     code = 0;
8,046,504✔
1067
  int32_t     lino = 0;
8,046,504✔
1068
  SResultRow* pResult = NULL;
8,046,504✔
1069
  if (pRowSup->numOfRows == 0) {
8,046,504✔
1070
    // no valid rows within the window
1071
    return TSDB_CODE_SUCCESS;
149,465✔
1072
  }
1073
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
15,794,078✔
1074
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
7,896,550✔
1075
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1076
  QUERY_CHECK_CODE(code, lino, _return);
7,897,039✔
1077

1078
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
7,897,039✔
1079
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
7,896,550✔
1080
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1081
    pRowSup->numOfRows, 0, numOfOutput);
1082
  QUERY_CHECK_CODE(code, lino, _return);
7,896,550✔
1083

1084
_return:
7,896,550✔
1085
  if (code != TSDB_CODE_SUCCESS) {
7,896,550✔
1086
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
412✔
1087
  }
1088
  return code;
7,897,039✔
1089
}
1090

1091
// process a data block for state window aggregation
1092
// scan from startIndex to endIndex
1093
// numPartialCalcRows returns the number of rows that have been
1094
// partially calculated within the block
1095
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
1,050,835✔
1096
                                 SStateWindowOperatorInfo* pInfo,
1097
                                 SSDataBlock* pBlock, int32_t* startIndex,
1098
                                 int32_t* endIndex,
1099
                                 int32_t* numPartialCalcRows) {
1100
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,050,835✔
1101
  SExprSupp*     pExprSup = &pOperator->exprSupp;
1,050,835✔
1102

1103
  SColumnInfoData* pStateColInfoData = 
1104
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
1,050,835✔
1105
  if (!pStateColInfoData) {
1,050,346✔
1106
    pTaskInfo->code = terrno;
×
1107
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1108
  }
1109
  uint64_t gid = pBlock->info.id.groupId;
1,050,346✔
1110
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
1,050,346✔
1111
  int32_t bytes = pStateColInfoData->info.bytes;
1,050,835✔
1112

1113
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
1,050,346✔
1114
                                               pInfo->tsSlotId);
1,050,835✔
1115
  if (NULL == pColInfoData) {
1,050,346✔
1116
    pTaskInfo->code = terrno;
×
1117
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1118
  }
1119
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
1,050,346✔
1120

1121
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
1,050,835✔
1122
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
1,050,835✔
1123
                                NULL;
1124
  EStateWinExtendOption  extendOption = pInfo->extendOption;
1,050,835✔
1125
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
1,050,346✔
1126

1127
  if (pRowSup->groupId != gid) {
1,050,346✔
1128
    /*
1129
      group changed, process the previous group's unclosed state window first
1130
    */
1131
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
146,545✔
1132
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
146,545✔
1133
                                            pExprSup, numOfOutput);
1134
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
146,545✔
1135
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
146,545✔
1136

1137
    /*
1138
      unhandled null rows should be ignored, since they belong to previous group
1139
    */
1140
    *numPartialCalcRows += pRowSup->numNullRows;
146,545✔
1141

1142
    /*
1143
      reset state window info for new group
1144
    */
1145
    pInfo->hasKey = false;
146,545✔
1146
    resetWindowRowsSup(pRowSup);
146,056✔
1147
  }
1148

1149
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
85,906,724✔
1150
    if (pBlock->info.scanFlag != PRE_SCAN) {
84,910,993✔
1151
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
84,853,093✔
1152
        pInfo->winSup.lastTs = tsList[j];
5,086,044✔
1153
      } else {
1154
        if (tsList[j] == pInfo->winSup.lastTs) {
79,767,049✔
1155
          // forbid duplicated ts rows
1156
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
54,615✔
1157
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
54,615✔
1158
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
54,615✔
1159
        } else {
1160
          pInfo->winSup.lastTs = tsList[j];
79,712,434✔
1161
        }
1162
      }
1163
    }
1164
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
169,713,245✔
1165
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
19,178,564✔
1166
      continue;
19,178,564✔
1167
    }
1168
    if (pStateColInfoData->pData == NULL) {
65,677,814✔
1169
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1170
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1171
    }
1172
    char* val = colDataGetData(pStateColInfoData, j);
65,677,325✔
1173

1174
    if (!pInfo->hasKey) {
65,677,325✔
1175
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
1,018,953✔
1176
      pInfo->hasKey = true;
1,019,442✔
1177
      doKeepNewStateWindowStartInfo(
1,019,442✔
1178
        pRowSup, tsList, j, gid, &extendOption, false);
1179
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,019,442✔
1180
    } else if (!compareVal(val, &pInfo->stateKey)) {
64,658,372✔
1181
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
6,916,563✔
1182
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
6,916,074✔
1183
                                              pExprSup, numOfOutput);
1184
      if (TSDB_CODE_SUCCESS != code) {
6,916,074✔
1185
        T_LONG_JMP(pTaskInfo->env, code);
×
1186
      }
1187
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
6,916,074✔
1188

1189
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
6,916,563✔
1190
                                    &extendOption, true);
1191
      doKeepTuple(pRowSup, tsList[j], j, gid);
6,916,563✔
1192
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
6,916,563✔
1193
    } else {
1194
      doKeepTuple(pRowSup, tsList[j], j, gid);
57,741,809✔
1195
    }
1196
  }
1197

1198
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
996,220✔
1199
    /*
1200
      No valid state rows within the block and we don't care about
1201
      null rows before valid state window, mark them as processed and drop them
1202
    */
1203
    *numPartialCalcRows = pBlock->info.rows;
10,269✔
1204
    return;
10,269✔
1205
  }
1206
  if (pRowSup->numOfRows == 0 && 
985,951✔
1207
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
3,650✔
1208
    /*
1209
      If no valid state window or we don't know the belonging of
1210
      these null rows, return and handle them with next block
1211
    */
1212
    return;
2,555✔
1213
  }
1214
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
983,396✔
1215
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pTaskInfo,
983,396✔
1216
                                          pExprSup, numOfOutput);
1217
  if (TSDB_CODE_SUCCESS != code) {
983,396✔
1218
    pTaskInfo->code = code;
412✔
1219
    T_LONG_JMP(pTaskInfo->env, code);
412✔
1220
  }
1221
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
982,984✔
1222
  // reset part of pRowSup after doing agg calculation
1223
  pRowSup->startRowIndex = 0;
982,984✔
1224
  pRowSup->numOfRows = 0;
982,984✔
1225
}
1226

1227
/**
 * Open the state-window operator: consume every downstream block and
 * aggregate it into state windows.
 *
 * Rows at the end of a block that could not be aggregated yet are carried over
 * in a private copy (the "unfinished block") and merged with the next block.
 * After the input is exhausted the grouped result info is initialised and the
 * operator status becomes OP_RES_TO_RETURN.
 *
 * @param pOperator the state-window operator
 * @return TSDB_CODE_SUCCESS (also returned immediately when already opened)
 * @note Failures detected here are stored in pTaskInfo->code and reported by
 *       long-jumping to pTaskInfo->env; the carried-over copy is released
 *       first. doStateWindowAggImpl() long-jumps on its own, which this
 *       function cannot intercept.
 */
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SStateWindowOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;

  SExprSupp* pSup = &pOperator->exprSupp;
  int32_t    order = pInfo->binfo.inputTsOrder;
  int64_t    st = taosGetTimestampUs();

  pInfo->cleanGroupResInfo = false;

  SSDataBlock* pUnfinishedBlock = NULL;
  // true while pUnfinishedBlock is a private copy that this function must destroy;
  // false while it merely aliases the block handed out by downstream
  bool         ownsUnfinished = false;
  int32_t      startIndex = 0;
  int32_t      endIndex = 0;
  int32_t      numPartialCalcRows = 0;
  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      // input exhausted: whatever is still carried over is dropped
      if (pUnfinishedBlock != NULL) {
        blockDataDestroy(pUnfinishedBlock);
        pUnfinishedBlock = NULL;
        ownsUnfinished = false;
        resetWindowRowsSup(&pInfo->winSup);
      }
      break;
    }

    startIndex = 0;
    if (pUnfinishedBlock != NULL) {
      startIndex = pUnfinishedBlock->info.rows;
      // merge unfinished block with current block
      code = blockDataMerge(pUnfinishedBlock, pBlock);
      // reset id to current block id
      pUnfinishedBlock->info.id = pBlock->info.id;
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      // nothing carried over: work directly on the downstream block
      pUnfinishedBlock = pBlock;
      ownsUnfinished = false;
    }
    endIndex = pUnfinishedBlock->info.rows;

    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
    code = setInputDataBlock(
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
    QUERY_CHECK_CODE(code, lino, _end);

    // there is an scalar expression that
    // needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
        pInfo->scalarSup.numOfExprs, NULL,
        GET_STM_RTINFO(pOperator->pTaskInfo));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock,
      &startIndex, &endIndex, &numPartialCalcRows);
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
      // save unfinished block for next round processing
      if (!ownsUnfinished) {
        SSDataBlock* pCopy = NULL;
        code = createOneDataBlock(pBlock, true, &pCopy);
        QUERY_CHECK_CODE(code, lino, _end);
        QUERY_CHECK_NULL(pCopy, code, lino, _end, terrno);
        pUnfinishedBlock = pCopy;
        ownsUnfinished = true;
      }
      // drop the rows that have already been aggregated
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      if (ownsUnfinished) {
        blockDataDestroy(pUnfinishedBlock);
      }
      pUnfinishedBlock = NULL;
      ownsUnfinished = false;
    }
    numPartialCalcRows = 0;
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  code = initGroupedResultInfo(
    &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
  QUERY_CHECK_CODE(code, lino, _end);
  pInfo->cleanGroupResInfo = true;
  pOperator->status = OP_RES_TO_RETURN;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // release the private copy before leaving through the long jump
    if (ownsUnfinished && pUnfinishedBlock != NULL) {
      blockDataDestroy(pUnfinishedBlock);
      pUnfinishedBlock = NULL;
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
1329

1330
/*
 * Produce the next output block of the state-window aggregate operator.
 *
 * Returns NULL through ppRes once the operator status is OP_EXEC_DONE. Otherwise the
 * operator's open function is invoked, the result block is sized to the operator's
 * result capacity, and result rows are built and filtered until either a non-empty
 * block is available or no grouped results remain (in which case the operator is
 * marked completed). Any failure is logged, stored in the task and raised via
 * T_LONG_JMP.
 */
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  SStateWindowOperatorInfo* pStateInfo = pOperator->info;
  SExecTaskInfo*            pTask = pOperator->pTaskInfo;
  SOptrBasicInfo*           pBasic = &pStateInfo->binfo;
  int32_t                   lino = 0;
  int32_t                   code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, pBasic, &pStateInfo->groupResInfo, pStateInfo->aggSup.pResultBuf);
    code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // nothing left to emit: this is the last block the operator will return
    if (!hasRemainResults(&pStateInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // rows survived the filter, hand them out; otherwise build the next batch
    if (pBasic->pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pBasic->pRes->info.rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }

  if (pBasic->pRes->info.rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pBasic->pRes;
  }
  return code;
}
1375

1376
/*
 * Produce the next output block of the interval aggregate operator.
 *
 * Returns NULL through ppRes once the operator status is OP_EXEC_DONE. Otherwise the
 * operator's open function is invoked and result rows are built and filtered until a
 * non-empty block is available or no grouped results remain (the operator is then
 * marked completed). Any failure is logged, stored in the task and raised via
 * T_LONG_JMP.
 */
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SIntervalAggOperatorInfo* pIntervalInfo = pOperator->info;
  SExecTaskInfo*            pTask = pOperator->pTaskInfo;
  int32_t                   lino = 0;
  int32_t                   code = TSDB_CODE_SUCCESS;
  size_t                    rows = 0;

  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return code;
  }

  SSDataBlock* pOut = pIntervalInfo->binfo.pRes;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  for (;;) {
    doBuildResultDatablock(pOperator, &pIntervalInfo->binfo, &pIntervalInfo->groupResInfo,
                           pIntervalInfo->aggSup.pResultBuf);
    code = doFilter(pOut, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // nothing left to emit: this is the last block the operator will return
    if (!hasRemainResults(&pIntervalInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }

    // rows survived the filter, hand them out; otherwise build the next batch
    if (pOut->info.rows > 0) {
      break;
    }
  }

  rows = pOut->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }

  if (rows == 0) {
    *ppRes = NULL;
  } else {
    *ppRes = pOut;
  }
  return code;
}
1419

1420
static void destroyStateWindowOperatorInfo(void* param) {
1,772,976✔
1421
  if (param == NULL) {
1,772,976✔
1422
    return;
×
1423
  }
1424
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1,772,976✔
1425
  cleanupBasicInfo(&pInfo->binfo);
1,772,976✔
1426
  taosMemoryFreeClear(pInfo->stateKey.pData);
1,772,976✔
1427
  if (pInfo->pOperator) {
1,772,976✔
1428
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,772,976✔
1429
                      pInfo->cleanGroupResInfo);
1,772,976✔
1430
    pInfo->pOperator = NULL;
1,772,976✔
1431
  }
1432

1433
  cleanupExprSupp(&pInfo->scalarSup);
1,772,976✔
1434
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,772,976✔
1435
  cleanupAggSup(&pInfo->aggSup);
1,772,976✔
1436
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,772,976✔
1437

1438
  taosMemoryFreeClear(param);
1,772,976✔
1439
}
1440

1441
static void freeItem(void* param) {
30,294✔
1442
  SGroupKeys* pKey = (SGroupKeys*)param;
30,294✔
1443
  taosMemoryFree(pKey->pData);
30,294✔
1444
}
30,294✔
1445

1446
void destroyIntervalOperatorInfo(void* param) {
12,123,909✔
1447
  if (param == NULL) {
12,123,909✔
1448
    return;
×
1449
  }
1450

1451
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
12,123,909✔
1452

1453
  cleanupBasicInfo(&pInfo->binfo);
12,123,909✔
1454
  if (pInfo->pOperator) {
12,123,411✔
1455
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
11,640,446✔
1456
                      pInfo->cleanGroupResInfo);
11,642,578✔
1457
    pInfo->pOperator = NULL;
11,641,940✔
1458
  }
1459

1460
  cleanupAggSup(&pInfo->aggSup);
12,122,514✔
1461
  cleanupExprSupp(&pInfo->scalarSupp);
12,121,160✔
1462

1463
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
12,123,660✔
1464

1465
  taosArrayDestroy(pInfo->pInterpCols);
12,122,654✔
1466
  pInfo->pInterpCols = NULL;
12,119,646✔
1467

1468
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
12,119,656✔
1469
  pInfo->pPrevValues = NULL;
12,117,913✔
1470

1471
  cleanupGroupResInfo(&pInfo->groupResInfo);
12,118,411✔
1472
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
12,117,684✔
1473
  destroyBoundedQueue(pInfo->pBQ);
12,120,174✔
1474
  taosMemoryFreeClear(param);
12,117,176✔
1475
}
1476

1477
/*
 * Allocate the interpolation bookkeeping arrays of an interval operator and seed them
 * with the primary timestamp column.
 *
 * pInfo->pInterpCols receives a SColumn describing the timestamp column (slot
 * pInfo->primaryTsIndex), and pInfo->pPrevValues receives a matching SGroupKeys entry
 * whose isNull flag is set to mark that no previous value has been recorded yet.
 *
 * Returns TSDB_CODE_SUCCESS, or the error taken from terrno when an allocation or push
 * fails. Arrays created before a failure stay attached to pInfo; the value buffer of an
 * entry that could not be pushed is released here because the array never took
 * ownership of it.
 */
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  void*      tmp = NULL;
  SColumn    c = {0};
  SGroupKeys key = {0};

  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);

  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);

  // ts column
  c.colId = 1;
  c.slotId = pInfo->primaryTsIndex;
  c.type = TSDB_DATA_TYPE_TIMESTAMP;
  c.bytes = sizeof(int64_t);
  tmp = taosArrayPush(pInfo->pInterpCols, &c);
  QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

  key.bytes = c.bytes;
  key.type = c.type;
  key.isNull = true;  // to denote no value is assigned yet
  key.pData = taosMemoryCalloc(1, c.bytes);
  QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

  tmp = taosArrayPush(pInfo->pPrevValues, &key);
  if (tmp == NULL) {
    // read terrno before freeing; the buffer was never stored in the array, so free it here
    code = terrno;
    lino = __LINE__;
    taosMemoryFree(key.pData);
    goto _end;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1513

1514
/*
 * Decide whether any of the numOfCols function contexts in pCtx is an interval
 * interpolation function and, if so, prepare the interpolation bookkeeping in pInfo.
 *
 * On the first such function the arrays are created and seeded with the primary
 * timestamp column by initWindowInterpPrevVal(). For every interpolation function the
 * column of its first parameter is appended to pInfo->pInterpCols and a zeroed
 * previous-value entry of the same type/size is appended to pInfo->pPrevValues.
 *
 * *pRes is always written: true when at least one interpolation function was found
 * (even if preparing the bookkeeping failed), false otherwise.
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
                                      bool* pRes) {
  bool    needed = false;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   tmp = NULL;

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (!fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
      continue;
    }

    if (!needed) {
      // first interpolation function: create the arrays, the ts column goes in first
      needed = true;
      code = initWindowInterpPrevVal(pInfo);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    SExprInfo*   pExpr = pCtx[i].pExpr;
    SFunctParam* pParam = &pExpr->base.pParam[0];

    SColumn c = *pParam->pCol;
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    SGroupKeys key = {0};
    key.bytes = c.bytes;
    key.type = c.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, c.bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    tmp = taosArrayPush(pInfo->pPrevValues, &key);
    if (tmp == NULL) {
      // read terrno before freeing; the buffer was never stored in the array, so free it here
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  *pRes = needed;
  return code;
}
1564

1565
/*
 * Bring an interval aggregate operator back to its not-opened state so it can be
 * executed again.
 *
 * Result rows, the time-window column, aggregate/scalar support, the open-window list,
 * the interpolation arrays, grouped-result info and the bounded queue are all released
 * or rebuilt; ordering and slimit bookkeeping are re-read from the physical plan node.
 *
 * Returns the first error reported while rebuilding, or 0 on success. Cleanup steps are
 * still carried out after an error so that no stale state is left behind.
 */
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
  SExecTaskInfo*      pTaskInfo = pOper->pTaskInfo;
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pIntervalInfo->binfo);
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp,
                    &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup, pIntervalInfo->cleanGroupResInfo);

  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                       &pTaskInfo->storageAPI.functionStore);
  }
  if (code == 0) {
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
  }

  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
    pIntervalInfo->curGroupId = UINT64_MAX;
  }

  pIntervalInfo->cleanGroupResInfo = false;
  pIntervalInfo->handledGroupNum = 0;
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;

  taosArrayDestroy(pIntervalInfo->pInterpCols);
  pIntervalInfo->pInterpCols = NULL;

  if (pIntervalInfo->pPrevValues != NULL) {
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
    pIntervalInfo->pPrevValues = NULL;

    // Rebuild the interpolation arrays the same way operator creation does, so the
    // per-function entries (not only the timestamp entry) are restored. Skip this when
    // an earlier step already failed: that error must be reported, not overwritten.
    if (code == 0) {
      code = timeWindowinterpNeeded(pOper->exprSupp.pCtx, pOper->exprSupp.numOfExprs, pIntervalInfo,
                                    &pIntervalInfo->timeWindowInterpo);
    }
  }

  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
  destroyBoundedQueue(pIntervalInfo->pBQ);
  pIntervalInfo->pBQ = NULL;
  return code;
}
1613

1614
/* Reset-state callback of the interval operator: forwards to resetInterval() with the operator's own info. */
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
  return resetInterval(pOper, (SIntervalAggOperatorInfo*)pOper->info);
}
1618

1619
/*
 * Create the hash-interval aggregate operator described by pPhyNode.
 *
 * downstream  - input operator; it is attached as the single downstream on success and
 *               handed to destroyOperatorAndDownstreams() on failure.
 * pPhyNode    - physical plan node supplying window functions, scalar expressions,
 *               interval settings, limit/slimit and filter conditions.
 * pTaskInfo   - owning task; receives the error code on failure.
 * pOptrInfo   - out parameter, set to the new operator on success.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code after releasing everything that was
 * allocated here. Every failure path sets a non-success code and records the failing
 * line for the error log.
 */
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  SExprSupp* pSup = &pOperator->exprSupp;
  pSup->hasWindowOrGroup = true;
  pSup->hasWindow = true;

  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 512);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState,
                    &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  SInterval interval = {.interval = pPhyNode->interval,
                        .sliding = pPhyNode->sliding,
                        .intervalUnit = pPhyNode->intervalUnit,
                        .slidingUnit = pPhyNode->slidingUnit,
                        .offset = pPhyNode->offset,
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
                        .timeRange = pPhyNode->timeRange};
  calcIntervalAutoOffset(&interval);

  STimeWindowAggSupp as = {
      .maxTs = INT64_MIN,
  };

  pInfo->win = pTaskInfo->window;
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
  pInfo->interval = interval;
  pInfo->twAggSup = as;
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;

  // LIMIT: the operator has to produce limit + offset rows
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
    pInfo->limited = true;
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
  }

  // SLIMIT: same computation; curGroupId starts at UINT64_MAX
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
    pInfo->slimited = true;
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
    pInfo->curGroupId = UINT64_MAX;
  }

  if (pPhyNode->window.pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
  QUERY_CHECK_CODE(code, lino, _error);

  pInfo->timeWindowInterpo = false;
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
  QUERY_CHECK_CODE(code, lino, _error);
  if (pInfo->timeWindowInterpo) {
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
    // must set a failure code here: jumping to _error with code still
    // TSDB_CODE_SUCCESS would report success without creating an operator
    QUERY_CHECK_NULL(pInfo->binfo.resultRowInfo.openWindow, code, lino, _error, terrno);
  }

  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
                  pInfo, pTaskInfo);

  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  if (pInfo != NULL) {
    destroyIntervalOperatorInfo(pInfo);
  }

  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
  return code;
}
1747

1748
// todo handle multiple timeline cases. assume no timeline interweaving
1749
/*
 * Feed one input block into the session-window aggregation.
 *
 * Rows are walked in block order. A row extends the current session when it belongs to
 * the same group and its timestamp is within pInfo->gap of the previously kept
 * timestamp (in either direction); otherwise the rows collected so far are aggregated
 * into the result row of the closed window and a new session starts at that row.
 * After the loop, the still-open session is aggregated with its end key set to the last
 * timestamp of the block.
 *
 * Errors are raised via T_LONG_JMP on the task's jump buffer. An empty block is
 * ignored.
 */
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  if (!pColInfoData) {
    pTaskInfo->code = terrno;
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  // With no rows there is nothing to aggregate, and the trailing
  // tsList[rows - 1] access below would read before the start of the column.
  if (pBlock->info.rows <= 0) {
    return;
  }

  bool    masterScan = true;
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
  int64_t gid = pBlock->info.id.groupId;

  int64_t gap = pInfo->gap;

  if (!pInfo->reptScan) {
    pInfo->reptScan = true;
    pInfo->winSup.prevTs = INT64_MIN;
  }

  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;
  pRowSup->startRowIndex = 0;

  // In case of ascending or descending order scan data, only one time window needs to be kept for each table.
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
      // first row of a new group, or no row has been kept yet
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], j, gid);
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
      doKeepTuple(pRowSup, tsList[j], j, gid);
    } else {
      // the gap is exceeded: close the previous session window and start a new one
      if (pRowSup->numOfRows > 0) {  // handle data that belongs to the previous session window
        SResultRow* pResult = NULL;

        // keep the time window for the closed time window.
        STimeWindow window = pRowSup->win;

        int32_t ret =
            setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
        if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
          T_LONG_JMP(pTaskInfo->env, ret);
        }

        // pRowSup->numOfRows rows starting at pRowSup->startRowIndex belong to the closed session window
        updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
        ret =
            applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
        if (ret != TSDB_CODE_SUCCESS) {
          T_LONG_JMP(pTaskInfo->env, ret);
        }
      }

      // here we start a new session window
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
      doKeepTuple(pRowSup, tsList[j], j, gid);
    }
  }

  // aggregate the rows of the session window that is still open at the end of the block
  SResultRow* pResult = NULL;
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
  int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
                                       pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
                                        pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
1,425,316✔
1830

1831
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,101,422✔
1832
  if (pOperator->status == OP_EXEC_DONE) {
3,101,422✔
1833
    (*ppRes) = NULL;
1,001,090✔
1834
    return TSDB_CODE_SUCCESS;
1,001,090✔
1835
  }
1836

1837
  int32_t                  code = TSDB_CODE_SUCCESS;
2,100,332✔
1838
  int32_t                  lino = 0;
2,100,332✔
1839
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
2,100,332✔
1840
  SSessionAggOperatorInfo* pInfo = pOperator->info;
2,100,332✔
1841
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
2,100,332✔
1842
  SExprSupp*               pSup = &pOperator->exprSupp;
2,100,332✔
1843

1844
  if (pOperator->status == OP_RES_TO_RETURN) {
2,100,332✔
1845
    while (1) {
×
1846
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
4,039✔
1847
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
4,039✔
1848
      QUERY_CHECK_CODE(code, lino, _end);
4,039✔
1849

1850
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
4,039✔
1851
      if (!hasRemain) {
4,039✔
1852
        setOperatorCompleted(pOperator);
2,662✔
1853
        break;
2,662✔
1854
      }
1855

1856
      if (pBInfo->pRes->info.rows > 0) {
1,377✔
1857
        break;
1,377✔
1858
      }
1859
    }
1860
    pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
4,039✔
1861
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
4,039✔
1862
    return code;
4,039✔
1863
  }
1864

1865
  int64_t st = taosGetTimestampUs();
2,096,293✔
1866
  int32_t order = pInfo->binfo.inputTsOrder;
2,096,293✔
1867

1868
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2,096,293✔
1869

1870
  pInfo->cleanGroupResInfo = false;
2,096,293✔
1871
  while (1) {
1,425,316✔
1872
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,521,609✔
1873
    if (pBlock == NULL) {
3,521,609✔
1874
      break;
2,096,293✔
1875
    }
1876

1877
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
1,425,316✔
1878
    if (pInfo->scalarSupp.pExprInfo != NULL) {
1,425,316✔
1879
      SExprSupp* pExprSup = &pInfo->scalarSupp;
1,278✔
1880
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
1,278✔
1881
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
1,278✔
1882
      QUERY_CHECK_CODE(code, lino, _end);
1,278✔
1883
    }
1884
    // the pDataBlock are always the same one, no need to call this again
1885
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
1,425,316✔
1886
    QUERY_CHECK_CODE(code, lino, _end);
1,425,316✔
1887

1888
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
1,425,316✔
1889
    QUERY_CHECK_CODE(code, lino, _end);
1,425,316✔
1890

1891
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
1,425,316✔
1892
  }
1893

1894
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
2,096,293✔
1895

1896
  // restore the value
1897
  pOperator->status = OP_RES_TO_RETURN;
2,096,293✔
1898

1899
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
2,096,293✔
1900
  QUERY_CHECK_CODE(code, lino, _end);
2,096,293✔
1901
  pInfo->cleanGroupResInfo = true;
2,096,293✔
1902

1903
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
2,096,293✔
1904
  QUERY_CHECK_CODE(code, lino, _end);
2,096,293✔
1905
  while (1) {
×
1906
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
2,096,293✔
1907
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
2,096,293✔
1908
    QUERY_CHECK_CODE(code, lino, _end);
2,096,293✔
1909

1910
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
2,096,293✔
1911
    if (!hasRemain) {
2,096,293✔
1912
      setOperatorCompleted(pOperator);
2,093,631✔
1913
      break;
2,093,631✔
1914
    }
1915

1916
    if (pBInfo->pRes->info.rows > 0) {
2,662✔
1917
      break;
2,662✔
1918
    }
1919
  }
1920
  pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
2,096,293✔
1921

1922
_end:
2,096,293✔
1923
  if (code != TSDB_CODE_SUCCESS) {
2,096,293✔
1924
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1925
    pTaskInfo->code = code;
×
1926
    T_LONG_JMP(pTaskInfo->env, code);
×
1927
  }
1928
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
2,096,293✔
1929
  return code;
2,096,293✔
1930
}
1931

1932
/*
 * Reset-state callback of the state-window operator: puts the operator back into
 * OP_NOT_OPENED, drops accumulated result rows, rebuilds the time-window column and the
 * aggregate/scalar support from the physical plan node, and clears the remembered state
 * key. Returns the first error reported while rebuilding, or 0 on success; the
 * remaining state is cleared either way.
 */
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
  SStateWindowOperatorInfo* pStateInfo = pOper->info;
  SExecTaskInfo*            pTask = pOper->pTaskInfo;
  SStateWindowPhysiNode*    pNode = (SStateWindowPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pStateInfo->binfo);
  cleanupResultInfo(pStateInfo->pOperator->pTaskInfo, &pStateInfo->pOperator->exprSupp, &pStateInfo->groupResInfo,
                    &pStateInfo->aggSup, pStateInfo->cleanGroupResInfo);

  colDataDestroy(&pStateInfo->twAggSup.timeWindowData);

  // each rebuild step runs only if the previous one succeeded
  int32_t code = initExecTimeWindowInfo(&pStateInfo->twAggSup.timeWindowData, &pTask->window);
  if (0 == code) {
    code = resetAggSup(&pOper->exprSupp, &pStateInfo->aggSup, pTask, pNode->window.pFuncs, NULL,
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTask->id.str, pTask->streamInfo.pState,
                       &pTask->storageAPI.functionStore);
    if (0 == code) {
      code = resetExprSupp(&pStateInfo->scalarSup, pTask, pNode->window.pExprs, NULL,
                           &pTask->storageAPI.functionStore);
    }
  }

  // forget the previous state key and window progress
  pStateInfo->cleanGroupResInfo = false;
  pStateInfo->hasKey = false;
  pStateInfo->winSup.lastTs = INT64_MIN;
  cleanupGroupResInfo(&pStateInfo->groupResInfo);
  memset(pStateInfo->stateKey.pData, 0, pStateInfo->stateKey.bytes);

  return code;
}
1961

1962
// TODO: make this a non-blocking operator
1963
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
1,772,976✔
1964
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1965
  QRY_PARAM_CHECK(pOptrInfo);
1,772,976✔
1966

1967
  int32_t                   code = TSDB_CODE_SUCCESS;
1,772,976✔
1968
  int32_t                   lino = 0;
1,772,976✔
1969
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
1,772,976✔
1970
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,772,976✔
1971
  if (pInfo == NULL || pOperator == NULL) {
1,772,976✔
1972
    code = terrno;
×
1973
    goto _error;
×
1974
  }
1975

1976
  pOperator->pPhyNode = pStateNode;
1,772,976✔
1977
  pOperator->exprSupp.hasWindowOrGroup = true;
1,772,976✔
1978
  pOperator->exprSupp.hasWindow = true;
1,772,976✔
1979
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
1,772,976✔
1980
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
1,772,976✔
1981

1982
  if (pStateNode->window.pExprs != NULL) {
1,772,976✔
1983
    int32_t    numOfScalarExpr = 0;
1,587,212✔
1984
    SExprInfo* pScalarExprInfo = NULL;
1,587,212✔
1985
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
1,587,212✔
1986
    QUERY_CHECK_CODE(code, lino, _error);
1,587,212✔
1987

1988
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
1,587,212✔
1989
    if (code != TSDB_CODE_SUCCESS) {
1,587,212✔
1990
      goto _error;
×
1991
    }
1992
  }
1993

1994
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1,772,976✔
1995
  pInfo->stateKey.type = pInfo->stateCol.type;
1,772,976✔
1996
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
1,772,976✔
1997
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
1,772,976✔
1998
  if (pInfo->stateKey.pData == NULL) {
1,772,976✔
1999
    goto _error;
×
2000
  }
2001
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
1,772,976✔
2002
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
1,772,976✔
2003

2004
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,772,976✔
2005
                            pTaskInfo->pStreamRuntimeInfo);
1,772,976✔
2006
  if (code != TSDB_CODE_SUCCESS) {
1,772,976✔
2007
    goto _error;
×
2008
  }
2009

2010
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,772,976✔
2011

2012
  int32_t    num = 0;
1,772,976✔
2013
  SExprInfo* pExprInfo = NULL;
1,772,976✔
2014
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
1,772,976✔
2015
  QUERY_CHECK_CODE(code, lino, _error);
1,772,976✔
2016

2017
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,772,976✔
2018

2019
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
3,545,952✔
2020
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,772,976✔
2021
  if (code != TSDB_CODE_SUCCESS) {
1,772,976✔
2022
    goto _error;
×
2023
  }
2024

2025
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
1,772,976✔
2026
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,772,976✔
2027
  initBasicInfo(&pInfo->binfo, pResBlock);
1,772,976✔
2028
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,772,976✔
2029

2030
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,772,976✔
2031
  QUERY_CHECK_CODE(code, lino, _error);
1,772,976✔
2032

2033
  pInfo->tsSlotId = tsSlotId;
1,772,976✔
2034
  pInfo->pOperator = pOperator;
1,772,976✔
2035
  pInfo->cleanGroupResInfo = false;
1,772,976✔
2036
  pInfo->extendOption = pStateNode->extendOption;
1,772,976✔
2037
  pInfo->trueForLimit = pStateNode->trueForLimit;
1,772,976✔
2038
  pInfo->winSup.lastTs = INT64_MIN;
1,772,976✔
2039

2040
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
1,772,976✔
2041
                  pTaskInfo);
2042
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
1,772,976✔
2043
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2044
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
1,772,976✔
2045

2046
  code = appendDownstream(pOperator, &downstream, 1);
1,772,976✔
2047
  if (code != TSDB_CODE_SUCCESS) {
1,772,976✔
2048
    goto _error;
×
2049
  }
2050

2051
  *pOptrInfo = pOperator;
1,772,976✔
2052
  return TSDB_CODE_SUCCESS;
1,772,976✔
2053

2054
_error:
×
2055
  if (pInfo != NULL) {
×
2056
    destroyStateWindowOperatorInfo(pInfo);
×
2057
  }
2058

2059
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2060
  pTaskInfo->code = code;
×
2061
  return code;
×
2062
}
2063

2064
void destroySWindowOperatorInfo(void* param) {
2,146,629✔
2065
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
2,146,629✔
2066
  if (pInfo == NULL) {
2,146,629✔
2067
    return;
×
2068
  }
2069

2070
  cleanupBasicInfo(&pInfo->binfo);
2,146,629✔
2071
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
2,146,629✔
2072
  if (pInfo->pOperator) {
2,146,629✔
2073
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
2,146,629✔
2074
                      pInfo->cleanGroupResInfo);
2,146,629✔
2075
    pInfo->pOperator = NULL;
2,146,629✔
2076
  }
2077

2078
  cleanupAggSup(&pInfo->aggSup);
2,146,629✔
2079
  cleanupExprSupp(&pInfo->scalarSupp);
2,146,629✔
2080

2081
  cleanupGroupResInfo(&pInfo->groupResInfo);
2,146,629✔
2082
  taosMemoryFreeClear(param);
2,146,629✔
2083
}
2084

2085
/*
 * Reset callback of the session-window operator: marks the operator as not
 * opened, discards accumulated results and rebuilds the time-window column,
 * the aggregate supporter and the scalar expression supporter.
 *
 * Returns the first non-zero code reported by the rebuild steps, 0 otherwise.
 * The bookkeeping fields are reset regardless of that code.
 */
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
  SSessionAggOperatorInfo* pSession = pOper->info;
  SExecTaskInfo*           pTask = pOper->pTaskInfo;
  SSessionWinodwPhysiNode* pNode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
  const size_t             keyBufSize = sizeof(int64_t) * 2 + POINTER_BYTES;

  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pSession->binfo);
  cleanupResultInfo(pSession->pOperator->pTaskInfo, &pSession->pOperator->exprSupp, &pSession->groupResInfo,
                    &pSession->aggSup, pSession->cleanGroupResInfo);

  // rebuild the helpers; each step is skipped once an earlier one has failed
  colDataDestroy(&pSession->twAggSup.timeWindowData);
  int32_t code = initExecTimeWindowInfo(&pSession->twAggSup.timeWindowData, &pTask->window);
  if (code == 0) {
    code = resetAggSup(&pOper->exprSupp, &pSession->aggSup, pTask, pNode->window.pFuncs, NULL, keyBufSize,
                       pTask->id.str, pTask->streamInfo.pState, &pTask->storageAPI.functionStore);
    if (code == 0) {
      code = resetExprSupp(&pSession->scalarSupp, pTask, pNode->window.pExprs, NULL,
                           &pTask->storageAPI.functionStore);
    }
  }

  // all window-row bookkeeping is zeroed except prevTs, which restarts at INT64_MIN
  pSession->winSup = (SWindowRowsSup){.prevTs = INT64_MIN};
  pSession->reptScan = false;
  pSession->cleanGroupResInfo = false;

  cleanupGroupResInfo(&pSession->groupResInfo);
  return code;
}
2115

2116
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
2,146,629✔
2117
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2118
  QRY_PARAM_CHECK(pOptrInfo);
2,146,629✔
2119

2120
  int32_t                  code = TSDB_CODE_SUCCESS;
2,146,629✔
2121
  int32_t                  lino = 0;
2,146,629✔
2122
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
2,146,629✔
2123
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,146,629✔
2124
  if (pInfo == NULL || pOperator == NULL) {
2,146,629✔
2125
    code = terrno;
×
2126
    goto _error;
×
2127
  }
2128

2129
  pOperator->pPhyNode = pSessionNode;
2,146,629✔
2130
  pOperator->exprSupp.hasWindowOrGroup = true;
2,146,629✔
2131
  pOperator->exprSupp.hasWindow = true;
2,146,629✔
2132

2133
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2,146,629✔
2134
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2,146,629✔
2135

2136
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
2,146,629✔
2137
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
2,146,629✔
2138
  initBasicInfo(&pInfo->binfo, pResBlock);
2,146,629✔
2139

2140
  int32_t    numOfCols = 0;
2,146,629✔
2141
  SExprInfo* pExprInfo = NULL;
2,146,629✔
2142
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
2,146,629✔
2143
  QUERY_CHECK_CODE(code, lino, _error);
2,146,629✔
2144

2145
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
4,293,258✔
2146
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
2,146,629✔
2147
  QUERY_CHECK_CODE(code, lino, _error);
2,146,629✔
2148

2149
  pInfo->gap = pSessionNode->gap;
2,146,629✔
2150

2151
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
2,146,629✔
2152
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
2,146,629✔
2153
  QUERY_CHECK_CODE(code, lino, _error);
2,146,629✔
2154

2155
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
2,146,629✔
2156
  pInfo->binfo.pRes = pResBlock;
2,146,629✔
2157
  pInfo->winSup.prevTs = INT64_MIN;
2,146,629✔
2158
  pInfo->reptScan = false;
2,146,629✔
2159
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
2,146,629✔
2160
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
2,146,629✔
2161

2162
  if (pSessionNode->window.pExprs != NULL) {
2,146,629✔
2163
    int32_t    numOfScalar = 0;
426✔
2164
    SExprInfo* pScalarExprInfo = NULL;
426✔
2165
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
426✔
2166
    QUERY_CHECK_CODE(code, lino, _error);
426✔
2167

2168
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
426✔
2169
    QUERY_CHECK_CODE(code, lino, _error);
426✔
2170
  }
2171

2172
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
2,146,629✔
2173
                            pTaskInfo->pStreamRuntimeInfo);
2,146,629✔
2174
  QUERY_CHECK_CODE(code, lino, _error);
2,146,629✔
2175

2176
  pInfo->pOperator = pOperator;
2,146,629✔
2177
  pInfo->cleanGroupResInfo = false;
2,146,629✔
2178
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
2,146,629✔
2179
                  pInfo, pTaskInfo);
2180
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
2,146,629✔
2181
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2182
  pOperator->pTaskInfo = pTaskInfo;
2,146,629✔
2183
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
2,146,629✔
2184

2185
  code = appendDownstream(pOperator, &downstream, 1);
2,146,629✔
2186
  QUERY_CHECK_CODE(code, lino, _error);
2,146,629✔
2187

2188
  *pOptrInfo = pOperator;
2,146,629✔
2189
  return TSDB_CODE_SUCCESS;
2,146,629✔
2190

2191
_error:
×
2192
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2193
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2194
  pTaskInfo->code = code;
×
2195
  return code;
×
2196
}
2197

2198
void destroyMAIOperatorInfo(void* param) {
1,105,192✔
2199
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
1,105,192✔
2200
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
1,105,192✔
2201
  taosMemoryFreeClear(param);
1,105,192✔
2202
}
1,105,192✔
2203

2204
/*
 * Fetch a new result row from the aggregate supporter's result buffer and, when
 * one is obtained, record its page id and offset as the current position in
 * pResultRowInfo. Returns the row, or NULL when getNewResultRow() yields none.
 */
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
  SResultRow* pRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
  if (pRow != NULL) {
    SResultRowPosition pos = {.pageId = pRow->pageId, .offset = pRow->offset};
    pResultRowInfo->cur = pos;
  }
  return pRow;
}
2212

2213
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
847,476,282✔
2214
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2215
  if (*pResult == NULL) {
847,476,282✔
2216
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
1,019,829✔
2217
    if (*pResult == NULL) {
1,019,829✔
2218
      return terrno;
×
2219
    }
2220
  }
2221

2222
  // set time window for current result
2223
  (*pResult)->win = (*win);
847,476,282✔
2224
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
847,479,612✔
2225
}
2226

2227
/*
 * Aggregate one input block for the merge-aligned-interval operator.
 *
 * Rows are consumed in runs that share the same timestamp. Each run is applied
 * to the single reusable result row (miaInfo->pResultRow). Whenever the
 * timestamp changes, the finished row is emitted into pResultBlock via
 * finalizeResultRows(), reset, and re-initialised for the next window.
 * The last run of the block is applied but left open; miaInfo->curTs remembers
 * its timestamp for the next call.
 *
 * Failures do not return: they long-jump to pTaskInfo->env.
 */
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
  SInterval*     pInterval = &iaInfo->interval;

  int32_t  startPos = 0;
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);

  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);

  // a window is still open from the previous block
  if (miaInfo->curTs != INT64_MIN) {
    if (ts != miaInfo->curTs) {
      // this block starts a different window: emit the open one first
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
      miaInfo->curTs = ts;
    }
  } else {
    miaInfo->curTs = ts;
  }

  STimeWindow win = {0};
  win.skey = miaInfo->curTs;
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  int32_t currPos = startPos;

  STimeWindow currWin = win;
  while (++currPos < pBlock->info.rows) {
    if (tsCols[currPos] == miaInfo->curTs) {
      continue;
    }

    // rows [startPos, currPos) belong to currWin: aggregate and emit them
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));

    // open the window that starts at the new timestamp
    miaInfo->curTs = tsCols[currPos];
    currWin.skey = miaInfo->curTs;
    currWin.ekey =
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;

    startPos = currPos;
    // Stamp the row with the window it now aggregates (currWin). Passing the
    // block's first window here would leave a stale window in the result row.
    ret = setSingleOutputTupleBuf(pResultRowInfo, &currWin, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  // apply the trailing run; its window stays open until the next block or group end
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
}
4,063,029✔
2300

2301
/*
 * Called once a group's result has been generated: tags the result block with
 * the finished group's id, then clears the operator's group id and marks that
 * no time window is open (curTs == INT64_MIN).
 */
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
  // publish the group id before it is cleared below
  pRes->info.id.groupId = pMiaInfo->groupId;

  pMiaInfo->groupId = 0;
  pMiaInfo->curTs = INT64_MIN;
}
2,888,324✔
2306

2307
/*
 * Pull blocks from the downstream operator and aggregate them into the result
 * block of the nested interval operator info until one of these happens:
 *   - the input is exhausted (the operator is marked completed),
 *   - a block of a different group arrives and the finished group produced
 *     rows (the new block is parked in prefetchedBlock for the next call),
 *   - the result block reached resultInfo.capacity rows.
 *
 * Failures do not return: they long-jump to pTaskInfo->env.
 */
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
  int32_t                               code = TSDB_CODE_SUCCESS;
  int32_t                               lino = 0;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;

  SExprSupp*      pSup = &pOperator->exprSupp;
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;

  while (1) {
    SSDataBlock* pBlock = NULL;
    if (pMiaInfo->prefetchedBlock == NULL) {
      pBlock = getNextBlockFromDownstream(pOperator, 0);
    } else {
      // resume with the block that was parked when the previous group ended
      pBlock = pMiaInfo->prefetchedBlock;
      pMiaInfo->prefetchedBlock = NULL;

      pMiaInfo->groupId = pBlock->info.id.groupId;
    }

    // no data exists, all query processing is done
    if (pBlock == NULL) {
      // close the last unclosed time window
      if (pMiaInfo->curTs != INT64_MIN) {
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      setOperatorCompleted(pOperator);
      break;
    }

    if (pMiaInfo->groupId == 0) {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        pMiaInfo->groupId = pBlock->info.id.groupId;
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    } else {
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
        // A group switch requires an open time window to close first.
        if (pMiaInfo->curTs == INT64_MIN) {
          // Jump with the code that was just recorded; terrno is unrelated here
          // and may hold 0 or a stale value.
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));

        pMiaInfo->prefetchedBlock = pBlock;
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
        if (pRes->info.rows == 0) {
          // After filtering the last group the result is empty, so continue with the next group
          continue;
        } else {
          break;
        }
      } else {
        // same group: keep accumulating
        pRes->info.id.groupId = pMiaInfo->groupId;
      }
    }

    pRes->info.scanFlag = pBlock->info.scanFlag;
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);

    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
3,724,263✔
2397

2398
/*
 * "Get next" entry of the merge-aligned-interval operator.
 *
 * Stores the result block in *ppRes, or NULL when the operator is already done
 * or the call produced no rows. When binfo.mergeResultBlock is set, the
 * aggregation step is repeated until the operator completes or the result block
 * holds at least resultInfo.threshold rows; otherwise it runs exactly once.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                               code = TSDB_CODE_SUCCESS;
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
  SIntervalAggOperatorInfo*             pInterval = pMiaInfo->intervalAggOperatorInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pOut = pInterval->binfo.pRes;
  blockDataCleanup(pOut);

  if (!pInterval->binfo.mergeResultBlock) {
    doMergeAlignedIntervalAgg(pOperator);
  } else {
    // keep filling the block until it is large enough or the input runs dry
    while (pOperator->status != OP_EXEC_DONE && pOut->info.rows < pOperator->resultInfo.threshold) {
      doMergeAlignedIntervalAgg(pOperator);
    }
  }

  size_t produced = pOut->info.rows;
  pOperator->resultInfo.totalRows += produced;
  if (produced == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pOut;
  }
  return code;
}
2432

2433
/*
 * Reset callback of the merge-aligned-interval operator: clears the group /
 * open-window bookkeeping of the wrapper and then delegates to resetInterval()
 * for the nested interval operator info. Returns resetInterval()'s code.
 */
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;

  pInfo->groupId = 0;
  pInfo->curTs = INT64_MIN;  // no time window is open
  pInfo->prefetchedBlock = NULL;
  pInfo->pResultRow = NULL;

  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
}
2448

2449
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
1,105,192✔
2450
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2451
  QRY_PARAM_CHECK(pOptrInfo);
1,105,192✔
2452

2453
  int32_t                               code = TSDB_CODE_SUCCESS;
1,105,192✔
2454
  int32_t                               lino = 0;
1,105,192✔
2455
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
1,105,192✔
2456
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,105,192✔
2457
  if (miaInfo == NULL || pOperator == NULL) {
1,105,192✔
2458
    code = terrno;
×
2459
    goto _error;
×
2460
  }
2461

2462
  pOperator->pPhyNode = pNode;
1,105,192✔
2463
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
1,105,192✔
2464
  if (miaInfo->intervalAggOperatorInfo == NULL) {
1,105,192✔
2465
    code = terrno;
×
2466
    goto _error;
×
2467
  }
2468

2469
  SInterval interval = {.interval = pNode->interval,
3,315,576✔
2470
                        .sliding = pNode->sliding,
1,105,192✔
2471
                        .intervalUnit = pNode->intervalUnit,
1,105,192✔
2472
                        .slidingUnit = pNode->slidingUnit,
1,105,192✔
2473
                        .offset = pNode->offset,
1,105,192✔
2474
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
1,105,192✔
2475
                        .timeRange = pNode->timeRange};
2476
  calcIntervalAutoOffset(&interval);
1,105,192✔
2477

2478
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
1,105,192✔
2479
  SExprSupp*                pSup = &pOperator->exprSupp;
1,105,192✔
2480
  pSup->hasWindowOrGroup = true;
1,105,192✔
2481
  pSup->hasWindow = true;
1,105,192✔
2482

2483
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,105,192✔
2484
                            pTaskInfo->pStreamRuntimeInfo);
1,105,192✔
2485
  QUERY_CHECK_CODE(code, lino, _error);
1,105,192✔
2486

2487
  miaInfo->curTs = INT64_MIN;
1,105,192✔
2488
  iaInfo->win = pTaskInfo->window;
1,105,192✔
2489
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
1,105,192✔
2490
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
1,105,192✔
2491
  iaInfo->interval = interval;
1,105,192✔
2492
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
1,105,192✔
2493
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
1,105,192✔
2494

2495
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,105,192✔
2496
  initResultSizeInfo(&pOperator->resultInfo, 512);
1,105,192✔
2497

2498
  int32_t    num = 0;
1,105,192✔
2499
  SExprInfo* pExprInfo = NULL;
1,105,192✔
2500
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
1,105,192✔
2501
  QUERY_CHECK_CODE(code, lino, _error);
1,105,192✔
2502

2503
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
2,210,384✔
2504
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
1,105,192✔
2505
  QUERY_CHECK_CODE(code, lino, _error);
1,105,192✔
2506

2507
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
1,105,192✔
2508
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,105,192✔
2509
  initBasicInfo(&iaInfo->binfo, pResBlock);
1,105,192✔
2510
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
1,105,192✔
2511
  QUERY_CHECK_CODE(code, lino, _error);
1,105,192✔
2512

2513
  iaInfo->timeWindowInterpo = false;
1,105,192✔
2514
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
1,105,192✔
2515
  QUERY_CHECK_CODE(code, lino, _error);
1,105,192✔
2516
  if (iaInfo->timeWindowInterpo) {
1,105,192✔
2517
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2518
  }
2519

2520
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
1,105,192✔
2521
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,105,192✔
2522
  QUERY_CHECK_CODE(code, lino, _error);
1,105,192✔
2523
  iaInfo->pOperator = pOperator;
1,105,192✔
2524
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
1,105,192✔
2525
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2526

2527
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
1,105,192✔
2528
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2529
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
1,105,192✔
2530

2531
  code = appendDownstream(pOperator, &downstream, 1);
1,105,192✔
2532
  QUERY_CHECK_CODE(code, lino, _error);
1,105,192✔
2533

2534
  *pOptrInfo = pOperator;
1,105,192✔
2535
  return TSDB_CODE_SUCCESS;
1,105,192✔
2536

2537
_error:
×
2538
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2539
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2540
  pTaskInfo->code = code;
×
2541
  return code;
×
2542
}
2543

2544
//=====================================================================================================================
2545
// merge interval operator
2546
typedef struct SMergeIntervalAggOperatorInfo {
2547
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2548
  SList*                   groupIntervals;
2549
  SListIter                groupIntervalsIter;
2550
  bool                     hasGroupId;
2551
  uint64_t                 groupId;
2552
  SSDataBlock*             prefetchedBlock;
2553
  bool                     inputBlocksFinished;
2554
} SMergeIntervalAggOperatorInfo;
2555

2556
typedef struct SGroupTimeWindow {
2557
  uint64_t    groupId;
2558
  STimeWindow window;
2559
} SGroupTimeWindow;
2560

2561
void destroyMergeIntervalOperatorInfo(void* param) {
×
2562
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2563
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2564
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2565

2566
  taosMemoryFreeClear(param);
×
2567
}
×
2568

2569
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2570
                                        STimeWindow* newWin) {
2571
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2572
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2573
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2574

2575
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2576
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2577
  if (code != TSDB_CODE_SUCCESS) {
×
2578
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2579
    return code;
×
2580
  }
2581

2582
  SListIter iter = {0};
×
2583
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2584
  SListNode* listNode = NULL;
×
2585
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2586
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2587
    if (prevGrpWin->groupId != tableGroupId) {
×
2588
      continue;
×
2589
    }
2590

2591
    STimeWindow* prevWin = &prevGrpWin->window;
×
2592
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2593
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2594
      taosMemoryFreeClear(tmp);
×
2595
    }
2596
  }
2597

2598
  return TSDB_CODE_SUCCESS;
×
2599
}
2600

2601
/*
 * Aggregate one input block into the interval windows it touches.
 *
 * The window containing the block's first timestamp is handled first; the
 * remaining windows are then visited through getNextQualifiedWindow() until it
 * returns a negative position. For every window the function
 *   1. binds the window's result row with setTimeWindowOutputBuf(),
 *   2. counts the rows of the block that fall into the window,
 *   3. performs border interpolation (the first window only does this when
 *      iaInfo->timeWindowInterpo is set),
 *   4. applies the aggregate functions on those rows,
 *   5. closes the window and records it via outputPrevIntervalResult().
 *
 * Errors are not returned: every failure leaves the function through
 * T_LONG_JMP(pTaskInfo->env, <code>).
 *
 * @param pOperatorInfo   merge-interval operator (info is SMergeIntervalAggOperatorInfo)
 * @param pResultRowInfo  result row bookkeeping of the operator
 * @param pBlock          input data block
 * @param scanFlag        scan flag of the block; compared against MAIN_SCAN
 * @param pResultBlock    forwarded to outputPrevIntervalResult()
 */
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;

  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;

  int32_t     startPos = 0;
  int32_t     numOfOutput = pExprSup->numOfExprs;
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
  uint64_t    tableGroupId = pBlock->info.id.groupId;
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
  SResultRow* pResult = NULL;

  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
                                        iaInfo->binfo.inputTsOrder);

  int32_t ret =
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
    // A missing result row with a success code must still be reported as an
    // error: long-jumping with TSDB_CODE_SUCCESS would lose the failure reason.
    if (ret == TSDB_CODE_SUCCESS) {
      ret = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    T_LONG_JMP(pTaskInfo->env, ret);
  }

  // boundary key of the window in scan direction
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                                 iaInfo->binfo.inputTsOrder);
  if (forwardRows <= 0) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // previous time windows have not been interpolated yet
  if (iaInfo->timeWindowInterpo) {
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);

    // restore the current time window
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }

    // window start key interpolation
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
    if (ret != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                        forwardRows, pBlock->info.rows, numOfOutput);
  if (ret != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, ret);
  }
  doCloseWindow(pResultRowInfo, iaInfo, pResult);

  // record this interval (&win) now that it is closed
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  STimeWindow nextWin = win;
  while (1) {
    int32_t prevEndPos = forwardRows - 1 + startPos;
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
                                      iaInfo->binfo.inputTsOrder);
    if (startPos < 0) {
      break;
    }

    // bind the result row of the next window; a NULL row is treated as a failure
    code =
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
      // same rule as above: never long-jump with a success code
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
      T_LONG_JMP(pTaskInfo->env, code);
    }

    ekey = ascScan ? nextWin.ekey : nextWin.skey;
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
                                           iaInfo->binfo.inputTsOrder);

    // window start(end) key interpolation
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
                                           forwardRows, pBlock->info.rows, numOfOutput);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    doCloseWindow(pResultRowInfo, iaInfo, pResult);

    // record this interval (&nextWin) now that it is closed
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // keep the last row of the block for interpolation against the next block
  if (iaInfo->timeWindowInterpo) {
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
  }
}
×
2715

2716
/*
 * "Get next" entry point of the merge-interval operator.
 *
 * While input is not exhausted, blocks are pulled from downstream 0 (or taken
 * from miaInfo->prefetchedBlock) and fed to doMergeIntervalAggImpl(). The loop
 * stops when
 *   - the downstream returns NULL (inputBlocksFinished is set and the
 *     groupIntervals iterator is initialised),
 *   - a block of a different group arrives (it is parked in prefetchedBlock
 *     for the next call), or
 *   - the result block reaches resultInfo.threshold rows.
 * Once input is finished, each call advances the groupIntervals iterator and
 * copies that entry's group id into the result block.
 *
 * NOTE(review): within the code visible here nothing appends rows to pRes
 * (doMergeIntervalAggImpl only forwards it to outputPrevIntervalResult, which
 * ignores it). Confirm where result rows are expected to be produced.
 *
 * @param pOperator  the merge-interval operator
 * @param ppRes      out: the result block, or NULL when it holds no rows or the
 *                   operator is already done
 * @return TSDB_CODE_SUCCESS; on failure the function stores the code in
 *         pTaskInfo->code and leaves through T_LONG_JMP instead of returning.
 */
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  // declared up front so that `goto _end` never jumps over its initialisation
  size_t         rows = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pRes = iaInfo->binfo.pRes;
  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!miaInfo->inputBlocksFinished) {
    while (1) {
      SSDataBlock* pBlock = NULL;
      if (miaInfo->prefetchedBlock == NULL) {
        pBlock = getNextBlockFromDownstream(pOperator, 0);
      } else {
        // resume with the block that ended the previous call; it starts a new group
        pBlock = miaInfo->prefetchedBlock;
        miaInfo->groupId = pBlock->info.id.groupId;
        miaInfo->prefetchedBlock = NULL;
      }

      if (pBlock == NULL) {
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
        miaInfo->inputBlocksFinished = true;
        break;
      }

      if (!miaInfo->hasGroupId) {
        miaInfo->hasGroupId = true;
        miaInfo->groupId = pBlock->info.id.groupId;
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
        // group changed: keep the block for the next call and return what we have
        miaInfo->prefetchedBlock = pBlock;
        break;
      }

      pRes->info.scanFlag = pBlock->info.scanFlag;
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
      QUERY_CHECK_CODE(code, lino, _end);

      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);

      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
        break;
      }
    }

    pRes->info.id.groupId = miaInfo->groupId;
  }

  if (miaInfo->inputBlocksFinished) {
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);

    if (listNode != NULL) {
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
      pRes->info.id.groupId = grpWin->groupId;
    }
  }

  if (pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
  }

  rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  (*ppRes) = (rows == 0) ? NULL : pRes;
  return code;
}
2800

2801
/*
 * Reset-state callback of the merge-interval operator (installed with
 * setOperatorResetStateFn()).
 *
 * Clears the group tracking fields, forgets any prefetched block, marks input
 * as not finished and empties the groupIntervals list, then delegates the
 * reset of the embedded interval operator info to resetInterval().
 *
 * @return the result of resetInterval().
 */
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
  SMergeIntervalAggOperatorInfo* pMergeInfo = pOper->info;

  // input-side progress
  pMergeInfo->inputBlocksFinished = false;
  pMergeInfo->prefetchedBlock = NULL;

  // current group tracking
  pMergeInfo->hasGroupId = false;
  pMergeInfo->groupId = 0;

  // recorded per-group windows
  tdListEmpty(pMergeInfo->groupIntervals);

  return resetInterval(pOper, &pMergeInfo->intervalAggOperatorInfo);
}
2813

2814
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2815
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2816
  QRY_PARAM_CHECK(pOptrInfo);
×
2817

2818
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2819
  int32_t                        lino = 0;
×
2820
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2821
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2822
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2823
    code = terrno;
×
2824
    goto _error;
×
2825
  }
2826

2827
  pOperator->pPhyNode = pIntervalPhyNode;
×
2828
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2829
                        .sliding = pIntervalPhyNode->sliding,
×
2830
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2831
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
2832
                        .offset = pIntervalPhyNode->offset,
×
2833
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
2834
                        .timeRange = pIntervalPhyNode->timeRange};
2835
  calcIntervalAutoOffset(&interval);
×
2836

2837
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
2838

2839
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
2840
  pIntervalInfo->win = pTaskInfo->window;
×
2841
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
2842
  pIntervalInfo->interval = interval;
×
2843
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
2844
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
2845
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
2846

2847
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
2848
  pExprSupp->hasWindowOrGroup = true;
×
2849
  pExprSupp->hasWindow = true;
×
2850

2851
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
2852
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
2853

2854
  int32_t    num = 0;
×
2855
  SExprInfo* pExprInfo = NULL;
×
2856
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
2857
  QUERY_CHECK_CODE(code, lino, _error);
×
2858

2859
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
2860
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
×
2861
  if (code != TSDB_CODE_SUCCESS) {
×
2862
    goto _error;
×
2863
  }
2864

2865
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
2866
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
2867
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
2868
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
2869
  QUERY_CHECK_CODE(code, lino, _error);
×
2870

2871
  pIntervalInfo->timeWindowInterpo = false;
×
2872
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
2873
  QUERY_CHECK_CODE(code, lino, _error);
×
2874
  if (pIntervalInfo->timeWindowInterpo) {
×
2875
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2876
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
2877
      goto _error;
×
2878
    }
2879
  }
2880

2881
  pIntervalInfo->pOperator = pOperator;
×
2882
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
2883
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
2884
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
2885
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
2886
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2887
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
2888

2889
  code = appendDownstream(pOperator, &downstream, 1);
×
2890
  if (code != TSDB_CODE_SUCCESS) {
×
2891
    goto _error;
×
2892
  }
2893

2894
  *pOptrInfo = pOperator;
×
2895
  return TSDB_CODE_SUCCESS;
×
2896
_error:
×
2897
  if (pMergeIntervalInfo != NULL) {
×
2898
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
2899
  }
2900
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2901
  pTaskInfo->code = code;
×
2902
  return code;
×
2903
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc