• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.17
/source/libs/executor/src/timewindowoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "taoserror.h"
23
#include "tchecksum.h"
24
#include "tcommon.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "tfill.h"
28
#include "tglobal.h"
29
#include "tlog.h"
30
#include "ttime.h"
31

32
typedef enum SResultTsInterpType {
33
  RESULT_ROW_START_INTERP = 1,
34
  RESULT_ROW_END_INTERP = 2,
35
} SResultTsInterpType;
36

37
typedef struct SOpenWindowInfo {
38
  SResultRowPosition pos;
39
  uint64_t           groupId;
40
} SOpenWindowInfo;
41

42
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
43

44
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult,
45
                                              uint64_t groupId, SExecTaskInfo* pTaskInfo);
46
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
47
static int32_t applyIndefRowsWindowSegment(SOperatorInfo* pOperator, SIndefRowsRuntime* pRuntime,
10,585,378✔
48
                                           SSDataBlock* pResultTemplate, int32_t resultRowSize,
49
                                           uint64_t groupId, const STimeWindow* pWin,
50
                                           SSDataBlock* pInputBlock, int32_t startRow, int32_t numRows,
51
                                           int32_t inputTsOrder, bool closeWindow) {
52
  int32_t                code = TSDB_CODE_SUCCESS;
10,585,378✔
53
  int32_t                lino = 0;
10,585,378✔
54
  SIndefRowsWindowState* pState = NULL;
10,585,378✔
55

56
  code = applyIndefRowsFuncOnWindowState(pOperator, pRuntime, &pState, pResultTemplate, groupId, pWin,
10,585,378✔
57
                                                 pInputBlock, startRow, numRows, inputTsOrder, resultRowSize);
58
  QUERY_CHECK_CODE(code, lino, _return);
10,585,378✔
59

60
  if (!closeWindow) {
10,582,272✔
61
    return code;
727,484✔
62
  }
63
  
64
  code = closeIndefRowsWindowState(pOperator, pRuntime, pState);
9,854,788✔
65
  QUERY_CHECK_CODE(code, lino, _return);
9,854,788✔
66

67
_return:
9,857,894✔
68
  if (code) {
9,857,894✔
69
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
3,106✔
70
  }
71
  return code;
9,857,894✔
72
}
73

74
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
2,147,483,647✔
75
                                      SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
76
                                      int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
77
                                      SExecTaskInfo* pTaskInfo) {
78
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
2,147,483,647✔
79
                                                  masterscan, tableGroupId, pTaskInfo, true, pAggSup, true);
80

81
  if (pResultRow == NULL || pTaskInfo->code != 0) {
2,147,483,647✔
82
    *pResult = NULL;
154✔
83
    qError("failed to set result output buffer, error:%s", tstrerror(pTaskInfo->code));
×
84
    return pTaskInfo->code;
×
85
  }
86

87
  // set time window for current result
88
  TAOS_SET_POBJ_ALIGNED(&pResultRow->win, win);
2,147,483,647✔
89
  *pResult = pResultRow;
2,147,483,647✔
90
  return setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
2,147,483,647✔
91
}
92

93
void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, int32_t rowIndex, uint64_t groupId) {
2,147,483,647✔
94
  pRowSup->win.ekey = ts;
2,147,483,647✔
95
  pRowSup->prevTs = ts;
2,147,483,647✔
96
  pRowSup->groupId = groupId;
2,147,483,647✔
97
  pRowSup->numOfRows += 1;
2,147,483,647✔
98
  if (hasContinuousNullRows(pRowSup)) {
2,147,483,647✔
99
    // rows having null state col are wrapped by rows of same state
100
    // these rows can be counted into current window
101
    pRowSup->numOfRows += pRowSup->numNullRows;
200,413,599✔
102
    resetNumNullRows(pRowSup);
200,413,599✔
103
  }
104
}
2,147,483,647✔
105

106
void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
1,634,663,888✔
107
  pRowSup->startRowIndex = rowIndex;
1,634,663,888✔
108
  pRowSup->numOfRows = 0;
1,634,663,888✔
109
  pRowSup->win.skey = tsList[rowIndex];
1,634,663,888✔
110
  pRowSup->groupId = groupId;
1,634,663,888✔
111
  resetNumNullRows(pRowSup);
1,634,663,888✔
112
}
1,634,663,888✔
113

114
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
×
115
                                            int32_t order, int64_t* pData) {
116
  int32_t forwardRows = 0;
2,147,483,647✔
117

118
  if (order == TSDB_ORDER_ASC) {
×
119
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
2,147,483,647✔
120
    if (end >= 0) {
2,147,483,647✔
121
      forwardRows = end;
2,147,483,647✔
122

123
      while (pData[end + pos] == ekey) {
2,147,483,647✔
124
        forwardRows += 1;
2,147,483,647✔
125
        ++pos;
2,147,483,647✔
126
      }
127
    }
128
  } else {
129
    int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
427,741,957✔
130
    if (end >= 0) {
427,081,901✔
131
      forwardRows = end;
427,248,743✔
132

133
      while (pData[end + pos] == ekey) {
841,993,896✔
134
        forwardRows += 1;
414,745,153✔
135
        ++pos;
414,745,153✔
136
      }
137
    }
138
    //    int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
139
    //    if (end >= 0) {
140
    //      forwardRows = pos - end;
141
    //
142
    //      if (pData[end] == ekey) {
143
    //        forwardRows += 1;
144
    //      }
145
    //    }
146
  }
147

148
  return forwardRows;
2,147,483,647✔
149
}
150

151
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
2,147,483,647✔
152
  int32_t midPos = -1;
2,147,483,647✔
153
  int32_t numOfRows;
154

155
  if (num <= 0) {
2,147,483,647✔
156
    return -1;
×
157
  }
158

159
  TSKEY*  keyList = (TSKEY*)pValue;
2,147,483,647✔
160
  int32_t firstPos = 0;
2,147,483,647✔
161
  int32_t lastPos = num - 1;
2,147,483,647✔
162

163
  if (order == TSDB_ORDER_DESC) {
2,147,483,647✔
164
    // find the first position which is smaller than the key
165
    while (1) {
166
      if (key >= keyList[firstPos]) return firstPos;
517,320,263✔
167
      if (key == keyList[lastPos]) return lastPos;
100,997,804✔
168

169
      if (key < keyList[lastPos]) {
100,467,174✔
170
        lastPos += 1;
10,823,207✔
171
        if (lastPos >= num) {
10,823,207✔
172
          return -1;
×
173
        } else {
174
          return lastPos;
10,823,207✔
175
        }
176
      }
177

178
      numOfRows = lastPos - firstPos + 1;
89,647,096✔
179
      midPos = (numOfRows >> 1) + firstPos;
89,647,096✔
180

181
      if (key < keyList[midPos]) {
89,647,096✔
182
        firstPos = midPos + 1;
6,327,329✔
183
      } else if (key > keyList[midPos]) {
83,331,416✔
184
        lastPos = midPos - 1;
82,736,177✔
185
      } else {
186
        break;
593,961✔
187
      }
188
    }
189

190
  } else {
191
    // find the first position which is bigger than the key
192
    while (1) {
193
      if (key <= keyList[firstPos]) return firstPos;
2,147,483,647✔
194
      if (key == keyList[lastPos]) return lastPos;
2,147,483,647✔
195

196
      if (key > keyList[lastPos]) {
2,147,483,647✔
197
        lastPos = lastPos + 1;
2,147,483,647✔
198
        if (lastPos >= num)
2,147,483,647✔
199
          return -1;
1,213,185✔
200
        else
201
          return lastPos;
2,147,483,647✔
202
      }
203

204
      numOfRows = lastPos - firstPos + 1;
2,147,483,647✔
205
      midPos = (numOfRows >> 1u) + firstPos;
2,147,483,647✔
206

207
      if (key < keyList[midPos]) {
2,147,483,647✔
208
        lastPos = midPos - 1;
2,147,483,647✔
209
      } else if (key > keyList[midPos]) {
1,775,562,387✔
210
        firstPos = midPos + 1;
1,055,347,567✔
211
      } else {
212
        break;
720,214,554✔
213
      }
214
    }
215
  }
216

217
  return midPos;
720,808,515✔
218
}
219

220
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2,147,483,647✔
221
                                 __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order) {
222
  int32_t num = -1;
2,147,483,647✔
223
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
2,147,483,647✔
224

225
  if (order == TSDB_ORDER_ASC) {
2,147,483,647✔
226
    if (ekey < pDataBlockInfo->window.ekey && pPrimaryColumn) {
2,147,483,647✔
227
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
2,147,483,647✔
228
      if (item != NULL) {
2,147,483,647✔
229
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
230
      }
231
    } else {
232
      num = pDataBlockInfo->rows - startPos;
18,928,340✔
233
      if (item != NULL) {
20,003,954✔
234
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
235
      }
236
    }
237
  } else {  // desc
238
    if (ekey > pDataBlockInfo->window.skey && pPrimaryColumn) {
422,007,386✔
239
      num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
427,737,000✔
240
      if (item != NULL) {
425,705,402✔
241
        item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
×
242
      }
243
    } else {
244
      num = pDataBlockInfo->rows - startPos;
1,266,067✔
245
      if (item != NULL) {
1,693,207✔
246
        item->lastKey = pDataBlockInfo->window.ekey + step;
×
247
      }
248
    }
249
  }
250

251
  return num;
2,147,483,647✔
252
}
253

254
void doTimeWindowInterpolation(SArray* pPrevValues, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs,
11,066,764✔
255
                               int32_t curRowIndex, TSKEY windowKey, int32_t type, SExprSupp* pSup) {
256
  SqlFunctionCtx* pCtx = pSup->pCtx;
11,066,764✔
257

258
  int32_t index = 1;
11,066,764✔
259
  for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
33,211,849✔
260
    if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
22,145,085✔
261
      pCtx[k].start.key = INT64_MIN;
11,078,321✔
262
      continue;
11,078,321✔
263
    }
264

265
    SFunctParam*     pParam = &pCtx[k].param[0];
11,066,764✔
266
    SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
11,066,764✔
267

268
    double v1 = 0, v2 = 0, v = 0;
11,066,764✔
269
    if (prevRowIndex == -1) {
11,066,764✔
270
      SGroupKeys* p = taosArrayGet(pPrevValues, index);
×
271
      GET_TYPED_DATA(v1, double, pColInfo->info.type, p->pData, typeGetTypeModFromColInfo(&pColInfo->info));
×
272
    } else {
273
      GET_TYPED_DATA(v1, double, pColInfo->info.type, colDataGetData(pColInfo, prevRowIndex),
11,066,764✔
274
                     typeGetTypeModFromColInfo(&pColInfo->info));
275
    }
276

277
    GET_TYPED_DATA(v2, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex),
11,066,764✔
278
                   typeGetTypeModFromColInfo(&pColInfo->info));
279

280
#if 0
281
    if (functionId == FUNCTION_INTERP) {
282
      if (type == RESULT_ROW_START_INTERP) {
283
        pCtx[k].start.key = prevTs;
284
        pCtx[k].start.val = v1;
285

286
        pCtx[k].end.key = curTs;
287
        pCtx[k].end.val = v2;
288

289
        if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_VARBINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR ||
290
            pColInfo->info.type == TSDB_DATA_TYPE_GEOMETRY) {
291
          if (prevRowIndex == -1) {
292
            //            pCtx[k].start.ptr = (char*)pRuntimeEnv->prevRow[index];
293
          } else {
294
            pCtx[k].start.ptr = (char*)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
295
          }
296

297
          pCtx[k].end.ptr = (char*)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
298
        }
299
      }
300
    } else if (functionId == FUNCTION_TWA) {
301
#endif
302

303
    SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
11,066,764✔
304
    SPoint point2 = (SPoint){.key = curTs, .val = &v2};
11,066,764✔
305
    SPoint point = (SPoint){.key = windowKey, .val = &v};
11,066,764✔
306

307
    if (!fmIsElapsedFunc(pCtx[k].functionId)) {
11,066,764✔
308
      taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, 0);
11,003,551✔
309
    }
310

311
    if (type == RESULT_ROW_START_INTERP) {
11,066,764✔
312
      pCtx[k].start.key = point.key;
5,495,928✔
313
      pCtx[k].start.val = v;
5,495,928✔
314
    } else {
315
      pCtx[k].end.key = point.key;
5,570,836✔
316
      pCtx[k].end.val = v;
5,570,836✔
317
    }
318

319
    index += 1;
11,066,764✔
320
  }
321
#if 0
322
  }
323
#endif
324
}
11,066,764✔
325

326
static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t type) {
736,990✔
327
  if (type == RESULT_ROW_START_INTERP) {
736,990✔
328
    for (int32_t k = 0; k < numOfOutput; ++k) {
1,195,109✔
329
      pCtx[k].start.key = INT64_MIN;
789,160✔
330
    }
331
  } else {
332
    for (int32_t k = 0; k < numOfOutput; ++k) {
995,810✔
333
      pCtx[k].end.key = INT64_MIN;
664,769✔
334
    }
335
  }
336
}
736,990✔
337

338
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock,
5,901,429✔
339
                                              const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) {
340
  bool ascQuery = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
5,901,429✔
341

342
  TSKEY curTs = tsCols[pos];
5,901,429✔
343

344
  SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
5,901,429✔
345
  TSKEY       lastTs = *(int64_t*)pTsKey->pData;
5,901,429✔
346

347
  // lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
348
  // start exactly from this point, no need to do interpolation
349
  TSKEY key = ascQuery ? win->skey : win->ekey;
5,901,429✔
350
  if (key == curTs) {
5,901,429✔
351
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
397,155✔
352
    return true;
397,155✔
353
  }
354

355
  // it is the first time window, no need to do interpolation
356
  if (pTsKey->isNull && pos == 0) {
5,504,274✔
357
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
8,346✔
358
  } else {
359
    TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
5,495,928✔
360
    doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
5,495,928✔
361
                              RESULT_ROW_START_INTERP, pSup);
362
  }
363

364
  return true;
5,504,274✔
365
}
366

367
static int32_t setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
5,901,877✔
368
                                               int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
369
                                               TSKEY blockEkey, STimeWindow* win, bool* pRes) {
370
  int32_t code = TSDB_CODE_SUCCESS;
5,901,877✔
371
  int32_t lino = 0;
5,901,877✔
372
  int32_t order = pInfo->binfo.inputTsOrder;
5,901,877✔
373

374
  TSKEY actualEndKey = tsCols[endRowIndex];
5,901,877✔
375
  TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey;
5,901,877✔
376

377
  // not ended in current data block, do not invoke interpolation
378
  if ((key > blockEkey && (order == TSDB_ORDER_ASC)) || (key < blockEkey && (order == TSDB_ORDER_DESC))) {
5,901,877✔
379
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
24,559✔
380
    (*pRes) = false;
24,559✔
381
    return code;
24,559✔
382
  }
383

384
  // there is actual end point of current time window, no interpolation needs
385
  if (key == actualEndKey) {
5,877,318✔
386
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
306,482✔
387
    (*pRes) = true;
306,482✔
388
    return code;
306,482✔
389
  }
390

391
  if (nextRowIndex < 0) {
5,570,836✔
392
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
393
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
394
  }
395

396
  TSKEY nextKey = tsCols[nextRowIndex];
5,570,836✔
397
  doTimeWindowInterpolation(pInfo->pPrevValues, pDataBlock, actualEndKey, endRowIndex, nextKey, nextRowIndex, key,
5,570,836✔
398
                            RESULT_ROW_END_INTERP, pSup);
399
  (*pRes) = true;
5,570,836✔
400
  return code;
5,570,836✔
401
}
402

403
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd) {
2,147,483,647✔
404
  if (pInterval->interval != pInterval->sliding && (pWin->ekey < calStart || pWin->skey > calEnd)) {
2,147,483,647✔
405
    return false;
×
406
  }
407

408
  return true;
2,147,483,647✔
409
}
410

411
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo) {
2,147,483,647✔
412
  return inCalSlidingWindow(pInterval, pWin, pBlockInfo->calWin.skey, pBlockInfo->calWin.ekey);
2,147,483,647✔
413
}
414

415
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
2,147,483,647✔
416
                               TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
417
  bool ascQuery = (order != TSDB_ORDER_DESC);
2,147,483,647✔
418

419
  int32_t precision = pInterval->precision;
2,147,483,647✔
420
  getNextTimeWindow(pInterval, pNext, order);
2,147,483,647✔
421

422
  // next time window is not in current block
423
  if ((pNext->skey > pDataBlockInfo->window.ekey && order == TSDB_ORDER_ASC) ||
2,147,483,647✔
424
      (pNext->ekey < pDataBlockInfo->window.skey && order == TSDB_ORDER_DESC)) {
2,147,483,647✔
425
    return -1;
9,611,089✔
426
  }
427

428
  if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
2,147,483,647✔
429
    return -1;
×
430
  }
431

432
  TSKEY   skey = ascQuery ? pNext->skey : pNext->ekey;
2,147,483,647✔
433
  int32_t startPos = 0;
2,147,483,647✔
434

435
  // tumbling time window query, a special case of sliding time window query
436
  if (pInterval->sliding == pInterval->interval && prevPosition != -1) {
2,147,483,647✔
437
    startPos = prevPosition + 1;
2,147,483,647✔
438
  } else {
439
    if ((skey <= pDataBlockInfo->window.skey && ascQuery) || (skey >= pDataBlockInfo->window.ekey && !ascQuery)) {
123,746,930✔
440
      startPos = 0;
8,312,117✔
441
    } else {
442
      startPos = binarySearchForKey((char*)primaryKeys, pDataBlockInfo->rows, skey, order);
122,772,320✔
443
    }
444
  }
445
  if(startPos < 0 || startPos >= pDataBlockInfo->rows) {
2,147,483,647✔
446
    return -1;
2,147,483,647✔
447
  }
448

449
  /* interp query with fill should not skip time window */
450
  //  if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
451
  //    return startPos;
452
  //  }
453

454
  /*
455
   * This time window does not cover any data, try next time window,
456
   * this case may happen when the time window is too small
457
   */
458
  if (primaryKeys != NULL) {
2,147,483,647✔
459
    if (ascQuery && primaryKeys[startPos] > pNext->ekey) {
2,147,483,647✔
460
      TSKEY next = primaryKeys[startPos];
1,744,264,324✔
461
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
1,743,457,588✔
462
        pNext->skey = taosTimeTruncate(next, pInterval);
77✔
463
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
1,728✔
464
      } else {
465
        pNext->ekey += ((next - pNext->ekey + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
1,746,003,653✔
466
        pNext->skey = pNext->ekey - pInterval->interval + 1;
1,744,963,585✔
467
      }
468
    } else if ((!ascQuery) && primaryKeys[startPos] < pNext->skey) {
836,209,023✔
469
      TSKEY next = primaryKeys[startPos];
420,368,648✔
470
      if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
420,509,465✔
UNCOV
471
        pNext->skey = taosTimeTruncate(next, pInterval);
×
472
        pNext->ekey = taosTimeGetIntervalEnd(pNext->skey, pInterval);
×
473
      } else {
474
        pNext->skey -= ((pNext->skey - next + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
420,902,756✔
475
        pNext->ekey = pNext->skey + pInterval->interval - 1;
420,494,406✔
476
      }
477
    }
478
  }
479

480
  return startPos;
2,147,483,647✔
481
}
482

483
static bool isResultRowInterpolated(SResultRow* pResult, SResultTsInterpType type) {
17,705,631✔
484
  if (type == RESULT_ROW_START_INTERP) {
17,705,631✔
485
    return pResult->startInterp == true;
5,901,877✔
486
  } else {
487
    return pResult->endInterp == true;
11,803,754✔
488
  }
489
}
490

491
static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
11,778,747✔
492
  if (type == RESULT_ROW_START_INTERP) {
11,778,747✔
493
    pResult->startInterp = true;
5,901,429✔
494
  } else {
495
    pResult->endInterp = true;
5,877,318✔
496
  }
497
}
11,778,747✔
498

499
static int32_t doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataBlock* pBlock, SResultRow* pResult,
2,147,483,647✔
500
                                           STimeWindow* win, int32_t startPos, int32_t forwardRows, SExprSupp* pSup) {
501
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
502
  int32_t lino = 0;
2,147,483,647✔
503
  if (!pInfo->timeWindowInterpo) {
2,147,483,647✔
504
    return code;
2,147,483,647✔
505
  }
506

507
  if (pBlock == NULL) {
3,500,181✔
508
    code = TSDB_CODE_INVALID_PARA;
×
509
    return code;
×
510
  }
511

512
  if (pBlock->pDataBlock == NULL) {
3,500,181✔
513
    return code;
×
514
  }
515

516
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
5,901,877✔
517

518
  TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
5,901,877✔
519
  bool   done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
5,901,877✔
520
  if (!done) {  // it is not interpolated, now start to generated the interpolated value
5,901,877✔
521
    bool interp = setTimeWindowInterpolationStartTs(pInfo, startPos, pBlock, tsCols, win, pSup);
5,901,429✔
522
    if (interp) {
5,901,429✔
523
      setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
5,901,429✔
524
    }
525
  } else {
526
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_START_INTERP);
448✔
527
  }
528

529
  // point interpolation does not require the end key time window interpolation.
530
  // interpolation query does not generate the time window end interpolation
531
  done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
5,901,877✔
532
  if (!done) {
5,901,877✔
533
    int32_t endRowIndex = startPos + forwardRows - 1;
5,901,877✔
534
    int32_t nextRowIndex = endRowIndex + 1;
5,901,877✔
535

536
    // duplicated ts row does not involve in the interpolation of end value for current time window
537
    int32_t x = endRowIndex;
5,901,877✔
538
    while (x > 0) {
5,918,933✔
539
      if (tsCols[x] == tsCols[x - 1]) {
5,905,493✔
540
        x -= 1;
17,056✔
541
      } else {
542
        endRowIndex = x;
5,888,437✔
543
        break;
5,888,437✔
544
      }
545
    }
546

547
    TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
5,901,877✔
548
    bool  interp = false;
5,901,877✔
549
    code = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey,
5,901,877✔
550
                                           win, &interp);
551
    QUERY_CHECK_CODE(code, lino, _end);
5,901,877✔
552
    if (interp) {
5,901,877✔
553
      setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
5,877,318✔
554
    }
555
  } else {
556
    setNotInterpoWindowKey(pSup->pCtx, pSup->numOfExprs, RESULT_ROW_END_INTERP);
×
557
  }
558

559
_end:
5,901,877✔
560
  if (code != TSDB_CODE_SUCCESS) {
5,901,877✔
561
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
562
  }
563
  return code;
5,901,877✔
564
}
565

566
static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, SArray* pCols) {
26,487✔
567
  if (pBlock->pDataBlock == NULL) {
26,487✔
568
    return;
×
569
  }
570

571
  size_t num = taosArrayGetSize(pPrevKeys);
26,487✔
572
  for (int32_t k = 0; k < num; ++k) {
79,461✔
573
    SColumn* pc = taosArrayGet(pCols, k);
52,974✔
574

575
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pc->slotId);
52,974✔
576

577
    SGroupKeys* pkey = taosArrayGet(pPrevKeys, k);
52,974✔
578
    for (int32_t i = pBlock->info.rows - 1; i >= 0; --i) {
52,974✔
579
      if (colDataIsNull_s(pColInfo, i)) {
105,948✔
580
        continue;
×
581
      }
582

583
      char* val = colDataGetData(pColInfo, i);
52,974✔
584
      if (IS_VAR_DATA_TYPE(pkey->type)) {
52,974✔
585
        if (IS_STR_DATA_BLOB(pkey->type)) {
×
586
          memcpy(pkey->pData, val, blobDataTLen(val));
×
587
        } else {
588
          memcpy(pkey->pData, val, varDataTLen(val));
×
589
        }
590
      } else {
591
        memcpy(pkey->pData, val, pkey->bytes);
52,974✔
592
      }
593

594
      break;
52,974✔
595
    }
596
  }
597
}
598

599
static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
26,487✔
600
                                       SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
601
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
26,487✔
602

603
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
26,487✔
604
  SExprSupp*                pSup = &pOperatorInfo->exprSupp;
26,487✔
605

606
  int32_t startPos = 0;
26,487✔
607
  int32_t numOfOutput = pSup->numOfExprs;
26,487✔
608

609
  SResultRow* pResult = NULL;
26,487✔
610

611
  while (1) {
×
612
    SListNode*          pn = tdListGetHead(pResultRowInfo->openWindow);
26,487✔
613
    SOpenWindowInfo*    pOpenWin = (SOpenWindowInfo*)pn->data;
26,487✔
614
    uint64_t            groupId = pOpenWin->groupId;
26,487✔
615
    SResultRowPosition* p1 = &pOpenWin->pos;
26,487✔
616
    if (p->pageId == p1->pageId && p->offset == p1->offset) {
26,487✔
617
      break;
26,487✔
618
    }
619

620
    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
×
621
    if (NULL == pr) {
×
622
      T_LONG_JMP(pTaskInfo->env, terrno);
×
623
    }
624

625
    if (!(pr->offset == p1->offset && pr->pageId == p1->pageId)) {
×
626
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
627
      T_LONG_JMP(pTaskInfo->env, terrno);
×
628
    }
629

630
    if (pr->closed) {
×
631
      if (!(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) &&
×
632
            isResultRowInterpolated(pr, RESULT_ROW_END_INTERP))) {
×
633
        pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
634
        T_LONG_JMP(pTaskInfo->env, terrno);
×
635
      }
636
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
637
      taosMemoryFree(pNode);
×
638
      continue;
×
639
    }
640

641
    STimeWindow w = pr->win;
×
642
    int32_t     ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
×
643
                                             numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
644
    if (ret != TSDB_CODE_SUCCESS) {
×
645
      T_LONG_JMP(pTaskInfo->env, ret);
×
646
    }
647

648
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
649
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
650
      T_LONG_JMP(pTaskInfo->env, terrno);
×
651
    }
652

653
    SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
×
654
    if (!pTsKey) {
×
655
      pTaskInfo->code = terrno;
×
656
      T_LONG_JMP(pTaskInfo->env, terrno);
×
657
    }
658

659
    int64_t prevTs = *(int64_t*)pTsKey->pData;
×
660
    if (groupId == pBlock->info.id.groupId) {
×
661
      TSKEY curTs = pBlock->info.window.skey;
×
662
      if (tsCols != NULL) {
×
663
        curTs = tsCols[startPos];
×
664
      }
665
      doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, curTs, startPos, w.ekey,
×
666
                                RESULT_ROW_END_INTERP, pSup);
667
    }
668

669
    setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
×
670
    setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
×
671

672
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
×
673
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
×
674
                                          pBlock->info.rows, numOfExprs);
×
675
    if (ret != TSDB_CODE_SUCCESS) {
×
676
      T_LONG_JMP(pTaskInfo->env, ret);
×
677
    }
678

679
    if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
×
680
      closeResultRow(pr);
×
681
      SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
×
682
      taosMemoryFree(pNode);
×
683
    } else {  // the remains are can not be closed yet.
684
      break;
×
685
    }
686
  }
687
}
26,487✔
688

689
static bool tsKeyCompFn(void* l, void* r, void* param) {
2,009,349,005✔
690
  TSKEY*                    lTS = (TSKEY*)l;
2,009,349,005✔
691
  TSKEY*                    rTS = (TSKEY*)r;
2,009,349,005✔
692
  SIntervalAggOperatorInfo* pInfo = param;
2,009,349,005✔
693
  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
2,009,349,005✔
694
}
695

696
static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
315,699,164✔
697
  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
315,699,164✔
698
  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
315,660,879✔
699
  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
316,078,790✔
700
}
701

702
/**
703
 * @brief check if cur window should be filtered out by limit info
704
 * @retval true if should be filtered out
705
 * @retval false if not filtering out
706
 * @note If no limit info, we skip filtering.
707
 *       If input/output ts order mismatch, we skip filtering too.
708
 *       eg. input ts order: desc, and output ts order: asc, limit: 10
709
 *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
710
 *       every tuple in every block.
711
 *       And the boundedQueue keeps refreshing all records with smaller ts key.
712
 */
713
static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId,
2,147,483,647✔
714
                                  SExecTaskInfo* pTaskInfo) {
715
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
716
  int32_t lino = 0;
2,147,483,647✔
717
  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
2,147,483,647✔
718
      || pOperatorInfo->binfo.inputTsOrder != pOperatorInfo->binfo.outputTsOrder
791,686,078✔
719
      // if input/output ts order mismatch, no filter
720
  ) {
721
    return false;
2,147,483,647✔
722
  }
723

724
  if (pOperatorInfo->limit == 0) return true;
317,167,949✔
725

726
  if (pOperatorInfo->pBQ == NULL) {
317,106,819✔
727
    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
377,749✔
728
    QUERY_CHECK_NULL(pOperatorInfo->pBQ, code, lino, _end, terrno);
378,152✔
729
  }
730

731
  bool shouldFilter = false;
317,036,294✔
732
  // if BQ has been full, compare it with top of BQ
733
  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
317,036,294✔
734
    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
88,292,337✔
735
    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
88,287,098✔
736
  }
737
  if (shouldFilter) {
316,371,344✔
738
    return true;
984,102✔
739
  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
315,387,242✔
740
    return false;
129,846,308✔
741
  }
742

743
  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
744
  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
186,086,999✔
745
  QUERY_CHECK_NULL(node.data, code, lino, _end, terrno);
186,032,191✔
746

747
  *((TSKEY*)node.data) = win->skey;
186,032,191✔
748

749
  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
186,035,012✔
750
    taosMemoryFree(node.data);
×
751
    return true;
×
752
  }
753

754
_end:
185,586,876✔
755
  if (code != TSDB_CODE_SUCCESS) {
186,121,657✔
756
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
244,621✔
757
    pTaskInfo->code = code;
244,621✔
758
    T_LONG_JMP(pTaskInfo->env, code);
×
759
  }
760
  return false;
185,877,036✔
761
}
762

763
int32_t getNumOfRowsInTimeWinUnsorted(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, STimeWindow* win,
2,147,483,647✔
764
                                      int32_t startPos) {
765
  int32_t rows = pDataBlockInfo->rows;
2,147,483,647✔
766
  for (int32_t i = startPos; i < pDataBlockInfo->rows; ++i) {
2,147,483,647✔
767
    if (pPrimaryColumn[i] >= win->skey && pPrimaryColumn[i] <= win->ekey) {
2,147,483,647✔
768
      continue;
2,147,483,647✔
769
    } else {
770
      return i - startPos;
2,147,483,647✔
771
    }
772
  }
773
  return rows - startPos;
114,874,319✔
774
}
775

776
static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
123,680,426✔
777
                            int32_t scanFlag) {
778
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
123,680,426✔
779

780
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
123,683,351✔
781
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
123,682,653✔
782

783
  int32_t     startPos = 0;
123,684,872✔
784
  int32_t     numOfOutput = pSup->numOfExprs;
123,684,872✔
785
  int64_t*    tsCols = extractTsCol(pBlock, pInfo, pTaskInfo);
123,680,872✔
786
  uint64_t    tableGroupId = pBlock->info.id.groupId;
123,677,531✔
787
  bool        ascScan = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
123,681,262✔
788
  SResultRow* pResult = NULL;
123,678,092✔
789
  bool        sorted = pInfo->binfo.inputTsOrder == ORDER_ASC || pInfo->binfo.inputTsOrder == ORDER_DESC || tsCols == NULL;
123,678,587✔
790
  TSKEY       ts = sorted ? getStartTsKey(&pBlock->info.window, tsCols) : tsCols[startPos];
123,677,409✔
791
  int32_t     ret = TSDB_CODE_SUCCESS;
123,672,689✔
792

793
  if (tableGroupId != pInfo->curGroupId) {
123,672,689✔
794
    if (pInfo->indefRowsMode) {
13,969,375✔
795
      ret = closeAllIndefRowsWindowStates(pOperatorInfo, &pInfo->indefRows);
17,024✔
796
      if (ret != TSDB_CODE_SUCCESS) {
17,024✔
797
        T_LONG_JMP(pTaskInfo->env, ret);
×
798
      }
799
    }
800

801
    pInfo->handledGroupNum += 1;
13,971,077✔
802
    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
13,970,632✔
803
      return true;
36,966✔
804
    } else {
805
      pInfo->curGroupId = tableGroupId;
13,933,536✔
806
      destroyBoundedQueue(pInfo->pBQ);
13,934,069✔
807
      pInfo->pBQ = NULL;
13,932,974✔
808
    }
809
  }
810

811
  STimeWindow win =
123,642,836✔
812
      getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
123,634,665✔
813
  if (filterWindowWithLimit(pInfo, &win, tableGroupId, pTaskInfo)) return false;
123,641,082✔
814

815
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
122,821,519✔
816
  int32_t forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey,
9,835,187✔
817
                                                          NULL, pInfo->binfo.inputTsOrder)
818
                               : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &win, startPos);
122,821,519✔
819

820
  if (pInfo->indefRowsMode) {
122,832,736✔
821
    bool closeWindow = sorted && (startPos + forwardRows < pBlock->info.rows);
307,455✔
822
    ret = applyIndefRowsWindowSegment(pOperatorInfo, &pInfo->indefRows, pInfo->binfo.pRes, pInfo->aggSup.resultRowSize,
307,455✔
823
                                      tableGroupId, &win, pBlock, startPos, forwardRows, pInfo->binfo.inputTsOrder,
824
                                      closeWindow);
825
    if (ret != TSDB_CODE_SUCCESS) {
307,455✔
826
      T_LONG_JMP(pTaskInfo->env, ret);
1,777✔
827
    }
828
  } else {
829
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
122,519,123✔
830
                                 numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
831
    if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
122,523,890✔
832
      T_LONG_JMP(pTaskInfo->env, ret);
6,221✔
833
    }
834

835
    // prev time window not interpolation yet.
836
    if (pInfo->timeWindowInterpo) {
122,517,825✔
837
      SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
26,487✔
838
      doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
26,487✔
839

840
      // restore current time window
841
      ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
26,487✔
842
                                   numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
843
      if (ret != TSDB_CODE_SUCCESS) {
26,487✔
844
        T_LONG_JMP(pTaskInfo->env, ret);
×
845
      }
846

847
      // window start key interpolation
848
      ret = doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
26,487✔
849
      if (ret != TSDB_CODE_SUCCESS) {
26,487✔
850
        T_LONG_JMP(pTaskInfo->env, ret);
×
851
      }
852
    }
853
    // qDebug("hashIntervalAgg1 window skey: %lld, ekey:%lld, startPos: %d, forwardRows: %d",
854
    //   win.skey, win.ekey, startPos, forwardRows);
855
    updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
122,516,685✔
856
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
122,519,082✔
857
                                          pBlock->info.rows, numOfOutput);
122,513,279✔
858
    if (ret != TSDB_CODE_SUCCESS) {
122,518,454✔
859
      T_LONG_JMP(pTaskInfo->env, ret);
×
860
    }
861

862
    doCloseWindow(pResultRowInfo, pInfo, pResult);
122,518,454✔
863
  }
864

865
  STimeWindow nextWin = win;
122,824,340✔
866
  int32_t rows = pBlock->info.rows;
122,825,893✔
867

868
  while (startPos < pBlock->info.rows) {
2,147,483,647✔
869
    if (sorted) {
2,147,483,647✔
870
      startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, forwardRows - 1 + startPos,
2,147,483,647✔
871
                                        pInfo->binfo.inputTsOrder);
872
      if (startPos < 0) {
2,147,483,647✔
873
        break;
9,610,935✔
874
      }
875
    } else {
876
      pBlock->info.rows = forwardRows;
2,147,483,647✔
877
      int32_t newStartOff = forwardRows >= 1
2,147,483,647✔
878
                                ? getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols + startPos,
2,147,483,647✔
879
                                                         forwardRows - 1, pInfo->binfo.inputTsOrder)
880
                                : -1;
2,147,483,647✔
881
      pBlock->info.rows = rows;
2,147,483,647✔
882
      if (newStartOff >= 0) {
2,147,483,647✔
883
        startPos += newStartOff;
96,326,822✔
884
      } else if ((startPos += forwardRows) < pBlock->info.rows) {
2,147,483,647✔
885
        getInitialStartTimeWindow(&pInfo->interval, tsCols[startPos], &nextWin, true);
2,147,483,647✔
886
      }
887
      if (startPos >= pBlock->info.rows) {
2,147,483,647✔
888
        break;
112,998,730✔
889
      }
890
    }
891

892
    if (filterWindowWithLimit(pInfo, &nextWin, tableGroupId, pTaskInfo)) {
2,147,483,647✔
893
      break;
222,748✔
894
    }
895

896
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
2,147,483,647✔
897
    forwardRows = sorted ? getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
2,147,483,647✔
898
                                                    pInfo->binfo.inputTsOrder)
899
                         : getNumOfRowsInTimeWinUnsorted(&pBlock->info, tsCols, &nextWin, startPos);
2,147,483,647✔
900
    if (forwardRows == 0) continue;
2,147,483,647✔
901

902
    if (pInfo->indefRowsMode) {
2,147,483,647✔
903
      bool closeWindow = sorted && (startPos + forwardRows < pBlock->info.rows);
10,189,551✔
904
      ret = applyIndefRowsWindowSegment(pOperatorInfo, &pInfo->indefRows, pInfo->binfo.pRes, pInfo->aggSup.resultRowSize,
10,189,551✔
905
                                        tableGroupId, &nextWin, pBlock, startPos, forwardRows,
906
                                        pInfo->binfo.inputTsOrder, closeWindow);
907
      if (ret != TSDB_CODE_SUCCESS) {
10,189,551✔
908
        T_LONG_JMP(pTaskInfo->env, ret);
×
909
      }
910
    } else {
911
      // null data, failed to allocate more memory buffer
912
      int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
2,147,483,647✔
913
                                            pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
914
      if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
2,147,483,647✔
UNCOV
915
        T_LONG_JMP(pTaskInfo->env, code);
×
916
      }
917

918
      // window start(end) key interpolation
919
      code = doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
2,147,483,647✔
920
      if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
921
        T_LONG_JMP(pTaskInfo->env, code);
×
922
      }
923
      // TODO: add to open window? how to close the open windows after input blocks exhausted?
924
#if 0
925
      if ((ascScan && ekey <= pBlock->info.window.ekey) ||
926
          (!ascScan && ekey >= pBlock->info.window.skey)) {
927
        // window start(end) key interpolation
928
        doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
929
      } else if (pInfo->timeWindowInterpo) {
930
        addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
931
      }
932
#endif
933
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
2,147,483,647✔
934
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
2,147,483,647✔
935
                                            pBlock->info.rows, numOfOutput);
2,147,483,647✔
936
      if (ret != TSDB_CODE_SUCCESS) {
2,147,483,647✔
937
        T_LONG_JMP(pTaskInfo->env, ret);
×
938
      }
939
      doCloseWindow(pResultRowInfo, pInfo, pResult);
2,147,483,647✔
940
    }
941
  }
942

943
  if (pInfo->timeWindowInterpo) {
124,283,275✔
944
    saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
26,487✔
945
  }
946
  return false;
122,828,252✔
947
}
948

949
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
2,147,483,647✔
950
  // current result is done in computing final results.
951
  if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
2,147,483,647✔
952
    closeResultRow(pResult);
5,877,318✔
953
    SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow);
5,877,318✔
954
    taosMemoryFree(pNode);
5,877,318✔
955
  }
956
}
2,147,483,647✔
957

958
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId,
26,487✔
959
                                       SExecTaskInfo* pTaskInfo) {
960
  int32_t         code = TSDB_CODE_SUCCESS;
26,487✔
961
  int32_t         lino = 0;
26,487✔
962
  SOpenWindowInfo openWin = {0};
26,487✔
963
  openWin.pos.pageId = pResult->pageId;
26,487✔
964
  openWin.pos.offset = pResult->offset;
26,487✔
965
  openWin.groupId = groupId;
26,487✔
966
  SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
26,487✔
967
  if (pn == NULL) {
26,487✔
968
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
26,487✔
969
    QUERY_CHECK_CODE(code, lino, _end);
26,487✔
970
    return openWin.pos;
26,487✔
971
  }
972

973
  SOpenWindowInfo* px = (SOpenWindowInfo*)pn->data;
×
974
  if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
×
975
    code = tdListAppend(pResultRowInfo->openWindow, &openWin);
×
976
    QUERY_CHECK_CODE(code, lino, _end);
×
977
  }
978

979
_end:
×
980
  if (code != TSDB_CODE_SUCCESS) {
×
981
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
982
    pTaskInfo->code = code;
×
983
    T_LONG_JMP(pTaskInfo->env, code);
×
984
  }
985
  return openWin.pos;
×
986
}
987

988
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
127,808,712✔
989
  TSKEY* tsCols = NULL;
127,808,712✔
990

991
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
127,808,712✔
992
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
127,815,624✔
993
    if (!pColDataInfo) {
127,805,586✔
994
      pTaskInfo->code = terrno;
×
995
      T_LONG_JMP(pTaskInfo->env, terrno);
×
996
    }
997

998
    tsCols = (int64_t*)pColDataInfo->pData;
127,805,586✔
999
    if (tsCols[0] == 0) {
127,807,261✔
1000
      qWarn("%s at line %d.block start ts:%" PRId64 ",end ts:%" PRId64, __func__, __LINE__, tsCols[0],
431✔
1001
            tsCols[pBlock->info.rows - 1]);
1002
    }
1003

1004
    if (tsCols[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
127,814,262✔
1005
      int32_t code = blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
7,912,131✔
1006
      if (code != TSDB_CODE_SUCCESS) {
7,912,199✔
1007
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1008
        pTaskInfo->code = code;
×
1009
        T_LONG_JMP(pTaskInfo->env, code);
×
1010
      }
1011
    }
1012
  }
1013

1014
  return tsCols;
127,808,902✔
1015
}
1016

1017
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
28,349,605✔
1018
  if (OPTR_IS_OPENED(pOperator)) {
28,349,605✔
1019
    return TSDB_CODE_SUCCESS;
23,319,545✔
1020
  }
1021

1022
  int32_t        code = TSDB_CODE_SUCCESS;
5,030,064✔
1023
  int32_t        lino = 0;
5,030,064✔
1024
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5,030,064✔
1025
  SOperatorInfo* downstream = pOperator->pDownstream[0];
5,027,784✔
1026

1027
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
5,030,064✔
1028
  SExprSupp*                pSup = &pOperator->exprSupp;
5,028,924✔
1029

1030
  if (pSup->pFilterInfo != NULL) {
5,026,640✔
1031
    filterSetExecContext(pSup->pFilterInfo, pTaskInfo, isTaskKilled);
7,383✔
1032
  }
1033

1034
  int32_t scanFlag = MAIN_SCAN;
5,023,802✔
1035

1036
  pInfo->cleanGroupResInfo = false;
5,023,802✔
1037
  while (1) {
123,595,562✔
1038
    SSDataBlock* pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
128,621,070✔
1039
    if (pBlock == NULL) {
128,658,905✔
1040
      break;
4,950,805✔
1041
    }
1042

1043
    pInfo->binfo.pRes->info.scanFlag = scanFlag = pBlock->info.scanFlag;
123,708,100✔
1044

1045
    if (pInfo->scalarSupp.pExprInfo != NULL) {
123,721,739✔
1046
      SExprSupp* pExprSup = &pInfo->scalarSupp;
11,134,722✔
1047
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
22,269,612✔
1048
                                   GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
11,135,149✔
1049
      QUERY_CHECK_CODE(code, lino, _end);
11,134,085✔
1050
    }
1051

1052
    // the pDataBlock are always the same one, no need to call this again
1053
    code = setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
123,676,030✔
1054
    QUERY_CHECK_CODE(code, lino, _end);
123,681,528✔
1055
    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
123,681,528✔
1056
  }
1057

1058
  if (pInfo->indefRowsMode) {
4,987,771✔
1059
    code = closeAllIndefRowsWindowStates(pOperator, &pInfo->indefRows);
72,192✔
1060
    QUERY_CHECK_CODE(code, lino, _end);
72,192✔
1061
  } else {
1062
    code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
4,915,579✔
1063
    QUERY_CHECK_CODE(code, lino, _end);
4,915,579✔
1064
    pInfo->cleanGroupResInfo = true;
4,915,579✔
1065
  }
1066

1067
  OPTR_SET_OPENED(pOperator);
4,987,771✔
1068

1069
_end:
5,027,671✔
1070
  if (code != TSDB_CODE_SUCCESS) {
5,027,671✔
1071
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
39,900✔
1072
    pTaskInfo->code = code;
39,900✔
1073
    T_LONG_JMP(pTaskInfo->env, code);
39,900✔
1074
  }
1075
  return code;
4,987,771✔
1076
}
1077

1078
// start a new state window and record the start info
1079
void doKeepNewStateWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList,
2,147,483,647✔
1080
  int32_t rowIndex, uint64_t groupId, const EStateWinExtendOption* extendOption, bool hasPrevWin) {
1081
  pRowSup->groupId = groupId;
2,147,483,647✔
1082
  if (*extendOption == STATE_WIN_EXTEND_OPTION_DEFAULT ||
2,147,483,647✔
1083
      *extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1084
    pRowSup->win.skey = tsList[rowIndex];
2,147,483,647✔
1085
    pRowSup->startRowIndex = rowIndex;
2,147,483,647✔
1086
    pRowSup->numOfRows = 0;  // does not include the current row yet
2,147,483,647✔
1087
  } else {
1088
    pRowSup->startRowIndex = hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1089
      rowIndex - pRowSup->numNullRows : rowIndex;
1,171,065,358✔
1090
    pRowSup->win.skey = hasPrevWin ?
1,171,065,358✔
1091
                        pRowSup->win.ekey + 1 : tsList[pRowSup->startRowIndex];
1,171,065,358✔
1092
    pRowSup->numOfRows = rowIndex - pRowSup->startRowIndex;
1,171,065,358✔
1093
  }
1094
  resetNumNullRows(pRowSup);
2,147,483,647✔
1095
}
2,147,483,647✔
1096

1097
// close a state window and record its end info
1098
// this functions is called when a new state row appears
1099
// @param rowIndex the index of the first row of next window
1100
void doKeepCurStateWindowEndInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, 
2,147,483,647✔
1101
                                 int32_t rowIndex,
1102
                                 const EStateWinExtendOption* extendOption,
1103
                                 bool hasNextWin) {
1104
  if (*extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
2,147,483,647✔
1105
      pRowSup->win.ekey = hasNextWin?
1,180,014,470✔
1106
                          tsList[rowIndex] - 1 : pRowSup->prevTs;
1,180,014,470✔
1107
      // continuous rows having null state col should be included in this window
1108
      pRowSup->numOfRows += hasContinuousNullRows(pRowSup) ?
2,147,483,647✔
1109
        pRowSup->numNullRows : 0;
1,180,014,470✔
1110
      resetNumNullRows(pRowSup);
1,180,014,470✔
1111
  }
1112
}
2,147,483,647✔
1113

1114
void doKeepStateWindowNullInfo(SWindowRowsSup* pRowSup, TSKEY nullRowTs) {
997,416,846✔
1115
  pRowSup->numNullRows += 1;
997,416,846✔
1116
  pRowSup->prevTs = nullRowTs;
997,416,846✔
1117
}
997,416,846✔
1118

1119
/**
1120
  @brief Process the closed state window and do aggregation on the tuples
1121
  within the window. Partial results are stored in the output buffer. If window
1122
  has no valid rows, return success.
1123
*/
1124
static int32_t processClosedStateWindow(SStateWindowOperatorInfo* pInfo,
2,147,483,647✔
1125
                                        SWindowRowsSup* pRowSup,
1126
                                        SSDataBlock* pBlock,
1127
                                        SExecTaskInfo* pTaskInfo,
1128
                                        SExprSupp* pSup,
1129
                                        int32_t numOfOutput,
1130
                                        bool closeWindow) {
1131
  if (pRowSup->numOfRows == 0) {
2,147,483,647✔
1132
    // no valid rows in the window
1133
    return TSDB_CODE_SUCCESS;
20,691,565✔
1134
  }
1135

1136
  if (pInfo->indefRowsMode) {
2,147,483,647✔
1137
    return applyIndefRowsWindowSegment(pInfo->pOperator, &pInfo->indefRows, pInfo->binfo.pRes,
128,034✔
1138
                                       pInfo->aggSup.resultRowSize, pRowSup->groupId, &pRowSup->win, pBlock,
64,017✔
1139
                                       pRowSup->startRowIndex, pRowSup->numOfRows, pInfo->binfo.inputTsOrder,
1140
                                       closeWindow);
1141
  }
1142

1143
  int32_t     code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1144
  int32_t     lino = 0;
2,147,483,647✔
1145
  SResultRow* pResult = NULL;
2,147,483,647✔
1146
  code = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win,
2,147,483,647✔
1147
    true, &pResult, pRowSup->groupId, pSup->pCtx, numOfOutput,
2,147,483,647✔
1148
    pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1149
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1150

1151
  updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
2,147,483,647✔
1152
  pResult->nOrigRows += pRowSup->numOfRows;
2,147,483,647✔
1153
  code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx,
2,147,483,647✔
1154
    &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
1155
    pRowSup->numOfRows, 0, numOfOutput);
1156
  QUERY_CHECK_CODE(code, lino, _return);
2,147,483,647✔
1157

1158
_return:
2,147,483,647✔
1159
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
1160
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
484✔
1161
  }
1162
  return code;
2,147,483,647✔
1163
}
1164

1165
// process a data block for state window aggregation
1166
// scan from startIndex to endIndex
1167
// numPartialCalcRows returns the number of rows that have been
1168
// partially calculated within the block
1169
static void doStateWindowAggImpl(SOperatorInfo* pOperator,
34,010,367✔
1170
                                 SStateWindowOperatorInfo* pInfo,
1171
                                 SSDataBlock* pBlock, int32_t* startIndex,
1172
                                 int32_t* endIndex,
1173
                                 int32_t* numPartialCalcRows) {
1174
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
34,010,367✔
1175
  SExprSupp*     pExprSup = &pOperator->exprSupp;
34,010,367✔
1176

1177
  SColumnInfoData* pStateColInfoData = 
1178
    taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
34,010,367✔
1179
  if (!pStateColInfoData) {
34,010,367✔
1180
    pTaskInfo->code = terrno;
×
1181
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1182
  }
1183
  uint64_t gid = pBlock->info.id.groupId;
34,010,367✔
1184
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
34,010,367✔
1185
  int32_t bytes = pStateColInfoData->info.bytes;
34,010,367✔
1186

1187
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock,
34,010,367✔
1188
                                               pInfo->tsSlotId);
34,010,367✔
1189
  if (NULL == pColInfoData) {
34,010,367✔
1190
    pTaskInfo->code = terrno;
×
1191
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1192
  }
1193
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
34,010,367✔
1194

1195
  struct SColumnDataAgg* pAgg = (pBlock->pBlockAgg != NULL) ?
34,010,367✔
1196
                                &pBlock->pBlockAgg[pInfo->stateCol.slotId] :
34,010,367✔
1197
                                NULL;
1198
  EStateWinExtendOption  extendOption = pInfo->extendOption;
34,010,367✔
1199
  SWindowRowsSup*        pRowSup = &pInfo->winSup;
34,010,367✔
1200

1201
  if (pRowSup->groupId != gid) {
34,010,367✔
1202
    /*
1203
      group changed, process the previous group's unclosed state window first
1204
    */
1205
    doKeepCurStateWindowEndInfo(pRowSup, tsList, 0, &extendOption, false);
13,569,506✔
1206
    int32_t code = processClosedStateWindow(pInfo, pRowSup, pBlock, pTaskInfo,
13,569,506✔
1207
                                            pExprSup, numOfOutput, true);
1208
    if (TSDB_CODE_SUCCESS != code) T_LONG_JMP(pTaskInfo->env, code);
13,569,506✔
1209
    *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
13,569,506✔
1210

1211
    /*
1212
      unhandled null rows should be ignored, since they belong to previous group
1213
    */
1214
    *numPartialCalcRows += pRowSup->numNullRows;
13,569,506✔
1215

1216
    /*
1217
      reset state window info for new group
1218
    */
1219
    pInfo->hasKey = false;
13,569,506✔
1220
    resetWindowRowsSup(pRowSup);
13,569,506✔
1221
  }
1222

1223
  for (int32_t j = *startIndex; j < *endIndex; ++j) {
2,147,483,647✔
1224
    if (pBlock->info.scanFlag != PRE_SCAN) {
2,147,483,647✔
1225
      if (pInfo->winSup.lastTs == INT64_MIN || gid != pRowSup->groupId || !pInfo->hasKey) {
2,147,483,647✔
1226
        pInfo->winSup.lastTs = tsList[j];
433,568,393✔
1227
      } else {
1228
        if (tsList[j] == pInfo->winSup.lastTs) {
2,147,483,647✔
1229
          // forbid duplicated ts rows
1230
          qError("%s:%d duplicated ts found in state window aggregation", __FILE__, __LINE__);
23,328✔
1231
          pTaskInfo->code = TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP;
23,328✔
1232
          T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_WINDOW_DUP_TIMESTAMP);
23,328✔
1233
        } else {
1234
          pInfo->winSup.lastTs = tsList[j];
2,147,483,647✔
1235
        }
1236
      }
1237
    }
1238
    if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
2,147,483,647✔
1239
      doKeepStateWindowNullInfo(pRowSup, tsList[j]);
997,416,846✔
1240
      continue;
997,416,846✔
1241
    }
1242
    if (pStateColInfoData->pData == NULL) {
2,147,483,647✔
1243
      pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1244
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1245
    }
1246
    char* val = colDataGetData(pStateColInfoData, j);
2,147,483,647✔
1247

1248
    if (!pInfo->hasKey) {
2,147,483,647✔
1249
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
5,925,459✔
1250
      pInfo->hasKey = true;
5,925,459✔
1251
      doKeepNewStateWindowStartInfo(
5,925,459✔
1252
        pRowSup, tsList, j, gid, &extendOption, false);
1253
      doKeepTuple(pRowSup, tsList[j], j, gid);
5,925,459✔
1254
    } else if (!compareVal(val, &pInfo->stateKey)) {
2,147,483,647✔
1255
      doKeepCurStateWindowEndInfo(pRowSup, tsList, j, &extendOption, true);
2,147,483,647✔
1256
      int32_t code = processClosedStateWindow(pInfo, pRowSup, pBlock, pTaskInfo,
2,147,483,647✔
1257
                                              pExprSup, numOfOutput, true);
1258
      if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1259
        T_LONG_JMP(pTaskInfo->env, code);
×
1260
      }
1261
      *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
2,147,483,647✔
1262

1263
      doKeepNewStateWindowStartInfo(pRowSup, tsList, j, gid,
2,147,483,647✔
1264
                                    &extendOption, true);
1265
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1266
      assignVal(pInfo->stateKey.pData, val, bytes, pInfo->stateKey.type);
2,147,483,647✔
1267
    } else {
1268
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1269
    }
1270
  }
1271

1272
  if (!pInfo->hasKey && extendOption != STATE_WIN_EXTEND_OPTION_FORWARD) {
33,987,039✔
1273
    /*
1274
      No valid state rows within the block and we don't care about
1275
      null rows before valid state window, mark them as processed and drop them
1276
    */
1277
    *numPartialCalcRows = pBlock->info.rows;
5,976,664✔
1278
    resetNumNullRows(pRowSup);
5,976,664✔
1279
    return;
5,976,664✔
1280
  }
1281
  if (pRowSup->numOfRows == 0 && 
28,010,375✔
1282
      extendOption != STATE_WIN_EXTEND_OPTION_BACKWARD) {
5,066,035✔
1283
    /*
1284
      If no valid state window or we don't know the belonging of
1285
      null rows in the end of the block, handle them with next block
1286
    */
1287
    return;
4,223,787✔
1288
  }
1289
  doKeepCurStateWindowEndInfo(pRowSup, tsList, *endIndex, &extendOption, false);
23,786,588✔
1290
  int32_t code = processClosedStateWindow(pInfo, pRowSup, pBlock, pTaskInfo,
23,786,588✔
1291
                                          pExprSup, numOfOutput, false);
1292
  if (TSDB_CODE_SUCCESS != code) {
23,786,588✔
1293
    pTaskInfo->code = code;
484✔
1294
    T_LONG_JMP(pTaskInfo->env, code);
484✔
1295
  }
1296
  *numPartialCalcRows = pRowSup->startRowIndex + pRowSup->numOfRows;
23,786,104✔
1297
  // reset part of pRowSup after doing agg calculation
1298
  pRowSup->startRowIndex = 0;
23,786,104✔
1299
  pRowSup->numOfRows = 0;
23,786,104✔
1300
}
1301

1302
static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
3,720,407✔
1303
  if (OPTR_IS_OPENED(pOperator)) {
3,720,407✔
1304
    return TSDB_CODE_SUCCESS;
2,650,711✔
1305
  }
1306

1307
  int32_t                   code = TSDB_CODE_SUCCESS;
1,069,696✔
1308
  int32_t                   lino = 0;
1,069,696✔
1309
  SStateWindowOperatorInfo* pInfo = pOperator->info;
1,069,696✔
1310
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
1,069,696✔
1311

1312
  SExprSupp* pSup = &pOperator->exprSupp;
1,069,696✔
1313
  int32_t    order = pInfo->binfo.inputTsOrder;
1,069,696✔
1314

1315
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,069,696✔
1316
  pInfo->cleanGroupResInfo = false;
1,069,696✔
1317

1318
  SSDataBlock* pUnfinishedBlock = NULL;
1,069,696✔
1319
  int32_t      startIndex = 0;
1,069,696✔
1320
  int32_t      endIndex = 0;
1,069,696✔
1321
  int32_t      numPartialCalcRows = 0;
1,069,696✔
1322
  while (1) {
33,986,555✔
1323
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
35,056,251✔
1324
    if (pBlock == NULL) {
35,055,827✔
1325
      if (pUnfinishedBlock != NULL) {
1,030,070✔
1326
        blockDataDestroy(pUnfinishedBlock);
29,728✔
1327
        pUnfinishedBlock = NULL;
29,728✔
1328
        resetWindowRowsSup(&pInfo->winSup);
29,728✔
1329
      }
1330
      break;
1,030,070✔
1331
    }
1332
    
1333
    // mark whether pUnfinishedBlock is a reference to pBlock
1334
    bool isRef = false;
34,025,757✔
1335
    startIndex = 0;
34,025,757✔
1336
    if (pUnfinishedBlock != NULL) {
34,025,757✔
1337
      startIndex = pUnfinishedBlock->info.rows;
8,092,691✔
1338
      // merge unfinished block with current block
1339
      code = blockDataMerge(pUnfinishedBlock, pBlock);
8,092,691✔
1340
      // reset id to current block id
1341
      pUnfinishedBlock->info.id = pBlock->info.id;
8,092,691✔
1342
      QUERY_CHECK_CODE(code, lino, _end);
8,092,691✔
1343
    } else {
1344
      pUnfinishedBlock = pBlock;
25,933,066✔
1345
      isRef = true;
25,933,066✔
1346
    }
1347
    endIndex = pUnfinishedBlock->info.rows;
34,025,757✔
1348

1349
    pInfo->binfo.pRes->info.scanFlag = pUnfinishedBlock->info.scanFlag;
34,025,757✔
1350
    pInfo->binfo.pRes->info.dataLoad = 1;
34,025,757✔
1351
    code = setInputDataBlock(
34,025,757✔
1352
      pSup, pUnfinishedBlock, order, pUnfinishedBlock->info.scanFlag, true);
34,025,757✔
1353
    QUERY_CHECK_CODE(code, lino, _end);
34,025,757✔
1354

1355
    code = blockDataUpdateTsWindow(pUnfinishedBlock, pInfo->tsSlotId);
34,025,757✔
1356
    QUERY_CHECK_CODE(code, lino, _end);
34,025,757✔
1357

1358
    // there is an scalar expression that 
1359
    // needs to be calculated right before apply the group aggregation.
1360
    if (pInfo->scalarSup.pExprInfo != NULL) {
34,025,757✔
1361
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo,
607,684✔
1362
        pUnfinishedBlock, pUnfinishedBlock, pInfo->scalarSup.pCtx,
1363
        pInfo->scalarSup.numOfExprs, NULL,
1364
        GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
303,842✔
1365
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
303,842✔
1366
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
15,390✔
1367
      }
1368
    }
1369

1370
    doStateWindowAggImpl(pOperator, pInfo, pUnfinishedBlock, 
34,010,367✔
1371
      &startIndex, &endIndex, &numPartialCalcRows);
1372
    if (numPartialCalcRows < pUnfinishedBlock->info.rows) {
33,986,555✔
1373
      // save unfinished block for next round processing
1374
      if (isRef) {
8,122,419✔
1375
        code = createOneDataBlock(pBlock, true, &pUnfinishedBlock);
3,098,702✔
1376
        QUERY_CHECK_CODE(code, lino, _end);
3,098,702✔
1377
      }
1378
      code = blockDataTrimFirstRows(pUnfinishedBlock, numPartialCalcRows);
8,122,419✔
1379
      QUERY_CHECK_NULL(pUnfinishedBlock, code, lino, _end, terrno);
8,122,419✔
1380
    } else {
1381
      if (!isRef) {
25,864,136✔
1382
        blockDataDestroy(pUnfinishedBlock);
3,068,974✔
1383
      }
1384
      pUnfinishedBlock = NULL;
25,864,136✔
1385
    }
1386
    numPartialCalcRows = 0;
33,986,555✔
1387
  }
1388

1389
  if (pInfo->indefRowsMode) {
1,030,070✔
1390
    code = closeAllIndefRowsWindowStates(pOperator, &pInfo->indefRows);
10,259✔
1391
    QUERY_CHECK_CODE(code, lino, _end);
10,259✔
1392
  } else {
1393
    code = initGroupedResultInfo(
1,019,811✔
1394
      &pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
1395
    QUERY_CHECK_CODE(code, lino, _end);
1,019,811✔
1396
    pInfo->cleanGroupResInfo = true;
1,019,811✔
1397
  }
1398
  pOperator->status = OP_RES_TO_RETURN;
1,030,070✔
1399

1400
_end:
1,030,070✔
1401
  if (code != TSDB_CODE_SUCCESS) {
1,030,070✔
1402
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1403
    pTaskInfo->code = code;
×
1404
    T_LONG_JMP(pTaskInfo->env, code);
×
1405
  }
1406
  return code;
1,030,070✔
1407
}
1408

1409
static int32_t doStateWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
4,575,850✔
1410
  if (pOperator->status == OP_EXEC_DONE) {
4,575,850✔
1411
    (*ppRes) = NULL;
855,443✔
1412
    return TSDB_CODE_SUCCESS;
855,443✔
1413
  }
1414

1415
  int32_t                   code = TSDB_CODE_SUCCESS;
3,720,407✔
1416
  int32_t                   lino = 0;
3,720,407✔
1417
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
3,720,407✔
1418
  SStateWindowOperatorInfo* pInfo = pOperator->info;
3,720,407✔
1419
  SOptrBasicInfo*           pBInfo = &pInfo->binfo;
3,720,407✔
1420

1421
  code = pOperator->fpSet._openFn(pOperator);
3,720,407✔
1422
  QUERY_CHECK_CODE(code, lino, _end);
3,680,781✔
1423

1424
  if (pInfo->indefRowsMode) {
3,680,781✔
1425
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
71,588✔
1426
    if ((*ppRes) == NULL) {
71,588✔
1427
      setOperatorCompleted(pOperator);
10,259✔
1428
    }
1429
    return code;
71,588✔
1430
  }
1431

1432
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
3,609,193✔
1433
  QUERY_CHECK_CODE(code, lino, _end);
3,609,193✔
1434

1435
  while (1) {
×
1436
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
3,609,193✔
1437
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
3,609,193✔
1438
    QUERY_CHECK_CODE(code, lino, _end);
3,609,193✔
1439

1440
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
3,609,193✔
1441
    if (!hasRemain) {
3,609,193✔
1442
      setOperatorCompleted(pOperator);
968,411✔
1443
      break;
968,411✔
1444
    }
1445

1446
    if (pBInfo->pRes->info.rows > 0) {
2,640,782✔
1447
      break;
2,640,782✔
1448
    }
1449
  }
1450

1451
_end:
3,609,193✔
1452
  if (code != TSDB_CODE_SUCCESS) {
3,609,193✔
1453
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1454
    pTaskInfo->code = code;
×
1455
    T_LONG_JMP(pTaskInfo->env, code);
×
1456
  }
1457
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
3,609,193✔
1458
  return code;
3,609,193✔
1459
}
1460

1461
static int32_t doBuildIntervalResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
32,093,227✔
1462
  int32_t                   code = TSDB_CODE_SUCCESS;
32,093,227✔
1463
  int32_t                   lino = 0;
32,093,227✔
1464
  SIntervalAggOperatorInfo* pInfo = pOperator->info;
32,093,227✔
1465
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
32,095,044✔
1466

1467
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
32,094,474✔
1468
    (*ppRes) = NULL;
3,744,299✔
1469
    return code;
3,744,299✔
1470
  }
1471

1472
  if (pOperator->pOperatorGetParam) {
28,349,609✔
1473
    if (pOperator->status == OP_EXEC_DONE && pOperator->fpSet.resetStateFn) {
71,238✔
1474
      code = pOperator->fpSet.resetStateFn(pOperator);
×
1475
      QUERY_CHECK_CODE(code, lino, _end);
×
1476
    }
1477
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
71,238✔
1478
    pOperator->pOperatorGetParam = NULL;
71,238✔
1479
  }
1480

1481
  SSDataBlock* pBlock = pInfo->binfo.pRes;
28,350,745✔
1482
  code = pOperator->fpSet._openFn(pOperator);
28,349,609✔
1483
  QUERY_CHECK_CODE(code, lino, _end);
28,307,316✔
1484

1485
  if (pInfo->indefRowsMode) {
28,307,316✔
1486
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
1,121,996✔
1487
    if ((*ppRes) == NULL) {
1,121,996✔
1488
      setOperatorCompleted(pOperator);
72,192✔
1489
    }
1490
    return code;
1,121,996✔
1491
  }
1492

1493
  while (1) {
×
1494
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
27,185,320✔
1495
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
27,185,320✔
1496
    QUERY_CHECK_CODE(code, lino, _end);
27,185,320✔
1497

1498
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
27,185,320✔
1499
    if (!hasRemain) {
27,185,320✔
1500
      setOperatorCompleted(pOperator);
4,903,201✔
1501
      break;
4,903,201✔
1502
    }
1503

1504
    if (pBlock->info.rows > 0) {
22,282,119✔
1505
      break;
22,282,119✔
1506
    }
1507
  }
1508

1509
_end:
27,185,320✔
1510
  if (code != TSDB_CODE_SUCCESS) {
27,185,320✔
1511
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1512
    pTaskInfo->code = code;
×
1513
    T_LONG_JMP(pTaskInfo->env, code);
×
1514
  }
1515
  (*ppRes) = (pBlock->info.rows == 0) ? NULL : pBlock;
27,185,320✔
1516
  return code;
27,185,320✔
1517
}
1518

1519
static void destroyStateWindowOperatorInfo(void* param) {
1,034,640✔
1520
  if (param == NULL) {
1,034,640✔
1521
    return;
×
1522
  }
1523
  SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
1,034,640✔
1524
  cleanupBasicInfo(&pInfo->binfo);
1,034,640✔
1525
  taosMemoryFreeClear(pInfo->stateKey.pData);
1,034,640✔
1526
  cleanupIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
1,034,640✔
1527
  if (pInfo->pOperator) {
1,034,640✔
1528
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,034,640✔
1529
                      pInfo->cleanGroupResInfo);
1,034,640✔
1530
    pInfo->pOperator = NULL;
1,034,640✔
1531
  }
1532

1533
  cleanupExprSupp(&pInfo->scalarSup);
1,034,640✔
1534
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,034,640✔
1535
  cleanupAggSup(&pInfo->aggSup);
1,034,640✔
1536
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,034,640✔
1537

1538
  taosMemoryFreeClear(param);
1,034,640✔
1539
}
1540

1541
static void freeItem(void* param) {
49,762✔
1542
  SGroupKeys* pKey = (SGroupKeys*)param;
49,762✔
1543
  taosMemoryFree(pKey->pData);
49,762✔
1544
}
49,762✔
1545

1546
void destroyIntervalOperatorInfo(void* param) {
5,975,441✔
1547
  if (param == NULL) {
5,975,441✔
1548
    return;
×
1549
  }
1550

1551
  SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
5,975,441✔
1552

1553
  cleanupBasicInfo(&pInfo->binfo);
5,975,441✔
1554
  cleanupIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
5,975,441✔
1555
  if (pInfo->pOperator) {
5,975,303✔
1556
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
5,936,257✔
1557
                      pInfo->cleanGroupResInfo);
5,938,249✔
1558
    pInfo->pOperator = NULL;
5,936,064✔
1559
  }
1560

1561
  cleanupAggSup(&pInfo->aggSup);
5,974,875✔
1562
  cleanupExprSupp(&pInfo->scalarSupp);
5,974,966✔
1563

1564
  pInfo->binfo.resultRowInfo.openWindow = tdListFree(pInfo->binfo.resultRowInfo.openWindow);
5,975,299✔
1565

1566
  taosArrayDestroy(pInfo->pInterpCols);
5,975,441✔
1567
  pInfo->pInterpCols = NULL;
5,974,301✔
1568

1569
  taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
5,975,013✔
1570
  pInfo->pPrevValues = NULL;
5,974,163✔
1571

1572
  cleanupGroupResInfo(&pInfo->groupResInfo);
5,974,163✔
1573
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
5,975,013✔
1574
  destroyBoundedQueue(pInfo->pBQ);
5,973,402✔
1575
  taosMemoryFreeClear(param);
5,972,686✔
1576
}
1577

1578
static int32_t initWindowInterpPrevVal(SIntervalAggOperatorInfo* pInfo) {
24,881✔
1579
  int32_t code = TSDB_CODE_SUCCESS;
24,881✔
1580
  int32_t lino = 0;
24,881✔
1581
  void*   tmp = NULL;
24,881✔
1582

1583
  pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
24,881✔
1584
  QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
24,881✔
1585

1586
  pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
24,881✔
1587
  QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
24,881✔
1588

1589
  {  // ts column
1590
    SColumn c = {0};
24,881✔
1591
    c.colId = 1;
24,881✔
1592
    c.slotId = pInfo->primaryTsIndex;
24,881✔
1593
    c.type = TSDB_DATA_TYPE_TIMESTAMP;
24,881✔
1594
    c.bytes = sizeof(int64_t);
24,881✔
1595
    tmp = taosArrayPush(pInfo->pInterpCols, &c);
24,881✔
1596
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
24,881✔
1597

1598
    SGroupKeys key;
24,881✔
1599
    key.bytes = c.bytes;
24,881✔
1600
    key.type = c.type;
24,881✔
1601
    key.isNull = true;  // to denote no value is assigned yet
24,881✔
1602
    key.pData = taosMemoryCalloc(1, c.bytes);
24,881✔
1603
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
24,881✔
1604

1605
    tmp = taosArrayPush(pInfo->pPrevValues, &key);
24,881✔
1606
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
24,881✔
1607
  }
1608
_end:
24,881✔
1609
  if (code != TSDB_CODE_SUCCESS) {
24,881✔
1610
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1611
  }
1612
  return code;
24,881✔
1613
}
1614

1615
static int32_t timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo,
5,934,865✔
1616
                                      bool* pRes) {
1617
  // the primary timestamp column
1618
  bool    needed = false;
5,934,865✔
1619
  int32_t code = TSDB_CODE_SUCCESS;
5,934,865✔
1620
  int32_t lino = 0;
5,934,865✔
1621
  void*   tmp = NULL;
5,934,865✔
1622

1623
  for (int32_t i = 0; i < numOfCols; ++i) {
28,005,523✔
1624
    SExprInfo* pExpr = pCtx[i].pExpr;
22,102,841✔
1625
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
22,101,710✔
1626
      needed = true;
24,881✔
1627
      break;
24,881✔
1628
    }
1629
  }
1630

1631
  if (needed) {
5,927,563✔
1632
    code = initWindowInterpPrevVal(pInfo);
24,881✔
1633
    QUERY_CHECK_CODE(code, lino, _end);
24,881✔
1634
  }
1635

1636
  for (int32_t i = 0; i < numOfCols; ++i) {
28,009,655✔
1637
    SExprInfo* pExpr = pCtx[i].pExpr;
22,094,887✔
1638

1639
    if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
22,090,891✔
1640
      SFunctParam* pParam = &pExpr->base.pParam[0];
24,881✔
1641

1642
      SColumn c = *pParam->pCol;
24,881✔
1643
      tmp = taosArrayPush(pInfo->pInterpCols, &c);
24,881✔
1644
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
24,881✔
1645

1646
      SGroupKeys key = {0};
24,881✔
1647
      key.bytes = c.bytes;
24,881✔
1648
      key.type = c.type;
24,881✔
1649
      key.isNull = false;
24,881✔
1650
      key.pData = taosMemoryCalloc(1, c.bytes);
24,881✔
1651
      QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
24,881✔
1652

1653
      tmp = taosArrayPush(pInfo->pPrevValues, &key);
24,881✔
1654
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
24,881✔
1655
    }
1656
  }
1657

1658
_end:
5,914,768✔
1659
  if (code != TSDB_CODE_SUCCESS) {
5,909,918✔
1660
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1661
  }
1662
  *pRes = needed;
5,909,918✔
1663
  return code;
5,922,751✔
1664
}
1665

1666
static int32_t resetInterval(SOperatorInfo* pOper, SIntervalAggOperatorInfo* pIntervalInfo){
10,167✔
1667
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
10,167✔
1668
  SIntervalPhysiNode* pPhynode = (SIntervalPhysiNode*)pOper->pPhyNode;
10,167✔
1669
  pOper->status = OP_NOT_OPENED;
10,167✔
1670

1671
  resetBasicOperatorState(&pIntervalInfo->binfo);
10,167✔
1672
  cleanupResultInfo(pIntervalInfo->pOperator->pTaskInfo, &pIntervalInfo->pOperator->exprSupp, &pIntervalInfo->groupResInfo, &pIntervalInfo->aggSup,
10,167✔
1673
    pIntervalInfo->cleanGroupResInfo);
10,167✔
1674

1675
  colDataDestroy(&pIntervalInfo->twAggSup.timeWindowData);
10,167✔
1676
  int32_t code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pTaskInfo->window);
10,167✔
1677
  if (code == 0) {
10,167✔
1678
    code = resetAggSup(&pOper->exprSupp, &pIntervalInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
20,334✔
1679
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
10,167✔
1680
                       &pTaskInfo->storageAPI.functionStore);
1681
  }
1682
  if (code == 0) {
10,167✔
1683
    code = resetExprSupp(&pIntervalInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
10,167✔
1684
                         &pTaskInfo->storageAPI.functionStore);
1685
  }
1686

1687
  if (pIntervalInfo->binfo.resultRowInfo.openWindow != NULL){
10,167✔
1688
    tdListEmpty(pIntervalInfo->binfo.resultRowInfo.openWindow);
×
1689
  }
1690

1691
  if (pPhynode->window.node.pSlimit && ((SLimitNode*)pPhynode->window.node.pSlimit)->limit) {
10,167✔
1692
    pIntervalInfo->curGroupId = UINT64_MAX;
×
1693
  }
1694

1695
  pIntervalInfo->cleanGroupResInfo = false;
10,167✔
1696
  pIntervalInfo->handledGroupNum = 0;
10,167✔
1697
  pIntervalInfo->binfo.inputTsOrder = pPhynode->window.node.inputTsOrder;
10,167✔
1698
  pIntervalInfo->binfo.outputTsOrder = pPhynode->window.node.outputTsOrder;
10,167✔
1699

1700
  taosArrayDestroy(pIntervalInfo->pInterpCols);
10,167✔
1701
  pIntervalInfo->pInterpCols = NULL;
10,167✔
1702

1703
  if (pIntervalInfo->pPrevValues != NULL) {
10,167✔
1704
    taosArrayDestroyEx(pIntervalInfo->pPrevValues, freeItem);
×
1705
    pIntervalInfo->pPrevValues = NULL;
×
1706
    code = initWindowInterpPrevVal(pIntervalInfo);
×
1707
  }
1708

1709
  resetIndefRowsRuntime(&pIntervalInfo->indefRows, pIntervalInfo->pOperator);
10,167✔
1710

1711
  cleanupGroupResInfo(&pIntervalInfo->groupResInfo);
10,167✔
1712
  destroyBoundedQueue(pIntervalInfo->pBQ);
10,167✔
1713
  pIntervalInfo->pBQ = NULL;
10,167✔
1714
  return code;
10,167✔
1715
}
1716

1717
static int32_t resetIntervalOperState(SOperatorInfo* pOper) {
8,555✔
1718
  SIntervalAggOperatorInfo* pInfo = pOper->info;
8,555✔
1719
  return resetInterval(pOper, pInfo);
8,555✔
1720
}
1721

1722
int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
5,132,049✔
1723
                                   SOperatorInfo** pOptrInfo) {
1724
  QRY_PARAM_CHECK(pOptrInfo);
5,132,049✔
1725

1726
  int32_t                   code = TSDB_CODE_SUCCESS;
5,131,923✔
1727
  int32_t                   lino = 0;
5,131,923✔
1728
  SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
5,131,923✔
1729
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
5,123,809✔
1730
  if (pInfo == NULL || pOperator == NULL) {
5,125,873✔
1731
    code = terrno;
×
1732
    lino = __LINE__;
×
1733
    goto _error;
×
1734
  }
1735
  initOperatorCostInfo(pOperator);
5,125,873✔
1736

1737
  pOperator->pPhyNode = pPhyNode;
5,132,340✔
1738
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc);
5,133,480✔
1739
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
5,134,046✔
1740
  initBasicInfo(&pInfo->binfo, pResBlock);
5,134,046✔
1741

1742
  SExprSupp* pSup = &pOperator->exprSupp;
5,130,301✔
1743
  pSup->hasWindowOrGroup = true;
5,132,007✔
1744
  pSup->hasWindow = true;
5,132,531✔
1745

1746
  pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
5,132,018✔
1747

1748
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
5,130,895✔
1749
  initResultSizeInfo(&pOperator->resultInfo, 512);
5,130,895✔
1750
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
5,129,953✔
1751
  QUERY_CHECK_CODE(code, lino, _error);
5,134,046✔
1752

1753
  int32_t    num = 0;
5,134,046✔
1754
  SExprInfo* pExprInfo = NULL;
5,134,046✔
1755
  code = createExprInfo(pPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
5,134,046✔
1756
  QUERY_CHECK_CODE(code, lino, _error);
5,133,619✔
1757

1758
  code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, NULL,
5,133,619✔
1759
                    &pTaskInfo->storageAPI.functionStore);
1760
  QUERY_CHECK_CODE(code, lino, _error);
5,131,107✔
1761

1762
  pInfo->indefRowsMode = pPhyNode->window.indefRowsFunc;
5,131,107✔
1763
  if (pInfo->indefRowsMode) {
5,127,121✔
1764
    code = initIndefRowsRuntime(&pInfo->indefRows, pOperator->exprSupp.pCtx, num, pOperator->resultInfo.capacity);
73,969✔
1765
    QUERY_CHECK_CODE(code, lino, _error);
73,969✔
1766
  }
1767

1768
  SInterval interval = {.interval = pPhyNode->interval,
15,366,921✔
1769
                        .sliding = pPhyNode->sliding,
5,129,766✔
1770
                        .intervalUnit = pPhyNode->intervalUnit,
5,130,897✔
1771
                        .slidingUnit = pPhyNode->slidingUnit,
5,130,744✔
1772
                        .offset = pPhyNode->offset,
5,123,590✔
1773
                        .precision = ((SColumnNode*)pPhyNode->window.pTspk)->node.resType.precision,
5,125,155✔
1774
                        .timeRange = pPhyNode->timeRange};
1775
  calcIntervalAutoOffset(&interval);
5,119,718✔
1776

1777
  STimeWindowAggSupp as = {
5,120,653✔
1778
      .maxTs = INT64_MIN,
1779
  };
1780

1781
  pInfo->win = pTaskInfo->window;
5,120,653✔
1782
  pInfo->binfo.inputTsOrder = pPhyNode->window.node.inputTsOrder;
5,124,404✔
1783
  pInfo->binfo.outputTsOrder = pPhyNode->window.node.outputTsOrder;
5,124,676✔
1784
  pInfo->interval = interval;
5,127,249✔
1785
  pInfo->twAggSup = as;
5,120,165✔
1786
  pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
5,121,083✔
1787
  if (pPhyNode->window.node.pLimit && ((SLimitNode*)pPhyNode->window.node.pLimit)->limit) {
5,124,484✔
1788
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
1,158,308✔
1789
    pInfo->limited = true;
1,158,183✔
1790
    pInfo->limit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
1,157,755✔
1791
  }
1792
  if (pPhyNode->window.node.pSlimit && ((SLimitNode*)pPhyNode->window.node.pSlimit)->limit) {
5,121,061✔
1793
    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
113,965✔
1794
    pInfo->slimited = true;
113,562✔
1795
    pInfo->slimit = pLimit->limit->datum.i + (pLimit->offset ? pLimit->offset->datum.i : 0);
114,000✔
1796
    pInfo->curGroupId = UINT64_MAX;
113,562✔
1797
  }
1798

1799
  if (pPhyNode->window.pExprs != NULL) {
5,130,449✔
1800
    int32_t    numOfScalar = 0;
515,428✔
1801
    SExprInfo* pScalarExprInfo = NULL;
515,428✔
1802
    code = createExprInfo(pPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
513,186✔
1803
    QUERY_CHECK_CODE(code, lino, _error);
515,998✔
1804

1805
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
478,378✔
1806
    if (code != TSDB_CODE_SUCCESS) {
478,378✔
1807
      goto _error;
×
1808
    }
1809
  }
1810

1811
  code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
5,091,152✔
1812
                            pTaskInfo->pStreamRuntimeInfo);
5,094,763✔
1813
  if (code != TSDB_CODE_SUCCESS) {
5,093,685✔
1814
    goto _error;
×
1815
  }
1816

1817
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
5,093,685✔
1818
  QUERY_CHECK_CODE(code, lino, _error);
5,085,093✔
1819

1820
  pInfo->timeWindowInterpo = false;
5,085,093✔
1821
  code = timeWindowinterpNeeded(pSup->pCtx, num, pInfo, &pInfo->timeWindowInterpo);
5,092,679✔
1822
  QUERY_CHECK_CODE(code, lino, _error);
5,084,314✔
1823
  if (pInfo->timeWindowInterpo) {
5,084,314✔
1824
    pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
24,881✔
1825
    if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
24,881✔
1826
      goto _error;
×
1827
    }
1828
  }
1829

1830
  pInfo->pOperator = pOperator;
5,091,869✔
1831
  pInfo->cleanGroupResInfo = false;
5,091,991✔
1832
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
5,082,998✔
1833
  setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
5,080,329✔
1834
                  pInfo, pTaskInfo);
1835

1836
  pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResultNext, NULL, destroyIntervalOperatorInfo,
5,082,934✔
1837
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1838
  setOperatorResetStateFn(pOperator, resetIntervalOperState);
5,084,493✔
1839
  code = appendDownstream(pOperator, &downstream, 1);
5,082,871✔
1840
  if (code != TSDB_CODE_SUCCESS) {
5,094,282✔
1841
    goto _error;
×
1842
  }
1843

1844
  *pOptrInfo = pOperator;
5,094,282✔
1845
  return TSDB_CODE_SUCCESS;
5,095,255✔
1846

1847
_error:
37,620✔
1848
  if (pInfo != NULL) {
37,620✔
1849
    destroyIntervalOperatorInfo(pInfo);
37,620✔
1850
  }
1851

1852
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
37,620✔
1853
  pTaskInfo->code = code;
37,620✔
1854
  qError("error happens at %s %d, code:%s", __func__, lino, tstrerror(code));
37,620✔
1855
  return code;
37,620✔
1856
}
1857

1858
// todo handle multiple timeline cases. assume no timeline interweaving
1859
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
16,494,443✔
1860
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
16,494,443✔
1861
  SExprSupp*     pSup = &pOperator->exprSupp;
16,494,443✔
1862

1863
  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
16,494,443✔
1864
  if (!pColInfoData) {
16,494,443✔
1865
    pTaskInfo->code = terrno;
×
1866
    T_LONG_JMP(pTaskInfo->env, terrno);
×
1867
  }
1868

1869
  bool    masterScan = true;
16,494,443✔
1870
  int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
16,494,443✔
1871
  int64_t gid = pBlock->info.id.groupId;
16,494,443✔
1872

1873
  int64_t gap = pInfo->gap;
16,494,443✔
1874

1875
  if (!pInfo->reptScan) {
16,494,443✔
1876
    pInfo->reptScan = true;
544,845✔
1877
    pInfo->winSup.prevTs = INT64_MIN;
544,845✔
1878
  }
1879

1880
  SWindowRowsSup* pRowSup = &pInfo->winSup;
16,494,443✔
1881
  pRowSup->numOfRows = 0;
16,494,443✔
1882
  pRowSup->startRowIndex = 0;
16,494,443✔
1883

1884
  // In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
1885
  TSKEY* tsList = (TSKEY*)pColInfoData->pData;
16,494,443✔
1886
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
2,147,483,647✔
1887
    if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
2,147,483,647✔
1888
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
5,424,989✔
1889
      doKeepTuple(pRowSup, tsList[j], j, gid);
5,424,989✔
1890
    } else if (((tsList[j] - pRowSup->prevTs >= 0) && (tsList[j] - pRowSup->prevTs <= gap)) ||
2,147,483,647✔
1891
               ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
1,091,504,283✔
1892
      // The gap is less than the threshold, so it belongs to current session window that has been opened already.
1893
      doKeepTuple(pRowSup, tsList[j], j, gid);
2,147,483,647✔
1894
    } else {  // start a new session window
1895
      // start a new session window
1896
      if (pRowSup->numOfRows > 0) {  // handled data that belongs to the previous session window
1,091,409,580✔
1897
        // keep the time window for the closed time window.
1898
        STimeWindow window = pRowSup->win;
1,088,759,123✔
1899

1900
        int32_t ret = TSDB_CODE_SUCCESS;
1,088,759,123✔
1901
        if (pInfo->indefRowsMode) {
1,088,759,123✔
1902
          ret = applyIndefRowsWindowSegment(pOperator, &pInfo->indefRows, pInfo->binfo.pRes,
13,648✔
1903
                                            pInfo->aggSup.resultRowSize, gid, &window, pBlock,
1904
                                            pRowSup->startRowIndex, pRowSup->numOfRows, pInfo->binfo.inputTsOrder,
1905
                                            true);
1906
        } else {
1907
          SResultRow* pResult = NULL;
1,088,745,475✔
1908
          ret =
1909
              setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
1,088,745,475✔
1910
                                     numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1911
          if (ret == TSDB_CODE_SUCCESS) {
1,088,745,475✔
1912
            // pInfo->numOfRows data belong to the current session window
1913
            updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
1,088,745,475✔
1914
            ret =
1915
                applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData,
1,088,745,475✔
1916
                                                pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
1,088,745,475✔
1917
          }
1918
        }
1919
        if (ret != TSDB_CODE_SUCCESS) {
1,088,759,123✔
1920
          T_LONG_JMP(pTaskInfo->env, ret);
1,329✔
1921
        }
1922
      }
1923

1924
      // here we start a new session window
1925
      doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
1,091,408,251✔
1926
      doKeepTuple(pRowSup, tsList[j], j, gid);
1,091,408,251✔
1927
    }
1928
  }
1929

1930
  pRowSup->win.ekey = tsList[pBlock->info.rows - 1];
16,493,114✔
1931
  int32_t ret = TSDB_CODE_SUCCESS;
16,493,114✔
1932
  if (pInfo->indefRowsMode) {
16,493,114✔
1933
    ret = applyIndefRowsWindowSegment(pOperator, &pInfo->indefRows, pInfo->binfo.pRes, pInfo->aggSup.resultRowSize,
21,414✔
1934
                                      gid, &pRowSup->win, pBlock, pRowSup->startRowIndex, pRowSup->numOfRows,
10,707✔
1935
                                      pInfo->binfo.inputTsOrder, false);
1936
  } else {
1937
    SResultRow* pResult = NULL;
16,482,407✔
1938
    ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
16,482,407✔
1939
                                 pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
1940
    if (ret == TSDB_CODE_SUCCESS) {
16,482,407✔
1941
      updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
16,482,407✔
1942
      ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
16,482,407✔
1943
                                            pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
16,482,407✔
1944
    }
1945
  }
1946
  if (ret != TSDB_CODE_SUCCESS) {
16,493,114✔
1947
    T_LONG_JMP(pTaskInfo->env, ret);
×
1948
  }
1949
}
16,493,114✔
1950

1951
static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,884,032✔
1952
  if (pOperator->status == OP_EXEC_DONE) {
2,884,032✔
1953
    (*ppRes) = NULL;
401,083✔
1954
    return TSDB_CODE_SUCCESS;
401,083✔
1955
  }
1956

1957
  int32_t                  code = TSDB_CODE_SUCCESS;
2,482,949✔
1958
  int32_t                  lino = 0;
2,482,949✔
1959
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
2,482,949✔
1960
  SSessionAggOperatorInfo* pInfo = pOperator->info;
2,482,949✔
1961
  SOptrBasicInfo*          pBInfo = &pInfo->binfo;
2,482,949✔
1962
  SExprSupp*               pSup = &pOperator->exprSupp;
2,482,949✔
1963

1964
  if (pOperator->status == OP_RES_TO_RETURN) {
2,482,949✔
1965
    if (pInfo->indefRowsMode) {
1,895,989✔
1966
      (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
20,608✔
1967
      if ((*ppRes) == NULL) {
20,608✔
1968
        setOperatorCompleted(pOperator);
7,616✔
1969
      }
1970
      return code;
20,608✔
1971
    }
1972

1973
    while (1) {
×
1974
      doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
1,875,381✔
1975
      code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,875,381✔
1976
      QUERY_CHECK_CODE(code, lino, _end);
1,875,381✔
1977

1978
      bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
1,875,381✔
1979
      if (!hasRemain) {
1,875,381✔
1980
        setOperatorCompleted(pOperator);
78,555✔
1981
        break;
78,555✔
1982
      }
1983

1984
      if (pBInfo->pRes->info.rows > 0) {
1,796,826✔
1985
        break;
1,796,826✔
1986
      }
1987
    }
1988
    (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
1,875,381✔
1989
    return code;
1,875,381✔
1990
  }
1991

1992
  int32_t order = pInfo->binfo.inputTsOrder;
586,960✔
1993

1994
  SOperatorInfo* downstream = pOperator->pDownstream[0];
586,960✔
1995

1996
  pInfo->cleanGroupResInfo = false;
586,960✔
1997
  while (1) {
16,493,114✔
1998
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
17,080,074✔
1999
    if (pBlock == NULL) {
17,080,074✔
2000
      break;
585,631✔
2001
    }
2002

2003
    pBInfo->pRes->info.scanFlag = pBlock->info.scanFlag;
16,494,443✔
2004
    pBInfo->pRes->info.dataLoad = 1;
16,494,443✔
2005
    if (pInfo->scalarSupp.pExprInfo != NULL) {
16,494,443✔
2006
      SExprSupp* pExprSup = &pInfo->scalarSupp;
3,036✔
2007
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
6,072✔
2008
                                   GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
3,036✔
2009
      QUERY_CHECK_CODE(code, lino, _end);
3,036✔
2010
    }
2011
    // the pDataBlock are always the same one, no need to call this again
2012
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
16,494,443✔
2013
    QUERY_CHECK_CODE(code, lino, _end);
16,494,443✔
2014

2015
    code = blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
16,494,443✔
2016
    QUERY_CHECK_CODE(code, lino, _end);
16,494,443✔
2017

2018
    doSessionWindowAggImpl(pOperator, pInfo, pBlock);
16,494,443✔
2019
  }
2020

2021
  if (pInfo->indefRowsMode) {
585,631✔
2022
    code = closeAllIndefRowsWindowStates(pOperator, &pInfo->indefRows);
9,363✔
2023
    QUERY_CHECK_CODE(code, lino, _end);
9,363✔
2024
    pOperator->status = OP_RES_TO_RETURN;
9,363✔
2025
    (*ppRes) = getNextIndefRowsResultBlock(&pInfo->indefRows, pOperator);
9,363✔
2026
    if ((*ppRes) == NULL) {
9,363✔
2027
      setOperatorCompleted(pOperator);
1,747✔
2028
    }
2029
    return code;
9,363✔
2030
  }
2031

2032
  // restore the value
2033
  pOperator->status = OP_RES_TO_RETURN;
576,268✔
2034

2035
  code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
576,268✔
2036
  QUERY_CHECK_CODE(code, lino, _end);
576,268✔
2037
  pInfo->cleanGroupResInfo = true;
576,268✔
2038

2039
  code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
576,268✔
2040
  QUERY_CHECK_CODE(code, lino, _end);
576,268✔
2041
  while (1) {
×
2042
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
576,268✔
2043
    code = doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
576,268✔
2044
    QUERY_CHECK_CODE(code, lino, _end);
576,268✔
2045

2046
    bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
576,268✔
2047
    if (!hasRemain) {
576,268✔
2048
      setOperatorCompleted(pOperator);
454,203✔
2049
      break;
454,203✔
2050
    }
2051

2052
    if (pBInfo->pRes->info.rows > 0) {
122,065✔
2053
      break;
122,065✔
2054
    }
2055
  }
2056

2057
_end:
576,268✔
2058
  if (code != TSDB_CODE_SUCCESS) {
576,268✔
2059
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2060
    pTaskInfo->code = code;
×
2061
    T_LONG_JMP(pTaskInfo->env, code);
×
2062
  }
2063
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
576,268✔
2064
  return code;
576,268✔
2065
}
2066

2067
static int32_t resetStatewindowOperState(SOperatorInfo* pOper) {
40,288✔
2068
  SStateWindowOperatorInfo* pInfo = pOper->info;
40,288✔
2069
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
40,288✔
2070
  SStateWindowPhysiNode* pPhynode = (SStateWindowPhysiNode*)pOper->pPhyNode;
40,288✔
2071
  pOper->status = OP_NOT_OPENED;
40,288✔
2072

2073
  resetBasicOperatorState(&pInfo->binfo);
40,288✔
2074
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
40,288✔
2075
                    pInfo->cleanGroupResInfo);
40,288✔
2076

2077
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
40,288✔
2078
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
40,288✔
2079
  if (code == 0) {
40,288✔
2080
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
80,576✔
2081
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
40,288✔
2082
                       &pTaskInfo->storageAPI.functionStore);
2083
  }
2084
  if (code == 0) {
40,288✔
2085
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->window.pExprs, NULL,
40,288✔
2086
                         &pTaskInfo->storageAPI.functionStore);
2087
  }
2088

2089
  pInfo->cleanGroupResInfo = false;
40,288✔
2090
  pInfo->hasKey = false;
40,288✔
2091
  pInfo->winSup.lastTs = INT64_MIN;
40,288✔
2092
  resetIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
40,288✔
2093
  cleanupGroupResInfo(&pInfo->groupResInfo);
40,288✔
2094
  memset(pInfo->stateKey.pData, 0, pInfo->stateKey.bytes);
40,288✔
2095
  return code;
40,288✔
2096
}
2097

2098
// todo make this as an non-blocking operator
2099
int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWindowPhysiNode* pStateNode,
1,034,640✔
2100
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2101
  QRY_PARAM_CHECK(pOptrInfo);
1,034,640✔
2102

2103
  int32_t                   code = TSDB_CODE_SUCCESS;
1,034,640✔
2104
  int32_t                   lino = 0;
1,034,640✔
2105
  SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
1,034,640✔
2106
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,034,640✔
2107
  if (pInfo == NULL || pOperator == NULL) {
1,034,640✔
2108
    code = terrno;
×
2109
    goto _error;
×
2110
  }
2111
  initOperatorCostInfo(pOperator);
1,034,640✔
2112

2113
  pOperator->pPhyNode = pStateNode;
1,034,640✔
2114
  pOperator->exprSupp.hasWindowOrGroup = true;
1,034,640✔
2115
  pOperator->exprSupp.hasWindow = true;
1,034,640✔
2116
  int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
1,034,640✔
2117
  SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
1,034,640✔
2118

2119
  if (pStateNode->window.pExprs != NULL) {
1,034,640✔
2120
    int32_t    numOfScalarExpr = 0;
259,402✔
2121
    SExprInfo* pScalarExprInfo = NULL;
259,402✔
2122
    code = createExprInfo(pStateNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
259,402✔
2123
    QUERY_CHECK_CODE(code, lino, _error);
259,402✔
2124

2125
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
259,402✔
2126
    if (code != TSDB_CODE_SUCCESS) {
259,402✔
2127
      goto _error;
×
2128
    }
2129
  }
2130

2131
  pInfo->stateCol = extractColumnFromColumnNode(pColNode);
1,034,640✔
2132
  pInfo->stateKey.type = pInfo->stateCol.type;
1,034,640✔
2133
  pInfo->stateKey.bytes = pInfo->stateCol.bytes;
1,034,640✔
2134
  pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
1,034,640✔
2135
  if (pInfo->stateKey.pData == NULL) {
1,034,640✔
2136
    goto _error;
×
2137
  }
2138
  pInfo->binfo.inputTsOrder = pStateNode->window.node.inputTsOrder;
1,034,640✔
2139
  pInfo->binfo.outputTsOrder = pStateNode->window.node.outputTsOrder;
1,034,640✔
2140

2141
  code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,034,640✔
2142
                            pTaskInfo->pStreamRuntimeInfo);
1,034,640✔
2143
  if (code != TSDB_CODE_SUCCESS) {
1,034,640✔
2144
    goto _error;
×
2145
  }
2146

2147
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
1,034,640✔
2148

2149
  int32_t    num = 0;
1,034,640✔
2150
  SExprInfo* pExprInfo = NULL;
1,034,640✔
2151
  code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &num);
1,034,640✔
2152
  QUERY_CHECK_CODE(code, lino, _error);
1,034,640✔
2153

2154
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,034,640✔
2155

2156
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
1,034,640✔
2157
                    NULL, &pTaskInfo->storageAPI.functionStore);
2158
  if (code != TSDB_CODE_SUCCESS) {
1,034,640✔
2159
    goto _error;
×
2160
  }
2161

2162
  pInfo->indefRowsMode = pStateNode->window.indefRowsFunc;
1,034,640✔
2163
  if (pInfo->indefRowsMode) {
1,034,640✔
2164
    code = initIndefRowsRuntime(&pInfo->indefRows, pOperator->exprSupp.pCtx, num, pOperator->resultInfo.capacity);
10,259✔
2165
    QUERY_CHECK_CODE(code, lino, _error);
10,259✔
2166
  }
2167

2168
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc);
1,034,640✔
2169
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
1,034,640✔
2170
  initBasicInfo(&pInfo->binfo, pResBlock);
1,034,640✔
2171
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
1,034,640✔
2172

2173
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,034,640✔
2174
  QUERY_CHECK_CODE(code, lino, _error);
1,034,640✔
2175

2176
  pInfo->tsSlotId = tsSlotId;
1,034,640✔
2177
  pInfo->pOperator = pOperator;
1,034,640✔
2178
  pInfo->cleanGroupResInfo = false;
1,034,640✔
2179
  pInfo->extendOption = pStateNode->extendOption;
1,034,640✔
2180
  pInfo->trueForInfo.trueForType = pStateNode->trueForType;
1,034,640✔
2181
  pInfo->trueForInfo.count = pStateNode->trueForCount;
1,034,640✔
2182
  pInfo->trueForInfo.duration = pStateNode->trueForDuration;
1,034,640✔
2183
  pInfo->winSup.lastTs = INT64_MIN;
1,034,640✔
2184

2185
  setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
1,034,640✔
2186
                  pTaskInfo);
2187
  pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAggNext, NULL, destroyStateWindowOperatorInfo,
1,034,640✔
2188
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2189
  setOperatorResetStateFn(pOperator, resetStatewindowOperState);
1,034,640✔
2190

2191
  code = appendDownstream(pOperator, &downstream, 1);
1,034,640✔
2192
  if (code != TSDB_CODE_SUCCESS) {
1,034,640✔
2193
    goto _error;
×
2194
  }
2195

2196
  *pOptrInfo = pOperator;
1,034,640✔
2197
  return TSDB_CODE_SUCCESS;
1,034,640✔
2198

2199
_error:
×
2200
  if (pInfo != NULL) {
×
2201
    destroyStateWindowOperatorInfo(pInfo);
×
2202
  }
2203

2204
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2205
  pTaskInfo->code = code;
×
2206
  return code;
×
2207
}
2208

2209
void destroySWindowOperatorInfo(void* param) {
601,214✔
2210
  SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
601,214✔
2211
  if (pInfo == NULL) {
601,214✔
2212
    return;
×
2213
  }
2214

2215
  cleanupBasicInfo(&pInfo->binfo);
601,214✔
2216
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
601,214✔
2217
  cleanupIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
601,214✔
2218
  if (pInfo->pOperator) {
601,214✔
2219
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
601,214✔
2220
                      pInfo->cleanGroupResInfo);
601,214✔
2221
    pInfo->pOperator = NULL;
601,214✔
2222
  }
2223

2224
  cleanupAggSup(&pInfo->aggSup);
601,214✔
2225
  cleanupExprSupp(&pInfo->scalarSupp);
601,214✔
2226

2227
  cleanupGroupResInfo(&pInfo->groupResInfo);
601,214✔
2228
  taosMemoryFreeClear(param);
601,214✔
2229
}
2230

2231
static int32_t resetSessionAggOperState(SOperatorInfo* pOper) {
1,704✔
2232
  SSessionAggOperatorInfo* pInfo = pOper->info;
1,704✔
2233
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
1,704✔
2234
  SSessionWinodwPhysiNode* pPhynode = (SSessionWinodwPhysiNode*)pOper->pPhyNode;
1,704✔
2235
  pOper->status = OP_NOT_OPENED;
1,704✔
2236

2237
  resetBasicOperatorState(&pInfo->binfo);
1,704✔
2238
  cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,704✔
2239
                    pInfo->cleanGroupResInfo);
1,704✔
2240

2241
  colDataDestroy(&pInfo->twAggSup.timeWindowData);
1,704✔
2242
  int32_t code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
1,704✔
2243
  if (code == 0) {
1,704✔
2244
    code = resetAggSup(&pOper->exprSupp, &pInfo->aggSup, pTaskInfo, pPhynode->window.pFuncs, NULL,
3,408✔
2245
                       sizeof(int64_t) * 2 + POINTER_BYTES, pTaskInfo->id.str, NULL,
1,704✔
2246
                       &pTaskInfo->storageAPI.functionStore);
2247
  }
2248
  if (code == 0) {
1,704✔
2249
    code = resetExprSupp(&pInfo->scalarSupp, pTaskInfo, pPhynode->window.pExprs, NULL,
1,704✔
2250
                         &pTaskInfo->storageAPI.functionStore);
2251
  }
2252

2253
  pInfo->cleanGroupResInfo = false;
1,704✔
2254
  pInfo->winSup = (SWindowRowsSup){0};
1,704✔
2255
  pInfo->winSup.prevTs = INT64_MIN;
1,704✔
2256
  pInfo->reptScan = false;
1,704✔
2257
  resetIndefRowsRuntime(&pInfo->indefRows, pInfo->pOperator);
1,704✔
2258

2259
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,704✔
2260
  return code;
1,704✔
2261
}
2262

2263
int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
601,214✔
2264
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2265
  QRY_PARAM_CHECK(pOptrInfo);
601,214✔
2266

2267
  int32_t                  code = TSDB_CODE_SUCCESS;
601,214✔
2268
  int32_t                  lino = 0;
601,214✔
2269
  SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
601,214✔
2270
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
601,214✔
2271
  if (pInfo == NULL || pOperator == NULL) {
601,214✔
2272
    code = terrno;
×
2273
    goto _error;
×
2274
  }
2275
  initOperatorCostInfo(pOperator);
601,214✔
2276

2277
  pOperator->pPhyNode = pSessionNode;
601,214✔
2278
  pOperator->exprSupp.hasWindowOrGroup = true;
601,214✔
2279
  pOperator->exprSupp.hasWindow = true;
601,214✔
2280

2281
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
601,214✔
2282
  initResultSizeInfo(&pOperator->resultInfo, 4096);
601,214✔
2283

2284
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc);
601,214✔
2285
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
601,214✔
2286
  initBasicInfo(&pInfo->binfo, pResBlock);
601,214✔
2287

2288
  int32_t    numOfCols = 0;
601,214✔
2289
  SExprInfo* pExprInfo = NULL;
601,214✔
2290
  code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols);
601,214✔
2291
  QUERY_CHECK_CODE(code, lino, _error);
601,214✔
2292

2293
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
601,214✔
2294
                    NULL, &pTaskInfo->storageAPI.functionStore);
2295
  QUERY_CHECK_CODE(code, lino, _error);
601,214✔
2296

2297
  pInfo->indefRowsMode = pSessionNode->window.indefRowsFunc;
601,214✔
2298
  if (pInfo->indefRowsMode) {
601,214✔
2299
    code = initIndefRowsRuntime(&pInfo->indefRows, pOperator->exprSupp.pCtx, numOfCols, pOperator->resultInfo.capacity);
10,692✔
2300
    QUERY_CHECK_CODE(code, lino, _error);
10,692✔
2301
  }
2302

2303
  pInfo->gap = pSessionNode->gap;
601,214✔
2304

2305
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
601,214✔
2306
  code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
601,214✔
2307
  QUERY_CHECK_CODE(code, lino, _error);
601,214✔
2308

2309
  pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
601,214✔
2310
  pInfo->binfo.pRes = pResBlock;
601,214✔
2311
  pInfo->winSup.prevTs = INT64_MIN;
601,214✔
2312
  pInfo->reptScan = false;
601,214✔
2313
  pInfo->binfo.inputTsOrder = pSessionNode->window.node.inputTsOrder;
601,214✔
2314
  pInfo->binfo.outputTsOrder = pSessionNode->window.node.outputTsOrder;
601,214✔
2315

2316
  if (pSessionNode->window.pExprs != NULL) {
601,214✔
2317
    int32_t    numOfScalar = 0;
2,014✔
2318
    SExprInfo* pScalarExprInfo = NULL;
2,014✔
2319
    code = createExprInfo(pSessionNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
2,014✔
2320
    QUERY_CHECK_CODE(code, lino, _error);
2,014✔
2321

2322
    code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
2,014✔
2323
    QUERY_CHECK_CODE(code, lino, _error);
2,014✔
2324
  }
2325

2326
  code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
601,214✔
2327
                            pTaskInfo->pStreamRuntimeInfo);
601,214✔
2328
  QUERY_CHECK_CODE(code, lino, _error);
601,214✔
2329

2330
  pInfo->pOperator = pOperator;
601,214✔
2331
  pInfo->cleanGroupResInfo = false;
601,214✔
2332
  setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
601,214✔
2333
                  pInfo, pTaskInfo);
2334
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAggNext, NULL, destroySWindowOperatorInfo,
601,214✔
2335
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2336
  pOperator->pTaskInfo = pTaskInfo;
601,214✔
2337
  setOperatorResetStateFn(pOperator, resetSessionAggOperState);
601,214✔
2338

2339
  code = appendDownstream(pOperator, &downstream, 1);
601,214✔
2340
  QUERY_CHECK_CODE(code, lino, _error);
601,214✔
2341

2342
  *pOptrInfo = pOperator;
601,214✔
2343
  return TSDB_CODE_SUCCESS;
601,214✔
2344

2345
_error:
×
2346
  if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
×
2347
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2348
  pTaskInfo->code = code;
×
2349
  return code;
×
2350
}
2351

2352
void destroyMAIOperatorInfo(void* param) {
841,823✔
2353
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
841,823✔
2354
  destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo);
841,823✔
2355
  taosMemoryFreeClear(param);
841,823✔
2356
}
841,823✔
2357

2358
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
748,364✔
2359
  SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
748,364✔
2360
  if (NULL == pResult) {
748,364✔
2361
    return pResult;
×
2362
  }
2363
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
748,364✔
2364
  return pResult;
748,364✔
2365
}
2366

2367
static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
1,020,689,750✔
2368
                                       SExprSupp* pExprSup, SAggSupporter* pAggSup) {
2369
  if (*pResult == NULL) {
1,020,689,750✔
2370
    *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup);
748,364✔
2371
    if (*pResult == NULL) {
748,364✔
2372
      return terrno;
×
2373
    }
2374
  }
2375

2376
  // set time window for current result
2377
  (*pResult)->win = (*win);
1,020,691,885✔
2378
  return setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
1,020,720,494✔
2379
}
2380

2381
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
4,132,885✔
2382
                                          SSDataBlock* pBlock, SSDataBlock* pResultBlock) {
2383
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
4,132,885✔
2384
  SIntervalAggOperatorInfo*             iaInfo = miaInfo->intervalAggOperatorInfo;
4,132,885✔
2385

2386
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
4,132,885✔
2387
  SExprSupp*     pSup = &pOperatorInfo->exprSupp;
4,132,885✔
2388
  SInterval*     pInterval = &iaInfo->interval;
4,132,885✔
2389

2390
  int32_t  startPos = 0;
4,132,885✔
2391
  int64_t* tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
4,132,885✔
2392

2393
  TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
4,132,885✔
2394

2395
  // there is an result exists
2396
  if (miaInfo->curTs != INT64_MIN) {
4,132,885✔
2397
    if (ts != miaInfo->curTs) {
1,398,295✔
2398
      finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
1,342,501✔
2399
      resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,342,501✔
2400
      miaInfo->curTs = ts;
1,342,501✔
2401
    }
2402
  } else {
2403
    miaInfo->curTs = ts;
2,734,590✔
2404
  }
2405

2406
  STimeWindow win = {0};
4,132,885✔
2407
  win.skey = miaInfo->curTs;
4,132,885✔
2408
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
4,132,885✔
2409

2410
  int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
4,132,885✔
2411
  if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
4,132,885✔
2412
    T_LONG_JMP(pTaskInfo->env, ret);
×
2413
  }
2414

2415
  int32_t currPos = startPos;
4,132,885✔
2416

2417
  STimeWindow currWin = win;
4,132,885✔
2418
  while (++currPos < pBlock->info.rows) {
2,144,386,492✔
2419
    if (tsCols[currPos] == miaInfo->curTs) {
2,140,131,485✔
2420
      continue;
1,123,660,447✔
2421
    }
2422

2423
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
1,016,563,697✔
2424
    ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
2,033,164,543✔
2425
                                          currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
1,016,582,058✔
2426
    if (ret != TSDB_CODE_SUCCESS) {
1,016,575,226✔
2427
      T_LONG_JMP(pTaskInfo->env, ret);
×
2428
    }
2429

2430
    finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo);
1,016,575,226✔
2431
    resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,016,586,755✔
2432
    miaInfo->curTs = tsCols[currPos];
1,016,623,477✔
2433

2434
    currWin.skey = miaInfo->curTs;
1,016,619,207✔
2435
    currWin.ekey =
1,016,555,584✔
2436
        taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL) - 1;
1,016,619,634✔
2437

2438
    startPos = currPos;
1,016,555,584✔
2439
    ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup);
1,016,555,584✔
2440
    if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) {
1,016,592,306✔
2441
      T_LONG_JMP(pTaskInfo->env, ret);
2,989✔
2442
    }
2443

2444
    miaInfo->curTs = currWin.skey;
1,016,590,598✔
2445
  }
2446

2447
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
4,132,885✔
2448
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
8,265,770✔
2449
                                        currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
4,132,885✔
2450
  if (ret != TSDB_CODE_SUCCESS) {
4,132,885✔
2451
    T_LONG_JMP(pTaskInfo->env, ret);
×
2452
  }
2453
}
4,132,885✔
2454

2455
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
2,724,581✔
2456
  pRes->info.id.groupId = pMiaInfo->groupId;
2,724,581✔
2457
  pMiaInfo->curTs = INT64_MIN;
2,724,581✔
2458
  pMiaInfo->groupId = 0;
2,724,581✔
2459
}
2,724,581✔
2460

2461
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
3,743,433✔
2462
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
3,743,433✔
2463
  int32_t                               code = TSDB_CODE_SUCCESS;
3,743,433✔
2464
  int32_t                               lino = 0;
3,743,433✔
2465
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
3,743,433✔
2466
  SIntervalAggOperatorInfo*             pIaInfo = pMiaInfo->intervalAggOperatorInfo;
3,743,433✔
2467

2468
  SExprSupp*      pSup = &pOperator->exprSupp;
3,743,433✔
2469
  SSDataBlock*    pRes = pIaInfo->binfo.pRes;
3,743,433✔
2470
  SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo;
3,743,433✔
2471
  SOperatorInfo*  downstream = pOperator->pDownstream[0];
3,743,433✔
2472

2473
  while (1) {
3,204,141✔
2474
    SSDataBlock* pBlock = NULL;
6,947,574✔
2475
    if (pMiaInfo->prefetchedBlock == NULL) {
6,947,574✔
2476
      pBlock = getNextBlockFromDownstreamRemainDetach(pOperator, 0);
4,961,348✔
2477
    } else {
2478
      pBlock = pMiaInfo->prefetchedBlock;
1,986,226✔
2479
      pMiaInfo->prefetchedBlock = NULL;
1,986,226✔
2480

2481
      pMiaInfo->groupId = pBlock->info.id.groupId;
1,986,226✔
2482
    }
2483

2484
    // no data exists, all query processing is done
2485
    if (pBlock == NULL) {
6,946,934✔
2486
      // close last unclosed time window
2487
      if (pMiaInfo->curTs != INT64_MIN) {
827,823✔
2488
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
738,355✔
2489
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
738,355✔
2490
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
738,355✔
2491
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
738,355✔
2492
        QUERY_CHECK_CODE(code, lino, _end);
738,355✔
2493
      }
2494

2495
      setOperatorCompleted(pOperator);
827,823✔
2496
      break;
827,823✔
2497
    }
2498

2499
    if (pMiaInfo->groupId == 0) {
6,119,111✔
2500
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
1,478,853✔
2501
        pMiaInfo->groupId = pBlock->info.id.groupId;
174,251✔
2502
        pRes->info.id.groupId = pMiaInfo->groupId;
174,251✔
2503
      }
2504
    } else {
2505
      if (pMiaInfo->groupId != pBlock->info.id.groupId) {
4,640,258✔
2506
        // if there are unclosed time window, close it firstly.
2507
        if (pMiaInfo->curTs == INT64_MIN) {
1,986,226✔
2508
          pTaskInfo->code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2509
          T_LONG_JMP(pTaskInfo->env, terrno);
×
2510
        }
2511
        finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
1,986,226✔
2512
        resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
1,986,226✔
2513

2514
        pMiaInfo->prefetchedBlock = pBlock;
1,986,226✔
2515
        cleanupAfterGroupResultGen(pMiaInfo, pRes);
1,986,226✔
2516
        code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
1,986,226✔
2517
        QUERY_CHECK_CODE(code, lino, _end);
1,986,226✔
2518
        if (pRes->info.rows == 0) {
1,986,226✔
2519
          // After filtering for last group, the result is empty, so we need to continue to process next group
2520
          continue;
14,518✔
2521
        } else {
2522
          break;
1,971,708✔
2523
        }
2524
      } else {
2525
        // continue
2526
        pRes->info.id.groupId = pMiaInfo->groupId;
2,654,032✔
2527
      }
2528
    }
2529

2530
    pRes->info.scanFlag = pBlock->info.scanFlag;
4,132,885✔
2531

2532
    if (pIaInfo->scalarSupp.pExprInfo != NULL) {
4,132,885✔
2533
      SExprSupp* pExprSup = &pIaInfo->scalarSupp;
1,503✔
2534
      code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
3,006✔
2535
                                   GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
1,503✔
2536
      QUERY_CHECK_CODE(code, lino, _end);
1,503✔
2537
    }
2538

2539
    code = setInputDataBlock(pSup, pBlock, pIaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
4,132,885✔
2540
    QUERY_CHECK_CODE(code, lino, _end);
4,132,885✔
2541

2542
    doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
4,132,885✔
2543

2544
    code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL, NULL);
4,132,885✔
2545
    QUERY_CHECK_CODE(code, lino, _end);
4,132,885✔
2546

2547
    if (pRes->info.rows >= pOperator->resultInfo.capacity) {
4,132,885✔
2548
      break;
943,262✔
2549
    }
2550
  }
2551

2552
_end:
3,742,793✔
2553
  if (code != TSDB_CODE_SUCCESS) {
3,742,793✔
2554
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2555
    pTaskInfo->code = code;
×
2556
    T_LONG_JMP(pTaskInfo->env, code);
×
2557
  }
2558
}
3,742,793✔
2559

2560
static int32_t mergeAlignedIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
4,013,394✔
2561
  SExecTaskInfo*                        pTaskInfo = pOperator->pTaskInfo;
4,013,394✔
2562
  int32_t                               code = TSDB_CODE_SUCCESS;
4,013,394✔
2563
  SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
4,013,394✔
2564
  SIntervalAggOperatorInfo*             iaInfo = pMiaInfo->intervalAggOperatorInfo;
4,013,394✔
2565
  if (pOperator->status == OP_EXEC_DONE) {
4,013,394✔
2566
    (*ppRes) = NULL;
792,624✔
2567
    return code;
792,624✔
2568
  }
2569

2570
  SSDataBlock* pRes = iaInfo->binfo.pRes;
3,220,770✔
2571
  blockDataCleanup(pRes);
3,220,770✔
2572

2573
  if (iaInfo->binfo.mergeResultBlock) {
3,220,770✔
2574
    while (1) {
2575
      if (pOperator->status == OP_EXEC_DONE) {
3,165,637✔
2576
        break;
400,448✔
2577
      }
2578

2579
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
2,765,189✔
2580
        break;
920,719✔
2581
      }
2582

2583
      doMergeAlignedIntervalAgg(pOperator);
1,844,470✔
2584
    }
2585
  } else {
2586
    doMergeAlignedIntervalAgg(pOperator);
1,898,963✔
2587
  }
2588

2589
  (*ppRes) = (pRes->info.rows == 0) ? NULL : pRes;
3,220,130✔
2590
  return code;
3,220,130✔
2591
}
2592

2593
static int32_t resetMergeAlignedIntervalOperState(SOperatorInfo* pOper) {
1,612✔
2594
  SMergeAlignedIntervalAggOperatorInfo* pInfo = pOper->info;
1,612✔
2595
  
2596
  uint64_t     groupId;  // current groupId
2597
  int64_t      curTs;    // current ts
2598
  SSDataBlock* prefetchedBlock;
2599
  SResultRow*  pResultRow;
2600

2601
  pInfo->groupId = 0;
1,612✔
2602
  pInfo->curTs = INT64_MIN;
1,612✔
2603
  pInfo->prefetchedBlock = NULL;
1,612✔
2604
  pInfo->pResultRow = NULL;
1,612✔
2605

2606
  return resetInterval(pOper, pInfo->intervalAggOperatorInfo);
1,612✔
2607
}
2608

2609
int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode,
841,823✔
2610
                                               SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2611
  QRY_PARAM_CHECK(pOptrInfo);
841,823✔
2612

2613
  int32_t                               code = TSDB_CODE_SUCCESS;
841,823✔
2614
  int32_t                               lino = 0;
841,823✔
2615
  SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
841,823✔
2616
  SOperatorInfo*                        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
841,823✔
2617
  if (miaInfo == NULL || pOperator == NULL) {
841,823✔
2618
    code = terrno;
×
2619
    goto _error;
×
2620
  }
2621
  initOperatorCostInfo(pOperator);
841,823✔
2622

2623
  pOperator->pPhyNode = pNode;
841,823✔
2624
  miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
841,823✔
2625
  if (miaInfo->intervalAggOperatorInfo == NULL) {
841,823✔
2626
    code = terrno;
×
2627
    goto _error;
×
2628
  }
2629

2630
  SInterval interval = {.interval = pNode->interval,
2,525,469✔
2631
                        .sliding = pNode->sliding,
841,823✔
2632
                        .intervalUnit = pNode->intervalUnit,
841,823✔
2633
                        .slidingUnit = pNode->slidingUnit,
841,823✔
2634
                        .offset = pNode->offset,
841,823✔
2635
                        .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision,
841,823✔
2636
                        .timeRange = pNode->timeRange};
2637
  calcIntervalAutoOffset(&interval);
841,823✔
2638

2639
  SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
841,823✔
2640
  SExprSupp*                pSup = &pOperator->exprSupp;
841,823✔
2641
  pSup->hasWindowOrGroup = true;
841,823✔
2642
  pSup->hasWindow = true;
841,823✔
2643

2644
  code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
841,823✔
2645
                            pTaskInfo->pStreamRuntimeInfo);
841,823✔
2646
  QUERY_CHECK_CODE(code, lino, _error);
841,823✔
2647

2648
  miaInfo->curTs = INT64_MIN;
841,823✔
2649
  iaInfo->win = pTaskInfo->window;
841,823✔
2650
  iaInfo->binfo.inputTsOrder = pNode->window.node.inputTsOrder;
841,823✔
2651
  iaInfo->binfo.outputTsOrder = pNode->window.node.outputTsOrder;
841,823✔
2652
  iaInfo->interval = interval;
841,823✔
2653
  iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId;
841,823✔
2654
  iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
841,823✔
2655

2656
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
841,823✔
2657
  initResultSizeInfo(&pOperator->resultInfo, 512);
841,823✔
2658

2659
  int32_t    num = 0;
841,823✔
2660
  SExprInfo* pExprInfo = NULL;
841,823✔
2661
  code = createExprInfo(pNode->window.pFuncs, NULL, &pExprInfo, &num);
841,823✔
2662
  QUERY_CHECK_CODE(code, lino, _error);
841,823✔
2663

2664
  code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
841,823✔
2665
                    NULL, &pTaskInfo->storageAPI.functionStore);
2666
  QUERY_CHECK_CODE(code, lino, _error);
841,823✔
2667

2668
  if (pNode->window.pExprs != NULL) {
841,823✔
2669
    int32_t    numOfScalar = 0;
1,503✔
2670
    SExprInfo* pScalarExprInfo = NULL;
1,503✔
2671
    code = createExprInfo(pNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
1,503✔
2672
    QUERY_CHECK_CODE(code, lino, _error);
1,503✔
2673
    code = initExprSupp(&iaInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
1,503✔
2674
    QUERY_CHECK_CODE(code, lino, _error);
1,503✔
2675
  }
2676

2677
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc);
841,823✔
2678
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
841,823✔
2679
  initBasicInfo(&iaInfo->binfo, pResBlock);
841,823✔
2680
  code = initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win);
841,823✔
2681
  QUERY_CHECK_CODE(code, lino, _error);
841,823✔
2682

2683
  iaInfo->timeWindowInterpo = false;
841,823✔
2684
  code = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo, &iaInfo->timeWindowInterpo);
841,823✔
2685
  QUERY_CHECK_CODE(code, lino, _error);
841,823✔
2686
  if (iaInfo->timeWindowInterpo) {
841,823✔
2687
    iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
2688
  }
2689

2690
  initResultRowInfo(&iaInfo->binfo.resultRowInfo);
841,823✔
2691
  code = blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
841,823✔
2692
  QUERY_CHECK_CODE(code, lino, _error);
841,823✔
2693
  iaInfo->pOperator = pOperator;
841,823✔
2694
  setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
841,823✔
2695
                  false, OP_NOT_OPENED, miaInfo, pTaskInfo);
2696

2697
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAggNext, NULL, destroyMAIOperatorInfo,
841,823✔
2698
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2699
  setOperatorResetStateFn(pOperator, resetMergeAlignedIntervalOperState);
841,823✔
2700

2701
  code = appendDownstream(pOperator, &downstream, 1);
841,823✔
2702
  QUERY_CHECK_CODE(code, lino, _error);
841,823✔
2703

2704
  *pOptrInfo = pOperator;
841,823✔
2705
  return TSDB_CODE_SUCCESS;
841,823✔
2706

2707
_error:
×
2708
  if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
×
2709
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
2710
  pTaskInfo->code = code;
×
2711
  return code;
×
2712
}
2713

2714
//=====================================================================================================================
2715
// merge interval operator
2716
typedef struct SMergeIntervalAggOperatorInfo {
2717
  SIntervalAggOperatorInfo intervalAggOperatorInfo;
2718
  SList*                   groupIntervals;
2719
  SListIter                groupIntervalsIter;
2720
  bool                     hasGroupId;
2721
  uint64_t                 groupId;
2722
  SSDataBlock*             prefetchedBlock;
2723
  bool                     inputBlocksFinished;
2724
} SMergeIntervalAggOperatorInfo;
2725

2726
typedef struct SGroupTimeWindow {
2727
  uint64_t    groupId;
2728
  STimeWindow window;
2729
} SGroupTimeWindow;
2730

2731
void destroyMergeIntervalOperatorInfo(void* param) {
×
2732
  SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
×
2733
  miaInfo->groupIntervals = tdListFree(miaInfo->groupIntervals);
×
2734
  destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo);
×
2735

2736
  taosMemoryFreeClear(param);
×
2737
}
×
2738

2739
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
×
2740
                                        STimeWindow* newWin) {
2741
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2742
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2743
  bool                           ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2744

2745
  SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
×
2746
  int32_t          code = tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
×
2747
  if (code != TSDB_CODE_SUCCESS) {
×
2748
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2749
    return code;
×
2750
  }
2751

2752
  SListIter iter = {0};
×
2753
  tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
×
2754
  SListNode* listNode = NULL;
×
2755
  while ((listNode = tdListNext(&iter)) != NULL) {
×
2756
    SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
×
2757
    if (prevGrpWin->groupId != tableGroupId) {
×
2758
      continue;
×
2759
    }
2760

2761
    STimeWindow* prevWin = &prevGrpWin->window;
×
2762
    if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
×
2763
      SListNode* tmp = tdListPopNode(miaInfo->groupIntervals, listNode);
×
2764
      taosMemoryFreeClear(tmp);
×
2765
    }
2766
  }
2767

2768
  return TSDB_CODE_SUCCESS;
×
2769
}
2770

2771
static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
×
2772
                                   int32_t scanFlag, SSDataBlock* pResultBlock) {
2773
  SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
×
2774
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2775

2776
  SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
×
2777
  SExprSupp*     pExprSup = &pOperatorInfo->exprSupp;
×
2778

2779
  int32_t     startPos = 0;
×
2780
  int32_t     numOfOutput = pExprSup->numOfExprs;
×
2781
  int64_t*    tsCols = extractTsCol(pBlock, iaInfo, pTaskInfo);
×
2782
  uint64_t    tableGroupId = pBlock->info.id.groupId;
×
2783
  bool        ascScan = (iaInfo->binfo.inputTsOrder == TSDB_ORDER_ASC);
×
2784
  TSKEY       blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
×
2785
  SResultRow* pResult = NULL;
×
2786

2787
  STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
×
2788
                                        iaInfo->binfo.inputTsOrder);
2789

2790
  int32_t ret =
2791
      setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2792
                             numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2793
  if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2794
    T_LONG_JMP(pTaskInfo->env, ret);
×
2795
  }
2796

2797
  TSKEY   ekey = ascScan ? win.ekey : win.skey;
×
2798
  int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2799
                                                 iaInfo->binfo.inputTsOrder);
2800
  if (forwardRows <= 0) {
×
2801
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2802
  }
2803

2804
  // prev time window not interpolation yet.
2805
  if (iaInfo->timeWindowInterpo) {
×
2806
    SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId, pTaskInfo);
×
2807
    doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
×
2808

2809
    // restore current time window
2810
    ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
×
2811
                                 numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2812
    if (ret != TSDB_CODE_SUCCESS) {
×
2813
      T_LONG_JMP(pTaskInfo->env, ret);
×
2814
    }
2815

2816
    // window start key interpolation
2817
    ret = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
×
2818
    if (ret != TSDB_CODE_SUCCESS) {
×
2819
      T_LONG_JMP(pTaskInfo->env, ret);
×
2820
    }
2821
  }
2822

2823
  updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
×
2824
  ret = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2825
                                        forwardRows, pBlock->info.rows, numOfOutput);
×
2826
  if (ret != TSDB_CODE_SUCCESS) {
×
2827
    T_LONG_JMP(pTaskInfo->env, ret);
×
2828
  }
2829
  doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2830

2831
  // output previous interval results after this interval (&win) is closed
2832
  int32_t code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win);
×
2833
  if (code != TSDB_CODE_SUCCESS) {
×
2834
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2835
    T_LONG_JMP(pTaskInfo->env, code);
×
2836
  }
2837

2838
  STimeWindow nextWin = win;
×
2839
  while (1) {
×
2840
    int32_t prevEndPos = forwardRows - 1 + startPos;
×
2841
    startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos,
×
2842
                                      iaInfo->binfo.inputTsOrder);
2843
    if (startPos < 0) {
×
2844
      break;
×
2845
    }
2846

2847
    // null data, failed to allocate more memory buffer
2848
    code =
2849
        setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
×
2850
                               pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
2851
    if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
×
2852
      T_LONG_JMP(pTaskInfo->env, code);
×
2853
    }
2854

2855
    ekey = ascScan ? nextWin.ekey : nextWin.skey;
×
2856
    forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL,
×
2857
                                           iaInfo->binfo.inputTsOrder);
2858

2859
    // window start(end) key interpolation
2860
    code = doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
×
2861
    if (code != TSDB_CODE_SUCCESS) {
×
2862
      T_LONG_JMP(pTaskInfo->env, code);
×
2863
    }
2864

2865
    updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
×
2866
    code = applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
×
2867
                                           forwardRows, pBlock->info.rows, numOfOutput);
×
2868
    if (code != TSDB_CODE_SUCCESS) {
×
2869
      T_LONG_JMP(pTaskInfo->env, code);
×
2870
    }
2871
    doCloseWindow(pResultRowInfo, iaInfo, pResult);
×
2872

2873
    // output previous interval results after this interval (&nextWin) is closed
2874
    code = outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin);
×
2875
    if (code != TSDB_CODE_SUCCESS) {
×
2876
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2877
      T_LONG_JMP(pTaskInfo->env, code);
×
2878
    }
2879
  }
2880

2881
  if (iaInfo->timeWindowInterpo) {
×
2882
    saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols);
×
2883
  }
2884
}
×
2885

2886
static int32_t doMergeIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2887
  int32_t        code = TSDB_CODE_SUCCESS;
×
2888
  int32_t        lino = 0;
×
2889
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
×
2890

2891
  SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info;
×
2892
  SIntervalAggOperatorInfo*      iaInfo = &miaInfo->intervalAggOperatorInfo;
×
2893
  SExprSupp*                     pExpSupp = &pOperator->exprSupp;
×
2894

2895
  if (pOperator->status == OP_EXEC_DONE) {
×
2896
    (*ppRes) = NULL;
×
2897
    return code;
×
2898
  }
2899

2900
  SSDataBlock* pRes = iaInfo->binfo.pRes;
×
2901
  blockDataCleanup(pRes);
×
2902
  code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
×
2903
  QUERY_CHECK_CODE(code, lino, _end);
×
2904

2905
  if (!miaInfo->inputBlocksFinished) {
×
2906
    SOperatorInfo* downstream = pOperator->pDownstream[0];
×
2907
    while (1) {
×
2908
      SSDataBlock* pBlock = NULL;
×
2909
      if (miaInfo->prefetchedBlock == NULL) {
×
2910
        pBlock = getNextBlockFromDownstream(pOperator, 0);
×
2911
      } else {
2912
        pBlock = miaInfo->prefetchedBlock;
×
2913
        miaInfo->groupId = pBlock->info.id.groupId;
×
2914
        miaInfo->prefetchedBlock = NULL;
×
2915
      }
2916

2917
      if (pBlock == NULL) {
×
2918
        tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
×
2919
        miaInfo->inputBlocksFinished = true;
×
2920
        break;
×
2921
      }
2922

2923
      if (!miaInfo->hasGroupId) {
×
2924
        miaInfo->hasGroupId = true;
×
2925
        miaInfo->groupId = pBlock->info.id.groupId;
×
2926
      } else if (miaInfo->groupId != pBlock->info.id.groupId) {
×
2927
        miaInfo->prefetchedBlock = pBlock;
×
2928
        break;
×
2929
      }
2930

2931
      pRes->info.scanFlag = pBlock->info.scanFlag;
×
2932
      code = setInputDataBlock(pExpSupp, pBlock, iaInfo->binfo.inputTsOrder, pBlock->info.scanFlag, true);
×
2933
      QUERY_CHECK_CODE(code, lino, _end);
×
2934

2935
      doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, pBlock->info.scanFlag, pRes);
×
2936

2937
      if (pRes->info.rows >= pOperator->resultInfo.threshold) {
×
2938
        break;
×
2939
      }
2940
    }
2941

2942
    pRes->info.id.groupId = miaInfo->groupId;
×
2943
  }
2944

2945
  if (miaInfo->inputBlocksFinished) {
×
2946
    SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
×
2947

2948
    if (listNode != NULL) {
×
2949
      SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
×
2950
      pRes->info.id.groupId = grpWin->groupId;
×
2951
    }
2952
  }
2953

2954
  if (pRes->info.rows == 0) {
×
2955
    setOperatorCompleted(pOperator);
×
2956
  }
2957

2958
_end:
×
2959
  if (code != TSDB_CODE_SUCCESS) {
×
2960
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2961
    pTaskInfo->code = code;
×
2962
    T_LONG_JMP(pTaskInfo->env, code);
×
2963
  }
2964
  (*ppRes) = (pRes->info.rows == 0) ? NULL : pRes;
×
2965
  return code;
×
2966
}
2967

2968
static int32_t resetMergeIntervalOperState(SOperatorInfo* pOper) {
×
2969
  SMergeIntervalAggOperatorInfo* pInfo = pOper->info;
×
2970

2971
  pInfo->hasGroupId = false;
×
2972
  pInfo->groupId = 0;
×
2973
  pInfo->prefetchedBlock = NULL;
×
2974
  pInfo->inputBlocksFinished = false;
×
2975
  tdListEmpty(pInfo->groupIntervals);
×
2976
  
2977
  SIntervalAggOperatorInfo* pIntervalInfo = &pInfo->intervalAggOperatorInfo;
×
2978
  return resetInterval(pOper, pIntervalInfo);
×
2979
}
2980

2981
int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode,
×
2982
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
2983
  QRY_PARAM_CHECK(pOptrInfo);
×
2984

2985
  int32_t                        code = TSDB_CODE_SUCCESS;
×
2986
  int32_t                        lino = 0;
×
2987
  SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo));
×
2988
  SOperatorInfo*                 pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
2989
  if (pMergeIntervalInfo == NULL || pOperator == NULL) {
×
2990
    code = terrno;
×
2991
    goto _error;
×
2992
  }
2993
  initOperatorCostInfo(pOperator);
×
2994

2995
  pOperator->pPhyNode = pIntervalPhyNode;
×
2996
  SInterval interval = {.interval = pIntervalPhyNode->interval,
×
2997
                        .sliding = pIntervalPhyNode->sliding,
×
2998
                        .intervalUnit = pIntervalPhyNode->intervalUnit,
×
2999
                        .slidingUnit = pIntervalPhyNode->slidingUnit,
×
3000
                        .offset = pIntervalPhyNode->offset,
×
3001
                        .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
×
3002
                        .timeRange = pIntervalPhyNode->timeRange};
3003
  calcIntervalAutoOffset(&interval);
×
3004

3005
  pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
×
3006

3007
  SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo;
×
3008
  pIntervalInfo->win = pTaskInfo->window;
×
3009
  pIntervalInfo->binfo.inputTsOrder = pIntervalPhyNode->window.node.inputTsOrder;
×
3010
  pIntervalInfo->interval = interval;
×
3011
  pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock;
×
3012
  pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
×
3013
  pIntervalInfo->binfo.outputTsOrder = pIntervalPhyNode->window.node.outputTsOrder;
×
3014

3015
  SExprSupp* pExprSupp = &pOperator->exprSupp;
×
3016
  pExprSupp->hasWindowOrGroup = true;
×
3017
  pExprSupp->hasWindow = true;
×
3018

3019
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
×
3020
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
3021

3022
  int32_t    num = 0;
×
3023
  SExprInfo* pExprInfo = NULL;
×
3024
  code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num);
×
3025
  QUERY_CHECK_CODE(code, lino, _error);
×
3026

3027
  code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
×
3028
                    NULL, &pTaskInfo->storageAPI.functionStore);
3029
  if (code != TSDB_CODE_SUCCESS) {
×
3030
    goto _error;
×
3031
  }
3032

3033
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
×
3034
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
×
3035
  initBasicInfo(&pIntervalInfo->binfo, pResBlock);
×
3036
  code = initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
×
3037
  QUERY_CHECK_CODE(code, lino, _error);
×
3038

3039
  pIntervalInfo->timeWindowInterpo = false;
×
3040
  code = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo, &pIntervalInfo->timeWindowInterpo);
×
3041
  QUERY_CHECK_CODE(code, lino, _error);
×
3042
  if (pIntervalInfo->timeWindowInterpo) {
×
3043
    pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
×
3044
    if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
×
3045
      goto _error;
×
3046
    }
3047
  }
3048

3049
  pIntervalInfo->pOperator = pOperator;
×
3050
  initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
×
3051
  setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
×
3052
                  OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
3053
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAggNext, NULL, destroyMergeIntervalOperatorInfo,
×
3054
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
3055
  setOperatorResetStateFn(pOperator, resetMergeIntervalOperState);
×
3056

3057
  code = appendDownstream(pOperator, &downstream, 1);
×
3058
  if (code != TSDB_CODE_SUCCESS) {
×
3059
    goto _error;
×
3060
  }
3061

3062
  *pOptrInfo = pOperator;
×
3063
  return TSDB_CODE_SUCCESS;
×
3064
_error:
×
3065
  if (pMergeIntervalInfo != NULL) {
×
3066
    destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
×
3067
  }
3068
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
3069
  pTaskInfo->code = code;
×
3070
  return code;
×
3071
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc