• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5044

06 May 2026 02:35AM UTC coverage: 73.169% (+0.06%) from 73.107%
#5044

push

travis-ci

web-flow
feat: [6659794715] cpu limit (#35153)

244 of 275 new or added lines in 23 files covered. (88.73%)

526 existing lines in 141 files now uncovered.

277745 of 379596 relevant lines covered (73.17%)

133740972.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.29
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47
static int32_t applyIndefRowsWindowSegment(SOperatorInfo* pOperator, SIndefRowsRuntime* pRuntime,
10,756,159✔
48
                                           SSDataBlock* pResultTemplate, int32_t resultRowSize,
49
                                           uint64_t groupId, const STimeWindow* pWin,
50
                                           SSDataBlock* pInputBlock, int32_t startRow, int32_t numRows,
51
                                           int32_t inputTsOrder, bool closeWindow) {
52
  int32_t                code = TSDB_CODE_SUCCESS;
10,756,159✔
53
  int32_t                lino = 0;
10,756,159✔
54
  SIndefRowsWindowState* pState = NULL;
10,756,159✔
55

56
  code = applyIndefRowsFuncOnWindowState(pOperator, pRuntime, &pState, pResultTemplate, groupId, pWin,
10,756,159✔
57
                                                 pInputBlock, startRow, numRows, inputTsOrder, resultRowSize);
58
  QUERY_CHECK_CODE(code, lino, _return);
10,756,159✔
59

60
  if (!closeWindow) {
10,752,981✔
61
    return code;
746,831✔
62
  }
63
  
64
  code = closeIndefRowsWindowState(pOperator, pRuntime, pState);
10,006,150✔
65
  QUERY_CHECK_CODE(code, lino, _return);
10,006,150✔
66

67
_return:
10,009,328✔
68
  if (code) {
10,009,328✔
69
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3,178✔
70
  }
71
  return code;
10,009,328✔
72
}
73

74
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
75
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
76
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
77
                                      SExecTaskInfo* pTaskInfo) {
78
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
79
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
80

81
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
82
    *pResult = NULL;
101✔
83
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
84
    return pTaskInfo->code;
×
85
  }
86

87
  // set time window for current result
88
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
89
  *pResult = pResultRow;
2,147,483,647✔
90
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
91
}
92

93
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
2,147,483,647✔
94
  pRowSup->win.ekey = ts;
2,147,483,647✔
95
  pRowSup->prevTs = ts;
2,147,483,647✔
96
  pRowSup->groupId = groupId;
2,147,483,647✔
97
  pRowSup->numOfRows += 1;
2,147,483,647✔
98
  if (hasContinuousNullRows(pRowSup)) {
2,147,483,647✔
99
    // rows having null state col are wrapped by rows of same state
100
    // these rows can be counted into current window
101
    pRowSup->numOfRows += pRowSup->numNullRows;
201,379,039✔
102
    resetNumNullRows(pRowSup);
201,379,039✔
103
  }
104
}
2,147,483,647✔
105

106
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
1,626,506,441✔
107
  pRowSup->startRowIndex = rowIndex;
1,626,506,441✔
108
  pRowSup->numOfRows = 0;
1,626,506,441✔
109
  pRowSup->win.skey = tsList[rowIndex];
1,626,506,441✔
110
  pRowSup->groupId = groupId;
1,626,506,441✔
111
  resetNumNullRows(pRowSup);
1,626,506,441✔
112
}
1,626,506,441✔
113

114
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
115
                                            int32_t order, int64_t* pData) {
116
  int32_t forwardRows = 0;
2,147,483,647✔
117

118
  if (order == TSDB_ORDER_ASC) {
×
119
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
2,147,483,647✔
120
    if (end >= 0) {
2,147,483,647✔
121
      forwardRows = end;
2,147,483,647✔
122

123
      while (pData[end + pos] == ekey) {
2,147,483,647✔
124
        forwardRows += 1;
2,147,483,647✔
125
        ++pos;
2,147,483,647✔
126
      }
127
    }
128
  } else {
129
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
432,505,068✔
130
    if (end >= 0) {
431,823,224✔
131
      forwardRows = end;
432,087,358✔
132

133
      while (pData[end + pos] == ekey) {
851,539,789✔
134
        forwardRows += 1;
419,452,431✔
135
        ++pos;
419,452,431✔
136
      }
137
    }
138
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
139
    //    if (end >= 0) {
140
    //      forwardRows = pos - end;
141
    //
142
    //      if (pData[end] == ekey) {
143
    //        forwardRows += 1;
144
    //      }
145
    //    }
146
  }
147

148
  return forwardRows;
2,147,483,647✔
149
}
150

151
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
2,147,483,647✔
152
  int32_t midPos = -1;
2,147,483,647✔
153
  int32_t numOfRows;
154

155
  if (num <= 0) {
2,147,483,647✔
156
    return -1;
×
157
  }
158

159
  TSKEY*  keyList = (TSKEY*)pValue;
2,147,483,647✔
160
  int32_t firstPos = 0;
2,147,483,647✔
161
  int32_t lastPos = num - 1;
2,147,483,647✔
162

163
  if (order == TSDB_ORDER_DESC) {
2,147,483,647✔
164
    // find the first position which is smaller than the key
165
    while (1) {
166
      if (key >= keyList[firstPos]) return firstPos;
526,888,114✔
167
      if (key == keyList[lastPos]) return lastPos;
106,393,446✔
168

169
      if (key < keyList[lastPos]) {
105,859,069✔
170
        lastPos += 1;
11,126,625✔
171
        if (lastPos >= num) {
11,126,625✔
172
          return -1;
×
173
        } else {
174
          return lastPos;
11,126,625✔
175
        }
176
      }
177

178
      numOfRows = lastPos - firstPos + 1;
94,746,204✔
179
      midPos = (numOfRows >> 1) + firstPos;
94,746,204✔
180

181
      if (key < keyList[midPos]) {
94,746,204✔
182
        firstPos = midPos + 1;
6,939,117✔
183
      } else if (key > keyList[midPos]) {
87,836,463✔
184
        lastPos = midPos - 1;
87,229,598✔
185
      } else {
186
        break;
607,753✔
187
      }
188
    }
189

190
  } else {
191
    // find the first position which is bigger than the key
192
    while (1) {
193
      if (key <= keyList[firstPos]) return firstPos;
2,147,483,647✔
194
      if (key == keyList[lastPos]) return lastPos;
2,147,483,647✔
195

196
      if (key > keyList[lastPos]) {
2,147,483,647✔
197
        lastPos = lastPos + 1;
2,147,483,647✔
198
        if (lastPos >= num)
2,147,483,647✔
199
          return -1;
1,137,195✔
200
        else
201
          return lastPos;
2,147,483,647✔
202
      }
203

204
      numOfRows = lastPos - firstPos + 1;
2,147,483,647✔
205
      midPos = (numOfRows >> 1u) + firstPos;
2,147,483,647✔
206

207
      if (key < keyList[midPos]) {
2,147,483,647✔
208
        lastPos = midPos - 1;
2,147,483,647✔
209
      } else if (key > keyList[midPos]) {
1,778,602,163✔
210
        firstPos = midPos + 1;
1,069,948,158✔
211
      } else {
212
        break;
708,657,908✔
213
      }
214
    }
215
  }
216

217
  return midPos;
709,265,661✔
218
}
219

220
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,147,483,647✔
221
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
222
  int32_t num = -1;
2,147,483,647✔
223
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
224

225
  if (order == TSDB_ORDER_ASC) {
2,147,483,647✔
226
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
2,147,483,647✔
227
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
2,147,483,647✔
228
      if (item != NULL) {
2,147,483,647✔
229
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
230
      }
231
    } else {
232
      num = pDataBlockInfo->rows - startPos;
18,372,280✔
233
      if (item != NULL) {
19,881,077✔
234
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
235
      }
236
    }
237
  } else {  // desc
238
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
421,510,407✔
239
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
432,851,043✔
240
      if (item != NULL) {
430,237,828✔
241
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
242
      }
243
    } else {
244
      num = pDataBlockInfo->rows - startPos;
151,192✔
245
      if (item != NULL) {
734,787✔
246
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
247
      }
248
    }
249
  }
250

251
  return num;
2,147,483,647✔
252
}
253

254
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
11,239,374✔
255
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
256
  SqlFunctionCtx* pCtx = pSup->pCtx;
11,239,374✔
257

258
  int32_t index = 1;
11,239,374✔
259
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
33,729,664✔
260
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
22,490,290✔
261
      pCtx[k].start.key = INT64_MIN;
11,250,916✔
262
      continue;
11,250,916✔
263
    }
264

265
    SFunctParam*     pParam = &pCtx[k].param[0];
11,239,374✔
266
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
11,239,374✔
267

268
    double v1 = 0, v2 = 0, v = 0;
11,239,374✔
269
    if (prevRowIndex == -1) {
11,239,374✔
270
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
271
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
×
272
    } else {
273
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
11,239,374✔
274
                     typeGetTypeModFromColInfo(&pColInfo->info));
275
    }
276

277
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
11,239,374✔
278
                   typeGetTypeModFromColInfo(&pColInfo->info));
279

280
#if 0
281
    if (functionId == FUNCTION_INTERP) {
282
      if (type == RESULT_ROW_START_INTERP) {
283
        pCtx[k].start.key = prevTs;
284
        pCtx[k].start.val = v1;
285

286
        pCtx[k].end.key = curTs;
287
        pCtx[k].end.val = v2;
288

289
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
290
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
291
          if (prevRowIndex == -1) {
292
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
293
          } else {
294
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
295
          }
296

297
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
298
        }
299
      }
300
    } else if (functionId == FUNCTION_TWA) {
301
#endif
302

303
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
11,239,374✔
304
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
11,239,374✔
305
    SPoint point = (SPoint){.key = windowKey, .val = &v};
11,239,374✔
306

307
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
11,239,374✔
308
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
11,174,724✔
309
    }
310

311
    if (type == RESULT_ROW_START_INTERP) {
11,239,374✔
312
      pCtx[k].start.key = point.key;
5,581,358✔
313
      pCtx[k].start.val = v;
5,581,358✔
314
    } else {
315
      pCtx[k].end.key = point.key;
5,658,016✔
316
      pCtx[k].end.val = v;
5,658,016✔
317
    }
318

319
    index += 1;
11,239,374✔
320
  }
321
#if 0
322
  }
323
#endif
324
}
11,239,374✔
325

326
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
749,436✔
327
  if (type == RESULT_ROW_START_INTERP) {
749,436✔
328
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,215,911✔
329
      pCtx[k].start.key = INT64_MIN;
802,864✔
330
    }
331
  } else {
332
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,011,923✔
333
      pCtx[k].end.key = INT64_MIN;
675,534✔
334
    }
335
  }
336
}
749,436✔
337

338
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,993,945✔
339
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
340
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,993,945✔
341

342
  TSKEY curTs = tsCols[pos];
5,993,945✔
343

344
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,993,945✔
345
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,993,945✔
346

347
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
348
  // start exactly from this point, no need to do interpolation
349
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,993,945✔
350
  if (key == curTs) {
5,993,945✔
351
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
404,083✔
352
    return true;
404,083✔
353
  }
354

355
  // it is the first time window, no need to do interpolation
356
  if (pTsKey->isNull && pos == 0) {
5,589,862✔
357
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
8,504✔
358
  } else {
359
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
5,581,358✔
360
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
5,581,358✔
361
                              RESULT_ROW_START_INTERP, pSup);
362
  }
363

364
  return true;
5,589,862✔
365
}
366

367
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
5,994,405✔
368
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
369
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
370
  int32_t code = TSDB_CODE_SUCCESS;
5,994,405✔
371
  int32_t lino = 0;
5,994,405✔
372
  int32_t order = pInfo->binfo.inputTsOrder;
5,994,405✔
373

374
  TSKEY actualEndKey = tsCols[endRowIndex];
5,994,405✔
375
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
5,994,405✔
376

377
  // not ended in current data block, do not invoke interpolation
378
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
5,994,405✔
379
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
25,129✔
380
    (*pRes) = false;
25,129✔
381
    return code;
25,129✔
382
  }
383

384
  // there is actual end point of current time window, no interpolation needs
385
  if (key == actualEndKey) {
5,969,276✔
386
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
311,260✔
387
    (*pRes) = true;
311,260✔
388
    return code;
311,260✔
389
  }
390

391
  if (nextRowIndex < 0) {
5,658,016✔
392
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
393
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
394
  }
395

396
  TSKEY nextKey = tsCols[nextRowIndex];
5,658,016✔
397
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
5,658,016✔
398
                            RESULT_ROW_END_INTERP, pSup);
399
  (*pRes) = true;
5,658,016✔
400
  return code;
5,658,016✔
401
}
402

403
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
2,147,483,647✔
404
  if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
2,147,483,647✔
405
    return false;
×
406
  }
407

408
  return true;
2,147,483,647✔
409
}
410

411
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
412
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
2,147,483,647✔
413
}
414

415
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
416
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
417
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
418

419
  int32_t precision = pInterval->precision;
2,147,483,647✔
420
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
421

422
  // next time window is not in current block
423
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
424
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
425
    return -1;
8,290,792✔
426
  }
427

428
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
429
    return -1;
×
430
  }
431

432
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
433
  int32_t startPos = 0;
2,147,483,647✔
434

435
  // tumbling time window query, a special case of sliding time window query
436
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
437
    startPos = prevPosition + 1;
2,147,483,647✔
438
  } else {
439
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
108,836,851✔
440
      startPos = 0;
10,640,453✔
441
    } else {
442
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
106,888,076✔
443
    }
444
  }
445
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
446
    return -1;
2,147,483,647✔
447
  }
448

449
  /* interp query with fill should not skip time window */
450
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
451
  //    return startPos;
452
  //  }
453

454
  /*
455
   * This time window does not cover any data, try next time window,
456
   * this case may happen when the time window is too small
457
   */
458
  if (primaryKeys != NULL) {
2,147,483,647✔
459
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
460
      TSKEY next = primaryKeys[startPos];
1,772,872,467✔
461
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,770,801,055✔
462
        pNext->skey = taosTimeTruncate(next, pInterval);
375✔
463
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,784✔
464
      } else {
465
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,774,949,776✔
466
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,772,383,903✔
467
      }
468
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
832,312,445✔
469
      TSKEY next = primaryKeys[startPos];
425,387,361✔
470
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
425,848,178✔
UNCOV
471
        pNext->skey = taosTimeTruncate(next, pInterval);
×
472
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
473
      } else {
474
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
426,174,135✔
475
        pNext->ekey = pNext->skey + pInterval->interval - 1;
426,275,432✔
476
      }
477
    }
478
  }
479

480
  return startPos;
2,147,483,647✔
481
}
482

483
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
17,983,215✔
484
  if (type == RESULT_ROW_START_INTERP) {
17,983,215✔
485
    return pResult->startInterp == true;
5,994,405✔
486
  } else {
487
    return pResult->endInterp == true;
11,988,810✔
488
  }
489
}
490

491
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
11,963,221✔
492
  if (type == RESULT_ROW_START_INTERP) {
11,963,221✔
493
    pResult->startInterp = true;
5,993,945✔
494
  } else {
495
    pResult->endInterp = true;
5,969,276✔
496
  }
497
}
11,963,221✔
498

499
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
500
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
501
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
502
  int32_t lino = 0;
2,147,483,647✔
503
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
504
    return code;
2,147,483,647✔
505
  }
506

507
  if (pBlock == NULL) {
2,871,752✔
508
    code = TSDB_CODE_INVALID_PARA;
×
509
    return code;
×
510
  }
511

512
  if (pBlock->pDataBlock == NULL) {
2,871,752✔
513
    return code;
×
514
  }
515

516
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,994,405✔
517

518
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,994,405✔
519
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,994,405✔
520
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,994,405✔
521
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,993,945✔
522
    if (interp) {
5,993,945✔
523
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,993,945✔
524
    }
525
  } else {
526
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
460✔
527
  }
528

529
  // point interpolation does not require the end key time window interpolation.
530
  // interpolation query does not generate the time window end interpolation
531
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,994,405✔
532
  if (!done) {
5,994,405✔
533
    int32_t endRowIndex = startPos + forwardRows - 1;
5,994,405✔
534
    int32_t nextRowIndex = endRowIndex + 1;
5,994,405✔
535

536
    // duplicated ts row does not involve in the interpolation of end value for current time window
537
    int32_t x = endRowIndex;
5,994,405✔
538
    while (x > 0) {
6,011,844✔
539
      if (tsCols[x] == tsCols[x - 1]) {
5,998,122✔
540
        x -= 1;
17,439✔
541
      } else {
542
        endRowIndex = x;
5,980,683✔
543
        break;
5,980,683✔
544
      }
545
    }
546

547
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,994,405✔
548
    bool  interp = false;
5,994,405✔
549
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,994,405✔
550
                                           win, &interp);
551
    QUERY_CHECK_CODE(code, lino, _end);
5,994,405✔
552
    if (interp) {
5,994,405✔
553
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,969,276✔
554
    }
555
  } else {
556
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
557
  }
558

559
_end:
5,994,405✔
560
  if (code != TSDB_CODE_SUCCESS) {
5,994,405✔
561
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
562
  }
563
  return code;
5,994,405✔
564
}
565

566
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
27,101✔
567
  if (pBlock->pDataBlock == NULL) {
27,101✔
568
    return;
×
569
  }
570

571
  size_t num = taosArrayGetSize(pPrevKeys);
27,101✔
572
  for (int32_t k = 0; k < num; ++k) {
81,303✔
573
    SColumn* pc = taosArrayGet(pCols, k);
54,202✔
574

575
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
54,202✔
576

577
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
54,202✔
578
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
54,202✔
579
      if (colDataIsNull_s(pColInfo, i)) {
108,404✔
580
        continue;
×
581
      }
582

583
      char* val = colDataGetData(pColInfo, i);
54,202✔
584
      if (IS_VAR_DATA_TYPE(pkey->type)) {
54,202✔
585
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
586
          memcpy(pkey->pData, val, blobDataTLen(val));
×
587
        } else {
588
          memcpy(pkey->pData, val, varDataTLen(val));
×
589
        }
590
      } else {
591
        memcpy(pkey->pData, val, pkey->bytes);
54,202✔
592
      }
593

594
      break;
54,202✔
595
    }
596
  }
597
}
598

599
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
27,101✔
600
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
601
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
27,101✔
602

603
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
27,101✔
604
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
27,101✔
605

606
  int32_t startPos = 0;
27,101✔
607
  int32_t numOfOutput = pSup->numOfExprs;
27,101✔
608

609
  SResultRow* pResult = NULL;
27,101✔
610

611
  while (1) {
×
612
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
27,101✔
613
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
27,101✔
614
    uint64_t            groupId = pOpenWin->groupId;
27,101✔
615
    SResultRowPosition* p1 = &pOpenWin->pos;
27,101✔
616
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
27,101✔
617
      break;
27,101✔
618
    }
619

620
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
621
    if (NULL == pr) {
×
622
      T_LONG_JMP(pTaskInfo->env, terrno);
×
623
    }
624

625
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
626
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
627
      T_LONG_JMP(pTaskInfo->env, terrno);
×
628
    }
629

630
    if (pr->closed) {
×
631
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
632
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
633
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
634
        T_LONG_JMP(pTaskInfo->env, terrno);
×
635
      }
636
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
637
      taosMemoryFree(pNode);
×
638
      continue;
×
639
    }
640

641
    STimeWindow w = pr->win;
×
642
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
643
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
644
    if (ret != TSDB_CODE_SUCCESS) {
×
645
      T_LONG_JMP(pTaskInfo->env, ret);
×
646
    }
647

648
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
649
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
650
      T_LONG_JMP(pTaskInfo->env, terrno);
×
651
    }
652

653
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
654
    if (!pTsKey) {
×
655
      pTaskInfo->code = terrno;
×
656
      T_LONG_JMP(pTaskInfo->env, terrno);
×
657
    }
658

659
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
660
    if (groupId == pBlock->info.id.groupId) {
×
661
      TSKEY curTs = pBlock->info.window.skey;
×
662
      if (tsCols != NULL) {
×
663
        curTs = tsCols[startPos];
×
664
      }
665
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
666
                                RESULT_ROW_END_INTERP, pSup);
667
    }
668

669
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
670
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
671

672
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
673
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
674
                                          pBlock->info.rows, numOfExprs);
×
675
    if (ret != TSDB_CODE_SUCCESS) {
×
676
      T_LONG_JMP(pTaskInfo->env, ret);
×
677
    }
678

679
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
680
      closeResultRow(pr);
×
681
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
682
      taosMemoryFree(pNode);
×
683
    } else {  // the remains are can not be closed yet.
684
      break;
×
685
    }
686
  }
687
}
27,101✔
688

689
static bool tsKeyCompFn(void* l, void* r, void* param) {
2,028,424,624✔
690
  TSKEY*                    lTS = (TSKEY*)l;
2,028,424,624✔
691
  TSKEY*                    rTS = (TSKEY*)r;
2,028,424,624✔
692
  SIntervalAggOperatorInfo* pInfo = param;
2,028,424,624✔
693
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
2,028,424,624✔
694
}
695

696
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
319,425,377✔
697
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
319,425,377✔
698
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
319,494,397✔
699
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
320,214,641✔
700
}
701

702
/**
703
 * @brief check if cur window should be filtered out by limit info
704
 * @retval true if should be filtered out
705
 * @retval false if not filtering out
706
 * @note If no limit info, we skip filtering.
707
 *       If input/output ts order mismatch, we skip filtering too.
708
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
709
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
710
 *       every tuple in every block.
711
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
712
 */
713
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
714
                                  SExecTaskInfo* pTaskInfo) {
715
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
716
  int32_t lino = 0;
2,147,483,647✔
717
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
718
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
783,002,627✔
719
      // if input/output ts order mismatch, no filter
720
  ) {
721
    return false;
2,147,483,647✔
722
  }
723

724
  if (pOperatorInfo->limit == 0) return true;
321,469,624✔
725

726
  if (pOperatorInfo->pBQ == NULL) {
320,991,854✔
727
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
383,374✔
728
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
383,780✔
729
  }
730

731
  bool shouldFilter = false;
321,140,450✔
732
  // if BQ has been full, compare it with top of BQ
733
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
321,140,450✔
734
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
88,911,779✔
735
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
88,910,155✔
736
  }
737
  if (shouldFilter) {
319,788,470✔
738
    return true;
1,012,693✔
739
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
318,775,777✔
740
    return false;
130,803,218✔
741
  }
742

743
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
744
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
189,029,783✔
745
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
189,144,275✔
746

747
  *((TSKEY*)node.data) = win->skey;
189,144,275✔
748

749
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
189,135,343✔
750
    taosMemoryFree(node.data);
×
751
    return true;
×
752
  }
753

754
_end:
189,091,495✔
755
  if (code != TSDB_CODE_SUCCESS) {
188,887,683✔
756
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
757
    pTaskInfo->code = code;
×
758
    T_LONG_JMP(pTaskInfo->env, code);
×
759
  }
760
  return false;
189,101,645✔
761
}
762

763
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
2,147,483,647✔
764
                                      int32_t startPos) {
765
  int32_t rows = pDataBlockInfo->rows;
2,147,483,647✔
766
  for (int32_t i = startPos; i < pDataBlockInfo->rows; ++i) {
2,147,483,647✔
767
    if (pPrimaryColumn[i] >= win->skey && pPrimaryColumn[i] <= win->ekey) {
2,147,483,647✔
768
      continue;
2,147,483,647✔
769
    } else {
770
      return i - startPos;
2,147,483,647✔
771
    }
772
  }
773
  return rows - startPos;
115,794,590✔
774
}
775

776
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
121,368,249✔
777
                            int32_t scanFlag) {
778
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
121,368,249✔
779

780
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
121,369,553✔
781
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
121,368,424✔
782

783
  int32_t     startPos = 0;
121,365,118✔
784
  int32_t     numOfOutput = pSup->numOfExprs;
121,365,118✔
785
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
121,361,823✔
786
  uint64_t    tableGroupId = pBlock->info.id.groupId;
121,368,861✔
787
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
121,369,501✔
788
  SResultRow* pResult = NULL;
121,362,971✔
789
  bool        sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC || tsCols == NULL;
121,364,643✔
790
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];
121,366,933✔
791
  int32_t     ret = TSDB_CODE_SUCCESS;
121,357,064✔
792

793
  if (tableGroupId != pInfo->curGroupId) {
121,357,064✔
794
    if (pInfo->indefRowsMode) {
14,009,055✔
795
      ret = closeAllIndefRowsWindowStates(pOperatorInfo, &pInfo->indefRows);
17,480✔
796
      if (ret != TSDB_CODE_SUCCESS) {
17,480✔
797
        T_LONG_JMP(pTaskInfo->env, ret);
×
798
      }
799
    }
800

801
    pInfo->handledGroupNum += 1;
14,009,461✔
802
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
14,008,350✔
803
      return true;
37,215✔
804
    } else {
805
      pInfo->curGroupId = tableGroupId;
13,972,246✔
806
      destroyBoundedQueue(pInfo->pBQ);
13,972,384✔
807
      pInfo->pBQ = NULL;
13,970,181✔
808
    }
809
  }
810

811
  STimeWindow win =
121,329,640✔
812
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
121,327,616✔
813
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
121,326,897✔
814

815
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
120,497,457✔
816
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
8,524,976✔
817
                                                          NULL, pInfo->binfo.inputTsOrder)
818
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);
120,497,457✔
819

820
  if (pInfo->indefRowsMode) {
120,501,825✔
821
    bool closeWindow = sorted && (startPos + forwardRows < pBlock->info.rows);
315,615✔
822
    ret = applyIndefRowsWindowSegment(pOperatorInfo, &pInfo->indefRows, pInfo->binfo.pRes, pInfo->aggSup.resultRowSize,
315,615✔
823
                                      tableGroupId, &win, pBlock, startPos, forwardRows, pInfo->binfo.inputTsOrder,
824
                                      closeWindow);
825
    if (ret != TSDB_CODE_SUCCESS) {
315,615✔
826
      T_LONG_JMP(pTaskInfo->env, ret);
1,819✔
827
    }
828
  } else {
829
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
120,185,708✔
830
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
831
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
120,179,050✔
832
      T_LONG_JMP(pTaskInfo->env, ret);
4,313✔
833
    }
834

835
    // prev time window not interpolation yet.
836
    if (pInfo->timeWindowInterpo) {
120,174,912✔
837
      SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
27,101✔
838
      doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
27,101✔
839

840
      // restore current time window
841
      ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
27,101✔
842
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
843
      if (ret != TSDB_CODE_SUCCESS) {
27,101✔
844
        T_LONG_JMP(pTaskInfo->env, ret);
×
845
      }
846

847
      // window start key interpolation
848
      ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
27,101✔
849
      if (ret != TSDB_CODE_SUCCESS) {
27,101✔
850
        T_LONG_JMP(pTaskInfo->env, ret);
×
851
      }
852
    }
853
    // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
854
    //   win.skey, win.ekey, startPos, forwardRows);
855
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
120,180,172✔
856
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
120,173,516✔
857
                                          pBlock->info.rows, numOfOutput);
120,179,667✔
858
    if (ret != TSDB_CODE_SUCCESS) {
120,152,691✔
859
      T_LONG_JMP(pTaskInfo->env, ret);
×
860
    }
861

862
    doCloseWindow(pResultRowInfo, pInfo, pResult);
120,152,691✔
863
  }
864

865
  STimeWindow nextWin = win;
120,495,186✔
866
  int32_t rows = pBlock->info.rows;
120,496,486✔
867

868
  while (startPos < pBlock->info.rows) {
2,147,483,647✔
869
    if (sorted) {
2,147,483,647✔
870
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
2,147,483,647✔
871
                                        pInfo->binfo.inputTsOrder);
872
      if (startPos < 0) {
2,147,483,647✔
873
        break;
8,290,386✔
874
      }
875
    } else {
876
      pBlock->info.rows = forwardRows;
2,147,483,647✔
877
      int32_t newStartOff = forwardRows >= 1
2,147,483,647✔
878
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
2,147,483,647✔
879
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
880
                                : -1;
2,147,483,647✔
881
      pBlock->info.rows = rows;
2,147,483,647✔
882
      if (newStartOff >= 0) {
2,147,483,647✔
883
        startPos += newStartOff;
82,432,826✔
884
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
2,147,483,647✔
885
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
2,147,483,647✔
886
      }
887
      if (startPos >= pBlock->info.rows) {
2,147,483,647✔
888
        break;
111,975,611✔
889
      }
890
    }
891

892
    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
2,147,483,647✔
893
      break;
235,089✔
894
    }
895

896
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
2,147,483,647✔
897
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
2,147,483,647✔
898
                                                    pInfo->binfo.inputTsOrder)
899
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
2,147,483,647✔
900
    if (forwardRows == 0) continue;
2,147,483,647✔
901

902
    if (pInfo->indefRowsMode) {
2,147,483,647✔
903
      bool closeWindow = sorted && (startPos + forwardRows < pBlock->info.rows);
10,350,042✔
904
      ret = applyIndefRowsWindowSegment(pOperatorInfo, &pInfo->indefRows, pInfo->binfo.pRes, pInfo->aggSup.resultRowSize,
10,350,042✔
905
                                        tableGroupId, &nextWin, pBlock, startPos, forwardRows,
906
                                        pInfo->binfo.inputTsOrder, closeWindow);
907
      if (ret != TSDB_CODE_SUCCESS) {
10,350,042✔
908
        T_LONG_JMP(pTaskInfo->env, ret);
×
909
      }
910
    } else {
911
      // null data, failed to allocate more memory buffer
912
      int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
2,147,483,647✔
913
                                            pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
914
      if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
2,147,483,647✔
915
        T_LONG_JMP(pTaskInfo->env, code);
37✔
916
      }
917

918
      // window start(end) key interpolation
919
      code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
2,147,483,647✔
920
      if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
921
        T_LONG_JMP(pTaskInfo->env, code);
×
922
      }
923
      // TODO: add to open window? how to close the open windows after input blocks exhausted?
924
#if 0
925
      if ((ascScan && ekey <= pBlock->info.window.ekey) ||
926
          (!ascScan && ekey >= pBlock->info.window.skey)) {
927
        // window start(end) key interpolation
928
        doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
929
      } else if (pInfo->timeWindowInterpo) {
930
        addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
931
      }
932
#endif
933
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
2,147,483,647✔
934
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2,147,483,647✔
935
                                            pBlock->info.rows, numOfOutput);
2,147,483,647✔
936
      if (ret != TSDB_CODE_SUCCESS) {
2,147,483,647✔
937
        T_LONG_JMP(pTaskInfo->env, ret);
×
938
      }
939
      doCloseWindow(pResultRowInfo, pInfo, pResult);
2,147,483,647✔
940
    }
941
  }
942

943
  if (pInfo->timeWindowInterpo) {
125,284,332✔
944
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
27,101✔
945
  }
946
  return false;
120,502,437✔
947
}
948

949
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
2,147,483,647✔
950
  // current result is done in computing final results.
951
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
2,147,483,647✔
952
    closeResultRow(pResult);
5,969,276✔
953
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
5,969,276✔
954
    taosMemoryFree(pNode);
5,969,276✔
955
  }
956
}
2,147,483,647✔
957

958
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
27,101✔
959
                                       SExecTaskInfo* pTaskInfo) {
960
  int32_t         code = TSDB_CODE_SUCCESS;
27,101✔
961
  int32_t         lino = 0;
27,101✔
962
  SOpenWindowInfo openWin = {0};
27,101✔
963
  openWin.pos.pageId = pResult->pageId;
27,101✔
964
  openWin.pos.offset = pResult->offset;
27,101✔
965
  openWin.groupId = groupId;
27,101✔
966
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
27,101✔
967
  if (pn == NULL) {
27,101✔
968
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
27,101✔
969
    QUERY_CHECK_CODE(code, lino, _end);
27,101✔
970
    return openWin.pos;
27,101✔
971
  }
972

UNCOV
973
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
UNCOV
974
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
975
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
976
    QUERY_CHECK_CODE(code, lino, _end);
×
977
  }
978

UNCOV
979
_end:
×
UNCOV
980
  if (code != TSDB_CODE_SUCCESS) {
×
981
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
982
    pTaskInfo->code = code;
×
983
    T_LONG_JMP(pTaskInfo->env, code);
×
984
  }
UNCOV
985
  return openWin.pos;
×
986
}
987

988
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
125,459,068✔
989
  TSKEY* tsCols = NULL;
125,459,068✔
990

991
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
125,459,068✔
992
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
125,460,211✔
993
    if (!pColDataInfo) {
125,455,495✔
994
      pTaskInfo->code = terrno;
×
995
      T_LONG_JMP(pTaskInfo->env, terrno);
×
996
    }
997

998
    tsCols = (int64_t*)pColDataInfo->pData;
125,455,495✔
999
    if (tsCols[0] == 0) {
125,457,379✔
1000
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
442✔
1001
            tsCols[pBlock->info.rows - 1]);
1002
    }
1003

1004
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
125,460,157✔
1005
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
7,834,210✔
1006
      if (code != TSDB_CODE_SUCCESS) {
7,832,743✔
1007
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1008
        pTaskInfo->code = code;
×
1009
        T_LONG_JMP(pTaskInfo->env, code);
×
1010
      }
1011
    }
1012
  }
1013

1014
  return tsCols;
125,456,781✔
1015
}
1016

1017
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
28,423,259✔
1018
  if (OPTR_IS_OPENED(pOperator)) {
28,423,259✔
1019
    return TSDB_CODE_SUCCESS;
23,274,235✔
1020
  }
1021

1022
  int32_t        code = TSDB_CODE_SUCCESS;
5,151,394✔
1023
  int32_t        lino = 0;
5,151,394✔
1024
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5,151,394✔
1025
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5,153,132✔
1026

1027
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
5,150,232✔
1028
  SExprSupp*                pSup = &pOperator->exprSupp;
5,151,394✔
1029

1030
  if (pSup->pFilterInfo != NULL) {
5,149,245✔
1031
    filterSetExecContext(pSup->pFilterInfo, pTaskInfo, isTaskKilled);
7,524✔
1032
  }
1033

1034
  int32_t scanFlag = MAIN_SCAN;
5,143,585✔
1035

1036
  pInfo->cleanGroupResInfo = false;
5,143,585✔
1037
  while (1) {
121,330,261✔
1038
    SSDataBlock* pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
126,477,332✔
1039
    if (pBlock == NULL) {
126,477,492✔
1040
      break;
5,070,939✔
1041
    }
1042

1043
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
121,406,553✔
1044

1045
    if (pInfo->scalarSupp.pExprInfo != NULL) {
121,410,804✔
1046
      SExprSupp* pExprSup = &pInfo->scalarSupp;
11,052,588✔
1047
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
22,103,639✔
1048
                                   GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
11,052,588✔
1049
      QUERY_CHECK_CODE(code, lino, _end);
11,052,055✔
1050
    }
1051

1052
    // the pDataBlock are always the same one, no need to call this again
1053
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
121,366,748✔
1054
    QUERY_CHECK_CODE(code, lino, _end);
121,368,249✔
1055
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
121,368,249✔
1056
  }
1057

1058
  if (pInfo->indefRowsMode) {
5,108,154✔
1059
    code = closeAllIndefRowsWindowStates(pOperator, &pInfo->indefRows);
74,088✔
1060
    QUERY_CHECK_CODE(code, lino, _end);
74,088✔
1061
  } else {
1062
    code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
5,033,192✔
1063
    QUERY_CHECK_CODE(code, lino, _end);
5,033,192✔
1064
    pInfo->cleanGroupResInfo = true;
5,033,192✔
1065
  }
1066

1067
  OPTR_SET_OPENED(pOperator);
5,107,276✔
1068

1069
_end:
5,148,531✔
1070
  if (code != TSDB_CODE_SUCCESS) {
5,148,531✔
1071
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
40,670✔
1072
    pTaskInfo->code = code;
40,670✔
1073
    T_LONG_JMP(pTaskInfo->env, code);
40,670✔
1074
  }
1075
  return code;
5,107,861✔
1076
}
1077

1078
// start a new state window and record the start info
1079
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
2,147,483,647✔
1080
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1081
  pRowSup->groupId = groupId;
2,147,483,647✔
1082
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
2,147,483,647✔
1083
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1084
    pRowSup->win.skey = tsList[rowIndex];
2,147,483,647✔
1085
    pRowSup->startRowIndex = rowIndex;
2,147,483,647✔
1086
    pRowSup->numOfRows = 0;  // does not include the current row yet
2,147,483,647✔
1087
  } else {
1088
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1089
      rowIndex - pRowSup->numNullRows : rowIndex;
1,162,774,400✔
1090
    pRowSup->win.skey = hasPrevWin ?
1,162,774,400✔
1091
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
1,162,774,400✔
1092
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
1,162,774,400✔
1093
  }
1094
  resetNumNullRows(pRowSup);
2,147,483,647✔
1095
}
2,147,483,647✔
1096

1097
// close a state window and record its end info
1098
// this functions is called when a new state row appears
1099
// @param rowIndex the index of the first row of next window
1100
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
2,147,483,647✔
1101
                                 int32_t rowIndex,
1102
                                 const EStateWinExtendOption* extendOption,
1103
                                 bool hasNextWin) {
1104
  /*
1105
   * For deferred rows between two windows:
1106
   * - EXTEND(0): follow single-col default null behavior.
1107
   *   unresolved rows are dropped when state changes.
1108
   * - EXTEND(1): unresolved rows belong to old window.
1109
   * - EXTEND(2): unresolved rows belong to new window
1110
   *   (handled by doKeepNewStateWindowStartInfo).
1111
   */
1112
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT) {
2,147,483,647✔
1113
      /*
1114
       * Partial-NULL rows must belong to a window.
1115
       * Under EXTEND(0), unresolved all-NULL rows are still handled by the
1116
       * default null behavior, while deferred partial-NULL rows are merged
1117
       * into the previous window.
1118
       */
1119
      if (pRowSup->numDeferredPartialNull > 0) {
2,147,483,647✔
1120
        pRowSup->win.ekey = pRowSup->lastDeferredPartialNullTs;
637,387,012✔
1121
        if (pRowSup->numNullRows < pRowSup->numDeferredPartialNull) {
637,387,012✔
1122
          qError("%s:%d numNullRows(%u) < numDeferredPartialNull(%u), clamping",
×
1123
                 __func__, __LINE__, pRowSup->numNullRows,
1124
                 pRowSup->numDeferredPartialNull);
1125
          pRowSup->numNullRows = 0;
×
1126
        } else {
1127
          pRowSup->numNullRows -= pRowSup->numDeferredPartialNull;
637,387,012✔
1128
        }
1129
      }
1130
      pRowSup->numOfRows += pRowSup->numDeferredPartialNull;
2,147,483,647✔
1131
      pRowSup->numDeferredPartialNull = 0;
2,147,483,647✔
1132
      pRowSup->lastDeferredPartialNullTs = INT64_MIN;
2,147,483,647✔
1133
      if (hasNextWin) {
2,147,483,647✔
1134
        /*
1135
         * State changed: drop unresolved deferred rows
1136
         * (all-NULL + dual-side compatible partial-NULL)
1137
         * between windows.
1138
         */
1139
        resetNumNullRows(pRowSup);
2,147,483,647✔
1140
      }
1141
      /*
1142
       * End of block without state change (!hasNextWin): preserve
1143
       * numNullRows so trailing unresolved rows survive cross-block
1144
       * and can be resolved by rows in the next block.
1145
       * Their physical data stays in the unfinished block
1146
       * (numPartialCalcRows excludes them), so aggregation in the
1147
       * next block can read their column values correctly.
1148
       */
1149
  } else if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1150
      pRowSup->win.ekey = hasNextWin?
1,171,552,103✔
1151
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
1,171,552,103✔
1152
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1153
        pRowSup->numNullRows : 0;
1,171,552,103✔
1154
      resetNumNullRows(pRowSup);
1,171,552,103✔
1155
  } else if (*extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
1,170,741,050✔
1156
      /*
1157
       * EXTEND(2) uses deferred rows to extend the next window when a cut
1158
       * happens.
1159
       *
1160
       * At block tail (!hasNextWin), only deferred partial-NULL rows may
1161
       * need to be preserved into current window. Pure all-NULL tail rows
1162
       * must not be merged into the current window; they are unresolved and
1163
       * should follow default tail behavior (typically dropped).
1164
       */
1165
      if (!hasNextWin && pRowSup->numDeferredPartialNull > 0) {
1,170,741,050✔
1166
        pRowSup->win.ekey = pRowSup->lastDeferredPartialNullTs;
398✔
1167
        pRowSup->numOfRows += pRowSup->numDeferredPartialNull;
398✔
1168
        if (pRowSup->numNullRows < pRowSup->numDeferredPartialNull) {
398✔
1169
          qError("%s:%d numNullRows(%u) < numDeferredPartialNull(%u), clamping",
×
1170
                 __func__, __LINE__, pRowSup->numNullRows,
1171
                 pRowSup->numDeferredPartialNull);
1172
          pRowSup->numNullRows = 0;
×
1173
        } else {
1174
          pRowSup->numNullRows -= pRowSup->numDeferredPartialNull;
398✔
1175
        }
1176
        pRowSup->numDeferredPartialNull = 0;
398✔
1177
        pRowSup->lastDeferredPartialNullTs = INT64_MIN;
398✔
1178
        pRowSup->firstDeferredPartialRowIndex = -1;
398✔
1179
      }
1180
  }
1181
}
2,147,483,647✔
1182

1183
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
2,060,516,621✔
1184
  pRowSup->numNullRows += 1;
2,060,516,621✔
1185
  pRowSup->prevTs = nullRowTs;
2,060,516,621✔
1186
}
2,060,516,621✔
1187

1188
static FORCE_INLINE void absorbDeferredTailAllNull(SWindowRowsSup* pRowSup) {
1189
  if (pRowSup->numDeferredTailAllNull == 0) {
850,841,123✔
1190
    return;
845,958,855✔
1191
  }
1192

1193
  pRowSup->numDeferredPartialNull += pRowSup->numDeferredTailAllNull;
4,882,268✔
1194
  pRowSup->numDeferredTailAllNull = 0;
4,882,268✔
1195
}
1196

1197
static void resetStateKeysUndefined(SStateWindowOperatorInfo* pInfo) {
2,147,483,647✔
1198
  int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
2,147,483,647✔
1199
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1200
    SStateKeys* pKey = taosArrayGet(pInfo->stateKeys, i);
2,147,483,647✔
1201
    if (pKey != NULL) pKey->isNull = true;
2,147,483,647✔
1202
  }
1203
}
2,147,483,647✔
1204

1205
static bool stateWindowKeysAllDefined(const SStateWindowOperatorInfo* pInfo) {
1,194✔
1206
  int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
1,194✔
1207
  for (int32_t i = 0; i < keyNum; ++i) {
2,786✔
1208
    SStateKeys* pKey = taosArrayGet(pInfo->stateKeys, i);
1,990✔
1209
    if (pKey == NULL || pKey->isNull) {
1,990✔
1210
      return false;
398✔
1211
    }
1212
  }
1213
  return true;
796✔
1214
}
1215

1216
/**
1217
  @brief Process the closed state window and do aggregation on the tuples
1218
  within the window. Partial results are stored in the output buffer. If window
1219
  has no valid rows, return success.
1220
*/
1221
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
2,147,483,647✔
1222
                                        SWindowRowsSup* pRowSup,
1223
                                        SSDataBlock* pBlock,
1224
                                        SExecTaskInfo* pTaskInfo,
1225
                                        SExprSupp* pSup,
1226
                                        int32_t numOfOutput,
1227
                                        bool closeWindow) {
1228
  if (pRowSup->numOfRows == 0) {
2,147,483,647✔
1229
    // no valid rows in the window
1230
    return TSDB_CODE_SUCCESS;
22,849,623✔
1231
  }
1232

1233
  if (pInfo->indefRowsMode) {
2,147,483,647✔
1234
    return applyIndefRowsWindowSegment(pInfo->pOperator, &pInfo->indefRows, pInfo->binfo.pRes,
131,094✔
1235
                                       pInfo->aggSup.resultRowSize, pRowSup->groupId, &pRowSup->win, pBlock,
65,547✔
1236
                                       pRowSup->startRowIndex, pRowSup->numOfRows, pInfo->binfo.inputTsOrder,
1237
                                       closeWindow);
1238
  }
1239

1240
  int32_t     code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1241
  int32_t     lino = 0;
2,147,483,647✔
1242
  SResultRow* pResult = NULL;
2,147,483,647✔
1243
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
2,147,483,647✔
1244
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
2,147,483,647✔
1245
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1246
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1247

1248
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
2,147,483,647✔
1249
  pResult->nOrigRows += pRowSup->numOfRows;
2,147,483,647✔
1250
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
2,147,483,647✔
1251
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1252
    pRowSup->numOfRows, 0, numOfOutput);
1253
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1254

1255
_return:
2,147,483,647✔
1256
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
1257
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1258
  }
1259
  return code;
2,147,483,647✔
1260
}
1261

1262
/*
1263
 * Return the per-column aggregate pointer for slotId, or NULL when
1264
 * the block has no aggregates or slotId is out of range.
1265
 */
1266
static FORCE_INLINE struct SColumnDataAgg* getBlockAggForSlot(
1267
    const SSDataBlock* pBlock, int32_t slotId) {
1268
  if (pBlock->pBlockAgg == NULL) {
2,147,483,647✔
1269
    return NULL;
2,147,483,647✔
1270
  }
1271
  int32_t numCols = taosArrayGetSize(pBlock->pDataBlock);
×
1272
  if (slotId < 0 || slotId >= numCols) {
×
1273
    qError("%s pBlockAgg slotId out of bounds, slotId:%d numCols:%d",
×
1274
           __func__, slotId, numCols);
1275
    return NULL;
×
1276
  }
1277
  return &pBlock->pBlockAgg[slotId];
×
1278
}
1279

1280
/*
1281
 * Check NULL status of all state key columns for the given row.
1282
 *   *pAllNull  = true  when every column is NULL.
1283
 *   *pHasNull  = true  when at least one column is NULL
1284
 *                       (includes the all-NULL case).
1285
 */
1286
static int32_t stateWindowRowNullCheck(
2,147,483,647✔
1287
    SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock,
1288
    int32_t rowIndex, bool* pAllNull, bool* pHasNull) {
1289
  int32_t keyNum = taosArrayGetSize(pInfo->stateCols);
2,147,483,647✔
1290

1291
  bool anyNull = false;
2,147,483,647✔
1292
  bool anyNonNull = false;
2,147,483,647✔
1293
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1294
    SColumn* pStateCol = taosArrayGet(pInfo->stateCols, i);
2,147,483,647✔
1295
    SColumnInfoData* pColData =
1296
        taosArrayGet(pBlock->pDataBlock, pStateCol->slotId);
2,147,483,647✔
1297
    struct SColumnDataAgg* pAgg = getBlockAggForSlot(pBlock, pStateCol->slotId);
2,147,483,647✔
1298
    if (pColData == NULL) {
2,147,483,647✔
1299
      qError("%s invalid state key column, slotId:%d is missing",
×
1300
             __func__, pStateCol->slotId);
1301
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1302
    }
1303

1304
    if (colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg)) {
2,147,483,647✔
1305
      anyNull = true;
2,147,483,647✔
1306
    } else {
1307
      anyNonNull = true;
2,147,483,647✔
1308
    }
1309
  }
1310
  *pAllNull = !anyNonNull;
2,147,483,647✔
1311
  *pHasNull = anyNull;
2,147,483,647✔
1312
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
1313
}
1314

1315
/*
1316
 * Assign state key values from the given row.  Only non-NULL
1317
 * columns are copied; NULL columns keep their previous value
1318
 * (or stay "undefined" if isNull is still true).
1319
 */
1320
static int32_t assignStateWindowKeys(
2,147,483,647✔
1321
    SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock,
1322
    int32_t rowIndex) {
1323
  int32_t keyNum = taosArrayGetSize(pInfo->stateCols);
2,147,483,647✔
1324
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1325
    SColumn* pStateCol = taosArrayGet(pInfo->stateCols, i);
2,147,483,647✔
1326
    SStateKeys* pKey = taosArrayGet(pInfo->stateKeys, i);
2,147,483,647✔
1327
    SColumnInfoData* pColData =
1328
        taosArrayGet(pBlock->pDataBlock, pStateCol->slotId);
2,147,483,647✔
1329
    if (pColData == NULL || pColData->pData == NULL) {
2,147,483,647✔
1330
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1331
    }
1332
    struct SColumnDataAgg* pAgg = getBlockAggForSlot(pBlock, pStateCol->slotId);
2,147,483,647✔
1333
    if (colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg)) {
2,147,483,647✔
1334
      continue;
2,147,483,647✔
1335
    }
1336
    assignVal(pKey->pData, colDataGetData(pColData, rowIndex),
2,147,483,647✔
1337
              pColData->info.bytes, pKey->type);
2,147,483,647✔
1338
    pKey->isNull = false;
2,147,483,647✔
1339
  }
1340
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
1341
}
1342

1343
/*
1344
 * Copy pendingKeys from stateKeys.  Called when a new window
1345
 * opens or after pending rows are flushed.
1346
 */
1347
static void syncPendingKeysFromState(SStateWindowOperatorInfo* pInfo) {
2,147,483,647✔
1348
  int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
2,147,483,647✔
1349
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1350
    SStateKeys* pSrc = taosArrayGet(pInfo->stateKeys, i);
2,147,483,647✔
1351
    SStateKeys* pDst = taosArrayGet(pInfo->pendingKeys, i);
2,147,483,647✔
1352
    if (pSrc == NULL || pDst == NULL) continue;
2,147,483,647✔
1353
    pDst->isNull = pSrc->isNull;
2,147,483,647✔
1354
    if (!pSrc->isNull) {
2,147,483,647✔
1355
      memcpy(pDst->pData, pSrc->pData, pSrc->bytes);
2,147,483,647✔
1356
    }
1357
  }
1358
  memset(pInfo->pendingColTouched, 0, sizeof(bool) * keyNum);
2,147,483,647✔
1359
  pInfo->hasPendingPartialNull = false;
2,147,483,647✔
1360
}
2,147,483,647✔
1361

1362
/*
1363
 * Copy stateKeys from pendingKeys.  Called when flushing
1364
 * pending rows into the current window (all-non-NULL confirm).
1365
 */
1366
static void syncStateKeysFromPending(SStateWindowOperatorInfo* pInfo) {
2,147,483,647✔
1367
  int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
2,147,483,647✔
1368
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1369
    SStateKeys* pSrc = taosArrayGet(pInfo->pendingKeys, i);
2,147,483,647✔
1370
    SStateKeys* pDst = taosArrayGet(pInfo->stateKeys, i);
2,147,483,647✔
1371
    if (pSrc == NULL || pDst == NULL) continue;
2,147,483,647✔
1372
    pDst->isNull = pSrc->isNull;
2,147,483,647✔
1373
    if (!pSrc->isNull) {
2,147,483,647✔
1374
      memcpy(pDst->pData, pSrc->pData, pSrc->bytes);
2,147,483,647✔
1375
    }
1376
  }
1377
}
2,147,483,647✔
1378

1379
static void resetPendingState(SStateWindowOperatorInfo* pInfo) {
2,147,483,647✔
1380
  int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
2,147,483,647✔
1381
  memset(pInfo->pendingColTouched, 0, sizeof(bool) * keyNum);
2,147,483,647✔
1382
  pInfo->hasPendingPartialNull = false;
2,147,483,647✔
1383
}
2,147,483,647✔
1384

1385
/*
1386
 * Read-only comparison against pendingKeys.  Returns *pEqual =
1387
 * false if any column is judged "changed".
1388
 */
1389
static int32_t checkPendingKeysCompatible(
2,147,483,647✔
1390
    SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock,
1391
    int32_t rowIndex, bool* pEqual) {
1392
  int32_t keyNum = taosArrayGetSize(pInfo->stateCols);
2,147,483,647✔
1393
  *pEqual = true;
2,147,483,647✔
1394
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1395
    SColumn* pStateCol = taosArrayGet(pInfo->stateCols, i);
2,147,483,647✔
1396
    SStateKeys* pKey = taosArrayGet(pInfo->pendingKeys, i);
2,147,483,647✔
1397
    SColumnInfoData* pColData =
1398
        taosArrayGet(pBlock->pDataBlock, pStateCol->slotId);
2,147,483,647✔
1399
    if (pColData == NULL || pColData->pData == NULL
2,147,483,647✔
1400
        || pKey == NULL || pKey->pData == NULL) {
2,147,483,647✔
1401
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1402
    }
1403
    struct SColumnDataAgg* pAgg = getBlockAggForSlot(pBlock, pStateCol->slotId);
2,147,483,647✔
1404
    if (colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg)) {
2,147,483,647✔
1405
      continue;
2,147,483,647✔
1406
    }
1407
    if (pKey->isNull) {
2,147,483,647✔
1408
      continue;  /* init: not a change */
652,356,128✔
1409
    }
1410
    if (!compareVal(colDataGetData(pColData, rowIndex), pKey)) {
2,147,483,647✔
1411
      *pEqual = false;
2,024,961,767✔
1412
      return TSDB_CODE_SUCCESS;
2,024,961,767✔
1413
    }
1414
  }
1415
  return TSDB_CODE_SUCCESS;
850,841,123✔
1416
}
1417

1418
/*
1419
 * Update pendingKeys from a deferred partial-NULL row and mark
1420
 * touched columns.
1421
 */
1422
static int32_t updatePendingKeysFromRow(
850,841,123✔
1423
    SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock,
1424
    int32_t rowIndex) {
1425
  int32_t keyNum = taosArrayGetSize(pInfo->stateCols);
850,841,123✔
1426
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1427
    SColumn* pStateCol = taosArrayGet(pInfo->stateCols, i);
1,815,272,623✔
1428
    SStateKeys* pKey = taosArrayGet(pInfo->pendingKeys, i);
1,815,272,623✔
1429
    SColumnInfoData* pColData =
1430
        taosArrayGet(pBlock->pDataBlock, pStateCol->slotId);
1,815,272,623✔
1431
    if (pColData == NULL || pColData->pData == NULL) {
1,815,272,623✔
1432
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1433
    }
1434
    struct SColumnDataAgg* pAgg = getBlockAggForSlot(pBlock, pStateCol->slotId);
1,815,272,623✔
1435
    if (colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg)) {
2,147,483,647✔
1436
      continue;
951,674,063✔
1437
    }
1438
    if (pKey->isNull) {
863,598,560✔
1439
      assignVal(pKey->pData, colDataGetData(pColData, rowIndex),
652,356,128✔
1440
                pColData->info.bytes, pKey->type);
652,356,128✔
1441
      pKey->isNull = false;
652,356,128✔
1442
    }
1443
    pInfo->pendingColTouched[i] = true;
863,598,560✔
1444
  }
1445
  pInfo->hasPendingPartialNull = true;
850,841,123✔
1446
  return TSDB_CODE_SUCCESS;
850,841,123✔
1447
}
1448

1449
/*
1450
 * At cut time, check whether the pending partial-NULL segment
1451
 * is dual-side compatible with the new window's first row.
1452
 *
1453
 * For each column touched by a pending partial-NULL row, check
1454
 * if the value in pendingKeys is compatible with the new row:
1455
 *   - new row col is NULL          -> compatible (can init)
1456
 *   - pending col undefined        -> compatible
1457
 *   - new row col equals pending   -> compatible
1458
 *   - otherwise                    -> NOT compatible
1459
 */
1460
static int32_t checkPendingDualSideCompatible(
2,147,483,647✔
1461
    SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock,
1462
    int32_t newRowIndex, bool* pCompatible) {
1463
  int32_t keyNum = taosArrayGetSize(pInfo->stateCols);
2,147,483,647✔
1464
  *pCompatible = true;
2,147,483,647✔
1465

1466
  if (!pInfo->hasPendingPartialNull) {
2,147,483,647✔
1467
    return TSDB_CODE_SUCCESS;
2,147,483,647✔
1468
  }
1469

1470
  for (int32_t i = 0; i < keyNum; ++i) {
2,134,291,547✔
1471
    if (!pInfo->pendingColTouched[i]) continue;
1,497,155,274✔
1472
    SColumn* pStateCol = taosArrayGet(pInfo->stateCols, i);
757,879,845✔
1473
    SStateKeys* pPendKey = taosArrayGet(pInfo->pendingKeys, i);
757,879,845✔
1474
    SColumnInfoData* pColData =
1475
        taosArrayGet(pBlock->pDataBlock, pStateCol->slotId);
757,879,845✔
1476
    if (pColData == NULL || pPendKey == NULL) {
757,879,845✔
1477
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1478
    }
1479
    struct SColumnDataAgg* pAgg = getBlockAggForSlot(pBlock, pStateCol->slotId);
757,879,845✔
1480
    if (colDataIsNull(pColData, pBlock->info.rows, newRowIndex, pAgg)) {
1,515,759,690✔
1481
      continue;
650,683,248✔
1482
    }
1483
    if (pPendKey->isNull) {
107,196,597✔
1484
      continue;
×
1485
    }
1486
    if (!compareVal(colDataGetData(pColData, newRowIndex), pPendKey)) {
107,196,597✔
1487
      *pCompatible = false;
106,417,516✔
1488
      return TSDB_CODE_SUCCESS;
106,417,516✔
1489
    }
1490
  }
1491
  return TSDB_CODE_SUCCESS;
637,136,273✔
1492
}
1493

1494
/*
1495
 * Commit pending rows to the current (old) window: add
1496
 * numNullRows to numOfRows, copy pendingKeys -> stateKeys.
1497
 */
1498
static void commitPendingToOldWindow(
106,416,720✔
1499
    SStateWindowOperatorInfo* pInfo, SWindowRowsSup* pRowSup) {
1500
  /*
1501
   * Partial-NULL rows must belong to a window (cannot be dropped).
1502
   * Add them to numOfRows now.  Remaining all-NULL rows in
1503
   * numNullRows are left for doKeepCurStateWindowEndInfo/EXTEND.
1504
   */
1505
  if (pRowSup->numDeferredPartialNull > 0) {
106,416,720✔
1506
    pRowSup->win.ekey = pRowSup->lastDeferredPartialNullTs;
106,350,300✔
1507
  }
1508
  pRowSup->numOfRows += pRowSup->numDeferredPartialNull;
106,416,720✔
1509
  if (pRowSup->numNullRows < pRowSup->numDeferredPartialNull) {
106,416,720✔
1510
    qError("%s:%d numNullRows(%u) < numDeferredPartialNull(%u), clamping",
×
1511
           __func__, __LINE__, pRowSup->numNullRows,
1512
           pRowSup->numDeferredPartialNull);
1513
    pRowSup->numNullRows = 0;
×
1514
  } else {
1515
    pRowSup->numNullRows -= pRowSup->numDeferredPartialNull;
106,416,720✔
1516
  }
1517
  pRowSup->numDeferredPartialNull = 0;
106,416,720✔
1518
  pRowSup->firstDeferredPartialRowIndex = -1;
106,416,720✔
1519
  pRowSup->lastDeferredPartialNullTs = INT64_MIN;
106,416,720✔
1520

1521
  /* Only sync columns touched by deferred partial-NULL rows */
1522
  int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
106,416,720✔
1523
  for (int32_t i = 0; i < keyNum; ++i) {
319,926,537✔
1524
    if (!pInfo->pendingColTouched[i]) continue;
213,509,817✔
1525
    SStateKeys* pSrc = taosArrayGet(pInfo->pendingKeys, i);
110,045,097✔
1526
    SStateKeys* pDst = taosArrayGet(pInfo->stateKeys, i);
110,045,097✔
1527
    if (pSrc == NULL || pDst == NULL) continue;
110,045,097✔
1528
    pDst->isNull = pSrc->isNull;
110,045,097✔
1529
    if (!pSrc->isNull) {
110,045,097✔
1530
      memcpy(pDst->pData, pSrc->pData, pSrc->bytes);
110,045,097✔
1531
    }
1532
  }
1533
  resetPendingState(pInfo);
106,416,720✔
1534
}
106,416,720✔
1535

1536
static bool shouldSplitDeferredPartialStandalone(const SStateWindowOperatorInfo* pInfo,
2,147,483,647✔
1537
             const SWindowRowsSup* pRowSup,
1538
             bool dualSide,
1539
             EStateWinExtendOption extendOption) {
1540
  return (!dualSide && extendOption == STATE_WIN_EXTEND_OPTION_FORWARD &&
106,417,516✔
1541
          pRowSup->numDeferredPartialNull > 0 &&
1,194✔
1542
    pRowSup->firstDeferredPartialRowIndex >= 0 &&
2,147,483,647✔
1543
    stateWindowKeysAllDefined(pInfo));
1,194✔
1544
}
1545

1546
static int32_t processStandaloneDeferredPartialWindow(
796✔
1547
    SStateWindowOperatorInfo* pInfo, SWindowRowsSup* pRowSup, SSDataBlock* pBlock,
1548
    SExecTaskInfo* pTaskInfo, SExprSupp* pExprSup, int32_t numOfOutput) {
1549
  if (pRowSup->numDeferredPartialNull == 0 ||
796✔
1550
      pRowSup->firstDeferredPartialRowIndex < 0) {
796✔
1551
    return TSDB_CODE_SUCCESS;
×
1552
  }
1553

1554
  SWindowRowsSup partialWin = {0};
796✔
1555
  partialWin.groupId = pRowSup->groupId;
796✔
1556
  partialWin.startRowIndex = pRowSup->firstDeferredPartialRowIndex;
796✔
1557
  partialWin.numOfRows = (int32_t)pRowSup->numDeferredPartialNull;
796✔
1558
  partialWin.win.skey = pRowSup->win.ekey + 1;
796✔
1559
  partialWin.win.ekey = pRowSup->lastDeferredPartialNullTs;
796✔
1560

1561
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
796✔
1562
  if (pTsCol == NULL || pTsCol->pData == NULL) {
796✔
1563
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1564
  }
1565

1566
  int32_t code = processClosedStateWindow(
796✔
1567
      pInfo, &partialWin, pBlock, pTaskInfo, pExprSup, numOfOutput, true);
1568
  if (code != TSDB_CODE_SUCCESS) {
796✔
1569
    return code;
×
1570
  }
1571

1572
  /*
1573
   * Keep window continuity for EXTEND(2): after emitting a standalone
1574
   * deferred partial-NULL window, the next window start is derived from
1575
   * previous ekey + 1. Update ekey here so the next window starts right
1576
   * after the standalone window instead of after the old window.
1577
   */
1578
  pRowSup->win.ekey = partialWin.win.ekey;
796✔
1579

1580
  if (pRowSup->numNullRows < pRowSup->numDeferredPartialNull) {
796✔
1581
    qError("%s:%d numNullRows(%u) < numDeferredPartialNull(%u), clamping",
×
1582
           __func__, __LINE__, pRowSup->numNullRows,
1583
           pRowSup->numDeferredPartialNull);
1584
    pRowSup->numNullRows = 0;
×
1585
  } else {
1586
    pRowSup->numNullRows -= pRowSup->numDeferredPartialNull;
796✔
1587
  }
1588
  pRowSup->numDeferredPartialNull = 0;
796✔
1589
  pRowSup->firstDeferredPartialRowIndex = -1;
796✔
1590
  pRowSup->lastDeferredPartialNullTs = INT64_MIN;
796✔
1591
  resetPendingState(pInfo);
796✔
1592
  return TSDB_CODE_SUCCESS;
796✔
1593
}
1594

1595
/*
1596
 * Two-phase comparison against pendingKeys.
1597
 *
1598
 * Phase 1 (read-only):
1599
 *   - ri is NULL          -> skip
1600
 *   - ri non-NULL, pi undefined -> not a change
1601
 *   - ri non-NULL, pi defined   -> compare; ri != pi => changed
1602
 *
1603
 * Phase 2 (commit):
1604
 *   - only when no column changed, initialize undefined pi with ri.
1605
 *
1606
 * This ensures "undefined -> defined" does not force a cut-window by itself.
1607
 */
1608
static int32_t compareStateWindowKeys(
2,147,483,647✔
1609
    SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock,
1610
    int32_t rowIndex, bool* pEqual) {
1611
  int32_t keyNum = taosArrayGetSize(pInfo->stateCols);
2,147,483,647✔
1612
  *pEqual = true;
2,147,483,647✔
1613
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1614
    SColumn* pStateCol = taosArrayGet(pInfo->stateCols, i);
2,147,483,647✔
1615
    SStateKeys* pKey = taosArrayGet(pInfo->pendingKeys, i);
2,147,483,647✔
1616
    SColumnInfoData* pColData =
1617
        taosArrayGet(pBlock->pDataBlock, pStateCol->slotId);
2,147,483,647✔
1618
    if (pColData == NULL || pColData->pData == NULL
2,147,483,647✔
1619
        || pKey == NULL || pKey->pData == NULL) {
2,147,483,647✔
1620
      qError("%s invalid state key at slotId:%d",
453✔
1621
             __func__, pStateCol->slotId);
1622
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1623
    }
1624

1625
    struct SColumnDataAgg* pAgg = getBlockAggForSlot(pBlock, pStateCol->slotId);
2,147,483,647✔
1626

1627
    if (colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg)) {
2,147,483,647✔
1628
      continue;
×
1629
    }
1630

1631
    if (pKey->isNull) {
2,147,483,647✔
1632
      continue;
400,456,436✔
1633
    }
1634

1635
    if (!compareVal(colDataGetData(pColData, rowIndex), pKey)) {
2,147,483,647✔
1636
      *pEqual = false;
2,147,483,647✔
1637
      return TSDB_CODE_SUCCESS;
2,147,483,647✔
1638
    }
1639
  }
1640

1641
  /* phase 2: no change, initialize undefined columns */
1642
  for (int32_t i = 0; i < keyNum; ++i) {
2,147,483,647✔
1643
    SColumn* pStateCol = taosArrayGet(pInfo->stateCols, i);
2,147,483,647✔
1644
    SStateKeys* pKey = taosArrayGet(pInfo->pendingKeys, i);
2,147,483,647✔
1645
    SColumnInfoData* pColData =
1646
        taosArrayGet(pBlock->pDataBlock, pStateCol->slotId);
2,147,483,647✔
1647
    if (pColData == NULL || pColData->pData == NULL
2,147,483,647✔
1648
        || pKey == NULL || pKey->pData == NULL) {
2,147,483,647✔
1649
      qError("%s invalid state key at slotId:%d",
×
1650
             __func__, pStateCol->slotId);
1651
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1652
    }
1653
    if (!pKey->isNull) {
2,147,483,647✔
1654
      continue;
2,147,483,647✔
1655
    }
1656

1657
    struct SColumnDataAgg* pAgg = getBlockAggForSlot(pBlock, pStateCol->slotId);
1,579,511✔
1658
    if (colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg)) {
3,159,906✔
1659
      continue;
×
1660
    }
1661

1662
    assignVal(pKey->pData, colDataGetData(pColData, rowIndex),
1,579,953✔
1663
              pColData->info.bytes, pKey->type);
1,579,953✔
1664
    pKey->isNull = false;
1,579,953✔
1665
  }
1666
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
1667
}
1668

1669
// process a data block for state window aggregation
1670
// scan from startIndex to endIndex
1671
// numPartialCalcRows returns the number of rows that have been
1672
// partially calculated within the block
1673
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
37,178,322✔
1674
                                 SStateWindowOperatorInfo* pInfo,
1675
                                 SSDataBlock* pBlock, int32_t* startIndex,
1676
                                 int32_t* endIndex,
1677
                                 int32_t* numPartialCalcRows) {
1678
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
37,178,322✔
1679
  SExprSupp*     pExprSup = &pOperator->exprSupp;
37,178,322✔
1680
  uint64_t gid = pBlock->info.id.groupId;
37,178,322✔
1681
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
37,178,764✔
1682

1683
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
37,178,322✔
1684
                                               pInfo->tsSlotId);
37,178,322✔
1685
  if (NULL == pColInfoData) {
37,178,322✔
1686
    pTaskInfo->code = terrno;
×
1687
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1688
  }
1689
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
37,178,322✔
1690

1691
  EStateWinExtendOption  extendOption = pInfo->extendOption;
37,178,322✔
1692
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
37,178,764✔
1693

1694
  if (pRowSup->groupId != gid) {
37,177,869✔
1695
    /*
1696
      group changed, process the previous group's unclosed state window first
1697
    */
1698
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
13,608,273✔
1699
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pBlock, pTaskInfo,
13,608,726✔
1700
                                            pExprSup, numOfOutput, true);
1701
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
13,608,273✔
1702
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
13,608,273✔
1703

1704
    /*
1705
      unhandled null rows should be ignored, since they belong to previous group
1706
    */
1707
    *numPartialCalcRows += pRowSup->numNullRows;
13,609,168✔
1708

1709
    /*
1710
      reset state window info for new group
1711
    */
1712
    pInfo->hasKey = false;
13,608,273✔
1713
    resetStateKeysUndefined(pInfo);
13,608,726✔
1714
    resetPendingState(pInfo);
13,608,726✔
1715
    resetWindowRowsSup(pRowSup);
13,608,726✔
1716
  }
1717

1718
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
2,147,483,647✔
1719
    if (pBlock->info.scanFlag != PRE_SCAN) {
2,147,483,647✔
1720
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
2,147,483,647✔
1721
        pInfo->winSup.lastTs = tsList[j];
453,269,230✔
1722
      } else {
1723
        if (tsList[j] == pInfo->winSup.lastTs) {
2,147,483,647✔
1724
          // forbid duplicated ts rows
1725
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
26,803✔
1726
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
26,803✔
1727
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
26,803✔
1728
        } else {
1729
          pInfo->winSup.lastTs = tsList[j];
2,147,483,647✔
1730
        }
1731
      }
1732
    }
1733
    bool    allNull = false;
2,147,483,647✔
1734
    bool    hasNull = false;
2,147,483,647✔
1735
    int32_t code = stateWindowRowNullCheck(pInfo, pBlock, j, &allNull, &hasNull);
2,147,483,647✔
1736
    if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1737
      pTaskInfo->code = code;
×
1738
      T_LONG_JMP(pTaskInfo->env, code);
×
1739
    }
1740

1741
    if (allNull) {
2,147,483,647✔
1742
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
1,209,675,498✔
1743
      if (pRowSup->numDeferredPartialNull > 0) {
1,209,675,498✔
1744
        pRowSup->numDeferredTailAllNull++;
28,232,277✔
1745
      }
1746
      continue;
1,209,675,498✔
1747
    }
1748

1749
    if (!pInfo->hasKey) {
2,147,483,647✔
1750
      code = assignStateWindowKeys(pInfo, pBlock, j);
6,296,982✔
1751
      if (TSDB_CODE_SUCCESS != code) {
6,296,993✔
1752
        pTaskInfo->code = code;
×
1753
        T_LONG_JMP(pTaskInfo->env, code);
×
1754
      }
1755
      syncPendingKeysFromState(pInfo);
6,296,993✔
1756
      pInfo->hasKey = true;
6,296,993✔
1757
      doKeepNewStateWindowStartInfo(
6,296,993✔
1758
        pRowSup, tsList, j, gid, &extendOption, false);
1759
      doKeepTuple(pRowSup, tsList[j], j, gid);
6,296,993✔
1760
    } else if (hasNull) {
2,147,483,647✔
1761
      /*
1762
       * Partial NULL row: check compatibility against pendingKeys
1763
       * (read-only).  If compatible, defer like an all-NULL row
1764
       * so that EXTEND decides its final window assignment.
1765
       */
1766
      bool keysEqual = false;
2,147,483,647✔
1767
      code = checkPendingKeysCompatible(pInfo, pBlock, j, &keysEqual);
2,147,483,647✔
1768
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1769
        pTaskInfo->code = code;
×
1770
        T_LONG_JMP(pTaskInfo->env, code);
×
1771
      }
1772
      if (keysEqual) {
2,147,483,647✔
1773
        absorbDeferredTailAllNull(pRowSup);
1774
        code = updatePendingKeysFromRow(pInfo, pBlock, j);
850,841,123✔
1775
        if (TSDB_CODE_SUCCESS != code) {
850,841,123✔
1776
          pTaskInfo->code = code;
×
1777
          T_LONG_JMP(pTaskInfo->env, code);
×
1778
        }
1779
        doKeepStateWindowNullInfo(pRowSup, tsList[j]);
850,841,123✔
1780
        if (pRowSup->numDeferredPartialNull == 0) {
850,841,123✔
1781
          pRowSup->firstDeferredPartialRowIndex = j;
745,357,297✔
1782
        }
1783
        pRowSup->numDeferredPartialNull++;
850,841,123✔
1784
        pRowSup->lastDeferredPartialNullTs = tsList[j];
850,841,123✔
1785
        continue;
850,841,123✔
1786
      }
1787
      /* not compatible → resolve pending, then cut */
1788
      bool dualSide = false;
2,024,961,767✔
1789
      code = checkPendingDualSideCompatible(pInfo, pBlock, j, &dualSide);
2,024,961,767✔
1790
      if (TSDB_CODE_SUCCESS != code) {
2,024,961,767✔
1791
        pTaskInfo->code = code;
×
1792
        T_LONG_JMP(pTaskInfo->env, code);
×
1793
      }
1794
      bool splitStandalone =
1795
          shouldSplitDeferredPartialStandalone(pInfo, pRowSup, dualSide, extendOption);
2,024,961,767✔
1796
      if (!dualSide && !splitStandalone) {
2,024,961,767✔
1797
        commitPendingToOldWindow(pInfo, pRowSup);
103,030,997✔
1798
      }
1799
      /* numNullRows left for EXTEND when dualSide==true */
1800
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
2,024,961,767✔
1801
      code = processClosedStateWindow(pInfo, pRowSup, pBlock, pTaskInfo,
2,024,961,767✔
1802
                                              pExprSup, numOfOutput, true);
1803
      if (TSDB_CODE_SUCCESS != code) {
2,024,961,767✔
1804
        T_LONG_JMP(pTaskInfo->env, code);
×
1805
      }
1806
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
2,024,961,767✔
1807
      if (splitStandalone) {
2,024,961,767✔
1808
        code = processStandaloneDeferredPartialWindow(
×
1809
            pInfo, pRowSup, pBlock, pTaskInfo, pExprSup, numOfOutput);
1810
        if (TSDB_CODE_SUCCESS != code) {
×
1811
          T_LONG_JMP(pTaskInfo->env, code);
×
1812
        }
1813
      }
1814

1815
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
2,024,961,767✔
1816
                                    &extendOption, true);
1817
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,024,961,767✔
1818
      resetStateKeysUndefined(pInfo);
2,024,961,767✔
1819
      code = assignStateWindowKeys(pInfo, pBlock, j);
2,024,961,767✔
1820
      if (TSDB_CODE_SUCCESS != code) {
2,024,961,767✔
1821
        pTaskInfo->code = code;
×
1822
        T_LONG_JMP(pTaskInfo->env, code);
×
1823
      }
1824
      syncPendingKeysFromState(pInfo);
2,024,961,767✔
1825
    } else {
1826
      /* all non-NULL row */
1827
      bool keysEqual = false;
2,147,483,647✔
1828
      code = compareStateWindowKeys(pInfo, pBlock, j, &keysEqual);
2,147,483,647✔
1829
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1830
        pTaskInfo->code = code;
×
1831
        T_LONG_JMP(pTaskInfo->env, code);
×
1832
      }
1833
      if (keysEqual) {
2,147,483,647✔
1834
        /*
1835
         * Flush pending: sync committed stateKeys from pendingKeys
1836
         * (deferred rows may have initialized undefined columns),
1837
         * then doKeepTuple absorbs numNullRows (all-NULL + partial-NULL).
1838
         */
1839
        syncStateKeysFromPending(pInfo);
2,147,483,647✔
1840
        doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1841
        pRowSup->numDeferredPartialNull = 0;
2,147,483,647✔
1842
        pRowSup->firstDeferredPartialRowIndex = -1;
2,147,483,647✔
1843
        resetPendingState(pInfo);
2,147,483,647✔
1844
        continue;
2,147,483,647✔
1845
      }
1846
      /* not compatible → resolve pending, then cut */
1847
      bool dualSide = false;
2,147,483,647✔
1848
      code = checkPendingDualSideCompatible(pInfo, pBlock, j, &dualSide);
2,147,483,647✔
1849
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1850
        pTaskInfo->code = code;
×
1851
        T_LONG_JMP(pTaskInfo->env, code);
×
1852
      }
1853
      bool splitStandalone =
1854
          shouldSplitDeferredPartialStandalone(pInfo, pRowSup, dualSide, extendOption);
2,147,483,647✔
1855
      if (!dualSide && !splitStandalone) {
2,147,483,647✔
1856
        commitPendingToOldWindow(pInfo, pRowSup);
3,385,723✔
1857
      }
1858
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
2,147,483,647✔
1859
      code = processClosedStateWindow(pInfo, pRowSup, pBlock, pTaskInfo,
2,147,483,647✔
1860
                                              pExprSup, numOfOutput, true);
1861
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1862
        T_LONG_JMP(pTaskInfo->env, code);
×
1863
      }
1864
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
2,147,483,647✔
1865
      if (splitStandalone) {
2,147,483,647✔
1866
        code = processStandaloneDeferredPartialWindow(
796✔
1867
            pInfo, pRowSup, pBlock, pTaskInfo, pExprSup, numOfOutput);
1868
        if (TSDB_CODE_SUCCESS != code) {
796✔
1869
          T_LONG_JMP(pTaskInfo->env, code);
×
1870
        }
1871
      }
1872

1873
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
2,147,483,647✔
1874
                                    &extendOption, true);
1875
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1876
      resetStateKeysUndefined(pInfo);
2,147,483,647✔
1877
      code = assignStateWindowKeys(pInfo, pBlock, j);
2,147,483,647✔
1878
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1879
        pTaskInfo->code = code;
×
1880
        T_LONG_JMP(pTaskInfo->env, code);
×
1881
      }
1882
      syncPendingKeysFromState(pInfo);
2,147,483,647✔
1883
    }
1884
  }
1885

1886
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
37,152,403✔
1887
    /*
1888
      No valid state rows within the block and we don't care about
1889
      null rows before valid state window, mark them as processed and drop them
1890
    */
1891
    *numPartialCalcRows = pBlock->info.rows;
5,901,399✔
1892
    resetNumNullRows(pRowSup);
5,901,399✔
1893
    return;
5,901,399✔
1894
  }
1895
  if (pRowSup->numOfRows == 0 &&
31,250,562✔
1896
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
5,033,205✔
1897
    /*
1898
      If no valid state window or we don't know the belonging of
1899
      null rows in the end of the block, handle them with next block
1900
    */
1901
    return;
4,207,554✔
1902
  }
1903
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
27,042,566✔
1904
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pBlock, pTaskInfo,
27,043,008✔
1905
                                          pExprSup, numOfOutput, false);
1906
  if (TSDB_CODE_SUCCESS != code) {
27,042,566✔
1907
    pTaskInfo->code = code;
×
1908
    T_LONG_JMP(pTaskInfo->env, code);
×
1909
  }
1910
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
27,042,566✔
1911
  // reset part of pRowSup after doing agg calculation
1912
  pRowSup->startRowIndex = 0;
27,043,008✔
1913
  pRowSup->numOfRows = 0;
27,042,566✔
1914
}
1915

1916
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
4,534,684✔
1917
  if (OPTR_IS_OPENED(pOperator)) {
4,534,684✔
1918
    return TSDB_CODE_SUCCESS;
3,248,799✔
1919
  }
1920

1921
  int32_t                   code = TSDB_CODE_SUCCESS;
1,285,885✔
1922
  int32_t                   lino = 0;
1,285,885✔
1923
  SStateWindowOperatorInfo* pInfo = pOperator->info;
1,285,885✔
1924
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1,285,885✔
1925

1926
  SExprSupp* pSup = &pOperator->exprSupp;
1,285,885✔
1927
  int32_t    order = pInfo->binfo.inputTsOrder;
1,285,885✔
1928

1929
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,285,885✔
1930
  pInfo->cleanGroupResInfo = false;
1,285,885✔
1931

1932
  SSDataBlock* pUnfinishedBlock = NULL;
1,285,885✔
1933
  int32_t      startIndex = 0;
1,285,885✔
1934
  int32_t      endIndex = 0;
1,285,885✔
1935
  int32_t      numPartialCalcRows = 0;
1,285,885✔
1936
  while (1) {
37,151,961✔
1937
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
38,437,846✔
1938
    if (pBlock == NULL) {
38,437,846✔
1939
      if (pUnfinishedBlock != NULL) {
1,243,395✔
1940
        blockDataDestroy(pUnfinishedBlock);
54,915✔
1941
        pUnfinishedBlock = NULL;
54,915✔
1942
        resetWindowRowsSup(&pInfo->winSup);
54,915✔
1943
      }
1944
      break;
1,243,395✔
1945
    }
1946
    
1947
    // mark whether pUnfinishedBlock is a reference to pBlock
1948
    bool isRef = false;
37,194,451✔
1949
    startIndex = 0;
37,194,451✔
1950
    if (pUnfinishedBlock != NULL) {
37,194,451✔
1951
      startIndex = pUnfinishedBlock->info.rows;
7,928,691✔
1952
      // merge unfinished block with current block
1953
      code = blockDataMerge(pUnfinishedBlock, pBlock);
7,928,691✔
1954
      // reset id to current block id
1955
      pUnfinishedBlock->info.id = pBlock->info.id;
7,928,691✔
1956
      QUERY_CHECK_CODE(code, lino, _end);
7,928,691✔
1957
    } else {
1958
      pUnfinishedBlock = pBlock;
29,265,760✔
1959
      isRef = true;
29,265,760✔
1960
    }
1961
    endIndex = pUnfinishedBlock->info.rows;
37,194,451✔
1962

1963
    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
37,194,451✔
1964
    pInfo->binfo.pRes->info.dataLoad = 1;
37,194,451✔
1965
    code = setInputDataBlock(
37,194,451✔
1966
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
37,194,451✔
1967
    QUERY_CHECK_CODE(code, lino, _end);
37,193,998✔
1968

1969
    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
37,193,998✔
1970
    QUERY_CHECK_CODE(code, lino, _end);
37,194,451✔
1971

1972
    // there is an scalar expression that 
1973
    // needs to be calculated right before apply the group aggregation.
1974
    if (pInfo->scalarSup.pExprInfo != NULL) {
37,194,451✔
1975
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
706,658✔
1976
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
1977
        pInfo->scalarSup.numOfExprs, NULL,
1978
        GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
353,329✔
1979
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
353,329✔
1980
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
15,687✔
1981
      }
1982
    }
1983

1984
    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, 
37,178,764✔
1985
      &startIndex, &endIndex, &numPartialCalcRows);
1986
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
37,151,519✔
1987
      // save unfinished block for next round processing
1988
      if (isRef) {
7,983,606✔
1989
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
3,147,459✔
1990
        QUERY_CHECK_CODE(code, lino, _end);
3,147,459✔
1991
      }
1992
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
7,983,606✔
1993
      QUERY_CHECK_NULL(pUnfinishedBlock, code, lino, _end, terrno);
7,983,606✔
1994
    } else {
1995
      if (!isRef) {
29,167,913✔
1996
        blockDataDestroy(pUnfinishedBlock);
3,092,544✔
1997
      }
1998
      pUnfinishedBlock = NULL;
29,167,913✔
1999
    }
2000
    numPartialCalcRows = 0;
37,151,961✔
2001
  }
2002

2003
  if (pInfo->indefRowsMode) {
1,243,395✔
2004
    code = closeAllIndefRowsWindowStates(pOperator, &pInfo->indefRows);
10,517✔
2005
    QUERY_CHECK_CODE(code, lino, _end);
10,517✔
2006
  } else {
2007
    code = initGroupedResultInfo(
1,232,878✔
2008
      &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
2009
    QUERY_CHECK_CODE(code, lino, _end);
1,232,878✔
2010
    pInfo->cleanGroupResInfo = true;
1,232,878✔
2011
  }
2012
  pOperator->status = OP_RES_TO_RETURN;
1,243,395✔
2013

2014
_end:
1,243,395✔
2015
  if (code != TSDB_CODE_SUCCESS) {
1,243,395✔
2016
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2017
    pTaskInfo->code = code;
×
2018
    T_LONG_JMP(pTaskInfo->env, code);
×
2019
  }
2020
  return code;
1,243,395✔
2021
}
2022

2023
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
5,565,273✔
2024
  if (pOperator->status == OP_EXEC_DONE) {
5,565,273✔
2025
    (*ppRes) = NULL;
1,030,589✔
2026
    return TSDB_CODE_SUCCESS;
1,030,589✔
2027
  }
2028

2029
  int32_t                   code = TSDB_CODE_SUCCESS;
4,534,684✔
2030
  int32_t                   lino = 0;
4,534,684✔
2031
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
4,534,684✔
2032
  SStateWindowOperatorInfo* pInfo = pOperator->info;
4,534,684✔
2033
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
4,534,684✔
2034

2035
  code = pOperator->fpSet._openFn(pOperator);
4,534,684✔
2036
  QUERY_CHECK_CODE(code, lino, _end);
4,492,194✔
2037

2038
  if (pInfo->indefRowsMode) {
4,492,194✔
2039
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
73,304✔
2040
    if ((*ppRes) == NULL) {
73,304✔
2041
      setOperatorCompleted(pOperator);
10,517✔
2042
    }
2043
    return code;
73,304✔
2044
  }
2045

2046
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
4,418,890✔
2047
  QUERY_CHECK_CODE(code, lino, _end);
4,418,890✔
2048

2049
  while (1) {
×
2050
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
4,418,890✔
2051
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
4,418,890✔
2052
    QUERY_CHECK_CODE(code, lino, _end);
4,418,890✔
2053

2054
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
4,418,890✔
2055
    if (!hasRemain) {
4,418,890✔
2056
      setOperatorCompleted(pOperator);
1,139,152✔
2057
      break;
1,139,152✔
2058
    }
2059

2060
    if (pBInfo->pRes->info.rows > 0) {
3,279,738✔
2061
      break;
3,279,738✔
2062
    }
2063
  }
2064

2065
_end:
4,418,890✔
2066
  if (code != TSDB_CODE_SUCCESS) {
4,418,890✔
2067
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2068
    pTaskInfo->code = code;
×
2069
    T_LONG_JMP(pTaskInfo->env, code);
×
2070
  }
2071
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
4,418,890✔
2072
  return code;
4,418,890✔
2073
}
2074

2075
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
32,203,833✔
2076
  int32_t                   code = TSDB_CODE_SUCCESS;
32,203,833✔
2077
  int32_t                   lino = 0;
32,203,833✔
2078
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
32,203,833✔
2079
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
32,206,733✔
2080

2081
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
32,203,833✔
2082
    (*ppRes) = NULL;
3,779,366✔
2083
    return code;
3,779,366✔
2084
  }
2085

2086
  if (pOperator->pOperatorGetParam) {
28,427,948✔
2087
    if (pOperator->status == OP_EXEC_DONE && pOperator->fpSet.resetStateFn) {
69,892✔
2088
      code = pOperator->fpSet.resetStateFn(pOperator);
×
2089
      QUERY_CHECK_CODE(code, lino, _end);
×
2090
    }
2091
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
69,892✔
2092
    pOperator->pOperatorGetParam = NULL;
69,892✔
2093
  }
2094

2095
  SSDataBlock* pBlock = pInfo->binfo.pRes;
28,425,053✔
2096
  code = pOperator->fpSet._openFn(pOperator);
28,425,043✔
2097
  QUERY_CHECK_CODE(code, lino, _end);
28,382,140✔
2098

2099
  if (pInfo->indefRowsMode) {
28,382,140✔
2100
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
1,146,654✔
2101
    if ((*ppRes) == NULL) {
1,146,654✔
2102
      setOperatorCompleted(pOperator);
74,088✔
2103
    }
2104
    return code;
1,146,654✔
2105
  }
2106

2107
  while (1) {
×
2108
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
27,235,881✔
2109
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
27,235,438✔
2110
    QUERY_CHECK_CODE(code, lino, _end);
27,235,137✔
2111

2112
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
27,235,137✔
2113
    if (!hasRemain) {
27,235,576✔
2114
      setOperatorCompleted(pOperator);
5,021,239✔
2115
      break;
5,020,662✔
2116
    }
2117

2118
    if (pBlock->info.rows > 0) {
22,214,337✔
2119
      break;
22,214,337✔
2120
    }
2121
  }
2122

2123
_end:
27,234,999✔
2124
  if (code != TSDB_CODE_SUCCESS) {
27,234,999✔
2125
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2126
    pTaskInfo->code = code;
×
2127
    T_LONG_JMP(pTaskInfo->env, code);
×
2128
  }
2129
  (*ppRes) = (pBlock->info.rows == 0) ? NULL : pBlock;
27,234,999✔
2130
  return code;
27,235,576✔
2131
}
2132

2133
static void destroyStateWindowOperatorInfo(void* param) {
1,216,408✔
2134
  if (param == NULL) {
1,216,408✔
2135
    return;
×
2136
  }
2137
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1,216,408✔
2138
  cleanupBasicInfo(&pInfo->binfo);
1,216,408✔
2139
  cleanupIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
1,216,408✔
2140
  if (pInfo->stateKeys != NULL) {
1,216,408✔
2141
    int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
1,216,408✔
2142
    for (int32_t i = 0; i < keyNum; ++i) {
2,623,537✔
2143
      SStateKeys* pKey = taosArrayGet(pInfo->stateKeys, i);
1,407,129✔
2144
      taosMemoryFreeClear(pKey->pData);
1,407,129✔
2145
    }
2146
  }
2147
  taosArrayDestroy(pInfo->stateKeys);
1,216,408✔
2148
  if (pInfo->pendingKeys != NULL) {
1,216,408✔
2149
    int32_t keyNum = taosArrayGetSize(pInfo->pendingKeys);
1,216,408✔
2150
    for (int32_t i = 0; i < keyNum; ++i) {
2,623,537✔
2151
      SStateKeys* pKey = taosArrayGet(pInfo->pendingKeys, i);
1,407,129✔
2152
      taosMemoryFreeClear(pKey->pData);
1,407,129✔
2153
    }
2154
  }
2155
  taosArrayDestroy(pInfo->pendingKeys);
1,216,408✔
2156
  taosMemoryFreeClear(pInfo->pendingColTouched);
1,216,408✔
2157
  taosArrayDestroy(pInfo->stateCols);
1,216,408✔
2158
  if (pInfo->pOperator) {
1,216,408✔
2159
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,216,408✔
2160
                      pInfo->cleanGroupResInfo);
1,216,408✔
2161
    pInfo->pOperator = NULL;
1,216,408✔
2162
  }
2163

2164
  cleanupExprSupp(&pInfo->scalarSup);
1,216,408✔
2165
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,216,408✔
2166
  cleanupAggSup(&pInfo->aggSup);
1,216,408✔
2167
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,216,408✔
2168

2169
  taosMemoryFreeClear(param);
1,216,408✔
2170
}
2171

2172
static void freeItem(void* param) {
50,930✔
2173
  SGroupKeys* pKey = (SGroupKeys*)param;
50,930✔
2174
  taosMemoryFree(pKey->pData);
50,930✔
2175
}
50,930✔
2176

2177
void destroyIntervalOperatorInfo(void* param) {
6,118,294✔
2178
  if (param == NULL) {
6,118,294✔
2179
    return;
×
2180
  }
2181

2182
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
6,118,294✔
2183

2184
  cleanupBasicInfo(&pInfo->binfo);
6,118,294✔
2185
  cleanupIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
6,118,856✔
2186
  if (pInfo->pOperator) {
6,117,779✔
2187
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
6,079,103✔
2188
                      pInfo->cleanGroupResInfo);
6,078,852✔
2189
    pInfo->pOperator = NULL;
6,077,929✔
2190
  }
2191

2192
  cleanupAggSup(&pInfo->aggSup);
6,116,863✔
2193
  cleanupExprSupp(&pInfo->scalarSupp);
6,119,163✔
2194

2195
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
6,120,468✔
2196

2197
  taosArrayDestroy(pInfo->pInterpCols);
6,119,598✔
2198
  pInfo->pInterpCols = NULL;
6,117,921✔
2199

2200
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
6,118,298✔
2201
  pInfo->pPrevValues = NULL;
6,119,017✔
2202

2203
  cleanupGroupResInfo(&pInfo->groupResInfo);
6,119,598✔
2204
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
6,118,160✔
2205
  destroyBoundedQueue(pInfo->pBQ);
6,118,444✔
2206
  taosMemoryFreeClear(param);
6,114,877✔
2207
}
2208

2209
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
25,465✔
2210
  int32_t code = TSDB_CODE_SUCCESS;
25,465✔
2211
  int32_t lino = 0;
25,465✔
2212
  void*   tmp = NULL;
25,465✔
2213

2214
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
25,465✔
2215
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
25,465✔
2216

2217
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
25,465✔
2218
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
25,465✔
2219

2220
  {  // ts column
2221
    SColumn c = {0};
25,465✔
2222
    c.colId = 1;
25,465✔
2223
    c.slotId = pInfo->primaryTsIndex;
25,465✔
2224
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
25,465✔
2225
    c.bytes = sizeof(int64_t);
25,465✔
2226
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
25,465✔
2227
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
25,465✔
2228

2229
    SGroupKeys key;
25,465✔
2230
    key.bytes = c.bytes;
25,465✔
2231
    key.type = c.type;
25,465✔
2232
    key.isNull = true;  // to denote no value is assigned yet
25,465✔
2233
    key.pData = taosMemoryCalloc(1, c.bytes);
25,465✔
2234
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
25,465✔
2235

2236
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
25,465✔
2237
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
25,465✔
2238
  }
2239
_end:
25,465✔
2240
  if (code != TSDB_CODE_SUCCESS) {
25,465✔
2241
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2242
  }
2243
  return code;
25,465✔
2244
}
2245

2246
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
6,075,926✔
2247
                                      bool* pRes) {
2248
  // the primary timestamp column
2249
  bool    needed = false;
6,075,926✔
2250
  int32_t code = TSDB_CODE_SUCCESS;
6,075,926✔
2251
  int32_t lino = 0;
6,075,926✔
2252
  void*   tmp = NULL;
6,075,926✔
2253

2254
  for (int32_t i = 0; i < numOfCols; ++i) {
28,640,770✔
2255
    SExprInfo* pExpr = pCtx[i].pExpr;
22,604,803✔
2256
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
22,601,598✔
2257
      needed = true;
25,465✔
2258
      break;
25,465✔
2259
    }
2260
  }
2261

2262
  if (needed) {
6,061,432✔
2263
    code = initWindowInterpPrevVal(pInfo);
25,465✔
2264
    QUERY_CHECK_CODE(code, lino, _end);
25,465✔
2265
  }
2266

2267
  for (int32_t i = 0; i < numOfCols; ++i) {
28,648,496✔
2268
    SExprInfo* pExpr = pCtx[i].pExpr;
22,600,161✔
2269

2270
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
22,598,762✔
2271
      SFunctParam* pParam = &pExpr->base.pParam[0];
25,465✔
2272

2273
      SColumn c = *pParam->pCol;
25,465✔
2274
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
25,465✔
2275
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
25,465✔
2276

2277
      SGroupKeys key = {0};
25,465✔
2278
      key.bytes = c.bytes;
25,465✔
2279
      key.type = c.type;
25,465✔
2280
      key.isNull = false;
25,465✔
2281
      key.pData = taosMemoryCalloc(1, c.bytes);
25,465✔
2282
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
25,465✔
2283

2284
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
25,465✔
2285
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
25,465✔
2286
    }
2287
  }
2288

2289
_end:
6,048,335✔
2290
  if (code != TSDB_CODE_SUCCESS) {
6,048,866✔
2291
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2292
  }
2293
  *pRes = needed;
6,048,866✔
2294
  return code;
6,063,906✔
2295
}
2296

2297
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
10,272✔
2298
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
10,272✔
2299
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
10,272✔
2300
  pOper->status = OP_NOT_OPENED;
10,272✔
2301

2302
  resetBasicOperatorState(&pIntervalInfo->binfo);
10,272✔
2303
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
10,272✔
2304
    pIntervalInfo->cleanGroupResInfo);
10,272✔
2305

2306
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
10,272✔
2307
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
10,272✔
2308
  if (code == 0) {
10,272✔
2309
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
20,544✔
2310
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
10,272✔
2311
                       &pTaskInfo->storageAPI.functionStore);
2312
  }
2313
  if (code == 0) {
10,272✔
2314
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
10,272✔
2315
                         &pTaskInfo->storageAPI.functionStore);
2316
  }
2317

2318
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
10,272✔
2319
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
2320
  }
2321

2322
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
10,272✔
2323
    pIntervalInfo->curGroupId = UINT64_MAX;
×
2324
  }
2325

2326
  pIntervalInfo->cleanGroupResInfo = false;
10,272✔
2327
  pIntervalInfo->handledGroupNum = 0;
10,272✔
2328
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
10,272✔
2329
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
10,272✔
2330

2331
  taosArrayDestroy(pIntervalInfo->pInterpCols);
10,272✔
2332
  pIntervalInfo->pInterpCols = NULL;
10,272✔
2333

2334
  if (pIntervalInfo->pPrevValues != NULL) {
10,272✔
2335
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
2336
    pIntervalInfo->pPrevValues = NULL;
×
2337
    code = initWindowInterpPrevVal(pIntervalInfo);
×
2338
  }
2339

2340
  resetIndefRowsRuntime(&pIntervalInfo->indefRows, pIntervalInfo->pOperator);
10,272✔
2341

2342
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
10,272✔
2343
  destroyBoundedQueue(pIntervalInfo->pBQ);
10,272✔
2344
  pIntervalInfo->pBQ = NULL;
10,272✔
2345
  return code;
10,272✔
2346
}
2347

2348
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
8,648✔
2349
  SIntervalAggOperatorInfo* pInfo = pOper->info;
8,648✔
2350
  return resetInterval(pOper, pInfo);
8,648✔
2351
}
2352

2353
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
5,249,081✔
2354
                                   SOperatorInfo** pOptrInfo) {
2355
  QRY_PARAM_CHECK(pOptrInfo);
5,249,081✔
2356

2357
  int32_t                   code = TSDB_CODE_SUCCESS;
5,252,330✔
2358
  int32_t                   lino = 0;
5,252,330✔
2359
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
5,252,330✔
2360
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5,241,350✔
2361
  if (pInfo == NULL || pOperator == NULL) {
5,244,308✔
2362
    code = terrno;
43✔
2363
    lino = __LINE__;
×
2364
    goto _error;
×
2365
  }
2366
  initOperatorCostInfo(pOperator);
5,244,265✔
2367

2368
  pOperator->pPhyNode = pPhyNode;
5,250,880✔
2369
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
5,251,461✔
2370
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
5,254,651✔
2371
  initBasicInfo(&pInfo->binfo, pResBlock);
5,254,651✔
2372

2373
  SExprSupp* pSup = &pOperator->exprSupp;
5,249,612✔
2374
  pSup->hasWindowOrGroup = true;
5,249,782✔
2375
  pSup->hasWindow = true;
5,246,537✔
2376

2377
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
5,253,247✔
2378

2379
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
5,252,915✔
2380
  initResultSizeInfo(&pOperator->resultInfo, 512);
5,252,915✔
2381
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5,248,463✔
2382
  QUERY_CHECK_CODE(code, lino, _error);
5,253,624✔
2383

2384
  int32_t    num = 0;
5,253,624✔
2385
  SExprInfo* pExprInfo = NULL;
5,252,343✔
2386
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
5,254,205✔
2387
  QUERY_CHECK_CODE(code, lino, _error);
5,252,294✔
2388

2389
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, NULL,
5,252,294✔
2390
                    &pTaskInfo->storageAPI.functionStore);
2391
  QUERY_CHECK_CODE(code, lino, _error);
5,251,705✔
2392

2393
  pInfo->indefRowsMode = pPhyNode->window.indefRowsFunc;
5,251,705✔
2394
  if (pInfo->indefRowsMode) {
5,250,752✔
2395
    code = initIndefRowsRuntime(&pInfo->indefRows, pOperator->exprSupp.pCtx, num, pOperator->resultInfo.capacity);
75,907✔
2396
    QUERY_CHECK_CODE(code, lino, _error);
75,907✔
2397
  }
2398

2399
  SInterval interval = {.interval = pPhyNode->interval,
15,734,083✔
2400
                        .sliding = pPhyNode->sliding,
5,250,078✔
2401
                        .intervalUnit = pPhyNode->intervalUnit,
5,251,547✔
2402
                        .slidingUnit = pPhyNode->slidingUnit,
5,251,547✔
2403
                        .offset = pPhyNode->offset,
5,242,706✔
2404
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
5,247,959✔
2405
                        .timeRange = pPhyNode->timeRange};
2406
  calcIntervalAutoOffset(&interval);
5,245,253✔
2407

2408
  STimeWindowAggSupp as = {
5,246,942✔
2409
      .maxTs = INT64_MIN,
2410
  };
2411

2412
  pInfo->win = pTaskInfo->window;
5,246,942✔
2413
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
5,247,325✔
2414
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
5,244,788✔
2415
  pInfo->interval = interval;
5,248,531✔
2416
  pInfo->twAggSup = as;
5,241,058✔
2417
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
5,241,623✔
2418
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
5,244,364✔
2419
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
1,177,933✔
2420
    pInfo->limited = true;
1,177,990✔
2421
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
1,177,988✔
2422
  }
2423
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
5,244,668✔
2424
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
114,443✔
2425
    pInfo->slimited = true;
114,288✔
2426
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
114,849✔
2427
    pInfo->curGroupId = UINT64_MAX;
114,849✔
2428
  }
2429

2430
  if (pPhyNode->window.pExprs != NULL) {
5,253,558✔
2431
    int32_t    numOfScalar = 0;
509,677✔
2432
    SExprInfo* pScalarExprInfo = NULL;
509,096✔
2433
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
507,511✔
2434
    QUERY_CHECK_CODE(code, lino, _error);
509,756✔
2435

2436
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
471,410✔
2437
    if (code != TSDB_CODE_SUCCESS) {
471,912✔
2438
      goto _error;
×
2439
    }
2440
  }
2441

2442
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
5,210,667✔
2443
                            pTaskInfo->pStreamRuntimeInfo);
5,212,500✔
2444
  if (code != TSDB_CODE_SUCCESS) {
5,212,245✔
2445
    goto _error;
×
2446
  }
2447

2448
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
5,212,245✔
2449
  QUERY_CHECK_CODE(code, lino, _error);
5,200,776✔
2450

2451
  pInfo->timeWindowInterpo = false;
5,200,776✔
2452
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
5,209,503✔
2453
  QUERY_CHECK_CODE(code, lino, _error);
5,199,963✔
2454
  if (pInfo->timeWindowInterpo) {
5,199,963✔
2455
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
25,465✔
2456
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
25,465✔
2457
      goto _error;
×
2458
    }
2459
  }
2460

2461
  pInfo->pOperator = pOperator;
5,202,278✔
2462
  pInfo->cleanGroupResInfo = false;
5,212,158✔
2463
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
5,203,272✔
2464
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
5,198,182✔
2465
                  pInfo, pTaskInfo);
2466

2467
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
5,203,405✔
2468
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2469
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
5,193,846✔
2470
  code = appendDownstream(pOperator, &downstream, 1);
5,192,695✔
2471
  if (code != TSDB_CODE_SUCCESS) {
5,210,377✔
2472
    goto _error;
×
2473
  }
2474

2475
  *pOptrInfo = pOperator;
5,210,377✔
2476
  return TSDB_CODE_SUCCESS;
5,209,956✔
2477

2478
_error:
38,346✔
2479
  if (pInfo != NULL) {
38,346✔
2480
    destroyIntervalOperatorInfo(pInfo);
38,346✔
2481
  }
2482

2483
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
38,346✔
2484
  pTaskInfo->code = code;
38,346✔
2485
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
38,346✔
2486
  return code;
38,346✔
2487
}
2488

2489
// todo handle multiple timeline cases. assume no timeline interweaving
2490
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
16,483,525✔
2491
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
16,483,525✔
2492
  SExprSupp*     pSup = &pOperator->exprSupp;
16,483,525✔
2493

2494
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
16,483,525✔
2495
  if (!pColInfoData) {
16,483,525✔
2496
    pTaskInfo->code = terrno;
×
2497
    T_LONG_JMP(pTaskInfo->env, terrno);
×
2498
  }
2499

2500
  bool    masterScan = true;
16,483,525✔
2501
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
16,483,525✔
2502
  int64_t gid = pBlock->info.id.groupId;
16,483,525✔
2503

2504
  int64_t gap = pInfo->gap;
16,483,525✔
2505

2506
  if (!pInfo->reptScan) {
16,483,525✔
2507
    pInfo->reptScan = true;
552,219✔
2508
    pInfo->winSup.prevTs = INT64_MIN;
552,219✔
2509
  }
2510

2511
  SWindowRowsSup* pRowSup = &pInfo->winSup;
16,483,525✔
2512
  pRowSup->numOfRows = 0;
16,483,525✔
2513
  pRowSup->startRowIndex = 0;
16,483,525✔
2514

2515
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
2516
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
16,483,525✔
2517
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
2,147,483,647✔
2518
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
2,147,483,647✔
2519
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
5,554,622✔
2520
      doKeepTuple(pRowSup, tsList[j], j, gid);
5,554,622✔
2521
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
2,147,483,647✔
2522
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
1,089,579,808✔
2523
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
2524
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
2525
    } else {  // start a new session window
2526
      // start a new session window
2527
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
1,089,482,826✔
2528
        // keep the time window for the closed time window.
2529
        STimeWindow window = pRowSup->win;
1,086,882,306✔
2530

2531
        int32_t ret = TSDB_CODE_SUCCESS;
1,086,882,306✔
2532
        if (pInfo->indefRowsMode) {
1,086,882,306✔
2533
          ret = applyIndefRowsWindowSegment(pOperator, &pInfo->indefRows, pInfo->binfo.pRes,
13,969✔
2534
                                            pInfo->aggSup.resultRowSize, gid, &window, pBlock,
2535
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pInfo->binfo.inputTsOrder,
2536
                                            true);
2537
        } else {
2538
          SResultRow* pResult = NULL;
1,086,868,337✔
2539
          ret =
2540
              setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
1,086,868,337✔
2541
                                     numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
2542
          if (ret == TSDB_CODE_SUCCESS) {
1,086,868,337✔
2543
            // pInfo->numOfRows data belong to the current session window
2544
            updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
1,086,868,337✔
2545
            ret =
2546
                applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
1,086,868,337✔
2547
                                                pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1,086,868,337✔
2548
          }
2549
        }
2550
        if (ret != TSDB_CODE_SUCCESS) {
1,086,882,306✔
2551
          T_LONG_JMP(pTaskInfo->env, ret);
1,359✔
2552
        }
2553
      }
2554

2555
      // here we start a new session window
2556
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
1,089,481,467✔
2557
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,089,481,467✔
2558
    }
2559
  }
2560

2561
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
16,482,166✔
2562
  int32_t ret = TSDB_CODE_SUCCESS;
16,482,166✔
2563
  if (pInfo->indefRowsMode) {
16,482,166✔
2564
    ret = applyIndefRowsWindowSegment(pOperator, &pInfo->indefRows, pInfo->binfo.pRes, pInfo->aggSup.resultRowSize,
21,972✔
2565
                                      gid, &pRowSup->win, pBlock, pRowSup->startRowIndex, pRowSup->numOfRows,
10,986✔
2566
                                      pInfo->binfo.inputTsOrder, false);
2567
  } else {
2568
    SResultRow* pResult = NULL;
16,471,180✔
2569
    ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
16,471,180✔
2570
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
2571
    if (ret == TSDB_CODE_SUCCESS) {
16,471,180✔
2572
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
16,471,180✔
2573
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
16,471,180✔
2574
                                            pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
16,471,180✔
2575
    }
2576
  }
2577
  if (ret != TSDB_CODE_SUCCESS) {
16,482,166✔
2578
    T_LONG_JMP(pTaskInfo->env, ret);
×
2579
  }
2580
}
16,482,166✔
2581

2582
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,999,927✔
2583
  if (pOperator->status == OP_EXEC_DONE) {
2,999,927✔
2584
    (*ppRes) = NULL;
405,313✔
2585
    return TSDB_CODE_SUCCESS;
405,313✔
2586
  }
2587

2588
  int32_t                  code = TSDB_CODE_SUCCESS;
2,594,614✔
2589
  int32_t                  lino = 0;
2,594,614✔
2590
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
2,594,614✔
2591
  SSessionAggOperatorInfo* pInfo = pOperator->info;
2,594,614✔
2592
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
2,594,614✔
2593
  SExprSupp*               pSup = &pOperator->exprSupp;
2,594,614✔
2594

2595
  if (pOperator->status == OP_RES_TO_RETURN) {
2,594,614✔
2596
    if (pInfo->indefRowsMode) {
2,000,009✔
2597
      (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
21,160✔
2598
      if ((*ppRes) == NULL) {
21,160✔
2599
        setOperatorCompleted(pOperator);
7,820✔
2600
      }
2601
      return code;
21,160✔
2602
    }
2603

2604
    while (1) {
×
2605
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,978,849✔
2606
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,978,849✔
2607
      QUERY_CHECK_CODE(code, lino, _end);
1,978,849✔
2608

2609
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,978,849✔
2610
      if (!hasRemain) {
1,978,849✔
2611
        setOperatorCompleted(pOperator);
79,025✔
2612
        break;
79,025✔
2613
      }
2614

2615
      if (pBInfo->pRes->info.rows > 0) {
1,899,824✔
2616
        break;
1,899,824✔
2617
      }
2618
    }
2619
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,978,849✔
2620
    return code;
1,978,849✔
2621
  }
2622

2623
  int32_t order = pInfo->binfo.inputTsOrder;
594,605✔
2624

2625
  SOperatorInfo* downstream = pOperator->pDownstream[0];
594,605✔
2626

2627
  pInfo->cleanGroupResInfo = false;
594,605✔
2628
  while (1) {
16,482,166✔
2629
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
17,076,771✔
2630
    if (pBlock == NULL) {
17,076,771✔
2631
      break;
593,246✔
2632
    }
2633

2634
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
16,483,525✔
2635
    pBInfo->pRes->info.dataLoad = 1;
16,483,525✔
2636
    if (pInfo->scalarSupp.pExprInfo != NULL) {
16,483,525✔
2637
      SExprSupp* pExprSup = &pInfo->scalarSupp;
3,012✔
2638
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
6,024✔
2639
                                   GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
3,012✔
2640
      QUERY_CHECK_CODE(code, lino, _end);
3,012✔
2641
    }
2642
    // the pDataBlock are always the same one, no need to call this again
2643
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
16,483,525✔
2644
    QUERY_CHECK_CODE(code, lino, _end);
16,483,525✔
2645

2646
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
16,483,525✔
2647
    QUERY_CHECK_CODE(code, lino, _end);
16,483,525✔
2648

2649
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
16,483,525✔
2650
  }
2651

2652
  if (pInfo->indefRowsMode) {
593,246✔
2653
    code = closeAllIndefRowsWindowStates(pOperator, &pInfo->indefRows);
9,606✔
2654
    QUERY_CHECK_CODE(code, lino, _end);
9,606✔
2655
    pOperator->status = OP_RES_TO_RETURN;
9,606✔
2656
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
9,606✔
2657
    if ((*ppRes) == NULL) {
9,606✔
2658
      setOperatorCompleted(pOperator);
1,786✔
2659
    }
2660
    return code;
9,606✔
2661
  }
2662

2663
  // restore the value
2664
  pOperator->status = OP_RES_TO_RETURN;
583,640✔
2665

2666
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
583,640✔
2667
  QUERY_CHECK_CODE(code, lino, _end);
583,640✔
2668
  pInfo->cleanGroupResInfo = true;
583,640✔
2669

2670
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
583,640✔
2671
  QUERY_CHECK_CODE(code, lino, _end);
583,640✔
2672
  while (1) {
×
2673
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
583,640✔
2674
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
583,640✔
2675
    QUERY_CHECK_CODE(code, lino, _end);
583,640✔
2676

2677
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
583,640✔
2678
    if (!hasRemain) {
583,640✔
2679
      setOperatorCompleted(pOperator);
461,073✔
2680
      break;
461,073✔
2681
    }
2682

2683
    if (pBInfo->pRes->info.rows > 0) {
122,567✔
2684
      break;
122,567✔
2685
    }
2686
  }
2687

2688
_end:
583,640✔
2689
  if (code != TSDB_CODE_SUCCESS) {
583,640✔
2690
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2691
    pTaskInfo->code = code;
×
2692
    T_LONG_JMP(pTaskInfo->env, code);
×
2693
  }
2694
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
583,640✔
2695
  return code;
583,640✔
2696
}
2697

2698
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
80,869✔
2699
  SStateWindowOperatorInfo* pInfo = pOper->info;
80,869✔
2700
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
80,869✔
2701
  SStateWindowPhysiNode* pPhynode = (SStateWindowPhysiNode*)pOper->pPhyNode;
80,869✔
2702
  pOper->status = OP_NOT_OPENED;
80,869✔
2703

2704
  resetBasicOperatorState(&pInfo->binfo);
80,869✔
2705
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
80,869✔
2706
                    pInfo->cleanGroupResInfo);
80,869✔
2707

2708
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
80,869✔
2709
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
80,869✔
2710
  if (code == 0) {
80,869✔
2711
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
161,738✔
2712
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
80,869✔
2713
                       &pTaskInfo->storageAPI.functionStore);
2714
  }
2715
  if (code == 0) {
80,869✔
2716
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
80,869✔
2717
                         &pTaskInfo->storageAPI.functionStore);
2718
  }
2719

2720
  pInfo->cleanGroupResInfo = false;
80,869✔
2721
  pInfo->hasKey = false;
80,869✔
2722
  pInfo->winSup.lastTs = INT64_MIN;
80,869✔
2723
  resetIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
80,869✔
2724
  cleanupGroupResInfo(&pInfo->groupResInfo);
80,869✔
2725
  int32_t keyNum = taosArrayGetSize(pInfo->stateKeys);
80,869✔
2726
  for (int32_t i = 0; i < keyNum; ++i) {
162,598✔
2727
    SStateKeys* pKey = taosArrayGet(pInfo->stateKeys, i);
81,729✔
2728
    memset(pKey->pData, 0, pKey->bytes);
81,729✔
2729
    pKey->isNull = true;
81,729✔
2730
    SStateKeys* pPend = taosArrayGet(pInfo->pendingKeys, i);
81,729✔
2731
    if (pPend != NULL) {
81,729✔
2732
      memset(pPend->pData, 0, pPend->bytes);
81,729✔
2733
      pPend->isNull = true;
81,729✔
2734
    }
2735
  }
2736
  resetPendingState(pInfo);
80,869✔
2737
  return code;
80,869✔
2738
}
2739

2740
// todo make this as an non-blocking operator
2741
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
1,216,408✔
2742
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2743
  QRY_PARAM_CHECK(pOptrInfo);
1,216,408✔
2744

2745
  int32_t                   code = TSDB_CODE_SUCCESS;
1,216,408✔
2746
  int32_t                   lino = 0;
1,216,408✔
2747
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
1,216,408✔
2748
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,216,408✔
2749
  if (pInfo == NULL || pOperator == NULL) {
1,216,408✔
2750
    code = terrno;
×
2751
    goto _error;
×
2752
  }
2753
  initOperatorCostInfo(pOperator);
1,216,408✔
2754

2755
  pOperator->pPhyNode = pStateNode;
1,216,408✔
2756
  pOperator->exprSupp.hasWindowOrGroup = true;
1,216,408✔
2757
  pOperator->exprSupp.hasWindow = true;
1,216,408✔
2758
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
1,216,408✔
2759

2760
  if (pStateNode->window.pExprs != NULL) {
1,216,408✔
2761
    int32_t    numOfScalarExpr = 0;
310,047✔
2762
    SExprInfo* pScalarExprInfo = NULL;
310,047✔
2763
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
310,047✔
2764
    QUERY_CHECK_CODE(code, lino, _error);
310,047✔
2765

2766
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
310,047✔
2767
    if (code != TSDB_CODE_SUCCESS) {
310,047✔
2768
      goto _error;
×
2769
    }
2770
  }
2771

2772
  int32_t keyNum = LIST_LENGTH(pStateNode->pStateKeys);
1,216,408✔
2773
  pInfo->stateCols = taosArrayInit(keyNum, sizeof(SColumn));
1,216,408✔
2774
  pInfo->stateKeys = taosArrayInit(keyNum, sizeof(SStateKeys));
1,216,408✔
2775
  pInfo->pendingKeys = taosArrayInit(keyNum, sizeof(SStateKeys));
1,215,955✔
2776
  pInfo->pendingColTouched = taosMemoryCalloc(keyNum, sizeof(bool));
1,216,408✔
2777
  pInfo->hasPendingPartialNull = false;
1,216,408✔
2778
  if (pInfo->stateCols == NULL || pInfo->stateKeys == NULL
1,216,408✔
2779
      || pInfo->pendingKeys == NULL || pInfo->pendingColTouched == NULL) {
1,216,408✔
2780
    goto _error;
×
2781
  }
2782
  for (int32_t i = 0; i < keyNum; ++i) {
2,623,537✔
2783
    SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pStateNode->pStateKeys, i);
1,407,129✔
2784
    SColumn      stateCol = extractColumnFromColumnNode(pColNode);
1,407,129✔
2785
    SStateKeys   stateKey = {.type = stateCol.type, .bytes = stateCol.bytes, .isNull = true, .pData = taosMemoryCalloc(1, stateCol.bytes)};
1,407,129✔
2786
    SStateKeys   pendKey  = {.type = stateCol.type, .bytes = stateCol.bytes, .isNull = true, .pData = taosMemoryCalloc(1, stateCol.bytes)};
1,407,129✔
2787
    if (stateKey.pData == NULL || pendKey.pData == NULL
1,407,129✔
2788
        || taosArrayPush(pInfo->stateCols, &stateCol) == NULL
2,814,258✔
2789
        || taosArrayPush(pInfo->stateKeys, &stateKey) == NULL
2,814,258✔
2790
        || taosArrayPush(pInfo->pendingKeys, &pendKey) == NULL) {
2,814,258✔
2791
      taosMemoryFreeClear(stateKey.pData);
×
2792
      taosMemoryFreeClear(pendKey.pData);
×
2793
      goto _error;
×
2794
    }
2795
  }
2796
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
1,216,408✔
2797
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
1,216,408✔
2798

2799
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,216,408✔
2800
                            pTaskInfo->pStreamRuntimeInfo);
1,216,408✔
2801
  if (code != TSDB_CODE_SUCCESS) {
1,216,408✔
2802
    goto _error;
×
2803
  }
2804

2805
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,216,408✔
2806

2807
  int32_t    num = 0;
1,216,408✔
2808
  SExprInfo* pExprInfo = NULL;
1,216,408✔
2809
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
1,216,408✔
2810
  QUERY_CHECK_CODE(code, lino, _error);
1,216,408✔
2811

2812
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,216,408✔
2813

2814
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
1,216,408✔
2815
                    NULL, &pTaskInfo->storageAPI.functionStore);
2816
  if (code != TSDB_CODE_SUCCESS) {
1,216,408✔
2817
    goto _error;
×
2818
  }
2819

2820
  pInfo->indefRowsMode = pStateNode->window.indefRowsFunc;
1,216,408✔
2821
  if (pInfo->indefRowsMode) {
1,216,408✔
2822
    code = initIndefRowsRuntime(&pInfo->indefRows, pOperator->exprSupp.pCtx, num, pOperator->resultInfo.capacity);
10,517✔
2823
    QUERY_CHECK_CODE(code, lino, _error);
10,517✔
2824
  }
2825

2826
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
1,215,955✔
2827
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,216,408✔
2828
  initBasicInfo(&pInfo->binfo, pResBlock);
1,216,408✔
2829
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,215,955✔
2830

2831
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,215,955✔
2832
  QUERY_CHECK_CODE(code, lino, _error);
1,216,408✔
2833

2834
  pInfo->tsSlotId = tsSlotId;
1,216,408✔
2835
  pInfo->pOperator = pOperator;
1,216,408✔
2836
  pInfo->cleanGroupResInfo = false;
1,215,955✔
2837
  pInfo->extendOption = pStateNode->extendOption;
1,216,408✔
2838
  pInfo->trueForInfo.trueForType = pStateNode->trueForType;
1,216,408✔
2839
  pInfo->trueForInfo.count = pStateNode->trueForCount;
1,215,966✔
2840
  pInfo->trueForInfo.duration = pStateNode->trueForDuration;
1,216,408✔
2841
  pInfo->winSup.lastTs = INT64_MIN;
1,216,408✔
2842

2843
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
1,215,966✔
2844
                  pTaskInfo);
2845
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
1,216,408✔
2846
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2847
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
1,215,955✔
2848

2849
  code = appendDownstream(pOperator, &downstream, 1);
1,215,955✔
2850
  if (code != TSDB_CODE_SUCCESS) {
1,216,408✔
2851
    goto _error;
×
2852
  }
2853

2854
  *pOptrInfo = pOperator;
1,216,408✔
2855
  return TSDB_CODE_SUCCESS;
1,216,408✔
2856

2857
_error:
×
2858
  if (pInfo != NULL) {
×
2859
    destroyStateWindowOperatorInfo(pInfo);
×
2860
  }
2861

2862
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2863
  pTaskInfo->code = code;
×
2864
  return code;
×
2865
}
2866

2867
void destroySWindowOperatorInfo(void* param) {
609,128✔
2868
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
609,128✔
2869
  if (pInfo == NULL) {
609,128✔
2870
    return;
×
2871
  }
2872

2873
  cleanupBasicInfo(&pInfo->binfo);
609,128✔
2874
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
609,128✔
2875
  cleanupIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
609,128✔
2876
  if (pInfo->pOperator) {
609,128✔
2877
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
609,128✔
2878
                      pInfo->cleanGroupResInfo);
609,128✔
2879
    pInfo->pOperator = NULL;
609,128✔
2880
  }
2881

2882
  cleanupAggSup(&pInfo->aggSup);
609,128✔
2883
  cleanupExprSupp(&pInfo->scalarSupp);
609,128✔
2884

2885
  cleanupGroupResInfo(&pInfo->groupResInfo);
609,128✔
2886
  taosMemoryFreeClear(param);
609,128✔
2887
}
2888

2889
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
1,746✔
2890
  SSessionAggOperatorInfo* pInfo = pOper->info;
1,746✔
2891
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
1,746✔
2892
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
1,746✔
2893
  pOper->status = OP_NOT_OPENED;
1,746✔
2894

2895
  resetBasicOperatorState(&pInfo->binfo);
1,746✔
2896
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,746✔
2897
                    pInfo->cleanGroupResInfo);
1,746✔
2898

2899
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,746✔
2900
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,746✔
2901
  if (code == 0) {
1,746✔
2902
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
3,492✔
2903
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
1,746✔
2904
                       &pTaskInfo->storageAPI.functionStore);
2905
  }
2906
  if (code == 0) {
1,746✔
2907
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
1,746✔
2908
                         &pTaskInfo->storageAPI.functionStore);
2909
  }
2910

2911
  pInfo->cleanGroupResInfo = false;
1,746✔
2912
  pInfo->winSup = (SWindowRowsSup){0};
1,746✔
2913
  pInfo->winSup.prevTs = INT64_MIN;
1,746✔
2914
  pInfo->reptScan = false;
1,746✔
2915
  resetIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
1,746✔
2916

2917
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,746✔
2918
  return code;
1,746✔
2919
}
2920

2921
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
609,128✔
2922
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2923
  QRY_PARAM_CHECK(pOptrInfo);
609,128✔
2924

2925
  int32_t                  code = TSDB_CODE_SUCCESS;
609,128✔
2926
  int32_t                  lino = 0;
609,128✔
2927
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
609,128✔
2928
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
609,128✔
2929
  if (pInfo == NULL || pOperator == NULL) {
609,128✔
2930
    code = terrno;
×
2931
    goto _error;
×
2932
  }
2933
  initOperatorCostInfo(pOperator);
609,128✔
2934

2935
  pOperator->pPhyNode = pSessionNode;
609,128✔
2936
  pOperator->exprSupp.hasWindowOrGroup = true;
609,128✔
2937
  pOperator->exprSupp.hasWindow = true;
609,128✔
2938

2939
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
609,128✔
2940
  initResultSizeInfo(&pOperator->resultInfo, 4096);
609,128✔
2941

2942
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
609,128✔
2943
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
609,128✔
2944
  initBasicInfo(&pInfo->binfo, pResBlock);
609,128✔
2945

2946
  int32_t    numOfCols = 0;
609,128✔
2947
  SExprInfo* pExprInfo = NULL;
609,128✔
2948
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
609,128✔
2949
  QUERY_CHECK_CODE(code, lino, _error);
609,128✔
2950

2951
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
609,128✔
2952
                    NULL, &pTaskInfo->storageAPI.functionStore);
2953
  QUERY_CHECK_CODE(code, lino, _error);
609,128✔
2954

2955
  pInfo->indefRowsMode = pSessionNode->window.indefRowsFunc;
609,128✔
2956
  if (pInfo->indefRowsMode) {
609,128✔
2957
    code = initIndefRowsRuntime(&pInfo->indefRows, pOperator->exprSupp.pCtx, numOfCols, pOperator->resultInfo.capacity);
10,965✔
2958
    QUERY_CHECK_CODE(code, lino, _error);
10,965✔
2959
  }
2960

2961
  pInfo->gap = pSessionNode->gap;
609,128✔
2962

2963
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
609,128✔
2964
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
609,128✔
2965
  QUERY_CHECK_CODE(code, lino, _error);
609,128✔
2966

2967
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
609,128✔
2968
  pInfo->binfo.pRes = pResBlock;
609,128✔
2969
  pInfo->winSup.prevTs = INT64_MIN;
609,128✔
2970
  pInfo->reptScan = false;
609,128✔
2971
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
609,128✔
2972
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
609,128✔
2973

2974
  if (pSessionNode->window.pExprs != NULL) {
609,128✔
2975
    int32_t    numOfScalar = 0;
2,008✔
2976
    SExprInfo* pScalarExprInfo = NULL;
2,008✔
2977
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
2,008✔
2978
    QUERY_CHECK_CODE(code, lino, _error);
2,008✔
2979

2980
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
2,008✔
2981
    QUERY_CHECK_CODE(code, lino, _error);
2,008✔
2982
  }
2983

2984
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
609,128✔
2985
                            pTaskInfo->pStreamRuntimeInfo);
609,128✔
2986
  QUERY_CHECK_CODE(code, lino, _error);
609,128✔
2987

2988
  pInfo->pOperator = pOperator;
609,128✔
2989
  pInfo->cleanGroupResInfo = false;
609,128✔
2990
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
609,128✔
2991
                  pInfo, pTaskInfo);
2992
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
609,128✔
2993
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2994
  pOperator->pTaskInfo = pTaskInfo;
609,128✔
2995
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
609,128✔
2996

2997
  code = appendDownstream(pOperator, &downstream, 1);
609,128✔
2998
  QUERY_CHECK_CODE(code, lino, _error);
609,128✔
2999

3000
  *pOptrInfo = pOperator;
609,128✔
3001
  return TSDB_CODE_SUCCESS;
609,128✔
3002

3003
_error:
×
3004
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
3005
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
3006
  pTaskInfo->code = code;
×
3007
  return code;
×
3008
}
3009

3010
void destroyMAIOperatorInfo(void* param) {
864,258✔
3011
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
864,258✔
3012
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
864,258✔
3013
  taosMemoryFreeClear(param);
864,697✔
3014
}
864,697✔
3015

3016
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
760,967✔
3017
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
760,967✔
3018
  if (NULL == pResult) {
760,967✔
3019
    return pResult;
×
3020
  }
3021
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
760,967✔
3022
  return pResult;
760,967✔
3023
}
3024

3025
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
997,149,719✔
3026
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
3027
  if (*pResult == NULL) {
997,149,719✔
3028
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
760,967✔
3029
    if (*pResult == NULL) {
760,967✔
3030
      return terrno;
×
3031
    }
3032
  }
3033

3034
  // set time window for current result
3035
  (*pResult)->win = (*win);
997,150,158✔
3036
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
997,151,036✔
3037
}
3038

3039
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
4,090,238✔
3040
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
3041
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
4,090,238✔
3042
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
4,090,238✔
3043

3044
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
4,090,238✔
3045
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
4,090,238✔
3046
  SInterval*     pInterval = &iaInfo->interval;
4,090,238✔
3047

3048
  int32_t  startPos = 0;
4,090,238✔
3049
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
4,090,238✔
3050

3051
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
4,090,238✔
3052

3053
  // there is an result exists
3054
  if (miaInfo->curTs != INT64_MIN) {
4,090,238✔
3055
    if (ts != miaInfo->curTs) {
1,381,080✔
3056
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
1,319,457✔
3057
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,319,457✔
3058
      miaInfo->curTs = ts;
1,319,457✔
3059
    }
3060
  } else {
3061
    miaInfo->curTs = ts;
2,709,158✔
3062
  }
3063

3064
  STimeWindow win = {0};
4,090,238✔
3065
  win.skey = miaInfo->curTs;
4,090,238✔
3066
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
4,090,238✔
3067

3068
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4,090,238✔
3069
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
4,090,238✔
3070
    T_LONG_JMP(pTaskInfo->env, ret);
×
3071
  }
3072

3073
  int32_t currPos = startPos;
4,090,238✔
3074

3075
  STimeWindow currWin = win;
4,090,238✔
3076
  while (++currPos < pBlock->info.rows) {
2,116,716,276✔
3077
    if (tsCols[currPos] == miaInfo->curTs) {
2,112,606,722✔
3078
      continue;
1,119,566,996✔
3079
    }
3080

3081
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
993,052,018✔
3082
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
1,986,118,962✔
3083
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
993,059,481✔
3084
    if (ret != TSDB_CODE_SUCCESS) {
993,058,164✔
3085
      T_LONG_JMP(pTaskInfo->env, ret);
×
3086
    }
3087

3088
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
993,058,164✔
3089
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
993,059,920✔
3090
    miaInfo->curTs = tsCols[currPos];
993,058,164✔
3091

3092
    currWin.skey = miaInfo->curTs;
993,059,481✔
3093
    currWin.ekey =
993,059,920✔
3094
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
993,059,481✔
3095

3096
    startPos = currPos;
993,059,920✔
3097
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
993,059,920✔
3098
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
993,059,042✔
3099
      T_LONG_JMP(pTaskInfo->env, ret);
×
3100
    }
3101

3102
    miaInfo->curTs = currWin.skey;
993,059,042✔
3103
  }
3104

3105
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
4,090,238✔
3106
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
8,180,476✔
3107
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
4,090,238✔
3108
  if (ret != TSDB_CODE_SUCCESS) {
4,090,238✔
3109
    T_LONG_JMP(pTaskInfo->env, ret);
×
3110
  }
3111
}
4,090,238✔
3112

3113
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
2,699,057✔
3114
  pRes->info.id.groupId = pMiaInfo->groupId;
2,699,057✔
3115
  pMiaInfo->curTs = INT64_MIN;
2,699,057✔
3116
  pMiaInfo->groupId = 0;
2,699,057✔
3117
}
2,699,057✔
3118

3119
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
3,700,135✔
3120
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
3,700,135✔
3121
  int32_t                               code = TSDB_CODE_SUCCESS;
3,700,135✔
3122
  int32_t                               lino = 0;
3,700,135✔
3123
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
3,700,135✔
3124
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
3,700,135✔
3125

3126
  SExprSupp*      pSup = &pOperator->exprSupp;
3,700,135✔
3127
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
3,700,135✔
3128
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
3,700,135✔
3129
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
3,700,135✔
3130

3131
  while (1) {
3,189,460✔
3132
    SSDataBlock* pBlock = NULL;
6,889,595✔
3133
    if (pMiaInfo->prefetchedBlock == NULL) {
6,889,595✔
3134
      pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
4,941,404✔
3135
    } else {
3136
      pBlock = pMiaInfo->prefetchedBlock;
1,948,191✔
3137
      pMiaInfo->prefetchedBlock = NULL;
1,948,191✔
3138

3139
      pMiaInfo->groupId = pBlock->info.id.groupId;
1,948,191✔
3140
    }
3141

3142
    // no data exists, all query processing is done
3143
    if (pBlock == NULL) {
6,888,949✔
3144
      // close last unclosed time window
3145
      if (pMiaInfo->curTs != INT64_MIN) {
850,520✔
3146
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
750,866✔
3147
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
750,866✔
3148
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
750,866✔
3149
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
750,866✔
3150
        QUERY_CHECK_CODE(code, lino, _end);
750,866✔
3151
      }
3152

3153
      setOperatorCompleted(pOperator);
850,520✔
3154
      break;
850,520✔
3155
    }
3156

3157
    if (pMiaInfo->groupId == 0) {
6,038,429✔
3158
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
1,502,983✔
3159
        pMiaInfo->groupId = pBlock->info.id.groupId;
173,953✔
3160
        pRes->info.id.groupId = pMiaInfo->groupId;
173,953✔
3161
      }
3162
    } else {
3163
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
4,535,446✔
3164
        // if there are unclosed time window, close it firstly.
3165
        if (pMiaInfo->curTs == INT64_MIN) {
1,948,191✔
3166
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
3167
          T_LONG_JMP(pTaskInfo->env, terrno);
×
3168
        }
3169
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
1,948,191✔
3170
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,948,191✔
3171

3172
        pMiaInfo->prefetchedBlock = pBlock;
1,948,191✔
3173
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
1,948,191✔
3174
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,948,191✔
3175
        QUERY_CHECK_CODE(code, lino, _end);
1,948,191✔
3176
        if (pRes->info.rows == 0) {
1,948,191✔
3177
          // After filtering for last group, the result is empty, so we need to continue to process next group
3178
          continue;
14,926✔
3179
        } else {
3180
          break;
1,933,265✔
3181
        }
3182
      } else {
3183
        // continue
3184
        pRes->info.id.groupId = pMiaInfo->groupId;
2,587,255✔
3185
      }
3186
    }
3187

3188
    pRes->info.scanFlag = pBlock->info.scanFlag;
4,090,238✔
3189

3190
    if (pIaInfo->scalarSupp.pExprInfo != NULL) {
4,090,238✔
3191
      SExprSupp* pExprSup = &pIaInfo->scalarSupp;
1,506✔
3192
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
3,012✔
3193
                                   GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
1,506✔
3194
      QUERY_CHECK_CODE(code, lino, _end);
1,506✔
3195
    }
3196

3197
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
4,090,238✔
3198
    QUERY_CHECK_CODE(code, lino, _end);
4,090,238✔
3199

3200
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
4,090,238✔
3201

3202
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
4,090,238✔
3203
    QUERY_CHECK_CODE(code, lino, _end);
4,090,238✔
3204

3205
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
4,090,238✔
3206
      break;
915,704✔
3207
    }
3208
  }
3209

3210
_end:
3,699,489✔
3211
  if (code != TSDB_CODE_SUCCESS) {
3,699,489✔
3212
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3213
    pTaskInfo->code = code;
×
3214
    T_LONG_JMP(pTaskInfo->env, code);
×
3215
  }
3216
}
3,699,489✔
3217

3218
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,999,421✔
3219
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
3,999,421✔
3220
  int32_t                               code = TSDB_CODE_SUCCESS;
3,999,421✔
3221
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
3,999,421✔
3222
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
3,999,421✔
3223
  if (pOperator->status == OP_EXEC_DONE) {
3,999,421✔
3224
    (*ppRes) = NULL;
812,755✔
3225
    return code;
812,755✔
3226
  }
3227

3228
  SSDataBlock* pRes = iaInfo->binfo.pRes;
3,186,666✔
3229
  blockDataCleanup(pRes);
3,186,666✔
3230

3231
  if (iaInfo->binfo.mergeResultBlock) {
3,186,666✔
3232
    while (1) {
3233
      if (pOperator->status == OP_EXEC_DONE) {
3,160,779✔
3234
        break;
409,018✔
3235
      }
3236

3237
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
2,751,761✔
3238
        break;
914,314✔
3239
      }
3240

3241
      doMergeAlignedIntervalAgg(pOperator);
1,837,447✔
3242
    }
3243
  } else {
3244
    doMergeAlignedIntervalAgg(pOperator);
1,862,688✔
3245
  }
3246

3247
  (*ppRes) = (pRes->info.rows == 0) ? NULL : pRes;
3,186,020✔
3248
  return code;
3,186,020✔
3249
}
3250

3251
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
1,624✔
3252
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
1,624✔
3253
  
3254
  uint64_t     groupId;  // current groupId
3255
  int64_t      curTs;    // current ts
3256
  SSDataBlock* prefetchedBlock;
3257
  SResultRow*  pResultRow;
3258

3259
  pInfo->groupId = 0;
1,624✔
3260
  pInfo->curTs = INT64_MIN;
1,624✔
3261
  pInfo->prefetchedBlock = NULL;
1,624✔
3262
  pInfo->pResultRow = NULL;
1,624✔
3263

3264
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
1,624✔
3265
}
3266

3267
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
864,697✔
3268
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
3269
  QRY_PARAM_CHECK(pOptrInfo);
864,697✔
3270

3271
  int32_t                               code = TSDB_CODE_SUCCESS;
864,697✔
3272
  int32_t                               lino = 0;
864,697✔
3273
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
864,697✔
3274
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
864,697✔
3275
  if (miaInfo == NULL || pOperator == NULL) {
864,697✔
3276
    code = terrno;
×
3277
    goto _error;
×
3278
  }
3279
  initOperatorCostInfo(pOperator);
864,697✔
3280

3281
  pOperator->pPhyNode = pNode;
864,697✔
3282
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
864,697✔
3283
  if (miaInfo->intervalAggOperatorInfo == NULL) {
864,697✔
3284
    code = terrno;
×
3285
    goto _error;
×
3286
  }
3287

3288
  SInterval interval = {.interval = pNode->interval,
2,594,091✔
3289
                        .sliding = pNode->sliding,
864,697✔
3290
                        .intervalUnit = pNode->intervalUnit,
864,697✔
3291
                        .slidingUnit = pNode->slidingUnit,
864,697✔
3292
                        .offset = pNode->offset,
864,697✔
3293
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
864,697✔
3294
                        .timeRange = pNode->timeRange};
3295
  calcIntervalAutoOffset(&interval);
864,697✔
3296

3297
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
864,697✔
3298
  SExprSupp*                pSup = &pOperator->exprSupp;
864,697✔
3299
  pSup->hasWindowOrGroup = true;
864,697✔
3300
  pSup->hasWindow = true;
864,697✔
3301

3302
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
864,697✔
3303
                            pTaskInfo->pStreamRuntimeInfo);
864,697✔
3304
  QUERY_CHECK_CODE(code, lino, _error);
864,697✔
3305

3306
  miaInfo->curTs = INT64_MIN;
864,697✔
3307
  iaInfo->win = pTaskInfo->window;
864,697✔
3308
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
864,697✔
3309
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
864,697✔
3310
  iaInfo->interval = interval;
864,697✔
3311
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
864,697✔
3312
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
864,697✔
3313

3314
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
864,697✔
3315
  initResultSizeInfo(&pOperator->resultInfo, 512);
864,697✔
3316

3317
  int32_t    num = 0;
864,697✔
3318
  SExprInfo* pExprInfo = NULL;
864,697✔
3319
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
864,697✔
3320
  QUERY_CHECK_CODE(code, lino, _error);
864,697✔
3321

3322
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
864,697✔
3323
                    NULL, &pTaskInfo->storageAPI.functionStore);
3324
  QUERY_CHECK_CODE(code, lino, _error);
864,697✔
3325

3326
  if (pNode->window.pExprs != NULL) {
864,697✔
3327
    int32_t    numOfScalar = 0;
1,506✔
3328
    SExprInfo* pScalarExprInfo = NULL;
1,506✔
3329
    code = createExprInfo(pNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1,506✔
3330
    QUERY_CHECK_CODE(code, lino, _error);
1,506✔
3331
    code = initExprSupp(&iaInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1,506✔
3332
    QUERY_CHECK_CODE(code, lino, _error);
1,506✔
3333
  }
3334

3335
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
864,697✔
3336
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
864,697✔
3337
  initBasicInfo(&iaInfo->binfo, pResBlock);
864,697✔
3338
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
864,697✔
3339
  QUERY_CHECK_CODE(code, lino, _error);
864,697✔
3340

3341
  iaInfo->timeWindowInterpo = false;
864,697✔
3342
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
864,697✔
3343
  QUERY_CHECK_CODE(code, lino, _error);
864,697✔
3344
  if (iaInfo->timeWindowInterpo) {
864,697✔
3345
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
3346
  }
3347

3348
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
864,697✔
3349
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
864,697✔
3350
  QUERY_CHECK_CODE(code, lino, _error);
864,697✔
3351
  iaInfo->pOperator = pOperator;
864,697✔
3352
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
864,697✔
3353
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
3354

3355
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
864,697✔
3356
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
3357
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
864,697✔
3358

3359
  code = appendDownstream(pOperator, &downstream, 1);
864,697✔
3360
  QUERY_CHECK_CODE(code, lino, _error);
864,697✔
3361

3362
  *pOptrInfo = pOperator;
864,697✔
3363
  return TSDB_CODE_SUCCESS;
864,697✔
3364

3365
_error:
×
3366
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
3367
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
3368
  pTaskInfo->code = code;
×
3369
  return code;
×
3370
}
3371

3372
//=====================================================================================================================
3373
// merge interval operator
3374
typedef struct SMergeIntervalAggOperatorInfo {
3375
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
3376
  SList*                   groupIntervals;
3377
  SListIter                groupIntervalsIter;
3378
  bool                     hasGroupId;
3379
  uint64_t                 groupId;
3380
  SSDataBlock*             prefetchedBlock;
3381
  bool                     inputBlocksFinished;
3382
} SMergeIntervalAggOperatorInfo;
3383

3384
typedef struct SGroupTimeWindow {
3385
  uint64_t    groupId;
3386
  STimeWindow window;
3387
} SGroupTimeWindow;
3388

3389
void destroyMergeIntervalOperatorInfo(void* param) {
×
3390
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
3391
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
3392
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
3393

3394
  taosMemoryFreeClear(param);
×
3395
}
×
3396

3397
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
3398
                                        STimeWindow* newWin) {
3399
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
3400
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
3401
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
3402

3403
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
3404
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
3405
  if (code != TSDB_CODE_SUCCESS) {
×
3406
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3407
    return code;
×
3408
  }
3409

3410
  SListIter iter = {0};
×
3411
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
3412
  SListNode* listNode = NULL;
×
3413
  while ((listNode = tdListNext(&iter)) != NULL) {
×
3414
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
3415
    if (prevGrpWin->groupId != tableGroupId) {
×
3416
      continue;
×
3417
    }
3418

3419
    STimeWindow* prevWin = &prevGrpWin->window;
×
3420
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
3421
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
3422
      taosMemoryFreeClear(tmp);
×
3423
    }
3424
  }
3425

3426
  return TSDB_CODE_SUCCESS;
×
3427
}
3428

3429
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
3430
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
3431
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
3432
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
3433

3434
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
3435
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
3436

3437
  int32_t     startPos = 0;
×
3438
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
3439
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
3440
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
3441
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
3442
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
3443
  SResultRow* pResult = NULL;
×
3444

3445
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
3446
                                        iaInfo->binfo.inputTsOrder);
3447

3448
  int32_t ret =
3449
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
3450
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
3451
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
3452
    T_LONG_JMP(pTaskInfo->env, ret);
×
3453
  }
3454

3455
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
3456
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
3457
                                                 iaInfo->binfo.inputTsOrder);
3458
  if (forwardRows <= 0) {
×
3459
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3460
  }
3461

3462
  // prev time window not interpolation yet.
3463
  if (iaInfo->timeWindowInterpo) {
×
3464
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
3465
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
3466

3467
    // restore current time window
3468
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
3469
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
3470
    if (ret != TSDB_CODE_SUCCESS) {
×
3471
      T_LONG_JMP(pTaskInfo->env, ret);
×
3472
    }
3473

3474
    // window start key interpolation
3475
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
3476
    if (ret != TSDB_CODE_SUCCESS) {
×
3477
      T_LONG_JMP(pTaskInfo->env, ret);
×
3478
    }
3479
  }
3480

3481
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
3482
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
3483
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
3484
  if (ret != TSDB_CODE_SUCCESS) {
×
3485
    T_LONG_JMP(pTaskInfo->env, ret);
×
3486
  }
3487
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
3488

3489
  // output previous interval results after this interval (&win) is closed
3490
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
3491
  if (code != TSDB_CODE_SUCCESS) {
×
3492
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3493
    T_LONG_JMP(pTaskInfo->env, code);
×
3494
  }
3495

3496
  STimeWindow nextWin = win;
×
3497
  while (1) {
×
3498
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
3499
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
3500
                                      iaInfo->binfo.inputTsOrder);
3501
    if (startPos < 0) {
×
3502
      break;
×
3503
    }
3504

3505
    // null data, failed to allocate more memory buffer
3506
    code =
3507
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
3508
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
3509
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
3510
      T_LONG_JMP(pTaskInfo->env, code);
×
3511
    }
3512

3513
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
3514
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
3515
                                           iaInfo->binfo.inputTsOrder);
3516

3517
    // window start(end) key interpolation
3518
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
3519
    if (code != TSDB_CODE_SUCCESS) {
×
3520
      T_LONG_JMP(pTaskInfo->env, code);
×
3521
    }
3522

3523
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
3524
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
3525
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
3526
    if (code != TSDB_CODE_SUCCESS) {
×
3527
      T_LONG_JMP(pTaskInfo->env, code);
×
3528
    }
3529
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
3530

3531
    // output previous interval results after this interval (&nextWin) is closed
3532
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
3533
    if (code != TSDB_CODE_SUCCESS) {
×
3534
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
3535
      T_LONG_JMP(pTaskInfo->env, code);
×
3536
    }
3537
  }
3538

3539
  if (iaInfo->timeWindowInterpo) {
×
3540
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
3541
  }
3542
}
×
3543

3544
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
3545
  int32_t        code = TSDB_CODE_SUCCESS;
×
3546
  int32_t        lino = 0;
×
3547
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
3548

3549
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
3550
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
3551
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
3552

3553
  if (pOperator->status == OP_EXEC_DONE) {
×
3554
    (*ppRes) = NULL;
×
3555
    return code;
×
3556
  }
3557

3558
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
3559
  blockDataCleanup(pRes);
×
3560
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
3561
  QUERY_CHECK_CODE(code, lino, _end);
×
3562

3563
  if (!miaInfo->inputBlocksFinished) {
×
3564
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
3565
    while (1) {
×
3566
      SSDataBlock* pBlock = NULL;
×
3567
      if (miaInfo->prefetchedBlock == NULL) {
×
3568
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
3569
      } else {
3570
        pBlock = miaInfo->prefetchedBlock;
×
3571
        miaInfo->groupId = pBlock->info.id.groupId;
×
3572
        miaInfo->prefetchedBlock = NULL;
×
3573
      }
3574

3575
      if (pBlock == NULL) {
×
3576
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
3577
        miaInfo->inputBlocksFinished = true;
×
3578
        break;
×
3579
      }
3580

3581
      if (!miaInfo->hasGroupId) {
×
3582
        miaInfo->hasGroupId = true;
×
3583
        miaInfo->groupId = pBlock->info.id.groupId;
×
3584
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
3585
        miaInfo->prefetchedBlock = pBlock;
×
3586
        break;
×
3587
      }
3588

3589
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
3590
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
3591
      QUERY_CHECK_CODE(code, lino, _end);
×
3592

3593
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
3594

3595
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
3596
        break;
×
3597
      }
3598
    }
3599

3600
    pRes->info.id.groupId = miaInfo->groupId;
×
3601
  }
3602

3603
  if (miaInfo->inputBlocksFinished) {
×
3604
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
3605

3606
    if (listNode != NULL) {
×
3607
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
3608
      pRes->info.id.groupId = grpWin->groupId;
×
3609
    }
3610
  }
3611

3612
  if (pRes->info.rows == 0) {
×
3613
    setOperatorCompleted(pOperator);
×
3614
  }
3615

3616
_end:
×
3617
  if (code != TSDB_CODE_SUCCESS) {
×
3618
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3619
    pTaskInfo->code = code;
×
3620
    T_LONG_JMP(pTaskInfo->env, code);
×
3621
  }
3622
  (*ppRes) = (pRes->info.rows == 0) ? NULL : pRes;
×
3623
  return code;
×
3624
}
3625

3626
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
3627
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
3628

3629
  pInfo->hasGroupId = false;
×
3630
  pInfo->groupId = 0;
×
3631
  pInfo->prefetchedBlock = NULL;
×
3632
  pInfo->inputBlocksFinished = false;
×
3633
  tdListEmpty(pInfo->groupIntervals);
×
3634
  
3635
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
3636
  return resetInterval(pOper, pIntervalInfo);
×
3637
}
3638

3639
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
3640
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
3641
  QRY_PARAM_CHECK(pOptrInfo);
×
3642

3643
  int32_t                        code = TSDB_CODE_SUCCESS;
×
3644
  int32_t                        lino = 0;
×
3645
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
3646
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
3647
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
3648
    code = terrno;
×
3649
    goto _error;
×
3650
  }
3651
  initOperatorCostInfo(pOperator);
×
3652

3653
  pOperator->pPhyNode = pIntervalPhyNode;
×
3654
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
3655
                        .sliding = pIntervalPhyNode->sliding,
×
3656
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
3657
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
3658
                        .offset = pIntervalPhyNode->offset,
×
3659
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
3660
                        .timeRange = pIntervalPhyNode->timeRange};
3661
  calcIntervalAutoOffset(&interval);
×
3662

3663
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
3664

3665
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
3666
  pIntervalInfo->win = pTaskInfo->window;
×
3667
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
3668
  pIntervalInfo->interval = interval;
×
3669
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
3670
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
3671
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
3672

3673
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
3674
  pExprSupp->hasWindowOrGroup = true;
×
3675
  pExprSupp->hasWindow = true;
×
3676

3677
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
3678
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
3679

3680
  int32_t    num = 0;
×
3681
  SExprInfo* pExprInfo = NULL;
×
3682
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
3683
  QUERY_CHECK_CODE(code, lino, _error);
×
3684

3685
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
3686
                    NULL, &pTaskInfo->storageAPI.functionStore);
3687
  if (code != TSDB_CODE_SUCCESS) {
×
3688
    goto _error;
×
3689
  }
3690

3691
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
3692
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
3693
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
3694
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
3695
  QUERY_CHECK_CODE(code, lino, _error);
×
3696

3697
  pIntervalInfo->timeWindowInterpo = false;
×
3698
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
3699
  QUERY_CHECK_CODE(code, lino, _error);
×
3700
  if (pIntervalInfo->timeWindowInterpo) {
×
3701
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
3702
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
3703
      goto _error;
×
3704
    }
3705
  }
3706

3707
  pIntervalInfo->pOperator = pOperator;
×
3708
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
3709
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
3710
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
3711
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
3712
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
3713
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
3714

3715
  code = appendDownstream(pOperator, &downstream, 1);
×
3716
  if (code != TSDB_CODE_SUCCESS) {
×
3717
    goto _error;
×
3718
  }
3719

3720
  *pOptrInfo = pOperator;
×
3721
  return TSDB_CODE_SUCCESS;
×
3722
_error:
×
3723
  if (pMergeIntervalInfo != NULL) {
×
3724
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
3725
  }
3726
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
3727
  pTaskInfo->code = code;
×
3728
  return code;
×
3729
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc